Files
TableProcessing/process_excel.py
yangfan cc9f3e21c9 fix
2025-10-17 17:44:24 +08:00

215 lines
6.3 KiB
Python

#!/usr/bin/env python3
"""
财务Excel数据处理程序
读取data/data.xlsx中的Sheet1表格,提取财务数据并输出到res.json
"""
import json
from openpyxl import load_workbook
from typing import List, Dict, Any, Optional
def get_cell_value(ws, row: int, col: int) -> Any:
"""获取单元格值,处理公式计算结果"""
cell = ws.cell(row, col)
return cell.value
def get_merged_cell_value(ws, row: int, col: int, merged_ranges) -> Any:
"""
获取合并单元格的值
如果单元格在合并区域内,返回合并区域左上角单元格的值
"""
for merged_range in merged_ranges:
if merged_range.min_row <= row <= merged_range.max_row and \
merged_range.min_col <= col <= merged_range.max_col:
# 返回合并区域左上角的值
return ws.cell(merged_range.min_row, merged_range.min_col).value
# 不在任何合并区域内,直接返回单元格值
return ws.cell(row, col).value
def get_f_column_ranges(ws, start_row: int = 2) -> List[tuple]:
"""
获取F列的所有数据区域(包括合并和非合并单元格)
从第2行开始读取
返回: [(起始行, 结束行), ...]
"""
merged_cells = list(ws.merged_cells.ranges)
# 找到F列的所有合并单元格区域
f_merges = []
for merge in merged_cells:
# F列是第6列
if merge.min_col == 6 and merge.max_col == 6 and merge.min_row >= start_row:
f_merges.append((merge.min_row, merge.max_row))
# 创建合并单元格行的集合,用于快速查找
merged_rows = set()
for start, end in f_merges:
for row in range(start, end + 1):
merged_rows.add(row)
# 处理非合并单元格(从第2行开始到最大行)
all_ranges = []
for row in range(start_row, ws.max_row + 1):
if row not in merged_rows:
# 检查F列是否有值
f_value = ws.cell(row, 6).value
if f_value is not None:
# 非合并单元格,单独一行
all_ranges.append((row, row))
# 添加合并单元格区域
all_ranges.extend(f_merges)
# 按起始行排序
all_ranges.sort(key=lambda x: x[0])
return all_ranges
def extract_orders(ws, start_row: int, end_row: int, merged_ranges) -> List[Dict[str, Any]]:
"""
提取指定行范围内的订单数据
参数:
ws: 工作表对象
start_row: 起始行
end_row: 结束行
merged_ranges: 合并单元格范围列表
返回:
订单列表
"""
orders = []
for row in range(start_row, end_row + 1):
# H列: 订单号, I列: 金额, O列: 账户名
order_num = get_merged_cell_value(ws, row, 8, merged_ranges) # H列
amount = get_merged_cell_value(ws, row, 9, merged_ranges) # I列
account_name = get_merged_cell_value(ws, row, 15, merged_ranges) # O列
# 跳过金额为空的行(金额是必需的),但订单号可以为空
if amount is None:
continue
order = {
"OrderNum": str(order_num).strip() if order_num else None,
"Amount": amount,
"AccountName": str(account_name).strip() if account_name else None
}
orders.append(order)
return orders
def process_excel_data(file_path: str) -> List[Dict[str, Any]]:
"""
处理Excel文件,提取所有财务记录
从第2行开始读取数据
参数:
file_path: Excel文件路径
返回:
记录列表,每条记录包含ReceivedAmount, HandlingFee和Order列表
"""
# 加载Excel文件,data_only=True读取公式计算结果
wb = load_workbook(file_path, data_only=True)
ws = wb['Sheet1']
# 获取所有合并单元格范围
merged_ranges = list(ws.merged_cells.ranges)
# 获取F列的所有数据区域(从第2行开始)
f_ranges = get_f_column_ranges(ws, start_row=2)
print(f"找到 {len(f_ranges)} 个F列数据区域(包含合并和非合并单元格)")
results = []
for start_row, end_row in f_ranges:
# 获取F列(实收金额)和G列(手续费)的值
# 合并单元格的值在左上角,非合并单元格直接获取
received_amount = get_cell_value(ws, start_row, 6) # F列
handling_fee = get_cell_value(ws, start_row, 7) # G列
# 手续费为空时记为0
if handling_fee is None:
handling_fee = 0
# 提取该区域的订单列表
orders = extract_orders(ws, start_row, end_row, merged_ranges)
# 跳过没有订单的记录
if not orders:
continue
# 计算订单金额总和
order_amount_sum = sum(order["Amount"] for order in orders if order["Amount"] is not None)
# 计算实收金额+手续费
received_plus_fee = (received_amount if received_amount is not None else 0) + handling_fee
# 检查是否相等(考虑浮点数精度)
check_res = abs(received_plus_fee - order_amount_sum) < 0.01
record = {
"ReceivedAmount": received_amount,
"HandlingFee": handling_fee,
"Order": orders,
"checkRes": check_res
}
results.append(record)
return results
def save_to_json(data: List[Dict[str, Any]], output_file: str):
"""
将数据保存为JSON文件
参数:
data: 要保存的数据
output_file: 输出文件路径
"""
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f"\n数据已保存到: {output_file}")
print(f"总共提取 {len(data)} 条记录")
def main():
"""主函数"""
input_file = 'data/data.xlsx'
output_file = 'res.json'
print("开始处理Excel文件...")
print(f"输入文件: {input_file}")
try:
# 提取数据
data = process_excel_data(input_file)
# 保存到JSON
save_to_json(data, output_file)
# 显示前3条记录作为预览
if data:
print("\n前3条记录预览:")
print(json.dumps(data[:3], ensure_ascii=False, indent=2))
print("\n处理完成!")
except Exception as e:
print(f"\n错误: {e}")
import traceback
traceback.print_exc()
if __name__ == '__main__':
main()