commit 2780e3cc5d15827eddb95912e02ed6180ee449a1 Author: ptrcnull Date: Thu Apr 1 19:30:52 2021 +0200 feat: Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bee8a64 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__ diff --git a/api.py b/api.py new file mode 100644 index 0000000..55f9bff --- /dev/null +++ b/api.py @@ -0,0 +1,162 @@ +import binascii +import serial +from dataclasses import dataclass +from typing import List +from messaging.sms import SmsDeliver, SmsSubmit +from time import sleep + + +class ModemException(Exception): + pass + +class InvalidResponseException(ModemException): + def __init__(self, message): + super().__init__('Invalid response from modem: ' + message) + + +def b(value): + if isinstance(value, str): + return value.encode('utf-8') + return value + +def encode_utf16(value): + if isinstance(value, bytes): + value = value.decode('utf-8') + return binascii.hexlify(value.encode('utf-16-be')) + +def decode_utf16(value): + return bytes.fromhex(value).decode('utf-16-be').replace('\x1b\x14', '^') + +def parse_response(line): + values = line.split(': ')[1].split(',') + # because FUCKING AT HAS COMMAS IN STRINGS + for i in range(len(values)): + value = values[i] + if len(value) > 0 and value[0] == '"' and value[-1] != '"': + values[i] = values[i] + ',' + values[i+1] + values[i+1] = None + values = filter(lambda x: x is not None, values) + + def mapper(value): + value = value.strip() + if len(value) == 0: + return None + if value.isnumeric(): + return int(value) + if value[0] == '"' and value[-1] == '"': + return value[1:-1] + return list(map(mapper, values)) + + +class Modem: + def __init__(self, port): + self.serial = serial.Serial(port, 460800, timeout=0.1) + self.reset() + + def send(self, data): + data = b(data) # encode as bytes (if string) + if not data.endswith(b'\r'): # add \r to the end of message + data += b'\r' + print('req', data) + self.serial.write(data) # write to the modem + res = self.serial.readall() # get modem's response + print('res', res) + res = res[len(data):] # remove echo + res = res.strip() # remove whitespace + return res.decode('utf-8') # decode to string + + def reset(self): + res = self.send('ATZ') + if res != 'OK': + raise InvalidResponseException(res) + self.set('CMGF', 0) + + def set(self, property, *params, **kwargs): + msg = b'AT+' + b(property) + b'=' + for param in params: + if isinstance(param, str): + msg += b'"' + param.encode('utf-8') + b'"' + if isinstance(param, int): + msg += str(param).encode('utf-8') + if isinstance(param, bytes): + msg += b'"' + param + b'"' + msg += b',' + # trim the last comma + msg = msg[:-1] + res = self.send(msg) + # accept '>' because of CMGS and '' because it may be already set + # also accept when response starts with + because it has data + if res not in ('OK', '>', '') and res[0] != '+': + raise InvalidResponseException(res) + return res + + def query(self, property): + msg = b'AT+' + b(property) + b'?' + res = self.send(msg) + return parse_response(res.split('\n')[0]) + + def send_sms(self, number, message): + submit = SmsSubmit(number, message) + csca = self.query('CSCA')[0] + if '+' in csca: + submit.csca = csca + else: + submit.csca = decode_utf16(csca) + for pdu in submit.to_pdu(): + self.set('CMGS', int(pdu.length)) + res = self.send(pdu.pdu.encode('utf-8') + b'\x1a') + while '+CMGS: ' not in res: + sleep(1) + res = self.serial.readall().decode('utf-8') + + def list_sms(self): + res = self.set('CMGL', 4) + lines = res.split('\r\n') + messages = [] + print(lines) + + for i in range(int(len(lines)/2)): + if lines[2*i] == '': + continue + header = parse_response(lines[2*i]) + content = lines[2*i+1] + + sms = SMS(content) + sms.mid = header[0] + sms.parts = [] + sms.__modem__ = self + + messages.append(sms) + + for i in range(len(messages)): + message = messages[i] + if message is None: + continue + if message.udh is None or message.udh.concat is None: + continue + for j in range(i+1, len(messages)): + next_message = messages[j] + if next_message.udh is None or next_message.udh.concat is None: + continue + if message.udh.concat.ref == next_message.udh.concat.ref: + message.text += next_message.text + message.parts.append(next_message.mid) + messages[j] = None + + messages = list(filter(lambda x: x is not None, messages)) + + return messages + + def remove_sms(self, sms): + self.set('CMGD', sms.mid, 0) + for part in sms.parts: + self.set('CMGD', part, 0) + + +class SMS(SmsDeliver): + mid: int + parts: List[int] + __modem__: Modem + + def remove(self): + __modem__.remove_sms(self) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..a68a8f2 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +python-messaging==0.5.13 +pyserial diff --git a/shell.nix b/shell.nix new file mode 100644 index 0000000..e7d63da --- /dev/null +++ b/shell.nix @@ -0,0 +1,20 @@ +{ pkgs ? import {} }: + +pkgs.stdenv.mkDerivation { + name = "env"; + buildInputs = [ + pkgs.usb_modeswitch + pkgs.python38 + pkgs.python38Packages.pyserial + (pkgs.python38Packages.buildPythonPackage rec { + pname = "python-messaging"; + version = "0.5.13"; + src = pkgs.python38Packages.fetchPypi { + inherit pname version; + sha256 = "183rxrmf8rrm5hw20b4kvzaq18254pmawjbai86nw557gm2x33y0"; + }; + doCheck = false; + }) + ]; +} +