This commit is contained in:
root 2025-05-22 03:14:10 +00:00
parent 59bffca8f8
commit 593c08a069
2 changed files with 42 additions and 29 deletions

View File

@ -15,35 +15,48 @@ class AliyunDNS:
def params(self, params): # 请求体公共加密 def params(self, params): # 请求体公共加密
params = { params = {
'Format': 'JSON', "Format": "JSON",
'Version': '2015-01-09', "Version": "2015-01-09",
'AccessKeyId': self.ACCESS_KEY_ID, "AccessKeyId": self.ACCESS_KEY_ID,
'SignatureMethod': 'HMAC-SHA1', "SignatureMethod": "HMAC-SHA1",
'SignatureVersion': '1.0', "SignatureVersion": "1.0",
'SignatureNonce': str(time.time()), "SignatureNonce": str(time.time()),
'Timestamp': datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'), "Timestamp": datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
**params **params
} }
# 加密验证签名部分 # 加密验证签名部分
sign = sorted(params.items(), key=lambda x: x[0]) sign = sorted(params.items(), key=lambda x: x[0])
sign = '&'.join([urllib.parse.quote_plus(k) + '=' + urllib.parse.quote_plus(v) for k, v in sign]) sign = "&".join([urllib.parse.quote_plus(k) + "=" + urllib.parse.quote_plus(v) for k, v in sign])
sign = 'GET&%2F&' + urllib.parse.quote_plus(sign) sign = "GET&%2F&" + urllib.parse.quote_plus(sign)
sign = base64.b64encode( sign = base64.b64encode(
hmac.new( hmac.new(
self.ACCESS_KEY_SECRET.encode('utf-8') + b'&', self.ACCESS_KEY_SECRET.encode("utf-8") + b"&",
sign.encode('utf-8'), sign.encode("utf-8"),
hashlib.sha1 hashlib.sha1
).digest() ).digest()
).decode('utf-8') ).decode("utf-8")
params['Signature'] = sign params["Signature"] = sign
return params return params
def DescribeDomainRecords(self, DomainName, PageSize=100): # 查看所有解析记录 def DescribeDomains(self, PageNumber=1, PageSize=100): # 查看所有域名
response = requests.get( response = requests.get(
url=self.BASE_URL, url=self.BASE_URL,
params=self.params(params={ params=self.params(params={
'Action': 'DescribeDomainRecords', "Action": "DescribeDomains",
'DomainName': DomainName, "PageNumber": str(PageNumber),
"PageSize": str(PageSize)
})
)
assert response.status_code == 200
return response.json()
def DescribeDomainRecords(self, DomainName, PageNumber=1, PageSize=100): # 查看所有解析记录
response = requests.get(
url=self.BASE_URL,
params=self.params(params={
"Action": "DescribeDomainRecords",
"DomainName": DomainName,
"PageNumber": str(PageNumber),
"PageSize": str(PageSize) "PageSize": str(PageSize)
}) })
) )
@ -54,9 +67,9 @@ class AliyunDNS:
response = requests.get( response = requests.get(
url=self.BASE_URL, url=self.BASE_URL,
params=self.params(params={ params=self.params(params={
'Action': 'AddDomainRecord', "Action": "AddDomainRecord",
'DomainName': DomainName, "DomainName": DomainName,
'RR': RR, 'Type': Type, 'Value': Value, 'Line': Line, 'TTL': str(TTL) "RR": RR, "Type": Type, "Value": Value, "Line": Line, "TTL": str(TTL)
}) })
) )
assert response.status_code == 200 assert response.status_code == 200
@ -66,10 +79,9 @@ class AliyunDNS:
response = requests.get( response = requests.get(
url=self.BASE_URL, url=self.BASE_URL,
params=self.params(params={ params=self.params(params={
'Action': 'DeleteDomainRecord', "Action": "DeleteDomainRecord",
'RecordId': RecordId, "RecordId": RecordId,
}) })
) )
assert response.status_code == 200 assert response.status_code == 200
return response.json() return response.json()

View File

@ -6,7 +6,6 @@ import uvicorn
from dns import AliyunDNS from dns import AliyunDNS
# Aliyun DNS 相关配置 # Aliyun DNS 相关配置
DOMAIN_NAMES = str(os.getenv("DOMAIN_NAMES", "")).split(",")
BASE_URL = str(os.getenv("BASE_URL", "http://alidns.aliyuncs.com")) BASE_URL = str(os.getenv("BASE_URL", "http://alidns.aliyuncs.com"))
ACCESS_KEY_ID = str(os.getenv("ACCESS_KEY_ID", "")) ACCESS_KEY_ID = str(os.getenv("ACCESS_KEY_ID", ""))
ACCESS_KEY_SECRET = str(os.getenv("ACCESS_KEY_SECRET", "")) ACCESS_KEY_SECRET = str(os.getenv("ACCESS_KEY_SECRET", ""))
@ -20,12 +19,14 @@ SERVER_PORT = int(os.getenv("SERVER_PORT", "60000"))
class Server: class Server:
def __init__(self): def __init__(self):
self.aliyun_dns = AliyunDNS(BASE_URL, ACCESS_KEY_ID, ACCESS_KEY_SECRET) self.dns = AliyunDNS(BASE_URL, ACCESS_KEY_ID, ACCESS_KEY_SECRET)
self.resolutions = {domain_name: list() for domain_name in DOMAIN_NAMES} domains = self.dns.DescribeDomains()
domains = [domain["DomainName"] for domain in domains["Domains"]["Domain"]]
self.resolutions = {domain: list() for domain in domains}
def domain(self, domain): def domain(self, domain):
self.resolutions[domain].clear() self.resolutions[domain].clear()
records = self.aliyun_dns.DescribeDomainRecords(DomainName=domain) records = self.dns.DescribeDomainRecords(DomainName=domain)
for record in records["DomainRecords"]["Record"]: for record in records["DomainRecords"]["Record"]:
self.resolutions[domain].append( self.resolutions[domain].append(
[record["RR"], record["Value"], record["RecordId"]]) [record["RR"], record["Value"], record["RecordId"]])
@ -33,7 +34,7 @@ class Server:
def webui(self, app, path): def webui(self, app, path):
with gradio.Blocks() as webui: with gradio.Blocks() as webui:
domain = gradio.Dropdown(label="主域名", choices=DOMAIN_NAMES, value=None) domain = gradio.Dropdown(label="主域名", choices=list(self.resolutions.keys()), value=None)
editor = gradio.DataFrame(label="编辑解析信息", headers=["子域名", "解析地址", "记录编码"], editor = gradio.DataFrame(label="编辑解析信息", headers=["子域名", "解析地址", "记录编码"],
row_count=100, col_count=(3, "fixed"), datatype=["str", "str", "str"], type="pandas") row_count=100, col_count=(3, "fixed"), datatype=["str", "str", "str"], type="pandas")
update = gradio.Button(value="更新解析信息") update = gradio.Button(value="更新解析信息")
@ -55,11 +56,11 @@ class Server:
for rr, ip, id in (now - last): for rr, ip, id in (now - last):
if rr == "" or ip == "": continue # 子域名或解析地址为空则不做解析 if rr == "" or ip == "": continue # 子域名或解析地址为空则不做解析
print(f"创建解析:{domain} {rr} {ip}") print(f"创建解析:{domain} {rr} {ip}")
self.aliyun_dns.AddDomainRecord(DomainName=domain, RR=rr, Type="A", Value=ip, Line="default", TTL=600) self.dns.AddDomainRecord(DomainName=domain, RR=rr, Type="A", Value=ip, Line="default", TTL=600)
for rr, ip, id in (last - now): for rr, ip, id in (last - now):
print(f"删除解析:{domain} {rr} {ip} {id}") print(f"删除解析:{domain} {rr} {ip} {id}")
self.aliyun_dns.DeleteDomainRecord(RecordId=id) self.dns.DeleteDomainRecord(RecordId=id)
self.domain(domain=domain) # 更新解析信息 self.domain(domain=domain) # 更新解析信息
return self.resolutions[domain] return self.resolutions[domain]