CPN
Computational Process Networks
RemoteContextDaemon.cc
Go to the documentation of this file.
1 //=============================================================================
2 // Computational Process Networks class library
3 // Copyright (C) 1997-2006 Gregory E. Allen and The University of Texas
4 //
5 // This library is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Library General Public License as published
7 // by the Free Software Foundation; either version 2 of the License, or
8 // (at your option) any later version.
9 //
10 // This library is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 // Library General Public License for more details.
14 //
15 // The GNU Public License is available in the file LICENSE, or you
16 // can write to the Free Software Foundation, Inc., 59 Temple Place -
17 // Suite 330, Boston, MA 02111-1307, USA, or you can find it on the
18 // World Wide Web at http://www.fsf.org.
19 //=============================================================================
23 #include "common_priv.h"
27 #include <stdio.h>
28 
29 namespace CPN {
30 
32  {
33  SetReuseAddr();
34  Listen(addr);
35  }
36 
38  {
39  SetReuseAddr();
40  Listen(addrs);
41  }
43  }
44 
46  Readable(false);
47  SocketAddress addr;
48  addr.SetFromSockName(FD());
49  dbprintf(1, "Listening on %s:%s\n", addr.GetHostName().c_str(), addr.GetServName().c_str());
50  while (true) {
51  std::vector<FileHandle*> fds;
52  if (!Closed()) {
53  if (Readable()) {
54  Read();
55  }
56  fds.push_back(this);
57  }
58  ClientMap::iterator itr = clients.begin();
59  while (itr != clients.end()) {
60  ClientPtr client = itr->second;
61  if (client->Closed()) {
62  ClientMap::iterator todelete = itr;
63  ++itr;
64  clients.erase(todelete);
65  Terminate();
66  } else {
67  if (client->Readable()) {
68  client->Read();
69  }
70  fds.push_back(client.get());
71  ++itr;
72  }
73  }
74  if ((clients.empty() && IsTerminated()) || fds.empty()) {
75  break;
76  }
77  Poll(fds.begin(), fds.end(), -1);
78  }
79  }
80 
82  if (IsTerminated()) return;
84  Close();
85  ClientMap::iterator itr = clients.begin();
86  while (itr != clients.end()) {
87  ClientPtr client = itr->second;
88  if (!client->Closed()) {
89  try {
90  client->ShutdownWrite();
91  } catch (const ErrnoException &e) {
92  //ignore
93  }
94  }
95  ++itr;
96  }
97  }
98 
99  void RemoteContextDaemon::Terminate(const std::string &name) {
100  dbprintf(1, "Terminating %s\n", name.c_str());
101  clients[name]->Close();
102  }
103 
105  int nfd = Accept();
106  if (nfd >= 0) {
107  ClientPtr conn = ClientPtr(new Client(this, nfd));
108  clients.insert(std::make_pair(conn->GetName(), conn));
109  Readable(false);
110  }
111  }
112 
113  void RemoteContextDaemon::SendMessage(const std::string &recipient, const Variant &msg) {
114  if (IsTerminated()) {
115  dbprintf(1, "Trying to send a reply after shutdown\n%s:%s",
116  recipient.c_str(), libvariant::SerializeJSON(msg).c_str());
117  return;
118  }
119  ClientMap::iterator entry = clients.find(recipient);
120  ASSERT(entry != clients.end());
121  dbprintf(4, "reply:%s:%s\n", recipient.c_str(), libvariant::SerializeJSON(msg).c_str());
122  entry->second->Send(msg);
123  }
124 
125  void RemoteContextDaemon::BroadcastMessage(const Variant &msg) {
126  if (IsTerminated()) {
127  dbprintf(1, "Trying to braodcast after shutdown\n%s", libvariant::SerializeJSON(msg).c_str());
128  return;
129  }
130  ClientMap::iterator entry = clients.begin();
131  dbprintf(4, "broadcast:%s\n", libvariant::SerializeJSON(msg).c_str());
132  while (entry != clients.end()) {
133  entry->second->Send(msg);
134  ++entry;
135  }
136  }
137 
138  void RemoteContextDaemon::LogMessage(const std::string &msg) {
139  dbprintf(1, "log:%s\n", msg.c_str());
140  }
141 
143  : SocketHandle(nfd), daemon(d)
144  {
145  Readable(false);
146  SetNoDelay(true);
147  SocketAddress addr;
148  addr.SetFromPeerName(FD());
149  name = addr.GetHostName() + ":" + addr.GetServName();
150  d->dbprintf(1, "New connection from %s\n", name.c_str());
151  }
152 
154  std::vector<char> buf(4*1024);
155  unsigned num = 0;
156  try {
157  while (Good()) {
158  if (buf.size() - num <= 0) {
159  buf.resize(buf.size()*2);
160  }
161  unsigned numread = Recv(&buf[num], buf.size() - num, false);
162  if (numread == 0) {
163  if (Eof()) {
164  daemon->Terminate(name);
165  }
166  break;
167  }
168  num += numread;
169  char *end = (char*)memchr(&buf[0], 0, num);
170  while (end != 0) {
171  Variant v = libvariant::DeserializeJSON(&buf[0]);
172  daemon->DispatchMessage(name, v);
173  end += 1;
174  num -= (end - &buf[0]);
175  memmove(&buf[0], end, num);
176  end = (char*)memchr(&buf[0], 0, num);
177  }
178  }
179  } catch (const ErrnoException &e) {
180  if (!daemon->IsTerminated()) {
181  daemon->dbprintf(1, "Unable to read from %s:%d: %s\n", name.c_str(), e.Error(), e.what());
182  }
183  }
184  }
185 
186  void RemoteContextDaemon::Client::Send(const Variant &msg) {
187  if (Closed()) {
188  return;
189  }
190  try {
191  std::string message = libvariant::SerializeJSON(msg);
192  unsigned numwritten = 0;
193  while (numwritten < message.size() + 1) {
194  numwritten += Write(message.c_str() + numwritten, (message.size() + 1) - numwritten);
195  }
196  } catch (const ErrnoException &e) {
197  daemon->dbprintf(1, "Unable to write to %s:%d: %s\n", name.c_str(), e.Error(), e.what());
198  daemon->Terminate(name);
199  }
200  }
201 
202 }
void LogMessage(const std::string &msg)
Log a message.
Client(RemoteContextDaemon *d, int nfd)
void Listen(const SocketAddress &addr, int queuelength=256)
void Run()
Run the actual Context returns when the context is terminated and all clients have disconnected...
An abstraction of a socket address with convenience methods.
Definition: SocketAddress.h:42
A FileHandle customized with some socket specific functionality and functions.
Definition: SocketHandle.h:34
std::string GetHostName(bool numerichost=true) const
void SetFromPeerName(int fd)
Fill this SocketAddress with data from the other side of the connection represented by fd...
void dbprintf(int level, const char *fmt,...)
static int Poll(IteratorRef< FileHandle * > begin, IteratorRef< FileHandle * > end, double timeout)
poll a list of FileHandles for any activity and call the appropriate On method.
Definition: FileHandle.cc:34
bool Closed() const
Definition: FileHandle.h:128
void SetFromSockName(int fd)
Fill this SocketAddress with data from this side of the connection represented by fd...
void Close()
Close the file and reset the internal state.
Definition: FileHandle.cc:146
std::tr1::shared_ptr< Client > ClientPtr
void SetNoDelay(bool nodelay)
std::string GetServName() const
void SetReuseAddr(bool reuse=true)
Turn on reuse of the address. Only takes effect if set before Listen.
void BroadcastMessage(const Variant &msg)
Broadcast a message to all clients.
RemoteContextDaemon(const SocketAddress &addr)
int FD() const
Definition: FileHandle.h:106
std::vector< SocketAddress > SockAddrList
Definition: SocketAddress.h:35
virtual const char * what() const
bool Readable() const
Gives the current readability status of the file.
Definition: FileHandle.h:91
void SendMessage(const std::string &recipient, const Variant &msg)
Send the given message to the given client.
bool Good() const
Convenience method for testing if the file this FileHandle has is open and not at end of file...
Definition: FileHandle.h:125
bool Eof() const
Definition: FileHandle.h:115
#define ASSERT(exp,...)
int Error() const
unsigned Write(const void *ptr, unsigned len)
Write data to the file descriptor.
Definition: FileHandle.cc:225