#!/usr/bin/env python3 """ 根据res.json生成会计分录表AccountingEntries.xlsx """ import json import os from openpyxl import Workbook from openpyxl.styles import Font, Alignment, PatternFill from typing import List, Dict, Any # 默认汇率 DEFAULT_EXCHANGE_RATE = 7.1072 def load_exchange_rate() -> float: """ 从exchange_rate.txt文件中读取汇率 如果文件不存在或值异常,则使用默认汇率 返回: 汇率值 """ rate_file = 'exchange_rate.txt' # 检查文件是否存在 if not os.path.exists(rate_file): print(f"汇率文件 {rate_file} 不存在,使用默认汇率: {DEFAULT_EXCHANGE_RATE}") return DEFAULT_EXCHANGE_RATE try: # 读取文件内容 with open(rate_file, 'r', encoding='utf-8') as f: content = f.read().strip() # 尝试转换为浮点数 rate = float(content) # 验证汇率是否合理 (假设汇率应该在 0.1 到 100 之间) if rate <= 0 or rate > 100: print(f"汇率文件中的值 {rate} 不合理,使用默认汇率: {DEFAULT_EXCHANGE_RATE}") return DEFAULT_EXCHANGE_RATE print(f"从 {rate_file} 读取汇率: {rate}") return rate except ValueError: print(f"汇率文件 {rate_file} 中的值无法解析,使用默认汇率: {DEFAULT_EXCHANGE_RATE}") return DEFAULT_EXCHANGE_RATE except Exception as e: print(f"读取汇率文件时发生错误: {e},使用默认汇率: {DEFAULT_EXCHANGE_RATE}") return DEFAULT_EXCHANGE_RATE def create_accounting_entries(data: List[Dict[str, Any]], exchange_rate: float) -> List[Dict[str, Any]]: """ 根据财务数据生成会计分录 参数: data: res.json中的财务数据 exchange_rate: 汇率 返回: 会计分录列表 """ entries = [] for record in data: received_amount = record["ReceivedAmount"] handling_fee = record["HandlingFee"] orders = record["Order"] check_res = record.get("checkRes", True) # 获取checkRes字段 # 跳过无效记录 if received_amount is None or not orders: continue # 1. ReceivedAmount 借方记录 # 科目代码: 1002.02, 科目名称: 银行存款 - 中行USD for order in orders: order_num = order["OrderNum"] account_name = order["AccountName"] entry_debit = { "到账金额": received_amount, "手续费": handling_fee, "订单号": order_num, "应收账款": "", "金蝶名称": account_name, "摘要": f"美金收款-{order_num}", "借/贷": "借", "科目代码(*)": "1002.02", "科目名称(*)": "银行存款 - 中行USD", "核算项目": account_name, "币别": "美元", "汇率": exchange_rate, "原币金额": received_amount, "金额": round(received_amount * exchange_rate, 2), "_check_res": check_res # 添加checkRes标记 } entries.append(entry_debit) break # 只记录一次借方 # 2. 手续费借方记录 (如果手续费>0) # 科目代码: 5603.03, 科目名称: 财务费用-手续费 if handling_fee > 0: # 获取第一个订单号用于摘要 first_order_num = orders[0]["OrderNum"] if orders else "" entry_fee = { "到账金额": received_amount, "手续费": handling_fee, "订单号": first_order_num, "应收账款": "", "金蝶名称": "", "摘要": f"美金收款-{first_order_num}", "借/贷": "借", "科目代码(*)": "5603.03", "科目名称(*)": "财务费用-手续费", "核算项目": "", "币别": "人民币", "汇率": "", "原币金额": "", "金额": round(handling_fee * exchange_rate, 2), "_check_res": check_res # 添加checkRes标记 } entries.append(entry_fee) # 3. Order列表中每一项的贷方记录 # 科目代码: 1122, 科目名称: 应收账款 for order in orders: order_num = order["OrderNum"] amount = order["Amount"] account_name = order["AccountName"] # 跳过金额为空的订单 if amount is None: continue entry_order = { "到账金额": received_amount, "手续费": handling_fee, "订单号": order_num, "应收账款": amount, # 填入Order的Amount金额 "金蝶名称": account_name, "摘要": f"美金收款-{order_num}", "借/贷": "贷", "科目代码(*)": "1122", "科目名称(*)": "应收账款", "核算项目": account_name, "币别": "美元", "汇率": exchange_rate, "原币金额": amount, "金额": round(amount * exchange_rate, 2), "_check_res": check_res # 添加checkRes标记 } entries.append(entry_order) return entries def save_to_excel(entries: List[Dict[str, Any]], output_file: str): """ 将会计分录保存为Excel文件 参数: entries: 会计分录列表 output_file: 输出文件路径 """ from openpyxl.utils import get_column_letter wb = Workbook() ws = wb.active ws.title = "会计分录" # 定义表头 headers = [ "到账金额", "手续费", "订单号", "应收账款", "金蝶名称", "摘要", "借/贷", "科目代码(*)", "科目名称(*)", "核算项目", "币别", "汇率", "原币金额", "金额" ] # 写入表头 for col_idx, header in enumerate(headers, start=1): cell = ws.cell(row=1, column=col_idx, value=header) cell.font = Font(bold=True) cell.fill = PatternFill(start_color="CCE5FF", end_color="CCE5FF", fill_type="solid") cell.alignment = Alignment(horizontal="center", vertical="center") # 写入数据 error_fill = PatternFill(start_color="FAD1D4", end_color="FAD1D4", fill_type="solid") for row_idx, entry in enumerate(entries, start=2): check_res = entry.get("_check_res", True) # 写入数据 ws.cell(row=row_idx, column=1, value=entry.get("到账金额", "")) ws.cell(row=row_idx, column=2, value=entry.get("手续费", "")) ws.cell(row=row_idx, column=3, value=entry.get("订单号", "")) ws.cell(row=row_idx, column=4, value=entry.get("应收账款", "")) ws.cell(row=row_idx, column=5, value=entry.get("金蝶名称", "")) ws.cell(row=row_idx, column=6, value=entry.get("摘要", "")) ws.cell(row=row_idx, column=7, value=entry.get("借/贷", "")) ws.cell(row=row_idx, column=8, value=entry.get("科目代码(*)", "")) ws.cell(row=row_idx, column=9, value=entry.get("科目名称(*)", "")) ws.cell(row=row_idx, column=10, value=entry.get("核算项目", "")) ws.cell(row=row_idx, column=11, value=entry.get("币别", "")) ws.cell(row=row_idx, column=12, value=entry.get("汇率", "")) ws.cell(row=row_idx, column=13, value=entry.get("原币金额", "")) ws.cell(row=row_idx, column=14, value=entry.get("金额", "")) # 先设置所有背景颜色(在合并单元格之前) for row_idx, entry in enumerate(entries, start=2): check_res = entry.get("_check_res", True) if not check_res: for col_idx in range(1, 15): ws.cell(row=row_idx, column=col_idx).fill = error_fill # 合并同一ReceivedAmount的"到账金额"和"手续费"单元格 merge_groups = {} # {(received_amount, handling_fee): [row_start, row_end]} for row_idx, entry in enumerate(entries, start=2): received_amount = entry.get("到账金额", "") handling_fee = entry.get("手续费", "") key = (received_amount, handling_fee) if key not in merge_groups: merge_groups[key] = [row_idx, row_idx] else: # 检查是否连续 if row_idx == merge_groups[key][1] + 1: merge_groups[key][1] = row_idx else: # 不连续,创建新组 merge_groups[f"{key}_{row_idx}"] = [row_idx, row_idx] # 执行合并 for key, (start_row, end_row) in merge_groups.items(): if start_row < end_row: # 只有多于1行时才合并 # 合并"到账金额"列(A列) ws.merge_cells(f'A{start_row}:A{end_row}') ws.cell(start_row, 1).alignment = Alignment(horizontal="center", vertical="center") # 合并"手续费"列(B列) ws.merge_cells(f'B{start_row}:B{end_row}') ws.cell(start_row, 2).alignment = Alignment(horizontal="center", vertical="center") # 合并后重新应用背景颜色(确保合并单元格也有背景色) for row_idx, entry in enumerate(entries, start=2): check_res = entry.get("_check_res", True) if not check_res: for col_idx in range(1, 15): ws.cell(row=row_idx, column=col_idx).fill = error_fill # 调整列宽 column_widths = [12, 10, 18, 12, 25, 25, 8, 15, 25, 25, 10, 10, 12, 15] for col_idx, width in enumerate(column_widths, start=1): ws.column_dimensions[chr(64 + col_idx)].width = width # 保存文件 wb.save(output_file) print(f"\n会计分录已保存到: {output_file}") print(f"总共生成 {len(entries)} 条会计分录") def main(): """主函数""" input_file = 'res.json' output_file = 'AccountingEntries.xlsx' print("开始生成会计分录...") print(f"读取文件: {input_file}") try: # 加载汇率 exchange_rate = load_exchange_rate() # 读取JSON数据 with open(input_file, 'r', encoding='utf-8') as f: data = json.load(f) print(f"读取了 {len(data)} 条财务记录") print(f"使用汇率: {exchange_rate}") # 生成会计分录 entries = create_accounting_entries(data, exchange_rate) # 保存到Excel save_to_excel(entries, output_file) # 统计信息 debit_count = sum(1 for e in entries if e["借/贷"] == "借") credit_count = sum(1 for e in entries if e["借/贷"] == "贷") print(f"\n统计:") print(f" 借方记录: {debit_count} 条") print(f" 贷方记录: {credit_count} 条") print("\n处理完成!") except Exception as e: print(f"\n错误: {e}") import traceback traceback.print_exc() if __name__ == '__main__': main()