23 #include "common_priv.h"
33 : debuglevel(0), shutdown(false), numlivenodes(0), keycount(0)
50 dbprintf(4,
"msg:%s:%s\n", sender.c_str(), libvariant::SerializeJSON(msg).c_str());
51 if (!msg.IsMap()) {
return; }
109 LogMessage(sender +
":" + msg[
"msg"].AsString());
117 Variant msg(Variant::MapType);
124 std::string name = msg[
"name"].AsString();
125 Variant reply(Variant::MapType);
126 reply[
"msgid"] = msg[
"msgid"];
127 reply[
"msgtype"] =
"reply";
129 reply[
"success"] =
false;
132 Variant kernelinfo(Variant::MapType);
134 kernelinfo[
"key"] = kernelkey;
135 kernelinfo[
"name"] = name;
136 kernelinfo[
"hostname"] = msg[
"hostname"];
137 kernelinfo[
"servname"] = msg[
"servname"];
138 kernelinfo[
"live"] =
false;
139 kernelinfo[
"type"] =
"kernelinfo";
140 kernelinfo[
"client"] = sender;
141 datamap.insert(std::make_pair(kernelkey, kernelinfo));
142 kernelmap.insert(std::make_pair(name, kernelkey));
143 dbprintf(2,
"Kernel %s created\n", name.c_str());
144 reply[
"success"] =
true;
145 reply[
"kernelinfo"] = kernelinfo.Copy();
151 Key_t kernelkey = msg[
"key"].AsNumber<
Key_t>();
152 Variant kernelinfo =
datamap[kernelkey];
153 ASSERT(kernelinfo[
"type"].AsString() ==
"kernelinfo");
154 kernelinfo[
"live"] =
true;
155 dbprintf(2,
"Kernel %s started\n", kernelinfo[
"name"].AsString().c_str());
157 notice[
"kernelinfo"] = kernelinfo.Copy();
162 Key_t kernelkey = msg[
"key"].AsNumber<
Key_t>();
163 Variant kernelinfo =
datamap[kernelkey];
164 ASSERT(kernelinfo[
"type"].AsString() ==
"kernelinfo");
165 kernelinfo[
"live"] =
false;
166 dbprintf(2,
"Kernel %s stopped\n", kernelinfo[
"name"].AsString().c_str());
168 notice[
"kernelinfo"] = kernelinfo.Copy();
174 Variant reply(Variant::MapType);
175 reply[
"msgtype"] =
"reply";
176 reply[
"msgid"] = msg[
"msgid"];
177 reply[
"success"] =
false;
178 if (msg.Contains(
"name")) {
179 NameKeyMap::iterator entry =
kernelmap.find(msg[
"name"].AsString());
184 kernelkey = entry->second;
186 kernelkey = msg[
"key"].AsNumber<
Key_t>();
188 DataMap::iterator entry =
datamap.find(kernelkey);
190 ASSERT(entry->second[
"type"].AsString() ==
"kernelinfo");
191 reply[
"success"] =
true;
192 reply[
"kernelinfo"] = entry->second.Copy();
198 std::string nodename = msg[
"name"].AsString();
199 Variant reply(Variant::MapType);
200 reply[
"msgid"] = msg[
"msgid"];
201 reply[
"msgtype"] =
"reply";
203 reply[
"success"] =
false;
206 Variant nodeinfo(Variant::MapType);
207 nodeinfo[
"name"] = nodename;
208 nodeinfo[
"key"] = nodekey;
209 nodeinfo[
"kernelkey"] = msg[
"kernelkey"];
210 nodeinfo[
"started"] =
false;
211 nodeinfo[
"dead"] =
false;
212 nodeinfo[
"type"] =
"nodeinfo";
213 nodeinfo[
"endpoints"] = Variant::MapType;
214 dbprintf(2,
"Node %s created\n", nodename.c_str());
217 reply[
"success"] =
true;
218 reply[
"nodeinfo"] = nodeinfo.Copy();
226 Variant nodeinfo =
datamap[nodekey];
227 ASSERT(nodeinfo[
"type"].AsString() ==
"nodeinfo");
228 nodeinfo[
"started"] =
true;
229 dbprintf(2,
"Node %s started\n", nodeinfo[
"name"].AsString().c_str());
231 notice[
"nodeinfo"] = nodeinfo.Copy();
238 Variant nodeinfo =
datamap[nodekey];
239 ASSERT(nodeinfo[
"type"].AsString() ==
"nodeinfo");
240 nodeinfo[
"dead"] =
true;
241 dbprintf(2,
"Node %s stopped\n", nodeinfo[
"name"].AsString().c_str());
243 notice[
"nodeinfo"] = nodeinfo.Copy();
249 Variant reply(Variant::MapType);
250 reply[
"msgtype"] =
"reply";
251 reply[
"msgid"] = msg[
"msgid"];
252 reply[
"success"] =
false;
253 if (msg.Contains(
"name")) {
254 NameKeyMap::iterator entry =
nodemap.find(msg[
"name"].AsString());
259 nodekey = entry->second;
261 nodekey = msg[
"key"].AsNumber<
Key_t>();
263 DataMap::iterator entry =
datamap.find(nodekey);
265 reply[
"nodeinfo"] = entry->second.Copy();
266 ASSERT(reply[
"nodeinfo"][
"type"].AsString() ==
"nodeinfo");
267 reply[
"success"] =
true;
273 Variant reply(Variant::MapType);
274 reply[
"msgid"] = msg[
"msgid"];
275 reply[
"msgtype"] =
"reply";
277 reply[
"success"] =
true;
282 std::string name = msg[
"name"].AsString();
284 reply[
"msgid"] = msg[
"msgid"];
285 reply[
"msgtype"] =
"reply";
286 reply[
"success"] =
false;
287 Key_t nodekey = msg[
"nodekey"].AsNumber<
Key_t>();
288 DataMap::iterator nodeentry =
datamap.find(nodekey);
289 if (nodeentry !=
datamap.end()) {
290 Variant nodeinfo = nodeentry->second;
293 if (nodeinfo[
"endpoints"].Contains(name)) {
294 epkey = nodeinfo[
"endpoints"][name].AsNumber<
Key_t>();
296 ASSERT(epinfo[
"type"].AsString() ==
"endpointinfo");
299 nodeinfo[
"endpoints"][name] = epkey;
300 epinfo = Variant(Variant::MapType);
301 epinfo[
"key"] = epkey;
302 epinfo[
"name"] = name;
303 epinfo[
"nodekey"] = nodekey;
304 epinfo[
"kernelkey"] = nodeinfo[
"kernelkey"];
305 epinfo[
"live"] =
true;
306 epinfo[
"type"] =
"endpointinfo";
309 reply[
"endpointinfo"] = epinfo.Copy();
310 reply[
"success"] =
true;
318 reply[
"msgid"] = msg[
"msgid"];
319 reply[
"msgtype"] =
"reply";
320 reply[
"success"] =
false;
321 DataMap::iterator entry =
datamap.find(epkey);
323 reply[
"success"] =
true;
324 reply[
"endpointinfo"] = entry->second.Copy();
325 ASSERT(reply[
"endpointinfo"][
"type"].AsString() ==
"endpointinfo");
332 Key_t wkey = msg[
"writerkey"].AsNumber<
Key_t>();
333 Key_t rkey = msg[
"readerkey"].AsNumber<
Key_t>();
334 std::string qname = msg[
"qname"].AsString();
335 datamap[wkey][
"readerkey"] = rkey;
336 datamap[rkey][
"writerkey"] = wkey;
337 datamap[rkey][
"qname"] = qname;
338 datamap[wkey][
"qname"] = qname;
347 Key_t kernelkey = msg[
"kernelkey"].AsNumber<
Key_t>();
348 Variant kernelinfo =
datamap[kernelkey];
354 msg[
"msgtype"] =
"broadcast";
RCTXMT_t
RCTXMT Remote Context Message Type These are the message types that the remote context uses to send i...
void GetNodeInfo(const std::string &sender, const Variant &msg)
virtual void SendMessage(const std::string &recipient, const Variant &msg)=0
Send the given message to the given client.
void ConnectEndpoints(const Variant &msg)
void GetKernelInfo(const std::string &sender, const Variant &msg)
virtual void LogMessage(const std::string &msg)=0
Log a message.
void GetNumNodeLive(const std::string &sender, const Variant &msg)
void dbprintf(int level, const char *fmt,...)
void SignalKernelStart(const Variant &msg)
virtual void BroadcastMessage(const Variant &msg)=0
Broadcast a message to all clients.
Variant NewBroadcastMessage()
void GetEndpointInfo(const std::string &sender, const Variant &msg)
void SignalNodeEnd(const Variant &msg)
void RouteKernelMessage(const Variant &msg)
void DispatchMessage(const std::string &sender, const Variant &msg)
Process the given message.
bool IsTerminated() const
void SignalNodeStart(const Variant &msg)
void CreateNodeKey(const std::string &sender, const Variant &msg)
void SetupKernel(const std::string &sender, const Variant &msg)
virtual ~RemoteContextServer()
void SignalKernelEnd(const Variant &msg)
void GetCreateEndpointKey(const std::string &sender, const Variant &msg)