Skip to content

Utilize response from data_track_stream_read request#686

Open
stephen-derosa wants to merge 1 commit into
mainfrom
sderosa/BOT-347-data_track_stream_read-response
Open

Utilize response from data_track_stream_read request#686
stephen-derosa wants to merge 1 commit into
mainfrom
sderosa/BOT-347-data_track_stream_read-response

Conversation

@stephen-derosa
Copy link
Copy Markdown
Contributor

@stephen-derosa stephen-derosa commented May 22, 2026

Overview

DataTrackStreamRead Request now has a response

Why

if participant b subscribes to data track Track1 and participant a unpublishes Track1 there is a race between when b creates the subscription and can therefore receive the EOS event. This creates a state where b subscribed to Track1 and never receives the EOS event.

Fix

utilizing the response from DataTrackStreamRead Request with an optional EOS field.

Rust SDK

livekit/rust-sdks#1093

NOTE: the rust SDK version this repo points to already includes the necessary FFI changes.

Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 potential issue.

View 2 additional findings in Devin Review.

Open in Devin Review

Comment on lines +267 to +273
try:
if not read_response.HasField("eos"):
return None
except ValueError:
return None

return getattr(read_response, "eos")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Wrong protobuf field name eos used instead of eos_event in _read_response_eos, causing synchronous EOS detection to silently fail

The DataTrackStreamReadResponse protobuf message has a field named eos_event (see data_track_pb2.pyi:700-711), but _read_response_eos checks for "eos" via HasField("eos") and getattr(read_response, "eos"). Since "eos" is not a valid field on this message, HasField("eos") raises ValueError, which is caught by the except ValueError block, causing the method to always return None. This means the synchronous EOS detection path (the entire point of this PR change) never triggers. If the FFI layer returns a synchronous EOS in the read response and does not also send an async event, the code will hang forever on await self._queue.get() at line 238.

Field name in proto vs code

Proto (data_track_pb2.pyi:700-711):

EOS_EVENT_FIELD_NUMBER: builtins.int
def eos_event(self) -> global___DataTrackStreamEOS
def HasField(self, field_name: typing.Literal["eos_event", b"eos_event"]) -> builtins.bool

Code (data_track.py:268,273):

read_response.HasField("eos")  # wrong name
getattr(read_response, "eos")  # wrong name
Suggested change
try:
if not read_response.HasField("eos"):
return None
except ValueError:
return None
return getattr(read_response, "eos")
try:
if not read_response.HasField("eos_event"):
return None
except ValueError:
return None
return getattr(read_response, "eos_event")
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR updates the Python RTC SDK’s DataTrackStream read loop to utilize the synchronous DataTrackStreamRead response (which can include an EOS marker) to avoid a race where a subscriber might miss the EOS event when a data track is unpublished shortly after subscription.

Changes:

  • Capture and inspect the data_track_stream_read response on each __anext__ call.
  • If the response indicates EOS, close the stream and terminate iteration (or raise SubscribeDataTrackError) immediately.
  • Refactor EOS handling into a shared helper (_handle_eos) and add response parsing helper (_read_response_eos).

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +268 to +274
if not read_response.HasField("eos"):
return None
except ValueError:
return None

return getattr(read_response, "eos")

Comment on lines 230 to +237
async def __anext__(self) -> DataTrackFrame:
if self._closed:
raise StopAsyncIteration

self._send_read_request()
eos = self._send_read_request()
if eos is not None:
self._handle_eos(eos)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants