diff --git a/src/qkit/analysis/circle_fit/circle_fit_2019/circuit.py b/src/qkit/analysis/circle_fit/circle_fit_2019/circuit.py index 86611b216..24eda4c22 100644 --- a/src/qkit/analysis/circle_fit/circle_fit_2019/circuit.py +++ b/src/qkit/analysis/circle_fit/circle_fit_2019/circuit.py @@ -419,7 +419,7 @@ def _fit_phase(self, z_data, guesses=None): phase = np.unwrap(np.angle(z_data)) # For centered circle roll-off should be close to 2pi. If not warn user. - if np.max(phase) - np.min(phase) <= 0.8*2*np.pi: + if np.max(phase) - np.min(phase) <= 0.4*2*np.pi: logging.warning( "Data does not cover a full circle (only {:.1f}".format( np.max(phase) - np.min(phase) @@ -427,6 +427,8 @@ def _fit_phase(self, z_data, guesses=None): +" rad). Increase the frequency span around the resonance?" ) roll_off = np.max(phase) - np.min(phase) + elif np.max(phase) - np.min(phase) <= 0.8*2*np.pi: + roll_off = np.max(phase) - np.min(phase) else: roll_off = 2*np.pi diff --git a/src/qkit/analysis/crit_detection_iv.py b/src/qkit/analysis/crit_detection_iv.py new file mode 100644 index 000000000..7d5ecf952 --- /dev/null +++ b/src/qkit/analysis/crit_detection_iv.py @@ -0,0 +1,91 @@ +import itertools +from typing import override, Any, Literal +from enum import Enum +from scipy import signal +import numpy as np + +from qkit.measure.unified_measurements import AnalysisTypeAdapter, MeasurementTypeAdapter, DataView, DataViewSet, DataReference +from qkit.analysis.numerical_derivative import SavgolNumericalDerivative + +class CritDetectionIV(AnalysisTypeAdapter): + """ + Analyzes critical points of IV/VI curves + + Analysis results are '(i/v)c_lower_j' and '(i/v)c_upper_j', with dimension lowered by 1 compared to i_j, v_j + Example: Getting Ic and Irt from a current bias 4 point measurement + -> set onWhat = "v", critVal = 5e-6 (whatever value properly cuts off random noise) + + """ + + _onWhat: Literal["i", "v", "di_dv", "dv_di"] + _critVal: float + _numderHelper: SavgolNumericalDerivative | None + + def __init__(self, onWhat: Literal["i", "v", "di_dv", "dv_di"], critVal: float, numderHelper: SavgolNumericalDerivative | None = None): + super().__init__() + self._onWhat = onWhat + self._critVal = critVal + self._numderHelper = numderHelper + if onWhat in ["di_dv", "dv_di"]: + assert isinstance(numderHelper, SavgolNumericalDerivative), "When using crit detection on derivative of data, SavgolNumericalDerivative needs to be passed aswell" + + @override + def perform_analysis(self, data: tuple['MeasurementTypeAdapter.GeneratedData', ...]) -> tuple['MeasurementTypeAdapter.GeneratedData', ...]: + parent_schema = tuple([element.descriptor for element in data]) + output_schema = self.expected_structure(parent_schema) + out = [] + flipBiasMeas = (parent_schema[0].name == "i_0") ^ (self._onWhat in ["v", "dvdi"]) + if self._onWhat in ["di_dv", "dv_di"]: + data = self._numderHelper.perform_analysis(data) + for ((dxdy, dydx), (x, y)) in zip(itertools.batched(output_schema, 2), itertools.batched(data, 2)): + out.append(dxdy.with_data(signal.savgol_filter(x.data, **self.savgol_kwargs)/signal.savgol_filter(y.data, **self.savgol_kwargs))) + out.append(dydx.with_data(signal.savgol_filter(y.data, **self.savgol_kwargs)/signal.savgol_filter(x.data, **self.savgol_kwargs))) + return tuple(out) + + def _crit_find_thresh(x_vals: np.ndarray, y_vals: np.ndarray, thresh: float = 1e-6) -> tuple[np.ndarray]: + """ + helper function for main threshold detection on y-vals, x-vals needed for sanity checking + data should be flipped & mirrored to ideally look 
like + ^ x_vals (.), y_vals (x), crits (o) + | .x + | . x + | . x + | . + 0 +------oxxxxxxxxxxxxxxxo-----> #idx + | x . + | x . + | x. + | .x + v + """ + # thresh detect + upper_idxs = np.argmax(np.logical_and(y_vals > thresh, x_vals > 0), axis=-1) + upper_idxs = np.where(np.any(np.logical_and(y_vals > thresh, x_vals > 0), axis=-1), upper_idxs, x_vals.shape[-1] - 1) # default to max if no tresh found + lower_idxs = y_vals.shape[-1] - 1 - np.argmax(np.flip(np.logical_and(y_vals > thresh, x_vals < 0), axis=-1), axis=-1) # flip because argmax returns first occurence + lower_idxs = np.where(np.any(np.logical_and(y_vals > thresh, x_vals < 0), axis=-1), lower_idxs, 0) + return upper_idxs, lower_idxs + + + @override + def expected_structure(self, parent_schema: tuple['MeasurementTypeAdapter.DataDescriptor', ...]) -> tuple['MeasurementTypeAdapter.DataDescriptor', ...]: + structure = [] + for i, bias in enumerate(parent_schema[::2]): + structure += [ + MeasurementTypeAdapter.DataDescriptor( + name=f"ic_upper_{i}" if self._onWhat in ["v", "dv_di"] else f"vc_upper_{i}", + unit=f"{bias.unit}", + axes=bias.axes[:-1], + category="analysis" + ), + MeasurementTypeAdapter.DataDescriptor( + name=f"ic_lower_{i}" if self._onWhat in ["v", "dv_di"] else f"vc_lower_{i}", + unit=f"{bias.unit}", + axes=bias.axes[:-1], + category="analysis" + ), + ] + return tuple(structure) + + @override + def default_views(self, parent_schema: tuple['MeasurementTypeAdapter.DataDescriptor', ...]) -> dict[str, DataView]: + return () diff --git a/src/qkit/analysis/numerical_derivative.py b/src/qkit/analysis/numerical_derivative.py index 2ad6d33d0..1df63ae1e 100644 --- a/src/qkit/analysis/numerical_derivative.py +++ b/src/qkit/analysis/numerical_derivative.py @@ -1,5 +1,5 @@ import itertools -from typing import override +from typing import override, Any from scipy import signal @@ -16,15 +16,15 @@ class SavgolNumericalDerivative(AnalysisTypeAdapter): Assumes that names are in the form of '[IV]_(?:b_)?_[0-9]' """ - _window_length: int - _polyorder: int - _derivative: int + savgol_kwargs: dict[str, Any] - def __init__(self, window_length: int = 15, polyorder: int = 3, derivative: int = 1): + def __init__(self, **savgol_kwargs): + """ + savgol_kwargs: + kwargs to pass to scipy.signal.savgol_filter alongside data, refer to scipy docs for more information + """ super().__init__() - self._window_length = window_length - self._polyorder = polyorder - self._derivative = derivative + self.savgol_kwargs = {"window_length": 15, "polyorder": 3, "deriv": 1} | savgol_kwargs @override def perform_analysis(self, data: tuple['MeasurementTypeAdapter.GeneratedData', ...]) -> tuple[ @@ -33,14 +33,8 @@ def perform_analysis(self, data: tuple['MeasurementTypeAdapter.GeneratedData', . 
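# --- Example (not part of the diff): the threshold detection used by CritDetectionIV ---
# A minimal, self-contained sketch with synthetic data; the helper name `crit_indices`
# is illustrative, but the argmax/flip bookkeeping mirrors the `_crit_find_thresh`
# helper above (y_vals is the rectified sense signal, x_vals the bias).
import numpy as np

def crit_indices(x_vals, y_vals, thresh=1e-6):
    above = y_vals > thresh
    upper = np.argmax(np.logical_and(above, x_vals > 0), axis=-1)
    upper = np.where(np.any(np.logical_and(above, x_vals > 0), axis=-1), upper, x_vals.shape[-1] - 1)
    lower = y_vals.shape[-1] - 1 - np.argmax(np.flip(np.logical_and(above, x_vals < 0), axis=-1), axis=-1)
    lower = np.where(np.any(np.logical_and(above, x_vals < 0), axis=-1), lower, 0)
    return upper, lower

# toy IV curve of a junction with Ic = 3 uA: V = 0 inside +-Ic, linear branches outside
i = np.linspace(-10e-6, 10e-6, 401)
v = np.where(np.abs(i) > 3e-6, 100.0 * (np.abs(i) - 3e-6) * np.sign(i), 0.0)
up, lo = crit_indices(i, np.abs(v), thresh=5e-6)
print(i[up], i[lo])  # ~ +3.1e-06 and -3.1e-06: the first points past +-Ic that clear the threshold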
output_schema = self.expected_structure(parent_schema) out = [] for ((dxdy, dydx), (x, y)) in zip(itertools.batched(output_schema, 2), itertools.batched(data, 2)): - out.append(dxdy.with_data( - signal.savgol_filter(x.data, window_length=self._window_length, polyorder=self._polyorder, deriv=self._derivative)\ - / signal.savgol_filter(y.data, window_length=self._window_length, polyorder=self._polyorder, deriv=self._derivative) - )) - out.append(dydx.with_data( - signal.savgol_filter(y.data, window_length=self._window_length, polyorder=self._polyorder, deriv=self._derivative)\ - / signal.savgol_filter(x.data, window_length=self._window_length, polyorder=self._polyorder, deriv=self._derivative) - )) + out.append(dxdy.with_data(signal.savgol_filter(x.data, **self.savgol_kwargs)/signal.savgol_filter(y.data, **self.savgol_kwargs))) + out.append(dydx.with_data(signal.savgol_filter(y.data, **self.savgol_kwargs)/signal.savgol_filter(x.data, **self.savgol_kwargs))) return tuple(out) @@ -52,13 +46,15 @@ def expected_structure(self, parent_schema: tuple['MeasurementTypeAdapter.DataDe structure += [ MeasurementTypeAdapter.DataDescriptor( name=f"d{x.name}_d{y.name}", - unit=f"{x.unit}_{y.unit}", - axes=x.axes + unit=f"{x.unit}/{y.unit}", + axes=x.axes, + category="analysis" ), MeasurementTypeAdapter.DataDescriptor( name=f"d{y.name}_d{x.name}", - unit=f"{y.unit}_{x.unit}", - axes=x.axes + unit=f"{y.unit}/{x.unit}", + axes=x.axes, + category="analysis" ) ] return tuple(structure) @@ -67,16 +63,26 @@ def expected_structure(self, parent_schema: tuple['MeasurementTypeAdapter.DataDe def default_views(self, parent_schema: tuple['MeasurementTypeAdapter.DataDescriptor', ...]) -> dict[str, DataView]: schema = self.expected_structure(parent_schema) variable_names = (schema[0].name.split('_')[0], schema[1].name.split('_')[0]) - return { - f'd{variable_names[0]}_d{variable_names[1]}': DataView( + return { # dx/dy + f'{variable_names[0]}_{variable_names[1]}': DataView( + view_params={ + "labels": (schema[0].axes[0].name, f'{variable_names[0]}_{variable_names[1]}'), + 'plot_style': 1, + 'markersize': 5 + }, view_sets=[ DataViewSet( x_path=DataReference(entry.axes[0].name,), y_path=DataReference(entry.name, category='analysis') ) for entry in schema[0::2] ] - ), - f'd{variable_names[1]}_d{variable_names[0]}': DataView( + ), # dy/dx + f'{variable_names[1]}_{variable_names[0]}': DataView( + view_params={ + "labels": (schema[1].axes[0].name, f'{variable_names[1]}_{variable_names[0]}'), + 'plot_style': 1, + 'markersize': 5 + }, view_sets=[ DataViewSet( x_path=DataReference(entry.axes[0].name), diff --git a/src/qkit/analysis/resonator_fitting.py b/src/qkit/analysis/resonator_fitting.py new file mode 100644 index 000000000..fe7c96c13 --- /dev/null +++ b/src/qkit/analysis/resonator_fitting.py @@ -0,0 +1,298 @@ +import numpy as np +from abc import ABC, abstractmethod +import scipy.optimize as spopt +import scipy.ndimage +import logging +from qkit.storage.store import Data as qkitData +from qkit.storage.hdf_dataset import hdf_dataset +from qkit.analysis.circle_fit.circle_fit_2019 import circuit + +class ResonatorFitBase(ABC): + """ + Defines the core functionality any fit function should offer, namely freq, amp & phase in, simulated freq, amp, phase + extracted data out. + Contrary to previous version, fit-functionality is completely decoupled from the h5 file format, this should be handled in the respective measurement script instead. + Complex IQ data + view should be created in measurement script by default as well. 
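# --- Example (not part of the diff): what SavgolNumericalDerivative computes ---
# The adapter above passes its stored savgol_kwargs straight to scipy.signal.savgol_filter
# and forms the derivative as the ratio of the two per-sample derivatives. A minimal
# sketch with synthetic data (all values are illustrative):
import numpy as np
from scipy import signal

savgol_kwargs = {"window_length": 15, "polyorder": 3, "deriv": 1}
i = np.linspace(-1e-6, 1e-6, 201)                                    # bias current (A)
v = 50.0 * i + 1e-9 * np.random.default_rng(0).normal(size=i.size)  # ~50 Ohm sample plus noise

dv = signal.savgol_filter(v, **savgol_kwargs)   # dV per sample index
di = signal.savgol_filter(i, **savgol_kwargs)   # dI per sample index
print(np.median(dv / di))                       # differential resistance, ~ 50 Ohm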
+ + extract_data dict contains information like f_res, Qc, etc. and its (final) keys should be static accessible for each fit function class overriding this base implementation. + This allows preparing hdf datasets for them without them being calculated yet + """ + def __init__(self): + self.freq_fit: np.ndarray[float] = None + self.amp_fit: np.ndarray[float] = None + self.pha_fit: np.ndarray[float] = None + self.extract_data: dict[str, float] = {} + self.out_nop = 501 # number of points the output fits are plotted with + + @abstractmethod + def do_fit(self, freq: np.ndarray[float] = [0, 1], amp: np.ndarray[float] = None, pha: np.ndarray[float] = None): + logging.error("Somehow ran into abstract resonator fit base class fit-function") + self.freq_fit = np.linspace(np.min(freq), np.max(freq), self.out_nop) + self.amp_fit = np.ones(self.out_nop) + self.pha_fit = np.zeros(self.out_nop) + self.extract_data = {} + return self + + +def autofit_file(file: qkitData | str, fit_func: ResonatorFitBase, f_min: float = None, f_max: float = None): + # ensure file is qkit.storage.store.Data + if type(file) == str: + try: + file = qkitData(file) # assume path + except: + logging.error("'{}' is not a valid path".format(file)) + raise AttributeError + # check if file has freq, amp, phase + try: + freq_data = np.array(file.data.frequency) + amp_data = np.array(file.data.amplitude) + pha_data = np.array(file.data.phase) + except: + logging.error("Could not access frequency, amplitude and/or phase. Is this really a VNA measurement?") + raise AttributeError + # do not allow overriding existing fits + try: + dummy = file.analysis._fit_frequency + logging.error("File already contains fit data. Please access and store everything manually.") + raise ValueError + except: + pass + # add frequency coordinate + f_min = -np.inf if f_min is None else f_min + f_max = np.inf if f_max is None else f_max + selly = (freq_data >= f_min) & (freq_data <= f_max) + file_freq_fit = file.add_coordinate("_fit_frequency", unit="Hz", folder="analysis") + file_freq_fit.add(np.linspace(np.min(freq_data[selly]), np.max(freq_data[selly]), fit_func.out_nop)) + file_extracts: dict[str, hdf_dataset] = {} + + if len(amp_data.shape) == 1: # 1D measurement + # add result datastores + file_amp_fit = file.add_value_vector("_fit_amplitude", x=file_freq_fit, unit="arb. unit", folder="analysis") + file_pha_fit = file.add_value_vector("_fit_phase", x=file_freq_fit, unit="rad", folder="analysis") + file_real_fit = file.add_value_vector("_fit_real", x=file_freq_fit, unit="", folder="analysis") + file_imag_fit = file.add_value_vector("_fit_imag", x=file_freq_fit, unit="", folder="analysis") + for key in fit_func.extract_data.keys(): + file_extracts[key] = file.add_coordinate("fit_" + key, folder="analysis") + # actual fitting + fit_func.do_fit(freq_data[selly], amp_data[selly], pha_data[selly]) + # fill entries + file_amp_fit.append(fit_func.amp_fit) + file_pha_fit.append(fit_func.pha_fit) + file_real_fit.append(fit_func.amp_fit*np.cos(fit_func.pha_fit)) + file_imag_fit.append(fit_func.amp_fit*np.sin(fit_func.pha_fit)) + for key, val in fit_func.extract_data.items(): + file_extracts[key].add(np.array([val])) + + elif len(amp_data.shape) == 2: # 2D measurement + # add result datastores + file_amp_fit = file.add_value_matrix("_fit_amplitude", x=file.get_dataset(file.data.amplitude.x_ds_url), y=file_freq_fit, unit="arb. 
unit", folder="analysis") + file_pha_fit = file.add_value_matrix("_fit_phase", x=file.get_dataset(file.data.amplitude.x_ds_url), y=file_freq_fit, unit="rad", folder="analysis") + file_real_fit = file.add_value_matrix("_fit_real", x=file.get_dataset(file.data.amplitude.x_ds_url), y=file_freq_fit, unit="", folder="analysis") + file_imag_fit = file.add_value_matrix("_fit_imag", x=file.get_dataset(file.data.amplitude.x_ds_url), y=file_freq_fit, unit="", folder="analysis") + buffer_extracts: dict[str, np.ndarray] = {} + for key in fit_func.extract_data.keys(): + file_extracts[key] = file.add_value_vector("fit_" + key, x=file.get_dataset(file.data.amplitude.x_ds_url), folder="analysis") + buffer_extracts[key] = np.full(file[file.data.amplitude.x_ds_url].shape[0], np.nan) + for ix in range(amp_data.shape[0]): + # actual fitting + fit_func.do_fit(freq_data[selly], amp_data[ix, selly], pha_data[ix, selly]) + # fill entries + file_amp_fit.append(fit_func.amp_fit) + file_pha_fit.append(fit_func.pha_fit) + file_real_fit.append(fit_func.amp_fit*np.cos(fit_func.pha_fit)) + file_imag_fit.append(fit_func.amp_fit*np.sin(fit_func.pha_fit)) + for key, val in fit_func.extract_data.items(): + buffer_extracts[key][ix] = val + file_extracts[key].append(buffer_extracts[key], reset=True) + + elif len(amp_data.shape) == 3: # 3D measurement + # add result datastores + file_amp_fit = file.add_value_box("_fit_amplitude", x=file.get_dataset(file.data.amplitude.x_ds_url), y=file.get_dataset(file.data.amplitude.y_ds_url), z=file_freq_fit, unit="arb. unit", folder="analysis") + file_pha_fit = file.add_value_box("_fit_phase", x=file.get_dataset(file.data.amplitude.x_ds_url), y=file.get_dataset(file.data.amplitude.y_ds_url), z=file_freq_fit, unit="rad", folder="analysis") + file_real_fit = file.add_value_box("_fit_real", x=file.get_dataset(file.data.amplitude.x_ds_url), y=file.get_dataset(file.data.amplitude.y_ds_url), z=file_freq_fit, unit="", folder="analysis") + file_imag_fit = file.add_value_box("_fit_imag", x=file.get_dataset(file.data.amplitude.x_ds_url), y=file.get_dataset(file.data.amplitude.y_ds_url), z=file_freq_fit, unit="", folder="analysis") + for key in fit_func.extract_data.keys(): + file_extracts[key] = file.add_value_matrix("fit_" + key, x=file.get_dataset(file.data.amplitude.x_ds_url), y=file.get_dataset(file.data.amplitude.y_ds_url), folder="analysis") + buffer_extracts: dict[str, np.ndarray] = {} + import itertools # alternative to nested loops that allows easier breaking when fill is reached + for ix, iy in itertools.product(range(amp_data.shape[0]), range(amp_data.shape[1])): + if iy == 0: + for key in fit_func.extract_data.keys(): + buffer_extracts[key] = np.full(file[file.data.amplitude.y_ds_url].shape[0], np.nan) + # actual fitting + fit_func.do_fit(freq_data[selly], amp_data[ix, iy, selly], pha_data[ix, iy, selly]) + # fill entries + file_amp_fit.append(fit_func.amp_fit) + file_pha_fit.append(fit_func.pha_fit) + file_real_fit.append(fit_func.amp_fit*np.cos(fit_func.pha_fit)) + file_imag_fit.append(fit_func.amp_fit*np.sin(fit_func.pha_fit)) + for key, val in fit_func.extract_data.items(): + buffer_extracts[key][iy] = val + file_extracts[key].append(buffer_extracts[key], reset=(iy != 0)) + if (ix + 1 == file.data.amplitude.fill[0]) & (iy + 1 == file.data.amplitude.fill[1]): + # The measurement stopped somewhere in the middle of a xy-sweep, no more data to analyze + break + if iy + 1 == amp_data.shape[1]: + file_amp_fit.next_matrix() + file_pha_fit.next_matrix() + file_real_fit.next_matrix() + 
file_imag_fit.next_matrix() + else: + logging.error("What?") + raise NotImplementedError() + + # add/update views + file["/entry/views/IQ"].attrs.create("xy_1", "/entry/analysis0/_fit_real:/entry/analysis0/_fit_imag") + file["/entry/views/IQ"].attrs.create("xy_1_filter", "None") + file["/entry/views/IQ"].attrs.create("overlays", 1) + file.add_view("AmplitudeFit", file.data.frequency, file.data.amplitude).add(file_freq_fit, file_amp_fit) + file.add_view("PhaseFit", file.data.frequency, file.data.phase).add(file_freq_fit, file_pha_fit) + + +class CircleFit(ResonatorFitBase): + def __init__(self, n_ports: int, fit_delay_max_iterations: int = 5, fixed_delay: float = None, isolation: int = 15, guesses: list[float] = None): + super().__init__() + self.extract_data = { + 'Qc': np.nan, + 'Qc_max': np.nan, + 'Qc_min': np.nan, + 'Qc_no_dia_corr': np.nan, + 'Qi': np.nan, + 'Qi_err': np.nan, + 'Qi_max': np.nan, + 'Qi_min': np.nan, + 'Qi_no_dia_corr': np.nan, + 'Qi_no_dia_corr_err': np.nan, + 'Ql': np.nan, + 'Ql_err': np.nan, + 'a': np.nan, + 'absQc_err': np.nan, + 'alpha': np.nan, + 'chi_square': np.nan, + 'delay': np.nan, + 'delay_remaining': np.nan, + 'fano_b': np.nan, + 'fr': np.nan, + 'fr_err': np.nan, + 'phi': np.nan, + 'phi_err': np.nan, + 'theta': np.nan + } + # fit parameters + self.n_ports = n_ports # 1: reflection port, 2: notch port + self.fit_delay_max_iterations = fit_delay_max_iterations + self.fixed_delay = fixed_delay + self.isolation = isolation + self.guesses = guesses # init guess (f_res, Ql, delay) for phase fit + + def do_fit(self, freq: np.ndarray[float], amp: np.ndarray[float], pha: np.ndarray[float]): + # use external circlefit + my_circuit = circuit.reflection_port(freq, amp*np.exp(1j*pha)) if self.n_ports == 1 else circuit.notch_port(freq, amp*np.exp(1j*pha)) + my_circuit.fit_delay_max_iterations = self.fit_delay_max_iterations + my_circuit.autofit(fixed_delay=self.fixed_delay, isolation=self.isolation) + + # update to be read parameters + self.extract_data = { + 'Qc': np.nan, + 'Qc_max': np.nan, + 'Qc_min': np.nan, + 'Qc_no_dia_corr': np.nan, + 'Qi': np.nan, + 'Qi_err': np.nan, + 'Qi_max': np.nan, + 'Qi_min': np.nan, + 'Qi_no_dia_corr': np.nan, + 'Qi_no_dia_corr_err': np.nan, + 'Ql': np.nan, + 'Ql_err': np.nan, + 'a': np.nan, + 'absQc_err': np.nan, + 'alpha': np.nan, + 'chi_square': np.nan, + 'delay': np.nan, + 'delay_remaining': np.nan, + 'fano_b': np.nan, + 'fr': np.nan, + 'fr_err': np.nan, + 'phi': np.nan, + 'phi_err': np.nan, + 'theta': np.nan + } + self.extract_data.update(my_circuit.fitresults) + self.freq_fit = np.linspace(np.min(freq), np.max(freq), self.out_nop) + z_sim = my_circuit.Sij(self.freq_fit, my_circuit.fr, my_circuit.Ql, my_circuit.Qc, my_circuit.phi, my_circuit.a, my_circuit.alpha, my_circuit.delay) + self.amp_fit = np.abs(z_sim) + self.pha_fit = np.angle(z_sim) + + return self + + +class LorentzianFit(ResonatorFitBase): + def __init__(self): + super().__init__() + self.extract_data = { + "f_res": None, + "f_res_err": None, + "Ql": None, + "Ql_err": None, + # TODO + } + + def do_fit(self, freq, amp, pha): + # TODO + logging.error("Lorentzian Fit not yet implemented. 
Feel free to adapt the respective maths yourself based on old resonator class") + self.extract_data["f_res"] = 1 + self.extract_data["f_res_err"] = 0.5 + self.extract_data["Ql"] = 1 + self.extract_data["Ql_err"] = 0.5 + return super().do_fit(freq, amp, pha) + + +class SkewedLorentzianFit(ResonatorFitBase): + def __init__(self): + super().__init__() + self.extract_data = { + "f_res": None, + "f_res_err": None, + "Ql": None, + "Ql_err": None, + # TODO + } + + def do_fit(self, freq, amp, pha): + # TODO + logging.error("Lorentzian Fit not yet implemented. Feel free to adapt the respective maths yourself based on old resonator class") + self.extract_data["f_res"] = 1 + self.extract_data["f_res_err"] = 0.5 + self.extract_data["Qc"] = 1 + self.extract_data["Qc_err"] = 0.5 + return super().do_fit(freq, amp, pha) + + +class FanoFit(ResonatorFitBase): + def __init__(self): + super().__init__() + self.extract_data = { + "f_res": None, + "f_res_err": None, + "Qc": None, + "Qc_err": None, + # TODO + } + + def do_fit(self, freq, amp, pha): + # TODO + logging.error("Lorentzian Fit not yet implemented. Feel free to adapt the respective maths yourself based on old resonator class") + self.extract_data["f_res"] = 1 + self.extract_data["f_res_err"] = 0.5 + self.extract_data["Qc"] = 1 + self.extract_data["Qc_err"] = 0.5 + return super().do_fit(freq, amp, pha) + + +FitNames: dict[str, ResonatorFitBase] = { + 'lorentzian': LorentzianFit, + 'skewed_lorentzian': SkewedLorentzianFit, + 'circle_fit_reflection': CircleFit, + 'circle_fit_notch': CircleFit, + 'fano': FanoFit, +} \ No newline at end of file diff --git a/src/qkit/drivers/ADwinProII_Base.py b/src/qkit/drivers/ADwinProII_Base.py new file mode 100644 index 000000000..13eb24d0e --- /dev/null +++ b/src/qkit/drivers/ADwinProII_Base.py @@ -0,0 +1,65 @@ +# ADwinProII_SMU.py driver for using ADwin Pro II as an SMU +# Author: Marius Frohn (uzrfo@student.kit.edu) +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import logging, time +from qkit.core.instrument_base import Instrument +import numpy as np +import ADwin +from typing import Callable + +class ADwinProII_Base(Instrument): + """ + The ADwin Pro II by Jäger Computergesteuerte Messtechnik GmbH provides slots for various + ADC/DAC or data in/out cards as well as an accessable FPGA-like processor for control. Our + group's device currently has a 4-channel 16-bit ADC, a 50kHz filtered 8-channel 16-bit ADC + and an 8-channel 16-bit DAC card. Getting the device running requires some steps outlined + below: + + - Selecting drivers from https://www.adwin.de/de/download/download.html. Installing the + full software package is recommended for potential debugging, analyzing timings, etc. + - 'pip install ADwin' to the qkit virtual environment. 
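# --- Example (not part of the diff): typical use of the resonator_fitting module above ---
# File path and frequency window are placeholders; for a reflection port use n_ports=1.
from qkit.analysis.resonator_fitting import CircleFit, autofit_file

fit = CircleFit(n_ports=2)                                    # notch-type resonator
autofit_file("XXXXXX_vna_scan.h5", fit, f_min=4.50e9, f_max=4.52e9)
# autofit_file writes _fit_amplitude/_fit_phase/_fit_real/_fit_imag plus one fit_<key>
# entry per item of extract_data into the file's analysis folder; the last fit results
# also remain available on the instance:
print(fit.extract_data["fr"], fit.extract_data["Ql"])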
(Note: despite being present, the + driver could not correctly read out the ADwin install directory from the windows registry + on my machine. To fix this, I commented out the entire try/except block around line ~100 + in *venv*/site-packages/ADwin.py and hardcoded 'self.ADwindir = *adwin_install_path*') + - The tool *adwin_install_path*/Tools/ADconfig/ADconfig.exe finds ADwin devices in the + network and allows e.g. assigning their MAC-addresses to a fixed IP-address. This should + already be done for this device. In either case one still needs to register it as an + adwin-device on the measurement pc and assign it an adwin-device-number (default: 1) + - With *adwin_install_path*/Tools/Test/ADpro/ADpro.exe one can check if communication with + device is now possible & what the id-numbers of the used cards are (should be 1, 2, 3) + - For usage one needs to provide a bootloader and to be run processes. The bootloader can + be found under *adwin_install_path*/ADwin12.btl for the T12 processor type. A process + is defined in a compiled .TCx (x: process number) file. Tools and documentation for + creating, editing, testing and compiling are provided in the ADwin software package. + """ + @staticmethod + def _to_volt(x: np.ndarray) -> np.ndarray: + return (np.array(x, dtype=np.int32).astype(np.float64) - 0x8000) * 20.0 / 0x10000 + @staticmethod + def _to_reg(x: np.ndarray) -> np.ndarray: + x_reg = np.round((x + 10.0)/20.0*0x10000) + x_reg = np.where(x_reg == -1, 0, x_reg) + return np.where(x_reg == 0x10000, 0xFFFF, x_reg) + def __init__(self, name: str, btl_path: str = "C:\\ADwin\\ADwin12.btl", device_num: int = 1): + """ + Use via e.g. + adw_base = qkit.instruments.create("adw_base", "ADwinProII_Base", *kwargs*) + """ + Instrument.__init__(self, name, tags=["physical"]) + self.adw = ADwin.ADwin(device_num) + self.adw.Boot(btl_path) + logging.info("Booted ADwinProII with processor {}".format(self.adw.Processor_Type())) \ No newline at end of file diff --git a/src/qkit/drivers/ADwinProII_DoubleSweep.py b/src/qkit/drivers/ADwinProII_DoubleSweep.py new file mode 100644 index 000000000..6c44e11b9 --- /dev/null +++ b/src/qkit/drivers/ADwinProII_DoubleSweep.py @@ -0,0 +1,69 @@ +# ADwinProII_SMU.py driver for using ADwin Pro II as an SMU +# Author: Marius Frohn (uzrfo@student.kit.edu) +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
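# --- Example (not part of the diff): the 16-bit register <-> voltage mapping ---
# to_reg/to_volt below mirror ADwinProII_Base._to_reg/_to_volt; the +-10 V range maps
# onto 0x0000..0xFFFF, so the round trip agrees to within one LSB (20 V / 2**16 ~ 0.3 mV).
import numpy as np

def to_reg(v):
    r = np.round((np.asarray(v) + 10.0) / 20.0 * 0x10000)
    return np.where(r == 0x10000, 0xFFFF, np.where(r == -1, 0, r))

def to_volt(r):
    return (np.asarray(r, dtype=np.float64) - 0x8000) * 20.0 / 0x10000

v = np.array([-10.0, -5.0, 0.0, 5.0, 9.9997])
print(to_reg(v))                               # [    0. 16384. 32768. 49152. 65535.]
print(np.max(np.abs(to_volt(to_reg(v)) - v)))  # well below one LSB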
+# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import logging, time +from qkit.core.instrument_base import Instrument +import numpy as np +from typing import Callable +from qkit.drivers.ADwinProII_Base import ADwinProII_Base + +class ADwinProII_DoubleSweep(Instrument): + def __init__(self, name: str, adw_base: ADwinProII_Base, proc_path="C:\\ADwin\\SMU_DoubleSweep.TC3"): + Instrument.__init__(self, name, tags=["virtual"]) + self.proc_num = int(proc_path[-1]) + self.adw = adw_base.adw + self.dac1_channel = 2 + self.dac2_channel = 3 + self.adc1_channel = 2 + self.adc2_channel = 3 + self.adc1_card = 1 # 1 (normal), 2 (filtered) + self.adc2_card = 1 # 1 (normal), 2 (filtered) + self.delay = 2000 # NOTE: int describing to be slept time inbetween dac set and adc get. + # slept time <=> self.delay * 1e-8 s, set/get commands take ~2e-6 s per point on their own always + self.adw.Load_Process(proc_path) + logging.info("Loaded process '{}' for ADwinProII's FPGA".format(proc_path)) + + def double_sweep(self, v1: np.ndarray, v2: np.ndarray, update_sweep_device: Callable[[], bool] = lambda: True): + if update_sweep_device(): + self.set_sweep_parameters(v1, v2) + self.adw.Start_Process(self.proc_num) + while self.adw.Process_Status(self.proc_num): + time.sleep(0.1) + nops = self.adw.Get_Par(41) + return ADwinProII_Base._to_volt(self.adw.GetData_Long(3, 1, nops)), ADwinProII_Base._to_volt(self.adw.GetData_Long(4, 1, nops)), ADwinProII_Base._to_volt(self.adw.GetData_Long(5, 1, nops)), ADwinProII_Base._to_volt(self.adw.GetData_Long(6, 1, nops)) + + def set_sweep_parameters(self, v1: np.ndarray, v2: np.ndarray): + # Skip checks here; if you use this, you'll know what you're doing because you're me + self.adw.Set_Par(41, len(v1)) + self.adw.Set_Par(42, self.dac1_channel) + self.adw.Set_Par(43, self.dac2_channel) + self.adw.Set_Par(44, self.adc1_channel) + self.adw.Set_Par(45, self.adc1_card) + self.adw.Set_Par(46, self.adc2_channel) + self.adw.Set_Par(47, self.adc2_card) + self.adw.Set_Par(48, int(self.delay)) + self.adw.SetData_Long(ADwinProII_Base._to_reg(v1), 3, 1, len(v1)) + self.adw.SetData_Long(ADwinProII_Base._to_reg(v2), 4, 1, len(v1)) + +class InitHandler(object): + def __init__(self): + self.not_called_yet = True + def __call__(self): + if self.not_called_yet: + self.not_called_yet = False + return True + return False \ No newline at end of file diff --git a/src/qkit/drivers/ADwinProII_SMU.py b/src/qkit/drivers/ADwinProII_SMU.py deleted file mode 100644 index bb9c9313b..000000000 --- a/src/qkit/drivers/ADwinProII_SMU.py +++ /dev/null @@ -1,207 +0,0 @@ -# ADwinProII_SMU.py driver for using ADwin Pro II as an SMU -# Author: Marius Frohn (uzrfo@student.kit.edu) -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. 
-# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA - -import logging, time -from qkit.core.instrument_base import Instrument -import numpy as np -import ADwin - -class ADwinProII_SMU(Instrument): - """ - The ADwin Pro II by Jäger Computergesteuerte Messtechnik GmbH provides slots for various - ADC/DAC or data in/out cards as well as an accessable FPGA-like processor for control. Our - group's device currently has a 4-channel 16-bit ADC, a 50kHz filtered 8-channel 16-bit ADC - and an 8-channel 16-bit DAC card. Getting the device running requires some steps outlined - below: - - - Selecting drivers from https://www.adwin.de/de/download/download.html. Installing the - full software package is recommended for potential debugging, analyzing timings, etc. - - 'pip install ADwin' to the qkit virtual environment. (Note: despite being present, the - driver could not correctly read out the ADwin install directory from the windows registry - on my machine. To fix this, I commented out the entire try/except block around line ~100 - in *venv*/site-packages/ADwin.py and hardcoded 'self.ADwindir = *adwin_install_path*') - - The tool *adwin_install_path*/Tools/ADconfig/ADconfig.exe finds ADwin devices in the - network and allows e.g. assigning their MAC-addresses to a fixed IP-address. This should - already be done for this device. In either case one still needs to register it as an - adwin-device on the measurement pc and assign it an adwin-device-number (default: 1) - - With *adwin_install_path*/Tools/Test/ADpro/ADpro.exe one can check if communication with - device is now possible & what the id-numbers of the used cards are (should be 1, 2, 3) - - For usage one needs to provide a bootloader and to be run processes. The bootloader can - be found under *adwin_install_path*/ADwin12.btl for the T12 processor type. This driver - is in particular designed around 2 processes which configure the FPGA as a default SMU. - If you're still reading this your problem can't be solved any other way but with this - box, so at this point I wish you good luck with whatever is troubling you. The already - compiled processes can be found as .TCx-files alongside the other files on the exchange - under exchange/Devices/ADwinProII/ and should be provided somewhere on the measurement pc. - - SMU_SingleSetGet.TC1 enables ADC/DAC access by checking in regular intervals if a - respective action is requested. SMU_SweepLoop.TC2 is a high-priority process for sweeping - over a given array and reading back a given signal. The speed of one step is mostly limited - by the ADC/DAC access commands taking up to ~1600 CPU cycles = ~1.6e-6 s. The source code - for these processes is provided in the .bas-files. Alternatively one can program the FPGA - to do whatever one wants but then this driver obviously will no longer work with it. 
- """ - # static stuff - ADC_CARD_OFFSET = 10 - ADC_FILTER_CARD_OFFSET = 20 - DAC_CARD_OFFSET = 30 - @staticmethod - def _to_volt(x: np.ndarray) -> np.ndarray: - return (x.astype(np.float64) - 0x8000) * 20.0 / 0x10000 - @staticmethod - def _to_reg(x: np.ndarray) -> np.ndarray: - x_reg = np.round((x + 10.0)/20.0*0x10000) - x_reg = np.where(x_reg == -1, 0, x_reg) - return np.where(x_reg == 0x10000, 0xFFFF, x_reg) - - def __init__(self, name: str, btl_path: str = "C:\\ADwin\\ADwin12.btl", proc1_path: str = "C:\\ADwin\\SMU_SingleSetGet.TC1", proc2_path: str = "C:\\ADwin\\SMU_SweepLoop.TC2", device_num: int = 1): - """ - This is a driver for using the ADwinProII as an SMU by using the provided processes. - - Usage: - Initialize with - = qkit.instruments.create('', 'ADwinProII_SMU', **kwargs) - - Keyword arguments: - btl_path: path to bootloader file (default: "C:\\ADwin\\ADwin12.btl") - proc1_path: path to process file containing low-priority ADC/DAC access - (default: "C:\\ADwin\\SMU_SingleSetGet.TC1") - proc2_path: path to process file containing high-priority array sweep - (default: "C:\\ADwin\\SMU_SweepLoop.TC1") - device_num: number assigned to device on your pc by ADconfig (default: 1) - """ - # qkit stuff - Instrument.__init__(self, name, tags=['physical']) - self.add_parameter("adc", type = float, flags = Instrument.FLAG_GET, channels = (1, 4), minval = -10.0, maxval=10.0, units = "V") - self.add_parameter("adc_filtered", type = float, flags = Instrument.FLAG_GET, channels = (1, 8), minval = -10.0, maxval=10.0, units = "V") - self.add_parameter("dac", type = float, flags = Instrument.FLAG_SET, channels = (1, 8), minval = -10.0, maxval = 10.0, units = "V") - # Boot & Load processes - self.adw = ADwin.ADwin(device_num) - self.adw.Boot(btl_path) - logging.info("Booted ADwinProII with processor {}".format(self.adw.Processor_Type())) - self.adw.Load_Process(proc1_path) - self.adw.Start_Process(1) - logging.info("Loaded SMU process 1 for ADwinProII's FPGA") - self.adw.Load_Process(proc2_path) - logging.info("Loaded SMU process 2 for ADwinProII's FPGA") - # Sweep parameters - self.dac_channel = 1 # 1...8 - self.adc_channel = 1 # 1...4 or 1...8 depending on card - self.adc_card = 1 # 1 (normal), 2 (filtered) - self._sweep_channels = (self.dac_channel, self.adc_channel) # for qkit compability - self.delay = 2000 # NOTE: int describing to be slept time inbetween dac set and adc get. 
- # slept time <=> self.delay * 1e-8 s, set/get commands take ~2e-6 s per point on their own always - - # functionality - def do_get_adc(self, channel: int) -> float: - """ - This is a very nice docstring - """ - if channel in range(1, 5): - self.adw.Set_Par(channel + self.ADC_CARD_OFFSET, 1) - for i in range(5000): - if self.adw.Get_Par(channel + self.ADC_CARD_OFFSET) == 0: - return self.adw.Get_FPar(channel + self.ADC_CARD_OFFSET) - time.sleep(0.001) - raise TimeoutError("ADwin Pro II did not confirm ADC read within 5s") - else: - raise ValueError("Channel must be 1...4") - - def do_get_adc_filtered(self, channel: int) -> float: - """ - This is a very nice docstring - """ - if channel in range(1, 9): - self.adw.Set_Par(channel + self.ADC_FILTER_CARD_OFFSET, 1) - for i in range(5000): - if self.adw.Get_Par(channel + self.ADC_FILTER_CARD_OFFSET) == 0: - return self.adw.Get_FPar(channel + self.ADC_FILTER_CARD_OFFSET) - time.sleep(0.001) - raise TimeoutError("ADwin Pro II did not confirm ADC read within 5s") - else: - raise ValueError("Channel must be 1...8") - - def do_set_dac(self, volt: float, channel: int) -> None: - """ - This is a very nice docstring - """ - if channel in range(1, 9): - self.adw.Set_FPar(channel + self.DAC_CARD_OFFSET, volt) - self.adw.Set_Par(channel + self.DAC_CARD_OFFSET, 1) - else: - raise ValueError("Channel must be 1...8") - - def set_sweep_parameters(self, sweep: np.ndarray) -> None: - """ - Check to be swept parameter settings and write them to the device. - Sweep: array describing sweep values by [start, stop, step] - - Max 2^16 points due to (arbitary) memory limitation on device (More - values would not make sense anyways though since the ADC/DACs have - only 16-bit resolution) - Voltage range is limited to +-10V for both ADC/DACs - Look at device for channel number limitations - """ - # self.dac_channel, self.adc_channel = self._sweep_channels # NOTE: keep this line? potentially causes more problems with qkit defaults not fitting for this device than the feature is worth - set_array = np.arange(sweep[0], sweep[1] + sweep[2]/2, sweep[2]) - # Check values - if len(set_array) > 0x1000: - raise ValueError("Sweep size of 16 bit ADC/DACs limited to 2^16 values") - if np.any(np.abs(set_array) > 10.0): - raise ValueError("Sweep array contains values outside +-10V range") - if not self.dac_channel in range(1, 9): - raise ValueError("DAC channel must be 1...8") - if not self.adc_card in [1, 2]: - raise ValueError("ADC card must be 1 (normal), 2 (50kHz filtered)") - if self.adc_card == 1 and not self.adc_channel in range(1, 5): - raise ValueError("ADC channel must be 1...4") - if self.adc_card == 2 and not self.adc_channel in range(1, 9): - raise ValueError("Filtered ADC channel must be 1...8") - # Set values - self.adw.Set_Par(1, len(set_array)) - self.adw.Set_Par(2, self.dac_channel) - self.adw.Set_Par(3, self.adc_channel) - self.adw.Set_Par(4, self.adc_card) - self.adw.Set_Par(5, int(self.delay)) - self.adw.SetData_Long(self._to_reg(set_array), 1, 1, len(set_array)) - - def get_tracedata(self) -> tuple[np.ndarray]: - """ - Starts a sweep with parameters currently set on device and returns - set_values_array, measured_values_array. 
Sorting by what is I and V - is being handled by overlaying virtual_tunnel_electronic - """ - # Sweep - self.adw.Start_Process(2) - while self.adw.Process_Status(2): - time.sleep(0.1) - # Read result - return self._to_volt(self.adw.GetData_Long(1, 1, self.adw.Get_Par(1))), self._to_volt(self.adw.GetData_Long(2, 1, self.adw.Get_Par(1))) - - # qkit SMU compability - def set_sweep_mode(self, mode: int = 0): - if mode != 0: - print("ADwinProII only has voltage ADC/DACs, only VV-mode 0 supported") - def get_sweep_mode(self) -> int: - return 0 - def get_sweep_channels(self) -> tuple[int]: - return (self.dac_channel, self.adc_channel) - def set_status(self, *args, **kwargs) -> None: - pass # ADC/DACs are always responsive - def reset(self): - for i in range(1, 9): - self.do_set_dac(0.0, i) diff --git a/src/qkit/drivers/ADwinProII_SingleSetGet.py b/src/qkit/drivers/ADwinProII_SingleSetGet.py new file mode 100644 index 000000000..ee8080e84 --- /dev/null +++ b/src/qkit/drivers/ADwinProII_SingleSetGet.py @@ -0,0 +1,77 @@ +# ADwinProII_SMU.py driver for using ADwin Pro II as an SMU +# Author: Marius Frohn (uzrfo@student.kit.edu) +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import logging, time +from qkit.core.instrument_base import Instrument +from qkit.drivers.ADwinProII_Base import ADwinProII_Base + +class ADwinProII_SingleSetGet(Instrument): + """ + To be used with the FPGA process 'SMU_SingleSetGet.TC1', enabling single set/get commands + for the ADC/DACs. 
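# --- Example (not part of the diff): wiring the split ADwin drivers together ---
# A hedged sketch of how ADwinProII_Base, ADwinProII_SingleSetGet and ADwinProII_SweepLoop
# are meant to be combined; instrument names, channels and sweep values are placeholders,
# and the compiled .TCx processes are assumed to be present at their default paths.
import numpy as np
import qkit
qkit.start()

adw_base  = qkit.instruments.create("adw_base",  "ADwinProII_Base")
adw_ssg   = qkit.instruments.create("adw_ssg",   "ADwinProII_SingleSetGet", adw_base=adw_base)
adw_sweep = qkit.instruments.create("adw_sweep", "ADwinProII_SweepLoop",    adw_base=adw_base)

adw_ssg.do_set_dac(0.1, channel=1)      # single DAC set
print(adw_ssg.do_get_adc(channel=2))    # single ADC read on the unfiltered card

adw_sweep.dac_channel = 2               # bias output
adw_sweep.adc_channel = 1               # sense input
adw_sweep.set_sweep_parameters(np.array([-1.0, 1.0, 0.01]))  # start, stop, step in V
bias, sense = adw_sweep.get_tracedata()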
+ """ + # static stuff + ADC_CARD_OFFSET = 10 + ADC_FILTER_CARD_OFFSET = 20 + DAC_CARD_OFFSET = 30 + def __init__(self, name: str, adw_base: ADwinProII_Base, proc_path="C:\\ADwin\\SMU_SingleSetGet.TC1"): + Instrument.__init__(self, name, tags=["virtual"]) + self.add_parameter("adc", type = float, flags = Instrument.FLAG_GET, channels = (1, 4), minval = -10.0, maxval = 10.0, units = "V") + self.add_parameter("adc_filtered", type = float, flags = Instrument.FLAG_GET, channels = (1, 8), minval = -10.0, maxval = 10.0, units = "V") + self.add_parameter("dac", type = float, flags = Instrument.FLAG_SET, channels = (1, 8), minval = -10.0, maxval = 10.0, units = "V") + self.adw = adw_base.adw + self.adw.Load_Process(proc_path) + self.adw.Start_Process(int(proc_path[-1])) + logging.info("Loaded process '{}' for ADwinProII's FPGA".format(proc_path)) + def do_get_adc(self, channel: int) -> float: + """ + This is a very nice docstring + """ + if channel in range(1, 5): + self.adw.Set_Par(channel + self.ADC_CARD_OFFSET, 1) + for i in range(500): + if self.adw.Get_Par(channel + self.ADC_CARD_OFFSET) == 0: + return self.adw.Get_FPar(channel + self.ADC_CARD_OFFSET) + time.sleep(0.01) + raise TimeoutError("ADwin Pro II did not confirm ADC read within 5s") + else: + raise ValueError("Channel must be 1...4") + def do_get_adc_filtered(self, channel: int) -> float: + """ + This is a very nice docstring + """ + if channel in range(1, 9): + self.adw.Set_Par(channel + self.ADC_FILTER_CARD_OFFSET, 1) + for i in range(5000): + if self.adw.Get_Par(channel + self.ADC_FILTER_CARD_OFFSET) == 0: + return self.adw.Get_FPar(channel + self.ADC_FILTER_CARD_OFFSET) + time.sleep(0.001) + raise TimeoutError("ADwin Pro II did not confirm ADC read within 5s") + else: + raise ValueError("Channel must be 1...8") + def do_set_dac(self, volt: float, channel: int) -> None: + """ + This is a very nice docstring + """ + if channel in range(1, 9): + self.adw.Set_FPar(channel + self.DAC_CARD_OFFSET, volt) + self.adw.Set_Par(channel + self.DAC_CARD_OFFSET, 1) + else: + raise ValueError("Channel must be 1...8") + def reset(self): + for i in range(1, 9): + self.do_set_dac(0.0, i) \ No newline at end of file diff --git a/src/qkit/drivers/ADwinProII_SweepLoop.py b/src/qkit/drivers/ADwinProII_SweepLoop.py new file mode 100644 index 000000000..ac376c349 --- /dev/null +++ b/src/qkit/drivers/ADwinProII_SweepLoop.py @@ -0,0 +1,94 @@ +# ADwinProII_SMU.py driver for using ADwin Pro II as an SMU +# Author: Marius Frohn (uzrfo@student.kit.edu) +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +import logging, time +from qkit.core.instrument_base import Instrument +import numpy as np +from qkit.drivers.ADwinProII_Base import ADwinProII_Base + +class ADwinProII_SweepLoop(Instrument): + """ + To be used with the FPGA process 'SMU_SweepLoop.TC2', enabling SMU functionality for + making qkit transport script DC sweeps. + """ + def __init__(self, name: str, adw_base: ADwinProII_Base, proc_path="C:\\ADwin\\SMU_SweepLoop.TC2"): + Instrument.__init__(self, name, tags=["virtual"]) + self.proc_num = int(proc_path[-1]) + self.adw = adw_base.adw + self.adw.Load_Process(proc_path) + logging.info("Loaded process '{}' for ADwinProII's FPGA".format(proc_path)) + self.dac_channel = 1 # 1...8 + self.adc_channel = 1 # 1...4 or 1...8 depending on card + self.adc_card = 1 # 1 (normal), 2 (filtered) + self._sweep_channels = (self.dac_channel, self.adc_channel) # for qkit compability + self.delay = 2000 # NOTE: int describing to be slept time inbetween dac set and adc get. + # slept time <=> self.delay * 1e-8 s, set/get commands take ~2e-6 s per point on their own always + def set_sweep_parameters(self, sweep: np.ndarray) -> None: + """ + Check to be swept parameter settings and write them to the device. + Sweep: array describing sweep values by [start, stop, step] + + Max 2^16 points due to (arbitary) memory limitation on device (More + values would not make sense anyways though since the ADC/DACs have + only 16-bit resolution) + Voltage range is limited to +-10V for both ADC/DACs + Look at device for channel number limitations + """ + # self.dac_channel, self.adc_channel = self._sweep_channels # NOTE: keep this line? potentially causes more problems with qkit defaults not fitting for this device than the feature is worth + set_array = np.arange(sweep[0], sweep[1] + sweep[2]/2, sweep[2]) + # Check values + if len(set_array) > 0x1000: + raise ValueError("Sweep size of 16 bit ADC/DACs limited to 2^16 values") + if np.any(np.abs(set_array) > 10.0): + raise ValueError("Sweep array contains values outside +-10V range") + if not self.dac_channel in range(1, 9): + raise ValueError("DAC channel must be 1...8") + if not self.adc_card in [1, 2]: + raise ValueError("ADC card must be 1 (normal), 2 (50kHz filtered)") + if self.adc_card == 1 and not self.adc_channel in range(1, 5): + raise ValueError("ADC channel must be 1...4") + if self.adc_card == 2 and not self.adc_channel in range(1, 9): + raise ValueError("Filtered ADC channel must be 1...8") + # Set values + self.adw.Set_Par(1, len(set_array)) + self.adw.Set_Par(2, self.dac_channel) + self.adw.Set_Par(3, self.adc_channel) + self.adw.Set_Par(4, self.adc_card) + self.adw.Set_Par(5, int(self.delay)) + self.adw.SetData_Long(ADwinProII_Base._to_reg(set_array), 1, 1, len(set_array)) + def get_tracedata(self) -> tuple[np.ndarray]: + """ + Starts a sweep with parameters currently set on device and returns + set_values_array, measured_values_array. 
Sorting by what is I and V + is being handled by overlaying virtual_tunnel_electronic + """ + # Sweep + self.adw.Start_Process(self.proc_num) + while self.adw.Process_Status(self.proc_num): + time.sleep(0.1) + # Read result + return ADwinProII_Base._to_volt(self.adw.GetData_Long(1, 1, self.adw.Get_Par(1))), ADwinProII_Base._to_volt(self.adw.GetData_Long(2, 1, self.adw.Get_Par(1))) + # qkit SMU compability + def set_sweep_mode(self, mode: int = 0): + if mode != 0: + logging.error("ADwinProII only has voltage ADC/DACs, only VV-mode 0 supported") + def get_sweep_mode(self) -> int: + return 0 + def get_sweep_channels(self) -> tuple[int]: + return (self.dac_channel, self.adc_channel) + def set_status(self, *args, **kwargs) -> None: + pass # ADC/DACs are always responsive \ No newline at end of file diff --git a/src/qkit/drivers/AbstractIVDevice.py b/src/qkit/drivers/AbstractIVDevice.py index 56876db77..02133d990 100644 --- a/src/qkit/drivers/AbstractIVDevice.py +++ b/src/qkit/drivers/AbstractIVDevice.py @@ -7,9 +7,10 @@ class AbstractIVDevice(ABC): @abstractmethod - def take_IV(self, sweep: np.ndarray) -> tuple[np.ndarray, np.ndarray]: + def take_IV(self, sweep: tuple[float, float, float, float]) -> tuple[np.ndarray, np.ndarray]: """ Perform an IV sweep. Returns (bias, sense). + Sweep is (start, stop, step, sleep) """ pass diff --git a/src/qkit/drivers/AbstractVNA.py b/src/qkit/drivers/AbstractVNA.py index f12794118..ff2f34fa0 100644 --- a/src/qkit/drivers/AbstractVNA.py +++ b/src/qkit/drivers/AbstractVNA.py @@ -36,7 +36,7 @@ def get_span(self) -> float: return freqs[-1] - freqs[0] @abstractmethod - def get_tracedata(self, RealImag = None) -> tuple((np.ndarray, np.ndarray)): + def get_tracedata(self, RealImag = None) -> tuple[np.ndarray, np.ndarray]: """ When the measurement succeeded, this method returns the resulting data. The behaviour depends on the [RealImag] keyword argument. diff --git a/src/qkit/drivers/Double_VTE.py b/src/qkit/drivers/Double_VTE.py new file mode 100644 index 000000000..09d34fdb4 --- /dev/null +++ b/src/qkit/drivers/Double_VTE.py @@ -0,0 +1,174 @@ +from qkit.core.instrument_base import Instrument +import numpy as np +import logging +import typing +import time + +class Double_VTE(Instrument): + def __init__(self, name): + """ + Double Virtual Tunnel Electronics + + So far only supports voltage bias, measure pseudo current for 2x transimpedance amplifier setup + """ + super().__init__(name, tags=["virtual"]) + # setter matrix & getter matrix for e.g. sweeping voltage difference at const. total voltage across a sample and measuring current: + # (VA) = (1 -1) . (V1) + # (VB) (.5 .5) (V2) + # (IA) = (.5 -.5) . (I1) + # (IB) (1 1) (I2) + self._setter_matrix = np.array([[1, 0], [0, 1]]) + self._getter_matrix = np.array([[1, 0], [0, 1]]) # technically not required for sweeping, but handled here alongside setter for consistency + + self.v_div_1 = 1 + self.v_div_2 = 1 + self.dVdA_1 = 1 + self.dVdA_2 = 1 + + self.sweep_manually = True + # for manual sweeps + self.setter_1: typing.Callable[[float], None] = None + self.setter_2: typing.Callable[[float], None] = None + self.getter_1: typing.Callable[[], float] = None + self.getter_2: typing.Callable[[], float] = None + self.rdb_set_1: typing.Callable[[], float] = None # optional but recommended to catch e.g. 
insufficient device resolution + self.rdb_set_2: typing.Callable[[], float] = None + self.set_get_dt = 0.001 # wait between set & get + # for automated sweeps: func(to_be_set_v1, to_be_set_v2) -> actual_set_v1*v_div_1, actual_set_v2*v_div_2, measured_i1*dVdA_1, measured_i2*dVdA_2 + self.double_sweeper: typing.Callable[[np.ndarray, np.ndarray], tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]] = None + + def set_setter_matrix(self, mat: np.ndarray): + if mat.shape != (2, 2): + logging.error("Setter-matrix shape was {}, but (2, 2) is required".format(str(mat.shape))) + return + if np.linalg.det(mat) == 0: + logging.error("Setter-matrix needs to be invertible") + return + self._setter_matrix = mat + def get_setter_matrix(self): + return self._setter_matrix + + def set_getter_matrix(self, mat: np.ndarray): + if mat.shape != (2, 2): + logging.error("Getter-matrix shape was {}, but (2, 2) is required".format(str(mat.shape))) + return + if np.linalg.det(mat) == 0: + logging.error("Getter-matrix needs to be invertible") + return + self._getter_matrix = mat + def get_getter_matrix(self): + return self._getter_matrix + + def show_effective_minmax(self, v1_min: float, v1_max: float, v2_min: float, v2_max: float, fix_a: float | None = None, fix_b: float | None = None, make_plot: bool = True) -> tuple[float]: + helper = np.array([[v1_min, v1_max, v1_max, v1_min, v1_min], + [v2_min, v2_min, v2_max, v2_max, v2_min]]) + # boundary box trivially as min/max over corners + a_min = np.min(self._setter_matrix @ helper, axis=-1)[0] + a_max = np.max(self._setter_matrix @ helper, axis=-1)[0] + b_min = np.min(self._setter_matrix @ helper, axis=-1)[1] + b_max = np.max(self._setter_matrix @ helper, axis=-1)[1] + + # if specific fixed point is within boundary, its according range can be found as the innermost intersections between quadrilateral edges and fixed line + # draw everything in v1, v2 and va, vb space for above comment to make sense + if fix_a is None: + eff_b_min = None + eff_b_max = None + elif (fix_a < a_min) or (fix_a > a_max): + eff_b_min = None + eff_b_max = None + else: + eff_cand = [] + for i in range(4): + try: + # https://en.wikipedia.org/wiki/Line%E2%80%93line_intersection P_y formula with p1 = setter@h_i, p2 = setter@h_i+1, p3 = (fix_a, 1), p4 = (fix_a, 0) + eff_cand += [(np.linalg.det(self._setter_matrix) * np.linalg.det(helper[:,i:i+2]) + fix_a*(self._setter_matrix @ (helper[:,i] - helper[:,i+1]))[1])/(self._setter_matrix @ (helper[:,i] - helper[:,i+1]))[0]] + except: + pass + eff_cand = np.array(eff_cand) + eff_cand = np.unique(eff_cand[(eff_cand >= b_min) & (eff_cand <= b_max)]) + if len(eff_cand) != 2: + logging.warning("Something went wrong during fix_a calculations") + eff_b_min = None + eff_b_max = None + else: + eff_b_min = np.min(eff_cand) + eff_b_max = np.max(eff_cand) + + if fix_b is None: + eff_a_min = None + eff_a_max = None + elif (fix_b < b_min) or (fix_b > b_max): + eff_a_min = None + eff_a_max = None + else: + eff_cand = [] + for i in range(4): + try: + eff_cand += [(-np.linalg.det(self._setter_matrix) * np.linalg.det(helper[:,i:i+2]) + fix_b*(self._setter_matrix @ (helper[:,i] - helper[:,i+1]))[0])/(self._setter_matrix @ (helper[:,i] - helper[:,i+1]))[1]] + except: + pass + eff_cand = np.array(eff_cand) + eff_cand = np.unique(eff_cand[(eff_cand >= a_min) & (eff_cand <= a_max)]) + if len(eff_cand) != 2: + logging.warning("Something went wrong during fix_b calculations") + eff_a_min = None + eff_a_max = None + else: + eff_a_min = np.min(eff_cand) + eff_a_max = 
np.max(eff_cand) + + if make_plot: + import matplotlib.pyplot as plt + plt.subplots(figsize=(12,12), dpi=360) + plt.plot(*(self._setter_matrix @ helper), "k-", zorder=0) + plt.hlines([b_min, b_max], a_min, a_max, "b", ":", zorder=1) + plt.vlines([a_min, a_max], b_min, b_max, "b", ":", zorder=1) + plt.hlines(fix_b, eff_a_min, eff_a_max, "r", "--", zorder=1) if not ((fix_b is None) or (eff_a_min is None) or (eff_a_max is None)) else None + plt.vlines(fix_a, eff_b_min, eff_b_max, "r", "--", zorder=1) if not ((fix_a is None) or (eff_b_min is None) or (eff_b_max is None)) else None + plt.scatter(*(self._setter_matrix @ helper[:,:-1]), c=range(1, helper.shape[-1]), marker="o", cmap="gist_rainbow", zorder=2) + plt.show() + + return a_min, a_max, b_min, b_max, eff_b_min, eff_b_max, eff_a_min, eff_a_max + + def get_sweepdata(self, a_vals: np.ndarray, b_vals: np.ndarray): + # (nop), (nop) = *as tuple*: (2, 2) @ (2, 2) @ (2, nop) // (1, nop), (1, nop) + sweep_1, sweep_2 = (s for s in np.array([[self.v_div_1, 0], [0, self.v_div_2]]) @ np.linalg.inv(self._setter_matrix) @ np.concatenate([a_vals.reshape((1, len(a_vals))), b_vals.reshape((1, len(b_vals)))], axis=0)) + logging.info("Sweeping Setter_1 within {}V to {}V and Setter_2 within {}V to {}V".format(np.min(sweep_1), np.max(sweep_1), np.min(sweep_2), np.max(sweep_2))) + if self.sweep_manually: + v_set_1, v_set_2, v_meas_1, v_meas_2 = ([], [], [], []) + for i in range(len(a_vals)): + self.setter_1(sweep_1[i]) + self.setter_2(sweep_2[i]) + time.sleep(self.set_get_dt) + v_set_1 += [self.rdb_set_1() if not (self.rdb_set_1 is None) else sweep_1[i]] + v_set_2 += [self.rdb_set_2() if not (self.rdb_set_2 is None) else sweep_2[i]] + v_meas_1 += [self.getter_1()] + v_meas_2 += [self.getter_2()] + return np.array(v_set_1)/self.v_div_1, np.array(v_set_2)/self.v_div_2, np.array(v_meas_1)/self.dVdA_1, np.array(v_meas_2)/self.dVdA_2 + else: + v_set_1, v_set_2, v_meas_1, v_meas_2 = self.double_sweeper(sweep_1, sweep_2) + return v_set_1/self.v_div_1, v_set_2/self.v_div_2, v_meas_1/self.dVdA_1, v_meas_2/self.dVdA_2 + + # qkit setting stuffs + def get_parameters(self): + return { + "setter_matrix": None, + "getter_matrix": None, + "v_div_1": None, + "v_div_2": None, + "dVdA_1": None, + "dVdA_2": None, + "set_get_dt": None + } + + def get(self, param, **kwargs): + try: + return eval("self._{}".format(param)) if "etter_matrix" in param else eval("self.{}".format(param)) + except: + return None + + +if __name__ == "__main__": + mydvte = Double_VTE("mydvte") + mydvte.set_setter_matrix(np.array([[1, -1], [1/3, 2/3]])) + print(mydvte.show_effective_minmax(0, 4, -4, 0, 2, 0)) \ No newline at end of file diff --git a/src/qkit/drivers/IVVI_BiasDAC.py b/src/qkit/drivers/IVVI_BiasDAC.py index 7b2c7879f..907ae50b2 100644 --- a/src/qkit/drivers/IVVI_BiasDAC.py +++ b/src/qkit/drivers/IVVI_BiasDAC.py @@ -108,8 +108,8 @@ def __init__(self): self.v_div = None # for voltage bias def get_sweep_mode(self): # 0: V bias V measure, 1: I bias V measure, 2: V bias I measure - # technically mode 2 also possible, this class cant determine 0 vs 2 without hidden knowledge about measure function though - return 1 - self.pseudo_bias_mode + # technically mode 0 also possible, this class cant determine 0 vs 2 without hidden knowledge about measure function though + return 1 + self.pseudo_bias_mode def get_sweep_bias(self): return self.pseudo_bias_mode def get_sweep_channels(self): diff --git a/src/qkit/drivers/Keysight_B2900.py b/src/qkit/drivers/Keysight_B2900.py index 21107915b..b6a8dbab6 100644 
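# --- Example (not part of the diff): the setter-matrix mapping in Double_VTE above ---
# Numeric sketch (values illustrative, voltage dividers omitted) of the inv(setter_matrix)
# step that get_sweepdata applies, for the "difference at constant mean" matrix from the
# class docstring.
import numpy as np

M = np.array([[1.0, -1.0],   # a = V1 - V2      (voltage difference across the sample)
              [0.5,  0.5]])  # b = (V1 + V2)/2  (common-mode voltage)
a_vals = np.linspace(-1e-3, 1e-3, 5)   # requested sweep of the difference
b_vals = np.full(5, 0.2e-3)            # common mode held fixed
v1, v2 = np.linalg.inv(M) @ np.vstack([a_vals, b_vals])
print(v1 - v2)        # reproduces a_vals
print((v1 + v2) / 2)  # reproduces b_vals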
--- a/src/qkit/drivers/Keysight_B2900.py +++ b/src/qkit/drivers/Keysight_B2900.py @@ -1510,7 +1510,7 @@ def get_voltage(self, channel=1): logging.debug('{:s}: Get voltage value{:s}'.format(__name__, self._log_chans[self._channels][channel])) #self._write(':disp:view sing{:d}'.format(channel)) # return float(self._ask(':sour{:s}:volt?'.format(self._cmd_chans[self._channels][channel]))) - return float(self._ask(':meas:volt?').replace('+9.910000E+37', 'nan').replace('9.900000E+37', 'inf')) + return float(self._ask(':meas:volt? (@{})'.format(channel)).replace('+9.910000E+37', 'nan').replace('9.900000E+37', 'inf')) except Exception as e: logging.error('{!s}: Cannot get voltage value{:s}'.format(__name__, self._log_chans[self._channels][channel])) raise type(e)('{!s}: Cannot get voltage value{:s}\n{!s}'.format(__name__, self._log_chans[self._channels][channel], e)) @@ -1560,7 +1560,7 @@ def get_current(self, channel=1): logging.debug('{:s}: Get current value{:s}'.format(__name__, self._log_chans[self._channels][channel])) #self._write(':disp:view sing{:d}'.format(channel)) # return float(self._ask(':sour{:s}:curr?'.format(self._cmd_chans[self._channels][channel]))) - return float(self._ask(':meas:curr?').replace('+9.910000E+37', 'nan').replace('9.900000E+37', 'inf')) + return float(self._ask(':meas:curr? (@{})'.format(channel)).replace('+9.910000E+37', 'nan').replace('9.900000E+37', 'inf')) except Exception as e: logging.error('{!s}: Cannot get current value{:s}'.format(__name__, self._log_chans[self._channels][channel])) raise type(e)('{!s}: Cannot get current value{:s}\n{!s}'.format(__name__, self._log_chans[self._channels][channel], e)) @@ -1586,7 +1586,7 @@ def get_resistance(self, channel=1): logging.debug('{:s}: Get resistance value{:s}'.format(__name__, self._log_chans[self._channels][channel])) #self._write(':disp:view sing{:d}'.format(channel)) # return float(self._ask(':sour{:s}:res?'.format(self._cmd_chans[self._channels][channel]))) - return float(self._ask(':meas:res?').replace('+9.910000E+37', 'nan').replace('9.900000E+37', 'inf')) + return float(self._ask(':meas:res? (@{})'.format(channel)).replace('+9.910000E+37', 'nan').replace('9.900000E+37', 'inf')) except Exception as e: logging.error('{!s}: Cannot get resistance value{:s}'.format(__name__, self._log_chans[self._channels][channel])) raise type(e)('{!s}: Cannot get resistance value{:s}\n{!s}'.format(__name__, self._log_chans[self._channels][channel], e)) @@ -2052,7 +2052,7 @@ def get_tracedata(self): self._wait_for_transition_idle(channel=channel_bias) I_values = np.fromstring(self._ask(':fetc:arr:curr?').replace('+9.910000E+37', 'nan').replace('9.900000E+37', 'inf'), sep=',', dtype=float) V_values = np.fromstring(self._ask(':fetc:arr:volt?').replace('+9.910000E+37', 'nan').replace('9.900000E+37', 'inf'), sep=',', dtype=float) - return (I_values, V_values)[::int(np.sign(.5 - self.get_sweep_bias()))] + return (I_values, V_values)#[::int(np.sign(.5 - self.get_sweep_bias()))] # IVD.get_tracedata should always yield (I,V)! 
correct sorting to bias/sense already handled in transport script except Exception as e: logging.error('{!s}: Cannot take sweep data of channel {!s}'.format(__name__, self._sweep_channels)) raise type(e)('{!s}: Cannot take sweep data of channel {!s}\n{!s}'.format(__name__, self._sweep_channels, e)) diff --git a/src/qkit/measure/logging_base.py b/src/qkit/measure/logging_base.py new file mode 100644 index 000000000..6277772a1 --- /dev/null +++ b/src/qkit/measure/logging_base.py @@ -0,0 +1,89 @@ +from qkit.storage.store import Data +from qkit.storage.hdf_dataset import hdf_dataset +import typing +import numpy as np + +class logFunc(object): + """ + Unified logging class to offer log-functionality at different points of 1D - 3D spectroscopy or transport measurements. + + The to be logged function can either be single-valued or yield a trace of (n) datapoints (independent from set x/y-parameters). + + For 3D measurements the function can be called + a) once at the beginning [shape (1) or (n)] + b) at each x-iteration [shape (x) or (x, n)] -> old spectroscopy.set_log_function & spectroscopy.set_log_function_2d + c) only for the first x-value at each y-iteration [shape (y) or (y, n)] + d) at each x- & y-iteration [shape (x, y) or (x, y, n)] -> old transport.set_log_function for the single-valued case + with 2D measurements being naturally limited to cases a), b) and 1D measurements to case a). + + This behavior can be controlled via handing over x_vec, y_vec or trace_info arguments. While the first two should be the respective h5 dataset, which + is already defined in the main measurement, 'trace_vec provides information about the additional coordinate a trace log function may sweep over as + (*values_array*, name, unit). The same base coordinate may be chosen for different trace log functions. 
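As a hedged illustration of the signature logic described here, case b) with a scalar log value; the file name, dataset URL and callable are placeholders rather than values used elsewhere in this patch.

def get_T():                                   # placeholder: returns a single float
    return 4.2

lf = logFunc("run.h5", get_T, name="temp", unit="K",
             x_ds_url="/entry/data0/x")        # signature becomes "x" -> shape (x)
lf.prepare_file()                              # adds a value vector over the x dataset
for ix in range(10):
    lf.logIfDesired(ix)                        # fills a NaN-padded buffer, one entry per x step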
+ """ + def __init__(self, file_name: str, func: typing.Callable, name: str, unit: str = "", dtype: str = "f", x_ds_url: str = None, y_ds_url: str = None, trace_info: tuple[np.ndarray, str, str] = None): + self.file = Data(file_name) + self.func = func + self.name = name + self.unit = unit + self.dtype = dtype + # print("Logging {} in file {}".format(name, file_name)) # TODO remove + self.signature = "" + self.x_ds_url = x_ds_url + if not (x_ds_url is None): + self.signature += "x" + self.x_len: int = None + self.y_ds_url = y_ds_url + if not (y_ds_url is None): + self.signature += "y" + self.y_len: int = None + self.trace_info = trace_info + if not (trace_info is None): + self.signature += "n" + self.buffer1d: np.ndarray = None + self.log_ds: hdf_dataset = None + + def prepare_file(self): + # prepare trace base coordinate if necessary + if "n" in self.signature: + try: + trace_ds = self.file.get_dataset("/entry/data0/{}".format(self.trace_info[1])) + # base coordinate already exists in file + except: + trace_ds = self.file.add_coordinate(self.trace_info[1], self.trace_info[2]) + trace_ds.add(self.trace_info[0]) + # the logic is admittably more complicated here, writing down all 8 possible cases of x,y,n present or not helps + if len(self.signature) == 0: + self.log_ds = self.file.add_coordinate(self.name, self.unit) # coordinate dtype hardcoded as float + elif len(self.signature) == 1: + self.log_ds = self.file.add_value_vector(self.name, self.file.get_dataset(self.x_ds_url) if "x" == self.signature else (self.file.get_dataset(self.y_ds_url) if "y" == self.signature else trace_ds), self.unit, dtype=self.dtype) + elif len(self.signature) == 2: + self.log_ds = self.file.add_value_matrix(self.name, self.file.get_dataset(self.x_ds_url) if "x" in self.signature else self.file.get_dataset(self.y_ds_url), trace_ds if "n" in self.signature else self.file.get_dataset(self.y_ds_url), self.unit, dtype=self.dtype) + elif len(self.signature) == 3: + self.log_ds = self.file.add_value_box(self.name, self.file.get_dataset(self.x_ds_url), self.file.get_dataset(self.y_ds_url), trace_ds, self.unit, dtype=self.dtype) + + if "x" in self.signature: + self.x_len = self.file[self.x_ds_url].shape[0] + if "y" in self.signature: + self.y_len = self.file[self.y_ds_url].shape[0] + + def logIfDesired(self, ix=0, iy=0): + if (ix == 0 or "x" in self.signature) and (iy == 0 or "y" in self.signature): # log function call desired + if len(self.signature) == 0: # "" + # we will only reach here once, no further case-logic required + self.log_ds.append(np.array([self.func()]), reset=True) + elif "n" in self.signature: # "n", "xn", "yn", "xyn" + self.log_ds.append(self.func()) + if len(self.signature) == 3: + if iy + 1 == self.y_len: + # who doesnt love 4x nested ifs edge cases? 
+ self.log_ds.next_matrix() + elif "y" in self.signature: # "y", "xy" + if iy == 0: + self.buffer1d = np.full(self.y_len, np.nan) + self.buffer1d[iy] = self.func() + self.log_ds.append(self.buffer1d, reset=(iy != 0)) + else: # "x" + if ix == 0: + self.buffer1d = np.full(self.x_len, np.nan) + self.buffer1d[ix] = self.func() + self.log_ds.append(self.buffer1d, reset=(ix != 0)) \ No newline at end of file diff --git a/src/qkit/measure/spectroscopy/spectroscopy.py b/src/qkit/measure/spectroscopy/spectroscopy.py index 59fbb19e0..0d54e32e3 100644 --- a/src/qkit/measure/spectroscopy/spectroscopy.py +++ b/src/qkit/measure/spectroscopy/spectroscopy.py @@ -23,19 +23,20 @@ import threading import qkit +import qkit.analysis.resonator_fitting as resonatorFit if qkit.module_available("matplotlib"): import matplotlib.pylab as plt if qkit.module_available("scipy"): from scipy.optimize import curve_fit from scipy.interpolate import interp1d, UnivariateSpline from qkit.storage import store as hdf -from qkit.analysis.resonator import Resonator as resonator +#from qkit.analysis.resonator import Resonator as resonator from qkit.gui.plot import plot as qviewkit from qkit.gui.notebook.Progress_Bar import Progress_Bar from qkit.measure.measurement_class import Measurement +from qkit.measure.logging_base import logFunc import qkit.measure.write_additional_files as waf - ################################################################## class spectrum(object): @@ -44,8 +45,8 @@ class spectrum(object): usage: m = spectrum(vna = vna1) - m.set_x_parameters(arange(-0.05,0.05,0.01),'flux coil current',coil.set_current, unit = 'mA') outer scan in 3D - m.set_y_parameters(arange(4e9,7e9,10e6),'excitation frequency',mw_src1.set_frequency, unit = 'Hz') + m.set_x_parameters(arange(-0.05,0.05,0.01),'flux coil current',coil.set_current, unit = 'mA') # outer scan in 2D & 3D + m.set_y_parameters(arange(4e9,7e9,10e6),'excitation frequency',mw_src1.set_frequency, unit = 'Hz') # inner scan in 3D m.landscape.generate_fit_function_xy(...) for 3D scan, can be called several times and appends the current landscape m.landscape.generate_fit_function_xz(...) for 2D or 3D scan, adjusts the vna freqs with respect to x @@ -77,10 +78,11 @@ def __init__(self, vna, exp_name='', sample=None): self.progress_bar = True self._fit_resonator = False + self.storeRealImag = False self._plot_comment = "" - self.set_log_function() - self.set_log_function_2D() + self.log_init_params = [] # buffer is necessary to allow adjusting x/y parameter sweeps after setting log functions + self.log_funcs: list[logFunc] = [] self.open_qviewkit = True self.qviewkit_singleInstance = False @@ -93,6 +95,39 @@ def __init__(self, vna, exp_name='', sample=None): self._qvk_process = False self._scan_dim = None self._scan_time = False + + def add_logger(self, func, name="log_param", unit="", dtype="f", over_x=True, over_y=False, is_trace=False, trace_base_vals=None, trace_base_name=None, trace_base_unit=None): + """ + Migration from set_log_function & set_log_function_2D: + + -------- + def get_T(): + ... # returns a float + def a(): + ... # returns a float + trace_base = np.linspace(1, 2, 101) + def b_trace(): + ... 
# returns a trace + + # obsolete + # spec.set_log_function([get_T, a], ["temp", "a_name"], ["K", "a_unit"]) + # spec.set_log_function_2D([b_trace], ["b_name"], ["b_unit"], [trace_base], ["base_name"], ["base_unit"]) + + # do instead + spec.add_logger(get_T, "temp", "K", over_y=True) # over_y optional for more logged temperatures + spec.add_logger(a, "a_name", "a_unit") # default migration + spec.add_logger(b_trace, "b_name", "b_unit", is_trace=True, trace_base_vals=trace_base, trace_base_name="base_name", trace_base_unit="base_unit") # traces + ------ + + Alternatively more options like logging over y-iterations aswell or skipping the x-iteration are possible now, see qkit/measure/logging_base for details. + """ + self.log_init_params += [(func, name, unit, dtype, over_x, over_y, is_trace, trace_base_vals, trace_base_name, trace_base_unit)] # handle logger initialization in prepare_file + + def clear_loggers(self): + """ + Clear all set log functions + """ + self.log_init_params = [] def set_log_function(self, func=None, name=None, unit=None, log_dtype=None): ''' @@ -108,7 +143,7 @@ def set_log_function(self, func=None, name=None, unit=None, log_dtype=None): unit: unit of logging parameter, default: '' log_dtype: h5 data type, default: 'f' (float32) ''' - + #logging.error("'set_log_function' is obsolete, 'add_logger' is used instead. See respective qkit/src/qkit/measure/spectroscopy function docs for how to migrate.") if name == None: try: name = ['log_param'] * len(func) @@ -124,18 +159,15 @@ def set_log_function(self, func=None, name=None, unit=None, log_dtype=None): log_dtype = ['f'] * len(func) except Exception: log_dtype = None + + # remove all previously set 1d loggers but keep 2d loggers depending on generalizd is_trace parameter + self.log_init_params = [elm for elm in self.log_init_params if elm[6]] - self.log_function = [] - self.log_name = [] - self.log_unit = [] - self.log_dtype = [] - - if func != None: - for i, f in enumerate(func): - self.log_function.append(f) - self.log_name.append(name[i]) - self.log_unit.append(unit[i]) - self.log_dtype.append(log_dtype[i]) + if func == None: + return + + for i in range(len(func)): + self.add_logger(func[i], name[i], unit[i], log_dtype[i]) def set_log_function_2D(self, func=None, name=None, unit=None, y=None, y_name=None, y_unit=None, log_dtype=None): ''' @@ -151,6 +183,7 @@ def set_log_function_2D(self, func=None, name=None, unit=None, y=None, y_name=No unit: unit of logging parameter, default: '' log_dtype: h5 data type, default: 'f' (float32) ''' + #logging.error("'set_log_function_2D' is obsolete, 'add_logger' is used instead. 
See respective qkit/src/qkit/measure/spectroscopy function docs for how to migrate.") if name == None: try: name = ['log_param'] * len(func) @@ -166,24 +199,15 @@ def set_log_function_2D(self, func=None, name=None, unit=None, y=None, y_name=No log_dtype = ['f'] * len(func) except Exception: log_dtype = None + + # remove all previously set 2d loggers but keep 1d loggers depending on generalized is_trace parameter + self.log_init_params = [elm for elm in self.log_init_params if not elm[6]] - self.log_function_2D = [] - self.log_name_2D = [] - self.log_unit_2D = [] - self.log_y_2D = [] - self.log_y_name_2D = [] - self.log_y_unit_2D = [] - self.log_dtype_2D = [] - - if func != None: - for i, _ in enumerate(func): - self.log_function_2D.append(func[i]) - self.log_name_2D.append(name[i]) - self.log_unit_2D.append(unit[i]) - self.log_dtype_2D.append(log_dtype[i]) - self.log_y_2D.append(y[i]) - self.log_y_name_2D.append(y_name[i]) - self.log_y_unit_2D.append(y_unit[i]) + if func == None: + return + + for i in range(len(func)): + self.add_logger(func[i], name[i], unit[i], log_dtype[i], True, False, True, y[i], y_name[i], y_unit[i]) def set_x_parameters(self, x_vec, x_coordname, x_set_obj, x_unit=""): """ @@ -259,7 +283,9 @@ def _prepare_measurement_file(self): self._mo.append(self._measurement_object.get_JSON()) # write logfile and instrument settings - self._write_settings_dataset() + self._settings = self._data_file.add_textlist('settings') + settings = waf.get_instrument_settings(self._data_file.get_filepath()) + self._settings.append(settings) self._log = waf.open_log_file(self._data_file.get_filepath()) self._data_freq = self._data_file.add_coordinate('frequency', unit='Hz') @@ -271,37 +297,53 @@ def _prepare_measurement_file(self): self._data_time = self._data_file.add_coordinate('time', unit='s') self._data_time.add(np.arange(0, self._nop, 1) * self.vna.get_sweeptime() / (self._nop - 1)) sweep_vector = self._data_time + + # for automatic circlefit + if self._fit_resonator: + self._fit_select = (self._freqpoints >= self._f_min) & (self._freqpoints <= self._f_max) + self._fit_freq = self._data_file.add_coordinate('_fit_frequency', unit='Hz', folder="analysis") if self._scan_dim == 1: - self._data_real = self._data_file.add_value_vector('real', x=sweep_vector, unit='', save_timestamp=True) - self._data_imag = self._data_file.add_value_vector('imag', x=sweep_vector, unit='', save_timestamp=True) - self._data_amp = self._data_file.add_value_vector('amplitude', x=sweep_vector, unit='arb. unit', - save_timestamp=True) - self._data_pha = self._data_file.add_value_vector('phase', x=sweep_vector, unit='rad', - save_timestamp=True) + self._data_real = self._data_file.add_value_vector('real', x=sweep_vector, unit='', save_timestamp=True) if self.storeRealImag else None + self._data_imag = self._data_file.add_value_vector('imag', x=sweep_vector, unit='', save_timestamp=True) if self.storeRealImag else None + self._data_amp = self._data_file.add_value_vector('amplitude', x=sweep_vector, unit='arb. unit', save_timestamp=True) + self._data_pha = self._data_file.add_value_vector('phase', x=sweep_vector, unit='rad', save_timestamp=True) + + self._iq_view = self._data_file.add_view("IQ", x=self._data_real, y=self._data_imag, view_params={'aspect': 1.0}) if self.storeRealImag else None + + if self._fit_resonator: + self._fit_amp = self._data_file.add_value_vector("_fit_amplitude", x=self._fit_freq, unit="arb. 
unit", folder="analysis") + self._fit_pha = self._data_file.add_value_vector("_fit_phase", x=self._fit_freq, unit="rad", folder="analysis") + self._fit_real = self._data_file.add_value_vector("_fit_real", x=self._fit_freq, unit="", folder="analysis") if self.storeRealImag else None + self._fit_imag = self._data_file.add_value_vector("_fit_imag", x=self._fit_freq, unit="", folder="analysis") if self.storeRealImag else None + # extract data is single datapoint for 1D measure, add as coordinate after measurement manually if self._scan_dim == 2: self._data_x = self._data_file.add_coordinate(self.x_coordname, unit=self.x_unit) self._data_x.add(self.x_vec) - self._data_amp = self._data_file.add_value_matrix('amplitude', x=self._data_x, y=sweep_vector, - unit='arb. unit', save_timestamp=True) - self._data_pha = self._data_file.add_value_matrix('phase', x=self._data_x, y=sweep_vector, unit='rad', - save_timestamp=True) - - if self.log_function != None: # use logging - self._log_value = [] - for i in range(len(self.log_function)): - self._log_value.append( - self._data_file.add_value_vector(self.log_name[i], x=self._data_x, unit=self.log_unit[i], - dtype=self.log_dtype[i])) + + self._data_real = self._data_file.add_value_matrix('real', x=self._data_x, y=sweep_vector, unit='', save_timestamp=False) if self.storeRealImag else None + self._data_imag = self._data_file.add_value_matrix('imag', x=self._data_x, y=sweep_vector, unit='', save_timestamp=False) if self.storeRealImag else None + self._data_amp = self._data_file.add_value_matrix('amplitude', x=self._data_x, y=sweep_vector, unit='arb. unit', save_timestamp=True) + self._data_pha = self._data_file.add_value_matrix('phase', x=self._data_x, y=sweep_vector, unit='rad', save_timestamp=True) + + self._iq_view = self._data_file.add_view("IQ", x=self._data_real, y=self._data_imag, view_params={'aspect': 1.0}) if self.storeRealImag else None + + if self._fit_resonator: + self._fit_amp = self._data_file.add_value_matrix("_fit_amplitude", x=self._data_x, y=sweep_vector, unit="arb. 
unit", folder="analysis") + self._fit_pha = self._data_file.add_value_matrix("_fit_phase", x=self._data_x, y=sweep_vector, unit="rad", folder="analysis") + self._fit_real = self._data_file.add_value_matrix("_fit_real", x=self._data_x, y=sweep_vector, unit="", folder="analysis") if self.storeRealImag else None + self._fit_imag = self._data_file.add_value_matrix("_fit_imag", x=self._data_x, y=sweep_vector, unit="", folder="analysis") if self.storeRealImag else None + + self._fit_extracts: dict[str, hdf.hdf_dataset] = {} + for key in self._fit_function.extract_data.keys(): + self._fit_extracts[key] = self._data_file.add_value_vector("fit_" + key, x=self._data_x, unit="", folder="analysis") if self._nop < 10: """creates view: plot middle point vs x-parameter, for qubit measurements""" self._views = [ - self._data_file.add_view("amplitude_midpoint",x=self._data_x,y=self._data_amp, - view_params=dict(transpose=True,default_trace=self._nop // 2,linecolors=[(200,200,100)])), - self._data_file.add_view("phase_midpoint", x=self._data_x, y=self._data_pha, - view_params=dict(transpose=True, default_trace=self._nop // 2, linecolors=[(200, 200, 100)])) + self._data_file.add_view("amplitude_midpoint", x=self._data_x, y=self._data_amp, view_params=dict(transpose=True, default_trace=self._nop // 2,linecolors=[(200, 200, 100)])), + self._data_file.add_view("phase_midpoint", x=self._data_x, y=self._data_pha, view_params=dict(transpose=True, default_trace=self._nop // 2, linecolors=[(200, 200, 100)])) ] if self._scan_dim == 3: @@ -311,38 +353,41 @@ def _prepare_measurement_file(self): self._data_y.add(self.y_vec) if self._nop == 0: # saving in a 2D matrix instead of a 3D box HR: does not work yet !!! test things before you put them online. - self._data_amp = self._data_file.add_value_matrix('amplitude', x=self._data_x, y=self._data_y, - unit='arb. unit', save_timestamp=False) - self._data_pha = self._data_file.add_value_matrix('phase', x=self._data_x, y=self._data_y, unit='rad', - save_timestamp=False) + self._data_amp = self._data_file.add_value_matrix('amplitude', x=self._data_x, y=self._data_y, unit='arb. unit', save_timestamp=False) + self._data_pha = self._data_file.add_value_matrix('phase', x=self._data_x, y=self._data_y, unit='rad', save_timestamp=False) else: - self._data_amp = self._data_file.add_value_box('amplitude', x=self._data_x, y=self._data_y, - z=sweep_vector, unit='arb. 
unit', - save_timestamp=False) - self._data_pha = self._data_file.add_value_box('phase', x=self._data_x, y=self._data_y, - z=sweep_vector, unit='rad', save_timestamp=False) - - if self.log_function != None: # use logging - self._log_value = [] - for i in range(len(self.log_function)): - self._log_value.append( - self._data_file.add_value_vector(self.log_name[i], x=self._data_x, unit=self.log_unit[i], - dtype=self.log_dtype[i])) - - if self.log_function_2D != None: # use 2D logging - self._log_y_value_2D = [] - for i in range(len(self.log_y_name_2D)): - if self.log_y_name_2D[i] not in self.log_y_name_2D[:i]: # add y coordinate for 2D logging - self._log_y_value_2D.append(self._data_file.add_coordinate(self.log_y_name_2D[i], unit=self.log_y_unit_2D[i], folder='data')) # possibly use "data1" - self._log_y_value_2D[i].add(self.log_y_2D[i]) - else: # use y coordinate for 2D logging if it is already added - self._log_y_value_2D.append(self._log_y_value_2D[np.squeeze(np.argwhere(self.log_y_name_2D[i] == np.array(self.log_y_name_2D[:i])))]) - - self._log_value_2D = [] - for i in range(len(self.log_function_2D)): - self._log_value_2D.append( - self._data_file.add_value_matrix(self.log_name_2D[i], x=self._data_x, y=self._log_y_value_2D[i], unit=self.log_unit_2D[i], - dtype=self.log_dtype_2D[i], folder='data')) # possibly use "data1" + self._data_real = self._data_file.add_value_box('real', x=self._data_x, y=self._data_y, z=sweep_vector, unit='', save_timestamp=False) if self.storeRealImag else None + self._data_imag = self._data_file.add_value_box('imag', x=self._data_x, y=self._data_y, z=sweep_vector, unit='', save_timestamp=False) if self.storeRealImag else None + self._data_amp = self._data_file.add_value_box('amplitude', x=self._data_x, y=self._data_y, z=sweep_vector, unit='arb. unit', save_timestamp=True) + self._data_pha = self._data_file.add_value_box('phase', x=self._data_x, y=self._data_y, z=sweep_vector, unit='rad', save_timestamp=True) + + self._iq_view = self._data_file.add_view("IQ", x=self._data_real, y=self._data_imag, view_params={'aspect': 1.0}) if self.storeRealImag else None + + if self._fit_resonator: + self._fit_amp = self._data_file.add_value_box("_fit_amplitude", x=self._data_x, y=self._data_y, z=sweep_vector, unit="arb. 
unit", folder="analysis") + self._fit_pha = self._data_file.add_value_box("_fit_phase", x=self._data_x, y=self._data_y, z=sweep_vector, unit="rad", folder="analysis") + self._fit_real = self._data_file.add_value_box("_fit_real", x=self._data_x, y=self._data_y, z=sweep_vector, unit="", folder="analysis") if self.storeRealImag else None + self._fit_imag = self._data_file.add_value_box("_fit_imag", x=self._data_x, y=self._data_y, z=sweep_vector, unit="", folder="analysis") if self.storeRealImag else None + + self._fit_extracts: dict[str, hdf.hdf_dataset] = {} + for key in self._fit_function.extract_data.keys(): + self._fit_extracts[key] = self._data_file.add_value_matrix("fit_" + key, x=self._data_x, y=self._data_y, unit="", folder="analysis") + + if self._fit_resonator: + self._iq_view.add(self._fit_real, self._fit_imag) if self.storeRealImag else None + self._amp_view = self._data_file.add_view("AmplitudeFit", self._data_freq, self._data_amp) + self._amp_view.add(self._fit_freq, self._fit_amp) + self._pha_view = self._data_file.add_view("PhaseFit", self._data_freq, self._data_pha) + self._pha_view.add(self._fit_freq, self._fit_pha) + + self.log_funcs = [] + for init_tuple in self.log_init_params: + func, name, unit, dtype, over_x, over_y, is_trace, trace_base_vals, trace_base_name, trace_base_unit = init_tuple + self.log_funcs += [logFunc(self._data_file.get_filepath(), func, name, unit, dtype, + self._data_x.ds_url if (self._scan_dim >= 2) and over_x else None, + self._data_y.ds_url if (self._scan_dim == 3) and over_y else None, + (trace_base_vals, trace_base_name, trace_base_unit) if is_trace else None)] + self.log_funcs[-1].prepare_file() if self.comment: self._data_file.add_comment(self.comment) @@ -350,11 +395,6 @@ def _prepare_measurement_file(self): if self.qviewkit_singleInstance and self.open_qviewkit and self._qvk_process: self._qvk_process.terminate() # terminate an old qviewkit instance - def _write_settings_dataset(self): - self._settings = self._data_file.add_textlist('settings') - settings = waf.get_instrument_settings(self._data_file.get_filepath()) - self._settings.append(settings) - def measure_1D(self, rescan=True, web_visible=True): ''' measure method to record a single (averaged) VNA trace, S11 or S21 according to the setting on the VNA @@ -379,11 +419,13 @@ def measure_1D(self, rescan=True, web_visible=True): """opens qviewkit to plot measurement, amp and pha are opened by default""" if self.open_qviewkit: - self._qvk_process = qviewkit.plot(self._data_file.get_filepath(), datasets=['amplitude', 'phase']) - if self._fit_resonator: - self._resonator = resonator(self._data_file.get_filepath()) + self._qvk_process = qviewkit.plot(self._data_file.get_filepath(), datasets=['amplitude', 'phase'] + (['views/IQ'] if self.storeRealImag else [])) qkit.flow.start() + + for lf in self.log_funcs: + lf.logIfDesired() + if rescan: if self.averaging_start_ready: self.vna.start_measurement() @@ -419,14 +461,26 @@ def measure_1D(self, rescan=True, web_visible=True): if self.progress_bar: self._p.iterate() data_amp, data_pha = self.vna.get_tracedata() - data_real, data_imag = self.vna.get_tracedata('RealImag') + data_real, data_imag = self.vna.get_tracedata('RealImag') if self.storeRealImag else (None, None) self._data_amp.append(data_amp) self._data_pha.append(data_pha) - self._data_real.append(data_real) - self._data_imag.append(data_imag) + self._data_real.append(data_real) if self.storeRealImag else None + self._data_imag.append(data_imag) if self.storeRealImag else None if 
self._fit_resonator: - self._do_fit_resonator() + self._fit_function.do_fit(self._freqpoints[self._fit_select], np.array(data_amp)[self._fit_select], np.array(data_pha)[self._fit_select]) + # add fit data to file + self._fit_freq.add(self._fit_function.freq_fit) + self._fit_amp.append(self._fit_function.amp_fit) + self._fit_pha.append(self._fit_function.pha_fit) + self._fit_real.append(self._fit_function.amp_fit*np.cos(self._fit_function.pha_fit)) if self.storeRealImag else None + self._fit_imag.append(self._fit_function.amp_fit*np.sin(self._fit_function.pha_fit)) if self.storeRealImag else None + + self._fit_extracts: dict[str, hdf.hdf_dataset] = {} + for key, val in self._fit_function.extract_data.items(): + # define extract data in 1D case here instead of before measurement, so the file ist not too clustered if measurement fails + self._fit_extracts[key] = self._data_file.add_coordinate("fit_" + key, unit="", folder="analysis") + self._fit_extracts[key].append(val) qkit.flow.end() self._end_measurement() @@ -459,8 +513,7 @@ def measure_2D(self, web_visible=True): if self.exp_name: self._file_name += '_' + self.exp_name - if self.progress_bar: self._p = Progress_Bar(len(self.x_vec), '2D VNA sweep ' + self.dirname, - self.vna.get_sweeptime_averages()) + if self.progress_bar: self._p = Progress_Bar(len(self.x_vec), '2D VNA sweep ' + self.dirname, self.vna.get_sweeptime_averages()) self._prepare_measurement_vna() self._prepare_measurement_file() @@ -469,12 +522,10 @@ def measure_2D(self, web_visible=True): if self._nop < 10: self._data_file.hf.hf.attrs['default_ds'] =['views/amplitude_midpoint', 'views/phase_midpoint'] else: - self._data_file.hf.hf.attrs['default_ds'] = ['data0/amplitude_midpoint', 'data0/phase_midpoint'] + self._data_file.hf.hf.attrs['default_ds'] = ['amplitude', 'phase'] + (['views/IQ'] if self.storeRealImag else []) if self.open_qviewkit: - self._qvk_process = qviewkit.plot(self._data_file.get_filepath(),datasets=list(self._data_file.hf.hf.attrs['default_ds'])) - if self._fit_resonator: - self._resonator = resonator(self._data_file.get_filepath()) + self._qvk_process = qviewkit.plot(self._data_file.get_filepath(), datasets=list(self._data_file.hf.hf.attrs['default_ds'])) self._measure() def measure_3D(self, web_visible=True): @@ -512,10 +563,8 @@ def measure_3D(self, web_visible=True): self._prepare_measurement_file() """opens qviewkit to plot measurement, amp and pha are opened by default""" """only middle point in freq array is plotted vs x and y""" - if self.open_qviewkit: self._qvk_process = qviewkit.plot(self._data_file.get_filepath(), - datasets=['amplitude', 'phase']) - if self._fit_resonator: - self._resonator = resonator(self._data_file.get_filepath()) + if self.open_qviewkit: + self._qvk_process = qviewkit.plot(self._data_file.get_filepath(), datasets=['amplitude', 'phase'] + (['views/IQ'] if self.storeRealImag else [])) if self.progress_bar: if self.landscape.xylandscapes: @@ -564,7 +613,6 @@ def _measure(self): measures and plots the data depending on the measurement type. the measurement loops feature the setting of the objects and saving the data in the .h5 file. 
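A hedged usage sketch of the reworked spectroscopy options introduced above; 'vna', 'coil' and 'get_T' are assumed to exist in the user's notebook and the sweep values and frequency window are purely illustrative.

import numpy as np

def get_T():                                             # placeholder temperature read-out
    return 0.015

m = spectrum(vna=vna)
m.storeRealImag = True                                   # also store real/imag and the IQ view
m.add_logger(get_T, "temp", "K")                         # replaces set_log_function
m.set_x_parameters(np.linspace(-1e-3, 1e-3, 51), 'coil current', coil.set_current, 'A')
m.set_resonator_fit(True, 'circle_fit_notch', f_min=4.50e9, f_max=4.52e9)
m.measure_2D()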
''' - qkit.flow.start() try: """ loop: x_obj with parameters from x_vec @@ -572,43 +620,41 @@ def _measure(self): for ix, x in enumerate(self.x_vec): self.x_set_obj(x) sleep(self.tdx) - - if self.log_function != None: - for i, f in enumerate(self.log_function): - self._log_value[i].append(float(f())) - - if self.log_function_2D != None: - for i, f in enumerate(self.log_function_2D): - self._log_value_2D[i].append(f()) + self._sweeptime_averages = self.vna.get_sweeptime_averages() if self._scan_dim == 3: - for y in self.y_vec: + fit_extracts_helper = {} # for book-keeping current y-line + for iy, y in enumerate(self.y_vec): # loop: y_obj with parameters from y_vec (only 3D measurement) if self.landscape.xylandscapes and not self.landscape.perform_measurement_at_point(x, y, ix): # if point is not of interest (not close to one of the functions) - data_amp = np.full(int(self._nop), np.NaN, dtype=np.float16) - data_pha = np.full(int(self._nop), np.NaN, dtype=np.float16) # fill with NaNs + data_amp = np.full(int(self._nop), np.nan, dtype=np.float16) + data_pha = np.full(int(self._nop), np.nan, dtype=np.float16) # fill with NaNs else: self.y_set_obj(y) sleep(self.tdy) + self._sweeptime_averages = self.vna.get_sweeptime_averages() + if self.averaging_start_ready: self.vna.start_measurement() # Check if the VNA is STILL in ready state, then add some delay. # If you manually decrease the poll_inveral, I guess you know what you are doing and will disable this safety query. if self.vna_poll_interval >= 0.1 and self.vna.ready(): logging.debug("VNA STILL ready... Adding delay") - qkit.flow.sleep( - .2) # just to make sure, the ready command does not *still* show ready + sleep(0.2) # just to make sure, the ready command does not *still* show ready while not self.vna.ready(): - qkit.flow.sleep(min(self.vna.get_sweeptime_averages(query=False) / 11., self.vna_poll_interval)) + sleep(min(self.vna.get_sweeptime_averages(query=False) / 11., self.vna_poll_interval)) else: self.vna.avg_clear() - qkit.flow.sleep(self._sweeptime_averages) + sleep(self._sweeptime_averages) # if "avg_status" in self.vna.get_function_names(): # while self.vna.avg_status() < self.vna.get_averages(): # qkit.flow.sleep(.2) #maybe one would like to adjust this at a later point + + for lf in self.log_funcs: + lf.logIfDesired(ix, iy) """ measurement """ if not self.landscape.xzlandscape_func: # normal scan @@ -625,44 +671,82 @@ def _measure(self): else: self._data_amp.append(data_amp) self._data_pha.append(data_pha) + self._data_real.append(data_amp*np.cos(data_pha)) if self.storeRealImag else None + self._data_imag.append(data_amp*np.sin(data_pha)) if self.storeRealImag else None + if self._fit_resonator: - self._do_fit_resonator() - qkit.flow.sleep() + self._fit_function.do_fit(self._freqpoints[self._fit_select], data_amp[self._fit_select], data_pha[self._fit_select]) + + self._fit_freq.add(self._fit_function.freq_fit) if (ix == 0) & (iy == 0) else None + self._fit_amp.append(self._fit_function.amp_fit) + self._fit_pha.append(self._fit_function.pha_fit) + self._fit_real.append(self._fit_function.amp_fit*np.cos(self._fit_function.pha_fit)) if self.storeRealImag else None + self._fit_imag.append(self._fit_function.amp_fit*np.sin(self._fit_function.pha_fit)) if self.storeRealImag else None + + for key, val in self._fit_function.extract_data.items(): + if iy == 0: + fit_extracts_helper[key] = np.full(len(self.y_vec), np.nan) + fit_extracts_helper[key][iy] = val + self._fit_extracts[key].append(fit_extracts_helper[key], reset=(iy != 0)) + """ 
filling of value-box is done here. after every y-loop the data is stored the next 2d structure """ self._data_amp.next_matrix() self._data_pha.next_matrix() + self._data_real.next_matrix() if self.storeRealImag else None + self._data_imag.next_matrix() if self.storeRealImag else None + if self._fit_resonator: + self._fit_amp.next_matrix() + self._fit_pha.next_matrix() + self._fit_real.next_matrix() if self.storeRealImag else None + self._fit_imag.next_matrix() if self.storeRealImag else None if self._scan_dim == 2: + for lf in self.log_funcs: + lf.logIfDesired(ix) + if self.averaging_start_ready: self.vna.start_measurement() if self.vna.ready(): logging.debug("VNA STILL ready... Adding delay") - qkit.flow.sleep(.2) # just to make sure, the ready command does not *still* show ready + sleep(.2) # just to make sure, the ready command does not *still* show ready while not self.vna.ready(): - qkit.flow.sleep(min(self.vna.get_sweeptime_averages(query=False) / 11., .2)) + sleep(min(self.vna.get_sweeptime_averages(query=False) / 11., 0.2)) else: self.vna.avg_clear() - qkit.flow.sleep(self._sweeptime_averages) + sleep(self._sweeptime_averages) + """ measurement """ if not self.landscape.xzlandscape_func: # normal scan data_amp, data_pha = self.vna.get_tracedata() else: data_amp, data_pha = self.landscape.get_tracedata_xz(x) + + if self.progress_bar: + self._p.iterate() + self._data_amp.append(data_amp) self._data_pha.append(data_pha) + self._data_real.append(data_amp*np.cos(data_pha)) if self.storeRealImag else None + self._data_imag.append(data_amp*np.sin(data_pha)) if self.storeRealImag else None if self._fit_resonator: - self._do_fit_resonator() - if self.progress_bar: - self._p.iterate() - qkit.flow.sleep() + self._fit_function.do_fit(self._freqpoints[self._fit_select], data_amp[self._fit_select], data_pha[self._fit_select]) + + self._fit_freq.add(self._fit_function.freq_fit) if (ix == 0) else None + self._fit_amp.append(self._fit_function.amp_fit) + self._fit_pha.append(self._fit_function.pha_fit) + self._fit_real.append(self._fit_function.amp_fit*np.cos(self._fit_function.pha_fit)) if self.storeRealImag else None + self._fit_imag.append(self._fit_function.amp_fit*np.sin(self._fit_function.pha_fit)) if self.storeRealImag else None + + for key, val in self._fit_function.extract_data.items(): + self._fit_extracts[key].append(val) + finally: self._end_measurement() - qkit.flow.end() def _end_measurement(self): ''' @@ -673,57 +757,49 @@ def _end_measurement(self): t = threading.Thread(target=qviewkit.save_plots, args=[self._data_file.get_filepath(), self._plot_comment]) t.start() self._data_file.close_file() + for lf in self.log_funcs: + lf.file.close_file() waf.close_log_file(self._log) self.dirname = None if self.averaging_start_ready: self.vna.post_measurement() - def set_resonator_fit(self, fit_resonator=True, fit_function='', f_min=None, f_max=None): + def set_resonator_fit(self, fit_resonator=False, fit_function = None, f_min=None, f_max=None, **fit_opt_kwargs): ''' sets fit parameter for resonator fit_resonator (bool): True or False, default: True (optional) - fit_function (string): function which will be fitted to the data (optional) + fit_function (string): function which will be fitted to the data + 'lorentzian','skewed_lorentzian','circle_fit_reflection', 'circle_fit_notch','fano' f_min (float): lower frequency boundary for the fitting function, default: None (optional) f_max (float): upper frequency boundary for the fitting function, default: None (optional) - fit types: 
'lorentzian','skewed_lorentzian','circle_fit_reflection', 'circle_fit_notch','fano' + fit_opt_kwargs: kwargs to be passed to the fit function ''' if not fit_resonator: self._fit_resonator = False return - self._functions = {'lorentzian': 0, 'skewed_lorentzian': 1, 'circle_fit_reflection': 2, 'circle_fit_notch': 3, - 'fano': 4, 'all_fits': 5} + if fit_function == "circle_fit_reflection": # this distinction is handled here manually + fit_opt_kwargs.update({"n_ports": 1}) + if fit_function == "circle_fit_notch": + fit_opt_kwargs.update({"n_ports": 2}) try: - self._fit_function = self._functions[fit_function] + self._fit_function: resonatorFit.ResonatorFitBase = resonatorFit.FitNames[fit_function](**fit_opt_kwargs) # init fit-class here except KeyError: - logging.error( - 'Fit function not properly set. Must be either \'lorentzian\', \'skewed_lorentzian\', \'circle_fit_reflection\', \'circle_fit_notch\', \'fano\', or \'all_fits\'.') + logging.error('Fit function not properly set. Must be either \'lorentzian\', \'skewed_lorentzian\', \'circle_fit_reflection\', \'circle_fit_notch\', \'fano\'.') else: self._fit_resonator = True - self._f_min = f_min - self._f_max = f_max + self._f_min = -np.inf if f_min is None else f_min + self._f_max = np.inf if f_max is None else f_max - def _do_fit_resonator(self): + def _do_fit_resonator(self, freq, amp, pha): ''' calls fit function in resonator class - fit function is specified in self.set_fit, with boundaries f_mim and f_max + fit function is specified in self.set_resonator_fit, with boundaries f_min and f_max only the last 'slice' of data is fitted, since we fit live while measuring. ''' - if self._fit_function == 0: # lorentzian - self._resonator.fit_lorentzian(f_min=self._f_min, f_max=self._f_max) - elif self._fit_function == 1: # skewed_lorentzian - self._resonator.fit_skewed_lorentzian(f_min=self._f_min, f_max=self._f_max) - elif self._fit_function == 2: # circle_reflection - self._resonator.fit_circle(reflection=True, f_min=self._f_min, f_max=self._f_max) - elif self._fit_function == 3: # circle_notch - self._resonator.fit_circle(notch=True, f_min=self._f_min, f_max=self._f_max) - elif self._fit_function == 4: # fano - self._resonator.fit_fano(f_min=self._f_min, f_max=self._f_max) - elif self._fit_function == 5: #all fits - logging.warning("Please performe fits individually, fit all is currently not supported.") - # self._resonator.fit_all_fits(f_min=self._f_min, f_max = self._f_max) - else: - logging.error("Fit function set in spectrum.set_resonator_fit is not supported. Must be either \'lorentzian\', \'skewed_lorentzian\', \'circle_fit_reflection\', \'circle_fit_notch\', \'fano\', or \'all_fits\'.") + select = (freq >= self._f_min) & (freq <= self._f_max) + self._fit_function.do_fit(freq[select], amp[select], pha[select]) + # fit result stored in _fit_function object. 
Storing this data handled back in main measure loop def set_tdx(self, tdx): self.tdx = tdx diff --git a/src/qkit/measure/transport/transport.py b/src/qkit/measure/transport/transport.py index 38990c6ed..8ac7c59be 100644 --- a/src/qkit/measure/transport/transport.py +++ b/src/qkit/measure/transport/transport.py @@ -29,6 +29,7 @@ from qkit.gui.plot import plot as qviewkit from qkit.gui.notebook.Progress_Bar import Progress_Bar from qkit.measure.measurement_class import Measurement +from qkit.measure.logging_base import logFunc import qkit.measure.write_additional_files as waf @@ -118,8 +119,9 @@ def __init__(self, IV_Device): # x and y data self._hdf_x = None self._hdf_y = None + self.log_init_params = [] # buffer is necessary to allow adjusting x/y parameter sweeps after setting log functions + self.log_funcs: list[logFunc] = [] # xy, 2D & 3D scan variables - self.set_log_function() self._x_name = '' self._y_name = '' self._scan_dim = None @@ -666,6 +668,28 @@ def set_xy_parameters(self, x_name, x_func, x_vec, x_unit, y_name, y_func, y_uni self._x_dt = x_dt return + def add_logger(self, func, name="log_param", unit="", dtype="f", over_x=True, over_y=True, is_trace=False, trace_base_vals=None, trace_base_name=None, trace_base_unit=None): + """ + Migration from set_log_function: + + -------- + def get_T(): + ... # returns a float + def a(): + ... # returns a float + + # obsolete + # tr.set_log_function([get_T, a], ["temp", "a_name"], ["K", "a_unit"]) + + # do instead + tr.add_logger(get_T, "temp", "K") # default migration + tr.add_logger(a, "a_name", "a_unit", over_y=False) # skip y-iteration if desired + ------ + + Alternatively more options like logging traces or skipping the x-iteration are possible now, see qkit/measure/logging_base for details. + """ + self.log_init_params += [(func, name, unit, dtype, over_x, over_y, is_trace, trace_base_vals, trace_base_name, trace_base_unit)] # handle logger initialization in prepare_file + def set_log_function(self, func=None, name=None, unit=None, dtype='f'): """ Saves desired values obtained by a function in the .h5-file as a value vector with name , unit and in data format . 
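A hedged sketch of how the over_x/over_y flags of the transport add_logger above translate into logIfDesired calls inside the measurement loop; 'tr', 'get_T' and 'get_B' are placeholders.

tr.add_logger(get_T, "temp", "K")                  # over_x=True, over_y=True: logged at every (x, y) point
tr.add_logger(get_B, "field", "T", over_y=False)   # logged once per x value (only when iy == 0)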
@@ -690,39 +714,33 @@ def set_log_function(self, func=None, name=None, unit=None, dtype='f'): None """ # TODO: dtype = float instead of 'f' - # log-function + # log-function & general case selection of one logger, multiple or reset if callable(func): - func = [func] + self.add_logger(func, "log_param" if name is None else name, "" if unit is None else unit, dtype) elif func is None: - func = [None] + self.reset_log_function() + return elif np.iterable(func): for fun in func: if not callable(fun): raise ValueError('{:s}: Cannot set {!s} as y-function: callable object needed'.format(__name__, fun)) else: - raise ValueError('{:s}: Cannot set {!s} as log-function: callable object of iterable object of callable objects needed'.format(__name__, func)) - self.log_function = func + raise ValueError('{:s}: Cannot set {!s} as log-function: callable object or iterable object of callable objects needed'.format(__name__, func)) + # continue with multiple loggers handling + # log-name if name is None: - try: - name = ['log_param']*len(func) - except Exception: - name = [None] - elif type(name) is str: - name = [name]*len(func) + name = ['log_param_{}'.format(i) for i in range(len(func))] elif np.iterable(name): for _name in name: if type(_name) is not str: raise ValueError('{:s}: Cannot set {!s} as log-name: string needed'.format(__name__, _name)) else: - raise ValueError('{:s}: Cannot set {!s} as log-name: string of iterable object of strings needed'.format(__name__, name)) - self.log_name = name + raise ValueError('{:s}: Cannot set {!s} as log-name: string or iterable object of strings needed'.format(__name__, name)) + # log-unit if unit is None: - try: - unit = ['log_unit']*len(func) - except Exception: - unit = [None] + unit = ['']*len(func) elif type(unit) is str: unit = [unit]*len(func) elif np.iterable(unit): @@ -730,15 +748,12 @@ def set_log_function(self, func=None, name=None, unit=None, dtype='f'): if type(_unit) is not str: raise ValueError('{:s}: Cannot set {!s} as log-unit: string needed'.format(__name__, _unit)) else: - raise ValueError('{:s}: Cannot set {!s} as log-unit: string of iterable object of strings needed'.format(__name__, unit)) - self.log_unit = unit + raise ValueError('{:s}: Cannot set {!s} as log-unit: string or iterable object of strings needed'.format(__name__, unit)) + # log-dtype if dtype is None: - try: - dtype = [float]*len(func) - except Exception: - dtype = [None] - elif type(dtype) is type: + dtype = ["f"]*len(func) + elif type(dtype) is str: dtype = [dtype]*len(func) elif np.iterable(dtype): for _dtype in dtype: @@ -746,48 +761,16 @@ def set_log_function(self, func=None, name=None, unit=None, dtype='f'): raise ValueError('{:s}: Cannot set {!s} as log-dtype: string needed'.format(__name__, _dtype)) else: raise ValueError('{:s}: Cannot set {!s} as log-dtype: string of iterable object of strings needed'.format(__name__, dtype)) - self.log_dtype = dtype - return - - def get_log_function(self): - """ - Gets the current log_function settings. - - Parameters - ---------- - None - Returns - ------- - func: array_likes of callable objects - A callable object that returns the value to be saved. - name: array_likes of strings - Names of logging parameter appearing in h5 file. Default is 'log_param'. - unit: array_likes of strings - Units of logging parameter. Default is ''. - dtype: array_likes of dtypes - h5 data type to be used in the data file. Default is float (float64). 
- """ - return [f.__name__ for f in self.log_function], self.log_name, self.log_unit, self.log_dtype - + # add iterables + for i in range(len(func)): + self.add_logger(func[i], name[i], unit[i], dtype[i]) + def reset_log_function(self): """ - Resets all log_function settings. - - Parameters - ---------- - None - - Returns - ------- - None + Clear all set log functions """ - self.log_function = [] - self.log_name = [] - self.log_unit = [] - self.log_dtype = [] - self._hdf_log = [] - return + self.log_init_params = [] def set_fit_IV(self, func, name, unit='', **kwargs): """ @@ -1188,8 +1171,6 @@ def _pass(arg): self._pb.iterate() time.sleep(self._x_dt) elif self._scan_dim in [1, 2, 3]: # IV curve - _rst_log_hdf_appnd = False # variable to save points of log-function in 2D-matrix - self._rst_fit_hdf_appnd = False for self.ix, (x, x_func) in enumerate([(None, _pass)] if self._scan_dim < 2 else [(x, self._x_set_obj) for x in self._x_vec]): # loop: x_obj with parameters from x_vec if 2D or 3D else pass(None) x_func(x) time.sleep(self._x_dt) @@ -1197,19 +1178,9 @@ def _pass(arg): y_func(y) time.sleep(self._tdy) # log function - if self.log_function != [None]: - for j, f in enumerate(self.log_function): - if self._scan_dim == 1: - self._data_log[j] = np.array([float(f())]) # np.asarray(f(), dtype=float) - self._hdf_log[j].append(self._data_log[j]) - elif self._scan_dim == 2: - self._data_log[j][self.ix] = float(f()) - self._hdf_log[j].append(self._data_log[j], reset=True) - elif self._scan_dim == 3: - self._data_log[j][self.ix, self.iy] = float(f()) - self._hdf_log[j].append(self._data_log[j][self.ix], reset=_rst_log_hdf_appnd) - if self._scan_dim == 3: # reset needs to be updated for all log-functions simultaneously and thus outside of the loop - _rst_log_hdf_appnd = not bool(self.iy+1 == len(self._y_vec)) + for lf in self.log_funcs: + time.sleep(0.2) + lf.logIfDesired(self.ix, self.iy) # iterate sweeps and take data self._get_sweepdata() # filling of value-box by storing data in the next 2d structure after every y-loop @@ -1223,6 +1194,8 @@ def _pass(arg): t = threading.Thread(target=qviewkit.save_plots, args=[self._data_file.get_filepath(), self._plot_comment]) t.start() self._data_file.close_file() + for lf in self.log_funcs: + lf.file.close_file() waf.close_log_file(self._log_file) self._set_IVD_status(False) print('Measurement complete: {:s}'.format(self._data_file.get_filepath())) @@ -1305,6 +1278,7 @@ def _prepare_measurement_file(self): self._hdf_dIdV = [] self._hdf_fit = [] self._data_fit = [] + if self._scan_dim == 0: ''' xy ''' # add data variables @@ -1324,6 +1298,7 @@ def _prepare_measurement_file(self): self._data_file.add_view('{:s}_vs_{:s}'.format(*np.array(self._y_name)[np.array(view[::-1])]), x=self._hdf_y[view[0]], y=self._hdf_y[view[1]]) + elif self._scan_dim == 1: ''' 1D scan ''' # add data variables @@ -1343,7 +1318,7 @@ def _prepare_measurement_file(self): if self._dVdI: self._hdf_dVdI.append(self._data_file.add_value_vector('dVdI_{!s}'.format(i), x=self._hdf_bias[i], - unit='V/A', + unit='Ohm', save_timestamp=False, folder='analysis', comment=self._get_numder_comment(self._hdf_V[i].name)+ @@ -1364,8 +1339,6 @@ def _prepare_measurement_file(self): folder='analysis', comment=self._get_fit_comment(i))) self._data_fit.append(np.nan) - # log-function - self._add_log_value_vector() # add views self._add_views() elif self._scan_dim == 2: @@ -1414,8 +1387,6 @@ def _prepare_measurement_file(self): folder='analysis', comment=self._get_fit_comment(i))) 
self._data_fit.append(np.ones(len(self._x_vec)) * np.nan) - # log-function - self._add_log_value_vector() # add views self._add_views() elif self._scan_dim == 3: @@ -1473,10 +1444,19 @@ def _prepare_measurement_file(self): folder='analysis', comment=self._get_fit_comment(i))) self._data_fit.append(np.ones((len(self._x_vec), len(self._y_vec))) * np.nan) - # log-function - self._add_log_value_vector() # add views self._add_views() + + # logging + self.log_funcs = [] + for init_tuple in self.log_init_params: + func, name, unit, dtype, over_x, over_y, is_trace, trace_base_vals, trace_base_name, trace_base_unit = init_tuple + self.log_funcs += [logFunc(self._data_file.get_filepath(), func, name, unit, dtype, + self._hdf_x.ds_url if (self._scan_dim >= 2) and over_x else None, + self._hdf_y.ds_url if (self._scan_dim == 3) and over_y else None, + (trace_base_vals, trace_base_name, trace_base_unit) if is_trace else None)] + self.log_funcs[-1].prepare_file() + ''' add comment ''' if self._comment: self._data_file.add_comment(self._comment) @@ -1577,41 +1557,6 @@ def _get_fit_comment(self, i): 'dVdI={:s}'.format(self._hdf_dVdI[i].name)], ['{:s}={!s}'.format(key, val) for key, val in self._fit_kwargs.items()] if self._fit_kwargs else []]))+\ ')' - - def _add_log_value_vector(self): - """ - Adds all value vectors for log-function parameter. - - Parameters - ---------- - None - - Returns - ------- - None - """ - if self.log_function != [None]: - self._hdf_log = [] - self._data_log = [] - for i, _ in enumerate(self.log_function): - if self._scan_dim == 1: - self._hdf_log.append(self._data_file.add_coordinate(self.log_name[i], - unit=self.log_unit[i])) - self._data_log.append(np.nan) - elif self._scan_dim == 2: - self._hdf_log.append(self._data_file.add_value_vector(self.log_name[i], - x=self._hdf_x, - unit=self.log_unit[i], - dtype=self.log_dtype[i])) - self._data_log.append(np.ones(len(self._x_vec))*np.nan) - elif self._scan_dim == 3: - self._hdf_log.append(self._data_file.add_value_matrix(self.log_name[i], - x=self._hdf_x, - y=self._hdf_y, - unit=self.log_unit[i], - dtype=self.log_dtype[i])) - self._data_log.append(np.ones((len(self._x_vec), len(self._y_vec)))*np.nan) - return def _add_views(self): """ diff --git a/src/qkit/measure/transport_measurement.py b/src/qkit/measure/transport/transport_measurement.py similarity index 67% rename from src/qkit/measure/transport_measurement.py rename to src/qkit/measure/transport/transport_measurement.py index ead7d56c0..1caefca15 100644 --- a/src/qkit/measure/transport_measurement.py +++ b/src/qkit/measure/transport/transport_measurement.py @@ -19,36 +19,44 @@ class _MeasureMode: measure_unit: str class MeasureModes(Enum): + """ + TODO DocString BiasSense (?) 
+ """ IV = _MeasureMode('i', 'A', 'v', 'V') VI = _MeasureMode('v', 'V', 'i', 'A') class TransportMeasurement(MeasurementTypeAdapter): _measurement_descriptors: list[tuple[MeasurementTypeAdapter.DataDescriptor, MeasurementTypeAdapter.DataDescriptor]] + _sweep_parameters: list[tuple[float, float, float]] _iv_device: AbstractIVDevice _mode: MeasureModes _sleep: float + _extend_range: bool - def __init__(self, iv_device: AbstractIVDevice, mode: MeasureModes = MeasureModes.IV, sleep: float = 0): + def __init__(self, iv_device: AbstractIVDevice, mode: MeasureModes = MeasureModes.IV, sleep: float = 0, extend_range=False): super().__init__() self._iv_device = iv_device self._mode = mode self._measurement_descriptors = [] + self._sweep_parameters = [] self._sleep = sleep + self._extend_range = extend_range if mode == MeasureModes.IV: - assert iv_device.get_sweep_mode() == 1 + assert iv_device.get_sweep_bias() == 0 elif mode == MeasureModes.VI: - assert iv_device.get_sweep_mode() == 2 + assert iv_device.get_sweep_bias() == 1 def add_sweep(self, start: float, stop: float, step: float): axis = Axis( - name=f'{self._mode.value.bias_symbol}_{len(self._measurement_descriptors)}', + name=f'{self._mode.value.bias_symbol}_b_{len(self._measurement_descriptors)}', unit=self._mode.value.bias_unit, - range=np.arange(start, stop, step) + range=np.arange(start, stop if not self._extend_range else (stop + step/2), step) ) - self._measurement_descriptors += ( + self._sweep_parameters += [(start, stop, step)] # TODO: Refactor by lazy DataDescriptor creation based on sweep parameters + self._measurement_descriptors += [( MeasurementTypeAdapter.DataDescriptor( - name=f'{self._mode.value.bias_symbol}_b_{len(self._measurement_descriptors)}', + name=f'{self._mode.value.bias_symbol}_{len(self._measurement_descriptors)}', unit=self._mode.value.bias_unit, axes=(axis,) ), @@ -57,7 +65,7 @@ def add_sweep(self, start: float, stop: float, step: float): unit=self._mode.value.measure_unit, axes=(axis,) ) - ) + )] def add_4_quadrant_sweep(self, start: float, stop: float, step: float, offset: float = 0): """ @@ -100,8 +108,15 @@ def add_half_swing_sweep(self, amplitude: float, step: float, offset: float = 0) offset: float Offset value by which and are shifted. Default is 0. """ - self.add_sweep(amplitude + offset, -amplitude + offset, step) - self.add_sweep(-amplitude + offset, amplitude + offset, -step) + self.add_sweep(amplitude + offset, -amplitude + offset, -step) + self.add_sweep(-amplitude + offset, amplitude + offset, step) + + def reset_sweeps(self): + """ + Clear currently defined sweeps + """ + self._sweep_parameters = [] + self._measurement_descriptors = [] @override @property @@ -113,10 +128,28 @@ def expected_structure(self) -> tuple['MeasurementTypeAdapter.DataDescriptor', . 
def default_views(self) -> dict[str, DataView]: return { 'IV': DataView( + view_params={ + "labels": ('V', 'I'), + 'plot_style': 1, + 'markersize': 5 + }, + view_sets=list(itertools.chain( + DataViewSet( + x_path = DataReference(b.name if self._mode == MeasureModes.VI else m.name), + y_path = DataReference(m.name if self._mode == MeasureModes.VI else b.name), + ) for (b, m) in self._measurement_descriptors + )) + ), + 'VI': DataView( + view_params={ + "labels": ('I', 'V'), + 'plot_style': 1, + 'markersize': 5 + }, view_sets=list(itertools.chain( DataViewSet( - x_path= DataReference(b.name), - y_path= DataReference(m.name), + x_path = DataReference(b.name if self._mode == MeasureModes.IV else m.name), + y_path = DataReference(m.name if self._mode == MeasureModes.IV else b.name), ) for (b, m) in self._measurement_descriptors )) ), @@ -125,9 +158,8 @@ def default_views(self) -> dict[str, DataView]: @override def perform_measurement(self) -> tuple['MeasurementTypeAdapter.GeneratedData', ...]: results = [] - for (bias, measurement) in self._measurement_descriptors: - intended_bias_values = bias.axes[0].range - bias_data, measurement_data = self._iv_device.take_IV(intended_bias_values) + for ((bias, measurement), sweep_params) in zip(self._measurement_descriptors, self._sweep_parameters): + bias_data, measurement_data = self._iv_device.take_IV((*sweep_params, self._sleep)) results.append(( bias.with_data(bias_data), measurement.with_data(measurement_data) diff --git a/src/qkit/measure/transport/twosided.py b/src/qkit/measure/transport/twosided.py new file mode 100644 index 000000000..3d2a9b008 --- /dev/null +++ b/src/qkit/measure/transport/twosided.py @@ -0,0 +1,395 @@ +import numpy as np +import qkit.measure.transport +import qkit.measure.transport.twosided +from scipy import signal +import logging +import time +import typing + +import qkit +import qkit.storage +import qkit.storage.hdf_dataset +import qkit.storage.store +from qkit.gui.notebook.Progress_Bar import Progress_Bar +from qkit.gui.plot import plot as qviewkit +import qkit.measure +import qkit.measure.samples_class +from qkit.measure.measurement_class import Measurement +import qkit.measure.write_additional_files as waf +from qkit.measure.logging_base import logFunc +from qkit.drivers.Double_VTE import Double_VTE + + +class ArbTwosideSweep(object): + def __init__(self, a_vals: np.ndarray, b_vals: np.ndarray): + if len(a_vals.shape) != 1 or len(b_vals.shape) != 1 or a_vals.shape != b_vals.shape: + logging.error("Sweep arrays must be 1D and of same length") + return + self.a_vals = a_vals + self.b_vals = b_vals + + def get(self): + return self.a_vals, self.b_vals + + def is_b_const(self) -> bool: + return np.all(self.b_vals == self.b_vals[0]) + + def set_b_const(self, b_val: float): + self.b_vals = np.linspace(b_val, b_val, self.a_vals.shape[0]) + +class LinearTwoside(ArbTwosideSweep): + def __init__(self, start_a, stop_a, start_b, stop_b, nop): + super().__init__(np.linspace(start_a, stop_a, nop), np.linspace(start_b, stop_b, nop)) + + +class TransportTwoside(object): + """ + Transport measurement routine for the special cases of applying two different voltages on a sample and measuring the resulting current with trans-impedence amplifiers + """ + def __init__(self, DIVD): + """ + DIVD: Double IV-Device. Should provide + - setter/getter matrixes for converting desired effective values (e.g. 
voltage/current differences/averages) to respective device-side values + - get_sweepdata: effective v_a, v_b arrays in, measured v1, v2, i1, i2 out + + Possible sweep-modes: + - sweep v_a, v_b constant, arb. x/y-coordinates + - sweep v_a, v_b as x/y-coordinate, other x/y arb. + - simultaneous v_a & v_b sweep, arb. x/y-coordinates + """ + self._DIVD: Double_VTE = DIVD + + self._measurement_object = Measurement() + self._measurement_object.measurement_type = 'transport' + + self.filename = None + self.expname = None + self.comment = None + + self.log_init_params = [] # buffer is necessary to allow adjusting x/y parameter sweeps after setting log functions + self.log_funcs: list[logFunc] = [] + + self._x_coordname = None + self._x_set_obj = lambda x: None + self._x_vec = [None] + self._x_unit = None + self._x_dt = None + + self._y_coordname = None + self._y_set_obj = lambda x: None + self._y_vec = [None] + self._y_unit = None + self._y_dt = None + + self._eff_vb_as_coord = None # None or "x" or "y" + + self._sweeps: list[ArbTwosideSweep] = [] + self.eff_va_name = "eff_va" + self.eff_vb_name = "eff_vb" + self.eff_ia_name = "eff_ia" + self.eff_ib_name = "eff_ib" + self.sweep_dt = None + + self.store_effs = True + + self._derivs: list[tuple[str, str]] = [] # specifiers such as ("Ia", "Vb") for calculating dIa/dVb + self.deriv_func: typing.Callable[[np.ndarray, np.ndarray], np.ndarray] = self.savgol_deriv + + self._views: list[tuple[str, str]] = [] # specifiers such as ("Ia", "Vb") + + self._msdim = None # set by resp. measure_ND() call + + ### Measurement preparation ### + def set_sample(self, sample: qkit.measure.samples_class): + self._measurement_object.sample = sample + + def add_logger(self, func, name="log_param", unit="", dtype="f", over_x=True, over_y=True, is_trace=False, trace_base_vals=None, trace_base_name=None, trace_base_unit=None): + """ + Migration from set_log_function: + + -------- + def get_T(): + ... # returns a float + def a(): + ... # returns a float + + # obsolete + # tr.set_log_function([get_T, a], ["temp", "a_name"], ["K", "a_unit"]) + + # do instead + tr.add_logger(get_T, "temp", "K") # default migration + tr.add_logger(a, "a_name", "a_unit", over_y=False) # skip y-iteration if desired + ------ + + Alternatively more options like logging traces or skipping the x-iteration are possible now, see qkit/measure/logging_base for details. 
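+
+        Logging a trace is also possible (illustrative sketch; names and values are
+        placeholders, the trace axis is described by trace_base_vals/_name/_unit):
+
+        def get_spectrum():
+            ... # returns a 1D array sampled over the trace axis
+
+        tr.add_logger(get_spectrum, "spectrum", "dBm", is_trace=True,
+                      trace_base_vals=np.linspace(4e9, 8e9, 1001),
+                      trace_base_name="frequency", trace_base_unit="Hz")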
+        """
+        self.log_init_params += [(func, name, unit, dtype, over_x, over_y, is_trace, trace_base_vals, trace_base_name, trace_base_unit)] # handle logger initialization in prepare_file
+
+    def reset_log_function(self):
+        """
+        Clear all set log functions
+        """
+        self.log_init_params = []
+
+    def clear_sweeps(self):
+        self._sweeps = []
+
+    def add_sweep(self, s: ArbTwosideSweep):
+        self._sweeps += [s]
+
+    def set_x_parameter(self, name: str = None, set_obj: typing.Callable[[float], None] = lambda x: None, vals: list[float] = [None], unit: str = "A.U.", dt: float = None):
+        """
+        Set the x-coordinate sweep parameter. An empty call removes the x parameter.
+        """
+        if self._eff_vb_as_coord == "x":
+            self._eff_vb_as_coord = None
+        self._x_coordname = name
+        self._x_set_obj = set_obj
+        self._x_vec = vals
+        self._x_unit = unit
+        self._x_dt = dt
+
+    def set_x_vb(self, b_vals: list[float], name: str = "eff_v_b", unit: str = "V", dt: float = None):
+        for sweep in self._sweeps:
+            if not sweep.is_b_const():
+                logging.warning("Non-constant v_b in sweeps will be overwritten, please check")
+        if self._eff_vb_as_coord == "y":
+            logging.warning("Effective Vb was set as y-coordinate, overriding")
+            self._y_coordname = None
+            self._y_set_obj = lambda x: None
+            self._y_vec = [None]
+            self._y_unit = None
+            self._y_dt = None
+        self._eff_vb_as_coord = "x"
+        self._x_coordname = name
+        self._x_set_obj = self._set_b_for_axis
+        self._x_vec = b_vals
+        self._x_unit = unit
+        self._x_dt = dt
+
+    def set_y_parameter(self, name: str, set_obj: typing.Callable[[float], None], vals: list[float], unit: str = "A.U.", dt: float = 1e-3):
+        """
+        Set the y-coordinate sweep parameter.
+        """
+        if self._eff_vb_as_coord == "y":
+            self._eff_vb_as_coord = None
+        self._y_coordname = name
+        self._y_set_obj = set_obj
+        self._y_vec = vals
+        self._y_unit = unit
+        self._y_dt = dt
+
+    def set_y_vb(self, b_vals: list[float], name: str = "eff_v_b", unit: str = "V", dt: float = None):
+        for sweep in self._sweeps:
+            if not sweep.is_b_const():
+                logging.warning("Non-constant v_b in sweeps will be overwritten, please check")
+        if self._eff_vb_as_coord == "x":
+            logging.warning("Effective Vb was set as x-coordinate, overriding")
+            self._x_coordname = None
+            self._x_set_obj = lambda x: None
+            self._x_vec = [None]
+            self._x_unit = None
+            self._x_dt = None
+        self._eff_vb_as_coord = "y"
+        self._y_coordname = name
+        self._y_set_obj = self._set_b_for_axis
+        self._y_vec = b_vals
+        self._y_unit = unit
+        self._y_dt = dt
+
+    def add_deriv(self, y: str, x: str):
+        """
+        x/y: should be of format "(i/v)(1/2)", e.g. "i2"; or "(i/v)(1/2/a/b)" if store_effs is enabled.
+        x and y must be different.
+        """
+        if len(x) != 2 or len(y) != 2:
+            logging.error("x and y identifiers must have length 2")
+            return
+        x = x.lower()
+        y = y.lower()
+        if not (x[0] in "iv" and y[0] in "iv" and x[1] in ("12ab" if self.store_effs else "12") and y[1] in ("12ab" if self.store_effs else "12")):
+            logging.error("x and y should be of format '(i/v)(1/2)', e.g. 'i2'; or '(i/v)(1/2/a/b)' if store_effs is enabled")
+            return
+        if x == y:
+            logging.error("x and y must be different")
+            return
+        self._derivs += [(y, x)]
+
+    def clear_derivs(self):
+        self._derivs = []
+
+    def add_view(self, y: str, x: str):
+        """
+        x/y: should be of format "(i/v)(1/2)", e.g. "i2"; or "(i/v)(1/2/a/b)" if store_effs is enabled.
+ muste be different + """ + if len(x) != 2 or len(y) != 2: + logging.error("x and y identifiers must have length 2") + return + x = x.lower() + y = y.lower() + if not (x[0] in "iv" and y[0] in "iv" and x[1] in ("12ab" if self.store_effs else "12") and y[1] in ("12ab" if self.store_effs else "12")): + logging.error("x and y should be format '(i/v)(1/2)', e.g. 'i2'; or '(i/v)/(1/2/a/b)' if store store_effs enabled") + return + if x == y: + logging.error("x and y must be different") + return + self._views += [(y, x)] + + def clear_views(self): + self._views = [] + + ### Main measurement routine ### + def prepare_measurement_file(self): + self.the_file = qkit.storage.store.Data(name='_'.join(list(filter(None, ('{:d}D_IV_curve'.format(self._msdim), self.filename, self.expname)))), mode='a') + # settings + self.the_file.add_textlist('settings').append(waf.get_instrument_settings(self.the_file.get_filepath())) + self._measurement_object.uuid = self.the_file._uuid + self._measurement_object.hdf_relpath = self.the_file._relpath + self._measurement_object.instruments = qkit.instruments.get_instrument_names() + #self._measurement_object.save() # TODO fixme + #self.the_file.add_textlist('measurement').append(self._measurement_object.get_JSON()) + # matrices + mat_dummy_x = self.the_file.add_coordinate("_matrix_x", folder="analysis") + mat_dummy_x.add([1, 2]) + mat_dummy_y = self.the_file.add_coordinate("_matrix_y", folder="analysis") + mat_dummy_y.add([1, 2]) + setter_matrix = self.the_file.add_value_matrix("setter_matrix", mat_dummy_x, mat_dummy_y, folder="analysis") + for sm_elm in self._DIVD._setter_matrix: + setter_matrix.append(sm_elm) + getter_matrix = self.the_file.add_value_matrix("getter_matrix", mat_dummy_x, mat_dummy_y, folder="analysis") + for gm_elm in self._DIVD._getter_matrix: + getter_matrix.append(gm_elm) + # coords + target_eff_va = [self.the_file.add_coordinate("target_" + self.eff_va_name + "_{}".format(i), "V") for i in range(len(self._sweeps))] + target_eff_vb = [self.the_file.add_coordinate("target_" + self.eff_vb_name + "_{}".format(i), "V") for i in range(len(self._sweeps))] + for i in range(len(self._sweeps)): + a, b = self._sweeps[i].get() + target_eff_va[i].add(a) + target_eff_vb[i].add(b) + if self._msdim >= 2: + x_coord = self.the_file.add_coordinate(self._x_coordname, self._x_unit) + x_coord.add(self._x_vec) + if self._msdim == 3: + y_coord = self.the_file.add_coordinate(self._y_coordname, self._y_unit) + y_coord.add(self._y_vec) + # data + def value_dimobj(name, base_coord, unit, folder="data"): + if self._msdim == 1: + return self.the_file.add_value_vector(name, base_coord, unit, folder=folder) + elif self._msdim == 2: + return self.the_file.add_value_matrix(name, x_coord, base_coord, unit, folder=folder) + elif self._msdim == 3: + return self.the_file.add_value_box(name, x_coord, y_coord, base_coord, unit, folder=folder) + self._v1 = [value_dimobj("v1_{}".format(i), target_eff_va[i], "V") for i in range(len(self._sweeps))] + self._v2 = [value_dimobj("v2_{}".format(i), target_eff_va[i], "V") for i in range(len(self._sweeps))] + self._i1 = [value_dimobj("i1_{}".format(i), target_eff_va[i], "A") for i in range(len(self._sweeps))] + self._i2 = [value_dimobj("i2_{}".format(i), target_eff_va[i], "A") for i in range(len(self._sweeps))] + self._va = [value_dimobj(self.eff_va_name + "_{}".format(i), target_eff_va[i], "V") for i in range(len(self._sweeps))] if self.store_effs else [] + self._vb = [value_dimobj(self.eff_vb_name + "_{}".format(i), target_eff_va[i], "V") for i in 
range(len(self._sweeps))] if self.store_effs else [] + self._ia = [value_dimobj(self.eff_ia_name + "_{}".format(i), target_eff_va[i], "A") for i in range(len(self._sweeps))] if self.store_effs else [] + self._ib = [value_dimobj(self.eff_ib_name + "_{}".format(i), target_eff_va[i], "A") for i in range(len(self._sweeps))] if self.store_effs else [] + # derivs + self._deriv_store: list[list[qkit.storage.hdf_dataset.hdf_dataset]] = [] + for dy, dx in self._derivs: + self._deriv_store += [ [value_dimobj("d{}_d{}_{}".format(dy, dx, i), target_eff_va[i], "V", "analysis") for i in range(len(self._sweeps))] ] + view_buf = self.the_file.add_view("d" + dy + "_d" + dx, eval("self._" + dx)[0], self._deriv_store[-1][0], view_params={"labels": (dx[0].upper(), dy[0].upper() + "/" + dx[0].upper()), 'plot_style': 1, 'markersize': 5}) + for i in range(1, len(self._sweeps)): + view_buf.add(eval("self._" + dx)[i], self._deriv_store[-1][i]) + # extra views + for y, x in self._views: + view_buf = self.the_file.add_view(y + "_" + x, eval("self._" + x)[0], eval("self._" + y)[0], view_params={"labels": (x[0].upper(), y[0].upper()), 'plot_style': 1, 'markersize': 5}) + for i in range(1, len(self._sweeps)): + view_buf.add(eval("self._" + x)[i], eval("self._" + y)[i]) + # logging + self.log_funcs = [] + for init_tuple in self.log_init_params: + func, name, unit, dtype, over_x, over_y, is_trace, trace_base_vals, trace_base_name, trace_base_unit = init_tuple + self.log_funcs += [logFunc(self.the_file.get_filepath(), func, name, unit, dtype, + x_coord.ds_url if (self._msdim >= 2) and over_x else None, + y_coord.ds_url if (self._msdim == 3) and over_y else None, + (trace_base_vals, trace_base_name, trace_base_unit) if is_trace else None)] + self.log_funcs[-1].prepare_file() + + def measure_1D(self): + if len(self._sweeps) == 0: + logging.error("No sweeps set, cannot measure") + return + self._msdim = 1 + self._measure() + + def measure_2D(self): + if len(self._sweeps) == 0: + logging.error("No sweeps set, cannot measure") + return + if self._x_coordname is None: + logging.error("x-coordinate not set, cannot measure 2D") + return + self._msdim = 2 + self._measure() + + def measure_3D(self): + if len(self._sweeps) == 0: + logging.error("No sweeps set, cannot measure") + if self._x_coordname is None: + logging.error("x-coordinate not set, cannot measure 3D") + return + if self._y_coordname is None: + logging.error("y-coordinate not set, cannot measure 3D") + return + self._msdim = 3 + self._measure() + + def _measure(self): + self.prepare_measurement_file() + pb = Progress_Bar((1 if self._msdim < 3 else len(self._y_vec))*(1 if self._msdim < 2 else len(self._x_vec))*len(self._sweeps), self.the_file._uuid) + qviewkit.plot(self.the_file.get_filepath(), datasets=["views/" + y + "_" + x for y, x in self._views]) + try: + for ix, (x, x_func) in enumerate([(None, lambda x: None)] if self._msdim < 2 else [(x, self._x_set_obj) for x in self._x_vec]): + x_func(x) + time.sleep(self._x_dt) if (self._msdim >= 2 and not (self._x_dt is None)) else None + for iy, (y, y_func) in enumerate([(None, lambda y: None)] if self._msdim < 3 else [(y, self._y_set_obj) for y in self._y_vec]): + y_func(y) + time.sleep(self._y_dt) if (self._msdim == 3 and not (self._y_dt is None)) else None + for logger in self.log_funcs: + logger.logIfDesired(ix, iy) + for i in range(len(self._sweeps)): + v1, v2, i1, i2 = self._DIVD.get_sweepdata(*self._sweeps[i].get()) + time.sleep(self.sweep_dt) if not self.sweep_dt is None else None + self._v1[i].append(v1) + 
self._v2[i].append(v2) + self._i1[i].append(i1) + self._i2[i].append(i2) + if self.store_effs: + vavb = self._DIVD._setter_matrix @ np.concatenate([v1[None,:], v2[None,:]], axis=0) + iaib = self._DIVD._getter_matrix @ np.concatenate([i1[None,:], i2[None,:]], axis=0) + va = vavb[0] + vb = vavb[1] + ia = iaib[0] + ib = iaib[1] + self._va[i].append(va) + self._vb[i].append(vb) + self._ia[i].append(ia) + self._ib[i].append(ib) + for j, (dy, dx) in enumerate(self._derivs): + self._deriv_store[j][i].append(self.deriv_func(eval(dy), eval(dx))) + pb.iterate() + + if self._msdim == 3: + for df in self._v1 + self._v2 + self._i1 + self._i2 + self._va + self._vb + self._ia + self._ib + sum(self._deriv_store, []): + df.next_matrix() + finally: + self.the_file.close_file() + for lf in self.log_funcs: + lf.file.close_file() + print('Measurement complete: {:s}'.format(self.the_file.get_filepath())) + + + ### Helper ### + @staticmethod + def savgol_deriv(x_vals: np.ndarray, y_vals: np.ndarray, **savgol_args): + """ + Numerical derivative via savgol filters based qkit transport script + """ + savgol_args = {'window_length': 10, 'polyorder': 3, 'deriv': 1} | savgol_args + return signal.savgol_filter(y_vals, **savgol_args)/signal.savgol_filter(x_vals, **savgol_args) + + def _set_b_for_axis(self, val: float): + for sweep in self._sweeps: + sweep.set_b_const(val) diff --git a/src/qkit/measure/unified_measurements.py b/src/qkit/measure/unified_measurements.py index 7fc279680..7ef8d4cb9 100644 --- a/src/qkit/measure/unified_measurements.py +++ b/src/qkit/measure/unified_measurements.py @@ -170,7 +170,7 @@ def _largest_measurement_dimension(self): """ if len(self._measurements) == 0: return 0 - return max(map(lambda m: len(m.expected_structure), self._measurements)) + return max(map(lambda m: m.dimension, self._measurements)) def create_datasets(self, data_file: hdf.Data, swept_axes: list[hdf_dataset]): """ @@ -248,6 +248,7 @@ def _run_sweep(self, data_file: hdf.Data, index_list: tuple[int, ...]): sweep, size = self._generate_enumeration(data_file) try: for index, value in tqdm(sweep, desc=self._axis.name, bar_format=bar_format(), total=size, leave=False): + measurement_log.debug(f"Sweeping {self._axis.name} index: {index} value: {value}") try: self._setter(value) self._current_value = value @@ -366,13 +367,6 @@ class DataGenerator(ABC): Handles storing data into datasets, and provides the structures for describing the data. """ - @property - def dataset_category(self) -> Literal['data', 'analysis']: - """ - The category into which the returned data should be sorted. - """ - return 'data' - def store(self, data_file: hdf.Data, data: tuple['MeasurementTypeAdapter.GeneratedData', ...], sweep_indices: tuple[int, ...]): """ Store the generated [data] in the [data_file], while selecting based on the current [sweep_indices]. @@ -395,6 +389,7 @@ class DataDescriptor: name: str axes: tuple[Axis, ...] unit: str = 'a.u.' + category: Literal['data', 'analysis'] = "data" def __post_init__(self): assert isinstance(self.name, str), "Name must be a string!" @@ -402,6 +397,7 @@ def __post_init__(self): for axis in self.axes: assert isinstance(axis, Axis), "Axes must be a tuple of Axis objects!" assert isinstance(self.unit, str), "Unit must be a string!" 
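+            # the new 'category' field selects the hdf folder ('data' or 'analysis') used by create_dataset and ds_url below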
+ assert self.category == "data" or self.category == "analysis", "Category must bei either 'data' or 'analysis'" def with_data(self, data: Union[np.ndarray, float]) -> 'MeasurementTypeAdapter.GeneratedData': """ @@ -421,19 +417,26 @@ def create_dataset(self, file: hdf.Data, axes: list[hdf_dataset]) -> hdf_dataset # The API has different methods, depending on dimensionality, which it then unifies again to a generic case. # For political reasons, we have to live with this. if len(all_axes) == 0: - return file.add_coordinate(name=self.name, unit=self.unit) + return file.add_coordinate(name=self.name, unit=self.unit, folder=self.category) elif len(all_axes) == 1: - return file.add_value_vector(name=self.name,x = all_axes[0], unit=self.unit) + return file.add_value_vector(name=self.name,x = all_axes[0], unit=self.unit, folder=self.category) elif len(all_axes) == 2: - return file.add_value_matrix(name=self.name, x = all_axes[0], y = all_axes[1], unit=self.unit) + return file.add_value_matrix(name=self.name, x = all_axes[0], y = all_axes[1], unit=self.unit, folder=self.category) elif len(all_axes) == 3: - return file.add_value_box(name=self.name, x = all_axes[0], y = all_axes[1], z = all_axes[2], unit=self.unit) + return file.add_value_box(name=self.name, x = all_axes[0], y = all_axes[1], z = all_axes[2], unit=self.unit, folder=self.category) else: raise NotImplementedError("Qkit Store does not support more than 3 dimensions!") @property def ds_url(self): - return f"/entry/data0/{self.name}" + return f"/entry/{self.category}0/{self.name}" + + @property + def dimension(self): + """ + The dimensionality of the expected Data. + """ + return len(self.axes) @dataclass(frozen=True) class GeneratedData: @@ -450,7 +453,7 @@ def __post_init__(self): assert isinstance(self.data, np.ndarray), "MeasurementData must be an ndarray!" assert len(self.descriptor.axes) == len(self.data.shape), f"Data shape (d={len(self.data.shape)}) incongruent with descriptor (d={len(self.descriptor.axes)})" for (i, axis) in enumerate(self.descriptor.axes): - assert self.data.shape[i] == len(axis.range), f"Axis ({axis.name}) length and data length mismatch" + assert self.data.shape[i] == len(axis.range), f"Axis {i} ({axis.name}) length ({len(axis.range)}) and data length ({self.data.shape[i]}) mismatch" def write_data(self, file: hdf.Data, sweep_indices: tuple[int, ...]): """ @@ -530,11 +533,6 @@ def create_datasets(self, data_file: hdf.Data, parent_schema: tuple['Measurement measurement_log.debug(f"Creating View {name} for Analysis.") view.write(data_file, name) - @override - @property - def dataset_category(self) -> Literal['data', 'analysis']: - return 'analysis' - @abstractmethod def expected_structure(self, parent_schema: tuple['MeasurementTypeAdapter.DataDescriptor', ...]) -> tuple[ 'MeasurementTypeAdapter.DataDescriptor', ...]: @@ -587,9 +585,9 @@ def create_datasets(self, data_file: hdf.Data, swept_axes: list[hdf_dataset]): views = self.default_views assert isinstance(views, dict), "Default views must be a dict of str to DataViews!" - for name, view in views: + for name, view in views.items(): assert isinstance(name, str), "Name of view must be a string!" - assert isinstance(view, DataView), "Each view must be of type DataView!" + assert isinstance(view, DataView), f"Each view must be of type DataView! 
But {name} is {type(view)}" measurement_log.debug(f"Creating view {name}.") view.write(data_file, name) @@ -618,6 +616,13 @@ def with_analysis(self, analysis: AnalysisTypeAdapter) -> 'MeasurementTypeAdapte assert isinstance(analysis, AnalysisTypeAdapter), "Analysis must be of type AnalysisTypeAdapter!" self._analyses.append(analysis) return self + + @property + def dimension(self): + """ + Determine the dimensionality of the measurement as the maximum dimensionality of any expected structure. + """ + return max(map(lambda es: es.dimension, self.expected_structure)) @property @abstractmethod @@ -791,6 +796,9 @@ def run(self, open_qviewkit: bool = True, open_datasets: Optional[list["DataRefe measurement_log.info("Starting measurement") self.run_measurements(data_file, ()) self._run_child_sweep(data_file, ()) + except Exception as e: + import traceback + traceback.print_exc() finally: # Calling into existing plotting code in the background. measurement_log.info("Creating plots...") diff --git a/tests/resonator_fits/SVSEWX_VNA_tracedata.h5 b/tests/resonator_fits/SVSEWX_VNA_tracedata.h5 new file mode 100644 index 000000000..8d705b587 Binary files /dev/null and b/tests/resonator_fits/SVSEWX_VNA_tracedata.h5 differ diff --git a/tests/resonator_fits/resonator_fits.py b/tests/resonator_fits/resonator_fits.py new file mode 100644 index 000000000..cd42e24d2 --- /dev/null +++ b/tests/resonator_fits/resonator_fits.py @@ -0,0 +1,41 @@ +import pytest + +def test_circlefit(): + from qkit.analysis.resonator_fitting import CircleFit + from qkit.storage.store import Data # for reading file + import numpy as np + + datafile = Data("SVSEWX_VNA_tracedata.h5") + freq = np.array(datafile.data.frequency) + amp = np.array(datafile.data.amplitude) + pha = np.array(datafile.data.phase) + + my_circle_fit = CircleFit(n_ports=2) # notch port + my_circle_fit.do_fit(freq, amp, pha) + + print(my_circle_fit.extract_data) + + # check reasonable qc and f_res in 2 sigma interval + assert my_circle_fit.extract_data["Qc"] > 0 + assert (my_circle_fit.extract_data["f_res"] > 5.57019e9*(1 - 2/334)) & (my_circle_fit.extract_data["f_res"] < 5.57019e9*(1 + 2/334)) + + if __name__ == "__main__": + import matplotlib.pyplot as plt + + plt.plot(freq, amp, "ko") + plt.plot(my_circle_fit.freq_fit, my_circle_fit.amp_fit, "r-") + plt.title("Amplitude") + plt.show() + + plt.plot(freq, pha, "ko") + plt.plot(my_circle_fit.freq_fit, my_circle_fit.pha_fit, "r-") + plt.title("Phase") + plt.show() + + plt.plot(amp*np.cos(pha), amp*np.sin(pha), "ko") + plt.plot(my_circle_fit.amp_fit*np.cos(my_circle_fit.pha_fit), my_circle_fit.amp_fit*np.sin(my_circle_fit.pha_fit), "r-") + plt.title("IQ Circle") + plt.show() + +if __name__ == "__main__": + test_circlefit() \ No newline at end of file
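Editor's note (illustrative, not part of the patch): a minimal usage sketch of the new TransportTwoside routine added in twosided.py above, composed only from the API shown in this diff. The Double_VTE instance, the coil setter, and all numeric values are placeholders, not defaults from the source:

    import numpy as np
    from qkit.measure.transport.twosided import TransportTwoside, LinearTwoside

    # 'divd' stands for an already initialized Double_VTE driver instance (placeholder)
    tr = TransportTwoside(divd)
    tr.filename = "twoside_demo"

    # sweep the effective voltage v_a symmetrically while holding v_b constant at 0 V
    tr.add_sweep(LinearTwoside(-1e-3, 1e-3, 0.0, 0.0, 201))

    # 2D scan over an external parameter; set_coil is a placeholder setter
    set_coil = lambda value: None
    tr.set_x_parameter("coil_current", set_coil, np.linspace(-1e-3, 1e-3, 41), "A", dt=0.1)

    # derive and plot dIa/dVa from the stored effective quantities (store_effs defaults to True)
    tr.add_deriv("ia", "va")
    tr.add_view("ia", "va")

    tr.measure_2D()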