Introduction: The Core Value of Stock Market Time Series Forecasting

Stock market time series forecasting is a core technique of modern quantitative investing. By analyzing historical prices, trading volume, market sentiment, and other sources of information, it aims to predict future price movements and identify trading opportunities. Accurate forecasts not only help investors keep a finger on the market's pulse, but also help them manage potential risk.

Time series forecasting differs from traditional technical analysis: it is grounded in mathematical models and statistics, and makes predictions by identifying patterns, trends, and cyclical behavior in the data. The approach is more objective and systematic, handles large volumes of data, and can surface regularities the human eye would miss.

In practice, time series forecasting faces plenty of challenges: nonlinear market behavior, external shocks (such as policy changes and company announcements), and swings in market sentiment all erode predictive accuracy. Building a robust forecasting system therefore means weighing many factors at once and drawing on machine learning and deep learning techniques.

This article walks through applying time series analysis to stock market forecasting, covering the key stages of data preparation, feature engineering, model selection, and risk control, with complete code examples.

1. Data Preparation and Preprocessing

1.1 Data Collection

High-quality data is the foundation of any forecast. We need the following:

  • Historical prices: open, high, low, close (OHLC)
  • Trading activity: volume and turnover (value traded)
  • Fundamentals: P/E ratio, P/B ratio, earnings per share, etc.
  • Market sentiment: news sentiment, social media sentiment, analyst ratings
  • Macroeconomic data: interest rates, inflation, GDP growth, etc.

import pandas as pd
import numpy as np
import yfinance as yf
import akshare as ak
import warnings
warnings.filterwarnings('ignore')

def fetch_stock_data(symbol, start_date, end_date):
    """
    Fetch historical stock data.
    """
    try:
        # Fetch A-share data via akshare (forward-adjusted prices)
        stock_df = ak.stock_zh_a_hist(symbol=symbol, period="daily", 
                                     start_date=start_date, end_date=end_date,
                                     adjust="qfq")
        # Rename columns to English
        stock_df.columns = ['date', 'open', 'close', 'high', 'low', 'volume', 'turnover', 'amplitude', 
                           'change_pct', 'change_amount', 'turnover_rate']
        stock_df['date'] = pd.to_datetime(stock_df['date'])
        stock_df.set_index('date', inplace=True)
        return stock_df
    except Exception:
        # Fall back to yfinance (e.g. for US tickers) if akshare fails
        ticker = yf.Ticker(symbol)
        stock_df = ticker.history(start=start_date, end=end_date)
        # Align column names with the lowercase schema used downstream
        stock_df = stock_df.rename(columns=str.lower)
        return stock_df

# Example: fetch data for Kweichow Moutai
symbol = "600519"  # Kweichow Moutai
start_date = "20200101"  # akshare expects YYYYMMDD dates
end_date = "20240101"
data = fetch_stock_data(symbol, start_date, end_date)
print(f"Fetched {len(data)} rows")
print(data.head())

1.2 Data Cleaning and Preprocessing

Data cleaning is the key step for ensuring data quality: it covers missing values, outliers, and duplicates.

def clean_stock_data(df):
    """
    Clean stock data.
    """
    # 1. Drop rows with missing values
    df = df.dropna()
    
    # 2. Remove outliers (3-sigma rule; a crude filter, since strongly
    #    trending prices can trip it on legitimate extremes)
    for col in ['open', 'high', 'low', 'close']:
        mean = df[col].mean()
        std = df[col].std()
        df = df[(df[col] >= mean - 3*std) & (df[col] <= mean + 3*std)]
    
    # 3. Drop duplicate index entries
    df = df[~df.index.duplicated(keep='first')]
    
    # 4. Ensure chronological order
    df = df.sort_index()
    
    return df

# Clean the data
cleaned_data = clean_stock_data(data)
print(f"{len(cleaned_data)} rows remain after cleaning")

1.3 Standardization and Normalization

Because features are measured on different scales, standardizing them helps the model converge.

from sklearn.preprocessing import StandardScaler, MinMaxScaler

def scale_features(df, columns_to_scale):
    """
    Standardize the selected feature columns.
    """
    # Work on a copy to avoid mutating the original frame
    df_scaled = df.copy()
    
    # Standardize to zero mean and unit variance
    scaler = StandardScaler()
    df_scaled[columns_to_scale] = scaler.fit_transform(df[columns_to_scale])
    
    return df_scaled, scaler

# Columns to standardize
columns_to_scale = ['open', 'high', 'low', 'close', 'volume']
scaled_data, scaler = scale_features(cleaned_data, columns_to_scale)
print("Scaled data:")
print(scaled_data.head())
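
One practical consequence of standardization: model outputs live on the scaled scale, and only the fitted scaler can map them back. A minimal sketch of the round trip with `StandardScaler.inverse_transform` (toy numbers, not from the data above):

```python
import numpy as np
from sklearn.preprocessing import StandardScaler

# Toy price-like column (illustrative values only)
prices = np.array([[10.0], [12.0], [11.0], [13.0]])

scaler = StandardScaler()
scaled = scaler.fit_transform(prices)

# Standardized data has zero mean and unit variance
print(round(float(scaled.mean()), 6))  # 0.0
print(round(float(scaled.std()), 6))   # 1.0

# inverse_transform recovers the original scale; this is why scale_features
# returns the fitted scaler, which must be kept for later use
recovered = scaler.inverse_transform(scaled)
print(np.allclose(recovered, prices))  # True
```

Keeping the scaler object alongside the model is what makes predicted values interpretable as prices again.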

2. Feature Engineering: Building a Predictive Indicator Set

Feature engineering is one of the most important steps in time series forecasting; good features can significantly improve model performance.

2.1 Technical Indicators

Technical indicators are derived from price and volume, and capture trend, momentum, volatility, and related market characteristics.

def calculate_technical_indicators(df):
    """
    Compute common technical indicators.
    """
    df = df.copy()
    
    # 1. Simple moving averages
    df['MA5'] = df['close'].rolling(window=5).mean()
    df['MA10'] = df['close'].rolling(window=10).mean()
    df['MA20'] = df['close'].rolling(window=20).mean()
    df['MA60'] = df['close'].rolling(window=60).mean()
    
    # 2. Exponential moving averages
    df['EMA12'] = df['close'].ewm(span=12).mean()
    df['EMA26'] = df['close'].ewm(span=26).mean()
    
    # 3. MACD
    df['MACD'] = df['EMA12'] - df['EMA26']
    df['MACD_signal'] = df['MACD'].ewm(span=9).mean()
    df['MACD_hist'] = df['MACD'] - df['MACD_signal']
    
    # 4. RSI (Relative Strength Index)
    delta = df['close'].diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    rs = gain / loss
    df['RSI'] = 100 - (100 / (1 + rs))
    
    # 5. Bollinger Bands
    df['BB_mid'] = df['close'].rolling(window=20).mean()
    df['BB_std'] = df['close'].rolling(window=20).std()
    df['BB_upper'] = df['BB_mid'] + 2 * df['BB_std']
    df['BB_lower'] = df['BB_mid'] - 2 * df['BB_std']
    
    # 6. Bollinger band width
    df['BB_width'] = (df['BB_upper'] - df['BB_lower']) / df['BB_mid']
    
    # 7. Volume indicators
    df['VOL_MA5'] = df['volume'].rolling(window=5).mean()
    df['VOL_MA10'] = df['volume'].rolling(window=10).mean()
    df['VOL_ratio'] = df['volume'] / df['VOL_MA10']
    
    # 8. Rates of price change
    df['price_change'] = df['close'].pct_change()
    df['price_change_5'] = df['close'].pct_change(5)
    df['price_change_20'] = df['close'].pct_change(20)
    
    # 9. Volatility (rolling standard deviation)
    df['volatility'] = df['close'].rolling(window=20).std()
    
    # 10. ATR (Average True Range)
    high_low = df['high'] - df['low']
    high_close = np.abs(df['high'] - df['close'].shift())
    low_close = np.abs(df['low'] - df['close'].shift())
    true_range = np.maximum(high_low, np.maximum(high_close, low_close))
    df['ATR'] = true_range.rolling(window=14).mean()
    
    # 11. KDJ (stochastic) indicator
    low_min = df['low'].rolling(window=9).min()
    high_max = df['high'].rolling(window=9).max()
    df['K'] = 100 * (df['close'] - low_min) / (high_max - low_min)
    df['D'] = df['K'].rolling(window=3).mean()
    df['J'] = 3 * df['K'] - 2 * df['D']
    
    # 12. OBV (On-Balance Volume)
    df['OBV'] = (np.sign(df['close'].diff()) * df['volume']).cumsum()
    
    # 13. Moving-average alignment (1 = bullish stack, -1 = bearish stack, 0 = mixed)
    df['MA_alignment'] = np.where((df['MA5'] > df['MA10']) & (df['MA10'] > df['MA20']), 1, 
                                  np.where((df['MA5'] < df['MA10']) & (df['MA10'] < df['MA20']), -1, 0))
    
    return df

# Compute the technical indicators
data_with_indicators = calculate_technical_indicators(cleaned_data)
print("Data with technical indicators:")
print(data_with_indicators[['close', 'MA5', 'MA10', 'RSI', 'MACD', 'BB_width']].tail())
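Indicator formulas are easy to mis-transcribe, so cheap sanity checks are worthwhile. The sketch below (synthetic prices, not market data) replicates the rolling-mean RSI from `calculate_technical_indicators` and checks two properties it must satisfy: RSI stays within [0, 100], and a strictly rising series, which has zero average loss, pins it at 100:

```python
import numpy as np
import pandas as pd

def rsi(close, window=14):
    # Same rolling-mean RSI as in calculate_technical_indicators
    delta = close.diff()
    gain = delta.where(delta > 0, 0).rolling(window=window).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=window).mean()
    return 100 - 100 / (1 + gain / loss)

# Property 1: RSI of a random walk stays within [0, 100]
rng = np.random.default_rng(0)
walk = pd.Series(100 + np.cumsum(rng.normal(0, 1, 200)))
print(rsi(walk).dropna().between(0, 100).all())  # True

# Property 2: strictly rising prices give zero average loss, so RSI == 100
rising = pd.Series(np.arange(1.0, 31.0))
print(rsi(rising).dropna().eq(100).all())  # True
```

The same style of known-answer test applies to MACD (it must equal EMA12 minus EMA26 by construction) and the Bollinger bands.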

2.2 Time Feature Extraction

Time series data is time-dependent; extracting calendar features helps the model pick up temporal patterns.

def extract_time_features(df):
    """
    Extract calendar/time features.
    """
    df = df.copy()
    
    # 1. Calendar period features
    df['day_of_week'] = df.index.dayofweek
    df['month'] = df.index.month
    df['quarter'] = df.index.quarter
    df['day_of_month'] = df.index.day
    
    # 2. Month start/end flags
    df['is_month_start'] = df.index.is_month_start.astype(int)
    df['is_month_end'] = df.index.is_month_end.astype(int)
    
    # 3. Quarter-end flag
    df['is_quarter_end'] = df.index.is_quarter_end.astype(int)
    
    # 4. Weekend flag (A-share markets are closed on weekends, so this is
    #    always 0 on daily bars; kept for generality)
    df['is_weekend'] = df.index.dayofweek.isin([5, 6]).astype(int)
    
    # 5. Days until the next holiday (simplified; a real system should use
    #    the exchange's holiday calendar)
    df['days_to_holiday'] = 0  # placeholder
    
    # 6. Trading-day sequence number
    df['trading_day_seq'] = range(len(df))
    
    return df

# Extract time features
data_with_time = extract_time_features(data_with_indicators)
print("Data with time features:")
print(data_with_time[['close', 'day_of_week', 'month', 'is_month_start']].tail())

2.3 Lag Features and Rolling Statistics

The heart of time series forecasting is using history to predict the future, so lag features and rolling statistics are especially important.

def create_lag_features(df, lag_periods=[1, 2, 3, 5, 10]):
    """
    Create lagged features.
    """
    df = df.copy()
    
    for period in lag_periods:
        # Lagged prices and volume
        df[f'close_lag_{period}'] = df['close'].shift(period)
        df[f'volume_lag_{period}'] = df['volume'].shift(period)
        # Return over the past `period` days, lagged one day to avoid lookahead
        df[f'return_lag_{period}'] = df['close'].pct_change(period).shift(1)
        
        # Lagged technical indicators
        df[f'RSI_lag_{period}'] = df['RSI'].shift(period)
        df[f'MACD_lag_{period}'] = df['MACD'].shift(period)
        
    return df

def create_rolling_features(df, windows=[5, 10, 20]):
    """
    Create rolling-statistics features.
    """
    df = df.copy()
    
    for window in windows:
        # Rolling means
        df[f'close_rolling_mean_{window}'] = df['close'].rolling(window=window).mean()
        df[f'volume_rolling_mean_{window}'] = df['volume'].rolling(window=window).mean()
        
        # Rolling standard deviation (volatility)
        df[f'close_rolling_std_{window}'] = df['close'].rolling(window=window).std()
        
        # Rolling max/min
        df[f'close_rolling_max_{window}'] = df['close'].rolling(window=window).max()
        df[f'close_rolling_min_{window}'] = df['close'].rolling(window=window).min()
        
        # Rolling quantiles
        df[f'close_rolling_quantile_25_{window}'] = df['close'].rolling(window=window).quantile(0.25)
        df[f'close_rolling_quantile_75_{window}'] = df['close'].rolling(window=window).quantile(0.75)
        
        # Rolling skewness and kurtosis
        df[f'close_rolling_skew_{window}'] = df['close'].rolling(window=window).skew()
        df[f'close_rolling_kurt_{window}'] = df['close'].rolling(window=window).kurt()
        
    return df

# Create lag and rolling features
data_with_lag = create_lag_features(data_with_time)
data_with_rolling = create_rolling_features(data_with_lag)

print("Columns after adding lag and rolling features:")
print(f"Total columns: {len(data_with_rolling.columns)}")
print(f"Sample of column names: {list(data_with_rolling.columns)[:15]}")

2.4 Target Variable Construction

We must define the prediction target explicitly. Common choices include:

  • Classification: the direction of movement (up = 1, down = 0)
  • Regression: the future price or return
  • Multi-step: prices at several future time points

def create_target_variables(df, prediction_horizon=5):
    """
    Create target variables.
    prediction_horizon: number of days ahead to predict
    """
    df = df.copy()
    
    # 1. Future price
    df[f'future_price_{prediction_horizon}'] = df['close'].shift(-prediction_horizon)
    
    # 2. Future return over the horizon
    df[f'future_return_{prediction_horizon}'] = df['close'].pct_change(prediction_horizon).shift(-prediction_horizon)
    
    # 3. Binary direction target (up/down)
    df[f'price_direction_{prediction_horizon}'] = np.where(df[f'future_return_{prediction_horizon}'] > 0, 1, 0)
    
    # 4. Multi-class target (strong up, mild up, mild down, strong down)
    def multi_class_target(return_value):
        if return_value > 0.02:
            return 3  # strong up (> +2%)
        elif return_value > 0:
            return 2  # mild up
        elif return_value > -0.02:
            return 1  # mild down
        else:
            return 0  # strong down (< -2%)
    
    df[f'price_multi_class_{prediction_horizon}'] = df[f'future_return_{prediction_horizon}'].apply(multi_class_target)
    
    return df

# Create the target variables
data_with_target = create_target_variables(data_with_rolling, prediction_horizon=5)
print("Data with target variables:")
print(data_with_target[['close', 'future_price_5', 'future_return_5', 'price_direction_5', 'price_multi_class_5']].tail(10))
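
Note the mirror image of the warm-up problem: shifting by `-prediction_horizon` leaves NaN targets in the last `prediction_horizon` rows, whose future is not yet known. Those rows must be excluded from training, and since `NaN > 0` evaluates to False, `np.where` silently labels them 0 (down) unless they are dropped first. A toy check:

```python
import numpy as np
import pandas as pd

horizon = 5
close = pd.Series(np.linspace(100.0, 110.0, 30))

# Return over the next `horizon` days, aligned to the prediction date
future_return = close.pct_change(horizon).shift(-horizon)

# Exactly the last `horizon` rows have no target
print(int(future_return.isna().sum()))                 # 5
print(bool(future_return.tail(horizon).isna().all()))  # True

# Caution: NaN > 0 is False, so np.where would label these rows 0 (down)
direction = np.where(future_return > 0, 1, 0)
print(direction[-horizon:].tolist())  # [0, 0, 0, 0, 0]
```

Dropping rows where the future-return column is NaN (as the later `dropna()` does) removes the mislabeled tail before training.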

3. Model Selection and Construction

3.1 Classical Machine Learning Models

Several machine learning models are suitable for time series prediction, including random forests and gradient-boosted trees.

from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split, TimeSeriesSplit
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

def prepare_model_data(df, target_col, drop_cols=None):
    """
    Prepare the feature matrix and target vector.
    """
    if drop_cols is None:
        drop_cols = []
    
    # Drop rows containing NaN
    df_clean = df.dropna()
    
    # Drop the target and every other forward-looking column, otherwise
    # future information leaks into the features
    leak_cols = [c for c in df_clean.columns
                 if c.startswith(('future_', 'price_direction_', 'price_multi_class_'))]
    X = df_clean.drop(columns=list(dict.fromkeys([target_col] + drop_cols + leak_cols)))
    y = df_clean[target_col]
    
    # Keep only numeric columns
    X = X.select_dtypes(include=[np.number])
    
    return X, y

def train_time_series_model(X, y, model_type='random_forest'):
    """
    Train a model with walk-forward (time series) cross-validation.
    """
    # Time-ordered splits, so no future data leaks into training
    tscv = TimeSeriesSplit(n_splits=5)
    
    if model_type == 'random_forest':
        model = RandomForestClassifier(n_estimators=100, random_state=42, max_depth=10)
    elif model_type == 'gradient_boosting':
        model = GradientBoostingClassifier(n_estimators=100, random_state=42, max_depth=5)
    elif model_type == 'logistic':
        model = LogisticRegression(random_state=42, max_iter=1000)
    else:
        raise ValueError(f"Unknown model_type: {model_type}")
    
    # Per-fold performance
    scores = []
    predictions = []
    actuals = []
    
    for train_idx, test_idx in tscv.split(X):
        X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
        y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]
        
        # Train
        model.fit(X_train, y_train)
        
        # Predict
        y_pred = model.predict(X_test)
        
        # Evaluate
        score = accuracy_score(y_test, y_pred)
        scores.append(score)
        
        predictions.extend(y_pred)
        actuals.extend(y_test.values)
    
    # Overall evaluation
    print(f"Mean accuracy: {np.mean(scores):.4f} (+/- {np.std(scores):.4f})")
    print("\nClassification report:")
    print(classification_report(actuals, predictions))
    
    # Confusion matrix
    cm = confusion_matrix(actuals, predictions)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
    plt.title('Confusion Matrix')
    plt.ylabel('Actual')
    plt.xlabel('Predicted')
    plt.show()
    
    # Refit the final model on the full data set
    model.fit(X, y)
    
    return model, scores

# Prepare the data
target_col = 'price_direction_5'
X, y = prepare_model_data(data_with_target, target_col)

print(f"Number of features: {X.shape[1]}")
print(f"Number of samples: {X.shape[0]}")
print(f"Target distribution:\n{y.value_counts(normalize=True)}")

# Train a random forest model
rf_model, rf_scores = train_time_series_model(X, y, model_type='random_forest')
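
To see concretely why `TimeSeriesSplit` prevents leakage, here is its behavior on a toy index (12 samples, 3 splits): every fold trains strictly on the past and tests on the window that immediately follows.

```python
import numpy as np
from sklearn.model_selection import TimeSeriesSplit

X_toy = np.arange(12).reshape(-1, 1)

for train_idx, test_idx in TimeSeriesSplit(n_splits=3).split(X_toy):
    # Training indices always precede test indices: no future leaks backward
    assert train_idx.max() < test_idx.min()
    print(list(train_idx), '->', list(test_idx))
# [0, 1, 2] -> [3, 4, 5]
# [0, 1, 2, 3, 4, 5] -> [6, 7, 8]
# [0, 1, 2, 3, 4, 5, 6, 7, 8] -> [9, 10, 11]
```

Contrast this with a shuffled `train_test_split`, which would let the model "see" bars that come after its test window.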

3.2 Deep Learning: LSTM

LSTM (Long Short-Term Memory) networks are a classic deep learning architecture for sequence data, particularly good at capturing long-range dependencies.

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

class StockDataset(Dataset):
    """
    Sliding-window dataset over stock features.
    """
    def __init__(self, X, y, sequence_length=30):
        # Accept DataFrames/Series or plain NumPy arrays
        self.X = torch.as_tensor(np.asarray(X), dtype=torch.float32)
        self.y = torch.as_tensor(np.asarray(y), dtype=torch.float32)
        self.seq_len = sequence_length
        
    def __len__(self):
        return len(self.X) - self.seq_len + 1
    
    def __getitem__(self, idx):
        # A window of seq_len rows, labeled at its last time step
        return self.X[idx:idx+self.seq_len], self.y[idx+self.seq_len-1]
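
The bookkeeping behind `StockDataset` is worth spelling out: with `n` rows and a window of `seq_len`, there are exactly `n - seq_len + 1` complete windows, and sample `i` covers rows `[i, i + seq_len)` with its label taken at the window's last row. In plain arithmetic:

```python
# Window arithmetic used by StockDataset.__len__ and __getitem__
n, seq_len = 100, 30

num_windows = n - seq_len + 1
print(num_windows)  # 71

# Sample i spans rows [i, i + seq_len); its label row is i + seq_len - 1
first_label_row = 0 + seq_len - 1
last_label_row = (num_windows - 1) + seq_len - 1
print(first_label_row, last_label_row)  # 29 99
```

So a 30-day window on 100 rows yields 71 training samples, labeled on rows 29 through 99.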

class LSTMModel(nn.Module):
    """
    Stacked LSTM with a sigmoid head for binary classification.
    """
    def __init__(self, input_size, hidden_size=64, num_layers=2, output_size=1, dropout=0.2):
        super(LSTMModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, 
                           batch_first=True, dropout=dropout)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid()  # for binary classification
        
    def forward(self, x):
        # Initialize the hidden and cell states
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        
        # LSTM forward pass
        out, _ = self.lstm(x, (h0, c0))
        
        # Take the output at the last time step
        out = out[:, -1, :]
        out = self.dropout(out)
        out = self.fc(out)
        out = self.sigmoid(out)
        
        return out

def train_lstm_model(X_train, y_train, X_val, y_val, sequence_length=30, epochs=50, batch_size=32):
    """
    Train the LSTM model.
    """
    # Build datasets and loaders (shuffle=False preserves temporal order)
    train_dataset = StockDataset(X_train, y_train, sequence_length)
    val_dataset = StockDataset(X_val, y_val, sequence_length)
    
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    
    # Initialize the model
    input_size = X_train.shape[1]
    model = LSTMModel(input_size=input_size, hidden_size=64, num_layers=2, output_size=1)
    
    # Loss function and optimizer
    criterion = nn.BCELoss()  # binary cross-entropy
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    
    # Training history
    train_losses = []
    val_losses = []
    
    for epoch in range(epochs):
        # Training phase
        model.train()
        train_loss = 0
        for batch_X, batch_y in train_loader:
            # Add a feature dimension for univariate input
            batch_X = batch_X.unsqueeze(-1) if batch_X.dim() == 2 else batch_X
            optimizer.zero_grad()
            outputs = model(batch_X)
            # squeeze(-1) keeps the batch dimension even for single-sample batches
            loss = criterion(outputs.squeeze(-1), batch_y)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        
        # Validation phase
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for batch_X, batch_y in val_loader:
                batch_X = batch_X.unsqueeze(-1) if batch_X.dim() == 2 else batch_X
                outputs = model(batch_X)
                loss = criterion(outputs.squeeze(-1), batch_y)
                val_loss += loss.item()
        
        train_loss /= len(train_loader)
        val_loss /= len(val_loader)
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        
        if (epoch + 1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
    
    # Plot the loss curves
    plt.figure(figsize=(10, 6))
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Val Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    plt.show()
    
    return model, train_losses, val_losses

def evaluate_lstm_model(model, X_test, y_test, sequence_length=30):
    """
    Evaluate the LSTM model on held-out data.
    """
    test_dataset = StockDataset(X_test, y_test, sequence_length)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
    
    model.eval()
    predictions = []
    actuals = []
    
    with torch.no_grad():
        for batch_X, batch_y in test_loader:
            batch_X = batch_X.unsqueeze(-1) if batch_X.dim() == 2 else batch_X
            outputs = model(batch_X)
            preds = (outputs.squeeze(-1) > 0.5).float()
            predictions.extend(preds.numpy())
            actuals.extend(batch_y.numpy())
    
    # Accuracy
    accuracy = accuracy_score(actuals, predictions)
    print(f"LSTM accuracy: {accuracy:.4f}")
    print("\nClassification report:")
    print(classification_report(actuals, predictions))
    
    return predictions, actuals

# Prepare the LSTM inputs (MinMax-scaled to [0, 1])
from sklearn.preprocessing import MinMaxScaler

# Rescale the features with MinMaxScaler
# (Note: fitting on the full history leaks test-set statistics into the scaling;
#  in production, fit the scaler on the training split only)
scaler_lstm = MinMaxScaler()
X_lstm = scaler_lstm.fit_transform(X)

# Chronological train/validation/test split (no shuffling)
train_size = int(len(X_lstm) * 0.7)
val_size = int(len(X_lstm) * 0.15)
test_size = len(X_lstm) - train_size - val_size

X_train_lstm = X_lstm[:train_size]
y_train_lstm = y.iloc[:train_size]
X_val_lstm = X_lstm[train_size:train_size+val_size]
y_val_lstm = y.iloc[train_size:train_size+val_size]
X_test_lstm = X_lstm[train_size+val_size:]
y_test_lstm = y.iloc[train_size+val_size:]

print(f"Train: {len(X_train_lstm)}, Validation: {len(X_val_lstm)}, Test: {len(X_test_lstm)}")

# Train the LSTM model
lstm_model, train_losses, val_losses = train_lstm_model(
    X_train_lstm, y_train_lstm, X_val_lstm, y_val_lstm, 
    sequence_length=30, epochs=50, batch_size=32
)

# Evaluate the LSTM model
lstm_predictions, lstm_actuals = evaluate_lstm_model(lstm_model, X_test_lstm, y_test_lstm)

3.3 Ensembles and Model Fusion

To improve prediction stability, we can fuse several models and combine their strengths.

class EnsembleModel:
    """
    Weighted-average ensemble over heterogeneous models.
    """
    def __init__(self, models, weights=None):
        self.models = models
        self.weights = weights if weights is not None else [1/len(models)] * len(models)
    
    def predict(self, X):
        """
        Weighted-average prediction.
        """
        predictions = []
        for model in self.models:
            if hasattr(model, 'predict_proba'):
                # Classifiers: use the probability of the positive class
                pred = model.predict_proba(X)[:, 1]
            else:
                # Deep learning models: run a forward pass
                model.eval()
                with torch.no_grad():
                    X_tensor = torch.FloatTensor(X.values if hasattr(X, 'values') else X)
                    if X_tensor.dim() == 2:
                        X_tensor = X_tensor.unsqueeze(0)
                    pred = model(X_tensor).squeeze(-1).numpy()
            predictions.append(pred)
        
        # Weighted average of the per-model probabilities
        weighted_pred = np.average(predictions, axis=0, weights=self.weights)
        return (weighted_pred > 0.5).astype(int)
    
    def evaluate(self, X, y):
        """
        Evaluate the ensemble.
        """
        predictions = self.predict(X)
        accuracy = accuracy_score(y, predictions)
        print(f"Ensemble accuracy: {accuracy:.4f}")
        print("\nClassification report:")
        print(classification_report(y, predictions))
        return predictions

# Build the ensemble
# Note: wrap the LSTM so it exposes a scikit-learn style predict_proba
class LSTMWrapper:
    def __init__(self, lstm_model, sequence_length, input_scaler):
        self.lstm_model = lstm_model
        self.seq_len = sequence_length
        self.scaler = input_scaler
    
    def predict_proba(self, X):
        # Not enough rows to form a single sequence
        if len(X) < self.seq_len:
            return np.zeros(len(X))
        
        # Scale the inputs
        X_scaled = self.scaler.transform(X)
        
        # Build the sliding-window sequences
        sequences = []
        for i in range(len(X_scaled) - self.seq_len + 1):
            seq = X_scaled[i:i+self.seq_len]
            sequences.append(seq)
        
        if not sequences:
            return np.zeros(len(X))
        
        sequences = np.array(sequences)
        
        # Predict
        self.lstm_model.eval()
        with torch.no_grad():
            X_tensor = torch.FloatTensor(sequences)
            preds = self.lstm_model(X_tensor).squeeze(-1).numpy()
        
        # Left-pad so the output aligns with the input rows
        if len(preds) < len(X):
            padding = np.zeros(len(X) - len(preds))
            preds = np.concatenate([padding, preds])
        
        return np.column_stack([1-preds, preds])  # probability format: [P(down), P(up)]

# Build the ensemble
lstm_wrapper = LSTMWrapper(lstm_model, 30, scaler_lstm)
ensemble = EnsembleModel([rf_model, lstm_wrapper], weights=[0.5, 0.5])

# Evaluate the ensemble on the test split
# (Caveat: rf_model was refit on the full data set above, so this estimate is optimistic)
X_test = X.iloc[train_size+val_size:]
y_test = y.iloc[train_size+val_size:]
ensemble_predictions = ensemble.evaluate(X_test.values, y_test.values)

4. Risk Control and Backtesting

4.1 Risk Metrics

In live trading, risk control matters even more than predictive accuracy. We therefore compute a range of risk metrics.

def calculate_risk_metrics(returns):
    """
    Compute risk metrics from a series of per-period returns.
    """
    metrics = {}
    
    # 1. Cumulative return (final growth factor minus 1)
    growth = (1 + returns).cumprod()
    metrics['cumulative_return'] = growth.iloc[-1] - 1
    
    # 2. Annualized return (assuming 252 trading days per year)
    annual_return = growth.iloc[-1] ** (252 / len(returns)) - 1
    metrics['annual_return'] = annual_return
    
    # 3. Annualized volatility
    annual_volatility = returns.std() * np.sqrt(252)
    metrics['annual_volatility'] = annual_volatility
    
    # 4. Sharpe ratio (assuming a 3% risk-free rate)
    risk_free_rate = 0.03
    sharpe_ratio = (annual_return - risk_free_rate) / annual_volatility
    metrics['sharpe_ratio'] = sharpe_ratio
    
    # 5. Maximum drawdown
    rolling_max = growth.cummax()
    drawdown = (growth - rolling_max) / rolling_max
    max_drawdown = drawdown.min()
    metrics['max_drawdown'] = max_drawdown
    
    # 6. Win rate
    win_rate = (returns > 0).mean()
    metrics['win_rate'] = win_rate
    
    # 7. Profit factor (average win over average loss)
    avg_win = returns[returns > 0].mean()
    avg_loss = returns[returns < 0].mean()
    profit_factor = abs(avg_win / avg_loss) if avg_loss != 0 else np.inf
    metrics['profit_factor'] = profit_factor
    
    # 8. Longest losing streak
    consecutive_losses = 0
    max_consecutive_losses = 0
    for r in returns:
        if r < 0:
            consecutive_losses += 1
            max_consecutive_losses = max(max_consecutive_losses, consecutive_losses)
        else:
            consecutive_losses = 0
    metrics['max_consecutive_losses'] = max_consecutive_losses
    
    return metrics

def print_risk_metrics(metrics):
    """
    Print the risk metrics.
    """
    print("=== Risk Metrics ===")
    print(f"Cumulative return: {metrics['cumulative_return']:.2%}")
    print(f"Annualized return: {metrics['annual_return']:.2%}")
    print(f"Annualized volatility: {metrics['annual_volatility']:.2%}")
    print(f"Sharpe ratio: {metrics['sharpe_ratio']:.2f}")
    print(f"Maximum drawdown: {metrics['max_drawdown']:.2%}")
    print(f"Win rate: {metrics['win_rate']:.2%}")
    print(f"Profit factor: {metrics['profit_factor']:.2f}")
    print(f"Longest losing streak: {metrics['max_consecutive_losses']}")

4.2 A Simple Backtesting System

class SimpleBacktest:
    """
    A minimal long-only backtesting system.
    """
    def __init__(self, initial_capital=100000):
        self.initial_capital = initial_capital
        self.capital = initial_capital
        self.position = 0  # shares held
        self.trades = []
        self.equity_curve = []
        
    def run_backtest(self, data, predictions, transaction_cost=0.001):
        """
        Run the backtest.
        """
        self.capital = self.initial_capital
        self.position = 0
        self.trades = []
        self.equity_curve = []
        
        # Align the price data with the prediction array
        data = data.iloc[len(data) - len(predictions):]
        
        for i, (idx, row) in enumerate(data.iterrows()):
            if i >= len(predictions):
                break
                
            prediction = predictions[i]
            price = row['close']
            
            # Record current equity
            current_equity = self.capital + self.position * price
            self.equity_curve.append(current_equity)
            
            # Trading logic: buy on an up prediction, sell on a down prediction
            if prediction == 1 and self.position == 0:  # buy signal
                shares_to_buy = self.capital // price
                if shares_to_buy > 0:
                    cost = shares_to_buy * price * (1 + transaction_cost)
                    if cost <= self.capital:
                        self.position = shares_to_buy
                        self.capital -= cost
                        self.trades.append({
                            'date': idx,
                            'action': 'BUY',
                            'price': price,
                            'shares': shares_to_buy,
                            'cost': cost
                        })
            
            elif prediction == 0 and self.position > 0:  # sell signal
                revenue = self.position * price * (1 - transaction_cost)
                self.capital += revenue
                self.trades.append({
                    'date': idx,
                    'action': 'SELL',
                    'price': price,
                    'shares': self.position,
                    'revenue': revenue
                })
                self.position = 0
        
        # Close any open position at the end
        if self.position > 0:
            last_price = data.iloc[-1]['close']
            revenue = self.position * last_price * (1 - transaction_cost)
            self.capital += revenue
            self.trades.append({
                'date': data.index[-1],
                'action': 'SELL',
                'price': last_price,
                'shares': self.position,
                'revenue': revenue
            })
            self.position = 0
        
        return self.get_results()
    
    def get_results(self):
        """
        Summarize the backtest results.
        """
        if not self.equity_curve:
            return None
            
        equity_series = pd.Series(self.equity_curve)
        returns = equity_series.pct_change().dropna()
        
        # Risk metrics
        risk_metrics = calculate_risk_metrics(returns)
        
        # Trade statistics
        buy_trades = [t for t in self.trades if t['action'] == 'BUY']
        sell_trades = [t for t in self.trades if t['action'] == 'SELL']
        
        # Trade-level summary figures
        total_trades = len(buy_trades)
        if total_trades > 0:
            avg_trade_size = np.mean([t['shares'] for t in buy_trades])
        else:
            avg_trade_size = 0
        
        results = {
            'initial_capital': self.initial_capital,
            'final_capital': self.capital,
            'total_return': (self.capital - self.initial_capital) / self.initial_capital,
            'num_trades': total_trades,
            'avg_trade_size': avg_trade_size,
            'equity_curve': equity_series,
            'returns': returns,
            **risk_metrics
        }
        
        return results

def plot_backtest_results(results):
    """
    Plot the backtest results.
    """
    if results is None:
        return
        
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    
    # 1. Equity curve
    axes[0, 0].plot(results['equity_curve'].values)
    axes[0, 0].set_title('Equity Curve')
    axes[0, 0].set_xlabel('Time')
    axes[0, 0].set_ylabel('Equity')
    axes[0, 0].grid(True)
    
    # 2. Return distribution
    axes[0, 1].hist(results['returns'].values, bins=50, alpha=0.7)
    axes[0, 1].set_title('Return Distribution')
    axes[0, 1].set_xlabel('Return')
    axes[0, 1].set_ylabel('Frequency')
    axes[0, 1].grid(True)
    
    # 3. Cumulative returns
    cumulative_returns = (1 + results['returns']).cumprod()
    axes[1, 0].plot(cumulative_returns.values)
    axes[1, 0].set_title('Cumulative Returns')
    axes[1, 0].set_xlabel('Time')
    axes[1, 0].set_ylabel('Cumulative Return')
    axes[1, 0].grid(True)
    
    # 4. Drawdown curve
    rolling_max = results['equity_curve'].cummax()
    drawdown = (results['equity_curve'] - rolling_max) / rolling_max
    axes[1, 1].plot(drawdown.values, color='red')
    axes[1, 1].set_title('Drawdown')
    axes[1, 1].set_xlabel('Time')
    axes[1, 1].set_ylabel('Drawdown')
    axes[1, 1].grid(True)
    
    plt.tight_layout()
    plt.show()

# Run the backtest on the ensemble's predictions
backtest = SimpleBacktest(initial_capital=100000)
results = backtest.run_backtest(data_with_target, ensemble_predictions, transaction_cost=0.001)

if results:
    print_risk_metrics(results)
    plot_backtest_results(results)

4.3 Risk Control Strategies

class RiskManagedBacktest(SimpleBacktest):
    """
    Backtesting system with position sizing, stop-loss, and take-profit rules.
    """
    def __init__(self, initial_capital=100000, max_position_size=0.3, stop_loss_pct=0.05, take_profit_pct=0.1):
        super().__init__(initial_capital)
        self.max_position_size = max_position_size  # max fraction of capital per trade
        self.stop_loss_pct = stop_loss_pct  # stop-loss threshold
        self.take_profit_pct = take_profit_pct  # take-profit threshold
        self.entry_price = None  # entry price of the open position
        self.transaction_cost = 0.001  # per-trade cost, overwritten in run_backtest
        
    def run_backtest(self, data, predictions, transaction_cost=0.001):
        """
        Run the backtest with risk controls.
        """
        self.capital = self.initial_capital
        self.position = 0
        self.trades = []
        self.equity_curve = []
        self.entry_price = None
        self.transaction_cost = transaction_cost
        
        data = data.iloc[len(data) - len(predictions):]
        
        for i, (idx, row) in enumerate(data.iterrows()):
            if i >= len(predictions):
                break
                
            prediction = predictions[i]
            price = row['close']
            
            # Record current equity
            current_equity = self.capital + self.position * price
            self.equity_curve.append(current_equity)
            
            # Stop-loss and take-profit checks
            if self.position > 0 and self.entry_price is not None:
                # Stop loss
                if price < self.entry_price * (1 - self.stop_loss_pct):
                    self._close_position(price, idx, 'STOP_LOSS')
                    continue
                
                # Take profit
                if price > self.entry_price * (1 + self.take_profit_pct):
                    self._close_position(price, idx, 'TAKE_PROFIT')
                    continue
            
            # Trading logic
            if prediction == 1 and self.position == 0:  # buy signal
                # Size the position according to the risk limit
                position_size = min(self.max_position_size * self.capital, self.capital)
                shares_to_buy = int(position_size // price)
                
                if shares_to_buy > 0:
                    cost = shares_to_buy * price * (1 + transaction_cost)
                    if cost <= self.capital:
                        self.position = shares_to_buy
                        self.capital -= cost
                        self.entry_price = price
                        self.trades.append({
                            'date': idx,
                            'action': 'BUY',
                            'price': price,
                            'shares': shares_to_buy,
                            'cost': cost,
                            'type': 'NORMAL'
                        })
            
            elif prediction == 0 and self.position > 0:  # sell signal
                self._close_position(price, idx, 'NORMAL')
        
        # Close any open position at the end
        if self.position > 0:
            self._close_position(data.iloc[-1]['close'], data.index[-1], 'FORCE_CLOSE')
        
        return self.get_results()
    
    def _close_position(self, price, date, close_type):
        """
        Close the open position.
        """
        # Use the configured transaction cost rather than a hardcoded value
        revenue = self.position * price * (1 - self.transaction_cost)
        self.capital += revenue
        self.trades.append({
            'date': date,
            'action': 'SELL',
            'price': price,
            'shares': self.position,
            'revenue': revenue,
            'type': close_type
        })
        self.position = 0
        self.entry_price = None
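
The stop-loss and take-profit checks in `run_backtest` are plain multiplicative bands around the entry price. As arithmetic, with a hypothetical entry at 100:

```python
# Price bands implied by the risk parameters above
entry_price = 100.0
stop_loss_pct = 0.05
take_profit_pct = 0.10

stop_price = entry_price * (1 - stop_loss_pct)      # exit if price falls below this
target_price = entry_price * (1 + take_profit_pct)  # exit if price rises above this

print(round(stop_price, 2), round(target_price, 2))  # 95.0 110.0
```

Note the asymmetry: a 5% stop with a 10% target means the strategy risks 1 unit to make 2, so it can stay profitable even with a win rate below 50%.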

# Run the risk-managed backtest
risk_backtest = RiskManagedBacktest(
    initial_capital=100000,
    max_position_size=0.3,
    stop_loss_pct=0.05,
    take_profit_pct=0.1
)

risk_results = risk_backtest.run_backtest(data_with_target, ensemble_predictions, transaction_cost=0.001)

if risk_results:
    print("\n=== 带风险控制的回测结果 ===")
    print_risk_metrics(risk_results)
    plot_backtest_results(risk_results)

5. 模型评估与优化

5.1 多维度评估指标

除了准确率,还需要考虑其他评估指标,如精确率、召回率、F1分数、ROC-AUC等。

from sklearn.metrics import roc_curve, auc, precision_recall_curve, average_precision_score

def comprehensive_evaluation(y_true, y_pred, y_proba=None):
    """
    综合评估函数
    """
    print("=== 综合评估指标 ===")
    
    # 基础指标
    accuracy = accuracy_score(y_true, y_pred)
    print(f"准确率: {accuracy:.4f}")
    
    # 分类报告
    print("\n分类报告:")
    print(classification_report(y_true, y_pred))
    
    # 混淆矩阵
    cm = confusion_matrix(y_true, y_pred)
    print("\n混淆矩阵:")
    print(cm)
    
    # 如果有概率预测,计算ROC-AUC
    if y_proba is not None:
        # ROC曲线
        fpr, tpr, thresholds = roc_curve(y_true, y_proba)
        roc_auc = auc(fpr, tpr)
        print(f"\nROC-AUC: {roc_auc:.4f}")
        
        # 绘制ROC曲线
        plt.figure(figsize=(8, 6))
        plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.2f})')
        plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('ROC Curve')
        plt.legend(loc="lower right")
        plt.grid(True)
        plt.show()
        
        # PR曲线
        precision, recall, _ = precision_recall_curve(y_true, y_proba)
        avg_precision = average_precision_score(y_true, y_proba)
        
        plt.figure(figsize=(8, 6))
        plt.plot(recall, precision, color='blue', lw=2, label=f'PR curve (AP = {avg_precision:.2f})')
        plt.xlabel('Recall')
        plt.ylabel('Precision')
        plt.title('Precision-Recall Curve')
        plt.legend(loc="lower left")
        plt.grid(True)
        plt.show()

# 获取预测概率(用于综合评估)
# 对于随机森林
rf_proba = rf_model.predict_proba(X_test)[:, 1]

# 对于LSTM(需要特殊处理)
def get_lstm_proba(lstm_model, X, sequence_length, scaler):
    test_dataset = StockDataset(X, y_test_lstm, sequence_length)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
    
    lstm_model.eval()
    probas = []
    
    with torch.no_grad():
        for batch_X, _ in test_loader:
            batch_X = batch_X.unsqueeze(-1) if batch_X.dim() == 2 else batch_X
            outputs = lstm_model(batch_X)
            probas.extend(outputs.squeeze(-1).numpy())  # 用 squeeze(-1) 避免 batch 为 1 时被压成标量
    
    # 补齐长度(序列窗口使前 sequence_length-1 条样本没有预测,用 0 占位对齐)
    if len(probas) < len(X):
        padding = np.zeros(len(X) - len(probas))
        probas = np.concatenate([padding, probas])
    
    return probas

lstm_proba = get_lstm_proba(lstm_model, X_test_lstm, 30, scaler_lstm)

# 综合评估
print("随机森林模型评估:")
rf_predictions = rf_model.predict(X_test)  # 用随机森林自身的预测,而非集成结果
comprehensive_evaluation(y_test.values, rf_predictions, rf_proba)

print("\nLSTM模型评估:")
comprehensive_evaluation(y_test_lstm.values, lstm_predictions, lstm_proba)

5.2 特征重要性分析

理解哪些特征对预测最重要,有助于优化模型和解释预测结果。

def analyze_feature_importance(model, feature_names, top_n=20):
    """
    分析特征重要性
    """
    if hasattr(model, 'feature_importances_'):
        importances = model.feature_importances_
        indices = np.argsort(importances)[::-1]
        
        print("=== 特征重要性排名 ===")
        for i in range(min(top_n, len(indices))):
            print(f"{i+1}. {feature_names[indices[i]]}: {importances[indices[i]]:.4f}")
        
        # 绘制特征重要性图
        plt.figure(figsize=(12, 8))
        top_indices = indices[:top_n]
        plt.barh(range(top_n), importances[top_indices][::-1])
        plt.yticks(range(top_n), [feature_names[i] for i in top_indices][::-1])
        plt.xlabel('Feature Importance')
        plt.title(f'Top {top_n} Feature Importances')
        plt.tight_layout()
        plt.show()
        
        return dict(zip(feature_names, importances))
    
    else:
        print("模型不支持特征重要性分析")
        return None

# 分析随机森林的特征重要性
feature_importance = analyze_feature_importance(rf_model, X.columns.tolist(), top_n=15)

5.3 超参数调优

通过网格搜索或随机搜索来优化模型超参数。

from sklearn.model_selection import RandomizedSearchCV

def optimize_hyperparameters(X, y):
    """
    使用随机搜索优化超参数
    """
    # 定义参数分布
    param_dist = {
        'n_estimators': [50, 100, 200, 300],
        'max_depth': [5, 10, 15, 20, None],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4],
        'max_features': ['sqrt', 'log2', None]
    }
    
    # 创建模型
    rf = RandomForestClassifier(random_state=42)
    
    # 时间序列交叉验证
    tscv = TimeSeriesSplit(n_splits=3)
    
    # 随机搜索
    random_search = RandomizedSearchCV(
        rf, param_distributions=param_dist, n_iter=20, cv=tscv,
        scoring='accuracy', n_jobs=-1, random_state=42, verbose=1
    )
    
    # 搜索最佳参数
    random_search.fit(X, y)
    
    print("最佳参数:", random_search.best_params_)
    print("最佳分数:", random_search.best_score_)
    
    return random_search.best_estimator_

# 优化超参数(注意:这可能需要较长时间)
# best_rf_model = optimize_hyperparameters(X, y)

6. 实战应用与部署

6.1 实时预测系统

import time
import threading
from datetime import datetime, timedelta

class RealTimePredictor:
    """
    实时预测系统
    """
    def __init__(self, model, feature_engineering_func, scaler=None):
        self.model = model
        self.feature_engineering_func = feature_engineering_func
        self.scaler = scaler
        self.is_running = False
        self.prediction_history = []
        
    def fetch_latest_data(self, symbol):
        """
        获取最新数据
        """
        try:
            # 获取最近的数据
            end_date = datetime.now().strftime('%Y%m%d')
            start_date = (datetime.now() - timedelta(days=60)).strftime('%Y%m%d')
            
            data = fetch_stock_data(symbol, start_date, end_date)
            return data
        except Exception as e:
            print(f"获取数据失败: {e}")
            return None
    
    def preprocess_data(self, raw_data):
        """
        预处理数据
        """
        if raw_data is None or len(raw_data) < 30:
            return None
        
        # 清洗数据
        cleaned = clean_stock_data(raw_data)
        
        # 计算技术指标
        with_indicators = calculate_technical_indicators(cleaned)
        
        # 提取时间特征
        with_time = extract_time_features(with_indicators)
        
        # 创建滞后特征
        with_lag = create_lag_features(with_time)
        
        # 创建滚动特征
        with_rolling = create_rolling_features(with_lag)
        
        # 删除包含NaN的行
        final_data = with_rolling.dropna()
        
        return final_data
    
    def predict(self, data):
        """
        进行预测
        """
        if data is None or len(data) == 0:
            return None
        
        # 准备特征(删除不需要的列)
        feature_cols = [col for col in data.columns if col not in [
            'future_price_5', 'future_return_5', 'price_direction_5', 'price_multi_class_5'
        ]]
        
        X = data[feature_cols].select_dtypes(include=[np.number])
        
        if len(X) == 0:
            return None
        
        # 获取最新一行进行预测
        latest_features = X.iloc[-1:].values
        
        # 如果是LSTM模型,需要特殊处理
        if hasattr(self.model, 'lstm_model'):  # LSTMWrapper
            # 需要构建序列
            if len(X) >= 30:
                sequence = X.iloc[-30:].values
                if self.scaler:
                    sequence = self.scaler.transform(sequence)
                sequence = torch.FloatTensor(sequence).unsqueeze(0)
                
                self.model.lstm_model.eval()
                with torch.no_grad():
                    prob = self.model.lstm_model(sequence).squeeze().item()
                prediction = 1 if prob > 0.5 else 0
                confidence = prob if prediction == 1 else 1 - prob
            else:
                return None
        else:  # 普通模型
            if self.scaler:
                latest_features = self.scaler.transform(latest_features)
            prediction = self.model.predict(latest_features)[0]
            
            # 获取概率
            if hasattr(self.model, 'predict_proba'):
                prob = self.model.predict_proba(latest_features)[0, 1]
                confidence = prob if prediction == 1 else 1 - prob
            else:
                confidence = 0.5
        
        return {
            'prediction': prediction,
            'confidence': confidence,
            'timestamp': datetime.now(),
            'features': X.iloc[-1].to_dict()
        }
    
    def run_continuous(self, symbol, interval=300):
        """
        运行连续预测
        """
        self.is_running = True
        
        def prediction_loop():
            while self.is_running:
                try:
                    # 获取数据
                    raw_data = self.fetch_latest_data(symbol)
                    
                    # 预处理
                    processed_data = self.preprocess_data(raw_data)
                    
                    # 预测
                    result = self.predict(processed_data)
                    
                    if result:
                        self.prediction_history.append(result)
                        print(f"[{result['timestamp']}] 预测: {'上涨' if result['prediction'] == 1 else '下跌'}, "
                              f"置信度: {result['confidence']:.2%}")
                    
                    # 等待
                    time.sleep(interval)
                    
                except Exception as e:
                    print(f"预测循环错误: {e}")
                    time.sleep(interval)
        
        # 在单独线程中运行
        thread = threading.Thread(target=prediction_loop)
        thread.daemon = True
        thread.start()
        
        print(f"开始实时预测 {symbol},每 {interval} 秒执行一次")
    
    def stop(self):
        """
        停止预测
        """
        self.is_running = False
        print("停止实时预测")

# 使用示例
# predictor = RealTimePredictor(rf_model, calculate_technical_indicators)
# predictor.run_continuous("600519", interval=60)  # 每分钟预测一次
# time.sleep(300)  # 运行5分钟
# predictor.stop()

6.2 模型保存与加载

import joblib
import pickle

def save_model(model, model_path, scaler=None, scaler_path=None):
    """
    保存模型和缩放器
    """
    # 保存模型
    if hasattr(model, 'lstm_model'):  # LSTMWrapper
        # 保存LSTM模型
        torch.save(model.lstm_model.state_dict(), model_path + '_lstm.pth')
        # 保存包装器参数
        with open(model_path + '_wrapper.pkl', 'wb') as f:
            pickle.dump({'seq_len': model.seq_len}, f)
    else:  # 普通模型
        joblib.dump(model, model_path)
    
    # 保存缩放器
    if scaler is not None and scaler_path is not None:
        joblib.dump(scaler, scaler_path)
    
    print(f"模型已保存到 {model_path}")

def load_model(model_path, model_type='random_forest', scaler_path=None):
    """
    加载模型和缩放器
    """
    scaler = None
    if scaler_path:
        scaler = joblib.load(scaler_path)
    
    if model_type == 'lstm':
        # 加载LSTM模型
        # 需要知道原始模型结构
        # 这里简化处理,实际应用中需要保存模型结构
        print("LSTM模型加载需要重新初始化结构")
        return None, scaler
    else:
        model = joblib.load(model_path)
        return model, scaler

# 保存模型(先确保目录存在)
import os
os.makedirs('models', exist_ok=True)
save_model(rf_model, 'models/rf_model.pkl', scaler, 'models/scaler.pkl')

# 加载模型
# loaded_model, loaded_scaler = load_model('models/rf_model.pkl', scaler_path='models/scaler.pkl')
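上面的 load_model 对 LSTM 只能提示"需要重新初始化结构"。一种通用做法是把结构超参数与权重打包保存,加载时先按配置重建模型、再载入权重。下面用一个不依赖 PyTorch 的简化示意演示这种模式(TinyModel 等名称均为假设,仅用于说明):

```python
import pickle

class TinyModel:
    """极简示意模型:hidden_size 是结构参数,weights 用列表代替真实权重"""
    def __init__(self, hidden_size):
        self.hidden_size = hidden_size
        self.weights = [0.0] * hidden_size

def save_with_config(model, path):
    # 把结构配置与权重打包保存,加载方无需预先知道模型结构
    with open(path, 'wb') as f:
        pickle.dump({'config': {'hidden_size': model.hidden_size},
                     'weights': model.weights}, f)

def load_with_config(path):
    with open(path, 'rb') as f:
        ckpt = pickle.load(f)
    model = TinyModel(**ckpt['config'])  # 先按配置重建结构
    model.weights = ckpt['weights']      # 再载入权重
    return model

m = TinyModel(hidden_size=8)
m.weights[0] = 1.5
save_with_config(m, 'tiny_model.pkl')
restored = load_with_config('tiny_model.pkl')
print(restored.hidden_size, restored.weights[0])  # 8 1.5
```

在 PyTorch 中,把 'weights' 换成 state_dict、'config' 换成构造 LSTM 所需的 input_size、hidden_size 等参数,即可套用同一模式。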

7. 高级策略与优化

7.1 多因子模型

结合多个预测因子进行综合决策。

class MultiFactorModel:
    """
    多因子模型
    """
    def __init__(self, models_dict, weights=None):
        self.models = models_dict  # {'model_name': model}
        self.weights = weights if weights is not None else {name: 1/len(models_dict) for name in models_dict}
    
    def predict(self, X):
        """
        多因子综合预测
        """
        predictions = {}
        probabilities = {}
        
        for name, model in self.models.items():
            if hasattr(model, 'predict_proba'):
                prob = model.predict_proba(X)[:, 1]
                pred = (prob > 0.5).astype(int)
            else:
                # LSTM 等无 predict_proba 的模型:假设 predict 返回的是上涨概率
                prob = np.asarray(model.predict(X))
                pred = (prob > 0.5).astype(int)
            
            predictions[name] = pred
            probabilities[name] = prob
        
        # 加权投票
        weighted_sum = np.zeros(len(X))
        for name in self.models:
            weighted_sum += probabilities[name] * self.weights[name]
        
        final_prediction = (weighted_sum > 0.5).astype(int)
        
        return final_prediction, predictions, probabilities

# 使用示例
models_dict = {
    'random_forest': rf_model,
    'lstm': lstm_wrapper
}
multi_factor = MultiFactorModel(models_dict)

# 预测
final_pred, individual_preds, individual_probs = multi_factor.predict(X_test.values)
print("多因子模型准确率:", accuracy_score(y_test.values, final_pred))

7.2 市场状态识别

根据市场状态调整预测策略。

def identify_market_state(data):
    """
    识别市场状态
    """
    # 计算市场状态指标
    # 1. 趋势状态
    ma5 = data['close'].rolling(5).mean()
    ma20 = data['close'].rolling(20).mean()
    trend = np.where(ma5 > ma20, 'up', np.where(ma5 < ma20, 'down', 'sideways'))
    
    # 2. 波动状态
    volatility = data['close'].rolling(20).std()
    vol_median = volatility.median()
    vol_state = np.where(volatility > vol_median * 1.5, 'high', 
                        np.where(volatility < vol_median * 0.5, 'low', 'normal'))
    
    # 3. 成交量状态
    vol_ma = data['volume'].rolling(20).mean()
    vol_ratio = data['volume'] / vol_ma
    volume_state = np.where(vol_ratio > 1.5, 'high', 
                           np.where(vol_ratio < 0.5, 'low', 'normal'))
    
    # 组合状态
    market_state = [f"{t}-{v}-{vs}" for t, v, vs in zip(trend, vol_state, volume_state)]
    
    return market_state

# 识别市场状态
market_states = identify_market_state(cleaned_data)
print("市场状态分布:")
print(pd.Series(market_states).value_counts())

7.3 自适应学习

模型需要根据市场变化进行自适应调整。

class AdaptiveModel:
    """
    自适应模型
    """
    def __init__(self, base_model, retrain_interval=100, window_size=500):
        self.base_model = base_model
        self.retrain_interval = retrain_interval
        self.window_size = window_size
        self.prediction_count = 0
        self.performance_history = []
        
    def predict(self, X):
        """
        预测并记录性能
        """
        self.prediction_count += 1
        
        # 定期重新训练
        if self.prediction_count % self.retrain_interval == 0:
            self.retrain()
        
        return self.base_model.predict(X)
    
    def retrain(self, X_new=None, y_new=None):
        """
        重新训练模型
        """
        print(f"开始重新训练模型(第 {self.prediction_count} 次预测)")
        
        if X_new is not None and y_new is not None:
            # 使用新数据重新训练
            self.base_model.fit(X_new, y_new)
        else:
            # 使用滑动窗口重新训练
            # 这里简化处理,实际应用中需要获取最新数据
            print("需要提供新数据进行重新训练")
        
        print("重新训练完成")

# 使用示例
adaptive_rf = AdaptiveModel(rf_model, retrain_interval=200)

8. 风险管理与合规

8.1 仓位管理

class PositionManager:
    """
    仓位管理器
    """
    def __init__(self, max_position=0.3, max_loss=0.1, max_drawdown=0.2):
        self.max_position = max_position  # 最大仓位比例
        self.max_loss = max_loss  # 单笔最大损失比例
        self.max_drawdown = max_drawdown  # 最大回撤限制
        self.current_position = 0
        self.peak_capital = 0
        
    def calculate_position_size(self, capital, price, stop_loss_price):
        """
        根据风险计算仓位大小
        """
        # 单笔最大风险
        max_risk_amount = capital * self.max_loss
        
        # 潜在损失
        potential_loss_per_share = price - stop_loss_price
        
        if potential_loss_per_share <= 0:
            return 0
        
        # 计算可买入数量(取整到股)
        shares = int(max_risk_amount // potential_loss_per_share)
        
        # 仓位限制
        max_shares = int((capital * self.max_position) // price)
        
        return min(shares, max_shares)
    
    def check_drawdown_limit(self, current_capital):
        """
        检查回撤限制
        """
        if current_capital > self.peak_capital:
            self.peak_capital = current_capital
        
        if self.peak_capital > 0:
            drawdown = (self.peak_capital - current_capital) / self.peak_capital
            if drawdown > self.max_drawdown:
                return False  # 触发回撤限制,停止交易
        
        return True

# 使用示例
pos_manager = PositionManager(max_position=0.3, max_loss=0.05, max_drawdown=0.15)
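上面的仓位逻辑可以用一组假设数字走一遍,计算方式与 PositionManager.calculate_position_size 一致:资金 10 万元、买入价 50 元、止损价 47 元、单笔最大亏损 5%、最大仓位 30%:

```python
# 数值均为假设,与 PositionManager(max_position=0.3, max_loss=0.05) 的参数对应
capital, price, stop_loss_price = 100000, 50, 47
max_loss, max_position = 0.05, 0.3

max_risk_amount = capital * max_loss                               # 5000 元:单笔最大风险
potential_loss_per_share = price - stop_loss_price                 # 3 元/股的潜在亏损
shares_by_risk = int(max_risk_amount // potential_loss_per_share)  # 按风险可买 1666 股
shares_by_position = int((capital * max_position) // price)        # 按仓位上限可买 600 股
shares = min(shares_by_risk, shares_by_position)
print(shares)  # 600
```

可以看到此例中仓位上限(600 股)比风险上限(1666 股)更严格,最终以仓位上限为准;止损价越接近买入价,风险上限允许的股数越多,约束会反过来落在仓位上限上。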

8.2 交易成本优化

def optimize_transaction_cost(data, predictions, cost_range=np.arange(0.0005, 0.003, 0.0005)):
    """
    优化交易成本
    """
    results = []
    
    for cost in cost_range:
        backtest = SimpleBacktest()
        result = backtest.run_backtest(data, predictions, transaction_cost=cost)
        
        if result:
            results.append({
                'cost': cost,
                'total_return': result['total_return'],
                'sharpe_ratio': result['sharpe_ratio'],
                'max_drawdown': result['max_drawdown'],
                'num_trades': result['num_trades']
            })
    
    results_df = pd.DataFrame(results)
    
    # 绘制结果
    plt.figure(figsize=(12, 8))
    plt.subplot(2, 2, 1)
    plt.plot(results_df['cost'], results_df['total_return'])
    plt.title('Total Return vs Transaction Cost')
    plt.xlabel('Transaction Cost')
    plt.ylabel('Total Return')
    
    plt.subplot(2, 2, 2)
    plt.plot(results_df['cost'], results_df['sharpe_ratio'])
    plt.title('Sharpe Ratio vs Transaction Cost')
    plt.xlabel('Transaction Cost')
    plt.ylabel('Sharpe Ratio')
    
    plt.subplot(2, 2, 3)
    plt.plot(results_df['cost'], results_df['max_drawdown'])
    plt.title('Max Drawdown vs Transaction Cost')
    plt.xlabel('Transaction Cost')
    plt.ylabel('Max Drawdown')
    
    plt.subplot(2, 2, 4)
    plt.plot(results_df['cost'], results_df['num_trades'])
    plt.title('Number of Trades vs Transaction Cost')
    plt.xlabel('Transaction Cost')
    plt.ylabel('Number of Trades')
    
    plt.tight_layout()
    plt.show()
    
    return results_df

# 优化交易成本
# cost_results = optimize_transaction_cost(data_with_target, ensemble_predictions)
# print(cost_results)

9. 总结与最佳实践

9.1 关键要点总结

  1. 数据质量是基础:确保数据清洁、完整、准确,避免未来数据泄露
  2. 特征工程至关重要:好的特征比复杂的模型更重要
  3. 模型选择要合适:根据数据特点选择模型,不要盲目追求复杂度
  4. 风险控制优先:永远把风险控制放在收益之前
  5. 持续监控和优化:市场在变化,模型需要定期更新

9.2 常见陷阱与避免方法

  1. 过拟合:使用时间序列交叉验证,保持训练数据和测试数据的时间顺序
  2. 数据泄露:确保特征工程中不使用未来信息
  3. 幸存者偏差:考虑退市股票和除权除息的影响
  4. 交易成本忽视:在回测中必须包含交易成本
  5. 过度交易:设置合理的交易频率限制
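针对前两条(过拟合与数据泄露),最直接的防线是严格按时间顺序切分训练集和测试集。下面是基于 sklearn TimeSeriesSplit 的最小示意(数据为随机生成,仅演示切分性质):

```python
import numpy as np
from sklearn.model_selection import TimeSeriesSplit

# 模拟100天的特征与标签(仅演示切分方式,非真实行情)
rng = np.random.default_rng(42)
X = rng.normal(size=(100, 5))
y = rng.integers(0, 2, size=100)

tscv = TimeSeriesSplit(n_splits=3)
for fold, (train_idx, test_idx) in enumerate(tscv.split(X)):
    # 关键性质:训练集永远在测试集之前,模型永远看不到"未来"
    assert train_idx.max() < test_idx.min()
    print(f"第{fold+1}折: 训练 [0, {train_idx.max()}], 测试 [{test_idx.min()}, {test_idx.max()}]")
```

与普通 KFold 随机打乱不同,TimeSeriesSplit 的每一折都只用过去的数据训练、在其后的区间测试,这也是正文中交叉验证环节采用它的原因。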

9.3 进一步学习方向

  1. 深度学习:Transformer、Attention机制在时间序列中的应用
  2. 强化学习:用于动态仓位管理和交易策略优化
  3. 自然语言处理:结合新闻、社交媒体情绪进行预测
  4. 高频交易:微秒级数据处理和预测
  5. 另类数据:卫星图像、信用卡数据等非传统数据源

9.4 代码组织建议

# 推荐的项目结构
"""
project/
├── data/
│   ├── raw/              # 原始数据
│   ├── processed/        # 处理后的数据
│   └── features/         # 特征数据
├── models/
│   ├── trained/          # 训练好的模型
│   └── scalers/          # 缩放器
├── src/
│   ├── data_processing.py  # 数据处理模块
│   ├── feature_engineering.py  # 特征工程模块
│   ├── model_training.py  # 模型训练模块
│   ├── backtesting.py     # 回测模块
│   ├── risk_management.py # 风险管理模块
│   └── real_time_predictor.py  # 实时预测模块
├── config/
│   ├── model_config.yaml  # 模型配置
│   └── trading_config.yaml  # 交易配置
├── notebooks/
│   ├── exploration.ipynb  # 数据探索
│   └── analysis.ipynb     # 结果分析
└── main.py               # 主程序
"""

# 主程序示例框架
"""
def main():
    # 1. 配置参数
    config = load_config('config/trading_config.yaml')
    
    # 2. 数据准备
    data = fetch_and_process_data(config['symbol'], config['start_date'], config['end_date'])
    
    # 3. 特征工程
    features = engineer_features(data)
    
    # 4. 模型训练
    model = train_model(features, config['model_type'])
    
    # 5. 回测评估
    results = backtest(model, features, config['initial_capital'])
    
    # 6. 风险评估
    risk_metrics = calculate_risk_metrics(results['returns'])
    
    # 7. 保存结果
    save_results(results, risk_metrics, config['output_path'])
    
    # 8. 实时预测(可选)
    if config['real_time']:
        predictor = RealTimePredictor(model, engineer_features)
        predictor.run_continuous(config['symbol'])

if __name__ == '__main__':
    main()
"""

结语

股市交易时间序列预测是一个复杂但充满机遇的领域。通过系统化的方法、严谨的回测和严格的风险控制,我们可以构建出稳健的预测系统。记住,没有任何模型能够100%准确预测市场,关键在于建立一套完整的投资体系,在控制风险的前提下实现持续稳定的收益。

成功的量化交易不仅需要技术能力,更需要对市场的深刻理解和良好的心理素质。建议从小规模开始,逐步积累经验,不断完善策略,最终形成适合自己的交易体系。

最后,提醒投资者:股市有风险,投资需谨慎。任何预测模型都只能作为辅助决策工具,不能替代独立的投资判断。