Utilize response from data_track_stream_read request#686
Conversation
| try: | ||
| if not read_response.HasField("eos"): | ||
| return None | ||
| except ValueError: | ||
| return None | ||
|
|
||
| return getattr(read_response, "eos") |
There was a problem hiding this comment.
🔴 Wrong protobuf field name eos used instead of eos_event in _read_response_eos, causing synchronous EOS detection to silently fail
The DataTrackStreamReadResponse protobuf message has a field named eos_event (see data_track_pb2.pyi:700-711), but _read_response_eos checks for "eos" via HasField("eos") and getattr(read_response, "eos"). Since "eos" is not a valid field on this message, HasField("eos") raises ValueError, which is caught by the except ValueError block, causing the method to always return None. This means the synchronous EOS detection path (the entire point of this PR change) never triggers. If the FFI layer returns a synchronous EOS in the read response and does not also send an async event, the code will hang forever on await self._queue.get() at line 238.
Field name in proto vs code
Proto (data_track_pb2.pyi:700-711):
EOS_EVENT_FIELD_NUMBER: builtins.int
def eos_event(self) -> global___DataTrackStreamEOS
def HasField(self, field_name: typing.Literal["eos_event", b"eos_event"]) -> builtins.bool
Code (data_track.py:268,273):
read_response.HasField("eos") # wrong name
getattr(read_response, "eos") # wrong name
| try: | |
| if not read_response.HasField("eos"): | |
| return None | |
| except ValueError: | |
| return None | |
| return getattr(read_response, "eos") | |
| try: | |
| if not read_response.HasField("eos_event"): | |
| return None | |
| except ValueError: | |
| return None | |
| return getattr(read_response, "eos_event") |
Was this helpful? React with 👍 or 👎 to provide feedback.
There was a problem hiding this comment.
Pull request overview
This PR updates the Python RTC SDK’s DataTrackStream read loop to utilize the synchronous DataTrackStreamRead response (which can include an EOS marker) to avoid a race where a subscriber might miss the EOS event when a data track is unpublished shortly after subscription.
Changes:
- Capture and inspect the
data_track_stream_readresponse on each__anext__call. - If the response indicates EOS, close the stream and terminate iteration (or raise
SubscribeDataTrackError) immediately. - Refactor EOS handling into a shared helper (
_handle_eos) and add response parsing helper (_read_response_eos).
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if not read_response.HasField("eos"): | ||
| return None | ||
| except ValueError: | ||
| return None | ||
|
|
||
| return getattr(read_response, "eos") | ||
|
|
| async def __anext__(self) -> DataTrackFrame: | ||
| if self._closed: | ||
| raise StopAsyncIteration | ||
|
|
||
| self._send_read_request() | ||
| eos = self._send_read_request() | ||
| if eos is not None: | ||
| self._handle_eos(eos) | ||
|
|
Overview
DataTrackStreamRead Request now has a response
Why
if participant
bsubscribes to data trackTrack1and participantaunpublishesTrack1there is a race between whenbcreates the subscription and can therefore receive the EOS event. This creates a state wherebsubscribed toTrack1and never receives the EOS event.Fix
utilizing the response from DataTrackStreamRead Request with an optional EOS field.
Rust SDK
livekit/rust-sdks#1093
NOTE: the rust SDK version this repo points to already includes the necessary FFI changes.