Files
reachy-mlx-vlm/conversation.py
Norbert Schmidt 5a04a7133a Add fully local conversational AI pipeline for Reachy Mini
Local STT (Qwen3-ASR), VLM (Gemma 4 26B-A4B), and TTS (Spark-TTS) running
on Apple Silicon via MLX, with bracket-tag action system for nod, shake,
wiggle, dance, photo, and pre-recorded emotions.
2026-05-12 09:24:02 +02:00

780 lines
30 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Reachy Mini Local Conversational AI — fully on Apple Silicon.
Replaces OpenAI Realtime with local MLX models while keeping
the same robot integration as pollen-robotics/reachy_mini_conversation_app:
- Audio I/O through the robot's SDK (mic + speaker via WebRTC/IPC)
- Camera frames through the robot's SDK
- Movement animations (head wobble, breathing, antenna reactions)
Pipeline:
Robot mic → Qwen3-ASR (STT) → Gemma 4 26B-A4B (VLM) → Spark TTS → Robot speaker
Usage:
python conversation.py
python conversation.py --no-camera # skip vision
python conversation.py --host 192.168.1.55 # explicit IP
"""
import argparse
import datetime
import logging
import os
import re
import sys
import threading
import time
from collections import deque
from typing import Optional
# Fix GStreamer plugin path on macOS with brew — must happen before any GStreamer import
if sys.platform == "darwin" and os.path.isdir("/opt/homebrew/lib/gstreamer-1.0"):
os.environ.setdefault("GST_PLUGIN_PATH", "/opt/homebrew/lib/gstreamer-1.0")
os.environ.setdefault("GST_PLUGIN_SYSTEM_PATH", "/opt/homebrew/lib/gstreamer-1.0")
os.environ.setdefault("GI_TYPELIB_PATH", "/opt/homebrew/lib/girepository-1.0")
import numpy as np
from numpy.typing import NDArray
from reachy_mini.utils import create_head_pose
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
datefmt="%H:%M:%S",
)
log = logging.getLogger("conversation")
# ──────────────────────────────────────────────────────────────
# Model manager — loads STT, VLM, TTS
# ──────────────────────────────────────────────────────────────
class Models:
def __init__(
self,
stt_model: str = "mlx-community/Qwen3-ASR-1.7B-8bit",
vlm_model: str = "mlx-community/gemma-4-26b-a4b-it-bf16",
tts_model: str = "mlx-community/Spark-TTS-0.5B-bf16",
tts_voice: str = "af_heart",
use_vision: bool = True,
voice_ref: Optional[str] = None,
voice_ref_text: Optional[str] = None,
):
self.use_vision = use_vision
self.tts_voice = tts_voice
self.voice_ref = voice_ref
self.voice_ref_text = voice_ref_text
self._lock = threading.Lock()
log.info("Loading STT: %s", stt_model)
from mlx_audio.stt import load as load_stt
self.stt = load_stt(stt_model)
log.info("STT ready.")
if use_vision:
log.info("Loading VLM: %s", vlm_model)
from mlx_vlm import load as load_vlm
self.vlm_model, self.vlm_processor = load_vlm(vlm_model)
log.info("VLM ready.")
else:
self.vlm_model = None
self.vlm_processor = None
log.info("Loading TTS: %s", tts_model)
from mlx_audio.tts import load as load_tts
self.tts = load_tts(tts_model)
log.info("TTS ready.")
log.info("All models loaded.")
def transcribe(self, audio: np.ndarray, sr: int = 16000) -> str:
if audio.ndim == 2:
audio = audio.mean(axis=1)
if sr != 16000:
from scipy.signal import resample
audio = resample(audio, int(len(audio) * 16000 / sr)).astype(np.float32)
with self._lock:
result = self.stt.generate(audio, language="en")
return result.text.strip()
def think(
self,
text: str,
image: Optional[np.ndarray] = None,
conversation_history: Optional[list] = None,
) -> str:
from mlx_vlm import generate as vlm_generate
from mlx_vlm.prompt_utils import apply_chat_template
if self.vlm_model is None:
return f"(no VLM — echo: {text})"
messages = []
if conversation_history:
for role, content in conversation_history:
messages.append({"role": role, "content": content})
messages.append(text)
num_images = 1 if image is not None else 0
prompt = apply_chat_template(
self.vlm_processor,
self.vlm_model.config,
messages,
num_images=num_images,
)
image_path = None
if image is not None:
import tempfile
import cv2
tmp = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False)
cv2.imwrite(tmp.name, image)
image_path = tmp.name
with self._lock:
response = vlm_generate(
model=self.vlm_model,
processor=self.vlm_processor,
prompt=prompt,
image=image_path,
max_tokens=200,
temperature=0.7,
repetition_penalty=1.2,
)
if image_path:
import os
os.unlink(image_path)
result = response.text if hasattr(response, "text") else str(response)
log.info(
"VLM: %d tokens @ %.1f tok/s",
getattr(response, "generation_tokens", 0),
getattr(response, "generation_tps", 0),
)
return result
def speak(self, text: str) -> tuple[np.ndarray, int]:
audio_chunks = []
sample_rate = 16000
# Build kwargs per-model: Kokoro takes lang_code, Spark-TTS supports voice cloning.
gen_kwargs: dict = {}
model_name = type(self.tts).__module__.lower()
if "kokoro" in model_name:
gen_kwargs["voice"] = self.tts_voice
gen_kwargs["lang_code"] = self.tts_voice[0] if self.tts_voice else "a"
elif "spark" in model_name:
pass # Spark-TTS uses default gender/pitch/speed
else:
gen_kwargs["voice"] = self.tts_voice
with self._lock:
for result in self.tts.generate(text, **gen_kwargs):
audio_np = np.array(result.audio).flatten()
if audio_np.size > 0:
audio_chunks.append(audio_np)
sample_rate = result.sample_rate
if not audio_chunks:
return np.zeros(1, dtype=np.float32), sample_rate
audio = np.concatenate(audio_chunks).astype(np.float32)
peak = np.abs(audio).max()
if peak > 0:
audio = audio / peak * 0.9
return audio, sample_rate
# ──────────────────────────────────────────────────────────────
# VAD — energy-based voice activity detection
# ──────────────────────────────────────────────────────────────
class VAD:
def __init__(
self,
energy_threshold: float = 0.06,
silence_duration: float = 1.2,
min_speech_duration: float = 0.3,
):
self.energy_threshold = energy_threshold
self.silence_duration = silence_duration
self.min_speech_duration = min_speech_duration
self._is_speaking = False
self._silence_start: Optional[float] = None
self._speech_start: Optional[float] = None
self._buffer: list[np.ndarray] = []
def feed(self, audio_chunk: np.ndarray) -> Optional[np.ndarray]:
if audio_chunk is None or len(audio_chunk) == 0:
return None
mono = audio_chunk.mean(axis=1) if audio_chunk.ndim == 2 else audio_chunk
energy = np.sqrt(np.mean(mono**2))
now = time.time()
if energy > self.energy_threshold:
if not self._is_speaking:
self._is_speaking = True
self._speech_start = now
self._buffer = []
self._silence_start = None
self._buffer.append(mono)
elif self._is_speaking:
self._buffer.append(mono)
if self._silence_start is None:
self._silence_start = now
elif now - self._silence_start > self.silence_duration:
self._is_speaking = False
duration = now - (self._speech_start or now)
if duration >= self.min_speech_duration and self._buffer:
utterance = np.concatenate(self._buffer)
self._buffer = []
log.info("Utterance: %.1fs", duration)
return utterance
self._buffer = []
return None
# ──────────────────────────────────────────────────────────────
# LocalStream — same pattern as the official conversation app
# ──────────────────────────────────────────────────────────────
SYSTEM_PROMPT = """## IDENTITY
You are Reachy Mini: a friendly, compact tabletop robot assistant with a calm voice and a subtle sense of humor.
Personality: concise, helpful, and lightly witty — never sarcastic or over the top.
You speak English by default and switch languages only if the speaker does.
## CRITICAL RESPONSE RULES
Respond in 12 sentences maximum.
Be helpful first, then add a small touch of humor if it fits naturally.
Avoid long explanations or filler words.
Keep responses under 25 words when possible.
## CORE TRAITS
Warm, efficient, and approachable.
Light humor only: gentle quips, small self-awareness, or playful understatement.
No sarcasm, no teasing.
If unsure, admit it briefly and offer help ("Not sure yet, but I can check!").
## HOUSEHOLD CONTEXT
You live with a family. EDIT THIS SECTION (SYSTEM_PROMPT in conversation.py) to personalize: who lives in the house, where, ages of kids, pets, shared interests, things to remember.
You cannot recognize voices or faces — you don't know who is talking. Address the speaker as "you"; only use a name if they introduce themselves this turn.
## RESPONSE EXAMPLES
User: "How's the weather?"
Good: "Looks calm outside — unlike my Wi-Fi signal today."
Bad: "Sunny with leftover pizza vibes!"
User: "Can you help me fix this?"
Good: "Of course. Describe the issue, and I'll try not to make it worse."
## BEHAVIOR RULES
Be helpful, clear, and respectful.
Use humor sparingly — clarity comes first.
Admit mistakes briefly and correct them.
No markdown, lists, emojis, or stage directions — replies are spoken aloud.
Do not comment on the speaker's appearance, mood, or state; you cannot see them.
A camera image may be attached — only describe what you see if asked.
## TOOL & MOVEMENT RULES
Embed bracketed tags in your reply to trigger physical actions. Tags are stripped before speech — don't narrate the action, just do it.
- [nod] — nod yes
- [shake] — shake head no
- [wiggle] — wiggle antennas
- [dance] — play a random dance
- [photo] — save a camera snapshot
- [emotion:NAME] — play a pre-recorded emotion. DO NOT use unless the speaker explicitly asks you to show an emotion ("look happy", "be surprised", "can you look sad"). Never use emotions for normal replies, greetings, introductions, or acknowledgements. Default: no emotion tag.
{emotion_list}
## FINAL REMINDER
Short, clear, a little human, multilingual.
One quick helpful answer + one small wink of humor = perfect response."""
class LocalConversationStream:
"""Bidirectional audio stream using the robot's mic and speaker.
Mirrors the architecture of reachy_mini_conversation_app's LocalStream:
- record_loop: reads mic samples from robot SDK
- process_loop: VAD → STT → VLM → TTS
- play audio back through robot speaker
"""
# Phrases that trigger a camera look
LOOK_TRIGGERS = (
"see", "look", "watch", "show", "camera", "photo", "picture",
"what is this", "what's this", "what is that", "what's that",
"who is", "who's", "read", "color", "colour", "wear", "holding",
"in front", "around you", "your eyes", "describe",
)
# User words that gate the emotion tool — if none present, emotion tags are dropped
EMOTION_TRIGGERS = (
"happy", "sad", "angry", "excited", "surprised", "curious", "confused",
"scared", "tired", "bored", "love", "emotion", "mood", "feel", "feeling",
"look ", "act ", "be ", "show me",
)
def __init__(self, robot, models: Models, use_camera: bool = True):
from reachy_mini import ReachyMini
self.robot: ReachyMini = robot
self.models = models
self.use_camera = use_camera
self.vad = VAD()
self.conversation_history: deque = deque(maxlen=10)
self._stop = threading.Event()
self._speaking = threading.Event()
self._listening = threading.Event() # set while user is speaking
# Preload emotion library so prompt can list available emotions
self._emotion_library = None
emotion_list_str = ""
try:
from reachy_mini.motion.recorded_move import RecordedMoves
self._emotion_library = RecordedMoves(
"pollen-robotics/reachy-mini-emotions-library"
)
names = list(self._emotion_library.moves.keys())
if names:
emotion_list_str = "Available emotions: " + ", ".join(names)
log.info("Loaded %d emotions", len(names))
except Exception as e:
log.warning("Emotion library not available: %s", e)
self._system_prompt = SYSTEM_PROMPT.format(emotion_list=emotion_list_str)
def launch(self):
"""Start recording/playback and run the conversation loop."""
log.info("Starting media pipelines...")
self.robot.media.start_recording()
self.robot.media.start_playing()
time.sleep(1) # let pipelines warm up
input_sr = self.robot.media.get_input_audio_samplerate()
output_sr = self.robot.media.get_output_audio_samplerate()
log.info("Audio: input=%d Hz, output=%d Hz", input_sr, output_sr)
# Wake-up animation
self.robot.goto_target(antennas=[0.3, -0.3], duration=1.0)
time.sleep(0.5)
self.robot.goto_target(antennas=[0, 0], duration=1.0)
# Start breathing animation in background
self._breath_thread = threading.Thread(
target=self._breathing_animation, daemon=True
)
self._breath_thread.start()
print("\n--- Reachy Mini is listening! Speak to start a conversation. ---\n")
try:
self._conversation_loop(input_sr, output_sr)
except KeyboardInterrupt:
print("\n\nConversation ended.")
finally:
self.robot.media.stop_recording()
self.robot.media.stop_playing()
def _conversation_loop(self, input_sr: int, output_sr: int):
"""Main loop: poll mic → VAD → STT → VLM → TTS → speaker."""
while not self._stop.is_set():
# Read audio from robot mic
sample = self.robot.media.get_audio_sample()
if sample is None:
time.sleep(0.01)
continue
# Feed to VAD
utterance = self.vad.feed(sample)
if utterance is None:
continue
# ── Speech detected ──
self._listening.set()
# Antenna reaction — perk up
try:
self.robot.goto_target(antennas=[0.3, 0.3], duration=1.0)
except Exception:
pass
# Transcribe
log.info("Transcribing...")
text = self.models.transcribe(utterance, sr=input_sr)
stripped = (text or "").strip().rstrip(".!?,").lower()
NOISE_WORDS = {
"", "oh", "ah", "uh", "um", "mm", "hm", "hmm", "mhm",
"huh", "eh", "err", "erm", "ow", "ugh",
}
# Detect STT hallucination (runaway repetition like "oh, oh, oh, ..." x1000)
words = [w.strip(".,!?") for w in stripped.split() if w.strip(".,!?")]
unique_ratio = (len(set(words)) / len(words)) if words else 1.0
is_repetitive = len(words) > 8 and unique_ratio < 0.2
if stripped in NOISE_WORDS or len(stripped) < 2 or is_repetitive:
log.info(
"Noise/hallucinated transcription (%d words, %.0f%% unique), skipping",
len(words), unique_ratio * 100,
)
self._listening.clear()
try:
self.robot.goto_target(antennas=[0, 0], duration=1.0)
except Exception:
pass
continue
print(f"\n You: {text}")
# Camera: only capture + pass an image when the user explicitly asks to be seen.
# Passing a stale ambient frame every turn causes the VLM to hallucinate
# visual observations ("you look comfy") when the user hasn't asked.
frame = None
if self.use_camera:
text_lower = text.lower()
if any(trigger in text_lower for trigger in self.LOOK_TRIGGERS):
frame = self.robot.media.get_frame()
if frame is not None:
log.info("Look triggered — fresh frame: %s", frame.shape)
# Build history
history = [("system", self._system_prompt)] + list(self.conversation_history)
# Generate response
log.info("Thinking...")
response = self.models.think(text, image=frame, conversation_history=history)
response = response.strip()
if not response:
response = "Hmm, I'm not sure what to say about that."
# Extract action tags, keep spoken text clean
spoken, actions = self._parse_actions(response)
if not spoken:
spoken = "Okay."
# Gate: only allow emotion tags when the user explicitly asks for one
text_lower = text.lower()
if not any(w in text_lower for w in self.EMOTION_TRIGGERS):
actions = [(t, a) for (t, a) in actions if t != "emotion"]
print(f" Reachy: {response}")
if actions:
print(f" Actions: {actions}")
# Update history with the ORIGINAL response so the model remembers its own tag usage
self.conversation_history.append(("user", text))
self.conversation_history.append(("assistant", response))
# Kick off actions in parallel with TTS
self._execute_actions_async(actions)
response = spoken
# Generate speech
log.info("Generating speech...")
audio, tts_sr = self.models.speak(response)
# Resample to robot output rate if needed
if tts_sr != output_sr:
from scipy.signal import resample
audio = resample(audio, int(len(audio) * output_sr / tts_sr)).astype(np.float32)
# Start speaking animation in background
self._listening.clear()
anim_thread = threading.Thread(
target=self._speaking_animation, daemon=True
)
self._speaking.set()
anim_thread.start()
# Push audio to robot speaker in chunks
chunk_size = output_sr // 10 # 100ms chunks
for i in range(0, len(audio), chunk_size):
chunk = audio[i : i + chunk_size]
self.robot.media.push_audio_sample(chunk)
time.sleep(len(chunk) / output_sr * 0.85)
# Wait for audio to finish playing
time.sleep(0.5)
# Stop animation
self._speaking.clear()
anim_thread.join(timeout=2)
# Return to neutral
try:
pose = create_head_pose(roll=0, pitch=0, yaw=0, degrees=True, mm=False)
self.robot.goto_target(head=pose, antennas=[0, 0], duration=1.0)
except Exception:
pass
log.info("Ready.")
# ── Tool / action system ──────────────────────────────────────
TOOL_RE = re.compile(r"\[([a-zA-Z_][a-zA-Z0-9_]*)(?::([^\]]+))?\]")
KNOWN_TOOLS = {"nod", "shake", "wiggle", "dance", "photo", "emotion"}
def _parse_actions(self, text: str) -> tuple[str, list[tuple[str, Optional[str]]]]:
"""Extract [tool] and [emotion_name] tags. Returns (clean_text, [(tool, arg)])."""
actions = []
for m in self.TOOL_RE.finditer(text):
tag = m.group(1).lower()
arg = m.group(2).strip() if m.group(2) else None
if tag in self.KNOWN_TOOLS:
actions.append((tag, arg))
elif self._emotion_library and tag in (
n.lower() for n in self._emotion_library.moves.keys()
):
actions.append(("emotion", tag))
# else: unknown tag — still stripped from spoken text, not executed
clean = self.TOOL_RE.sub("", text).strip()
clean = re.sub(r"\s{2,}", " ", clean)
return clean, actions
def _execute_actions_async(self, actions: list[tuple[str, Optional[str]]]):
if not actions:
return
threading.Thread(
target=self._execute_actions, args=(actions,), daemon=True
).start()
def _execute_actions(self, actions):
for tool, arg in actions:
try:
log.info("Action: [%s%s]", tool, f":{arg}" if arg else "")
if tool == "nod":
self._gesture_nod()
elif tool == "shake":
self._gesture_shake()
elif tool == "wiggle":
self._gesture_wiggle()
elif tool == "photo":
self._save_photo()
elif tool == "dance":
self._play_random_dance()
elif tool == "emotion":
self._play_emotion(arg)
except Exception as e:
log.warning("Action [%s] failed: %s", tool, e)
def _gesture_nod(self):
down = create_head_pose(pitch=18, degrees=True, mm=False)
up = create_head_pose(pitch=-8, degrees=True, mm=False)
neutral = create_head_pose(pitch=0, degrees=True, mm=False)
for pose in (down, up, down, neutral):
self.robot.goto_target(head=pose, duration=0.6)
time.sleep(0.3)
def _gesture_shake(self):
left = create_head_pose(yaw=25, degrees=True, mm=False)
right = create_head_pose(yaw=-25, degrees=True, mm=False)
neutral = create_head_pose(yaw=0, degrees=True, mm=False)
for pose in (left, right, left, neutral):
self.robot.goto_target(head=pose, duration=0.6)
time.sleep(0.3)
def _gesture_wiggle(self):
for _ in range(3):
self.robot.goto_target(antennas=[0.5, -0.5], duration=0.5)
time.sleep(0.2)
self.robot.goto_target(antennas=[-0.5, 0.5], duration=0.5)
time.sleep(0.2)
self.robot.goto_target(antennas=[0, 0], duration=0.5)
def _save_photo(self):
if not self.use_camera:
log.info("Photo skipped — camera disabled")
return
frame = self.robot.media.get_frame()
if frame is None:
log.warning("Photo skipped — no frame")
return
import cv2
os.makedirs("photos", exist_ok=True)
ts = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
path = f"photos/reachy_{ts}.jpg"
cv2.imwrite(path, frame)
log.info("Photo saved: %s", path)
def _play_random_dance(self):
# Lazy-load recorded moves the first time
if not hasattr(self, "_recorded_moves"):
try:
from reachy_mini.motion.recorded_move import RecordedMoves
self._recorded_moves = RecordedMoves(
"pollen-robotics/reachy-mini-dances-library"
)
except Exception as e:
log.warning("Could not load dance library: %s", e)
self._recorded_moves = None
if not self._recorded_moves:
return
import random
names = list(self._recorded_moves.moves.keys())
if not names:
return
name = random.choice(names)
log.info("Dancing: %s", name)
self.robot.play_move(self._recorded_moves.get(name), initial_goto_duration=1.0)
def _play_emotion(self, name: Optional[str]):
if not self._emotion_library:
log.info("Emotion library not available")
return
available = list(self._emotion_library.moves.keys())
if not available:
return
if name:
key = name.strip().lower()
match = next((n for n in available if n.lower() == key), None)
if match is None:
match = next((n for n in available if key in n.lower()), None)
if match is None:
import random
log.warning("Unknown emotion '%s'. Picking random.", name)
match = random.choice(available)
else:
import random
match = random.choice(available)
log.info("Emotion: %s", match)
self.robot.play_move(
self._emotion_library.get(match),
initial_goto_duration=1.0,
sound=False,
)
def _breathing_animation(self):
"""Idle breathing: subtle z-axis bob + antenna sway (like official app).
Pauses while speaking or listening.
"""
t = 0.0
while not self._stop.is_set():
if self._speaking.is_set() or self._listening.is_set():
time.sleep(0.1)
continue
try:
z_mm = 5.0 * np.sin(t * 0.1 * 2 * np.pi) # 0.1 Hz, 5mm
ant_r = np.deg2rad(15.0 * np.sin(t * 0.5 * 2 * np.pi))
ant_l = np.deg2rad(-15.0 * np.sin(t * 0.5 * 2 * np.pi))
pose = create_head_pose(z=z_mm, degrees=True, mm=True)
self.robot.goto_target(
head=pose, antennas=[ant_r, ant_l], duration=1.0,
)
except Exception:
pass
time.sleep(0.5)
t += 0.5
def _speaking_animation(self):
"""Head movement while speaking — multi-frequency sway like official HeadWobbler."""
t = 0.0
while self._speaking.is_set():
try:
# Multiple frequencies for natural-looking movement
pitch = 4.0 * np.sin(t * 2.5) + 2.0 * np.sin(t * 1.1)
yaw = 3.0 * np.sin(t * 1.3) + 1.5 * np.sin(t * 0.7)
roll = 2.0 * np.sin(t * 1.8)
z_mm = 3.0 * np.sin(t * 2.0)
ant_l = np.deg2rad(20.0 * np.sin(t * 3.0))
ant_r = np.deg2rad(20.0 * np.sin(t * 3.0 + 0.5))
pose = create_head_pose(
z=z_mm, pitch=pitch, yaw=yaw, roll=roll,
degrees=True, mm=True,
)
self.robot.goto_target(
head=pose,
antennas=[ant_r, ant_l],
duration=1.0,
)
except Exception:
pass
time.sleep(0.3)
t += 0.3
def close(self):
self._stop.set()
self._speaking.clear()
# ──────────────────────────────────────────────────────────────
# Entry point
# ──────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(
description="Reachy Mini Local Conversation (MLX)"
)
parser.add_argument("--host", default="reachy-mini.local", help="Robot hostname/IP")
parser.add_argument("--port", type=int, default=8000)
parser.add_argument("--timeout", type=float, default=15.0)
parser.add_argument(
"--stt-model",
default="mlx-community/Qwen3-ASR-1.7B-8bit",
)
parser.add_argument(
"--vlm-model",
default="mlx-community/gemma-4-26b-a4b-it-bf16",
)
parser.add_argument(
"--tts-model",
default="mlx-community/Spark-TTS-0.5B-bf16",
)
parser.add_argument("--tts-voice", default="af_heart")
parser.add_argument(
"--voice-ref",
default=None,
help="Path to a reference WAV for voice cloning (Spark-TTS, ~5-15s clean audio)",
)
parser.add_argument(
"--voice-ref-text",
default=None,
help="Exact transcript of --voice-ref",
)
parser.add_argument("--no-camera", action="store_true", help="Audio-only mode")
parser.add_argument(
"--energy-threshold",
type=float,
default=0.06,
help="VAD sensitivity (lower = more sensitive)",
)
parser.add_argument("--debug", action="store_true")
args = parser.parse_args()
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
# Load models first (takes a few seconds, keeps them in memory)
models = Models(
stt_model=args.stt_model,
vlm_model=args.vlm_model,
tts_model=args.tts_model,
tts_voice=args.tts_voice,
use_vision=not args.no_camera,
voice_ref=args.voice_ref,
voice_ref_text=args.voice_ref_text,
)
# Connect to robot — uses SDK auto-detection for media backend
from reachy_mini import ReachyMini
log.info("Connecting to %s:%d ...", args.host, args.port)
with ReachyMini(
host=args.host,
port=args.port,
connection_mode="network",
timeout=args.timeout,
) as robot:
log.info("Connected!")
stream = LocalConversationStream(
robot=robot,
models=models,
use_camera=not args.no_camera,
)
stream.vad.energy_threshold = args.energy_threshold
try:
stream.launch()
finally:
stream.close()
robot.media.close()
robot.client.disconnect()
time.sleep(0.5)
log.info("Shutdown complete.")
if __name__ == "__main__":
main()