引言:为什么精准掌握节目动态如此重要
在信息爆炸的时代,电视、流媒体平台和网络视频内容层出不穷,用户常常面临“选择困难症”——不知道该看什么,更担心错过真正精彩的节目。传统的节目单查询方式往往只能提供当前或近期的节目信息,无法满足用户对“未来节目动态”的前瞻性需求。精准的排期预测和节目排期表查询系统能够帮助用户:
- 提前规划观看时间:避免临时决定导致错过重要节目
- 发现潜在兴趣内容:通过预测推荐可能喜欢的节目
- 优化娱乐时间管理:合理安排观看计划,提升生活品质
- 避免信息过载:过滤无关信息,聚焦真正感兴趣的内容
一、节目排期预测的核心技术原理
1.1 数据收集与预处理
精准的排期预测首先需要全面、准确的数据基础。这包括:
历史节目数据:
- 节目类型(新闻、电视剧、综艺、体育等)
- 播出时间段(黄金档、深夜档等)
- 持续时间
- 收视率/播放量数据
- 用户评分和评论
用户行为数据:
- 观看历史
- 偏好设置
- 互动行为(点赞、收藏、分享)
- 观看时长和完成度
外部数据:
- 节假日信息
- 重大事件(体育赛事、颁奖典礼等)
- 社交媒体热度
- 行业趋势
1.2 预测模型构建
基于时间序列的预测
对于具有明显时间规律的节目(如每日新闻、周末综艺),可以使用时间序列分析:
import pandas as pd
import numpy as np
from statsmodels.tsa.arima.model import ARIMA
from sklearn.ensemble import RandomForestRegressor
import warnings
warnings.filterwarnings('ignore')
class ProgramSchedulePredictor:
    """Forecast programme viewership from historical schedule data.

    Two approaches are offered: a per-programme ARIMA time-series model and a
    random-forest regressor over calendar and programme-type features.
    """

    # Programme categories that get a one-hot feature column. Training and
    # prediction must agree on this list *and* on its order.
    PROGRAM_TYPES = ('新闻', '电视剧', '综艺', '体育')
    # Calendar features that precede the one-hot columns in a feature vector.
    CALENDAR_FEATURES = ('day_of_week', 'month', 'is_holiday')

    def __init__(self):
        self.historical_data = None
        self.model = None

    def _feature_columns(self):
        """Return the ordered feature column names shared by fit and predict."""
        one_hot = [f'program_type_{pt}' for pt in self.PROGRAM_TYPES]
        return list(self.CALENDAR_FEATURES) + one_hot

    def load_data(self, data_path):
        """Load historical programme data from a CSV file.

        The file must contain a ``date`` column, which is parsed to datetimes.
        The frame is stored on ``self.historical_data`` and returned.
        """
        self.historical_data = pd.read_csv(data_path)
        self.historical_data['date'] = pd.to_datetime(self.historical_data['date'])
        return self.historical_data

    def prepare_features(self, data):
        """Return a copy of *data* with the model's feature columns added.

        Adds ``day_of_week``, ``month`` and ``is_holiday`` and one-hot encodes
        ``program_type``. The caller's frame is left untouched.
        """
        # Work on a copy so the caller's DataFrame does not silently grow columns.
        data = data.copy()
        data['day_of_week'] = data['date'].dt.dayofweek
        data['month'] = data['date'].dt.month
        data['is_holiday'] = data['date'].isin(self.get_holidays())
        data = pd.get_dummies(data, columns=['program_type'])
        # get_dummies only emits columns for categories present in the data;
        # add the missing ones so the feature layout is always complete.
        for pt in self.PROGRAM_TYPES:
            column = f'program_type_{pt}'
            if column not in data.columns:
                data[column] = 0
        return data

    def train_arima_model(self, data, program_name):
        """Fit ARIMA(2,1,2) on one programme's viewership and forecast the hold-out.

        The series is sorted by date and split 80/20. Returns ``(forecast,
        test)`` where ``test`` holds the actual values of the last 20%.

        Raises:
            ValueError: if *data* has no rows for *program_name*.
        """
        program_data = data[data['program_name'] == program_name]
        if program_data.empty:
            raise ValueError(f"no historical data for program {program_name!r}")
        program_data = program_data.sort_values('date')
        viewership = program_data['viewership'].values
        train_size = int(len(viewership) * 0.8)
        train, test = viewership[:train_size], viewership[train_size:]
        model_fit = ARIMA(train, order=(2, 1, 2)).fit()
        forecast = model_fit.forecast(steps=len(test))
        return forecast, test

    def train_ml_model(self, data):
        """Train a random-forest regressor on the prepared feature columns.

        *data* is expected to come from :meth:`prepare_features`. One-hot
        columns that are absent are treated as all-zero instead of raising
        ``KeyError``.
        """
        # reindex fixes the column order and fills any missing column with 0.
        X = data.reindex(columns=self._feature_columns(), fill_value=0)
        y = data['viewership']
        model = RandomForestRegressor(n_estimators=100, random_state=42)
        model.fit(X, y)
        return model

    def predict_future_schedule(self, model, future_dates, program_types):
        """Predict viewership for each ``(date, programme type)`` pair.

        Args:
            model: fitted estimator exposing ``predict(rows)``.
            future_dates: iterable of pandas Timestamps.
            program_types: programme type for each date, in the same order.

        Returns:
            A list with one prediction per pair; empty if there are no pairs.
        """
        holidays = self.get_holidays()  # loop-invariant, build once
        rows = []
        for date, p_type in zip(future_dates, program_types):
            row = [date.dayofweek, date.month, date in holidays]
            # A type outside PROGRAM_TYPES yields an all-zero one-hot part.
            row.extend(1 if pt == p_type else 0 for pt in self.PROGRAM_TYPES)
            rows.append(row)
        if not rows:
            return []
        return list(model.predict(rows))

    def get_holidays(self):
        """Return the holiday dates used for ``is_holiday`` (sample data)."""
        return pd.to_datetime(['2024-01-01', '2024-02-10', '2024-05-01',
                               '2024-10-01', '2024-12-25'])
# 使用示例
if __name__ == "__main__":
    # Demo on synthetic data: one year of a daily news programme whose
    # viewership is Gaussian noise plus a 7-day sinusoidal cycle.
    predictor = ProgramSchedulePredictor()

    data = pd.DataFrame({
        'date': pd.date_range('2023-01-01', '2023-12-31', freq='D'),
        'program_name': ['晚间新闻'] * 365,
        'program_type': ['新闻'] * 365,
        'viewership': (
            np.random.normal(1000, 200, 365)
            + np.sin(np.arange(365) * 2 * np.pi / 7) * 100
        ),
    })

    # Build features, then fit the random-forest model on them.
    data_prepared = predictor.prepare_features(data)
    ml_model = predictor.train_ml_model(data_prepared)

    # Forecast the first week of 2024 for the news category.
    future_dates = pd.date_range('2024-01-01', '2024-01-07')
    program_types = ['新闻'] * 7
    predictions = predictor.predict_future_schedule(
        ml_model, future_dates, program_types
    )

    print("未来7天新闻节目收视率预测:")
    for date, pred in zip(future_dates, predictions):
        print(f"{date.strftime('%Y-%m-%d')}: {pred:.2f} 万观众")
基于协同过滤的推荐预测
对于用户个性化推荐,可以使用协同过滤算法:
import numpy as np
from scipy.sparse import csr_matrix
from sklearn.neighbors import NearestNeighbors
class CollaborativeFilteringRecommender:
    """User-based collaborative filtering over a user x programme rating matrix.

    Similar users are found with cosine-distance nearest neighbours; programmes
    they rated above 3.5 are scored by the sum of those ratings.
    """

    def __init__(self, n_neighbors=20):
        self.n_neighbors = n_neighbors
        self.user_index = None      # user_id -> matrix row
        self.program_index = None   # program_id -> matrix column
        self.model = None
        self.matrix = None          # rating matrix the model was fitted on

    def create_user_program_matrix(self, user_program_data):
        """Build the sparse rating matrix and the id -> index mappings.

        Args:
            user_program_data: DataFrame with columns ``user_id``,
                ``program_id`` and ``rating``.

        Returns:
            A ``csr_matrix`` of shape ``(n_users, n_programs)``.
        """
        unique_users = user_program_data['user_id'].unique()
        unique_programs = user_program_data['program_id'].unique()
        self.user_index = {user: idx for idx, user in enumerate(unique_users)}
        self.program_index = {prog: idx for idx, prog in enumerate(unique_programs)}

        rows = user_program_data['user_id'].map(self.user_index)
        cols = user_program_data['program_id'].map(self.program_index)
        values = user_program_data['rating']
        return csr_matrix((values, (rows, cols)),
                          shape=(len(unique_users), len(unique_programs)))

    def fit(self, user_program_data):
        """Fit the nearest-neighbour model on the rating data and return self."""
        matrix = self.create_user_program_matrix(user_program_data)
        # One extra neighbour because the query user is part of the fitted
        # data, but never more than the number of users available; otherwise
        # kneighbors() rejects the request.
        k = max(1, min(self.n_neighbors + 1, matrix.shape[0]))
        self.model = NearestNeighbors(n_neighbors=k,
                                      metric='cosine', algorithm='brute')
        self.model.fit(matrix)
        # Keep our own reference instead of reading sklearn's private _fit_X.
        self.matrix = matrix
        return self

    def recommend_for_user(self, user_id, top_n=10, exclude_rated=False):
        """Return up to *top_n* programme ids recommended for *user_id*.

        Args:
            user_id: id as it appeared in the training data.
            top_n: maximum number of programme ids to return.
            exclude_rated: when True, drop programmes the user already rated.
                Defaults to False, which keeps the original behaviour.

        Returns:
            Programme ids ordered by descending score; ``[]`` for an unknown user.

        Raises:
            RuntimeError: if called before :meth:`fit`.
        """
        if self.model is None or self.user_index is None or self.matrix is None:
            raise RuntimeError("fit() must be called before recommend_for_user()")
        if user_id not in self.user_index:
            return []
        user_idx = self.user_index[user_id]
        user_row = self.matrix[user_idx]

        _, indices = self.model.kneighbors(user_row)
        # Drop the user explicitly: with identical rating vectors the user is
        # not guaranteed to be the first neighbour returned.
        similar_users = [int(idx) for idx in indices[0] if idx != user_idx]

        already_rated = set(user_row.nonzero()[1].tolist()) if exclude_rated else set()

        scores = {}
        for sim_user_idx in similar_users:
            user_ratings = self.matrix[sim_user_idx].toarray()[0]
            # Only ratings above 3.5 count as a positive signal.
            for prog_idx in np.where(user_ratings > 3.5)[0]:
                prog_idx = int(prog_idx)
                if prog_idx in already_rated:
                    continue
                scores[prog_idx] = scores.get(prog_idx, 0) + user_ratings[prog_idx]

        ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)[:top_n]
        reverse_program_index = {v: k for k, v in self.program_index.items()}
        return [reverse_program_index[prog_idx] for prog_idx, _ in ranked]
# 使用示例
if __name__ == "__main__":
    # Sample ratings as (user_id, program_id, rating) rows.
    rating_rows = [
        (1, '新闻A', 5), (1, '综艺B', 4), (1, '电视剧C', 3),
        (2, '新闻A', 4), (2, '体育D', 5), (2, '综艺B', 3),
        (3, '新闻A', 5), (3, '电视剧C', 4), (3, '体育D', 4),
        (4, '综艺B', 3), (4, '电视剧C', 4), (4, '体育D', 5),
    ]
    user_program_data = pd.DataFrame(
        rating_rows, columns=['user_id', 'program_id', 'rating']
    )

    # Fit the recommender on the sample ratings.
    recommender = CollaborativeFilteringRecommender(n_neighbors=3)
    recommender.fit(user_program_data)

    # Ask for up to five recommendations for user 1.
    recommendations = recommender.recommend_for_user(user_id=1, top_n=5)
    print(f"用户1的推荐节目:{recommendations}")
1.3 混合预测模型
结合多种预测方法的优势:
class HybridPredictor:
    """Blend time-series, feature-based and collaborative-filtering scores.

    Each sub-model must expose ``predict``: the time-series and ML models are
    called as ``predict(program_id, future_date)``, the collaborative-filtering
    model as ``predict(program_id, user_id)``.
    """

    # Weights for (time series, machine learning, collaborative filtering);
    # tune against a validation set.
    DEFAULT_WEIGHTS = (0.4, 0.4, 0.2)

    def __init__(self, time_series_model=None, ml_model=None, cf_model=None,
                 weights=None):
        """Store the sub-models and blend weights.

        All arguments are optional so ``HybridPredictor()`` keeps working; the
        models can also be assigned to the attributes afterwards.

        Raises:
            ValueError: if *weights* does not contain exactly three values.
        """
        self.time_series_model = time_series_model
        self.ml_model = ml_model
        self.cf_model = cf_model
        self.weights = self.DEFAULT_WEIGHTS if weights is None else tuple(weights)
        if len(self.weights) != 3:
            raise ValueError("weights must contain exactly three values")

    def ensemble_predict(self, program_id, user_id, future_date):
        """Return the weighted blend of the three sub-model predictions.

        Returns:
            ``{'final_score': ..., 'components': {...}}`` where ``components``
            holds the individual sub-model predictions.

        Raises:
            RuntimeError: if any sub-model has not been configured.
        """
        models = {
            'time_series_model': self.time_series_model,
            'ml_model': self.ml_model,
            'cf_model': self.cf_model,
        }
        missing = [name for name, model in models.items() if model is None]
        if missing:
            raise RuntimeError(
                "sub-models not configured: " + ", ".join(missing)
            )

        # 1. Time-series prediction (historical regularity)
        ts_pred = self.time_series_model.predict(program_id, future_date)
        # 2. Machine-learning prediction (multi-feature)
        ml_pred = self.ml_model.predict(program_id, future_date)
        # 3. Collaborative-filtering prediction (user similarity)
        cf_pred = self.cf_model.predict(program_id, user_id)

        w_ts, w_ml, w_cf = self.weights
        final_pred = ts_pred * w_ts + ml_pred * w_ml + cf_pred * w_cf
        return {
            'final_score': final_pred,
            'components': {
                'time_series': ts_pred,
                'machine_learning': ml_pred,
                'collaborative_filtering': cf_pred,
            },
        }
二、节目排期表查询系统设计
2.1 数据存储架构
数据库设计
-- Programs table: one row per programme (catalogue metadata, not air times).
CREATE TABLE programs (
-- String identifier; schedule.program_id references it via a foreign key.
program_id VARCHAR(50) PRIMARY KEY,
title VARCHAR(255) NOT NULL,
-- Closed set of six categories; the values are the Chinese labels themselves.
type ENUM('新闻', '电视剧', '综艺', '体育', '电影', '纪录片') NOT NULL,
duration_minutes INT,
description TEXT,
cast TEXT,
director VARCHAR(100),
release_year INT,
language VARCHAR(50),
-- One decimal place, at most two integer digits.
rating DECIMAL(3,1),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Broadcast schedule table: one row per airing of a programme on a channel.
CREATE TABLE schedule (
schedule_id INT AUTO_INCREMENT PRIMARY KEY,
program_id VARCHAR(50),
-- NOTE(review): no foreign key is declared for channel_id, and no channels
-- table is created in this DDL although query_schedule joins one.
channel_id VARCHAR(50),
start_time DATETIME,
end_time DATETIME,
is_live BOOLEAN DEFAULT FALSE,
episode_number INT,
season_number INT,
FOREIGN KEY (program_id) REFERENCES programs(program_id),
-- Supports range scans on start_time alone.
INDEX idx_start_time (start_time),
-- Supports per-channel range scans on start_time.
INDEX idx_channel_time (channel_id, start_time)
);
-- User preferences table: at most one row per (user, programme type).
CREATE TABLE user_preferences (
user_id VARCHAR(50),
-- Plain VARCHAR, not the ENUM used by programs.type; the query code joins
-- the two columns by value.
program_type VARCHAR(50),
preferred_time_start TIME,
preferred_time_end TIME,
-- No default is declared, so this is NULL unless the writer supplies a value.
notify_before_minutes INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (user_id, program_type)
);
-- User watch history table: the composite primary key allows only one row
-- per (user, programme); update_watch_history overwrites it with
-- ON DUPLICATE KEY UPDATE.
CREATE TABLE user_watch_history (
user_id VARCHAR(50),
program_id VARCHAR(50),
watch_time DATETIME,
-- Two decimal places; the sample code stores 95.5 here.
-- NOTE(review): presumably a percentage in 0-100 - confirm.
completion_rate DECIMAL(5,2),
rating INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (user_id, program_id),
INDEX idx_user_time (user_id, watch_time)
);
-- Prediction results table: stored model output per programme and date.
CREATE TABLE program_predictions (
prediction_id INT AUTO_INCREMENT PRIMARY KEY,
-- NOTE(review): no foreign key to programs is declared here.
program_id VARCHAR(50),
prediction_date DATE,
predicted_viewership INT,
confidence_score DECIMAL(5,2),
model_version VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-- Not UNIQUE: several rows may exist for the same programme and date.
INDEX idx_program_date (program_id, prediction_date)
);
2.2 后端API实现
Flask实现的查询接口
from flask import Flask, request, jsonify
from datetime import datetime, timedelta
import mysql.connector
import json
from typing import List, Dict
# Module-level Flask application; the @app.route handlers in this file are
# registered on it.
app = Flask(__name__)
class ScheduleQuerySystem:
    """Read-only query layer over the schedule database.

    Every public method opens its own connection and always closes the cursor
    and the connection, even when a query raises.
    """

    def __init__(self, db_config):
        # Keyword arguments forwarded verbatim to mysql.connector.connect().
        self.db_config = db_config

    def get_connection(self):
        """Open and return a new database connection."""
        return mysql.connector.connect(**self.db_config)

    def query_schedule(self, start_time: datetime, end_time: datetime,
                       channel_id: str = None, program_type: str = None) -> List[Dict]:
        """Return the airings that lie entirely inside ``[start_time, end_time]``.

        Args:
            start_time: lower bound for ``schedule.start_time``.
            end_time: upper bound for ``schedule.end_time``.
            channel_id: optional channel filter.
            program_type: optional programme-type filter.

        Returns:
            One dict per airing, ordered by start time.
        """
        # NOTE(review): joins a `channels` table that the DDL in this file
        # does not create - confirm it exists in the target database.
        query = """
            SELECT
                s.schedule_id,
                p.program_id,
                p.title,
                p.type,
                p.description,
                s.start_time,
                s.end_time,
                s.is_live,
                s.episode_number,
                s.season_number,
                c.channel_name
            FROM schedule s
            JOIN programs p ON s.program_id = p.program_id
            JOIN channels c ON s.channel_id = c.channel_id
            WHERE s.start_time >= %s AND s.end_time <= %s
        """
        params = [start_time, end_time]
        if channel_id:
            query += " AND s.channel_id = %s"
            params.append(channel_id)
        if program_type:
            query += " AND p.type = %s"
            params.append(program_type)
        query += " ORDER BY s.start_time"

        conn = self.get_connection()
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(query, params)
                return cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()

    def get_user_recommendations(self, user_id: str, days: int = 7) -> List[Dict]:
        """Return upcoming airings ranked for *user_id* over the next *days* days.

        One query (at most 20 rows) runs per stored preference; a user without
        preferences gets a single unfiltered query.
        """
        conn = self.get_connection()
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute("""
                    SELECT program_type, preferred_time_start, preferred_time_end
                    FROM user_preferences
                    WHERE user_id = %s
                """, (user_id,))
                preferences = cursor.fetchall()
                if not preferences:
                    # No stored preferences: fall back to one unfiltered query.
                    preferences = [{'program_type': None,
                                    'preferred_time_start': None,
                                    'preferred_time_end': None}]

                # Compute the window once so every preference sees the same one.
                start_date = datetime.now()
                end_date = start_date + timedelta(days=days)

                recommendations = []
                for pref in preferences:
                    query = """
                        SELECT
                            p.program_id,
                            p.title,
                            p.type,
                            s.start_time,
                            s.end_time,
                            COALESCE(pp.predicted_viewership, 0) as predicted_popularity,
                            COALESCE(pp.confidence_score, 0) as confidence
                        FROM schedule s
                        JOIN programs p ON s.program_id = p.program_id
                        LEFT JOIN program_predictions pp ON p.program_id = pp.program_id
                            AND pp.prediction_date = DATE(s.start_time)
                        WHERE s.start_time >= %s AND s.start_time <= %s
                    """
                    params = [start_date, end_date]
                    if pref['program_type']:
                        query += " AND p.type = %s"
                        params.append(pref['program_type'])
                    if pref['preferred_time_start'] and pref['preferred_time_end']:
                        query += " AND TIME(s.start_time) BETWEEN %s AND %s"
                        params.extend([pref['preferred_time_start'],
                                       pref['preferred_time_end']])
                    # Rank by predicted popularity plus a bonus for programmes
                    # the user has watched before.
                    # NOTE(review): the 0.4 bonus is tiny next to raw viewership
                    # counts - confirm the intended scale.
                    query += """
                        ORDER BY
                            (predicted_popularity * 0.6 +
                             CASE WHEN EXISTS (
                                 SELECT 1 FROM user_watch_history wh
                                 WHERE wh.user_id = %s AND wh.program_id = p.program_id
                             ) THEN 0.4 ELSE 0 END) DESC
                        LIMIT 20
                    """
                    params.append(user_id)
                    cursor.execute(query, params)
                    recommendations.extend(cursor.fetchall())
                return recommendations
            finally:
                cursor.close()
        finally:
            conn.close()

    def get_upcoming_notifications(self, user_id: str, minutes_before: int = 15) -> List[Dict]:
        """Return airings of the user's preferred types starting soon.

        "Soon" means strictly after now and within *minutes_before* minutes.
        Rows include ``end_time`` so clients can build calendar entries.
        """
        now = datetime.now()
        future_time = now + timedelta(minutes=minutes_before)
        query = """
            SELECT
                p.program_id,
                p.title,
                p.type,
                s.start_time,
                s.end_time,
                up.notify_before_minutes
            FROM schedule s
            JOIN programs p ON s.program_id = p.program_id
            JOIN user_preferences up ON p.type = up.program_type
            WHERE s.start_time BETWEEN %s AND %s
              AND up.user_id = %s
              AND s.start_time > %s
            ORDER BY s.start_time
        """
        conn = self.get_connection()
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(query, [now, future_time, user_id, now])
                return cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()
# Flask路由
@app.route('/api/schedule', methods=['GET'])
def get_schedule():
    """Schedule query endpoint.

    Query parameters: ``start_time`` and ``end_time`` (ISO 8601, required),
    ``channel_id`` and ``program_type`` (optional).

    Responds 400 for missing or malformed parameters and 500 when the query
    itself fails, so clients can tell their own mistakes from server faults.
    """
    start_str = request.args.get('start_time')
    end_str = request.args.get('end_time')
    channel_id = request.args.get('channel_id')
    program_type = request.args.get('program_type')

    if not start_str or not end_str:
        return jsonify({'status': 'error',
                        'message': 'start_time and end_time are required'}), 400
    try:
        start_time = datetime.fromisoformat(start_str)
        end_time = datetime.fromisoformat(end_str)
    except ValueError as e:
        return jsonify({'status': 'error', 'message': str(e)}), 400

    try:
        # NOTE(review): credentials are hard-coded and repeated in every
        # route; load them from configuration instead.
        system = ScheduleQuerySystem({
            'host': 'localhost',
            'user': 'schedule_user',
            'password': 'password',
            'database': 'schedule_db'
        })
        results = system.query_schedule(start_time, end_time, channel_id, program_type)
    except Exception:
        # Log the details server-side; do not leak driver/SQL text to clients.
        app.logger.exception("schedule query failed")
        return jsonify({'status': 'error', 'message': 'internal server error'}), 500

    return jsonify({
        'status': 'success',
        'count': len(results),
        'data': results
    })
@app.route('/api/recommendations/<user_id>', methods=['GET'])
def get_recommendations(user_id):
    """Recommendation endpoint.

    Query parameter ``days`` (integer, default 7) sets the look-ahead window.
    Responds 400 when ``days`` is not an integer and 500 when the query fails.
    """
    try:
        days = int(request.args.get('days', 7))
    except ValueError:
        return jsonify({'status': 'error',
                        'message': 'days must be an integer'}), 400

    try:
        # NOTE(review): credentials are hard-coded and repeated in every
        # route; load them from configuration instead.
        system = ScheduleQuerySystem({
            'host': 'localhost',
            'user': 'schedule_user',
            'password': 'password',
            'database': 'schedule_db'
        })
        results = system.get_user_recommendations(user_id, days)
    except Exception:
        # Log the details server-side; do not leak driver/SQL text to clients.
        app.logger.exception("recommendation query failed")
        return jsonify({'status': 'error', 'message': 'internal server error'}), 500

    return jsonify({
        'status': 'success',
        'count': len(results),
        'data': results
    })
@app.route('/api/notifications/<user_id>', methods=['GET'])
def get_notifications(user_id):
    """Reminder endpoint.

    Query parameter ``minutes_before`` (integer, default 15) sets how far
    ahead to look. Responds 400 when it is not an integer and 500 when the
    query fails.
    """
    try:
        minutes_before = int(request.args.get('minutes_before', 15))
    except ValueError:
        return jsonify({'status': 'error',
                        'message': 'minutes_before must be an integer'}), 400

    try:
        # NOTE(review): credentials are hard-coded and repeated in every
        # route; load them from configuration instead.
        system = ScheduleQuerySystem({
            'host': 'localhost',
            'user': 'schedule_user',
            'password': 'password',
            'database': 'schedule_db'
        })
        results = system.get_upcoming_notifications(user_id, minutes_before)
    except Exception:
        # Log the details server-side; do not leak driver/SQL text to clients.
        app.logger.exception("notification query failed")
        return jsonify({'status': 'error', 'message': 'internal server error'}), 500

    return jsonify({
        'status': 'success',
        'count': len(results),
        'data': results
    })
if __name__ == '__main__':
    import os

    # Debug mode enables the interactive Werkzeug debugger, which lets anyone
    # who can reach the port execute code. Keep it off unless FLASK_DEBUG is
    # set, and bind to loopback only while it is on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    host = '127.0.0.1' if debug_enabled else '0.0.0.0'
    app.run(debug=debug_enabled, host=host, port=5000)
2.3 前端展示实现
React组件实现
import React, { useState, useEffect } from 'react';
import axios from 'axios';
import './ScheduleApp.css';
const ScheduleApp = () => {
const [schedule, setSchedule] = useState([]);
const [recommendations, setRecommendations] = useState([]);
const [notifications, setNotifications] = useState([]);
const [loading, setLoading] = useState(false);
const [filters, setFilters] = useState({
startDate: '',
endDate: '',
channel: '',
type: ''
});
const [userId, setUserId] = useState('user123');
// 获取排期数据
const fetchSchedule = async () => {
setLoading(true);
try {
const params = new URLSearchParams({
start_time: filters.startDate,
end_time: filters.endDate,
...(filters.channel && { channel_id: filters.channel }),
...(filters.type && { program_type: filters.type })
});
const response = await axios.get(`/api/schedule?${params}`);
setSchedule(response.data.data);
} catch (error) {
console.error('Error fetching schedule:', error);
} finally {
setLoading(false);
}
};
// 获取推荐
const fetchRecommendations = async () => {
try {
const response = await axios.get(`/api/recommendations/${userId}?days=7`);
setRecommendations(response.data.data);
} catch (error) {
console.error('Error fetching recommendations:', error);
}
};
// 获取提醒
const fetchNotifications = async () => {
try {
const response = await axios.get(`/api/notifications/${userId}?minutes_before=15`);
setNotifications(response.data.data);
} catch (error) {
console.error('Error fetching notifications:', error);
}
};
// 定时刷新提醒
useEffect(() => {
fetchNotifications();
const interval = setInterval(fetchNotifications, 60000); // 每分钟检查一次
return () => clearInterval(interval);
}, [userId]);
// 格式化时间
const formatTime = (datetimeStr) => {
const date = new Date(datetimeStr);
return date.toLocaleString('zh-CN', {
month: 'short',
day: 'numeric',
hour: '2-digit',
minute: '2-digit'
});
};
// 添加到日历
const addToCalendar = (program) => {
const event = {
title: program.title,
start: program.start_time,
end: program.end_time,
description: program.description
};
// 创建ICS文件
const icsContent = `BEGIN:VCALENDAR
VERSION:2.0
BEGIN:VEVENT
SUMMARY:${event.title}
DTSTART:${new Date(event.start).toISOString().replace(/[-:]/g, '').split('.')[0]}Z
DTEND:${new Date(event.end).toISOString().replace(/[-:]/g, '').split('.')[0]}Z
DESCRIPTION:${event.description}
END:VEVENT
END:VCALENDAR`;
const blob = new Blob([icsContent], { type: 'text/calendar' });
const url = URL.createObjectURL(blob);
const a = document.createElement('a');
a.href = url;
a.download = `${event.title}.ics`;
a.click();
};
return (
<div className="schedule-app">
<header className="app-header">
<h1>智能节目排期系统</h1>
<div className="user-info">
<input
type="text"
value={userId}
onChange={(e) => setUserId(e.target.value)}
placeholder="用户ID"
/>
</div>
</header>
{/* 提醒区域 */}
{notifications.length > 0 && (
<div className="notifications-section">
<h2>即将开始的节目</h2>
<div className="notification-list">
{notifications.map(notif => (
<div key={notif.program_id} className="notification-item">
<span className="program-type">{notif.type}</span>
<span className="program-title">{notif.title}</span>
<span className="start-time">
{formatTime(notif.start_time)} 开始
</span>
<button onClick={() => addToCalendar(notif)}>
添加到日历
</button>
</div>
))}
</div>
</div>
)}
{/* 筛选器 */}
<div className="filter-section">
<h2>查询条件</h2>
<div className="filter-grid">
<input
type="datetime-local"
value={filters.startDate}
onChange={(e) => setFilters({...filters, startDate: e.target.value})}
placeholder="开始时间"
/>
<input
type="datetime-local"
value={filters.endDate}
onChange={(e) => setFilters({...filters, endDate: e.target.value})}
placeholder="结束时间"
/>
<select
value={filters.channel}
onChange={(e) => setFilters({...filters, channel: e.target.value})}
>
<option value="">所有频道</option>
<option value="CCTV1">CCTV-1</option>
<option value="CCTV2">CCTV-2</option>
<option value="HunanTV">湖南卫视</option>
</select>
<select
value={filters.type}
onChange={(e) => setFilters({...filters, type: e.target.value})}
>
<option value="">所有类型</option>
<option value="新闻">新闻</option>
<option value="电视剧">电视剧</option>
<option value="综艺">综艺</option>
<option value="体育">体育</option>
</select>
<button onClick={fetchSchedule} disabled={loading}>
{loading ? '查询中...' : '查询排期'}
</button>
</div>
</div>
{/* 排期列表 */}
<div className="schedule-section">
<h2>节目排期表</h2>
{schedule.length === 0 ? (
<p className="empty-message">暂无数据,请先查询</p>
) : (
<div className="schedule-grid">
{schedule.map(item => (
<div key={item.schedule_id} className="schedule-card">
<div className="card-header">
<span className="type-badge">{item.type}</span>
<span className="channel">{item.channel_name}</span>
</div>
<h3>{item.title}</h3>
<p className="time">
{formatTime(item.start_time)} - {formatTime(item.end_time)}
</p>
{item.episode_number && (
<p className="episode">第{item.episode_number}集</p>
)}
<p className="description">{item.description}</p>
<div className="card-actions">
<button onClick={() => addToCalendar(item)}>
添加到日历
</button>
</div>
</div>
))}
</div>
)}
</div>
{/* 推荐区域 */}
<div className="recommendations-section">
<h2>为您推荐</h2>
<button onClick={fetchRecommendations} className="refresh-btn">
刷新推荐
</button>
<div className="recommendation-list">
{recommendations.map(item => (
<div key={item.program_id} className="recommendation-item">
<div className="rec-info">
<span className="type">{item.type}</span>
<span className="title">{item.title}</span>
<span className="time">{formatTime(item.start_time)}</span>
</div>
<div className="rec-score">
<span>热度: {item.predicted_popularity}</span>
<span>置信: {item.confidence}%</span>
</div>
</div>
))}
</div>
</div>
</div>
);
};
export default ScheduleApp;
CSS样式
/* ScheduleApp.css */
/* Styles for the ScheduleApp React component; class names match its className attributes. */

/* ---- Page container ---- */
.schedule-app {
max-width: 1200px;
margin: 0 auto;
padding: 20px;
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
}

/* ---- Header bar: title on the left, user-id input on the right ---- */
.app-header {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 20px;
border-radius: 10px;
margin-bottom: 20px;
display: flex;
justify-content: space-between;
align-items: center;
}
.app-header h1 {
margin: 0;
font-size: 24px;
}
.user-info input {
padding: 8px;
border: none;
border-radius: 5px;
font-size: 14px;
}

/* ---- Reminder panel (rendered only when there are notifications) ---- */
.notifications-section {
background: #fff3cd;
border: 1px solid #ffeaa7;
border-radius: 8px;
padding: 15px;
margin-bottom: 20px;
}
.notification-list {
display: flex;
flex-direction: column;
gap: 10px;
}
.notification-item {
display: flex;
align-items: center;
gap: 10px;
padding: 10px;
background: white;
border-radius: 5px;
}
.program-type {
background: #007bff;
color: white;
padding: 3px 8px;
border-radius: 3px;
font-size: 12px;
}
/* flex: 1 lets the title absorb the free space in the row. */
.program-title {
font-weight: bold;
flex: 1;
}
.start-time {
color: #dc3545;
font-weight: bold;
}

/* ---- Filter form ---- */
.filter-section {
background: #f8f9fa;
padding: 20px;
border-radius: 8px;
margin-bottom: 20px;
}
/* As many columns as fit, each at least 200px wide. */
.filter-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 10px;
margin-top: 10px;
}
.filter-grid input, .filter-grid select, .filter-grid button {
padding: 10px;
border: 1px solid #ddd;
border-radius: 5px;
font-size: 14px;
}
.filter-grid button {
background: #007bff;
color: white;
border: none;
cursor: pointer;
font-weight: bold;
}
.filter-grid button:hover {
background: #0056b3;
}
.filter-grid button:disabled {
background: #6c757d;
cursor: not-allowed;
}

/* ---- Shared section chrome for the schedule and recommendation areas ---- */
.schedule-section, .recommendations-section {
margin-bottom: 30px;
}
.schedule-section h2, .recommendations-section h2 {
color: #333;
border-bottom: 2px solid #007bff;
padding-bottom: 8px;
margin-bottom: 15px;
}
.empty-message {
text-align: center;
color: #6c757d;
padding: 40px;
font-style: italic;
}

/* ---- Schedule cards ---- */
.schedule-grid {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(300px, 1fr));
gap: 15px;
}
.schedule-card {
background: white;
border: 1px solid #e0e0e0;
border-radius: 8px;
padding: 15px;
transition: transform 0.2s, box-shadow 0.2s;
}
/* Lift the card slightly on hover. */
.schedule-card:hover {
transform: translateY(-2px);
box-shadow: 0 4px 12px rgba(0,0,0,0.1);
}
.card-header {
display: flex;
justify-content: space-between;
margin-bottom: 10px;
}
.type-badge {
background: #28a745;
color: white;
padding: 3px 8px;
border-radius: 3px;
font-size: 12px;
}
.channel {
color: #6c757d;
font-size: 12px;
}
.schedule-card h3 {
margin: 5px 0;
color: #333;
font-size: 16px;
}
.time {
color: #007bff;
font-weight: bold;
margin: 5px 0;
}
.episode {
color: #dc3545;
font-size: 12px;
margin: 3px 0;
}
/* Clamp the description to two lines (WebKit line-clamp technique). */
.description {
color: #666;
font-size: 13px;
line-height: 1.4;
margin: 8px 0;
display: -webkit-box;
-webkit-line-clamp: 2;
-webkit-box-orient: vertical;
overflow: hidden;
}
.card-actions button {
width: 100%;
padding: 8px;
background: #28a745;
color: white;
border: none;
border-radius: 5px;
cursor: pointer;
font-weight: bold;
}
.card-actions button:hover {
background: #218838;
}

/* ---- Recommendations ---- */
.refresh-btn {
background: #ffc107;
color: #212529;
border: none;
padding: 8px 16px;
border-radius: 5px;
cursor: pointer;
font-weight: bold;
margin-bottom: 10px;
}
.refresh-btn:hover {
background: #e0a800;
}
.recommendation-list {
display: flex;
flex-direction: column;
gap: 8px;
}
.recommendation-item {
display: flex;
justify-content: space-between;
align-items: center;
padding: 12px;
background: white;
border-radius: 5px;
border-left: 4px solid #007bff;
}
.rec-info {
display: flex;
gap: 10px;
align-items: center;
flex: 1;
}
/* Scoped under .rec-info: these override the generic .time rule above for
   recommendation rows only. */
.rec-info .type {
background: #6c757d;
color: white;
padding: 3px 8px;
border-radius: 3px;
font-size: 12px;
}
.rec-info .title {
font-weight: bold;
flex: 1;
}
.rec-info .time {
color: #666;
font-size: 12px;
}
.rec-score {
display: flex;
gap: 15px;
font-size: 12px;
color: #666;
}

/* Responsive layout: single-column grids and stacked header/reminders at 768px and below. */
@media (max-width: 768px) {
.schedule-grid {
grid-template-columns: 1fr;
}
.filter-grid {
grid-template-columns: 1fr;
}
.app-header {
flex-direction: column;
gap: 10px;
text-align: center;
}
.notification-item {
flex-direction: column;
align-items: flex-start;
gap: 5px;
}
}
三、精准掌握节目动态的实用策略
3.1 个性化设置策略
用户偏好配置
class UserPreferenceManager:
    """Persist user preferences and watch history in the schedule database.

    Every operation opens its own connection and always closes the cursor and
    the connection, even when the statement fails.
    """

    def __init__(self, db_config):
        # Keyword arguments forwarded verbatim to mysql.connector.connect().
        self.db_config = db_config

    def _connect(self):
        """Open and return a new database connection."""
        return mysql.connector.connect(**self.db_config)

    def _execute_write(self, query, params):
        """Run one write statement and commit it; always release resources."""
        conn = self._connect()
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(query, params)
                # Reached only when execute() succeeded; on failure the
                # connection is closed without committing.
                conn.commit()
            finally:
                cursor.close()
        finally:
            conn.close()

    def set_preference(self, user_id: str, program_type: str,
                       preferred_time_start: str, preferred_time_end: str,
                       notify_before_minutes: int = 15):
        """Insert or update the preference row for ``(user_id, program_type)``.

        An existing row has its time window and reminder lead time overwritten.
        """
        query = """
            INSERT INTO user_preferences
                (user_id, program_type, preferred_time_start, preferred_time_end, notify_before_minutes)
            VALUES (%s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                preferred_time_start = VALUES(preferred_time_start),
                preferred_time_end = VALUES(preferred_time_end),
                notify_before_minutes = VALUES(notify_before_minutes)
        """
        self._execute_write(query, (user_id, program_type, preferred_time_start,
                                    preferred_time_end, notify_before_minutes))

    def get_preference(self, user_id: str):
        """Return all preference rows of *user_id* as a list of dicts."""
        conn = self._connect()
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute("""
                    SELECT program_type, preferred_time_start, preferred_time_end, notify_before_minutes
                    FROM user_preferences
                    WHERE user_id = %s
                """, (user_id,))
                return cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()

    def update_watch_history(self, user_id: str, program_id: str,
                             completion_rate: float, rating: int = None):
        """Insert or update the watch record for ``(user_id, program_id)``.

        ``watch_time`` is set to the database's current time on every call.
        """
        query = """
            INSERT INTO user_watch_history
                (user_id, program_id, watch_time, completion_rate, rating)
            VALUES (%s, %s, NOW(), %s, %s)
            ON DUPLICATE KEY UPDATE
                watch_time = NOW(),
                completion_rate = VALUES(completion_rate),
                rating = VALUES(rating)
        """
        self._execute_write(query, (user_id, program_id, completion_rate, rating))
# 使用示例
if __name__ == "__main__":
    # Connection settings for the demo database.
    db_config = dict(
        host='localhost',
        user='schedule_user',
        password='password',
        database='schedule_db',
    )
    manager = UserPreferenceManager(db_config)

    # Store a preference: news between 19:00 and 20:00, reminder 10 minutes ahead.
    news_preference = {
        'user_id': 'user123',
        'program_type': '新闻',
        'preferred_time_start': '19:00',
        'preferred_time_end': '20:00',
        'notify_before_minutes': 10,
    }
    manager.set_preference(**news_preference)

    # Record that the user watched 95.5% of a programme and rated it 5.
    watch_record = {
        'user_id': 'user123',
        'program_id': 'program_001',
        'completion_rate': 95.5,
        'rating': 5,
    }
    manager.update_watch_history(**watch_record)
3.2 智能提醒机制
推送通知系统
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import requests
import json
class NotificationSystem:
    """Find programmes about to start and notify the users who follow them.

    Email and push delivery are optional and used only when their config is
    supplied; the SMS channel is a stub that prints to stdout.
    """

    def __init__(self, db_config, email_config=None, push_config=None):
        """
        Args:
            db_config: keyword arguments for ``mysql.connector.connect``.
            email_config: dict with ``host``, ``port``, ``sender`` and
                ``password``, or None to disable email.
            push_config: dict with ``api_url`` and ``api_key``, or None to
                disable push.
        """
        self.db_config = db_config
        self.email_config = email_config
        self.push_config = push_config

    def check_upcoming_programs(self):
        """Notify users about programmes starting within the next 15 minutes.

        A (user, programme) pair already notified during the last hour is
        skipped. Each notification sent is recorded in ``notification_sent``.
        """
        # `programs p` must be joined before any ON clause references p.type;
        # MySQL rejects a reference to a table alias that is joined later.
        # NOTE(review): notification_sent is not created by the DDL shown in
        # this file - confirm it exists.
        query = """
            SELECT
                up.user_id,
                p.program_id,
                p.title,
                p.type,
                s.start_time,
                up.notify_before_minutes
            FROM user_preferences up
            JOIN programs p ON up.program_type = p.type
            JOIN schedule s ON s.program_id = p.program_id
            WHERE s.start_time BETWEEN NOW() AND DATE_ADD(NOW(), INTERVAL 15 MINUTE)
              AND s.start_time > NOW()
              AND NOT EXISTS (
                  SELECT 1 FROM notification_sent ns
                  WHERE ns.user_id = up.user_id
                    AND ns.program_id = p.program_id
                    AND ns.sent_at > DATE_SUB(NOW(), INTERVAL 1 HOUR)
              )
        """
        conn = mysql.connector.connect(**self.db_config)
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(query)
                upcoming = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()

        # The read connection is already released; sending may be slow.
        for program in upcoming:
            self.send_notification(program)
            self.record_notification_sent(program['user_id'], program['program_id'])

    def send_notification(self, program):
        """Dispatch *program* to every configured channel."""
        if self.email_config:
            self.send_email_notification(program)
        if self.push_config:
            self.send_push_notification(program)
        # SMS notification (optional)
        self.send_sms_notification(program)

    def send_email_notification(self, program):
        """Send the reminder email; failures are printed, not raised."""
        if not self.email_config:
            return
        msg = MIMEMultipart()
        msg['From'] = self.email_config['sender']
        # Placeholder address; a real system should look it up in a user table.
        msg['To'] = program['user_id'] + '@example.com'
        msg['Subject'] = f"节目提醒:{program['title']} 即将开始"
        # Built line by line so source indentation never leaks into the mail.
        body = "\n".join([
            "",
            "您好!",
            f"您关注的节目《{program['title']}》即将在 {program['start_time']} 开始。",
            f"节目类型:{program['type']}",
            "提前15分钟提醒",
            "请不要错过精彩内容!",
            "",
        ])
        msg.attach(MIMEText(body, 'plain', 'utf-8'))
        try:
            # The context manager closes the SMTP session even when login or
            # sending fails; the timeout keeps a dead server from hanging us.
            with smtplib.SMTP(self.email_config['host'],
                              self.email_config['port'], timeout=10) as server:
                server.starttls()
                server.login(self.email_config['sender'], self.email_config['password'])
                server.send_message(msg)
            print(f"邮件已发送: {program['title']}")
        except Exception as e:
            print(f"邮件发送失败: {e}")

    def send_push_notification(self, program):
        """Send the push notification; failures are printed, not raised."""
        if not self.push_config:
            return
        alert = f"节目提醒:{program['title']} 即将开始"
        # Example payload for a push-service API (e.g. JPush, Getui).
        payload = {
            "platform": ["ios", "android"],
            "audience": {
                "alias": [program['user_id']]
            },
            "notification": {
                "android": {
                    "alert": alert,
                    "title": "节目提醒",
                    "builder_id": 1
                },
                "ios": {
                    "alert": alert,
                    "sound": "default",
                    "badge": 1
                }
            },
            "options": {
                "time_to_live": 3600,
                "apns_production": False
            }
        }
        try:
            response = requests.post(
                f"{self.push_config['api_url']}/push",
                headers={"Authorization": f"Basic {self.push_config['api_key']}"},
                json=payload,
                # Without a timeout a stalled endpoint would block the
                # scheduler job indefinitely.
                timeout=10,
            )
            print(f"推送结果: {response.status_code}")
        except Exception as e:
            print(f"推送失败: {e}")

    def send_sms_notification(self, program):
        """SMS reminder stub: prints instead of calling an SMS provider."""
        print(f"短信提醒: {program['title']} 将在15分钟后开始")

    def record_notification_sent(self, user_id: str, program_id: str):
        """Insert a ``notification_sent`` row stamped with the current time."""
        conn = mysql.connector.connect(**self.db_config)
        try:
            cursor = conn.cursor()
            try:
                cursor.execute("""
                    INSERT INTO notification_sent (user_id, program_id, sent_at)
                    VALUES (%s, %s, NOW())
                """, (user_id, program_id))
                conn.commit()
            finally:
                cursor.close()
        finally:
            conn.close()
# 定时任务示例(使用APScheduler)
from apscheduler.schedulers.background import BackgroundScheduler
def start_notification_service(db_config, email_config=None, push_config=None):
    """Start a background job that checks for upcoming programmes every minute.

    Args:
        db_config: keyword arguments for ``mysql.connector.connect``.
        email_config: optional email settings passed to NotificationSystem;
            without them no email is sent.
        push_config: optional push settings passed to NotificationSystem.

    Returns:
        The running ``BackgroundScheduler``; call ``shutdown()`` on it to stop.
    """
    notification_system = NotificationSystem(db_config, email_config, push_config)
    scheduler = BackgroundScheduler()
    # Check once a minute.
    scheduler.add_job(
        notification_system.check_upcoming_programs,
        'interval',
        minutes=1
    )
    scheduler.start()
    print("通知服务已启动")
    return scheduler
# 使用示例
if __name__ == "__main__":
    db_config = dict(
        host='localhost',
        user='schedule_user',
        password='password',
        database='schedule_db',
    )
    # Sample SMTP settings; defined here but not passed to the service below.
    email_config = dict(
        host='smtp.gmail.com',
        port=587,
        sender='notifications@example.com',
        password='app_password',
    )

    # Start the background service.
    scheduler = start_notification_service(db_config)

    # Keep the main thread alive until Ctrl+C, then stop the scheduler.
    try:
        import time
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        scheduler.shutdown()
3.3 数据可视化展示
使用Chart.js展示收视趋势
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>节目排期分析仪表板</title>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<style>
/* Page defaults for the dashboard. */
body {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
margin: 0;
padding: 20px;
background: #f5f5f5;
}
/* Centered content column. */
.dashboard {
max-width: 1400px;
margin: 0 auto;
}
/* White card wrapping each chart canvas. */
.chart-container {
background: white;
padding: 20px;
margin-bottom: 20px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
/* Two equal-width chart columns per row. */
.chart-grid {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 20px;
}
h1, h2 {
color: #333;
}
/* Summary cards: as many columns as fit, each at least 200px wide. */
.stats-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 15px;
margin-bottom: 20px;
}
.stat-card {
background: white;
padding: 20px;
border-radius: 8px;
text-align: center;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
/* Large headline number inside a stat card. */
.stat-value {
font-size: 32px;
font-weight: bold;
color: #667eea;
}
/* Caption under the headline number. */
.stat-label {
color: #666;
margin-top: 5px;
}
</style>
</head>
<body>
<div class="dashboard">
<h1>节目排期分析仪表板</h1>
<!-- 统计卡片 -->
<div class="stats-grid" id="statsGrid"></div>
<!-- 图表区域 -->
<div class="chart-grid">
<div class="chart-container">
<h2>收视率趋势预测</h2>
<canvas id="viewershipChart"></canvas>
</div>
<div class="chart-container">
<h2>节目类型分布</h2>
<canvas id="typeDistributionChart"></canvas>
</div>
</div>
<div class="chart-grid">
<div class="chart-container">
<h2>黄金时段热度</h2>
<canvas id="primeTimeChart"></canvas>
</div>
<div class="chart-container">
<h2>用户偏好分析</h2>
<canvas id="userPreferenceChart"></canvas>
</div>
</div>
</div>
<script>
// 模拟数据
// Static sample data consumed by the init* functions below; no network request is made.
const mockData = {
// Headline numbers rendered as stat cards by initStats().
stats: {
totalPrograms: 156,
avgViewership: 850,
upcomingToday: 12,
userRecommendations: 8
},
// Passed as-is to Chart.js as the `data` of the line chart (labels + two datasets).
viewershipTrend: {
labels: ['周一', '周二', '周三', '周四', '周五', '周六', '周日'],
datasets: [{
label: '预测收视率',
data: [780, 820, 790, 850, 920, 1100, 1050],
borderColor: '#667eea',
backgroundColor: 'rgba(102, 126, 234, 0.1)',
tension: 0.4
}, {
label: '实际收视率',
data: [765, 835, 780, 865, 910, 1120, 1035],
borderColor: '#764ba2',
backgroundColor: 'rgba(118, 75, 162, 0.1)',
tension: 0.4
}]
},
// Labels and values for the doughnut chart; `data[i]` belongs to `labels[i]`.
typeDistribution: {
labels: ['新闻', '电视剧', '综艺', '体育', '电影', '纪录片'],
data: [25, 35, 20, 10, 7, 3]
},
// Labels and values for the bar chart.
primeTime: {
labels: ['18:00', '19:00', '20:00', '21:00', '22:00'],
data: [650, 920, 1100, 980, 720]
},
// Labels and values for the polar-area chart.
userPreference: {
labels: ['新闻', '电视剧', '综艺', '体育'],
data: [40, 30, 20, 10]
}
};
// 初始化统计卡片
/**
 * Fill the #statsGrid element with one card per headline statistic
 * taken from mockData.stats.
 */
function initStats() {
    const { totalPrograms, avgViewership, upcomingToday, userRecommendations } = mockData.stats;
    const entries = [
        ['今日节目总数', totalPrograms],
        ['平均收视率', avgViewership + '万'],
        ['即将开始', upcomingToday],
        ['为您推荐', userRecommendations]
    ];

    // Each card is wrapped in a leading and trailing newline, exactly as the
    // multi-line template it replaces produced.
    let markup = '';
    for (const [label, value] of entries) {
        markup += [
            '',
            '<div class="stat-card">',
            `<div class="stat-value">${value}</div>`,
            `<div class="stat-label">${label}</div>`,
            '</div>',
            ''
        ].join('\n');
    }

    document.getElementById('statsGrid').innerHTML = markup;
}
// 初始化收视率趋势图
/**
 * Draw the line chart on the #viewershipChart canvas using
 * mockData.viewershipTrend as the Chart.js data object.
 */
function initViewershipChart() {
    const yAxis = {
        beginAtZero: true,
        title: { display: true, text: '收视率(万)' }
    };
    const pluginOptions = {
        legend: { position: 'top' },
        tooltip: { mode: 'index', intersect: false }
    };
    const chartConfig = {
        type: 'line',
        data: mockData.viewershipTrend,
        options: {
            responsive: true,
            plugins: pluginOptions,
            scales: { y: yAxis }
        }
    };

    const canvas = document.getElementById('viewershipChart');
    new Chart(canvas.getContext('2d'), chartConfig);
}
// 初始化类型分布图
/**
 * Draw the doughnut chart on the #typeDistributionChart canvas from
 * mockData.typeDistribution.
 */
function initTypeDistributionChart() {
    const { labels, data: values } = mockData.typeDistribution;
    const sliceColors = ['#FF6384', '#36A2EB', '#FFCE56', '#4BC0C0', '#9966FF', '#FF9F40'];

    const chartConfig = {
        type: 'doughnut',
        data: {
            labels: labels,
            datasets: [{ data: values, backgroundColor: sliceColors }]
        },
        options: {
            responsive: true,
            plugins: { legend: { position: 'right' } }
        }
    };

    const canvas = document.getElementById('typeDistributionChart');
    new Chart(canvas.getContext('2d'), chartConfig);
}
// 初始化黄金时段图
/**
 * Draw the bar chart on the #primeTimeChart canvas from mockData.primeTime.
 */
function initPrimeTimeChart() {
    const { labels, data: values } = mockData.primeTime;

    const series = {
        label: '平均热度',
        data: values,
        backgroundColor: 'rgba(75, 192, 192, 0.6)',
        borderColor: 'rgba(75, 192, 192, 1)',
        borderWidth: 1
    };
    const yAxis = {
        beginAtZero: true,
        title: { display: true, text: '热度值' }
    };
    const chartConfig = {
        type: 'bar',
        data: { labels: labels, datasets: [series] },
        options: { responsive: true, scales: { y: yAxis } }
    };

    const canvas = document.getElementById('primeTimeChart');
    new Chart(canvas.getContext('2d'), chartConfig);
}
// 初始化用户偏好图
/**
 * Draw the polar-area chart on the #userPreferenceChart canvas from
 * mockData.userPreference.
 */
function initUserPreferenceChart() {
    const { labels, data: values } = mockData.userPreference;
    const segmentColors = [
        'rgba(255, 99, 132, 0.6)',
        'rgba(54, 162, 235, 0.6)',
        'rgba(255, 206, 86, 0.6)',
        'rgba(75, 192, 192, 0.6)'
    ];

    const chartConfig = {
        type: 'polarArea',
        data: {
            labels: labels,
            datasets: [{ data: values, backgroundColor: segmentColors }]
        },
        options: {
            responsive: true,
            plugins: { legend: { position: 'right' } }
        }
    };

    const canvas = document.getElementById('userPreferenceChart');
    new Chart(canvas.getContext('2d'), chartConfig);
}
// 页面加载完成后初始化
// Once the DOM is parsed, run every dashboard initializer in display order.
document.addEventListener('DOMContentLoaded', () => {
    const initializers = [
        initStats,
        initViewershipChart,
        initTypeDistributionChart,
        initPrimeTimeChart,
        initUserPreferenceChart
    ];
    for (const initialize of initializers) {
        initialize();
    }
});
</script>
</body>
</html>
四、高级功能与优化策略
4.1 实时数据更新机制
import asyncio
import websockets
import json
from datetime import datetime
class RealTimeScheduleUpdater:
    """Push schedule changes to connected WebSocket clients.

    Every connected socket is tracked in ``self.subscribers``.
    ``monitor_schedule_changes`` polls the ``schedule_audit_log`` table and
    broadcasts the rows that have not been notified yet.
    """

    def __init__(self, db_config):
        """Store the DB settings and start with no subscribers.

        Args:
            db_config: keyword arguments forwarded to ``mysql.connector.connect``.
        """
        self.db_config = db_config
        self.subscribers = set()
        # The event loop keeps only weak references to tasks, so hold strong
        # ones here until each broadcast finishes.
        self._background_tasks = set()

    @staticmethod
    def _json_default(value):
        """Serialize date/time values (e.g. DATETIME columns) as ISO-8601 text."""
        if hasattr(value, 'isoformat'):
            return value.isoformat()
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    async def broadcast_updates(self, update_data):
        """Send ``update_data`` to every subscriber as a ``schedule_update`` message.

        A subscriber whose ``send`` fails is dropped from the subscriber set;
        the failure does not interrupt delivery to the others.
        """
        if not self.subscribers:
            return
        message = json.dumps({
            'type': 'schedule_update',
            'timestamp': datetime.now().isoformat(),
            'data': update_data
        }, default=self._json_default)
        # Snapshot the set: connections may come and go while we await.
        recipients = list(self.subscribers)
        outcomes = await asyncio.gather(
            *(subscriber.send(message) for subscriber in recipients),
            return_exceptions=True,
        )
        for subscriber, outcome in zip(recipients, outcomes):
            if isinstance(outcome, Exception):
                self.subscribers.discard(subscriber)

    async def handle_websocket(self, websocket, path=None):
        """Register a connection and process its messages until it closes.

        ``path`` is optional so the handler works whether or not the
        websockets library passes a second argument.
        Frames that are not valid JSON objects are ignored.
        """
        self.subscribers.add(websocket)
        try:
            async for message in websocket:
                try:
                    data = json.loads(message)
                except (TypeError, ValueError):
                    continue
                if isinstance(data, dict) and data.get('action') == 'subscribe':
                    # The user subscribes to a specific channel or program type.
                    await self.subscribe_to_updates(websocket, data)
        finally:
            # discard(): broadcast_updates may already have removed this socket.
            self.subscribers.discard(websocket)

    async def subscribe_to_updates(self, websocket, data):
        """Build the subscription record for a subscribe request and return it.

        The record is not persisted here; it could be stored in Redis or memory.
        """
        subscription = {
            'websocket': websocket,
            'channel_id': data.get('channel_id'),
            'program_type': data.get('program_type'),
            'user_id': data.get('user_id')
        }
        print(f"用户 {data.get('user_id')} 订阅了更新")
        return subscription

    def monitor_schedule_changes(self):
        """Broadcast un-notified audit-log rows from the last 5 minutes and mark them.

        Must be called while an event loop is running, because the broadcast
        is scheduled with ``asyncio.create_task``.
        """
        conn = mysql.connector.connect(**self.db_config)
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute("""
                    SELECT schedule_id, program_id, start_time, end_time, change_type
                    FROM schedule_audit_log
                    WHERE changed_at > DATE_SUB(NOW(), INTERVAL 5 MINUTE)
                    AND notified = FALSE
                """)
                changes = cursor.fetchall()
                if not changes:
                    return
                task = asyncio.create_task(self.broadcast_updates(changes))
                self._background_tasks.add(task)
                task.add_done_callback(self._background_tasks.discard)
                # One placeholder per id: a single "%s" bound to "1,2,3" would
                # be compared as one string and match at most the first id.
                change_ids = [change['schedule_id'] for change in changes]
                placeholders = ', '.join(['%s'] * len(change_ids))
                cursor.execute(f"""
                    UPDATE schedule_audit_log
                    SET notified = TRUE
                    WHERE schedule_id IN ({placeholders})
                """, tuple(change_ids))
                conn.commit()
            finally:
                cursor.close()
        finally:
            conn.close()
# WebSocket服务器启动
async def start_websocket_server(host='localhost', port=8765):
    """Start the WebSocket endpoint and the background schedule monitor.

    Returns the server object produced by ``websockets.serve``.
    """
    db_settings = dict(
        host='localhost',
        user='schedule_user',
        password='password',
        database='schedule_db',
    )
    updater = RealTimeScheduleUpdater(db_settings)
    server = await websockets.serve(updater.handle_websocket, host, port)
    print(f"WebSocket服务器运行在 ws://{host}:{port}")
    # Kick off the periodic polling loop alongside the server.
    monitor_coro = monitor_schedule_changes(updater)
    asyncio.create_task(monitor_coro)
    return server
async def monitor_schedule_changes(updater):
    """Run the updater's schedule check forever, pausing 60 seconds between runs."""
    poll_interval_seconds = 60
    while True:
        updater.monitor_schedule_changes()
        await asyncio.sleep(poll_interval_seconds)
# 客户端连接示例
async def websocket_client():
    """Example client: subscribe on ws://localhost:8765 and print every update."""
    subscribe_request = json.dumps({
        'action': 'subscribe',
        'user_id': 'user123',
        'channel_id': 'CCTV1',
        'program_type': '新闻'
    })
    async with websockets.connect("ws://localhost:8765") as connection:
        await connection.send(subscribe_request)
        async for raw_message in connection:
            update = json.loads(raw_message)
            # Real-time handling (refresh the UI, send notifications, ...) goes here.
            print(f"收到更新: {update}")
4.2 机器学习模型优化
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import mean_absolute_error, mean_squared_error
import joblib
class ModelOptimizer:
    """Tune, evaluate and persist a random-forest regressor."""

    def __init__(self):
        # Filled in by optimize_random_forest() / load_model().
        self.best_model = None
        self.best_params = None

    def optimize_random_forest(self, X, y, param_grid=None, cv=5):
        """Grid-search random-forest hyper-parameters, scored by negative MAE.

        Args:
            X: training features.
            y: training targets.
            param_grid: optional search grid; defaults to the built-in grid.
            cv: number of cross-validation folds.

        Returns:
            ``(best_estimator, best_params)``; both are also stored on ``self``.
        """
        if param_grid is None:
            param_grid = {
                'n_estimators': [50, 100, 200],
                'max_depth': [None, 10, 20, 30],
                'min_samples_split': [2, 5, 10],
                'min_samples_leaf': [1, 2, 4]
            }
        grid_search = GridSearchCV(
            RandomForestRegressor(random_state=42),
            param_grid,
            cv=cv,
            scoring='neg_mean_absolute_error',
            n_jobs=-1
        )
        grid_search.fit(X, y)
        self.best_model = grid_search.best_estimator_
        self.best_params = grid_search.best_params_
        return self.best_model, self.best_params

    def evaluate_model(self, model, X_test, y_test):
        """Return MAE, RMSE and within-tolerance accuracy for ``model``.

        ``accuracy`` is the share of predictions whose absolute error is below
        10% of the absolute mean of ``y_test``.
        """
        predictions = model.predict(X_test)
        mae = mean_absolute_error(y_test, predictions)
        rmse = np.sqrt(mean_squared_error(y_test, predictions))
        # Use the absolute mean: with a negative mean the tolerance would be
        # negative and no prediction could ever count as accurate.
        threshold = 0.1 * np.abs(np.mean(y_test))
        accuracy = np.mean(np.abs(predictions - y_test) < threshold)
        return {
            'mae': mae,
            'rmse': rmse,
            'accuracy': accuracy
        }

    def save_model(self, model, filename):
        """Persist ``model`` to ``filename`` with joblib."""
        joblib.dump(model, filename)
        print(f"模型已保存到 {filename}")

    def load_model(self, filename):
        """Load a model from ``filename``, store it as ``best_model`` and return it."""
        self.best_model = joblib.load(filename)
        return self.best_model
# 使用示例
if __name__ == "__main__":
    # Demo: tune a model on synthetic regression data, evaluate it, then
    # round-trip it through disk.
    from sklearn.datasets import make_regression

    features, targets = make_regression(
        n_samples=1000, n_features=10, noise=0.1, random_state=42
    )
    X_train, X_test, y_train, y_test = train_test_split(
        features, targets, test_size=0.2, random_state=42
    )

    optimizer = ModelOptimizer()

    print("开始优化模型...")
    model, params = optimizer.optimize_random_forest(X_train, y_train)
    print(f"最佳参数: {params}")

    metrics = optimizer.evaluate_model(model, X_test, y_test)
    print(f"评估结果: {metrics}")

    model_path = 'schedule_predictor.pkl'
    optimizer.save_model(model, model_path)
    loaded_model = optimizer.load_model(model_path)
4.3 A/B测试框架
import hashlib
from datetime import datetime
class ABTestFramework:
    """Deterministic A/B assignment plus conversion tracking backed by MySQL."""

    def __init__(self, db_config):
        """Store the keyword arguments later passed to ``mysql.connector.connect``."""
        self.db_config = db_config

    def _run(self, query, params=(), *, dictionary=False, commit=False):
        """Execute one statement on a fresh connection that is always closed.

        Returns the fetched rows, or ``None`` when ``commit`` is true.
        """
        conn = mysql.connector.connect(**self.db_config)
        try:
            cursor = conn.cursor(dictionary=dictionary)
            try:
                cursor.execute(query, params)
                if commit:
                    conn.commit()
                    return None
                return cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()

    @staticmethod
    def _variant_for(user_id, test_name):
        """Map ``user_id``/``test_name`` to 'A' or 'B' via an MD5 parity split."""
        digest = hashlib.md5(f"{user_id}:{test_name}".encode()).hexdigest()
        return 'A' if int(digest, 16) % 2 == 0 else 'B'

    @staticmethod
    def _with_rates(rows):
        """Add ``watch_rate`` and ``favorite_rate`` (percent) to each row in place."""
        for row in rows:
            total = row['total_users']
            row['watch_rate'] = (row['watchers'] / total * 100) if total > 0 else 0
            row['favorite_rate'] = (row['favorites'] / total * 100) if total > 0 else 0
        return rows

    def assign_user_to_variant(self, user_id: str, test_name: str):
        """Assign the user to a variant, record the assignment and return it.

        The same user/test pair always yields the same variant.
        """
        variant = self._variant_for(user_id, test_name)
        self._run("""
            INSERT INTO ab_test_assignments (user_id, test_name, variant, assigned_at)
            VALUES (%s, %s, %s, NOW())
            ON DUPLICATE KEY UPDATE variant = VALUES(variant)
        """, (user_id, test_name, variant), commit=True)
        return variant

    def get_recommendation_strategy(self, user_id: str):
        """Return the recommendation method that matches the user's variant."""
        variant = self.assign_user_to_variant(user_id, 'recommendation_strategy')
        if variant == 'A':
            # Strategy A: popularity based.
            return self.popularity_based_recommendation
        # Strategy B: collaborative filtering.
        return self.collaborative_filtering_recommendation

    def popularity_based_recommendation(self, user_id: str, limit: int = 10):
        """Return up to ``limit`` programs ordered by today's predicted viewership.

        Programs without a prediction for today are kept with a score of 0.
        """
        # The date filter lives in the ON clause: in WHERE it would discard
        # the unmatched rows and make the LEFT JOIN/COALESCE pointless.
        return self._run("""
            SELECT p.program_id, p.title, p.type,
                   COALESCE(pp.predicted_viewership, 0) as score
            FROM programs p
            LEFT JOIN program_predictions pp
                ON p.program_id = pp.program_id
                AND pp.prediction_date = CURDATE()
            ORDER BY score DESC
            LIMIT %s
        """, (limit,), dictionary=True)

    def collaborative_filtering_recommendation(self, user_id: str, limit: int = 10):
        """Placeholder strategy: return up to ``limit`` programs in random order."""
        return self._run("""
            SELECT p.program_id, p.title, p.type, RAND() as score
            FROM programs p
            ORDER BY score DESC
            LIMIT %s
        """, (limit,), dictionary=True)

    def record_conversion(self, user_id: str, test_name: str,
                          variant: str, action: str):
        """Insert one conversion row for the user's action."""
        self._run("""
            INSERT INTO ab_test_conversions
            (user_id, test_name, variant, action, created_at)
            VALUES (%s, %s, %s, %s, NOW())
        """, (user_id, test_name, variant, action), commit=True)

    def analyze_results(self, test_name: str):
        """Return per-variant user counts with watch and favorite rates in percent."""
        # Columns are qualified because user_id and variant exist in both
        # joined tables; unqualified references would be ambiguous.
        rows = self._run("""
            SELECT
                a.variant AS variant,
                COUNT(DISTINCT a.user_id) as total_users,
                COUNT(DISTINCT CASE WHEN c.action = 'watch' THEN a.user_id END) as watchers,
                COUNT(DISTINCT CASE WHEN c.action = 'favorite' THEN a.user_id END) as favorites
            FROM ab_test_assignments a
            LEFT JOIN ab_test_conversions c ON a.user_id = c.user_id
                AND a.test_name = c.test_name
            WHERE a.test_name = %s
            GROUP BY a.variant
        """, (test_name,), dictionary=True)
        return self._with_rates(rows)
# 使用示例
if __name__ == "__main__":
    # Demo: assign a user, fetch recommendations, record a conversion and
    # print the aggregated test results.
    db_config = dict(
        host='localhost',
        user='schedule_user',
        password='password',
        database='schedule_db',
    )
    demo_user = 'user123'
    demo_test = 'recommendation_strategy'

    ab_test = ABTestFramework(db_config)

    variant = ab_test.assign_user_to_variant(demo_user, demo_test)
    print(f"用户分配到测试组: {variant}")

    strategy = ab_test.get_recommendation_strategy(demo_user)
    recommendations = strategy(demo_user, limit=5)
    print(f"推荐结果: {recommendations}")

    ab_test.record_conversion(demo_user, demo_test, variant, 'watch')

    results = ab_test.analyze_results(demo_test)
    print(f"测试结果: {results}")
五、最佳实践与注意事项
5.1 数据质量保障
class DataQualityValidator:
    """Validate program and schedule records and audit prediction quality."""

    def __init__(self, db_config):
        """Store the keyword arguments later passed to ``mysql.connector.connect``."""
        self.db_config = db_config

    def validate_program_data(self, program_data: dict) -> bool:
        """Check required fields, program type and duration; print the first error.

        Returns ``True`` when the record is valid, ``False`` otherwise.
        """
        required_fields = ['program_id', 'title', 'type', 'duration_minutes']
        for field in required_fields:
            value = program_data.get(field)
            # A numeric 0 is "present"; it is rejected by the duration check
            # below rather than being reported as a missing field.
            is_zero = isinstance(value, (int, float)) and not isinstance(value, bool)
            if field not in program_data or (not value and not is_zero):
                print(f"错误: 缺少必填字段 {field}")
                return False

        valid_types = ['新闻', '电视剧', '综艺', '体育', '电影', '纪录片']
        if program_data['type'] not in valid_types:
            print(f"错误: 无效的节目类型 {program_data['type']}")
            return False

        duration = program_data['duration_minutes']
        if isinstance(duration, bool) or not isinstance(duration, (int, float)):
            print("错误: 节目时长必须是数字")
            return False
        if duration <= 0:
            print("错误: 节目时长必须大于0")
            return False
        return True

    def validate_schedule_data(self, schedule_data: dict) -> bool:
        """Check time ordering and look for overlapping slots on the same channel.

        Returns ``False`` (after printing the reason) on a missing field, a
        non-positive time range, or a conflict found in the ``schedule`` table.
        """
        for field in ('channel_id', 'start_time', 'end_time'):
            if schedule_data.get(field) is None:
                print(f"错误: 缺少必填字段 {field}")
                return False

        start_time = schedule_data['start_time']
        end_time = schedule_data['end_time']
        if end_time <= start_time:
            print("错误: 结束时间必须晚于开始时间")
            return False

        conn = mysql.connector.connect(**self.db_config)
        try:
            cursor = conn.cursor()
            try:
                cursor.execute("""
                    SELECT COUNT(*) FROM schedule
                    WHERE channel_id = %s
                    AND (
                        (start_time < %s AND end_time > %s)
                        OR (start_time < %s AND end_time > %s)
                        OR (start_time >= %s AND end_time <= %s)
                    )
                """, (
                    schedule_data['channel_id'],
                    end_time, start_time,
                    start_time, end_time,
                    start_time, end_time
                ))
                conflict_count = cursor.fetchone()[0]
            finally:
                cursor.close()
        finally:
            conn.close()

        if conflict_count > 0:
            print("错误: 存在时间冲突")
            return False
        return True

    def clean_duplicate_programs(self):
        """Report programs sharing title and type, then delete all but the highest id."""
        conn = mysql.connector.connect(**self.db_config)
        try:
            cursor = conn.cursor()
            try:
                cursor.execute("""
                    SELECT title, type, COUNT(*) as cnt
                    FROM programs
                    GROUP BY title, type
                    HAVING cnt > 1
                """)
                duplicates = cursor.fetchall()
                for title, p_type, count in duplicates:
                    print(f"发现重复: {title} ({p_type}) - {count}个")
                if duplicates:
                    # The statement is not scoped to one title/type, so a single
                    # run removes every duplicate; repeating it per group is wasted work.
                    cursor.execute("""
                        DELETE p1 FROM programs p1
                        INNER JOIN programs p2
                        WHERE p1.title = p2.title
                        AND p1.type = p2.type
                        AND p1.program_id < p2.program_id
                    """)
                conn.commit()
            finally:
                cursor.close()
        finally:
            conn.close()

    def validate_prediction_accuracy(self, days: int = 30):
        """Compare predictions with watch counts over the last ``days`` days.

        Prints the mean percentage error and a warning when it exceeds 20%.
        Returns the per-program rows.
        """
        conn = mysql.connector.connect(**self.db_config)
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute("""
                    SELECT
                        p.program_id,
                        p.title,
                        pp.predicted_viewership,
                        wh.actual_viewership,
                        ABS(pp.predicted_viewership - wh.actual_viewership) / wh.actual_viewership * 100 as error_rate
                    FROM program_predictions pp
                    JOIN programs p ON pp.program_id = p.program_id
                    JOIN (
                        SELECT program_id, COUNT(*) as actual_viewership
                        FROM user_watch_history
                        WHERE watch_time >= DATE_SUB(NOW(), INTERVAL %s DAY)
                        GROUP BY program_id
                    ) wh ON pp.program_id = wh.program_id
                    WHERE pp.prediction_date >= DATE_SUB(NOW(), INTERVAL %s DAY)
                    AND pp.prediction_date < CURDATE()
                """, (days, days))
                results = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()

        # A NULL prediction produces a NULL error_rate; leave it out of the mean.
        error_rates = [r['error_rate'] for r in results if r['error_rate'] is not None]
        if error_rates:
            avg_error = sum(error_rates) / len(error_rates)
            print(f"过去{days}天预测平均误差: {avg_error:.2f}%")
            if avg_error > 20:
                print("警告: 预测误差较大,建议重新训练模型")
        return results
# 使用示例
if __name__ == "__main__":
    # Demo: run each validator once against sample records.
    connection_settings = dict(
        host='localhost',
        user='schedule_user',
        password='password',
        database='schedule_db',
    )
    validator = DataQualityValidator(connection_settings)

    program_data = dict(
        program_id='PROG001',
        title='晚间新闻',
        type='新闻',
        duration_minutes=30,
    )
    program_ok = validator.validate_program_data(program_data)
    if program_ok:
        print("节目数据验证通过")

    schedule_data = dict(
        channel_id='CCTV1',
        start_time=datetime(2024, 1, 1, 19, 0),
        end_time=datetime(2024, 1, 1, 19, 30),
    )
    schedule_ok = validator.validate_schedule_data(schedule_data)
    if schedule_ok:
        print("排期数据验证通过")

    validator.clean_duplicate_programs()
    validator.validate_prediction_accuracy()
5.2 性能优化建议
# 缓存实现
from functools import lru_cache
import redis
class CacheManager:
    """Redis-backed cache for per-channel, per-date schedule lookups."""

    # Lifetime of a cached schedule in Redis, in seconds (1 hour).
    CACHE_TTL_SECONDS = 3600

    def __init__(self, redis_host='localhost', redis_port=6379):
        """Create the Redis client; responses are decoded to ``str``."""
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)

    @staticmethod
    def _cache_key(channel_id: str, date_str: str) -> str:
        """Build the Redis key shared by lookups and invalidation."""
        return f"schedule:{channel_id}:{date_str}"

    def get_cached_schedule(self, channel_id: str, date_str: str):
        """Return the schedule for a channel and date, reading through Redis.

        Redis is the only cache layer. An in-process ``lru_cache`` on this
        method would keep returning old results after the Redis entry expired
        or ``invalidate_cache`` was called.
        """
        cache_key = self._cache_key(channel_id, date_str)
        cached = self.redis_client.get(cache_key)
        if cached is not None:
            return json.loads(cached)
        result = self.query_database(channel_id, date_str)
        self.redis_client.setex(cache_key, self.CACHE_TTL_SECONDS, json.dumps(result))
        return result

    def query_database(self, channel_id: str, date_str: str):
        """Simplified stand-in for the real database query."""
        return {"channel": channel_id, "date": date_str, "programs": []}

    def invalidate_cache(self, channel_id: str, date_str: str):
        """Delete the cached schedule so the next lookup queries the database."""
        self.redis_client.delete(self._cache_key(channel_id, date_str))
# 数据库索引优化建议
"""
-- 为常用查询创建复合索引
CREATE INDEX idx_schedule_channel_time ON schedule(channel_id, start_time);
CREATE INDEX idx_programs_type ON programs(type);
CREATE INDEX idx_user_preferences_user ON user_preferences(user_id, program_type);
CREATE INDEX idx_watch_history_user_time ON user_watch_history(user_id, watch_time);
-- 分区表(适用于大数据量)
ALTER TABLE schedule PARTITION BY RANGE (YEAR(start_time)) (
PARTITION p2023 VALUES LESS THAN (2024),
PARTITION p2024 VALUES LESS THAN (2025),
PARTITION pfuture VALUES LESS THAN MAXVALUE
);
"""
六、总结
通过本文的详细介绍,我们构建了一个完整的节目排期预测与查询系统。核心要点包括:
- 技术架构:采用前后端分离,使用机器学习进行预测,WebSocket实现实时更新
- 数据驱动:基于历史数据、用户行为和外部因素进行精准预测
- 个性化服务:通过用户偏好设置和协同过滤算法提供定制化推荐
- 智能提醒:多渠道推送确保用户不错过精彩内容
- 持续优化:通过A/B测试和模型评估不断改进系统
这套系统能够帮助用户精准掌握未来节目动态,有效避免错过精彩内容,同时提供个性化的观看体验。通过合理的数据架构和算法选择,系统可以在保证准确性的同时,提供流畅的用户体验。
