CPN
Computational Process Networks
RemoteContextServer.cc
Go to the documentation of this file.
1 //=============================================================================
2 // Computational Process Networks class library
3 // Copyright (C) 1997-2006 Gregory E. Allen and The University of Texas
4 //
5 // This library is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Library General Public License as published
7 // by the Free Software Foundation; either version 2 of the License, or
8 // (at your option) any later version.
9 //
10 // This library is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 // Library General Public License for more details.
14 //
15 // The GNU Public License is available in the file LICENSE, or you
16 // can write to the Free Software Foundation, Inc., 59 Temple Place -
17 // Suite 330, Boston, MA 02111-1307, USA, or you can find it on the
18 // World Wide Web at http://www.fsf.org.
19 //=============================================================================
23 #include "common_priv.h"
27 #include <stdio.h>
28 #include <stdarg.h>
29 
30 namespace CPN {
31 
33  : debuglevel(0), shutdown(false), numlivenodes(0), keycount(0)
34  {
35  }
36 
38  }
39 
40  void RemoteContextServer::dbprintf(int level, const char *fmt, ...) {
41  if (debuglevel >= level) {
42  va_list ap;
43  va_start(ap, fmt);
44  vprintf(fmt, ap);
45  va_end(ap);
46  }
47  }
48 
// Entry point for a message received from a client.
//
// The message is first logged as JSON at debug level 4. It is then dropped
// without any reply when it is not a map or when IsTerminated() is true.
// Otherwise the numeric "type" field is read as an RCTXMT_t and the message
// is handed to the handler selected by the switch below.
//
// sender: identifier of the originating client; forwarded to the handlers
//         that send a reply and used as the prefix of RCTXMT_LOG messages.
// msg:    the received message; expected to be a map with a "type" entry.
//
// NOTE(review): in this listing most `case` labels of the switch are absent
// (a handler call follows a `break;` directly). Only RCTXMT_CREATE_NODE,
// RCTXMT_TERMINATE and RCTXMT_LOG are visible. The missing labels must be
// restored from RCTXMT.h before this text can compile.
49  void RemoteContextServer::DispatchMessage(const std::string &sender, const Variant &msg) {
50  dbprintf(4, "msg:%s:%s\n", sender.c_str(), libvariant::SerializeJSON(msg).c_str());
// Anything that is not a map cannot carry a "type" field: ignore it.
51  if (!msg.IsMap()) { return; }
// Once the server is terminated, incoming messages are ignored.
52  if (IsTerminated()) {
53  return;
54  }
55  RCTXMT_t type = msg["type"].AsNumber<RCTXMT_t>();
56  switch (type) {
58  SetupKernel(sender, msg);
59  break;
61  SignalKernelStart(msg);
62  break;
64  SignalKernelEnd(msg);
65  break;
67  GetKernelInfo(sender, msg);
68  break;
// Messages of this type are forwarded to the kernel named by "kernelkey".
72  case RCTXMT_CREATE_NODE:
73  RouteKernelMessage(msg);
74  break;
76  CreateNodeKey(sender, msg);
77  break;
79  SignalNodeStart(msg);
80  break;
82  SignalNodeEnd(msg);
83  break;
85  GetNodeInfo(sender, msg);
86  break;
88  GetNumNodeLive(sender, msg);
89  break;
// NOTE(review): GetCreateEndpointKey and GetEndpointInfo each appear twice;
// presumably one pair serves reader endpoints and the other writer
// endpoints -- confirm against the missing case labels.
91  GetCreateEndpointKey(sender, msg);
92  break;
94  GetEndpointInfo(sender, msg);
95  break;
97  GetCreateEndpointKey(sender, msg);
98  break;
100  GetEndpointInfo(sender, msg);
101  break;
103  ConnectEndpoints(msg);
104  break;
105  case RCTXMT_TERMINATE:
106  Terminate();
107  break;
108  case RCTXMT_LOG:
// Log text is prefixed with the sender so its origin stays visible.
109  LogMessage(sender + ":" + msg["msg"].AsString());
110  break;
// Any message type without a handler is treated as a programming error.
111  default:
112  ASSERT(false);
113  }
114  }
115 
117  Variant msg(Variant::MapType);
118  msg["type"] = RCTXMT_TERMINATE;
119  BroadcastMessage(msg);
120  shutdown = true;
121  }
122 
123  void RemoteContextServer::SetupKernel(const std::string &sender, const Variant &msg) {
124  std::string name = msg["name"].AsString();
125  Variant reply(Variant::MapType);
126  reply["msgid"] = msg["msgid"];
127  reply["msgtype"] = "reply";
128  if (kernelmap.find(name) != kernelmap.end()) {
129  reply["success"] = false;
130  return;
131  } else {
132  Variant kernelinfo(Variant::MapType);
133  Key_t kernelkey = NewKey();
134  kernelinfo["key"] = kernelkey;
135  kernelinfo["name"] = name;
136  kernelinfo["hostname"] = msg["hostname"];
137  kernelinfo["servname"] = msg["servname"];
138  kernelinfo["live"] = false;
139  kernelinfo["type"] = "kernelinfo";
140  kernelinfo["client"] = sender;
141  datamap.insert(std::make_pair(kernelkey, kernelinfo));
142  kernelmap.insert(std::make_pair(name, kernelkey));
143  dbprintf(2, "Kernel %s created\n", name.c_str());
144  reply["success"] = true;
145  reply["kernelinfo"] = kernelinfo.Copy();
146  }
147  SendMessage(sender, reply);
148  }
149 
150  void RemoteContextServer::SignalKernelStart(const Variant &msg) {
151  Key_t kernelkey = msg["key"].AsNumber<Key_t>();
152  Variant kernelinfo = datamap[kernelkey];
153  ASSERT(kernelinfo["type"].AsString() == "kernelinfo");
154  kernelinfo["live"] = true;
155  dbprintf(2, "Kernel %s started\n", kernelinfo["name"].AsString().c_str());
156  Variant notice = NewBroadcastMessage();
157  notice["kernelinfo"] = kernelinfo.Copy();
158  BroadcastMessage(notice);
159  }
160 
161  void RemoteContextServer::SignalKernelEnd(const Variant &msg) {
162  Key_t kernelkey = msg["key"].AsNumber<Key_t>();
163  Variant kernelinfo = datamap[kernelkey];
164  ASSERT(kernelinfo["type"].AsString() == "kernelinfo");
165  kernelinfo["live"] = false;
166  dbprintf(2, "Kernel %s stopped\n", kernelinfo["name"].AsString().c_str());
167  Variant notice = NewBroadcastMessage();
168  notice["kernelinfo"] = kernelinfo.Copy();
169  BroadcastMessage(notice);
170  }
171 
172  void RemoteContextServer::GetKernelInfo(const std::string &sender, const Variant &msg) {
173  Key_t kernelkey;
174  Variant reply(Variant::MapType);
175  reply["msgtype"] = "reply";
176  reply["msgid"] = msg["msgid"];
177  reply["success"] = false;
178  if (msg.Contains("name")) {
179  NameKeyMap::iterator entry = kernelmap.find(msg["name"].AsString());
180  if (entry == kernelmap.end()) {
181  SendMessage(sender, reply);
182  return;
183  }
184  kernelkey = entry->second;
185  } else {
186  kernelkey = msg["key"].AsNumber<Key_t>();
187  }
188  DataMap::iterator entry = datamap.find(kernelkey);
189  if (entry != datamap.end()) {
190  ASSERT(entry->second["type"].AsString() == "kernelinfo");
191  reply["success"] = true;
192  reply["kernelinfo"] = entry->second.Copy();
193  }
194  SendMessage(sender, reply);
195  }
196 
197  void RemoteContextServer::CreateNodeKey(const std::string &sender, const Variant &msg) {
198  std::string nodename = msg["name"].AsString();
199  Variant reply(Variant::MapType);
200  reply["msgid"] = msg["msgid"];
201  reply["msgtype"] = "reply";
202  if (nodemap.find(nodename) != nodemap.end()) {
203  reply["success"] = false;
204  } else {
205  Key_t nodekey = NewKey();
206  Variant nodeinfo(Variant::MapType);
207  nodeinfo["name"] = nodename;
208  nodeinfo["key"] = nodekey;
209  nodeinfo["kernelkey"] = msg["kernelkey"];
210  nodeinfo["started"] = false;
211  nodeinfo["dead"] = false;
212  nodeinfo["type"] = "nodeinfo";
213  nodeinfo["endpoints"] = Variant::MapType;
214  dbprintf(2, "Node %s created\n", nodename.c_str());
215  nodemap[nodename] = nodekey;
216  datamap[nodekey] = nodeinfo;
217  reply["success"] = true;
218  reply["nodeinfo"] = nodeinfo.Copy();
219  }
220  SendMessage(sender, reply);
221  }
222 
223  void RemoteContextServer::SignalNodeStart(const Variant &msg) {
224  ++numlivenodes;
225  Key_t nodekey = msg["key"].AsNumber<Key_t>();
226  Variant nodeinfo = datamap[nodekey];
227  ASSERT(nodeinfo["type"].AsString() == "nodeinfo");
228  nodeinfo["started"] = true;
229  dbprintf(2, "Node %s started\n", nodeinfo["name"].AsString().c_str());
230  Variant notice = NewBroadcastMessage();
231  notice["nodeinfo"] = nodeinfo.Copy();
232  BroadcastMessage(notice);
233  }
234 
235  void RemoteContextServer::SignalNodeEnd(const Variant &msg) {
236  --numlivenodes;
237  Key_t nodekey = msg["key"].AsNumber<Key_t>();
238  Variant nodeinfo = datamap[nodekey];
239  ASSERT(nodeinfo["type"].AsString() == "nodeinfo");
240  nodeinfo["dead"] = true;
241  dbprintf(2, "Node %s stopped\n", nodeinfo["name"].AsString().c_str());
242  Variant notice = NewBroadcastMessage();
243  notice["nodeinfo"] = nodeinfo.Copy();
244  BroadcastMessage(notice);
245  }
246 
247  void RemoteContextServer::GetNodeInfo(const std::string &sender, const Variant &msg) {
248  Key_t nodekey;
249  Variant reply(Variant::MapType);
250  reply["msgtype"] = "reply";
251  reply["msgid"] = msg["msgid"];
252  reply["success"] = false;
253  if (msg.Contains("name")) {
254  NameKeyMap::iterator entry = nodemap.find(msg["name"].AsString());
255  if (entry == nodemap.end()) {
256  SendMessage(sender, reply);
257  return;
258  }
259  nodekey = entry->second;
260  } else {
261  nodekey = msg["key"].AsNumber<Key_t>();
262  }
263  DataMap::iterator entry = datamap.find(nodekey);
264  if (entry != datamap.end()) {
265  reply["nodeinfo"] = entry->second.Copy();
266  ASSERT(reply["nodeinfo"]["type"].AsString() == "nodeinfo");
267  reply["success"] = true;
268  }
269  SendMessage(sender, reply);
270  }
271 
272  void RemoteContextServer::GetNumNodeLive(const std::string &sender, const Variant &msg) {
273  Variant reply(Variant::MapType);
274  reply["msgid"] = msg["msgid"];
275  reply["msgtype"] = "reply";
276  reply["numlivenodes"] = numlivenodes;
277  reply["success"] = true;
278  SendMessage(sender, reply);
279  }
280 
281  void RemoteContextServer::GetCreateEndpointKey(const std::string &sender, const Variant &msg) {
282  std::string name = msg["name"].AsString();
283  Variant reply;
284  reply["msgid"] = msg["msgid"];
285  reply["msgtype"] = "reply";
286  reply["success"] = false;
287  Key_t nodekey = msg["nodekey"].AsNumber<Key_t>();
288  DataMap::iterator nodeentry = datamap.find(nodekey);
289  if (nodeentry != datamap.end()) {
290  Variant nodeinfo = nodeentry->second;
291  Variant epinfo;
292  Key_t epkey;
293  if (nodeinfo["endpoints"].Contains(name)) {
294  epkey = nodeinfo["endpoints"][name].AsNumber<Key_t>();
295  epinfo = datamap[epkey];
296  ASSERT(epinfo["type"].AsString() == "endpointinfo");
297  } else {
298  epkey = NewKey();
299  nodeinfo["endpoints"][name] = epkey;
300  epinfo = Variant(Variant::MapType);
301  epinfo["key"] = epkey;
302  epinfo["name"] = name;
303  epinfo["nodekey"] = nodekey;
304  epinfo["kernelkey"] = nodeinfo["kernelkey"];
305  epinfo["live"] = true;
306  epinfo["type"] = "endpointinfo";
307  datamap[epkey] = epinfo;
308  }
309  reply["endpointinfo"] = epinfo.Copy();
310  reply["success"] = true;
311  }
312  SendMessage(sender, reply);
313  }
314 
315  void RemoteContextServer::GetEndpointInfo(const std::string &sender, const Variant &msg) {
316  Key_t epkey = msg["key"].AsNumber<Key_t>();
317  Variant reply;
318  reply["msgid"] = msg["msgid"];
319  reply["msgtype"] = "reply";
320  reply["success"] = false;
321  DataMap::iterator entry = datamap.find(epkey);
322  if (entry != datamap.end()) {
323  reply["success"] = true;
324  reply["endpointinfo"] = entry->second.Copy();
325  ASSERT(reply["endpointinfo"]["type"].AsString() == "endpointinfo");
326  }
327  SendMessage(sender, reply);
328 
329  }
330 
331  void RemoteContextServer::ConnectEndpoints(const Variant &msg) {
332  Key_t wkey = msg["writerkey"].AsNumber<Key_t>();
333  Key_t rkey = msg["readerkey"].AsNumber<Key_t>();
334  std::string qname = msg["qname"].AsString();
335  datamap[wkey]["readerkey"] = rkey;
336  datamap[rkey]["writerkey"] = wkey;
337  datamap[rkey]["qname"] = qname;
338  datamap[wkey]["qname"] = qname;
339  }
340 
342  // 0 is reserved
343  return ++keycount;
344  }
345 
346  void RemoteContextServer::RouteKernelMessage(const Variant &msg) {
347  Key_t kernelkey = msg["kernelkey"].AsNumber<Key_t>();
348  Variant kernelinfo = datamap[kernelkey];
349  SendMessage(kernelinfo["client"].AsString(), msg);
350  }
351 
353  Variant msg;
354  msg["msgtype"] = "broadcast";
355  msg["numlivenodes"] = numlivenodes;
356  return msg;
357  }
358 }
RCTXMT_t
RCTXMT Remote Context Message Type These are the message types that the remote context uses to send i...
Definition: RCTXMT.h:33
void GetNodeInfo(const std::string &sender, const Variant &msg)
virtual void SendMessage(const std::string &recipient, const Variant &msg)=0
Send the given message to the given client.
void ConnectEndpoints(const Variant &msg)
void GetKernelInfo(const std::string &sender, const Variant &msg)
virtual void LogMessage(const std::string &msg)=0
Log a message.
uint64_t Key_t
Definition: common.h:79
void GetNumNodeLive(const std::string &sender, const Variant &msg)
void dbprintf(int level, const char *fmt,...)
void SignalKernelStart(const Variant &msg)
virtual void BroadcastMessage(const Variant &msg)=0
Broadcast a message to all clients.
void GetEndpointInfo(const std::string &sender, const Variant &msg)
void SignalNodeEnd(const Variant &msg)
void RouteKernelMessage(const Variant &msg)
void DispatchMessage(const std::string &sender, const Variant &msg)
Process the given message.
void SignalNodeStart(const Variant &msg)
void CreateNodeKey(const std::string &sender, const Variant &msg)
void SetupKernel(const std::string &sender, const Variant &msg)
void SignalKernelEnd(const Variant &msg)
void GetCreateEndpointKey(const std::string &sender, const Variant &msg)
#define ASSERT(exp,...)