Local STT (Qwen3-ASR), VLM (Gemma 4 26B-A4B), and TTS (Spark-TTS) running on Apple Silicon via MLX, with bracket-tag action system for nod, shake, wiggle, dance, photo, and pre-recorded emotions.
780 lines
30 KiB
Python
780 lines
30 KiB
Python
"""
|
||
Reachy Mini Local Conversational AI — fully on Apple Silicon.
|
||
|
||
Replaces OpenAI Realtime with local MLX models while keeping
|
||
the same robot integration as pollen-robotics/reachy_mini_conversation_app:
|
||
- Audio I/O through the robot's SDK (mic + speaker via WebRTC/IPC)
|
||
- Camera frames through the robot's SDK
|
||
- Movement animations (head wobble, breathing, antenna reactions)
|
||
|
||
Pipeline:
|
||
Robot mic → Qwen3-ASR (STT) → Gemma 4 26B-A4B (VLM) → Spark TTS → Robot speaker
|
||
|
||
Usage:
|
||
python conversation.py
|
||
python conversation.py --no-camera # skip vision
|
||
python conversation.py --host 192.168.1.55 # explicit IP
|
||
"""
|
||
|
||
import argparse
|
||
import datetime
|
||
import logging
|
||
import os
|
||
import re
|
||
import sys
|
||
import threading
|
||
import time
|
||
from collections import deque
|
||
from typing import Optional
|
||
|
||
# Fix GStreamer plugin path on macOS with brew — must happen before any GStreamer import
|
||
if sys.platform == "darwin" and os.path.isdir("/opt/homebrew/lib/gstreamer-1.0"):
|
||
os.environ.setdefault("GST_PLUGIN_PATH", "/opt/homebrew/lib/gstreamer-1.0")
|
||
os.environ.setdefault("GST_PLUGIN_SYSTEM_PATH", "/opt/homebrew/lib/gstreamer-1.0")
|
||
os.environ.setdefault("GI_TYPELIB_PATH", "/opt/homebrew/lib/girepository-1.0")
|
||
|
||
import numpy as np
|
||
from numpy.typing import NDArray
|
||
from reachy_mini.utils import create_head_pose
|
||
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
|
||
datefmt="%H:%M:%S",
|
||
)
|
||
log = logging.getLogger("conversation")
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────
|
||
# Model manager — loads STT, VLM, TTS
|
||
# ──────────────────────────────────────────────────────────────
|
||
class Models:
|
||
def __init__(
|
||
self,
|
||
stt_model: str = "mlx-community/Qwen3-ASR-1.7B-8bit",
|
||
vlm_model: str = "mlx-community/gemma-4-26b-a4b-it-bf16",
|
||
tts_model: str = "mlx-community/Spark-TTS-0.5B-bf16",
|
||
tts_voice: str = "af_heart",
|
||
use_vision: bool = True,
|
||
voice_ref: Optional[str] = None,
|
||
voice_ref_text: Optional[str] = None,
|
||
):
|
||
self.use_vision = use_vision
|
||
self.tts_voice = tts_voice
|
||
self.voice_ref = voice_ref
|
||
self.voice_ref_text = voice_ref_text
|
||
self._lock = threading.Lock()
|
||
|
||
log.info("Loading STT: %s", stt_model)
|
||
from mlx_audio.stt import load as load_stt
|
||
self.stt = load_stt(stt_model)
|
||
log.info("STT ready.")
|
||
|
||
if use_vision:
|
||
log.info("Loading VLM: %s", vlm_model)
|
||
from mlx_vlm import load as load_vlm
|
||
self.vlm_model, self.vlm_processor = load_vlm(vlm_model)
|
||
log.info("VLM ready.")
|
||
else:
|
||
self.vlm_model = None
|
||
self.vlm_processor = None
|
||
|
||
log.info("Loading TTS: %s", tts_model)
|
||
from mlx_audio.tts import load as load_tts
|
||
self.tts = load_tts(tts_model)
|
||
log.info("TTS ready.")
|
||
log.info("All models loaded.")
|
||
|
||
def transcribe(self, audio: np.ndarray, sr: int = 16000) -> str:
|
||
if audio.ndim == 2:
|
||
audio = audio.mean(axis=1)
|
||
if sr != 16000:
|
||
from scipy.signal import resample
|
||
audio = resample(audio, int(len(audio) * 16000 / sr)).astype(np.float32)
|
||
with self._lock:
|
||
result = self.stt.generate(audio, language="en")
|
||
return result.text.strip()
|
||
|
||
def think(
|
||
self,
|
||
text: str,
|
||
image: Optional[np.ndarray] = None,
|
||
conversation_history: Optional[list] = None,
|
||
) -> str:
|
||
from mlx_vlm import generate as vlm_generate
|
||
from mlx_vlm.prompt_utils import apply_chat_template
|
||
|
||
if self.vlm_model is None:
|
||
return f"(no VLM — echo: {text})"
|
||
|
||
messages = []
|
||
if conversation_history:
|
||
for role, content in conversation_history:
|
||
messages.append({"role": role, "content": content})
|
||
messages.append(text)
|
||
|
||
num_images = 1 if image is not None else 0
|
||
prompt = apply_chat_template(
|
||
self.vlm_processor,
|
||
self.vlm_model.config,
|
||
messages,
|
||
num_images=num_images,
|
||
)
|
||
|
||
image_path = None
|
||
if image is not None:
|
||
import tempfile
|
||
import cv2
|
||
tmp = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False)
|
||
cv2.imwrite(tmp.name, image)
|
||
image_path = tmp.name
|
||
|
||
with self._lock:
|
||
response = vlm_generate(
|
||
model=self.vlm_model,
|
||
processor=self.vlm_processor,
|
||
prompt=prompt,
|
||
image=image_path,
|
||
max_tokens=200,
|
||
temperature=0.7,
|
||
repetition_penalty=1.2,
|
||
)
|
||
|
||
if image_path:
|
||
import os
|
||
os.unlink(image_path)
|
||
|
||
result = response.text if hasattr(response, "text") else str(response)
|
||
log.info(
|
||
"VLM: %d tokens @ %.1f tok/s",
|
||
getattr(response, "generation_tokens", 0),
|
||
getattr(response, "generation_tps", 0),
|
||
)
|
||
return result
|
||
|
||
def speak(self, text: str) -> tuple[np.ndarray, int]:
|
||
audio_chunks = []
|
||
sample_rate = 16000
|
||
# Build kwargs per-model: Kokoro takes lang_code, Spark-TTS supports voice cloning.
|
||
gen_kwargs: dict = {}
|
||
model_name = type(self.tts).__module__.lower()
|
||
if "kokoro" in model_name:
|
||
gen_kwargs["voice"] = self.tts_voice
|
||
gen_kwargs["lang_code"] = self.tts_voice[0] if self.tts_voice else "a"
|
||
elif "spark" in model_name:
|
||
pass # Spark-TTS uses default gender/pitch/speed
|
||
else:
|
||
gen_kwargs["voice"] = self.tts_voice
|
||
with self._lock:
|
||
for result in self.tts.generate(text, **gen_kwargs):
|
||
audio_np = np.array(result.audio).flatten()
|
||
if audio_np.size > 0:
|
||
audio_chunks.append(audio_np)
|
||
sample_rate = result.sample_rate
|
||
|
||
if not audio_chunks:
|
||
return np.zeros(1, dtype=np.float32), sample_rate
|
||
|
||
audio = np.concatenate(audio_chunks).astype(np.float32)
|
||
peak = np.abs(audio).max()
|
||
if peak > 0:
|
||
audio = audio / peak * 0.9
|
||
return audio, sample_rate
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────
|
||
# VAD — energy-based voice activity detection
|
||
# ──────────────────────────────────────────────────────────────
|
||
class VAD:
|
||
def __init__(
|
||
self,
|
||
energy_threshold: float = 0.06,
|
||
silence_duration: float = 1.2,
|
||
min_speech_duration: float = 0.3,
|
||
):
|
||
self.energy_threshold = energy_threshold
|
||
self.silence_duration = silence_duration
|
||
self.min_speech_duration = min_speech_duration
|
||
self._is_speaking = False
|
||
self._silence_start: Optional[float] = None
|
||
self._speech_start: Optional[float] = None
|
||
self._buffer: list[np.ndarray] = []
|
||
|
||
def feed(self, audio_chunk: np.ndarray) -> Optional[np.ndarray]:
|
||
if audio_chunk is None or len(audio_chunk) == 0:
|
||
return None
|
||
mono = audio_chunk.mean(axis=1) if audio_chunk.ndim == 2 else audio_chunk
|
||
energy = np.sqrt(np.mean(mono**2))
|
||
now = time.time()
|
||
|
||
if energy > self.energy_threshold:
|
||
if not self._is_speaking:
|
||
self._is_speaking = True
|
||
self._speech_start = now
|
||
self._buffer = []
|
||
self._silence_start = None
|
||
self._buffer.append(mono)
|
||
elif self._is_speaking:
|
||
self._buffer.append(mono)
|
||
if self._silence_start is None:
|
||
self._silence_start = now
|
||
elif now - self._silence_start > self.silence_duration:
|
||
self._is_speaking = False
|
||
duration = now - (self._speech_start or now)
|
||
if duration >= self.min_speech_duration and self._buffer:
|
||
utterance = np.concatenate(self._buffer)
|
||
self._buffer = []
|
||
log.info("Utterance: %.1fs", duration)
|
||
return utterance
|
||
self._buffer = []
|
||
return None
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────
|
||
# LocalStream — same pattern as the official conversation app
|
||
# ──────────────────────────────────────────────────────────────
|
||
SYSTEM_PROMPT = """## IDENTITY
|
||
You are Reachy Mini: a friendly, compact tabletop robot assistant with a calm voice and a subtle sense of humor.
|
||
Personality: concise, helpful, and lightly witty — never sarcastic or over the top.
|
||
You speak English by default and switch languages only if the speaker does.
|
||
|
||
## CRITICAL RESPONSE RULES
|
||
Respond in 1–2 sentences maximum.
|
||
Be helpful first, then add a small touch of humor if it fits naturally.
|
||
Avoid long explanations or filler words.
|
||
Keep responses under 25 words when possible.
|
||
|
||
## CORE TRAITS
|
||
Warm, efficient, and approachable.
|
||
Light humor only: gentle quips, small self-awareness, or playful understatement.
|
||
No sarcasm, no teasing.
|
||
If unsure, admit it briefly and offer help ("Not sure yet, but I can check!").
|
||
|
||
## HOUSEHOLD CONTEXT
|
||
You live with a family. EDIT THIS SECTION (SYSTEM_PROMPT in conversation.py) to personalize: who lives in the house, where, ages of kids, pets, shared interests, things to remember.
|
||
You cannot recognize voices or faces — you don't know who is talking. Address the speaker as "you"; only use a name if they introduce themselves this turn.
|
||
|
||
## RESPONSE EXAMPLES
|
||
User: "How's the weather?"
|
||
Good: "Looks calm outside — unlike my Wi-Fi signal today."
|
||
Bad: "Sunny with leftover pizza vibes!"
|
||
|
||
User: "Can you help me fix this?"
|
||
Good: "Of course. Describe the issue, and I'll try not to make it worse."
|
||
|
||
## BEHAVIOR RULES
|
||
Be helpful, clear, and respectful.
|
||
Use humor sparingly — clarity comes first.
|
||
Admit mistakes briefly and correct them.
|
||
No markdown, lists, emojis, or stage directions — replies are spoken aloud.
|
||
Do not comment on the speaker's appearance, mood, or state; you cannot see them.
|
||
A camera image may be attached — only describe what you see if asked.
|
||
|
||
## TOOL & MOVEMENT RULES
|
||
Embed bracketed tags in your reply to trigger physical actions. Tags are stripped before speech — don't narrate the action, just do it.
|
||
- [nod] — nod yes
|
||
- [shake] — shake head no
|
||
- [wiggle] — wiggle antennas
|
||
- [dance] — play a random dance
|
||
- [photo] — save a camera snapshot
|
||
- [emotion:NAME] — play a pre-recorded emotion. DO NOT use unless the speaker explicitly asks you to show an emotion ("look happy", "be surprised", "can you look sad"). Never use emotions for normal replies, greetings, introductions, or acknowledgements. Default: no emotion tag.
|
||
{emotion_list}
|
||
|
||
## FINAL REMINDER
|
||
Short, clear, a little human, multilingual.
|
||
One quick helpful answer + one small wink of humor = perfect response."""
|
||
|
||
|
||
class LocalConversationStream:
|
||
"""Bidirectional audio stream using the robot's mic and speaker.
|
||
|
||
Mirrors the architecture of reachy_mini_conversation_app's LocalStream:
|
||
- record_loop: reads mic samples from robot SDK
|
||
- process_loop: VAD → STT → VLM → TTS
|
||
- play audio back through robot speaker
|
||
"""
|
||
|
||
# Phrases that trigger a camera look
|
||
LOOK_TRIGGERS = (
|
||
"see", "look", "watch", "show", "camera", "photo", "picture",
|
||
"what is this", "what's this", "what is that", "what's that",
|
||
"who is", "who's", "read", "color", "colour", "wear", "holding",
|
||
"in front", "around you", "your eyes", "describe",
|
||
)
|
||
|
||
# User words that gate the emotion tool — if none present, emotion tags are dropped
|
||
EMOTION_TRIGGERS = (
|
||
"happy", "sad", "angry", "excited", "surprised", "curious", "confused",
|
||
"scared", "tired", "bored", "love", "emotion", "mood", "feel", "feeling",
|
||
"look ", "act ", "be ", "show me",
|
||
)
|
||
|
||
def __init__(self, robot, models: Models, use_camera: bool = True):
|
||
from reachy_mini import ReachyMini
|
||
self.robot: ReachyMini = robot
|
||
self.models = models
|
||
self.use_camera = use_camera
|
||
|
||
self.vad = VAD()
|
||
self.conversation_history: deque = deque(maxlen=10)
|
||
self._stop = threading.Event()
|
||
self._speaking = threading.Event()
|
||
self._listening = threading.Event() # set while user is speaking
|
||
|
||
# Preload emotion library so prompt can list available emotions
|
||
self._emotion_library = None
|
||
emotion_list_str = ""
|
||
try:
|
||
from reachy_mini.motion.recorded_move import RecordedMoves
|
||
self._emotion_library = RecordedMoves(
|
||
"pollen-robotics/reachy-mini-emotions-library"
|
||
)
|
||
names = list(self._emotion_library.moves.keys())
|
||
if names:
|
||
emotion_list_str = "Available emotions: " + ", ".join(names)
|
||
log.info("Loaded %d emotions", len(names))
|
||
except Exception as e:
|
||
log.warning("Emotion library not available: %s", e)
|
||
|
||
self._system_prompt = SYSTEM_PROMPT.format(emotion_list=emotion_list_str)
|
||
|
||
def launch(self):
|
||
"""Start recording/playback and run the conversation loop."""
|
||
log.info("Starting media pipelines...")
|
||
self.robot.media.start_recording()
|
||
self.robot.media.start_playing()
|
||
time.sleep(1) # let pipelines warm up
|
||
|
||
input_sr = self.robot.media.get_input_audio_samplerate()
|
||
output_sr = self.robot.media.get_output_audio_samplerate()
|
||
log.info("Audio: input=%d Hz, output=%d Hz", input_sr, output_sr)
|
||
|
||
# Wake-up animation
|
||
self.robot.goto_target(antennas=[0.3, -0.3], duration=1.0)
|
||
time.sleep(0.5)
|
||
self.robot.goto_target(antennas=[0, 0], duration=1.0)
|
||
|
||
# Start breathing animation in background
|
||
self._breath_thread = threading.Thread(
|
||
target=self._breathing_animation, daemon=True
|
||
)
|
||
self._breath_thread.start()
|
||
|
||
print("\n--- Reachy Mini is listening! Speak to start a conversation. ---\n")
|
||
|
||
try:
|
||
self._conversation_loop(input_sr, output_sr)
|
||
except KeyboardInterrupt:
|
||
print("\n\nConversation ended.")
|
||
finally:
|
||
self.robot.media.stop_recording()
|
||
self.robot.media.stop_playing()
|
||
|
||
def _conversation_loop(self, input_sr: int, output_sr: int):
|
||
"""Main loop: poll mic → VAD → STT → VLM → TTS → speaker."""
|
||
while not self._stop.is_set():
|
||
# Read audio from robot mic
|
||
sample = self.robot.media.get_audio_sample()
|
||
if sample is None:
|
||
time.sleep(0.01)
|
||
continue
|
||
|
||
# Feed to VAD
|
||
utterance = self.vad.feed(sample)
|
||
if utterance is None:
|
||
continue
|
||
|
||
# ── Speech detected ──
|
||
self._listening.set()
|
||
|
||
# Antenna reaction — perk up
|
||
try:
|
||
self.robot.goto_target(antennas=[0.3, 0.3], duration=1.0)
|
||
except Exception:
|
||
pass
|
||
|
||
# Transcribe
|
||
log.info("Transcribing...")
|
||
text = self.models.transcribe(utterance, sr=input_sr)
|
||
stripped = (text or "").strip().rstrip(".!?,").lower()
|
||
NOISE_WORDS = {
|
||
"", "oh", "ah", "uh", "um", "mm", "hm", "hmm", "mhm",
|
||
"huh", "eh", "err", "erm", "ow", "ugh",
|
||
}
|
||
# Detect STT hallucination (runaway repetition like "oh, oh, oh, ..." x1000)
|
||
words = [w.strip(".,!?") for w in stripped.split() if w.strip(".,!?")]
|
||
unique_ratio = (len(set(words)) / len(words)) if words else 1.0
|
||
is_repetitive = len(words) > 8 and unique_ratio < 0.2
|
||
if stripped in NOISE_WORDS or len(stripped) < 2 or is_repetitive:
|
||
log.info(
|
||
"Noise/hallucinated transcription (%d words, %.0f%% unique), skipping",
|
||
len(words), unique_ratio * 100,
|
||
)
|
||
self._listening.clear()
|
||
try:
|
||
self.robot.goto_target(antennas=[0, 0], duration=1.0)
|
||
except Exception:
|
||
pass
|
||
continue
|
||
print(f"\n You: {text}")
|
||
|
||
# Camera: only capture + pass an image when the user explicitly asks to be seen.
|
||
# Passing a stale ambient frame every turn causes the VLM to hallucinate
|
||
# visual observations ("you look comfy") when the user hasn't asked.
|
||
frame = None
|
||
if self.use_camera:
|
||
text_lower = text.lower()
|
||
if any(trigger in text_lower for trigger in self.LOOK_TRIGGERS):
|
||
frame = self.robot.media.get_frame()
|
||
if frame is not None:
|
||
log.info("Look triggered — fresh frame: %s", frame.shape)
|
||
|
||
# Build history
|
||
history = [("system", self._system_prompt)] + list(self.conversation_history)
|
||
|
||
# Generate response
|
||
log.info("Thinking...")
|
||
response = self.models.think(text, image=frame, conversation_history=history)
|
||
response = response.strip()
|
||
if not response:
|
||
response = "Hmm, I'm not sure what to say about that."
|
||
|
||
# Extract action tags, keep spoken text clean
|
||
spoken, actions = self._parse_actions(response)
|
||
if not spoken:
|
||
spoken = "Okay."
|
||
|
||
# Gate: only allow emotion tags when the user explicitly asks for one
|
||
text_lower = text.lower()
|
||
if not any(w in text_lower for w in self.EMOTION_TRIGGERS):
|
||
actions = [(t, a) for (t, a) in actions if t != "emotion"]
|
||
|
||
print(f" Reachy: {response}")
|
||
if actions:
|
||
print(f" Actions: {actions}")
|
||
|
||
# Update history with the ORIGINAL response so the model remembers its own tag usage
|
||
self.conversation_history.append(("user", text))
|
||
self.conversation_history.append(("assistant", response))
|
||
|
||
# Kick off actions in parallel with TTS
|
||
self._execute_actions_async(actions)
|
||
response = spoken
|
||
|
||
# Generate speech
|
||
log.info("Generating speech...")
|
||
audio, tts_sr = self.models.speak(response)
|
||
|
||
# Resample to robot output rate if needed
|
||
if tts_sr != output_sr:
|
||
from scipy.signal import resample
|
||
audio = resample(audio, int(len(audio) * output_sr / tts_sr)).astype(np.float32)
|
||
|
||
# Start speaking animation in background
|
||
self._listening.clear()
|
||
anim_thread = threading.Thread(
|
||
target=self._speaking_animation, daemon=True
|
||
)
|
||
self._speaking.set()
|
||
anim_thread.start()
|
||
|
||
# Push audio to robot speaker in chunks
|
||
chunk_size = output_sr // 10 # 100ms chunks
|
||
for i in range(0, len(audio), chunk_size):
|
||
chunk = audio[i : i + chunk_size]
|
||
self.robot.media.push_audio_sample(chunk)
|
||
time.sleep(len(chunk) / output_sr * 0.85)
|
||
|
||
# Wait for audio to finish playing
|
||
time.sleep(0.5)
|
||
|
||
# Stop animation
|
||
self._speaking.clear()
|
||
anim_thread.join(timeout=2)
|
||
|
||
# Return to neutral
|
||
try:
|
||
pose = create_head_pose(roll=0, pitch=0, yaw=0, degrees=True, mm=False)
|
||
self.robot.goto_target(head=pose, antennas=[0, 0], duration=1.0)
|
||
except Exception:
|
||
pass
|
||
|
||
log.info("Ready.")
|
||
|
||
# ── Tool / action system ──────────────────────────────────────
|
||
TOOL_RE = re.compile(r"\[([a-zA-Z_][a-zA-Z0-9_]*)(?::([^\]]+))?\]")
|
||
KNOWN_TOOLS = {"nod", "shake", "wiggle", "dance", "photo", "emotion"}
|
||
|
||
def _parse_actions(self, text: str) -> tuple[str, list[tuple[str, Optional[str]]]]:
|
||
"""Extract [tool] and [emotion_name] tags. Returns (clean_text, [(tool, arg)])."""
|
||
actions = []
|
||
for m in self.TOOL_RE.finditer(text):
|
||
tag = m.group(1).lower()
|
||
arg = m.group(2).strip() if m.group(2) else None
|
||
if tag in self.KNOWN_TOOLS:
|
||
actions.append((tag, arg))
|
||
elif self._emotion_library and tag in (
|
||
n.lower() for n in self._emotion_library.moves.keys()
|
||
):
|
||
actions.append(("emotion", tag))
|
||
# else: unknown tag — still stripped from spoken text, not executed
|
||
clean = self.TOOL_RE.sub("", text).strip()
|
||
clean = re.sub(r"\s{2,}", " ", clean)
|
||
return clean, actions
|
||
|
||
def _execute_actions_async(self, actions: list[tuple[str, Optional[str]]]):
|
||
if not actions:
|
||
return
|
||
threading.Thread(
|
||
target=self._execute_actions, args=(actions,), daemon=True
|
||
).start()
|
||
|
||
def _execute_actions(self, actions):
|
||
for tool, arg in actions:
|
||
try:
|
||
log.info("Action: [%s%s]", tool, f":{arg}" if arg else "")
|
||
if tool == "nod":
|
||
self._gesture_nod()
|
||
elif tool == "shake":
|
||
self._gesture_shake()
|
||
elif tool == "wiggle":
|
||
self._gesture_wiggle()
|
||
elif tool == "photo":
|
||
self._save_photo()
|
||
elif tool == "dance":
|
||
self._play_random_dance()
|
||
elif tool == "emotion":
|
||
self._play_emotion(arg)
|
||
except Exception as e:
|
||
log.warning("Action [%s] failed: %s", tool, e)
|
||
|
||
def _gesture_nod(self):
|
||
down = create_head_pose(pitch=18, degrees=True, mm=False)
|
||
up = create_head_pose(pitch=-8, degrees=True, mm=False)
|
||
neutral = create_head_pose(pitch=0, degrees=True, mm=False)
|
||
for pose in (down, up, down, neutral):
|
||
self.robot.goto_target(head=pose, duration=0.6)
|
||
time.sleep(0.3)
|
||
|
||
def _gesture_shake(self):
|
||
left = create_head_pose(yaw=25, degrees=True, mm=False)
|
||
right = create_head_pose(yaw=-25, degrees=True, mm=False)
|
||
neutral = create_head_pose(yaw=0, degrees=True, mm=False)
|
||
for pose in (left, right, left, neutral):
|
||
self.robot.goto_target(head=pose, duration=0.6)
|
||
time.sleep(0.3)
|
||
|
||
def _gesture_wiggle(self):
|
||
for _ in range(3):
|
||
self.robot.goto_target(antennas=[0.5, -0.5], duration=0.5)
|
||
time.sleep(0.2)
|
||
self.robot.goto_target(antennas=[-0.5, 0.5], duration=0.5)
|
||
time.sleep(0.2)
|
||
self.robot.goto_target(antennas=[0, 0], duration=0.5)
|
||
|
||
def _save_photo(self):
|
||
if not self.use_camera:
|
||
log.info("Photo skipped — camera disabled")
|
||
return
|
||
frame = self.robot.media.get_frame()
|
||
if frame is None:
|
||
log.warning("Photo skipped — no frame")
|
||
return
|
||
import cv2
|
||
os.makedirs("photos", exist_ok=True)
|
||
ts = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
|
||
path = f"photos/reachy_{ts}.jpg"
|
||
cv2.imwrite(path, frame)
|
||
log.info("Photo saved: %s", path)
|
||
|
||
def _play_random_dance(self):
|
||
# Lazy-load recorded moves the first time
|
||
if not hasattr(self, "_recorded_moves"):
|
||
try:
|
||
from reachy_mini.motion.recorded_move import RecordedMoves
|
||
self._recorded_moves = RecordedMoves(
|
||
"pollen-robotics/reachy-mini-dances-library"
|
||
)
|
||
except Exception as e:
|
||
log.warning("Could not load dance library: %s", e)
|
||
self._recorded_moves = None
|
||
if not self._recorded_moves:
|
||
return
|
||
import random
|
||
names = list(self._recorded_moves.moves.keys())
|
||
if not names:
|
||
return
|
||
name = random.choice(names)
|
||
log.info("Dancing: %s", name)
|
||
self.robot.play_move(self._recorded_moves.get(name), initial_goto_duration=1.0)
|
||
|
||
def _play_emotion(self, name: Optional[str]):
|
||
if not self._emotion_library:
|
||
log.info("Emotion library not available")
|
||
return
|
||
available = list(self._emotion_library.moves.keys())
|
||
if not available:
|
||
return
|
||
if name:
|
||
key = name.strip().lower()
|
||
match = next((n for n in available if n.lower() == key), None)
|
||
if match is None:
|
||
match = next((n for n in available if key in n.lower()), None)
|
||
if match is None:
|
||
import random
|
||
log.warning("Unknown emotion '%s'. Picking random.", name)
|
||
match = random.choice(available)
|
||
else:
|
||
import random
|
||
match = random.choice(available)
|
||
log.info("Emotion: %s", match)
|
||
self.robot.play_move(
|
||
self._emotion_library.get(match),
|
||
initial_goto_duration=1.0,
|
||
sound=False,
|
||
)
|
||
|
||
def _breathing_animation(self):
|
||
"""Idle breathing: subtle z-axis bob + antenna sway (like official app).
|
||
|
||
Pauses while speaking or listening.
|
||
"""
|
||
t = 0.0
|
||
while not self._stop.is_set():
|
||
if self._speaking.is_set() or self._listening.is_set():
|
||
time.sleep(0.1)
|
||
continue
|
||
try:
|
||
z_mm = 5.0 * np.sin(t * 0.1 * 2 * np.pi) # 0.1 Hz, 5mm
|
||
ant_r = np.deg2rad(15.0 * np.sin(t * 0.5 * 2 * np.pi))
|
||
ant_l = np.deg2rad(-15.0 * np.sin(t * 0.5 * 2 * np.pi))
|
||
pose = create_head_pose(z=z_mm, degrees=True, mm=True)
|
||
self.robot.goto_target(
|
||
head=pose, antennas=[ant_r, ant_l], duration=1.0,
|
||
)
|
||
except Exception:
|
||
pass
|
||
time.sleep(0.5)
|
||
t += 0.5
|
||
|
||
def _speaking_animation(self):
|
||
"""Head movement while speaking — multi-frequency sway like official HeadWobbler."""
|
||
t = 0.0
|
||
while self._speaking.is_set():
|
||
try:
|
||
# Multiple frequencies for natural-looking movement
|
||
pitch = 4.0 * np.sin(t * 2.5) + 2.0 * np.sin(t * 1.1)
|
||
yaw = 3.0 * np.sin(t * 1.3) + 1.5 * np.sin(t * 0.7)
|
||
roll = 2.0 * np.sin(t * 1.8)
|
||
z_mm = 3.0 * np.sin(t * 2.0)
|
||
ant_l = np.deg2rad(20.0 * np.sin(t * 3.0))
|
||
ant_r = np.deg2rad(20.0 * np.sin(t * 3.0 + 0.5))
|
||
pose = create_head_pose(
|
||
z=z_mm, pitch=pitch, yaw=yaw, roll=roll,
|
||
degrees=True, mm=True,
|
||
)
|
||
self.robot.goto_target(
|
||
head=pose,
|
||
antennas=[ant_r, ant_l],
|
||
duration=1.0,
|
||
)
|
||
except Exception:
|
||
pass
|
||
time.sleep(0.3)
|
||
t += 0.3
|
||
|
||
def close(self):
|
||
self._stop.set()
|
||
self._speaking.clear()
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────
|
||
# Entry point
|
||
# ──────────────────────────────────────────────────────────────
|
||
def main():
|
||
parser = argparse.ArgumentParser(
|
||
description="Reachy Mini Local Conversation (MLX)"
|
||
)
|
||
parser.add_argument("--host", default="reachy-mini.local", help="Robot hostname/IP")
|
||
parser.add_argument("--port", type=int, default=8000)
|
||
parser.add_argument("--timeout", type=float, default=15.0)
|
||
parser.add_argument(
|
||
"--stt-model",
|
||
default="mlx-community/Qwen3-ASR-1.7B-8bit",
|
||
)
|
||
parser.add_argument(
|
||
"--vlm-model",
|
||
default="mlx-community/gemma-4-26b-a4b-it-bf16",
|
||
)
|
||
parser.add_argument(
|
||
"--tts-model",
|
||
default="mlx-community/Spark-TTS-0.5B-bf16",
|
||
)
|
||
parser.add_argument("--tts-voice", default="af_heart")
|
||
parser.add_argument(
|
||
"--voice-ref",
|
||
default=None,
|
||
help="Path to a reference WAV for voice cloning (Spark-TTS, ~5-15s clean audio)",
|
||
)
|
||
parser.add_argument(
|
||
"--voice-ref-text",
|
||
default=None,
|
||
help="Exact transcript of --voice-ref",
|
||
)
|
||
parser.add_argument("--no-camera", action="store_true", help="Audio-only mode")
|
||
parser.add_argument(
|
||
"--energy-threshold",
|
||
type=float,
|
||
default=0.06,
|
||
help="VAD sensitivity (lower = more sensitive)",
|
||
)
|
||
parser.add_argument("--debug", action="store_true")
|
||
args = parser.parse_args()
|
||
|
||
if args.debug:
|
||
logging.getLogger().setLevel(logging.DEBUG)
|
||
|
||
# Load models first (takes a few seconds, keeps them in memory)
|
||
models = Models(
|
||
stt_model=args.stt_model,
|
||
vlm_model=args.vlm_model,
|
||
tts_model=args.tts_model,
|
||
tts_voice=args.tts_voice,
|
||
use_vision=not args.no_camera,
|
||
voice_ref=args.voice_ref,
|
||
voice_ref_text=args.voice_ref_text,
|
||
)
|
||
|
||
# Connect to robot — uses SDK auto-detection for media backend
|
||
from reachy_mini import ReachyMini
|
||
|
||
log.info("Connecting to %s:%d ...", args.host, args.port)
|
||
|
||
with ReachyMini(
|
||
host=args.host,
|
||
port=args.port,
|
||
connection_mode="network",
|
||
timeout=args.timeout,
|
||
) as robot:
|
||
log.info("Connected!")
|
||
|
||
stream = LocalConversationStream(
|
||
robot=robot,
|
||
models=models,
|
||
use_camera=not args.no_camera,
|
||
)
|
||
stream.vad.energy_threshold = args.energy_threshold
|
||
|
||
try:
|
||
stream.launch()
|
||
finally:
|
||
stream.close()
|
||
robot.media.close()
|
||
robot.client.disconnect()
|
||
time.sleep(0.5)
|
||
log.info("Shutdown complete.")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|