Package restkit :: Module client

Source Code for Module restkit.client

# -*- coding: utf-8 -*-
#
# This file is part of restkit released under the MIT license.
# See the NOTICE for more information.
import base64
import errno
import logging
import os
import time
import socket
import ssl
import traceback
import types
import urlparse

try:
    from http_parser.http import (
            HttpStream, BadStatusLine, NoMoreData
    )
    from http_parser.reader import SocketReader
except ImportError:
    raise ImportError("""http-parser isn't installed or is out of date.

        pip install http-parser""")

from restkit import __version__

from restkit.conn import Connection
from restkit.errors import RequestError, RequestTimeout, RedirectLimit, \
    ProxyError
from restkit.session import get_session
from restkit.util import parse_netloc, rewrite_location, to_bytestring
from restkit.wrappers import Request, Response

MAX_CLIENT_TIMEOUT = 300
MAX_CLIENT_CONNECTIONS = 5
MAX_CLIENT_TRIES = 3
CLIENT_WAIT_TRIES = 0.3
MAX_FOLLOW_REDIRECTS = 5
USER_AGENT = "restkit/%s" % __version__

log = logging.getLogger(__name__)

class Client(object):

    """ A client handles one connection at a time. A client is threadsafe,
    but a handle shouldn't be shared between threads. All connections
    are shared between threads via a pool.

    >>> from restkit import *
    >>> c = Client()
    >>> r = c.request("http://google.com")
    >>> r.status
    '301 Moved Permanently'
    >>> r.body_string()
    '<HTML><HEAD><meta http-equiv="content-type" content="text/html;charset=utf-8">\n<TITLE>301 Moved</TITLE></HEAD><BODY>\n<H1>301 Moved</H1>\nThe document has moved\n<A HREF="http://www.google.com/">here</A>.\r\n</BODY></HTML>\r\n'
    >>> c.follow_redirect = True
    >>> r = c.request("http://google.com")
    >>> r.status
    '200 OK'

    """

    version = (1, 1)
    response_class = Response

    def __init__(self,
            follow_redirect=False,
            force_follow_redirect=False,
            max_follow_redirect=MAX_FOLLOW_REDIRECTS,
            filters=None,
            decompress=True,
            max_status_line_garbage=None,
            max_header_count=0,
            pool=None,
            response_class=None,
            timeout=None,
            use_proxy=False,
            max_tries=3,
            wait_tries=0.3,
            pool_size=10,
            backend="thread",
            **ssl_args):
        """
        Client parameters
        ~~~~~~~~~~~~~~~~~

        :param follow_redirect: follow redirection, by default False
        :param force_follow_redirect: also follow redirects for methods
            other than GET and HEAD
        :param max_follow_redirect: number of redirections available
        :param filters: http filters to pass
        :param decompress: allows the client to decompress the response
            body
        :param max_status_line_garbage: defines the maximum number of ignorable
            lines before we expect an HTTP response's status line. With
            HTTP/1.1 persistent connections, the problem arises that broken
            scripts could return a wrong Content-Length (there are more
            bytes sent than specified). Unfortunately, in some cases, this
            cannot be detected after the bad response, but only before the
            next one. So the client is able to skip bad lines using this
            limit. 0 disables garbage collection, None means an unlimited
            number of tries.
        :param max_header_count: determines the maximum HTTP header count
            allowed. By default there is no limit.
        :param pool: the pool to use, inherited from socketpool.Pool. By
            default we use the global one.
        :param response_class: the response class to use
        :param timeout: the default timeout of the connection
            (SO_TIMEOUT)

        :param max_tries: the number of tries before we give up on a
            connection
        :param wait_tries: the time to wait between tries.
        :param pool_size: int, default 10. Maximum number of connections we
            keep in the default pool.
        :param ssl_args: named arguments, see the ssl module for more
            information
        """
        self.follow_redirect = follow_redirect
        self.force_follow_redirect = force_follow_redirect
        self.max_follow_redirect = max_follow_redirect
        self.decompress = decompress
        self.filters = filters or []
        self.max_status_line_garbage = max_status_line_garbage
        self.max_header_count = max_header_count
        self.use_proxy = use_proxy

        self.request_filters = []
        self.response_filters = []
        self.load_filters()

        # set manager
        session_options = dict(
                retry_delay=wait_tries,
                max_size=pool_size,
                retry_max=max_tries,
                timeout=timeout)

        if pool is None:
            pool = get_session(backend, **session_options)
        self._pool = pool
        self.backend = backend

        # change default response class
        if response_class is not None:
            self.response_class = response_class

        self.max_tries = max_tries
        self.wait_tries = wait_tries
        self.pool_size = pool_size
        self.timeout = timeout

        self._nb_redirections = self.max_follow_redirect
        self._url = None
        self._initial_url = None
        self._write_cb = None
        self._headers = None
        self._sock_key = None
        self._sock = None
        self._original = None

        self.method = 'GET'
        self.body = None
        self.ssl_args = ssl_args or {}

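    # A construction sketch (illustrative, not from the original source):
    # tuning retries and the shared pool; the URL is an assumed placeholder.
    #
    #   >>> c = Client(timeout=10, max_tries=5, wait_tries=0.5,
    #   ...            pool_size=20, backend="thread")
    #   >>> r = c.request("http://example.com")
    #   >>> r.status
    #   '200 OK'
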
    def load_filters(self):
        """ Populate filters from self.filters.
        Must be called each time self.filters is updated.
        """
        for f in self.filters:
            if hasattr(f, "on_request"):
                self.request_filters.append(f)
            if hasattr(f, "on_response"):
                self.response_filters.append(f)

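    # Sketch of the duck-typed filter protocol expected by load_filters()
    # (illustrative; LogFilter is a hypothetical example, real filters ship
    # in restkit.filters):
    #
    #   >>> class LogFilter(object):
    #   ...     def on_request(self, request):
    #   ...         log.info("request: %s", request.url)
    #   ...     def on_response(self, response, request):
    #   ...         log.info("response: %s", response.status)
    #   >>> c = Client(filters=[LogFilter()])
    #
    # on_request may return a Response instance to short-circuit the request
    # (see request() below); on_response is called with the final response.
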
    def get_connection(self, request):
        """ get a connection from the pool or create a new one. """

        addr = parse_netloc(request.parsed_url)
        is_ssl = request.is_ssl()

        extra_headers = []
        conn = None
        if self.use_proxy:
            conn = self.proxy_connection(request,
                    addr, is_ssl)
        if not conn:
            conn = self._pool.get(host=addr[0], port=addr[1],
                    pool=self._pool, is_ssl=is_ssl,
                    extra_headers=extra_headers, **self.ssl_args)

        return conn

    def proxy_connection(self, request, req_addr, is_ssl):
        """ do the proxy connection """
        proxy_settings = os.environ.get('%s_proxy' %
                request.parsed_url.scheme)

        if proxy_settings:
            request.is_proxied = True

            proxy_settings, proxy_auth = _get_proxy_auth(proxy_settings)
            addr = parse_netloc(urlparse.urlparse(proxy_settings))

            if is_ssl:
                if proxy_auth:
                    proxy_auth = 'Proxy-authorization: %s' % proxy_auth
                proxy_connect = 'CONNECT %s:%s HTTP/1.0\r\n' % req_addr

                user_agent = request.headers.iget('user-agent')
                if user_agent:
                    user_agent = "User-Agent: %s\r\n" % user_agent
                else:
                    user_agent = "User-Agent: restkit/%s\r\n" % __version__

                proxy_pieces = '%s%s%s\r\n' % (proxy_connect, proxy_auth,
                        user_agent)

                conn = self._pool.get(host=addr[0], port=addr[1],
                        pool=self._pool, is_ssl=is_ssl,
                        extra_headers=[], proxy_pieces=proxy_pieces,
                        **self.ssl_args)
            else:
                headers = []
                if proxy_auth:
                    headers = [('Proxy-authorization', proxy_auth)]

                # pass the computed auth headers along with the connection
                conn = self._pool.get(host=addr[0], port=addr[1],
                        pool=self._pool, is_ssl=False,
                        extra_headers=headers, **self.ssl_args)
            return conn

        return

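    # Proxying is driven purely by environment variables; a hedged sketch
    # (host names and credentials are assumptions):
    #
    #   >>> os.environ['http_proxy'] = 'http://user:secret@proxy.local:3128'
    #   >>> c = Client(use_proxy=True)
    #   >>> r = c.request("http://example.com")   # goes through proxy.local
    #
    # For https URLs, the pool is handed a "CONNECT host:port HTTP/1.0"
    # preamble (with Proxy-authorization and User-Agent lines) so the tunnel
    # is established before the TLS handshake.
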
    def make_headers_string(self, request, extra_headers=None):
        """ create final header string """
        headers = request.headers.copy()
        if extra_headers is not None:
            for k, v in extra_headers:
                headers[k] = v

        if not request.body and request.method in ('POST', 'PUT',):
            headers['Content-Length'] = 0

        if self.version == (1, 1):
            httpver = "HTTP/1.1"
        else:
            httpver = "HTTP/1.0"

        ua = headers.iget('user-agent')
        if not ua:
            ua = USER_AGENT
        host = request.host

        accept_encoding = headers.iget('accept-encoding')
        if not accept_encoding:
            accept_encoding = 'identity'

        if request.is_proxied:
            full_path = ("https://" if request.is_ssl() else "http://") + \
                    request.host + request.path
        else:
            full_path = request.path

        lheaders = [
            "%s %s %s\r\n" % (request.method, full_path, httpver),
            "Host: %s\r\n" % host,
            "User-Agent: %s\r\n" % ua,
            "Accept-Encoding: %s\r\n" % accept_encoding
        ]

        lheaders.extend(["%s: %s\r\n" % (k, str(v)) for k, v in \
                headers.items() if k.lower() not in \
                ('user-agent', 'host', 'accept-encoding',)])
        if log.isEnabledFor(logging.DEBUG):
            log.debug("Send headers: %s" % lheaders)
        return "%s\r\n" % "".join(lheaders)

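    # For a plain GET of http://example.com/ the string built above would
    # look like this on the wire (illustrative):
    #
    #   GET / HTTP/1.1\r\n
    #   Host: example.com\r\n
    #   User-Agent: restkit/<version>\r\n
    #   Accept-Encoding: identity\r\n
    #   \r\n
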
    def perform(self, request):
        """ perform the request. If an error happens it will first try to
        restart it """

        if log.isEnabledFor(logging.DEBUG):
            log.debug("Start to perform request: %s %s %s" %
                    (request.host, request.method, request.path))
        tries = 0
        while True:
            conn = None
            try:
                # get or create a connection to the remote host
                conn = self.get_connection(request)

                # send headers
                msg = self.make_headers_string(request,
                        conn.extra_headers)

                # send body
                if request.body is not None:
                    chunked = request.is_chunked()
                    if request.headers.iget('content-length') is None and \
                            not chunked:
                        raise RequestError(
                                "Can't determine content length and " +
                                "Transfer-Encoding header is not chunked")

                    # handle 100-Continue status
                    # http://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html#sec8.2.3
                    hdr_expect = request.headers.iget("expect")
                    if hdr_expect is not None and \
                            hdr_expect.lower() == "100-continue":
                        conn.send(msg)
                        msg = None
                        p = HttpStream(SocketReader(conn.socket()), kind=1,
                                decompress=True)

                        if p.status_code() != 100:
                            self.reset_request()
                            if log.isEnabledFor(logging.DEBUG):
                                log.debug("return response class")
                            return self.response_class(conn, request, p)

                    chunked = request.is_chunked()
                    if log.isEnabledFor(logging.DEBUG):
                        log.debug("send body (chunked: %s)" % chunked)

                    if isinstance(request.body, types.StringTypes):
                        if msg is not None:
                            conn.send(msg + to_bytestring(request.body),
                                    chunked)
                        else:
                            conn.send(to_bytestring(request.body), chunked)
                    else:
                        if msg is not None:
                            conn.send(msg)

                        if hasattr(request.body, 'read'):
                            if hasattr(request.body, 'seek'):
                                request.body.seek(0)
                            conn.sendfile(request.body, chunked)
                        else:
                            conn.sendlines(request.body, chunked)
                        if chunked:
                            conn.send_chunk("")
                else:
                    conn.send(msg)

                return self.get_response(request, conn)
            except socket.gaierror, e:
                if conn is not None:
                    conn.release(True)
                raise RequestError(str(e))
            except socket.timeout, e:
                if conn is not None:
                    conn.release(True)
                raise RequestTimeout(str(e))
            except socket.error, e:
                if log.isEnabledFor(logging.DEBUG):
                    log.debug("socket error: %s" % str(e))
                if conn is not None:
                    conn.close()

                errors = (errno.EAGAIN, errno.EPIPE, errno.EBADF,
                        errno.ECONNRESET)
                if e[0] not in errors or tries >= self.max_tries:
                    raise RequestError("socket.error: %s" % str(e))

                # an exception should have been raised in other cases
                request.maybe_rewind(msg=str(e))

            except NoMoreData, e:
                if conn is not None:
                    conn.release(True)

                request.maybe_rewind(msg=str(e))
                if tries >= self.max_tries:
                    raise
            except BadStatusLine:
                if conn is not None:
                    conn.release(True)

                # an exception should have been raised in other cases
                request.maybe_rewind(msg="bad status line")

                if tries >= self.max_tries:
                    raise
            except Exception:
                # unknown error
                log.debug("unhandled exception %s" %
                        traceback.format_exc())
                if conn is not None:
                    conn.release(True)

                raise
            tries += 1
            self._pool.backend_mod.sleep(self.wait_tries)

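    # perform() honours "Expect: 100-continue" by sending the headers first
    # and waiting for the interim status before streaming the body. A hedged
    # caller-side sketch (URL and file are assumptions); note the body needs
    # either a Content-Length header or chunked Transfer-Encoding:
    #
    #   >>> c = Client()
    #   >>> body = open("payload.bin", "rb")
    #   >>> r = c.request("http://example.com/upload", method="PUT",
    #   ...               body=body, headers={"Expect": "100-continue",
    #   ...                   "Transfer-Encoding": "chunked"})
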
    def request(self, url, method='GET', body=None, headers=None):
        """ immediately perform a new request """

        request = Request(url, method=method, body=body,
                headers=headers)

        # apply request filters
        # They are applied only once.
        for f in self.request_filters:
            ret = f.on_request(request)
            if isinstance(ret, Response):
                # a response instance has been provided.
                # just return it. Useful for cache filters
                return ret

        # no response has been provided, do the request
        self._nb_redirections = self.max_follow_redirect
        return self.perform(request)

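    # One-shot usage sketch (illustrative; URL and form data are assumptions):
    #
    #   >>> c = Client()
    #   >>> r = c.request("http://example.com/form", method="POST",
    #   ...               body="foo=bar", headers={
    #   ...                   "Content-Type": "application/x-www-form-urlencoded"})
    #   >>> r.status
    #   '200 OK'
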
    def redirect(self, location, request):
        """ reset the request, set its new url and perform it """
        if self._nb_redirections <= 0:
            raise RedirectLimit("Redirection limit is reached")

        if request.initial_url is None:
            request.initial_url = request.url

        # make sure location follows rfc2616
        location = rewrite_location(request.url, location)

        if log.isEnabledFor(logging.DEBUG):
            log.debug("Redirect to %s" % location)

        # change the request url and method if needed
        request.url = location

        self._nb_redirections -= 1

        # perform a new request
        return self.perform(request)

    def get_response(self, request, connection):
        """ return the final response; it is only accessible via the
        perform method """
        if log.isEnabledFor(logging.DEBUG):
            log.debug("Start to parse response")

        p = HttpStream(SocketReader(connection.socket()), kind=1,
                decompress=self.decompress)

        if log.isEnabledFor(logging.DEBUG):
            log.debug("Got response: %s %s" % (p.version(), p.status()))
            log.debug("headers: [%s]" % p.headers())

        location = p.headers().get('location')

        if self.follow_redirect:
            should_close = not p.should_keep_alive()
            if p.status_code() in (301, 302, 307,):

                # read the full body and release the connection
                p.body_file().read()
                connection.release(should_close)

                if request.method in ('GET', 'HEAD',) or \
                        self.force_follow_redirect:
                    if hasattr(request.body, 'read'):
                        try:
                            request.body.seek(0)
                        except AttributeError:
                            raise RequestError("Can't redirect %s to %s "
                                    "because the body has already been read"
                                    % (request.url, location))
                    return self.redirect(location, request)

            elif p.status_code() == 303 and request.method == "POST":
                # read the full body and release the connection
                p.body_file().read()
                connection.release(should_close)

                request.method = "GET"
                request.body = None
                return self.redirect(location, request)

        # create the response object
        resp = self.response_class(connection, request, p)

        # apply response filters
        for f in self.response_filters:
            f.on_response(resp, request)

        if log.isEnabledFor(logging.DEBUG):
            log.debug("return response class")

        # return the final response
        return resp

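# How the redirect logic above behaves in practice (illustrative):
#
#   >>> c = Client(follow_redirect=True, force_follow_redirect=True)
#   >>> c.request("http://google.com").status
#   '200 OK'
#
# 301, 302 and 307 are re-issued with the original method (GET/HEAD only,
# unless force_follow_redirect is set), while a 303 answer to a POST is
# retried as a GET with the body dropped, per RFC 2616.
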
def _get_proxy_auth(proxy_settings):
    proxy_username = os.environ.get('proxy-username')
    if not proxy_username:
        proxy_username = os.environ.get('proxy_username')
    proxy_password = os.environ.get('proxy-password')
    if not proxy_password:
        proxy_password = os.environ.get('proxy_password')

    proxy_password = proxy_password or ""

    if not proxy_username:
        u = urlparse.urlparse(proxy_settings)
        if u.username:
            # take the credentials from the proxy url and strip them
            # from the netloc
            proxy_username = u.username
            proxy_password = u.password or proxy_password
            proxy_settings = urlparse.urlunparse((u.scheme,
                u.netloc.split("@")[-1], u.path, u.params, u.query,
                u.fragment))

    if proxy_username:
        user_auth = base64.encodestring('%s:%s' % (proxy_username,
                proxy_password))
        return proxy_settings, 'Basic %s\r\n' % (user_auth.strip())
    else:
        return proxy_settings, ''
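

if __name__ == "__main__":
    # Minimal smoke test (a sketch, not part of the published module);
    # assumes network access to google.com.
    client = Client(follow_redirect=True)
    resp = client.request("http://google.com")
    print resp.status
    print resp.body_string()[:80]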