aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcrupest <crupest@outlook.com>2020-08-09 17:40:46 +0800
committercrupest <crupest@outlook.com>2020-08-09 17:40:46 +0800
commit53214d5aa49fe4a4c85c9a6ebc0941de476d96c5 (patch)
treef8f84ddc41ae0b604b1334353c3fdc6e8a6db31b
parent282c28754a7225fb7f53aa7dfb7598fd54d2db88 (diff)
downloadtimeline-53214d5aa49fe4a4c85c9a6ebc0941de476d96c5.tar.gz
timeline-53214d5aa49fe4a4c85c9a6ebc0941de476d96c5.tar.bz2
timeline-53214d5aa49fe4a4c85c9a6ebc0941de476d96c5.zip
Refactor subscription hub.
-rw-r--r--Timeline/ClientApp/src/app/data/SubscriptionHub.ts106
1 files changed, 46 insertions, 60 deletions
diff --git a/Timeline/ClientApp/src/app/data/SubscriptionHub.ts b/Timeline/ClientApp/src/app/data/SubscriptionHub.ts
index 24f1885d..7c24983b 100644
--- a/Timeline/ClientApp/src/app/data/SubscriptionHub.ts
+++ b/Timeline/ClientApp/src/app/data/SubscriptionHub.ts
@@ -3,14 +3,6 @@ import { Observable } from 'rxjs';
export type Subscriber<TData> = (data: TData) => void;
-export class Subscription {
- constructor(private _onUnsubscribe: () => void) {}
-
- unsubscribe(): void {
- this._onUnsubscribe();
- }
-}
-
export interface ISubscriptionLine<TData> {
readonly value: undefined | TData;
next(value: TData): void;
@@ -25,15 +17,14 @@ export class SubscriptionLine<TData> implements ISubscriptionLine<TData> {
private config?: { destroyable?: (value: TData | undefined) => boolean }
) {}
- subscribe(subscriber: Subscriber<TData>): Subscription {
+ subscribe(subscriber: Subscriber<TData>): void {
this._observers.push(subscriber);
if (this._current !== undefined) {
subscriber(this._current);
}
- return new Subscription(() => this.unsubscribe(subscriber));
}
- private unsubscribe(subscriber: Subscriber<TData>): void {
+ unsubscribe(subscriber: Subscriber<TData>): void {
if (!this._observers.includes(subscriber)) return;
pull(this._observers, subscriber);
}
@@ -57,12 +48,7 @@ export class SubscriptionLine<TData> implements ISubscriptionLine<TData> {
}
}
-export interface ISubscriptionHub<TKey, TData> {
- subscribe(key: TKey, subscriber: Subscriber<TData>): Subscription;
-}
-
-export class SubscriptionHub<TKey, TData>
- implements ISubscriptionHub<TKey, TData> {
+export class SubscriptionHub<TKey, TData> {
private keyToString: (key: TKey) => string;
private setup?: (
key: TKey,
@@ -72,10 +58,7 @@ export class SubscriptionHub<TKey, TData>
private readonly subscriptionLineMap = new Map<
string,
- {
- line: SubscriptionLine<TData>;
- destroyer: (() => void) | undefined;
- }
+ SubscriptionLine<TData>
>();
private cleanTimerId = 0;
@@ -83,7 +66,7 @@ export class SubscriptionHub<TKey, TData>
// setup is called after creating line and if it returns a function as destroyer, then when the line is destroyed the destroyer will be called.
constructor(config?: {
keyToString?: (key: TKey) => string;
- setup?: (key: TKey, line: ISubscriptionLine<TData>) => (() => void) | void;
+ setup?: (key: TKey, line: ISubscriptionLine<TData>) => void;
destroyable?: (key: TKey, value: TData | undefined) => boolean;
}) {
this.keyToString =
@@ -102,9 +85,8 @@ export class SubscriptionHub<TKey, TData>
private cleanLines(): void {
const toDelete: string[] = [];
- for (const [key, info] of this.subscriptionLineMap.entries()) {
- if (info.line.destroyable) {
- info.destroyer?.();
+ for (const [key, line] of this.subscriptionLineMap.entries()) {
+ if (line.destroyable) {
toDelete.push(key);
}
}
@@ -121,54 +103,58 @@ export class SubscriptionHub<TKey, TData>
}
}
- subscribe(key: TKey, subscriber: Subscriber<TData>): Subscription {
+ private createLine(key: TKey, useSetup = true): SubscriptionLine<TData> {
const keyString = this.keyToString(key);
- const line = (() => {
- const info = this.subscriptionLineMap.get(keyString);
- if (info == null) {
- const { setup, destroyable } = this;
- const newLine = new SubscriptionLine<TData>({
- destroyable:
- destroyable != null
- ? (value) => destroyable(key, value)
- : undefined,
- });
- this.subscriptionLineMap.set(keyString, {
- line: newLine,
- destroyer: undefined,
- });
- const destroyer = setup?.(key, newLine);
- if (this.subscriptionLineMap.size === 0) {
- this.cleanTimerId = window.setInterval(
- this.cleanLines.bind(this),
- 20000
- );
- }
- this.subscriptionLineMap.set(keyString, {
- line: newLine,
- destroyer: destroyer != null ? destroyer : undefined,
- });
- return newLine;
- } else {
- return info.line;
- }
- })();
+ const { setup, destroyable } = this;
+ const newLine = new SubscriptionLine<TData>({
+ destroyable:
+ destroyable != null ? (value) => destroyable(key, value) : undefined,
+ });
+ this.subscriptionLineMap.set(keyString, newLine);
+ if (useSetup) {
+ setup?.(key, newLine);
+ }
+ if (this.subscriptionLineMap.size === 1) {
+ this.cleanTimerId = window.setInterval(this.cleanLines.bind(this), 20000);
+ }
+ return newLine;
+ }
+
+ subscribe(key: TKey, subscriber: Subscriber<TData>): void {
+ const keyString = this.keyToString(key);
+ const line =
+ this.subscriptionLineMap.get(keyString) ?? this.createLine(key);
return line.subscribe(subscriber);
}
+ unsubscribe(key: TKey, subscriber: Subscriber<TData>): void {
+ const keyString = this.keyToString(key);
+ const line = this.subscriptionLineMap.get(keyString);
+ return line?.unsubscribe(subscriber);
+ }
+
getObservable(key: TKey): Observable<TData> {
return new Observable((observer) => {
- const sub = this.subscribe(key, (data) => {
+ const f = (data: TData): void => {
observer.next(data);
- });
+ };
+
+ this.subscribe(key, f);
return () => {
- sub.unsubscribe();
+ this.unsubscribe(key, f);
};
});
}
getLine(key: TKey): ISubscriptionLine<TData> | null {
const keyString = this.keyToString(key);
- return this.subscriptionLineMap.get(keyString)?.line ?? null;
+ return this.subscriptionLineMap.get(keyString) ?? null;
+ }
+
+ getLineOrCreateWithoutSetup(key: TKey): ISubscriptionLine<TData> {
+ const keyString = this.keyToString(key);
+ return (
+ this.subscriptionLineMap.get(keyString) ?? this.createLine(key, false)
+ );
}
}