diff options
| author | crupest <crupest@outlook.com> | 2020-10-27 19:21:35 +0800 | 
|---|---|---|
| committer | crupest <crupest@outlook.com> | 2020-10-27 19:21:35 +0800 | 
| commit | 05ccb4d8f1bbe3fb64e117136b4a89bcfb0b0b33 (patch) | |
| tree | 929e514de85eb82a5acb96ecffc6e6d2d95f878f /FrontEnd/src/app/services/DataHub.ts | |
| parent | 986c6f2e3b858d6332eba0b42acc6861cd4d0227 (diff) | |
| download | timeline-05ccb4d8f1bbe3fb64e117136b4a89bcfb0b0b33.tar.gz timeline-05ccb4d8f1bbe3fb64e117136b4a89bcfb0b0b33.tar.bz2 timeline-05ccb4d8f1bbe3fb64e117136b4a89bcfb0b0b33.zip  | |
Split front and back end.
Diffstat (limited to 'FrontEnd/src/app/services/DataHub.ts')
| -rw-r--r-- | FrontEnd/src/app/services/DataHub.ts | 225 | 
1 files changed, 225 insertions, 0 deletions
diff --git a/FrontEnd/src/app/services/DataHub.ts b/FrontEnd/src/app/services/DataHub.ts new file mode 100644 index 00000000..93a9b41f --- /dev/null +++ b/FrontEnd/src/app/services/DataHub.ts @@ -0,0 +1,225 @@ +import { pull } from "lodash"; +import { Observable, BehaviorSubject, combineLatest } from "rxjs"; +import { map } from "rxjs/operators"; + +export type Subscriber<TData> = (data: TData) => void; + +export type WithSyncStatus<T> = T & { syncing: boolean }; + +export class DataLine<TData> { +  private _current: TData | undefined = undefined; + +  private _syncPromise: Promise<void> | null = null; +  private _syncingSubject = new BehaviorSubject<boolean>(false); + +  private _observers: Subscriber<TData>[] = []; + +  constructor( +    private config: { +      sync: () => Promise<void>; +      destroyable?: (value: TData | undefined) => boolean; +      disableInitSync?: boolean; +    } +  ) { +    if (config.disableInitSync !== true) { +      setImmediate(() => void this.sync()); +    } +  } + +  private subscribe(subscriber: Subscriber<TData>): void { +    this._observers.push(subscriber); +    if (this._current !== undefined) { +      subscriber(this._current); +    } +  } + +  private unsubscribe(subscriber: Subscriber<TData>): void { +    if (!this._observers.includes(subscriber)) return; +    pull(this._observers, subscriber); +  } + +  getObservable(): Observable<TData> { +    return new Observable<TData>((observer) => { +      const f = (data: TData): void => { +        observer.next(data); +      }; +      this.subscribe(f); + +      return () => { +        this.unsubscribe(f); +      }; +    }); +  } + +  getSyncStatusObservable(): Observable<boolean> { +    return this._syncingSubject.asObservable(); +  } + +  getDataWithSyncStatusObservable(): Observable<WithSyncStatus<TData>> { +    return combineLatest([ +      this.getObservable(), +      this.getSyncStatusObservable(), +    ]).pipe( +      map(([data, syncing]) => ({ +        ...data, +        syncing, +      })) +    ); +  } + +  get value(): TData | undefined { +    return this._current; +  } + +  next(value: TData): void { +    this._current = value; +    this._observers.forEach((observer) => observer(value)); +  } + +  get isSyncing(): boolean { +    return this._syncPromise != null; +  } + +  sync(): Promise<void> { +    if (this._syncPromise == null) { +      this._syncingSubject.next(true); +      this._syncPromise = this.config.sync().then(() => { +        this._syncingSubject.next(false); +        this._syncPromise = null; +      }); +    } + +    return this._syncPromise; +  } + +  syncWithAction( +    syncAction: (line: DataLine<TData>) => Promise<void> +  ): Promise<void> { +    if (this._syncPromise == null) { +      this._syncingSubject.next(true); +      this._syncPromise = syncAction(this).then(() => { +        this._syncingSubject.next(false); +        this._syncPromise = null; +      }); +    } + +    return this._syncPromise; +  } + +  get destroyable(): boolean { +    const customDestroyable = this.config?.destroyable; + +    return ( +      this._observers.length === 0 && +      !this.isSyncing && +      (customDestroyable != null ? customDestroyable(this._current) : true) +    ); +  } +} + +export class DataHub<TKey, TData> { +  private sync: (key: TKey, line: DataLine<TData>) => Promise<void>; +  private keyToString: (key: TKey) => string; +  private destroyable?: (key: TKey, value: TData | undefined) => boolean; + +  private readonly subscriptionLineMap = new Map<string, DataLine<TData>>(); + +  private cleanTimerId = 0; + +  // setup is called after creating line and if it returns a function as destroyer, then when the line is destroyed the destroyer will be called. +  constructor(config: { +    sync: (key: TKey, line: DataLine<TData>) => Promise<void>; +    keyToString?: (key: TKey) => string; +    destroyable?: (key: TKey, value: TData | undefined) => boolean; +  }) { +    this.sync = config.sync; +    this.keyToString = +      config.keyToString ?? +      ((value): string => { +        if (typeof value === "string") return value; +        else +          throw new Error( +            "Default keyToString function only pass string value." +          ); +      }); + +    this.destroyable = config.destroyable; +  } + +  private cleanLines(): void { +    const toDelete: string[] = []; +    for (const [key, line] of this.subscriptionLineMap.entries()) { +      if (line.destroyable) { +        toDelete.push(key); +      } +    } + +    if (toDelete.length === 0) return; + +    for (const key of toDelete) { +      this.subscriptionLineMap.delete(key); +    } + +    if (this.subscriptionLineMap.size === 0) { +      window.clearInterval(this.cleanTimerId); +      this.cleanTimerId = 0; +    } +  } + +  private createLine(key: TKey, disableInitSync = false): DataLine<TData> { +    const keyString = this.keyToString(key); +    const { destroyable } = this; +    const newLine: DataLine<TData> = new DataLine<TData>({ +      sync: () => this.sync(key, newLine), +      destroyable: +        destroyable != null ? (value) => destroyable(key, value) : undefined, +      disableInitSync: disableInitSync, +    }); +    this.subscriptionLineMap.set(keyString, newLine); +    if (this.subscriptionLineMap.size === 1) { +      this.cleanTimerId = window.setInterval(this.cleanLines.bind(this), 20000); +    } +    return newLine; +  } + +  getObservable(key: TKey): Observable<TData> { +    return this.getLineOrCreate(key).getObservable(); +  } + +  getSyncStatusObservable(key: TKey): Observable<boolean> { +    return this.getLineOrCreate(key).getSyncStatusObservable(); +  } + +  getDataWithSyncStatusObservable( +    key: TKey +  ): Observable<WithSyncStatus<TData>> { +    return this.getLineOrCreate(key).getDataWithSyncStatusObservable(); +  } + +  getLine(key: TKey): DataLine<TData> | null { +    const keyString = this.keyToString(key); +    return this.subscriptionLineMap.get(keyString) ?? null; +  } + +  getLineOrCreate(key: TKey): DataLine<TData> { +    const keyString = this.keyToString(key); +    return this.subscriptionLineMap.get(keyString) ?? this.createLine(key); +  } + +  getLineOrCreateWithoutInitSync(key: TKey): DataLine<TData> { +    const keyString = this.keyToString(key); +    return ( +      this.subscriptionLineMap.get(keyString) ?? this.createLine(key, true) +    ); +  } + +  optionalInitLineWithSyncAction( +    key: TKey, +    syncAction: (line: DataLine<TData>) => Promise<void> +  ): Promise<void> { +    const optionalLine = this.getLine(key); +    if (optionalLine != null) return Promise.resolve(); +    const line = this.createLine(key, true); +    return line.syncWithAction(syncAction); +  } +}  | 
