diff options
author | crupest <crupest@outlook.com> | 2020-08-26 00:51:35 +0800 |
---|---|---|
committer | crupest <crupest@outlook.com> | 2020-08-26 00:51:35 +0800 |
commit | 35494ebade7722e02d0870eb6ce85600831c077d (patch) | |
tree | ff8b3fdf540c3a18870214ad16262593bbb0cb3a /Timeline | |
parent | c620d1d66757567f6b0f7abc66bc0f4c02be7ad3 (diff) | |
download | timeline-35494ebade7722e02d0870eb6ce85600831c077d.tar.gz timeline-35494ebade7722e02d0870eb6ce85600831c077d.tar.bz2 timeline-35494ebade7722e02d0870eb6ce85600831c077d.zip |
DataHub v3.0 . Also fix the bug that posts may be wrong if timeline has changed with same name.
Diffstat (limited to 'Timeline')
-rw-r--r-- | Timeline/ClientApp/src/app/data/DataHub.ts | 91 | ||||
-rw-r--r-- | Timeline/ClientApp/src/app/data/timeline.ts | 357 | ||||
-rw-r--r-- | Timeline/ClientApp/src/app/data/user.ts | 137 |
3 files changed, 307 insertions, 278 deletions
diff --git a/Timeline/ClientApp/src/app/data/DataHub.ts b/Timeline/ClientApp/src/app/data/DataHub.ts index e6be740d..93a9b41f 100644 --- a/Timeline/ClientApp/src/app/data/DataHub.ts +++ b/Timeline/ClientApp/src/app/data/DataHub.ts @@ -9,13 +9,22 @@ export type WithSyncStatus<T> = T & { syncing: boolean }; export class DataLine<TData> { private _current: TData | undefined = undefined; + private _syncPromise: Promise<void> | null = null; private _syncingSubject = new BehaviorSubject<boolean>(false); private _observers: Subscriber<TData>[] = []; constructor( - private config?: { destroyable?: (value: TData | undefined) => boolean } - ) {} + private config: { + sync: () => Promise<void>; + destroyable?: (value: TData | undefined) => boolean; + disableInitSync?: boolean; + } + ) { + if (config.disableInitSync !== true) { + setImmediate(() => void this.sync()); + } + } private subscribe(subscriber: Subscriber<TData>): void { this._observers.push(subscriber); @@ -68,19 +77,33 @@ export class DataLine<TData> { } get isSyncing(): boolean { - return this._syncingSubject.value; + return this._syncPromise != null; } - beginSync(): void { - if (!this._syncingSubject.value) { + sync(): Promise<void> { + if (this._syncPromise == null) { this._syncingSubject.next(true); + this._syncPromise = this.config.sync().then(() => { + this._syncingSubject.next(false); + this._syncPromise = null; + }); } + + return this._syncPromise; } - endSync(): void { - if (this._syncingSubject.value) { - this._syncingSubject.next(false); + syncWithAction( + syncAction: (line: DataLine<TData>) => Promise<void> + ): Promise<void> { + if (this._syncPromise == null) { + this._syncingSubject.next(true); + this._syncPromise = syncAction(this).then(() => { + this._syncingSubject.next(false); + this._syncPromise = null; + }); } + + return this._syncPromise; } get destroyable(): boolean { @@ -88,20 +111,15 @@ export class DataLine<TData> { return ( this._observers.length === 0 && - !this._syncingSubject.value && + !this.isSyncing && (customDestroyable != null ? customDestroyable(this._current) : true) ); } - - endSyncAndNext(value: TData): void { - this.endSync(); - this.next(value); - } } export class DataHub<TKey, TData> { + private sync: (key: TKey, line: DataLine<TData>) => Promise<void>; private keyToString: (key: TKey) => string; - private setup?: (key: TKey, line: DataLine<TData>) => (() => void) | void; private destroyable?: (key: TKey, value: TData | undefined) => boolean; private readonly subscriptionLineMap = new Map<string, DataLine<TData>>(); @@ -109,13 +127,14 @@ export class DataHub<TKey, TData> { private cleanTimerId = 0; // setup is called after creating line and if it returns a function as destroyer, then when the line is destroyed the destroyer will be called. - constructor(config?: { + constructor(config: { + sync: (key: TKey, line: DataLine<TData>) => Promise<void>; keyToString?: (key: TKey) => string; - setup?: (key: TKey, line: DataLine<TData>) => void; destroyable?: (key: TKey, value: TData | undefined) => boolean; }) { + this.sync = config.sync; this.keyToString = - config?.keyToString ?? + config.keyToString ?? ((value): string => { if (typeof value === "string") return value; else @@ -124,8 +143,7 @@ export class DataHub<TKey, TData> { ); }); - this.setup = config?.setup; - this.destroyable = config?.destroyable; + this.destroyable = config.destroyable; } private cleanLines(): void { @@ -148,17 +166,16 @@ export class DataHub<TKey, TData> { } } - private createLine(key: TKey, useSetup = true): DataLine<TData> { + private createLine(key: TKey, disableInitSync = false): DataLine<TData> { const keyString = this.keyToString(key); - const { setup, destroyable } = this; - const newLine = new DataLine<TData>({ + const { destroyable } = this; + const newLine: DataLine<TData> = new DataLine<TData>({ + sync: () => this.sync(key, newLine), destroyable: destroyable != null ? (value) => destroyable(key, value) : undefined, + disableInitSync: disableInitSync, }); this.subscriptionLineMap.set(keyString, newLine); - if (useSetup) { - setup?.(key, newLine); - } if (this.subscriptionLineMap.size === 1) { this.cleanTimerId = window.setInterval(this.cleanLines.bind(this), 20000); } @@ -166,17 +183,17 @@ export class DataHub<TKey, TData> { } getObservable(key: TKey): Observable<TData> { - return this.getLineOrCreateWithSetup(key).getObservable(); + return this.getLineOrCreate(key).getObservable(); } getSyncStatusObservable(key: TKey): Observable<boolean> { - return this.getLineOrCreateWithSetup(key).getSyncStatusObservable(); + return this.getLineOrCreate(key).getSyncStatusObservable(); } getDataWithSyncStatusObservable( key: TKey ): Observable<WithSyncStatus<TData>> { - return this.getLineOrCreateWithSetup(key).getDataWithSyncStatusObservable(); + return this.getLineOrCreate(key).getDataWithSyncStatusObservable(); } getLine(key: TKey): DataLine<TData> | null { @@ -184,15 +201,25 @@ export class DataHub<TKey, TData> { return this.subscriptionLineMap.get(keyString) ?? null; } - getLineOrCreateWithSetup(key: TKey): DataLine<TData> { + getLineOrCreate(key: TKey): DataLine<TData> { const keyString = this.keyToString(key); return this.subscriptionLineMap.get(keyString) ?? this.createLine(key); } - getLineOrCreateWithoutSetup(key: TKey): DataLine<TData> { + getLineOrCreateWithoutInitSync(key: TKey): DataLine<TData> { const keyString = this.keyToString(key); return ( - this.subscriptionLineMap.get(keyString) ?? this.createLine(key, false) + this.subscriptionLineMap.get(keyString) ?? this.createLine(key, true) ); } + + optionalInitLineWithSyncAction( + key: TKey, + syncAction: (line: DataLine<TData>) => Promise<void> + ): Promise<void> { + const optionalLine = this.getLine(key); + if (optionalLine != null) return Promise.resolve(); + const line = this.createLine(key, true); + return line.syncWithAction(syncAction); + } } diff --git a/Timeline/ClientApp/src/app/data/timeline.ts b/Timeline/ClientApp/src/app/data/timeline.ts index 8e8860a3..0e11b4e7 100644 --- a/Timeline/ClientApp/src/app/data/timeline.ts +++ b/Timeline/ClientApp/src/app/data/timeline.ts @@ -114,6 +114,13 @@ export class TimelineService { .then(); } + private async clearTimelineData(timelineName: string): Promise<void> { + const keys = (await dataStorage.keys()).filter((k) => + k.startsWith(`timeline.${timelineName}`) + ); + await Promise.all(keys.map((k) => dataStorage.removeItem(k))); + } + private convertHttpTimelineToData(timeline: HttpTimelineInfo): TimelineData { return { ...timeline, @@ -122,44 +129,6 @@ export class TimelineService { }; } - private async syncTimeline(timelineName: string): Promise<void> { - const line = this._timelineHub.getLineOrCreateWithoutSetup(timelineName); - if (line.isSyncing) return; - - if (line.value == undefined) { - const cache = await this.getCachedTimeline(timelineName); - if (cache != null) { - line.next({ type: "cache", timeline: cache }); - } - } - - try { - const httpTimeline = await getHttpTimelineClient().getTimeline( - timelineName - ); - - [httpTimeline.owner, ...httpTimeline.members].forEach( - (user) => void userInfoService.saveUser(user) - ); - - const timeline = this.convertHttpTimelineToData(httpTimeline); - await this.saveTimeline(timelineName, timeline); - line.endSyncAndNext({ type: "synced", timeline }); - } catch (e) { - if (e instanceof HttpTimelineNotExistError) { - line.endSyncAndNext({ type: "synced", timeline: null }); - } else { - const cache = await this.getCachedTimeline(timelineName); - if (cache == null) { - line.endSyncAndNext({ type: "offline", timeline: null }); - } else { - line.endSyncAndNext({ type: "offline", timeline: cache }); - } - throwIfNotNetworkError(e); - } - } - } - private _timelineHub = new DataHub< string, | { @@ -171,11 +140,54 @@ export class TimelineService { timeline: TimelineData | null; } >({ - setup: (key) => { - void this.syncTimeline(key); + sync: async (key, line) => { + const cache = await this.getCachedTimeline(key); + + if (line.value == undefined) { + if (cache != null) { + line.next({ type: "cache", timeline: cache }); + } + } + + try { + const httpTimeline = await getHttpTimelineClient().getTimeline(key); + + await userInfoService.saveUsers([ + httpTimeline.owner, + ...httpTimeline.members, + ]); + + const timeline = this.convertHttpTimelineToData(httpTimeline); + + if (cache != null && timeline.uniqueId !== cache.uniqueId) { + console.log( + `Timeline with name ${key} has changed to a new one. Clear old data.` + ); + await this.clearTimelineData(key); // If timeline has changed, clear all old data. + } + + await this.saveTimeline(key, timeline); + + line.next({ type: "synced", timeline }); + } catch (e) { + if (e instanceof HttpTimelineNotExistError) { + line.next({ type: "synced", timeline: null }); + } else { + if (cache == null) { + line.next({ type: "offline", timeline: null }); + } else { + line.next({ type: "offline", timeline: cache }); + } + throwIfNotNetworkError(e); + } + } }, }); + syncTimeline(timelineName: string): Promise<void> { + return this._timelineHub.getLineOrCreate(timelineName).sync(); + } + getTimeline$(timelineName: string): Observable<TimelineWithSyncStatus> { return this._timelineHub.getDataWithSyncStatusObservable(timelineName).pipe( switchMap((state) => { @@ -292,116 +304,115 @@ export class TimelineService { .then(); } - private async syncPosts(timelineName: string): Promise<void> { - const line = this._postsHub.getLineOrCreateWithoutSetup(timelineName); - if (line.isSyncing) return; - line.beginSync(); + private syncPosts(timelineName: string): Promise<void> { + return this._postsHub.getLineOrCreate(timelineName).sync(); + } - if (line.value == null) { - const cache = await this.getCachedPosts(timelineName); - if (cache != null) { - line.next({ type: "cache", posts: cache }); - } + private _postsHub = new DataHub< + string, + { + type: "cache" | "offline" | "synced" | "forbid" | "notexist"; + posts: TimelinePostData[]; } + >({ + sync: async (key, line) => { + // Wait for timeline synced. In case the timeline has changed to another and old data has been cleaned. + await this.syncTimeline(key); + + if (line.value == null) { + const cache = await this.getCachedPosts(key); + if (cache != null) { + line.next({ type: "cache", posts: cache }); + } + } - const now = new Date(); + const now = new Date(); - const lastUpdatedTime = await dataStorage.getItem<Date | null>( - `timeline.${timelineName}.lastUpdated` - ); + const lastUpdatedTime = await dataStorage.getItem<Date | null>( + `timeline.${key}.lastUpdated` + ); - try { - if (lastUpdatedTime == null) { - const httpPosts = await getHttpTimelineClient().listPost( - timelineName, - userService.currentUser?.token - ); + try { + if (lastUpdatedTime == null) { + const httpPosts = await getHttpTimelineClient().listPost( + key, + userService.currentUser?.token + ); - uniqBy( - httpPosts.map((post) => post.author), - "username" - ).forEach((user) => void userInfoService.saveUser(user)); + await userInfoService.saveUsers( + uniqBy( + httpPosts.map((post) => post.author), + "username" + ) + ); - const posts = this.convertHttpPostToDataList(httpPosts); - await this.savePosts(timelineName, posts); - await dataStorage.setItem<Date>( - `timeline.${timelineName}.lastUpdated`, - now - ); + const posts = this.convertHttpPostToDataList(httpPosts); + await this.savePosts(key, posts); + await dataStorage.setItem<Date>(`timeline.${key}.lastUpdated`, now); - line.endSyncAndNext({ type: "synced", posts }); - } else { - const httpPosts = await getHttpTimelineClient().listPost( - timelineName, - userService.currentUser?.token, - { - modifiedSince: lastUpdatedTime, - includeDeleted: true, - } - ); + line.next({ type: "synced", posts }); + } else { + const httpPosts = await getHttpTimelineClient().listPost( + key, + userService.currentUser?.token, + { + modifiedSince: lastUpdatedTime, + includeDeleted: true, + } + ); - const deletedIds = httpPosts.filter((p) => p.deleted).map((p) => p.id); - const changed = httpPosts.filter( - (p): p is HttpTimelinePostInfo => !p.deleted - ); + const deletedIds = httpPosts + .filter((p) => p.deleted) + .map((p) => p.id); + const changed = httpPosts.filter( + (p): p is HttpTimelinePostInfo => !p.deleted + ); - uniqBy( - httpPosts - .map((post) => post.author) - .filter((u): u is HttpUser => u != null), - "username" - ).forEach((user) => void userInfoService.saveUser(user)); + await userInfoService.saveUsers( + uniqBy( + httpPosts + .map((post) => post.author) + .filter((u): u is HttpUser => u != null), + "username" + ) + ); - const cache = (await this.getCachedPosts(timelineName)) ?? []; + const cache = (await this.getCachedPosts(key)) ?? []; - const posts = cache.filter((p) => !deletedIds.includes(p.id)); + const posts = cache.filter((p) => !deletedIds.includes(p.id)); - for (const changedPost of changed) { - const savedChangedPostIndex = posts.findIndex( - (p) => p.id === changedPost.id - ); - if (savedChangedPostIndex === -1) { - posts.push(this.convertHttpPostToData(changedPost)); - } else { - posts[savedChangedPostIndex] = this.convertHttpPostToData( - changedPost + for (const changedPost of changed) { + const savedChangedPostIndex = posts.findIndex( + (p) => p.id === changedPost.id ); + if (savedChangedPostIndex === -1) { + posts.push(this.convertHttpPostToData(changedPost)); + } else { + posts[savedChangedPostIndex] = this.convertHttpPostToData( + changedPost + ); + } } - } - await this.savePosts(timelineName, posts); - await dataStorage.setItem<Date>( - `timeline.${timelineName}.lastUpdated`, - now - ); - line.endSyncAndNext({ type: "synced", posts }); - } - } catch (e) { - if (e instanceof HttpTimelineNotExistError) { - line.endSyncAndNext({ type: "notexist", posts: [] }); - } else if (e instanceof HttpForbiddenError) { - line.endSyncAndNext({ type: "forbid", posts: [] }); - } else { - const cache = await this.getCachedPosts(timelineName); - if (cache == null) { - line.endSyncAndNext({ type: "offline", posts: [] }); + await this.savePosts(key, posts); + await dataStorage.setItem<Date>(`timeline.${key}.lastUpdated`, now); + line.next({ type: "synced", posts }); + } + } catch (e) { + if (e instanceof HttpTimelineNotExistError) { + line.next({ type: "notexist", posts: [] }); + } else if (e instanceof HttpForbiddenError) { + line.next({ type: "forbid", posts: [] }); } else { - line.endSyncAndNext({ type: "offline", posts: cache }); + const cache = await this.getCachedPosts(key); + if (cache == null) { + line.next({ type: "offline", posts: [] }); + } else { + line.next({ type: "offline", posts: cache }); + } + throwIfNotNetworkError(e); } - throwIfNotNetworkError(e); } - } - } - - private _postsHub = new DataHub< - string, - { - type: "cache" | "offline" | "synced" | "forbid" | "notexist"; - posts: TimelinePostData[]; - } - >({ - setup: (key) => { - void this.syncPosts(key); }, }); @@ -479,51 +490,11 @@ export class TimelineService { .then(); } - private async syncPostData(key: { + private syncPostData(key: { timelineName: string; postId: number; }): Promise<void> { - const line = this._postDataHub.getLineOrCreateWithoutSetup(key); - if (line.isSyncing) return; - line.beginSync(); - - const cache = await this.getCachedPostData(key); - if (line.value == null) { - if (cache != null) { - line.next({ type: "cache", data: cache.data }); - } - } - - if (cache == null) { - try { - const res = await getHttpTimelineClient().getPostData( - key.timelineName, - key.postId - ); - await this.savePostData(key, res); - line.endSyncAndNext({ data: res.data, type: "synced" }); - } catch (e) { - line.endSyncAndNext({ type: "offline" }); - throwIfNotNetworkError(e); - } - } else { - try { - const res = await getHttpTimelineClient().getPostData( - key.timelineName, - key.postId, - cache.etag - ); - if (res instanceof NotModified) { - line.endSyncAndNext({ data: cache.data, type: "synced" }); - } else { - await this.savePostData(key, res); - line.endSyncAndNext({ data: res.data, type: "synced" }); - } - } catch (e) { - line.endSyncAndNext({ data: cache.data, type: "offline" }); - throwIfNotNetworkError(e); - } - } + return this._postDataHub.getLineOrCreate(key).sync(); } private _postDataHub = new DataHub< @@ -532,8 +503,44 @@ export class TimelineService { | { data?: undefined; type: "notexist" | "offline" } >({ keyToString: (key) => `${key.timelineName}.${key.postId}`, - setup: (key) => { - void this.syncPostData(key); + sync: async (key, line) => { + const cache = await this.getCachedPostData(key); + if (line.value == null) { + if (cache != null) { + line.next({ type: "cache", data: cache.data }); + } + } + + if (cache == null) { + try { + const res = await getHttpTimelineClient().getPostData( + key.timelineName, + key.postId + ); + await this.savePostData(key, res); + line.next({ data: res.data, type: "synced" }); + } catch (e) { + line.next({ type: "offline" }); + throwIfNotNetworkError(e); + } + } else { + try { + const res = await getHttpTimelineClient().getPostData( + key.timelineName, + key.postId, + cache.etag + ); + if (res instanceof NotModified) { + line.next({ data: cache.data, type: "synced" }); + } else { + await this.savePostData(key, res); + line.next({ data: res.data, type: "synced" }); + } + } catch (e) { + line.next({ data: cache.data, type: "offline" }); + throwIfNotNetworkError(e); + } + } }, }); diff --git a/Timeline/ClientApp/src/app/data/user.ts b/Timeline/ClientApp/src/app/data/user.ts index f42b2d58..66fcd83c 100644 --- a/Timeline/ClientApp/src/app/data/user.ts +++ b/Timeline/ClientApp/src/app/data/user.ts @@ -226,13 +226,16 @@ export function checkLogin(): UserWithToken { export class UserNotExistError extends Error {} export class UserInfoService { - async saveUser(user: HttpUser): Promise<void> { + saveUser(user: HttpUser): Promise<void> { const key = user.username; - const line = this._userHub.getLineOrCreateWithoutSetup(key); - if (line.isSyncing) return; - line.beginSync(); - await this.doSaveUser(user); - line.endSyncAndNext({ user, type: "synced" }); + return this._userHub.optionalInitLineWithSyncAction(key, async (line) => { + await this.doSaveUser(user); + line.next({ user, type: "synced" }); + }); + } + + saveUsers(users: HttpUser[]): Promise<void> { + return Promise.all(users.map((user) => this.saveUser(user))).then(); } private getCachedUser(username: string): Promise<User | null> { @@ -243,31 +246,8 @@ export class UserInfoService { return dataStorage.setItem<HttpUser>(`user.${user.username}`, user).then(); } - private async syncUser(username: string): Promise<void> { - const line = this._userHub.getLineOrCreateWithoutSetup(username); - if (line.isSyncing) return; - line.beginSync(); - - if (line.value == undefined) { - const cache = await this.getCachedUser(username); - if (cache != null) { - line.next({ user: cache, type: "cache" }); - } - } - - try { - const res = await getHttpUserClient().get(username); - await this.doSaveUser(res); - line.endSyncAndNext({ user: res, type: "synced" }); - } catch (e) { - if (e instanceof HttpUserNotExistError) { - line.endSyncAndNext({ type: "notexist" }); - } else { - const cache = await this.getCachedUser(username); - line.endSyncAndNext({ user: cache ?? undefined, type: "offline" }); - throwIfNotNetworkError(e); - } - } + syncUser(username: string): Promise<void> { + return this._userHub.getLineOrCreate(username).sync(); } private _userHub = new DataHub< @@ -275,8 +255,27 @@ export class UserInfoService { | { user: User; type: "cache" | "synced" | "offline" } | { user?: undefined; type: "notexist" | "offline" } >({ - setup: (key) => { - void this.syncUser(key); + sync: async (key, line) => { + if (line.value == undefined) { + const cache = await this.getCachedUser(key); + if (cache != null) { + line.next({ user: cache, type: "cache" }); + } + } + + try { + const res = await getHttpUserClient().get(key); + await this.doSaveUser(res); + line.next({ user: res, type: "synced" }); + } catch (e) { + if (e instanceof HttpUserNotExistError) { + line.next({ type: "notexist" }); + } else { + const cache = await this.getCachedUser(key); + line.next({ user: cache ?? undefined, type: "offline" }); + throwIfNotNetworkError(e); + } + } }, }); @@ -297,42 +296,8 @@ export class UserInfoService { .then(); } - private async syncAvatar(username: string): Promise<void> { - const line = this._avatarHub.getLineOrCreateWithoutSetup(username); - if (line.isSyncing) return; - line.beginSync(); - - const cache = await this.getCachedAvatar(username); - if (line.value == null) { - if (cache != null) { - line.next({ data: cache.data, type: "cache" }); - } - } - - if (cache == null) { - try { - const avatar = await getHttpUserClient().getAvatar(username); - await this.saveAvatar(username, avatar); - line.endSyncAndNext({ data: avatar.data, type: "synced" }); - } catch (e) { - line.endSyncAndNext({ type: "offline" }); - throwIfNotNetworkError(e); - } - } else { - try { - const res = await getHttpUserClient().getAvatar(username, cache.etag); - if (res instanceof NotModified) { - line.endSyncAndNext({ data: cache.data, type: "synced" }); - } else { - const avatar = res; - await this.saveAvatar(username, avatar); - line.endSyncAndNext({ data: avatar.data, type: "synced" }); - } - } catch (e) { - line.endSyncAndNext({ data: cache.data, type: "offline" }); - throwIfNotNetworkError(e); - } - } + syncAvatar(username: string): Promise<void> { + return this._avatarHub.getLineOrCreate(username).sync(); } private _avatarHub = new DataHub< @@ -340,8 +305,38 @@ export class UserInfoService { | { data: Blob; type: "cache" | "synced" | "offline" } | { data?: undefined; type: "notexist" | "offline" } >({ - setup: (key) => { - void this.syncAvatar(key); + sync: async (key, line) => { + const cache = await this.getCachedAvatar(key); + if (line.value == null) { + if (cache != null) { + line.next({ data: cache.data, type: "cache" }); + } + } + + if (cache == null) { + try { + const avatar = await getHttpUserClient().getAvatar(key); + await this.saveAvatar(key, avatar); + line.next({ data: avatar.data, type: "synced" }); + } catch (e) { + line.next({ type: "offline" }); + throwIfNotNetworkError(e); + } + } else { + try { + const res = await getHttpUserClient().getAvatar(key, cache.etag); + if (res instanceof NotModified) { + line.next({ data: cache.data, type: "synced" }); + } else { + const avatar = res; + await this.saveAvatar(key, avatar); + line.next({ data: avatar.data, type: "synced" }); + } + } catch (e) { + line.next({ data: cache.data, type: "offline" }); + throwIfNotNetworkError(e); + } + } }, }); |