引言:均值回归理论在资产配置中的核心地位

均值回归理论(Mean Reversion Theory)是金融学中最古老且最具影响力的理论之一,它认为资产价格在长期内会趋向于其历史平均值或内在价值。这一理论最早由英国统计学家弗朗西斯·高尔顿在19世纪末提出,后来被广泛应用于股票、债券、外汇和商品等各类资产的投资实践中。

在现代资产配置中,均值回归理论为投资者提供了一个系统性的框架,帮助我们在市场波动中识别被低估或高估的资产,从而做出更理性的投资决策。与传统的”买入持有”策略相比,基于均值回归的资产配置策略更加灵活,能够根据市场估值水平动态调整投资组合,从而在控制风险的同时提高收益。

本文将深入探讨均值回归理论在资产配置中的实战应用,包括理论基础、量化模型构建、实战策略设计、常见误区规避等关键内容,并通过详细的代码示例和实际案例,帮助读者掌握这一强大的投资工具。

均值回归理论的数学基础与统计验证

理论核心概念

均值回归理论建立在以下核心假设之上:

  1. 资产具有内在价值:每种资产都有一个相对稳定的内在价值或均衡价格
  2. 价格偏离是暂时的:市场价格会围绕内在价值波动,但长期会回归
  3. 统计规律性:价格偏离均值的程度和持续时间具有一定的统计规律

数学模型

最简单的均值回归模型可以用以下公式表示:

P(t) = μ + θ * (P(t-1) - μ) + ε(t)

其中:

  • P(t) 是时间 t 的资产价格
  • μ 是长期均值(内在价值)
  • θ 是回归系数(-1 < θ < 0)
  • ε(t) 是白噪声

当 θ 为负值时,模型表现出均值回归特性:如果上一期价格高于均值,本期价格倾向于下降;反之亦然。

统计检验方法

在实际应用中,我们需要通过统计方法验证资产价格是否具有均值回归特性。常用的检验方法包括:

  1. 单位根检验(ADF检验):检验时间序列是否平稳
  2. 方差比检验:检验价格偏离均值的程度
  3. 赫斯特指数:衡量时间序列的长期记忆性

以下是一个使用Python进行ADF检验的完整示例:

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import adfuller
import yfinance as yf

def test_mean_reversion(stock_symbol, start_date, end_date):
    """
    测试股票价格的均值回归特性
    """
    # 获取股票数据
    stock_data = yf.download(stock_symbol, start=start_date, end=end_date)
    prices = stock_data['Adj Close']
    
    # 计算对数收益率
    log_prices = np.log(prices)
    
    # 进行ADF检验
    adf_result = adfuller(log_prices)
    
    print(f"ADF检验结果 ({stock_symbol}):")
    print(f"ADF统计量: {adf_result[0]:.4f}")
    print(f"P值: {adf_result[1]:.4f}")
    print(f"临界值:")
    for key, value in adf_result[4].items():
        print(f"  {key}: {value:.4f}")
    
    # 判断是否平稳
    if adf_result[1] < 0.05:
        print(f"结论: {stock_symbol} 的价格序列是平稳的,具有均值回归特性")
    else:
        print(f"结论: {stock_symbol} 的价格序列是非平稳的,均值回归特性不明显")
    
    # 可视化
    plt.figure(figsize=(12, 6))
    plt.plot(prices.index, prices.values, label=f'{stock_symbol} 价格')
    plt.plot(prices.index, [prices.mean()] * len(prices), 
             'r--', label=f'均值: {prices.mean():.2f}')
    plt.title(f'{stock_symbol} 价格与均值')
    plt.xlabel('日期')
    plt.ylabel('价格')
    plt.legend()
    plt.grid(True)
    plt.show()
    
    return adf_result

# 测试示例
if __name__ == "__main__":
    # 测试苹果公司股票
    result = test_mean_reversion('AAPL', '2020-01-01', '2023-12-31')

实际案例分析

让我们以标普500指数为例,验证其均值回归特性。通过计算2000-2023年的市盈率(P/E)数据,我们可以观察到明显的均值回归现象:

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats

# 模拟标普500历史市盈率数据
np.random.seed(42)
years = np.arange(2000, 2024)
pe_ratio = 25 + 5 * np.sin(years/3) + np.random.normal(0, 2, len(years))
pe_ratio = np.clip(pe_ratio, 10, 35)  # 限制在合理范围内

# 计算均值和标准差
mean_pe = np.mean(pe_ratio)
std_pe = np.std(pe_ratio)

# 计算Z-score
z_scores = (pe_ratio - mean_pe) / std_pe

# 可视化
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))

# 市盈率走势
ax1.plot(years, pe_ratio, 'b-o', linewidth=2, markersize=4)
ax1.axhline(y=mean_pe, color='r', linestyle='--', label=f'均值: {mean_pe:.2f}')
ax1.fill_between(years, mean_pe - std_pe, mean_pe + std_pe, 
                  alpha=0.2, color='gray', label='±1标准差')
ax1.set_title('标普500历史市盈率 (2000-2023)')
ax1.set_ylabel('市盈率')
ax1.legend()
ax1.grid(True)

# Z-score分布
ax2.plot(years, z_scores, 'g-s', linewidth=2, markersize=4)
ax2.axhline(y=0, color='r', linestyle='--')
ax2.axhline(y=1, color='orange', linestyle='--', label='+1σ (高估)')
ax2.axhline(y=-1, color='orange', linestyle='--', label='-1σ (低估)')
ax2.fill_between(years, -1, 1, alpha=0.1, color='blue')
ax2.set_title('Z-score标准化偏离度')
ax2.set_ylabel('Z-score')
ax2.set_xlabel('年份')
ax2.legend()
ax2.grid(True)

plt.tight_layout()
plt.show()

# 统计分析
print(f"统计摘要:")
print(f"均值: {mean_pe:.2f}")
print(f"标准差: {std_pe:.2f}")
print(f"偏离均值>1σ的概率: {np.mean(np.abs(z_scores) > 1):.2%}")
print(f"偏离均值>2σ的概率: {np.mean(np.abs(z_scores) > 2):.2%}")

这个案例清晰地展示了均值回归的特征:市盈率在长期均值附近波动,极端值出现的概率较低,且偏离程度越大,回归趋势越明显。

基于均值回归的资产配置模型构建

模型架构设计

一个完整的均值回归资产配置模型应包含以下核心组件:

  1. 估值指标计算模块:计算各类资产的相对估值水平
  2. 均值回归信号生成模块:根据估值偏离度生成买卖信号
  3. 风险控制模块:设置止损和仓位管理规则
  4. 组合优化模块:动态调整资产权重

估值指标体系

不同类型的资产需要不同的估值指标:

  • 股票:市盈率(P/E)、市净率(P/B)、市销率(P/S)、股息率
  • 债券:到期收益率、信用利差、期限利差
  • 商品:现货与期货价差、库存水平、供需比
  • 房地产:租金收益率、房价收入比、空置率

信号生成算法

基于Z-score的信号生成是最常用的方法:

class MeanReversionStrategy:
    def __init__(self, lookback_period=252, z_threshold=1.5):
        """
        初始化均值回归策略
        
        参数:
        lookback_period: 回看周期(交易日)
        z_threshold: Z-score阈值,超过此值触发交易信号
        """
        self.lookback_period = lookback_period
        self.z_threshold = z_threshold
        self.position = 0  # 当前仓位:-1做空, 0中性, 1做多
        
    def calculate_signals(self, data):
        """
        计算交易信号
        
        参数:
        data: 包含价格序列的DataFrame
        
        返回:
        signals: 交易信号 DataFrame
        """
        # 计算滚动均值和标准差
        rolling_mean = data['price'].rolling(window=self.lookback_period).mean()
        rolling_std = data['price'].rolling(window=self.lookback_period).std()
        
        # 计算Z-score
        data['z_score'] = (data['price'] - rolling_mean) / rolling_std
        
        # 生成信号
        data['signal'] = 0
        data.loc[data['z_score'] > self.z_threshold, 'signal'] = -1  # 高估,做空
        data.loc[data['z_score'] < -self.z_threshold, 'signal'] = 1   # 低估,做多
        
        # 平仓信号
        data.loc[abs(data['z_score']) < 0.5, 'signal'] = 0
        
        return data
    
    def backtest(self, data, initial_capital=100000):
        """
        回测策略
        
        参数:
        data: 包含价格和信号的数据
        initial_capital: 初始资金
        
        返回:
        results: 回测结果
        """
        capital = initial_capital
        position = 0
        trades = []
        
        for i in range(1, len(data)):
            current_signal = data['signal'].iloc[i]
            price = data['price'].iloc[i]
            prev_price = data['price'].iloc[i-1]
            
            # 交易逻辑
            if current_signal != position:
                if current_signal == 1 and position == 0:  # 开多仓
                    shares = capital / price
                    position = 1
                    trades.append({
                        'date': data.index[i],
                        'action': 'BUY',
                        'price': price,
                        'shares': shares
                    })
                elif current_signal == -1 and position == 0:  # 开空仓
                    shares = capital / price
                    position = -1
                    trades.append({
                        'date': data.index[i],
                        'action': 'SELL_SHORT',
                        'price': price,
                        'shares': shares
                    })
                elif current_signal == 0 and position != 0:  # 平仓
                    if position == 1:
                        action = 'SELL'
                        profit = shares * (price - prev_price)
                    else:
                        action = 'COVER'
                        profit = shares * (prev_price - price)
                    
                    capital += profit
                    trades.append({
                        'date': data.index[i],
                        'action': action,
                        'price': price,
                        'profit': profit
                    })
                    position = 0
                    shares = 0
        
        # 计算最终收益
        final_value = capital
        total_return = (final_value - initial_capital) / initial_capital
        
        return {
            'initial_capital': initial_capital,
            'final_value': final_value,
            'total_return': total_return,
            'trades': pd.DataFrame(trades)
        }

# 完整示例:股票均值回归策略回测
import yfinance as yf
import pandas as pd
import numpy as np

# 获取数据
def run_strategy():
    # 下载标普500 ETF数据
    data = yf.download('SPY', start='2010-01-01', end='2023-12-31')
    data = data[['Adj Close']].rename(columns={'Adj Close': 'price'})
    
    # 初始化策略
    strategy = MeanReversionStrategy(lookback_period=252, z_threshold=1.5)
    
    # 计算信号
    data_with_signals = strategy.calculate_signals(data)
    
    # 回测
    results = strategy.backtest(data_with_signals)
    
    print(f"回测结果:")
    print(f"初始资金: ${results['initial_capital']:,.2f}")
    print(f"最终价值: ${results['final_value']:,.2f}")
    print(f"总收益率: {results['total_return']:.2%}")
    print(f"交易次数: {len(results['trades'])}")
    
    # 可视化
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 10))
    
    # 价格和信号
    ax1.plot(data_with_signals.index, data_with_signals['price'], label='SPY价格')
    ax1.plot(data_with_signals.index, data_with_signals['price'] * (data_with_signals['signal'] > 0), 
             'g^', markersize=4, label='做多信号')
    ax1.plot(data_with_signals.index, data_with_signals['price'] * (data_with_signals['signal'] < 0), 
             'rv', markersize=4, label='做空信号')
    ax1.set_title('SPY价格与交易信号')
    ax1.legend()
    ax1.grid(True)
    
    # Z-score
    ax2.plot(data_with_signals.index, data_with_signals['z_score'], label='Z-score')
    ax2.axhline(y=1.5, color='r', linestyle='--', label='高估阈值')
    ax2.axhline(y=-1.5, color='g', linestyle='--', label='低估阈值')
    ax2.axhline(y=0, color='black', linestyle='-', alpha=0.3)
    ax2.set_title('Z-score偏离度')
    ax2.legend()
    ax2.grid(True)
    
    plt.tight_layout()
    plt.show()
    
    return results

# 运行策略
if __name__ == "__main__":
    results = run_strategy()

多资产均值回归配置模型

对于多资产配置,我们需要构建更复杂的模型:

class MultiAssetMeanReversion:
    def __init__(self, assets, lookback=252):
        self.assets = assets
        self.lookback = lookback
        self.weights = {}
        
    def calculate_relative_valuation(self, data_dict):
        """
        计算相对估值指标
        """
        valuations = {}
        for asset, data in data_dict.items():
            # 计算滚动均值和标准差
            rolling_mean = data.rolling(window=self.lookback).mean()
            rolling_std = data.rolling(window=self.lookback).std()
            
            # 计算Z-score
            z_score = (data - rolling_mean) / rolling_std
            valuations[asset] = z_score.iloc[-1]  # 取最新值
        
        return valuations
    
    def optimize_weights(self, valuations):
        """
        根据估值水平优化权重
        """
        # 反向权重:估值越低,权重越高
        raw_weights = {asset: -z_score for asset, z_score in valuations.items()}
        
        # 标准化为正权重
        total = sum(abs(w) for w in raw_weights.values())
        weights = {asset: abs(w) / total for asset, w in raw_weights.items()}
        
        return weights
    
    def generate_portfolio(self, data_dict):
        """
        生成投资组合
        """
        valuations = self.calculate_relative_valuation(data_dict)
        weights = self.optimize_weights(valuations)
        
        print("当前估值水平 (Z-score):")
        for asset, z in valuations.items():
            status = "低估" if z < -1 else "高估" if z > 1 else "合理"
            print(f"  {asset}: {z:.2f} ({status})")
        
        print("\n建议配置权重:")
        for asset, weight in weights.items():
            print(f"  {asset}: {weight:.1%}")
        
        return weights, valuations

# 多资产配置示例
def multi_asset_example():
    # 模拟数据
    np.random.seed(42)
    dates = pd.date_range('2020-01-01', '2023-12-31', freq='M')
    
    # 创建不同资产的估值指标
    assets_data = {
        '股票': pd.Series(np.random.normal(0, 1, len(dates)) + 0.5, index=dates),
        '债券': pd.Series(np.random.normal(0, 1, len(dates)) - 0.3, index=dates),
        '商品': pd.Series(np.random.normal(0, 1, len(dates)) + 0.8, index=dates),
        '房地产': pd.Series(np.random.normal(0, 1, len(dates)) - 0.1, index=dates)
    }
    
    # 转换为DataFrame
    df = pd.DataFrame(assets_data)
    
    # 初始化模型
    model = MultiAssetMeanReversion(list(assets_data.keys()))
    
    # 生成配置
    weights, valuations = model.generate_portfolio(df)
    
    return weights, valuations

if __name__ == "__main__":
    multi_asset_example()

实战策略:捕捉价值回归机会

策略一:基于市盈率的股票轮动策略

核心思想:在不同估值水平的股票之间轮动,买入低估值股票,卖出高估值股票。

实施步骤

  1. 计算股票组合的滚动市盈率
  2. 标准化为Z-score
  3. 设定买入/卖出阈值
  4. 定期再平衡
def pe_rotation_strategy():
    """
    市盈率轮动策略
    """
    # 模拟三只股票的市盈率数据
    np.random.seed(42)
    dates = pd.date_range('2015-01-01', '2023-12-31', freq='M')
    
    # 创建不同股票的PE序列(具有均值回归特性)
    def create_pe_series(mean, volatility, trend=0):
        pe = mean + trend * np.arange(len(dates)) / 100
        pe += np.random.normal(0, volatility, len(dates))
        pe = np.clip(pe, 5, 40)  # 限制在合理范围
        return pd.Series(pe, index=dates)
    
    stocks = {
        'Stock_A': create_pe_series(20, 5, 0.1),  # 稳定增长
        'Stock_B': create_pe_series(15, 8, -0.05), # 价值型
        'Stock_C': create_pe_series(25, 10, 0.2)   # 成长型
    }
    
    pe_df = pd.DataFrame(stocks)
    
    # 计算Z-score
    z_scores = pe_df.apply(lambda x: (x - x.rolling(24).mean()) / x.rolling(24).std())
    
    # 生成信号:买入Z-score < -1的股票,卖出Z-score > 1的股票
    signals = pd.DataFrame(index=z_scores.index, columns=z_scores.columns)
    signals[z_scores < -1] = 1   # 买入
    signals[z_scores > 1] = -1   # 卖出
    signals.fillna(0, inplace=True)
    
    # 计算策略收益
    returns = pe_df.pct_change().fillna(0)
    strategy_returns = (signals.shift(1) * returns).sum(axis=1) / 3  # 等权重
    
    # 累计收益
    cumulative_returns = (1 + strategy_returns).cumprod()
    
    # 可视化
    fig, axes = plt.subplots(3, 1, figsize=(14, 12))
    
    # PE走势
    for stock in pe_df.columns:
        axes[0].plot(pe_df.index, pe_df[stock], label=stock)
    axes[0].set_title('股票市盈率走势')
    axes[0].legend()
    axes[0].grid(True)
    
    # Z-score
    for stock in z_scores.columns:
        axes[1].plot(z_scores.index, z_scores[stock], label=stock)
    axes[1].axhline(y=1, color='r', linestyle='--', alpha=0.5)
    axes[1].axhline(y=-1, color='g', linestyle='--', alpha=0.5)
    axes[1].set_title('Z-score偏离度')
    axes[1].legend()
    axes[1].grid(True)
    
    # 累计收益
    axes[2].plot(cumulative_returns.index, cumulative_returns.values, 
                 linewidth=2, label='策略收益')
    axes[2].axhline(y=1, color='black', linestyle='-', alpha=0.3)
    axes[2].set_title('策略累计收益')
    axes[2].legend()
    axes[2].grid(True)
    
    plt.tight_layout()
    plt.show()
    
    # 统计摘要
    print("策略表现统计:")
    print(f"总收益率: {cumulative_returns.iloc[-1] - 1:.2%}")
    print(f"年化收益率: {(cumulative_returns.iloc[-1] ** (12/len(dates)) - 1):.2%}")
    print(f"最大回撤: {(cumulative_returns / cumulative_returns.cummax() - 1).min():.2%}")
    print(f"夏普比率: {strategy_returns.mean() / strategy_returns.std() * np.sqrt(12):.2f}")

if __name__ == "__main__":
    pe_rotation_strategy()

策略二:债券久期轮换策略

核心思想:根据利率预期和债券估值,在不同久期的债券之间轮换。

def bond_duration_strategy():
    """
    债券久期轮换策略
    """
    # 模拟不同久期债券的收益率数据
    np.random.seed(42)
    dates = pd.date_range('2018-01-01', '2023-12-31', freq='M')
    
    # 创建收益率曲线数据
    yields = {
        '1Y': 2.0 + 0.1 * np.arange(len(dates)) + np.random.normal(0, 0.2, len(dates)),
        '5Y': 2.5 + 0.15 * np.arange(len(dates)) + np.random.normal(0, 0.15, len(dates)),
        '10Y': 2.8 + 0.12 * np.arange(len(dates)) + np.random.normal(0, 0.1, len(dates)),
        '30Y': 3.0 + 0.08 * np.arange(len(dates)) + np.random.normal(0, 0.08, len(dates))
    }
    
    yield_df = pd.DataFrame(yields, index=dates)
    
    # 计算期限利差
    yield_df['Spread_10Y-1Y'] = yield_df['10Y'] - yield_df['1Y']
    yield_df['Spread_30Y-5Y'] = yield_df['30Y'] - yield_df['5Y']
    
    # 计算滚动均值和Z-score
    lookback = 24
    for spread in ['Spread_10Y-1Y', 'Spread_30Y-5Y']:
        yield_df[f'{spread}_Z'] = (yield_df[spread] - 
                                   yield_df[spread].rolling(lookback).mean()) / \
                                  yield_df[spread].rolling(lookback).std()
    
    # 信号生成:当期限利差扩大时,增加长久期债券
    signals = pd.DataFrame(index=dates, columns=['Short', 'Medium', 'Long'])
    signals['Short'] = 0
    signals['Medium'] = 0
    signals['Long'] = 0
    
    # 规则:Z-score > 1,增加长久期;Z-score < -1,增加短久期
    long_signal = (yield_df['Spread_10Y-1Y_Z'] > 1) | (yield_df['Spread_30Y-5Y_Z'] > 1)
    short_signal = (yield_df['Spread_10Y-1Y_Z'] < -1) | (yield_df['Spread_30Y-5Y_Z'] < -1)
    
    signals.loc[long_signal, 'Long'] = 0.6
    signals.loc[long_signal, 'Medium'] = 0.3
    signals.loc[long_signal, 'Short'] = 0.1
    
    signals.loc[short_signal, 'Long'] = 0.1
    signals.loc[short_signal, 'Medium'] = 0.3
    signals.loc[short_signal, 'Short'] = 0.6
    
    # 中性状态
    neutral = ~long_signal & ~short_signal
    signals.loc[neutral, 'Long'] = 0.25
    signals.loc[neutral, 'Medium'] = 0.5
    signals.loc[neutral, 'Short'] = 0.25
    
    # 计算收益
    returns = yield_df[['1Y', '5Y', '10Y']].pct_change().fillna(0)
    strategy_returns = (signals.shift(1) * returns).sum(axis=1)
    
    # 累计收益
    cumulative_returns = (1 + strategy_returns).cumprod()
    
    # 可视化
    fig, axes = plt.subplots(2, 1, figsize=(14, 10))
    
    # 收益率曲线
    for col in ['1Y', '5Y', '10Y']:
        axes[0].plot(yield_df.index, yield_df[col], label=f'{col}收益率')
    axes[0].set_title('不同期限债券收益率')
    axes[0].legend()
    axes[0].grid(True)
    
    # 策略收益
    axes[1].plot(cumulative_returns.index, cumulative_returns.values, 
                 linewidth=2, label='久期轮换策略')
    axes[1].axhline(y=1, color='black', linestyle='-', alpha=0.3)
    axes[1].set_title('策略累计收益')
    axes[1].legend()
    axes[1].grid(True)
    
    plt.tight_layout()
    plt.show()
    
    print("债券久期轮换策略表现:")
    print(f"总收益率: {cumulative_returns.iloc[-1] - 1:.2%}")
    print(f"年化收益率: {(cumulative_returns.iloc[-1] ** (12/len(dates)) - 1):.2%}")
    print(f"夏普比率: {strategy_returns.mean() / strategy_returns.std() * np.sqrt(12):.2f}")

if __name__ == "__main__":
    bond_duration_strategy()

策略三:跨资产均值回归配置

核心思想:在股票、债券、商品、现金四大类资产间进行动态配置,根据各类资产的估值水平调整权重。

def cross_asset_allocation():
    """
    跨资产均值回归配置
    """
    # 模拟各类资产的估值指标(Z-score)
    np.random.seed(42)
    dates = pd.date_range('2015-01-01', '2023-12-31', freq='M')
    
    # 创建估值序列(具有均值回归特性)
    assets = {
        'Stocks': np.random.normal(0, 1, len(dates)) + 0.2,
        'Bonds': np.random.normal(0, 1, len(dates)) - 0.1,
        'Commodities': np.random.normal(0, 1, len(dates)) + 0.3,
        'Cash': np.zeros(len(dates))  # 现金始终为0
    }
    
    # 添加均值回归特性
    for asset in assets:
        if asset != 'Cash':
            assets[asset] = 0.9 * np.roll(assets[asset], 1) + 0.1 * assets[asset]
            assets[asset][0] = 0
    
    valuation_df = pd.DataFrame(assets, index=dates)
    
    # 计算配置权重
    def calculate_weights(valuation_series):
        """
        根据估值计算权重(反向配置)
        """
        # 反向配置:估值越低,权重越高
        raw_weights = -valuation_series
        
        # 平移使最小值为正
        raw_weights = raw_weights - raw_weights.min() + 0.1
        
        # 归一化
        weights = raw_weights / raw_weights.sum()
        
        return weights
    
    # 滚动计算权重
    weights_df = pd.DataFrame(index=dates, columns=['Stocks', 'Bonds', 'Commodities', 'Cash'])
    lookback = 12
    
    for i in range(lookback, len(dates)):
        recent_vals = valuation_df.iloc[i-lookback:i]
        avg_valuation = recent_vals.mean()
        weights = calculate_weights(avg_valuation)
        weights_df.iloc[i] = weights
    
    # 填充前期
    weights_df.iloc[:lookback] = weights_df.iloc[lookback]
    
    # 模拟各类资产收益
    returns_df = pd.DataFrame(index=dates, columns=['Stocks', 'Bonds', 'Commodities', 'Cash'])
    returns_df['Stocks'] = np.random.normal(0.008, 0.04, len(dates))
    returns_df['Bonds'] = np.random.normal(0.003, 0.015, len(dates))
    returns_df['Commodities'] = np.random.normal(0.005, 0.05, len(dates))
    returns_df['Cash'] = np.random.normal(0.001, 0.001, len(dates))
    
    # 计算策略收益
    strategy_returns = (weights_df.shift(1) * returns_df).sum(axis=1)
    cumulative_returns = (1 + strategy_returns).cumprod()
    
    # 基准:60/40股债组合
    benchmark_weights = pd.DataFrame({
        'Stocks': [0.6] * len(dates),
        'Bonds': [0.4] * len(dates),
        'Commodities': [0] * len(dates),
        'Cash': [0] * len(dates)
    }, index=dates)
    
    benchmark_returns = (benchmark_weights.shift(1) * returns_df).sum(axis=1)
    benchmark_cumulative = (1 + benchmark_returns).cumprod()
    
    # 可视化
    fig, axes = plt.subplots(3, 1, figsize=(14, 12))
    
    # 估值水平
    for asset in valuation_df.columns:
        axes[0].plot(valuation_df.index, valuation_df[asset], label=asset)
    axes[0].set_title('各类资产估值水平 (Z-score)')
    axes[0].legend()
    axes[0].grid(True)
    axes[0].axhline(y=0, color='black', linestyle='-', alpha=0.3)
    
    # 配置权重
    for asset in weights_df.columns:
        axes[1].plot(weights_df.index, weights_df[asset] * 100, label=asset)
    axes[1].set_title('动态配置权重 (%)')
    axes[1].legend()
    axes[1].grid(True)
    axes[1].set_ylabel('权重 (%)')
    
    # 策略对比
    axes[2].plot(cumulative_returns.index, cumulative_returns.values, 
                 linewidth=2, label='均值回归策略')
    axes[2].plot(benchmark_cumulative.index, benchmark_cumulative.values, 
                 linewidth=2, linestyle='--', label='60/40基准')
    axes[2].set_title('策略收益对比')
    axes[2].legend()
    axes[2].grid(True)
    axes[2].set_ylabel('累计收益')
    
    plt.tight_layout()
    plt.show()
    
    # 统计对比
    print("策略表现对比:")
    print(f"均值回归策略 - 总收益: {cumulative_returns.iloc[-1] - 1:.2%}, 年化: {(cumulative_returns.iloc[-1] ** (12/len(dates)) - 1):.2%}")
    print(f"60/40基准    - 总收益: {benchmark_cumulative.iloc[-1] - 1:.2%}, 年化: {(benchmark_cumulative.iloc[-1] ** (12/len(dates)) - 1):.2%}")
    print(f"超额收益: {cumulative_returns.iloc[-1] / benchmark_cumulative.iloc[-1] - 1:.2%}")

if __name__ == "__main__":
    cross_asset_allocation()

常见误区与风险规避

误区一:过度依赖历史均值

问题:将历史均值视为固定不变的锚点,忽视基本面变化。

案例:2000年互联网泡沫时期,科技股的市盈率远超历史均值,但基本面(盈利增长)也在发生根本性变化。简单做空会导致巨大损失。

解决方案

  1. 动态均值:使用滚动均值而非固定均值
  2. 基本面调整:考虑盈利增长率、利率环境等因素
  3. 分位数分析:使用历史分位数而非绝对值
def dynamic_mean_adjustment():
    """
    动态均值调整示例
    """
    np.random.seed(42)
    dates = pd.date_range('2010-01-01', '2023-12-31', freq='M')
    
    # 模拟PE数据,包含结构性变化
    pe = 15 + 0.1 * np.arange(len(dates)) + np.random.normal(0, 2, len(dates))
    pe[100:150] += 10  # 模拟结构性上升
    
    # 传统固定均值
    fixed_mean = np.mean(pe[:100])
    
    # 动态滚动均值
    rolling_mean = pd.Series(pe).rolling(60).mean()
    
    # 考虑趋势的动态均值(指数加权)
    ewm_mean = pd.Series(pe).ewm(span=60).mean()
    
    # 可视化
    plt.figure(figsize=(14, 7))
    plt.plot(dates, pe, label='实际PE', alpha=0.7)
    plt.plot(dates, [fixed_mean] * len(dates), 'r--', label=f'固定均值: {fixed_mean:.2f}')
    plt.plot(dates, rolling_mean, 'g-', label='滚动均值(60M)')
    plt.plot(dates, ewm_mean, 'b-.', label='指数加权均值')
    plt.title('不同均值计算方法对比')
    plt.legend()
    plt.grid(True)
    plt.show()
    
    # 计算信号差异
    fixed_signal = (pe > fixed_mean + 3).astype(int) - (pe < fixed_mean - 3).astype(int)
    dynamic_signal = (pe > rolling_mean + 3).astype(int) - (pe < rolling_mean - 3).astype(int)
    
    print("信号对比:")
    print(f"固定均值信号次数: {np.sum(fixed_signal != 0)}")
    print(f"动态均值信号次数: {np.sum(dynamic_signal != 0)}")
    print(f"信号差异率: {np.mean(fixed_signal != dynamic_signal):.2%}")

if __name__ == "__main__":
    dynamic_mean_adjustment()

误区二:忽视交易成本

问题:频繁交易导致成本侵蚀收益。

解决方案

  1. 降低交易频率:使用更长的信号周期
  2. 成本敏感优化:在信号生成中考虑交易成本
  3. 阈值调整:提高触发阈值,减少无效交易
def transaction_cost_aware_strategy():
    """
    考虑交易成本的策略优化
    """
    np.random.seed(42)
    dates = pd.date_range('2015-01-01', '2023-12-31', freq='D')
    
    # 模拟价格数据
    price = 100 + np.cumsum(np.random.normal(0, 1, len(dates)))
    price = pd.Series(price, index=dates)
    
    # 交易成本参数
    cost_per_trade = 0.001  # 0.1% per trade
    slippage = 0.0005       # 0.05% slippage
    
    def backtest_with_costs(threshold, lookback):
        signals = pd.Series(0, index=dates)
        rolling_mean = price.rolling(lookback).mean()
        rolling_std = price.rolling(lookback).std()
        z_score = (price - rolling_mean) / rolling_std
        
        signals[z_score > threshold] = -1
        signals[z_score < -threshold] = 1
        signals[abs(z_score) < 0.5] = 0
        
        # 计算收益(含成本)
        position = 0
        returns = []
        total_cost = 0
        
        for i in range(1, len(dates)):
            signal = signals.iloc[i]
            prev_signal = signals.iloc[i-1]
            price_change = price.iloc[i] / price.iloc[i-1] - 1
            
            # 交易信号变化时产生成本
            if signal != prev_signal:
                cost = cost_per_trade + slippage
                total_cost += cost
                # 成本在当天扣除
                daily_return = price_change - cost if signal != 0 else price_change
            else:
                daily_return = price_change if position != 0 else 0
            
            returns.append(daily_return)
            position = signal
        
        cumulative = (1 + pd.Series(returns, index=dates[1:])).cumprod()
        return cumulative.iloc[-1], total_cost
    
    # 测试不同阈值
    thresholds = [0.5, 1.0, 1.5, 2.0]
    lookbacks = [20, 60, 120, 252]
    
    results = []
    for thr in thresholds:
        for lb in lookbacks:
            final_value, cost = backtest_with_costs(thr, lb)
            results.append({
                'threshold': thr,
                'lookback': lb,
                'final_value': final_value,
                'total_cost': cost,
                'net_return': final_value - 1
            })
    
    results_df = pd.DataFrame(results)
    
    # 找出最优参数
    best = results_df.loc[results_df['net_return'].idxmax()]
    
    print("考虑交易成本的优化结果:")
    print(f"最优阈值: {best['threshold']}")
    print(f"最优回看周期: {best['lookback']}天")
    print(f"净收益率: {best['net_return']:.2%}")
    print(f"总交易成本: {best['total_cost']:.2%}")
    
    # 可视化
    plt.figure(figsize=(12, 6))
    for lb in lookbacks:
        subset = results_df[results_df['lookback'] == lb]
        plt.plot(subset['threshold'], subset['net_return'], 
                 marker='o', label=f'回看{lb}天')
    
    plt.axvline(x=best['threshold'], color='r', linestyle='--', 
                label=f'最优阈值: {best["threshold"]}')
    plt.title('不同参数下的净收益率')
    plt.xlabel('阈值')
    plt.ylabel('净收益率')
    plt.legend()
    plt.grid(True)
    plt.show()

if __name__ == "__main__":
    transaction_cost_aware_strategy()

误区三:忽视尾部风险

问题:均值回归在极端市场条件下可能失效,导致巨大损失。

解决方案

  1. 止损机制:设置最大回撤止损
  2. 仓位管理:根据波动率动态调整仓位
  3. 压力测试:模拟极端市场情况
def risk_managed_strategy():
    """
    风险管理的均值回归策略
    """
    np.random.seed(42)
    dates = pd.date_range('2015-01-01', '2023-12-31', freq='D')
    
    # 模拟价格数据(包含极端波动)
    returns = np.random.normal(0.001, 0.02, len(dates))
    # 添加极端事件
    extreme_events = np.random.choice(len(dates), size=5, replace=False)
    returns[extreme_events] *= 5  # 5倍波动
    
    price = 100 * (1 + returns).cumprod()
    price = pd.Series(price, index=dates)
    
    # 基础策略
    lookback = 60
    threshold = 1.5
    rolling_mean = price.rolling(lookback).mean()
    rolling_std = price.rolling(lookback).std()
    z_score = (price - rolling_mean) / rolling_std
    
    signals = pd.Series(0, index=dates)
    signals[z_score > threshold] = -1
    signals[z_score < -threshold] = 1
    
    # 风险管理层
    def apply_risk_management(signals, price):
        """
        应用风险管理规则
        """
        managed_signals = signals.copy()
        
        # 1. 止损:最大回撤超过15%时清仓
        cumulative = (1 * signals.shift(1) * price.pct_change()).cumsum()
        drawdown = cumulative - cumulative.cummax()
        stop_loss = drawdown < -0.15
        
        managed_signals[stop_loss] = 0
        
        # 2. 波动率调整:当波动率超过阈值时,减少仓位
        volatility = price.pct_change().rolling(20).std()
        high_vol = volatility > volatility.quantile(0.9)
        
        managed_signals[high_vol & (managed_signals != 0)] *= 0.5
        
        # 3. 时间衰减:持仓时间过长时逐步平仓
        position_duration = 0
        for i in range(len(managed_signals)):
            if managed_signals.iloc[i] != 0:
                if i > 0 and managed_signals.iloc[i] == managed_signals.iloc[i-1]:
                    position_duration += 1
                else:
                    position_duration = 0
                
                if position_duration > 60:  # 持仓超过60天
                    managed_signals.iloc[i] *= 0.5
        
        return managed_signals
    
    # 应用风险管理
    managed_signals = apply_risk_management(signals, price)
    
    # 回测对比
    def calculate_returns(signals, price):
        returns = (signals.shift(1) * price.pct_change()).fillna(0)
        cumulative = (1 + returns).cumprod()
        return cumulative, returns
    
    base_cum, base_ret = calculate_returns(signals, price)
    managed_cum, managed_ret = calculate_returns(managed_signals, price)
    
    # 可视化
    fig, axes = plt.subplots(3, 1, figsize=(14, 12))
    
    # 价格和信号
    axes[0].plot(price.index, price.values, label='价格', alpha=0.7)
    axes[0].plot(price.index, price * (signals > 0), 'g^', markersize=3, label='基础做多')
    axes[0].plot(price.index, price * (signals < 0), 'rv', markersize=3, label='基础做空')
    axes[0].plot(price.index, price * (managed_signals > 0), 'g+', markersize=5, label='管理后做多')
    axes[0].plot(price.index, price * (managed_signals < 0), 'rx', markersize=5, label='管理后做空')
    axes[0].set_title('价格与信号对比')
    axes[0].legend()
    axes[0].grid(True)
    
    # 累计收益
    axes[1].plot(base_cum.index, base_cum.values, label='基础策略', linewidth=2)
    axes[1].plot(managed_cum.index, managed_cum.values, label='风险管理策略', linewidth=2)
    axes[1].set_title('累计收益对比')
    axes[1].legend()
    axes[1].grid(True)
    
    # 回撤
    base_dd = base_cum / base_cum.cummax() - 1
    managed_dd = managed_cum / managed_cum.cummax() - 1
    axes[2].fill_between(base_dd.index, base_dd, 0, alpha=0.3, label='基础策略回撤')
    axes[2].fill_between(managed_dd.index, managed_dd, 0, alpha=0.3, label='管理后回撤')
    axes[2].set_title('最大回撤对比')
    axes[2].legend()
    axes[2].grid(True)
    
    plt.tight_layout()
    plt.show()
    
    # 统计对比
    print("风险管理效果:")
    print(f"基础策略 - 最大回撤: {base_dd.min():.2%}, 夏普比率: {base_ret.mean()/base_ret.std():.2f}")
    print(f"管理策略 - 最大回撤: {managed_dd.min():.2%}, 夏普比率: {managed_ret.mean()/managed_ret.std():.2f}")
    print(f"回撤改善: {abs(managed_dd.min()) / abs(base_dd.min()) - 1:.2%}")

if __name__ == "__main__":
    risk_managed_strategy()

误区四:忽视市场周期

问题:均值回归在牛市和熊市中的表现差异很大。

解决方案

  1. 市场状态识别:使用机器学习识别市场周期
  2. 自适应参数:根据市场状态调整阈值
  3. 多策略组合:结合趋势跟踪和均值回归
def market_regime_detection():
    """
    市场状态识别与自适应策略
    """
    np.random.seed(42)
    dates = pd.date_range('2015-01-01', '2023-12-31', freq='D')
    
    # 模拟不同市场状态的价格
    returns = np.zeros(len(dates))
    
    # 牛市阶段
    returns[:400] = np.random.normal(0.001, 0.01, 400)
    # 熊市阶段
    returns[400:700] = np.random.normal(-0.001, 0.015, 300)
    # 震荡市阶段
    returns[700:] = np.random.normal(0, 0.008, len(returns)-700)
    
    price = 100 * (1 + returns).cumprod()
    price = pd.Series(price, index=dates)
    
    # 市场状态识别指标
    def identify_regime(prices):
        """
        识别市场状态
        """
        returns = prices.pct_change()
        
        # 计算指标
        momentum = returns.rolling(60).sum()  # 60日动量
        volatility = returns.rolling(60).std()  # 60日波动率
        drawdown = prices / prices.cummax() - 1  # 当前回撤
        
        # 状态分类
        regime = pd.Series('Neutral', index=prices.index)
        
        # 牛市:动量高、波动率低、回撤小
       牛市条件 = (momentum > 0.05) & (volatility < returns.std() * 1.2) & (drawdown > -0.1)
        regime[牛市条件] = 'Bull'
        
        # 熊市:动量低、波动率高、回撤大
        熊市条件 = (momentum < -0.05) & (volatility > returns.std() * 1.5) & (drawdown < -0.15)
        regime[熊市条件] = 'Bear'
        
        # 震荡市:其他
        regime[~牛市条件 & ~熊市条件] = 'Range'
        
        return regime, momentum, volatility, drawdown
    
    regime, momentum, volatility, drawdown = identify_regime(price)
    
    # 自适应策略
    def adaptive_strategy(prices, regime_series):
        """
        根据市场状态调整策略参数
        """
        signals = pd.Series(0, index=prices.index)
        
        # 不同状态使用不同参数
        params = {
            'Bull': {'threshold': 2.0, 'lookback': 120},    # 牛市:更严格
            'Bear': {'threshold': 1.0, 'lookback': 30},     # 熊市:更宽松
            'Range': {'threshold': 1.5, 'lookback': 60}     # 震荡:标准
        }
        
        for state in ['Bull', 'Bear', 'Range']:
            mask = regime_series == state
            if not mask.any():
                continue
            
            param = params[state]
            lookback = param['lookback']
            threshold = param['threshold']
            
            rolling_mean = prices[mask].rolling(lookback).mean()
            rolling_std = prices[mask].rolling(lookback).std()
            z_score = (prices[mask] - rolling_mean) / rolling_std
            
            state_signals = pd.Series(0, index=prices[mask].index)
            state_signals[z_score > threshold] = -1
            state_signals[z_score < -threshold] = 1
            
            signals[mask] = state_signals
        
        return signals
    
    # 对比固定参数策略
    def fixed_strategy(prices, threshold=1.5, lookback=60):
        rolling_mean = prices.rolling(lookback).mean()
        rolling_std = prices.rolling(lookback).std()
        z_score = (prices - rolling_mean) / rolling_std
        
        signals = pd.Series(0, index=prices.index)
        signals[z_score > threshold] = -1
        signals[z_score < -threshold] = 1
        
        return signals
    
    # 计算收益
    fixed_sig = fixed_strategy(price)
    adaptive_sig = adaptive_strategy(price, regime)
    
    def calc_returns(sig, price):
        ret = (sig.shift(1) * price.pct_change()).fillna(0)
        cum = (1 + ret).cumprod()
        return cum, ret
    
    fixed_cum, fixed_ret = calc_returns(fixed_sig, price)
    adaptive_cum, adaptive_ret = calc_returns(adaptive_sig, price)
    
    # 可视化
    fig, axes = plt.subplots(3, 1, figsize=(14, 12))
    
    # 价格和状态
    axes[0].plot(price.index, price.values, label='价格', linewidth=1)
    
    # 标记状态区域
    for state, color in zip(['Bull', 'Bear', 'Range'], ['green', 'red', 'blue']):
        mask = regime == state
        if mask.any():
            axes[0].fill_between(price.index, price.min(), price.max(), 
                                 where=mask, alpha=0.2, color=color, label=state)
    
    axes[0].set_title('价格与市场状态')
    axes[0].legend()
    axes[0].grid(True)
    
    # 策略收益对比
    axes[1].plot(fixed_cum.index, fixed_cum.values, label='固定参数', linewidth=2)
    axes[1].plot(adaptive_cum.index, adaptive_cum.values, label='自适应参数', linewidth=2)
    axes[1].set_title('策略收益对比')
    axes[1].legend()
    axes[1].grid(True)
    
    # 状态分布
    state_counts = regime.value_counts()
    axes[2].bar(state_counts.index, state_counts.values, 
                color=['green', 'red', 'blue'], alpha=0.7)
    axes[2].set_title('市场状态分布')
    axes[2].set_ylabel('天数')
    axes[2].grid(True)
    
    plt.tight_layout()
    plt.show()
    
    # 统计对比
    print("自适应策略效果:")
    print(f"固定参数 - 总收益: {fixed_cum.iloc[-1] - 1:.2%}, 夏普: {fixed_ret.mean()/fixed_ret.std():.2f}")
    print(f"自适应   - 总收益: {adaptive_cum.iloc[-1] - 1:.2%}, 夏普: {adaptive_ret.mean()/adaptive_ret.std():.2f}")
    print(f"超额收益: {adaptive_cum.iloc[-1] / fixed_cum.iloc[-1] - 1:.2%}")
    
    # 状态表现分析
    print("\n各状态下策略表现:")
    for state in ['Bull', 'Bear', 'Range']:
        mask = regime == state
        if mask.any():
            fixed_state_ret = (fixed_sig.shift(1) * price.pct_change())[mask].mean()
            adaptive_state_ret = (adaptive_sig.shift(1) * price.pct_change())[mask].mean()
            print(f"{state} - 固定: {fixed_state_ret:.4f}, 自适应: {adaptive_state_ret:.4f}")

if __name__ == "__main__":
    market_regime_detection()

高级应用:机器学习增强的均值回归

随机森林增强模型

使用机器学习预测均值回归的概率:

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

def ml_enhanced_mean_reversion():
    """
    机器学习增强的均值回归策略
    """
    np.random.seed(42)
    dates = pd.date_range('2015-01-01', '2023-12-31', freq='D')
    
    # 创建特征数据
    price = 100 + np.cumsum(np.random.normal(0, 1, len(dates)))
    price = pd.Series(price, index=dates)
    
    # 生成特征
    features = pd.DataFrame(index=dates)
    
    # 基础技术指标
    features['price'] = price
    features['returns'] = price.pct_change()
    features['ma_20'] = price.rolling(20).mean()
    features['ma_60'] = price.rolling(60).mean()
    features['std_20'] = price.rolling(20).std()
    features['rsi'] = calculate_rsi(price)  # RSI指标
    
    # 滞后特征
    for lag in [1, 5, 10, 20]:
        features[f'return_lag_{lag}'] = price.pct_change(lag)
        features[f'zscore_lag_{lag}'] = (price - price.rolling(60).mean()) / price.rolling(60).std().shift(lag)
    
    # 目标变量:未来5天的收益率是否回归
    future_returns = price.pct_change(5).shift(-5)
    features['target'] = (future_returns < -0.02).astype(int)  # 未来下跌概率
    
    # 清理NaN值
    features = features.dropna()
    
    # 分割数据
    X = features.drop('target', axis=1)
    y = features['target']
    
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)
    
    # 训练模型
    model = RandomForestClassifier(n_estimators=100, random_state=42, max_depth=5)
    model.fit(X_train, y_train)
    
    # 预测
    predictions = model.predict(X_test)
    probabilities = model.predict_proba(X_test)[:, 1]
    
    # 生成交易信号
    signals = pd.Series(0, index=X_test.index)
    signals[probabilities > 0.6] = 1  # 高概率下跌,做空
    signals[probabilities < 0.4] = -1  # 低概率下跌,做多
    
    # 回测
    returns = (signals.shift(1) * price.pct_change()[X_test.index]).fillna(0)
    cumulative = (1 + returns).cumprod()
    
    # 对比传统Z-score策略
    z_score = (price - price.rolling(60).mean()) / price.rolling(60).std()
    z_signals = pd.Series(0, index=X_test.index)
    z_signals[z_score > 1.5] = -1
    z_signals[z_score < -1.5] = 1
    
    z_returns = (z_signals.shift(1) * price.pct_change()[X_test.index]).fillna(0)
    z_cumulative = (1 + z_returns).cumprod()
    
    # 可视化
    fig, axes = plt.subplots(2, 1, figsize=(14, 10))
    
    # 收益对比
    axes[0].plot(cumulative.index, cumulative.values, label='ML增强策略', linewidth=2)
    axes[0].plot(z_cumulative.index, z_cumulative.values, label='传统Z-score', linewidth=2, linestyle='--')
    axes[0].set_title('策略收益对比')
    axes[0].legend()
    axes[0].grid(True)
    
    # 特征重要性
    importances = model.feature_importances_
    indices = np.argsort(importances)[-10:]
    axes[1].barh(range(10), importances[indices])
    axes[1].set_yticks(range(10))
    axes[1].set_yticklabels([X.columns[i] for i in indices])
    axes[1].set_title('特征重要性 (Top 10)')
    axes[1].grid(True)
    
    plt.tight_layout()
    plt.show()
    
    # 性能评估
    print("模型性能:")
    print(classification_report(y_test, predictions))
    print("\n策略表现:")
    print(f"ML策略 - 总收益: {cumulative.iloc[-1] - 1:.2%}")
    print(f"Z-score - 总收益: {z_cumulative.iloc[-1] - 1:.2%}")
    print(f"超额收益: {cumulative.iloc[-1] / z_cumulative.iloc[-1] - 1:.2%}")

def calculate_rsi(prices, period=14):
    """计算RSI指标"""
    delta = prices.diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
    rs = gain / loss
    rsi = 100 - (100 / (1 + rs))
    return rsi

if __name__ == "__main__":
    ml_enhanced_mean_reversion()

实战案例:完整投资组合管理

案例背景

假设我们管理一个1000万美元的基金,需要在股票、债券、商品和另类投资之间进行配置。

实施框架

class PortfolioManager:
    def __init__(self, initial_capital=10_000_000):
        self.initial_capital = initial_capital
        self.current_capital = initial_capital
        self.positions = {}
        self.history = []
        
    def run_portfolio(self, data_dict, start_date='2020-01-01', end_date='2023-12-31'):
        """
        运行完整投资组合管理
        """
        dates = pd.date_range(start_date, end_date, freq='M')
        
        for date in dates:
            # 1. 获取当前数据
            current_data = {asset: data.loc[:date].iloc[-1] for asset, data in data_dict.items()}
            
            # 2. 计算估值
            valuations = self.calculate_valuations(current_data)
            
            # 3. 生成配置方案
            weights = self.optimize_weights(valuations)
            
            # 4. 执行调仓
            self.rebalance(weights, current_data, date)
            
            # 5. 记录历史
            self.history.append({
                'date': date,
                'capital': self.current_capital,
                'positions': self.positions.copy(),
                'weights': weights
            })
        
        return pd.DataFrame(self.history)
    
    def calculate_valuations(self, current_data):
        """计算估值指标"""
        valuations = {}
        for asset, data in current_data.items():
            if asset == 'Stocks':
                # 使用PE作为估值
                pe = data['PE']
                rolling_pe = data['PE_Rolling']
                z_score = (pe - rolling_pe.mean()) / rolling_pe.std()
                valuations[asset] = z_score
            elif asset == 'Bonds':
                # 使用收益率
                yield_ = data['Yield']
                rolling_yield = data['Yield_Rolling']
                z_score = (yield_ - rolling_yield.mean()) / rolling_yield.std()
                valuations[asset] = -z_score  # 收益率越低越好
            elif asset == 'Commodities':
                # 使用库存/需求比
                ratio = data['Inventory_Demand_Ratio']
                rolling_ratio = data['Ratio_Rolling']
                z_score = (ratio - rolling_ratio.mean()) / rolling_ratio.std()
                valuations[asset] = -z_score
            elif asset == 'Real_Estate':
                # 使用租金收益率
                yield_ = data['Rent_Yield']
                rolling_yield = data['Yield_Rolling']
                z_score = (yield_ - rolling_yield.mean()) / rolling_yield.std()
                valuations[asset] = z_score
        
        return valuations
    
    def optimize_weights(self, valuations):
        """优化权重"""
        # 反向配置
        raw_weights = {asset: -z for asset, z in valuations.items()}
        
        # 风险平价调整
        total = sum(abs(w) for w in raw_weights.values())
        weights = {asset: abs(w) / total for asset, w in raw_weights.items()}
        
        # 最小权重限制
        for asset in weights:
            weights[asset] = max(weights[asset], 0.05)  # 至少5%
        
        # 归一化
        total = sum(weights.values())
        weights = {asset: w / total for asset, w in weights.items()}
        
        return weights
    
    def rebalance(self, weights, current_data, date):
        """执行再平衡"""
        target_value = self.current_capital
        
        # 计算目标持仓
        target_positions = {}
        for asset, weight in weights.items():
            target_positions[asset] = target_value * weight
        
        # 计算调整量
        adjustments = {}
        for asset in target_positions:
            current = self.positions.get(asset, 0)
            target = target_positions[asset]
            adjustments[asset] = target - current
        
        # 执行交易(简化,忽略交易成本)
        for asset, adj in adjustments.items():
            if abs(adj) > target_value * 0.01:  # 调整幅度超过1%才交易
                self.positions[asset] = target_positions[asset]
        
        # 记录交易
        self.history.append({
            'date': date,
            'action': 'REBALANCE',
            'adjustments': adjustments
        })
    
    def get_performance(self):
        """计算绩效指标"""
        if not self.history:
            return None
        
        df = pd.DataFrame(self.history)
        df = df[df['capital'].notna()]  # 只保留有资本记录的行
        
        returns = df['capital'].pct_change().fillna(0)
        
        metrics = {
            '总收益率': (df['capital'].iloc[-1] / self.initial_capital - 1),
            '年化收益率': (df['capital'].iloc[-1] / self.initial_capital) ** (12/len(df)) - 1,
            '波动率': returns.std() * np.sqrt(12),
            '夏普比率': returns.mean() / returns.std() * np.sqrt(12) if returns.std() != 0 else 0,
            '最大回撤': (df['capital'] / df['capital'].cummax() - 1).min(),
            '最终资本': df['capital'].iloc[-1]
        }
        
        return metrics

# 模拟数据生成
def generate_portfolio_data():
    """生成投资组合数据"""
    np.random.seed(42)
    dates = pd.date_range('2020-01-01', '2023-12-31', freq='M')
    
    data_dict = {}
    
    # 股票数据
    stock_data = pd.DataFrame({
        'PE': 20 + np.random.normal(0, 5, len(dates)),
        'PE_Rolling': pd.Series(20 + np.random.normal(0, 2, len(dates))).rolling(12).mean()
    }, index=dates)
    data_dict['Stocks'] = stock_data
    
    # 债券数据
    bond_data = pd.DataFrame({
        'Yield': 3.0 + np.random.normal(0, 0.5, len(dates)),
        'Yield_Rolling': pd.Series(3.0 + np.random.normal(0, 0.2, len(dates))).rolling(12).mean()
    }, index=dates)
    data_dict['Bonds'] = bond_data
    
    # 商品数据
    commodity_data = pd.DataFrame({
        'Inventory_Demand_Ratio': 1.0 + np.random.normal(0, 0.3, len(dates)),
        'Ratio_Rolling': pd.Series(1.0 + np.random.normal(0, 0.1, len(dates))).rolling(12).mean()
    }, index=dates)
    data_dict['Commodities'] = commodity_data
    
    # 房地产数据
    real_estate_data = pd.DataFrame({
        'Rent_Yield': 4.5 + np.random.normal(0, 0.5, len(dates)),
        'Yield_Rolling': pd.Series(4.5 + np.random.normal(0, 0.2, len(dates))).rolling(12).mean()
    }, index=dates)
    data_dict['Real_Estate'] = real_estate_data
    
    return data_dict

# 运行完整案例
def run_complete_case():
    """运行完整案例"""
    print("=" * 60)
    print("完整投资组合管理案例")
    print("=" * 60)
    
    # 生成数据
    data_dict = generate_portfolio_data()
    
    # 初始化管理器
    manager = PortfolioManager(initial_capital=10_000_000)
    
    # 运行策略
    history = manager.run_portfolio(data_dict)
    
    # 获取绩效
    metrics = manager.get_performance()
    
    print("\n绩效指标:")
    for key, value in metrics.items():
        if isinstance(value, float):
            print(f"{key}: {value:.2%}" if '收益率' in key or '回撤' in key else f"{key}: {value:,.2f}")
        else:
            print(f"{key}: {value}")
    
    # 可视化
    fig, axes = plt.subplots(2, 2, figsize=(16, 12))
    
    # 资本曲线
    capital_data = history[history['capital'].notna()]
    axes[0, 0].plot(capital_data['date'], capital_data['capital'] / 1e6, linewidth=2)
    axes[0, 0].set_title('资本曲线 (百万美元)')
    axes[0, 0].grid(True)
    
    # 权重变化
    weights_df = pd.DataFrame([h['weights'] for h in history if 'weights' in h])
    if not weights_df.empty:
        for col in weights_df.columns:
            axes[0, 1].plot(weights_df.index, weights_df[col] * 100, label=col)
        axes[0, 1].set_title('资产权重变化 (%)')
        axes[0, 1].legend()
        axes[0, 1].grid(True)
    
    # 持仓价值
    positions_df = pd.DataFrame([h['positions'] for h in history if 'positions' in h])
    if not positions_df.empty:
        for col in positions_df.columns:
            axes[1, 0].plot(positions_df.index, positions_df[col] / 1e6, label=col)
        axes[1, 0].set_title('持仓价值 (百万美元)')
        axes[1, 0].legend()
        axes[1, 0].grid(True)
    
    # 月度收益分布
    returns = capital_data['capital'].pct_change().dropna()
    axes[1, 1].hist(returns, bins=20, alpha=0.7, edgecolor='black')
    axes[1, 1].set_title('月度收益分布')
    axes[1, 1].axvline(returns.mean(), color='red', linestyle='--', label=f'均值: {returns.mean():.2%}')
    axes[1, 1].legend()
    axes[1, 1].grid(True)
    
    plt.tight_layout()
    plt.show()
    
    return manager, history, metrics

if __name__ == "__main__":
    manager, history, metrics = run_complete_case()

总结与最佳实践

核心要点回顾

  1. 理论基础:均值回归是资产价格的统计规律,但需要结合基本面分析
  2. 模型构建:从简单的Z-score到复杂的多因子模型,关键是找到有效的估值指标
  3. 风险管理:止损、仓位管理、压力测试缺一不可
  4. 参数优化:避免过拟合,使用滚动窗口优化
  5. 市场适应:识别市场状态,动态调整策略

最佳实践清单

应该做的

  • 使用滚动均值而非固定均值
  • 考虑交易成本和滑点
  • 设置严格的止损机制
  • 定期进行压力测试
  • 结合基本面分析
  • 保持足够的分散化

不应该做的

  • 盲目依赖历史均值
  • 频繁交易
  • 忽视尾部风险
  • 过度拟合参数
  • 孤立使用单一指标
  • 忽视市场周期

持续改进框架

def continuous_improvement_framework():
    """
    持续改进框架
    """
    framework = {
        '监控指标': [
            '策略夏普比率',
            '最大回撤',
            '交易频率',
            '胜率',
            '盈亏比'
        ],
        '定期评估': [
            '每周:检查交易执行情况',
            '每月:评估参数稳定性',
            '每季:进行压力测试',
            '每年:全面策略回顾'
        ],
        '优化方向': [
            '降低交易成本',
            '提高信号质量',
            '增强风险控制',
            '扩展资产范围',
            '引入新数据源'
        ]
    }
    
    return framework

if __name__ == "__main__":
    framework = continuous_improvement_framework()
    print("持续改进框架:")
    for key, items in framework.items():
        print(f"\n{key}:")
        for item in items:
            print(f"  - {item}")

均值回归理论为资产配置提供了强大的理论基础和实用的工具。通过系统性的模型构建、严格的风险管理和持续的优化改进,投资者可以在市场波动中有效捕捉价值回归机会,同时规避常见误区。关键在于保持理性、尊重统计规律,并始终将风险管理放在首位。