162 lines
4.2 KiB
Python
162 lines
4.2 KiB
Python
import binascii
|
|
import serial
|
|
from dataclasses import dataclass
|
|
from typing import List
|
|
from messaging.sms import SmsDeliver, SmsSubmit
|
|
from time import sleep
|
|
|
|
|
|
class ModemException(Exception):
    """Base class for every error raised by this modem module."""
|
|
|
|
class InvalidResponseException(ModemException):
    """Raised when the modem answers with something other than what was expected."""

    def __init__(self, message):
        """Build the error text from the modem's reply.

        Args:
            message: the reply text; it is appended to a fixed prefix.
        """
        text = 'Invalid response from modem: ' + message
        super().__init__(text)
|
|
|
|
|
|
def b(value):
    """Return *value* as bytes.

    A ``str`` is encoded as UTF-8; any other object is handed back unchanged.
    """
    if not isinstance(value, str):
        return value
    return value.encode('utf-8')
|
|
|
|
def encode_utf16(value):
    """Hex-encode *value* as big-endian UTF-16.

    Args:
        value: text as ``str``, or as UTF-8 encoded ``bytes``.

    Returns:
        The hexadecimal representation of the UTF-16-BE bytes, as ``bytes``.
    """
    text = value.decode('utf-8') if isinstance(value, bytes) else value
    utf16 = text.encode('utf-16-be')
    return binascii.hexlify(utf16)
|
|
|
|
def decode_utf16(value):
    """Decode a hex string of big-endian UTF-16 data into text.

    Args:
        value: hexadecimal digits as accepted by ``bytes.fromhex``.

    Returns:
        The decoded text, with every ``'\\x1b\\x14'`` pair replaced by ``'^'``.
    """
    raw = bytes.fromhex(value)
    text = raw.decode('utf-16-be')
    # NOTE(review): ESC + 0x14 is presumably the GSM 03.38 escape sequence
    # for the caret -- confirm against the modems this is used with.
    return text.replace('\x1b\x14', '^')
|
|
|
|
def parse_response(line):
    """Parse the parameter list of an AT response line.

    Everything after the first ``': '`` in *line* (for example
    ``+CSCA: "+48123456789",145``) is split on commas that are not inside
    double quotes, and every field is converted as follows:

    * empty field                      -> ``None``
    * decimal digits only              -> ``int``
    * enclosed in double quotes        -> ``str`` with the quotes removed
    * anything else                    -> ``None``

    Args:
        line: a single response line containing a ``': '`` separator.

    Returns:
        A list with one converted value per field.

    Raises:
        IndexError: if *line* contains no ``': '`` separator.
    """
    # maxsplit=1 so that a ': ' occurring inside a quoted value is kept.
    payload = line.split(': ', 1)[1]

    # Quoted AT string parameters may themselves contain commas, so a plain
    # split(',') is not enough: track whether we are currently inside quotes.
    fields = []
    current = []
    in_quotes = False
    for char in payload:
        if char == '"':
            in_quotes = not in_quotes
        if char == ',' and not in_quotes:
            fields.append(''.join(current))
            current = []
        else:
            current.append(char)
    fields.append(''.join(current))

    def convert(field):
        """Convert one raw field to ``int``, ``str`` or ``None``."""
        field = field.strip()
        if not field:
            return None
        # isdecimal() only accepts what int() can parse; isnumeric() also
        # accepts characters such as superscripts, which make int() raise.
        if field.isdecimal():
            return int(field)
        if field[0] == '"' and field[-1] == '"':
            return field[1:-1]
        return None

    return [convert(field) for field in fields]
|
|
|
|
|
|
class Modem:
    """AT-command interface to a GSM modem attached to a serial port.

    The modem is reset and switched to PDU mode (``AT+CMGF=0``) on
    construction. Instances can be used as context managers so that the
    serial port is closed when the block exits.
    """

    def __init__(self, port):
        """Open *port* at 460800 baud and reset the modem.

        Args:
            port: serial device name passed to ``serial.Serial``.

        Raises:
            InvalidResponseException: if the modem does not acknowledge the
                reset sequence.
        """
        self.serial = serial.Serial(port, 460800, timeout=0.1)
        try:
            self.reset()
        except BaseException:
            # Do not leak the open port when initialisation fails.
            self.serial.close()
            raise

    def __enter__(self):
        """Return the modem itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the serial port; exceptions are not suppressed."""
        self.close()

    def close(self):
        """Close the underlying serial port."""
        self.serial.close()

    def send(self, data):
        """Write one command to the modem and return its reply as text.

        Args:
            data: command as ``str`` or ``bytes``; a trailing ``\\r`` is added
                when missing.

        Returns:
            The reply decoded as UTF-8, with the command echo (if the modem
            sent one) and surrounding whitespace removed.
        """
        payload = b(data)  # encode as bytes (if string)
        if not payload.endswith(b'\r'):
            payload += b'\r'
        print('req', payload)
        self.serial.write(payload)
        res = self.serial.readall()  # reads until the port timeout expires
        print('res', res)
        # Strip the echo only when it is really there: with echo disabled
        # a blind slice would cut into the actual reply.
        if res.startswith(payload):
            res = res[len(payload):]
        return res.strip().decode('utf-8')

    def reset(self):
        """Reset the modem with ``ATZ`` and select PDU mode.

        Raises:
            InvalidResponseException: if the reply to ``ATZ`` is not ``OK``.
        """
        res = self.send('ATZ')
        if res != 'OK':
            raise InvalidResponseException(res)
        # CMGF=0 selects PDU mode; send_sms/list_sms work with raw PDUs.
        self.set('CMGF', 0)

    def set(self, property, *params, **kwargs):
        """Send ``AT+<property>=<params>`` and return the reply.

        Args:
            property: command name without the ``AT+`` prefix.
            *params: ``str`` and ``bytes`` values are sent in double quotes,
                ``int`` values as decimal digits; a value of any other type
                produces an empty field. With no params the bare
                ``AT+<property>`` is sent.
            **kwargs: accepted for compatibility; currently unused.

        Returns:
            The reply text as returned by ``send``.

        Raises:
            InvalidResponseException: if the reply is an error report or is
                not one of ``OK``, ``>``, empty, or a ``+...`` data reply.
        """
        encoded = []
        for param in params:
            if isinstance(param, bytes):
                encoded.append(b'"' + param + b'"')
            elif isinstance(param, str):
                encoded.append(b'"' + param.encode('utf-8') + b'"')
            elif isinstance(param, int):
                encoded.append(str(param).encode('utf-8'))
            else:
                encoded.append(b'')

        msg = b'AT+' + b(property)
        if encoded:
            msg += b'=' + b','.join(encoded)
        res = self.send(msg)

        # '+CME ERROR' / '+CMS ERROR' start with '+' as well, but they are
        # failures, not data replies.
        if res.startswith(('+CME ERROR', '+CMS ERROR')):
            raise InvalidResponseException(res)
        # accept '>' because of CMGS and '' because it may be already set;
        # also accept a reply starting with '+' because it carries data
        if res not in ('OK', '>', '') and not res.startswith('+'):
            raise InvalidResponseException(res)
        return res

    def query(self, property):
        """Send ``AT+<property>?`` and return the parsed first reply line.

        Args:
            property: command name without the ``AT+`` prefix.

        Returns:
            The list produced by ``parse_response`` for the first line.

        Raises:
            InvalidResponseException: if the first reply line carries no
                ``': '`` separated data (for example a plain ``ERROR``).
        """
        res = self.send(b'AT+' + b(property) + b'?')
        first_line = res.split('\n')[0]
        if ': ' not in first_line:
            raise InvalidResponseException(res)
        return parse_response(first_line)

    def send_sms(self, number, message, timeout=None):
        """Send *message* to *number*, one PDU at a time.

        Args:
            number: destination number passed to ``SmsSubmit``.
            message: message text passed to ``SmsSubmit``.
            timeout: optional number of seconds to wait for each PDU to be
                confirmed; ``None`` (the default) waits indefinitely.

        Raises:
            InvalidResponseException: if the service centre address cannot be
                read or the modem reports an error while sending.
            ModemException: if *timeout* expires before a PDU is confirmed.
        """
        submit = SmsSubmit(number, message)
        csca = self.query('CSCA')[0]
        if not isinstance(csca, str):
            raise InvalidResponseException('unexpected CSCA value: ' + repr(csca))
        # The address is used as-is when it contains '+'; otherwise it is
        # treated as hex-encoded UTF-16 and decoded first.
        submit.csca = csca if '+' in csca else decode_utf16(csca)

        for pdu in submit.to_pdu():
            self.set('CMGS', int(pdu.length))
            # 0x1A (Ctrl-Z) terminates the PDU input.
            res = self.send(pdu.pdu.encode('utf-8') + b'\x1a')
            waited = 0
            while '+CMGS: ' not in res:
                # Fail instead of polling forever when the modem rejects
                # the message.
                if 'ERROR' in res:
                    raise InvalidResponseException(res)
                if timeout is not None and waited >= timeout:
                    raise ModemException('Timed out waiting for +CMGS confirmation')
                sleep(1)
                waited += 1
                # Accumulate: the confirmation may arrive split over reads.
                res += self.serial.readall().decode('utf-8')

    def list_sms(self):
        """Return the messages stored on the modem (``AT+CMGL=4``).

        Parts of a concatenated message (same ``udh.concat.ref``) are merged
        into the first part found: their text is appended in listing order
        and their storage indexes are collected in ``parts``.

        Returns:
            A list of ``SMS`` objects with ``mid``, ``parts`` and
            ``__modem__`` filled in.
        """
        res = self.set('CMGL', 4)
        lines = res.split('\r\n')
        print(lines)

        # Each '+CMGL: ...' header is followed by a line holding the PDU.
        # Keying on the header prefix keeps stray blank lines or the final
        # 'OK' from shifting the pairing.
        messages = []
        header = None
        for line in lines:
            if line.startswith('+CMGL: '):
                header = parse_response(line)
            elif header is not None and line:
                sms = SMS(line)
                sms.mid = header[0]
                sms.parts = []
                sms.__modem__ = self
                messages.append(sms)
                header = None

        # NOTE(review): parts are joined in listing order, not by their
        # sequence number -- confirm the modem lists them in order.
        for i, message in enumerate(messages):
            if message is None:
                continue
            if message.udh is None or message.udh.concat is None:
                continue
            for j in range(i + 1, len(messages)):
                other = messages[j]
                # Entries already merged into an earlier message are None.
                if other is None:
                    continue
                if other.udh is None or other.udh.concat is None:
                    continue
                if message.udh.concat.ref == other.udh.concat.ref:
                    message.text += other.text
                    message.parts.append(other.mid)
                    messages[j] = None

        return [message for message in messages if message is not None]

    def remove_sms(self, sms):
        """Delete *sms* and all of its merged parts from the modem.

        Args:
            sms: an object with ``mid`` and ``parts`` attributes, as returned
                by ``list_sms``.
        """
        for index in (sms.mid, *sms.parts):
            self.set('CMGD', index, 0)
|
|
|
|
|
|
class SMS(SmsDeliver):
    """A received SMS together with its storage location on the modem.

    ``Modem.list_sms`` creates the instances and assigns the attributes
    declared below.
    """

    # Storage index of the message (first field of its +CMGL header).
    mid: int
    # Storage indexes of further parts that were merged into this message.
    parts: List[int]
    # The Modem that listed this message; remove() delegates to it.
    __modem__: Modem

    def remove(self):
        """Delete this message and all of its merged parts from the modem."""
        # __modem__ is an instance attribute, so it must be reached via self.
        self.__modem__.remove_sms(self)
|