1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
|
+++
date = "2017-08-15T14:19:24-04:00"
title = "Change Streams"
[menu.main]
parent = "Async Tutorials"
identifier = "Async Change Streams"
weight = 21
pre = "<i class='fa'></i>"
+++
## Change Streams - Draft
MongoDB 3.6 introduces a new [`$changeStream`](http://dochub.mongodb.org/core/changestreams) aggregation pipeline
operator.
Change streams provide a way to watch changes to documents in a collection. To improve the usability of this new stage, the
`MongoCollection` API includes a new `watch` method. The `ChangeStreamIterable` sets up the change stream and automatically attempts
to resume if it encounters a potentially recoverable error.
## Prerequisites
- The example below requires a ``restaurants`` collection in the ``test`` database. To create and populate the collection, follow the directions in [github] (https://github.com/mongodb/docs-assets/tree/drivers).
- Include the following import statements:
```java
import com.mongodb.Block;
import com.mongodb.async.client.*;
import com.mongodb.async.SingleResultCallback;
import com.mongodb.client.model.*;
import org.bson.Document;
import java.util.Arrays;
```
- Include the following callback code which the examples in the tutorials will use:
```java
SingleResultCallback<Void> callbackWhenFinished = new SingleResultCallback<Void>() {
@Override
public void onResult(final Void result, final Throwable t) {
System.out.println("Operation Finished!");
}
};
```
- Include the following code which the examples in the tutorials will use to print the results of the change stream:
```java
Block<ChangeStreamDocument<Document>> printBlock = new Block<>() {
@Override
public void apply(final ChangeStreamDocument<Document> changeStreamDocument) {
System.out.println(changeStreamDocument);
}
};
```
## Connect to a MongoDB Deployment
Connect to a MongoDB deployment and declare and define a `MongoDatabase` and a `MongoCollection` instances.
For example, include the following code to connect to a replicaSet MongoDB deployment running on localhost on ports `27017`, `27018` and `27019`.
It also defines `database` to refer to the `test` database and `collection` to refer to the `restaurants` collection.
```java
MongoClient mongoClient = MongoClients.create(new ConnectionString("mongodb://localhost:27017,localhost:27018,localhost:27019"));
MongoDatabase database = mongoClient.getDatabase("test");
MongoCollection<Document> collection = database.getCollection("restaurants");
```
For additional information on connecting to MongoDB, see [Connect to MongoDB]({{< relref "driver-async/tutorials/connect-to-mongodb.md" >}}).
## Watch the collection
To create a change stream use the the [`MongoCollection.watch()`]({{<apiref "com/mongodb/client/MongoCollection.html#watch">}}) method.
In the following example, the change stream prints out all changes it observes.
```java
collection.watch().forEach(printBlock, callbackWhenFinished);
```
## Filtering content
The `watch` method can also be passed a list of [aggregation stages]({{< docsref "meta/aggregation-quick-reference" >}}), that can modify
the data returned by the `$changeStream` operator. Note: not all aggregation operators are supported. See the
[`$changeStream`](http://dochub.mongodb.org/core/changestreams) documentation for more information.
In the following example the change stream prints out all changes it observes, for `insert`, `update`, `replace` and `delete` operations:
- First it uses a [`$match`]({{< docsref "reference/operator/aggregation/match/" >}}) stage to filter for documents where the `operationType`
is either an `insert`, `update`, `replace` or `delete`.
- Then, it sets the `fullDocument` to [`FullDocument.UPDATE_LOOKUP`]({{<apiref "com/mongodb/client/model/FullDocument.html#UPDATE_LOOKUP">}}),
so that the document after the update is included in the results.
```java
collection.watch(asList(Aggregates.match(Filters.in("operationType", asList("insert", "update", "replace", "delete")))))
.fullDocument(FullDocument.UPDATE_LOOKUP)
.forEach(printBlock, callbackWhenFinished);
```
|