Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/server/src/DirectConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export class DirectConnection implements DirectConnectionInterface {

transaction(this.document)

await this.instance.storeDocumentHooks(this.document, {
await this.instance.onStoreDocument(this.document, {
clientsCount: this.document.getConnectionsCount(),
context: this.context,
document: this.document,
Expand All @@ -50,7 +50,7 @@ export class DirectConnection implements DirectConnectionInterface {

this.document?.removeDirectConnection()

await this.instance.storeDocumentHooks(this.document, {
await this.instance.onStoreDocument(this.document, {
clientsCount: this.document.getConnectionsCount(),
context: this.context,
document: this.document,
Expand Down
61 changes: 40 additions & 21 deletions packages/server/src/Hocuspocus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
AwarenessUpdate,
Configuration,
ConnectionConfiguration,
Extension,
HookName,
HookPayloadByName,
beforeBroadcastStatelessPayload,
Expand All @@ -43,6 +44,8 @@ export const defaultConfiguration = {
gcFilter: () => true,
},
unloadImmediately: true,
storageQueue: 'default',
storageQueues: { default: {} },
stopOnSignals: true,
}

Expand Down Expand Up @@ -92,6 +95,11 @@ export class Hocuspocus {
this.configuration = {
...this.configuration,
...configuration,
storageQueues: {
default: {},
...this.configuration.storageQueues,
...configuration.storageQueues,
},
}

this.configuration.extensions.sort((a, b) => {
Expand Down Expand Up @@ -129,6 +137,14 @@ export class Hocuspocus {
afterUnloadDocument: this.configuration.afterUnloadDocument,
onDisconnect: this.configuration.onDisconnect,
onDestroy: this.configuration.onDestroy,
storageQueue: this.configuration.storageQueue,
})

// create storage queues that are referenced by extensions but not pre-defined at the top level
this.configuration.extensions.forEach(extension => {
if (extension.storageQueue && !this.configuration.storageQueues[extension.storageQueue]) {
this.configuration.storageQueues[extension.storageQueue] = {}
}
})

this.hooks('onConfigure', {
Expand Down Expand Up @@ -354,14 +370,7 @@ export class Hocuspocus {
// Only run this if the document has finished loading earlier (i.e. not to persist the empty
// ydoc if the onLoadDocument hook returned an error)
if (!document.isLoading) {
this.debounce(
`onStoreDocument-${document.name}`,
() => {
this.storeDocumentHooks(document, hookPayload)
},
this.configuration.unloadImmediately ? 0 : this.configuration.debounce,
this.configuration.maxDebounce,
)
this.onStoreDocument(document, hookPayload, this.configuration.unloadImmediately)
} else {
// Remove document from memory immediately
this.unloadDocument(document)
Expand Down Expand Up @@ -402,14 +411,7 @@ export class Hocuspocus {
return
}

this.debounce(
`onStoreDocument-${document.name}`,
() => {
this.storeDocumentHooks(document, hookPayload)
},
this.configuration.debounce,
this.configuration.maxDebounce,
)
this.onStoreDocument(document, hookPayload)
}

/**
Expand Down Expand Up @@ -485,10 +487,27 @@ export class Hocuspocus {
return document
}

storeDocumentHooks(document: Document, hookPayload: onStoreDocumentPayload) {
return this.hooks('onStoreDocument', hookPayload)
onStoreDocument(document: Document, hookPayload: onStoreDocumentPayload, unloadImmediately = false) {
const promises = Object.entries(this.configuration.storageQueues).map(([queue, { debounce = this.configuration.debounce, maxDebounce = this.configuration.maxDebounce }]) => {
return this.debounce(
`onStoreDocument-${queue}-${document.name}`,
() => this.storeDocumentHooks(document, hookPayload, queue),
unloadImmediately ? 0 : debounce,
maxDebounce,
)
})
return Promise.all(promises)
}

storeDocumentHooks(document: Document, hookPayload: onStoreDocumentPayload, queue = 'default') {
const filter = (extension: Extension) => {
return (
extension.storageQueue === queue || (extension.storageQueue === undefined && queue === 'default')
)
}
return this.hooks('onStoreDocument', hookPayload, null, filter)
.then(() => {
this.hooks('afterStoreDocument', hookPayload).then(() => {
this.hooks('afterStoreDocument', hookPayload, null, filter).then(() => {
// Remove document from memory.

if (document.getConnectionsCount() > 0) {
Expand All @@ -511,7 +530,7 @@ export class Hocuspocus {
* Run the given hook on all configured extensions.
* Runs the given callback after each hook.
*/
hooks<T extends HookName>(name: T, payload: HookPayloadByName[T], callback: Function | null = null): Promise<any> {
hooks<T extends HookName>(name: T, payload: HookPayloadByName[T], callback: Function | null = null, filter?: (extension: Extension) => boolean): Promise<any> {
const { extensions } = this.configuration

// create a new `thenable` chain
Expand All @@ -520,7 +539,7 @@ export class Hocuspocus {

extensions
// get me all extensions which have the given hook
.filter(extension => typeof extension[name] === 'function')
.filter(extension => typeof extension[name] === 'function' && (filter?.(extension) ?? true))
// run through all the configured hooks
.forEach(extension => {
chain = chain
Expand Down
125 changes: 68 additions & 57 deletions packages/server/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import {
IncomingHttpHeaders, IncomingMessage, ServerResponse,
} from 'http'
import { IncomingHttpHeaders, IncomingMessage, ServerResponse } from 'http'
import { URLSearchParams } from 'url'
import { Awareness } from 'y-protocols/awareness'
import Connection from './Connection.js'
Expand All @@ -27,55 +25,56 @@ export interface AwarenessUpdate {
}

export interface ConnectionConfiguration {
readOnly: boolean
requiresAuthentication: boolean
isAuthenticated: boolean
readOnly: boolean,
requiresAuthentication: boolean,
isAuthenticated: boolean,
}

export interface Extension {
priority?: number;
extensionName?: string;
onConfigure?(data: onConfigurePayload): Promise<any>;
onListen?(data: onListenPayload): Promise<any>;
onUpgrade?(data: onUpgradePayload): Promise<any>;
onConnect?(data: onConnectPayload): Promise<any>;
connected?(data: connectedPayload): Promise<any>;
onAuthenticate?(data: onAuthenticatePayload): Promise<any>;
onLoadDocument?(data: onLoadDocumentPayload): Promise<any>;
afterLoadDocument?(data: afterLoadDocumentPayload): Promise<any>;
beforeHandleMessage?(data: beforeHandleMessagePayload): Promise<any>;
beforeBroadcastStateless?(data: beforeBroadcastStatelessPayload): Promise<any>;
onStateless?(payload: onStatelessPayload): Promise<any>;
onChange?(data: onChangePayload): Promise<any>;
onStoreDocument?(data: onStoreDocumentPayload): Promise<any>;
afterStoreDocument?(data: afterStoreDocumentPayload): Promise<any>;
onAwarenessUpdate?(data: onAwarenessUpdatePayload): Promise<any>;
onRequest?(data: onRequestPayload): Promise<any>;
onDisconnect?(data: onDisconnectPayload): Promise<any>;
afterUnloadDocument?(data: afterUnloadDocumentPayload): Promise<any>;
onDestroy?(data: onDestroyPayload): Promise<any>;
priority?: number,
extensionName?: string,
storageQueue?: string,
onConfigure?(data: onConfigurePayload): Promise<any>,
onListen?(data: onListenPayload): Promise<any>,
onUpgrade?(data: onUpgradePayload): Promise<any>,
onConnect?(data: onConnectPayload): Promise<any>,
connected?(data: connectedPayload): Promise<any>,
onAuthenticate?(data: onAuthenticatePayload): Promise<any>,
onLoadDocument?(data: onLoadDocumentPayload): Promise<any>,
afterLoadDocument?(data: afterLoadDocumentPayload): Promise<any>,
beforeHandleMessage?(data: beforeHandleMessagePayload): Promise<any>,
beforeBroadcastStateless?(data: beforeBroadcastStatelessPayload): Promise<any>,
onStateless?(payload: onStatelessPayload): Promise<any>,
onChange?(data: onChangePayload): Promise<any>,
onStoreDocument?(data: onStoreDocumentPayload): Promise<any>,
afterStoreDocument?(data: afterStoreDocumentPayload): Promise<any>,
onAwarenessUpdate?(data: onAwarenessUpdatePayload): Promise<any>,
onRequest?(data: onRequestPayload): Promise<any>,
onDisconnect?(data: onDisconnectPayload): Promise<any>,
afterUnloadDocument?(data: afterUnloadDocumentPayload): Promise<any>,
onDestroy?(data: onDestroyPayload): Promise<any>,
}

export type HookName =
'onConfigure' |
'onListen' |
'onUpgrade' |
'onConnect' |
'connected' |
'onAuthenticate' |
'onLoadDocument' |
'afterLoadDocument' |
'beforeHandleMessage' |
'beforeBroadcastStateless' |
'onStateless' |
'onChange' |
'onStoreDocument' |
'afterStoreDocument' |
'onAwarenessUpdate' |
'onRequest' |
'onDisconnect' |
'afterUnloadDocument' |
'onDestroy'
| 'onConfigure'
| 'onListen'
| 'onUpgrade'
| 'onConnect'
| 'connected'
| 'onAuthenticate'
| 'onLoadDocument'
| 'afterLoadDocument'
| 'beforeHandleMessage'
| 'beforeBroadcastStateless'
| 'onStateless'
| 'onChange'
| 'onStoreDocument'
| 'afterStoreDocument'
| 'onAwarenessUpdate'
| 'onRequest'
| 'onDisconnect'
| 'afterUnloadDocument'
| 'onDestroy'

export type HookPayloadByName = {
onConfigure: onConfigurePayload,
Expand All @@ -98,6 +97,13 @@ export type HookPayloadByName = {
afterUnloadDocument: afterUnloadDocumentPayload,
onDestroy: onDestroyPayload,
}

export type StorageQueueConfigs = {
[key: string]: {
debounce?: number,
maxDebounce?: number,
},
}
export interface Configuration extends Extension {
/**
* A name for the instance, used for logging.
Expand Down Expand Up @@ -127,7 +133,7 @@ export interface Configuration extends Extension {
/**
* Makes sure to call `onStoreDocument` at least in the given amount of time (ms).
*/
maxDebounce: number
maxDebounce: number,
/**
* By default, the servers show a start screen. If passed false, the server will start quietly.
*/
Expand All @@ -153,9 +159,14 @@ export interface Configuration extends Extension {
*/
yDocOptions: {
gc: boolean, // enable or disable garbage collection (see https://github.com/yjs/yjs/blob/main/INTERNALS.md#deletions)
gcFilter: () => boolean, // will be called before garbage collecting ; return false to keep it
gcFilter: () => boolean, // will be called before garbage collecting , return false to keep it
},

/**
* Define specific debounce settings for each storage queue, allowing multiple extensions to store
* documents in different locations in parallel at different rates.
*/
storageQueues: StorageQueueConfigs,
}

export interface onStatelessPayload {
Expand Down Expand Up @@ -188,7 +199,7 @@ export interface onConnectPayload {
requestHeaders: IncomingHttpHeaders,
requestParameters: URLSearchParams,
socketId: string,
connection: ConnectionConfiguration
connection: ConnectionConfiguration,
}

// @todo Change 'connection' to 'connectionConfig', and 'connectionInstance' to 'connection' in next major release
Expand All @@ -202,7 +213,7 @@ export interface connectedPayload {
requestParameters: URLSearchParams,
socketId: string,
connection: ConnectionConfiguration,
connectionInstance: Connection
connectionInstance: Connection,
}

// @todo Change 'connection' to 'connectionConfig' in next major release
Expand All @@ -215,7 +226,7 @@ export interface onLoadDocumentPayload {
requestHeaders: IncomingHttpHeaders,
requestParameters: URLSearchParams,
socketId: string,
connection: ConnectionConfiguration
connection: ConnectionConfiguration,
}

// @todo Change 'connection' to 'connectionConfig' in next major release
Expand All @@ -228,7 +239,7 @@ export interface afterLoadDocumentPayload {
requestHeaders: IncomingHttpHeaders,
requestParameters: URLSearchParams,
socketId: string,
connection: ConnectionConfiguration
connection: ConnectionConfiguration,
}

export interface onChangePayload {
Expand All @@ -254,7 +265,7 @@ export interface beforeHandleMessagePayload {
requestParameters: URLSearchParams,
update: Uint8Array,
socketId: string,
connection: Connection
connection: Connection,
}

export interface beforeBroadcastStatelessPayload {
Expand Down Expand Up @@ -304,7 +315,7 @@ export interface fetchPayload {
requestHeaders: IncomingHttpHeaders,
requestParameters: URLSearchParams,
socketId: string,
connection: ConnectionConfiguration
connection: ConnectionConfiguration,
}

export interface storePayload extends onStoreDocumentPayload {
Expand Down Expand Up @@ -352,11 +363,11 @@ export interface onConfigurePayload {
}

export interface afterUnloadDocumentPayload {
instance: Hocuspocus;
documentName: string;
instance: Hocuspocus,
documentName: string,
}

export interface DirectConnection {
transact(transaction: (document: Document) => void): Promise<void>,
disconnect(): void
disconnect(): void,
}
Loading