# Source: ClashSubscribeMerge/merger.py (snapshot 2025-07-23 19:31:36 +08:00, 584 lines, 24 KiB)
# NOTE: this header replaces non-Python residue left over from a web file-viewer export.
#!/usr/bin/env python3
"""
Clash 节点订阅合并处理脚本
处理多个Clash订阅链接将它们合并成一个统一的配置文件
"""
import json
import base64
import os
import re
import yaml
import requests
from pathlib import Path
from typing import Dict, List, Any, Set, Optional
class ClashSubscriptionMerger:
    """Merge multiple Clash subscription links into one unified config.

    Workflow (see :meth:`run`):
      1. Read subscription entries (group name + URL) from ``subscribe.json``.
      2. Download each subscription (plain YAML or Base64-encoded) into ``temp/``.
      3. Merge all proxies / proxy-groups on top of the ``temp.yaml`` template.
      4. Write ``merge.yaml``, then either overwrite a target file
         (path read from ``Replacement.txt``) or emit a Base64 subscription
         (``res.txt``), chosen interactively.
    """

    def __init__(self, subscribe_file: str = "subscribe.json", template_file: str = "temp.yaml", replacement_file: str = "Replacement.txt"):
        self.subscribe_file = subscribe_file
        self.template_file = template_file
        self.replacement_file = replacement_file
        # Per-subscription downloads are cached here as <group_name>.yaml.
        self.temp_dir = Path("temp")
        self.temp_dir.mkdir(exist_ok=True)
        # Proxy names containing any of these keywords are dropped
        # (they are informational entries, not real nodes).
        self.filter_keywords = ["剩余流量", "套餐到期", "未到期"]
        # Region -> name keywords used to bucket nodes into regional groups.
        self.region_keywords = {
            "香港": ["HKG", "🇭🇰", "HongKong", "Hong Kong", "HK"],
            "新加坡": ["Singapore", "🇸🇬", "SGP", "SG"],
            "USA": ["USA", "🇺🇸", "United States", "US", "America"]
        }
        # All node names seen so far, for cross-subscription collision handling.
        self.used_names: Set[str] = set()
        # Regexes that pull a remaining-traffic figure out of a node name.
        self.traffic_patterns = [
            r'剩余流量[:]\s*([0-9.]+\s*[KMGT]?B)',
            r'流量.*?[:]\s*([0-9.]+\s*[KMGT]?B)',
            r'总流量[:]\s*([0-9.]+\s*[KMGT]?B)',
        ]
        # Regexes that pull a days-until-reset figure out of a node name.
        self.days_patterns = [
            r'距离下次重置.*?[:]\s*([0-9]+)\s*天',
            r'剩余.*?[:]\s*([0-9]+)\s*天',
        ]

    def load_subscriptions(self) -> List[Dict[str, str]]:
        """Load the subscription list from ``self.subscribe_file``.

        Returns a list of {"group_name": ..., "url": ...} dicts, or []
        on any read/parse failure (best-effort, error printed).
        """
        try:
            with open(self.subscribe_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            print(f"加载订阅配置文件失败: {e}")
            return []

    def fetch_subscription(self, url: str) -> str:
        """Download a subscription URL and return its decoded text.

        Handles three wire formats, in order:
          1. plain Clash YAML (detected by a leading port directive),
          2. Base64-encoded YAML in utf-8,
          3. Base64-encoded YAML in gbk / gb2312.
        Returns "" when nothing decodes (or on network error).
        """
        try:
            headers = {
                # Mimic a Clash client so providers return Clash-format configs.
                'User-Agent': 'ClashX/1.118.0 (com.west2online.ClashX; build:1.118.0; iOS 16.0.0) Alamofire/5.6.4'
            }
            response = requests.get(url, headers=headers, timeout=30)
            response.raise_for_status()
            content = response.content
            # 1) Already plain YAML?
            try:
                text_content = content.decode('utf-8')
                if text_content.strip().startswith(('mixed-port:', 'port:', 'socks-port:', 'redir-port:')):
                    return text_content
            except UnicodeDecodeError:
                pass
            # 2)/3) Base64 payload in one of the candidate encodings.
            for encoding in ('utf-8', 'gbk', 'gb2312'):
                try:
                    text_content = content.decode(encoding).strip()
                    return base64.b64decode(text_content).decode('utf-8')
                except Exception:
                    continue
            return ""
        except Exception as e:
            print(f"获取订阅失败 {url}: {e}")
            return ""

    def save_temp_yaml(self, group_name: str, content: str) -> bool:
        """Write raw subscription text to ``temp/<group_name>.yaml``."""
        try:
            temp_file = self.temp_dir / f"{group_name}.yaml"
            with open(temp_file, 'w', encoding='utf-8') as f:
                f.write(content)
            print(f"已保存临时文件: {temp_file}")
            return True
        except Exception as e:
            print(f"保存临时文件失败 {group_name}: {e}")
            return False

    def load_template(self) -> Dict[str, Any]:
        """Load the base template YAML; {} on failure."""
        try:
            with open(self.template_file, 'r', encoding='utf-8') as f:
                return yaml.safe_load(f)
        except Exception as e:
            print(f"加载模板文件失败: {e}")
            return {}

    def should_filter_proxy(self, proxy_name: str) -> bool:
        """True if the proxy name contains any informational keyword."""
        return any(keyword in proxy_name for keyword in self.filter_keywords)

    def should_filter_proxy_group_node(self, node_name: str) -> bool:
        """True if a proxy-group member should be dropped.

        Filters the provider's own "Auto" selector plus informational
        pseudo-nodes (remaining traffic / expiry entries).
        """
        filter_prefixes = ('Auto', '剩余流量', '套餐到期', '未到期')
        return node_name.startswith(filter_prefixes)

    def get_unique_name(self, original_name: str) -> str:
        """Return a name not yet in ``self.used_names``, registering it.

        Collisions get a ``_1``, ``_2``, ... suffix.
        """
        if original_name not in self.used_names:
            self.used_names.add(original_name)
            return original_name
        counter = 1
        while True:
            new_name = f"{original_name}_{counter}"
            if new_name not in self.used_names:
                self.used_names.add(new_name)
                return new_name
            counter += 1

    def categorize_proxy_by_region(self, proxy_name: str) -> Optional[str]:
        """Map a node name to a region key, or None when no keyword matches."""
        # NOTE(review): short keywords like "US"/"SG" are matched
        # case-insensitively as substrings, which can over-match
        # (e.g. "us" inside unrelated words) — kept for compatibility.
        proxy_name_lower = proxy_name.lower()
        for region, keywords in self.region_keywords.items():
            for keyword in keywords:
                if keyword.lower() in proxy_name_lower or keyword in proxy_name:
                    return region
        return None

    def extract_traffic_info(self, proxy_names: List[str]) -> Optional[str]:
        """Return the first remaining-traffic string found in the names."""
        for name in proxy_names:
            for pattern in self.traffic_patterns:
                match = re.search(pattern, name)
                if match:
                    return match.group(1).strip()
        return None

    def extract_days_info(self, proxy_names: List[str]) -> Optional[int]:
        """Return the first days-until-reset number found in the names."""
        for name in proxy_names:
            for pattern in self.days_patterns:
                match = re.search(pattern, name)
                if match:
                    return int(match.group(1))
        return None

    def convert_traffic_to_gb(self, traffic_str: str) -> float:
        """Convert a traffic string like '512 MB' / '2TB' to a GB float.

        Unknown / unparseable input yields 0.0 (error printed).
        """
        try:
            traffic_str = traffic_str.replace(" ", "").upper()
            match = re.match(r'([0-9.]+)([KMGT]?B?)', traffic_str)
            if not match:
                return 0.0
            value = float(match.group(1))
            # After stripping the trailing 'B' the unit is one of
            # '', 'K', 'M', 'G', 'T'.
            unit = match.group(2).replace('B', '') if match.group(2) else ''
            if unit == 'T':
                return value * 1024
            elif unit in ('G', ''):
                # Bare numbers are treated as GB.
                return value
            elif unit == 'M':
                return value / 1024
            elif unit == 'K':
                return value / (1024 * 1024)
            else:
                return value  # unreachable in practice; defensive GB fallback
        except Exception as e:
            print(f"转换流量失败 '{traffic_str}': {e}")
            return 0.0

    def calculate_daily_average(self, traffic_gb: float, days: int) -> float:
        """Average remaining GB per day; 0.0 when days is non-positive."""
        if days <= 0:
            return 0.0
        return traffic_gb / days

    def read_replacement_target(self) -> Optional[str]:
        """Read the replacement target path from ``Replacement.txt``.

        Returns None (and prints why) when the file is missing/empty/unreadable.
        """
        try:
            if not os.path.exists(self.replacement_file):
                print("Replacement.txt文件不存在跳过文件替换")
                return None
            with open(self.replacement_file, 'r', encoding='utf-8') as f:
                target_path = f.read().strip()
            if not target_path:
                print("Replacement.txt文件为空跳过文件替换")
                return None
            print(f"读取到目标文件路径: {target_path}")
            return target_path
        except Exception as e:
            print(f"读取Replacement.txt文件失败: {e}")
            return None

    def replace_target_file(self, target_path: str) -> bool:
        """Overwrite ``target_path`` with the contents of ``merge.yaml``."""
        try:
            if not os.path.exists(target_path):
                print(f"目标文件不存在: {target_path}")
                return False
            merge_file = "merge.yaml"
            if not os.path.exists(merge_file):
                print("merge.yaml文件不存在")
                return False
            with open(merge_file, 'r', encoding='utf-8') as f:
                merge_content = f.read()
            with open(target_path, 'w', encoding='utf-8') as f:
                f.write(merge_content)
            print(f"成功将merge.yaml内容替换到: {target_path}")
            return True
        except Exception as e:
            print(f"替换目标文件失败: {e}")
            return False

    def generate_base64_subscription(self) -> bool:
        """Base64-encode ``merge.yaml`` and write the result to ``res.txt``."""
        try:
            merge_file = "merge.yaml"
            if not os.path.exists(merge_file):
                print("merge.yaml文件不存在")
                return False
            with open(merge_file, 'r', encoding='utf-8') as f:
                merge_content = f.read()
            encoded_content = base64.b64encode(merge_content.encode('utf-8')).decode('utf-8')
            with open('res.txt', 'w', encoding='utf-8') as f:
                f.write(encoded_content)
            print("已生成Base64编码订阅文件: res.txt")
            print(f"编码长度: {len(encoded_content)} 字符")
            return True
        except Exception as e:
            print(f"生成Base64订阅失败: {e}")
            return False

    def get_user_choice(self) -> str:
        """Interactively ask for the output mode: 'replace' or 'base64'."""
        while True:
            print("\n请选择输出方式:")
            print("1. 文件替换模式 (使用Replacement.txt中的路径)")
            print("2. Base64编码模式 (生成res.txt订阅文件)")
            choice = input("请输入选择 (1 或 2): ").strip()
            if choice == '1':
                return 'replace'
            elif choice == '2':
                return 'base64'
            print("无效选择,请输入 1 或 2")

    def process_temp_files(self) -> Dict[str, Any]:
        """Parse every cached ``temp/*.yaml`` and merge node information.

        Returns a dict with:
          - 'proxies': all kept proxy entries (names de-duplicated),
          - 'proxy_groups': first proxy-group of each subscription (filtered),
          - 'region_nodes': node names bucketed by region,
          - 'group_traffic_info' / 'group_days_info' / 'group_daily_average':
            per-subscription quota metadata extracted from node names.
        """
        merged_data = {
            'proxies': [],
            'proxy_groups': [],
            'region_nodes': {
                '香港': [],
                '新加坡': [],
                'USA': []
            },
            'group_traffic_info': {},   # group -> remaining traffic string
            'group_days_info': {},      # group -> days until reset
            'group_daily_average': {}   # group -> avg remaining GB/day
        }
        for temp_file in self.temp_dir.glob("*.yaml"):
            try:
                with open(temp_file, 'r', encoding='utf-8') as f:
                    data = yaml.safe_load(f)
                if not data:
                    continue
                print(f"处理文件: {temp_file}")
                group_name = temp_file.stem  # file name without extension
                # Collect all node names for quota-metadata extraction.
                all_proxy_names = []
                if 'proxies' in data and data['proxies']:
                    all_proxy_names = [proxy.get('name', '') for proxy in data['proxies']]
                traffic_info = self.extract_traffic_info(all_proxy_names)
                if traffic_info:
                    merged_data['group_traffic_info'][group_name] = traffic_info
                    print(f"提取到流量信息: {group_name} -> {traffic_info}")
                days_info = self.extract_days_info(all_proxy_names)
                if days_info:
                    merged_data['group_days_info'][group_name] = days_info
                    print(f"提取到剩余天数: {group_name} -> {days_info}")
                if traffic_info and days_info:
                    traffic_gb = self.convert_traffic_to_gb(traffic_info)
                    daily_average = self.calculate_daily_average(traffic_gb, days_info)
                    merged_data['group_daily_average'][group_name] = daily_average
                    print(f"平均每天剩余流量: {group_name} -> {daily_average:.2f}GB/天")
                # Merge proxies: drop informational entries, resolve name clashes,
                # and bucket the kept nodes by region.
                if 'proxies' in data and data['proxies']:
                    for proxy in data['proxies']:
                        if 'name' in proxy:
                            if self.should_filter_proxy(proxy['name']):
                                continue
                            unique_name = self.get_unique_name(proxy['name'])
                            proxy['name'] = unique_name
                            merged_data['proxies'].append(proxy)
                            region = self.categorize_proxy_by_region(unique_name)
                            if region:
                                merged_data['region_nodes'][region].append(unique_name)
                # Keep the subscription's first proxy-group (conventionally the
                # one listing all of that provider's nodes), filtered down to
                # members that survived the proxy merge above.
                if 'proxy-groups' in data and data['proxy-groups']:
                    first_group = data['proxy-groups'][0].copy()
                    if 'proxies' in first_group:
                        filtered_proxies = [
                            proxy_name
                            for proxy_name in first_group['proxies']
                            if not self.should_filter_proxy_group_node(proxy_name)
                            and proxy_name in self.used_names
                        ]
                        first_group['proxies'] = filtered_proxies
                        if filtered_proxies:  # only add groups with live members
                            merged_data['proxy_groups'].append(first_group)
            except Exception as e:
                print(f"处理文件 {temp_file} 失败: {e}")
                continue
        return merged_data

    def generate_merge_yaml(self, merged_data: Dict[str, Any]) -> bool:
        """Assemble the final ``merge.yaml`` from the template + merged data.

        Subscription groups are renamed to include their remaining-traffic
        figure, sorted by average remaining GB/day (descending), prepended
        to the template's first ("AutoProxy") group, and the regional
        Auto-* groups get their member lists replaced.
        """
        try:
            template = self.load_template()
            if not template:
                return False
            template['proxies'] = merged_data['proxies']
            existing_groups = template.get('proxy-groups', [])
            # Guard: without a template AutoProxy group there is nothing to
            # splice the subscription groups into (was an IndexError before).
            if merged_data['proxy_groups'] and existing_groups:
                auto_proxy_group = existing_groups[0].copy()  # AutoProxy copy
                new_groups = []
                group_info_list = []  # name + daily average, for sorting
                merged_groups = []
                for group in merged_data['proxy_groups']:
                    group_name = group.get('name', '')
                    if group_name in merged_data['group_traffic_info']:
                        # Embed the remaining-traffic figure in the group name.
                        traffic_info = merged_data['group_traffic_info'][group_name]
                        new_name = f"{group_name}({traffic_info})"
                        group['name'] = new_name
                        daily_average = merged_data['group_daily_average'].get(group_name, 0.0)
                        group_info_list.append({
                            'name': new_name,
                            'daily_average': daily_average,
                            'original_name': group_name
                        })
                        print(f"更新组名: {group_name} -> {new_name}")
                    else:
                        group_info_list.append({
                            'name': group_name,
                            'daily_average': 0.0,
                            'original_name': group_name
                        })
                    merged_groups.append(group)
                # Highest remaining GB/day first.
                group_info_list.sort(key=lambda x: x['daily_average'], reverse=True)
                sorted_group_names = [info['name'] for info in group_info_list]
                print("按平均每天剩余流量排序:")
                for info in group_info_list:
                    print(f" {info['name']}: {info['daily_average']:.2f}GB/天")
                if 'proxies' in auto_proxy_group:
                    # Prepend the sorted subscription groups to AutoProxy.
                    auto_proxy_group['proxies'] = sorted_group_names + auto_proxy_group['proxies']
                    print("已将排序后的组添加到AutoProxy组的头部")
                # Final order: AutoProxy, subscription groups, remaining template groups.
                new_groups.append(auto_proxy_group)
                new_groups.extend(merged_groups)
                new_groups.extend(existing_groups[1:])
                template['proxy-groups'] = new_groups
            # Fill the regional selector groups with the bucketed node names.
            for group in template['proxy-groups']:
                group_name = group.get('name', '')
                if group_name == 'Auto-香港':
                    group['proxies'] = merged_data['region_nodes']['香港']
                elif group_name == 'Auto-新加坡':
                    group['proxies'] = merged_data['region_nodes']['新加坡']
                elif group_name == 'Auto-USA':
                    group['proxies'] = merged_data['region_nodes']['USA']
            with open('merge.yaml', 'w', encoding='utf-8') as f:
                yaml.dump(template, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
            print("已生成 merge.yaml 文件")
            return True
        except Exception as e:
            print(f"生成merge.yaml失败: {e}")
            return False

    def run(self):
        """Drive the full merge pipeline end to end."""
        print("开始处理Clash订阅合并...")
        # 1. Load the subscription list.
        subscriptions = self.load_subscriptions()
        if not subscriptions:
            print("没有找到有效的订阅配置")
            return
        # 2. Download each subscription (cached downloads are reused).
        for subscription in subscriptions:
            group_name = subscription.get('group_name', '')
            url = subscription.get('url', '')
            if not group_name or not url:
                print(f"跳过无效订阅: {subscription}")
                continue
            temp_file = self.temp_dir / f"{group_name}.yaml"
            if temp_file.exists():
                print(f"使用已存在的临时文件: {group_name}")
                continue
            print(f"正在处理订阅: {group_name}")
            content = self.fetch_subscription(url)
            if content:
                self.save_temp_yaml(group_name, content)
            else:
                print(f"获取订阅内容失败: {group_name}")
        # 3. Parse and merge the cached files.
        merged_data = self.process_temp_files()
        # 4. Produce merge.yaml and the chosen output artifact.
        success = self.generate_merge_yaml(merged_data)
        if not success:
            print("合并失败!")
            return
        print("\n合并完成!")
        print(f"总共合并了 {len(merged_data['proxies'])} 个节点")
        print(f"香港节点: {len(merged_data['region_nodes']['香港'])}")
        print(f"新加坡节点: {len(merged_data['region_nodes']['新加坡'])}")
        print(f"美国节点: {len(merged_data['region_nodes']['USA'])}")
        print(f"其他节点组: {len(merged_data['proxy_groups'])}")
        choice = self.get_user_choice()
        if choice == 'replace':
            print("\n检查文件替换配置...")
            target_path = self.read_replacement_target()
            if target_path:
                if self.replace_target_file(target_path):
                    print("文件替换成功!")
                else:
                    print("文件替换失败!")
            else:
                print("未配置文件替换或配置无效")
        elif choice == 'base64':
            print("\n生成Base64编码订阅...")
            if self.generate_base64_subscription():
                print("Base64编码订阅生成成功!")
                print("\n订阅链接配置建议:")
                print("1. 将res.txt文件放置到Web服务器目录")
                print("2. 配置Nginx提供订阅服务")
                print("3. 使用本地订阅链接: http://your-domain/subscription")
            else:
                print("Base64编码订阅生成失败!")
# Script entry point: build a merger with default file paths and run the pipeline.
if __name__ == "__main__":
    ClashSubscriptionMerger().run()