Package proton
[frames] | no frames]

Source Code for Package proton

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  """ 
  21  The proton module defines a suite of APIs that implement the AMQP 1.0 
  22  protocol. 
  23   
  24  The proton APIs consist of the following classes: 
  25   
  26   - L{Message}   -- A class for creating and/or accessing AMQP message content. 
  27   - L{Data}      -- A class for creating and/or accessing arbitrary AMQP encoded 
  28                    data. 
  29   
  30  """ 
  31  from __future__ import absolute_import 
  32   
  33  from cproton import * 
  34  from .wrapper import Wrapper 
  35  from proton import _compat 
  36   
  37  import logging, weakref, socket, sys, threading 
  38   
  39  try: 
  40    handler = logging.NullHandler() 
  41  except AttributeError: 
42 - class NullHandler(logging.Handler):
43 - def handle(self, record):
44 pass
45
46 - def emit(self, record):
47 pass
48
49 - def createLock(self):
50 self.lock = None
51 52 handler = NullHandler() 53 54 log = logging.getLogger("proton") 55 log.addHandler(handler) 56 57 try: 58 import uuid
59 60 - def generate_uuid():
61 return uuid.uuid4()
62 63 except ImportError: 64 """ 65 No 'native' UUID support. Provide a very basic UUID type that is a compatible subset of the uuid type provided by more modern python releases. 66 """ 67 import struct
68 - class uuid:
69 - class UUID:
70 - def __init__(self, hex=None, bytes=None):
71 if [hex, bytes].count(None) != 1: 72 raise TypeError("need one of hex or bytes") 73 if bytes is not None: 74 self.bytes = bytes 75 elif hex is not None: 76 fields=hex.split("-") 77 fields[4:5] = [fields[4][:4], fields[4][4:]] 78 self.bytes = struct.pack("!LHHHHL", *[int(x,16) for x in fields])
79
80 - def __cmp__(self, other):
81 if isinstance(other, uuid.UUID): 82 return cmp(self.bytes, other.bytes) 83 else: 84 return -1
85
86 - def __str__(self):
87 return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes)
88
89 - def __repr__(self):
90 return "UUID(%r)" % str(self)
91
92 - def __hash__(self):
93 return self.bytes.__hash__()
94 95 import os, random, time 96 rand = random.Random() 97 rand.seed((os.getpid(), time.time(), socket.gethostname()))
98 - def random_uuid():
99 data = [rand.randint(0, 255) for i in xrange(16)] 100 101 # From RFC4122, the version bits are set to 0100 102 data[6] &= 0x0F 103 data[6] |= 0x40 104 105 # From RFC4122, the top two bits of byte 8 get set to 01 106 data[8] &= 0x3F 107 data[8] |= 0x80 108 return "".join(map(chr, data))
109
110 - def uuid4():
111 return uuid.UUID(bytes=random_uuid())
112
113 - def generate_uuid():
114 return uuid4()
115 116 # 117 # Hacks to provide Python2 <---> Python3 compatibility 118 # 119 try: 120 bytes() 121 except NameError: 122 bytes = str 123 try: 124 long() 125 except NameError: 126 long = int 127 try: 128 unicode() 129 except NameError: 130 unicode = str 131 132 133 VERSION_MAJOR = PN_VERSION_MAJOR 134 VERSION_MINOR = PN_VERSION_MINOR 135 VERSION_POINT = PN_VERSION_POINT 136 VERSION = (VERSION_MAJOR, VERSION_MINOR, VERSION_POINT) 137 API_LANGUAGE = "C" 138 IMPLEMENTATION_LANGUAGE = "C"
139 140 -class Constant(object):
141
142 - def __init__(self, name):
143 self.name = name
144
145 - def __repr__(self):
146 return self.name
147
148 -class ProtonException(Exception):
149 """ 150 The root of the proton exception hierarchy. All proton exception 151 classes derive from this exception. 152 """ 153 pass
154
155 -class Timeout(ProtonException):
156 """ 157 A timeout exception indicates that a blocking operation has timed 158 out. 159 """ 160 pass
161
162 -class Interrupt(ProtonException):
163 """ 164 An interrupt exception indicates that a blocking operation was interrupted. 165 """ 166 pass
167
168 -class MessageException(ProtonException):
169 """ 170 The MessageException class is the root of the message exception 171 hierarchy. All exceptions generated by the Message class derive from 172 this exception. 173 """ 174 pass
175 176 EXCEPTIONS = { 177 PN_TIMEOUT: Timeout, 178 PN_INTR: Interrupt 179 } 180 181 PENDING = Constant("PENDING") 182 ACCEPTED = Constant("ACCEPTED") 183 REJECTED = Constant("REJECTED") 184 RELEASED = Constant("RELEASED") 185 MODIFIED = Constant("MODIFIED") 186 ABORTED = Constant("ABORTED") 187 SETTLED = Constant("SETTLED") 188 189 STATUSES = { 190 PN_STATUS_ABORTED: ABORTED, 191 PN_STATUS_ACCEPTED: ACCEPTED, 192 PN_STATUS_REJECTED: REJECTED, 193 PN_STATUS_RELEASED: RELEASED, 194 PN_STATUS_MODIFIED: MODIFIED, 195 PN_STATUS_PENDING: PENDING, 196 PN_STATUS_SETTLED: SETTLED, 197 PN_STATUS_UNKNOWN: None 198 }
199 200 -class Message(object):
201 """The L{Message} class is a mutable holder of message content. 202 203 @ivar instructions: delivery instructions for the message 204 @type instructions: dict 205 @ivar annotations: infrastructure defined message annotations 206 @type annotations: dict 207 @ivar properties: application defined message properties 208 @type properties: dict 209 @ivar body: message body 210 @type body: bytes | unicode | dict | list | int | long | float | UUID 211 """ 212 213 DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY 214
215 - def __init__(self, body=None, **kwargs):
216 """ 217 @param kwargs: Message property name/value pairs to initialise the Message 218 """ 219 self._msg = pn_message() 220 self._id = Data(pn_message_id(self._msg)) 221 self._correlation_id = Data(pn_message_correlation_id(self._msg)) 222 self.instructions = None 223 self.annotations = None 224 self.properties = None 225 self.body = body 226 for k,v in _compat.iteritems(kwargs): 227 getattr(self, k) # Raise exception if it's not a valid attribute. 228 setattr(self, k, v)
229
230 - def __del__(self):
231 if hasattr(self, "_msg"): 232 pn_message_free(self._msg) 233 del self._msg
234
235 - def _check(self, err):
236 if err < 0: 237 exc = EXCEPTIONS.get(err, MessageException) 238 raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg)))) 239 else: 240 return err
241
242 - def _check_property_keys(self):
243 for k in self.properties.keys(): 244 if not isinstance(k, (bytes, str, unicode)): 245 raise MessageException('Application property key is not unicode string: key=%s %s' % (str(k), type(k))) 246 if isinstance(k, bytes): 247 self.properties[_compat.bin2str(k)] = self.properties.pop(k)
248
249 - def _pre_encode(self):
250 inst = Data(pn_message_instructions(self._msg)) 251 ann = Data(pn_message_annotations(self._msg)) 252 props = Data(pn_message_properties(self._msg)) 253 body = Data(pn_message_body(self._msg)) 254 255 inst.clear() 256 if self.instructions is not None: 257 inst.put_object(self.instructions) 258 ann.clear() 259 if self.annotations is not None: 260 ann.put_object(self.annotations) 261 props.clear() 262 if self.properties is not None: 263 self._check_property_keys() 264 props.put_object(self.properties) 265 body.clear() 266 if self.body is not None: 267 body.put_object(self.body)
268
269 - def _post_decode(self):
270 inst = Data(pn_message_instructions(self._msg)) 271 ann = Data(pn_message_annotations(self._msg)) 272 props = Data(pn_message_properties(self._msg)) 273 body = Data(pn_message_body(self._msg)) 274 275 if inst.next(): 276 self.instructions = inst.get_object() 277 else: 278 self.instructions = None 279 if ann.next(): 280 self.annotations = ann.get_object() 281 else: 282 self.annotations = None 283 if props.next(): 284 self.properties = props.get_object() 285 else: 286 self.properties = None 287 if body.next(): 288 self.body = body.get_object() 289 else: 290 self.body = None
291
292 - def clear(self):
293 """ 294 Clears the contents of the L{Message}. All fields will be reset to 295 their default values. 296 """ 297 pn_message_clear(self._msg) 298 self.instructions = None 299 self.annotations = None 300 self.properties = None 301 self.body = None
302
303 - def _is_inferred(self):
304 return pn_message_is_inferred(self._msg)
305
306 - def _set_inferred(self, value):
307 self._check(pn_message_set_inferred(self._msg, bool(value)))
308 309 inferred = property(_is_inferred, _set_inferred, doc=""" 310 The inferred flag for a message indicates how the message content 311 is encoded into AMQP sections. If inferred is true then binary and 312 list values in the body of the message will be encoded as AMQP DATA 313 and AMQP SEQUENCE sections, respectively. If inferred is false, 314 then all values in the body of the message will be encoded as AMQP 315 VALUE sections regardless of their type. 316 """) 317
318 - def _is_durable(self):
319 return pn_message_is_durable(self._msg)
320
321 - def _set_durable(self, value):
322 self._check(pn_message_set_durable(self._msg, bool(value)))
323 324 durable = property(_is_durable, _set_durable, 325 doc=""" 326 The durable property indicates that the message should be held durably 327 by any intermediaries taking responsibility for the message. 328 """) 329
330 - def _get_priority(self):
331 return pn_message_get_priority(self._msg)
332
333 - def _set_priority(self, value):
334 self._check(pn_message_set_priority(self._msg, value))
335 336 priority = property(_get_priority, _set_priority, 337 doc=""" 338 The priority of the message. 339 """) 340
341 - def _get_ttl(self):
342 return millis2secs(pn_message_get_ttl(self._msg))
343
344 - def _set_ttl(self, value):
345 self._check(pn_message_set_ttl(self._msg, secs2millis(value)))
346 347 ttl = property(_get_ttl, _set_ttl, 348 doc=""" 349 The time to live of the message measured in seconds. Expired messages 350 may be dropped. 351 """) 352
353 - def _is_first_acquirer(self):
354 return pn_message_is_first_acquirer(self._msg)
355
356 - def _set_first_acquirer(self, value):
357 self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
358 359 first_acquirer = property(_is_first_acquirer, _set_first_acquirer, 360 doc=""" 361 True iff the recipient is the first to acquire the message. 362 """) 363
364 - def _get_delivery_count(self):
365 return pn_message_get_delivery_count(self._msg)
366
367 - def _set_delivery_count(self, value):
368 self._check(pn_message_set_delivery_count(self._msg, value))
369 370 delivery_count = property(_get_delivery_count, _set_delivery_count, 371 doc=""" 372 The number of delivery attempts made for this message. 373 """) 374 375
376 - def _get_id(self):
377 return self._id.get_object()
378 - def _set_id(self, value):
379 if type(value) in _compat.INT_TYPES: 380 value = ulong(value) 381 self._id.rewind() 382 self._id.put_object(value)
383 id = property(_get_id, _set_id, 384 doc=""" 385 The id of the message. 386 """) 387
388 - def _get_user_id(self):
389 return pn_message_get_user_id(self._msg)
390
391 - def _set_user_id(self, value):
392 self._check(pn_message_set_user_id(self._msg, value))
393 394 user_id = property(_get_user_id, _set_user_id, 395 doc=""" 396 The user id of the message creator. 397 """) 398
399 - def _get_address(self):
400 return utf82unicode(pn_message_get_address(self._msg))
401
402 - def _set_address(self, value):
403 self._check(pn_message_set_address(self._msg, unicode2utf8(value)))
404 405 address = property(_get_address, _set_address, 406 doc=""" 407 The address of the message. 408 """) 409
410 - def _get_subject(self):
411 return utf82unicode(pn_message_get_subject(self._msg))
412
413 - def _set_subject(self, value):
414 self._check(pn_message_set_subject(self._msg, unicode2utf8(value)))
415 416 subject = property(_get_subject, _set_subject, 417 doc=""" 418 The subject of the message. 419 """) 420
421 - def _get_reply_to(self):
422 return utf82unicode(pn_message_get_reply_to(self._msg))
423
424 - def _set_reply_to(self, value):
425 self._check(pn_message_set_reply_to(self._msg, unicode2utf8(value)))
426 427 reply_to = property(_get_reply_to, _set_reply_to, 428 doc=""" 429 The reply-to address for the message. 430 """) 431
432 - def _get_correlation_id(self):
433 return self._correlation_id.get_object()
434 - def _set_correlation_id(self, value):
435 if type(value) in _compat.INT_TYPES: 436 value = ulong(value) 437 self._correlation_id.rewind() 438 self._correlation_id.put_object(value)
439 440 correlation_id = property(_get_correlation_id, _set_correlation_id, 441 doc=""" 442 The correlation-id for the message. 443 """) 444
445 - def _get_content_type(self):
446 return symbol(utf82unicode(pn_message_get_content_type(self._msg)))
447
448 - def _set_content_type(self, value):
449 self._check(pn_message_set_content_type(self._msg, unicode2utf8(value)))
450 451 content_type = property(_get_content_type, _set_content_type, 452 doc=""" 453 The content-type of the message. 454 """) 455
456 - def _get_content_encoding(self):
457 return symbol(utf82unicode(pn_message_get_content_encoding(self._msg)))
458
459 - def _set_content_encoding(self, value):
460 self._check(pn_message_set_content_encoding(self._msg, unicode2utf8(value)))
461 462 content_encoding = property(_get_content_encoding, _set_content_encoding, 463 doc=""" 464 The content-encoding of the message. 465 """) 466
467 - def _get_expiry_time(self):
468 return millis2secs(pn_message_get_expiry_time(self._msg))
469
470 - def _set_expiry_time(self, value):
471 self._check(pn_message_set_expiry_time(self._msg, secs2millis(value)))
472 473 expiry_time = property(_get_expiry_time, _set_expiry_time, 474 doc=""" 475 The expiry time of the message. 476 """) 477
478 - def _get_creation_time(self):
479 return millis2secs(pn_message_get_creation_time(self._msg))
480
481 - def _set_creation_time(self, value):
482 self._check(pn_message_set_creation_time(self._msg, secs2millis(value)))
483 484 creation_time = property(_get_creation_time, _set_creation_time, 485 doc=""" 486 The creation time of the message. 487 """) 488
489 - def _get_group_id(self):
490 return utf82unicode(pn_message_get_group_id(self._msg))
491
492 - def _set_group_id(self, value):
493 self._check(pn_message_set_group_id(self._msg, unicode2utf8(value)))
494 495 group_id = property(_get_group_id, _set_group_id, 496 doc=""" 497 The group id of the message. 498 """) 499
500 - def _get_group_sequence(self):
501 return pn_message_get_group_sequence(self._msg)
502
503 - def _set_group_sequence(self, value):
504 self._check(pn_message_set_group_sequence(self._msg, value))
505 506 group_sequence = property(_get_group_sequence, _set_group_sequence, 507 doc=""" 508 The sequence of the message within its group. 509 """) 510
511 - def _get_reply_to_group_id(self):
512 return utf82unicode(pn_message_get_reply_to_group_id(self._msg))
513
514 - def _set_reply_to_group_id(self, value):
515 self._check(pn_message_set_reply_to_group_id(self._msg, unicode2utf8(value)))
516 517 reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id, 518 doc=""" 519 The group-id for any replies. 520 """) 521
522 - def encode(self):
523 self._pre_encode() 524 sz = 16 525 while True: 526 err, data = pn_message_encode(self._msg, sz) 527 if err == PN_OVERFLOW: 528 sz *= 2 529 continue 530 else: 531 self._check(err) 532 return data
533
534 - def decode(self, data):
535 self._check(pn_message_decode(self._msg, data)) 536 self._post_decode()
537
538 - def send(self, sender, tag=None):
539 dlv = sender.delivery(tag or sender.delivery_tag()) 540 encoded = self.encode() 541 sender.stream(encoded) 542 sender.advance() 543 if sender.snd_settle_mode == Link.SND_SETTLED: 544 dlv.settle() 545 return dlv
546
547 - def recv(self, link):
548 """ 549 Receives and decodes the message content for the current delivery 550 from the link. Upon success it will return the current delivery 551 for the link. If there is no current delivery, or if the current 552 delivery is incomplete, or if the link is not a receiver, it will 553 return None. 554 555 @type link: Link 556 @param link: the link to receive a message from 557 @return the delivery associated with the decoded message (or None) 558 559 """ 560 if link.is_sender: return None 561 dlv = link.current 562 if not dlv or dlv.partial: return None 563 dlv.encoded = link.recv(dlv.pending) 564 link.advance() 565 # the sender has already forgotten about the delivery, so we might 566 # as well too 567 if link.remote_snd_settle_mode == Link.SND_SETTLED: 568 dlv.settle() 569 self.decode(dlv.encoded) 570 return dlv
571
572 - def __repr2__(self):
573 props = [] 574 for attr in ("inferred", "address", "reply_to", "durable", "ttl", 575 "priority", "first_acquirer", "delivery_count", "id", 576 "correlation_id", "user_id", "group_id", "group_sequence", 577 "reply_to_group_id", "instructions", "annotations", 578 "properties", "body"): 579 value = getattr(self, attr) 580 if value: props.append("%s=%r" % (attr, value)) 581 return "Message(%s)" % ", ".join(props)
582
583 - def __repr__(self):
584 tmp = pn_string(None) 585 err = pn_inspect(self._msg, tmp) 586 result = pn_string_get(tmp) 587 pn_free(tmp) 588 self._check(err) 589 return result
590 591 _DEFAULT = object()
592 593 -class Selectable(Wrapper):
594 595 @staticmethod
596 - def wrap(impl):
597 if impl is None: 598 return None 599 else: 600 return Selectable(impl)
601
602 - def __init__(self, impl):
603 Wrapper.__init__(self, impl, pn_selectable_attachments)
604
605 - def _init(self):
606 pass
607
608 - def fileno(self, fd = _DEFAULT):
609 if fd is _DEFAULT: 610 return pn_selectable_get_fd(self._impl) 611 elif fd is None: 612 pn_selectable_set_fd(self._impl, PN_INVALID_SOCKET) 613 else: 614 pn_selectable_set_fd(self._impl, fd)
615
616 - def _is_reading(self):
617 return pn_selectable_is_reading(self._impl)
618
619 - def _set_reading(self, val):
620 pn_selectable_set_reading(self._impl, bool(val))
621 622 reading = property(_is_reading, _set_reading) 623
624 - def _is_writing(self):
625 return pn_selectable_is_writing(self._impl)
626
627 - def _set_writing(self, val):
628 pn_selectable_set_writing(self._impl, bool(val))
629 630 writing = property(_is_writing, _set_writing) 631
632 - def _get_deadline(self):
633 tstamp = pn_selectable_get_deadline(self._impl) 634 if tstamp: 635 return millis2secs(tstamp) 636 else: 637 return None
638
639 - def _set_deadline(self, deadline):
640 pn_selectable_set_deadline(self._impl, secs2millis(deadline))
641 642 deadline = property(_get_deadline, _set_deadline) 643
644 - def readable(self):
645 pn_selectable_readable(self._impl)
646
647 - def writable(self):
648 pn_selectable_writable(self._impl)
649
650 - def expired(self):
651 pn_selectable_expired(self._impl)
652
653 - def _is_registered(self):
654 return pn_selectable_is_registered(self._impl)
655
656 - def _set_registered(self, registered):
657 pn_selectable_set_registered(self._impl, registered)
658 659 registered = property(_is_registered, _set_registered, 660 doc=""" 661 The registered property may be get/set by an I/O polling system to 662 indicate whether the fd has been registered or not. 663 """) 664 665 @property
666 - def is_terminal(self):
667 return pn_selectable_is_terminal(self._impl)
668
669 - def terminate(self):
670 pn_selectable_terminate(self._impl)
671
672 - def release(self):
673 pn_selectable_release(self._impl)
674
675 -class DataException(ProtonException):
676 """ 677 The DataException class is the root of the Data exception hierarchy. 678 All exceptions raised by the Data class extend this exception. 679 """ 680 pass
681
682 -class UnmappedType:
683
684 - def __init__(self, msg):
685 self.msg = msg
686
687 - def __repr__(self):
688 return "UnmappedType(%s)" % self.msg
689
690 -class ulong(long):
691
692 - def __repr__(self):
693 return "ulong(%s)" % long.__repr__(self)
694
695 -class timestamp(long):
696
697 - def __repr__(self):
698 return "timestamp(%s)" % long.__repr__(self)
699
700 -class symbol(unicode):
701
702 - def __repr__(self):
703 return "symbol(%s)" % unicode.__repr__(self)
704
705 -class char(unicode):
706
707 - def __repr__(self):
708 return "char(%s)" % unicode.__repr__(self)
709
710 -class byte(int):
711
712 - def __repr__(self):
713 return "byte(%s)" % int.__repr__(self)
714
715 -class short(int):
716
717 - def __repr__(self):
718 return "short(%s)" % int.__repr__(self)
719
720 -class int32(int):
721
722 - def __repr__(self):
723 return "int32(%s)" % int.__repr__(self)
724
725 -class ubyte(int):
726
727 - def __repr__(self):
728 return "ubyte(%s)" % int.__repr__(self)
729
730 -class ushort(int):
731
732 - def __repr__(self):
733 return "ushort(%s)" % int.__repr__(self)
734
735 -class uint(long):
736
737 - def __repr__(self):
738 return "uint(%s)" % long.__repr__(self)
739
740 -class float32(float):
741
742 - def __repr__(self):
743 return "float32(%s)" % float.__repr__(self)
744
745 -class decimal32(int):
746
747 - def __repr__(self):
748 return "decimal32(%s)" % int.__repr__(self)
749
750 -class decimal64(long):
751
752 - def __repr__(self):
753 return "decimal64(%s)" % long.__repr__(self)
754
755 -class decimal128(bytes):
756
757 - def __repr__(self):
758 return "decimal128(%s)" % bytes.__repr__(self)
759
760 -class Described(object):
761
762 - def __init__(self, descriptor, value):
763 self.descriptor = descriptor 764 self.value = value
765
766 - def __repr__(self):
767 return "Described(%r, %r)" % (self.descriptor, self.value)
768
769 - def __eq__(self, o):
770 if isinstance(o, Described): 771 return self.descriptor == o.descriptor and self.value == o.value 772 else: 773 return False
774 775 UNDESCRIBED = Constant("UNDESCRIBED")
776 777 -class Array(object):
778
779 - def __init__(self, descriptor, type, *elements):
780 self.descriptor = descriptor 781 self.type = type 782 self.elements = elements
783
784 - def __iter__(self):
785 return iter(self.elements)
786
787 - def __repr__(self):
788 if self.elements: 789 els = ", %s" % (", ".join(map(repr, self.elements))) 790 else: 791 els = "" 792 return "Array(%r, %r%s)" % (self.descriptor, self.type, els)
793
794 - def __eq__(self, o):
795 if isinstance(o, Array): 796 return self.descriptor == o.descriptor and \ 797 self.type == o.type and self.elements == o.elements 798 else: 799 return False
800
801 -class Data:
802 """ 803 The L{Data} class provides an interface for decoding, extracting, 804 creating, and encoding arbitrary AMQP data. A L{Data} object 805 contains a tree of AMQP values. Leaf nodes in this tree correspond 806 to scalars in the AMQP type system such as L{ints<INT>} or 807 L{strings<STRING>}. Non-leaf nodes in this tree correspond to 808 compound values in the AMQP type system such as L{lists<LIST>}, 809 L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}. 810 The root node of the tree is the L{Data} object itself and can have 811 an arbitrary number of children. 812 813 A L{Data} object maintains the notion of the current sibling node 814 and a current parent node. Siblings are ordered within their parent. 815 Values are accessed and/or added by using the L{next}, L{prev}, 816 L{enter}, and L{exit} methods to navigate to the desired location in 817 the tree and using the supplied variety of put_*/get_* methods to 818 access or add a value of the desired type. 819 820 The put_* methods will always add a value I{after} the current node 821 in the tree. If the current node has a next sibling the put_* method 822 will overwrite the value on this node. If there is no current node 823 or the current node has no next sibling then one will be added. The 824 put_* methods always set the added/modified node to the current 825 node. The get_* methods read the value of the current node and do 826 not change which node is current. 827 828 The following types of scalar values are supported: 829 830 - L{NULL} 831 - L{BOOL} 832 - L{UBYTE} 833 - L{USHORT} 834 - L{SHORT} 835 - L{UINT} 836 - L{INT} 837 - L{ULONG} 838 - L{LONG} 839 - L{FLOAT} 840 - L{DOUBLE} 841 - L{BINARY} 842 - L{STRING} 843 - L{SYMBOL} 844 845 The following types of compound values are supported: 846 847 - L{DESCRIBED} 848 - L{ARRAY} 849 - L{LIST} 850 - L{MAP} 851 """ 852 853 NULL = PN_NULL; "A null value." 854 BOOL = PN_BOOL; "A boolean value." 855 UBYTE = PN_UBYTE; "An unsigned byte value." 856 BYTE = PN_BYTE; "A signed byte value." 857 USHORT = PN_USHORT; "An unsigned short value." 858 SHORT = PN_SHORT; "A short value." 859 UINT = PN_UINT; "An unsigned int value." 860 INT = PN_INT; "A signed int value." 861 CHAR = PN_CHAR; "A character value." 862 ULONG = PN_ULONG; "An unsigned long value." 863 LONG = PN_LONG; "A signed long value." 864 TIMESTAMP = PN_TIMESTAMP; "A timestamp value." 865 FLOAT = PN_FLOAT; "A float value." 866 DOUBLE = PN_DOUBLE; "A double value." 867 DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value." 868 DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value." 869 DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value." 870 UUID = PN_UUID; "A UUID value." 871 BINARY = PN_BINARY; "A binary string." 872 STRING = PN_STRING; "A unicode string." 873 SYMBOL = PN_SYMBOL; "A symbolic string." 874 DESCRIBED = PN_DESCRIBED; "A described value." 875 ARRAY = PN_ARRAY; "An array value." 876 LIST = PN_LIST; "A list value." 877 MAP = PN_MAP; "A map value." 878 879 type_names = { 880 NULL: "null", 881 BOOL: "bool", 882 BYTE: "byte", 883 UBYTE: "ubyte", 884 SHORT: "short", 885 USHORT: "ushort", 886 INT: "int", 887 UINT: "uint", 888 CHAR: "char", 889 LONG: "long", 890 ULONG: "ulong", 891 TIMESTAMP: "timestamp", 892 FLOAT: "float", 893 DOUBLE: "double", 894 DECIMAL32: "decimal32", 895 DECIMAL64: "decimal64", 896 DECIMAL128: "decimal128", 897 UUID: "uuid", 898 BINARY: "binary", 899 STRING: "string", 900 SYMBOL: "symbol", 901 DESCRIBED: "described", 902 ARRAY: "array", 903 LIST: "list", 904 MAP: "map" 905 } 906 907 @classmethod
908 - def type_name(type): return Data.type_names[type]
909
910 - def __init__(self, capacity=16):
911 if type(capacity) in _compat.INT_TYPES: 912 self._data = pn_data(capacity) 913 self._free = True 914 else: 915 self._data = capacity 916 self._free = False
917
918 - def __del__(self):
919 if self._free and hasattr(self, "_data"): 920 pn_data_free(self._data) 921 del self._data
922
923 - def _check(self, err):
924 if err < 0: 925 exc = EXCEPTIONS.get(err, DataException) 926 raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data)))) 927 else: 928 return err
929
930 - def clear(self):
931 """ 932 Clears the data object. 933 """ 934 pn_data_clear(self._data)
935
936 - def rewind(self):
937 """ 938 Clears current node and sets the parent to the root node. Clearing the 939 current node sets it _before_ the first node, calling next() will advance to 940 the first node. 941 """ 942 assert self._data is not None 943 pn_data_rewind(self._data)
944
945 - def next(self):
946 """ 947 Advances the current node to its next sibling and returns its 948 type. If there is no next sibling the current node remains 949 unchanged and None is returned. 950 """ 951 found = pn_data_next(self._data) 952 if found: 953 return self.type() 954 else: 955 return None
956
957 - def prev(self):
958 """ 959 Advances the current node to its previous sibling and returns its 960 type. If there is no previous sibling the current node remains 961 unchanged and None is returned. 962 """ 963 found = pn_data_prev(self._data) 964 if found: 965 return self.type() 966 else: 967 return None
968
969 - def enter(self):
970 """ 971 Sets the parent node to the current node and clears the current node. 972 Clearing the current node sets it _before_ the first child, 973 call next() advances to the first child. 974 """ 975 return pn_data_enter(self._data)
976
977 - def exit(self):
978 """ 979 Sets the current node to the parent node and the parent node to 980 its own parent. 981 """ 982 return pn_data_exit(self._data)
983
984 - def lookup(self, name):
985 return pn_data_lookup(self._data, name)
986
987 - def narrow(self):
988 pn_data_narrow(self._data)
989
990 - def widen(self):
991 pn_data_widen(self._data)
992
993 - def type(self):
994 """ 995 Returns the type of the current node. 996 """ 997 dtype = pn_data_type(self._data) 998 if dtype == -1: 999 return None 1000 else: 1001 return dtype
1002
1003 - def encoded_size(self):
1004 """ 1005 Returns the size in bytes needed to encode the data in AMQP format. 1006 """ 1007 return pn_data_encoded_size(self._data)
1008
1009 - def encode(self):
1010 """ 1011 Returns a representation of the data encoded in AMQP format. 1012 """ 1013 size = 1024 1014 while True: 1015 cd, enc = pn_data_encode(self._data, size) 1016 if cd == PN_OVERFLOW: 1017 size *= 2 1018 elif cd >= 0: 1019 return enc 1020 else: 1021 self._check(cd)
1022
1023 - def decode(self, encoded):
1024 """ 1025 Decodes the first value from supplied AMQP data and returns the 1026 number of bytes consumed. 1027 1028 @type encoded: binary 1029 @param encoded: AMQP encoded binary data 1030 """ 1031 return self._check(pn_data_decode(self._data, encoded))
1032
1033 - def put_list(self):
1034 """ 1035 Puts a list value. Elements may be filled by entering the list 1036 node and putting element values. 1037 1038 >>> data = Data() 1039 >>> data.put_list() 1040 >>> data.enter() 1041 >>> data.put_int(1) 1042 >>> data.put_int(2) 1043 >>> data.put_int(3) 1044 >>> data.exit() 1045 """ 1046 self._check(pn_data_put_list(self._data))
1047
1048 - def put_map(self):
1049 """ 1050 Puts a map value. Elements may be filled by entering the map node 1051 and putting alternating key value pairs. 1052 1053 >>> data = Data() 1054 >>> data.put_map() 1055 >>> data.enter() 1056 >>> data.put_string("key") 1057 >>> data.put_string("value") 1058 >>> data.exit() 1059 """ 1060 self._check(pn_data_put_map(self._data))
1061
1062 - def put_array(self, described, element_type):
1063 """ 1064 Puts an array value. Elements may be filled by entering the array 1065 node and putting the element values. The values must all be of the 1066 specified array element type. If an array is described then the 1067 first child value of the array is the descriptor and may be of any 1068 type. 1069 1070 >>> data = Data() 1071 >>> 1072 >>> data.put_array(False, Data.INT) 1073 >>> data.enter() 1074 >>> data.put_int(1) 1075 >>> data.put_int(2) 1076 >>> data.put_int(3) 1077 >>> data.exit() 1078 >>> 1079 >>> data.put_array(True, Data.DOUBLE) 1080 >>> data.enter() 1081 >>> data.put_symbol("array-descriptor") 1082 >>> data.put_double(1.1) 1083 >>> data.put_double(1.2) 1084 >>> data.put_double(1.3) 1085 >>> data.exit() 1086 1087 @type described: bool 1088 @param described: specifies whether the array is described 1089 @type element_type: int 1090 @param element_type: the type of the array elements 1091 """ 1092 self._check(pn_data_put_array(self._data, described, element_type))
1093
1094 - def put_described(self):
1095 """ 1096 Puts a described value. A described node has two children, the 1097 descriptor and the value. These are specified by entering the node 1098 and putting the desired values. 1099 1100 >>> data = Data() 1101 >>> data.put_described() 1102 >>> data.enter() 1103 >>> data.put_symbol("value-descriptor") 1104 >>> data.put_string("the value") 1105 >>> data.exit() 1106 """ 1107 self._check(pn_data_put_described(self._data))
1108
1109 - def put_null(self):
1110 """ 1111 Puts a null value. 1112 """ 1113 self._check(pn_data_put_null(self._data))
1114
1115 - def put_bool(self, b):
1116 """ 1117 Puts a boolean value. 1118 1119 @param b: a boolean value 1120 """ 1121 self._check(pn_data_put_bool(self._data, b))
1122
1123 - def put_ubyte(self, ub):
1124 """ 1125 Puts an unsigned byte value. 1126 1127 @param ub: an integral value 1128 """ 1129 self._check(pn_data_put_ubyte(self._data, ub))
1130
1131 - def put_byte(self, b):
1132 """ 1133 Puts a signed byte value. 1134 1135 @param b: an integral value 1136 """ 1137 self._check(pn_data_put_byte(self._data, b))
1138
1139 - def put_ushort(self, us):
1140 """ 1141 Puts an unsigned short value. 1142 1143 @param us: an integral value. 1144 """ 1145 self._check(pn_data_put_ushort(self._data, us))
1146
1147 - def put_short(self, s):
1148 """ 1149 Puts a signed short value. 1150 1151 @param s: an integral value 1152 """ 1153 self._check(pn_data_put_short(self._data, s))
1154
1155 - def put_uint(self, ui):
1156 """ 1157 Puts an unsigned int value. 1158 1159 @param ui: an integral value 1160 """ 1161 self._check(pn_data_put_uint(self._data, ui))
1162
1163 - def put_int(self, i):
1164 """ 1165 Puts a signed int value. 1166 1167 @param i: an integral value 1168 """ 1169 self._check(pn_data_put_int(self._data, i))
1170
1171 - def put_char(self, c):
1172 """ 1173 Puts a char value. 1174 1175 @param c: a single character 1176 """ 1177 self._check(pn_data_put_char(self._data, ord(c)))
1178
1179 - def put_ulong(self, ul):
1180 """ 1181 Puts an unsigned long value. 1182 1183 @param ul: an integral value 1184 """ 1185 self._check(pn_data_put_ulong(self._data, ul))
1186
1187 - def put_long(self, l):
1188 """ 1189 Puts a signed long value. 1190 1191 @param l: an integral value 1192 """ 1193 self._check(pn_data_put_long(self._data, l))
1194
1195 - def put_timestamp(self, t):
1196 """ 1197 Puts a timestamp value. 1198 1199 @param t: an integral value 1200 """ 1201 self._check(pn_data_put_timestamp(self._data, t))
1202
1203 - def put_float(self, f):
1204 """ 1205 Puts a float value. 1206 1207 @param f: a floating point value 1208 """ 1209 self._check(pn_data_put_float(self._data, f))
1210
1211 - def put_double(self, d):
1212 """ 1213 Puts a double value. 1214 1215 @param d: a floating point value. 1216 """ 1217 self._check(pn_data_put_double(self._data, d))
1218
1219 - def put_decimal32(self, d):
1220 """ 1221 Puts a decimal32 value. 1222 1223 @param d: a decimal32 value 1224 """ 1225 self._check(pn_data_put_decimal32(self._data, d))
1226
1227 - def put_decimal64(self, d):
1228 """ 1229 Puts a decimal64 value. 1230 1231 @param d: a decimal64 value 1232 """ 1233 self._check(pn_data_put_decimal64(self._data, d))
1234
1235 - def put_decimal128(self, d):
1236 """ 1237 Puts a decimal128 value. 1238 1239 @param d: a decimal128 value 1240 """ 1241 self._check(pn_data_put_decimal128(self._data, d))
1242
1243 - def put_uuid(self, u):
1244 """ 1245 Puts a UUID value. 1246 1247 @param u: a uuid value 1248 """ 1249 self._check(pn_data_put_uuid(self._data, u.bytes))
1250
1251 - def put_binary(self, b):
1252 """ 1253 Puts a binary value. 1254 1255 @type b: binary 1256 @param b: a binary value 1257 """ 1258 self._check(pn_data_put_binary(self._data, b))
1259
1260 - def put_memoryview(self, mv):
1261 """Put a python memoryview object as an AMQP binary value""" 1262 self.put_binary(mv.tobytes())
1263
1264 - def put_buffer(self, buff):
1265 """Put a python buffer object as an AMQP binary value""" 1266 self.put_binary(bytes(buff))
1267
1268 - def put_string(self, s):
1269 """ 1270 Puts a unicode value. 1271 1272 @type s: unicode 1273 @param s: a unicode value 1274 """ 1275 self._check(pn_data_put_string(self._data, s.encode("utf8")))
1276
1277 - def put_symbol(self, s):
1278 """ 1279 Puts a symbolic value. 1280 1281 @type s: string 1282 @param s: the symbol name 1283 """ 1284 self._check(pn_data_put_symbol(self._data, s.encode('ascii')))
1285
1286 - def get_list(self):
1287 """ 1288 If the current node is a list, return the number of elements, 1289 otherwise return zero. List elements can be accessed by entering 1290 the list. 1291 1292 >>> count = data.get_list() 1293 >>> data.enter() 1294 >>> for i in range(count): 1295 ... type = data.next() 1296 ... if type == Data.STRING: 1297 ... print data.get_string() 1298 ... elif type == ...: 1299 ... ... 1300 >>> data.exit() 1301 """ 1302 return pn_data_get_list(self._data)
1303
1304 - def get_map(self):
1305 """ 1306 If the current node is a map, return the number of child elements, 1307 otherwise return zero. Key value pairs can be accessed by entering 1308 the map. 1309 1310 >>> count = data.get_map() 1311 >>> data.enter() 1312 >>> for i in range(count/2): 1313 ... type = data.next() 1314 ... if type == Data.STRING: 1315 ... print data.get_string() 1316 ... elif type == ...: 1317 ... ... 1318 >>> data.exit() 1319 """ 1320 return pn_data_get_map(self._data)
1321
1322 - def get_array(self):
1323 """ 1324 If the current node is an array, return a tuple of the element 1325 count, a boolean indicating whether the array is described, and 1326 the type of each element, otherwise return (0, False, None). Array 1327 data can be accessed by entering the array. 1328 1329 >>> # read an array of strings with a symbolic descriptor 1330 >>> count, described, type = data.get_array() 1331 >>> data.enter() 1332 >>> data.next() 1333 >>> print "Descriptor:", data.get_symbol() 1334 >>> for i in range(count): 1335 ... data.next() 1336 ... print "Element:", data.get_string() 1337 >>> data.exit() 1338 """ 1339 count = pn_data_get_array(self._data) 1340 described = pn_data_is_array_described(self._data) 1341 type = pn_data_get_array_type(self._data) 1342 if type == -1: 1343 type = None 1344 return count, described, type
1345
1346 - def is_described(self):
1347 """ 1348 Checks if the current node is a described value. The descriptor 1349 and value may be accessed by entering the described value. 1350 1351 >>> # read a symbolically described string 1352 >>> assert data.is_described() # will error if the current node is not described 1353 >>> data.enter() 1354 >>> data.next() 1355 >>> print data.get_symbol() 1356 >>> data.next() 1357 >>> print data.get_string() 1358 >>> data.exit() 1359 """ 1360 return pn_data_is_described(self._data)
1361
1362 - def is_null(self):
1363 """ 1364 Checks if the current node is a null. 1365 """ 1366 return pn_data_is_null(self._data)
1367
1368 - def get_bool(self):
1369 """ 1370 If the current node is a boolean, returns its value, returns False 1371 otherwise. 1372 """ 1373 return pn_data_get_bool(self._data)
1374
1375 - def get_ubyte(self):
1376 """ 1377 If the current node is an unsigned byte, returns its value, 1378 returns 0 otherwise. 1379 """ 1380 return ubyte(pn_data_get_ubyte(self._data))
1381
1382 - def get_byte(self):
1383 """ 1384 If the current node is a signed byte, returns its value, returns 0 1385 otherwise. 1386 """ 1387 return byte(pn_data_get_byte(self._data))
1388
1389 - def get_ushort(self):
1390 """ 1391 If the current node is an unsigned short, returns its value, 1392 returns 0 otherwise. 1393 """ 1394 return ushort(pn_data_get_ushort(self._data))
1395
1396 - def get_short(self):
1397 """ 1398 If the current node is a signed short, returns its value, returns 1399 0 otherwise. 1400 """ 1401 return short(pn_data_get_short(self._data))
1402
1403 - def get_uint(self):
1404 """ 1405 If the current node is an unsigned int, returns its value, returns 1406 0 otherwise. 1407 """ 1408 return uint(pn_data_get_uint(self._data))
1409
1410 - def get_int(self):
1411 """ 1412 If the current node is a signed int, returns its value, returns 0 1413 otherwise. 1414 """ 1415 return int32(pn_data_get_int(self._data))
1416
1417 - def get_char(self):
1418 """ 1419 If the current node is a char, returns its value, returns 0 1420 otherwise. 1421 """ 1422 return char(_compat.unichar(pn_data_get_char(self._data)))
1423
1424 - def get_ulong(self):
1425 """ 1426 If the current node is an unsigned long, returns its value, 1427 returns 0 otherwise. 1428 """ 1429 return ulong(pn_data_get_ulong(self._data))
1430
1431 - def get_long(self):
1432 """ 1433 If the current node is an signed long, returns its value, returns 1434 0 otherwise. 1435 """ 1436 return long(pn_data_get_long(self._data))
1437
1438 - def get_timestamp(self):
1439 """ 1440 If the current node is a timestamp, returns its value, returns 0 1441 otherwise. 1442 """ 1443 return timestamp(pn_data_get_timestamp(self._data))
1444
1445 - def get_float(self):
1446 """ 1447 If the current node is a float, returns its value, raises 0 1448 otherwise. 1449 """ 1450 return float32(pn_data_get_float(self._data))
1451
1452 - def get_double(self):
1453 """ 1454 If the current node is a double, returns its value, returns 0 1455 otherwise. 1456 """ 1457 return pn_data_get_double(self._data)
1458 1459 # XXX: need to convert
1460 - def get_decimal32(self):
1461 """ 1462 If the current node is a decimal32, returns its value, returns 0 1463 otherwise. 1464 """ 1465 return decimal32(pn_data_get_decimal32(self._data))
1466 1467 # XXX: need to convert
1468 - def get_decimal64(self):
1469 """ 1470 If the current node is a decimal64, returns its value, returns 0 1471 otherwise. 1472 """ 1473 return decimal64(pn_data_get_decimal64(self._data))
1474 1475 # XXX: need to convert
1476 - def get_decimal128(self):
1477 """ 1478 If the current node is a decimal128, returns its value, returns 0 1479 otherwise. 1480 """ 1481 return decimal128(pn_data_get_decimal128(self._data))
1482
1483 - def get_uuid(self):
1484 """ 1485 If the current node is a UUID, returns its value, returns None 1486 otherwise. 1487 """ 1488 if pn_data_type(self._data) == Data.UUID: 1489 return uuid.UUID(bytes=pn_data_get_uuid(self._data)) 1490 else: 1491 return None
1492
1493 - def get_binary(self):
1494 """ 1495 If the current node is binary, returns its value, returns "" 1496 otherwise. 1497 """ 1498 return pn_data_get_binary(self._data)
1499
1500 - def get_string(self):
1501 """ 1502 If the current node is a string, returns its value, returns "" 1503 otherwise. 1504 """ 1505 return pn_data_get_string(self._data).decode("utf8")
1506
1507 - def get_symbol(self):
1508 """ 1509 If the current node is a symbol, returns its value, returns "" 1510 otherwise. 1511 """ 1512 return symbol(pn_data_get_symbol(self._data).decode('ascii'))
1513
1514 - def copy(self, src):
1515 self._check(pn_data_copy(self._data, src._data))
1516
1517 - def format(self):
1518 sz = 16 1519 while True: 1520 err, result = pn_data_format(self._data, sz) 1521 if err == PN_OVERFLOW: 1522 sz *= 2 1523 continue 1524 else: 1525 self._check(err) 1526 return result
1527
1528 - def dump(self):
1529 pn_data_dump(self._data)
1530
1531 - def put_dict(self, d):
1532 self.put_map() 1533 self.enter() 1534 try: 1535 for k, v in d.items(): 1536 self.put_object(k) 1537 self.put_object(v) 1538 finally: 1539 self.exit()
1540
1541 - def get_dict(self):
1542 if self.enter(): 1543 try: 1544 result = {} 1545 while self.next(): 1546 k = self.get_object() 1547 if self.next(): 1548 v = self.get_object() 1549 else: 1550 v = None 1551 result[k] = v 1552 finally: 1553 self.exit() 1554 return result
1555
1556 - def put_sequence(self, s):
1557 self.put_list() 1558 self.enter() 1559 try: 1560 for o in s: 1561 self.put_object(o) 1562 finally: 1563 self.exit()
1564
1565 - def get_sequence(self):
1566 if self.enter(): 1567 try: 1568 result = [] 1569 while self.next(): 1570 result.append(self.get_object()) 1571 finally: 1572 self.exit() 1573 return result
1574
1575 - def get_py_described(self):
1576 if self.enter(): 1577 try: 1578 self.next() 1579 descriptor = self.get_object() 1580 self.next() 1581 value = self.get_object() 1582 finally: 1583 self.exit() 1584 return Described(descriptor, value)
1585
1586 - def put_py_described(self, d):
1587 self.put_described() 1588 self.enter() 1589 try: 1590 self.put_object(d.descriptor) 1591 self.put_object(d.value) 1592 finally: 1593 self.exit()
1594
1595 - def get_py_array(self):
1596 """ 1597 If the current node is an array, return an Array object 1598 representing the array and its contents. Otherwise return None. 1599 This is a convenience wrapper around get_array, enter, etc. 1600 """ 1601 1602 count, described, type = self.get_array() 1603 if type is None: return None 1604 if self.enter(): 1605 try: 1606 if described: 1607 self.next() 1608 descriptor = self.get_object() 1609 else: 1610 descriptor = UNDESCRIBED 1611 elements = [] 1612 while self.next(): 1613 elements.append(self.get_object()) 1614 finally: 1615 self.exit() 1616 return Array(descriptor, type, *elements)
1617
1618 - def put_py_array(self, a):
1619 described = a.descriptor != UNDESCRIBED 1620 self.put_array(described, a.type) 1621 self.enter() 1622 try: 1623 if described: 1624 self.put_object(a.descriptor) 1625 for e in a.elements: 1626 self.put_object(e) 1627 finally: 1628 self.exit()
1629 1630 put_mappings = { 1631 None.__class__: lambda s, _: s.put_null(), 1632 bool: put_bool, 1633 ubyte: put_ubyte, 1634 ushort: put_ushort, 1635 uint: put_uint, 1636 ulong: put_ulong, 1637 byte: put_byte, 1638 short: put_short, 1639 int32: put_int, 1640 long: put_long, 1641 float32: put_float, 1642 float: put_double, 1643 decimal32: put_decimal32, 1644 decimal64: put_decimal64, 1645 decimal128: put_decimal128, 1646 char: put_char, 1647 timestamp: put_timestamp, 1648 uuid.UUID: put_uuid, 1649 bytes: put_binary, 1650 unicode: put_string, 1651 symbol: put_symbol, 1652 list: put_sequence, 1653 tuple: put_sequence, 1654 dict: put_dict, 1655 Described: put_py_described, 1656 Array: put_py_array 1657 } 1658 # for python 3.x, long is merely an alias for int, but for python 2.x 1659 # we need to add an explicit int since it is a different type 1660 if int not in put_mappings: 1661 put_mappings[int] = put_int 1662 # Python >=3.0 has 'memoryview', <=2.5 has 'buffer', >=2.6 has both. 1663 try: put_mappings[memoryview] = put_memoryview 1664 except NameError: pass 1665 try: put_mappings[buffer] = put_buffer 1666 except NameError: pass 1667 get_mappings = { 1668 NULL: lambda s: None, 1669 BOOL: get_bool, 1670 BYTE: get_byte, 1671 UBYTE: get_ubyte, 1672 SHORT: get_short, 1673 USHORT: get_ushort, 1674 INT: get_int, 1675 UINT: get_uint, 1676 CHAR: get_char, 1677 LONG: get_long, 1678 ULONG: get_ulong, 1679 TIMESTAMP: get_timestamp, 1680 FLOAT: get_float, 1681 DOUBLE: get_double, 1682 DECIMAL32: get_decimal32, 1683 DECIMAL64: get_decimal64, 1684 DECIMAL128: get_decimal128, 1685 UUID: get_uuid, 1686 BINARY: get_binary, 1687 STRING: get_string, 1688 SYMBOL: get_symbol, 1689 DESCRIBED: get_py_described, 1690 ARRAY: get_py_array, 1691 LIST: get_sequence, 1692 MAP: get_dict 1693 } 1694 1695
1696 - def put_object(self, obj):
1697 putter = self.put_mappings[obj.__class__] 1698 putter(self, obj)
1699
1700 - def get_object(self):
1701 type = self.type() 1702 if type is None: return None 1703 getter = self.get_mappings.get(type) 1704 if getter: 1705 return getter(self) 1706 else: 1707 return UnmappedType(str(type))
1708
1709 -class ConnectionException(ProtonException):
1710 pass
1711
1712 -class Endpoint(object):
1713 1714 LOCAL_UNINIT = PN_LOCAL_UNINIT 1715 REMOTE_UNINIT = PN_REMOTE_UNINIT 1716 LOCAL_ACTIVE = PN_LOCAL_ACTIVE 1717 REMOTE_ACTIVE = PN_REMOTE_ACTIVE 1718 LOCAL_CLOSED = PN_LOCAL_CLOSED 1719 REMOTE_CLOSED = PN_REMOTE_CLOSED 1720
1721 - def _init(self):
1722 self.condition = None
1723
1724 - def _update_cond(self):
1725 obj2cond(self.condition, self._get_cond_impl())
1726 1727 @property
1728 - def remote_condition(self):
1729 return cond2obj(self._get_remote_cond_impl())
1730 1731 # the following must be provided by subclasses
1732 - def _get_cond_impl(self):
1733 assert False, "Subclass must override this!"
1734
1735 - def _get_remote_cond_impl(self):
1736 assert False, "Subclass must override this!"
1737
1738 - def _get_handler(self):
1739 from . import reactor 1740 ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl)) 1741 if ractor: 1742 on_error = ractor.on_error_delegate() 1743 else: 1744 on_error = None 1745 record = self._get_attachments() 1746 return WrappedHandler.wrap(pn_record_get_handler(record), on_error)
1747
1748 - def _set_handler(self, handler):
1749 from . import reactor 1750 ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl)) 1751 if ractor: 1752 on_error = ractor.on_error_delegate() 1753 else: 1754 on_error = None 1755 impl = _chandler(handler, on_error) 1756 record = self._get_attachments() 1757 pn_record_set_handler(record, impl) 1758 pn_decref(impl)
1759 1760 handler = property(_get_handler, _set_handler) 1761 1762 @property
1763 - def transport(self):
1764 return self.connection.transport
1765
1766 -class Condition:
1767
1768 - def __init__(self, name, description=None, info=None):
1769 self.name = name 1770 self.description = description 1771 self.info = info
1772
1773 - def __repr__(self):
1774 return "Condition(%s)" % ", ".join([repr(x) for x in 1775 (self.name, self.description, self.info) 1776 if x])
1777
1778 - def __eq__(self, o):
1779 if not isinstance(o, Condition): return False 1780 return self.name == o.name and \ 1781 self.description == o.description and \ 1782 self.info == o.info
1783
1784 -def obj2cond(obj, cond):
1785 pn_condition_clear(cond) 1786 if obj: 1787 pn_condition_set_name(cond, str(obj.name)) 1788 pn_condition_set_description(cond, obj.description) 1789 info = Data(pn_condition_info(cond)) 1790 if obj.info: 1791 info.put_object(obj.info)
1792
1793 -def cond2obj(cond):
1794 if pn_condition_is_set(cond): 1795 return Condition(pn_condition_get_name(cond), 1796 pn_condition_get_description(cond), 1797 dat2obj(pn_condition_info(cond))) 1798 else: 1799 return None
1800
1801 -def dat2obj(dimpl):
1802 if dimpl: 1803 d = Data(dimpl) 1804 d.rewind() 1805 d.next() 1806 obj = d.get_object() 1807 d.rewind() 1808 return obj
1809
1810 -def obj2dat(obj, dimpl):
1811 if obj is not None: 1812 d = Data(dimpl) 1813 d.put_object(obj)
1814
1815 -def secs2millis(secs):
1816 return long(secs*1000)
1817
1818 -def millis2secs(millis):
1819 return float(millis)/1000.0
1820
1821 -def timeout2millis(secs):
1822 if secs is None: return PN_MILLIS_MAX 1823 return secs2millis(secs)
1824
1825 -def millis2timeout(millis):
1826 if millis == PN_MILLIS_MAX: return None 1827 return millis2secs(millis)
1828
1829 -def unicode2utf8(string):
1830 """Some Proton APIs expect a null terminated string. Convert python text 1831 types to UTF8 to avoid zero bytes introduced by other multi-byte encodings. 1832 This method will throw if the string cannot be converted. 1833 """ 1834 if string is None: 1835 return None 1836 if _compat.IS_PY2: 1837 if isinstance(string, unicode): 1838 return string.encode('utf-8') 1839 elif isinstance(string, str): 1840 return string 1841 else: 1842 # decoding a string results in bytes 1843 if isinstance(string, str): 1844 string = string.encode('utf-8') 1845 # fall through 1846 if isinstance(string, bytes): 1847 return string.decode('utf-8') 1848 raise TypeError("Unrecognized string type: %r (%s)" % (string, type(string)))
1849
1850 -def utf82unicode(string):
1851 """Covert C strings returned from proton-c into python unicode""" 1852 if string is None: 1853 return None 1854 if isinstance(string, _compat.TEXT_TYPES): 1855 # already unicode 1856 return string 1857 elif isinstance(string, _compat.BINARY_TYPES): 1858 return string.decode('utf8') 1859 else: 1860 raise TypeError("Unrecognized string type")
1861
1862 -class Connection(Wrapper, Endpoint):
1863 """ 1864 A representation of an AMQP connection 1865 """ 1866 1867 @staticmethod
1868 - def wrap(impl):
1869 if impl is None: 1870 return None 1871 else: 1872 return Connection(impl)
1873
1874 - def __init__(self, impl = pn_connection):
1875 Wrapper.__init__(self, impl, pn_connection_attachments)
1876
1877 - def _init(self):
1878 Endpoint._init(self) 1879 self.offered_capabilities = None 1880 self.desired_capabilities = None 1881 self.properties = None
1882
1883 - def _get_attachments(self):
1884 return pn_connection_attachments(self._impl)
1885 1886 @property
1887 - def connection(self):
1888 return self
1889 1890 @property
1891 - def transport(self):
1892 return Transport.wrap(pn_connection_transport(self._impl))
1893
1894 - def _check(self, err):
1895 if err < 0: 1896 exc = EXCEPTIONS.get(err, ConnectionException) 1897 raise exc("[%s]: %s" % (err, pn_connection_error(self._impl))) 1898 else: 1899 return err
1900
1901 - def _get_cond_impl(self):
1902 return pn_connection_condition(self._impl)
1903
1904 - def _get_remote_cond_impl(self):
1905 return pn_connection_remote_condition(self._impl)
1906
1907 - def collect(self, collector):
1908 if collector is None: 1909 pn_connection_collect(self._impl, None) 1910 else: 1911 pn_connection_collect(self._impl, collector._impl) 1912 self._collector = weakref.ref(collector)
1913
1914 - def _get_container(self):
1915 return utf82unicode(pn_connection_get_container(self._impl))
1916 - def _set_container(self, name):
1917 return pn_connection_set_container(self._impl, unicode2utf8(name))
1918 1919 container = property(_get_container, _set_container) 1920
1921 - def _get_hostname(self):
1922 return utf82unicode(pn_connection_get_hostname(self._impl))
1923 - def _set_hostname(self, name):
1924 return pn_connection_set_hostname(self._impl, unicode2utf8(name))
1925 1926 hostname = property(_get_hostname, _set_hostname, 1927 doc=""" 1928 Set the name of the host (either fully qualified or relative) to which this 1929 connection is connecting to. This information may be used by the remote 1930 peer to determine the correct back-end service to connect the client to. 1931 This value will be sent in the Open performative, and will be used by SSL 1932 and SASL layers to identify the peer. 1933 """) 1934
1935 - def _get_user(self):
1936 return utf82unicode(pn_connection_get_user(self._impl))
1937 - def _set_user(self, name):
1938 return pn_connection_set_user(self._impl, unicode2utf8(name))
1939 1940 user = property(_get_user, _set_user) 1941
1942 - def _get_password(self):
1943 return None
1944 - def _set_password(self, name):
1945 return pn_connection_set_password(self._impl, unicode2utf8(name))
1946 1947 password = property(_get_password, _set_password) 1948 1949 @property
1950 - def remote_container(self):
1951 """The container identifier specified by the remote peer for this connection.""" 1952 return pn_connection_remote_container(self._impl)
1953 1954 @property
1955 - def remote_hostname(self):
1956 """The hostname specified by the remote peer for this connection.""" 1957 return pn_connection_remote_hostname(self._impl)
1958 1959 @property
1961 """The capabilities offered by the remote peer for this connection.""" 1962 return dat2obj(pn_connection_remote_offered_capabilities(self._impl))
1963 1964 @property
1966 """The capabilities desired by the remote peer for this connection.""" 1967 return dat2obj(pn_connection_remote_desired_capabilities(self._impl))
1968 1969 @property
1970 - def remote_properties(self):
1971 """The properties specified by the remote peer for this connection.""" 1972 return dat2obj(pn_connection_remote_properties(self._impl))
1973
1974 - def open(self):
1975 """ 1976 Opens the connection. 1977 1978 In more detail, this moves the local state of the connection to 1979 the ACTIVE state and triggers an open frame to be sent to the 1980 peer. A connection is fully active once both peers have opened it. 1981 """ 1982 obj2dat(self.offered_capabilities, 1983 pn_connection_offered_capabilities(self._impl)) 1984 obj2dat(self.desired_capabilities, 1985 pn_connection_desired_capabilities(self._impl)) 1986 obj2dat(self.properties, pn_connection_properties(self._impl)) 1987 pn_connection_open(self._impl)
1988
1989 - def close(self):
1990 """ 1991 Closes the connection. 1992 1993 In more detail, this moves the local state of the connection to 1994 the CLOSED state and triggers a close frame to be sent to the 1995 peer. A connection is fully closed once both peers have closed it. 1996 """ 1997 self._update_cond() 1998 pn_connection_close(self._impl) 1999 if hasattr(self, '_session_policy'): 2000 # break circular ref 2001 del self._session_policy
2002 2003 @property
2004 - def state(self):
2005 """ 2006 The state of the connection as a bit field. The state has a local 2007 and a remote component. Each of these can be in one of three 2008 states: UNINIT, ACTIVE or CLOSED. These can be tested by masking 2009 against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT, 2010 REMOTE_ACTIVE and REMOTE_CLOSED. 2011 """ 2012 return pn_connection_state(self._impl)
2013
2014 - def session(self):
2015 """ 2016 Returns a new session on this connection. 2017 """ 2018 ssn = pn_session(self._impl) 2019 if ssn is None: 2020 raise(SessionException("Session allocation failed.")) 2021 else: 2022 return Session(ssn)
2023
2024 - def session_head(self, mask):
2025 return Session.wrap(pn_session_head(self._impl, mask))
2026 2029 2030 @property
2031 - def work_head(self):
2032 return Delivery.wrap(pn_work_head(self._impl))
2033 2034 @property
2035 - def error(self):
2036 return pn_error_code(pn_connection_error(self._impl))
2037
2038 - def free(self):
2039 pn_connection_release(self._impl)
2040
2041 -class SessionException(ProtonException):
2042 pass
2043
2044 -class Session(Wrapper, Endpoint):
2045 2046 @staticmethod
2047 - def wrap(impl):
2048 if impl is None: 2049 return None 2050 else: 2051 return Session(impl)
2052
2053 - def __init__(self, impl):
2054 Wrapper.__init__(self, impl, pn_session_attachments)
2055
2056 - def _get_attachments(self):
2057 return pn_session_attachments(self._impl)
2058
2059 - def _get_cond_impl(self):
2060 return pn_session_condition(self._impl)
2061
2062 - def _get_remote_cond_impl(self):
2063 return pn_session_remote_condition(self._impl)
2064
2065 - def _get_incoming_capacity(self):
2066 return pn_session_get_incoming_capacity(self._impl)
2067
2068 - def _set_incoming_capacity(self, capacity):
2069 pn_session_set_incoming_capacity(self._impl, capacity)
2070 2071 incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity) 2072
2073 - def _get_outgoing_window(self):
2074 return pn_session_get_outgoing_window(self._impl)
2075
2076 - def _set_outgoing_window(self, window):
2077 pn_session_set_outgoing_window(self._impl, window)
2078 2079 outgoing_window = property(_get_outgoing_window, _set_outgoing_window) 2080 2081 @property
2082 - def outgoing_bytes(self):
2083 return pn_session_outgoing_bytes(self._impl)
2084 2085 @property
2086 - def incoming_bytes(self):
2087 return pn_session_incoming_bytes(self._impl)
2088
2089 - def open(self):
2090 pn_session_open(self._impl)
2091
2092 - def close(self):
2093 self._update_cond() 2094 pn_session_close(self._impl)
2095
2096 - def next(self, mask):
2097 return Session.wrap(pn_session_next(self._impl, mask))
2098 2099 @property
2100 - def state(self):
2101 return pn_session_state(self._impl)
2102 2103 @property
2104 - def connection(self):
2105 return Connection.wrap(pn_session_connection(self._impl))
2106
2107 - def sender(self, name):
2108 return Sender(pn_sender(self._impl, unicode2utf8(name)))
2109
2110 - def receiver(self, name):
2111 return Receiver(pn_receiver(self._impl, unicode2utf8(name)))
2112
2113 - def free(self):
2114 pn_session_free(self._impl)
2115
2116 -class LinkException(ProtonException):
2117 pass
2118 2311
2312 -class Terminus(object):
2313 2314 UNSPECIFIED = PN_UNSPECIFIED 2315 SOURCE = PN_SOURCE 2316 TARGET = PN_TARGET 2317 COORDINATOR = PN_COORDINATOR 2318 2319 NONDURABLE = PN_NONDURABLE 2320 CONFIGURATION = PN_CONFIGURATION 2321 DELIVERIES = PN_DELIVERIES 2322 2323 DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED 2324 DIST_MODE_COPY = PN_DIST_MODE_COPY 2325 DIST_MODE_MOVE = PN_DIST_MODE_MOVE 2326 2327 EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK 2328 EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION 2329 EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION 2330 EXPIRE_NEVER = PN_EXPIRE_NEVER 2331
2332 - def __init__(self, impl):
2333 self._impl = impl
2334
2335 - def _check(self, err):
2336 if err < 0: 2337 exc = EXCEPTIONS.get(err, LinkException) 2338 raise exc("[%s]" % err) 2339 else: 2340 return err
2341
2342 - def _get_type(self):
2343 return pn_terminus_get_type(self._impl)
2344 - def _set_type(self, type):
2345 self._check(pn_terminus_set_type(self._impl, type))
2346 type = property(_get_type, _set_type) 2347
2348 - def _get_address(self):
2349 """The address that identifies the source or target node""" 2350 return utf82unicode(pn_terminus_get_address(self._impl))
2351 - def _set_address(self, address):
2352 self._check(pn_terminus_set_address(self._impl, unicode2utf8(address)))
2353 address = property(_get_address, _set_address) 2354
2355 - def _get_durability(self):
2356 return pn_terminus_get_durability(self._impl)
2357 - def _set_durability(self, seconds):
2358 self._check(pn_terminus_set_durability(self._impl, seconds))
2359 durability = property(_get_durability, _set_durability) 2360
2361 - def _get_expiry_policy(self):
2362 return pn_terminus_get_expiry_policy(self._impl)
2363 - def _set_expiry_policy(self, seconds):
2364 self._check(pn_terminus_set_expiry_policy(self._impl, seconds))
2365 expiry_policy = property(_get_expiry_policy, _set_expiry_policy) 2366
2367 - def _get_timeout(self):
2368 return pn_terminus_get_timeout(self._impl)
2369 - def _set_timeout(self, seconds):
2370 self._check(pn_terminus_set_timeout(self._impl, seconds))
2371 timeout = property(_get_timeout, _set_timeout) 2372
2373 - def _is_dynamic(self):
2374 """Indicates whether the source or target node was dynamically 2375 created""" 2376 return pn_terminus_is_dynamic(self._impl)
2377 - def _set_dynamic(self, dynamic):
2378 self._check(pn_terminus_set_dynamic(self._impl, dynamic))
2379 dynamic = property(_is_dynamic, _set_dynamic) 2380
2381 - def _get_distribution_mode(self):
2382 return pn_terminus_get_distribution_mode(self._impl)
2383 - def _set_distribution_mode(self, mode):
2384 self._check(pn_terminus_set_distribution_mode(self._impl, mode))
2385 distribution_mode = property(_get_distribution_mode, _set_distribution_mode) 2386 2387 @property
2388 - def properties(self):
2389 """Properties of a dynamic source or target.""" 2390 return Data(pn_terminus_properties(self._impl))
2391 2392 @property
2393 - def capabilities(self):
2394 """Capabilities of the source or target.""" 2395 return Data(pn_terminus_capabilities(self._impl))
2396 2397 @property
2398 - def outcomes(self):
2399 return Data(pn_terminus_outcomes(self._impl))
2400 2401 @property
2402 - def filter(self):
2403 """A filter on a source allows the set of messages transfered over 2404 the link to be restricted""" 2405 return Data(pn_terminus_filter(self._impl))
2406
2407 - def copy(self, src):
2408 self._check(pn_terminus_copy(self._impl, src._impl))
2409
2410 -class Sender(Link):
2411 """ 2412 A link over which messages are sent. 2413 """ 2414
2415 - def offered(self, n):
2416 pn_link_offered(self._impl, n)
2417
2418 - def stream(self, data):
2419 """ 2420 Send specified data as part of the current delivery 2421 2422 @type data: binary 2423 @param data: data to send 2424 """ 2425 return self._check(pn_link_send(self._impl, data))
2426
2427 - def send(self, obj, tag=None):
2428 """ 2429 Send specified object over this sender; the object is expected to 2430 have a send() method on it that takes the sender and an optional 2431 tag as arguments. 2432 2433 Where the object is a Message, this will send the message over 2434 this link, creating a new delivery for the purpose. 2435 """ 2436 if hasattr(obj, 'send'): 2437 return obj.send(self, tag=tag) 2438 else: 2439 # treat object as bytes 2440 return self.stream(obj)
2441
2442 - def delivery_tag(self):
2443 if not hasattr(self, 'tag_generator'): 2444 def simple_tags(): 2445 count = 1 2446 while True: 2447 yield str(count) 2448 count += 1
2449 self.tag_generator = simple_tags() 2450 return next(self.tag_generator)
2451
2452 -class Receiver(Link):
2453 """ 2454 A link over which messages are received. 2455 """ 2456
2457 - def flow(self, n):
2458 """Increases the credit issued to the remote sender by the specified number of messages.""" 2459 pn_link_flow(self._impl, n)
2460
2461 - def recv(self, limit):
2462 n, binary = pn_link_recv(self._impl, limit) 2463 if n == PN_EOS: 2464 return None 2465 else: 2466 self._check(n) 2467 return binary
2468
2469 - def drain(self, n):
2470 pn_link_drain(self._impl, n)
2471
2472 - def draining(self):
2473 return pn_link_draining(self._impl)
2474
2475 -class NamedInt(int):
2476 2477 values = {} 2478
2479 - def __new__(cls, i, name):
2480 ni = super(NamedInt, cls).__new__(cls, i) 2481 cls.values[i] = ni 2482 return ni
2483
2484 - def __init__(self, i, name):
2485 self.name = name
2486
2487 - def __repr__(self):
2488 return self.name
2489
2490 - def __str__(self):
2491 return self.name
2492 2493 @classmethod
2494 - def get(cls, i):
2495 return cls.values.get(i, i)
2496
2497 -class DispositionType(NamedInt):
2498 values = {}
2499
2500 -class Disposition(object):
2501 2502 RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED") 2503 ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED") 2504 REJECTED = DispositionType(PN_REJECTED, "REJECTED") 2505 RELEASED = DispositionType(PN_RELEASED, "RELEASED") 2506 MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED") 2507
2508 - def __init__(self, impl, local):
2509 self._impl = impl 2510 self.local = local 2511 self._data = None 2512 self._condition = None 2513 self._annotations = None
2514 2515 @property
2516 - def type(self):
2517 return DispositionType.get(pn_disposition_type(self._impl))
2518
2519 - def _get_section_number(self):
2520 return pn_disposition_get_section_number(self._impl)
2521 - def _set_section_number(self, n):
2522 pn_disposition_set_section_number(self._impl, n)
2523 section_number = property(_get_section_number, _set_section_number) 2524
2525 - def _get_section_offset(self):
2526 return pn_disposition_get_section_offset(self._impl)
2527 - def _set_section_offset(self, n):
2528 pn_disposition_set_section_offset(self._impl, n)
2529 section_offset = property(_get_section_offset, _set_section_offset) 2530
2531 - def _get_failed(self):
2532 return pn_disposition_is_failed(self._impl)
2533 - def _set_failed(self, b):
2534 pn_disposition_set_failed(self._impl, b)
2535 failed = property(_get_failed, _set_failed) 2536
2537 - def _get_undeliverable(self):
2538 return pn_disposition_is_undeliverable(self._impl)
2539 - def _set_undeliverable(self, b):
2540 pn_disposition_set_undeliverable(self._impl, b)
2541 undeliverable = property(_get_undeliverable, _set_undeliverable) 2542
2543 - def _get_data(self):
2544 if self.local: 2545 return self._data 2546 else: 2547 return dat2obj(pn_disposition_data(self._impl))
2548 - def _set_data(self, obj):
2549 if self.local: 2550 self._data = obj 2551 else: 2552 raise AttributeError("data attribute is read-only")
2553 data = property(_get_data, _set_data) 2554
2555 - def _get_annotations(self):
2556 if self.local: 2557 return self._annotations 2558 else: 2559 return dat2obj(pn_disposition_annotations(self._impl))
2560 - def _set_annotations(self, obj):
2561 if self.local: 2562 self._annotations = obj 2563 else: 2564 raise AttributeError("annotations attribute is read-only")
2565 annotations = property(_get_annotations, _set_annotations) 2566
2567 - def _get_condition(self):
2568 if self.local: 2569 return self._condition 2570 else: 2571 return cond2obj(pn_disposition_condition(self._impl))
2572 - def _set_condition(self, obj):
2573 if self.local: 2574 self._condition = obj 2575 else: 2576 raise AttributeError("condition attribute is read-only")
2577 condition = property(_get_condition, _set_condition)
2578
2579 -class Delivery(Wrapper):
2580 """ 2581 Tracks and/or records the delivery of a message over a link. 2582 """ 2583 2584 RECEIVED = Disposition.RECEIVED 2585 ACCEPTED = Disposition.ACCEPTED 2586 REJECTED = Disposition.REJECTED 2587 RELEASED = Disposition.RELEASED 2588 MODIFIED = Disposition.MODIFIED 2589 2590 @staticmethod
2591 - def wrap(impl):
2592 if impl is None: 2593 return None 2594 else: 2595 return Delivery(impl)
2596
2597 - def __init__(self, impl):
2598 Wrapper.__init__(self, impl, pn_delivery_attachments)
2599
2600 - def _init(self):
2601 self.local = Disposition(pn_delivery_local(self._impl), True) 2602 self.remote = Disposition(pn_delivery_remote(self._impl), False)
2603 2604 @property
2605 - def tag(self):
2606 """The identifier for the delivery.""" 2607 return pn_delivery_tag(self._impl)
2608 2609 @property
2610 - def writable(self):
2611 """Returns true for an outgoing delivery to which data can now be written.""" 2612 return pn_delivery_writable(self._impl)
2613 2614 @property
2615 - def readable(self):
2616 """Returns true for an incoming delivery that has data to read.""" 2617 return pn_delivery_readable(self._impl)
2618 2619 @property
2620 - def updated(self):
2621 """Returns true if the state of the delivery has been updated 2622 (e.g. it has been settled and/or accepted, rejected etc).""" 2623 return pn_delivery_updated(self._impl)
2624
2625 - def update(self, state):
2626 """ 2627 Set the local state of the delivery e.g. ACCEPTED, REJECTED, RELEASED. 2628 """ 2629 obj2dat(self.local._data, pn_disposition_data(self.local._impl)) 2630 obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl)) 2631 obj2cond(self.local._condition, pn_disposition_condition(self.local._impl)) 2632 pn_delivery_update(self._impl, state)
2633 2634 @property
2635 - def pending(self):
2636 return pn_delivery_pending(self._impl)
2637 2638 @property
2639 - def partial(self):
2640 """ 2641 Returns true for an incoming delivery if not all the data is 2642 yet available. 2643 """ 2644 return pn_delivery_partial(self._impl)
2645 2646 @property
2647 - def local_state(self):
2648 """Returns the local state of the delivery.""" 2649 return DispositionType.get(pn_delivery_local_state(self._impl))
2650 2651 @property
2652 - def remote_state(self):
2653 """ 2654 Returns the state of the delivery as indicated by the remote 2655 peer. 2656 """ 2657 return DispositionType.get(pn_delivery_remote_state(self._impl))
2658 2659 @property
2660 - def settled(self):
2661 """ 2662 Returns true if the delivery has been settled by the remote peer. 2663 """ 2664 return pn_delivery_settled(self._impl)
2665
2666 - def settle(self):
2667 """ 2668 Settles the delivery locally. This indicates the application 2669 considers the delivery complete and does not wish to receive any 2670 further events about it. Every delivery should be settled locally. 2671 """ 2672 pn_delivery_settle(self._impl)
2673 2674 @property
2675 - def aborted(self):
2676 """Returns true if the delivery has been aborted.""" 2677 return pn_delivery_aborted(self._impl)
2678
2679 - def abort(self):
2680 """ 2681 Aborts the delivery. This indicates the application wishes to 2682 invalidate any data that may have already been sent on this delivery. 2683 The delivery cannot be aborted after it has been completely delivered. 2684 """ 2685 pn_delivery_abort(self._impl)
2686 2687 @property
2688 - def work_next(self):
2689 return Delivery.wrap(pn_work_next(self._impl))
2690 2691 @property 2697 2698 @property
2699 - def session(self):
2700 """ 2701 Returns the session over which the delivery was sent or received. 2702 """ 2703 return self.link.session
2704 2705 @property
2706 - def connection(self):
2707 """ 2708 Returns the connection over which the delivery was sent or received. 2709 """ 2710 return self.session.connection
2711 2712 @property
2713 - def transport(self):
2714 return self.connection.transport
2715
2716 -class TransportException(ProtonException):
2717 pass
2718
2719 -class TraceAdapter:
2720
2721 - def __init__(self, tracer):
2722 self.tracer = tracer
2723
2724 - def __call__(self, trans_impl, message):
2725 self.tracer(Transport.wrap(trans_impl), message)
2726
2727 -class Transport(Wrapper):
2728 2729 TRACE_OFF = PN_TRACE_OFF 2730 TRACE_DRV = PN_TRACE_DRV 2731 TRACE_FRM = PN_TRACE_FRM 2732 TRACE_RAW = PN_TRACE_RAW 2733 2734 CLIENT = 1 2735 SERVER = 2 2736 2737 @staticmethod
2738 - def wrap(impl):
2739 if impl is None: 2740 return None 2741 else: 2742 return Transport(_impl=impl)
2743
2744 - def __init__(self, mode=None, _impl = pn_transport):
2745 Wrapper.__init__(self, _impl, pn_transport_attachments) 2746 if mode == Transport.SERVER: 2747 pn_transport_set_server(self._impl) 2748 elif mode is None or mode==Transport.CLIENT: 2749 pass 2750 else: 2751 raise TransportException("Cannot initialise Transport from mode: %s" % str(mode))
2752
2753 - def _init(self):
2754 self._sasl = None 2755 self._ssl = None
2756
2757 - def _check(self, err):
2758 if err < 0: 2759 exc = EXCEPTIONS.get(err, TransportException) 2760 raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._impl)))) 2761 else: 2762 return err
2763
2764 - def _set_tracer(self, tracer):
2765 pn_transport_set_pytracer(self._impl, TraceAdapter(tracer));
2766
2767 - def _get_tracer(self):
2768 adapter = pn_transport_get_pytracer(self._impl) 2769 if adapter: 2770 return adapter.tracer 2771 else: 2772 return None
2773 2774 tracer = property(_get_tracer, _set_tracer, 2775 doc=""" 2776 A callback for trace logging. The callback is passed the transport and log message. 2777 """) 2778
2779 - def log(self, message):
2780 pn_transport_log(self._impl, message)
2781
2782 - def require_auth(self, bool):
2783 pn_transport_require_auth(self._impl, bool)
2784 2785 @property
2786 - def authenticated(self):
2787 return pn_transport_is_authenticated(self._impl)
2788
2789 - def require_encryption(self, bool):
2790 pn_transport_require_encryption(self._impl, bool)
2791 2792 @property
2793 - def encrypted(self):
2794 return pn_transport_is_encrypted(self._impl)
2795 2796 @property
2797 - def user(self):
2798 return pn_transport_get_user(self._impl)
2799
2800 - def bind(self, connection):
2801 """Assign a connection to the transport""" 2802 self._check(pn_transport_bind(self._impl, connection._impl))
2803
2804 - def unbind(self):
2805 """Release the connection""" 2806 self._check(pn_transport_unbind(self._impl))
2807
2808 - def trace(self, n):
2809 pn_transport_trace(self._impl, n)
2810
2811 - def tick(self, now):
2812 """Process any timed events (like heartbeat generation). 2813 now = seconds since epoch (float). 2814 """ 2815 return millis2secs(pn_transport_tick(self._impl, secs2millis(now)))
2816
2817 - def capacity(self):
2818 c = pn_transport_capacity(self._impl) 2819 if c >= PN_EOS: 2820 return c 2821 else: 2822 return self._check(c)
2823
2824 - def push(self, binary):
2825 n = self._check(pn_transport_push(self._impl, binary)) 2826 if n != len(binary): 2827 raise OverflowError("unable to process all bytes: %s, %s" % (n, len(binary)))
2828
2829 - def close_tail(self):
2830 self._check(pn_transport_close_tail(self._impl))
2831
2832 - def pending(self):
2833 p = pn_transport_pending(self._impl) 2834 if p >= PN_EOS: 2835 return p 2836 else: 2837 return self._check(p)
2838
2839 - def peek(self, size):
2840 cd, out = pn_transport_peek(self._impl, size) 2841 if cd == PN_EOS: 2842 return None 2843 else: 2844 self._check(cd) 2845 return out
2846
2847 - def pop(self, size):
2848 pn_transport_pop(self._impl, size)
2849
2850 - def close_head(self):
2851 self._check(pn_transport_close_head(self._impl))
2852 2853 @property
2854 - def closed(self):
2855 return pn_transport_closed(self._impl)
2856 2857 # AMQP 1.0 max-frame-size
2858 - def _get_max_frame_size(self):
2859 return pn_transport_get_max_frame(self._impl)
2860
2861 - def _set_max_frame_size(self, value):
2862 pn_transport_set_max_frame(self._impl, value)
2863 2864 max_frame_size = property(_get_max_frame_size, _set_max_frame_size, 2865 doc=""" 2866 Sets the maximum size for received frames (in bytes). 2867 """) 2868 2869 @property
2870 - def remote_max_frame_size(self):
2871 return pn_transport_get_remote_max_frame(self._impl)
2872
2873 - def _get_channel_max(self):
2874 return pn_transport_get_channel_max(self._impl)
2875
2876 - def _set_channel_max(self, value):
2877 if pn_transport_set_channel_max(self._impl, value): 2878 raise SessionException("Too late to change channel max.")
2879 2880 channel_max = property(_get_channel_max, _set_channel_max, 2881 doc=""" 2882 Sets the maximum channel that may be used on the transport. 2883 """) 2884 2885 @property
2886 - def remote_channel_max(self):
2887 return pn_transport_remote_channel_max(self._impl)
2888 2889 # AMQP 1.0 idle-time-out
2890 - def _get_idle_timeout(self):
2891 return millis2secs(pn_transport_get_idle_timeout(self._impl))
2892
2893 - def _set_idle_timeout(self, sec):
2894 pn_transport_set_idle_timeout(self._impl, secs2millis(sec))
2895 2896 idle_timeout = property(_get_idle_timeout, _set_idle_timeout, 2897 doc=""" 2898 The idle timeout of the connection (float, in seconds). 2899 """) 2900 2901 @property
2902 - def remote_idle_timeout(self):
2903 return millis2secs(pn_transport_get_remote_idle_timeout(self._impl))
2904 2905 @property
2906 - def frames_output(self):
2907 return pn_transport_get_frames_output(self._impl)
2908 2909 @property
2910 - def frames_input(self):
2911 return pn_transport_get_frames_input(self._impl)
2912
2913 - def sasl(self):
2914 return SASL(self)
2915
2916 - def ssl(self, domain=None, session_details=None):
2917 # SSL factory (singleton for this transport) 2918 if not self._ssl: 2919 self._ssl = SSL(self, domain, session_details) 2920 return self._ssl
2921 2922 @property
2923 - def condition(self):
2924 return cond2obj(pn_transport_condition(self._impl))
2925 2926 @property
2927 - def connection(self):
2928 return Connection.wrap(pn_transport_connection(self._impl))
2929
2930 -class SASLException(TransportException):
2931 pass
2932
2933 -class SASL(Wrapper):
2934 2935 OK = PN_SASL_OK 2936 AUTH = PN_SASL_AUTH 2937 SYS = PN_SASL_SYS 2938 PERM = PN_SASL_PERM 2939 TEMP = PN_SASL_TEMP 2940 2941 @staticmethod
2942 - def extended():
2943 return pn_sasl_extended()
2944
2945 - def __init__(self, transport):
2946 Wrapper.__init__(self, transport._impl, pn_transport_attachments) 2947 self._sasl = pn_sasl(transport._impl)
2948
2949 - def _check(self, err):
2950 if err < 0: 2951 exc = EXCEPTIONS.get(err, SASLException) 2952 raise exc("[%s]" % (err)) 2953 else: 2954 return err
2955 2956 @property
2957 - def user(self):
2958 return pn_sasl_get_user(self._sasl)
2959 2960 @property
2961 - def mech(self):
2962 return pn_sasl_get_mech(self._sasl)
2963 2964 @property
2965 - def outcome(self):
2966 outcome = pn_sasl_outcome(self._sasl) 2967 if outcome == PN_SASL_NONE: 2968 return None 2969 else: 2970 return outcome
2971
2972 - def allowed_mechs(self, mechs):
2973 pn_sasl_allowed_mechs(self._sasl, unicode2utf8(mechs))
2974
2975 - def _get_allow_insecure_mechs(self):
2976 return pn_sasl_get_allow_insecure_mechs(self._sasl)
2977
2978 - def _set_allow_insecure_mechs(self, insecure):
2979 pn_sasl_set_allow_insecure_mechs(self._sasl, insecure)
2980 2981 allow_insecure_mechs = property(_get_allow_insecure_mechs, _set_allow_insecure_mechs, 2982 doc=""" 2983 Allow unencrypted cleartext passwords (PLAIN mech) 2984 """) 2985
2986 - def done(self, outcome):
2987 pn_sasl_done(self._sasl, outcome)
2988
2989 - def config_name(self, name):
2990 pn_sasl_config_name(self._sasl, name)
2991
2992 - def config_path(self, path):
2993 pn_sasl_config_path(self._sasl, path)
2994
2995 -class SSLException(TransportException):
2996 pass
2997
2998 -class SSLUnavailable(SSLException):
2999 pass
3000
3001 -class SSLDomain(object):
3002 3003 MODE_CLIENT = PN_SSL_MODE_CLIENT 3004 MODE_SERVER = PN_SSL_MODE_SERVER 3005 VERIFY_PEER = PN_SSL_VERIFY_PEER 3006 VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME 3007 ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER 3008
3009 - def __init__(self, mode):
3010 self._domain = pn_ssl_domain(mode) 3011 if self._domain is None: 3012 raise SSLUnavailable()
3013
3014 - def _check(self, err):
3015 if err < 0: 3016 exc = EXCEPTIONS.get(err, SSLException) 3017 raise exc("SSL failure.") 3018 else: 3019 return err
3020
3021 - def set_credentials(self, cert_file, key_file, password):
3022 return self._check( pn_ssl_domain_set_credentials(self._domain, 3023 cert_file, key_file, 3024 password) )
3025 - def set_trusted_ca_db(self, certificate_db):
3026 return self._check( pn_ssl_domain_set_trusted_ca_db(self._domain, 3027 certificate_db) )
3028 - def set_peer_authentication(self, verify_mode, trusted_CAs=None):
3029 return self._check( pn_ssl_domain_set_peer_authentication(self._domain, 3030 verify_mode, 3031 trusted_CAs) )
3032
3033 - def allow_unsecured_client(self):
3034 return self._check( pn_ssl_domain_allow_unsecured_client(self._domain) )
3035
3036 - def __del__(self):
3037 pn_ssl_domain_free(self._domain)
3038
3039 -class SSL(object):
3040 3041 @staticmethod
3042 - def present():
3043 return pn_ssl_present()
3044
3045 - def _check(self, err):
3046 if err < 0: 3047 exc = EXCEPTIONS.get(err, SSLException) 3048 raise exc("SSL failure.") 3049 else: 3050 return err
3051
3052 - def __new__(cls, transport, domain, session_details=None):
3053 """Enforce a singleton SSL object per Transport""" 3054 if transport._ssl: 3055 # unfortunately, we've combined the allocation and the configuration in a 3056 # single step. So catch any attempt by the application to provide what 3057 # may be a different configuration than the original (hack) 3058 ssl = transport._ssl 3059 if (domain and (ssl._domain is not domain) or 3060 session_details and (ssl._session_details is not session_details)): 3061 raise SSLException("Cannot re-configure existing SSL object!") 3062 else: 3063 obj = super(SSL, cls).__new__(cls) 3064 obj._domain = domain 3065 obj._session_details = session_details 3066 session_id = None 3067 if session_details: 3068 session_id = session_details.get_session_id() 3069 obj._ssl = pn_ssl( transport._impl ) 3070 if obj._ssl is None: 3071 raise SSLUnavailable() 3072 if domain: 3073 pn_ssl_init( obj._ssl, domain._domain, session_id ) 3074 transport._ssl = obj 3075 return transport._ssl
3076
3077 - def cipher_name(self):
3078 rc, name = pn_ssl_get_cipher_name( self._ssl, 128 ) 3079 if rc: 3080 return name 3081 return None
3082
3083 - def protocol_name(self):
3084 rc, name = pn_ssl_get_protocol_name( self._ssl, 128 ) 3085 if rc: 3086 return name 3087 return None
3088 3089 SHA1 = PN_SSL_SHA1 3090 SHA256 = PN_SSL_SHA256 3091 SHA512 = PN_SSL_SHA512 3092 MD5 = PN_SSL_MD5 3093 3094 CERT_COUNTRY_NAME = PN_SSL_CERT_SUBJECT_COUNTRY_NAME 3095 CERT_STATE_OR_PROVINCE = PN_SSL_CERT_SUBJECT_STATE_OR_PROVINCE 3096 CERT_CITY_OR_LOCALITY = PN_SSL_CERT_SUBJECT_CITY_OR_LOCALITY 3097 CERT_ORGANIZATION_NAME = PN_SSL_CERT_SUBJECT_ORGANIZATION_NAME 3098 CERT_ORGANIZATION_UNIT = PN_SSL_CERT_SUBJECT_ORGANIZATION_UNIT 3099 CERT_COMMON_NAME = PN_SSL_CERT_SUBJECT_COMMON_NAME 3100
3101 - def get_cert_subject_subfield(self, subfield_name):
3102 subfield_value = pn_ssl_get_remote_subject_subfield(self._ssl, subfield_name) 3103 return subfield_value
3104
3105 - def get_cert_subject(self):
3106 subject = pn_ssl_get_remote_subject(self._ssl) 3107 return subject
3108
3110 # Pass in an unhandled enum 3111 return self.get_cert_subject_subfield(10)
3112 3113 # Convenience functions for obtaining the subfields of the subject field.
3114 - def get_cert_common_name(self):
3116
3117 - def get_cert_organization(self):
3119
3120 - def get_cert_organization_unit(self):
3122
3123 - def get_cert_locality_or_city(self):
3125
3126 - def get_cert_country(self):
3128
3129 - def get_cert_state_or_province(self):
3131
3132 - def get_cert_fingerprint(self, fingerprint_length, digest_name):
3133 rc, fingerprint_str = pn_ssl_get_cert_fingerprint(self._ssl, fingerprint_length, digest_name) 3134 if rc == PN_OK: 3135 return fingerprint_str 3136 return None
3137 3138 # Convenience functions for obtaining fingerprint for specific hashing algorithms
3140 return self.get_cert_fingerprint(41, 10)
3141
3142 - def get_cert_fingerprint_sha1(self):
3143 return self.get_cert_fingerprint(41, SSL.SHA1)
3144
3146 # sha256 produces a fingerprint that is 64 characters long 3147 return self.get_cert_fingerprint(65, SSL.SHA256)
3148
3150 # sha512 produces a fingerprint that is 128 characters long 3151 return self.get_cert_fingerprint(129, SSL.SHA512)
3152
3153 - def get_cert_fingerprint_md5(self):
3154 return self.get_cert_fingerprint(33, SSL.MD5)
3155 3156 @property
3157 - def remote_subject(self):
3158 return pn_ssl_get_remote_subject( self._ssl )
3159 3160 RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN 3161 RESUME_NEW = PN_SSL_RESUME_NEW 3162 RESUME_REUSED = PN_SSL_RESUME_REUSED 3163
3164 - def resume_status(self):
3165 return pn_ssl_resume_status( self._ssl )
3166
3167 - def _set_peer_hostname(self, hostname):
3168 self._check(pn_ssl_set_peer_hostname( self._ssl, unicode2utf8(hostname) ))
3169 - def _get_peer_hostname(self):
3170 err, name = pn_ssl_get_peer_hostname( self._ssl, 1024 ) 3171 self._check(err) 3172 return utf82unicode(name)
3173 peer_hostname = property(_get_peer_hostname, _set_peer_hostname, 3174 doc=""" 3175 Manage the expected name of the remote peer. Used to authenticate the remote. 3176 """)
3177
3178 3179 -class SSLSessionDetails(object):
3180 """ Unique identifier for the SSL session. Used to resume previous session on a new 3181 SSL connection. 3182 """ 3183
3184 - def __init__(self, session_id):
3185 self._session_id = session_id
3186
3187 - def get_session_id(self):
3188 return self._session_id
3189 3190 3191 wrappers = { 3192 "pn_void": lambda x: pn_void2py(x), 3193 "pn_pyref": lambda x: pn_void2py(x), 3194 "pn_connection": lambda x: Connection.wrap(pn_cast_pn_connection(x)), 3195 "pn_session": lambda x: Session.wrap(pn_cast_pn_session(x)), 3196 "pn_link": lambda x: Link.wrap(pn_cast_pn_link(x)), 3197 "pn_delivery": lambda x: Delivery.wrap(pn_cast_pn_delivery(x)), 3198 "pn_transport": lambda x: Transport.wrap(pn_cast_pn_transport(x)), 3199 "pn_selectable": lambda x: Selectable.wrap(pn_cast_pn_selectable(x)) 3200 }
3201 3202 -class Collector:
3203
3204 - def __init__(self):
3205 self._impl = pn_collector()
3206
3207 - def put(self, obj, etype):
3208 pn_collector_put(self._impl, PN_PYREF, pn_py2void(obj), etype.number)
3209
3210 - def peek(self):
3211 return Event.wrap(pn_collector_peek(self._impl))
3212
3213 - def pop(self):
3214 ev = self.peek() 3215 pn_collector_pop(self._impl)
3216
3217 - def __del__(self):
3218 pn_collector_free(self._impl) 3219 del self._impl
3220 3221 if "TypeExtender" not in globals():
3222 - class TypeExtender:
3223 - def __init__(self, number):
3224 self.number = number
3225 - def next(self):
3226 try: 3227 return self.number 3228 finally: 3229 self.number += 1
3230
3231 -class EventType(object):
3232 3233 _lock = threading.Lock() 3234 _extended = TypeExtender(10000) 3235 TYPES = {} 3236
3237 - def __init__(self, name=None, number=None, method=None):
3238 if name is None and number is None: 3239 raise TypeError("extended events require a name") 3240 try: 3241 self._lock.acquire() 3242 if name is None: 3243 name = pn_event_type_name(number) 3244 3245 if number is None: 3246 number = self._extended.next() 3247 3248 if method is None: 3249 method = "on_%s" % name 3250 3251 self.name = name 3252 self.number = number 3253 self.method = method 3254 3255 self.TYPES[number] = self 3256 finally: 3257 self._lock.release()
3258
3259 - def __repr__(self):
3260 return self.name
3261
3262 -def dispatch(handler, method, *args):
3263 m = getattr(handler, method, None) 3264 if m: 3265 return m(*args) 3266 elif hasattr(handler, "on_unhandled"): 3267 return handler.on_unhandled(method, *args)
3268
3269 -class EventBase(object):
3270
3271 - def __init__(self, clazz, context, type):
3272 self.clazz = clazz 3273 self.context = context 3274 self.type = type
3275
3276 - def dispatch(self, handler):
3277 return dispatch(handler, self.type.method, self)
3278
3279 -def _none(x): return None
3280 3281 DELEGATED = Constant("DELEGATED")
3282 3283 -def _core(number, method):
3284 return EventType(number=number, method=method)
3285
3286 -class Event(Wrapper, EventBase):
3287 3288 REACTOR_INIT = _core(PN_REACTOR_INIT, "on_reactor_init") 3289 REACTOR_QUIESCED = _core(PN_REACTOR_QUIESCED, "on_reactor_quiesced") 3290 REACTOR_FINAL = _core(PN_REACTOR_FINAL, "on_reactor_final") 3291 3292 TIMER_TASK = _core(PN_TIMER_TASK, "on_timer_task") 3293 3294 CONNECTION_INIT = _core(PN_CONNECTION_INIT, "on_connection_init") 3295 CONNECTION_BOUND = _core(PN_CONNECTION_BOUND, "on_connection_bound") 3296 CONNECTION_UNBOUND = _core(PN_CONNECTION_UNBOUND, "on_connection_unbound") 3297 CONNECTION_LOCAL_OPEN = _core(PN_CONNECTION_LOCAL_OPEN, "on_connection_local_open") 3298 CONNECTION_LOCAL_CLOSE = _core(PN_CONNECTION_LOCAL_CLOSE, "on_connection_local_close") 3299 CONNECTION_REMOTE_OPEN = _core(PN_CONNECTION_REMOTE_OPEN, "on_connection_remote_open") 3300 CONNECTION_REMOTE_CLOSE = _core(PN_CONNECTION_REMOTE_CLOSE, "on_connection_remote_close") 3301 CONNECTION_FINAL = _core(PN_CONNECTION_FINAL, "on_connection_final") 3302 3303 SESSION_INIT = _core(PN_SESSION_INIT, "on_session_init") 3304 SESSION_LOCAL_OPEN = _core(PN_SESSION_LOCAL_OPEN, "on_session_local_open") 3305 SESSION_LOCAL_CLOSE = _core(PN_SESSION_LOCAL_CLOSE, "on_session_local_close") 3306 SESSION_REMOTE_OPEN = _core(PN_SESSION_REMOTE_OPEN, "on_session_remote_open") 3307 SESSION_REMOTE_CLOSE = _core(PN_SESSION_REMOTE_CLOSE, "on_session_remote_close") 3308 SESSION_FINAL = _core(PN_SESSION_FINAL, "on_session_final") 3309 3310 LINK_INIT = _core(PN_LINK_INIT, "on_link_init") 3311 LINK_LOCAL_OPEN = _core(PN_LINK_LOCAL_OPEN, "on_link_local_open") 3312 LINK_LOCAL_CLOSE = _core(PN_LINK_LOCAL_CLOSE, "on_link_local_close") 3313 LINK_LOCAL_DETACH = _core(PN_LINK_LOCAL_DETACH, "on_link_local_detach") 3314 LINK_REMOTE_OPEN = _core(PN_LINK_REMOTE_OPEN, "on_link_remote_open") 3315 LINK_REMOTE_CLOSE = _core(PN_LINK_REMOTE_CLOSE, "on_link_remote_close") 3316 LINK_REMOTE_DETACH = _core(PN_LINK_REMOTE_DETACH, "on_link_remote_detach") 3317 LINK_FLOW = _core(PN_LINK_FLOW, "on_link_flow") 3318 LINK_FINAL = _core(PN_LINK_FINAL, "on_link_final") 3319 3320 DELIVERY = _core(PN_DELIVERY, "on_delivery") 3321 3322 TRANSPORT = _core(PN_TRANSPORT, "on_transport") 3323 TRANSPORT_ERROR = _core(PN_TRANSPORT_ERROR, "on_transport_error") 3324 TRANSPORT_HEAD_CLOSED = _core(PN_TRANSPORT_HEAD_CLOSED, "on_transport_head_closed") 3325 TRANSPORT_TAIL_CLOSED = _core(PN_TRANSPORT_TAIL_CLOSED, "on_transport_tail_closed") 3326 TRANSPORT_CLOSED = _core(PN_TRANSPORT_CLOSED, "on_transport_closed") 3327 3328 SELECTABLE_INIT = _core(PN_SELECTABLE_INIT, "on_selectable_init") 3329 SELECTABLE_UPDATED = _core(PN_SELECTABLE_UPDATED, "on_selectable_updated") 3330 SELECTABLE_READABLE = _core(PN_SELECTABLE_READABLE, "on_selectable_readable") 3331 SELECTABLE_WRITABLE = _core(PN_SELECTABLE_WRITABLE, "on_selectable_writable") 3332 SELECTABLE_EXPIRED = _core(PN_SELECTABLE_EXPIRED, "on_selectable_expired") 3333 SELECTABLE_ERROR = _core(PN_SELECTABLE_ERROR, "on_selectable_error") 3334 SELECTABLE_FINAL = _core(PN_SELECTABLE_FINAL, "on_selectable_final") 3335 3336 @staticmethod
3337 - def wrap(impl, number=None):
3338 if impl is None: 3339 return None 3340 3341 if number is None: 3342 number = pn_event_type(impl) 3343 3344 event = Event(impl, number) 3345 3346 # check for an application defined ApplicationEvent and return that. This 3347 # avoids an expensive wrap operation invoked by event.context 3348 if pn_event_class(impl) == PN_PYREF and \ 3349 isinstance(event.context, EventBase): 3350 return event.context 3351 else: 3352 return event
3353
3354 - def __init__(self, impl, number):
3355 Wrapper.__init__(self, impl, pn_event_attachments) 3356 self.__dict__["type"] = EventType.TYPES[number]
3357
3358 - def _init(self):
3359 pass
3360
3361 - def copy(self):
3362 copy = pn_event_copy(self._impl) 3363 return Event.wrap(copy)
3364 3365 @property
3366 - def clazz(self):
3367 cls = pn_event_class(self._impl) 3368 if cls: 3369 return pn_class_name(cls) 3370 else: 3371 return None
3372 3373 @property
3374 - def root(self):
3375 return WrappedHandler.wrap(pn_event_root(self._impl))
3376 3377 @property
3378 - def context(self):
3379 """Returns the context object associated with the event. The type of this depend on the type of event.""" 3380 return wrappers[self.clazz](pn_event_context(self._impl))
3381
3382 - def dispatch(self, handler, type=None):
3383 type = type or self.type 3384 if isinstance(handler, WrappedHandler): 3385 pn_handler_dispatch(handler._impl, self._impl, type.number) 3386 else: 3387 result = dispatch(handler, type.method, self) 3388 if result != DELEGATED and hasattr(handler, "handlers"): 3389 for h in handler.handlers: 3390 self.dispatch(h, type)
3391 3392 3393 @property
3394 - def reactor(self):
3395 """Returns the reactor associated with the event.""" 3396 return wrappers.get("pn_reactor", _none)(pn_event_reactor(self._impl))
3397
3398 - def __getattr__(self, name):
3399 r = self.reactor 3400 if r and hasattr(r, 'subclass') and r.subclass.__name__.lower() == name: 3401 return r 3402 else: 3403 return super(Event, self).__getattr__(name)
3404 3405 @property
3406 - def transport(self):
3407 """Returns the transport associated with the event, or null if none is associated with it.""" 3408 return Transport.wrap(pn_event_transport(self._impl))
3409 3410 @property
3411 - def connection(self):
3412 """Returns the connection associated with the event, or null if none is associated with it.""" 3413 return Connection.wrap(pn_event_connection(self._impl))
3414 3415 @property
3416 - def session(self):
3417 """Returns the session associated with the event, or null if none is associated with it.""" 3418 return Session.wrap(pn_event_session(self._impl))
3419 3420 @property 3424 3425 @property
3426 - def sender(self):
3427 """Returns the sender link associated with the event, or null if 3428 none is associated with it. This is essentially an alias for 3429 link(), that does an additional checkon the type of the 3430 link.""" 3431 l = self.link 3432 if l and l.is_sender: 3433 return l 3434 else: 3435 return None
3436 3437 @property
3438 - def receiver(self):
3439 """Returns the receiver link associated with the event, or null if 3440 none is associated with it. This is essentially an alias for 3441 link(), that does an additional checkon the type of the link.""" 3442 l = self.link 3443 if l and l.is_receiver: 3444 return l 3445 else: 3446 return None
3447 3448 @property
3449 - def delivery(self):
3450 """Returns the delivery associated with the event, or null if none is associated with it.""" 3451 return Delivery.wrap(pn_event_delivery(self._impl))
3452
3453 - def __repr__(self):
3454 return "%s(%s)" % (self.type, self.context)
3455
3456 -class LazyHandlers(object):
3457 - def __get__(self, obj, clazz):
3458 if obj is None: 3459 return self 3460 ret = [] 3461 obj.__dict__['handlers'] = ret 3462 return ret
3463
3464 -class Handler(object):
3465 handlers = LazyHandlers() 3466
3467 - def on_unhandled(self, method, *args):
3468 pass
3469
3470 -class _cadapter:
3471
3472 - def __init__(self, handler, on_error=None):
3473 self.handler = handler 3474 self.on_error = on_error
3475
3476 - def dispatch(self, cevent, ctype):
3477 ev = Event.wrap(cevent, ctype) 3478 ev.dispatch(self.handler)
3479
3480 - def exception(self, exc, val, tb):
3481 if self.on_error is None: 3482 _compat.raise_(exc, val, tb) 3483 else: 3484 self.on_error((exc, val, tb))
3485
3486 -class WrappedHandlersChildSurrogate:
3487 - def __init__(self, delegate):
3488 self.handlers = [] 3489 self.delegate = weakref.ref(delegate)
3490
3491 - def on_unhandled(self, method, event):
3492 delegate = self.delegate() 3493 if delegate: 3494 dispatch(delegate, method, event)
3495
3496 3497 -class WrappedHandlersProperty(object):
3498 - def __get__(self, obj, clazz):
3499 if obj is None: 3500 return None 3501 return self.surrogate(obj).handlers
3502
3503 - def __set__(self, obj, value):
3504 self.surrogate(obj).handlers = value
3505
3506 - def surrogate(self, obj):
3507 key = "_surrogate" 3508 objdict = obj.__dict__ 3509 surrogate = objdict.get(key, None) 3510 if surrogate is None: 3511 objdict[key] = surrogate = WrappedHandlersChildSurrogate(obj) 3512 obj.add(surrogate) 3513 return surrogate
3514
3515 -class WrappedHandler(Wrapper):
3516 3517 handlers = WrappedHandlersProperty() 3518 3519 @classmethod
3520 - def wrap(cls, impl, on_error=None):
3521 if impl is None: 3522 return None 3523 else: 3524 handler = cls(impl) 3525 handler.__dict__["on_error"] = on_error 3526 return handler
3527
3528 - def __init__(self, impl_or_constructor):
3529 Wrapper.__init__(self, impl_or_constructor) 3530 if list(self.__class__.__mro__).index(WrappedHandler) > 1: 3531 # instantiate the surrogate 3532 self.handlers.extend([])
3533
3534 - def _on_error(self, info):
3535 on_error = getattr(self, "on_error", None) 3536 if on_error is None: 3537 _compat.raise_(info[0], info[1], info[2]) 3538 else: 3539 on_error(info)
3540
3541 - def add(self, handler, on_error=None):
3542 if handler is None: return 3543 if on_error is None: on_error = self._on_error 3544 impl = _chandler(handler, on_error) 3545 pn_handler_add(self._impl, impl) 3546 pn_decref(impl)
3547
3548 - def clear(self):
3549 pn_handler_clear(self._impl)
3550
3551 -def _chandler(obj, on_error=None):
3552 if obj is None: 3553 return None 3554 elif isinstance(obj, WrappedHandler): 3555 impl = obj._impl 3556 pn_incref(impl) 3557 return impl 3558 else: 3559 return pn_pyhandler(_cadapter(obj, on_error))
3560
3561 -class Url(object):
3562 """ 3563 Simple URL parser/constructor, handles URLs of the form: 3564 3565 <scheme>://<user>:<password>@<host>:<port>/<path> 3566 3567 All components can be None if not specified in the URL string. 3568 3569 The port can be specified as a service name, e.g. 'amqp' in the 3570 URL string but Url.port always gives the integer value. 3571 3572 Warning: The placement of user and password in URLs is not 3573 recommended. It can result in credentials leaking out in program 3574 logs. Use connection configuration attributes instead. 3575 3576 @ivar scheme: Url scheme e.g. 'amqp' or 'amqps' 3577 @ivar user: Username 3578 @ivar password: Password 3579 @ivar host: Host name, ipv6 literal or ipv4 dotted quad. 3580 @ivar port: Integer port. 3581 @ivar host_port: Returns host:port 3582 """ 3583 3584 AMQPS = "amqps" 3585 AMQP = "amqp" 3586
3587 - class Port(int):
3588 """An integer port number that can be constructed from a service name string""" 3589
3590 - def __new__(cls, value):
3591 """@param value: integer port number or string service name.""" 3592 port = super(Url.Port, cls).__new__(cls, cls._port_int(value)) 3593 setattr(port, 'name', str(value)) 3594 return port
3595
3596 - def __eq__(self, x): return str(self) == x or int(self) == x
3597 - def __ne__(self, x): return not self == x
3598 - def __str__(self): return str(self.name)
3599 3600 @staticmethod
3601 - def _port_int(value):
3602 """Convert service, an integer or a service name, into an integer port number.""" 3603 try: 3604 return int(value) 3605 except ValueError: 3606 try: 3607 return socket.getservbyname(value) 3608 except socket.error: 3609 # Not every system has amqp/amqps defined as a service 3610 if value == Url.AMQPS: return 5671 3611 elif value == Url.AMQP: return 5672 3612 else: 3613 raise ValueError("Not a valid port number or service name: '%s'" % value)
3614
3615 - def __init__(self, url=None, defaults=True, **kwargs):
3616 """ 3617 @param url: URL string to parse. 3618 @param defaults: If true, fill in missing default values in the URL. 3619 If false, you can fill them in later by calling self.defaults() 3620 @param kwargs: scheme, user, password, host, port, path. 3621 If specified, replaces corresponding part in url string. 3622 """ 3623 if url: 3624 self._url = pn_url_parse(unicode2utf8(str(url))) 3625 if not self._url: raise ValueError("Invalid URL '%s'" % url) 3626 else: 3627 self._url = pn_url() 3628 for k in kwargs: # Let kwargs override values parsed from url 3629 getattr(self, k) # Check for invalid kwargs 3630 setattr(self, k, kwargs[k]) 3631 if defaults: self.defaults()
3632
3633 - class PartDescriptor(object):
3634 - def __init__(self, part):
3635 self.getter = globals()["pn_url_get_%s" % part] 3636 self.setter = globals()["pn_url_set_%s" % part]
3637 - def __get__(self, obj, type=None): return self.getter(obj._url)
3638 - def __set__(self, obj, value): return self.setter(obj._url, str(value))
3639 3640 scheme = PartDescriptor('scheme') 3641 username = PartDescriptor('username') 3642 password = PartDescriptor('password') 3643 host = PartDescriptor('host') 3644 path = PartDescriptor('path') 3645
3646 - def _get_port(self):
3647 portstr = pn_url_get_port(self._url) 3648 return portstr and Url.Port(portstr)
3649
3650 - def _set_port(self, value):
3651 if value is None: pn_url_set_port(self._url, None) 3652 else: pn_url_set_port(self._url, str(Url.Port(value)))
3653 3654 port = property(_get_port, _set_port) 3655
3656 - def __str__(self): return pn_url_str(self._url)
3657
3658 - def __repr__(self):
3659 return "Url(%s://%s/%s)" % (self.scheme, self.host, self.path)
3660
3661 - def __eq__(self, x): return str(self) == str(x)
3662 - def __ne__(self, x): return not self == x
3663
3664 - def __del__(self):
3665 pn_url_free(self._url); 3666 del self._url
3667
3668 - def defaults(self):
3669 """ 3670 Fill in missing values (scheme, host or port) with defaults 3671 @return: self 3672 """ 3673 self.scheme = self.scheme or self.AMQP 3674 self.host = self.host or '0.0.0.0' 3675 self.port = self.port or self.Port(self.scheme) 3676 return self
3677 3678 __all__ = [ 3679 "API_LANGUAGE", 3680 "IMPLEMENTATION_LANGUAGE", 3681 "ABORTED", 3682 "ACCEPTED", 3683 "PENDING", 3684 "REJECTED", 3685 "RELEASED", 3686 "MODIFIED", 3687 "SETTLED", 3688 "UNDESCRIBED", 3689 "Array", 3690 "Collector", 3691 "Condition", 3692 "Connection", 3693 "Data", 3694 "Delivery", 3695 "Disposition", 3696 "Described", 3697 "Endpoint", 3698 "Event", 3699 "EventType", 3700 "Handler", 3701 "Link", 3702 "Message", 3703 "MessageException", 3704 "ProtonException", 3705 "VERSION_MAJOR", 3706 "VERSION_MINOR", 3707 "Receiver", 3708 "SASL", 3709 "Sender", 3710 "Session", 3711 "SessionException", 3712 "SSL", 3713 "SSLDomain", 3714 "SSLSessionDetails", 3715 "SSLUnavailable", 3716 "SSLException", 3717 "Terminus", 3718 "Timeout", 3719 "Interrupt", 3720 "Transport", 3721 "TransportException", 3722 "Url", 3723 "char", 3724 "dispatch", 3725 "symbol", 3726 "timestamp", 3727 "ulong", 3728 "byte", 3729 "short", 3730 "int32", 3731 "ubyte", 3732 "ushort", 3733 "uint", 3734 "float32", 3735 "decimal32", 3736 "decimal64", 3737 "decimal128" 3738 ] 3739