#!/usr/bin/env python3 """ 财务Excel数据处理程序 读取data/data.xlsx中的Sheet1表格,提取财务数据并输出到res.json """ import json from openpyxl import load_workbook from typing import List, Dict, Any, Optional def get_cell_value(ws, row: int, col: int) -> Any: """获取单元格值,处理公式计算结果""" cell = ws.cell(row, col) return cell.value def get_merged_cell_value(ws, row: int, col: int, merged_ranges) -> Any: """ 获取合并单元格的值 如果单元格在合并区域内,返回合并区域左上角单元格的值 """ for merged_range in merged_ranges: if merged_range.min_row <= row <= merged_range.max_row and \ merged_range.min_col <= col <= merged_range.max_col: # 返回合并区域左上角的值 return ws.cell(merged_range.min_row, merged_range.min_col).value # 不在任何合并区域内,直接返回单元格值 return ws.cell(row, col).value def get_f_column_ranges(ws, start_row: int = 2) -> List[tuple]: """ 获取F列的所有数据区域(包括合并和非合并单元格) 从第2行开始读取 返回: [(起始行, 结束行), ...] """ merged_cells = list(ws.merged_cells.ranges) # 找到F列的所有合并单元格区域 f_merges = [] for merge in merged_cells: # F列是第6列 if merge.min_col == 6 and merge.max_col == 6 and merge.min_row >= start_row: f_merges.append((merge.min_row, merge.max_row)) # 创建合并单元格行的集合,用于快速查找 merged_rows = set() for start, end in f_merges: for row in range(start, end + 1): merged_rows.add(row) # 处理非合并单元格(从第2行开始到最大行) all_ranges = [] for row in range(start_row, ws.max_row + 1): if row not in merged_rows: # 检查F列是否有值 f_value = ws.cell(row, 6).value if f_value is not None: # 非合并单元格,单独一行 all_ranges.append((row, row)) # 添加合并单元格区域 all_ranges.extend(f_merges) # 按起始行排序 all_ranges.sort(key=lambda x: x[0]) return all_ranges def extract_orders(ws, start_row: int, end_row: int, merged_ranges) -> List[Dict[str, Any]]: """ 提取指定行范围内的订单数据 参数: ws: 工作表对象 start_row: 起始行 end_row: 结束行 merged_ranges: 合并单元格范围列表 返回: 订单列表 """ orders = [] for row in range(start_row, end_row + 1): # H列: 订单号, I列: 金额, O列: 账户名 order_num = get_merged_cell_value(ws, row, 8, merged_ranges) # H列 amount = get_merged_cell_value(ws, row, 9, merged_ranges) # I列 account_name = get_merged_cell_value(ws, row, 15, merged_ranges) # O列 # 跳过空订单号的行 if order_num is None or str(order_num).strip() == '': continue order = { "OrderNum": str(order_num).strip() if order_num else None, "Amount": amount, "AccountName": str(account_name).strip() if account_name else None } orders.append(order) return orders def process_excel_data(file_path: str) -> List[Dict[str, Any]]: """ 处理Excel文件,提取所有财务记录 从第2行开始读取数据 参数: file_path: Excel文件路径 返回: 记录列表,每条记录包含ReceivedAmount, HandlingFee和Order列表 """ # 加载Excel文件,data_only=True读取公式计算结果 wb = load_workbook(file_path, data_only=True) ws = wb['Sheet1'] # 获取所有合并单元格范围 merged_ranges = list(ws.merged_cells.ranges) # 获取F列的所有数据区域(从第2行开始) f_ranges = get_f_column_ranges(ws, start_row=2) print(f"找到 {len(f_ranges)} 个F列数据区域(包含合并和非合并单元格)") results = [] for start_row, end_row in f_ranges: # 获取F列(实收金额)和G列(手续费)的值 # 合并单元格的值在左上角,非合并单元格直接获取 received_amount = get_cell_value(ws, start_row, 6) # F列 handling_fee = get_cell_value(ws, start_row, 7) # G列 # 手续费为空时记为0 if handling_fee is None: handling_fee = 0 # 提取该区域的订单列表 orders = extract_orders(ws, start_row, end_row, merged_ranges) # 跳过没有订单的记录 if not orders: continue # 计算订单金额总和 order_amount_sum = sum(order["Amount"] for order in orders if order["Amount"] is not None) # 计算实收金额+手续费 received_plus_fee = (received_amount if received_amount is not None else 0) + handling_fee # 检查是否相等(考虑浮点数精度) check_res = abs(received_plus_fee - order_amount_sum) < 0.01 record = { "ReceivedAmount": received_amount, "HandlingFee": handling_fee, "Order": orders, "checkRes": check_res } results.append(record) return results def save_to_json(data: List[Dict[str, Any]], output_file: str): """ 将数据保存为JSON文件 参数: data: 要保存的数据 output_file: 输出文件路径 """ with open(output_file, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) print(f"\n数据已保存到: {output_file}") print(f"总共提取 {len(data)} 条记录") def main(): """主函数""" input_file = 'data/data.xlsx' output_file = 'res.json' print("开始处理Excel文件...") print(f"输入文件: {input_file}") try: # 提取数据 data = process_excel_data(input_file) # 保存到JSON save_to_json(data, output_file) # 显示前3条记录作为预览 if data: print("\n前3条记录预览:") print(json.dumps(data[:3], ensure_ascii=False, indent=2)) print("\n处理完成!") except Exception as e: print(f"\n错误: {e}") import traceback traceback.print_exc() if __name__ == '__main__': main()