diff --git a/example.py b/examples/example.py similarity index 100% rename from example.py rename to examples/example.py diff --git a/examples/example_measure_mw.py b/examples/example_measure_mw.py new file mode 100644 index 0000000..97f3f56 --- /dev/null +++ b/examples/example_measure_mw.py @@ -0,0 +1,107 @@ +import time +import threading + +from src.ppk2_api.ppk2_api import PPK2_API + + +class PowerProfiler: + conversion_factor = 1_000_000 # uA to mW + + def __init__(self, voltage_mv=3700): + self.voltage_mv = voltage_mv + self.total_power_mW = 0 + self.total_samples = 0 + + self.lock = threading.Lock() + self.ppk2 = None + self.sampling_thread = None + self.sampling_enabled = False + + def _connect_ppk2(self): + ppk2s_connected = [] + #After reset, the PPK2 can take a moment to reappear on the system. We wait for it to show up for a few seconds before giving up. + for _ in range(10): + ppk2s_connected = PPK2_API.list_devices() + print(f"PPK2 devices found: {ppk2s_connected}") + if ppk2s_connected: + break + time.sleep(0.5) + + if not ppk2s_connected: + raise ConnectionError("No PPK2 devices found!") + + print(ppk2s_connected) + # Just select the first available PPK2 device + for ppk2_port_tuple in ppk2s_connected: + ppk2_port = ppk2_port_tuple[0] #Just get the port part of the tuple + print(f"Connecting to {ppk2_port}") + try: + self.ppk2 = PPK2_API(ppk2_port, timeout=1, write_timeout=1, exclusive=True) + self.ppk2.get_modifiers() + return + except Exception: + print(f"Failed to connect to {ppk2_port}") + self.ppk2 = None + + raise ConnectionError("Could not initialize any detected PPK2 device.") + + + def _setup_ppk2(self): + self._connect_ppk2() + + self.ppk2.set_source_voltage(self.voltage_mv) + self.ppk2.use_source_meter() + self.ppk2.toggle_DUT_power("ON") + + self.ppk2.start_measuring() + print("Initialized Power Profiler") + + + def _run_sampling(self): + try: + self._setup_ppk2() + while self.sampling_enabled: + time.sleep(0.01) + read_data = self.ppk2.get_data() + + if read_data == b"": + continue + + samples, raw_digital = self.ppk2.get_samples(read_data) + if not samples: + continue + + average_current_uA = sum(samples) / len(samples) + average_power_mW = (average_current_uA * self.voltage_mv) / self.conversion_factor + formatted_power = round(average_power_mW, 2) + + with self.lock: + self.total_power_mW += formatted_power + self.total_samples += 1 + average_of_averages_mW = self.total_power_mW / self.total_samples + + print(f"{formatted_power} mW, Avg: {average_of_averages_mW:.2f} mW") + + except Exception as e: + self.sampling_enabled = False + print(f"An error occurred: {e}") + + def start_sampling(self): + self.sampling_enabled = True + self.sampling_thread = threading.Thread(target=self._run_sampling, daemon=True) + self.sampling_thread.start() + + def stop_sampling(self): + self.sampling_enabled = False + self.sampling_thread.join() + + +def main(): + sampler = PowerProfiler(voltage_mv=3800) + sampler.start_sampling() + input("Press Enter to exit...\n") + sampler.stop_sampling() + + +if __name__ == "__main__": + main() diff --git a/example_mp.py b/examples/example_mp.py similarity index 100% rename from example_mp.py rename to examples/example_mp.py diff --git a/src/power_profiler.py b/src/power_profiler.py index 7af0f6f..30d2810 100644 --- a/src/power_profiler.py +++ b/src/power_profiler.py @@ -26,48 +26,43 @@ def __init__(self, serial_port=None, source_voltage_mV=3300, filename=None): self.ppk2 = PPK2_API(serial_port) try: - ret = self.ppk2.get_modifiers() # try to read modifiers, if it fails serial port is probably not correct - print(f"Initialized ppk2 api: {ret}") + self.ppk2.get_modifiers() # try to read modifiers, if it fails serial port is probably not correct + print("Initialized ppk2 api") except Exception as e: print(f"Error initializing power profiler: {e}") - ret = None raise e - if not ret: - self.ppk2 = None - raise Exception(f"Error when initing PowerProfiler with serial port {serial_port}") - else: - self.ppk2.use_source_meter() + self.ppk2.use_source_meter() - self.source_voltage_mV = source_voltage_mV + self.source_voltage_mV = source_voltage_mV - self.ppk2.set_source_voltage(self.source_voltage_mV) # set to 3.3V + self.ppk2.set_source_voltage(self.source_voltage_mV) # set to 3.3V - print(f"Set power profiler source voltage: {self.source_voltage_mV}") + print(f"Set power profiler source voltage: {self.source_voltage_mV}") - self.measuring = False - self.current_measurements = [] + self.measuring = False + self.current_measurements = [] - # local variables used to calculate power consumption - self.measurement_start_time = None - self.measurement_stop_time = None + # local variables used to calculate power consumption + self.measurement_start_time = None + self.measurement_stop_time = None - time.sleep(1) + time.sleep(1) - self.stop = False + self.stop = False - self.measurement_thread = Thread(target=self.measurement_loop, daemon=True) - self.measurement_thread.start() + self.measurement_thread = Thread(target=self.measurement_loop, daemon=True) + self.measurement_thread.start() - # write to csv - self.filename = filename - if self.filename is not None: - with open(self.filename, 'w', newline='') as file: - writer = csv.writer(file) - row = [] - for key in ["ts", "avg1000"]: - row.append(key) - writer.writerow(row) + # write to csv + self.filename = filename + if self.filename is not None: + with open(self.filename, 'w', newline='') as file: + writer = csv.writer(file) + row = [] + for key in ["ts", "avg1000"]: + row.append(key) + writer.writerow(row) def write_csv_rows(self, samples): """Write csv row""" @@ -127,7 +122,7 @@ def measurement_loop(self): if self.measuring: # read data if currently measuring read_data = self.ppk2.get_data() if read_data != b'': - samples = self.ppk2.get_samples(read_data) + samples, raw_digital = self.ppk2.get_samples(read_data) self.current_measurements += samples # can easily sum lists, will append individual data time.sleep(0.001) # TODO figure out correct sleep duration @@ -192,4 +187,4 @@ def get_average_charge_mC(self): def get_measurement_duration_s(self): """Returns duration of measurement""" measurement_duration_s = (self.measurement_stop_time - self.measurement_start_time) # measurement duration in seconds - return measurement_duration_s \ No newline at end of file + return measurement_duration_s diff --git a/src/ppk2_api/ppk2_api.py b/src/ppk2_api/ppk2_api.py index 4b39a69..c6e5236 100644 --- a/src/ppk2_api/ppk2_api.py +++ b/src/ppk2_api/ppk2_api.py @@ -7,10 +7,15 @@ import time import serial import struct -import logging import os import queue import threading +from typing import Dict, List, Optional, Tuple + +DeviceInfo = Tuple[str, str] +Mask = Dict[str, int] +RawMeasurement = Tuple[Optional[float], Optional[int]] +SampleBatch = Tuple[List[float], List[int]] class PPK2_Command(): """Serial command opcodes""" @@ -46,7 +51,7 @@ class PPK2_Modes(): class PPK2_API(): - def __init__(self, port: str, **kwargs): + def __init__(self, port: str, **kwargs) -> None: ''' port - port where PPK2 is connected **kwargs - keyword arguments to pass to the pySerial constructor @@ -55,6 +60,12 @@ def __init__(self, port: str, **kwargs): self.ser = None self.ser = serial.Serial(port, **kwargs) self.ser.baudrate = 9600 + # Give some time for the serial to connect + time.sleep(0.05) + # Residual data can be present after a bad shutdown of the PPK2. + # This can cause issues with parsing metadata and getting clean readings. + # To mitigate this, we clear the serial buffer at initialization to ensure we start with a clean slate. + self._empty_serial_buffer() self.modifiers = { "Calibrated": None, @@ -94,8 +105,8 @@ def __init__(self, port: str, **kwargs): # adc measurement buffer remainder and len of remainder self.remainder = {"sequence": b'', "len": 0} - - def __del__(self): + + def __del__(self) -> None: """Destructor""" try: # reset device @@ -103,28 +114,33 @@ def __del__(self): if self.ser: self.ser.close() - except Exception as e: - logging.error(f"An error occured while closing ppk2_api: {e}") + except Exception: + # Destructors should not raise; cleanup is best-effort here. + pass - def _pack_struct(self, cmd_tuple): + def _pack_struct(self, cmd_tuple: Tuple[int, ...]) -> bytes: """Returns packed struct""" return struct.pack("B" * len(cmd_tuple), *cmd_tuple) - def _write_serial(self, cmd_tuple): + def _write_serial(self, cmd_tuple: Tuple[int, ...]) -> None: """Writes cmd bytes to serial""" - try: - cmd_packed = self._pack_struct(cmd_tuple) - self.ser.write(cmd_packed) - except Exception as e: - logging.error(f"An error occured when writing to serial port: {e}") + cmd_packed = self._pack_struct(cmd_tuple) + self.ser.write(cmd_packed) + + def _empty_serial_buffer(self) -> None: + #read until there is no more data in buffer + buf = 1 + while buf: + buf = self.ser.read_all() + time.sleep(0.01) - def _twos_comp(self, val): + def _twos_comp(self, val: int) -> int: """Compute the 2's complement of int32 value""" if (val & (1 << (32 - 1))) != 0: val = val - (1 << 32) # compute negative value return val - def _convert_source_voltage(self, mV): + def _convert_source_voltage(self, mV: int) -> Tuple[int, int]: """Convert input voltage to device command""" # minimal possible mV is 800 if mV < self.vdd_low: @@ -149,19 +165,31 @@ def _convert_source_voltage(self, mV): return set_b_1, set_b_2 - def _read_metadata(self): + def _read_metadata(self) -> str: """Read metadata""" # try to get metadata from device for _ in range(0, 5): - # it appears the second reading is the metadata read = self.ser.read(self.ser.in_waiting) time.sleep(0.1) - # TODO add a read_until serial read function with a timeout - if read != b'' and "END" in read.decode("utf-8"): - return read.decode("utf-8") + if not read: + continue # No data, try again + + # Try decoding the data + try: + metadata = read.decode("utf-8") + except UnicodeDecodeError: + # If decoding fails, try again in next iteration + continue - def _parse_metadata(self, metadata): + # Check if the metadata is valid (i.e., contains "END") + if "END" in metadata: + return metadata + + # If we exit the loop, it means we couldn't get valid metadata + raise ValueError("Could not retrieve valid metadata from the device.") + + def _parse_metadata(self, metadata: str) -> None: """Parse metadata and store it to modifiers""" # TODO handle more robustly try: @@ -181,22 +209,20 @@ def _parse_metadata(self, metadata): else: self.modifiers[key][str(ind)] = float( data_pair[1]) - return True except Exception as e: - # if exception triggers serial port is probably not correct - return None + raise ValueError("Could not parse metadata.") from e - def _generate_mask(self, bits, pos): + def _generate_mask(self, bits: int, pos: int) -> Mask: pos = pos mask = ((2**bits-1) << pos) mask = self._twos_comp(mask) return {"mask": mask, "pos": pos} - def _get_masked_value(self, value, meas, is_bits=False): + def _get_masked_value(self, value: int, meas: Mask, is_bits: bool=False) -> int: masked_value = (value & meas["mask"]) >> meas["pos"] return masked_value - def _handle_raw_data(self, adc_value): + def _handle_raw_data(self, adc_value: int) -> RawMeasurement: """Convert raw value to analog value""" try: current_measurement_range = min(self._get_masked_value( @@ -206,42 +232,49 @@ def _handle_raw_data(self, adc_value): analog_value = self.get_adc_result( current_measurement_range, adc_result) * 10**6 return analog_value, bits - except Exception as e: - print("Measurement outside of range!") + except Exception: return None, None @staticmethod - def list_devices(): + def list_devices() -> List[DeviceInfo]: import serial.tools.list_ports ports = serial.tools.list_ports.comports() if os.name == "nt": devices = [ - (port.device, port.serial_number[:8]) + (port.device, (port.serial_number or "")[:8]) for port in ports - if port.description.startswith("nRF Connect USB CDC ACM") and port.location.endswith("1") + if (port.description or "").startswith("nRF Connect USB CDC ACM") and (port.location or "").endswith("1") ] else: devices = [ - (port.device, port.serial_number[:8]) + (port.device, (port.serial_number or "")[:8]) for port in ports - if port.product == "PPK2" and port.location.endswith("1") + if (port.product or "") == "PPK2" and (port.location or "").endswith("1") ] return devices - def get_data(self): + + def get_data(self) -> bytes: """Return readings of one sampling period""" sampling_data = self.ser.read(self.ser.in_waiting) return sampling_data - def get_modifiers(self): - """Gets and sets modifiers from device memory""" + def get_modifiers(self) -> None: + """ + Retrieve and parse modifiers from the device memory. + """ + + # Send command to request metadata self._write_serial((PPK2_Command.GET_META_DATA, )) - metadata = self._read_metadata() - ret = self._parse_metadata(metadata) - return ret + try: + metadata = self._read_metadata() + self._parse_metadata(metadata) + except ValueError as e: + raise ValueError(f"Failed to read valid metadata: {e}") from e - def start_measuring(self): + + def start_measuring(self) -> None: """Start continuous measurement""" if not self.current_vdd: if self.mode == PPK2_Modes.SOURCE_MODE: @@ -251,11 +284,11 @@ def start_measuring(self): self._write_serial((PPK2_Command.AVERAGE_START, )) - def stop_measuring(self): + def stop_measuring(self) -> None: """Stop continuous measurement""" self._write_serial((PPK2_Command.AVERAGE_STOP, )) - def set_source_voltage(self, mV): + def set_source_voltage(self, mV: int) -> None: """Inits device - based on observation only REGULATOR_SET is the command. The other two values correspond to the voltage level. @@ -265,7 +298,7 @@ def set_source_voltage(self, mV): self._write_serial((PPK2_Command.REGULATOR_SET, b_1, b_2)) self.current_vdd = mV - def toggle_DUT_power(self, state): + def toggle_DUT_power(self, state: str) -> None: """Toggle DUT power based on parameter""" if state == "ON": self._write_serial( @@ -275,19 +308,19 @@ def toggle_DUT_power(self, state): self._write_serial( (PPK2_Command.DEVICE_RUNNING_SET, PPK2_Command.NO_OP)) # 12,0 - def use_ampere_meter(self): + def use_ampere_meter(self) -> None: """Configure device to use ampere meter""" self.mode = PPK2_Modes.AMPERE_MODE self._write_serial((PPK2_Command.SET_POWER_MODE, PPK2_Command.TRIGGER_SET)) # 17,1 - def use_source_meter(self): + def use_source_meter(self) -> None: """Configure device to use source meter""" self.mode = PPK2_Modes.SOURCE_MODE self._write_serial((PPK2_Command.SET_POWER_MODE, PPK2_Command.AVG_NUM_SET)) # 17,2 - def get_adc_result(self, current_range, adc_value): + def get_adc_result(self, current_range: int, adc_value: int) -> float: """Get result of adc conversion""" current_range = str(current_range) result_without_gain = (adc_value - self.modifiers["O"][current_range]) * ( @@ -332,11 +365,11 @@ def get_adc_result(self, current_range, adc_value): self.prev_range = current_range return adc - def _digital_to_analog(self, adc_value): + def _digital_to_analog(self, adc_value: bytes) -> int: """Convert discrete value to analog value""" return int.from_bytes(adc_value, byteorder="little", signed=False) # convert reading to analog value - def digital_channels(self, bits): + def digital_channels(self, bits: List[int]) -> List[List[int]]: """ Convert raw digital data to digital channels. @@ -356,7 +389,7 @@ def digital_channels(self, bits): digital_channels[7].append((sample & 128) >> 7) return digital_channels - def get_samples(self, buf): + def get_samples(self, buf: bytes) -> SampleBatch: """ Returns list of samples read in one sampling period. The number of sampled values depends on the delay between serial reads. @@ -402,7 +435,7 @@ class PPK_Fetch(threading.Thread): ''' Background process for polling the data in multi-threaded variant ''' - def __init__(self, ppk2, quit_evt, buffer_len_s=10, buffer_chunk_s=0.5): + def __init__(self, ppk2: "PPK2_API", quit_evt: threading.Event, buffer_len_s: float=10, buffer_chunk_s: float=0.5) -> None: super().__init__() self._ppk2 = ppk2 self._quit = quit_evt @@ -422,7 +455,7 @@ def __init__(self, ppk2, quit_evt, buffer_len_s=10, buffer_chunk_s=0.5): self._buffer_q = queue.Queue() - def run(self): + def run(self) -> None: s = 0 t = time.time() local_buffer = b'' @@ -457,7 +490,7 @@ def run(self): except queue.Empty: break - def get_data(self): + def get_data(self) -> bytes: ret = b'' count = 0 while True: @@ -474,7 +507,7 @@ class PPK2_MP(PPK2_API): Multiprocessing variant of the object. The interface is the same as for the regular one except it spawns a background process on start_measuring() ''' - def __init__(self, port, buffer_max_size_seconds=10, buffer_chunk_seconds=0.1, **kwargs): + def __init__(self, port: str, buffer_max_size_seconds: float=10, buffer_chunk_seconds: float=0.1, **kwargs) -> None: ''' port - port where PPK2 is connected buffer_max_size_seconds - how many seconds of data to keep in the buffer @@ -488,7 +521,7 @@ def __init__(self, port, buffer_max_size_seconds=10, buffer_chunk_seconds=0.1, * self._buffer_max_size_seconds = buffer_max_size_seconds self._buffer_chunk_seconds = buffer_chunk_seconds - def __del__(self): + def __del__(self) -> None: """Destructor""" PPK2_API.stop_measuring(self) self._quit_evt.clear() @@ -499,7 +532,7 @@ def __del__(self): self._fetcher = None del self._fetcher - def start_measuring(self): + def start_measuring(self) -> None: # discard the data in the buffer self.stop_measuring() while self.get_data()!=b'': @@ -513,7 +546,7 @@ def start_measuring(self): self._fetcher = PPK_Fetch(self, self._quit_evt, self._buffer_max_size_seconds, self._buffer_chunk_seconds) self._fetcher.start() - def stop_measuring(self): + def stop_measuring(self) -> None: PPK2_API.stop_measuring(self) self.get_data() # flush the serial buffer (to prevent unicode error on next command) self._quit_evt.set() @@ -521,7 +554,7 @@ def stop_measuring(self): self._fetcher.join() # join() will block if the queue isn't empty self._fetcher = None - def get_data(self): + def get_data(self) -> bytes: try: return self._fetcher.get_data() except (TypeError, AttributeError):