CPN
Computational Process Networks
ConnectionServer.cc
Go to the documentation of this file.
1 //=============================================================================
2 // Computational Process Networks class library
3 // Copyright (C) 1997-2006 Gregory E. Allen and The University of Texas
4 //
5 // This library is free software; you can redistribute it and/or modify it
6 // under the terms of the GNU Library General Public License as published
7 // by the Free Software Foundation; either version 2 of the License, or
8 // (at your option) any later version.
9 //
10 // This library is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 // Library General Public License for more details.
14 //
15 // The GNU Public License is available in the file LICENSE, or you
16 // can write to the Free Software Foundation, Inc., 59 Temple Place -
17 // Suite 330, Boston, MA 02111-1307, USA, or you can find it on the
18 // World Wide Web at http://www.fsf.org.
19 //=============================================================================
23 #include "common_priv.h"
24 #include "ConnectionServer.h"
25 #include "PacketHeader.h"
26 #include <cpn/Context.h>
27 #include <cpn/utils/AutoLock.h>
30 #include <deque>
31 #include <cassert>
32 
33 namespace CPN {
34 
36 
37  ConnectionServer::ConnectionServer(SockAddrList addrs, shared_ptr<Context> ctx)
38  : context(ctx), logger(ctx.get(), Logger::INFO), enabled(true)
39  {
40  server.Listen(addrs);
41  }
42 
44  std::deque<FileHandle*> files;
45  files.push_back(&server);
46  files.push_back(&wakeup);
47  FileHandle::Poll(files.begin(), files.end(), -1);
48  wakeup.Read();
49  if (server.Readable()) {
50  server.Readable(false);
51  try {
52  SocketHandle sock(server.Accept());
53  Packet packet;
54  unsigned num = sock.Read(&packet.header, sizeof(packet.header));
55  if (num != sizeof(packet.header)) {
56  return;
57  }
58  if (!packet.Valid()) {
59  return;
60  }
61  if ((packet.Type() == PACKET_ID_READER)
62  || (packet.Type() == PACKET_ID_WRITER)) {
63  AutoPLock al(lock);
64  Key_t key = packet.DestinationKey();
65  shared_ptr<PendingConnection> conn;
66  std::pair<PendingMap::iterator, PendingMap::iterator> range;
67  range = pendingconnections.equal_range(key);
68  PendingMap::iterator entry = range.first;
69  PendingMap::iterator end = range.second;
70  while (entry != end) {
71  if (!entry->second->Done()) {
72  conn = entry->second;
73  break;
74  }
75  ++entry;
76  }
77  if (!conn) {
78  conn = shared_ptr<PendingConnection>(new PendingConnection(key, this));
79  pendingconnections.insert(std::make_pair(key, conn));
80  }
81  conn->Set(sock.FD());
82  sock.Reset();
83  }
84  } catch (const ErrnoException &e) {
85  // Ignore, if we had an error we closed the socket when we
86  // left the scope, will try again later
87  logger.Error("Exception in ConnectionServer main loop (e: %d): %s", e.Error(), e.what());
88  }
89  }
90  }
91 
93  AutoPLock al(lock);
94  server.Close();
95  for (PendingMap::iterator itr = pendingconnections.begin();
96  itr != pendingconnections.end(); ++itr)
97  {
98  itr->second->Cancel();
99  }
100  }
101 
102  shared_ptr<Sync::Future<int> > ConnectionServer::ConnectWriter(Key_t writerkey) {
103  if (server.Closed()) {
104  return shared_ptr<Sync::Future<int> >();
105  }
106  AutoPLock al(lock);
107  shared_ptr<PendingConnection> conn;
108  if (enabled) {
109  al.Unlock();
110  conn = shared_ptr<PendingConnection>(new PendingConnection(writerkey, this));
111  Key_t readerkey = context->GetWritersReader(writerkey);
112  Key_t kernelkey = context->GetReaderKernel(readerkey);
113  std::string hostname;
114  std::string servname;
115  context->GetKernelConnectionInfo(kernelkey, hostname, servname);
116  SocketHandle sock;
117  sock.Connect(SocketAddress::CreateIP(hostname, servname));
118  Packet packet(PACKET_ID_WRITER);
119  packet.SourceKey(writerkey).DestinationKey(readerkey);
120  unsigned num = sock.Write(&packet.header, sizeof(packet.header));
121  if (num != sizeof(packet.header)) {
122  conn->Cancel();
123  } else {
124  conn->Set(sock.FD());
125  sock.Reset();
126  }
127  } else {
128  conn = shared_ptr<PendingConnection>(new PendingConnection(writerkey, this));
129  pendingconnections.insert(std::make_pair(writerkey, conn));
130  }
131  return conn;
132  }
133 
134  shared_ptr<Sync::Future<int> > ConnectionServer::ConnectReader(Key_t readerkey) {
135  if (server.Closed()) {
136  return shared_ptr<Sync::Future<int> >();
137  }
138  AutoPLock al(lock);
139  shared_ptr<PendingConnection> conn;
140  PendingMap::iterator entry = pendingconnections.find(readerkey);
141  if (entry == pendingconnections.end()) {
142  conn = shared_ptr<PendingConnection>(new PendingConnection(readerkey, this));
143  pendingconnections.insert(std::make_pair(readerkey, conn));
144  } else {
145  conn = entry->second;
146  }
147  return conn;
148  }
149 
151  SocketAddress addr;
152  addr.SetFromSockName(server.FD());
153  return addr;
154  }
155 
157  AutoPLock al(lock);
158  enabled = false;
159  }
160 
162  AutoPLock al(lock);
163  enabled = true;
164  PendingMap::iterator entry = pendingconnections.begin();
165  while (entry != pendingconnections.end()) {
166  entry->second->Cancel();
167  ++entry;
168  }
169  }
170 
172  if (server.Closed()) {
173  logger.Error("Server socket closed");
174  }
175  if (!enabled) {
176  logger.Error("Connection handler disabled??");
177  }
178  logger.Error("%u pending connections", pendingconnections.size());
179  }
180 
182  AutoPLock al(lock);
183  std::pair<PendingMap::iterator, PendingMap::iterator> range;
184  range = pendingconnections.equal_range(key);
185  PendingMap::iterator entry = range.first;
186  PendingMap::iterator end = range.second;
187  while (entry != end) {
188  if (entry->second.get() == conn) {
189  pendingconnections.erase(entry);
190  break;
191  }
192  ++entry;
193  }
194  }
195 
197  : key(k), server(serv)
198  {}
199 
201  }
202 
204  AutoPLock al(future_lock);
205  InternalWait();
206  ret = -1;
207  al.Unlock();
208  AutoPLock alf(file_lock);
209  int filed = fd;
210  fd = -1;
211  alf.Unlock();
212  server->PendingDone(key, this);
213  return filed;
214  }
215 
217  AutoPLock alf(file_lock);
218  fd = filed;
219  alf.Unlock();
220  AutoPLock al(future_lock);
221  ASSERT(!done);
222  InternalSet(filed);
223  }
224 
225 }
Logger object that is used for forwarding log messages.
Definition: Logger.h:57
void Listen(const SocketAddress &addr, int queuelength=256)
int Accept(SocketAddress &addr)
PacketHeader header
Definition: PacketHeader.h:148
An abstraction of a socket address with convenience methods.
Definition: SocketAddress.h:42
A FileHandle customized with some socket specific functionality and functions.
Definition: SocketHandle.h:34
ServerSocketHandle server
PendingConnection(Key_t k, ConnectionServer *serv)
The Context abstract data type.
void Error(const char *fmt,...)
Definition: Logger.cc:159
uint64_t SourceKey() const
Definition: PacketHeader.h:127
void Unlock()
Definition: AutoLock.h:61
shared_ptr< Context > context
uint64_t Key_t
Definition: common.h:79
void PendingDone(Key_t key, PendingConnection *conn)
void Read()
Read until not readable anymore.
Definition: WakeupHandle.cc:70
static int Poll(IteratorRef< FileHandle * > begin, IteratorRef< FileHandle * > end, double timeout)
Poll a list of FileHandles for any activity and call the appropriate On method.
Definition: FileHandle.cc:34
AutoLock< PthreadMutex > AutoPLock
bool Closed() const
Definition: FileHandle.h:128
void SetFromSockName(int fd)
Fill this SocketAddress with data from this side of the connection represented by fd...
void Close()
Close the file and reset the internal state.
Definition: FileHandle.cc:146
shared_ptr< Sync::Future< int > > ConnectReader(Key_t readerkey)
ConnectionServer(SockAddrList addrs, shared_ptr< Context > ctx)
SocketAddress GetAddress()
void Reset()
Clear all internal state including the file descriptor! WARNING does not close the file! ...
Definition: FileHandle.cc:138
int FD() const
Definition: FileHandle.h:106
static SockAddrList CreateIP(unsigned serv)
Return a list of valid socket addresses for the given service number or port number.
Declarations of a generic binary packet format.
bool Readable(bool r)
Set that the file is currently readable or not.
Definition: FileHandle.h:86
std::vector< SocketAddress > SockAddrList
Definition: SocketAddress.h:35
virtual const char * what() const
void Connect(const SocketAddress &addr)
Create a new socket and try to connect to the given address.
Definition: SocketHandle.cc:48
#define ASSERT(exp,...)
shared_ptr< Sync::Future< int > > ConnectWriter(Key_t writerkey)
Automatic locking on the stack.
int Error() const
unsigned Write(const void *ptr, unsigned len)
Write data to the file descriptor.
Definition: FileHandle.cc:225