aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcrupest <crupest@outlook.com>2020-08-03 17:26:40 +0800
committercrupest <crupest@outlook.com>2020-08-03 17:26:40 +0800
commitff94145165d6877e47db7ec11c4d3b802f6f3532 (patch)
tree1417e08efb90cd808ae59b0cc671c517db617b93
parent66417650b46f27f8238cc6997e2ff14579c1244a (diff)
downloadtimeline-ff94145165d6877e47db7ec11c4d3b802f6f3532.tar.gz
timeline-ff94145165d6877e47db7ec11c4d3b802f6f3532.tar.bz2
timeline-ff94145165d6877e47db7ec11c4d3b802f6f3532.zip
...
-rw-r--r--Timeline/ClientApp/src/app/data/SubscriptionHub.ts168
-rw-r--r--Timeline/ClientApp/src/app/data/timeline.ts133
2 files changed, 144 insertions, 157 deletions
diff --git a/Timeline/ClientApp/src/app/data/SubscriptionHub.ts b/Timeline/ClientApp/src/app/data/SubscriptionHub.ts
index 406d293f..c127c31a 100644
--- a/Timeline/ClientApp/src/app/data/SubscriptionHub.ts
+++ b/Timeline/ClientApp/src/app/data/SubscriptionHub.ts
@@ -1,13 +1,10 @@
// Remarks for SubscriptionHub:
// 1. Compared with 'push' sematics in rxjs subject, we need 'pull'. In other words, no subscription, no updating.
-// 2. We need a way to finalize the last object. For example, if it has an object url, we need to revoke it.
-// 3. Make api easier to use and write less boilerplate codes.
-//
-// Currently updator will wait for last update or creation to finish. So the old data passed to it will always be right. We may add feature for just cancel last one but not wait for it.
+// 2. Make api easier to use and write less boilerplate codes.
//
// There might be some bugs, especially memory leaks and in asynchronization codes.
-import * as rxjs from 'rxjs';
+import { pull } from 'lodash';
export type Subscriber<TData> = (data: TData) => void;
@@ -19,60 +16,34 @@ export class Subscription {
}
}
-class SubscriptionToken {
- constructor(public _subscription: rxjs.Subscription) {}
-}
+class NoValue {}
+
+export class SubscriptionLine<TData> {
+ private _current: TData | NoValue = new NoValue();
+
+ private _observers: Subscriber<TData>[] = [];
+
+ constructor(private config?: { onZeroObserver?: () => void }) {}
-class SubscriptionLine<TData> {
- private _lastDataPromise: Promise<TData>;
- private _dataSubject: rxjs.BehaviorSubject<TData>;
- private _refCount = 0;
-
- constructor(
- defaultValueProvider: () => TData,
- setup: ((old: TData) => Promise<TData>) | undefined,
- private _destroyer: ((data: TData) => void) | undefined,
- private _onZeroRef: (self: SubscriptionLine<TData>) => void
- ) {
- const initValue = defaultValueProvider();
- this._lastDataPromise = Promise.resolve(initValue);
- this._dataSubject = new rxjs.BehaviorSubject<TData>(initValue);
- if (setup != null) {
- this.next(setup);
+ subscribe(subscriber: Subscriber<TData>): Subscription {
+ this._observers.push(subscriber);
+ if (!(this._current instanceof NoValue)) {
+ subscriber(this._current);
}
- }
- subscribe(subscriber: Subscriber<TData>): SubscriptionToken {
- const subscription = this._dataSubject.subscribe(subscriber);
- this._refCount += 1;
- return new SubscriptionToken(subscription);
+ return new Subscription(() => this.unsubscribe(subscriber));
}
- unsubscribe(token: SubscriptionToken): void {
- token._subscription.unsubscribe();
- this._refCount -= 1;
- if (this._refCount === 0) {
- const { _destroyer: destroyer } = this;
- if (destroyer != null) {
- void this._lastDataPromise.then((data) => {
- destroyer(data);
- });
- }
- this._onZeroRef(this);
+ private unsubscribe(subscriber: Subscriber<TData>): void {
+ if (!this._observers.includes(subscriber)) return;
+ pull(this._observers, subscriber);
+ if (this._observers.length === 0) {
+ this?.config?.onZeroObserver?.();
}
}
- next(updator: (old: TData) => Promise<TData>): void {
- this._lastDataPromise = this._lastDataPromise
- .then((old) => updator(old))
- .then((data) => {
- const last = this._dataSubject.value;
- if (this._destroyer != null) {
- this._destroyer(last);
- }
- this._dataSubject.next(data);
- return data;
- });
+ next(value: TData): void {
+ this._observers.forEach((observer) => observer(value));
}
}
@@ -82,53 +53,78 @@ export interface ISubscriptionHub<TKey, TData> {
export class SubscriptionHub<TKey, TData>
implements ISubscriptionHub<TKey, TData> {
- // If setup is set, update is called with setup immediately after setting default value.
- constructor(
- public keyToString: (key: TKey) => string,
- public defaultValueProvider: (key: TKey) => TData,
- public setup?: (key: TKey) => Promise<TData>,
- public destroyer?: (key: TKey, data: TData) => void
- ) {}
+ private keyToString: (key: TKey) => string;
+ private setup?: (
+ key: TKey,
+ next: (value: TData) => void
+ ) => (() => void) | void;
- private subscriptionLineMap = new Map<string, SubscriptionLine<TData>>();
+ private readonly subscriptionLineMap = new Map<
+ string,
+ {
+ line: SubscriptionLine<TData>;
+ destroyer: (() => void) | undefined;
+ destroyTimer?: number; // Cancel it when resubscribe.
+ }
+ >();
+
+ // setup is called after creating line and if it returns a function as destroyer, then when the line is destroyed the destroyer will be called.
+ constructor(config?: {
+ keyToString?: (key: TKey) => string;
+ setup?: (key: TKey, next: (value: TData) => void) => (() => void) | void;
+ }) {
+ this.keyToString =
+ config?.keyToString ??
+ ((value): string => {
+ if (typeof value === 'string') return value;
+ else
+ throw new Error(
+ 'Default keyToString function only pass string value.'
+ );
+ });
+
+ this.setup = config?.setup;
+ }
subscribe(key: TKey, subscriber: Subscriber<TData>): Subscription {
const keyString = this.keyToString(key);
const line = (() => {
- const savedLine = this.subscriptionLineMap.get(keyString);
- if (savedLine == null) {
- const { setup, destroyer } = this;
- const newLine = new SubscriptionLine<TData>(
- () => this.defaultValueProvider(key),
- setup != null ? () => setup(key) : undefined,
- destroyer != null
- ? (data) => {
- destroyer(key, data);
- }
- : undefined,
- () => {
- this.subscriptionLineMap.delete(keyString);
- }
- );
- this.subscriptionLineMap.set(keyString, newLine);
+ const info = this.subscriptionLineMap.get(keyString);
+ if (info == null) {
+ const { setup } = this;
+ const newLine = new SubscriptionLine<TData>({
+ onZeroObserver: () => {
+ const i = this.subscriptionLineMap.get(keyString);
+ if (i != null) {
+ i.destroyTimer = window.setTimeout(() => {
+ i.destroyer?.();
+ this.subscriptionLineMap.delete(keyString);
+ }, 10000);
+ }
+ },
+ });
+ const destroyer = setup?.(key, newLine.next.bind(newLine));
+ this.subscriptionLineMap.set(keyString, {
+ line: newLine,
+ destroyer: destroyer != null ? destroyer : undefined,
+ });
return newLine;
} else {
- return savedLine;
+ if (info.destroyTimer != null) {
+ window.clearTimeout(info.destroyTimer);
+ info.destroyTimer = undefined;
+ }
+ return info.line;
}
})();
- const token = line.subscribe(subscriber);
- return new Subscription(() => {
- line.unsubscribe(token);
- });
+ return line.subscribe(subscriber);
}
- // Old data is destroyed automatically.
- // updator is called only if there is subscription.
- update(key: TKey, updator: (key: TKey, old: TData) => Promise<TData>): void {
+ update(key: TKey, value: TData): void {
const keyString = this.keyToString(key);
- const line = this.subscriptionLineMap.get(keyString);
- if (line != null) {
- line.next((old) => updator(key, old));
+ const info = this.subscriptionLineMap.get(keyString);
+ if (info != null) {
+ info.line.next(value);
}
}
}
diff --git a/Timeline/ClientApp/src/app/data/timeline.ts b/Timeline/ClientApp/src/app/data/timeline.ts
index b30f3a7d..8680e9b8 100644
--- a/Timeline/ClientApp/src/app/data/timeline.ts
+++ b/Timeline/ClientApp/src/app/data/timeline.ts
@@ -32,7 +32,7 @@ import {
HttpTimelineNameConflictError,
HttpTimelineGenericPostInfo,
} from '../http/timeline';
-import { BlobWithEtag, NotModified } from '../http/common';
+import { BlobWithEtag, NotModified, HttpNetworkError } from '../http/common';
export type TimelineInfo = HttpTimelineInfo;
export type TimelineChangePropertyRequest = HttpTimelinePatchRequest;
@@ -76,23 +76,17 @@ export interface TimelinePostListState {
posts: TimelinePostInfo[];
}
-export interface TimelineInfoLoadingState {
- state: 'loading'; // Loading from cache.
- timeline: null;
-}
-
-export interface TimelineInfoNonLoadingState {
- state:
- | 'syncing' // Cache loaded and syncing now. If null means there is no cache for the timeline.
- | 'offline' // Sync failed and use cache.
- | 'synced' // Sync succeeded. If null means the timeline does not exist.
- | 'new'; // This is a new timeline different from cached one. If null means the timeline does not exist.
- timeline: TimelineInfo | null;
-}
-
-export type TimelineInfoState =
- | TimelineInfoLoadingState
- | TimelineInfoNonLoadingState;
+export type TimelineWithSyncState =
+ | {
+ syncState:
+ | 'offline' // Sync failed and use cache. Null timeline means no cache.
+ | 'synced'; // Sync succeeded. Null timeline means the timeline does not exist.
+ timeline: TimelineInfo | null;
+ }
+ | {
+ syncState: 'new'; // This is a new timeline different from cached one.
+ timeline: TimelineInfo;
+ };
interface TimelineCache {
timeline: TimelineInfo;
@@ -112,43 +106,35 @@ export class TimelineService {
return `timeline.${timelineName}`;
}
- private getCachedTimeline(
+ private async fetchAndCacheTimeline(
timelineName: string
- ): Promise<TimelineInfo | null> {
- return dataStorage
- .getItem<TimelineCache | null>(this.getTimelineKey(timelineName))
- .then((cache) => cache?.timeline ?? null);
- }
-
- private async syncTimeline(timelineName: string): Promise<TimelineInfo> {
+ ): Promise<
+ | { timeline: TimelineInfo; type: 'new' | 'cache' | 'synced' }
+ | 'offline'
+ | 'notexist'
+ > {
const cache = await dataStorage.getItem<TimelineCache | null>(timelineName);
+ const key = this.getTimelineKey(timelineName);
const save = (cache: TimelineCache): Promise<TimelineCache> =>
- dataStorage.setItem<TimelineCache>(
- this.getTimelineKey(timelineName),
- cache
- );
- const push = (state: TimelineInfoState): void => {
- this._timelineSubscriptionHub.update(timelineName, () =>
- Promise.resolve(state)
- );
- };
+ dataStorage.setItem<TimelineCache>(key, cache);
- let result: TimelineInfo;
const now = new Date();
if (cache == null) {
try {
- const res = await getHttpTimelineClient().getTimeline(timelineName);
- result = res;
- await save({ timeline: result, lastUpdated: now.toISOString() });
- push({ state: 'synced', timeline: result });
+ const timeline = await getHttpTimelineClient().getTimeline(
+ timelineName
+ );
+ await save({ timeline, lastUpdated: now.toISOString() });
+ return { timeline, type: 'synced' };
} catch (e) {
if (e instanceof HttpTimelineNotExistError) {
- push({ state: 'synced', timeline: null });
+ return 'notexist';
+ } else if (e instanceof HttpNetworkError) {
+ return 'offline';
} else {
- push({ state: 'offline', timeline: null });
+ throw e;
}
- throw e;
}
} else {
try {
@@ -157,50 +143,55 @@ export class TimelineService {
ifModifiedSince: new Date(cache.lastUpdated),
});
if (res instanceof NotModified) {
- result = cache.timeline;
- await save({ timeline: result, lastUpdated: now.toISOString() });
- push({ state: 'synced', timeline: result });
+ const { timeline } = cache;
+ await save({ timeline, lastUpdated: now.toISOString() });
+ return { timeline, type: 'synced' };
} else {
- result = res;
- await save({ timeline: result, lastUpdated: now.toISOString() });
+ const timeline = res;
+ await save({ timeline, lastUpdated: now.toISOString() });
if (res.uniqueId === cache.timeline.uniqueId) {
- push({ state: 'synced', timeline: result });
+ return { timeline, type: 'synced' };
} else {
- push({ state: 'new', timeline: result });
+ return { timeline, type: 'new' };
}
}
} catch (e) {
if (e instanceof HttpTimelineNotExistError) {
- push({ state: 'new', timeline: null });
- } else {
- push({ state: 'offline', timeline: cache.timeline });
+ await dataStorage.removeItem(key);
+ return 'notexist';
+ } else if (e instanceof HttpNetworkError) {
+ return { timeline: cache.timeline, type: 'cache' };
}
throw e;
}
}
- return result;
}
private _timelineSubscriptionHub = new SubscriptionHub<
string,
- TimelineInfoState
- >(
- (key) => key,
- () => ({
- state: 'loading',
- timeline: null,
- }),
- async (key) => {
- const result = await this.getCachedTimeline(key);
- void this.syncTimeline(key);
- return {
- state: 'syncing',
- timeline: result,
- };
- }
- );
+ TimelineWithSyncState
+ >({
+ setup: (key, next) => {
+ void this.fetchAndCacheTimeline(key).then((result) => {
+ if (result === 'offline') {
+ next({ syncState: 'offline', timeline: null });
+ } else if (result === 'notexist') {
+ next({ syncState: 'synced', timeline: null });
+ } else {
+ const { type, timeline } = result;
+ if (type === 'cache') {
+ next({ syncState: 'offline', timeline });
+ } else if (type === 'synced') {
+ next({ syncState: 'synced', timeline });
+ } else {
+ next({ syncState: 'new', timeline });
+ }
+ }
+ });
+ },
+ });
- get timelineHub(): ISubscriptionHub<string, TimelineInfoState> {
+ get timelineHub(): ISubscriptionHub<string, TimelineWithSyncState> {
return this._timelineSubscriptionHub;
}