"""Minimal AT-command driver for sending, listing and deleting SMS in PDU mode."""

import binascii
from dataclasses import dataclass
from time import sleep
from typing import List

import serial
from messaging.sms import SmsDeliver, SmsSubmit


# Final result codes that begin with '+' but signal a failure, not data.
_ERROR_PREFIXES = ('+CME ERROR', '+CMS ERROR')


class ModemException(Exception):
    """Base class for every error raised by this module."""


class InvalidResponseException(ModemException):
    """Raised when the modem replies with something the caller cannot use."""

    def __init__(self, message):
        super().__init__('Invalid response from modem: ' + message)


def b(value):
    """Return *value* as bytes, UTF-8 encoding it first when it is a ``str``.

    Any non-``str`` value is returned unchanged.
    """
    if isinstance(value, str):
        return value.encode('utf-8')
    return value


def encode_utf16(value):
    """Return the hex digits (as bytes) of *value* encoded as UTF-16 big-endian.

    *value* may be ``str`` or UTF-8 encoded ``bytes``.
    """
    if isinstance(value, bytes):
        value = value.decode('utf-8')
    return binascii.hexlify(value.encode('utf-16-be'))


def decode_utf16(value):
    """Decode a hex string holding UTF-16 big-endian text.

    The two-character sequence ESC (0x1B) + 0x14 is replaced by ``'^'``
    (presumably the GSM 7-bit extension-table escape for the caret).

    Raises:
        ValueError: if *value* is not valid hexadecimal.
    """
    return bytes.fromhex(value).decode('utf-16-be').replace('\x1b\x14', '^')


def _opens_quote(piece):
    """Tell whether *piece* starts a quoted string that it does not also close."""
    text = piece.strip()
    if not text.startswith('"'):
        return False
    # A lone '"' is an opening quote whose content began with a comma.
    return len(text) == 1 or not text.endswith('"')


def _convert_field(raw):
    """Convert one raw response field to ``int``, ``str`` or ``None``."""
    text = raw.strip()
    if not text:
        return None
    # isdecimal() (unlike isnumeric()) only accepts what int() can parse.
    if text.isdecimal():
        return int(text)
    if len(text) >= 2 and text[0] == '"' and text[-1] == '"':
        return text[1:-1]
    # Unquoted, non-numeric tokens are reported as None.
    return None


def parse_response(line):
    """Parse the parameters of one ``+CMD: a,"b",c`` response line.

    Everything after the first ``': '`` is split on commas; commas that sit
    inside a double-quoted string are kept as part of that string.

    Returns:
        A list with one entry per field: ``int`` for decimal fields, ``str``
        (quotes removed) for quoted fields, ``None`` for empty or otherwise
        unrecognised fields.

    Raises:
        InvalidResponseException: if *line* has no ``': '`` separator or a
            quoted string is never closed.

    Example: ``'+CSCA: "a,b",145'`` -> ``['a,b', 145]``.
    """
    _, separator, payload = line.partition(': ')
    if not separator:
        raise InvalidResponseException(line)

    fields = []
    pending = None  # pieces of a quoted string that was split on its commas
    for piece in payload.split(','):
        if pending is not None:
            pending.append(piece)
            if piece.rstrip().endswith('"'):
                fields.append(','.join(pending))
                pending = None
        elif _opens_quote(piece):
            pending = [piece]
        else:
            fields.append(piece)

    if pending is not None:
        raise InvalidResponseException(line)

    return [_convert_field(field) for field in fields]


class Modem:
    """A modem reachable over a serial port and driven with AT commands."""

    def __init__(self, port):
        """Open *port* at 460800 baud with a 0.1 s read timeout and reset the modem."""
        self.serial = serial.Serial(port, 460800, timeout=0.1)
        self.reset()

    def send(self, data):
        """Write *data* to the modem and return its reply as a string.

        A trailing carriage return is appended when missing.  The first
        ``len(data)`` bytes of the reply are discarded as the command echo
        (``reset`` turns echo on), and surrounding whitespace is stripped.
        Both the request and the raw reply are printed.
        """
        data = b(data)  # encode as bytes (if string)
        if not data.endswith(b'\r'):  # add \r to the end of message
            data += b'\r'
        print('req', data)
        self.serial.write(data)  # write to the modem
        res = self.serial.readall()  # get modem's response
        print('res', res)
        res = res[len(data):]  # remove echo
        res = res.strip()  # remove whitespace
        return res.decode('utf-8')  # decode to string

    def reset(self):
        """Enable echo (``ATE1``), reset the modem (``ATZ``) and set ``CMGF`` to 0.

        Raises:
            InvalidResponseException: if ``ATZ`` is not answered with ``OK``.
        """
        self.send('ATE1')
        res = self.send('ATZ')
        if res != 'OK':
            raise InvalidResponseException(res)
        # CMGF=0 should select PDU mode, which send_sms/list_sms rely on.
        self.set('CMGF', 0)

    def set(self, property, *params, **kwargs):
        """Send ``AT+<property>=<params>`` and return the modem's reply.

        ``str`` and ``bytes`` parameters are wrapped in double quotes, ``int``
        parameters are sent as decimal digits, and any other type yields an
        empty field.  With no parameters the bare ``AT+<property>`` is sent.
        ``**kwargs`` is accepted but unused.

        Raises:
            InvalidResponseException: on a ``+CME ERROR`` / ``+CMS ERROR``
                reply, or on any reply that is not ``OK``, ``>``, empty, or
                data starting with ``+``.
        """
        encoded = []
        for param in params:
            if isinstance(param, str):
                encoded.append(b'"' + param.encode('utf-8') + b'"')
            elif isinstance(param, int):
                encoded.append(str(param).encode('utf-8'))
            elif isinstance(param, bytes):
                encoded.append(b'"' + param + b'"')
            else:
                encoded.append(b'')

        msg = b'AT+' + b(property)
        if encoded:
            msg += b'=' + b','.join(encoded)

        res = self.send(msg)
        # Error result codes start with '+' too, so check them before the
        # "starts with '+' means data" rule below.
        if res.startswith(_ERROR_PREFIXES):
            raise InvalidResponseException(res)
        # accept '>' because of CMGS and '' because it may be already set
        # also accept when response starts with + because it has data
        if res not in ('OK', '>', '') and not res.startswith('+'):
            raise InvalidResponseException(res)
        return res

    def query(self, property):
        """Send ``AT+<property>?`` and return the parsed first line of the reply.

        Raises:
            InvalidResponseException: if the modem reports an error or the
                first line cannot be parsed.
        """
        msg = b'AT+' + b(property) + b'?'
        res = self.send(msg)
        if res.startswith(_ERROR_PREFIXES):
            raise InvalidResponseException(res)
        return parse_response(res.split('\n')[0])

    def send_sms(self, number, message):
        """Send *message* to *number*, one ``CMGS`` command per PDU.

        The service-centre address is read from ``CSCA``; when it contains no
        ``'+'`` it is decoded with ``decode_utf16`` first.  After each PDU the
        method polls the serial port once per second until ``+CMGS: `` shows
        up in the reply.

        Raises:
            InvalidResponseException: if ``CSCA`` yields no string value or the
                modem answers a PDU with an error.
        """
        submit = SmsSubmit(number, message)
        csca = self.query('CSCA')[0]
        if not isinstance(csca, str):
            raise InvalidResponseException(repr(csca))
        submit.csca = csca if '+' in csca else decode_utf16(csca)

        for pdu in submit.to_pdu():
            self.set('CMGS', int(pdu.length))
            # 0x1A (Ctrl-Z) terminates the PDU.
            res = self.send(pdu.pdu.encode('utf-8') + b'\x1a')
            while '+CMGS: ' not in res:
                # Stop instead of spinning forever when the modem refuses.
                if 'ERROR' in res:
                    raise InvalidResponseException(res)
                sleep(1)
                # Accumulate so a marker split across two reads is still found.
                res += self.serial.readall().decode('utf-8')

    def list_sms(self):
        """Return the messages reported by ``CMGL`` with parameter 4.

        Each ``+CMGL:`` header line is paired with the line that follows it,
        which is handed to ``SMS`` as the PDU.  Messages sharing a UDH concat
        reference are merged into the first one seen: its ``text`` gets the
        later texts appended in listing order and its ``parts`` collects their
        storage indexes.
        """
        res = self.set('CMGL', 4)
        lines = [line.strip() for line in res.splitlines()]
        messages = []
        print(lines)

        rows = iter(lines)
        for line in rows:
            # Skip blank lines and the final 'OK'; only headers start a pair.
            if not line.startswith('+CMGL:'):
                continue
            content = next(rows, None)
            if content is None:
                break
            header = parse_response(line)
            sms = SMS(content)
            sms.mid = header[0]
            sms.parts = []
            sms.__modem__ = self
            messages.append(sms)

        merged = []
        heads = {}  # concat reference -> first message seen with it
        for sms in messages:
            concat = sms.udh.concat if sms.udh is not None else None
            if concat is None:
                merged.append(sms)
                continue
            head = heads.get(concat.ref)
            if head is None:
                heads[concat.ref] = sms
                merged.append(sms)
            else:
                # NOTE(review): parts are joined in listing order, not by
                # their sequence number; confirm the modem lists them in order.
                head.text += sms.text
                head.parts.append(sms.mid)
        return merged

    def remove_sms(self, sms):
        """Delete *sms* and every part merged into it via ``CMGD``."""
        self.set('CMGD', sms.mid, 0)
        for part in sms.parts:
            self.set('CMGD', part, 0)


class SMS(SmsDeliver):
    """A received message, tagged with where it is stored on the modem."""

    # Index given in the message's +CMGL header.
    mid: int
    # Indexes of the further parts merged into this message.
    parts: List[int]
    # The Modem that listed this message; used by remove().
    __modem__: Modem

    def remove(self):
        """Delete this message (and its merged parts) from the modem."""
        self.__modem__.remove_sms(self)