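Introduction: Challenges and Opportunities in Research Equipment Scheduling

In modern research, laboratory instruments are a core resource for scientific discovery. Yet instrument time is limited while demand for experiments keeps growing. Traditional scheduling relies on staff experience or simple first-come, first-served booking, which copes poorly with complex research projects. Delayed experiments, booking conflicts, and idle instruments waste research funding and can hold back important scientific results.

Advances in big data and AI make it possible to improve scheduling with predictions. By analyzing historical usage records, experiment characteristics, and external factors, we can build predictive models that flag likely booking conflicts in advance and recommend better schedules, with the aim of raising equipment utilization and research throughput. This article walks through how to build such a data-driven scheduling and prediction system.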

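1. Data Foundations: The Basis of the Predictive Model

1.1 Core Data to Collect

An effective predictive model starts with systematic data collection across several dimensions. The data falls into the following categories:

Equipment data

  • Specifications (e.g., resolution, sensitivity, throughput)
  • Maintenance records and failure history
  • Age and performance-degradation curves
  • Supported experiment types and operating constraints

Experiment data

  • Experiment type (e.g., cell culture, materials characterization, chemical synthesis)
  • Expected duration (from historical data or theoretical estimates)
  • Complexity level
  • Number of samples and processing batches
  • Prerequisites (e.g., sample preparation time, environmental requirements)

User behavior data

  • Each researcher's booking history
  • Deviation between actual and booked duration
  • Cancellation frequency and reasons
  • Experiment success and repeat rates

Time-series data

  • Usage distribution over time (hour, day, week, month, quarter)
  • Seasonal fluctuations (e.g., end of semester, grant-application season)
  • Holiday effects
  • Usage patterns during major conferences or events

External factors

  • Project timelines and milestones
  • Timing of funding disbursements
  • Staff recruitment and turnover
  • Experiment demand from partner institutions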
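These categories eventually have to live in concrete tables. As an illustration, the sketch below creates a hypothetical minimal SQLite schema for the two tables that the collector in 1.2 queries (equipment_reservations and maintenance_records). The planned_duration column (in hours), the reservation_id key, and the status values are assumptions. It also shows how the actual-versus-planned deviation from the user-behavior category can be computed directly in SQL with julianday().

```python
import sqlite3

# Hypothetical minimal schema; table and column names are assumptions chosen
# to match the queries used elsewhere in this article.
SCHEMA = """
CREATE TABLE IF NOT EXISTS equipment_reservations (
    reservation_id    INTEGER PRIMARY KEY,
    device_id         INTEGER NOT NULL,
    device_name       TEXT,
    user_id           TEXT NOT NULL,
    reservation_time  TEXT NOT NULL,   -- booked start, 'YYYY-MM-DD HH:MM'
    planned_duration  REAL NOT NULL,   -- booked length in hours
    actual_start_time TEXT,            -- NULL until the session starts
    actual_end_time   TEXT,
    experiment_type   TEXT,
    status            TEXT NOT NULL DEFAULT 'booked'  -- booked/completed/cancelled
);
CREATE TABLE IF NOT EXISTS maintenance_records (
    device_id        INTEGER NOT NULL,
    maintenance_date TEXT NOT NULL,
    maintenance_type TEXT,
    downtime_hours   REAL,
    cost             REAL
);
"""

def create_schema(conn: sqlite3.Connection) -> None:
    """Create both tables if they do not exist yet."""
    conn.executescript(SCHEMA)

def duration_deviation(conn: sqlite3.Connection) -> list[tuple[int, float]]:
    """Actual minus planned duration (hours) for every session that has both timestamps."""
    rows = conn.execute(
        """
        SELECT reservation_id,
               (julianday(actual_end_time) - julianday(actual_start_time)) * 24
                   - planned_duration
        FROM equipment_reservations
        WHERE actual_start_time IS NOT NULL AND actual_end_time IS NOT NULL
        ORDER BY reservation_id
        """
    ).fetchall()
    # julianday arithmetic is floating point, so round away tiny errors
    return [(rid, round(dev, 4)) for rid, dev in rows]

if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    create_schema(conn)
    conn.execute(
        "INSERT INTO equipment_reservations (device_id, user_id, reservation_time, "
        "planned_duration, actual_start_time, actual_end_time, status) "
        "VALUES (1, 'u1', '2023-03-01 09:00', 2.0, '2023-03-01 09:15', "
        "'2023-03-01 11:45', 'completed')"
    )
    print(duration_deviation(conn))  # [(1, 0.5)]
```

Cancelled or not-yet-started bookings have NULL timestamps and are therefore excluded from the deviation, which is usually what a duration model should be trained on.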

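1.2 Implementing Data Collection

The following Python example sketches a framework for data collection. It assumes an SQLite database containing equipment_reservations and maintenance_records tables with the columns named in the queries: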

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import sqlite3

class EquipmentDataCollector:
    def __init__(self, db_path):
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
    
    def collect_device_usage_data(self, start_date, end_date):
        """
        Collect reservation and usage records booked between start_date and end_date.
        """
        query = """
        SELECT 
            device_id,
            device_name,
            user_id,
            reservation_time,
            planned_duration,  -- booked length in hours
            actual_start_time,
            actual_end_time,
            experiment_type,
            status
        FROM equipment_reservations
        WHERE reservation_time BETWEEN ? AND ?
        """
        df = pd.read_sql_query(query, self.conn, 
                              params=[start_date, end_date])
        
        # Actual usage duration in hours (NaN for sessions that never started)
        df['actual_duration'] = (pd.to_datetime(df['actual_end_time']) - 
                                pd.to_datetime(df['actual_start_time'])).dt.total_seconds() / 3600
        
        # Deviation between actual and planned duration (hours)
        df['duration_variance'] = df['actual_duration'] - df['planned_duration']
        
        return df
    
    def collect_device_maintenance_data(self):
        """
        收集设备维护数据
        """
        query = """
        SELECT 
            device_id,
            maintenance_date,
            maintenance_type,
            downtime_hours,
            cost
        FROM maintenance_records
        """
        return pd.read_sql_query(query, self.conn)
    
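    def collect_user_behavior_data(self):
        """
        Aggregate booking behavior per user.
        """
        # duration_variance is computed in pandas by collect_device_usage_data
        # rather than stored in the table, so the deviation (hours) is
        # recomputed here from the raw timestamps; AVG ignores NULL rows
        query = """
        SELECT 
            user_id,
            COUNT(*) AS total_reservations,
            AVG(CASE WHEN status = 'cancelled' THEN 1 ELSE 0 END) AS cancellation_rate,
            AVG((julianday(actual_end_time) - julianday(actual_start_time)) * 24
                - planned_duration) AS avg_variance
        FROM equipment_reservations
        GROUP BY user_id
        """
        return pd.read_sql_query(query, self.conn)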

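# Example usage
collector = EquipmentDataCollector('equipment.db')
# BETWEEN compares the stored strings, so a bare '2023-12-31' upper bound would
# exclude reservations timed later on Dec 31 when times are stored as 'YYYY-MM-DD HH:MM'
usage_data = collector.collect_device_usage_data('2023-01-01', '2023-12-31 23:59:59')
maintenance_data = collector.collect_device_maintenance_data()
user_data = collector.collect_user_behavior_data()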

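1.3 Data Cleaning and Feature Engineering

Raw data usually contains noise and missing values, so it needs systematic cleaning and feature engineering. The code below assumes that device-, user-, and experiment-level attributes (device_age, user_cancellation_rate, and so on) have already been joined onto the usage records: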

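class DataPreprocessor:
    def __init__(self):
        self.feature_columns = []
    
    def clean_usage_data(self, df):
        """
        Clean the equipment usage data.
        """
        df = df.copy()  # do not modify the caller's frame
        
        # Fill missing actual durations with the planned duration first; the range
        # filter below would otherwise drop these rows (NaN fails every comparison)
        df['actual_duration'] = df['actual_duration'].fillna(df['planned_duration'])
        
        # Drop outliers: non-positive durations or durations longer than 24 hours
        df = df[(df['actual_duration'] > 0) & (df['actual_duration'] <= 24)].copy()
        
        # Calendar features derived from the booking time
        reservation_time = pd.to_datetime(df['reservation_time'])
        df['reservation_hour'] = reservation_time.dt.hour
        df['reservation_dow'] = reservation_time.dt.dayofweek  # Monday=0 ... Sunday=6
        df['reservation_month'] = reservation_time.dt.month
        
        return df
    
    def create_features(self, df):
        """
        Build the model features. device_age, device_utilization_rate,
        user_experience_level, user_cancellation_rate, experiment_complexity,
        device_avg_duration and user_avg_duration must already be present
        (merged in from device, user and experiment tables).
        """
        features = pd.DataFrame(index=df.index)
        
        # Time features
        features['hour'] = df['reservation_hour']
        features['day_of_week'] = df['reservation_dow']
        features['month'] = df['reservation_month']
        
        # Equipment features
        features['device_age'] = df['device_age']
        features['device_utilization_rate'] = df['device_utilization_rate']
        
        # User features
        features['user_experience_level'] = df['user_experience_level']
        features['user_cancellation_rate'] = df['user_cancellation_rate']
        
        # Experiment features
        features['experiment_complexity'] = df['experiment_complexity']
        features['is_weekend'] = df['reservation_dow'].isin([5, 6]).astype(int)
        
        # Historical aggregate features
        features['device_avg_duration'] = df['device_avg_duration']
        features['user_avg_duration'] = df['user_avg_duration']
        
        self.feature_columns = features.columns.tolist()
        return features
    
    def create_target_variable(self, df):
        """
        Build the binary target: 1 if the reservation had a conflict or a delay.
        """
        # Conflict: the actual session overlapped another reservation on the same
        #           device (overlap_count > 0; this column must be computed beforehand)
        # Delay: the actual start was more than 30 minutes after the booked start
        df['conflict'] = ((df['overlap_count'] > 0) | 
                         (df['delay_minutes'] > 30)).astype(int)
        return df['conflict']

# Example usage
preprocessor = DataPreprocessor()
cleaned_data = preprocessor.clean_usage_data(usage_data)
features = preprocessor.create_features(cleaned_data)
target = preprocessor.create_target_variable(cleaned_data)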
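create_target_variable assumes that overlap_count and delay_minutes already exist as columns, but nothing above computes them. A minimal sketch of how they might be derived from raw reservation records follows. The Reservation record type and the choice to count overlaps between actual sessions on the same device are assumptions, and the pairwise O(n²) comparison is only meant for small examples.

```python
from dataclasses import dataclass
from datetime import datetime

@dataclass
class Reservation:
    # Hypothetical record type; fields mirror the columns used above.
    reservation_id: int
    device_id: int
    booked_start: datetime
    actual_start: datetime
    actual_end: datetime

def overlap_counts(reservations: list[Reservation]) -> dict[int, int]:
    """Number of other sessions on the same device whose actual interval overlaps."""
    counts = {r.reservation_id: 0 for r in reservations}
    for i, a in enumerate(reservations):
        for b in reservations[i + 1:]:
            # Half-open intervals [start, end) overlap iff each starts before the other ends,
            # so back-to-back sessions (end == next start) do not count.
            if (a.device_id == b.device_id
                    and a.actual_start < b.actual_end
                    and b.actual_start < a.actual_end):
                counts[a.reservation_id] += 1
                counts[b.reservation_id] += 1
    return counts

def delay_minutes(r: Reservation) -> float:
    """Minutes between the booked start and the actual start (negative if early)."""
    return (r.actual_start - r.booked_start).total_seconds() / 60

def conflict_label(r: Reservation, overlaps: dict[int, int]) -> int:
    """Same rule as create_target_variable: overlap, or more than 30 minutes late."""
    return int(overlaps[r.reservation_id] > 0 or delay_minutes(r) > 30)

if __name__ == "__main__":
    t = datetime.fromisoformat
    rs = [
        Reservation(1, 7, t("2023-05-02 09:00"), t("2023-05-02 09:05"), t("2023-05-02 11:30")),
        Reservation(2, 7, t("2023-05-02 11:00"), t("2023-05-02 11:10"), t("2023-05-02 12:00")),
        Reservation(3, 8, t("2023-05-02 09:00"), t("2023-05-02 09:10"), t("2023-05-02 10:30")),
    ]
    ov = overlap_counts(rs)
    print([conflict_label(r, ov) for r in rs])  # [1, 1, 0]
```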

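2. Building the Prediction Models: From Theory to Practice

2.1 Model Selection and Design

Equipment scheduling prediction has to answer two core questions:

  1. Conflict prediction: will a given time slot run into a resource conflict?
  2. Duration prediction: how long will an experiment actually take, so that subsequent experiments can be scheduled realistically?

Conflict prediction is a standard binary classification problem. Candidate models include:

  • Logistic regression: simple and highly interpretable; a good baseline
  • Random forest: handles nonlinear relationships and gives intuitive feature-importance estimates
  • XGBoost/LightGBM: typically strong on tabular data and efficient on large datasets
  • Neural networks: worth considering when there is plenty of data and the features are complex

Duration prediction is a regression problem. Candidates include:

  • Linear regression: a simple baseline
  • Gradient-boosted trees: capture complex nonlinear relationships
  • Time-series models (ARIMA/LSTM): when durations show clear temporal dependence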
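Before any of these models is trained, it helps to know what a trivial predictor achieves. The sketch below (plain Python) predicts each session's duration as the median of past sessions in the same group and reports RMSE in hours. The grouping key (device_id, experiment_type) and the fallback to the global median for unseen groups are assumptions, not something prescribed above; a learned duration model is only worth deploying if it beats this number on the same held-out data.

```python
import math
from collections import defaultdict
from statistics import median

def fit_median_baseline(history):
    """history: iterable of (device_id, experiment_type, duration_hours)."""
    groups = defaultdict(list)
    all_durations = []
    for device_id, exp_type, hours in history:
        groups[(device_id, exp_type)].append(hours)
        all_durations.append(hours)
    table = {key: median(vals) for key, vals in groups.items()}
    fallback = median(all_durations)  # used for unseen (device, experiment) pairs
    return table, fallback

def predict(table, fallback, device_id, exp_type):
    return table.get((device_id, exp_type), fallback)

def rmse(y_true, y_pred):
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))

if __name__ == "__main__":
    train = [(1, "xrd", 2.0), (1, "xrd", 3.0), (1, "xrd", 2.5), (2, "sem", 4.0)]
    test = [(1, "xrd", 2.0), (2, "sem", 5.0), (3, "nmr", 1.0)]
    table, fallback = fit_median_baseline(train)
    preds = [predict(table, fallback, d, e) for d, e, _ in test]
    print(preds)  # [2.5, 4.0, 2.75]
    print(round(rmse([h for *_, h in test], preds), 3))
```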

2.2 Model Implementation

A complete model-building example follows:

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.ensemble import RandomForestClassifier, GradientBoostingRegressor
from sklearn.metrics import classification_report, confusion_matrix, mean_squared_error
import xgboost as xgb
import joblib

class EquipmentSchedulingPredictor:
    def __init__(self):
        self.conflict_model = None
        self.duration_model = None
        self.feature_importance = None
    
    def prepare_training_data(self, features, target, duration_target):
        """
        Split the features and both targets with one shared split.
        """
        # A single train_test_split call over all three arrays keeps the rows
        # aligned. Two separate calls (one stratified, one not) shuffle
        # differently, so the duration labels would not match X_train/X_test.
        X_train, X_test, y_train, y_test, y_train_dur, y_test_dur = train_test_split(
            features, target, duration_target,
            test_size=0.2, random_state=42, stratify=target
        )
        
        return X_train, X_test, y_train, y_test, y_train_dur, y_test_dur
    
    def train_conflict_model(self, X_train, y_train):
        """
        Train the conflict classifier.
        """
        # XGBoost gradient-boosted trees
        self.conflict_model = xgb.XGBClassifier(
            n_estimators=200,
            max_depth=6,
            learning_rate=0.1,
            subsample=0.8,
            colsample_bytree=0.8,
            random_state=42,
            eval_metric='logloss'
        )
        
        # 5-fold cross-validation on the training set (F1 score)
        cv_scores = cross_val_score(self.conflict_model, X_train, y_train, 
                                   cv=5, scoring='f1')
        print(f"Cross-validation F1 scores: {cv_scores}")
        print(f"Mean F1: {cv_scores.mean():.4f}")
        
        # Fit on the full training set
        self.conflict_model.fit(X_train, y_train)
        
        # Feature importances, highest first
        self.feature_importance = pd.DataFrame({
            'feature': X_train.columns,
            'importance': self.conflict_model.feature_importances_
        }).sort_values('importance', ascending=False)
        
        return self.conflict_model
    
    def train_duration_model(self, X_train, y_train):
        """
        Train the duration (regression) model.
        """
        self.duration_model = GradientBoostingRegressor(
            n_estimators=150,
            max_depth=5,
            learning_rate=0.1,
            random_state=42
        )
        
        self.duration_model.fit(X_train, y_train)
        return self.duration_model
    
    def evaluate_models(self, X_test, y_test, y_test_dur):
        """
        Evaluate both models on the held-out test set.
        """
        # Conflict model
        y_pred = self.conflict_model.predict(X_test)
        
        print("=== Conflict prediction model ===")
        print(classification_report(y_test, y_pred))
        
        # Duration model
        y_pred_dur = self.duration_model.predict(X_test)
        mse = mean_squared_error(y_test_dur, y_pred_dur)
        rmse = np.sqrt(mse)
        print("\n=== Duration prediction model ===")
        print(f"RMSE: {rmse:.2f} hours")
        
        return {
            'conflict_report': classification_report(y_test, y_pred, output_dict=True),
            'duration_rmse': rmse
        }
    
    def predict_scheduling_risk(self, new_reservation_features):
        """
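        Score new reservations: conflict probability, predicted duration, risk level.
        """
        if self.conflict_model is None or self.duration_model is None:
            raise ValueError("Models not trained yet")
        
        # Probability of the positive (conflict) class
        conflict_prob = self.conflict_model.predict_proba(
            new_reservation_features
        )[:, 1]
        
        # Predicted actual duration in hours
        predicted_duration = self.duration_model.predict(
            new_reservation_features
        )
        
        # Risk labels (kept as the string values used throughout the code):
        # '低' = low (p < 0.3), '中' = medium (0.3 <= p < 0.7), '高' = high (p >= 0.7)
        return {
            'conflict_probability': conflict_prob,
            'predicted_duration': predicted_duration,
            'risk_level': ['低' if p < 0.3 else '中' if p < 0.7 else '高' 
                          for p in conflict_prob]
        }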

# Example usage
predictor = EquipmentSchedulingPredictor()

# Prepare the data
X_train, X_test, y_train, y_test, y_train_dur, y_test_dur = \
    predictor.prepare_training_data(features, target, cleaned_data['actual_duration'])

# Train the models
predictor.train_conflict_model(X_train, y_train)
predictor.train_duration_model(X_train, y_train_dur)

# Evaluate
results = predictor.evaluate_models(X_test, y_test, y_test_dur)

# Score a new reservation (same columns, in the same order, as the training features)
new_reservation = pd.DataFrame({
    'hour': [14],
    'day_of_week': [2],
    'month': [6],
    'device_age': [5],
    'device_utilization_rate': [0.75],
    'user_experience_level': [2],
    'user_cancellation_rate': [0.1],
    'experiment_complexity': [3],
    'is_weekend': [0],
    'device_avg_duration': [2.5],
    'user_avg_duration': [3.0]
})

risk_assessment = predictor.predict_scheduling_risk(new_reservation)
print(f"\nRisk assessment: {risk_assessment}")

2.3 Model Interpretation and Tuning

After training, the models should be interpreted and tuned:

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, precision_recall_curve

def analyze_model_performance(predictor, X_test, y_test):
    """
    Inspect the conflict model in more depth and pick a decision threshold.
    """
    # Feature-importance bar chart
    plt.figure(figsize=(10, 6))
    sns.barplot(data=predictor.feature_importance.head(10), 
                x='importance', y='feature')
    plt.title('Top 10 Feature Importances')
    plt.tight_layout()
    plt.show()
    
    # Confusion matrix
    y_pred = predictor.conflict_model.predict(X_test)
    cm = confusion_matrix(y_test, y_pred)
    
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title('Confusion Matrix')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.show()
    
    # Threshold tuning via the precision-recall curve
    y_pred_proba = predictor.conflict_model.predict_proba(X_test)[:, 1]
    precision, recall, thresholds = precision_recall_curve(y_test, y_pred_proba)
    
    # precision/recall have one more entry than thresholds (the last point has no
    # threshold), so drop it; also avoid 0/0 where precision and recall are both 0
    p, r = precision[:-1], recall[:-1]
    denom = p + r
    f1_scores = np.divide(2 * p * r, denom, out=np.zeros_like(denom), where=denom > 0)
    best_idx = int(np.argmax(f1_scores))
    best_threshold = thresholds[best_idx]
    
    print(f"Optimal threshold: {best_threshold:.3f}")
    print(f"Best F1: {f1_scores[best_idx]:.3f}")
    
    return best_threshold

# Find the F1-optimal decision threshold (predict_scheduling_risk keeps its fixed
# 0.3/0.7 bands, so the threshold has to be applied explicitly where needed)
best_threshold = analyze_model_performance(predictor, X_test, y_test)
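The threshold search in analyze_model_performance can be reproduced without scikit-learn, which makes the logic explicit: sweep the candidate thresholds, compute precision and recall at each, and keep the one with the highest F1, treating 0/0 as 0. This is an illustrative O(n²) sketch for small validation sets (the function name is hypothetical), not a replacement for precision_recall_curve; like that function, it predicts the positive class when score >= threshold.

```python
def best_f1_threshold(y_true, scores):
    """Return (threshold, f1) maximizing F1 when predicting 1 for score >= threshold."""
    best = (None, 0.0)
    for thr in sorted(set(scores)):
        tp = sum(1 for y, s in zip(y_true, scores) if s >= thr and y == 1)
        fp = sum(1 for y, s in zip(y_true, scores) if s >= thr and y == 0)
        fn = sum(1 for y, s in zip(y_true, scores) if s < thr and y == 1)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        # Guard against 0/0 when precision and recall are both zero
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        if f1 > best[1]:
            best = (thr, f1)
    return best

if __name__ == "__main__":
    y = [0, 0, 1, 1, 0, 1]
    p = [0.1, 0.4, 0.35, 0.8, 0.2, 0.9]
    print(best_f1_threshold(y, p))
```

In the example, a threshold of 0.35 catches all three conflicts with one false alarm (precision 0.75, recall 1.0), which gives the best F1 of 6/7.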

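3. Intelligent Schedule Optimization

3.1 Objectives and Constraints

With the prediction models in place, we can build a schedule optimizer on top of them. Typical objectives are:

  • Minimize the total conflict probability
  • Maximize equipment utilization
  • Minimize experiment delays
  • Balance user satisfaction

Constraints include:

  • Equipment availability windows
  • Precedence dependencies between experiments
  • Specific user requirements (such as particular time slots)
  • Maintenance and calibration time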
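Before any soft objective is scored, a candidate schedule can be screened against the hard constraints listed above. The sketch below encodes three of them: device availability windows (maintenance and calibration blocks are modelled simply by leaving them out of the windows), precedence dependencies, and a user's required time window. The Slot and Constraints types, and counting time in hours from the start of the planning week, are assumptions for illustration; double-booking of a device is penalized separately in the genetic algorithm's fitness function.

```python
from dataclasses import dataclass, field

@dataclass
class Slot:
    request_id: int
    equipment_id: int
    start: float      # hours from the start of the planning week
    duration: float   # hours

    @property
    def end(self) -> float:
        return self.start + self.duration

@dataclass
class Constraints:
    # equipment_id -> list of (open, close) windows; maintenance and calibration
    # periods are simply left out of the windows (assumed representation)
    availability: dict[int, list[tuple[float, float]]]
    # (before, after): request `after` may start only once `before` has ended
    precedence: list[tuple[int, int]] = field(default_factory=list)
    # request_id -> (earliest start, latest end) requested by the user
    user_windows: dict[int, tuple[float, float]] = field(default_factory=dict)

def violations(schedule: list[Slot], c: Constraints) -> list[str]:
    """Human-readable list of hard-constraint violations; empty means feasible."""
    problems = []
    by_id = {s.request_id: s for s in schedule}
    for s in schedule:
        windows = c.availability.get(s.equipment_id, [])
        if not any(lo <= s.start and s.end <= hi for lo, hi in windows):
            problems.append(f"request {s.request_id}: outside availability of device {s.equipment_id}")
        if s.request_id in c.user_windows:
            lo, hi = c.user_windows[s.request_id]
            if s.start < lo or s.end > hi:
                problems.append(f"request {s.request_id}: outside the user's window")
    for before, after in c.precedence:
        if before in by_id and after in by_id and by_id[after].start < by_id[before].end:
            problems.append(f"request {after}: starts before request {before} ends")
    return problems

if __name__ == "__main__":
    c = Constraints(availability={1: [(8, 12), (13, 18)]},
                    precedence=[(0, 1)], user_windows={1: (13, 18)})
    print(violations([Slot(0, 1, 8, 3), Slot(1, 1, 13, 4)], c))  # []
```

An optimizer can either discard infeasible candidates outright or, as the genetic algorithm below does for double-bookings, add a large penalty per violation.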

3.2 Scheduling with a Genetic Algorithm

The following implementation uses a genetic algorithm to search for a schedule:

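import random
from datetime import datetime
from typing import List, Dict, Tuple

import numpy as np
import pandas as pd

# Column order used when the models were trained (DataPreprocessor.create_features)
FEATURE_COLUMNS = [
    'hour', 'day_of_week', 'month', 'device_age', 'device_utilization_rate',
    'user_experience_level', 'user_cancellation_rate', 'experiment_complexity',
    'is_weekend', 'device_avg_duration', 'user_avg_duration',
]

class GeneticScheduler:
    def __init__(self, equipment_pool, reservation_requests, predictor, month=None):
        self.equipment_pool = equipment_pool  # list of equipment IDs
        self.requests = reservation_requests  # one attribute dict per request
        self.predictor = predictor            # trained EquipmentSchedulingPredictor
        # The models use the calendar month as a feature, which the hour-of-week
        # encoding cannot provide; default to the current month
        self.month = month if month is not None else datetime.now().month
        self.population_size = 50
        self.generations = 100
        self.mutation_rate = 0.1
    
    def chromosome_to_schedule(self, chromosome):
        """
        Decode a chromosome into a schedule.
        chromosome: [equipment_id, start_time, duration, equipment_id, start_time, duration, ...]
        start_time counts hours from the start of the planning week (0-167).
        """
        schedule = []
        for i in range(0, len(chromosome), 3):
            if i + 2 < len(chromosome):
                schedule.append({
                    'request_id': i // 3,
                    'equipment_id': chromosome[i],
                    'start_time': chromosome[i+1],
                    'duration': chromosome[i+2]
                })
        return schedule
    
    def calculate_fitness(self, chromosome):
        """
        Cost of a chromosome (lower is better).
        """
        schedule = self.chromosome_to_schedule(chromosome)
        
        # Score all slots in one batch; a DataFrame with the training column names
        # avoids feature-name mismatches in models fitted on a DataFrame
        X = pd.DataFrame([self.get_features_for_slot(slot) for slot in schedule],
                         columns=FEATURE_COLUMNS)
        conflict_probs = self.predictor.conflict_model.predict_proba(X)[:, 1]
        pred_durations = self.predictor.duration_model.predict(X)
        
        total_cost = 0.0
        for slot, conflict_prob, pred_duration in zip(schedule, conflict_probs, pred_durations):
            # 1. Conflict cost
            conflict_cost = conflict_prob * 1000
            
            # 2. Delay cost: allocated duration vs. predicted duration
            duration_cost = abs(slot['duration'] - pred_duration) * 50
            
            # 3. Utilization cost (session length and time-of-day preferences)
            utilization_cost = self.calculate_utilization_cost(slot)
            
            total_cost += conflict_cost + duration_cost + utilization_cost
        
        # Severe penalty for double-booking: every pair of slots on the same device
        # whose intervals [start, start + duration) overlap, not only identical start hours
        intervals_by_device = {}
        for slot in schedule:
            intervals_by_device.setdefault(slot['equipment_id'], []).append(
                (slot['start_time'], slot['start_time'] + slot['duration']))
        for intervals in intervals_by_device.values():
            intervals.sort()
            for i, (_, end_i) in enumerate(intervals):
                for start_j, _ in intervals[i + 1:]:
                    if start_j >= end_i:
                        break  # later intervals start even later
                    total_cost += 10000
        
        return total_cost
    
    def get_features_for_slot(self, slot):
        """
        Feature vector for a slot, in the order of FEATURE_COLUMNS.
        """
        request = self.requests[slot['request_id']]
        hour = slot['start_time'] % 24
        day_of_week = (slot['start_time'] // 24) % 7  # assumes hour 0 is Monday 00:00
        
        return np.array([
            hour,
            day_of_week,
            self.month,
            request['device_age'],
            request['device_utilization_rate'],
            request['user_experience_level'],
            request['user_cancellation_rate'],
            request['experiment_complexity'],
            1 if day_of_week in (5, 6) else 0,  # is_weekend (1 = Saturday/Sunday)
            request['device_avg_duration'],
            request['user_avg_duration']
        ], dtype=float)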
    
    def calculate_utilization_cost(self, slot):
        """
        Soft cost for session length and time-of-day preferences.
        """
        # Assume an ideal session length of 4 hours; deviations either way cost
        ideal_duration = 4.0
        duration_penalty = abs(slot['duration'] - ideal_duration) * 10
        
        # Time-of-day preference: discourage very early or very late starts
        hour = slot['start_time'] % 24
        if hour < 8 or hour > 20:
            time_penalty = 50
        elif hour < 9 or hour > 18:
            time_penalty = 20
        else:
            time_penalty = 0
        
        return duration_penalty + time_penalty
    
    def initialize_population(self):
        """
        Create a random initial population.
        """
        population = []
        for _ in range(self.population_size):
            chromosome = []
            for _request in self.requests:
                # Random device
                equipment_id = random.choice(self.equipment_pool)
                # Random start hour within the one-week horizon
                # (availability windows are not enforced here)
                start_time = random.randint(0, 167)
                # Random duration between 2 and 8 hours
                duration = random.uniform(2.0, 8.0)
                
                chromosome.extend([equipment_id, start_time, duration])
            population.append(chromosome)
        return population
    
    def selection(self, population, fitness_scores):
        """
        Tournament selection with tournaments of size 3.
        """
        selected = []
        for _ in range(len(population)):
            # Draw 3 random individuals and keep the one with the lowest cost
            candidates = random.sample(list(zip(population, fitness_scores)), 3)
            best = min(candidates, key=lambda x: x[1])[0]
            selected.append(best)
        return selected
    
    def crossover(self, parent1, parent2):
        """
        Single-point crossover. Genes keep their positions, so every child gene
        still has the right type (device, start hour or duration).
        """
        if len(parent1) < 2:
            return parent1, parent2
        
        point = random.randint(1, len(parent1) - 1)
        child1 = parent1[:point] + parent2[point:]
        child2 = parent2[:point] + parent1[point:]
        return child1, child2
    
    def mutation(self, chromosome):
        """
        Mutate each gene with probability mutation_rate. Works on a copy:
        tournament selection can put the same list object into the population
        several times, and in-place mutation would change all of them at once.
        """
        chromosome = list(chromosome)
        for i in range(len(chromosome)):
            if random.random() < self.mutation_rate:
                if i % 3 == 0:  # equipment ID
                    chromosome[i] = random.choice(self.equipment_pool)
                elif i % 3 == 1:  # start hour
                    chromosome[i] = random.randint(0, 167)
                else:  # duration in hours
                    chromosome[i] = random.uniform(2.0, 8.0)
        return chromosome
    
    def evolve(self):
        """
        Run the genetic algorithm; returns (best_schedule, best_cost).
        """
        population = self.initialize_population()
        
        best_solution = None
        best_fitness = float('inf')
        
        for generation in range(self.generations):
            # Evaluate every individual
            fitness_scores = [self.calculate_fitness(chrom) for chrom in population]
            
            # Track the best individual so far; store a copy so that later
            # operations cannot modify it
            best_idx = int(np.argmin(fitness_scores))
            if fitness_scores[best_idx] < best_fitness:
                best_fitness = fitness_scores[best_idx]
                best_solution = list(population[best_idx])
            
            # Selection
            selected = self.selection(population, fitness_scores)
            
            # Crossover and mutation
            new_population = []
            for i in range(0, len(selected), 2):
                if i + 1 < len(selected):
                    child1, child2 = self.crossover(selected[i], selected[i+1])
                    new_population.append(self.mutation(child1))
                    new_population.append(self.mutation(child2))
                else:
                    new_population.append(self.mutation(selected[i]))
            
            population = new_population
            
            if generation % 10 == 0:
                print(f"Generation {generation}: Best Fitness = {best_fitness:.2f}")
        
        return self.chromosome_to_schedule(best_solution), best_fitness

# Example usage
equipment_pool = [1, 2, 3, 4, 5]  # equipment IDs
requests = [
    {'device_age': 3, 'device_utilization_rate': 0.7, 
     'user_experience_level': 2, 'user_cancellation_rate': 0.1,
     'experiment_complexity': 3, 'device_avg_duration': 2.5,
     'user_avg_duration': 3.0},
    # ... more requests
]

scheduler = GeneticScheduler(equipment_pool, requests, predictor)
optimal_schedule, fitness = scheduler.evolve()
print(f"Optimal schedule: {optimal_schedule}")
print(f"Fitness: {fitness}")
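A genetic algorithm is only worth its cost if it beats a simple heuristic. As a point of comparison, the sketch below is a hypothetical greedy first-fit scheduler: longest request first, each placed at the earliest whole hour inside an assumed 8:00-18:00 working window on the first free device. It uses the same hour-of-week encoding as the chromosome, so its plan can be turned into [equipment_id, start_time, duration] genes and scored with calculate_fitness for a like-for-like comparison.

```python
def greedy_first_fit(requests, equipment_pool, horizon=168, day_start=8, day_end=18):
    """Hypothetical greedy baseline for the genetic scheduler.

    requests: list of (request_id, duration_hours). Returns
    {request_id: (equipment_id, start_hour)}; requests that cannot be
    placed within the horizon are left out.
    """
    booked = {e: [] for e in equipment_pool}  # equipment_id -> [(start, end), ...]
    plan = {}
    # sorted() is stable, so equal-length requests keep their original order
    for request_id, duration in sorted(requests, key=lambda r: -r[1]):
        for start in range(horizon):
            hour = start % 24
            if hour < day_start or hour + duration > day_end:
                continue  # the session must fit inside one working day
            end = start + duration
            free = [e for e in equipment_pool
                    if all(end <= s or start >= t for s, t in booked[e])]
            if free:
                booked[free[0]].append((start, end))
                plan[request_id] = (free[0], start)
                break
    return plan

plan = greedy_first_fit([(0, 4.0), (1, 6.0), (2, 4.0)], equipment_pool=[1])
print(plan)  # {1: (1, 8), 0: (1, 14), 2: (1, 32)}
```

In the example, the 6-hour request takes 08:00-14:00 on Monday, the next fills 14:00-18:00, and the third no longer fits that day and moves to Tuesday 08:00 (hour 32).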

3.3 Real-Time Schedule Adjustment

In production, the schedule also has to be adjusted in real time:

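class RealTimeScheduler:
    """
    Real-time booking checks and rescheduling.

    extract_features, get_candidate_time, build_features, update_reservation,
    delay_reservation, get_available_equipment and evaluate_equipment_fit are
    integration hooks that depend on the local data layer; they are declared
    at the end of the class but not implemented here.
    """
    def __init__(self, predictor, optimizer):
        self.predictor = predictor
        self.optimizer = optimizer
        self.active_reservations = []
    
    def check_conflict_realtime(self, new_request):
        """
        Check in real time whether a new request is likely to conflict with existing bookings.
        """
        features = self.extract_features(new_request)
        risk = self.predictor.predict_scheduling_risk(features)
        
        # risk_level holds one label per input row; '高' means high risk
        if risk['risk_level'][0] == '高':
            # Propose alternative times instead of accepting the booking
            alternative_times = self.suggest_alternative_times(new_request)
            return {
                'status': 'rejected',
                'reason': 'High conflict risk',
                'alternatives': alternative_times
            }
        
        return {'status': 'approved', 'risk': risk}
    
    def suggest_alternative_times(self, request):
        """
        Suggest up to five alternative start times, lowest risk first.
        """
        candidates = []
        for day_offset in range(7):  # the next 7 days
            for hour in range(8, 18):  # working hours: starts from 08:00 to 17:00
                candidate_time = self.get_candidate_time(day_offset, hour)
                candidate_features = self.build_features(request, candidate_time)
                risk = self.predictor.predict_scheduling_risk(candidate_features)
                
                # Keep low ('低') and medium ('中') risk slots
                if risk['risk_level'][0] in ('低', '中'):
                    candidates.append({
                        'time': candidate_time,
                        'risk': risk['conflict_probability'][0],
                        'duration': risk['predicted_duration'][0]
                    })
        
        candidates.sort(key=lambda x: x['risk'])
        return candidates[:5]
    
    def adjust_existing_schedule(self, emergency_event):
        """
        Reschedule after an emergency event such as an equipment failure.
        """
        # Reservations on the affected device
        affected = [r for r in self.active_reservations 
                   if r['equipment_id'] == emergency_event['equipment_id']]
        
        for reservation in affected:
            # Try to move the reservation to another suitable device
            alternative_equipment = self.find_alternative_equipment(reservation)
            
            if alternative_equipment is not None:
                self.update_reservation(reservation, alternative_equipment)
            else:
                # No suitable device: postpone by the expected outage duration
                self.delay_reservation(reservation, emergency_event['duration'])
    
    def find_alternative_equipment(self, reservation):
        """
        Pick the replacement device with the lowest fit score (lower is better).
        """
        available_equipment = self.get_available_equipment(
            reservation['required_features']
        )
        
        best_equipment = None
        best_score = float('inf')
        
        for equipment in available_equipment:
            score = self.evaluate_equipment_fit(reservation, equipment)
            if score < best_score:
                best_score = score
                best_equipment = equipment
        
        return best_equipment
    
    # --- Integration hooks: implement against your own data layer ---
    def extract_features(self, request):
        raise NotImplementedError
    
    def get_candidate_time(self, day_offset, hour):
        raise NotImplementedError
    
    def build_features(self, request, candidate_time):
        raise NotImplementedError
    
    def update_reservation(self, reservation, equipment):
        raise NotImplementedError
    
    def delay_reservation(self, reservation, hours):
        raise NotImplementedError
    
    def get_available_equipment(self, required_features):
        raise NotImplementedError
    
    def evaluate_equipment_fit(self, reservation, equipment):
        raise NotImplementedError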
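find_alternative_equipment relies on helpers that are left abstract, get_available_equipment and evaluate_equipment_fit. One possible interpretation is sketched below under assumed data shapes: each device advertises a set of capability tags and a list of busy intervals. It keeps devices whose capabilities cover the reservation's required features and that are free for the reservation's interval, then prefers the device with the fewest surplus capabilities, so that more versatile instruments stay free for others. The Device type, pick_alternative, and this scoring rule are all hypothetical.

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Device:
    """Hypothetical device record: capability tags plus half-open busy intervals (hours)."""
    equipment_id: int
    capabilities: set[str]
    busy: list[tuple[float, float]] = field(default_factory=list)

    def is_free(self, start: float, end: float) -> bool:
        # Free if [start, end) does not intersect any busy interval
        return all(end <= b_start or start >= b_end for b_start, b_end in self.busy)

def pick_alternative(required: set[str], start: float, end: float,
                     devices: list[Device], broken_id: int) -> Optional[Device]:
    """Return the best replacement device, or None if the booking must be delayed."""
    candidates = [
        d for d in devices
        if d.equipment_id != broken_id      # skip the failed instrument
        and required <= d.capabilities      # covers every required feature
        and d.is_free(start, end)           # no clash with existing bookings
    ]
    if not candidates:
        return None
    # Fewest unused capabilities first; ties broken by the lower ID
    return min(candidates, key=lambda d: (len(d.capabilities - required), d.equipment_id))

if __name__ == "__main__":
    fleet = [
        Device(1, {"fluorescence", "confocal"}),
        Device(2, {"fluorescence", "confocal", "live-cell"}),
        Device(3, {"fluorescence", "confocal"}, busy=[(9, 12)]),
        Device(4, {"fluorescence"}),
    ]
    choice = pick_alternative({"fluorescence", "confocal"}, 10, 12, fleet, broken_id=1)
    print(choice.equipment_id)  # 2
```

Returning None maps directly onto the delay branch of adjust_existing_schedule.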

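4. System Integration and Deployment

4.1 System Architecture

A complete equipment scheduling and prediction system should include the following components: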

┌─────────────────────────────────────────────────────────────┐
│                     Web Interface                           │
│  (booking, schedule view, risk alerts, alternative slots)   │
└──────────────────────┬──────────────────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────────────────┐
│                  API Gateway                                │
│  (request routing, authentication, rate limiting)           │
└──────────────────────┬──────────────────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────────────────┐
│              Prediction Service                             │
│  - Conflict prediction model                                │
│  - Duration prediction model                                │
│  - Real-time risk assessment                                │
└──────────────────────┬──────────────────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────────────────┐
│              Optimization Engine                            │
│  - Scheduling optimization algorithm                        │
│  - Alternative-slot generation                              │
│  - Dynamic adjustment strategies                            │
└──────────────────────┬──────────────────────────────────────┘
                       │
┌──────────────────────▼──────────────────────────────────────┐
│              Data Storage & Processing                      │
│  - Historical data storage                                  │
│  - Feature engineering pipeline                             │
│  - Model training and updates                               │
└─────────────────────────────────────────────────────────────┘
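The layering in the diagram stays loose if each layer depends only on a narrow interface rather than on a concrete model or optimizer. The sketch below is one hypothetical way to express that in Python with typing.Protocol; the names PredictionService, OptimizationEngine, BookingFacade, and their methods are invented for illustration and simply mirror the box labels. The 0.7 default matches the lower bound of the high-risk band in predict_scheduling_risk.

```python
from typing import Protocol

class PredictionService(Protocol):
    """Hypothetical interface of the Prediction Service layer."""
    def assess(self, features: dict[str, float]) -> dict[str, float]:
        """Return at least 'conflict_probability' and 'predicted_duration'."""
        ...

class OptimizationEngine(Protocol):
    """Hypothetical interface of the Optimization Engine layer."""
    def alternatives(self, request: dict, limit: int) -> list[dict]:
        """Return up to `limit` alternative slots, best first."""
        ...

class BookingFacade:
    """What the API layer would call; it depends only on the two interfaces."""

    def __init__(self, predictor: PredictionService, optimizer: OptimizationEngine,
                 high_risk: float = 0.7) -> None:
        self.predictor = predictor
        self.optimizer = optimizer
        self.high_risk = high_risk  # lower bound of the high-risk band

    def review(self, request: dict, features: dict[str, float]) -> dict:
        risk = self.predictor.assess(features)
        suggestions = []
        if risk["conflict_probability"] >= self.high_risk:
            # Only high-risk requests trigger the (more expensive) optimizer
            suggestions = self.optimizer.alternatives(request, limit=3)
        return {**risk, "suggestions": suggestions}
```

Any object with matching methods satisfies the protocols, so the models can be swapped, or replaced by stubs in tests, without touching the API layer.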

4.2 REST API Implementation

A simple REST API built with FastAPI:

from datetime import datetime
from typing import Any, Dict, List, Optional

import joblib
import numpy as np
import pandas as pd
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(title="Intelligent Equipment Scheduling and Prediction System")

# Request/response schemas
class ReservationRequest(BaseModel):
    user_id: str
    equipment_id: int
    experiment_type: str
    planned_duration: float
    start_time: str  # ISO格式
    experiment_complexity: int
    sample_count: int

class PredictionResponse(BaseModel):
    conflict_probability: float
    predicted_duration: float
    risk_level: str
    suggestions: List[str]

class ScheduleResponse(BaseModel):
    schedule: List[Dict]
    total_conflict_risk: float
    optimization_score: float

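# Module-level globals (a production app should use dependency injection).
# predictor is expected to be a pickled EquipmentSchedulingPredictor and
# scheduler a pickled RealTimeScheduler, which provides suggest_alternative_times.
predictor = None
scheduler = None

# Recent FastAPI versions deprecate on_event in favor of lifespan handlers;
# on_event still works.
@app.on_event("startup")
async def load_models():
    """
    Load the models at startup.
    """
    global predictor, scheduler
    try:
        predictor = joblib.load('models/conflict_predictor.pkl')
        scheduler = joblib.load('models/optimizer.pkl')
        print("Models loaded successfully")
    except Exception as e:
        print(f"Failed to load models: {e}")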

@app.post("/predict", response_model=PredictionResponse)
def predict_risk(request: ReservationRequest):
    """
    Predict the risk of a single reservation. Declared with plain def so that
    FastAPI runs the blocking model calls in its thread pool, not on the event loop.
    """
    try:
        features = extract_features_from_request(request)
        risk = predictor.predict_scheduling_risk(features)
        
        # Suggest alternatives only for high-risk ('高') requests;
        # risk_level is a list with one label per row
        suggestions = []
        if risk['risk_level'][0] == '高':
            alternatives = scheduler.suggest_alternative_times(request)
            suggestions = [f"Consider {alt['time']} (risk: {alt['risk']:.2%})" 
                          for alt in alternatives[:3]]
        
        return PredictionResponse(
            conflict_probability=float(risk['conflict_probability'][0]),
            predicted_duration=float(risk['predicted_duration'][0]),
            risk_level=risk['risk_level'][0],
            suggestions=suggestions
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

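@app.post("/schedule", response_model=ScheduleResponse)
def create_optimal_schedule(requests: List[ReservationRequest]):
    """
    Generate an optimized schedule for several reservation requests
    (plain def: the genetic algorithm is CPU-bound and would block the event loop).
    """
    try:
        # Convert to the internal format. The constants are placeholders that
        # should be looked up in the device and user tables.
        equipment_pool = sorted(set(r.equipment_id for r in requests))
        request_data = [{
            'device_age': 3,
            'device_utilization_rate': 0.7,
            'user_experience_level': 2,
            'user_cancellation_rate': 0.1,
            'experiment_complexity': r.experiment_complexity,
            'device_avg_duration': 2.5,
            'user_avg_duration': 3.0
        } for r in requests]
        
        # Optimize these requests with the GeneticScheduler from section 3.2;
        # calling evolve() on a pre-loaded optimizer would ignore request_data
        optimizer = GeneticScheduler(equipment_pool, request_data, predictor)
        optimal_schedule, fitness = optimizer.evolve()
        
        # Total conflict risk: sum of the predicted conflict probabilities of all slots
        slot_features = pd.DataFrame(
            [optimizer.get_features_for_slot(slot) for slot in optimal_schedule],
            columns=predictor.duration_model.feature_names_in_
        )
        total_risk = float(predictor.conflict_model.predict_proba(slot_features)[:, 1].sum())
        
        return ScheduleResponse(
            schedule=optimal_schedule,
            total_conflict_risk=total_risk,
            optimization_score=1.0 / (1.0 + fitness)
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))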

@app.get("/equipment/{equipment_id}/availability")
async def get_availability(equipment_id: int, date: str):
    """
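    Return the available time slots of a device on the given date.
    """
    try:
        # Placeholder data: a real implementation would query the reservation
        # table for free slots on that date and score each one with the model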
        available_slots = [
            {"start": "09:00", "end": "12:00", "risk": 0.1},
            {"start": "13:00", "end": "17:00", "risk": 0.2}
        ]
        return {"equipment_id": equipment_id, "available_slots": available_slots}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

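def extract_features_from_request(request: ReservationRequest) -> pd.DataFrame:
    """
    Build the one-row feature frame the models expect. Column names and order
    follow DataPreprocessor.create_features, because models fitted on a
    DataFrame expect the same feature names at prediction time.
    """
    # Placeholder constants: a real system would look up the user's and the
    # device's historical statistics in the database
    start_time = datetime.fromisoformat(request.start_time)
    
    row = {
        'hour': start_time.hour,
        'day_of_week': start_time.weekday(),
        'month': start_time.month,
        'device_age': 3.0,                # placeholder
        'device_utilization_rate': 0.7,   # placeholder
        'user_experience_level': 2.0,     # placeholder
        'user_cancellation_rate': 0.1,    # placeholder
        'experiment_complexity': request.experiment_complexity,
        'is_weekend': 1 if start_time.weekday() in (5, 6) else 0,
        'device_avg_duration': 2.5,       # placeholder
        'user_avg_duration': 3.0,         # placeholder
    }
    return pd.DataFrame([row])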

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

4.3 Continuous Model Updates

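import os
import sqlite3
from contextlib import closing
from datetime import datetime, timedelta

import joblib

class ModelUpdater:
    def __init__(self, predictor, db_path):
        self.predictor = predictor
        self.db_path = db_path
        self.retrain_threshold = 100  # retrain once at least 100 new reservations exist
    
    def check_and_update(self):
        """
        Retrain and store a new model version when enough new data has arrived.
        Assumes equipment_reservations has a created_at column and that a
        model_versions(training_date, data_count, model_path) table exists.
        """
        with closing(sqlite3.connect(self.db_path)) as conn:
            cursor = conn.cursor()
            
            # Reservations created since the last training run. COALESCE covers the
            # first run: with an empty model_versions table MAX() is NULL, and a
            # comparison with NULL is never true, so retraining would never start.
            cursor.execute("""
                SELECT COUNT(*) FROM equipment_reservations 
                WHERE created_at > COALESCE(
                    (SELECT MAX(training_date) FROM model_versions), '1970-01-01')
            """)
            new_data_count = cursor.fetchone()[0]
            
            if new_data_count >= self.retrain_threshold:
                print(f"New data count: {new_data_count}, retraining models...")
                self.retrain_models()
                
                # Save the new model version
                version_name = self.save_model_version()
                
                # Record the version; the timestamp is stored as 'YYYY-MM-DD HH:MM:SS'
                # so it compares correctly with created_at (assumed to use the same format)
                cursor.execute("""
                    INSERT INTO model_versions (training_date, data_count, model_path)
                    VALUES (?, ?, ?)
                """, (datetime.now().isoformat(sep=' ', timespec='seconds'),
                      new_data_count,
                      f'models/versions/*_{version_name}.pkl'))
                conn.commit()
    
    def retrain_models(self):
        """
        Retrain both models on the last 90 days of data.
        """
        collector = EquipmentDataCollector(self.db_path)
        end = datetime.now()
        start = end - timedelta(days=90)
        usage_data = collector.collect_device_usage_data(
            start.strftime('%Y-%m-%d %H:%M:%S'),
            end.strftime('%Y-%m-%d %H:%M:%S')
        )
        
        # Preprocess
        preprocessor = DataPreprocessor()
        cleaned_data = preprocessor.clean_usage_data(usage_data)
        features = preprocessor.create_features(cleaned_data)
        target = preprocessor.create_target_variable(cleaned_data)
        
        # Retrain
        X_train, X_test, y_train, y_test, y_train_dur, y_test_dur = \
            self.predictor.prepare_training_data(features, target, cleaned_data['actual_duration'])
        
        self.predictor.train_conflict_model(X_train, y_train)
        self.predictor.train_duration_model(X_train, y_train_dur)
        
        # Evaluate the retrained models on the held-out split
        results = self.predictor.evaluate_models(X_test, y_test, y_test_dur)
        
        print("Retraining completed. New performance:")
        print(f"F1 score (weighted avg): {results['conflict_report']['weighted avg']['f1-score']:.3f}")
        print(f"RMSE: {results['duration_rmse']:.2f} hours")
    
    def save_model_version(self, version_name=None):
        """
        Save both models under models/versions/ and return the version name.
        Note that the API in 4.2 loads a whole pickled predictor from
        models/conflict_predictor.pkl instead; keep the two conventions consistent.
        """
        if version_name is None:
            version_name = datetime.now().strftime("%Y%m%d_%H%M%S")
        
        version_dir = os.path.join('models', 'versions')
        os.makedirs(version_dir, exist_ok=True)
        
        joblib.dump(self.predictor.conflict_model,
                    os.path.join(version_dir, f'conflict_model_{version_name}.pkl'))
        joblib.dump(self.predictor.duration_model,
                    os.path.join(version_dir, f'duration_model_{version_name}.pkl'))
        
        print(f"Model version {version_name} saved.")
        return version_name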
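retrain_models overwrites the live models as soon as training finishes, even if the new ones score worse. One way to guard against that, sketched below under assumed names (ModelMetrics, should_promote), is to evaluate the current models and the retrained candidates on the same held-out data and only save and deploy the candidates if neither task regresses beyond a tolerance. The tolerances default to zero, meaning "no worse than before".

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class ModelMetrics:
    """Hypothetical summary of one evaluation run on a shared held-out set."""
    f1: float          # weighted-average F1 of the conflict model
    rmse_hours: float  # RMSE of the duration model

def should_promote(current: ModelMetrics, candidate: ModelMetrics,
                   min_f1_gain: float = 0.0, max_rmse_increase: float = 0.0) -> bool:
    """Promote the retrained models only if neither task gets worse beyond tolerance."""
    f1_ok = candidate.f1 >= current.f1 + min_f1_gain
    rmse_ok = candidate.rmse_hours <= current.rmse_hours + max_rmse_increase
    return f1_ok and rmse_ok

if __name__ == "__main__":
    live = ModelMetrics(f1=0.80, rmse_hours=1.2)
    print(should_promote(live, ModelMetrics(f1=0.82, rmse_hours=1.1)))  # True
    print(should_promote(live, ModelMetrics(f1=0.85, rmse_hours=1.5)))  # False
```

In ModelUpdater this check would sit between evaluation and save_model_version, which means training the candidates on copies rather than directly on self.predictor.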

五、实施策略与最佳实践

5.1 分阶段实施计划

第一阶段:数据收集与基线建立(1-2个月)

  • 部署数据收集系统
  • 建立数据仓库
  • 分析历史数据模式
  • 建立当前性能基线

第二阶段:模型开发与验证(2-3个月)

  • 开发预测模型
  • 在历史数据上验证
  • A/B测试(与传统排期并行)
  • 收集用户反馈

第三阶段:系统集成与试点(1-2个月)

  • 开发用户界面
  • 集成到现有系统
  • 选择1-2个实验室试点
  • 培训用户

第四阶段:全面推广与持续优化(持续)

  • Expand coverage gradually
  • Establish a model update mechanism
  • Monitor and optimize continuously

5.2 Key Success Factors

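  1. Data quality: ensure the data are accurate and complete
  2. User involvement: involve researchers in system design and feedback
  3. Transparency: explain predictions and recommendations to users
  4. Flexibility: allow users to override the system's recommendations
  5. Continuous improvement: establish a feedback loop and a model update mechanism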

5.3 Common Pitfalls and How to Avoid Them

Pitfall 1: Over-reliance on automation

  • Problem: full automation can overlook special cases
  • Solution: keep human review and override mechanisms in place
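One way to keep a human in the loop is to apply only confident predictions automatically and queue the uncertain ones for a lab manager. A minimal sketch; `Route`, `route_reservation` and the 0.2/0.8 thresholds are hypothetical, and the conflict probability is assumed to come from the conflict model's predicted probability for the "conflict" class.

```python
from enum import Enum


class Route(Enum):
    AUTO_APPROVE = "auto_approve"              # low predicted conflict risk
    HUMAN_REVIEW = "human_review"              # uncertain: a lab manager decides
    SUGGEST_RESCHEDULE = "suggest_reschedule"  # high risk: propose another slot; the user may still override


def route_reservation(conflict_probability, low=0.2, high=0.8):
    """Route a request by the model's predicted conflict probability (0 to 1)."""
    if not 0.0 <= conflict_probability <= 1.0:
        raise ValueError("probability must be in [0, 1]")
    if conflict_probability < low:
        return Route.AUTO_APPROVE
    if conflict_probability >= high:
        return Route.SUGGEST_RESCHEDULE
    return Route.HUMAN_REVIEW


for p in (0.05, 0.5, 0.9):
    print(p, route_reservation(p).value)
```

Widening the band between `low` and `high` sends more cases to people; narrowing it automates more, so the thresholds are a policy decision rather than a model property.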

Pitfall 2: Data bias

  • Problem: historical data may contain biases
  • Solution: audit the data regularly to make sure it is representative
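A simple representativeness check compares each group's share of the training records with its share of the population the model will serve, for example each lab's share of registered users. A minimal sketch; `underrepresented_groups`, the lab labels, the reference shares and the 0.5 ratio are all illustrative assumptions.

```python
from collections import Counter


def underrepresented_groups(record_groups, reference_shares, min_ratio=0.5):
    """Return groups whose share of records is below `min_ratio` x their reference share.

    record_groups: iterable with one group label per training record.
    reference_shares: {group: expected share}, e.g. share of registered users.
    Result maps each flagged group to (observed share, expected share).
    """
    counts = Counter(record_groups)
    total = sum(counts.values())
    flagged = {}
    for group, expected in reference_shares.items():
        observed = counts.get(group, 0) / total if total else 0.0
        if observed < min_ratio * expected:
            flagged[group] = (observed, expected)
    return flagged


records = ["lab_a"] * 70 + ["lab_b"] * 25 + ["lab_c"] * 5
reference = {"lab_a": 0.4, "lab_b": 0.3, "lab_c": 0.3}
print(underrepresented_groups(records, reference))  # {'lab_c': (0.05, 0.3)}
```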

Pitfall 3: Model drift

  • Problem: equipment and user behavior change over time
  • Solution: establish a mechanism for regular model updates
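Model updates can be triggered by a drift check rather than a fixed calendar alone. One widely used statistic is the Population Stability Index (PSI), which compares a feature's binned distribution at training time with its recent distribution; a PSI above roughly 0.2 is often read as a significant shift, but that cut-off is a rule of thumb, not a standard. A minimal sketch using equal-width bins over the reference range (an illustrative choice); the sample values are made up.

```python
import math


def population_stability_index(expected, actual, bins=10, eps=1e-6):
    """PSI = sum((a_i - e_i) * ln(a_i / e_i)) over bins.

    `expected` is the reference sample, `actual` the recent sample. Bins are
    equal-width over the reference range; out-of-range values are clipped into
    the first or last bin. `eps` avoids log(0) for empty bins.
    """
    lo, hi = min(expected), max(expected)
    width = (hi - lo) / bins or 1.0  # guard against a constant reference sample

    def shares(values):
        counts = [0] * bins
        for v in values:
            idx = int((v - lo) / width)
            counts[min(max(idx, 0), bins - 1)] += 1
        return [max(c / len(values), eps) for c in counts]

    e, a = shares(expected), shares(actual)
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))


baseline = [1.0, 1.5, 2.0, 2.0, 2.5, 3.0, 3.0, 3.5, 4.0, 4.5]  # e.g. booked hours, training period
recent = [3.0, 3.5, 4.0, 4.0, 4.5, 4.5, 5.0, 5.0, 5.5, 6.0]    # e.g. booked hours, last month
psi = population_stability_index(baseline, recent, bins=5)
print(f"PSI = {psi:.3f}", "-> retrain" if psi > 0.2 else "-> stable")
```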

Pitfall 4: User resistance

  • Problem: researchers may not trust AI recommendations
  • Solution: provide transparent explanations and showcase successful cases
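A transparent explanation can be as simple as naming the factors that pushed a prediction up or down. For a linear scoring model such as logistic regression this decomposition is exact: each feature contributes its weight times its deviation from a reference value (in log-odds units). The sketch assumes such a linear model; the feature names, weights and reference values are hypothetical, and tree ensembles would need a dedicated attribution method such as SHAP instead.

```python
def top_contributions(weights, features, reference, k=2):
    """Rank features by |weight * (value - reference)| for a linear score.

    weights, features, reference: dicts keyed by feature name.
    Returns [(name, contribution), ...], largest absolute effect first.
    """
    contribs = {
        name: weights[name] * (features[name] - reference[name])
        for name in weights
    }
    return sorted(contribs.items(), key=lambda kv: abs(kv[1]), reverse=True)[:k]


def explain(weights, features, reference, k=2):
    """Render the top contributions as a short human-readable sentence."""
    parts = []
    for name, c in top_contributions(weights, features, reference, k):
        direction = "raises" if c > 0 else "lowers"
        parts.append(f"{name} {direction} the conflict risk ({c:+.2f})")
    return "; ".join(parts)


# Hypothetical model weights, reference (typical) values, and one incoming request
weights = {"bookings_same_day": 0.8, "requested_hours": 0.3, "user_overrun_rate": 1.5}
reference = {"bookings_same_day": 2.0, "requested_hours": 3.0, "user_overrun_rate": 0.1}
request = {"bookings_same_day": 5.0, "requested_hours": 2.0, "user_overrun_rate": 0.4}
print(explain(weights, request, reference))
```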

VI. Evaluation and Continuous Improvement

6.1 Monitoring Key Metrics

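import sqlite3
from datetime import datetime, timedelta

import pandas as pd


class PerformanceMonitor:
    def __init__(self, db_path):
        self.db_path = db_path
    
    def calculate_kpis(self, start_date, end_date):
        """
        Compute key performance indicators for reservations dated between
        start_date and end_date (inclusive, 'YYYY-MM-DD' strings).
        """
        # Hours in the reporting window: every calendar day counts as 24 available
        # hours, including days on which a device was not booked at all
        period_hours = ((pd.Timestamp(end_date) - pd.Timestamp(start_date)).days + 1) * 24
        
        conn = sqlite3.connect(self.db_path)
        try:
            # Equipment utilization: hours actually used / hours in the window.
            # Durations come from the actual start/end timestamps; devices with no
            # completed reservations in the window do not appear in the result.
            utilization_query = """
            SELECT 
                device_id,
                SUM((julianday(actual_end_time) - julianday(actual_start_time)) * 24) / ?
                    AS utilization_rate
            FROM equipment_reservations
            WHERE DATE(reservation_time) BETWEEN ? AND ?
              AND actual_start_time IS NOT NULL
              AND actual_end_time IS NOT NULL
            GROUP BY device_id
            """
            utilization_df = pd.read_sql_query(utilization_query, conn,
                                               params=[period_hours, start_date, end_date])
            
            # Conflict rate: share of reservations flagged as conflicting
            conflict_query = """
            SELECT 
                AVG(CASE WHEN conflict = 1 THEN 1.0 ELSE 0.0 END) AS conflict_rate
            FROM equipment_reservations
            WHERE DATE(reservation_time) BETWEEN ? AND ?
            """
            conflict_rate = pd.read_sql_query(conflict_query, conn,
                                              params=[start_date, end_date]).iloc[0, 0]
            
            # Average delay in minutes (only reservations with a recorded delay)
            delay_query = """
            SELECT 
                AVG(delay_minutes) AS avg_delay
            FROM equipment_reservations
            WHERE DATE(reservation_time) BETWEEN ? AND ?
              AND delay_minutes IS NOT NULL
            """
            avg_delay = pd.read_sql_query(delay_query, conn,
                                          params=[start_date, end_date]).iloc[0, 0]
            
            # User-satisfaction proxies: per-user cancellation and rescheduling rates
            satisfaction_query = """
            SELECT 
                user_id,
                AVG(CASE WHEN status = 'cancelled' THEN 1.0 ELSE 0.0 END) AS cancellation_rate,
                AVG(CASE WHEN status = 'rescheduled' THEN 1.0 ELSE 0.0 END) AS reschedule_rate
            FROM equipment_reservations
            WHERE DATE(reservation_time) BETWEEN ? AND ?
            GROUP BY user_id
            """
            satisfaction_df = pd.read_sql_query(satisfaction_query, conn,
                                                params=[start_date, end_date])
        finally:
            conn.close()
        
        return {
            'avg_utilization_rate': utilization_df['utilization_rate'].mean(),
            'conflict_rate': conflict_rate,
            'avg_delay_minutes': avg_delay,
            'avg_cancellation_rate': satisfaction_df['cancellation_rate'].mean(),
            'avg_reschedule_rate': satisfaction_df['reschedule_rate'].mean()
        }
    
    def generate_report(self, period='monthly'):
        """
        Generate a performance report for the most recent week or month.
        """
        period_days = {'weekly': 7, 'monthly': 30}
        if period not in period_days:
            raise ValueError(f"Unsupported period: {period!r}")
        
        end_date = datetime.now()
        start_date = end_date - timedelta(days=period_days[period])
        
        kpis = self.calculate_kpis(start_date.strftime('%Y-%m-%d'), 
                                   end_date.strftime('%Y-%m-%d'))
        
        def fmt(value, spec):
            # KPIs are NaN/None when the window contains no matching rows
            return 'n/a' if pd.isna(value) else format(value, spec)
        
        print(f"=== Performance Report ({start_date:%Y-%m-%d} to {end_date:%Y-%m-%d}) ===")
        print(f"Average equipment utilization: {fmt(kpis['avg_utilization_rate'], '.2%')}")
        print(f"Conflict rate: {fmt(kpis['conflict_rate'], '.2%')}")
        print(f"Average delay: {fmt(kpis['avg_delay_minutes'], '.2f')} minutes")
        print(f"Average cancellation rate: {fmt(kpis['avg_cancellation_rate'], '.2%')}")
        print(f"Average rescheduling rate: {fmt(kpis['avg_reschedule_rate'], '.2%')}")
        
        # Trend analysis
        self.analyze_trends()
        
        return kpis
    
    def analyze_trends(self):
        """
        Trend analysis (placeholder)
        """
        # More sophisticated trend analysis can go here,
        # e.g. time-series methods that detect shifts in the KPIs over time
        pass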
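`analyze_trends` is left as a stub. As one simple option, the slope of an ordinary least-squares line through a KPI's weekly values indicates whether it is improving or deteriorating. The sketch below is a standalone helper, not the author's implementation; it assumes the weekly values have already been computed (for instance by calling `calculate_kpis` once per week), and the sample conflict rates are made up.

```python
def linear_trend(values):
    """Ordinary least-squares slope of `values` against their index (0, 1, 2, ...).

    Returns the average change per period; needs at least two points.
    """
    n = len(values)
    if n < 2:
        raise ValueError("need at least two points")
    mean_x = (n - 1) / 2
    mean_y = sum(values) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in enumerate(values))
    var = sum((x - mean_x) ** 2 for x in range(n))
    return cov / var


weekly_conflict_rate = [0.12, 0.11, 0.11, 0.09, 0.08, 0.08]
slope = linear_trend(weekly_conflict_rate)
print(f"conflict rate changes by {slope:+.4f} per week")
```

A slope alone says nothing about noise; in practice it is worth comparing it with the week-to-week variation before acting on it.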

6.2 A/B Testing Framework

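import sqlite3
from datetime import datetime, timedelta

import pandas as pd


class ABTestFramework:
    def __init__(self, db_path):
        self.db_path = db_path
    
    def run_ab_test(self, test_name, duration_days=30):
        """
        Start an A/B test: assign users to groups and return the test window.
        Call analyze_ab_results() with this window once the end date has passed.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            # Split users 50/50: group A uses the prediction system, group B the
            # conventional process. The parity of user_id is a simple deterministic
            # split; it is only unbiased if IDs are unrelated to usage patterns.
            # "group" is a reserved SQL keyword, so the column is named group_type.
            conn.execute("""
                UPDATE users 
                SET group_type = CASE 
                    WHEN user_id % 2 = 0 THEN 'A' 
                    ELSE 'B' 
                END
            """)
            conn.commit()
        finally:
            conn.close()
        
        # During the test window, record each group's performance separately in
        # experiment_results (A: prediction system, B: conventional scheduling)
        start_date = datetime.now()
        end_date = start_date + timedelta(days=duration_days)
        return {'test_name': test_name, 'start_date': start_date, 'end_date': end_date}
    
    def analyze_ab_results(self, test_name, start_date, end_date):
        """
        Compare group-level metrics for a finished A/B test.
        """
        query = """
        SELECT 
            group_type,
            AVG(conflict_rate) AS avg_conflict_rate,
            AVG(delay_minutes) AS avg_delay,
            AVG(utilization_rate) AS avg_utilization,
            COUNT(*) AS sample_size
        FROM experiment_results
        WHERE test_name = ? AND date BETWEEN ? AND ?
        GROUP BY group_type
        """
        
        conn = sqlite3.connect(self.db_path)
        try:
            # Pass dates as ISO strings: the default sqlite3 datetime adapter is
            # deprecated as of Python 3.12
            df = pd.read_sql_query(query, conn, params=[
                test_name,
                pd.Timestamp(start_date).strftime('%Y-%m-%d'),
                pd.Timestamp(end_date).strftime('%Y-%m-%d'),
            ])
        finally:
            conn.close()
        
        print(f"A/B Test Results for {test_name}:")
        print(df.to_string(index=False))
        
        # Relative improvement of A (prediction system) over B (control).
        # These are point estimates only: a significance test needs
        # per-reservation data rather than group averages.
        if set(df['group_type']) == {'A', 'B'}:
            group_a = df[df['group_type'] == 'A'].iloc[0]
            group_b = df[df['group_type'] == 'B'].iloc[0]
            
            def relative_change(new, baseline):
                return (new - baseline) / baseline if baseline else float('nan')
            
            # Positive values mean A is better: fewer conflicts, shorter delays,
            # higher utilization (all relative to the control group B)
            improvement = {
                'conflict_rate': -relative_change(group_a['avg_conflict_rate'], group_b['avg_conflict_rate']),
                'delay': -relative_change(group_a['avg_delay'], group_b['avg_delay']),
                'utilization': relative_change(group_a['avg_utilization'], group_b['avg_utilization'])
            }
            
            print("\nImprovement (A vs. B):")
            for metric, value in improvement.items():
                print(f"{metric}: {value:.2%}")
        
        return df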
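The group comparison reports relative differences but no significance test, and a test needs per-reservation counts rather than group averages. For a rate such as the conflict rate, the pooled two-proportion z-test is a standard choice. The sketch below uses only the standard library (the two-sided p-value via `math.erfc`); `two_proportion_z_test` is a hypothetical helper and the counts in the usage line are made up for illustration.

```python
import math


def two_proportion_z_test(x_a, n_a, x_b, n_b):
    """Two-sided z-test for H0: p_a == p_b using the pooled proportion.

    x_*: reservations with a conflict, n_*: total reservations per group.
    Returns (z, p_value). The normal approximation needs reasonably large counts.
    """
    p_a, p_b = x_a / n_a, x_b / n_b
    pooled = (x_a + x_b) / (n_a + n_b)
    se = math.sqrt(pooled * (1 - pooled) * (1 / n_a + 1 / n_b))
    if se == 0:
        raise ValueError("standard error is zero; the test is undefined")
    z = (p_a - p_b) / se
    # Two-sided p-value 2 * (1 - Phi(|z|)), which equals erfc(|z| / sqrt(2))
    p_value = math.erfc(abs(z) / math.sqrt(2))
    return z, p_value


# Group A (prediction system): 18 conflicts in 400 reservations;
# group B (conventional): 40 conflicts in 410 reservations
z, p = two_proportion_z_test(18, 400, 40, 410)
print(f"z = {z:.2f}, p = {p:.4f}")
```

Because users in both groups book the same devices, their outcomes are not fully independent, so a p-value from this test is best read as indicative rather than exact.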

VII. Conclusion and Outlook

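Optimizing research equipment scheduling through data-driven prediction is a complex but highly valuable undertaking. This article has examined it from several angles: the data foundation, model construction, intelligent optimization, and system integration. The key points are: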

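  1. Data is the core: high-quality, multidimensional data is the foundation of an effective prediction model
  2. Match the model to the problem: choose prediction models and optimization algorithms that fit the specific requirements
  3. System integration is key: embed prediction capabilities seamlessly into existing workflows
  4. Continuous improvement sustains the gains: establish mechanisms for monitoring, evaluation, and model updates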

As the technology matures, several directions look promising:

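  • More accurate predictions: incorporating more external data sources, such as weather and academic conference schedules
  • Smarter optimization: using reinforcement learning for dynamic scheduling
  • More natural interaction: managing schedules through conversational AI
  • Broader applicability: extending the approach to other research resource management scenarios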

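With sound data analysis and intelligent algorithms, research institutions can substantially improve equipment utilization, reduce wasted resources, and ultimately speed up scientific discovery. This is not only a technical advance but also a shift in how research resources are managed.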