Skip to content

Commit bc54d0c

Browse files
authored
Remove Duplicate Methods used in sync (#738)
* remove duplicate methods, fix sync for non-RecordsWrite messages, add protocol configure to e2e sync test
1 parent 09f80b7 commit bc54d0c

File tree

5 files changed

+153
-85
lines changed

5 files changed

+153
-85
lines changed

.changeset/tidy-wasps-smell.md

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
"@web5/agent": patch
3+
"@web5/identity-agent": patch
4+
"@web5/proxy-agent": patch
5+
"@web5/user-agent": patch
6+
---
7+
8+
Remove Duplicate Methods used in sync & Fix sync bug where only RecordsWrite were being pulled from the remote

packages/agent/src/dwn-api.ts

+2-35
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import type { Readable } from '@web5/common';
2-
import type { DwnConfig, GenericMessage, UnionMessageReply } from '@tbd54566975/dwn-sdk-js';
2+
import type { DwnConfig, GenericMessage } from '@tbd54566975/dwn-sdk-js';
33

44
import { NodeStream } from '@web5/common';
55
import { utils as cryptoUtils } from '@web5/crypto';
66
import { DidDht, DidJwk, DidResolverCacheLevel, UniversalResolver } from '@web5/dids';
77
import { Cid, DataStoreLevel, Dwn, DwnMethodName, EventLogLevel, Message, MessageStoreLevel, ResumableTaskStoreLevel } from '@tbd54566975/dwn-sdk-js';
88

99
import type { Web5PlatformAgent } from './types/agent.js';
10-
import type { DwnMessage, DwnMessageInstance, DwnMessageParams, DwnMessageReply, DwnMessageWithData, DwnResponse, DwnSigner, MessageHandler, ProcessDwnRequest, SendDwnRequest } from './types/dwn.js';
10+
import type { DwnMessage, DwnMessageReply, DwnMessageWithData, DwnResponse, DwnSigner, MessageHandler, ProcessDwnRequest, SendDwnRequest } from './types/dwn.js';
1111

1212
import { DwnInterface, dwnMessageConstructors } from './types/dwn.js';
1313
import { blobToIsomorphicNodeReadable, getDwnServiceEndpointUrls, isRecordsWrite, webReadableToIsomorphicNodeReadable } from './utils.js';
@@ -382,37 +382,4 @@ export class AgentDwnApi {
382382

383383
return dwnMessageWithBlob;
384384
}
385-
386-
/**
387-
* TODO: Refactor this to consolidate logic in AgentDwnApi and SyncEngineLevel.
388-
* ADDED TO GET SYNC WORKING
389-
* - createMessage()
390-
* - processMessage()
391-
*/
392-
393-
public async createMessage<T extends DwnInterface>({ author, messageParams, messageType }: {
394-
author: string;
395-
messageType: T;
396-
messageParams?: DwnMessageParams[T];
397-
}): Promise<DwnMessageInstance[T]> {
398-
// Determine the signer for the message.
399-
const signer = await this.getSigner(author);
400-
401-
const dwnMessageConstructor = dwnMessageConstructors[messageType];
402-
const dwnMessage = await dwnMessageConstructor.create({
403-
// TODO: Explore whether 'messageParams' should be required in the ProcessDwnRequest type.
404-
...messageParams!,
405-
signer
406-
});
407-
408-
return dwnMessage;
409-
}
410-
411-
public async processMessage({ dataStream, message, targetDid }: {
412-
targetDid: string;
413-
message: GenericMessage;
414-
dataStream?: Readable;
415-
}): Promise<UnionMessageReply> {
416-
return await this._dwn.processMessage(targetDid, message, { dataStream });
417-
}
418385
}

packages/agent/src/sync-engine-level.ts

+20-25
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,10 @@ export class SyncEngineLevel implements SyncEngine {
104104
continue;
105105
}
106106

107-
const messagesRead = await this.agent.dwn.createMessage({
107+
const messagesRead = await this.agent.processDwnRequest({
108+
store : false,
108109
author : did,
110+
target : did,
109111
messageType : DwnInterface.MessagesRead,
110112
messageParams : {
111113
messageCid: messageCid
@@ -118,7 +120,7 @@ export class SyncEngineLevel implements SyncEngine {
118120
reply = await this.agent.rpc.sendDwnRequest({
119121
dwnUrl,
120122
targetDid : did,
121-
message : messagesRead,
123+
message : messagesRead.message,
122124
}) as MessagesReadReply;
123125
} catch(e) {
124126
errored.add(dwnUrl);
@@ -132,27 +134,18 @@ export class SyncEngineLevel implements SyncEngine {
132134
}
133135

134136
const replyEntry = reply.entry;
135-
136-
if (isRecordsWrite(replyEntry)) {
137-
const message = replyEntry.message;
138-
139-
// if the message includes data we convert it to a Node readable stream
140-
// otherwise we set it as undefined, as the message does not include data
141-
// this occurs when the message is a RecordsWrite message that has been updated
142-
const dataStream = replyEntry.data ?
143-
NodeStream.fromWebReadable({ readableStream: replyEntry.data as unknown as ReadableStream })
144-
: undefined;
145-
146-
const pullReply = await this.agent.dwn.processMessage({
147-
targetDid: did,
148-
message,
149-
dataStream,
150-
});
151-
152-
if (pullReply.status.code === 202 || pullReply.status.code === 409) {
153-
await this.addMessage(did, messageCid);
154-
deleteOperations.push({ type: 'del', key: key });
155-
}
137+
const message = replyEntry.message;
138+
// if the message includes data we convert it to a Node readable stream
139+
// otherwise we set it as undefined, as the message does not include data
140+
// this occurs when the message is a RecordsWrite message that has been updated
141+
const dataStream = isRecordsWrite(replyEntry) && replyEntry.data ?
142+
NodeStream.fromWebReadable({ readableStream: replyEntry.data as unknown as ReadableStream })
143+
: undefined;
144+
145+
const pullReply = await this.agent.dwn.node.processMessage(did, message, { dataStream });
146+
if (pullReply.status.code === 202 || pullReply.status.code === 409) {
147+
await this.addMessage(did, messageCid);
148+
deleteOperations.push({ type: 'del', key: key });
156149
}
157150
}
158151

@@ -308,7 +301,9 @@ export class SyncEngineLevel implements SyncEngine {
308301

309302
if (syncDirection === 'pull') {
310303
// When sync is a pull, get the event log from the remote DWN.
311-
const messagesReadMessage = await this.agent.dwn.createMessage({
304+
const messagesReadMessage = await this.agent.dwn.processRequest({
305+
store : false,
306+
target : did,
312307
author : did,
313308
messageType : DwnInterface.MessagesQuery,
314309
messageParams : { filters: [], cursor }
@@ -318,7 +313,7 @@ export class SyncEngineLevel implements SyncEngine {
318313
messagesReply = await this.agent.rpc.sendDwnRequest({
319314
dwnUrl : dwnUrl,
320315
targetDid : did,
321-
message : messagesReadMessage
316+
message : messagesReadMessage.message
322317
}) as MessagesQueryReply;
323318
} catch {
324319
// If a particular DWN service endpoint is unreachable, silently ignore.

packages/agent/tests/dwn-api.spec.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -604,11 +604,12 @@ describe('AgentDwnApi', () => {
604604

605605
it('returns a 202 Accepted status when the request is not stored', async () => {
606606
// spy on dwn.processMessage
607-
const processMessageSpy = sinon.spy(testHarness.agent.dwn, 'processMessage');
607+
const processMessageSpy = sinon.spy(testHarness.agent.dwn.node, 'processMessage');
608608

609609
// Attempt to process the RecordsWrite
610610
const dataBytes = Convert.string('Hello, world!').toUint8Array();
611611
let writeResponse = await testHarness.agent.dwn.processRequest({
612+
store : false,
612613
author : alice.did.uri,
613614
target : alice.did.uri,
614615
messageType : DwnInterface.RecordsWrite,

packages/agent/tests/sync-engine-level.spec.ts

+121-24
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import sinon from 'sinon';
22
import { expect } from 'chai';
33
import { utils as cryptoUtils } from '@web5/crypto';
4-
import { DwnConstant } from '@tbd54566975/dwn-sdk-js';
4+
import { DwnConstant, ProtocolDefinition } from '@tbd54566975/dwn-sdk-js';
55

66
import type { BearerIdentity } from '../src/bearer-identity.js';
77

@@ -82,7 +82,56 @@ describe('SyncEngineLevel', () => {
8282
await testHarness.closeStorage();
8383
});
8484

85-
it('syncs multiple records in both directions', async () => {
85+
it('syncs multiple messages in both directions', async () => {
86+
// create 1 local protocol configure
87+
const protocolDefinition1: ProtocolDefinition = {
88+
published : true,
89+
protocol : 'https://protocol.xyz/example/1',
90+
types : {
91+
foo: {
92+
schema : 'https://schemas.xyz/foo',
93+
dataFormats : ['text/plain', 'application/json']
94+
}
95+
},
96+
structure: {
97+
foo: {}
98+
}
99+
};
100+
101+
const protocolsConfigure1 = await testHarness.agent.processDwnRequest({
102+
author : alice.did.uri,
103+
target : alice.did.uri,
104+
messageType : DwnInterface.ProtocolsConfigure,
105+
messageParams : {
106+
definition: protocolDefinition1
107+
}
108+
});
109+
110+
// create 1 remote protocol configure
111+
const protocolDefinition2: ProtocolDefinition = {
112+
published : true,
113+
protocol : 'https://protocol.xyz/example/2',
114+
types : {
115+
bar: {
116+
schema : 'https://schemas.xyz/bar',
117+
dataFormats : ['text/plain', 'application/json']
118+
}
119+
},
120+
structure: {
121+
bar: {}
122+
}
123+
};
124+
125+
const protocolsConfigure2 = await testHarness.agent.sendDwnRequest({
126+
author : alice.did.uri,
127+
target : alice.did.uri,
128+
messageType : DwnInterface.ProtocolsConfigure,
129+
messageParams : {
130+
definition: protocolDefinition2
131+
}
132+
});
133+
134+
86135
// create 3 local records.
87136
const localRecords: string[] = [];
88137
for (let i = 0; i < 3; i++) {
@@ -152,8 +201,20 @@ describe('SyncEngineLevel', () => {
152201
remoteRecords.push((writeResponse.message!).recordId);
153202
}
154203

204+
// check that protocol1 exists locally
205+
let localProtocolsQueryResponse = await testHarness.agent.dwn.processRequest({
206+
author : alice.did.uri,
207+
target : alice.did.uri,
208+
messageType : DwnInterface.ProtocolsQuery,
209+
messageParams : {}
210+
});
211+
let localProtocolsQueryReply = localProtocolsQueryResponse.reply;
212+
expect(localProtocolsQueryReply.status.code).to.equal(200);
213+
expect(localProtocolsQueryReply.entries?.length).to.equal(1);
214+
expect(localProtocolsQueryReply.entries).to.have.deep.equal([ protocolsConfigure1.message ]);
215+
155216
// query local and check for only local records
156-
let localQueryResponse = await testHarness.agent.dwn.processRequest({
217+
let localRecordsQueryResponse = await testHarness.agent.dwn.processRequest({
157218
author : alice.did.uri,
158219
target : alice.did.uri,
159220
messageType : DwnInterface.RecordsQuery,
@@ -164,14 +225,26 @@ describe('SyncEngineLevel', () => {
164225
}
165226
}
166227
});
167-
let localDwnQueryReply = localQueryResponse.reply;
168-
expect(localDwnQueryReply.status.code).to.equal(200);
169-
expect(localDwnQueryReply.entries).to.have.length(3);
170-
let localRecordsFromQuery = localDwnQueryReply.entries?.map(entry => entry.recordId);
228+
let localRecordsQueryReply = localRecordsQueryResponse.reply;
229+
expect(localRecordsQueryReply.status.code).to.equal(200);
230+
expect(localRecordsQueryReply.entries).to.have.length(3);
231+
let localRecordsFromQuery = localRecordsQueryReply.entries?.map(entry => entry.recordId);
171232
expect(localRecordsFromQuery).to.have.members(localRecords);
172233

234+
// check that protocol2 exists remotely
235+
let remoteProtocolsQueryResponse = await testHarness.agent.dwn.sendRequest({
236+
author : alice.did.uri,
237+
target : alice.did.uri,
238+
messageType : DwnInterface.ProtocolsQuery,
239+
messageParams : {}
240+
});
241+
let remoteProtocolsQueryReply = remoteProtocolsQueryResponse.reply;
242+
expect(remoteProtocolsQueryReply.status.code).to.equal(200);
243+
expect(remoteProtocolsQueryReply.entries?.length).to.equal(1);
244+
expect(remoteProtocolsQueryReply.entries).to.have.deep.equal([ protocolsConfigure2.message ]);
245+
173246
// query remote and check for only remote records
174-
let remoteQueryResponse = await testHarness.agent.dwn.sendRequest({
247+
let remoteRecordsQueryResponse = await testHarness.agent.dwn.sendRequest({
175248
author : alice.did.uri,
176249
target : alice.did.uri,
177250
messageType : DwnInterface.RecordsQuery,
@@ -182,10 +255,10 @@ describe('SyncEngineLevel', () => {
182255
}
183256
}
184257
});
185-
let remoteDwnQueryReply = remoteQueryResponse.reply;
186-
expect(remoteDwnQueryReply.status.code).to.equal(200);
187-
expect(remoteDwnQueryReply.entries).to.have.length(3);
188-
let remoteRecordsFromQuery = remoteDwnQueryReply.entries?.map(entry => entry.recordId);
258+
let remoteRecordsQueryReply = remoteRecordsQueryResponse.reply;
259+
expect(remoteRecordsQueryReply.status.code).to.equal(200);
260+
expect(remoteRecordsQueryReply.entries).to.have.length(3);
261+
let remoteRecordsFromQuery = remoteRecordsQueryReply.entries?.map(entry => entry.recordId);
189262
expect(remoteRecordsFromQuery).to.have.members(remoteRecords);
190263

191264
// Register Alice's DID to be synchronized.
@@ -197,8 +270,20 @@ describe('SyncEngineLevel', () => {
197270
await syncEngine.push();
198271
await syncEngine.pull();
199272

273+
// query local to see all protocols
274+
localProtocolsQueryResponse = await testHarness.agent.dwn.processRequest({
275+
author : alice.did.uri,
276+
target : alice.did.uri,
277+
messageType : DwnInterface.ProtocolsQuery,
278+
messageParams : {}
279+
});
280+
localProtocolsQueryReply = localProtocolsQueryResponse.reply;
281+
expect(localProtocolsQueryReply.status.code).to.equal(200);
282+
expect(localProtocolsQueryReply.entries?.length).to.equal(2);
283+
expect(localProtocolsQueryReply.entries).to.have.deep.equal([ protocolsConfigure1.message, protocolsConfigure2.message ]);
284+
200285
// query local node to see all records
201-
localQueryResponse = await testHarness.agent.dwn.processRequest({
286+
localRecordsQueryResponse = await testHarness.agent.dwn.processRequest({
202287
author : alice.did.uri,
203288
target : alice.did.uri,
204289
messageType : DwnInterface.RecordsQuery,
@@ -209,14 +294,26 @@ describe('SyncEngineLevel', () => {
209294
}
210295
}
211296
});
212-
localDwnQueryReply = localQueryResponse.reply;
213-
expect(localDwnQueryReply.status.code).to.equal(200);
214-
expect(localDwnQueryReply.entries).to.have.length(6, 'local');
215-
localRecordsFromQuery = localDwnQueryReply.entries?.map(entry => entry.recordId);
297+
localRecordsQueryReply = localRecordsQueryResponse.reply;
298+
expect(localRecordsQueryReply.status.code).to.equal(200);
299+
expect(localRecordsQueryReply.entries).to.have.length(6, 'local');
300+
localRecordsFromQuery = localRecordsQueryReply.entries?.map(entry => entry.recordId);
216301
expect(localRecordsFromQuery).to.have.members([...localRecords, ...remoteRecords]);
217302

218-
// query remote node to see all results
219-
remoteQueryResponse = await testHarness.agent.dwn.sendRequest({
303+
// query remote node to see all protocols
304+
remoteProtocolsQueryResponse = await testHarness.agent.dwn.sendRequest({
305+
author : alice.did.uri,
306+
target : alice.did.uri,
307+
messageType : DwnInterface.ProtocolsQuery,
308+
messageParams : {}
309+
});
310+
remoteProtocolsQueryReply = remoteProtocolsQueryResponse.reply;
311+
expect(remoteProtocolsQueryReply.status.code).to.equal(200);
312+
expect(remoteProtocolsQueryReply.entries?.length).to.equal(2);
313+
expect(remoteProtocolsQueryReply.entries).to.have.deep.equal([ protocolsConfigure1.message, protocolsConfigure2.message ]);
314+
315+
// query remote node to see all records
316+
remoteRecordsQueryResponse = await testHarness.agent.dwn.sendRequest({
220317
author : alice.did.uri,
221318
target : alice.did.uri,
222319
messageType : DwnInterface.RecordsQuery,
@@ -227,10 +324,10 @@ describe('SyncEngineLevel', () => {
227324
}
228325
}
229326
});
230-
remoteDwnQueryReply = remoteQueryResponse.reply;
231-
expect(remoteDwnQueryReply.status.code).to.equal(200);
232-
expect(remoteDwnQueryReply.entries).to.have.length(6, 'remote');
233-
remoteRecordsFromQuery = remoteDwnQueryReply.entries?.map(entry => entry.recordId);
327+
remoteRecordsQueryReply = remoteRecordsQueryResponse.reply;
328+
expect(remoteRecordsQueryReply.status.code).to.equal(200);
329+
expect(remoteRecordsQueryReply.entries).to.have.length(6, 'remote');
330+
remoteRecordsFromQuery = remoteRecordsQueryReply.entries?.map(entry => entry.recordId);
234331
expect(remoteRecordsFromQuery).to.have.members([...localRecords, ...remoteRecords]);
235332
}).slow(1000); // Yellow at 500ms, Red at 1000ms.
236333

@@ -412,7 +509,7 @@ describe('SyncEngineLevel', () => {
412509

413510
// spy on sendDwnRequest to the remote DWN
414511
const sendDwnRequestSpy = sinon.spy(testHarness.agent.rpc, 'sendDwnRequest');
415-
const processMessageSpy = sinon.spy(testHarness.agent.dwn, 'processMessage');
512+
const processMessageSpy = sinon.spy(testHarness.agent.dwn.node, 'processMessage');
416513

417514
// Execute Sync to push records to Alice's remote node
418515
await syncEngine.pull();

0 commit comments

Comments
 (0)