引言:均值回归理论在资产配置中的核心地位
均值回归理论(Mean Reversion Theory)是金融学中最古老且最具影响力的理论之一,它认为资产价格在长期内会趋向于其历史平均值或内在价值。这一理论最早由英国统计学家弗朗西斯·高尔顿在19世纪末提出,后来被广泛应用于股票、债券、外汇和商品等各类资产的投资实践中。
在现代资产配置中,均值回归理论为投资者提供了一个系统性的框架,帮助我们在市场波动中识别被低估或高估的资产,从而做出更理性的投资决策。与传统的”买入持有”策略相比,基于均值回归的资产配置策略更加灵活,能够根据市场估值水平动态调整投资组合,从而在控制风险的同时提高收益。
本文将深入探讨均值回归理论在资产配置中的实战应用,包括理论基础、量化模型构建、实战策略设计、常见误区规避等关键内容,并通过详细的代码示例和实际案例,帮助读者掌握这一强大的投资工具。
均值回归理论的数学基础与统计验证
理论核心概念
均值回归理论建立在以下核心假设之上:
- 资产具有内在价值:每种资产都有一个相对稳定的内在价值或均衡价格
- 价格偏离是暂时的:市场价格会围绕内在价值波动,但长期会回归
- 统计规律性:价格偏离均值的程度和持续时间具有一定的统计规律
数学模型
最简单的均值回归模型可以用以下公式表示:
P(t) = μ + θ * (P(t-1) - μ) + ε(t)
其中:
- P(t) 是时间 t 的资产价格
- μ 是长期均值(内在价值)
- θ 是回归系数(-1 < θ < 0)
- ε(t) 是白噪声
当 θ 为负值时,模型表现出均值回归特性:如果上一期价格高于均值,本期价格倾向于下降;反之亦然。
统计检验方法
在实际应用中,我们需要通过统计方法验证资产价格是否具有均值回归特性。常用的检验方法包括:
- 单位根检验(ADF检验):检验时间序列是否平稳
- 方差比检验:检验价格偏离均值的程度
- 赫斯特指数:衡量时间序列的长期记忆性
以下是一个使用Python进行ADF检验的完整示例:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import adfuller
import yfinance as yf
def test_mean_reversion(stock_symbol, start_date, end_date):
"""
测试股票价格的均值回归特性
"""
# 获取股票数据
stock_data = yf.download(stock_symbol, start=start_date, end=end_date)
prices = stock_data['Adj Close']
# 计算对数收益率
log_prices = np.log(prices)
# 进行ADF检验
adf_result = adfuller(log_prices)
print(f"ADF检验结果 ({stock_symbol}):")
print(f"ADF统计量: {adf_result[0]:.4f}")
print(f"P值: {adf_result[1]:.4f}")
print(f"临界值:")
for key, value in adf_result[4].items():
print(f" {key}: {value:.4f}")
# 判断是否平稳
if adf_result[1] < 0.05:
print(f"结论: {stock_symbol} 的价格序列是平稳的,具有均值回归特性")
else:
print(f"结论: {stock_symbol} 的价格序列是非平稳的,均值回归特性不明显")
# 可视化
plt.figure(figsize=(12, 6))
plt.plot(prices.index, prices.values, label=f'{stock_symbol} 价格')
plt.plot(prices.index, [prices.mean()] * len(prices),
'r--', label=f'均值: {prices.mean():.2f}')
plt.title(f'{stock_symbol} 价格与均值')
plt.xlabel('日期')
plt.ylabel('价格')
plt.legend()
plt.grid(True)
plt.show()
return adf_result
# 测试示例
if __name__ == "__main__":
# 测试苹果公司股票
result = test_mean_reversion('AAPL', '2020-01-01', '2023-12-31')
实际案例分析
让我们以标普500指数为例,验证其均值回归特性。通过计算2000-2023年的市盈率(P/E)数据,我们可以观察到明显的均值回归现象:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
# 模拟标普500历史市盈率数据
np.random.seed(42)
years = np.arange(2000, 2024)
pe_ratio = 25 + 5 * np.sin(years/3) + np.random.normal(0, 2, len(years))
pe_ratio = np.clip(pe_ratio, 10, 35) # 限制在合理范围内
# 计算均值和标准差
mean_pe = np.mean(pe_ratio)
std_pe = np.std(pe_ratio)
# 计算Z-score
z_scores = (pe_ratio - mean_pe) / std_pe
# 可视化
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
# 市盈率走势
ax1.plot(years, pe_ratio, 'b-o', linewidth=2, markersize=4)
ax1.axhline(y=mean_pe, color='r', linestyle='--', label=f'均值: {mean_pe:.2f}')
ax1.fill_between(years, mean_pe - std_pe, mean_pe + std_pe,
alpha=0.2, color='gray', label='±1标准差')
ax1.set_title('标普500历史市盈率 (2000-2023)')
ax1.set_ylabel('市盈率')
ax1.legend()
ax1.grid(True)
# Z-score分布
ax2.plot(years, z_scores, 'g-s', linewidth=2, markersize=4)
ax2.axhline(y=0, color='r', linestyle='--')
ax2.axhline(y=1, color='orange', linestyle='--', label='+1σ (高估)')
ax2.axhline(y=-1, color='orange', linestyle='--', label='-1σ (低估)')
ax2.fill_between(years, -1, 1, alpha=0.1, color='blue')
ax2.set_title('Z-score标准化偏离度')
ax2.set_ylabel('Z-score')
ax2.set_xlabel('年份')
ax2.legend()
ax2.grid(True)
plt.tight_layout()
plt.show()
# 统计分析
print(f"统计摘要:")
print(f"均值: {mean_pe:.2f}")
print(f"标准差: {std_pe:.2f}")
print(f"偏离均值>1σ的概率: {np.mean(np.abs(z_scores) > 1):.2%}")
print(f"偏离均值>2σ的概率: {np.mean(np.abs(z_scores) > 2):.2%}")
这个案例清晰地展示了均值回归的特征:市盈率在长期均值附近波动,极端值出现的概率较低,且偏离程度越大,回归趋势越明显。
基于均值回归的资产配置模型构建
模型架构设计
一个完整的均值回归资产配置模型应包含以下核心组件:
- 估值指标计算模块:计算各类资产的相对估值水平
- 均值回归信号生成模块:根据估值偏离度生成买卖信号
- 风险控制模块:设置止损和仓位管理规则
- 组合优化模块:动态调整资产权重
估值指标体系
不同类型的资产需要不同的估值指标:
- 股票:市盈率(P/E)、市净率(P/B)、市销率(P/S)、股息率
- 债券:到期收益率、信用利差、期限利差
- 商品:现货与期货价差、库存水平、供需比
- 房地产:租金收益率、房价收入比、空置率
信号生成算法
基于Z-score的信号生成是最常用的方法:
class MeanReversionStrategy:
def __init__(self, lookback_period=252, z_threshold=1.5):
"""
初始化均值回归策略
参数:
lookback_period: 回看周期(交易日)
z_threshold: Z-score阈值,超过此值触发交易信号
"""
self.lookback_period = lookback_period
self.z_threshold = z_threshold
self.position = 0 # 当前仓位:-1做空, 0中性, 1做多
def calculate_signals(self, data):
"""
计算交易信号
参数:
data: 包含价格序列的DataFrame
返回:
signals: 交易信号 DataFrame
"""
# 计算滚动均值和标准差
rolling_mean = data['price'].rolling(window=self.lookback_period).mean()
rolling_std = data['price'].rolling(window=self.lookback_period).std()
# 计算Z-score
data['z_score'] = (data['price'] - rolling_mean) / rolling_std
# 生成信号
data['signal'] = 0
data.loc[data['z_score'] > self.z_threshold, 'signal'] = -1 # 高估,做空
data.loc[data['z_score'] < -self.z_threshold, 'signal'] = 1 # 低估,做多
# 平仓信号
data.loc[abs(data['z_score']) < 0.5, 'signal'] = 0
return data
def backtest(self, data, initial_capital=100000):
"""
回测策略
参数:
data: 包含价格和信号的数据
initial_capital: 初始资金
返回:
results: 回测结果
"""
capital = initial_capital
position = 0
trades = []
for i in range(1, len(data)):
current_signal = data['signal'].iloc[i]
price = data['price'].iloc[i]
prev_price = data['price'].iloc[i-1]
# 交易逻辑
if current_signal != position:
if current_signal == 1 and position == 0: # 开多仓
shares = capital / price
position = 1
trades.append({
'date': data.index[i],
'action': 'BUY',
'price': price,
'shares': shares
})
elif current_signal == -1 and position == 0: # 开空仓
shares = capital / price
position = -1
trades.append({
'date': data.index[i],
'action': 'SELL_SHORT',
'price': price,
'shares': shares
})
elif current_signal == 0 and position != 0: # 平仓
if position == 1:
action = 'SELL'
profit = shares * (price - prev_price)
else:
action = 'COVER'
profit = shares * (prev_price - price)
capital += profit
trades.append({
'date': data.index[i],
'action': action,
'price': price,
'profit': profit
})
position = 0
shares = 0
# 计算最终收益
final_value = capital
total_return = (final_value - initial_capital) / initial_capital
return {
'initial_capital': initial_capital,
'final_value': final_value,
'total_return': total_return,
'trades': pd.DataFrame(trades)
}
# 完整示例:股票均值回归策略回测
import yfinance as yf
import pandas as pd
import numpy as np
# 获取数据
def run_strategy():
# 下载标普500 ETF数据
data = yf.download('SPY', start='2010-01-01', end='2023-12-31')
data = data[['Adj Close']].rename(columns={'Adj Close': 'price'})
# 初始化策略
strategy = MeanReversionStrategy(lookback_period=252, z_threshold=1.5)
# 计算信号
data_with_signals = strategy.calculate_signals(data)
# 回测
results = strategy.backtest(data_with_signals)
print(f"回测结果:")
print(f"初始资金: ${results['initial_capital']:,.2f}")
print(f"最终价值: ${results['final_value']:,.2f}")
print(f"总收益率: {results['total_return']:.2%}")
print(f"交易次数: {len(results['trades'])}")
# 可视化
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 10))
# 价格和信号
ax1.plot(data_with_signals.index, data_with_signals['price'], label='SPY价格')
ax1.plot(data_with_signals.index, data_with_signals['price'] * (data_with_signals['signal'] > 0),
'g^', markersize=4, label='做多信号')
ax1.plot(data_with_signals.index, data_with_signals['price'] * (data_with_signals['signal'] < 0),
'rv', markersize=4, label='做空信号')
ax1.set_title('SPY价格与交易信号')
ax1.legend()
ax1.grid(True)
# Z-score
ax2.plot(data_with_signals.index, data_with_signals['z_score'], label='Z-score')
ax2.axhline(y=1.5, color='r', linestyle='--', label='高估阈值')
ax2.axhline(y=-1.5, color='g', linestyle='--', label='低估阈值')
ax2.axhline(y=0, color='black', linestyle='-', alpha=0.3)
ax2.set_title('Z-score偏离度')
ax2.legend()
ax2.grid(True)
plt.tight_layout()
plt.show()
return results
# 运行策略
if __name__ == "__main__":
results = run_strategy()
多资产均值回归配置模型
对于多资产配置,我们需要构建更复杂的模型:
class MultiAssetMeanReversion:
def __init__(self, assets, lookback=252):
self.assets = assets
self.lookback = lookback
self.weights = {}
def calculate_relative_valuation(self, data_dict):
"""
计算相对估值指标
"""
valuations = {}
for asset, data in data_dict.items():
# 计算滚动均值和标准差
rolling_mean = data.rolling(window=self.lookback).mean()
rolling_std = data.rolling(window=self.lookback).std()
# 计算Z-score
z_score = (data - rolling_mean) / rolling_std
valuations[asset] = z_score.iloc[-1] # 取最新值
return valuations
def optimize_weights(self, valuations):
"""
根据估值水平优化权重
"""
# 反向权重:估值越低,权重越高
raw_weights = {asset: -z_score for asset, z_score in valuations.items()}
# 标准化为正权重
total = sum(abs(w) for w in raw_weights.values())
weights = {asset: abs(w) / total for asset, w in raw_weights.items()}
return weights
def generate_portfolio(self, data_dict):
"""
生成投资组合
"""
valuations = self.calculate_relative_valuation(data_dict)
weights = self.optimize_weights(valuations)
print("当前估值水平 (Z-score):")
for asset, z in valuations.items():
status = "低估" if z < -1 else "高估" if z > 1 else "合理"
print(f" {asset}: {z:.2f} ({status})")
print("\n建议配置权重:")
for asset, weight in weights.items():
print(f" {asset}: {weight:.1%}")
return weights, valuations
# 多资产配置示例
def multi_asset_example():
# 模拟数据
np.random.seed(42)
dates = pd.date_range('2020-01-01', '2023-12-31', freq='M')
# 创建不同资产的估值指标
assets_data = {
'股票': pd.Series(np.random.normal(0, 1, len(dates)) + 0.5, index=dates),
'债券': pd.Series(np.random.normal(0, 1, len(dates)) - 0.3, index=dates),
'商品': pd.Series(np.random.normal(0, 1, len(dates)) + 0.8, index=dates),
'房地产': pd.Series(np.random.normal(0, 1, len(dates)) - 0.1, index=dates)
}
# 转换为DataFrame
df = pd.DataFrame(assets_data)
# 初始化模型
model = MultiAssetMeanReversion(list(assets_data.keys()))
# 生成配置
weights, valuations = model.generate_portfolio(df)
return weights, valuations
if __name__ == "__main__":
multi_asset_example()
实战策略:捕捉价值回归机会
策略一:基于市盈率的股票轮动策略
核心思想:在不同估值水平的股票之间轮动,买入低估值股票,卖出高估值股票。
实施步骤:
- 计算股票组合的滚动市盈率
- 标准化为Z-score
- 设定买入/卖出阈值
- 定期再平衡
def pe_rotation_strategy():
"""
市盈率轮动策略
"""
# 模拟三只股票的市盈率数据
np.random.seed(42)
dates = pd.date_range('2015-01-01', '2023-12-31', freq='M')
# 创建不同股票的PE序列(具有均值回归特性)
def create_pe_series(mean, volatility, trend=0):
pe = mean + trend * np.arange(len(dates)) / 100
pe += np.random.normal(0, volatility, len(dates))
pe = np.clip(pe, 5, 40) # 限制在合理范围
return pd.Series(pe, index=dates)
stocks = {
'Stock_A': create_pe_series(20, 5, 0.1), # 稳定增长
'Stock_B': create_pe_series(15, 8, -0.05), # 价值型
'Stock_C': create_pe_series(25, 10, 0.2) # 成长型
}
pe_df = pd.DataFrame(stocks)
# 计算Z-score
z_scores = pe_df.apply(lambda x: (x - x.rolling(24).mean()) / x.rolling(24).std())
# 生成信号:买入Z-score < -1的股票,卖出Z-score > 1的股票
signals = pd.DataFrame(index=z_scores.index, columns=z_scores.columns)
signals[z_scores < -1] = 1 # 买入
signals[z_scores > 1] = -1 # 卖出
signals.fillna(0, inplace=True)
# 计算策略收益
returns = pe_df.pct_change().fillna(0)
strategy_returns = (signals.shift(1) * returns).sum(axis=1) / 3 # 等权重
# 累计收益
cumulative_returns = (1 + strategy_returns).cumprod()
# 可视化
fig, axes = plt.subplots(3, 1, figsize=(14, 12))
# PE走势
for stock in pe_df.columns:
axes[0].plot(pe_df.index, pe_df[stock], label=stock)
axes[0].set_title('股票市盈率走势')
axes[0].legend()
axes[0].grid(True)
# Z-score
for stock in z_scores.columns:
axes[1].plot(z_scores.index, z_scores[stock], label=stock)
axes[1].axhline(y=1, color='r', linestyle='--', alpha=0.5)
axes[1].axhline(y=-1, color='g', linestyle='--', alpha=0.5)
axes[1].set_title('Z-score偏离度')
axes[1].legend()
axes[1].grid(True)
# 累计收益
axes[2].plot(cumulative_returns.index, cumulative_returns.values,
linewidth=2, label='策略收益')
axes[2].axhline(y=1, color='black', linestyle='-', alpha=0.3)
axes[2].set_title('策略累计收益')
axes[2].legend()
axes[2].grid(True)
plt.tight_layout()
plt.show()
# 统计摘要
print("策略表现统计:")
print(f"总收益率: {cumulative_returns.iloc[-1] - 1:.2%}")
print(f"年化收益率: {(cumulative_returns.iloc[-1] ** (12/len(dates)) - 1):.2%}")
print(f"最大回撤: {(cumulative_returns / cumulative_returns.cummax() - 1).min():.2%}")
print(f"夏普比率: {strategy_returns.mean() / strategy_returns.std() * np.sqrt(12):.2f}")
if __name__ == "__main__":
pe_rotation_strategy()
策略二:债券久期轮换策略
核心思想:根据利率预期和债券估值,在不同久期的债券之间轮换。
def bond_duration_strategy():
"""
债券久期轮换策略
"""
# 模拟不同久期债券的收益率数据
np.random.seed(42)
dates = pd.date_range('2018-01-01', '2023-12-31', freq='M')
# 创建收益率曲线数据
yields = {
'1Y': 2.0 + 0.1 * np.arange(len(dates)) + np.random.normal(0, 0.2, len(dates)),
'5Y': 2.5 + 0.15 * np.arange(len(dates)) + np.random.normal(0, 0.15, len(dates)),
'10Y': 2.8 + 0.12 * np.arange(len(dates)) + np.random.normal(0, 0.1, len(dates)),
'30Y': 3.0 + 0.08 * np.arange(len(dates)) + np.random.normal(0, 0.08, len(dates))
}
yield_df = pd.DataFrame(yields, index=dates)
# 计算期限利差
yield_df['Spread_10Y-1Y'] = yield_df['10Y'] - yield_df['1Y']
yield_df['Spread_30Y-5Y'] = yield_df['30Y'] - yield_df['5Y']
# 计算滚动均值和Z-score
lookback = 24
for spread in ['Spread_10Y-1Y', 'Spread_30Y-5Y']:
yield_df[f'{spread}_Z'] = (yield_df[spread] -
yield_df[spread].rolling(lookback).mean()) / \
yield_df[spread].rolling(lookback).std()
# 信号生成:当期限利差扩大时,增加长久期债券
signals = pd.DataFrame(index=dates, columns=['Short', 'Medium', 'Long'])
signals['Short'] = 0
signals['Medium'] = 0
signals['Long'] = 0
# 规则:Z-score > 1,增加长久期;Z-score < -1,增加短久期
long_signal = (yield_df['Spread_10Y-1Y_Z'] > 1) | (yield_df['Spread_30Y-5Y_Z'] > 1)
short_signal = (yield_df['Spread_10Y-1Y_Z'] < -1) | (yield_df['Spread_30Y-5Y_Z'] < -1)
signals.loc[long_signal, 'Long'] = 0.6
signals.loc[long_signal, 'Medium'] = 0.3
signals.loc[long_signal, 'Short'] = 0.1
signals.loc[short_signal, 'Long'] = 0.1
signals.loc[short_signal, 'Medium'] = 0.3
signals.loc[short_signal, 'Short'] = 0.6
# 中性状态
neutral = ~long_signal & ~short_signal
signals.loc[neutral, 'Long'] = 0.25
signals.loc[neutral, 'Medium'] = 0.5
signals.loc[neutral, 'Short'] = 0.25
# 计算收益
returns = yield_df[['1Y', '5Y', '10Y']].pct_change().fillna(0)
strategy_returns = (signals.shift(1) * returns).sum(axis=1)
# 累计收益
cumulative_returns = (1 + strategy_returns).cumprod()
# 可视化
fig, axes = plt.subplots(2, 1, figsize=(14, 10))
# 收益率曲线
for col in ['1Y', '5Y', '10Y']:
axes[0].plot(yield_df.index, yield_df[col], label=f'{col}收益率')
axes[0].set_title('不同期限债券收益率')
axes[0].legend()
axes[0].grid(True)
# 策略收益
axes[1].plot(cumulative_returns.index, cumulative_returns.values,
linewidth=2, label='久期轮换策略')
axes[1].axhline(y=1, color='black', linestyle='-', alpha=0.3)
axes[1].set_title('策略累计收益')
axes[1].legend()
axes[1].grid(True)
plt.tight_layout()
plt.show()
print("债券久期轮换策略表现:")
print(f"总收益率: {cumulative_returns.iloc[-1] - 1:.2%}")
print(f"年化收益率: {(cumulative_returns.iloc[-1] ** (12/len(dates)) - 1):.2%}")
print(f"夏普比率: {strategy_returns.mean() / strategy_returns.std() * np.sqrt(12):.2f}")
if __name__ == "__main__":
bond_duration_strategy()
策略三:跨资产均值回归配置
核心思想:在股票、债券、商品、现金四大类资产间进行动态配置,根据各类资产的估值水平调整权重。
def cross_asset_allocation():
"""
跨资产均值回归配置
"""
# 模拟各类资产的估值指标(Z-score)
np.random.seed(42)
dates = pd.date_range('2015-01-01', '2023-12-31', freq='M')
# 创建估值序列(具有均值回归特性)
assets = {
'Stocks': np.random.normal(0, 1, len(dates)) + 0.2,
'Bonds': np.random.normal(0, 1, len(dates)) - 0.1,
'Commodities': np.random.normal(0, 1, len(dates)) + 0.3,
'Cash': np.zeros(len(dates)) # 现金始终为0
}
# 添加均值回归特性
for asset in assets:
if asset != 'Cash':
assets[asset] = 0.9 * np.roll(assets[asset], 1) + 0.1 * assets[asset]
assets[asset][0] = 0
valuation_df = pd.DataFrame(assets, index=dates)
# 计算配置权重
def calculate_weights(valuation_series):
"""
根据估值计算权重(反向配置)
"""
# 反向配置:估值越低,权重越高
raw_weights = -valuation_series
# 平移使最小值为正
raw_weights = raw_weights - raw_weights.min() + 0.1
# 归一化
weights = raw_weights / raw_weights.sum()
return weights
# 滚动计算权重
weights_df = pd.DataFrame(index=dates, columns=['Stocks', 'Bonds', 'Commodities', 'Cash'])
lookback = 12
for i in range(lookback, len(dates)):
recent_vals = valuation_df.iloc[i-lookback:i]
avg_valuation = recent_vals.mean()
weights = calculate_weights(avg_valuation)
weights_df.iloc[i] = weights
# 填充前期
weights_df.iloc[:lookback] = weights_df.iloc[lookback]
# 模拟各类资产收益
returns_df = pd.DataFrame(index=dates, columns=['Stocks', 'Bonds', 'Commodities', 'Cash'])
returns_df['Stocks'] = np.random.normal(0.008, 0.04, len(dates))
returns_df['Bonds'] = np.random.normal(0.003, 0.015, len(dates))
returns_df['Commodities'] = np.random.normal(0.005, 0.05, len(dates))
returns_df['Cash'] = np.random.normal(0.001, 0.001, len(dates))
# 计算策略收益
strategy_returns = (weights_df.shift(1) * returns_df).sum(axis=1)
cumulative_returns = (1 + strategy_returns).cumprod()
# 基准:60/40股债组合
benchmark_weights = pd.DataFrame({
'Stocks': [0.6] * len(dates),
'Bonds': [0.4] * len(dates),
'Commodities': [0] * len(dates),
'Cash': [0] * len(dates)
}, index=dates)
benchmark_returns = (benchmark_weights.shift(1) * returns_df).sum(axis=1)
benchmark_cumulative = (1 + benchmark_returns).cumprod()
# 可视化
fig, axes = plt.subplots(3, 1, figsize=(14, 12))
# 估值水平
for asset in valuation_df.columns:
axes[0].plot(valuation_df.index, valuation_df[asset], label=asset)
axes[0].set_title('各类资产估值水平 (Z-score)')
axes[0].legend()
axes[0].grid(True)
axes[0].axhline(y=0, color='black', linestyle='-', alpha=0.3)
# 配置权重
for asset in weights_df.columns:
axes[1].plot(weights_df.index, weights_df[asset] * 100, label=asset)
axes[1].set_title('动态配置权重 (%)')
axes[1].legend()
axes[1].grid(True)
axes[1].set_ylabel('权重 (%)')
# 策略对比
axes[2].plot(cumulative_returns.index, cumulative_returns.values,
linewidth=2, label='均值回归策略')
axes[2].plot(benchmark_cumulative.index, benchmark_cumulative.values,
linewidth=2, linestyle='--', label='60/40基准')
axes[2].set_title('策略收益对比')
axes[2].legend()
axes[2].grid(True)
axes[2].set_ylabel('累计收益')
plt.tight_layout()
plt.show()
# 统计对比
print("策略表现对比:")
print(f"均值回归策略 - 总收益: {cumulative_returns.iloc[-1] - 1:.2%}, 年化: {(cumulative_returns.iloc[-1] ** (12/len(dates)) - 1):.2%}")
print(f"60/40基准 - 总收益: {benchmark_cumulative.iloc[-1] - 1:.2%}, 年化: {(benchmark_cumulative.iloc[-1] ** (12/len(dates)) - 1):.2%}")
print(f"超额收益: {cumulative_returns.iloc[-1] / benchmark_cumulative.iloc[-1] - 1:.2%}")
if __name__ == "__main__":
cross_asset_allocation()
常见误区与风险规避
误区一:过度依赖历史均值
问题:将历史均值视为固定不变的锚点,忽视基本面变化。
案例:2000年互联网泡沫时期,科技股的市盈率远超历史均值,但基本面(盈利增长)也在发生根本性变化。简单做空会导致巨大损失。
解决方案:
- 动态均值:使用滚动均值而非固定均值
- 基本面调整:考虑盈利增长率、利率环境等因素
- 分位数分析:使用历史分位数而非绝对值
def dynamic_mean_adjustment():
"""
动态均值调整示例
"""
np.random.seed(42)
dates = pd.date_range('2010-01-01', '2023-12-31', freq='M')
# 模拟PE数据,包含结构性变化
pe = 15 + 0.1 * np.arange(len(dates)) + np.random.normal(0, 2, len(dates))
pe[100:150] += 10 # 模拟结构性上升
# 传统固定均值
fixed_mean = np.mean(pe[:100])
# 动态滚动均值
rolling_mean = pd.Series(pe).rolling(60).mean()
# 考虑趋势的动态均值(指数加权)
ewm_mean = pd.Series(pe).ewm(span=60).mean()
# 可视化
plt.figure(figsize=(14, 7))
plt.plot(dates, pe, label='实际PE', alpha=0.7)
plt.plot(dates, [fixed_mean] * len(dates), 'r--', label=f'固定均值: {fixed_mean:.2f}')
plt.plot(dates, rolling_mean, 'g-', label='滚动均值(60M)')
plt.plot(dates, ewm_mean, 'b-.', label='指数加权均值')
plt.title('不同均值计算方法对比')
plt.legend()
plt.grid(True)
plt.show()
# 计算信号差异
fixed_signal = (pe > fixed_mean + 3).astype(int) - (pe < fixed_mean - 3).astype(int)
dynamic_signal = (pe > rolling_mean + 3).astype(int) - (pe < rolling_mean - 3).astype(int)
print("信号对比:")
print(f"固定均值信号次数: {np.sum(fixed_signal != 0)}")
print(f"动态均值信号次数: {np.sum(dynamic_signal != 0)}")
print(f"信号差异率: {np.mean(fixed_signal != dynamic_signal):.2%}")
if __name__ == "__main__":
dynamic_mean_adjustment()
误区二:忽视交易成本
问题:频繁交易导致成本侵蚀收益。
解决方案:
- 降低交易频率:使用更长的信号周期
- 成本敏感优化:在信号生成中考虑交易成本
- 阈值调整:提高触发阈值,减少无效交易
def transaction_cost_aware_strategy():
"""
考虑交易成本的策略优化
"""
np.random.seed(42)
dates = pd.date_range('2015-01-01', '2023-12-31', freq='D')
# 模拟价格数据
price = 100 + np.cumsum(np.random.normal(0, 1, len(dates)))
price = pd.Series(price, index=dates)
# 交易成本参数
cost_per_trade = 0.001 # 0.1% per trade
slippage = 0.0005 # 0.05% slippage
def backtest_with_costs(threshold, lookback):
signals = pd.Series(0, index=dates)
rolling_mean = price.rolling(lookback).mean()
rolling_std = price.rolling(lookback).std()
z_score = (price - rolling_mean) / rolling_std
signals[z_score > threshold] = -1
signals[z_score < -threshold] = 1
signals[abs(z_score) < 0.5] = 0
# 计算收益(含成本)
position = 0
returns = []
total_cost = 0
for i in range(1, len(dates)):
signal = signals.iloc[i]
prev_signal = signals.iloc[i-1]
price_change = price.iloc[i] / price.iloc[i-1] - 1
# 交易信号变化时产生成本
if signal != prev_signal:
cost = cost_per_trade + slippage
total_cost += cost
# 成本在当天扣除
daily_return = price_change - cost if signal != 0 else price_change
else:
daily_return = price_change if position != 0 else 0
returns.append(daily_return)
position = signal
cumulative = (1 + pd.Series(returns, index=dates[1:])).cumprod()
return cumulative.iloc[-1], total_cost
# 测试不同阈值
thresholds = [0.5, 1.0, 1.5, 2.0]
lookbacks = [20, 60, 120, 252]
results = []
for thr in thresholds:
for lb in lookbacks:
final_value, cost = backtest_with_costs(thr, lb)
results.append({
'threshold': thr,
'lookback': lb,
'final_value': final_value,
'total_cost': cost,
'net_return': final_value - 1
})
results_df = pd.DataFrame(results)
# 找出最优参数
best = results_df.loc[results_df['net_return'].idxmax()]
print("考虑交易成本的优化结果:")
print(f"最优阈值: {best['threshold']}")
print(f"最优回看周期: {best['lookback']}天")
print(f"净收益率: {best['net_return']:.2%}")
print(f"总交易成本: {best['total_cost']:.2%}")
# 可视化
plt.figure(figsize=(12, 6))
for lb in lookbacks:
subset = results_df[results_df['lookback'] == lb]
plt.plot(subset['threshold'], subset['net_return'],
marker='o', label=f'回看{lb}天')
plt.axvline(x=best['threshold'], color='r', linestyle='--',
label=f'最优阈值: {best["threshold"]}')
plt.title('不同参数下的净收益率')
plt.xlabel('阈值')
plt.ylabel('净收益率')
plt.legend()
plt.grid(True)
plt.show()
if __name__ == "__main__":
transaction_cost_aware_strategy()
误区三:忽视尾部风险
问题:均值回归在极端市场条件下可能失效,导致巨大损失。
解决方案:
- 止损机制:设置最大回撤止损
- 仓位管理:根据波动率动态调整仓位
- 压力测试:模拟极端市场情况
def risk_managed_strategy():
"""
风险管理的均值回归策略
"""
np.random.seed(42)
dates = pd.date_range('2015-01-01', '2023-12-31', freq='D')
# 模拟价格数据(包含极端波动)
returns = np.random.normal(0.001, 0.02, len(dates))
# 添加极端事件
extreme_events = np.random.choice(len(dates), size=5, replace=False)
returns[extreme_events] *= 5 # 5倍波动
price = 100 * (1 + returns).cumprod()
price = pd.Series(price, index=dates)
# 基础策略
lookback = 60
threshold = 1.5
rolling_mean = price.rolling(lookback).mean()
rolling_std = price.rolling(lookback).std()
z_score = (price - rolling_mean) / rolling_std
signals = pd.Series(0, index=dates)
signals[z_score > threshold] = -1
signals[z_score < -threshold] = 1
# 风险管理层
def apply_risk_management(signals, price):
"""
应用风险管理规则
"""
managed_signals = signals.copy()
# 1. 止损:最大回撤超过15%时清仓
cumulative = (1 * signals.shift(1) * price.pct_change()).cumsum()
drawdown = cumulative - cumulative.cummax()
stop_loss = drawdown < -0.15
managed_signals[stop_loss] = 0
# 2. 波动率调整:当波动率超过阈值时,减少仓位
volatility = price.pct_change().rolling(20).std()
high_vol = volatility > volatility.quantile(0.9)
managed_signals[high_vol & (managed_signals != 0)] *= 0.5
# 3. 时间衰减:持仓时间过长时逐步平仓
position_duration = 0
for i in range(len(managed_signals)):
if managed_signals.iloc[i] != 0:
if i > 0 and managed_signals.iloc[i] == managed_signals.iloc[i-1]:
position_duration += 1
else:
position_duration = 0
if position_duration > 60: # 持仓超过60天
managed_signals.iloc[i] *= 0.5
return managed_signals
# 应用风险管理
managed_signals = apply_risk_management(signals, price)
# 回测对比
def calculate_returns(signals, price):
returns = (signals.shift(1) * price.pct_change()).fillna(0)
cumulative = (1 + returns).cumprod()
return cumulative, returns
base_cum, base_ret = calculate_returns(signals, price)
managed_cum, managed_ret = calculate_returns(managed_signals, price)
# 可视化
fig, axes = plt.subplots(3, 1, figsize=(14, 12))
# 价格和信号
axes[0].plot(price.index, price.values, label='价格', alpha=0.7)
axes[0].plot(price.index, price * (signals > 0), 'g^', markersize=3, label='基础做多')
axes[0].plot(price.index, price * (signals < 0), 'rv', markersize=3, label='基础做空')
axes[0].plot(price.index, price * (managed_signals > 0), 'g+', markersize=5, label='管理后做多')
axes[0].plot(price.index, price * (managed_signals < 0), 'rx', markersize=5, label='管理后做空')
axes[0].set_title('价格与信号对比')
axes[0].legend()
axes[0].grid(True)
# 累计收益
axes[1].plot(base_cum.index, base_cum.values, label='基础策略', linewidth=2)
axes[1].plot(managed_cum.index, managed_cum.values, label='风险管理策略', linewidth=2)
axes[1].set_title('累计收益对比')
axes[1].legend()
axes[1].grid(True)
# 回撤
base_dd = base_cum / base_cum.cummax() - 1
managed_dd = managed_cum / managed_cum.cummax() - 1
axes[2].fill_between(base_dd.index, base_dd, 0, alpha=0.3, label='基础策略回撤')
axes[2].fill_between(managed_dd.index, managed_dd, 0, alpha=0.3, label='管理后回撤')
axes[2].set_title('最大回撤对比')
axes[2].legend()
axes[2].grid(True)
plt.tight_layout()
plt.show()
# 统计对比
print("风险管理效果:")
print(f"基础策略 - 最大回撤: {base_dd.min():.2%}, 夏普比率: {base_ret.mean()/base_ret.std():.2f}")
print(f"管理策略 - 最大回撤: {managed_dd.min():.2%}, 夏普比率: {managed_ret.mean()/managed_ret.std():.2f}")
print(f"回撤改善: {abs(managed_dd.min()) / abs(base_dd.min()) - 1:.2%}")
if __name__ == "__main__":
risk_managed_strategy()
误区四:忽视市场周期
问题:均值回归在牛市和熊市中的表现差异很大。
解决方案:
- 市场状态识别:使用机器学习识别市场周期
- 自适应参数:根据市场状态调整阈值
- 多策略组合:结合趋势跟踪和均值回归
def market_regime_detection():
"""
市场状态识别与自适应策略
"""
np.random.seed(42)
dates = pd.date_range('2015-01-01', '2023-12-31', freq='D')
# 模拟不同市场状态的价格
returns = np.zeros(len(dates))
# 牛市阶段
returns[:400] = np.random.normal(0.001, 0.01, 400)
# 熊市阶段
returns[400:700] = np.random.normal(-0.001, 0.015, 300)
# 震荡市阶段
returns[700:] = np.random.normal(0, 0.008, len(returns)-700)
price = 100 * (1 + returns).cumprod()
price = pd.Series(price, index=dates)
# 市场状态识别指标
def identify_regime(prices):
"""
识别市场状态
"""
returns = prices.pct_change()
# 计算指标
momentum = returns.rolling(60).sum() # 60日动量
volatility = returns.rolling(60).std() # 60日波动率
drawdown = prices / prices.cummax() - 1 # 当前回撤
# 状态分类
regime = pd.Series('Neutral', index=prices.index)
# 牛市:动量高、波动率低、回撤小
牛市条件 = (momentum > 0.05) & (volatility < returns.std() * 1.2) & (drawdown > -0.1)
regime[牛市条件] = 'Bull'
# 熊市:动量低、波动率高、回撤大
熊市条件 = (momentum < -0.05) & (volatility > returns.std() * 1.5) & (drawdown < -0.15)
regime[熊市条件] = 'Bear'
# 震荡市:其他
regime[~牛市条件 & ~熊市条件] = 'Range'
return regime, momentum, volatility, drawdown
regime, momentum, volatility, drawdown = identify_regime(price)
# 自适应策略
def adaptive_strategy(prices, regime_series):
"""
根据市场状态调整策略参数
"""
signals = pd.Series(0, index=prices.index)
# 不同状态使用不同参数
params = {
'Bull': {'threshold': 2.0, 'lookback': 120}, # 牛市:更严格
'Bear': {'threshold': 1.0, 'lookback': 30}, # 熊市:更宽松
'Range': {'threshold': 1.5, 'lookback': 60} # 震荡:标准
}
for state in ['Bull', 'Bear', 'Range']:
mask = regime_series == state
if not mask.any():
continue
param = params[state]
lookback = param['lookback']
threshold = param['threshold']
rolling_mean = prices[mask].rolling(lookback).mean()
rolling_std = prices[mask].rolling(lookback).std()
z_score = (prices[mask] - rolling_mean) / rolling_std
state_signals = pd.Series(0, index=prices[mask].index)
state_signals[z_score > threshold] = -1
state_signals[z_score < -threshold] = 1
signals[mask] = state_signals
return signals
# 对比固定参数策略
def fixed_strategy(prices, threshold=1.5, lookback=60):
rolling_mean = prices.rolling(lookback).mean()
rolling_std = prices.rolling(lookback).std()
z_score = (prices - rolling_mean) / rolling_std
signals = pd.Series(0, index=prices.index)
signals[z_score > threshold] = -1
signals[z_score < -threshold] = 1
return signals
# 计算收益
fixed_sig = fixed_strategy(price)
adaptive_sig = adaptive_strategy(price, regime)
def calc_returns(sig, price):
ret = (sig.shift(1) * price.pct_change()).fillna(0)
cum = (1 + ret).cumprod()
return cum, ret
fixed_cum, fixed_ret = calc_returns(fixed_sig, price)
adaptive_cum, adaptive_ret = calc_returns(adaptive_sig, price)
# 可视化
fig, axes = plt.subplots(3, 1, figsize=(14, 12))
# 价格和状态
axes[0].plot(price.index, price.values, label='价格', linewidth=1)
# 标记状态区域
for state, color in zip(['Bull', 'Bear', 'Range'], ['green', 'red', 'blue']):
mask = regime == state
if mask.any():
axes[0].fill_between(price.index, price.min(), price.max(),
where=mask, alpha=0.2, color=color, label=state)
axes[0].set_title('价格与市场状态')
axes[0].legend()
axes[0].grid(True)
# 策略收益对比
axes[1].plot(fixed_cum.index, fixed_cum.values, label='固定参数', linewidth=2)
axes[1].plot(adaptive_cum.index, adaptive_cum.values, label='自适应参数', linewidth=2)
axes[1].set_title('策略收益对比')
axes[1].legend()
axes[1].grid(True)
# 状态分布
state_counts = regime.value_counts()
axes[2].bar(state_counts.index, state_counts.values,
color=['green', 'red', 'blue'], alpha=0.7)
axes[2].set_title('市场状态分布')
axes[2].set_ylabel('天数')
axes[2].grid(True)
plt.tight_layout()
plt.show()
# 统计对比
print("自适应策略效果:")
print(f"固定参数 - 总收益: {fixed_cum.iloc[-1] - 1:.2%}, 夏普: {fixed_ret.mean()/fixed_ret.std():.2f}")
print(f"自适应 - 总收益: {adaptive_cum.iloc[-1] - 1:.2%}, 夏普: {adaptive_ret.mean()/adaptive_ret.std():.2f}")
print(f"超额收益: {adaptive_cum.iloc[-1] / fixed_cum.iloc[-1] - 1:.2%}")
# 状态表现分析
print("\n各状态下策略表现:")
for state in ['Bull', 'Bear', 'Range']:
mask = regime == state
if mask.any():
fixed_state_ret = (fixed_sig.shift(1) * price.pct_change())[mask].mean()
adaptive_state_ret = (adaptive_sig.shift(1) * price.pct_change())[mask].mean()
print(f"{state} - 固定: {fixed_state_ret:.4f}, 自适应: {adaptive_state_ret:.4f}")
if __name__ == "__main__":
market_regime_detection()
高级应用:机器学习增强的均值回归
随机森林增强模型
使用机器学习预测均值回归的概率:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
def ml_enhanced_mean_reversion():
"""
机器学习增强的均值回归策略
"""
np.random.seed(42)
dates = pd.date_range('2015-01-01', '2023-12-31', freq='D')
# 创建特征数据
price = 100 + np.cumsum(np.random.normal(0, 1, len(dates)))
price = pd.Series(price, index=dates)
# 生成特征
features = pd.DataFrame(index=dates)
# 基础技术指标
features['price'] = price
features['returns'] = price.pct_change()
features['ma_20'] = price.rolling(20).mean()
features['ma_60'] = price.rolling(60).mean()
features['std_20'] = price.rolling(20).std()
features['rsi'] = calculate_rsi(price) # RSI指标
# 滞后特征
for lag in [1, 5, 10, 20]:
features[f'return_lag_{lag}'] = price.pct_change(lag)
features[f'zscore_lag_{lag}'] = (price - price.rolling(60).mean()) / price.rolling(60).std().shift(lag)
# 目标变量:未来5天的收益率是否回归
future_returns = price.pct_change(5).shift(-5)
features['target'] = (future_returns < -0.02).astype(int) # 未来下跌概率
# 清理NaN值
features = features.dropna()
# 分割数据
X = features.drop('target', axis=1)
y = features['target']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)
# 训练模型
model = RandomForestClassifier(n_estimators=100, random_state=42, max_depth=5)
model.fit(X_train, y_train)
# 预测
predictions = model.predict(X_test)
probabilities = model.predict_proba(X_test)[:, 1]
# 生成交易信号
signals = pd.Series(0, index=X_test.index)
signals[probabilities > 0.6] = 1 # 高概率下跌,做空
signals[probabilities < 0.4] = -1 # 低概率下跌,做多
# 回测
returns = (signals.shift(1) * price.pct_change()[X_test.index]).fillna(0)
cumulative = (1 + returns).cumprod()
# 对比传统Z-score策略
z_score = (price - price.rolling(60).mean()) / price.rolling(60).std()
z_signals = pd.Series(0, index=X_test.index)
z_signals[z_score > 1.5] = -1
z_signals[z_score < -1.5] = 1
z_returns = (z_signals.shift(1) * price.pct_change()[X_test.index]).fillna(0)
z_cumulative = (1 + z_returns).cumprod()
# 可视化
fig, axes = plt.subplots(2, 1, figsize=(14, 10))
# 收益对比
axes[0].plot(cumulative.index, cumulative.values, label='ML增强策略', linewidth=2)
axes[0].plot(z_cumulative.index, z_cumulative.values, label='传统Z-score', linewidth=2, linestyle='--')
axes[0].set_title('策略收益对比')
axes[0].legend()
axes[0].grid(True)
# 特征重要性
importances = model.feature_importances_
indices = np.argsort(importances)[-10:]
axes[1].barh(range(10), importances[indices])
axes[1].set_yticks(range(10))
axes[1].set_yticklabels([X.columns[i] for i in indices])
axes[1].set_title('特征重要性 (Top 10)')
axes[1].grid(True)
plt.tight_layout()
plt.show()
# 性能评估
print("模型性能:")
print(classification_report(y_test, predictions))
print("\n策略表现:")
print(f"ML策略 - 总收益: {cumulative.iloc[-1] - 1:.2%}")
print(f"Z-score - 总收益: {z_cumulative.iloc[-1] - 1:.2%}")
print(f"超额收益: {cumulative.iloc[-1] / z_cumulative.iloc[-1] - 1:.2%}")
def calculate_rsi(prices, period=14):
"""计算RSI指标"""
delta = prices.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
if __name__ == "__main__":
ml_enhanced_mean_reversion()
实战案例:完整投资组合管理
案例背景
假设我们管理一个1000万美元的基金,需要在股票、债券、商品和另类投资之间进行配置。
实施框架
class PortfolioManager:
def __init__(self, initial_capital=10_000_000):
self.initial_capital = initial_capital
self.current_capital = initial_capital
self.positions = {}
self.history = []
def run_portfolio(self, data_dict, start_date='2020-01-01', end_date='2023-12-31'):
"""
运行完整投资组合管理
"""
dates = pd.date_range(start_date, end_date, freq='M')
for date in dates:
# 1. 获取当前数据
current_data = {asset: data.loc[:date].iloc[-1] for asset, data in data_dict.items()}
# 2. 计算估值
valuations = self.calculate_valuations(current_data)
# 3. 生成配置方案
weights = self.optimize_weights(valuations)
# 4. 执行调仓
self.rebalance(weights, current_data, date)
# 5. 记录历史
self.history.append({
'date': date,
'capital': self.current_capital,
'positions': self.positions.copy(),
'weights': weights
})
return pd.DataFrame(self.history)
def calculate_valuations(self, current_data):
"""计算估值指标"""
valuations = {}
for asset, data in current_data.items():
if asset == 'Stocks':
# 使用PE作为估值
pe = data['PE']
rolling_pe = data['PE_Rolling']
z_score = (pe - rolling_pe.mean()) / rolling_pe.std()
valuations[asset] = z_score
elif asset == 'Bonds':
# 使用收益率
yield_ = data['Yield']
rolling_yield = data['Yield_Rolling']
z_score = (yield_ - rolling_yield.mean()) / rolling_yield.std()
valuations[asset] = -z_score # 收益率越低越好
elif asset == 'Commodities':
# 使用库存/需求比
ratio = data['Inventory_Demand_Ratio']
rolling_ratio = data['Ratio_Rolling']
z_score = (ratio - rolling_ratio.mean()) / rolling_ratio.std()
valuations[asset] = -z_score
elif asset == 'Real_Estate':
# 使用租金收益率
yield_ = data['Rent_Yield']
rolling_yield = data['Yield_Rolling']
z_score = (yield_ - rolling_yield.mean()) / rolling_yield.std()
valuations[asset] = z_score
return valuations
def optimize_weights(self, valuations):
"""优化权重"""
# 反向配置
raw_weights = {asset: -z for asset, z in valuations.items()}
# 风险平价调整
total = sum(abs(w) for w in raw_weights.values())
weights = {asset: abs(w) / total for asset, w in raw_weights.items()}
# 最小权重限制
for asset in weights:
weights[asset] = max(weights[asset], 0.05) # 至少5%
# 归一化
total = sum(weights.values())
weights = {asset: w / total for asset, w in weights.items()}
return weights
def rebalance(self, weights, current_data, date):
"""执行再平衡"""
target_value = self.current_capital
# 计算目标持仓
target_positions = {}
for asset, weight in weights.items():
target_positions[asset] = target_value * weight
# 计算调整量
adjustments = {}
for asset in target_positions:
current = self.positions.get(asset, 0)
target = target_positions[asset]
adjustments[asset] = target - current
# 执行交易(简化,忽略交易成本)
for asset, adj in adjustments.items():
if abs(adj) > target_value * 0.01: # 调整幅度超过1%才交易
self.positions[asset] = target_positions[asset]
# 记录交易
self.history.append({
'date': date,
'action': 'REBALANCE',
'adjustments': adjustments
})
def get_performance(self):
"""计算绩效指标"""
if not self.history:
return None
df = pd.DataFrame(self.history)
df = df[df['capital'].notna()] # 只保留有资本记录的行
returns = df['capital'].pct_change().fillna(0)
metrics = {
'总收益率': (df['capital'].iloc[-1] / self.initial_capital - 1),
'年化收益率': (df['capital'].iloc[-1] / self.initial_capital) ** (12/len(df)) - 1,
'波动率': returns.std() * np.sqrt(12),
'夏普比率': returns.mean() / returns.std() * np.sqrt(12) if returns.std() != 0 else 0,
'最大回撤': (df['capital'] / df['capital'].cummax() - 1).min(),
'最终资本': df['capital'].iloc[-1]
}
return metrics
# 模拟数据生成
def generate_portfolio_data():
"""生成投资组合数据"""
np.random.seed(42)
dates = pd.date_range('2020-01-01', '2023-12-31', freq='M')
data_dict = {}
# 股票数据
stock_data = pd.DataFrame({
'PE': 20 + np.random.normal(0, 5, len(dates)),
'PE_Rolling': pd.Series(20 + np.random.normal(0, 2, len(dates))).rolling(12).mean()
}, index=dates)
data_dict['Stocks'] = stock_data
# 债券数据
bond_data = pd.DataFrame({
'Yield': 3.0 + np.random.normal(0, 0.5, len(dates)),
'Yield_Rolling': pd.Series(3.0 + np.random.normal(0, 0.2, len(dates))).rolling(12).mean()
}, index=dates)
data_dict['Bonds'] = bond_data
# 商品数据
commodity_data = pd.DataFrame({
'Inventory_Demand_Ratio': 1.0 + np.random.normal(0, 0.3, len(dates)),
'Ratio_Rolling': pd.Series(1.0 + np.random.normal(0, 0.1, len(dates))).rolling(12).mean()
}, index=dates)
data_dict['Commodities'] = commodity_data
# 房地产数据
real_estate_data = pd.DataFrame({
'Rent_Yield': 4.5 + np.random.normal(0, 0.5, len(dates)),
'Yield_Rolling': pd.Series(4.5 + np.random.normal(0, 0.2, len(dates))).rolling(12).mean()
}, index=dates)
data_dict['Real_Estate'] = real_estate_data
return data_dict
# 运行完整案例
def run_complete_case():
"""运行完整案例"""
print("=" * 60)
print("完整投资组合管理案例")
print("=" * 60)
# 生成数据
data_dict = generate_portfolio_data()
# 初始化管理器
manager = PortfolioManager(initial_capital=10_000_000)
# 运行策略
history = manager.run_portfolio(data_dict)
# 获取绩效
metrics = manager.get_performance()
print("\n绩效指标:")
for key, value in metrics.items():
if isinstance(value, float):
print(f"{key}: {value:.2%}" if '收益率' in key or '回撤' in key else f"{key}: {value:,.2f}")
else:
print(f"{key}: {value}")
# 可视化
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
# 资本曲线
capital_data = history[history['capital'].notna()]
axes[0, 0].plot(capital_data['date'], capital_data['capital'] / 1e6, linewidth=2)
axes[0, 0].set_title('资本曲线 (百万美元)')
axes[0, 0].grid(True)
# 权重变化
weights_df = pd.DataFrame([h['weights'] for h in history if 'weights' in h])
if not weights_df.empty:
for col in weights_df.columns:
axes[0, 1].plot(weights_df.index, weights_df[col] * 100, label=col)
axes[0, 1].set_title('资产权重变化 (%)')
axes[0, 1].legend()
axes[0, 1].grid(True)
# 持仓价值
positions_df = pd.DataFrame([h['positions'] for h in history if 'positions' in h])
if not positions_df.empty:
for col in positions_df.columns:
axes[1, 0].plot(positions_df.index, positions_df[col] / 1e6, label=col)
axes[1, 0].set_title('持仓价值 (百万美元)')
axes[1, 0].legend()
axes[1, 0].grid(True)
# 月度收益分布
returns = capital_data['capital'].pct_change().dropna()
axes[1, 1].hist(returns, bins=20, alpha=0.7, edgecolor='black')
axes[1, 1].set_title('月度收益分布')
axes[1, 1].axvline(returns.mean(), color='red', linestyle='--', label=f'均值: {returns.mean():.2%}')
axes[1, 1].legend()
axes[1, 1].grid(True)
plt.tight_layout()
plt.show()
return manager, history, metrics
if __name__ == "__main__":
manager, history, metrics = run_complete_case()
总结与最佳实践
核心要点回顾
- 理论基础:均值回归是资产价格的统计规律,但需要结合基本面分析
- 模型构建:从简单的Z-score到复杂的多因子模型,关键是找到有效的估值指标
- 风险管理:止损、仓位管理、压力测试缺一不可
- 参数优化:避免过拟合,使用滚动窗口优化
- 市场适应:识别市场状态,动态调整策略
最佳实践清单
✅ 应该做的:
- 使用滚动均值而非固定均值
- 考虑交易成本和滑点
- 设置严格的止损机制
- 定期进行压力测试
- 结合基本面分析
- 保持足够的分散化
❌ 不应该做的:
- 盲目依赖历史均值
- 频繁交易
- 忽视尾部风险
- 过度拟合参数
- 孤立使用单一指标
- 忽视市场周期
持续改进框架
def continuous_improvement_framework():
"""
持续改进框架
"""
framework = {
'监控指标': [
'策略夏普比率',
'最大回撤',
'交易频率',
'胜率',
'盈亏比'
],
'定期评估': [
'每周:检查交易执行情况',
'每月:评估参数稳定性',
'每季:进行压力测试',
'每年:全面策略回顾'
],
'优化方向': [
'降低交易成本',
'提高信号质量',
'增强风险控制',
'扩展资产范围',
'引入新数据源'
]
}
return framework
if __name__ == "__main__":
framework = continuous_improvement_framework()
print("持续改进框架:")
for key, items in framework.items():
print(f"\n{key}:")
for item in items:
print(f" - {item}")
均值回归理论为资产配置提供了强大的理论基础和实用的工具。通过系统性的模型构建、严格的风险管理和持续的优化改进,投资者可以在市场波动中有效捕捉价值回归机会,同时规避常见误区。关键在于保持理性、尊重统计规律,并始终将风险管理放在首位。
