1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 """
18 Generic Buffered ``file`` Stream
19 ================================
20
21 In order to use the stream, you need to supply an actual implementation
22 of the low level octet stream. This stream implementation is useful
23 in order to decorate other streams and not implement the full API every time.
24
25 :Variables:
26 - `dev_null`: /dev/null like stream (EOF on reading, doing nothing on
27 writing)
28
29 :Types:
30 - `dev_null`: `GenericStream`
31 """
32 __author__ = u"Andr\xe9 Malo"
33 __docformat__ = "restructuredtext en"
34
35
36 import errno as _errno
37 import socket as _socket
38
39 from wtf.util import Property
43 """
44 Represents a buffered stream
45
46 :CVariables:
47 - `_DEFAULT_CHUNK_SIZE`: Default buffer size
48
49 :IVariables:
50 - `_octet_stream`: The stream to actually fetch the data from
51 - `_chunk_size`: Actual buffer size to use
52 - `_blockiter`: Block iteration size (``1: block = line``)
53 - `_rbuffer`: read buffer
54 - `_wbuffer`: write buffer
55 - `_flush`: underlying flush function
56 - `_closed`: Is the stream closed?
57 - `softspace`: Softspace parameter
58
59 :Types:
60 - `_DEFAULT_CHUNK_SIZE`: ``int``
61 - `_octet_stream`: ``file``
62 - `_chunk_size`: ``int``
63 - `_blockiter`: ``int``
64 - `_rbuffer`: ``list``
65 - `_wbuffer`: ``list``
66 - `_flush`: ``callable``
67 - `_closed`: ``bool``
68 - `softspace`: ``bool``
69 """
70 _DEFAULT_CHUNK_SIZE = 8192
71 _octet_stream = None
72 _closed = True
73 softspace = False
74
def __new__(cls, stream, buffering=-1, blockiter=1, read_exact=False):
    """
    Create a buffered stream on top of a low level stream

    :Parameters:
     - `stream`: The low level stream to wrap
     - `buffering`: Buffer size specification
       (``< 0``: use the default chunk size, ``== 0``: unbuffered, which
       is stored as a chunk size of ``1``, ``> 0``: use exactly this size)
     - `blockiter`: Iteration block specification
       (``<= 0``: iterate in blocks of the default chunk size,
       ``1``: iterate line by line, ``> 1``: iterate in blocks of this
       size)
     - `read_exact`: If true, the ``read`` method of the new instance is
       replaced by its ``read_exact`` method, so callers asking for
       ``size`` bytes get that many unless EOF is hit. If false,
       ``read`` keeps its regular "at most ``size`` bytes" behaviour.

    :Types:
     - `stream`: ``file``
     - `buffering`: ``int``
     - `blockiter`: ``int``
     - `read_exact`: ``bool``

    :return: The new, opened stream instance
    :rtype: `GenericStream`
    """
    inst = super(GenericStream, cls).__new__(cls)

    # Translate the buffer specification into the chunk size actually used
    if buffering < 0:
        chunk_size = inst._DEFAULT_CHUNK_SIZE
    elif buffering == 0:
        chunk_size = 1
    else:
        chunk_size = buffering

    # Non-positive block sizes fall back to the default chunk size
    if blockiter > 0:
        block_size = blockiter
    else:
        block_size = inst._DEFAULT_CHUNK_SIZE

    inst._octet_stream = stream
    inst._chunk_size = chunk_size
    inst._blockiter = block_size
    inst._rbuffer, inst._wbuffer = [], []
    inst._closed = False

    # Streams without a flush method get a no-op instead
    inst._flush = getattr(stream, 'flush', lambda: None)

    if read_exact:
        # Shadow the class level read method on this instance only
        inst.read = inst.read_exact
    return inst
119
122
124 """ Iterator generator """
125 return self
126
128 """ Return the next line or block """
129 if self._blockiter == 1:
130 line = self.readline()
131 else:
132 line = self.read(self._blockiter)
133 if not line:
134 raise StopIteration()
135 return line
136
137 @Property
139 """
140 The name of the stream, if any
141
142 :Type: ``str``
143 """
144
145
146 def fget(self):
147 return self._octet_stream.name
148 return locals()
149
150 @Property
152 """
153 Is the stream closed?
154
155 :Type: ``bool``
156 """
157
158
159 def fget(self):
160 stream = self._octet_stream
161 if stream is not None:
162 try:
163 return stream.closed
164 except AttributeError:
165 return self._closed
166 return True
167 return locals()
168
170 """ Determine underlying fileno """
171 if self.closed:
172 raise ValueError("I/O operation on closed stream")
173
174 return self._octet_stream.fileno()
175
195
def flush(self, _passdown=True):
    """
    Write out everything collected in the write buffer

    The buffered pieces are joined and handed to the underlying stream in
    a single ``write`` call. Nothing is written if the buffer is empty.

    :Parameters:
     - `_passdown`: Also invoke the flush function of the underlying
       stream afterwards?

    :Types:
     - `_passdown`: ``bool``

    :Exceptions:
     - `ValueError`: The stream is closed
    """
    if self.closed:
        raise ValueError("I/O operation on closed stream")

    # Join first, reset afterwards: if joining fails, the buffer is kept
    pending = "".join(self._wbuffer)
    self._wbuffer = []

    if pending:
        self._octet_stream.write(pending)
    if _passdown:
        self._flush()
215
217 """
218 Does the stream refer to a tty?
219
220 :return: Does the stream refer to a tty?
221 :rtype: ``bool``
222 """
223 if self.closed:
224 raise ValueError("I/O operation on closed stream")
225
226 try:
227 isatty = self._octet_stream.isatty
228 except AttributeError:
229 return False
230 return isatty()
231
def read(self, size=-1):
    """
    Read up to `size` bytes from the stream

    This is a thin front end; the work is done by ``_bufferedread``.

    :Parameters:
     - `size`: The maximum number of bytes to read (``< 0`` means
       to slurp the whole stream; ``== 0`` means to return the current
       buffer or the next buffer if the current buffer is empty)

    :Types:
     - `size`: ``int``

    :return: The bytes read; an empty result means EOF
    :rtype: ``str``

    :Exceptions:
     - `ValueError`: The stream is closed
    """
    fetch = self._bufferedread
    return fetch(size)
251
253 """
254 Read exactly size bytes from stream, except on EOF
255
256 :Parameters:
257 - `size`: expected number of bytes
258
259 :Types:
260 - `size`: ``int``
261
262 :return: The read bytes
263 :rtype: ``str``
264 """
265 return _read_exact(self._bufferedread, size)
266
268 """
269 Read a line from the stream
270
271 :Parameters:
272 - `size`: The maximum number of bytes to read (``<= 0`` means
273 to read until the next newline or EOF, which is the default
274 behaviour)
275
276 :Types:
277 - `size`: ``int``
278
279 :return: The read bytes including the newline; if empty you've hit
280 EOF
281 :rtype: ``str``
282 """
283 if self.closed:
284 raise ValueError("I/O operation on closed stream")
285
286 if size < 0:
287 size = 0
288
289 read = self._bufferedread
290 linebuffer = read(size)
291 if linebuffer:
292 findstart = 0
293 while True:
294 newbuffer = None
295 eolpos = linebuffer.find("\n", findstart)
296 if eolpos >= 0 and (size == 0 or eolpos < size):
297 self._unread(linebuffer[eolpos + 1:])
298 linebuffer = linebuffer[:eolpos + 1]
299 break
300 elif size > 0:
301 llen = len(linebuffer)
302 if llen == size:
303 break
304 elif llen > size:
305 self._unread(linebuffer[size:])
306 linebuffer = linebuffer[:size]
307 break
308 else:
309 newbuffer = read(size - llen)
310 else:
311 newbuffer = read(size)
312 if not newbuffer:
313 break
314 findstart = len(linebuffer)
315 linebuffer += newbuffer
316 return linebuffer
317
319 """
320 Returns all lines as a list
321
322 :Parameters:
323 - `size`: Maximum size for a single line.
324
325 :Types:
326 - `size`: ``int``
327 """
328 lines = []
329 while True:
330 line = self.readline(size)
331 if not line:
332 break
333 lines.append(line)
334 return lines
335
337 """
338 Iterator of the lines
339
340 :Deprecated: Use the iterator API instead
341 """
342 return self
343
345 """
346 Write data into the stream
347
348 :Parameters:
349 - `data`: The data to write
350
351 :Types:
352 - `data`: ``str``
353 """
354 if self.closed:
355 raise ValueError("I/O operation on closed stream")
356
357 if data:
358 self._wbuffer.append(data)
359
360 if self._chunk_size <= 0 or \
361 sum(map(len, self._wbuffer)) > self._chunk_size:
362 self.flush(False)
363
365 """
366 Write lines to the stream
367
368 :Parameters:
369 - `lines`: The list of lines to write
370
371 :Types:
372 - `lines`: ``iterable``
373 """
374 for line in lines:
375 self.write(line)
376
378 """
379 Pushes `tounread` octets back
380
381 :Parameters:
382 - `tounread`: The buffer to push back
383
384 :Types:
385 - `tounread`: ``str``
386 """
387 if tounread:
388 self._rbuffer.append(tounread)
389
391 """
392 Read a specified amount of bytes (at max) from the stream
393
394 :Parameters:
395 - `size`: The maximum number of bytes to read (``< 0`` means
396 to slurp the whole stream; ``== 0`` means to return the current
397 buffer or the next buffer if the current buffer is empty)
398
399 :Types:
400 - `size`: ``int``
401
402 :return: The read bytes; if empty you've hit EOF
403 :rtype: ``str``
404
405 :Exceptions:
406 - `ValueError`: The stream is closed
407 """
408
409
410 if self.closed:
411 raise ValueError("I/O operation on closed stream")
412
413 buf, chunk_size = self._rbuffer, self._chunk_size
414 if size == 0:
415 if buf:
416 return buf.pop()
417
418 size = chunk_size
419
420
421 buf = "".join(buf[::-1])
422 if size < 0:
423 chunks = [buf]
424 else:
425 chunks, buf = [buf[:size]], buf[size:]
426 if buf:
427 self._rbuffer = [buf]
428 return chunks[0]
429 self._rbuffer = []
430
431 bytes_return = size < 0 and size - 1 or len(chunks[0])
432 while bytes_return < size:
433 if size > 0:
434 chunk_size = min(size - bytes_return, chunk_size)
435 chunk = self._octet_stream.read(chunk_size)
436 if not chunk:
437 break
438 elif size < 0:
439 chunks.append(chunk)
440 else:
441 bytes_toadd = size - bytes_return
442 chunk_toadd = chunk[:bytes_toadd]
443 chunks.append(chunk_toadd)
444 buf = chunk[bytes_toadd:]
445 if buf:
446 self._rbuffer = [buf]
447 break
448 bytes_return += len(chunk_toadd)
449 if size > 0:
450 break
451 return "".join(chunks)
452
455 """
456 Minimal stream out of a socket
457
458 This effectively maps ``recv`` to ``read`` and ``sendall`` to ``write``.
459
460 :See: `GenericStream`
461
462 :IVariables:
463 `_sock` : ``socket.socket``
464 The socket in question
465
466 `_shutdown` : ``int``
467 shutdown parameter on close
468 """
469 name = '<socket>'
470
def __init__(self, sock, shutdown=None):
    """
    Set up the stream for a given socket

    :Parameters:
      `sock` : ``socket.socket``
        The socket to wrap

      `shutdown` : ``int``
        Shutdown parameter to use on close (``socket.SHUT_*``). ``None``
        and negative values are both stored as ``None``, in which case
        the close method of the socket is called instead (if it exists).
    """
    if shutdown is None or shutdown < 0:
        how = None
    else:
        how = shutdown

    self._shutdown = how
    self._sock = sock
487
490
492 """
493 Delegate all unknown symbol requests to the socket itself
494
495 :Parameters:
496 - `name`: The symbol to lookup
497
498 :Types:
499 - `name`: ``str``
500
501 :return: The looked up symbol
502 :rtype: any
503
504 :Exceptions:
505 - `AttributeError`: Symbol not found
506 """
507 return getattr(self._sock, name)
508
509 @Property
511 """
512 Is the stream closed?
513
514 :Type: ``bool``
515 """
516
517
518 def fget(self):
519 return self._sock is None
520 return locals()
521
523 """ Close the stream (not necessarily the socket) """
524 try:
525 sock, self._sock = self._sock, None
526 except AttributeError:
527 pass
528 else:
529 if sock is not None:
530 if self._shutdown is not None:
531 try:
532 shutdown = sock.shutdown
533 except AttributeError:
534 pass
535 else:
536 try:
537 shutdown(self._shutdown)
538 except _socket.error, e:
539 if e[0] != _errno.ENOTCONN:
540 raise
541 else:
542 try:
543 close = sock.close
544 except AttributeError:
545 pass
546 else:
547 close()
548
def read(self, size):
    """
    Receive up to `size` bytes from the socket

    The request is passed to ``recv`` of the wrapped socket as is.

    :Parameters:
     - `size`: The number of bytes to read (``> 0``)

    :Types:
     - `size`: ``int``

    :return: The bytes read
    :rtype: ``str``

    :Exceptions:
     - `ValueError`: The stream is closed
     - `socket.error`: Something happened to the socket
    """
    if not self.closed:
        return self._sock.recv(size)
    raise ValueError("I/O operation on closed stream")
569
571 """
572 Write data to the socket
573
574 :Parameters:
575 - `data`: The data to write
576
577 :Types:
578 - `data`: ``str``
579
580 :Exceptions:
581 - `ValueError`: The stream is closed
582 - `socket.error`: Something happened to the socket
583 """
584 if self.closed:
585 raise ValueError("I/O operation on closed stream")
586 self._sock.sendall(data)
587
590 """
591 Read exactly size bytes from stream, except on EOF
592
593 :Parameters:
594 - `stream`: The stream to read from.
595 - `size`: expected number of bytes
596
597 :Types:
598 - `stream`: ``file``
599 - `size`: ``int``
600
601 :return: The read bytes
602 :rtype: ``str``
603 """
604 return _read_exact(stream.read, size)
605
608 """
609 Read exactly size bytes with `read`, except on EOF
610
611 :Parameters:
612 - `read`: The reading function
613 - `size`: expected number of bytes
614
615 :Types:
616 - `read`: ``callable``
617 - `size`: ``int``
618
619 :return: The read bytes
620 :rtype: ``str``
621 """
622 if size < 0:
623 return read(size)
624 vlen, buf = 0, []
625 push = buf.append
626 while vlen < size:
627 val = read(size - vlen)
628 if not val:
629 break
630 vlen += len(val)
631 push(val)
632 return "".join(buf)
633
634
from wtf import c_override

# Ask for the C implementation of this module. A ``None`` result leaves the
# Python definitions in place.
# NOTE(review): presumably ``c_override`` returns the loaded extension module
# or ``None`` if it is unavailable or disabled - confirm in ``wtf``.
cimpl = c_override('_wtf_cstream')
if cimpl is not None:
    # Rebind the public names to their C counterparts
    GenericStream, MinimalSocketStream, read_exact = (
        cimpl.GenericStream,
        cimpl.MinimalSocketStream,
        cimpl.read_exact,
    )

# Keep the helper names out of the module namespace
del c_override, cimpl
646 """
647 /dev/null like stream
648
649 Returns EOF on read requests and throws away any written stuff.
650 """
def read(self, size=-1):
    """
    Deliver EOF, whatever amount was asked for

    :Parameters:
     - `size`: The number of bytes requested (ignored)

    :Types:
     - `size`: ``int``

    :return: Always the empty string
    :rtype: ``str``
    """
    eof = ""
    return eof
656
658 """ Do nothing """
659 pass
# ``dev_null`` is called once and the result is wrapped into a
# ``GenericStream``; the name is then rebound to that wrapper, so from here
# on ``dev_null`` is the ready-to-use stream.
_null_backend = dev_null()
dev_null = GenericStream(_null_backend)
del _null_backend
661