Package wtf :: Package opi :: Module listener
[hide private]
[frames] | no frames]

Source Code for Module wtf.opi.listener

  1  # -*- coding: ascii -*- 
  2  u""" 
  3  :Copyright: 
  4   
  5   Copyright 2006 - 2014 
  6   Andr\xe9 Malo or his licensors, as applicable 
  7   
  8  :License: 
  9   
 10   Licensed under the Apache License, Version 2.0 (the "License"); 
 11   you may not use this file except in compliance with the License. 
 12   You may obtain a copy of the License at 
 13   
 14       http://www.apache.org/licenses/LICENSE-2.0 
 15   
 16   Unless required by applicable law or agreed to in writing, software 
 17   distributed under the License is distributed on an "AS IS" BASIS, 
 18   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 19   See the License for the specific language governing permissions and 
 20   limitations under the License. 
 21   
 22  ================= 
 23   Listener Socket 
 24  ================= 
 25   
 26  All listening sockets are represented by this single abstraction. 
 27   
 28  :Variables: 
 29    `AF_INET` : ``int`` 
 30      INET address family 
 31   
 32    `AF_INET6` : ``int`` 
 33      INET6 address family (``None`` if not available) 
 34   
 35    `AF_UNIX` : ``int`` 
 36      UNIX address family (``None`` if not available) 
 37  """ 
 38  __author__ = u"Andr\xe9 Malo" 
 39  __docformat__ = "restructuredtext en" 
 40   
 41  import errno as _errno 
 42  import os as _os 
 43  import re as _re 
 44  import socket as _socket 
 45  import sys as _sys 
 46  import warnings as _warnings 
 47   
 48  from wtf import Error, WtfWarning 
 49  from wtf import osutil as _osutil 
 50  from wtf.config import ConfigurationError 
 51   
 52  AF_INET = _socket.AF_INET 
 53  AF_INET6 = getattr(_socket, "AF_INET6", None) 
 54  AF_UNIX = getattr(_socket, "AF_UNIX", None) 
 55   
 56   
class ShutdownWarning(WtfWarning):
    """
    Socket shutdown failures

    Issued through ``warnings.warn`` when closing a listener socket raises
    ``socket.error`` or ``OSError``; the failure is reported instead of
    being propagated.
    """
59 60
class ListenerWarning(WtfWarning):
    """
    Duplicate listener detected

    Issued through ``warnings.warn`` when a configured listen address is
    already covered by another listener; the duplicate socket is dropped.
    """
63 64
class SocketError(Error):
    """
    Socket error

    Base class of the socket related exceptions of this module. The
    `Acceptor` raises it for ``socket.error`` failures it does not ignore.
    """
67 68
class SocketTimeout(SocketError):
    """
    Socket timeout

    Raised by the `Acceptor` when no listener became ready within the
    requested timeout.
    """
71 72
class SocketPollError(SocketError):
    """
    Socket poll error

    Raised by the `Acceptor` when polling the listeners fails for any
    reason other than ``EINTR``.
    """
75 76
77 -class ListenerSocket(object):
78 """ 79 Abstraction to the listener socket 80 81 This actually can contain more than one actual socket, but provides 82 an interface as it was one. 83 84 :CVariables: 85 `_TYPES` : ``tuple`` 86 Supported socket types and configuration patterns 87 (``(('name', (regex, ...)), ...)``) 88 89 :IVariables: 90 `_sockets` : ``list`` 91 List of actual sockets (``[socket, ...]``) 92 """ 93 _TYPES = ( 94 (u'tcp', ( 95 _re.compile(ur'(?:(?P<ip>[^:]+|\[[^\]]+]|\*):)?(?P<port>\d+)$'), 96 )), 97 (u'unix', ( 98 _re.compile(ur'(?P<path>.+)\((?P<perm>\d+)\)$'), 99 _re.compile(ur'(?P<path>.+)(?P<perm>)$'), 100 )), 101 ) 102 _sockets = None 103
104 - def __init__(self, listen, basedir=None):
105 """ 106 Initialization 107 108 :Parameters: 109 `listen` : iterable 110 The addresses to listen on, may not be empty 111 112 `basedir` : ``str`` 113 Basedir for relative paths 114 """ 115 listen = tuple(listen) 116 if not listen: 117 raise ConfigurationError("No listeners configured") 118 119 # The way some OS work require us to follow a two-step-approach here: 120 # First we "prepare" the sockets by determining the details for 121 # every socket. Then we reorder them, so we can filter out dupes 122 # or includes. Includes are bindings which are already handled 123 # by another binding, like localhost:1206 is included in *:1206 124 # A special, but related problem is the inclusion of IPv4 in IPv6. 125 msg = "Invalid listen configuration: %s" 126 self._sockets, kinds = [], dict(self._TYPES) 127 for bind in listen: 128 obind, fixed, tocheck = repr(bind), None, self._TYPES 129 if ':' in bind: 130 fixed = bind[:bind.find(':')].lower() 131 if fixed in kinds: 132 tocheck = (fixed, kinds[fixed]) 133 bind = bind[len(fixed) + 1:] 134 else: 135 fixed = None 136 for kind, rexs in tocheck: 137 if bind.startswith(kind + ':'): 138 fixed, bind = bind, bind[len(kind) + 1:] 139 for rex in rexs: 140 match = rex.match(bind) 141 if match: 142 break 143 else: 144 match = None 145 if match is not None: 146 method = getattr(self, "_setup_" + kind) 147 try: 148 method(match.group, basedir) 149 except ConfigurationError, e: 150 stre = str(e) 151 e = _sys.exc_info() 152 try: 153 raise e[0], (msg % obind) + ": " + stre, e[2] 154 finally: 155 del e 156 break 157 else: 158 raise ConfigurationError(msg % obind) 159 160 self.accept = self._finalize_listeners(msg)
161
162 - def _finalize_listeners(self, msg):
163 """ 164 Finalize the listening sockets 165 166 This method actually sets the sockets to the LISTEN state. 167 168 :Parameters: 169 - `msg`: Configuration error message template 170 171 :Types: 172 - `msg`: ``str`` 173 174 :return: Socket acceptor 175 :rtype: ``callable`` 176 177 :Exceptions: 178 - `ConfigurationError`: No listeners available 179 """ 180 if not self._sockets: 181 raise ConfigurationError("No listener sockets") 182 183 memory, toremove = {}, [] 184 for socket in sorted(self._sockets): 185 if socket.key() in memory or socket.anykey() in memory: 186 # Do not issue the warning on any-ipv4/ipv6 inclusion 187 if socket.key() != socket.anykey() or \ 188 memory[socket.key()] == socket.family(): 189 _warnings.warn("Duplicate listen: %s" % (socket.bindspec), 190 category=ListenerWarning) 191 toremove.append(socket) 192 continue 193 _osutil.close_on_exec(socket) 194 socket.setblocking(False) 195 try: 196 socket.bind() 197 socket.listen(_socket.SOMAXCONN) 198 except _socket.error, e: 199 stre = str(e) 200 e = _sys.exc_info() 201 try: 202 raise ConfigurationError, \ 203 (msg % socket.bindspec) + ": " + stre, e[2] 204 finally: 205 del e 206 else: 207 memory[socket.key()] = socket.family() 208 209 while toremove: 210 socket = toremove.pop() 211 self._sockets.remove(socket) 212 try: 213 socket.close() 214 except (_socket.error, OSError), e: 215 _warnings.warn("Socket shutdown problem: %s" % str(e), 216 category=ShutdownWarning) 217 218 return Acceptor(item.realsocket for item in self._sockets)
219
220 - def __del__(self):
221 self.close()
222
223 - def close(self):
224 """ Shutdown the sockets """ 225 sockets, self._sockets = self._sockets, None 226 if sockets is not None: 227 for socket in sockets: 228 try: 229 socket.close() 230 except (_socket.error, OSError), e: 231 _warnings.warn("Socket shutdown problem: %s" % str(e), 232 category=ShutdownWarning)
233
234 - def _setup_tcp(self, bind, basedir=None):
235 """ 236 Setup TCP/IP(v6) socket and append it to the global list 237 238 :Parameters: 239 - `bind`: Bind parameter accessor (``match.group``) 240 - `basedir`: Basedir for relative paths (unused) 241 242 :Types: 243 - `bind`: ``callable`` 244 - `basedir`: ``basestring`` 245 """ 246 obind = repr(bind(0)) 247 host, port, flags = bind(u'ip'), bind(u'port'), 0 248 port = int(port) 249 if not host or host == u'*': 250 host, flags = None, _socket.AI_PASSIVE 251 elif host.startswith(u'[') and host.endswith(u']'): 252 host = host[1:-1].encode('ascii') # IPv6 notation [xxx:xxx:xxx] 253 else: 254 host = host.encode('idna') 255 try: 256 adi = _socket.getaddrinfo(host, port, 257 _socket.AF_UNSPEC, _socket.SOCK_STREAM, 0, flags) 258 for family, stype, proto, _, bind in adi: 259 if not _socket.has_ipv6 and family == AF_INET6: 260 continue 261 262 try: 263 socket = _socket.socket(family, stype, proto) 264 except _socket.error, e: 265 if e[0] == _errno.EAFNOSUPPORT and host is None and \ 266 family == AF_INET6: 267 # grmpf. 268 # There are systems (e.g. linux) which emit 269 # IPv6 on ANY, even if they don't support it. 270 # Or is it the libc? Who cares anyway... 271 continue 272 raise 273 socket.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, 1) 274 self._sockets.append( 275 InetSocket(socket, obind, host, family, bind) 276 ) 277 except _socket.error: 278 e = _sys.exc_info() 279 try: 280 raise ConfigurationError, e[1], e[2] 281 finally: 282 del e
283
284 - def _setup_unix(self, bind, basedir=None):
285 """ 286 Setup UNIX domain socket 287 288 :Parameters: 289 - `bind`: Bind parameter accessor (``match.group``) 290 - `basedir`: Basedir for relative paths 291 292 :Types: 293 - `bind`: ``callable`` 294 - `basedir`: ``str`` 295 """ 296 if AF_UNIX is None: 297 raise ConfigurationError("UNIX domain sockets are not available") 298 299 obind = repr(bind(0)) 300 if bind(u'perm'): 301 try: 302 socket_perm = int(bind('perm'), 8) 303 except (TypeError, ValueError): 304 raise ConfigurationError("Invalid permission") 305 umask = 0777 & ~socket_perm 306 else: 307 umask = None 308 basedir = basedir or _os.getcwd() 309 if not isinstance(basedir, unicode): 310 basedir = basedir.decode(_sys.getfilesystemencoding()) 311 path = _os.path.normpath(_os.path.join( 312 basedir, bind(u'path') 313 )).encode(_sys.getfilesystemencoding()) 314 socket = _socket.socket(AF_UNIX, _socket.SOCK_STREAM) 315 self._sockets.append(UnixSocket(socket, obind, path, umask))
316 317
class SocketDecorator(object):
    """
    Socket decorating container

    Derive from this container in order to build new concrete containers.
    These containers are necessary for proper duplicate/warning/error
    handling, because we need some context for the socket. The socket
    ordering is also defined in these containers (via `__cmp__`).

    :See: `UnixSocket`, `InetSocket`

    :CVariables:
      - `_famcomp`: Index for address family comparisons

    :IVariables:
      - `realsocket`: The actual socket object
      - `bindspec`: The bind specification from the config

    :Types:
      - `_famcomp`: ``dict``
      - `realsocket`: ``socket.socket``
      - `bindspec`: ``str``
    """
    _famcomp = dict((fam, idx) for idx, fam in enumerate((
        AF_UNIX, AF_INET6, AF_INET
    )) if fam is not None)

    def __init__(self, socket, bindspec):
        """
        Initialization

        :Parameters:
          - `socket`: The socket object to decorate
          - `bindspec`: The bind specification from config

        :Types:
          - `socket`: ``socket.socket``
          - `bindspec`: ``str``
        """
        self.realsocket = socket
        self.bindspec = bindspec

    def __cmp__(self, other):
        """
        Compare 3-way with another object

        Comparison is done by the socket family index. If the other object
        is not a `SocketDecorator`, the ``id()`` s are compared.

        :Parameters:
          - `other`: The other object

        :Types:
          - `other`: `SocketDecorator`

        :return: Comparison result (``-1``, ``0``, ``1``) for `self` being
                 less, equal or greater than/to `other`
        :rtype: ``int``

        :Exceptions:
          - `NotImplementedError`: The socket family of either socket is not
            in the index
        """
        # Check against the base class, not ``self.__class__``: sibling
        # decorators (UNIX vs. INET) have to be ordered by family index,
        # too, otherwise mixed socket lists are sorted by object id.
        if not isinstance(other, SocketDecorator):
            return cmp(id(self), id(other))
        try:
            return cmp(
                self._famcomp[self.family()],
                self._famcomp[other.family()]
            )
        except KeyError:
            raise NotImplementedError()

    def __eq__(self, other):
        """
        Compare (2-way) by identity

        :Parameters:
          - `other`: The other object

        :Types:
          - `other`: `SocketDecorator`

        :return: Are the objects identical?
        :rtype: ``bool``
        """
        return id(self) == id(other)

    def __ne__(self, other):
        """
        Compare (2-way) by identity, inverse of `__eq__`

        Without this method ``!=`` would fall back to `__cmp__` and
        disagree with ``==``.

        :Parameters:
          - `other`: The other object

        :Types:
          - `other`: `SocketDecorator`

        :return: Are the objects different?
        :rtype: ``bool``
        """
        return id(self) != id(other)

    def __repr__(self):
        """
        String representation of the object (suitable for debugging)

        :return: The string representation
        :rtype: ``str``
        """
        return "<%s.%s fileno=%s, family=%s, key=%r>" % (
            self.__class__.__module__, self.__class__.__name__,
            self.fileno(), self.family(), self.key(),
        )

    def __del__(self):
        """ Destructor """
        self.close()

    def __getattr__(self, name):
        """
        Delegate all undefined symbol requests to the real socket

        :Parameters:
          - `name`: The symbol to look up

        :Types:
          - `name`: ``str``

        :Exceptions:
          - `AttributeError`: The symbol was not found
        """
        if name == 'realsocket':
            # Not initialized (yet). Looking it up on ``self`` again would
            # recurse endlessly.
            raise AttributeError(name)
        return getattr(self.realsocket, name)

    def bind(self):
        """ Bind the socket according to its bindspec """
        raise NotImplementedError()

    def family(self):
        """
        Determine the socket address family

        :return: The family
        :rtype: ``int``
        """
        raise NotImplementedError()

    def key(self):
        """
        Determine the key of the socket, derived from the bindspec

        This key can be considered a normalized version of the bindspec. It
        has to be hashable.

        :return: The key
        :rtype: any
        """
        raise NotImplementedError()

    def anykey(self):
        """
        Determine the key of the socket if bindspec would point to ANY

        :See: `key`

        :return: The key
        :rtype: any
        """
        raise NotImplementedError()
469 470
class UnixSocket(SocketDecorator):
    """
    Decorator for UNIX domain sockets

    :IVariables:
      - `_bound`: Was the socket bound to a path?
      - `_path`: The path to bind to
      - `_umask`: The umask to be set when binding to the path (maybe ``None``)
      - `_normpath`: The normalized path (symlinks resolved) (used as key)

    :Types:
      - `_bound`: ``bool``
      - `_path`: ``str``
      - `_umask`: ``int``
      - `_normpath`: ``str``
    """

    def __init__(self, socket, bindspec, path, umask):
        """
        Initialization

        :Parameters:
          - `socket`: The actual socket object
          - `bindspec`: Binding string from configuration
          - `path`: Path to bind to
          - `umask`: Umask to apply when binding to the path

        :Types:
          - `socket`: ``socket.socket``
          - `bindspec`: ``str``
          - `path`: ``str``
          - `umask`: ``int``
        """
        super(UnixSocket, self).__init__(socket, bindspec)
        self._bound, self._path, self._umask = False, path, umask
        self._normpath = _os.path.normpath(_os.path.realpath(path))

    def close(self):
        """
        Remove the socket path and close the file handle

        The path is only removed if this very socket was bound to it.
        A socket which was never bound (like a duplicate listener that is
        dropped again) must not remove the path of the socket actually
        listening there.
        """
        if self._bound:
            # Reset first, so a repeated close() (e.g. by the destructor)
            # does not unlink the path a second time
            self._bound = False
            _osutil.unlink_silent(self._path)
        self.realsocket.close()

    def bind(self):
        """
        Bind to the socket path

        A configured umask is applied only for the duration of the bind
        call. A leftover file at the path is removed before binding.
        """
        old_umask = None
        try:
            if self._umask is not None:
                old_umask = _os.umask(self._umask)
            _osutil.unlink_silent(self._path)
            self.realsocket.bind(self._path)
            # Only reached if the bind call did not raise
            self._bound = True
        finally:
            if old_umask is not None:
                _os.umask(old_umask)

    def family(self):
        """ Determine the socket family """
        return AF_UNIX

    def key(self):
        """ Determine the socket key """
        return self._normpath

    def anykey(self):
        """ Determine ANY key """
        return None
535 536
class InetSocket(SocketDecorator):
    """
    Decorator for TCP/IP(v6) sockets

    :IVariables:
      - `_bind`: bind value from ``getaddrinfo(3)``
      - `_host`: Hostname/IP (or ``None`` for ANY)
      - `_family`: socket family

    :Types:
      - `_bind`: ``tuple``
      - `_host`: ``str``
      - `_family`: ``int``
    """

    def __init__(self, socket, bindspec, host, family, bind):
        """
        Initialization

        :Parameters:
          - `socket`: Actual socket object
          - `bindspec`: Bind specification from config
          - `host`: Hostname/IP or ``None``
          - `family`: Socket family
          - `bind`: bind value from ``getaddrinfo(3)``

        :Types:
          - `socket`: ``socket.socket``
          - `bindspec`: ``str``
          - `host`: ``str``
          - `family`: ``int``
          - `bind`: ``tuple``
        """
        super(InetSocket, self).__init__(socket, bindspec)
        self._bind = bind
        self._host = host
        self._family = family

    def __cmp__(self, other):
        """
        Compare (3-way) to a different object

        The base class comparison decides first. On a tie the hosts are
        compared (ANY counts as empty string), then the bind value without
        its first item.
        """
        result = super(InetSocket, self).__cmp__(other)
        if not result:
            result = cmp(self._host or '', other._host or '')
        if not result:
            result = cmp(self._bind[1:], other._bind[1:])
        return result

    def bind(self):
        """ Bind the socket according to bindspec """
        self.realsocket.bind(self._bind)

    def family(self):
        """ Determine the socket family """
        return self._family

    def key(self):
        """ Determine the socket key (the ANY key if no host is set) """
        if self._host is not None:
            return (self._host, self._family, self._bind[1])
        return self.anykey()

    def anykey(self):
        """ Determine the socket ANY key """
        return (None, AF_INET, self._bind[1])
602 603
604 -class Acceptor(object):
605 """ Acceptor for multiple connections """ 606 _IGNOREFAIL = set(getattr(_errno, _name, None) for _name in """ 607 EINTR 608 ENOBUFS 609 EPROTO 610 ECONNABORTED 611 ECONNRESET 612 ETIMEDOUT 613 EHOSTUNREACH 614 ENETUNREACH 615 EAGAIN 616 EWOULDBLOCK 617 """.split()) 618 if None in _IGNOREFAIL: 619 _IGNOREFAIL.remove(None) 620
621 - def __init__(self, sockets):
622 """ 623 Initialization 624 625 :Parameters: 626 - `sockets`: List of sockets to poll 627 628 :Types: 629 - `sockets`: ``iterable`` 630 """ 631 import collections, select 632 try: 633 pollset = select.poll 634 except AttributeError: 635 pollset = _SelectAdapter() 636 else: 637 pollset = _PollAdapter() 638 self._fdmap = {} 639 for socket in sockets: 640 fd = socket.fileno() 641 pollset.add(fd) 642 self._fdmap[fd] = socket 643 self._set = pollset 644 self._backlog = collections.deque()
645
646 - def __call__(self, timeout=None):
647 """ 648 Accept a new connection 649 650 :Parameters: 651 - `timeout`: Timeout in seconds 652 653 :Types: 654 - `timeout`: ``float`` 655 656 :return: New socket and the peername 657 :rtype: ``tuple`` 658 659 :Exceptions: 660 - `SocketTimeout`: accept call timed out 661 - `SocketError`: An error occured while accepting the socket 662 """ 663 while True: 664 try: 665 sock, peer = self._accept(timeout) 666 except _socket.error, e: 667 if e[0] in self._IGNOREFAIL: 668 continue 669 e = _sys.exc_info() 670 try: 671 raise SocketError, e[1], e[2] 672 finally: 673 del e 674 _osutil.close_on_exec(sock.fileno()) 675 return sock, peer
676
677 - def _accept(self, timeout=None):
678 """ 679 Accept a connection 680 681 :Parameters: 682 - `timeout`: Timeout in seconds 683 684 :Types: 685 - `timeout`: ``float`` 686 687 :return: The new connection socket and the peername 688 :rtype: ``tuple`` 689 690 :Exceptions: 691 - `SocketTimeout`: accept call timed out 692 - `SocketPollError`: Error with poll call 693 - `socket.error`: Socket error 694 """ 695 backlog = self._backlog 696 if not backlog: 697 pollset, timeout_used = self._set, timeout 698 if timeout_used is None: 699 timeout_used = 1000 700 else: 701 timeout_used = int(timeout_used * 1000) 702 while True: 703 try: 704 ready = pollset.poll(timeout_used) 705 except pollset.error, e: 706 if e[0] == _errno.EINTR: 707 continue 708 e = _sys.exc_info() 709 try: 710 raise SocketPollError, e[1], e[2] 711 finally: 712 del e 713 if ready: 714 break 715 elif timeout is None: 716 continue 717 raise SocketTimeout(timeout) 718 backlog.extendleft(item[0] for item in ready) 719 return self._fdmap[backlog.pop()].accept()
720 721
722 -class _AdapterInterface(object):
723 """ 724 Adapter poll API to select implementation 725 726 :IVariables: 727 - `error`: Exception to catch on poll() 728 729 :Types: 730 - `error`: ``Exception`` 731 """ 732
733 - def __init__(self):
734 """ Initialization """
735
736 - def add(self, fd):
737 """ 738 Register a new file descriptor 739 740 :Parameters: 741 - `fd`: File descriptor to register 742 743 :Types: 744 - `fd`: ``int`` 745 746 :Exceptions: 747 - `ValueError`: Error while creating an integer out of `fd` 748 - `TypeError`: Error while creating an integer out of `fd` 749 """
750
751 - def remove(self, fd):
752 """ 753 Unregister a file descriptor 754 755 :Parameters: 756 - `fd`: File descriptor to unregister 757 758 :Types: 759 - `fd`: ``int`` 760 761 :Exceptions: 762 - `ValueError`: Error while creating an integer out of `fd` 763 - `TypeError`: Error while creating an integer out of `fd` 764 - `KeyError`: The descriptor was not registered before 765 """
766
767 - def poll(self, timeout=None):
768 """ 769 Poll the list of descriptors 770 771 :Parameters: 772 - `timeout`: Poll timeout in milliseconds 773 774 :Types: 775 - `timeout`: ``int`` 776 777 :return: List of (descriptor, event) tuples, event is useless, though 778 :rtype: ``list`` 779 780 :Exceptions: 781 - `self.error`: Select error occured 782 """
783 784
class _SelectAdapter(object):
    """
    Adapter providing the poll API on top of ``select.select``

    :IVariables:
      - `error`: Exception to catch on poll()
      - `_rfds`: Registered file descriptors

    :Types:
      - `error`: ``Exception``
      - `_rfds`: ``set``
    """
    __implements__ = [_AdapterInterface]

    def __init__(self):
        """ Initialization """
        import select
        self._rfds = set()
        self.error = select.error

    def add(self, fd):
        """ Register a new file descriptor """
        self._rfds.add(int(fd))

    def remove(self, fd):
        """ Unregister a file descriptor (``KeyError`` if unknown) """
        self._rfds.remove(int(fd))

    def poll(self, timeout=None):
        """
        Poll the registered descriptors for readability

        :Parameters:
          - `timeout`: Poll timeout in milliseconds

        :Types:
          - `timeout`: ``int``

        :return: List of (descriptor, 0) tuples
        :rtype: ``list``
        """
        import select
        if timeout is None:
            seconds = None
        else:
            # select() takes seconds
            seconds = float(timeout) / 1000.0
        readable, _, _ = select.select(self._rfds, (), (), seconds)
        return [(fd, 0) for fd in readable]
805 806
class _PollAdapter(object):
    """
    Adapter wrapping a ``select.poll`` object

    :IVariables:
      - `error`: Exception to catch on poll()
      - `_pollset`: The wrapped poll object
      - `poll`: Bound ``poll`` method of the wrapped poll object (set per
        instance, shadowing the method of the same name)

    :Types:
      - `error`: ``Exception``
      - `_pollset`: ``select.poll``
      - `poll`: ``callable``
    """
    __implements__ = [_AdapterInterface]

    def __init__(self):
        """ Initialization """
        import select
        self.error = select.error
        pollset = select.poll()
        self._pollset = pollset
        # Shortcut: calls go to the poll object directly
        self.poll = pollset.poll

    def add(self, fd):
        """ Register a new file descriptor for input events """
        import select
        self._pollset.register(fd, select.POLLIN)

    def remove(self, fd):
        """ Unregister a file descriptor """
        self._pollset.unregister(fd)

    def poll(self, timeout=None):
        """
        Poll the registered descriptors

        :Parameters:
          - `timeout`: Poll timeout in milliseconds

        :Types:
          - `timeout`: ``int``

        :return: List of (descriptor, event) tuples
        :rtype: ``list``
        """
        return self._pollset.poll(timeout)
825