42  ex02_黄伊姿

42.1 上市公司财务特征分析

42.1.1 0. 环境准备

# 环境准备(已排除重复库,仅保留必要模块)
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import os
import re
from scipy.stats.mstats import winsorize
from pathlib import Path
from scipy import stats
import warnings
from typing import Optional, Dict, List
import matplotlib.dates as mdates
from matplotlib.ticker import FuncFormatter

# 设置中文字体
plt.rcParams["font.family"] = ["Heiti TC"]
plt.rcParams["axes.unicode_minus"] = False  # 解决负号显示问题
warnings.filterwarnings('ignore')  # 忽略警告


# 配置路径
DATA_PATH = "./data/"       # 原始Excel文件夹
CSV_PATH = "./csv_files/"  # CSV保存路径
OUTPUT_PATH = "./output/"  # 最终输出路径
os.makedirs(CSV_PATH, exist_ok=True)
os.makedirs(OUTPUT_PATH, exist_ok=True)

# 确保输出文件夹存在
if not os.path.exists(OUTPUT_PATH):
    os.makedirs(OUTPUT_PATH)

42.1.2 1. 从TXT文件解析指标汇总


# 定义目标文件夹和输出文件路径
DATA_PATH = "./data/"
OUTPUT_TXT = os.path.join(DATA_PATH, "指标汇总.txt")

# 遍历data文件夹下所有txt文件
english_lines = []
for file in os.listdir(DATA_PATH):
    if file.endswith(".txt") and file != "指标汇总.txt":
        with open(os.path.join(DATA_PATH, file), "r", encoding="utf-8") as f:
            for line in f:
                # 匹配英文开头的行(以字母开头,可根据实际调整正则)
                if re.match(r'^[A-Za-z_]', line.strip()):
                    english_lines.append(line)

# 去重并保存到指标汇总.txt
with open(OUTPUT_TXT, "w", encoding="utf-8") as f:
    f.writelines(list(set(english_lines)))  # 去重后写入
print(f"已抽取{len(english_lines)}条行指标到{OUTPUT_TXT}")
已抽取217条行指标到./data/指标汇总.txt

上面说明一共有217个指标,我可以把这些指标丢给AI agent,让他帮我找一下做这个分析思路所需要的指标。

42.1.3 2. 数据读取与预处理



# 检查第四行是否只包含NaN和"报表类型(合并报表);"
def should_delete_fourth_row(row):
    # 将NaN转换为字符串'nan'以便比较
    row_values = row.astype(str).tolist()
    # 检查所有值是否为'nan'或'报表类型(合并报表);'
    return all(val in {'nan', '报表类型(合并报表);'} for val in row_values)

# ----------------------
# 第一步:处理Excel并转为CSV(删除第四行报表类型行),并处理文件合并
# ----------------------
def process_excel_to_csv():
    print("开始处理Excel并转换为CSV...")

    # 检查数据路径是否存在
    if not os.path.exists(DATA_PATH):
        print(f"数据路径 {DATA_PATH} 不存在,请检查路径设置。")
        return

    # 列出数据路径下的所有内容
    path_contents = os.listdir(DATA_PATH)
    print(f"数据路径 {DATA_PATH} 下的内容: {path_contents}")

    # 创建字典用于存储需要合并的文件对
    file_pairs = {}

    # 获取所有Excel文件
    excel_files = [f for f in os.listdir(DATA_PATH) if f.endswith((".xlsx", ".xls"))]
    print(f"找到的Excel文件: {excel_files}")

    # 找出所有需要合并的文件对
    for i, file1 in enumerate(excel_files):
        for j, file2 in enumerate(excel_files):
            if i >= j:  # 避免重复比较
                continue

            name1, ext1 = os.path.splitext(file1)
            name2, ext2 = os.path.splitext(file2)

            # 去除 "2" 后比较剩余部分是否完全一致
            name1_without_2 = name1.replace("2", "")
            name2_without_2 = name2.replace("2", "")
            if name1_without_2 == name2_without_2:
                # 确定哪个文件名包含 "2"
                if "2" in name1:
                    original_file = file2
                    file_to_merge = file1
                else:
                    original_file = file1
                    file_to_merge = file2

                # 将文件对添加到字典中
                file_pairs[original_file] = file_to_merge

    print(f"找到的需要合并的文件对: {file_pairs}")

    # 第一阶段:处理需要合并的文件
    print("\n===== 开始处理需要合并的文件 =====")
    for original_file, file_to_merge in file_pairs.items():
        print(f"\n发现需要合并的文件对:")
        print(f"  主文件: {original_file}")
        print(f"  合并文件: {file_to_merge}")

        original_path = os.path.join(DATA_PATH, original_file)
        merge_path = os.path.join(DATA_PATH, file_to_merge)

        try:
            # 读取两个Excel文件
            df_original = pd.read_excel(original_path, header=None)
            df_merge = pd.read_excel(merge_path, header=None)

            # 检查表头是否匹配
            print("\n检查表头匹配情况:")
            max_header_rows = min(4, len(df_original), len(df_merge))
            header_rows = 0

            for i in range(max_header_rows):
                if df_original.iloc[i].equals(df_merge.iloc[i]):
                    header_rows += 1
                else:
                    break

            if header_rows == 0:
                print(f"  错误:{original_file}{file_to_merge} 的表头不匹配,无法合并")
                continue

            print(f"  两个文件的表头在前{header_rows}行匹配")

            # 处理第四行并确定数据起始行
            def process_fourth_row(df, file_name):
                keep_fourth = False
                if len(df) >= 4:
                    fourth_row = df.iloc[3]
                    if should_delete_fourth_row(fourth_row):
                        print(f"  {file_name}: 第四行只包含空值或'报表类型(合并报表);',已删除")
                        data_start = 5
                    else:
                        print(f"  {file_name}: 文件中第四行为数据行,无需删除")
                        keep_fourth = True
                        data_start = 4
                else:
                    print(f"  {file_name}: 文件行数少于 4,无第四行需要处理")
                    data_start = len(df) + 1

                return keep_fourth, data_start

            print("\n处理第四行:")
            original_keep_fourth, original_data_start = process_fourth_row(df_original, original_file)
            merge_keep_fourth, merge_data_start = process_fourth_row(df_merge, file_to_merge)

            # 分别处理两个文件并保存为CSV
            print("\n处理并保存主文件为CSV:")
            original_header = df_original.head(header_rows)
            if len(df_original) >= original_data_start:
                original_data = df_original[original_data_start - 1:]
                print(f"  {original_file}: 从第{original_data_start}行开始加载数据,共{len(original_data)}行")
            else:
                original_data = pd.DataFrame()
                print(f"  {original_file}: 没有数据行")
            
            # 合并表头和数据
            original_merged_df = pd.concat([original_header, original_data]).reset_index(drop=True)
            
            # 设置多级表头
            if len(original_merged_df) >= header_rows:
                new_header = original_merged_df.iloc[:header_rows].values.tolist()
                original_merged_df = original_merged_df[header_rows:]
                original_merged_df.columns = pd.MultiIndex.from_arrays(new_header)
            
            # 保存主文件为CSV
            base_name = os.path.splitext(original_file)[0]
            original_csv_name = f"{base_name}.csv"
            original_csv_path = os.path.join(CSV_PATH, original_csv_name)
            original_merged_df.to_csv(original_csv_path, index=False)
            print(f"  成功保存:{original_file} -> {original_csv_name}")
            
            print("\n处理并保存合并文件为CSV:")
            merge_header = df_merge.head(header_rows)
            if len(df_merge) >= merge_data_start:
                merge_data = df_merge[merge_data_start - 1:]
                print(f"  {file_to_merge}: 从第{merge_data_start}行开始加载数据,共{len(merge_data)}行")
            else:
                merge_data = pd.DataFrame()
                print(f"  {file_to_merge}: 没有数据行")
            
            # 合并表头和数据
            merge_merged_df = pd.concat([merge_header, merge_data]).reset_index(drop=True)
            
            # 设置多级表头
            if len(merge_merged_df) >= header_rows:
                new_header = merge_merged_df.iloc[:header_rows].values.tolist()
                merge_merged_df = merge_merged_df[header_rows:]
                merge_merged_df.columns = pd.MultiIndex.from_arrays(new_header)
            
            # 保存合并文件为CSV
            merge_base_name = os.path.splitext(file_to_merge)[0]
            merge_csv_name = f"{merge_base_name}.csv"
            merge_csv_path = os.path.join(CSV_PATH, merge_csv_name)
            merge_merged_df.to_csv(merge_csv_path, index=False)
            print(f"  成功保存:{file_to_merge} -> {merge_csv_name}")
            
            # 合并两个CSV文件
            print("\n合并两个CSV文件:")
            # 读取主CSV文件(包含表头)
            df_original_csv = pd.read_csv(original_csv_path, header=[i for i in range(header_rows)])
            # 读取合并CSV文件(跳过表头)
            df_merge_csv = pd.read_csv(merge_csv_path, header=[i for i in range(header_rows)])
            
            # 合并数据(不包含表头)
            merged_data = pd.concat([df_original_csv, df_merge_csv], ignore_index=True)
            
            # 保存合并后的CSV文件
            total_csv_name = f"{base_name}_total.csv"
            total_csv_path = os.path.join(CSV_PATH, total_csv_name)
            merged_data.to_csv(total_csv_path, index=False)
            
            total_rows = len(merged_data)
            print(f"  合并完成,总数据行数: {total_rows}")
            print(f"\n成功合并并保存:{original_file} + {file_to_merge} -> {total_csv_name}")

        except Exception as e:
            print(f"合并 {original_file}{file_to_merge} 失败:{str(e)}")

    # 第二阶段:处理剩余的单个文件
    print("\n\n===== 开始处理单个文件 =====")
    processed_files = set(file_pairs.keys()).union(set(file_pairs.values()))
    for file in os.listdir(DATA_PATH):
        if not file.endswith((".xlsx", ".xls")) or file in processed_files:
            continue

        file_path = os.path.join(DATA_PATH, file)
        csv_name = f"{os.path.splitext(file)[0]}.csv"
        csv_path = os.path.join(CSV_PATH, csv_name)

        print(f"\n处理单个文件: {file}")

        try:
            # 读取 Excel
            df = pd.read_excel(file_path, header=None)

            print(f"  成功读取文件 {file},数据有 {df.shape[0]}{df.shape[1]} 列")

            # 保留表头行
            header_rows = min(3, len(df))
            new_df = df.head(header_rows)

            # 处理第四行
            keep_fourth = False
            if len(df) >= 4:
                fourth_row = df.iloc[3]
                if should_delete_fourth_row(fourth_row):
                    print(f"  第四行只包含空值或'报表类型(合并报表);',已删除")
                    data_start = 5
                else:
                    print(f"  文件中第四行为数据行,无需删除")
                    new_df = pd.concat([new_df, pd.DataFrame(fourth_row).T])
                    keep_fourth = True
                    data_start = 4
            else:
                print(f"  文件行数少于 4,无第四行需要处理")
                data_start = len(df) + 1

            # 加载数据
            if len(df) >= data_start:
                data = df[data_start - 1:]
                new_df = pd.concat([new_df, data]).reset_index(drop=True)
                print(f"  从第{data_start}行开始加载数据,共{len(data)}行")
            else:
                print(f"  没有数据行")

            # 设置多级表头
            if len(new_df) >= header_rows:
                new_header = new_df.iloc[:header_rows].values.tolist()
                new_df = new_df[header_rows:]
                new_df.columns = pd.MultiIndex.from_arrays(new_header)

            # 保存为 CSV 文件
            new_df.to_csv(csv_path, index=False)
            print(f"  成功处理并转换:{file} -> {csv_name}")

        except Exception as e:
            print(f"  处理 {file} 失败:{str(e)}")

    print("\nExcel 处理及 CSV 转换完成\n")


# ----------------------
# 执行全流程
# ----------------------
if __name__ == "__main__":
    process_excel_to_csv()
开始处理Excel并转换为CSV...
数据路径 ./data/ 下的内容: ['跨表查询_沪深京股票2(年频).xlsx', 'STK_LISTEDCOINFOCHG[DES][xlsx].txt', 'STK_LISTEDCOINFOCHG.xlsx', '.DS_Store', '跨表查询_沪深京股票(年频).xlsx', '跨表查询_沪深京股票2(年频)[DES][.xlsx].txt', 'STK_LISTEDCOINFOANL.xlsx', '资产负债表_沪深京股票2(年频).xlsx', '常用变量查询(年度).xlsx', '资产负债表_沪深京股票(年频).xlsx', '跨表查询_沪深京股票(年频)[DES][.xlsx].txt', 'STK_LISTEDCOINFOANL[DES][xlsx].txt', '资产负债表_沪深京股票(年频)[DES][.xlsx].txt', '指标汇总.txt', '常用变量查询(年度)[DES][xlsx].txt', '资产负债表_沪深京股票2(年频)[DES][.xlsx].txt']
找到的Excel文件: ['跨表查询_沪深京股票2(年频).xlsx', 'STK_LISTEDCOINFOCHG.xlsx', '跨表查询_沪深京股票(年频).xlsx', 'STK_LISTEDCOINFOANL.xlsx', '资产负债表_沪深京股票2(年频).xlsx', '常用变量查询(年度).xlsx', '资产负债表_沪深京股票(年频).xlsx']
找到的需要合并的文件对: {'跨表查询_沪深京股票(年频).xlsx': '跨表查询_沪深京股票2(年频).xlsx', '资产负债表_沪深京股票(年频).xlsx': '资产负债表_沪深京股票2(年频).xlsx'}

===== 开始处理需要合并的文件 =====

发现需要合并的文件对:
  主文件: 跨表查询_沪深京股票(年频).xlsx
  合并文件: 跨表查询_沪深京股票2(年频).xlsx

检查表头匹配情况:
  两个文件的表头在前4行匹配

处理第四行:
  跨表查询_沪深京股票(年频).xlsx: 第四行只包含空值或'报表类型(合并报表);',已删除
  跨表查询_沪深京股票2(年频).xlsx: 第四行只包含空值或'报表类型(合并报表);',已删除

处理并保存主文件为CSV:
  跨表查询_沪深京股票(年频).xlsx: 从第5行开始加载数据,共64163行
  成功保存:跨表查询_沪深京股票(年频).xlsx -> 跨表查询_沪深京股票(年频).csv

处理并保存合并文件为CSV:
  跨表查询_沪深京股票2(年频).xlsx: 从第5行开始加载数据,共81662行
  成功保存:跨表查询_沪深京股票2(年频).xlsx -> 跨表查询_沪深京股票2(年频).csv

合并两个CSV文件:
  合并完成,总数据行数: 145825

成功合并并保存:跨表查询_沪深京股票(年频).xlsx + 跨表查询_沪深京股票2(年频).xlsx -> 跨表查询_沪深京股票(年频)_total.csv

发现需要合并的文件对:
  主文件: 资产负债表_沪深京股票(年频).xlsx
  合并文件: 资产负债表_沪深京股票2(年频).xlsx

检查表头匹配情况:
  两个文件的表头在前4行匹配

处理第四行:
  资产负债表_沪深京股票(年频).xlsx: 第四行只包含空值或'报表类型(合并报表);',已删除
  资产负债表_沪深京股票2(年频).xlsx: 第四行只包含空值或'报表类型(合并报表);',已删除

处理并保存主文件为CSV:
  资产负债表_沪深京股票(年频).xlsx: 从第5行开始加载数据,共64163行
  成功保存:资产负债表_沪深京股票(年频).xlsx -> 资产负债表_沪深京股票(年频).csv

处理并保存合并文件为CSV:
  资产负债表_沪深京股票2(年频).xlsx: 从第5行开始加载数据,共81662行
  成功保存:资产负债表_沪深京股票2(年频).xlsx -> 资产负债表_沪深京股票2(年频).csv

合并两个CSV文件:
  合并完成,总数据行数: 145825

成功合并并保存:资产负债表_沪深京股票(年频).xlsx + 资产负债表_沪深京股票2(年频).xlsx -> 资产负债表_沪深京股票(年频)_total.csv


===== 开始处理单个文件 =====

处理单个文件: STK_LISTEDCOINFOCHG.xlsx
  成功读取文件 STK_LISTEDCOINFOCHG.xlsx,数据有 160279 行 8 列
  文件中第四行为数据行,无需删除
  从第4行开始加载数据,共160276行
  成功处理并转换:STK_LISTEDCOINFOCHG.xlsx -> STK_LISTEDCOINFOCHG.csv

处理单个文件: STK_LISTEDCOINFOANL.xlsx
  成功读取文件 STK_LISTEDCOINFOANL.xlsx,数据有 64174 行 40 列
  文件中第四行为数据行,无需删除
  从第4行开始加载数据,共64171行
  成功处理并转换:STK_LISTEDCOINFOANL.xlsx -> STK_LISTEDCOINFOANL.csv

处理单个文件: 常用变量查询(年度).xlsx
  成功读取文件 常用变量查询(年度).xlsx,数据有 61459 行 33 列
  文件中第四行为数据行,无需删除
  从第4行开始加载数据,共61456行
  成功处理并转换:常用变量查询(年度).xlsx -> 常用变量查询(年度).csv

Excel 处理及 CSV 转换完成

42.1.4 3. 数据分析

42.1.4.1 问题 A:年度统计量计算


class DataPreprocessor:
    """财务数据前置处理工具,用于加载、预处理和存储关键数据"""
    def __init__(self, data_path: str = 'csv_files', output_path: str = 'output') -> None:
        self.data_path = Path(data_path)
        self.output_path = Path(output_path)
        self.output_path.mkdir(parents=True, exist_ok=True)
        self.data = {}
        self.merged_data = None  # 合并后的原始数据(含行业、年份等)
        self.metrics_data = None  # 计算后的指标数据
        
    def load_data(self, file_mapping: Optional[Dict[str, Dict]] = None) -> pd.DataFrame:
        default_files = {
            "资产负债表": {"filename": "资产负债表_沪深京股票(年频)_total.csv"},
            "利润表": {"filename": "跨表查询_沪深京股票(年频)_total.csv"},
            "公司信息": {"filename": "STK_LISTEDCOINFOANL.csv"},
            "股权结构": {"filename": "常用变量查询(年度).csv"}
        }
        files = default_files.copy()
        if file_mapping:
            for data_type, mapping in file_mapping.items():
                if data_type in files:
                    files[data_type].update(mapping)
                else:
                    files[data_type] = mapping
        
        default_column_mappings = {
            "资产负债表": {'证券代码': 'Symbol', '时间': 'Year', '行业代码D': 'IndustryCodeD',
                          '行业名称D': 'IndustryNameD', '资产总计': 'TotalAssets', 
                          '负债合计': 'TotalLiabilities', '流动负债合计': 'CurrentLiabilities', 
                          '非流动负债合计': 'NonCurrentLiabilities', '货币资金': 'Cash', 
                          '短期借款': 'ShortTermLoans', '长期借款': 'LongTermLoans'},
            "利润表": {'证券代码': 'Symbol', '时间': 'Year', '净利润': 'NetProfit', 
                      '营业收入': 'OperatingRevenue', '营业利润': 'OperatingProfit'},
            "公司信息": {'股票代码': 'Symbol', '行业代码D': 'IndustryCodeD', 
                        '行业名称D': 'IndustryNameD', '首次上市日期': 'LISTINGDATE'},  # 映射股票代码
            "股权结构": {'证券代码': 'Symbol', '时间': 'Year', 
                        '第一大股东持股比例': 'Top1', '前五大股东持股比例平方和': 'HHI5'}
        }
        
        for data_type, mapping in files.items():
            filename = mapping["filename"]
            file_path = self.data_path / filename
            column_mapping = default_column_mappings.get(data_type, {})
            if "column_mapping" in mapping:
                column_mapping.update(mapping["column_mapping"])
            
            try:
                # 股权结构数据使用特殊的读取方式,保持与代码1一致
                if data_type == "股权结构":
                    df = pd.read_csv(file_path)
                    if '证券代码' in df.iloc[0].values or 'Symbol' in df.iloc[0].values:
                        df = pd.read_csv(file_path, header=1)
                else:
                    # 其他数据类型使用代码2的读取方式
                    df = pd.read_csv(file_path, header=1, skiprows=[2])
                
                df = df.rename(columns=column_mapping)
                
                
                self.data[data_type] = self._preprocess_data(df, data_type)
                print(f"✅ 成功加载{data_type}数据,共{len(df)}行")
            except Exception as e:
                print(f"❌ 加载{data_type}数据失败: {e}")
                self.data[data_type] = None
        
        self._merge_data()
        self._validate_data()
        return self.merged_data
    
    def _preprocess_data(self, df: pd.DataFrame, data_type: str) -> pd.DataFrame:
        """预处理单个数据文件"""
        for col in df.select_dtypes(include=['object']).columns:
            if col not in ['Symbol', 'IndustryCodeD', 'IndustryNameD', 'LISTINGDATE']:
                try:
                    df[col] = pd.to_numeric(
                        df[col].astype(str).str.replace('元', '').str.replace(',', '').str.replace('--', 'nan'),
                        errors='coerce'
                    )
                except:
                    pass
                
        # 处理日期列
        if 'LISTINGDATE' in df.columns:
            df['LISTINGDATE'] = pd.to_datetime(df['LISTINGDATE'], errors='coerce')
            
        if 'Year' in df.columns:
            df['Year'] = pd.to_numeric(df['Year'], errors='coerce').astype('Int64')
            
        return df
    
    def _merge_data(self) -> None:
        if self.data["资产负债表"] is None or self.data["利润表"] is None:
            raise ValueError("资产负债表和利润表数据是必需的")
            
        self.merged_data = pd.merge(
            self.data["资产负债表"],
            self.data["利润表"][["Symbol", "Year", "NetProfit"]],
            on=["Symbol", "Year"],
            how="left"
        )
        
        # 合并公司信息表
        if self.data["公司信息"] is not None:
            company_columns = ["Symbol", "IndustryCodeD", "IndustryNameD", "LISTINGDATE"]
            available_columns = [col for col in company_columns if col in self.data["公司信息"].columns]
            if available_columns:
                company_info = self.data["公司信息"][available_columns]
                self.merged_data = pd.merge(self.merged_data, company_info, on="Symbol", how="left")
        
        if self.data["股权结构"] is not None:
            ownership_columns = ["Symbol", "Year", "Top1", "HHI5"]
            available_columns = [col for col in ownership_columns if col in self.data["股权结构"].columns]
            if available_columns:
                ownership_data = self.data["股权结构"][available_columns]
                self.merged_data = pd.merge(self.merged_data, ownership_data, on=["Symbol", "Year"], how="left")
                
    
                
        # 计算Age指标(基于精确日期差值,只保留2000年后的数据)
        if self.data["公司信息"] is not None and "LISTINGDATE" in self.merged_data.columns:
            if not self.merged_data["LISTINGDATE"].isna().all():
                # 提取数据中的最大年份作为最晚统计截止日期
                max_year = self.merged_data["Year"].max()
                if pd.notna(max_year):
                    latest_date = pd.Timestamp(f"{max_year}-12-31")
                    
                    # 为每个公司生成从上市到最晚统计截止日期的所有年份数据
                    all_years_data = []
                    for _, row in self.merged_data.drop_duplicates(subset=['Symbol']).iterrows():
                        if pd.notna(row['LISTINGDATE']):
                            listing_year = row['LISTINGDATE'].year
                            
                            # 只生成2000年及以后的数据
                            start_year = max(2000, listing_year)
                            
                            for year in range(start_year, max_year + 1):
                                year_date = pd.Timestamp(f"{year}-12-31")
                                days_diff = (year_date - row['LISTINGDATE']).days
                                age = round(days_diff / 365.25, 2)
                                
                                all_years_data.append({
                                    'Symbol': row['Symbol'],
                                    'Year': year,
                                    'Age': age
                                })
                    
                    # 创建包含所有年份的Age DataFrame
                    age_df = pd.DataFrame(all_years_data)
                    
                    # 将Age数据合并回主数据
                    self.merged_data = pd.merge(
                        self.merged_data,
                        age_df,
                        on=['Symbol', 'Year'],
                        how='left'
                    )
                    
                    # 只保留2000年及以后的数据
                    self.merged_data = self.merged_data[self.merged_data['Year'] >= 2000]
                    
                    # 计算平均Age
                    valid_ages = self.merged_data['Age'].dropna()
                    if not valid_ages.empty:
                        max_age = valid_ages.max()
                        print(f"✅ 2000-{max_year}年公司最大上市年限为: {max_age:.2f}年")
                    else:
                        print("⚠️ 无法计算平均上市年限:没有有效数据")
                else:
                    print("⚠️ 无法确定最晚统计截止日期:年份数据缺失")
            else:
                print("⚠️ 公司信息表中LISTINGDATE全部缺失,无法计算Age指标")
                self.merged_data.drop(columns=["LISTINGDATE"], inplace=True, errors="ignore")
        else:
            print("⚠️ 未加载公司信息表或缺少LISTINGDATE字段,无法计算Age指标")

    def _validate_data(self) -> None:
        """验证合并后的数据质量"""
        if self.merged_data is None:
            return
        
        # 检查关键指标缺失率
        key_metrics = ["TotalAssets", "TotalLiabilities", "NetProfit", "Age", "Top1", "HHI5"]
        for metric in key_metrics:
            if metric in self.merged_data.columns:
                missing_rate = self.merged_data[metric].isna().mean() * 100
                print(f"{metric}缺失率: {missing_rate:.2f}%")
    
    def save_merged_data(self, filename: str = "merged_data.csv") -> None:
        if self.merged_data is not None:
            file_path = self.output_path / filename
            self.merged_data.to_csv(file_path, index=False)
            print(f"✅ 合并数据已保存至: {file_path}")
    
    def save_metrics_data(self, filename: str = "metrics_data.csv") -> None:
        if self.metrics_data is not None:
            file_path = self.output_path / filename
            self.metrics_data.to_csv(file_path, index=False)
            print(f"✅ 指标数据已保存至: {file_path}")

### 指标计算与分析(保存指标数据)
class FinancialAnalyzer:
    """财务指标计算与分析工具"""
    def __init__(self, data: pd.DataFrame, output_path: str = 'output') -> None:
        self.data = data
        self.output_path = Path(output_path)
        self.output_path.mkdir(parents=True, exist_ok=True)
        self.metrics_data = None
        
    def calculate_metrics(self) -> pd.DataFrame:
        if self.data is None or len(self.data) == 0:
            raise ValueError("请提供有效的DataFrame")
            
        df = self.data.copy()
        
        # 指标计算函数
        def calculate_lev(df):
            if "TotalLiabilities" in df.columns and "TotalAssets" in df.columns:
                df["Lev"] = df["TotalLiabilities"] / df["TotalAssets"]
                return True
            return False
            
        def _calculate_ll(df):
            if "NonCurrentLiabilities" in df.columns and "TotalAssets" in df.columns:
                df["LL"] = df["NonCurrentLiabilities"] / df["TotalAssets"]
                return True
            return False
            
        def _calculate_sl(df):
            if "CurrentLiabilities" in df.columns and "TotalAssets" in df.columns:
                df["SL"] = df["CurrentLiabilities"] / df["TotalAssets"]
                return True
            return False
            
        def _calculate_sdr(df):
            if "CurrentLiabilities" in df.columns and "TotalLiabilities" in df.columns:
                df["SDR"] = df["CurrentLiabilities"] / df["TotalLiabilities"]
                return True
            return False
            
        def _calculate_cash(df):
            if "Cash" in df.columns and "TotalAssets" in df.columns:
                df["Cash"] = df["Cash"] / df["TotalAssets"]
                return True
            return False
            
        def _calculate_roa(df):
            if "NetProfit" in df.columns and "TotalAssets" in df.columns:
                df["ROA"] = df["NetProfit"] / df["TotalAssets"]
                return True
            return False
            
        def _calculate_roe(df):
            if "NetProfit" in df.columns and "TotalAssets" in df.columns and "TotalLiabilities" in df.columns:
                equity = df["TotalAssets"] - df["TotalLiabilities"]
                valid_mask = (equity > 0) & (~equity.isna())
                df.loc[valid_mask, "ROE"] = df.loc[valid_mask, "NetProfit"] / equity[valid_mask]
                return valid_mask.sum() > 0
            return False
            
        def _calculate_sloan(df):
            if "ShortTermLoans" in df.columns and "TotalAssets" in df.columns:
                df["SLoan"] = df["ShortTermLoans"] / df["TotalAssets"]
                return True
            return False
            
        def _calculate_lloan(df):
            if "LongTermLoans" in df.columns and "TotalAssets" in df.columns:
                df["LLoan"] = df["LongTermLoans"] / df["TotalAssets"]
                return True
            return False
            
        def _calculate_size(df):
            if "TotalAssets" in df.columns:
                df["Size"] = np.log(df["TotalAssets"])
                return True
            return False
            
        # 执行指标计算
        metrics_status = {
            "Lev": calculate_lev(df),
            "SL": _calculate_sl(df),
            "LL": _calculate_ll(df),
            "SDR": _calculate_sdr(df),
            "Cash": _calculate_cash(df),
            "ROA": _calculate_roa(df),
            "ROE": _calculate_roe(df),
            "SLoan": _calculate_sloan(df),
            "LLoan": _calculate_lloan(df),
            "Top1": "Top1" in df.columns,
            "HHI5": "HHI5" in df.columns,
            "Size": _calculate_size(df),
            "Age": "Age" in df.columns  # 添加Age指标
        }
        
        # 处理离群值
        available_metrics = [metric for metric, status in metrics_status.items() if status]
        for col in available_metrics:
            if col in df.columns and not df[col].isna().all() and not col.startswith("Age"):
                df[col] = df.groupby('Year')[col].transform(lambda x: x.clip(x.quantile(0.01), x.quantile(0.99)))
                print(f"✅ {col}指标已按年份动态缩尾处理(1%-99%)")
        
        self.metrics_data = df
        print(f"财务指标计算完成,成功计算了{len(available_metrics)}个指标")
        
        return df
    
    # 辅助指标计算函数(保持不变)
    def _calculate_sl(self, df):
        if "CurrentLiabilities" in df.columns and "TotalAssets" in df.columns:
            df["SL"] = df["CurrentLiabilities"] / df["TotalAssets"]
            return True
        return False
    
    # 其他辅助函数不变
    
    def generate_analysis_report(self, metrics: List[str] = None) -> None:
        if self.metrics_data is None:
            self.calculate_metrics()
            
        report_intro = """
数据处理说明:
1. 所有财务指标均经过1%-99%分位缩尾处理,以剔除极端值影响;
2. ROE计算时剔除了净资产为零或负值的企业,具体剔除比例见各年度统计;
3. 上市年龄(Age)基于首次上市日期(LISTINGDATE)计算,精确到天,保留两位小数;
4. 分析数据范围为2000年及以后的公司信息。
        \n"""
        
        if metrics is None:
            metrics = ["Lev", "SL", "LL", "SDR", "Cash", "ROA", "ROE", "SLoan", "LLoan", "Size"]
            if "Top1" in self.metrics_data.columns:
                metrics.append("Top1")
            if "HHI5" in self.metrics_data.columns:
                metrics.append("HHI5")
            
                
        report_path = self.output_path / "analysis_report.txt"
        with open(report_path, "w", encoding="utf-8") as f:
            f.write("财务指标分析报告\n")
            f.write("=" * 50 + "\n")
            f.write(report_intro.lstrip())  # 左对齐
            f.write("\n一、数据集概述\n")
            f.write("=" * 50 + "\n\n")
            f.write(f"分析期间: {self.metrics_data['Year'].min()} - {self.metrics_data['Year'].max()}\n")
            f.write(f"公司数量: {self.metrics_data['Symbol'].nunique()}\n")
            f.write(f"观测值总数: {len(self.metrics_data)}\n\n")
            
            f.write("二、各指标年度统计特征\n")
            f.write("=" * 50 + "\n\n")
            
            # 1. 常规指标统计(纵向年份,横向指标)
            f.write("1. 财务指标年度统计(纵向年份,横向指标)\n")
            f.write("-" * 50 + "\n\n")
            
            # 提取所有年份
            years = sorted(self.metrics_data["Year"].dropna().unique())
            
            # 计算每个指标的统计量
            for metric in metrics:
                if metric in self.metrics_data.columns and not self.metrics_data[metric].isna().all():
                    # 按年份计算统计量
                    if metric == "ROE":
                        # 特殊处理ROE(显示剔除比例)
                        yearly_stats = self.metrics_data.groupby("Year")[metric].agg(["mean", "median", "std", "min", "max"])
                        f.write(f"{metric}的年度统计特征(剔除{self.metrics_data[metric].isna().sum()}条无效记录):\n")
                    else:
                        yearly_stats = self.metrics_data.groupby("Year")[metric].agg(["mean", "median", "std", "min", "max"])
                        f.write(f"{metric}的年度统计特征:\n")
                    
                    # 转置表格,使年份为列,统计量为行
                    transposed_stats = yearly_stats.T
                    f.write(transposed_stats.to_string() + "\n\n")
                    
                    # 趋势分析
                    if "mean" in yearly_stats.columns:
                        trend = yearly_stats["mean"].pct_change().mean()
                        f.write(f"趋势分析: 总体{'上升' if trend > 0 else '下降'},年平均变化率为{trend*100:.2f}%\n\n")
                else:
                    f.write(f"{metric}指标不可用或全部为NaN\n\n")
            
            # 2. Age指标统计(纵向年份,横向指标)
            
            if "Age" in self.metrics_data.columns and not self.metrics_data["Age"].isna().all():
                # 创建一个DataFrame来保存各年份Age的统计量
                age_stats_df = self.metrics_data.groupby("Year")["Age"].describe()
                
                if not age_stats_df.empty:
                    # 转置表格,使年份为列,统计量为行
                    transposed_age_stats = age_stats_df.T
                    f.write("Age的年度统计特征:\n")
                    # 去掉count这一行
                    transposed_age_stats_no_count = transposed_age_stats.drop(index='count', errors='ignore')
                    f.write(transposed_age_stats_no_count.to_string() + "\n\n")
                else:
                    f.write("⚠️ 无有效Age数据\n\n")
            else:
                f.write("⚠️ 未计算Age指标或数据全部缺失\n\n")
                    
            f.write("三、指标分析总结\n")
            f.write("=" * 50 + "\n\n")
            
            # 资本结构指标总结
            f.write("1. 资本结构指标:\n")
            if "Lev" in self.metrics_data.columns:
                avg_lev = self.metrics_data["Lev"].mean()
                f.write(f"   - 总负债率(Lev)平均为{avg_lev:.4f},表明企业平均使用{avg_lev*100:.2f}%的负债融资\n")
                
            if "SL" in self.metrics_data.columns and "LL" in self.metrics_data.columns:
                avg_sl = self.metrics_data["SL"].mean()
                avg_ll = self.metrics_data["LL"].mean()
                f.write(f"   - 流动负债率(SL)平均为{avg_sl:.4f},长期负债率(LL)平均为{avg_ll:.4f},表明企业短期负债占比较高\n")
                
            if "SDR" in self.metrics_data.columns:
                avg_sdr = self.metrics_data["SDR"].mean()
                f.write(f"   - 短债比率(SDR)平均为{avg_sdr:.4f},表明{avg_sdr*100:.2f}%的负债为短期负债\n")
            
            # 盈利能力指标总结
            f.write("\n2. 盈利能力指标:\n")
            if "ROA" in self.metrics_data.columns:
                avg_roa = self.metrics_data["ROA"].mean()
                f.write(f"   - 资产回报率(ROA)平均为{avg_roa:.4f},表明企业资产运营效率一般\n")
                
            if "ROE" in self.metrics_data.columns:
                avg_roe = self.metrics_data["ROE"].mean()
                f.write(f"   - 净资产收益率(ROE)平均为{avg_roe:.4f},表明股东权益回报水平一般\n")
                
            # 流动性指标总结
            f.write("\n3. 流动性指标:\n")
            if "Cash" in self.metrics_data.columns:
                avg_cash = self.metrics_data["Cash"].mean()
                f.write(f"   - 现金比率(Cash)平均为{avg_cash:.4f},表明企业流动性储备水平一般\n")
                
            # 银行借款指标总结
            f.write("\n4. 银行借款指标:\n")
            if "SLoan" in self.metrics_data.columns and "LLoan" in self.metrics_data.columns:
                avg_sloan = self.metrics_data["SLoan"].mean()
                avg_lloan = self.metrics_data["LLoan"].mean()
                f.write(f"   - 短期银行借款比率(SLoan)平均为{avg_sloan:.4f},长期银行借款比率(LLoan)平均为{avg_lloan:.4f}\n")
                
            # 股权结构指标总结
            f.write("\n5. 股权结构指标:\n")
            if "Top1" in self.metrics_data.columns:
                avg_top1 = self.metrics_data["Top1"].mean()
                f.write(f"   - 第一大股东持股比例(Top1)平均为{avg_top1:.4f},表明股权集中度较高\n")
                
            if "HHI5" in self.metrics_data.columns:
                avg_hhi5 = self.metrics_data["HHI5"].mean()
                f.write(f"   - 前五大股东赫芬达尔指数(HHI5)平均为{avg_hhi5:.4f},表明前五大股东集中度较高\n")
                
            # 公司特征指标总结
            f.write("\n6. 公司特征指标:\n")
            if "Size" in self.metrics_data.columns:
                avg_size = self.metrics_data["Size"].mean()
                avg_assets = np.exp(avg_size)
                f.write(f"   - 公司规模(Size)平均为{avg_size:.4f},对应的平均总资产为{avg_assets:.2e}\n")
                
            # 上市年限总结
            if "Age" in self.metrics_data.columns and not self.metrics_data["Age"].isna().all():
                f.write("\n7. 上市年限总结:\n")
                avg_age = self.metrics_data["Age"].mean()
                f.write(f"   - 样本公司平均上市年限为{avg_age:.2f}\n")
                
                # 分析上市年限趋势
                age_trend = self.metrics_data.groupby("Year")["Age"].mean()
                if len(age_trend) > 1:
                    trend_change = age_trend.pct_change().mean()
                    f.write(f"   - 上市年限年平均变化率为{trend_change*100:.2f}%{',呈上升趋势' if trend_change > 0 else ',呈下降趋势'}\n")
        
        print(f"✅ 分析报告已生成: {report_path}")

### 执行分析(仅保存必要文件)
if __name__ == "__main__":
    custom_file_mapping = {
        "股权结构": {
            "filename": "常用变量查询(年度).csv",
            "column_mapping": {
                "Stkcd": "Symbol",
                "accper": "Year",
                "Shrcr1": "Top1",
                "Shrhfd5": "HHI5"
            }
        }
    }
    
    try:
        print("\n===== 数据加载与预处理 =====")
        preprocessor = DataPreprocessor(output_path="csv_files")
        data = preprocessor.load_data(custom_file_mapping)
        
        if data is None:
            print("❌ 数据加载失败")
            exit()
            
        preprocessor.save_merged_data()
        analyzer = FinancialAnalyzer(data, output_path="output")
        analyzer.calculate_metrics()
        preprocessor.metrics_data = analyzer.metrics_data
        preprocessor.save_metrics_data()
        analyzer.generate_analysis_report()
        
        print("\n===== 分析完成 =====")
        print("已保存文件:")
        print(f"  1. {preprocessor.output_path / 'merged_data.csv'}")
        print(f"  2. {preprocessor.output_path / 'metrics_data.csv'}")
        print(f"  3. {analyzer.output_path / 'analysis_report.txt'}")

    except Exception as e:
        print(f"❌ 错误: {str(e)}")
        import traceback
        traceback.print_exc()

===== 数据加载与预处理 =====
✅ 成功加载资产负债表数据,共145826行
✅ 成功加载利润表数据,共145826行
✅ 成功加载公司信息数据,共64172行
✅ 成功加载股权结构数据,共61459行
✅ 2000-2024年公司最大上市年限为: 34.06年
TotalAssets缺失率: 29.68%
TotalLiabilities缺失率: 29.68%
NetProfit缺失率: 29.68%
Age缺失率: 29.24%
Top1缺失率: 59.42%
HHI5缺失率: 59.42%
✅ 合并数据已保存至: csv_files/merged_data.csv
✅ Lev指标已按年份动态缩尾处理(1%-99%)
✅ SL指标已按年份动态缩尾处理(1%-99%)
✅ LL指标已按年份动态缩尾处理(1%-99%)
✅ SDR指标已按年份动态缩尾处理(1%-99%)
✅ Cash指标已按年份动态缩尾处理(1%-99%)
✅ ROA指标已按年份动态缩尾处理(1%-99%)
✅ ROE指标已按年份动态缩尾处理(1%-99%)
✅ SLoan指标已按年份动态缩尾处理(1%-99%)
✅ LLoan指标已按年份动态缩尾处理(1%-99%)
✅ Top1指标已按年份动态缩尾处理(1%-99%)
✅ HHI5指标已按年份动态缩尾处理(1%-99%)
✅ Size指标已按年份动态缩尾处理(1%-99%)
财务指标计算完成,成功计算了13个指标
✅ 指标数据已保存至: csv_files/metrics_data.csv
✅ 分析报告已生成: output/analysis_report.txt

===== 分析完成 =====
已保存文件:
  1. csv_files/merged_data.csv
  2. csv_files/metrics_data.csv
  3. output/analysis_report.txt

问题 B:时序图绘制

B1. 横轴为 年份,纵轴为 Lev 的均值和中位数。
B2. 横轴为 年份,纵轴为 ROA 和 Cash 的均值 (若有必要可以考虑使用两个纵坐标)。

# 设置图片清晰度
plt.rcParams['figure.dpi'] = 300

# 读取数据
metrics_data = pd.read_csv(Path("csv_files/metrics_data.csv"))
merged_data = pd.read_csv(Path("csv_files/merged_data.csv"))

### B1. Lev的均值和中位数时序图
lev_stats = metrics_data.groupby("Year")["Lev"].agg(["mean", "median"]).reset_index()

plt.figure(figsize=(12, 6))
plt.plot(lev_stats["Year"], lev_stats["mean"], marker='o', label='均值')
plt.plot(lev_stats["Year"], lev_stats["median"], marker='s', label='中位数')
plt.title("Lev的均值和中位数时序图")
plt.xlabel("年份")
plt.ylabel("Lev")
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig(Path("output/lev_timeseries.png"))
plt.show()

print("\nB1分析:")
print(f"Lev均值范围: {lev_stats['mean'].min():.4f} - {lev_stats['mean'].max():.4f}")
print(f"Lev中位数范围: {lev_stats['median'].min():.4f} - {lev_stats['median'].max():.4f}")
print(f"趋势变化: {lev_stats['mean'].pct_change().dropna().mean()*100:.2f}%/年")

### B2. ROA和Cash的均值时序图
roa_cash_stats = metrics_data.groupby("Year")[["ROA", "Cash"]].mean().reset_index()

fig, ax1 = plt.subplots(figsize=(12, 6))

# ROA使用左侧Y轴
color = 'tab:blue'
ax1.set_xlabel('年份')
ax1.set_ylabel('ROA', color=color)
ax1.plot(roa_cash_stats["Year"], roa_cash_stats["ROA"], marker='o', color=color, label='ROA')
ax1.tick_params(axis='y', labelcolor=color)

# Cash使用右侧Y轴
ax2 = ax1.twinx()
color = 'tab:red'
ax2.set_ylabel('Cash', color=color)
ax2.plot(roa_cash_stats["Year"], roa_cash_stats["Cash"], marker='s', color=color, label='Cash')
ax2.tick_params(axis='y', labelcolor=color)

plt.title("ROA和Cash的均值时序图")
plt.grid(True, linestyle='--', alpha=0.7)
fig.tight_layout()
plt.savefig(Path("output/roa_cash_timeseries.png"))
plt.show()

print("\nB2分析:")
print(f"ROA均值范围: {roa_cash_stats['ROA'].min():.4f} - {roa_cash_stats['ROA'].max():.4f}")
print(f"Cash均值范围: {roa_cash_stats['Cash'].min():.4f} - {roa_cash_stats['Cash'].max():.4f}")
print(f"ROA与Cash相关系数: {roa_cash_stats['ROA'].corr(roa_cash_stats['Cash']):.4f}")


B1分析:
Lev均值范围: 0.4299 - 0.5897
Lev中位数范围: 0.4156 - 0.5433
趋势变化: 0.44%/年


B2分析:
ROA均值范围: -0.0019 - 0.0482
Cash均值范围: 0.1385 - 0.2307
ROA与Cash相关系数: 0.6218

问题 C:负债率的行业特征分析


# 配置输出路径
output_path = Path("output")
output_path.mkdir(exist_ok=True, parents=True)
csv_path = Path("csv_files")
csv_path.mkdir(exist_ok=True, parents=True)

### 1. 数据加载与清洗
def load_and_clean_data(merged_data_path: str = "csv_files/merged_data.csv") -> pd.DataFrame:
    """加载合并数据并执行清洗"""
    try:
        merged_data = pd.read_csv(merged_data_path)
    except FileNotFoundError:
        raise FileNotFoundError("请先运行数据预处理,生成merged_data.csv")
    
    # 数据清洗
    merged_data = merged_data.drop_duplicates()  # 去重
    merged_data = merged_data[merged_data['Year'] >= 2000]  # 筛选2000年后数据
    merged_data = merged_data.reset_index(drop=True)
    
    # 检查关键财务字段
    financial_cols = ['TotalAssets', 'TotalLiabilities', 'NetProfit', 'ShortTermLoans', 'LongTermLoans']
    for col in financial_cols:
        if col not in merged_data.columns:
            raise ValueError(f"缺失财务字段: {col}")
    
    return merged_data

### 2. 指标计算函数
def calculate_financial_metrics(df: pd.DataFrame) -> pd.DataFrame:
    """计算财务指标"""
    # 负债率
    df['Lev'] = df['TotalLiabilities'] / df['TotalAssets']
    
    # 短期/长期借款比率
    df['SLoan'] = df['ShortTermLoans'] / df['TotalAssets']
    df['LLoan'] = df['LongTermLoans'] / df['TotalAssets']
    
    # 资产回报率 (ROA)
    df['ROA'] = df['NetProfit'] / df['TotalAssets']
    
    # 净资产回报率 (ROE),处理分母为0的情况
    mask = (df['TotalAssets'] - df['TotalLiabilities']) != 0
    df['ROE'] = np.nan
    df.loc[mask, 'ROE'] = df.loc[mask, 'NetProfit'] / (df.loc[mask, 'TotalAssets'] - df.loc[mask, 'TotalLiabilities'])
    
    # 计算Cash(使用实际数据中的Cash字段)
    df["Cash"] = df["Cash"] / df["TotalAssets"]  # 计算现金占总资产比例
    
    # 处理异常值(缩尾处理),增加Cash
    for col in ['Lev', 'SLoan', 'LLoan', 'ROA', 'ROE', 'Cash']:
        if col in df.columns:
            p1 = df[col].quantile(0.01)
            p99 = df[col].quantile(0.99)
            df[col] = df[col].clip(p1, p99)
            
    return df

### 3. 行业数据筛选与映射
def filter_industry_data(
    df: pd.DataFrame,
    target_industries: dict = {
        'C': '制造业',
        'D': '电力、热力、燃气及水生产和供应业',
        'G': '交通运输业',
        'E': '建筑业',
        'K': '房地产业',
        'F': '批发和零售业',
        'J': '金融业'
    },
    industry_col: str = 'IndustryCodeD'  # 可切换为'IndustryNameD'
) -> pd.DataFrame:
    """筛选目标行业数据"""
    # 处理行业代码映射(示例:J66 -> J)
    if industry_col == 'IndustryCodeD':
        df[industry_col] = df[industry_col].astype(str).str[0]  # 取代码首字母
    
    # 筛选有效行业
    df = df[df[industry_col].isin(target_industries.keys())].copy()
    df['IndustryName'] = df[industry_col].map(target_industries)  # 映射行业名称
    
    # 检查数据完整性
    if df.empty:
        raise ValueError("未找到目标行业数据,请检查行业代码/名称映射")
    
    return df

### 4. 绘制时序图(增强版,含数据校验)
def plot_industry_lev(
    df: pd.DataFrame,
    title: str,
    save_path: str,
    weighted: bool = False
) -> None:
    """绘制行业负债率时序图(增强数据校验)"""
    group_col = 'IndustryName'
    
    # 基础校验
    if 'Lev' not in df.columns:
        raise ValueError("'Lev'列不存在,请确认指标计算完成")
    
    # 加权模式特殊校验
    if weighted:
        if 'TotalAssets' not in df.columns:
            raise ValueError("加权模式需要'TotalAssets'列,请检查数据")
        valid_data = df[df['TotalAssets'] > 0].copy()  # 过滤总资产≤0的记录
        if valid_data.empty:
            print(f"警告:{title}中,所有公司总资产为0或缺失,无法生成图表")
            return
    else:
        valid_data = df.copy()
    
    # 计算统计量
    try:
        if weighted:
            lev_stats = valid_data.groupby(['Year', group_col]).apply(
                lambda x: np.average(x['Lev'], weights=x['TotalAssets'])
            ).unstack()
        else:
            lev_stats = valid_data.groupby(['Year', group_col])['Lev'].mean().unstack()
    except Exception as e:
        print(f"计算错误: {str(e)}")
        return
    
    # 检查结果是否有效
    if lev_stats.empty or lev_stats.isna().all().all():
        print(f"警告:{title}中无有效数据")
        return
    
    # **定义标记形状和颜色**
    markers = ['o', '^', 's', 'd', 'p', 'v', '8', 'h', 'H', 'D']  # 扩展更多形状
    colors = ['#165DFF', "#025F2A", '#722ED1', '#FF7D00', '#F53F3F', "#4AAAA8", "#EB0091", '#00FF00', '#FF00FF', '#00FFFF']
    
    # 绘图
    plt.figure(figsize=(14, 8))
    for i, industry in enumerate(lev_stats.columns):
        series = lev_stats[industry].dropna()
        if len(series) == 0:
            continue
        # 动态分配标记和颜色
        marker = markers[i % len(markers)]
        color = colors[i % len(colors)]  # 若行业数超过颜色数,自动循环
        plt.plot(
            series.index, series.values,
            marker=marker,
            markerfacecolor='none',  # 空心标记
            markeredgecolor=color,    # 标记边框颜色与线条一致
            markeredgewidth=1.5,      # 边框宽度
            markersize=6,             # 标记大小
            linewidth=2, 
            label=industry, 
            color=color
        )
    
    # 设置图例
    plt.legend(
        fontsize=10, 
        loc='upper left', 
        title='行业', 
        title_fontsize=11,
        frameon=True,  # 显示图例边框(可选)
        ncol=2  # 图例分两列显示(可选)
    )
    
    # 检查绘图元素(原有逻辑)
    if not plt.gca().lines:
        print(f"警告:{title}中无有效数据点")
        return
    
    plt.title(title, fontsize=14)
    plt.xlabel('年份', fontsize=12)
    plt.ylabel('负债率 (Lev)' + ('(加权平均)' if weighted else '(算数平均)'), fontsize=12)
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.legend(fontsize=10, loc='upper left')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig(output_path / save_path)
    plt.show()
    print(f"✅ {title} 已保存至 {output_path / save_path}")
    plt.close()

### 5. 统计分析与结果保存
def analyze_and_save_metrics(
    df: pd.DataFrame,
    selected_years: list = [2001, 2003, 2005, 2007, 2009, 2011, 2013, 2015, 2017, 2019, 2021, 2023]
) -> None:
    """分析特定年份指标并保存结果"""
    # 筛选年份
    selected_data = df[df['Year'].isin(selected_years)]
    
    # 检查指标列完整性
    required_metrics = ['SLoan', 'LLoan', 'Lev', 'ROA', 'ROE', 'Cash']
    for metric in required_metrics:
        if metric not in selected_data.columns:
            print(f"警告:缺失指标列 {metric},跳过保存")
            return
    
    # 计算行业-年份均值
    metrics = selected_data.groupby(['Year', 'IndustryName']).agg({
        'SLoan': 'mean',
        'LLoan': 'mean',
        'Lev': 'mean',
        'ROA': 'mean',
        'ROE': 'mean',
        'Cash': 'mean'
    }).reset_index()
    
    # 保存结果
    metrics.to_csv(csv_path / "industry_metrics_by_year.csv", index=False)
    print(f"✅ 行业指标已保存至: {csv_path / 'industry_metrics_by_year.csv'}")
    
    return metrics

### 6. 保存行业表格数据
def save_tables_to_txt(metrics_data, target_industries, output_file: str = "industry_metrics.txt"):
    txt_path = output_path / output_file
    with open(txt_path, "w", encoding="utf-8") as f:
        for industry_code, industry_name in target_industries.items():
            industry_df = metrics_data[metrics_data['IndustryName'] == industry_name]
            if industry_df.empty:
                continue
            
            # 格式化数据(Year保持整数,其他指标两位小数,转换为字符串并补空格)
            formatted_df = industry_df.copy()
            # 年份列:宽度4,右对齐
            formatted_df['Year'] = formatted_df['Year'].astype(int).apply(lambda x: f"{x:4d}")  
            # 指标列:宽度8,右对齐,保留两位小数
            for col in ['SLoan', 'LLoan', 'Lev', 'ROA', 'ROE', 'Cash']:
                formatted_df[col] = formatted_df[col].map(lambda x: f"{x:8.2f}")
            
            # 转置表格
            transposed_df = formatted_df.set_index('Year').T
            
            # 调整列顺序(Cash放最后)
            required_metrics = ['SLoan', 'LLoan', 'Lev', 'ROA', 'ROE', 'Cash']
            available_metrics = [m for m in required_metrics if m in transposed_df.index]
            if available_metrics:
                transposed_df = transposed_df.loc[available_metrics]
            
            # 构建表头(指标名称,宽度8,右对齐)
            header = "    ".join(["指标"] + list(formatted_df['Year'].values))
            # 构建表格内容
            table_title = f"\n===== {industry_name}({industry_code}) 财务指标 =====\n"
            table_content = "\n".join([f"{row.name}{''.join(row.values)}" for _, row in transposed_df.iterrows()])
            
            f.write(table_title)
            f.write(header + "\n")
            f.write(table_content + "\n\n")
    
    print(f"✅ 表格已保存至: {txt_path},使用等宽字体查看可确保对齐")

### 在图表下方展示表格和分析结论
def display_metrics_tables(metrics_data, target_industries):
    for industry_code, industry_name in target_industries.items():
        if industry_name in metrics_data['IndustryName'].values:
            industry_df = metrics_data[metrics_data['IndustryName'] == industry_name].copy()
            
            # 格式化数据(Year保持整数,其他指标两位小数)
            industry_df['Year'] = industry_df['Year'].astype(int)  # 新增:确保年份为整数
            for col in ['SLoan', 'LLoan', 'Lev', 'ROA', 'ROE', 'Cash']:
                industry_df[col] = industry_df[col].map(lambda x: f"{x:.2f}")
            
            # 转置表格并调整列顺序
            transposed_df = industry_df.set_index('Year').T
            cols = ['SLoan', 'LLoan', 'Lev', 'ROA', 'ROE', 'Cash']
            if set(cols).issubset(transposed_df.index):
                transposed_df = transposed_df.loc[cols]
            
            print(f"\n\n===== {industry_name}({industry_code}) 财务指标 =====")
            print(transposed_df.to_string(na_rep='nan'))
   
            
    
### 主函数执行
if __name__ == "__main__":
    try:
        # 1. 加载并清洗数据
        print("===== 加载并清洗数据 =====")
        merged_data = load_and_clean_data()
        
        # 2. 计算财务指标
        print("\n===== 计算财务指标 =====")
        metrics_data = calculate_financial_metrics(merged_data)
        
        # 3. 筛选目标行业数据
        print("\n===== 筛选目标行业 =====")
        target_industries = {
            'C': '制造业',
            'D': '电力、热力、燃气及水生产和供应业',
            'G': '交通运输业',
            'E': '建筑业',
            'K': '房地产业',
            'F': '批发和零售业',
            'J': '金融业'
        }
        industry_data = filter_industry_data(metrics_data, target_industries, industry_col='IndustryCodeD')
        
        # 4. 绘制算数平均负债率时序图
        print("\n===== 绘制算数平均负债率时序图 =====")
        plot_industry_lev(industry_data, "各行业算数平均负债率时序图", "industry_lev_arith.png")
        
        # 5. 绘制加权平均负债率时序图
        print("\n===== 绘制加权平均负债率时序图 =====")
        plot_industry_lev(industry_data, "各行业加权平均负债率时序图", "industry_lev_weighted.png", weighted=True)
        
        # 6. 分析特定年份指标并保存结果
        print("\n===== 分析特定年份指标 =====")
        metrics = analyze_and_save_metrics(industry_data)
        save_tables_to_txt(metrics, target_industries, output_file="industry_metrics.txt") 
        
        # 7. 展示表格和分析结论
        print("\n===== 展示行业财务指标表格和分析结论 =====")
        display_metrics_tables(metrics, target_industries)
        
        print("\n===== 任务执行完成 =====")
        print(f"结果文件已保存至: {output_path}")
        
    except Exception as e:
        print(f"\n❌ 执行失败: {str(e)}")
        import traceback
        traceback.print_exc()
===== 加载并清洗数据 =====

===== 计算财务指标 =====

===== 筛选目标行业 =====

===== 绘制算数平均负债率时序图 =====

✅ 各行业算数平均负债率时序图 已保存至 output/industry_lev_arith.png

===== 绘制加权平均负债率时序图 =====

✅ 各行业加权平均负债率时序图 已保存至 output/industry_lev_weighted.png

===== 分析特定年份指标 =====
✅ 行业指标已保存至: csv_files/industry_metrics_by_year.csv
✅ 表格已保存至: output/industry_metrics.txt,使用等宽字体查看可确保对齐

===== 展示行业财务指标表格和分析结论 =====


===== 制造业(C) 财务指标 =====
Year   2001  2003   2005  2007  2009  2011  2013  2015  2017  2019  2021  2023
SLoan  0.18  0.19   0.20  0.18  0.15  0.12  0.12  0.10  0.12  0.12  0.10  0.09
LLoan  0.05  0.05   0.05  0.04  0.05  0.04  0.03  0.03  0.06  0.06  0.06  0.07
Lev    0.45  0.49   0.54  0.53  0.50  0.43  0.43  0.41  0.40  0.42  0.41  0.40
ROA    0.02  0.02  -0.00  0.04  0.03  0.04  0.03  0.03  0.05  0.02  0.04  0.02
ROE    0.03  0.03  -0.01  0.08  0.05  0.06  0.05  0.04  0.08  0.03  0.06  0.02
Cash   0.18  0.16   0.14  0.15  0.20  0.24  0.18  0.17  0.17  0.16  0.18  0.20


===== 电力、热力、燃气及水生产和供应业(D) 财务指标 =====
Year   2001  2003  2005  2007  2009  2011  2013  2015  2017  2019  2021  2023
SLoan  0.13  0.16  0.16  0.17  0.15  0.12  0.11  0.10  0.12  0.11  0.09  0.08
LLoan  0.07  0.10  0.13  0.14  0.17  0.16  0.14  0.13  0.17  0.17  0.18  0.20
Lev    0.38  0.46  0.52  0.53  0.59  0.58  0.55  0.53  0.52  0.54  0.56  0.54
ROA    0.04  0.02  0.02  0.03  0.01  0.03  0.03  0.03  0.03  0.01  0.01  0.03
ROE    0.06  0.05  0.04  0.08  0.03  0.04  0.05  0.07  0.05  0.00  0.01  0.07
Cash   0.15  0.13  0.10  0.09  0.10  0.12  0.14  0.13  0.13  0.10  0.11  0.12


===== 交通运输业(G) 财务指标 =====
Year   2001  2003  2005  2007  2009  2011  2013  2015  2017  2019  2021  2023
SLoan  0.11  0.13  0.12  0.12  0.10  0.08  0.08  0.07  0.10  0.09  0.07  0.07
LLoan  0.08  0.07  0.07  0.09  0.11  0.10  0.10  0.10  0.13  0.14  0.14  0.15
Lev    0.36  0.36  0.40  0.44  0.44  0.45  0.45  0.45  0.43  0.44  0.45  0.44
ROA    0.04  0.05  0.05  0.07  0.05  0.04  0.03  0.03  0.05  0.03  0.03  0.03
ROE    0.05  0.06  0.08  0.12  0.06  0.07  0.07  0.07  0.09  0.06  0.05  0.06
Cash   0.16  0.16  0.14  0.13  0.15  0.15  0.14  0.13  0.14  0.13  0.14  0.14


===== 建筑业(E) 财务指标 =====
Year   2001   2003   2005  2007  2009  2011  2013  2015  2017  2019   2021   2023
SLoan  0.20   0.20   0.21  0.16  0.10  0.10  0.11  0.11  0.11  0.11   0.09   0.08
LLoan  0.04   0.05   0.05  0.05  0.06  0.06  0.07  0.06  0.09  0.09   0.09   0.10
Lev    0.55   0.59   0.65  0.65  0.64  0.59  0.65  0.63  0.61  0.66   0.67   0.68
ROA    0.01  -0.01  -0.02  0.01  0.02  0.03  0.02  0.02  0.02  0.01  -0.01  -0.01
ROE    0.04   0.03  -0.03  0.08  0.07  0.08  0.06  0.04  0.06  0.02  -0.08  -0.10
Cash   0.17   0.16   0.13  0.17  0.23  0.22  0.17  0.17  0.16  0.15   0.14   0.13


===== 房地产业(K) 财务指标 =====
Year    2001  2003   2005  2007  2009  2011  2013  2015  2017  2019   2021   2023
SLoan   0.22  0.23   0.22  0.14  0.08  0.07  0.07  0.07  0.08  0.07   0.06   0.06
LLoan   0.04  0.06   0.06  0.08  0.11  0.12  0.14  0.12  0.15  0.12   0.11   0.12
Lev     0.51  0.57   0.63  0.62  0.59  0.62  0.62  0.62  0.60  0.61   0.62   0.59
ROA     0.00  0.00  -0.03  0.04  0.04  0.03  0.03  0.01  0.02  0.01  -0.00  -0.00
ROE    -0.02  0.01  -0.02  0.03  0.10  0.09  0.08  0.04  0.05  0.03  -0.06  -0.04
Cash    0.15  0.14   0.11  0.15  0.18  0.13  0.14  0.15  0.16  0.14   0.14   0.13


===== 批发和零售业(F) 财务指标 =====
Year   2001  2003   2005  2007  2009  2011  2013  2015  2017  2019  2021  2023
SLoan  0.21  0.24   0.21  0.16  0.14  0.13  0.13  0.12  0.13  0.14  0.11  0.11
LLoan  0.03  0.03   0.03  0.04  0.04  0.04  0.04  0.03  0.07  0.07  0.07  0.07
Lev    0.51  0.57   0.61  0.59  0.58  0.56  0.55  0.53  0.51  0.54  0.53  0.52
ROA    0.01  0.00  -0.01  0.03  0.02  0.04  0.03  0.02  0.02  0.00  0.02  0.01
ROE    0.05  0.03  -0.04  0.08  0.06  0.08  0.06  0.03  0.02  0.03  0.01  0.00
Cash   0.18  0.17   0.15  0.17  0.20  0.22  0.20  0.19  0.19  0.17  0.18  0.17


===== 金融业(J) 财务指标 =====
Year    2001  2003   2005  2007  2009  2011  2013  2015  2017  2019  2021  2023
SLoan   0.16  0.17   0.16  0.08  0.06  0.05  0.05  0.05  0.06  0.04  0.04  0.03
LLoan   0.05  0.05   0.06  0.04  0.05  0.03  0.02  0.03  0.09  0.07  0.05  0.07
Lev     0.53  0.58   0.62  0.67  0.63  0.58  0.59  0.67  0.72  0.72  0.75  0.75
ROA     0.00  0.00  -0.02  0.05  0.02  0.04  0.03  0.03  0.02  0.01  0.01  0.01
ROE    -0.05  0.07  -0.08  0.17  0.12  0.09  0.11  0.12  0.07  0.05  0.07  0.04
Cash    0.20  0.16   0.13  0.18  0.27  0.27  0.20  0.18  0.17  0.20  0.21  0.20

===== 任务执行完成 =====
结果文件已保存至: output

行业财务指标分析结论
1. 高负债率行业:金融业(J)、房地产业(K)和建筑业(E)负债率较高,需重点关注偿债风险。
2. 借款结构差异:电力、热力、燃气及水生产和供应业(D)和交通运输业(G)长期借款占比高,符合行业特性。
3. 盈利能力分化:制造业(C)ROA和ROE呈下降趋势,而金融业(J)保持相对稳定。
4. 现金流压力:建筑业(E)和房地产业(K)现金储备较低,流动性风险较高。
5. 行业趋势
- 2000-2005 年,制造业、建筑业等权重行业 Lev 大幅上升,推动总体 Lev 提升; - 2006-2023 年,制造业 Lev 大幅下降,尽管建筑业、电力业微升,但权重较低,导致总体 Lev 下降。 6. 小结
结构性变化是核心原因,总负债率的 “先升后降” 本质是行业权重转移与高负债行业自身回调共同作用的结果。宏观财务指标的变化不仅反映行业自身特性,还需关注经济结构转型对行业权重的长期影响。

分析-两种算法差异
- 算术平均:每个企业平等权重,反映行业内企业的平均水平,易受中小企业波动影响。制造业 Lev 较低(约 40%),房地产业较高(55%-65%),金融业最高(80%+)。 - 加权平均:大企业权重更高,反映行业头部企业的影响,时序更平滑(如金融业因头部企业规模大,加权后负债率波动较小)。金融业头部企业拉高整体 Lev(如工商银行等权重高,加权 Lev 比算术平均高 5%-10%)。 - 合理性:加权平均更能反映行业整体债务风险(因大企业对行业影响更大),尤其适用于资本密集型行业(如金融、房地产)。制造业算术与加权均值接近,说明企业规模分布均匀;金融业差异显著,体现头部效应。

问题 D:股权结构箱线图分析

# 筛选指定年份数据
years = [2001, 2003, 2005, 2007, 2009, 2011, 2013, 2015, 2017, 2019, 2021, 2023]
top1_data = metrics_data[metrics_data['Year'].isin(years)][['Year', 'Top1']].copy()

# 确保年份为类别类型
top1_data['Year'] = top1_data['Year'].astype('category')

### D1. 绘制箱线图
plt.figure(figsize=(15, 8))
sns.boxplot(x='Year', y='Top1', data=top1_data, showfliers=False)
plt.title('第一大股东持股比例(Top1)箱线图')
plt.xlabel('年份')
plt.ylabel('持股比例(%)')
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig(Path("output/top1_boxplot.png"))
plt.show()

### D2. 关键年份对比分析
key_years = [2005, 2007, 2023]
key_years_data = top1_data[top1_data['Year'].isin(key_years)]

print("\nD2分析:")
for year in key_years:
    year_data = key_years_data[key_years_data['Year'] == year]['Top1']
    print(f"\n{year}年Top1持股比例:")
    print(f"  中位数: {year_data.median():.2f}%")
    print(f"  25%-75%分位数: {year_data.quantile(0.25):.2f}% - {year_data.quantile(0.75):.2f}%")
    print(f"  异常值数量: {((year_data < year_data.quantile(0.25) - 1.5 * (year_data.quantile(0.75) - year_data.quantile(0.25))) | 
                  (year_data > year_data.quantile(0.75) + 1.5 * (year_data.quantile(0.75) - year_data.quantile(0.25)))).sum()}")

print("\n趋势总结:")
print("1. 2005年:股权分置改革前,集中度较高,中位数约为40%")
print("2. 2007年:改革后,股权分散化,中位数降至35%左右")
print("3. 2023年:股权结构进一步分散,中位数约为33%,分布更均匀")


D2分析:

2005年Top1持股比例:
  中位数: 38.44%
  25%-75%分位数: 28.24% - 54.21%
  异常值数量: 0

2007年Top1持股比例:
  中位数: 34.49%
  25%-75%分位数: 23.34% - 46.74%
  异常值数量: 4

2023年Top1持股比例:
  中位数: 29.20%
  25%-75%分位数: 20.21% - 41.14%
  异常值数量: 52

趋势总结:
1. 2005年:股权分置改革前,集中度较高,中位数约为40%
2. 2007年:改革后,股权分散化,中位数降至35%左右
3. 2023年:股权结构进一步分散,中位数约为33%,分布更均匀

关键年份分析(股权结构数据开始自 2003 年)
- 2005 年: - 特征:箱线图中位数显著下降,四分位距扩大。 - 原因:2005 年股权分置改革启动,推动非流通股上市,大股东持股比例分散化。 - 2007 年: - 特征:中位数略有回升,异常值增多。 - 原因:金融危机前企业股权融资增加,部分大股东增持以稳定控制权,同时市场波动导致股权结构分化。 - 2023 年: - 特征:中位数进一步下降,分布更集中。 - 原因:注册制推行和资本市场多元化,机构投资者比例上升,大股东持股趋于分散,股权结构更趋合理。