"""Source code for Gramophone."""

import os
import struct
import threading
from collections import namedtuple
from random import randint, sample
from threading import Thread
from time import sleep, time

import usb.core
import usb.backend.libusb1

# Directory containing this module; find_devices() builds the path of the
# libusb-1.0.dll it hands to PyUSB from this.
DIR = os.path.dirname(__file__)

def background(fn):
    """ Decorator for functions that should run in the background.

    Calling the decorated function starts ``fn`` in a new
    :class:`threading.Thread` with the given arguments and returns
    immediately.

    :param fn: The callable to run in a separate thread.
    :returns: A wrapper that returns the already started thread, so the
        caller can ``join()`` it. The return value of ``fn`` itself is not
        available through the wrapper.
    """
    # Imported locally so this block does not depend on a file-level import.
    from functools import wraps

    # wraps() keeps fn's __name__/__doc__ on the wrapper so decorated
    # functions stay recognisable in tracebacks and generated docs.
    @wraps(fn)
    def run(*k, **kw):
        t = threading.Thread(target=fn, args=k, kwargs=kw)
        t.start()
        return t
    return run

def find_devices():
    """ Scan the USB bus for Gramophones and index them by serial number.

    Every USB device with vendor id 0x0483 and product id 0x5750 is wrapped
    in a (non-verbose) Gramophone object and asked for its product and
    firmware info. Only devices that report the product name 'GRAMO-01'
    are kept.

    :returns: Gramophone objects keyed by their serial number.
    :rtype: dict
    """
    def locate_library(_name):
        # PyUSB passes the library name it is looking for; the DLL next to
        # this module is returned regardless of that name.
        return DIR+"/libusb-1.0.dll"

    libusb = usb.backend.libusb1.get_backend(find_library=locate_library)
    matches = usb.core.find(find_all=True, idVendor=0x0483,
                            idProduct=0x5750, backend=libusb)

    found = {}
    for usb_device in matches:
        gram = Gramophone(usb_device, False)
        gram.read_product_info()
        gram.read_firmware_info()
        if gram.product_info['name'] != 'GRAMO-01':
            continue
        found[gram.product_info['serial']] = gram
    return found

class Packet(object):
    """ A 64 byte data packet that can be sent to the Gramophone.

    Layout produced by :attr:`encoded` (one list item per byte)::

        target (2) | source (2) | msn (1) | cmd (1) | plen (1) | payload | zero filler

    :param target: Two byte address of the receiver.
    :param source: Two byte address of the sender.
    :param cmd: Command byte.
    :type cmd: int
    :param payload: Payload bytes. Any iterable of ints (list, tuple, bytes,
        array) is accepted and stored as a list.
    :param msn: Sequence number of the packet. When omitted, the next value
        of the class-wide counter is used and the counter is advanced
        (wrapping around at 256).
    :type msn: int or None
    """

    # Class-wide counter holding the next automatically assigned msn.
    msn = 0

    # Total packet size and the size of the fixed header
    # (target + source + msn + cmd + plen), both in bytes.
    _SIZE = 64
    _HEADER_SIZE = 7

    def __init__(self, target, source, cmd, payload, msn=None):
        # Stored as lists: `encoded` concatenates the fields with `+`, which
        # would raise TypeError for bytes/tuple/array inputs.
        self.target = list(target)
        self.source = list(source)
        self.cmd = [cmd]
        self.payload = list(payload)
        if msn is None:
            self.msn = [Packet.msn]
            Packet.msn = (Packet.msn + 1) % 256
        else:
            self.msn = [msn]

    def __repr__(self):
        return 'Packet(target={}, source={}, msn={}, cmd={}, payload={})'.format(
            self.target, self.source, self.msn, self.cmd, self.payload)

    @property
    def plen(self):
        """ Number of payload bytes. """
        return len(self.payload)

    @property
    def filler(self):
        """ Zero bytes that pad the packet up to 64 bytes.

        Empty when the payload already fills (or exceeds) the 57 bytes that
        remain after the header; an oversized payload is not truncated.
        """
        return [0] * (self._SIZE - self._HEADER_SIZE - self.plen)

    @property
    def encoded(self):
        """ The packet as a flat list of byte values ready to be written. """
        return (self.target + self.source + self.msn + self.cmd +
                [self.plen] + self.payload + self.filler)

    @classmethod
    def from_array(cls, array):
        """ Build a Packet from raw bytes in the :attr:`encoded` layout.

        :param array: Iterable of byte values, e.g. the data read from the
            device.
        :returns: The decoded packet. Filler bytes after the payload are
            dropped.
        :rtype: Packet
        """
        raw = list(array)
        plen = raw[6]
        return cls(raw[0:2], raw[2:4], raw[5], raw[7:7+plen], msn=raw[4])
class Gramophone(object):
    """ Representation of a Gramophone device.

    :param device: The USB device identity
    :type device: usb.core.Device
    :param verbose: When True, the read/write methods also print what they
        read or whether a write succeeded.
    :type verbose: bool
    """

    # Names of the error codes carried in the first payload byte of a
    # failure response (command byte 0x02).
    error_codes = {0x00: 'PACKET_FAIL_UNKNOWNCMD',
                   0x01: 'PACKET_FAIL_INVALIDCMDSYNTAX',
                   0x04: 'PACKET_FAIL_INVALIDPARAMSYNTAX',
                   0x05: 'PACKET_FAIL_RANGEERROR',
                   0x06: 'PACKET_FAIL_PARAMNOTFOUND',
                   0x07: 'PACKET_FAIL_VALIDFAIL',
                   0x08: 'PACKET_FAIL_ACCESSVIOLATION'}

    Parameter = namedtuple('Parameter', ['name', 'info', 'type'])

    # Device parameters by id. `type` selects the decoder used by
    # decode_payload(); 'combo' entries are bundles listed in `combos`.
    parameters = {
        0x01: Parameter('VSEN3V3', 'Voltage on 3.3V rail.', 'float'),
        0x02: Parameter('VSEN5V', 'Voltage on 5V rail.', 'float'),
        0x03: Parameter('TSENMCU', 'Internal teperature of the MCU.', 'float'),
        0x04: Parameter('TSENEXT', 'External temperature sensor on the board.', 'float'),
        0x0A: Parameter('SENSORS', 'The voltages and temperatures in one parameter', 'combo'),
        0x0B: Parameter('VOLTAGES', 'The voltages in one parameter', 'combo'),
        0x0C: Parameter('TEMPS', 'The temperatures in one parameter', 'combo'),
        0x05: Parameter('TIME', 'The time of the internal clock of the device.', 'uint64'),
        0x10: Parameter('ENCPOS', 'Encoder position.', 'int32'),
        0x11: Parameter('ENCVEL', 'Encoder velocity.', 'vel'),
        0x12: Parameter('ENCVELWIN', 'Encoder velocity window size.', 'uint16'),
        0x13: Parameter('ENCHOME', 'Encoder homing.', 'uint8'),
        0x14: Parameter('ENCHOMEPOS', 'Encoder home position.', 'int32'),
        0x20: Parameter('DI-1', 'Digital input 1.', 'uint8'),
        0x21: Parameter('DI-2', 'Digital input 2.', 'uint8'),
        0x25: Parameter('DI', 'Digital inputs.', 'combo'),
        0x30: Parameter('DO-1', 'Digital output 1.', 'uint8'),
        0x31: Parameter('DO-2', 'Digital output 2.', 'uint8'),
        0x32: Parameter('DO-3', 'Digital output 3.', 'uint8'),
        0x33: Parameter('DO-4', 'Digital output 4.', 'uint8'),
        0x35: Parameter('DO', 'Digital outputs.', 'combo'),
        0x40: Parameter('AO', 'Analogue output.', 'float'),
        0xFF: Parameter('LED', 'LED state changed', 'uint8'),
        0xAA: Parameter('REC', 'Bundle of parameters for the Recorder module.', 'combo'),
        0xBB: Parameter('LINM', 'Bundle of parameters for the LinMaze module.', 'combo')
    }

    # Encoded size in bytes of every non-combo parameter type.
    type_lengths = {'float': 4,
                    'double': 8,
                    'uint8': 1,
                    'uint16': 2,
                    'uint32': 4,
                    'uint64': 8,
                    'int8': 1,
                    'int16': 2,
                    'int32': 4,
                    'int64': 8,
                    'vel': 5
                    }

    # Member parameter ids of each combo, in the order their values are
    # laid out in the payload.
    combos = {
        0x0A: [0x01, 0x02, 0x03, 0x04],  # Sensors
        0x0B: [0x01, 0x02],  # Voltages
        0x0C: [0x03, 0x04],  # Temperatures
        0x25: [0x20, 0x21],  # Digital inputs
        0x35: [0x30, 0x31, 0x32, 0x33],  # Digital outputs
        0xAA: [0x05, 0x11, 0x20, 0x21, 0x30, 0x31, 0x32, 0x33],  # Time, vel, io
        0xBB: [0x05, 0x10, 0x20, 0x21, 0x30, 0x31, 0x32, 0x33]  # Time, pos, io
    }

    device_states = {0x00: 'IAP', 0x01: 'Application'}

    def __init__(self, device, verbose=False):
        self.device = device
        self.device.set_configuration()
        self.verbose = verbose

        # Random two byte addresses; send() only accepts responses whose
        # target/source mirror these.
        self.target = [randint(0x00, 0xFF), randint(0x00, 0xFF)]
        self.source = [randint(0x00, 0xFF), randint(0x00, 0xFF)]

        self.firmware_info = None
        self.product_info = None
        # Replaced by the raw state byte once read_dev_state() has run.
        self.dev_state = 'Unknown'
        # Per output port flag; a burst keeps running while it is True.
        self.bursting = {1: False, 2: False, 3: False, 4: False}
        self.readers = []

    def decode_payload(self, param_id, payload):
        """ Decode the raw bytes of a parameter into a Python value.

        :param param_id: Id of the parameter (key of ``parameters``).
        :param payload: The bytes of the value as a sequence of ints.
        :returns: A float for 'float' and 'vel' parameters, an int for the
            integer types, and for combos a dict mapping each member
            parameter id to its decoded value. None for any other type.
        """
        kind = self.parameters[param_id].type

        if kind == 'float':
            # Explicit little-endian, matching the integer decoding below
            # (plain 'f' would follow the byte order of the host).
            return struct.unpack('<f', bytes(payload))[0]

        if kind in ['int8', 'int16', 'int32', 'int64']:
            return int.from_bytes(payload, 'little', signed=True)

        if kind in ['uint8', 'uint16', 'uint32', 'uint64']:
            return int.from_bytes(payload, 'little', signed=False)

        if kind == 'vel':
            # A 4 byte float multiplied by the value of the fifth byte.
            return struct.unpack('<f', bytes(payload[0:4]))[0]*float(payload[4])

        if kind == 'combo':
            offset = 0
            values = {}
            for element_id in self.combos[param_id]:
                length = self.type_lengths[self.parameters[element_id].type]
                values[element_id] = self.decode_payload(
                    element_id, payload[offset:offset+length])
                offset += length
            return values

        return None

    def _error_name(self, code):
        """ Name of an error code, with a fallback for codes not in the table. """
        return self.error_codes.get(
            code, 'Unknown error code 0x{:02X}'.format(code))

    def read_input(self, input_id):
        """ Read one digital input; ``input_id`` 1 maps to parameter 0x20. """
        return self.read_param(0x20+input_id-1)

    def read_inputs(self):
        """ Read the digital inputs as a dict keyed by parameter id. """
        return self.read_params(0x25)

    def read_output(self, output_id):
        """ Read one digital output; ``output_id`` 1 maps to parameter 0x30. """
        return self.read_param(0x30+output_id-1)

    def read_outputs(self):
        """ Read the digital outputs as a dict keyed by parameter id. """
        return self.read_params(0x35)

    def read_analog_out(self):
        """ Read the analogue output parameter (0x40). """
        return self.read_param(0x40)

    def read_sensors(self):
        """ Returns the values read from the sensors in a dict
            with the parameter ids as keys. """
        return self.read_params(0x0A)

    def read_voltages(self):
        """ Read the voltages as a dict keyed by parameter id. """
        return self.read_params(0x0B)

    def read_temperatures(self):
        """ Read the temperatures as a dict keyed by parameter id. """
        return self.read_params(0x0C)

    def read_time(self):
        """ Read the time of the internal clock of the device (0x05). """
        return self.read_param(0x05)

    def read_position(self):
        """ Read the encoder position (0x10). """
        return self.read_param(0x10)

    def read_velocity(self):
        """ Read the encoder velocity (0x11). """
        return self.read_param(0x11)

    def read_window_size(self):
        """ Read the encoder velocity window size (0x12). """
        return self.read_param(0x12)

    def read_homing_state(self):
        """ 0 if the encoder is not trying to find the home position,
            1 if it is homing and 2 if the home position was found. """
        return self.read_param(0x13)

    def read_homing_poition(self):
        """ The home postion that can be found by homing. """
        return self.read_param(0x14)

    # Correctly spelled alias; the misspelled name is kept for existing callers.
    read_homing_position = read_homing_poition

    def read_firmware_info(self):
        """ Read the firmware information from the Gramophone.
            Sets the fimware related variables of the object.

        :returns: A dictionary with the firmware info fields in
            a human readable format.
        :rtype: dict
        """
        ask_firmware = Packet(self.target, self.source, 0x04, [])
        payload = self.send(ask_firmware).payload

        release, sub = payload[0], payload[1]
        build = int.from_bytes(payload[2:4], 'little', signed=False)
        year = int.from_bytes(payload[4:6], 'little', signed=False)
        month, day = payload[6], payload[7]
        hour, minute, second = payload[8], payload[9], payload[10]

        info = {
            'release': str(release)+'.'+str(sub),
            'build': str(build),
            # The day is zero padded as well so the value really follows the
            # declared dd/mm/yyyy format (as in read_product_info).
            'date': str(day).zfill(2)+'/'+str(month).zfill(2)+'/'+str(year).zfill(2),
            'time': str(hour)+':'+str(minute)+':'+str(second),
            'date_format': 'dd/mm/yyyy',
            'time_format': '24h'
        }

        if self.verbose:
            print('Firmware version:')
            print(' -Relase:', info['release'])
            print(' -Build:', info['build'])
            print(' -Date ({}): {}'.format(info['date_format'], info['date']))
            print(' -Time ({}): {}'.format(info['time_format'], info['time']))

        self.firmware_info = info
        return info

    def read_product_info(self):
        """ Read the product information from the Gramophone and store it
            in ``product_info``.

        :returns: A dictionary with the name, revision, serial and
            production date of the device.
        :rtype: dict
        """
        ask_product_info = Packet(self.target, self.source, 0x08, [])
        payload = self.send(ask_product_info).payload

        # Zero bytes are dropped from the name but not from the revision.
        product_name = ''.join([chr(byte) for byte in payload[0:18] if byte != 0x00])
        product_revision = ''.join([chr(byte) for byte in payload[18:24]])
        product_serial = int.from_bytes(payload[24:28], 'little', signed=False)
        product_year = int.from_bytes(payload[28:30], 'little', signed=False)
        product_month = int.from_bytes(payload[30:31], 'little', signed=False)
        product_day = int.from_bytes(payload[31:32], 'little', signed=False)

        info = {
            'name': product_name,
            'revision': product_revision,
            'serial': product_serial,
            'production': str(product_day).zfill(2)+'/'+str(product_month).zfill(2)+'/'+str(product_year),
            'production_format': 'dd/mm/yyyy'
        }

        if self.verbose:
            print('Product info:')
            print(' -Name:', info['name'])
            print(' -Revision:', info['revision'])
            print(' -Serial:', hex(info['serial']))
            print(' -Production ({}): {}'.format(info['production_format'], info['production']))

        self.product_info = info
        return info

    def read_recorder_params(self):
        """ Read the bundle of parameters for the Recorder module (0xAA). """
        return self.read_params(0xAA)

    def read_linmaze_params(self):
        """ Read the bundle of parameters for the LinMaze module (0xBB). """
        return self.read_params(0xBB)

    def read_dev_state(self):
        """ Read the state of the device. The device should be in
            0x01 state for usage. The 0x00 state is for setup.

        ``self.dev_state`` is set to the raw state byte.

        :returns: The device state. 'Application' or 'IAP'
        :rtype: str
        """
        ask_dev_state = Packet(self.target, self.source, 0x05, [])
        dev_state = self.send(ask_dev_state)
        self.dev_state = dev_state.payload[0]
        state = self.device_states[self.dev_state]
        if self.verbose:
            print('Device state:', state)
        return state

    def write_output(self, output, value):
        """ Set a digital output; ``output`` 1 maps to parameter 0x30.

        :param output: Number of the output, starting from 1.
        :param value: New state, converted with ``int()``.
        """
        self.write_param(0x30+output-1, [int(value)])

    def write_analog(self, value):
        """ Set the analogue output (0x40) to ``value``. """
        # Little-endian to match decode_payload().
        self.write_param(0x40, list(struct.pack('<f', value)))

    def ping(self):
        """ Send a ping packet with 5 bytes and print
            the time the process took. """
        rdata = sample(range(0, 255), 5)
        ping_time = time()
        ping_packet = Packet(self.target, self.source, 0x00, rdata)
        pong_packet = self.send(ping_packet)
        took = (time()-ping_time)*1000
        print('Ping!', ping_packet.payload)
        print('Pong!', pong_packet.payload)
        print('Took:', took, 'ms')

    def reset(self):
        """ Reset the device. Returns None if successful and
            the error string otherwise. """
        reset_command = Packet(self.target, self.source, 0xF0, [])
        response = self.send(reset_command)
        err = self.decode_response(response)
        if self.verbose:
            if err is None:
                print('Reset successful...')
            else:
                print(err)
        return err

    def decode_response(self, response):
        """ Decodes a response. Returns the error message if the
            command was not successful and None otherwise.

        Error codes missing from ``error_codes`` yield a generic message
        instead of raising KeyError.

        :param response: The response to decode.
        :ptype response: Packet
        """
        if response.cmd[0] == 0x02:
            return self._error_name(response.payload[0])
        # 0x01 (success) and any other command byte: nothing to report.
        return None

    def reset_time(self):
        """ Set the time parameter (0x05) to zero. """
        self.write_param(0x05, [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])

    def reset_position(self):
        """ Set the encoder position parameter (0x10) to zero. """
        self.write_param(0x10, [0x00, 0x00, 0x00, 0x00])

    def read_param(self, param_id):
        """ Read a single parameter from the device.

        :param param_id: Id of the parameter (key of ``parameters``).
        :returns: The decoded value, or None if the response carried no
            payload object.
        """
        ask_param = Packet(self.target, self.source, 0x0B, [param_id])
        payload = self.send(ask_param).payload
        if payload is None:
            return None
        val = self.decode_payload(param_id, payload)
        if self.verbose:
            print(self.parameters[param_id].name, '=', val)
        return val

    def read_params(self, combo_id):
        """ Read all member parameters of a combo in one request.

        :param combo_id: Id of the combo (key of ``combos``).
        :returns: The decoded values keyed by parameter id.
        :rtype: dict
        """
        ask_params = Packet(self.target, self.source, 0x0B, self.combos[combo_id])
        payload = self.send(ask_params).payload
        values = self.decode_payload(combo_id, payload)
        if self.verbose:
            for key in self.combos[combo_id]:
                print(self.parameters[key].name, '=', values[key])
        return values

    def write_param(self, param, payload):
        """ Write a parameter on the device.

        The outcome is only reported (printed) in verbose mode.

        :param param: Id of the parameter to write.
        :param payload: The new value as a list of byte values.
        """
        set_param = Packet(self.target, self.source, 0x0C, [param]+payload)
        response = self.send(set_param)
        if self.verbose:
            if response.cmd[0] == 0x01:
                print('Writing', self.parameters[param].name, 'succeeded.')
            if response.cmd[0] == 0x02:
                print('Writing', self.parameters[param].name, 'failed.',
                      self._error_name(response.payload[0]))

    def send(self, packet):
        """ Sends a Packet to the device.

        Writes the packet to endpoint 0x01 and reads 64 byte packets from
        endpoint 0x81 until one arrives whose addresses mirror ours and
        whose msn matches the request.

        :param packet: The Packet to send.
        :ptype packet: Packet
        :returns: The matching response.
        :rtype: Packet
        :raises GramophoneError: If the USB transfer fails.
        """
        try:
            self.device.write(0x01, packet.encoded)
            while True:
                resp = Packet.from_array(self.device.read(0x81, 64))
                if resp.target != self.source:
                    continue
                if resp.source != self.target:
                    continue
                if resp.msn == packet.msn:
                    return resp
        except usb.core.USBError as usb_error:
            # Keep the original USB error attached as the cause.
            raise GramophoneError(usb_error) from usb_error

    def start_burst(self, port, on_time, pause_time):
        """ Start toggling a digital output in a background thread.

        The output is switched on for ``on_time`` seconds and off for
        ``pause_time`` seconds repeatedly until stop_burst() is called for
        the same port.

        :param port: Number of the output, starting from 1.
        :param on_time: Seconds to keep the output on in every cycle.
        :param pause_time: Seconds to keep the output off in every cycle.
        :returns: The started thread.
        :rtype: threading.Thread
        """
        # The flag is raised here, before the worker exists. Setting it from
        # inside the worker could overwrite a stop_burst() issued right
        # after this call and leave the burst running forever.
        self.bursting[port] = True
        worker = threading.Thread(target=self._burst_loop,
                                  args=(port, on_time, pause_time))
        worker.start()
        return worker

    def _burst_loop(self, port, on_time, pause_time):
        """ Worker of start_burst(): toggle the output while the flag is set. """
        while self.bursting[port]:
            self.write_output(port, 1)
            sleep(on_time)
            self.write_output(port, 0)
            sleep(pause_time)

    def stop_burst(self, port):
        """ Ask the burst running on ``port`` to stop after its current cycle. """
        self.bursting[port] = False
class GramophoneError(Exception):
    """ Exception for Gramophone related communication errors. """