|
1 /* |
|
2 * Copyright 2004-2006 Intel Corporation |
|
3 * |
|
4 * Licensed under the Apache License, Version 2.0 (the "License"); |
|
5 * you may not use this file except in compliance with the License. |
|
6 * You may obtain a copy of the License at |
|
7 * |
|
8 * http://www.apache.org/licenses/LICENSE-2.0 |
|
9 * |
|
10 * Unless required by applicable law or agreed to in writing, software |
|
11 * distributed under the License is distributed on an "AS IS" BASIS, |
|
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
13 * See the License for the specific language governing permissions and |
|
14 * limitations under the License. |
|
15 */ |
|
16 |
|
17 #ifdef HAVE_CONFIG_H |
|
18 # include <dtn-config.h> |
|
19 #endif |
|
20 |
|
21 #include <algorithm> |
|
22 |
|
23 #include <sys/types.h> |
|
24 #include <sys/stat.h> |
|
25 #include <oasys/compat/inet_aton.h> |
|
26 #include <oasys/compat/rpc.h> |
|
27 #include <oasys/io/FileIOClient.h> |
|
28 #include <oasys/io/NetUtils.h> |
|
29 #include <oasys/util/Pointers.h> |
|
30 #include <oasys/util/ScratchBuffer.h> |
|
31 #include <oasys/util/XDRUtils.h> |
|
32 |
|
33 #include "APIServer.h" |
|
34 #include "bundling/APIBlockProcessor.h" |
|
35 #include "bundling/Bundle.h" |
|
36 #include "bundling/BundleEvent.h" |
|
37 #include "bundling/BundleDaemon.h" |
|
38 #include "bundling/BundleStatusReport.h" |
|
39 #include "bundling/SDNV.h" |
|
40 #include "bundling/GbofId.h" |
|
41 #include "naming/EndpointID.h" |
|
42 #include "cmd/APICommand.h" |
|
43 #include "reg/APIRegistration.h" |
|
44 #include "reg/RegistrationTable.h" |
|
45 #include "routing/BundleRouter.h" |
|
46 #include "storage/GlobalStore.h" |
|
47 #include "session/Session.h" |
|
48 |
|
49 #ifndef MIN |
|
50 #define MIN(x, y) ((x)<(y) ? (x) : (y)) |
|
51 #endif |
|
52 |
|
53 namespace dtn { |
|
54 |
|
55 //---------------------------------------------------------------------- |
|
56 APIServer::APIServer() |
|
57 // DELETE_ON_EXIT flag is not set; see below. |
|
58 : TCPServerThread("APIServer", "/dtn/apiserver", 0) |
|
59 { |
|
60 enabled_ = true; |
|
61 local_addr_ = htonl(INADDR_LOOPBACK); |
|
62 local_port_ = DTN_IPC_PORT; |
|
63 |
|
64 // override the defaults via environment variables, if given |
|
65 char *env; |
|
66 if ((env = getenv("DTNAPI_ADDR")) != NULL) { |
|
67 if (inet_aton(env, (struct in_addr*)&local_addr_) == 0) |
|
68 { |
|
69 log_err("DTNAPI_ADDR environment variable (%s) " |
|
70 "not a valid ip address, using default of localhost", |
|
71 env); |
|
72 // in case inet_aton touched it |
|
73 local_addr_ = htonl(INADDR_LOOPBACK); |
|
74 } else { |
|
75 log_debug("local address set to %s by DTNAPI_ADDR " |
|
76 "environment variable", env); |
|
77 } |
|
78 } |
|
79 |
|
80 if ((env = getenv("DTNAPI_PORT")) != NULL) { |
|
81 char *end; |
|
82 u_int port = strtoul(env, &end, 10); |
|
83 if (*end != '\0' || port > 0xffff) |
|
84 { |
|
85 log_err("DTNAPI_PORT environment variable (%s) " |
|
86 "not a valid ip port, using default of %d", |
|
87 env, DTN_IPC_PORT); |
|
88 port = DTN_IPC_PORT; |
|
89 } else { |
|
90 log_debug("api port set to %s by DTNAPI_PORT " |
|
91 "environment variable", env); |
|
92 } |
|
93 local_port_ = (u_int16_t)port; |
|
94 } |
|
95 |
|
96 if (local_addr_ != INADDR_ANY || local_port_ != 0) { |
|
97 log_debug("APIServer init (evironment set addr %s port %d)", |
|
98 intoa(local_addr_), local_port_); |
|
99 } else { |
|
100 log_debug("APIServer init"); |
|
101 } |
|
102 |
|
103 oasys::TclCommandInterp::instance()->reg(new APICommand(this)); |
|
104 } |
|
105 |
|
106 //---------------------------------------------------------------------- |
|
107 void |
|
108 APIServer::accepted(int fd, in_addr_t addr, u_int16_t port) |
|
109 { |
|
110 APIClient* c = new APIClient(fd, addr, port, this); |
|
111 register_client(c); |
|
112 c->start(); |
|
113 } |
|
114 |
|
115 //---------------------------------------------------------------------- |
|
116 |
|
117 // We keep a list of clients (register_client, unregister_client). As |
|
118 // each client shuts down it removes itself from the list. The server |
|
119 // sets should_stop to each of the clients, then spins waiting for the |
|
120 // list of clients to be emptied out. If we spin for a long time |
|
121 // (MAX_SPIN_TIME) without the list getting empty we give up. |
|
122 |
|
123 // note that the thread was created without DELETE_ON_EXIT so that the |
|
124 // thread object sticks around after the thread has died. This has the |
|
125 // upside of helping out APIClient objects that wake up after the |
|
126 // APIServer has given up on them (saving us from a core dump) but has |
|
127 // the downside of losing memory (one APIServer thread object). But |
|
128 // since the APIServer is shut down when we're about to exit, it's not |
|
129 // an issue. And only one APIServer is ever created. |
|
130 |
|
131 void |
|
132 APIServer::shutdown_hook() |
|
133 { |
|
134 // tell the clients to shut down |
|
135 std::list<APIClient *>::iterator ci; |
|
136 client_list_lock.lock("APIServer::shutdown"); |
|
137 for (ci = client_list.begin(); ci != client_list.end(); ++ci) { |
|
138 (*ci)->set_should_stop(); |
|
139 } |
|
140 client_list_lock.unlock(); |
|
141 |
|
142 #define MAX_SPIN_TIME (5 * 1000000) // max sleep in usec |
|
143 #define EACH_SPIN_TIME 10000 // sleep 10ms each time |
|
144 |
|
145 // As clients exit they unregister themselves, so if a client is |
|
146 // still on the list we assume that it is still alive. So here we |
|
147 // loop until the list is empty or MAX_SLEEP_TIME usecs have |
|
148 // passed. (We have a time out in case a client thread is wedged |
|
149 // or blocked waiting for a client. What we really want to catch |
|
150 // here is clients in the middle of processing a request.) |
|
151 int count = 0; |
|
152 while (count++ < (MAX_SPIN_TIME / EACH_SPIN_TIME)) { |
|
153 client_list_lock.lock("APIServer::shutdown"); |
|
154 bool empty = client_list.empty(); |
|
155 client_list_lock.unlock(); |
|
156 if (!empty) |
|
157 usleep(EACH_SPIN_TIME); |
|
158 else |
|
159 break; |
|
160 } |
|
161 return; |
|
162 } |
|
163 |
|
164 |
|
165 //---------------------------------------------------------------------- |
|
166 |
|
167 // manages a list of APIClient objects (threads) that have not exited yet. |
|
168 |
|
169 void |
|
170 APIServer::register_client(APIClient *c) |
|
171 { |
|
172 oasys::ScopeLock l(&client_list_lock, "APIServer::register_client"); |
|
173 client_list.push_front(c); |
|
174 } |
|
175 |
|
176 void |
|
177 APIServer::unregister_client(APIClient *c) |
|
178 { |
|
179 // remove c from the list of active clients |
|
180 oasys::ScopeLock l(&client_list_lock, "APIServer::unregister_client"); |
|
181 client_list.remove(c); |
|
182 } |
|
183 |
|
184 //---------------------------------------------------------------------- |
|
185 APIClient::APIClient(int fd, in_addr_t addr, u_int16_t port, APIServer *parent) |
|
186 : Thread("APIClient", DELETE_ON_EXIT), |
|
187 TCPClient(fd, addr, port, "/dtn/apiclient"), |
|
188 notifier_(logpath_), |
|
189 parent_(parent), |
|
190 total_sent_(0), |
|
191 total_rcvd_(0) |
|
192 { |
|
193 // note that we skip space for the message length and code/status |
|
194 xdrmem_create(&xdr_encode_, buf_ + 8, DTN_MAX_API_MSG - 8, XDR_ENCODE); |
|
195 xdrmem_create(&xdr_decode_, buf_ + 8, DTN_MAX_API_MSG - 8, XDR_DECODE); |
|
196 |
|
197 bindings_ = new APIRegistrationList(); |
|
198 sessions_ = new APIRegistrationList(); |
|
199 } |
|
200 |
|
201 //---------------------------------------------------------------------- |
|
202 APIClient::~APIClient() |
|
203 { |
|
204 log_debug("client destroyed"); |
|
205 delete_z(bindings_); |
|
206 delete_z(sessions_); |
|
207 } |
|
208 |
|
209 //---------------------------------------------------------------------- |
|
210 void |
|
211 APIClient::close_client() |
|
212 { |
|
213 TCPClient::close(); |
|
214 |
|
215 APIRegistration* reg; |
|
216 while (! bindings_->empty()) { |
|
217 reg = bindings_->front(); |
|
218 bindings_->pop_front(); |
|
219 |
|
220 reg->set_active(false); |
|
221 |
|
222 if (reg->expired()) { |
|
223 log_debug("removing expired registration %d", reg->regid()); |
|
224 BundleDaemon::post(new RegistrationExpiredEvent(reg)); |
|
225 } |
|
226 } |
|
227 |
|
228 // XXX/demmer memory leak here? |
|
229 sessions_->clear(); |
|
230 |
|
231 parent_->unregister_client(this); |
|
232 } |
|
233 |
|
234 //---------------------------------------------------------------------- |
|
235 int |
|
236 APIClient::handle_handshake() |
|
237 { |
|
238 u_int32_t handshake; |
|
239 u_int16_t message_type, ipc_version; |
|
240 |
|
241 int ret = readall((char*)&handshake, sizeof(handshake)); |
|
242 if (ret != sizeof(handshake)) { |
|
243 log_err("error reading handshake: (got %d/%zu), \"error\" %s", |
|
244 ret, sizeof(handshake), strerror(errno)); |
|
245 return -1; |
|
246 } |
|
247 |
|
248 total_rcvd_ += ret; |
|
249 |
|
250 message_type = ntohl(handshake) >> 16; |
|
251 ipc_version = (u_int16_t) (ntohl(handshake) & 0x0ffff); |
|
252 |
|
253 if (message_type != DTN_OPEN) { |
|
254 log_err("handshake (0x%x)'s message type %d != DTN_OPEN (%d)", |
|
255 handshake, message_type, DTN_OPEN); |
|
256 return -1; |
|
257 } |
|
258 |
|
259 // to handle version mismatch more cleanly, we re-build the |
|
260 // handshake word with our own version and send it back to inform |
|
261 // the client, then if there's a mismatch, close the channel |
|
262 handshake = htonl(DTN_OPEN << 16 | DTN_IPC_VERSION); |
|
263 |
|
264 ret = writeall((char*)&handshake, sizeof(handshake)); |
|
265 if (ret != sizeof(handshake)) { |
|
266 log_err("error writing handshake: %s", strerror(errno)); |
|
267 return -1; |
|
268 } |
|
269 |
|
270 total_sent_ += ret; |
|
271 |
|
272 if (ipc_version != DTN_IPC_VERSION) { |
|
273 log_err("handshake (0x%x)'s version %d != DTN_IPC_VERSION (%d)", |
|
274 handshake, ipc_version, DTN_IPC_VERSION); |
|
275 return -1; |
|
276 } |
|
277 |
|
278 return 0; |
|
279 } |
|
280 |
|
281 //---------------------------------------------------------------------- |
|
282 void |
|
283 APIClient::run() |
|
284 { |
|
285 int ret; |
|
286 u_int8_t type; |
|
287 u_int32_t len; |
|
288 |
|
289 log_info("new session %s:%d -> %s:%d", |
|
290 intoa(local_addr()), local_port(), |
|
291 intoa(remote_addr()), remote_port()); |
|
292 |
|
293 if (handle_handshake() != 0) { |
|
294 close_client(); |
|
295 return; |
|
296 } |
|
297 |
|
298 while (true) { |
|
299 // check if someone has told us to quit by setting the |
|
300 // should_stop flag. if so, we're all done |
|
301 if (should_stop()) { |
|
302 close_client(); |
|
303 return; |
|
304 } |
|
305 |
|
306 xdr_setpos(&xdr_encode_, 0); |
|
307 xdr_setpos(&xdr_decode_, 0); |
|
308 |
|
309 // read the typecode and length of the incoming message into |
|
310 // the fourth byte of the, since the pair is five bytes long |
|
311 // and the XDR engines are set to point at the eighth byte of |
|
312 // the buffer |
|
313 log_debug("waiting for next message... total sent/rcvd: %zu/%zu", |
|
314 total_sent_, total_rcvd_); |
|
315 |
|
316 ret = read(&buf_[3], 5); |
|
317 if (ret <= 0) { |
|
318 log_warn("client disconnected without calling dtn_close"); |
|
319 close_client(); |
|
320 return; |
|
321 } |
|
322 total_rcvd_ += ret; |
|
323 |
|
324 if (ret < 5) { |
|
325 log_err("ack!! can't handle really short read..."); |
|
326 close_client(); |
|
327 return; |
|
328 } |
|
329 |
|
330 // NOTE: this protocol is duplicated in the implementation of |
|
331 // handle_begin_poll to take care of a cancel_poll request |
|
332 // coming in while the thread is waiting for bundles so any |
|
333 // modifications must be propagated there |
|
334 type = buf_[3]; |
|
335 memcpy(&len, &buf_[4], sizeof(len)); |
|
336 |
|
337 len = ntohl(len); |
|
338 |
|
339 ret -= 5; |
|
340 log_debug("got %s (%d/%d bytes)", dtnipc_msgtoa(type), ret, len); |
|
341 |
|
342 // if we didn't get the whole message, loop to get the rest, |
|
343 // skipping the header bytes and the already-read amount |
|
344 if (ret < (int)len) { |
|
345 int toget = len - ret; |
|
346 log_debug("reading remainder of message... total sent/rcvd: %zu/%zu", |
|
347 total_sent_, total_rcvd_); |
|
348 if (readall(&buf_[8 + ret], toget) != toget) { |
|
349 log_err("error reading message remainder: %s", |
|
350 strerror(errno)); |
|
351 close_client(); |
|
352 return; |
|
353 } |
|
354 total_rcvd_ += toget; |
|
355 } |
|
356 |
|
357 // check if someone has told us to quit by setting the |
|
358 // should_stop flag. if so, we're all done |
|
359 if (should_stop()) { |
|
360 close_client(); |
|
361 return; |
|
362 } |
|
363 |
|
364 // dispatch to the handler routine |
|
365 switch(type) { |
|
366 #define DISPATCH(_type, _fn) \ |
|
367 case _type: \ |
|
368 ret = _fn(); \ |
|
369 break; |
|
370 |
|
371 DISPATCH(DTN_LOCAL_EID, handle_local_eid); |
|
372 DISPATCH(DTN_REGISTER, handle_register); |
|
373 DISPATCH(DTN_UNREGISTER, handle_unregister); |
|
374 DISPATCH(DTN_FIND_REGISTRATION, handle_find_registration); |
|
375 DISPATCH(DTN_SEND, handle_send); |
|
376 DISPATCH(DTN_CANCEL, handle_cancel); |
|
377 DISPATCH(DTN_BIND, handle_bind); |
|
378 DISPATCH(DTN_UNBIND, handle_unbind); |
|
379 DISPATCH(DTN_RECV, handle_recv); |
|
380 DISPATCH(DTN_BEGIN_POLL, handle_begin_poll); |
|
381 DISPATCH(DTN_CANCEL_POLL, handle_cancel_poll); |
|
382 DISPATCH(DTN_CLOSE, handle_close); |
|
383 DISPATCH(DTN_SESSION_UPDATE, handle_session_update); |
|
384 #undef DISPATCH |
|
385 |
|
386 default: |
|
387 log_err("unknown message type code 0x%x", type); |
|
388 ret = DTN_EMSGTYPE; |
|
389 break; |
|
390 } |
|
391 |
|
392 // if the handler returned -1, then the session should be |
|
393 // immediately terminated |
|
394 if (ret == -1) { |
|
395 close_client(); |
|
396 return; |
|
397 } |
|
398 |
|
399 // send the response |
|
400 if (send_response(ret) != 0) { |
|
401 return; |
|
402 } |
|
403 |
|
404 // if there was an IPC communication error or unknown message |
|
405 // type, close terminate the session |
|
406 // XXX/matt we could potentially close on all errors, not just these 2 |
|
407 if (ret == DTN_ECOMM || ret == DTN_EMSGTYPE) { |
|
408 close_client(); |
|
409 return; |
|
410 } |
|
411 |
|
412 } // while(1) |
|
413 } |
|
414 |
|
415 //---------------------------------------------------------------------- |
|
416 int |
|
417 APIClient::send_response(int ret) |
|
418 { |
|
419 u_int32_t len, msglen; |
|
420 |
|
421 // make sure the dispatched function returned a valid error |
|
422 // code |
|
423 ASSERT(ret == DTN_SUCCESS || |
|
424 (DTN_ERRBASE <= ret && ret <= DTN_ERRMAX)); |
|
425 |
|
426 // fill in the reply message with the status code and the |
|
427 // length of the reply. note that if there is no reply, then |
|
428 // the xdr position should still be zero |
|
429 len = xdr_getpos(&xdr_encode_); |
|
430 log_debug("building reply: status %s, length %d", |
|
431 dtn_strerror(ret), len); |
|
432 |
|
433 msglen = len + 8; |
|
434 ret = ntohl(ret); |
|
435 len = htonl(len); |
|
436 |
|
437 memcpy(buf_, &ret, sizeof(ret)); |
|
438 memcpy(&buf_[4], &len, sizeof(len)); |
|
439 |
|
440 log_debug("sending %d byte reply message... total sent/rcvd: %zu/%zu", |
|
441 msglen, total_sent_, total_rcvd_); |
|
442 |
|
443 if (writeall(buf_, msglen) != (int)msglen) { |
|
444 log_err("error sending reply: %s", strerror(errno)); |
|
445 close_client(); |
|
446 return -1; |
|
447 } |
|
448 |
|
449 total_sent_ += msglen; |
|
450 |
|
451 return 0; |
|
452 } |
|
453 |
|
454 //---------------------------------------------------------------------- |
|
455 bool |
|
456 APIClient::is_bound(u_int32_t regid) |
|
457 { |
|
458 APIRegistrationList::iterator iter; |
|
459 for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) { |
|
460 if ((*iter)->regid() == regid) { |
|
461 return true; |
|
462 } |
|
463 } |
|
464 |
|
465 return false; |
|
466 } |
|
467 |
|
468 //---------------------------------------------------------------------- |
|
469 int |
|
470 APIClient::handle_local_eid() |
|
471 { |
|
472 dtn_service_tag_t service_tag; |
|
473 dtn_endpoint_id_t local_eid; |
|
474 |
|
475 // unpack the request |
|
476 if (!xdr_dtn_service_tag_t(&xdr_decode_, &service_tag)) |
|
477 { |
|
478 log_err("error in xdr unpacking arguments"); |
|
479 return DTN_EXDR; |
|
480 } |
|
481 |
|
482 // build up the response |
|
483 EndpointID eid(BundleDaemon::instance()->local_eid()); |
|
484 if (eid.append_service_tag(service_tag.tag) == false) { |
|
485 log_err("error appending service tag"); |
|
486 return DTN_EINVAL; |
|
487 } |
|
488 |
|
489 memset(&local_eid, 0, sizeof(local_eid)); |
|
490 eid.copyto(&local_eid); |
|
491 |
|
492 // pack the response |
|
493 if (!xdr_dtn_endpoint_id_t(&xdr_encode_, &local_eid)) { |
|
494 log_err("internal error in xdr: xdr_dtn_endpoint_id_t"); |
|
495 return DTN_EXDR; |
|
496 } |
|
497 |
|
498 log_debug("get_local_eid encoded %d byte response", |
|
499 xdr_getpos(&xdr_encode_)); |
|
500 |
|
501 return DTN_SUCCESS; |
|
502 } |
|
503 |
|
504 //---------------------------------------------------------------------- |
|
505 int |
|
506 APIClient::handle_register() |
|
507 { |
|
508 APIRegistration* reg; |
|
509 Registration::failure_action_t action; |
|
510 EndpointIDPattern endpoint; |
|
511 std::string script; |
|
512 |
|
513 dtn_reg_info_t reginfo; |
|
514 |
|
515 memset(®info, 0, sizeof(reginfo)); |
|
516 |
|
517 // unpack and parse the request |
|
518 if (!xdr_dtn_reg_info_t(&xdr_decode_, ®info)) |
|
519 { |
|
520 log_err("error in xdr unpacking arguments"); |
|
521 return DTN_EXDR; |
|
522 } |
|
523 |
|
524 // make sure we free any dynamically-allocated bits in the |
|
525 // incoming structure before we exit the proc |
|
526 oasys::ScopeXDRFree x((xdrproc_t)xdr_dtn_reg_info_t, (char*)®info); |
|
527 |
|
528 endpoint.assign(®info.endpoint); |
|
529 |
|
530 if (!endpoint.valid()) { |
|
531 log_err("invalid endpoint id in register: '%s'", |
|
532 reginfo.endpoint.uri); |
|
533 return DTN_EINVAL; |
|
534 } |
|
535 |
|
536 // registration flags are a bitmask currently containing: |
|
537 // |
|
538 // [unused] [3 bits session flags] [2 bits failure action] |
|
539 |
|
540 u_int failure_action = reginfo.flags & 0x3; |
|
541 switch (failure_action) { |
|
542 case DTN_REG_DEFER: action = Registration::DEFER; break; |
|
543 case DTN_REG_DROP: action = Registration::DROP; break; |
|
544 case DTN_REG_EXEC: action = Registration::EXEC; break; |
|
545 default: { |
|
546 log_err("invalid registration flags 0x%x", reginfo.flags); |
|
547 return DTN_EINVAL; |
|
548 } |
|
549 } |
|
550 |
|
551 |
|
552 u_int32_t session_flags = 0; |
|
553 if (reginfo.flags & DTN_SESSION_CUSTODY) { |
|
554 session_flags |= Session::CUSTODY; |
|
555 } |
|
556 if (reginfo.flags & DTN_SESSION_SUBSCRIBE) { |
|
557 session_flags |= Session::SUBSCRIBE; |
|
558 } |
|
559 if (reginfo.flags & DTN_SESSION_PUBLISH) { |
|
560 session_flags |= Session::PUBLISH; |
|
561 } |
|
562 |
|
563 u_int other_flags = reginfo.flags & ~0x1f; |
|
564 if (other_flags != 0) { |
|
565 log_err("invalid registration flags 0x%x", reginfo.flags); |
|
566 return DTN_EINVAL; |
|
567 } |
|
568 |
|
569 if (action == Registration::EXEC) { |
|
570 script.assign(reginfo.script.script_val, reginfo.script.script_len); |
|
571 } |
|
572 |
|
573 u_int32_t regid = GlobalStore::instance()->next_regid(); |
|
574 reg = new APIRegistration(regid, endpoint, action, session_flags, |
|
575 reginfo.expiration, script); |
|
576 |
|
577 if (! reginfo.init_passive) { |
|
578 // store the registration in the list for this session |
|
579 bindings_->push_back(reg); |
|
580 reg->set_active(true); |
|
581 } |
|
582 |
|
583 if (session_flags & Session::CUSTODY) { |
|
584 sessions_->push_back(reg); |
|
585 ASSERT(reg->session_notify_list() != NULL); |
|
586 } |
|
587 |
|
588 BundleDaemon::post_and_wait(new RegistrationAddedEvent(reg, EVENTSRC_APP), |
|
589 ¬ifier_); |
|
590 |
|
591 // fill the response with the new registration id |
|
592 if (!xdr_dtn_reg_id_t(&xdr_encode_, ®id)) { |
|
593 log_err("internal error in xdr: xdr_dtn_reg_id_t"); |
|
594 return DTN_EXDR; |
|
595 } |
|
596 |
|
597 return DTN_SUCCESS; |
|
598 } |
|
599 |
|
600 //---------------------------------------------------------------------- |
|
/**
 * Handler for DTN_UNREGISTER: remove the registration with the given
 * id.
 *
 * A registration that is bound to this client and active is not
 * removed right away; it is forced into the expired state instead.
 *
 * @return DTN_SUCCESS; DTN_EXDR if the request cannot be decoded;
 *         DTN_ENOTFOUND if there is no such registration; DTN_EINVAL
 *         if it is bound here, active and already expired; DTN_EBUSY
 *         if it is active but not bound to this client
 */
int
APIClient::handle_unregister()
{
    Registration* reg;
    dtn_reg_id_t regid;

    // unpack and parse the request
    if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid))
    {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    reg = BundleDaemon::instance()->reg_table()->get(regid);
    if (reg == NULL) {
        return DTN_ENOTFOUND;
    }

    // handle the special case in which we're unregistering a
    // currently bound registration, in which we actually leave it
    // around in the expired state, so it will be cleaned up when the
    // application either calls dtn_unbind() or closes the api socket
    if (is_bound(reg->regid()) && reg->active()) {
        if (reg->expired()) {
            return DTN_EINVAL;
        }

        reg->force_expire();
        ASSERT(reg->expired());
        return DTN_SUCCESS;
    }

    // otherwise it's an error to call unregister on a registration
    // that's in-use by someone else
    if (reg->active()) {
        return DTN_EBUSY;
    }

    BundleDaemon::post_and_wait(new RegistrationRemovedEvent(reg),
                                &notifier_);

    return DTN_SUCCESS;
}
|
644 |
|
645 //---------------------------------------------------------------------- |
|
/**
 * Handler for DTN_FIND_REGISTRATION: look up a registration by
 * endpoint id and reply with its registration id.
 *
 * @return DTN_SUCCESS, DTN_EXDR on decode/encode failure, DTN_EINVAL
 *         for an invalid endpoint id, or DTN_ENOTFOUND if the
 *         registration table has no entry for it
 */
int
APIClient::handle_find_registration()
{
    Registration* reg;
    EndpointIDPattern endpoint;
    dtn_endpoint_id_t app_eid;

    // unpack and parse the request
    if (!xdr_dtn_endpoint_id_t(&xdr_decode_, &app_eid))
    {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    endpoint.assign(&app_eid);
    if (!endpoint.valid()) {
        log_err("invalid endpoint id in find_registration: '%s'",
                app_eid.uri);
        return DTN_EINVAL;
    }

    reg = BundleDaemon::instance()->reg_table()->get(endpoint);
    if (reg == NULL) {
        return DTN_ENOTFOUND;
    }

    u_int32_t regid = reg->regid();

    // fill the response with the id of the registration that was found
    if (!xdr_dtn_reg_id_t(&xdr_encode_, &regid)) {
        log_err("internal error in xdr: xdr_dtn_reg_id_t");
        return DTN_EXDR;
    }

    return DTN_SUCCESS;
}
|
682 |
|
683 //---------------------------------------------------------------------- |
|
/**
 * Handler for DTN_BIND: attach an existing, currently inactive API
 * registration to this client and mark it active.
 *
 * @return DTN_SUCCESS; DTN_EXDR if the request cannot be decoded;
 *         DTN_ENOTFOUND if the id is unknown or does not refer to an
 *         APIRegistration; DTN_EBUSY if the registration is already
 *         active
 */
int
APIClient::handle_bind()
{
    dtn_reg_id_t regid;

    // unpack the request
    if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid)) {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    // look up the registration
    const RegistrationTable* regtable = BundleDaemon::instance()->reg_table();
    Registration* reg = regtable->get(regid);

    if (!reg) {
        log_err("can't find registration %d", regid);
        return DTN_ENOTFOUND;
    }

    // only registrations created through the API can be bound
    APIRegistration* api_reg = dynamic_cast<APIRegistration*>(reg);
    if (api_reg == NULL) {
        log_crit("registration %d is not an API registration!!",
                 regid);
        return DTN_ENOTFOUND;
    }

    if (api_reg->active()) {
        log_err("registration %d is already in active mode", regid);
        return DTN_EBUSY;
    }

    // store the registration in the list for this session
    bindings_->push_back(api_reg);
    api_reg->set_active(true);

    log_info("DTN_BIND: bound to registration %d", reg->regid());

    return DTN_SUCCESS;
}
|
724 |
|
725 //---------------------------------------------------------------------- |
|
/**
 * Handler for DTN_UNBIND: detach a registration from this client and
 * mark it inactive. If the registration has already expired, a
 * RegistrationExpiredEvent is posted for it.
 *
 * @return DTN_SUCCESS; DTN_EXDR if the request cannot be decoded;
 *         DTN_ENOTFOUND if the id is unknown, does not refer to an
 *         APIRegistration, or is not bound to this client
 */
int
APIClient::handle_unbind()
{
    dtn_reg_id_t regid;

    // unpack the request
    if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid)) {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    // look up the registration
    const RegistrationTable* regtable = BundleDaemon::instance()->reg_table();
    Registration* reg = regtable->get(regid);

    if (!reg) {
        log_err("can't find registration %d", regid);
        return DTN_ENOTFOUND;
    }

    APIRegistration* api_reg = dynamic_cast<APIRegistration*>(reg);
    if (api_reg == NULL) {
        log_crit("registration %d is not an API registration!!",
                 regid);
        return DTN_ENOTFOUND;
    }

    APIRegistrationList::iterator iter;
    for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
        if (*iter == api_reg) {
            // safe to erase mid-loop: we return before touching the
            // iterator again
            bindings_->erase(iter);
            ASSERT(api_reg->active());
            api_reg->set_active(false);

            if (reg->expired()) {
                log_debug("removing expired registration %d", reg->regid());
                BundleDaemon::post(new RegistrationExpiredEvent(reg));
            }

            log_info("DTN_UNBIND: unbound from registration %d", regid);
            return DTN_SUCCESS;
        }
    }

    log_err("registration %d not bound to this api client", regid);
    return DTN_ENOTFOUND;
}
|
773 |
|
774 //---------------------------------------------------------------------- |
|
775 int |
|
776 APIClient::handle_send() |
|
777 { |
|
778 dtn_reg_id_t regid; |
|
779 dtn_bundle_spec_t spec; |
|
780 dtn_bundle_payload_t payload; |
|
781 |
|
782 memset(&spec, 0, sizeof(spec)); |
|
783 memset(&payload, 0, sizeof(payload)); |
|
784 |
|
785 /* Unpack the arguments */ |
|
786 if (!xdr_dtn_reg_id_t(&xdr_decode_, ®id) || |
|
787 !xdr_dtn_bundle_spec_t(&xdr_decode_, &spec) || |
|
788 !xdr_dtn_bundle_payload_t(&xdr_decode_, &payload)) |
|
789 { |
|
790 log_err("error in xdr unpacking arguments"); |
|
791 return DTN_EXDR; |
|
792 } |
|
793 |
|
794 BundleRef b("APIClient::handle_send"); |
|
795 b = new Bundle(); |
|
796 |
|
797 // make sure any xdr calls to malloc are cleaned up |
|
798 oasys::ScopeXDRFree f1((xdrproc_t)xdr_dtn_bundle_spec_t, |
|
799 (char*)&spec); |
|
800 oasys::ScopeXDRFree f2((xdrproc_t)xdr_dtn_bundle_payload_t, |
|
801 (char*)&payload); |
|
802 |
|
803 // assign the addressing fields... |
|
804 |
|
805 // source and destination are always specified |
|
806 b->mutable_source()->assign(&spec.source); |
|
807 b->mutable_dest()->assign(&spec.dest); |
|
808 |
|
809 // replyto defaults to null |
|
810 if (spec.replyto.uri[0] == '\0') { |
|
811 b->mutable_replyto()->assign(EndpointID::NULL_EID()); |
|
812 } else { |
|
813 b->mutable_replyto()->assign(&spec.replyto); |
|
814 } |
|
815 |
|
816 // custodian is always null |
|
817 b->mutable_custodian()->assign(EndpointID::NULL_EID()); |
|
818 |
|
819 // set the is_singleton bit, first checking if the application |
|
820 // specified a value, then seeing if the scheme is known and can |
|
821 // therefore determine for itself, and finally, checking the |
|
822 // global default |
|
823 if (spec.dopts & DOPTS_SINGLETON_DEST) |
|
824 { |
|
825 b->set_singleton_dest(true); |
|
826 } |
|
827 else if (spec.dopts & DOPTS_MULTINODE_DEST) |
|
828 { |
|
829 b->set_singleton_dest(false); |
|
830 } |
|
831 else |
|
832 { |
|
833 EndpointID::singleton_info_t info; |
|
834 |
|
835 if (b->dest().known_scheme()) { |
|
836 info = b->dest().is_singleton(); |
|
837 |
|
838 // all schemes must make a decision one way or the other |
|
839 ASSERT(info != EndpointID::UNKNOWN); |
|
840 } else { |
|
841 info = EndpointID::is_singleton_default_; |
|
842 } |
|
843 |
|
844 switch (info) { |
|
845 case EndpointID::UNKNOWN: |
|
846 log_err("bundle destination %s in unknown scheme and " |
|
847 "app did not assert singleton/multipoint", |
|
848 b->dest().c_str()); |
|
849 return DTN_EINVAL; |
|
850 |
|
851 case EndpointID::SINGLETON: |
|
852 b->set_singleton_dest(true); |
|
853 break; |
|
854 |
|
855 case EndpointID::MULTINODE: |
|
856 b->set_singleton_dest(false); |
|
857 break; |
|
858 } |
|
859 } |
|
860 |
|
861 // the priority code |
|
862 switch (spec.priority) { |
|
863 #define COS(_cos) case _cos: b->set_priority(Bundle::_cos); break; |
|
864 COS(COS_BULK); |
|
865 COS(COS_NORMAL); |
|
866 COS(COS_EXPEDITED); |
|
867 COS(COS_RESERVED); |
|
868 #undef COS |
|
869 default: |
|
870 log_err("invalid priority level %d", (int)spec.priority); |
|
871 return DTN_EINVAL; |
|
872 }; |
|
873 |
|
874 // The bundle's source EID must be either dtn:none or an EID |
|
875 // registered at this node so check that now. |
|
876 const RegistrationTable* reg_table = BundleDaemon::instance()->reg_table(); |
|
877 RegistrationList unused; |
|
878 if (b->source() == EndpointID::NULL_EID()) |
|
879 { |
|
880 // Bundles with a null source EID are not allowed to request reports or |
|
881 // custody transfer, and must not be fragmented. |
|
882 if (spec.dopts) { |
|
883 log_err("bundle with null source EID requested reports and/or " |
|
884 "custody transfer"); |
|
885 return DTN_EINVAL; |
|
886 } |
|
887 |
|
888 b->set_do_not_fragment(true); |
|
889 } |
|
890 else if (reg_table->get_matching(b->source(), &unused) != 0) |
|
891 { |
|
892 // Local registration -- don't do anything |
|
893 } |
|
894 else if (b->source().subsume(BundleDaemon::instance()->local_eid())) |
|
895 { |
|
896 // Allow source EIDs that subsume the local eid |
|
897 } |
|
898 else |
|
899 { |
|
900 log_err("this node is not a member of the bundle's source EID (%s)", |
|
901 b->source().str().c_str()); |
|
902 return DTN_EINVAL; |
|
903 } |
|
904 |
|
905 // Now look up the registration ID passed in to see if the bundle |
|
906 // was sent as part of a session |
|
907 Registration* reg = reg_table->get(regid); |
|
908 if (reg && reg->session_flags() != 0) { |
|
909 b->mutable_session_eid()->assign(reg->endpoint().str()); |
|
910 } |
|
911 |
|
912 // delivery options |
|
913 if (spec.dopts & DOPTS_CUSTODY) |
|
914 b->set_custody_requested(true); |
|
915 |
|
916 if (spec.dopts & DOPTS_DELIVERY_RCPT) |
|
917 b->set_delivery_rcpt(true); |
|
918 |
|
919 if (spec.dopts & DOPTS_RECEIVE_RCPT) |
|
920 b->set_receive_rcpt(true); |
|
921 |
|
922 if (spec.dopts & DOPTS_FORWARD_RCPT) |
|
923 b->set_forward_rcpt(true); |
|
924 |
|
925 if (spec.dopts & DOPTS_CUSTODY_RCPT) |
|
926 b->set_custody_rcpt(true); |
|
927 |
|
928 if (spec.dopts & DOPTS_DELETE_RCPT) |
|
929 b->set_deletion_rcpt(true); |
|
930 |
|
931 if (spec.dopts & DOPTS_DO_NOT_FRAGMENT) |
|
932 b->set_do_not_fragment(true); |
|
933 |
|
934 // expiration time |
|
935 b->set_expiration(spec.expiration); |
|
936 |
|
937 // sequence id and obsoletes id |
|
938 if (spec.sequence_id.data.data_len != 0) |
|
939 { |
|
940 std::string str(spec.sequence_id.data.data_val, |
|
941 spec.sequence_id.data.data_len); |
|
942 |
|
943 bool ok = b->mutable_sequence_id()->parse(str); |
|
944 if (! ok) { |
|
945 log_err("invalid sequence id '%s'", str.c_str()); |
|
946 return DTN_EINVAL; |
|
947 } |
|
948 } |
|
949 |
|
950 if (spec.obsoletes_id.data.data_len != 0) |
|
951 { |
|
952 std::string str(spec.obsoletes_id.data.data_val, |
|
953 spec.obsoletes_id.data.data_len); |
|
954 |
|
955 bool ok = b->mutable_obsoletes_id()->parse(str); |
|
956 if (! ok) { |
|
957 log_err("invalid obsoletes id '%s'", str.c_str()); |
|
958 return DTN_EINVAL; |
|
959 } |
|
960 } |
|
961 |
|
962 // extension blocks |
|
963 for (u_int i = 0; i < spec.blocks.blocks_len; i++) { |
|
964 dtn_extension_block_t* block = &spec.blocks.blocks_val[i]; |
|
965 |
|
966 BlockInfo* info = |
|
967 b->api_blocks()->append_block(APIBlockProcessor::instance()); |
|
968 APIBlockProcessor::instance()-> |
|
969 init_block(info, b->api_blocks(), |
|
970 block->type, block->flags, |
|
971 (u_char*)block->data.data_val, |
|
972 block->data.data_len); |
|
973 } |
|
974 |
|
975 // metadata blocks |
|
976 for (unsigned int i = 0; i < spec.metadata.metadata_len; ++i) { |
|
977 dtn_extension_block_t* block = &spec.metadata.metadata_val[i]; |
|
978 |
|
979 LinkRef null_link("APIServer::handle_send"); |
|
980 MetadataVec * vec = b->generated_metadata().find_blocks(null_link); |
|
981 if (vec == NULL) { |
|
982 vec = b->mutable_generated_metadata()->create_blocks(null_link); |
|
983 } |
|
984 ASSERT(vec != NULL); |
|
985 |
|
986 MetadataBlock * meta_block = new MetadataBlock( |
|
987 (u_int64_t)block->type, |
|
988 (u_char *)block->data.data_val, |
|
989 (u_int32_t)block->data.data_len); |
|
990 meta_block->set_flags((u_int64_t)block->flags); |
|
991 |
|
992 // XXX/demmer currently this block needs to be stuck on the |
|
993 // outgoing metadata for the null link (so it's transmit to |
|
994 // all destinations) as well as on the recv_metadata vector so |
|
995 // it's conveyed to local applications. this should really be |
|
996 // cleaned up... |
|
997 vec->push_back(meta_block); |
|
998 b->mutable_recv_metadata()->push_back(meta_block); |
|
999 } |
|
1000 |
|
1001 // validate the bundle metadata |
|
1002 oasys::StringBuffer error; |
|
1003 if (!b->validate(&error)) { |
|
1004 log_err("bundle validation failed: %s", error.data()); |
|
1005 return DTN_EINVAL; |
|
1006 } |
|
1007 |
|
1008 // set up the payload, including calculating its length, but don't |
|
1009 // copy it in yet |
|
1010 size_t payload_len; |
|
1011 char filename[PATH_MAX]; |
|
1012 |
|
1013 switch (payload.location) { |
|
1014 case DTN_PAYLOAD_MEM: |
|
1015 payload_len = payload.buf.buf_len; |
|
1016 break; |
|
1017 |
|
1018 case DTN_PAYLOAD_FILE: |
|
1019 case DTN_PAYLOAD_TEMP_FILE: |
|
1020 struct stat finfo; |
|
1021 sprintf(filename, "%.*s", |
|
1022 (int)payload.filename.filename_len, |
|
1023 payload.filename.filename_val); |
|
1024 |
|
1025 if (stat(filename, &finfo) != 0) |
|
1026 { |
|
1027 log_err("payload file %s does not exist!", filename); |
|
1028 return DTN_EINVAL; |
|
1029 } |
|
1030 |
|
1031 payload_len = finfo.st_size; |
|
1032 break; |
|
1033 |
|
1034 default: |
|
1035 log_err("payload.location of %d unknown", payload.location); |
|
1036 return DTN_EINVAL; |
|
1037 } |
|
1038 |
|
1039 b->mutable_payload()->set_length(payload_len); |
|
1040 |
|
1041 // before filling in the payload, we first probe the router to |
|
1042 // determine if there's sufficient storage for the bundle |
|
1043 bool result; |
|
1044 int reason; |
|
1045 BundleDaemon::post_and_wait( |
|
1046 new BundleAcceptRequest(b, EVENTSRC_APP, &result, &reason), |
|
1047 ¬ifier_); |
|
1048 |
|
1049 if (!result) { |
|
1050 log_info("DTN_SEND bundle not accepted: reason %s", |
|
1051 BundleStatusReport::reason_to_str(reason)); |
|
1052 |
|
1053 switch (reason) { |
|
1054 case BundleProtocol::REASON_DEPLETED_STORAGE: |
|
1055 return DTN_ENOSPACE; |
|
1056 default: |
|
1057 return DTN_EINTERNAL; |
|
1058 } |
|
1059 } |
|
1060 |
|
1061 switch (payload.location) { |
|
1062 case DTN_PAYLOAD_MEM: |
|
1063 b->mutable_payload()->set_data((u_char*)payload.buf.buf_val, |
|
1064 payload.buf.buf_len); |
|
1065 break; |
|
1066 |
|
1067 case DTN_PAYLOAD_FILE: |
|
1068 FILE* file; |
|
1069 int r, left; |
|
1070 u_char buffer[4096]; |
|
1071 size_t offset; |
|
1072 |
|
1073 if ((file = fopen(filename, "r")) == NULL) |
|
1074 { |
|
1075 log_err("payload file %s can't be opened: %s", |
|
1076 filename, strerror(errno)); |
|
1077 return DTN_EINVAL; |
|
1078 } |
|
1079 |
|
1080 left = payload_len; |
|
1081 r = 0; |
|
1082 offset = 0; |
|
1083 while (left > 0) |
|
1084 { |
|
1085 r = fread(buffer, 1, (left>4096)?4096:left, file); |
|
1086 |
|
1087 if (r) |
|
1088 { |
|
1089 b->mutable_payload()->write_data(buffer, offset, r); |
|
1090 left -= r; |
|
1091 offset += r; |
|
1092 } |
|
1093 else |
|
1094 { |
|
1095 sleep(1); // pause before re-reading |
|
1096 } |
|
1097 } |
|
1098 |
|
1099 fclose(file); |
|
1100 break; |
|
1101 |
|
1102 case DTN_PAYLOAD_TEMP_FILE: |
|
1103 if (! b->mutable_payload()->replace_with_file(filename)) { |
|
1104 log_err("payload file %s can't be linked or copied", |
|
1105 filename); |
|
1106 return DTN_EINVAL; |
|
1107 } |
|
1108 |
|
1109 if (::unlink(filename) != 0) { |
|
1110 log_err("error unlinking payload temp file: %s", |
|
1111 strerror(errno)); |
|
1112 // continue on since this is non-fatal |
|
1113 } |
|
1114 } |
|
1115 |
|
1116 // before posting the received event, fill in the bundle id struct |
|
1117 dtn_bundle_id_t id; |
|
1118 memcpy(&id.source, &spec.source, sizeof(dtn_endpoint_id_t)); |
|
1119 id.creation_ts.secs = b->creation_ts().seconds_; |
|
1120 id.creation_ts.seqno = b->creation_ts().seqno_; |
|
1121 id.frag_offset = 0; |
|
1122 id.orig_length = 0; |
|
1123 |
|
1124 log_info("DTN_SEND bundle *%p", b.object()); |
|
1125 |
|
1126 // deliver the bundle |
|
1127 // Note: the bundle state may change once it has been posted |
|
1128 BundleDaemon::post_and_wait( |
|
1129 new BundleReceivedEvent(b.object(), EVENTSRC_APP), |
|
1130 ¬ifier_); |
|
1131 |
|
1132 // return the bundle id struct |
|
1133 if (!xdr_dtn_bundle_id_t(&xdr_encode_, &id)) { |
|
1134 log_err("internal error in xdr: xdr_dtn_bundle_id_t"); |
|
1135 return DTN_EXDR; |
|
1136 } |
|
1137 |
|
1138 return DTN_SUCCESS; |
|
1139 } |
|
1140 |
|
1141 //---------------------------------------------------------------------- |
|
/**
 * Handle a DTN_CANCEL request.
 *
 * Decodes the dtn_bundle_id_t sent by the application, converts it to
 * a GbofId, looks the bundle up in the daemon's pending list and, if
 * found, posts a BundleCancelRequest for it.
 *
 * @return DTN_SUCCESS when the cancel request was posted, DTN_EXDR if
 *         the arguments can't be decoded, DTN_ENOTFOUND if no pending
 *         bundle matches the id.
 */
int
APIClient::handle_cancel()
{
    // zero the id before decoding into it
    dtn_bundle_id_t bundle_id;
    memset(&bundle_id, 0, sizeof(bundle_id));

    /* Unpack the arguments */
    if (!xdr_dtn_bundle_id_t(&xdr_decode_, &bundle_id)) {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    // build the lookup key from the API-level id; a non-zero
    // orig_length marks the bundle as a fragment
    GbofId key;
    key.frag_offset_          = bundle_id.frag_offset;
    key.frag_length_          = bundle_id.orig_length;
    key.is_fragment_          = (bundle_id.orig_length > 0);
    key.creation_ts_.seqno_   = bundle_id.creation_ts.seqno;
    key.creation_ts_.seconds_ = bundle_id.creation_ts.secs;
    key.source_               = EndpointID( std::string(bundle_id.source.uri) );

    // the pending list lock is held for the rest of the function
    BundleRef match;
    oasys::ScopeLock pending_lock(
        BundleDaemon::instance()->pending_bundles()->lock(), "handle_cancel");
    match = BundleDaemon::instance()->pending_bundles()->find(key);

    if (match.object() == NULL) {
        log_warn("no bundle matching [%s]; cannot cancel",
                 key.str().c_str());
        return DTN_ENOTFOUND;
    }

    log_info("DTN_CANCEL bundle *%p", match.object());

    BundleDaemon::post(new BundleCancelRequest(match, std::string()));
    return DTN_SUCCESS;
}
|
1180 |
|
// Size of the temporary in-memory buffer used while delivering
// bundle payloads to applications through files.
#define DTN_FILE_DELIVERY_BUF_SIZE 1000
|
1184 |
|
1185 //---------------------------------------------------------------------- |
|
1186 int |
|
1187 APIClient::handle_recv() |
|
1188 { |
|
1189 dtn_bundle_spec_t spec; |
|
1190 dtn_bundle_payload_t payload; |
|
1191 dtn_bundle_payload_location_t location; |
|
1192 dtn_bundle_status_report_t status_report; |
|
1193 dtn_timeval_t timeout; |
|
1194 oasys::ScratchBuffer<u_char*> buf; |
|
1195 APIRegistration* reg = NULL; |
|
1196 bool sock_ready = false; |
|
1197 oasys::FileIOClient tmpfile; |
|
1198 |
|
1199 // unpack the arguments |
|
1200 if ((!xdr_dtn_bundle_payload_location_t(&xdr_decode_, &location)) || |
|
1201 (!xdr_dtn_timeval_t(&xdr_decode_, &timeout))) |
|
1202 { |
|
1203 log_err("error in xdr unpacking arguments"); |
|
1204 return DTN_EXDR; |
|
1205 } |
|
1206 |
|
1207 int err = wait_for_notify("recv", timeout, ®, NULL, &sock_ready); |
|
1208 if (err != 0) { |
|
1209 return err; |
|
1210 } |
|
1211 |
|
1212 // if there's data on the socket, that either means the socket was |
|
1213 // closed by an exiting application or the app is violating the |
|
1214 // protocol... |
|
1215 if (sock_ready) { |
|
1216 return handle_unexpected_data("handle_recv"); |
|
1217 } |
|
1218 |
|
1219 ASSERT(reg != NULL); |
|
1220 |
|
1221 BundleRef bref("APIClient::handle_recv"); |
|
1222 bref = reg->bundle_list()->pop_front(); |
|
1223 Bundle* b = bref.object(); |
|
1224 ASSERT(b != NULL); |
|
1225 |
|
1226 log_debug("handle_recv: popped *%p for registration %d (timeout %d)", |
|
1227 b, reg->regid(), timeout); |
|
1228 |
|
1229 memset(&spec, 0, sizeof(spec)); |
|
1230 memset(&payload, 0, sizeof(payload)); |
|
1231 memset(&status_report, 0, sizeof(status_report)); |
|
1232 |
|
1233 // copyto will malloc string buffer space that needs to be freed |
|
1234 // at the end of the fn |
|
1235 b->source().copyto(&spec.source); |
|
1236 b->dest().copyto(&spec.dest); |
|
1237 b->replyto().copyto(&spec.replyto); |
|
1238 |
|
1239 spec.dopts = 0; |
|
1240 if (b->custody_requested()) spec.dopts |= DOPTS_CUSTODY; |
|
1241 if (b->delivery_rcpt()) spec.dopts |= DOPTS_DELIVERY_RCPT; |
|
1242 if (b->receive_rcpt()) spec.dopts |= DOPTS_RECEIVE_RCPT; |
|
1243 if (b->forward_rcpt()) spec.dopts |= DOPTS_FORWARD_RCPT; |
|
1244 if (b->custody_rcpt()) spec.dopts |= DOPTS_CUSTODY_RCPT; |
|
1245 if (b->deletion_rcpt()) spec.dopts |= DOPTS_DELETE_RCPT; |
|
1246 |
|
1247 spec.expiration = b->expiration(); |
|
1248 spec.creation_ts.secs = b->creation_ts().seconds_; |
|
1249 spec.creation_ts.seqno = b->creation_ts().seqno_; |
|
1250 spec.delivery_regid = reg->regid(); |
|
1251 |
|
1252 // copy out the sequence id and obsoletes id |
|
1253 std::string sequence_id_str, obsoletes_id_str; |
|
1254 if (! b->sequence_id().empty()) { |
|
1255 sequence_id_str = b->sequence_id().to_str(); |
|
1256 spec.sequence_id.data.data_val = const_cast<char*>(sequence_id_str.c_str()); |
|
1257 spec.sequence_id.data.data_len = sequence_id_str.length(); |
|
1258 } |
|
1259 |
|
1260 if (! b->obsoletes_id().empty()) { |
|
1261 obsoletes_id_str = b->obsoletes_id().to_str(); |
|
1262 spec.obsoletes_id.data.data_val = const_cast<char*>(obsoletes_id_str.c_str()); |
|
1263 spec.obsoletes_id.data.data_len = obsoletes_id_str.length(); |
|
1264 } |
|
1265 |
|
1266 // copy extension blocks |
|
1267 unsigned int blocks_found = 0; |
|
1268 unsigned int data_len = 0; |
|
1269 for (unsigned int i = 0; i < b->recv_blocks().size(); ++i) { |
|
1270 if ((b->recv_blocks()[i].type() == BundleProtocol::PRIMARY_BLOCK) || |
|
1271 (b->recv_blocks()[i].type() == BundleProtocol::PAYLOAD_BLOCK) || |
|
1272 (b->recv_blocks()[i].type() == BundleProtocol::METADATA_BLOCK)) { |
|
1273 continue; |
|
1274 } |
|
1275 blocks_found++; |
|
1276 data_len += b->recv_blocks()[i].data_length(); |
|
1277 } |
|
1278 |
|
1279 if (blocks_found > 0) { |
|
1280 unsigned int buf_len = (blocks_found * sizeof(dtn_extension_block_t)) + |
|
1281 data_len; |
|
1282 void * buf = malloc(buf_len); |
|
1283 memset(buf, 0, buf_len); |
|
1284 |
|
1285 dtn_extension_block_t * bp = (dtn_extension_block_t *)buf; |
|
1286 char * dp = (char*)buf + (blocks_found * sizeof(dtn_extension_block_t)); |
|
1287 for (unsigned int i = 0; i < b->recv_blocks().size(); ++i) { |
|
1288 if ((b->recv_blocks()[i].type() == BundleProtocol::PRIMARY_BLOCK) || |
|
1289 (b->recv_blocks()[i].type() == BundleProtocol::PAYLOAD_BLOCK) || |
|
1290 (b->recv_blocks()[i].type() == BundleProtocol::METADATA_BLOCK)) { |
|
1291 continue; |
|
1292 } |
|
1293 |
|
1294 bp->type = b->recv_blocks()[i].type(); |
|
1295 bp->flags = b->recv_blocks()[i].flags(); |
|
1296 bp->data.data_len = b->recv_blocks()[i].data_length(); |
|
1297 bp->data.data_val = dp; |
|
1298 memcpy(dp, b->recv_blocks()[i].data(), bp->data.data_len); |
|
1299 |
|
1300 bp++; |
|
1301 dp += bp->data.data_len; |
|
1302 } |
|
1303 |
|
1304 spec.blocks.blocks_len = blocks_found; |
|
1305 spec.blocks.blocks_val = (dtn_extension_block_t *)buf; |
|
1306 } |
|
1307 |
|
1308 // copy metadata extension blocks |
|
1309 blocks_found = 0; |
|
1310 data_len = 0; |
|
1311 for (unsigned int i = 0; i < b->recv_metadata().size(); ++i) { |
|
1312 blocks_found++; |
|
1313 data_len += b->recv_metadata()[i]->metadata_len(); |
|
1314 } |
|
1315 |
|
1316 if (blocks_found > 0) { |
|
1317 unsigned int buf_len = (blocks_found * sizeof(dtn_extension_block_t)) + |
|
1318 data_len; |
|
1319 void * buf = (char *)malloc(buf_len); |
|
1320 memset(buf, 0, buf_len); |
|
1321 |
|
1322 dtn_extension_block_t * bp = (dtn_extension_block_t *)buf; |
|
1323 char * dp = (char*)buf + (blocks_found * sizeof(dtn_extension_block_t)); |
|
1324 for (unsigned int i = 0; i < b->recv_metadata().size(); ++i) { |
|
1325 bp->type = b->recv_metadata()[i]->ontology(); |
|
1326 bp->flags = b->recv_metadata()[i]->flags(); |
|
1327 bp->data.data_len = b->recv_metadata()[i]->metadata_len(); |
|
1328 bp->data.data_val = dp; |
|
1329 memcpy(dp, b->recv_metadata()[i]->metadata(), bp->data.data_len); |
|
1330 dp += bp->data.data_len; |
|
1331 bp++; |
|
1332 } |
|
1333 |
|
1334 spec.metadata.metadata_len = blocks_found; |
|
1335 spec.metadata.metadata_val = (dtn_extension_block_t *)buf; |
|
1336 } |
|
1337 |
|
1338 size_t payload_len = b->payload().length(); |
|
1339 |
|
1340 if (location == DTN_PAYLOAD_MEM && payload_len > DTN_MAX_BUNDLE_MEM) |
|
1341 { |
|
1342 log_debug("app requested memory delivery but payload is too big (%zu bytes)... " |
|
1343 "using files instead", |
|
1344 payload_len); |
|
1345 location = DTN_PAYLOAD_FILE; |
|
1346 } |
|
1347 |
|
1348 if (location == DTN_PAYLOAD_MEM) { |
|
1349 // the app wants the payload in memory |
|
1350 payload.buf.buf_len = payload_len; |
|
1351 if (payload_len != 0) { |
|
1352 buf.reserve(payload_len); |
|
1353 payload.buf.buf_val = |
|
1354 (char*)b->payload().read_data(0, payload_len, buf.buf()); |
|
1355 } else { |
|
1356 payload.buf.buf_val = 0; |
|
1357 } |
|
1358 |
|
1359 } else if (location == DTN_PAYLOAD_FILE) { |
|
1360 const char *tdir; |
|
1361 char templ[64]; |
|
1362 |
|
1363 tdir = getenv("TMP"); |
|
1364 if (tdir == NULL) { |
|
1365 tdir = getenv("TEMP"); |
|
1366 } |
|
1367 if (tdir == NULL) { |
|
1368 tdir = "/tmp"; |
|
1369 } |
|
1370 |
|
1371 snprintf(templ, sizeof(templ), "%s/bundlePayload_XXXXXX", tdir); |
|
1372 |
|
1373 if (tmpfile.mkstemp(templ) == -1) { |
|
1374 log_err("can't open temporary file to deliver bundle"); |
|
1375 return DTN_EINTERNAL; |
|
1376 } |
|
1377 |
|
1378 if (chmod(tmpfile.path(), 0666) < 0) { |
|
1379 log_warn("can't set the permission of temp file to 0666: %s", |
|
1380 strerror(errno)); |
|
1381 } |
|
1382 |
|
1383 b->payload().copy_file(&tmpfile); |
|
1384 |
|
1385 payload.filename.filename_val = (char*)tmpfile.path(); |
|
1386 payload.filename.filename_len = tmpfile.path_len() + 1; |
|
1387 tmpfile.close(); |
|
1388 |
|
1389 } else { |
|
1390 log_err("payload location %d not understood", location); |
|
1391 return DTN_EINVAL; |
|
1392 } |
|
1393 |
|
1394 payload.location = location; |
|
1395 |
|
1396 /* |
|
1397 * If the bundle is a status report, parse it and copy out the |
|
1398 * data into the status report. |
|
1399 */ |
|
1400 BundleStatusReport::data_t sr_data; |
|
1401 if (BundleStatusReport::parse_status_report(&sr_data, b)) |
|
1402 { |
|
1403 payload.status_report = &status_report; |
|
1404 sr_data.orig_source_eid_.copyto(&status_report.bundle_id.source); |
|
1405 status_report.bundle_id.creation_ts.secs = |
|
1406 sr_data.orig_creation_tv_.seconds_; |
|
1407 status_report.bundle_id.creation_ts.seqno = |
|
1408 sr_data.orig_creation_tv_.seqno_; |
|
1409 status_report.bundle_id.frag_offset = sr_data.orig_frag_offset_; |
|
1410 status_report.bundle_id.orig_length = sr_data.orig_frag_length_; |
|
1411 |
|
1412 status_report.reason = (dtn_status_report_reason_t)sr_data.reason_code_; |
|
1413 status_report.flags = (dtn_status_report_flags_t)sr_data.status_flags_; |
|
1414 |
|
1415 status_report.receipt_ts.secs = sr_data.receipt_tv_.seconds_; |
|
1416 status_report.receipt_ts.seqno = sr_data.receipt_tv_.seqno_; |
|
1417 status_report.custody_ts.secs = sr_data.custody_tv_.seconds_; |
|
1418 status_report.custody_ts.seqno = sr_data.custody_tv_.seqno_; |
|
1419 status_report.forwarding_ts.secs = sr_data.forwarding_tv_.seconds_; |
|
1420 status_report.forwarding_ts.seqno = sr_data.forwarding_tv_.seqno_; |
|
1421 status_report.delivery_ts.secs = sr_data.delivery_tv_.seconds_; |
|
1422 status_report.delivery_ts.seqno = sr_data.delivery_tv_.seqno_; |
|
1423 status_report.deletion_ts.secs = sr_data.deletion_tv_.seconds_; |
|
1424 status_report.deletion_ts.seqno = sr_data.deletion_tv_.seqno_; |
|
1425 status_report.ack_by_app_ts.secs = sr_data.ack_by_app_tv_.seconds_; |
|
1426 status_report.ack_by_app_ts.seqno = sr_data.ack_by_app_tv_.seqno_; |
|
1427 } |
|
1428 |
|
1429 if (!xdr_dtn_bundle_spec_t(&xdr_encode_, &spec)) |
|
1430 { |
|
1431 log_err("internal error in xdr: xdr_dtn_bundle_spec_t"); |
|
1432 return DTN_EXDR; |
|
1433 } |
|
1434 |
|
1435 if (!xdr_dtn_bundle_payload_t(&xdr_encode_, &payload)) |
|
1436 { |
|
1437 log_err("internal error in xdr: xdr_dtn_bundle_payload_t"); |
|
1438 return DTN_EXDR; |
|
1439 } |
|
1440 |
|
1441 // prevent xdr_free of non-malloc'd pointer |
|
1442 payload.status_report = NULL; |
|
1443 |
|
1444 log_info("DTN_RECV: " |
|
1445 "successfully delivered bundle %d to registration %d", |
|
1446 b->bundleid(), reg->regid()); |
|
1447 |
|
1448 BundleDaemon::post(new BundleDeliveredEvent(b, reg)); |
|
1449 |
|
1450 return DTN_SUCCESS; |
|
1451 } |
|
1452 |
|
1453 //---------------------------------------------------------------------- |
|
1454 int |
|
1455 APIClient::handle_begin_poll() |
|
1456 { |
|
1457 dtn_timeval_t timeout; |
|
1458 APIRegistration* recv_reg = NULL; |
|
1459 APIRegistration* notify_reg = NULL; |
|
1460 bool sock_ready = false; |
|
1461 |
|
1462 // unpack the arguments |
|
1463 if ((!xdr_dtn_timeval_t(&xdr_decode_, &timeout))) |
|
1464 { |
|
1465 log_err("error in xdr unpacking arguments"); |
|
1466 return DTN_EXDR; |
|
1467 } |
|
1468 |
|
1469 int err = wait_for_notify("poll", timeout, &recv_reg, ¬ify_reg, |
|
1470 &sock_ready); |
|
1471 if (err != 0) { |
|
1472 return err; |
|
1473 } |
|
1474 |
|
1475 // if there's data on the socket, then the application either quit |
|
1476 // and closed the socket, or called dtn_poll_cancel |
|
1477 if (sock_ready) { |
|
1478 log_debug("handle_begin_poll: " |
|
1479 "api socket ready -- trying to read one byte"); |
|
1480 char type; |
|
1481 |
|
1482 int ret = read(&type, 1); |
|
1483 if (ret == 0) { |
|
1484 log_info("IPC socket closed while blocked in read... " |
|
1485 "application must have exited"); |
|
1486 return -1; |
|
1487 } |
|
1488 |
|
1489 if (ret == -1) { |
|
1490 log_err("handle_begin_poll: protocol error -- " |
|
1491 "error while blocked in poll"); |
|
1492 return DTN_ECOMM; |
|
1493 } |
|
1494 |
|
1495 if (type != DTN_CANCEL_POLL) { |
|
1496 log_err("handle_poll: error got unexpected message '%s' " |
|
1497 "while blocked in poll", dtnipc_msgtoa(type)); |
|
1498 return DTN_ECOMM; |
|
1499 } |
|
1500 |
|
1501 // read in the length which must be zero |
|
1502 u_int32_t len; |
|
1503 ret = read((char*)&len, 4); |
|
1504 if (ret != 4 || len != 0) { |
|
1505 log_err("handle_begin_poll: protocol error -- " |
|
1506 "error getting cancel poll length"); |
|
1507 return DTN_ECOMM; |
|
1508 } |
|
1509 |
|
1510 total_rcvd_ += 5; |
|
1511 |
|
1512 log_debug("got DTN_CANCEL_POLL while blocked in poll"); |
|
1513 // immediately send the response to the poll cancel, then |
|
1514 // we return from the handler which will follow it with the |
|
1515 // response code to the original poll request |
|
1516 send_response(DTN_SUCCESS); |
|
1517 } else if (recv_reg != NULL) { |
|
1518 log_debug("handle_begin_poll: bundle arrived"); |
|
1519 |
|
1520 } else if (notify_reg != NULL) { |
|
1521 log_debug("handle_begin_poll: subscriber notify arrived"); |
|
1522 |
|
1523 } else { |
|
1524 // wait_for_notify must have returned one of the above cases |
|
1525 NOTREACHED; |
|
1526 } |
|
1527 |
|
1528 return DTN_SUCCESS; |
|
1529 } |
|
1530 |
|
1531 //---------------------------------------------------------------------- |
|
1532 int |
|
1533 APIClient::handle_cancel_poll() |
|
1534 { |
|
1535 // the only reason we should get in here is if the call to |
|
1536 // dtn_begin_poll() returned but the app still called cancel_poll |
|
1537 // and so the messages crossed. but, since there's nothing wrong |
|
1538 // with this, we just return success in both cases |
|
1539 |
|
1540 return DTN_SUCCESS; |
|
1541 } |
|
1542 |
|
1543 //---------------------------------------------------------------------- |
|
1544 int |
|
1545 APIClient::handle_close() |
|
1546 { |
|
1547 log_info("received DTN_CLOSE message; closing API handle"); |
|
1548 // return -1 to force the session to close: |
|
1549 return -1; |
|
1550 } |
|
1551 |
|
1552 //---------------------------------------------------------------------- |
|
1553 int |
|
1554 APIClient::handle_session_update() |
|
1555 { |
|
1556 APIRegistration* reg = NULL; |
|
1557 bool sock_ready = false; |
|
1558 dtn_timeval_t timeout; |
|
1559 |
|
1560 // unpack the arguments |
|
1561 if ((!xdr_dtn_timeval_t(&xdr_decode_, &timeout))) |
|
1562 { |
|
1563 log_err("error in xdr unpacking arguments"); |
|
1564 return DTN_EXDR; |
|
1565 } |
|
1566 |
|
1567 int err = wait_for_notify("session_update", timeout, NULL, ®, |
|
1568 &sock_ready); |
|
1569 if (err != 0) { |
|
1570 return err; |
|
1571 } |
|
1572 |
|
1573 // if there's data on the socket, that either means the socket was |
|
1574 // closed by an exiting application or the app is violating the |
|
1575 // protocol... |
|
1576 if (sock_ready) { |
|
1577 return handle_unexpected_data("handle_session_update"); |
|
1578 } |
|
1579 |
|
1580 ASSERT(reg != NULL); |
|
1581 |
|
1582 BundleRef bref("APIClient::handle_session_update"); |
|
1583 bref = reg->session_notify_list()->pop_front(); |
|
1584 Bundle* b = bref.object(); |
|
1585 ASSERT(b != NULL); |
|
1586 |
|
1587 log_debug("handle_session_update: " |
|
1588 "popped *%p for registration %d (timeout %d)", |
|
1589 b, reg->regid(), timeout); |
|
1590 |
|
1591 |
|
1592 ASSERT(b->session_flags() != 0); |
|
1593 |
|
1594 unsigned int session_flags = 0; |
|
1595 if (b->session_flags() & Session::SUBSCRIBE) { |
|
1596 session_flags |= DTN_SESSION_SUBSCRIBE; |
|
1597 } |
|
1598 // XXX/demmer what to do about UNSUBSCRIBE/PUBLISH?? |
|
1599 |
|
1600 dtn_endpoint_id_t session_eid; |
|
1601 b->session_eid().copyto(&session_eid); |
|
1602 |
|
1603 if (!xdr_u_int(&xdr_encode_, &session_flags) || |
|
1604 !xdr_dtn_endpoint_id_t(&xdr_encode_, &session_eid)) |
|
1605 { |
|
1606 log_err("internal error in xdr"); |
|
1607 return DTN_EXDR; |
|
1608 } |
|
1609 |
|
1610 log_info("session_update: " |
|
1611 "notification for session %s status %s", |
|
1612 b->session_eid().c_str(), Session::flag_str(b->session_flags())); |
|
1613 |
|
1614 BundleDaemon::post(new BundleDeliveredEvent(b, reg)); |
|
1615 |
|
1616 return DTN_SUCCESS; |
|
1617 } |
|
1618 |
|
1619 //---------------------------------------------------------------------- |
|
1620 int |
|
1621 APIClient::wait_for_notify(const char* operation, |
|
1622 dtn_timeval_t dtn_timeout, |
|
1623 APIRegistration** recv_ready_reg, |
|
1624 APIRegistration** session_ready_reg, |
|
1625 bool* sock_ready) |
|
1626 { |
|
1627 APIRegistration* reg; |
|
1628 |
|
1629 ASSERT(sock_ready != NULL); |
|
1630 if (recv_ready_reg) *recv_ready_reg = NULL; |
|
1631 if (session_ready_reg) *session_ready_reg = NULL; |
|
1632 |
|
1633 if (bindings_->empty()) { |
|
1634 log_err("wait_for_notify(%s): no bound registrations", operation); |
|
1635 return DTN_EINVAL; |
|
1636 } |
|
1637 |
|
1638 int timeout = (int)dtn_timeout; |
|
1639 if (timeout < -1) { |
|
1640 log_err("wait_for_notify(%s): " |
|
1641 "invalid timeout value %d", operation, timeout); |
|
1642 return DTN_EINVAL; |
|
1643 } |
|
1644 |
|
1645 // try to optimize by using a statically sized pollfds array, |
|
1646 // otherwise we need to malloc the array. |
|
1647 // |
|
1648 // XXX/demmer this would be cleaner by tweaking the |
|
1649 // StaticScratchBuffer class to be handle arrays of arbitrary |
|
1650 // sized structs |
|
1651 struct pollfd static_pollfds[64]; |
|
1652 struct pollfd* pollfds; |
|
1653 oasys::ScopeMalloc pollfd_malloc; |
|
1654 size_t npollfds = 1; |
|
1655 if (recv_ready_reg) npollfds += bindings_->size(); |
|
1656 if (session_ready_reg) npollfds += sessions_->size(); |
|
1657 |
|
1658 if (npollfds <= 64) { |
|
1659 pollfds = &static_pollfds[0]; |
|
1660 } else { |
|
1661 pollfds = (struct pollfd*)malloc(npollfds * sizeof(struct pollfd)); |
|
1662 pollfd_malloc = pollfds; |
|
1663 } |
|
1664 |
|
1665 struct pollfd* sock_poll = &pollfds[0]; |
|
1666 sock_poll->fd = TCPClient::fd_; |
|
1667 sock_poll->events = POLLIN | POLLERR; |
|
1668 sock_poll->revents = 0; |
|
1669 |
|
1670 // loop through all the registrations -- if one has bundles on its |
|
1671 // list, we don't need to poll, just return it immediately. |
|
1672 // otherwise we'll need to poll it |
|
1673 APIRegistrationList::iterator iter; |
|
1674 unsigned int i = 1; |
|
1675 if (recv_ready_reg) { |
|
1676 log_debug("wait_for_notify(%s): checking %zu bindings", |
|
1677 operation, bindings_->size()); |
|
1678 |
|
1679 for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) { |
|
1680 reg = *iter; |
|
1681 |
|
1682 if (! reg->bundle_list()->empty()) { |
|
1683 log_debug("wait_for_notify(%s): " |
|
1684 "immediately returning bundle for reg %d", |
|
1685 operation, reg->regid()); |
|
1686 *recv_ready_reg = reg; |
|
1687 return 0; |
|
1688 } |
|
1689 |
|
1690 pollfds[i].fd = reg->bundle_list()->notifier()->read_fd(); |
|
1691 pollfds[i].events = POLLIN; |
|
1692 pollfds[i].revents = 0; |
|
1693 ++i; |
|
1694 ASSERT(i <= npollfds); |
|
1695 } |
|
1696 } |
|
1697 |
|
1698 // ditto for sessions |
|
1699 if (session_ready_reg) { |
|
1700 log_debug("wait_for_notify(%s): checking %zu sessions", |
|
1701 operation, sessions_->size()); |
|
1702 |
|
1703 for (iter = sessions_->begin(); iter != sessions_->end(); ++iter) |
|
1704 { |
|
1705 reg = *iter; |
|
1706 ASSERT(reg->session_notify_list() != NULL); |
|
1707 if (! reg->session_notify_list()->empty()) { |
|
1708 log_debug("wait_for_notify(%s): " |
|
1709 "immediately returning notified reg %d", |
|
1710 operation, reg->regid()); |
|
1711 *session_ready_reg = reg; |
|
1712 return 0; |
|
1713 } |
|
1714 |
|
1715 pollfds[i].fd = reg->session_notify_list()->notifier()->read_fd(); |
|
1716 pollfds[i].events = POLLIN; |
|
1717 pollfds[i].revents = 0; |
|
1718 ++i; |
|
1719 ASSERT(i <= npollfds); |
|
1720 } |
|
1721 } |
|
1722 |
|
1723 if (timeout == 0) { |
|
1724 log_debug("wait_for_notify(%s): " |
|
1725 "no ready registrations and timeout=%d, returning immediately", |
|
1726 operation, timeout); |
|
1727 return DTN_ETIMEOUT; |
|
1728 } |
|
1729 |
|
1730 log_debug("wait_for_notify(%s): " |
|
1731 "blocking to get events from %zu sources (timeout %d)", |
|
1732 operation, npollfds, timeout); |
|
1733 int nready = oasys::IO::poll_multiple(&pollfds[0], npollfds, timeout, |
|
1734 NULL, logpath_); |
|
1735 |
|
1736 if (nready == oasys::IOTIMEOUT) { |
|
1737 log_debug("wait_for_notify(%s): timeout waiting for events", |
|
1738 operation); |
|
1739 return DTN_ETIMEOUT; |
|
1740 |
|
1741 } else if (nready <= 0) { |
|
1742 log_err("wait_for_notify(%s): unexpected error polling for events", |
|
1743 operation); |
|
1744 return DTN_EINTERNAL; |
|
1745 } |
|
1746 |
|
1747 // if there's data on the socket, immediately exit without |
|
1748 // checking the registrations |
|
1749 if (sock_poll->revents != 0) { |
|
1750 *sock_ready = true; |
|
1751 return 0; |
|
1752 } |
|
1753 |
|
1754 // otherwise, there should be data on one (or more) bundle lists, so |
|
1755 // scan the list to find the first one. |
|
1756 if (recv_ready_reg) { |
|
1757 for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) { |
|
1758 reg = *iter; |
|
1759 if (! reg->bundle_list()->empty()) { |
|
1760 *recv_ready_reg = reg; |
|
1761 break; |
|
1762 } |
|
1763 } |
|
1764 } |
|
1765 |
|
1766 if (session_ready_reg) { |
|
1767 for (iter = sessions_->begin(); iter != sessions_->end(); ++iter) |
|
1768 { |
|
1769 reg = *iter; |
|
1770 if (! reg->session_notify_list()->empty()) { |
|
1771 *session_ready_reg = reg; |
|
1772 break; |
|
1773 } |
|
1774 } |
|
1775 } |
|
1776 |
|
1777 if ((recv_ready_reg && *recv_ready_reg == NULL) && |
|
1778 (session_ready_reg && *session_ready_reg == NULL)) |
|
1779 { |
|
1780 log_err("wait_for_notify(%s): error -- no lists have any events", |
|
1781 operation); |
|
1782 return DTN_EINTERNAL; |
|
1783 } |
|
1784 |
|
1785 return 0; |
|
1786 } |
|
1787 |
|
1788 //---------------------------------------------------------------------- |
|
1789 int |
|
1790 APIClient::handle_unexpected_data(const char* operation) |
|
1791 { |
|
1792 log_debug("%s: api socket ready -- trying to read one byte", |
|
1793 operation); |
|
1794 char b; |
|
1795 if (read(&b, 1) != 0) { |
|
1796 log_err("%s: protocol error -- " |
|
1797 "data arrived or error while blocked in recv", |
|
1798 operation); |
|
1799 return DTN_ECOMM; |
|
1800 } |
|
1801 |
|
1802 log_info("IPC socket closed while blocked in read... " |
|
1803 "application must have exited"); |
|
1804 return -1; |
|
1805 } |
|
1806 |
|
1807 } // namespace dtn |