Skip to content

feat(plugins-soniox): surface per-run language segments end-to-end#5730

Open
MSameerAbbas wants to merge 1 commit into
livekit:mainfrom
MSameerAbbas:feat/stt-target-language-segments
Open

feat(plugins-soniox): surface per-run language segments end-to-end#5730
MSameerAbbas wants to merge 1 commit into
livekit:mainfrom
MSameerAbbas:feat/stt-target-language-segments

Conversation

@MSameerAbbas
Copy link
Copy Markdown
Contributor

Summary

Fixes #5685 (and the follow-up source-side symptom raised in the comment thread, which @chenghao-mou approved bundling into the same PR).

Both halves are the same plugin bug: _TokenAccumulator._lang_segments is built per-run by the existing coalescing logic but then dropped in send_endpoint_transcript (and the interim path). The fix surfaces it through new SpeechData fields on the target side, and stops dropping it on the source side in non-translation mode.

Changes

  • stt.SpeechData: add target_languages / target_texts (symmetric to existing source_languages / source_texts). Same LanguageCode coercion in __post_init__. Default None, so the addition is strictly additive for every other plugin.
  • Soniox plugin, translation mode: populate target_* from final._lang_segments on FINAL_TRANSCRIPT and INTERIM_TRANSCRIPT / PREFLIGHT_TRANSCRIPT. Consumers now see the per-run target breakdown for code-switched two-way translation, e.g. target_languages=["en", "es"] / target_texts=["Hello, how are you?", " Estoy bien, gracias."] for the translation of "Hello, ¿cómo estás? I'm doing fine, gracias.".
  • Soniox plugin, non-translation mode: populate source_* from the same accumulator (previously None). A code-switched ja + en utterance now surfaces source_languages=["ja", "en"] / source_texts=["こんにちは、私の名はサムです。", " My name is Sam."] -- matches what the SpeechData docstring already promised for "multi-language detection services".
  • Refactor: extract a _lang_segments_to_fields helper to DRY the conversion across both modes and both event paths; the four duplicated inline list comprehensions collapse to one named operation. The predicate that distinguishes source from target became data-presence-based (final_original._lang_segments) rather than config-based (is_translation_mode is not None), which is what unified both halves cleanly.

SpeechData.text and SpeechData.language are unchanged for back-compat (still the full concatenation and the first translated/detected language, respectively).

Test plan

  • 14 new unit tests in tests/test_plugin_soniox_stt.py covering:
    • SpeechData.__post_init__ target_languages coercion (strings → LanguageCode, None stays None, existing LanguageCode passthrough)
    • _TokenAccumulator._lang_segments per-run coalescing
    • _lang_segments_to_fields helper edge cases (empty → (None, None), non-empty → parallel lists with LanguageCode coercion)
    • Two-way translation, code-switched (the issue's canonical example)
    • One-way translation (single target run)
    • "none" untranslated chunk inside a translated utterance (asymmetric per-run list lengths)
    • Interim path: translation mode merging final + non-final per run on both sides
    • Interim path: non-translation mode populates source_* from final + non-final merged
    • Non-translation single-language: source_* populated, target_* None
    • Non-translation code-switched JA+EN: source_* carries the per-run breakdown
  • Live-verified end-to-end against the real Soniox WebSocket API in console mode:
    • Translation mode, code-switched "Hello, ¿cómo estás? I'm doing fine, gracias."text="Hello, how are you? Estoy bien, gracias.", target_languages=["en", "es"], target_texts=["Hello, how are you?", " Estoy bien, gracias."], "".join(target_texts) == text. Source side unchanged.
    • Non-translation mode, code-switched " こんにちは、私の名はサムです。 My name is Sam."text=" こんにちは、私の名はサムです。 My name is Sam.", source_languages=["ja", "en"], source_texts=[" こんにちは、私の名はサムです。", " My name is Sam."], target_* correctly None. Interim events also surface the multi-language source breakdown progressively as the user code-switches.
  • ruff format clean, ruff check clean, no new mypy --strict errors introduced in changed files.

Follow-ups (intentionally not in this PR)

  • The final / final_original accumulator names are honest about routing today but the new target_* fields make their two-mode roles more glaring (final is "primary user-facing accumulator", final_original is "source-side accumulator that's empty in non-translation mode"). Worth a separate behavior-preserving rename PR to final_primary / final_source.
  • The new target_* fields are wired in Soniox only; other translation-capable plugins (Gladia, Deepgram v2, AWS) can adopt them in follow-up PRs.

@CLAassistant
Copy link
Copy Markdown

CLAassistant commented May 13, 2026

CLA assistant check
All committers have signed the CLA.

Copy link
Copy Markdown
Contributor

@devin-ai-integration devin-ai-integration Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ Devin Review: No Issues Found

Devin Review analyzed this PR and found no potential bugs to report.

View in Devin Review to see 3 additional findings.

Open in Devin Review

Two symmetric symptoms of the same plugin bug -- `_TokenAccumulator._lang_segments`
was being computed but discarded in `send_endpoint_transcript` and the interim path.

- Add `target_languages` / `target_texts` to `stt.SpeechData`, mirroring the
  existing `source_languages` / `source_texts` source-side fields, with the
  same `LanguageCode` coercion in `__post_init__`.
- In translation mode, populate `target_*` from `final._lang_segments` on
  `FINAL_TRANSCRIPT` and `INTERIM_TRANSCRIPT` / `PREFLIGHT_TRANSCRIPT`, giving
  consumers the per-run target-language breakdown for code-switched two-way
  translation (e.g. `target_languages=["en", "es"]` /
  `target_texts=["Hello, how are you?", " Estoy bien, gracias."]` for the
  translation of "Hello, como estas? I'm doing fine, gracias.").
- In non-translation mode, populate `source_*` from the same accumulator
  (previously `None`, which made code-switched dictation look monolingual --
  e.g. a `ja` + `en` utterance now surfaces `source_languages=["ja", "en"]`).
- Refactor: extract a `_lang_segments_to_fields` helper to DRY the conversion
  across both modes and both event paths; replace the four duplicated inline
  list comprehensions with a single named operation.

`SpeechData.text` and `SpeechData.language` are unchanged for back-compat
(still the full concatenation and the first translated/detected language,
respectively).

Fixes livekit#5685.
@MSameerAbbas MSameerAbbas force-pushed the feat/stt-target-language-segments branch from 66654c5 to 124716d Compare May 14, 2026 04:51
@chenghao-mou
Copy link
Copy Markdown
Member

/test-stt

@github-actions
Copy link
Copy Markdown
Contributor

STT Test Results

Status: ✗ Some tests failed

Metric Count
✓ Passed 20
✗ Failed 3
× Errors 1
→ Skipped 15
▣ Total 39
⏱ Duration 208.9s
Failed Tests
  • tests.test_stt::test_stream[livekit.plugins.speechmatics]
    def finalizer() -> None:
            """Yield again, to finalize."""
      
            async def async_finalizer() -> None:
                try:
                    await gen_obj.__anext__()
                except StopAsyncIteration:
                    pass
                else:
                    msg = "Async generator fixture didn't stop."
                    msg += "Yield only once."
                    raise ValueError(msg)
      
    >       runner.run(async_finalizer(), context=context)
    
    .venv/lib/python3.12/site-packages/pytest_asyncio/plugin.py:330: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
    
    self = <asyncio.runners.Runner object at 0x7fb117334530>
    coro = <coroutine object _wrap_asyncgen_fixture.<locals>._asyncgen_fixture_wrapper.<locals>.finalizer.<locals>.async_finalizer at 0x7fb1253f7ac0>
    
        def run(self, coro, *, context=None):
            """Run a coroutine inside the embedded event loop."""
            if not coroutines.iscoroutine(coro):
                raise ValueError("a coroutine was expected, got {!r}".format(coro))
      
            if events._get_running_loop() is not None:
                # fail fast with short traceback
                raise RuntimeError(
                    "Runner.run() cannot be called from a running event loop")
      
            self._lazy_init()
      
            if context is None:
                context = self._context
            task = self._loop.create_task(coro, context=context)
      
            if (threading.current_thread() is threading.main_thread()
                and signal.getsignal(signal.SIGINT) is signal.default_int_handler
            ):
                sigint_handler = functools.partial(self._on_sigint, main_task=task)
                try:
                    signal.signal(signal.SIGINT, sigint_handler)
                except ValueError:
                    # `signal.signal` may throw if `threading.main_thread` does
                    # not support signals (e.g. embedded interpreter with signals
                    # not registered - see gh-91880)
                    sigint_handler = None
    
  • tests.test_stt::test_stream[livekit.plugins.fireworksai]
    stt_factory = <function parameter_factory.<locals>.<lambda> at 0x7f0a0fc95300>
    request = <FixtureRequest for <Coroutine test_stream[livekit.plugins.fireworksai]>>
    
        @pytest.mark.usefixtures("job_process")
        @pytest.mark.parametrize("stt_factory", STTs)
        async def test_stream(stt_factory: Callable[[], STT], request):
            sample_rate = SAMPLE_RATE
            plugin_id = request.node.callspec.id.split("-")[0]
            frames, transcript, _ = await make_test_speech(chunk_duration_ms=10, sample_rate=sample_rate)
      
            # TODO: differentiate missing key vs other errors
            try:
                stt_instance: STT = stt_factory()
            except ValueError as e:
                pytest.skip(f"{plugin_id}: {e}")
      
            async with stt_instance as stt:
                label = f"{stt.model}@{stt.provider}"
                if not stt.capabilities.streaming:
                    pytest.skip(f"{label} does not support streaming")
      
                for attempt in range(MAX_RETRIES):
                    try:
                        state = {"closing": False}
      
                        async def _stream_input(
                            frames: list[rtc.AudioFrame], stream: RecognizeStream, state: dict = state
                        ):
                            for frame in frames:
                                stream.push_frame(frame)
                                await asyncio.sleep(0.005)
      
                            stream.end_input()
                            state["closing"] = True
      
                        async def _stream_output(stream: RecognizeStream, state: dict = state):
                            text = ""
                            # make sure the events are sent in the right order
                            recv_start, recv_end = False, True
                            start_time = time.time()
                            got_final_transcript = False
      
                            async for event in stream:
                                if event.type == agents.stt.SpeechEventType.START_OF_SPEECH:
    
  • tests.test_stt::test_stream[livekit.plugins.nvidia]
    stt_factory = <function parameter_factory.<locals>.<lambda> at 0x7fb11771b240>
    request = <FixtureRequest for <Coroutine test_stream[livekit.plugins.nvidia]>>
    
        @pytest.mark.usefixtures("job_process")
        @pytest.mark.parametrize("stt_factory", STTs)
        async def test_stream(stt_factory: Callable[[], STT], request):
            sample_rate = SAMPLE_RATE
            plugin_id = request.node.callspec.id.split("-")[0]
            frames, transcript, _ = await make_test_speech(chunk_duration_ms=10, sample_rate=sample_rate)
      
            # TODO: differentiate missing key vs other errors
            try:
                stt_instance: STT = stt_factory()
            except ValueError as e:
                pytest.skip(f"{plugin_id}: {e}")
      
            async with stt_instance as stt:
                label = f"{stt.model}@{stt.provider}"
                if not stt.capabilities.streaming:
                    pytest.skip(f"{label} does not support streaming")
      
                for attempt in range(MAX_RETRIES):
                    try:
                        state = {"closing": False}
      
                        async def _stream_input(
                            frames: list[rtc.AudioFrame], stream: RecognizeStream, state: dict = state
                        ):
                            for frame in frames:
                                stream.push_frame(frame)
                                await asyncio.sleep(0.005)
      
                            stream.end_input()
                            state["closing"] = True
      
                        async def _stream_output(stream: RecognizeStream, state: dict = state):
                            text = ""
                            # make sure the events are sent in the right order
                            recv_start, recv_end = False, True
                            start_time = time.time()
                            got_final_transcript = False
      
                            async for event in stream:
                                if event.type == agents.stt.SpeechEventType.START_OF_SPEECH:
    
  • tests.test_stt::test_stream[livekit.agents.inference]
    stt_factory = <function parameter_factory.<locals>.<lambda> at 0x7fb11771b560>
    request = <FixtureRequest for <Coroutine test_stream[livekit.agents.inference]>>
    
        @pytest.mark.usefixtures("job_process")
        @pytest.mark.parametrize("stt_factory", STTs)
        async def test_stream(stt_factory: Callable[[], STT], request):
            sample_rate = SAMPLE_RATE
            plugin_id = request.node.callspec.id.split("-")[0]
            frames, transcript, _ = await make_test_speech(chunk_duration_ms=10, sample_rate=sample_rate)
      
            # TODO: differentiate missing key vs other errors
            try:
                stt_instance: STT = stt_factory()
            except ValueError as e:
                pytest.skip(f"{plugin_id}: {e}")
      
            async with stt_instance as stt:
                label = f"{stt.model}@{stt.provider}"
                if not stt.capabilities.streaming:
                    pytest.skip(f"{label} does not support streaming")
      
                for attempt in range(MAX_RETRIES):
                    try:
                        state = {"closing": False}
      
                        async def _stream_input(
                            frames: list[rtc.AudioFrame], stream: RecognizeStream, state: dict = state
                        ):
                            for frame in frames:
                                stream.push_frame(frame)
                                await asyncio.sleep(0.005)
      
                            stream.end_input()
                            state["closing"] = True
      
                        async def _stream_output(stream: RecognizeStream, state: dict = state):
                            text = ""
                            # make sure the events are sent in the right order
                            recv_start, recv_end = False, True
                            start_time = time.time()
                            got_final_transcript = False
      
                            async for event in stream:
                                if event.type == agents.stt.SpeechEventType.START_OF_SPEECH:
    
Skipped Tests
Test Reason
tests.test_stt::test_recognize[livekit.plugins.assemblyai] universal-streaming-english@AssemblyAI does not support batch recognition
tests.test_stt::test_recognize[livekit.plugins.speechmatics] enhanced@Speechmatics does not support batch recognition
tests.test_stt::test_recognize[livekit.plugins.nvidia] unknown@unknown does not support batch recognition
tests.test_stt::test_recognize[livekit.plugins.cartesia] ink-whisper@Cartesia does not support batch recognition
tests.test_stt::test_recognize[livekit.plugins.soniox] stt-rt-v4@Soniox does not support batch recognition
tests.test_stt::test_recognize[livekit.plugins.aws] unknown@Amazon Transcribe does not support batch recognition
tests.test_stt::test_recognize[livekit.plugins.fireworksai] unknown@FireworksAI does not support batch recognition
tests.test_stt::test_recognize[livekit.agents.inference] unknown@livekit does not support batch recognition
tests.test_stt::test_recognize[livekit.plugins.azure] unknown@Azure STT does not support batch recognition
tests.test_stt::test_recognize[livekit.plugins.deepgram.STTv2] flux-general-en@Deepgram does not support batch recognition
tests.test_stt::test_recognize[livekit.plugins.gradium.STT] unknown@Gradium does not support batch recognition
tests.test_stt::test_stream[livekit.plugins.elevenlabs] scribe_v1@ElevenLabs does not support streaming
tests.test_stt::test_stream[livekit.plugins.mistralai] voxtral-mini-latest@MistralAI does not support streaming
tests.test_stt::test_stream[livekit.plugins.fal] Wizper@Fal does not support streaming
tests.test_stt::test_stream[livekit.plugins.openai] gpt-4o-mini-transcribe@api.openai.com does not support streaming

Triggered by workflow run #2125

Copy link
Copy Markdown
Member

@chenghao-mou chenghao-mou left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! One small nit and a merge conflict before we can merge this.

# the interim path below.
src_segs, tgt_segs = (
(final_original._lang_segments, final._lang_segments)
if final_original._lang_segments
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we should use is_translation_mode here (and at 539-543) to keep the path correct if the provider ever emits translation tokens without originals in a flush window.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Feature request: per-run target-language segments on stt.SpeechData

3 participants