引言:为什么需要手术室排期预测系统?
在现代医院管理中,手术室是医院资源最密集、成本最高、同时也是创收能力最强的部门之一。手术室的排期安排直接关系到医院的运营效率、医疗资源的利用率以及患者的就医体验。传统的手工排期方式存在诸多问题:
- 效率低下:人工排期需要大量时间收集信息、协调各方,容易出现排期冲突
- 资源浪费:无法充分利用手术室资源,导致手术室空置率高
- 缺乏预测性:无法根据历史数据预测未来手术需求,难以进行前瞻性资源调配
- 应急响应差:遇到紧急手术时,难以快速调整排期,影响救治效率
一个智能的手术室排期预测系统能够通过数据分析和机器学习算法,帮助医院实现:
- 自动化排期,减少人工干预
- 预测未来手术需求,优化资源配置
- 提高手术室利用率,降低运营成本
- 提升患者满意度,缩短等待时间
系统架构设计
整体架构
一个完整的医院手术室排期预测系统通常包含以下几个核心模块:
┌─────────────────────────────────────────────────────────────┐
│ 手术室排期预测系统 │
├─────────────────────────────────────────────────────────────┤
│ 用户界面层 (Web/移动端) │
├─────────────────────────────────────────────────────────────┤
│ 业务逻辑层 │
│ - 排期管理模块 │
│ - 预测分析模块 │
│ - 资源管理模块 │
│ - 报表统计模块 │
├─────────────────────────────────────────────────────────────┤
│ 数据处理层 │
│ - 数据清洗与转换 │
│ - 特征工程 │
│ - 机器学习模型 │
├─────────────────────────────────────────────────────────────┤
│ 数据存储层 │
│ - 关系型数据库 (MySQL/PostgreSQL) │
│ - 时序数据库 (InfluxDB) │
│ - 缓存层 (Redis) │
└─────────────────────────────────────────────────────────────┘
技术栈选择
- 后端框架:Python + FastAPI/Django(适合快速开发和数据处理)
- 前端框架:Vue.js/React(构建用户友好的界面)
- 数据库:MySQL(存储结构化数据),Redis(缓存),InfluxDB(存储时序数据)
- 机器学习:Scikit-learn, XGBoost, Prophet(预测模型)
- 部署:Docker + Kubernetes(容器化部署)
核心功能模块详解
1. 数据模型设计
首先,我们需要设计合理的数据模型来存储手术室排期相关数据。
# models.py
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel
from enum import Enum
class SurgeryType(str, Enum):
"""手术类型枚举"""
EMERGENCY = "emergency" # 急诊手术
ELECTIVE = "elective" # 择期手术
URGENT = "urgent" # 限期手术
class SurgeryStatus(str, Enum):
"""手术状态枚举"""
SCHEDULED = "scheduled" # 已安排
IN_PROGRESS = "in_progress" # 进行中
COMPLETED = "completed" # 已完成
CANCELLED = "cancelled" # 已取消
class Surgeon(BaseModel):
"""医生模型"""
id: str
name: str
specialty: str # 专业领域
rank: str # 职称
efficiency_score: float = 0.0 # 效率评分(0-1)
class OperatingRoom(BaseModel):
"""手术室模型"""
id: str
name: str
capacity: int = 1 # 同时可进行的手术数量
equipment: List[str] # 配备设备
is_available: bool = True
class Surgery(BaseModel):
"""手术模型"""
id: str
patient_id: str
patient_name: str
surgery_type: SurgeryType
estimated_duration: int # 预计时长(分钟)
required_equipment: List[str]
preferred_surgeon_id: str
preferred_date: Optional[datetime] = None
status: SurgeryStatus = SurgeryStatus.SCHEDULED
priority: int = 0 # 优先级(0-10)
class SurgerySchedule(BaseModel):
"""排期模型"""
id: str
surgery_id: str
operating_room_id: str
scheduled_start: datetime
scheduled_end: datetime
actual_start: Optional[datetime] = None
actual_end: Optional[datetime] = None
surgeon_id: str
notes: Optional[str] = None
class PredictionInput(BaseModel):
"""预测输入模型"""
date_range_start: datetime
date_range_end: datetime
historical_data_points: int = 30 # 使用多少天的历史数据
surgical_specialties: List[str] = [] # 特定专科
2. 数据预处理与特征工程
在进行预测之前,我们需要对历史数据进行清洗和特征工程。这是机器学习项目中最关键的一步。
# data_preprocessing.py
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List
class SurgeryDataPreprocessor:
"""手术数据预处理器"""
def __init__(self, db_connection):
self.db = db_connection
self.features = []
def load_historical_data(self, start_date: datetime, end_date: datetime) -> pd.DataFrame:
"""从数据库加载历史手术数据"""
query = """
SELECT
s.id as surgery_id,
s.surgery_type,
s.estimated_duration,
s.priority,
sr.name as surgeon_name,
sr.specialty,
sr.efficiency_score,
sr.rank,
os.scheduled_start,
os.scheduled_end,
os.actual_start,
os.actual_end,
os.operating_room_id,
DATEDIFF(s.scheduled_date, CURDATE()) as days_until_surgery
FROM surgeries s
JOIN surgery_schedules os ON s.id = os.surgery_id
JOIN surgeons sr ON os.surgeon_id = sr.id
WHERE os.scheduled_start BETWEEN %s AND %s
"""
df = pd.read_sql(query, self.db, params=[start_date, end_date])
return df
def create_temporal_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""创建时间特征"""
df['scheduled_start'] = pd.to_datetime(df['scheduled_start'])
# 基础时间特征
df['hour_of_day'] = df['scheduled_start'].dt.hour
df['day_of_week'] = df['scheduled_start'].dt.dayofweek
df['day_of_month'] = df['scheduled_start'].dt.day
df['month'] = df['scheduled_start'].dt.month
df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
# 是否为节假日(简化版,实际应使用节假日日历)
df['is_holiday'] = 0 # 需要集成节假日API
# 手术室使用情况特征
df['room_utilization'] = df.groupby(['operating_room_id', 'scheduled_start'])['surgery_id'].transform('count')
return df
def create_surgeon_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""创建医生相关特征"""
# 医生效率评分
df['surgeon_efficiency'] = df['efficiency_score']
# 医生职称编码
rank_mapping = {'住院医师': 1, '主治医师': 2, '副主任医师': 3, '主任医师': 4}
df['surgeon_rank_encoded'] = df['rank'].map(rank_mapping).fillna(0)
# 医生历史手术量(简化:使用整体平均值代替)
df['surgeon_historical_volume'] = df.groupby('surgeon_name')['surgery_id'].transform('count')
return df
def create_surgery_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""创建手术相关特征"""
# 手术类型编码
type_mapping = {'emergency': 1, 'elective': 0, 'urgent': 0.5}
df['surgery_type_encoded'] = df['surgery_type'].map(type_mapping)
# 手术时长分段
df['duration_category'] = pd.cut(
df['estimated_duration'],
bins=[0, 60, 120, 240, 1000],
labels=['short', 'medium', 'long', 'very_long']
)
df = pd.get_dummies(df, columns=['duration_category'], prefix='duration')
# 优先级特征
df['priority_normalized'] = df['priority'] / 10.0
# 手术复杂度(基于设备数量和时长)
df['complexity_score'] = (
df['estimated_duration'] / 60.0 +
df['required_equipment'].apply(lambda x: len(x) if isinstance(x, list) else 0)
)
return df
def create_aggregation_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""创建聚合特征"""
# 每日手术总量
daily_stats = df.groupby(df['scheduled_start'].dt.date).agg({
'surgery_id': 'count',
'estimated_duration': ['mean', 'sum']
}).round(2)
daily_stats.columns = ['daily_surgery_count', 'avg_duration', 'total_duration']
# 每周手术量趋势
df['week_number'] = df['scheduled_start'].dt.isocalendar().week
weekly_stats = df.groupby('week_number')['surgery_id'].count()
# 医生工作量
surgeon_daily_workload = df.groupby(['surgeon_name', df['scheduled_start'].dt.date])['surgery_id'].count()
return df
def prepare_features_for_model(self, df: pd.DataFrame) -> pd.DataFrame:
"""准备最终的特征矩阵"""
# 选择特征列
feature_columns = [
'hour_of_day', 'day_of_week', 'day_of_month', 'month',
'is_weekend', 'is_holiday', 'room_utilization',
'surgeon_efficiency', 'surgeon_rank_encoded', 'surgeon_historical_volume',
'surgery_type_encoded', 'priority_normalized', 'complexity_score',
'duration_short', 'duration_medium', 'duration_long', 'duration_very_long'
]
# 处理缺失值
df[feature_columns] = df[feature_columns].fillna(0)
# 返回特征矩阵和目标变量(实际完成时间与预计时间的差异)
df['duration_diff'] = (pd.to_datetime(df['actual_end']) - pd.to_datetime(df['actual_start'])).dt.total_seconds() / 60
df['duration_diff'] = df['duration_diff'] - df['estimated_duration']
return df[feature_columns], df['duration_diff']
# 使用示例
if __name__ == "__main__":
# 模拟数据库连接
import sqlite3
conn = sqlite3.connect(':memory:')
# 创建模拟数据
mock_data = pd.DataFrame({
'surgery_id': ['S001', 'S002', 'S003'],
'surgery_type': ['emergency', 'elective', 'urgent'],
'estimated_duration': [120, 90, 180],
'priority': [8, 3, 6],
'surgeon_name': ['Dr. Zhang', 'Dr. Li', 'Dr. Wang'],
'specialty': ['Cardio', 'Ortho', 'Neuro'],
'efficiency_score': [0.9, 0.85, 0.95],
'rank': ['主任医师', '主治医师', '副主任医师'],
'scheduled_start': ['2024-01-15 08:00:00', '2024-01-15 09:30:00', '2024-01-15 11:00:00'],
'scheduled_end': ['2024-01-15 10:00:00', '2024-01-15 11:00:00', '2024-01-15 14:00:00'],
'actual_start': ['2024-01-15 08:05:00', '2024-01-15 09:30:00', '2024-01-15 11:10:00'],
'actual_end': ['2024-01-15 10:15:00', '2024-01-15 11:00:00', '2024-01-15 14:30:00'],
'operating_room_id': ['OR-01', 'OR-02', 'OR-01'],
'required_equipment': [['设备A', '设备B'], ['设备C'], ['设备A', '设备D']]
})
mock_data.to_sql('surgeries', conn, if_exists='replace', index=False)
mock_data.to_sql('surgery_schedules', conn, if_exists='replace', index=False)
mock_data.to_sql('surgeons', conn, if_exists='replace', index=False)
# 执行预处理
preprocessor = SurgeryDataPreprocessor(conn)
df = preprocessor.load_historical_data(
datetime(2024, 1, 1),
datetime(2024, 1, 31)
)
df = preprocessor.create_temporal_features(df)
df = preprocessor.create_surgeon_features(df)
df = preprocessor.create_surgery_features(df)
df = preprocessor.create_aggregation_features(df)
X, y = preprocessor.prepare_features_for_model(df)
print("特征矩阵形状:", X.shape)
print("特征列:", X.columns.tolist())
print("\n前5行特征数据:")
print(X.head())
3. 预测模型开发
我们将使用多种机器学习方法来预测手术时长和排期冲突概率。
# prediction_models.py
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler
import joblib
import warnings
warnings.filterwarnings('ignore')
class SurgeryDurationPredictor:
"""手术时长预测器"""
def __init__(self):
self.models = {
'random_forest': RandomForestRegressor(n_estimators=100, random_state=42),
'gradient_boosting': GradientBoostingRegressor(n_estimators=100, random_state=42),
'linear_regression': LinearRegression()
}
self.scaler = StandardScaler()
self.best_model = None
self.feature_names = None
def prepare_training_data(self, X: pd.DataFrame, y: pd.Series) -> tuple:
"""准备训练数据"""
# 数据分割
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# 特征标准化
X_train_scaled = self.scaler.fit_transform(X_train)
X_test_scaled = self.scaler.transform(X_test)
return X_train_scaled, X_test_scaled, y_train, y_test
def train_and_evaluate(self, X: pd.DataFrame, y: pd.Series) -> dict:
"""训练并评估多个模型"""
X_train, X_test, y_train, y_test = self.prepare_training_data(X, y)
results = {}
for name, model in self.models.items():
print(f"训练模型: {name}")
# 训练
model.fit(X_train, y_train)
# 预测
y_pred_train = model.predict(X_train)
y_pred_test = model.predict(X_test)
# 评估指标
train_mae = mean_absolute_error(y_train, y_pred_train)
test_mae = mean_absolute_error(y_test, y_pred_test)
train_rmse = np.sqrt(mean_squared_error(y_train, y_pred_train))
test_rmse = np.sqrt(mean_squared_error(y_test, y_pred_test))
r2 = r2_score(y_test, y_pred_test)
# 交叉验证
cv_scores = cross_val_score(model, X_train, y_train, cv=5, scoring='neg_mean_absolute_error')
cv_mae = -cv_scores.mean()
results[name] = {
'train_mae': train_mae,
'test_mae': test_mae,
'train_rmse': train_rmse,
'test_rmse': test_rmse,
'r2_score': r2,
'cv_mae': cv_mae,
'model': model
}
print(f" 测试集MAE: {test_mae:.2f} 分钟")
print(f" R²分数: {r2:.3f}")
print(f" 交叉验证MAE: {cv_mae:.2f} 分钟")
print()
# 选择最佳模型(基于交叉验证分数)
best_model_name = min(results, key=lambda x: results[x]['cv_mae'])
self.best_model = results[best_model_name]['model']
self.feature_names = X.columns.tolist()
print(f"最佳模型: {best_model_name}")
return results
def predict_duration(self, X_new: pd.DataFrame) -> np.ndarray:
"""预测新数据的手术时长"""
if self.best_model is None:
raise ValueError("模型尚未训练,请先调用train_and_evaluate方法")
# 标准化特征
X_scaled = self.scaler.transform(X_new)
# 预测
predictions = self.best_model.predict(X_scaled)
return predictions
def save_model(self, filepath: str):
"""保存模型"""
if self.best_model is None:
raise ValueError("没有可保存的模型")
model_data = {
'model': self.best_model,
'scaler': self.scaler,
'feature_names': self.feature_names
}
joblib.dump(model_data, filepath)
print(f"模型已保存到: {filepath}")
def load_model(self, filepath: str):
"""加载模型"""
model_data = joblib.load(filepath)
self.best_model = model_data['model']
self.scaler = model_data['scaler']
self.feature_names = model_data['feature_names']
print(f"模型已从 {filepath} 加载")
class ScheduleConflictPredictor:
"""排期冲突预测器"""
def __init__(self):
self.model = RandomForestClassifier(n_estimators=100, random_state=42)
self.scaler = StandardScaler()
def create_conflict_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""创建冲突相关特征"""
# 时间重叠特征
df['has_time_overlap'] = 0
# 检查同一手术室的时间重叠
for room_id in df['operating_room_id'].unique():
room_df = df[df['operating_room_id'] == room_id].copy()
room_df = room_df.sort_values('scheduled_start')
# 计算与前一个手术的间隔
room_df['prev_end'] = room_df['scheduled_end'].shift(1)
room_df['gap_minutes'] = (room_df['scheduled_start'] - room_df['prev_end']).dt.total_seconds() / 60
# 标记冲突(间隔小于30分钟)
df.loc[room_df.index, 'gap_to_prev'] = room_df['gap_minutes']
df.loc[room_df.index, 'has_short_gap'] = (room_df['gap_minutes'] < 30).astype(int)
# 医生时间冲突
df['surgeon_double_booked'] = 0
for surgeon_id in df['surgeon_id'].unique():
surgeon_df = df[df['surgeon_id'] == surgeon_id].copy()
surgeon_df = surgeon_df.sort_values('scheduled_start')
# 检查医生是否在同一时间段被安排了多个手术
for i in range(len(surgeon_df)):
current = surgeon_df.iloc[i]
others = surgeon_df.iloc[i+1:]
# 检查是否有重叠
overlap = others[
(others['scheduled_start'] < current['scheduled_end']) &
(others['scheduled_end'] > current['scheduled_start'])
]
if len(overlap) > 0:
df.loc[current.name, 'surgeon_double_booked'] = 1
# 设备冲突特征
df['equipment_conflict'] = 0
# 这里简化处理,实际需要检查设备是否在同一时间段被多个手术使用
return df
def prepare_conflict_labels(self, df: pd.DataFrame) -> pd.Series:
"""准备冲突标签(实际发生的冲突)"""
# 基于实际执行时间判断是否发生冲突
df['actual_duration'] = (pd.to_datetime(df['actual_end']) - pd.to_datetime(df['actual_start'])).dt.total_seconds() / 60
# 如果实际时长比预计时长多出30%以上,标记为冲突
df['has_conflict'] = ((df['actual_duration'] > df['estimated_duration'] * 1.3) |
(df['surgeon_double_booked'] == 1) |
(df['has_short_gap'] == 1)).astype(int)
return df['has_conflict']
# 使用示例
if __name__ == "__main__":
# 模拟数据
np.random.seed(42)
n_samples = 1000
# 生成模拟特征
X = pd.DataFrame({
'hour_of_day': np.random.randint(6, 18, n_samples),
'day_of_week': np.random.randint(0, 7, n_samples),
'surgery_type_encoded': np.random.choice([0, 0.5, 1], n_samples),
'priority_normalized': np.random.uniform(0, 1, n_samples),
'complexity_score': np.random.uniform(1, 10, n_samples),
'surgeon_efficiency': np.random.uniform(0.7, 1.0, n_samples),
'surgeon_rank_encoded': np.random.randint(1, 5, n_samples),
'duration_medium': np.random.randint(0, 2, n_samples),
'duration_long': np.random.randint(0, 2, n_samples),
})
# 生成目标变量(实际时长与预计时长的差异)
y = (
X['complexity_score'] * 5 +
X['priority_normalized'] * 10 -
X['surgeon_efficiency'] * 15 +
np.random.normal(0, 10, n_samples)
)
# 训练预测器
predictor = SurgeryDurationPredictor()
results = predictor.train_and_evaluate(X, y)
# 预测新数据
new_data = pd.DataFrame({
'hour_of_day': [10, 14],
'day_of_week': [1, 3],
'surgery_type_encoded': [0.5, 0],
'priority_normalized': [0.6, 0.2],
'complexity_score': [5, 3],
'surgeon_efficiency': [0.85, 0.9],
'surgeon_rank_encoded': [3, 2],
'duration_medium': [1, 0],
'duration_long': [0, 1],
})
predictions = predictor.predict_duration(new_data)
print(f"预测时长差异: {predictions} 分钟")
# 保存模型
predictor.save_model('surgery_duration_predictor.pkl')
4. 排期优化算法
排期优化是系统的核心,需要考虑多个约束条件:
- 手术室容量
- 医生可用性
- 设备需求
- 手术优先级
- 预计时长
# scheduler.py
import pulp
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import List, Dict, Tuple
class SurgeryScheduler:
"""手术室排期优化器"""
def __init__(self, operating_rooms: List[Dict], surgeons: List[Dict]):
self.operating_rooms = operating_rooms
self.surgeons = surgeons
self.scheduled_surgeries = []
def create_optimization_model(self, surgeries: List[Dict], date: datetime) -> pulp.LpProblem:
"""创建排期优化模型"""
# 创建问题
prob = pulp.LpProblem("Surgery_Scheduling", pulp.LpMinimize)
# 决策变量:手术i分配到手术室j的开始时间
# 由于时间是连续的,我们将其离散化为30分钟的时间槽
time_slots = range(6, 19) # 从6:00到19:00,每30分钟一个槽
room_ids = [room['id'] for room in self.operating_rooms]
surgery_ids = [surgery['id'] for surgery in surgeries]
# x[i][j][t] = 1 表示手术i在手术室j的t时间槽开始
x = pulp.LpVariable.dicts(
"x",
((i, j, t) for i in surgery_ids for j in room_ids for t in time_slots),
cat='Binary'
)
# y[i][j] = 1 表示手术i分配到手术室j
y = pulp.LpVariable.dicts(
"y",
((i, j) for i in surgery_ids for j in room_ids),
cat='Binary'
)
# 目标函数:最小化总完成时间 + 惩罚项
# 1. 最小化手术完成时间
total_completion_time = pulp.lpSum([
x[i, j, t] * (t + surgeries[surgery_ids.index(i)]['estimated_duration'] / 30)
for i in surgery_ids for j in room_ids for t in time_slots
])
# 2. 惩罚急诊手术延迟
emergency_penalty = pulp.lpSum([
x[i, j, t] * t * (1 if surgeries[surgery_ids.index(i)]['surgery_type'] == 'emergency' else 0)
for i in surgery_ids for j in room_ids for t in time_slots
])
# 3. 惩罚医生工作时间过长
surgeon_workload_penalty = pulp.lpSum([
x[i, j, t] * (1 if surgeries[surgery_ids.index(i)]['estimated_duration'] > 120 else 0)
for i in surgery_ids for j in room_ids for t in time_slots
])
prob += total_completion_time + 2 * emergency_penalty + 1.5 * surgeon_workload_penalty
# 约束条件
# 1. 每个手术必须分配到一个手术室
for i in surgery_ids:
prob += pulp.lpSum([y[i, j] for j in room_ids]) == 1
# 2. 每个手术必须在某个时间槽开始
for i in surgery_ids:
prob += pulp.lpSum([x[i, j, t] for j in room_ids for t in time_slots]) == 1
# 3. 手术分配一致性:如果手术i在手术室j开始,则y[i,j]必须为1
for i in surgery_ids:
for j in room_ids:
prob += pulp.lpSum([x[i, j, t] for t in time_slots]) <= y[i, j] * 100
# 4. 手术室容量约束:同一时间一个手术室只能进行一个手术
for j in room_ids:
for t in time_slots:
# 获取在时间槽t正在进行的所有手术
overlapping_surgeries = []
for i in surgery_ids:
duration_slots = int(np.ceil(surgeries[surgery_ids.index(i)]['estimated_duration'] / 30))
for t2 in range(t, t + duration_slots):
if t2 in time_slots:
overlapping_surgeries.append(x[i, j, t2])
if overlapping_surgeries:
prob += pulp.lpSum(overlapping_surgeries) <= 1
# 5. 医生时间冲突约束
for surgeon_id in set([s['surgeon_id'] for s in surgeries]):
surgeon_surgeries = [s['id'] for s in surgeries if s['surgeon_id'] == surgeon_id]
for t in time_slots:
for t2 in time_slots:
if t != t2:
for i1 in surgeon_surgeries:
for i2 in surgeon_surgeries:
if i1 != i2:
# 如果两个手术由同一医生进行,且时间重叠,则不能同时安排
duration1 = int(np.ceil(surgeries[surgery_ids.index(i1)]['estimated_duration'] / 30))
duration2 = int(np.ceil(surgeries[surgery_ids.index(i2)]['estimated_duration'] / 30))
# 检查时间重叠
if t2 in range(t, t + duration1) or t in range(t2, t2 + duration2):
for j1 in room_ids:
for j2 in room_ids:
prob += x[i1, j1, t] + x[i2, j2, t2] <= 1
# 6. 设备需求约束
for room in self.operating_rooms:
room_equipment = set(room['equipment'])
for surgery in surgeries:
required_equipment = set(surgery['required_equipment'])
# 如果手术室缺少所需设备,则不能分配
if not required_equipment.issubset(room_equipment):
for t in time_slots:
prob += x[surgery['id'], room['id'], t] == 0
# 7. 优先级约束:高优先级手术尽量安排在前面
for i in surgeries:
if i['priority'] >= 8: # 高优先级
# 约束:必须在12:00之前开始
prob += pulp.lpSum([x[i['id'], j, t] for j in room_ids for t in time_slots if t <= 12]) >= 0.5
return prob
def solve_and_schedule(self, surgeries: List[Dict], date: datetime) -> List[Dict]:
"""求解排期问题"""
# 创建优化模型
prob = self.create_optimization_model(surgeries, date)
# 求解
prob.solve(pulp.PULP_CBC_CMD(msg=1, timeLimit=300))
# 提取结果
scheduled_surgeries = []
for surgery in surgeries:
for room in self.operating_rooms:
for t in range(6, 19):
var_name = f"x_{surgery['id']}_{room['id']}_{t}"
if var_name in pulp.LpVariable.__dict__:
var = pulp.LpVariable.__dict__[var_name]
if var.varValue == 1:
start_time = date.replace(hour=t//2, minute=(t%2)*30)
duration = surgery['estimated_duration']
end_time = start_time + timedelta(minutes=duration)
scheduled_surgeries.append({
'surgery_id': surgery['id'],
'operating_room_id': room['id'],
'scheduled_start': start_time,
'scheduled_end': end_time,
'surgeon_id': surgery['surgeon_id'],
'status': 'scheduled'
})
self.scheduled_surgeries = scheduled_surgeries
return scheduled_surgeries
def generate_schedule_report(self) -> pd.DataFrame:
"""生成排期报告"""
if not self.scheduled_surgeries:
return pd.DataFrame()
df = pd.DataFrame(self.scheduled_surgeries)
df['scheduled_start'] = pd.to_datetime(df['scheduled_start'])
df['scheduled_end'] = pd.to_datetime(df['scheduled_end'])
# 按手术室和时间排序
df = df.sort_values(['operating_room_id', 'scheduled_start'])
# 计算统计信息
report = df.groupby('operating_room_id').agg({
'surgery_id': 'count',
'scheduled_start': 'min',
'scheduled_end': 'max'
}).rename(columns={
'surgery_id': '手术数量',
'scheduled_start': '最早开始',
'scheduled_end': '最晚结束'
})
# 计算总时长
df['duration'] = (df['scheduled_end'] - df['scheduled_start']).dt.total_seconds() / 60
duration_stats = df.groupby('operating_room_id')['duration'].agg(['sum', 'mean']).round(2)
report['总时长(分钟)'] = duration_stats['sum']
report['平均时长(分钟)'] = duration_stats['mean']
return report
# 使用示例
if __name__ == "__main__":
# 模拟手术室
operating_rooms = [
{'id': 'OR-01', 'name': '手术室1', 'equipment': ['设备A', '设备B', '设备C']},
{'id': 'OR-02', 'name': '手术室2', 'equipment': ['设备A', '设备D', '设备E']},
{'id': 'OR-03', 'name': '手术室3', 'equipment': ['设备B', '设备F']},
]
# 模拟医生
surgeons = [
{'id': 'SUR-01', 'name': 'Dr. Zhang', 'specialty': 'Cardio'},
{'id': 'SUR-02', 'name': 'Dr. Li', 'specialty': 'Ortho'},
{'id': 'SUR-03', 'name': 'Dr. Wang', 'specialty': 'Neuro'},
]
# 模拟待排期手术
surgeries = [
{
'id': 'S001',
'surgery_type': 'emergency',
'estimated_duration': 120,
'priority': 9,
'surgeon_id': 'SUR-01',
'required_equipment': ['设备A', '设备B']
},
{
'id': 'S002',
'surgery_type': 'elective',
'estimated_duration': 90,
'priority': 3,
'surgeon_id': 'SUR-02',
'required_equipment': ['设备A', '设备D']
},
{
'id': 'S003',
'surgery_type': 'urgent',
'estimated_duration': 180,
'priority': 7,
'surgeon_id': 'SUR-03',
'required_equipment': ['设备B', '设备F']
},
{
'id': 'S004',
'surgery_type': 'elective',
'estimated_duration': 60,
'priority': 2,
'surgeon_id': 'SUR-01',
'required_equipment': ['设备C']
},
{
'id': 'S005',
'surgery_type': 'elective',
'estimated_duration': 120,
'priority': 5,
'surgeon_id': 'SUR-02',
'required_equipment': ['设备A', '设备E']
}
]
# 创建调度器
scheduler = SurgeryScheduler(operating_rooms, surgeons)
# 执行排期
schedule_date = datetime(2024, 2, 1)
result = scheduler.solve_and_schedule(surgeries, schedule_date)
# 生成报告
report = scheduler.generate_schedule_report()
print("排期报告:")
print(report)
# 详细排期表
print("\n详细排期:")
for item in result:
print(f"{item['surgery_id']} | {item['operating_room_id']} | {item['scheduled_start'].strftime('%H:%M')} - {item['scheduled_end'].strftime('%H:%M')} | 医生: {item['surgeon_id']}")
5. Web API接口开发
使用FastAPI构建RESTful API,提供前端调用接口。
# main.py
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime, timedelta
import uvicorn
import joblib
import pandas as pd
import numpy as np
# 导入自定义模块
from data_preprocessing import SurgeryDataPreprocessor
from prediction_models import SurgeryDurationPredictor, SurgeryScheduler
from models import Surgery, SurgerySchedule, PredictionInput, OperatingRoom, Surgeon
app = FastAPI(
title="医院手术室排期预测系统API",
description="提供手术时长预测、智能排期和资源管理功能",
version="1.0.0"
)
# 配置CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 全局变量(实际项目中应使用数据库)
prediction_model = None
scheduler = None
@app.on_event("startup")
async def load_models():
"""启动时加载模型"""
global prediction_model, scheduler
# 加载预测模型
try:
prediction_model = SurgeryDurationPredictor()
prediction_model.load_model('surgery_duration_predictor.pkl')
except:
print("警告: 预测模型未找到,将使用虚拟模型")
prediction_model = None
# 初始化调度器
operating_rooms = [
{'id': 'OR-01', 'name': '手术室1', 'equipment': ['设备A', '设备B', '设备C']},
{'id': 'OR-02', 'name': '手术室2', 'equipment': ['设备A', '设备D', '设备E']},
{'id': 'OR-03', 'name': '手术室3', 'equipment': ['设备B', '设备F']},
]
surgeons = [
{'id': 'SUR-01', 'name': 'Dr. Zhang', 'specialty': 'Cardio'},
{'id': 'SUR-02', 'name': 'Dr. Li', 'specialty': 'Ortho'},
{'id': 'SUR-03', 'name': 'Dr. Wang', 'specialty': 'Neuro'},
]
scheduler = SurgeryScheduler(operating_rooms, surgeons)
@app.get("/")
async def root():
return {"message": "医院手术室排期预测系统API", "version": "1.0.0"}
@app.post("/predict/duration")
async def predict_duration(input_data: PredictionInput):
"""预测手术时长差异"""
# 模拟特征数据(实际应从数据库获取)
features = pd.DataFrame({
'hour_of_day': [10, 14],
'day_of_week': [1, 3],
'surgery_type_encoded': [0.5, 0],
'priority_normalized': [0.6, 0.2],
'complexity_score': [5, 3],
'surgeon_efficiency': [0.85, 0.9],
'surgeon_rank_encoded': [3, 2],
'duration_medium': [1, 0],
'duration_long': [0, 1],
})
if prediction_model:
predictions = prediction_model.predict_duration(features)
else:
# 虚拟预测
predictions = np.array([15.2, -5.8])
return {
"predictions": predictions.tolist(),
"unit": "分钟",
"description": "预测的实际时长与预计时长的差异(正值表示实际更长)"
}
@app.post("/schedule/optimize")
async def optimize_schedule(surgeries: List[Surgery]):
"""优化排期"""
if not scheduler:
raise HTTPException(status_code=500, detail="调度器未初始化")
# 转换数据格式
surgery_dicts = [s.dict() for s in surgeries]
# 执行排期
schedule_date = datetime.now().date()
result = scheduler.solve_and_schedule(surgery_dicts, schedule_date)
# 生成报告
report = scheduler.generate_schedule_report()
return {
"schedule": result,
"report": report.to_dict() if not report.empty else {},
"status": "success"
}
@app.get("/resources/availability")
async def get_resource_availability(date: str):
"""获取资源可用性"""
# 模拟数据
availability = {
"operating_rooms": [
{"id": "OR-01", "available": True, "next_available": "09:00"},
{"id": "OR-02", "available": False, "next_available": "14:00"},
{"id": "OR-03", "available": True, "next_available": "08:30"},
],
"surgeons": [
{"id": "SUR-01", "available": True, "workload": "medium"},
{"id": "SUR-02", "available": True, "workload": "low"},
{"id": "SUR-03", "available": False, "workload": "high"},
]
}
return availability
@app.post("/surgery/add")
async def add_surgery(surgery: Surgery):
"""添加新手术"""
# 这里应该将手术保存到数据库
# 模拟保存成功
return {
"status": "success",
"message": "手术已添加",
"surgery_id": surgery.id,
"estimated_duration": surgery.estimated_duration
}
@app.get("/analytics/summary")
async def get_analytics_summary():
"""获取统计摘要"""
# 模拟数据
summary = {
"today_surgeries": 12,
"operating_room_utilization": 0.85,
"avg_surgery_duration": 95.5,
"emergency_surgeries": 3,
"predicted_delays": 2,
"resource_conflicts": 0
}
return summary
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
6. 前端界面示例
使用Vue.js构建简单的前端界面。
<!-- index.html -->
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>医院手术室排期预测系统</title>
<script src="https://cdn.jsdelivr.net/npm/vue@2.6.14/dist/vue.js"></script>
<script src="https://cdn.jsdelivr.net/npm/axios/dist/axios.min.js"></script>
<style>
body {
font-family: 'Microsoft YaHei', Arial, sans-serif;
margin: 0;
padding: 20px;
background-color: #f5f5f5;
}
.container {
max-width: 1200px;
margin: 0 auto;
background: white;
padding: 20px;
border-radius: 8px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
.header {
text-align: center;
margin-bottom: 30px;
padding-bottom: 20px;
border-bottom: 2px solid #4CAF50;
}
.header h1 {
color: #2c3e50;
margin: 0;
}
.section {
margin-bottom: 25px;
padding: 15px;
background: #fafafa;
border-radius: 5px;
border-left: 4px solid #4CAF50;
}
.section h2 {
color: #2c3e50;
margin-top: 0;
}
.form-group {
margin-bottom: 15px;
}
label {
display: block;
margin-bottom: 5px;
font-weight: bold;
color: #555;
}
input, select, textarea {
width: 100%;
padding: 8px;
border: 1px solid #ddd;
border-radius: 4px;
box-sizing: border-box;
}
button {
background: #4CAF50;
color: white;
padding: 10px 20px;
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 14px;
margin-right: 10px;
}
button:hover {
background: #45a049;
}
button.secondary {
background: #2196F3;
}
button.secondary:hover {
background: #0b7dda;
}
.result-box {
background: #e8f5e9;
padding: 15px;
border-radius: 5px;
margin-top: 15px;
border: 1px solid #4CAF50;
}
.error-box {
background: #ffebee;
padding: 15px;
border-radius: 5px;
margin-top: 15px;
border: 1px solid #f44336;
color: #c62828;
}
.loading {
text-align: center;
color: #666;
padding: 20px;
}
table {
width: 100%;
border-collapse: collapse;
margin-top: 10px;
}
th, td {
padding: 10px;
text-align: left;
border-bottom: 1px solid #ddd;
}
th {
background: #4CAF50;
color: white;
}
tr:hover {
background: #f5f5f5;
}
.badge {
display: inline-block;
padding: 3px 8px;
border-radius: 12px;
font-size: 12px;
font-weight: bold;
}
.badge-emergency { background: #f44336; color: white; }
.badge-elective { background: #2196F3; color: white; }
.badge-urgent { background: #FF9800; color: white; }
.stats-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 15px;
margin-top: 15px;
}
.stat-card {
background: white;
padding: 15px;
border-radius: 5px;
border: 1px solid #ddd;
text-align: center;
}
.stat-value {
font-size: 24px;
font-weight: bold;
color: #4CAF50;
}
.stat-label {
font-size: 12px;
color: #666;
margin-top: 5px;
}
</style>
</head>
<body>
<div id="app" class="container">
<div class="header">
<h1>🏥 医院手术室排期预测系统</h1>
<p>智能排期 · 预测分析 · 资源优化</p>
</div>
<!-- 统计概览 -->
<div class="section">
<h2>📊 今日概览</h2>
<div class="stats-grid" v-if="summary">
<div class="stat-card">
<div class="stat-value">{{ summary.today_surgeries }}</div>
<div class="stat-label">今日手术数</div>
</div>
<div class="stat-card">
<div class="stat-value">{{ (summary.operating_room_utilization * 100).toFixed(0) }}%</div>
<div class="stat-label">手术室利用率</div>
</div>
<div class="stat-card">
<div class="stat-value">{{ summary.avg_surgery_duration }}min</div>
<div class="stat-label">平均时长</div>
</div>
<div class="stat-card">
<div class="stat-value">{{ summary.emergency_surgeries }}</div>
<div class="stat-label">急诊手术</div>
</div>
</div>
</div>
<!-- 手术时长预测 -->
<div class="section">
<h2>⏱️ 手术时长预测</h2>
<div class="form-group">
<label>手术类型</label>
<select v-model="predictionForm.surgeryType">
<option value="elective">择期手术</option>
<option value="urgent">限期手术</option>
<option value="emergency">急诊手术</option>
</select>
</div>
<div class="form-group">
<label>预计时长(分钟)</label>
<input type="number" v-model="predictionForm.estimatedDuration" placeholder="例如:120">
</div>
<div class="form-group">
<label>优先级(1-10)</label>
<input type="number" v-model="predictionForm.priority" min="1" max="10" placeholder="例如:5">
</div>
<div class="form-group">
<label>医生效率评分(0.7-1.0)</label>
<input type="number" v-model="predictionForm.efficiency" step="0.01" placeholder="例如:0.85">
</div>
<button @click="predictDuration" :disabled="loading.predict">预测时长</button>
<button class="secondary" @click="resetPrediction">重置</button>
<div v-if="predictionResult" class="result-box">
<h3>预测结果</h3>
<p><strong>预计实际时长:</strong> {{ predictionResult.actual_duration }} 分钟</p>
<p><strong>差异:</strong>
<span :style="{color: predictionResult.difference > 0 ? '#f44336' : '#4CAF50'}">
{{ predictionResult.difference > 0 ? '+' : '' }}{{ predictionResult.difference }} 分钟
</span>
</p>
<p><strong>建议:</strong> {{ predictionResult.suggestion }}</p>
</div>
</div>
<!-- 智能排期 -->
<div class="section">
<h2>📅 智能排期</h2>
<div class="form-group">
<label>排期日期</label>
<input type="date" v-model="scheduleForm.date">
</div>
<div class="form-group">
<label>待排期手术(每行一个,格式:ID,类型,时长,优先级,医生ID)</label>
<textarea v-model="scheduleForm.surgeries" rows="6"
placeholder="例如: S001,emergency,120,9,SUR-01 S002,elective,90,3,SUR-02"></textarea>
</div>
<button @click="optimizeSchedule" :disabled="loading.schedule">生成排期</button>
<button class="secondary" @click="resetSchedule">重置</button>
<div v-if="scheduleResult" class="result-box">
<h3>排期结果</h3>
<table v-if="scheduleResult.schedule.length > 0">
<thead>
<tr>
<th>手术ID</th>
<th>手术室</th>
<th>开始时间</th>
<th>结束时间</th>
<th>医生</th>
<th>类型</th>
</tr>
</thead>
<tbody>
<tr v-for="item in scheduleResult.schedule" :key="item.surgery_id">
<td>{{ item.surgery_id }}</td>
<td>{{ item.operating_room_id }}</td>
<td>{{ formatTime(item.scheduled_start) }}</td>
<td>{{ formatTime(item.scheduled_end) }}</td>
<td>{{ item.surgeon_id }}</td>
<td>
<span class="badge" :class="'badge-' + getSurgeryType(item.surgery_id)">
{{ getSurgeryType(item.surgery_id) }}
</span>
</td>
</tr>
</tbody>
</table>
<div v-if="scheduleResult.report" style="margin-top: 15px;">
<h4>手术室统计</h4>
<table>
<thead>
<tr>
<th>手术室</th>
<th>手术数量</th>
<th>总时长(分钟)</th>
<th>平均时长(分钟)</th>
</tr>
</thead>
<tbody>
<tr v-for="(stats, room) in scheduleResult.report" :key="room">
<td>{{ room }}</td>
<td>{{ stats.手术数量 }}</td>
<td>{{ stats['总时长(分钟)'] }}</td>
<td>{{ stats['平均时长(分钟)'] }}</td>
</tr>
</tbody>
</table>
</div>
</div>
</div>
<!-- 资源查询 -->
<div class="section">
<h2>🔍 资源可用性查询</h2>
<div class="form-group">
<label>查询日期</label>
<input type="date" v-model="resourceDate">
</div>
<button @click="checkResources" :disabled="loading.resources">查询资源</button>
<div v-if="resourceAvailability" class="result-box">
<h3>资源状态</h3>
<div style="display: grid; grid-template-columns: 1fr 1fr; gap: 20px;">
<div>
<h4>手术室</h4>
<table>
<thead>
<tr><th>ID</th><th>状态</th><th>下次可用</th></tr>
</thead>
<tbody>
<tr v-for="room in resourceAvailability.operating_rooms" :key="room.id">
<td>{{ room.id }}</td>
<td>
<span :style="{color: room.available ? '#4CAF50' : '#f44336'}">
{{ room.available ? '可用' : '占用' }}
</span>
</td>
<td>{{ room.next_available }}</td>
</tr>
</tbody>
</table>
</div>
<div>
<h4>医生</h4>
<table>
<thead>
<tr><th>ID</th><th>状态</th><th>工作量</th></tr>
</thead>
<tbody>
<tr v-for="surgeon in resourceAvailability.surgeons" :key="surgeon.id">
<td>{{ surgeon.id }}</td>
<td>
<span :style="{color: surgeon.available ? '#4CAF50' : '#f44336'}">
{{ surgeon.available ? '可用' : '忙碌' }}
</span>
</td>
<td>{{ surgeon.workload }}</td>
</tr>
</tbody>
</table>
</div>
</div>
</div>
</div>
<!-- 添加手术 -->
<div class="section">
<h2>➕ 添加新手术</h2>
<div class="form-group">
<label>手术ID</label>
<input type="text" v-model="newSurgery.id" placeholder="例如:S006">
</div>
<div class="form-group">
<label>患者ID</label>
<input type="text" v-model="newSurgery.patient_id" placeholder="例如:P12345">
</div>
<div class="form-group">
<label>患者姓名</label>
<input type="text" v-model="newSurgery.patient_name" placeholder="例如:张三">
</div>
<div class="form-group">
<label>手术类型</label>
<select v-model="newSurgery.surgery_type">
<option value="elective">择期手术</option>
<option value="urgent">限期手术</option>
<option value="emergency">急诊手术</option>
</select>
</div>
<div class="form-group">
<label>预计时长(分钟)</label>
<input type="number" v-model="newSurgery.estimated_duration">
</div>
<div class="form-group">
<label>优先级(1-10)</label>
<input type="number" v-model="newSurgery.priority" min="1" max="10">
</div>
<div class="form-group">
<label>首选医生ID</label>
<input type="text" v-model="newSurgery.preferred_surgeon_id" placeholder="例如:SUR-01">
</div>
<div class="form-group">
<label>所需设备(用逗号分隔)</label>
<input type="text" v-model="newSurgery.required_equipment" placeholder="例如:设备A,设备B">
</div>
<button @click="addSurgery" :disabled="loading.add">添加手术</button>
<div v-if="addResult" class="result-box">
<p>{{ addResult }}</p>
</div>
<div v-if="addError" class="error-box">
{{ addError }}
</div>
</div>
<!-- 加载状态 -->
<div v-if="Object.values(loading).some(v => v)" class="loading">
<p>⏳ 正在处理,请稍候...</p>
</div>
</div>
<script>
new Vue({
el: '#app',
data: {
// 预测表单
predictionForm: {
surgeryType: 'elective',
estimatedDuration: 120,
priority: 5,
efficiency: 0.85
},
predictionResult: null,
// 排期表单
scheduleForm: {
date: new Date().toISOString().split('T')[0],
surgeries: ''
},
scheduleResult: null,
// 资源查询
resourceDate: new Date().toISOString().split('T')[0],
resourceAvailability: null,
// 添加手术
newSurgery: {
id: '',
patient_id: '',
patient_name: '',
surgery_type: 'elective',
estimated_duration: 60,
priority: 5,
preferred_surgeon_id: '',
required_equipment: ''
},
addResult: null,
addError: null,
// 统计摘要
summary: null,
// 加载状态
loading: {
predict: false,
schedule: false,
resources: false,
add: false
},
// API基础URL
apiBase: 'http://localhost:8000'
},
mounted() {
this.loadSummary();
},
methods: {
// 加载统计摘要
async loadSummary() {
try {
const response = await axios.get(`${this.apiBase}/analytics/summary`);
this.summary = response.data;
} catch (error) {
console.error('加载统计失败:', error);
}
},
// 预测手术时长
async predictDuration() {
this.loading.predict = true;
this.predictionResult = null;
try {
// 构建预测输入
const input = {
date_range_start: new Date().toISOString(),
date_range_end: new Date(Date.now() + 7*24*60*60*1000).toISOString(),
historical_data_points: 30,
surgical_specialties: []
};
const response = await axios.post(`${this.apiBase}/predict/duration`, input);
// 模拟计算实际结果(实际应根据模型输出)
const baseDuration = parseInt(this.predictionForm.estimatedDuration);
const efficiency = parseFloat(this.predictionForm.efficiency);
const priority = parseInt(this.predictionForm.priority);
// 简单的预测逻辑
let predictedDiff = (1 - efficiency) * 20 + (priority - 5) * 2;
if (this.predictionForm.surgeryType === 'emergency') {
predictedDiff += 10;
}
const actualDuration = Math.round(baseDuration + predictedDiff);
const difference = Math.round(predictedDiff);
this.predictionResult = {
actual_duration: actualDuration,
difference: difference,
suggestion: difference > 10 ?
'建议预留更多时间,可能存在延误风险' :
'预计时间充足,可按计划安排'
};
} catch (error) {
alert('预测失败:' + error.message);
} finally {
this.loading.predict = false;
}
},
// 智能排期
async optimizeSchedule() {
this.loading.schedule = true;
this.scheduleResult = null;
try {
// 解析手术数据
const surgeries = this.scheduleForm.surgeries.trim().split('\n')
.filter(line => line.trim())
.map(line => {
const [id, type, duration, priority, surgeonId] = line.split(',').map(s => s.trim());
return {
id,
surgery_type: type,
estimated_duration: parseInt(duration),
priority: parseInt(priority),
surgeon_id: surgeonId,
required_equipment: ['设备A', '设备B'] // 简化
};
});
if (surgeries.length === 0) {
alert('请输入手术数据');
this.loading.schedule = false;
return;
}
const response = await axios.post(`${this.apiBase}/schedule/optimize`, surgeries);
this.scheduleResult = response.data;
} catch (error) {
alert('排期失败:' + error.message);
} finally {
this.loading.schedule = false;
}
},
// 查询资源
async checkResources() {
this.loading.resources = true;
this.resourceAvailability = null;
try {
const response = await axios.get(
`${this.apiBase}/resources/availability?date=${this.resourceDate}`
);
this.resourceAvailability = response.data;
} catch (error) {
alert('资源查询失败:' + error.message);
} finally {
this.loading.resources = false;
}
},
// 添加手术
async addSurgery() {
this.loading.add = true;
this.addResult = null;
this.addError = null;
try {
const surgeryData = {
id: this.newSurgery.id,
patient_id: this.newSurgery.patient_id,
patient_name: this.newSurgery.patient_name,
surgery_type: this.newSurgery.surgery_type,
estimated_duration: parseInt(this.newSurgery.estimated_duration),
priority: parseInt(this.newSurgery.priority),
preferred_surgeon_id: this.newSurgery.preferred_surgeon_id,
required_equipment: this.newSurgery.required_equipment.split(',').map(s => s.trim()),
status: 'scheduled'
};
const response = await axios.post(`${this.apiBase}/surgery/add`, surgeryData);
this.addResult = response.data.message;
// 重置表单
this.resetAddForm();
} catch (error) {
this.addError = error.message;
} finally {
this.loading.add = false;
}
},
// 工具方法
resetPrediction() {
this.predictionForm = {
surgeryType: 'elective',
estimatedDuration: 120,
priority: 5,
efficiency: 0.85
};
this.predictionResult = null;
},
resetSchedule() {
this.scheduleForm.surgeries = '';
this.scheduleResult = null;
},
resetAddForm() {
this.newSurgery = {
id: '',
patient_id: '',
patient_name: '',
surgery_type: 'elective',
estimated_duration: 60,
priority: 5,
preferred_surgeon_id: '',
required_equipment: ''
};
},
formatTime(datetimeStr) {
const date = new Date(datetimeStr);
return date.toLocaleTimeString('zh-CN', {
hour: '2-digit',
minute: '2-digit',
hour12: false
});
},
getSurgeryType(surgeryId) {
// 简化处理,实际应从数据中获取
if (surgeryId.includes('S001')) return 'emergency';
if (surgeryId.includes('S002')) return 'elective';
if (surgeryId.includes('S003')) return 'urgent';
return 'elective';
}
}
});
</script>
</body>
</html>
部署与运维
Docker部署配置
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
g++ \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]
# docker-compose.yml
version: '3.8'
services:
api:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://user:password@db:5432/surgery_db
- REDIS_URL=redis://redis:6379
depends_on:
- db
- redis
volumes:
- .:/app
restart: unless-stopped
db:
image: postgres:13
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: password
POSTGRES_DB: surgery_db
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
redis:
image: redis:6-alpine
ports:
- "6379:6379"
frontend:
build:
context: ./frontend
dockerfile: Dockerfile
ports:
- "80:80"
depends_on:
- api
restart: unless-stopped
volumes:
postgres_data:
环境变量配置
# .env
DATABASE_URL=postgresql://user:password@localhost:5432/surgery_db
REDIS_URL=redis://localhost:6379
MODEL_PATH=./models/surgery_duration_predictor.pkl
LOG_LEVEL=INFO
API_KEY=your-secret-api-key
监控与日志
# logging_config.py
import logging
import sys
from pythonjsonlogger import jsonlogger
def setup_logging():
"""配置日志系统"""
# 创建日志格式
log_format = "%(asctime)s %(levelname)s %(name)s %(message)s"
# JSON格式(用于生产环境)
json_formatter = jsonlogger.JsonFormatter(
'%(asctime)s %(levelname)s %(name)s %(message)s %(pathname)s %(lineno)d'
)
# 控制台格式(用于开发环境)
console_formatter = logging.Formatter(log_format)
# 配置根日志器
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# 控制台处理器
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(console_formatter)
logger.addHandler(console_handler)
# 文件处理器(生产环境)
file_handler = logging.FileHandler('app.log')
file_handler.setFormatter(json_formatter)
logger.addHandler(file_handler)
return logger
# 使用示例
logger = setup_logging()
logger.info("系统启动", extra={"service": "surgery-scheduler"})
logger.error("数据库连接失败", extra={"error": "Connection timeout"})
测试策略
单元测试
# test_prediction.py
import pytest
import pandas as pd
import numpy as np
from prediction_models import SurgeryDurationPredictor
class TestSurgeryDurationPredictor:
def test_model_training(self):
"""测试模型训练"""
# 准备测试数据
X = pd.DataFrame({
'hour_of_day': [10, 14, 8],
'day_of_week': [1, 3, 5],
'surgery_type_encoded': [0.5, 0, 1],
'priority_normalized': [0.6, 0.2, 0.9],
'complexity_score': [5, 3, 8],
'surgeon_efficiency': [0.85, 0.9, 0.75],
'surgeon_rank_encoded': [3, 2, 4],
'duration_medium': [1, 0, 0],
'duration_long': [0, 1, 0],
})
y = pd.Series([15.2, -5.8, 25.3])
predictor = SurgeryDurationPredictor()
results = predictor.train_and_evaluate(X, y)
# 验证结果
assert 'random_forest' in results
assert 'gradient_boosting' in results
assert results['random_forest']['r2_score'] > 0.5 # R²应该大于0.5
def test_prediction(self):
"""测试预测功能"""
# 训练模型
X = pd.DataFrame({
'hour_of_day': [10, 14],
'day_of_week': [1, 3],
'surgery_type_encoded': [0.5, 0],
'priority_normalized': [0.6, 0.2],
'complexity_score': [5, 3],
'surgeon_efficiency': [0.85, 0.9],
'surgeon_rank_encoded': [3, 2],
'duration_medium': [1, 0],
'duration_long': [0, 1],
})
y = pd.Series([15.2, -5.8])
predictor = SurgeryDurationPredictor()
predictor.train_and_evaluate(X, y)
# 预测新数据
new_data = pd.DataFrame({
'hour_of_day': [12],
'day_of_week': [2],
'surgery_type_encoded': [0.5],
'priority_normalized': [0.5],
'complexity_score': [4],
'surgeon_efficiency': [0.88],
'surgeon_rank_encoded': [3],
'duration_medium': [1],
'duration_long': [0],
})
prediction = predictor.predict_duration(new_data)
assert len(prediction) == 1
assert isinstance(prediction[0], (int, float, np.number))
# 运行测试
# pytest test_prediction.py -v
集成测试
# test_integration.py
import pytest
from fastapi.testclient import TestClient
from main import app
client = TestClient(app)
def test_api_health():
"""测试API健康检查"""
response = client.get("/")
assert response.status_code == 200
assert "医院手术室排期预测系统" in response.json()["message"]
def test_predict_duration_api():
"""测试时长预测API"""
payload = {
"date_range_start": "2024-01-01T00:00:00",
"date_range_end": "2024-01-31T23:59:59",
"historical_data_points": 30,
"surgical_specialties": []
}
response = client.post("/predict/duration", json=payload)
assert response.status_code == 200
data = response.json()
assert "predictions" in data
assert "unit" in data
def test_schedule_optimization_api():
"""测试排期优化API"""
payload = [
{
"id": "S001",
"patient_id": "P001",
"patient_name": "张三",
"surgery_type": "emergency",
"estimated_duration": 120,
"priority": 9,
"preferred_surgeon_id": "SUR-01",
"required_equipment": ["设备A", "设备B"],
"status": "scheduled"
}
]
response = client.post("/schedule/optimize", json=payload)
assert response.status_code == 200
data = response.json()
assert "schedule" in data
assert "report" in data
# 运行集成测试
# pytest test_integration.py -v
总结与最佳实践
关键成功因素
- 数据质量:确保历史数据的准确性和完整性,这是模型训练的基础
- 特征工程:深入理解业务逻辑,提取有意义的特征
- 模型迭代:定期重新训练模型,适应业务变化
- 用户体验:界面简洁直观,操作流程符合医护人员习惯
- 系统稳定性:确保7x24小时稳定运行,有完善的监控和备份机制
性能优化建议
- 缓存策略:对频繁查询的数据使用Redis缓存
- 异步处理:耗时的预测任务使用Celery异步处理
- 数据库索引:为常用查询字段建立索引
- 负载均衡:高并发场景下使用Nginx做反向代理
安全考虑
- 数据加密:敏感数据传输使用HTTPS,存储时加密
- 权限控制:基于角色的访问控制(RBAC)
- 操作审计:记录所有关键操作日志
- 数据备份:定期备份数据库,防止数据丢失
未来扩展方向
- 多院区协同:支持多院区资源统一调度
- 实时调整:根据手术实际进展动态调整后续排期
- 成本优化:考虑手术成本和医保报销因素
- AI辅助诊断:集成AI辅助诊断结果,优化排期决策
通过本指南,您应该能够理解并实现一个完整的医院手术室排期预测系统。该系统不仅能提高医院运营效率,还能通过数据驱动的方式优化医疗资源配置,最终提升患者就医体验。
