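Introduction: Challenges and Opportunities in Modern Manufacturing

In today's highly competitive manufacturing environment, production delays and resource waste are among the biggest challenges companies face. According to recent research by the McKinsey Global Institute, manufacturers lose an average of 30% of production time to waiting, idling, and unnecessary changeovers. Schedule prediction, a core component of Industry 4.0, is transforming traditional shop-floor scheduling through data-driven methods.

Schedule prediction combines machine learning, operations research, and real-time data analysis to anticipate production bottlenecks, optimize resource allocation, and adjust production plans dynamically. Beyond reducing production delays, it can significantly cut energy consumption, raw-material waste, and equipment idle time. This article examines the core principles, implementation methods, and practical applications of schedule prediction to help manufacturers achieve more efficient and intelligent production scheduling.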

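1. Core Principles of Schedule Prediction

1.1 Data-Driven Prediction Models

Schedule prediction starts from an accurate mathematical model of the production process, which requires collecting and analyzing historical data across several dimensions:

Key data dimensions:

  • Equipment data: operating status, failure history, maintenance intervals, machining accuracy
  • Work order data: process routings, operation dependencies, standard times, quality requirements
  • Personnel data: skill levels, attendance records, training history, productivity
  • Material data: inventory levels, supplier lead times, batch quality
  • Environmental data: temperature, humidity, equipment vibration, energy consumption patterns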
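To feed these dimensions into a single model, records from each source are usually flattened into one feature row per work order. A minimal sketch, assuming each source already yields a dict of field values; the function name `build_feature_row` and all field names are hypothetical:

```python
from typing import Dict, Mapping


def build_feature_row(order_id: str, sources: Mapping[str, Mapping[str, float]]) -> Dict[str, object]:
    """Flatten per-dimension dicts into one feature row.

    Keys are prefixed with the dimension name (e.g. 'equipment.temperature')
    so that fields with the same name in different sources do not collide.
    """
    row: Dict[str, object] = {"order_id": order_id}
    for dimension, fields in sources.items():
        for name, value in fields.items():
            row[f"{dimension}.{name}"] = value
    return row


# Hypothetical fields for three of the five dimensions
row = build_feature_row("WO-1001", {
    "equipment": {"temperature": 74.2, "vibration": 0.9},
    "material": {"inventory_level": 320},
    "personnel": {"skill_level": 3},
})
print(row)
```

Prefixing by dimension keeps the row flat (convenient for tabular models) while preserving where each value came from.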

Data collection example (Python):

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

class ProductionDataCollector:
    def __init__(self):
        self.equipment_data = []
        self.work_order_data = []
        self.material_data = []

    def collect_equipment_metrics(self, equipment_id, timestamp):
        """Collect real-time equipment metrics (simulated with random values here)"""
        metrics = {
            'equipment_id': equipment_id,
            'timestamp': timestamp,
            'operating_status': np.random.choice(['running', 'idle', 'maintenance', 'fault']),
            'temperature': np.random.normal(75, 5),  # °C
            'vibration': np.random.normal(0.8, 0.2),  # mm/s
            'energy_consumption': np.random.normal(120, 15),  # kWh
            'spindle_speed': np.random.choice([8000, 10000, 12000]),  # RPM
            'tool_wear': np.random.uniform(0, 100)  # percent
        }
        self.equipment_data.append(metrics)
        return metrics

    def collect_work_order_data(self, order_id, dependencies=None):
        """Collect work-order routing data (simulated)"""
        order_info = {
            'order_id': order_id,
            'routing': ['cutting', 'drilling', 'milling', 'grinding', 'inspection'],
            # Standard time per operation (assumed to be minutes)
            'estimated_times': {
                'cutting': 45, 'drilling': 30, 'milling': 60, 'grinding': 25, 'inspection': 15
            },
            # Operation dependencies supplied by the caller (empty if none)
            'dependencies': dependencies or {},
            'priority': np.random.choice([1, 2, 3, 4, 5]),  # 1 = highest priority
            'material_batch': f"MB{np.random.randint(1000, 9999)}",
            'quality_requirements': {
                'tolerance': np.random.choice(['±0.01mm', '±0.02mm', '±0.05mm']),
                'surface_roughness': np.random.choice(['Ra0.4', 'Ra0.8', 'Ra1.6'])
            }
        }
        self.work_order_data.append(order_info)
        return order_info

# Usage example: simulated data collection
collector = ProductionDataCollector()
now = datetime.now()

# Collect 24 hours of equipment data (one sample per hour per machine)
for i in range(24):
    collector.collect_equipment_metrics('EQP_001', now + timedelta(hours=i))
    collector.collect_equipment_metrics('EQP_002', now + timedelta(hours=i))

# Convert to a DataFrame for analysis
equipment_df = pd.DataFrame(collector.equipment_data)
print("Equipment operating status counts:")
print(equipment_df['operating_status'].value_counts())
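Status counts like these map directly onto a scheduling input: the share of samples in which each machine was available. A minimal plain-Python sketch, assuming that 'running' and 'idle' count as available while 'maintenance' and 'fault' do not (that split is an assumption, not a definition from the text); `availability_by_equipment` is a hypothetical helper:

```python
from collections import Counter, defaultdict
from typing import Dict, Iterable, Mapping

# Assumption: these statuses mean the machine can accept work
AVAILABLE_STATUSES = {"running", "idle"}


def availability_by_equipment(samples: Iterable[Mapping[str, str]]) -> Dict[str, float]:
    """Return, per equipment_id, the fraction of samples with an available status."""
    counts: Dict[str, Counter] = defaultdict(Counter)
    for sample in samples:
        counts[sample["equipment_id"]][sample["operating_status"]] += 1
    result = {}
    for equipment_id, status_counts in counts.items():
        total = sum(status_counts.values())
        available = sum(n for status, n in status_counts.items() if status in AVAILABLE_STATUSES)
        result[equipment_id] = available / total
    return result


samples = [
    {"equipment_id": "EQP_001", "operating_status": "running"},
    {"equipment_id": "EQP_001", "operating_status": "fault"},
    {"equipment_id": "EQP_001", "operating_status": "idle"},
    {"equipment_id": "EQP_002", "operating_status": "maintenance"},
]
print(availability_by_equipment(samples))
# → {'EQP_001': 0.6666666666666666, 'EQP_002': 0.0}
```

Each record in `collector.equipment_data` carries both `equipment_id` and `operating_status`, so that list could be passed in directly.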

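1.2 Applying Machine Learning Algorithms

Schedule prediction relies mainly on the following machine learning algorithms:

Time-series forecasting (ARIMA/LSTM): used to predict equipment failure times, material arrival times, and similar events.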
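Before fitting ARIMA, a much simpler baseline is a useful sanity check. The sketch below swaps in simple exponential smoothing (not ARIMA): each new level is `alpha * observed + (1 - alpha) * previous level`, and the final level is the forecast for the next interval. The value `alpha = 0.5` is an arbitrary choice, and `exponential_smoothing_forecast` is a hypothetical name:

```python
from typing import List


def exponential_smoothing_forecast(intervals: List[float], alpha: float = 0.5) -> float:
    """One-step-ahead forecast via simple exponential smoothing.

    level_t = alpha * x_t + (1 - alpha) * level_{t-1}, starting from the first
    observation; the last level is the forecast for the next interval.
    """
    if not intervals:
        raise ValueError("need at least one observation")
    if not 0 < alpha <= 1:
        raise ValueError("alpha must be in (0, 1]")
    level = intervals[0]
    for x in intervals[1:]:
        level = alpha * x + (1 - alpha) * level
    return level


# First four failure intervals (hours) from the ARIMA sample data
print(exponential_smoothing_forecast([120, 135, 128, 142]))  # → 134.875
```

If a fitted ARIMA model cannot beat such a baseline on held-out intervals, the extra model complexity is probably not paying off.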

import numpy as np
from statsmodels.tsa.arima.model import ARIMA
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense

class PredictiveMaintenance:
    def __init__(self):
        self.scaler = MinMaxScaler()

    def predict_equipment_failure_arima(self, historical_failure_data, steps=3):
        """Forecast upcoming failure intervals with ARIMA

        historical_failure_data: past intervals between failures (hours)
        """
        # Fit an ARIMA(2,1,2) model; with only ~10 observations this is
        # illustrative, and statsmodels may emit convergence warnings
        model = ARIMA(historical_failure_data, order=(2, 1, 2))
        fitted_model = model.fit()

        # Forecast the next `steps` failure intervals
        forecast = fitted_model.forecast(steps=steps)
        return forecast

    def build_lstm_model(self, input_shape):
        """Build an LSTM model that predicts remaining useful life"""
        model = Sequential([
            LSTM(64, activation='relu', input_shape=input_shape, return_sequences=True),
            LSTM(32, activation='relu'),
            Dense(16, activation='relu'),
            Dense(1, activation='linear')  # predicted remaining useful life
        ])

        model.compile(optimizer='adam', loss='mse', metrics=['mae'])
        return model

    def train_with_synthetic_data(self):
        """Train on synthetic data (demonstrates the pipeline only)"""
        np.random.seed(42)
        time_steps = 1000
        features = 5  # temperature, vibration, current, pressure, spindle speed

        # Random features and targets: the model learns nothing meaningful here,
        # the example only exercises the data flow
        X = np.random.random((time_steps, features))
        y = np.random.random((time_steps, 1))  # remaining useful life (fraction)

        # Scale features to [0, 1]
        X_scaled = self.scaler.fit_transform(X)

        # Reshape to the LSTM input format [samples, timesteps, features];
        # here each sample is a single time step
        X_reshaped = X_scaled.reshape((X_scaled.shape[0], 1, X_scaled.shape[1]))

        # Build and train the model
        model = self.build_lstm_model((1, features))
        model.fit(X_reshaped, y, epochs=10, batch_size=32, verbose=0)

        return model

# Usage example
predictor = PredictiveMaintenance()
# Historical intervals between failures (hours)
failure_intervals = [120, 135, 128, 142, 138, 150, 145, 155, 148, 160]
arima_forecast = predictor.predict_equipment_failure_arima(failure_intervals)
print(f"ARIMA forecast of the next failure intervals: {np.round(arima_forecast, 1)} hours")

lstm_model = predictor.train_with_synthetic_data()
print("LSTM model training complete")
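The LSTM training example feeds each sample to the network as a single time step, so the recurrent layers see no history. To let the model learn temporal patterns, samples are typically built with a sliding window over the sensor sequence. A minimal plain-Python sketch; the window length and the choice to label each window with the target of its last row are assumptions, and `make_windows` is a hypothetical name:

```python
from typing import List, Sequence, Tuple


def make_windows(features: Sequence[Sequence[float]], targets: Sequence[float],
                 window: int) -> Tuple[List[List[List[float]]], List[float]]:
    """Build [samples, timesteps, features] windows for a sequence model.

    Sample i covers rows i .. i + window - 1; its label is the target of the last row.
    """
    if len(features) != len(targets):
        raise ValueError("features and targets must have the same length")
    if not 1 <= window <= len(features):
        raise ValueError("window must be between 1 and len(features)")
    X, y = [], []
    for start in range(len(features) - window + 1):
        end = start + window
        X.append([list(row) for row in features[start:end]])
        y.append(targets[end - 1])
    return X, y


# 5 time steps x 2 features (e.g. temperature, vibration), window of 3
feats = [[70, 0.7], [72, 0.8], [75, 0.9], [78, 1.1], [80, 1.3]]
rul = [0.9, 0.8, 0.7, 0.6, 0.5]
X, y = make_windows(feats, rul, window=3)
print(len(X), len(X[0]), len(X[0][0]))  # → 3 3 2
print(y)  # → [0.7, 0.6, 0.5]
```

`np.array(X)` turns the nested lists into an array of shape `(samples, window, n_features)`, which matches an `input_shape=(window, n_features)` argument to `build_lstm_model`.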

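Classification algorithms (Random Forest/XGBoost): used to predict the probability that an order finishes on time, to classify equipment faults, and for similar tasks.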

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import xgboost as xgb

class ScheduleRiskPredictor:
    def __init__(self):
        self.rf_model = None
        self.xgb_model = None

    def prepare_training_data(self):
        """Prepare training data"""
        # Simulated historical order data; the labels are random, so the
        # trained models only demonstrate the workflow
        np.random.seed(42)
        n_samples = 1000

        data = {
            'order_complexity': np.random.randint(1, 10, n_samples),
            'equipment_availability': np.random.uniform(0.7, 1.0, n_samples),
            'material_ready_rate': np.random.uniform(0.8, 1.0, n_samples),
            'operator_skill_level': np.random.randint(1, 5, n_samples),
            'historical_delay_rate': np.random.uniform(0, 0.3, n_samples),
            'on_time_completion': np.random.choice([0, 1], n_samples, p=[0.3, 0.7])
        }

        df = pd.DataFrame(data)
        X = df.drop('on_time_completion', axis=1)
        y = df['on_time_completion']

        return train_test_split(X, y, test_size=0.2, random_state=42)

    def train_random_forest(self, X_train, y_train):
        """Train a random forest model"""
        self.rf_model = RandomForestClassifier(
            n_estimators=100,
            max_depth=10,
            random_state=42
        )
        self.rf_model.fit(X_train, y_train)
        return self.rf_model

    def train_xgboost(self, X_train, y_train):
        """Train an XGBoost model"""
        self.xgb_model = xgb.XGBClassifier(
            n_estimators=100,
            max_depth=6,
            learning_rate=0.1,
            random_state=42
        )
        self.xgb_model.fit(X_train, y_train)
        return self.xgb_model

    def predict_schedule_risk(self, order_features):
        """Predict the delay risk of a new order"""
        if self.rf_model is None or self.xgb_model is None:
            raise ValueError("Models have not been trained yet")

        # Probability of on-time completion (class 1) from each model
        rf_proba = self.rf_model.predict_proba(order_features)[0][1]
        xgb_proba = self.xgb_model.predict_proba(order_features)[0][1]

        # Simple ensemble: average the two probabilities (soft voting)
        ensemble_proba = (rf_proba + xgb_proba) / 2

        return {
            'on_time_probability': ensemble_proba,
            'risk_level': 'low' if ensemble_proba > 0.8 else 'medium' if ensemble_proba > 0.6 else 'high',
            # Agreement between the two models: 1 = identical predictions
            'model_agreement': 1 - abs(rf_proba - xgb_proba)
        }

# Usage example
risk_predictor = ScheduleRiskPredictor()
X_train, X_test, y_train, y_test = risk_predictor.prepare_training_data()

# Train both models
rf_model = risk_predictor.train_random_forest(X_train, y_train)
xgb_model = risk_predictor.train_xgboost(X_train, y_train)
print(classification_report(y_test, rf_model.predict(X_test)))

# Predict the risk of a new order
new_order = pd.DataFrame([{
    'order_complexity': 7,
    'equipment_availability': 0.85,
    'material_ready_rate': 0.92,
    'operator_skill_level': 3,
    'historical_delay_rate': 0.15
}])

risk_prediction = risk_predictor.predict_schedule_risk(new_order)
print(f"On-time completion probability: {risk_prediction['on_time_probability']:.2%}")
print(f"Risk level: {risk_prediction['risk_level']}")
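The ensemble above is soft voting with equal weights. A small sketch that generalizes it to weighted averaging and makes the risk thresholds explicit; the 0.8/0.6 thresholds mirror the example but are tunable assumptions, and `soft_vote` and `risk_bucket` are hypothetical names:

```python
from typing import Sequence


def soft_vote(probabilities: Sequence[float], weights: Sequence[float]) -> float:
    """Weighted average of per-model probabilities (weights need not sum to 1)."""
    if not probabilities or len(probabilities) != len(weights):
        raise ValueError("need one weight per probability")
    total = sum(weights)
    if total <= 0:
        raise ValueError("weights must sum to a positive value")
    return sum(p * w for p, w in zip(probabilities, weights)) / total


def risk_bucket(on_time_probability: float, low: float = 0.8, medium: float = 0.6) -> str:
    """Map an on-time probability to a risk level."""
    if on_time_probability > low:
        return "low"
    if on_time_probability > medium:
        return "medium"
    return "high"


# Trust the first model three times as much as the second (illustrative weights)
p = soft_vote([0.95, 0.75], weights=[3, 1])
print(round(p, 3), risk_bucket(p))  # → 0.9 low
```

Equal weights reproduce the plain average used in `predict_schedule_risk`; unequal weights are one way to favor whichever model validates better.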

1.3 Real-Time Data Integration and Edge Computing

A modern scheduling system must process real-time data streams from multiple sources:
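At the edge, a common pattern is to check raw readings locally and forward only anomalous ones upstream, which reduces network traffic. A minimal sketch using a rolling mean and population standard deviation (a z-score test); the window size and the threshold of 3 are assumptions, and `EdgeFilter` is a hypothetical name:

```python
from collections import deque
from statistics import mean, pstdev


class EdgeFilter:
    """Flag readings that deviate strongly from a rolling window of recent values."""

    def __init__(self, window: int = 20, z_threshold: float = 3.0):
        self.values = deque(maxlen=window)
        self.z_threshold = z_threshold

    def is_anomaly(self, value: float) -> bool:
        """True if value lies more than z_threshold std devs from the window mean."""
        anomalous = False
        if len(self.values) >= 2:
            mu = mean(self.values)
            sigma = pstdev(self.values)
            if sigma > 0:
                anomalous = abs(value - mu) / sigma > self.z_threshold
            else:
                # A perfectly flat window: any change counts as anomalous
                anomalous = value != mu
        self.values.append(value)
        return anomalous


edge = EdgeFilter(window=5, z_threshold=3.0)
readings = [0.80, 0.82, 0.79, 0.81, 0.80, 2.50, 0.81]  # vibration, mm/s
forwarded = [r for r in readings if edge.is_anomaly(r)]
print(forwarded)  # → [2.5]
```

Only the spike is forwarded; the normal readings stay on the edge device.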

import asyncio
import json
from datetime import datetime

import numpy as np
import redis
from kafka import KafkaConsumer, KafkaProducer

class RealTimeScheduler:
    def __init__(self):
        # Assumes Redis on localhost:6379 and a Kafka broker on localhost:9092
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.kafka_producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    async def process_equipment_stream(self):
        """Process the equipment telemetry stream"""
        # Note: kafka-python's KafkaConsumer iterates synchronously, so this loop
        # blocks the event loop; an asyncio-native client (e.g. aiokafka) avoids that
        consumer = KafkaConsumer(
            'equipment-telemetry',
            bootstrap_servers=['localhost:9092'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )

        for message in consumer:
            data = message.value
            equipment_id = data['equipment_id']

            # Compute the equipment health index in real time
            health_score = self.calculate_health_score(data)

            # Cache the score in Redis
            self.redis_client.setex(
                f"equipment:{equipment_id}:health",
                300,  # 5-minute TTL
                health_score
            )

            # Raise an alert if health falls below the threshold
            if health_score < 0.7:
                await self.trigger_alert(equipment_id, health_score)

    def calculate_health_score(self, telemetry):
        """Compute an equipment health score (0-1)"""
        # Weighted score over several indicators; 75 °C and 120 kWh are treated as nominal
        temp_score = 1 - abs(telemetry['temperature'] - 75) / 50
        vibration_score = 1 - min(telemetry['vibration'] / 2, 1)
        energy_score = 1 - abs(telemetry['energy_consumption'] - 120) / 100

        weights = [0.4, 0.35, 0.25]
        health_score = (
            temp_score * weights[0] +
            vibration_score * weights[1] +
            energy_score * weights[2]
        )

        # Clamp to [0, 1]
        return max(0, min(1, health_score))

    async def trigger_alert(self, equipment_id, health_score):
        """Raise an alert"""
        alert = {
            'timestamp': datetime.now().isoformat(),
            'equipment_id': equipment_id,
            'health_score': health_score,
            'severity': 'high' if health_score < 0.5 else 'medium',
            'recommendation': 'Schedule maintenance' if health_score < 0.6 else 'Monitor closely'
        }

        # Publish to Kafka
        self.kafka_producer.send('equipment-alerts', alert)

        # Push onto a Redis list used as an alert queue
        self.redis_client.lpush('alert_queue', json.dumps(alert))

        print(f"⚠️  Alert: equipment {equipment_id} health {health_score:.2f}")

# Simulated real-time processing (simplified; Kafka and Redis must still be
# reachable, because the scheduler creates both clients and alerts are written to them)
async def simulate_realtime_processing():
    scheduler = RealTimeScheduler()

    # Simulated telemetry stream
    for i in range(10):
        telemetry = {
            'equipment_id': 'EQP_001',
            'temperature': 75 + np.random.normal(0, 5),
            'vibration': 0.8 + np.random.normal(0, 0.3),
            'energy_consumption': 120 + np.random.normal(0, 15)
        }

        health = scheduler.calculate_health_score(telemetry)
        print(f"Step {i}: health = {health:.3f}")

        if health < 0.7:
            await scheduler.trigger_alert('EQP_001', health)

        await asyncio.sleep(0.1)

# Run the simulation
# asyncio.run(simulate_realtime_processing())
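Because kafka-python's consumer iterates synchronously, the stream loop in `process_equipment_stream` holds up other coroutines while it waits for messages. The producer/consumer structure itself can be tried without any broker by using an in-process `asyncio.Queue` as a stand-in for the Kafka topic. A minimal sketch; the 0.7 threshold comes from the example, while the vibration-only `health_score` is a simplified hypothetical scoring:

```python
import asyncio
from typing import List

ALERT_THRESHOLD = 0.7  # same threshold as in RealTimeScheduler


def health_score(telemetry: dict) -> float:
    """Simplified health score in [0, 1] from vibration only (hypothetical)."""
    return max(0.0, min(1.0, 1 - telemetry["vibration"] / 2))


async def producer(queue: asyncio.Queue, readings: List[dict]) -> None:
    for reading in readings:
        await queue.put(reading)
        await asyncio.sleep(0)  # yield control, as a network source would
    await queue.put(None)  # sentinel: end of stream


async def consumer(queue: asyncio.Queue, alerts: List[str]) -> None:
    while True:
        reading = await queue.get()
        if reading is None:
            break
        score = health_score(reading)
        if score < ALERT_THRESHOLD:
            alerts.append(f"{reading['equipment_id']}: {score:.2f}")


async def main() -> List[str]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)
    alerts: List[str] = []
    readings = [
        {"equipment_id": "EQP_001", "vibration": 0.4},
        {"equipment_id": "EQP_001", "vibration": 1.2},
    ]
    await asyncio.gather(producer(queue, readings), consumer(queue, alerts))
    return alerts


print(asyncio.run(main()))  # → ['EQP_001: 0.40']
```

The bounded queue (`maxsize=100`) also provides back-pressure: a fast producer waits when the consumer falls behind.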

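2. Shop-Floor Scheduling Optimization Algorithms

2.1 Mixed-Integer Linear Programming (MILP) Model

MILP is a classic approach to shop scheduling and is particularly well suited to modeling resource constraints and priority rules.

Problem definition:

  • Objective: minimize the makespan (completion time of the last job), the total (summed) completion time, or the total tardiness
  • Decision variables: operation start times, machine assignments, operation sequence
  • Constraints: operation precedence, machine capacity, due dates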
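These objectives penalize different things. With job completion times C_j and due dates d_j, the makespan is max C_j, the total completion time is the sum of C_j, and the total tardiness is the sum of max(0, C_j - d_j). A small sketch computing all three for one schedule; the completion times are made-up numbers for illustration:

```python
from typing import Dict, Tuple


def schedule_objectives(completion: Dict[str, float],
                        due: Dict[str, float]) -> Tuple[float, float, float]:
    """Return (makespan, total completion time, total tardiness)."""
    makespan = max(completion.values())
    total_completion = sum(completion.values())
    total_tardiness = sum(max(0.0, c - due[j]) for j, c in completion.items())
    return makespan, total_completion, total_tardiness


completion = {"1": 180, "2": 125, "3": 230}  # illustrative completion times
due = {"1": 200, "2": 220, "3": 210}         # due dates
print(schedule_objectives(completion, due))  # → (230, 535, 20.0)
```

Only job 3 is late (by 20), so a tardiness objective would focus on it alone, whereas the total completion time also rewards finishing jobs 1 and 2 earlier.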

from pulp import LpProblem, LpVariable, LpMinimize, lpSum, LpStatusOptimal

class MILPScheduler:
    def __init__(self, jobs, machines):
        """
        jobs: list of work orders, each with its operations, durations and compatible machines
        machines: list of available machines
        """
        self.jobs = jobs
        self.machines = machines
        self.problem = LpProblem("Shop_Scheduling", LpMinimize)

    def build_model(self):
        """Build the MILP model"""
        # Big-M constant: must exceed any feasible completion time
        M = 10000

        # Decision variables
        # start_time[job_id, op_idx, machine] = operation start time on that machine
        # is_assigned[job_id, op_idx, machine] = binary, 1 if the operation runs on that machine
        start_time = {}
        is_assigned = {}

        for job in self.jobs:
            for op_idx, operation in enumerate(job['operations']):
                for machine in self.machines:
                    if machine in operation['compatible_machines']:
                        var_name_start = f"start_{job['id']}_{op_idx}_{machine}"
                        var_name_assign = f"assign_{job['id']}_{op_idx}_{machine}"

                        start_time[(job['id'], op_idx, machine)] = LpVariable(
                            var_name_start, lowBound=0, cat='Continuous'
                        )
                        is_assigned[(job['id'], op_idx, machine)] = LpVariable(
                            var_name_assign, cat='Binary'
                        )

        # Completion time of each job = finish time of its last operation on the
        # machine it is actually assigned to (relaxed by big-M on the other machines)
        completion = {}
        for job in self.jobs:
            last_op_idx = len(job['operations']) - 1
            last_duration = job['operations'][last_op_idx]['duration']
            completion[job['id']] = LpVariable(f"completion_{job['id']}", lowBound=0)
            for machine in self.machines:
                key = (job['id'], last_op_idx, machine)
                if key in start_time:
                    self.problem += (
                        completion[job['id']] >= start_time[key] + last_duration - M * (1 - is_assigned[key]),
                        f"completion_{job['id']}_{machine}"
                    )

        # Objective: minimize the total (summed) job completion time
        self.problem += lpSum(completion.values())

        # Constraints

        # 1. Each operation is assigned to exactly one compatible machine
        for job in self.jobs:
            for op_idx, operation in enumerate(job['operations']):
                self.problem += lpSum(
                    is_assigned[(job['id'], op_idx, machine)]
                    for machine in self.machines
                    if machine in operation['compatible_machines']
                ) == 1, f"assign_{job['id']}_{op_idx}"

        # 2. Precedence within a job: an operation starts only after the previous one ends
        for job in self.jobs:
            for op_idx in range(len(job['operations']) - 1):
                for m1 in self.machines:
                    for m2 in self.machines:
                        if (job['id'], op_idx, m1) in start_time and \
                           (job['id'], op_idx + 1, m2) in start_time:

                            # End of current operation <= start of next operation
                            # (enforced only when both assignments are active)
                            self.problem += (
                                start_time[(job['id'], op_idx, m1)] +
                                job['operations'][op_idx]['duration'] <=
                                start_time[(job['id'], op_idx + 1, m2)] +
                                M * (1 - is_assigned[(job['id'], op_idx, m1)]) +
                                M * (1 - is_assigned[(job['id'], op_idx + 1, m2)]),
                                f"precedence_{job['id']}_{op_idx}_{m1}_{m2}"
                            )

        # 3. Machine capacity: two operations assigned to the same machine must not
        #    overlap. The binary `order` variable chooses which one goes first.
        for machine in self.machines:
            for i, job1 in enumerate(self.jobs):
                for job2 in self.jobs[i + 1:]:  # each pair of jobs once
                    for op1_idx, op1 in enumerate(job1['operations']):
                        for op2_idx, op2 in enumerate(job2['operations']):
                            key1 = (job1['id'], op1_idx, machine)
                            key2 = (job2['id'], op2_idx, machine)
                            if key1 not in start_time or key2 not in start_time:
                                continue

                            suffix = f"{job1['id']}_{op1_idx}_{job2['id']}_{op2_idx}_{machine}"
                            # order = 1: op1 before op2; order = 0: op2 before op1
                            order = LpVariable(f"order_{suffix}", cat='Binary')
                            # Both inequalities are switched off unless both operations use this machine
                            relax = M * (2 - is_assigned[key1] - is_assigned[key2])
                            self.problem += (
                                start_time[key1] + op1['duration'] <= start_time[key2] + M * (1 - order) + relax,
                                f"conflict_{suffix}_1"
                            )
                            self.problem += (
                                start_time[key2] + op2['duration'] <= start_time[key1] + M * order + relax,
                                f"conflict_{suffix}_2"
                            )

        # 4. Due-date constraints (optional)
        for job in self.jobs:
            if 'due_date' in job:
                self.problem += (
                    completion[job['id']] <= job['due_date'],
                    f"due_date_{job['id']}"
                )

        return start_time, is_assigned
    
    def solve(self):
        """Solve the model"""
        status = self.problem.solve()

        if status == LpStatusOptimal:
            print("✅ Optimal solution found!")
            return self.extract_solution()
        else:
            print("❌ No optimal solution found")
            return None

    def extract_solution(self):
        """Extract the schedule from the solved model"""
        schedule = {}
        variables = self.problem.variablesDict()  # variable name -> variable
        jobs_by_id = {str(job['id']): job for job in self.jobs}

        for name, var in variables.items():
            # Assignment variables are named assign_{job_id}_{op_idx}_{machine}
            if not name.startswith('assign') or (var.varValue or 0) <= 0.5:
                continue
            _, job_id, op_idx, machine = name.split('_', 3)
            op_idx = int(op_idx)

            # Matching start-time variable for the chosen machine
            start_var = variables[f"start_{job_id}_{op_idx}_{machine}"]

            schedule.setdefault(job_id, []).append({
                'operation': op_idx,
                'machine': machine,
                'start_time': start_var.varValue,
                'duration': jobs_by_id[job_id]['operations'][op_idx]['duration']
            })

        return schedule

# Usage example
jobs = [
    {
        'id': '1',
        'operations': [
            {'duration': 45, 'compatible_machines': ['M1', 'M2']},
            {'duration': 30, 'compatible_machines': ['M2', 'M3']},
            {'duration': 60, 'compatible_machines': ['M1', 'M3']}
        ],
        'due_date': 200
    },
    {
        'id': '2',
        'operations': [
            {'duration': 35, 'compatible_machines': ['M1', 'M2']},
            {'duration': 40, 'compatible_machines': ['M2', 'M3']},
            {'duration': 50, 'compatible_machines': ['M1', 'M3']}
        ],
        'due_date': 220
    },
    {
        'id': '3',
        'operations': [
            {'duration': 55, 'compatible_machines': ['M1', 'M2']},
            {'duration': 25, 'compatible_machines': ['M2', 'M3']},
            {'duration': 45, 'compatible_machines': ['M1', 'M3']}
        ],
        'due_date': 210
    }
]

machines = ['M1', 'M2', 'M3']

scheduler = MILPScheduler(jobs, machines)
start_time, is_assigned = scheduler.build_model()
schedule = scheduler.solve()

if schedule:
    print("\nGenerated schedule:")
    for job_id, operations in schedule.items():
        print(f"\nJob {job_id}:")
        for op in sorted(operations, key=lambda x: x['operation']):
            print(f"  Operation {op['operation']}: machine {op['machine']}, "
                  f"start {op['start_time']:.1f}, duration {op['duration']}")

# Output format (machine assignments and start times depend on the solver;
# an optimal solution satisfies every constraint, including the due dates):
# ✅ Optimal solution found!
#
# Generated schedule:
#
# Job 1:
#   Operation 0: machine <M>, start <t>, duration 45
#   Operation 1: machine <M>, start <t>, duration 30
#   Operation 2: machine <M>, start <t>, duration 60
#
# (Jobs 2 and 3 are printed in the same format)
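The machine-capacity constraint is a disjunction: on a shared machine, either operation a finishes before b starts, or b finishes before a starts. With a binary y and a large constant M, the pair s_a + p_a <= s_b + M(1 - y) and s_b + p_b <= s_a + M*y encodes it: y = 1 activates the first inequality and y = 0 the second. A small solver-free check of that equivalence, assuming both operations are already assigned to the machine; `big_m_feasible` and `no_overlap` are illustrative names:

```python
def big_m_feasible(s_a, p_a, s_b, p_b, M=10_000):
    """True if some binary y satisfies both big-M inequalities."""
    return any(
        s_a + p_a <= s_b + M * (1 - y) and s_b + p_b <= s_a + M * y
        for y in (0, 1)
    )


def no_overlap(s_a, p_a, s_b, p_b):
    """Direct check that the two processing intervals do not overlap."""
    return s_a + p_a <= s_b or s_b + p_b <= s_a


# Exhaustive check on a small grid (M exceeds every start time plus duration)
cases = [(s_a, p_a, s_b, p_b)
         for s_a in range(0, 60, 5) for p_a in (10, 25)
         for s_b in range(0, 60, 5) for p_b in (10, 25)]
assert all(big_m_feasible(*c) == no_overlap(*c) for c in cases)
print(len(cases), "cases agree")  # → 576 cases agree
```

The equivalence only holds while M is larger than any start time plus duration; a too-small M silently cuts off feasible schedules.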

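2.2 Heuristic Scheduling Algorithms

For large problems, MILP solve times can become prohibitive; heuristic algorithms offer a much faster alternative: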
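The dispatching rules used below have well-known guarantees in the simplest setting: on a single machine, sequencing by SPT minimizes total completion time, and sequencing by EDD minimizes the maximum lateness. These guarantees do not carry over to multi-machine job shops, where the rules serve only as heuristics. A brute-force sketch that checks both claims on a small made-up instance:

```python
from itertools import permutations
from typing import Dict, Sequence


def completion_times(order: Sequence[str], p: Dict[str, int]) -> Dict[str, int]:
    """Completion time of each job when processed back to back in `order`."""
    t, done = 0, {}
    for j in order:
        t += p[j]
        done[j] = t
    return done


def total_completion(order: Sequence[str], p: Dict[str, int]) -> int:
    return sum(completion_times(order, p).values())


def max_lateness(order: Sequence[str], p: Dict[str, int], d: Dict[str, int]) -> int:
    c = completion_times(order, p)
    return max(c[j] - d[j] for j in order)


p = {"A": 45, "B": 35, "C": 55}    # processing times
d = {"A": 200, "B": 60, "C": 100}  # due dates
jobs = list(p)

spt = sorted(jobs, key=lambda j: p[j])
edd = sorted(jobs, key=lambda j: d[j])
all_orders = list(permutations(jobs))

print(spt, total_completion(spt, p))  # → ['B', 'A', 'C'] 250
print(edd, max_lateness(edd, p, d))   # → ['B', 'C', 'A'] -10
assert total_completion(spt, p) == min(total_completion(o, p) for o in all_orders)
assert max_lateness(edd, p, d) == min(max_lateness(o, p, d) for o in all_orders)
```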

import heapq
from typing import Dict, List

class HeuristicScheduler:
    def __init__(self, jobs, machines):
        self.jobs = jobs
        self.machines = machines
        self.machine_availability = {m: 0 for m in machines}

    def _priority(self, job, operation, priority_rule):
        """Priority key for a dispatching rule (smaller value = scheduled first)"""
        if priority_rule == 'SPT':
            return operation['duration']  # shortest processing time first
        if priority_rule == 'EDD':
            return job.get('due_date', float('inf'))  # earliest due date first
        if priority_rule == 'FIFO':
            return int(job['id'])  # lowest work-order ID first
        return 0

    def schedule_by_priority(self, priority_rule='SPT', jobs=None) -> List[Dict]:
        """
        Priority-rule (dispatching) scheduling
        priority_rule: 'SPT' (shortest processing time), 'EDD' (earliest due date), 'FIFO' (first in, first out)
        jobs: jobs to schedule (defaults to self.jobs)
        """
        jobs = self.jobs if jobs is None else jobs
        jobs_by_id = {job['id']: job for job in jobs}
        # Reset machine state so repeated calls do not accumulate load
        self.machine_availability = {m: 0 for m in self.machines}
        job_ready = {job['id']: 0 for job in jobs}  # end time of each job's previous operation
        scheduled_operations = []

        # Only each job's first operation is ready at the start; a successor is
        # queued once its predecessor is scheduled, which preserves routing order
        operation_queue = []
        for job in jobs:
            first_op = job['operations'][0]
            heapq.heappush(operation_queue,
                           (self._priority(job, first_op, priority_rule), job['id'], 0))

        # Schedule operations one at a time
        while operation_queue:
            _, job_id, op_idx = heapq.heappop(operation_queue)
            job = jobs_by_id[job_id]
            operation = job['operations'][op_idx]

            # Choose the compatible machine on which the operation can start earliest
            best_machine = min(
                operation['compatible_machines'],
                key=lambda m: max(self.machine_availability[m], job_ready[job_id])
            )
            start_time = max(self.machine_availability[best_machine], job_ready[job_id])
            end_time = start_time + operation['duration']

            scheduled_operations.append({
                'job_id': job_id,
                'operation': op_idx,
                'machine': best_machine,
                'start_time': start_time,
                'duration': operation['duration'],
                'end_time': end_time
            })

            # The machine is busy until end_time; the job's next operation cannot start earlier
            self.machine_availability[best_machine] = end_time
            job_ready[job_id] = end_time

            if op_idx + 1 < len(job['operations']):
                next_op = job['operations'][op_idx + 1]
                heapq.heappush(operation_queue,
                               (self._priority(job, next_op, priority_rule), job_id, op_idx + 1))

        return scheduled_operations

    def greedy_insertion(self, new_job):
        """
        Greedy insertion: schedule the existing jobs with SPT, then place each
        operation of the new job into the earliest idle gap on a compatible machine
        """
        existing_jobs = [j for j in self.jobs if j['id'] != new_job['id']]
        schedule = self.schedule_by_priority('SPT', jobs=existing_jobs)

        job_ready = 0  # end time of the new job's previous operation
        for op_idx, operation in enumerate(new_job['operations']):
            best = None  # (end_time, start_time, machine)
            for machine in operation['compatible_machines']:
                # Busy intervals on this machine, in start-time order
                busy = sorted((op['start_time'], op['end_time'])
                              for op in schedule if op['machine'] == machine)
                start = job_ready
                for busy_start, busy_end in busy:
                    if start + operation['duration'] <= busy_start:
                        break  # fits into the idle gap before this interval
                    start = max(start, busy_end)
                end = start + operation['duration']
                if best is None or end < best[0]:
                    best = (end, start, machine)

            end, start, machine = best
            schedule.append({
                'job_id': new_job['id'],
                'operation': op_idx,
                'machine': machine,
                'start_time': start,
                'duration': operation['duration'],
                'end_time': end
            })
            self.machine_availability[machine] = max(self.machine_availability[machine], end)
            job_ready = end

        return schedule

# Usage example
heuristic_scheduler = HeuristicScheduler(jobs, machines)

print("\n=== SPT priority-rule schedule ===")
spt_schedule = heuristic_scheduler.schedule_by_priority('SPT')
for op in spt_schedule:
    print(f"Job {op['job_id']} - operation {op['operation']}: {op['machine']} "
          f"[{op['start_time']:.0f}-{op['end_time']:.0f}]")

print("\n=== EDD priority-rule schedule ===")
edd_scheduler = HeuristicScheduler(jobs, machines)
edd_schedule = edd_scheduler.schedule_by_priority('EDD')
for op in edd_schedule:
    print(f"Job {op['job_id']} - operation {op['operation']}: {op['machine']} "
          f"[{op['start_time']:.0f}-{op['end_time']:.0f}]")
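Whichever method produces a schedule, it is worth checking it mechanically before release against the precedence and machine-capacity constraints. A minimal validator sketch, assuming the flat list format produced by the heuristic scheduler (`job_id`, `operation`, `machine`, `start_time`, `end_time` keys); `validate_schedule` is a hypothetical helper:

```python
from collections import defaultdict
from typing import Dict, List


def validate_schedule(ops: List[Dict]) -> List[str]:
    """Return a list of violations (empty if the schedule is feasible)."""
    problems = []

    # Precedence: operation k+1 of a job must not start before operation k ends
    by_job = defaultdict(list)
    for op in ops:
        by_job[op["job_id"]].append(op)
    for job_id, job_ops in by_job.items():
        job_ops.sort(key=lambda o: o["operation"])
        for prev, nxt in zip(job_ops, job_ops[1:]):
            if nxt["start_time"] < prev["end_time"]:
                problems.append(f"job {job_id}: op {nxt['operation']} starts before op {prev['operation']} ends")

    # Capacity: operations on the same machine must not overlap
    by_machine = defaultdict(list)
    for op in ops:
        by_machine[op["machine"]].append(op)
    for machine, m_ops in by_machine.items():
        m_ops.sort(key=lambda o: o["start_time"])
        for a, b in zip(m_ops, m_ops[1:]):
            if b["start_time"] < a["end_time"]:
                problems.append(f"{machine}: job {a['job_id']} and job {b['job_id']} overlap")
    return problems


ok = [
    {"job_id": "1", "operation": 0, "machine": "M1", "start_time": 0, "end_time": 45},
    {"job_id": "1", "operation": 1, "machine": "M2", "start_time": 45, "end_time": 75},
    {"job_id": "2", "operation": 0, "machine": "M2", "start_time": 0, "end_time": 35},
]
bad = ok + [{"job_id": "3", "operation": 0, "machine": "M2", "start_time": 35, "end_time": 90}]
print(validate_schedule(ok))   # → []
print(validate_schedule(bad))  # → ['M2: job 3 and job 1 overlap']
```

For example, `validate_schedule(spt_schedule)` could be run on the SPT result above before the schedule is released to the shop floor.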

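2.3 Genetic Algorithm Optimization

For very large scheduling problems, genetic algorithms can deliver high-quality approximate solutions: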

import random

class GeneticScheduler:
    def __init__(self, jobs, machines, population_size=50, generations=100):
        self.jobs = jobs
        self.machines = machines
        self.population_size = population_size
        self.generations = generations

    def encode_chromosome(self):
        """Create a random chromosome: one (job_id, op_idx, machine) gene per operation.

        Genes are ordered job by job in routing order, so this encoding optimizes
        machine assignment only; the processing sequence is fixed.
        """
        chromosome = []
        for job in self.jobs:
            for op_idx, operation in enumerate(job['operations']):
                # Pick a random compatible machine
                machine = random.choice(operation['compatible_machines'])
                chromosome.append((job['id'], op_idx, machine))
        return chromosome

    def decode_chromosome(self, chromosome):
        """Decode a chromosome into a schedule"""
        schedule = {}
        machine_availability = {m: 0 for m in self.machines}
        job_ready = {}  # end time of each job's most recently scheduled operation

        for job_id, op_idx, machine in chromosome:
            # Look up the operation
            job = next(j for j in self.jobs if j['id'] == job_id)
            operation = job['operations'][op_idx]

            # Start once the machine is free and the job's previous operation is done
            start_time = max(machine_availability[machine], job_ready.get(job_id, 0))
            duration = operation['duration']
            end_time = start_time + duration

            if job_id not in schedule:
                schedule[job_id] = []

            schedule[job_id].append({
                'operation': op_idx,
                'machine': machine,
                'start_time': start_time,
                'duration': duration,
                'end_time': end_time
            })

            machine_availability[machine] = end_time
            job_ready[job_id] = end_time

        return schedule

    def fitness(self, chromosome):
        """Fitness function: the smaller the makespan, the better"""
        schedule = self.decode_chromosome(chromosome)

        # Makespan = latest completion time over all jobs
        max_completion = 0
        for job_id, operations in schedule.items():
            job_completion = max(op['end_time'] for op in operations)
            max_completion = max(max_completion, job_completion)

        # Fitness = 1 / (makespan + 1): maximizing fitness minimizes makespan
        return 1.0 / (max_completion + 1)

    def crossover(self, parent1, parent2):
        """Single-point crossover (swaps machine assignments after the cut point)"""
        if len(parent1) < 2:
            return parent1, parent2

        point = random.randint(1, len(parent1) - 1)
        child1 = parent1[:point] + parent2[point:]
        child2 = parent2[:point] + parent1[point:]

        return child1, child2

    def mutate(self, chromosome, mutation_rate=0.1):
        """Mutation: move an operation to another compatible machine"""
        mutated = chromosome.copy()

        for i in range(len(mutated)):
            if random.random() < mutation_rate:
                job_id, op_idx, old_machine = mutated[i]
                # Look up compatible machines
                job = next(j for j in self.jobs if j['id'] == job_id)
                operation = job['operations'][op_idx]
                compatible = operation['compatible_machines']

                # Pick a new machine at random (excluding the current one)
                if len(compatible) > 1:
                    new_machine = random.choice([m for m in compatible if m != old_machine])
                    mutated[i] = (job_id, op_idx, new_machine)

        return mutated

    def run(self):
        """Run the genetic algorithm"""
        # Initialize the population
        population = [self.encode_chromosome() for _ in range(self.population_size)]

        best_chromosome = None
        best_fitness = 0

        for generation in range(self.generations):
            # Evaluate fitness
            fitness_scores = [(chrom, self.fitness(chrom)) for chrom in population]
            fitness_scores.sort(key=lambda x: x[1], reverse=True)

            # Update the best solution found so far
            if fitness_scores[0][1] > best_fitness:
                best_fitness = fitness_scores[0][1]
                best_chromosome = fitness_scores[0][0]

            # Selection (tournament of size 3)
            selected = []
            for _ in range(self.population_size):
                tournament = random.sample(fitness_scores, 3)
                winner = max(tournament, key=lambda x: x[1])
                selected.append(winner[0])

            # Crossover and mutation
            new_population = []
            for i in range(0, len(selected), 2):
                parent1 = selected[i]
                parent2 = selected[(i + 1) % len(selected)]  # wrap around if the size is odd
                child1, child2 = self.crossover(parent1, parent2)
                new_population.append(self.mutate(child1))
                new_population.append(self.mutate(child2))

            population = new_population[:self.population_size]

            if generation % 20 == 0:
                print(f"Generation {generation}: best fitness = {best_fitness:.6f}")

        # Return the best schedule and its makespan
        best_schedule = self.decode_chromosome(best_chromosome)
        return best_schedule, 1.0 / best_fitness - 1

# Usage example
genetic_scheduler = GeneticScheduler(jobs, machines, population_size=30, generations=50)
best_schedule, makespan = genetic_scheduler.run()

print("\nGenetic algorithm result:")
print(f"Makespan: {makespan:.1f}")
print("Schedule:")
for job_id, operations in best_schedule.items():
    print(f"Job {job_id}:")
    for op in sorted(operations, key=lambda x: x['operation']):
        print(f"  Operation {op['operation']}: machine {op['machine']}, "
              f"[{op['start_time']:.0f}-{op['end_time']:.0f}]")
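The chromosome above encodes only machine choices; the processing order is fixed job by job. A common alternative for job shops is the operation-based encoding (a permutation with repetition): each job ID appears once per operation, and its k-th occurrence denotes that job's k-th operation, so every permutation decodes to a precedence-feasible sequence. A minimal decoder sketch, assuming each operation simply uses its first compatible machine and the same job-dict format as the examples; `decode_operation_sequence` and `demo_jobs` are hypothetical:

```python
from typing import Dict, List


def decode_operation_sequence(sequence: List[str], jobs: List[Dict]) -> Dict[str, float]:
    """Decode a permutation-with-repetition chromosome; return each job's completion time.

    The k-th occurrence of a job ID schedules that job's k-th operation on its first
    compatible machine (a simplifying assumption), starting once both the machine
    and the job's previous operation are free.
    """
    jobs_by_id = {job["id"]: job for job in jobs}
    next_op = {job_id: 0 for job_id in jobs_by_id}
    job_ready = {job_id: 0 for job_id in jobs_by_id}
    machine_free: Dict[str, float] = {}
    for job_id in sequence:
        operation = jobs_by_id[job_id]["operations"][next_op[job_id]]
        machine = operation["compatible_machines"][0]
        start = max(machine_free.get(machine, 0), job_ready[job_id])
        end = start + operation["duration"]
        machine_free[machine] = end
        job_ready[job_id] = end
        next_op[job_id] += 1
    return job_ready


demo_jobs = [
    {"id": "1", "operations": [{"duration": 45, "compatible_machines": ["M1"]},
                               {"duration": 30, "compatible_machines": ["M2"]}]},
    {"id": "2", "operations": [{"duration": 35, "compatible_machines": ["M2"]},
                               {"duration": 40, "compatible_machines": ["M1"]}]},
]
print(decode_operation_sequence(["1", "2", "1", "2"], demo_jobs))  # → {'1': 75, '2': 85}
print(decode_operation_sequence(["1", "1", "2", "2"], demo_jobs))  # → {'1': 75, '2': 150}
```

The two sequences give different makespans (85 vs 150), which is exactly the sequencing freedom the machine-only encoding cannot explore.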

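3. Real-Time Dynamic Scheduling System

3.1 Event-Driven Scheduling Engine

A modern shop floor must respond in real time to events such as equipment faults, rush orders, and material delays: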
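Event ordering is the core of such an engine. Instead of re-sorting a list on every insert, a binary heap keeps the most urgent event at the front with O(log n) pushes and pops. A minimal sketch using the standard `heapq` module, assuming a higher `priority` value means more urgent and that ties are served oldest first; a running counter breaks any remaining ties so payloads are never compared. `EventQueue` is a hypothetical name:

```python
import heapq
import itertools
from typing import Any, List, Optional, Tuple


class EventQueue:
    """Priority queue of events: highest priority first, then earliest timestamp."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, float, int, Any]] = []
        self._counter = itertools.count()  # insertion order, used as the final tie-breaker

    def push(self, priority: int, timestamp: float, payload: Any) -> None:
        # heapq is a min-heap, so negate priority to pop the largest first
        heapq.heappush(self._heap, (-priority, timestamp, next(self._counter), payload))

    def pop(self) -> Optional[Any]:
        if not self._heap:
            return None
        return heapq.heappop(self._heap)[3]

    def __len__(self) -> int:
        return len(self._heap)


q = EventQueue()
q.push(1, 10.0, "material_delay")
q.push(5, 12.0, "equipment_fault")
q.push(5, 11.0, "emergency_order")
print([q.pop() for _ in range(len(q))])
# → ['emergency_order', 'equipment_fault', 'material_delay']
```

In a threaded engine the push and pop calls would still need to be guarded by a lock, as `DynamicScheduler.add_event` does.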

from enum import Enum
from dataclasses import dataclass
from typing import List, Optional
import threading
import time

class EventType(Enum):
    EQUIPMENT_FAULT = "equipment_fault"
    EMERGENCY_ORDER = "emergency_order"
    MATERIAL_DELAY = "material_delay"
    QUALITY_ISSUE = "quality_issue"
    MAINTENANCE_COMPLETE = "maintenance_complete"

@dataclass
class Event:
    event_type: EventType
    timestamp: float
    data: dict
    priority: int = 0

class DynamicScheduler:
    def __init__(self):
        self.event_queue = []
        self.current_schedule = {}
        self.lock = threading.Lock()
        self.running = False
        
    def add_event(self, event: Event):
        """添加事件到队列"""
        with self.lock:
            # 按优先级和时间排序
            self.event_queue.append(event)
            self.event_queue.sort(key=lambda e: (e.priority, e.timestamp), reverse=True)
    
    def process_event(self, event: Event):
        """处理单个事件"""
        print(f"\n🔄 处理事件: {event.event_type.value} - {event.data}")
        
        if event.event_type == EventType.EQUIPMENT_FAULT:
            self.handle_equipment_fault(event.data)
        elif event.event_type == EventType.EMERGENCY_ORDER:
            self.handle_emergency_order(event.data)
        elif event.event_type == EventType.MATERIAL_DELAY:
            self.handle_material_delay(event.data)
        elif event.event_type == EventType.QUALITY_ISSUE:
            self.handle_quality_issue(event.data)
        elif event.event_type == EventType.MAINTENANCE_COMPLETE:
            self.handle_maintenance_complete(event.data)
    
    def handle_equipment_fault(self, data):
        """Handle an equipment fault."""
        equipment_id = data['equipment_id']
        repair_minutes = data.get('estimated_repair_time', 120)  # minutes

        print(f"  Machine {equipment_id} is down, estimated repair time {repair_minutes} min")

        # Collect the operations on the failed machine that have not finished yet
        # (both running and future operations are affected)
        now = time.time()
        affected_operations = []
        for job_id, operations in self.current_schedule.items():
            for op in operations:
                if op['machine'] == equipment_id and op['end_time'] > now:
                    affected_operations.append((job_id, op))

        # Look for an alternative machine
        for job_id, op in affected_operations:
            # str() on both sides tolerates int vs. str job ids
            job = next((j for j in self.jobs if str(j['id']) == str(job_id)), None)
            if job is None:
                print(f"  ⚠️  Job {job_id} not found in the job list, skipped")
                continue
            operation = job['operations'][op['operation']]

            # Exclude the failed machine
            available_machines = [m for m in operation['compatible_machines'] if m != equipment_id]

            if available_machines:
                new_machine = available_machines[0]
                print(f"  Reassigning job {job_id} operation {op['operation']} to {new_machine}")
                op['machine'] = new_machine
                # Recompute the timing (simplified: restart now, ignoring the new machine's queue).
                # Times are epoch seconds, durations are minutes.
                op['start_time'] = now
                op['end_time'] = now + op['duration'] * 60
            else:
                print(f"  ⚠️  Cannot reassign job {job_id} operation {op['operation']}")
    
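    def handle_emergency_order(self, data):
        """Handle a rush (emergency) order."""
        emergency_job = data['job']
        print(f"  Inserting rush order {emergency_job['id']}")

        # Register the new job (without mutating the caller's list) so later events can look it up
        self.jobs = self.jobs + [emergency_job]

        # Fast re-plan with the greedy insertion heuristic
        scheduler = HeuristicScheduler(self.jobs, self.machines)
        new_schedule = scheduler.greedy_insertion(emergency_job)

        # Rebuild the current schedule; greedy_insertion is expected to return
        # a flat list of operation dicts, each carrying a 'job_id'
        self.current_schedule = {}
        for op in new_schedule:
            self.current_schedule.setdefault(op['job_id'], []).append(op)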
    
    def handle_material_delay(self, data):
        """Handle a material delay."""
        job_id = data['job_id']
        delay_hours = data['delay_hours']

        print(f"  Job {job_id}: material delayed by {delay_hours} h")

        # Shift every operation of the affected job (times are epoch seconds)
        if job_id in self.current_schedule:
            for op in self.current_schedule[job_id]:
                op['start_time'] += delay_hours * 3600
                op['end_time'] += delay_hours * 3600
    
    def handle_quality_issue(self, data):
        """Handle a quality issue by inserting a rework operation."""
        job_id = data['job_id']
        operation = data['operation']
        rework_time = data.get('rework_time', 30)  # minutes

        print(f"  Job {job_id} operation {operation}: quality issue, {rework_time} min of rework needed")

        # Insert the rework right after the defective operation, on the same machine
        # (simplified: later operations are not shifted)
        if job_id in self.current_schedule:
            for i, op in enumerate(self.current_schedule[job_id]):
                if op['operation'] == operation:
                    rework_op = {
                        'operation': f"{operation}_rework",
                        'machine': op['machine'],
                        'start_time': op['end_time'],
                        'duration': rework_time,
                        'end_time': op['end_time'] + rework_time * 60,  # minutes -> seconds
                    }
                    self.current_schedule[job_id].insert(i + 1, rework_op)
                    break
    
    def handle_maintenance_complete(self, data):
        """Handle completion of maintenance."""
        equipment_id = data['equipment_id']
        print(f"  Machine {equipment_id} maintenance complete, back in service")
        # A full re-schedule could be triggered here
    
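    def run_scheduler(self):
        """Run the event loop until stop() is called."""
        self.running = True

        while self.running:
            event = None
            with self.lock:
                if self.event_queue:
                    event = self.event_queue.pop(0)
            if event is not None:
                # Process outside the lock so add_event() is never blocked by a slow handler
                self.process_event(event)
            else:
                time.sleep(0.1)  # avoid a busy loop; the lock is not held while sleeping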
    
    def stop(self):
        """Stop the scheduler."""
        self.running = False

# Usage example
scheduler = DynamicScheduler()
scheduler.jobs = jobs
scheduler.machines = machines
now = time.time()  # schedule times are epoch seconds; durations are minutes
scheduler.current_schedule = {
    '1': [{'operation': 0, 'machine': 'M1', 'start_time': now, 'duration': 45, 'end_time': now + 45*60}],
    '2': [{'operation': 0, 'machine': 'M2', 'start_time': now, 'duration': 35, 'end_time': now + 35*60}],
    '3': [{'operation': 0, 'machine': 'M3', 'start_time': now, 'duration': 55, 'end_time': now + 55*60}]
}

# Add test events
scheduler.add_event(Event(
    event_type=EventType.EQUIPMENT_FAULT,
    timestamp=time.time(),
    data={'equipment_id': 'M1', 'estimated_repair_time': 90},
    priority=10
))

scheduler.add_event(Event(
    event_type=EventType.EMERGENCY_ORDER,
    timestamp=time.time(),
    data={'job': {
        'id': 'E1',
        'operations': [
            {'duration': 20, 'compatible_machines': ['M1', 'M2']},
            {'duration': 30, 'compatible_machines': ['M2', 'M3']}
        ]
    }},
    priority=20
))

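# Start the scheduler (in production, run it in a background thread, e.g.
# threading.Thread(target=scheduler.run_scheduler, daemon=True).start())
# scheduler.run_scheduler()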
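Sorting the whole event list on every insertion costs O(n log n) per event. When event volume grows, a binary heap gives O(log n) insertion and removal. The sketch below uses the standard `heapq` module and assumes, as in the example above, that a larger `priority` means more urgent and that equal priorities are served oldest first. `EventQueue` is a hypothetical helper, not part of `DynamicScheduler`.

```python
import heapq
import itertools


class EventQueue:
    """Min-heap of (-priority, timestamp, seq, name): the most urgent, oldest event pops first."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # tie-breaker so entries never compare by payload

    def push(self, name, priority, timestamp):
        # heapq is a min-heap, so priority is negated to pop the largest priority first
        heapq.heappush(self._heap, (-priority, timestamp, next(self._seq), name))

    def pop(self):
        return heapq.heappop(self._heap)[-1]

    def __len__(self):
        return len(self._heap)


q = EventQueue()
q.push('material_delay', priority=5, timestamp=100.0)
q.push('equipment_fault', priority=10, timestamp=200.0)
q.push('quality_issue', priority=5, timestamp=50.0)
order = [q.pop() for _ in range(len(q))]
print(order)  # ['equipment_fault', 'quality_issue', 'material_delay']
```

The sequence counter matters when real `Event` objects are stored as the payload: without it, two entries with the same priority and timestamp would fall through to comparing the events themselves.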

3.2 Digital Twin and Simulation

Before rolling out schedule prediction, validate it through simulation with a digital twin:

from copy import deepcopy

import matplotlib.pyplot as plt
import numpy as np

class DigitalTwinSimulator:
    def __init__(self, jobs, machines):
        self.jobs = jobs
        self.machines = machines
        self.simulation_data = []
        
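    def simulate_production(self, scheduler, num_runs=100):
        """Run repeated (Monte Carlo) simulations under random disturbances."""
        results = []
        # Due dates come from the job definitions (jobs without one are never late)
        due_dates = {str(job['id']): job.get('due_date', float('inf')) for job in self.jobs}

        for run in range(num_runs):
            # Add random disturbances to the processing times
            perturbed_jobs = self._add_perturbation(self.jobs)

            # Schedule the perturbed jobs with a fresh scheduler of the same class,
            # so every run actually sees the disturbed durations
            run_scheduler = type(scheduler)(perturbed_jobs, self.machines)
            schedule = self._ops_by_job(run_scheduler.schedule_by_priority('SPT'))

            # Metrics: tardiness is counted per job, from the end of its last operation
            completion = {job_id: max(op['end_time'] for op in ops)
                          for job_id, ops in schedule.items()}
            makespan = max(completion.values())
            total_delay = sum(
                max(0, end - due_dates.get(str(job_id), float('inf')))
                for job_id, end in completion.items()
            )

            results.append({
                'run': run,
                'makespan': makespan,
                'total_delay': total_delay,
                'utilization': self._calculate_utilization(schedule)
            })

        return results

    @staticmethod
    def _ops_by_job(schedule):
        """Normalize a schedule to {job_id: [op, ...]}.

        Accepts either a dict in that form, or a list whose items are operation dicts
        (or lists of them) carrying a 'job_id' key.
        """
        if isinstance(schedule, dict):
            return schedule
        grouped = {}
        for item in schedule:
            ops = item if isinstance(item, list) else [item]
            for op in ops:
                grouped.setdefault(op.get('job_id'), []).append(op)
        return grouped

    def _add_perturbation(self, jobs):
        """Add random disturbances to the processing times."""
        perturbed = deepcopy(jobs)
        for job in perturbed:
            for op in job['operations']:
                # Multiplicative noise with a 10% standard deviation: about 95% of draws fall within ±20%
                noise = np.random.normal(1.0, 0.1)
                op['duration'] = max(1, round(op['duration'] * noise))  # keep durations positive
        return perturbed

    def _calculate_utilization(self, schedule):
        """Machine utilization = total busy time / (number of machines × makespan)."""
        makespan = 0
        busy_time = 0
        for ops in schedule.values():
            for op in ops:
                busy_time += op['duration']
                makespan = max(makespan, op['end_time'])

        if makespan == 0:
            return 0.0
        return busy_time / (len(self.machines) * makespan)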
    
    def plot_simulation_results(self, results):
        """Visualize the simulation results."""
        fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(15, 5))

        makespans = [r['makespan'] for r in results]
        delays = [r['total_delay'] for r in results]
        utils = [r['utilization'] for r in results]

        ax1.hist(makespans, bins=20, alpha=0.7, color='skyblue')
        ax1.set_title('Makespan distribution')
        ax1.set_xlabel('Time')
        ax1.set_ylabel('Frequency')

        ax2.hist(delays, bins=20, alpha=0.7, color='lightcoral')
        ax2.set_title('Total delay distribution')
        ax2.set_xlabel('Delay')
        ax2.set_ylabel('Frequency')

        ax3.hist(utils, bins=20, alpha=0.7, color='lightgreen')
        ax3.set_title('Machine utilization distribution')
        ax3.set_xlabel('Utilization')
        ax3.set_ylabel('Frequency')

        plt.tight_layout()
        plt.show()

        # Print summary statistics (mean ± standard deviation)
        print(f"\nSimulation statistics ({len(results)} runs):")
        print(f"Mean makespan: {np.mean(makespans):.1f} ± {np.std(makespans):.1f}")
        print(f"Mean total delay: {np.mean(delays):.1f} ± {np.std(delays):.1f}")
        print(f"Mean utilization: {np.mean(utils):.2%} ± {np.std(utils):.2%}")

# Usage example
simulator = DigitalTwinSimulator(jobs, machines)
heuristic_scheduler = HeuristicScheduler(jobs, machines)

# Run the simulation
results = simulator.simulate_production(heuristic_scheduler, num_runs=50)

# Visualize the results
# simulator.plot_simulation_results(results)  # requires an environment where matplotlib can display figures
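The three indicators the simulator reports can be checked by hand on a tiny case. The sketch below is a hypothetical standalone helper, `schedule_metrics`, not part of `DigitalTwinSimulator`. It assumes a schedule of the form {job_id: [op, ...]} with `start_time`/`end_time` in minutes. Tardiness is counted once per job from the end of its last operation, and utilization uses busy time divided by (number of machines × makespan), the same ratio `_calculate_utilization` computes. The schedule and due dates are made-up illustration data.

```python
from typing import Dict, Hashable, List, Tuple


def schedule_metrics(schedule: Dict[Hashable, List[dict]],
                     due_dates: Dict[Hashable, float],
                     machines: List[str]) -> Tuple[float, float, float]:
    """Return (makespan, total tardiness, utilization) for {job_id: [op, ...]}."""
    # Completion time of a job = end of its last operation
    completion = {job: max(op['end_time'] for op in ops) for job, ops in schedule.items()}
    makespan = max(completion.values())

    # Tardiness is counted once per job; jobs without a due date are never late
    total_tardiness = sum(max(0, end - due_dates.get(job, float('inf')))
                          for job, end in completion.items())

    # Utilization = busy machine time / (number of machines x makespan)
    busy = sum(op['end_time'] - op['start_time'] for ops in schedule.values() for op in ops)
    utilization = busy / (len(machines) * makespan) if makespan > 0 else 0.0
    return makespan, total_tardiness, utilization


demo_schedule = {
    'J1': [{'machine': 'M1', 'start_time': 0, 'end_time': 45},
           {'machine': 'M2', 'start_time': 45, 'end_time': 75}],
    'J2': [{'machine': 'M2', 'start_time': 0, 'end_time': 35},
           {'machine': 'M1', 'start_time': 50, 'end_time': 80}],
}
print(schedule_metrics(demo_schedule, {'J1': 70, 'J2': 90}, ['M1', 'M2']))  # (80, 5, 0.875)
```

Here J1 finishes at 75 against a due date of 70 (5 minutes late), J2 is on time, and the two machines are busy for 140 of 2 × 80 = 160 available minutes.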

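4. Implementation Roadmap and Best Practices

4.1 Phased Implementation Strategy

Phase 1: Data infrastructure (1-3 months)

  • Deploy sensors and IoT devices
  • Build a data lake / data warehouse
  • Put data governance and quality control in place

Phase 2: Predictive model development (2-4 months)

  • Collect historical data (at least 6 months' worth)
  • Develop and train the prediction models
  • Build a model validation framework

Phase 3: Scheduling optimization system (3-6 months)

  • Integrate MILP and heuristic algorithms
  • Develop the real-time scheduling engine
  • Build a simulation test environment

Phase 4: System integration and go-live (2-3 months)

  • Integrate with ERP/MES systems
  • User training and change management
  • Continuous monitoring and optimization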

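4.2 Key Success Factors

  1. Data quality: ensure the data is accurate and complete
  2. Cross-department collaboration: IT, production, and maintenance teams work closely together
  3. Incremental rollout: start with a pilot area, then expand step by step
  4. Continuous improvement: establish a feedback loop and keep refining the models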
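For the feedback loop in item 4, one simple option is to blend each newly reported actual operation time into the planning estimate with exponential smoothing. This is our choice for illustration; the roadmap does not prescribe it. The sketch assumes a hypothetical helper `update_estimate` with a smoothing factor `alpha = 0.3`. The starting value of 45 minutes matches the standard `cutting` time in the earlier work-order data, while the three actual durations are made up.

```python
def update_estimate(current: float, actual: float, alpha: float = 0.3) -> float:
    """Exponential smoothing: new estimate = alpha * actual + (1 - alpha) * current."""
    if not 0 < alpha <= 1:
        raise ValueError("alpha must be in (0, 1]")
    return alpha * actual + (1 - alpha) * current


# Planned 'cutting' time is 45 min; the shop floor then reports three actual durations
estimate = 45.0
for actual in [50, 52, 48]:
    estimate = update_estimate(estimate, actual)
print(f"updated cutting estimate: {estimate:.1f} min")  # updated cutting estimate: 48.1 min
```

A larger `alpha` tracks recent changes (tool wear, a new operator) faster but also reacts more to one-off outliers. A smaller `alpha` gives a steadier estimate that adapts more slowly.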

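4.3 ROI Evaluation Metrics

  • Production efficiency improvement: typically 15-25%
  • Equipment utilization improvement: typically 10-20%
  • Reduction in production delays: typically 30-50%
  • Inventory turnover improvement: typically 20-30%
  • Energy consumption reduction: typically 5-15%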
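To track these indicators consistently, each one is best computed as a relative change against a measured baseline. The direction has to be flipped for metrics where lower is better, such as delays or energy use. Relative change also differs from a change in percentage points: utilization moving from 70% to 80% is +10 points but about +14.3% relative. The helper `relative_change` and the pilot-line numbers below are hypothetical.

```python
def relative_change(before, after, higher_is_better=True):
    """Relative change of a KPI versus its baseline; positive means an improvement."""
    if before == 0:
        raise ValueError("baseline value must be non-zero")
    change = (after - before) / before
    return change if higher_is_better else -change


# Hypothetical pilot-line numbers, for illustration only
print(f"{relative_change(0.70, 0.80):.1%}")                         # utilization: 14.3%
print(f"{relative_change(40, 24, higher_is_better=False):.1%}")     # late orders per month: 40.0%
print(f"{relative_change(1000, 920, higher_is_better=False):.1%}")  # energy, kWh per shift: 8.0%
```

Stating which form is used (relative % or percentage points) next to each reported figure avoids ambiguity in ranges like "utilization +10-20%".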

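Conclusion

Schedule prediction uses a data-driven approach to move shop-floor scheduling from experience-based judgment to intelligent decision-making. By combining machine-learning prediction, operations-research optimization, and real-time dynamic adjustment, manufacturers can significantly reduce production delays and resource waste. Successful implementation rests on a solid data foundation, a well-chosen combination of algorithms, a phased deployment strategy, and continuous optimization of system performance.

As Industry 4.0 matures, schedule prediction will become deeply integrated with digital twins, edge computing, and 5G, bringing further value to manufacturing. Companies should seize this opportunity, accelerate their digital transformation, and gain an edge in a fiercely competitive market.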