diff options
Diffstat (limited to 'Timeline/ClientApp/src/app/data/SubscriptionHub.ts')
-rw-r--r-- | Timeline/ClientApp/src/app/data/SubscriptionHub.ts | 117 |
1 files changed, 67 insertions, 50 deletions
diff --git a/Timeline/ClientApp/src/app/data/SubscriptionHub.ts b/Timeline/ClientApp/src/app/data/SubscriptionHub.ts index 87592da6..8a74c939 100644 --- a/Timeline/ClientApp/src/app/data/SubscriptionHub.ts +++ b/Timeline/ClientApp/src/app/data/SubscriptionHub.ts @@ -1,9 +1,3 @@ -// Remarks for SubscriptionHub:
-// 1. Compared with 'push' sematics in rxjs subject, we need 'pull'. In other words, no subscription, no updating.
-// 2. Make api easier to use and write less boilerplate codes.
-//
-// There might be some bugs, especially memory leaks and in asynchronization codes.
-
import { pull } from 'lodash';
export type Subscriber<TData> = (data: TData) => void;
@@ -16,18 +10,23 @@ export class Subscription { }
}
-export class NoValue {}
+export interface ISubscriptionLine<TData> {
+ readonly value: undefined | TData;
+ next(value: TData): void;
+}
-export class SubscriptionLine<TData> {
- private _current: TData | NoValue = new NoValue();
+export class SubscriptionLine<TData> implements ISubscriptionLine<TData> {
+ private _current: TData | undefined = undefined;
private _observers: Subscriber<TData>[] = [];
- constructor(private config?: { onZeroObserver?: () => void }) {}
+ constructor(
+ private config?: { destroyable?: (value: TData | undefined) => boolean }
+ ) {}
subscribe(subscriber: Subscriber<TData>): Subscription {
this._observers.push(subscriber);
- if (!(this._current instanceof NoValue)) {
+ if (this._current !== undefined) {
subscriber(this._current);
}
return new Subscription(() => this.unsubscribe(subscriber));
@@ -36,9 +35,10 @@ export class SubscriptionLine<TData> { private unsubscribe(subscriber: Subscriber<TData>): void {
if (!this._observers.includes(subscriber)) return;
pull(this._observers, subscriber);
- if (this._observers.length === 0) {
- this?.config?.onZeroObserver?.();
- }
+ }
+
+ get value(): TData | undefined {
+ return this._current;
}
next(value: TData): void {
@@ -46,10 +46,13 @@ export class SubscriptionLine<TData> { this._observers.forEach((observer) => observer(value));
}
- nextWithOld(updator: (old: TData | NoValue) => TData): void {
- const value = updator(this._current);
- this._current = value;
- this._observers.forEach((observer) => observer(value));
+ get destroyable(): boolean {
+ const customDestroyable = this.config?.destroyable;
+
+ return (
+ this._observers.length === 0 &&
+ (customDestroyable != null ? customDestroyable(this._current) : true)
+ );
}
}
@@ -62,23 +65,25 @@ export class SubscriptionHub<TKey, TData> private keyToString: (key: TKey) => string;
private setup?: (
key: TKey,
- next: (value: TData) => void,
- line: SubscriptionLine<TData>
+ line: ISubscriptionLine<TData>
) => (() => void) | void;
+ private destroyable?: (key: TKey, value: TData | undefined) => boolean;
private readonly subscriptionLineMap = new Map<
string,
{
line: SubscriptionLine<TData>;
destroyer: (() => void) | undefined;
- destroyTimer?: number; // Cancel it when resubscribe.
}
>();
+ private cleanTimerId = 0;
+
// setup is called after creating line and if it returns a function as destroyer, then when the line is destroyed the destroyer will be called.
constructor(config?: {
keyToString?: (key: TKey) => string;
- setup?: (key: TKey, next: (value: TData) => void) => (() => void) | void;
+ setup?: (key: TKey, line: ISubscriptionLine<TData>) => (() => void) | void;
+ destroyable?: (key: TKey, value: TData | undefined) => boolean;
}) {
this.keyToString =
config?.keyToString ??
@@ -91,6 +96,28 @@ export class SubscriptionHub<TKey, TData> });
this.setup = config?.setup;
+ this.destroyable = config?.destroyable;
+ }
+
+ private cleanLines(): void {
+ const toDelete: string[] = [];
+ for (const [key, info] of this.subscriptionLineMap.entries()) {
+ if (info.line.destroyable) {
+ info.destroyer?.();
+ toDelete.push(key);
+ }
+ }
+
+ if (toDelete.length === 0) return;
+
+ for (const key of toDelete) {
+ this.subscriptionLineMap.delete(key);
+ }
+
+ if (this.subscriptionLineMap.size === 0) {
+ window.clearInterval(this.cleanTimerId);
+ this.cleanTimerId = 0;
+ }
}
subscribe(key: TKey, subscriber: Subscriber<TData>): Subscription {
@@ -98,48 +125,38 @@ export class SubscriptionHub<TKey, TData> const line = (() => {
const info = this.subscriptionLineMap.get(keyString);
if (info == null) {
- const { setup } = this;
+ const { setup, destroyable } = this;
const newLine = new SubscriptionLine<TData>({
- onZeroObserver: () => {
- const i = this.subscriptionLineMap.get(keyString);
- if (i != null) {
- i.destroyTimer = window.setTimeout(() => {
- i.destroyer?.();
- this.subscriptionLineMap.delete(keyString);
- }, 10000);
- }
- },
+ destroyable:
+ destroyable != null
+ ? (value) => destroyable(key, value)
+ : undefined,
});
- const destroyer = setup?.(key, newLine.next.bind(newLine), newLine);
+ this.subscriptionLineMap.set(keyString, {
+ line: newLine,
+ destroyer: undefined,
+ });
+ const destroyer = setup?.(key, newLine);
+ if (this.subscriptionLineMap.size === 0) {
+ this.cleanTimerId = window.setInterval(
+ this.cleanLines.bind(this),
+ 20000
+ );
+ }
this.subscriptionLineMap.set(keyString, {
line: newLine,
destroyer: destroyer != null ? destroyer : undefined,
});
return newLine;
} else {
- if (info.destroyTimer != null) {
- window.clearTimeout(info.destroyTimer);
- info.destroyTimer = undefined;
- }
return info.line;
}
})();
return line.subscribe(subscriber);
}
- update(key: TKey, value: TData): void {
+ getLine(key: TKey): ISubscriptionLine<TData> | null {
const keyString = this.keyToString(key);
- const info = this.subscriptionLineMap.get(keyString);
- if (info != null) {
- info.line.next(value);
- }
- }
-
- updateWithOld(key: TKey, updator: (old: TData | NoValue) => TData): void {
- const keyString = this.keyToString(key);
- const info = this.subscriptionLineMap.get(keyString);
- if (info != null) {
- info.line.nextWithOld(updator);
- }
+ return this.subscriptionLineMap.get(keyString)?.line ?? null;
}
}
|