332 lines
10 KiB
Python
332 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
财务Excel数据处理程序的单元测试
|
|
"""
|
|
|
|
import unittest
|
|
import json
|
|
import os
|
|
from unittest.mock import Mock, patch, MagicMock
|
|
from openpyxl import Workbook
|
|
from openpyxl.worksheet.worksheet import Worksheet
|
|
|
|
# Import the module under test. USE_OPTIMIZED records whether the import
# succeeded; the test cases below use it as their skip condition.
try:
    from process_excel_optimized import (
        Order, FinancialRecord, ProcessingStats,
        MergedCellCache, get_cell_value, validate_amount
    )
except ImportError:
    USE_OPTIMIZED = False
    print("警告: 无法导入优化版本,部分测试将被跳过")
else:
    USE_OPTIMIZED = True
|
|
|
|
|
|
@unittest.skipIf(not USE_OPTIMIZED, "需要优化版本")
class TestOrder(unittest.TestCase):
    """Unit tests for the ``Order`` data class."""

    @staticmethod
    def _build_order(order_num):
        """Return an ``Order`` with the given number and the fixed sample amount/account."""
        return Order(
            OrderNum=order_num,
            Amount=1000.50,
            AccountName="测试账户",
        )

    def test_order_creation(self):
        """Constructor arguments are readable back as attributes."""
        created = self._build_order("TEST001")

        self.assertEqual(created.OrderNum, "TEST001")
        self.assertEqual(created.Amount, 1000.50)
        self.assertEqual(created.AccountName, "测试账户")

    def test_order_to_dict(self):
        """``to_dict`` returns a dict carrying the three order fields."""
        as_dict = self._build_order("TEST001").to_dict()

        self.assertIsInstance(as_dict, dict)
        expected_items = (
            ("OrderNum", "TEST001"),
            ("Amount", 1000.50),
            ("AccountName", "测试账户"),
        )
        for key, expected in expected_items:
            self.assertEqual(as_dict[key], expected)

    def test_order_with_none_values(self):
        """An order may be created with ``OrderNum`` set to ``None``."""
        anonymous = self._build_order(None)

        self.assertIsNone(anonymous.OrderNum)
        self.assertEqual(anonymous.Amount, 1000.50)
|
|
|
|
|
|
@unittest.skipIf(not USE_OPTIMIZED, "需要优化版本")
class TestFinancialRecord(unittest.TestCase):
    """Unit tests for the ``FinancialRecord`` data class."""

    def test_record_creation(self):
        """Constructor arguments are readable back as attributes."""
        first = Order("TEST001", 1000.0, "账户1")
        second = Order("TEST002", 2000.0, "账户2")

        rec = FinancialRecord(
            ReceivedAmount=2975.0,
            HandlingFee=25.0,
            Order=[first, second],
            checkRes=True,
        )

        self.assertEqual(rec.ReceivedAmount, 2975.0)
        self.assertEqual(rec.HandlingFee, 25.0)
        self.assertEqual(len(rec.Order), 2)
        self.assertTrue(rec.checkRes)

    def test_record_to_dict(self):
        """``to_dict`` returns a dict whose ``Order`` entry is a list."""
        rec = FinancialRecord(
            ReceivedAmount=1000.0,
            HandlingFee=0.0,
            Order=[Order("TEST001", 1000.0, "账户1")],
            checkRes=True,
        )

        as_dict = rec.to_dict()

        self.assertIsInstance(as_dict, dict)
        self.assertEqual(as_dict["ReceivedAmount"], 1000.0)
        order_entries = as_dict["Order"]
        self.assertIsInstance(order_entries, list)
        self.assertEqual(len(order_entries), 1)
|
|
|
|
|
|
@unittest.skipIf(not USE_OPTIMIZED, "需要优化版本")
class TestProcessingStats(unittest.TestCase):
    """Unit tests for the ``ProcessingStats`` counters object."""

    def test_stats_initialization(self):
        """A freshly created stats object has every counter at zero."""
        fresh = ProcessingStats()

        counter_names = (
            "total_records",
            "valid_records",
            "invalid_records",
            "total_orders",
            "check_failed_records",
        )
        for counter in counter_names:
            self.assertEqual(getattr(fresh, counter), 0, counter)

    def test_stats_update(self):
        """Values assigned to the counters can be read back unchanged."""
        tracked = ProcessingStats()
        assigned = {
            "total_records": 10,
            "valid_records": 8,
            "invalid_records": 2,
            "total_orders": 25,
        }

        for counter, value in assigned.items():
            setattr(tracked, counter, value)

        for counter, value in assigned.items():
            self.assertEqual(getattr(tracked, counter), value, counter)
|
|
|
|
|
|
@unittest.skipIf(not USE_OPTIMIZED, "需要优化版本")
class TestMergedCellCache(unittest.TestCase):
    """Unit tests for the ``MergedCellCache`` lookup helper."""

    @staticmethod
    def _single_column_cache():
        """Build a cache from one mocked merged range: rows 2-4 of column 6."""
        merged = Mock(min_row=2, max_row=4, min_col=6, max_col=6)
        return MergedCellCache([merged])

    def test_cache_creation(self):
        """``is_merged`` is true inside the mocked range and false outside it."""
        cache = self._single_column_cache()

        # Every row of the merged range is reported as merged.
        for row in (2, 3, 4):
            self.assertTrue(cache.is_merged(row, 6))

        # One row below the range, and one column to its right.
        self.assertFalse(cache.is_merged(5, 6))
        self.assertFalse(cache.is_merged(2, 7))

    def test_cache_get_merged_range(self):
        """``get_merged_range`` returns a value inside the range, ``None`` outside."""
        cache = self._single_column_cache()

        # A cell inside the merged range.
        self.assertIsNotNone(cache.get_merged_range(3, 6))

        # A cell that belongs to no merged range.
        self.assertIsNone(cache.get_merged_range(10, 10))
|
|
|
|
|
|
@unittest.skipIf(not USE_OPTIMIZED, "需要优化版本")
class TestValidateAmount(unittest.TestCase):
    """Unit tests for the ``validate_amount`` function."""

    @staticmethod
    def _orders(*amounts):
        """Return one ``Order`` per amount, numbered TEST001, TEST002, ..."""
        return [
            Order(f"TEST{index:03d}", amount, f"账户{index}")
            for index, amount in enumerate(amounts, start=1)
        ]

    def test_validate_exact_match(self):
        """Order amounts summing exactly to the received amount validate."""
        outcome = validate_amount(1975.0, 0.0, self._orders(1000.0, 975.0))
        self.assertTrue(outcome)

    def test_validate_with_handling_fee(self):
        """Validation passes for 1975.0 received, 25.0 fee and a 2000.0 order."""
        outcome = validate_amount(1975.0, 25.0, self._orders(2000.0))
        self.assertTrue(outcome)

    def test_validate_within_tolerance(self):
        """A 0.005 difference still validates (original note: tolerance is 0.01)."""
        outcome = validate_amount(1000.0, 0.0, self._orders(1000.005))
        self.assertTrue(outcome)

    def test_validate_mismatch(self):
        """Order amounts that do not match the received amount fail validation."""
        outcome = validate_amount(1000.0, 0.0, self._orders(1000.0, 500.0))
        self.assertFalse(outcome)

    def test_validate_with_none_amount(self):
        """An order whose amount is ``None`` is expected to be ignored."""
        outcome = validate_amount(1000.0, 0.0, self._orders(1000.0, None))
        self.assertTrue(outcome)
|
|
|
|
|
|
class TestIntegration(unittest.TestCase):
    """Integration checks against the generated ``res.json`` output file."""

    # Keys that every top-level record and every nested order must contain.
    _RECORD_KEYS = ("ReceivedAmount", "HandlingFee", "Order", "checkRes")
    _ORDER_KEYS = ("OrderNum", "Amount", "AccountName")

    def test_json_output_format(self):
        """Validate the structure of every record stored in ``res.json``.

        The file is looked up relative to the current working directory and
        the test is skipped when it does not exist. The top-level value must
        be a list; each record must carry the required keys, its ``Order``
        value must be a list, and each order must carry the order keys.
        All records and orders are checked, not just the first of each, so a
        malformed entry later in the file is reported as well.
        """
        if not os.path.exists('res.json'):
            self.skipTest("res.json文件不存在")

        with open('res.json', 'r', encoding='utf-8') as f:
            data = json.load(f)

        self.assertIsInstance(data, list)

        for record_index, record in enumerate(data):
            # subTest lets the runner report each bad record separately
            # instead of stopping at the first failure.
            with self.subTest(record=record_index):
                for key in self._RECORD_KEYS:
                    self.assertIn(key, record)

                self.assertIsInstance(record["Order"], list)
                for order in record["Order"]:
                    for key in self._ORDER_KEYS:
                        self.assertIn(key, order)
|
|
|
|
|
|
class TestConfigIntegration(unittest.TestCase):
    """Integration checks for the configuration files on disk."""

    def test_config_file_exists(self):
        """``config.ini`` must exist relative to the current working directory."""
        config_present = os.path.exists('config.ini')
        self.assertTrue(config_present, "config.ini配置文件应该存在")

    def test_exchange_rate_file(self):
        """``exchange_rate.txt`` must hold a number greater than 0 and below 100.

        The test is skipped when the file does not exist.
        """
        if not os.path.exists('exchange_rate.txt'):
            self.skipTest("exchange_rate.txt文件不存在")

        with open('exchange_rate.txt', 'r', encoding='utf-8') as f:
            content = f.read().strip()

        # Only the conversion can raise ValueError, so it alone is guarded.
        try:
            rate = float(content)
        except ValueError:
            self.fail("汇率文件内容无法转换为数字")

        self.assertGreater(rate, 0)
        self.assertLess(rate, 100)
|
|
|
|
|
|
def run_tests():
    """Run the test case classes listed below and report overall success.

    The cases are loaded in the listed order into a single suite and run
    with a verbose text runner.

    Returns:
        bool: ``True`` when the run was successful, ``False`` otherwise.
    """
    case_classes = (
        TestOrder,
        TestFinancialRecord,
        TestProcessingStats,
        TestMergedCellCache,
        TestValidateAmount,
        TestIntegration,
        TestConfigIntegration,
    )

    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    for case_class in case_classes:
        suite.addTests(loader.loadTestsFromTestCase(case_class))

    outcome = unittest.TextTestRunner(verbosity=2).run(suite)
    return outcome.wasSuccessful()
|
|
|
|
|
|
if __name__ == '__main__':
    success = run_tests()
    # The bare ``exit`` helper is injected by the ``site`` module for
    # interactive use and is missing under ``python -S``; raising SystemExit
    # sets the same process status (0 on success, 1 on failure) without it.
    raise SystemExit(0 if success else 1)
|
|
|
|
|
|
|