37  G4-02

文档说明:本文档为中国上市公司数据清洗文档。
主要为异常数据识别和剔除;保障数据最后输出为我们直接、可用的数据

import pandas as pd
import os
import re
import requests
from bs4 import BeautifulSoup
import time
import random

# 构建data文件夹路径(从当前目录向上一级,再进入data)
data_dir = os.path.join(os.getcwd(), "..", "data")
data_dir = os.path.abspath(data_dir)  # 转换为绝对路径(可选)
os.makedirs(data_dir, exist_ok=True)

# 设置默认文件路径
input_file = os.path.join(data_dir, "data_raw.csv")

# 检查文件是否存在
if not os.path.exists(input_file):
    print(f"错误: 文件 {input_file} 不存在!")
else:
    print(f"文件存在: {input_file}")


def get_stock_info(stock_code, stock_name):
    """
    通过网络搜索获取股票信息
    """
    try:
        # 构建搜索URL(使用东方财富网)
        search_url = f"https://so.eastmoney.com/web/s?keyword={stock_code} {stock_name}"
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        
        # 发送请求
        response = requests.get(search_url, headers=headers, timeout=10)
        response.encoding = 'utf-8'
        
        # 解析网页内容
        soup = BeautifulSoup(response.text, 'html.parser')
        
        # 查找股票信息
        stock_info = {}
        
        # 查找交易所信息
        exchange_info = soup.find('div', class_='exchange')
        if exchange_info:
            stock_info['交易所'] = exchange_info.text.strip()
        
        # 查找板块信息
        board_info = soup.find('div', class_='board')
        if board_info:
            stock_info['板块'] = board_info.text.strip()
        
        # 查找上市时间信息
        ipo_date = soup.find('div', class_='ipo-date')
        if ipo_date:
            stock_info['上市时间'] = ipo_date.text.strip()
        
        return stock_info
    
    except Exception as e:
        print(f"获取{stock_code} {stock_name}信息时出错: {str(e)}")
        return None

def fill_missing_values(df):
    """
    填充缺失值
    """
    # 需要检查的字段(排除行业相关字段)
    industry_columns = ['东财行业', '证券行业', '一级行业', '二级行业']
    check_columns = [col for col in df.columns if col not in industry_columns]
    
    print("\n开始补充缺失值...")
    for col in check_columns:
        missing_count = df[col].isna().sum()
        if missing_count > 0:
            print(f"\n处理字段 '{col}' 的 {missing_count} 个空值")
            # 获取包含空值的行
            missing_rows = df[df[col].isna()]
            
            for idx, row in missing_rows.iterrows():
                stock_code = row['股票代码'].replace("'", "")  # 移除单引号
                stock_name = row['股票名称']
                print(f"正在处理: {stock_code} {stock_name}")
                
                # 获取股票信息
                stock_info = get_stock_info(stock_code, stock_name)
                
                if stock_info and col in stock_info:
                    df.at[idx, col] = stock_info[col]
                    print(f"已补充 {col}: {stock_info[col]}")
                
                # 添加随机延迟,避免请求过快
                time.sleep(random.uniform(1, 3))
    
    return df

def check_missing_values(df):
    """
    检查除行业字段外的其他字段的空值情况
    """
    # 需要检查的字段(排除行业相关字段)
    industry_columns = ['东财行业', '证券行业', '一级行业', '二级行业']
    check_columns = [col for col in df.columns if col not in industry_columns]
    
    print("\n字段空值检查结果:")
    print("-" * 50)
    for col in check_columns:
        missing_count = df[col].isna().sum()
        if missing_count > 0:
            print(f"字段 '{col}' 存在 {missing_count} 个空值 ({missing_count/len(df)*100:.2f}%)")
            # 显示包含空值的行
            missing_rows = df[df[col].isna()]
            print("包含空值的行:")
            for idx, row in missing_rows.iterrows():
                print(f"  行号: {idx}, 股票代码: {row['股票代码']}, 股票名称: {row['股票名称']}")
    print("-" * 50)

def save_to_csv(df, filepath):
    """
    保存数据到CSV文件,包含错误处理
    """
    try:
        # 检查目录是否存在
        directory = os.path.dirname(filepath)
        if not os.path.exists(directory):
            os.makedirs(directory)
            print(f"创建目录: {directory}")
        
        # 检查文件是否被其他程序占用
        if os.path.exists(filepath):
            try:
                # 尝试以写入模式打开文件
                with open(filepath, 'a'):
                    pass
            except IOError:
                print(f"错误:文件 {filepath} 可能被其他程序占用")
                print("请关闭可能正在使用该文件的程序(如Excel)后重试")
                return False
        
        # 保存文件
        df.to_csv(filepath, index=False, encoding='utf-8-sig')
        print(f"数据已成功保存到: {filepath}")
        return True
    
    except PermissionError:
        print(f"错误:没有权限写入文件 {filepath}")
        print("请检查文件权限或尝试以管理员身份运行程序")
        return False
    except Exception as e:
        print(f"保存文件时出错: {str(e)}")
        return False

try:
    # 读取原始数据并检查文件存在性
    if not os.path.exists(input_file):
        raise FileNotFoundError(f"文件 {input_file} 不存在")
    
    data_raw = pd.read_csv(input_file)
    
    # ======================================
    # 1. 数据清洗与筛选
    # ======================================
    
    # 去除关键字段缺失的行(股票代码、上市时间、交易所必须存在)
    df = data_raw.dropna(subset=['股票代码', '上市时间', '交易所'])
    
    # 清洗股票代码(确保6位字符串,并处理前导零)
    df['股票代码'] = df['股票代码'].astype(str).str.zfill(6)
    # 将股票代码转换为文本格式,确保Excel中显示完整
    df['股票代码'] = "'" + df['股票代码']
    
    # 清洗上市时间(保留8位有效日期)
    df['上市时间'] = df['上市时间'].astype(str).str.strip()
    df = df[df['上市时间'].str.match(r'^\d{8}$', na=False)]  # 严格匹配8位数字
    df['上市时间'] = pd.to_numeric(df['上市时间'], downcast='integer')  # 转为整数
    
    # 提取年月日并过滤无效日期
    df['上市年份'] = df['上市时间'] // 10000
    df['上市月份'] = (df['上市时间'] // 100) % 100
    df['上市日期'] = df['上市时间'] % 100
    
    # 过滤无效日期(如2月30日等)
    valid_dates = pd.to_datetime(
        df['上市时间'].astype(str), 
        format='%Y%m%d', 
        errors='coerce'
    )
    df = df[valid_dates.notna()]  # 仅保留可转换为日期的记录
    
    # 拆分“东财行业”为一级行业和二级行业(只保留前两级)
    if '东财行业' in df.columns:
        split_cols = df['东财行业'].str.split('-', n=2, expand=True)
        df['一级行业'] = split_cols[0].str.strip()
        df['二级行业'] = split_cols[1].str.strip() if split_cols.shape[1] > 1 else None
    else:
        df['一级行业'] = None
        df['二级行业'] = None
    # ======================================
    # 2. 字段处理与结果输出
    # ======================================
    
    # 选择最终输出字段
    output_columns = [
        '股票名称', '交易所', '板块', 
        '上市年份', '上市月份', '一级行业', '二级行业'
    ]
    
    # 检查字段完整性并处理缺失值
    # 若原始数据没有"板块"字段,则先补充
    if '板块' not in df.columns:
        df['板块'] = df['股票代码'].apply(lambda x: 
            '沪市主板' if x.startswith("'60") else
            '深市主板' if x.startswith("'000") else
            '中小板' if x.startswith("'002") else
            '创业板' if x.startswith("'30") else
            '科创板' if x.startswith("'688") else
            '北交所' if x.startswith("'8") else  # 北交所代码通常以8开头
            '未知'
        )
    else:
        # 若有"板块"字段但有缺失,则补充缺失部分
        df.loc[df['板块'].isna(), '板块'] = df.loc[df['板块'].isna(), '股票代码'].apply(lambda x: 
            '沪市主板' if x.startswith("'60") else
            '深市主板' if x.startswith("'000") else
            '中小板' if x.startswith("'002") else
            '创业板' if x.startswith("'30") else
            '科创板' if x.startswith("'688") else
            '北交所' if x.startswith("'8") else
            '未知'
        )
    
    # 检查其他字段的空值情况
    check_missing_values(df)
    
    # 补充缺失值
    df = fill_missing_values(df)
    
    # 再次检查空值情况
    print("\n补充后的空值检查结果:")
    check_missing_values(df)
    
    # 保存结果到 data_demo.csv

    # data_dir 已在上方定义并可直接使用,无需重复定义

    # 设置默认文件路径
    output_file = os.path.join(data_dir, "data_demo.csv")

    if not save_to_csv(df, output_file):
        print("\n请解决文件保存问题后重试")
        exit(1)
    
    # ======================================
    # 3. 结果验证与输出
    # ======================================
    
    print("\n数据清洗结果:")
    print(f"原始数据行数: {len(data_raw)}")
    print(f"清洗后有效行数: {len(df)}")
    print("\n最终字段信息:")
    print(df.head())
    print(f"\n是否存在空置字段: {'是' if df.isnull().any().any() else '否'}")

except FileNotFoundError as e:
    print(f"错误:{e}")
    print("请确认文件路径正确且文件已生成")
except Exception as e:
    print(f"处理过程中出现错误: {str(e)}")
文件存在: e:\work\YJS\作业\数据分析\Proj_china_stock_analysis(2)\Proj_china_stock_analysis\data\data_raw.csv

字段空值检查结果:
--------------------------------------------------
--------------------------------------------------

开始补充缺失值...

补充后的空值检查结果:

字段空值检查结果:
--------------------------------------------------
--------------------------------------------------
数据已成功保存到: e:\work\YJS\作业\数据分析\Proj_china_stock_analysis(2)\Proj_china_stock_analysis\data\data_demo.csv

数据清洗结果:
原始数据行数: 5720
清洗后有效行数: 5715

最终字段信息:
      股票代码  股票名称    交易所   板块      上市时间    最新价               东财行业  \
0  '001390   N古麒  深圳交易所   主板  20250529   32.0       纺织服装-纺织-其他纺织   
1  '834475  三友科技  北京交易所  北交所  20200727  24.67   机械设备-专用设备-矿山冶金机械   
2  '837748  路桥信息  北京交易所  北交所  20230816  38.38  信息技术-计算机软件-其他软件服务   
3  '300169  天晟新材  深圳交易所  创业板  20110125   8.26   基础化工-化学制品-其他化学制品   
4  '300149  睿智医药  深圳交易所  创业板  20101222   8.56     医药生物-生物医药-生物医药   

                        证监会行业   行业为空  上市年份  上市月份  上市日期  一级行业   二级行业  
0        制造业-皮革、毛皮、羽毛及其制品和制鞋业  False  2025     5    29  纺织服装     纺织  
1                 制造业-专用设备制造业  False  2020     7    27  机械设备   专用设备  
2  信息传输、软件和信息技术服务业-软件和信息技术服务业  False  2023     8    16  信息技术  计算机软件  
3                制造业-橡胶和塑料制品业  False  2011     1    25  基础化工   化学制品  
4          科学研究和技术服务业-研究和试验发展  False  2010    12    22  医药生物   生物医药  

是否存在空置字段: 是