1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
| import requests import time import schedule import os import socket import struct from datetime import datetime import configparser
CONFIG_FILE = "config.ini"
config = configparser.ConfigParser() try: config.read(CONFIG_FILE)
ZONE = config.get('DDNS', 'ZONE') TOKEN = config.get('DDNS', 'TOKEN') UPDATE_URL = config.get('DDNS', 'UPDATE_URL') LOG_FILE = config.get('Paths', 'LOG_FILE') LAST_IP_FILE = config.get('Paths', 'LAST_IP_FILE')
except configparser.Error as e: conprint(f"【ERR】 错误: 无法读取配置文件 {CONFIG_FILE} 或缺少必要的配置项: {e}") conprint("【ERR】 请确保 config.ini 文件存在,且包含 [DDNS] 和 [Paths] 段落以及所有必要的键。") exit(1)
try: import miniupnpc UPNP_ENABLED = True except ImportError: UPNP_ENABLED = False conprint("【WARN】 未安装 miniupnpc 库,无法通过 UPNP 获取 IPv4 地址。请按照说明安装。")
def log_and_print(message): """ 记录日志到文件并打印到控制台 """ timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") log_entry = f"[{timestamp}] {message}" print(log_entry) with open(LOG_FILE, "a") as log_file: log_file.write(log_entry + "\n")
def conprint(message): """ 打印到控制台 """ timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") console_entry = f"[{timestamp}] {message}" print(console_entry)
def read_last_ip(): """ 从文件读取上次保存的 IP 地址 """ if os.path.exists(LAST_IP_FILE): with open(LAST_IP_FILE, "r") as f: lines = f.readlines() ipv4 = lines[0].strip() if len(lines) > 0 else None ipv6 = lines[1].strip() if len(lines) > 1 else None return ipv4, ipv6 return None, None
def write_last_ip(ipv4, ipv6): """ 将当前 IP 地址写入文件 """ with open(LAST_IP_FILE, "w") as f: f.write(f"{ipv4 if ipv4 else ''}\n") f.write(f"{ipv6 if ipv6 else ''}\n")
def get_local_ipv6(): """ 获取本地公网 IPv6 地址。 通过遍历本地网卡地址,判断是否为全球单播地址。 """ ipv6_addresses = [] try: for info in socket.getaddrinfo(socket.gethostname(), None, socket.AF_INET6): ip = info[4][0] if ip.startswith("fe80:") or ip.startswith("fc00:") or ip.startswith("fd00:") or ip.startswith("::1"): continue if ip.startswith("2") or ip.startswith("3"): ipv6_addresses.append(ip) if ipv6_addresses: return ipv6_addresses[0] else: log_and_print("【ERR】 未找到公网 IPv6 地址。") return None except Exception as e: log_and_print(f"【ERR】 获取本地 IPv6 地址失败: {e}") return None
def get_upnp_external_ip(): """ 通过 UPNP 从路由器获取外部 IPv4 地址。 """ if not UPNP_ENABLED: log_and_print("【ERR】 UPNP 未启用或 miniupnpc 库未安装,无法获取外部 IPv4 地址。") return None try: u = miniupnpc.UPnP() u.discoverdelay = 200 conprint("[INFO] 正在发现 UPNP 设备...") ndevices = u.discover() conprint(f"[INFO] 发现 {ndevices} 个 UPNP 设备。") if ndevices > 0: igd_selected_result = u.selectigd()
try: external_ip = u.externalipaddress() if external_ip: conprint(f"[INFO] 通过 UPNP 获取到外部 IPv4 地址: {external_ip}") return external_ip else: log_and_print("【ERR】 没有获取到外部IPv4地址!请检查Upnp设备及连接。") return None except Exception as e: log_and_print(f"【ERR】 获取外部 IPv4 地址时发生错误: {e}") import traceback log_and_print(traceback.format_exc()) return None else: log_and_print("【ERR】 未发现 UPNP 设备!") return None except Exception as e: log_and_print(f"【ERR】 通过 UPNP 获取外部 IPv4 地址失败(发现或选择IGD阶段): {e}") import traceback log_and_print(traceback.format_exc()) return None
def update_ddns(ipv4, ipv6): """ 更新 DDNS 服务 """ try: if ipv4: response = requests.get(UPDATE_URL, params={ "ipv4": ipv4, "zone": ZONE, "token": TOKEN }) if response.status_code == 200: log_and_print(f"【SUCC】 IPv4 更新成功: {ipv4}") else: log_and_print(f"【ERR】 IPv4 更新失败: {response.status_code}, {response.text}")
if ipv6: response = requests.get(UPDATE_URL, params={ "ipv6": ipv6, "zone": ZONE, "token": TOKEN }) if response.status_code == 200: log_and_print(f"【SUCC】 IPv6 更新成功: {ipv6}") else: log_and_print(f"【ERR】 IPv6 更新失败: {response.status_code}, {response.text}")
except requests.RequestException as e: log_and_print(f"【ERR】 更新 DDNS 失败: {e}")
def check_and_update(): """ 检查当前 IP 地址并更新 DDNS """ last_ipv4, last_ipv6 = read_last_ip()
current_ipv4 = get_upnp_external_ip() current_ipv6 = get_local_ipv6()
ipv4_changed = False ipv6_changed = False
if current_ipv4 and current_ipv4 != last_ipv4: log_and_print(f"[INFO] 检测到 IPv4 地址改变: {last_ipv4 if last_ipv4 else '首次获取'} -> {current_ipv4}") ipv4_changed = True elif current_ipv4: conprint(f"[INFO] IPv4 地址未改变: {current_ipv4}") else: log_and_print("【ERR】 未能获取到有效的 IPv4 地址。")
if current_ipv6 and current_ipv6 != last_ipv6: log_and_print(f"[INFO] 检测到 IPv6 地址改变: {last_ipv6 if last_ipv6 else '首次获取'} -> {current_ipv6}") ipv6_changed = True elif current_ipv6: conprint(f"[INFO] IPv6 地址未改变: {current_ipv6}") else: log_and_print("【ERR】 未能获取到有效的 IPv6 地址。")
if ipv4_changed or ipv6_changed: log_and_print("[INFO] IP地址发生变化,正在更新DDNS记录...") update_ddns(ipv4=current_ipv4 if ipv4_changed else None, ipv6=current_ipv6 if ipv6_changed else None) write_last_ip(current_ipv4, current_ipv6) else: conprint("[INFO] IP地址未发生变化,无需更新DDNS记录。")
schedule.every(15).minutes.do(check_and_update)
if __name__ == "__main__": log_and_print("——————————————————————————————") log_and_print("[INFO] DDNS 更新服务已启动,每隔 5 分钟检查一次 IP 地址变化...") check_and_update()
while True: schedule.run_pending() time.sleep(1)
|