484 lines
15 KiB
Python
484 lines
15 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
财务Excel数据处理程序 - 优化版本
|
||
读取data/data.xlsx中的Sheet1表格,提取财务数据并输出到res.json
|
||
|
||
优化内容:
|
||
1. 性能优化: 缓存合并单元格查询,减少重复计算
|
||
2. 代码质量: 改进类型提示,增强错误处理
|
||
3. 功能增强: 添加数据统计和验证报告
|
||
4. 可维护性: 提取常量配置,改进日志记录
|
||
"""
|
||
|
||
import json
|
||
import logging
|
||
from dataclasses import dataclass, asdict
|
||
from openpyxl import load_workbook
|
||
from typing import List, Dict, Any, Optional, Tuple, Set
|
||
from datetime import datetime
|
||
|
||
# ==================== 常量配置 ====================
|
||
|
||
# Excel列索引常量
|
||
COL_RECEIVED_AMOUNT = 6 # F列: 实收金额
|
||
COL_HANDLING_FEE = 7 # G列: 手续费
|
||
COL_ORDER_NUM = 8 # H列: 订单号
|
||
COL_AMOUNT = 9 # I列: 金额
|
||
COL_ACCOUNT_NAME = 15 # O列: 账户名
|
||
|
||
# 数据起始行
|
||
DATA_START_ROW = 2
|
||
|
||
# 金额匹配容差
|
||
AMOUNT_TOLERANCE = 0.01
|
||
|
||
# 日志配置
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||
datefmt='%Y-%m-%d %H:%M:%S'
|
||
)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
# ==================== 数据类定义 ====================
|
||
|
||
@dataclass
|
||
class Order:
|
||
"""订单数据类"""
|
||
OrderNum: Optional[str]
|
||
Amount: Optional[float]
|
||
AccountName: Optional[str]
|
||
|
||
def to_dict(self) -> Dict[str, Any]:
|
||
"""转换为字典"""
|
||
return {
|
||
"OrderNum": self.OrderNum,
|
||
"Amount": self.Amount,
|
||
"AccountName": self.AccountName
|
||
}
|
||
|
||
|
||
@dataclass
|
||
class FinancialRecord:
|
||
"""财务记录数据类"""
|
||
ReceivedAmount: Optional[float]
|
||
HandlingFee: float
|
||
Order: List[Order]
|
||
checkRes: bool
|
||
|
||
def to_dict(self) -> Dict[str, Any]:
|
||
"""转换为字典"""
|
||
return {
|
||
"ReceivedAmount": self.ReceivedAmount,
|
||
"HandlingFee": self.HandlingFee,
|
||
"Order": [order.to_dict() for order in self.Order],
|
||
"checkRes": self.checkRes
|
||
}
|
||
|
||
|
||
@dataclass
|
||
class ProcessingStats:
|
||
"""数据处理统计"""
|
||
total_records: int = 0
|
||
valid_records: int = 0
|
||
invalid_records: int = 0
|
||
total_orders: int = 0
|
||
check_failed_records: int = 0
|
||
skipped_empty_orders: int = 0
|
||
processing_time: float = 0.0
|
||
|
||
def print_report(self):
|
||
"""打印统计报告"""
|
||
logger.info("=" * 60)
|
||
logger.info("数据处理统计报告")
|
||
logger.info("=" * 60)
|
||
logger.info(f"总记录数: {self.total_records}")
|
||
logger.info(f"有效记录: {self.valid_records}")
|
||
logger.info(f"无效记录: {self.invalid_records}")
|
||
logger.info(f"订单总数: {self.total_orders}")
|
||
logger.info(f"金额验证失败: {self.check_failed_records}")
|
||
logger.info(f"跳过空订单: {self.skipped_empty_orders}")
|
||
logger.info(f"处理耗时: {self.processing_time:.2f}秒")
|
||
logger.info("=" * 60)
|
||
|
||
|
||
# ==================== 合并单元格缓存 ====================
|
||
|
||
class MergedCellCache:
|
||
"""合并单元格缓存类,优化查询性能"""
|
||
|
||
def __init__(self, merged_ranges):
|
||
"""初始化缓存"""
|
||
self.merged_ranges = merged_ranges
|
||
# 创建行列索引缓存
|
||
self._cache: Dict[Tuple[int, int], Any] = {}
|
||
self._build_cache()
|
||
|
||
def _build_cache(self):
|
||
"""构建缓存索引"""
|
||
for merged_range in self.merged_ranges:
|
||
for row in range(merged_range.min_row, merged_range.max_row + 1):
|
||
for col in range(merged_range.min_col, merged_range.max_col + 1):
|
||
self._cache[(row, col)] = merged_range
|
||
|
||
def get_merged_range(self, row: int, col: int) -> Optional[Any]:
|
||
"""获取指定单元格所属的合并范围"""
|
||
return self._cache.get((row, col))
|
||
|
||
def is_merged(self, row: int, col: int) -> bool:
|
||
"""检查单元格是否在合并区域内"""
|
||
return (row, col) in self._cache
|
||
|
||
|
||
# ==================== 核心处理函数 ====================
|
||
|
||
def get_cell_value(ws, row: int, col: int) -> Any:
|
||
"""
|
||
获取单元格值,处理公式计算结果
|
||
|
||
参数:
|
||
ws: 工作表对象
|
||
row: 行号
|
||
col: 列号
|
||
|
||
返回:
|
||
单元格值
|
||
"""
|
||
try:
|
||
cell = ws.cell(row, col)
|
||
return cell.value
|
||
except Exception as e:
|
||
logger.warning(f"获取单元格({row}, {col})值失败: {e}")
|
||
return None
|
||
|
||
|
||
def get_merged_cell_value(ws, row: int, col: int, cache: MergedCellCache) -> Any:
|
||
"""
|
||
获取合并单元格的值(优化版本)
|
||
如果单元格在合并区域内,返回合并区域左上角单元格的值
|
||
|
||
参数:
|
||
ws: 工作表对象
|
||
row: 行号
|
||
col: 列号
|
||
cache: 合并单元格缓存对象
|
||
|
||
返回:
|
||
单元格值
|
||
"""
|
||
merged_range = cache.get_merged_range(row, col)
|
||
|
||
if merged_range:
|
||
# 在合并区域内,返回左上角的值
|
||
return ws.cell(merged_range.min_row, merged_range.min_col).value
|
||
|
||
# 不在合并区域内,直接返回单元格值
|
||
return ws.cell(row, col).value
|
||
|
||
|
||
def get_f_column_ranges(ws, merged_ranges, start_row: int = DATA_START_ROW) -> List[Tuple[int, int]]:
|
||
"""
|
||
获取F列的所有数据区域(优化版本)
|
||
|
||
参数:
|
||
ws: 工作表对象
|
||
merged_ranges: 合并单元格范围列表
|
||
start_row: 起始行号
|
||
|
||
返回:
|
||
[(起始行, 结束行), ...]
|
||
"""
|
||
# 找到F列的所有合并单元格区域
|
||
f_merges = []
|
||
for merge in merged_ranges:
|
||
if merge.min_col == COL_RECEIVED_AMOUNT and \
|
||
merge.max_col == COL_RECEIVED_AMOUNT and \
|
||
merge.min_row >= start_row:
|
||
f_merges.append((merge.min_row, merge.max_row))
|
||
|
||
# 创建合并单元格行的集合,用于快速查找
|
||
merged_rows: Set[int] = set()
|
||
for start, end in f_merges:
|
||
merged_rows.update(range(start, end + 1))
|
||
|
||
# 处理非合并单元格
|
||
all_ranges = []
|
||
for row in range(start_row, ws.max_row + 1):
|
||
if row not in merged_rows:
|
||
f_value = ws.cell(row, COL_RECEIVED_AMOUNT).value
|
||
if f_value is not None:
|
||
all_ranges.append((row, row))
|
||
|
||
# 添加合并单元格区域并排序
|
||
all_ranges.extend(f_merges)
|
||
all_ranges.sort(key=lambda x: x[0])
|
||
|
||
return all_ranges
|
||
|
||
|
||
def extract_orders(ws, start_row: int, end_row: int, cache: MergedCellCache,
|
||
stats: ProcessingStats) -> List[Order]:
|
||
"""
|
||
提取指定行范围内的订单数据(优化版本)
|
||
|
||
参数:
|
||
ws: 工作表对象
|
||
start_row: 起始行
|
||
end_row: 结束行
|
||
cache: 合并单元格缓存
|
||
stats: 统计对象
|
||
|
||
返回:
|
||
订单列表
|
||
"""
|
||
orders = []
|
||
|
||
for row in range(start_row, end_row + 1):
|
||
# 提取订单数据
|
||
order_num = get_merged_cell_value(ws, row, COL_ORDER_NUM, cache)
|
||
amount = get_merged_cell_value(ws, row, COL_AMOUNT, cache)
|
||
account_name = get_merged_cell_value(ws, row, COL_ACCOUNT_NAME, cache)
|
||
|
||
# 跳过金额为空的行
|
||
if amount is None:
|
||
stats.skipped_empty_orders += 1
|
||
continue
|
||
|
||
order = Order(
|
||
OrderNum=str(order_num).strip() if order_num else None,
|
||
Amount=float(amount) if amount is not None else None,
|
||
AccountName=str(account_name).strip() if account_name else None
|
||
)
|
||
orders.append(order)
|
||
stats.total_orders += 1
|
||
|
||
return orders
|
||
|
||
|
||
def validate_amount(received_amount: Optional[float], handling_fee: float,
|
||
orders: List[Order]) -> bool:
|
||
"""
|
||
验证金额是否匹配
|
||
|
||
参数:
|
||
received_amount: 实收金额
|
||
handling_fee: 手续费
|
||
orders: 订单列表
|
||
|
||
返回:
|
||
是否匹配
|
||
"""
|
||
order_amount_sum = sum(
|
||
order.Amount for order in orders
|
||
if order.Amount is not None
|
||
)
|
||
|
||
received_plus_fee = (received_amount if received_amount is not None else 0) + handling_fee
|
||
|
||
return abs(received_plus_fee - order_amount_sum) < AMOUNT_TOLERANCE
|
||
|
||
|
||
def process_excel_data(file_path: str) -> Tuple[List[FinancialRecord], ProcessingStats]:
|
||
"""
|
||
处理Excel文件,提取所有财务记录(优化版本)
|
||
|
||
参数:
|
||
file_path: Excel文件路径
|
||
|
||
返回:
|
||
(记录列表, 统计信息)
|
||
"""
|
||
start_time = datetime.now()
|
||
stats = ProcessingStats()
|
||
|
||
logger.info(f"开始加载Excel文件: {file_path}")
|
||
|
||
try:
|
||
# 加载Excel文件
|
||
wb = load_workbook(file_path, data_only=True)
|
||
ws = wb['Sheet1']
|
||
|
||
# 获取合并单元格范围并创建缓存
|
||
merged_ranges = list(ws.merged_cells.ranges)
|
||
cache = MergedCellCache(merged_ranges)
|
||
logger.info(f"创建合并单元格缓存,共{len(merged_ranges)}个合并区域")
|
||
|
||
# 获取F列的所有数据区域
|
||
f_ranges = get_f_column_ranges(ws, merged_ranges, DATA_START_ROW)
|
||
logger.info(f"找到{len(f_ranges)}个F列数据区域")
|
||
|
||
results = []
|
||
|
||
for idx, (start_row, end_row) in enumerate(f_ranges, 1):
|
||
stats.total_records += 1
|
||
|
||
# 获取实收金额和手续费
|
||
received_amount = get_cell_value(ws, start_row, COL_RECEIVED_AMOUNT)
|
||
handling_fee = get_cell_value(ws, start_row, COL_HANDLING_FEE)
|
||
|
||
# 手续费为空时记为0
|
||
if handling_fee is None:
|
||
handling_fee = 0
|
||
|
||
# 提取订单列表
|
||
orders = extract_orders(ws, start_row, end_row, cache, stats)
|
||
|
||
# 跳过没有订单的记录
|
||
if not orders:
|
||
stats.invalid_records += 1
|
||
logger.warning(f"记录#{idx} (行{start_row}-{end_row}): 无有效订单,跳过")
|
||
continue
|
||
|
||
# 验证金额
|
||
check_res = validate_amount(received_amount, handling_fee, orders)
|
||
if not check_res:
|
||
stats.check_failed_records += 1
|
||
logger.warning(
|
||
f"记录#{idx} (行{start_row}-{end_row}): "
|
||
f"金额验证失败 - 实收:{received_amount}, 手续费:{handling_fee}, "
|
||
f"订单总额:{sum(o.Amount for o in orders if o.Amount)}"
|
||
)
|
||
|
||
record = FinancialRecord(
|
||
ReceivedAmount=received_amount,
|
||
HandlingFee=handling_fee,
|
||
Order=orders,
|
||
checkRes=check_res
|
||
)
|
||
|
||
results.append(record)
|
||
stats.valid_records += 1
|
||
|
||
# 计算处理时间
|
||
end_time = datetime.now()
|
||
stats.processing_time = (end_time - start_time).total_seconds()
|
||
|
||
logger.info(f"数据提取完成,共{stats.valid_records}条有效记录")
|
||
|
||
return results, stats
|
||
|
||
except Exception as e:
|
||
logger.error(f"处理Excel文件时发生错误: {e}", exc_info=True)
|
||
raise
|
||
|
||
|
||
def save_to_json(data: List[FinancialRecord], output_file: str):
|
||
"""
|
||
将数据保存为JSON文件
|
||
|
||
参数:
|
||
data: 要保存的数据
|
||
output_file: 输出文件路径
|
||
"""
|
||
try:
|
||
# 转换为字典格式
|
||
data_dict = [record.to_dict() for record in data]
|
||
|
||
with open(output_file, 'w', encoding='utf-8') as f:
|
||
json.dump(data_dict, f, ensure_ascii=False, indent=2)
|
||
|
||
logger.info(f"数据已保存到: {output_file}")
|
||
logger.info(f"总共提取 {len(data)} 条记录")
|
||
except Exception as e:
|
||
logger.error(f"保存JSON文件时发生错误: {e}", exc_info=True)
|
||
raise
|
||
|
||
|
||
def generate_validation_report(data: List[FinancialRecord], output_file: str = 'validation_report.txt'):
|
||
"""
|
||
生成数据验证报告
|
||
|
||
参数:
|
||
data: 财务记录列表
|
||
output_file: 报告文件路径
|
||
"""
|
||
try:
|
||
with open(output_file, 'w', encoding='utf-8') as f:
|
||
f.write("=" * 80 + "\n")
|
||
f.write("财务数据验证报告\n")
|
||
f.write(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
||
f.write("=" * 80 + "\n\n")
|
||
|
||
# 统计信息
|
||
total = len(data)
|
||
failed = sum(1 for r in data if not r.checkRes)
|
||
passed = total - failed
|
||
|
||
f.write(f"总记录数: {total}\n")
|
||
f.write(f"验证通过: {passed} ({passed/total*100:.1f}%)\n")
|
||
f.write(f"验证失败: {failed} ({failed/total*100:.1f}%)\n\n")
|
||
|
||
# 失败记录详情
|
||
if failed > 0:
|
||
f.write("-" * 80 + "\n")
|
||
f.write("验证失败记录详情:\n")
|
||
f.write("-" * 80 + "\n\n")
|
||
|
||
for idx, record in enumerate(data, 1):
|
||
if not record.checkRes:
|
||
order_sum = sum(o.Amount for o in record.Order if o.Amount)
|
||
received_plus_fee = (record.ReceivedAmount or 0) + record.HandlingFee
|
||
diff = received_plus_fee - order_sum
|
||
|
||
f.write(f"记录 #{idx}:\n")
|
||
f.write(f" 实收金额: {record.ReceivedAmount}\n")
|
||
f.write(f" 手续费: {record.HandlingFee}\n")
|
||
f.write(f" 实收+手续费: {received_plus_fee}\n")
|
||
f.write(f" 订单总额: {order_sum}\n")
|
||
f.write(f" 差额: {diff:.2f}\n")
|
||
f.write(f" 订单数量: {len(record.Order)}\n")
|
||
f.write("\n")
|
||
|
||
logger.info(f"验证报告已保存到: {output_file}")
|
||
except Exception as e:
|
||
logger.error(f"生成验证报告时发生错误: {e}", exc_info=True)
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
input_file = 'data/data.xlsx'
|
||
output_file = 'res.json'
|
||
|
||
logger.info("=" * 60)
|
||
logger.info("财务Excel数据处理程序 - 优化版本")
|
||
logger.info("=" * 60)
|
||
logger.info(f"输入文件: {input_file}")
|
||
logger.info(f"输出文件: {output_file}")
|
||
|
||
try:
|
||
# 提取数据
|
||
data, stats = process_excel_data(input_file)
|
||
|
||
# 保存到JSON
|
||
save_to_json(data, output_file)
|
||
|
||
# 生成验证报告
|
||
generate_validation_report(data)
|
||
|
||
# 显示统计信息
|
||
stats.print_report()
|
||
|
||
# 显示前3条记录预览
|
||
if data:
|
||
logger.info("\n前3条记录预览:")
|
||
preview = [record.to_dict() for record in data[:3]]
|
||
print(json.dumps(preview, ensure_ascii=False, indent=2))
|
||
|
||
logger.info("\n处理完成!")
|
||
|
||
except Exception as e:
|
||
logger.error(f"程序执行失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return 1
|
||
|
||
return 0
|
||
|
||
|
||
if __name__ == '__main__':
|
||
exit(main())
|
||
|
||
|
||
|