#!/usr/bin/env python3
"""
Clash subscription merger.

Fetches multiple Clash subscription URLs and merges them into a single
unified configuration file.
"""
import json
import base64
import os
import re
import yaml
import requests
from pathlib import Path
from typing import Dict, List, Any, Set, Optional


class ClashSubscriptionMerger:
    def __init__(self, subscribe_file: str = "subscribe.json",
                 template_file: str = "temp.yaml",
                 replacement_file: str = "Replacement.txt"):
        self.subscribe_file = subscribe_file
        self.template_file = template_file
        self.replacement_file = replacement_file
        self.temp_dir = Path("temp")
        self.temp_dir.mkdir(exist_ok=True)

        # Keywords marking informational pseudo-nodes to filter out
        # ("remaining traffic", "plan expiry", "not expired").
        self.filter_keywords = ["剩余流量", "套餐到期", "未到期"]

        # Region keyword mapping (the keys are also used in group names).
        self.region_keywords = {
            "香港": ["HKG", "🇭🇰", "HongKong", "Hong Kong", "HK"],
            "新加坡": ["Singapore", "🇸🇬", "SGP", "SG"],
            "USA": ["USA", "🇺🇸", "United States", "US", "America"]
        }

        # All node names seen so far, used for conflict detection.
        self.used_names: Set[str] = set()

        # Regular expressions for extracting traffic information
        # (they match Chinese labels such as "剩余流量:23.5 GB").
        self.traffic_patterns = [
            r'剩余流量[::]\s*([0-9.]+\s*[KMGT]?B)',
            r'流量.*?[::]\s*([0-9.]+\s*[KMGT]?B)',
            r'总流量[::]\s*([0-9.]+\s*[KMGT]?B)',
        ]

        # Regular expressions for extracting the number of days remaining.
        self.days_patterns = [
            r'距离下次重置.*?[::]\s*([0-9]+)\s*天',
            r'剩余.*?[::]\s*([0-9]+)\s*天',
        ]

    def load_subscriptions(self) -> List[Dict[str, str]]:
        """Load the subscription config file."""
        try:
            with open(self.subscribe_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            print(f"Failed to load subscription config file: {e}")
            return []

    def fetch_subscription(self, url: str) -> str:
        """Fetch subscription content and decode it from Base64."""
        try:
            headers = {
                'User-Agent': 'ClashX/1.118.0 (com.west2online.ClashX; build:1.118.0; iOS 16.0.0) Alamofire/5.6.4'
            }
            response = requests.get(url, headers=headers, timeout=30)
            response.raise_for_status()

            # Try several decoding strategies.
            content = response.content

            # First try treating the body as plain text.
            try:
                text_content = content.decode('utf-8')
                # Check whether it is already YAML.
                if text_content.strip().startswith(('mixed-port:', 'port:', 'socks-port:', 'redir-port:')):
                    return text_content
            except UnicodeDecodeError:
                pass

            # Try Base64 decoding; pad to a multiple of 4 first, since some
            # providers return unpadded Base64.
            try:
                text_content = content.decode('utf-8').strip()
                text_content += '=' * (-len(text_content) % 4)
                decoded_content = base64.b64decode(text_content).decode('utf-8')
                return decoded_content
            except Exception:
                pass

            # Try other encodings.
            for encoding in ['utf-8', 'gbk', 'gb2312']:
                try:
                    text_content = content.decode(encoding).strip()
                    text_content += '=' * (-len(text_content) % 4)
                    decoded_content = base64.b64decode(text_content).decode('utf-8')
                    return decoded_content
                except Exception:
                    continue

            return ""
        except Exception as e:
            print(f"Failed to fetch subscription {url}: {e}")
            return ""

    def save_temp_yaml(self, group_name: str, content: str) -> bool:
        """Save a temporary YAML file."""
        try:
            temp_file = self.temp_dir / f"{group_name}.yaml"
            with open(temp_file, 'w', encoding='utf-8') as f:
                f.write(content)
            print(f"Saved temp file: {temp_file}")
            return True
        except Exception as e:
            print(f"Failed to save temp file {group_name}: {e}")
            return False

    def load_template(self) -> Dict[str, Any]:
        """Load the template file."""
        try:
            with open(self.template_file, 'r', encoding='utf-8') as f:
                return yaml.safe_load(f)
        except Exception as e:
            print(f"Failed to load template file: {e}")
            return {}

    def should_filter_proxy(self, proxy_name: str) -> bool:
        """Check whether a proxy node should be filtered out."""
        for keyword in self.filter_keywords:
            if keyword in proxy_name:
                return True
        return False

    def should_filter_proxy_group_node(self, node_name: str) -> bool:
        """Check whether a node referenced in proxy-groups should be filtered out."""
        filter_prefixes = ['Auto', '剩余流量', '套餐到期', '未到期']
        for prefix in filter_prefixes:
            if node_name.startswith(prefix):
                return True
        return False

    def get_unique_name(self, original_name: str) -> str:
        """Resolve name conflicts and return a unique name."""
        if original_name not in self.used_names:
            self.used_names.add(original_name)
            return original_name

        counter = 1
        while True:
            new_name = f"{original_name}_{counter}"
            if new_name not in self.used_names:
                self.used_names.add(new_name)
                return new_name
            counter += 1
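
    # Example behaviour of categorize_proxy_by_region below, using hypothetical
    # node names chosen only to illustrate the keyword table in __init__:
    #   "🇭🇰 HKG-IEPL-01" -> "香港"
    #   "SGP Premium 02"  -> "新加坡"
    #   "US-West | 1x"    -> "USA"
    #   "Tokyo 03"        -> None (no region keywords configured for Japan)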

    def categorize_proxy_by_region(self, proxy_name: str) -> Optional[str]:
        """Determine a node's region from its name."""
        proxy_name_lower = proxy_name.lower()

        for region, keywords in self.region_keywords.items():
            for keyword in keywords:
                if keyword.lower() in proxy_name_lower or keyword in proxy_name:
                    return region
        return None

    def extract_traffic_info(self, proxy_names: List[str]) -> Optional[str]:
        """Extract traffic information from a list of node names."""
        for name in proxy_names:
            for pattern in self.traffic_patterns:
                match = re.search(pattern, name)
                if match:
                    return match.group(1).strip()
        return None

    def extract_days_info(self, proxy_names: List[str]) -> Optional[int]:
        """Extract the number of days remaining from a list of node names."""
        for name in proxy_names:
            for pattern in self.days_patterns:
                match = re.search(pattern, name)
                if match:
                    return int(match.group(1))
        return None

    def convert_traffic_to_gb(self, traffic_str: str) -> float:
        """Convert a traffic string (e.g. "23.5 GB") to a value in GB."""
        try:
            # Remove spaces and normalize to upper case.
            traffic_str = traffic_str.replace(" ", "").upper()

            # Extract the numeric part and the unit.
            match = re.match(r'([0-9.]+)([KMGT]?B?)', traffic_str)
            if not match:
                return 0.0

            value = float(match.group(1))
            # Strip the trailing 'B', leaving only the scale letter (if any).
            unit = match.group(2).replace('B', '') if match.group(2) else ''

            # Convert to GB.
            if unit == 'T':
                return value * 1024
            elif unit in ('G', ''):
                return value
            elif unit == 'M':
                return value / 1024
            elif unit == 'K':
                return value / (1024 * 1024)
            else:
                return value  # Treat unknown units as GB.
        except Exception as e:
            print(f"Failed to convert traffic '{traffic_str}': {e}")
            return 0.0

    def calculate_daily_average(self, traffic_gb: float, days: int) -> float:
        """Compute the average remaining traffic per day (GB)."""
        if days <= 0:
            return 0.0
        return traffic_gb / days

    def read_replacement_target(self) -> Optional[str]:
        """Read Replacement.txt to get the target file path."""
        try:
            if not os.path.exists(self.replacement_file):
                print("Replacement.txt does not exist; skipping file replacement")
                return None

            with open(self.replacement_file, 'r', encoding='utf-8') as f:
                target_path = f.read().strip()

            if not target_path:
                print("Replacement.txt is empty; skipping file replacement")
                return None

            print(f"Target file path: {target_path}")
            return target_path
        except Exception as e:
            print(f"Failed to read Replacement.txt: {e}")
            return None

    def replace_target_file(self, target_path: str) -> bool:
        """Replace the target file's contents with merge.yaml."""
        try:
            # Check that the target file exists.
            if not os.path.exists(target_path):
                print(f"Target file does not exist: {target_path}")
                return False

            # Check that merge.yaml exists.
            merge_file = "merge.yaml"
            if not os.path.exists(merge_file):
                print("merge.yaml does not exist")
                return False

            # Read merge.yaml.
            with open(merge_file, 'r', encoding='utf-8') as f:
                merge_content = f.read()

            # Overwrite the target file.
            with open(target_path, 'w', encoding='utf-8') as f:
                f.write(merge_content)

            print(f"Replaced target file with merge.yaml: {target_path}")
            return True
        except Exception as e:
            print(f"Failed to replace target file: {e}")
            return False

    def generate_base64_subscription(self) -> bool:
        """Generate Base64-encoded subscription content and save it to res.txt."""
        try:
            # Check that merge.yaml exists.
            merge_file = "merge.yaml"
            if not os.path.exists(merge_file):
                print("merge.yaml does not exist")
                return False

            # Read merge.yaml.
            with open(merge_file, 'r', encoding='utf-8') as f:
                merge_content = f.read()

            # Encode as Base64.
            encoded_content = base64.b64encode(merge_content.encode('utf-8')).decode('utf-8')

            # Save to res.txt.
            with open('res.txt', 'w', encoding='utf-8') as f:
                f.write(encoded_content)

            print("Generated Base64-encoded subscription file: res.txt")
            print(f"Encoded length: {len(encoded_content)} characters")
            return True
        except Exception as e:
            print(f"Failed to generate Base64 subscription: {e}")
            return False

    def get_user_choice(self) -> str:
        """Ask the user to choose between file replacement and Base64 output."""
        while True:
            print("\nChoose an output mode:")
            print("1. File replacement (use the path in Replacement.txt)")
            print("2. Base64 encoding (generate a res.txt subscription file)")
            choice = input("Enter your choice (1 or 2): ").strip()
            if choice == '1':
                return 'replace'
            elif choice == '2':
                return 'base64'
            else:
                print("Invalid choice; please enter 1 or 2")

    def process_temp_files(self) -> Dict[str, Any]:
        """Process all temp files and merge node information."""
        merged_data = {
            'proxies': [],
            'proxy_groups': [],
            'region_nodes': {
                '香港': [],
                '新加坡': [],
                'USA': []
            },
            'group_traffic_info': {},   # Traffic info per group.
            'group_days_info': {},      # Days remaining per group.
            'group_daily_average': {}   # Average remaining traffic per day per group.
        }

        temp_files = list(self.temp_dir.glob("*.yaml"))

        for temp_file in temp_files:
            try:
                with open(temp_file, 'r', encoding='utf-8') as f:
                    data = yaml.safe_load(f)

                if not data:
                    continue

                print(f"Processing file: {temp_file}")
                group_name = temp_file.stem  # File name without extension.

                # Collect all node names for traffic-info extraction.
                all_proxy_names = []
                if 'proxies' in data and data['proxies']:
                    all_proxy_names = [proxy.get('name', '') for proxy in data['proxies']]

                # Extract traffic info.
                traffic_info = self.extract_traffic_info(all_proxy_names)
                if traffic_info:
                    merged_data['group_traffic_info'][group_name] = traffic_info
                    print(f"Extracted traffic info: {group_name} -> {traffic_info}")

                # Extract days remaining.
                days_info = self.extract_days_info(all_proxy_names)
                if days_info:
                    merged_data['group_days_info'][group_name] = days_info
                    print(f"Extracted days remaining: {group_name} -> {days_info} days")

                # Compute the average remaining traffic per day.
                if traffic_info and days_info:
                    traffic_gb = self.convert_traffic_to_gb(traffic_info)
                    daily_average = self.calculate_daily_average(traffic_gb, days_info)
                    merged_data['group_daily_average'][group_name] = daily_average
                    print(f"Average remaining traffic per day: {group_name} -> {daily_average:.2f} GB/day")

                # Process the proxies list.
                if 'proxies' in data and data['proxies']:
                    for proxy in data['proxies']:
                        if 'name' in proxy:
                            # Skip unwanted nodes.
                            if self.should_filter_proxy(proxy['name']):
                                continue

                            # Resolve name conflicts.
                            unique_name = self.get_unique_name(proxy['name'])
                            proxy['name'] = unique_name
                            merged_data['proxies'].append(proxy)

                            # Categorize the node by region.
                            region = self.categorize_proxy_by_region(unique_name)
                            if region:
                                merged_data['region_nodes'][region].append(unique_name)

                # Process proxy-groups (the first group usually contains all
                # of the subscription site's nodes).
                if 'proxy-groups' in data and data['proxy-groups']:
                    first_group = data['proxy-groups'][0].copy()
                    if 'proxies' in first_group:
                        # Filter the nodes referenced by the group.
                        filtered_proxies = []
                        for proxy_name in first_group['proxies']:
                            if not self.should_filter_proxy_group_node(proxy_name):
                                # Keep only names that exist in our merged
                                # proxies (after conflict resolution).
                                if proxy_name in self.used_names:
                                    filtered_proxies.append(proxy_name)
                        first_group['proxies'] = filtered_proxies

                        if filtered_proxies:  # Add the group only if it has valid nodes.
                            merged_data['proxy_groups'].append(first_group)
            except Exception as e:
                print(f"Failed to process file {temp_file}: {e}")
                continue

        return merged_data

    def generate_merge_yaml(self, merged_data: Dict[str, Any]) -> bool:
        """Generate the final merge.yaml file."""
        try:
            # Load the template.
            template = self.load_template()
            if not template:
                return False

            # Update proxies.
            template['proxies'] = merged_data['proxies']

            # Update proxy-groups: insert the merged subscription groups
            # after the AutoProxy group.
            if merged_data['proxy_groups']:
                existing_groups = template.get('proxy-groups', [])
                if not existing_groups:
                    print("Template has no proxy-groups; cannot insert subscription groups")
                    return False
                auto_proxy_group = existing_groups[0].copy()  # Copy of the AutoProxy group.

                new_groups = []

                # Collect name and traffic info for each new group, for sorting.
                group_info_list = []

                # Add the merged groups, embedding traffic info in group names.
                merged_groups = []
                for group in merged_data['proxy_groups']:
                    group_name = group.get('name', '')

                    # Check for matching traffic info.
                    if group_name in merged_data['group_traffic_info']:
                        traffic_info = merged_data['group_traffic_info'][group_name]
                        new_name = f"{group_name}({traffic_info})"
                        group['name'] = new_name

                        # Average remaining traffic per day, used for sorting.
                        daily_average = merged_data['group_daily_average'].get(group_name, 0.0)

                        group_info_list.append({
                            'name': new_name,
                            'daily_average': daily_average,
                            'original_name': group_name
                        })
                        print(f"Renamed group: {group_name} -> {new_name}")
                    else:
                        group_info_list.append({
                            'name': group_name,
                            'daily_average': 0.0,
                            'original_name': group_name
                        })

                    merged_groups.append(group)

                # Sort by average remaining traffic per day, descending.
                group_info_list.sort(key=lambda x: x['daily_average'], reverse=True)
                sorted_group_names = [info['name'] for info in group_info_list]

                # Print the sort order.
                print("Groups sorted by average remaining traffic per day:")
                for info in group_info_list:
                    print(f"  {info['name']}: {info['daily_average']:.2f} GB/day")

                # Prepend the sorted group names to AutoProxy's proxies list.
                if 'proxies' in auto_proxy_group:
                    auto_proxy_group['proxies'] = sorted_group_names + auto_proxy_group['proxies']
                    print("Added the sorted groups to the head of the AutoProxy group")

                # Assemble the final proxy-groups list.
                new_groups.append(auto_proxy_group)     # Updated AutoProxy group.
                new_groups.extend(merged_groups)        # Merged subscription groups.
                new_groups.extend(existing_groups[1:])  # Remaining region groups.

                template['proxy-groups'] = new_groups

            # Update the region groups' node lists.
            for group in template.get('proxy-groups', []):
                group_name = group.get('name', '')
                if group_name == 'Auto-香港':
                    group['proxies'] = merged_data['region_nodes']['香港']
                elif group_name == 'Auto-新加坡':
                    group['proxies'] = merged_data['region_nodes']['新加坡']
                elif group_name == 'Auto-USA':
                    group['proxies'] = merged_data['region_nodes']['USA']

            # Save merge.yaml.
            with open('merge.yaml', 'w', encoding='utf-8') as f:
                yaml.dump(template, f, default_flow_style=False,
                          allow_unicode=True, sort_keys=False)

            print("Generated merge.yaml")
            return True
        except Exception as e:
            print(f"Failed to generate merge.yaml: {e}")
            return False

    def run(self):
        """Run the main program."""
        print("Starting Clash subscription merge...")

        # 1. Load the subscription config.
        subscriptions = self.load_subscriptions()
        if not subscriptions:
            print("No valid subscription config found")
            return

        # 2. Fetch and save each subscription's content.
        for subscription in subscriptions:
            group_name = subscription.get('group_name', '')
            url = subscription.get('url', '')

            if not group_name or not url:
                print(f"Skipping invalid subscription: {subscription}")
                continue

            # Reuse an existing temp file if present.
            temp_file = self.temp_dir / f"{group_name}.yaml"
            if temp_file.exists():
                print(f"Using existing temp file: {group_name}")
                continue

            print(f"Processing subscription: {group_name}")
            content = self.fetch_subscription(url)
            if content:
                self.save_temp_yaml(group_name, content)
            else:
                print(f"Failed to fetch subscription content: {group_name}")

        # 3. Process the temp files and merge.
        merged_data = self.process_temp_files()

        # 4. Generate the final file.
        success = self.generate_merge_yaml(merged_data)

        if success:
            print("\nMerge complete!")
            print(f"Merged {len(merged_data['proxies'])} nodes in total")
            print(f"Hong Kong nodes: {len(merged_data['region_nodes']['香港'])}")
            print(f"Singapore nodes: {len(merged_data['region_nodes']['新加坡'])}")
            print(f"US nodes: {len(merged_data['region_nodes']['USA'])}")
            print(f"Subscription groups: {len(merged_data['proxy_groups'])}")

            # Ask the user how to output the result.
            choice = self.get_user_choice()

            if choice == 'replace':
                # Perform the file replacement.
                print("\nChecking file replacement config...")
                target_path = self.read_replacement_target()
                if target_path:
                    replacement_success = self.replace_target_file(target_path)
                    if replacement_success:
                        print("File replacement succeeded!")
                    else:
                        print("File replacement failed!")
                else:
                    print("File replacement not configured or config invalid")
            elif choice == 'base64':
                # Generate the Base64-encoded subscription.
                print("\nGenerating Base64-encoded subscription...")
                base64_success = self.generate_base64_subscription()
                if base64_success:
                    print("Base64 subscription generated successfully!")
                    print("\nSuggested subscription setup:")
                    print("1. Place res.txt in your web server's document root")
                    print("2. Configure Nginx to serve the subscription")
                    print("3. Use the local subscription URL: http://your-domain/subscription")
                else:
                    print("Failed to generate Base64 subscription!")
        else:
            print("Merge failed!")


if __name__ == "__main__":
    merger = ClashSubscriptionMerger()
    merger.run()
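
# Expected shape of subscribe.json, inferred from run() above (which reads the
# "group_name" and "url" keys of each entry); the URLs below are placeholders:
#
# [
#     {"group_name": "provider_a", "url": "https://example.com/sub/a"},
#     {"group_name": "provider_b", "url": "https://example.com/sub/b"}
# ]
#
# generate_merge_yaml() additionally assumes that the first proxy-group in
# temp.yaml is the AutoProxy group and that groups named "Auto-香港",
# "Auto-新加坡" and "Auto-USA" exist; a minimal template sketch under those
# assumptions:
#
# proxy-groups:
#   - {name: AutoProxy, type: select, proxies: []}
#   - {name: Auto-香港, type: url-test, proxies: [], url: "http://www.gstatic.com/generate_204", interval: 300}
#   - {name: Auto-新加坡, type: url-test, proxies: [], url: "http://www.gstatic.com/generate_204", interval: 300}
#   - {name: Auto-USA, type: url-test, proxies: [], url: "http://www.gstatic.com/generate_204", interval: 300}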