Package wtf :: Package ext :: Module memcache
[hide private]
[frames] | [no frames]

Source Code for Module wtf.ext.memcache

  1  # -*- coding: ascii -*- 
  2  # 
  3  # Copyright 2007-2012 
  4  # Andr\xe9 Malo or his licensors, as applicable 
  5  # 
  6  # Licensed under the Apache License, Version 2.0 (the "License"); 
  7  # you may not use this file except in compliance with the License. 
  8  # You may obtain a copy of the License at 
  9  # 
 10  #     http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, software 
 13  # distributed under the License is distributed on an "AS IS" BASIS, 
 14  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 15  # See the License for the specific language governing permissions and 
 16  # limitations under the License. 
 17  """ 
 18  ============== 
 19   Memcache API 
 20  ============== 
 21   
 22  This module implements a memcache API implementation and connector. 
 23   
 24  :Variables: 
 25   - `DEFAULT_PORT`: Memcache default port 
 26   - `CRLF`: CRLF sequence, which finishes most of memcache's commands 
 27   - `STATE_GRACE`: Dead state "grace" 
 28   - `STATE_RETRY`: Dead state "retry" 
 29   - `FLAG_COMPRESSED`: Flag for compressed storage 
 30   - `FLAG_PADDED`: Flag for padded storage 
 31   - `FLAG_SPLIT`: Flag for split storage 
 32   - `NO_FLAGS`: Bit mask for checking invalid flag bits 
 33   - `TYPEMAP`: Type map (id -> codec) 
 34   
 35  :Types: 
 36   - `DEFAULT_PORT`: ``int`` 
 37   - `CRLF`: ``str`` 
 38   - `STATE_GRACE`: ``int`` 
 39   - `STATE_RETRY`: ``int`` 
 40   - `FLAG_COMPRESSED`: ``int`` 
 41   - `FLAG_PADDED`: ``int`` 
 42   - `FLAG_SPLIT`: ``int`` 
 43   - `NO_FLAGS`: ``int`` 
 44   - `TYPEMAP`: ``dict`` 
 45  """ 
 46  __author__ = u"Andr\xe9 Malo" 
 47  __docformat__ = "restructuredtext en" 
 48   
 49  try: 
 50      import cPickle as _pickle 
 51  except ImportError: 
 52      import pickle as _pickle 
 53  import itertools as _it 
 54  try: 
 55      import hashlib as _md5 
 56  except ImportError: 
 57      import md5 as _md5 
 58  import os as _os 
 59  import socket as _socket 
 60  import threading as _threading 
 61  import time as _time 
 62  import weakref as _weakref 
 63  try: 
 64      import zlib as _zlib 
 65  except ImportError: 
 66      _zlib = None 
 67   
 68  from wtf import Error 
 69  from wtf import osutil as _osutil 
 70  from wtf import stream as _stream 
 71  from wtf import util as _util 
 72   
# Default memcached TCP port
DEFAULT_PORT = 11211
# CRLF sequence terminating most memcache protocol lines
CRLF = "\r\n"
# Dead-pool states: "grace" (short outage; requests are discarded) and
# "retry" (long outage; requests go to backup pools, with periodic retries)
STATE_GRACE, STATE_RETRY = xrange(2)

# python hash() differs between 32bit and 64bit!
# Use a stable 32 bit hash so the key -> pool mapping is platform independent.
hashfunc = _util.hash32
 79   
 80   
# 8 bits for the type
# 8 bits for the flags
#
# WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING
#   Do not change the order of the types - otherwise already stored stuff
#   from a memcache will result in crap. New types (up to 255) can be
#   added at the end of the list.
#
#   The first one is the fallback (pickle).
#
# WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING WARNING
TYPEMAP = dict(enumerate((
    # name, encoder, decoder
    (None, lambda x: _pickle.dumps(x, -1), _pickle.loads),
    (unicode,
        lambda x: x.encode('utf-8'),
        lambda x: x.decode('utf-8'),
    ),
    (str, str, str),
    (bool,
        lambda x: x and "1" or "",
        bool,
    ),
    (int, str, int),
    (long, str, long),
)))

# Storage flag bits. The low 8 bits of the memcache "flags" field carry the
# type id (see TYPEMAP above); these occupy the next bits.
FLAG_COMPRESSED = 256 # 2 **  8
FLAG_PADDED = 512     # 2 **  9
FLAG_SPLIT = 1024     # 2 ** 10
# Mask matching every bit that is neither a type id nor a known flag
NO_FLAGS = ~(FLAG_COMPRESSED | FLAG_PADDED | FLAG_SPLIT)
112   
113   
class MemcacheError(Error):
    """ Memcache communication error (base class of this module's errors) """
116
class MemcacheConnectError(MemcacheError):
    """ Memcache connect error (no connection could be established) """
119
class CommandError(MemcacheError):
    """ Unrecognized command (server answered ``ERROR``) """
122
class ClientError(MemcacheError):
    """ Invalid command line (server answered ``CLIENT_ERROR ...``) """
125
class ServerError(MemcacheError):
    """ Server error (server answered ``SERVER_ERROR ...``) """
128
class UnknownError(MemcacheError):
    """ Unknown error response from the server """
131 132
class Memcache(object):
    """
    Memcache cluster proxy

    Dispatches keys to a weighted list of connection pools and handles
    value encoding/decoding, compression, padding and splitting of
    oversized values.

    :CVariables:
     - `DEFAULT_GRACE_TIME`: Default grace time in seconds. See `__init__`
       for further details.
     - `DEFAULT_RETRY_TIME`: Default retry time in seconds. See `__init__`
       for details.
     - `DEFAULT_COMPRESS_THRESHOLD`: Default minimum size for compressing
       values
     - `DEFAULT_PADDED`: Default padding behavior
     - `DEFAULT_SPLIT`: Default splitting behaviour
     - `DEFAULT_LARGEST_SLAB`: Default maximum slab size
     - `_TYPEMAP`: typemap

    :IVariables:
     - `_pools`: List of available pools
     - `_weighted`: Weighted list of available pools
     - `_grace_time`: Grace time for this instance
     - `_retry_time`: Retry time for this instance
     - `_compress_threshold`: Minimal size for compression
     - `_padded`: Pad small values (<16 byte)?
     - `_split`: Allow large value splitting?
     - `_prefix`: Key prefix to use
     - `_largest_slab`: Largest SLAB size

    :Types:
     - `DEFAULT_GRACE_TIME`: ``int``
     - `DEFAULT_RETRY_TIME`: ``int``
     - `DEFAULT_COMPRESS_THRESHOLD`: ``int``
     - `DEFAULT_PADDED`: ``bool``
     - `DEFAULT_SPLIT`: ``bool``
     - `DEFAULT_LARGEST_SLAB`: ``int``
     - `_TYPEMAP`: ``dict``
     - `_pools`: ``tuple``
     - `_weighted`: ``tuple``
     - `_grace_time`: ``int``
     - `_retry_time`: ``int``
     - `_compress_threshold`: ``int``
     - `_padded`: ``bool``
     - `_split`: ``bool``
     - `_prefix`: ``str``
     - `_largest_slab`: ``int``
    """
    DEFAULT_GRACE_TIME = 30
    DEFAULT_RETRY_TIME = 60
    DEFAULT_COMPRESS_THRESHOLD = 128
    DEFAULT_PADDED = True
    DEFAULT_SPLIT = True
    DEFAULT_LARGEST_SLAB = 1048576 # POWER_BLOCK in slabs.c
    _TYPEMAP = TYPEMAP
186 - def __init__(self, pools, prepare=None, grace_time=None, retry_time=None, 187 compress_threshold=None, padded=None, split=None, 188 prefix=None, largest_slab=None):
189 """ 190 Initialization 191 192 `grace_time` and `retry_time` describe the behaviour of 193 the dispatcher in case one or more dead pools. The algorithm works 194 as follows: 195 196 If a server is detected to be unreachable, it is marked dead. Now the 197 grace counter starts to run. Now if the server stays dead until 198 `grace_time` is reached requests for the server (read and write) are 199 discarded. This gives short memcache outages (like restarts) 200 a chance to recover without adding load to the other caches 201 of the cluster. However, when the grace time threshold is reached, 202 the server is considered completely dead and the requests are 203 dispatched to the other ones. The now completely-declared-dead 204 server will be retried every `retry_time` seconds from now on until 205 it's vivified again. 206 207 :Parameters: 208 - `pools`: List of memcache connection pools 209 (``[MemcacheConnectionPool, ...]``) 210 - `prepare`: Key preparation function 211 - `grace_time`: Grace time in seconds 212 - `retry_time`: Retry time in seconds 213 - `compress_threshold`: Minimum size for compression. If omitted or 214 ``None``, `DEFAULT_COMPRESS_THRESHOLD` is applied. 215 - `padded`: Pad small values (< 16 byte)? If omitted or ``None``, 216 `DEFAULT_PADDED` is applied 217 - `split`: Split large values? If omitted or ``None``, 218 `DEFAULT_SPLIT` is applied 219 - `prefix`: Prefix for keys. Empty by default 220 - `largest_slab`: Largest SLAB item size of the server, if omitted or 221 ``None``, `DEFAULT_LARGEST_SLAB` is applied. 
222 223 :Types: 224 - `pools`: ``iterable`` 225 - `prepare`: ``callable`` 226 - `grace_time`: ``int`` 227 - `retry_time`: ``int`` 228 - `compress_threshold`: ``int`` 229 - `padded`: ``bool`` 230 - `split`: ``bool`` 231 - `prefix`: ``str`` 232 - `largest_slab`: ``int`` 233 """ 234 # Key config 235 if prepare is None: 236 if prefix: 237 prepare = lambda x: prefix + x 238 else: 239 prepare = lambda x: x 240 elif prefix: 241 _prepare = prepare 242 prepare = lambda x: prefix + _prepare(x) 243 self._prepare_key = prepare 244 245 # Value config 246 if compress_threshold is None: 247 compress_threshold = self.DEFAULT_COMPRESS_THRESHOLD 248 self._compress_threshold = compress_threshold 249 self._padded = padded 250 self._split = split 251 self._largest_slab = \ 252 [largest_slab, self.DEFAULT_LARGEST_SLAB][largest_slab is None] 253 254 # Pool config 255 self._pools = tuple(pools) 256 self._weighted = tuple(_it.chain(*[[pool] * pool.weight 257 for pool in self._pools])) 258 self._grace_time = int( 259 [grace_time, self.DEFAULT_GRACE_TIME][grace_time is None]) 260 self._retry_time = int( 261 [retry_time, self.DEFAULT_RETRY_TIME][retry_time is None])
262
263 - def delete(self, key, block_time=None, all_pools=False):
264 """ 265 Delete a key/value pair from the cache 266 267 :Parameters: 268 - `key`: The key to identify the item to delete 269 - `block_time`: Time to block add and replace requests for this key 270 in seconds. If omitted or ``None``, the blocking time is ``0``. 271 - `all_pools`: Issue delete to each pool? This may be useful to 272 enforce the deletion on backup pools, too. However, it won't 273 delete the key from currently dead pools. So, it might be not 274 that useful after all, but it's the best we can do from this 275 side of the ocean. 276 277 :Types: 278 - `key`: ``str`` 279 - `block_time`: ``int`` 280 - `all_pools`: ``bool`` 281 282 :return: Whether it was really deleted from the main pool (or the 283 current backup pool) of this key (i.e. whether it existed 284 before) 285 :rtype: ``bool`` 286 """ 287 result = False 288 key = self._prepare_key(key) 289 block_time = max(0, int(block_time or 0)) 290 mainpool = None 291 try: 292 conns = self._get_conn(key) 293 if conns is not None: 294 try: 295 conn = conns.keys()[0] 296 mainpool = conn.pool 297 conn.write("delete %s %s%s" % (key, block_time, CRLF)) 298 conn.flush() 299 line = self._error(conn.readline()) 300 finally: 301 conns = conns.keys() 302 while conns: 303 conns.pop().close() 304 result = line == "DELETED" 305 except _socket.error: 306 pass 307 308 if all_pools: 309 for pool in self._pools: 310 if pool == mainpool or pool.dead: 311 continue 312 try: 313 conn = pool.get_conn() 314 try: 315 conn.write("delete %s %s%s" % (key, block_time, CRLF)) 316 conn.flush() 317 conn.readline() # don't care about the response 318 finally: 319 conn.close() 320 except _socket.error: 321 pass 322 return result
323
    def get(self, *keys):
        """
        Get a list of key/value pairs from the cache (if applicable)

        The returned dict contains all pairs it could get. But keys maybe
        missing or the dict might be completely empty (of course).

        :Parameters:
         - `keys`: The keys to fetch

        :Types:
         - `keys`: ``tuple``

        :return: The dict of key/value pairs
        :rtype: ``dict``
        """
        # pylint: disable = R0912, R0915

        result = {}
        if not keys:
            return result
        # map prepared (wire) keys back to the keys the caller passed
        keymap = dict((self._prepare_key(key), key) for key in keys)
        try:
            conns = self._get_conn(*keymap.keys())
            if not conns:
                return result
            conns = conns.items()
            try:
                # one "get" request per connection, covering all keys
                # dispatched to that connection
                while conns:
                    conn, keys = conns.pop()
                    try:
                        conn.write("get %s%s" % (" ".join(keys), CRLF))
                        conn.flush()
                        while True:
                            line = self._error(conn.readline())
                            if line == "END":
                                break
                            elif line.startswith("VALUE "):
                                _, key, flags, length = line.split()
                                flags, length = int(flags), int(length)
                                value = _stream.read_exact(conn, length)
                                if _stream.read_exact(conn, 2) != CRLF:
                                    # sync error? Destroy the connection,
                                    # it cannot be trusted anymore.
                                    conn, _ = None, conn.destroy()
                                    return {}
                                try:
                                    result[keymap[key]] = \
                                        self._decode_value(flags, value)
                                except (TypeError, ValueError):
                                    pass # wrong flags or something
                                except KeyError:
                                    # server returned a key we did not ask
                                    # for - re-raise with debug context
                                    raise KeyError('%r, %s: %r' % (
                                        line, key, keymap
                                    ))
                                except (SystemExit, KeyboardInterrupt):
                                    raise
                                except:
                                    # augment any other error with the
                                    # protocol context before re-raising
                                    # (debugging aid)
                                    import sys
                                    e = sys.exc_info()
                                    try:
                                        msg = "%s:: %r, %s, %r" % (
                                            str(e[1]), line, flags, value
                                        )
                                        e = (e[0], msg, e[2])
                                    finally:
                                        try:
                                            raise e[0], e[1], e[2]
                                        finally:
                                            del e
                            else:
                                # something else we don't know. Better close
                                # the connection.
                                conn, _ = None, conn.destroy()
                                return {}
                    finally:
                        if conn is not None:
                            conn.close()
            finally:
                # put back any connections not consumed yet
                while conns:
                    conns.pop()[0].close()
        except _socket.error:
            # best effort - missing keys are an expected cache miss
            pass
        return result
407
    def set(self, key, value, max_age):
        """
        Set a key/value pair unconditionally

        Thin wrapper around `store` with the ``set`` command.

        :Parameters:
         - `key`: The key to store under
         - `value`: The value to store (should be picklable)
         - `max_age`: Maximum age in seconds

        :Types:
         - `key`: ``str``
         - `value`: any
         - `max_age`: ``int``

        :return: Stored successfully?
        :rtype: ``bool``
        """
        return self.store("set", key, value, max_age)
426
    def add(self, key, value, max_age):
        """
        Set a key/value pair if the key does not exist yet

        Thin wrapper around `store` with the ``add`` command.

        :Parameters:
         - `key`: The key to store under
         - `value`: The value to store (should be picklable)
         - `max_age`: Maximum age in seconds

        :Types:
         - `key`: ``str``
         - `value`: any
         - `max_age`: ``int``

        :return: Stored successfully?
        :rtype: ``bool``
        """
        return self.store("add", key, value, max_age)
445
    def replace(self, key, value, max_age):
        """
        Set a key/value pair only if the key does exist already

        Thin wrapper around `store` with the ``replace`` command.

        :Parameters:
         - `key`: The key to store under
         - `value`: The value to store (should be picklable)
         - `max_age`: Maximum age in seconds

        :Types:
         - `key`: ``str``
         - `value`: any
         - `max_age`: ``int``

        :return: Stored successfully?
        :rtype: ``bool``
        """
        return self.store("replace", key, value, max_age)
464
465 - def store(self, method, key, value, max_age, compress=True):
466 """ 467 Store the value under the given key expiring now + expiry 468 469 :Parameters: 470 - `method`: Actual method to call (``set``, ``add`` or ``replace``) 471 - `key`: The key to store under 472 - `value`: The value to store (should be picklable) 473 - `max_age`: Max age of the entry in seconds 474 - `compress`: Compress the value? 475 476 :Types: 477 - `method`: ``str`` 478 - `key`: ``str`` 479 - `value`: any 480 - `max_age`: ``int`` 481 - `compress`: ``bool`` 482 483 :return: Stored successfully? 484 :rtype: ``bool`` 485 """ 486 conn = None 487 key = self._prepare_key(key) 488 try: 489 try: 490 flags, value = self._encode_value( 491 key, value, max_age, compress 492 ) 493 conns = self._get_conn(key) 494 if not conns: 495 return False 496 conn, conns = conns.keys()[0], None 497 expiry = int(_time.time()) + max_age - conn.pool.timediff 498 cmd = "%(cmd)s %(key)s %(flags)s %(exp)s %(len)s%(nl)s" % \ 499 dict( 500 cmd=method, 501 key=key, 502 flags=flags, 503 exp=expiry, 504 len=len(value), 505 nl=CRLF, 506 ) 507 conn.write(cmd) 508 conn.write(value) 509 conn.write(CRLF) 510 conn.flush() 511 return self._error(conn.readline()) == "STORED" 512 except _socket.error: 513 conn, _ = None, conn.destroy() 514 return False 515 finally: 516 if conn is not None: 517 conn.close()
518
519 - def _error(self, line):
520 """ 521 Convert a response line into an error or pass it through 522 523 :Parameters: 524 - `line`: The response line to inspect 525 526 :Types: 527 - `line`: ``str`` 528 529 :return: The stripped response line 530 :rtype: ``str`` 531 532 :Exceptions: 533 - `CommandError`: Command error 534 - `ClientError`: Client error 535 - `ServerError`: Server error 536 """ 537 line = line.strip() 538 if "ERROR" in line: 539 if line == "ERROR": 540 raise CommandError() 541 elif line.startswith("CLIENT_ERROR "): 542 raise ClientError(line[13:]) 543 elif line.startswith("SERVER_ERROR "): 544 raise ServerError(line[13:]) 545 else: 546 pos = line.find(' ') 547 if pos > 0: 548 raise UnknownError(line[pos + 1:]) 549 raise UnknownError() 550 return line
551
    def _encode_value(self, key, value, max_age, compress):
        """
        Encode a value for the memcache

        :Parameters:
         - `key`: The store key
         - `value`: The value to encode
         - `max_age`: Max age of this item
         - `compress`: Allow value compression?

        :Types:
         - `key`: ``str``
         - `value`: any
         - `max_age`: ``int``
         - `compress`: ``bool``

        :return: The flags and the encoded value (``(int, str)``)
        :rtype: ``tuple``
        """
        # Pick a dedicated codec for the exact type; id 0 (pickle) is the
        # fallback and therefore skipped in the scan.
        flags, vtype = 0, type(value)
        for type_id, (kind, encoder, _) in self._TYPEMAP.iteritems():
            if type_id == 0:
                continue
            if vtype is kind:
                flags, value = type_id, encoder(value)
                break
        else:
            value = self._TYPEMAP[0][1](value)
        if compress and \
                len(value) >= self._compress_threshold and _zlib is not None:
            value = _zlib.compress(value, 9)
            flags |= FLAG_COMPRESSED
        if self._padded and len(value) < 16:
            # pad to avoid tiny-item slab pathologies; _decode_value strips
            # the trailing 16 bytes again
            value = value + "\0" * 16
            flags |= FLAG_PADDED
        # NOTE(review): self._split is never consulted here - oversized
        # values are always split, contrary to the class documentation.
        # Confirm intent before "fixing"; disabling splitting would make
        # oversized stores fail on the server.
        if (len(value) + len(key) + 100) > self._largest_slab:
            # store the chunks under randomized sub-keys and store the list
            # of sub-keys as the actual value
            tpl = "split:%s:%s-%%s" % (
                _md5.md5(_os.urandom(20)).hexdigest(), key
            )
            blocklen = self._largest_slab - len(tpl) - 100
            skeys, idx = [], 0
            while value:
                skey = tpl % idx
                skeys.append(skey)
                idx += 1
                buf, value = value[:blocklen], value[blocklen:]
                self.store("set", skey, buf, max_age, compress=False)
            flags |= FLAG_SPLIT
            value = ' '.join(skeys)
        return flags, value
602
    def _decode_value(self, flags, value):
        """
        Decode a value depending on its flags

        :Parameters:
         - `flags`: Flag bit field
         - `value`: Value to decode

        :Types:
         - `flags`: ``int``
         - `value`: ``str``

        :return: The decoded value
        :rtype: any

        :Exceptions:
         - `ValueError`: Bad flags or bad value
        """
        type_id = flags & 255    # low byte: codec id (see TYPEMAP)
        flags = flags & 65280    # high byte: storage flags
        if type_id not in self._TYPEMAP or flags & NO_FLAGS:
            raise ValueError()

        if flags & FLAG_SPLIT:
            # value is a whitespace separated list of chunk keys
            keys = str(value).split()
            value = self.get(*keys)
            try:
                value = ''.join([value[key] for key in keys])
            except KeyError:
                # at least one chunk is gone - the whole item is lost
                raise ValueError()
        if flags & FLAG_PADDED:
            # strip the 16 padding bytes added by _encode_value
            if len(value) < 16:
                raise ValueError()
            value = value[:-16]
        if flags & FLAG_COMPRESSED:
            if _zlib is None:
                raise ValueError()
            try:
                value = _zlib.decompress(value)
            except _zlib.error, e:
                raise ValueError(str(e))
        value = self._TYPEMAP[type_id][2](value)
        return value
646
    def _get_conn(self, key, *keys):
        """
        Retrieve memcache connection(s) for the given key(s)

        The actual memcache connection is selected by the key.
        The algorithm is a simple
        ``hashfunc(key) % weighted_selectable_pools``. Dead pools are
        skipped according to their grace/retry state; keys whose pool is
        in grace state (or freshly marked dead) are silently dropped.

        :Parameters:
         - `key`: The (first) key to use for selection

        :Types:
         - `key`: ``str``

        :return: Mapping of connection -> list of keys dispatched to it
                 (may be empty if no pool was reachable)
        :rtype: ``dict``
        """
        pools, conns, seen = self._weighted, {}, {}
        for key in (key,) + keys:
            conn, hashed = None, int(abs(hashfunc(key)))
            while conn is None and pools:
                pool = pools[hashed % len(pools)]
                if pool in seen:
                    # pool already provided a connection - reuse it
                    conns[seen[pool]].append(key)
                    break
                state, retry = pool.state
                if state == STATE_RETRY and not retry:
                    # dead and not due for a retry - fall back to backups
                    pools = pool.backup
                    continue

                try:
                    conn = pool.get_conn()
                except MemcacheConnectError:
                    if state == STATE_RETRY:
                        # retry failed - stay on the backups
                        pools = pool.backup
                        continue
                    elif state != STATE_GRACE:
                        # first failure - start the grace period
                        pool.mark_dead(
                            self._grace_time, self._retry_time, self._pools
                        )
                    # in grace state the key is discarded on purpose
                    break
                else:
                    if pool.dead:
                        # successful (retry) connect - revive the pool
                        pool.mark_alive()
                    seen[pool] = conn
                    conns.setdefault(conn, []).append(key)
                    break
        return conns
695 696
class MemcacheConnection(object):
    """
    Memcache connection representation

    Wraps a socket stream to a single memcache server; unknown attribute
    lookups are delegated to the underlying stream (see `__getattr__`).

    :IVariables:
     - `pool`: Weak reference to the pool
     - `_conn`: Underlying connection stream

    :Types:
     - `pool`: `MemcacheConnectionPool`
     - `_conn`: `stream.GenericStream`
    """
    __implements__ = [_util.PooledInterface]
    # class-level fallbacks so destroy() is safe even if __init__ failed
    pool, _conn = None, None
    def __init__(self, pool, spec, timeout=None):
        """
        Initialization

        :Parameters:
         - `pool`: Pool reference
         - `spec`: Connection spec
         - `timeout`: Communication timeout

        :Types:
         - `pool`: `MemcacheConnectionPool`
         - `spec`: ``tuple``
         - `timeout`: ``float``

        :Exceptions:
         - `MemcacheConnectError`: The connection could not be established
        """
        # weak proxy to avoid a reference cycle pool <-> connection
        self.pool = _weakref.proxy(pool)
        try:
            sock = _osutil.connect(spec, timeout=timeout, cache=3600)
        except _osutil.SocketError, e:
            raise MemcacheConnectError(str(e))
        if sock is None:
            raise MemcacheConnectError("No connectable address found")
        self._conn = _stream.GenericStream(
            _stream.MinimalSocketStream(sock), read_exact=True
        )
736
    def __del__(self):
        """ Destruction - make sure the underlying socket is torn down """
        self.destroy()
740
    def __getattr__(self, name):
        """
        Delegate unknown requests to the underlying connection

        :Parameters:
         - `name`: The name to lookup

        :Types:
         - `name`: ``str``

        :return: The looked up name
        :rtype: any

        :Exceptions:
         - `AttributeError`: The symbol could not be resolved
        """
        return getattr(self._conn, name)
758
    def close(self):
        """ Close connection - i.e. hand it back to the pool """
        self.pool.put_conn(self)
762
    def destroy(self):
        """ :See: `wtf.util.PooledInterface.destroy` """
        # NOTE(review): if self.pool is None (init never ran) the stream is
        # not closed here either - confirm that __init__ always sets pool
        # before _conn, which the current code does.
        if self.pool is not None:
            try:
                self.pool.del_conn(self)
            finally:
                # close the stream even if del_conn blew up
                try:
                    if self._conn is not None:
                        self._conn.close()
                except (_socket.error, ValueError, IOError):
                    pass
774 775
class MemcacheConnectionPool(_util.BasePool):
    """
    Memcache connection pool

    :IVariables:
     - `spec`: Connection spec
     - `weight`: Relative pool weight
     - `timeout`: Communication timeout
     - `timediff`: Time difference between client and server in seconds.
       The value is determined after each real connect
       (``c_time - s_time``)
     - `backup`: The weighted backup pools used in retry state
     - `_dead`: dead state and recovery information during dead time. If
       the pool is alive the value is ``None``. If it's dead it's a tuple
       containing the retry time and the pool list. (``(int, tuple)``)
     - `get_conn`: Connection getter
     - `del_conn`: Connection deleter
     - `_stamp`: Timestamp when the next event should happen. That is
       either the switch from grace state to retry state or the next retry.
     - `_state`: The current state of the pool during dead time
       (`STATE_GRACE` or `STATE_RETRY`)
     - `_deadlock`: Lock for dead state access

    :Types:
     - `spec`: ``tuple``
     - `weight`: ``int``
     - `timeout`: ``float``
     - `timediff`: ``int``
     - `backup`: ``tuple``
     - `get_conn`: ``callable``
     - `del_conn`: ``callable``
     - `_dead`: ``tuple``
     - `_stamp`: ``int``
     - `_state`: ``int``
     - `_deadlock`: ``threading.RLock``
    """
    _FORK_PROTECT = True
    # NOTE(review): _dead starts as False here although mark_alive() resets
    # it to None; both are falsy, so all checks (``if self._dead``) work
    # either way - confirm before unifying.
    timediff, _dead, _stamp, backup, _state = 0, False, None, (), None
814 - def __init__(self, maxconn, maxcached, spec, weight=None, timeout=None):
815 """ 816 Initialization 817 818 :Parameters: 819 - `maxconn`: Hard maximum of connections to hand out 820 - `maxcached`: Maximum number of connections to cache 821 - `spec`: Memcache location spec (``('host', port)``) 822 - `weight`: Relative pool weight (defaults to ``1``) 823 - `timeout`: Memcache timeout (defaults to ``None`` - no timeout) 824 825 :Types: 826 - `maxconn`: ``int`` 827 - `maxcached`: ``int`` 828 - `spec`: ``tuple`` 829 - `weight`: ``int`` 830 - `timeout`: ``float`` 831 """ 832 super(MemcacheConnectionPool, self).__init__(maxconn, maxcached) 833 if weight is None: 834 weight = 1 835 self.spec, self.weight, self.timeout = spec, int(weight), timeout 836 self.get_conn = self.get_obj 837 self.del_conn = self.del_obj 838 self._deadlock = _threading.RLock()
839
    def _create(self): # pylint: disable = E0202
        """ :See: `BasePool._create` """
        conn = MemcacheConnection(self, self.spec, timeout=self.timeout)
        try:
            # Determine the client/server clock difference via the "stats"
            # command so store() can adjust absolute expiry times.
            conn.write("stats" + CRLF)
            conn.flush()
            ctime = int(_time.time())
            stime = None
            while True:
                line = conn.readline().strip()
                if line == "END":
                    break
                if line.startswith('STAT time '):
                    stime = int(line[10:])
            if stime is not None:
                self.timediff = ctime - stime
        except (TypeError, ValueError, _socket.error):
            # best effort - keep the previous timediff if stats fail
            pass
        return conn
859
860 - def put_conn(self, conn):
861 """ 862 Put back connection, but only if not dead 863 864 :Parameters: 865 - `conn`: The connection to put back. If the pool is marked dead, 866 the connection is just destroyed 867 868 :Types: 869 - `conn`: `MemcacheConnection` 870 """ 871 lock = self._deadlock 872 lock.acquire() 873 try: 874 if self._dead: 875 conn.destroy() 876 else: 877 self.put_obj(conn) 878 finally: 879 lock.release()
880
881 - def mark_dead(self, grace_time, retry_time, pools):
882 """ 883 Mark this pool dead 884 885 :Parameters: 886 - `grace_time`: Grace time 887 - `retry_time`: Retry time 888 - `pools`: List of available pools 889 890 :Types: 891 - `grace_time`: ``int`` 892 - `retry_time`: ``int`` 893 - `pools`: ``tuple`` 894 """ 895 lock = self._deadlock 896 lock.acquire() 897 try: 898 self._dead = (retry_time, pools) 899 self._state = STATE_GRACE 900 self._stamp = int(_time.time()) + grace_time 901 self.clear() 902 finally: 903 lock.release()
904
905 - def mark_alive(self):
906 """ 907 Mark this pool alive 908 909 The method unconditionally removes the backup pool list and the dead 910 status. 911 """ 912 lock = self._deadlock 913 lock.acquire() 914 try: 915 self._dead, self.backup = None, () 916 finally: 917 lock.release()
918
919 - def dead(self):
920 """ 921 Determine dead state of the pool 922 923 :return: Is it dead? 924 :rtype: ``bool`` 925 """ 926 lock = self._deadlock 927 lock.acquire() 928 try: 929 return bool(self._dead) 930 finally: 931 lock.release()
932 dead = property(dead, doc=""" (bool) The current dead state """) 933
    def state(self):
        """
        Determine the pool's state

        :return: The state (``(state, retry?)``)
        :rtype: ``tuple``
        """
        lock = self._deadlock
        lock.acquire()
        try:
            dead = self._dead
            if dead:
                retry, state, now = False, self._state, int(_time.time())
                if state == STATE_GRACE:
                    if self._stamp < now:
                        # grace period expired - switch to retry state and
                        # build the weighted backup pool list from all
                        # currently alive pools
                        state = self._state = STATE_RETRY
                        self._stamp = now + dead[0]
                        self.backup = tuple(_it.chain(*[
                            [pool] * pool.weight for pool in dead[1]
                            if not pool.dead
                        ]))
                elif state == STATE_RETRY and self._stamp < now:
                    # retry interval elapsed - signal one retry attempt
                    self._stamp = now + dead[0]
                    retry = True
                return state, retry
            return None, None
        finally:
            lock.release()
    state = property(state, doc="""
        (tuple) The pool's state.

        The first item is the dead state (`STATE_GRACE` or `STATE_RETRY`) or
        ``None`` if the pool is not dead. The second item is only useful in
        retry state. Then it's a boolean answering the question whether we
        hit a retry point or not.
        """)