Preserve next-message bytes when draining oversized protocol message

ProtocolReader discarded the entire chunk containing the newline
delimiter while draining an oversized message, including bytes after
the newline that belong to the next pipelined message. This corrupted
framing for the rest of the connection (affects server and client).
Salvaged bytes are now kept in a _leftover buffer that read_message()
consumes before touching the stream.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
filip
2026-06-12 16:08:12 +02:00
parent f42ecf5c5b
commit f0666ea6ac

View File

@@ -86,9 +86,21 @@ class ProtocolReader:
def __init__(self, reader: asyncio.StreamReader): def __init__(self, reader: asyncio.StreamReader):
self._reader = reader self._reader = reader
self._leftover = b"" # bytes after the delimiter salvaged from a drain
async def read_message(self) -> dict | None: async def read_message(self) -> dict | None:
"""Read and parse one message. Returns None on EOF.""" """Read and parse one message. Returns None on EOF."""
prefix = b""
if self._leftover:
nl = self._leftover.find(b"\n")
if nl != -1:
line = self._leftover[:nl + 1]
self._leftover = self._leftover[nl + 1:]
if len(line) > MAX_MESSAGE_BYTES:
raise ValueError("Message exceeds maximum size")
return parse_message(line.strip())
prefix = self._leftover
self._leftover = b""
try: try:
line = await self._reader.readuntil(b"\n") line = await self._reader.readuntil(b"\n")
except (asyncio.IncompleteReadError, ConnectionError): except (asyncio.IncompleteReadError, ConnectionError):
@@ -102,11 +114,19 @@ class ProtocolReader:
chunk = await self._reader.read(max(remaining, 4096)) chunk = await self._reader.read(max(remaining, 4096))
if not chunk: if not chunk:
return None # EOF while draining return None # EOF while draining
if b"\n" in chunk: nl = chunk.find(b"\n")
break # found delimiter, oversized message fully drained if nl != -1:
# Bytes after the delimiter belong to the NEXT message —
# discarding them would corrupt framing for the rest of
# the connection.
self._leftover = chunk[nl + 1:]
break
raise ValueError("Message exceeds maximum size") raise ValueError("Message exceeds maximum size")
if not line: if not line:
return None return None
line = prefix + line
if len(line) > MAX_MESSAGE_BYTES:
raise ValueError("Message exceeds maximum size")
return parse_message(line.strip()) return parse_message(line.strip())