引言:为什么精准掌握节目动态如此重要

在信息爆炸的时代,电视、流媒体平台和网络视频内容层出不穷,用户常常面临”选择困难症”——不知道该看什么,更担心错过真正精彩的节目。传统的节目单查询方式往往只能提供当前或近期的节目信息,无法满足用户对”未来节目动态”的前瞻性需求。精准的排期预测和节目排期表查询系统能够帮助用户:

  1. 提前规划观看时间:避免临时决定导致错过重要节目
  2. 发现潜在兴趣内容:通过预测推荐可能喜欢的节目
  3. 优化娱乐时间管理:合理安排观看计划,提升生活品质
  4. 避免信息过载:过滤无关信息,聚焦真正感兴趣的内容

一、节目排期预测的核心技术原理

1.1 数据收集与预处理

精准的排期预测首先需要全面、准确的数据基础。这包括:

历史节目数据

  • 节目类型(新闻、电视剧、综艺、体育等)
  • 播出时间段(黄金档、深夜档等)
  • 持续时间
  • 收视率/播放量数据
  • 用户评分和评论

用户行为数据

  • 观看历史
  • 偏好设置
  • 互动行为(点赞、收藏、分享)
  • 观看时长和完成度

外部数据

  • 节假日信息
  • 重大事件(体育赛事、颁奖典礼等)
  • 社交媒体热度
  • 行业趋势

1.2 预测模型构建

基于时间序列的预测

对于具有明显时间规律的节目(如每日新闻、周末综艺),可以使用时间序列分析:

import pandas as pd
import numpy as np
from statsmodels.tsa.arima.model import ARIMA
from sklearn.ensemble import RandomForestRegressor
import warnings
warnings.filterwarnings('ignore')

class ProgramSchedulePredictor:
    def __init__(self):
        self.historical_data = None
        self.model = None
        
    def load_data(self, data_path):
        """加载历史节目数据"""
        self.historical_data = pd.read_csv(data_path)
        self.historical_data['date'] = pd.to_datetime(self.historical_data['date'])
        return self.historical_data
    
    def prepare_features(self, data):
        """准备训练特征"""
        # 提取时间特征
        data['day_of_week'] = data['date'].dt.dayofweek
        data['month'] = data['date'].dt.month
        data['is_holiday'] = data['date'].isin(self.get_holidays())
        
        # 节目类型编码
        data = pd.get_dummies(data, columns=['program_type'])
        
        return data
    
    def train_arima_model(self, data, program_name):
        """训练ARIMA模型进行时间序列预测"""
        # 过滤特定节目数据
        program_data = data[data['program_name'] == program_name]
        
        # 按日期排序
        program_data = program_data.sort_values('date')
        
        # 使用收视率作为预测目标
        viewership = program_data['viewership'].values
        
        # 拆分训练测试集
        train_size = int(len(viewership) * 0.8)
        train, test = viewership[:train_size], viewership[train_size:]
        
        # 拟合ARIMA模型
        model = ARIMA(train, order=(2,1,2))
        model_fit = model.fit()
        
        # 预测
        forecast = model_fit.forecast(steps=len(test))
        
        return forecast, test
    
    def train_ml_model(self, data):
        """训练机器学习模型进行多特征预测"""
        features = ['day_of_week', 'month', 'is_holiday', 'program_type_新闻', 
                   'program_type_电视剧', 'program_type_综艺', 'program_type_体育']
        target = 'viewership'
        
        X = data[features]
        y = data[target]
        
        # 训练随机森林模型
        model = RandomForestRegressor(n_estimators=100, random_state=42)
        model.fit(X, y)
        
        return model
    
    def predict_future_schedule(self, model, future_dates, program_types):
        """预测未来节目排期"""
        predictions = []
        
        for date, p_type in zip(future_dates, program_types):
            # 构建特征向量
            features = {
                'day_of_week': date.dayofweek,
                'month': date.month,
                'is_holiday': date in self.get_holidays(),
                f'program_type_{p_type}': 1
            }
            
            # 填充其他节目类型为0
            for pt in ['新闻', '电视剧', '综艺', '体育']:
                if f'program_type_{pt}' not in features:
                    features[f'program_type_{pt}'] = 0
            
            # 预测
            feature_vector = [features[k] for k in ['day_of_week', 'month', 'is_holiday'] + 
                            [f'program_type_{pt}' for pt in ['新闻', '电视剧', '综艺', '体育']]]
            pred = model.predict([feature_vector])[0]
            predictions.append(pred)
        
        return predictions
    
    def get_holidays(self):
        """获取节假日数据(示例)"""
        return pd.to_datetime(['2024-01-01', '2024-02-10', '2024-05-01', 
                              '2024-10-01', '2024-12-25'])

# 使用示例
if __name__ == "__main__":
    # 初始化预测器
    predictor = ProgramSchedulePredictor()
    
    # 加载数据(示例数据)
    data = pd.DataFrame({
        'date': pd.date_range('2023-01-01', '2023-12-31', freq='D'),
        'program_name': ['晚间新闻'] * 365,
        'program_type': ['新闻'] * 365,
        'viewership': np.random.normal(1000, 200, 365) + 
                     np.sin(np.arange(365) * 2 * np.pi / 7) * 100  # 周期性变化
    })
    
    # 训练模型
    data_prepared = predictor.prepare_features(data)
    ml_model = predictor.train_ml_model(data_prepared)
    
    # 预测未来
    future_dates = pd.date_range('2024-01-01', '2024-01-07')
    program_types = ['新闻'] * 7
    predictions = predictor.predict_future_schedule(ml_model, future_dates, program_types)
    
    print("未来7天新闻节目收视率预测:")
    for date, pred in zip(future_dates, predictions):
        print(f"{date.strftime('%Y-%m-%d')}: {pred:.2f} 万观众")

基于协同过滤的推荐预测

对于用户个性化推荐,可以使用协同过滤算法:

import numpy as np
from scipy.sparse import csr_matrix
from sklearn.neighbors import NearestNeighbors

class CollaborativeFilteringRecommender:
    def __init__(self, n_neighbors=20):
        self.n_neighbors = n_neighbors
        self.user_index = None
        self.program_index = None
        self.model = None
        
    def create_user_program_matrix(self, user_program_data):
        """创建用户-节目评分矩阵"""
        # user_program_data: DataFrame with columns ['user_id', 'program_id', 'rating']
        
        # 创建用户和节目的索引映射
        unique_users = user_program_data['user_id'].unique()
        unique_programs = user_program_data['program_id'].unique()
        
        self.user_index = {user: idx for idx, user in enumerate(unique_users)}
        self.program_index = {prog: idx for idx, prog in enumerate(unique_programs)}
        
        # 构建稀疏矩阵
        rows = user_program_data['user_id'].map(self.user_index)
        cols = user_program_data['program_id'].map(self.program_index)
        values = user_program_data['rating']
        
        matrix = csr_matrix((values, (rows, cols)), 
                           shape=(len(unique_users), len(unique_programs)))
        
        return matrix
    
    def fit(self, user_program_data):
        """训练KNN模型"""
        matrix = self.create_user_program_matrix(user_program_data)
        
        # 使用余弦相似度
        self.model = NearestNeighbors(n_neighbors=self.n_neighbors + 1, 
                                     metric='cosine', algorithm='brute')
        self.model.fit(matrix)
        
        return self
    
    def recommend_for_user(self, user_id, top_n=10):
        """为用户推荐节目"""
        if user_id not in self.user_index:
            return []
        
        user_idx = self.user_index[user_id]
        
        # 找到相似用户
        distances, indices = self.model.kneighbors(
            self.model._fit_X[user_idx].reshape(1, -1)
        )
        
        # 排除用户自己
        similar_users = indices[0][1:]
        
        # 获取相似用户的节目偏好
        recommendations = {}
        for sim_user_idx in similar_users:
            # 获取该相似用户评分高的节目
            user_ratings = self.model._fit_X[sim_user_idx].toarray()[0]
            high_rated_indices = np.where(user_ratings > 3.5)[0]
            
            for prog_idx in high_rated_indices:
                if prog_idx not in recommendations:
                    recommendations[prog_idx] = 0
                recommendations[prog_idx] += user_ratings[prog_idx]
        
        # 排序并返回节目ID
        sorted_programs = sorted(recommendations.items(), 
                               key=lambda x: x[1], reverse=True)[:top_n]
        
        # 将索引转换回节目ID
        reverse_program_index = {v: k for k, v in self.program_index.items()}
        return [reverse_program_index[prog_idx] for prog_idx, _ in sorted_programs]

# 使用示例
if __name__ == "__main__":
    # 示例数据:用户评分数据
    user_program_data = pd.DataFrame({
        'user_id': [1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4],
        'program_id': ['新闻A', '综艺B', '电视剧C', '新闻A', '体育D', '综艺B', 
                      '新闻A', '电视剧C', '体育D', '综艺B', '电视剧C', '体育D'],
        'rating': [5, 4, 3, 4, 5, 3, 5, 4, 4, 3, 4, 5]
    })
    
    # 训练推荐器
    recommender = CollaborativeFilteringRecommender(n_neighbors=3)
    recommender.fit(user_program_data)
    
    # 为用户1推荐
    recommendations = recommender.recommend_for_user(user_id=1, top_n=5)
    print(f"用户1的推荐节目:{recommendations}")

1.3 混合预测模型

结合多种预测方法的优势:

class HybridPredictor:
    def __init__(self):
        self.time_series_model = None
        self.ml_model = None
        self.cf_model = None
        
    def ensemble_predict(self, program_id, user_id, future_date):
        """集成多种预测方法"""
        # 1. 时间序列预测(基于历史规律)
        ts_pred = self.time_series_model.predict(program_id, future_date)
        
        # 2. 机器学习预测(基于多特征)
        ml_pred = self.ml_model.predict(program_id, future_date)
        
        # 3. 协同过滤预测(基于用户相似性)
        cf_pred = self.cf_model.predict(program_id, user_id)
        
        # 加权集成
        weights = [0.4, 0.4, 0.2]  # 可根据验证集调整
        final_pred = (ts_pred * weights[0] + 
                     ml_pred * weights[1] + 
                     cf_pred * weights[2])
        
        return {
            'final_score': final_pred,
            'components': {
                'time_series': ts_pred,
                'machine_learning': ml_pred,
                'collaborative_filtering': cf_pred
            }
        }

二、节目排期表查询系统设计

2.1 数据存储架构

数据库设计

-- 节目表
CREATE TABLE programs (
    program_id VARCHAR(50) PRIMARY KEY,
    title VARCHAR(255) NOT NULL,
    type ENUM('新闻', '电视剧', '综艺', '体育', '电影', '纪录片') NOT NULL,
    duration_minutes INT,
    description TEXT,
    cast TEXT,
    director VARCHAR(100),
    release_year INT,
    language VARCHAR(50),
    rating DECIMAL(3,1),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 播出计划表
CREATE TABLE schedule (
    schedule_id INT AUTO_INCREMENT PRIMARY KEY,
    program_id VARCHAR(50),
    channel_id VARCHAR(50),
    start_time DATETIME,
    end_time DATETIME,
    is_live BOOLEAN DEFAULT FALSE,
    episode_number INT,
    season_number INT,
    FOREIGN KEY (program_id) REFERENCES programs(program_id),
    INDEX idx_start_time (start_time),
    INDEX idx_channel_time (channel_id, start_time)
);

-- 用户偏好表
CREATE TABLE user_preferences (
    user_id VARCHAR(50),
    program_type VARCHAR(50),
    preferred_time_start TIME,
    preferred_time_end TIME,
    notify_before_minutes INT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (user_id, program_type)
);

-- 用户观看记录表
CREATE TABLE user_watch_history (
    user_id VARCHAR(50),
    program_id VARCHAR(50),
    watch_time DATETIME,
    completion_rate DECIMAL(5,2),
    rating INT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (user_id, program_id),
    INDEX idx_user_time (user_id, watch_time)
);

-- 预测结果表
CREATE TABLE program_predictions (
    prediction_id INT AUTO_INCREMENT PRIMARY KEY,
    program_id VARCHAR(50),
    prediction_date DATE,
    predicted_viewership INT,
    confidence_score DECIMAL(5,2),
    model_version VARCHAR(50),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_program_date (program_id, prediction_date)
);

2.2 后端API实现

Flask实现的查询接口

from flask import Flask, request, jsonify
from datetime import datetime, timedelta
import mysql.connector
import json
from typing import List, Dict

app = Flask(__name__)

class ScheduleQuerySystem:
    def __init__(self, db_config):
        self.db_config = db_config
    
    def get_connection(self):
        return mysql.connector.connect(**self.db_config)
    
    def query_schedule(self, start_time: datetime, end_time: datetime, 
                      channel_id: str = None, program_type: str = None) -> List[Dict]:
        """查询时间段内的节目排期"""
        conn = self.get_connection()
        cursor = conn.cursor(dictionary=True)
        
        query = """
        SELECT 
            s.schedule_id,
            p.program_id,
            p.title,
            p.type,
            p.description,
            s.start_time,
            s.end_time,
            s.is_live,
            s.episode_number,
            s.season_number,
            c.channel_name
        FROM schedule s
        JOIN programs p ON s.program_id = p.program_id
        JOIN channels c ON s.channel_id = c.channel_id
        WHERE s.start_time >= %s AND s.end_time <= %s
        """
        
        params = [start_time, end_time]
        
        if channel_id:
            query += " AND s.channel_id = %s"
            params.append(channel_id)
        
        if program_type:
            query += " AND p.type = %s"
            params.append(program_type)
        
        query += " ORDER BY s.start_time"
        
        cursor.execute(query, params)
        results = cursor.fetchall()
        
        cursor.close()
        conn.close()
        
        return results
    
    def get_user_recommendations(self, user_id: str, days: int = 7) -> List[Dict]:
        """获取用户个性化推荐"""
        conn = self.get_connection()
        cursor = conn.cursor(dictionary=True)
        
        # 获取用户偏好
        cursor.execute("""
            SELECT program_type, preferred_time_start, preferred_time_end
            FROM user_preferences
            WHERE user_id = %s
        """, (user_id,))
        
        preferences = cursor.fetchall()
        
        # 如果没有偏好,使用默认推荐
        if not preferences:
            preferences = [{'program_type': None, 'preferred_time_start': None, 'preferred_time_end': None}]
        
        recommendations = []
        
        for pref in preferences:
            # 查询未来几天的节目
            start_date = datetime.now()
            end_date = start_date + timedelta(days=days)
            
            query = """
                SELECT 
                    p.program_id,
                    p.title,
                    p.type,
                    s.start_time,
                    s.end_time,
                    COALESCE(pp.predicted_viewership, 0) as predicted_popularity,
                    COALESCE(pp.confidence_score, 0) as confidence
                FROM schedule s
                JOIN programs p ON s.program_id = p.program_id
                LEFT JOIN program_predictions pp ON p.program_id = pp.program_id 
                    AND pp.prediction_date = DATE(s.start_time)
                WHERE s.start_time >= %s AND s.start_time <= %s
            """
            
            params = [start_date, end_date]
            
            if pref['program_type']:
                query += " AND p.type = %s"
                params.append(pref['program_type'])
            
            if pref['preferred_time_start'] and pref['preferred_time_end']:
                query += " AND TIME(s.start_time) BETWEEN %s AND %s"
                params.extend([pref['preferred_time_start'], pref['preferred_time_end']])
            
            # 加入用户历史评分权重
            query += """
                ORDER BY 
                    (predicted_popularity * 0.6 + 
                     CASE WHEN EXISTS (
                        SELECT 1 FROM user_watch_history wh 
                        WHERE wh.user_id = %s AND wh.program_id = p.program_id
                     ) THEN 0.4 ELSE 0 END) DESC
                LIMIT 20
            """
            params.append(user_id)
            
            cursor.execute(query, params)
            results = cursor.fetchall()
            recommendations.extend(results)
        
        cursor.close()
        conn.close()
        
        return recommendations
    
    def get_upcoming_notifications(self, user_id: str, minutes_before: int = 15) -> List[Dict]:
        """获取即将开始的节目提醒"""
        conn = self.get_connection()
        cursor = conn.cursor(dictionary=True)
        
        now = datetime.now()
        future_time = now + timedelta(minutes=minutes_before)
        
        query = """
            SELECT 
                p.program_id,
                p.title,
                p.type,
                s.start_time,
                up.notify_before_minutes
            FROM schedule s
            JOIN programs p ON s.program_id = p.program_id
            JOIN user_preferences up ON p.type = up.program_type
            WHERE s.start_time BETWEEN %s AND %s
            AND up.user_id = %s
            AND s.start_time > %s
            ORDER BY s.start_time
        """
        
        cursor.execute(query, [now, future_time, user_id, now])
        results = cursor.fetchall()
        
        cursor.close()
        conn.close()
        
        return results

# Flask路由
@app.route('/api/schedule', methods=['GET'])
def get_schedule():
    """排期查询接口"""
    try:
        start_str = request.args.get('start_time')
        end_str = request.args.get('end_time')
        channel_id = request.args.get('channel_id')
        program_type = request.args.get('program_type')
        
        start_time = datetime.fromisoformat(start_str)
        end_time = datetime.fromisoformat(end_str)
        
        system = ScheduleQuerySystem({
            'host': 'localhost',
            'user': 'schedule_user',
            'password': 'password',
            'database': 'schedule_db'
        })
        
        results = system.query_schedule(start_time, end_time, channel_id, program_type)
        
        return jsonify({
            'status': 'success',
            'count': len(results),
            'data': results
        })
    
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 400

@app.route('/api/recommendations/<user_id>', methods=['GET'])
def get_recommendations(user_id):
    """推荐接口"""
    try:
        days = int(request.args.get('days', 7))
        
        system = ScheduleQuerySystem({
            'host': 'localhost',
            'user': 'schedule_user',
            'password': 'password',
            'database': 'schedule_db'
        })
        
        results = system.get_user_recommendations(user_id, days)
        
        return jsonify({
            'status': 'success',
            'count': len(results),
            'data': results
        })
    
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 400

@app.route('/api/notifications/<user_id>', methods=['GET'])
def get_notifications(user_id):
    """提醒接口"""
    try:
        minutes_before = int(request.args.get('minutes_before', 15))
        
        system = ScheduleQuerySystem({
            'host': 'localhost',
            'user': 'schedule_user',
            'password': 'password',
            'database': 'schedule_db'
        })
        
        results = system.get_upcoming_notifications(user_id, minutes_before)
        
        return jsonify({
            'status': 'success',
            'count': len(results),
            'data': results
        })
    
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 400

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)

2.3 前端展示实现

React组件实现

import React, { useState, useEffect } from 'react';
import axios from 'axios';
import './ScheduleApp.css';

const ScheduleApp = () => {
    const [schedule, setSchedule] = useState([]);
    const [recommendations, setRecommendations] = useState([]);
    const [notifications, setNotifications] = useState([]);
    const [loading, setLoading] = useState(false);
    const [filters, setFilters] = useState({
        startDate: '',
        endDate: '',
        channel: '',
        type: ''
    });
    const [userId, setUserId] = useState('user123');

    // 获取排期数据
    const fetchSchedule = async () => {
        setLoading(true);
        try {
            const params = new URLSearchParams({
                start_time: filters.startDate,
                end_time: filters.endDate,
                ...(filters.channel && { channel_id: filters.channel }),
                ...(filters.type && { program_type: filters.type })
            });

            const response = await axios.get(`/api/schedule?${params}`);
            setSchedule(response.data.data);
        } catch (error) {
            console.error('Error fetching schedule:', error);
        } finally {
            setLoading(false);
        }
    };

    // 获取推荐
    const fetchRecommendations = async () => {
        try {
            const response = await axios.get(`/api/recommendations/${userId}?days=7`);
            setRecommendations(response.data.data);
        } catch (error) {
            console.error('Error fetching recommendations:', error);
        }
    };

    // 获取提醒
    const fetchNotifications = async () => {
        try {
            const response = await axios.get(`/api/notifications/${userId}?minutes_before=15`);
            setNotifications(response.data.data);
        } catch (error) {
            console.error('Error fetching notifications:', error);
        }
    };

    // 定时刷新提醒
    useEffect(() => {
        fetchNotifications();
        const interval = setInterval(fetchNotifications, 60000); // 每分钟检查一次
        return () => clearInterval(interval);
    }, [userId]);

    // 格式化时间
    const formatTime = (datetimeStr) => {
        const date = new Date(datetimeStr);
        return date.toLocaleString('zh-CN', {
            month: 'short',
            day: 'numeric',
            hour: '2-digit',
            minute: '2-digit'
        });
    };

    // 添加到日历
    const addToCalendar = (program) => {
        const event = {
            title: program.title,
            start: program.start_time,
            end: program.end_time,
            description: program.description
        };

        // 创建ICS文件
        const icsContent = `BEGIN:VCALENDAR
VERSION:2.0
BEGIN:VEVENT
SUMMARY:${event.title}
DTSTART:${new Date(event.start).toISOString().replace(/[-:]/g, '').split('.')[0]}Z
DTEND:${new Date(event.end).toISOString().replace(/[-:]/g, '').split('.')[0]}Z
DESCRIPTION:${event.description}
END:VEVENT
END:VCALENDAR`;

        const blob = new Blob([icsContent], { type: 'text/calendar' });
        const url = URL.createObjectURL(blob);
        const a = document.createElement('a');
        a.href = url;
        a.download = `${event.title}.ics`;
        a.click();
    };

    return (
        <div className="schedule-app">
            <header className="app-header">
                <h1>智能节目排期系统</h1>
                <div className="user-info">
                    <input 
                        type="text" 
                        value={userId} 
                        onChange={(e) => setUserId(e.target.value)}
                        placeholder="用户ID"
                    />
                </div>
            </header>

            {/* 提醒区域 */}
            {notifications.length > 0 && (
                <div className="notifications-section">
                    <h2>即将开始的节目</h2>
                    <div className="notification-list">
                        {notifications.map(notif => (
                            <div key={notif.program_id} className="notification-item">
                                <span className="program-type">{notif.type}</span>
                                <span className="program-title">{notif.title}</span>
                                <span className="start-time">
                                    {formatTime(notif.start_time)} 开始
                                </span>
                                <button onClick={() => addToCalendar(notif)}>
                                    添加到日历
                                </button>
                            </div>
                        ))}
                    </div>
                </div>
            )}

            {/* 筛选器 */}
            <div className="filter-section">
                <h2>查询条件</h2>
                <div className="filter-grid">
                    <input
                        type="datetime-local"
                        value={filters.startDate}
                        onChange={(e) => setFilters({...filters, startDate: e.target.value})}
                        placeholder="开始时间"
                    />
                    <input
                        type="datetime-local"
                        value={filters.endDate}
                        onChange={(e) => setFilters({...filters, endDate: e.target.value})}
                        placeholder="结束时间"
                    />
                    <select
                        value={filters.channel}
                        onChange={(e) => setFilters({...filters, channel: e.target.value})}
                    >
                        <option value="">所有频道</option>
                        <option value="CCTV1">CCTV-1</option>
                        <option value="CCTV2">CCTV-2</option>
                        <option value="HunanTV">湖南卫视</option>
                    </select>
                    <select
                        value={filters.type}
                        onChange={(e) => setFilters({...filters, type: e.target.value})}
                    >
                        <option value="">所有类型</option>
                        <option value="新闻">新闻</option>
                        <option value="电视剧">电视剧</option>
                        <option value="综艺">综艺</option>
                        <option value="体育">体育</option>
                    </select>
                    <button onClick={fetchSchedule} disabled={loading}>
                        {loading ? '查询中...' : '查询排期'}
                    </button>
                </div>
            </div>

            {/* 排期列表 */}
            <div className="schedule-section">
                <h2>节目排期表</h2>
                {schedule.length === 0 ? (
                    <p className="empty-message">暂无数据,请先查询</p>
                ) : (
                    <div className="schedule-grid">
                        {schedule.map(item => (
                            <div key={item.schedule_id} className="schedule-card">
                                <div className="card-header">
                                    <span className="type-badge">{item.type}</span>
                                    <span className="channel">{item.channel_name}</span>
                                </div>
                                <h3>{item.title}</h3>
                                <p className="time">
                                    {formatTime(item.start_time)} - {formatTime(item.end_time)}
                                </p>
                                {item.episode_number && (
                                    <p className="episode">第{item.episode_number}集</p>
                                )}
                                <p className="description">{item.description}</p>
                                <div className="card-actions">
                                    <button onClick={() => addToCalendar(item)}>
                                        添加到日历
                                    </button>
                                </div>
                            </div>
                        ))}
                    </div>
                )}
            </div>

            {/* 推荐区域 */}
            <div className="recommendations-section">
                <h2>为您推荐</h2>
                <button onClick={fetchRecommendations} className="refresh-btn">
                    刷新推荐
                </button>
                <div className="recommendation-list">
                    {recommendations.map(item => (
                        <div key={item.program_id} className="recommendation-item">
                            <div className="rec-info">
                                <span className="type">{item.type}</span>
                                <span className="title">{item.title}</span>
                                <span className="time">{formatTime(item.start_time)}</span>
                            </div>
                            <div className="rec-score">
                                <span>热度: {item.predicted_popularity}</span>
                                <span>置信: {item.confidence}%</span>
                            </div>
                        </div>
                    ))}
                </div>
            </div>
        </div>
    );
};

export default ScheduleApp;

CSS样式

/* ScheduleApp.css */
.schedule-app {
    max-width: 1200px;
    margin: 0 auto;
    padding: 20px;
    font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
}

.app-header {
    background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
    color: white;
    padding: 20px;
    border-radius: 10px;
    margin-bottom: 20px;
    display: flex;
    justify-content: space-between;
    align-items: center;
}

.app-header h1 {
    margin: 0;
    font-size: 24px;
}

.user-info input {
    padding: 8px;
    border: none;
    border-radius: 5px;
    font-size: 14px;
}

.notifications-section {
    background: #fff3cd;
    border: 1px solid #ffeaa7;
    border-radius: 8px;
    padding: 15px;
    margin-bottom: 20px;
}

.notification-list {
    display: flex;
    flex-direction: column;
    gap: 10px;
}

.notification-item {
    display: flex;
    align-items: center;
    gap: 10px;
    padding: 10px;
    background: white;
    border-radius: 5px;
}

.program-type {
    background: #007bff;
    color: white;
    padding: 3px 8px;
    border-radius: 3px;
    font-size: 12px;
}

.program-title {
    font-weight: bold;
    flex: 1;
}

.start-time {
    color: #dc3545;
    font-weight: bold;
}

.filter-section {
    background: #f8f9fa;
    padding: 20px;
    border-radius: 8px;
    margin-bottom: 20px;
}

.filter-grid {
    display: grid;
    grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
    gap: 10px;
    margin-top: 10px;
}

.filter-grid input, .filter-grid select, .filter-grid button {
    padding: 10px;
    border: 1px solid #ddd;
    border-radius: 5px;
    font-size: 14px;
}

.filter-grid button {
    background: #007bff;
    color: white;
    border: none;
    cursor: pointer;
    font-weight: bold;
}

.filter-grid button:hover {
    background: #0056b3;
}

.filter-grid button:disabled {
    background: #6c757d;
    cursor: not-allowed;
}

.schedule-section, .recommendations-section {
    margin-bottom: 30px;
}

.schedule-section h2, .recommendations-section h2 {
    color: #333;
    border-bottom: 2px solid #007bff;
    padding-bottom: 8px;
    margin-bottom: 15px;
}

.empty-message {
    text-align: center;
    color: #6c757d;
    padding: 40px;
    font-style: italic;
}

.schedule-grid {
    display: grid;
    grid-template-columns: repeat(auto-fill, minmax(300px, 1fr));
    gap: 15px;
}

.schedule-card {
    background: white;
    border: 1px solid #e0e0e0;
    border-radius: 8px;
    padding: 15px;
    transition: transform 0.2s, box-shadow 0.2s;
}

.schedule-card:hover {
    transform: translateY(-2px);
    box-shadow: 0 4px 12px rgba(0,0,0,0.1);
}

.card-header {
    display: flex;
    justify-content: space-between;
    margin-bottom: 10px;
}

.type-badge {
    background: #28a745;
    color: white;
    padding: 3px 8px;
    border-radius: 3px;
    font-size: 12px;
}

.channel {
    color: #6c757d;
    font-size: 12px;
}

.schedule-card h3 {
    margin: 5px 0;
    color: #333;
    font-size: 16px;
}

.time {
    color: #007bff;
    font-weight: bold;
    margin: 5px 0;
}

.episode {
    color: #dc3545;
    font-size: 12px;
    margin: 3px 0;
}

.description {
    color: #666;
    font-size: 13px;
    line-height: 1.4;
    margin: 8px 0;
    display: -webkit-box;
    -webkit-line-clamp: 2;
    -webkit-box-orient: vertical;
    overflow: hidden;
}

.card-actions button {
    width: 100%;
    padding: 8px;
    background: #28a745;
    color: white;
    border: none;
    border-radius: 5px;
    cursor: pointer;
    font-weight: bold;
}

.card-actions button:hover {
    background: #218838;
}

.refresh-btn {
    background: #ffc107;
    color: #212529;
    border: none;
    padding: 8px 16px;
    border-radius: 5px;
    cursor: pointer;
    font-weight: bold;
    margin-bottom: 10px;
}

.refresh-btn:hover {
    background: #e0a800;
}

.recommendation-list {
    display: flex;
    flex-direction: column;
    gap: 8px;
}

.recommendation-item {
    display: flex;
    justify-content: space-between;
    align-items: center;
    padding: 12px;
    background: white;
    border-radius: 5px;
    border-left: 4px solid #007bff;
}

.rec-info {
    display: flex;
    gap: 10px;
    align-items: center;
    flex: 1;
}

.rec-info .type {
    background: #6c757d;
    color: white;
    padding: 3px 8px;
    border-radius: 3px;
    font-size: 12px;
}

.rec-info .title {
    font-weight: bold;
    flex: 1;
}

.rec-info .time {
    color: #666;
    font-size: 12px;
}

.rec-score {
    display: flex;
    gap: 15px;
    font-size: 12px;
    color: #666;
}

/* 响应式设计 */
@media (max-width: 768px) {
    .schedule-grid {
        grid-template-columns: 1fr;
    }
    
    .filter-grid {
        grid-template-columns: 1fr;
    }
    
    .app-header {
        flex-direction: column;
        gap: 10px;
        text-align: center;
    }
    
    .notification-item {
        flex-direction: column;
        align-items: flex-start;
        gap: 5px;
    }
}

三、精准掌握节目动态的实用策略

3.1 个性化设置策略

用户偏好配置

class UserPreferenceManager:
    def __init__(self, db_config):
        self.db_config = db_config
    
    def set_preference(self, user_id: str, program_type: str, 
                      preferred_time_start: str, preferred_time_end: str,
                      notify_before_minutes: int = 15):
        """设置用户偏好"""
        conn = mysql.connector.connect(**self.db_config)
        cursor = conn.cursor()
        
        query = """
        INSERT INTO user_preferences 
        (user_id, program_type, preferred_time_start, preferred_time_end, notify_before_minutes)
        VALUES (%s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            preferred_time_start = VALUES(preferred_time_start),
            preferred_time_end = VALUES(preferred_time_end),
            notify_before_minutes = VALUES(notify_before_minutes)
        """
        
        cursor.execute(query, (user_id, program_type, preferred_time_start, 
                              preferred_time_end, notify_before_minutes))
        conn.commit()
        
        cursor.close()
        conn.close()
    
    def get_preference(self, user_id: str):
        """获取用户偏好"""
        conn = mysql.connector.connect(**self.db_config)
        cursor = conn.cursor(dictionary=True)
        
        cursor.execute("""
            SELECT program_type, preferred_time_start, preferred_time_end, notify_before_minutes
            FROM user_preferences
            WHERE user_id = %s
        """, (user_id,))
        
        results = cursor.fetchall()
        
        cursor.close()
        conn.close()
        
        return results
    
    def update_watch_history(self, user_id: str, program_id: str, 
                           completion_rate: float, rating: int = None):
        """更新观看历史"""
        conn = mysql.connector.connect(**self.db_config)
        cursor = conn.cursor()
        
        query = """
        INSERT INTO user_watch_history 
        (user_id, program_id, watch_time, completion_rate, rating)
        VALUES (%s, %s, NOW(), %s, %s)
        ON DUPLICATE KEY UPDATE
            watch_time = NOW(),
            completion_rate = VALUES(completion_rate),
            rating = VALUES(rating)
        """
        
        cursor.execute(query, (user_id, program_id, completion_rate, rating))
        conn.commit()
        
        cursor.close()
        conn.close()

# 使用示例
if __name__ == "__main__":
    db_config = {
        'host': 'localhost',
        'user': 'schedule_user',
        'password': 'password',
        'database': 'schedule_db'
    }
    
    manager = UserPreferenceManager(db_config)
    
    # 设置偏好
    manager.set_preference(
        user_id='user123',
        program_type='新闻',
        preferred_time_start='19:00',
        preferred_time_end='20:00',
        notify_before_minutes=10
    )
    
    # 更新观看历史
    manager.update_watch_history(
        user_id='user123',
        program_id='program_001',
        completion_rate=95.5,
        rating=5
    )

3.2 智能提醒机制

推送通知系统

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import requests
import json

class NotificationSystem:
    def __init__(self, db_config, email_config=None, push_config=None):
        self.db_config = db_config
        self.email_config = email_config
        self.push_config = push_config
    
    def check_upcoming_programs(self):
        """检查即将开始的节目并发送提醒"""
        conn = mysql.connector.connect(**self.db_config)
        cursor = conn.cursor(dictionary=True)
        
        # 查询未来15分钟内开始的节目
        query = """
        SELECT 
            up.user_id,
            p.program_id,
            p.title,
            p.type,
            s.start_time,
            up.notify_before_minutes
        FROM user_preferences up
        JOIN schedule s ON up.program_type = p.type
        JOIN programs p ON s.program_id = p.program_id
        WHERE s.start_time BETWEEN NOW() AND DATE_ADD(NOW(), INTERVAL 15 MINUTE)
        AND s.start_time > NOW()
        AND NOT EXISTS (
            SELECT 1 FROM notification_sent ns
            WHERE ns.user_id = up.user_id 
            AND ns.program_id = p.program_id
            AND ns.sent_at > DATE_SUB(NOW(), INTERVAL 1 HOUR)
        )
        """
        
        cursor.execute(query)
        upcoming = cursor.fetchall()
        
        for program in upcoming:
            self.send_notification(program)
            self.record_notification_sent(program['user_id'], program['program_id'])
        
        cursor.close()
        conn.close()
    
    def send_notification(self, program):
        """发送通知"""
        user_id = program['user_id']
        
        # 发送邮件
        if self.email_config:
            self.send_email_notification(program)
        
        # 发送推送
        if self.push_config:
            self.send_push_notification(program)
        
        # 短信通知(可选)
        self.send_sms_notification(program)
    
    def send_email_notification(self, program):
        """发送邮件提醒"""
        if not self.email_config:
            return
        
        msg = MIMEMultipart()
        msg['From'] = self.email_config['sender']
        msg['To'] = program['user_id'] + '@example.com'  # 实际应从用户表获取
        msg['Subject'] = f"节目提醒:{program['title']} 即将开始"
        
        body = f"""
        您好!
        
        您关注的节目《{program['title']}》即将在 {program['start_time']} 开始。
        
        节目类型:{program['type']}
        提前15分钟提醒
        
        请不要错过精彩内容!
        """
        
        msg.attach(MIMEText(body, 'plain', 'utf-8'))
        
        try:
            server = smtplib.SMTP(self.email_config['host'], self.email_config['port'])
            server.starttls()
            server.login(self.email_config['sender'], self.email_config['password'])
            server.send_message(msg)
            server.quit()
            print(f"邮件已发送: {program['title']}")
        except Exception as e:
            print(f"邮件发送失败: {e}")
    
    def send_push_notification(self, program):
        """发送推送通知"""
        if not self.push_config:
            return
        
        # 推送服务API调用示例(如极光推送、个推等)
        payload = {
            "platform": ["ios", "android"],
            "audience": {
                "alias": [program['user_id']]
            },
            "notification": {
                "android": {
                    "alert": f"节目提醒:{program['title']} 即将开始",
                    "title": "节目提醒",
                    "builder_id": 1
                },
                "ios": {
                    "alert": f"节目提醒:{program['title']} 即将开始",
                    "sound": "default",
                    "badge": 1
                }
            },
            "options": {
                "time_to_live": 3600,
                "apns_production": False
            }
        }
        
        try:
            response = requests.post(
                f"{self.push_config['api_url']}/push",
                headers={"Authorization": f"Basic {self.push_config['api_key']}"},
                json=payload
            )
            print(f"推送结果: {response.status_code}")
        except Exception as e:
            print(f"推送失败: {e}")
    
    def send_sms_notification(self, program):
        """发送短信提醒(示例)"""
        # 实际使用时需要接入短信服务提供商
        print(f"短信提醒: {program['title']} 将在15分钟后开始")
    
    def record_notification_sent(self, user_id: str, program_id: str):
        """记录已发送的通知"""
        conn = mysql.connector.connect(**self.db_config)
        cursor = conn.cursor()
        
        cursor.execute("""
        INSERT INTO notification_sent (user_id, program_id, sent_at)
        VALUES (%s, %s, NOW())
        """, (user_id, program_id))
        
        conn.commit()
        cursor.close()
        conn.close()

# 定时任务示例(使用APScheduler)
from apscheduler.schedulers.background import BackgroundScheduler

def start_notification_service(db_config):
    """启动通知服务"""
    notification_system = NotificationSystem(db_config)
    
    scheduler = BackgroundScheduler()
    # 每分钟检查一次
    scheduler.add_job(
        notification_system.check_upcoming_programs,
        'interval',
        minutes=1
    )
    
    scheduler.start()
    print("通知服务已启动")
    return scheduler

# 使用示例
if __name__ == "__main__":
    db_config = {
        'host': 'localhost',
        'user': 'schedule_user',
        'password': 'password',
        'database': 'schedule_db'
    }
    
    email_config = {
        'host': 'smtp.gmail.com',
        'port': 587,
        'sender': 'notifications@example.com',
        'password': 'app_password'
    }
    
    # 启动服务
    scheduler = start_notification_service(db_config)
    
    # 保持程序运行
    try:
        while True:
            import time
            time.sleep(1)
    except KeyboardInterrupt:
        scheduler.shutdown()

3.3 数据可视化展示

使用Chart.js展示收视趋势

<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>节目排期分析仪表板</title>
    <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
    <style>
        body {
            font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
            margin: 0;
            padding: 20px;
            background: #f5f5f5;
        }
        .dashboard {
            max-width: 1400px;
            margin: 0 auto;
        }
        .chart-container {
            background: white;
            padding: 20px;
            margin-bottom: 20px;
            border-radius: 8px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }
        .chart-grid {
            display: grid;
            grid-template-columns: 1fr 1fr;
            gap: 20px;
        }
        h1, h2 {
            color: #333;
        }
        .stats-grid {
            display: grid;
            grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
            gap: 15px;
            margin-bottom: 20px;
        }
        .stat-card {
            background: white;
            padding: 20px;
            border-radius: 8px;
            text-align: center;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }
        .stat-value {
            font-size: 32px;
            font-weight: bold;
            color: #667eea;
        }
        .stat-label {
            color: #666;
            margin-top: 5px;
        }
    </style>
</head>
<body>
    <div class="dashboard">
        <h1>节目排期分析仪表板</h1>
        
        <!-- 统计卡片 -->
        <div class="stats-grid" id="statsGrid"></div>

        <!-- 图表区域 -->
        <div class="chart-grid">
            <div class="chart-container">
                <h2>收视率趋势预测</h2>
                <canvas id="viewershipChart"></canvas>
            </div>
            <div class="chart-container">
                <h2>节目类型分布</h2>
                <canvas id="typeDistributionChart"></canvas>
            </div>
        </div>

        <div class="chart-grid">
            <div class="chart-container">
                <h2>黄金时段热度</h2>
                <canvas id="primeTimeChart"></canvas>
            </div>
            <div class="chart-container">
                <h2>用户偏好分析</h2>
                <canvas id="userPreferenceChart"></canvas>
            </div>
        </div>
    </div>

    <script>
        // 模拟数据
        const mockData = {
            stats: {
                totalPrograms: 156,
                avgViewership: 850,
                upcomingToday: 12,
                userRecommendations: 8
            },
            viewershipTrend: {
                labels: ['周一', '周二', '周三', '周四', '周五', '周六', '周日'],
                datasets: [{
                    label: '预测收视率',
                    data: [780, 820, 790, 850, 920, 1100, 1050],
                    borderColor: '#667eea',
                    backgroundColor: 'rgba(102, 126, 234, 0.1)',
                    tension: 0.4
                }, {
                    label: '实际收视率',
                    data: [765, 835, 780, 865, 910, 1120, 1035],
                    borderColor: '#764ba2',
                    backgroundColor: 'rgba(118, 75, 162, 0.1)',
                    tension: 0.4
                }]
            },
            typeDistribution: {
                labels: ['新闻', '电视剧', '综艺', '体育', '电影', '纪录片'],
                data: [25, 35, 20, 10, 7, 3]
            },
            primeTime: {
                labels: ['18:00', '19:00', '20:00', '21:00', '22:00'],
                data: [650, 920, 1100, 980, 720]
            },
            userPreference: {
                labels: ['新闻', '电视剧', '综艺', '体育'],
                data: [40, 30, 20, 10]
            }
        };

        // 初始化统计卡片
        function initStats() {
            const statsGrid = document.getElementById('statsGrid');
            const stats = mockData.stats;
            
            const cards = [
                { label: '今日节目总数', value: stats.totalPrograms },
                { label: '平均收视率', value: stats.avgViewership + '万' },
                { label: '即将开始', value: stats.upcomingToday },
                { label: '为您推荐', value: stats.userRecommendations }
            ];

            statsGrid.innerHTML = cards.map(card => `
                <div class="stat-card">
                    <div class="stat-value">${card.value}</div>
                    <div class="stat-label">${card.label}</div>
                </div>
            `).join('');
        }

        // 初始化收视率趋势图
        function initViewershipChart() {
            const ctx = document.getElementById('viewershipChart').getContext('2d');
            new Chart(ctx, {
                type: 'line',
                data: mockData.viewershipTrend,
                options: {
                    responsive: true,
                    plugins: {
                        legend: {
                            position: 'top',
                        },
                        tooltip: {
                            mode: 'index',
                            intersect: false,
                        }
                    },
                    scales: {
                        y: {
                            beginAtZero: true,
                            title: {
                                display: true,
                                text: '收视率(万)'
                            }
                        }
                    }
                }
            });
        }

        // 初始化类型分布图
        function initTypeDistributionChart() {
            const ctx = document.getElementById('typeDistributionChart').getContext('2d');
            new Chart(ctx, {
                type: 'doughnut',
                data: {
                    labels: mockData.typeDistribution.labels,
                    datasets: [{
                        data: mockData.typeDistribution.data,
                        backgroundColor: [
                            '#FF6384', '#36A2EB', '#FFCE56', '#4BC0C0', '#9966FF', '#FF9F40'
                        ]
                    }]
                },
                options: {
                    responsive: true,
                    plugins: {
                        legend: {
                            position: 'right',
                        }
                    }
                }
            });
        }

        // 初始化黄金时段图
        function initPrimeTimeChart() {
            const ctx = document.getElementById('primeTimeChart').getContext('2d');
            new Chart(ctx, {
                type: 'bar',
                data: {
                    labels: mockData.primeTime.labels,
                    datasets: [{
                        label: '平均热度',
                        data: mockData.primeTime.data,
                        backgroundColor: 'rgba(75, 192, 192, 0.6)',
                        borderColor: 'rgba(75, 192, 192, 1)',
                        borderWidth: 1
                    }]
                },
                options: {
                    responsive: true,
                    scales: {
                        y: {
                            beginAtZero: true,
                            title: {
                                display: true,
                                text: '热度值'
                            }
                        }
                    }
                }
            });
        }

        // 初始化用户偏好图
        function initUserPreferenceChart() {
            const ctx = document.getElementById('userPreferenceChart').getContext('2d');
            new Chart(ctx, {
                type: 'polarArea',
                data: {
                    labels: mockData.userPreference.labels,
                    datasets: [{
                        data: mockData.userPreference.data,
                        backgroundColor: [
                            'rgba(255, 99, 132, 0.6)',
                            'rgba(54, 162, 235, 0.6)',
                            'rgba(255, 206, 86, 0.6)',
                            'rgba(75, 192, 192, 0.6)'
                        ]
                    }]
                },
                options: {
                    responsive: true,
                    plugins: {
                        legend: {
                            position: 'right',
                        }
                    }
                }
            });
        }

        // 页面加载完成后初始化
        document.addEventListener('DOMContentLoaded', function() {
            initStats();
            initViewershipChart();
            initTypeDistributionChart();
            initPrimeTimeChart();
            initUserPreferenceChart();
        });
    </script>
</body>
</html>

四、高级功能与优化策略

4.1 实时数据更新机制

import asyncio
import websockets
import json
from datetime import datetime

class RealTimeScheduleUpdater:
    def __init__(self, db_config):
        self.db_config = db_config
        self.subscribers = set()
    
    async def broadcast_updates(self, update_data):
        """广播更新给所有订阅者"""
        if not self.subscribers:
            return
        
        message = json.dumps({
            'type': 'schedule_update',
            'timestamp': datetime.now().isoformat(),
            'data': update_data
        })
        
        await asyncio.gather(
            *[subscriber.send(message) for subscriber in self.subscribers]
        )
    
    async def handle_websocket(self, websocket, path):
        """处理WebSocket连接"""
        self.subscribers.add(websocket)
        try:
            async for message in websocket:
                data = json.loads(message)
                if data['action'] == 'subscribe':
                    # 用户订阅特定频道或节目类型
                    await self.subscribe_to_updates(websocket, data)
        finally:
            self.subscribers.remove(websocket)
    
    async def subscribe_to_updates(self, websocket, data):
        """处理订阅请求"""
        # 存储订阅信息
        subscription = {
            'websocket': websocket,
            'channel_id': data.get('channel_id'),
            'program_type': data.get('program_type'),
            'user_id': data.get('user_id')
        }
        # 可以存储到Redis或内存中
        print(f"用户 {data.get('user_id')} 订阅了更新")
    
    def monitor_schedule_changes(self):
        """监控排期变化"""
        conn = mysql.connector.connect(**self.db_config)
        cursor = conn.cursor(dictionary=True)
        
        # 检查最近5分钟内的变更
        cursor.execute("""
            SELECT schedule_id, program_id, start_time, end_time, change_type
            FROM schedule_audit_log
            WHERE changed_at > DATE_SUB(NOW(), INTERVAL 5 MINUTE)
            AND notified = FALSE
        """)
        
        changes = cursor.fetchall()
        
        if changes:
            # 广播更新
            asyncio.create_task(self.broadcast_updates(changes))
            
            # 标记为已通知
            change_ids = [c['schedule_id'] for c in changes]
            cursor.execute("""
                UPDATE schedule_audit_log 
                SET notified = TRUE 
                WHERE schedule_id IN (%s)
            """, (','.join(map(str, change_ids)),))
            conn.commit()
        
        cursor.close()
        conn.close()

# WebSocket服务器启动
async def start_websocket_server(host='localhost', port=8765):
    updater = RealTimeScheduleUpdater({
        'host': 'localhost',
        'user': 'schedule_user',
        'password': 'password',
        'database': 'schedule_db'
    })
    
    server = await websockets.serve(
        updater.handle_websocket, host, port
    )
    
    print(f"WebSocket服务器运行在 ws://{host}:{port}")
    
    # 启动监控任务
    asyncio.create_task(monitor_schedule_changes(updater))
    
    return server

async def monitor_schedule_changes(updater):
    """定时监控排期变化"""
    while True:
        updater.monitor_schedule_changes()
        await asyncio.sleep(60)  # 每分钟检查一次

# 客户端连接示例
async def websocket_client():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        # 发送订阅请求
        await websocket.send(json.dumps({
            'action': 'subscribe',
            'user_id': 'user123',
            'channel_id': 'CCTV1',
            'program_type': '新闻'
        }))
        
        # 接收更新
        async for message in websocket:
            data = json.loads(message)
            print(f"收到更新: {data}")
            # 在这里处理实时更新,如刷新UI、发送通知等

4.2 机器学习模型优化

from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import mean_absolute_error, mean_squared_error
import joblib

class ModelOptimizer:
    def __init__(self):
        self.best_model = None
        self.best_params = None
    
    def optimize_random_forest(self, X, y):
        """优化随机森林参数"""
        param_grid = {
            'n_estimators': [50, 100, 200],
            'max_depth': [None, 10, 20, 30],
            'min_samples_split': [2, 5, 10],
            'min_samples_leaf': [1, 2, 4]
        }
        
        rf = RandomForestRegressor(random_state=42)
        grid_search = GridSearchCV(
            rf, param_grid, 
            cv=5, 
            scoring='neg_mean_absolute_error',
            n_jobs=-1
        )
        
        grid_search.fit(X, y)
        
        self.best_model = grid_search.best_estimator_
        self.best_params = grid_search.best_params_
        
        return self.best_model, self.best_params
    
    def evaluate_model(self, model, X_test, y_test):
        """评估模型性能"""
        predictions = model.predict(X_test)
        
        mae = mean_absolute_error(y_test, predictions)
        rmse = np.sqrt(mean_squared_error(y_test, predictions))
        
        # 计算准确率(在阈值内)
        threshold = 0.1 * np.mean(y_test)
        accuracy = np.mean(np.abs(predictions - y_test) < threshold)
        
        return {
            'mae': mae,
            'rmse': rmse,
            'accuracy': accuracy
        }
    
    def save_model(self, model, filename):
        """保存模型"""
        joblib.dump(model, filename)
        print(f"模型已保存到 {filename}")
    
    def load_model(self, filename):
        """加载模型"""
        self.best_model = joblib.load(filename)
        return self.best_model

# 使用示例
if __name__ == "__main__":
    # 准备数据
    from sklearn.datasets import make_regression
    X, y = make_regression(n_samples=1000, n_features=10, noise=0.1, random_state=42)
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    optimizer = ModelOptimizer()
    
    # 优化模型
    print("开始优化模型...")
    model, params = optimizer.optimize_random_forest(X_train, y_train)
    print(f"最佳参数: {params}")
    
    # 评估模型
    metrics = optimizer.evaluate_model(model, X_test, y_test)
    print(f"评估结果: {metrics}")
    
    # 保存模型
    optimizer.save_model(model, 'schedule_predictor.pkl')
    
    # 加载模型
    loaded_model = optimizer.load_model('schedule_predictor.pkl')

4.3 A/B测试框架

import hashlib
from datetime import datetime

class ABTestFramework:
    def __init__(self, db_config):
        self.db_config = db_config
    
    def assign_user_to_variant(self, user_id: str, test_name: str):
        """将用户分配到测试组"""
        # 使用用户ID哈希确定分组(确保一致性)
        hash_value = int(hashlib.md5(f"{user_id}:{test_name}".encode()).hexdigest(), 16)
        
        # 假设50/50分组
        variant = 'A' if hash_value % 2 == 0 else 'B'
        
        # 记录分配
        conn = mysql.connector.connect(**self.db_config)
        cursor = conn.cursor()
        
        cursor.execute("""
        INSERT INTO ab_test_assignments (user_id, test_name, variant, assigned_at)
        VALUES (%s, %s, %s, NOW())
        ON DUPLICATE KEY UPDATE variant = VALUES(variant)
        """, (user_id, test_name, variant))
        
        conn.commit()
        cursor.close()
        conn.close()
        
        return variant
    
    def get_recommendation_strategy(self, user_id: str):
        """根据测试组选择推荐策略"""
        variant = self.assign_user_to_variant(user_id, 'recommendation_strategy')
        
        if variant == 'A':
            # 策略A:基于热门度
            return self.popularity_based_recommendation
        else:
            # 策略B:基于协同过滤
            return self.collaborative_filtering_recommendation
    
    def popularity_based_recommendation(self, user_id: str, limit: int = 10):
        """基于热门度的推荐"""
        conn = mysql.connector.connect(**self.db_config)
        cursor = conn.cursor(dictionary=True)
        
        cursor.execute("""
            SELECT p.program_id, p.title, p.type, 
                   COALESCE(pp.predicted_viewership, 0) as score
            FROM programs p
            LEFT JOIN program_predictions pp ON p.program_id = pp.program_id
            WHERE pp.prediction_date = CURDATE()
            ORDER BY score DESC
            LIMIT %s
        """, (limit,))
        
        results = cursor.fetchall()
        cursor.close()
        conn.close()
        
        return results
    
    def collaborative_filtering_recommendation(self, user_id: str, limit: int = 10):
        """基于协同过滤的推荐"""
        # 使用之前实现的协同过滤算法
        # 这里简化为返回随机结果
        conn = mysql.connector.connect(**self.db_config)
        cursor = conn.cursor(dictionary=True)
        
        cursor.execute("""
            SELECT p.program_id, p.title, p.type, RAND() as score
            FROM programs p
            ORDER BY score DESC
            LIMIT %s
        """, (limit,))
        
        results = cursor.fetchall()
        cursor.close()
        conn.close()
        
        return results
    
    def record_conversion(self, user_id: str, test_name: str, 
                         variant: str, action: str):
        """记录用户转化行为"""
        conn = mysql.connector.connect(**self.db_config)
        cursor = conn.cursor()
        
        cursor.execute("""
        INSERT INTO ab_test_conversions 
        (user_id, test_name, variant, action, created_at)
        VALUES (%s, %s, %s, %s, NOW())
        """, (user_id, test_name, variant, action))
        
        conn.commit()
        cursor.close()
        conn.close()
    
    def analyze_results(self, test_name: str):
        """分析测试结果"""
        conn = mysql.connector.connect(**self.db_config)
        cursor = conn.cursor(dictionary=True)
        
        # 计算转化率
        cursor.execute("""
        SELECT 
            variant,
            COUNT(DISTINCT user_id) as total_users,
            COUNT(DISTINCT CASE WHEN action = 'watch' THEN user_id END) as watchers,
            COUNT(DISTINCT CASE WHEN action = 'favorite' THEN user_id END) as favorites
        FROM ab_test_assignments a
        LEFT JOIN ab_test_conversions c ON a.user_id = c.user_id 
            AND a.test_name = c.test_name
        WHERE a.test_name = %s
        GROUP BY variant
        """, (test_name,))
        
        results = cursor.fetchall()
        
        # 计算转化率
        for result in results:
            total = result['total_users']
            result['watch_rate'] = (result['watchers'] / total * 100) if total > 0 else 0
            result['favorite_rate'] = (result['favorites'] / total * 100) if total > 0 else 0
        
        cursor.close()
        conn.close()
        
        return results

# 使用示例
if __name__ == "__main__":
    db_config = {
        'host': 'localhost',
        'user': 'schedule_user',
        'password': 'password',
        'database': 'schedule_db'
    }
    
    ab_test = ABTestFramework(db_config)
    
    # 为用户分配测试组
    variant = ab_test.assign_user_to_variant('user123', 'recommendation_strategy')
    print(f"用户分配到测试组: {variant}")
    
    # 获取推荐
    strategy = ab_test.get_recommendation_strategy('user123')
    recommendations = strategy('user123', limit=5)
    print(f"推荐结果: {recommendations}")
    
    # 记录转化
    ab_test.record_conversion('user123', 'recommendation_strategy', variant, 'watch')
    
    # 分析结果
    results = ab_test.analyze_results('recommendation_strategy')
    print(f"测试结果: {results}")

五、最佳实践与注意事项

5.1 数据质量保障

class DataQualityValidator:
    """数据质量验证器"""
    
    def __init__(self, db_config):
        self.db_config = db_config
    
    def validate_program_data(self, program_data: dict) -> bool:
        """验证节目数据完整性"""
        required_fields = ['program_id', 'title', 'type', 'duration_minutes']
        
        for field in required_fields:
            if field not in program_data or not program_data[field]:
                print(f"错误: 缺少必填字段 {field}")
                return False
        
        # 验证节目类型
        valid_types = ['新闻', '电视剧', '综艺', '体育', '电影', '纪录片']
        if program_data['type'] not in valid_types:
            print(f"错误: 无效的节目类型 {program_data['type']}")
            return False
        
        # 验证时长
        if program_data['duration_minutes'] <= 0:
            print("错误: 节目时长必须大于0")
            return False
        
        return True
    
    def validate_schedule_data(self, schedule_data: dict) -> bool:
        """验证排期数据"""
        # 验证时间逻辑
        start_time = schedule_data['start_time']
        end_time = schedule_data['end_time']
        
        if end_time <= start_time:
            print("错误: 结束时间必须晚于开始时间")
            return False
        
        # 验证时间冲突
        conn = mysql.connector.connect(**self.db_config)
        cursor = conn.cursor()
        
        cursor.execute("""
            SELECT COUNT(*) FROM schedule
            WHERE channel_id = %s
            AND (
                (start_time < %s AND end_time > %s)
                OR (start_time < %s AND end_time > %s)
                OR (start_time >= %s AND end_time <= %s)
            )
        """, (
            schedule_data['channel_id'],
            end_time, start_time,
            start_time, end_time,
            start_time, end_time
        ))
        
        conflict_count = cursor.fetchone()[0]
        cursor.close()
        conn.close()
        
        if conflict_count > 0:
            print("错误: 存在时间冲突")
            return False
        
        return True
    
    def clean_duplicate_programs(self):
        """清理重复节目"""
        conn = mysql.connector.connect(**self.db_config)
        cursor = conn.cursor()
        
        # 查找重复节目(根据标题和类型)
        cursor.execute("""
            SELECT title, type, COUNT(*) as cnt
            FROM programs
            GROUP BY title, type
            HAVING cnt > 1
        """)
        
        duplicates = cursor.fetchall()
        
        for title, p_type, count in duplicates:
            print(f"发现重复: {title} ({p_type}) - {count}个")
            
            # 保留最新的,删除旧的
            cursor.execute("""
                DELETE p1 FROM programs p1
                INNER JOIN programs p2 
                WHERE p1.title = p2.title 
                AND p1.type = p2.type
                AND p1.program_id < p2.program_id
            """)
        
        conn.commit()
        cursor.close()
        conn.close()
    
    def validate_prediction_accuracy(self, days: int = 30):
        """验证预测准确性"""
        conn = mysql.connector.connect(**self.db_config)
        cursor = conn.cursor(dictionary=True)
        
        cursor.execute("""
            SELECT 
                p.program_id,
                p.title,
                pp.predicted_viewership,
                wh.actual_viewership,
                ABS(pp.predicted_viewership - wh.actual_viewership) / wh.actual_viewership * 100 as error_rate
            FROM program_predictions pp
            JOIN programs p ON pp.program_id = p.program_id
            JOIN (
                SELECT program_id, COUNT(*) as actual_viewership
                FROM user_watch_history
                WHERE watch_time >= DATE_SUB(NOW(), INTERVAL %s DAY)
                GROUP BY program_id
            ) wh ON pp.program_id = wh.program_id
            WHERE pp.prediction_date >= DATE_SUB(NOW(), INTERVAL %s DAY)
            AND pp.prediction_date < CURDATE()
        """, (days, days))
        
        results = cursor.fetchall()
        
        if results:
            avg_error = sum(r['error_rate'] for r in results) / len(results)
            print(f"过去{days}天预测平均误差: {avg_error:.2f}%")
            
            # 如果误差超过20%,发出警告
            if avg_error > 20:
                print("警告: 预测误差较大,建议重新训练模型")
        
        cursor.close()
        conn.close()
        
        return results

# 使用示例
if __name__ == "__main__":
    validator = DataQualityValidator({
        'host': 'localhost',
        'user': 'schedule_user',
        'password': 'password',
        'database': 'schedule_db'
    })
    
    # 验证节目数据
    program_data = {
        'program_id': 'PROG001',
        'title': '晚间新闻',
        'type': '新闻',
        'duration_minutes': 30
    }
    
    if validator.validate_program_data(program_data):
        print("节目数据验证通过")
    
    # 验证排期数据
    schedule_data = {
        'channel_id': 'CCTV1',
        'start_time': datetime(2024, 1, 1, 19, 0),
        'end_time': datetime(2024, 1, 1, 19, 30)
    }
    
    if validator.validate_schedule_data(schedule_data):
        print("排期数据验证通过")
    
    # 清理重复数据
    validator.clean_duplicate_programs()
    
    # 验证预测准确性
    validator.validate_prediction_accuracy()

5.2 性能优化建议

# 缓存实现
from functools import lru_cache
import redis

class CacheManager:
    def __init__(self, redis_host='localhost', redis_port=6379):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
    
    @lru_cache(maxsize=128)
    def get_cached_schedule(self, channel_id: str, date_str: str):
        """缓存排期查询结果"""
        cache_key = f"schedule:{channel_id}:{date_str}"
        
        # 先从Redis获取
        cached = self.redis_client.get(cache_key)
        if cached:
            return json.loads(cached)
        
        # 从数据库查询
        result = self.query_database(channel_id, date_str)
        
        # 缓存1小时
        self.redis_client.setex(cache_key, 3600, json.dumps(result))
        
        return result
    
    def query_database(self, channel_id: str, date_str: str):
        """数据库查询(简化)"""
        # 实际实现中这里会连接数据库
        return {"channel": channel_id, "date": date_str, "programs": []}
    
    def invalidate_cache(self, channel_id: str, date_str: str):
        """使缓存失效"""
        cache_key = f"schedule:{channel_id}:{date_str}"
        self.redis_client.delete(cache_key)

# 数据库索引优化建议
"""
-- 为常用查询创建复合索引
CREATE INDEX idx_schedule_channel_time ON schedule(channel_id, start_time);
CREATE INDEX idx_programs_type ON programs(type);
CREATE INDEX idx_user_preferences_user ON user_preferences(user_id, program_type);
CREATE INDEX idx_watch_history_user_time ON user_watch_history(user_id, watch_time);

-- 分区表(适用于大数据量)
ALTER TABLE schedule PARTITION BY RANGE (YEAR(start_time)) (
    PARTITION p2023 VALUES LESS THAN (2024),
    PARTITION p2024 VALUES LESS THAN (2025),
    PARTITION pfuture VALUES LESS THAN MAXVALUE
);
"""

六、总结

通过本文的详细介绍,我们构建了一个完整的节目排期预测与查询系统。核心要点包括:

  1. 技术架构:采用前后端分离,使用机器学习进行预测,WebSocket实现实时更新
  2. 数据驱动:基于历史数据、用户行为和外部因素进行精准预测
  3. 个性化服务:通过用户偏好设置和协同过滤算法提供定制化推荐
  4. 智能提醒:多渠道推送确保用户不错过精彩内容
  5. 持续优化:通过A/B测试和模型评估不断改进系统

这套系统能够帮助用户精准掌握未来节目动态,有效避免错过精彩内容,同时提供个性化的观看体验。通过合理的数据架构和算法选择,系统可以在保证准确性的同时,提供流畅的用户体验。