86 lines
2.8 KiB
Python
86 lines
2.8 KiB
Python
import argparse
|
|
import logging
|
|
import sys
|
|
import time
|
|
from itertools import chain
|
|
|
|
import dns.resolver
|
|
|
|
from .config import read_config
|
|
from .handlers import HetznerHandler, HTTPHandler
|
|
|
|
logging.basicConfig(level=logging.INFO, format='> [%(levelname)s] %(name)s: %(message)s')
|
|
log = logging.getLogger('nyacme_hook')
|
|
|
|
|
|
handlers = {
|
|
'hetzner': HetznerHandler,
|
|
'http': HTTPHandler,
|
|
}
|
|
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser(
|
|
prog='nyacme-hook',
|
|
description='nyacme hook (not meant to be ran manually)'
|
|
)
|
|
parser.add_argument('method', help='one of begin, done or failed')
|
|
parser.add_argument('type', help='challenge type (dns-01, http-01 or tls-alpn-01)')
|
|
parser.add_argument('domain', help='the identifier the challenge refers to (domain name)')
|
|
parser.add_argument('token', help='the challenge token')
|
|
parser.add_argument('auth', help='the key authorization (DNS record contents, etc.)')
|
|
args = parser.parse_args()
|
|
|
|
config = read_config(None)
|
|
|
|
record_name = f'_acme-challenge.{args.domain}'
|
|
zone_name = config.find_zone(args.domain)
|
|
short_record_name = record_name.replace('.' + zone_name, '')
|
|
|
|
handler_name = config.get_handler(zone_name)
|
|
handler = handlers[handler_name](zone_name, config, args.token)
|
|
|
|
if (args.type == 'dns-01') == (handler_name == 'http'):
|
|
log.error('wrong handler for type, try again')
|
|
sys.exit(1)
|
|
|
|
if args.method == 'begin':
|
|
handler.create(short_record_name, args.auth)
|
|
else:
|
|
handler.remove(short_record_name)
|
|
|
|
if args.type == 'dns-01':
|
|
resolver = dns.resolver.Resolver('', configure=False)
|
|
ns4 = list(map(resolve4, handler.nameservers))
|
|
ns6 = list(map(resolve6, handler.nameservers))
|
|
resolver.nameservers = list(chain.from_iterable(ns4 + ns6))
|
|
for i in range(5):
|
|
log.info('checking DNS (attempt %d/5)', i+1)
|
|
try:
|
|
res = resolver.resolve(record_name, 'TXT')
|
|
values = list(map(lambda rdata: rdata.to_text().strip('"'), res))
|
|
except dns.resolver.NXDOMAIN:
|
|
values = []
|
|
log.info('response from DNS: %s', values)
|
|
|
|
if (args.method == 'begin') == (args.auth in values):
|
|
if args.method == 'begin':
|
|
# success, but have to wait
|
|
# because let's encrypt dns is ??? slow
|
|
time.sleep(10)
|
|
sys.exit(0)
|
|
|
|
time.sleep(5)
|
|
log.warning('could not ensure the DNS record was created!!')
|
|
|
|
def resolve4(addr: str) -> list[str]:
|
|
res = dns.resolver.resolve(addr, 'A')
|
|
return list(map(str, res))
|
|
|
|
def resolve6(addr: str) -> list[str]:
|
|
res = dns.resolver.resolve(addr, 'AAAA')
|
|
return list(map(str, res))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|