This commit is contained in:
Aiden_
2025-10-31 21:24:08 +08:00
parent cc9f3e21c9
commit 5b32208194
12 changed files with 2047 additions and 23 deletions

483
process_excel_optimized.py Normal file
View File

@@ -0,0 +1,483 @@
#!/usr/bin/env python3
"""
财务Excel数据处理程序 - 优化版本
读取data/data.xlsx中的Sheet1表格,提取财务数据并输出到res.json
优化内容:
1. 性能优化: 缓存合并单元格查询,减少重复计算
2. 代码质量: 改进类型提示,增强错误处理
3. 功能增强: 添加数据统计和验证报告
4. 可维护性: 提取常量配置,改进日志记录
"""
import json
import logging
from dataclasses import dataclass, asdict
from openpyxl import load_workbook
from typing import List, Dict, Any, Optional, Tuple, Set
from datetime import datetime
# ==================== 常量配置 ====================
# Excel列索引常量
COL_RECEIVED_AMOUNT = 6 # F列: 实收金额
COL_HANDLING_FEE = 7 # G列: 手续费
COL_ORDER_NUM = 8 # H列: 订单号
COL_AMOUNT = 9 # I列: 金额
COL_ACCOUNT_NAME = 15 # O列: 账户名
# 数据起始行
DATA_START_ROW = 2
# 金额匹配容差
AMOUNT_TOLERANCE = 0.01
# 日志配置
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
# ==================== 数据类定义 ====================
@dataclass
class Order:
"""订单数据类"""
OrderNum: Optional[str]
Amount: Optional[float]
AccountName: Optional[str]
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
"OrderNum": self.OrderNum,
"Amount": self.Amount,
"AccountName": self.AccountName
}
@dataclass
class FinancialRecord:
"""财务记录数据类"""
ReceivedAmount: Optional[float]
HandlingFee: float
Order: List[Order]
checkRes: bool
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
"ReceivedAmount": self.ReceivedAmount,
"HandlingFee": self.HandlingFee,
"Order": [order.to_dict() for order in self.Order],
"checkRes": self.checkRes
}
@dataclass
class ProcessingStats:
"""数据处理统计"""
total_records: int = 0
valid_records: int = 0
invalid_records: int = 0
total_orders: int = 0
check_failed_records: int = 0
skipped_empty_orders: int = 0
processing_time: float = 0.0
def print_report(self):
"""打印统计报告"""
logger.info("=" * 60)
logger.info("数据处理统计报告")
logger.info("=" * 60)
logger.info(f"总记录数: {self.total_records}")
logger.info(f"有效记录: {self.valid_records}")
logger.info(f"无效记录: {self.invalid_records}")
logger.info(f"订单总数: {self.total_orders}")
logger.info(f"金额验证失败: {self.check_failed_records}")
logger.info(f"跳过空订单: {self.skipped_empty_orders}")
logger.info(f"处理耗时: {self.processing_time:.2f}")
logger.info("=" * 60)
# ==================== 合并单元格缓存 ====================
class MergedCellCache:
"""合并单元格缓存类,优化查询性能"""
def __init__(self, merged_ranges):
"""初始化缓存"""
self.merged_ranges = merged_ranges
# 创建行列索引缓存
self._cache: Dict[Tuple[int, int], Any] = {}
self._build_cache()
def _build_cache(self):
"""构建缓存索引"""
for merged_range in self.merged_ranges:
for row in range(merged_range.min_row, merged_range.max_row + 1):
for col in range(merged_range.min_col, merged_range.max_col + 1):
self._cache[(row, col)] = merged_range
def get_merged_range(self, row: int, col: int) -> Optional[Any]:
"""获取指定单元格所属的合并范围"""
return self._cache.get((row, col))
def is_merged(self, row: int, col: int) -> bool:
"""检查单元格是否在合并区域内"""
return (row, col) in self._cache
# ==================== 核心处理函数 ====================
def get_cell_value(ws, row: int, col: int) -> Any:
"""
获取单元格值,处理公式计算结果
参数:
ws: 工作表对象
row: 行号
col: 列号
返回:
单元格值
"""
try:
cell = ws.cell(row, col)
return cell.value
except Exception as e:
logger.warning(f"获取单元格({row}, {col})值失败: {e}")
return None
def get_merged_cell_value(ws, row: int, col: int, cache: MergedCellCache) -> Any:
"""
获取合并单元格的值(优化版本)
如果单元格在合并区域内,返回合并区域左上角单元格的值
参数:
ws: 工作表对象
row: 行号
col: 列号
cache: 合并单元格缓存对象
返回:
单元格值
"""
merged_range = cache.get_merged_range(row, col)
if merged_range:
# 在合并区域内,返回左上角的值
return ws.cell(merged_range.min_row, merged_range.min_col).value
# 不在合并区域内,直接返回单元格值
return ws.cell(row, col).value
def get_f_column_ranges(ws, merged_ranges, start_row: int = DATA_START_ROW) -> List[Tuple[int, int]]:
"""
获取F列的所有数据区域优化版本
参数:
ws: 工作表对象
merged_ranges: 合并单元格范围列表
start_row: 起始行号
返回:
[(起始行, 结束行), ...]
"""
# 找到F列的所有合并单元格区域
f_merges = []
for merge in merged_ranges:
if merge.min_col == COL_RECEIVED_AMOUNT and \
merge.max_col == COL_RECEIVED_AMOUNT and \
merge.min_row >= start_row:
f_merges.append((merge.min_row, merge.max_row))
# 创建合并单元格行的集合,用于快速查找
merged_rows: Set[int] = set()
for start, end in f_merges:
merged_rows.update(range(start, end + 1))
# 处理非合并单元格
all_ranges = []
for row in range(start_row, ws.max_row + 1):
if row not in merged_rows:
f_value = ws.cell(row, COL_RECEIVED_AMOUNT).value
if f_value is not None:
all_ranges.append((row, row))
# 添加合并单元格区域并排序
all_ranges.extend(f_merges)
all_ranges.sort(key=lambda x: x[0])
return all_ranges
def extract_orders(ws, start_row: int, end_row: int, cache: MergedCellCache,
stats: ProcessingStats) -> List[Order]:
"""
提取指定行范围内的订单数据(优化版本)
参数:
ws: 工作表对象
start_row: 起始行
end_row: 结束行
cache: 合并单元格缓存
stats: 统计对象
返回:
订单列表
"""
orders = []
for row in range(start_row, end_row + 1):
# 提取订单数据
order_num = get_merged_cell_value(ws, row, COL_ORDER_NUM, cache)
amount = get_merged_cell_value(ws, row, COL_AMOUNT, cache)
account_name = get_merged_cell_value(ws, row, COL_ACCOUNT_NAME, cache)
# 跳过金额为空的行
if amount is None:
stats.skipped_empty_orders += 1
continue
order = Order(
OrderNum=str(order_num).strip() if order_num else None,
Amount=float(amount) if amount is not None else None,
AccountName=str(account_name).strip() if account_name else None
)
orders.append(order)
stats.total_orders += 1
return orders
def validate_amount(received_amount: Optional[float], handling_fee: float,
orders: List[Order]) -> bool:
"""
验证金额是否匹配
参数:
received_amount: 实收金额
handling_fee: 手续费
orders: 订单列表
返回:
是否匹配
"""
order_amount_sum = sum(
order.Amount for order in orders
if order.Amount is not None
)
received_plus_fee = (received_amount if received_amount is not None else 0) + handling_fee
return abs(received_plus_fee - order_amount_sum) < AMOUNT_TOLERANCE
def process_excel_data(file_path: str) -> Tuple[List[FinancialRecord], ProcessingStats]:
"""
处理Excel文件提取所有财务记录优化版本
参数:
file_path: Excel文件路径
返回:
(记录列表, 统计信息)
"""
start_time = datetime.now()
stats = ProcessingStats()
logger.info(f"开始加载Excel文件: {file_path}")
try:
# 加载Excel文件
wb = load_workbook(file_path, data_only=True)
ws = wb['Sheet1']
# 获取合并单元格范围并创建缓存
merged_ranges = list(ws.merged_cells.ranges)
cache = MergedCellCache(merged_ranges)
logger.info(f"创建合并单元格缓存,共{len(merged_ranges)}个合并区域")
# 获取F列的所有数据区域
f_ranges = get_f_column_ranges(ws, merged_ranges, DATA_START_ROW)
logger.info(f"找到{len(f_ranges)}个F列数据区域")
results = []
for idx, (start_row, end_row) in enumerate(f_ranges, 1):
stats.total_records += 1
# 获取实收金额和手续费
received_amount = get_cell_value(ws, start_row, COL_RECEIVED_AMOUNT)
handling_fee = get_cell_value(ws, start_row, COL_HANDLING_FEE)
# 手续费为空时记为0
if handling_fee is None:
handling_fee = 0
# 提取订单列表
orders = extract_orders(ws, start_row, end_row, cache, stats)
# 跳过没有订单的记录
if not orders:
stats.invalid_records += 1
logger.warning(f"记录#{idx} (行{start_row}-{end_row}): 无有效订单,跳过")
continue
# 验证金额
check_res = validate_amount(received_amount, handling_fee, orders)
if not check_res:
stats.check_failed_records += 1
logger.warning(
f"记录#{idx} (行{start_row}-{end_row}): "
f"金额验证失败 - 实收:{received_amount}, 手续费:{handling_fee}, "
f"订单总额:{sum(o.Amount for o in orders if o.Amount)}"
)
record = FinancialRecord(
ReceivedAmount=received_amount,
HandlingFee=handling_fee,
Order=orders,
checkRes=check_res
)
results.append(record)
stats.valid_records += 1
# 计算处理时间
end_time = datetime.now()
stats.processing_time = (end_time - start_time).total_seconds()
logger.info(f"数据提取完成,共{stats.valid_records}条有效记录")
return results, stats
except Exception as e:
logger.error(f"处理Excel文件时发生错误: {e}", exc_info=True)
raise
def save_to_json(data: List[FinancialRecord], output_file: str):
"""
将数据保存为JSON文件
参数:
data: 要保存的数据
output_file: 输出文件路径
"""
try:
# 转换为字典格式
data_dict = [record.to_dict() for record in data]
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(data_dict, f, ensure_ascii=False, indent=2)
logger.info(f"数据已保存到: {output_file}")
logger.info(f"总共提取 {len(data)} 条记录")
except Exception as e:
logger.error(f"保存JSON文件时发生错误: {e}", exc_info=True)
raise
def generate_validation_report(data: List[FinancialRecord], output_file: str = 'validation_report.txt'):
"""
生成数据验证报告
参数:
data: 财务记录列表
output_file: 报告文件路径
"""
try:
with open(output_file, 'w', encoding='utf-8') as f:
f.write("=" * 80 + "\n")
f.write("财务数据验证报告\n")
f.write(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("=" * 80 + "\n\n")
# 统计信息
total = len(data)
failed = sum(1 for r in data if not r.checkRes)
passed = total - failed
f.write(f"总记录数: {total}\n")
f.write(f"验证通过: {passed} ({passed/total*100:.1f}%)\n")
f.write(f"验证失败: {failed} ({failed/total*100:.1f}%)\n\n")
# 失败记录详情
if failed > 0:
f.write("-" * 80 + "\n")
f.write("验证失败记录详情:\n")
f.write("-" * 80 + "\n\n")
for idx, record in enumerate(data, 1):
if not record.checkRes:
order_sum = sum(o.Amount for o in record.Order if o.Amount)
received_plus_fee = (record.ReceivedAmount or 0) + record.HandlingFee
diff = received_plus_fee - order_sum
f.write(f"记录 #{idx}:\n")
f.write(f" 实收金额: {record.ReceivedAmount}\n")
f.write(f" 手续费: {record.HandlingFee}\n")
f.write(f" 实收+手续费: {received_plus_fee}\n")
f.write(f" 订单总额: {order_sum}\n")
f.write(f" 差额: {diff:.2f}\n")
f.write(f" 订单数量: {len(record.Order)}\n")
f.write("\n")
logger.info(f"验证报告已保存到: {output_file}")
except Exception as e:
logger.error(f"生成验证报告时发生错误: {e}", exc_info=True)
def main():
"""主函数"""
input_file = 'data/data.xlsx'
output_file = 'res.json'
logger.info("=" * 60)
logger.info("财务Excel数据处理程序 - 优化版本")
logger.info("=" * 60)
logger.info(f"输入文件: {input_file}")
logger.info(f"输出文件: {output_file}")
try:
# 提取数据
data, stats = process_excel_data(input_file)
# 保存到JSON
save_to_json(data, output_file)
# 生成验证报告
generate_validation_report(data)
# 显示统计信息
stats.print_report()
# 显示前3条记录预览
if data:
logger.info("\n前3条记录预览:")
preview = [record.to_dict() for record in data[:3]]
print(json.dumps(preview, ensure_ascii=False, indent=2))
logger.info("\n处理完成!")
except Exception as e:
logger.error(f"程序执行失败: {e}")
import traceback
traceback.print_exc()
return 1
return 0
if __name__ == '__main__':
exit(main())