aboutsummaryrefslogtreecommitdiff
path: root/Timeline/ClientApp/src/app/data/SubscriptionHub.ts
blob: 2bc6de56c785b637b36ff5d7223d777c3af8745c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// Remarks for SubscriptionHub:
// 1. Compared with 'push' sematics in rxjs subject, we need 'pull'. In other words, no subscription, no updating.
// 2. We need a way to finalize the last object. For example, if it has an object url, we need to revoke it.
// 3. Make api easier to use and write less boilerplate codes.
//
// There might be some bugs, especially memory leaks and in asynchronization codes.

import * as rxjs from 'rxjs';
import { filter } from 'rxjs/operators';

export type Subscriber<TData> = (data: TData) => void;

export class Subscription {
  constructor(private _onUnsubscribe: () => void) {}

  unsubscribe(): void {
    this._onUnsubscribe();
  }
}

class SubscriptionToken {
  constructor(public _subscription: rxjs.Subscription) {}
}

class SubscriptionLine<TData> {
  private _lastDataPromise: Promise<void>;
  private _dataSubject = new rxjs.BehaviorSubject<TData | undefined>(undefined);
  private _data$: rxjs.Observable<TData> = this._dataSubject.pipe(
    filter((d) => d !== undefined)
  ) as rxjs.Observable<TData>;
  private _refCount = 0;

  constructor(
    _creator: () => Promise<TData>,
    private _destroyer: (data: TData) => void,
    private _onZeroRef: (self: SubscriptionLine<TData>) => void
  ) {
    this._lastDataPromise = _creator().then((data) => {
      this._dataSubject.next(data);
    });
  }

  subscribe(subscriber: Subscriber<TData>): SubscriptionToken {
    const subscription = this._data$.subscribe(subscriber);
    this._refCount += 1;
    return new SubscriptionToken(subscription);
  }

  unsubscribe(token: SubscriptionToken): void {
    token._subscription.unsubscribe();
    this._refCount -= 1;
    if (this._refCount === 0) {
      void this._lastDataPromise.then(() => {
        const last = this._dataSubject.value;
        if (last !== undefined) {
          this._destroyer(last);
        }
      });
      this._onZeroRef(this);
    }
  }

  next(updator: () => Promise<TData>): void {
    this._lastDataPromise = this._lastDataPromise
      .then(() => updator())
      .then((data) => {
        const last = this._dataSubject.value;
        if (last !== undefined) {
          this._destroyer(last);
        }
        this._dataSubject.next(data);
      });
  }
}

export interface ISubscriptionHub<TKey, TData> {
  subscribe(key: TKey, subscriber: Subscriber<TData>): Subscription;
}

export class SubscriptionHub<TKey, TData>
  implements ISubscriptionHub<TKey, TData> {
  constructor(
    public keyToString: (key: TKey) => string,
    public creator: (key: TKey) => Promise<TData>,
    public destroyer: (key: TKey, data: TData) => void
  ) {}

  private subscriptionLineMap = new Map<string, SubscriptionLine<TData>>();

  subscribe(key: TKey, subscriber: Subscriber<TData>): Subscription {
    const keyString = this.keyToString(key);
    const line = (() => {
      const savedLine = this.subscriptionLineMap.get(keyString);
      if (savedLine == null) {
        const newLine = new SubscriptionLine<TData>(
          () => this.creator(key),
          (data) => {
            this.destroyer(key, data);
          },
          () => {
            this.subscriptionLineMap.delete(keyString);
          }
        );
        this.subscriptionLineMap.set(keyString, newLine);
        return newLine;
      } else {
        return savedLine;
      }
    })();
    const token = line.subscribe(subscriber);
    return new Subscription(() => {
      line.unsubscribe(token);
    });
  }

  // Old data is destroyed automatically.
  // updator is called only if there is subscription.
  update(key: TKey, updator: (key: TKey) => Promise<TData>): void {
    const keyString = this.keyToString(key);
    const line = this.subscriptionLineMap.get(keyString);
    if (line != null) {
      line.next(() => updator(key));
    }
  }
}