wip: aaaaaaaa

This commit is contained in:
ptrcnull 2024-02-19 02:43:45 +01:00
parent 26efd8e1a7
commit 63560563ea
3 changed files with 251 additions and 50 deletions

75
kakushi/dhcrypto.py Normal file
View file

@ -0,0 +1,75 @@
# stolen from https://github.com/mitya57/secretstorage/blob/acf549440f86062a8f616530ff9a487fafffd92c/secretstorage/dhcrypto.py
import hmac
import math
import os
from hashlib import sha256
from typing import Optional
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
# A standard 1024 bits (128 bytes) prime number for use in Diffie-Hellman exchange
DH_PRIME_1024_BYTES = (
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xC9, 0x0F, 0xDA, 0xA2, 0x21, 0x68,
0xC2, 0x34, 0xC4, 0xC6, 0x62, 0x8B, 0x80, 0xDC, 0x1C, 0xD1, 0x29, 0x02, 0x4E, 0x08,
0x8A, 0x67, 0xCC, 0x74, 0x02, 0x0B, 0xBE, 0xA6, 0x3B, 0x13, 0x9B, 0x22, 0x51, 0x4A,
0x08, 0x79, 0x8E, 0x34, 0x04, 0xDD, 0xEF, 0x95, 0x19, 0xB3, 0xCD, 0x3A, 0x43, 0x1B,
0x30, 0x2B, 0x0A, 0x6D, 0xF2, 0x5F, 0x14, 0x37, 0x4F, 0xE1, 0x35, 0x6D, 0x6D, 0x51,
0xC2, 0x45, 0xE4, 0x85, 0xB5, 0x76, 0x62, 0x5E, 0x7E, 0xC6, 0xF4, 0x4C, 0x42, 0xE9,
0xA6, 0x37, 0xED, 0x6B, 0x0B, 0xFF, 0x5C, 0xB6, 0xF4, 0x06, 0xB7, 0xED, 0xEE, 0x38,
0x6B, 0xFB, 0x5A, 0x89, 0x9F, 0xA5, 0xAE, 0x9F, 0x24, 0x11, 0x7C, 0x4B, 0x1F, 0xE6,
0x49, 0x28, 0x66, 0x51, 0xEC, 0xE6, 0x53, 0x81, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF
)
def int_to_bytes(number: int) -> bytes:
return number.to_bytes(math.ceil(number.bit_length() / 8), 'big')
DH_PRIME_1024 = int.from_bytes(DH_PRIME_1024_BYTES, 'big')
class Session:
def __init__(self) -> None:
self.object_path: Optional[str] = None
self.aes_key: Optional[bytes] = None
self.encrypted = False
# 128-bytes-long strong random number
self.my_private_key = int.from_bytes(os.urandom(0x80), 'big')
self.my_public_key = pow(2, self.my_private_key, DH_PRIME_1024)
def set_client_public_key(self, client_public_key: int) -> None:
self.encrypted = True
common_secret_int = pow(client_public_key, self.my_private_key,
DH_PRIME_1024)
common_secret = int_to_bytes(common_secret_int)
# Prepend NULL bytes if needed
common_secret = b'\x00' * (0x80 - len(common_secret)) + common_secret
# HKDF with null salt, empty info and SHA-256 hash
salt = b'\x00' * 0x20
pseudo_random_key = hmac.new(salt, common_secret, sha256).digest()
output_block = hmac.new(pseudo_random_key, b'\x01', sha256).digest()
# Resulting AES key should be 128-bit
self.aes_key = output_block[:0x10]
def get_my_public_key(self):
return int_to_bytes(self.my_public_key)
def encrypt(self, secret):
if isinstance(secret, str):
secret = secret.encode('utf-8')
if not self.encrypted:
return b'', secret
padding = 0x10 - (len(secret) & 0xf)
secret += bytes((padding,) * padding)
aes_iv = os.urandom(0x10)
aes = algorithms.AES(self.aes_key)
encryptor = Cipher(aes, modes.CBC(aes_iv), default_backend()).encryptor()
encrypted_secret = encryptor.update(secret) + encryptor.finalize()
return aes_iv, encrypted_secret

View file

@ -2,6 +2,29 @@ import socket
import os
import os.path
import typing
import shlex
class Key(dict):
def from_line(line):
key = Key()
for token in shlex.split(line):
if '=' in token:
k, v = token.split('=')
key[k] = v
elif token.endswith('!'):
key[token] = None
return key
def has_secret(self):
return any(map(lambda kv: kv[0].endswith('!'), self.items()))
def to_query(self):
return dict(filter(lambda kv: kv[1] is not None, self.items()))
# this is silly but apparently according to the freedesktop shitspec there can only be one,
# so return whichever is first
def get_secret(self):
return next(filter(lambda kv: kv[0].endswith('!'), self.items()))[1]
class Client:
def __init__(self, addr = None):
@ -30,12 +53,25 @@ class Client:
self.sock.send(qs.encode('utf-8'))
raw_res = self._read_raw_response()
return raw_res
res = []
for line in raw_res:
if line.startswith('error '):
raise Exception(line[6:])
if not line.startswith('key '):
raise Exception(f'parse error: {line}')
line = line[4:]
# parse 'key ...' line
key = Key.from_line(line)
res.append(key)
return res
def _read_raw_response(self):
res = ''
while not '\nend\n' in res:
res += self.sock.recv(4096).decode('utf-8')
if res.startswith('end\n'):
break
res = res.strip()
res_lines = res.splitlines()
res_lines.pop()

View file

@ -1,48 +1,53 @@
from curses import KEY_CANCEL
from dataclasses import dataclass
from random import randint, random
from pydbus import SessionBus
from pydbus.generic import signal
from gi.repository import GLib
import himitsu
import dhcrypto
loop = GLib.MainLoop()
bus = SessionBus()
hiq = himitsu.Client()
SESSIONS_NS = '/org/freedesktop/secrets/sessions/'
class Collection(object):
'''
<node>
<interface name="org.freedesktop.Secret.Collection">
<property name="Items" type="ao" access="read" />
<property name="Private" type="s" access="read" />
<property name="Label" type="s" access="readwrite" />
<property name="Locked" type="b" access="read" />
<property name="Created" type="t" access="read" />
<property name="Modified" type="t" access="read" />
<method name="Delete">
<arg name="prompt" type="o" direction="out" />
<arg type="o" name="prompt" direction="out" />
</method>
<method name="SearchItems">
<arg name="attributes" type="a{ss}" direction="in" />
<arg name="results" type="ao" direction="out" />
<arg type="a{ss}" name="attributes" direction="in" />
<arg type="ao" name="results" direction="out" />
</method>
<method name="CreateItem">
<arg name="properties" type="a{sv}" direction="in" />
<arg name="secret" type="(sayay)" direction="in" />
<arg name="replace" type="b" direction="in" />
<arg name="item" type="o" direction="out" />
<arg name="prompt" type="o" direction="out" />
<arg type="a{sv}" name="properties" direction="in" />
<arg type="(oayays)" name="secret" direction="in" />
<arg type="b" name="replace" direction="in" />
<arg type="o" name="item" direction="out" />
<arg type="o" name="prompt" direction="out" />
</method>
<signal name="ItemCreated">
<arg name="item" type="o" />
<arg type="o" name="item" />
</signal>
<signal name="ItemDeleted">
<arg name="item" type="o" />
<arg type="o" name="item" />
</signal>
<signal name="ItemChanged">
<arg type="o" name="item" />
</signal>
<property type="ao" name="Items" access="read" />
<property type="s" name="Label" access="readwrite" />
<property type="b" name="Locked" access="read" />
<property type="t" name="Created" access="read" />
<property type="t" name="Modified" access="read" />
</interface>
</node>
'''
@ -59,58 +64,142 @@ class Collection(object):
ItemCreated = signal()
ItemDeleted = signal()
ItemChanged = signal()
class Service(object):
'''
<node>
<interface name="org.freedesktop.Secret.Service">
<property name="Collections" type="ao" access="read" />
<property name="DefaultCollection" type="o" access="readwrite" />
<method name="OpenSession">
<arg name="result" type="o" direction="out" />
<arg type="s" name="algorithm" direction="in" />
<arg type="v" name="input" direction="in" />
<arg type="v" name="output" direction="out" />
<arg type="o" name="result" direction="out" />
</method>
<method name="CreateCollection">
<arg name="label" type="s" direction="in" />
<arg name="private" type="b" direction="in" />
<arg type="a{sv}" name="properties" direction="in" />
<arg type="s" name="alias" direction="in" />
<arg type="o" name="collection" direction="out" />
<arg type="o" name="prompt" direction="out" />
</method>
<method name="SearchItems">
<arg type="a{ss}" name="attributes" direction="in" />
<arg type="ao" name="unlocked" direction="out" />
<arg type="ao" name="locked" direction="out" />
</method>
<method name="Unlock">
<arg type="ao" name="objects" direction="in" />
<arg type="ao" name="unlocked" direction="out" />
<arg type="o" name="prompt" direction="out" />
</method>
<method name="Lock">
<arg type="ao" name="objects" direction="in" />
<arg type="ao" name="locked" direction="out" />
<arg type="o" name="Prompt" direction="out" />
</method>
<method name="LockService" />
<method name="SearchCollections">
<arg name="fields" type="a{ss}" direction="in" />
<arg name="results" type="ao" direction="out" />
<arg name="locked" type="ao" direction="out" />
<method name="ChangeLock">
<arg type="o" name="collection" direction="in" />
<arg type="o" name="prompt" direction="out" />
</method>
<method name="RetrieveSecrets">
<arg name="items" type='as' direction='in' />
<arg name="secrets" type='ao' direction='out' />
<method name="GetSecrets">
<arg type="ao" name="items" direction="in" />
<arg type="o" name="session" direction="in" />
<arg type="a{o(oayays)}" name="secrets" direction="out" />
</method>
<method name="ReadAlias">
<arg type="s" name="name" direction="in" />
<arg type="o" name="collection" direction="out" />
</method>
<method name="SetAlias">
<arg type="s" name="name" direction="in" />
<arg type="o" name="collection" direction="in" />
</method>
<signal name="CollectionCreated">
<arg name="collection" type="o" />
<arg type="o" name="collection" />
</signal>
<signal name="CollectionDeleted">
<arg name="collection" type="o" />
<arg type="o" name="collection" />
</signal>
<signal name="CollectionChanged">
<arg type="o" name="collection" />
</signal>
<property type="ao" name="Collections" access="read" />
</interface>
<node name="collection" />
</node>
'''
sessions = []
sessions = {}
keycache = []
def OpenSession(self):
print('OpenSession')
pass
def OpenSession(self, algorithm, input):
print('OpenSession', algorithm, input)
session_id = len(self.sessions)
session_path = f'{SESSIONS_NS}{str(session_id)}'
session = dhcrypto.Session()
self.sessions[session_id] = session
if algorithm == "plain":
return (GLib.Variant('s', 'plain'), session_path)
if algorithm == "dh-ietf1024-sha256-aes128-cbc-pkcs7":
pubkey = int.from_bytes(input, 'big')
session.set_client_public_key(pubkey)
return (GLib.Variant('ay', session.get_my_public_key()), session_path)
# if algorithm != "plain":
# # i hate this
# exc = NotSupported("org.freedesktop.DBus.Error.NotSupported")
# type(exc).__name__ = 'org.freedesktop.DBus.Error.NotSupported'
# raise exc
raise "you're not supposed to be here."
return (None, None)
def LockService(self):
pass
def SearchCollections(self, fields):
print('SearchCollections', fields)
return ([], [])
def SearchItems(self, attributes):
print('SearchItems', attributes)
results = hiq.query(**attributes)
print('SearchItems#results', results)
unlocked = []
locked = []
for res in results:
objID = None
if res in self.keycache:
objID = self.keycache.index(res)
else:
objID = len(self.keycache)
self.keycache.append(res)
objPath = f'/org/freedesktop/secrets/items/{str(objID)}'
if res.has_secret():
locked.append(objPath)
else:
unlocked.append(objPath)
print('SearchItems#locked', locked)
print('SearchItems#unlocked', unlocked)
return (locked, unlocked)
def GetSecrets(self, items, session_path):
print('GetSecrets', items, session_path)
ret = {}
for path in items:
key = self.keycache[int(path.replace('/org/freedesktop/secrets/items/', ''))]
session = self.sessions[int(session_path.replace(SESSIONS_NS, ''))]
dec_key = hiq.query_decrypt(**key.to_query())
# uhhhhhh himitsu can return multiple keys here, narrow it down to whichever has a secret
dec_key = next(filter(lambda k: k.has_secret(), dec_key))
secret = dec_key.get_secret().encode('utf-8')
# handle encryption if necessary
iv, enc = session.encrypt(secret)
ret[path] = (session_path, iv, enc, 'text/plain')
print('GetSecrets#ret', ret)
return ret
@property
def Collections(self):
@ -123,6 +212,7 @@ class Service(object):
CollectionChanged = signal()
def entrypoint():
print('Starting kakushi...')
bus.publish(
"org.freedesktop.secrets", Service(),
('collection/default', Collection())