Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature multiparticipant from release #1442

Open
wants to merge 8 commits into
base: 0.x
Choose a base branch
from
Open
25 changes: 5 additions & 20 deletions .github/workflows/build-package.yml
Original file line number Diff line number Diff line change
@@ -1,27 +1,12 @@
name: Build package
name: Build Package

on:
workflow_call:
inputs:
package:
required: true
type: string
artifact_name:
required: true
type: string
workflow_dispatch:
inputs:
package:
description: "Name of the package to build"
required: true
default: "livekit-plugins-browser"
artifact_name:
description: "Artifact name for the distribution package"
required: true
default: "build-artifact"
push:
branches:
- main

jobs:
build_plugins:
build:
runs-on: ubuntu-latest
if: inputs.package != 'livekit-plugins-browser'
defaults:
58 changes: 58 additions & 0 deletions examples/multimodal_agent_no_auto_link.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@

from __future__ import annotations

import logging
import os
from typing import Annotated

import aiohttp
from dotenv import load_dotenv
from livekit import rtc
from livekit.agents import (
    AutoSubscribe,
    JobContext,
    WorkerOptions,
    WorkerType,
    cli,
    llm,
    multimodal,
)
from livekit.plugins import openai

# Module-level logger for this example worker; INFO so the link events are visible.
logger = logging.getLogger("my-worker")
logger.setLevel(logging.INFO)

# Read environment variables from the ".env.local" file located next to this script,
# independent of the directory the worker is launched from.
current_dir = os.path.dirname(os.path.abspath(__file__))
env_path = os.path.join(current_dir, ".env.local")
load_dotenv(dotenv_path=env_path)


async def entrypoint(ctx: JobContext) -> None:
    """Run a multimodal agent that links participants manually.

    The agent is created with ``auto_link_on_connect=False`` and this example
    links a participant itself from a room ``participant_connected`` handler.

    Args:
        ctx: Job context used to connect to the room and to reach ``ctx.room``.
    """
    logger.info("starting entrypoint")

    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    agent = multimodal.MultimodalAgent(
        model=openai.realtime.RealtimeModel(
            voice="alloy",
            temperature=0.8,
            instructions="You are a helpful assistant, managing multiple participants in an audio room",
            turn_detection=openai.realtime.ServerVadOptions(
                threshold=0.6, prefix_padding_ms=200, silence_duration_ms=500
            ),
        ),
        auto_link_on_connect=False,
    )

    def on_participant_connected(participant: rtc.RemoteParticipant) -> None:
        """Link the participant that just joined and log its identity."""
        identity = participant.identity
        # NOTE: _link_participant is a private method of the agent; this example
        # calls it directly because linking is done by hand here.
        agent._link_participant(identity)
        logger.info(f"Participant connected: {identity}")

    # NOTE(review): only participants that connect after this point trigger the
    # handler; anyone already in the room is presumably left unlinked - confirm
    # whether that is intended for this example.
    ctx.room.on("participant_connected", on_participant_connected)

    agent.start(ctx.room)


if __name__ == "__main__":
    # Launch the worker through the agents CLI, using the room worker type.
    worker_options = WorkerOptions(
        entrypoint_fnc=entrypoint,
        worker_type=WorkerType.ROOM,
    )
    cli.run_app(worker_options)
53 changes: 39 additions & 14 deletions livekit-agents/livekit/agents/multimodal/multimodal_agent.py
Original file line number Diff line number Diff line change
@@ -237,15 +237,20 @@ def start(
self._room, self._participant = room, participant

if participant is not None:
if isinstance(participant, rtc.RemoteParticipant):
self._link_participant(participant.identity)
# Check if we should auto-link when a participant is provided
if self._auto_link_on_connect:
if isinstance(participant, rtc.RemoteParticipant):
self._link_participant(participant.identity)
else:
self._link_participant(participant)
else:
self._link_participant(participant)
logger.info(f"Auto-linking disabled. Not linking participant {participant}")
else:
# no participant provided, try to find the first participant in the room
for participant in self._room.remote_participants.values():
self._link_participant(participant.identity)
break
# No participant provided, only auto-link if the flag is enabled
if self._auto_link_on_connect:
for participant in self._room.remote_participants.values():
self._link_participant(participant.identity)
break

self._session = self._model.session(
chat_ctx=self._chat_ctx, fnc_ctx=self._fnc_ctx
@@ -474,20 +479,40 @@ def _on_playout_stopped(interrupted: bool) -> None:
self._session._push_audio(f)

def _on_participant_connected(self, participant: rtc.RemoteParticipant):
    """Handle a participant connecting to the room.

    When ``self._auto_link_on_connect`` is false the participant is only
    logged and nothing is linked. Otherwise the participant is linked, but
    only if no participant is linked yet.

    Args:
        participant: The participant that connected.
    """
    if not self._auto_link_on_connect:
        logger.info(
            f"Auto-linking disabled. Participant {participant.identity} connected but not linking."
        )
        return

    # Auto-link only when nobody is linked, so an existing link is not replaced.
    if self._linked_participant is None:
        self._link_participant(participant.identity)


def _link_participant(self, participant_identity: str) -> None:
    """Link a participant to the voice assistant.

    If a different participant is currently linked, it is unlinked first:
    the microphone-reading task is cancelled and the subscribed track and
    linked participant are cleared. The new participant is then looked up in
    ``self._room.remote_participants``; when the identity is unknown an error
    is logged and no participant remains linked.

    Args:
        participant_identity: The identity of the participant to link.
    """
    current = self._linked_participant

    # Unlink the current participant if a different one is already linked.
    if current is not None and current.identity != participant_identity:
        logger.info(f"Unlinking current participant: {current.identity}")
        if self._read_micro_atask:
            # Stop reading from the previous participant's track.
            self._read_micro_atask.cancel()
        # NOTE(review): the two resets below are placed at the "unlink" level,
        # not under the task check; the original nesting is ambiguous - confirm.
        self._subscribed_track = None
        self._linked_participant = None

    # Link the new participant.
    self._linked_participant = self._room.remote_participants.get(
        participant_identity
    )

    if self._linked_participant is None:
        logger.error(
            f"_link_participant must be called with a valid identity: {participant_identity}"
        )
        return

    logger.info(f"Linking new participant: {participant_identity}")
    # Subscribe to the new participant's microphone.
    self._subscribe_to_microphone()


async def _micro_task(self, track: rtc.LocalAudioTrack) -> None:
sample_rate = self._model.capabilities.input_audio_sample_rate
37 changes: 31 additions & 6 deletions livekit-agents/livekit/agents/pipeline/pipeline_agent.py
Original file line number Diff line number Diff line change
@@ -199,6 +199,7 @@ def __init__(
before_tts_cb: BeforeTTSCallback = _default_before_tts_cb,
plotting: bool = False,
loop: asyncio.AbstractEventLoop | None = None,
auto_link_on_connect: bool = True,
# backward compatibility
will_synthesize_assistant_reply: WillSynthesizeAssistantReply | None = None,
) -> None:
@@ -236,6 +237,7 @@ def __init__(
"""
super().__init__()
self._loop = loop or asyncio.get_event_loop()
self._auto_link_on_connect = auto_link_on_connect

if will_synthesize_assistant_reply is not None:
logger.warning(
@@ -401,11 +403,11 @@ def _on_vad_metrics(vad_metrics: vad.VADMetrics) -> None:
self._link_participant(participant.identity)
else:
self._link_participant(participant)
else:
# no participant provided, try to find the first participant in the room
elif self._auto_link_on_connect:
# Automatically link to the first participant if auto-linking is enabled
for participant in self._room.remote_participants.values():
self._link_participant(participant.identity)
break
break # Only connect to the first participant initially

self._main_atask = asyncio.create_task(self._main_task())

@@ -542,12 +544,35 @@ async def aclose(self) -> None:
await self._deferred_validation.aclose()

def _on_participant_connected(self, participant: rtc.RemoteParticipant):
    """Handle a participant connecting to the room.

    Links the participant only when ``self._auto_link_on_connect`` is set and
    ``self._human_input`` is not yet assigned; otherwise does nothing beyond
    the debug log.

    Args:
        participant: The participant that connected.
    """
    logger.debug("_on_participant_connected called")

    # Nothing to do when auto-linking is off or an input is already assigned.
    if not self._auto_link_on_connect or self._human_input:
        return

    self._link_participant(participant.identity)

def link_participant(self, identity: str) -> None:
    """Manually link a participant by identity, replacing any current input.

    The identity is looked up in ``self._room.remote_participants``. If it is
    not found, an error is logged and the current assignment is left
    untouched. If it is found, any existing ``self._human_input`` is cleared
    and ``_link_participant`` is called for the new identity.

    Args:
        identity: Identity of the remote participant to link.
    """
    logger.info(f"Attempting to manually link participant: {identity}")

    # Ensure the participant exists before touching any state.
    participant = self._room.remote_participants.get(identity)
    if not participant:
        logger.error(f"Participant with identity {identity} not found.")
        return

    previous_input = self._human_input
    if previous_input:
        # The value logged is the stored input object itself, not an identity string.
        logger.info(f"Unassigning previous participant: {previous_input}")
        # NOTE(review): the reference is dropped without any teardown call -
        # confirm the previous input needs no explicit close.
        self._human_input = None

    self._link_participant(identity)
    # NOTE(review): _link_participant's result is not checked, so this is
    # logged even if it bailed out - confirm that is acceptable.
    logger.info(f"Participant {identity} successfully linked.")


self._link_participant(participant.identity)

def _link_participant(self, identity: str) -> None:
logger.debug("_link_participant has been called")
participant = self._room.remote_participants.get(identity)
if participant is None:
logger.error("_link_participant must be called with a valid identity")