Package restkit :: Module tee
[hide private]

Source Code for Module restkit.tee

  1  # -*- coding: utf-8 - 
  2  # 
  3  # This file is part of restkit released under the MIT license. 
  4  # See the NOTICE for more information. 
  5   
  6   
  7  """ 
  8  TeeInput replace old FileInput. It use a file 
  9  if size > MAX_BODY or memory. It's now possible to rewind 
 10  read or restart etc ... It's based on TeeInput from Gunicorn. 
 11   
 12  """ 
 13  import copy 
 14  import os 
 15  try: 
 16      from cStringIO import StringIO 
 17  except ImportError: 
 18      from StringIO import StringIO 
 19  import tempfile 
 20   
 21  from restkit import conn 
 22   
23 -class TeeInput(object):
24 25 CHUNK_SIZE = conn.CHUNK_SIZE 26
27 - def __init__(self, stream):
28 self.buf = StringIO() 29 self.eof = False 30 31 if isinstance(stream, basestring): 32 stream = StringIO(stream) 33 self.tmp = StringIO() 34 else: 35 self.tmp = tempfile.TemporaryFile() 36 37 self.stream = stream
38
39 - def __enter__(self):
40 return self
41
42 - def __exit__(self, exc_type, exc_val, traceback):
43 return
44
45 - def seek(self, offset, whence=0):
46 """ naive implementation of seek """ 47 current_size = self._tmp_size() 48 diff = 0 49 if whence == 0: 50 diff = offset - current_size 51 elif whence == 2: 52 diff = (self.tmp.tell() + offset) - current_size 53 elif whence == 3 and not self.eof: 54 # we read until the end 55 while True: 56 self.tmp.seek(0, 2) 57 if not self._tee(self.CHUNK_SIZE): 58 break 59 60 if not self.eof and diff > 0: 61 self._ensure_length(StringIO(), diff) 62 self.tmp.seek(offset, whence)
63
64 - def flush(self):
65 self.tmp.flush()
66
67 - def read(self, length=-1):
68 """ read """ 69 if self.eof: 70 return self.tmp.read(length) 71 72 if length < 0: 73 buf = StringIO() 74 buf.write(self.tmp.read()) 75 while True: 76 chunk = self._tee(self.CHUNK_SIZE) 77 if not chunk: 78 break 79 buf.write(chunk) 80 return buf.getvalue() 81 else: 82 dest = StringIO() 83 diff = self._tmp_size() - self.tmp.tell() 84 if not diff: 85 dest.write(self._tee(length)) 86 return self._ensure_length(dest, length) 87 else: 88 l = min(diff, length) 89 dest.write(self.tmp.read(l)) 90 return self._ensure_length(dest, length)
91
92 - def readline(self, size=-1):
93 if self.eof: 94 return self.tmp.readline() 95 96 orig_size = self._tmp_size() 97 if self.tmp.tell() == orig_size: 98 if not self._tee(self.CHUNK_SIZE): 99 return '' 100 self.tmp.seek(orig_size) 101 102 # now we can get line 103 line = self.tmp.readline() 104 if line.find("\n") >=0: 105 return line 106 107 buf = StringIO() 108 buf.write(line) 109 while True: 110 orig_size = self.tmp.tell() 111 data = self._tee(self.CHUNK_SIZE) 112 if not data: 113 break 114 self.tmp.seek(orig_size) 115 buf.write(self.tmp.readline()) 116 if data.find("\n") >= 0: 117 break 118 return buf.getvalue()
119
120 - def readlines(self, sizehint=0):
121 total = 0 122 lines = [] 123 line = self.readline() 124 while line: 125 lines.append(line) 126 total += len(line) 127 if 0 < sizehint <= total: 128 break 129 line = self.readline() 130 return lines
131
132 - def close(self):
133 if not self.eof: 134 # we didn't read until the end 135 self._close_unreader() 136 return self.tmp.close()
137
138 - def next(self):
139 r = self.readline() 140 if not r: 141 raise StopIteration 142 return r
143 __next__ = next 144
145 - def __iter__(self):
146 return self
147
148 - def _tee(self, length):
149 """ fetch partial body""" 150 buf2 = self.buf 151 buf2.seek(0, 2) 152 chunk = self.stream.read(length) 153 if chunk: 154 self.tmp.write(chunk) 155 self.tmp.flush() 156 self.tmp.seek(0, 2) 157 return chunk 158 159 self._finalize() 160 return ""
161
162 - def _finalize(self):
163 """ here we wil fetch final trailers 164 if any.""" 165 self.eof = True
166
167 - def _tmp_size(self):
168 if hasattr(self.tmp, 'fileno'): 169 return int(os.fstat(self.tmp.fileno())[6]) 170 else: 171 return len(self.tmp.getvalue())
172
173 - def _ensure_length(self, dest, length):
174 if len(dest.getvalue()) < length: 175 data = self._tee(length - len(dest.getvalue())) 176 dest.write(data) 177 return dest.getvalue()
178
179 -class ResponseTeeInput(TeeInput):
180 181 CHUNK_SIZE = conn.CHUNK_SIZE 182
183 - def __init__(self, resp, connection, should_close=False):
184 self.buf = StringIO() 185 self.resp = resp 186 self.stream =resp.body_stream() 187 self.connection = connection 188 self.should_close = should_close 189 self.eof = False 190 191 # set temporary body 192 clen = int(resp.headers.get('content-length') or -1) 193 if clen >= 0: 194 if (clen <= conn.MAX_BODY): 195 self.tmp = StringIO() 196 else: 197 self.tmp = tempfile.TemporaryFile() 198 else: 199 self.tmp = tempfile.TemporaryFile()
200
201 - def close(self):
202 if not self.eof: 203 # we didn't read until the end 204 self._close_unreader() 205 return self.tmp.close()
206
207 - def _close_unreader(self):
208 if not self.eof: 209 self.stream.close() 210 self.connection.release(self.should_close)
211
212 - def _finalize(self):
213 """ here we wil fetch final trailers 214 if any.""" 215 self.eof = True 216 self._close_unreader()
217