# -*- coding: utf-8 -*-
"""
hyper/http20/stream
~~~~~~~~~~~~~~~~~~~

Objects that make up the stream-level abstraction of hyper's HTTP/2 support.

These objects are not expected to be part of the public HTTP/2 API: they're
intended purely for use inside hyper's HTTP/2 abstraction.

Conceptually, a single HTTP/2 connection is made up of many streams: each
stream is an independent, bi-directional sequence of HTTP headers and data.
Each stream is identified by a monotonically increasing integer, assigned to
the stream by the endpoint that initiated the stream.
"""
import vinetrimmer.vendor.h2.exceptions

from ..common.headers import HTTPHeaderMap
from .util import h2_safe_headers

import logging

log = logging.getLogger(__name__)

# Define the largest chunk of data we'll send in one go. Realistically, we
# should take the MSS into account but that's pretty dull, so let's just say
# 1kB and call it a day.
MAX_CHUNK = 1024


class Stream(object):
    """
    A single HTTP/2 stream.

    A stream is an independent, bi-directional sequence of HTTP headers and
    data. Each stream is identified by a single integer. From a HTTP
    perspective, a stream _approximately_ matches a single request-response
    pair.
    """
    def __init__(self,
                 stream_id,
                 window_manager,
                 connection,
                 send_outstanding_data,
                 recv_cb,
                 close_cb):
        self.stream_id = stream_id
        self.headers = HTTPHeaderMap()

        # Set to a key-value set of the response headers once their
        # HEADERS..CONTINUATION frame sequence finishes.
        self.response_headers = None

        # Set to a key-value set of the response trailers once their
        # HEADERS..CONTINUATION frame sequence finishes.
        self.response_trailers = None

        # A dict mapping the promised stream ID of a pushed resource to a
        # key-value set of its request headers. Entries are added once their
        # PUSH_PROMISE..CONTINUATION frame sequence finishes.
        self.promised_headers = {}

        # Unconsumed response data chunks. Empties after every call to
        # _read().
        self.data = []

        # Whether the remote side has completed the stream.
        self.remote_closed = False

        # Whether we have closed the stream.
        self.local_closed = False

        # There are two flow control windows: one for data we're sending,
        # one for data being sent to us.
        self._in_window_manager = window_manager

        # Save off a reference to the state machine wrapped with a lock.
        self._conn = connection

        # Save off the data callbacks.
        self._send_outstanding_data = send_outstanding_data
        self._recv_cb = recv_cb
        self._close_cb = close_cb

    def add_header(self, name, value, replace=False):
        """
        Adds a single HTTP header to the headers to be sent on the request.
        """
        if not replace:
            self.headers[name] = value
        else:
            self.headers.replace(name, value)

    def send_headers(self, end_stream=False):
        """
        Sends the complete saved header block on the stream.
        """
        headers = self.get_headers()
        with self._conn as conn:
            conn.send_headers(self.stream_id, headers, end_stream)
        self._send_outstanding_data()

        if end_stream:
            self.local_closed = True

    def send_data(self, data, final):
        """
        Send some data on the stream. If this is the end of the data to be
        sent, the ``final`` flag _must_ be set to True. If no data is to be
        sent, set ``data`` to ``None``.
        """
        # Define a utility iterator for file objects.
        def file_iterator(fobj):
            while True:
                data = fobj.read(MAX_CHUNK)
                yield data
                if len(data) < MAX_CHUNK:
                    break

        # Build the appropriate iterator for the data, in chunks of
        # MAX_CHUNK.
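        # Illustrative note (not in the original source): a 2500 byte
        # in-memory payload is sliced into chunks of 1024, 1024 and 452
        # bytes, and only the final 452 byte chunk can carry END_STREAM;
        # file-like objects are instead read lazily, MAX_CHUNK bytes at a
        # time, by file_iterator() above.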
        if hasattr(data, 'read'):
            chunks = file_iterator(data)
        else:
            chunks = (
                data[i:i+MAX_CHUNK]
                for i in range(0, len(data), MAX_CHUNK)
            )

        for chunk in chunks:
            self._send_chunk(chunk, final)

    def _read(self, amt=None):
        """
        Read data from the stream. Unlike a normal read behaviour, this
        function returns _at least_ ``amt`` data, but may return more.
        """
        def listlen(list):
            return sum(map(len, list))

        # Keep reading until the stream is closed or we get enough data.
        while (not self.remote_closed and
               (amt is None or listlen(self.data) < amt)):
            self._recv_cb(stream_id=self.stream_id)

        result = b''.join(self.data)
        self.data = []
        return result

    def _read_one_frame(self):
        """
        Reads a single data frame from the stream and returns it.
        """
        # Keep reading until the stream is closed or we have a data frame.
        while not self.remote_closed and not self.data:
            self._recv_cb(stream_id=self.stream_id)

        try:
            return self.data.pop(0)
        except IndexError:
            return None

    def receive_response(self, event):
        """
        Receive response headers.
        """
        # TODO: If this is called while we're still sending data, we may want
        # to stop sending that data and check the response. Early responses
        # to big uploads are almost always a problem.
        self.response_headers = HTTPHeaderMap(event.headers)

    def receive_trailers(self, event):
        """
        Receive response trailers.
        """
        self.response_trailers = HTTPHeaderMap(event.headers)

    def receive_push(self, event):
        """
        Receive the request headers for a pushed stream.
        """
        self.promised_headers[event.pushed_stream_id] = event.headers

    def receive_data(self, event):
        """
        Receive a chunk of data.
        """
        size = event.flow_controlled_length
        increment = self._in_window_manager._handle_frame(size)

        # Append the data to the buffer.
        self.data.append(event.data)

        if increment:
            try:
                with self._conn as conn:
                    conn.increment_flow_control_window(
                        increment, stream_id=self.stream_id
                    )
            except vinetrimmer.vendor.h2.exceptions.StreamClosedError:
                # We haven't got to it yet, but the stream is already
                # closed. We don't need to increment the window in this
                # case!
                pass
            else:
                self._send_outstanding_data()

    def receive_end_stream(self, event):
        """
        All of the data is returned now.
        """
        self.remote_closed = True

    def receive_reset(self, event):
        """
        Stream forcefully reset.
        """
        self.remote_closed = True
        self._close_cb(self.stream_id)

    def get_headers(self):
        """
        Provides the headers to the connection object.
        """
        # Strip any headers invalid in H2.
        return h2_safe_headers(self.headers)

    def getheaders(self):
        """
        Once all data has been sent on this connection, returns a key-value
        set of the headers of the response to the original request.
        """
        # Keep reading until all headers are received.
        while self.response_headers is None:
            self._recv_cb(stream_id=self.stream_id)

        # Find the Content-Length header if present.
        self._in_window_manager.document_size = (
            int(self.response_headers.get(b'content-length', [0])[0])
        )

        return self.response_headers

    def gettrailers(self):
        """
        Once all data has been sent on this connection, returns a key-value
        set of the trailers of the response to the original request.

        .. warning:: Note that this method requires that the stream is
                     totally exhausted. This means that, if you have not
                     completely read from the stream, all stream data will be
                     read into memory.

        :returns: The key-value set of the trailers, or ``None`` if no
                  trailers were sent.
        """
        # Keep reading until the stream is done.
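        # Clarifying note (not in the original source): each _recv_cb() call
        # below may dispatch DATA frames for this stream, which
        # receive_data() buffers in self.data. That is why an unread body is
        # pulled into memory while waiting for trailers, as the warning in
        # the docstring describes.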
        while not self.remote_closed:
            self._recv_cb(stream_id=self.stream_id)

        return self.response_trailers

    def get_pushes(self, capture_all=False):
        """
        Returns a generator that yields push promises from the server. Note
        that this method is not idempotent: promises returned in one call
        will not be returned in subsequent calls. Iterating through
        generators returned by multiple calls to this method simultaneously
        results in undefined behavior.

        :param capture_all: If ``False``, the generator will yield all
            buffered push promises without blocking. If ``True``, the
            generator will first yield all buffered push promises, then yield
            additional ones as they arrive, and terminate when the original
            stream closes.
        """
        while True:
            for pair in self.promised_headers.items():
                yield pair
            self.promised_headers = {}

            if not capture_all or self.remote_closed:
                break

            self._recv_cb(stream_id=self.stream_id)

    def close(self, error_code=None):
        """
        Closes the stream. If the stream is currently open, attempts to close
        it as gracefully as possible.

        :param error_code: (optional) The error code to reset the stream
            with.
        :returns: Nothing.
        """
        # FIXME: I think this is overbroad, but for now it's probably ok.
        if not (self.remote_closed and self.local_closed):
            try:
                with self._conn as conn:
                    conn.reset_stream(self.stream_id, error_code or 0)
            except vinetrimmer.vendor.h2.exceptions.ProtocolError:
                # If for any reason we can't reset the stream, just
                # tolerate it.
                pass
            else:
                self._send_outstanding_data(tolerate_peer_gone=True)
            self.remote_closed = True
            self.local_closed = True

        self._close_cb(self.stream_id)

    @property
    def _out_flow_control_window(self):
        """
        The size of our outbound flow control window.
        """
        with self._conn as conn:
            return conn.local_flow_control_window(self.stream_id)

    def _send_chunk(self, data, final):
        """
        Implements most of the sending logic.

        Takes a single chunk of size at most MAX_CHUNK, wraps it in a frame
        and sends it. Optionally sets the END_STREAM flag if this is the last
        chunk (determined by being of size less than MAX_CHUNK) and no more
        data is to be sent.
        """
        # If we don't fit in the connection window, try popping frames off
        # the connection in the hope that one might be a window update frame.
        while len(data) > self._out_flow_control_window:
            self._recv_cb()

        # If the length of the data is less than MAX_CHUNK, we're probably
        # at the end of the file. If this is the end of the data, mark it as
        # END_STREAM.
        end_stream = False
        if len(data) < MAX_CHUNK and final:
            end_stream = True

        # Send the frame and decrement the flow control window.
        with self._conn as conn:
            conn.send_data(
                stream_id=self.stream_id, data=data, end_stream=end_stream
            )
        self._send_outstanding_data()

        if end_stream:
            self.local_closed = True
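

# A minimal, illustrative sketch of how this class is typically driven. The
# ``stream`` object below is hypothetical: in practice hyper's
# HTTP20Connection creates Stream instances and supplies the window manager,
# the locked connection wrapper and the callbacks, so user code never builds
# one directly.
#
#     stream.add_header(':method', 'POST')
#     stream.add_header(':path', '/upload')
#     stream.send_headers(end_stream=False)          # request HEADERS
#     stream.send_data(b'hello world', final=True)   # DATA + END_STREAM
#
#     headers = stream.getheaders()    # blocks until response HEADERS arrive
#     body = stream._read()            # drains buffered DATA frames
#     trailers = stream.gettrailers()  # ``None`` if no trailers were sent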