#!/usr/bin/env python3 """ 财务Excel数据处理程序 - 优化版本 读取data/data.xlsx中的Sheet1表格,提取财务数据并输出到res.json 优化内容: 1. 性能优化: 缓存合并单元格查询,减少重复计算 2. 代码质量: 改进类型提示,增强错误处理 3. 功能增强: 添加数据统计和验证报告 4. 可维护性: 提取常量配置,改进日志记录 """ import json import logging from dataclasses import dataclass, asdict from openpyxl import load_workbook from typing import List, Dict, Any, Optional, Tuple, Set from datetime import datetime # ==================== 常量配置 ==================== # Excel列索引常量 COL_RECEIVED_AMOUNT = 6 # F列: 实收金额 COL_HANDLING_FEE = 7 # G列: 手续费 COL_ORDER_NUM = 8 # H列: 订单号 COL_AMOUNT = 9 # I列: 金额 COL_ACCOUNT_NAME = 15 # O列: 账户名 # 数据起始行 DATA_START_ROW = 2 # 金额匹配容差 AMOUNT_TOLERANCE = 0.01 # 日志配置 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) # ==================== 数据类定义 ==================== @dataclass class Order: """订单数据类""" OrderNum: Optional[str] Amount: Optional[float] AccountName: Optional[str] def to_dict(self) -> Dict[str, Any]: """转换为字典""" return { "OrderNum": self.OrderNum, "Amount": self.Amount, "AccountName": self.AccountName } @dataclass class FinancialRecord: """财务记录数据类""" ReceivedAmount: Optional[float] HandlingFee: float Order: List[Order] checkRes: bool def to_dict(self) -> Dict[str, Any]: """转换为字典""" return { "ReceivedAmount": self.ReceivedAmount, "HandlingFee": self.HandlingFee, "Order": [order.to_dict() for order in self.Order], "checkRes": self.checkRes } @dataclass class ProcessingStats: """数据处理统计""" total_records: int = 0 valid_records: int = 0 invalid_records: int = 0 total_orders: int = 0 check_failed_records: int = 0 skipped_empty_orders: int = 0 processing_time: float = 0.0 def print_report(self): """打印统计报告""" logger.info("=" * 60) logger.info("数据处理统计报告") logger.info("=" * 60) logger.info(f"总记录数: {self.total_records}") logger.info(f"有效记录: {self.valid_records}") logger.info(f"无效记录: {self.invalid_records}") logger.info(f"订单总数: {self.total_orders}") logger.info(f"金额验证失败: {self.check_failed_records}") logger.info(f"跳过空订单: {self.skipped_empty_orders}") logger.info(f"处理耗时: {self.processing_time:.2f}秒") logger.info("=" * 60) # ==================== 合并单元格缓存 ==================== class MergedCellCache: """合并单元格缓存类,优化查询性能""" def __init__(self, merged_ranges): """初始化缓存""" self.merged_ranges = merged_ranges # 创建行列索引缓存 self._cache: Dict[Tuple[int, int], Any] = {} self._build_cache() def _build_cache(self): """构建缓存索引""" for merged_range in self.merged_ranges: for row in range(merged_range.min_row, merged_range.max_row + 1): for col in range(merged_range.min_col, merged_range.max_col + 1): self._cache[(row, col)] = merged_range def get_merged_range(self, row: int, col: int) -> Optional[Any]: """获取指定单元格所属的合并范围""" return self._cache.get((row, col)) def is_merged(self, row: int, col: int) -> bool: """检查单元格是否在合并区域内""" return (row, col) in self._cache # ==================== 核心处理函数 ==================== def get_cell_value(ws, row: int, col: int) -> Any: """ 获取单元格值,处理公式计算结果 参数: ws: 工作表对象 row: 行号 col: 列号 返回: 单元格值 """ try: cell = ws.cell(row, col) return cell.value except Exception as e: logger.warning(f"获取单元格({row}, {col})值失败: {e}") return None def get_merged_cell_value(ws, row: int, col: int, cache: MergedCellCache) -> Any: """ 获取合并单元格的值(优化版本) 如果单元格在合并区域内,返回合并区域左上角单元格的值 参数: ws: 工作表对象 row: 行号 col: 列号 cache: 合并单元格缓存对象 返回: 单元格值 """ merged_range = cache.get_merged_range(row, col) if merged_range: # 在合并区域内,返回左上角的值 return ws.cell(merged_range.min_row, merged_range.min_col).value # 不在合并区域内,直接返回单元格值 return ws.cell(row, col).value def get_f_column_ranges(ws, merged_ranges, start_row: int = DATA_START_ROW) -> List[Tuple[int, int]]: """ 获取F列的所有数据区域(优化版本) 参数: ws: 工作表对象 merged_ranges: 合并单元格范围列表 start_row: 起始行号 返回: [(起始行, 结束行), ...] """ # 找到F列的所有合并单元格区域 f_merges = [] for merge in merged_ranges: if merge.min_col == COL_RECEIVED_AMOUNT and \ merge.max_col == COL_RECEIVED_AMOUNT and \ merge.min_row >= start_row: f_merges.append((merge.min_row, merge.max_row)) # 创建合并单元格行的集合,用于快速查找 merged_rows: Set[int] = set() for start, end in f_merges: merged_rows.update(range(start, end + 1)) # 处理非合并单元格 all_ranges = [] for row in range(start_row, ws.max_row + 1): if row not in merged_rows: f_value = ws.cell(row, COL_RECEIVED_AMOUNT).value if f_value is not None: all_ranges.append((row, row)) # 添加合并单元格区域并排序 all_ranges.extend(f_merges) all_ranges.sort(key=lambda x: x[0]) return all_ranges def extract_orders(ws, start_row: int, end_row: int, cache: MergedCellCache, stats: ProcessingStats) -> List[Order]: """ 提取指定行范围内的订单数据(优化版本) 参数: ws: 工作表对象 start_row: 起始行 end_row: 结束行 cache: 合并单元格缓存 stats: 统计对象 返回: 订单列表 """ orders = [] for row in range(start_row, end_row + 1): # 提取订单数据 order_num = get_merged_cell_value(ws, row, COL_ORDER_NUM, cache) amount = get_merged_cell_value(ws, row, COL_AMOUNT, cache) account_name = get_merged_cell_value(ws, row, COL_ACCOUNT_NAME, cache) # 跳过金额为空的行 if amount is None: stats.skipped_empty_orders += 1 continue order = Order( OrderNum=str(order_num).strip() if order_num else None, Amount=float(amount) if amount is not None else None, AccountName=str(account_name).strip() if account_name else None ) orders.append(order) stats.total_orders += 1 return orders def validate_amount(received_amount: Optional[float], handling_fee: float, orders: List[Order]) -> bool: """ 验证金额是否匹配 参数: received_amount: 实收金额 handling_fee: 手续费 orders: 订单列表 返回: 是否匹配 """ order_amount_sum = sum( order.Amount for order in orders if order.Amount is not None ) received_plus_fee = (received_amount if received_amount is not None else 0) + handling_fee return abs(received_plus_fee - order_amount_sum) < AMOUNT_TOLERANCE def process_excel_data(file_path: str) -> Tuple[List[FinancialRecord], ProcessingStats]: """ 处理Excel文件,提取所有财务记录(优化版本) 参数: file_path: Excel文件路径 返回: (记录列表, 统计信息) """ start_time = datetime.now() stats = ProcessingStats() logger.info(f"开始加载Excel文件: {file_path}") try: # 加载Excel文件 wb = load_workbook(file_path, data_only=True) ws = wb['Sheet1'] # 获取合并单元格范围并创建缓存 merged_ranges = list(ws.merged_cells.ranges) cache = MergedCellCache(merged_ranges) logger.info(f"创建合并单元格缓存,共{len(merged_ranges)}个合并区域") # 获取F列的所有数据区域 f_ranges = get_f_column_ranges(ws, merged_ranges, DATA_START_ROW) logger.info(f"找到{len(f_ranges)}个F列数据区域") results = [] for idx, (start_row, end_row) in enumerate(f_ranges, 1): stats.total_records += 1 # 获取实收金额和手续费 received_amount = get_cell_value(ws, start_row, COL_RECEIVED_AMOUNT) handling_fee = get_cell_value(ws, start_row, COL_HANDLING_FEE) # 手续费为空时记为0 if handling_fee is None: handling_fee = 0 # 提取订单列表 orders = extract_orders(ws, start_row, end_row, cache, stats) # 跳过没有订单的记录 if not orders: stats.invalid_records += 1 logger.warning(f"记录#{idx} (行{start_row}-{end_row}): 无有效订单,跳过") continue # 验证金额 check_res = validate_amount(received_amount, handling_fee, orders) if not check_res: stats.check_failed_records += 1 logger.warning( f"记录#{idx} (行{start_row}-{end_row}): " f"金额验证失败 - 实收:{received_amount}, 手续费:{handling_fee}, " f"订单总额:{sum(o.Amount for o in orders if o.Amount)}" ) record = FinancialRecord( ReceivedAmount=received_amount, HandlingFee=handling_fee, Order=orders, checkRes=check_res ) results.append(record) stats.valid_records += 1 # 计算处理时间 end_time = datetime.now() stats.processing_time = (end_time - start_time).total_seconds() logger.info(f"数据提取完成,共{stats.valid_records}条有效记录") return results, stats except Exception as e: logger.error(f"处理Excel文件时发生错误: {e}", exc_info=True) raise def save_to_json(data: List[FinancialRecord], output_file: str): """ 将数据保存为JSON文件 参数: data: 要保存的数据 output_file: 输出文件路径 """ try: # 转换为字典格式 data_dict = [record.to_dict() for record in data] with open(output_file, 'w', encoding='utf-8') as f: json.dump(data_dict, f, ensure_ascii=False, indent=2) logger.info(f"数据已保存到: {output_file}") logger.info(f"总共提取 {len(data)} 条记录") except Exception as e: logger.error(f"保存JSON文件时发生错误: {e}", exc_info=True) raise def generate_validation_report(data: List[FinancialRecord], output_file: str = 'validation_report.txt'): """ 生成数据验证报告 参数: data: 财务记录列表 output_file: 报告文件路径 """ try: with open(output_file, 'w', encoding='utf-8') as f: f.write("=" * 80 + "\n") f.write("财务数据验证报告\n") f.write(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") f.write("=" * 80 + "\n\n") # 统计信息 total = len(data) failed = sum(1 for r in data if not r.checkRes) passed = total - failed f.write(f"总记录数: {total}\n") f.write(f"验证通过: {passed} ({passed/total*100:.1f}%)\n") f.write(f"验证失败: {failed} ({failed/total*100:.1f}%)\n\n") # 失败记录详情 if failed > 0: f.write("-" * 80 + "\n") f.write("验证失败记录详情:\n") f.write("-" * 80 + "\n\n") for idx, record in enumerate(data, 1): if not record.checkRes: order_sum = sum(o.Amount for o in record.Order if o.Amount) received_plus_fee = (record.ReceivedAmount or 0) + record.HandlingFee diff = received_plus_fee - order_sum f.write(f"记录 #{idx}:\n") f.write(f" 实收金额: {record.ReceivedAmount}\n") f.write(f" 手续费: {record.HandlingFee}\n") f.write(f" 实收+手续费: {received_plus_fee}\n") f.write(f" 订单总额: {order_sum}\n") f.write(f" 差额: {diff:.2f}\n") f.write(f" 订单数量: {len(record.Order)}\n") f.write("\n") logger.info(f"验证报告已保存到: {output_file}") except Exception as e: logger.error(f"生成验证报告时发生错误: {e}", exc_info=True) def main(): """主函数""" input_file = 'data/data.xlsx' output_file = 'res.json' logger.info("=" * 60) logger.info("财务Excel数据处理程序 - 优化版本") logger.info("=" * 60) logger.info(f"输入文件: {input_file}") logger.info(f"输出文件: {output_file}") try: # 提取数据 data, stats = process_excel_data(input_file) # 保存到JSON save_to_json(data, output_file) # 生成验证报告 generate_validation_report(data) # 显示统计信息 stats.print_report() # 显示前3条记录预览 if data: logger.info("\n前3条记录预览:") preview = [record.to_dict() for record in data[:3]] print(json.dumps(preview, ensure_ascii=False, indent=2)) logger.info("\n处理完成!") except Exception as e: logger.error(f"程序执行失败: {e}") import traceback traceback.print_exc() return 1 return 0 if __name__ == '__main__': exit(main())