Files
TableProcessing/generate_accounting_entries.py
yangfan cc9f3e21c9 fix
2025-10-17 17:44:24 +08:00

318 lines
11 KiB
Python

#!/usr/bin/env python3
"""
根据res.json生成会计分录表AccountingEntries.xlsx
"""
import json
import os
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill
from typing import List, Dict, Any
# 默认汇率
DEFAULT_EXCHANGE_RATE = 7.1072
def load_exchange_rate() -> float:
"""
从exchange_rate.txt文件中读取汇率
如果文件不存在或值异常,则使用默认汇率
返回:
汇率值
"""
rate_file = 'exchange_rate.txt'
# 检查文件是否存在
if not os.path.exists(rate_file):
print(f"汇率文件 {rate_file} 不存在,使用默认汇率: {DEFAULT_EXCHANGE_RATE}")
return DEFAULT_EXCHANGE_RATE
try:
# 读取文件内容
with open(rate_file, 'r', encoding='utf-8') as f:
content = f.read().strip()
# 尝试转换为浮点数
rate = float(content)
# 验证汇率是否合理 (假设汇率应该在 0.1 到 100 之间)
if rate <= 0 or rate > 100:
print(f"汇率文件中的值 {rate} 不合理,使用默认汇率: {DEFAULT_EXCHANGE_RATE}")
return DEFAULT_EXCHANGE_RATE
print(f"{rate_file} 读取汇率: {rate}")
return rate
except ValueError:
print(f"汇率文件 {rate_file} 中的值无法解析,使用默认汇率: {DEFAULT_EXCHANGE_RATE}")
return DEFAULT_EXCHANGE_RATE
except Exception as e:
print(f"读取汇率文件时发生错误: {e},使用默认汇率: {DEFAULT_EXCHANGE_RATE}")
return DEFAULT_EXCHANGE_RATE
def create_accounting_entries(data: List[Dict[str, Any]], exchange_rate: float) -> List[Dict[str, Any]]:
"""
根据财务数据生成会计分录
参数:
data: res.json中的财务数据
exchange_rate: 汇率
返回:
会计分录列表
"""
entries = []
for record in data:
received_amount = record["ReceivedAmount"]
handling_fee = record["HandlingFee"]
orders = record["Order"]
check_res = record.get("checkRes", True) # 获取checkRes字段
# 跳过无效记录
if received_amount is None or not orders:
continue
# 1. ReceivedAmount 借方记录
# 科目代码: 1002.02, 科目名称: 银行存款 - 中行USD
for order in orders:
order_num = order["OrderNum"]
account_name = order["AccountName"]
# 当orderNum为空时,摘要固定为"美金收款"
summary = "美金收款" if not order_num else f"美金收款-{order_num}"
entry_debit = {
"到账金额": received_amount,
"手续费": handling_fee,
"订单号": order_num,
"应收账款": "",
"金蝶名称": account_name,
"摘要": summary,
"借/贷": "",
"科目代码(*)": "1002.02",
"科目名称(*)": "银行存款 - 中行USD",
"核算项目": account_name,
"币别": "美元",
"汇率": exchange_rate,
"原币金额": received_amount,
"金额": round(received_amount * exchange_rate, 2),
"_check_res": check_res # 添加checkRes标记
}
entries.append(entry_debit)
break # 只记录一次借方
# 2. 手续费借方记录 (如果手续费>0)
# 科目代码: 5603.03, 科目名称: 财务费用-手续费
if handling_fee > 0:
# 获取第一个订单号用于摘要
first_order_num = orders[0]["OrderNum"] if orders else ""
# 当orderNum为空时,摘要固定为"美金收款"
summary = "美金收款" if not first_order_num else f"美金收款-{first_order_num}"
entry_fee = {
"到账金额": received_amount,
"手续费": handling_fee,
"订单号": first_order_num,
"应收账款": "",
"金蝶名称": "",
"摘要": summary,
"借/贷": "",
"科目代码(*)": "5603.03",
"科目名称(*)": "财务费用-手续费",
"核算项目": "",
"币别": "人民币",
"汇率": "",
"原币金额": "",
"金额": round(handling_fee * exchange_rate, 2),
"_check_res": check_res # 添加checkRes标记
}
entries.append(entry_fee)
# 3. Order列表中每一项的贷方记录
# 科目代码: 1122, 科目名称: 应收账款
for order in orders:
order_num = order["OrderNum"]
amount = order["Amount"]
account_name = order["AccountName"]
# 跳过金额为空的订单
if amount is None:
continue
# 当orderNum为空时,摘要固定为"美金收款"
summary = "美金收款" if not order_num else f"美金收款-{order_num}"
entry_order = {
"到账金额": received_amount,
"手续费": handling_fee,
"订单号": order_num,
"应收账款": amount, # 填入Order的Amount金额
"金蝶名称": account_name,
"摘要": summary,
"借/贷": "",
"科目代码(*)": "1122",
"科目名称(*)": "应收账款",
"核算项目": account_name,
"币别": "美元",
"汇率": exchange_rate,
"原币金额": amount,
"金额": round(amount * exchange_rate, 2),
"_check_res": check_res # 添加checkRes标记
}
entries.append(entry_order)
return entries
def save_to_excel(entries: List[Dict[str, Any]], output_file: str):
"""
将会计分录保存为Excel文件
参数:
entries: 会计分录列表
output_file: 输出文件路径
"""
from openpyxl.utils import get_column_letter
wb = Workbook()
ws = wb.active
ws.title = "会计分录"
# 定义表头
headers = [
"到账金额", "手续费", "订单号", "应收账款", "金蝶名称",
"摘要", "借/贷", "科目代码(*)", "科目名称(*)",
"核算项目", "币别", "汇率", "原币金额", "金额"
]
# 写入表头
for col_idx, header in enumerate(headers, start=1):
cell = ws.cell(row=1, column=col_idx, value=header)
cell.font = Font(bold=True)
cell.fill = PatternFill(start_color="CCE5FF", end_color="CCE5FF", fill_type="solid")
cell.alignment = Alignment(horizontal="center", vertical="center")
# 写入数据
error_fill = PatternFill(start_color="FAD1D4", end_color="FAD1D4", fill_type="solid")
for row_idx, entry in enumerate(entries, start=2):
check_res = entry.get("_check_res", True)
# 写入数据
ws.cell(row=row_idx, column=1, value=entry.get("到账金额", ""))
ws.cell(row=row_idx, column=2, value=entry.get("手续费", ""))
ws.cell(row=row_idx, column=3, value=entry.get("订单号", ""))
ws.cell(row=row_idx, column=4, value=entry.get("应收账款", ""))
ws.cell(row=row_idx, column=5, value=entry.get("金蝶名称", ""))
ws.cell(row=row_idx, column=6, value=entry.get("摘要", ""))
ws.cell(row=row_idx, column=7, value=entry.get("借/贷", ""))
ws.cell(row=row_idx, column=8, value=entry.get("科目代码(*)", ""))
ws.cell(row=row_idx, column=9, value=entry.get("科目名称(*)", ""))
ws.cell(row=row_idx, column=10, value=entry.get("核算项目", ""))
ws.cell(row=row_idx, column=11, value=entry.get("币别", ""))
ws.cell(row=row_idx, column=12, value=entry.get("汇率", ""))
ws.cell(row=row_idx, column=13, value=entry.get("原币金额", ""))
ws.cell(row=row_idx, column=14, value=entry.get("金额", ""))
# 先设置所有背景颜色(在合并单元格之前)
for row_idx, entry in enumerate(entries, start=2):
check_res = entry.get("_check_res", True)
if not check_res:
for col_idx in range(1, 15):
ws.cell(row=row_idx, column=col_idx).fill = error_fill
# 合并同一ReceivedAmount的"到账金额"和"手续费"单元格
merge_groups = {} # {(received_amount, handling_fee): [row_start, row_end]}
for row_idx, entry in enumerate(entries, start=2):
received_amount = entry.get("到账金额", "")
handling_fee = entry.get("手续费", "")
key = (received_amount, handling_fee)
if key not in merge_groups:
merge_groups[key] = [row_idx, row_idx]
else:
# 检查是否连续
if row_idx == merge_groups[key][1] + 1:
merge_groups[key][1] = row_idx
else:
# 不连续,创建新组
merge_groups[f"{key}_{row_idx}"] = [row_idx, row_idx]
# 执行合并
for key, (start_row, end_row) in merge_groups.items():
if start_row < end_row: # 只有多于1行时才合并
# 合并"到账金额"列(A列)
ws.merge_cells(f'A{start_row}:A{end_row}')
ws.cell(start_row, 1).alignment = Alignment(horizontal="center", vertical="center")
# 合并"手续费"列(B列)
ws.merge_cells(f'B{start_row}:B{end_row}')
ws.cell(start_row, 2).alignment = Alignment(horizontal="center", vertical="center")
# 合并后重新应用背景颜色(确保合并单元格也有背景色)
for row_idx, entry in enumerate(entries, start=2):
check_res = entry.get("_check_res", True)
if not check_res:
for col_idx in range(1, 15):
ws.cell(row=row_idx, column=col_idx).fill = error_fill
# 调整列宽
column_widths = [12, 10, 18, 12, 25, 25, 8, 15, 25, 25, 10, 10, 12, 15]
for col_idx, width in enumerate(column_widths, start=1):
ws.column_dimensions[chr(64 + col_idx)].width = width
# 保存文件
wb.save(output_file)
print(f"\n会计分录已保存到: {output_file}")
print(f"总共生成 {len(entries)} 条会计分录")
def main():
"""主函数"""
input_file = 'res.json'
output_file = 'AccountingEntries.xlsx'
print("开始生成会计分录...")
print(f"读取文件: {input_file}")
try:
# 加载汇率
exchange_rate = load_exchange_rate()
# 读取JSON数据
with open(input_file, 'r', encoding='utf-8') as f:
data = json.load(f)
print(f"读取了 {len(data)} 条财务记录")
print(f"使用汇率: {exchange_rate}")
# 生成会计分录
entries = create_accounting_entries(data, exchange_rate)
# 保存到Excel
save_to_excel(entries, output_file)
# 统计信息
debit_count = sum(1 for e in entries if e["借/贷"] == "")
credit_count = sum(1 for e in entries if e["借/贷"] == "")
print(f"\n统计:")
print(f" 借方记录: {debit_count}")
print(f" 贷方记录: {credit_count}")
print("\n处理完成!")
except Exception as e:
print(f"\n错误: {e}")
import traceback
traceback.print_exc()
if __name__ == '__main__':
main()