File: change-streams.md

+++
date = "2017-08-15T14:19:24-04:00"
title = "Change Streams"
[menu.main]
  parent = "Async Tutorials"
  identifier = "Async Change Streams"
  weight = 21
  pre = "<i class='fa'></i>"
+++

## Change Streams - Draft

MongoDB 3.6 introduces a new [`$changeStream`](http://dochub.mongodb.org/core/changestreams) aggregation pipeline
stage.

Change streams provide a way to watch for changes to documents in a collection. To make this new stage easier to use, the
`MongoCollection` API includes a new `watch` method, which returns a `ChangeStreamIterable`. The `ChangeStreamIterable` sets up the
change stream and automatically attempts to resume if it encounters a potentially recoverable error.
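
As a rough illustration of what resuming means, the sketch below models the idea in plain Java without any driver classes. `FlakySource`, `nextAfter`, and the integer position standing in for a resume token are all hypothetical; the driver's own resume logic is internal and more involved. The only point shown is that the consumer remembers the last event it delivered and, after a recoverable failure, continues from that position.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ResumeSketch {
    // Hypothetical event source, not a driver class. It serves the event that
    // follows a given position and throws once to simulate a dropped connection.
    static final class FlakySource {
        private final List<String> events;
        private final int failAt;
        private boolean alreadyFailed = false;

        FlakySource(List<String> events, int failAt) {
            this.events = events;
            this.failAt = failAt;
        }

        // Returns the event after position lastSeen, or null when none remain.
        String nextAfter(int lastSeen) {
            int index = lastSeen + 1;
            if (index == failAt && !alreadyFailed) {
                alreadyFailed = true;
                throw new IllegalStateException("simulated network error");
            }
            return index < events.size() ? events.get(index) : null;
        }
    }

    // Reads every event, remembering the position of the last one delivered
    // so that reading can restart from there after a failure.
    static List<String> consume(FlakySource source) {
        List<String> delivered = new ArrayList<String>();
        int resumeToken = -1; // nothing delivered yet
        while (true) {
            try {
                String event = source.nextAfter(resumeToken);
                if (event == null) {
                    return delivered;
                }
                delivered.add(event);
                resumeToken++;
            } catch (IllegalStateException recoverable) {
                // Resume: retry with the unchanged token, so no event is skipped or repeated.
            }
        }
    }

    public static void main(String[] args) {
        FlakySource source = new FlakySource(Arrays.asList("insert", "update", "delete"), 1);
        System.out.println(consume(source));
    }
}
```

Running `main` prints `[insert, update, delete]`: the simulated failure before the second event does not cause any event to be lost or delivered twice.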

## Prerequisites

- The example below requires a `restaurants` collection in the `test` database. To create and populate the collection, follow the directions on [GitHub](https://github.com/mongodb/docs-assets/tree/drivers).

- Include the following import statements:

```java
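import com.mongodb.Block;
import com.mongodb.ConnectionString;
import com.mongodb.async.SingleResultCallback;
import com.mongodb.async.client.*;

import com.mongodb.client.model.*;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;

import org.bson.Document;

import java.util.Arrays;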
```

- Include the following callback code which the examples in the tutorials will use:

```java
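SingleResultCallback<Void> callbackWhenFinished = new SingleResultCallback<Void>() {
    @Override
    public void onResult(final Void result, final Throwable t) {
        // t is non-null when the operation ended with an error
        if (t != null) {
            System.out.println("Operation failed: " + t.getMessage());
        } else {
            System.out.println("Operation Finished!");
        }
    }
};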
```

- Include the following code which the examples in the tutorials will use to print the results of the change stream:

```java
Block<ChangeStreamDocument<Document>> printBlock = new Block<ChangeStreamDocument<Document>>() {
    @Override
    public void apply(final ChangeStreamDocument<Document> changeStreamDocument) {
        System.out.println(changeStreamDocument);
    }
};
```

## Connect to a MongoDB Deployment

Connect to a MongoDB deployment, then declare and define `MongoDatabase` and `MongoCollection` instances.

For example, include the following code to connect to a MongoDB replica set running on localhost on ports `27017`, `27018` and `27019`.
The code also defines `database` to refer to the `test` database and `collection` to refer to the `restaurants` collection.

```java
MongoClient mongoClient = MongoClients.create(new ConnectionString("mongodb://localhost:27017,localhost:27018,localhost:27019"));
MongoDatabase database = mongoClient.getDatabase("test");
MongoCollection<Document> collection = database.getCollection("restaurants");
```

For additional information on connecting to MongoDB, see [Connect to MongoDB]({{< relref "driver-async/tutorials/connect-to-mongodb.md" >}}).

## Watch the collection

To create a change stream, use the [`MongoCollection.watch()`]({{<apiref "com/mongodb/async/client/MongoCollection.html#watch">}}) method.

In the following example, the change stream prints out all changes it observes.

```java
collection.watch().forEach(printBlock, callbackWhenFinished);
```
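
Because the async API reports completion through a callback instead of blocking, a short-lived `main` method may reach its end before any change is printed. One common way to keep the calling thread waiting is a `CountDownLatch`. The sketch below is self-contained and uses no driver classes: `Callback` is a hypothetical stand-in with the same shape as `SingleResultCallback`, and `runAsync` is a hypothetical stand-in for an asynchronous call such as `forEach`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchSketch {
    // Hypothetical stand-in with the same shape as the driver's SingleResultCallback<T>.
    interface Callback<T> {
        void onResult(T result, Throwable t);
    }

    // Hypothetical stand-in for an asynchronous call: it returns immediately
    // and invokes the callback later on another thread.
    static void runAsync(final Callback<Void> callback) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                callback.onResult(null, null);
            }
        }).start();
    }

    // Starts the asynchronous work, then blocks until the callback fires
    // or the timeout elapses. Returns true if the callback fired in time.
    static boolean runAndWait(long timeoutMillis) {
        final CountDownLatch latch = new CountDownLatch(1);
        runAsync(new Callback<Void>() {
            @Override
            public void onResult(final Void result, final Throwable t) {
                latch.countDown();
            }
        });
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("callback fired: " + runAndWait(5000));
    }
}
```

In a real program the latch would be counted down inside `callbackWhenFinished`. How long to wait is application-specific, since a change stream can stay open for as long as the application wants to keep watching.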

## Filtering content

The `watch` method can also be passed a list of [aggregation stages]({{< docsref "meta/aggregation-quick-reference" >}}) that modify
the data returned by the `$changeStream` stage. Note that not all aggregation stages are supported. See the
[`$changeStream`](http://dochub.mongodb.org/core/changestreams) documentation for more information.

In the following example, the change stream prints out all changes it observes for `insert`, `update`, `replace` and `delete` operations:

- First, it uses a [`$match`]({{< docsref "reference/operator/aggregation/match/" >}}) stage to filter for documents where the `operationType`
is one of `insert`, `update`, `replace` or `delete`.

- Then, it sets the `fullDocument` option to [`FullDocument.UPDATE_LOOKUP`]({{<apiref "com/mongodb/client/model/changestream/FullDocument.html#UPDATE_LOOKUP">}}),
so that the document after the update is included in the results.

```java
collection.watch(Arrays.asList(Aggregates.match(
                Filters.in("operationType", Arrays.asList("insert", "update", "replace", "delete")))))
        .fullDocument(FullDocument.UPDATE_LOOKUP)
        .forEach(printBlock, callbackWhenFinished);
```
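
To make the effect of the `$match` stage concrete, the following sketch applies the same membership test to a list of operation types in plain Java. It is only a model of the filter's semantics: the real filtering happens on the server, and `OperationTypeFilterSketch`, `WANTED`, and `keepWanted` are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class OperationTypeFilterSketch {
    // The same four operation types that the $match stage above accepts.
    static final Set<String> WANTED =
            new HashSet<String>(Arrays.asList("insert", "update", "replace", "delete"));

    // Keeps only the operation types contained in WANTED, preserving order.
    static List<String> keepWanted(List<String> operationTypes) {
        List<String> kept = new ArrayList<String>();
        for (String type : operationTypes) {
            if (WANTED.contains(type)) {
                kept.add(type);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> observed = Arrays.asList("insert", "invalidate", "update", "delete");
        System.out.println(keepWanted(observed));
    }
}
```

Running `main` prints `[insert, update, delete]`. Any event whose `operationType` is outside the list, such as `invalidate`, would not reach `printBlock`.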