Skip to content

Commit

Permalink
tools/flterm.py: first cleanup pass
Browse files Browse the repository at this point in the history
enjoy-digital committed Feb 19, 2016
1 parent 3d92e3f commit 0e44624
Showing 1 changed file with 61 additions and 101 deletions.
162 changes: 61 additions & 101 deletions misoc/tools/flterm.py
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@
import threading
import argparse


# TODO: cleanup getkey function
if sys.platform == "win32":
def getkey():
import msvcrt
@@ -30,27 +30,22 @@ def getkey():
return c


def character(b):
return b.decode('latin1')

sfl_prompt_req = b"F7: boot from serial\n"
sfl_prompt_ack = b"\x06"

sfl_prompt_req = "F7: boot from serial\n"
sfl_prompt_ack = "\x06"

sfl_magic_req = "sL5DdSMmkekro\n"
sfl_magic_ack = "z6IHG7cYDID6o\n"
sfl_magic_req = b"sL5DdSMmkekro\n"
sfl_magic_ack = b"z6IHG7cYDID6o\n"

# General commands
sfl_cmd_abort = 0x00
sfl_cmd_load = 0x01
sfl_cmd_jump = 0x02

sfl_cmd_abort = b"\x00"
sfl_cmd_load = b"\x01"
sfl_cmd_jump = b"\x02"

# Replies
sfl_ack_success = 'K'
sfl_ack_crcerror = 'C'
sfl_ack_unknown = 'U'
sfl_ack_error = 'E'
sfl_ack_success = b"K"
sfl_ack_crcerror = b"C"
sfl_ack_unknown = b"U"
sfl_ack_error = b"E"


crc16_table = [
@@ -98,40 +93,18 @@ def crc16(l):

class SFLFrame:
def __init__(self):
self.length = None
self.cmd = None
self.payload = []
self.crc = None
self.raw = []
self.cmd = bytes()
self.payload = bytes()

def compute_crc(self):
crc_data = []
crc_data.append(self.cmd)
for d in self.payload:
crc_data.append(d)
self.crc = crc16(crc_data)
return self.crc
return crc16(self.cmd + self.payload)

def encode(self):
self.raw = []
self.raw.append(self.length)
self.compute_crc()
for d in self.crc.to_bytes(2, "big"):
self.raw.append(d)
self.raw.append(self.cmd)
for d in self.payload:
self.raw.append(d)


def get_file_data(filename):
with open(filename, "rb") as f:
data = []
while True:
w = f.read(1)
if not w:
break
data.append(int.from_bytes(w, "big"))
return data
packet = bytes([len(self.payload)])
packet += self.compute_crc().to_bytes(2, "big")
packet += self.cmd
packet += self.payload
return packet


class Flterm:
@@ -143,39 +116,26 @@ def __init__(self, serial_boot, kernel_image, kernel_address):
self.reader_alive = False
self.writer_alive = False

self.detect_prompt_str = " "*len(sfl_prompt_req)
self.detect_magic_str = " "*len(sfl_magic_req)

def open(self, port, speed):
self.serial = serial.serial_for_url(
port,
baudrate=speed,
bytesize=8,
parity="N",
stopbits=1,
xonxoff=0,
timeout=0.25)
self.serial.flushOutput()
self.serial.flushInput()
self.serial.close() # in case port was not correctly closed
self.serial.open()
self.promp_detect_buffer = bytes(len(sfl_prompt_req))

This comment has been minimized.

Copy link
@sbourdeauducq

sbourdeauducq Feb 19, 2016

Member

prompt

self.magic_detect_buffer = bytes(len(sfl_magic_req))

def close(self):
self.serial.close()
def open(self, port, baudrate):

This comment has been minimized.

Copy link
@sbourdeauducq

sbourdeauducq Feb 19, 2016

Member

Small detail - you can probably merge this into __init__, and then the hasattr(self, "port") in this code and in close becomes unnecessary.

if hasattr(self, "port"):
return
self.port = serial.serial_for_url(port, baudrate)

def write_exact(self, data):
if isinstance(data, str):
self.serial.write(bytes(data, "utf-8"))
else:
self.serial.write(serial.to_bytes(data))
def close(self):
if not hasattr(self, "port"):
return
self.port.close()
del self.port

def send_frame(self, frame):
frame.encode()
retry = 1
while retry:
self.write_exact(frame.raw)
self.port.write(frame.encode())
# Get the reply from the device
reply = character(self.serial.read())
reply = self.port.read()
if reply == sfl_ack_success:
retry = 0
elif reply == sfl_ack_crcerror:
@@ -186,22 +146,20 @@ def send_frame(self, frame):
return 1

def upload(self, filename, address):
data = get_file_data(filename)
with open(filename, "rb") as f:
data = f.read()
print("[FLTERM] Uploading {} ({} bytes)...".format(filename, len(data)))
current_address = address
position = 0
length = len(data)
start = time.time()
while len(data) != 0:
while len(data):
print("{}%\r".format(100*position//length), end="")
frame = SFLFrame()
frame_data = data[:251]
frame.length = len(frame_data) + 4
frame.cmd = sfl_cmd_load
for d in current_address.to_bytes(4, "big"):
frame.payload.append(d)
for d in frame_data:
frame.payload.append(d)
frame.payload = current_address.to_bytes(4, "big")
frame.payload += frame_data
if self.send_frame(frame) == 0:
return
current_address += len(frame_data)
@@ -218,46 +176,48 @@ def upload(self, filename, address):
def boot(self):
print("[FLTERM] Booting the device.")
frame = SFLFrame()
frame.length = 4
frame.cmd = sfl_cmd_jump
for d in self.kernel_address.to_bytes(4, "big"):
frame.payload.append(d)
frame.payload = self.kernel_address.to_bytes(4, "big")
self.send_frame(frame)

def detect_prompt(self, data):
if data is not "":
self.detect_prompt_str = self.detect_prompt_str[1:] + data
return self.detect_prompt_str == sfl_prompt_req
if len(data):
self.promp_detect_buffer = self.promp_detect_buffer[1:] + data
return self.promp_detect_buffer == sfl_prompt_req
else:
return False

def answer_prompt(self):
print("[FLTERM] Received serial boot prompt from the device.")
self.write_exact(sfl_prompt_ack)
self.port.write(sfl_prompt_ack)

def detect_magic(self, data):
if data is not "":
self.detect_magic_str = self.detect_magic_str[1:] + data
return self.detect_magic_str == sfl_magic_req
if len(data):
self.magic_detect_buffer = self.magic_detect_buffer[1:] + data
return self.magic_detect_buffer == sfl_magic_req
else:
return False

def answer_magic(self):
print("[FLTERM] Received firmware download request from the device.")
if os.path.exists(self.kernel_image):
self.write_exact(sfl_magic_ack)
self.port.write(sfl_magic_ack)
self.upload(self.kernel_image, self.kernel_address)
self.boot()
print("[FLTERM] Done.");

def reader(self):
try:
while self.reader_alive:
c = character(self.serial.read())
if c == '\r':
sys.stdout.write('\n')
c = self.port.read()
if c == b"\r":
sys.stdout.write(b"\n")
else:
sys.stdout.write(c)
try:
# TODO: cleanup

This comment has been minimized.

Copy link
@sbourdeauducq

This comment has been minimized.

Copy link
@enjoy-digital

enjoy-digital Feb 19, 2016

Author Contributor

thanks!

sys.stdout.write(c.decode())
except:
pass
sys.stdout.flush()

if self.kernel_image is not None:
@@ -286,14 +246,13 @@ def writer(self):
try:
b = getkey()
except KeyboardInterrupt:
b = serial.to_bytes([3])
c = character(b)
if c == chr(0x03):
b = b"\x03"
if b == b"\x03":

This comment has been minimized.

Copy link
@sbourdeauducq

sbourdeauducq Feb 19, 2016

Member

What is this for? simply handling KeyboardInterrupt? What about using a non-bytes value e.g. None instead of b"\x03"?

This comment has been minimized.

Copy link
@sbourdeauducq

sbourdeauducq Feb 19, 2016

Member

Or no value at all, can't you call self.stop in the exception handler?

This comment has been minimized.

Copy link
@sbourdeauducq

sbourdeauducq Feb 19, 2016

Member

And then exit the loop with break?

This comment has been minimized.

Copy link
@enjoy-digital

enjoy-digital Feb 19, 2016

Author Contributor

In fact we directly catch the keyboard interrupt with getkey, so I simplified the code. (just kept the b = getkey())

This comment has been minimized.

Copy link
@sbourdeauducq

sbourdeauducq Feb 19, 2016

Member

How do we "catch the keyboard interrupt"? Ctrl-C is character 0x03 and that gets passed directly?
This could be an unreliable assumption, special characters are, as a general rule, a terrible mess and they vary widely from platform to platform, or from terminal program to terminal program.

This comment has been minimized.

Copy link
@sbourdeauducq

sbourdeauducq Feb 19, 2016

Member

(It might actually work, but we are deep in the rainforest here, so expect bugs)

This comment has been minimized.

Copy link
@enjoy-digital

enjoy-digital Feb 19, 2016

Author Contributor

While testing I was not able to enter the KeyboardInterrupt exception (Ctrl-C was catched by getkey()).
But if you see a better way to do that I can implement it.

This comment has been minimized.

Copy link
@sbourdeauducq

sbourdeauducq Feb 19, 2016

Member

How did the C flterm do it?

This comment has been minimized.

Copy link
@enjoy-digital

enjoy-digital Feb 19, 2016

Author Contributor

This comment has been minimized.

Copy link
@sbourdeauducq

sbourdeauducq Feb 19, 2016

Member

I don't think so, more likely it sets different termios options that don't swallow Ctrl-C.

This comment has been minimized.

This comment has been minimized.

Copy link
@sbourdeauducq

sbourdeauducq Feb 19, 2016

Member

Seems you just need to remove:

        new[6][termios.VMIN] = 1
        new[6][termios.VTIME] = 0
self.stop()
elif c == '\n':
self.serial.write(serial.to_bytes([10]))
elif b == b"\n":
self.port.write(b"\x0a")
else:
self.serial.write(b)
self.port.write(b)
except:
self.writer_alive = False
raise
@@ -343,6 +302,7 @@ def main():
flterm.join(True)
except KeyboardInterrupt:
pass
flterm.close()

This comment has been minimized.

Copy link
@sbourdeauducq

sbourdeauducq Feb 19, 2016

Member

flterm = FlTerm(...) try: ... finally: flterm.close()

This comment has been minimized.

Copy link
@enjoy-digital

enjoy-digital Feb 19, 2016

Author Contributor

Applied



if __name__ == "__main__":

0 comments on commit 0e44624

Please sign in to comment.