Skip to content

Commit 04e7ddd

Browse files
committed
ADD #763 RxReplicationState.denied$
1 parent a5b6f82 commit 04e7ddd

File tree

7 files changed

+96
-56
lines changed

7 files changed

+96
-56
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ Breaking:
1414
Features:
1515
- Added `RxDocument.atomicSet()`
1616
- Added `RxCollection.awaitPersistence()` for in-memory-collections
17+
- Added `RxReplicationState.denied$` [#763](https://github.com/pubkey/rxdb/issues/763)
1718

1819
Bugfixes:
1920
- checkAdapter doesn't cleanup test databases [#714](https://github.com/pubkey/rxdb/issues/714)

docs-src/replication.md

+7
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,13 @@ Emits each replicated document-data.
4545
replicationState.docs$.subscribe(docData => console.dir(docData));
4646
```
4747

48+
### denied$
49+
Emits when a document failed to replicate (e.g. due to permissions).
50+
51+
```js
52+
replicationState.denied$.subscribe(docData => console.dir(docData));
53+
```
54+
4855
### active$
4956
Emits `true` or `false` depending if the replication is running. For example if you sync with a remote server and the connection dies, this is `false` until the connection can be reestablished.
5057

src/plugins/in-memory.js

-4
Original file line numberDiff line numberDiff line change
@@ -146,11 +146,7 @@ export class InMemoryRxCollection extends RxCollection.RxCollection {
146146
* @overwrite
147147
*/
148148
async _pouchPut(obj, overwrite) {
149-
console.log('new pouch put');
150149
const ret = await this._oldPouchPut(obj, overwrite);
151-
console.log('Ret');
152-
console.dir(ret);
153-
154150
this._nonPersistentRevisions.add(ret.rev);
155151
return ret;
156152
}

src/plugins/replication.js

+33-26
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ export class RxReplicationState {
3838
this._subjects = {
3939
change: new Subject(),
4040
docs: new Subject(),
41+
denied: new Subject(),
4142
active: new BehaviorSubject(false),
4243
complete: new BehaviorSubject(false),
4344
error: new Subject(),
@@ -46,7 +47,7 @@ export class RxReplicationState {
4647
// create getters
4748
Object.keys(this._subjects).forEach(key => {
4849
Object.defineProperty(this, key + '$', {
49-
get: function () {
50+
get: function() {
5051
return this._subjects[key].asObservable();
5152
}
5253
});
@@ -60,54 +61,60 @@ export class RxReplicationState {
6061
// change
6162
this._subs.push(
6263
fromEvent(evEmitter, 'change')
63-
.subscribe(ev => this._subjects.change.next(ev))
64+
.subscribe(ev => this._subjects.change.next(ev))
65+
);
66+
67+
// denied
68+
this._subs.push(
69+
fromEvent(evEmitter, 'denied')
70+
.subscribe(ev => this._subjects.denied.next(ev))
6471
);
6572

6673
// docs
6774
this._subs.push(
6875
fromEvent(evEmitter, 'change')
69-
.subscribe(ev => {
70-
if (
71-
this._subjects.docs.observers.length === 0 ||
72-
ev.direction !== 'pull'
73-
) return;
74-
75-
ev.change.docs
76-
.filter(doc => doc.language !== 'query') // remove internal docs
77-
.map(doc => this.collection._handleFromPouch(doc)) // do primary-swap and keycompression
78-
.forEach(doc => this._subjects.docs.next(doc));
79-
}));
76+
.subscribe(ev => {
77+
if (
78+
this._subjects.docs.observers.length === 0 ||
79+
ev.direction !== 'pull'
80+
) return;
81+
82+
ev.change.docs
83+
.filter(doc => doc.language !== 'query') // remove internal docs
84+
.map(doc => this.collection._handleFromPouch(doc)) // do primary-swap and keycompression
85+
.forEach(doc => this._subjects.docs.next(doc));
86+
}));
8087

8188
// error
8289
this._subs.push(
8390
fromEvent(evEmitter, 'error')
84-
.subscribe(ev => this._subjects.error.next(ev))
91+
.subscribe(ev => this._subjects.error.next(ev))
8592
);
8693

8794
// active
8895
this._subs.push(
8996
fromEvent(evEmitter, 'active')
90-
.subscribe(() => this._subjects.active.next(true))
97+
.subscribe(() => this._subjects.active.next(true))
9198
);
9299
this._subs.push(
93100
fromEvent(evEmitter, 'paused')
94-
.subscribe(() => this._subjects.active.next(false))
101+
.subscribe(() => this._subjects.active.next(false))
95102
);
96103

97104
// complete
98105
this._subs.push(
99106
fromEvent(evEmitter, 'complete')
100-
.subscribe(info => {
107+
.subscribe(info => {
101108

102-
/**
103-
* when complete fires, it might be that not all changeEvents
104-
* have passed throught, because of the delay of .wachtForChanges()
105-
* Therefore we have to first ensure that all previous changeEvents have been handled
106-
*/
107-
const unhandledEvents = Array.from(this.collection._watchForChangesUnhandled);
109+
/**
110+
* when complete fires, it might be that not all changeEvents
111+
* have passed throught, because of the delay of .wachtForChanges()
112+
* Therefore we have to first ensure that all previous changeEvents have been handled
113+
*/
114+
const unhandledEvents = Array.from(this.collection._watchForChangesUnhandled);
108115

109-
Promise.all(unhandledEvents).then(() => this._subjects.complete.next(info));
110-
})
116+
Promise.all(unhandledEvents).then(() => this._subjects.complete.next(info));
117+
})
111118
);
112119
}
113120

@@ -252,7 +259,7 @@ export const prototypes = {
252259
export const overwritable = {};
253260

254261
export const hooks = {
255-
createRxCollection: function (collection) {
262+
createRxCollection: function(collection) {
256263
INTERNAL_POUCHDBS.add(collection.pouch);
257264
}
258265
};

test/unit/replication.test.js

+18
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,24 @@ describe('replication.test.js', () => {
367367
await AsyncTestUtil.waitUntil(() => emitedDocs.length === 10);
368368
emitedDocs.forEach(doc => assert.ok(doc.firstName));
369369

370+
c.database.destroy();
371+
c2.database.destroy();
372+
});
373+
});
374+
describe('denied$', () => {
375+
it('should not emit', async () => {
376+
const c = await humansCollection.create(0);
377+
const c2 = await humansCollection.create(10);
378+
const repState = await c.sync({
379+
remote: c2,
380+
waitForLeadership: false
381+
});
382+
const emitted = [];
383+
repState.denied$.subscribe(doc => emitted.push(doc));
384+
385+
await AsyncTestUtil.wait(100);
386+
assert.equal(emitted.length, 0);
387+
370388
c.database.destroy();
371389
c2.database.destroy();
372390
});

typings/plugins/replication.d.ts

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import { Observable } from 'rxjs';
2+
3+
import { RxQuery } from '../rx-query.d';
4+
import {
5+
PouchReplicationOptions
6+
} from '../pouch';
7+
8+
export declare class RxReplicationState {
9+
change$: Observable<any>;
10+
docs$: Observable<any>;
11+
denied$: Observable<any>;
12+
active$: Observable<any>;
13+
complete$: Observable<any>;
14+
error$: Observable<any>;
15+
cancel(): Promise<any>;
16+
17+
// if you do a custom sync, put the thing you get back from pouch here
18+
setPouchEventEmitter(pouchSyncState: any): void;
19+
}
20+
21+
export interface SyncOptions {
22+
remote: string | any,
23+
waitForLeadership?: boolean,
24+
direction?: {
25+
push?: boolean,
26+
pull?: boolean
27+
},
28+
// for options see https://pouchdb.com/api.html#replication
29+
options?: PouchReplicationOptions,
30+
query?: RxQuery<any, any>
31+
}

typings/rx-collection.d.ts

+6-26
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@ import {
1515
RxQuery
1616
} from './rx-query';
1717
import {
18-
PouchSettings,
19-
PouchReplicationOptions
18+
PouchSettings
2019
} from './pouch';
2120
import {
2221
RxChangeEventInsert,
@@ -28,6 +27,11 @@ import {
2827
RxLocalDocument
2928
} from './rx-document';
3029

30+
import {
31+
SyncOptions,
32+
RxReplicationState
33+
} from './plugins/replication';
34+
3135
export interface RxCollectionCreator {
3236
name: string;
3337
schema: RxJsonSchema;
@@ -48,30 +52,6 @@ export interface RxCollectionCreator {
4852
options?: any;
4953
}
5054

51-
export declare class RxReplicationState {
52-
change$: Observable<any>;
53-
docs$: Observable<any>;
54-
active$: Observable<any>;
55-
complete$: Observable<any>;
56-
error$: Observable<any>;
57-
cancel(): Promise<any>;
58-
59-
// if you do a custom sync, put the thing you get back from pouch here
60-
setPouchEventEmitter(pouchSyncState: any): void;
61-
}
62-
63-
export interface SyncOptions {
64-
remote: string | any,
65-
waitForLeadership?: boolean,
66-
direction?: {
67-
push?: boolean,
68-
pull?: boolean
69-
},
70-
// for options see https://pouchdb.com/api.html#replication
71-
options?: PouchReplicationOptions,
72-
query?: RxQuery<any, any>
73-
}
74-
7555
export declare class RxCollection<RxDocumentType, OrmMethods = {}> {
7656
readonly database: RxDatabase;
7757
readonly name: string;

0 commit comments

Comments
 (0)