From a489d373ae64e5369cda83d16db981f1b39471ca Mon Sep 17 00:00:00 2001 From: Tjomsaas Date: Tue, 17 Dec 2024 15:10:30 +0100 Subject: [PATCH 1/7] Fixed issue where unicode exception would be thrown if a previous PPK2 session were not terminated properly due to garbage data coming along with the metadata. We now expect them and try to get metadata one more time, but this time without garbage from previous session --- src/ppk2_api/ppk2_api.py | 55 ++++++++++++++++++++++++++++++++-------- 1 file changed, 45 insertions(+), 10 deletions(-) diff --git a/src/ppk2_api/ppk2_api.py b/src/ppk2_api/ppk2_api.py index 4b39a69..d2d902d 100644 --- a/src/ppk2_api/ppk2_api.py +++ b/src/ppk2_api/ppk2_api.py @@ -153,13 +153,25 @@ def _read_metadata(self): """Read metadata""" # try to get metadata from device for _ in range(0, 5): - # it appears the second reading is the metadata read = self.ser.read(self.ser.in_waiting) time.sleep(0.1) - # TODO add a read_until serial read function with a timeout - if read != b'' and "END" in read.decode("utf-8"): - return read.decode("utf-8") + if not read: + continue # No data, try again + + # Try decoding the data + try: + metadata = read.decode("utf-8") + except UnicodeDecodeError: + # If decoding fails, try again in next iteration + continue + + # Check if the metadata is valid (i.e., contains "END") + if "END" in metadata: + return metadata + + # If we exit the loop, it means we couldn't get valid metadata + raise ValueError("Could not retrieve valid metadata from the device.") def _parse_metadata(self, metadata): """Parse metadata and store it to modifiers""" @@ -229,17 +241,40 @@ def list_devices(): ] return devices + def get_data(self): """Return readings of one sampling period""" sampling_data = self.ser.read(self.ser.in_waiting) return sampling_data - def get_modifiers(self): - """Gets and sets modifiers from device memory""" - self._write_serial((PPK2_Command.GET_META_DATA, )) - metadata = self._read_metadata() - ret = self._parse_metadata(metadata) - return ret + def get_modifiers(self, retries=2): + """ + Retrieve and parse modifiers from the device memory, with optional retries. + + In cases where the PPK2 tool did not shut down gracefully, the device may still + hold residual data from the previous session. The first GET_META_DATA command + may return a mix of valid metadata and garbage. Rather than parsing + and filtering out this garbage on the first try, issuing the GET_META_DATA command + again often yields clean data. This function will retry up to the + specified number of times before giving up. + """ + + for attempt in range(1, retries + 1): + # Send command to request metadata + self._write_serial((PPK2_Command.GET_META_DATA, )) + try: + metadata = self._read_metadata() + ret = self._parse_metadata(metadata) + print(f"Attempt {attempt}/{retries} - Got metadata from PPK2") + + return ret + except ValueError as e: + print(f"Attempt {attempt}/{retries} - Failed to get valid PPK2 metadata: {e}") + # If this wasn't the last attempt, we try again by sending GET_META_DATA again. + + + print("Failed to get modifiers after multiple attempts.") + return None def start_measuring(self): """Start continuous measurement""" From 683d0c4c5b5804d82e1ebb7919a165a7e42dd4fd Mon Sep 17 00:00:00 2001 From: Tjomsaas Date: Tue, 17 Dec 2024 15:12:03 +0100 Subject: [PATCH 2/7] Added example to reproduce/showcase error and fix. This examples should also just work after connecting to a PPK2 via USB --- example.py => examples/example.py | 0 examples/example_measure_mw.py | 97 +++++++++++++++++++++++++ example_mp.py => examples/example_mp.py | 0 3 files changed, 97 insertions(+) rename example.py => examples/example.py (100%) create mode 100644 examples/example_measure_mw.py rename example_mp.py => examples/example_mp.py (100%) diff --git a/example.py b/examples/example.py similarity index 100% rename from example.py rename to examples/example.py diff --git a/examples/example_measure_mw.py b/examples/example_measure_mw.py new file mode 100644 index 0000000..e966b1b --- /dev/null +++ b/examples/example_measure_mw.py @@ -0,0 +1,97 @@ +import time +import threading + +from src.ppk2_api.ppk2_api import PPK2_API + + +class PowerProfiler: + conversion_factor = 1_000_000 # uA to mW + + def __init__(self, voltage_mv=3700): + self.voltage_mv = voltage_mv + self.total_power_mW = 0 + self.total_samples = 0 + + self.lock = threading.Lock() + self.ppk2 = None + self.sampling_thread = None + self.sampling_enabled = False + + + def _setup_ppk2(self): + ppk2s_connected = PPK2_API.list_devices() + + # Check if we have at least one PPK2 device + if not ppk2s_connected: + raise ConnectionError("No PPK2 devices found!") + + print(ppk2s_connected) + # Just select the first available PPK2 device + for ppk2_port_tuple in ppk2s_connected: + ppk2_port = ppk2_port_tuple[0] #Just get the port part of the tuple + print(f"Connecting to {ppk2_port}") + + self.ppk2 = PPK2_API(ppk2_port, timeout=1, write_timeout=1, exclusive=True) + + ret = self.ppk2.get_modifiers() + if ret is not None: + break + + print(f"Failed to connect to {ppk2_port}") + + self.ppk2.set_source_voltage(self.voltage_mv) + self.ppk2.use_source_meter() + self.ppk2.toggle_DUT_power("ON") + + self.ppk2.start_measuring() + print("Initialized Power Profiler") + + + def _run_sampling(self): + try: + self._setup_ppk2() + while self.sampling_enabled: + time.sleep(0.01) + read_data = self.ppk2.get_data() + + if read_data == b"": + continue + + samples, raw_digital = self.ppk2.get_samples(read_data) + if not samples: + continue + + average_current_uA = sum(samples) / len(samples) + average_power_mW = (average_current_uA * self.voltage_mv) / self.conversion_factor + formatted_power = round(average_power_mW, 2) + + with self.lock: + self.total_power_mW += formatted_power + self.total_samples += 1 + average_of_averages_mW = self.total_power_mW / self.total_samples + + print(f"{formatted_power} mW, Avg: {average_of_averages_mW:.2f} mW") + + except Exception as e: + self.sampling_enabled = False + print(f"An error occurred: {e}") + + def start_sampling(self): + self.sampling_enabled = True + self.sampling_thread = threading.Thread(target=self._run_sampling, daemon=True) + self.sampling_thread.start() + + def stop_sampling(self): + self.sampling_enabled = False + self.sampling_thread.join() + + +def main(): + sampler = PowerProfiler(voltage_mv=3800) + sampler.start_sampling() + input("Press Enter to exit...\n") + sampler.stop_sampling() + + +if __name__ == "__main__": + main() diff --git a/example_mp.py b/examples/example_mp.py similarity index 100% rename from example_mp.py rename to examples/example_mp.py From 36e145a24cf46bfaebfc53a34dbd5de514689a76 Mon Sep 17 00:00:00 2001 From: Tjomsaas Date: Tue, 14 Apr 2026 15:22:31 +0200 Subject: [PATCH 3/7] Empty buffer on ppk2 init, remove retries on metadata as it was rather treating the symptom instead of the root cause --- src/ppk2_api/ppk2_api.py | 55 +++++++++++++++++++++------------------- 1 file changed, 29 insertions(+), 26 deletions(-) diff --git a/src/ppk2_api/ppk2_api.py b/src/ppk2_api/ppk2_api.py index d2d902d..1d226e1 100644 --- a/src/ppk2_api/ppk2_api.py +++ b/src/ppk2_api/ppk2_api.py @@ -55,6 +55,12 @@ def __init__(self, port: str, **kwargs): self.ser = None self.ser = serial.Serial(port, **kwargs) self.ser.baudrate = 9600 + # Give some time for the serial to connect + time.sleep(0.05) + # Residual data can be present after a bad shutdown of the PPK2. + # This can cause issues with parsing metadata and getting clean readings. + # To mitigate this, we clear the serial buffer at initialization to ensure we start with a clean slate. + self._empty_serial_buffer() self.modifiers = { "Calibrated": None, @@ -94,7 +100,7 @@ def __init__(self, port: str, **kwargs): # adc measurement buffer remainder and len of remainder self.remainder = {"sequence": b'', "len": 0} - + def __del__(self): """Destructor""" try: @@ -118,6 +124,15 @@ def _write_serial(self, cmd_tuple): except Exception as e: logging.error(f"An error occured when writing to serial port: {e}") + def _empty_serial_buffer(self): + #read until there is no more data in buffer + logging.debug("Emptying serial buffer...") + buf = 1 + while buf: + buf = self.ser.read_all() + logging.debug(f"Serial buffer cleared with {len(buf)} bytes read.") + time.sleep(0.01) + def _twos_comp(self, val): """Compute the 2's complement of int32 value""" if (val & (1 << (32 - 1))) != 0: @@ -194,7 +209,7 @@ def _parse_metadata(self, metadata): self.modifiers[key][str(ind)] = float( data_pair[1]) return True - except Exception as e: + except Exception: # if exception triggers serial port is probably not correct return None @@ -218,8 +233,8 @@ def _handle_raw_data(self, adc_value): analog_value = self.get_adc_result( current_measurement_range, adc_result) * 10**6 return analog_value, bits - except Exception as e: - print("Measurement outside of range!") + except Exception: + logging.error("Measurement outside of range!") return None, None @staticmethod @@ -247,33 +262,21 @@ def get_data(self): sampling_data = self.ser.read(self.ser.in_waiting) return sampling_data - def get_modifiers(self, retries=2): + def get_modifiers(self): """ - Retrieve and parse modifiers from the device memory, with optional retries. - - In cases where the PPK2 tool did not shut down gracefully, the device may still - hold residual data from the previous session. The first GET_META_DATA command - may return a mix of valid metadata and garbage. Rather than parsing - and filtering out this garbage on the first try, issuing the GET_META_DATA command - again often yields clean data. This function will retry up to the - specified number of times before giving up. + Retrieve and parse modifiers from the device memory. """ - for attempt in range(1, retries + 1): - # Send command to request metadata - self._write_serial((PPK2_Command.GET_META_DATA, )) - try: - metadata = self._read_metadata() - ret = self._parse_metadata(metadata) - print(f"Attempt {attempt}/{retries} - Got metadata from PPK2") - - return ret - except ValueError as e: - print(f"Attempt {attempt}/{retries} - Failed to get valid PPK2 metadata: {e}") - # If this wasn't the last attempt, we try again by sending GET_META_DATA again. + # Send command to request metadata + self._write_serial((PPK2_Command.GET_META_DATA, )) + try: + metadata = self._read_metadata() + ret = self._parse_metadata(metadata) + return ret + except ValueError as e: + raise ValueError(f"Failed to read valid metadata: {e}") - print("Failed to get modifiers after multiple attempts.") return None def start_measuring(self): From bd2a0da120348edc4b45fe651bcc1a4701f68fad Mon Sep 17 00:00:00 2001 From: Tjomsaas Date: Tue, 14 Apr 2026 15:23:26 +0200 Subject: [PATCH 4/7] Add retries on connection as a bad exit can cause the serial port to not be immidiatly discovered --- examples/example_measure_mw.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/examples/example_measure_mw.py b/examples/example_measure_mw.py index e966b1b..7fb808a 100644 --- a/examples/example_measure_mw.py +++ b/examples/example_measure_mw.py @@ -17,11 +17,16 @@ def __init__(self, voltage_mv=3700): self.sampling_thread = None self.sampling_enabled = False + def _connect_ppk2(self): + ppk2s_connected = [] + #After reset, the PPK2 can take a moment to reappear on the system. We wait for it to show up for a few seconds before giving up. + for _ in range(10): + ppk2s_connected = PPK2_API.list_devices() + print(f"PPK2 devices found: {ppk2s_connected}") + if ppk2s_connected: + break + time.sleep(0.5) - def _setup_ppk2(self): - ppk2s_connected = PPK2_API.list_devices() - - # Check if we have at least one PPK2 device if not ppk2s_connected: raise ConnectionError("No PPK2 devices found!") @@ -38,7 +43,11 @@ def _setup_ppk2(self): break print(f"Failed to connect to {ppk2_port}") + + def _setup_ppk2(self): + self._connect_ppk2() + self.ppk2.set_source_voltage(self.voltage_mv) self.ppk2.use_source_meter() self.ppk2.toggle_DUT_power("ON") From 2b12d1db703b074807b9f0cd2d74159a5f6cb986 Mon Sep 17 00:00:00 2001 From: Tjomsaas Date: Tue, 14 Apr 2026 15:38:49 +0200 Subject: [PATCH 5/7] Remove logging and raise exceptions instead. Calling logger.*(message) messes up with higher level application loggers. The application using the ppk2 lib should decide what to do with the exceptions and thereby log what they need. --- src/ppk2_api/ppk2_api.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/src/ppk2_api/ppk2_api.py b/src/ppk2_api/ppk2_api.py index 1d226e1..872c045 100644 --- a/src/ppk2_api/ppk2_api.py +++ b/src/ppk2_api/ppk2_api.py @@ -7,7 +7,6 @@ import time import serial import struct -import logging import os import queue import threading @@ -109,8 +108,9 @@ def __del__(self): if self.ser: self.ser.close() - except Exception as e: - logging.error(f"An error occured while closing ppk2_api: {e}") + except Exception: + # Destructors should not raise; cleanup is best-effort here. + pass def _pack_struct(self, cmd_tuple): """Returns packed struct""" @@ -118,19 +118,14 @@ def _pack_struct(self, cmd_tuple): def _write_serial(self, cmd_tuple): """Writes cmd bytes to serial""" - try: - cmd_packed = self._pack_struct(cmd_tuple) - self.ser.write(cmd_packed) - except Exception as e: - logging.error(f"An error occured when writing to serial port: {e}") + cmd_packed = self._pack_struct(cmd_tuple) + self.ser.write(cmd_packed) def _empty_serial_buffer(self): #read until there is no more data in buffer - logging.debug("Emptying serial buffer...") buf = 1 while buf: buf = self.ser.read_all() - logging.debug(f"Serial buffer cleared with {len(buf)} bytes read.") time.sleep(0.01) def _twos_comp(self, val): @@ -234,7 +229,6 @@ def _handle_raw_data(self, adc_value): current_measurement_range, adc_result) * 10**6 return analog_value, bits except Exception: - logging.error("Measurement outside of range!") return None, None @staticmethod From df7706f48da4cde4743a7c176d1eaf763768dd33 Mon Sep 17 00:00:00 2001 From: Tjomsaas Date: Tue, 14 Apr 2026 15:48:28 +0200 Subject: [PATCH 6/7] Fixed borken api call in power_profiler.py --- src/power_profiler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/power_profiler.py b/src/power_profiler.py index 7af0f6f..77e545c 100644 --- a/src/power_profiler.py +++ b/src/power_profiler.py @@ -127,7 +127,7 @@ def measurement_loop(self): if self.measuring: # read data if currently measuring read_data = self.ppk2.get_data() if read_data != b'': - samples = self.ppk2.get_samples(read_data) + samples, raw_digital = self.ppk2.get_samples(read_data) self.current_measurements += samples # can easily sum lists, will append individual data time.sleep(0.001) # TODO figure out correct sleep duration @@ -192,4 +192,4 @@ def get_average_charge_mC(self): def get_measurement_duration_s(self): """Returns duration of measurement""" measurement_duration_s = (self.measurement_stop_time - self.measurement_start_time) # measurement duration in seconds - return measurement_duration_s \ No newline at end of file + return measurement_duration_s From 047c3066dd4132d6b65f5dd3fb778e40a06e76e4 Mon Sep 17 00:00:00 2001 From: Tjomsaas Date: Tue, 14 Apr 2026 16:09:31 +0200 Subject: [PATCH 7/7] Harden API by adding types to functions --- examples/example_measure_mw.py | 17 +++--- src/power_profiler.py | 53 +++++++++---------- src/ppk2_api/ppk2_api.py | 95 +++++++++++++++++----------------- 3 files changed, 81 insertions(+), 84 deletions(-) diff --git a/examples/example_measure_mw.py b/examples/example_measure_mw.py index 7fb808a..97f3f56 100644 --- a/examples/example_measure_mw.py +++ b/examples/example_measure_mw.py @@ -35,14 +35,15 @@ def _connect_ppk2(self): for ppk2_port_tuple in ppk2s_connected: ppk2_port = ppk2_port_tuple[0] #Just get the port part of the tuple print(f"Connecting to {ppk2_port}") - - self.ppk2 = PPK2_API(ppk2_port, timeout=1, write_timeout=1, exclusive=True) - - ret = self.ppk2.get_modifiers() - if ret is not None: - break - - print(f"Failed to connect to {ppk2_port}") + try: + self.ppk2 = PPK2_API(ppk2_port, timeout=1, write_timeout=1, exclusive=True) + self.ppk2.get_modifiers() + return + except Exception: + print(f"Failed to connect to {ppk2_port}") + self.ppk2 = None + + raise ConnectionError("Could not initialize any detected PPK2 device.") def _setup_ppk2(self): diff --git a/src/power_profiler.py b/src/power_profiler.py index 77e545c..30d2810 100644 --- a/src/power_profiler.py +++ b/src/power_profiler.py @@ -26,48 +26,43 @@ def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None): self.ppk2 = PPK2_API(serial_port) try: - ret = self.ppk2.get_modifiers() # try to read modifiers, if it fails serial port is probably not correct - print(f"Initialized ppk2 api: {ret}") + self.ppk2.get_modifiers() # try to read modifiers, if it fails serial port is probably not correct + print("Initialized ppk2 api") except Exception as e: print(f"Error initializing power profiler: {e}") - ret = None raise e - if not ret: - self.ppk2 = None - raise Exception(f"Error when initing PowerProfiler with serial port {serial_port}") - else: - self.ppk2.use_source_meter() + self.ppk2.use_source_meter() - self.source_voltage_mV = source_voltage_mV + self.source_voltage_mV = source_voltage_mV - self.ppk2.set_source_voltage(self.source_voltage_mV) # set to 3.3V + self.ppk2.set_source_voltage(self.source_voltage_mV) # set to 3.3V - print(f"Set power profiler source voltage: {self.source_voltage_mV}") + print(f"Set power profiler source voltage: {self.source_voltage_mV}") - self.measuring = False - self.current_measurements = [] + self.measuring = False + self.current_measurements = [] - # local variables used to calculate power consumption - self.measurement_start_time = None - self.measurement_stop_time = None + # local variables used to calculate power consumption + self.measurement_start_time = None + self.measurement_stop_time = None - time.sleep(1) + time.sleep(1) - self.stop = False + self.stop = False - self.measurement_thread = Thread(target=self.measurement_loop, daemon=True) - self.measurement_thread.start() + self.measurement_thread = Thread(target=self.measurement_loop, daemon=True) + self.measurement_thread.start() - # write to csv - self.filename = filename - if self.filename is not None: - with open(self.filename, 'w', newline='') as file: - writer = csv.writer(file) - row = [] - for key in ["ts", "avg1000"]: - row.append(key) - writer.writerow(row) + # write to csv + self.filename = filename + if self.filename is not None: + with open(self.filename, 'w', newline='') as file: + writer = csv.writer(file) + row = [] + for key in ["ts", "avg1000"]: + row.append(key) + writer.writerow(row) def write_csv_rows(self, samples): """Write csv row""" diff --git a/src/ppk2_api/ppk2_api.py b/src/ppk2_api/ppk2_api.py index 872c045..c6e5236 100644 --- a/src/ppk2_api/ppk2_api.py +++ b/src/ppk2_api/ppk2_api.py @@ -10,6 +10,12 @@ import os import queue import threading +from typing import Dict, List, Optional, Tuple + +DeviceInfo = Tuple[str, str] +Mask = Dict[str, int] +RawMeasurement = Tuple[Optional[float], Optional[int]] +SampleBatch = Tuple[List[float], List[int]] class PPK2_Command(): """Serial command opcodes""" @@ -45,7 +51,7 @@ class PPK2_Modes(): class PPK2_API(): - def __init__(self, port: str, **kwargs): + def __init__(self, port: str, **kwargs) -> None: ''' port - port where PPK2 is connected **kwargs - keyword arguments to pass to the pySerial constructor @@ -100,7 +106,7 @@ def __init__(self, port: str, **kwargs): # adc measurement buffer remainder and len of remainder self.remainder = {"sequence": b'', "len": 0} - def __del__(self): + def __del__(self) -> None: """Destructor""" try: # reset device @@ -112,29 +118,29 @@ def __del__(self): # Destructors should not raise; cleanup is best-effort here. pass - def _pack_struct(self, cmd_tuple): + def _pack_struct(self, cmd_tuple: Tuple[int, ...]) -> bytes: """Returns packed struct""" return struct.pack("B" * len(cmd_tuple), *cmd_tuple) - def _write_serial(self, cmd_tuple): + def _write_serial(self, cmd_tuple: Tuple[int, ...]) -> None: """Writes cmd bytes to serial""" cmd_packed = self._pack_struct(cmd_tuple) self.ser.write(cmd_packed) - def _empty_serial_buffer(self): + def _empty_serial_buffer(self) -> None: #read until there is no more data in buffer buf = 1 while buf: buf = self.ser.read_all() time.sleep(0.01) - def _twos_comp(self, val): + def _twos_comp(self, val: int) -> int: """Compute the 2's complement of int32 value""" if (val & (1 << (32 - 1))) != 0: val = val - (1 << 32) # compute negative value return val - def _convert_source_voltage(self, mV): + def _convert_source_voltage(self, mV: int) -> Tuple[int, int]: """Convert input voltage to device command""" # minimal possible mV is 800 if mV < self.vdd_low: @@ -159,7 +165,7 @@ def _convert_source_voltage(self, mV): return set_b_1, set_b_2 - def _read_metadata(self): + def _read_metadata(self) -> str: """Read metadata""" # try to get metadata from device for _ in range(0, 5): @@ -183,7 +189,7 @@ def _read_metadata(self): # If we exit the loop, it means we couldn't get valid metadata raise ValueError("Could not retrieve valid metadata from the device.") - def _parse_metadata(self, metadata): + def _parse_metadata(self, metadata: str) -> None: """Parse metadata and store it to modifiers""" # TODO handle more robustly try: @@ -203,22 +209,20 @@ def _parse_metadata(self, metadata): else: self.modifiers[key][str(ind)] = float( data_pair[1]) - return True - except Exception: - # if exception triggers serial port is probably not correct - return None + except Exception as e: + raise ValueError("Could not parse metadata.") from e - def _generate_mask(self, bits, pos): + def _generate_mask(self, bits: int, pos: int) -> Mask: pos = pos mask = ((2**bits-1) << pos) mask = self._twos_comp(mask) return {"mask": mask, "pos": pos} - def _get_masked_value(self, value, meas, is_bits=False): + def _get_masked_value(self, value: int, meas: Mask, is_bits: bool=False) -> int: masked_value = (value & meas["mask"]) >> meas["pos"] return masked_value - def _handle_raw_data(self, adc_value): + def _handle_raw_data(self, adc_value: int) -> RawMeasurement: """Convert raw value to analog value""" try: current_measurement_range = min(self._get_masked_value( @@ -232,31 +236,31 @@ def _handle_raw_data(self, adc_value): return None, None @staticmethod - def list_devices(): + def list_devices() -> List[DeviceInfo]: import serial.tools.list_ports ports = serial.tools.list_ports.comports() if os.name == "nt": devices = [ - (port.device, port.serial_number[:8]) + (port.device, (port.serial_number or "")[:8]) for port in ports - if port.description.startswith("nRF Connect USB CDC ACM") and port.location.endswith("1") + if (port.description or "").startswith("nRF Connect USB CDC ACM") and (port.location or "").endswith("1") ] else: devices = [ - (port.device, port.serial_number[:8]) + (port.device, (port.serial_number or "")[:8]) for port in ports - if port.product == "PPK2" and port.location.endswith("1") + if (port.product or "") == "PPK2" and (port.location or "").endswith("1") ] return devices - def get_data(self): + def get_data(self) -> bytes: """Return readings of one sampling period""" sampling_data = self.ser.read(self.ser.in_waiting) return sampling_data - def get_modifiers(self): + def get_modifiers(self) -> None: """ Retrieve and parse modifiers from the device memory. """ @@ -264,16 +268,13 @@ def get_modifiers(self): # Send command to request metadata self._write_serial((PPK2_Command.GET_META_DATA, )) try: - metadata = self._read_metadata() - ret = self._parse_metadata(metadata) - return ret + metadata = self._read_metadata() + self._parse_metadata(metadata) except ValueError as e: - raise ValueError(f"Failed to read valid metadata: {e}") + raise ValueError(f"Failed to read valid metadata: {e}") from e - return None - - def start_measuring(self): + def start_measuring(self) -> None: """Start continuous measurement""" if not self.current_vdd: if self.mode == PPK2_Modes.SOURCE_MODE: @@ -283,11 +284,11 @@ def start_measuring(self): self._write_serial((PPK2_Command.AVERAGE_START, )) - def stop_measuring(self): + def stop_measuring(self) -> None: """Stop continuous measurement""" self._write_serial((PPK2_Command.AVERAGE_STOP, )) - def set_source_voltage(self, mV): + def set_source_voltage(self, mV: int) -> None: """Inits device - based on observation only REGULATOR_SET is the command. The other two values correspond to the voltage level. @@ -297,7 +298,7 @@ def set_source_voltage(self, mV): self._write_serial((PPK2_Command.REGULATOR_SET, b_1, b_2)) self.current_vdd = mV - def toggle_DUT_power(self, state): + def toggle_DUT_power(self, state: str) -> None: """Toggle DUT power based on parameter""" if state == "ON": self._write_serial( @@ -307,19 +308,19 @@ def toggle_DUT_power(self, state): self._write_serial( (PPK2_Command.DEVICE_RUNNING_SET, PPK2_Command.NO_OP)) # 12,0 - def use_ampere_meter(self): + def use_ampere_meter(self) -> None: """Configure device to use ampere meter""" self.mode = PPK2_Modes.AMPERE_MODE self._write_serial((PPK2_Command.SET_POWER_MODE, PPK2_Command.TRIGGER_SET)) # 17,1 - def use_source_meter(self): + def use_source_meter(self) -> None: """Configure device to use source meter""" self.mode = PPK2_Modes.SOURCE_MODE self._write_serial((PPK2_Command.SET_POWER_MODE, PPK2_Command.AVG_NUM_SET)) # 17,2 - def get_adc_result(self, current_range, adc_value): + def get_adc_result(self, current_range: int, adc_value: int) -> float: """Get result of adc conversion""" current_range = str(current_range) result_without_gain = (adc_value - self.modifiers["O"][current_range]) * ( @@ -364,11 +365,11 @@ def get_adc_result(self, current_range, adc_value): self.prev_range = current_range return adc - def _digital_to_analog(self, adc_value): + def _digital_to_analog(self, adc_value: bytes) -> int: """Convert discrete value to analog value""" return int.from_bytes(adc_value, byteorder="little", signed=False) # convert reading to analog value - def digital_channels(self, bits): + def digital_channels(self, bits: List[int]) -> List[List[int]]: """ Convert raw digital data to digital channels. @@ -388,7 +389,7 @@ def digital_channels(self, bits): digital_channels[7].append((sample & 128) >> 7) return digital_channels - def get_samples(self, buf): + def get_samples(self, buf: bytes) -> SampleBatch: """ Returns list of samples read in one sampling period. The number of sampled values depends on the delay between serial reads. @@ -434,7 +435,7 @@ class PPK_Fetch(threading.Thread): ''' Background process for polling the data in multi-threaded variant ''' - def __init__(self, ppk2, quit_evt, buffer_len_s=10, buffer_chunk_s=0.5): + def __init__(self, ppk2: "PPK2_API", quit_evt: threading.Event, buffer_len_s: float=10, buffer_chunk_s: float=0.5) -> None: super().__init__() self._ppk2 = ppk2 self._quit = quit_evt @@ -454,7 +455,7 @@ def __init__(self, ppk2, quit_evt, buffer_len_s=10, buffer_chunk_s=0.5): self._buffer_q = queue.Queue() - def run(self): + def run(self) -> None: s = 0 t = time.time() local_buffer = b'' @@ -489,7 +490,7 @@ def run(self): except queue.Empty: break - def get_data(self): + def get_data(self) -> bytes: ret = b'' count = 0 while True: @@ -506,7 +507,7 @@ class PPK2_MP(PPK2_API): Multiprocessing variant of the object. The interface is the same as for the regular one except it spawns a background process on start_measuring() ''' - def __init__(self, port, buffer_max_size_seconds=10, buffer_chunk_seconds=0.1, **kwargs): + def __init__(self, port: str, buffer_max_size_seconds: float=10, buffer_chunk_seconds: float=0.1, **kwargs) -> None: ''' port - port where PPK2 is connected buffer_max_size_seconds - how many seconds of data to keep in the buffer @@ -520,7 +521,7 @@ def __init__(self, port, buffer_max_size_seconds=10, buffer_chunk_seconds=0.1, * self._buffer_max_size_seconds = buffer_max_size_seconds self._buffer_chunk_seconds = buffer_chunk_seconds - def __del__(self): + def __del__(self) -> None: """Destructor""" PPK2_API.stop_measuring(self) self._quit_evt.clear() @@ -531,7 +532,7 @@ def __del__(self): self._fetcher = None del self._fetcher - def start_measuring(self): + def start_measuring(self) -> None: # discard the data in the buffer self.stop_measuring() while self.get_data()!=b'': @@ -545,7 +546,7 @@ def start_measuring(self): self._fetcher = PPK_Fetch(self, self._quit_evt, self._buffer_max_size_seconds, self._buffer_chunk_seconds) self._fetcher.start() - def stop_measuring(self): + def stop_measuring(self) -> None: PPK2_API.stop_measuring(self) self.get_data() # flush the serial buffer (to prevent unicode error on next command) self._quit_evt.set() @@ -553,7 +554,7 @@ def stop_measuring(self): self._fetcher.join() # join() will block if the queue isn't empty self._fetcher = None - def get_data(self): + def get_data(self) -> bytes: try: return self._fetcher.get_data() except (TypeError, AttributeError):