
Speech to Speech (LiveKit)

Wire OpenAdapter's STT (Parakeet), LLM, and TTS into a LiveKit voice agent for real-time, low-latency voice interaction.

Base URL & authentication

Base URL: https://api.openadapter.in

All requests need an Authorization: Bearer sk-cv-... header. Generate or copy your API key from the Dashboard → API Keys page.
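To sanity-check a key before wiring up the agent, you can attach the Bearer header with nothing but the standard library. The helper below is a hypothetical sketch, and the /v1/models path is an assumption based on the gateway being OpenAI-compatible:

```python
import urllib.request

def authed_request(url: str, api_key: str) -> urllib.request.Request:
    """Build a request carrying the Authorization header OpenAdapter requires."""
    return urllib.request.Request(
        url,
        headers={"Authorization": f"Bearer {api_key}"},
    )

req = authed_request("https://api.openadapter.in/v1/models", "sk-cv-your-key")
# urllib.request.urlopen(req) should list available models if the key is valid.
```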

This guide explains how to integrate OpenAdapter services (STT, LLM, and TTS) into a LiveKit voice agent. OpenAdapter is a drop-in replacement for standard provider plugins.

Key benefits

  • Fast STT — BBPC Parakeet for sub-second transcription
  • Unified gateway — multiple LLMs (Kimi, GLM, etc.) via a single OpenAI-compatible endpoint
  • Low latency — optimized for real-time voice

1. Prerequisites

  • A LiveKit project (Cloud or self-hosted)
  • An OpenAdapter API Key (sk-cv-...)
  • A BBPC Parakeet API Key (for STT)
  • Python 3.10+

2. Environment setup

Create or update your .env:

# LiveKit
LIVEKIT_URL=wss://your-livekit-project.livekit.cloud
LIVEKIT_API_KEY=your_livekit_key
LIVEKIT_API_SECRET=your_livekit_secret

# OpenAdapter
OPENADAPTER_API_KEY=sk-cv-your-key
OPENADAPTER_BASE_URL=https://api.openadapter.in/v1

# Parakeet STT
PARAKEET_API_KEY=your_parakeet_key
PARAKEET_WS_URL=wss://edge.openadapter.in/parakeet/v1/audio/stream

# Optional: model selection
OPENADAPTER_LLM_MODEL=Kimi-K2.5
OPENADAPTER_TTS_MODEL=tts-1
OPENADAPTER_TTS_VOICE=alloy
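A missing variable usually surfaces later as an opaque KeyError inside the agent, so it can help to fail fast at startup. This is a minimal sketch; `missing_vars` is a hypothetical helper, not part of any SDK, and assumes your entrypoint has already loaded the .env (e.g. via python-dotenv):

```python
import os

# Variables the agent reads at startup (from the .env above).
REQUIRED = (
    "LIVEKIT_URL", "LIVEKIT_API_KEY", "LIVEKIT_API_SECRET",
    "OPENADAPTER_API_KEY", "OPENADAPTER_BASE_URL",
    "PARAKEET_API_KEY", "PARAKEET_WS_URL",
)

def missing_vars(env=os.environ) -> list:
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED if not env.get(name)]

if missing_vars():
    print("Set these before starting the agent:", ", ".join(missing_vars()))
```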

3. Implementation

Step 1 — install dependencies

pip install livekit-agents livekit-plugins-openai python-dotenv websockets numpy aiohttp

Step 2 — drop in the Parakeet STT plugin

OpenAdapter ships a custom STT plugin for LiveKit, ParakeetSTT. Save the code from the appendix as parakeet_stt.py in your project directory.

Step 3 — configure the agent

In your agent.py:

import os
from livekit.agents import Agent, AutoSubscribe, JobContext, WorkerOptions, cli
from livekit.agents.voice import AgentSession
from livekit.plugins import openai as lk_openai
from parakeet_stt import ParakeetSTT

async def entrypoint(ctx: JobContext) -> None:
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    # 1. Parakeet STT
    stt = ParakeetSTT(
        ws_url=os.environ["PARAKEET_WS_URL"],
        api_key=os.environ["PARAKEET_API_KEY"],
    )

    # 2. LLM via OpenAdapter
    llm = lk_openai.LLM(
        model=os.getenv("OPENADAPTER_LLM_MODEL", "Kimi-K2.5"),
        api_key=os.environ["OPENADAPTER_API_KEY"],
        base_url=os.environ["OPENADAPTER_BASE_URL"],
    )

    # 3. TTS via OpenAdapter
    tts = lk_openai.TTS(
        model=os.getenv("OPENADAPTER_TTS_MODEL", "tts-1"),
        voice=os.getenv("OPENADAPTER_TTS_VOICE", "alloy"),
        api_key=os.environ["OPENADAPTER_API_KEY"],
        base_url=os.environ["OPENADAPTER_BASE_URL"],
    )

    # 4. Compose
    agent = Agent(instructions="You are a helpful voice assistant.")
    session = AgentSession(stt=stt, llm=llm, tts=tts)

    # 5. Start
    await session.start(agent, room=ctx.room)

if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))

4. Running the agent

Dev mode (hot-reload):

python agent.py dev

Production:

python agent.py start

Troubleshooting

  • Connection errors — verify LIVEKIT_URL and OPENADAPTER_BASE_URL are reachable.
  • API key invalid — confirm OPENADAPTER_API_KEY is active and your plan has quota left.
  • Audio issues — numpy and aiohttp must be installed; the Parakeet plugin uses them for resampling and the REST fallback.

Appendix: Parakeet STT plugin code

Save as parakeet_stt.py next to your agent.

"""
Parakeet STT plugin for LiveKit Agents.

Connects to the BBPC Parakeet WebSocket streaming endpoint:
  wss://edge.openadapter.in/parakeet/v1/audio/stream?token=<KEY>

Protocol:
  - Send raw PCM16 LE mono 16 kHz binary chunks
  - Send {"action": "finalize"} when done
  - Receive {"status": "listening", ...} on connect
  - Receive {"text": "...", "is_final": true} for each utterance
  - Receive {"status": "finalized"} after flush
"""

from __future__ import annotations

import asyncio
import json
import struct
from dataclasses import dataclass

import numpy as np
import websockets

from livekit import rtc
from livekit.agents import stt
from livekit.agents.types import (
    DEFAULT_API_CONNECT_OPTIONS,
    NOT_GIVEN,
    APIConnectOptions,
    NotGivenOr,
)

TARGET_SAMPLE_RATE = 16000
TARGET_CHANNELS = 1

@dataclass
class ParakeetSTTOptions:
    ws_url: str  # wss://edge.openadapter.in/parakeet/v1/audio/stream
    api_key: str
    language: str = "en"

class ParakeetSTT(stt.STT):
    def __init__(self, *, ws_url: str, api_key: str) -> None:
        super().__init__(
            capabilities=stt.STTCapabilities(
                streaming=True,
                interim_results=False,  # Parakeet only emits final utterances
            )
        )
        self._opts = ParakeetSTTOptions(ws_url=ws_url, api_key=api_key)

    @property
    def model(self) -> str:
        return "nvidia/parakeet-tdt-0.6b-v2"

    @property
    def provider(self) -> str:
        return "BBPC-parakeet"

    async def _recognize_impl(
        self,
        buffer: stt.AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> stt.SpeechEvent:
        """Batch recognition via REST fallback."""
        import aiohttp

        pcm_data = b""
        # AudioBuffer may be a single frame or a list of frames
        frames = buffer if isinstance(buffer, list) else [buffer]
        for frame in frames:
            audio = np.frombuffer(frame.data, dtype=np.int16)
            if frame.num_channels > 1:
                audio = audio.reshape(-1, frame.num_channels).mean(axis=1).astype(np.int16)
            if frame.sample_rate != TARGET_SAMPLE_RATE:
                factor = TARGET_SAMPLE_RATE / frame.sample_rate
                new_len = int(len(audio) * factor)
                audio = np.interp(
                    np.linspace(0, len(audio) - 1, new_len),
                    np.arange(len(audio)),
                    audio,
                ).astype(np.int16)
            pcm_data += audio.tobytes()

        wav_data = _pcm_to_wav(pcm_data, TARGET_SAMPLE_RATE)

        rest_url = self._opts.ws_url.replace("wss://", "https://").replace(
            "/audio/stream", "/audio/transcriptions"
        )
        async with aiohttp.ClientSession() as session:
            form = aiohttp.FormData()
            form.add_field("file", wav_data, filename="audio.wav", content_type="audio/wav")
            form.add_field("response_format", "json")
            async with session.post(
                rest_url,
                data=form,
                headers={"Authorization": f"Bearer {self._opts.api_key}"},
                timeout=aiohttp.ClientTimeout(total=30),
            ) as resp:
                result = await resp.json()

        text = result.get("text", "").strip()
        return stt.SpeechEvent(
            type=stt.SpeechEventType.FINAL_TRANSCRIPT,
            alternatives=[stt.SpeechData(language="en", text=text, confidence=1.0)],
        )

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> "ParakeetStream":
        return ParakeetStream(self, conn_options=conn_options)

class ParakeetStream(stt.RecognizeStream):
    def __init__(self, stt_instance: ParakeetSTT, *, conn_options: APIConnectOptions) -> None:
        super().__init__(stt=stt_instance, conn_options=conn_options, sample_rate=TARGET_SAMPLE_RATE)
        self._stt_opts = stt_instance._opts

    async def _run(self) -> None:
        ws_url = f"{self._stt_opts.ws_url}?token={self._stt_opts.api_key}"

        async with websockets.connect(ws_url, ping_interval=20, ping_timeout=10) as ws:
            hello = json.loads(await ws.recv())
            if hello.get("status") != "listening":
                raise RuntimeError(f"Unexpected connect message: {hello}")

            send_task = asyncio.create_task(self._send_audio(ws))
            recv_task = asyncio.create_task(self._recv_transcripts(ws))

            try:
                await asyncio.gather(send_task, recv_task)
            except Exception:
                send_task.cancel()
                recv_task.cancel()
                raise

    async def _send_audio(self, ws) -> None:
        """Read frames, convert to PCM16 mono 16 kHz, send binary."""
        async for data in self._input_ch:
            if isinstance(data, self._FlushSentinel):
                await ws.send(json.dumps({"action": "finalize"}))
                return

            frame: rtc.AudioFrame = data
            pcm = _frame_to_pcm16_mono_16k(frame)
            if pcm:
                await ws.send(pcm)

        # Input channel closed without flush — send finalize anyway
        await ws.send(json.dumps({"action": "finalize"}))

    async def _recv_transcripts(self, ws) -> None:
        """Receive transcript messages and emit SpeechEvents."""
        async for raw in ws:
            msg = json.loads(raw)

            if msg.get("is_final") and msg.get("text"):
                text = msg["text"].strip()
                if text:
                    self._event_ch.send_nowait(
                        stt.SpeechEvent(
                            type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                            alternatives=[
                                stt.SpeechData(language="en", text=text, confidence=1.0)
                            ],
                        )
                    )

            elif msg.get("status") == "finalized":
                return

def _frame_to_pcm16_mono_16k(frame: rtc.AudioFrame) -> bytes:
    """Convert a LiveKit AudioFrame to PCM16 LE mono 16 kHz bytes."""
    audio = np.frombuffer(bytes(frame.data), dtype=np.int16)

    if frame.num_channels > 1:
        audio = audio.reshape(-1, frame.num_channels).mean(axis=1).astype(np.int16)

    if frame.sample_rate != TARGET_SAMPLE_RATE:
        factor = TARGET_SAMPLE_RATE / frame.sample_rate
        new_len = int(len(audio) * factor)
        if new_len == 0:
            return b""
        audio = np.interp(
            np.linspace(0, len(audio) - 1, new_len),
            np.arange(len(audio)),
            audio,
        ).astype(np.int16)

    return audio.tobytes()

def _pcm_to_wav(pcm_data: bytes, sample_rate: int) -> bytes:
    """Wrap raw PCM16 mono bytes in a WAV container."""
    num_channels = 1
    bits_per_sample = 16
    byte_rate = sample_rate * num_channels * bits_per_sample // 8
    block_align = num_channels * bits_per_sample // 8
    data_size = len(pcm_data)

    header = struct.pack(
        "<4sI4s4sIHHIIHH4sI",
        b"RIFF",
        36 + data_size,
        b"WAVE",
        b"fmt ",
        16,            # chunk size
        1,             # PCM format
        num_channels,
        sample_rate,
        byte_rate,
        block_align,
        bits_per_sample,
        b"data",
        data_size,
    )
    return header + pcm_data
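The message protocol described in the plugin's docstring can be exercised without a live connection. The classifier below is a hypothetical standalone helper (not part of the plugin) that labels server messages the same way ParakeetStream does:

```python
import json

def classify_event(raw: str):
    """Classify one Parakeet server message per the streaming protocol."""
    msg = json.loads(raw)
    if msg.get("status") == "listening":
        return ("ready", None)   # sent once on connect
    if msg.get("status") == "finalized":
        return ("done", None)    # sent after {"action": "finalize"}
    if msg.get("is_final") and msg.get("text", "").strip():
        return ("final", msg["text"].strip())
    return ("ignore", None)      # anything else (keepalives, empty results)

print(classify_event('{"text": "hello world", "is_final": true}'))
# → ('final', 'hello world')
```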
