news 2026/4/16 18:22:07

拼车匹配程序,根据出发时间,路线相似度,匹配最优拼车伙伴,分摊费用。

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
拼车匹配程序,根据出发时间,路线相似度,匹配最优拼车伙伴,分摊费用。

智能拼车匹配系统设计与实现

一、实际应用场景与痛点分析

应用场景

随着城市化进程加快,交通拥堵和出行成本成为城市居民的重要关切。拼车作为一种经济、环保的出行方式,能够有效减少车辆上路、缓解交通压力、降低出行成本。本系统面向需要日常通勤、长途出行、特殊时段用车的用户,通过智能匹配算法为用户找到最优拼车伙伴。

主要痛点

1. 匹配效率低 - 传统拼车群靠人工匹配,效率低下

2. 路线不优化 - 简单的起点终点匹配,未考虑路线相似度

3. 时间不灵活 - 缺乏对时间弹性的智能处理

4. 费用不公 - 费用分摊缺乏科学计算方法

5. 安全担忧 - 缺乏信誉评价和行程验证

6. 体验差 - 沟通成本高,行程变更处理困难

7. 供需不平衡 - 高峰时段供不应求,平峰时段无人拼车

二、核心逻辑与智能控制原理

系统架构

用户层 → 匹配层 → 决策层 → 执行层

↓ ↓ ↓ ↓

注册/发布 → 相似度计算 → 智能匹配 → 费用分摊

行程查询 → 路径规划 → 多目标优化 → 行程管理

核心智能控制原理

1. 模糊控制 - 处理"时间灵活"、"路线相似"等模糊概念

2. 多目标优化 - 平衡时间、成本、舒适度多个目标

3. 专家系统 - 基于交通规则的匹配策略

4. 强化学习 - 根据历史匹配结果优化算法

5. 博弈论 - 费用分摊的公平性计算

三、代码实现

主程序:smart_ride_sharing.py

#!/usr/bin/env python3

"""

智能拼车匹配系统

基于智能控制原理的拼车匹配与费用分摊系统

"""

import json

import datetime

import time

import math

import heapq

import numpy as np

from typing import Dict, List, Tuple, Optional, Any, Set

from dataclasses import dataclass, asdict, field

from enum import Enum

import matplotlib.pyplot as plt

from matplotlib.patches import Circle, FancyArrowPatch

from collections import defaultdict, deque

import heapq

import uuid

import random

import logging

from dataclasses_json import dataclass_json

import os

from scipy.spatial import distance

from scipy.optimize import linear_sum_assignment

import networkx as nx

from sklearn.neighbors import BallTree

import pickle

# 配置日志

logging.basicConfig(

level=logging.INFO,

format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',

handlers=[

logging.FileHandler('ride_sharing.log', encoding='utf-8'),

logging.StreamHandler()

]

)

logger = logging.getLogger(__name__)

class UserRole(Enum):

"""用户角色枚举"""

DRIVER = "司机" # 车主

PASSENGER = "乘客" # 乘客

class TripStatus(Enum):

"""行程状态枚举"""

PENDING = "待匹配" # 等待匹配

MATCHED = "已匹配" # 已找到拼车伙伴

CONFIRMED = "已确认" # 双方确认

ONGOING = "进行中" # 行程进行中

COMPLETED = "已完成" # 行程完成

CANCELLED = "已取消" # 已取消

class VehicleType(Enum):

"""车辆类型枚举"""

SEDAN = "轿车" # 4座

SUV = "SUV" # 5-7座

MPV = "MPV" # 7-9座

ELECTRIC = "电动车" # 4座

class MatchStrategy(Enum):

"""匹配策略枚举"""

TIME_FIRST = "时间优先" # 时间匹配度优先

ROUTE_FIRST = "路线优先" # 路线相似度优先

COST_FIRST = "成本优先" # 成本节约优先

BALANCED = "平衡策略" # 平衡各项指标

@dataclass_json

@dataclass

class Location:

"""地理位置"""

id: str

name: str

latitude: float # 纬度

longitude: float # 经度

address: str = ""

district: str = "" # 行政区

def distance_to(self, other: 'Location') -> float:

"""计算两个位置之间的球面距离(公里)"""

# 使用Haversine公式计算大圆距离

R = 6371.0 # 地球半径,单位:公里

lat1 = math.radians(self.latitude)

lon1 = math.radians(self.longitude)

lat2 = math.radians(other.latitude)

lon2 = math.radians(other.longitude)

dlat = lat2 - lat1

dlon = lon2 - lon1

a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2

c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))

return R * c

def __hash__(self):

return hash(self.id)

@dataclass_json

@dataclass

class User:

"""用户信息"""

user_id: str

name: str

phone: str

role: UserRole

vehicle_type: Optional[VehicleType] = None

license_plate: Optional[str] = None

rating: float = 5.0 # 用户评分 1-5

total_trips: int = 0 # 总行程数

credit_score: int = 100 # 信用分

preferences: Dict[str, Any] = field(default_factory=dict) # 用户偏好

def __post_init__(self):

if self.role == UserRole.DRIVER and not self.vehicle_type:

raise ValueError("司机必须指定车辆类型")

def get_seat_capacity(self) -> int:

"""获取车辆座位数"""

if self.role != UserRole.DRIVER:

return 0

capacities = {

VehicleType.SEDAN: 4,

VehicleType.SUV: 5,

VehicleType.MPV: 7,

VehicleType.ELECTRIC: 4

}

return capacities.get(self.vehicle_type, 4)

def can_take_passengers(self) -> int:

"""可搭载乘客数(司机座位-1)"""

return max(0, self.get_seat_capacity() - 1)

@dataclass_json

@dataclass

class TripRequest:

"""行程请求"""

request_id: str

user_id: str

role: UserRole

start_location: Location

end_location: Location

departure_time: datetime.datetime

arrival_deadline: Optional[datetime.datetime] = None

time_flexibility: float = 0.5 # 时间灵活性 0-1,1表示最灵活

route_flexibility: float = 0.5 # 路线灵活性 0-1

max_detour_ratio: float = 0.3 # 最大绕行比例

preferred_gender: Optional[str] = None

luggage_size: int = 0 # 行李大小 0-3

special_requirements: List[str] = field(default_factory=list)

created_at: datetime.datetime = field(default_factory=datetime.datetime.now)

status: TripStatus = TripStatus.PENDING

def get_time_window(self) -> Tuple[datetime.datetime, datetime.datetime]:

"""获取时间窗口"""

# 根据灵活性计算可接受的时间范围

flexibility_minutes = self.time_flexibility * 60 # 最大60分钟灵活性

earliest = self.departure_time - datetime.timedelta(minutes=flexibility_minutes)

latest = self.departure_time + datetime.timedelta(minutes=flexibility_minutes)

if self.arrival_deadline:

latest = min(latest, self.arrival_deadline)

return earliest, latest

def is_time_compatible(self, other: 'TripRequest',

max_time_diff: float = 30) -> bool:

"""检查时间是否兼容"""

time_diff = abs((self.departure_time - other.departure_time).total_seconds() / 60)

# 考虑双方的灵活性

max_allowed_diff = (self.time_flexibility + other.time_flexibility) * 30

return time_diff <= min(max_time_diff, max_allowed_diff)

class RoutePlanner:

"""

路线规划器

计算最优路径和距离

"""

def __init__(self):

# 模拟的道路网络

self.road_network = self._create_sample_network()

self.travel_speed = 30 # 平均速度 30km/h

def _create_sample_network(self) -> Dict[Tuple[float, float], Dict]:

"""创建示例道路网络"""

# 在实际应用中,这里会使用真实地图数据

network = {}

# 添加一些关键节点

locations = [

(39.9042, 116.4074, "天安门"), # 北京中心

(39.9130, 116.3912, "故宫"),

(39.9096, 116.3972, "王府井"),

(39.9215, 116.3832, "西单"),

(39.9788, 116.3682, "中关村"),

(39.8322, 116.3571, "北京南站"),

(40.0716, 116.3155, "首都机场"),

]

for lat, lon, name in locations:

network[(lat, lon)] = {

'name': name,

'connections': [],

'traffic_factor': 1.0

}

# 添加连接

self._add_connections(network)

return network

def _add_connections(self, network: Dict):

"""添加道路连接"""

nodes = list(network.keys())

# 创建随机连接

for i, (lat1, lon1) in enumerate(nodes):

for j, (lat2, lon2) in enumerate(nodes[i+1:], i+1):

# 计算距离

dist = self._haversine_distance(lat1, lon1, lat2, lon2)

# 如果距离在合理范围内,添加连接

if 2 < dist < 20: # 2-20公里范围内

# 模拟交通状况

traffic_factor = 1.0 + random.uniform(0, 0.5) # 0-50%拥堵

network[(lat1, lon1)]['connections'].append({

'to': (lat2, lon2),

'distance': dist,

'traffic_factor': traffic_factor

})

network[(lat2, lon2)]['connections'].append({

'to': (lat1, lon1),

'distance': dist,

'traffic_factor': traffic_factor

})

def _haversine_distance(self, lat1: float, lon1: float,

lat2: float, lon2: float) -> float:

"""计算球面距离"""

R = 6371.0

lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])

dlat = lat2 - lat1

dlon = lon2 - lon1

a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2

c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))

return R * c

def find_route(self, start: Location, end: Location) -> Dict:

"""查找从起点到终点的路线"""

# 在实际应用中,这里会调用地图API

# 这里使用简化的直线距离计算

# 计算直线距离

direct_distance = start.distance_to(end)

# 模拟实际道路距离(比直线距离长)

road_distance = direct_distance * random.uniform(1.2, 1.5)

# 计算预计时间

travel_time_minutes = (road_distance / self.travel_speed) * 60

return {

'start': start,

'end': end,

'direct_distance': direct_distance,

'road_distance': road_distance,

'estimated_time': travel_time_minutes,

'waypoints': [] # 在实际应用中会有途经点

}

def calculate_shared_route(self, requests: List[TripRequest]) -> Dict:

"""计算共享路线"""

if len(requests) < 2:

return {}

# 找到所有位置点

locations = []

for req in requests:

locations.append(req.start_location)

locations.append(req.end_location)

# 计算中心点

center_lat = sum(loc.latitude for loc in locations) / len(locations)

center_lon = sum(loc.longitude for loc in locations) / len(locations)

# 模拟共享路线

# 在实际应用中,这里会使用旅行商问题(TSP)或车辆路径问题(VRP)算法

shared_distance = 0

for i in range(len(locations) - 1):

shared_distance += locations[i].distance_to(locations[i+1])

# 计算单独出行的总距离

solo_distance = 0

for req in requests:

solo_distance += req.start_location.distance_to(req.end_location)

# 计算绕行比例

detour_ratio = (shared_distance - solo_distance) / solo_distance if solo_distance > 0 else 0

return {

'shared_distance': shared_distance,

'solo_distance': solo_distance,

'detour_ratio': detour_ratio,

'center_point': (center_lat, center_lon),

'total_saving': solo_distance - shared_distance

}

class FuzzyMatcher:

"""

模糊匹配器

处理模糊概念如"时间相近"、"路线相似"

"""

def __init__(self):

# 模糊集合定义

self.time_similarity_sets = {

'perfect': {'center': 0, 'range': 5}, # 完美匹配:±5分钟

'good': {'center': 10, 'range': 10}, # 良好匹配:5-15分钟

'fair': {'center': 20, 'range': 10}, # 一般匹配:15-25分钟

'poor': {'center': 30, 'range': 10} # 差匹配:25-35分钟

}

self.route_similarity_sets = {

'high': {'center': 0.9, 'range': 0.2}, # 高度相似:0.8-1.0

'medium': {'center': 0.7, 'range': 0.2}, # 中度相似:0.6-0.8

'low': {'center': 0.5, 'range': 0.2}, # 低度相似:0.4-0.6

'poor': {'center': 0.3, 'range': 0.2} # 差相似:0.2-0.4

}

self.detour_sets = {

'low': {'center': 0.1, 'range': 0.1}, # 低绕行:0-0.2

'medium': {'center': 0.25, 'range': 0.1},# 中绕行:0.15-0.35

'high': {'center': 0.4, 'range': 0.2} # 高绕行:0.3-0.5

}

def calculate_time_similarity(self, time1: datetime.datetime,

time2: datetime.datetime,

flexibility1: float = 0.5,

flexibility2: float = 0.5) -> float:

"""计算时间相似度(0-1)"""

time_diff_minutes = abs((time1 - time2).total_seconds() / 60)

# 考虑时间灵活性

max_tolerance = 30 + 30 * (flexibility1 + flexibility2)

if time_diff_minutes <= 5:

similarity = 1.0

elif time_diff_minutes >= max_tolerance:

similarity = 0.0

else:

similarity = 1.0 - (time_diff_minutes - 5) / (max_tolerance - 5)

return max(0.0, min(1.0, similarity))

def calculate_route_similarity(self, start1: Location, end1: Location,

start2: Location, end2: Location) -> float:

"""计算路线相似度(0-1)"""

# 计算起点和终点的距离

start_distance = start1.distance_to(start2)

end_distance = end1.distance_to(end2)

# 计算路线方向相似度

vec1 = (end1.latitude - start1.latitude, end1.longitude - start1.longitude)

vec2 = (end2.latitude - start2.latitude, end2.longitude - start2.longitude)

# 计算向量夹角

dot_product = vec1[0]*vec2[0] + vec1[1]*vec2[1]

norm1 = math.sqrt(vec1[0]**2 + vec1[1]**2)

norm2 = math.sqrt(vec2[0]**2 + vec2[1]**2)

if norm1 == 0 or norm2 == 0:

direction_similarity = 0.5

else:

cos_angle = dot_product / (norm1 * norm2)

# 将余弦值映射到0-1范围

direction_similarity = (cos_angle + 1) / 2

# 距离相似度

max_distance = 20 # 20公里

start_similarity = max(0, 1 - start_distance / max_distance)

end_similarity = max(0, 1 - end_distance / max_distance)

# 综合相似度

similarity = 0.3 * start_similarity + 0.3 * end_similarity + 0.4 * direction_similarity

return similarity

def calculate_comprehensive_match_score(self, driver_request: TripRequest,

passenger_request: TripRequest,

route_info: Dict) -> float:

"""计算综合匹配分数"""

# 时间相似度

time_score = self.calculate_time_similarity(

driver_request.departure_time,

passenger_request.departure_time,

driver_request.time_flexibility,

passenger_request.time_flexibility

)

# 路线相似度

route_score = self.calculate_route_similarity(

driver_request.start_location,

driver_request.end_location,

passenger_request.start_location,

passenger_request.end_location

)

# 绕行惩罚

detour_ratio = route_info.get('detour_ratio', 0)

detour_penalty = max(0, 1 - detour_ratio / 0.5) # 绕行超过50%得0分

# 用户评分影响

# 假设有用户对象传入,这里简化处理

driver_rating_factor = 1.0

passenger_rating_factor = 1.0

# 综合评分

weights = {

'time': 0.4,

'route': 0.3,

'detour': 0.2,

'rating': 0.1

}

match_score = (

weights['time'] * time_score +

weights['route'] * route_score +

weights['detour'] * detour_penalty +

weights['rating'] * (driver_rating_factor + passenger_rating_factor) / 2

)

return match_score

def fuzzy_match_classification(self, match_score: float) -> Dict[str, float]:

"""模糊匹配分类"""

classifications = {

'excellent': {'center': 0.9, 'range': 0.2},

'good': {'center': 0.75, 'range': 0.2},

'fair': {'center': 0.6, 'range': 0.2},

'poor': {'center': 0.4, 'range': 0.2}

}

memberships = {}

for level, params in classifications.items():

center = params['center']

width = params['range']

if match_score <= center - width/2 or match_score >= center + width/2:

membership = 0

elif match_score <= center:

membership = (match_score - (center - width/2)) / (width/2)

else:

membership = ((center + width/2) - match_score) / (width/2)

memberships[level] = max(0, min(1, membership))

return memberships

class MultiObjectiveOptimizer:

"""

多目标优化器

平衡时间、成本、舒适度等多个目标

"""

def __init__(self):

self.objectives = ['time_saving', 'cost_saving', 'comfort', 'fairness']

self.weights = {'time': 0.3, 'cost': 0.3, 'comfort': 0.2, 'fairness': 0.2}

def optimize_matching(self, drivers: List[TripRequest],

passengers: List[TripRequest],

strategy: MatchStrategy = MatchStrategy.BALANCED) -> List[Tuple]:

"""优化匹配"""

if not drivers or not passengers:

return []

# 根据策略调整权重

adjusted_weights = self._adjust_weights_for_strategy(strategy)

# 创建成本矩阵

cost_matrix = self._create_cost_matrix(drivers, passengers, adjusted_weights)

# 使用匈牙利算法进行最优匹配

row_ind, col_ind = linear_sum_assignment(cost_matrix, maximize=True)

# 生成匹配结果

matches = []

for i, j in zip(row_ind, col_ind):

if i < len(drivers) and j < len(passengers):

score = cost_matrix[i, j]

if score > 0.5: # 只保留较好的匹配

matches.append((drivers[i], passengers[j], score))

# 按分数排序

matches.sort(key=lambda x: x[2], reverse=True)

return matches

def _adjust_weights_for_strategy(self, strategy: MatchStrategy) -> Dict[str, float]:

"""根据策略调整权重"""

base_weights = self.weights.copy()

if strategy == MatchStrategy.TIME_FIRST:

base_weights['time'] = 0.5

base_weights['cost'] = 0.2

base_weights['comfort'] = 0.2

base_weights['fairness'] = 0.1

elif strategy == MatchStrategy.ROUTE_FIRST:

base_weights['time'] = 0.2

base_weights['cost'] = 0.3

base_weights['comfort'] = 0.4

base_weights['fairness'] = 0.1

elif strategy == MatchStrategy.COST_FIRST:

base_weights['time'] = 0.2

base_weights['cost'] = 0.5

base_weights['comfort'] = 0.2

base_weights['fairness'] = 0.1

return base_weights

def _create_cost_matrix(self, drivers: List[TripRequest],

passengers: List[TripRequest],

weights: Dict) -> np.ndarray:

"""创建成本矩阵"""

n_drivers = len(drivers)

n_passengers = len(passengers)

# 初始化矩阵

cost_matrix = np.zeros((n_drivers, n_passengers))

# 创建路线规划器和模糊匹配器

route_planner = RoutePlanner()

fuzzy_matcher = FuzzyMatcher()

for i, driver in enumerate(drivers):

for j, passenger in enumerate(passengers):

# 计算共享路线

shared_route = route_planner.calculate_shared_route([driver, passenger])

if shared_route.get('detour_ratio', 1) > 0.5:

# 绕行太大,跳过

cost_matrix[i, j] = 0

continue

# 计算各项得分

time_score = self._calculate_time_score(driver, passenger)

cost_score = self._calculate_cost_score(shared_route)

comfort_score = self._calculate_comfort_score(driver, passenger)

fairness_score = self._calculate_fairness_score(driver, passenger, shared_route)

# 综合得分

total_score = (

weights['time'] * time_score +

weights['cost'] * cost_score +

weights['comfort'] * comfort_score +

weights['fairness'] * fairness_score

)

cost_matrix[i, j] = total_score

return cost_matrix

def _calculate_time_score(self, driver: TripRequest, passenger: TripRequest) -> float:

"""计算时间得分"""

fuzzy_matcher = FuzzyMatcher()

return fuzzy_matcher.calculate_time_similarity(

driver.departure_time,

passenger.departure_time,

driver.time_flexibility,

passenger.time_flexibility

)

def _calculate_cost_score(self, shared_route: Dic

如果你觉得这个工具好用,欢迎关注我!

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/16 13:01:31

RaNER vs BERT实战对比:中文命名实体识别精度与性能评测

RaNER vs BERT实战对比&#xff1a;中文命名实体识别精度与性能评测 1. 选型背景与评测目标 在自然语言处理&#xff08;NLP&#xff09;任务中&#xff0c;命名实体识别&#xff08;Named Entity Recognition, NER&#xff09; 是信息抽取的核心环节&#xff0c;广泛应用于智…

作者头像 李华
网站建设 2026/4/15 17:00:44

Qwen3-VL-WEBUI监控方案:模型运行状态跟踪部署教程

Qwen3-VL-WEBUI监控方案&#xff1a;模型运行状态跟踪部署教程 1. 引言 随着多模态大模型在视觉理解、语言生成和跨模态推理能力上的飞速发展&#xff0c;Qwen3-VL-WEBUI 成为开发者与研究者快速部署、调试和监控阿里通义千问系列最强视觉语言模型的重要工具。该 WEBUI 基于阿…

作者头像 李华
网站建设 2026/4/16 11:03:46

Qwen2.5-7B微调实战:云端Colab替代方案,数据更安全

Qwen2.5-7B微调实战&#xff1a;云端Colab替代方案&#xff0c;数据更安全 引言&#xff1a;为什么需要替代Colab的微调方案&#xff1f; 作为一名AI研究员或开发者&#xff0c;当你需要微调大语言模型时&#xff0c;Google Colab可能是你首先想到的工具。它免费、易用&#…

作者头像 李华
网站建设 2026/4/16 14:30:11

AI智能实体侦测服务政府项目申报:单位名称自动校验实战

AI智能实体侦测服务政府项目申报&#xff1a;单位名称自动校验实战 1. 引言&#xff1a;AI 智能实体侦测服务在政务场景中的价值 随着电子政务系统的快速发展&#xff0c;政府项目申报材料的自动化处理需求日益增长。传统的人工审核方式不仅效率低下&#xff0c;还容易因信息…

作者头像 李华
网站建设 2026/4/16 11:08:30

Redis数据类型选择:如何提升10倍性能

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 创建一个Redis性能对比测试工具&#xff0c;针对五种数据类型进行以下测试&#xff1a;1. 10万次写入耗时对比&#xff1b;2. 范围查询效率对比&#xff1b;3. 内存占用对比&#…

作者头像 李华
网站建设 2026/4/16 17:25:55

5分钟搞定:用MINICONDA快速搭建Python原型环境

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 开发一个原型环境快速生成器&#xff0c;根据项目类型(数据科学/Web开发/自动化等)自动&#xff1a;1.下载MINICONDA 2.创建专用虚拟环境 3.安装基础依赖包 4.生成示例项目结构 5.…

作者头像 李华