Files
TableProcessing/process_excel_optimized.py
Aiden_ 5b32208194 xx
2025-10-31 21:24:08 +08:00

484 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
财务Excel数据处理程序 - 优化版本
读取data/data.xlsx中的Sheet1表格,提取财务数据并输出到res.json
优化内容:
1. 性能优化: 缓存合并单元格查询,减少重复计算
2. 代码质量: 改进类型提示,增强错误处理
3. 功能增强: 添加数据统计和验证报告
4. 可维护性: 提取常量配置,改进日志记录
"""
import json
import logging
from dataclasses import dataclass, asdict
from openpyxl import load_workbook
from typing import List, Dict, Any, Optional, Tuple, Set
from datetime import datetime
# ==================== 常量配置 ====================
# Excel列索引常量
COL_RECEIVED_AMOUNT = 6 # F列: 实收金额
COL_HANDLING_FEE = 7 # G列: 手续费
COL_ORDER_NUM = 8 # H列: 订单号
COL_AMOUNT = 9 # I列: 金额
COL_ACCOUNT_NAME = 15 # O列: 账户名
# 数据起始行
DATA_START_ROW = 2
# 金额匹配容差
AMOUNT_TOLERANCE = 0.01
# 日志配置
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
# ==================== 数据类定义 ====================
@dataclass
class Order:
"""订单数据类"""
OrderNum: Optional[str]
Amount: Optional[float]
AccountName: Optional[str]
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
"OrderNum": self.OrderNum,
"Amount": self.Amount,
"AccountName": self.AccountName
}
@dataclass
class FinancialRecord:
"""财务记录数据类"""
ReceivedAmount: Optional[float]
HandlingFee: float
Order: List[Order]
checkRes: bool
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
"ReceivedAmount": self.ReceivedAmount,
"HandlingFee": self.HandlingFee,
"Order": [order.to_dict() for order in self.Order],
"checkRes": self.checkRes
}
@dataclass
class ProcessingStats:
"""数据处理统计"""
total_records: int = 0
valid_records: int = 0
invalid_records: int = 0
total_orders: int = 0
check_failed_records: int = 0
skipped_empty_orders: int = 0
processing_time: float = 0.0
def print_report(self):
"""打印统计报告"""
logger.info("=" * 60)
logger.info("数据处理统计报告")
logger.info("=" * 60)
logger.info(f"总记录数: {self.total_records}")
logger.info(f"有效记录: {self.valid_records}")
logger.info(f"无效记录: {self.invalid_records}")
logger.info(f"订单总数: {self.total_orders}")
logger.info(f"金额验证失败: {self.check_failed_records}")
logger.info(f"跳过空订单: {self.skipped_empty_orders}")
logger.info(f"处理耗时: {self.processing_time:.2f}")
logger.info("=" * 60)
# ==================== 合并单元格缓存 ====================
class MergedCellCache:
"""合并单元格缓存类,优化查询性能"""
def __init__(self, merged_ranges):
"""初始化缓存"""
self.merged_ranges = merged_ranges
# 创建行列索引缓存
self._cache: Dict[Tuple[int, int], Any] = {}
self._build_cache()
def _build_cache(self):
"""构建缓存索引"""
for merged_range in self.merged_ranges:
for row in range(merged_range.min_row, merged_range.max_row + 1):
for col in range(merged_range.min_col, merged_range.max_col + 1):
self._cache[(row, col)] = merged_range
def get_merged_range(self, row: int, col: int) -> Optional[Any]:
"""获取指定单元格所属的合并范围"""
return self._cache.get((row, col))
def is_merged(self, row: int, col: int) -> bool:
"""检查单元格是否在合并区域内"""
return (row, col) in self._cache
# ==================== 核心处理函数 ====================
def get_cell_value(ws, row: int, col: int) -> Any:
"""
获取单元格值,处理公式计算结果
参数:
ws: 工作表对象
row: 行号
col: 列号
返回:
单元格值
"""
try:
cell = ws.cell(row, col)
return cell.value
except Exception as e:
logger.warning(f"获取单元格({row}, {col})值失败: {e}")
return None
def get_merged_cell_value(ws, row: int, col: int, cache: MergedCellCache) -> Any:
"""
获取合并单元格的值(优化版本)
如果单元格在合并区域内,返回合并区域左上角单元格的值
参数:
ws: 工作表对象
row: 行号
col: 列号
cache: 合并单元格缓存对象
返回:
单元格值
"""
merged_range = cache.get_merged_range(row, col)
if merged_range:
# 在合并区域内,返回左上角的值
return ws.cell(merged_range.min_row, merged_range.min_col).value
# 不在合并区域内,直接返回单元格值
return ws.cell(row, col).value
def get_f_column_ranges(ws, merged_ranges, start_row: int = DATA_START_ROW) -> List[Tuple[int, int]]:
"""
获取F列的所有数据区域优化版本
参数:
ws: 工作表对象
merged_ranges: 合并单元格范围列表
start_row: 起始行号
返回:
[(起始行, 结束行), ...]
"""
# 找到F列的所有合并单元格区域
f_merges = []
for merge in merged_ranges:
if merge.min_col == COL_RECEIVED_AMOUNT and \
merge.max_col == COL_RECEIVED_AMOUNT and \
merge.min_row >= start_row:
f_merges.append((merge.min_row, merge.max_row))
# 创建合并单元格行的集合,用于快速查找
merged_rows: Set[int] = set()
for start, end in f_merges:
merged_rows.update(range(start, end + 1))
# 处理非合并单元格
all_ranges = []
for row in range(start_row, ws.max_row + 1):
if row not in merged_rows:
f_value = ws.cell(row, COL_RECEIVED_AMOUNT).value
if f_value is not None:
all_ranges.append((row, row))
# 添加合并单元格区域并排序
all_ranges.extend(f_merges)
all_ranges.sort(key=lambda x: x[0])
return all_ranges
def extract_orders(ws, start_row: int, end_row: int, cache: MergedCellCache,
stats: ProcessingStats) -> List[Order]:
"""
提取指定行范围内的订单数据(优化版本)
参数:
ws: 工作表对象
start_row: 起始行
end_row: 结束行
cache: 合并单元格缓存
stats: 统计对象
返回:
订单列表
"""
orders = []
for row in range(start_row, end_row + 1):
# 提取订单数据
order_num = get_merged_cell_value(ws, row, COL_ORDER_NUM, cache)
amount = get_merged_cell_value(ws, row, COL_AMOUNT, cache)
account_name = get_merged_cell_value(ws, row, COL_ACCOUNT_NAME, cache)
# 跳过金额为空的行
if amount is None:
stats.skipped_empty_orders += 1
continue
order = Order(
OrderNum=str(order_num).strip() if order_num else None,
Amount=float(amount) if amount is not None else None,
AccountName=str(account_name).strip() if account_name else None
)
orders.append(order)
stats.total_orders += 1
return orders
def validate_amount(received_amount: Optional[float], handling_fee: float,
orders: List[Order]) -> bool:
"""
验证金额是否匹配
参数:
received_amount: 实收金额
handling_fee: 手续费
orders: 订单列表
返回:
是否匹配
"""
order_amount_sum = sum(
order.Amount for order in orders
if order.Amount is not None
)
received_plus_fee = (received_amount if received_amount is not None else 0) + handling_fee
return abs(received_plus_fee - order_amount_sum) < AMOUNT_TOLERANCE
def process_excel_data(file_path: str) -> Tuple[List[FinancialRecord], ProcessingStats]:
"""
处理Excel文件提取所有财务记录优化版本
参数:
file_path: Excel文件路径
返回:
(记录列表, 统计信息)
"""
start_time = datetime.now()
stats = ProcessingStats()
logger.info(f"开始加载Excel文件: {file_path}")
try:
# 加载Excel文件
wb = load_workbook(file_path, data_only=True)
ws = wb['Sheet1']
# 获取合并单元格范围并创建缓存
merged_ranges = list(ws.merged_cells.ranges)
cache = MergedCellCache(merged_ranges)
logger.info(f"创建合并单元格缓存,共{len(merged_ranges)}个合并区域")
# 获取F列的所有数据区域
f_ranges = get_f_column_ranges(ws, merged_ranges, DATA_START_ROW)
logger.info(f"找到{len(f_ranges)}个F列数据区域")
results = []
for idx, (start_row, end_row) in enumerate(f_ranges, 1):
stats.total_records += 1
# 获取实收金额和手续费
received_amount = get_cell_value(ws, start_row, COL_RECEIVED_AMOUNT)
handling_fee = get_cell_value(ws, start_row, COL_HANDLING_FEE)
# 手续费为空时记为0
if handling_fee is None:
handling_fee = 0
# 提取订单列表
orders = extract_orders(ws, start_row, end_row, cache, stats)
# 跳过没有订单的记录
if not orders:
stats.invalid_records += 1
logger.warning(f"记录#{idx} (行{start_row}-{end_row}): 无有效订单,跳过")
continue
# 验证金额
check_res = validate_amount(received_amount, handling_fee, orders)
if not check_res:
stats.check_failed_records += 1
logger.warning(
f"记录#{idx} (行{start_row}-{end_row}): "
f"金额验证失败 - 实收:{received_amount}, 手续费:{handling_fee}, "
f"订单总额:{sum(o.Amount for o in orders if o.Amount)}"
)
record = FinancialRecord(
ReceivedAmount=received_amount,
HandlingFee=handling_fee,
Order=orders,
checkRes=check_res
)
results.append(record)
stats.valid_records += 1
# 计算处理时间
end_time = datetime.now()
stats.processing_time = (end_time - start_time).total_seconds()
logger.info(f"数据提取完成,共{stats.valid_records}条有效记录")
return results, stats
except Exception as e:
logger.error(f"处理Excel文件时发生错误: {e}", exc_info=True)
raise
def save_to_json(data: List[FinancialRecord], output_file: str):
"""
将数据保存为JSON文件
参数:
data: 要保存的数据
output_file: 输出文件路径
"""
try:
# 转换为字典格式
data_dict = [record.to_dict() for record in data]
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(data_dict, f, ensure_ascii=False, indent=2)
logger.info(f"数据已保存到: {output_file}")
logger.info(f"总共提取 {len(data)} 条记录")
except Exception as e:
logger.error(f"保存JSON文件时发生错误: {e}", exc_info=True)
raise
def generate_validation_report(data: List[FinancialRecord], output_file: str = 'validation_report.txt'):
"""
生成数据验证报告
参数:
data: 财务记录列表
output_file: 报告文件路径
"""
try:
with open(output_file, 'w', encoding='utf-8') as f:
f.write("=" * 80 + "\n")
f.write("财务数据验证报告\n")
f.write(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("=" * 80 + "\n\n")
# 统计信息
total = len(data)
failed = sum(1 for r in data if not r.checkRes)
passed = total - failed
f.write(f"总记录数: {total}\n")
f.write(f"验证通过: {passed} ({passed/total*100:.1f}%)\n")
f.write(f"验证失败: {failed} ({failed/total*100:.1f}%)\n\n")
# 失败记录详情
if failed > 0:
f.write("-" * 80 + "\n")
f.write("验证失败记录详情:\n")
f.write("-" * 80 + "\n\n")
for idx, record in enumerate(data, 1):
if not record.checkRes:
order_sum = sum(o.Amount for o in record.Order if o.Amount)
received_plus_fee = (record.ReceivedAmount or 0) + record.HandlingFee
diff = received_plus_fee - order_sum
f.write(f"记录 #{idx}:\n")
f.write(f" 实收金额: {record.ReceivedAmount}\n")
f.write(f" 手续费: {record.HandlingFee}\n")
f.write(f" 实收+手续费: {received_plus_fee}\n")
f.write(f" 订单总额: {order_sum}\n")
f.write(f" 差额: {diff:.2f}\n")
f.write(f" 订单数量: {len(record.Order)}\n")
f.write("\n")
logger.info(f"验证报告已保存到: {output_file}")
except Exception as e:
logger.error(f"生成验证报告时发生错误: {e}", exc_info=True)
def main():
"""主函数"""
input_file = 'data/data.xlsx'
output_file = 'res.json'
logger.info("=" * 60)
logger.info("财务Excel数据处理程序 - 优化版本")
logger.info("=" * 60)
logger.info(f"输入文件: {input_file}")
logger.info(f"输出文件: {output_file}")
try:
# 提取数据
data, stats = process_excel_data(input_file)
# 保存到JSON
save_to_json(data, output_file)
# 生成验证报告
generate_validation_report(data)
# 显示统计信息
stats.print_report()
# 显示前3条记录预览
if data:
logger.info("\n前3条记录预览:")
preview = [record.to_dict() for record in data[:3]]
print(json.dumps(preview, ensure_ascii=False, indent=2))
logger.info("\n处理完成!")
except Exception as e:
logger.error(f"程序执行失败: {e}")
import traceback
traceback.print_exc()
return 1
return 0
if __name__ == '__main__':
exit(main())