Package wtf :: Package ext :: Module sharedance
[hide private]
[frames] | [no frames]

Source Code for Module wtf.ext.sharedance

  1  # -*- coding: ascii -*- 
  2  # 
  3  # Copyright 2007-2012 
  4  # Andr\xe9 Malo or his licensors, as applicable 
  5  # 
  6  # Licensed under the Apache License, Version 2.0 (the "License"); 
  7  # you may not use this file except in compliance with the License. 
  8  # You may obtain a copy of the License at 
  9  # 
 10  #     http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, software 
 13  # distributed under the License is distributed on an "AS IS" BASIS, 
 14  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 15  # See the License for the specific language governing permissions and 
 16  # limitations under the License. 
 17  """ 
 18  ================ 
 19   Sharedance API 
 20  ================ 
 21   
 22  This module provides a sharedance_ API implementation and connector. 
 23   
 24  .. _sharedance: http://sharedance.pureftpd.org/project/sharedance 
 25   
 26  :Variables: 
 27   - `DEFAULT_PORT`: Sharedance default port 
 28   - `FLAG_COMPRESSED`: Flag for compressed storage 
 29   - `NO_FLAGS`: Bit mask for checking invalid flag bits 
 30   
 31  :Types: 
 32   - `DEFAULT_PORT`: ``int`` 
 33   - `FLAG_COMPRESSED`: ``int`` 
 34   - `NO_FLAGS`: ``int`` 
 35  """ 
 36  __author__ = u"Andr\xe9 Malo" 
 37  __docformat__ = "restructuredtext en" 
 38   
 39  import datetime as _datetime 
 40  import itertools as _it 
 41  import os as _os 
 42  import socket as _socket 
 43  import struct as _struct 
 44  try: 
 45      import zlib as _zlib 
 46  except ImportError: 
 47      _zlib = None 
 48   
 49  from wtf import Error 
 50  from wtf import osutil as _osutil 
 51  from wtf import stream as _stream 
 52  from wtf import util as _util 
 53   
 54  DEFAULT_PORT = 1042 
 55   
 56  FLAG_COMPRESSED = 1 # 2 ** 0 
 57  NO_FLAGS = ~(FLAG_COMPRESSED) 
 58   
 59  # python hash() differs between 32bit and 64bit! 
 60  hashfunc = _util.hash32 
class SharedanceError(Error):
    """ Base class of all sharedance related errors in this module """
65
class SharedanceConnectError(SharedanceError):
    """ The sharedance server could not be connected or the socket failed """
68
class SharedanceCommandError(SharedanceError):
    """ A sharedance command (store, fetch, delete) did not succeed """
71
class SharedanceFormatError(SharedanceError):
    """ Extended data received from sharedance is malformed """
74
def escape(value):
    """
    Escape a value as a sharedance key

    The value is Base64 encoded using the URL-safe alphabet (``-`` and
    ``_`` instead of ``+`` and ``/``). The padding characters (``=``) are
    stripped and the result never contains line breaks.

    :Parameters:
     - `value`: Value to escape

    :Types:
     - `value`: ``str``

    :return: The escaped value
    :rtype: ``str``
    """
    # Imported locally so this function does not depend on a module level
    # import being added elsewhere in the file.
    import base64 as _base64

    # urlsafe_b64encode emits a single line using the "-_" alphabet, which
    # is exactly what the former ``encode('base64')`` + replace() chain
    # produced. "=" only ever occurs as trailing padding, so stripping it
    # from the right is equivalent to removing all occurrences.
    return _base64.urlsafe_b64encode(value).rstrip(b'=')
93
class Sharedance(object):
    """
    Sharedance API

    Dispatches every key based operation to one connector, which is picked
    from a weighted connector list by hashing the key.

    :IVariables:
     - `conns`: List of connectors
     - `_weighted`: Weighted list of connectors (every connector is
       contained ``weight`` times)

    :Types:
     - `conns`: ``tuple``
     - `_weighted`: ``tuple``
    """

    def __init__(self, conns):
        """
        Initialization

        :Parameters:
         - `conns`: List of sharedance connectors

        :Types:
         - `conns`: ``iterable``
        """
        self.conns = tuple(conns)

        # Repeat each connector according to its weight, keeping the order
        slots = []
        for connector in self.conns:
            slots.extend([connector] * connector.weight)
        self._weighted = tuple(slots)

    def store(self, key, data):
        """ Store an item using the connector responsible for `key` """
        connector = self._get_conn(key)
        return connector.store(key, data)

    def fetch(self, key):
        """ Fetch an item using the connector responsible for `key` """
        connector = self._get_conn(key)
        return connector.fetch(key)

    def delete(self, key):
        """ Delete a key using the connector responsible for `key` """
        connector = self._get_conn(key)
        return connector.delete(key)

    def check(self, key=None, content=None, fetch=None):
        """ Check all servers for functionality (one result per connector) """
        results = []
        for connector in self.conns:
            results.append(
                connector.check(key=key, content=content, fetch=fetch)
            )
        return results

    def _get_conn(self, key):
        """ Determine connector based on the weighted list """
        slot = int(abs(hashfunc(key))) % len(self._weighted)
        return self._weighted[slot]
142
class _Connected(_util.BaseDecorator):
    """ Decorator keeping the socket handling out of the connector methods """

    def __call__(self, inst, *args, **kwargs):
        """
        Connect, run the decorated function and tear the connection down

        The decorated function receives the established stream as ``_conn``
        keyword argument.

        :Parameters:
         - `inst`: Connector instance (provides ``host``, ``port`` and
           ``timeout``)
         - `args`: function parameters
         - `kwargs`: function parameters

        :Types:
         - `inst`: `SharedanceConnector`
         - `args`: ``tuple``
         - `kwargs`: ``dict``

        :return: Whatever the decorated function returns
        :rtype: any

        :Exceptions:
         - `SharedanceConnectError`: No address was connectable or a socket
           error occurred
        """
        # pylint: disable = W0221

        try:
            sock = _osutil.connect(
                (inst.host, inst.port), timeout=inst.timeout, cache=3600
            )
            if sock is None:
                raise SharedanceConnectError(
                    "No connectable address found for %s:%s" % (
                        inst.host, inst.port
                    )
                )
            try:
                conn = _stream.GenericStream(
                    _stream.MinimalSocketStream(sock), read_exact=True
                )
                try:
                    kwargs['_conn'] = conn
                    return self._func(inst, *args, **kwargs)
                finally:
                    # Presumably closing the stream closes the socket, too.
                    # The socket reference is only dropped if close()
                    # succeeded; otherwise the outer finally closes it.
                    conn.close()
                    sock = None
            finally:
                if sock is not None:
                    sock.close()
        except (_osutil.SocketError, _socket.error) as e:
            raise SharedanceConnectError(str(e))
190
class SharedanceConnector(object):
    """
    Sharedance connection abstraction

    If magic is enabled (see `__init__`), the data will be extended with
    the magic string (`_MAGIC`), followed by a 0 byte, a flag number
    (decimal notation), a LF, the data length (decimal, too) and another
    LF. On read this header is used to transparently check for
    consistency and evaluate the flags (Currently there's only one:
    compressed)

    :CVariables:
     - `_RETURN_OK`: OK return value
     - `_MAGIC`: Magic string to mark our own format

    :Types:
     - `_RETURN_OK`: ``str``
     - `_MAGIC`: ``str``
    """
    _RETURN_OK = "OK\n"
    _MAGIC = "%#\\U;" # Looks strange? This is a classic! Have fun :-)

    def __init__(self, spec, compress_threshold=None, timeout=None,
                 weight=None, magic=-1):
        """
        Initialization

        The magic parameter determines whether the connection object should
        transparently handle extended values:

        ``0`` or ``False``
          No magic should be applied (neither on reads nor writes)
        ``1`` or ``True``
          Full magic should be applied
        ``-1``
          No write magic should be applied (but reads are interpreted)

        :Parameters:
         - `spec`: server spec (``(host, port)``)
         - `compress_threshold`: Minimum data length for compressing values
           on store (only with full magic and if zlib is available). If
           omitted or ``None``, values are never compressed.
         - `timeout`: Timeout in seconds
         - `weight`: Relative weight of this connector. If omitted or
           ``None``, it defaults to ``1``.
         - `magic`: Magic behaviour

        :Types:
         - `spec`: ``tuple``
         - `compress_threshold`: ``int``
         - `timeout`: ``float``
         - `weight`: ``int``
         - `magic`: ``int``
        """
        self.host, self.port = spec
        self.timeout, self.magic = timeout, int(magic)
        self.compress_threshold = compress_threshold
        if weight is None:
            weight = 1
        self.weight = weight

    @_Connected
    def store(self, key, data, _conn=None):
        """
        Store data in sharedance

        :Parameters:
         - `key`: Key to store under
         - `data`: Data to store

        :Types:
         - `key`: ``str``
         - `data`: ``str``

        :Exceptions:
         - `SharedanceCommandError`: The storage was not successful
        """
        key, data = str(key), str(data)
        flags, vlen = 0, len(data)
        if self.magic > 0:
            if self.compress_threshold is not None and \
                    vlen >= self.compress_threshold and _zlib is not None:
                flags |= FLAG_COMPRESSED
                data = _zlib.compress(data)
                vlen = len(data)
            flags, dlen = str(flags), str(vlen)
            # Header: MAGIC, 0 byte, flags, LF, data length, LF
            # (the 3 accounts for the 0 byte and the two line feeds)
            vlen += len(self._MAGIC) + len(flags) + len(dlen) + 3

        write = _conn.write
        write("S%s" % _struct.pack('!LL', len(key), vlen))
        write(key)
        if self.magic > 0:
            write("%s\0%s\n%s\n" % (self._MAGIC, flags, dlen))
        write(data)
        _conn.flush()
        res = _conn.read()
        if res != self._RETURN_OK:
            raise SharedanceCommandError(
                "Storage failed for key %s: %r" % (key, res)
            )

    @_Connected
    def fetch(self, key, _conn=None):
        """
        Fetch data from sharedance

        If magic is enabled, the extended header is evaluated (and stripped).
        Values which do not carry the header are returned unmodified.

        :Parameters:
         - `key`: The key to fetch

        :Types:
         - `key`: ``str``

        :return: The stored data
        :rtype: ``str``

        :Exceptions:
         - `KeyError`: The key does not exist (magic enabled and nothing
           was received)
         - `SharedanceCommandError`: The result was not interpretable
         - `SharedanceFormatError`: The extended header or the compressed
           data is broken
        """
        key, write = str(key), _conn.write
        write("F%s" % _struct.pack('!L', len(key)))
        write(key)
        _conn.flush()

        # `prefix` collects the bytes consumed while probing for the magic
        # header. If the value turns out to be plain data, they belong to
        # the payload and must not be dropped.
        expected, flags, prefix = -1, 0, ""
        if self.magic:
            prefix = _stream.read_exact(_conn, len(self._MAGIC))
            if not prefix:
                raise KeyError(key)
            if prefix == self._MAGIC:
                marker = _conn.read(1)
                if marker == '\0':
                    prefix = "" # it's a real header, nothing to give back
                    line = _conn.readline()
                    try:
                        flags = int(line.rstrip())
                    except (TypeError, ValueError):
                        raise SharedanceFormatError(
                            "Invalid flags: %r" % line)
                    if flags & NO_FLAGS:
                        raise SharedanceFormatError(
                            "Unrecognized flags: %s" % flags)
                    line = _conn.readline()
                    try:
                        expected = int(line.rstrip())
                    except (TypeError, ValueError):
                        raise SharedanceFormatError(
                            "Invalid value length: %r" % line)
                    if expected < 0:
                        # A negative length would silently disable the
                        # completeness check below
                        raise SharedanceFormatError(
                            "Invalid value length: %r" % line)
                else:
                    prefix += marker or ""

        data = _stream.read_exact(_conn, expected)
        if prefix:
            data = prefix + (data or "")

        if expected >= 0 and len(data) != expected:
            raise SharedanceCommandError("Fetch incomplete")
        if flags & FLAG_COMPRESSED:
            if _zlib is None:
                raise SharedanceCommandError(
                    "Cannot uncompress fetched value (no zlib)")
            try:
                data = _zlib.decompress(data)
            except _zlib.error as e:
                raise SharedanceFormatError(
                    "Decompression error: %s" % str(e))
        return data

    @_Connected
    def delete(self, key, _conn=None):
        """
        Delete data from sharedance

        :Parameters:
         - `key`: The key to delete

        :Types:
         - `key`: ``str``

        :Exceptions:
         - `SharedanceCommandError`: The deletion was not successful
        """
        key, write = str(key), _conn.write
        write("D%s" % _struct.pack('!L', len(key)))
        write(key)
        _conn.flush()
        if _conn.read() != self._RETURN_OK:
            raise SharedanceCommandError("Deletion failed")

    def check(self, key=None, content=None, fetch=None):
        """
        Check sharedance server for functionality

        The check results are stored in a dict with the following keys:

        ``spec``
          [tuple] host and port (``('host', port)``)
        ``error``
          [bool] Was there an error?
        ``status``
          [unicode] Human readable status description
        ``time``
          [int] Time taken for the whole check in microseconds
          (basically a store/fetch/delete cycle)
        ``weight``
          [int] Weight of this particular sharedance (just for convenience)

        :Parameters:
         - `key`: Key to store under. If omitted or ``None`` it defaults to
           ``CHECK-<random>``.
         - `content`: Content to store. If omitted or ``None`` it defaults
           to a random string
         - `fetch`: Try fetching the content? If omitted or ``None`` it
           defaults to ``True``

        :Types:
         - `key`: ``str``
         - `content`: ``str``
         - `fetch`: ``bool``

        :return: Check results
        :rtype: ``dict``
        """
        # pylint: disable = R0912

        if key is None:
            key = "CHECK-%s" % escape(_os.urandom(6))
        if content is None:
            content = _os.urandom(8)
        if fetch is None:
            fetch = True

        error, status = False, u"OK"
        start = _datetime.datetime.utcnow()

        try:
            self.store(key, content)
        except SharedanceError as e:
            error, status = True, u"Store of %s failed: %s" % (
                repr(key).decode('latin-1'),
                str(e).decode('latin-1'),
            )
        else:
            try:
                if fetch:
                    result = self.fetch(key)
                else:
                    result = content
            except KeyError:
                error, status = (
                    True,
                    u"Fetch failed: Key does not exist: %s" %
                        repr(key).decode('latin-1')
                )
            except SharedanceError as e:
                error, status = True, u"Fetch of %s failed: %s" % (
                    repr(key).decode('latin-1'),
                    str(e).decode('latin-1'),
                )
            else:
                if result != content:
                    error, status = (
                        True,
                        u"Store/Fetch cycle of %s failed: %s != %s" % (
                            repr(key).decode('latin-1'),
                            repr(result).decode('latin-1'),
                            repr(content).decode('latin-1'),
                        )
                    )

            # Always try to delete, once it was stored
            try:
                self.delete(key)
            except SharedanceError as e:
                msg = u"Deletion of %s failed: %s" % (
                    repr(key).decode('latin-1'), str(e).decode('latin-1')
                )
                if error:
                    # join() takes exactly one iterable
                    status = u", ".join((status, msg))
                else:
                    error, status = True, msg

        timediff = _datetime.datetime.utcnow() - start
        # timedelta normalizes into days/seconds/microseconds; take all of
        # them into account
        timediff = (
            (timediff.days * 86400 + timediff.seconds) * 1000000
            + timediff.microseconds
        )
        return dict(
            spec=(self.host, self.port),
            error=error,
            status=status,
            time=timediff,
            weight=self.weight,
        )
463