1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 """
18 ================
19 Sharedance API
20 ================
21
22 This module implements a sharedance_ API and connector.
23
24 .. _sharedance: http://sharedance.pureftpd.org/project/sharedance
25
26 :Variables:
27 - `DEFAULT_PORT`: Sharedance default port
28 - `FLAG_COMPRESSED`: Flag for compressed storage
29 - `NO_FLAGS`: Bit mask for checking invalid flag bits
30
31 :Types:
32 - `DEFAULT_PORT`: ``int``
33 - `FLAG_COMPRESSED`: ``int``
34 - `NO_FLAGS`: ``int``
35 """
36 __author__ = u"Andr\xe9 Malo"
37 __docformat__ = "restructuredtext en"
38
39 import datetime as _datetime
40 import itertools as _it
41 import os as _os
42 import socket as _socket
43 import struct as _struct
44 try:
45 import zlib as _zlib
46 except ImportError:
47 _zlib = None
48
49 from wtf import Error
50 from wtf import osutil as _osutil
51 from wtf import stream as _stream
52 from wtf import util as _util
53
54 DEFAULT_PORT = 1042
55
56 FLAG_COMPRESSED = 1
57 NO_FLAGS = ~(FLAG_COMPRESSED)
58
59
60 hashfunc = _util.hash32
64 """ Sharedance communication error """
65
67 """ Sharedance connection error """
68
70 """ Sharedance command error """
71
74
77 """
78 Escape a value as a sharedance key
79
80 The value will be Base64 encoded (but with a slightly modified alphabet)
81
82 :Parameters:
83 - `value`: Value to escape
84
85 :Types:
86 - `value`: ``str``
87
88 :return: The escaped value
89 :rtype: ``str``
90 """
91 return value.encode('base64').replace('\n', '').replace('=', ''
92 ).replace('/', '_').replace('+', '-')
93
96 """
97 Sharedance API
98
99 :IVariables:
100 - `conns`: List of connectors
101 - `_weighted`: Weighted list of connectors
102
103 :Types:
104 - `conns`: ``tuple``
105 - `_weighted`: ``tuple``
106 """
107
109 """
110 Initialization
111
112 :Parameters:
113 - `conns`: List of sharedance connectors
114
115 :Types:
116 - `conns`: ``iterable``
117 """
118 self.conns = tuple(conns)
119 self._weighted = tuple(_it.chain(*[[conn] * conn.weight
120 for conn in self.conns]))
121
122 - def store(self, key, data):
125
129
133
def check(self, key=None, content=None, fetch=None):
    """
    Check the servers for functionality

    Each connector in `conns` runs its own check with the same arguments.
    The results are collected in connector order.

    :Parameters:
     - `key`: Key handed to every connector's check
     - `content`: Content handed to every connector's check
     - `fetch`: Fetch flag handed to every connector's check

    :return: The check results, one item per connector
    :rtype: ``list``
    """
    results = []
    for connector in self.conns:
        results.append(
            connector.check(key=key, content=content, fetch=fetch)
        )
    return results
138
140 """ Determine connector based on the weighted list """
141 return self._weighted[int(abs(hashfunc(key))) % len(self._weighted)]
142
145 """ Separation of the socket handling out of the connection object """
146
def __call__(self, inst, *args, **kwargs):
    """
    Decorating logic

    A socket is connected to ``(inst.host, inst.port)`` and wrapped into a
    stream. The stream is passed to the decorated function as the ``_conn``
    keyword argument and closed once the function is done.

    :Parameters:
     - `inst`: Proxy instance (provides ``host``, ``port`` and ``timeout``)
     - `args`: function parameters
     - `kwargs`: function parameters

    :Types:
     - `inst`: `SharedanceConnector`
     - `args`: ``tuple``
     - `kwargs`: ``dict``

    :return: Whatever the decorated function returns
    :rtype: any

    :Exceptions:
     - `SharedanceConnectError`: No address was connectable or a socket
       error occurred
    """
    try:
        address = (inst.host, inst.port)
        sock = _osutil.connect(address, timeout=inst.timeout, cache=3600)
        if sock is None:
            raise SharedanceConnectError(
                "No connectable address found for %s:%s" % (
                    inst.host, inst.port
                )
            )
        try:
            stream = _stream.GenericStream(
                _stream.MinimalSocketStream(sock), read_exact=True
            )
            try:
                kwargs['_conn'] = stream
                return self._func(inst, *args, **kwargs)
            finally:
                # The socket reference is only dropped after the stream
                # closed without raising; otherwise the outer ``finally``
                # still closes the raw socket.
                stream.close()
                sock = None
        finally:
            if sock is not None:
                sock.close()
    except (_osutil.SocketError, _socket.error) as exc:
        raise SharedanceConnectError(str(exc))
190
193 """
194 Sharedance connection abstraction
195
196 If magic is enabled (see `__init__`), the data will be prefixed with
197 the magic string (`_MAGIC`), followed by a 0 byte, a flag number
198 (decimal notation), a LF, the data length (decimal, too) and another
199 LF. On read, this header is used to transparently check for
200 consistency and to evaluate the flags (currently there is only one:
201 compressed).
202
203 :CVariables:
204 - `_RETURN_OK`: OK return value
205 - `_MAGIC`: Magic string to mark our own format
206
207 :Types:
208 - `_RETURN_OK`: ``str``
209 - `_MAGIC`: ``str``
210 """
211 _RETURN_OK = "OK\n"
212 _MAGIC = "%#\\U;"
213
def __init__(self, spec, compress_threshold=None, timeout=None,
             weight=None, magic=-1):
    """
    Initialization

    The magic parameter selects how extended values are handled by the
    connection object:

    ``0`` or ``False``
      Neither reads nor writes apply any magic
    ``1`` or ``True``
      Reads and writes both apply magic
    ``-1``
      Reads are interpreted, but writes apply no magic

    :Parameters:
     - `spec`: server spec (``(host, port)``)
     - `compress_threshold`: Data length from which on `store` compresses
       the value (only if write magic is enabled). ``None`` disables
       compression.
     - `timeout`: Timeout in seconds
     - `weight`: Weight of this connector. ``None`` means ``1``.
     - `magic`: Magic behaviour

    :Types:
     - `spec`: ``tuple``
     - `compress_threshold`: ``int``
     - `timeout`: ``float``
     - `weight`: ``int``
     - `magic`: ``int``
    """
    host, port = spec
    magic_mode = int(magic)

    self.host = host
    self.port = port
    self.timeout = timeout
    self.magic = magic_mode
    self.compress_threshold = compress_threshold
    self.weight = 1 if weight is None else weight
245
@_Connected
def store(self, key, data, _conn=None):
    """
    Store data in sharedance

    If write magic is enabled (``magic > 0``), the value is preceded by a
    header made of the magic string, a 0 byte, the flags and the data
    length (each followed by a LF). The value itself is zlib compressed if
    a compression threshold is set, the data is at least that long and
    zlib is available.

    :Parameters:
     - `key`: Key to store under
     - `data`: Data to store

    :Types:
     - `key`: ``str``
     - `data`: ``str``

    :Exceptions:
     - `SharedanceCommandError`: The storage was not successful
    """
    key = str(key)
    data = str(data)
    size = len(data)

    header = None
    if self.magic > 0:
        flags = 0
        threshold = self.compress_threshold
        if threshold is not None and size >= threshold \
                and _zlib is not None:
            flags |= FLAG_COMPRESSED
            data = _zlib.compress(data)
            size = len(data)
        # The announced length covers the header plus the payload
        header = "%s\0%s\n%s\n" % (self._MAGIC, flags, size)
        size += len(header)

    send = _conn.write
    send("S%s" % _struct.pack('!LL', len(key), size))
    send(key)
    if header is not None:
        send(header)
    send(data)
    _conn.flush()

    reply = _conn.read()
    if reply != self._RETURN_OK:
        raise SharedanceCommandError(
            "Storage failed for key %s: %r" % (key, reply)
        )
285
@_Connected
def fetch(self, key, _conn=None):
    """
    Fetch data from sharedance

    If magic is enabled, the start of the response is checked for the
    magic header. A recognized header provides the flags and the expected
    data length, which are used to verify and (if flagged) decompress the
    value.

    :Parameters:
     - `key`: The key to fetch

    :Types:
     - `key`: ``str``

    :return: The fetched data
    :rtype: ``str``

    :Exceptions:
     - `KeyError`: The key does not exist
     - `SharedanceCommandError`: The result was not interpretable
     - `SharedanceFormatError`: The magic header or the compressed data
       was invalid
    """
    key = str(key)
    send = _conn.write
    send("F%s" % _struct.pack('!L', len(key)))
    send(key)
    _conn.flush()

    expected = -1
    flags = 0
    if self.magic:
        prefix = _stream.read_exact(_conn, len(self._MAGIC))
        if not prefix:
            raise KeyError(key)
        # NOTE(review): if the prefix is not the magic marker, the bytes
        # consumed here are not put back into the returned data. Confirm
        # how the stream / ``read_exact`` deal with that.
        if prefix == self._MAGIC and _conn.read(1) == '\0':
            raw = _conn.readline()
            try:
                flags = int(raw.rstrip())
            except (TypeError, ValueError):
                raise SharedanceFormatError("Invalid flags: %r" % raw)
            if flags & NO_FLAGS:
                raise SharedanceFormatError(
                    "Unrecognized flags: %s" % flags)

            raw = _conn.readline()
            try:
                expected = int(raw.rstrip())
            except (TypeError, ValueError):
                raise SharedanceFormatError(
                    "Invalid value length: %r" % raw)

    # NOTE(review): ``expected`` stays -1 without a magic header;
    # presumably ``read_exact`` then reads until EOF. TODO confirm.
    data = _stream.read_exact(_conn, expected)

    # NOTE(review): a negative length taken from the header skips this
    # completeness check as well.
    if expected >= 0 and len(data) != expected:
        raise SharedanceCommandError("Fetch incomplete")

    if flags & FLAG_COMPRESSED:
        if _zlib is None:
            raise SharedanceCommandError(
                "Cannot uncompress fetched value (no zlib)")
        try:
            data = _zlib.decompress(data)
        except _zlib.error as exc:
            raise SharedanceFormatError(
                "Decompression error: %s" % str(exc))
    return data
342
343 @_Connected
344 - def delete(self, key, _conn=None):
360
def check(self, key=None, content=None, fetch=None):
    """
    Check sharedance server for functionality

    The check stores a value, optionally fetches it again and compares it,
    and finally deletes it. Deletion is attempted even if an earlier step
    failed.

    The check results are stored in a dict with the following keys:

    ``spec``
      [tuple] host and port (``('host', port)``)
    ``error``
      [bool] Was there an error?
    ``status``
      [unicode] Human readable status description. If more than one step
      failed, the messages are joined by ``", "``.
    ``time``
      [int] Time taken for the whole check in microseconds
      (basically a store/fetch/delete cycle)
    ``weight``
      [int] Weight of this particular sharedance (just for convenience)

    :Parameters:
     - `key`: Key to store under. If omitted or ``None`` it defaults to
       ``CHECK-<random>``.
     - `content`: Content to store. If omitted or ``None`` it defaults
       to a random string
     - `fetch`: Try fetching the content? If omitted or ``None`` it
       defaults to ``True``

    :Types:
     - `key`: ``str``
     - `content`: ``str``
     - `fetch`: ``bool``

    :return: Check results
    :rtype: ``dict``
    """
    def _text(value):
        """ Decode byte strings as latin-1, pass text through unchanged """
        if isinstance(value, bytes):
            return value.decode('latin-1')
        return value

    if key is None:
        key = "CHECK-%s" % escape(_os.urandom(6))
    if content is None:
        content = _os.urandom(8)
    if fetch is None:
        fetch = True

    error, status = False, u"OK"
    start = _datetime.datetime.utcnow()

    try:
        self.store(key, content)
    except SharedanceError as e:
        error, status = True, u"Store of %s failed: %s" % (
            _text(repr(key)), _text(str(e)),
        )
    else:
        try:
            if fetch:
                result = self.fetch(key)
            else:
                result = content
        except KeyError:
            error, status = (
                True,
                u"Fetch failed: Key does not exist: %s" % _text(repr(key))
            )
        except SharedanceError as e:
            error, status = True, u"Fetch of %s failed: %s" % (
                _text(repr(key)), _text(str(e)),
            )
        else:
            if result != content:
                error, status = (
                    True,
                    u"Store/Fetch cycle of %s failed: %s != %s" % (
                        _text(repr(key)),
                        _text(repr(result)),
                        _text(repr(content)),
                    )
                )

    # Always try to clean up, regardless of what happened before
    try:
        self.delete(key)
    except SharedanceError as e:
        msg = u"Deletion of %s failed: %s" % (
            _text(repr(key)), _text(str(e))
        )
        if error:
            # join() takes exactly one iterable; passing the two strings
            # as separate arguments raised a TypeError here.
            status = u", ".join((status, msg))
        else:
            error, status = True, msg

    timediff = _datetime.datetime.utcnow() - start
    # timedelta normalizes into days/seconds/microseconds; include the
    # days so the total is right for any duration.
    timediff = (
        (timediff.days * 86400 + timediff.seconds) * 1000000
        + timediff.microseconds
    )
    return dict(
        spec=(self.host, self.port),
        error=error,
        status=status,
        time=timediff,
        weight=self.weight,
    )
463