引言:城市公交调度的挑战与机遇

在现代城市交通系统中,公交车作为公共交通的骨干力量,承担着数以亿计的日常通勤任务。然而,传统的公交排班系统往往采用固定的时刻表,无法动态响应早晚高峰的交通拥堵以及突发天气变化等不确定因素。这种“一刀切”的调度模式导致了诸多问题:高峰时段车辆拥挤不堪,乘客等待时间过长;平峰时段车辆空驶率高,造成运营成本浪费;恶劣天气下,准点率大幅下降,乘客体验恶化。

随着大数据、人工智能和物联网技术的发展,精准的排班预测成为可能。通过整合多源数据、构建智能算法模型,公交系统可以实现从“被动响应”到“主动预测”的转变。本文将深入探讨如何利用现代技术手段,构建一个能够精准应对早晚高峰拥堵与突发天气变化的智能公交排班系统。

一、数据基础:多源异构数据的整合与处理

1.1 核心数据源分析

构建精准的预测系统,首先需要全面、高质量的数据支撑。以下是公交排班预测所需的四大核心数据源:

(1)历史运营数据

  • 车辆GPS轨迹数据:包含时间戳、位置坐标、速度、方向等信息
  • 刷卡数据:乘客上下车时间、站点信息
  • 调度日志:发车时间、到站时间、车辆编号、司机信息
  • 线路基础信息:站点坐标、线路走向、站点间距

(2)实时交通数据

  • 道路拥堵指数:来自高德、百度等地图服务商的实时路况
  • 交叉口延误数据:关键节点的通行时间
  • 交通事故信息:突发交通事件的实时上报

(3)气象数据

  • 实时天气状况:温度、降水、风速、能见度
  • 天气预报:短期(0-6小时)和中期(6-72小时)预报
  • 历史气象数据:用于训练模型的长期数据积累

(4)外部事件数据

  • 大型活动:演唱会、体育赛事、展览等
  • 道路施工:封闭、改道信息
  • 节假日安排:工作日/休息日调整
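
这些数据源的采样频率各不相同(GPS为秒级或分钟级、天气为小时级、事件为不定期),在进入预处理与特征工程之前,通常需要先按时间戳对齐。下面是一个基于 pandas merge_asof 的对齐示意,其中字段名均为假设值,应以实际数据接口为准:

import pandas as pd

# 假设字段:GPS按分钟级采样,天气按小时级观测
gps_df = pd.DataFrame({
    'timestamp': pd.date_range('2024-01-01 07:00', periods=5, freq='1min'),
    'vehicle_id': ['B001'] * 5,
    'speed': [28.0, 30.5, 25.2, 22.8, 26.1]
})

weather_df = pd.DataFrame({
    'timestamp': pd.to_datetime(['2024-01-01 06:00', '2024-01-01 07:00']),
    'precipitation': [0.0, 2.5]
})

# merge_asof:为每条GPS记录匹配不晚于该时刻的最近一次天气观测
merged = pd.merge_asof(
    gps_df.sort_values('timestamp'),
    weather_df.sort_values('timestamp'),
    on='timestamp',
    direction='backward'
)
print(merged.head())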

1.2 数据预处理技术

原始数据往往存在噪声、缺失和异常,需要经过严格的预处理流程:

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

class DataPreprocessor:
    def __init__(self):
        self.gps_noise_threshold = 50  # GPS噪声阈值(米)
        self.speed_limit = 80  # 公交车速度上限(km/h)
        
    def clean_gps_data(self, raw_gps_df):
        """清洗GPS轨迹数据"""
        # 1. 剔除定位精度差的数据
        df = raw_gps_df[raw_gps_df['accuracy'] <= 20]
        
        # 2. 去除速度异常值
        df = df[(df['speed'] >= 0) & (df['speed'] <= self.speed_limit)]
        
        # 3. 基于卡尔曼滤波平滑轨迹
        df = self.kalman_filter_smoothing(df)
        
        # 4. 地理围栏校验,确保车辆在合理路线范围内
        df = self.geofence_validation(df)
        
        return df
    
    def geofence_validation(self, df):
        """地理围栏校验(简化示意):用示例线路的经纬度包络框剔除明显越界点,实际应基于GIS线路缓冲区匹配"""
        lat_ok = df['lat'].between(39.8, 40.0)
        lon_ok = df['lon'].between(116.3, 116.5)
        return df[lat_ok & lon_ok]
    
    def kalman_filter_smoothing(self, df):
        """卡尔曼滤波平滑GPS轨迹"""
        # 初始化卡尔曼滤波器
        Q = 0.01  # 过程噪声协方差
        R = 0.1   # 测量噪声协方差
        
        # 状态初始化:以首条观测速度作为初值,避免滤波初期偏差过大
        x_est = df['speed'].iloc[0] if len(df) > 0 else 0.0
        P = 1
        
        smoothed_data = []
        
        for _, row in df.iterrows():
            # 预测步骤
            x_pred = x_est
            P_pred = P + Q
            
            # 更新步骤
            K = P_pred / (P_pred + R)
            x_est = x_pred + K * (row['speed'] - x_pred)
            P = (1 - K) * P_pred
            
            smoothed_data.append(x_est)
        
        df['smoothed_speed'] = smoothed_data
        return df
    
    def impute_missing_data(self, df, method='linear'):
        """处理缺失数据"""
        # 时间序列重采样,确保等间隔
        df = df.set_index('timestamp').resample('1min').mean()
        
        # 线性插值填补缺失值
        df = df.interpolate(method=method)
        
        # 前向/后向填充确保首尾完整
        df = df.ffill().bfill()
        
        return df.reset_index()

# 使用示例
preprocessor = DataPreprocessor()
raw_gps = pd.DataFrame({
    'timestamp': pd.date_range('2024-01-01 07:00', periods=100, freq='1min'),
    'lat': 39.9 + np.random.normal(0, 0.001, 100),
    'lon': 116.4 + np.random.normal(0, 0.001, 100),
    'speed': np.random.normal(30, 5, 100),
    'accuracy': np.random.randint(5, 25, 100)
})

cleaned_data = preprocessor.clean_gps_data(raw_gps)

1.3 特征工程:从原始数据到预测特征

数据预处理之后,需要构建对预测任务有价值的特征:

def extract_temporal_features(df):
    """提取时间维度特征"""
    df['hour'] = df['timestamp'].dt.hour
    df['minute'] = df['timestamp'].dt.minute
    df['day_of_week'] = df['timestamp'].dt.dayofweek
    df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
    df['is_holiday'] = df['timestamp'].apply(is_holiday)  # 自定义节假日判断
    
    # 高峰时段标记
    df['is_morning_peak'] = ((df['hour'] >= 7) & (df['hour'] <= 9)).astype(int)
    df['is_evening_peak'] = ((df['hour'] >= 17) & (df['hour'] <= 19)).astype(int)
    
    return df

def extract_spatial_features(gps_df, stops_df):
    """提取空间维度特征"""
    # 计算到各站点的距离
    for idx, stop in stops_df.iterrows():
        dist_col = f'dist_to_stop_{stop["stop_id"]}'
        gps_df[dist_col] = gps_df.apply(
            lambda row: haversine_distance(
                row['lat'], row['lon'],
                stop['lat'], stop['lon']
            ), axis=1
        )
    
    # 计算剩余里程占线路总长的比例
    # (dist_to_stop_end、total_line_length 假设已由线路静态数据预先计算并合并进来)
    gps_df['progress_ratio'] = gps_df['dist_to_stop_end'] / gps_df['total_line_length']
    
    return gps_df

def extract_weather_features(weather_df):
    """提取天气特征"""
    # 天气编码映射
    weather_code_map = {
        '晴': 0, '多云': 1, '小雨': 2, '中雨': 3, '大雨': 4,
        '暴雨': 5, '雾': 6, '雪': 7, '冰雹': 8
    }
    
    weather_df['weather_encoded'] = weather_df['weather_condition'].map(weather_code_map)
    
    # 恶劣天气二值特征
    weather_df['is_heavy_rain'] = (weather_df['weather_encoded'] >= 4).astype(int)
    weather_df['is_snow'] = (weather_df['weather_encoded'] == 7).astype(int)
    weather_df['is_fog'] = (weather_df['weather_encoded'] == 6).astype(int)
    
    # 气象指标连续特征
    weather_df['visibility_score'] = np.clip(weather_df['visibility'] / 1000, 0, 1)
    weather_df['precipitation_intensity'] = weather_df['precipitation'].clip(0, 50)
    
    return weather_df

def calculate_traffic_congestion_features(gps_df, road_network):
    """计算交通拥堵特征"""
    # 基于历史数据计算路段平均速度
    segment_speeds = gps_df.groupby('segment_id')['speed'].agg(['mean', 'std'])
    
    # 拥堵指数 = (自由流速度 - 当前速度) / 自由流速度
    road_network = road_network.merge(segment_speeds, on='segment_id')
    road_network['congestion_index'] = (
        road_network['free_flow_speed'] - road_network['mean']
    ) / road_network['free_flow_speed']
    
    # 拥堵等级分类
    road_network['congestion_level'] = pd.cut(
        road_network['congestion_index'],
        bins=[-1, 0.2, 0.4, 0.6, 0.8, 1],
        labels=['畅通', '轻度拥堵', '中度拥堵', '重度拥堵', '严重拥堵']
    )
    
    return road_network
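
上文 extract_spatial_features 中用到的 haversine_distance 没有给出实现,这里补充一个基于半正矢(Haversine)公式的参考实现,返回两点间的球面距离(米):

def haversine_distance(lat1, lon1, lat2, lon2):
    """半正矢公式计算两个经纬度点之间的球面距离(单位:米)"""
    R = 6371000  # 地球平均半径(米)
    lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
    return 2 * R * np.arcsin(np.sqrt(a))

# 示例:北京城区相邻两点的距离,约1.4公里
print(haversine_distance(39.90, 116.40, 39.91, 116.41))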

二、预测模型:从传统统计到深度学习

2.1 传统统计模型:ARIMA与指数平滑

对于时间序列预测,传统统计模型具有可解释性强、训练速度快的优点:

from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.holtwinters import ExponentialSmoothing

class TraditionalTimeSeriesModel:
    def __init__(self, order=(1,1,1), seasonal_order=(1,1,1,12)):
        self.order = order
        self.seasonal_order = seasonal_order
        self.model = None
        
    def fit_arima(self, train_data, target_col='travel_time'):
        """拟合SARIMA模型"""
        # 自动选择最优参数
        best_aic = np.inf
        best_order = None
        
        # 简单的参数网格搜索
        for p in [0, 1, 2]:
            for d in [0, 1]:
                for q in [0, 1, 2]:
                    try:
                        model = SARIMAX(
                            train_data[target_col],
                            order=(p, d, q),
                            seasonal_order=self.seasonal_order,
                            enforce_stationarity=False,
                            enforce_invertibility=False
                        )
                        fitted_model = model.fit(disp=False)
                        if fitted_model.aic < best_aic:
                            best_aic = fitted_model.aic
                            best_order = (p, d, q)
                    except Exception:
                        continue
        
        # 使用最优参数训练最终模型(若网格搜索全部失败,退回默认order)
        if best_order is None:
            best_order = self.order
        self.model = SARIMAX(
            train_data[target_col],
            order=best_order,
            seasonal_order=self.seasonal_order
        )
        self.model_result = self.model.fit(disp=False)
        
        return self.model_result
    
    def predict(self, steps, confidence_level=0.95):
        """预测未来时间步,并给出对应置信水平的区间"""
        forecast = self.model_result.get_forecast(steps=steps)
        predicted_mean = forecast.predicted_mean
        confidence_int = forecast.conf_int(alpha=1 - confidence_level)
        
        return {
            'prediction': predicted_mean,
            'lower_bound': confidence_int.iloc[:, 0],
            'upper_bound': confidence_int.iloc[:, 1]
        }

# 使用示例
# 准备数据:某线路工作日早高峰历史旅行时间序列
ts_data = pd.DataFrame({
    'timestamp': pd.date_range('2024-01-01 07:00', periods=1000, freq='10min'),
    'travel_time': np.random.normal(45, 5, 1000) + np.sin(np.arange(1000)/50)*10
})

model = TraditionalTimeSeriesModel()
model.fit_arima(ts_data)
forecast = model.predict(steps=6)  # 预测未来1小时(6个10分钟间隔)
print(f"预测旅行时间: {forecast['prediction'].values}")

2.2 机器学习模型:XGBoost与LightGBM

对于多特征预测任务,集成学习模型表现出色:

import xgboost as xgb
import lightgbm as lgb
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error, mean_squared_error

class EnsemblePredictionModel:
    def __init__(self):
        self.xgb_model = None
        self.lgb_model = None
        self.feature_importance = {}
        
    def prepare_features(self, df, target_col='travel_time'):
        """准备训练特征"""
        # 特征列表
        feature_cols = [
            'hour', 'minute', 'day_of_week', 'is_weekend', 'is_holiday',
            'is_morning_peak', 'is_evening_peak', 'weather_encoded',
            'precipitation_intensity', 'visibility_score', 'congestion_index',
            'dist_to_stop_1', 'dist_to_stop_2', 'progress_ratio'
        ]
        
        X = df[feature_cols]
        y = df[target_col]
        
        return X, y
    
    def train_xgb_model(self, X_train, y_train, X_val=None, y_val=None):
        """训练XGBoost模型"""
        # 参数配置
        xgb_params = {
            'objective': 'reg:squarederror',
            'n_estimators': 1000,
            'max_depth': 6,
            'learning_rate': 0.05,
            'subsample': 0.8,
            'colsample_bytree': 0.8,
            'reg_alpha': 0.1,
            'reg_lambda': 1.0,
            'random_state': 42,
            'n_jobs': -1
        }
        
        if X_val is not None and y_val is not None:
            # 带验证集的训练
            self.xgb_model = xgb.XGBRegressor(**xgb_params)
            self.xgb_model.fit(
                X_train, y_train,
                eval_set=[(X_val, y_val)],
                # 注:xgboost>=2.0 已将 early_stopping_rounds 移至构造函数参数
                early_stopping_rounds=50,
                verbose=False
            )
        else:
            # 交叉验证训练
            tscv = TimeSeriesSplit(n_splits=5)
            best_score = np.inf
            
            for train_idx, val_idx in tscv.split(X_train):
                X_tr, X_v = X_train.iloc[train_idx], X_train.iloc[val_idx]
                y_tr, y_v = y_train.iloc[train_idx], y_train.iloc[val_idx]
                
                model = xgb.XGBRegressor(**xgb_params)
                model.fit(X_tr, y_tr, eval_set=[(X_v, y_v)], verbose=False)
                
                score = mean_absolute_error(y_v, model.predict(X_v))
                if score < best_score:
                    best_score = score
                    self.xgb_model = model
        
        # 记录特征重要性
        self.feature_importance['xgb'] = dict(
            zip(X_train.columns, self.xgb_model.feature_importances_)
        )
        
        return self.xgb_model
    
    def train_lgb_model(self, X_train, y_train, X_val=None, y_val=None):
        """训练LightGBM模型"""
        lgb_params = {
            'objective': 'regression',
            'n_estimators': 1000,
            'max_depth': -1,
            'learning_rate': 0.05,
            'subsample': 0.8,
            'colsample_bytree': 0.8,
            'reg_alpha': 0.1,
            'reg_lambda': 1.0,
            'random_state': 42,
            'n_jobs': -1,
            'verbose': -1
        }
        
        if X_val is not None and y_val is not None:
            self.lgb_model = lgb.LGBMRegressor(**lgb_params)
            self.lgb_model.fit(
                X_train, y_train,
                eval_set=[(X_val, y_val)],
                callbacks=[lgb.early_stopping(50), lgb.log_evaluation(100)]
            )
        else:
            tscv = TimeSeriesSplit(n_splits=5)
            best_score = np.inf
            
            for train_idx, val_idx in tscv.split(X_train):
                X_tr, X_v = X_train.iloc[train_idx], X_train.iloc[val_idx]
                y_tr, y_v = y_train.iloc[train_idx], y_train.iloc[val_idx]
                
                model = lgb.LGBMRegressor(**lgb_params)
                model.fit(
                    X_tr, y_tr,
                    eval_set=[(X_v, y_v)],
                    # 新版LightGBM的fit不再接受verbose参数,改用callbacks控制输出
                    callbacks=[lgb.early_stopping(50, verbose=False)]
                )
                
                score = mean_absolute_error(y_v, model.predict(X_v))
                if score < best_score:
                    best_score = score
                    self.lgb_model = model
        
        self.feature_importance['lgb'] = dict(
            zip(X_train.columns, self.lgb_model.feature_importances_)
        )
        
        return self.lgb_model
    
    def predict(self, X):
        """模型融合预测"""
        if self.xgb_model is None and self.lgb_model is None:
            raise ValueError("Models not trained yet")
        
        predictions = []
        
        if self.xgb_model is not None:
            predictions.append(self.xgb_model.predict(X))
        
        if self.lgb_model is not None:
            predictions.append(self.lgb_model.predict(X))
        
        # 简单平均融合
        final_pred = np.mean(predictions, axis=0)
        
        return final_pred

# 使用示例
model = EnsemblePredictionModel()

# 准备数据
df = pd.DataFrame({
    'hour': np.random.randint(0, 24, 1000),
    'minute': np.random.randint(0, 60, 1000),
    'day_of_week': np.random.randint(0, 7, 1000),
    'is_weekend': np.random.randint(0, 2, 1000),
    'is_holiday': np.random.randint(0, 2, 1000),
    'is_morning_peak': np.random.randint(0, 2, 1000),
    'is_evening_peak': np.random.randint(0, 2, 1000),
    'weather_encoded': np.random.randint(0, 9, 1000),
    'precipitation_intensity': np.random.uniform(0, 10, 1000),
    'visibility_score': np.random.uniform(0.2, 1, 1000),
    'congestion_index': np.random.uniform(0, 0.8, 1000),
    'dist_to_stop_1': np.random.uniform(0, 5000, 1000),
    'dist_to_stop_2': np.random.uniform(0, 5000, 1000),
    'progress_ratio': np.random.uniform(0, 1, 1000),
    'travel_time': np.random.normal(45, 5, 1000)
})

X, y = model.prepare_features(df)
X_train, X_test = X[:800], X[800:]
y_train, y_test = y[:800], y[800:]

# 训练模型
model.train_xgb_model(X_train, y_train, X_test, y_test)
model.train_lgb_model(X_train, y_train, X_test, y_test)

# 预测
sample_X = X_test.iloc[:10]
predictions = model.predict(sample_X)
print(f"预测结果: {predictions}")

2.3 深度学习模型:LSTM与Transformer

对于复杂的时序依赖关系,深度学习模型具有更强的表达能力:

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

class TravelTimeDataset(Dataset):
    """自定义数据集类"""
    def __init__(self, sequences, targets):
        self.sequences = torch.FloatTensor(sequences)
        self.targets = torch.FloatTensor(targets)
        
    def __len__(self):
        return len(self.sequences)
    
    def __getitem__(self, idx):
        return self.sequences[idx], self.targets[idx]

class LSTMTravelTimePredictor(nn.Module):
    """LSTM旅行时间预测模型"""
    def __init__(self, input_size, hidden_size=128, num_layers=2, output_size=1, dropout=0.2):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        self.lstm = nn.LSTM(
            input_size, hidden_size, num_layers,
            batch_first=True, dropout=dropout if num_layers > 1 else 0
        )
        
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, output_size)
        
    def forward(self, x):
        # LSTM层
        lstm_out, (h_n, c_n) = self.lstm(x)
        
        # 取最后一个时间步的输出
        last_output = lstm_out[:, -1, :]
        
        # 全连接层
        out = self.dropout(last_output)
        out = self.fc(out)
        
        return out

class TransformerTravelTimePredictor(nn.Module):
    """Transformer旅行时间预测模型"""
    def __init__(self, input_size, d_model=128, nhead=8, num_layers=4, output_size=1, dropout=0.1):
        super().__init__()
        
        self.input_embedding = nn.Linear(input_size, d_model)
        
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead,
            dim_feedforward=512, dropout=dropout,
            batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(d_model, output_size)
        
    def forward(self, x):
        # 输入嵌入
        x = self.input_embedding(x)
        
        # Transformer编码
        x = self.transformer_encoder(x)
        
        # 取最后一个时间步
        x = x[:, -1, :]
        
        # 输出层
        x = self.dropout(x)
        x = self.fc(x)
        
        return x

def create_sequences(data, seq_length=12):
    """创建时间序列样本"""
    sequences = []
    targets = []
    
    for i in range(len(data) - seq_length):
        seq = data[i:i+seq_length]
        target = data[i+seq_length]
        sequences.append(seq)
        targets.append(target)
    
    return np.array(sequences), np.array(targets)

def train_lstm_model(X_train, y_train, X_val, y_val, epochs=100):
    """训练LSTM模型"""
    # 转换为PyTorch数据集
    train_dataset = TravelTimeDataset(X_train, y_train)
    val_dataset = TravelTimeDataset(X_val, y_val)
    
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
    
    # 初始化模型
    input_size = X_train.shape[2]  # 特征维度
    model = LSTMTravelTimePredictor(input_size=input_size)
    
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5, factor=0.5)
    
    best_val_loss = float('inf')
    
    for epoch in range(epochs):
        # 训练阶段
        model.train()
        train_loss = 0
        for batch_X, batch_y in train_loader:
            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs.squeeze(), batch_y)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        
        # 验证阶段
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for batch_X, batch_y in val_loader:
                outputs = model(batch_X)
                loss = criterion(outputs.squeeze(), batch_y)
                val_loss += loss.item()
        
        scheduler.step(val_loss)
        
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_lstm_model.pth')
        
        if epoch % 10 == 0:
            print(f"Epoch {epoch}: Train Loss={train_loss/len(train_loader):.4f}, "
                  f"Val Loss={val_loss/len(val_loader):.4f}")
    
    return model

# 使用示例
# 准备序列数据
data = np.random.normal(45, 5, 1000)
sequences, targets = create_sequences(data, seq_length=12)

# 划分训练验证集
split_idx = int(0.8 * len(sequences))
X_train_seq, X_val_seq = sequences[:split_idx], sequences[split_idx:]
y_train_seq, y_val_seq = targets[:split_idx], targets[split_idx:]

# 调整维度以适应LSTM输入 (batch, seq_len, features)
X_train_seq = X_train_seq.reshape(-1, 12, 1)
X_val_seq = X_val_seq.reshape(-1, 12, 1)

# 训练模型
lstm_model = train_lstm_model(X_train_seq, y_train_seq, X_val_seq, y_val_seq, epochs=50)
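
上面的训练流程以LSTM为例;TransformerTravelTimePredictor 的输入输出形状与之一致,训练时只需替换模型初始化部分。下面仅用一次前向传播验证形状,超参数为示意值:

# 验证Transformer模型的输入输出形状 (batch, seq_len, features) -> (batch, 1)
transformer_model = TransformerTravelTimePredictor(input_size=1, d_model=64, nhead=4, num_layers=2)
sample_batch = torch.FloatTensor(X_val_seq[:8])  # 取8条验证序列,形状(8, 12, 1)
with torch.no_grad():
    output = transformer_model(sample_batch)
print(output.shape)  # torch.Size([8, 1])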

三、动态排班策略:从预测到决策

3.1 基于预测的动态发车间隔调整

预测模型输出的是旅行时间,但我们需要将其转化为具体的排班决策:

class DynamicScheduler:
    def __init__(self, base_headway=5, min_headway=2, max_headway=15):
        self.base_headway = base_headway  # 基础发车间隔(分钟)
        self.min_headway = min_headway    # 最小发车间隔
        self.max_headway = max_headway    # 最大发车间隔
        
    def calculate_optimal_headway(self, predicted_travel_time, current_load, weather_factor):
        """
        计算最优发车间隔
        predicted_travel_time: 预测旅行时间(分钟)
        current_load: 当前车辆满载率(0-1)
        weather_factor: 天气影响系数(1.0为正常)
        """
        # 1. 基于旅行时间的调整
        # 车辆数一定时,旅行时间越长,可维持的发车间隔越大;
        # 若要保持原间隔则需增投车辆(见下方 recommended_vehicle_count)
        time_factor = predicted_travel_time / 45  # 45分钟为基准
        
        # 2. 基于客流的调整
        # 满载率越高,间隔越短(以满载率0.5为基准)
        load_factor = 1 - (current_load - 0.5) * 0.5
        
        # 3. 天气影响
        # 恶劣天气需要增加间隔以保证安全
        weather_impact = weather_factor
        
        # 4. 综合计算
        target_headway = self.base_headway * time_factor * load_factor * weather_impact
        
        # 5. 边界约束
        optimal_headway = np.clip(target_headway, self.min_headway, self.max_headway)
        
        return optimal_headway
    
    def generate_schedule(self, start_time, end_time, predictions, loads, weather_forecast):
        """
        生成动态排班表
        """
        schedule = []
        current_time = start_time
        
        while current_time < end_time:
            # 获取当前时间的预测数据
            time_key = current_time.replace(minute=0, second=0)
            
            pred_travel_time = predictions.get(time_key, 45)
            current_load = loads.get(time_key, 0.5)
            weather_factor = weather_forecast.get(time_key, 1.0)
            
            # 计算最优间隔
            headway = self.calculate_optimal_headway(
                pred_travel_time, current_load, weather_factor
            )
            
            # 生成发车记录
            schedule.append({
                'departure_time': current_time,
                'headway': headway,
                'predicted_travel_time': pred_travel_time,
                'recommended_vehicle_count': np.ceil(pred_travel_time / headway)
            })
            
            # 下一班次
            current_time += timedelta(minutes=headway)
        
        return schedule

# 使用示例
scheduler = DynamicScheduler(base_headway=5)

# 模拟预测数据(每小时)
predictions = {
    datetime(2024, 1, 1, 7): 55,  # 早高峰拥堵
    datetime(2024, 1, 1, 8): 50,
    datetime(2024, 1, 1, 9): 48,
    datetime(2024, 1, 1, 17): 58, # 晚高峰拥堵
    datetime(2024, 1, 1, 18): 52,
}

loads = {
    datetime(2024, 1, 1, 7): 0.85,
    datetime(2024, 1, 1, 8): 0.90,
    datetime(2024, 1, 1, 9): 0.75,
    datetime(2024, 1, 1, 17): 0.92,
    datetime(2024, 1, 1, 18): 0.88,
}

weather_forecast = {
    datetime(2024, 1, 1, 7): 1.2,  # 小雨,影响系数1.2
    datetime(2024, 1, 1, 8): 1.2,
    datetime(2024, 1, 1, 9): 1.0,
    datetime(2024, 1, 1, 17): 1.0,
    datetime(2024, 1, 1, 18): 1.0,
}

schedule = scheduler.generate_schedule(
    datetime(2024, 1, 1, 7, 0),
    datetime(2024, 1, 1, 10, 0),
    predictions,
    loads,
    weather_forecast
)

for item in schedule[:5]:
    print(f"发车时间: {item['departure_time'].strftime('%H:%M')}, "
          f"间隔: {item['headway']:.1f}分钟, "
          f"预计行程: {item['predicted_travel_time']:.1f}分钟")

3.2 突发事件响应机制

当遇到交通事故、极端天气等突发事件时,系统需要快速调整:

class EmergencyResponseSystem:
    def __init__(self):
        self.emergency_events = {}
        self.affected_routes = set()
        
    def register_emergency_event(self, event_id, event_type, location, severity, timestamp):
        """注册突发事件"""
        self.emergency_events[event_id] = {
            'type': event_type,  # 'accident', 'weather', 'road_work'
            'location': location,
            'severity': severity,  # 1-5级
            'timestamp': timestamp,
            'status': 'active'
        }
        
        # 识别受影响的线路
        self.affected_routes = self.identify_affected_routes(location)
        
        print(f"突发事件注册: {event_type} at {location}, 严重等级: {severity}")
        print(f"受影响线路: {self.affected_routes}")
    
    def identify_affected_routes(self, location):
        """识别受影响的公交线路"""
        # 基于地理围栏和线路匹配
        # 这里简化处理,实际应基于GIS系统
        affected = set()
        
        # 模拟线路匹配逻辑
        route_database = {
            'Route_1': {'path': [(39.9, 116.4), (39.91, 116.41)], 'radius': 0.01},  # 0.01度约1公里
            'Route_2': {'path': [(39.92, 116.42), (39.93, 116.43)], 'radius': 0.01},
        }
        
        for route_id, route_info in route_database.items():
            if self.is_location_on_route(location, route_info):
                affected.add(route_id)
        
        return affected
    
    def is_location_on_route(self, location, route_info):
        """判断位置是否在线路附近"""
        # 简化实现:计算距离
        lat, lon = location
        for point in route_info['path']:
            dist = np.sqrt((lat - point[0])**2 + (lon - point[1])**2)
            if dist <= route_info['radius']:
                return True
        return False
    
    def generate_emergency_schedule(self, base_schedule, emergency_factor=1.5):
        """生成应急调度方案"""
        emergency_schedule = []
        
        for trip in base_schedule:
            # 如果线路受影响,增加间隔并调整路线
            if any(route in self.affected_routes for route in trip.get('routes', [])):
                # 增加发车间隔
                adjusted_headway = trip['headway'] * emergency_factor
                
                # 可能需要绕行:取该班次所属线路中受影响的第一条
                affected = [r for r in trip.get('routes', []) if r in self.affected_routes]
                detour_info = self.get_detour_info(affected[0]) if affected else None
                
                emergency_schedule.append({
                    **trip,
                    'original_headway': trip['headway'],
                    'adjusted_headway': adjusted_headway,
                    'detour': detour_info,
                    'delay_estimate': trip['travel_time'] * 1.3,
                    'emergency_note': f"受{self.get_emergency_type()}影响"
                })
            else:
                emergency_schedule.append(trip)
        
        return emergency_schedule
    
    def get_emergency_type(self):
        """获取当前紧急事件类型"""
        if not self.emergency_events:
            return "无"
        
        types = [e['type'] for e in self.emergency_events.values() if e['status'] == 'active']
        return ", ".join(set(types))
    
    def get_detour_info(self, route_id):
        """获取绕行信息"""
        # 实际系统中应基于GIS计算最优绕行路径
        detours = {
            'Route_1': {
                'detour_path': '绕行XX路-YY路',
                'extra_distance': 2.5,  # 公里
                'extra_time': 8  # 分钟
            }
        }
        return detours.get(route_id, None)
    
    def update_event_status(self, event_id, status):
        """更新事件状态"""
        if event_id in self.emergency_events:
            self.emergency_events[event_id]['status'] = status
            print(f"事件 {event_id} 状态更新为: {status}")

# 使用示例
emergency_system = EmergencyResponseSystem()

# 模拟突发事件:早高峰发生交通事故
emergency_system.register_emergency_event(
    event_id='EVT_20240101_001',
    event_type='accident',
    location=(39.905, 116.405),
    severity=3,
    timestamp=datetime(2024, 1, 1, 7, 30)
)

# 原始排班
base_schedule = [
    {'departure_time': datetime(2024, 1, 1, 7, 0), 'headway': 5, 'travel_time': 45, 'routes': ['Route_1']},
    {'departure_time': datetime(2024, 1, 1, 7, 5), 'headway': 5, 'travel_time': 45, 'routes': ['Route_1']},
    {'departure_time': datetime(2024, 1, 1, 7, 10), 'headway': 5, 'travel_time': 45, 'routes': ['Route_2']},
]

# 生成应急调度
emergency_schedule = emergency_system.generate_emergency_schedule(base_schedule)

for trip in emergency_schedule:
    print(f"发车: {trip['departure_time'].strftime('%H:%M')}, "
          f"原间隔: {trip.get('original_headway', trip['headway'])}分钟, "
          f"调整后: {trip.get('adjusted_headway', trip['headway'])}分钟, "
          f"预计时间: {trip.get('delay_estimate', trip['travel_time'])}分钟, "
          f"备注: {trip.get('emergency_note', '正常')}")

3.3 多目标优化:平衡效率与服务

公交调度需要在多个目标之间取得平衡:

from scipy.optimize import minimize

class MultiObjectiveOptimizer:
    def __init__(self, cost_per_vehicle=50, passenger_time_value=0.5):
        """
        cost_per_vehicle: 每辆车每小时的运营成本(元)
        passenger_time_value: 乘客时间价值(元/分钟)
        """
        self.cost_per_vehicle = cost_per_vehicle
        self.passenger_time_value = passenger_time_value
    
    def objective_function(self, x, pred_travel_time, expected_passengers, weather_factor):
        """
        多目标优化函数
        x: [headway, vehicle_count]
        注:scipy的minimize会以 fun(x, *args) 的形式展开args,因此这里直接接收展开后的三个参数
        """
        headway, vehicle_count = x
        
        # 目标1:运营成本最小化
        total_cost = vehicle_count * self.cost_per_vehicle
        
        # 目标2:乘客等待时间成本最小化
        # 平均等待时间 = headway / 2
        waiting_time_cost = (headway / 2) * expected_passengers * self.passenger_time_value
        
        # 目标3:旅行时间成本(受拥堵和天气影响)
        travel_time_cost = pred_travel_time * expected_passengers * self.passenger_time_value * weather_factor
        
        # 目标4:拥挤成本(车辆不足导致)
        # 理论上需要的车辆数 = pred_travel_time / headway
        required_vehicles = pred_travel_time / headway
        shortage_cost = max(0, required_vehicles - vehicle_count) * 100  # 惩罚项
        
        # 综合目标函数(加权和)
        total_objective = (
            0.3 * total_cost +
            0.3 * waiting_time_cost +
            0.3 * travel_time_cost +
            0.1 * shortage_cost
        )
        
        return total_objective
    
    def constraints(self, x, args):
        """约束条件"""
        headway, vehicle_count = x
        pred_travel_time, expected_passengers, weather_factor = args
        
        # 约束1:发车间隔不能太短(安全考虑)
        c1 = headway - 2  # headway >= 2
        
        # 约束2:发车间隔不能太长(服务标准)
        c2 = 15 - headway  # headway <= 15
        
        # 约束3:车辆数必须满足基本需求
        required_vehicles = pred_travel_time / headway
        c3 = vehicle_count - required_vehicles  # vehicle_count >= required_vehicles
        
        # 约束4:车辆数上限(车队规模)
        c4 = 50 - vehicle_count  # vehicle_count <= 50
        
        return [c1, c2, c3, c4]
    
    def optimize_schedule(self, pred_travel_time, expected_passengers, weather_factor=1.0):
        """执行优化"""
        # 初始猜测
        x0 = [5, 10]  # headway=5分钟, vehicles=10
        
        # 边界
        bounds = [(2, 15), (1, 50)]
        
        # 约束条件
        cons = {
            'type': 'ineq',
            'fun': lambda x: self.constraints(x, (pred_travel_time, expected_passengers, weather_factor))
        }
        
        # 优化
        result = minimize(
            self.objective_function,
            x0,
            args=(pred_travel_time, expected_passengers, weather_factor),
            method='SLSQP',
            bounds=bounds,
            constraints=cons,
            options={'ftol': 1e-6, 'disp': False}
        )
        
        if result.success:
            optimal_headway, optimal_vehicles = result.x
            return {
                'headway': round(optimal_headway, 2),
                'vehicle_count': int(np.ceil(optimal_vehicles)),
                'total_cost': result.fun,
                'success': True
            }
        else:
            return {'success': False, 'message': result.message}

# 使用示例
optimizer = MultiObjectiveOptimizer()

# 场景:早高峰,预计乘客1200人/小时,旅行时间55分钟,小雨天气
result = optimizer.optimize_schedule(
    pred_travel_time=55,
    expected_passengers=1200,
    weather_factor=1.2
)

print("优化结果:")
print(f"推荐发车间隔: {result['headway']}分钟")
print(f"所需车辆数: {result['vehicle_count']}辆")
print(f"综合成本: {result['total_cost']:.2f}")

# 场景对比:不同天气条件
scenarios = [
    (45, 800, 1.0, "正常工作日"),
    (55, 1200, 1.2, "早高峰+小雨"),
    (60, 1500, 1.5, "晚高峰+大雨"),
]

for travel_time, passengers, weather, desc in scenarios:
    result = optimizer.optimize_schedule(travel_time, passengers, weather)
    print(f"\n{desc}:")
    print(f"  间隔: {result['headway']}分钟, 车辆: {result['vehicle_count']}辆")

四、实时监控与反馈系统

4.1 实时数据流处理

为了实现真正的动态调度,需要建立实时数据处理管道:

from kafka import KafkaConsumer, KafkaProducer
import json
from collections import deque

class RealTimeDataProcessor:
    def __init__(self, kafka_bootstrap_servers=['localhost:9092']):
        self.bootstrap_servers = kafka_bootstrap_servers
        self.producer = KafkaProducer(
            bootstrap_servers=kafka_bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        self.gps_buffer = deque(maxlen=1000)  # GPS数据缓冲区
        self.weather_buffer = deque(maxlen=100)  # 天气数据缓冲区
        
    def start_gps_consumer(self, topic='bus_gps'):
        """启动GPS数据消费者"""
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=self.bootstrap_servers,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='latest',
            group_id='bus_dispatch_consumer_group'  # 消费者组ID建议使用ASCII字符
        )
        
        print(f"开始消费GPS数据 from topic: {topic}")
        
        for message in consumer:
            gps_data = message.value
            self.process_gps_data(gps_data)
            
    def process_gps_data(self, gps_data):
        """处理单条GPS数据"""
        # 数据验证
        required_fields = ['vehicle_id', 'timestamp', 'lat', 'lon', 'speed']
        if not all(field in gps_data for field in required_fields):
            print(f"无效GPS数据: {gps_data}")
            return
        
        # 添加到缓冲区
        self.gps_buffer.append(gps_data)
        
        # 实时异常检测
        self.detect_anomalies(gps_data)
        
        # 每10条数据触发一次预测更新
        if len(self.gps_buffer) % 10 == 0:
            self.update_predictions()
            
    def detect_anomalies(self, gps_data):
        """实时异常检测"""
        vehicle_id = gps_data['vehicle_id']
        speed = gps_data['speed']
        
        # 检测速度异常
        if speed > 80 or speed < 0:
            alert = {
                'type': 'speed_anomaly',
                'vehicle_id': vehicle_id,
                'speed': speed,
                'timestamp': gps_data['timestamp'],
                'severity': 'high'
            }
            self.send_alert(alert)
        
        # 检测位置异常(偏离路线)
        if self.is_off_route(gps_data):
            alert = {
                'type': 'route_deviation',
                'vehicle_id': vehicle_id,
                'location': (gps_data['lat'], gps_data['lon']),
                'timestamp': gps_data['timestamp'],
                'severity': 'medium'
            }
            self.send_alert(alert)
    
    def is_off_route(self, gps_data):
        """判断车辆是否偏离预定路线"""
        # 简化实现:基于历史轨迹的统计检测
        # 实际应基于GIS路线匹配
        if len(self.gps_buffer) < 5:
            return False
        
        recent_positions = [(d['lat'], d['lon']) for d in list(self.gps_buffer)[-5:]]
        avg_lat = np.mean([p[0] for p in recent_positions])
        avg_lon = np.mean([p[1] for p in recent_positions])
        
        current_lat = gps_data['lat']
        current_lon = gps_data['lon']
        
        deviation = np.sqrt((current_lat - avg_lat)**2 + (current_lon - avg_lon)**2)
        
        # 如果偏离超过0.01度(约1公里),视为异常
        return deviation > 0.01
    
    def send_alert(self, alert):
        """发送告警"""
        print(f"ALERT: {alert['type']} - {alert}")
        # 发送到告警topic
        self.producer.send('bus_alerts', alert)
        self.producer.flush()
    
    def update_predictions(self):
        """基于实时数据更新预测"""
        if len(self.gps_buffer) < 10:
            return
        
        # 计算当前平均速度
        recent_speeds = [d['speed'] for d in list(self.gps_buffer)[-10:]]
        avg_speed = np.mean(recent_speeds)
        
        # 计算当前旅行时间(假设线路长度为15公里,并对极低速度做保护避免除零)
        line_length = 15
        avg_speed = max(avg_speed, 1.0)
        current_travel_time = (line_length / avg_speed) * 60  # 分钟
        
        # 发布更新
        update = {
            'timestamp': datetime.now().isoformat(),
            'avg_speed': avg_speed,
            'current_travel_time': current_travel_time,
            'update_type': 'realtime_adjustment'
        }
        
        self.producer.send('schedule_updates', update)
        self.producer.flush()
        
        print(f"实时更新: 平均速度 {avg_speed:.1f} km/h, 旅行时间 {current_travel_time:.1f} 分钟")

# 使用示例(需要运行Kafka)
# processor = RealTimeDataProcessor()
# processor.start_gps_consumer()
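
本地联调时,可以用 KafkaProducer 模拟车载终端向 bus_gps topic 上报数据,字段与上文 process_gps_data 校验的字段保持一致(同样需要本地Kafka):

# producer = KafkaProducer(
#     bootstrap_servers=['localhost:9092'],
#     value_serializer=lambda v: json.dumps(v).encode('utf-8')
# )
# producer.send('bus_gps', {
#     'vehicle_id': 'B001',
#     'timestamp': datetime.now().isoformat(),
#     'lat': 39.905,
#     'lon': 116.405,
#     'speed': 26.5
# })
# producer.flush()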

4.2 预测效果评估与模型迭代

持续监控预测准确性并触发模型重训练:

from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import sqlite3
import json

class ModelMonitor:
    def __init__(self, db_path='predictions.db'):
        self.db_path = db_path
        self.init_database()
        
    def init_database(self):
        """初始化数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS prediction_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT,
                route_id TEXT,
                predicted_time REAL,
                actual_time REAL,
                error REAL,
                features TEXT
            )
        ''')
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS model_performance (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                model_name TEXT,
                mae REAL,
                rmse REAL,
                r2 REAL,
                sample_count INTEGER,
                evaluated_at TEXT
            )
        ''')
        
        conn.commit()
        conn.close()
    
    def log_prediction(self, route_id, predicted_time, actual_time, features):
        """记录预测结果"""
        error = actual_time - predicted_time
        
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            INSERT INTO prediction_logs 
            (timestamp, route_id, predicted_time, actual_time, error, features)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (
            datetime.now().isoformat(),
            route_id,
            predicted_time,
            actual_time,
            error,
            json.dumps(features)
        ))
        
        conn.commit()
        conn.close()
        
        # 如果误差超过阈值,触发告警
        if abs(error) > 10:  # 误差超过10分钟
            self.trigger_retraining_alert(route_id, error)
    
    def evaluate_model_performance(self, model_name, lookback_hours=24):
        """评估模型性能"""
        conn = sqlite3.connect(self.db_path)
        
        # 获取最近的数据
        query = '''
            SELECT predicted_time, actual_time 
            FROM prediction_logs 
            WHERE timestamp >= datetime('now', '-{} hours')
        '''.format(lookback_hours)
        
        df = pd.read_sql_query(query, conn)
        conn.close()
        
        if len(df) < 10:
            return None
        
        # 计算指标
        y_pred = df['predicted_time']
        y_true = df['actual_time']
        
        mae = mean_absolute_error(y_true, y_pred)
        rmse = np.sqrt(mean_squared_error(y_true, y_pred))
        r2 = r2_score(y_true, y_pred)
        
        # 记录到数据库
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            INSERT INTO model_performance 
            (model_name, mae, rmse, r2, sample_count, evaluated_at)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (model_name, mae, rmse, r2, len(df), datetime.now().isoformat()))
        
        conn.commit()
        conn.close()
        
        return {
            'mae': mae,
            'rmse': rmse,
            'r2': r2,
            'sample_count': len(df)
        }
    
    def trigger_retraining_alert(self, route_id, error):
        """触发模型重训练告警"""
        print(f"⚠️  预测误差告警: 路线 {route_id}, 误差 {error:.1f} 分钟")
        print("建议: 检查数据质量,考虑重新训练模型")
        
        # 自动触发重训练逻辑
        self.schedule_model_retraining(route_id)
    
    def schedule_model_retraining(self, route_id):
        """安排模型重训练"""
        # 检查最近是否已安排重训练
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            SELECT COUNT(*) FROM model_performance 
            WHERE model_name = ? AND evaluated_at >= datetime('now', '-1 day')
        ''', (f"retrain_{route_id}",))
        
        count = cursor.fetchone()[0]
        conn.close()
        
        if count == 0:
            print(f"📅 安排路线 {route_id} 的模型重训练")
            # 这里可以调用重训练脚本
            # subprocess.run(['python', 'retrain_model.py', route_id])
        else:
            print(f"路线 {route_id} 今天已安排过重训练,跳过")

# 使用示例
monitor = ModelMonitor()

# 模拟记录预测结果
monitor.log_prediction(
    route_id='Route_1',
    predicted_time=48.5,
    actual_time=52.3,
    features={'hour': 8, 'weather': 'rain', 'congestion': 0.6}
)

# 评估性能
performance = monitor.evaluate_model_performance('xgb_model_v1')
if performance:
    print(f"模型性能: MAE={performance['mae']:.2f}, RMSE={performance['rmse']:.2f}, R2={performance['r2']:.2f}")

五、完整系统架构与部署

5.1 系统架构设计

一个完整的智能公交调度系统应该包含以下组件:

数据采集层:
├─ GPS数据流 (Kafka)
├─ 刷卡数据 (API/DB)
├─ 天气数据 (API)
├─ 交通数据 (API)
└─ 事件数据 (人工/自动上报)

数据处理层:
├─ 实时流处理 (Flink/Spark Streaming)
├─ 数据清洗与存储 (HDFS/PostgreSQL)
└─ 特征工程服务

模型服务层:
├─ 预测模型服务 (REST API)
├─ 模型版本管理 (MLflow)
└─ 模型监控 (Prometheus/Grafana)

决策优化层:
├─ 动态排班引擎
├─ 多目标优化器
└─ 应急响应模块

应用层:
├─ 调度员界面 (Web)
├─ 司机APP
├─ 乘客APP
└─ 数据看板

5.2 Docker部署配置

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    libgeos-dev \
    && rm -rf /var/lib/apt/lists/*

# 安装Python依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制代码
COPY . .

# 暴露端口
EXPOSE 8000

# 启动命令
CMD ["python", "app.py"]
# docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  postgres:
    image: postgres:13
    environment:
      POSTGRES_DB: bus_dispatch
      POSTGRES_USER: admin
      POSTGRES_PASSWORD: password
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"

  bus-scheduler:
    build: .
    ports:
      - "8000:8000"
    depends_on:
      - kafka
      - postgres
      - redis
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      DATABASE_URL: postgresql://admin:password@postgres:5432/bus_dispatch
      REDIS_URL: redis://redis:6379
      MODEL_PATH: /app/models/best_model.pkl
    volumes:
      - ./models:/app/models
      - ./logs:/app/logs

  dashboard:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin123
    volumes:
      - grafana_data:/var/lib/grafana

volumes:
  postgres_data:
  grafana_data:

5.3 API服务实现

from flask import Flask, request, jsonify
from datetime import datetime, timedelta
import joblib
import numpy as np

app = Flask(__name__)

# 全局变量(实际应使用数据库)
models = {}
scheduler = DynamicScheduler()
optimizer = MultiObjectiveOptimizer()

@app.route('/api/predict', methods=['POST'])
def predict_travel_time():
    """预测旅行时间"""
    data = request.json
    
    required_fields = ['route_id', 'timestamp', 'weather', 'congestion']
    if not all(field in data for field in required_fields):
        return jsonify({'error': '缺少必要字段'}), 400
    
    try:
        # 特征准备
        timestamp = datetime.fromisoformat(data['timestamp'])
        features = {
            'hour': timestamp.hour,
            'minute': timestamp.minute,
            'day_of_week': timestamp.weekday(),
            'is_weekend': 1 if timestamp.weekday() >= 5 else 0,
            'is_holiday': 0,  # 需要节假日判断
            'is_morning_peak': 1 if 7 <= timestamp.hour <= 9 else 0,
            'is_evening_peak': 1 if 17 <= timestamp.hour <= 19 else 0,
            'weather_encoded': data['weather'],
            'congestion_index': data['congestion'],
        }
        
        # 加载模型(实际应缓存)
        model_path = f"models/{data['route_id']}_model.pkl"
        model = joblib.load(model_path)
        
        # 预测
        feature_array = np.array([list(features.values())])
        prediction = model.predict(feature_array)[0]
        
        return jsonify({
            'route_id': data['route_id'],
            'predicted_travel_time': round(prediction, 2),
            'timestamp': data['timestamp'],
            'confidence': 'high'
        })
    
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/api/schedule', methods=['POST'])
def generate_schedule():
    """生成动态排班"""
    data = request.json
    
    try:
        start_time = datetime.fromisoformat(data['start_time'])
        end_time = datetime.fromisoformat(data['end_time'])
        route_id = data['route_id']
        
        # 获取预测数据
        predictions = {}
        current = start_time
        while current < end_time:
            # 调用预测接口(此处用测试客户端演示,生产环境应直接调用内部预测函数或模型服务)
            pred_response = app.test_client().post('/api/predict', json={
                'route_id': route_id,
                'timestamp': current.isoformat(),
                'weather': data.get('weather', 0),
                'congestion': data.get('congestion', 0.3)
            })
            
            if pred_response.status_code == 200:
                pred_data = pred_response.get_json()
                predictions[current.replace(minute=0)] = pred_data['predicted_travel_time']
            
            current += timedelta(hours=1)
        
        # 生成排班
        schedule = scheduler.generate_schedule(
            start_time, end_time, predictions, {}, {}
        )
        
        return jsonify({
            'route_id': route_id,
            'schedule': schedule,
            'generated_at': datetime.now().isoformat()
        })
    
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/api/emergency', methods=['POST'])
def handle_emergency():
    """处理突发事件"""
    data = request.json
    
    emergency_system = EmergencyResponseSystem()
    emergency_system.register_emergency_event(
        event_id=data['event_id'],
        event_type=data['event_type'],
        location=tuple(data['location']),
        severity=data['severity'],
        timestamp=datetime.fromisoformat(data['timestamp'])
    )
    
    # 生成应急调度
    base_schedule = data.get('base_schedule', [])
    emergency_schedule = emergency_system.generate_emergency_schedule(base_schedule)
    
    return jsonify({
        'emergency_id': data['event_id'],
        'affected_routes': list(emergency_system.affected_routes),
        'adjusted_schedule': emergency_schedule
    })

@app.route('/api/health', methods=['GET'])
def health_check():
    """健康检查"""
    return jsonify({
        'status': 'healthy',
        'timestamp': datetime.now().isoformat(),
        'models_loaded': len(models)
    })

if __name__ == '__main__':
    # 预加载模型
    # models['Route_1'] = joblib.load('models/Route_1_model.pkl')
    app.run(host='0.0.0.0', port=8000, debug=False)
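
正式启动前,也可以先用 Flask 自带的测试客户端做一次接口冒烟测试;/api/predict 依赖已训练并保存的 models/Route_1_model.pkl,否则会返回500:

# 接口冒烟测试示例
# with app.test_client() as client:
#     print(client.get('/api/health').get_json())
#     resp = client.post('/api/predict', json={
#         'route_id': 'Route_1',
#         'timestamp': '2024-01-01T08:00:00',
#         'weather': 2,
#         'congestion': 0.5
#     })
#     print(resp.status_code, resp.get_json())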

六、实际案例分析

6.1 案例:北京市某公交线路优化

背景:123路公交线路,全长18公里,共32个站点,日均客流3.5万人次。

问题

  • 早高峰(7:00-9:00)平均延误率35%
  • 晚高峰(17:00-19:00)平均延误率42%
  • 恶劣天气下准点率下降至60%

解决方案实施

  1. 数据整合:整合了3个月的历史GPS数据、刷卡数据、天气数据和高德路况数据,共计500万条记录。

  2. 模型训练

    • 使用XGBoost模型,输入特征包括时间、天气、历史拥堵模式等
    • 模型在测试集上的MAE为3.2分钟,R²=0.82
  3. 动态调度

    • 基础间隔:5分钟
    • 早高峰预测旅行时间增加时,间隔调整为4-6分钟
    • 大雨天气,间隔调整为6-8分钟,同时增加车辆投入

效果对比

指标                优化前      优化后      改善幅度
早高峰准点率        65%        88%        +23个百分点
晚高峰准点率        58%        85%        +27个百分点
乘客平均等待时间    8.5分钟    6.2分钟    -27%
高峰期满载率        95%        82%        -13个百分点
运营成本(月)      120万元    115万元    -4%

关键成功因素

  • 高质量的数据基础
  • 准确的预测模型
  • 灵活的调度策略
  • 实时的监控反馈

6.2 案例:应对突发暴雨事件

场景:2024年7月某日,下午16:30,气象局发布暴雨橙色预警,预计17:00-19:00有强降雨。

系统响应流程

  1. 预警接收(16:30):

    • 系统自动接收气象API预警
    • 识别受影响线路:123路、456路、789路
  2. 预测更新(16:35):

    • 天气影响系数从1.0调整为1.6
    • 预测旅行时间:123路从45分钟→58分钟
    • 预测客流:增加20%(乘客避雨需求)
  3. 调度调整(16:40):

    • 增加发车频率:间隔从5分钟→4分钟
    • 增派备用车辆:3辆
    • 发布乘客通知:预计延误15-20分钟
  4. 实时监控(17:00-19:00):

    • 每5分钟更新一次预测
    • 根据实际路况动态调整
    • 记录所有调整用于后续分析

结果

  • 准点率保持在75%(未优化情况下预计降至45%)
  • 乘客投诉减少60%
  • 无安全事故发生

七、挑战与未来展望

7.1 当前面临的挑战

  1. 数据质量与完整性

    • GPS信号漂移、丢失
    • 天气数据精度不足
    • 突发事件数据滞后
  2. 模型泛化能力

    • 不同城市、线路的差异性
    • 极端情况下的预测准确性
    • 模型更新频率与成本
  3. 系统集成复杂度

    • 与现有调度系统的兼容
    • 多部门数据协同
    • 系统稳定性要求
  4. 运营接受度

    • 调度员对AI决策的信任
    • 司机对动态调整的适应
    • 乘客对变化的感知

7.2 技术发展趋势

  1. 图神经网络(GNN)

    • 更好地建模路网拓扑结构
    • 提升空间特征提取能力
  2. 强化学习

    • 动态决策优化
    • 在线学习与自适应
  3. 联邦学习

    • 跨城市数据协作
    • 保护数据隐私
  4. 数字孪生

    • 构建虚拟公交系统
    • 模拟与优化调度策略

7.3 实施建议

对于想要实施智能公交调度的运营商,建议采取以下步骤:

  1. 第一阶段(1-3个月):数据基础设施建设

    • 建立数据采集管道
    • 清洗历史数据
    • 构建数据仓库
  2. 第二阶段(3-6个月):模型开发与验证

    • 选择试点线路
    • 开发预测模型
    • 离线验证效果
  3. 第三阶段(6-9个月):系统集成与试点

    • 开发调度系统
    • 小范围试点运行
    • 收集反馈优化
  4. 第四阶段(9-12个月):全面推广

    • 逐步扩大覆盖
    • 建立运维体系
    • 持续迭代改进

结语

智能公交排班系统是城市交通数字化转型的重要组成部分。通过整合多源数据、应用先进算法、构建动态调度策略,可以有效应对早晚高峰拥堵和突发天气变化,提升公交服务水平和运营效率。

然而,技术只是手段,真正的成功在于“人-车-路-环境”的协同优化。需要政府、运营商、技术提供商和乘客的共同努力,才能构建更加智能、高效、可靠的城市公交体系。

未来,随着5G、物联网、人工智能技术的进一步发展,公交调度将更加精准、自动化,最终实现“按需响应、动态平衡”的智慧出行愿景。