smsapi/api.py

163 lines
4.2 KiB
Python

import binascii
import serial
from dataclasses import dataclass
from typing import List
from messaging.sms import SmsDeliver, SmsSubmit
from time import sleep
class ModemException(Exception):
pass
class InvalidResponseException(ModemException):
def __init__(self, message):
super().__init__('Invalid response from modem: ' + message)
def b(value):
if isinstance(value, str):
return value.encode('utf-8')
return value
def encode_utf16(value):
if isinstance(value, bytes):
value = value.decode('utf-8')
return binascii.hexlify(value.encode('utf-16-be'))
def decode_utf16(value):
return bytes.fromhex(value).decode('utf-16-be').replace('\x1b\x14', '^')
def parse_response(line):
values = line.split(': ')[1].split(',')
# because FUCKING AT HAS COMMAS IN STRINGS
for i in range(len(values)):
value = values[i]
if len(value) > 0 and value[0] == '"' and value[-1] != '"':
values[i] = values[i] + ',' + values[i+1]
values[i+1] = None
values = filter(lambda x: x is not None, values)
def mapper(value):
value = value.strip()
if len(value) == 0:
return None
if value.isnumeric():
return int(value)
if value[0] == '"' and value[-1] == '"':
return value[1:-1]
return list(map(mapper, values))
class Modem:
def __init__(self, port):
self.serial = serial.Serial(port, 460800, timeout=0.1)
self.reset()
def send(self, data):
data = b(data) # encode as bytes (if string)
if not data.endswith(b'\r'): # add \r to the end of message
data += b'\r'
print('req', data)
self.serial.write(data) # write to the modem
res = self.serial.readall() # get modem's response
print('res', res)
res = res[len(data):] # remove echo
res = res.strip() # remove whitespace
return res.decode('utf-8') # decode to string
def reset(self):
self.send('ATE1')
res = self.send('ATZ')
if res != 'OK':
raise InvalidResponseException(res)
self.set('CMGF', 0)
def set(self, property, *params, **kwargs):
msg = b'AT+' + b(property) + b'='
for param in params:
if isinstance(param, str):
msg += b'"' + param.encode('utf-8') + b'"'
if isinstance(param, int):
msg += str(param).encode('utf-8')
if isinstance(param, bytes):
msg += b'"' + param + b'"'
msg += b','
# trim the last comma
msg = msg[:-1]
res = self.send(msg)
# accept '>' because of CMGS and '' because it may be already set
# also accept when response starts with + because it has data
if res not in ('OK', '>', '') and res[0] != '+':
raise InvalidResponseException(res)
return res
def query(self, property):
msg = b'AT+' + b(property) + b'?'
res = self.send(msg)
return parse_response(res.split('\n')[0])
def send_sms(self, number, message):
submit = SmsSubmit(number, message)
csca = self.query('CSCA')[0]
if '+' in csca:
submit.csca = csca
else:
submit.csca = decode_utf16(csca)
for pdu in submit.to_pdu():
self.set('CMGS', int(pdu.length))
res = self.send(pdu.pdu.encode('utf-8') + b'\x1a')
while '+CMGS: ' not in res:
sleep(1)
res = self.serial.readall().decode('utf-8')
def list_sms(self):
res = self.set('CMGL', 4)
lines = res.split('\r\n')
messages = []
print(lines)
for i in range(int(len(lines)/2)):
if lines[2*i] == '':
continue
header = parse_response(lines[2*i])
content = lines[2*i+1]
sms = SMS(content)
sms.mid = header[0]
sms.parts = []
sms.__modem__ = self
messages.append(sms)
for i in range(len(messages)):
message = messages[i]
if message is None:
continue
if message.udh is None or message.udh.concat is None:
continue
for j in range(i+1, len(messages)):
next_message = messages[j]
if next_message.udh is None or next_message.udh.concat is None:
continue
if message.udh.concat.ref == next_message.udh.concat.ref:
message.text += next_message.text
message.parts.append(next_message.mid)
messages[j] = None
messages = list(filter(lambda x: x is not None, messages))
return messages
def remove_sms(self, sms):
self.set('CMGD', sms.mid, 0)
for part in sms.parts:
self.set('CMGD', part, 0)
class SMS(SmsDeliver):
mid: int
parts: List[int]
__modem__: Modem
def remove(self):
self.__modem__.remove_sms(self)