Skip to content

Commit e578e20

Browse files
authoredSep 6, 2024··
Modify stopSync to block if sync is currently active (#889)
- Add a missing changset for #887: `stopSync` now blocks if a current sync is in progress before clearing the interval. An optional timeout can be defined, the default is 2 seconds. After this timeout it will throw. TestHarness has been updated to stop sync before clearing storage, previously this caused an issue where an ongoing sync would attempt to sign messages for DID that no longer had keys after clearing storage. #890 has been created to better address this by creating a signal to gracefully stop sync immediately.
1 parent da3630a commit e578e20

10 files changed

+310
-21
lines changed
 

‎.changeset/afraid-geese-knock.md

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
"@web5/agent": patch
3+
"@web5/identity-agent": patch
4+
"@web5/proxy-agent": patch
5+
"@web5/user-agent": patch
6+
---
7+
8+
Fix sync race condition issue

‎package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
"@changesets/cli": "^2.27.5",
3232
"@npmcli/package-json": "5.0.0",
3333
"@typescript-eslint/eslint-plugin": "7.9.0",
34-
"@web5/dwn-server": "0.4.8",
34+
"@web5/dwn-server": "0.4.9",
3535
"audit-ci": "^7.0.1",
3636
"eslint-plugin-mocha": "10.4.3",
3737
"globals": "^13.24.0",

‎packages/agent/src/sync-api.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ export class AgentSyncApi implements SyncEngine {
5353
return this._syncEngine.startSync(params);
5454
}
5555

56-
public stopSync(): void {
57-
this._syncEngine.stopSync();
56+
public stopSync(timeout?: number): Promise<void> {
57+
return this._syncEngine.stopSync(timeout);
5858
}
5959
}

‎packages/agent/src/sync-engine-level.ts

+16-1
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,22 @@ export class SyncEngineLevel implements SyncEngine {
315315
}
316316
}
317317

318-
public stopSync(): void {
318+
/**
319+
* stopSync currently awaits the completion of the current sync operation before stopping the sync interval.
320+
* TODO: implement a signal to gracefully stop sync immediately https://github.com/TBD54566975/web5-js/issues/890
321+
*/
322+
public async stopSync(timeout: number = 2000): Promise<void> {
323+
let elapsedTimeout = 0;
324+
325+
while(this._syncLock) {
326+
if (elapsedTimeout >= timeout) {
327+
throw new Error(`SyncEngineLevel: Existing sync operation did not complete within ${timeout} milliseconds.`);
328+
}
329+
330+
elapsedTimeout += 100;
331+
await new Promise((resolve) => setTimeout(resolve, timeout < 100 ? timeout : 100));
332+
}
333+
319334
if (this._syncIntervalId) {
320335
clearInterval(this._syncIntervalId);
321336
this._syncIntervalId = undefined;

‎packages/agent/src/test-harness.ts

+3
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ export class PlatformAgentTestHarness {
8585
}
8686

8787
public async clearStorage(): Promise<void> {
88+
// first stop any ongoing sync operations
89+
await this.agent.sync.stopSync();
90+
8891
// @ts-expect-error since normally this property shouldn't be set to undefined.
8992
this.agent.agentDid = undefined;
9093
await this.didResolverCache.clear();

‎packages/agent/src/types/sync.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ export interface SyncEngine {
3838
startSync(params: { interval: string }): Promise<void>;
3939
/**
4040
* Stops the periodic sync operation, will complete the current sync operation if one is already in progress.
41+
*
42+
* @param timeout the maximum amount of time, in milliseconds, to wait for the current sync operation to complete. Default is 2000 (2 seconds).
43+
* @throws {Error} if the sync operation fails to stop before the timeout.
4144
*/
42-
stopSync(): void;
45+
stopSync(timeout?: number): Promise<void>;
4346
}

‎packages/agent/tests/sync-engine-level.spec.ts

+270-7
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ describe('SyncEngineLevel', () => {
9292
watermark,
9393
messageCid
9494
});
95-
console.log('key', key);
9695

9796
const syncParams = SyncEngineLevel['parseSyncMessageParamsKey'](key);
9897
expect(syncParams.protocol).to.be.undefined;
@@ -152,7 +151,6 @@ describe('SyncEngineLevel', () => {
152151

153152
sinon.restore();
154153

155-
syncEngine.stopSync();
156154
await syncEngine.clear();
157155
await testHarness.syncStore.clear();
158156
await testHarness.dwnDataStore.clear();
@@ -471,7 +469,7 @@ describe('SyncEngineLevel', () => {
471469
did: alice.did.uri,
472470
});
473471

474-
const clock = sinon.useFakeTimers();
472+
const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true });
475473
sinon.stub(syncEngine as any, 'push').resolves();
476474
const pullSpy = sinon.stub(syncEngine as any, 'pull');
477475
pullSpy.returns(new Promise<void>((resolve) => {
@@ -2444,7 +2442,7 @@ describe('SyncEngineLevel', () => {
24442442
const pushSpy = sinon.stub(SyncEngineLevel.prototype as any, 'push');
24452443
pushSpy.resolves();
24462444

2447-
const clock = sinon.useFakeTimers();
2445+
const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true });
24482446

24492447
testHarness.agent.sync.startSync({ interval: '500ms' });
24502448

@@ -2463,7 +2461,7 @@ describe('SyncEngineLevel', () => {
24632461
did: alice.did.uri,
24642462
});
24652463

2466-
const clock = sinon.useFakeTimers();
2464+
const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true });
24672465

24682466
const pullSpy = sinon.stub(SyncEngineLevel.prototype as any, 'pull');
24692467
pullSpy.returns(new Promise<void>((resolve) => {
@@ -2505,7 +2503,7 @@ describe('SyncEngineLevel', () => {
25052503
did: alice.did.uri,
25062504
});
25072505

2508-
const clock = sinon.useFakeTimers();
2506+
const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true });
25092507

25102508
const syncSpy = sinon.stub(SyncEngineLevel.prototype as any, 'sync');
25112509
// set to be a sync time longer than the interval
@@ -2551,7 +2549,7 @@ describe('SyncEngineLevel', () => {
25512549
did: alice.did.uri,
25522550
});
25532551

2554-
const clock = sinon.useFakeTimers();
2552+
const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true });
25552553

25562554
const syncSpy = sinon.stub(SyncEngineLevel.prototype as any, 'sync');
25572555
// set to be a sync time longer than the interval
@@ -2586,5 +2584,270 @@ describe('SyncEngineLevel', () => {
25862584
clock.restore();
25872585
});
25882586
});
2587+
2588+
describe('stopSync()', () => {
2589+
it('stops the sync interval', async () => {
2590+
await testHarness.agent.sync.registerIdentity({
2591+
did: alice.did.uri,
2592+
});
2593+
2594+
const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true });
2595+
2596+
const syncSpy = sinon.spy(SyncEngineLevel.prototype as any, 'sync');
2597+
2598+
// stub push and pull to take 3 ms each
2599+
const pullStub = sinon.stub(SyncEngineLevel.prototype as any, 'pull');
2600+
pullStub.returns(new Promise<void>((resolve) => {
2601+
clock.setTimeout(() => {
2602+
resolve();
2603+
}, 3);
2604+
}));
2605+
2606+
const pushStub = sinon.stub(SyncEngineLevel.prototype as any, 'push');
2607+
pushStub.returns(new Promise<void>((resolve) => {
2608+
clock.setTimeout(() => {
2609+
resolve();
2610+
}, 3);
2611+
}));
2612+
2613+
testHarness.agent.sync.startSync({ interval: '500ms' });
2614+
2615+
// expect the immediate sync call
2616+
expect(syncSpy.callCount).to.equal(1);
2617+
2618+
2619+
await clock.tickAsync(1_300); // just under 3 intervals
2620+
2621+
// expect 2 sync interval calls + initial sync
2622+
expect(syncSpy.callCount).to.equal(3);
2623+
2624+
await testHarness.agent.sync.stopSync();
2625+
2626+
await clock.tickAsync(1_000); // 2 intervals
2627+
2628+
// sync calls remain unchanged
2629+
expect(syncSpy.callCount).to.equal(3);
2630+
2631+
syncSpy.restore();
2632+
clock.restore();
2633+
});
2634+
2635+
it('waits for the current sync to complete before stopping', async () => {
2636+
await testHarness.agent.sync.registerIdentity({
2637+
did: alice.did.uri,
2638+
});
2639+
2640+
const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true });
2641+
2642+
const syncSpy = sinon.spy(SyncEngineLevel.prototype as any, 'sync');
2643+
2644+
// stub push and pull to take 3 ms each
2645+
const pullStub = sinon.stub(SyncEngineLevel.prototype as any, 'pull');
2646+
pullStub.returns(new Promise<void>((resolve) => {
2647+
clock.setTimeout(() => {
2648+
resolve();
2649+
}, 3);
2650+
}));
2651+
2652+
const pushStub = sinon.stub(SyncEngineLevel.prototype as any, 'push');
2653+
pushStub.returns(new Promise<void>((resolve) => {
2654+
clock.setTimeout(() => {
2655+
resolve();
2656+
}, 3);
2657+
}));
2658+
2659+
testHarness.agent.sync.startSync({ interval: '500ms' });
2660+
2661+
// expect the immediate sync call
2662+
expect(syncSpy.callCount).to.equal(1);
2663+
2664+
await clock.tickAsync(1_300); // just under 3 intervals
2665+
2666+
// expect 2 sync interval calls + initial sync
2667+
expect(syncSpy.callCount).to.equal(3);
2668+
2669+
// cause pull to take longer
2670+
pullStub.returns(new Promise<void>((resolve) => {
2671+
clock.setTimeout(() => {
2672+
resolve();
2673+
}, 1_000);
2674+
}));
2675+
2676+
await clock.tickAsync(201); // Enough time for the next interval to start
2677+
2678+
// next interval was called
2679+
expect(syncSpy.callCount).to.equal(4);
2680+
2681+
// stop the sync
2682+
await new Promise<void>((resolve) => {
2683+
const stopPromise = testHarness.agent.sync.stopSync();
2684+
clock.tickAsync(1_000).then(async () => {
2685+
await stopPromise;
2686+
resolve();
2687+
});
2688+
});
2689+
2690+
// sync calls remain unchanged
2691+
expect(syncSpy.callCount).to.equal(4);
2692+
2693+
// wait for future intervals
2694+
await clock.tickAsync(2_000);
2695+
2696+
// sync calls remain unchanged
2697+
expect(syncSpy.callCount).to.equal(4);
2698+
2699+
syncSpy.restore();
2700+
clock.restore();
2701+
});
2702+
2703+
it('throws if ongoing sync does not complete within 2 seconds', async () => {
2704+
await testHarness.agent.sync.registerIdentity({
2705+
did: alice.did.uri,
2706+
});
2707+
2708+
const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true });
2709+
2710+
const syncSpy = sinon.spy(SyncEngineLevel.prototype as any, 'sync');
2711+
2712+
// stub push and pull to take 3 ms each
2713+
const pullStub = sinon.stub(SyncEngineLevel.prototype as any, 'pull');
2714+
pullStub.returns(new Promise<void>((resolve) => {
2715+
clock.setTimeout(() => {
2716+
resolve();
2717+
}, 3);
2718+
}));
2719+
2720+
const pushStub = sinon.stub(SyncEngineLevel.prototype as any, 'push');
2721+
pushStub.returns(new Promise<void>((resolve) => {
2722+
clock.setTimeout(() => {
2723+
resolve();
2724+
}, 3);
2725+
}));
2726+
2727+
testHarness.agent.sync.startSync({ interval: '500ms' });
2728+
2729+
// expect the immediate sync call
2730+
expect(syncSpy.callCount).to.equal(1);
2731+
2732+
await clock.tickAsync(1_300); // just under 3 intervals
2733+
2734+
// expect 2 sync interval calls + initial sync
2735+
expect(syncSpy.callCount).to.equal(3);
2736+
2737+
// cause pull to take longer
2738+
pullStub.returns(new Promise<void>((resolve) => {
2739+
clock.setTimeout(() => {
2740+
resolve();
2741+
}, 2_700); // longer than the 2 seconds
2742+
}));
2743+
2744+
await clock.tickAsync(201); // Enough time for the next interval to start
2745+
2746+
// next interval was called
2747+
expect(syncSpy.callCount).to.equal(4);
2748+
2749+
const stopPromise = testHarness.agent.sync.stopSync();
2750+
2751+
try {
2752+
await new Promise<void>((resolve, reject) => {
2753+
stopPromise.catch((error) => reject(error));
2754+
2755+
clock.runToLastAsync().then(async () => {
2756+
try {
2757+
await stopPromise;
2758+
resolve();
2759+
} catch(error) {
2760+
reject(error);
2761+
}
2762+
});
2763+
2764+
});
2765+
expect.fail('Expected an error to be thrown');
2766+
} catch(error:any) {
2767+
expect(error.message).to.equal('SyncEngineLevel: Existing sync operation did not complete within 2000 milliseconds.');
2768+
}
2769+
2770+
syncSpy.restore();
2771+
clock.restore();
2772+
});
2773+
2774+
it('only waits for the ongoing sync for the given timeout before failing', async () => {
2775+
await testHarness.agent.sync.registerIdentity({
2776+
did: alice.did.uri,
2777+
});
2778+
2779+
const clock = sinon.useFakeTimers({ shouldClearNativeTimers: true });
2780+
2781+
const syncSpy = sinon.spy(SyncEngineLevel.prototype as any, 'sync');
2782+
2783+
// stub push and pull to take 3 ms each
2784+
const pullStub = sinon.stub(SyncEngineLevel.prototype as any, 'pull');
2785+
pullStub.returns(new Promise<void>((resolve) => {
2786+
clock.setTimeout(() => {
2787+
resolve();
2788+
}, 3);
2789+
}));
2790+
2791+
const pushStub = sinon.stub(SyncEngineLevel.prototype as any, 'push');
2792+
pushStub.returns(new Promise<void>((resolve) => {
2793+
clock.setTimeout(() => {
2794+
resolve();
2795+
}, 3);
2796+
}));
2797+
2798+
testHarness.agent.sync.startSync({ interval: '500ms' });
2799+
2800+
// expect the immediate sync call
2801+
expect(syncSpy.callCount).to.equal(1);
2802+
2803+
await clock.tickAsync(10); // enough time for the sync round trip to complete
2804+
2805+
// cause pull to take longer
2806+
pullStub.returns(new Promise<void>((resolve) => {
2807+
clock.setTimeout(() => {
2808+
resolve();
2809+
}, 2_700); // longer than the 2 seconds
2810+
}));
2811+
2812+
await clock.tickAsync(501); // Enough time for the next interval to start
2813+
2814+
// next interval was called
2815+
expect(syncSpy.callCount).to.equal(2);
2816+
2817+
const stopPromise = testHarness.agent.sync.stopSync(10);
2818+
try {
2819+
await new Promise<void>((resolve, reject) => {
2820+
stopPromise.catch((error) => reject(error));
2821+
2822+
clock.tickAsync(10).then(async () => {
2823+
try {
2824+
await stopPromise;
2825+
resolve();
2826+
} catch(error) {
2827+
reject(error);
2828+
}
2829+
});
2830+
2831+
});
2832+
expect.fail('Expected an error to be thrown');
2833+
} catch(error:any) {
2834+
expect(error.message).to.equal('SyncEngineLevel: Existing sync operation did not complete within 10 milliseconds.');
2835+
}
2836+
2837+
// call again with a longer timeout
2838+
await new Promise<void>((resolve) => {
2839+
const stopPromise2 = testHarness.agent.sync.stopSync(3_000);
2840+
// enough time for the ongoing sync to complete + 100ms as the check interval
2841+
clock.tickAsync(2800).then(async () => {
2842+
stopPromise2.then(() => resolve());
2843+
});
2844+
});
2845+
2846+
await clock.runToLastAsync();
2847+
syncSpy.restore();
2848+
clock.restore();
2849+
});
2850+
2851+
});
25892852
});
25902853
});

‎packages/api/tests/web5.spec.ts

-3
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,6 @@ describe('web5 api', () => {
171171

172172
beforeEach(async () => {
173173
sinon.restore();
174-
testHarness.agent.sync.stopSync();
175174
await testHarness.clearStorage();
176175
await testHarness.createAgentDid();
177176
});
@@ -794,8 +793,6 @@ describe('web5 api', () => {
794793
expect(startSyncSpy.args[0][0].interval).to.equal('1m');
795794
});
796795

797-
798-
799796
it('should request all permissions for a protocol if no specific permissions are provided', async () => {
800797

801798
sinon.stub(Web5UserAgent, 'create').resolves(testHarness.agent as Web5UserAgent);

‎packages/dev-env/docker-compose.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@ version: "3.98"
33
services:
44
dwn-server:
55
container_name: dwn-server
6-
image: ghcr.io/tbd54566975/dwn-server:0.4.8
6+
image: ghcr.io/tbd54566975/dwn-server:0.4.9
77
ports:
88
- "3000:3000"

‎pnpm-lock.yaml

+5-5
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)
Please sign in to comment.