import pandas as pd
import os
import re
import requests
from bs4 import BeautifulSoup
import time
import random
# 构建data文件夹路径(从当前目录向上一级,再进入data)
data_dir = os.path.join(os.getcwd(), "..", "data")
data_dir = os.path.abspath(data_dir) # 转换为绝对路径(可选)
os.makedirs(data_dir, exist_ok=True)
# 设置默认文件路径
input_file = os.path.join(data_dir, "data_raw.csv")
# 检查文件是否存在
if not os.path.exists(input_file):
print(f"错误: 文件 {input_file} 不存在!")
else:
print(f"文件存在: {input_file}")
def get_stock_info(stock_code, stock_name):
"""
通过网络搜索获取股票信息
"""
try:
# 构建搜索URL(使用东方财富网)
search_url = f"https://so.eastmoney.com/web/s?keyword={stock_code} {stock_name}"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
# 发送请求
response = requests.get(search_url, headers=headers, timeout=10)
response.encoding = 'utf-8'
# 解析网页内容
soup = BeautifulSoup(response.text, 'html.parser')
# 查找股票信息
stock_info = {}
# 查找交易所信息
exchange_info = soup.find('div', class_='exchange')
if exchange_info:
stock_info['交易所'] = exchange_info.text.strip()
# 查找板块信息
board_info = soup.find('div', class_='board')
if board_info:
stock_info['板块'] = board_info.text.strip()
# 查找上市时间信息
ipo_date = soup.find('div', class_='ipo-date')
if ipo_date:
stock_info['上市时间'] = ipo_date.text.strip()
return stock_info
except Exception as e:
print(f"获取{stock_code} {stock_name}信息时出错: {str(e)}")
return None
def fill_missing_values(df):
"""
填充缺失值
"""
# 需要检查的字段(排除行业相关字段)
industry_columns = ['东财行业', '证券行业', '一级行业', '二级行业']
check_columns = [col for col in df.columns if col not in industry_columns]
print("\n开始补充缺失值...")
for col in check_columns:
missing_count = df[col].isna().sum()
if missing_count > 0:
print(f"\n处理字段 '{col}' 的 {missing_count} 个空值")
# 获取包含空值的行
missing_rows = df[df[col].isna()]
for idx, row in missing_rows.iterrows():
stock_code = row['股票代码'].replace("'", "") # 移除单引号
stock_name = row['股票名称']
print(f"正在处理: {stock_code} {stock_name}")
# 获取股票信息
stock_info = get_stock_info(stock_code, stock_name)
if stock_info and col in stock_info:
df.at[idx, col] = stock_info[col]
print(f"已补充 {col}: {stock_info[col]}")
# 添加随机延迟,避免请求过快
time.sleep(random.uniform(1, 3))
return df
def check_missing_values(df):
"""
检查除行业字段外的其他字段的空值情况
"""
# 需要检查的字段(排除行业相关字段)
industry_columns = ['东财行业', '证券行业', '一级行业', '二级行业']
check_columns = [col for col in df.columns if col not in industry_columns]
print("\n字段空值检查结果:")
print("-" * 50)
for col in check_columns:
missing_count = df[col].isna().sum()
if missing_count > 0:
print(f"字段 '{col}' 存在 {missing_count} 个空值 ({missing_count/len(df)*100:.2f}%)")
# 显示包含空值的行
missing_rows = df[df[col].isna()]
print("包含空值的行:")
for idx, row in missing_rows.iterrows():
print(f" 行号: {idx}, 股票代码: {row['股票代码']}, 股票名称: {row['股票名称']}")
print("-" * 50)
def save_to_csv(df, filepath):
"""
保存数据到CSV文件,包含错误处理
"""
try:
# 检查目录是否存在
directory = os.path.dirname(filepath)
if not os.path.exists(directory):
os.makedirs(directory)
print(f"创建目录: {directory}")
# 检查文件是否被其他程序占用
if os.path.exists(filepath):
try:
# 尝试以写入模式打开文件
with open(filepath, 'a'):
pass
except IOError:
print(f"错误:文件 {filepath} 可能被其他程序占用")
print("请关闭可能正在使用该文件的程序(如Excel)后重试")
return False
# 保存文件
df.to_csv(filepath, index=False, encoding='utf-8-sig')
print(f"数据已成功保存到: {filepath}")
return True
except PermissionError:
print(f"错误:没有权限写入文件 {filepath}")
print("请检查文件权限或尝试以管理员身份运行程序")
return False
except Exception as e:
print(f"保存文件时出错: {str(e)}")
return False
try:
# 读取原始数据并检查文件存在性
if not os.path.exists(input_file):
raise FileNotFoundError(f"文件 {input_file} 不存在")
data_raw = pd.read_csv(input_file)
# ======================================
# 1. 数据清洗与筛选
# ======================================
# 去除关键字段缺失的行(股票代码、上市时间、交易所必须存在)
df = data_raw.dropna(subset=['股票代码', '上市时间', '交易所'])
# 清洗股票代码(确保6位字符串,并处理前导零)
df['股票代码'] = df['股票代码'].astype(str).str.zfill(6)
# 将股票代码转换为文本格式,确保Excel中显示完整
df['股票代码'] = "'" + df['股票代码']
# 清洗上市时间(保留8位有效日期)
df['上市时间'] = df['上市时间'].astype(str).str.strip()
df = df[df['上市时间'].str.match(r'^\d{8}$', na=False)] # 严格匹配8位数字
df['上市时间'] = pd.to_numeric(df['上市时间'], downcast='integer') # 转为整数
# 提取年月日并过滤无效日期
df['上市年份'] = df['上市时间'] // 10000
df['上市月份'] = (df['上市时间'] // 100) % 100
df['上市日期'] = df['上市时间'] % 100
# 过滤无效日期(如2月30日等)
valid_dates = pd.to_datetime(
df['上市时间'].astype(str),
format='%Y%m%d',
errors='coerce'
)
df = df[valid_dates.notna()] # 仅保留可转换为日期的记录
# 拆分“东财行业”为一级行业和二级行业(只保留前两级)
if '东财行业' in df.columns:
split_cols = df['东财行业'].str.split('-', n=2, expand=True)
df['一级行业'] = split_cols[0].str.strip()
df['二级行业'] = split_cols[1].str.strip() if split_cols.shape[1] > 1 else None
else:
df['一级行业'] = None
df['二级行业'] = None
# ======================================
# 2. 字段处理与结果输出
# ======================================
# 选择最终输出字段
output_columns = [
'股票名称', '交易所', '板块',
'上市年份', '上市月份', '一级行业', '二级行业'
]
# 检查字段完整性并处理缺失值
# 若原始数据没有"板块"字段,则先补充
if '板块' not in df.columns:
df['板块'] = df['股票代码'].apply(lambda x:
'沪市主板' if x.startswith("'60") else
'深市主板' if x.startswith("'000") else
'中小板' if x.startswith("'002") else
'创业板' if x.startswith("'30") else
'科创板' if x.startswith("'688") else
'北交所' if x.startswith("'8") else # 北交所代码通常以8开头
'未知'
)
else:
# 若有"板块"字段但有缺失,则补充缺失部分
df.loc[df['板块'].isna(), '板块'] = df.loc[df['板块'].isna(), '股票代码'].apply(lambda x:
'沪市主板' if x.startswith("'60") else
'深市主板' if x.startswith("'000") else
'中小板' if x.startswith("'002") else
'创业板' if x.startswith("'30") else
'科创板' if x.startswith("'688") else
'北交所' if x.startswith("'8") else
'未知'
)
# 检查其他字段的空值情况
check_missing_values(df)
# 补充缺失值
df = fill_missing_values(df)
# 再次检查空值情况
print("\n补充后的空值检查结果:")
check_missing_values(df)
# 保存结果到 data_demo.csv
# data_dir 已在上方定义并可直接使用,无需重复定义
# 设置默认文件路径
output_file = os.path.join(data_dir, "data_demo.csv")
if not save_to_csv(df, output_file):
print("\n请解决文件保存问题后重试")
exit(1)
# ======================================
# 3. 结果验证与输出
# ======================================
print("\n数据清洗结果:")
print(f"原始数据行数: {len(data_raw)}")
print(f"清洗后有效行数: {len(df)}")
print("\n最终字段信息:")
print(df.head())
print(f"\n是否存在空置字段: {'是' if df.isnull().any().any() else '否'}")
except FileNotFoundError as e:
print(f"错误:{e}")
print("请确认文件路径正确且文件已生成")
except Exception as e:
print(f"处理过程中出现错误: {str(e)}")