Source code for flexsea.device

import ctypes as c
from pathlib import Path
import sys
from time import sleep
from typing import List

from semantic_version import Version

import flexsea.utilities.constants as fxc
from flexsea.utilities.decorators import minimum_required_version
from flexsea.utilities.decorators import requires_device_not
from flexsea.utilities.decorators import requires_status
from flexsea.utilities.decorators import training_warn
from flexsea.utilities.decorators import validate
from flexsea.utilities.firmware import decode_firmware
from flexsea.utilities.firmware import validate_given_firmware_version
from flexsea.utilities.library import get_c_library
from flexsea.utilities.library import set_read_functions
from flexsea.utilities.specs import get_device_spec


# ============================================
#                    Device
# ============================================
[docs]class Device: """ Representation of one of Dephy's devices. Serves as a way to send commands to -- and read data from -- a Dephy device. Communication is done through a COM port (either bluetooth or serial). This class is essentially a Python wrapper around lower-level C/C++ code that has been pre-compiled into a shared library. These library files are stored on S3 and downloaded lazily. They are referenced by a semantic version string specified in the ``firmwareVersion`` constructor argument. Available versions can be listed with :py:func:`flexsea.utilities.firmware.get_available_firmware_versions` Parameters ---------- firmwareVersion : str Semantic version string of the firmware currently on Manage. Used to load the correct pre-compiled C library for communicating with the device. If the full version string, e.g., "10.7.0" is not given, e.g, "10" or "10.7", the string will be expanded to a full version string. If an exact match cannot be found, you will be prompted to use the latest version sharing your version's major version. port : str The name of the communication port the device is connected to. On Windows, this is usually something akin to ``COM3``. On Linux, it is usually something akin to ``/dev/ttyACM0``. On Windows, you can use the Device Manager to search for the port and on Linux you can use the ``ls`` command on ``/dev/ttyACM*`` baudRate : int, optional The communication rate expected by the device in bauds. The default value is 230400, which is the current rate used by all Dephy devices. libFile : str, optional ``flexsea`` serves as a wrapper around pre-compiled C/C++ libraries. Normally, these libraries are downloaded from S3, but in the event that you want to use a custom, local file, you can use this argument to specify the path to that file. logLevel : int, optional Describes the verbosity of the log files created by the device. Can be in the range [0,6], with 0 being the most verbose and 6 disabling logging. interactive : bool, optional There are certain scenarios where, if this is set to ``True``, you will be prompted for confirmation. If ``False``, you will not be prompted and the code will simply proceed. This mostly has to do with using a different library version than the one specified if an exact match for the specified version could not be found. The default value is ``True``. debug : bool, optional Controls the traceback level. If ``False`` (the default), then the traceback limit is set to 0. If ``True``, Python's default traceback limit is used. s3Timeout : int, optional Time, in seconds, spent trying to connect to S3 before an exception is raised. stopMotorOnDisconnect : bool, optional If ``True``, ``stop_motor`` is called by ``close`` (which, in turn, is called by the desctructor). If ``False``, ``stop_motor`` is **not** called by ``close`` or the destructor. The default value is ``False``. This is useful for on-device controllers (controllers that are baked into the device firmware), so that, should the device become disconnected from the computer it is streaming data to, the controller will not suddenly shut off and cause the wearer to potentially fall. If this is ``True``, you must call ``stop_motor`` manually. Attributes ---------- connected : bool If ``True``, a connection has been established with the device over the serial port. streaming : bool If ``True``, the device is currently sending data at the specified rate (:py:attr:`streamingFrequency`) port : str The value of ``port`` passed to the constructor. interactive : bool The value of ``interactive`` passed to the constructor. firmwareVersion : semantic_version.Version The value of ``firmwareVersion`` passed to the constructor. libFile : str The value of ``libFile`` passed to the constructor. baudRate : int The value of ``baudRate`` passed to the constructor. logLevel : int The value of ``logLevel`` passed to the constructor. heartbeat : int How frequently the device should check for a connection to the computer. See: :py:meth:`start_streaming` id : int The decimal id of the device. streamingFrequency : int The frequency (in Hz) at which the device is sending data. See: :py:meth:`start_streaming` bootloaderActive firmwareVersion uvlo hasHabs name side num_utts gains remaining_training_steps success failure invalidParam invalidDevice libVersion isLegacy Examples -------- >>> Device("7.2.0", "COM3") >>> Device("10", "/dev/ttyACM0", logLevel=6, interactive=False) """ # ----- # constructor # ----- def __init__( self, firmwareVersion: str, port: str, baudRate: int = fxc.baudRate, libFile: str = "", logLevel: int = 4, interactive: bool = True, debug: bool = False, s3Timeout: int = 60, stopMotorOnDisconnect: bool = False, ) -> None: if not debug: sys.tracebacklimit = 0 fxc.dephyPath.mkdir(parents=True, exist_ok=True) self.port: str = port self.interactive = interactive self._stopMotorOnDisconnect = stopMotorOnDisconnect self.firmwareVersion = validate_given_firmware_version( firmwareVersion, self.interactive, s3Timeout ) if libFile: self.libFile = Path(libFile).expanduser().absolute() if not self.libFile.is_file(): raise FileNotFoundError(f"Could not find library: {self.libFile}") else: self.libFile = None if baudRate <= 0: raise ValueError("Error: baud rate must be positive.") self.baudRate = baudRate if logLevel < 0 or logLevel > 6: raise ValueError("Log level must be in [0, 6].") self.logLevel = logLevel self.heartbeat: int = 0 self.id: int = 0 self.streamingFrequency: int = 0 (self._clib, self.libFile) = get_c_library( self.firmwareVersion, self.libFile, s3Timeout ) self._fields: List[str] | None = None self._gains: dict = {} self._hasHabs: bool | None = None self._name: str = "" self._side: str = "" self._state: c.Structure | None = None self._stateType: c.Structure | None = None if self.firmwareVersion < fxc.legacyCutoff: self._SUCCESS = c.c_int(0) self._FAILURE = c.c_int(1) self._INVALID_PARAM = c.c_int(2) self._INVALID_DEVICE = c.c_int(3) self._NOT_STREAMING = c.c_int(4) self._isLegacy = True self._libVersion = "undefined" else: self._UNDEFINED = c.c_int(0) self._SUCCESS = c.c_int(1) self._FAILURE = c.c_int(2) self._INVALID_PARAM = c.c_int(3) self._INVALID_DEVICE = c.c_int(4) self._NOT_STREAMING = c.c_int(5) self._isLegacy = False self._libVersion = self._get_lib_version() print(f"Using firmware version: {self.firmwareVersion}") print(f"Using library file: {self.libFile}") # ----- # open # -----
[docs] def open(self, bootloading: bool = False) -> None: """ Establish a connection to a device. This is needed in order to send commands to the device and/or receive data from the device via serial communication through the COM port. Parameters ---------- bootloading : bool (optional) This keyword is really onlymeant to be used by the bootloader and a user of ``flexsea`` should not have to use it at all. Starting with v12.0.0, a development version number was introduced. We can only connect to the device if both the firmware version (e.g., 12.0.0) and the development version (e.g., 2.0.0) match. If only the firmware version matches, we can connect to the device, but not send motor commands, only the bootloading command. """ if self.connected: print("Already connected.") return port = self.port.encode("utf-8") if bootloading and self.firmwareVersion >= Version("12.0.0"): try: self.id = self._clib.fxOpenLimited(port) except AttributeError as err: msg = "Error, unable to connect to device. Your library is missing " msg += "the `fxOpenLimited` function." raise RuntimeError(msg) from err else: self.id = self._clib.fxOpen(port, self.baudRate, self.logLevel) if self.id in (self._INVALID_DEVICE.value, -1): raise RuntimeError("Failed to connect to device.") self._name = self.name self._side = self.side self._hasHabs = self._name not in fxc.noHabs self._get_info_for_reading()
# ----- # _get_info_for_reading # ----- def _get_info_for_reading(self) -> None: if self._isLegacy: self._get_state() else: self._get_fields() self._clib = set_read_functions( self._clib, self._name, self._isLegacy, self._stateType ) # ----- # _get_state # ----- def _get_state(self) -> None: stateSpec = get_device_spec(self._name, self.firmwareVersion) class LegacyDeviceState(c.Structure): _pack_ = 1 _fields_ = [(k, getattr(c, v)) for k, v in stateSpec.items()] self._stateType = LegacyDeviceState self._state = LegacyDeviceState() # ----- # _get_fields # ----- def _get_fields(self) -> None: maxFields = self._clib.fxGetMaxDataElements() maxFieldLength = self._clib.fxGetMaxDataLabelLength() nLabels = c.c_int() # https://code.activestate.com/lists/python-list/704158 labels = (c.POINTER(c.c_char) * maxFields)() for i in range(maxFields): labels[i] = c.create_string_buffer(maxFieldLength) retCode = self._clib.fxGetDataLabelsWrapper(self.id, labels, c.byref(nLabels)) if retCode != self._SUCCESS.value: raise RuntimeError("Could not get device field labels.") # Convert the labels from chars to python strings fields = [""] * nLabels.value if self._fields is None: self._fields = [] for i in range(nLabels.value): for j in range(maxFieldLength): fields[i] += labels[i][j].decode("utf8") fields[i] = fields[i].strip("\x00") self._fields.append(fields[i]) # ----- # close # -----
[docs] def close(self) -> None: """ Severs connection with device. Does not stop the motor by default. To stop the motor on a call to ``close``, Will no longer be able to send commands or receive data. """ if self.connected or self.streaming: if self._stopMotorOnDisconnect: self.stop_motor() # fxClose calls fxStopStreaming for us retCode = self._clib.fxClose(self.id) if retCode != self._SUCCESS.value: raise RuntimeError("Failed to close connection.")
# ----- # start_streaming # -----
[docs] @requires_status("connected") def start_streaming( self, frequency: int, heartbeat: int = 0, useSafety: bool = False ) -> None: """ Instructs the device to send data at the given ``frequency``. Parameters ---------- frequency : int The number of times per second the device should send data heartbeat : int When streaming, the computer periodically sends a message to the device to let it know that the connection between them is still alive. These are called heartbeat messages. This variable specifies the amount of time (in milliseconds) between successive heartbeat messages. This is related to how long the device will wait without receiving a heartbeat message before shutting itself off (five times ``heartbeat``). useSafety : bool, optional If ``True``, the device will stop the motor if it doesn't receive a heartbeat message in time. Raises ------ ValueError If ``frequency`` or ``heartbeat`` are invalid. """ if self.streaming: print("Already streaming.") return try: assert frequency > 0 except AssertionError as err: raise ValueError("Frequency must be > 0.") from err self.streamingFrequency = frequency self.heartbeat = heartbeat if useSafety: self._stream_with_safety() else: self._stream_without_safety()
# ----- # _stream_with_safety # ----- @minimum_required_version("9.1.0") @validate def _stream_with_safety(self) -> int: try: assert fxc.minHeartbeat <= self.heartbeat < self.streamingFrequency except AssertionError as err: msg = f"Heartbeat must be in [{fxc.minHeartbeat}, frequency]" raise ValueError(msg) from err return self._clib.fxStartStreamingWithSafety( self.id, self.streamingFrequency, 1, self.heartbeat ) # ----- # _stream_without_safety # ----- @validate def _stream_without_safety(self) -> int: return self._clib.fxStartStreaming(self.id, self.streamingFrequency, 1) # ----- # stop_streaming # -----
[docs] @requires_status("streaming") def stop_streaming(self) -> None: """ Instructs the device to stop sending data. """ retCode = self._clib.fxStopStreaming(self.id) if retCode != self._SUCCESS.value: raise RuntimeError("Failed to stop streaming.")
# ----- # set_gains # -----
[docs] @requires_status("connected") def set_gains(self, kp: int, ki: int, kd: int, k: int, b: int, ff: int) -> None: """ Sets the gains used by PID controllers on the device. Parameters ---------- kp : int Proportional gain. ki : int Integral gain. kd : int Differential gain. k : int Stiffness (used in impedence control only). b : int Damping (used in impedance control only). ff : int Feed forward gain. """ # There is a bug either on the C side or in the firmware where, # sometimes, the gains aren't set, so we try multiple times returnCode = self._FAILURE for _ in range(5): returnCode = self._clib.fxSetGains(self.id, kp, ki, kd, k, b, ff) sleep(0.001) if returnCode != self._SUCCESS.value: raise RuntimeError("Failed to set gains.") self._gains = {"kp": kp, "ki": ki, "kd": kd, "k": k, "b": b, "ff": ff}
# ----- # command_motor_position # -----
[docs] @requires_status("connected") @validate def command_motor_position(self, value: int) -> int: """ Sets motor to given position. Parameters ---------- value : int Desired motor position in encoder units. Returns ------- int Status code indicating success or failure. See Also -------- :py:func:`success` :py:func:`failure` """ controller = fxc.controllers["position"] return self._clib.fxSendMotorCommand(self.id, controller, value)
# ----- # command_motor_current # -----
[docs] @requires_status("connected") @validate def command_motor_current(self, value: int) -> int: """ Sends the given current to the motor. Parameters ---------- value : int Desired motor current in milli-Amps. Returns ------- int Status code indicating success or failure. See Also -------- :py:func:`success` :py:func:`failure` """ controller = fxc.controllers["current"] return self._clib.fxSendMotorCommand(self.id, controller, value)
# ----- # command_motor_voltage # -----
[docs] @requires_status("connected") @validate def command_motor_voltage(self, value: int) -> int: """ Sets motor's voltage. Parameters ---------- value : int Desired motor voltage in milli-volts. Returns ------- int Status code indicating success or failure. See Also -------- :py:func:`success` :py:func:`failure` """ controller = fxc.controllers["voltage"] return self._clib.fxSendMotorCommand(self.id, controller, value)
# ----- # command_motor_impedance # -----
[docs] @requires_status("connected") @validate def command_motor_impedance(self, value: int) -> int: """ Has the motor simulate a stretched from current position to the desired position using the damping gain b and stiffness gain k. Parameters ---------- value : int Desired motor position in ticks. Returns ------- int Status code indicating success or failure. See Also -------- :py:func:`success` :py:func:`failure` """ controller = fxc.controllers["impedance"] return self._clib.fxSendMotorCommand(self.id, controller, value)
# ----- # stop_motor # -----
[docs] @requires_status("connected") def stop_motor(self) -> int: """ Stops the motor and resets the gains to zero. Returns ------- int Status code indicating success or failure. See Also -------- :py:func:`success` :py:func:`failure` """ controller = fxc.controllers["none"] retCode = self._clib.fxSendMotorCommand(self.id, controller, 0) if retCode != self._SUCCESS.value: raise RuntimeError("Failed to stop motor.") self._gains = {"kp": 0, "ki": 0, "kd": 0, "k": 0, "b": 0, "ff": 0} return retCode
# ----- # activate_bootloader # -----
[docs] @requires_status("connected") def activate_bootloader(self, target: str) -> None: """ Activates the bootloader for ``target``. Parameters ---------- target : str Bootloader target. Can be: mn, ex, re, or habs. """ # We deliberately aren't using @validate here because we need # access to the IOError in `set_tunnen_mode` targetCode = fxc.bootloaderTargets[target] retVal = self._clib.fxActivateBootloader(self.id, targetCode) if retVal == self._INVALID_DEVICE.value: raise RuntimeError(f"Invalid device ID for: `{target}`.") if retVal != self._SUCCESS.value: raise IOError
# ----- # bootloaderActive # ----- @property @requires_status("connected") def bootloaderActive(self) -> bool: """ Returns whether or not the bootloader is active or not. Returns ------- bool ``True`` if bootloader is active and ``False`` otherwise. """ returnCode = self._clib.fxIsBootloaderActivated(self.id) if returnCode != self._SUCCESS.value: return False return True # ----- # set_tunnel_mode # -----
[docs] @requires_status("connected") def set_tunnel_mode(self, target: str, timeout: int = 30) -> bool: """ Puts Manage into tunnel mode. All communication goes through Manage, so we need to put it into tunnel mode in order to activate the other bootloaders. When bootloading Manage itself, this causes it to reboot in DFU mode. If bootloading fails for any reason once tunnel mode has been set, the device is bricked and must be programmed manually. The exception to this rule is Habs. Parameters ---------- target : str The name of the target to set. Can be: mn, ex, re, habs, bt121, or xbee. timeout : int, optional The number of seconds to wait for confirmation before failing. Returns ------- activated : bool If ``True``, the bootloader was set successfully. If ``False`` then something went wrong. """ activated = False while timeout > 0 and not activated: if timeout % 5 == 0: try: self.activate_bootloader(target) except IOError: pass # Device gets disconnected briefly when Mn resets, so we wait sleep(1) timeout -= 1 activated = self.bootloaderActive return activated
# ----- # firmware_version # ----- @property @requires_status("connected") def firmware_version(self) -> dict: """ Gets the fimware versions of device's MCUs. Returns ------- Dict A dictionary with the semantic version strings of manage, execute, and regulate's firmware. And habs, if applicable. """ self._clib.fxRequestFirmwareVersion(self.id) sleep(5) fw = self._clib.fxGetLastReceivedFirmwareVersion(self.id) fwDict = { "mn": decode_firmware(fw.mn), "ex": decode_firmware(fw.ex), "re": decode_firmware(fw.re), } if self._hasHabs: fwDict["habs"] = decode_firmware(fw.habs) return fwDict # ----- # print # -----
[docs] @requires_status("streaming") def print(self, data: dict | None = None) -> None: """ Reads the data from the device and then prints it to the screen. If data is given, we print that instead. """ if data is None: data = self.read() for key, value in data.items(): print(f"{key} : {value}")
# ----- # read # -----
[docs] @requires_status("streaming") def read(self, allData: bool = False) -> dict | List[dict]: """ Gets data from the data queue. Parameters ---------- allData : bool, optional If ``False`` (the default), then only the most recent entry in the data queue is obtained. If ``True``, then all of the data entries in the queue are obtained. Returns ------- dict, List[dict] If ``allData`` is ``False``, then a dictionary is returned. If ``allData`` is ``True``, then a list of dictionaries is returned. The dictionaries in each case are keyed by the names of the data fields and the values are the values of those fields. For firmware versions prior to ``10.0.0``, the fields are contained in a device specification file, which you can find in ``~/.dephy/legacy_device_spcs``. These spec files are downloaded lazily from AWS. For firmware versions >= ``10.0.0``, the fields are provided by the device itself, which means the list of fields is both device and firmware-version dependent. """ if allData: if self._isLegacy: return self._read_all_legacy() return self._read_all() if self._isLegacy: return self._read_legacy() return self._read()
# ----- # _read_all_legacy # ----- def _read_all_legacy(self) -> List[dict]: qs = self._clib.fxGetReadDataQueueSize(self.id) data = (self._stateType * qs)() allData = [] nRead = self._clib.read_all(self.id, data, qs) for i in range(nRead): # pylint: disable-next=protected-access allData.append({f[0]: getattr(data[i], f[0]) for f in data[i]._fields_}) return allData # ----- # _read_all # ----- def _read_all(self) -> List[dict]: qs = self._clib.fxGetReadDataQueueSize(self.id) data = (c.POINTER(c.c_int32) * qs)() nElements = c.c_int() for i in range(qs): data[i] = (c.c_int32 * len(self._fields))() self._clib.read_all(self.id, data, c.byref(nElements)) try: assert nElements.value == len(self._fields) except AssertionError as err: print("Different number of fields read than expected.") raise err allData = [] for i in range(qs): singleTimeStepData = [] for j in range(nElements.value): singleTimeStepData.append(data[i][j]) allData.append(dict(zip(self._fields, singleTimeStepData))) return allData # ----- # _read_legacy # ----- def _read_legacy(self) -> dict: if self._clib.read(self.id, c.byref(self._state)) != self._SUCCESS.value: raise RuntimeError("Error: read command failed.") # pylint: disable-next=protected-access return {f[0]: getattr(self._state, f[0]) for f in self._state._fields_} # ----- # _read # ----- def _read(self) -> dict: maxDataElements = self._clib.fxGetMaxDataElements() nFields = c.c_int() deviceData = (c.c_int32 * maxDataElements)() retCode = self._clib.read(self.id, deviceData, c.byref(nFields)) if retCode != self._SUCCESS.value: raise RuntimeError("Could not read from device.") try: assert nFields.value == len(self._fields) except AssertionError as err: print("Incorrect number of fields read.") raise err data = [deviceData[i] for i in range(nFields.value)] return dict(zip(self._fields, data)) # ----- # find_poles # -----
[docs] @requires_status("connected") @validate def find_poles(self) -> int: """ Instructs the device to go through the pole-finding process to align the motor correctly. Returns ------- int Status code indicating success or failure. See Also -------- :py:func:`success` :py:func:`failure` """ if self.interactive: userInput = input( "WARNING: You should not use this function unless you know what " "you are doing!\nProceed?[y/n] " ) if userInput != "y": print("Aborting pole finding.") return self._FAILURE.value msg = "NOTE: Please wait for the process to complete. The motor will stop " msg += "moving when it is done. It usually takes 1-2 minutes to complete." msg += "NOTE: Once the pole-finding procedure is completed, you must " msg += "power cylce the device for the changes to take effect." print(msg) return self._clib.fxFindPoles(self.id)
# ----- # uvlo - getter # -----
[docs] @requires_status("connected") def get_uvlo(self) -> int: """ Gets the currently set UVLO. Returns ------- int Status code indicating success or failure. See Also -------- :py:func:`success` :py:func:`failure` """ self._clib.fxRequestUVLO(self.id) sleep(5) return self._clib.fxGetLastReceivedUVLO(self.id)
# ----- # uvlo - setter # -----
[docs] @validate def set_uvlo(self, value: int) -> int: """ Sets the UVLO value for the device. `value` needs to be in milli-volts. Returns ------- int Status code indicating success or failure. See Also -------- :py:func:`success` :py:func:`failure` """ return self._clib.fxSetUVLO(self.id, value)
# ----- # calibrate_imu # -----
[docs] @requires_status("connected") @validate def calibrate_imu(self) -> int: """ Instructs the device to go through the IMU calibration process. Returns ------- int Status code indicating success or failure. See Also -------- :py:func:`success` :py:func:`failure` """ if self.interactive: userInput = input( "WARNING: You should not use this function unless you know what " "you are doing!\nProceed?[y/n] " ) if userInput != "y": print("Aborting IMU calibration.") return self._FAILURE.value return self._clib.fxSetImuCalibration(self.id)
# ----- # hasHabs # ----- @property @requires_status("connected") def hasHabs(self) -> bool: """ Returns whether or not the device has a Habsolute encoder. Returns ------- bool ``True`` if the device has a Habsolute encoder and ``Faslse`` otherwise. """ return self._hasHabs # ----- # name # ----- @property @requires_status("connected") def name(self) -> str: """ Returns the human-friendly name of the device, e.g., actpack. Returns ------- str The name of the device. """ if self._name: return self._name if self._isLegacy: self._name = self._get_legacy_name() else: self._name = self._get_name() return self._name # ----- # _get_name # ----- def _get_name(self) -> str: maxDeviceNameLength = self._clib.fxGetMaxDeviceNameLength() deviceName = (c.c_char * maxDeviceNameLength)() retCode = self._clib.fxGetDeviceTypeNameWrapper(self.id, deviceName) if retCode != self._SUCCESS.value: raise RuntimeError("Could not get device name.") return deviceName.value.decode("utf8").lower() # ----- # _get_legacy_name # ----- def _get_legacy_name(self) -> str: deviceTypeCode = self._clib.fxGetAppType(self.id) try: devName = fxc.deviceNames[deviceTypeCode] except KeyError as err: raise RuntimeError("Could not get device name.") from err return devName # ----- # side # ----- @property @requires_status("connected") def side(self) -> str: """ Returns the chirality of the device. Returns ------- str Can be 'left', 'right', 'none' (for no chirality), or 'undefined' (for legacy devices that don't know their side information) """ if self._side: return self._side if self._isLegacy: return "undefined" maxDeviceSideLength = self._clib.fxGetMaxDeviceSideNameLength() deviceSide = (c.c_char * maxDeviceSideLength)() retCode = self._clib.fxGetDeviceSideNameWrapper(self.id, deviceSide) if retCode != self._SUCCESS.value: raise RuntimeError("Could not get device side.") side = deviceSide.value.decode("utf8") # If side isn't applicable (for, e.g., an actpack), string is empty self._side = side.lower() if side else "undefined" return self._side # ----- # _get_lib_version # ----- @minimum_required_version("10.0.0") def _get_lib_version(self) -> Version: major = c.c_uint16() minor = c.c_uint16() patch = c.c_uint16() returnCode = self._clib.fxGetLibsVersion( c.byref(major), c.byref(minor), c.byref(patch) ) if returnCode != self._SUCCESS.value: raise RuntimeError("Could not determine c library version.") return Version(f"{major.value}.{minor.value}.{patch.value}") # ----- # num_utts # ----- @property @requires_device_not("actpack") @minimum_required_version("9.1.0") @requires_status("connected") def num_utts(self) -> int: """ The number of available UTT values is saved as #define NUM_UTT_VALS in the C library, so we have this convenience wrapper to access it. Returns ------- int The number of User Testing Tweaks (UTTs) """ if self.firmwareVersion.major == 9: return fxc.nUttsV9 return self._clib.fxGetNumUtts() # ----- # set_all_utts # -----
[docs] @requires_device_not("actpack") @minimum_required_version("9.1.0") @requires_status("connected") @validate def set_all_utts(self, uttVals: List[int]) -> int: """ Takes in a list of integer values and assigns each one to a UTT value. Parameters ---------- uttVals : List[int] List of values, one for each UTT Returns ------- int Status code indicating success or failure. See Also -------- :py:func:`success` :py:func:`failure` """ numUtts = self.num_utts nVals = len(uttVals) try: assert nVals <= numUtts except AssertionError as err: raise ValueError("Error: too many UTT values given.") from err data = (c.c_int * nVals)() for i in range(nVals): data[i] = c.c_int(uttVals[i]) return self._clib.fxSetUTT(self.id, data, nVals, c.c_byte(-1))
# ----- # set_utt # -----
[docs] @requires_device_not("actpack") @minimum_required_version("9.1.0") @requires_status("connected") @validate def set_utt(self, uttVal: int, index: int) -> int: """ Sets the value of a single UTT. Parameters ---------- uttVal : int The value the desired UTT will be set to index : int The array index for the desired UTT Returns ------- int Status code indicating success or failure. See Also -------- :py:func:`success` :py:func:`failure` """ numUtts = self.num_utts try: assert 0 <= index < numUtts except AssertionError: print("Error: invalid UTT index.") data = (c.c_int * numUtts)() data[index] = c.c_int(uttVal) return self._clib.fxSetUTT(self.id, data, numUtts, c.c_byte(index))
# ----- # reset_utts # -----
[docs] @requires_device_not("actpack") @minimum_required_version("9.1.0") @requires_status("connected") @validate def reset_utts(self) -> int: """ Resets all UTTs to their default values. Returns ------- int Status code indicating success or failure. See Also -------- :py:func:`success` :py:func:`failure` """ return self._clib.fxSetUTTsToDefault(self.id)
# ----- # save_utts # -----
[docs] @requires_device_not("actpack") @minimum_required_version("9.1.0") @requires_status("connected") @validate def save_utts(self) -> int: """ Saves the current UTT values to the device's internal memory so that they persist across power cycles. Returns ------- int Status code indicating success or failure. See Also -------- :py:func:`success` :py:func:`failure` """ return self._clib.fxSaveUTTToMemory(self.id)
# ----- # read_utts # -----
[docs] @requires_device_not("actpack") @minimum_required_version("9.1.0") @requires_status("connected") def read_utts(self) -> List[int]: """ UTTs are not sent as a part of regular communication, so here we first request that the device send the UTTs to Mn, and then we read them. Returns ------- List[int] The current value of each UTT. """ if self._clib.fxRequestUTT(self.id) != self._SUCCESS.value: raise RuntimeError("Error: could not request UTTs.") # We have to sleep for a bit in order to allow the device time # to fulfill the request sleep(0.25) numUtts = self.num_utts data = (c.c_int * numUtts)() retCode = self._clib.fxGetLastReceivedUTT(self.id, data, numUtts) if retCode != self._SUCCESS.value: raise RuntimeError("Error: could not read UTTs.") return [data[i] for i in range(numUtts)]
# ----- # gains # ----- @property def gains(self) -> dict: """ Returns the currently set gains. Returns ------- dict The names and values of each gain. """ return self._gains # ----- # start_training # -----
[docs] @validate @requires_device_not("actpack") @requires_status("connected") def start_training(self) -> int: """ Activates training mode. When in training mode, the user must take a certain number of steps. This allows the device to learn the user's gait and set the value of several parameters accordingly in order for the device to provide optimal augmentation. Returns ------- int Status code indicating success or failure. See Also -------- :py:func:`success` :py:func:`failure` Notes ----- Training mode is only available if the device is flashed with one of Dephy's controllers. """ self._update_training_data() sleep(0.25) return self._clib.fxStartTraining(self.id)
# ----- # activate_single_user_mode # -----
[docs] @validate @requires_device_not("actpack") @requires_status("connected") def activate_single_user_mode(self) -> int: """ Puts the device into single-user mode. In this mode, training mode runs once and then the parameters are saved. This means that training mode will not re-activate if the device is power-cycled, so the same gait parameters will be used across sessions. Returns ------- int Status code indicating success or failure. See Also -------- :py:func:`success` :py:func:`failure` Notes ----- Training mode is only available if the device is flashed with one of Dephy's controllers. """ self._update_training_data() sleep(0.25) return self._clib.fxUseSavedTraining(self.id)
# ----- # activate_multi_user_mode # -----
[docs] @validate @requires_device_not("actpack") @requires_status("connected") def activate_multi_user_mode(self) -> int: """ Puts the device into multi-user mode. This causes training mode to activate each time the device is power-cycled, so gait parameters do not persist across power cycles. Returns ------- int Status code indicating success or failure. See Also -------- :py:func:`success` :py:func:`failure` Notes ----- Training mode is only available if the device is flashed with one of Dephy's controllers. """ self._update_training_data() sleep(0.25) return self._clib.fxDoNotUseSaveTraining(self.id)
# ----- # remaining_training_steps # ----- @property @requires_device_not("actpack") @requires_status("connected") def remaining_training_steps(self) -> int: """ Returns the number of steps remaining before training is completed. Returns ------- int The number of steps remaining before training is complete. Notes ----- Training mode is only available if the device is flashed with one of Dephy's controllers. """ self._update_training_data() # Give the device time to process the request and send the data sleep(0.25) stepsRemaining = c.c_int() retCode = self._clib.fxGetStepsRemaining(self.id, c.byref(stepsRemaining)) if retCode != self._SUCCESS.value: raise RuntimeError( "Error: could not determine how many training steps remain." ) return stepsRemaining.value # ----- # get_training_user_mode # -----
[docs] @requires_device_not("actpack") @requires_status("connected") def get_training_user_mode(self) -> str: """ Returns the current mode the device is in: either single-user or multi-user. Returns ------- str The device's current mode. See Also -------- :py:meth:`activate_multi_user_mode` :py:meth:`activate_single_user_mode` Notes ----- Training mode is only available if the device is flashed with one of Dephy's controllers. """ self._update_training_data() sleep(1) singleUserMode = c.c_bool() retCode = self._clib.fxIsUsingSavedTrainingData( self.id, c.byref(singleUserMode) ) if retCode != self._SUCCESS.value: raise RuntimeError("Error: could not determine training mode.") if singleUserMode.value: return "single" return "multi"
# ----- # get_training_state # -----
[docs] @requires_device_not("actpack") @requires_status("connected") def get_training_state(self) -> str: """ Returns the current status of training. Can be: ``loading``, ``in_progress``, ``done``, ``walk_training_in_progress``, or ``run_training_in_progress``. Returns ------- str Status of training. Notes ----- Training mode is only available if the device is flashed with one of Dephy's controllers. """ self._update_training_data() # Give the device time to process the request and send the data sleep(0.25) trainingState = c.c_int() retCode = self._clib.fxGetTrainingState(self.id, c.byref(trainingState)) if retCode != self._SUCCESS.value: raise RuntimeError("Error: could not determine training state.") return fxc.training_states[trainingState.value]
# ----- # _update_training_data # ----- @training_warn @validate def _update_training_data(self) -> None: return self._clib.fxUpdateTrainingData(self.id) # ----- # success # ----- @property def success(self) -> int: """ The integer corresponding to a status code of success. Returns ------- int The integer corresponding to a status code of success. """ return self._SUCCESS.value # ----- # failure # ----- @property def failure(self) -> int: """ The integer corresponding to a status code of failure. Returns ------- int The integer corresponding to a status code of failure. """ return self._FAILURE.value # ----- # undefined # ----- @property @minimum_required_version("10.0.0") def undefined(self) -> int: """ The integer corresponding to a status code of undefined. Returns ------- int The integer corresponding to a status code of undefined. """ return self._UNDEFINED.value # ----- # invalidParam # ----- @property def invalidParam(self) -> int: """ The integer corresponding to a status code of invalidParam. Returns ------- int The integer corresponding to a status code of invalidParam. """ return self._INVALID_PARAM.value # ----- # invalidDevice # ----- @property def invalidDevice(self) -> int: """ The integer corresponding to a status code of invalidDevice. Returns ------- int The integer corresponding to a status code of invalidDevice. """ return self._INVALID_DEVICE.value # ----- # isLegacy # ----- @property def isLegacy(self) -> bool: """ Whether or not the device is a legacy device. Returns ------- bool Whether or not the device is a legacy device. """ return self._isLegacy # ----- # libVersion # ----- @property def libVersion(self) -> str: """ Version string of the currently loaded library. Returns ------- str Version string of the currently loaded library. """ return self._libVersion # ----- # log files # -----
[docs] @minimum_required_version("12.0.0") @requires_status("connected") def set_file_name(self, name) -> None: """ Sets the name of the log file Parameters ---------- name : string The desired name of the log file """ return self._clib.fxSetDataLogName(name.encode("utf-8"), self.id)
[docs] @minimum_required_version("12.0.0") @requires_status("connected") def set_file_size(self, size) -> None: """ Sets the size of the log file Parameters ---------- size: int The desired name of the log file """ return self._clib.fxSetLogFileSize(size, self.id)
[docs] @minimum_required_version("12.0.0") @requires_status("connected") def set_log_directory(self, path) -> None: """ Sets the log directory Parameters ---------- path: string The desired path for the log files """ return self._clib.fxSetLogDirectory(path.encode("utf-8"), self.id)
# ----- # connected # ----- @property def connected(self) -> bool: return self._clib.fxIsOpen(self.id) # ----- # streaming # ----- @property def streaming(self) -> bool: if self.connected: return self._clib.fxIsStreaming(self.id) return False # ----- # request_re_config_settings # ----- @minimum_required_version("13.0.0") @validate def _request_re_config_settings(self) -> int: return self._clib.fxRequestRegulateConfigSettings(self.id) # ----- # battery_type # ----- @minimum_required_version("13.0.0") def get_battery_type(self) -> str: self._request_re_config_settings() sleep(1) batteryType = c.c_int() retCode = self._clib.fxGetBatteryType(self.id, c.byref(batteryType)) if retCode != self._SUCCESS.value: raise RuntimeError("Could not read battery type.") return fxc.batteryTypes[batteryType.value] # ----- # battery_type - setter # ----- @minimum_required_version("13.0.0") @validate def set_battery_type(self, batteryType: int | c.c_int) -> None: if isinstance(batteryType, c.c_int): batteryType = batteryType.value if batteryType not in fxc.batteryTypes: raise ValueError(f"Error: invalid battery type {batteryType}") self._request_re_config_settings() sleep(1) batteryType = c.c_int(batteryType) return self._clib.fxSetBatteryType(self.id, batteryType) # ----- # running_led_sequence # ----- @minimum_required_version("13.0.0") def get_running_led_sequence(self) -> str: self._request_re_config_settings() sleep(1) ledSequence = c.c_int() retCode = self._clib.fxGetRunningLEDSequence(self.id, c.byref(ledSequence)) if retCode != self._SUCCESS.value: raise RuntimeError("Could not read running led sequence.") return fxc.ledSequences[ledSequence.value] # ----- # init_led_sequence # ----- @minimum_required_version("13.0.0") def get_init_led_sequence(self) -> str: self._request_re_config_settings() sleep(1) ledSequence = c.c_int() retCode = self._clib.fxGetInitLEDSequence(self.id, c.byref(ledSequence)) if retCode != self._SUCCESS.value: raise RuntimeError("Could not read init led sequence.") return fxc.ledSequences[ledSequence.value] # ----- # init_led_sequence - setter # ----- @minimum_required_version("13.0.0") @validate def set_init_led_sequence(self, ledSequence: int | c.c_int) -> None: if isinstance(ledSequence, c.c_int): ledSequence = ledSequence.value if ledSequence not in fxc.ledSequences: raise ValueError(f"Error: invalid led sequence {ledSequence}") self._request_re_config_settings() sleep(1) ledSequence = c.c_int(ledSequence) return self._clib.fxSetInitLEDSequence(self.id, ledSequence) # ----- # shutoff_led_sequence # ----- @minimum_required_version("13.0.0") def get_shutoff_led_sequence(self) -> str: self._request_re_config_settings() sleep(1) ledSequence = c.c_int() retCode = self._clib.fxGetShutoffLEDSequence(self.id, c.byref(ledSequence)) if retCode != self._SUCCESS.value: raise RuntimeError("Could not read shutoff led sequence.") return fxc.ledSequences[ledSequence.value] # ----- # shutoff_led_sequence - setter # ----- @minimum_required_version("13.0.0") @validate def set_shutoff_led_sequence(self, ledSequence: int | c.c_int) -> None: if isinstance(ledSequence, c.c_int): ledSequence = ledSequence.value if ledSequence not in fxc.ledSequences: raise ValueError(f"Error: invalid ledSequence type {ledSequence}") self._request_re_config_settings() sleep(1) ledSequence = c.c_int(ledSequence) return self._clib.fxSetShutoffLEDSequence(self.id, ledSequence)