Add fully local conversational AI pipeline for Reachy Mini

Local STT (Qwen3-ASR), VLM (Gemma 4 26B-A4B), and TTS (Spark-TTS) running
on Apple Silicon via MLX, with bracket-tag action system for nod, shake,
wiggle, dance, photo, and pre-recorded emotions.
This commit is contained in:
Norbert Schmidt
2026-05-12 09:24:02 +02:00
parent 3a8a8e3145
commit 5a04a7133a
12 changed files with 4074 additions and 0 deletions

29
.gitignore vendored Normal file
View File

@@ -0,0 +1,29 @@
# Python
__pycache__/
*.py[cod]
*$py.class
.venv/
venv/
*.egg-info/
# macOS
.DS_Store
# Editor
.vscode/
.idea/
*.swp
# Project-local — user data and generated artifacts
.claude/
CLAUDE.md
photos/
*.wav
*.aiff
*.jpg
*.png
*.mp3
# Logs / scratch
*.log
/tmp/

120
README.md Normal file
View File

@@ -0,0 +1,120 @@
# reachy-mlx-vlm
Fully local conversational AI for the [Reachy Mini](https://www.pollen-robotics.com/reachy-mini) robot — every model runs on your Mac using [MLX](https://github.com/ml-explore/mlx). No cloud APIs, no realtime websockets, no usage tiers.
The robot's microphone, speaker and camera stream over WebRTC/IPC to the Mac, where the pipeline runs end-to-end on Apple Silicon, then sends synthesized audio back to the robot.
```
┌──────────────┐ mic ┌─────────────────────────────────────────────┐ speaker ┌──────────────┐
│ Reachy Mini │ ─────────▶ │ Mac (Apple Silicon) │ ──────────▶ │ Reachy Mini │
│ │ │ │ │ │
│ mic + cam │ camera │ Qwen3-ASR → Gemma 4 VLM → Spark-TTS │ │ speaker │
│ │ ─────────▶ │ (STT) (think) (voice) │ │ │
│ │ │ │ │ │
└──────────────┘ └─────────────────────────────────────────────┘ └──────────────┘
▲ │
└───────────────── head pose / antennas / dance / emotion / photo ───────────────────────┘
```
## What it does
- **Listens** via the robot's microphone, voice-activity detected with energy-based VAD + hallucination filtering.
- **Transcribes** with Qwen3-ASR-1.7B (8-bit MLX).
- **Thinks** with Gemma 4 26B-A4B (`mlx-community/gemma-4-26b-a4b-it-bf16`) — vision-capable, so it can describe what it sees on request.
- **Speaks** with Spark-TTS-0.5B-bf16 streamed back through the robot's speaker.
- **Moves** while talking — a multi-frequency head-wobble during speech, subtle breathing animation while idle.
- **Acts** on bracketed tags emitted by the model: `[nod]`, `[shake]`, `[wiggle]`, `[dance]`, `[photo]`, `[emotion:NAME]`. The tags are stripped from speech and executed in parallel with TTS.
## Requirements
- **Hardware**: Apple Silicon Mac (M1+) with ~32 GB RAM, Reachy Mini Wireless on the same Wi-Fi.
- **Software**: macOS, Python 3.12+, [uv](https://github.com/astral-sh/uv), GStreamer (`brew install gstreamer`), `sshpass` (for the helper scripts: `brew install sshpass`).
- **Disk**: ~30 GB for the model weights downloaded from Hugging Face on first run.
## Setup
```bash
git clone https://github.com/Pocket-science/reachy-mlx-vlm.git
cd reachy-mlx-vlm
uv sync
```
That's it. The first time you run `./run.sh`, MLX will pull the Qwen3-ASR, Gemma 4 VLM, Spark-TTS, and `reachy-mini-emotions-library` repos from Hugging Face (one-off, then cached).
> **Note on `mlx-vlm` versions.** The `pyproject.toml` lock currently resolves to `mlx-vlm 0.3.9` because `reachy-mini 1.2.3` pins `transformers` below the floor required by `mlx-vlm >= 0.5.0`. The codebase works fine on 0.3.90.4.4. To run 0.5.0 on the same venv:
>
> ```bash
> VIRTUAL_ENV=.venv uv pip install --no-deps mlx==0.31.2 mlx-metal==0.31.2 mlx-vlm==0.5.0
> ```
>
> This sidesteps the dependency tree — your Mac never actually runs `transformers` to drive the robot, so the constraint isn't load-bearing.
## Run
```bash
./run.sh # default — vision on, talks to reachy-mini.local
./run.sh --no-camera # audio-only mode
./run.sh --host 192.168.1.55 # explicit robot IP
./run.sh --energy-threshold 0.04 # more sensitive VAD
./run.sh --debug # verbose logs
```
The robot's daemon must be reachable at `http://reachy-mini.local:8000`. The wireless model ships with `--no-autostart`, so click the wake-up button in the dashboard once before launching, or hit:
```bash
curl -X POST "http://reachy-mini.local:8000/api/daemon/start?wake_up=true"
```
## Tool / action system
The model can emit bracketed tags anywhere in its reply. They are parsed out, executed asynchronously, and stripped from the spoken text.
| Tag | What it does |
| ------------------ | --------------------------------------------------------------------- |
| `[nod]` | Three-pose pitch nod. |
| `[shake]` | Three-pose yaw shake. |
| `[wiggle]` | Three antenna oscillations. |
| `[dance]` | Plays a random move from `pollen-robotics/reachy-mini-dances-library`. |
| `[photo]` | Saves a JPEG snapshot from the camera to `photos/`. |
| `[emotion:NAME]` | Plays a named pre-recorded emotion (silent — sound track muted). |
The emotion library is listed in the system prompt at startup so the model can pick by name. To avoid overuse, the runtime drops `[emotion:...]` tags whenever the user's utterance does not include an emotion trigger word (`happy`, `sad`, `look ...`, etc.). Configure the trigger list in `LocalConversationStream.EMOTION_TRIGGERS`.
## Customizing the personality
The system prompt lives at the top of `conversation.py` (`SYSTEM_PROMPT`). It follows the same structure as Pollen Robotics' official prompt (identity / response rules / tools) and currently bakes in a household context (names, location). Edit those lines to match your environment — or wire it to read from a config file.
## Helper scripts
| Script | Purpose |
| ------------------------------------- | ------------------------------------------------------------------ |
| `run.sh "..."` | Launch the conversation loop. |
| `speak.sh "Hello"` | One-shot TTS — generate on the Mac, play through the robot. |
| `take_photo.sh out.jpg` | Grab a single frame from the robot's camera over SSH. |
| `record_voice.sh 12 voice_ref.wav` | Capture 12 s of audio through the robot's microphone. |
| `test_reachy.py` | Minimal SDK smoke test — wiggles the antennas. |
| `look_at_click.py` | Click the camera feed to make the robot look at a pixel. |
The helper scripts assume the default `pollen` / `root` SSH credentials shipped with ReachyMiniOS.
## How this differs from the official conversation app
[`pollen-robotics/reachy_mini_conversation_app`](https://github.com/pollen-robotics/reachy_mini_conversation_app) uses the OpenAI Realtime API for STT + LLM + TTS in a single websocket session, with a full function-calling tool system (each tool is a class with a JSON schema, dispatched async with cancellation).
This project trades that off:
- **Pros**: zero cloud cost, fully offline, your voice never leaves the Mac, no rate limits.
- **Cons**: ~36 s round-trip on a base M-series chip (the 26B VLM is the bottleneck). Tools are bracket-tag strings parsed from plain text, not strict JSON function calls, because MLX's local VLM stack doesn't expose native tool calling.
If you want real-time turn-taking, use the official app. If you want a robot that thinks entirely on your laptop, this is for you.
## Acknowledgements
- [Pollen Robotics](https://www.pollen-robotics.com/) for the Reachy Mini SDK and the conversation-app architecture this riffs on.
- [`mlx-audio`](https://github.com/Blaizzy/mlx-audio) for the STT / TTS adapters.
- [`mlx-vlm`](https://github.com/Blaizzy/mlx-vlm) for the Gemma 4 VLM runtime.
- The [`mlx`](https://github.com/ml-explore/mlx) project at Apple ML Research.
## License
MIT — see [LICENSE](LICENSE).

779
conversation.py Normal file
View File

@@ -0,0 +1,779 @@
"""
Reachy Mini Local Conversational AI — fully on Apple Silicon.
Replaces OpenAI Realtime with local MLX models while keeping
the same robot integration as pollen-robotics/reachy_mini_conversation_app:
- Audio I/O through the robot's SDK (mic + speaker via WebRTC/IPC)
- Camera frames through the robot's SDK
- Movement animations (head wobble, breathing, antenna reactions)
Pipeline:
Robot mic → Qwen3-ASR (STT) → Gemma 4 26B-A4B (VLM) → Spark TTS → Robot speaker
Usage:
python conversation.py
python conversation.py --no-camera # skip vision
python conversation.py --host 192.168.1.55 # explicit IP
"""
import argparse
import datetime
import logging
import os
import re
import sys
import threading
import time
from collections import deque
from typing import Optional
# Fix GStreamer plugin path on macOS with brew — must happen before any GStreamer import
if sys.platform == "darwin" and os.path.isdir("/opt/homebrew/lib/gstreamer-1.0"):
os.environ.setdefault("GST_PLUGIN_PATH", "/opt/homebrew/lib/gstreamer-1.0")
os.environ.setdefault("GST_PLUGIN_SYSTEM_PATH", "/opt/homebrew/lib/gstreamer-1.0")
os.environ.setdefault("GI_TYPELIB_PATH", "/opt/homebrew/lib/girepository-1.0")
import numpy as np
from numpy.typing import NDArray
from reachy_mini.utils import create_head_pose
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
datefmt="%H:%M:%S",
)
log = logging.getLogger("conversation")
# ──────────────────────────────────────────────────────────────
# Model manager — loads STT, VLM, TTS
# ──────────────────────────────────────────────────────────────
class Models:
def __init__(
self,
stt_model: str = "mlx-community/Qwen3-ASR-1.7B-8bit",
vlm_model: str = "mlx-community/gemma-4-26b-a4b-it-bf16",
tts_model: str = "mlx-community/Spark-TTS-0.5B-bf16",
tts_voice: str = "af_heart",
use_vision: bool = True,
voice_ref: Optional[str] = None,
voice_ref_text: Optional[str] = None,
):
self.use_vision = use_vision
self.tts_voice = tts_voice
self.voice_ref = voice_ref
self.voice_ref_text = voice_ref_text
self._lock = threading.Lock()
log.info("Loading STT: %s", stt_model)
from mlx_audio.stt import load as load_stt
self.stt = load_stt(stt_model)
log.info("STT ready.")
if use_vision:
log.info("Loading VLM: %s", vlm_model)
from mlx_vlm import load as load_vlm
self.vlm_model, self.vlm_processor = load_vlm(vlm_model)
log.info("VLM ready.")
else:
self.vlm_model = None
self.vlm_processor = None
log.info("Loading TTS: %s", tts_model)
from mlx_audio.tts import load as load_tts
self.tts = load_tts(tts_model)
log.info("TTS ready.")
log.info("All models loaded.")
def transcribe(self, audio: np.ndarray, sr: int = 16000) -> str:
if audio.ndim == 2:
audio = audio.mean(axis=1)
if sr != 16000:
from scipy.signal import resample
audio = resample(audio, int(len(audio) * 16000 / sr)).astype(np.float32)
with self._lock:
result = self.stt.generate(audio, language="en")
return result.text.strip()
def think(
self,
text: str,
image: Optional[np.ndarray] = None,
conversation_history: Optional[list] = None,
) -> str:
from mlx_vlm import generate as vlm_generate
from mlx_vlm.prompt_utils import apply_chat_template
if self.vlm_model is None:
return f"(no VLM — echo: {text})"
messages = []
if conversation_history:
for role, content in conversation_history:
messages.append({"role": role, "content": content})
messages.append(text)
num_images = 1 if image is not None else 0
prompt = apply_chat_template(
self.vlm_processor,
self.vlm_model.config,
messages,
num_images=num_images,
)
image_path = None
if image is not None:
import tempfile
import cv2
tmp = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False)
cv2.imwrite(tmp.name, image)
image_path = tmp.name
with self._lock:
response = vlm_generate(
model=self.vlm_model,
processor=self.vlm_processor,
prompt=prompt,
image=image_path,
max_tokens=200,
temperature=0.7,
repetition_penalty=1.2,
)
if image_path:
import os
os.unlink(image_path)
result = response.text if hasattr(response, "text") else str(response)
log.info(
"VLM: %d tokens @ %.1f tok/s",
getattr(response, "generation_tokens", 0),
getattr(response, "generation_tps", 0),
)
return result
def speak(self, text: str) -> tuple[np.ndarray, int]:
audio_chunks = []
sample_rate = 16000
# Build kwargs per-model: Kokoro takes lang_code, Spark-TTS supports voice cloning.
gen_kwargs: dict = {}
model_name = type(self.tts).__module__.lower()
if "kokoro" in model_name:
gen_kwargs["voice"] = self.tts_voice
gen_kwargs["lang_code"] = self.tts_voice[0] if self.tts_voice else "a"
elif "spark" in model_name:
pass # Spark-TTS uses default gender/pitch/speed
else:
gen_kwargs["voice"] = self.tts_voice
with self._lock:
for result in self.tts.generate(text, **gen_kwargs):
audio_np = np.array(result.audio).flatten()
if audio_np.size > 0:
audio_chunks.append(audio_np)
sample_rate = result.sample_rate
if not audio_chunks:
return np.zeros(1, dtype=np.float32), sample_rate
audio = np.concatenate(audio_chunks).astype(np.float32)
peak = np.abs(audio).max()
if peak > 0:
audio = audio / peak * 0.9
return audio, sample_rate
# ──────────────────────────────────────────────────────────────
# VAD — energy-based voice activity detection
# ──────────────────────────────────────────────────────────────
class VAD:
def __init__(
self,
energy_threshold: float = 0.06,
silence_duration: float = 1.2,
min_speech_duration: float = 0.3,
):
self.energy_threshold = energy_threshold
self.silence_duration = silence_duration
self.min_speech_duration = min_speech_duration
self._is_speaking = False
self._silence_start: Optional[float] = None
self._speech_start: Optional[float] = None
self._buffer: list[np.ndarray] = []
def feed(self, audio_chunk: np.ndarray) -> Optional[np.ndarray]:
if audio_chunk is None or len(audio_chunk) == 0:
return None
mono = audio_chunk.mean(axis=1) if audio_chunk.ndim == 2 else audio_chunk
energy = np.sqrt(np.mean(mono**2))
now = time.time()
if energy > self.energy_threshold:
if not self._is_speaking:
self._is_speaking = True
self._speech_start = now
self._buffer = []
self._silence_start = None
self._buffer.append(mono)
elif self._is_speaking:
self._buffer.append(mono)
if self._silence_start is None:
self._silence_start = now
elif now - self._silence_start > self.silence_duration:
self._is_speaking = False
duration = now - (self._speech_start or now)
if duration >= self.min_speech_duration and self._buffer:
utterance = np.concatenate(self._buffer)
self._buffer = []
log.info("Utterance: %.1fs", duration)
return utterance
self._buffer = []
return None
# ──────────────────────────────────────────────────────────────
# LocalStream — same pattern as the official conversation app
# ──────────────────────────────────────────────────────────────
SYSTEM_PROMPT = """## IDENTITY
You are Reachy Mini: a friendly, compact tabletop robot assistant with a calm voice and a subtle sense of humor.
Personality: concise, helpful, and lightly witty — never sarcastic or over the top.
You speak English by default and switch languages only if the speaker does.
## CRITICAL RESPONSE RULES
Respond in 12 sentences maximum.
Be helpful first, then add a small touch of humor if it fits naturally.
Avoid long explanations or filler words.
Keep responses under 25 words when possible.
## CORE TRAITS
Warm, efficient, and approachable.
Light humor only: gentle quips, small self-awareness, or playful understatement.
No sarcasm, no teasing.
If unsure, admit it briefly and offer help ("Not sure yet, but I can check!").
## HOUSEHOLD CONTEXT
You live with a family. EDIT THIS SECTION (SYSTEM_PROMPT in conversation.py) to personalize: who lives in the house, where, ages of kids, pets, shared interests, things to remember.
You cannot recognize voices or faces — you don't know who is talking. Address the speaker as "you"; only use a name if they introduce themselves this turn.
## RESPONSE EXAMPLES
User: "How's the weather?"
Good: "Looks calm outside — unlike my Wi-Fi signal today."
Bad: "Sunny with leftover pizza vibes!"
User: "Can you help me fix this?"
Good: "Of course. Describe the issue, and I'll try not to make it worse."
## BEHAVIOR RULES
Be helpful, clear, and respectful.
Use humor sparingly — clarity comes first.
Admit mistakes briefly and correct them.
No markdown, lists, emojis, or stage directions — replies are spoken aloud.
Do not comment on the speaker's appearance, mood, or state; you cannot see them.
A camera image may be attached — only describe what you see if asked.
## TOOL & MOVEMENT RULES
Embed bracketed tags in your reply to trigger physical actions. Tags are stripped before speech — don't narrate the action, just do it.
- [nod] — nod yes
- [shake] — shake head no
- [wiggle] — wiggle antennas
- [dance] — play a random dance
- [photo] — save a camera snapshot
- [emotion:NAME] — play a pre-recorded emotion. DO NOT use unless the speaker explicitly asks you to show an emotion ("look happy", "be surprised", "can you look sad"). Never use emotions for normal replies, greetings, introductions, or acknowledgements. Default: no emotion tag.
{emotion_list}
## FINAL REMINDER
Short, clear, a little human, multilingual.
One quick helpful answer + one small wink of humor = perfect response."""
class LocalConversationStream:
"""Bidirectional audio stream using the robot's mic and speaker.
Mirrors the architecture of reachy_mini_conversation_app's LocalStream:
- record_loop: reads mic samples from robot SDK
- process_loop: VAD → STT → VLM → TTS
- play audio back through robot speaker
"""
# Phrases that trigger a camera look
LOOK_TRIGGERS = (
"see", "look", "watch", "show", "camera", "photo", "picture",
"what is this", "what's this", "what is that", "what's that",
"who is", "who's", "read", "color", "colour", "wear", "holding",
"in front", "around you", "your eyes", "describe",
)
# User words that gate the emotion tool — if none present, emotion tags are dropped
EMOTION_TRIGGERS = (
"happy", "sad", "angry", "excited", "surprised", "curious", "confused",
"scared", "tired", "bored", "love", "emotion", "mood", "feel", "feeling",
"look ", "act ", "be ", "show me",
)
def __init__(self, robot, models: Models, use_camera: bool = True):
from reachy_mini import ReachyMini
self.robot: ReachyMini = robot
self.models = models
self.use_camera = use_camera
self.vad = VAD()
self.conversation_history: deque = deque(maxlen=10)
self._stop = threading.Event()
self._speaking = threading.Event()
self._listening = threading.Event() # set while user is speaking
# Preload emotion library so prompt can list available emotions
self._emotion_library = None
emotion_list_str = ""
try:
from reachy_mini.motion.recorded_move import RecordedMoves
self._emotion_library = RecordedMoves(
"pollen-robotics/reachy-mini-emotions-library"
)
names = list(self._emotion_library.moves.keys())
if names:
emotion_list_str = "Available emotions: " + ", ".join(names)
log.info("Loaded %d emotions", len(names))
except Exception as e:
log.warning("Emotion library not available: %s", e)
self._system_prompt = SYSTEM_PROMPT.format(emotion_list=emotion_list_str)
def launch(self):
"""Start recording/playback and run the conversation loop."""
log.info("Starting media pipelines...")
self.robot.media.start_recording()
self.robot.media.start_playing()
time.sleep(1) # let pipelines warm up
input_sr = self.robot.media.get_input_audio_samplerate()
output_sr = self.robot.media.get_output_audio_samplerate()
log.info("Audio: input=%d Hz, output=%d Hz", input_sr, output_sr)
# Wake-up animation
self.robot.goto_target(antennas=[0.3, -0.3], duration=1.0)
time.sleep(0.5)
self.robot.goto_target(antennas=[0, 0], duration=1.0)
# Start breathing animation in background
self._breath_thread = threading.Thread(
target=self._breathing_animation, daemon=True
)
self._breath_thread.start()
print("\n--- Reachy Mini is listening! Speak to start a conversation. ---\n")
try:
self._conversation_loop(input_sr, output_sr)
except KeyboardInterrupt:
print("\n\nConversation ended.")
finally:
self.robot.media.stop_recording()
self.robot.media.stop_playing()
def _conversation_loop(self, input_sr: int, output_sr: int):
"""Main loop: poll mic → VAD → STT → VLM → TTS → speaker."""
while not self._stop.is_set():
# Read audio from robot mic
sample = self.robot.media.get_audio_sample()
if sample is None:
time.sleep(0.01)
continue
# Feed to VAD
utterance = self.vad.feed(sample)
if utterance is None:
continue
# ── Speech detected ──
self._listening.set()
# Antenna reaction — perk up
try:
self.robot.goto_target(antennas=[0.3, 0.3], duration=1.0)
except Exception:
pass
# Transcribe
log.info("Transcribing...")
text = self.models.transcribe(utterance, sr=input_sr)
stripped = (text or "").strip().rstrip(".!?,").lower()
NOISE_WORDS = {
"", "oh", "ah", "uh", "um", "mm", "hm", "hmm", "mhm",
"huh", "eh", "err", "erm", "ow", "ugh",
}
# Detect STT hallucination (runaway repetition like "oh, oh, oh, ..." x1000)
words = [w.strip(".,!?") for w in stripped.split() if w.strip(".,!?")]
unique_ratio = (len(set(words)) / len(words)) if words else 1.0
is_repetitive = len(words) > 8 and unique_ratio < 0.2
if stripped in NOISE_WORDS or len(stripped) < 2 or is_repetitive:
log.info(
"Noise/hallucinated transcription (%d words, %.0f%% unique), skipping",
len(words), unique_ratio * 100,
)
self._listening.clear()
try:
self.robot.goto_target(antennas=[0, 0], duration=1.0)
except Exception:
pass
continue
print(f"\n You: {text}")
# Camera: only capture + pass an image when the user explicitly asks to be seen.
# Passing a stale ambient frame every turn causes the VLM to hallucinate
# visual observations ("you look comfy") when the user hasn't asked.
frame = None
if self.use_camera:
text_lower = text.lower()
if any(trigger in text_lower for trigger in self.LOOK_TRIGGERS):
frame = self.robot.media.get_frame()
if frame is not None:
log.info("Look triggered — fresh frame: %s", frame.shape)
# Build history
history = [("system", self._system_prompt)] + list(self.conversation_history)
# Generate response
log.info("Thinking...")
response = self.models.think(text, image=frame, conversation_history=history)
response = response.strip()
if not response:
response = "Hmm, I'm not sure what to say about that."
# Extract action tags, keep spoken text clean
spoken, actions = self._parse_actions(response)
if not spoken:
spoken = "Okay."
# Gate: only allow emotion tags when the user explicitly asks for one
text_lower = text.lower()
if not any(w in text_lower for w in self.EMOTION_TRIGGERS):
actions = [(t, a) for (t, a) in actions if t != "emotion"]
print(f" Reachy: {response}")
if actions:
print(f" Actions: {actions}")
# Update history with the ORIGINAL response so the model remembers its own tag usage
self.conversation_history.append(("user", text))
self.conversation_history.append(("assistant", response))
# Kick off actions in parallel with TTS
self._execute_actions_async(actions)
response = spoken
# Generate speech
log.info("Generating speech...")
audio, tts_sr = self.models.speak(response)
# Resample to robot output rate if needed
if tts_sr != output_sr:
from scipy.signal import resample
audio = resample(audio, int(len(audio) * output_sr / tts_sr)).astype(np.float32)
# Start speaking animation in background
self._listening.clear()
anim_thread = threading.Thread(
target=self._speaking_animation, daemon=True
)
self._speaking.set()
anim_thread.start()
# Push audio to robot speaker in chunks
chunk_size = output_sr // 10 # 100ms chunks
for i in range(0, len(audio), chunk_size):
chunk = audio[i : i + chunk_size]
self.robot.media.push_audio_sample(chunk)
time.sleep(len(chunk) / output_sr * 0.85)
# Wait for audio to finish playing
time.sleep(0.5)
# Stop animation
self._speaking.clear()
anim_thread.join(timeout=2)
# Return to neutral
try:
pose = create_head_pose(roll=0, pitch=0, yaw=0, degrees=True, mm=False)
self.robot.goto_target(head=pose, antennas=[0, 0], duration=1.0)
except Exception:
pass
log.info("Ready.")
# ── Tool / action system ──────────────────────────────────────
TOOL_RE = re.compile(r"\[([a-zA-Z_][a-zA-Z0-9_]*)(?::([^\]]+))?\]")
KNOWN_TOOLS = {"nod", "shake", "wiggle", "dance", "photo", "emotion"}
def _parse_actions(self, text: str) -> tuple[str, list[tuple[str, Optional[str]]]]:
"""Extract [tool] and [emotion_name] tags. Returns (clean_text, [(tool, arg)])."""
actions = []
for m in self.TOOL_RE.finditer(text):
tag = m.group(1).lower()
arg = m.group(2).strip() if m.group(2) else None
if tag in self.KNOWN_TOOLS:
actions.append((tag, arg))
elif self._emotion_library and tag in (
n.lower() for n in self._emotion_library.moves.keys()
):
actions.append(("emotion", tag))
# else: unknown tag — still stripped from spoken text, not executed
clean = self.TOOL_RE.sub("", text).strip()
clean = re.sub(r"\s{2,}", " ", clean)
return clean, actions
def _execute_actions_async(self, actions: list[tuple[str, Optional[str]]]):
if not actions:
return
threading.Thread(
target=self._execute_actions, args=(actions,), daemon=True
).start()
def _execute_actions(self, actions):
for tool, arg in actions:
try:
log.info("Action: [%s%s]", tool, f":{arg}" if arg else "")
if tool == "nod":
self._gesture_nod()
elif tool == "shake":
self._gesture_shake()
elif tool == "wiggle":
self._gesture_wiggle()
elif tool == "photo":
self._save_photo()
elif tool == "dance":
self._play_random_dance()
elif tool == "emotion":
self._play_emotion(arg)
except Exception as e:
log.warning("Action [%s] failed: %s", tool, e)
def _gesture_nod(self):
down = create_head_pose(pitch=18, degrees=True, mm=False)
up = create_head_pose(pitch=-8, degrees=True, mm=False)
neutral = create_head_pose(pitch=0, degrees=True, mm=False)
for pose in (down, up, down, neutral):
self.robot.goto_target(head=pose, duration=0.6)
time.sleep(0.3)
def _gesture_shake(self):
left = create_head_pose(yaw=25, degrees=True, mm=False)
right = create_head_pose(yaw=-25, degrees=True, mm=False)
neutral = create_head_pose(yaw=0, degrees=True, mm=False)
for pose in (left, right, left, neutral):
self.robot.goto_target(head=pose, duration=0.6)
time.sleep(0.3)
def _gesture_wiggle(self):
for _ in range(3):
self.robot.goto_target(antennas=[0.5, -0.5], duration=0.5)
time.sleep(0.2)
self.robot.goto_target(antennas=[-0.5, 0.5], duration=0.5)
time.sleep(0.2)
self.robot.goto_target(antennas=[0, 0], duration=0.5)
def _save_photo(self):
if not self.use_camera:
log.info("Photo skipped — camera disabled")
return
frame = self.robot.media.get_frame()
if frame is None:
log.warning("Photo skipped — no frame")
return
import cv2
os.makedirs("photos", exist_ok=True)
ts = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
path = f"photos/reachy_{ts}.jpg"
cv2.imwrite(path, frame)
log.info("Photo saved: %s", path)
def _play_random_dance(self):
# Lazy-load recorded moves the first time
if not hasattr(self, "_recorded_moves"):
try:
from reachy_mini.motion.recorded_move import RecordedMoves
self._recorded_moves = RecordedMoves(
"pollen-robotics/reachy-mini-dances-library"
)
except Exception as e:
log.warning("Could not load dance library: %s", e)
self._recorded_moves = None
if not self._recorded_moves:
return
import random
names = list(self._recorded_moves.moves.keys())
if not names:
return
name = random.choice(names)
log.info("Dancing: %s", name)
self.robot.play_move(self._recorded_moves.get(name), initial_goto_duration=1.0)
def _play_emotion(self, name: Optional[str]):
if not self._emotion_library:
log.info("Emotion library not available")
return
available = list(self._emotion_library.moves.keys())
if not available:
return
if name:
key = name.strip().lower()
match = next((n for n in available if n.lower() == key), None)
if match is None:
match = next((n for n in available if key in n.lower()), None)
if match is None:
import random
log.warning("Unknown emotion '%s'. Picking random.", name)
match = random.choice(available)
else:
import random
match = random.choice(available)
log.info("Emotion: %s", match)
self.robot.play_move(
self._emotion_library.get(match),
initial_goto_duration=1.0,
sound=False,
)
def _breathing_animation(self):
"""Idle breathing: subtle z-axis bob + antenna sway (like official app).
Pauses while speaking or listening.
"""
t = 0.0
while not self._stop.is_set():
if self._speaking.is_set() or self._listening.is_set():
time.sleep(0.1)
continue
try:
z_mm = 5.0 * np.sin(t * 0.1 * 2 * np.pi) # 0.1 Hz, 5mm
ant_r = np.deg2rad(15.0 * np.sin(t * 0.5 * 2 * np.pi))
ant_l = np.deg2rad(-15.0 * np.sin(t * 0.5 * 2 * np.pi))
pose = create_head_pose(z=z_mm, degrees=True, mm=True)
self.robot.goto_target(
head=pose, antennas=[ant_r, ant_l], duration=1.0,
)
except Exception:
pass
time.sleep(0.5)
t += 0.5
def _speaking_animation(self):
"""Head movement while speaking — multi-frequency sway like official HeadWobbler."""
t = 0.0
while self._speaking.is_set():
try:
# Multiple frequencies for natural-looking movement
pitch = 4.0 * np.sin(t * 2.5) + 2.0 * np.sin(t * 1.1)
yaw = 3.0 * np.sin(t * 1.3) + 1.5 * np.sin(t * 0.7)
roll = 2.0 * np.sin(t * 1.8)
z_mm = 3.0 * np.sin(t * 2.0)
ant_l = np.deg2rad(20.0 * np.sin(t * 3.0))
ant_r = np.deg2rad(20.0 * np.sin(t * 3.0 + 0.5))
pose = create_head_pose(
z=z_mm, pitch=pitch, yaw=yaw, roll=roll,
degrees=True, mm=True,
)
self.robot.goto_target(
head=pose,
antennas=[ant_r, ant_l],
duration=1.0,
)
except Exception:
pass
time.sleep(0.3)
t += 0.3
def close(self):
self._stop.set()
self._speaking.clear()
# ──────────────────────────────────────────────────────────────
# Entry point
# ──────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(
description="Reachy Mini Local Conversation (MLX)"
)
parser.add_argument("--host", default="reachy-mini.local", help="Robot hostname/IP")
parser.add_argument("--port", type=int, default=8000)
parser.add_argument("--timeout", type=float, default=15.0)
parser.add_argument(
"--stt-model",
default="mlx-community/Qwen3-ASR-1.7B-8bit",
)
parser.add_argument(
"--vlm-model",
default="mlx-community/gemma-4-26b-a4b-it-bf16",
)
parser.add_argument(
"--tts-model",
default="mlx-community/Spark-TTS-0.5B-bf16",
)
parser.add_argument("--tts-voice", default="af_heart")
parser.add_argument(
"--voice-ref",
default=None,
help="Path to a reference WAV for voice cloning (Spark-TTS, ~5-15s clean audio)",
)
parser.add_argument(
"--voice-ref-text",
default=None,
help="Exact transcript of --voice-ref",
)
parser.add_argument("--no-camera", action="store_true", help="Audio-only mode")
parser.add_argument(
"--energy-threshold",
type=float,
default=0.06,
help="VAD sensitivity (lower = more sensitive)",
)
parser.add_argument("--debug", action="store_true")
args = parser.parse_args()
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
# Load models first (takes a few seconds, keeps them in memory)
models = Models(
stt_model=args.stt_model,
vlm_model=args.vlm_model,
tts_model=args.tts_model,
tts_voice=args.tts_voice,
use_vision=not args.no_camera,
voice_ref=args.voice_ref,
voice_ref_text=args.voice_ref_text,
)
# Connect to robot — uses SDK auto-detection for media backend
from reachy_mini import ReachyMini
log.info("Connecting to %s:%d ...", args.host, args.port)
with ReachyMini(
host=args.host,
port=args.port,
connection_mode="network",
timeout=args.timeout,
) as robot:
log.info("Connected!")
stream = LocalConversationStream(
robot=robot,
models=models,
use_camera=not args.no_camera,
)
stream.vad.energy_threshold = args.energy_threshold
try:
stream.launch()
finally:
stream.close()
robot.media.close()
robot.client.disconnect()
time.sleep(0.5)
log.info("Shutdown complete.")
if __name__ == "__main__":
main()

70
look_at_click.py Normal file
View File

@@ -0,0 +1,70 @@
"""Demonstrate how to make Reachy Mini look at a point in an image.
When you click on the image, Reachy Mini will look at the point you clicked on.
It uses OpenCV to capture video from a camera and display it, and Reachy Mini's
look_at_image method to make the robot look at the specified point.
Note: The daemon must be running before executing this script.
"""
import argparse
import cv2
from reachy_mini import ReachyMini
def click(event, x, y, flags, param):
"""Handle mouse click events to get the coordinates of the click."""
if event == cv2.EVENT_LBUTTONDOWN:
param["just_clicked"] = True
param["x"] = x
param["y"] = y
def main(backend: str) -> None:
"""Show the camera feed from Reachy Mini and make it look at clicked points."""
state = {"x": 0, "y": 0, "just_clicked": False}
cv2.namedWindow("Reachy Mini Camera")
cv2.setMouseCallback("Reachy Mini Camera", click, param=state)
print("Click on the image to make ReachyMini look at that point.")
print("Press 'q' to quit the camera feed.")
with ReachyMini(localhost_only=False, media_backend=backend) as reachy_mini:
try:
while True:
frame = reachy_mini.media.get_frame()
if frame is None:
print("Failed to grab frame.")
continue
cv2.imshow("Reachy Mini Camera", frame)
if cv2.waitKey(1) & 0xFF == ord("q"):
print("Exiting...")
break
if state["just_clicked"]:
reachy_mini.look_at_image(state["x"], state["y"], duration=0.3)
state["just_clicked"] = False
except KeyboardInterrupt:
print("Interrupted. Closing viewer...")
finally:
cv2.destroyAllWindows()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Display Reachy Mini's camera feed and make it look at clicked points."
)
parser.add_argument(
"--backend",
type=str,
choices=["default", "gstreamer", "webrtc"],
default="default",
help="Media backend to use.",
)
args = parser.parse_args()
main(backend=args.backend)

23
pyproject.toml Normal file
View File

@@ -0,0 +1,23 @@
[project]
name = "reachy-mlx-vlm"
version = "0.1.0"
description = "Fully local conversational AI for the Reachy Mini robot, running on Apple Silicon with MLX."
readme = "README.md"
requires-python = ">=3.12"
license = { text = "MIT" }
authors = [{ name = "Norbert Schmidt" }]
keywords = ["reachy-mini", "robotics", "mlx", "llm", "tts", "stt", "apple-silicon"]
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: MacOS",
"Topic :: Scientific/Engineering :: Artificial Intelligence",
]
dependencies = [
"einops>=0.8.1",
"mlx-audio>=0.2.9",
"mlx-vlm>=0.3.9",
"numba>=0.63.1",
"reachy-mini[gstreamer,mujoco]>=1.2.3",
"tiktoken>=0.12.0",
]

61
record_voice.py Normal file
View File

@@ -0,0 +1,61 @@
"""
Record a voice reference clip for Spark-TTS cloning.
Usage:
.venv/bin/python record_voice.py [seconds] [output.wav]
Defaults: 12 seconds, output to voice_ref.wav
Tips:
- Quiet room, no background noise
- Natural speaking pace, not robotic
- Read a passage of real sentences (not word lists)
- 5-15 seconds works best
"""
import sys
import wave
import numpy as np
import sounddevice as sd
SAMPLE_RATE = 16000
def main():
duration = float(sys.argv[1]) if len(sys.argv) > 1 else 12.0
outpath = sys.argv[2] if len(sys.argv) > 2 else "voice_ref.wav"
print(f"Recording {duration:.0f}s to {outpath}")
print("Speak naturally. Starting in 3...")
sd.sleep(1000)
print("2...")
sd.sleep(1000)
print("1...")
sd.sleep(1000)
print("GO — speak now")
audio = sd.rec(
int(duration * SAMPLE_RATE),
samplerate=SAMPLE_RATE,
channels=1,
dtype="int16",
)
sd.wait()
print("Done.")
with wave.open(outpath, "wb") as f:
f.setnchannels(1)
f.setsampwidth(2)
f.setframerate(SAMPLE_RATE)
f.writeframes(audio.tobytes())
peak = np.abs(audio).max()
print(f"Saved {outpath} — peak={peak} (healthy range: 500030000)")
if peak < 2000:
print("WARNING: very quiet recording. Speak louder or move closer to mic.")
elif peak > 32000:
print("WARNING: clipping. Move further from mic or lower input volume.")
if __name__ == "__main__":
main()

48
record_voice.sh Executable file
View File

@@ -0,0 +1,48 @@
#!/bin/bash
# Record a voice reference via the Reachy Mini's microphone.
# Usage: ./record_voice.sh [seconds] [output.wav]
set -e
DURATION="${1:-12}"
OUTPUT="${2:-voice_ref.wav}"
echo "Recording ${DURATION}s via Reachy Mini's microphone to ${OUTPUT}"
echo "Stand/sit near the robot. Starting in 3 seconds..."
sshpass -p 'root' ssh -o StrictHostKeyChecking=no pollen@reachy-mini.local "/venvs/mini_daemon/bin/python -c \"
import time, wave, numpy as np
from reachy_mini import ReachyMini
with ReachyMini() as mini:
mini.media.start_recording()
time.sleep(1.5) # warm-up + countdown buffer
print('GO — speak now', flush=True)
sr = mini.media.get_input_audio_samplerate()
chunks = []
start = time.time()
while time.time() - start < ${DURATION}:
sample = mini.media.get_audio_sample()
if sample is not None:
mono = sample.mean(axis=1) if sample.ndim == 2 else sample
chunks.append(mono)
else:
time.sleep(0.01)
mini.media.stop_recording()
audio = np.concatenate(chunks).astype(np.float32)
peak = float(np.abs(audio).max())
print(f'Captured {len(audio)/sr:.1f}s @ {sr}Hz, peak={peak:.3f}', flush=True)
# Normalize + convert to int16
if peak > 0:
audio = audio / peak * 0.9
pcm = (audio * 32767).astype(np.int16)
with wave.open('/tmp/voice_ref.wav', 'wb') as f:
f.setnchannels(1); f.setsampwidth(2); f.setframerate(sr)
f.writeframes(pcm.tobytes())
\""
sshpass -p 'root' scp pollen@reachy-mini.local:/tmp/voice_ref.wav "${OUTPUT}"
echo "Saved ${OUTPUT}"

3
run.sh Executable file
View File

@@ -0,0 +1,3 @@
#!/bin/bash
cd "$(dirname "$0")"
exec .venv/bin/python conversation.py "$@"

26
speak.sh Executable file
View File

@@ -0,0 +1,26 @@
#!/bin/bash
# Make Reachy Mini speak a sentence via local Spark-TTS on the Mac,
# then play it back through the robot's speaker over SSH.
#
# Usage: ./speak.sh "Hello world"
TEXT="${1:-Hello, I am Reachy Mini.}"
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
# Generate audio with Spark-TTS (mlx-audio) on the Mac.
"$SCRIPT_DIR/.venv/bin/python" -m mlx_audio.tts.generate \
--model mlx-community/Spark-TTS-0.5B-bf16 \
--text "$TEXT" \
--file_prefix /tmp/reachy_speech
# Copy to the robot and play it through its speaker.
sshpass -p 'root' scp -o StrictHostKeyChecking=no /tmp/reachy_speech_000.wav pollen@reachy-mini.local:/tmp/speech.wav
sshpass -p 'root' ssh -o StrictHostKeyChecking=no pollen@reachy-mini.local "/venvs/mini_daemon/bin/python -c \"
import time
from reachy_mini import ReachyMini
with ReachyMini() as mini:
mini.media.play_sound('/tmp/speech.wav')
time.sleep(10)
\""
echo "Spoke: $TEXT"

24
take_photo.sh Executable file
View File

@@ -0,0 +1,24 @@
#!/bin/bash
# Take a photo from Reachy Mini camera and copy it locally
# Usage: ./take_photo.sh [output_filename]
OUTPUT="${1:-photo.jpg}"
sshpass -p 'root' ssh -o StrictHostKeyChecking=no pollen@reachy-mini.local "/venvs/mini_daemon/bin/python -c \"
from reachy_mini import ReachyMini
import cv2
import time
with ReachyMini() as mini:
time.sleep(1)
for i in range(5):
frame = mini.media.get_frame()
if frame is not None:
cv2.imwrite('/tmp/photo.jpg', frame)
print('captured')
break
time.sleep(0.5)
\""
sshpass -p 'root' scp -o StrictHostKeyChecking=no pollen@reachy-mini.local:/tmp/photo.jpg "$OUTPUT"
echo "Saved to $OUTPUT"

10
test_reachy.py Normal file
View File

@@ -0,0 +1,10 @@
from reachy_mini import ReachyMini
with ReachyMini(localhost_only=False, media_backend='no_media') as mini:
print("Connected to Reachy Mini!")
# Wiggle antennas
print("Wiggling antennas...")
mini.goto_target(antennas=[0.5, -0.5], duration=0.5)
mini.goto_target(antennas=[-0.5, 0.5], duration=0.5)
mini.goto_target(antennas=[0, 0], duration=0.5)

2881
uv.lock generated Normal file

File diff suppressed because it is too large Load Diff