diff --git a/src/usimp.ts b/src/usimp.ts index b439afa..6444fda 100644 --- a/src/usimp.ts +++ b/src/usimp.ts @@ -21,8 +21,9 @@ interface DomainServerJson { interface OutputEnvelopeJson { status: string, - request_nr: number, + requestNr: number, data: any, + action: string | null | undefined, error: { code: string, message: string | null, @@ -155,8 +156,14 @@ export class Session { httpBaseUrl: string | null; websocket: WebSocket | null; numRequests: number; - subscriptions: string[]; - subscriptionEndpoints: {cb: (a: EventJson) => void}[]; + subscriptions: { + callback: (a: EventJson) => void, + requestNr: number | undefined, + abortController: AbortController | undefined, + }[]; + subscriptionEndpoints: { + callback: (a: EventJson) => void, + }[]; constructor(domain: Domain) { this.domain = domain; @@ -169,7 +176,8 @@ export class Session { this.subscriptionEndpoints = []; } - close() { + async close(keepEndpoints: boolean = false) { + await this.unsubscribeAll(keepEndpoints); if (this.websocket && (this.websocket.readyState !== WebSocket.CLOSING && this.websocket.readyState !== WebSocket.CLOSED)) { this.websocket.close(); this.subscriptions = []; @@ -178,15 +186,15 @@ export class Session { } } - sleep() { - this.close(); + async sleep() { + await this.close(true); } async wakeup() { await this.chooseDomainServer(); await this.ping(); for (const endpoint of this.subscriptionEndpoints) { - await this._subscribe(endpoint.cb); + await this._subscribe(endpoint.callback); } } @@ -283,7 +291,14 @@ export class Session { }); } - async send(endpoint: string, data: any, timeout: number = 2000, forceHttp: boolean = false, cb: ((a: OutputEnvelopeJson) => void) | null = null) { + async send( + endpoint: string, + data: any, + timeout: number = 2000, + forceHttp: boolean = false, + abortController: AbortController | undefined = undefined, + cb: ((a: OutputEnvelopeJson) => void) | undefined = undefined, + ) { this.numRequests++; while (true) { try { @@ -294,7 +309,7 @@ export class Session { await this.waitForWebSocket(timeout); let response; - if (cb === null) { + if (cb === undefined) { response = this.waitForWebSocketResponse(req_nr, timeout); } else { this.websocket.addEventListener("message", (msg) => { @@ -322,7 +337,7 @@ export class Session { return await this.waitForWebSocketResponse(req_nr, timeout); } } else if (this.httpBaseUrl) { - const controller = new AbortController(); + const controller = abortController || new AbortController(); const timer = setTimeout(() => controller.abort(), timeout); let headers: Record = { @@ -401,15 +416,15 @@ export class Session { } async subscribe(cb: (a: EventJson) => void) { - this.subscriptionEndpoints.push({cb: cb}); + this.subscriptionEndpoints.push({callback: cb}); await this._subscribe(cb); } private async _subscribe(cb: (a: EventJson) => void) { this.numRequests++; if (this.websocket !== null) { - const subscription = await this.send('subscribe', {}, 60_000, false, (res) => { - if (res.data && res.data.events) { + const subscription = await this.send('subscribe', {}, 60_000, false, undefined, (res) => { + if (res.action === 'push') { for (const event of res.data.events) { cb(event); } @@ -418,7 +433,10 @@ export class Session { this.subscriptions.push(subscription); return subscription; } else { - this.send('subscribe', {}, 60_000).then((res) => { + const controller = new AbortController(); + this.subscriptions.push({callback: cb, requestNr: undefined, abortController: controller}); + this.send('subscribe', {}, 60_000, false, controller).then((res) => { + this.subscriptions = []; if (res.status === "success") { this._subscribe(cb); cb(res); @@ -428,11 +446,28 @@ export class Session { }, 1000); } }).catch(() => { + this.subscriptions = []; setTimeout(() => { this._subscribe(cb); }, 1000); }); - return null; // TODO subscription id + return null; + } + } + + async unsubscribeAll(keepEndpoints: boolean = false) { + for (const sub of this.subscriptions) { + await this.unsubscribe(sub.requestNr, sub.abortController); + } + this.subscriptions = []; + if (!keepEndpoints) this.subscriptionEndpoints = []; + } + + private async unsubscribe(requestNr: number | undefined = undefined, abortController: AbortController | undefined = undefined) { + if (this.websocket !== null) { + await this.send('unsubscribe', {'request_nr': requestNr}) + } else { + if (abortController) abortController.abort('unsubscribe'); } } }