#!/usr/bin/env python3
"""
Clash node subscription merger.

Fetches multiple Clash subscription links and merges them into a single
unified configuration file.
"""

import json
import base64
import os
import re
import yaml
import requests
from pathlib import Path
from typing import Dict, List, Any, Set, Optional


class ClashSubscriptionMerger:
    """Merges multiple Clash subscription feeds into one unified config file."""

    def __init__(self, subscribe_file: str = "subscribe.json", template_file: str = "temp.yaml", replacement_file: str = "Replacement.txt"):
        """Set up file paths, filter rules and name-extraction patterns.

        Args:
            subscribe_file: JSON file listing subscription groups and URLs.
            template_file: Base Clash YAML template the nodes are merged into.
            replacement_file: Text file holding the target path used by the
                "file replacement" output mode.
        """
        self.subscribe_file = subscribe_file
        self.template_file = template_file
        self.replacement_file = replacement_file
        self.temp_dir = Path("temp")
        self.temp_dir.mkdir(exist_ok=True)

        # Keywords marking informational (non-proxy) entries to drop.
        # (Literally: "remaining traffic", "plan expires", "not expired".)
        self.filter_keywords = ["剩余流量", "套餐到期", "未到期"]

        # Region -> name keywords used to classify nodes.
        # Keys (runtime values, do not translate): 香港 = Hong Kong, 新加坡 = Singapore.
        self.region_keywords = {
            "香港": ["HKG", "🇭🇰", "HongKong", "Hong Kong", "HK"],
            "新加坡": ["Singapore", "🇸🇬", "SGP", "SG"],
            "USA": ["USA", "🇺🇸", "United States", "US", "America"]
        }

        # All node names handed out so far; used for collision detection.
        self.used_names: Set[str] = set()

        # Regexes extracting a remaining-traffic figure from node names
        # (match both half-width ':' and full-width ':' separators).
        self.traffic_patterns = [
            r'剩余流量[::]\s*([0-9.]+\s*[KMGT]?B)',
            r'流量.*?[::]\s*([0-9.]+\s*[KMGT]?B)',
            r'总流量[::]\s*([0-9.]+\s*[KMGT]?B)',
        ]

        # Regexes extracting the remaining-days count from node names.
        self.days_patterns = [
            r'距离下次重置.*?[::]\s*([0-9]+)\s*天',
            r'剩余.*?[::]\s*([0-9]+)\s*天',
        ]
def load_subscriptions(self) -> List[Dict[str, str]]:
|
||
"""加载订阅配置文件"""
|
||
try:
|
||
with open(self.subscribe_file, 'r', encoding='utf-8') as f:
|
||
return json.load(f)
|
||
except Exception as e:
|
||
print(f"加载订阅配置文件失败: {e}")
|
||
return []
|
||
|
||
def fetch_subscription(self, url: str) -> str:
|
||
"""获取订阅内容并解码Base64"""
|
||
try:
|
||
headers = {
|
||
'User-Agent': 'ClashX/1.118.0 (com.west2online.ClashX; build:1.118.0; iOS 16.0.0) Alamofire/5.6.4'
|
||
}
|
||
response = requests.get(url, headers=headers, timeout=30)
|
||
response.raise_for_status()
|
||
|
||
# 尝试多种解码方式
|
||
content = response.content
|
||
|
||
# 先尝试直接当作文本处理
|
||
try:
|
||
text_content = content.decode('utf-8')
|
||
# 检查是否已经是YAML格式
|
||
if text_content.strip().startswith(('mixed-port:', 'port:', 'socks-port:', 'redir-port:')):
|
||
return text_content
|
||
except:
|
||
pass
|
||
|
||
# 尝试Base64解码
|
||
try:
|
||
# 去除可能的换行符和空格
|
||
text_content = content.decode('utf-8').strip()
|
||
decoded_content = base64.b64decode(text_content).decode('utf-8')
|
||
return decoded_content
|
||
except:
|
||
pass
|
||
|
||
# 尝试其他编码
|
||
for encoding in ['utf-8', 'gbk', 'gb2312']:
|
||
try:
|
||
text_content = content.decode(encoding).strip()
|
||
decoded_content = base64.b64decode(text_content).decode('utf-8')
|
||
return decoded_content
|
||
except:
|
||
continue
|
||
|
||
return ""
|
||
|
||
except Exception as e:
|
||
print(f"获取订阅失败 {url}: {e}")
|
||
return ""
|
||
|
||
def save_temp_yaml(self, group_name: str, content: str) -> bool:
|
||
"""保存临时YAML文件"""
|
||
try:
|
||
temp_file = self.temp_dir / f"{group_name}.yaml"
|
||
with open(temp_file, 'w', encoding='utf-8') as f:
|
||
f.write(content)
|
||
print(f"已保存临时文件: {temp_file}")
|
||
return True
|
||
except Exception as e:
|
||
print(f"保存临时文件失败 {group_name}: {e}")
|
||
return False
|
||
|
||
def load_template(self) -> Dict[str, Any]:
|
||
"""加载模板文件"""
|
||
try:
|
||
with open(self.template_file, 'r', encoding='utf-8') as f:
|
||
return yaml.safe_load(f)
|
||
except Exception as e:
|
||
print(f"加载模板文件失败: {e}")
|
||
return {}
|
||
|
||
def should_filter_proxy(self, proxy_name: str) -> bool:
|
||
"""检查是否应该过滤该代理节点"""
|
||
for keyword in self.filter_keywords:
|
||
if keyword in proxy_name:
|
||
return True
|
||
return False
|
||
|
||
def should_filter_proxy_group_node(self, node_name: str) -> bool:
|
||
"""检查proxy-groups中的节点是否应该过滤"""
|
||
filter_prefixes = ['Auto', '剩余流量', '套餐到期', '未到期']
|
||
for prefix in filter_prefixes:
|
||
if node_name.startswith(prefix):
|
||
return True
|
||
return False
|
||
|
||
def get_unique_name(self, original_name: str) -> str:
|
||
"""处理名称冲突,返回唯一名称"""
|
||
if original_name not in self.used_names:
|
||
self.used_names.add(original_name)
|
||
return original_name
|
||
|
||
counter = 1
|
||
while True:
|
||
new_name = f"{original_name}_{counter}"
|
||
if new_name not in self.used_names:
|
||
self.used_names.add(new_name)
|
||
return new_name
|
||
counter += 1
|
||
|
||
def categorize_proxy_by_region(self, proxy_name: str) -> str:
|
||
"""根据节点名称判断所属地区"""
|
||
proxy_name_lower = proxy_name.lower()
|
||
|
||
for region, keywords in self.region_keywords.items():
|
||
for keyword in keywords:
|
||
if keyword.lower() in proxy_name_lower or keyword in proxy_name:
|
||
return region
|
||
|
||
return None
|
||
|
||
def extract_traffic_info(self, proxy_names: List[str]) -> Optional[str]:
|
||
"""从节点名称列表中提取流量信息"""
|
||
for name in proxy_names:
|
||
for pattern in self.traffic_patterns:
|
||
match = re.search(pattern, name)
|
||
if match:
|
||
return match.group(1).strip()
|
||
return None
|
||
|
||
def extract_days_info(self, proxy_names: List[str]) -> Optional[int]:
|
||
"""从节点名称列表中提取剩余天数信息"""
|
||
for name in proxy_names:
|
||
for pattern in self.days_patterns:
|
||
match = re.search(pattern, name)
|
||
if match:
|
||
return int(match.group(1))
|
||
return None
|
||
|
||
def convert_traffic_to_gb(self, traffic_str: str) -> float:
|
||
"""将流量字符串转换为GB数值"""
|
||
try:
|
||
# 移除空格并转换为大写
|
||
traffic_str = traffic_str.replace(" ", "").upper()
|
||
|
||
# 提取数字部分
|
||
import re
|
||
match = re.match(r'([0-9.]+)([KMGT]?B?)', traffic_str)
|
||
if not match:
|
||
return 0.0
|
||
|
||
value = float(match.group(1))
|
||
unit = match.group(2).replace('B', '') if match.group(2) else ''
|
||
|
||
# 转换为GB
|
||
if unit == 'T' or unit == 'TB':
|
||
return value * 1024
|
||
elif unit == 'G' or unit == 'GB' or unit == '':
|
||
return value
|
||
elif unit == 'M' or unit == 'MB':
|
||
return value / 1024
|
||
elif unit == 'K' or unit == 'KB':
|
||
return value / (1024 * 1024)
|
||
else:
|
||
return value # 默认当作GB处理
|
||
|
||
except Exception as e:
|
||
print(f"转换流量失败 '{traffic_str}': {e}")
|
||
return 0.0
|
||
|
||
def calculate_daily_average(self, traffic_gb: float, days: int) -> float:
|
||
"""计算平均每天剩余流量(GB)"""
|
||
if days <= 0:
|
||
return 0.0
|
||
return traffic_gb / days
|
||
|
||
def read_replacement_target(self) -> Optional[str]:
|
||
"""读取Replacement.txt文件获取目标文件路径"""
|
||
try:
|
||
if not os.path.exists(self.replacement_file):
|
||
print(f"Replacement.txt文件不存在,跳过文件替换")
|
||
return None
|
||
|
||
with open(self.replacement_file, 'r', encoding='utf-8') as f:
|
||
target_path = f.read().strip()
|
||
|
||
if not target_path:
|
||
print(f"Replacement.txt文件为空,跳过文件替换")
|
||
return None
|
||
|
||
print(f"读取到目标文件路径: {target_path}")
|
||
return target_path
|
||
|
||
except Exception as e:
|
||
print(f"读取Replacement.txt文件失败: {e}")
|
||
return None
|
||
|
||
def replace_target_file(self, target_path: str) -> bool:
|
||
"""使用merge.yaml内容替换目标文件"""
|
||
try:
|
||
# 检查目标文件是否存在
|
||
if not os.path.exists(target_path):
|
||
print(f"目标文件不存在: {target_path}")
|
||
return False
|
||
|
||
# 检查merge.yaml是否存在
|
||
merge_file = "merge.yaml"
|
||
if not os.path.exists(merge_file):
|
||
print(f"merge.yaml文件不存在")
|
||
return False
|
||
|
||
# 读取merge.yaml内容
|
||
with open(merge_file, 'r', encoding='utf-8') as f:
|
||
merge_content = f.read()
|
||
|
||
# 替换目标文件内容
|
||
with open(target_path, 'w', encoding='utf-8') as f:
|
||
f.write(merge_content)
|
||
|
||
print(f"成功将merge.yaml内容替换到: {target_path}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"替换目标文件失败: {e}")
|
||
return False
|
||
|
||
def generate_base64_subscription(self) -> bool:
|
||
"""生成Base64编码的订阅内容并保存到res.txt"""
|
||
try:
|
||
# 检查merge.yaml是否存在
|
||
merge_file = "merge.yaml"
|
||
if not os.path.exists(merge_file):
|
||
print(f"merge.yaml文件不存在")
|
||
return False
|
||
|
||
# 读取merge.yaml内容
|
||
with open(merge_file, 'r', encoding='utf-8') as f:
|
||
merge_content = f.read()
|
||
|
||
# 编码为Base64
|
||
encoded_content = base64.b64encode(merge_content.encode('utf-8')).decode('utf-8')
|
||
|
||
# 保存到res.txt
|
||
with open('res.txt', 'w', encoding='utf-8') as f:
|
||
f.write(encoded_content)
|
||
|
||
print(f"已生成Base64编码订阅文件: res.txt")
|
||
print(f"编码长度: {len(encoded_content)} 字符")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"生成Base64订阅失败: {e}")
|
||
return False
|
||
|
||
def get_user_choice(self) -> str:
|
||
"""获取用户选择:文件替换或Base64编码"""
|
||
while True:
|
||
print("\n请选择输出方式:")
|
||
print("1. 文件替换模式 (使用Replacement.txt中的路径)")
|
||
print("2. Base64编码模式 (生成res.txt订阅文件)")
|
||
|
||
choice = input("请输入选择 (1 或 2): ").strip()
|
||
|
||
if choice == '1':
|
||
return 'replace'
|
||
elif choice == '2':
|
||
return 'base64'
|
||
else:
|
||
print("无效选择,请输入 1 或 2")
|
||
continue
|
||
|
||
    def process_temp_files(self) -> Dict[str, Any]:
        """Parse every cached temp/*.yaml feed and merge their node data.

        Returns a dict with:
            proxies: all kept proxy entries (names de-duplicated);
            proxy_groups: the first proxy-group of each feed, filtered;
            region_nodes: node names bucketed by region (香港/新加坡/USA);
            group_traffic_info / group_days_info / group_daily_average:
                per-feed quota metadata extracted from node names.
        """
        merged_data = {
            'proxies': [],
            'proxy_groups': [],
            'region_nodes': {
                '香港': [],
                '新加坡': [],
                'USA': []
            },
            'group_traffic_info': {},   # per-group remaining-traffic string
            'group_days_info': {},      # per-group remaining days
            'group_daily_average': {}   # per-group average GB/day remaining
        }

        temp_files = list(self.temp_dir.glob("*.yaml"))

        for temp_file in temp_files:
            try:
                with open(temp_file, 'r', encoding='utf-8') as f:
                    data = yaml.safe_load(f)

                if not data:
                    continue

                print(f"处理文件: {temp_file}")
                # The file stem (name without extension) is the group name.
                group_name = temp_file.stem

                # Collect every node name; quota info is embedded in them.
                all_proxy_names = []
                if 'proxies' in data and data['proxies']:
                    all_proxy_names = [proxy.get('name', '') for proxy in data['proxies']]

                # Extract the remaining-traffic figure, if any.
                traffic_info = self.extract_traffic_info(all_proxy_names)
                if traffic_info:
                    merged_data['group_traffic_info'][group_name] = traffic_info
                    print(f"提取到流量信息: {group_name} -> {traffic_info}")

                # Extract the remaining-days count, if any.
                days_info = self.extract_days_info(all_proxy_names)
                if days_info:
                    merged_data['group_days_info'][group_name] = days_info
                    print(f"提取到剩余天数: {group_name} -> {days_info}天")

                # With both figures, compute average remaining GB per day.
                if traffic_info and days_info:
                    traffic_gb = self.convert_traffic_to_gb(traffic_info)
                    daily_average = self.calculate_daily_average(traffic_gb, days_info)
                    merged_data['group_daily_average'][group_name] = daily_average
                    print(f"平均每天剩余流量: {group_name} -> {daily_average:.2f}GB/天")

                # Merge the proxy entries themselves.
                if 'proxies' in data and data['proxies']:
                    for proxy in data['proxies']:
                        if 'name' in proxy:
                            # Drop informational pseudo-nodes.
                            if self.should_filter_proxy(proxy['name']):
                                continue

                            # Resolve cross-feed name collisions.
                            unique_name = self.get_unique_name(proxy['name'])
                            proxy['name'] = unique_name

                            merged_data['proxies'].append(proxy)

                            # Bucket the node by region, when recognizable.
                            region = self.categorize_proxy_by_region(unique_name)
                            if region:
                                merged_data['region_nodes'][region].append(unique_name)

                # The first proxy-group conventionally lists all of the
                # feed's nodes.  NOTE(review): assumes that convention
                # holds for every provider — confirm against real feeds.
                if 'proxy-groups' in data and data['proxy-groups']:
                    first_group = data['proxy-groups'][0].copy()

                    if 'proxies' in first_group:
                        # Keep only members that survived filtering and
                        # actually exist among the merged proxies.
                        filtered_proxies = []
                        for proxy_name in first_group['proxies']:
                            if not self.should_filter_proxy_group_node(proxy_name):
                                # Renamed (collision-suffixed) nodes keep their
                                # original name here, so this check can drop them.
                                if proxy_name in self.used_names:
                                    filtered_proxies.append(proxy_name)

                        first_group['proxies'] = filtered_proxies

                        # Only keep the group when it still has members.
                        if filtered_proxies:
                            merged_data['proxy_groups'].append(first_group)

            except Exception as e:
                print(f"处理文件 {temp_file} 失败: {e}")
                continue

        return merged_data
    def generate_merge_yaml(self, merged_data: Dict[str, Any]) -> bool:
        """Assemble the final merge.yaml from the template and merged data.

        Steps: replace the template's proxies; rename each subscription
        group to include its traffic figure; sort groups by average daily
        remaining traffic and prepend them to the first template group
        (assumed to be "AutoProxy"); refill the per-region Auto groups;
        dump everything to merge.yaml.

        Returns True on success, False on any failure (error printed).
        """
        try:
            # Load the base template; abort if it cannot be read.
            template = self.load_template()
            if not template:
                return False

            # Replace the template's node list wholesale.
            template['proxies'] = merged_data['proxies']

            if merged_data['proxy_groups']:
                existing_groups = template.get('proxy-groups', [])
                # NOTE(review): assumes the template's first group is the
                # "AutoProxy" selector — confirm against temp.yaml.
                auto_proxy_group = existing_groups[0].copy()
                new_groups = []

                # Name + quota info per new group, used for sorting below.
                group_info_list = []

                # Rename groups to embed their remaining-traffic figure.
                merged_groups = []
                for group in merged_data['proxy_groups']:
                    group_name = group.get('name', '')
                    if group_name in merged_data['group_traffic_info']:
                        traffic_info = merged_data['group_traffic_info'][group_name]
                        new_name = f"{group_name}({traffic_info})"
                        group['name'] = new_name

                        # Average GB/day drives the sort order (0.0 if unknown).
                        daily_average = merged_data['group_daily_average'].get(group_name, 0.0)
                        group_info_list.append({
                            'name': new_name,
                            'daily_average': daily_average,
                            'original_name': group_name
                        })
                        print(f"更新组名: {group_name} -> {new_name}")
                    else:
                        group_info_list.append({
                            'name': group_name,
                            'daily_average': 0.0,
                            'original_name': group_name
                        })
                    merged_groups.append(group)

                # Sort descending by average daily remaining traffic.
                group_info_list.sort(key=lambda x: x['daily_average'], reverse=True)
                sorted_group_names = [info['name'] for info in group_info_list]

                # Log the resulting order.
                print(f"按平均每天剩余流量排序:")
                for info in group_info_list:
                    print(f"  {info['name']}: {info['daily_average']:.2f}GB/天")

                # Prepend the sorted group names to the AutoProxy members.
                if 'proxies' in auto_proxy_group:
                    auto_proxy_group['proxies'] = sorted_group_names + auto_proxy_group['proxies']
                    print(f"已将排序后的组添加到AutoProxy组的头部")

                # Final order: AutoProxy, subscription groups, the rest.
                new_groups.append(auto_proxy_group)
                new_groups.extend(merged_groups)
                new_groups.extend(existing_groups[1:])

                template['proxy-groups'] = new_groups

            # Refill the per-region Auto groups with the bucketed nodes.
            for group in template['proxy-groups']:
                group_name = group.get('name', '')

                if group_name == 'Auto-香港':
                    group['proxies'] = merged_data['region_nodes']['香港']
                elif group_name == 'Auto-新加坡':
                    group['proxies'] = merged_data['region_nodes']['新加坡']
                elif group_name == 'Auto-USA':
                    group['proxies'] = merged_data['region_nodes']['USA']

            # Write the merged config, preserving key order and Unicode.
            with open('merge.yaml', 'w', encoding='utf-8') as f:
                yaml.dump(template, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

            print("已生成 merge.yaml 文件")
            return True

        except Exception as e:
            print(f"生成merge.yaml失败: {e}")
            return False
    def run(self):
        """Top-level workflow: fetch feeds, merge, then emit the output.

        Stages: load subscribe.json; download each feed into temp/
        (existing temp files are reused as a cache); merge all temp files;
        write merge.yaml; then ask the user whether to overwrite the
        Replacement.txt target or emit a Base64 res.txt subscription.
        """
        print("开始处理Clash订阅合并...")

        # 1. Load the subscription list.
        subscriptions = self.load_subscriptions()
        if not subscriptions:
            print("没有找到有效的订阅配置")
            return

        # 2. Fetch and cache each subscription's content.
        for subscription in subscriptions:
            group_name = subscription.get('group_name', '')
            url = subscription.get('url', '')

            if not group_name or not url:
                print(f"跳过无效订阅: {subscription}")
                continue

            # Reuse a cached temp file if one already exists.
            temp_file = self.temp_dir / f"{group_name}.yaml"
            if temp_file.exists():
                print(f"使用已存在的临时文件: {group_name}")
                continue

            print(f"正在处理订阅: {group_name}")
            content = self.fetch_subscription(url)

            if content:
                self.save_temp_yaml(group_name, content)
            else:
                print(f"获取订阅内容失败: {group_name}")

        # 3. Merge all cached feeds.
        merged_data = self.process_temp_files()

        # 4. Produce the final merged config.
        success = self.generate_merge_yaml(merged_data)

        if success:
            print(f"\n合并完成!")
            print(f"总共合并了 {len(merged_data['proxies'])} 个节点")
            print(f"香港节点: {len(merged_data['region_nodes']['香港'])} 个")
            print(f"新加坡节点: {len(merged_data['region_nodes']['新加坡'])} 个")
            print(f"美国节点: {len(merged_data['region_nodes']['USA'])} 个")
            print(f"其他节点组: {len(merged_data['proxy_groups'])} 个")

            # Ask which output mode to use.
            choice = self.get_user_choice()

            if choice == 'replace':
                # Overwrite the file named in Replacement.txt.
                print(f"\n检查文件替换配置...")
                target_path = self.read_replacement_target()
                if target_path:
                    replacement_success = self.replace_target_file(target_path)
                    if replacement_success:
                        print(f"文件替换成功!")
                    else:
                        print(f"文件替换失败!")
                else:
                    print(f"未配置文件替换或配置无效")

            elif choice == 'base64':
                # Emit a Base64-encoded subscription file (res.txt).
                print(f"\n生成Base64编码订阅...")
                base64_success = self.generate_base64_subscription()
                if base64_success:
                    print(f"Base64编码订阅生成成功!")
                    print(f"\n订阅链接配置建议:")
                    print(f"1. 将res.txt文件放置到Web服务器目录")
                    print(f"2. 配置Nginx提供订阅服务")
                    print(f"3. 使用本地订阅链接: http://your-domain/subscription")
                else:
                    print(f"Base64编码订阅生成失败!")
        else:
            print("合并失败!")
# Script entry point: build a merger with default file names and run it.
if __name__ == "__main__":
    ClashSubscriptionMerger().run()