Fix unsubscribtion

This commit is contained in:
2022-08-29 00:09:05 +02:00
parent 7eb6d6a75f
commit e8fe008e99

View File

@ -21,8 +21,9 @@ interface DomainServerJson {
interface OutputEnvelopeJson { interface OutputEnvelopeJson {
status: string, status: string,
request_nr: number, requestNr: number,
data: any, data: any,
action: string | null | undefined,
error: { error: {
code: string, code: string,
message: string | null, message: string | null,
@ -155,8 +156,14 @@ export class Session {
httpBaseUrl: string | null; httpBaseUrl: string | null;
websocket: WebSocket | null; websocket: WebSocket | null;
numRequests: number; numRequests: number;
subscriptions: string[]; subscriptions: {
subscriptionEndpoints: {cb: (a: EventJson) => void}[]; callback: (a: EventJson) => void,
requestNr: number | undefined,
abortController: AbortController | undefined,
}[];
subscriptionEndpoints: {
callback: (a: EventJson) => void,
}[];
constructor(domain: Domain) { constructor(domain: Domain) {
this.domain = domain; this.domain = domain;
@ -169,7 +176,8 @@ export class Session {
this.subscriptionEndpoints = []; this.subscriptionEndpoints = [];
} }
close() { async close(keepEndpoints: boolean = false) {
await this.unsubscribeAll(keepEndpoints);
if (this.websocket && (this.websocket.readyState !== WebSocket.CLOSING && this.websocket.readyState !== WebSocket.CLOSED)) { if (this.websocket && (this.websocket.readyState !== WebSocket.CLOSING && this.websocket.readyState !== WebSocket.CLOSED)) {
this.websocket.close(); this.websocket.close();
this.subscriptions = []; this.subscriptions = [];
@ -178,15 +186,15 @@ export class Session {
} }
} }
sleep() { async sleep() {
this.close(); await this.close(true);
} }
async wakeup() { async wakeup() {
await this.chooseDomainServer(); await this.chooseDomainServer();
await this.ping(); await this.ping();
for (const endpoint of this.subscriptionEndpoints) { for (const endpoint of this.subscriptionEndpoints) {
await this._subscribe(endpoint.cb); await this._subscribe(endpoint.callback);
} }
} }
@ -283,7 +291,14 @@ export class Session {
}); });
} }
async send(endpoint: string, data: any, timeout: number = 2000, forceHttp: boolean = false, cb: ((a: OutputEnvelopeJson) => void) | null = null) { async send(
endpoint: string,
data: any,
timeout: number = 2000,
forceHttp: boolean = false,
abortController: AbortController | undefined = undefined,
cb: ((a: OutputEnvelopeJson) => void) | undefined = undefined,
) {
this.numRequests++; this.numRequests++;
while (true) { while (true) {
try { try {
@ -294,7 +309,7 @@ export class Session {
await this.waitForWebSocket(timeout); await this.waitForWebSocket(timeout);
let response; let response;
if (cb === null) { if (cb === undefined) {
response = this.waitForWebSocketResponse(req_nr, timeout); response = this.waitForWebSocketResponse(req_nr, timeout);
} else { } else {
this.websocket.addEventListener("message", (msg) => { this.websocket.addEventListener("message", (msg) => {
@ -322,7 +337,7 @@ export class Session {
return await this.waitForWebSocketResponse(req_nr, timeout); return await this.waitForWebSocketResponse(req_nr, timeout);
} }
} else if (this.httpBaseUrl) { } else if (this.httpBaseUrl) {
const controller = new AbortController(); const controller = abortController || new AbortController();
const timer = setTimeout(() => controller.abort(), timeout); const timer = setTimeout(() => controller.abort(), timeout);
let headers: Record<string, string> = { let headers: Record<string, string> = {
@ -401,15 +416,15 @@ export class Session {
} }
async subscribe(cb: (a: EventJson) => void) { async subscribe(cb: (a: EventJson) => void) {
this.subscriptionEndpoints.push({cb: cb}); this.subscriptionEndpoints.push({callback: cb});
await this._subscribe(cb); await this._subscribe(cb);
} }
private async _subscribe(cb: (a: EventJson) => void) { private async _subscribe(cb: (a: EventJson) => void) {
this.numRequests++; this.numRequests++;
if (this.websocket !== null) { if (this.websocket !== null) {
const subscription = await this.send('subscribe', {}, 60_000, false, (res) => { const subscription = await this.send('subscribe', {}, 60_000, false, undefined, (res) => {
if (res.data && res.data.events) { if (res.action === 'push') {
for (const event of res.data.events) { for (const event of res.data.events) {
cb(event); cb(event);
} }
@ -418,7 +433,10 @@ export class Session {
this.subscriptions.push(subscription); this.subscriptions.push(subscription);
return subscription; return subscription;
} else { } else {
this.send('subscribe', {}, 60_000).then((res) => { const controller = new AbortController();
this.subscriptions.push({callback: cb, requestNr: undefined, abortController: controller});
this.send('subscribe', {}, 60_000, false, controller).then((res) => {
this.subscriptions = [];
if (res.status === "success") { if (res.status === "success") {
this._subscribe(cb); this._subscribe(cb);
cb(res); cb(res);
@ -428,11 +446,28 @@ export class Session {
}, 1000); }, 1000);
} }
}).catch(() => { }).catch(() => {
this.subscriptions = [];
setTimeout(() => { setTimeout(() => {
this._subscribe(cb); this._subscribe(cb);
}, 1000); }, 1000);
}); });
return null; // TODO subscription id return null;
}
}
async unsubscribeAll(keepEndpoints: boolean = false) {
for (const sub of this.subscriptions) {
await this.unsubscribe(sub.requestNr, sub.abortController);
}
this.subscriptions = [];
if (!keepEndpoints) this.subscriptionEndpoints = [];
}
private async unsubscribe(requestNr: number | undefined = undefined, abortController: AbortController | undefined = undefined) {
if (this.websocket !== null) {
await this.send('unsubscribe', {'request_nr': requestNr})
} else {
if (abortController) abortController.abort('unsubscribe');
} }
} }
} }