applib/python/test-swig-api.py
changeset 0 2b3e5ec03512
equal deleted inserted replaced
-1:000000000000 0:2b3e5ec03512
       
     1 #!/usr/bin/env python
       
     2 #
       
     3 #    Copyright 2007 Intel Corporation
       
     4 #
       
     5 #    Licensed under the Apache License, Version 2.0 (the "License");
       
     6 #    you may not use this file except in compliance with the License.
       
     7 #    You may obtain a copy of the License at
       
     8 #
       
     9 #        http://www.apache.org/licenses/LICENSE-2.0
       
    10 #
       
    11 #    Unless required by applicable law or agreed to in writing, software
       
    12 #    distributed under the License is distributed on an "AS IS" BASIS,
       
    13 #    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    14 #    See the License for the specific language governing permissions and
       
    15 #    limitations under the License.
       
    16 #
       
    17 
       
    18 #
       
    19 # SWIG exported dtn api example in perl
       
    20 #
       
    21 
       
    22 import sys
       
    23 from dtnapi import *;
       
    24 from select import select
       
    25 
       
    26 h = dtn_open();
       
    27 if (h == -1):
       
    28     print "error in dtn_open";
       
    29     sys.exit(1);
       
    30 
       
    31 print "handle is", h;
       
    32 
       
    33 print "creating a registration for a random eid string"
       
    34 regid = dtn_register(h, "dtn://foo/bar/baz", DTN_REG_DROP, 0, 0, "");
       
    35 print "created new registration -- id %d" % regid;
       
    36 
       
    37 print "unbinding registration "
       
    38 dtn_unbind(h, regid)
       
    39 
       
    40 src = dtn_build_local_eid(h, "src");
       
    41 dst = dtn_build_local_eid(h, "dst");
       
    42 
       
    43 print "src is", src, "dst is", dst;
       
    44 
       
    45 regid = dtn_find_registration(h, dst);
       
    46 if (regid != -1):
       
    47     print "found existing registration -- id ", regid, " calling bind...";
       
    48     dtn_bind(h, regid);
       
    49 else:
       
    50     regid = dtn_register(h, dst, DTN_REG_DROP, 10, 0, "");
       
    51     print "created new registration -- id regid";
       
    52 
       
    53 print "sending a bundle in memory...";
       
    54 id = dtn_send(h, regid, src, dst, "dtn:none", COS_NORMAL,
       
    55 	      0, 30, DTN_PAYLOAD_MEM, "test payload");
       
    56 
       
    57 print "bundle id: ", id;
       
    58 
       
    59 print "  source:", id.source;
       
    60 print "  creation_secs:",  id.creation_secs;
       
    61 print "  creation_seqno:", id.creation_seqno;
       
    62 
       
    63 del id;
       
    64 
       
    65 print "receiving a bundle in memory...";
       
    66 bundle = dtn_recv(h, DTN_PAYLOAD_MEM, 10000);
       
    67 
       
    68 print "bundle:";
       
    69 print "  source:", bundle.source;
       
    70 print "  dest:", bundle.dest;
       
    71 print "  replyto:", bundle.replyto;
       
    72 print "  priority:", bundle.priority;
       
    73 print "  dopts:", bundle.dopts;
       
    74 print "  expiration:", bundle.expiration;
       
    75 print "  creation_secs:", bundle.creation_secs;
       
    76 print "  creation_seqno:", bundle.creation_seqno;
       
    77 print "  payload:", bundle.payload;
       
    78 
       
    79 del bundle;
       
    80 
       
    81 print "dtn_recv timeout:";
       
    82 bundle = dtn_recv(h, DTN_PAYLOAD_MEM, 0);
       
    83 if (bundle or (dtn_errno(h) != DTN_ETIMEOUT)):
       
    84     print "  bundle is bundle, errno is", dtn_errno(h);
       
    85 else:
       
    86     print "  ", dtn_strerror(dtn_errno(h));
       
    87 
       
    88 print "testing expiration status report"
       
    89 id = dtn_send(h, regid, src, "dtn://somewhere_over_the_rainbow",
       
    90               dst, COS_NORMAL,
       
    91 	      DOPTS_DELETE_RCPT, 2, DTN_PAYLOAD_MEM, "test payload");
       
    92 
       
    93 bundle = dtn_recv(h, DTN_PAYLOAD_MEM, 10000);
       
    94 if not bundle:
       
    95     print "error: expected to receive deletion status report"
       
    96     sys.exit(1)
       
    97     
       
    98 print dtn_status_report_reason_to_str(bundle.status_report.reason)
       
    99 
       
   100 print "testing poll"
       
   101 fd = dtn_poll_fd(h)
       
   102 dtn_begin_poll(h, 5000)
       
   103 rd, wr, err = select([fd], [], [], 2)
       
   104 print "should be a poll timeout", rd, wr, err
       
   105 
       
   106 rd, wr, err = select([fd], [], [], 10)
       
   107 print "should be read ready", rd, wr, err
       
   108 
       
   109 print "should timeout twice:"
       
   110 bundle = dtn_recv(h, DTN_PAYLOAD_MEM, 0);
       
   111 if (bundle or (dtn_errno(h) != DTN_ETIMEOUT)):
       
   112     print "  bundle is bundle, errno is", dtn_errno(h);
       
   113 else:
       
   114     print "  ", dtn_strerror(dtn_errno(h));
       
   115 
       
   116 bundle = dtn_recv(h, DTN_PAYLOAD_MEM, 0);
       
   117 if (bundle or (dtn_errno(h) != DTN_ETIMEOUT)):
       
   118     print "  bundle is bundle, errno is", dtn_errno(h);
       
   119 else:
       
   120     print "  ", dtn_strerror(dtn_errno(h));
       
   121 
       
   122 dtn_cancel_poll(h)
       
   123 
       
   124 dtn_close(h);
       
   125