1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
|
// Copyright 2009-present MongoDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <cstdlib>
#include <iostream>
#include <bsoncxx/builder/basic/document.hpp>
#include <bsoncxx/builder/basic/kvp.hpp>
#include <bsoncxx/json.hpp>
#include <bsoncxx/stdx/optional.hpp>
#include <bsoncxx/string/to_string.hpp>
#include <mongocxx/change_stream.hpp>
#include <mongocxx/client.hpp>
#include <mongocxx/instance.hpp>
#include <mongocxx/pool.hpp>
#include <mongocxx/uri.hpp>
#include <examples/macros.hh>
namespace {
// watch_forever iterates the change stream until an error occurs.
[[noreturn]] void watch_forever(mongocxx::collection& collection) {
mongocxx::options::change_stream options;
// Wait up to 1 second before polling again.
const std::chrono::milliseconds await_time{1000};
options.max_await_time(await_time);
mongocxx::change_stream stream = collection.watch(options);
while (true) {
for (const auto& event : stream) {
std::cout << bsoncxx::to_json(event) << std::endl;
}
std::cout << "No new notifications. Trying again..." << std::endl;
}
}
} // namespace
int EXAMPLES_CDECL main(int argc, char* argv[]) {
if (std::getenv("MONGOCXX_TEST_TOPOLOGY")) {
std::cerr << "Skipping: change_streams example should not be run by tests" << std::endl;
return 0;
}
mongocxx::instance inst{};
auto uri_str = mongocxx::uri::k_default_uri;
std::string db = "db";
std::string coll = "coll";
for (int i = 1; i < argc; i++) {
std::string arg = argv[i];
bsoncxx::stdx::optional<std::string> nextarg;
if (i < argc - 1) {
nextarg = std::string(argv[i + 1]);
}
if (arg == "--uri") {
if (!nextarg) {
std::cerr << "Expected value for '" << arg << "' option" << std::endl;
return EXIT_FAILURE;
}
uri_str = *nextarg;
i++;
continue;
}
if (arg == "--db") {
if (!nextarg) {
std::cerr << "Expected value for '" << arg << "' option" << std::endl;
return EXIT_FAILURE;
}
db = *nextarg;
i++;
continue;
}
if (arg == "--coll") {
if (!nextarg) {
std::cerr << "Expected value for '" << arg << "'option" << std::endl;
return EXIT_FAILURE;
}
coll = *nextarg;
i++;
continue;
}
std::cerr << "Unexpected argument: '" << arg << "'" << std::endl;
std::cerr << "Usage: " << argv[0] << " [--uri <uri>] [--db <db_name>] [--coll <coll_name>]"
<< std::endl;
return EXIT_FAILURE;
}
mongocxx::pool pool{mongocxx::uri(uri_str)};
try {
auto entry = pool.acquire();
auto collection = (*entry)[db][coll];
std::cout << "Watching for notifications on the collection " << db << "." << coll
<< std::endl;
std::cout << "To observe a notification, try inserting a document." << std::endl;
watch_forever(collection);
return EXIT_SUCCESS;
} catch (const std::exception& exception) {
std::cerr << "Caught exception \"" << exception.what() << "\"" << std::endl;
} catch (...) {
std::cerr << "Caught unknown exception type" << std::endl;
}
return EXIT_FAILURE;
}
|