Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sdk/tests/mock_api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@
from tests.mock_api.devices_endpoints import devices_router
from tests.mock_api.measures_endpoints import measures_router
from tests.mock_api.sdk_endpoints import sdk_router
from tests.mock_api import label_endpoints, patient_endpoints
from tests.mock_api import label_endpoints, patient_endpoints, device_patient_endpoints

app = FastAPI()
app.include_router(sdk_router, prefix="/sdk")
app.include_router(measures_router, prefix="/measures")
app.include_router(devices_router, prefix="/devices")
app.include_router(label_endpoints.router, prefix="/labels")
app.include_router(patient_endpoints.router, prefix="/patients")
app.include_router(device_patient_endpoints.router, prefix="/device-patient-mapping")
58 changes: 58 additions & 0 deletions sdk/tests/mock_api/device_patient_endpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# AtriumDB is a timeseries database software designed to best handle the unique features and
# challenges that arise from clinical waveform data.
# Copyright (C) 2023 The Hospital for Sick Children
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from typing import List, Optional

from fastapi import APIRouter, Depends, HTTPException, Query

from atriumdb import AtriumSDK
from tests.mock_api.sdk_dependency import get_sdk_instance

router = APIRouter()


@router.get("", response_model=List[List[Optional[int]]])
async def get_device_patient_data(
device_id: List[int] = Query(default=[]),
patient_id: List[int] = Query(default=[]),
start_time: Optional[int] = None,
end_time: Optional[int] = None,
timestamp: Optional[int] = None,
atriumdb_sdk: AtriumSDK = Depends(get_sdk_instance)):
if timestamp is not None and (start_time is not None or end_time is not None):
raise HTTPException(
status_code=400,
detail="timestamp and start_time/end_time are mutually exclusive.")

device_id_list = device_id if device_id else None
patient_id_list = patient_id if patient_id else None

if timestamp is not None:
results = atriumdb_sdk.sql_handler.select_device_patient_encounters(
timestamp=timestamp,
device_id_list=device_id_list,
patient_id_list=patient_id_list,
)
else:
results = atriumdb_sdk.sql_handler.select_device_patients(
device_id_list=device_id_list,
patient_id_list=patient_id_list,
start_time=start_time,
end_time=end_time,
)

return [list(row) for row in results]
120 changes: 119 additions & 1 deletion sdk/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ def start_server():
# test labels functionality of the api
_test_for_both(DB_NAME, _test_api_labels)

# test the device-patient-mapping endpoint
_test_for_both(DB_NAME, _test_api_device_patient_mapping)


# testing normal operation of the api, getting devices, patients, blocks ect
def _test_api(db_type, dataset_location, connection_params):
Expand Down Expand Up @@ -356,4 +359,119 @@ def _create_label_hierarchy(sdk: AtriumSDK, hierarchy: dict, parent_id=None):
elif isinstance(children, list):
# Iterate over the list of child labels
for child_label in children:
sdk.insert_label_name(name=child_label, parent=label_id)
sdk.insert_label_name(name=child_label, parent=label_id)

# test the device-patient-mapping endpoint of the api
def _test_api_device_patient_mapping(db_type, dataset_location, connection_params):
sdk = AtriumSDK.create_dataset(
dataset_location=dataset_location, database_type=db_type, connection_params=connection_params)

# set up the local sdk instance for the api to use
app.dependency_overrides[get_sdk_instance] = lambda: sdk

# set up remote mode sdk to connect to the api
api_sdk = AtriumSDK(metadata_connection_type="api", api_url="http://127.0.0.1:8123", validate_token=False)
api_sdk.token_expiry = time.time() + 1_000_000

# insert three devices and three patients with three back-to-back encounters
device_id_1 = sdk.insert_device(device_tag='dpm_monitor1')
device_id_2 = sdk.insert_device(device_tag='dpm_monitor2')
device_id_3 = sdk.insert_device(device_tag='dpm_monitor3')

patient_id_1 = sdk.insert_patient(mrn='dpm-1001')
patient_id_2 = sdk.insert_patient(mrn='dpm-1002')
patient_id_3 = sdk.insert_patient(mrn='dpm-1003')

# times in seconds; insert in seconds and assert in nanoseconds (the api returns raw ns)
T1, T2, T3, T4 = 1647084000, 1647094800, 1647105600, 1647116400
T1_ns, T2_ns, T3_ns, T4_ns = (t * 1_000_000_000 for t in (T1, T2, T3, T4))

sdk.insert_device_patient_data([
(device_id_1, patient_id_1, T1, T2),
(device_id_2, patient_id_2, T2, T3),
(device_id_3, patient_id_3, T3, T4),
], time_units='s')

print('Testing device-patient-mapping with no filters...')
all_mappings = api_sdk._api_get_device_patient_data()
expected = {
(device_id_1, patient_id_1, T1_ns, T2_ns),
(device_id_2, patient_id_2, T2_ns, T3_ns),
(device_id_3, patient_id_3, T3_ns, T4_ns),
}
assert set(all_mappings) == expected

print('Testing device-patient-mapping with device_id filter...')
device_1_mappings = api_sdk._api_get_device_patient_data(device_id_list=[device_id_1])
assert device_1_mappings == [(device_id_1, patient_id_1, T1_ns, T2_ns)]

# multiple device ids should be encoded as repeated query params and OR'd together
multi_device_mappings = api_sdk._api_get_device_patient_data(device_id_list=[device_id_1, device_id_3])
assert set(multi_device_mappings) == {
(device_id_1, patient_id_1, T1_ns, T2_ns),
(device_id_3, patient_id_3, T3_ns, T4_ns),
}

print('Testing device-patient-mapping with patient_id filter...')
patient_2_mappings = api_sdk._api_get_device_patient_data(patient_id_list=[patient_id_2])
assert patient_2_mappings == [(device_id_2, patient_id_2, T2_ns, T3_ns)]

print('Testing device-patient-mapping with start_time and end_time filters...')
# a window strictly inside the second encounter selects only that mapping
range_mappings = api_sdk._api_get_device_patient_data(start_time=T2_ns + 1, end_time=T3_ns - 1)
assert range_mappings == [(device_id_2, patient_id_2, T2_ns, T3_ns)]

# a window covering only the first two encounters
range_mappings_two = api_sdk._api_get_device_patient_data(start_time=T1_ns, end_time=T3_ns - 1)
assert set(range_mappings_two) == {
(device_id_1, patient_id_1, T1_ns, T2_ns),
(device_id_2, patient_id_2, T2_ns, T3_ns),
}

print('Testing device-patient-mapping with timestamp filter...')
encounter_at_T2 = api_sdk._api_get_device_patient_data(timestamp=T2_ns + 100)
assert encounter_at_T2 == [(device_id_2, patient_id_2, T2_ns, T3_ns)]

# a timestamp outside any encounter window returns nothing
no_encounter = api_sdk._api_get_device_patient_data(timestamp=T4_ns + 1_000_000_000)
assert no_encounter == []

print('Testing combined timestamp and device_id filter...')
combined = api_sdk._api_get_device_patient_data(timestamp=T2_ns + 100, device_id_list=[device_id_2])
assert combined == [(device_id_2, patient_id_2, T2_ns, T3_ns)]

# filtering by a device that wasn't active at the timestamp returns nothing
combined_none = api_sdk._api_get_device_patient_data(timestamp=T2_ns + 100, device_id_list=[device_id_1])
assert combined_none == []

print('Testing mutual exclusion of timestamp and start_time/end_time...')
# the api raises a 400 when timestamp is combined with a time range, which the sdk surfaces as ValueError
with pytest.raises(ValueError):
api_sdk._request("GET", "device-patient-mapping",
params={'timestamp': T2_ns, 'start_time': T1_ns, 'end_time': T3_ns})
with pytest.raises(ValueError):
api_sdk._request("GET", "device-patient-mapping",
params={'timestamp': T2_ns, 'start_time': T1_ns})
with pytest.raises(ValueError):
api_sdk._request("GET", "device-patient-mapping",
params={'timestamp': T2_ns, 'end_time': T3_ns})

print('Testing the high-level get_device_patient_data wrapper through the api...')
api_data = api_sdk.get_device_patient_data(time_units='s')
expected_data = {
(device_id_1, patient_id_1, float(T1), float(T2)),
(device_id_2, patient_id_2, float(T2), float(T3)),
(device_id_3, patient_id_3, float(T3), float(T4)),
}
assert set(api_data) == expected_data

print('Testing the high-level get_device_patient_encounters wrapper through the api...')
encounter = api_sdk.get_device_patient_encounters(timestamp=T2 + 100, device_id=device_id_2, time_units='s')
assert encounter == [(device_id_2, patient_id_2, float(T2), float(T3))]

no_encounter_high_level = api_sdk.get_device_patient_encounters(
timestamp=T4 + 1, device_id=device_id_3, time_units='s')
assert no_encounter_high_level == []

# close api connection
api_sdk.close()