Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
"libsignal": "git+https://github.com/whiskeysockets/libsignal-node",
"lru-cache": "^11.1.0",
"music-metadata": "^11.7.0",
"p-queue": "^9.0.0",
"pino": "^9.6",
"protobufjs": "^7.2.4",
"ws": "^8.13.0"
Expand Down
84 changes: 38 additions & 46 deletions src/Utils/auth-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import NodeCache from '@cacheable/node-cache'
import { AsyncLocalStorage } from 'async_hooks'
import { Mutex } from 'async-mutex'
import { randomBytes } from 'crypto'
import PQueue from 'p-queue'
import { DEFAULT_CACHE_TTLS } from '../Defaults'
import type {
AuthenticationCreds,
Expand Down Expand Up @@ -46,57 +45,50 @@ export function makeCacheableSignalKeyStore(
deleteOnExpire: true
})

// Mutex for protecting cache operations
const cacheMutex = new Mutex()

function getUniqueId(type: string, id: string) {
return `${type}.${id}`
}

return {
async get(type, ids) {
return cacheMutex.runExclusive(async () => {
const data: { [_: string]: SignalDataTypeMap[typeof type] } = {}
const idsToFetch: string[] = []
const data: { [_: string]: SignalDataTypeMap[typeof type] } = {}
const idsToFetch: string[] = []

for (const id of ids) {
const item = (await cache.get<SignalDataTypeMap[typeof type]>(getUniqueId(type, id))) as any
if (typeof item !== 'undefined') {
data[id] = item
} else {
idsToFetch.push(id)
}
for (const id of ids) {
const item = (await cache.get<SignalDataTypeMap[typeof type]>(getUniqueId(type, id))) as any
if (typeof item !== 'undefined') {
data[id] = item
} else {
idsToFetch.push(id)
}
}

if (idsToFetch.length) {
logger?.trace({ items: idsToFetch.length }, 'loading from store')
const fetched = await store.get(type, idsToFetch)
for (const id of idsToFetch) {
const item = fetched[id]
if (item) {
data[id] = item
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
await cache.set(getUniqueId(type, id), item as SignalDataTypeMap[keyof SignalDataTypeMap])
}
if (idsToFetch.length) {
logger?.trace({ items: idsToFetch.length }, 'loading from store')
const fetched = await store.get(type, idsToFetch)
for (const id of idsToFetch) {
const item = fetched[id]
if (item) {
data[id] = item
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
await cache.set(getUniqueId(type, id), item as SignalDataTypeMap[keyof SignalDataTypeMap])
}
}
}

return data
})
return data
},
async set(data) {
return cacheMutex.runExclusive(async () => {
let keys = 0
for (const type in data) {
for (const id in data[type as keyof SignalDataTypeMap]) {
await cache.set(getUniqueId(type, id), data[type as keyof SignalDataTypeMap]![id]!)
keys += 1
}
let keys = 0
for (const type in data) {
for (const id in data[type as keyof SignalDataTypeMap]) {
await cache.set(getUniqueId(type, id), data[type as keyof SignalDataTypeMap]![id]!)
keys += 1
}
}

logger?.trace({ keys }, 'updated cache')
await store.set(data)
})
logger?.trace({ keys }, 'updated cache')
await store.set(data)
},
async clear() {
await cache.flushAll()
Expand All @@ -119,8 +111,8 @@ export const addTransactionCapability = (
): SignalKeyStoreWithTransaction => {
const txStorage = new AsyncLocalStorage<TransactionContext>()

// Queues for concurrency control (keyed by signal data type - bounded set)
const keyQueues = new Map<string, PQueue>()
// Mutexes for concurrency control (keyed by signal data type - bounded set)
const keyMutexes = new Map<string, Mutex>()

// Transaction mutexes with reference counting for cleanup
const txMutexes = new Map<string, Mutex>()
Expand All @@ -130,14 +122,14 @@ export const addTransactionCapability = (
const preKeyManager = new PreKeyManager(state, logger)

/**
* Get or create a queue for a specific key type
* Get or create a mutex for a specific key type
*/
function getQueue(key: string): PQueue {
if (!keyQueues.has(key)) {
keyQueues.set(key, new PQueue({ concurrency: 1 }))
function getKeyMutex(key: string): Mutex {
if (!keyMutexes.has(key)) {
keyMutexes.set(key, new Mutex())
}

return keyQueues.get(key)!
return keyMutexes.get(key)!
}

/**
Expand Down Expand Up @@ -253,7 +245,7 @@ export const addTransactionCapability = (
const ctx = txStorage.getStore()

if (!ctx) {
// No transaction - direct write with queue protection
// No transaction - direct write with mutex protection
const types = Object.keys(data)

// Process pre-keys with validation
Expand All @@ -264,10 +256,10 @@ export const addTransactionCapability = (
}
}

// Write all data in parallel
// Write all data in parallel (each type protected by its own mutex)
await Promise.all(
types.map(type =>
getQueue(type).add(async () => {
getKeyMutex(type).runExclusive(async () => {
const typeData = { [type]: data[type as keyof SignalDataTypeMap] } as SignalDataSet
await state.set(typeData)
})
Expand Down
18 changes: 9 additions & 9 deletions src/Utils/pre-key-manager.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
import PQueue from 'p-queue'
import { Mutex } from 'async-mutex'
import type { SignalDataSet, SignalDataTypeMap, SignalKeyStore } from '../Types'
import type { ILogger } from './logger'

/**
* Manages pre-key operations with proper concurrency control
*/
export class PreKeyManager {
private readonly queues = new Map<string, PQueue>()
private readonly mutexes = new Map<string, Mutex>()

constructor(
private readonly store: SignalKeyStore,
private readonly logger: ILogger
) {}

/**
* Get or create a queue for a specific key type
* Get or create a mutex for a specific key type
*/
private getQueue(keyType: string): PQueue {
if (!this.queues.has(keyType)) {
this.queues.set(keyType, new PQueue({ concurrency: 1 }))
private getMutex(keyType: string): Mutex {
if (!this.mutexes.has(keyType)) {
this.mutexes.set(keyType, new Mutex())
}

return this.queues.get(keyType)!
return this.mutexes.get(keyType)!
}

/**
Expand All @@ -37,7 +37,7 @@ export class PreKeyManager {
const keyData = data[keyType]
if (!keyData) return

return this.getQueue(keyType).add(async () => {
return this.getMutex(keyType).runExclusive(async () => {
// Ensure structures exist
transactionCache[keyType] = transactionCache[keyType] || ({} as any)
mutations[keyType] = mutations[keyType] || ({} as any)
Expand Down Expand Up @@ -108,7 +108,7 @@ export class PreKeyManager {
const keyData = data[keyType]
if (!keyData) return

return this.getQueue(keyType).add(async () => {
return this.getMutex(keyType).runExclusive(async () => {
// Find all deletion requests
const deletionIds = Object.keys(keyData).filter(id => keyData[id] === null)
if (deletionIds.length === 0) return
Expand Down
25 changes: 0 additions & 25 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3012,7 +3012,6 @@ __metadata:
lru-cache: "npm:^11.1.0"
music-metadata: "npm:^11.7.0"
open: "npm:^8.4.2"
p-queue: "npm:^9.0.0"
pino: "npm:^9.6"
pino-pretty: "npm:^13.1.1"
prettier: "npm:^3.5.3"
Expand Down Expand Up @@ -4607,13 +4606,6 @@ __metadata:
languageName: node
linkType: hard

"eventemitter3@npm:^5.0.1":
version: 5.0.1
resolution: "eventemitter3@npm:5.0.1"
checksum: 10c0/4ba5c00c506e6c786b4d6262cfbce90ddc14c10d4667e5c83ae993c9de88aa856033994dd2b35b83e8dc1170e224e66a319fa80adc4c32adcd2379bbc75da814
languageName: node
linkType: hard

"events@npm:^3.3.0":
version: 3.3.0
resolution: "events@npm:3.3.0"
Expand Down Expand Up @@ -7929,23 +7921,6 @@ __metadata:
languageName: node
linkType: hard

"p-queue@npm:^9.0.0":
version: 9.0.0
resolution: "p-queue@npm:9.0.0"
dependencies:
eventemitter3: "npm:^5.0.1"
p-timeout: "npm:^7.0.0"
checksum: 10c0/0f27fcbec9e4e02f34cc3660f14b5746798c51a2425dc3d3c5238924c34726b1580606d5d1432fc05304e2350c00b112ded03eb43c1b49de7791a7150fbfcb4e
languageName: node
linkType: hard

"p-timeout@npm:^7.0.0":
version: 7.0.0
resolution: "p-timeout@npm:7.0.0"
checksum: 10c0/0418be5dd0269916293cfd10f738427a1a33b50d2358cf72fecee2b8a31cc2a663ae178105f1d6ef92301c50f86f78074adb32f8321856a4f332765786fedd93
languageName: node
linkType: hard

"p-try@npm:^2.0.0":
version: 2.2.0
resolution: "p-try@npm:2.2.0"
Expand Down
Loading