CPN
Computational Process Networks
RemoteContext.cc
Go to the documentation of this file.
1 //=============================================================================
2 // Computational Process Networks class library
3 // Copyright (C) 1997-2006 Gregory E. Allen and The University of Texas
4 //
5 // This library is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Library General Public License as published
7 // by the Free Software Foundation; either version 2 of the License, or
8 // (at your option) any later version.
9 //
10 // This library is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 // Library General Public License for more details.
14 //
15 // The GNU Public License is available in the file LICENSE, or you
16 // can write to the Free Software Foundation, Inc., 59 Temple Place -
17 // Suite 330, Boston, MA 02111-1307, USA, or you can find it on the
18 // World Wide Web at http://www.fsf.org.
19 //=============================================================================
23 #include "common_priv.h"
27 #include <vector>
28 #include <stdio.h>
29 #include <Variant/Variant.h>
30 
31 namespace CPN {
32 
34  : endwrite(false)
35  {
37  sock.Connect(addr);
38  sock.SetNoDelay(true);
39  thread->Start();
40  }
41 
43  : endwrite(false)
44  {
46  sock.Connect(addrs);
47  sock.SetNoDelay(true);
48  thread->Start();
49  }
50 
52  : sock(fd)
53  {
55  sock.SetNoDelay(true);
56  thread->Start();
57  }
58 
60  EndWrite();
61  thread->Join();
62  }
63 
64  void RemoteContext::SendMessage(const Variant &msg) {
65  // We have the lock
66  if (!sock.Closed() && !endwrite) {
67  std::string message = libvariant::SerializeJSON(msg);
68  //printf("<<< %s\n", message.c_str());
69  unsigned numwritten = 0;
70  while (numwritten < message.size() + 1) {
71  numwritten += sock.Write(message.c_str() + numwritten, (message.size() + 1) - numwritten);
72  }
73  }
74  }
75 
77  std::vector<char> buf(4*1024);
78  unsigned num = 0;
79  try {
80  sock.Readable(false);
81  sock.Poll(-1);
82  while (sock.Good()) {
83  if (buf.size() - num <= 0) {
84  buf.resize(buf.size()*2);
85  }
86  unsigned numread = sock.Recv(&buf[num], buf.size() - num, false);
87  if (numread == 0) {
88  if (sock.Eof()) {
89  Terminate();
90  return 0;
91  }
92  sock.Poll(-1);
93  } else {
94  num += numread;
95  char *end = (char*)memchr(&buf[0], 0, num);
96  while (end != 0) {
97  Variant v = libvariant::DeserializeJSON(&buf[0]);
98  DispatchMessage(v);
99  end += 1;
100  num -= (end - &buf[0]);
101  memmove(&buf[0], end, num);
102  end = (char*)memchr(&buf[0], 0, num);
103  }
104  }
105  }
106  } catch (const ErrnoException &e) {
107  fprintf(stderr, "RemoteContext: Uncaught errno exception (%d): %s\n", e.Error(), e.what());
108  Terminate();
109  }
110  sock.Close();
111  return 0;
112  }
113 
116  endwrite = true;
118  }
119 
122  return endwrite;
123  }
124 
125 }
SocketHandle sock
Definition: RemoteContext.h:50
void ShutdownWrite()
Shutdown the write end of this socket. Any future attempt to write to this socket will fail...
An abstraction of a socket address with convenience methods.
Definition: SocketAddress.h:42
unsigned Recv(void *ptr, unsigned len, bool block)
void DispatchMessage(const Variant &msg)
virtual void Terminate()
Signal to the Context that the network is terminating. After this call most methods will throw a Shut...
void SendMessage(const Variant &msg)
static int Poll(IteratorRef< FileHandle * > begin, IteratorRef< FileHandle * > end, double timeout)
poll a list of FileHandles for any activity and call the appropriate On method.
Definition: FileHandle.cc:34
RemoteContext(const SocketAddress &addr)
bool Closed() const
Definition: FileHandle.h:128
std::auto_ptr< Pthread > thread
Definition: RemoteContext.h:49
void Close()
Close the file and reset the internal state.
Definition: FileHandle.cc:146
void SetNoDelay(bool nodelay)
PthreadFunctional * CreatePthreadFunctional(T *obj, void *(T::*meth)(void))
bool Readable(bool r)
Set that the file is currently readable or not.
Definition: FileHandle.h:86
std::vector< SocketAddress > SockAddrList
Definition: SocketAddress.h:35
virtual const char * what() const
bool Good() const
Convenience method for testing if the file this FileHandle has is open and not at end of file...
Definition: FileHandle.h:125
bool Eof() const
Definition: FileHandle.h:115
void Connect(const SocketAddress &addr)
Create a new socket and try to connect to the given address.
Definition: SocketHandle.cc:48
int Error() const
unsigned Write(const void *ptr, unsigned len)
Write data to the file descriptor.
Definition: FileHandle.cc:225