The Mediator is a Python-based implementation of Knowledge-Based Temporal Abstraction (KBTA) that converts time-stamped clinical data into interval-based symbolic abstractions for research and predictive modeling.
Key Features:
- Hierarchical abstractions – Raw Concepts → Events → States → Trends → Contexts → Patterns
- XSD schema validation – Structural validation for all TAK definitions
- Production-ready – SQLite backend, ProcessPool processing, comprehensive tests
- Extensible – XML-based TAK definitions (no code changes needed)
Theoretical Foundation:
This implementation is based on the KBTA framework:
- Shahar, Y., & Musen, M. A. (1996). "Knowledge-based temporal abstraction in clinical domains." Artificial Intelligence in Medicine, 8(3), 267-298.
- Shalom, E., Goldstein, A., Weiss, R., Selivanova, M., Cohen, N. M., & Shahar, Y. (2024). "Implementation and evaluation of a system for assessment of the quality of long-term management of patients at a geriatric hospital." Journal of Biomedical Informatics, 156, 104686.
Mediator/
├── backend/                              # Database layer
│   ├── data/
│   │   ├── generate_synthetic_data.ipynb # Synthetic data generator
│   │   ├── mediator.db                   # SQLite database (auto-created)
│   │   └── input_data.csv                # Sample input CSV (auto-created by .ipynb)
│   ├── queries/                          # SQL templates
│   ├── config.py                         # Database paths
│   └── dataaccess.py                     # Database access + CLI
├── core/                                 # TAK engine
│   ├── knowledge-base/                   # TAK definitions (XML)
│   │   ├── raw-concepts/                 # Single/multi-attribute concepts
│   │   ├── events/                       # Point-in-time events
│   │   ├── states/                       # Interval-based states
│   │   ├── trends/                       # Slope-based trends
│   │   ├── contexts/                     # Background contexts
│   │   ├── patterns/                     # Temporal patterns
│   │   ├── global_clippers.json          # Global START/END clippers
│   │   ├── tak_schema.xsd                # XSD validation schema
│   │   └── TAK_README.md                 # TAK documentation
│   ├── tak/                              # TAK implementation
│   │   ├── tak.py                        # Base classes + TAK rules
│   │   ├── repository.py                 # TAK repository object + functions
│   │   ├── raw_concept.py                # RawConcept + ParameterizedRawConcept TAK
│   │   ├── event.py                      # Event TAK
│   │   ├── state.py                      # State TAK
│   │   ├── trend.py                      # Trend TAK
│   │   ├── context.py                    # Context TAK
│   │   ├── pattern.py                    # Pattern TAK - LocalPattern
│   │   └── utils.py                      # Shared utilities
│   ├── config.py                         # TAK paths
│   └── mediator.py                       # Orchestration engine + CLI
├── run_mediator.ipynb                    # Example flow for deployment option 2
├── images/                               # Documentation assets
├── unittests/                            # Comprehensive test suite
├── logs/                                 # Post-run log files
├── setup.py                              # Package definition (for pip install -e)
├── Dockerfile                            # Docker image definition
├── docker-compose.yml                    # Docker Compose configuration
├── .dockerignore                         # Files excluded from Docker build
├── MANIFEST.in                           # Package data files
├── requirements.txt                      # Python dependencies
├── requirements-py37.txt                 # Python dependencies (for older envs)
├── LICENSE                               # MIT License
└── README.md                             # This file
Choose the deployment method that fits your use case:
| Scenario | Recommended Method | Best For |
|---|---|---|
| Local development (IDE) | Option 1 | Code editing, testing, CLI debugging |
| Research workflows (Jupyter) | Option 2 | Interactive analysis, Python API |
| Remote/Production (Docker) | Option 3 | Reproducible, isolated, Python 3.7 compatible |
Best for: Code editing, testing, debugging, TAK development with full IDE support
- Python 3.9+ (check: python3 --version)
- IDE (VS Code, PyCharm, etc.)
- Git
# Clone repository
git clone https://github.com/shaharoded/Mediator.git
cd Mediator
# Create virtual environment
python3 -m venv .venv
source .venv/bin/activate    # Linux/macOS
.venv\Scripts\activate       # Windows
# Install dependencies
pip install --upgrade pip
pip install -r requirements.txt
# Install as editable package (enables imports from anywhere)
pip install -e .

Create database tables:
python -m backend.dataaccess --create_db

Load CSV (Option A: Place in backend/data/):
# Copy your CSV to data folder
cp /path/to/your/input_data.csv backend/data/
# Load into database
python -m backend.dataaccess --load_csv backend/data/input_data.csv --yes

Load CSV (Option B: Pass absolute path):
# Load from any location
python -m backend.dataaccess --load_csv /absolute/path/to/input_data.csv --yes

CSV Requirements:
- Required columns: `PatientId`, `ConceptName`, `StartDateTime`, `EndDateTime`, `Value`
- Optional columns: `Unit`
- Format: `YYYY-MM-DD HH:MM:SS` timestamps
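If you generate the input CSV programmatically, a minimal pandas sketch along these lines produces a file with the required columns and timestamp format (the concept names, values, and output path below are illustrative only, not prescribed by the Mediator):

```python
# Illustrative sketch: build a CSV with the required columns and timestamp format.
# Concept names and values are made up for the example.
import pandas as pd

rows = [
    {"PatientId": 1000, "ConceptName": "Glucose",
     "StartDateTime": "2024-01-01 08:00:00", "EndDateTime": "2024-01-01 08:00:00",
     "Value": 95, "Unit": "mg/dL"},
    {"PatientId": 1000, "ConceptName": "Glucose",
     "StartDateTime": "2024-01-01 12:00:00", "EndDateTime": "2024-01-01 12:00:00",
     "Value": 152, "Unit": "mg/dL"},
]
pd.DataFrame(rows).to_csv("backend/data/input_data.csv", index=False)
```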
Extract TAK ZIP to knowledge-base folder:
# Extract new TAKs (maintains folder structure)
unzip new_taks.zip -d core/knowledge-base/
# Verify extraction
ls core/knowledge-base/raw-concepts/
ls core/knowledge-base/states/
# Validate all TAKs against schema
find core/knowledge-base -name "*.xml" -exec \
  xmllint --schema core/knowledge-base/tak_schema.xsd {} \;
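If xmllint is not available, the same structural check can be run from Python with lxml. This is an optional sketch; lxml is not necessarily listed in requirements.txt, so install it separately if you want to use it:

```python
# Optional sketch: validate every TAK XML against the XSD using lxml.
from pathlib import Path
from lxml import etree

schema = etree.XMLSchema(etree.parse("core/knowledge-base/tak_schema.xsd"))
for xml_path in Path("core/knowledge-base").rglob("*.xml"):
    ok = schema.validate(etree.parse(str(xml_path)))
    print(f"{xml_path}: {'OK' if ok else schema.error_log.last_error}")
```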
Process all patients:

python -m core.mediator

Process specific patients:
python -m core.mediator --patients 1000,1001,1002

Debug mode:
python -m core.mediator --patients 1000 --log-level DEBUG

Custom settings:
python -m core.mediator \
--kb core/knowledge-base \
--db backend/data/mediator.db \
--max-concurrent 8 \
--log-level INFO

This implementation does not support running on a partial subset of TAKs from the repository: it validates every TAK's dependencies before computation, and all of them must be in cache when the abstraction runs (it also assumes stored results from past runs might not be credible).
If you wish to run on a subset of TAKs, copy the desired TAKs plus all of their dependencies into a new TAK folder with the same structure, and point the Mediator's config (or the --kb flag) there.
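If you go that route, here is a hedged sketch of assembling such a subset folder. The file names are hypothetical, the dependency list must be assembled by hand, and carrying over tak_schema.xsd and global_clippers.json is an assumption based on the folder layout shown earlier:

```python
# Hypothetical sketch: copy a hand-picked subset of TAKs (plus their
# dependencies, which you must identify yourself) into a new KB folder
# that mirrors the original structure.
import shutil
from pathlib import Path

SRC = Path("core/knowledge-base")
DST = Path("core/knowledge-base-subset")

selected = [
    "raw-concepts/glucose.xml",     # hypothetical file names
    "states/glucose_state.xml",
]
support = ["tak_schema.xsd", "global_clippers.json"]  # assumed to be required

for rel in selected + support:
    target = DST / rel
    target.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(SRC / rel, target)
```

The resulting folder can then be passed via the documented --kb flag, or set in core/config.py.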
# Run all tests (53 tests)
python -m pytest unittests/ -v
# Run specific test modules
python -m pytest unittests/test_raw_concept.py -v
python -m pytest unittests/test_event.py -v
python -m pytest unittests/test_state.py -v
python -m pytest unittests/test_trend.py -v
python -m pytest unittests/test_context.py -v
python -m pytest unittests/test_pattern.py -v
python -m pytest unittests/test_repository.py -v
python -m pytest unittests/test_mediator.py -v
# With coverage report
python -m pytest unittests/ --cov=core --cov=backend --cov-report=html

Best for: Research workflows, interactive analysis, visualization, Python API usage. This method is designed for deployment on an older Python version, as found in my remote research environment. The idea is to use this as a code repository driven by a main .ipynb file that imports and uses the Python functions offered here.
Note: Python 3.7 support uses older dependency versions (pandas 1.3.5, numpy 1.21.6), which are no longer maintained. If available, Python 3.9+ is strongly recommended and works with this package as well.
- Python 3.7+ (requirements are adapted to this older version for my research environment)
- Jupyter Notebook
# Clone repository
git clone https://github.com/shaharoded/Mediator.git
cd Mediator

Package code for manual transfer:
& "C:\Program Files\7-Zip\7z.exe" a -tzip mediator-deploy.zip `
  core backend run_mediator.ipynb setup.py MANIFEST.in requirements-py37.txt README.md LICENSE `
  "-xr!backend\data\*"

If you have an old deploy ZIP, delete it first: this compression method merges into the existing archive rather than replacing it. I used 7-Zip for its exclusion patterns; other packaging tools will work as well.
Place CSV in backend/data/ folder:
# Copy your CSV
cp /path/to/input_data.csv backend/data/

Or use an arbitrary location (reference it by absolute path in the notebook).
# Activate virtual environment
source .venv/bin/activate    # Linux/macOS
.venv\Scripts\activate       # Windows
# Start Jupyter server
jupyter notebook

Now navigate to run_mediator.ipynb and continue there.
The notebook contains a Pythonic usage example showing how to use this package as an API.
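For orientation before opening the notebook, here is a minimal sketch of driving the same pipeline from a notebook cell by invoking the documented module CLIs programmatically. The notebook demonstrates the richer Python API, so treat this only as a fallback pattern; paths are examples:

```python
# Minimal sketch: call the documented CLI entry points from Python.
import subprocess
import sys

def run_module(*args: str) -> None:
    """Run `python -m <module> <args...>` in the current environment."""
    subprocess.run([sys.executable, "-m", *args], check=True)

run_module("backend.dataaccess", "--create_db")   # one-time table creation
run_module("backend.dataaccess", "--load_csv", "backend/data/input_data.csv", "--yes")
run_module("core.mediator", "--patients", "1000,1001", "--log-level", "INFO")
```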
If you need to change the KB TAKs, simply keep only the files you want in the KB folder and update their content as needed.
Best for: Production servers, old Python versions (<3.7), reproducible environments, cloud deployment
- Docker installed (check: docker --version)
- 2 GB free disk space
Build from source (LOCAL machine):
# Clone repository
git clone https://github.com/shaharoded/Mediator.git
cd Mediator
# Build Docker image
docker build -t mediator:latest .
# Save image to tar.gz (for transfer)
docker save mediator:latest | gzip > mediator-v1.0.tar.gz

Transfer image:
# Transfer tar.gz to remote server
scp mediator-v1.0.tar.gz user@remote-server:/home/user/
# Or manually upload via SFTP/SCP

Load image on target machine:
# SSH to remote server
ssh user@remote-server
# Load Docker image
docker load < /home/user/mediator-v1.0.tar.gz
# Verify image loaded
docker images | grep mediator

Create data directory on host:
# Create folder for persistent data (DB + logs)
mkdir -p /home/user/mediator_data
cd /home/user/mediator_data

Create database (one-time):
# Create database tables
docker run --rm -v $(pwd):/app/backend/data \
  mediator:latest python -m backend.dataaccess --create_db

Place CSV in data folder:
# Copy your CSV to data folder
cp /path/to/input_data_file.csv /home/user/mediator_data/

Load CSV into database:
# Load CSV from mounted data folder
docker run --rm -v /home/user/mediator_data:/app/backend/data \
mediator:latest python -m backend.dataaccess \
  --load_csv /app/backend/data/input_data_file.csv --yes

Alternative: Load CSV from arbitrary location:
# Mount CSV from custom location
docker run --rm \
-v /home/user/mediator_data:/app/backend/data \
-v /custom/path/to/input.csv:/app/input.csv \
mediator:latest python -m backend.dataaccess \
  --load_csv /app/input.csv --yes

Option A: Rebuild image with new TAKs (recommended):
# LOCAL: Extract TAKs to knowledge-base folder
cd Mediator/
unzip new_taks.zip -d core/knowledge-base/
# Verify extraction
ls core/knowledge-base/raw-concepts/
ls core/knowledge-base/states/
# Rebuild Docker image
docker build -t mediator:v1.1 .
# Save and transfer
docker save mediator:v1.1 | gzip > mediator-v1.1.tar.gz
scp mediator-v1.1.tar.gz user@remote-server:/home/user/
# REMOTE: Load new image
ssh user@remote-server
docker load < /home/user/mediator-v1.1.tar.gz

Option B: Mount TAK folder at runtime (no rebuild needed):
# Extract TAKs on host machine (outside container)
unzip new_taks.zip -d /home/user/custom_knowledge_base/
# Verify extraction
ls /home/user/custom_knowledge_base/raw-concepts/
ls /home/user/custom_knowledge_base/states/
# Run with custom KB path
docker run --rm \
-v /home/user/mediator_data:/app/backend/data \
-v /home/user/custom_knowledge_base:/app/custom-kb \
  mediator:latest python -m core.mediator --kb /app/custom-kb

Process all patients:
docker run --rm -v /home/user/mediator_data:/app/backend/data \
  mediator:latest python -m core.mediator

Process specific patients:
docker run --rm -v /home/user/mediator_data:/app/backend/data \
  mediator:latest python -m core.mediator --patients 1000,1001,1002

Debug mode:
docker run --rm -v /home/user/mediator_data:/app/backend/data \
  mediator:latest python -m core.mediator --patients 1000 --log-level DEBUG

Custom settings:
docker run --rm \
-v /home/user/mediator_data:/app/backend/data \
mediator:latest python -m core.mediator \
--max-concurrent 8 \
  --log-level INFO

Interactive shell (debugging):
# Enter container shell
docker run --rm -it -v /home/user/mediator_data:/app/backend/data \
mediator:latest /bin/bash
# Inside container:
python -m core.mediator --patients 1000 --log-level DEBUG
python -m pytest unittests/ -v

Alternative workflow using docker-compose:
# Navigate to project root
cd Mediator/
# Build image
docker-compose build
# Create database
docker-compose run mediator python -m backend.dataaccess --create_db
# Load CSV
docker-compose run mediator python -m backend.dataaccess \
--load_csv /app/backend/data/input_data_file.csv --yes
# Run pipeline
docker-compose run mediator python -m core.mediator --patients 1000,1001

Update database with new CSV:
# Replace existing data
docker run --rm \
-v /home/user/mediator_data:/app/backend/data \
-v /path/to/new_data.csv:/app/new_data.csv \
mediator:latest python -m backend.dataaccess \
  --load_csv /app/new_data.csv --replace-input --clear-output-qa --yes

Query results after processing:
# Access database using SQLite CLI
docker run --rm -it -v /home/user/mediator_data:/app/backend/data \
mediator:latest sqlite3 /app/backend/data/mediator.db
# Inside SQLite:
# SELECT ConceptName, COUNT(*) FROM OutputPatientData GROUP BY ConceptName;
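The same summary can be pulled from Python on the host, pointing at the mounted database file (table and column names follow the query above; adjust the path to your data folder, or to backend/data/mediator.db for a local install):

```python
# Sketch: summarize pipeline output with Python's built-in sqlite3 module.
import sqlite3

conn = sqlite3.connect("/home/user/mediator_data/mediator.db")
rows = conn.execute(
    "SELECT ConceptName, COUNT(*) FROM OutputPatientData GROUP BY ConceptName"
).fetchall()
conn.close()

for concept, count in rows:
    print(f"{concept}: {count}")
```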
Check logs:

# View mediator run logs
docker run --rm -v /home/user/mediator_data:/app/backend/data \
  mediator:latest cat /app/backend/data/mediator_run.log

For detailed information about TAK families, XML schema, validation rules, and examples:
See: core/knowledge-base/TAK_README.md
Quick TAK Reference:
- Raw Concepts – Bridge InputPatientData → pipeline (multi-attr tuples, numeric ranges, nominal values)
- Events – Point-in-time occurrences (multi-source, flexible constraints)
- States – Interval-based symbolic states (discretization + merging)
- Trends – Slope-based trends (Increasing/Decreasing/Steady)
- Contexts – Background facts (windowing + clipping)
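To make the State idea concrete, here is a toy illustration of "discretization + merging". This is not the package's implementation; the thresholds and data are invented, and real cut-offs live in the State TAK XML definitions:

```python
# Toy illustration only: discretize raw values into symbols, then merge
# consecutive samples that share a symbol into a single interval.
from itertools import groupby

samples = [  # (timestamp, glucose value) -- invented data
    ("2024-01-01 08:00", 95), ("2024-01-01 12:00", 150),
    ("2024-01-01 16:00", 155), ("2024-01-01 20:00", 92),
]

def discretize(value: float) -> str:
    return "Low" if value < 70 else "Normal" if value <= 140 else "High"

labeled = [(ts, discretize(v)) for ts, v in samples]
for symbol, group in groupby(labeled, key=lambda pair: pair[1]):
    group = list(group)
    print(f"{symbol}: {group[0][0]} -> {group[-1][0]}")  # one merged interval per line
```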
@article{shahar1996knowledge,
title={Knowledge-based temporal abstraction in clinical domains},
author={Shahar, Yuval and Musen, Mark A},
journal={Artificial Intelligence in Medicine},
volume={8},
number={3},
pages={267--298},
year={1996},
publisher={Elsevier}
}
@article{shalom2024implementation,
title={Implementation and evaluation of a system for assessment of the quality of long-term management of patients at a geriatric hospital},
author={Shalom, Erez and Goldstein, Avraham and Weiss, Robert and Selivanova, Marina and Cohen, Nir Menachemi and Shahar, Yuval},
journal={Journal of Biomedical Informatics},
volume={156},
pages={104686},
year={2024},
publisher={Elsevier}
}

The following are not important for my use case but might be nice for other applications:
- Define Overlap(Pattern) to use for complex contexts. It should check whether 2+ contexts (or any other concepts) overlap and, if so, return their overlap window (possibly including +- good-before/good-after).
- Currently, parameters in pattern compliance resolve once per patient. Maybe we want to resolve them per pattern instance (as in parameterized-raw-concept)? For a parameter like BMI this does not matter, but if the parameter is "last insulin dose" and we want to check each pattern instance against a different value, it is useful (though also solvable, and clearer, by using a parameterized-raw-concept as anchor/event).
- Currently, batches are written to the queue (and then the DB) after every TAK.apply(). A better method would probably write them once per patient, to reduce the number of write operations. For my needs this was irrelevant, and the current method is a relic of the pre-parallelism versions.
Maintained by: Shahar Oded
