Introduction: Quantitative Investing Meets Machine Learning
In today's data-driven financial world, quantitative investing has become an important route to steady returns. By combining Python programming with machine-learning techniques, investors can mine valuable patterns from massive datasets and build models that go beyond traditional investment strategies. This article walks through building a complete quantitative investment system in Python from scratch, covering data acquisition, feature engineering, model building, backtesting, and risk control.
The core advantage of quantitative investing lies in its systematic, disciplined nature. Compared with discretionary investing, quantitative methods remove emotional interference and execute trades strictly according to predefined rules. Machine learning strengthens this further: it can automatically capture complex nonlinear relationships and surface market patterns that are hard for human analysts to spot.
This article uses modern Python libraries (pandas 2.0, scikit-learn 1.3, and PyTorch 2.0) to build a complete AI stock-selection model. We will focus on the following areas:
- Data acquisition and preprocessing
- Feature engineering and factor construction
- Training machine-learning models
- Implementing a backtesting framework
- Risk control and portfolio optimization
Part 1: Data Acquisition and Preprocessing
1.1 Choosing a Data Source
High-quality data is the foundation of quantitative investing. For the China A-share market, the following sources are commonly used:
- Tushare: a free financial data API
- AKShare: an open-source financial data library
- Yahoo Finance: international equity data
The following code fetches A-share daily history with Tushare (an AKShare alternative is sketched after this example):
import tushare as ts
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# Set your Tushare token (register on the Tushare website to obtain one)
ts.set_token('your_token_here')
pro = ts.pro_api()
def get_stock_data(symbol, start_date, end_date):
    """
    Fetch daily stock history.
    :param symbol: stock code, e.g. '000001.SZ'
    :param start_date: start date in 'YYYYMMDD' format
    :param end_date: end date in 'YYYYMMDD' format
    :return: DataFrame of daily bars
    """
    # Daily bars
    df = pro.daily(ts_code=symbol, start_date=start_date, end_date=end_date)
    # Parse dates and sort ascending (Tushare returns newest first)
    df['trade_date'] = pd.to_datetime(df['trade_date'])
    df = df.sort_values('trade_date').reset_index(drop=True)
    # Simple returns
    df['returns'] = df['close'].pct_change()
    # Log returns
    df['log_returns'] = np.log(df['close'] / df['close'].shift(1))
    return df

# Example: fetch three years of Ping An Bank data
symbol = '000001.SZ'
end_date = datetime.now().strftime('%Y%m%d')
start_date = (datetime.now() - timedelta(days=3*365)).strftime('%Y%m%d')
df_stock = get_stock_data(symbol, start_date, end_date)
print(df_stock.head())
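If you do not have a Tushare token, AKShare can serve as an alternative. The snippet below is a minimal sketch, not part of the original pipeline: the function name stock_zh_a_hist and the Chinese column labels it returns match recent AKShare releases, but verify both against your installed version.
import akshare as ak

# Hypothetical AKShare equivalent of the fetch above; qfq requests
# forward-adjusted prices. Function name and returned column labels
# depend on the AKShare version.
df_ak = ak.stock_zh_a_hist(symbol="000001", period="daily",
                           start_date="20210101", end_date="20240101",
                           adjust="qfq")
# AKShare returns Chinese column names; rename the ones the rest of the
# pipeline expects.
df_ak = df_ak.rename(columns={"日期": "trade_date", "开盘": "open",
                              "最高": "high", "最低": "low",
                              "收盘": "close", "成交量": "vol"})
print(df_ak.head())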
1.2 Data Cleaning and Preprocessing
Raw data usually contains missing values, outliers, and similar issues, and needs cleaning:
def clean_stock_data(df):
    """
    Clean stock data.
    :param df: raw data
    :return: cleaned data
    """
    # Drop missing values
    df = df.dropna()
    # Filter outliers with the 3-sigma rule. Note: prices trend, so a
    # static 3-sigma filter on raw price levels is crude; in practice it
    # is usually safer to screen extreme *returns* instead.
    numeric_cols = ['open', 'high', 'low', 'close', 'vol', 'amount']
    for col in numeric_cols:
        mean = df[col].mean()
        std = df[col].std()
        df = df[(df[col] >= mean - 3*std) & (df[col] <= mean + 3*std)]
    # Reset the index
    df = df.reset_index(drop=True)
    return df

# Clean the data
df_clean = clean_stock_data(df_stock)
print(f"Rows before cleaning: {len(df_stock)}, after cleaning: {len(df_clean)}")
1.3 Data Standardization
Machine-learning models usually expect standardized inputs:
from sklearn.preprocessing import StandardScaler, MinMaxScaler

def standardize_data(df, features):
    """
    Standardize selected columns.
    :param df: DataFrame
    :param features: columns to standardize
    :return: standardized DataFrame and the fitted scaler
    """
    scaler = StandardScaler()
    df_std = df.copy()
    df_std[features] = scaler.fit_transform(df[features])
    return df_std, scaler

# Example: standardize features
features = ['open', 'high', 'low', 'close', 'vol']
df_std, scaler = standardize_data(df_clean, features)
print(df_std[features].head())
Part 2: Feature Engineering and Factor Construction
2.1 Technical-Indicator Factors
Technical indicators are the most common factor type in quantitative investing. The code below computes several of them:
def calculate_technical_indicators(df):
    """
    Compute technical indicators.
    :param df: DataFrame with price data
    :return: DataFrame with indicator columns added
    """
    df = df.copy()
    # Moving averages
    df['MA5'] = df['close'].rolling(window=5).mean()
    df['MA20'] = df['close'].rolling(window=20).mean()
    df['MA60'] = df['close'].rolling(window=60).mean()
    # MACD
    exp1 = df['close'].ewm(span=12, adjust=False).mean()
    exp2 = df['close'].ewm(span=26, adjust=False).mean()
    df['MACD'] = exp1 - exp2
    df['MACD_signal'] = df['MACD'].ewm(span=9, adjust=False).mean()
    df['MACD_hist'] = df['MACD'] - df['MACD_signal']
    # RSI (Cutler's variant: simple moving averages of gains and losses)
    delta = df['close'].diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    rs = gain / loss
    df['RSI'] = 100 - (100 / (1 + rs))
    # Bollinger Bands
    df['BB_mid'] = df['close'].rolling(window=20).mean()
    df['BB_std'] = df['close'].rolling(window=20).std()
    df['BB_upper'] = df['BB_mid'] + 2 * df['BB_std']
    df['BB_lower'] = df['BB_mid'] - 2 * df['BB_std']
    # KDJ
    low_list = df['low'].rolling(window=9).min()
    high_list = df['high'].rolling(window=9).max()
    rsv = (df['close'] - low_list) / (high_list - low_list) * 100
    df['K'] = rsv.ewm(com=2).mean()
    df['D'] = df['K'].ewm(com=2).mean()
    df['J'] = 3 * df['K'] - 2 * df['D']
    # Volume indicators
    df['VOL_MA5'] = df['vol'].rolling(window=5).mean()
    df['VOL_MA20'] = df['vol'].rolling(window=20).mean()
    # Price momentum
    df['MOM'] = df['close'].diff(5)
    df['ROC'] = df['close'].pct_change(5) * 100
    # Annualized volatility
    df['volatility'] = df['returns'].rolling(window=20).std() * np.sqrt(252)
    return df

# Compute the indicators
df_with_indicators = calculate_technical_indicators(df_clean)
print(df_with_indicators.columns.tolist())
2.2 Fundamental Factors
For fundamental factors, we can pull financial-statement data:
def get_fundamental_data(symbol, start_date, end_date):
    """
    Fetch fundamental data.
    :param symbol: stock code
    :param start_date: start date
    :param end_date: end date
    :return: DataFrame of fundamentals
    """
    # Financial-statement indicators
    df_fundamental = pro.fina_indicator(ts_code=symbol,
                                        start_date=start_date,
                                        end_date=end_date)
    # Daily valuation ratios (P/E, P/B, ...)
    df_daily_basic = pro.daily_basic(ts_code=symbol,
                                     start_date=start_date,
                                     end_date=end_date)
    # Align dates. Caution: keying fundamentals on the report-period
    # end_date introduces look-ahead bias, because statements are published
    # weeks later; in production, align on the announcement date (ann_date).
    df_fundamental['trade_date'] = pd.to_datetime(df_fundamental['end_date'])
    df_daily_basic['trade_date'] = pd.to_datetime(df_daily_basic['trade_date'])
    # Forward-fill fundamentals (financial data is updated quarterly)
    df_fundamental = df_fundamental.sort_values('trade_date')
    df_fundamental = df_fundamental.set_index('trade_date').resample('D').ffill().reset_index()
    # Merge
    df_merged = pd.merge(df_daily_basic, df_fundamental, on='trade_date', how='left')
    return df_merged

def calculate_fundamental_factors(df):
    """
    Build fundamental factors. The lowercase source columns must match the
    Tushare schema available to your account; verify them before running.
    :param df: fundamental data
    :return: data with factor columns added
    """
    df = df.copy()
    # Valuation factors
    df['PE'] = df['pe_ttm']  # price/earnings (TTM)
    df['PB'] = df['pb']      # price/book
    df['PS'] = df['ps']      # price/sales
    df['PCF'] = df['pcf']    # price/cash-flow; not in every daily_basic schema, drop if absent
    # Profitability factors
    df['ROE'] = df['roe']    # return on equity
    df['ROA'] = df['roa']    # return on assets
    df['GROSS_MARGIN'] = df['grossprofit_margin']  # gross margin
    # Growth factors (Tushare exposes these as or_yoy / netprofit_yoy)
    df['YOY_REV'] = df['or_yoy']            # revenue growth, YoY
    df['YOY_PROFIT'] = df['netprofit_yoy']  # net-profit growth, YoY
    # Financial-health factors
    df['DEBT_TO_ASSETS'] = df['debt_to_assets']  # debt-to-assets ratio
    df['CURRENT_RATIO'] = df['current_ratio']    # current ratio
    return df

# Example: fetch fundamentals and build the factors
# df_fundamental = get_fundamental_data(symbol, start_date, end_date)
# df_fundamental_factors = calculate_fundamental_factors(df_fundamental)
2.3 Factor Preprocessing
Factors need preprocessing before they go into a model:
def preprocess_factors(df, factor_columns):
    """
    Preprocess factor data.
    :param df: DataFrame containing the factors
    :param factor_columns: list of factor column names
    :return: preprocessed data and the fitted scaler
    """
    df_processed = df.copy()
    # 1. Missing values: forward-fill technical indicators
    df_processed[factor_columns] = df_processed[factor_columns].ffill()
    # 2. Outliers (winsorization at the 1st and 99th percentiles)
    for col in factor_columns:
        lower = df_processed[col].quantile(0.01)
        upper = df_processed[col].quantile(0.99)
        df_processed[col] = np.where(df_processed[col] < lower, lower, df_processed[col])
        df_processed[col] = np.where(df_processed[col] > upper, upper, df_processed[col])
    # 3. Standardization. Note: fitting the scaler on the full history leaks
    # future statistics into the past; in production, fit on the training
    # window only (as the AIStockSelector class in Part 7 does).
    scaler = StandardScaler()
    df_processed[factor_columns] = scaler.fit_transform(df_processed[factor_columns])
    # 4. Neutralization (optional: strip industry and market-cap effects).
    # Simplified away here; a cross-sectional sketch follows this example.
    return df_processed, scaler

# Example: preprocess the factors
factor_columns = ['MA5', 'MA20', 'MACD', 'RSI', 'K', 'D', 'J', 'MOM', 'ROC', 'volatility']
df_processed, factor_scaler = preprocess_factors(df_with_indicators, factor_columns)
print(df_processed[factor_columns].describe())
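Step 4 above is left as a stub, so here is a minimal sketch of cross-sectional neutralization under stated assumptions: it presumes a hypothetical DataFrame with one row per stock on a single date and columns log_mktcap and industry, which the single-stock pipeline above does not build. The factor is regressed on log market cap and industry dummies, and the residual is kept as the neutralized factor.
from sklearn.linear_model import LinearRegression

def neutralize_factor(cross_section, factor_col,
                      mktcap_col='log_mktcap', industry_col='industry'):
    """Neutralize one factor on one date: regress it on log market cap and
    industry dummies, return the residual as the neutralized factor.
    Column names here are illustrative assumptions."""
    X = pd.get_dummies(cross_section[industry_col], prefix='ind', dtype=float)
    X[mktcap_col] = cross_section[mktcap_col].values
    y = cross_section[factor_col].values
    reg = LinearRegression().fit(X.values, y)
    # Residual = the part of the factor unexplained by size and industry
    return pd.Series(y - reg.predict(X.values), index=cross_section.index)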
Part 3: Building the Target Variable
3.1 Defining the Investment Objective
In supervised learning we need a target variable. Common choices include:
- The return over the next N days
- Whether the stock beats a benchmark (binary classification; a sketch appears after the example below)
- Long/flat/short signals (three-class)
The following code builds a forward-return target:
def create_target_variable(df, forward_period=5):
    """
    Create the target variable.
    :param df: DataFrame
    :param forward_period: forward prediction horizon (days)
    :return: DataFrame with target columns added
    """
    df = df.copy()
    # Return over the next forward_period days
    df['target'] = df['close'].shift(-forward_period) / df['close'] - 1
    # Optional classification target:
    # 1 = up more than 2%, -1 = down more than 2%, 0 = sideways
    df['target_class'] = np.where(df['target'] > 0.02, 1,
                                  np.where(df['target'] < -0.02, -1, 0))
    # Drop the last forward_period rows (no target available)
    df = df.iloc[:-forward_period]
    return df

# Create the target variable
df_final = create_target_variable(df_processed, forward_period=5)
print(df_final[['close', 'target', 'target_class']].tail())
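The benchmark-relative target listed above can be built the same way once benchmark prices are available. A minimal sketch, assuming the benchmark frame carries trade_date and close columns (for the CSI 300 it could come from Tushare's pro.index_daily, an assumption to verify against your API tier):
def create_binary_target(df_stock, df_benchmark, forward_period=5):
    """Label 1 if the stock outperforms the benchmark over the next
    forward_period days, else 0."""
    bench = df_benchmark[['trade_date', 'close']].rename(columns={'close': 'bench_close'})
    df = df_stock.merge(bench, on='trade_date', how='inner')
    stock_fwd = df['close'].shift(-forward_period) / df['close'] - 1
    bench_fwd = df['bench_close'].shift(-forward_period) / df['bench_close'] - 1
    df['target_binary'] = (stock_fwd > bench_fwd).astype(int)
    # Drop the tail rows that have no forward window
    return df.iloc[:-forward_period]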
3.2 Splitting the Dataset
Split the data chronologically into training, validation, and test sets:
def split_dataset(df, train_ratio=0.7, val_ratio=0.15, test_ratio=0.15):
    """
    Split the dataset chronologically.
    :param df: DataFrame
    :param train_ratio: training share
    :param val_ratio: validation share
    :param test_ratio: test share
    :return: train, validation, and test DataFrames
    """
    # Compare with a tolerance: 0.7 + 0.15 + 0.15 need not equal 1.0
    # exactly in floating point.
    assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-9
    total_size = len(df)
    train_size = int(total_size * train_ratio)
    val_size = int(total_size * val_ratio)
    train_df = df.iloc[:train_size]
    val_df = df.iloc[train_size:train_size + val_size]
    test_df = df.iloc[train_size + val_size:]
    return train_df, val_df, test_df

# Split the data
train_df, val_df, test_df = split_dataset(df_final)
print(f"Train: {len(train_df)}, validation: {len(val_df)}, test: {len(test_df)}")
Part 4: Building Machine-Learning Models
4.1 Baseline: Linear Regression
We start with a simple linear regression:
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score

def train_linear_regression(X_train, y_train, X_val, y_val):
    """
    Train a linear-regression model.
    :param X_train: training features
    :param y_train: training targets
    :param X_val: validation features
    :param y_val: validation targets
    :return: trained model and evaluation metrics
    """
    # Initialize the model
    model = LinearRegression()
    # Fit
    model.fit(X_train, y_train)
    # Predict
    y_train_pred = model.predict(X_train)
    y_val_pred = model.predict(X_val)
    # Evaluate
    train_mse = mean_squared_error(y_train, y_train_pred)
    val_mse = mean_squared_error(y_val, y_val_pred)
    train_r2 = r2_score(y_train, y_train_pred)
    val_r2 = r2_score(y_val, y_val_pred)
    print(f"Train MSE: {train_mse:.6f}, R2: {train_r2:.4f}")
    print(f"Validation MSE: {val_mse:.6f}, R2: {val_r2:.4f}")
    return model, {'train_mse': train_mse, 'val_mse': val_mse,
                   'train_r2': train_r2, 'val_r2': val_r2}

# Prepare the data
X_train = train_df[factor_columns].values
y_train = train_df['target'].values
X_val = val_df[factor_columns].values
y_val = val_df['target'].values
# Train the linear-regression baseline
lr_model, lr_metrics = train_linear_regression(X_train, y_train, X_val, y_val)
4.2 Random Forest
Random forests are widely used in quantitative investing and fairly robust:
from sklearn.ensemble import RandomForestRegressor

def train_random_forest(X_train, y_train, X_val, y_val, n_estimators=100, max_depth=10):
    """
    Train a random-forest model.
    :param X_train: training features
    :param y_train: training targets
    :param X_val: validation features
    :param y_val: validation targets
    :param n_estimators: number of trees
    :param max_depth: maximum tree depth
    :return: trained model, metrics, and feature importances
    """
    model = RandomForestRegressor(
        n_estimators=n_estimators,
        max_depth=max_depth,
        min_samples_split=20,
        min_samples_leaf=10,
        random_state=42,
        n_jobs=-1
    )
    model.fit(X_train, y_train)
    # Predict
    y_train_pred = model.predict(X_train)
    y_val_pred = model.predict(X_val)
    # Evaluate
    train_mse = mean_squared_error(y_train, y_train_pred)
    val_mse = mean_squared_error(y_val, y_val_pred)
    train_r2 = r2_score(y_train, y_train_pred)
    val_r2 = r2_score(y_val, y_val_pred)
    print(f"Random forest - train MSE: {train_mse:.6f}, R2: {train_r2:.4f}")
    print(f"Random forest - validation MSE: {val_mse:.6f}, R2: {val_r2:.4f}")
    # Feature importances (labels come from the module-level factor_columns)
    feature_importance = pd.DataFrame({
        'feature': factor_columns,
        'importance': model.feature_importances_
    }).sort_values('importance', ascending=False)
    print("\nFeature importances:")
    print(feature_importance)
    return model, {'train_mse': train_mse, 'val_mse': val_mse,
                   'train_r2': train_r2, 'val_r2': val_r2}, feature_importance

# Train the random forest
rf_model, rf_metrics, rf_importance = train_random_forest(X_train, y_train, X_val, y_val)
4.3 XGBoost
XGBoost is one of the most widely used and effective models in quantitative investing:
import xgboost as xgb

def train_xgboost(X_train, y_train, X_val, y_val, params=None):
    """
    Train an XGBoost model.
    :param X_train: training features
    :param y_train: training targets
    :param X_val: validation features
    :param y_val: validation targets
    :param params: XGBoost parameters
    :return: trained model, metrics, and feature importances
    """
    if params is None:
        params = {
            'objective': 'reg:squarederror',
            'eval_metric': 'rmse',
            'max_depth': 6,
            'learning_rate': 0.1,
            'subsample': 0.8,
            'colsample_bytree': 0.8,
            'seed': 42,
            'n_jobs': -1
        }
    # Build DMatrix objects (feature names come from the module-level
    # factor_columns list)
    dtrain = xgb.DMatrix(X_train, label=y_train, feature_names=factor_columns)
    dval = xgb.DMatrix(X_val, label=y_val, feature_names=factor_columns)
    # Train with early stopping
    model = xgb.train(
        params,
        dtrain,
        num_boost_round=1000,
        evals=[(dtrain, 'train'), (dval, 'val')],
        early_stopping_rounds=50,
        verbose_eval=100
    )
    # Predict
    y_train_pred = model.predict(dtrain)
    y_val_pred = model.predict(dval)
    # Evaluate
    train_mse = mean_squared_error(y_train, y_train_pred)
    val_mse = mean_squared_error(y_val, y_val_pred)
    train_r2 = r2_score(y_train, y_train_pred)
    val_r2 = r2_score(y_val, y_val_pred)
    print(f"XGBoost - train MSE: {train_mse:.6f}, R2: {train_r2:.4f}")
    print(f"XGBoost - validation MSE: {val_mse:.6f}, R2: {val_r2:.4f}")
    # Feature importance. get_score returns a dict keyed by feature name
    # (features never used in a split are absent), so build the frame from
    # the dict instead of assuming alignment with factor_columns.
    importance_dict = model.get_score(importance_type='gain')
    feature_importance = pd.DataFrame({
        'feature': list(importance_dict.keys()),
        'importance': list(importance_dict.values())
    }).sort_values('importance', ascending=False)
    print("\nFeature importances:")
    print(feature_importance)
    return model, {'train_mse': train_mse, 'val_mse': val_mse,
                   'train_r2': train_r2, 'val_r2': val_r2}, feature_importance

# Train the XGBoost model
xgb_model, xgb_metrics, xgb_importance = train_xgboost(X_train, y_train, X_val, y_val)
4.4 Neural Network
For more complex patterns, we can build a neural network with PyTorch:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
class StockPredictionNN(nn.Module):
def __init__(self, input_dim, hidden_dims=[128, 64, 32], dropout_rate=0.3):
super(StockPredictionNN, self).__init__()
layers = []
prev_dim = input_dim
for hidden_dim in hidden_dims:
layers.append(nn.Linear(prev_dim, hidden_dim))
layers.append(nn.BatchNorm1d(hidden_dim))
layers.append(nn.ReLU())
layers.append(nn.Dropout(dropout_rate))
prev_dim = hidden_dim
layers.append(nn.Linear(prev_dim, 1))
self.network = nn.Sequential(*layers)
def forward(self, x):
return self.network(x)
def train_neural_network(X_train, y_train, X_val, y_val, epochs=100, batch_size=64, lr=0.001):
    """
    Train the neural-network model.
    :param X_train: training features
    :param y_train: training targets
    :param X_val: validation features
    :param y_val: validation targets
    :param epochs: number of training epochs
    :param batch_size: batch size
    :param lr: learning rate
    :return: trained model and evaluation metrics
    """
    # Convert to PyTorch tensors
    X_train_tensor = torch.FloatTensor(X_train)
    y_train_tensor = torch.FloatTensor(y_train).reshape(-1, 1)
    X_val_tensor = torch.FloatTensor(X_val)
    y_val_tensor = torch.FloatTensor(y_val).reshape(-1, 1)
    # Data loader
    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    # Initialize model, loss, and optimizer
    model = StockPredictionNN(input_dim=X_train.shape[1])
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    # Training loop
    train_losses = []
    val_losses = []
    for epoch in range(epochs):
        model.train()
        batch_losses = []
        for batch_X, batch_y in train_loader:
            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            batch_losses.append(loss.item())
        train_loss = np.mean(batch_losses)
        # Validation
        model.eval()
        with torch.no_grad():
            val_outputs = model(X_val_tensor)
            val_loss = criterion(val_outputs, y_val_tensor).item()
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        if epoch % 20 == 0:
            print(f"Epoch {epoch}: Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}")
    # Final evaluation
    model.eval()
    with torch.no_grad():
        y_train_pred = model(X_train_tensor).numpy().flatten()
        y_val_pred = model(X_val_tensor).numpy().flatten()
    train_mse = mean_squared_error(y_train, y_train_pred)
    val_mse = mean_squared_error(y_val, y_val_pred)
    train_r2 = r2_score(y_train, y_train_pred)
    val_r2 = r2_score(y_val, y_val_pred)
    print(f"Neural network - train MSE: {train_mse:.6f}, R2: {train_r2:.4f}")
    print(f"Neural network - validation MSE: {val_mse:.6f}, R2: {val_r2:.4f}")
    return model, {'train_mse': train_mse, 'val_mse': val_mse,
                   'train_r2': train_r2, 'val_r2': val_r2,
                   'train_losses': train_losses, 'val_losses': val_losses}

# Train the neural network
nn_model, nn_metrics = train_neural_network(X_train, y_train, X_val, y_val, epochs=100)
Part 5: Implementing a Backtesting Framework
5.1 Backtest Design
A complete backtest needs to account for:
- Transaction costs
- Slippage
- Money management
- Position control
Below is a simple backtesting framework:
class BacktestFramework:
    def __init__(self, initial_capital=100000, transaction_cost=0.001, slippage=0.0005):
        """
        Initialize the backtester.
        :param initial_capital: starting capital
        :param transaction_cost: transaction cost (fraction of trade value)
        :param slippage: slippage (fraction of trade value)
        """
        self.initial_capital = initial_capital
        self.transaction_cost = transaction_cost
        self.slippage = slippage
        self.results = {}

    def run_backtest(self, df, model, factor_columns, target_col='target',
                     position_size=0.1, stop_loss=0.05, take_profit=0.1):
        """
        Run the backtest.
        :param df: DataFrame with features and prices
        :param model: prediction model
        :param factor_columns: feature columns
        :param target_col: target column
        :param position_size: fraction of cash per trade
        :param stop_loss: stop-loss threshold
        :param take_profit: take-profit threshold
        :return: backtest results
        """
        df = df.copy()
        # Generate predictions. Dispatch on the model type: an XGBoost
        # Booster also exposes .predict but requires a DMatrix, so check
        # for it explicitly before falling back to the sklearn interface.
        if isinstance(model, xgb.Booster):
            dmatrix = xgb.DMatrix(df[factor_columns], feature_names=factor_columns)
            df['prediction'] = model.predict(dmatrix)
        elif isinstance(model, nn.Module):
            # PyTorch model
            model.eval()
            with torch.no_grad():
                X_tensor = torch.FloatTensor(df[factor_columns].values)
                df['prediction'] = model(X_tensor).numpy().flatten()
        else:
            # sklearn-style model
            df['prediction'] = model.predict(df[factor_columns].values)
        # Initialize account state
        capital = self.initial_capital
        position = 0     # shares held
        entry_price = 0  # entry price
        cash = self.initial_capital
        trades = []
        equity_curve = []
        # Walk forward one day at a time
        for i in range(len(df) - 1):
            current_date = df.iloc[i]['trade_date']
            current_price = df.iloc[i]['close']
            prediction = df.iloc[i]['prediction']
            next_price = df.iloc[i + 1]['close']
            # Record equity
            total_value = cash + position * current_price
            equity_curve.append({'date': current_date, 'equity': total_value})
            # Trading logic
            if position == 0:  # flat
                # Open a position on a bullish signal
                if prediction > 0.01:  # predicted gain above 1%
                    # Size the order
                    buy_amount = cash * position_size
                    shares = buy_amount / current_price
                    # Deduct transaction cost and slippage
                    cost = buy_amount * (self.transaction_cost + self.slippage)
                    actual_buy = buy_amount - cost
                    if actual_buy > 100:  # minimum trade-size filter
                        shares = actual_buy / current_price
                        position = shares
                        entry_price = current_price
                        cash -= buy_amount
                        trades.append({
                            'date': current_date,
                            'action': 'BUY',
                            'price': current_price,
                            'shares': shares,
                            'cost': cost,
                            'cash': cash,
                            'position': position
                        })
            else:  # holding a position
                # Check stop-loss, take-profit, or exit signal
                current_return = (current_price - entry_price) / entry_price
                # Stop-loss
                if current_return < -stop_loss:
                    action = 'STOP_LOSS'
                    close_position = True
                # Take-profit
                elif current_return > take_profit:
                    action = 'TAKE_PROFIT'
                    close_position = True
                # Bearish prediction: exit
                elif prediction < -0.01:
                    action = 'SELL'
                    close_position = True
                else:
                    close_position = False
                if close_position:
                    # Sell
                    sell_amount = position * current_price
                    cost = sell_amount * (self.transaction_cost + self.slippage)
                    actual_sell = sell_amount - cost
                    cash += actual_sell
                    profit = actual_sell - (position * entry_price)
                    trades.append({
                        'date': current_date,
                        'action': action,
                        'price': current_price,
                        'shares': position,
                        'cost': cost,
                        'cash': cash,
                        'profit': profit
                    })
                    position = 0
                    entry_price = 0
        # Close out any open position on the last day
        if position > 0:
            final_price = df.iloc[-1]['close']
            sell_amount = position * final_price
            cost = sell_amount * (self.transaction_cost + self.slippage)
            actual_sell = sell_amount - cost
            cash += actual_sell
            profit = actual_sell - (position * entry_price)
            trades.append({
                'date': df.iloc[-1]['trade_date'],
                'action': 'FINAL_SELL',
                'price': final_price,
                'shares': position,
                'cost': cost,
                'cash': cash,
                'profit': profit
            })
            position = 0
        # Compute summary results
        final_capital = cash
        total_return = (final_capital - self.initial_capital) / self.initial_capital
        equity_df = pd.DataFrame(equity_curve)
        # Performance metrics
        if len(equity_df) > 1:
            equity_df['returns'] = equity_df['equity'].pct_change()
            sharpe_ratio = equity_df['returns'].mean() / equity_df['returns'].std() * np.sqrt(252)
            max_drawdown = (equity_df['equity'] / equity_df['equity'].cummax() - 1).min()
        else:
            sharpe_ratio = 0
            max_drawdown = 0
        # Store the results
        self.results = {
            'initial_capital': self.initial_capital,
            'final_capital': final_capital,
            'total_return': total_return,
            'sharpe_ratio': sharpe_ratio,
            'max_drawdown': max_drawdown,
            'trades': pd.DataFrame(trades),
            'equity_curve': equity_df,
            'num_trades': len(trades)
        }
        return self.results
    def print_results(self):
        """Print the backtest results."""
        if not self.results:
            print("No backtest results")
            return
        print("=" * 50)
        print("Backtest Results")
        print("=" * 50)
        print(f"Initial capital: {self.results['initial_capital']:,.2f}")
        print(f"Final capital: {self.results['final_capital']:,.2f}")
        print(f"Total return: {self.results['total_return']:.2%}")
        print(f"Sharpe ratio: {self.results['sharpe_ratio']:.2f}")
        print(f"Max drawdown: {self.results['max_drawdown']:.2%}")
        print(f"Number of trades: {self.results['num_trades']}")
        if self.results['num_trades'] > 0:
            trades = self.results['trades']
            win_rate = (trades['profit'] > 0).mean() if 'profit' in trades.columns else 0
            avg_profit = trades['profit'].mean() if 'profit' in trades.columns else 0
            print(f"Win rate: {win_rate:.2%}")
            print(f"Average profit: {avg_profit:.2f}")
        print("=" * 50)
# Example: backtest with the XGBoost model
# First generate predictions on the test set
test_df = test_df.copy()  # avoid SettingWithCopyWarning on the slice
test_X = test_df[factor_columns].values
test_y = test_df['target'].values
# Test-set predictions (run_backtest recomputes these internally as well)
test_dmatrix = xgb.DMatrix(test_X, feature_names=factor_columns)
test_df['prediction'] = xgb_model.predict(test_dmatrix)
# Run the backtest
backtest = BacktestFramework(initial_capital=100000)
results = backtest.run_backtest(test_df, xgb_model, factor_columns)
backtest.print_results()
5.2 Visualizing Backtest Results
import matplotlib.pyplot as plt
import seaborn as sns

def plot_backtest_results(results):
    """
    Plot backtest result charts.
    :param results: backtest results dict
    """
    if not results:
        return
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    fig.suptitle('Backtest Analysis', fontsize=16)
    # 1. Equity curve
    if 'equity_curve' in results and len(results['equity_curve']) > 0:
        equity = results['equity_curve']
        axes[0, 0].plot(equity['date'], equity['equity'], label='Strategy')
        axes[0, 0].plot(equity['date'],
                        results['initial_capital'] * np.ones(len(equity)),
                        label='Initial', linestyle='--')
        axes[0, 0].set_title('Equity Curve')
        axes[0, 0].set_ylabel('Capital')
        axes[0, 0].legend()
        axes[0, 0].tick_params(axis='x', rotation=45)
    # 2. Drawdown curve
    if 'equity_curve' in results and len(results['equity_curve']) > 0:
        equity = results['equity_curve']
        drawdown = (equity['equity'] / equity['equity'].cummax() - 1) * 100
        axes[0, 1].fill_between(equity['date'], drawdown, 0, color='red', alpha=0.3)
        axes[0, 1].set_title('Drawdown (%)')
        axes[0, 1].set_ylabel('Drawdown (%)')
        axes[0, 1].tick_params(axis='x', rotation=45)
    # 3. Trade P&L distribution
    if 'trades' in results and len(results['trades']) > 0:
        trades = results['trades']
        if 'profit' in trades.columns:
            axes[1, 0].hist(trades['profit'], bins=20, alpha=0.7, color='blue')
            axes[1, 0].axvline(trades['profit'].mean(), color='red', linestyle='--', label='Mean')
            axes[1, 0].set_title('Trade P&L Distribution')
            axes[1, 0].set_xlabel('Profit/Loss')
            axes[1, 0].legend()
    # 4. Monthly returns
    if 'equity_curve' in results and len(results['equity_curve']) > 0:
        equity = results['equity_curve'].copy()
        equity['date'] = pd.to_datetime(equity['date'])
        equity.set_index('date', inplace=True)
        monthly_returns = equity['equity'].resample('M').last().pct_change()
        if len(monthly_returns) > 0:
            axes[1, 1].bar(range(len(monthly_returns)), monthly_returns * 100,
                           color=['green' if x >= 0 else 'red' for x in monthly_returns])
            axes[1, 1].set_title('Monthly Returns (%)')
            axes[1, 1].set_ylabel('Return (%)')
            axes[1, 1].set_xlabel('Month')
    plt.tight_layout()
    plt.show()

# Plot the charts
plot_backtest_results(results)
Part 6: Risk Control and Portfolio Optimization
6.1 Risk Metrics
Risk control is central to quantitative investing. The code below computes a range of risk metrics:
def calculate_risk_metrics(returns):
    """
    Compute risk metrics.
    :param returns: return series
    :return: dict of risk metrics
    """
    if len(returns) < 2:
        return {}
    metrics = {}
    # Basic metrics
    metrics['total_return'] = (1 + returns).prod() - 1
    metrics['annual_return'] = (1 + metrics['total_return']) ** (252 / len(returns)) - 1
    metrics['annual_volatility'] = returns.std() * np.sqrt(252)
    metrics['sharpe_ratio'] = metrics['annual_return'] / metrics['annual_volatility'] if metrics['annual_volatility'] > 0 else 0
    # Maximum drawdown
    cumulative = (1 + returns).cumprod()
    running_max = cumulative.expanding().max()
    drawdown = (cumulative - running_max) / running_max
    metrics['max_drawdown'] = drawdown.min()
    # Win rate
    win_rate = (returns > 0).mean()
    metrics['win_rate'] = win_rate
    # Profit factor (average win / average loss)
    positive_returns = returns[returns > 0]
    negative_returns = returns[returns < 0]
    if len(positive_returns) > 0 and len(negative_returns) > 0:
        avg_win = positive_returns.mean()
        avg_loss = abs(negative_returns.mean())
        metrics['profit_factor'] = avg_win / avg_loss if avg_loss > 0 else np.inf
    else:
        metrics['profit_factor'] = np.nan
    # Calmar ratio
    if metrics['max_drawdown'] != 0:
        metrics['calmar_ratio'] = metrics['annual_return'] / abs(metrics['max_drawdown'])
    else:
        metrics['calmar_ratio'] = np.inf
    # Sortino ratio (downside risk)
    downside_returns = returns[returns < 0]
    if len(downside_returns) > 0:
        downside_vol = downside_returns.std() * np.sqrt(252)
        metrics['sortino_ratio'] = metrics['annual_return'] / downside_vol if downside_vol > 0 else 0
    else:
        metrics['sortino_ratio'] = np.inf
    # VaR (value at risk)
    metrics['var_95'] = np.percentile(returns, 5)
    metrics['var_99'] = np.percentile(returns, 1)
    # CVaR (conditional value at risk)
    metrics['cvar_95'] = returns[returns <= metrics['var_95']].mean()
    metrics['cvar_99'] = returns[returns <= metrics['var_99']].mean()
    return metrics
# Example: risk metrics for the backtest results
if 'equity_curve' in results and len(results['equity_curve']) > 1:
    equity = results['equity_curve']
    returns = equity['equity'].pct_change().dropna()
    risk_metrics = calculate_risk_metrics(returns)
    print("Risk metrics:")
    for key, value in risk_metrics.items():
        print(f"{key}: {value:.4f}")
6.2 Portfolio Optimization
When building a multi-asset portfolio, the weights need to be optimized:
from scipy.optimize import minimize

def portfolio_optimization(returns_df, method='sharpe', risk_free_rate=0.02):
    """
    Portfolio optimization.
    :param returns_df: DataFrame of multi-asset returns
    :param method: objective ('sharpe', 'min_volatility', 'max_return')
    :param risk_free_rate: risk-free rate
    :return: optimal weights
    """
    # Expected returns and covariance matrix (annualized)
    mean_returns = returns_df.mean() * 252
    cov_matrix = returns_df.cov() * 252
    num_assets = len(mean_returns)
    def portfolio_volatility(weights):
        return np.sqrt(weights.T @ cov_matrix @ weights)
    def portfolio_return(weights):
        return weights @ mean_returns
    def negative_sharpe(weights):
        p_return = portfolio_return(weights)
        p_vol = portfolio_volatility(weights)
        return -(p_return - risk_free_rate) / p_vol
    # Constraints
    constraints = (
        {'type': 'eq', 'fun': lambda w: np.sum(w) - 1},  # weights sum to 1
    )
    # Bounds (no short selling)
    bounds = tuple((0, 1) for _ in range(num_assets))
    # Initial guess
    initial_weights = np.array([1/num_assets] * num_assets)
    if method == 'sharpe':
        objective = negative_sharpe
    elif method == 'min_volatility':
        objective = portfolio_volatility
    elif method == 'max_return':
        objective = lambda w: -portfolio_return(w)
    else:
        raise ValueError("Method must be 'sharpe', 'min_volatility', or 'max_return'")
    result = minimize(
        objective,
        initial_weights,
        method='SLSQP',
        bounds=bounds,
        constraints=constraints
    )
    if result.success:
        optimal_weights = result.x
        optimal_return = portfolio_return(optimal_weights)
        optimal_vol = portfolio_volatility(optimal_weights)
        optimal_sharpe = (optimal_return - risk_free_rate) / optimal_vol
        return {
            'weights': optimal_weights,
            'return': optimal_return,
            'volatility': optimal_vol,
            'sharpe': optimal_sharpe,
            'success': True
        }
    else:
        return {'success': False, 'message': result.message}
# Example: multi-asset portfolio optimization
# Given return series for three stocks:
# returns_data = pd.DataFrame({
#     'stock1': returns1,
#     'stock2': returns2,
#     'stock3': returns3
# })
# result = portfolio_optimization(returns_data, method='sharpe')
# print(f"Optimal weights: {result['weights']}")
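Since the commented block above needs real return series, here is a self-contained demo on synthetic data so the optimizer can be exercised end to end; the return and volatility figures are arbitrary illustrations, not market estimates:
# Synthetic daily returns for three hypothetical assets
rng = np.random.default_rng(42)
synthetic_returns = pd.DataFrame(
    rng.normal(loc=[0.0004, 0.0003, 0.0005],
               scale=[0.015, 0.02, 0.025],
               size=(500, 3)),
    columns=['stock1', 'stock2', 'stock3']
)
opt = portfolio_optimization(synthetic_returns, method='sharpe')
if opt['success']:
    print("Optimal weights:", np.round(opt['weights'], 3))
    print(f"Expected return: {opt['return']:.2%}, volatility: {opt['volatility']:.2%}, Sharpe: {opt['sharpe']:.2f}")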
6.3 Risk Parity
Risk parity is a widely used risk-budgeting approach:
def risk_parity_optimization(returns_df, risk_free_rate=0.02):
    """
    Risk-parity optimization.
    :param returns_df: DataFrame of multi-asset returns
    :param risk_free_rate: risk-free rate
    :return: risk-parity weights
    """
    cov_matrix = returns_df.cov() * 252
    def risk_contribution(weights):
        """Risk contribution of each asset."""
        portfolio_vol = np.sqrt(weights.T @ cov_matrix @ weights)
        marginal_risk = cov_matrix @ weights / portfolio_vol
        risk_contrib = weights * marginal_risk
        return risk_contrib
    def risk_parity_objective(weights):
        """Risk-parity objective: minimize dispersion of risk contributions."""
        rc = risk_contribution(weights)
        return np.sum((rc - np.mean(rc))**2)
    # Constraints
    constraints = (
        {'type': 'eq', 'fun': lambda w: np.sum(w) - 1},
    )
    bounds = tuple((0, 1) for _ in range(len(returns_df.columns)))
    initial_weights = np.array([1/len(returns_df.columns)] * len(returns_df.columns))
    result = minimize(
        risk_parity_objective,
        initial_weights,
        method='SLSQP',
        bounds=bounds,
        constraints=constraints
    )
    if result.success:
        optimal_weights = result.x
        rc = risk_contribution(optimal_weights)
        return {
            'weights': optimal_weights,
            'risk_contributions': rc,
            'success': True
        }
    else:
        return {'success': False, 'message': result.message}
Part 7: Case Study: Building a Complete AI Stock-Selection System
7.1 System Architecture
We now integrate all of the preceding modules into a complete AI stock-selection system:
class AIStockSelector:
    def __init__(self, data_source='tushare', model_type='xgboost'):
        """
        Initialize the AI stock-selection system.
        :param data_source: data source
        :param model_type: model type
        """
        self.data_source = data_source
        self.model_type = model_type
        self.model = None
        self.scaler = None
        self.factor_columns = []

    def fetch_universe(self, universe_type='csi300'):
        """
        Fetch the stock universe.
        :param universe_type: universe name
        :return: list of stock codes
        """
        if universe_type == 'csi300':
            # CSI 300 constituents. Note: index_weight takes index_code,
            # not ts_code.
            df = pro.index_weight(index_code='000300.SH',
                                  start_date='20230101',
                                  end_date='20231231')
            return df['con_code'].unique().tolist()
        elif universe_type == 'csi500':
            df = pro.index_weight(index_code='000905.SH',
                                  start_date='20230101',
                                  end_date='20231231')
            return df['con_code'].unique().tolist()
        else:
            # Default: all listed A-shares
            df = pro.stock_basic(exchange='', list_status='L', fields='ts_code')
            return df['ts_code'].tolist()
    def build_dataset(self, symbols, start_date, end_date):
        """
        Build the training dataset.
        :param symbols: list of stock codes
        :param start_date: start date
        :param end_date: end date
        :return: combined dataset
        """
        all_data = []
        for symbol in symbols[:50]:  # cap at 50 stocks for the demo
            try:
                # Daily bars
                df = get_stock_data(symbol, start_date, end_date)
                if len(df) < 100:
                    continue
                # Technical indicators
                df = calculate_technical_indicators(df)
                # Target variable
                df = create_target_variable(df, forward_period=5)
                # Select features
                factor_cols = ['MA5', 'MA20', 'MACD', 'RSI', 'K', 'D', 'J',
                               'MOM', 'ROC', 'volatility']
                df = df[factor_cols + ['target', 'trade_date', 'close']]
                # Drop missing values
                df = df.dropna()
                if len(df) > 0:
                    df['symbol'] = symbol
                    all_data.append(df)
            except Exception as e:
                print(f"Error while processing {symbol}: {e}")
                continue
        if not all_data:
            return pd.DataFrame()
        # Combine all stocks
        combined_df = pd.concat(all_data, ignore_index=True)
        return combined_df
    def train_model(self, df, factor_columns):
        """
        Train the model.
        :param df: dataset
        :param factor_columns: feature columns
        """
        self.factor_columns = factor_columns
        # Split the data
        train_df, val_df, test_df = split_dataset(df)
        # Standardize features (the scaler is fitted on the training set only)
        X_train = train_df[factor_columns].values
        y_train = train_df['target'].values
        X_val = val_df[factor_columns].values
        y_val = val_df['target'].values
        self.scaler = StandardScaler()
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_val_scaled = self.scaler.transform(X_val)
        # Train
        if self.model_type == 'xgboost':
            self.model, _, _ = train_xgboost(X_train_scaled, y_train, X_val_scaled, y_val)
        elif self.model_type == 'random_forest':
            self.model, _, _ = train_random_forest(X_train_scaled, y_train, X_val_scaled, y_val)
        elif self.model_type == 'neural_network':
            self.model, _ = train_neural_network(X_train_scaled, y_train, X_val_scaled, y_val)
        else:
            raise ValueError(f"Unsupported model type: {self.model_type}")
        print(f"Model training finished: {self.model_type}")
    def predict_and_rank(self, df, top_n=20):
        """
        Predict and rank stocks.
        :param df: data to score
        :param top_n: number of stocks to keep
        :return: ranked result
        """
        if self.model is None:
            raise ValueError("The model has not been trained")
        # Standardize features
        X = df[self.factor_columns].values
        X_scaled = self.scaler.transform(X)
        # Predict
        if self.model_type == 'xgboost':
            dmatrix = xgb.DMatrix(X_scaled, feature_names=self.factor_columns)
            predictions = self.model.predict(dmatrix)
        elif self.model_type == 'neural_network':
            self.model.eval()
            with torch.no_grad():
                X_tensor = torch.FloatTensor(X_scaled)
                predictions = self.model(X_tensor).numpy().flatten()
        else:
            predictions = self.model.predict(X_scaled)
        # Attach predictions
        df_result = df.copy()
        df_result['prediction'] = predictions
        # Sort by predicted value
        df_result = df_result.sort_values('prediction', ascending=False)
        # Keep the top N
        top_stocks = df_result.head(top_n)
        return top_stocks
    def generate_signals(self, current_date, universe_type='csi300'):
        """
        Generate trading signals.
        :param current_date: current date
        :param universe_type: universe name
        :return: trading signals
        """
        # Stock universe
        symbols = self.fetch_universe(universe_type)
        # Recent data window
        start_date = (datetime.strptime(current_date, '%Y%m%d') - timedelta(days=100)).strftime('%Y%m%d')
        # Build the prediction dataset
        pred_data = []
        for symbol in symbols[:30]:  # cap at 30 stocks for the demo
            try:
                df = get_stock_data(symbol, start_date, current_date)
                if len(df) < 20:
                    continue
                df = calculate_technical_indicators(df)
                df = df.iloc[-1:]  # keep only the latest bar
                # Skip stocks whose latest factors are still NaN (the
                # rolling windows need enough history)
                if df[self.factor_columns].isnull().any(axis=1).iloc[0]:
                    continue
                df['symbol'] = symbol
                pred_data.append(df)
            except Exception:
                continue
        if not pred_data:
            return pd.DataFrame()
        pred_df = pd.concat(pred_data, ignore_index=True)
        # Predict and rank
        top_stocks = self.predict_and_rank(pred_df, top_n=10)
        return top_stocks[['symbol', 'prediction', 'close', 'MA5', 'MA20', 'RSI']]
# Usage example
if __name__ == "__main__":
    # Initialize the system
    ai_selector = AIStockSelector(model_type='xgboost')
    # Fetch the universe
    symbols = ai_selector.fetch_universe('csi300')
    print(f"Universe size: {len(symbols)}")
    # Build the dataset
    end_date = datetime.now().strftime('%Y%m%d')
    start_date = (datetime.now() - timedelta(days=3*365)).strftime('%Y%m%d')
    print("Building dataset...")
    dataset = ai_selector.build_dataset(symbols, start_date, end_date)
    print(f"Dataset size: {len(dataset)}")
    if len(dataset) > 0:
        # Train the model
        factor_cols = ['MA5', 'MA20', 'MACD', 'RSI', 'K', 'D', 'J', 'MOM', 'ROC', 'volatility']
        print("Training model...")
        ai_selector.train_model(dataset, factor_cols)
        # Generate signals
        current_date = end_date
        print(f"\nGenerating signals for {current_date}...")
        signals = ai_selector.generate_signals(current_date, 'csi300')
        if len(signals) > 0:
            print("\nRecommended buys:")
            print(signals.to_string(index=False))
            # Save the results
            signals.to_csv(f'signals_{current_date}.csv', index=False)
            print(f"\nSignals saved to signals_{current_date}.csv")
        else:
            print("No valid signals generated")
    else:
        print("Dataset is empty; check the data feed")
7.2 Model Monitoring and Updating
from sklearn.metrics import mean_absolute_error  # needed by track_performance below

class ModelMonitor:
    def __init__(self, model, scaler, factor_columns):
        self.model = model
        self.scaler = scaler
        self.factor_columns = factor_columns
        self.performance_history = []

    def track_performance(self, df, predictions, actuals):
        """
        Track model performance.
        :param df: DataFrame
        :param predictions: predicted values
        :param actuals: realized values
        """
        mse = mean_squared_error(actuals, predictions)
        mae = mean_absolute_error(actuals, predictions)
        r2 = r2_score(actuals, predictions)
        # Information coefficient (correlation of predictions and outcomes)
        ic = np.corrcoef(predictions, actuals)[0, 1]
        # Quantile hit rates
        df_track = df.copy()
        df_track['prediction'] = predictions
        df_track['actual'] = actuals
        # Realized performance of the top 20% of predictions
        top_20 = df_track['prediction'].quantile(0.8)
        actual_top_20 = df_track[df_track['prediction'] >= top_20]['actual'].mean()
        # Realized performance of the bottom 20% of predictions
        bottom_20 = df_track['prediction'].quantile(0.2)
        actual_bottom_20 = df_track[df_track['prediction'] <= bottom_20]['actual'].mean()
        performance = {
            'date': datetime.now(),
            'mse': mse,
            'mae': mae,
            'r2': r2,
            'ic': ic,
            'top_20_actual': actual_top_20,
            'bottom_20_actual': actual_bottom_20,
            'spread': actual_top_20 - actual_bottom_20
        }
        self.performance_history.append(performance)
        return performance
    def detect_model_drift(self, recent_performance_threshold=0.05):
        """
        Detect model drift.
        :param recent_performance_threshold: drop in IC that counts as drift
        :return: whether drift was detected
        """
        if len(self.performance_history) < 10:
            return False
        # Average IC over the last 10 observations vs. the first 5
        recent_ic = np.mean([p['ic'] for p in self.performance_history[-10:]])
        baseline_ic = np.mean([p['ic'] for p in self.performance_history[:5]])
        # If the IC has dropped by more than the threshold, flag drift
        if (baseline_ic - recent_ic) > recent_performance_threshold:
            print(f"Warning: model performance is degrading! Baseline IC: {baseline_ic:.4f}, recent IC: {recent_ic:.4f}")
            return True
        return False

    def plot_monitoring(self):
        """Plot monitoring charts."""
        if len(self.performance_history) < 2:
            return
        dates = [p['date'] for p in self.performance_history]
        ics = [p['ic'] for p in self.performance_history]
        spreads = [p['spread'] for p in self.performance_history]
        fig, axes = plt.subplots(2, 1, figsize=(12, 8))
        axes[0].plot(dates, ics, marker='o')
        axes[0].axhline(y=0, color='red', linestyle='--')
        axes[0].set_title('Information Coefficient (IC) over Time')
        axes[0].set_ylabel('IC')
        axes[1].plot(dates, spreads, marker='s', color='green')
        axes[1].set_title('Top-Bottom Quantile Spread')
        axes[1].set_ylabel('Spread')
        plt.tight_layout()
        plt.show()

# Using the monitor
# monitor = ModelMonitor(xgb_model, scaler, factor_columns)
# performance = monitor.track_performance(test_df, predictions, actuals)
# if monitor.detect_model_drift():
#     # trigger model retraining
#     pass
Part 8: Advanced Topics and Best Practices
8.1 Ensembling and Model Blending
def build_ensemble_model(X_train, y_train, X_val, y_val):
    """
    Build an ensemble of models.
    :param X_train: training features
    :param y_train: training targets
    :param X_val: validation features
    :param y_val: validation targets
    :return: ensemble model
    """
    # Train several base models
    models = []
    # Random forest
    rf = RandomForestRegressor(n_estimators=100, max_depth=10, random_state=42)
    rf.fit(X_train, y_train)
    models.append(('rf', rf))
    # XGBoost
    xgb_params = {
        'objective': 'reg:squarederror',
        'max_depth': 6,
        'learning_rate': 0.1,
        'seed': 42
    }
    dtrain = xgb.DMatrix(X_train, label=y_train)
    dval = xgb.DMatrix(X_val, label=y_val)
    xgb_model = xgb.train(xgb_params, dtrain, num_boost_round=100, evals=[(dval, 'val')], verbose_eval=False)
    models.append(('xgb', xgb_model))
    # Linear regression
    lr = LinearRegression()
    lr.fit(X_train, y_train)
    models.append(('lr', lr))
    # Combine via a weighted average
    class WeightedEnsemble:
        def __init__(self, models, weights):
            self.models = models
            self.weights = weights
        def predict(self, X):
            predictions = []
            for name, model in self.models:
                if name == 'xgb':
                    dmatrix = xgb.DMatrix(X)
                    pred = model.predict(dmatrix)
                else:
                    pred = model.predict(X)
                predictions.append(pred)
            # Weighted average
            weighted_pred = np.average(predictions, axis=0, weights=self.weights)
            return weighted_pred
    # Weights chosen from validation performance
    weights = [0.4, 0.4, 0.2]  # can be tuned dynamically on the validation set
    ensemble = WeightedEnsemble(models, weights)
    # Evaluate
    y_val_pred = ensemble.predict(X_val)
    mse = mean_squared_error(y_val, y_val_pred)
    r2 = r2_score(y_val, y_val_pred)
    print(f"Ensemble - validation MSE: {mse:.6f}, R2: {r2:.4f}")
    return ensemble
8.2 Hyperparameter Optimization
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import randint, uniform

def optimize_hyperparameters(X_train, y_train):
    """
    Hyperparameter optimization.
    :param X_train: training features
    :param y_train: training targets
    :return: best parameters
    """
    # Parameter distributions
    param_dist = {
        'n_estimators': randint(50, 200),
        'max_depth': randint(3, 15),
        'min_samples_split': randint(2, 20),
        'min_samples_leaf': randint(1, 10),
        'max_features': ['sqrt', 'log2', None],
        'bootstrap': [True, False]
    }
    # Base model
    rf = RandomForestRegressor(random_state=42, n_jobs=-1)
    # Randomized search. Note: plain k-fold CV shuffles across time; for
    # time-series data, prefer cv=TimeSeriesSplit(...) (see Section 8.3).
    random_search = RandomizedSearchCV(
        rf,
        param_distributions=param_dist,
        n_iter=30,
        cv=3,
        scoring='neg_mean_squared_error',
        random_state=42,
        n_jobs=-1,
        verbose=1
    )
    random_search.fit(X_train, y_train)
    print("Best parameters:", random_search.best_params_)
    print("Best score:", random_search.best_score_)
    return random_search.best_params_

# Usage
# best_params = optimize_hyperparameters(X_train, y_train)
8.3 Techniques for Avoiding Overfitting
def create_robust_model(X_train, y_train, X_val, y_val):
    """
    Build robust models that resist overfitting.
    """
    # 1. Time-series cross-validation (instantiated here; a usage sketch
    #    follows this function)
    from sklearn.model_selection import TimeSeriesSplit
    tscv = TimeSeriesSplit(n_splits=5)
    # 2. Regularization
    from sklearn.linear_model import Ridge
    ridge = Ridge(alpha=1.0)
    ridge.fit(X_train, y_train)
    # 3. Early stopping (for XGBoost)
    xgb_params = {
        'objective': 'reg:squarederror',
        'max_depth': 6,
        'learning_rate': 0.05,
        'subsample': 0.8,
        'colsample_bytree': 0.8,
        'seed': 42
    }
    dtrain = xgb.DMatrix(X_train, label=y_train)
    dval = xgb.DMatrix(X_val, label=y_val)
    xgb_model = xgb.train(
        xgb_params,
        dtrain,
        num_boost_round=1000,
        evals=[(dtrain, 'train'), (dval, 'val')],
        early_stopping_rounds=50,
        verbose_eval=False
    )
    # 4. Feature selection, with a second ridge fitted on the selected
    #    subset so the selector is actually exercised
    from sklearn.feature_selection import SelectKBest, f_regression
    selector = SelectKBest(score_func=f_regression, k=5)
    X_train_selected = selector.fit_transform(X_train, y_train)
    X_val_selected = selector.transform(X_val)
    ridge_selected = Ridge(alpha=1.0)
    ridge_selected.fit(X_train_selected, y_train)
    # 5. Ensembling (scikit-learn >= 1.2 renamed base_estimator to estimator)
    from sklearn.ensemble import BaggingRegressor
    bagging = BaggingRegressor(
        estimator=RandomForestRegressor(max_depth=5),
        n_estimators=10,
        random_state=42,
        n_jobs=-1
    )
    bagging.fit(X_train, y_train)
    return {
        'ridge': ridge,
        'ridge_selected': ridge_selected,
        'xgb': xgb_model,
        'bagging': bagging,
        'feature_selector': selector
    }
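The TimeSeriesSplit created in step 1 is only instantiated above; here is a minimal sketch of actually using it for walk-forward validation, with a Ridge baseline standing in for any of the models:
from sklearn.model_selection import TimeSeriesSplit, cross_val_score
from sklearn.linear_model import Ridge

# Each split trains on an expanding past window and validates on the
# following block, so no fold ever sees the future.
tscv = TimeSeriesSplit(n_splits=5)
cv_scores = cross_val_score(Ridge(alpha=1.0), X_train, y_train,
                            cv=tscv, scoring='neg_mean_squared_error')
print(f"Walk-forward CV MSE: {-cv_scores.mean():.6f} (+/- {cv_scores.std():.6f})")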
def evaluate_robustness(model_dict, X_test, y_test):
    """
    Evaluate model robustness on the test set.
    """
    results = {}
    for name, model in model_dict.items():
        if name == 'feature_selector':
            continue
        if name == 'xgb':
            dtest = xgb.DMatrix(X_test)
            pred = model.predict(dtest)
        elif name == 'ridge_selected':
            # This model was trained on the selected feature subset
            X_test_transformed = model_dict['feature_selector'].transform(X_test)
            pred = model.predict(X_test_transformed)
        else:
            # Models trained on the full feature set
            pred = model.predict(X_test)
        mse = mean_squared_error(y_test, pred)
        r2 = r2_score(y_test, pred)
        results[name] = {'mse': mse, 'r2': r2}
        print(f"{name}: MSE={mse:.6f}, R2={r2:.4f}")
    return results
8.4 Optimizing Transaction Costs
def optimize_transaction_costs(df, predictions, transaction_cost=0.001, slippage=0.0005):
    """
    Measure how transaction costs erode a signal.
    :param df: DataFrame
    :param predictions: predicted values
    :param transaction_cost: transaction cost
    :param slippage: slippage
    :return: frame with gross and net returns
    """
    df = df.copy()
    df['prediction'] = predictions
    # Trading signal, treated directly as the held position
    df['signal'] = 0
    df.loc[df['prediction'] > 0.01, 'signal'] = 1    # long
    df.loc[df['prediction'] < -0.01, 'signal'] = -1  # short
    # Costs accrue on turnover (the absolute change in position), so
    # holding an unchanged position costs nothing.
    df['gross_return'] = df['close'].pct_change() * df['signal'].shift(1)
    df['cost'] = df['signal'].diff().abs() * (transaction_cost + slippage)
    df['net_return'] = df['gross_return'] - df['cost'].fillna(0)
    # Cumulative returns
    df['cumulative_gross'] = (1 + df['gross_return'].fillna(0)).cumprod()
    df['cumulative_net'] = (1 + df['net_return'].fillna(0)).cumprod()
    # Summary
    total_cost = df['cost'].sum()
    gross_profit = (df['cumulative_gross'].iloc[-1] - 1) * 100
    net_profit = (df['cumulative_net'].iloc[-1] - 1) * 100
    print(f"Total transaction cost: {total_cost:.4f}")
    print(f"Gross profit: {gross_profit:.2f}%")
    print(f"Net profit: {net_profit:.2f}%")
    print(f"Cost drag: {gross_profit - net_profit:.2f}%")
    return df[['close', 'signal', 'gross_return', 'net_return', 'cumulative_gross', 'cumulative_net']]
Part 9: Practical Advice and Caveats
9.1 The Importance of Data Quality
Data is the bedrock of quantitative investing. Some best practices for checking data quality:
def data_quality_check(df):
    """
    Data-quality checks.
    :param df: DataFrame
    :return: quality report
    """
    report = {}
    # 1. Completeness
    report['total_rows'] = len(df)
    report['missing_values'] = df.isnull().sum().to_dict()
    report['missing_rate'] = (df.isnull().sum() / len(df)).to_dict()
    # 2. Consistency: sanity-check the OHLC relationships. These checks
    #    span several columns at once, so run them once rather than once
    #    per column.
    price_cols = ['open', 'high', 'low', 'close']
    if all(col in df.columns for col in price_cols):
        # The high must not be below the low
        report['invalid_high_low'] = (df['high'] < df['low']).sum()
        # The close must lie within the day's range
        report['invalid_close'] = ((df['close'] < df['low']) |
                                   (df['close'] > df['high'])).sum()
    # 3. Outliers
    returns = df['close'].pct_change().dropna()
    if len(returns) > 0:
        report['extreme_returns'] = {
            'max': returns.max(),
            'min': returns.min(),
            'outliers_3sigma': ((returns > returns.mean() + 3*returns.std()) |
                                (returns < returns.mean() - 3*returns.std())).sum()
        }
    # 4. Time-series completeness. A calendar-day range counts weekends and
    #    holidays as "missing"; use an exchange trading calendar in practice.
    if 'trade_date' in df.columns:
        date_range = pd.date_range(start=df['trade_date'].min(),
                                   end=df['trade_date'].max(),
                                   freq='D')
        missing_dates = date_range.difference(df['trade_date'])
        report['missing_dates'] = len(missing_dates)
        report['missing_date_ratio'] = len(missing_dates) / len(date_range)
    # 5. Duplicates
    report['duplicates'] = df.duplicated().sum()
    return report

# Usage
# quality_report = data_quality_check(df_clean)
# print(quality_report)
9.2 Avoiding Look-Ahead Bias
Look-ahead bias is one of the most common mistakes in quantitative investing:
def avoid_look_ahead_bias(df):
    """
    The right way to avoid look-ahead bias.
    """
    # Wrong: using future data when computing factors.
    # df['MA5'] = df['close'].rolling(5).mean()  # this one is fine (history only)
    # df['future_return'] = df['close'].shift(-5) / df['close'] - 1  # target variable, NOT a factor
    # Right: make sure factors are computed from historical data only
    df_correct = df.copy()
    # Technical indicators
    df_correct['MA5'] = df_correct['close'].rolling(5).mean()
    df_correct['MA20'] = df_correct['close'].rolling(20).mean()
    # Target variable (future return)
    df_correct['target'] = df_correct['close'].shift(-5) / df_correct['close'] - 1
    # Drop the last 5 rows (no target available)
    df_correct = df_correct.iloc[:-5]
    # Before training, confirm the feature matrix X contains only
    # information available at each point in time.
    return df_correct
9.3 Out-of-Sample Testing
def out_of_sample_test(model, train_df, test_df, factor_columns):
    """
    Out-of-sample test.
    :param model: trained model
    :param train_df: training set
    :param test_df: test set
    :param factor_columns: feature columns
    :return: out-of-sample performance
    """
    # In-sample performance. An XGBoost Booster also exposes .predict, so
    # dispatch on its type rather than via hasattr.
    X_train = train_df[factor_columns].values
    y_train = train_df['target'].values
    if isinstance(model, xgb.Booster):
        dtrain = xgb.DMatrix(X_train, feature_names=factor_columns)
        train_pred = model.predict(dtrain)
    else:
        train_pred = model.predict(X_train)
    train_r2 = r2_score(y_train, train_pred)
    # Out-of-sample performance
    X_test = test_df[factor_columns].values
    y_test = test_df['target'].values
    if isinstance(model, xgb.Booster):
        dtest = xgb.DMatrix(X_test, feature_names=factor_columns)
        test_pred = model.predict(dtest)
    else:
        test_pred = model.predict(X_test)
    test_r2 = r2_score(y_test, test_pred)
    # Performance decay
    decay = train_r2 - test_r2
    print(f"Train R2: {train_r2:.4f}")
    print(f"Test R2: {test_r2:.4f}")
    print(f"Performance decay: {decay:.4f}")
    # Overfitting check
    if decay > 0.1:
        print("Warning: the model may be overfitting")
    elif decay < 0:
        print("The model performs better out of sample")
    else:
        print("Model performance looks normal")
    return {
        'train_r2': train_r2,
        'test_r2': test_r2,
        'decay': decay,
        'is_overfit': decay > 0.1
    }
9.4 Model Interpretability
def explain_model_predictions(model, X_sample, feature_names, model_type='xgboost'):
    """
    Explain model predictions.
    """
    if model_type == 'xgboost':
        # SHAP values
        import shap
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X_sample)
        # SHAP summary plot
        shap.summary_plot(shap_values, X_sample, feature_names=feature_names)
        # Explain a single prediction
        shap.force_plot(explainer.expected_value, shap_values[0], X_sample[0], feature_names=feature_names)
    elif model_type == 'random_forest':
        # Feature importances
        importances = model.feature_importances_
        indices = np.argsort(importances)[::-1]
        print("Feature importances:")
        for f in range(len(feature_names)):
            print(f"{f+1}. {feature_names[indices[f]]}: {importances[indices[f]]:.4f}")
        # Partial-dependence plots (pass feature indices, since X_sample is
        # a plain array rather than a DataFrame)
        from sklearn.inspection import PartialDependenceDisplay
        fig, ax = plt.subplots(figsize=(10, 6))
        PartialDependenceDisplay.from_estimator(model, X_sample,
                                                features=list(range(X_sample.shape[1])),
                                                feature_names=feature_names, ax=ax)
        plt.show()
    elif model_type == 'neural_network':
        # LIME explanation
        from lime import lime_tabular
        explainer = lime_tabular.LimeTabularExplainer(
            X_sample,
            feature_names=feature_names,
            mode='regression'
        )
        exp = explainer.explain_instance(X_sample[0], model.predict)
        exp.show_in_notebook()

# Usage
# explain_model_predictions(xgb_model, X_val, factor_columns, 'xgboost')
Part 10: Summary and Outlook
10.1 Key Takeaways
By now you should have a working grasp of:
- Data acquisition and preprocessing: sourcing high-quality data from reliable providers, then cleaning and standardizing it
- Feature engineering: constructing useful technical and fundamental factors
- Model building: from linear regression to XGBoost and neural networks
- Backtesting: a complete framework that accounts for transaction costs and slippage
- Risk control: computing a range of risk metrics and optimizing portfolios
- A working system: integrating every module into an AI stock-selection system
10.2 Directions for Improvement
Quantitative investing is an iterative process:
Data:
- Explore alternative data (news, social media, satellite imagery)
- Increase data frequency (from daily to minute bars)
- Improve data quality
Models:
- Try deep learning (LSTM, Transformer)
- Ensembling and model blending
- Online learning and model updating
Strategies:
- Multi-factor models
- Industry-neutral strategies
- Risk-parity strategies
Systems:
- Real-time trading infrastructure
- Model monitoring with automatic retraining
- Distributed computing
10.3 Risk Warnings
Quantitative investing is not a money machine. Keep in mind:
- Model risk: past performance does not guarantee the future; models can stop working
- Data risk: bad data can invalidate the whole system
- Market risk: in extreme conditions, every strategy can lose money
- Technology risk: system failures, network outages, and the like
Recommendations:
- Always run rigorous out-of-sample tests
- Diversify; never rely on a single strategy
- Keep an adequate risk reserve
- Monitor and refine the system continuously
10.4 Recommended Resources
Books:
- 《量化投资:以Python为工具》
- Active Portfolio Management (Grinold & Kahn)
- The Evaluation and Optimization of Trading Strategies (Pardo)
Websites:
- QuantConnect
- The Quantopian community forums (archived)
- Kaggle finance competitions
Python libraries:
- Zipline: backtesting framework
- PyAlgoTrade: algorithmic trading
- Backtrader: full-featured backtesting
With the complete code and explanations in this article, you should be able to build a basic but complete AI-driven quantitative investment system. Remember: successful quantitative investing takes continuous learning, experimentation, and refinement. Good luck on your quant journey!
