1
2 u"""
3 :Copyright:
4
5 Copyright 2006 - 2014
6 Andr\xe9 Malo or his licensors, as applicable
7
8 :License:
9
10 Licensed under the Apache License, Version 2.0 (the "License");
11 you may not use this file except in compliance with the License.
12 You may obtain a copy of the License at
13
14 http://www.apache.org/licenses/LICENSE-2.0
15
16 Unless required by applicable law or agreed to in writing, software
17 distributed under the License is distributed on an "AS IS" BASIS,
18 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 See the License for the specific language governing permissions and
20 limitations under the License.
21
22 =================
23 Listener Socket
24 =================
25
26 All listening sockets are represented by this single abstraction.
27
28 :Variables:
29 `AF_INET` : ``int``
30 INET address family
31
32 `AF_INET6` : ``int``
33 INET6 address family (``None`` if not available)
34
35 `AF_UNIX` : ``int``
36 UNIX address family (``None`` if not available)
37 """
38 __author__ = u"Andr\xe9 Malo"
39 __docformat__ = "restructuredtext en"
40
41 import errno as _errno
42 import os as _os
43 import re as _re
44 import socket as _socket
45 import sys as _sys
46 import warnings as _warnings
47
48 from wtf import Error, WtfWarning
49 from wtf import osutil as _osutil
50 from wtf.config import ConfigurationError
51
52 AF_INET = _socket.AF_INET
53 AF_INET6 = getattr(_socket, "AF_INET6", None)
54 AF_UNIX = getattr(_socket, "AF_UNIX", None)
55
56
58 """ Socket shutdown failures """
59
60
62 """ Duplicate listener detected """
63
64
67
68
70 """ Socket timeout """
71
72
74 """ Socket poll error """
75
76
78 """
79 Abstraction to the listener socket
80
81 This can actually contain more than one real socket, but provides
82 an interface as if it were a single one.
83
84 :CVariables:
85 `_TYPES` : ``tuple``
86 Supported socket types and configuration patterns
87 (``(('name', (regex, ...)), ...)``)
88
89 :IVariables:
90 `_sockets` : ``list``
91 List of actual sockets (``[socket, ...]``)
92 """
93 _TYPES = (
94 (u'tcp', (
95 _re.compile(ur'(?:(?P<ip>[^:]+|\[[^\]]+]|\*):)?(?P<port>\d+)$'),
96 )),
97 (u'unix', (
98 _re.compile(ur'(?P<path>.+)\((?P<perm>\d+)\)$'),
99 _re.compile(ur'(?P<path>.+)(?P<perm>)$'),
100 )),
101 )
102 _sockets = None
103
104 - def __init__(self, listen, basedir=None):
105 """
106 Initialization
107
108 :Parameters:
109 `listen` : iterable
110 The addresses to listen on, may not be empty
111
112 `basedir` : ``str``
113 Basedir for relative paths
114 """
115 listen = tuple(listen)
116 if not listen:
117 raise ConfigurationError("No listeners configured")
118
119
120
121
122
123
124
125 msg = "Invalid listen configuration: %s"
126 self._sockets, kinds = [], dict(self._TYPES)
127 for bind in listen:
128 obind, fixed, tocheck = repr(bind), None, self._TYPES
129 if ':' in bind:
130 fixed = bind[:bind.find(':')].lower()
131 if fixed in kinds:
132 tocheck = (fixed, kinds[fixed])
133 bind = bind[len(fixed) + 1:]
134 else:
135 fixed = None
136 for kind, rexs in tocheck:
137 if bind.startswith(kind + ':'):
138 fixed, bind = bind, bind[len(kind) + 1:]
139 for rex in rexs:
140 match = rex.match(bind)
141 if match:
142 break
143 else:
144 match = None
145 if match is not None:
146 method = getattr(self, "_setup_" + kind)
147 try:
148 method(match.group, basedir)
149 except ConfigurationError, e:
150 stre = str(e)
151 e = _sys.exc_info()
152 try:
153 raise e[0], (msg % obind) + ": " + stre, e[2]
154 finally:
155 del e
156 break
157 else:
158 raise ConfigurationError(msg % obind)
159
160 self.accept = self._finalize_listeners(msg)
161
163 """
164 Finalize the listening sockets
165
166 This method actually sets the sockets to the LISTEN state.
167
168 :Parameters:
169 - `msg`: Configuration error message template
170
171 :Types:
172 - `msg`: ``str``
173
174 :return: Socket acceptor
175 :rtype: ``callable``
176
177 :Exceptions:
178 - `ConfigurationError`: No listeners available
179 """
180 if not self._sockets:
181 raise ConfigurationError("No listener sockets")
182
183 memory, toremove = {}, []
184 for socket in sorted(self._sockets):
185 if socket.key() in memory or socket.anykey() in memory:
186
187 if socket.key() != socket.anykey() or \
188 memory[socket.key()] == socket.family():
189 _warnings.warn("Duplicate listen: %s" % (socket.bindspec),
190 category=ListenerWarning)
191 toremove.append(socket)
192 continue
193 _osutil.close_on_exec(socket)
194 socket.setblocking(False)
195 try:
196 socket.bind()
197 socket.listen(_socket.SOMAXCONN)
198 except _socket.error, e:
199 stre = str(e)
200 e = _sys.exc_info()
201 try:
202 raise ConfigurationError, \
203 (msg % socket.bindspec) + ": " + stre, e[2]
204 finally:
205 del e
206 else:
207 memory[socket.key()] = socket.family()
208
209 while toremove:
210 socket = toremove.pop()
211 self._sockets.remove(socket)
212 try:
213 socket.close()
214 except (_socket.error, OSError), e:
215 _warnings.warn("Socket shutdown problem: %s" % str(e),
216 category=ShutdownWarning)
217
218 return Acceptor(item.realsocket for item in self._sockets)
219
222
224 """ Shutdown the sockets """
225 sockets, self._sockets = self._sockets, None
226 if sockets is not None:
227 for socket in sockets:
228 try:
229 socket.close()
230 except (_socket.error, OSError), e:
231 _warnings.warn("Socket shutdown problem: %s" % str(e),
232 category=ShutdownWarning)
233
235 """
236 Setup TCP/IP(v6) socket and append it to the global list
237
238 :Parameters:
239 - `bind`: Bind parameter accessor (``match.group``)
240 - `basedir`: Basedir for relative paths (unused)
241
242 :Types:
243 - `bind`: ``callable``
244 - `basedir`: ``basestring``
245 """
246 obind = repr(bind(0))
247 host, port, flags = bind(u'ip'), bind(u'port'), 0
248 port = int(port)
249 if not host or host == u'*':
250 host, flags = None, _socket.AI_PASSIVE
251 elif host.startswith(u'[') and host.endswith(u']'):
252 host = host[1:-1].encode('ascii')
253 else:
254 host = host.encode('idna')
255 try:
256 adi = _socket.getaddrinfo(host, port,
257 _socket.AF_UNSPEC, _socket.SOCK_STREAM, 0, flags)
258 for family, stype, proto, _, bind in adi:
259 if not _socket.has_ipv6 and family == AF_INET6:
260 continue
261
262 try:
263 socket = _socket.socket(family, stype, proto)
264 except _socket.error, e:
265 if e[0] == _errno.EAFNOSUPPORT and host is None and \
266 family == AF_INET6:
267
268
269
270
271 continue
272 raise
273 socket.setsockopt(_socket.SOL_SOCKET, _socket.SO_REUSEADDR, 1)
274 self._sockets.append(
275 InetSocket(socket, obind, host, family, bind)
276 )
277 except _socket.error:
278 e = _sys.exc_info()
279 try:
280 raise ConfigurationError, e[1], e[2]
281 finally:
282 del e
283
285 """
286 Setup UNIX domain socket
287
288 :Parameters:
289 - `bind`: Bind parameter accessor (``match.group``)
290 - `basedir`: Basedir for relative paths
291
292 :Types:
293 - `bind`: ``callable``
294 - `basedir`: ``str``
295 """
296 if AF_UNIX is None:
297 raise ConfigurationError("UNIX domain sockets are not available")
298
299 obind = repr(bind(0))
300 if bind(u'perm'):
301 try:
302 socket_perm = int(bind('perm'), 8)
303 except (TypeError, ValueError):
304 raise ConfigurationError("Invalid permission")
305 umask = 0777 & ~socket_perm
306 else:
307 umask = None
308 basedir = basedir or _os.getcwd()
309 if not isinstance(basedir, unicode):
310 basedir = basedir.decode(_sys.getfilesystemencoding())
311 path = _os.path.normpath(_os.path.join(
312 basedir, bind(u'path')
313 )).encode(_sys.getfilesystemencoding())
314 socket = _socket.socket(AF_UNIX, _socket.SOCK_STREAM)
315 self._sockets.append(UnixSocket(socket, obind, path, umask))
316
317
319 """
320 Socket decorating container
321
322 Derive from this container in order to build new concrete containers.
323 These containers are necessary for proper duplicate/warning/error
324 handling, because we need some context for the socket. The socket
325 ordering is also defined in these containers (via `__cmp__`).
326
327 :See: `UnixSocket`, `InetSocket`
328
329 :CVariables:
330 - `_famcomp`: Index for address family comparisons
331
332 :IVariables:
333 - `realsocket`: The actual socket object
334 - `bindspec`: The bind specification from the config
335
336 :Types:
337 - `_famcomp`: ``dict``
338 - `realsocket`: ``socket.socket``
339 - `bindspec`: ``str``
340 """
341 _famcomp = dict((fam, idx) for idx, fam in enumerate((
342 AF_UNIX, AF_INET6, AF_INET
343 )) if fam is not None)
344
346 """
347 Initialization
348
349 :Parameters:
350 - `socket`: The socket object to decorate
351 - `bindspec`: The bind specification from config
352
353 :Types:
354 - `socket`: ``socket.socket``
355 - `bindspec`: ``str``
356 """
357 self.realsocket = socket
358 self.bindspec = bindspec
359
361 """
362 Compare 3-way with another object
363
364 Comparison is done by the socket family index. If the other object
365 is not a `SocketDecorator`, the ``id()`` s are compared.
366
367 :Parameters:
368 - `other`: The other object
369
370 :Types:
371 - `other`: `SocketDecorator`
372
373 :return: Comparison result (``-1``, ``0``, ``1``) for `self` being
374 less, equal or greater than/to `other`
375 :rtype: ``int``
376
377 :Exceptions:
378 - `NotImplementedError`: The socket family of either socket is not
379 in the index
380 """
381 if not isinstance(other, self.__class__):
382 return cmp(id(self), id(other))
383 try:
384 return cmp(
385 self._famcomp[self.family()],
386 self._famcomp[other.family()]
387 )
388 except KeyError:
389 raise NotImplementedError()
390
392 """
393 Compare (2-way) by identity
394
395 :Parameters:
396 - `other`: The other object
397
398 :Types:
399 - `other`: `SocketDecorator`
400
401 :return: Are the objects identical?
402 :rtype: ``bool``
403 """
404 return id(self) == id(other)
405
407 """
408 String representation of the object (suitable for debugging)
409
410 :return: The string representation
411 :rtype: ``str``
412 """
413 return "<%s.%s fileno=%s, family=%s, key=%r>" % (
414 self.__class__.__module__, self.__class__.__name__,
415 self.fileno(), self.family(), self.key(),
416 )
417
419 """ Destructor """
420 self.close()
421
423 """
424 Delegate all undefined symbol requests to the real socket
425
426 :Parameters:
427 - `name`: The symbol to look up
428
429 :Types:
430 - `name`: ``str``
431 """
432 return getattr(self.realsocket, name)
433
435 """ Bind the socket according to its bindspec """
436 raise NotImplementedError()
437
439 """
440 Determine the socket address family
441
442 :return: The family
443 :rtype: ``int``
444 """
445 raise NotImplementedError()
446
448 """
449 Determine the key of the socket, derived from the bindspec
450
451 This key can be considered a normalized version of the bindspec. It
452 has to be hashable.
453
454 :return: The key
455 :rtype: any
456 """
457 raise NotImplementedError()
458
460 """
461 Determine the key of the socket if bindspec would point to ANY
462
463 :See: `key`
464
465 :return: The key
466 :rtype: any
467 """
468 raise NotImplementedError()
469
470
472 """
473 Decorator for UNIX domain sockets
474
475 :IVariables:
476 - `_bound`: Was the socket bound to a path?
477 - `_path`: The path to bind to
478 - `_umask`: The umask to be set when binding to the path (maybe ``None``)
479 - `_normpath`: The normalized path (symlinks resolved) (used as key)
480
481 :Types:
482 - `_bound`: ``bool``
483 - `_path`: ``str``
484 - `_umask`: ``int``
485 - `_normpath`: ``str``
486 """
def __init__(self, socket, bindspec, path, umask):
    """
    Initialization

    Stores the bind parameters and precomputes the normalized path.
    The socket is marked as not bound.

    :Parameters:
      - `socket`: The actual socket object
      - `bindspec`: Binding string from configuration
      - `path`: Path to bind to
      - `umask`: Umask to apply when binding to the path

    :Types:
      - `socket`: ``socket.socket``
      - `bindspec`: ``str``
      - `path`: ``str``
      - `umask`: ``int``
    """
    super(UnixSocket, self).__init__(socket, bindspec)
    self._bound = False
    self._path = path
    self._umask = umask
    # resolve symlinks first, then normalize the result
    resolved = _os.path.realpath(path)
    self._normpath = _os.path.normpath(resolved)
506
508 """ Remove the socket path and close the file handle """
509 _osutil.unlink_silent(self._path)
510 self.realsocket.close()
511
513 """ Bind to the socket path """
514 old_umask = None
515 try:
516 if self._umask is not None:
517 old_umask = _os.umask(self._umask)
518 _osutil.unlink_silent(self._path)
519 self._bound, _ = True, self.realsocket.bind(self._path)
520 finally:
521 if old_umask is not None:
522 _os.umask(old_umask)
523
525 """ Determine the socket family """
526 return AF_UNIX
527
529 """ Determine the socket key """
530 return self._normpath
531
533 """ Determine ANY key """
534 return None
535
536
538 """
539 Decorator for TCP/IP(v6) sockets
540
541 :IVariables:
542 - `_bind`: bind value from ``getaddrinfo(3)``
543 - `_host`: Hostname/IP (or ``None`` for ANY)
544 - `_family`: socket family
545
546 :Types:
547 - `_bind`: ``tuple``
548 - `_host`: ``str``
549 - `_family`: ``int``
550 """
def __init__(self, socket, bindspec, host, family, bind):
    """
    Initialization

    Stores the address details next to the decorated socket.

    :Parameters:
      - `socket`: Actual socket object
      - `bindspec`: Bind specification from config
      - `host`: Hostname/IP or ``None``
      - `family`: Socket family
      - `bind`: bind value from ``getaddrinfo(3)``

    :Types:
      - `socket`: ``socket.socket``
      - `bindspec`: ``str``
      - `host`: ``str``
      - `family`: ``int``
      - `bind`: ``tuple``
    """
    super(InetSocket, self).__init__(socket, bindspec)
    self._bind = bind
    self._host = host
    self._family = family
571
573 """
574 Compare (3-way) to a different object
575
576 In addition to the base's ``__cmp__`` method, we compare the host
577 and the rest of the bind value.
578 """
579 return (
580 super(InetSocket, self).__cmp__(other) or
581 cmp(self._host or '', other._host or '') or
582 cmp(self._bind[1:], other._bind[1:])
583 )
584
586 """ Bind the socket according to bindspec """
587 self.realsocket.bind(self._bind)
588
590 """ Determine the socket family """
591 return self._family
592
594 """ Determine the socket key """
595 if self._host is None:
596 return self.anykey()
597 return (self._host, self._family, self._bind[1])
598
600 """ Determine the socket ANY key """
601 return (None, AF_INET, self._bind[1])
602
603
605 """ Acceptor for multiple connections """
606 _IGNOREFAIL = set(getattr(_errno, _name, None) for _name in """
607 EINTR
608 ENOBUFS
609 EPROTO
610 ECONNABORTED
611 ECONNRESET
612 ETIMEDOUT
613 EHOSTUNREACH
614 ENETUNREACH
615 EAGAIN
616 EWOULDBLOCK
617 """.split())
618 if None in _IGNOREFAIL:
619 _IGNOREFAIL.remove(None)
620
622 """
623 Initialization
624
625 :Parameters:
626 - `sockets`: List of sockets to poll
627
628 :Types:
629 - `sockets`: ``iterable``
630 """
631 import collections, select
632 try:
633 pollset = select.poll
634 except AttributeError:
635 pollset = _SelectAdapter()
636 else:
637 pollset = _PollAdapter()
638 self._fdmap = {}
639 for socket in sockets:
640 fd = socket.fileno()
641 pollset.add(fd)
642 self._fdmap[fd] = socket
643 self._set = pollset
644 self._backlog = collections.deque()
645
647 """
648 Accept a new connection
649
650 :Parameters:
651 - `timeout`: Timeout in seconds
652
653 :Types:
654 - `timeout`: ``float``
655
656 :return: New socket and the peername
657 :rtype: ``tuple``
658
659 :Exceptions:
660 - `SocketTimeout`: accept call timed out
661 - `SocketError`: An error occurred while accepting the socket
662 """
663 while True:
664 try:
665 sock, peer = self._accept(timeout)
666 except _socket.error, e:
667 if e[0] in self._IGNOREFAIL:
668 continue
669 e = _sys.exc_info()
670 try:
671 raise SocketError, e[1], e[2]
672 finally:
673 del e
674 _osutil.close_on_exec(sock.fileno())
675 return sock, peer
676
678 """
679 Accept a connection
680
681 :Parameters:
682 - `timeout`: Timeout in seconds
683
684 :Types:
685 - `timeout`: ``float``
686
687 :return: The new connection socket and the peername
688 :rtype: ``tuple``
689
690 :Exceptions:
691 - `SocketTimeout`: accept call timed out
692 - `SocketPollError`: Error with poll call
693 - `socket.error`: Socket error
694 """
695 backlog = self._backlog
696 if not backlog:
697 pollset, timeout_used = self._set, timeout
698 if timeout_used is None:
699 timeout_used = 1000
700 else:
701 timeout_used = int(timeout_used * 1000)
702 while True:
703 try:
704 ready = pollset.poll(timeout_used)
705 except pollset.error, e:
706 if e[0] == _errno.EINTR:
707 continue
708 e = _sys.exc_info()
709 try:
710 raise SocketPollError, e[1], e[2]
711 finally:
712 del e
713 if ready:
714 break
715 elif timeout is None:
716 continue
717 raise SocketTimeout(timeout)
718 backlog.extendleft(item[0] for item in ready)
719 return self._fdmap[backlog.pop()].accept()
720
721
723 """
724 Adapter poll API to select implementation
725
726 :IVariables:
727 - `error`: Exception to catch on poll()
728
729 :Types:
730 - `error`: ``Exception``
731 """
732
734 """ Initialization """
735
737 """
738 Register a new file descriptor
739
740 :Parameters:
741 - `fd`: File descriptor to register
742
743 :Types:
744 - `fd`: ``int``
745
746 :Exceptions:
747 - `ValueError`: Error while creating an integer out of `fd`
748 - `TypeError`: Error while creating an integer out of `fd`
749 """
750
752 """
753 Unregister a file descriptor
754
755 :Parameters:
756 - `fd`: File descriptor to unregister
757
758 :Types:
759 - `fd`: ``int``
760
761 :Exceptions:
762 - `ValueError`: Error while creating an integer out of `fd`
763 - `TypeError`: Error while creating an integer out of `fd`
764 - `KeyError`: The descriptor was not registered before
765 """
766
def poll(self, timeout=None):
    """
    Poll the list of descriptors

    :Parameters:
      - `timeout`: Poll timeout in milliseconds

    :Types:
      - `timeout`: ``int``

    :return: List of (descriptor, event) tuples, event is useless, though
    :rtype: ``list``

    :Exceptions:
      - `self.error`: Select error occurred
    """
783
784
786 __implements__ = [_AdapterInterface]
787
789 import select
790 self.error = select.error
791 self._rfds = set()
792
794 self._rfds.add(int(fd))
795
797 self._rfds.remove(int(fd))
798
def poll(self, timeout=None):
    """
    Poll the registered descriptors for readability using ``select``

    :Parameters:
      - `timeout`: Poll timeout in milliseconds (``None`` is passed
        through to ``select`` unchanged)

    :Types:
      - `timeout`: ``int``

    :return: List of ``(descriptor, 0)`` tuples, one per readable
             descriptor
    :rtype: ``list``
    """
    import select
    seconds = timeout
    if seconds is not None:
        # select() takes seconds, the poll API takes milliseconds
        seconds = float(seconds) / 1000.0
    readable = select.select(self._rfds, (), (), seconds)[0]
    return [(fd, 0) for fd in readable]
805
806
825