From fd770ae690c337941231b79b1ee22513b72d6335 Mon Sep 17 00:00:00 2001 From: crupest Date: Sun, 9 Aug 2020 22:22:46 +0800 Subject: Refactor api of data hub. --- Timeline/ClientApp/src/app/data/DataHub.ts | 100 +++++++++++++++++------------ 1 file changed, 58 insertions(+), 42 deletions(-) (limited to 'Timeline/ClientApp/src') diff --git a/Timeline/ClientApp/src/app/data/DataHub.ts b/Timeline/ClientApp/src/app/data/DataHub.ts index 982aacba..fd6b65d7 100644 --- a/Timeline/ClientApp/src/app/data/DataHub.ts +++ b/Timeline/ClientApp/src/app/data/DataHub.ts @@ -1,21 +1,15 @@ import { pull } from 'lodash'; -import { Observable } from 'rxjs'; +import { Observable, BehaviorSubject, combineLatest } from 'rxjs'; +import { map } from 'rxjs/operators'; export type Subscriber = (data: TData) => void; -export interface IDataLine { - readonly value: undefined | TData; - next(value: TData): void; - readonly isSyncing: boolean; - beginSync(): void; - endSync(): void; - endSyncAndNext(value: TData): void; -} +export type WithSyncStatus = T & { syncing: boolean }; -export class DataLine implements IDataLine { +export class DataLine { private _current: TData | undefined = undefined; - private _syncing = false; + private _syncingSubject = new BehaviorSubject(false); private _observers: Subscriber[] = []; @@ -23,18 +17,47 @@ export class DataLine implements IDataLine { private config?: { destroyable?: (value: TData | undefined) => boolean } ) {} - subscribe(subscriber: Subscriber): void { + private subscribe(subscriber: Subscriber): void { this._observers.push(subscriber); if (this._current !== undefined) { subscriber(this._current); } } - unsubscribe(subscriber: Subscriber): void { + private unsubscribe(subscriber: Subscriber): void { if (!this._observers.includes(subscriber)) return; pull(this._observers, subscriber); } + getObservable(): Observable { + return new Observable((observer) => { + const f = (data: TData): void => { + observer.next(data); + }; + this.subscribe(f); + + return () => { + this.unsubscribe(f); + }; + }); + } + + getSyncStatusObservable(): Observable { + return this._syncingSubject.asObservable(); + } + + getDataWithSyncStatusObservable(): Observable> { + return combineLatest([ + this.getObservable(), + this.getSyncStatusObservable(), + ]).pipe( + map(([data, syncing]) => ({ + ...data, + syncing, + })) + ); + } + get value(): TData | undefined { return this._current; } @@ -45,18 +68,18 @@ export class DataLine implements IDataLine { } get isSyncing(): boolean { - return this._syncing; + return this._syncingSubject.value; } beginSync(): void { - if (!this._syncing) { - this._syncing = true; + if (!this._syncingSubject.value) { + this._syncingSubject.next(true); } } endSync(): void { - if (this._syncing) { - this._syncing = false; + if (this._syncingSubject.value) { + this._syncingSubject.next(false); } } @@ -77,7 +100,7 @@ export class DataLine implements IDataLine { export class DataHub { private keyToString: (key: TKey) => string; - private setup?: (key: TKey, line: IDataLine) => (() => void) | void; + private setup?: (key: TKey, line: DataLine) => (() => void) | void; private destroyable?: (key: TKey, value: TData | undefined) => boolean; private readonly subscriptionLineMap = new Map>(); @@ -87,7 +110,7 @@ export class DataHub { // setup is called after creating line and if it returns a function as destroyer, then when the line is destroyed the destroyer will be called. constructor(config?: { keyToString?: (key: TKey) => string; - setup?: (key: TKey, line: IDataLine) => void; + setup?: (key: TKey, line: DataLine) => void; destroyable?: (key: TKey, value: TData | undefined) => boolean; }) { this.keyToString = @@ -141,38 +164,31 @@ export class DataHub { return newLine; } - subscribe(key: TKey, subscriber: Subscriber): void { - const keyString = this.keyToString(key); - const line = - this.subscriptionLineMap.get(keyString) ?? this.createLine(key); - return line.subscribe(subscriber); + getObservable(key: TKey): Observable { + return this.getLineOrCreateWithSetup(key).getObservable(); } - unsubscribe(key: TKey, subscriber: Subscriber): void { - const keyString = this.keyToString(key); - const line = this.subscriptionLineMap.get(keyString); - return line?.unsubscribe(subscriber); + getSyncStatusObservable(key: TKey): Observable { + return this.getLineOrCreateWithSetup(key).getSyncStatusObservable(); } - getObservable(key: TKey): Observable { - return new Observable((observer) => { - const f = (data: TData): void => { - observer.next(data); - }; - - this.subscribe(key, f); - return () => { - this.unsubscribe(key, f); - }; - }); + getDataWithSyncStatusObservable( + key: TKey + ): Observable> { + return this.getLineOrCreateWithSetup(key).getDataWithSyncStatusObservable(); } - getLine(key: TKey): IDataLine | null { + getLine(key: TKey): DataLine | null { const keyString = this.keyToString(key); return this.subscriptionLineMap.get(keyString) ?? null; } - getLineOrCreateWithoutSetup(key: TKey): IDataLine { + getLineOrCreateWithSetup(key: TKey): DataLine { + const keyString = this.keyToString(key); + return this.subscriptionLineMap.get(keyString) ?? this.createLine(key); + } + + getLineOrCreateWithoutSetup(key: TKey): DataLine { const keyString = this.keyToString(key); return ( this.subscriptionLineMap.get(keyString) ?? this.createLine(key, false) -- cgit v1.2.3