news 2026/6/10 21:01:15

拼车匹配程序,根据出发时间,路线相似度,匹配最优拼车伙伴,分摊费用。

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
拼车匹配程序,根据出发时间,路线相似度,匹配最优拼车伙伴,分摊费用。

智能拼车匹配系统设计与实现

一、实际应用场景与痛点分析

应用场景

随着城市化进程加快,交通拥堵和出行成本成为城市居民的重要关切。拼车作为一种经济、环保的出行方式,能够有效减少车辆上路、缓解交通压力、降低出行成本。本系统面向需要日常通勤、长途出行、特殊时段用车的用户,通过智能匹配算法为用户找到最优拼车伙伴。

主要痛点

1. 匹配效率低 - 传统拼车群靠人工匹配,效率低下

2. 路线不优化 - 简单的起点终点匹配,未考虑路线相似度

3. 时间不灵活 - 缺乏对时间弹性的智能处理

4. 费用不公 - 费用分摊缺乏科学计算方法

5. 安全担忧 - 缺乏信誉评价和行程验证

6. 体验差 - 沟通成本高,行程变更处理困难

7. 供需不平衡 - 高峰时段供不应求,平峰时段无人拼车

二、核心逻辑与智能控制原理

系统架构

用户层 → 匹配层 → 决策层 → 执行层

↓ ↓ ↓ ↓

注册/发布 → 相似度计算 → 智能匹配 → 费用分摊

行程查询 → 路径规划 → 多目标优化 → 行程管理

核心智能控制原理

1. 模糊控制 - 处理"时间灵活"、"路线相似"等模糊概念

2. 多目标优化 - 平衡时间、成本、舒适度多个目标

3. 专家系统 - 基于交通规则的匹配策略

4. 强化学习 - 根据历史匹配结果优化算法

5. 博弈论 - 费用分摊的公平性计算

三、代码实现

主程序:smart_ride_sharing.py

#!/usr/bin/env python3

"""

智能拼车匹配系统

基于智能控制原理的拼车匹配与费用分摊系统

"""

import json

import datetime

import time

import math

import heapq

import numpy as np

from typing import Dict, List, Tuple, Optional, Any, Set

from dataclasses import dataclass, asdict, field

from enum import Enum

import matplotlib.pyplot as plt

from matplotlib.patches import Circle, FancyArrowPatch

from collections import defaultdict, deque

import heapq

import uuid

import random

import logging

from dataclasses_json import dataclass_json

import os

from scipy.spatial import distance

from scipy.optimize import linear_sum_assignment

import networkx as nx

from sklearn.neighbors import BallTree

import pickle

# 配置日志

logging.basicConfig(

level=logging.INFO,

format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',

handlers=[

logging.FileHandler('ride_sharing.log', encoding='utf-8'),

logging.StreamHandler()

]

)

logger = logging.getLogger(__name__)

class UserRole(Enum):

"""用户角色枚举"""

DRIVER = "司机" # 车主

PASSENGER = "乘客" # 乘客

class TripStatus(Enum):

"""行程状态枚举"""

PENDING = "待匹配" # 等待匹配

MATCHED = "已匹配" # 已找到拼车伙伴

CONFIRMED = "已确认" # 双方确认

ONGOING = "进行中" # 行程进行中

COMPLETED = "已完成" # 行程完成

CANCELLED = "已取消" # 已取消

class VehicleType(Enum):

"""车辆类型枚举"""

SEDAN = "轿车" # 4座

SUV = "SUV" # 5-7座

MPV = "MPV" # 7-9座

ELECTRIC = "电动车" # 4座

class MatchStrategy(Enum):

"""匹配策略枚举"""

TIME_FIRST = "时间优先" # 时间匹配度优先

ROUTE_FIRST = "路线优先" # 路线相似度优先

COST_FIRST = "成本优先" # 成本节约优先

BALANCED = "平衡策略" # 平衡各项指标

@dataclass_json

@dataclass

class Location:

"""地理位置"""

id: str

name: str

latitude: float # 纬度

longitude: float # 经度

address: str = ""

district: str = "" # 行政区

def distance_to(self, other: 'Location') -> float:

"""计算两个位置之间的球面距离(公里)"""

# 使用Haversine公式计算大圆距离

R = 6371.0 # 地球半径,单位:公里

lat1 = math.radians(self.latitude)

lon1 = math.radians(self.longitude)

lat2 = math.radians(other.latitude)

lon2 = math.radians(other.longitude)

dlat = lat2 - lat1

dlon = lon2 - lon1

a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2

c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))

return R * c

def __hash__(self):

return hash(self.id)

@dataclass_json

@dataclass

class User:

"""用户信息"""

user_id: str

name: str

phone: str

role: UserRole

vehicle_type: Optional[VehicleType] = None

license_plate: Optional[str] = None

rating: float = 5.0 # 用户评分 1-5

total_trips: int = 0 # 总行程数

credit_score: int = 100 # 信用分

preferences: Dict[str, Any] = field(default_factory=dict) # 用户偏好

def __post_init__(self):

if self.role == UserRole.DRIVER and not self.vehicle_type:

raise ValueError("司机必须指定车辆类型")

def get_seat_capacity(self) -> int:

"""获取车辆座位数"""

if self.role != UserRole.DRIVER:

return 0

capacities = {

VehicleType.SEDAN: 4,

VehicleType.SUV: 5,

VehicleType.MPV: 7,

VehicleType.ELECTRIC: 4

}

return capacities.get(self.vehicle_type, 4)

def can_take_passengers(self) -> int:

"""可搭载乘客数(司机座位-1)"""

return max(0, self.get_seat_capacity() - 1)

@dataclass_json

@dataclass

class TripRequest:

"""行程请求"""

request_id: str

user_id: str

role: UserRole

start_location: Location

end_location: Location

departure_time: datetime.datetime

arrival_deadline: Optional[datetime.datetime] = None

time_flexibility: float = 0.5 # 时间灵活性 0-1,1表示最灵活

route_flexibility: float = 0.5 # 路线灵活性 0-1

max_detour_ratio: float = 0.3 # 最大绕行比例

preferred_gender: Optional[str] = None

luggage_size: int = 0 # 行李大小 0-3

special_requirements: List[str] = field(default_factory=list)

created_at: datetime.datetime = field(default_factory=datetime.datetime.now)

status: TripStatus = TripStatus.PENDING

def get_time_window(self) -> Tuple[datetime.datetime, datetime.datetime]:

"""获取时间窗口"""

# 根据灵活性计算可接受的时间范围

flexibility_minutes = self.time_flexibility * 60 # 最大60分钟灵活性

earliest = self.departure_time - datetime.timedelta(minutes=flexibility_minutes)

latest = self.departure_time + datetime.timedelta(minutes=flexibility_minutes)

if self.arrival_deadline:

latest = min(latest, self.arrival_deadline)

return earliest, latest

def is_time_compatible(self, other: 'TripRequest',

max_time_diff: float = 30) -> bool:

"""检查时间是否兼容"""

time_diff = abs((self.departure_time - other.departure_time).total_seconds() / 60)

# 考虑双方的灵活性

max_allowed_diff = (self.time_flexibility + other.time_flexibility) * 30

return time_diff <= min(max_time_diff, max_allowed_diff)

class RoutePlanner:

"""

路线规划器

计算最优路径和距离

"""

def __init__(self):

# 模拟的道路网络

self.road_network = self._create_sample_network()

self.travel_speed = 30 # 平均速度 30km/h

def _create_sample_network(self) -> Dict[Tuple[float, float], Dict]:

"""创建示例道路网络"""

# 在实际应用中,这里会使用真实地图数据

network = {}

# 添加一些关键节点

locations = [

(39.9042, 116.4074, "天安门"), # 北京中心

(39.9130, 116.3912, "故宫"),

(39.9096, 116.3972, "王府井"),

(39.9215, 116.3832, "西单"),

(39.9788, 116.3682, "中关村"),

(39.8322, 116.3571, "北京南站"),

(40.0716, 116.3155, "首都机场"),

]

for lat, lon, name in locations:

network[(lat, lon)] = {

'name': name,

'connections': [],

'traffic_factor': 1.0

}

# 添加连接

self._add_connections(network)

return network

def _add_connections(self, network: Dict):

"""添加道路连接"""

nodes = list(network.keys())

# 创建随机连接

for i, (lat1, lon1) in enumerate(nodes):

for j, (lat2, lon2) in enumerate(nodes[i+1:], i+1):

# 计算距离

dist = self._haversine_distance(lat1, lon1, lat2, lon2)

# 如果距离在合理范围内,添加连接

if 2 < dist < 20: # 2-20公里范围内

# 模拟交通状况

traffic_factor = 1.0 + random.uniform(0, 0.5) # 0-50%拥堵

network[(lat1, lon1)]['connections'].append({

'to': (lat2, lon2),

'distance': dist,

'traffic_factor': traffic_factor

})

network[(lat2, lon2)]['connections'].append({

'to': (lat1, lon1),

'distance': dist,

'traffic_factor': traffic_factor

})

def _haversine_distance(self, lat1: float, lon1: float,

lat2: float, lon2: float) -> float:

"""计算球面距离"""

R = 6371.0

lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])

dlat = lat2 - lat1

dlon = lon2 - lon1

a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2

c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a))

return R * c

def find_route(self, start: Location, end: Location) -> Dict:

"""查找从起点到终点的路线"""

# 在实际应用中,这里会调用地图API

# 这里使用简化的直线距离计算

# 计算直线距离

direct_distance = start.distance_to(end)

# 模拟实际道路距离(比直线距离长)

road_distance = direct_distance * random.uniform(1.2, 1.5)

# 计算预计时间

travel_time_minutes = (road_distance / self.travel_speed) * 60

return {

'start': start,

'end': end,

'direct_distance': direct_distance,

'road_distance': road_distance,

'estimated_time': travel_time_minutes,

'waypoints': [] # 在实际应用中会有途经点

}

def calculate_shared_route(self, requests: List[TripRequest]) -> Dict:

"""计算共享路线"""

if len(requests) < 2:

return {}

# 找到所有位置点

locations = []

for req in requests:

locations.append(req.start_location)

locations.append(req.end_location)

# 计算中心点

center_lat = sum(loc.latitude for loc in locations) / len(locations)

center_lon = sum(loc.longitude for loc in locations) / len(locations)

# 模拟共享路线

# 在实际应用中,这里会使用旅行商问题(TSP)或车辆路径问题(VRP)算法

shared_distance = 0

for i in range(len(locations) - 1):

shared_distance += locations[i].distance_to(locations[i+1])

# 计算单独出行的总距离

solo_distance = 0

for req in requests:

solo_distance += req.start_location.distance_to(req.end_location)

# 计算绕行比例

detour_ratio = (shared_distance - solo_distance) / solo_distance if solo_distance > 0 else 0

return {

'shared_distance': shared_distance,

'solo_distance': solo_distance,

'detour_ratio': detour_ratio,

'center_point': (center_lat, center_lon),

'total_saving': solo_distance - shared_distance

}

class FuzzyMatcher:

"""

模糊匹配器

处理模糊概念如"时间相近"、"路线相似"

"""

def __init__(self):

# 模糊集合定义

self.time_similarity_sets = {

'perfect': {'center': 0, 'range': 5}, # 完美匹配:±5分钟

'good': {'center': 10, 'range': 10}, # 良好匹配:5-15分钟

'fair': {'center': 20, 'range': 10}, # 一般匹配:15-25分钟

'poor': {'center': 30, 'range': 10} # 差匹配:25-35分钟

}

self.route_similarity_sets = {

'high': {'center': 0.9, 'range': 0.2}, # 高度相似:0.8-1.0

'medium': {'center': 0.7, 'range': 0.2}, # 中度相似:0.6-0.8

'low': {'center': 0.5, 'range': 0.2}, # 低度相似:0.4-0.6

'poor': {'center': 0.3, 'range': 0.2} # 差相似:0.2-0.4

}

self.detour_sets = {

'low': {'center': 0.1, 'range': 0.1}, # 低绕行:0-0.2

'medium': {'center': 0.25, 'range': 0.1},# 中绕行:0.15-0.35

'high': {'center': 0.4, 'range': 0.2} # 高绕行:0.3-0.5

}

def calculate_time_similarity(self, time1: datetime.datetime,

time2: datetime.datetime,

flexibility1: float = 0.5,

flexibility2: float = 0.5) -> float:

"""计算时间相似度(0-1)"""

time_diff_minutes = abs((time1 - time2).total_seconds() / 60)

# 考虑时间灵活性

max_tolerance = 30 + 30 * (flexibility1 + flexibility2)

if time_diff_minutes <= 5:

similarity = 1.0

elif time_diff_minutes >= max_tolerance:

similarity = 0.0

else:

similarity = 1.0 - (time_diff_minutes - 5) / (max_tolerance - 5)

return max(0.0, min(1.0, similarity))

def calculate_route_similarity(self, start1: Location, end1: Location,

start2: Location, end2: Location) -> float:

"""计算路线相似度(0-1)"""

# 计算起点和终点的距离

start_distance = start1.distance_to(start2)

end_distance = end1.distance_to(end2)

# 计算路线方向相似度

vec1 = (end1.latitude - start1.latitude, end1.longitude - start1.longitude)

vec2 = (end2.latitude - start2.latitude, end2.longitude - start2.longitude)

# 计算向量夹角

dot_product = vec1[0]*vec2[0] + vec1[1]*vec2[1]

norm1 = math.sqrt(vec1[0]**2 + vec1[1]**2)

norm2 = math.sqrt(vec2[0]**2 + vec2[1]**2)

if norm1 == 0 or norm2 == 0:

direction_similarity = 0.5

else:

cos_angle = dot_product / (norm1 * norm2)

# 将余弦值映射到0-1范围

direction_similarity = (cos_angle + 1) / 2

# 距离相似度

max_distance = 20 # 20公里

start_similarity = max(0, 1 - start_distance / max_distance)

end_similarity = max(0, 1 - end_distance / max_distance)

# 综合相似度

similarity = 0.3 * start_similarity + 0.3 * end_similarity + 0.4 * direction_similarity

return similarity

def calculate_comprehensive_match_score(self, driver_request: TripRequest,

passenger_request: TripRequest,

route_info: Dict) -> float:

"""计算综合匹配分数"""

# 时间相似度

time_score = self.calculate_time_similarity(

driver_request.departure_time,

passenger_request.departure_time,

driver_request.time_flexibility,

passenger_request.time_flexibility

)

# 路线相似度

route_score = self.calculate_route_similarity(

driver_request.start_location,

driver_request.end_location,

passenger_request.start_location,

passenger_request.end_location

)

# 绕行惩罚

detour_ratio = route_info.get('detour_ratio', 0)

detour_penalty = max(0, 1 - detour_ratio / 0.5) # 绕行超过50%得0分

# 用户评分影响

# 假设有用户对象传入,这里简化处理

driver_rating_factor = 1.0

passenger_rating_factor = 1.0

# 综合评分

weights = {

'time': 0.4,

'route': 0.3,

'detour': 0.2,

'rating': 0.1

}

match_score = (

weights['time'] * time_score +

weights['route'] * route_score +

weights['detour'] * detour_penalty +

weights['rating'] * (driver_rating_factor + passenger_rating_factor) / 2

)

return match_score

def fuzzy_match_classification(self, match_score: float) -> Dict[str, float]:

"""模糊匹配分类"""

classifications = {

'excellent': {'center': 0.9, 'range': 0.2},

'good': {'center': 0.75, 'range': 0.2},

'fair': {'center': 0.6, 'range': 0.2},

'poor': {'center': 0.4, 'range': 0.2}

}

memberships = {}

for level, params in classifications.items():

center = params['center']

width = params['range']

if match_score <= center - width/2 or match_score >= center + width/2:

membership = 0

elif match_score <= center:

membership = (match_score - (center - width/2)) / (width/2)

else:

membership = ((center + width/2) - match_score) / (width/2)

memberships[level] = max(0, min(1, membership))

return memberships

class MultiObjectiveOptimizer:

"""

多目标优化器

平衡时间、成本、舒适度等多个目标

"""

def __init__(self):

self.objectives = ['time_saving', 'cost_saving', 'comfort', 'fairness']

self.weights = {'time': 0.3, 'cost': 0.3, 'comfort': 0.2, 'fairness': 0.2}

def optimize_matching(self, drivers: List[TripRequest],

passengers: List[TripRequest],

strategy: MatchStrategy = MatchStrategy.BALANCED) -> List[Tuple]:

"""优化匹配"""

if not drivers or not passengers:

return []

# 根据策略调整权重

adjusted_weights = self._adjust_weights_for_strategy(strategy)

# 创建成本矩阵

cost_matrix = self._create_cost_matrix(drivers, passengers, adjusted_weights)

# 使用匈牙利算法进行最优匹配

row_ind, col_ind = linear_sum_assignment(cost_matrix, maximize=True)

# 生成匹配结果

matches = []

for i, j in zip(row_ind, col_ind):

if i < len(drivers) and j < len(passengers):

score = cost_matrix[i, j]

if score > 0.5: # 只保留较好的匹配

matches.append((drivers[i], passengers[j], score))

# 按分数排序

matches.sort(key=lambda x: x[2], reverse=True)

return matches

def _adjust_weights_for_strategy(self, strategy: MatchStrategy) -> Dict[str, float]:

"""根据策略调整权重"""

base_weights = self.weights.copy()

if strategy == MatchStrategy.TIME_FIRST:

base_weights['time'] = 0.5

base_weights['cost'] = 0.2

base_weights['comfort'] = 0.2

base_weights['fairness'] = 0.1

elif strategy == MatchStrategy.ROUTE_FIRST:

base_weights['time'] = 0.2

base_weights['cost'] = 0.3

base_weights['comfort'] = 0.4

base_weights['fairness'] = 0.1

elif strategy == MatchStrategy.COST_FIRST:

base_weights['time'] = 0.2

base_weights['cost'] = 0.5

base_weights['comfort'] = 0.2

base_weights['fairness'] = 0.1

return base_weights

def _create_cost_matrix(self, drivers: List[TripRequest],

passengers: List[TripRequest],

weights: Dict) -> np.ndarray:

"""创建成本矩阵"""

n_drivers = len(drivers)

n_passengers = len(passengers)

# 初始化矩阵

cost_matrix = np.zeros((n_drivers, n_passengers))

# 创建路线规划器和模糊匹配器

route_planner = RoutePlanner()

fuzzy_matcher = FuzzyMatcher()

for i, driver in enumerate(drivers):

for j, passenger in enumerate(passengers):

# 计算共享路线

shared_route = route_planner.calculate_shared_route([driver, passenger])

if shared_route.get('detour_ratio', 1) > 0.5:

# 绕行太大,跳过

cost_matrix[i, j] = 0

continue

# 计算各项得分

time_score = self._calculate_time_score(driver, passenger)

cost_score = self._calculate_cost_score(shared_route)

comfort_score = self._calculate_comfort_score(driver, passenger)

fairness_score = self._calculate_fairness_score(driver, passenger, shared_route)

# 综合得分

total_score = (

weights['time'] * time_score +

weights['cost'] * cost_score +

weights['comfort'] * comfort_score +

weights['fairness'] * fairness_score

)

cost_matrix[i, j] = total_score

return cost_matrix

def _calculate_time_score(self, driver: TripRequest, passenger: TripRequest) -> float:

"""计算时间得分"""

fuzzy_matcher = FuzzyMatcher()

return fuzzy_matcher.calculate_time_similarity(

driver.departure_time,

passenger.departure_time,

driver.time_flexibility,

passenger.time_flexibility

)

def _calculate_cost_score(self, shared_route: Dic

如果你觉得这个工具好用,欢迎关注我!

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/9 23:14:27

强烈安利10个AI论文工具,研究生高效写作必备!

强烈安利10个AI论文工具&#xff0c;研究生高效写作必备&#xff01; 论文写作的“神器”时代已经到来 对于研究生而言&#xff0c;论文写作是一项既重要又充满挑战的任务。随着人工智能技术的不断发展&#xff0c;AI工具逐渐成为学术研究中的得力助手。尤其是在降低AIGC&#…

作者头像 李华
网站建设 2026/6/10 12:58:10

AI科学家悄然“炼丹”:实验室里的静默革命与新药、新材料奇点

凌晨三点的实验室灯火通明却空无一人&#xff0c;机械臂精准地执行着人类科学家需要数周才能完成的实验流程&#xff0c;而AI系统正基于实时数据调整着下一个实验参数。这不是科幻场景&#xff0c;而是清华大学、天津大学等研究机构正在发生的新现实。2026年初&#xff0c;清华…

作者头像 李华
网站建设 2026/6/10 11:45:40

零基础入门:10分钟用FULLCALENDAR创建第一个日历

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 创建一个最简单的FULLCALENDAR入门示例&#xff0c;要求&#xff1a;1. 使用CDN方式引入 2. 展示基础月视图 3. 添加3个静态事件 4. 实现点击事件弹出详情 5. 包含完整HTML文件代码…

作者头像 李华
网站建设 2026/6/9 21:08:38

5个真实项目中findIndex()的经典应用案例

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 生成一个包含5个真实业务场景的代码集合&#xff0c;展示findIndex()的实际应用。每个案例需包含&#xff1a;1) 业务场景描述&#xff1b;2) 问题分析&#xff1b;3) findIndex解…

作者头像 李华
网站建设 2026/6/10 11:12:58

RaNER模型入门必看:AI实体侦测服务完整部署教程

RaNER模型入门必看&#xff1a;AI实体侦测服务完整部署教程 1. 引言 1.1 学习目标 本文将带你从零开始&#xff0c;完整部署基于 RaNER&#xff08;Named Entity Recognition&#xff09; 模型的中文命名实体识别服务。你将学会如何使用预置镜像快速启动一个具备高性能 NER …

作者头像 李华
网站建设 2026/6/10 13:55:27

Qwen3-VL-WEBUI应用场景:博物馆文物数字化管理系统

Qwen3-VL-WEBUI应用场景&#xff1a;博物馆文物数字化管理系统 1. 引言&#xff1a;AI驱动的文物数字化新范式 随着文化遗产保护意识的提升&#xff0c;博物馆正面临海量文物信息采集、分类、标注与公众服务的巨大挑战。传统人工处理方式效率低、成本高&#xff0c;且难以实现…

作者头像 李华