From be9eb313ccad0832cb37e1c63e03608c47c2d171 Mon Sep 17 00:00:00 2001 From: crupest Date: Wed, 5 Aug 2020 23:29:14 +0800 Subject: Refactor a lot. --- Timeline/ClientApp/src/app/data/SubscriptionHub.ts | 117 ++++--- Timeline/ClientApp/src/app/data/queue.ts | 14 + Timeline/ClientApp/src/app/data/timeline.ts | 389 +++++++++++++++------ Timeline/ClientApp/src/app/data/user.ts | 6 +- .../src/app/timeline/TimelinePageTemplateUI.tsx | 8 +- 5 files changed, 364 insertions(+), 170 deletions(-) create mode 100644 Timeline/ClientApp/src/app/data/queue.ts (limited to 'Timeline/ClientApp/src') diff --git a/Timeline/ClientApp/src/app/data/SubscriptionHub.ts b/Timeline/ClientApp/src/app/data/SubscriptionHub.ts index 87592da6..8a74c939 100644 --- a/Timeline/ClientApp/src/app/data/SubscriptionHub.ts +++ b/Timeline/ClientApp/src/app/data/SubscriptionHub.ts @@ -1,9 +1,3 @@ -// Remarks for SubscriptionHub: -// 1. Compared with 'push' sematics in rxjs subject, we need 'pull'. In other words, no subscription, no updating. -// 2. Make api easier to use and write less boilerplate codes. -// -// There might be some bugs, especially memory leaks and in asynchronization codes. - import { pull } from 'lodash'; export type Subscriber = (data: TData) => void; @@ -16,18 +10,23 @@ export class Subscription { } } -export class NoValue {} +export interface ISubscriptionLine { + readonly value: undefined | TData; + next(value: TData): void; +} -export class SubscriptionLine { - private _current: TData | NoValue = new NoValue(); +export class SubscriptionLine implements ISubscriptionLine { + private _current: TData | undefined = undefined; private _observers: Subscriber[] = []; - constructor(private config?: { onZeroObserver?: () => void }) {} + constructor( + private config?: { destroyable?: (value: TData | undefined) => boolean } + ) {} subscribe(subscriber: Subscriber): Subscription { this._observers.push(subscriber); - if (!(this._current instanceof NoValue)) { + if (this._current !== undefined) { subscriber(this._current); } return new Subscription(() => this.unsubscribe(subscriber)); @@ -36,9 +35,10 @@ export class SubscriptionLine { private unsubscribe(subscriber: Subscriber): void { if (!this._observers.includes(subscriber)) return; pull(this._observers, subscriber); - if (this._observers.length === 0) { - this?.config?.onZeroObserver?.(); - } + } + + get value(): TData | undefined { + return this._current; } next(value: TData): void { @@ -46,10 +46,13 @@ export class SubscriptionLine { this._observers.forEach((observer) => observer(value)); } - nextWithOld(updator: (old: TData | NoValue) => TData): void { - const value = updator(this._current); - this._current = value; - this._observers.forEach((observer) => observer(value)); + get destroyable(): boolean { + const customDestroyable = this.config?.destroyable; + + return ( + this._observers.length === 0 && + (customDestroyable != null ? customDestroyable(this._current) : true) + ); } } @@ -62,23 +65,25 @@ export class SubscriptionHub private keyToString: (key: TKey) => string; private setup?: ( key: TKey, - next: (value: TData) => void, - line: SubscriptionLine + line: ISubscriptionLine ) => (() => void) | void; + private destroyable?: (key: TKey, value: TData | undefined) => boolean; private readonly subscriptionLineMap = new Map< string, { line: SubscriptionLine; destroyer: (() => void) | undefined; - destroyTimer?: number; // Cancel it when resubscribe. } >(); + private cleanTimerId = 0; + // setup is called after creating line and if it returns a function as destroyer, then when the line is destroyed the destroyer will be called. constructor(config?: { keyToString?: (key: TKey) => string; - setup?: (key: TKey, next: (value: TData) => void) => (() => void) | void; + setup?: (key: TKey, line: ISubscriptionLine) => (() => void) | void; + destroyable?: (key: TKey, value: TData | undefined) => boolean; }) { this.keyToString = config?.keyToString ?? @@ -91,6 +96,28 @@ export class SubscriptionHub }); this.setup = config?.setup; + this.destroyable = config?.destroyable; + } + + private cleanLines(): void { + const toDelete: string[] = []; + for (const [key, info] of this.subscriptionLineMap.entries()) { + if (info.line.destroyable) { + info.destroyer?.(); + toDelete.push(key); + } + } + + if (toDelete.length === 0) return; + + for (const key of toDelete) { + this.subscriptionLineMap.delete(key); + } + + if (this.subscriptionLineMap.size === 0) { + window.clearInterval(this.cleanTimerId); + this.cleanTimerId = 0; + } } subscribe(key: TKey, subscriber: Subscriber): Subscription { @@ -98,48 +125,38 @@ export class SubscriptionHub const line = (() => { const info = this.subscriptionLineMap.get(keyString); if (info == null) { - const { setup } = this; + const { setup, destroyable } = this; const newLine = new SubscriptionLine({ - onZeroObserver: () => { - const i = this.subscriptionLineMap.get(keyString); - if (i != null) { - i.destroyTimer = window.setTimeout(() => { - i.destroyer?.(); - this.subscriptionLineMap.delete(keyString); - }, 10000); - } - }, + destroyable: + destroyable != null + ? (value) => destroyable(key, value) + : undefined, }); - const destroyer = setup?.(key, newLine.next.bind(newLine), newLine); + this.subscriptionLineMap.set(keyString, { + line: newLine, + destroyer: undefined, + }); + const destroyer = setup?.(key, newLine); + if (this.subscriptionLineMap.size === 0) { + this.cleanTimerId = window.setInterval( + this.cleanLines.bind(this), + 20000 + ); + } this.subscriptionLineMap.set(keyString, { line: newLine, destroyer: destroyer != null ? destroyer : undefined, }); return newLine; } else { - if (info.destroyTimer != null) { - window.clearTimeout(info.destroyTimer); - info.destroyTimer = undefined; - } return info.line; } })(); return line.subscribe(subscriber); } - update(key: TKey, value: TData): void { + getLine(key: TKey): ISubscriptionLine | null { const keyString = this.keyToString(key); - const info = this.subscriptionLineMap.get(keyString); - if (info != null) { - info.line.next(value); - } - } - - updateWithOld(key: TKey, updator: (old: TData | NoValue) => TData): void { - const keyString = this.keyToString(key); - const info = this.subscriptionLineMap.get(keyString); - if (info != null) { - info.line.nextWithOld(updator); - } + return this.subscriptionLineMap.get(keyString)?.line ?? null; } } diff --git a/Timeline/ClientApp/src/app/data/queue.ts b/Timeline/ClientApp/src/app/data/queue.ts new file mode 100644 index 00000000..001340a9 --- /dev/null +++ b/Timeline/ClientApp/src/app/data/queue.ts @@ -0,0 +1,14 @@ +const queueMap = new Map>(); + +export function queue(key: string, func: () => Promise): Promise { + const last = queueMap.get(key); + if (last == null) { + const promise = func(); + queueMap.set(key, promise.then(null, null)); + return promise; + } else { + const promise = last.then(() => func()); + queueMap.set(key, promise.then(null, null)); + return promise; + } +} diff --git a/Timeline/ClientApp/src/app/data/timeline.ts b/Timeline/ClientApp/src/app/data/timeline.ts index fb8a3874..22b10ca8 100644 --- a/Timeline/ClientApp/src/app/data/timeline.ts +++ b/Timeline/ClientApp/src/app/data/timeline.ts @@ -7,7 +7,8 @@ import { pull } from 'lodash'; import { convertError } from '../utilities/rxjs'; import { dataStorage } from './common'; -import { SubscriptionHub, ISubscriptionHub, NoValue } from './SubscriptionHub'; +import { queue } from './queue'; +import { SubscriptionHub, ISubscriptionHub } from './SubscriptionHub'; import { UserAuthInfo, checkLogin, userService, userInfoService } from './user'; @@ -31,6 +32,7 @@ import { } from '../http/timeline'; import { BlobWithEtag, NotModified, HttpNetworkError } from '../http/common'; import { HttpUser } from '../http/user'; +import { ExcludeKey } from '../utilities/type'; export type TimelineInfo = HttpTimelineInfo; export type TimelineChangePropertyRequest = HttpTimelinePatchRequest; @@ -71,8 +73,13 @@ export class TimelineNotExistError extends Error {} export class TimelineNameConflictError extends Error {} export type TimelineWithSyncState = + | { + syncState: 'loadcache'; // Loading cache now. + timeline?: undefined; + } | { syncState: + | 'syncing' // Cache loaded and syncing for the first time. | 'offline' // Sync failed and use cache. Null timeline means no cache. | 'synced'; // Sync succeeded. Null timeline means the timeline does not exist. timeline: TimelineInfo | null; @@ -82,14 +89,36 @@ export type TimelineWithSyncState = timeline: TimelineInfo; }; -export interface TimelinePostsWithSyncState { +export interface TimelinePostsTimelineWithSyncState { state: - | 'forbid' // The list is forbidden to see. + | 'loadcache' + | 'syncing' // Syncing now. + | 'offline' // Sync failed and use cache. | 'synced' // Sync succeeded. - | 'offline'; // Sync failed and use cache. + | 'forbid'; // The list is forbidden to see. posts: TimelinePostInfo[]; + timelineUniqueId: string; +} + +export interface TimelinePostsNoTimelineWithSyncState { + state: 'timeline-offline' | 'timeline-notexist'; + posts?: undefined; + timelineUniqueId?: undefined; } +export type TimelinePostsWithSyncState = + | TimelinePostsTimelineWithSyncState + | TimelinePostsNoTimelineWithSyncState; + +type FetchAndCacheTimelineResult = + | { timeline: TimelineInfo; type: 'new' | 'cache' | 'synced' } + | 'offline' + | 'notexist'; + +type FetchAndCachePostsResult = + | { posts: TimelinePostInfo[]; type: 'synced' | 'cache' } + | 'offline'; + interface TimelineCache { timeline: TimelineInfo; lastUpdated: string; @@ -108,13 +137,25 @@ export class TimelineService { return `timeline.${timelineName}`; } - private async fetchAndCacheTimeline( + private getCachedTimeline( + timelineName: string + ): Promise { + return dataStorage + .getItem(timelineName) + .then((cache) => cache?.timeline ?? null); + } + + private fetchAndCacheTimeline( + timelineName: string + ): Promise { + return queue(`TimelineService.fetchAndCacheTimeline.${timelineName}`, () => + this.doFetchAndCacheTimeline(timelineName) + ); + } + + private async doFetchAndCacheTimeline( timelineName: string - ): Promise< - | { timeline: TimelineInfo; type: 'new' | 'cache' | 'synced' } - | 'offline' - | 'notexist' - > { + ): Promise { const cache = await dataStorage.getItem(timelineName); const key = this.getTimelineKey(timelineName); @@ -169,28 +210,60 @@ export class TimelineService { } } + private async syncTimeline(timelineName: string): Promise { + const line = this._timelineSubscriptionHub.getLine(timelineName); + + if (line == null) { + console.log('No subscription, skip sync!'); + return; + } + + const old = line.value; + + if ( + old != null && + (old.syncState === 'loadcache' || old.syncState === 'syncing') + ) { + return; + } + + const next = line.next.bind(line); + + if (old == undefined) { + next({ syncState: 'loadcache' }); + const timeline = await this.getCachedTimeline(timelineName); + next({ syncState: 'syncing', timeline }); + } else { + next({ syncState: 'syncing', timeline: old?.timeline }); + } + + const result = await this.fetchAndCacheTimeline(timelineName); + + if (result === 'offline') { + next({ syncState: 'offline', timeline: null }); + } else if (result === 'notexist') { + next({ syncState: 'synced', timeline: null }); + } else { + const { type, timeline } = result; + if (type === 'cache') { + next({ syncState: 'offline', timeline }); + } else if (type === 'synced') { + next({ syncState: 'synced', timeline }); + } else { + next({ syncState: 'new', timeline }); + } + } + } + private _timelineSubscriptionHub = new SubscriptionHub< string, TimelineWithSyncState >({ - setup: (key, next) => { - void this.fetchAndCacheTimeline(key).then((result) => { - if (result === 'offline') { - next({ syncState: 'offline', timeline: null }); - } else if (result === 'notexist') { - next({ syncState: 'synced', timeline: null }); - } else { - const { type, timeline } = result; - if (type === 'cache') { - next({ syncState: 'offline', timeline }); - } else if (type === 'synced') { - next({ syncState: 'synced', timeline }); - } else { - next({ syncState: 'new', timeline }); - } - } - }); + setup: (key) => { + void this.syncTimeline(key); }, + destroyable: (_, value) => + value?.syncState !== 'loadcache' && value?.syncState !== 'syncing', }); get timelineHub(): ISubscriptionHub { @@ -220,10 +293,7 @@ export class TimelineService { getHttpTimelineClient() .patchTimeline(timelineName, req, user.token) .then((timeline) => { - this._timelineSubscriptionHub.update(timelineName, { - syncState: 'synced', - timeline, - }); + void this.syncTimeline(timelineName); return timeline; }) ); @@ -242,19 +312,8 @@ export class TimelineService { getHttpTimelineClient() .memberPut(timelineName, username, user.token) .then(() => { - userInfoService.getUserInfo(username).subscribe((newUser) => { - this._timelineSubscriptionHub.updateWithOld(timelineName, (old) => { - if (old instanceof NoValue || old.timeline == null) - throw new Error('Timeline not loaded.'); - - return { - ...old, - timeline: { - ...old.timeline, - members: [...old.timeline.members, newUser], - }, - }; - }); + userInfoService.getUserInfo(username).subscribe(() => { + void this.syncTimeline(timelineName); }); }) ); @@ -266,20 +325,7 @@ export class TimelineService { getHttpTimelineClient() .memberDelete(timelineName, username, user.token) .then(() => { - this._timelineSubscriptionHub.updateWithOld(timelineName, (old) => { - if (old instanceof NoValue || old.timeline == null) - throw new Error('Timeline not loaded.'); - - return { - ...old, - timeline: { - ...old.timeline, - members: old.timeline.members.filter( - (u) => u.username !== username - ), - }, - }; - }); + void this.syncTimeline(timelineName); }) ); } @@ -324,35 +370,70 @@ export class TimelineService { } }; - async fetchAndCachePosts( - timeline: TimelineInfo - ): Promise< - | { posts: TimelinePostInfo[]; type: 'synced' | 'cache' } - | 'forbid' - | 'offline' - > { - if (!this.hasReadPermission(userService.currentUser, timeline)) { - return 'forbid'; - } + private convertPostList = ( + posts: HttpTimelinePostInfo[], + dataProvider: ( + post: HttpTimelinePostInfo, + index: number + ) => Promise + ): Promise => { + return Promise.all( + posts.map((post, index) => + this.convertPost(post, () => dataProvider(post, index)) + ) + ); + }; + private async getCachedPosts(timeline: { + name: string; + uniqueId: string; + }): Promise { const postsInfoKey = this.getPostsInfoKey(timeline.uniqueId); const postsInfo = await dataStorage.getItem( postsInfoKey ); - const convertPostList = ( - posts: HttpTimelinePostInfo[], - dataProvider: ( - post: HttpTimelinePostInfo, - index: number - ) => Promise - ): Promise => { - return Promise.all( - posts.map((post, index) => - this.convertPost(post, () => dataProvider(post, index)) + if (postsInfo == null) return []; + + const httpPosts = await Promise.all( + postsInfo.idList.map((postId) => + dataStorage.getItem( + this.getPostKey(timeline.uniqueId, postId) ) - ); - }; + ) + ); + + const posts = await this.convertPostList(httpPosts, (post) => + dataStorage + .getItem( + this.getPostDataKey(timeline.uniqueId, post.id) + ) + .then((d) => d?.data) + ); + + return posts; + } + + private fetchAndCachePosts(timeline: { + name: string; + uniqueId: string; + }): Promise { + return queue( + `TimelineService.fetchAndCachePosts.${timeline.uniqueId}`, + () => this.doFetchAndCachePosts(timeline) + ); + } + + private async doFetchAndCachePosts(timeline: { + name: string; + uniqueId: string; + }): Promise { + const postsInfoKey = this.getPostsInfoKey(timeline.uniqueId); + const postsInfo = await dataStorage.getItem( + postsInfoKey + ); + + const convertPostList = this.convertPostList.bind(this); const now = new Date(); if (postsInfo == null) { @@ -521,32 +602,124 @@ export class TimelineService { } } + private syncPosts(timelineName: string): Promise { + const line = this._postsSubscriptionHub.getLine(timelineName); + if (line == null) return Promise.resolve(); + + const { value } = line; + + if ( + value != null && + value.timelineUniqueId != null && + value.state !== 'forbid' + ) { + return this.syncPostsWithUniqueId({ + name: timelineName, + uniqueId: value.timelineUniqueId, + }); + } else { + return Promise.resolve(); + } + } + + private async syncPostsWithUniqueId(timeline: { + name: string; + uniqueId: string; + }): Promise { + const line = this._postsSubscriptionHub.getLine(timeline.name); + if (line == null) return; + + if ( + line.value != null && + line.value.timelineUniqueId == timeline.uniqueId && + (line.value.state === 'loadcache' || line.value.state === 'syncing') + ) { + return; + } + + const next = ( + value: ExcludeKey + ): void => { + line.next({ + ...value, + timelineUniqueId: timeline.uniqueId, + }); + }; + + const uniqueIdChanged = (): boolean => { + return line.value?.timelineUniqueId !== timeline.uniqueId; + }; + + if (line.value == null) { + next({ + state: 'loadcache', + posts: [], + }); + const posts = await this.getCachedPosts(timeline); + if (uniqueIdChanged()) { + return; + } + next({ + state: 'syncing', + posts, + }); + } else { + next({ + state: 'syncing', + posts: line.value?.posts ?? [], + }); + } + + const result = await this.fetchAndCachePosts(timeline); + if (uniqueIdChanged()) { + return; + } + + if (result === 'offline') { + next({ state: 'offline', posts: [] }); + } else if (result.type === 'synced') { + next({ state: 'synced', posts: result.posts }); + } else { + next({ state: 'offline', posts: result.posts }); + } + } + private _postsSubscriptionHub = new SubscriptionHub< string, TimelinePostsWithSyncState >({ - setup: (key, next) => { + setup: (key, line) => { const sub = this.timelineHub.subscribe(key, (timelineState) => { - if (timelineState.timeline == null) { - if (timelineState.syncState === 'offline') { - next({ state: 'offline', posts: [] }); + if (timelineState.timeline != null) { + if ( + !this.hasReadPermission( + userService.currentUser, + timelineState.timeline + ) + ) { + line.next({ + state: 'forbid', + posts: [], + timelineUniqueId: timelineState.timeline.uniqueId, + }); } else { - next({ state: 'synced', posts: [] }); + if ( + line.value == null || + line.value.timelineUniqueId !== timelineState.timeline.uniqueId + ) { + void this.syncPostsWithUniqueId(timelineState.timeline); + } } } else { - void this.fetchAndCachePosts(timelineState.timeline).then( - (result) => { - if (result === 'forbid') { - next({ state: 'forbid', posts: [] }); - } else if (result === 'offline') { - next({ state: 'offline', posts: [] }); - } else if (result.type === 'synced') { - next({ state: 'synced', posts: result.posts }); - } else { - next({ state: 'offline', posts: result.posts }); - } - } - ); + if (timelineState.syncState === 'synced') { + line.next({ + state: 'timeline-notexist', + }); + } else if (timelineState.syncState === 'offline') { + line.next({ + state: 'timeline-offline', + }); + } } }); return () => { @@ -575,15 +748,7 @@ export class TimelineService { ) ) .then((post) => { - this._postsSubscriptionHub.updateWithOld(timelineName, (old) => { - if (old instanceof NoValue) { - throw new Error('Posts has not been loaded.'); - } - return { - ...old, - posts: [...old.posts, post], - }; - }); + void this.syncPosts(timelineName); return post; }) ).pipe(map((post) => ({ ...post, timelineName }))); @@ -595,15 +760,7 @@ export class TimelineService { getHttpTimelineClient() .deletePost(timelineName, postId, user.token) .then(() => { - this._postsSubscriptionHub.updateWithOld(timelineName, (old) => { - if (old instanceof NoValue) { - throw new Error('Posts has not been loaded.'); - } - return { - ...old, - posts: old.posts.filter((post) => post.id != postId), - }; - }); + void this.syncPosts(timelineName); }) ); } diff --git a/Timeline/ClientApp/src/app/data/user.ts b/Timeline/ClientApp/src/app/data/user.ts index 7d522b26..65b53a6f 100644 --- a/Timeline/ClientApp/src/app/data/user.ts +++ b/Timeline/ClientApp/src/app/data/user.ts @@ -230,11 +230,11 @@ export class UserNotExistError extends Error {} export class UserInfoService { private _avatarSubscriptionHub = new SubscriptionHub({ - setup: (key, next) => { + setup: (key, line) => { void getHttpUserClient() .getAvatar(key) .then((res) => { - next(res.data); + line.next(res.data); }); }, }); @@ -248,7 +248,7 @@ export class UserInfoService { async setAvatar(username: string, blob: Blob): Promise { const user = checkLogin(); await getHttpUserClient().putAvatar(username, blob, user.token); - this._avatarSubscriptionHub.update(username, blob); + this._avatarSubscriptionHub.getLine(username)?.next(blob); } get avatarHub(): ISubscriptionHub { diff --git a/Timeline/ClientApp/src/app/timeline/TimelinePageTemplateUI.tsx b/Timeline/ClientApp/src/app/timeline/TimelinePageTemplateUI.tsx index 42171e13..43925ebb 100644 --- a/Timeline/ClientApp/src/app/timeline/TimelinePageTemplateUI.tsx +++ b/Timeline/ClientApp/src/app/timeline/TimelinePageTemplateUI.tsx @@ -24,7 +24,7 @@ import Timeline, { import AppBar from '../common/AppBar'; import TimelinePostEdit, { TimelinePostSendCallback } from './TimelinePostEdit'; -type TimelinePostSyncState = 'syncing' | 'synced' | 'offline'; +type TimelinePostSyncState = 'loadcache' | 'syncing' | 'synced' | 'offline'; const TimelinePostSyncStateBadge: React.FC<{ state: TimelinePostSyncState; @@ -37,6 +37,7 @@ const TimelinePostSyncStateBadge: React.FC<{
{(() => { switch (state) { + case 'loadcache': case 'syncing': { return ( <> @@ -200,6 +201,11 @@ export default function TimelinePageTemplateUI( if (timeline != null) { let timelineBody: React.ReactElement; if (postListState != null) { + if (postListState.posts == null) { + throw new UiLogicError( + "Timeline is not null but postListState is 'timeline-notexist or 'timeline-offline'." + ); + } if (postListState.state === 'forbid') { timelineBody = (

{t('timeline.messageCantSee')}

-- cgit v1.2.3