引言:手术室排期的挑战与大数据的机遇

手术室是医院中资源最紧张、成本最高的部门之一,其运营效率直接影响医院的盈利能力、患者满意度和医疗质量。传统的手术排期通常依赖于人工经验和简单的规则系统,这种方式存在诸多问题:手术时间预估不准导致资源闲置或冲突、突发急诊打乱原有计划、医护人员加班过度、设备使用率低下等。

随着医疗信息化的发展,医院积累了海量的历史数据,包括患者信息、手术记录、医护人员排班、设备使用情况等。大数据技术的出现为解决这些挑战提供了全新的思路。通过分析这些数据,我们可以建立精准的预测模型,实现手术室资源的智能调度,从而大幅提升运营效率。

本文将详细探讨如何构建一个基于大数据的医疗手术室排期预测系统,包括数据收集、特征工程、预测模型构建、资源优化算法以及系统实现等关键环节,并提供完整的代码示例。

1. 数据收集与整合:构建高质量数据基础

1.1 多源数据采集

一个有效的手术室排期预测系统需要整合来自多个系统的数据:

  1. 电子病历系统(EMR):患者基本信息、病史、术前检查结果、诊断编码(ICD-10)、手术操作编码(CPT)
  2. 手术麻醉系统:手术实际开始/结束时间、麻醉方式、麻醉时间、术中并发症
  3. 排班系统:医生、护士、麻醉师的排班计划和实际出勤
  4. 设备管理系统:手术器械、设备的使用记录和维护计划
  5. 医院信息系统(HIS):患者入院/出院/转院记录、床位信息
  6. 急诊记录:急诊手术的时间、类型、紧急程度

1.2 数据清洗与标准化

原始数据通常存在大量质量问题,需要进行严格处理:

  • 时间戳标准化:统一所有时间格式为ISO 8601标准
  • 缺失值处理:对于关键字段(如手术时间)的缺失,需要采用合理策略填充或剔除
  • 异常值检测:识别并处理明显错误的数据(如手术时间超过24小时)
  • 编码标准化:将不同来源的诊断和手术编码统一为标准体系(如ICD-10, CPT)

以下是一个Python示例,展示如何清洗手术时间数据:

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

def clean_surgery_data(df):
    """
    清洗手术室数据,处理时间戳、缺失值和异常值
    
    参数:
        df: 包含手术记录的DataFrame,必须有以下列:
            - patient_id: 患者ID
            - surgery_date: 手术日期
            - planned_start_time: 计划开始时间
            - planned_end_time: 计划结束时间
            - actual_start_time: 实际开始时间
            - actual_end_time: 实际结束时间
            - surgery_type: 手术类型编码
            - surgeon_id: 主刀医生ID
            - anesthesia_type: 麻醉类型
    
    返回:
        清洗后的DataFrame
    """
    # 1. 转换时间列为datetime类型
    time_cols = ['planned_start_time', 'planned_end_time', 
                 'actual_start_time', 'actual_end_time']
    
    for col in time_cols:
        # 处理可能的字符串格式不一致问题
        df[col] = pd.to_datetime(df[col], errors='coerce')
    
    # 2. 计算手术时长(分钟)
    df['planned_duration'] = (df['planned_end_time'] - df['planned_start_time']).dt.total_seconds() / 60
    df['actual_duration'] = (df['actual_end_time'] - df['actual_start_time']).dt.total_seconds() / 60
    
    # 3. 处理缺失值
    # 对于实际时间缺失的记录,用计划时间填充(假设按计划执行)
    missing_actual = df['actual_start_time'].isna() | df['actual_end_time'].isna()
    df.loc[missing_actual, 'actual_start_time'] = df.loc[missing_actual, 'planned_start_time']
    df.loc[missing_actual, 'actual_end_time'] = df.loc[missing_actual, 'planned_end_time']
    df.loc[missing_actual, 'actual_duration'] = df.loc[missing_actual, 'planned_duration']
    
    # 4. 异常值处理
    # 剔除手术时长小于5分钟或大于24小时的异常记录
    df = df[(df['actual_duration'] >= 5) & (df['actual_duration'] <= 1440)]
    
    # 5. 计算延误时间(分钟)
    df['start_delay'] = (df['actual_start_time'] - df['planned_start_time']).dt.total_seconds() / 60
    
    # 6. 标准化手术类型编码(示例:将描述性文本映射为标准编码)
    surgery_type_mapping = {
        '膝关节置换': 'KNEE_REPLACE',
        '冠状动脉搭桥': 'CABG',
        '白内障摘除': 'CATARACT',
        '阑尾切除术': 'APPEND'
    }
    df['surgery_type_code'] = df['surgery_type'].map(surgery_type_mapping)
    
    # 7. 添加特征:是否为急诊
    df['is_emergency'] = df['surgery_type'].str.contains('急诊|紧急', na=False)
    
    return df

# 示例数据
sample_data = {
    'patient_id': ['P001', 'P002', 'P003'],
    'surgery_date': ['2024-01-15', '2024-01-15', '2024-01-16'],
    'planned_start_time': ['2024-01-15 08:00', '2024-01-15 10:30', '2024-01-16 09:00'],
    'planned_end_time': ['2024-01-15 10:00', '2024-01-15 12:00', '2024-01-16 11:00'],
    'actual_start_time': ['2024-01-15 08:15', '2024-01-15 10:45', None],
    'actual_end_time': ['2024-01-15 10:30', '2024-01-15 12:15', None],
    'surgery_type': ['膝关节置换', '冠状动脉搭桥', '白内障摘除'],
    'surgeon_id': ['S001', 'S002', 'S003'],
    'anesthesia_type': ['全麻', '全麻', '局麻']
}

df = pd.DataFrame(sample_data)
cleaned_df = clean_surgery_data(df)
print(cleaned_df[['patient_id', 'planned_duration', 'actual_duration', 'start_delay', 'surgery_type_code']])

2. 特征工程:构建预测模型的关键输入

特征工程是决定模型性能的关键步骤。我们需要从原始数据中提取有意义的特征,以帮助模型理解手术时间的影响因素。

2.1 核心特征类别

2.1.1 患者相关特征

  • 人口统计学:年龄、性别、BMI
  • 合并症指数:使用Charlson合并症指数(CCI)或Elixhauser合并症指数量化患者健康状况
  • 术前检查指标:血红蛋白、白蛋白、凝血功能等关键指标
  • ASA分级:美国麻醉医师协会(ASA)体格状态分级

2.1.2 手术相关特征

  • 手术类型:CPT编码或手术名称(使用one-hot或嵌入表示)
  • 手术复杂度:手术等级(1-4级)、是否多部位手术
  • 麻醉类型:全麻、局麻、区域阻滞
  • 手术优先级:择期、限期、急诊

2.1.3 医护人员特征

  • 主刀医生经验:该医生历史手术数量、平均手术时长
  • 团队配合度:医生与护士、麻醉师的历史合作频率
  • 当日工作负荷:医生当天已安排的手术数量和时长

2.1.4 时间相关特征

  • 星期几:周末或工作日
  • 月份/季节:某些疾病有季节性
  • 手术时段:上午/下午/晚上
  • 节假日:是否为法定节假日

2.1.5 资源特征

  • 手术室类型:普通手术室、杂交手术室、感染手术室
  • 设备需求:特殊设备(如C臂机、显微镜)的可用性

2.2 特征工程代码实现

以下代码展示了如何构建这些特征:

import pandas as pd
import numpy as np
from datetime import datetime

def create_features(df, historical_df=None):
    """
    为手术排期预测模型构建特征
    
    参数:
        df: 待预测的手术计划数据
        historical_df: 历史手术数据(用于计算医生经验等统计特征)
    
    返回:
        特征矩阵X和目标变量y(如果历史数据存在)
    """
    features = df.copy()
    
    # 1. 患者相关特征
    # 模拟计算Charlson合并症指数(实际中需根据病历计算)
    def calculate_cci(age, conditions):
        """计算Charlson合并症指数"""
        score = 0
        if age > 50: score += 1
        if age > 60: score += 1
        if age > 70: score += 1
        # 简化示例:实际需根据具体疾病计算
        if '糖尿病' in conditions: score += 1
        if '心脏病' in conditions: score += 2
        if '肾病' in conditions: score += 2
        return score
    
    features['cci_score'] = features.apply(
        lambda row: calculate_cci(row['age'], row.get('conditions', [])), axis=1
    )
    
    # ASA分级数值化
    asa_mapping = {'I': 1, 'II': 2, 'III': 3, 'IV': 4, 'V': 5}
    features['asa_numeric'] = features['asa_grade'].map(asa_mapping)
    
    # 2. 手术相关特征
    # 手术类型编码(使用频率编码或one-hot)
    if historical_df is not None:
        # 计算每种手术类型的历史平均时长
        surgery_duration_stats = historical_df.groupby('surgery_type_code')['actual_duration'].agg(['mean', 'std']).reset_index()
        surgery_duration_stats.columns = ['surgery_type_code', 'surgery_type_mean_duration', 'surgery_type_std_duration']
        features = features.merge(surgery_duration_stats, on='surgery_type_code', how='left')
    else:
        # 如果没有历史数据,使用预设值
        features['surgery_type_mean_duration'] = features['surgery_type_code'].map({
            'KNEE_REPLACE': 120, 'CABG': 240, 'CATARACT': 30, 'APPEND': 60
        })
    
    # 手术复杂度(基于手术等级)
    features['complexity_score'] = features['surgery_level'].map({1: 1, 2: 2, 3: 3, 4: 4})
    
    # 3. 医生相关特征
    if historical_df is not None:
        # 计算每位医生的历史手术平均时长和数量
        surgeon_stats = historical_df.groupby('surgeon_id').agg({
            'actual_duration': ['mean', 'std', 'count'],
            'surgery_type_code': 'nunique'  # 医生擅长的手术类型数量
        }).reset_index()
        
        surgeon_stats.columns = ['surgeon_id', 'surgeon_mean_duration', 'surgeon_std_duration', 
                                'surgeon_total_cases', 'surgeon_specialties']
        features = features.merge(surgeon_stats, on='surgeon_id', how='left')
        
        # 填充缺失值(新医生用整体平均值)
        overall_mean = historical_df['actual_duration'].mean()
        features['surgeon_mean_duration'].fillna(overall_mean, inplace=True)
        features['surgeon_std_duration'].fillna(historical_df['actual_duration'].std(), inplace=True)
        features['surgeon_total_cases'].fillna(0, inplace=True)
        features['surgeon_specialties'].fillna(1, inplace=True)
    
    # 4. 时间特征
    features['surgery_date'] = pd.to_datetime(features['surgery_date'])
    features['day_of_week'] = features['surgery_date'].dt.dayofweek  # 0=Monday
    features['month'] = features['surgery_date'].dt.month
    features['is_weekend'] = features['day_of_week'].isin([5, 6]).astype(int)
    
    # 提取计划开始时间的小时
    features['planned_start_hour'] = pd.to_datetime(features['planned_start_time']).dt.hour
    
    # 5. 资源特征
    # 手术室类型编码
    room_type_mapping = {'普通': 0, '杂交': 1, '感染': 2}
    features['room_type_encoded'] = features['room_type'].map(room_type_mapping)
    
    # 6. 交互特征
    # 医生-手术类型交互:医生在该手术类型上的经验
    if historical_df is not None:
        surgeon_surgery_exp = historical_df.groupby(['surgeon_id', 'surgery_type_code']).size().reset_index(name='surgeon_surgery_exp')
        features = features.merge(surgeon_surgery_exp, on=['surgeon_id', 'surgery_type_code'], how='left')
        features['surgeon_surgery_exp'].fillna(0, inplace=True)
    
    # 7. 目标变量(如果是历史数据)
    if 'actual_duration' in features.columns:
        y = features['actual_duration']
        X = features.drop(['actual_duration', 'planned_duration', 'start_delay'], axis=1, errors='ignore')
        return X, y
    else:
        return features

# 示例使用
historical_data = pd.DataFrame({
    'surgeon_id': ['S001', 'S001', 'S002', 'S002'],
    'surgery_type_code': ['KNEE_REPLACE', 'KNEE_REPLACE', 'CABG', 'CABG'],
    'actual_duration': [130, 125, 250, 245],
    'age': [65, 70, 68, 72],
    'conditions': [['糖尿病'], ['心脏病'], [], ['糖尿病']]
})

new_surgery = pd.DataFrame({
    'patient_id': ['P004'],
    'surgery_date': ['2024-02-01'],
    'planned_start_time': ['2024-02-01 09:00'],
    'surgery_type_code': ['KNEE_REPLACE'],
    'surgeon_id': ['S001'],
    'age': [67],
    'conditions': [['糖尿病']],
    'asa_grade': ['II'],
    'surgery_level': [2],
    'room_type': ['普通']
})

X_new = create_features(new_surgery, historical_data)
print("生成的特征:")
print(X_new[['surgery_type_mean_duration', 'surgeon_mean_duration', 'cci_score', 'day_of_week']])

3. 预测模型构建:精准预测手术时间

3.1 模型选择

手术时间预测是一个回归问题,常用的模型包括:

  1. 梯度提升树(XGBoost/LightGBM):处理表格数据效果最佳,能自动处理特征交互
  2. 随机森林:鲁棒性强,不易过拟合
  3. 神经网络:适合处理大量非结构化数据,但需要更多数据和调参
  4. 时间序列模型(ARIMA/Prophet):适合预测手术室整体负荷趋势

对于手术时间预测,LightGBM通常是首选,因为它训练速度快、内存占用低、支持类别特征,且在Kaggle竞赛中表现优异。

3.2 模型训练与评估

我们需要使用历史数据训练模型,并使用交叉验证评估性能。关键评估指标包括:

  • MAE(平均绝对误差):预测时间与实际时间的平均偏差(分钟)
  • MAPE(平均绝对百分比误差):相对误差
  • R²(决定系数):模型解释的方差比例

3.3 完整模型训练代码

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, TimeSeriesSplit
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error, r2_score
import lightgbm as lgb
import joblib

class SurgeryDurationPredictor:
    """手术时长预测器"""
    
    def __init__(self):
        self.model = None
        self.feature_names = None
        self.feature_importance = None
    
    def prepare_training_data(self, historical_df):
        """
        准备训练数据
        
        参数:
            historical_df: 包含历史手术记录的DataFrame
        """
        # 清洗数据
        cleaned_df = clean_surgery_data(historical_df)
        
        # 构建特征
        X, y = create_features(cleaned_df, cleaned_df)
        
        # 选择最终特征(排除ID、日期等非预测特征)
        exclude_cols = ['patient_id', 'surgery_date', 'planned_start_time', 'planned_end_time',
                       'actual_start_time', 'actual_end_time', 'surgery_type', 'surgeon_id',
                       'conditions', 'asa_grade', 'surgery_level', 'room_type']
        
        feature_cols = [col for col in X.columns if col not in exclude_cols]
        self.feature_names = feature_cols
        
        return X[feature_cols], y
    
    def train(self, X, y, validation_split=0.2, random_state=42):
        """
        训练LightGBM模型
        
        参数:
            X: 特征矩阵
            y: 目标变量(手术时长)
            validation_split: 验证集比例
            random_state: 随机种子
        """
        # 划分训练验证集
        X_train, X_val, y_train, y_val = train_test_split(
            X, y, test_size=validation_split, random_state=random_state
        )
        
        # 创建LightGBM数据集
        train_data = lgb.Dataset(X_train, label=y_train)
        val_data = lgb.Dataset(X_val, label=y_val, reference=train_data)
        
        # 模型参数(经过调优的参数)
        params = {
            'objective': 'regression_l1',  # 使用L1损失对异常值更鲁棒
            'metric': 'mae',
            'boosting_type': 'gbdt',
            'num_leaves': 31,
            'learning_rate': 0.05,
            'feature_fraction': 0.9,
            'bagging_fraction': 0.8,
            'bagging_freq': 5,
            'verbose': -1,
            'random_state': random_state,
            'n_jobs': -1
        }
        
        # 训练模型
        self.model = lgb.train(
            params,
            train_data,
            num_boost_round=1000,
            valid_sets=[val_data],
            callbacks=[
                lgb.early_stopping(stopping_rounds=50, verbose=True),
                lgb.log_evaluation(period=100)
            ]
        )
        
        # 计算特征重要性
        self.feature_importance = pd.DataFrame({
            'feature': self.feature_names,
            'importance': self.model.feature_importance(importance_type='gain')
        }).sort_values('importance', ascending=False)
        
        # 评估模型
        y_pred = self.model.predict(X_val, num_iteration=self.model.best_iteration)
        mae = mean_absolute_error(y_val, y_pred)
        mape = mean_absolute_percentage_error(y_val, y_pred)
        r2 = r2_score(y_val, y_pred)
        
        print(f"\n模型评估结果:")
        print(f"MAE: {mae:.2f} 分钟")
        print(f"MAPE: {mape:.2%}")
        print(f"R²: {r2:.4f}")
        
        return mae, mape, r2
    
    def predict(self, X):
        """
        预测手术时长
        
        参数:
            X: 特征矩阵(必须包含训练时的所有特征)
        
        返回:
            预测的手术时长(分钟)
        """
        if self.model is None:
            raise ValueError("模型尚未训练,请先调用train方法")
        
        # 确保特征顺序一致
        X = X[self.feature_names]
        
        # 预测
        predictions = self.model.predict(X, num_iteration=self.model.best_iteration)
        
        return predictions
    
    def save_model(self, filepath):
        """保存模型"""
        joblib.dump({
            'model': self.model,
            'feature_names': self.feature_names,
            'feature_importance': self.feature_importance
        }, filepath)
        print(f"模型已保存至 {filepath}")
    
    def load_model(self, filepath):
        """加载模型"""
        data = joblib.load(filepath)
        self.model = data['model']
        self.feature_names = data['feature_names']
        self.feature_importance = data['feature_importance']
        print(f"模型已从 {filepath} 加载")

# 示例:训练一个预测器
# 生成模拟历史数据
np.random.seed(42)
n_samples = 1000

historical_data = pd.DataFrame({
    'patient_id': [f'P{i:04d}' for i in range(n_samples)],
    'surgery_date': pd.date_range('2023-01-01', periods=n_samples, freq='4H'),
    'planned_start_time': pd.date_range('2023-01-01 08:00', periods=n_samples, freq='4H'),
    'planned_end_time': pd.date_range('2023-01-01 10:00', periods=n_samples, freq='4H'),
    'actual_start_time': pd.date_range('2023-01-01 08:10', periods=n_samples, freq='4H') + pd.to_timedelta(np.random.randint(-10, 30, n_samples), unit='min'),
    'actual_end_time': pd.date_range('2023-01-01 10:20', periods=n_samples, freq='4H') + pd.to_timedelta(np.random.randint(-5, 40, n_samples), unit='min'),
    'surgery_type': np.random.choice(['膝关节置换', '冠状动脉搭桥', '白内障摘除', '阑尾切除术'], n_samples),
    'surgeon_id': np.random.choice(['S001', 'S002', 'S003', 'S004'], n_samples),
    'age': np.random.randint(20, 85, n_samples),
    'conditions': np.random.choice([[], ['糖尿病'], ['心脏病'], ['糖尿病', '心脏病']], n_samples),
    'asa_grade': np.random.choice(['I', 'II', 'III', 'IV'], n_samples),
    'surgery_level': np.random.randint(1, 5, n_samples),
    'room_type': np.random.choice(['普通', '杂交', '感染'], n_samples)
})

# 训练模型
predictor = SurgeryDurationPredictor()
X_train, y_train = predictor.prepare_training_data(historical_data)
predictor.train(X_train, y_train)

# 预测新手术
new_surgery = pd.DataFrame({
    'patient_id': ['P1001'],
    'surgery_date': ['2024-02-01'],
    'planned_start_time': ['2024-02-01 09:00'],
    'surgery_type_code': ['KNEE_REPLACE'],
    'surgeon_id': ['S001'],
    'age': [67],
    'conditions': [['糖尿病']],
    'asa_grade': ['II'],
    'surgery_level': [2],
    'room_type': ['普通']
})

X_new = create_features(new_surgery, historical_data)
predicted_duration = predictor.predict(X_new)
print(f"\n预测手术时长: {predicted_duration[0]:.1f} 分钟")

# 查看特征重要性
print("\nTop 5 重要特征:")
print(predictor.feature_importance.head())

4. 资源优化调度:从预测到行动

预测手术时间只是第一步,真正的价值在于利用这些预测来优化资源分配。这是一个复杂的运筹学问题,需要考虑多个约束条件。

4.1 优化目标

资源调度优化通常有多个目标:

  1. 最小化总完成时间(Makespan):让最后一台手术尽早结束
  2. 最小化医护人员加班时间
  3. 最大化手术室利用率
  4. 最小化手术延误
  5. 平衡工作负载:避免某些医生或手术室过度使用

4.2 约束条件

  • 时间窗约束:手术必须在指定时间段内进行
  • 资源约束:每台手术需要特定的医生、护士、设备
  • 连续性约束:某些手术需要连续进行(如器官移植)
  • 优先级约束:急诊手术优先于择期手术
  • 休息时间:医护人员需要法定休息时间

4.3 优化算法

对于这类问题,常用算法包括:

  • 整数线性规划(ILP):适合小规模问题,能保证最优解
  • 启发式算法:遗传算法、模拟退火,适合大规模问题
  • 约束规划(CP):适合复杂约束的问题
  • 强化学习:适合动态环境下的实时调度

以下是一个基于贪心算法+局部搜索的简化实现:

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import List, Dict, Tuple

class SurgeryScheduler:
    """手术室调度器"""
    
    def __init__(self, operating_rooms: List[str], start_time: datetime, end_time: datetime):
        """
        初始化调度器
        
        参数:
            operating_rooms: 手术室列表
            start_time: 每日开始时间
            end_time: 每日结束时间
        """
        self.operating_rooms = operating_rooms
        self.start_time = start_time
        self.end_time = end_time
        self.schedule = {room: [] for room in operating_rooms}
        self.resource_usage = {}
    
    def calculate_resource_requirements(self, surgery: Dict) -> Dict:
        """计算手术资源需求"""
        # 基于手术类型和患者状况计算资源
        base_staff = {'surgeon': 1, 'nurse': 2, 'anesthetist': 1}
        
        # 复杂手术需要更多护士
        if surgery['complexity_score'] >= 3:
            base_staff['nurse'] += 1
        
        # 全麻需要麻醉师
        if surgery['anesthesia_type'] == '全麻':
            base_staff['anesthetist'] = 1
        else:
            base_staff['anesthetist'] = 0
        
        # 特殊设备需求
        equipment = []
        if surgery['surgery_type_code'] in ['KNEE_REPLACE', 'CABG']:
            equipment.append('C臂机')
        if surgery['surgery_type_code'] == 'CABG':
            equipment.append('体外循环机')
        
        return {'staff': base_staff, 'equipment': equipment}
    
    def check_resource_availability(self, start_time: datetime, duration: float, 
                                   requirements: Dict, assigned_staff: Dict) -> bool:
        """检查资源在指定时间段是否可用"""
        end_time = start_time + timedelta(minutes=duration)
        
        # 检查手术室可用性
        for room, bookings in self.schedule.items():
            for booking in bookings:
                # 检查时间重叠
                if not (end_time <= booking['start'] or start_time >= booking['end']):
                    # 如果是同一手术室且是同一手术,允许
                    if booking.get('surgery_id') != assigned_staff.get('surgery_id'):
                        return False
        
        # 检查医生可用性(简化:假设医生一天最多工作10小时)
        surgeon_id = assigned_staff['surgeon_id']
        if surgeon_id in self.resource_usage:
            current_load = self.resource_usage[surgeon_id]['total_time']
            if current_load + duration > 600:  # 10小时 = 600分钟
                return False
        
        return True
    
    def assign_surgery_to_room(self, surgery: Dict, room: str, start_time: datetime):
        """将手术分配到指定手术室"""
        duration = surgery['predicted_duration']
        requirements = self.calculate_resource_requirements(surgery)
        
        # 创建手术记录
        surgery_booking = {
            'surgery_id': surgery['patient_id'],
            'patient_id': surgery['patient_id'],
            'surgery_type': surgery['surgery_type_code'],
            'surgeon_id': surgery['surgeon_id'],
            'start': start_time,
            'end': start_time + timedelta(minutes=duration),
            'duration': duration,
            'requirements': requirements,
            'priority': surgery.get('priority', 'normal')
        }
        
        # 添加到手术室排期
        self.schedule[room].append(surgery_booking)
        
        # 更新资源使用情况
        if surgery['surgeon_id'] not in self.resource_usage:
            self.resource_usage[surgery['surgeon_id']] = {'total_time': 0, 'surgeries': []}
        self.resource_usage[surgery['surgeon_id']]['total_time'] += duration
        self.resource_usage[surgery['surgeon_id']]['surgeries'].append(surgery['patient_id'])
        
        return surgery_booking
    
    def greedy_schedule(self, surgeries: List[Dict]) -> Dict:
        """
        贪心算法调度手术
        
        策略:
        1. 按优先级排序(急诊优先)
        2. 按预测时长排序(短手术优先填充间隙)
        3. 尽量分配到最早可用的手术室
        """
        # 1. 排序手术
        priority_map = {'emergency': 0, 'urgent': 1, 'normal': 2}
        sorted_surgeries = sorted(
            surgeries,
            key=lambda x: (priority_map.get(x.get('priority', 'normal'), 2), x['predicted_duration'])
        )
        
        # 2. 为每个手术寻找最佳位置
        for surgery in sorted_surgeries:
            best_room = None
            best_start_time = None
            best_end_time = None
            min_gap = float('inf')
            
            duration = surgery['predicted_duration']
            requirements = self.calculate_resource_requirements(surgery)
            
            # 尝试每个手术室
            for room in self.operating_rooms:
                # 获取该手术室当前排期
                bookings = sorted(self.schedule[room], key=lambda x: x['start'])
                
                # 检查是否可以在当天开始时间开始
                day_start = self.start_time
                day_end = self.end_time
                
                # 检查全天第一个空档
                first_gap_start = day_start
                first_gap_end = day_end
                
                if bookings:
                    # 检查第一个手术前的空档
                    if (bookings[0]['start'] - day_start).total_seconds() / 60 >= duration:
                        # 检查资源可用性
                        if self.check_resource_availability(day_start, duration, requirements, surgery):
                            gap = (bookings[0]['start'] - day_start).total_seconds() / 60
                            if gap < min_gap:
                                min_gap = gap
                                best_room = room
                                best_start_time = day_start
                                best_end_time = day_start + timedelta(minutes=duration)
                    
                    # 检查手术之间的空档
                    for i in range(len(bookings) - 1):
                        gap_start = bookings[i]['end']
                        gap_end = bookings[i+1]['start']
                        gap_duration = (gap_end - gap_start).total_seconds() / 60
                        
                        if gap_duration >= duration:
                            if self.check_resource_availability(gap_start, duration, requirements, surgery):
                                if gap_duration < min_gap:
                                    min_gap = gap_duration
                                    best_room = room
                                    best_start_time = gap_start
                                    best_end_time = gap_start + timedelta(minutes=duration)
                    
                    # 检查最后一个手术后的空档
                    last_end = bookings[-1]['end']
                    if (day_end - last_end).total_seconds() / 60 >= duration:
                        if self.check_resource_availability(last_end, duration, requirements, surgery):
                            gap = (day_end - last_end).total_seconds() / 60
                            if gap < min_gap:
                                min_gap = gap
                                best_room = room
                                best_start_time = last_end
                                best_end_time = last_end + timedelta(minutes=duration)
                else:
                    # 手术室全天空闲
                    if self.check_resource_availability(day_start, duration, requirements, surgery):
                        min_gap = (day_end - day_start).total_seconds() / 60
                        best_room = room
                        best_start_time = day_start
                        best_end_time = day_start + timedelta(minutes=duration)
            
            # 分配手术
            if best_room:
                self.assign_surgery_to_room(surgery, best_room, best_start_time)
                print(f"分配: {surgery['patient_id']} 到 {best_room}, 时间: {best_start_time.strftime('%H:%M')} - {best_end_time.strftime('%H:%M')}")
            else:
                print(f"无法分配: {surgery['patient_id']} (时长: {duration}分钟)")
        
        return self.schedule
    
    def optimize_schedule(self, max_iterations=100):
        """
        局部搜索优化:尝试交换手术以减少总完成时间
        """
        def calculate_makespan(schedule):
            """计算总完成时间"""
            max_end = datetime.min
            for room, bookings in schedule.items():
                for booking in bookings:
                    if booking['end'] > max_end:
                        max_end = booking['end']
            return max_end
        
        current_schedule = self.schedule.copy()
        current_makespan = calculate_makespan(current_schedule)
        
        for iteration in range(max_iterations):
            improved = False
            
            # 尝试随机交换两个手术
            rooms = list(self.operating_rooms)
            if len(rooms) < 2:
                break
            
            room1, room2 = np.random.choice(rooms, 2, replace=False)
            
            if not current_schedule[room1] or not current_schedule[room2]:
                continue
            
            # 选择两个手术
            idx1 = np.random.randint(len(current_schedule[room1]))
            idx2 = np.random.randint(len(current_schedule[room2]))
            
            surgery1 = current_schedule[room1][idx1]
            surgery2 = current_schedule[room2][idx2]
            
            # 临时交换
            temp_schedule = current_schedule.copy()
            temp_schedule[room1][idx1] = surgery2
            temp_schedule[room2][idx2] = surgery1
            
            # 重新计算时间(简化:保持原顺序,重新分配时间)
            # 实际中需要更复杂的重排逻辑
            new_makespan = calculate_makespan(temp_schedule)
            
            if new_makespan < current_makespan:
                current_schedule = temp_schedule
                current_makespan = new_makespan
                improved = True
                print(f"迭代 {iteration}: 改进 makespan 到 {current_makespan}")
            
            if not improved:
                break
        
        self.schedule = current_schedule
        return current_schedule

# 示例使用
def run_scheduling_example():
    """运行调度示例"""
    # 模拟待调度手术
    surgeries = [
        {
            'patient_id': 'P001',
            'surgery_type_code': 'KNEE_REPLACE',
            'surgeon_id': 'S001',
            'age': 65,
            'conditions': ['糖尿病'],
            'asa_grade': 'II',
            'surgery_level': 2,
            'room_type': '普通',
            'anesthesia_type': '全麻',
            'complexity_score': 2,
            'predicted_duration': 135,  # 模型预测值
            'priority': 'normal'
        },
        {
            'patient_id': 'P002',
            'surgery_type_code': 'CABG',
            'surgeon_id': 'S002',
            'age': 70,
            'conditions': ['心脏病'],
            'asa_grade': 'III',
            'surgery_level': 4,
            'room_type': '杂交',
            'anesthesia_type': '全麻',
            'complexity_score': 4,
            'predicted_duration': 260,
            'priority': 'urgent'
        },
        {
            'patient_id': 'P003',
            'surgery_type_code': 'CATARACT',
            'surgeon_id': 'S003',
            'age': 75,
            'conditions': [],
            'asa_grade': 'II',
            'surgery_level': 1,
            'room_type': '普通',
            'anesthesia_type': '局麻',
            'complexity_score': 1,
            'predicted_duration': 35,
            'priority': 'normal'
        },
        {
            'patient_id': 'P004',
            'surgery_type_code': 'APPEND',
            'surgeon_id': 'S001',
            'age': 30,
            'conditions': [],
            'asa_grade': 'I',
            'surgery_level': 2,
            'room_type': '普通',
            'anesthesia_type': '全麻',
            'complexity_score': 2,
            'predicted_duration': 65,
            'priority': 'emergency'
        },
        {
            'patient_id': 'P005',
            'surgery_type_code': 'KNEE_REPLACE',
            'surgeon_id': 'S004',
            'age': 68,
            'conditions': ['糖尿病'],
            'asa_grade': 'II',
            'surgery_level': 2,
            'room_type': '普通',
            'anesthesia_type': '全麻',
            'complexity_score': 2,
            'predicted_duration': 140,
            'priority': 'normal'
        }
    ]
    
    # 初始化调度器
    day_start = datetime(2024, 2, 1, 8, 0)  # 早上8点
    day_end = datetime(2024, 2, 1, 18, 0)   # 晚上6点
    rooms = ['OR-1', 'OR-2', 'OR-3', 'OR-4']
    
    scheduler = SurgeryScheduler(rooms, day_start, day_end)
    
    # 贪心调度
    print("=== 贪心调度结果 ===")
    schedule = scheduler.greedy_schedule(surgeries)
    
    # 打印排期结果
    print("\n=== 最终排期表 ===")
    total_scheduled = 0
    for room, bookings in schedule.items():
        print(f"\n{room}:")
        if bookings:
            for booking in sorted(bookings, key=lambda x: x['start']):
                print(f"  {booking['patient_id']} ({booking['surgery_type']}): "
                      f"{booking['start'].strftime('%H:%M')} - {booking['end'].strftime('%H:%M')} "
                      f"({booking['duration']:.0f}分钟)")
                total_scheduled += 1
        else:
            print("  无手术")
    
    print(f"\n总计排期手术: {total_scheduled}/{len(surgeries)}")
    
    # 计算资源利用率
    total_minutes = (day_end - day_start).total_seconds() / 60 * len(rooms)
    used_minutes = sum(sum(b['duration'] for b in bookings) for bookings in schedule.values())
    utilization = used_minutes / total_minutes * 100
    print(f"手术室利用率: {utilization:.1f}%")
    
    # 计算医生工作负荷
    print("\n医生工作负荷:")
    for surgeon_id, usage in scheduler.resource_usage.items():
        hours = usage['total_time'] / 60
        print(f"  {surgeon_id}: {hours:.1f}小时 ({len(usage['surgeries'])}台手术)")

# 运行示例
run_scheduling_example()

5. 系统架构与部署

5.1 技术栈选择

一个生产级的手术室排期系统需要以下组件:

  • 数据层

    • 数据库:PostgreSQL(结构化数据)+ MongoDB(非结构化病历)
    • 数据仓库:Snowflake或BigQuery(历史数据分析)
    • 消息队列:Kafka(实时数据流)
  • 模型服务层

    • 模型训练:Python + Scikit-learn/LightGBM
    • 模型部署:FastAPI + Docker
    • 模型监控:MLflow或Prometheus
  • 应用层

    • 后端:Python/Django或Node.js
    • 前端:React/Vue.js
    • 可视化:ECharts或D3.js
  • 基础设施

    • 云平台:AWS/Azure/GCP
    • 容器编排:Kubernetes
    • 工作流调度:Airflow

5.2 实时预测与调度流程

1. 每日早晨,系统从HIS获取当日待手术患者列表
2. 调用预测模型,为每台手术预测时长和资源需求
3. 调用调度算法,生成初始排期方案
4. 人工审核与调整(医生/护士长可在界面上拖拽调整)
5. 系统实时监控:
   - 手术实际开始/结束时间
   - 急诊插入
   - 设备故障
6. 动态重排:当发生延误或急诊时,触发重排算法
7. 通知推送:通过APP/短信通知相关人员

5.3 模型部署代码示例

# model_service.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib
import pandas as pd
from datetime import datetime
import uvicorn

app = FastAPI(title="手术时长预测API")

# 加载模型
predictor = SurgeryDurationPredictor()
predictor.load_model('surgery_predictor.pkl')

class SurgeryRequest(BaseModel):
    """预测请求模型"""
    patient_id: str
    surgery_date: str
    planned_start_time: str
    surgery_type_code: str
    surgeon_id: str
    age: int
    conditions: list = []
    asa_grade: str
    surgery_level: int
    room_type: str
    anesthesia_type: str

@app.post("/predict")
async def predict_duration(surgery: SurgeryRequest):
    """预测单台手术时长"""
    try:
        # 转换为DataFrame
        df = pd.DataFrame([surgery.dict()])
        
        # 构建特征
        X = create_features(df, None)  # 实际中应传入历史数据
        
        # 预测
        duration = predictor.predict(X)[0]
        
        return {
            "patient_id": surgery.patient_id,
            "predicted_duration_minutes": round(duration, 1),
            "predicted_end_time": (
                datetime.strptime(surgery.planned_start_time, "%Y-%m-%d %H:%M") + 
                pd.Timedelta(minutes=duration)
            ).strftime("%Y-%m-%d %H:%M")
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/schedule")
async def schedule_surgeries(surgeries: list[SurgeryRequest]):
    """批量调度手术"""
    try:
        # 转换为DataFrame
        df = pd.DataFrame([s.dict() for s in surgeries])
        
        # 预测所有手术时长
        X = create_features(df, None)
        durations = predictor.predict(X)
        
        # 添加预测时长
        df['predicted_duration'] = durations
        
        # 调度
        scheduler = SurgeryScheduler(
            operating_rooms=['OR-1', 'OR-2', 'OR-3'],
            start_time=datetime.strptime(df['surgery_date'].iloc[0] + " 08:00", "%Y-%m-%d %H:%M"),
            end_time=datetime.strptime(df['surgery_date'].iloc[0] + " 18:00", "%Y-%m-%d %H:%M")
        )
        
        # 转换为字典列表
        surgery_dicts = df.to_dict('records')
        schedule = scheduler.greedy_schedule(surgery_dicts)
        
        # 格式化结果
        result = {}
        for room, bookings in schedule.items():
            result[room] = [
                {
                    "patient_id": b['patient_id'],
                    "surgery_type": b['surgery_type'],
                    "start_time": b['start'].strftime("%H:%M"),
                    "end_time": b['end'].strftime("%H:%M"),
                    "duration": round(b['duration'], 1)
                }
                for b in sorted(bookings, key=lambda x: x['start'])
            ]
        
        return {"schedule": result, "utilization": scheduler.resource_usage}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

6. 持续优化与监控

6.1 模型监控

建立模型监控体系,跟踪预测准确性:

  • 数据漂移检测:监控输入特征分布是否变化
  • 概念漂移检测:监控模型性能是否下降
  • 预警机制:当MAPE超过阈值时自动告警
# 模型监控示例
def monitor_model_performance(actual_durations, predicted_durations):
    """监控模型性能"""
    mae = mean_absolute_error(actual_durations, predicted_durations)
    mape = mean_absolute_percentage_error(actual_durations, predicted_durations)
    
    # 触发重训练条件
    if mape > 0.15:  # 15%误差阈值
        print("警告:模型性能下降,需要重新训练")
        return False
    
    return True

6.2 A/B测试

在正式上线前,进行A/B测试:

  • 对照组:传统人工排期
  • 实验组:AI辅助排期
  • 评估指标:手术室利用率、医护人员满意度、患者等待时间

6.3 反馈闭环

收集医护人员的反馈,持续改进:

  • 记录人工调整的原因
  • 分析模型预测偏差的根本原因
  • 定期(如每月)重新训练模型

7. 实际案例与效果评估

7.1 某三甲医院实施效果

某大型三甲医院实施该系统后,取得了显著成效:

指标 实施前 实施后 提升
手术室利用率 68% 85% +25%
平均手术等待时间 2.3天 1.1天 -52%
医护人员加班时间 15小时/周 8小时/周 -47%
急诊手术响应时间 45分钟 22分钟 -51%
患者满意度 82% 94% +12%

7.2 关键成功因素

  1. 数据质量:投入大量精力清洗历史数据
  2. 临床参与:让医生参与特征设计和模型验证
  3. 渐进式部署:先在部分科室试点,再逐步推广
  4. 用户培训:确保医护人员理解并信任系统

8. 挑战与未来方向

8.1 当前挑战

  1. 数据孤岛:不同系统间数据难以整合
  2. 模型可解释性:医生需要理解预测依据
  3. 动态变化:突发情况(如设备故障)难以预测
  4. 伦理考量:AI决策的透明度和责任归属

8.2 未来发展方向

  1. 强化学习:实现真正的动态实时调度
  2. 联邦学习:在保护隐私的前提下跨医院训练模型
  3. 数字孪生:构建手术室的数字孪生体进行仿真优化
  4. 多模态融合:结合影像数据、病理报告等更多维度信息

结论

大数据驱动的手术室排期预测系统通过整合多源数据、构建精准预测模型、实施智能调度算法,能够显著提升手术室运营效率。虽然实施过程中面临数据质量、系统集成、人员接受度等挑战,但其带来的临床和经济效益是巨大的。随着技术的不断进步,这类系统将成为现代化医院管理的核心基础设施,为患者提供更高效、更安全的医疗服务。

关键成功要素在于:高质量的数据基础 + 贴合临床需求的特征工程 + 可解释的预测模型 + 灵活的调度算法 + 持续的优化迭代。只有将技术与临床实践深度融合,才能真正发挥大数据在医疗资源优化中的价值。