Source code for flexsea.utilities.library

import ctypes as c
import os
from pathlib import Path
from typing import Tuple
import zipfile

from botocore.exceptions import EndpointConnectionError
from semantic_version import Version

import flexsea.utilities.constants as fxc

from .aws import s3_download
from .firmware import Firmware
from .system import get_os


# ============================================
#                get_c_library
# ============================================
[docs]def get_c_library( firmwareVersion: Version, libFile: Path | None, timeout: int = 60 ) -> Tuple: """ Loads the correct C library for interacting with the device. If we're given a library file to use, we make sure it exists. If we're not given a library file to use, we check for a cached file on disk corresponding to our firmware version. If we don't have one, we try to download it from S3. We then use ctypes to load the library. Parameters ---------- firmwareVersion : Version The firmware version the library is built to interact with. libFile : Path, None The path to the local library file to load. timeout : int, optional Time, in seconds, spent trying to connect to S3 before an exception is raised. Raises ------ EndpointConnectionError If we cannot connect to the internet in order to download the necessary library. Returns ------- Tuple Contains the ``ctypes.CDLL``, the version of the library, and the full path to the library file. """ if libFile is None: _os = get_os() libFile = fxc.libsPath.joinpath(str(firmwareVersion), _os, fxc.libFiles[_os]) if not libFile.exists(): libFile.parent.mkdir(parents=True, exist_ok=True) libObj = f"{fxc.libsDir}/{firmwareVersion}/{_os}/{libFile.name}" try: s3_download( libObj, fxc.dephyPublicFilesBucket, str(libFile), None, timeout ) except EndpointConnectionError as err: msg = "Error: could not connect to the internet to download the " msg += "necessary C library file. Please connect to the internet and " msg += "try again." print(msg) raise err clib = _load_clib(libFile, timeout) return (_set_prototypes(clib, firmwareVersion), libFile)
# ============================================ # _load_clib # ============================================ def _load_clib(libFile: Path, timeout: int = 60) -> c.CDLL: """ Uses ctypes to actually create an interface to the library file. If we're on Windows, we have to additionally add several directories to the path. """ if not libFile.is_absolute(): libFile = libFile.expanduser().absolute() libFile = str(libFile) if "win" in get_os(): _add_windows_dlls(libFile, timeout) return c.cdll.LoadLibrary(libFile) # ============================================ # _add_windows_dlls # ============================================ def _add_windows_dlls(libFile: str, timeout: int = 60) -> None: """ There are several dlls that are required for the precompiled C lib to work on Windows. These dlls come packaged with Git Bash, but if you're not using Git Bash and you don't have MinGW on your PATH, then trying to use the library will error out. As such, here we make sure that the necessary dlls are on the system and add them to the PATH. If they are not, we download them from S3 and then add them to the PATH. """ opSys = get_os() dllZip = fxc.dephyPath.joinpath("bootloader_tools", opSys, "win_dlls.zip") base = dllZip.name.split(".")[0] extractedDest = Path(os.path.dirname(dllZip)).joinpath(base) if not dllZip.exists(): obj = str(Path("bootloader_tools").joinpath(opSys, "win_dlls.zip").as_posix()) bucket = fxc.dephyPublicFilesBucket s3_download(obj, bucket, str(dllZip), timeout=timeout) with zipfile.ZipFile(dllZip, "r") as archive: archive.extractall(extractedDest) os.add_dll_directory(extractedDest.joinpath("git_bash_mingw64", "bin")) try: os.add_dll_directory(libFile) except OSError as err: msg = f"Error loading precompiled library: `{libFile}`\n" msg += "The most likely cause is a mismatch between the Python, pip and " msg += "shell architectures.\nPlease ensure all three are either 32 or 64 " msg += "bit.\nKeep different versions isolated by virtual environments.\n" print(msg) raise err # ============================================ # _set_prototypes # ============================================ def _set_prototypes(clib: c.CDLL, firmwareVersion: Version) -> c.CDLL: # pylint: disable=too-many-statements # Open clib.fxOpen.argtypes = [c.c_char_p, c.c_uint, c.c_uint] clib.fxOpen.restype = c.c_int clib.fxIsOpen.argtypes = [c.c_uint] clib.fxIsOpen.restype = c.c_bool # Limited open if firmwareVersion >= Version("12.0.0"): try: clib.fxOpenLimited.argtypes = [c.c_char_p] clib.fxOpenLimited.restype = c.c_int # v12 changed how versioning works and employs a development version # that we do not have access to. Further, the libs were uploaded to S3 # all under 12.0.0 regardless of development version, so there are some # version 12s that don't have this function except AttributeError: pass # Close clib.fxClose.argtypes = [ c.c_uint, ] clib.fxClose.restype = c.c_uint # Start streaming clib.fxStartStreaming.argtypes = [c.c_uint, c.c_uint, c.c_bool] clib.fxStartStreaming.restype = c.c_int clib.fxIsStreaming.argtypes = [c.c_uint] clib.fxIsStreaming.restype = c.c_bool # Log file specification if firmwareVersion >= Version("12.0.0"): try: clib.fxSetDataLogName.argtypes = [c.c_char_p, c.c_uint] clib.fxSetDataLogName.restype = None clib.fxSetLogFileSize.argtypes = [c.c_int, c.c_int] clib.fxSetLogFileSize.restype = None clib.fxSetLogDirectory.argtypes = [c.c_char_p, c.c_uint] clib.fxSetLogDirectory.restype = None # v12 changed how versioning works and employs a development version # that we do not have access to. Further, the libs were uploaded to S3 # all under 12.0.0 regardless of development version, so there are some # version 12s that don't have this function except AttributeError: pass # Start streaming with safety if firmwareVersion >= Version("9.1.2"): clib.fxStartStreamingWithSafety.argtypes = [ c.c_uint, c.c_uint, c.c_bool, c.c_uint16, ] clib.fxStartStreamingWithSafety.restype = c.c_int # Stop streaming clib.fxStopStreaming.argtypes = [ c.c_uint, ] clib.fxStopStreaming.restype = c.c_int # Set gains clib.fxSetGains.argtypes = [ c.c_uint, c.c_uint, c.c_uint, c.c_uint, c.c_uint, c.c_uint, c.c_uint, ] clib.fxSetGains.restype = c.c_int # Send motor command clib.fxSendMotorCommand.argtypes = [c.c_uint, c.c_int, c.c_int] clib.fxSendMotorCommand.restype = c.c_int # Find poles clib.fxFindPoles.argtypes = [ c.c_uint, ] clib.fxFindPoles.restype = c.c_int # Activate bootloader clib.fxActivateBootloader.argtypes = [c.c_uint, c.c_uint8] clib.fxActivateBootloader.restype = c.c_int # Is bootloader active clib.fxIsBootloaderActivated.argtypes = [ c.c_uint, ] clib.fxIsBootloaderActivated.restype = c.c_int # Request firmware version clib.fxRequestFirmwareVersion.argtypes = [ c.c_uint, ] clib.fxRequestFirmwareVersion.restype = c.c_int # Get last received firmware version clib.fxGetLastReceivedFirmwareVersion.argtypes = [ c.c_uint, ] clib.fxGetLastReceivedFirmwareVersion.restype = Firmware # Get device type value if firmwareVersion < Version("10.0.0"): clib.fxGetAppType.argtypes = [ c.c_uint, ] clib.fxGetAppType.restype = c.c_int # Get read data queue size clib.fxGetReadDataQueueSize.argtypes = [ c.c_uint, ] clib.fxGetReadDataQueueSize.restype = c.c_int # Start training clib.fxStartTraining.argtypes = [c.c_uint] clib.fxStartTraining.restype = c.c_int # Set single user mode (training data is re-used between power cycles) clib.fxUseSavedTraining.argtypes = [c.c_uint] clib.fxUseSavedTraining.restype = c.c_int # Set multi user mode (training data is not re-used between power cycles) # Must re-train each time clib.fxDoNotUseSaveTraining.argtypes = [c.c_uint] clib.fxDoNotUseSaveTraining.restype = c.c_int # Is the device in single user mode? clib.fxIsUsingSavedTrainingData.argtypes = [c.c_uint, c.POINTER(c.c_bool)] clib.fxIsUsingSavedTrainingData.restype = c.c_int # Request updated training data from the device clib.fxUpdateTrainingData.argtypes = [c.c_uint] clib.fxUpdateTrainingData.restype = c.c_int # Training steps remaining clib.fxGetStepsRemaining.argtypes = [c.c_uint, c.POINTER(c.c_int)] clib.fxGetStepsRemaining.restype = c.c_int # Get training state clib.fxGetTrainingState.argtypes = [c.c_uint, c.POINTER(c.c_int)] clib.fxGetTrainingState.restype = c.c_int if firmwareVersion >= Version("10.0.0"): # Get max device name length clib.fxGetMaxDeviceNameLength.argtypes = [] clib.fxGetMaxDeviceNameLength.restype = c.c_int # Get device name clib.fxGetDeviceTypeNameWrapper.argtypes = [c.c_uint, c.c_char_p] clib.fxGetDeviceTypeNameWrapper.restype = c.c_int # Get max device side length clib.fxGetMaxDeviceSideNameLength.argtypes = [] clib.fxGetMaxDeviceSideNameLength.restype = c.c_int # Get side clib.fxGetDeviceSideNameWrapper.argtypes = [c.c_uint, c.c_char_p] clib.fxGetDeviceSideNameWrapper.restype = c.c_int # Get libs version clib.fxGetLibsVersion.argtypes = [ c.POINTER(c.c_uint16), c.POINTER(c.c_uint16), c.POINTER(c.c_uint16), ] clib.fxGetLibsVersion.restype = c.c_int # Get max field name length clib.fxGetMaxDataLabelLength.argtypes = [] clib.fxGetMaxDataLabelLength.restype = c.c_int # Get fields clib.fxGetDataLabelsWrapper.argtypes = [ c.c_uint, c.POINTER(c.POINTER(c.c_char)), c.POINTER(c.c_int), ] clib.fxGetDataLabelsWrapper.restype = c.c_int # Get max data elements clib.fxGetMaxDataElements.argtypes = [] clib.fxGetMaxDataElements.restype = c.c_int # Request uvlo clib.fxRequestUVLO.argtypes = [ c.c_uint, ] clib.fxRequestUVLO.restype = c.c_int # Read uvlo clib.fxGetLastReceivedUVLO.argtypes = [ c.c_uint, ] clib.fxGetLastReceivedUVLO.restype = c.c_int # Set uvlo clib.fxSetUVLO.argtypes = [c.c_uint, c.c_uint] clib.fxSetUVLO.restype = c.c_int # Somehow, it appears 10.1 doesn't have these try: # Get num utts clib.fxGetNumUtts.argtypes = [] clib.fxGetNumUtts.restype = c.c_int # Set utts clib.fxSetUTT.argtypes = [c.c_uint, c.POINTER(c.c_int), c.c_uint, c.c_byte] clib.fxSetUTT.restype = c.c_int # Reset utts clib.fxSetUTTsToDefault.argtypes = [ c.c_uint, ] clib.fxSetUTTsToDefault.restype = c.c_int # Save utts clib.fxSaveUTTToMemory.argtypes = [ c.c_uint, ] clib.fxSaveUTTToMemory.restype = c.c_int # Request utts clib.fxRequestUTT.argtypes = [ c.c_uint, ] clib.fxRequestUTT.restype = c.c_int # Get last received utts clib.fxGetLastReceivedUTT.argtypes = [ c.c_uint, c.POINTER(c.c_int), c.c_uint, ] clib.fxGetLastReceivedUTT.restype = c.c_int except AttributeError: print("Warning: could not find UTT methods in library.") # IMU Calibration clib.fxSetImuCalibration.argtypes = [ c.c_uint, ] clib.fxSetImuCalibration.restype = c.c_int return clib # ============================================ # set_read_functions # ============================================
[docs]def set_read_functions( clib: c.CDLL, deviceName: str, isLegacy: bool, deviceType: c.Structure | None ) -> c.CDLL: """ Sets the prototypes for the read and read_all functions. Done here and not with the rest of the prototypes because, for legacy devices, we need the device name, which we can't get until we call open, for which we need the function prototypes... We do it here for non-legacy devices because it's easier than passing and worrying about the firmware version to set_prototypes Parameters ---------- clib : CDLL The object on which the prototypes will be set. deviceName : str The name of the device, e.g., actpack. Used to set the correct function. isLegacy : bool Whether or not the device is a legacy device. The two types handle reading quite differently, so we need to know the type in order to set the methods appropriately. deviceType : Structure, None For legacy devices, this includes the fields and data types of the device's data. Raises ------ ValueError If the device type is unknown. Returns ------- CDLL The library object with the set prototypes. """ if isLegacy: if deviceName == "actpack": readFunc = getattr(clib, "fxReadDevice") readAllFunc = getattr(clib, "fxReadDeviceAll") # This also covers XCs, since thoes are reported as exos elif deviceName == "exo": readFunc = getattr(clib, "fxReadExoDevice") readAllFunc = getattr(clib, "fxReadExoDeviceAll") elif deviceName == "md": readFunc = getattr(clib, "fxReadMdDevice") readAllFunc = getattr(clib, "fxReadMdDeviceAll") else: raise ValueError(f"Unknown device: {deviceName}") readFunc.argtypes = [c.c_uint, c.POINTER(deviceType)] readFunc.restype = c.c_int setattr(clib, "read", readFunc) readAllFunc.argtypes = [c.c_uint, c.POINTER(deviceType), c.c_uint] readAllFunc.restype = c.c_int setattr(clib, "read_all", readAllFunc) else: readFunc = getattr(clib, "fxReadDevice") readFunc.argtypes = [c.c_uint, c.POINTER(c.c_int32), c.POINTER(c.c_int)] readFunc.restype = c.c_int setattr(clib, "read", readFunc) readAllFunc = getattr(clib, "fxReadDeviceAllWrapper") readAllFunc.argtypes = [ c.c_uint, c.POINTER(c.POINTER(c.c_int32)), c.POINTER(c.c_int), ] readAllFunc.restype = None setattr(clib, "read_all", readAllFunc) return clib