diff options
author | crupest <crupest@outlook.com> | 2023-09-20 20:26:42 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-09-20 20:26:42 +0800 |
commit | f836d77e73f3ea0af45c5f71dae7268143d6d86f (patch) | |
tree | 573cfafd972106d69bef0d41ff5f270ec3c43ec2 /FrontEnd/src/services/timeline.ts | |
parent | 4a069bf1268f393d5467166356f691eb89963152 (diff) | |
parent | 901fe3d7c032d284da5c9bce24c4aaee9054c7ac (diff) | |
download | timeline-f836d77e73f3ea0af45c5f71dae7268143d6d86f.tar.gz timeline-f836d77e73f3ea0af45c5f71dae7268143d6d86f.tar.bz2 timeline-f836d77e73f3ea0af45c5f71dae7268143d6d86f.zip |
Merge pull request #1395 from crupest/dev
Refector 2023 v0.1
Diffstat (limited to 'FrontEnd/src/services/timeline.ts')
-rw-r--r-- | FrontEnd/src/services/timeline.ts | 199 |
1 files changed, 150 insertions, 49 deletions
diff --git a/FrontEnd/src/services/timeline.ts b/FrontEnd/src/services/timeline.ts index 707c956f..41a7bff0 100644 --- a/FrontEnd/src/services/timeline.ts +++ b/FrontEnd/src/services/timeline.ts @@ -1,9 +1,22 @@ -import { TimelineVisibility } from "@/http/timeline"; import XRegExp from "xregexp"; -import { Observable } from "rxjs"; -import { HubConnectionBuilder, HubConnectionState } from "@microsoft/signalr"; - -import { getHttpToken } from "@/http/common"; +import { + Observable, + BehaviorSubject, + switchMap, + filter, + first, + distinctUntilChanged, +} from "rxjs"; +import { + HubConnection, + HubConnectionBuilder, + HubConnectionState, +} from "@microsoft/signalr"; + +import { TimelineVisibility } from "~src/http/timeline"; +import { token$ } from "~src/http/common"; + +// cSpell:ignore onreconnected onreconnecting const timelineNameReg = XRegExp("^[-_\\p{L}]*$", "u"); @@ -20,17 +33,23 @@ export const timelineVisibilityTooltipTranslationMap: Record< Private: "timeline.visibilityTooltip.private", }; -export function getTimelinePostUpdate$( - owner: string, - timeline: string, -): Observable<{ update: boolean; state: HubConnectionState }> { - return new Observable((subscriber) => { - subscriber.next({ - update: false, - state: HubConnectionState.Connecting, - }); +type ConnectionState = + | "Connecting" + | "Reconnecting" + | "Disconnected" + | "Connected"; + +type Connection = { + connection: HubConnection; + state$: Observable<ConnectionState>; +}; + +function createConnection$(token: string | null): Observable<Connection> { + return new Observable<Connection>((subscriber) => { + const connectionStateSubject = new BehaviorSubject<ConnectionState>( + "Connecting", + ); - const token = getHttpToken(); const connection = new HubConnectionBuilder() .withUrl("/api/hub/timeline", { accessTokenFactory: token == null ? undefined : () => token, @@ -38,56 +57,138 @@ export function getTimelinePostUpdate$( .withAutomaticReconnect() .build(); - const o = owner; - const t = timeline; + connection.onclose = () => { + connectionStateSubject.next("Disconnected"); + }; + + connection.onreconnecting = () => { + connectionStateSubject.next("Reconnecting"); + }; + + connection.onreconnected = () => { + connectionStateSubject.next("Connected"); + }; + + let requestStopped = false; + + void connection.start().then( + () => { + connectionStateSubject.next("Connected"); + }, + (e) => { + if (!requestStopped) { + throw e; + } + }, + ); + + subscriber.next({ + connection, + state$: connectionStateSubject.asObservable(), + }); + + return () => { + void connection.stop(); + requestStopped = true; + }; + }); +} + +const connectionSubject = new BehaviorSubject<Connection | null>(null); + +token$ + .pipe(distinctUntilChanged(), switchMap(createConnection$)) + .subscribe(connectionSubject); + +const connection$ = connectionSubject + .asObservable() + .pipe(filter((c): c is Connection => c != null)); + +function createTimelinePostUpdateCount$( + connection: Connection, + owner: string, + timeline: string, +): Observable<number> { + const [o, t] = [owner, timeline]; + return new Observable<number>((subscriber) => { + const hubConnection = connection.connection; + let count = 0; const handler = (owner: string, timeline: string): void => { if (owner === o && timeline === t) { - subscriber.next({ update: true, state: connection.state }); + subscriber.next(count++); } }; - connection.onclose(() => { - subscriber.next({ - update: false, - state: HubConnectionState.Disconnected, + let hubOn = false; + + const subscription = connection.state$ + .pipe(first((state) => state === "Connected")) + .subscribe(() => { + hubConnection.on("OnTimelinePostChangedV2", handler); + void hubConnection.invoke( + "SubscribeTimelinePostChangeV2", + owner, + timeline, + ); + hubOn = true; }); - }); - connection.onreconnecting(() => { - subscriber.next({ - update: false, - state: HubConnectionState.Reconnecting, - }); - }); + return () => { + if (hubOn) { + void hubConnection.invoke( + "UnsubscribeTimelinePostChangeV2", + owner, + timeline, + ); + hubConnection.off("OnTimelinePostChangedV2", handler); + } + + subscription.unsubscribe(); + }; + }); +} + +type OldUpdateInfo = { update: boolean; state: HubConnectionState }; - connection.onreconnected(() => { +function createTimelinePostOldUpdateInfo$( + connection: Connection, + owner: string, + timeline: string, +): Observable<OldUpdateInfo> { + return new Observable<OldUpdateInfo>((subscriber) => { + let savedState: ConnectionState = "Connecting"; + + const postUpdateSubscription = createTimelinePostUpdateCount$( + connection, + owner, + timeline, + ).subscribe(() => { subscriber.next({ - update: false, - state: HubConnectionState.Connected, + update: true, + state: savedState as HubConnectionState, }); }); - connection.on("OnTimelinePostChangedV2", handler); - - void connection.start().then(() => { - subscriber.next({ update: false, state: HubConnectionState.Connected }); - - return connection.invoke( - "SubscribeTimelinePostChangeV2", - owner, - timeline, - ); + const stateSubscription = connection.state$.subscribe((state) => { + savedState = state; + subscriber.next({ update: false, state: state as HubConnectionState }); }); return () => { - connection.off("OnTimelinePostChangedV2", handler); - - if (connection.state === HubConnectionState.Connected) { - void connection - .invoke("UnsubscribeTimelinePostChangeV2", owner, timeline) - .then(() => connection.stop()); - } + stateSubscription.unsubscribe(); + postUpdateSubscription.unsubscribe(); }; }); } + +export function getTimelinePostUpdate$( + owner: string, + timeline: string, +): Observable<OldUpdateInfo> { + return connection$.pipe( + switchMap((connection) => + createTimelinePostOldUpdateInfo$(connection, owner, timeline), + ), + ); +} |