股票组合VaR计算器-蒙特卡洛模拟法

akshare获取数据

#股票组合VaR计算器,蒙特卡洛模拟法,akshare获取数据
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import akshare as ak
from datetime import datetime, timedelta
import seaborn as sns

# 设置中文显示
plt.rcParams["font.family"] = ['SimHei', 'Microsoft YaHei', 'SimSun', 'KaiTi']
plt.rcParams['axes.unicode_minus'] = False  # 解决负号显示问题

class StockVaRCalculator:
    """A股股票组合VaR计算器,使用蒙特卡洛模拟法和akshare获取数据"""
    
    def __init__(self, stock_tickers, weights=None, investment_value=1000000, 
                 simulation_days=10, num_simulations=10000, confidence_level=0.99):
        """
        初始化股票组合VaR计算器
        
        参数:
            stock_tickers: 股票代码列表,使用akshare格式(例如:'sh600000' 表示浦发银行)
            weights: 各股票权重,默认为等权重
            investment_value: 投资组合总价值(元)
            simulation_days: 模拟的天数
            num_simulations: 蒙特卡洛模拟次数
            confidence_level: VaR计算的置信水平
        """
        self.stock_tickers = stock_tickers
        self.num_stocks = len(stock_tickers)
        
        # 处理权重
        if weights is None:
            self.weights = np.ones(self.num_stocks) / self.num_stocks
        else:
            self.weights = np.array(weights)
            # 归一化权重
            self.weights = self.weights / np.sum(self.weights)
            
        self.investment_value = investment_value
        self.simulation_days = simulation_days
        self.num_simulations = num_simulations
        self.confidence_level = confidence_level
        
        self.stock_data = None
        self.returns = None
        self.mean_returns = None
        self.cov_matrix = None
        self.simulation_results = None
        
    def fetch_stock_data(self, start_date=None, end_date=None):
        """使用akshare获取股票历史数据"""
        if end_date is None:
            end_date = datetime.now().strftime('%Y%m%d')
        if start_date is None:
            end_date_obj = datetime.strptime(end_date, '%Y%m%d')
            start_date_obj = end_date_obj - timedelta(days=365)
            start_date = start_date_obj.strftime('%Y%m%d')
            
        print(f"正在获取{self.stock_tickers}的历史数据,时间范围:{start_date}至{end_date}")
        
        all_data = pd.DataFrame()
        
        for ticker in self.stock_tickers:
            try:
                # 使用akshare获取股票日线数据
                stock_data = ak.stock_zh_a_hist_tx(symbol=ticker,  
                                              start_date=start_date, end_date=end_date, 
                                              adjust="")  # 使用前复权价格
                '''
                在使用 akshare 获取 A 股历史行情数据时,可以通过设置adjust参数来获取除权后数据。这个参数支持三种取值:
                "qfq":前复权(最常用,保持当前价格不变,调整历史价格)
                "hfq":后复权(保持历史价格不变,调整当前价格)
                "":不复权(原始价格)
                '''
                
                # 重命名列并添加股票代码
                stock_data = stock_data.rename(columns={'日期': 'date', '收盘': 'close'})
                stock_data['ticker'] = ticker
                stock_data = stock_data[['date', 'ticker', 'close']]
                
                # 转换日期格式
                stock_data['date'] = pd.to_datetime(stock_data['date'])
                
                # 添加到总数据中
                if all_data.empty:
                    all_data = stock_data
                else:
                    all_data = pd.concat([all_data, stock_data], ignore_index=True)
                    
                print(f"成功获取{ticker}的{len(stock_data)}天数据")
                
            except Exception as e:
                print(f"获取{ticker}数据失败: {e}")
        
        # 重塑数据为宽格式
        self.stock_data = all_data.pivot(index='date', columns='ticker', values='close')
        
        # 检查数据完整性
        if self.stock_data.isnull().any().any():
            print("警告:数据包含缺失值,正在进行前向填充")
            self.stock_data = self.stock_data.fillna(method='ffill')
        # 打印最后一天的股票价格(确认是否合理)
        print("最后一天股票价格:")
        print(var_calculator.stock_data.iloc[-1])    
        print(f"数据处理完成,最终数据集包含{len(self.stock_data)}天数据")


        # 验证价格合理性(示例:假设A股价格通常<500元)
        if (self.stock_data > 500).any().any():
            print("警告:检测到异常高价,可能数据单位错误,尝试除以10000")
            self.stock_data = self.stock_data / 10000  # 假设单位为"万元"
            print(f"**修正后最后一天价格: {self.stock_data.iloc[-1]}")

        return self.stock_data
    
    def calculate_returns(self):
        """计算股票收益率"""
        if self.stock_data is None:
            raise ValueError("请先获取股票数据")
            
        # 计算对数收益率
        self.returns = np.log(self.stock_data / self.stock_data.shift(1)).dropna()
        
        # 新增:打印收益率极值(正常应在±10%以内)
        print("收益率极值检测:")
        print(f"最小日收益率: {self.returns.min().min():.2%}")
        print(f"最大日收益率: {self.returns.max().max():.2%}")
        # 计算均值和协方差矩阵
        self.mean_returns = self.returns.mean()
        self.cov_matrix = self.returns.cov()
        
        return self.returns
        
    def run_monte_carlo_simulation(self):
        """运行蒙特卡洛模拟"""
        if self.returns is None:
            raise ValueError("请先计算收益率")
            
        print(f"正在运行蒙特卡洛模拟,模拟次数:{self.num_simulations},预测天数:{self.simulation_days}")
        
        # 计算日均值和协方差矩阵
        daily_mean = self.mean_returns
        daily_cov = self.cov_matrix
        
        # 调整portfolio_sims的维度顺序为 [股票数, 模拟次数, 天数]
        portfolio_sims = np.full(shape=(self.num_stocks, self.num_simulations, self.simulation_days), 
                                fill_value=0.0)
        
        initial_prices = self.stock_data.iloc[-1].values  # 使用最后一天的价格作为初始价格
        
        for s in range(self.num_simulations):
            # 生成相关的随机收益率
            daily_returns = np.random.multivariate_normal(
                daily_mean, 
                daily_cov, 
                self.simulation_days
            ).T  # [股票数, 天数]
            
            # 计算累积收益率(注意:这里使用指数函数将对数收益率转换回价格比率)
            cumulative_returns = np.exp(np.cumsum(daily_returns, axis=1))
            
            # 计算模拟价格路径
            portfolio_sims[:, s, :] = initial_prices.reshape(-1, 1) * cumulative_returns
        
        self.simulation_results = portfolio_sims
        return portfolio_sims
    
    def calculate_portfolio_var(self):
        """计算投资组合的VaR"""
        if self.simulation_results is None:
            raise ValueError("请先运行蒙特卡洛模拟")
            
        # 计算模拟结束时的投资组合价值
        final_values = np.zeros(self.num_simulations)
        
        for s in range(self.num_simulations):
            # 获取所有股票在最后一天的价格 [num_stocks]
            stock_values = self.simulation_results[:, s, -1]
            
            # 计算每只股票的价值 = 权重 * 初始投资 * 模拟价格/初始价格
            portfolio_value = np.sum(self.weights * self.investment_value * stock_values / 
                                    self.stock_data.iloc[-1].values)
            final_values[s] = portfolio_value
            
        # 计算投资组合价值变化
        portfolio_changes = final_values - self.investment_value
        
        # 计算VaR
        var_percentile = 100 * (1 - self.confidence_level)
        var = -np.percentile(portfolio_changes, var_percentile)
        
        # 计算CVaR (条件VaR)
        cvar_mask = portfolio_changes <= -var
        cvar = -np.mean(portfolio_changes[cvar_mask]) if np.sum(cvar_mask) > 0 else 0
        
        # 新增:打印模拟结果统计
        print("\n模拟结果统计:")
        print(f"初始投资组合价值: {self.investment_value:,}元")
        print(f"模拟后组合价值范围: 最小值={np.min(final_values):,.2f}元, 最大值={np.max(final_values):,.2f}元")
        print(f"组合价值变化范围: 最小值={np.min(portfolio_changes):,.2f}元, 最大值={np.max(portfolio_changes):,.2f}元")
        
        return {
            'VaR': var,
            'CVaR': cvar,
            'VaR_percentage': var / self.investment_value * 100,
            'CVaR_percentage': cvar / self.investment_value * 100,
            'portfolio_changes': portfolio_changes,
            'final_values': final_values
        }

    def plot_simulation_results(self, var_results):
        """可视化模拟结果"""
        plt.figure(figsize=(16, 10))
        
        # 1. 绘制模拟价格路径 - 修正维度顺序
        plt.subplot(2, 2, 1)
        for i, ticker in enumerate(self.stock_tickers):
            plt.plot(self.simulation_results[i, :, :].T, alpha=0.1)  # 转置以正确显示时间轴
            plt.title(f"{ticker} 价格模拟路径")
            plt.xlabel("天数")
            plt.ylabel("价格 (元)")
        
        # 2. 绘制投资组合价值分布
        plt.subplot(2, 2, 2)
        sns.histplot(var_results['final_values'], bins=50, kde=True)
        plt.axvline(x=self.investment_value, color='r', linestyle='--', label='初始价值')
        plt.axvline(x=self.investment_value - var_results['VaR'], color='g', linestyle='--', label=f'VaR@{self.confidence_level}')
        plt.title('投资组合价值分布')
        plt.xlabel('组合价值 (元)')
        plt.ylabel('频率')
        plt.legend()
        
        # 3. 绘制投资组合收益分布
        plt.subplot(2, 2, 3)
        sns.histplot(var_results['portfolio_changes'], bins=50, kde=True)
        var_percentile = 100 * (1 - self.confidence_level)
        var_value = -var_results['VaR']
        plt.axvline(x=0, color='r', linestyle='--', label='无收益')
        plt.axvline(x=var_value, color='g', linestyle='--', label=f'VaR@{self.confidence_level}')
        plt.title('投资组合收益分布')
        plt.xlabel('收益 (元)')
        plt.ylabel('频率')
        plt.legend()
        
        # 4. 绘制风险因子热图
        plt.subplot(2, 2, 4)
        sns.heatmap(self.cov_matrix, annot=True, cmap='coolwarm', fmt='g')
        plt.title('股票收益率协方差矩阵')
        
        plt.tight_layout()
        plt.show()    
        
    def calculate_risk_contribution(self, var_results):
        """计算各股票对投资组合风险的贡献"""
        if self.simulation_results is None:
            raise ValueError("请先运行蒙特卡洛模拟")
            
        # 计算每只股票的VaR贡献
        risk_contributions = {}
        
        for i, ticker in enumerate(self.stock_tickers):
            # 计算该股票的权重和波动率
            weight = self.weights[i]
            volatility = np.sqrt(self.cov_matrix.iloc[i, i])
            
            # 计算边际VaR (假设正态分布)
            portfolio_volatility = np.sqrt(np.dot(self.weights, np.dot(self.cov_matrix, self.weights)))
            marginal_var = (weight * volatility * self.cov_matrix.iloc[i, :].dot(self.weights)) / (portfolio_volatility ** 2)
            
            # 计算成分VaR
            component_var = marginal_var * var_results['VaR']
            
            # 计算百分比贡献
            pct_contribution = (component_var / var_results['VaR']) * 100
            
            risk_contributions[ticker] = {
                'weight': weight,
                'volatility': volatility,
                'marginal_var': marginal_var,
                'component_var': component_var,
                'pct_contribution': pct_contribution
            }
            
        return risk_contributions

# 使用示例
if __name__ == "__main__":
    # 设置要分析的股票代码(使用akshare格式)
    #stock_tickers = ['sh600000', 'sz000001', 'sh601318', 'sz000858', 'sh600519']  # 浦发银行、平安银行、中国平安、五粮液、贵州茅台
    stock_tickers = ['sh600938', f'sz000937', 'sh601288', 'sz002555', f'sz000538']  # 浦发银行、平安银行、中国平安、五粮液、贵州茅台

    # 设置权重(如果不设置,默认为等权重)
    #weights = [0.2, 0.2, 0.2, 0.2, 0.2]
    weights = [0.4,0.2,0.2,0.1,0.1]
    '''
    股票	    代码	    持仓	    持股比例    
    中国海油	600938	 ¥89,775.00 	0.399
    农业银行	601288	 ¥48,276.00 	0.215
    云南白药	000538	 ¥34,020.00 	0.151
    冀中能源	000937	 ¥52,806.00 	0.235
    三七互娱	002555	 ¥14,650.00 	0.065

    '''

    # 初始化计算器
    var_calculator = StockVaRCalculator(
        stock_tickers=stock_tickers,
        weights=weights,
        investment_value=1000000,  # 100万元投资组合
        simulation_days=10,        # 预测10天
        num_simulations=10000,     # 10000次模拟
        confidence_level=0.99      # 99%置信水平
    )
    
    # 获取数据
    var_calculator.fetch_stock_data()
    
    # 计算收益率
    var_calculator.calculate_returns()
    
    # 运行蒙特卡洛模拟
    var_calculator.run_monte_carlo_simulation()
    
    # 计算VaR
    var_results = var_calculator.calculate_portfolio_var()
    
    # 计算风险贡献
    risk_contributions = var_calculator.calculate_risk_contribution(var_results)

    # 打印收益率统计特征(正常A股日收益率均值约0.03%,标准差约2%)
    print("收益率统计:")
    print(var_calculator.returns.describe()) 
    # 打印结果
    print("\n===== 风险价值计算结果 =====")
    print(f"投资组合总价值: {var_calculator.investment_value:,}元")
    print(f"{var_calculator.simulation_days}天持有期,{var_calculator.confidence_level*100}%置信水平下:")
    print(f"VaR: {var_results['VaR']:,.2f}元 ({var_results['VaR_percentage']:.2f}%)")
    print(f"CVaR: {var_results['CVaR']:,.2f}元 ({var_results['CVaR_percentage']:.2f}%)")
    
    print("\n===== 各股票风险贡献 =====")
    for ticker, contrib in risk_contributions.items():
        print(f"{ticker}: 权重={contrib['weight']:.2%}, 风险贡献={contrib['pct_contribution']:.2f}%")
    
    # 可视化结果
    var_calculator.plot_simulation_results(var_results)    

    '''
成功获取sh600938的243天数据
成功获取sz000937的243天数据
成功获取sh601288的243天数据
成功获取sz002555的243天数据
成功获取sz000538的243天数据
最后一天股票价格:
ticker
sh600938    25.79
sh601288     5.53
sz000538    56.48
sz000937     6.70
sz002555    14.67
Name: 2025-05-30 00:00:00, dtype: float64
数据处理完成,最终数据集包含243天数据
收益率极值检测:
最小日收益率: -10.46%
最大日收益率: 9.55%
正在运行蒙特卡洛模拟,模拟次数:10000,预测天数:10

模拟结果统计:
初始投资组合价值: 1,000,000元
模拟后组合价值范围: 最小值=855,834.29元, 最大值=1,143,978.61元
组合价值变化范围: 最小值=-144,165.71元, 最大值=143,978.61元
收益率统计:
ticker    sh600938    sh601288    sz000538    sz000937    sz002555
count   242.000000  242.000000  242.000000  242.000000  242.000000
mean     -0.000572    0.000954    0.000252   -0.000769    0.000091
std       0.019611    0.013706    0.013784    0.018998    0.025516
min      -0.100268   -0.047677   -0.076738   -0.086876   -0.104602
25%      -0.009177   -0.007230   -0.006349   -0.009977   -0.015922
50%       0.000350    0.002075    0.000086   -0.001723   -0.000382
75%       0.008776    0.008864    0.006127    0.008332    0.013346
max       0.078906    0.040574    0.077164    0.074801    0.095478

===== 风险价值计算结果 =====
投资组合总价值: 1,000,000元
10天持有期,99.0%置信水平下:
VaR: 87,123.09元 (8.71%)
CVaR: 99,360.77元 (9.94%)

===== 各股票风险贡献 =====
sh600938: 权重=40.00%, 风险贡献=1.08%
sz000937: 权重=20.00%, 风险贡献=0.14%
sh601288: 权重=20.00%, 风险贡献=0.20%
sz002555: 权重=10.00%, 风险贡献=0.20%
sz000538: 权重=10.00%, 风险贡献=0.24%
    '''