aboutsummaryrefslogtreecommitdiff
path: root/FrontEnd/src
diff options
context:
space:
mode:
Diffstat (limited to 'FrontEnd/src')
-rw-r--r--FrontEnd/src/services/timeline.ts162
1 files changed, 114 insertions, 48 deletions
diff --git a/FrontEnd/src/services/timeline.ts b/FrontEnd/src/services/timeline.ts
index 707c956f..d695b481 100644
--- a/FrontEnd/src/services/timeline.ts
+++ b/FrontEnd/src/services/timeline.ts
@@ -1,9 +1,15 @@
-import { TimelineVisibility } from "@/http/timeline";
import XRegExp from "xregexp";
-import { Observable } from "rxjs";
-import { HubConnectionBuilder, HubConnectionState } from "@microsoft/signalr";
+import { Observable, BehaviorSubject, switchMap, filter } from "rxjs";
+import {
+ HubConnection,
+ HubConnectionBuilder,
+ HubConnectionState,
+} from "@microsoft/signalr";
+
+import { TimelineVisibility } from "@/http/timeline";
+import { token$ } from "@/http/common";
-import { getHttpToken } from "@/http/common";
+// cSpell:ignore onreconnected onreconnecting
const timelineNameReg = XRegExp("^[-_\\p{L}]*$", "u");
@@ -20,17 +26,23 @@ export const timelineVisibilityTooltipTranslationMap: Record<
Private: "timeline.visibilityTooltip.private",
};
-export function getTimelinePostUpdate$(
- owner: string,
- timeline: string,
-): Observable<{ update: boolean; state: HubConnectionState }> {
- return new Observable((subscriber) => {
- subscriber.next({
- update: false,
- state: HubConnectionState.Connecting,
- });
+type ConnectionState =
+ | "Connecting"
+ | "Reconnecting"
+ | "Disconnected"
+ | "Connected";
+
+type Connection = {
+ connection: HubConnection;
+ state$: Observable<ConnectionState>;
+};
+
+function createConnection$(token: string | null): Observable<Connection> {
+ return new Observable<Connection>((subscriber) => {
+ const connectionStateSubject = new BehaviorSubject<ConnectionState>(
+ "Connecting",
+ );
- const token = getHttpToken();
const connection = new HubConnectionBuilder()
.withUrl("/api/hub/timeline", {
accessTokenFactory: token == null ? undefined : () => token,
@@ -38,56 +50,110 @@ export function getTimelinePostUpdate$(
.withAutomaticReconnect()
.build();
- const o = owner;
- const t = timeline;
+ connection.onclose = () => {
+ connectionStateSubject.next("Disconnected");
+ };
- const handler = (owner: string, timeline: string): void => {
- if (owner === o && timeline === t) {
- subscriber.next({ update: true, state: connection.state });
- }
+ connection.onreconnecting = () => {
+ connectionStateSubject.next("Reconnecting");
};
- connection.onclose(() => {
- subscriber.next({
- update: false,
- state: HubConnectionState.Disconnected,
- });
- });
+ connection.onreconnected = () => {
+ connectionStateSubject.next("Connected");
+ };
- connection.onreconnecting(() => {
- subscriber.next({
- update: false,
- state: HubConnectionState.Reconnecting,
- });
+ void connection.start().then(() => {
+ connectionStateSubject.next("Connected");
});
- connection.onreconnected(() => {
- subscriber.next({
- update: false,
- state: HubConnectionState.Connected,
- });
+ subscriber.next({
+ connection,
+ state$: connectionStateSubject.asObservable(),
});
- connection.on("OnTimelinePostChangedV2", handler);
+ return () => {
+ void connection.stop();
+ };
+ });
+}
- void connection.start().then(() => {
- subscriber.next({ update: false, state: HubConnectionState.Connected });
+const connectionSubject = new BehaviorSubject<Connection | null>(null);
+
+token$.pipe(switchMap(createConnection$)).subscribe(connectionSubject);
- return connection.invoke(
- "SubscribeTimelinePostChangeV2",
+const connection$ = connectionSubject
+ .asObservable()
+ .pipe(filter((c): c is Connection => c != null));
+
+function createTimelinePostUpdateCount$(
+ connection: HubConnection,
+ owner: string,
+ timeline: string,
+): Observable<number> {
+ const [o, t] = [owner, timeline];
+ return new Observable<number>((subscriber) => {
+ let count = 0;
+
+ const handler = (owner: string, timeline: string): void => {
+ if (owner === o && timeline === t) {
+ subscriber.next(count++);
+ }
+ };
+
+ connection.on("OnTimelinePostChangedV2", handler);
+ void connection.invoke("SubscribeTimelinePostChangeV2", owner, timeline);
+
+ return () => {
+ void connection.invoke(
+ "UnsubscribeTimelinePostChangeV2",
owner,
timeline,
);
+ connection.off("OnTimelinePostChangedV2", handler);
+ };
+ });
+}
+
+type OldUpdateInfo = { update: boolean; state: HubConnectionState };
+
+function createTimelinePostOldUpdateInfo$(
+ connection: Connection,
+ owner: string,
+ timeline: string,
+): Observable<OldUpdateInfo> {
+ return new Observable<OldUpdateInfo>((subscriber) => {
+ let savedState: ConnectionState = "Connecting";
+
+ const postUpdateSubscription = createTimelinePostUpdateCount$(
+ connection.connection,
+ owner,
+ timeline,
+ ).subscribe(() => {
+ subscriber.next({
+ update: true,
+ state: savedState as HubConnectionState,
+ });
});
- return () => {
- connection.off("OnTimelinePostChangedV2", handler);
+ const stateSubscription = connection.state$.subscribe((state) => {
+ savedState = state;
+ subscriber.next({ update: false, state: state as HubConnectionState });
+ });
- if (connection.state === HubConnectionState.Connected) {
- void connection
- .invoke("UnsubscribeTimelinePostChangeV2", owner, timeline)
- .then(() => connection.stop());
- }
+ return () => {
+ stateSubscription.unsubscribe();
+ postUpdateSubscription.unsubscribe();
};
});
}
+
+export function getTimelinePostUpdate$(
+ owner: string,
+ timeline: string,
+): Observable<OldUpdateInfo> {
+ return connection$.pipe(
+ switchMap((connection) =>
+ createTimelinePostOldUpdateInfo$(connection, owner, timeline),
+ ),
+ );
+}