23 #include "common_priv.h"
46 config[
"grow-queue-max-threshold"] = value;
50 config[
"swallow-broken-queue-exceptions"] = value;
54 config[
"libs"].Append(filename);
58 config[
"liblist"].Append(filename);
62 config[
"libpaths"].Append(path);
66 config[
"context"][
"host"] = host;
70 config[
"context"][
"port"] = port;
74 config[
"context"][
"loglevel"] = i;
78 Variant::MapIterator topitr = v.MapBegin(), topend = v.MapEnd();
79 for (;topitr != topend; ++topitr) {
80 if (topitr->first ==
"context") {
81 Variant::MapIterator mitr = v[
"context"].MapBegin(), mend = v[
"context"].MapEnd();
82 for (;mitr != mend; ++mitr) {
83 config[
"context"][mitr->first] = mitr->second;
85 }
else if (topitr->first ==
"nodes") {
86 Variant::ListIterator litr = v[
"nodes"].ListBegin(), lend = v[
"nodes"].ListEnd();
87 for (;litr != lend; ++litr) {
90 }
else if (topitr->first ==
"queues") {
91 Variant::ListIterator litr = v[
"queues"].ListBegin(), lend = v[
"queues"].ListEnd();
92 for (;litr != lend; ++litr) {
96 config[topitr->first] = topitr->second;
103 if (!
config.Contains(
"nodes")) {
104 config[
"nodes"] = Variant::ListType;
106 Variant nodes =
config[
"nodes"];
107 Variant::ListIterator itr = nodes.ListBegin(), end = nodes.ListEnd();
108 for (; itr != end; ++itr) {
109 if (v[
"name"] == itr->At(
"name")) {
110 for (Variant::MapIterator mitr = v.MapBegin(),
111 mend = v.MapEnd(); mitr != mend; ++mitr) {
112 itr->At(mitr->first) = mitr->second.Copy();
117 nodes.Append(v.Copy());
121 if (!
config.Contains(
"queues")) {
122 config[
"queues"] = Variant::ListType;
124 Variant queues =
config[
"queues"];
125 Variant::ListIterator qitr = queues.ListBegin(), qend = queues.ListEnd();
126 for (;qitr != qend; ++qitr) {
129 v[
"readernode"] == qitr->At(
"readernode") &&
130 v[
"readerport"] == qitr->At(
"readerport")
133 v[
"writernode"] == qitr->At(
"writernode") &&
134 v[
"writerport"] == qitr->At(
"writerport")
137 for (Variant::MapIterator mitr = v.MapBegin(),
138 mend = v.MapEnd(); mitr != mend; ++mitr) {
139 qitr->At(mitr->first) = mitr->second.Copy();
144 queues.Append(v.Copy());
148 config.At(
"nodemap", Variant::MapType).Set(noden, kernn);
152 shared_ptr<Context> context;
156 if (!v.Contains(
"host") && !v.Contains(
"port")) {
160 v[
"host"].AsString(),
163 shared_ptr<RemoteContext> ctx = shared_ptr<RemoteContext>(
new RemoteContext(addrs));
167 if (v.Contains(
"loglevel")) {
168 context->LogLevel(v[
"loglevel"].AsInt());
188 if (args.Contains(
"context")) {
191 if (args.Contains(
"host")) {
194 if (args.Contains(
"port")) {
198 if (args.Contains(
"d4r")) {
199 attr.
UseD4R(args[
"d4r"].AsBool());
201 if (args.Contains(
"swallow-broken-queue-exceptions")) {
204 if (args.Contains(
"grow-queue-max-threshold")) {
207 if (args.Contains(
"libs") && args[
"libs"].IsList()) {
208 for (Variant::ListIterator itr = args[
"libs"].ListBegin(); itr != args[
"libs"].ListEnd(); ++itr) {
212 if (args.Contains(
"liblist") && args[
"liblist"].IsList()) {
213 for (Variant::ListIterator itr = args[
"liblist"].ListBegin(); itr != args[
"liblist"].ListEnd(); ++itr) {
217 if (args.Contains(
"libpaths") && args[
"libpaths"].IsList()) {
218 for (Variant::ListIterator itr = args[
"libpaths"].ListBegin(); itr != args[
"libpaths"].ListEnd(); ++itr) {
226 LoadNodes(kernel, args.Get(
"nodes", Variant::ListType), args.Get(
"nodemap", Variant::MapType));
227 LoadQueues(kernel, args.Get(
"queues", Variant::ListType));
231 if (!nodelist.IsList()) {
234 Variant::ConstListIterator itr, end;
235 itr = nodelist.ListBegin();
236 end = nodelist.ListEnd();
246 CPN::NodeAttr nattr(attr[
"name"].AsString(), attr[
"type"].AsString());
247 if (nodemap.IsMap() && nodemap.Contains(nattr.
GetName())) {
249 }
else if (attr.Contains(
"kernel")) {
250 nattr.
SetKernel(attr[
"kernel"].AsString());
252 Variant param = attr[
"param"];
254 for (Variant::MapIterator i = param.MapBegin(), e = param.MapEnd();
257 nattr.
SetParam(i->first, i->second.AsString());
264 if (!queuelist.IsList()) {
267 Variant::ConstListIterator itr, end;
268 itr = queuelist.ListBegin();
269 end = queuelist.ListEnd();
279 CPN::QueueAttr qattr(attr[
"size"].AsUnsigned(), attr[
"threshold"].AsUnsigned());
280 qattr.
SetReader(attr[
"readernode"].AsString(), attr[
"readerport"].AsString());
281 qattr.
SetWriter(attr[
"writernode"].AsString(), attr[
"writerport"].AsString());
282 if (attr.Contains(
"type")) {
283 qattr.
SetHint(attr[
"type"].AsString());
285 if (attr.Contains(
"datatype")) {
288 if (attr.Contains(
"numchannels")) {
291 if (attr.Contains(
"alpha")) {
292 qattr.
SetAlpha(attr[
"alpha"].AsDouble());
294 if (attr.Contains(
"name")) {
295 qattr.
SetName(attr[
"name"].AsString());
297 if (attr.Contains(
"maxwritethreshold")) {
305 if (!conf.Contains(
"name")) {
306 return make_pair(
false,
"Kernels must have a name.");
309 if (conf.Contains(
"nodes")) {
310 Variant nodes = conf[
"nodes"];
311 Variant::ListIterator nitr = nodes.ListBegin(), nend = nodes.ListEnd();
312 for (; nitr != nend; ++nitr) {
313 if (!nitr->Contains(
"name")) {
314 return make_pair(
false,
"Nodes must have a name");
316 if (!nitr->Contains(
"type")) {
317 return make_pair(
false,
"Nodes must have a type");
319 if (!nodeset.insert(nitr->At(
"name").AsString()).second) {
320 return make_pair(
false,
"Duplicate node");
324 if (conf.Contains(
"queues")) {
325 map<string, set<string> > readports, writeports;
326 Variant queues = conf[
"queues"];
327 Variant::ListIterator qitr = queues.ListBegin(), qend = queues.ListEnd();
328 for (;qitr != qend; ++qitr) {
329 if (!qitr->Contains(
"size")) {
330 return make_pair(
false,
"Queues must have a size");
332 if (!qitr->Contains(
"threshold")) {
333 return make_pair(
false,
"Queues must have a threshold");
335 if (!qitr->Contains(
"readernode")) {
336 return make_pair(
false,
"Queues must have a readernode");
338 if (!qitr->Contains(
"readerport")) {
339 return make_pair(
false,
"Queues must have a readerport");
341 if (!readports[qitr->At(
"readernode").AsString()].insert(qitr->At(
"readerport").AsString()).second) {
342 return make_pair(
false,
"Two queues connected to the same reader port.");
344 if (!qitr->Contains(
"writernode")) {
345 return make_pair(
false,
"Queues must have a writernode");
347 if (!qitr->Contains(
"writerport")) {
348 return make_pair(
false,
"Queues must have a writerport");
350 if (!writeports[qitr->At(
"writernode").AsString()].insert(qitr->At(
"writerport").AsString()).second) {
351 return make_pair(
false,
"Two queues connected to the same writer port.");
355 return make_pair(
true,
"");
void KernelPort(const std::string &port)
CPN::KernelAttr GetKernelAttr()
QueueAttr & SetNumChannels(unsigned channels)
void AddLibList(const std::string &filename)
static shared_ptr< Context > Local()
Create a local context.
KernelAttr & SwallowBrokenQueueExceptions(bool enable)
static void LoadNodes(CPN::Kernel *kernel, Variant nodelist, Variant nodemap)
static void LoadNode(CPN::Kernel *kernel, Variant attr, Variant nodemap)
NodeAttr & SetKernel(const std::string &kname)
void ContextPort(const std::string &port)
The attribute for the Kernel.
static void LoadQueue(CPN::Kernel *kernel, Variant attr)
QueueAttr & SetMaxWriteThreshold(unsigned mwt)
void GrowQueueMaxThreshold(bool value)
QueueAttr & SetAlpha(double a)
alpha is used by the remote queue to decide how much of the queue should go on the read side and how ...
KernelAttr & GrowQueueMaxThreshold(bool enable)
void CreateQueue(const QueueAttr &attr)
Create a new queue.
static CPN::shared_ptr< CPN::Context > LoadContext(Variant v)
void AddNodeMapping(const std::string &noden, const std::string &kernn)
QueueAttr & SetDatatype(const std::string &type)
KernelAttr & AddSharedLib(const std::string &lib)
static void LoadQueues(CPN::Kernel *kernel, Variant queuelist)
KernelAttr & UseD4R(bool enable)
void ContextHost(const std::string &host)
KernelAttr & AddNodeList(const std::string &list)
NodeAttr & SetParam(const std::string &key, const std::string value)
void KernelName(const std::string &name)
void AddLibPath(const std::string &path)
void KernelHost(const std::string &host)
QueueAttr & SetReader(const std::string &nodename, const std::string &portname)
void SwallowBrokenQueueExceptions(bool value)
KernelAttr & SetHostName(const std::string &hn)
QueueAttr & SetWriter(const std::string &nodename, const std::string &portname)
const std::string & GetName() const
static SockAddrList CreateIP(unsigned serv)
Return a list of valid socket addresses for the given service number or port number.
KernelAttr & AddNodeSearchPath(const std::string &p)
void Setup(CPN::Kernel *kernel)
KernelAttr & SetServName(const std::string &sn)
std::vector< SocketAddress > SockAddrList
QueueAttr & SetHint(std::string hint)
The attributes for a queue.
void AddLib(const std::string &filename)
void MergeConfig(Variant v)
QueueAttr & SetName(const std::string &qname)
std::pair< bool, std::string > Validate()
Key_t CreateNode(const NodeAttr &attr)
KernelAttr & SetContext(shared_ptr< Context > ctx)