Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@
</div>

```bash
pip install cognis-modpot
pip install "git+https://github.com/cognis-digital/modpot.git"
modpot scan . # → prioritized findings in seconds
```

<!-- cognis:layman:start -->
## What is this?

modpot is a fake industrial control system (ICS) that you run on a server to attract and log attackers who scan the internet for vulnerable factory or utility equipment. When someone probes it — trying to read sensor values, change control registers, or run diagnostic commands — it records exactly what they did as structured JSON logs you can feed into a SIEM, alert system, or spreadsheet. It speaks Modbus TCP, the most common protocol used in real water treatment plants, power grids, and factory floors, so it looks convincing to automated scanners. Security researchers, threat-intelligence teams, and network defenders use it to see who is actively targeting industrial equipment and what commands they try to run.
<!-- cognis:layman:end -->

## Contents

- [Why modpot?](#why) · [Features](#features) · [Quick start](#quick-start) · [Example](#example) · [Architecture](#architecture) · [AI stack](#ai-stack) · [How it compares](#how-it-compares) · [Integrations](#integrations) · [Install anywhere](#install-anywhere) · [Related](#related) · [Contributing](#contributing)
Expand Down Expand Up @@ -48,10 +54,46 @@ OT threat-intel content engine — drop it on a VPS, share the 'someone tried to
<div align="right"><a href="#top">↑ back to top</a></div>

<a name="quick-start"></a>
<!-- cognis:install:start -->
## Install

`modpot` is source-available (not published to PyPI) — every method below installs
straight from GitHub. Pick whichever you prefer; the one-line scripts auto-detect
the best tool available on your machine.

**One-liner (Linux / macOS):**
```sh
curl -fsSL https://raw.githubusercontent.com/cognis-digital/modpot/HEAD/install.sh | sh
```

**One-liner (Windows PowerShell):**
```powershell
irm https://raw.githubusercontent.com/cognis-digital/modpot/HEAD/install.ps1 | iex
```

**Or install manually — any one of:**
```sh
pipx install "git+https://github.com/cognis-digital/modpot.git" # isolated (recommended)
uv tool install "git+https://github.com/cognis-digital/modpot.git" # uv
pip install "git+https://github.com/cognis-digital/modpot.git" # pip
```

**From source:**
```sh
git clone https://github.com/cognis-digital/modpot.git
cd modpot && pip install .
```

Then run:
```sh
modpot --help
```
<!-- cognis:install:end -->

## Quick start

```bash
pip install cognis-modpot
pip install "git+https://github.com/cognis-digital/modpot.git"
modpot --version
modpot scan . # scan current project
modpot scan . --format json # machine-readable
Expand Down
29 changes: 29 additions & 0 deletions install.ps1
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Comprehensive installer for cognis-digital/modpot (Windows PowerShell).
# Tries: pipx -> uv -> pip (git+https) -> from source.
# modpot is source-available and not on PyPI; all paths install from GitHub.
$ErrorActionPreference = "Stop"
$Repo = "modpot"
$Url = "git+https://github.com/cognis-digital/modpot.git"
$Git = "https://github.com/cognis-digital/modpot.git"
function Say($m) { Write-Host "[$Repo] $m" -ForegroundColor Magenta }
function Have($c) { [bool](Get-Command $c -ErrorAction SilentlyContinue) }

if (-not (Have python) -and -not (Have py)) {
Say "Python 3.9+ is required but was not found. Install Python first."; exit 1
}
if (Have pipx) {
Say "Installing with pipx (isolated, recommended)..."
pipx install $Url; if ($LASTEXITCODE -eq 0) { Say "Done. Run: modpot"; exit 0 }
}
if (Have uv) {
Say "Installing with uv..."
uv tool install $Url; if ($LASTEXITCODE -eq 0) { Say "Done. Run: modpot"; exit 0 }
}
if (Have pip) {
Say "Installing with pip (user site)..."
pip install --user $Url; if ($LASTEXITCODE -eq 0) { Say "Done. Run: modpot"; exit 0 }
}
Say "No packaging tool worked; falling back to a source clone."
$Tmp = Join-Path $env:TEMP "$Repo-src"
git clone --depth 1 $Git $Tmp
Say "Cloned to $Tmp - run: cd $Tmp; python -m pip install ."
44 changes: 34 additions & 10 deletions install.sh
Original file line number Diff line number Diff line change
@@ -1,10 +1,34 @@
#!/usr/bin/env sh
# Universal installer for modpot. Prefers uv > pipx > pip; installs from the repo.
set -e
SRC="git+https://github.com/cognis-digital/modpot.git"
echo "Installing modpot ..."
if command -v uv >/dev/null 2>&1; then uv tool install "$SRC"
elif command -v pipx >/dev/null 2>&1; then pipx install "$SRC"
elif command -v python3 >/dev/null 2>&1; then python3 -m pip install --user "$SRC"
else echo "Need uv, pipx, or python3+pip"; exit 1; fi
echo "Done. Run: modpot --help"
#!/usr/bin/env sh
# Comprehensive installer for cognis-digital/modpot (Linux / macOS).
# Tries the best available method: pipx -> uv -> pip (git+https) -> from source.
# modpot is source-available and not on PyPI; all paths install from GitHub.
set -eu

REPO="modpot"
URL="git+https://github.com/cognis-digital/modpot.git"
GITURL="https://github.com/cognis-digital/modpot.git"

say() { printf '\033[1;35m[%s]\033[0m %s\n' "$REPO" "$1"; }
have() { command -v "$1" >/dev/null 2>&1; }

if ! have python3 && ! have python; then
say "Python 3.9+ is required but was not found. Install Python first."; exit 1
fi

if have pipx; then
say "Installing with pipx (isolated, recommended)..."
pipx install "$URL" && { say "Done. Run: modpot"; exit 0; }
fi
if have uv; then
say "Installing with uv..."
uv tool install "$URL" && { say "Done. Run: modpot"; exit 0; }
fi
if have pip3 || have pip; then
PIP="$(command -v pip3 || command -v pip)"
say "Installing with pip (user site)..."
"$PIP" install --user "$URL" && { say "Done. Run: modpot"; exit 0; }
fi

say "No packaging tool worked; falling back to a source clone."
TMP="$(mktemp -d)"; git clone --depth 1 "$GITURL" "$TMP/$REPO"
say "Cloned to $TMP/$REPO — run: cd $TMP/$REPO && python3 -m pip install ."
2 changes: 1 addition & 1 deletion integrations/webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
Usage: <tool> scan . --format json | python integrations/webhook.py --url URL
"""
from __future__ import annotations
import argparse, json, sys, urllib.request
import argparse, sys, urllib.request

def main() -> int:
ap = argparse.ArgumentParser()
Expand Down
1 change: 1 addition & 0 deletions layman.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
modpot is a fake industrial control system (ICS) that you run on a server to attract and log attackers who scan the internet for vulnerable factory or utility equipment. When someone probes it — trying to read sensor values, change control registers, or run diagnostic commands — it records exactly what they did as structured JSON logs you can feed into a SIEM, alert system, or spreadsheet. It speaks Modbus TCP, the most common protocol used in real water treatment plants, power grids, and factory floors, so it looks convincing to automated scanners. Security researchers, threat-intelligence teams, and network defenders use it to see who is actively targeting industrial equipment and what commands they try to run.
99 changes: 68 additions & 31 deletions modpot/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,22 @@
_SEV_RANK = {"info": 0, "low": 1, "medium": 2, "high": 3}


_CONN_RECV_TIMEOUT = 30 # seconds; prevent hanging on stalled clients


def _read_lines(path: str) -> list[str]:
if path == "-":
return sys.stdin.read().splitlines()
with open(path, "r", encoding="utf-8") as fh:
return fh.read().splitlines()
try:
return sys.stdin.read().splitlines()
except UnicodeDecodeError as exc:
raise OSError(f"stdin contains non-UTF-8 data: {exc}") from exc
try:
with open(path, "r", encoding="utf-8") as fh:
return fh.read().splitlines()
except UnicodeDecodeError as exc:
raise OSError(
f"{path} contains non-UTF-8 data (is it a binary file?): {exc}"
) from exc


def _print_table(events: list[dict]) -> None:
Expand Down Expand Up @@ -102,11 +113,21 @@ def _cmd_analyze(args: argparse.Namespace) -> int:
if args.min_severity:
floor = _SEV_RANK.get(args.min_severity, 0)
events = [e for e in events if _SEV_RANK.get(e["severity"], 0) >= floor]
_emit(events, args.format)
# subcommand --format takes precedence; fall back to top-level --format
fmt = getattr(args, "format", None) or "table"
_emit(events, fmt)
return 1 if _has_high(events) else 0


def _cmd_serve(args: argparse.Namespace) -> int:
import struct as _struct

if not (1 <= args.port <= 65535):
print(
f"error: port {args.port} is out of range (must be 1–65535)",
file=sys.stderr,
)
return 2
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
Expand All @@ -126,33 +147,43 @@ def _cmd_serve(args: argparse.Namespace) -> int:
conn, addr = srv.accept()
src = f"{addr[0]}:{addr[1]}"
with conn:
while True:
head = _recv_exact(conn, 7)
if head is None:
break
import struct

_, _, length, _ = struct.unpack(">HHHB", head)
rest = _recv_exact(conn, max(length - 1, 0))
if rest is None:
break
raw = head + rest
try:
frame = parse_frame(raw)
event = frame_to_event(frame, src=src)
conn.sendall(build_response(frame))
except ParseError as exc:
event = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"src": src,
"category": "unknown",
"severity": "medium",
"reasons": [f"unparseable frame: {exc}"],
"raw_hex": raw.hex(),
}
if event.get("severity") == "high":
saw_high = True
print(json.dumps(event), flush=True)
conn.settimeout(_CONN_RECV_TIMEOUT)
try:
while True:
head = _recv_exact(conn, 7)
if head is None:
break
try:
_, _, length, _ = _struct.unpack(">HHHB", head)
except _struct.error:
break
if length < 1:
break
# Cap frame body to 260 bytes (Modbus PDU max 253 + 1 unit id).
# A larger length field is an attacker trying to exhaust memory.
body_len = min(max(length - 1, 0), 260)
rest = _recv_exact(conn, body_len)
if rest is None:
break
raw = head + rest
try:
frame = parse_frame(raw)
event = frame_to_event(frame, src=src)
conn.sendall(build_response(frame))
except ParseError as exc:
event = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"src": src,
"category": "unknown",
"severity": "medium",
"reasons": [f"unparseable frame: {exc}"],
"raw_hex": raw.hex(),
}
if event.get("severity") == "high":
saw_high = True
print(json.dumps(event), flush=True)
except socket.timeout:
pass # stalled client; close connection and move on
except KeyboardInterrupt:
print("\n[modpot] stopped", file=sys.stderr)
finally:
Expand Down Expand Up @@ -205,6 +236,12 @@ def build_parser() -> argparse.ArgumentParser:
default=None,
help="only show events at or above this severity",
)
a.add_argument(
"--format",
choices=["table", "json"],
default=None,
help="output format (overrides top-level --format; default: table)",
)
a.set_defaults(func=_cmd_analyze)

s = sub.add_parser(
Expand Down
11 changes: 8 additions & 3 deletions modpot/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,21 +165,26 @@ def _decode_pdu(frame: ModbusFrame, fc: int, body: bytes) -> None:
frame.note = "function code body not decoded"


_MAX_COIL_BITS = 2000 # Modbus spec limit for coil read responses
_MAX_REG_COUNT = 125 # Modbus spec limit for register read responses


def build_response(frame: ModbusFrame) -> bytes:
"""Build a plausible honeypot response for a parsed request *frame*.

Reads return zeroed register/coil data of the requested size; writes
Reads return zeroed register/coil data of the requested size (capped at
the Modbus spec maximums so the byte-count field never overflows); writes
echo back the request per spec. Unknown/unsupported codes return a
Modbus exception response (code 0x01, illegal function) so the
honeypot looks like a real but minimal device.
"""
fc = frame.function_code
if fc in (0x01, 0x02):
qty = frame.quantity or 0
qty = min(frame.quantity or 0, _MAX_COIL_BITS)
nbytes = (qty + 7) // 8
pdu = bytes([fc, nbytes]) + b"\x00" * nbytes
elif fc in (0x03, 0x04):
qty = frame.quantity or 0
qty = min(frame.quantity or 0, _MAX_REG_COUNT)
nbytes = qty * 2
pdu = bytes([fc, nbytes]) + b"\x00" * nbytes
elif fc in (0x05, 0x06):
Expand Down
20 changes: 15 additions & 5 deletions modpot/mcp_server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""MODPOT MCP server — exposes scan() as an MCP tool for Cognis.Studio."""
"""MODPOT MCP server — exposes analyze_capture() as an MCP tool for Cognis.Studio."""
from __future__ import annotations
from modpot.core import scan, to_json

import json


def serve() -> int:
"""Start an MCP stdio server. Requires the optional 'mcp' extra:
Expand All @@ -11,12 +13,20 @@ def serve() -> int:
except Exception:
print("Install the MCP extra: pip install 'cognis-modpot[mcp]'")
return 1

from modpot.core import analyze_capture

app = FastMCP("modpot")

@app.tool()
def modpot_scan(target: str) -> str:
"""Spin up a high-interaction Modbus/DNP3 ICS honeypot that logs attacker register reads/writes as structured JSON.. Returns JSON findings."""
return to_json(scan(target))
def modpot_analyze(hexlog: str) -> str:
"""Decode and classify Modbus TCP frames from a newline-separated hex
capture log. Returns JSON findings as a string."""
if not hexlog or not hexlog.strip():
return json.dumps([])
lines = hexlog.splitlines()
events = analyze_capture(lines)
return json.dumps(events)

app.run()
return 0
Loading
Loading