From 53214d5aa49fe4a4c85c9a6ebc0941de476d96c5 Mon Sep 17 00:00:00 2001 From: crupest Date: Sun, 9 Aug 2020 17:40:46 +0800 Subject: Refactor subscription hub. --- Timeline/ClientApp/src/app/data/SubscriptionHub.ts | 106 +++++++++------------ 1 file changed, 46 insertions(+), 60 deletions(-) (limited to 'Timeline/ClientApp/src') diff --git a/Timeline/ClientApp/src/app/data/SubscriptionHub.ts b/Timeline/ClientApp/src/app/data/SubscriptionHub.ts index 24f1885d..7c24983b 100644 --- a/Timeline/ClientApp/src/app/data/SubscriptionHub.ts +++ b/Timeline/ClientApp/src/app/data/SubscriptionHub.ts @@ -3,14 +3,6 @@ import { Observable } from 'rxjs'; export type Subscriber = (data: TData) => void; -export class Subscription { - constructor(private _onUnsubscribe: () => void) {} - - unsubscribe(): void { - this._onUnsubscribe(); - } -} - export interface ISubscriptionLine { readonly value: undefined | TData; next(value: TData): void; @@ -25,15 +17,14 @@ export class SubscriptionLine implements ISubscriptionLine { private config?: { destroyable?: (value: TData | undefined) => boolean } ) {} - subscribe(subscriber: Subscriber): Subscription { + subscribe(subscriber: Subscriber): void { this._observers.push(subscriber); if (this._current !== undefined) { subscriber(this._current); } - return new Subscription(() => this.unsubscribe(subscriber)); } - private unsubscribe(subscriber: Subscriber): void { + unsubscribe(subscriber: Subscriber): void { if (!this._observers.includes(subscriber)) return; pull(this._observers, subscriber); } @@ -57,12 +48,7 @@ export class SubscriptionLine implements ISubscriptionLine { } } -export interface ISubscriptionHub { - subscribe(key: TKey, subscriber: Subscriber): Subscription; -} - -export class SubscriptionHub - implements ISubscriptionHub { +export class SubscriptionHub { private keyToString: (key: TKey) => string; private setup?: ( key: TKey, @@ -72,10 +58,7 @@ export class SubscriptionHub private readonly subscriptionLineMap = new Map< string, - { - line: SubscriptionLine; - destroyer: (() => void) | undefined; - } + SubscriptionLine >(); private cleanTimerId = 0; @@ -83,7 +66,7 @@ export class SubscriptionHub // setup is called after creating line and if it returns a function as destroyer, then when the line is destroyed the destroyer will be called. constructor(config?: { keyToString?: (key: TKey) => string; - setup?: (key: TKey, line: ISubscriptionLine) => (() => void) | void; + setup?: (key: TKey, line: ISubscriptionLine) => void; destroyable?: (key: TKey, value: TData | undefined) => boolean; }) { this.keyToString = @@ -102,9 +85,8 @@ export class SubscriptionHub private cleanLines(): void { const toDelete: string[] = []; - for (const [key, info] of this.subscriptionLineMap.entries()) { - if (info.line.destroyable) { - info.destroyer?.(); + for (const [key, line] of this.subscriptionLineMap.entries()) { + if (line.destroyable) { toDelete.push(key); } } @@ -121,54 +103,58 @@ export class SubscriptionHub } } - subscribe(key: TKey, subscriber: Subscriber): Subscription { + private createLine(key: TKey, useSetup = true): SubscriptionLine { const keyString = this.keyToString(key); - const line = (() => { - const info = this.subscriptionLineMap.get(keyString); - if (info == null) { - const { setup, destroyable } = this; - const newLine = new SubscriptionLine({ - destroyable: - destroyable != null - ? (value) => destroyable(key, value) - : undefined, - }); - this.subscriptionLineMap.set(keyString, { - line: newLine, - destroyer: undefined, - }); - const destroyer = setup?.(key, newLine); - if (this.subscriptionLineMap.size === 0) { - this.cleanTimerId = window.setInterval( - this.cleanLines.bind(this), - 20000 - ); - } - this.subscriptionLineMap.set(keyString, { - line: newLine, - destroyer: destroyer != null ? destroyer : undefined, - }); - return newLine; - } else { - return info.line; - } - })(); + const { setup, destroyable } = this; + const newLine = new SubscriptionLine({ + destroyable: + destroyable != null ? (value) => destroyable(key, value) : undefined, + }); + this.subscriptionLineMap.set(keyString, newLine); + if (useSetup) { + setup?.(key, newLine); + } + if (this.subscriptionLineMap.size === 1) { + this.cleanTimerId = window.setInterval(this.cleanLines.bind(this), 20000); + } + return newLine; + } + + subscribe(key: TKey, subscriber: Subscriber): void { + const keyString = this.keyToString(key); + const line = + this.subscriptionLineMap.get(keyString) ?? this.createLine(key); return line.subscribe(subscriber); } + unsubscribe(key: TKey, subscriber: Subscriber): void { + const keyString = this.keyToString(key); + const line = this.subscriptionLineMap.get(keyString); + return line?.unsubscribe(subscriber); + } + getObservable(key: TKey): Observable { return new Observable((observer) => { - const sub = this.subscribe(key, (data) => { + const f = (data: TData): void => { observer.next(data); - }); + }; + + this.subscribe(key, f); return () => { - sub.unsubscribe(); + this.unsubscribe(key, f); }; }); } getLine(key: TKey): ISubscriptionLine | null { const keyString = this.keyToString(key); - return this.subscriptionLineMap.get(keyString)?.line ?? null; + return this.subscriptionLineMap.get(keyString) ?? null; + } + + getLineOrCreateWithoutSetup(key: TKey): ISubscriptionLine { + const keyString = this.keyToString(key); + return ( + this.subscriptionLineMap.get(keyString) ?? this.createLine(key, false) + ); } } -- cgit v1.2.3