地铁客流智能预警与引导系统
一、实际应用场景与痛点
应用场景
现代化都市地铁系统的客流管理:
1. 早晚高峰时段:
- 通勤高峰期(7:00-9:00, 17:00-19:00)
- 节假日前夕
- 大型活动散场时段
2. 关键站点:
- 交通枢纽站(火车站、机场、长途汽车站)
- 商业中心站
- 换乘站
- 旅游景点站
3. 车厢分布不均:
- 车头车尾拥挤,中间车厢较空
- 不同线路车厢拥挤度差异
- 不同时间段客流变化
4. 特殊事件:
- 临时封站
- 设备故障
- 恶劣天气
- 疫情防控需求
痛点分析
1. 乘车体验差:
- 高峰期拥挤不堪,舒适度低
- 车厢内空气不流通
- 站立空间不足,安全隐患
- 上下车困难
2. 运营效率低:
- 站台客流积压
- 列车停站时间延长
- 乘客上车困难导致延误
- 线路运行不均衡
3. 安全隐患:
- 超载运行风险
- 紧急疏散困难
- 拥挤踩踏风险
- 疫情防控难度大
4. 信息不对称:
- 乘客不知前方车厢拥挤情况
- 无法预知换乘站客流
- 缺少实时引导信息
- 乘客选择盲目
5. 资源浪费:
- 部分车厢过度拥挤
- 部分车厢空间闲置
- 能源消耗不均衡
- 设备利用率低
二、核心逻辑讲解
系统架构
基于"感知-预测-决策-引导"的闭环控制系统:
数据采集层 → 数据处理层 → 智能分析层 → 决策控制层 → 执行引导层
│ │ │ │ │
传感器网络 数据融合 客流预测 调度决策 乘客引导
视频监控 异常检测 拥挤评估 预警分级 信息发布
票务系统 特征提取 趋势分析 资源分配 动态调节
时空关联 模式识别 优化算法 反馈调整
核心算法原理
1. 多源数据融合模型
综合拥挤度 = w1×视频分析 + w2×传感器 + w3×票务 + w4×WIFI
其中:w1+w2+w3+w4=1,动态调整权重
2. 时空客流预测模型
- 时间维度:分钟、小时、日、周、季节
- 空间维度:车站、车厢、通道、出入口
- 影响因素:天气、节假日、大型活动、突发事件
3. 智能预警分级
等级1(绿色):舒适 ≤ 2人/㎡
等级2(黄色):较舒适 2-4人/㎡
等级3(橙色):拥挤 4-6人/㎡
等级4(红色):严重拥挤 >6人/㎡
等级5(黑色):危险 >8人/㎡
4. 动态引导优化
目标函数:min(∑拥挤度方差) + α×∑换乘时间 + β×∑等车时间
约束条件:安全容量、舒适度阈值、运营规范
优化目标函数
总效益 = α·乘客舒适度 + β·运营效率 + γ·安全系数 - δ·引导成本
约束条件:乘客等待时间 < 阈值,换乘便利性 > 阈值,能耗 < 限额
三、模块化代码实现
项目结构
smart_subway_crowd/
│
├── main.py # 主程序入口
├── config.py # 配置文件
├── requirements.txt # 依赖包
├── setup.py # 安装脚本
│
├── core/ # 核心算法模块
│ ├── __init__.py
│ ├── data_fusion.py # 数据融合模块
│ ├── crowd_predictor.py # 客流预测模块
│ ├── alert_engine.py # 预警引擎
│ ├── guidance_optimizer.py # 引导优化器
│ └── performance_analyzer.py # 性能分析
│
├── sensors/ # 传感器模块
│ ├── __init__.py
│ ├── video_analyzer.py # 视频分析
│ ├── weight_sensor.py # 重量传感器
│ ├── wifi_scanner.py # WIFI探测
│ ├── infrared_counter.py # 红外计数
│ └── smart_camera.py # 智能相机
│
├── models/ # 数据模型
│ ├── __init__.py
│ ├── subway_station.py # 车站模型
│ ├── subway_train.py # 列车模型
│ ├── passenger_flow.py # 客流模型
│ └── guidance_info.py # 引导信息模型
│
├── display/ # 显示引导模块
│ ├── __init__.py
│ ├── led_display.py # LED显示屏
│ ├── mobile_app.py # 手机APP接口
│ ├── audio_announce.py # 语音播报
│ └── indicator_lights.py # 指示灯
│
├── utils/ # 工具函数
│ ├── __init__.py
│ ├── time_utils.py # 时间工具
│ ├── spatial_utils.py # 空间工具
│ ├── data_utils.py # 数据工具
│ └── logging_utils.py # 日志工具
│
├── database/ # 数据库
│ ├── __init__.py
│ ├── realtime_db.py # 实时数据库
│ ├── history_db.py # 历史数据库
│ └── config_db.py # 配置数据库
│
├── simulation/ # 仿真模块
│ ├── __init__.py
│ ├── passenger_simulator.py # 乘客仿真
│ ├── train_simulator.py # 列车仿真
│ └── station_simulator.py # 车站仿真
│
├── web_dashboard/ # Web管理界面
│ ├── app.py
│ ├── templates/
│ └── static/
│
└── tests/ # 测试文件
├── __init__.py
├── test_crowd_predictor.py
└── test_guidance_optimizer.py
核心代码实现
main.py
#!/usr/bin/env python3
"""
地铁客流智能预警与引导系统
基于智能控制的客流监测与引导系统
"""
import os
import sys
import time
import json
import logging
import threading
import queue
import signal
import atexit
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Any
import numpy as np
import pandas as pd
from dataclasses import asdict
import uuid
# 导入自定义模块
from config import SystemConfig
from models.subway_station import SubwayStation
from models.subway_train import SubwayTrain
from models.passenger_flow import PassengerFlow
from models.guidance_info import GuidanceInfo
from core.data_fusion import DataFusionEngine
from core.crowd_predictor import CrowdPredictor
from core.alert_engine import AlertEngine
from core.guidance_optimizer import GuidanceOptimizer
from core.performance_analyzer import PerformanceAnalyzer
from sensors.video_analyzer import VideoAnalyzer
from sensors.weight_sensor import WeightSensor
from sensors.wifi_scanner import WifiScanner
from sensors.infrared_counter import InfraredCounter
from display.led_display import LEDDisplay
from display.mobile_app import MobileAppInterface
from display.audio_announce import AudioAnnouncer
from utils.logging_utils import setup_logging
from database.realtime_db import RealtimeDatabase
from database.history_db import HistoryDatabase
from simulation.passenger_simulator import PassengerSimulator
# 设置日志
logger = setup_logging()
# 全局变量
running = False
system_threads = []
class SmartSubwayCrowdSystem:
"""地铁客流智能预警系统"""
def __init__(self, config_path: str = 'config.yaml'):
"""
初始化系统
Args:
config_path: 配置文件路径
"""
# 加载配置
self.config = SystemConfig(config_path)
# 初始化数据库
self.realtime_db = RealtimeDatabase(self.config.REALTIME_DB_PATH)
self.history_db = HistoryDatabase(self.config.HISTORY_DB_PATH)
# 初始化车站模型
self.stations = self._initialize_stations()
# 初始化列车模型
self.trains = self._initialize_trains()
# 初始化传感器
self.sensors = self._initialize_sensors()
# 初始化显示设备
self.displays = self._initialize_displays()
# 初始化核心算法模块
self.data_fusion = DataFusionEngine(self.config)
self.crowd_predictor = CrowdPredictor(self.config)
self.alert_engine = AlertEngine(self.config)
self.guidance_optimizer = GuidanceOptimizer(self.config)
self.performance_analyzer = PerformanceAnalyzer()
# 初始化仿真器(用于测试和开发)
if self.config.ENABLE_SIMULATION:
self.simulator = PassengerSimulator(self.config)
# 消息队列
self.sensor_queue = queue.Queue() # 传感器数据队列
self.alert_queue = queue.Queue() # 预警队列
self.guidance_queue = queue.Queue() # 引导队列
self.display_queue = queue.Queue() # 显示队列
# 线程控制
self.running = False
self.sensor_thread = None
self.fusion_thread = None
self.alert_thread = None
self.guidance_thread = None
self.display_thread = None
# 统计信息
self.stats = {
'start_time': datetime.now(),
'total_passengers': 0,
'peak_passengers': 0,
'alert_count': 0,
'guidance_count': 0,
'avg_crowd_density': 0.0,
'avg_wait_time': 0.0,
'satisfaction_score': 0.0,
'efficiency_score': 0.0
}
# 预警历史
self.alert_history = []
# 引导历史
self.guidance_history = []
# 加载历史数据
self._load_history_data()
# 训练预测模型
self._train_predictor()
# 注册退出处理
atexit.register(self.cleanup)
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
logger.info("地铁客流智能预警系统初始化完成")
def _initialize_stations(self) -> Dict[str, SubwayStation]:
"""初始化车站"""
stations = {}
try:
# 从数据库加载车站配置
station_configs = self.realtime_db.get_station_configs()
for config in station_configs:
station = SubwayStation.from_config(config)
stations[station.station_id] = station
logger.info(f"初始化车站: {station.name} ({station.station_id})")
# 如果没有配置,创建示例车站
if not stations:
logger.warning("无车站配置,创建示例车站")
# 创建示例线路
example_stations = [
{
'station_id': 'S001',
'name': '人民广场站',
'line': '1号线',
'platforms': ['P1', 'P2'],
'capacity': 5000,
'critical_points': ['A口', 'B口', 'C口', 'D口']
},
{
'station_id': 'S002',
'name': '南京东路站',
'line': '2号线',
'platforms': ['P1', 'P2'],
'capacity': 4000,
'critical_points': ['1口', '2口', '3口', '4口']
}
]
for config in example_stations:
station = SubwayStation.from_config(config)
stations[station.station_id] = station
logger.info(f"创建示例车站: {station.name}")
except Exception as e:
logger.error(f"初始化车站失败: {e}")
return stations
def _initialize_trains(self) -> Dict[str, SubwayTrain]:
"""初始化列车"""
trains = {}
try:
# 从数据库加载列车配置
train_configs = self.realtime_db.get_train_configs()
for config in train_configs:
train = SubwayTrain.from_config(config)
trains[train.train_id] = train
logger.info(f"初始化列车: {train.train_id} ({train.line})")
# 如果没有配置,创建示例列车
if not trains:
logger.warning("无列车配置,创建示例列车")
# 创建示例列车
for i in range(5):
train_id = f'T{100 + i:03d}'
train = SubwayTrain(
train_id=train_id,
line='1号线',
direction='上行',
car_count=6,
capacity_per_car=200,
current_station='S001',
next_station='S002',
arrival_time=datetime.now() + timedelta(minutes=i*2)
)
trains[train_id] = train
except Exception as e:
logger.error(f"初始化列车失败: {e}")
return trains
def _initialize_sensors(self) -> Dict[str, Any]:
"""初始化传感器"""
sensors = {}
try:
# 视频分析器
if self.config.ENABLE_VIDEO_ANALYSIS:
video_config = {
'camera_ips': self.config.CAMERA_IPS,
'analysis_interval': self.config.VIDEO_INTERVAL
}
sensors['video'] = VideoAnalyzer(video_config)
logger.info("视频分析器初始化完成")
# 重量传感器
if self.config.ENABLE_WEIGHT_SENSOR:
weight_config = {
'sensor_ids': self.config.WEIGHT_SENSOR_IDS,
'sampling_rate': self.config.WEIGHT_SAMPLING_RATE
}
sensors['weight'] = WeightSensor(weight_config)
logger.info("重量传感器初始化完成")
# WIFI扫描器
if self.config.ENABLE_WIFI_SCAN:
wifi_config = {
'ap_macs': self.config.WIFI_AP_MACS,
'scan_interval': self.config.WIFI_INTERVAL
}
sensors['wifi'] = WifiScanner(wifi_config)
logger.info("WIFI扫描器初始化完成")
# 红外计数器
if self.config.ENABLE_INFRARED:
infrared_config = {
'counter_ids': self.config.INFRARED_IDS,
'counting_zones': self.config.COUNTING_ZONES
}
sensors['infrared'] = InfraredCounter(infrared_config)
logger.info("红外计数器初始化完成")
except Exception as e:
logger.error(f"初始化传感器失败: {e}")
return sensors
def _initialize_displays(self) -> Dict[str, Any]:
"""初始化显示设备"""
displays = {}
try:
# LED显示屏
if self.config.ENABLE_LED_DISPLAY:
led_config = {
'display_ips': self.config.LED_DISPLAY_IPS,
'refresh_rate': self.config.LED_REFRESH_RATE
}
displays['led'] = LEDDisplay(led_config)
logger.info("LED显示屏初始化完成")
# 手机APP接口
if self.config.ENABLE_MOBILE_APP:
app_config = {
'api_url': self.config.APP_API_URL,
'update_interval': self.config.APP_UPDATE_INTERVAL
}
displays['mobile'] = MobileAppInterface(app_config)
logger.info("手机APP接口初始化完成")
# 语音播报
if self.config.ENABLE_AUDIO:
audio_config = {
'speaker_ips': self.config.SPEAKER_IPS,
'volume_levels': self.config.VOLUME_LEVELS
}
displays['audio'] = AudioAnnouncer(audio_config)
logger.info("语音播报初始化完成")
except Exception as e:
logger.error(f"初始化显示设备失败: {e}")
return displays
def _load_history_data(self):
"""加载历史数据"""
try:
# 加载历史客流数据
history_data = self.history_db.get_history_passenger_data(
days=self.config.HISTORY_DAYS
)
if history_data:
logger.info(f"加载历史数据: {len(history_data)} 条记录")
else:
logger.warning("无历史数据可用")
except Exception as e:
logger.error(f"加载历史数据失败: {e}")
def _train_predictor(self):
"""训练预测器"""
try:
# 获取训练数据
train_data = self.history_db.get_training_data()
if train_data and len(train_data) >= self.config.MIN_TRAINING_SAMPLES:
self.crowd_predictor.train(train_data)
logger.info(f"客流预测器训练完成,样本数: {len(train_data)}")
else:
logger.warning(f"训练样本不足: {len(train_data) if train_data else 0},使用默认模型")
except Exception as e:
logger.error(f"训练预测器失败: {e}")
def _sensor_collection_thread(self):
"""传感器数据采集线程"""
logger.info("传感器数据采集线程启动")
last_collection_time = {}
while self.running:
try:
current_time = time.time()
# 采集各传感器数据
for sensor_type, sensor in self.sensors.items():
# 控制采集频率
interval = self.config.SENSOR_INTERVALS.get(sensor_type, 1.0)
last_time = last_collection_time.get(sensor_type, 0)
if current_time - last_time >= interval:
try:
# 采集数据
sensor_data = sensor.collect_data()
if sensor_data:
# 添加时间戳
sensor_data['timestamp'] = datetime.now()
sensor_data['sensor_type'] = sensor_type
# 放入队列
self.sensor_queue.put(sensor_data)
logger.debug(f"采集{sensor_type}数据: {len(sensor_data.get('data', []))} 条")
except Exception as e:
logger.error(f"采集{sensor_type}数据失败: {e}")
# 更新最后采集时间
last_collection_time[sensor_type] = current_time
# 控制采集频率
time.sleep(0.1) # 100ms
except Exception as e:
logger.error(f"传感器采集线程错误: {e}")
time.sleep(1)
def _data_fusion_thread(self):
"""数据融合线程"""
logger.info("数据融合线程启动")
# 数据缓冲区
data_buffer = {}
last_fusion_time = 0
while self.running:
try:
current_time = time.time()
# 从队列获取数据
while not self.sensor_queue.empty():
try:
sensor_data = self.sensor_queue.get_nowait()
# 添加到缓冲区
sensor_type = sensor_data.get('sensor_type', 'unknown')
timestamp = sensor_data.get('timestamp')
if sensor_type not in data_buffer:
data_buffer[sensor_type] = []
data_buffer[sensor_type].append(sensor_data)
except queue.Empty:
break
# 定期进行数据融合
if current_time - last_fusion_time >= self.config.FUSION_INTERVAL:
if data_buffer:
# 执行数据融合
fusion_result = self.data_fusion.fuse(data_buffer)
if fusion_result:
# 更新车站和列车状态
self._update_states_from_fusion(fusion_result)
# 保存到数据库
self._save_fusion_result(fusion_result)
logger.debug(f"数据融合完成,车站数: {len(fusion_result.get('stations', {}))}")
# 清空缓冲区
data_buffer.clear()
last_fusion_time = current_time
time.sleep(0.1)
except Exception as e:
logger.error(f"数据融合线程错误: {e}")
time.sleep(1)
def _update_states_from_fusion(self, fusion_result: Dict):
"""从融合结果更新状态"""
try:
# 更新车站状态
station_data = fusion_result.get('stations', {})
for station_id, station_info in station_data.items():
if station_id in self.stations:
station = self.stations[station_id]
# 更新车站客流
if 'passenger_count' in station_info:
station.update_passenger_count(
station_info['passenger_count'],
station_info.get('timestamp', datetime.now())
)
# 更新关键点拥挤度
if 'critical_points' in station_info:
station.update_critical_points(station_info['critical_points'])
# 更新列车状态
train_data = fusion_result.get('trains', {})
for train_id, train_info in train_data.items():
if train_id in self.trains:
train = self.trains[train_id]
# 更新车厢拥挤度
if 'car_crowding' in train_info:
train.update_car_crowding(train_info['car_crowding'])
# 更新位置信息
if 'current_station' in train_info:
train.current_station = train_info['current_station']
if 'next_station' in train_info:
train.next_station = train_info['next_station']
if 'arrival_time' in train_info:
train.arrival_time = train_info['arrival_time']
except Exception as e:
logger.error(f"更新状态失败: {e}")
def _save_fusion_result(self, fusion_result: Dict):
"""保存融合结果到数据库"""
try:
如果你觉得这个工具好用,欢迎关注我!