1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 """
18 ==============
19 Memcache API
20 ==============
21
22 This module provides a memcache API implementation and connector.
23
24 :Variables:
25 - `DEFAULT_PORT`: Memcache default port
26 - `CRLF`: CRLF sequence, which finishes most of memcache's commands
27 - `STATE_GRACE`: Dead state "grace"
28 - `STATE_RETRY`: Dead state "retry"
29 - `FLAG_COMPRESSED`: Flag for compressed storage
30 - `FLAG_PADDED`: Flag for padded storage
31 - `FLAG_SPLIT`: Flag for split storage
32 - `NO_FLAGS`: Bit mask for checking invalid flag bits
33 - `TYPEMAP`: Type map (id -> codec)
34
35 :Types:
36 - `DEFAULT_PORT`: ``int``
37 - `CRLF`: ``str``
38 - `STATE_GRACE`: ``int``
39 - `STATE_RETRY`: ``int``
40 - `FLAG_COMPRESSED`: ``int``
41 - `FLAG_PADDED`: ``int``
42 - `FLAG_SPLIT`: ``int``
43 - `NO_FLAGS`: ``int``
44 - `TYPEMAP`: ``dict``
45 """
46 __author__ = u"Andr\xe9 Malo"
47 __docformat__ = "restructuredtext en"
48
49 try:
50 import cPickle as _pickle
51 except ImportError:
52 import pickle as _pickle
53 import itertools as _it
54 try:
55 import hashlib as _md5
56 except ImportError:
57 import md5 as _md5
58 import os as _os
59 import socket as _socket
60 import threading as _threading
61 import time as _time
62 import weakref as _weakref
63 try:
64 import zlib as _zlib
65 except ImportError:
66 _zlib = None
67
68 from wtf import Error
69 from wtf import osutil as _osutil
70 from wtf import stream as _stream
71 from wtf import util as _util
72
# Default memcache port and the line terminator used by the commands below
DEFAULT_PORT = 11211
CRLF = "\r\n"

# Dead pool states: the grace state comes first, the retry state second
STATE_GRACE, STATE_RETRY = 0, 1

# Hash function applied to a key when a pool is selected for it
hashfunc = _util.hash32
79
80
81
82
83
84
85
86
87
88
89
90
91
92 TYPEMAP = dict(enumerate((
93
94 (None, lambda x: _pickle.dumps(x, -1), _pickle.loads),
95 (unicode,
96 lambda x: x.encode('utf-8'),
97 lambda x: x.decode('utf-8'),
98 ),
99 (str, str, str),
100 (bool,
101 lambda x: x and "1" or "",
102 bool,
103 ),
104 (int, str, int),
105 (long, str, long),
106 )))
107
# Storage flags occupy the bits above the type id (the low byte of the flags)
FLAG_COMPRESSED = 1 << 8
FLAG_PADDED = 1 << 9
FLAG_SPLIT = 1 << 10

# Mask of everything that is NOT a known storage flag
NO_FLAGS = ~(FLAG_COMPRESSED | FLAG_PADDED | FLAG_SPLIT)
112
113
115 """ Memcache communication error """
116
118 """ Memcache connect error """
119
121 """ Unrecognized command """
122
124 """ Invalid command line """
125
128
130 """ Unknown error from the server """
131
132
134 """
135 Memcache cluster proxy
136
137 :CVariables:
138 - `DEFAULT_GRACE_TIME`: Default grace time in seconds. See `__init__` for
139 further details.
140 - `DEFAULT_RETRY_TIME`: Default retry time in seconds. See `__init__` for
141 details.
142 - `DEFAULT_COMPRESS_THRESHOLD`: Default minimum size for compressing
143 values
144 - `DEFAULT_PADDED`: Default padding behavior
145 - `DEFAULT_SPLIT`: Default splitting behaviour
146 - `DEFAULT_LARGEST_SLAB`: Default maximum slab size
147 - `_TYPEMAP`: typemap
148
149 :IVariables:
150 - `_pools`: List of available pools
151 - `_weighted`: Weighted list of available pools
152 - `_grace_time`: Grace time for this instance
153 - `_retry_time`: Retry time for this instance
154 - `_compress_threshold`: Minimal size for compression
155 - `_padded`: Pad small values (<16 byte)?
156 - `_split`: Allow large value splitting?
157 - `_prefix`: Key prefix to use
158 - `_largest_slab`: Largest SLAB size
159
160 :Types:
161 - `DEFAULT_GRACE_TIME`: ``int``
162 - `DEFAULT_RETRY_TIME`: ``int``
163 - `DEFAULT_COMPRESS_THRESHOLD`: ``int``
164 - `DEFAULT_PADDED`: ``bool``
165 - `DEFAULT_SPLIT`: ``bool``
166 - `DEFAULT_LARGEST_SLAB`: ``int``
167 - `_TYPEMAP`: ``dict``
168 - `_pools`: ``tuple``
169 - `_weighted`: ``tuple``
170 - `_grace_time`: ``int``
171 - `_retry_time`: ``int``
172 - `_compress_threshold`: ``int``
173 - `_padded`: ``bool``
174 - `_split`: ``bool``
175 - `_prefix`: ``str``
176 - `_largest_slab`: ``int``
177 """
# Dead pool handling defaults (seconds)
DEFAULT_GRACE_TIME = 30
DEFAULT_RETRY_TIME = 60

# Value storage defaults
DEFAULT_COMPRESS_THRESHOLD = 128
DEFAULT_PADDED = True
DEFAULT_SPLIT = True
DEFAULT_LARGEST_SLAB = 1024 * 1024

# Codec table used by the value encoder and decoder
_TYPEMAP = TYPEMAP
185
186 - def __init__(self, pools, prepare=None, grace_time=None, retry_time=None,
187 compress_threshold=None, padded=None, split=None,
188 prefix=None, largest_slab=None):
189 """
190 Initialization
191
192 `grace_time` and `retry_time` describe the behaviour of
193 the dispatcher in case one or more dead pools. The algorithm works
194 as follows:
195
196 If a server is detected to be unreachable, it is marked dead. Now the
197 grace counter starts to run. Now if the server stays dead until
198 `grace_time` is reached requests for the server (read and write) are
199 discarded. This gives short memcache outages (like restarts)
200 a chance to recover without adding load to the other caches
201 of the cluster. However, when the grace time threshold is reached,
202 the server is considered completely dead and the requests are
203 dispatched to the other ones. The now completely-declared-dead
204 server will be retried every `retry_time` seconds from now on until
205 it's vivified again.
206
207 :Parameters:
208 - `pools`: List of memcache connection pools
209 (``[MemcacheConnectionPool, ...]``)
210 - `prepare`: Key preparation function
211 - `grace_time`: Grace time in seconds
212 - `retry_time`: Retry time in seconds
213 - `compress_threshold`: Minimum size for compression. If omitted or
214 ``None``, `DEFAULT_COMPRESS_THRESHOLD` is applied.
215 - `padded`: Pad small values (< 16 byte)? If omitted or ``None``,
216 `DEFAULT_PADDED` is applied
217 - `split`: Split large values? If omitted or ``None``,
218 `DEFAULT_SPLIT` is applied
219 - `prefix`: Prefix for keys. Empty by default
220 - `largest_slab`: Largest SLAB item size of the server, if omitted or
221 ``None``, `DEFAULT_LARGEST_SLAB` is applied.
222
223 :Types:
224 - `pools`: ``iterable``
225 - `prepare`: ``callable``
226 - `grace_time`: ``int``
227 - `retry_time`: ``int``
228 - `compress_threshold`: ``int``
229 - `padded`: ``bool``
230 - `split`: ``bool``
231 - `prefix`: ``str``
232 - `largest_slab`: ``int``
233 """
234
235 if prepare is None:
236 if prefix:
237 prepare = lambda x: prefix + x
238 else:
239 prepare = lambda x: x
240 elif prefix:
241 _prepare = prepare
242 prepare = lambda x: prefix + _prepare(x)
243 self._prepare_key = prepare
244
245
246 if compress_threshold is None:
247 compress_threshold = self.DEFAULT_COMPRESS_THRESHOLD
248 self._compress_threshold = compress_threshold
249 self._padded = padded
250 self._split = split
251 self._largest_slab = \
252 [largest_slab, self.DEFAULT_LARGEST_SLAB][largest_slab is None]
253
254
255 self._pools = tuple(pools)
256 self._weighted = tuple(_it.chain(*[[pool] * pool.weight
257 for pool in self._pools]))
258 self._grace_time = int(
259 [grace_time, self.DEFAULT_GRACE_TIME][grace_time is None])
260 self._retry_time = int(
261 [retry_time, self.DEFAULT_RETRY_TIME][retry_time is None])
262
def delete(self, key, block_time=None, all_pools=False):
    """
    Delete a key/value pair from the cache

    :Parameters:
     - `key`: The key to identify the item to delete
     - `block_time`: Time to block add and replace requests for this key
       in seconds. If omitted or ``None``, the blocking time is ``0``.
     - `all_pools`: Issue delete to each pool? This may be useful to
       enforce the deletion on backup pools, too. However, it won't
       delete the key from currently dead pools. So, it might be not
       that useful after all, but it's the best we can do from this
       side of the ocean.

    :Types:
     - `key`: ``str``
     - `block_time`: ``int``
     - `all_pools`: ``bool``

    :return: Whether it was really deleted from the main pool (or the
             current backup pool) of this key (i.e. whether it existed
             before). ``False`` is returned as well if no connection
             could be obtained or a socket error occurred.
    :rtype: ``bool``
    """
    result = False
    key = self._prepare_key(key)
    block_time = max(0, int(block_time or 0))
    mainpool = None
    try:
        conns = self._get_conn(key)
        # `_get_conn` hands back a dict, which is empty if no connection
        # is available; there is nothing to talk to in that case.
        if conns:
            conns = list(conns)
            try:
                conn = conns[0]
                mainpool = conn.pool
                conn.write("delete %s %s%s" % (key, block_time, CRLF))
                conn.flush()
                line = self._error(conn.readline())
            finally:
                # give back every connection we got
                while conns:
                    conns.pop().close()
            result = line == "DELETED"
    except _socket.error:
        pass

    if all_pools:
        # Best effort: responses of the other pools are read, but ignored
        for pool in self._pools:
            if pool == mainpool or pool.dead:
                continue
            try:
                conn = pool.get_conn()
                try:
                    conn.write("delete %s %s%s" % (key, block_time, CRLF))
                    conn.flush()
                    conn.readline()
                finally:
                    conn.close()
            except _socket.error:
                pass
    return result
323
324 - def get(self, *keys):
325 """
326 Get a list of key/value pairs from the cache (if applicable)
327
328 The returned dict contains all pairs it could get. But keys maybe
329 missing or the dict might be completely empty (of course).
330
331 :Parameters:
332 - `keys`: The keys to fetch
333
334 :Types:
335 - `keys`: ``tuple``
336
337 :return: The dict of key/value pairs
338 :rtype: ``dict``
339 """
340
341
342 result = {}
343 if not keys:
344 return result
345 keymap = dict((self._prepare_key(key), key) for key in keys)
346 try:
347 conns = self._get_conn(*keymap.keys())
348 if not conns:
349 return result
350 conns = conns.items()
351 try:
352 while conns:
353 conn, keys = conns.pop()
354 try:
355 conn.write("get %s%s" % (" ".join(keys), CRLF))
356 conn.flush()
357 while True:
358 line = self._error(conn.readline())
359 if line == "END":
360 break
361 elif line.startswith("VALUE "):
362 _, key, flags, length = line.split()
363 flags, length = int(flags), int(length)
364 value = _stream.read_exact(conn, length)
365 if _stream.read_exact(conn, 2) != CRLF:
366
367 conn, _ = None, conn.destroy()
368 return {}
369 try:
370 result[keymap[key]] = \
371 self._decode_value(flags, value)
372 except (TypeError, ValueError):
373 pass
374 except KeyError:
375 raise KeyError('%r, %s: %r' % (
376 line, key, keymap
377 ))
378 except (SystemExit, KeyboardInterrupt):
379 raise
380 except:
381 import sys
382 e = sys.exc_info()
383 try:
384 msg = "%s:: %r, %s, %r" % (
385 str(e[1]), line, flags, value
386 )
387 e = (e[0], msg, e[2])
388 finally:
389 try:
390 raise e[0], e[1], e[2]
391 finally:
392 del e
393 else:
394
395
396 conn, _ = None, conn.destroy()
397 return {}
398 finally:
399 if conn is not None:
400 conn.close()
401 finally:
402 while conns:
403 conns.pop()[0].close()
404 except _socket.error:
405 pass
406 return result
407
def set(self, key, value, max_age):
    """
    Store a key/value pair, no matter whether the key exists or not

    :Parameters:
     - `key`: The key to store under
     - `value`: The value to store (should be picklable)
     - `max_age`: Maximum age in seconds

    :Types:
     - `key`: ``str``
     - `value`: any
     - `max_age`: ``int``

    :return: Stored successfully?
    :rtype: ``bool``
    """
    stored = self.store("set", key, value, max_age)
    return stored
426
def add(self, key, value, max_age):
    """
    Store a key/value pair, but only if the key does not exist yet

    :Parameters:
     - `key`: The key to store under
     - `value`: The value to store (should be picklable)
     - `max_age`: Maximum age in seconds

    :Types:
     - `key`: ``str``
     - `value`: any
     - `max_age`: ``int``

    :return: Stored successfully?
    :rtype: ``bool``
    """
    stored = self.store("add", key, value, max_age)
    return stored
445
def replace(self, key, value, max_age):
    """
    Store a key/value pair, but only if the key exists already

    :Parameters:
     - `key`: The key to store under
     - `value`: The value to store (should be picklable)
     - `max_age`: Maximum age in seconds

    :Types:
     - `key`: ``str``
     - `value`: any
     - `max_age`: ``int``

    :return: Stored successfully?
    :rtype: ``bool``
    """
    stored = self.store("replace", key, value, max_age)
    return stored
464
def store(self, method, key, value, max_age, compress=True):
    """
    Store the value under the given key expiring now + expiry

    :Parameters:
     - `method`: Actual method to call (``set``, ``add`` or ``replace``)
     - `key`: The key to store under
     - `value`: The value to store (should be picklable)
     - `max_age`: Max age of the entry in seconds
     - `compress`: Compress the value?

    :Types:
     - `method`: ``str``
     - `key`: ``str``
     - `value`: any
     - `max_age`: ``int``
     - `compress`: ``bool``

    :return: Stored successfully? ``False`` is returned as well if no
             connection could be obtained or a socket error occurred.
    :rtype: ``bool``
    """
    conn = None
    key = self._prepare_key(key)
    try:
        try:
            flags, value = self._encode_value(
                key, value, max_age, compress
            )
            conns = self._get_conn(key)
            if not conns:
                return False
            # exactly one key was asked for, so take the first connection
            conn = list(conns)[0]
            # the expiry is shifted by the pool's client/server time
            # difference
            expiry = int(_time.time()) + max_age - conn.pool.timediff
            conn.write("%s %s %s %s %s%s" % (
                method, key, flags, expiry, len(value), CRLF
            ))
            conn.write(value)
            conn.write(CRLF)
            conn.flush()
            return self._error(conn.readline()) == "STORED"
        except _socket.error:
            # The error may have been raised before a connection was
            # acquired; only a connection we actually hold is destroyed
            # (and then hidden from the cleanup below).
            if conn is not None:
                conn.destroy()
                conn = None
            return False
    finally:
        if conn is not None:
            conn.close()
518
520 """
521 Convert a response line into an error or pass it through
522
523 :Parameters:
524 - `line`: The response line to inspect
525
526 :Types:
527 - `line`: ``str``
528
529 :return: The stripped response line
530 :rtype: ``str``
531
532 :Exceptions:
533 - `CommandError`: Command error
534 - `ClientError`: Client error
535 - `ServerError`: Server error
536 """
537 line = line.strip()
538 if "ERROR" in line:
539 if line == "ERROR":
540 raise CommandError()
541 elif line.startswith("CLIENT_ERROR "):
542 raise ClientError(line[13:])
543 elif line.startswith("SERVER_ERROR "):
544 raise ServerError(line[13:])
545 else:
546 pos = line.find(' ')
547 if pos > 0:
548 raise UnknownError(line[pos + 1:])
549 raise UnknownError()
550 return line
551
553 """
554 Encode a value for the memcache
555
556 :Parameters:
557 - `key`: The store key
558 - `value`: The value to encode
559 - `max_age`: Maxc age of this item
560 - `compress`: Allow value compression?
561
562 :Types:
563 - `key`: ``str``
564 - `value`: any
565 - `max_age`: ``int``
566 - `compress`: ``bool``
567
568 :return: The flags and the encoded value (``(int, str)``)
569 :rtype: ``tuple``
570 """
571 flags, vtype = 0, type(value)
572 for type_id, (kind, encoder, _) in self._TYPEMAP.iteritems():
573 if type_id == 0:
574 continue
575 if vtype is kind:
576 flags, value = type_id, encoder(value)
577 break
578 else:
579 value = self._TYPEMAP[0][1](value)
580 if compress and \
581 len(value) >= self._compress_threshold and _zlib is not None:
582 value = _zlib.compress(value, 9)
583 flags |= FLAG_COMPRESSED
584 if self._padded and len(value) < 16:
585 value = value + "\0" * 16
586 flags |= FLAG_PADDED
587 if (len(value) + len(key) + 100) > self._largest_slab:
588 tpl = "split:%s:%s-%%s" % (
589 _md5.md5(_os.urandom(20)).hexdigest(), key
590 )
591 blocklen = self._largest_slab - len(tpl) - 100
592 skeys, idx = [], 0
593 while value:
594 skey = tpl % idx
595 skeys.append(skey)
596 idx += 1
597 buf, value = value[:blocklen], value[blocklen:]
598 self.store("set", skey, buf, max_age, compress=False)
599 flags |= FLAG_SPLIT
600 value = ' '.join(skeys)
601 return flags, value
602
604 """
605 Decode a value depending on its flags
606
607 :Parameters:
608 - `flags`: Flag bit field
609 - `value`: Value to decode
610
611 :Types:
612 - `flags`: ``int``
613 - `value`: ``str``
614
615 :return: The decoded value
616 :rtype: any
617
618 :Exceptions:
619 - `ValueError`: Bad flags or bad value
620 """
621 type_id = flags & 255
622 flags = flags & 65280
623 if type_id not in self._TYPEMAP or flags & NO_FLAGS:
624 raise ValueError()
625
626 if flags & FLAG_SPLIT:
627 keys = str(value).split()
628 value = self.get(*keys)
629 try:
630 value = ''.join([value[key] for key in keys])
631 except KeyError:
632 raise ValueError()
633 if flags & FLAG_PADDED:
634 if len(value) < 16:
635 raise ValueError()
636 value = value[:-16]
637 if flags & FLAG_COMPRESSED:
638 if _zlib is None:
639 raise ValueError()
640 try:
641 value = _zlib.decompress(value)
642 except _zlib.error, e:
643 raise ValueError(str(e))
644 value = self._TYPEMAP[type_id][2](value)
645 return value
646
648 """
649 Retrieve memcache connection
650
651 The actual memcache connection is selected by the key.
652 The algorithm is a simple
653 ``hashfunc(key) % weighted_selectable_pools``
654
655 :Parameters:
656 - `key`: The key to use for selection
657
658 :Types:
659 - `key`: ``str``
660
661 :return: The connection or ``None``
662 :rtype: `MemcacheConnection`
663 """
664 pools, conns, seen = self._weighted, {}, {}
665 for key in (key,) + keys:
666 conn, hashed = None, int(abs(hashfunc(key)))
667 while conn is None and pools:
668 pool = pools[hashed % len(pools)]
669 if pool in seen:
670 conns[seen[pool]].append(key)
671 break
672 state, retry = pool.state
673 if state == STATE_RETRY and not retry:
674 pools = pool.backup
675 continue
676
677 try:
678 conn = pool.get_conn()
679 except MemcacheConnectError:
680 if state == STATE_RETRY:
681 pools = pool.backup
682 continue
683 elif state != STATE_GRACE:
684 pool.mark_dead(
685 self._grace_time, self._retry_time, self._pools
686 )
687 break
688 else:
689 if pool.dead:
690 pool.mark_alive()
691 seen[pool] = conn
692 conns.setdefault(conn, []).append(key)
693 break
694 return conns
695
696
698 """
699 Memcache connection representation
700
701 :IVariables:
702 - `pool`: Weak reference to the pool
703 - `_conn`: Underlying connection stream
704
705 :Types:
706 - `pool`: `MemcacheConnectionPool`
707 - `_conn`: `stream.GenericStream`
708 """
# Interfaces provided by the connection objects
__implements__ = [_util.PooledInterface]

# Defaults for the instance attributes
pool = None
_conn = None
711
712 - def __init__(self, pool, spec, timeout=None):
736
738 """ Destruction """
739 self.destroy()
740
742 """
743 Delegate unknown requests to the underlying connection
744
745 :Parameters:
746 - `name`: The name to lookup
747
748 :Types:
749 - `name`: ``str``
750
751 :return: The looked up name
752 :rtype: any
753
754 :Exceptions:
755 - `AttributeError`: The symbol could be resolved
756 """
757 return getattr(self._conn, name)
758
762
def destroy(self):
    """ :See: `wtf.util.PooledInterface.destroy` """
    if self.pool is None:
        return

    try:
        # take the connection out of the pool ...
        self.pool.del_conn(self)
    finally:
        # ... and close the stream in any case, ignoring close errors
        try:
            stream = self._conn
            if stream is not None:
                stream.close()
        except (_socket.error, ValueError, IOError):
            pass
774
775
777 """
778 Memcache connection pool
779
780 :IVariables:
781 - `spec`: Connection spec
782 - `weight`: Relative pool weight
783 - `timeout`: Communication timeout
784 - `timediff`: Time difference between client and server in seconds. The
785 value is determined after each real connect (``c_time - s_time``)
786 - `backup`: The weighted backup pools used in retry state
787 - `_dead`: dead state and recovery information during dead time. If the
788 pool is alive the value is ``None``. If it's dead it's a tuple
789 containing the retry time and the pool list. (``(int, tuple)``)
790 - `get_conn`: Connection getter
791 - `del_conn`: Connection deleter
792 - `_stamp`: Timestamp when the next event should happen. That is either
793 the switch from grace state to retry state or the next retry.
794 - `_state`: The current state of the pool during dead time (`STATE_GRACE`
795 or `STATE_RETRY`)
796 - `_deadlock`: Lock for dead state access
797
798 :Types:
799 - `spec`: ``tuple``
800 - `weight`: ``int``
801 - `timeout`: ``float``
802 - `timediff`: ``int``
803 - `backup`: ``tuple``
804 - `get_conn`: ``callable``
805 - `del_conn`: ``callable``
806 - `_dead`: ``tuple``
807 - `_stamp`: ``int``
808 - `_state`: ``int``
809 - `_deadlock`: ``threading.RLock``
810 """
_FORK_PROTECT = True

# Defaults for the instance attributes
timediff = 0
_dead = False
_stamp = None
backup = ()
_state = None
813
def __init__(self, maxconn, maxcached, spec, weight=None, timeout=None):
    """
    Initialization

    :Parameters:
     - `maxconn`: Hard maximum of connections to hand out
     - `maxcached`: Maximum number of connections to cache
     - `spec`: Memcache location spec (``('host', port)``)
     - `weight`: Relative pool weight (defaults to ``1``)
     - `timeout`: Memcache timeout (defaults to ``None`` - no timeout)

    :Types:
     - `maxconn`: ``int``
     - `maxcached`: ``int``
     - `spec`: ``tuple``
     - `weight`: ``int``
     - `timeout`: ``float``
    """
    super(MemcacheConnectionPool, self).__init__(maxconn, maxcached)
    if weight is None:
        weight = 1
    weight = int(weight)
    self.spec = spec
    self.weight = weight
    self.timeout = timeout

    # connection flavoured aliases of the generic pool methods
    self.get_conn = self.get_obj
    self.del_conn = self.del_obj
    self._deadlock = _threading.RLock()
839
841 """ :See: `BasePool._create` """
842 conn = MemcacheConnection(self, self.spec, timeout=self.timeout)
843 try:
844 conn.write("stats" + CRLF)
845 conn.flush()
846 ctime = int(_time.time())
847 stime = None
848 while True:
849 line = conn.readline().strip()
850 if line == "END":
851 break
852 if line.startswith('STAT time '):
853 stime = int(line[10:])
854 if stime is not None:
855 self.timediff = ctime - stime
856 except (TypeError, ValueError, _socket.error):
857 pass
858 return conn
859
def put_conn(self, conn):
    """
    Give a connection back to the pool, unless the pool is dead

    :Parameters:
     - `conn`: The connection to put back. If the pool is marked dead,
       the connection is just destroyed

    :Types:
     - `conn`: `MemcacheConnection`
    """
    guard = self._deadlock
    guard.acquire()
    try:
        if not self._dead:
            self.put_obj(conn)
        else:
            conn.destroy()
    finally:
        guard.release()
880
def mark_dead(self, grace_time, retry_time, pools):
    """
    Declare this pool dead and start the grace period

    The retry time and the pool list are remembered, the state is set to
    `STATE_GRACE` with the grace period ending `grace_time` seconds from
    now, and the pool is cleared.

    :Parameters:
     - `grace_time`: Grace time
     - `retry_time`: Retry time
     - `pools`: List of available pools

    :Types:
     - `grace_time`: ``int``
     - `retry_time`: ``int``
     - `pools`: ``tuple``
    """
    guard = self._deadlock
    guard.acquire()
    try:
        self._dead, self._state = (retry_time, pools), STATE_GRACE
        now = int(_time.time())
        self._stamp = now + grace_time
        self.clear()
    finally:
        guard.release()
904
def mark_alive(self):
    """
    Declare this pool alive again

    Both the dead status and the backup pool list are reset
    unconditionally.
    """
    guard = self._deadlock
    guard.acquire()
    try:
        self._dead = None
        self.backup = ()
    finally:
        guard.release()
918
def dead(self):
    """
    Tell whether the pool is currently marked dead

    :return: Is it dead?
    :rtype: ``bool``
    """
    guard = self._deadlock
    guard.acquire()
    try:
        is_dead = bool(self._dead)
    finally:
        guard.release()
    return is_dead
dead = property(dead, doc=""" (bool) The current dead state """)
933
962 state = property(state, doc="""
963 (tuple) The pool's state.
964
965 The first item is the dead state (`STATE_GRACE` or `STATE_RETRY`) or
966 ``None`` if the pool is not dead. The second item is only useful in
967 retry state. Then it's a boolean answering the question whether we
968 hit a retry point or not.
969 """)
970