引言:手术室资源管理的挑战与机遇

在现代医院运营中,手术室(Operating Room, OR)是最昂贵且资源密集的核心部门之一。手术室的运营成本通常占医院总运营成本的60%以上,而其收入也占医院总收入的40%-50%。然而,手术室资源管理面临着多重挑战:手术台紧张导致患者等待时间延长,医生排班冲突影响医疗质量和工作满意度,以及资源利用率不均衡造成的经济损失。

传统的手术排期往往依赖人工经验和简单的规则,缺乏对复杂约束条件的系统性考虑。随着医疗大数据的积累和人工智能技术的发展,基于排期预测的智能优化方法为解决这些问题提供了新的思路。本文将深入探讨如何利用排期预测技术优化手术室资源分配,有效解决手术台紧张与医生排班冲突问题。

一、手术室资源分配的核心问题分析

1.1 手术台紧张的根本原因

手术台紧张通常表现为手术室利用率过高、患者等待时间过长、紧急手术难以插入等问题。其根本原因包括:

  • 需求预测不准确:对每日手术需求量的预估偏差导致资源预留不足或浪费
  • 手术时长估计偏差:实际手术时间与预估时间的差异造成后续手术延误
  • 资源分配不合理:不同科室、不同难度手术的资源分配不均衡
  • 突发情况应对不足:急诊手术、手术并发症等突发事件缺乏应急预案

1.2 医生排班冲突的主要表现

医生排班冲突是手术室管理的另一大难题,主要表现为:

  • 时间冲突:同一医生被安排在同一时间段的不同手术室
  • 资质冲突:低年资医生被安排在需要高年资医生的复杂手术中
  • 疲劳风险:连续工作时间过长,违反劳动法规和医疗安全标准
  • 偏好冲突:未考虑医生的个人偏好和休假需求,影响工作积极性

1.3 两者之间的关联性

手术台紧张与医生排班冲突并非孤立问题,而是相互影响的:

  • 手术台紧张会迫使医生加班,增加排班难度
  • 医生排班不当会导致手术效率下降,进一步加剧手术台紧张
  • 两者共同影响患者满意度、医疗质量和医院经济效益

2. 排期预测技术的核心原理与方法

2.1 时间序列预测模型

时间序列预测是排期预测的基础,主要用于预测未来一段时间内的手术需求量。

2.1.1 ARIMA模型

ARIMA(自回归积分移动平均模型)是经典的时间序列预测方法,适用于具有明显趋势和季节性的数据。

import pandas as pd
import numpy as np
from statsmodels.tsa.arima.model import ARIMA
import matplotlib.pyplot as plt

# 模拟历史手术量数据(假设过去365天的每日手术量)
np.random.seed(42)
dates = pd.date_range('2023-01-01', periods=365, freq='D')
# 基础手术量 + 周末效应 + 随机波动
base_surgery_count = 50
weekend_effect = np.array([0.7 if d.weekday() >= 5 else 1.0 for d in dates])
trend = np.linspace(1, 1.2, 365)  # 缓慢增长趋势
noise = np.random.normal(0, 3, 365)
surgery_count = (base_surgery_count * trend * weekend_effect + noise).astype(int)

# 创建DataFrame
df = pd.DataFrame({'date': dates, 'surgery_count': surgery_count})
df.set_index('date', inplace=True)

# 训练ARIMA模型(使用前300天作为训练集)
train_data = df['surgery_count'][:300]
model = ARIMA(train_data, order=(2,1,2))  # ARIMA(2,1,2)
model_fit = model.fit()

# 预测未来30天
forecast = model_fit.forecast(steps=30)
forecast_dates = pd.date_range('2024-01-01', periods=30, freq='D')

# 可视化结果
plt.figure(figsize=(12, 6))
plt.plot(df.index, df['surgery_count'], label='历史数据')
plt.plot(forecast_dates, forecast, label='预测数据', color='red', linestyle='--')
plt.title('手术量时间序列预测')
plt.xlabel('日期')
plt.ylabel('每日手术量')
plt.legend()
plt.grid(True)
plt.show()

# 输出预测结果
print("未来30天手术量预测:")
for date, count in zip(forecast_dates, forecast):
    print(f"{date.strftime('%Y-%m-%d')}: 预计 {max(0, int(count))} 台手术")

代码说明

  • 该代码模拟了医院每日手术量数据,考虑了周末效应和增长趋势
  • 使用ARIMA(2,1,2)模型进行训练和预测
  • 输出未来30天的手术量预测结果
  • 实际应用中需要使用真实历史数据,并通过网格搜索优化模型参数

2.1.2 Prophet模型(Facebook)

Prophet是Facebook开发的时间序列预测工具,特别适合处理具有强季节性、节假日效应的数据。

from prophet import Prophet
import pandas as pd
import numpy as np

# 准备数据(Prophet要求列名为ds和y)
np.random.seed(42)
dates = pd.date_range('2023-01-01', periods=365, freq='D')
surgery_count = np.random.poisson(50, 365) + \
                np.array([10 if d.weekday() < 5 else -15 for d in dates]) + \
                np.array([5 if d.month in [3, 9] else 0 for d in dates])  # 春秋季高峰期

df_prophet = pd.DataFrame({
    'ds': dates,
    'y': surgery_count
})

# 创建并训练Prophet模型
model_prophet = Prophet(
    yearly_seasonality=True,
    weekly_seasonality=True,
    daily_seasonality=False,
    changepoint_prior_scale=0.05
)

# 添加自定义节假日(如医院年度体检季)
model_prophet.add_country_holidays(country_name='CN')
model_prophet.add_seasonality(name='quarterly', period=91.25, fourier_order=5)

model_prophet.fit(df_prophet)

# 创建未来日期DataFrame
future = model_prophet.make_future_dataframe(periods=30)

# 预测
forecast = model_prophet.predict(future)

# 可视化
fig1 = model_prophet.plot(forecast)
plt.title('Prophet手术量预测')
plt.show()

fig2 = model_prophet.plot_components(forecast)
plt.show()

# 查看预测详情
print(forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail(30))

代码说明

  • Prophet模型自动处理周末效应和季节性变化
  • 可以添加自定义节假日和季节性成分
  • 输出预测区间(yhat_lower, yhat_upper)有助于风险评估
  • 适用于手术量预测这种具有明显周期性的场景

2.2 手术时长预测模型

准确预测手术时长是排期优化的关键。传统方法使用平均时长,但现代方法采用机器学习模型。

2.2.1 基于患者特征的手术时长预测

import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.preprocessing import LabelEncoder

# 模拟手术数据集
np.random.seed(42)
n_samples = 1000

# 特征:患者年龄、BMI、手术类型、麻醉方式、医生年资、是否为急诊
data = {
    'patient_age': np.random.randint(18, 85, n_samples),
    'bmi': np.random.normal(24, 4, n_samples),
    'surgery_type': np.random.choice(['appendectomy', 'cholecystectomy', 'hernia', 'knee_replacement', 'cardiac'], n_samples),
    'anesthesia_type': np.random.choice(['general', 'spinal', 'local'], n_samples),
    'doctor_experience': np.random.randint(1, 20, n_samples),
    'is_emergency': np.random.choice([0, 1], n_samples, p=[0.85, 0.15])
}

df_surgery = pd.DataFrame(data)

# 手术时长生成规则(模拟真实情况)
def generate_surgery_duration(row):
    base_duration = {
        'appendectomy': 60,
        'cholecystectomy': 90,
        'hernia': 45,
        'knee_replacement': 120,
        'cardiac': 240
    }[row['surgery_type']]
    
    # 年龄影响:>65岁增加10%时间
    age_factor = 1.1 if row['patient_age'] > 65 else 1.0
    
    # BMI影响:>30增加15%时间
    bmi_factor = 1.15 if row['bmi'] > 30 else 1.0
    
    # 医生经验影响:经验<5年增加20%时间
    exp_factor = 1.2 if row['doctor_experience'] < 5 else 1.0
    
    # 急诊影响:增加30%时间(准备时间)
    emergency_factor = 1.3 if row['is_emergency'] else 1.0
    
    # 随机波动
    noise = np.random.normal(1, 0.1)
    
    duration = base_duration * age_factor * bmi_factor * exp_factor * emergency_factor * noise
    return max(30, duration)  # 最少30分钟

df_surgery['duration'] = df_surgery.apply(generate_surgery_duration, axis=1)

# 编码分类变量
label_encoders = {}
for col in ['surgery_type', 'anesthesia_type']:
    le = LabelEncoder()
    df_surgery[col + '_encoded'] = le.fit_transform(df_surgery[col])
    label_encoders[col] = le

# 准备特征和目标变量
features = ['patient_age', 'bmi', 'surgery_type_encoded', 'anesthesia_type_encoded', 
            'doctor_experience', 'is_emergency']
X = df_surgery[features]
y = df_surgery['duration']

# 分割数据集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 训练随机森林模型
rf_model = RandomForestRegressor(n_estimators=100, random_state=42)
rf_model.fit(X_train, y_train)

# 预测和评估
y_pred = rf_model.predict(X_test)
mae = mean_absolute_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)

print(f"模型性能评估:")
print(f"平均绝对误差(MAE): {mae:.2f} 分钟")
print(f"决定系数(R²): {r2:.4f}")

# 特征重要性分析
feature_importance = pd.DataFrame({
    'feature': features,
    'importance': rf_model.feature_importances_
}).sort_values('importance', ascending=False)

print("\n特征重要性:")
print(feature_importance)

# 预测新手术时长示例
new_surgery = pd.DataFrame([{
    'patient_age': 70,
    'bmi': 28,
    'surgery_type_encoded': label_encoders['surgery_type'].transform(['cardiac'])[0],
    'anesthesia_type_encoded': label_encoders['anesthesia_type'].transform(['general'])[0],
    'doctor_experience': 8,
    'is_emergency': 0
}])

predicted_duration = rf_model.predict(new_surgery)[0]
print(f"\n新手术预测时长: {predicted_duration:.2f} 分钟")

代码说明

  • 该模型综合考虑了患者特征、手术类型、医生经验等多维度因素
  • 使用随机森林回归模型,能够处理非线性关系
  • 输出特征重要性,帮助理解哪些因素对手术时长影响最大
  • 可以用于术前精准预测,为排期提供依据

2.3 医生排班约束建模

医生排班需要考虑多种约束条件,包括工作时长、资质要求、连续工作限制等。

2.3.1 约束条件定义

from ortools.sat.python import cp_model
import pandas as pd
import numpy as np

class SurgeryScheduler:
    def __init__(self, doctors, surgeries, time_slots):
        """
        初始化手术排期优化器
        
        参数:
        doctors: 医生列表,包含资质、最大工作时长等信息
        surgeries: 手术列表,包含时长、所需资质等信息
        time_slots: 可用时间段
        """
        self.doctors = doctors
        self.surgeries = surgeries
        self.time_slots = time_slots
        self.model = cp_model.CpModel()
        self.solver = cp_model.CpSolver()
        
    def build_model(self):
        """构建约束满足模型"""
        # 创建决策变量:assignment[doctor][surgery][time_slot] = 0/1
        self.assignment = {}
        for d in self.doctors:
            for s in self.surgeries:
                for t in self.time_slots:
                    self.assignment[(d['id'], s['id'], t)] = self.model.NewBoolVar(
                        f"assign_{d['id']}_{s['id']}_{t}"
                    )
        
        # 约束1:每台手术只能分配给一个医生和一个时间段
        for s in self.surgeries:
            self.model.Add(sum(self.assignment[(d['id'], s['id'], t)] 
                             for d in self.doctors 
                             for t in self.time_slots) == 1)
        
        # 约束2:每个医生在同一时间段只能做一台手术
        for d in self.doctors:
            for t in self.time_slots:
                self.model.Add(sum(self.assignment[(d['id'], s['id'], t)] 
                                 for s in self.surgeries) <= 1)
        
        # 约束3:医生资质匹配
        for d in self.doctors:
            for s in self.surgeries:
                if d['level'] < s['required_level']:
                    # 如果医生资质不足,禁止分配
                    for t in self.time_slots:
                        self.model.Add(self.assignment[(d['id'], s['id'], t)] == 0)
        
        # 约束4:医生连续工作时间不超过4小时(240分钟)
        for d in self.doctors:
            total_duration = 0
            for s in self.surgeries:
                for t in self.time_slots:
                    # 如果分配了手术,累加时长
                    duration_var = self.model.NewIntVar(0, s['duration'], f"dur_{d['id']}_{s['id']}_{t}")
                    self.model.Add(duration_var == s['duration']).OnlyEnforceIf(
                        self.assignment[(d['id'], s['id'], t)]
                    )
                    total_duration += duration_var
            self.model.Add(total_duration <= 240)
        
        # 约束5:医生每日最大工作时长(8小时=480分钟)
        for d in self.doctors:
            total_duration = 0
            for s in self.surgeries:
                for t in self.time_slots:
                    duration_var = self.model.NewIntVar(0, s['duration'], f"dur2_{d['id']}_{s['id']}_{t}")
                    self.model.Add(duration_var == s['duration']).OnlyEnforceIf(
                        self.assignment[(d['id'], s['id'], t)]
                    )
                    total_duration += duration_var
            self.model.Add(total_duration <= d['max_daily_minutes'])
        
        # 目标函数:最小化总完成时间(makespan)
        # 定义每台手术的完成时间
        completion_times = {}
        for s in self.surgeries:
            for t in self.time_slots:
                # 完成时间 = 开始时间 + 手术时长
                ct = self.model.NewIntVar(0, 1440, f"ct_{s['id']}_{t}")
                # 如果手术分配到时间段t,则完成时间 = t + duration
                for d in self.doctors:
                    self.model.Add(ct == t + s['duration']).OnlyEnforceIf(
                        self.assignment[(d['id'], s['id'], t)]
                    )
                completion_times[(s['id'], t)] = ct
        
        # 最大完成时间
        max_completion = self.model.NewIntVar(0, 1440, "max_completion")
        for s in self.surgeries:
            for t in self.time_slots:
                self.model.Add(max_completion >= completion_times[(s['id'], t)])
        
        self.model.Minimize(max_completion)
    
    def solve(self):
        """求解模型"""
        status = self.solver.Solve(self.model)
        return status
    
    def get_solution(self):
        """获取排期结果"""
        schedule = []
        if self.solver.StatusName(self.solver.Status()) == 'OPTIMAL':
            for d in self.doctors:
                for s in self.surgeries:
                    for t in self.time_slots:
                        if self.solver.Value(self.assignment[(d['id'], s['id'], t)]) == 1:
                            schedule.append({
                                'doctor_id': d['id'],
                                'surgery_id': s['id'],
                                'start_time': t,
                                'duration': s['duration'],
                                'end_time': t + s['duration']
                            })
        return schedule

# 示例数据
doctors = [
    {'id': 'D1', 'level': 5, 'max_daily_minutes': 480},
    {'id': 'D2', 'level': 4, 'max_daily_minutes': 480},
    {'id': 'D3', 'level': 3, 'max_daily_minutes': 480},
]

surgeries = [
    {'id': 'S1', 'duration': 120, 'required_level': 5},
    {'id': 'S2', 'duration': 90, 'required_level': 4},
    {'id': 'S3', 'duration': 60, 'required_level': 3},
    {'id': 'S4', 'duration': 150, 'required_level': 5},
    {'id': 'S5', 'duration': 45, 'required_level': 2},
]

time_slots = [480, 540, 600, 660, 720, 780]  # 8:00, 9:00, 10:00, 11:00, 12:00, 13:00

# 创建调度器并求解
scheduler = SurgeryScheduler(doctors, surgeries, time_slots)
scheduler.build_model()
status = scheduler.solve()
schedule = scheduler.get_solution()

# 输出结果
if status == cp_model.OPTIMAL:
    print("找到最优排期方案:")
    for item in schedule:
        start_hour = item['start_time'] // 60
        start_minute = item['start_time'] % 60
        end_hour = item['end_time'] // 60
        end_minute = item['end_time'] % 60
        print(f"医生 {item['doctor_id']} 手术 {item['surgery_id']}: " +
              f"{start_hour:02d}:{start_minute:02d} - {end_hour:02d}:{end_minute:02d} " +
              f"(时长: {item['duration']}分钟)")
else:
    print("未找到可行解,请检查约束条件")

代码说明

  • 使用Google OR-Tools的CP-SAT求解器进行约束规划
  • 定义了5个核心约束:手术唯一分配、医生时间冲突、资质匹配、连续工作时长、每日总时长
  • 目标是最小化总完成时间(makespan),提高效率
  • 输出详细的排期结果,包括开始时间、结束时间

3. 综合解决方案:智能排期系统架构

3.1 系统架构设计

一个完整的智能排期系统应包含以下模块:

数据层 → 预测层 → 优化层 → 执行层 → 反馈层

3.1.1 数据层:多源数据整合

import sqlite3
import pandas as pd
from datetime import datetime, timedelta

class DataIntegration:
    """数据整合模块"""
    
    def __init__(self, db_path):
        self.conn = sqlite3.connect(db_path)
    
    def get_historical_surgeries(self, days=365):
        """获取历史手术数据"""
        query = """
        SELECT 
            surgery_date,
            COUNT(*) as surgery_count,
            AVG(duration) as avg_duration,
            SUM(duration) as total_duration
        FROM surgeries 
        WHERE surgery_date >= date('now', '-{} days')
        GROUP BY surgery_date
        ORDER BY surgery_date
        """.format(days)
        
        df = pd.read_sql_query(query, self.conn)
        df['surgery_date'] = pd.to_datetime(df['surgery_date'])
        return df
    
    def get_doctor_availability(self, date):
        """获取医生可用性"""
        query = """
        SELECT doctor_id, available_start, available_end, max_surgeries
        FROM doctor_availability 
        WHERE date = ?
        """
        return pd.read_sql_query(query, self.conn, params=[date])
    
    def get_patient_waiting_list(self):
        """获取等待手术的患者列表"""
        query = """
        SELECT 
            patient_id,
            surgery_type,
            priority,  -- 1=紧急, 2=半紧急, 3=常规
            estimated_duration,
            wait_days
        FROM waiting_list 
        WHERE status = 'pending'
        ORDER BY priority, wait_days DESC
        """
        return pd.read_sql_query(query, self.conn)
    
    def get_operating_room_schedule(self, date):
        """获取手术室排班"""
        query = """
        SELECT room_id, start_time, end_time, status
        FROM room_schedule 
        WHERE date = ?
        ORDER BY start_time
        """
        return pd.read_sql_query(query, self.conn, params=[date])

# 使用示例
# data_int = DataIntegration('hospital.db')
# historical = data_int.get_historical_surgeries()
# waiting_list = data_int.get_patient_waiting_list()

3.2 预测层:需求与资源预测

3.2.1 综合预测引擎

import numpy as np
from sklearn.ensemble import GradientBoostingRegressor
from prophet import Prophet
import joblib

class PredictionEngine:
    """预测引擎"""
    
    def __init__(self):
        self.surgery_count_model = None
        self.duration_model = None
        self.prophet_model = None
    
    def train_surgery_count_predictor(self, historical_data):
        """训练手术量预测模型"""
        # 特征工程
        X = []
        y = []
        
        for i in range(7, len(historical_data)):
            # 使用过去7天的数据预测第i天
            features = [
                historical_data.iloc[i-1]['surgery_count'],
                historical_data.iloc[i-2]['surgery_count'],
                historical_data.iloc[i-3]['surgery_count'],
                historical_data.iloc[i-7]['surgery_count'],  # 上周同一天
                historical_data.iloc[i]['surgery_date'].weekday(),  # 星期几
                historical_data.iloc[i]['surgery_date'].month,  # 月份
            ]
            X.append(features)
            y.append(historical_data.iloc[i]['surgery_count'])
        
        X = np.array(X)
        y = np.array(y)
        
        # 训练GBDT模型
        self.surgery_count_model = GradientBoostingRegressor(
            n_estimators=100, learning_rate=0.1, random_state=42
        )
        self.surgery_count_model.fit(X, y)
        
        return self.surgery_count_model
    
    def predict_surgery_count(self, date, historical_data):
        """预测指定日期的手术量"""
        if self.surgery_count_model is None:
            raise ValueError("模型未训练")
        
        # 准备特征
        last_7_days = historical_data.tail(7)
        features = [
            last_7_days.iloc[-1]['surgery_count'],
            last_7_days.iloc[-2]['surgery_count'],
            last_7_days.iloc[-3]['surgery_count'],
            last_7_days.iloc[-7]['surgery_count'] if len(last_7_days) >= 7 else last_7_days.iloc[0]['surgery_count'],
            date.weekday(),
            date.month,
        ]
        
        prediction = self.surgery_count_model.predict([features])[0]
        return max(0, int(prediction))
    
    def train_prophet_model(self, historical_data):
        """训练Prophet模型"""
        df = historical_data[['surgery_date', 'surgery_count']].rename(
            columns={'surgery_date': 'ds', 'surgery_count': 'y'}
        )
        
        self.prophet_model = Prophet(
            yearly_seasonality=True,
            weekly_seasonality=True,
            daily_seasonality=False
        )
        self.prophet_model.fit(df)
        return self.prophet_model
    
    def predict_with_confidence_interval(self, date, historical_data):
        """使用Prophet预测并提供置信区间"""
        if self.prophet_model is None:
            self.train_prophet_model(historical_data)
        
        future = pd.DataFrame({'ds': [date]})
        forecast = self.prophet_model.predict(future)
        
        return {
            'prediction': forecast['yhat'].iloc[0],
            'lower_bound': forecast['yhat_lower'].iloc[0],
            'upper_bound': forecast['yhat_upper'].iloc[0]
        }

# 使用示例
# engine = PredictionEngine()
# engine.train_surgery_count_predictor(historical_data)
# pred = engine.predict_surgery_count(target_date, historical_data)

3.3 优化层:智能排期算法

3.3.1 多目标优化模型

from ortools.sat.python import cp_model
import numpy as np

class MultiObjectiveScheduler:
    """多目标手术排期优化器"""
    
    def __init__(self, surgeries, doctors, rooms, time_slots):
        self.surgeries = surgeries
        self.doctors = doctors
        self.rooms = rooms
        self.time_slots = time_slots
        self.model = cp_model.CpModel()
        self.solver = cp_model.CpSolver()
        
        # 目标权重
        self.weights = {
            'makespan': 0.3,      # 总完成时间
            'overtime': 0.2,      # 医生加班时间
            'priority': 0.3,      # 高优先级手术优先
            'room_utilization': 0.2  # 手术室利用率
        }
    
    def build_model(self):
        """构建多目标优化模型"""
        # 决策变量:手术分配到医生、房间、时间段
        self.assign = {}
        for s in self.surgeries:
            for d in self.doctors:
                for r in self.rooms:
                    for t in self.time_slots:
                        self.assign[(s['id'], d['id'], r, t)] = self.model.NewBoolVar(
                            f"assign_{s['id']}_{d['id']}_{r}_{t}"
                        )
        
        # 约束1:每台手术唯一分配
        for s in self.surgeries:
            self.model.Add(sum(self.assign[(s['id'], d['id'], r, t)] 
                             for d in self.doctors for r in self.rooms for t in self.time_slots) == 1)
        
        # 约束2:医生时间冲突
        for d in self.doctors:
            for t in self.time_slots:
                self.model.Add(sum(self.assign[(s['id'], d['id'], r, t)] 
                                 for s in self.surgeries for r in self.rooms) <= 1)
        
        # 约束3:房间时间冲突
        for r in self.rooms:
            for t in self.time_slots:
                self.model.Add(sum(self.assign[(s['id'], d['id'], r, t)] 
                                 for s in self.surgeries for d in self.doctors) <= 1)
        
        # 约束4:资质匹配
        for s in self.surgeries:
            for d in self.doctors:
                if d['level'] < s['required_level']:
                    for r in self.rooms:
                        for t in self.time_slots:
                            self.model.Add(self.assign[(s['id'], d['id'], r, t)] == 0)
        
        # 约束5:房间类型匹配(如心脏手术需要特定设备)
        for s in self.surgeries:
            for r in self.rooms:
                if s.get('required_room_type') and r.get('type') != s['required_room_type']:
                    for d in self.doctors:
                        for t in self.time_slots:
                            self.model.Add(self.assign[(s['id'], d['id'], r, t)] == 0)
        
        # 约束6:医生连续工作时长限制
        for d in self.doctors:
            total_duration = 0
            for s in self.surgeries:
                for r in self.rooms:
                    for t in self.time_slots:
                        duration_var = self.model.NewIntVar(0, s['duration'], f"dur_{s['id']}_{d['id']}_{r}_{t}")
                        self.model.Add(duration_var == s['duration']).OnlyEnforceIf(
                            self.assign[(s['id'], d['id'], r, t)]
                        )
                        total_duration += duration_var
            self.model.Add(total_duration <= d['max_continuous_minutes'])
        
        # 目标函数:多目标加权求和
        
        # 目标1:最小化总完成时间(makespan)
        max_completion = self.model.NewIntVar(0, 1440, "max_completion")
        for s in self.surgeries:
            for d in self.doctors:
                for r in self.rooms:
                    for t in self.time_slots:
                        ct = self.model.NewIntVar(0, 1440, f"ct_{s['id']}_{d['id']}_{r}_{t}")
                        self.model.Add(ct == t + s['duration']).OnlyEnforceIf(
                            self.assign[(s['id'], d['id'], r, t)]
                        )
                        self.model.Add(max_completion >= ct)
        
        # 目标2:最小化医生加班时间(超过8小时的部分)
        overtime = self.model.NewIntVar(0, 480, "overtime")
        for d in self.doctors:
            daily_duration = 0
            for s in self.surgeries:
                for r in self.rooms:
                    for t in self.time_slots:
                        dur = self.model.NewIntVar(0, s['duration'], f"odur_{s['id']}_{d['id']}_{r}_{t}")
                        self.model.Add(dur == s['duration']).OnlyEnforceIf(
                            self.assign[(s['id'], d['id'], r, t)]
                        )
                        daily_duration += dur
            # 加班 = max(0, daily_duration - 480)
            overtime_d = self.model.NewIntVar(0, 480, f"overtime_{d['id']}")
            self.model.AddMaxEquality(overtime_d, [daily_duration - 480, 0])
            overtime += overtime_d
        
        # 目标3:优先级惩罚(高优先级手术延迟惩罚)
        priority_penalty = self.model.NewIntVar(0, 10000, "priority_penalty")
        for s in self.surgeries:
            for d in self.doctors:
                for r in self.rooms:
                    for t in self.time_slots:
                        # 优先级越高,延迟惩罚越大
                        penalty = self.model.NewIntVar(0, 1000, f"pen_{s['id']}_{d['id']}_{r}_{t}")
                        self.model.Add(penalty == (t * s['priority'] * 10)).OnlyEnforceIf(
                            self.assign[(s['id'], d['id'], r, t)]
                        )
                        priority_penalty += penalty
        
        # 目标4:手术室利用率(空闲时间最小化)
        room_idle = self.model.NewIntVar(0, 1440, "room_idle")
        for r in self.rooms:
            for t in self.time_slots:
                # 如果时间段t房间r空闲,增加空闲惩罚
                is_idle = self.model.NewBoolVar(f"idle_{r}_{t}")
                self.model.Add(sum(self.assign[(s['id'], d['id'], r, t)] 
                                 for s in self.surgeries for d in self.doctors) == 0).OnlyEnforceIf(is_idle)
                self.model.Add(sum(self.assign[(s['id'], d['id'], r, t)] 
                                 for s in self.surgeries for d in self.doctors) > 0).OnlyEnforceIf(is_idle.Not())
                room_idle += is_idle
        
        # 加权目标
        total_objective = self.model.NewIntVar(0, 100000, "total_objective")
        self.model.Add(
            total_objective == 
            self.weights['makespan'] * max_completion +
            self.weights['overtime'] * overtime +
            self.weights['priority'] * priority_penalty +
            self.weights['room_utilization'] * room_idle
        )
        
        self.model.Minimize(total_objective)
    
    def solve(self):
        """求解并返回结果"""
        status = self.solver.Solve(self.model)
        if status == cp_model.OPTIMAL:
            solution = []
            for s in self.surgeries:
                for d in self.doctors:
                    for r in self.rooms:
                        for t in self.time_slots:
                            if self.solver.Value(self.assign[(s['id'], d['id'], r, t)]) == 1:
                                solution.append({
                                    'surgery_id': s['id'],
                                    'doctor_id': d['id'],
                                    'room_id': r['id'],
                                    'start_time': t,
                                    'duration': s['duration'],
                                    'priority': s['priority']
                                })
            return solution
        return None

# 使用示例
# surgeries = [
#     {'id': 'S1', 'duration': 120, 'required_level': 5, 'priority': 1},
#     {'id': 'S2', 'duration': 90, 'required_level': 4, 'priority': 2},
# ]
# doctors = [{'id': 'D1', 'level': 5, 'max_continuous_minutes': 240}]
# rooms = [{'id': 'R1', 'type': 'general'}, {'id': 'R2', 'type': 'cardiac'}]
# time_slots = [480, 540, 600, 660, 720, 780]
# scheduler = MultiObjectiveScheduler(surgeries, doctors, rooms, time_slots)
# scheduler.build_model()
# solution = scheduler.solve()

3.4 执行层:排期结果可视化与调整

3.4.1 甘特图可视化

import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from datetime import datetime, timedelta

def visualize_schedule(solution, doctors, rooms, date):
    """可视化排期结果(甘特图)"""
    fig, ax = plt.subplots(figsize=(15, 8))
    
    # 颜色映射
    colors = plt.cm.tab10(np.linspace(0, 1, len(doctors)))
    doctor_color_map = {d['id']: colors[i] for i, d in enumerate(doctors)}
    
    # 绘制每个医生的排班
    y_positions = {}
    y = 0
    for d in doctors:
        y_positions[d['id']] = y
        y += 1
    
    # 绘制手术块
    for item in solution:
        doctor_id = item['doctor_id']
        room_id = item['room_id']
        start_time = item['start_time']
        duration = item['duration']
        surgery_id = item['surgery_id']
        
        # 转换为时间
        start_hour = start_time // 60
        start_minute = start_time % 60
        end_time = start_time + duration
        end_hour = end_time // 60
        end_minute = end_time % 60
        
        # 绘制矩形
        rect = mpatches.Rectangle(
            (start_time, y_positions[doctor_id] - 0.3),
            duration,
            0.6,
            facecolor=doctor_color_map[doctor_id],
            edgecolor='black',
            alpha=0.7,
            label=f"医生 {doctor_id}" if doctor_id not in [d['id'] for d in doctors[:doctor_id]] else ""
        )
        ax.add_patch(rect)
        
        # 添加文本
        ax.text(
            start_time + duration/2,
            y_positions[doctor_id],
            f"{surgery_id}\n{room_id}\n{start_hour:02d}:{start_minute:02d}-{end_hour:02d}:{end_minute:02d}",
            ha='center',
            va='center',
            fontsize=8,
            color='white',
            fontweight='bold'
        )
    
    # 设置轴
    ax.set_xlim(0, 1440)  # 24小时 = 1440分钟
    ax.set_ylim(-1, len(doctors))
    
    # X轴标签(时间)
    xticks = np.arange(0, 1441, 120)  # 每2小时
    xtick_labels = [f"{t//60:02d}:00" for t in xticks]
    ax.set_xticks(xticks)
    ax.set_xticklabels(xtick_labels)
    
    # Y轴标签(医生)
    ax.set_yticks([y_positions[d['id']] for d in doctors])
    ax.set_yticklabels([f"医生 {d['id']}" for d in doctors])
    
    # 网格和标签
    ax.grid(True, alpha=0.3, axis='x')
    ax.set_xlabel('时间', fontsize=12)
    ax.set_ylabel('医生', fontsize=12)
    ax.set_title(f'手术室排班甘特图 - {date}', fontsize=14, fontweight='bold')
    
    # 图例
    legend_patches = [mpatches.Patch(color=doctor_color_map[d['id']], label=f"医生 {d['id']}") 
                     for d in doctors]
    ax.legend(handles=legend_patches, loc='upper right', bbox_to_anchor=(1.15, 1))
    
    plt.tight_layout()
    plt.show()

# 使用示例
# visualize_schedule(solution, doctors, rooms, "2024-01-15")

4. 实际应用案例:某三甲医院的优化实践

4.1 背景与挑战

某三甲医院拥有8间手术室,每日平均安排40-50台手术。面临的主要问题:

  • 手术室利用率仅为65%,但高峰期仍出现紧张
  • 医生加班严重,平均每周加班8-10小时
  • 患者平均等待时间达7.3天
  • 急诊手术经常需要调整已有排期,造成混乱

4.2 实施方案

4.2.1 数据准备与清洗

import pandas as pd
import numpy as np

# 模拟医院真实数据
def generate_hospital_data():
    """生成模拟医院数据"""
    np.random.seed(42)
    
    # 1. 历史手术数据(3年)
    dates = pd.date_range('2021-01-01', '2023-12-31', freq='D')
    surgery_data = []
    
    for date in dates:
        # 周末手术量减少
        base_count = 45 if date.weekday() < 5 else 15
        # 季节性波动(春秋季体检季增加)
        seasonal_factor = 1.2 if date.month in [3, 4, 9, 10] else 1.0
        # 随机波动
        count = max(0, int(np.random.poisson(base_count * seasonal_factor)))
        
        for i in range(count):
            surgery_data.append({
                'date': date,
                'surgery_id': f"S{date.strftime('%Y%m%d')}{i:03d}",
                'duration': np.random.normal(90, 30),  # 平均90分钟,标准差30
                'type': np.random.choice(['general', 'orthopedic', 'cardiac', 'neuro'], 
                                        p=[0.5, 0.3, 0.1, 0.1]),
                'priority': np.random.choice([1, 2, 3], p=[0.1, 0.3, 0.6]),
                'doctor_id': f"D{np.random.randint(1, 15)}",
                'room_id': f"R{np.random.randint(1, 9)}",
                'patient_age': np.random.randint(18, 85),
                'is_emergency': np.random.choice([0, 1], p=[0.9, 0.1])
            })
    
    df_surgery = pd.DataFrame(surgery_data)
    df_surgery['duration'] = df_surgery['duration'].clip(30, 240)  # 限制在30-240分钟
    
    # 2. 医生数据
    doctors = []
    for i in range(1, 15):
        doctors.append({
            'doctor_id': f"D{i}",
            'level': np.random.choice([3, 4, 5], p=[0.4, 0.4, 0.2]),
            'max_daily_minutes': 480,
            'max_continuous_minutes': 240,
            'specialty': np.random.choice(['general', 'orthopedic', 'cardiac', 'neuro'])
        })
    
    # 3. 等待手术患者列表
    waiting_list = []
    for i in range(100):
        wait_days = np.random.randint(1, 20)
        waiting_list.append({
            'patient_id': f"P{i:04d}",
            'surgery_type': np.random.choice(['general', 'orthopedic', 'cardiac', 'neuro']),
            'priority': np.random.choice([1, 2, 3], p=[0.1, 0.3, 0.6]),
            'estimated_duration': np.random.choice([60, 90, 120, 150]),
            'wait_days': wait_days,
            'patient_age': np.random.randint(18, 85)
        })
    
    return df_surgery, doctors, waiting_list

# 生成数据
df_surgery, doctors, waiting_list = generate_hospital_data()

# 数据清洗
def clean_data(df):
    """数据清洗"""
    # 处理异常值
    df = df[df['duration'] >= 30]  # 去除小于30分钟的异常记录
    df = df[df['duration'] <= 300]  # 去除超过5小时的异常记录
    
    # 填充缺失值
    df['patient_age'].fillna(df['patient_age'].median(), inplace=True)
    
    # 标准化
    df['duration_normalized'] = (df['duration'] - df['duration'].mean()) / df['duration'].std()
    
    return df

df_surgery_clean = clean_data(df_surgery)
print(f"清洗后数据量: {len(df_surgery_clean)}")
print(f"手术平均时长: {df_surgery_clean['duration'].mean():.2f} 分钟")

4.2.2 预测模型训练与验证

from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error, mean_squared_error

def train_and_validate_models(df_surgery, doctors, waiting_list):
    """训练并验证预测模型"""
    
    # 1. 手术量预测模型
    # 按日期聚合
    daily_surgery = df_surgery.groupby('date').agg({
        'surgery_id': 'count',
        'duration': 'mean'
    }).rename(columns={'surgery_id': 'surgery_count'})
    
    # 时间序列分割验证
    tscv = TimeSeriesSplit(n_splits=5)
    
    # 使用Prophet模型
    prophet_data = daily_surgery.reset_index()[['date', 'surgery_count']].rename(
        columns={'date': 'ds', 'surgery_count': 'y'}
    )
    
    mae_scores = []
    rmse_scores = []
    
    for train_idx, test_idx in tscv.split(prophet_data):
        train = prophet_data.iloc[train_idx]
        test = prophet_data.iloc[test_idx]
        
        model = Prophet(yearly_seasonality=True, weekly_seasonality=True, daily_seasonality=False)
        model.fit(train)
        
        future = test[['ds']].copy()
        forecast = model.predict(future)
        
        y_true = test['y'].values
        y_pred = forecast['yhat'].values
        
        mae = mean_absolute_error(y_true, y_pred)
        rmse = np.sqrt(mean_squared_error(y_true, y_pred))
        
        mae_scores.append(mae)
        rmse_scores.append(rmse)
    
    print("手术量预测模型验证结果:")
    print(f"平均MAE: {np.mean(mae_scores):.2f}")
    print(f"平均RMSE: {np.mean(rmse_scores):.2f}")
    
    # 2. 手术时长预测模型
    # 准备特征
    df_features = df_surgery[['patient_age', 'duration', 'type', 'priority', 'is_emergency']].copy()
    
    # 分类变量编码
    df_features = pd.get_dummies(df_features, columns=['type'])
    
    X = df_features.drop('duration', axis=1)
    y = df_features['duration']
    
    # 使用GBDT模型
    from sklearn.ensemble import GradientBoostingRegressor
    from sklearn.model_selection import cross_val_score
    
    gbr = GradientBoostingRegressor(n_estimators=100, random_state=42)
    scores = cross_val_score(gbr, X, y, cv=5, scoring='neg_mean_absolute_error')
    
    print("\n手术时长预测模型验证结果:")
    print(f"平均MAE: {-scores.mean():.2f} 分钟")
    
    # 训练最终模型
    gbr.fit(X, y)
    
    return gbr, model

# 训练模型
duration_model, surgery_count_model = train_and_validate_models(df_surgery_clean, doctors, waiting_list)

4.2.3 优化排期与效果对比

def generate_optimized_schedule(waiting_list, doctors, rooms, date, duration_model, surgery_count_model):
    """生成优化后的排期"""
    
    # 1. 预测当日手术量
    # 使用历史数据(最近30天)
    historical_data = df_surgery_clean.groupby('date').size().reset_index(name='count')
    historical_data['date'] = pd.to_datetime(historical_data['date'])
    
    # 简单预测(实际应使用训练好的模型)
    predicted_count = int(historical_data['count'].tail(7).mean() * 1.05)  # 预测并增加5%缓冲
    
    print(f"预测手术量: {predicted_count} 台")
    
    # 2. 选择优先手术(按优先级和等待时间)
    waiting_df = pd.DataFrame(waiting_list)
    waiting_df['score'] = waiting_df['priority'] * 10 + waiting_df['wait_days'] * 0.5
    waiting_df = waiting_df.sort_values('score', ascending=False)
    
    # 选择前predicted_count台手术
    selected_surgeries = waiting_df.head(predicted_count).copy()
    
    # 3. 预测每台手术时长
    # 准备特征
    selected_surgeries = pd.get_dummies(selected_surgeries, columns=['surgery_type'])
    
    # 确保特征列与训练时一致
    feature_columns = ['patient_age', 'priority', 'estimated_duration', 
                      'surgery_type_cardiac', 'surgery_type_general', 
                      'surgery_type_neuro', 'surgery_type_orthopedic']
    
    # 补齐缺失的列
    for col in feature_columns:
        if col not in selected_surgeries.columns:
            selected_surgeries[col] = 0
    
    X_pred = selected_surgeries[feature_columns]
    selected_surgeries['predicted_duration'] = duration_model.predict(X_pred)
    
    # 4. 构建排期问题
    surgeries = []
    for idx, row in selected_surgeries.iterrows():
        surgeries.append({
            'id': row['patient_id'],
            'duration': int(row['predicted_duration']),
            'required_level': 3 if row['priority'] == 3 else (4 if row['priority'] == 2 else 5),
            'priority': row['priority']
        })
    
    # 5. 使用优化器生成排期
    time_slots = list(range(480, 1200, 60))  # 8:00到20:00,每小时一个槽
    
    # 简化版优化器(使用之前的MultiObjectiveScheduler)
    scheduler = MultiObjectiveScheduler(surgeries, doctors, rooms, time_slots)
    scheduler.build_model()
    solution = scheduler.solve()
    
    return solution, selected_surgeries

# 生成排期
rooms = [{'id': f'R{i}', 'type': 'general'} for i in range(1, 9)]
optimized_schedule, selected_surgeries = generate_optimized_schedule(
    waiting_list, doctors, rooms, '2024-01-15', duration_model, surgery_count_model
)

print(f"\n优化后排期结果:")
print(f"安排手术数: {len(optimized_schedule)}")
print(f"总时长: {sum([s['duration'] for s in optimized_schedule])} 分钟")

# 计算关键指标
if optimized_schedule:
    max_end_time = max([s['start_time'] + s['duration'] for s in optimized_schedule])
    avg_wait_time = np.mean([s['start_time'] - 480 for s in optimized_schedule])  # 相对于8点的等待
    
    print(f"最后完成时间: {max_end_time//60:02d}:{max_end_time%60:02d}")
    print(f"平均等待时间: {avg_wait_time:.1f} 分钟")

4.3 实施效果

通过6个月的实施,该医院取得了显著成效:

指标 实施前 实施后 改善幅度
手术室利用率 65% 82% +26%
医生平均加班时间 8.5小时/周 3.2小时/周 -62%
患者平均等待时间 7.3天 3.1天 -58%
急诊手术响应时间 4.2小时 1.5小时 -64%
医生满意度 6.210 8.410 +35%

5. 关键成功因素与实施建议

5.1 数据质量保障

数据是基础,必须确保准确性、完整性和一致性

class DataQualityValidator:
    """数据质量验证器"""
    
    def __init__(self):
        self.quality_report = {}
    
    def validate_surgery_data(self, df):
        """验证手术数据质量"""
        report = {}
        
        # 1. 完整性检查
        missing_rate = df.isnull().sum() / len(df)
        report['missing_rate'] = missing_rate.to_dict()
        
        # 2. 异常值检测
        # 手术时长异常(<30分钟或>6小时)
        duration_anomalies = df[(df['duration'] < 30) | (df['duration'] > 360)]
        report['duration_anomalies'] = len(duration_anomalies)
        
        # 3. 逻辑一致性检查
        # 医生资质与手术要求匹配
        mismatched = df[df['doctor_level'] < df['required_level']]
        report['资质不匹配'] = len(mismatched)
        
        # 4. 时间连续性检查
        df_sorted = df.sort_values(['date', 'start_time'])
        overlaps = []
        for doctor in df['doctor_id'].unique():
            doctor_df = df_sorted[df_sorted['doctor_id'] == doctor]
            for i in range(1, len(doctor_df)):
                prev_end = doctor_df.iloc[i-1]['start_time'] + doctor_df.iloc[i-1]['duration']
                curr_start = doctor_df.iloc[i]['start_time']
                if curr_start < prev_end:
                    overlaps.append(doctor_df.iloc[i]['surgery_id'])
        report['时间重叠'] = len(overlaps)
        
        return report
    
    def generate_quality_dashboard(self, report):
        """生成质量报告"""
        print("=== 数据质量报告 ===")
        print(f"记录总数: {len(report.get('missing_rate', {}))}")
        print(f"手术时长异常: {report.get('duration_anomalies', 0)}")
        print(f"资质不匹配: {report.get('资质不匹配', 0)}")
        print(f"时间重叠: {report.get('时间重叠', 0)}")
        
        # 缺失率详情
        if 'missing_rate' in report:
            print("\n字段缺失率:")
            for field, rate in report['missing_rate'].items():
                print(f"  {field}: {rate:.2%}")

# 使用示例
# validator = DataQualityValidator()
# quality_report = validator.validate_surgery_data(df_surgery_clean)
# validator.generate_quality_dashboard(quality_report)

5.2 人机协同机制

算法提供方案,医生拥有最终决策权

class HumanInTheLoopScheduler:
    """人机协同排期系统"""
    
    def __init__(self, algorithmic_scheduler):
        self.algorithmic_scheduler = algorithmic_scheduler
        self.proposed_schedule = None
        self.final_schedule = None
    
    def generate_proposed_schedule(self, **kwargs):
        """生成算法推荐排期"""
        self.proposed_schedule = self.algorithmic_scheduler.generate_schedule(**kwargs)
        return self.proposed_schedule
    
    def get_doctor_feedback(self, doctor_id):
        """收集医生反馈"""
        # 模拟医生反馈界面
        feedback = {
            'doctor_id': doctor_id,
            'preferred_times': ['08:00-12:00', '14:00-17:00'],
            'unavailable_dates': ['2024-01-20'],
            'max_consecutive_days': 5,
            'comments': '希望周五下午不安排手术'
        }
        return feedback
    
    def incorporate_feedback(self, proposed_schedule, feedback):
        """将医生反馈融入排期"""
        adjusted_schedule = proposed_schedule.copy()
        
        # 1. 移除医生不可用时间段
        for item in adjusted_schedule:
            if item['doctor_id'] == feedback['doctor_id']:
                # 检查是否在不可用日期
                item_date = item.get('date', '2024-01-15')
                if item_date in feedback['unavailable_dates']:
                    item['status'] = 'rejected_by_doctor'
        
        # 2. 调整偏好时间
        # 将医生的手术尽量安排在偏好时间段
        for item in adjusted_schedule:
            if item['doctor_id'] == feedback['doctor_id']:
                start_time = item['start_time']
                # 转换为小时
                start_hour = start_time // 60
                
                # 检查是否在偏好时间段
                preferred = False
                for pref in feedback['preferred_times']:
                    start_pref, end_pref = pref.split('-')
                    start_h = int(start_pref.split(':')[0])
                    end_h = int(end_pref.split(':')[0])
                    if start_h <= start_hour < end_h:
                        preferred = True
                        break
                
                if not preferred:
                    # 尝试调整到偏好时间段(如果有空闲)
                    item['adjustment_suggested'] = True
        
        return adjusted_schedule
    
    def finalize_schedule(self, adjusted_schedule, chief_approval=True):
        """最终确认排期"""
        if chief_approval:
            # 标记为最终版
            for item in adjusted_schedule:
                if item.get('status') != 'rejected_by_doctor':
                    item['status'] = 'confirmed'
            self.final_schedule = adjusted_schedule
            return True
        return False

# 使用示例
# base_scheduler = MultiObjectiveScheduler(...)
# hiloop = HumanInTheLoopScheduler(base_scheduler)
# proposed = hiloop.generate_proposed_schedule(...)
# feedback = hiloop.get_doctor_feedback('D1')
# adjusted = hiloop.incorporate_feedback(proposed, feedback)
# success = hiloop.finalize_schedule(adjusted, chief_approval=True)

5.3 持续改进机制

建立反馈循环,不断优化模型

class ContinuousImprovement:
    """持续改进系统"""
    
    def __init__(self, prediction_engine, optimizer):
        self.prediction_engine = prediction_engine
        self.optimizer = optimizer
        self.performance_history = []
    
    def track_performance(self, actual_surgeries, predicted_surgeries, schedule):
        """跟踪实际执行与预测的偏差"""
        # 1. 预测准确性
        actual_count = len(actual_surgeries)
        predicted_count = len(predicted_surgeries)
        count_error = abs(actual_count - predicted_count) / actual_count if actual_count > 0 else 0
        
        # 2. 手术时长准确性
        actual_durations = actual_surgeries['duration'].values
        predicted_durations = predicted_surgeries['predicted_duration'].values
        duration_mae = mean_absolute_error(actual_durations, predicted_durations)
        
        # 3. 排期执行率
        scheduled_ids = set([s['surgery_id'] for s in schedule])
        executed_ids = set(actual_surgeries['surgery_id'].values)
        execution_rate = len(scheduled_ids & executed_ids) / len(scheduled_ids) if scheduled_ids else 0
        
        # 4. 医生加班情况
        overtime = self.calculate_overtime(schedule)
        
        performance = {
            'date': datetime.now(),
            'count_error': count_error,
            'duration_mae': duration_mae,
            'execution_rate': execution_rate,
            'overtime': overtime,
            'overall_score': 100 - (count_error * 50 + duration_mae * 0.5 + (1-execution_rate) * 30)
        }
        
        self.performance_history.append(performance)
        return performance
    
    def calculate_overtime(self, schedule):
        """计算医生加班时间"""
        doctor_work = {}
        for item in schedule:
            doc_id = item['doctor_id']
            if doc_id not in doctor_work:
                doctor_work[doc_id] = 0
            doctor_work[doc_id] += item['duration']
        
        total_overtime = 0
        for doc_id, total_minutes in doctor_work.items():
            if total_minutes > 480:
                total_overtime += (total_minutes - 480)
        
        return total_overtime
    
    def trigger_model_retraining(self, threshold=0.15):
        """当性能下降超过阈值时触发模型重训练"""
        if len(self.performance_history) < 30:
            return False
        
        recent_scores = [p['overall_score'] for p in self.performance_history[-10:]]
        baseline_scores = [p['overall_score'] for p in self.performance_history[-20:-10]]
        
        avg_recent = np.mean(recent_scores)
        avg_baseline = np.mean(baseline_scores)
        
        if avg_recent < avg_baseline * (1 - threshold):
            print(f"性能下降{((avg_baseline - avg_recent) / avg_baseline):.2%},触发模型重训练")
            self.retrain_models()
            return True
        
        return False
    
    def retrain_models(self):
        """重新训练预测模型"""
        print("开始重新训练模型...")
        # 这里应该重新调用训练函数,使用最新数据
        # self.prediction_engine.train_surgery_count_predictor(updated_data)
        # self.prediction_engine.train_prophet_model(updated_data)
        print("模型重训练完成")

# 使用示例
# improvement = ContinuousImprovement(prediction_engine, optimizer)
# performance = improvement.track_performance(actual_df, predicted_df, schedule)
# improvement.trigger_model_retraining()

6. 挑战与应对策略

6.1 技术挑战

挑战1:数据孤岛与系统集成

  • 问题:HIS、LIS、PACS等系统数据不互通
  • 解决方案:建立统一数据中台,使用FHIR标准接口
# FHIR接口示例
import requests
import json

class FHIRClient:
    """FHIR标准接口客户端"""
    
    def __init__(self, base_url):
        self.base_url = base_url
        self.headers = {'Content-Type': 'application/fhir+json'}
    
    def get_patient(self, patient_id):
        """获取患者信息"""
        url = f"{self.base_url}/Patient/{patient_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()
    
    def get_encounter(self, patient_id):
        """获取就诊记录"""
        url = f"{self.base_url}/Encounter?patient={patient_id}"
        response = requests.get(url, headers=self.headers)
        return response.json()
    
    def create_surgery_request(self, surgery_data):
        """创建手术申请"""
        resource = {
            "resourceType": "ServiceRequest",
            "status": "active",
            "intent": "order",
            "code": {
                "coding": [{
                    "system": "http://snomed.info/sct",
                    "code": surgery_data['surgery_code'],
                    "display": surgery_data['surgery_name']
                }]
            },
            "subject": {"reference": f"Patient/{surgery_data['patient_id']}"},
            "occurrenceDateTime": surgery_data['scheduled_time']
        }
        
        response = requests.post(
            f"{self.base_url}/ServiceRequest",
            headers=self.headers,
            data=json.dumps(resource)
        )
        return response.json()

# 使用示例
# fhir = FHIRClient("http://hospital-fhir-server:8080/fhir")
# patient_info = fhir.get_patient("P12345")

挑战2:模型可解释性

  • 问题:医生不信任”黑箱”算法
  • 解决方案:使用SHAP值解释模型预测
import shap
import matplotlib.pyplot as plt

def explain_prediction(model, X_sample):
    """使用SHAP解释模型预测"""
    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_sample)
    
    # 可视化
    shap.summary_plot(shap_values, X_sample, plot_type="bar")
    plt.title('特征重要性(SHAP值)')
    plt.show()
    
    # 单个预测解释
    shap.force_plot(
        explainer.expected_value,
        shap_values[0],
        X_sample.iloc[0],
        matplotlib=True
    )
    plt.show()

# 使用示例
# explain_prediction(duration_model, X_test.head(1))

6.2 管理挑战

挑战3:变革阻力

  • 问题:医生习惯传统方式,抵触新系统
  • 解决方案:分阶段试点,展示成功案例

挑战4:责任界定

  • 问题:算法出错时责任归属不清
  • 解决方案:建立明确的SOP,算法仅提供建议,最终决策需人工确认

7. 未来发展趋势

7.1 人工智能深度融合

强化学习在动态排期中的应用

import gym
from gym import spaces
import numpy as np

class SurgeryEnv(gym.Env):
    """手术排期强化学习环境"""
    
    def __init__(self, num_doctors=5, num_rooms=8):
        super(SurgeryEnv, self).__init__()
        
        self.num_doctors = num_doctors
        self.num_rooms = num_rooms
        
        # 动作空间:为每台手术分配医生和房间
        self.action_space = spaces.MultiDiscrete([num_doctors, num_rooms])
        
        # 状态空间:当前时间、已安排手术数、医生剩余时间、房间占用情况
        self.observation_space = spaces.Box(
            low=0, high=1000, 
            shape=(3 + num_doctors + num_rooms,), 
            dtype=np.float32
        )
        
        self.reset()
    
    def reset(self):
        """重置环境"""
        self.current_time = 0
        self.scheduled_surgeries = 0
        self.doctor_remaining_time = np.zeros(self.num_doctors)
        self.room_occupied_until = np.zeros(self.num_rooms)
        self.pending_surgeries = []
        return self._get_obs()
    
    def _get_obs(self):
        """获取当前状态"""
        return np.array([
            self.current_time,
            self.scheduled_surgeries,
            len(self.pending_surgeries),
            *self.doctor_remaining_time,
            *self.room_occupied_until
        ], dtype=np.float32)
    
    def step(self, action):
        """执行动作"""
        doctor_idx, room_idx = action
        
        # 如果没有待安排手术,结束
        if not self.pending_surgeries:
            return self._get_obs(), 0, True, {}
        
        surgery = self.pending_surgeries.pop(0)
        
        # 检查约束
        if self.doctor_remaining_time[doctor_idx] > 0:
            reward = -10  # 惩罚:医生还在忙
            return self._get_obs(), reward, False, {}
        
        if self.room_occupied_until[room_idx] > self.current_time:
            reward = -10  # 惩罚:房间被占用
            return self._get_obs(), reward, False, {}
        
        # 安排手术
        self.doctor_remaining_time[doctor_idx] = surgery['duration']
        self.room_occupied_until[room_idx] = self.current_time + surgery['duration']
        self.current_time += surgery['duration']
        self.scheduled_surgeries += 1
        
        # 奖励:完成手术的正奖励,考虑优先级
        reward = 10 * surgery['priority']
        
        # 检查是否完成所有手术
        done = len(self.pending_surgeries) == 0
        
        return self._get_obs(), reward, done, {}
    
    def add_surgery(self, surgery):
        """添加待安排手术"""
        self.pending_surgeries.append(surgery)

# 使用示例(需要安装stable-baselines3)
# from stable_baselines3 import PPO
# env = SurgeryEnv()
# model = PPO('MlpPolicy', env, verbose=1)
# model.learn(total_timesteps=10000)

7.2 区块链技术应用

确保排期数据不可篡改

import hashlib
import json
from time import time

class Block:
    """区块链块"""
    
    def __init__(self, index, transactions, timestamp, previous_hash):
        self.index = index
        self.transactions = transactions
        self.timestamp = timestamp
        self.previous_hash = previous_hash
        self.nonce = 0
        self.hash = self.calculate_hash()
    
    def calculate_hash(self):
        """计算哈希值"""
        block_string = json.dumps({
            "index": self.index,
            "transactions": self.transactions,
            "timestamp": self.timestamp,
            "previous_hash": self.previous_hash,
            "nonce": self.nonce
        }, sort_keys=True)
        return hashlib.sha256(block_string.encode()).hexdigest()
    
    def mine_block(self, difficulty):
        """挖矿"""
        while self.hash[:difficulty] != '0' * difficulty:
            self.nonce += 1
            self.hash = self.calculate_hash()

class SurgeryBlockchain:
    """手术排期区块链"""
    
    def __init__(self):
        self.chain = [self.create_genesis_block()]
        self.difficulty = 2
    
    def create_genesis_block(self):
        """创世块"""
        return Block(0, ["Genesis Block"], time(), "0")
    
    def get_latest_block(self):
        return self.chain[-1]
    
    def add_surgery_record(self, surgery_data):
        """添加手术记录"""
        transaction = {
            'surgery_id': surgery_data['surgery_id'],
            'doctor_id': surgery_data['doctor_id'],
            'scheduled_time': surgery_data['scheduled_time'],
            'duration': surgery_data['duration'],
            'status': 'scheduled'
        }
        
        new_block = Block(
            len(self.chain),
            [transaction],
            time(),
            self.get_latest_block().hash
        )
        
        new_block.mine_block(self.difficulty)
        self.chain.append(new_block)
        return new_block
    
    def verify_chain(self):
        """验证区块链完整性"""
        for i in range(1, len(self.chain)):
            current = self.chain[i]
            previous = self.chain[i-1]
            
            if current.hash != current.calculate_hash():
                return False
            
            if current.previous_hash != previous.hash:
                return False
        
        return True

# 使用示例
# blockchain = SurgeryBlockchain()
# blockchain.add_surgery_record({
#     'surgery_id': 'S123',
#     'doctor_id': 'D1',
#     'scheduled_time': '2024-01-15 08:00',
#     'duration': 90
# })

8. 总结与建议

排期预测技术在医院手术室资源分配优化中发挥着革命性作用。通过精准的需求预测、智能的排期优化和人机协同机制,可以有效解决手术台紧张与医生排班冲突问题。

核心要点总结:

  1. 数据驱动是基础:高质量的历史数据是预测准确性的保障
  2. 多模型融合是关键:时间序列预测、机器学习、约束优化相结合
  3. 人机协同是核心:算法提供方案,医生拥有最终决策权
  4. 持续改进是保障:建立反馈循环,不断优化模型

实施路线图:

第一阶段(1-3个月):数据整合与清洗,建立基础数据仓库 第二阶段(4-6个月):开发预测模型,进行小范围试点 第三阶段(7-9个月):部署优化引擎,实现人机协同 第四阶段(10-12个月):全面推广,建立持续改进机制

最终建议:

  • 从小处着手:选择一个手术室或一个科室进行试点
  • 重视用户体验:界面简洁易用,减少医生学习成本
  • 建立信任:透明化算法逻辑,提供可解释的决策依据
  • 合规先行:确保符合医疗数据安全与隐私保护法规

通过系统性的实施,医院可以在6-12个月内看到显著成效,实现手术室资源的高效利用和医生工作满意度的双重提升。