Skip to content

Commit b6162ac

Browse files
authored
feat: process update message in the order that they are received (#1058)
* feat: process update message in the order that they are received * feat: beforeSync can be made async again, as the server now processes messages in order
1 parent 5518239 commit b6162ac

File tree

5 files changed

+64
-46
lines changed

5 files changed

+64
-46
lines changed

packages/extension-redis/src/Redis.ts

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -369,16 +369,13 @@ export class Redis implements Extension {
369369
return;
370370
}
371371

372-
new MessageReceiver(message, this.redisTransactionOrigin).apply(
373-
document,
374-
undefined,
375-
(reply) => {
376-
return this.pub.publish(
377-
this.pubKey(document.name),
378-
this.encodeMessage(reply),
379-
);
380-
},
381-
);
372+
const receiver = new MessageReceiver(message, this.redisTransactionOrigin);
373+
await receiver.apply(document, undefined, (reply) => {
374+
return this.pub.publish(
375+
this.pubKey(document.name),
376+
this.encodeMessage(reply),
377+
);
378+
});
382379
};
383380

384381
/**

packages/server/src/ClientConnection.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,12 @@ export class ClientConnection<Context = any> {
164164
);
165165

166166
instance.onClose(async (document, event) => {
167+
// Wait for any pending message processing to complete before running
168+
// disconnect hooks. This ensures that document updates from queued messages
169+
// are applied (and their debounced onStoreDocument scheduled) before the
170+
// disconnect handler checks whether to call executeNow.
171+
await instance.waitForPendingMessages();
172+
167173
const disconnectHookPayload: onDisconnectPayload = {
168174
instance: this.documentProvider as Hocuspocus, // TODO, this will be removed when we use events instead of hooks for this class
169175
clientsCount: document.getConnectionsCount(),

packages/server/src/Connection.ts

Lines changed: 44 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,7 @@ import type Document from "./Document.ts";
99
import { IncomingMessage } from "./IncomingMessage.ts";
1010
import { MessageReceiver } from "./MessageReceiver.ts";
1111
import { OutgoingMessage } from "./OutgoingMessage.ts";
12-
import type {
13-
beforeSyncPayload,
14-
onStatelessPayload,
15-
} from "./types.ts";
12+
import type { beforeSyncPayload, onStatelessPayload } from "./types.ts";
1613

1714
export class Connection<Context = any> {
1815
webSocket: WebSocket;
@@ -39,6 +36,10 @@ export class Connection<Context = any> {
3936

4037
readOnly: boolean;
4138

39+
private messageQueue: Uint8Array[] = [];
40+
41+
private processingPromise: Promise<void> = Promise.resolve();
42+
4243
/**
4344
* Constructor.
4445
*/
@@ -121,6 +122,13 @@ export class Connection<Context = any> {
121122
return this;
122123
}
123124

125+
/**
126+
* Returns a promise that resolves when all queued messages have been processed.
127+
*/
128+
waitForPendingMessages(): Promise<void> {
129+
return this.processingPromise;
130+
}
131+
124132
/**
125133
* Send the given message
126134
*/
@@ -204,30 +212,34 @@ export class Connection<Context = any> {
204212
* @public
205213
*/
206214
public handleMessage(data: Uint8Array): void {
207-
const message = new IncomingMessage(data);
208-
const documentName = message.readVarString();
209-
210-
if (documentName !== this.document.name) return;
211-
212-
message.writeVarString(documentName);
213-
214-
this.callbacks
215-
.beforeHandleMessage(this, data)
216-
.then(() => {
217-
try {
218-
new MessageReceiver(message).apply(this.document, this);
219-
} catch (e: any) {
220-
console.error(
221-
`closing connection ${this.socketId} (while handling ${documentName}) because of exception`,
222-
e,
223-
);
224-
this.close({
225-
code: "code" in e ? e.code : ResetConnection.code,
226-
reason: "reason" in e ? e.reason : ResetConnection.reason,
227-
});
228-
}
229-
})
230-
.catch((e: any) => {
215+
this.messageQueue.push(data);
216+
217+
if (this.messageQueue.length === 1) {
218+
this.processingPromise = this.processMessages();
219+
}
220+
}
221+
222+
private async processMessages() {
223+
while (this.messageQueue.length > 0) {
224+
const rawUpdate = this.messageQueue.at(0) as Uint8Array;
225+
226+
const message = new IncomingMessage(rawUpdate);
227+
const documentName = message.readVarString();
228+
229+
if (documentName !== this.document.name) {
230+
this.messageQueue.shift();
231+
continue;
232+
}
233+
234+
message.writeVarString(documentName);
235+
236+
try {
237+
await this.callbacks.beforeHandleMessage(this, rawUpdate);
238+
const receiver = new MessageReceiver(message);
239+
240+
await receiver.apply(this.document, this);
241+
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
242+
} catch (e: any) {
231243
console.error(
232244
`closing connection ${this.socketId} (while handling ${documentName}) because of exception`,
233245
e,
@@ -236,7 +248,10 @@ export class Connection<Context = any> {
236248
code: "code" in e ? e.code : ResetConnection.code,
237249
reason: "reason" in e ? e.reason : ResetConnection.reason,
238250
});
239-
});
251+
}
252+
253+
this.messageQueue.shift();
254+
}
240255
}
241256
}
242257

packages/server/src/IncomingMessage.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
import type { Decoder } from "lib0/decoding";
22
import {
33
createDecoder,
4+
readVarString,
45
readVarUint,
56
readVarUint8Array,
6-
readVarString,
77
} from "lib0/decoding";
88
import type { Encoder } from "lib0/encoding";
99
import {
1010
createEncoder,
11+
length,
1112
toUint8Array,
12-
writeVarUint,
1313
writeVarString,
14-
length,
14+
writeVarUint,
1515
} from "lib0/encoding";
1616
import type { MessageType } from "./types.ts";
1717

packages/server/src/MessageReceiver.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ export class MessageReceiver {
2727
this.defaultTransactionOrigin = defaultTransactionOrigin;
2828
}
2929

30-
public apply(
30+
public async apply(
3131
document: Document,
3232
connection?: Connection,
3333
reply?: (message: Uint8Array) => void,
@@ -40,7 +40,7 @@ export class MessageReceiver {
4040
case MessageType.Sync:
4141
case MessageType.SyncReply: {
4242
message.writeVarUint(MessageType.Sync);
43-
this.readSyncMessage(
43+
await this.readSyncMessage(
4444
message,
4545
document,
4646
connection,
@@ -126,7 +126,7 @@ export class MessageReceiver {
126126
}
127127
}
128128

129-
readSyncMessage(
129+
async readSyncMessage(
130130
message: IncomingMessage,
131131
document: Document,
132132
connection?: Connection,
@@ -136,7 +136,7 @@ export class MessageReceiver {
136136
const type = message.readVarUint();
137137

138138
if (connection) {
139-
connection.callbacks.beforeSync(connection, {
139+
await connection.callbacks.beforeSync(connection, {
140140
type,
141141
payload: message.peekVarUint8Array(),
142142
});

0 commit comments

Comments
 (0)