"""Code generated by Speakeasy (https://speakeasy.com). DO NOT EDIT.""" import re import json from typing import ( Callable, Generic, TypeVar, Optional, Generator, AsyncGenerator, Tuple, ) import httpx T = TypeVar("T") class EventStream(Generic[T]): response: httpx.Response generator: Generator[T, None, None] def __init__( self, response: httpx.Response, decoder: Callable[[str], T], sentinel: Optional[str] = None, ): self.response = response self.generator = stream_events(response, decoder, sentinel) def __iter__(self): return self def __next__(self): return next(self.generator) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.response.close() class EventStreamAsync(Generic[T]): response: httpx.Response generator: AsyncGenerator[T, None] def __init__( self, response: httpx.Response, decoder: Callable[[str], T], sentinel: Optional[str] = None, ): self.response = response self.generator = stream_events_async(response, decoder, sentinel) def __aiter__(self): return self async def __anext__(self): return await self.generator.__anext__() async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.response.aclose() class ServerEvent: id: Optional[str] = None event: Optional[str] = None data: Optional[str] = None retry: Optional[int] = None MESSAGE_BOUNDARIES = [ b"\r\n\r\n", b"\n\n", b"\r\r", ] async def stream_events_async( response: httpx.Response, decoder: Callable[[str], T], sentinel: Optional[str] = None, ) -> AsyncGenerator[T, None]: buffer = bytearray() position = 0 discard = False async for chunk in response.aiter_bytes(): # We've encountered the sentinel value and should no longer process # incoming data. Instead we throw new data away until the server closes # the connection. if discard: continue buffer += chunk for i in range(position, len(buffer)): char = buffer[i : i + 1] seq: Optional[bytes] = None if char in [b"\r", b"\n"]: for boundary in MESSAGE_BOUNDARIES: seq = _peek_sequence(i, buffer, boundary) if seq is not None: break if seq is None: continue block = buffer[position:i] position = i + len(seq) event, discard = _parse_event(block, decoder, sentinel) if event is not None: yield event if position > 0: buffer = buffer[position:] position = 0 event, discard = _parse_event(buffer, decoder, sentinel) if event is not None: yield event def stream_events( response: httpx.Response, decoder: Callable[[str], T], sentinel: Optional[str] = None, ) -> Generator[T, None, None]: buffer = bytearray() position = 0 discard = False for chunk in response.iter_bytes(): # We've encountered the sentinel value and should no longer process # incoming data. Instead we throw new data away until the server closes # the connection. if discard: continue buffer += chunk for i in range(position, len(buffer)): char = buffer[i : i + 1] seq: Optional[bytes] = None if char in [b"\r", b"\n"]: for boundary in MESSAGE_BOUNDARIES: seq = _peek_sequence(i, buffer, boundary) if seq is not None: break if seq is None: continue block = buffer[position:i] position = i + len(seq) event, discard = _parse_event(block, decoder, sentinel) if event is not None: yield event if position > 0: buffer = buffer[position:] position = 0 event, discard = _parse_event(buffer, decoder, sentinel) if event is not None: yield event def _parse_event( raw: bytearray, decoder: Callable[[str], T], sentinel: Optional[str] = None ) -> Tuple[Optional[T], bool]: block = raw.decode() lines = re.split(r"\r?\n|\r", block) publish = False event = ServerEvent() data = "" for line in lines: if not line: continue delim = line.find(":") if delim <= 0: continue field = line[0:delim] value = line[delim + 1 :] if delim < len(line) - 1 else "" if len(value) and value[0] == " ": value = value[1:] if field == "event": event.event = value publish = True elif field == "data": data += value + "\n" publish = True elif field == "id": event.id = value publish = True elif field == "retry": event.retry = int(value) if value.isdigit() else None publish = True if sentinel and data == f"{sentinel}\n": return None, True if data: data = data[:-1] event.data = data data_is_primitive = ( data.isnumeric() or data == "true" or data == "false" or data == "null" ) data_is_json = ( data.startswith("{") or data.startswith("[") or data.startswith('"') ) if data_is_primitive or data_is_json: try: event.data = json.loads(data) except Exception: pass out = None if publish: out = decoder(json.dumps(event.__dict__)) return out, False def _peek_sequence(position: int, buffer: bytearray, sequence: bytes): if len(sequence) > (len(buffer) - position): return None for i, seq in enumerate(sequence): if buffer[position + i] != seq: return None return sequence