Fri, 27 Dec 2013 11:56:22 +0100
addedd car activity packet to receiver in car firmware
""" FreeSlot project Blackbox communication library """ import serial, sys, string, time, binascii # how often should a command retried when busy? RETRIES = 10 # define loglevels DEBUG = 20 LOGLEVEL = 20 def log(level, msg): """ Logging output function """ if level <= LOGLEVEL: print msg class SerialCommunicator(): def __init__(self, device, speed): self.device = device self.speed = speed self.com = None self.connected = False def connect(self): if self.connected: return True try: self.com = serial.Serial(self.device, baudrate=self.speed, xonxoff=0, timeout=0.1) except serial.SerialException, err: print err sys.exit(1) self.connected = True return True def disconnect(self): self.com = None return True def read(self, size = 1): return self.com.read(size) def write(self, msg, getanswer=False): self.com.write(msg + "\n") if getanswer: return self.readline() return None def readline(self): answer = self.com.readline() return string.strip(answer, "\n") def query(self, msg): retry = 0 response = self.write(msg, True) while (retry < RETRIES) and (response == "BUSY"): time.sleep(0.1) response = self.write(msg, True) retry += 1 log( DEBUG, "%i> %s\n< %s" % (retry, msg, response) ) return response class Blackbox(): def __init__(self): self.com = None self.info = None self.log = open("serial.log", "w") def readline(self): if not self.com: return "" # TODO: Binaerprotokoll implementieren und als "alte" ASCII Antwort zurueckgeben! # line = self.com.readline() if line == "F:": # parse binary fuel info datalen = ord(self.com.read(1)) if datalen != 7: self.log.write("F: ERROR, incorrect length header") return "" data = self.com.read(7) if len(data) != 7: self.log.write("F: ERROR LEN%i = %s\n" % (datalen, repr(data))) return "" slot = ord(data[0]) fuel = (ord(data[1]) * 256) + ord(data[2]) clk = (ord(data[3]) * 256*256*256) + (ord(data[4]) * 256*256) + (ord(data[5]) * 256) + ord(data[6]) self.com.readline() # clear to next linefeed line = "F:%i:%x:%x\n" % (slot, fuel, clk) elif line == "L:": # parse binary lap info datalen = ord(self.com.read(1)) if datalen != 12: self.log.write("L: ERROR, incorrect length header") return "" data = self.com.read(12) if len(data) != 12: self.log.write("L: ERROR LEN%i = %s\n" % (datalen, repr(data))) return "" track = ord(data[0]) laps = (ord(data[1]) * 256) + ord(data[2]) slot = ord(data[3]) diff = (ord(data[4]) * 256*256*256) + (ord(data[5]) * 256*256) + (ord(data[6]) * 256) + ord(data[7]) clk = (ord(data[8]) * 256*256*256) + (ord(data[9]) * 256*256) + (ord(data[10]) * 256) + ord(data[11]) self.com.readline() # clear to next linefeed line = "L:%i:%x:%i:%x:%x\n" % (track, laps, slot, diff, clk) elif line == "RW:": # parse binary responsewire info datalen = ord(self.com.read(1)) if datalen != 8: self.log.write("RW: ERROR, incorrect length header") return "" data = self.com.read(8) if len(data) != 8: self.log.write("RW: ERROR LEN%i = %s\n" % (datalen, repr(data))) return "" slot = ord(data[0]) track = ord(data[1]) sender = ord(data[2]) status = ord(data[3]) clk = (ord(data[4]) * 256*256*256) + (ord(data[5]) * 256*256) + (ord(data[6]) * 256) + ord(data[7]) self.com.readline() # clear to next linefeed line = "RW:%i:%i:%i:%i:%x\n" % (slot, track, sender, status, clk) self.log.write(line) if line.find(chr(0)) == -1: return line else: self.log.write("malformed RX\n") return "" def query(self, msg): if self.com: return self.com.query(msg) return "" def connect(self, device="/dev/ttyUSB0", speed=57600): # old connection speed 57600 if self.com == None: self.com = SerialCommunicator(device, speed) if self.com.connected: self.com.disconnect() self.com.connect() self.info = self.readinfo() def disconnect(self): self.com.disconnect() def readinfo(self): """ Read complete Information from connected box This does not include race+car status! """ return None def progcar(self, carid, command, value): """ Send program packets to specified car id valid command: speed, brake, fuel valid value: 4 bit integer (0..15) valid carid: 0..5 """ if (carid < 0) or (carid > 5): return "ERR - invalid carid" cmd = -1 if command == "accel": cmd = 0 if command == "brake": cmd = 1 if command == "fuel": cmd = 2 if (cmd == -1): return "ERR - invalid command" if (value<0) or (value>15): return "ERR - invalid value" if command == "accel" and value < 6: return "ERR - value too low" # transform value 10..15 to A..F if (value>9): value = chr(ord("A") + (value-10)) command = "P%i%s%i" % (cmd, value, carid) response = self.com.query( command ) return response def blinkcar(self, carid, blink): """ Set car blinking state """ if (carid < 0) or (carid > 5): return "ERR - invalid carid" if blink: return self.com.query( "P48%i" % carid ) else: return self.com.query( "P40%i" % carid ) def unlimitedfuel(self, carid, boolval): """ Set unlimited fuel on/off for a car """ if (carid < 0) or (carid > 5): return "ERR - invalid carid" value = 0 if boolval: value = 1 return self.com.query( "U%i%i" % (carid, value) ) def speedlimit(self, carid, value): """ Set the maximum controller speed for a car Attention: this is software limited, this does not affect car acceleration! """ if (carid < 0) or (carid > 5): return "ERR - invalid carid" if (value<0) or (value>15): return "ERR - invalid value" # transform value 10..15 to A..F if (value>9): value = chr(ord("A") + (value-10)) return self.com.query( "L%i%s" % (carid, value) ) def speedminimum(self, carid, value): """ Set the minimzm controller speed for a car """ if (carid < 0) or (carid > 5): return "ERR - invalid carid" if (value<0) or (value>15): return "ERR - invalid value" # transform value 10..15 to A..F if (value>9): value = chr(ord("A") + (value-10)) return self.com.query( "S%i%s" % (carid, value) ) def fueldivisor(self, value): """ Set the minimzm controller speed for a car """ if (value<0) or (value>255): return "ERR - invalid value" return self.com.query( "F:%s" % (value) ) def setmode(self, mode): """ Switch the Blackbox mode Valid modes are: idle, prepare, race note: box will permanently send status info in race mode, so no polling is required """ return True def getmode(self): self.readinfo() return self.info["mode"]