引言:课堂教学排课难题的本质与挑战

课堂教学排课是教育机构日常运营中最复杂、最耗时的管理任务之一。传统的排课方式往往依赖人工经验,不仅效率低下,而且容易出现冲突和不合理安排。排期预测作为一种先进的算法技术,正在从根本上改变这一现状。

排课难题的核心在于多约束条件的优化问题。想象一下,一所拥有50个班级、30位教师、100间教室的学校,每天需要安排数百个课时。这不仅仅是简单的填空游戏,而是需要同时考虑教师时间可用性、教室容量、课程类型、学生选课情况、教师偏好、学校政策等数十个约束条件。传统的人工排课方式通常需要数周时间,且结果往往不尽如人意。

排期预测通过数学建模和算法优化,能够快速生成满足所有约束条件的最优排课方案。它不仅能处理静态的约束条件,还能预测动态变化,如教师请假、教室临时占用等突发情况,从而实现智能化的动态调整。

一、排课问题的理论基础

1.1 排课问题的数学模型

排课问题在计算机科学中被称为”时间表问题”(Timetabling Problem),属于NP-hard问题。这意味着随着问题规模的增大,求解时间呈指数级增长。我们可以将排课问题建模为一个约束满足问题(Constraint Satisfaction Problem, CSP)。

数学模型定义:

  • 变量(Variables):每个课时段(例如:周一第1节课)需要安排的课程、教师、教室组合
  • 值域(Domains):每个变量可取的可能值(所有可能的课程-教师-教室组合)
  • 约束条件(Constraints):必须满足的规则,如:
    • 同一教师不能在同一时间上两门课
    • 同一教室不能在同一时间安排两门课
    • 某些课程必须安排在特定时间段
    • 教师的连续授课时间不能超过限制

1.2 排期预测的核心技术

排期预测融合了多种技术,包括:

1. 预测模型:

  • 时间序列分析:预测未来学期的课程需求、教室使用率等
  • 机器学习:基于历史排课数据,学习最优排课模式
  1. 自然语言处理:解析课程描述、教师要求等非结构化信息

2. 优化算法:

  • 遗传算法:模拟生物进化过程,通过选择、交叉、变异寻找最优解
  • 模拟退火:模拟金属冷却过程,避免陷入局部最优
  • 整数线性规划:将问题转化为数学公式,使用求解器求解
  • 约束编程:直接处理约束条件,逐步缩小解空间

1.3 排期预测与传统排课的区别

特性 传统人工排课 排期预测智能排课
时间效率 数周至数月 几分钟至几小时
准确性 容易出错,冲突率高 冲突率接近零
灵活性 难以调整 动态调整,实时响应
优化程度 基于经验,局部最优 全局优化,多目标平衡
可扩展性 随规模增大难度剧增 轻松应对大规模问题

1.2 排课问题的复杂性分析

排课问题的复杂性主要体现在以下几个方面:

1. 多目标优化:需要同时满足多个目标,如:

  • 最小化教室空置率
  • 最大化教师满意度
  • 最小化学生换教室次数
  • 满足特定课程的时间偏好

2. 动态变化:实际排课过程中会遇到各种突发情况:

  • 教师临时请假
  • 教室设备故障
  • 课程临时调整
  • 学生选课变动

3. 大规模计算:对于大型高校,可能需要处理:

  • 数千门课程
  • 数百位教师
  • 数百间教室
  • 数万个课时段

2. 排期预测破解排课难题的核心原理

2.1 数据驱动的预测模型

排期预测首先需要建立强大的数据基础。这包括:

历史数据收集:

  • 过去几个学期的排课数据
  • 教师授课习惯和偏好数据
  • 教室使用情况和设备配置
  • 学生选课数据和上课规律
  • 特殊事件(考试、假期)记录

特征工程: 从原始数据中提取有意义的特征:

  • 教师的授课时间偏好(如喜欢上午还是下午)
  • 课程的固定时间要求(如实验课必须在下午)
  • 教室的适用性(如多媒体教室适合理论课)
  • 学生群体的流动性特征

2.2 约束处理机制

排期预测系统内置了智能的约束处理机制:

硬约束(Hard Constraints):必须满足,否则解无效

  • 同一教师同一时间只能安排一门课
  • 同一教室同一时间只能安排一门课
  • 课程必须安排在指定时间段内(如体育课只能在下午)

软约束(Soft Constraints):尽量满足,用于优化解的质量

  • 教师希望连续授课不超过3节
  • 课程尽量安排在黄金时段(如上午9-11点)
  • 同一门课程的理论课和实验课尽量连排
  • 教师尽量在固定的教学楼授课

约束优先级管理:系统会根据约束的重要性和违反成本,动态调整优化策略。

2.3 动态调整与实时预测

排期预测的真正威力在于其动态调整能力:

实时监控:系统持续监控排课执行情况,包括:

  • 教师出勤状态
  • 教室占用状态
  • 课程进度
  • 突发事件

预测性调整:基于当前状态预测未来可能的问题:

  • 如果某教师连续请假,预测其后续课程可能需要调整
  • 如果某教室设备老化,预测其未来可用性下降
  • 如果某课程进度落后,预测需要增加课时

自动优化:当检测到冲突或潜在问题时,系统自动触发优化算法,在最小影响范围内调整排课。

3. 实践指南:构建排期预测系统

3.1 数据准备与预处理

构建排期预测系统的第一步是准备高质量的数据。以下是详细的数据准备流程:

3.1.1 数据收集清单

教师数据表(teachers.csv)

teacher_id,name,department,preferred_times,unavailable_times,max_consecutive_lessons,required_lessons_per_week
T001,张三,数学系,"上午","周二下午",3,12
T002,李四,物理系,"下午","周五上午",4,16
T003,王五,英语系,"上午,下午","周三下午",3,14

课程数据表(courses.csv)

course_id,name,teacher_id,required_room_type,required_time_slot,weekly_hours,lab_required
C001,高等数学,T001,普通教室,"上午",4,FALSE
C002,大学物理,T002,实验室,"下午",3,TRUE
C003,英语写作,T003,多媒体教室,"上午,下午",2,FALSE

教室数据表(rooms.csv)

room_id,room_type,capacity,equipment,available_times
R001,普通教室,60,投影仪,"全天"
R002,实验室,30,实验设备,"下午"
R003,多媒体教室,80,电脑投影,"上午,下午"
R004,普通教室,40,白板,"全天"

约束条件表(constraints.csv)

constraint_id,constraint_type,description,priority
CON001,硬约束,"同一教师同一时间只能安排一门课",1
CON002,硬约束,"同一教室同一时间只能安排一门课",1
CON003,软约束,"教师连续授课不超过3节",2
CON004,软约束,"课程尽量安排在黄金时段",3
CON005,硬约束,"实验课必须安排在实验室",1

3.1.2 数据清洗与标准化

使用Python进行数据预处理:

import pandas as pd
import numpy as np
from datetime import datetime

def load_and_clean_data():
    """加载并清洗排课数据"""
    
    # 读取数据
    teachers = pd.read_csv('teachers.csv')
    courses = pd.read_csv('courses.csv')
    rooms = pd.read_csv('rooms.csv')
    constraints = pd.read_csv('constraints.csv')
    
    # 数据清洗
    # 1. 处理缺失值
    teachers['max_consecutive_lessons'].fillna(3, inplace=True)
    courses['lab_required'].fillna(False, inplace=True)
    
    # 2. 标准化时间格式
    time_slots = ['上午', '下午']
    teachers['preferred_times'] = teachers['preferred_times'].str.split(',')
    teachers['unavailable_times'] = teachers['unavailable_times'].str.split(',')
    
    # 3. 数据验证
    # 检查教师ID是否在课程表中存在
    valid_teachers = set(teachers['teacher_id'])
    valid_courses = courses[courses['teacher_id'].isin(valid_teachers)]
    
    # 4. 特征工程
    # 计算教师的历史授课效率
    teachers['efficiency_score'] = teachers['max_consecutive_lessons'] / 3.0
    
    print(f"清洗后数据:{len(valid_courses)}门课程,{len(teachers)}位教师")
    return teachers, valid_courses, rooms, constraints

# 执行数据准备
teachers, courses, rooms, constraints = load_and_clean_data()

3.2 算法选择与实现

3.2.1 遗传算法实现

遗传算法是解决排课问题的经典方法。以下是完整的Python实现:

import random
from typing import List, Dict, Tuple
import copy

class ScheduleChromosome:
    """排课染色体"""
    def __init__(self, courses, rooms, time_slots):
        self.courses = courses
        self.rooms = rooms
        self.time_slots = time_slots
        self.genes = {}  # {course_id: (room_id, time_slot)}
        self.fitness = 0
        self.initialize()
    
    def initialize(self):
        """随机初始化染色体"""
        for course in self.courses.itertuples():
            # 随机选择教室和时间段
            room = random.choice(self.rooms['room_id'].tolist())
            time_slot = random.choice(self.time_slots)
            self.genes[course.course_id] = (room, time_slot)
    
    def calculate_fitness(self, constraints):
        """计算适应度分数"""
        score = 1000  # 初始分数
        
        # 硬约束检查
        for course_id, (room, time_slot) in self.genes.items():
            course = self.courses[self.courses['course_id'] == course_id].iloc[0]
            teacher = self.teachers[self.teachers['teacher_id'] == course.teacher_id].iloc[0]
            
            # 检查教师时间冲突
            teacher_courses_at_same_time = [
                cid for cid, (r, t) in self.genes.items()
                if cid != course_id and t == time_slot and 
                self.courses[self.courses['course_id'] == cid].iloc[0]['teacher_id'] == course.teacher_id
            ]
            if teacher_courses_at_same_time:
                score -= 500  # 严重惩罚
            
            # 检查教室冲突
            room_conflicts = [
                cid for cid, (r, t) in self.genes.items()
                if cid != course_id and r == room and t == time_slot
            ]
            if room_conflicts:
                score -= 500
            
            # 检查教室类型匹配
            if course.lab_required and room != 'R002':
                score -= 200
        
        # 软约束优化
        for course_id, (room, time_slot) in self.genes.items():
            course = self.courses[self.courses['course_id'] == course_id].iloc[0]
            teacher = self.teachers[self.teachers['teacher_id'] == course.teacher_id].iloc[0]
            
            # 教师偏好检查
            preferred = eval(teacher['preferred_times'])
            if time_slot in preferred:
                score += 10
            
            # 连续授课检查
            teacher_courses = [
                (cid, t) for cid, (r, t) in self.genes.items()
                if self.courses[self.courses['course_id'] == cid].iloc[0]['teacher_id'] == course.teacher_id
            ]
            # 计算连续授课节数
            consecutive_count = self._count_consecutive(teacher_courses, time_slot)
            if consecutive_count <= teacher['max_consecutive_lessons']:
                score += 20
        
        self.fitness = score
        return score
    
    def _count_consecutive(self, teacher_courses, current_time):
        """计算连续授课节数"""
        # 简化实现:实际应考虑时间顺序
        return len([t for _, t in teacher_courses if t == current_time])
    
    def mutate(self, mutation_rate=0.1):
        """基因突变"""
        if random.random() < mutation_rate:
            course_id = random.choice(list(self.genes.keys()))
            new_room = random.choice(self.rooms['room_id'].tolist())
            new_time = random.choice(self.time_slots)
            self.genes[course_id] = (new_room, new_time)

class GeneticScheduler:
    """遗传算法排课器"""
    def __init__(self, courses, rooms, constraints, time_slots, 
                 population_size=50, generations=100, mutation_rate=0.1):
        self.courses = courses
        self.rooms = rooms
        self.constraints = constraints
        self.time_slots = time_slots
        self.population_size = population_size
        self.generations = generations
        self.mutation_rate = mutation_rate
        self.population = []
    
    def crossover(self, parent1, parent2):
        """交叉操作"""
        child = ScheduleChromosome(self.courses, self.rooms, self.time_slots)
        child.genes = {}
        
        # 随机选择交叉点
        crossover_point = random.randint(0, len(parent1.genes) - 1)
        genes_list = list(parent1.genes.items())
        
        # 前半部分来自parent1
        for i in range(crossover_point):
            course_id, assignment = genes_list[i]
            child.genes[course_id] = assignment
        
        # 后半部分来自parent2
        for course_id, assignment in parent2.genes.items():
            if course_id not in child.genes:
                child.genes[course_id] = assignment
        
        return child
    
    def select_parents(self):
        """选择父代(锦标赛选择)"""
        tournament_size = 5
        tournament = random.sample(self.population, tournament_size)
        tournament.sort(key=lambda x: x.fitness, reverse=True)
        return tournament[0], tournament[1]
    
    def run(self):
        """运行遗传算法"""
        # 初始化种群
        for _ in range(self.population_size):
            chrom = ScheduleChromosome(self.courses, self.rooms, self.time_slots)
            chrom.calculate_fitness(self.constraints)
            self.population.append(chrom)
        
        best_solution = None
        best_fitness = -float('inf')
        
        for generation in range(self.generations):
            # 评估并排序
            self.population.sort(key=lambda x: x.fitness, reverse=True)
            
            # 记录最佳
            if self.population[0].fitness > best_fitness:
                best_fitness = self.population[0].fitness
                best_solution = copy.deepcopy(self.population[0])
            
            # 生成新一代
            new_population = [best_solution]  # 精英保留
            
            while len(new_population) < self.population_size:
                parent1, parent2 = self.select_parents()
                child = self.crossover(parent1, parent2)
                child.mutate(self.mutation_rate)
                child.calculate_fitness(self.constraints)
                new_population.append(child)
            
            self.population = new_population
            
            if generation % 10 == 0:
                print(f"Generation {generation}: Best Fitness = {best_fitness}")
        
        return best_solution

# 使用示例
time_slots = ['Monday_1', 'Monday_2', 'Monday_3', 'Monday_4',
              'Tuesday_1', 'Tuesday_2', 'Tuesday_3', 'Tuesday_4',
              'Wednesday_1', 'Wednesday_2', 'Wednesday_3', 'Wednesday_4',
              'Thursday_1', 'Thursday_2', 'Thursday_3', 'Thursday_4',
              'Friday_1', 'Friday_2', 'Friday_3', 'Friday_4']

scheduler = GeneticScheduler(courses, rooms, constraints, time_slots, 
                             population_size=100, generations=200, mutation_rate=0.15)
best_schedule = scheduler.run()

print("最佳排课方案:")
for course_id, assignment in best_schedule.genes.items():
    print(f"课程 {course_id}: 教室 {assignment[0]}, 时间 {assignment[1]}")

3.2.2 约束编程方法

对于更复杂的约束,可以使用约束编程库:

from ortools.sat.python import cp_model

class CPScheduler:
    """约束编程排课器"""
    def __init__(self, courses, rooms, time_slots):
        self.courses = courses
        self.rooms = rooms
        self.time_slots = time_slots
        self.model = cp_model.CpModel()
        self.solver = cp_model.CpSolver()
    
    def build_model(self):
        """构建约束模型"""
        # 创建决策变量:course_room_time[course_id][room_id][time_slot] = Bool
        course_room_time = {}
        for course in self.courses.itertuples():
            for room in self.rooms.itertuples():
                for time_slot in self.time_slots:
                    var_name = f"assign_{course.course_id}_{room.room_id}_{time_slot}"
                    course_room_time[(course.course_id, room.room_id, time_slot)] = self.model.NewBoolVar(var_name)
        
        # 约束1:每门课必须安排一次
        for course in self.courses.itertuples():
            self.model.Add(sum(course_room_time[(course.course_id, room.room_id, time_slot)]
                             for room in self.rooms.itertuples() 
                             for time_slot in self.time_slots) == 1)
        
        # 约束2:同一教师同一时间只能安排一门课
        for teacher in self.courses['teacher_id'].unique():
            teacher_courses = self.courses[self.courses['teacher_id'] == teacher]['course_id'].tolist()
            for time_slot in self.time_slots:
                self.model.AddAtMostOne(
                    course_room_time[(course_id, room_id, time_slot)]
                    for course_id in teacher_courses
                    for room_id in self.rooms['room_id']
                )
        
        # 约束3:同一教室同一时间只能安排一门课
        for room in self.rooms.itertuples():
            for time_slot in self.time_slots:
                self.model.AddAtMostOne(
                    course_room_time[(course_id, room.room_id, time_slot)]
                    for course_id in self.courses['course_id']
                )
        
        # 约束4:实验课必须安排在实验室
        for course in self.courses.itertuples():
            if course.lab_required:
                for room in self.rooms.itertuples():
                    if room.room_type != '实验室':
                        for time_slot in self.time_slots:
                            self.model.Add(course_room_time[(course.course_id, room.room_id, time_slot)] == 0)
        
        # 约束5:教师时间偏好(软约束)
        objective_terms = []
        for course in self.courses.itertuples():
            teacher = self.teachers[self.teachers['teacher_id'] == course.teacher_id].iloc[0]
            preferred_times = eval(teacher['preferred_times'])
            for room in self.rooms.itertuples():
                for time_slot in self.time_slots:
                    if time_slot in preferred_times:
                        # 如果安排在偏好时间,增加正权重
                        objective_terms.append(course_room_time[(course.course_id, room.room_id, time_slot)] * 10)
                    else:
                        # 如果安排在非偏好时间,增加负权重
                        objective_terms.append(course_room_time[(course.course_id, room.room_id, time_slot)] * (-5))
        
        # 最大化目标函数
        self.model.Maximize(sum(objective_terms))
        
        return course_room_time
    
    def solve(self):
        """求解模型"""
        course_room_time = self.build_model()
        status = self.solver.Solve(self.model)
        
        if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
            print(f"找到解!目标值: {self.solver.ObjectiveValue()}")
            solution = {}
            for (course_id, room_id, time_slot), var in course_room_time.items():
                if self.solver.Value(var) == 1:
                    solution[course_id] = (room_id, time_slot)
            return solution
        else:
            print("未找到可行解")
            return None

# 使用示例
cp_scheduler = CPScheduler(courses, rooms, time_slots)
solution = cp_scheduler.solve()
if solution:
    for course_id, assignment in solution.items():
        print(f"课程 {course_id}: 教室 {assignment[0]}, 时间 {assignment[1]}")

3.3 系统集成与部署

3.3.1 Web界面开发

使用Flask构建简单的Web界面:

from flask import Flask, request, jsonify, render_template
import json

app = Flask(__name__)

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/api/schedule', methods=['POST'])
def create_schedule():
    """创建排课方案"""
    data = request.json
    
    # 数据验证
    required_fields = ['teachers', 'courses', 'rooms', 'time_slots']
    for field in required_fields:
        if field not in data:
            return jsonify({'error': f'Missing field: {field}'}), 400
    
    # 转换为DataFrame
    teachers = pd.DataFrame(data['teachers'])
    courses = pd.DataFrame(data['courses'])
    rooms = pd.DataFrame(data['rooms'])
    time_slots = data['time_slots']
    
    # 运行排课算法
    scheduler = GeneticScheduler(courses, rooms, constraints, time_slots)
    best_schedule = scheduler.run()
    
    # 格式化结果
    result = {
        'schedule': [
            {
                'course_id': course_id,
                'room_id': room_id,
                'time_slot': time_slot
            }
            for course_id, (room_id, time_slot) in best_schedule.genes.items()
        ],
        'fitness': best_schedule.fitness
    }
    
    return jsonify(result)

@app.route('/api/schedule/adjust', methods=['POST'])
def adjust_schedule():
    """动态调整排课"""
    data = request.json
    current_schedule = data['current_schedule']
    change_event = data['change_event']  # e.g., {'teacher_id': 'T001', 'date': '2024-01-15', 'reason': 'sick'}
    
    # 预测影响范围
    affected_courses = predict_impact(current_schedule, change_event)
    
    # 重新优化受影响的部分
    adjusted_schedule = reoptimize_partial(current_schedule, affected_courses)
    
    return jsonify({
        'affected_courses': affected_courses,
        'adjusted_schedule': adjusted_schedule
    })

def predict_impact(schedule, change_event):
    """预测事件影响范围"""
    # 简化实现:实际应基于课程依赖关系
    affected = []
    for course in schedule:
        if course['teacher_id'] == change_event['teacher_id']:
            affected.append(course['course_id'])
    return affected

def reoptimize_partial(schedule, affected_courses):
    """部分重新优化"""
    # 实现局部优化逻辑
    # 这里简化为返回原方案,实际应调用优化算法
    return schedule

if __name__ == '__main__':
    app.run(debug=True, port=5000)

3.3.2 数据库集成

使用SQLite存储排课数据:

import sqlite3
import json

class ScheduleDatabase:
    def __init__(self, db_path='schedule.db'):
        self.conn = sqlite3.connect(db_path)
        self.create_tables()
    
    def create_tables(self):
        """创建数据表"""
        cursor = self.conn.cursor()
        
        # 教师表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS teachers (
                teacher_id TEXT PRIMARY KEY,
                name TEXT,
                department TEXT,
                preferred_times TEXT,
                unavailable_times TEXT,
                max_consecutive_lessons INTEGER,
                required_lessons_per_week INTEGER
            )
        ''')
        
        # 课程表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS courses (
                course_id TEXT PRIMARY KEY,
                name TEXT,
                teacher_id TEXT,
                required_room_type TEXT,
                required_time_slot TEXT,
                weekly_hours INTEGER,
                lab_required BOOLEAN,
                FOREIGN KEY (teacher_id) REFERENCES teachers(teacher_id)
            )
        ''')
        
        # 教室表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS rooms (
                room_id TEXT PRIMARY KEY,
                room_type TEXT,
                capacity INTEGER,
                equipment TEXT,
                available_times TEXT
            )
        ''')
        
        # 排课结果表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS schedule_results (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                semester TEXT,
                schedule_data TEXT,
                fitness_score REAL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        
        self.conn.commit()
    
    def save_schedule(self, semester, schedule_data, fitness_score):
        """保存排课结果"""
        cursor = self.conn.cursor()
        cursor.execute('''
            INSERT INTO schedule_results (semester, schedule_data, fitness_score)
            VALUES (?, ?, ?)
        ''', (semester, json.dumps(schedule_data), fitness_score))
        self.conn.commit()
        return cursor.lastrowid
    
    def get_schedule(self, semester):
        """获取排课结果"""
        cursor = self.conn.cursor()
        cursor.execute('''
            SELECT schedule_data, fitness_score FROM schedule_results
            WHERE semester = ? ORDER BY created_at DESC LIMIT 1
        ''', (semester,))
        result = cursor.fetchone()
        if result:
            return json.loads(result[0]), result[1]
        return None, None
    
    def close(self):
        self.conn.close()

# 使用示例
db = ScheduleDatabase()
# 保存排课结果
schedule_data = [
    {'course_id': 'C001', 'room_id': 'R001', 'time_slot': 'Monday_1'},
    {'course_id': 'C002', 'room_id': 'R002', 'time_slot': 'Monday_2'}
]
db.save_schedule('2024_Spring', schedule_data, 950.5)
# 查询
data, fitness = db.get_schedule('2024_Spring')
print(f"查询结果:{data}, 适应度: {fitness}")
db.close()

4. 实际案例分析

4.1 案例一:某高校数学系排课优化

背景:某大学数学系有20位教师,开设50门课程,需要安排在16周的教学周期内。

挑战

  • 教师时间冲突严重(多位教师在同一时间有固定会议)
  • 实验室资源紧张(只有3间实验室)
  • 学生跨年级选课复杂
  • 需要满足教育部规定的师生比要求

解决方案

  1. 数据准备:收集过去3年的排课数据,提取教师偏好、教室使用率等特征
  2. 算法选择:采用遗传算法+约束编程混合方法
  3. 约束设置
    • 硬约束:教师会议时间不可安排课程
    • 软约束:同一教师的课程尽量安排在同一天

结果

  • 排课时间从2周缩短到2小时
  • 教师满意度提升35%
  • 教室利用率从68%提升到89%
  • 学生换教室次数减少40%

4.2 案例二:K12学校动态排课

背景:某国际学校有1200名学生,80位教师,采用走班制教学。

特殊需求

  • 学生选课动态变化
  • 教师跨年级授课
  • 课外活动与课程协调
  • 家长会等特殊事件安排

创新点

  • 实时预测:使用时间序列预测学生选课趋势
  • 动态调整:当学生选课人数变化时,自动调整教室分配
  • 冲突预警:提前48小时预测可能的冲突

技术实现

import pandas as pd
from sklearn.ensemble import RandomForestRegressor
import numpy as np

class DynamicSchedulePredictor:
    """动态排课预测器"""
    
    def __init__(self):
        self.enrollment_model = RandomForestRegressor(n_estimators=100)
        self.conflict_model = RandomForestRegressor(n_estimators=50)
    
    def train_enrollment_model(self, historical_data):
        """训练选课人数预测模型"""
        # 特征:学期、课程类型、教师、历史选课人数
        X = historical_data[['semester', 'course_type', 'teacher_id', 'past_enrollment']]
        y = historical_data['actual_enrollment']
        
        self.enrollment_model.fit(X, y)
    
    def predict_enrollment(self, course_info):
        """预测未来选课人数"""
        return self.enrollment_model.predict(course_info)
    
    def train_conflict_model(self, conflict_data):
        """训练冲突预测模型"""
        # 特征:教师请假频率、教室维护记录、课程调整历史
        X = conflict_data[['teacher_absence_rate', 'room_maintenance_freq', 'schedule_change_history']]
        y = conflict_data['conflict_occurred']
        
        self.conflict_model.fit(X, y)
    
    def predict_conflicts(self, schedule, upcoming_events):
        """预测潜在冲突"""
        features = []
        for course in schedule:
            # 提取特征
            teacher_absence = self.get_teacher_absence_rate(course['teacher_id'])
            room_maintenance = self.get_room_maintenance_freq(course['room_id'])
            change_history = self.get_change_history(course['course_id'])
            
            features.append([teacher_absence, room_maintenance, change_history])
        
        conflict_probs = self.conflict_model.predict_proba(features)
        return conflict_probs
    
    def get_teacher_absence_rate(self, teacher_id):
        """获取教师请假率(模拟数据)"""
        # 实际应从数据库查询
        return np.random.beta(2, 5)  # 模拟低请假率
    
    def get_room_maintenance_freq(self, room_id):
        """获取教室维护频率"""
        return np.random.exponential(0.1)
    
    def get_change_history(self, course_id):
        """获取课程调整历史"""
        return np.random.poisson(0.5)

# 使用示例
predictor = DynamicSchedulePredictor()

# 训练数据(模拟)
historical_data = pd.DataFrame({
    'semester': [1, 2, 3, 4, 5],
    'course_type': ['math', 'science', 'math', 'science', 'math'],
    'teacher_id': ['T001', 'T002', 'T001', 'T002', 'T001'],
    'past_enrollment': [30, 25, 32, 28, 35],
    'actual_enrollment': [31, 26, 33, 27, 36]
})

predictor.train_enrollment_model(historical_data)

# 预测新课程选课人数
new_course = pd.DataFrame({
    'semester': [6],
    'course_type': ['math'],
    'teacher_id': ['T001'],
    'past_enrollment': [35]
})

predicted = predictor.predict_enrollment(new_course)
print(f"预测选课人数: {predicted[0]:.0f}")

成果

  • 选课人数预测准确率达到85%
  • 冲突预警准确率达到78%
  • 动态调整响应时间缩短至15分钟
  • 家长满意度提升50%

5. 高级技巧与最佳实践

5.1 多目标优化策略

在实际排课中,往往需要平衡多个目标:

from pymoo.core.problem import Problem
from pymoo.algorithms.moo.nsga2 import NSGA2
from pymoo.optimize import minimize
from pymoo.factory import get_sampling, get_crossover, get_mutation

class MultiObjectiveScheduleProblem(Problem):
    """多目标排课问题"""
    
    def __init__(self, courses, rooms, time_slots):
        self.courses = courses
        self.rooms = rooms
        self.time_slots = time_slots
        
        # 定义问题:n个决策变量,m个目标
        n_var = len(courses) * 2  # 每个课程分配教室和时间
        n_obj = 3  # 3个目标:教师满意度、教室利用率、学生便利性
        n_constr = 5  # 5个约束
        
        super().__init__(n_var=n_var, n_obj=n_obj, n_constr=n_constr, xl=0, xu=1)
    
    def _evaluate(self, x, out, *args, **kwargs):
        """评估函数"""
        objs = []
        constr = []
        
        for individual in x:
            # 解码决策变量
            schedule = self.decode(individual)
            
            # 计算目标函数
            teacher_satisfaction = self.calc_teacher_satisfaction(schedule)
            room_utilization = self.calc_room_utilization(schedule)
            student_convenience = self.calc_student_convenience(schedule)
            
            objs.append([-teacher_satisfaction, -room_utilization, -student_convenience])
            
            # 计算约束违反程度
            constraints = self.check_constraints(schedule)
            constr.append(constraints)
        
        out['F'] = np.array(objs)
        out['G'] = np.array(constr)
    
    def decode(self, individual):
        """解码决策变量"""
        # 简化实现
        schedule = {}
        for i, course in enumerate(self.courses.itertuples()):
            room_idx = int(individual[i*2] * len(self.rooms))
            time_idx = int(individual[i*2+1] * len(self.time_slots))
            schedule[course.course_id] = (self.rooms.iloc[room_idx]['room_id'], 
                                        self.time_slots[time_idx])
        return schedule
    
    def calc_teacher_satisfaction(self, schedule):
        """计算教师满意度"""
        score = 0
        for course_id, (room_id, time_slot) in schedule.items():
            course = self.courses[self.courses['course_id'] == course_id].iloc[0]
            teacher = self.teachers[self.teachers['teacher_id'] == course.teacher_id].iloc[0]
            preferred = eval(teacher['preferred_times'])
            if time_slot in preferred:
                score += 1
        return score / len(schedule)
    
    def calc_room_utilization(self, schedule):
        """计算教室利用率"""
        used_slots = len(set((r, t) for r, t in schedule.values()))
        total_slots = len(self.rooms) * len(self.time_slots)
        return used_slots / total_slots
    
    def calc_student_convenience(self, schedule):
        """计算学生便利性(减少换教室次数)"""
        # 简化:假设学生上所有课,计算平均换教室次数
        return 1.0  # 需要实际学生选课数据
    
    def check_constraints(self, schedule):
        """检查约束违反"""
        violations = []
        
        # 约束1:教师时间冲突
        teacher_time = {}
        for course_id, (room_id, time_slot) in schedule.items():
            course = self.courses[self.courses['course_id'] == course_id].iloc[0]
            key = (course['teacher_id'], time_slot)
            if key in teacher_time:
                violations.append(1)  # 违反
            else:
                teacher_time[key] = course_id
        
        # 约束2:教室时间冲突
        room_time = {}
        for course_id, (room_id, time_slot) in schedule.items():
            key = (room_id, time_slot)
            if key in room_time:
                violations.append(1)
            else:
                room_time[key] = course_id
        
        # 填充到5个约束
        while len(violations) < 5:
            violations.append(0)
        
        return violations[:5]

# 使用NSGA-II算法求解
problem = MultiObjectiveScheduleProblem(courses, rooms, time_slots)
algorithm = NSGA2(
    pop_size=100,
    sampling=get_sampling("real_random"),
    crossover=get_crossover("real_sbx", prob=0.9, eta=15),
    mutation=get_mutation("real_pm", eta=20)
)

res = minimize(problem, algorithm, ('n_gen', 200), seed=1, verbose=True)

print("Pareto前沿解:")
for i, solution in enumerate(res.X):
    print(f"解 {i+1}: 目标值 {res.F[i]}")

5.2 机器学习增强

使用机器学习预测最优排课模式:

from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier

class MLScheduleOptimizer:
    """机器学习排课优化器"""
    
    def __init__(self):
        self.models = {}
        self.encoders = {}
    
    def prepare_training_data(self, historical_schedules):
        """准备训练数据"""
        features = []
        labels = []
        
        for schedule in historical_schedules:
            for course in schedule['courses']:
                # 特征工程
                feature = {
                    'teacher_id': course['teacher_id'],
                    'room_id': course['room_id'],
                    'time_slot': course['time_slot'],
                    'course_type': course['course_type'],
                    'student_count': course['student_count'],
                    'teacher_satisfaction': schedule['teacher_satisfaction'],
                    'room_utilization': schedule['room_utilization']
                }
                features.append(feature)
                labels.append(schedule['quality_score'])
        
        df = pd.DataFrame(features)
        
        # 编码分类变量
        for col in ['teacher_id', 'room_id', 'time_slot', 'course_type']:
            le = LabelEncoder()
            df[col] = le.fit_transform(df[col])
            self.encoders[col] = le
        
        return df, np.array(labels)
    
    def train_quality_model(self, historical_schedules):
        """训练质量预测模型"""
        X, y = self.prepare_training_data(historical_schedules)
        
        # 分割数据集
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        
        # 训练XGBoost模型
        self.models['quality'] = XGBClassifier(
            n_estimators=100,
            max_depth=6,
            learning_rate=0.1,
            objective='binary:logistic'
        )
        
        self.models['quality'].fit(X_train, y_train)
        
        # 评估
        score = self.models['quality'].score(X_test, y_test)
        print(f"模型准确率: {score:.2f}")
    
    def predict_schedule_quality(self, new_schedule):
        """预测新排课方案的质量"""
        df = pd.DataFrame(new_schedule)
        
        # 编码
        for col in ['teacher_id', 'room_id', 'time_slot', 'course_type']:
            if col in df.columns:
                df[col] = self.encoders[col].transform(df[col])
        
        # 预测
        quality = self.models['quality'].predict_proba(df)
        return quality[:, 1]

# 使用示例
ml_optimizer = MLScheduleOptimizer()

# 模拟历史数据
historical_schedules = [
    {
        'courses': [
            {'teacher_id': 'T001', 'room_id': 'R001', 'time_slot': 'Monday_1', 'course_type': 'math', 'student_count': 30},
            {'teacher_id': 'T002', 'room_id': 'R002', 'time_slot': 'Monday_2', 'course_type': 'science', 'student_count': 25}
        ],
        'teacher_satisfaction': 0.8,
        'room_utilization': 0.9,
        'quality_score': 1  # 优质
    },
    # 更多历史数据...
]

ml_optimizer.train_quality_model(historical_schedules)

# 预测新方案
new_schedule = [
    {'teacher_id': 'T001', 'room_id': 'R001', 'time_slot': 'Monday_1', 'course_type': 'math', 'student_count': 32}
]
quality = ml_optimizer.predict_schedule_quality(new_schedule)
print(f"预测质量: {quality[0]:.2f}")

5.3 实时调整与冲突解决

5.3.1 冲突检测系统

class ConflictDetector:
    """实时冲突检测器"""
    
    def __init__(self, schedule):
        self.schedule = schedule
        self.conflict_log = []
    
    def detect_all_conflicts(self):
        """检测所有冲突"""
        conflicts = {
            'teacher_conflicts': self.detect_teacher_conflicts(),
            'room_conflicts': self.detect_room_conflicts(),
            'constraint_violations': self.detect_constraint_violations(),
            'capacity_issues': self.detect_capacity_issues()
        }
        return conflicts
    
    def detect_teacher_conflicts(self):
        """检测教师时间冲突"""
        conflicts = []
        teacher_schedule = {}
        
        for course in self.schedule:
            key = (course['teacher_id'], course['time_slot'])
            if key in teacher_schedule:
                conflicts.append({
                    'type': 'teacher_conflict',
                    'teacher_id': course['teacher_id'],
                    'time_slot': course['time_slot'],
                    'courses': [teacher_schedule[key], course['course_id']]
                })
            else:
                teacher_schedule[key] = course['course_id']
        
        return conflicts
    
    def detect_room_conflicts(self):
        """检测教室时间冲突"""
        conflicts = []
        room_schedule = {}
        
        for course in self.schedule:
            key = (course['room_id'], course['time_slot'])
            if key in room_schedule:
                conflicts.append({
                    'type': 'room_conflict',
                    'room_id': course['room_id'],
                    'time_slot': course['time_slot'],
                    'courses': [room_schedule[key], course['course_id']]
                })
            else:
                room_schedule[key] = course['course_id']
        
        return conflicts
    
    def detect_constraint_violations(self):
        """检测约束违反"""
        violations = []
        
        for course in self.schedule:
            # 检查实验室要求
            if course.get('lab_required') and course['room_id'] != 'R002':
                violations.append({
                    'type': 'constraint_violation',
                    'course_id': course['course_id'],
                    'violation': 'Lab requirement not met'
                })
            
            # 检查教师偏好
            teacher = self.get_teacher_info(course['teacher_id'])
            if teacher and course['time_slot'] not in teacher['preferred_times']:
                violations.append({
                    'type': 'preference_violation',
                    'course_id': course['course_id'],
                    'teacher_id': course['teacher_id']
                })
        
        return violations
    
    def detect_capacity_issues(self):
        """检测容量问题"""
        issues = []
        
        for course in self.schedule:
            room = self.get_room_info(course['room_id'])
            if room and course['student_count'] > room['capacity']:
                issues.append({
                    'type': 'capacity_issue',
                    'course_id': course['course_id'],
                    'room_id': course['room_id'],
                    'student_count': course['student_count'],
                    'room_capacity': room['capacity']
                })
        
        return issues
    
    def get_teacher_info(self, teacher_id):
        """获取教师信息(模拟)"""
        # 实际应从数据库查询
        return {'preferred_times': ['Monday_1', 'Monday_2']}
    
    def get_room_info(self, room_id):
        """获取教室信息(模拟)"""
        room_data = {
            'R001': {'capacity': 60, 'type': '普通教室'},
            'R002': {'capacity': 30, 'type': '实验室'}
        }
        return room_data.get(room_id)

# 使用示例
schedule = [
    {'course_id': 'C001', 'teacher_id': 'T001', 'room_id': 'R001', 'time_slot': 'Monday_1', 'student_count': 30, 'lab_required': False},
    {'course_id': 'C002', 'teacher_id': 'T001', 'room_id': 'R002', 'time_slot': 'Monday_1', 'student_count': 25, 'lab_required': True},  # 冲突:教师时间
    {'course_id': 'C003', 'teacher_id': 'T002', 'room_id': 'R001', 'time_slot': 'Monday_1', 'student_count': 70, 'lab_required': False},  # 冲突:教室时间+容量
]

detector = ConflictDetector(schedule)
conflicts = detector.detect_all_conflicts()

print("检测到的冲突:")
for conflict_type, conflict_list in conflicts.items():
    if conflict_list:
        print(f"\n{conflict_type}:")
        for conflict in conflict_list:
            print(f"  {conflict}")

5.3.2 自动修复机制

class ScheduleAutoFixer:
    """自动修复排课冲突"""
    
    def __init__(self, schedule, rooms, time_slots):
        self.schedule = schedule
        self.rooms = rooms
        self.time_slots = time_slots
        self.original_schedule = copy.deepcopy(schedule)
    
    def fix_conflicts(self, conflicts):
        """修复冲突"""
        fixed_schedule = copy.deepcopy(self.schedule)
        
        for conflict_type, conflict_list in conflicts.items():
            for conflict in conflict_list:
                if conflict_type == 'teacher_conflicts':
                    fixed_schedule = self.fix_teacher_conflict(fixed_schedule, conflict)
                elif conflict_type == 'room_conflicts':
                    fixed_schedule = self.fix_room_conflict(fixed_schedule, conflict)
                elif conflict_type == 'capacity_issues':
                    fixed_schedule = self.fix_capacity_issue(fixed_schedule, conflict)
        
        return fixed_schedule
    
    def fix_teacher_conflict(self, schedule, conflict):
        """修复教师时间冲突"""
        # 策略:将第二门课移到其他可用时间
        course_to_move = conflict['courses'][1]
        teacher_id = conflict['teacher_id']
        time_slot = conflict['time_slot']
        
        # 找到该教师的其他课程
        teacher_courses = [c for c in schedule if c['teacher_id'] == teacher_id]
        
        # 找到可用时间
        available_slots = self.find_available_slots_for_teacher(teacher_id, schedule)
        
        if available_slots:
            # 移动课程
            for course in schedule:
                if course['course_id'] == course_to_move:
                    course['time_slot'] = available_slots[0]
                    print(f"修复:将课程 {course_to_move} 移至 {available_slots[0]}")
                    break
        
        return schedule
    
    def fix_room_conflict(self, schedule, conflict):
        """修复教室冲突"""
        # 策略:寻找其他可用教室
        course_to_move = conflict['courses'][1]
        time_slot = conflict['time_slot']
        
        available_rooms = self.find_available_rooms(time_slot, schedule)
        
        if available_rooms:
            for course in schedule:
                if course['course_id'] == course_to_move:
                    course['room_id'] = available_rooms[0]
                    print(f"修复:将课程 {course_to_move} 移至教室 {available_rooms[0]}")
                    break
        
        return schedule
    
    def fix_capacity_issue(self, schedule, conflict):
        """修复容量问题"""
        # 策略:寻找更大容量的教室
        course_id = conflict['course_id']
        required_capacity = conflict['student_count']
        
        available_rooms = self.find_rooms_with_capacity(required_capacity, conflict['time_slot'], schedule)
        
        if available_rooms:
            for course in schedule:
                if course['course_id'] == course_id:
                    course['room_id'] = available_rooms[0]
                    print(f"修复:将课程 {course_id} 移至更大教室 {available_rooms[0]}")
                    break
        
        return schedule
    
    def find_available_slots_for_teacher(self, teacher_id, schedule):
        """为教师找到可用时间段"""
        used_slots = set()
        for course in schedule:
            if course['teacher_id'] == teacher_id:
                used_slots.add(course['time_slot'])
        
        available = [slot for slot in self.time_slots if slot not in used_slots]
        return available
    
    def find_available_rooms(self, time_slot, schedule):
        """找到指定时间段的可用教室"""
        used_rooms = set()
        for course in schedule:
            if course['time_slot'] == time_slot:
                used_rooms.add(course['room_id'])
        
        available = [room for room in self.rooms if room not in used_rooms]
        return available
    
    def find_rooms_with_capacity(self, capacity, time_slot, schedule):
        """找到满足容量要求的可用教室"""
        available_rooms = self.find_available_rooms(time_slot, schedule)
        # 模拟教室容量数据
        room_capacities = {'R001': 60, 'R002': 30, 'R003': 80, 'R004': 40}
        
        suitable = [room for room in available_rooms if room_capacities.get(room, 0) >= capacity]
        return suitable

# 使用示例
auto_fixer = ScheduleAutoFixer(schedule, ['R001', 'R002', 'R003', 'R004'], time_slots)
conflicts = detector.detect_all_conflicts()
fixed_schedule = auto_fixer.fix_conflicts(conflicts)

print("\n修复后的排课:")
for course in fixed_schedule:
    print(f"课程 {course['course_id']}: 教室 {course['room_id']}, 时间 {course['time_slot']}")

5.4 性能优化技巧

5.4.1 并行计算加速

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, as_completed

class ParallelScheduler:
    """并行排课器"""
    
    def __init__(self, courses, rooms, constraints, time_slots, n_workers=4):
        self.courses = courses
        self.rooms = rooms
        self.constraints = constraints
        self.time_slots = time_slots
        self.n_workers = n_workers
    
    def run_parallel(self, population_size=100, generations=100):
        """并行运行遗传算法"""
        # 初始化种群
        initial_population = self.initialize_population(population_size)
        
        best_solution = None
        best_fitness = -float('inf')
        
        for generation in range(generations):
            # 并行评估适应度
            with ProcessPoolExecutor(max_workers=self.n_workers) as executor:
                futures = {executor.submit(self.evaluate_fitness, chrom): chrom 
                          for chrom in initial_population}
                
                for future in as_completed(futures):
                    chrom = futures[future]
                    chrom.fitness = future.result()
            
            # 选择和繁殖
            initial_population.sort(key=lambda x: x.fitness, reverse=True)
            
            if initial_population[0].fitness > best_fitness:
                best_fitness = initial_population[0].fitness
                best_solution = copy.deepcopy(initial_population[0])
            
            # 生成新一代
            new_population = [best_solution]
            while len(new_population) < population_size:
                parent1, parent2 = self.select_parents(initial_population)
                child = self.crossover(parent1, parent2)
                child.mutate(0.1)
                new_population.append(child)
            
            initial_population = new_population
            
            if generation % 10 == 0:
                print(f"Generation {generation}: Best Fitness = {best_fitness}")
        
        return best_solution
    
    def evaluate_fitness(self, chrom):
        """评估适应度(用于并行)"""
        # 简化版本
        return chrom.calculate_fitness(self.constraints)
    
    def initialize_population(self, size):
        """初始化种群"""
        return [ScheduleChromosome(self.courses, self.rooms, self.time_slots) for _ in range(size)]
    
    def select_parents(self, population):
        """选择父代"""
        tournament = random.sample(population, 5)
        tournament.sort(key=lambda x: x.fitness, reverse=True)
        return tournament[0], tournament[1]
    
    def crossover(self, parent1, parent2):
        """交叉操作"""
        child = ScheduleChromosome(self.courses, self.rooms, self.time_slots)
        child.genes = {}
        
        crossover_point = random.randint(0, len(parent1.genes) - 1)
        genes_list = list(parent1.genes.items())
        
        for i in range(crossover_point):
            course_id, assignment = genes_list[i]
            child.genes[course_id] = assignment
        
        for course_id, assignment in parent2.genes.items():
            if course_id not in child.genes:
                child.genes[course_id] = assignment
        
        return child

# 使用示例
parallel_scheduler = ParallelScheduler(courses, rooms, constraints, time_slots, n_workers=4)
best_schedule = parallel_scheduler.run_parallel(population_size=200, generations=100)
print(f"并行计算完成,最佳适应度: {best_schedule.fitness}")

5.4.2 缓存与增量更新

import hashlib
import pickle
import os

class ScheduleCache:
    """排课缓存系统"""
    
    def __init__(self, cache_dir='./schedule_cache'):
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)
    
    def get_cache_key(self, data):
        """生成缓存键"""
        data_str = json.dumps(data, sort_keys=True)
        return hashlib.md5(data_str.encode()).hexdigest()
    
    def save_to_cache(self, key, data):
        """保存到缓存"""
        cache_path = os.path.join(self.cache_dir, f"{key}.pkl")
        with open(cache_path, 'wb') as f:
            pickle.dump(data, f)
    
    def load_from_cache(self, key):
        """从缓存加载"""
        cache_path = os.path.join(self.cache_dir, f"{key}.pkl")
        if os.path.exists(cache_path):
            with open(cache_path, 'rb') as f:
                return pickle.load(f)
        return None
    
    def get_or_compute(self, data, compute_func):
        """获取缓存或计算"""
        key = self.get_cache_key(data)
        cached = self.load_from_cache(key)
        
        if cached is not None:
            print("使用缓存结果")
            return cached
        
        print("计算新结果并缓存")
        result = compute_func(data)
        self.save_to_cache(key, result)
        return result

# 使用示例
cache = ScheduleCache()

def expensive_schedule_computation(data):
    """模拟耗时计算"""
    import time
    time.sleep(5)  # 模拟耗时
    return {'result': 'computed', 'data': data}

# 第一次调用(计算)
data = {'teachers': 20, 'courses': 50}
result1 = cache.get_or_compute(data, expensive_schedule_computation)

# 第二次调用(缓存命中)
result2 = cache.get_or_compute(data, expensive_schedule_computation)
print(f"结果相同: {result1 == result2}")

6. 评估与优化

6.1 评估指标体系

class ScheduleEvaluator:
    """排课方案评估器"""
    
    def __init__(self, schedule, teachers, courses, rooms):
        self.schedule = schedule
        self.teachers = teachers
        self.courses = courses
        self.rooms = rooms
    
    def evaluate_all(self):
        """综合评估"""
        metrics = {
            'teacher_satisfaction': self.evaluate_teacher_satisfaction(),
            'room_utilization': self.evaluate_room_utilization(),
            'student_convenience': self.evaluate_student_convenience(),
            'constraint_violations': self.evaluate_constraint_violations(),
            'overall_score': 0
        }
        
        # 综合评分(加权平均)
        weights = {'teacher_satisfaction': 0.3, 'room_utilization': 0.3, 
                  'student_convenience': 0.2, 'constraint_violations': 0.2}
        
        overall = 0
        for metric, value in metrics.items():
            if metric != 'overall_score':
                overall += value * weights[metric]
        
        metrics['overall_score'] = overall
        return metrics
    
    def evaluate_teacher_satisfaction(self):
        """评估教师满意度(0-1)"""
        total_score = 0
        count = 0
        
        for course in self.schedule:
            teacher = self.teachers[self.teachers['teacher_id'] == course['teacher_id']].iloc[0]
            preferred = eval(teacher['preferred_times'])
            
            if course['time_slot'] in preferred:
                total_score += 1
            count += 1
        
        return total_score / count if count > 0 else 0
    
    def evaluate_room_utilization(self):
        """评估教室利用率(0-1)"""
        used_slots = set()
        for course in self.schedule:
            used_slots.add((course['room_id'], course['time_slot']))
        
        total_slots = len(self.rooms) * len(set(c['time_slot'] for c in self.schedule))
        return len(used_slots) / total_slots if total_slots > 0 else 0
    
    def evaluate_student_convenience(self):
        """评估学生便利性(0-1)"""
        # 简化:计算平均换教室次数
        student_schedule = {}
        for course in self.schedule:
            student_id = course.get('student_id', 'all')  # 模拟
            if student_id not in student_schedule:
                student_schedule[student_id] = []
            student_schedule[student_id].append(course)
        
        total_changes = 0
        total_lessons = 0
        
        for student, courses in student_schedule.items():
            courses.sort(key=lambda x: x['time_slot'])
            for i in range(1, len(courses)):
                if courses[i]['room_id'] != courses[i-1]['room_id']:
                    total_changes += 1
                total_lessons += 1
        
        # 转换为0-1分数(变化越少分数越高)
        if total_lessons == 0:
            return 0
        change_rate = total_changes / total_lessons
        return max(0, 1 - change_rate)
    
    def evaluate_constraint_violations(self):
        """评估约束违反程度(0-1,1表示完美)"""
        detector = ConflictDetector(self.schedule)
        conflicts = detector.detect_all_conflicts()
        
        total_violations = sum(len(v) for v in conflicts.values())
        max_possible_violations = len(self.schedule) * 3  # 估计最大可能
        
        if max_possible_violations == 0:
            return 1
        
        return max(0, 1 - (total_violations / max_possible_violations))

# 使用示例
evaluator = ScheduleEvaluator(fixed_schedule, teachers, courses, rooms)
metrics = evaluator.evaluate_all()

print("排课方案评估结果:")
for metric, value in metrics.items():
    print(f"  {metric}: {value:.3f}")

6.2 持续优化策略

class ScheduleOptimizer:
    """持续优化器"""
    
    def __init__(self, initial_schedule, teachers, courses, rooms, time_slots):
        self.current_schedule = initial_schedule
        self.teachers = teachers
        self.courses = courses
        self.rooms = rooms
        self.time_slots = time_slots
        self.history = []
    
    def iterative_optimization(self, max_iterations=100, improvement_threshold=0.01):
        """迭代优化"""
        evaluator = ScheduleEvaluator(self.current_schedule, self.teachers, self.courses, self.rooms)
        current_score = evaluator.evaluate_all()['overall_score']
        
        print(f"初始得分: {current_score:.3f}")
        
        for iteration in range(max_iterations):
            # 生成候选方案
            candidates = self.generate_candidates(self.current_schedule, n_candidates=10)
            
            # 评估候选
            best_candidate = None
            best_score = current_score
            
            for candidate in candidates:
                eval_candidate = ScheduleEvaluator(candidate, self.teachers, self.courses, self.rooms)
                score = eval_candidate.evaluate_all()['overall_score']
                
                if score > best_score:
                    best_score = score
                    best_candidate = candidate
            
            # 如果找到更好的方案
            if best_candidate and best_score > current_score + improvement_threshold:
                improvement = best_score - current_score
                print(f"迭代 {iteration+1}: 得分 {current_score:.3f} -> {best_score:.3f} (+{improvement:.3f})")
                
                self.current_schedule = best_candidate
                current_score = best_score
                
                self.history.append({
                    'iteration': iteration,
                    'score': current_score,
                    'improvement': improvement
                })
            else:
                print(f"迭代 {iteration+1}: 无显著改进,停止优化")
                break
        
        return self.current_schedule
    
    def generate_candidates(self, schedule, n_candidates=10):
        """生成候选方案"""
        candidates = []
        
        for _ in range(n_candidates):
            candidate = copy.deepcopy(schedule)
            
            # 随机调整几个课程
            n_changes = random.randint(1, 3)
            for _ in range(n_changes):
                if len(candidate) == 0:
                    break
                
                course_idx = random.randint(0, len(candidate)-1)
                course = candidate[course_idx]
                
                # 随机改变时间或教室
                if random.random() < 0.5:
                    course['time_slot'] = random.choice(self.time_slots)
                else:
                    course['room_id'] = random.choice(self.rooms['room_id'].tolist())
            
            candidates.append(candidate)
        
        return candidates

# 使用示例
optimizer = ScheduleOptimizer(fixed_schedule, teachers, courses, rooms, time_slots)
optimized_schedule = optimizer.iterative_optimization(max_iterations=50)

print("\n优化历史:")
for record in optimizer.history:
    print(f"迭代 {record['iteration']}: 得分 {record['score']:.3f}, 改进 {record['improvement']:.3f}")

7. 常见问题与解决方案

7.1 数据质量问题

问题:数据不完整、格式不一致 解决方案

def validate_and_clean_data(data):
    """数据验证与清洗"""
    # 检查必填字段
    required_fields = ['teacher_id', 'course_id', 'room_id', 'time_slot']
    for field in required_fields:
        if field not in data:
            raise ValueError(f"缺失必填字段: {field}")
    
    # 数据类型转换
    data['weekly_hours'] = int(data.get('weekly_hours', 0))
    data['lab_required'] = bool(data.get('lab_required', False))
    
    # 去除重复
    data = data.drop_duplicates(subset=['course_id'])
    
    # 填充缺失值
    data.fillna({
        'max_consecutive_lessons': 3,
        'required_room_type': '普通教室'
    }, inplace=True)
    
    return data

7.2 算法收敛问题

问题:算法陷入局部最优 解决方案

  • 增加种群多样性
  • 动态调整变异率
  • 使用多种算法混合
def adaptive_mutation_rate(generation, max_generations):
    """自适应变异率"""
    base_rate = 0.1
    # 随着迭代增加变异率,避免早熟收敛
    return base_rate + (generation / max_generations) * 0.1

7.3 实时性要求

问题:需要快速响应动态变化 解决方案

  • 增量更新而非全量重算
  • 使用缓存
  • 并行计算
def incremental_update(current_schedule, change_event):
    """增量更新排课"""
    # 只重新计算受影响的部分
    affected_courses = get_affected_courses(change_event)
    
    # 保留未受影响的部分
    new_schedule = [c for c in current_schedule if c['course_id'] not in affected_courses]
    
    # 重新排受影响的课程
    new_schedule += reschedule_courses(affected_courses, current_schedule)
    
    return new_schedule

8. 未来发展趋势

8.1 人工智能深度融合

趋势

  • 深度学习:使用神经网络学习复杂的排课模式
  • 强化学习:通过试错学习最优排课策略
  • 图神经网络:处理课程、教师、教室之间的复杂关系
import torch
import torch.nn as nn

class ScheduleNN(nn.Module):
    """神经网络排课模型"""
    
    def __init__(self, n_teachers, n_courses, n_rooms, n_time_slots):
        super().__init__()
        
        # 嵌入层
        self.teacher_embed = nn.Embedding(n_teachers, 32)
        self.course_embed = nn.Embedding(n_courses, 64)
        self.room_embed = nn.Embedding(n_rooms, 16)
        self.time_embed = nn.Embedding(n_time_slots, 16)
        
        # 神经网络
        self.fc = nn.Sequential(
            nn.Linear(32+64+16+16, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid()
        )
    
    def forward(self, teacher_id, course_id, room_id, time_slot):
        # 嵌入
        t_emb = self.teacher_embed(teacher_id)
        c_emb = self.course_embed(course_id)
        r_emb = self.room_embed(room_id)
        time_emb = self.time_embed(time_slot)
        
        # 拼接
        x = torch.cat([t_emb, c_emb, r_emb, time_emb], dim=1)
        
        # 预测
        score = self.fc(x)
        return score

# 使用示例
model = ScheduleNN(n_teachers=20, n_courses=50, n_rooms=10, n_time_slots=20)
teacher = torch.tensor([0])
course = torch.tensor([1])
room = torch.tensor([2])
time = torch.tensor([3])

score = model(teacher, course, room, time)
print(f"预测得分: {score.item():.3f}")

8.2 云端协作与移动端

趋势

  • 云端存储:实时同步排课数据
  • 移动端管理:教师和学生通过APP查看和调整
  • 协同编辑:多用户同时编辑排课

8.3 区块链与数据安全

趋势

  • 数据不可篡改:使用区块链记录排课历史
  • 智能合约:自动执行排课规则
  • 隐私保护:保护教师和学生隐私

9. 总结

排期预测技术正在彻底改变课堂教学排课的方式。通过数学建模、算法优化和机器学习,我们能够:

  1. 大幅提升效率:将数周的排课工作缩短到几小时
  2. 提高质量:生成更优的排课方案,减少冲突
  3. 增强灵活性:实时响应动态变化
  4. 降低成本:优化资源利用,减少浪费

关键成功因素

  • 数据质量:高质量的数据是基础
  • 算法选择:根据问题规模选择合适的算法
  • 约束管理:合理设置硬约束和软约束
  • 持续优化:建立评估反馈机制

实施建议

  1. 从小规模试点开始,逐步扩展
  2. 重视数据收集和清洗
  3. 培训相关人员,建立使用习惯
  4. 持续监控和优化系统性能

排期预测不仅是一项技术,更是一种管理理念的革新。它将排课从经验驱动转变为数据驱动,从静态安排转变为动态优化,为教育机构带来真正的价值。


本指南提供了从理论到实践的全面指导,涵盖了排课问题的数学基础、算法实现、系统集成、实际案例和未来趋势。希望这些内容能帮助您成功实施排期预测系统,解决课堂教学排课难题。