Package wtf :: Module stream
[hide private]
[frames] | [no frames]

Source Code for Module wtf.stream

  1  # -*- coding: ascii -*- 
  2  # 
  3  # Copyright 2005-2012 
  4  # Andr\xe9 Malo or his licensors, as applicable 
  5  # 
  6  # Licensed under the Apache License, Version 2.0 (the "License"); 
  7  # you may not use this file except in compliance with the License. 
  8  # You may obtain a copy of the License at 
  9  # 
 10  #     http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, software 
 13  # distributed under the License is distributed on an "AS IS" BASIS, 
 14  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 15  # See the License for the specific language governing permissions and 
 16  # limitations under the License. 
 17  """ 
 18  Generic Buffered ``file`` Stream 
 19  ================================ 
 20   
 21  In order to use the stream, you need to supply an actual implementation 
 22  of the low level octet stream. This stream implementation is useful 
 23  in order to decorate other streams and not implement the full API every time. 
 24   
 25  :Variables: 
 26   - `dev_null`: /dev/null like stream (EOF on reading, doing nothing on 
 27     writing) 
 28   
 29  :Types: 
 30   - `dev_null`: `GenericStream` 
 31  """ 
 32  __author__ = u"Andr\xe9 Malo" 
 33  __docformat__ = "restructuredtext en" 
 34   
 35   
 36  import errno as _errno 
 37  import socket as _socket 
 38   
 39  from wtf.util import Property 
class GenericStream(object):
    """
    Represents a buffered stream

    :CVariables:
     - `_DEFAULT_CHUNK_SIZE`: Default buffer size

    :IVariables:
     - `_octet_stream`: The stream to actually fetch the data from
     - `_chunk_size`: Actual buffer size to use
     - `_blockiter`: Block iteration size (``1: block = line``)
     - `_rbuffer`: read buffer (used as a stack: the chunk to hand out next
       is the last item)
     - `_wbuffer`: write buffer
     - `_flush`: underlying flush function
     - `_closed`: Is the stream closed?
     - `_exact`: Does ``read`` behave like ``read_exact``?
     - `softspace`: Softspace parameter

    :Types:
     - `_DEFAULT_CHUNK_SIZE`: ``int``
     - `_octet_stream`: ``file``
     - `_chunk_size`: ``int``
     - `_blockiter`: ``int``
     - `_rbuffer`: ``list``
     - `_wbuffer`: ``list``
     - `_flush`: ``callable``
     - `_closed`: ``bool``
     - `_exact`: ``bool``
     - `softspace`: ``bool``
    """
    _DEFAULT_CHUNK_SIZE = 8192 # must be > 1
    _octet_stream = None
    _closed = True
    _exact = False
    softspace = False

    def __new__(cls, stream, buffering=-1, blockiter=1, read_exact=False):
        """
        Construction

        :Parameters:
         - `stream`: The low level stream
         - `buffering`: The buffer specification:
           (``< 0``: default buffer size, ``==0``: unbuffered,
           ``>0``: this very buffer size)
         - `blockiter`: iteration block spec
           (``<= 0: default chunk, 1: line, > 1: this blocksize``)
         - `read_exact`: Try reading up to the requested size? If ``False``,
           a simple ``.read(size)`` will always yield the chunk size at max.
           This is normal behaviour on slow calls, e.g. sockets. However,
           some software expects to get the maximum (or even the exact value)
           bytes. In this case use a stream with ``read_exact`` set to
           ``True``.

        :Types:
         - `stream`: ``file``
         - `buffering`: ``int``
         - `blockiter`: ``int``
         - `read_exact`: ``bool``
        """
        # pylint: disable = W0212, W0621

        self = super(GenericStream, cls).__new__(cls)
        if buffering < 0:
            buffering = self._DEFAULT_CHUNK_SIZE
        elif buffering == 0:
            # reading needs a positive chunk size; ``write`` treats a chunk
            # size of 1 as "flush immediately"
            buffering = 1
        self._octet_stream = stream
        self._chunk_size = buffering
        self._blockiter = max(0, blockiter) or self._DEFAULT_CHUNK_SIZE
        self._rbuffer = []
        self._wbuffer = []
        self._closed = False
        try:
            self._flush = self._octet_stream.flush
        except AttributeError:
            self._flush = lambda: None
        # Only a flag is stored here. Binding ``self.read_exact`` onto the
        # instance would create a reference cycle, which (together with
        # ``__del__``) makes the stream uncollectable on older interpreters.
        self._exact = bool(read_exact)
        return self

    def __del__(self):
        """ Close the stream when the object is finalized """
        self.close()

    def __iter__(self):
        """ Iterator generator """
        return self

    def next(self):
        """
        Return the next line or block

        :return: The next line (``blockiter == 1``) or block
        :rtype: ``str``

        :Exceptions:
         - `StopIteration`: EOF was hit
        """
        if self._blockiter == 1: # pylint: disable = E1101
            line = self.readline()
        else:
            line = self.read(self._blockiter) # pylint: disable = E1101
        if not line:
            raise StopIteration()
        return line

    # Python 3 spelling of the iterator protocol
    __next__ = next

    @property
    def name(self):
        """
        The name of the stream, if any

        :Type: ``str``
        """
        return self._octet_stream.name

    @property
    def closed(self):
        """
        Is the stream closed?

        :Type: ``bool``
        """
        stream = self._octet_stream
        if stream is None:
            return True
        try:
            return stream.closed
        except AttributeError:
            # the low level stream does not track it, so we do
            return self._closed

    def fileno(self):
        """
        Determine underlying fileno

        :return: Whatever the underlying stream's ``fileno()`` returns
        :rtype: ``int``

        :Exceptions:
         - `ValueError`: The stream is closed
        """
        if self.closed:
            raise ValueError("I/O operation on closed stream")

        return self._octet_stream.fileno()

    def close(self):
        """
        Close the stream

        The write buffer is flushed first. The call is then passed to the
        underlying octet stream (if it provides a ``close`` method), even
        if flushing failed.
        """
        if self.closed:
            return

        try:
            self.flush(False)
        finally:
            stream, self._octet_stream = self._octet_stream, None
            self._closed = True
            if stream is not None:
                try:
                    close = stream.close
                except AttributeError:
                    pass # well, just don't close it then
                else:
                    close()

    def flush(self, _passdown=True):
        """
        Flush the write buffer

        :Parameters:
         - `_passdown`: Call flush() on the underlying stream, too

        :Types:
         - `_passdown`: ``bool``

        :Exceptions:
         - `ValueError`: The stream is closed
        """
        if self.closed:
            raise ValueError("I/O operation on closed stream")

        # pylint: disable = W0201
        buf, self._wbuffer = "".join(self._wbuffer), []
        if buf:
            self._octet_stream.write(buf)
        if _passdown:
            self._flush() # pylint: disable = E1101

    def isatty(self):
        """
        Does the stream refer to a tty?

        :return: Does the stream refer to a tty? (``False`` if the
                 underlying stream has no ``isatty`` method)
        :rtype: ``bool``

        :Exceptions:
         - `ValueError`: The stream is closed
        """
        if self.closed:
            raise ValueError("I/O operation on closed stream")

        try:
            isatty = self._octet_stream.isatty
        except AttributeError:
            return False
        return isatty()

    def read(self, size=-1):
        """
        Reads a specified amount of bytes (at max) from the stream

        If the stream was created with ``read_exact=True``, this behaves
        like `read_exact`.

        :Parameters:
         - `size`: The maximum number of bytes to read (``< 0`` means
           to slurp the whole stream; ``== 0`` means to return the current
           buffer or the next buffer if the current buffer is empty)

        :Types:
         - `size`: ``int``

        :return: The read bytes; if empty you've hit EOF
        :rtype: ``str``

        :Exceptions:
         - `ValueError`: The stream is closed
        """
        if self._exact:
            return self.read_exact(size)
        return self._bufferedread(size)

    def read_exact(self, size=-1):
        """
        Read exactly size bytes from stream, except on EOF

        :Parameters:
         - `size`: expected number of bytes

        :Types:
         - `size`: ``int``

        :return: The read bytes
        :rtype: ``str``
        """
        return _read_exact(self._bufferedread, size)

    def readline(self, size=0):
        """
        Read a line from the stream

        :Parameters:
         - `size`: The maximum number of bytes to read (``<= 0`` means
           to read until the next newline or EOF, which is the default
           behaviour)

        :Types:
         - `size`: ``int``

        :return: The read bytes including the newline; if empty you've hit
                 EOF
        :rtype: ``str``

        :Exceptions:
         - `ValueError`: The stream is closed
        """
        if self.closed:
            raise ValueError("I/O operation on closed stream")

        if size < 0:
            size = 0 # read default chunks

        read = self._bufferedread
        line = read(size)
        if not line:
            return line

        searched = 0 # everything before this offset is known newline-free
        while True:
            eol = line.find("\n", searched)
            if eol >= 0 and (size == 0 or eol < size):
                # found the line end: push back whatever follows it
                self._unread(line[eol + 1:])
                return line[:eol + 1]

            if size > 0:
                length = len(line)
                if length == size:
                    return line
                if length > size:
                    self._unread(line[size:])
                    return line[:size]
                more = read(size - length)
            else:
                more = read(size)

            if not more: # EOF without a trailing newline
                return line
            searched = len(line)
            line += more

    def readlines(self, size=0):
        """
        Returns all lines as a list

        :Parameters:
         - `size`: Maximum size for a single line.

        :Types:
         - `size`: ``int``

        :return: The lines read until EOF
        :rtype: ``list``
        """
        lines = []
        while True:
            line = self.readline(size)
            if not line:
                break
            lines.append(line)
        return lines

    def xreadlines(self):
        """
        Iterator of the lines

        :Deprecated: Use the iterator API instead
        """
        return self

    def write(self, data):
        """
        Write data into the stream

        The data is collected in the write buffer and passed to the
        underlying stream once more than the chunk size has piled up.
        Unbuffered streams pass it on immediately.

        :Parameters:
         - `data`: The data to write

        :Types:
         - `data`: ``str``

        :Exceptions:
         - `ValueError`: The stream is closed
        """
        if self.closed:
            raise ValueError("I/O operation on closed stream")

        if data:
            self._wbuffer.append(data)
            # pylint: disable = E1101
            # `__new__` maps ``buffering == 0`` to a chunk size of 1, so the
            # unbuffered case has to be detected as ``<= 1``. Testing for
            # ``<= 0`` never matched and left single octets stuck in the
            # buffer.
            if self._chunk_size <= 1 or \
                    sum(map(len, self._wbuffer)) > self._chunk_size:
                self.flush(False)

    def writelines(self, lines):
        """
        Write lines to the stream

        :Parameters:
         - `lines`: The list of lines to write

        :Types:
         - `lines`: ``iterable``
        """
        for line in lines:
            self.write(line)

    def _unread(self, tounread):
        """
        Pushes `tounread` octets back

        :Parameters:
         - `tounread`: The buffer to push back

        :Types:
         - `tounread`: ``str``
        """
        if tounread:
            self._rbuffer.append(tounread)

    def _bufferedread(self, size):
        """
        Read a specified amount of bytes (at max) from the stream

        For ``size > 0`` the underlying stream is asked at most once, so
        fewer than `size` bytes may be returned.

        :Parameters:
         - `size`: The maximum number of bytes to read (``< 0`` means
           to slurp the whole stream; ``== 0`` means to return the current
           buffer or the next buffer if the current buffer is empty)

        :Types:
         - `size`: ``int``

        :return: The read bytes; if empty you've hit EOF
        :rtype: ``str``

        :Exceptions:
         - `ValueError`: The stream is closed
        """
        # pylint: disable = E1101

        if self.closed:
            raise ValueError("I/O operation on closed stream")

        pushed_back, chunk_size = self._rbuffer, self._chunk_size
        if size == 0:
            if pushed_back:
                return pushed_back.pop()
            size = chunk_size

        # the push-back buffer is a stack, so flatten it in reverse order
        pending = "".join(pushed_back[::-1])

        if size < 0:
            # slurp all: the pending octets plus the stream up to EOF
            self._rbuffer = []
            chunks = [pending]
            read = self._octet_stream.read
            while True:
                chunk = read(chunk_size)
                if not chunk:
                    break
                chunks.append(chunk)
            return "".join(chunks)

        head, rest = pending[:size], pending[size:]
        if rest:
            # the request is satisfied by pushed back octets alone
            self._rbuffer = [rest]
            return head
        self._rbuffer = []

        missing = size - len(head)
        if missing <= 0:
            return head
        chunk = self._octet_stream.read(min(missing, chunk_size))
        if not chunk:
            return head
        surplus = chunk[missing:]
        if surplus:
            # the low level stream delivered more than asked for
            self._rbuffer = [surplus] # pylint: disable = W0201
        return "".join([head, chunk[:missing]])
452
class MinimalSocketStream(object):
    """
    Minimal stream out of a socket

    This effectively maps ``recv`` to ``read`` and ``sendall`` to ``write``.
    Every other attribute is looked up on the socket itself.

    :See: `GenericStream`

    :IVariables:
      `_sock` : ``socket.socket``
        The socket in question (``None`` once the stream is closed)

      `_shutdown` : ``int``
        shutdown parameter on close
    """
    name = '<socket>'

    def __init__(self, sock, shutdown=None):
        """
        Initialization

        :Parameters:
          `sock` : ``socket.socket``
            The socket in question

          `shutdown` : ``int``
            Shutdown parameter on close (``socket.SHUT_*``). If omitted,
            ``None`` or negative, the close method of the socket is called
            (if exists).
        """
        if shutdown is not None and shutdown < 0:
            shutdown = None
        self._shutdown = shutdown
        self._sock = sock

    def __del__(self):
        """ Close the stream when the object is finalized """
        self.close()

    def __getattr__(self, name):
        """
        Delegate all unknown symbol requests to the socket itself

        :Parameters:
         - `name`: The symbol to lookup

        :Types:
         - `name`: ``str``

        :return: The looked up symbol
        :rtype: any

        :Exceptions:
         - `AttributeError`: Symbol not found
        """
        # Fetch the socket from the instance dict directly. Going through
        # ``self._sock`` would re-enter this method forever if the attribute
        # was never set (e.g. `__init__` failed or was bypassed).
        try:
            sock = self.__dict__['_sock']
        except KeyError:
            raise AttributeError(name)
        return getattr(sock, name)

    @property
    def closed(self):
        """
        Is the stream closed?

        :Type: ``bool``
        """
        return self._sock is None

    def close(self):
        """
        Close the stream (not necessarily the socket)

        If a shutdown parameter was configured, the socket is shut down
        that way (a socket that is not connected is tolerated). Otherwise
        the socket's ``close`` method is called. Closing more than once is
        harmless.

        :Exceptions:
         - `socket.error`: Shutting down failed for another reason than
           ``ENOTCONN``
        """
        import sys as _sys

        try:
            sock, self._sock = self._sock, None
        except AttributeError:
            return # never got a socket in the first place
        if sock is None:
            return

        if self._shutdown is None:
            try:
                close = sock.close
            except AttributeError:
                pass
            else:
                close()
            return

        try:
            shutdown = sock.shutdown
        except AttributeError:
            return
        try:
            shutdown(self._shutdown)
        except _socket.error:
            # ``sys.exc_info`` is the one spelling that works on every
            # python version (``except X, e`` is python 2 only)
            err = _sys.exc_info()[1]
            if err.args[0] != _errno.ENOTCONN:
                raise

    def read(self, size):
        """
        Read `size` bytes (or less) from the socket

        :Parameters:
         - `size`: The number of bytes to read (``> 0``)

        :Types:
         - `size`: ``int``

        :return: The bytes read
        :rtype: ``str``

        :Exceptions:
         - `ValueError`: The stream is closed
         - `socket.error`: Something happened to the socket
        """
        if self.closed:
            raise ValueError("I/O operation on closed stream")
        return self._sock.recv(size)

    def write(self, data):
        """
        Write data to the socket

        :Parameters:
         - `data`: The data to write

        :Types:
         - `data`: ``str``

        :Exceptions:
         - `ValueError`: The stream is closed
         - `socket.error`: Something happened to the socket
        """
        if self.closed:
            raise ValueError("I/O operation on closed stream")
        self._sock.sendall(data)
587
def read_exact(stream, size):
    """
    Fetch exactly `size` bytes from `stream`, unless EOF comes first

    :Parameters:
     - `stream`: The stream to read from (its ``read`` method is used)
     - `size`: expected number of bytes

    :Types:
     - `stream`: ``file``
     - `size`: ``int``

    :return: The read bytes
    :rtype: ``str``
    """
    reader = stream.read
    return _read_exact(reader, size)
605
def _read_exact(read, size):
    """
    Call `read` repeatedly until `size` bytes are collected or EOF is hit

    A negative `size` is handed to `read` unchanged in a single call.

    :Parameters:
     - `read`: The reading function
     - `size`: expected number of bytes

    :Types:
     - `read`: ``callable``
     - `size`: ``int``

    :return: The read bytes
    :rtype: ``str``
    """
    if size < 0:
        return read(size)

    pieces = []
    remaining = size
    while remaining > 0:
        piece = read(remaining)
        if not piece: # EOF
            break
        pieces.append(piece)
        remaining -= len(piece)
    return "".join(pieces)
# Rebind the public names to the implementations provided by ``cimpl``,
# if there is one; otherwise the python versions defined above stay in place.
# NOTE(review): `c_override` lives in the ``wtf`` package and is not visible
# here. Presumably it returns the optional ``_wtf_cstream`` C extension
# module, or ``None`` if that is unavailable -- confirm.
from wtf import c_override
cimpl = c_override('_wtf_cstream')
if cimpl is not None:
    # pylint: disable = E1103
    GenericStream = cimpl.GenericStream
    MinimalSocketStream = cimpl.MinimalSocketStream
    read_exact = cimpl.read_exact
# the helpers are not part of the module's namespace
del c_override, cimpl
class dev_null(object): # pylint: disable = C0103
    """
    Stream behaving like ``/dev/null``

    Reading always signals EOF; written data is silently discarded.
    """

    def read(self, size=-1):
        """
        Signal EOF

        :Parameters:
         - `size`: Requested number of bytes (ignored)

        :Types:
         - `size`: ``int``

        :return: Always the empty string
        :rtype: ``str``
        """
        # pylint: disable = W0613

        eof = ""
        return eof

    def write(self, data):
        """
        Discard `data`

        :Parameters:
         - `data`: The data to throw away

        :Types:
         - `data`: ``str``
        """
        return None
# Replace the helper class by the ready-to-use stream documented in the
# module docstring: a `GenericStream` wrapped around one instance of it.
dev_null = GenericStream(dev_null())