-
Notifications
You must be signed in to change notification settings - Fork 88
Commit
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,7 +7,7 @@ | |
import threading | ||
import argparse | ||
|
||
|
||
# TODO: cleanup getkey function | ||
if sys.platform == "win32": | ||
def getkey(): | ||
import msvcrt | ||
|
@@ -30,27 +30,22 @@ def getkey(): | |
return c | ||
|
||
|
||
def character(b): | ||
return b.decode('latin1') | ||
|
||
sfl_prompt_req = b"F7: boot from serial\n" | ||
sfl_prompt_ack = b"\x06" | ||
|
||
sfl_prompt_req = "F7: boot from serial\n" | ||
sfl_prompt_ack = "\x06" | ||
|
||
sfl_magic_req = "sL5DdSMmkekro\n" | ||
sfl_magic_ack = "z6IHG7cYDID6o\n" | ||
sfl_magic_req = b"sL5DdSMmkekro\n" | ||
sfl_magic_ack = b"z6IHG7cYDID6o\n" | ||
|
||
# General commands | ||
sfl_cmd_abort = 0x00 | ||
sfl_cmd_load = 0x01 | ||
sfl_cmd_jump = 0x02 | ||
|
||
sfl_cmd_abort = b"\x00" | ||
sfl_cmd_load = b"\x01" | ||
sfl_cmd_jump = b"\x02" | ||
|
||
# Replies | ||
sfl_ack_success = 'K' | ||
sfl_ack_crcerror = 'C' | ||
sfl_ack_unknown = 'U' | ||
sfl_ack_error = 'E' | ||
sfl_ack_success = b"K" | ||
sfl_ack_crcerror = b"C" | ||
sfl_ack_unknown = b"U" | ||
sfl_ack_error = b"E" | ||
|
||
|
||
crc16_table = [ | ||
|
@@ -98,40 +93,18 @@ def crc16(l): | |
|
||
class SFLFrame: | ||
def __init__(self): | ||
self.length = None | ||
self.cmd = None | ||
self.payload = [] | ||
self.crc = None | ||
self.raw = [] | ||
self.cmd = bytes() | ||
self.payload = bytes() | ||
|
||
def compute_crc(self): | ||
crc_data = [] | ||
crc_data.append(self.cmd) | ||
for d in self.payload: | ||
crc_data.append(d) | ||
self.crc = crc16(crc_data) | ||
return self.crc | ||
return crc16(self.cmd + self.payload) | ||
|
||
def encode(self): | ||
self.raw = [] | ||
self.raw.append(self.length) | ||
self.compute_crc() | ||
for d in self.crc.to_bytes(2, "big"): | ||
self.raw.append(d) | ||
self.raw.append(self.cmd) | ||
for d in self.payload: | ||
self.raw.append(d) | ||
|
||
|
||
def get_file_data(filename): | ||
with open(filename, "rb") as f: | ||
data = [] | ||
while True: | ||
w = f.read(1) | ||
if not w: | ||
break | ||
data.append(int.from_bytes(w, "big")) | ||
return data | ||
packet = bytes([len(self.payload)]) | ||
packet += self.compute_crc().to_bytes(2, "big") | ||
packet += self.cmd | ||
packet += self.payload | ||
return packet | ||
|
||
|
||
class Flterm: | ||
|
@@ -143,39 +116,26 @@ def __init__(self, serial_boot, kernel_image, kernel_address): | |
self.reader_alive = False | ||
self.writer_alive = False | ||
|
||
self.detect_prompt_str = " "*len(sfl_prompt_req) | ||
self.detect_magic_str = " "*len(sfl_magic_req) | ||
|
||
def open(self, port, speed): | ||
self.serial = serial.serial_for_url( | ||
port, | ||
baudrate=speed, | ||
bytesize=8, | ||
parity="N", | ||
stopbits=1, | ||
xonxoff=0, | ||
timeout=0.25) | ||
self.serial.flushOutput() | ||
self.serial.flushInput() | ||
self.serial.close() # in case port was not correctly closed | ||
self.serial.open() | ||
self.promp_detect_buffer = bytes(len(sfl_prompt_req)) | ||
This comment has been minimized.
Sorry, something went wrong. |
||
self.magic_detect_buffer = bytes(len(sfl_magic_req)) | ||
|
||
def close(self): | ||
self.serial.close() | ||
def open(self, port, baudrate): | ||
This comment has been minimized.
Sorry, something went wrong.
sbourdeauducq
Member
|
||
if hasattr(self, "port"): | ||
return | ||
self.port = serial.serial_for_url(port, baudrate) | ||
|
||
def write_exact(self, data): | ||
if isinstance(data, str): | ||
self.serial.write(bytes(data, "utf-8")) | ||
else: | ||
self.serial.write(serial.to_bytes(data)) | ||
def close(self): | ||
if not hasattr(self, "port"): | ||
return | ||
self.port.close() | ||
del self.port | ||
|
||
def send_frame(self, frame): | ||
frame.encode() | ||
retry = 1 | ||
while retry: | ||
self.write_exact(frame.raw) | ||
self.port.write(frame.encode()) | ||
# Get the reply from the device | ||
reply = character(self.serial.read()) | ||
reply = self.port.read() | ||
if reply == sfl_ack_success: | ||
retry = 0 | ||
elif reply == sfl_ack_crcerror: | ||
|
@@ -186,22 +146,20 @@ def send_frame(self, frame): | |
return 1 | ||
|
||
def upload(self, filename, address): | ||
data = get_file_data(filename) | ||
with open(filename, "rb") as f: | ||
data = f.read() | ||
print("[FLTERM] Uploading {} ({} bytes)...".format(filename, len(data))) | ||
current_address = address | ||
position = 0 | ||
length = len(data) | ||
start = time.time() | ||
while len(data) != 0: | ||
while len(data): | ||
print("{}%\r".format(100*position//length), end="") | ||
frame = SFLFrame() | ||
frame_data = data[:251] | ||
frame.length = len(frame_data) + 4 | ||
frame.cmd = sfl_cmd_load | ||
for d in current_address.to_bytes(4, "big"): | ||
frame.payload.append(d) | ||
for d in frame_data: | ||
frame.payload.append(d) | ||
frame.payload = current_address.to_bytes(4, "big") | ||
frame.payload += frame_data | ||
if self.send_frame(frame) == 0: | ||
return | ||
current_address += len(frame_data) | ||
|
@@ -218,46 +176,48 @@ def upload(self, filename, address): | |
def boot(self): | ||
print("[FLTERM] Booting the device.") | ||
frame = SFLFrame() | ||
frame.length = 4 | ||
frame.cmd = sfl_cmd_jump | ||
for d in self.kernel_address.to_bytes(4, "big"): | ||
frame.payload.append(d) | ||
frame.payload = self.kernel_address.to_bytes(4, "big") | ||
self.send_frame(frame) | ||
|
||
def detect_prompt(self, data): | ||
if data is not "": | ||
self.detect_prompt_str = self.detect_prompt_str[1:] + data | ||
return self.detect_prompt_str == sfl_prompt_req | ||
if len(data): | ||
self.promp_detect_buffer = self.promp_detect_buffer[1:] + data | ||
return self.promp_detect_buffer == sfl_prompt_req | ||
else: | ||
return False | ||
|
||
def answer_prompt(self): | ||
print("[FLTERM] Received serial boot prompt from the device.") | ||
self.write_exact(sfl_prompt_ack) | ||
self.port.write(sfl_prompt_ack) | ||
|
||
def detect_magic(self, data): | ||
if data is not "": | ||
self.detect_magic_str = self.detect_magic_str[1:] + data | ||
return self.detect_magic_str == sfl_magic_req | ||
if len(data): | ||
self.magic_detect_buffer = self.magic_detect_buffer[1:] + data | ||
return self.magic_detect_buffer == sfl_magic_req | ||
else: | ||
return False | ||
|
||
def answer_magic(self): | ||
print("[FLTERM] Received firmware download request from the device.") | ||
if os.path.exists(self.kernel_image): | ||
self.write_exact(sfl_magic_ack) | ||
self.port.write(sfl_magic_ack) | ||
self.upload(self.kernel_image, self.kernel_address) | ||
self.boot() | ||
print("[FLTERM] Done."); | ||
|
||
def reader(self): | ||
try: | ||
while self.reader_alive: | ||
c = character(self.serial.read()) | ||
if c == '\r': | ||
sys.stdout.write('\n') | ||
c = self.port.read() | ||
if c == b"\r": | ||
sys.stdout.write(b"\n") | ||
else: | ||
sys.stdout.write(c) | ||
try: | ||
# TODO: cleanup | ||
This comment has been minimized.
Sorry, something went wrong.
sbourdeauducq
Member
|
||
sys.stdout.write(c.decode()) | ||
except: | ||
pass | ||
sys.stdout.flush() | ||
|
||
if self.kernel_image is not None: | ||
|
@@ -286,14 +246,13 @@ def writer(self): | |
try: | ||
b = getkey() | ||
except KeyboardInterrupt: | ||
b = serial.to_bytes([3]) | ||
c = character(b) | ||
if c == chr(0x03): | ||
b = b"\x03" | ||
if b == b"\x03": | ||
This comment has been minimized.
Sorry, something went wrong.
sbourdeauducq
Member
|
||
self.stop() | ||
elif c == '\n': | ||
self.serial.write(serial.to_bytes([10])) | ||
elif b == b"\n": | ||
self.port.write(b"\x0a") | ||
else: | ||
self.serial.write(b) | ||
self.port.write(b) | ||
except: | ||
self.writer_alive = False | ||
raise | ||
|
@@ -343,6 +302,7 @@ def main(): | |
flterm.join(True) | ||
except KeyboardInterrupt: | ||
pass | ||
flterm.close() | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong. |
||
|
||
|
||
if __name__ == "__main__": | ||
|
prompt