#include "WsClient.h"
#include <cassert>
#include <memory>
#include <string>
#include <unordered_set>
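// Per-connection state attached to subscriber ("/sub") connections.
struct ClientContext
{
    // Channels this connection is currently subscribed to.
    std::unordered_set<std::string> channels_;
    // Redis subscriber owned by this connection.
    std::shared_ptr<nosql::RedisSubscriber> subscriber_;
};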
void WsClient::handleNewMessage(const WebSocketConnectionPtr &wsConnPtr,
                                std::string &&message,
                                const WebSocketMessageType &type)
{
    // Control frames need no application-level handling here.
    if (type == WebSocketMessageType::Ping ||
        type == WebSocketMessageType::Pong ||
        type == WebSocketMessageType::Close)
    {
        return;
    }
    // Only text frames are supported.
    if (type != WebSocketMessageType::Text)
    {
        LOG_ERROR << "Unsupported message type " << static_cast<int>(type);
        return;
    }
    LOG_DEBUG << "WsClient new message from "
              << wsConnPtr->peerAddr().toIpPort();
    // Only connections accepted on "/sub" carry a ClientContext; a
    // connection without one is treated as a publisher.
    auto context = wsConnPtr->getContext<ClientContext>();
    if (!context)
    {
        // Publisher message format: "<channel> <message>"
        auto pos = message.find(' ');
        if (pos == std::string::npos)
        {
            wsConnPtr->send("Invalid publish message.");
            return;
        }
        std::string channel = message.substr(0, pos);
        std::string msg = message.substr(pos + 1);
        LOG_INFO << "PUBLISH " << channel << " " << msg;
        drogon::app().getRedisClient()->execCommandAsync(
            [wsConnPtr](const nosql::RedisResult &result) {
                // PUBLISH replies with the number of receiving subscribers.
                std::string nSubs = std::to_string(result.asInteger());
                LOG_INFO << "PUBLISH success to " << nSubs << " subscribers.";
                wsConnPtr->send("PUBLISH success to " + nSubs +
                                " subscribers.");
            },
            [wsConnPtr](const nosql::RedisException &ex) {
                LOG_ERROR << "PUBLISH failed, " << ex.what();
                wsConnPtr->send(std::string("PUBLISH failed: ") + ex.what());
            },
            "PUBLISH %s %s",
            channel.c_str(),
            msg.c_str());
        return;
    }
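    // Subscriber message format: "<channel>" to subscribe,
    // "unsub <channel>" to unsubscribe.
    std::string channel = std::move(message);
    bool subscribe = true;
    if (channel.compare(0, 6, "unsub ") == 0)
    {
        channel = channel.substr(6);
        subscribe = false;
    }
    // Validate after stripping the "unsub " prefix so that a bare
    // "unsub " is rejected as well.
    if (channel.empty())
    {
        wsConnPtr->send("Channel not provided");
        return;
    }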
    if (subscribe)
    {
        if (context->channels_.find(channel) != context->channels_.end())
        {
            wsConnPtr->send("Already subscribed to channel " + channel);
            return;
        }
        context->subscriber_->subscribe(
            channel,
            [channel, wsConnPtr](const std::string &subChannel,
                                 const std::string &subMessage) {
                assert(subChannel == channel);
                LOG_INFO << "Receive channel message " << subMessage;
                // Note: the values are concatenated without JSON escaping,
                // so quotes or backslashes in them yield invalid JSON.
                std::string resp = "{\"channel\":\"" + subChannel +
                                   "\",\"message\":\"" + subMessage + "\"}";
                wsConnPtr->send(resp);
            });
        context->channels_.insert(channel);
        wsConnPtr->send("Subscribe to channel: " + channel);
    }
    else
    {
        if (context->channels_.find(channel) == context->channels_.end())
        {
            wsConnPtr->send("Channel not subscribed.");
            return;
        }
        context->channels_.erase(channel);
        context->subscriber_->unsubscribe(channel);
        wsConnPtr->send("Unsubscribe from channel: " + channel);
    }
}
void WsClient::handleNewConnection(const HttpRequestPtr &req,
                                   const WebSocketConnectionPtr &wsConnPtr)
{
    if (req->getPath() == "/sub")
    {
        LOG_DEBUG << "WsClient new subscriber connection from "
                  << wsConnPtr->peerAddr().toIpPort();
        auto context = std::make_shared<ClientContext>();
        context->subscriber_ = drogon::app().getRedisClient()->newSubscriber();
        wsConnPtr->setContext(context);
    }
    else
    {
        LOG_DEBUG << "WsClient new publisher connection from "
                  << wsConnPtr->peerAddr().toIpPort();
    }
}
void WsClient::handleConnectionClosed(const WebSocketConnectionPtr &wsConnPtr)
{
    LOG_DEBUG << "WsClient close connection from "
              << wsConnPtr->peerAddr().toIpPort();
    // Channels are unsubscribed automatically when the subscriber is
    // destroyed, so the explicit loop below is not needed.
    // auto context = wsConnPtr->getContext<ClientContext>();
    // for (auto& channel : context->channels_)
    // {
    //     context->subscriber_->unsubscribe(channel);
    // }
    wsConnPtr->clearContext();
}