Skip to content

Commit a5b6f82

Browse files
committedSep 1, 2018
ADD RxCollection.awaitPersistence()
1 parent 308d6fc commit a5b6f82

File tree

7 files changed

+104
-21
lines changed

7 files changed

+104
-21
lines changed
 

‎CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Breaking:
1313

1414
Features:
1515
- Added `RxDocument.atomicSet()`
16+
- Added `RxCollection.awaitPersistence()` for in-memory-collections
1617

1718
Bugfixes:
1819
- checkAdapter doesn't cleanup test databases [#714](https://github.com/pubkey/rxdb/issues/714)

‎docs-src/in-memory.md

+29-16
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,25 @@
1-
# in-memory
1+
# InMemory Collections
22

3-
When you do a heavy amount of operations on a `RxCollection`, you might want to optimize this by using the in-memory-replication of the collection. The in-memory-replication behaves equal to the original collection but is stored in the Ram of your computer.
3+
When you do a heavy amount of operations on a `RxCollection`, you might want to optimize this by using the in-memory-replication of the collection. The in-memory-replication behaves equal to the original collection but is stored in the Memory of your computer instead of the hard drive
4+
5+
## Pros:
6+
7+
- Faster queries
8+
- Faster writes
9+
- Querying works over encrypted fields
10+
11+
## Cons:
12+
13+
- The original collection has to be small enough to fit into the memory
14+
- No attachment-support
15+
- Initial creation takes longer (all data is loaded from disc into the memory)
16+
17+
18+
### encryption
19+
Encrypted fields are automatically decrypted inside of the memory-collection. This means you can do queries over encrypted fields.
20+
21+
### replication
22+
The memory-collection is two-way-replicated with its original collection. This means when you change documents on one of them, the update and the change-event will fire on both.
423

524
## RxCollection().inMemory();
625

@@ -20,24 +39,18 @@ const docs = await memCol.find().exec(); // has same result as on the original c
2039

2140
```
2241

23-
### encryption
24-
Encrypted fields are automatically decrypted inside of the memory-collection. This means you can do queries over encrypted fields.
42+
## RxCollection().awaitPersistence()
2543

26-
### replication
27-
The memory-collection is two-way-replicated with its original collection. This means when you change documents on one of them, the update and the change-event will fire on both.
44+
When you do a write into the inMemoryCollection, it takes some time until the change is replicated at the parents collections.
45+
To know when you can be sure that all writes have been replicated, call `awaitPersistence()`
2846

29-
## Pros:
30-
31-
- Faster queries
32-
- Faster writes
33-
- Querying works over encrypted fields
34-
35-
## Cons:
47+
```js
48+
const memCol = await myCollection.inMemory();
3649

37-
- The original collection has to be small enough to fit into the memory
38-
- No attachment-support
39-
- Initial creation takes longer (all data is loaded from disc into the memory)
50+
await memCol.insert({foo: 'bar'});
4051

52+
await memCol.awaitPersistence(); // after this you can be sure that everything is replicated
53+
```
4154

4255
--------------------------------------------------------------------------------
4356

‎orga/releases/8.0.0.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,4 @@ Because the fields of an `RxDocument` are defined dynamically by the schema and
3939
Once per collection, a custom RxDocument-protoptype and constructor is created and each RxDocument of this collection is created with the constructor.
4040

4141
## Rewritten the inMemory-plugin
42-
The inMemory-plugin was written with some wrong estimations. I rewrote it and added much more tests.
42+
The inMemory-plugin was written with some wrong estimations. I rewrote it and added much more tests. Also `awaitPersistence()` can now be used to check if all writes have already been replicated into the parent collection.

‎src/plugins/in-memory.js

+47-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ import {
1212

1313
import {
1414
filter,
15-
map
15+
map,
16+
mergeMap,
17+
first
1618
} from 'rxjs/operators';
1719

1820
import RxCollection from '../rx-collection';
@@ -70,6 +72,12 @@ export class InMemoryRxCollection extends RxCollection.RxCollection {
7072

7173
this._observable$ = new Subject();
7274
this._changeEventBuffer = ChangeEventBuffer.create(this);
75+
76+
const parentProto = Object.getPrototypeOf(parentCollection);
77+
this._oldPouchPut = parentProto._pouchPut.bind(this);
78+
79+
this._nonPersistentRevisions = new Set();
80+
this._nonPersistentRevisionsSubject = new Subject(); // emits Set.size() when Set is changed
7381
}
7482

7583
async prepare() {
@@ -103,14 +111,51 @@ export class InMemoryRxCollection extends RxCollection.RxCollection {
103111
* create an ongoing replications between both sides
104112
*/
105113
const thisToParentSub = streamChangedDocuments(this)
106-
.subscribe(doc => applyChangedDocumentToPouch(this._parentCollection, doc));
114+
.pipe(
115+
mergeMap(doc => applyChangedDocumentToPouch(this._parentCollection, doc)
116+
.then(() => doc._rev)
117+
)
118+
)
119+
.subscribe(changeRev => {
120+
this._nonPersistentRevisions.delete(changeRev);
121+
this._nonPersistentRevisionsSubject.next(this._nonPersistentRevisions.size);
122+
});
107123
this._subs.push(thisToParentSub);
108124

109125
const parentToThisSub = streamChangedDocuments(this._parentCollection)
110126
.subscribe(doc => applyChangedDocumentToPouch(this, doc));
111127
this._subs.push(parentToThisSub);
112128
}
113129

130+
/**
131+
* waits until all writes are persistent
132+
* in the parent collection
133+
* @return {Promise}
134+
*/
135+
awaitPersistence() {
136+
if (this._nonPersistentRevisions.size === 0) return Promise.resolve();
137+
return this._nonPersistentRevisionsSubject.pipe(
138+
filter(() => this._nonPersistentRevisions.size === 0),
139+
first()
140+
).toPromise();
141+
}
142+
143+
/**
144+
* To know which events are replicated and which are not,
145+
* the _pouchPut is wrapped
146+
* @overwrite
147+
*/
148+
async _pouchPut(obj, overwrite) {
149+
console.log('new pouch put');
150+
const ret = await this._oldPouchPut(obj, overwrite);
151+
console.log('Ret');
152+
console.dir(ret);
153+
154+
this._nonPersistentRevisions.add(ret.rev);
155+
return ret;
156+
}
157+
158+
114159
/**
115160
* @overwrite
116161
*/

‎src/rx-collection.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -231,9 +231,11 @@ export class RxCollection {
231231
}
232232

233233
/**
234-
* [overwrite description]
234+
* every write on the pouchdb
235+
* is tunneld throught this function
235236
* @param {object} obj
236237
* @param {boolean} [overwrite=false] if true, it will overwrite existing document
238+
* @return {Promise}
237239
*/
238240
async _pouchPut(obj, overwrite = false) {
239241
obj = this._handleToPouch(obj);

‎test/unit/in-memory.test.js

+20
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,26 @@ config.parallel('in-memory.test.js', () => {
451451
col.database.destroy();
452452
});
453453
});
454+
describe('.awaitPersistence()', () => {
455+
it('should resolve the promise after some time', async () => {
456+
const col = await humansCollection.create(0);
457+
const memCol = await col.inMemory();
458+
459+
await memCol.awaitPersistence();
460+
461+
await memCol.insert(schemaObjects.simpleHuman());
462+
await memCol.awaitPersistence();
463+
464+
const doc = await memCol.findOne().exec();
465+
await doc.atomicSet('age', 6);
466+
await memCol.awaitPersistence();
467+
468+
await doc.remove();
469+
await memCol.awaitPersistence();
470+
471+
col.database.destroy();
472+
});
473+
});
454474
describe('other', () => {
455475
it('should work with many documents', async () => {
456476
const amount = 100;

‎typings/rx-collection.d.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@ export declare class RxCollection<RxDocumentType, OrmMethods = {}> {
104104
postRemove(fun: RxCollectionHookCallback<RxDocumentType, OrmMethods>, parallel: boolean): void;
105105
postCreate(fun: RxCollectionHookCallback<RxDocumentType, OrmMethods>): void;
106106

107-
108107
// migration
109108
migrationNeeded(): Promise<boolean>;
110109
migrate(batchSize: number): Observable<{
@@ -130,6 +129,9 @@ export declare class RxCollection<RxDocumentType, OrmMethods = {}> {
130129
upsertLocal(id: string, data: any): Promise<RxLocalDocument<RxCollection<RxDocumentType, OrmMethods>>>;
131130
getLocal(id: string): Promise<RxLocalDocument<RxCollection<RxDocumentType, OrmMethods>>>;
132131

132+
// only inMemory-collections
133+
awaitPersistence(): Promise<void>;
134+
133135
destroy(): Promise<boolean>;
134136
remove(): Promise<any>;
135137
}

0 commit comments

Comments
 (0)
Please sign in to comment.