feat: initial commit

This commit is contained in:
ptrcnull 2024-03-16 06:57:28 +01:00
commit c1afc62c17
Signed by: ptrcnull
GPG key ID: 411F7B30801DD9CA
10 changed files with 385 additions and 0 deletions

6
.gitignore vendored Normal file
View file

@ -0,0 +1,6 @@
**/__pycache__/
.dist/
dist/
.mypy_cache/
.ruff_cache/
nyacme.toml

9
LICENSE.txt Normal file
View file

@ -0,0 +1,9 @@
Copyright (c) 2024 ptrcnull
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

86
nyacme/__main__.py Executable file
View file

@ -0,0 +1,86 @@
#!/usr/bin/env python3
import argparse
import os.path
import subprocess
import logging
import shutil
from datetime import datetime
from config import read_config
logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(name)s: %(message)s')
log = logging.getLogger('nyacme')
def main() -> None:
parser = argparse.ArgumentParser(
prog='nyacme',
description='fun uacme wrapper'
)
parser.add_argument('-c', '--config', default='/etc/nyacme.toml')
parser.add_argument('-o', '--output', default='/etc/ssl/uacme')
args = parser.parse_args()
config = read_config(args.config)
acquired = False
for domain in config.domains:
# arguments passed to uacme
uacme_domains = [domain]
if domain.startswith('*.'):
uacme_domains = [ domain[2:], domain ]
domain = domain[2:]
cert_path = f'{args.output}/{domain}/cert.pem'
if os.path.exists(cert_path):
out = subprocess.run([ 'openssl', 'x509', '-enddate', '-noout', '-in', cert_path ], stdout=subprocess.PIPE, check=True).stdout.decode('utf-8').strip()
date = datetime.strptime(out, 'notAfter=%b %d %H:%M:%S %Y %Z')
# if more than 1 month, skip
delta = date - datetime.now()
if delta.days > 30:
log.info(f'cert for {domain} expires in more than a month ({delta.days} days), skipping')
# continue
log.info(f'getting cert for {domain}')
env = os.environ.copy()
env['NYACME_CONFIG'] = args.config
hook_path = shutil.which('nyacme-hook')
if not hook_path:
hook_path = os.path.join(os.path.dirname(__file__), 'hook.py')
res = subprocess.run([
'uacme', '-v',
'--hook', hook_path,
'--confdir', args.output,
'-b', '384',
'--type', 'EC',
# '--force',
'issue'
] + uacme_domains, env=env)
if res.returncode == 0:
acquired = True
private_key = os.path.join(args.output, f'private/{domain}/key.pem')
domain_key = os.path.join(args.output, f'{domain}/cert.pem.key')
domain_pem = os.path.join(args.output, f'{domain}/cert.pem')
shutil.copy2(private_key, domain_key)
# TODO: add user/group to config
shutil.chown(domain_key, 'acme', 'acme')
os.chmod(domain_key, 0o440)
all_pem = os.path.join(args.output, f'all/{domain}.pem')
all_key = os.path.join(args.output, f'all/{domain}.pem.key')
shutil.copy2(domain_pem, all_pem)
shutil.copy2(domain_key, all_key)
if acquired:
for cmd in config.post_acquire:
subprocess.run(cmd, shell=True, check=True)
if __name__ == '__main__':
main()

80
nyacme/config.py Normal file
View file

@ -0,0 +1,80 @@
from typing import Optional
import tomllib
import logging
import sys
import os
log = logging.getLogger(__name__)
class Config:
post_acquire: list[str]
domains: dict[str, str]
secrets: dict[str, str]
acme_path: str
def find_zone(self, domain: str) -> str:
parts = domain.split('.')
for i in range(len(parts)-1):
zone = '.'.join(parts[i:])
if '.'.join(parts[i:]) in self.domains:
return zone
log.error('could not find zone for domain %s', domain)
sys.exit(1)
def get_handler(self, domain: str) -> str:
return self.domains[domain]
def get_secret(self, handler: str) -> str:
return self.secrets[handler]
def read_config(path: Optional[str]) -> Config:
if not path:
# should be here only when running from hook
path = os.getenv('NYACME_CONFIG', '/etc/nyacme.toml')
with open(path, 'rb') as file:
raw_conf = tomllib.load(file)
for key in raw_conf:
if key not in ('domains', 'secrets'):
log.warning('unknown config key: %s', key)
c = Config()
if 'domains' not in raw_conf:
log.error('missing "domains"')
sys.exit(1)
for k, v in raw_conf['domains'].items():
assert isinstance(k, str), f'domain "{k}" is not a string'
assert isinstance(v, str), f'domain "{k}" handler {v} is not a string'
c.domains = raw_conf['domains']
if 'secrets' not in raw_conf:
log.error('missing "secrets"')
sys.exit(1)
for k, v in raw_conf['secrets'].items():
assert isinstance(k, str), f'secret key "{k}" is not a string'
assert isinstance(v, str), f'secret "{k}" value {v} is not a string'
c.secrets = raw_conf['secrets']
post_acquire = []
if 'post_acquire' in raw_conf:
assert isinstance(raw_conf['post_acquire'], list), 'post_acquire is not a list'
for cmd in raw_conf['post_acquire']:
assert isinstance(cmd, str), 'post_acquire item has to be a string'
post_acquire.append(cmd)
c.post_acquire = post_acquire
if 'acme_path' in raw_conf:
c.acme_path = raw_conf['acme_path']
else:
c.acme_path = '/var/www/acme/.well-known/acme-challenge'
return c

View file

@ -0,0 +1,5 @@
from .hetzner import HetznerHandler
__all__ = [
'HetznerHandler'
]

25
nyacme/handlers/base.py Normal file
View file

@ -0,0 +1,25 @@
import logging
class Handler:
zone: str
config: Config
token: str
nameservers: list[str]
def __init__(self, zone: str, config: Config, token: str) -> None:
self.log = logging.getLogger('nyacme_hook.' + type(self).__name__)
self.zone = zone
self.config = config
self.token = token
def create(self, record_name: str, record_value: str) -> None:
'''
Create a record with a given name and value.
'''
pass
def remove(self, record_name: str) -> None:
'''
Remove a record, or do nothing in case record does not exist.
'''
pass

View file

@ -0,0 +1,60 @@
import urllib.request
import json
from typing import Optional, Any
from .base import Handler
class HetznerHandler(Handler):
# discovered
zone_id: str
nameservers: list[str]
def __init__(self, zone_name: str, config: Config, token: str) -> None:
super().__init__(zone_name, config, token)
self.secret = config.get_secret('hetzner')
zones = self.fetch('/zones')['zones']
for zone in zones:
if zone['name'] == zone_name:
self.zone_id = zone['id']
self.nameservers = zone['ns']
break
def fetch(self, url: str, data: Optional[Any] = None, **kwargs: str) -> Any:
req = urllib.request.Request('https://dns.hetzner.com/api/v1' + url)
req.add_header('Auth-API-Token', self.secret)
method = 'GET'
if data:
req.data = json.dumps(data).encode('utf-8')
req.add_header('Content-Type', 'application/json;charset=utf-8')
method = 'POST'
req.method = kwargs.get('method', method)
try:
with urllib.request.urlopen(req) as f:
return json.load(f)
except urllib.error.HTTPError as ex:
self.log.error('cannot %s %s: %s', req.method, url, ex)
res = ex.fp.read().decode('utf-8')
try:
raise Exception(json.loads(res)['error'])
except Exception:
raise Exception(res)
def create(self, record_name: str, record_value: str) -> None:
self.remove(record_name)
self.log.info('creating %s with value %s', record_name, record_value)
self.fetch('/records', {
'value': record_value,
'ttl': 300,
'type': 'TXT',
'name': record_name,
'zone_id': self.zone_id,
})
def remove(self, record_name: str) -> None:
records = self.fetch(f'/records?zone_id={self.zone_id}')['records']
for record in records:
if record['name'] == record_name:
self.log.info('removing %s', record_name)
self.fetch(f'/records/{record["id"]}', method='DELETE')

17
nyacme/handlers/http.py Normal file
View file

@ -0,0 +1,17 @@
import os
from .base import Handler
from ..config import Config
class HTTPHandler(Handler):
def __init__(self, zone: str, config: Config, token: str) -> None:
super().__init__(zone_name, config, token)
self.filepath = os.path.join(config.acme_path, token)
def create(self, record_name: str, record_value: str) -> None:
with open(self.filepath, 'w') as f:
f.write(record_value)
def remove(self, record_name: str) -> None:
if os.path.isfile(self.filepath):
os.unlink(self.filepath)

73
nyacme/hook.py Executable file
View file

@ -0,0 +1,73 @@
#!/usr/bin/env python3
import argparse
import logging
from itertools import chain
import time
import sys
import dns.resolver
from config import read_config
from handlers import HetznerHandler
logging.basicConfig(level=logging.INFO, format='> [%(levelname)s] %(name)s: %(message)s')
log = logging.getLogger('nyacme_hook')
handlers = {'hetzner': HetznerHandler}
def main() -> None:
parser = argparse.ArgumentParser(
prog='nyacme-hook',
description='nyacme hook (not meant to be ran manually)'
)
parser.add_argument('method', help='one of begin, done or failed')
parser.add_argument('type', help='challenge type (dns-01, http-01 or tls-alpn-01)')
parser.add_argument('domain', help='the identifier the challenge refers to (domain name)')
parser.add_argument('token', help='the challenge token')
parser.add_argument('auth', help='the key authorization (DNS record contents, etc.)')
args = parser.parse_args()
config = read_config(None)
record_name = f'_acme-challenge.{args.domain}'
zone_name = config.find_zone(args.domain)
short_record_name = record_name.replace('.' + zone_name, '')
handler_name = config.get_handler(zone_name)
handler = handlers[handler_name](zone_name, config)
if args.method == 'begin':
handler.create(short_record_name, args.auth)
else:
handler.remove(short_record_name)
if args.type == 'dns-01':
resolver = dns.resolver.Resolver('', configure=False)
resolver.nameservers = list(chain.from_iterable(list(map(resolve4, handler.nameservers)) + list(map(resolve6, handler.nameservers))))
for i in range(5):
log.info('checking DNS (attempt %d/5)', i+1)
try:
res = resolver.resolve(record_name, 'TXT')
values = list(map(lambda rdata: rdata.to_text().strip('"'), res))
except dns.resolver.NXDOMAIN:
values = []
log.info('response from DNS: %s', values)
if (args.method == 'begin') == (args.auth in values):
sys.exit(0)
time.sleep(5)
log.warning('could not ensure the DNS record was created!!')
def resolve4(addr: str) -> list[str]:
res = dns.resolver.resolve(addr, 'A')
return list(map(str, res))
def resolve6(addr: str) -> list[str]:
res = dns.resolver.resolve(addr, 'AAAA')
return list(map(str, res))
if __name__ == '__main__':
main()

24
pyproject.toml Normal file
View file

@ -0,0 +1,24 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[project]
name = "nyacme"
version = "0.1.0"
authors = [
{name = "Patrycja Rosa", email = "python@ptrcnull.me"},
]
description = "a uacme wrapper that maybe probably doesn't suck too much"
readme = "README.md"
requires-python = ">=3.7"
license = {text = "BSD-2-Clause"}
classifiers = [
"Programming Language :: Python :: 3",
]
dependencies = [
"dnspython >= 2.4.2"
]
[project.scripts]
nyacme = "nyacme.main:main"
nyacme-hook = "nyacme.hook:main"