Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .changeset/shaggy-games-approve.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
'@powersync/react-native': minor
'@powersync/common': minor
'@powersync/web': minor
'@powersync/op-sqlite': patch
'@powersync/adapter-sql-js': patch
'@powersync/capacitor': patch
'@powersync/node': patch
'@powersync/nuxt': patch
---

Remove `async-mutex` dependency in favor of internal implementation.
1 change: 0 additions & 1 deletion demos/react-multi-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
"@supabase/supabase-js": "^2.43.1",
"@vitejs/plugin-react": "^4.2.1",
"@webflow/webflow-cli": "^1.6.9",
"async-mutex": "^0.5.0",
"autoprefixer": "10.4.14",
"lodash": "^4.17.21",
"postcss": "8.4.27",
Expand Down
3 changes: 1 addition & 2 deletions demos/react-multi-client/src/library/SupabaseConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
BaseListener,
BaseObserver,
CrudEntry,
Mutex,
PowerSyncBackendConnector,
UpdateType
} from '@powersync/web';
Expand All @@ -14,8 +15,6 @@ export interface SupabaseConnectorListener extends BaseListener {
onCRUDEvent: (event: { crudType: UpdateType; elapsedTimeMs: number }) => void;
}

import { Mutex } from 'async-mutex';

export class SupabaseConnector extends BaseObserver<SupabaseConnectorListener> implements PowerSyncBackendConnector {
static SHARED_MUTEX = new Mutex();
readonly client: SupabaseClient;
Expand Down
3 changes: 1 addition & 2 deletions packages/adapter-sql-js/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
"test": "vitest"
},
"dependencies": {
"@powersync/common": "workspace:^",
"async-mutex": "catalog:"
"@powersync/common": "workspace:^"
},
"devDependencies": {
"@powersync/sql-js": "0.0.8",
Expand Down
2 changes: 1 addition & 1 deletion packages/adapter-sql-js/rollup.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ export default () => {
]
})
],
external: ['@powersync/common', 'async-mutex']
external: ['@powersync/common']
};
};
5 changes: 3 additions & 2 deletions packages/adapter-sql-js/src/SQLJSAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ import {
DBLockOptions,
ILogger,
LockContext,
Mutex,
QueryResult,
SqlExecutor,
SQLOpenFactory,
SQLOpenOptions,
timeoutSignal,
Transaction
} from '@powersync/common';
import { Mutex } from 'async-mutex';
// This uses a pure JS version which avoids the need for WebAssembly, which is not supported in React Native.
import SQLJs from '@powersync/sql-js/dist/sql-asm.js';

Expand Down Expand Up @@ -165,7 +166,7 @@ class SqlJsConnectionPool extends BaseObserver<DBAdapterListener> implements Con
this.tableUpdateCache.clear();
this.iterateListeners((l) => l.tablesUpdated?.(notification));
return result;
});
}, timeoutSignal(options?.timeoutMs));
}

async refreshSchema(): Promise<void> {
Expand Down
3 changes: 0 additions & 3 deletions packages/capacitor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,5 @@
"android": {
"src": "android"
}
},
"dependencies": {
"async-mutex": "catalog:"
}
}
56 changes: 24 additions & 32 deletions packages/capacitor/src/adapter/CapacitorSQLiteAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import {
DBAdapterListener,
DBLockOptions,
LockContext,
mutexRunExclusive,
Mutex,
QueryResult,
timeoutSignal,
Transaction
} from '@powersync/web';
import { Mutex } from 'async-mutex';
import { PowerSyncCore } from '../plugin/PowerSyncCore.js';
import { messageForErrorCode } from '../plugin/PowerSyncPlugin.js';
import { CapacitorSQLiteOpenFactoryOptions, DEFAULT_SQLITE_OPTIONS } from './CapacitorSQLiteOpenFactory.js';
Expand Down Expand Up @@ -228,39 +228,31 @@ class CapacitorConnectionPool extends BaseObserver<DBAdapterListener> implements
}

readLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
return mutexRunExclusive(
this.readMutex,
async () => {
await this.initializedPromise;
return await fn(this.generateLockContext(this.readConnection));
},
options
);
return this.readMutex.runExclusive(async () => {
await this.initializedPromise;
return fn(this.generateLockContext(this.readConnection));
}, timeoutSignal(options?.timeoutMs));
}

writeLock<T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions): Promise<T> {
return mutexRunExclusive(
this.writeMutex,
async () => {
await this.initializedPromise;
const result = await fn(this.generateLockContext(this.writeConnection));

// Fetch table updates
const updates = await this.writeConnection.query("SELECT powersync_update_hooks('get') AS table_name");
const jsonUpdates = updates.values?.[0];
if (!jsonUpdates || !jsonUpdates.table_name) {
throw new Error('Could not fetch table updates');
}
const notification: BatchedUpdateNotification = {
rawUpdates: [],
tables: JSON.parse(jsonUpdates.table_name),
groupedUpdates: {}
};
this.iterateListeners((l) => l.tablesUpdated?.(notification));
return result;
},
options
);
return this.writeMutex.runExclusive(async () => {
await this.initializedPromise;
const result = await fn(this.generateLockContext(this.writeConnection));

// Fetch table updates
const updates = await this.writeConnection.query("SELECT powersync_update_hooks('get') AS table_name");
const jsonUpdates = updates.values?.[0];
if (!jsonUpdates || !jsonUpdates.table_name) {
throw new Error('Could not fetch table updates');
}
const notification: BatchedUpdateNotification = {
rawUpdates: [],
tables: JSON.parse(jsonUpdates.table_name),
groupedUpdates: {}
};
this.iterateListeners((l) => l.tablesUpdated?.(notification));
return result;
}, timeoutSignal(options?.timeoutMs));
}

refreshSchema(): Promise<void> {
Expand Down
10 changes: 3 additions & 7 deletions packages/capacitor/src/sync/CapacitorSyncImplementation.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { AbstractStreamingSyncImplementation, LockOptions, LockType, mutexRunExclusive } from '@powersync/web';
import { Mutex } from 'async-mutex';
import { AbstractStreamingSyncImplementation, LockOptions, LockType, Mutex } from '@powersync/web';

type MutexMap = {
/**
Expand Down Expand Up @@ -56,11 +55,8 @@ export class CapacitorStreamingSyncImplementation extends AbstractStreamingSyncI
mutexRecord.tracking.add(this.instanceId);
const mutex = mutexRecord.locks[lockOptions.type];

return mutexRunExclusive(mutex, async () => {
if (lockOptions.signal?.aborted) {
throw new Error('Aborted');
}
return mutex.runExclusive(async () => {
return await lockOptions.callback();
});
}, lockOptions.signal);
}
}
1 change: 0 additions & 1 deletion packages/common/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
"test:exports": "attw --pack ."
},
"dependencies": {
"async-mutex": "catalog:",
"event-iterator": "^2.0.0"
},
"devDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion packages/common/rollup.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ function defineBuild(isNode) {
})
]
],
external: ['async-mutex', 'bson', isNode ? 'event-iterator' : undefined]
external: ['bson', isNode ? 'event-iterator' : undefined]
};
}

Expand Down
5 changes: 2 additions & 3 deletions packages/common/src/attachments/AttachmentService.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { Mutex } from 'async-mutex';
import { AbstractPowerSyncDatabase } from '../client/AbstractPowerSyncDatabase.js';
import { DifferentialWatchedQuery } from '../client/watched/processors/DifferentialQueryProcessor.js';
import { ILogger } from '../utils/Logger.js';
import { mutexRunExclusive } from '../utils/mutex.js';
import { Mutex } from '../utils/mutex.js';
import { AttachmentContext } from './AttachmentContext.js';
import { AttachmentRecord, AttachmentState } from './Schema.js';

Expand Down Expand Up @@ -55,7 +54,7 @@ export class AttachmentService {
* Executes a callback with exclusive access to the attachment context.
*/
async withContext<T>(callback: (context: AttachmentContext) => Promise<T>): Promise<T> {
return mutexRunExclusive(this.mutex, async () => {
return this.mutex.runExclusive(async () => {
return callback(this.context);
});
}
Expand Down
2 changes: 1 addition & 1 deletion packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { Mutex } from 'async-mutex';
import { EventIterator } from 'event-iterator';
import Logger, { ILogger } from 'js-logger';
import {
Expand Down Expand Up @@ -46,6 +45,7 @@ import { TriggerManagerImpl } from './triggers/TriggerManagerImpl.js';
import { DEFAULT_WATCH_THROTTLE_MS, WatchCompatibleQuery } from './watched/WatchedQuery.js';
import { OnChangeQueryProcessor } from './watched/processors/OnChangeQueryProcessor.js';
import { WatchedQueryComparator } from './watched/processors/comparators.js';
import { Mutex } from '../utils/mutex.js';

export interface DisconnectAndClearOptions {
/** When set to false, data in local-only tables is preserved. */
Expand Down
147 changes: 121 additions & 26 deletions packages/common/src/utils/mutex.ts
Original file line number Diff line number Diff line change
@@ -1,34 +1,129 @@
import { Mutex } from 'async-mutex';
export type UnlockFn = () => void;

/**
* Wrapper for async-mutex runExclusive, which allows for a timeout on each exclusive lock.
* An asynchronous mutex implementation.
*
* @internal This class is meant to be used in PowerSync SDKs only, and is not part of the public API.
*/
export async function mutexRunExclusive<T>(
mutex: Mutex,
callback: () => Promise<T>,
options?: { timeoutMs?: number }
): Promise<T> {
return new Promise((resolve, reject) => {
const timeout = options?.timeoutMs;
let timedOut = false;
const timeoutId = timeout
? setTimeout(() => {
timedOut = true;
reject(new Error('Timeout waiting for lock'));
}, timeout)
: undefined;

mutex.runExclusive(async () => {
if (timeoutId) {
clearTimeout(timeoutId);
export class Mutex {
private inCriticalSection = false;

// Linked list of waiters. We don't expect the wait list to become particularly large, and this allows removing
// aborted waiters from the middle of the list efficiently.
private firstWaiter?: MutexWaitNode;
private lastWaiter?: MutexWaitNode;

private addWaiter(onAcquire: () => void): MutexWaitNode {
const node: MutexWaitNode = {
isActive: true,
onAcquire,
prev: this.lastWaiter
};
if (this.lastWaiter) {
this.lastWaiter.next = node;
this.lastWaiter = node;
} else {
// First waiter
this.lastWaiter = this.firstWaiter = node;
}

return node;
}

private deactivateWaiter(waiter: MutexWaitNode) {
const { prev, next } = waiter;
waiter.isActive = false;

if (prev) prev.next = next;
if (next) next.prev = prev;
if (waiter == this.firstWaiter) this.firstWaiter = next;
if (waiter == this.lastWaiter) this.lastWaiter = prev;
}

acquire(abort?: AbortSignal): Promise<UnlockFn> {
return new Promise((resolve, reject) => {
function rejectAborted() {
reject(abort?.reason ?? new Error('Mutex acquire aborted'));
}
if (timedOut) return;
if (abort?.aborted) {
return rejectAborted();
}

let holdsMutex = false;

const markCompleted = () => {
if (!holdsMutex) return;
holdsMutex = false;

const waiter = this.firstWaiter;
if (waiter) {
this.deactivateWaiter(waiter);
// Still in critical section, but owned by next waiter now.
waiter.onAcquire();
} else {
this.inCriticalSection = false;
}
};

if (!this.inCriticalSection) {
this.inCriticalSection = true;
holdsMutex = true;
return resolve(markCompleted);
} else {
let node: MutexWaitNode;

const onAbort = () => {
abort?.removeEventListener('abort', onAbort);

try {
resolve(await callback());
} catch (ex) {
reject(ex);
if (node.isActive) {
this.deactivateWaiter(node);
rejectAborted();
}
};

node = this.addWaiter(() => {
abort?.removeEventListener('abort', onAbort);
holdsMutex = true;
resolve(markCompleted);
});

abort?.addEventListener('abort', onAbort);
}
});
});
}

async runExclusive<T>(fn: () => PromiseLike<T> | T, abort?: AbortSignal): Promise<T> {
const returnMutex = await this.acquire(abort);

try {
return await fn();
} finally {
returnMutex();
}
}
}

interface MutexWaitNode {
/**
* Whether the waiter is currently active (not aborted and not fullfilled).
*/
isActive: boolean;
onAcquire: () => void;
prev?: MutexWaitNode;
next?: MutexWaitNode;
}

/**
* Creates a signal aborting after the set timeout.
*/
export function timeoutSignal(timeout: number): AbortSignal;
export function timeoutSignal(timeout?: number): AbortSignal | undefined;

export function timeoutSignal(timeout?: number): AbortSignal | undefined {
if (timeout == null) return;
if ('timeout' in AbortSignal) return AbortSignal.timeout(timeout);

const controller = new AbortController();
setTimeout(() => controller.abort(new Error('Timeout waiting for lock')), timeout);
return controller.signal;
}
Loading
Loading