CPN
Computational Process Networks
VariantCPNLoader.cc
Go to the documentation of this file.
1 //=============================================================================
2 // Computational Process Networks class library
3 // Copyright (C) 1997-2006 Gregory E. Allen and The University of Texas
4 //
5 // This library is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Library General Public License as published
7 // by the Free Software Foundation; either version 2 of the License, or
8 // (at your option) any later version.
9 //
10 // This library is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 // Library General Public License for more details.
14 //
15 // The GNU Public License is available in the file LICENSE, or you
16 // can write to the Free Software Foundation, Inc., 59 Temple Place -
17 // Suite 330, Boston, MA 02111-1307, USA, or you can find it on the
18 // World Wide Web at http://www.fsf.org.
19 //=============================================================================
23 #include "common_priv.h"
24 #include <cpn/VariantCPNLoader.h>
26 #include <set>
27 #include <map>
28 
29 using std::string;
30 using std::set;
31 using std::pair;
32 using std::make_pair;
33 using std::map;
34 
35 namespace CPN {
36 
38  VariantCPNLoader::VariantCPNLoader(Variant conf) : config(conf) { }
40 
41  void VariantCPNLoader::UseD4R(bool value) {
42  config["d4r"] = value;
43  }
44 
46  config["grow-queue-max-threshold"] = value;
47  }
48 
50  config["swallow-broken-queue-exceptions"] = value;
51  }
52 
53  void VariantCPNLoader::AddLib(const std::string &filename) {
54  config["libs"].Append(filename);
55  }
56 
57  void VariantCPNLoader::AddLibList(const std::string &filename) {
58  config["liblist"].Append(filename);
59  }
60 
61  void VariantCPNLoader::AddLibPath(const std::string &path) {
62  config["libpaths"].Append(path);
63  }
64 
65  void VariantCPNLoader::ContextHost(const std::string &host) {
66  config["context"]["host"] = host;
67  }
68 
69  void VariantCPNLoader::ContextPort(const std::string &port) {
70  config["context"]["port"] = port;
71  }
72 
74  config["context"]["loglevel"] = i;
75  }
76 
78  Variant::MapIterator topitr = v.MapBegin(), topend = v.MapEnd();
79  for (;topitr != topend; ++topitr) {
80  if (topitr->first == "context") {
81  Variant::MapIterator mitr = v["context"].MapBegin(), mend = v["context"].MapEnd();
82  for (;mitr != mend; ++mitr) {
83  config["context"][mitr->first] = mitr->second;
84  }
85  } else if (topitr->first == "nodes") {
86  Variant::ListIterator litr = v["nodes"].ListBegin(), lend = v["nodes"].ListEnd();
87  for (;litr != lend; ++litr) {
88  AddNode(*litr);
89  }
90  } else if (topitr->first == "queues") {
91  Variant::ListIterator litr = v["queues"].ListBegin(), lend = v["queues"].ListEnd();
92  for (;litr != lend; ++litr) {
93  AddQueue(*litr);
94  }
95  } else {
96  config[topitr->first] = topitr->second;
97  }
98  }
99  }
100 
101 
102  void VariantCPNLoader::AddNode(Variant v) {
103  if (!config.Contains("nodes")) {
104  config["nodes"] = Variant::ListType;
105  }
106  Variant nodes = config["nodes"];
107  Variant::ListIterator itr = nodes.ListBegin(), end = nodes.ListEnd();
108  for (; itr != end; ++itr) {
109  if (v["name"] == itr->At("name")) {
110  for (Variant::MapIterator mitr = v.MapBegin(),
111  mend = v.MapEnd(); mitr != mend; ++mitr) {
112  itr->At(mitr->first) = mitr->second.Copy();
113  }
114  return;
115  }
116  }
117  nodes.Append(v.Copy());
118  }
119 
120  void VariantCPNLoader::AddQueue(Variant v) {
121  if (!config.Contains("queues")) {
122  config["queues"] = Variant::ListType;
123  }
124  Variant queues = config["queues"];
125  Variant::ListIterator qitr = queues.ListBegin(), qend = queues.ListEnd();
126  for (;qitr != qend; ++qitr) {
127  if (
128  (
129  v["readernode"] == qitr->At("readernode") &&
130  v["readerport"] == qitr->At("readerport")
131  ) ||
132  (
133  v["writernode"] == qitr->At("writernode") &&
134  v["writerport"] == qitr->At("writerport")
135  )
136  ) {
137  for (Variant::MapIterator mitr = v.MapBegin(),
138  mend = v.MapEnd(); mitr != mend; ++mitr) {
139  qitr->At(mitr->first) = mitr->second.Copy();
140  }
141  return;
142  }
143  }
144  queues.Append(v.Copy());
145  }
146 
147  void VariantCPNLoader::AddNodeMapping(const std::string &noden, const std::string &kernn) {
148  config.At("nodemap", Variant::MapType).Set(noden, kernn);
149  }
150 
151  shared_ptr<Context> VariantCPNLoader::LoadContext(Variant v) {
152  shared_ptr<Context> context;
153  if (v.IsNull()) {
154  context = Context::Local();
155  } else {
156  if (!v.Contains("host") && !v.Contains("port")) {
157  context = Context::Local();
158  } else {
160  v["host"].AsString(),
161  v["port"].AsString()
162  );
163  shared_ptr<RemoteContext> ctx = shared_ptr<RemoteContext>(new RemoteContext(addrs));
164  context = ctx;
165  }
166 
167  if (v.Contains("loglevel")) {
168  context->LogLevel(v["loglevel"].AsInt());
169  }
170  }
171  return context;
172  }
173 
174  void VariantCPNLoader::KernelName(const std::string &name) {
175  config["name"] = name;
176  }
177 
178  void VariantCPNLoader::KernelHost(const std::string &host) {
179  config["host"] = host;
180  }
181 
182  void VariantCPNLoader::KernelPort(const std::string &port) {
183  config["port"] = port;
184  }
185 
187  CPN::KernelAttr attr(args["name"].AsString());
188  if (args.Contains("context")) {
189  attr.SetContext(LoadContext(args["context"]));
190  }
191  if (args.Contains("host")) {
192  attr.SetHostName(args["host"].AsString());
193  }
194  if (args.Contains("port")) {
195  attr.SetServName(args["port"].AsString());
196  }
197 
198  if (args.Contains("d4r")) {
199  attr.UseD4R(args["d4r"].AsBool());
200  }
201  if (args.Contains("swallow-broken-queue-exceptions")) {
202  attr.SwallowBrokenQueueExceptions(args["swallow-broken-queue-exceptions"].AsBool());
203  }
204  if (args.Contains("grow-queue-max-threshold")) {
205  attr.GrowQueueMaxThreshold(args["grow-queue-max-threshold"].AsBool());
206  }
207  if (args.Contains("libs") && args["libs"].IsList()) {
208  for (Variant::ListIterator itr = args["libs"].ListBegin(); itr != args["libs"].ListEnd(); ++itr) {
209  attr.AddSharedLib(itr->AsString());
210  }
211  }
212  if (args.Contains("liblist") && args["liblist"].IsList()) {
213  for (Variant::ListIterator itr = args["liblist"].ListBegin(); itr != args["liblist"].ListEnd(); ++itr) {
214  attr.AddNodeList(itr->AsString());
215  }
216  }
217  if (args.Contains("libpaths") && args["libpaths"].IsList()) {
218  for (Variant::ListIterator itr = args["libpaths"].ListBegin(); itr != args["libpaths"].ListEnd(); ++itr) {
219  attr.AddNodeSearchPath(itr->AsString());
220  }
221  }
222  return attr;
223  }
224 
225  void VariantCPNLoader::Setup(CPN::Kernel *kernel, Variant args) {
226  LoadNodes(kernel, args.Get("nodes", Variant::ListType), args.Get("nodemap", Variant::MapType));
227  LoadQueues(kernel, args.Get("queues", Variant::ListType));
228  }
229 
230  void VariantCPNLoader::LoadNodes(CPN::Kernel *kernel, Variant nodelist, Variant nodemap) {
231  if (!nodelist.IsList()) {
232  return;
233  }
234  Variant::ConstListIterator itr, end;
235  itr = nodelist.ListBegin();
236  end = nodelist.ListEnd();
237  while (itr != end) {
238  if (itr->IsMap()) {
239  LoadNode(kernel, *itr, nodemap);
240  }
241  ++itr;
242  }
243  }
244 
245  void VariantCPNLoader::LoadNode(CPN::Kernel *kernel, Variant attr, Variant nodemap) {
246  CPN::NodeAttr nattr(attr["name"].AsString(), attr["type"].AsString());
247  if (nodemap.IsMap() && nodemap.Contains(nattr.GetName())) {
248  nattr.SetKernel(nodemap.At(nattr.GetName()).AsString());
249  } else if (attr.Contains("kernel")) {
250  nattr.SetKernel(attr["kernel"].AsString());
251  }
252  Variant param = attr["param"];
253  if (param.IsMap()) {
254  for (Variant::MapIterator i = param.MapBegin(), e = param.MapEnd();
255  i != e; ++i)
256  {
257  nattr.SetParam(i->first, i->second.AsString());
258  }
259  }
260  kernel->CreateNode(nattr);
261  }
262 
263  void VariantCPNLoader::LoadQueues(CPN::Kernel *kernel, Variant queuelist) {
264  if (!queuelist.IsList()) {
265  return;
266  }
267  Variant::ConstListIterator itr, end;
268  itr = queuelist.ListBegin();
269  end = queuelist.ListEnd();
270  while (itr != end) {
271  if (itr->IsMap()) {
272  LoadQueue(kernel, *itr);
273  }
274  ++itr;
275  }
276  }
277 
278  void VariantCPNLoader::LoadQueue(CPN::Kernel *kernel, Variant attr) {
279  CPN::QueueAttr qattr(attr["size"].AsUnsigned(), attr["threshold"].AsUnsigned());
280  qattr.SetReader(attr["readernode"].AsString(), attr["readerport"].AsString());
281  qattr.SetWriter(attr["writernode"].AsString(), attr["writerport"].AsString());
282  if (attr.Contains("type")) {
283  qattr.SetHint(attr["type"].AsString());
284  }
285  if (attr.Contains("datatype")) {
286  qattr.SetDatatype(attr["datatype"].AsString());
287  }
288  if (attr.Contains("numchannels")) {
289  qattr.SetNumChannels(attr["numchannels"].AsUnsigned());
290  }
291  if (attr.Contains("alpha")) {
292  qattr.SetAlpha(attr["alpha"].AsDouble());
293  }
294  if (attr.Contains("name")) {
295  qattr.SetName(attr["name"].AsString());
296  }
297  if (attr.Contains("maxwritethreshold")) {
298  qattr.SetMaxWriteThreshold(attr["maxwritethreshold"].AsUnsigned());
299  }
300  kernel->CreateQueue(qattr);
301  }
302 
303  std::pair<bool, std::string> VariantCPNLoader::Validate(Variant conf) {
304  set<string> nodeset;
305  if (!conf.Contains("name")) {
306  return make_pair(false, "Kernels must have a name.");
307  }
308 
309  if (conf.Contains("nodes")) {
310  Variant nodes = conf["nodes"];
311  Variant::ListIterator nitr = nodes.ListBegin(), nend = nodes.ListEnd();
312  for (; nitr != nend; ++nitr) {
313  if (!nitr->Contains("name")) {
314  return make_pair(false, "Nodes must have a name");
315  }
316  if (!nitr->Contains("type")) {
317  return make_pair(false, "Nodes must have a type");
318  }
319  if (!nodeset.insert(nitr->At("name").AsString()).second) {
320  return make_pair(false, "Duplicate node");
321  }
322  }
323  }
324  if (conf.Contains("queues")) {
325  map<string, set<string> > readports, writeports;
326  Variant queues = conf["queues"];
327  Variant::ListIterator qitr = queues.ListBegin(), qend = queues.ListEnd();
328  for (;qitr != qend; ++qitr) {
329  if (!qitr->Contains("size")) {
330  return make_pair(false, "Queues must have a size");
331  }
332  if (!qitr->Contains("threshold")) {
333  return make_pair(false, "Queues must have a threshold");
334  }
335  if (!qitr->Contains("readernode")) {
336  return make_pair(false, "Queues must have a readernode");
337  }
338  if (!qitr->Contains("readerport")) {
339  return make_pair(false, "Queues must have a readerport");
340  }
341  if (!readports[qitr->At("readernode").AsString()].insert(qitr->At("readerport").AsString()).second) {
342  return make_pair(false, "Two queues connected to the same reader port.");
343  }
344  if (!qitr->Contains("writernode")) {
345  return make_pair(false, "Queues must have a writernode");
346  }
347  if (!qitr->Contains("writerport")) {
348  return make_pair(false, "Queues must have a writerport");
349  }
350  if (!writeports[qitr->At("writernode").AsString()].insert(qitr->At("writerport").AsString()).second) {
351  return make_pair(false, "Two queues connected to the same writer port.");
352  }
353  }
354  }
355  return make_pair(true, "");
356  }
357 
358 }
void KernelPort(const std::string &port)
CPN::KernelAttr GetKernelAttr()
Attributes for a node.
Definition: NodeAttr.h:58
QueueAttr & SetNumChannels(unsigned channels)
Definition: QueueAttr.h:172
void AddLibList(const std::string &filename)
static shared_ptr< Context > Local()
Create a local context.
Definition: Context.cc:29
KernelAttr & SwallowBrokenQueueExceptions(bool enable)
Definition: KernelAttr.h:94
static void LoadNodes(CPN::Kernel *kernel, Variant nodelist, Variant nodemap)
static void LoadNode(CPN::Kernel *kernel, Variant attr, Variant nodemap)
NodeAttr & SetKernel(const std::string &kname)
Definition: NodeAttr.h:79
void ContextPort(const std::string &port)
The attribute for the Kernel.
Definition: KernelAttr.h:40
static void LoadQueue(CPN::Kernel *kernel, Variant attr)
QueueAttr & SetMaxWriteThreshold(unsigned mwt)
Definition: QueueAttr.h:87
void GrowQueueMaxThreshold(bool value)
QueueAttr & SetAlpha(double a)
alpha is used by the remote queue to decide how much of the queue should go on the read side and how ...
Definition: QueueAttr.h:80
KernelAttr & GrowQueueMaxThreshold(bool enable)
Definition: KernelAttr.h:99
void CreateQueue(const QueueAttr &attr)
Create a new queue.
Definition: Kernel.cc:251
static CPN::shared_ptr< CPN::Context > LoadContext(Variant v)
void AddNodeMapping(const std::string &noden, const std::string &kernn)
QueueAttr & SetDatatype(const std::string &type)
Definition: QueueAttr.h:151
KernelAttr & AddSharedLib(const std::string &lib)
Definition: KernelAttr.h:104
static void LoadQueues(CPN::Kernel *kernel, Variant queuelist)
KernelAttr & UseD4R(bool enable)
Definition: KernelAttr.h:89
void ContextHost(const std::string &host)
KernelAttr & AddNodeList(const std::string &list)
Definition: KernelAttr.h:109
NodeAttr & SetParam(const std::string &key, const std::string value)
Definition: NodeAttr.h:89
void KernelName(const std::string &name)
void AddLibPath(const std::string &path)
void KernelHost(const std::string &host)
QueueAttr & SetReader(const std::string &nodename, const std::string &portname)
Definition: QueueAttr.h:112
void SwallowBrokenQueueExceptions(bool value)
KernelAttr & SetHostName(const std::string &hn)
Definition: KernelAttr.h:69
QueueAttr & SetWriter(const std::string &nodename, const std::string &portname)
Definition: QueueAttr.h:126
const std::string & GetName() const
Definition: NodeAttr.h:110
void AddQueue(Variant v)
static SockAddrList CreateIP(unsigned serv)
Return a list of valid socket address for the given service number or port number.
KernelAttr & AddNodeSearchPath(const std::string &p)
Definition: KernelAttr.h:114
void Setup(CPN::Kernel *kernel)
KernelAttr & SetServName(const std::string &sn)
Definition: KernelAttr.h:74
std::vector< SocketAddress > SockAddrList
Definition: SocketAddress.h:35
QueueAttr & SetHint(std::string hint)
Definition: QueueAttr.h:146
The attributes for a queue.
Definition: QueueAttr.h:55
void AddLib(const std::string &filename)
The Kernel declaration.
Definition: Kernel.h:55
void MergeConfig(Variant v)
QueueAttr & SetName(const std::string &qname)
Definition: QueueAttr.h:92
std::pair< bool, std::string > Validate()
void UseD4R(bool value)
Key_t CreateNode(const NodeAttr &attr)
Definition: Kernel.cc:139
KernelAttr & SetContext(shared_ptr< Context > ctx)
Definition: KernelAttr.h:79