news 2026/4/16 4:58:07

检测前方障碍物距离,当距离过近时语音报警,避免视障用户碰撞受伤。

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
检测前方障碍物距离,当距离过近时语音报警,避免视障用户碰撞受伤。

视障人士防撞预警系统

一、实际应用场景与痛点

应用场景

视障用户李先生在户外使用导盲杖行走。虽然导盲杖能探测地面障碍,但无法检测空中障碍物(如低垂树枝、广告牌、打开的窗户)和前方较远障碍。在一次行走中,他不慎撞到低矮的伸缩门,导致头部受伤。他急需一款能实时探测前方障碍物并语音预警的智能辅助设备。

核心痛点

1. 探测盲区:导盲杖主要探测地面,对空中和前方障碍探测不足

2. 反应延迟:发现障碍时已距离过近,来不及避让

3. 预警不直观:传统振动反馈无法表达障碍物的距离和方位

4. 环境适应性差:不同光照、天气条件下探测准确性下降

5. 操作复杂:多设备配合使用对视障用户不友好

6. 续航问题:长时间户外使用需要持续供电

二、核心逻辑设计

1. 初始化传感器(超声波/雷达/深度摄像头)

2. 实时扫描前方区域,获取距离数据

3. 处理原始数据,滤除噪声干扰

4. 划分安全区域(近、中、远距离警告区)

5. 根据距离计算危险等级

6. 生成多级别语音警报

7. 通过骨传导耳机输出警报

8. 记录行走路径和障碍信息

三、模块化代码实现

主程序文件:obstacle_detection_system.py

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

"""

视障人士防撞预警系统

实时检测前方障碍物,语音预警避免碰撞

版本:2.0.0

作者:无障碍智能助手

"""

import os

import sys

import time

import threading

import queue

import json

import math

from datetime import datetime

from typing import Dict, List, Tuple, Optional, Any

from dataclasses import dataclass, asdict

from enum import Enum

import warnings

warnings.filterwarnings('ignore')

# 硬件接口

try:

import RPi.GPIO as GPIO

RASPBERRY_PI_AVAILABLE = True

except ImportError:

RASPBERRY_PI_AVAILABLE = False

print("提示: 非树莓派平台,使用模拟传感器")

try:

import numpy as np

NUMPY_AVAILABLE = True

except ImportError:

NUMPY_AVAILABLE = False

print("警告: numpy未安装,部分功能受限")

# 音频处理

try:

import pyaudio

import wave

import simpleaudio as sa

AUDIO_AVAILABLE = True

except ImportError:

AUDIO_AVAILABLE = False

print("警告: 音频模块未安装,语音提示功能受限")

# 语音合成

try:

import pyttsx3

TTS_AVAILABLE = True

except ImportError:

TTS_AVAILABLE = False

print("警告: 语音合成未安装,使用蜂鸣器提示")

class AlertLevel(Enum):

"""警报级别枚举"""

SAFE = 0 # 安全

CAUTION = 1 # 注意

WARNING = 2 # 警告

DANGER = 3 # 危险

CRITICAL = 4 # 紧急

@dataclass

class DetectionZone:

"""探测区域定义"""

name: str

angle_start: float # 起始角度(度)

angle_end: float # 结束角度(度)

distance_min: float # 最小探测距离(米)

distance_max: float # 最大探测距离(米)

alert_thresholds: Dict[AlertLevel, float] # 各级别警报阈值

@dataclass

class Obstacle:

"""障碍物信息"""

timestamp: float

distance: float # 距离(米)

angle: float # 角度(度)

width: float # 宽度(米,估计值)

height: float # 高度(米,估计值)

confidence: float # 置信度

alert_level: AlertLevel

position_x: float # 相对位置X

position_y: float # 相对位置Y

class SensorInterface:

"""传感器接口基类"""

def __init__(self, config: Dict):

"""

初始化传感器

Args:

config: 传感器配置

"""

self.config = config

self.is_connected = False

self.calibration_data = {}

def connect(self) -> bool:

"""连接传感器"""

raise NotImplementedError

def disconnect(self) -> None:

"""断开连接"""

raise NotImplementedError

def get_distance(self, angle: float = 0.0) -> Optional[float]:

"""

获取指定角度的距离

Args:

angle: 探测角度(度)

Returns:

距离(米),None表示无效

"""

raise NotImplementedError

def scan_sector(self, start_angle: float, end_angle: float,

step: float = 1.0) -> List[Tuple[float, float]]:

"""

扫描扇形区域

Args:

start_angle: 起始角度

end_angle: 结束角度

step: 扫描步长

Returns:

角度-距离对列表

"""

raise NotImplementedError

def calibrate(self) -> Dict:

"""校准传感器"""

raise NotImplementedError

class UltrasonicSensor(SensorInterface):

"""超声波传感器实现"""

def __init__(self, config: Dict):

"""

初始化超声波传感器

Args:

config: 配置字典

trigger_pin: 触发引脚

echo_pin: 回声引脚

max_distance: 最大探测距离

temperature: 环境温度(用于声速校准)

"""

super().__init__(config)

self.trigger_pin = config.get('trigger_pin', 23)

self.echo_pin = config.get('echo_pin', 24)

self.max_distance = config.get('max_distance', 4.0) # 米

self.temperature = config.get('temperature', 25.0) # 摄氏度

# 计算声速(米/秒)

self.sound_speed = 331.3 + 0.606 * self.temperature

# 伺服控制(如果支持扫描)

self.servo_pin = config.get('servo_pin')

self.current_angle = 0.0

self.setup_pins()

def setup_pins(self):

"""设置GPIO引脚"""

if RASPBERRY_PI_AVAILABLE:

GPIO.setmode(GPIO.BCM)

GPIO.setup(self.trigger_pin, GPIO.OUT)

GPIO.setup(self.echo_pin, GPIO.IN)

GPIO.output(self.trigger_pin, GPIO.LOW)

if self.servo_pin:

GPIO.setup(self.servo_pin, GPIO.OUT)

self.servo = GPIO.PWM(self.servo_pin, 50) # 50Hz PWM

self.servo.start(0)

def connect(self) -> bool:

"""连接传感器"""

try:

# 测试传感器

distance = self.get_distance()

if distance and 0 < distance <= self.max_distance:

self.is_connected = True

print("超声波传感器连接成功")

return True

except Exception as e:

print(f"超声波传感器连接失败: {e}")

self.is_connected = False

return False

def disconnect(self) -> None:

"""断开连接"""

if RASPBERRY_PI_AVAILABLE:

if hasattr(self, 'servo'):

self.servo.stop()

GPIO.cleanup()

self.is_connected = False

def set_angle(self, angle: float) -> bool:

"""

设置伺服角度

Args:

angle: 目标角度(-90到90度)

Returns:

是否成功

"""

if not self.servo_pin or not RASPBERRY_PI_AVAILABLE:

return False

# 限制角度范围

angle = max(-90, min(90, angle))

# 转换为PWM占空比(假设伺服角度范围:0.5ms-2.5ms对应0-180度)

duty_cycle = 2.5 + (angle + 90) * (10.0 / 180.0) # 映射到2.5-12.5%

self.servo.ChangeDutyCycle(duty_cycle)

time.sleep(0.1) # 给伺服时间转动

self.servo.ChangeDutyCycle(0) # 停止PWM信号防止抖动

self.current_angle = angle

return True

def get_distance(self, angle: float = 0.0) -> Optional[float]:

"""

获取指定角度的距离

Args:

angle: 探测角度

Returns:

距离(米),None表示无效

"""

if not self.is_connected and not self.connect():

return None

# 设置角度

if angle != self.current_angle:

self.set_angle(angle)

if RASPBERRY_PI_AVAILABLE:

try:

# 发送触发脉冲

GPIO.output(self.trigger_pin, GPIO.HIGH)

time.sleep(0.00001) # 10微秒

GPIO.output(self.trigger_pin, GPIO.LOW)

# 等待回声引脚变高

start_time = time.time()

while GPIO.input(self.echo_pin) == GPIO.LOW:

if time.time() - start_time > 0.1: # 超时100ms

return None

start_time = time.time()

pulse_start = time.time()

# 等待回声引脚变低

while GPIO.input(self.echo_pin) == GPIO.HIGH:

pulse_end = time.time()

if pulse_end - pulse_start > 0.1: # 超时100ms

return None

# 计算距离

pulse_duration = pulse_end - pulse_start

distance = (pulse_duration * self.sound_speed) / 2

# 有效性检查

if 0.02 <= distance <= self.max_distance: # 2cm到最大距离

return distance

except Exception as e:

print(f"超声波测距错误: {e}")

else:

# 模拟模式,用于测试

import random

time.sleep(0.05) # 模拟测量延迟

# 模拟障碍物:距离随角度变化

base_distance = 2.0

angle_factor = 1.0 + abs(angle) / 90.0 # 角度越大,距离越小

noise = random.uniform(-0.1, 0.1)

distance = base_distance / angle_factor + noise

distance = max(0.1, min(self.max_distance, distance))

return distance

return None

def scan_sector(self, start_angle: float, end_angle: float,

step: float = 1.0) -> List[Tuple[float, float]]:

"""

扫描扇形区域

Args:

start_angle: 起始角度

end_angle: 结束角度

step: 扫描步长

Returns:

角度-距离对列表

"""

measurements = []

if start_angle > end_angle:

start_angle, end_angle = end_angle, start_angle

current_angle = start_angle

while current_angle <= end_angle:

distance = self.get_distance(current_angle)

if distance:

measurements.append((current_angle, distance))

current_angle += step

time.sleep(0.05) # 防止扫描过快

return measurements

def calibrate(self) -> Dict:

"""校准传感器"""

print("开始超声波传感器校准...")

calibration_results = {

'temperature': self.temperature,

'sound_speed': self.sound_speed,

'measurements': []

}

# 测量已知距离

test_distances = [0.5, 1.0, 1.5, 2.0, 2.5, 3.0]

for known_distance in test_distances:

print(f"请将障碍物放置在 {known_distance} 米处,按Enter继续...")

input()

measurements = []

for _ in range(5):

distance = self.get_distance(0)

if distance:

measurements.append(distance)

time.sleep(0.1)

if measurements:

avg_measured = sum(measurements) / len(measurements)

error = avg_measured - known_distance

calibration_results['measurements'].append({

'known': known_distance,

'measured': avg_measured,

'error': error,

'error_percent': (error / known_distance) * 100

})

print(f"已知距离: {known_distance:.2f}m, 测量: {avg_measured:.2f}m, "

f"误差: {error:.3f}m ({error/known_distance*100:.1f}%)")

# 计算校准系数

if calibration_results['measurements']:

total_error = sum(m['error'] for m in calibration_results['measurements'])

avg_error = total_error / len(calibration_results['measurements'])

calibration_results['calibration_offset'] = -avg_error

calibration_results['calibration_factor'] = 1.0

print(f"校准完成,建议偏移量: {-avg_error:.3f}m")

return calibration_results

class RadarSensor(SensorInterface):

"""毫米波雷达传感器(模拟)"""

def __init__(self, config: Dict):

super().__init__(config)

# 雷达特定初始化

self.resolution = config.get('resolution', 0.05) # 距离分辨率

self.fov = config.get('fov', 120) # 视场角

self.connected = False

def connect(self) -> bool:

"""连接雷达"""

# 模拟连接

time.sleep(0.5)

self.connected = True

print("雷达传感器连接成功(模拟)")

return True

def get_distance(self, angle: float = 0.0) -> Optional[float]:

"""雷达测距"""

if not self.connected:

return None

# 模拟雷达数据

time.sleep(0.01)

return 1.5 + 0.5 * math.sin(time.time()) # 模拟动态距离

class DepthCameraSensor(SensorInterface):

"""深度摄像头传感器(模拟)"""

def __init__(self, config: Dict):

super().__init__(config)

self.width = config.get('width', 640)

self.height = config.get('height', 480)

self.connected = False

def connect(self) -> bool:

"""连接深度摄像头"""

# 模拟连接

time.sleep(0.5)

self.connected = True

print("深度摄像头连接成功(模拟)")

return True

def get_distance(self, angle: float = 0.0) -> Optional[float]:

"""深度摄像头测距"""

if not self.connected:

return None

# 模拟深度数据

time.sleep(0.02)

return 2.0 + 0.3 * math.cos(time.time() * 2) # 模拟动态距离

class SensorManager:

"""传感器管理器(支持多传感器融合)"""

def __init__(self, config: Dict):

"""

初始化传感器管理器

Args:

config: 传感器配置

"""

self.config = config

self.sensors = {}

self.data_history = []

self.history_size = 100

# 初始化传感器

self.init_sensors()

def init_sensors(self):

"""初始化所有传感器"""

sensor_configs = self.config.get('sensors', {})

# 超声波传感器

if 'ultrasonic' in sensor_configs and sensor_configs['ultrasonic'].get('enabled', True):

try:

self.sensors['ultrasonic'] = UltrasonicSensor(

sensor_configs['ultrasonic']

)

print("超声波传感器初始化完成")

except Exception as e:

print(f"超声波传感器初始化失败: {e}")

# 雷达传感器

if 'radar' in sensor_configs and sensor_configs['radar'].get('enabled', False):

try:

self.sensors['radar'] = RadarSensor(sensor_configs['radar'])

print("雷达传感器初始化完成")

except Exception as e:

print(f"雷达传感器初始化失败: {e}")

# 深度摄像头

if 'depth_camera' in sensor_configs and sensor_configs['depth_camera'].get('enabled', False):

try:

self.sensors['depth_camera'] = DepthCameraSensor(sensor_configs['depth_camera'])

print("深度摄像头初始化完成")

except Exception as e:

print(f"深度摄像头初始化失败: {e}")

if not self.sensors:

print("警告: 没有可用的传感器,使用模拟传感器")

self.sensors['simulated'] = UltrasonicSensor({

'trigger_pin': None,

'echo_pin': None,

'max_distance': 4.0

})

def connect_all(self) -> bool:

"""连接所有传感器"""

connected = False

for name, sensor in self.sensors.items():

if sensor.connect():

connected = True

print(f"{name}传感器连接成功")

else:

print(f"{name}传感器连接失败")

return connected

def get_fused_distance(self, angle: float = 0.0) -> Optional[float]:

"""

获取融合后的距离数据

Args:

angle: 探测角度

Returns:

融合后的距离

"""

distances = []

weights = []

for name, sensor in self.sensors.items():

if sensor.is_connected:

distance = sensor.get_distance(angle)

if distance:

distances.append(distance)

# 根据传感器类型设置权重

if name == 'ultrasonic':

weight = 0.6

elif name == 'radar':

weight = 0.8

elif name == 'depth_camera':

weight = 1.0

else:

weight = 0.5

weights.append(weight)

if not distances:

return None

# 加权平均

weighted_sum = sum(d * w for d, w in zip(distances, weights))

total_weight = sum(weights)

fused_distance = weighted_sum / total_weight

# 记录到历史数据

self.data_history.append({

'timestamp': time.time(),

'angle': angle,

'distance': fused_distance,

'sensor_readings': distances

})

# 限制历史数据大小

if len(self.data_history) > self.history_size:

self.data_history.pop(0)

return fused_distance

def scan_fused_sector(self, start_angle: float, end_angle: float,

step: float = 1.0) -> List[Tuple[float, float]]:

"""

扫描融合扇形区域

Args:

start_angle: 起始角度

end_angle: 结束角度

step: 扫描步长

Returns:

角度-距离对列表

"""

measurements = []

if start_angle > end_angle:

start_angle, end_angle = end_angle, start_angle

# 使用主传感器扫描

main_sensor = next(iter(self.sensors.values()))

current_angle = start_angle

while current_angle <= end_angle:

distance = self.get_fused_distance(current_angle)

if distance:

measurements.append((current_angle, distance))

current_angle += step

return measurements

def disconnect_all(self):

"""断开所有传感器"""

for sensor in self.sensors.values():

sensor.disconnect()

class ObstacleDetector:

"""障碍物检测器"""

def __init__(self, config: Dict):

"""

初始化障碍物检测器

Args:

config: 检测器配置

"""

self.config = config

self.zones = self.setup_detection_zones()

self.obstacle_history = []

self.history_size = 50

# 滤波参数

self.filter_window = config.get('filter_window', 5)

self.distance_history = {}

def setup_detection_zones(self) -> List[DetectionZone]:

"""设置探测区域"""

zones_config = self.config.get('zones', [

{

'name': 'center',

'angle_start': -15,

'angle_end': 15,

'distance_min': 0.1,

'distance_max': 4.0,

'alert_thresholds': {

AlertLevel.SAFE: 2.0,

AlertLevel.CAUTION: 1.5,

AlertLevel.WARNING: 1.0,

AlertLevel.DANGER: 0.5,

AlertLevel.CRITICAL: 0.3

}

},

{

'name': 'left',

'angle_start': -60,

'angle_end': -15,

'distance_min': 0.1,

'distance_max': 3.0,

'alert_thresholds': {

AlertLevel.SAFE: 1.5,

AlertLevel.CAUTION: 1.0,

AlertLevel.WARNING: 0.7,

AlertLevel.DANGER: 0.4,

AlertLevel.CRITICAL: 0.25

}

},

{

'name': 'right',

'angle_start': 15,

'angle_end': 60,

'distance_min': 0.1,

'distance_max': 3.0,

'alert_thresholds': {

AlertLevel.SAFE: 1.5,

AlertLevel.CAUTION: 1.0,

AlertLevel.WARNING: 0.7,

AlertLevel.DANGER: 0.4,

AlertLevel.CRITICAL: 0.25

}

}

])

zones = []

for zone_config in zones_config:

thresholds = {

AlertLevel[key.upper()]: value

for key, value in zone_config['alert_thresholds'].items()

}

zone = DetectionZone(

name=zone_config['name'],

angle_start=zone_config['angle_start'],

angle_end=zone_config['angle_end'],

distance_min=zone_config['distance_min'],

distance_max=zone_config['distance_max'],

alert_thresholds=thresholds

)

zones.append(zone)

return zones

def apply_filter(self, angle: float, distance: float) -> float:

"""

应用滤波器去除噪声

Args:

如果你觉得这个工具好用,欢迎关注我!

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/14 12:28:55

LaTeX公式自动校对:VibeThinker辅助数学文档编写

LaTeX公式自动校对&#xff1a;VibeThinker辅助数学文档编写 在撰写学术论文、算法推导或竞赛讲义时&#xff0c;你是否曾因一个不等式方向写反、某个引理遗漏而被审稿人反复质疑&#xff1f;又或者&#xff0c;在完成一页看似严谨的LaTeX推导后&#xff0c;自己回看时却发现“…

作者头像 李华
网站建设 2026/4/14 12:43:44

Docker容器启动后立即停止?破解Exited (0)状态之谜(附8种解决方案)

第一章&#xff1a;Docker容器运行状态概述Docker 容器在其生命周期中会经历多种运行状态&#xff0c;这些状态反映了容器当前所处的执行阶段。了解这些状态有助于快速诊断问题、优化资源调度以及实现自动化运维管理。容器的主要运行状态 created&#xff1a;容器已创建但尚未启…

作者头像 李华
网站建设 2026/4/12 15:36:32

Codeforces模拟赛AI辅助:VibeThinker提供算法策略建议

VibeThinker&#xff1a;小模型如何在算法竞赛中实现“降维打击” 在一场紧张的Codeforces模拟赛中&#xff0c;你卡在了一道Div.2 C题——树上每个节点都有颜色&#xff0c;要求统计每棵子树中不同颜色的数量。时间一分一秒流逝&#xff0c;思路迟迟无法成型。这时&#xff0c…

作者头像 李华
网站建设 2026/3/27 12:06:54

(Docker Compose版本兼容性全解析):从开发到部署的避坑手册

第一章&#xff1a;Docker Compose版本适配概述在使用 Docker Compose 管理多容器应用时&#xff0c;不同版本的 Compose 文件格式与 Docker 引擎之间存在兼容性要求。正确选择并适配 Compose 版本&#xff0c;是确保应用顺利部署和运行的关键前提。版本兼容性说明 Docker Comp…

作者头像 李华
网站建设 2026/3/26 5:38:55

零基础也能懂:全加器布尔表达式解析

从零开始搞懂全加器&#xff1a;不只是“112”的背后逻辑你有没有想过&#xff0c;计算机到底是怎么算数的&#xff1f;我们随手敲下5 3&#xff0c;屏幕立刻显示8。这看似简单的过程&#xff0c;其实背后藏着一套精密的数字电路机制——而这一切的起点&#xff0c;就是全加器…

作者头像 李华
网站建设 2026/4/14 11:05:04

Bilibili科普视频创意:用动画讲解VibeThinker技术亮点

VibeThinker-1.5B&#xff1a;小模型如何破解高难度数学与编程题&#xff1f; 在AI狂飙突进的今天&#xff0c;千亿参数大模型似乎成了“智能”的代名词。但你有没有想过——一个只有15亿参数的小模型&#xff0c;也能解出AIME&#xff08;美国数学邀请赛&#xff09;级别的难题…

作者头像 李华