Camera Control & AI Detection System
Overview
The camera_control.py script is the core component of our AI-powered road damage detection system. It integrates real-time video recording, RC-triggered control, and Edge TPU-accelerated AI inference into a single cohesive system running on a Raspberry Pi Zero 2 WH.
Key Features
- RC Switch Triggered Recording: Start/stop video recording via auxiliary channel on RC transmitter
- Real-time AI Inference: Detect road damage (potholes) while recording using Google Coral Edge TPU
- Dual-Stream Processing: 1080p recording + low-res inference stream for optimal performance
- MSP Protocol Integration: Direct communication with INAV flight controller
- Automatic Detection Capture: Save raw images with metadata when damage is detected
System Architecture
┌──────────────────────────────────────────────────────────────┐
│ Camera Control System │
├──────────────────────────────────────────────────────────────┤
│ │
│ RC Transmitter Raspberry Pi Zero 2 WH │
│ │ │ │
│ │ MSP Protocol │ │
│ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────────┐ │
│ │ INAV │◄────────────────►│ Serial Port │ │
│ │ FC │ /dev/ttyS0 │ (UART) │ │
│ └─────────────┘ 115200 baud └─────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ ┌─────────────────┐ │
│ │ Camera │─────────────────►│ Picamera2 │ │
│ │ Module │ CSI Interface │ Video Pipeline │ │
│ └─────────────┘ └─────────────────┘ │
│ │ │
│ ┌───────────┴──────────┐ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐│
│ │ Main Stream │ │ Lores Stream ││
│ │ 1920x1080 │ │ 320x320 ││
│ │ YUV420 │ │ YUV420 ││
│ └──────┬───────┘ └──────┬───────┘│
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐│
│ │ H.264 Encoder│ │ YUV→RGB ││
│ │ Recording │ │ Conversion ││
│ └──────────────┘ └──────┬───────┘│
│ │ │
│ ▼ │
│ ┌──────────────┐│
│ │ TFLite Model ││
│ │ + Edge TPU ││
│ │ (YOLOv8) ││
│ └──────┬───────┘│
│ │ │
│ ▼ │
│ ┌──────────────┐│
│ │ Detection ││
│ │ Results ││
│ └──────┬───────┘│
│ │ │
│ ▼ │
│ ┌──────────────┐│
│ │ Save Image ││
│ │ with Bbox ││
│ └──────────────┘│
│ │
└──────────────────────────────────────────────────────────────┘
How It Works
1. Initialization Phase
On startup, the script initializes three main components:
Serial Communication (/dev/ttyS0 @ 115200 baud)
- Opens UART connection to INAV flight controller
- Uses MSP (MultiWii Serial Protocol) to read RC channel values
- Monitors AUX8 channel (index 8) for recording trigger
AI Model Loading
- Loads quantized YOLOv8 TFLite model (best_int8.tflite)
- Initializes Google Coral Edge TPU accelerator
- Configures input tensor (320x320 int8)
- Sets up output tensor for YOLO predictions
Camera Configuration
- Configures Picamera2 with dual-stream setup:
- Main stream: 1920x1080 YUV420 for H.264 recording
- Lores stream: 320x320 YUV420 for AI inference
- Uses a 3-buffer system to prevent frame drops
2. Main Control Loop
The script continuously monitors RC channels and processes video:
while True:
    # 1. Read RC channels via MSP
    channels = get_rc_channels()
    switch_val = channels[AUX_CHANNEL_INDEX]

    # 2. Check AUX8 switch position
    if switch_val > 1500 and not recording:
        # Start recording
        picam2.start_recording(encoder, filename)
        recording = True
    elif switch_val < 1500 and recording:
        # Stop recording
        picam2.stop_recording()
        recording = False

    # 3. If recording, run AI inference
    if recording:
        frame = picam2.capture_array("lores")
        rgb_frame = yuv420_to_rgb(frame)
        boxes, classes, scores = run_inference(rgb_frame)

        # Save detections above the confidence threshold
        for box, score in zip(boxes, scores):
            if score > threshold:
                save_detection(rgb_frame, box, score)
3. MSP Protocol Communication
The script uses MSP (MultiWii Serial Protocol) to communicate with the flight controller:
# MSP_RC command (0x69)
MSP_RC_REQUEST = b'$M<\x00\x69\x69'
def get_rc_channels():
    ser.write(MSP_RC_REQUEST)
    header = ser.read(5)           # Read: $M> + size + command
    size = header[3]
    payload = ser.read(size)       # Read channel data
    checksum = ser.read(1)
    # Unpack 16-bit unsigned integers
    count = size // 2
    channels = struct.unpack('<' + 'H' * count, payload)
    return channels                # Returns tuple: (1000-2000, ...)
Channel Values:
- Below 1500: Switch OFF (stop recording)
- Above 1500: Switch ON (start recording)
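For reference, the request frame can also be built programmatically. The helper below is a minimal sketch (not part of the script) showing how an MSP v1 frame is assembled, with the checksum computed as the XOR of the size byte, the command byte, and every payload byte; it reproduces the hard-coded MSP_RC request exactly:
def build_msp_request(command, payload=b''):
    """Assemble an MSP v1 request: $M< + size + command + payload + checksum."""
    size = len(payload)
    checksum = size ^ command
    for byte in payload:
        checksum ^= byte
    return b'$M<' + bytes([size, command]) + payload + bytes([checksum])

# MSP_RC is command 105 (0x69); with an empty payload the checksum is 0x00 ^ 0x69 = 0x69,
# which matches the hard-coded b'$M<\x00\x69\x69' request above.
assert build_msp_request(105) == b'$M<\x00\x69\x69'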
4. Dual-Stream Video Processing
The Picamera2 library provides two simultaneous streams:
Main Stream (Recording)
- Resolution: 1920x1080
- Format: YUV420
- Purpose: High-quality H.264 video recording
- Encoded and saved to a .h264 file
Lores Stream (AI Inference)
- Resolution: 320x320 (matches model input)
- Format: YUV420
- Purpose: Real-time object detection
- Converted to RGB for inference
This dual-stream approach allows high-quality recording while maintaining fast inference speeds.
5. AI Inference Pipeline
YUV to RGB Conversion
def yuv420_to_rgb(yuv_frame, width, height):
    yuv = yuv_frame.reshape((height * 3 // 2, width))
    rgb = cv2.cvtColor(yuv, cv2.COLOR_YUV2RGB_I420)
    return rgb
Model Inference
def run_inference(image):
    # Prepare input (int8 quantization)
    input_data = (image.astype(np.float32) - 128).astype(np.int8)
    input_data = np.expand_dims(input_data, axis=0)

    # Run inference on Edge TPU
    interpreter.set_tensor(input_details[0]['index'], input_data)
    interpreter.invoke()

    # Get output [1, 12, 2100] - YOLOv8 format
    output_data = interpreter.get_tensor(output_details[0]['index'])

    # Post-process with NMS
    return yolo_postprocess(output_data, conf_thresh, nms_thresh)
YOLO Post-Processing
- Transpose output from [1, 12, 2100] to [2100, 12]
- Extract confidence scores (max across classes)
- Filter by confidence threshold (0.5)
- Convert center-based boxes to corner format
- Apply Non-Maximum Suppression (NMS) with threshold 0.4
- Normalize coordinates to [0, 1] range
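The steps above are carried out by yolo_postprocess(). The NumPy sketch below illustrates the same sequence; it assumes the output tensor has already been dequantized and that the first four values per candidate are center-format box coordinates in input pixels (320), which may differ in detail from the exact script implementation:
import numpy as np

def yolo_postprocess(output_data, conf_thresh=0.5, nms_thresh=0.4, input_size=320):
    """Sketch of the post-processing steps listed above (not the exact script code)."""
    # [1, 12, 2100] -> [2100, 12]: one row per candidate box
    preds = np.transpose(output_data[0])

    # Columns 0-3: cx, cy, w, h (assumed in input pixels); remaining columns: class scores
    boxes_cxcywh = preds[:, :4]
    class_scores = preds[:, 4:]
    class_ids = np.argmax(class_scores, axis=1)
    scores = np.max(class_scores, axis=1)

    # Filter by confidence threshold
    keep = scores > conf_thresh
    boxes_cxcywh, class_ids, scores = boxes_cxcywh[keep], class_ids[keep], scores[keep]

    # Center format -> corner format [ymin, xmin, ymax, xmax], normalized to [0, 1]
    cx, cy, w, h = boxes_cxcywh.T
    boxes = np.stack([cy - h / 2, cx - w / 2, cy + h / 2, cx + w / 2], axis=1) / input_size

    # Greedy Non-Maximum Suppression
    order = np.argsort(scores)[::-1]
    kept = []
    while order.size > 0:
        i = order[0]
        kept.append(i)
        rest = order[1:]
        top_left = np.maximum(boxes[i, :2], boxes[rest, :2])
        bottom_right = np.minimum(boxes[i, 2:], boxes[rest, 2:])
        inter = np.prod(np.clip(bottom_right - top_left, 0, None), axis=1)
        area_i = np.prod(boxes[i, 2:] - boxes[i, :2])
        area_rest = np.prod(boxes[rest, 2:] - boxes[rest, :2], axis=1)
        iou = inter / (area_i + area_rest - inter + 1e-9)
        order = rest[iou < nms_thresh]

    kept = np.array(kept, dtype=int)
    return boxes[kept], class_ids[kept], scores[kept]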
6. Detection Recording
When a pothole is detected (confidence > 0.5):
def save_detection(frame, box, score):
    # Save raw frame (no bbox drawing for performance)
    filename = f"detect_{timestamp}_{score:.2f}.jpg"

    # Convert RGB to BGR for OpenCV
    out_img = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
    cv2.imwrite(filename, out_img)

    # Save detection metadata separately
    metadata = {
        'bbox': box,             # [ymin, xmin, ymax, xmax]
        'score': float(score),
        'timestamp': timestamp
    }
Why no bounding box visualization?
Drawing rectangles and text with OpenCV (cv2.rectangle(), cv2.putText()) is computationally expensive on the Raspberry Pi Zero 2 WH:
- Image copying: frame.copy() duplicates memory
- Rectangle drawing: anti-aliasing and pixel manipulation
- Text rendering: font rasterization is CPU-intensive
- Total overhead: ~50-100 ms per detection on the Pi Zero 2
This overhead would cause:
- Frame drops in video recording
- Delayed inference on subsequent frames
- System instability during multiple simultaneous detections
Solution: Save raw images during flight, add annotations during post-processing on a laptop/desktop.
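If you want to sanity-check these overhead figures on your own board, a rough micro-benchmark of the drawing path looks like the sketch below; the absolute numbers depend on image size and system load:
import time
import cv2
import numpy as np

frame = np.zeros((320, 320, 3), dtype=np.uint8)    # stand-in for a lores RGB frame

start = time.perf_counter()
annotated = frame.copy()                            # the memory copy the script avoids
cv2.rectangle(annotated, (50, 50), (270, 270), (0, 0, 255), 2)
cv2.putText(annotated, "pothole: 0.87", (50, 40),
            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
elapsed_ms = (time.perf_counter() - start) * 1000
print(f"Draw overhead: {elapsed_ms:.1f} ms")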
Code Walkthrough
Configuration Section
SERIAL_PORT = '/dev/ttyS0' # Raspberry Pi UART
BAUD_RATE = 115200 # Must match the INAV MSP port baud rate
VIDEO_PATH = "/home/tpu/Videos/" # Output directory
MODEL_PATH = "/home/tpu/drone_script/best_int8.tflite"
AUX_CHANNEL_INDEX = 8 # AUX8 = index 8
TRIGGER_VALUE = 1500 # PWM threshold
CONFIDENCE_THRESHOLD = 0.5 # Min detection confidence
NMS_THRESHOLD = 0.4 # NMS IoU threshold
Serial Communication
# Initialize serial port
ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=0.1)
# MSP command structure: $ M < size command checksum
MSP_RC_REQUEST = b'$M<\x00\x69\x69'
def get_rc_channels():
    """Read RC channel values via MSP protocol"""
    ser.reset_input_buffer()
    ser.write(MSP_RC_REQUEST)

    # Parse response: $ M > size command payload checksum
    header = ser.read(5)
    if header[:3] != b'$M>':
        return None

    size = header[3]
    payload = ser.read(size)
    checksum = ser.read(1)

    # Unpack 16-bit channel values
    count = size // 2
    return struct.unpack('<' + 'H' * count, payload)
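The checksum byte is read but never verified. If you want to reject corrupted frames, a small check can be added; in MSP v1 the checksum is the XOR of the size, command, and payload bytes (illustrative helper, not part of the script):
def msp_checksum_ok(size, command, payload, checksum_byte):
    """Return True if an MSP v1 response checksum matches."""
    chk = size ^ command
    for byte in payload:
        chk ^= byte
    return len(checksum_byte) == 1 and chk == checksum_byte[0]

# Inside get_rc_channels(), after reading the frame:
#     if not msp_checksum_ok(size, header[4], payload, checksum):
#         return None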
AI Model Setup
# Load model with Edge TPU delegate
interpreter = Interpreter(
    model_path=MODEL_PATH,
    experimental_delegates=[load_delegate('libedgetpu.so.1')]
)
interpreter.allocate_tensors()

# Get model metadata
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
_, input_height, input_width, _ = input_details[0]['shape']
input_type = input_details[0]['dtype']  # np.int8 for quantized
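The fixed -128 shift shown earlier assumes an input zero point of -128; the exact scale and zero point can instead be read from the interpreter. A sketch of that approach (the 0-1 input scaling is an assumption about how the model was trained, not something stated by the script):
import numpy as np

in_scale, in_zero_point = input_details[0]['quantization']
out_scale, out_zero_point = output_details[0]['quantization']

def quantize_input(rgb_image):
    """Map a uint8 RGB frame into the model's int8 input range."""
    real = rgb_image.astype(np.float32) / 255.0   # assumes the model expects 0-1 inputs
    q = real / in_scale + in_zero_point
    return np.clip(np.round(q), -128, 127).astype(np.int8)

def dequantize_output(raw):
    """Convert the raw int8 output tensor back to real-valued predictions."""
    return (raw.astype(np.float32) - out_zero_point) * out_scale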
Camera Configuration
picam2 = Picamera2()
config = picam2.create_video_configuration(
    main={"size": (1920, 1080), "format": "YUV420"},
    lores={"size": (input_width, input_height), "format": "YUV420"},
    buffer_count=3  # Triple buffering
)
picam2.configure(config)
picam2.start()
Recording Control
encoder = H264Encoder()
recording = False
current_filename = ""
# Start recording
timestamp = int(time.time())
unique_id = str(uuid.uuid4())[:8]
current_filename = f"{VIDEO_PATH}flight_{timestamp}_{unique_id}.h264"
picam2.start_recording(encoder, current_filename)
recording = True
# Stop recording
picam2.stop_recording()
recording = False
Detection Saving (Performance Optimized)
def save_detection(frame, box, score):
    """Save raw detection without visualization overhead"""
    # Convert RGB to BGR (OpenCV format)
    out_img = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

    # Save immediately without drawing
    filename = f"{VIDEO_PATH}detect_{int(time.time())}_{score:.2f}.jpg"
    cv2.imwrite(filename, out_img)
    print(f"[AI] Pothole detected! Saved: {filename}")

    # Note: cv2.rectangle() and cv2.putText() intentionally omitted
    # Drawing adds 50-100ms overhead on Pi Zero 2 WH
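One thing the walkthrough does not show is releasing the hardware when the process exits. A minimal shutdown sketch, where main_loop() is a hypothetical wrapper around the control loop shown earlier:
try:
    main_loop()              # hypothetical wrapper around the while-True control loop
except KeyboardInterrupt:
    pass
finally:
    if recording:
        picam2.stop_recording()
    picam2.stop()
    ser.close()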
Configuration Options
Serial Communication
| Parameter | Value | Description |
|---|---|---|
| `SERIAL_PORT` | `/dev/ttyS0` | Raspberry Pi UART port |
| `BAUD_RATE` | `115200` | Must match INAV MSP settings |
To change serial port:
# Enable UART on GPIO 14/15
sudo raspi-config
# Interface Options → Serial Port → Disable login shell, Enable hardware
Recording Trigger
| Parameter | Default | Description |
|---|---|---|
| `AUX_CHANNEL_INDEX` | `8` | RC channel for recording control (0-indexed) |
| `TRIGGER_VALUE` | `1500` | PWM value threshold (1000-2000) |
To use a different channel (see the diagnostic sketch after this list for finding the right index):
- Channels are 0-indexed: AUX1=4, AUX2=5, …, AUX8=11
- Configure in INAV Configurator: Modes tab → Set switch position
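To confirm which index your switch actually appears on, you can print the live channel values and watch which entry changes when the switch is flipped. A quick diagnostic sketch using the get_rc_channels() helper from above:
import time

while True:
    channels = get_rc_channels()
    if channels:
        print(list(enumerate(channels)))   # (index, PWM value) pairs
    time.sleep(0.2)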
AI Detection
| Parameter | Default | Description |
|---|---|---|
| `MODEL_PATH` | `best_int8.tflite` | Path to TFLite model |
| `CONFIDENCE_THRESHOLD` | `0.5` | Minimum detection confidence (0.0-1.0) |
| `NMS_THRESHOLD` | `0.4` | Non-Maximum Suppression IoU threshold |
To adjust sensitivity:
- Lower `CONFIDENCE_THRESHOLD` (e.g., 0.3) → more detections, more false positives
- Higher `CONFIDENCE_THRESHOLD` (e.g., 0.7) → fewer detections, more reliable
Video Settings
| Parameter | Default | Description |
|---|---|---|
| `VIDEO_PATH` | `/home/tpu/Videos/` | Output directory |
| Main resolution | 1920x1080 | Recording quality |
| Inference resolution | 320x320 | Model input size |
To change video quality:
config = picam2.create_video_configuration(
    main={"size": (1280, 720), "format": "YUV420"},          # 720p
    # or: main={"size": (2592, 1944), "format": "YUV420"}    # Max resolution
)
Usage Instructions
Installation
1. Install dependencies:
sudo apt update
sudo apt install -y python3-opencv python3-serial
# Install TFLite runtime
pip3 install tflite-runtime
# Install Edge TPU library
echo "deb https://packages.cloud.google.com/apt coral-edgetpu-stable main" | \
sudo tee /etc/apt/sources.list.d/coral-edgetpu.list
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
sudo apt update
sudo apt install libedgetpu1-std python3-pycoral
# Install Picamera2
sudo apt install -y python3-picamera2
2. Enable UART:
sudo raspi-config
# Interface Options → Serial Port
# - Disable login shell over serial
# - Enable serial port hardware
sudo reboot
3. Connect hardware:
- Flight controller TX → Pi GPIO 15 (RXD)
- Flight controller RX → Pi GPIO 14 (TXD)
- Flight controller GND → Pi GND
- Camera module → CSI port
- Coral TPU → USB port
4. Prepare model:
# Copy your trained model
sudo mkdir -p /home/tpu/drone_script
sudo cp best_int8.tflite /home/tpu/drone_script/
# Create video directory
sudo mkdir -p /home/tpu/Videos
sudo chmod 777 /home/tpu/Videos
Running the Script
Manual start:
cd /path/to/script
python3 camera_control.py
Run on boot (systemd service):
sudo nano /etc/systemd/system/camera-control.service
Add:
[Unit]
Description=AI Drone Camera Control
After=network.target
[Service]
Type=simple
User=tpu
WorkingDirectory=/home/tpu/drone_script
ExecStart=/usr/bin/python3 /home/tpu/drone_script/camera_control.py
Restart=on-failure
RestartSec=5
[Install]
WantedBy=multi-user.target
Enable service:
sudo systemctl daemon-reload
sudo systemctl enable camera-control.service
sudo systemctl start camera-control.service
# Check status
sudo systemctl status camera-control.service
# View logs
journalctl -u camera-control.service -f
Operation
- Power on the drone
  - Script auto-starts (if configured as service)
  - Wait for “Ready. Waiting for RC switch…” message
- Arm the drone and take off
  - Normal flight operation
- Enable recording
  - Flip AUX8 switch to ON position (>1500)
  - Console shows: `[REC] Starting: flight_<timestamp>_<uuid>.h264`
  - AI detection begins automatically
- Fly over target area
  - Video records to main stream
  - AI processes lores stream in real-time
  - Detections saved as raw images: `detect_<timestamp>_<score>.jpg`
  - Bounding boxes NOT drawn to save processing resources
- Disable recording
  - Flip AUX8 switch to OFF position (<1500)
  - Console shows: `[REC] Stopping.`
  - Video file saved
- Land and disarm
  - Power off drone
  - Retrieve video files from `/home/tpu/Videos/`
Output Files
Video recordings:
flight_1705232145_a3f7d89c.h264
- Timestamp: Unix epoch time
- UUID: 8-character unique ID
- Format: H.264 elementary stream
Detection images:
detect_1705232156_0.87.jpg
- Timestamp: Unix epoch time
- Score: Detection confidence (0.00-1.00)
- Format: Raw JPEG (no bounding box overlay for performance)
- Bounding box coordinates stored in memory during flight
Convert H.264 to MP4:
ffmpeg -framerate 30 -i flight_1705232145_a3f7d89c.h264 \
-c copy output.mp4
Troubleshooting
Serial Communication Issues
Problem: No RC channels received
# Check serial port
ls -l /dev/ttyS0
# Should show: crw-rw---- 1 root dialout
# Add user to dialout group
sudo usermod -a -G dialout $USER
sudo reboot
# Test serial connection
sudo apt install minicom
minicom -D /dev/ttyS0 -b 115200
Problem: Wrong channel values
- Verify `BAUD_RATE` matches the INAV configuration
- Check flight controller MSP port configuration (INAV Configurator → Ports tab)
- Try different UART pins (GPIO 14/15)
Camera Issues
Problem: Camera not detected
# Check camera detection (libcamera stack)
libcamera-hello --list-cameras
# Should list the connected camera module

# Make sure the Legacy Camera interface stays DISABLED
# (Picamera2 requires the libcamera stack)
sudo raspi-config
# Interface Options → Legacy Camera → Disable
sudo reboot

# Test camera preview
libcamera-hello
Problem: Frame drops or stuttering
- Reduce main stream resolution to 720p
- Increase `buffer_count` to 4 or 5
- Lower inference frequency (skip frames)
AI Model Issues
Problem: Edge TPU not found
# Check USB connection
lsusb
# Should show "Global Unichip Corp." (before first inference)
# or "Google Inc." (after the Edge TPU runtime has initialized it)
# Reinstall driver
sudo apt install --reinstall libedgetpu1-std
# Check library
python3 -c "from tflite_runtime.interpreter import load_delegate; \
print(load_delegate('libedgetpu.so.1'))"
Problem: Low detection accuracy
- Adjust `CONFIDENCE_THRESHOLD` (try 0.3-0.7)
- Retrain the model with a more diverse dataset
- Check lighting conditions (they should match what the model was trained on)
- Verify model quantization didn’t lose accuracy
Problem: Model loading fails
# Verify model file
ls -lh /home/tpu/drone_script/best_int8.tflite
# Check model format
python3 -c "from tflite_runtime.interpreter import Interpreter; \
print(Interpreter(model_path='best_int8.tflite'))"
# Ensure model is Edge TPU compiled
# File should have an _edgetpu suffix or be compiled with edgetpu_compiler
Recording Issues
Problem: Video files not created
# Check directory permissions
ls -ld /home/tpu/Videos
sudo chmod 777 /home/tpu/Videos
# Check disk space
df -h
# Monitor for errors
python3 camera_control.py 2>&1 | tee debug.log
Problem: Video playback issues
# Convert to MP4
ffmpeg -framerate 30 -i input.h264 -c copy output.mp4
# Or transcode
ffmpeg -i input.h264 -c:v libx264 -preset fast output.mp4
Advanced Features
Post-Processing: Add Bounding Boxes (Off-drone)
Since bounding boxes aren’t drawn during flight for performance reasons, you can add them during post-processing on a more powerful computer:
import cv2
import json
import glob
def add_bboxes_postprocessing(image_dir, metadata_file):
    """Draw bounding boxes on images after flight"""
    # Load detection metadata
    with open(metadata_file, 'r') as f:
        detections = json.load(f)

    for detection in detections:
        img_path = detection['image']
        box = detection['bbox']        # [ymin, xmin, ymax, xmax]
        score = detection['score']

        # Load image
        img = cv2.imread(img_path)
        h, w = img.shape[:2]

        # Draw bbox
        ymin, xmin, ymax, xmax = box
        start = (int(xmin * w), int(ymin * h))
        end = (int(xmax * w), int(ymax * h))
        cv2.rectangle(img, start, end, (0, 0, 255), 2)

        # Add label
        label = f"Pothole: {score:.2f}"
        cv2.putText(img, label, (start[0], start[1] - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)

        # Save annotated version
        output = img_path.replace('.jpg', '_annotated.jpg')
        cv2.imwrite(output, img)
        print(f"Saved: {output}")
# Usage after flight
add_bboxes_postprocessing('/home/tpu/Videos/', 'detections.json')
Add GPS Coordinates to Detections
import time
import json
import cv2
import serial
import pynmea2

# Open GPS serial port
gps_ser = serial.Serial('/dev/ttyAMA1', 9600, timeout=1)

def get_gps_position():
    """Read current GPS position"""
    line = gps_ser.readline().decode('ascii', errors='replace')
    if line.startswith('$GPGGA'):
        msg = pynmea2.parse(line)
        return msg.latitude, msg.longitude, msg.altitude
    return None, None, None

def save_detection_with_gps(frame, box, score):
    """Save detection with GPS metadata (no bbox drawing for performance)"""
    lat, lon, alt = get_gps_position()

    # Save raw image
    filename = f"detect_{time.time()}_{score:.2f}.jpg"
    out_img = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
    cv2.imwrite(filename, out_img)

    # Save metadata
    metadata = {
        'timestamp': time.time(),
        'score': float(score),
        'bbox': box,
        'gps': {'lat': lat, 'lon': lon, 'alt': alt}
    }
    json_file = filename.replace('.jpg', '.json')
    with open(json_file, 'w') as f:
        json.dump(metadata, f, indent=2)
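Note that gps_ser.readline() blocks, which can stall the capture loop between NMEA sentences. One way around this is a background reader thread that keeps only the latest fix; this is a sketch under that assumption, and latest_fix / gps_reader are hypothetical names:
import threading

latest_fix = {'lat': None, 'lon': None, 'alt': None}

def gps_reader():
    """Continuously parse GGA sentences and remember the most recent fix."""
    while True:
        line = gps_ser.readline().decode('ascii', errors='replace')
        if line.startswith('$GPGGA'):
            try:
                msg = pynmea2.parse(line)
                latest_fix.update(lat=msg.latitude, lon=msg.longitude, alt=msg.altitude)
            except pynmea2.ParseError:
                continue

threading.Thread(target=gps_reader, daemon=True).start()
# save_detection_with_gps() can then read latest_fix instead of blocking on the port.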
Multiple Detection Classes
# Update labels
LABELS = ['pothole', 'crack', 'manhole', 'patch']

def yolo_postprocess(output_data, conf_thresh, nms_thresh):
    """Support multiple classes"""
    predictions = np.transpose(output_data[0])

    # Get class with max score for each detection
    class_ids = np.argmax(predictions[:, 4:], axis=1)
    scores = np.max(predictions[:, 4:], axis=1)

    # Rest of processing...
    final_classes.append(int(class_ids[i]))

    return final_boxes, final_classes, final_scores

def save_detection(frame, box, score, class_id):
    """Save raw image with metadata (no visual overlay)"""
    filename = f"detect_{time.time()}_{score:.2f}_{LABELS[class_id]}.jpg"
    out_img = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
    cv2.imwrite(filename, out_img)
    # Save metadata with class info to JSON...
Frame Skipping for Performance
frame_counter = 0
INFERENCE_INTERVAL = 3  # Run inference every 3rd frame

if recording:
    frame_counter += 1
    if frame_counter % INFERENCE_INTERVAL == 0:
        # Run inference
        yuv_frame = picam2.capture_array("lores")
        # Process...
Telemetry Logging
import csv

telemetry_file = None
telemetry_writer = None

def start_telemetry_log(filename):
    """Start logging telemetry data"""
    global telemetry_file, telemetry_writer
    telemetry_file = open(filename.replace('.h264', '.csv'), 'w')
    telemetry_writer = csv.writer(telemetry_file)
    telemetry_writer.writerow([
        'timestamp', 'throttle', 'roll', 'pitch', 'yaw',
        'detections', 'inference_time_ms'
    ])

def log_telemetry(channels, detections, inference_time):
    """Log telemetry row"""
    if telemetry_writer:
        telemetry_writer.writerow([
            time.time(),
            channels[2],  # Throttle
            channels[0],  # Roll
            channels[1],  # Pitch
            channels[3],  # Yaw
            len(detections),
            inference_time * 1000
        ])
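The snippet above opens the CSV but never closes it; a small companion helper (illustrative name) for when recording stops:
def stop_telemetry_log():
    """Flush and close the telemetry CSV when recording stops."""
    global telemetry_file, telemetry_writer
    if telemetry_file:
        telemetry_file.close()
    telemetry_file = None
    telemetry_writer = None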
Next Steps
- AI Model Training - Train custom detection models
- Hardware Setup - Complete system integration
- Flight Testing - First flight procedures
Reference
MSP Protocol Resources
Picamera2 Documentation
Edge TPU Resources