From aecc31cc2fe3cd4d74221b44242a28cd057c1220 Mon Sep 17 00:00:00 2001 From: bitterpanda Date: Wed, 11 Feb 2026 11:19:11 +0100 Subject: [PATCH 01/11] Add first rendition of sqlite3.py sink --- aikido_zen/__init__.py | 1 + aikido_zen/sinks/sqlite3.py | 93 +++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+) create mode 100644 aikido_zen/sinks/sqlite3.py diff --git a/aikido_zen/__init__.py b/aikido_zen/__init__.py index c65ba3509..1739c4adb 100644 --- a/aikido_zen/__init__.py +++ b/aikido_zen/__init__.py @@ -72,6 +72,7 @@ def protect(mode="daemon", token=""): import aikido_zen.sinks.psycopg import aikido_zen.sinks.asyncpg import aikido_zen.sinks.clickhouse_driver + import aikido_zen.sinks.sqlite3 import aikido_zen.sinks.builtins import aikido_zen.sinks.os diff --git a/aikido_zen/sinks/sqlite3.py b/aikido_zen/sinks/sqlite3.py new file mode 100644 index 000000000..e333b6103 --- /dev/null +++ b/aikido_zen/sinks/sqlite3.py @@ -0,0 +1,93 @@ +""" +Sink module for `sqlite3` +""" + +from aikido_zen.helpers.get_argument import get_argument +import aikido_zen.vulnerabilities as vulns +from aikido_zen.helpers.register_call import register_call +from aikido_zen.sinks import patch_function, on_import, before + + +@before +def _cursor_execute(func, instance, args, kwargs): + query = get_argument(args, kwargs, 0, "sql") + + register_call("sqlite3.Cursor.execute", "sql_op") + vulns.run_vulnerability_scan( + kind="sql_injection", op="sqlite3.Cursor.execute", args=(query, "sqlite") + ) + + +@before +def _cursor_executemany(func, instance, args, kwargs): + query = get_argument(args, kwargs, 0, "sql") + + register_call("sqlite3.Cursor.executemany", "sql_op") + vulns.run_vulnerability_scan( + kind="sql_injection", op="sqlite3.Cursor.executemany", args=(query, "sqlite") + ) + + +@before +def _cursor_executescript(func, instance, args, kwargs): + query = get_argument(args, kwargs, 0, "sql_script") + + register_call("sqlite3.Cursor.executescript", "sql_op") + vulns.run_vulnerability_scan( + kind="sql_injection", + op="sqlite3.Cursor.executescript", + args=(query, "sqlite"), + ) + + +@before +def _connection_execute(func, instance, args, kwargs): + query = get_argument(args, kwargs, 0, "sql") + + register_call("sqlite3.Connection.execute", "sql_op") + vulns.run_vulnerability_scan( + kind="sql_injection", op="sqlite3.Connection.execute", args=(query, "sqlite") + ) + + +@before +def _connection_executemany(func, instance, args, kwargs): + query = get_argument(args, kwargs, 0, "sql") + + register_call("sqlite3.Connection.executemany", "sql_op") + vulns.run_vulnerability_scan( + kind="sql_injection", + op="sqlite3.Connection.executemany", + args=(query, "sqlite"), + ) + + +@before +def _connection_executescript(func, instance, args, kwargs): + query = get_argument(args, kwargs, 0, "sql_script") + + register_call("sqlite3.Connection.executescript", "sql_op") + vulns.run_vulnerability_scan( + kind="sql_injection", + op="sqlite3.Connection.executescript", + args=(query, "sqlite"), + ) + + +@on_import("sqlite3") +def patch(m): + """ + patching sqlite3 + - patches Cursor.execute(sql, ...) + - patches Cursor.executemany(sql, ...) + - patches Cursor.executescript(sql_script) + - patches Connection.execute(sql, ...) + - patches Connection.executemany(sql, ...) + - patches Connection.executescript(sql_script) + """ + patch_function(m, "Cursor.execute", _cursor_execute) + patch_function(m, "Cursor.executemany", _cursor_executemany) + patch_function(m, "Cursor.executescript", _cursor_executescript) + patch_function(m, "Connection.execute", _connection_execute) + patch_function(m, "Connection.executemany", _connection_executemany) + patch_function(m, "Connection.executescript", _connection_executescript) From 3d54bf008843cd5923bd479905216bd3be51e22d Mon Sep 17 00:00:00 2001 From: bitterpanda Date: Wed, 11 Feb 2026 11:23:20 +0100 Subject: [PATCH 02/11] Add sqlite3 sink tests --- aikido_zen/sinks/tests/sqlite3_test.py | 239 +++++++++++++++++++++++++ 1 file changed, 239 insertions(+) create mode 100644 aikido_zen/sinks/tests/sqlite3_test.py diff --git a/aikido_zen/sinks/tests/sqlite3_test.py b/aikido_zen/sinks/tests/sqlite3_test.py new file mode 100644 index 000000000..50fbc77f9 --- /dev/null +++ b/aikido_zen/sinks/tests/sqlite3_test.py @@ -0,0 +1,239 @@ +import pytest +from unittest.mock import patch +import aikido_zen.sinks.sqlite3 +from aikido_zen.background_process.comms import reset_comms + +kind = "sql_injection" + + +@pytest.fixture +def database_conn(): + import sqlite3 + + conn = sqlite3.connect(":memory:") + conn.execute( + "CREATE TABLE dogs (id INTEGER PRIMARY KEY, dog_name TEXT, isAdmin INTEGER)" + ) + conn.commit() + return conn + + +def test_cursor_execute(database_conn): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + cursor = database_conn.cursor() + query = "SELECT * FROM dogs" + cursor.execute(query) + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert called_with_args[0] == query + assert called_with_args[1] == "sqlite" + mock_run_vulnerability_scan.assert_called_once() + + cursor.fetchall() + mock_run_vulnerability_scan.assert_called_once() + + cursor.close() + database_conn.close() + mock_run_vulnerability_scan.assert_called_once() + + +def test_cursor_execute_parameterized(database_conn): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + cursor = database_conn.cursor() + query = "INSERT INTO dogs (dog_name, isAdmin) VALUES (?, ?)" + cursor.execute(query, ("doggo", 0)) + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert called_with_args[0] == query + assert called_with_args[1] == "sqlite" + mock_run_vulnerability_scan.assert_called_once() + + database_conn.commit() + cursor.close() + database_conn.close() + mock_run_vulnerability_scan.assert_called_once() + + +def test_cursor_execute_no_args(database_conn): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + cursor = database_conn.cursor() + dogname = "Doggo" + isadmin = 1 + query = f"INSERT INTO dogs (dog_name, isAdmin) VALUES ('{dogname}', {isadmin})" + cursor.execute(query) + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert ( + called_with_args[0] + == "INSERT INTO dogs (dog_name, isAdmin) VALUES ('Doggo', 1)" + ) + assert called_with_args[1] == "sqlite" + mock_run_vulnerability_scan.assert_called_once() + + cursor.close() + database_conn.close() + mock_run_vulnerability_scan.assert_called_once() + + +def test_cursor_execute_with_fstring(database_conn): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + cursor = database_conn.cursor() + table_name = "dogs" + value_2 = "1" + cursor.execute( + f"INSERT INTO {table_name} (dog_name, isAdmin) VALUES (?, {value_2})", + ("doggy",), + ) + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert ( + called_with_args[0] == "INSERT INTO dogs (dog_name, isAdmin) VALUES (?, 1)" + ) + assert called_with_args[1] == "sqlite" + mock_run_vulnerability_scan.assert_called_once() + + database_conn.commit() + cursor.close() + database_conn.close() + mock_run_vulnerability_scan.assert_called_once() + + +def test_cursor_executemany(database_conn): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + cursor = database_conn.cursor() + data = [ + ("Doggy", 0), + ("Doggy 2", 1), + ("Dogski", 1), + ] + cursor.executemany("INSERT INTO dogs (dog_name, isAdmin) VALUES (?, ?)", data) + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert ( + called_with_args[0] == "INSERT INTO dogs (dog_name, isAdmin) VALUES (?, ?)" + ) + assert called_with_args[1] == "sqlite" + mock_run_vulnerability_scan.assert_called_once() + + database_conn.commit() + cursor.close() + database_conn.close() + mock_run_vulnerability_scan.assert_called_once() + + +def test_cursor_executescript(database_conn): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + cursor = database_conn.cursor() + script = """ + INSERT INTO dogs (dog_name, isAdmin) VALUES ('Fido', 0); + INSERT INTO dogs (dog_name, isAdmin) VALUES ('Rex', 1); + """ + cursor.executescript(script) + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert called_with_args[0] == script + assert called_with_args[1] == "sqlite" + mock_run_vulnerability_scan.assert_called_once() + + cursor.close() + database_conn.close() + mock_run_vulnerability_scan.assert_called_once() + + +def test_connection_execute(database_conn): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + query = "SELECT * FROM dogs" + database_conn.execute(query) + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert called_with_args[0] == query + assert called_with_args[1] == "sqlite" + mock_run_vulnerability_scan.assert_called_once() + + database_conn.close() + mock_run_vulnerability_scan.assert_called_once() + + +def test_connection_execute_parameterized(database_conn): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + query = "INSERT INTO dogs (dog_name, isAdmin) VALUES (?, ?)" + database_conn.execute(query, ("doggo", 0)) + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert called_with_args[0] == query + assert called_with_args[1] == "sqlite" + mock_run_vulnerability_scan.assert_called_once() + + database_conn.commit() + database_conn.close() + mock_run_vulnerability_scan.assert_called_once() + + +def test_connection_executemany(database_conn): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + data = [ + ("Doggy", 0), + ("Doggy 2", 1), + ("Dogski", 1), + ] + database_conn.executemany( + "INSERT INTO dogs (dog_name, isAdmin) VALUES (?, ?)", data + ) + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert ( + called_with_args[0] == "INSERT INTO dogs (dog_name, isAdmin) VALUES (?, ?)" + ) + assert called_with_args[1] == "sqlite" + mock_run_vulnerability_scan.assert_called_once() + + database_conn.commit() + database_conn.close() + mock_run_vulnerability_scan.assert_called_once() + + +def test_connection_executescript(database_conn): + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + script = """ + INSERT INTO dogs (dog_name, isAdmin) VALUES ('Fido', 0); + INSERT INTO dogs (dog_name, isAdmin) VALUES ('Rex', 1); + """ + database_conn.executescript(script) + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert called_with_args[0] == script + assert called_with_args[1] == "sqlite" + mock_run_vulnerability_scan.assert_called_once() + + database_conn.close() + mock_run_vulnerability_scan.assert_called_once() From 92fe154310136362fe9938326739bc3c0436733f Mon Sep 17 00:00:00 2001 From: bitterpanda Date: Wed, 11 Feb 2026 11:23:42 +0100 Subject: [PATCH 03/11] Update sqlite3.py sink: Add c-level wrapping, still needs some modifications --- aikido_zen/sinks/sqlite3.py | 119 ++++++++++++++++++++++++------------ 1 file changed, 81 insertions(+), 38 deletions(-) diff --git a/aikido_zen/sinks/sqlite3.py b/aikido_zen/sinks/sqlite3.py index e333b6103..7fb835073 100644 --- a/aikido_zen/sinks/sqlite3.py +++ b/aikido_zen/sinks/sqlite3.py @@ -1,11 +1,22 @@ """ Sink module for `sqlite3` + +sqlite3 uses C-level types for Connection and Cursor, so we cannot directly +patch their methods with wrapt. Instead we: +1. Intercept `sqlite3.connect` and inject a custom `factory` parameter. +2. The custom factory is a dynamic Python Connection subclass whose `cursor()` + returns a wrapped Cursor subclass. +3. All SQL interception happens at the Cursor level. Connection shortcut methods + (execute, executemany, executescript) internally call cursor methods, so + wrapping only the Cursor avoids double-counting. """ +import sqlite3 as _sqlite3 + from aikido_zen.helpers.get_argument import get_argument import aikido_zen.vulnerabilities as vulns from aikido_zen.helpers.register_call import register_call -from aikido_zen.sinks import patch_function, on_import, before +from aikido_zen.sinks import patch_function, on_import, before, before_modify_return @before @@ -24,7 +35,9 @@ def _cursor_executemany(func, instance, args, kwargs): register_call("sqlite3.Cursor.executemany", "sql_op") vulns.run_vulnerability_scan( - kind="sql_injection", op="sqlite3.Cursor.executemany", args=(query, "sqlite") + kind="sql_injection", + op="sqlite3.Cursor.executemany", + args=(query, "sqlite"), ) @@ -40,54 +53,84 @@ def _cursor_executescript(func, instance, args, kwargs): ) -@before -def _connection_execute(func, instance, args, kwargs): - query = get_argument(args, kwargs, 0, "sql") - - register_call("sqlite3.Connection.execute", "sql_op") - vulns.run_vulnerability_scan( - kind="sql_injection", op="sqlite3.Connection.execute", args=(query, "sqlite") +def _build_aikido_cursor(base_cursor_cls): + """ + Creates a Python-level subclass of the given Cursor class with mutable + method slots so that wrapt can patch them. + """ + cls = type( + "AikidoSQLite3Cursor", + (base_cursor_cls,), + { + "execute": base_cursor_cls.execute, + "executemany": base_cursor_cls.executemany, + "executescript": base_cursor_cls.executescript, + }, ) + patch_function(cls, "execute", _cursor_execute) + patch_function(cls, "executemany", _cursor_executemany) + patch_function(cls, "executescript", _cursor_executescript) + return cls -@before -def _connection_executemany(func, instance, args, kwargs): - query = get_argument(args, kwargs, 0, "sql") +_AikidoCursor = _build_aikido_cursor(_sqlite3.Cursor) - register_call("sqlite3.Connection.executemany", "sql_op") - vulns.run_vulnerability_scan( - kind="sql_injection", - op="sqlite3.Connection.executemany", - args=(query, "sqlite"), - ) +def _aikido_cursor(self, *args, **kwargs): + """Replacement cursor() that returns an AikidoSQLite3Cursor instance.""" + return _AikidoCursor(self) -@before -def _connection_executescript(func, instance, args, kwargs): - query = get_argument(args, kwargs, 0, "sql_script") - register_call("sqlite3.Connection.executescript", "sql_op") - vulns.run_vulnerability_scan( - kind="sql_injection", - op="sqlite3.Connection.executescript", - args=(query, "sqlite"), +def _build_aikido_connection(base_conn_cls): + """ + Creates a Python-level Connection subclass whose cursor() returns + wrapped cursors. + """ + return type( + "AikidoSQLite3Connection", + (base_conn_cls,), + { + "cursor": _aikido_cursor, + }, ) +_AikidoConnection = _build_aikido_connection(_sqlite3.Connection) + + +@before_modify_return +def _connect(func, instance, args, kwargs): + """ + Intercept sqlite3.connect to inject our Connection factory. + The factory parameter is the 6th positional arg (index 5) or a keyword arg. + """ + # Determine the user-specified factory, if any + factory = kwargs.get("factory") + if factory is None and len(args) > 5: + factory = args[5] + if factory is None: + factory = _sqlite3.Connection + + # If the user passed a custom factory, build a new wrapped subclass for it + if factory is _sqlite3.Connection: + aikido_factory = _AikidoConnection + else: + aikido_factory = _build_aikido_connection(factory) + + # Build new args with our factory injected as a keyword + new_args = args[:5] if len(args) > 5 else args + new_kwargs = dict(kwargs) + new_kwargs["factory"] = aikido_factory + + return func(*new_args, **new_kwargs) + + @on_import("sqlite3") def patch(m): """ patching sqlite3 - - patches Cursor.execute(sql, ...) - - patches Cursor.executemany(sql, ...) - - patches Cursor.executescript(sql_script) - - patches Connection.execute(sql, ...) - - patches Connection.executemany(sql, ...) - - patches Connection.executescript(sql_script) + - patches sqlite3.connect to inject a wrapped Connection factory + - wrapped connections produce wrapped cursors + - Cursor.execute, Cursor.executemany, Cursor.executescript are intercepted """ - patch_function(m, "Cursor.execute", _cursor_execute) - patch_function(m, "Cursor.executemany", _cursor_executemany) - patch_function(m, "Cursor.executescript", _cursor_executescript) - patch_function(m, "Connection.execute", _connection_execute) - patch_function(m, "Connection.executemany", _connection_executemany) - patch_function(m, "Connection.executescript", _connection_executescript) + patch_function(m, "connect", _connect) From 217612d4df73d2e023e90025da9785a5e80399d6 Mon Sep 17 00:00:00 2001 From: BitterPanda Date: Fri, 27 Feb 2026 16:00:51 +0100 Subject: [PATCH 04/11] sqlite3.py _connect shouldnt have a @before_modify_return and use get_arg --- aikido_zen/sinks/sqlite3.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/aikido_zen/sinks/sqlite3.py b/aikido_zen/sinks/sqlite3.py index 7fb835073..462cc672e 100644 --- a/aikido_zen/sinks/sqlite3.py +++ b/aikido_zen/sinks/sqlite3.py @@ -98,16 +98,13 @@ def _build_aikido_connection(base_conn_cls): _AikidoConnection = _build_aikido_connection(_sqlite3.Connection) -@before_modify_return def _connect(func, instance, args, kwargs): """ Intercept sqlite3.connect to inject our Connection factory. The factory parameter is the 6th positional arg (index 5) or a keyword arg. """ # Determine the user-specified factory, if any - factory = kwargs.get("factory") - if factory is None and len(args) > 5: - factory = args[5] + factory = get_argument(args, kwargs, 5, "factory") if factory is None: factory = _sqlite3.Connection From 867d95a9a84d7e010e681fbea35d8247d27ffa82 Mon Sep 17 00:00:00 2001 From: bitterpanda Date: Mon, 2 Mar 2026 14:20:18 +0100 Subject: [PATCH 05/11] Create a modify_arguments.py helper for patching --- aikido_zen/helpers/modify_arguments.py | 17 ++++++ aikido_zen/helpers/modify_arguments_test.py | 58 +++++++++++++++++++++ 2 files changed, 75 insertions(+) create mode 100644 aikido_zen/helpers/modify_arguments.py create mode 100644 aikido_zen/helpers/modify_arguments_test.py diff --git a/aikido_zen/helpers/modify_arguments.py b/aikido_zen/helpers/modify_arguments.py new file mode 100644 index 000000000..159b47c4f --- /dev/null +++ b/aikido_zen/helpers/modify_arguments.py @@ -0,0 +1,17 @@ +"""Exports modify_arguments""" + + +def modify_arguments(args, kwargs, pos, name, value): + """ + Returns (new_args, new_kwargs) with `value` injected as keyword argument + `name`. If a positional argument exists at index `pos` or beyond, it is + removed from args so the call is not duplicated. + """ + if len(args) > pos: + new_args = args[:pos] + (value,) + args[pos + 1 :] + new_kwargs = dict(kwargs) + else: + new_args = args + new_kwargs = dict(kwargs) + new_kwargs[name] = value + return new_args, new_kwargs diff --git a/aikido_zen/helpers/modify_arguments_test.py b/aikido_zen/helpers/modify_arguments_test.py new file mode 100644 index 000000000..ccea34830 --- /dev/null +++ b/aikido_zen/helpers/modify_arguments_test.py @@ -0,0 +1,58 @@ +import pytest +from .modify_arguments import modify_arguments + + +def test_injects_value_as_kwarg(): + args, kwargs = modify_arguments((), {}, 0, "key", "val") + assert kwargs["key"] == "val" + assert args == () + + +def test_overwrites_positional_arg_at_pos(): + args, kwargs = modify_arguments(("a", "b", "c"), {}, 2, "key", "new") + assert args == ("a", "b", "new") + assert "key" not in kwargs + + +def test_overwrites_positional_arg_keeps_trailing_args(): + args, kwargs = modify_arguments(("a", "b", "c", "d"), {}, 2, "key", "new") + assert args == ("a", "b", "new", "d") + assert "key" not in kwargs + + +def test_injects_as_kwarg_when_pos_not_in_args(): + args, kwargs = modify_arguments(("a", "b"), {}, 5, "key", "new") + assert args == ("a", "b") + assert kwargs["key"] == "new" + + +def test_overwrites_existing_kwarg(): + args, kwargs = modify_arguments((), {"key": "old"}, 0, "key", "new") + assert kwargs["key"] == "new" + + +def test_does_not_mutate_original_kwargs(): + original = {"other": 1} + _, kwargs = modify_arguments((), original, 0, "key", "val") + assert "key" not in original + assert kwargs["other"] == 1 + + +def test_does_not_mutate_original_args(): + original = ("a", "b", "c") + new_args, _ = modify_arguments(original, {}, 1, "key", "val") + assert original == ("a", "b", "c") + assert new_args == ("a", "val", "c") + + +def test_empty_args_and_kwargs(): + args, kwargs = modify_arguments((), {}, 3, "key", 42) + assert args == () + assert kwargs == {"key": 42} + + +def test_preserves_other_kwargs(): + args, kwargs = modify_arguments((), {"a": 1, "b": 2}, 5, "c", 3) + assert kwargs["a"] == 1 + assert kwargs["b"] == 2 + assert kwargs["c"] == 3 From b75b13f78229915cb7d03cac7fa17f8037d92296 Mon Sep 17 00:00:00 2001 From: bitterpanda Date: Mon, 2 Mar 2026 14:35:28 +0100 Subject: [PATCH 06/11] Create and use the new patch_immutable_class for sqlite3.py --- aikido_zen/sinks/__init__.py | 21 ++++++++ aikido_zen/sinks/sqlite3.py | 92 ++++++++---------------------------- 2 files changed, 40 insertions(+), 73 deletions(-) diff --git a/aikido_zen/sinks/__init__.py b/aikido_zen/sinks/__init__.py index e962d7814..131eea745 100644 --- a/aikido_zen/sinks/__init__.py +++ b/aikido_zen/sinks/__init__.py @@ -135,3 +135,24 @@ async def decorator(func, instance, args, kwargs): return return_value return decorator + + +def patch_immutable_class(base_cls, method_patches): + class_name = f"AikidoPatched{base_cls.__name__}" + + modifiable_attributes = {} + for name in method_patches: + modifiable_attributes[name] = getattr(base_cls, name) + + cls = type( + class_name, + (base_cls,), + # this modifiable_attributes object contains a python (not c) map of functions, so we can apply the + # patch_function to these attributes of our new class. + modifiable_attributes, + ) + + for name, wrapper in method_patches.items(): + patch_function(cls, name, wrapper) + + return cls diff --git a/aikido_zen/sinks/sqlite3.py b/aikido_zen/sinks/sqlite3.py index 462cc672e..77ea21155 100644 --- a/aikido_zen/sinks/sqlite3.py +++ b/aikido_zen/sinks/sqlite3.py @@ -1,22 +1,15 @@ -""" -Sink module for `sqlite3` - -sqlite3 uses C-level types for Connection and Cursor, so we cannot directly -patch their methods with wrapt. Instead we: -1. Intercept `sqlite3.connect` and inject a custom `factory` parameter. -2. The custom factory is a dynamic Python Connection subclass whose `cursor()` - returns a wrapped Cursor subclass. -3. All SQL interception happens at the Cursor level. Connection shortcut methods - (execute, executemany, executescript) internally call cursor methods, so - wrapping only the Cursor avoids double-counting. -""" - import sqlite3 as _sqlite3 from aikido_zen.helpers.get_argument import get_argument +from aikido_zen.helpers.modify_arguments import modify_arguments import aikido_zen.vulnerabilities as vulns from aikido_zen.helpers.register_call import register_call -from aikido_zen.sinks import patch_function, on_import, before, before_modify_return +from aikido_zen.sinks import ( + patch_function, + on_import, + before, + patch_immutable_class, +) @before @@ -53,81 +46,34 @@ def _cursor_executescript(func, instance, args, kwargs): ) -def _build_aikido_cursor(base_cursor_cls): - """ - Creates a Python-level subclass of the given Cursor class with mutable - method slots so that wrapt can patch them. - """ - cls = type( - "AikidoSQLite3Cursor", - (base_cursor_cls,), +def _cursor_patch(func, instance, args, kwargs): + patched_cursor_class = patch_immutable_class( + _sqlite3.Cursor, { - "execute": base_cursor_cls.execute, - "executemany": base_cursor_cls.executemany, - "executescript": base_cursor_cls.executescript, + "execute": _cursor_execute, + "executemany": _cursor_executemany, + "executescript": _cursor_executescript, }, ) - patch_function(cls, "execute", _cursor_execute) - patch_function(cls, "executemany", _cursor_executemany) - patch_function(cls, "executescript", _cursor_executescript) - return cls - - -_AikidoCursor = _build_aikido_cursor(_sqlite3.Cursor) - - -def _aikido_cursor(self, *args, **kwargs): - """Replacement cursor() that returns an AikidoSQLite3Cursor instance.""" - return _AikidoCursor(self) - - -def _build_aikido_connection(base_conn_cls): - """ - Creates a Python-level Connection subclass whose cursor() returns - wrapped cursors. - """ - return type( - "AikidoSQLite3Connection", - (base_conn_cls,), - { - "cursor": _aikido_cursor, - }, - ) - - -_AikidoConnection = _build_aikido_connection(_sqlite3.Connection) + return patched_cursor_class(instance) def _connect(func, instance, args, kwargs): - """ - Intercept sqlite3.connect to inject our Connection factory. - The factory parameter is the 6th positional arg (index 5) or a keyword arg. - """ - # Determine the user-specified factory, if any factory = get_argument(args, kwargs, 5, "factory") if factory is None: + # Use a default factory if the user does not provide one for us factory = _sqlite3.Connection - # If the user passed a custom factory, build a new wrapped subclass for it - if factory is _sqlite3.Connection: - aikido_factory = _AikidoConnection - else: - aikido_factory = _build_aikido_connection(factory) - - # Build new args with our factory injected as a keyword - new_args = args[:5] if len(args) > 5 else args - new_kwargs = dict(kwargs) - new_kwargs["factory"] = aikido_factory + patched_factory = patch_immutable_class(factory, {"cursor": _cursor_patch}) + new_args, new_kwargs = modify_arguments(args, kwargs, 5, "factory", patched_factory) return func(*new_args, **new_kwargs) @on_import("sqlite3") def patch(m): """ - patching sqlite3 - - patches sqlite3.connect to inject a wrapped Connection factory - - wrapped connections produce wrapped cursors - - Cursor.execute, Cursor.executemany, Cursor.executescript are intercepted + patches sqlite3, a c library; the "connect" function is not c, after that we use patch_immutable_class to + patch the factory parameter of the connect function. In this factory we patch the cursor function. """ patch_function(m, "connect", _connect) From cd4ae75ed5b23e82e97289f6b663d44c4b610d98 Mon Sep 17 00:00:00 2001 From: BitterPanda Date: Mon, 2 Mar 2026 18:51:20 +0100 Subject: [PATCH 07/11] pass factory so as to fix an issue with the _cursor_patch also includes a bunch of additional test cases --- aikido_zen/sinks/__init__.py | 2 +- aikido_zen/sinks/sqlite3.py | 15 +-- aikido_zen/sinks/tests/sqlite3_test.py | 177 +++++++++++++++++++++++++ 3 files changed, 185 insertions(+), 9 deletions(-) diff --git a/aikido_zen/sinks/__init__.py b/aikido_zen/sinks/__init__.py index 131eea745..bc18f44ca 100644 --- a/aikido_zen/sinks/__init__.py +++ b/aikido_zen/sinks/__init__.py @@ -138,7 +138,7 @@ async def decorator(func, instance, args, kwargs): def patch_immutable_class(base_cls, method_patches): - class_name = f"AikidoPatched{base_cls.__name__}" + class_name = f"{base_cls.__name__}" modifiable_attributes = {} for name in method_patches: diff --git a/aikido_zen/sinks/sqlite3.py b/aikido_zen/sinks/sqlite3.py index 77ea21155..526c71584 100644 --- a/aikido_zen/sinks/sqlite3.py +++ b/aikido_zen/sinks/sqlite3.py @@ -47,23 +47,22 @@ def _cursor_executescript(func, instance, args, kwargs): def _cursor_patch(func, instance, args, kwargs): - patched_cursor_class = patch_immutable_class( - _sqlite3.Cursor, + factory = get_argument(args, kwargs, 0, "factory") or _sqlite3.Cursor + patched_factory = patch_immutable_class( + factory, { "execute": _cursor_execute, "executemany": _cursor_executemany, "executescript": _cursor_executescript, }, ) - return patched_cursor_class(instance) + new_args, new_kwargs = modify_arguments(args, kwargs, 0, "factory", patched_factory) + return func(*new_args, **new_kwargs) -def _connect(func, instance, args, kwargs): - factory = get_argument(args, kwargs, 5, "factory") - if factory is None: - # Use a default factory if the user does not provide one for us - factory = _sqlite3.Connection +def _connect(func, instance, args, kwargs): + factory = get_argument(args, kwargs, 5, "factory") or _sqlite3.Connection patched_factory = patch_immutable_class(factory, {"cursor": _cursor_patch}) new_args, new_kwargs = modify_arguments(args, kwargs, 5, "factory", patched_factory) diff --git a/aikido_zen/sinks/tests/sqlite3_test.py b/aikido_zen/sinks/tests/sqlite3_test.py index 50fbc77f9..9ce689efa 100644 --- a/aikido_zen/sinks/tests/sqlite3_test.py +++ b/aikido_zen/sinks/tests/sqlite3_test.py @@ -237,3 +237,180 @@ def test_connection_executescript(database_conn): database_conn.close() mock_run_vulnerability_scan.assert_called_once() + + +# Functional tests — verify sqlite3 behavior is not broken by patching + + +def test_cursor_execute_returns_results(database_conn): + cursor = database_conn.cursor() + cursor.execute("INSERT INTO dogs (dog_name, isAdmin) VALUES (?, ?)", ("Fido", 1)) + database_conn.commit() + + cursor.execute("SELECT * FROM dogs WHERE dog_name = ?", ("Fido",)) + rows = cursor.fetchall() + assert len(rows) == 1 + assert rows[0][1] == "Fido" + assert rows[0][2] == 1 + cursor.close() + + +def test_cursor_fetchone(database_conn): + cursor = database_conn.cursor() + cursor.execute("INSERT INTO dogs (dog_name, isAdmin) VALUES (?, ?)", ("Rex", 0)) + database_conn.commit() + + cursor.execute("SELECT * FROM dogs") + row = cursor.fetchone() + assert row is not None + assert row[1] == "Rex" + cursor.close() + + +def test_cursor_fetchmany(database_conn): + cursor = database_conn.cursor() + data = [("Dog1", 0), ("Dog2", 1), ("Dog3", 0)] + cursor.executemany("INSERT INTO dogs (dog_name, isAdmin) VALUES (?, ?)", data) + database_conn.commit() + + cursor.execute("SELECT * FROM dogs") + rows = cursor.fetchmany(2) + assert len(rows) == 2 + cursor.close() + + +def test_cursor_rowcount(database_conn): + cursor = database_conn.cursor() + cursor.execute("INSERT INTO dogs (dog_name, isAdmin) VALUES (?, ?)", ("Buddy", 0)) + assert cursor.rowcount == 1 + cursor.close() + + +def test_cursor_lastrowid(database_conn): + cursor = database_conn.cursor() + cursor.execute("INSERT INTO dogs (dog_name, isAdmin) VALUES (?, ?)", ("Max", 0)) + assert cursor.lastrowid is not None + assert cursor.lastrowid > 0 + cursor.close() + + +def test_cursor_description(database_conn): + cursor = database_conn.cursor() + cursor.execute("SELECT * FROM dogs") + assert cursor.description is not None + col_names = [col[0] for col in cursor.description] + assert col_names == ["id", "dog_name", "isAdmin"] + cursor.close() + + +def test_executemany_inserts_all_rows(database_conn): + cursor = database_conn.cursor() + data = [("Dog1", 0), ("Dog2", 1), ("Dog3", 0)] + cursor.executemany("INSERT INTO dogs (dog_name, isAdmin) VALUES (?, ?)", data) + database_conn.commit() + + cursor.execute("SELECT COUNT(*) FROM dogs") + count = cursor.fetchone()[0] + assert count == 3 + cursor.close() + + +def test_executescript_runs_all_statements(database_conn): + cursor = database_conn.cursor() + script = """ + INSERT INTO dogs (dog_name, isAdmin) VALUES ('Script1', 0); + INSERT INTO dogs (dog_name, isAdmin) VALUES ('Script2', 1); + """ + cursor.executescript(script) + + cursor.execute("SELECT COUNT(*) FROM dogs") + count = cursor.fetchone()[0] + assert count == 2 + cursor.close() + + +def test_connection_as_context_manager(): + import sqlite3 + + with sqlite3.connect(":memory:") as conn: + conn.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, val TEXT)") + conn.execute("INSERT INTO test (val) VALUES (?)", ("hello",)) + row = conn.execute("SELECT val FROM test").fetchone() + assert row[0] == "hello" + + +def test_connect_with_custom_factory(): + import sqlite3 + + class CustomConnection(sqlite3.Connection): + def custom_method(self): + return "custom" + + conn = sqlite3.connect(":memory:", factory=CustomConnection) + assert isinstance(conn, CustomConnection) + assert conn.custom_method() == "custom" + + # Verify cursor operations still work through a custom factory + conn.execute("CREATE TABLE t (v TEXT)") + conn.execute("INSERT INTO t VALUES (?)", ("x",)) + row = conn.execute("SELECT v FROM t").fetchone() + assert row[0] == "x" + conn.close() + + +def test_connect_with_keyword_database(): + import sqlite3 + + conn = sqlite3.connect(database=":memory:") + conn.execute("CREATE TABLE kw_test (id INTEGER PRIMARY KEY, val TEXT)") + conn.execute("INSERT INTO kw_test (val) VALUES (?)", ("kw",)) + row = conn.execute("SELECT val FROM kw_test").fetchone() + assert row[0] == "kw" + conn.close() + + +def test_row_factory(database_conn): + import sqlite3 + + database_conn.row_factory = sqlite3.Row + cursor = database_conn.cursor() + cursor.execute("INSERT INTO dogs (dog_name, isAdmin) VALUES (?, ?)", ("RowFido", 1)) + database_conn.commit() + + cursor.execute("SELECT * FROM dogs WHERE dog_name = ?", ("RowFido",)) + row = cursor.fetchone() + assert row["dog_name"] == "RowFido" + assert row["isAdmin"] == 1 + cursor.close() + + +def test_cursor_with_custom_factory(database_conn): + import sqlite3 + + class CustomCursor(sqlite3.Cursor): + pass + + reset_comms() + with patch( + "aikido_zen.vulnerabilities.run_vulnerability_scan" + ) as mock_run_vulnerability_scan: + cursor = database_conn.cursor(factory=CustomCursor) + assert isinstance(cursor, CustomCursor) + + cursor.execute( + "INSERT INTO dogs (dog_name, isAdmin) VALUES (?, ?)", ("FactoryFido", 1) + ) + database_conn.commit() + + called_with_args = mock_run_vulnerability_scan.call_args[1]["args"] + assert ( + called_with_args[0] == "INSERT INTO dogs (dog_name, isAdmin) VALUES (?, ?)" + ) + assert called_with_args[1] == "sqlite" + mock_run_vulnerability_scan.assert_called_once() + + cursor.execute("SELECT * FROM dogs WHERE dog_name = ?", ("FactoryFido",)) + rows = cursor.fetchall() + assert len(rows) == 1 + assert rows[0][1] == "FactoryFido" + cursor.close() From 103b1f85da5e1ef60627887287b135d98c01bd57 Mon Sep 17 00:00:00 2001 From: bitterpanda Date: Mon, 2 Mar 2026 18:54:48 +0100 Subject: [PATCH 08/11] Add sqlite3 as supported in README.md --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 1b65820a6..cb2802cc8 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,7 @@ Zen for Python 3 is compatible with: * ✅ [`asyncpg`](https://pypi.org/project/asyncpg) ^0.27 * ✅ [`motor`](https://pypi.org/project/motor/) (See `pymongo` version) * ✅ [`clickhouse-driver`](https://pypi.org/project/clickhouse-driver) +* ✅ [`sqlite3`](https://docs.python.org/3/library/sqlite3.html) ### AI SDKs Zen instruments the following AI SDKs to track which models are used and how many tokens are consumed, allowing you to monitor your AI usage and costs: From 12d6dd014dfc17a1507eeae437b95c101134e3d7 Mon Sep 17 00:00:00 2001 From: BitterPanda Date: Mon, 2 Mar 2026 19:42:27 +0100 Subject: [PATCH 09/11] Update to patch _execute and _executescript & to make them more dynamic --- aikido_zen/sinks/__init__.py | 4 +-- aikido_zen/sinks/sqlite3.py | 47 ++++++++++++++++++++---------------- 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/aikido_zen/sinks/__init__.py b/aikido_zen/sinks/__init__.py index bc18f44ca..0ad392e56 100644 --- a/aikido_zen/sinks/__init__.py +++ b/aikido_zen/sinks/__init__.py @@ -138,14 +138,12 @@ async def decorator(func, instance, args, kwargs): def patch_immutable_class(base_cls, method_patches): - class_name = f"{base_cls.__name__}" - modifiable_attributes = {} for name in method_patches: modifiable_attributes[name] = getattr(base_cls, name) cls = type( - class_name, + base_cls.__name__, (base_cls,), # this modifiable_attributes object contains a python (not c) map of functions, so we can apply the # patch_function to these attributes of our new class. diff --git a/aikido_zen/sinks/sqlite3.py b/aikido_zen/sinks/sqlite3.py index 526c71584..c7390ff6f 100644 --- a/aikido_zen/sinks/sqlite3.py +++ b/aikido_zen/sinks/sqlite3.py @@ -23,27 +23,19 @@ def _cursor_execute(func, instance, args, kwargs): @before -def _cursor_executemany(func, instance, args, kwargs): +def _execute(func, instance, args, kwargs): + op = f"sqlite3.{type(instance).__name__}.{func.__name__}" query = get_argument(args, kwargs, 0, "sql") - - register_call("sqlite3.Cursor.executemany", "sql_op") - vulns.run_vulnerability_scan( - kind="sql_injection", - op="sqlite3.Cursor.executemany", - args=(query, "sqlite"), - ) + register_call(op, "sql_op") + vulns.run_vulnerability_scan(kind="sql_injection", op=op, args=(query, "sqlite")) @before -def _cursor_executescript(func, instance, args, kwargs): +def _executescript(func, instance, args, kwargs): + op = f"sqlite3.{type(instance).__name__}.{func.__name__}" query = get_argument(args, kwargs, 0, "sql_script") - - register_call("sqlite3.Cursor.executescript", "sql_op") - vulns.run_vulnerability_scan( - kind="sql_injection", - op="sqlite3.Cursor.executescript", - args=(query, "sqlite"), - ) + register_call(op, "sql_op") + vulns.run_vulnerability_scan(kind="sql_injection", op=op, args=(query, "sqlite")) def _cursor_patch(func, instance, args, kwargs): @@ -51,9 +43,9 @@ def _cursor_patch(func, instance, args, kwargs): patched_factory = patch_immutable_class( factory, { - "execute": _cursor_execute, - "executemany": _cursor_executemany, - "executescript": _cursor_executescript, + "execute": _execute, + "executemany": _execute, + "executescript": _executescript, }, ) @@ -63,8 +55,21 @@ def _cursor_patch(func, instance, args, kwargs): def _connect(func, instance, args, kwargs): factory = get_argument(args, kwargs, 5, "factory") or _sqlite3.Connection - patched_factory = patch_immutable_class(factory, {"cursor": _cursor_patch}) - + connection_patches = { + "cursor": _cursor_patch + } + + if _PATCH_CONNECTION_EXECUTE: + # Since py 3.11 there are more ways than using the cursor to execute (e.g. using the connection) + connection_patches.update( + { + "execute": _execute, + "executemany": _execute, + "executescript": _executescript, + } + ) + + patched_factory = patch_immutable_class(factory, connection_patches) new_args, new_kwargs = modify_arguments(args, kwargs, 5, "factory", patched_factory) return func(*new_args, **new_kwargs) From 5a65a9c7b3d9d1df516c3c6c14d494caac18a9d3 Mon Sep 17 00:00:00 2001 From: BitterPanda Date: Mon, 2 Mar 2026 19:48:45 +0100 Subject: [PATCH 10/11] cleanup & check if py is 3.11 or higher --- aikido_zen/sinks/sqlite3.py | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/aikido_zen/sinks/sqlite3.py b/aikido_zen/sinks/sqlite3.py index c7390ff6f..ce53ffacd 100644 --- a/aikido_zen/sinks/sqlite3.py +++ b/aikido_zen/sinks/sqlite3.py @@ -1,5 +1,5 @@ import sqlite3 as _sqlite3 - +import sys from aikido_zen.helpers.get_argument import get_argument from aikido_zen.helpers.modify_arguments import modify_arguments import aikido_zen.vulnerabilities as vulns @@ -12,15 +12,6 @@ ) -@before -def _cursor_execute(func, instance, args, kwargs): - query = get_argument(args, kwargs, 0, "sql") - - register_call("sqlite3.Cursor.execute", "sql_op") - vulns.run_vulnerability_scan( - kind="sql_injection", op="sqlite3.Cursor.execute", args=(query, "sqlite") - ) - @before def _execute(func, instance, args, kwargs): @@ -59,8 +50,8 @@ def _connect(func, instance, args, kwargs): "cursor": _cursor_patch } - if _PATCH_CONNECTION_EXECUTE: - # Since py 3.11 there are more ways than using the cursor to execute (e.g. using the connection) + # In Python 3.11, the sqlite3 module was fully moved to C. Hence the extra patches + if sys.version_info >= (3, 11): connection_patches.update( { "execute": _execute, From bcf93130d45cb3b3d731595046e727d2ebb31b07 Mon Sep 17 00:00:00 2001 From: BitterPanda Date: Mon, 2 Mar 2026 20:17:43 +0100 Subject: [PATCH 11/11] lint changes --- aikido_zen/sinks/sqlite3.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/aikido_zen/sinks/sqlite3.py b/aikido_zen/sinks/sqlite3.py index ce53ffacd..830caf4b1 100644 --- a/aikido_zen/sinks/sqlite3.py +++ b/aikido_zen/sinks/sqlite3.py @@ -12,7 +12,6 @@ ) - @before def _execute(func, instance, args, kwargs): op = f"sqlite3.{type(instance).__name__}.{func.__name__}" @@ -46,9 +45,7 @@ def _cursor_patch(func, instance, args, kwargs): def _connect(func, instance, args, kwargs): factory = get_argument(args, kwargs, 5, "factory") or _sqlite3.Connection - connection_patches = { - "cursor": _cursor_patch - } + connection_patches = {"cursor": _cursor_patch} # In Python 3.11, the sqlite3 module was fully moved to C. Hence the extra patches if sys.version_info >= (3, 11):