215 lines
6.3 KiB
Python
215 lines
6.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
财务Excel数据处理程序
|
|
读取data/data.xlsx中的Sheet1表格,提取财务数据并输出到res.json
|
|
"""
|
|
|
|
import json
|
|
from openpyxl import load_workbook
|
|
from typing import List, Dict, Any, Optional
|
|
|
|
|
|
def get_cell_value(ws, row: int, col: int) -> Any:
|
|
"""获取单元格值,处理公式计算结果"""
|
|
cell = ws.cell(row, col)
|
|
return cell.value
|
|
|
|
|
|
def get_merged_cell_value(ws, row: int, col: int, merged_ranges) -> Any:
|
|
"""
|
|
获取合并单元格的值
|
|
如果单元格在合并区域内,返回合并区域左上角单元格的值
|
|
"""
|
|
for merged_range in merged_ranges:
|
|
if merged_range.min_row <= row <= merged_range.max_row and \
|
|
merged_range.min_col <= col <= merged_range.max_col:
|
|
# 返回合并区域左上角的值
|
|
return ws.cell(merged_range.min_row, merged_range.min_col).value
|
|
|
|
# 不在任何合并区域内,直接返回单元格值
|
|
return ws.cell(row, col).value
|
|
|
|
|
|
def get_f_column_ranges(ws, start_row: int = 2) -> List[tuple]:
|
|
"""
|
|
获取F列的所有数据区域(包括合并和非合并单元格)
|
|
从第2行开始读取
|
|
返回: [(起始行, 结束行), ...]
|
|
"""
|
|
merged_cells = list(ws.merged_cells.ranges)
|
|
|
|
# 找到F列的所有合并单元格区域
|
|
f_merges = []
|
|
for merge in merged_cells:
|
|
# F列是第6列
|
|
if merge.min_col == 6 and merge.max_col == 6 and merge.min_row >= start_row:
|
|
f_merges.append((merge.min_row, merge.max_row))
|
|
|
|
# 创建合并单元格行的集合,用于快速查找
|
|
merged_rows = set()
|
|
for start, end in f_merges:
|
|
for row in range(start, end + 1):
|
|
merged_rows.add(row)
|
|
|
|
# 处理非合并单元格(从第2行开始到最大行)
|
|
all_ranges = []
|
|
for row in range(start_row, ws.max_row + 1):
|
|
if row not in merged_rows:
|
|
# 检查F列是否有值
|
|
f_value = ws.cell(row, 6).value
|
|
if f_value is not None:
|
|
# 非合并单元格,单独一行
|
|
all_ranges.append((row, row))
|
|
|
|
# 添加合并单元格区域
|
|
all_ranges.extend(f_merges)
|
|
|
|
# 按起始行排序
|
|
all_ranges.sort(key=lambda x: x[0])
|
|
return all_ranges
|
|
|
|
|
|
def extract_orders(ws, start_row: int, end_row: int, merged_ranges) -> List[Dict[str, Any]]:
|
|
"""
|
|
提取指定行范围内的订单数据
|
|
|
|
参数:
|
|
ws: 工作表对象
|
|
start_row: 起始行
|
|
end_row: 结束行
|
|
merged_ranges: 合并单元格范围列表
|
|
|
|
返回:
|
|
订单列表
|
|
"""
|
|
orders = []
|
|
|
|
for row in range(start_row, end_row + 1):
|
|
# H列: 订单号, I列: 金额, O列: 账户名
|
|
order_num = get_merged_cell_value(ws, row, 8, merged_ranges) # H列
|
|
amount = get_merged_cell_value(ws, row, 9, merged_ranges) # I列
|
|
account_name = get_merged_cell_value(ws, row, 15, merged_ranges) # O列
|
|
|
|
# 跳过金额为空的行(金额是必需的),但订单号可以为空
|
|
if amount is None:
|
|
continue
|
|
|
|
order = {
|
|
"OrderNum": str(order_num).strip() if order_num else None,
|
|
"Amount": amount,
|
|
"AccountName": str(account_name).strip() if account_name else None
|
|
}
|
|
orders.append(order)
|
|
|
|
return orders
|
|
|
|
|
|
def process_excel_data(file_path: str) -> List[Dict[str, Any]]:
|
|
"""
|
|
处理Excel文件,提取所有财务记录
|
|
从第2行开始读取数据
|
|
|
|
参数:
|
|
file_path: Excel文件路径
|
|
|
|
返回:
|
|
记录列表,每条记录包含ReceivedAmount, HandlingFee和Order列表
|
|
"""
|
|
# 加载Excel文件,data_only=True读取公式计算结果
|
|
wb = load_workbook(file_path, data_only=True)
|
|
ws = wb['Sheet1']
|
|
|
|
# 获取所有合并单元格范围
|
|
merged_ranges = list(ws.merged_cells.ranges)
|
|
|
|
# 获取F列的所有数据区域(从第2行开始)
|
|
f_ranges = get_f_column_ranges(ws, start_row=2)
|
|
|
|
print(f"找到 {len(f_ranges)} 个F列数据区域(包含合并和非合并单元格)")
|
|
|
|
results = []
|
|
|
|
for start_row, end_row in f_ranges:
|
|
# 获取F列(实收金额)和G列(手续费)的值
|
|
# 合并单元格的值在左上角,非合并单元格直接获取
|
|
received_amount = get_cell_value(ws, start_row, 6) # F列
|
|
handling_fee = get_cell_value(ws, start_row, 7) # G列
|
|
|
|
# 手续费为空时记为0
|
|
if handling_fee is None:
|
|
handling_fee = 0
|
|
|
|
# 提取该区域的订单列表
|
|
orders = extract_orders(ws, start_row, end_row, merged_ranges)
|
|
|
|
# 跳过没有订单的记录
|
|
if not orders:
|
|
continue
|
|
|
|
# 计算订单金额总和
|
|
order_amount_sum = sum(order["Amount"] for order in orders if order["Amount"] is not None)
|
|
|
|
# 计算实收金额+手续费
|
|
received_plus_fee = (received_amount if received_amount is not None else 0) + handling_fee
|
|
|
|
# 检查是否相等(考虑浮点数精度)
|
|
check_res = abs(received_plus_fee - order_amount_sum) < 0.01
|
|
|
|
record = {
|
|
"ReceivedAmount": received_amount,
|
|
"HandlingFee": handling_fee,
|
|
"Order": orders,
|
|
"checkRes": check_res
|
|
}
|
|
|
|
results.append(record)
|
|
|
|
return results
|
|
|
|
|
|
def save_to_json(data: List[Dict[str, Any]], output_file: str):
|
|
"""
|
|
将数据保存为JSON文件
|
|
|
|
参数:
|
|
data: 要保存的数据
|
|
output_file: 输出文件路径
|
|
"""
|
|
with open(output_file, 'w', encoding='utf-8') as f:
|
|
json.dump(data, f, ensure_ascii=False, indent=2)
|
|
|
|
print(f"\n数据已保存到: {output_file}")
|
|
print(f"总共提取 {len(data)} 条记录")
|
|
|
|
|
|
def main():
|
|
"""主函数"""
|
|
input_file = 'data/data.xlsx'
|
|
output_file = 'res.json'
|
|
|
|
print("开始处理Excel文件...")
|
|
print(f"输入文件: {input_file}")
|
|
|
|
try:
|
|
# 提取数据
|
|
data = process_excel_data(input_file)
|
|
|
|
# 保存到JSON
|
|
save_to_json(data, output_file)
|
|
|
|
# 显示前3条记录作为预览
|
|
if data:
|
|
print("\n前3条记录预览:")
|
|
print(json.dumps(data[:3], ensure_ascii=False, indent=2))
|
|
|
|
print("\n处理完成!")
|
|
|
|
except Exception as e:
|
|
print(f"\n错误: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|