-
Notifications
You must be signed in to change notification settings - Fork 14
Add sqlite 3 missing sink #591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
aecc31c
Add first rendition of sqlite3.py sink
bitterpanda63 3d54bf0
Add sqlite3 sink tests
bitterpanda63 92fe154
Update sqlite3.py sink: Add c-level wrapping, still needs some modifiβ¦
bitterpanda63 217612d
sqlite3.py _connect shouldnt have a @before_modify_return and use getβ¦
bitterpanda63 867d95a
Create a modify_arguments.py helper for patching
bitterpanda63 b75b13f
Create and use the new patch_immutable_class for sqlite3.py
bitterpanda63 cd4ae75
pass factory so as to fix an issue with the _cursor_patch
bitterpanda63 103b1f8
Add sqlite3 as supported in README.md
bitterpanda63 12d6dd0
Update to patch _execute and _executescript & to make them more dynamic
bitterpanda63 5a65a9c
cleanup & check if py is 3.11 or higher
bitterpanda63 bcf9313
lint changes
bitterpanda63 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| """Exports modify_arguments""" | ||
|
|
||
|
|
||
| def modify_arguments(args, kwargs, pos, name, value): | ||
| """ | ||
| Returns (new_args, new_kwargs) with `value` injected as keyword argument | ||
| `name`. If a positional argument exists at index `pos` or beyond, it is | ||
| removed from args so the call is not duplicated. | ||
| """ | ||
| if len(args) > pos: | ||
| new_args = args[:pos] + (value,) + args[pos + 1 :] | ||
| new_kwargs = dict(kwargs) | ||
| else: | ||
| new_args = args | ||
| new_kwargs = dict(kwargs) | ||
| new_kwargs[name] = value | ||
| return new_args, new_kwargs |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| import pytest | ||
| from .modify_arguments import modify_arguments | ||
|
|
||
|
|
||
| def test_injects_value_as_kwarg(): | ||
| args, kwargs = modify_arguments((), {}, 0, "key", "val") | ||
| assert kwargs["key"] == "val" | ||
| assert args == () | ||
|
|
||
|
|
||
| def test_overwrites_positional_arg_at_pos(): | ||
| args, kwargs = modify_arguments(("a", "b", "c"), {}, 2, "key", "new") | ||
| assert args == ("a", "b", "new") | ||
| assert "key" not in kwargs | ||
|
|
||
|
|
||
| def test_overwrites_positional_arg_keeps_trailing_args(): | ||
| args, kwargs = modify_arguments(("a", "b", "c", "d"), {}, 2, "key", "new") | ||
| assert args == ("a", "b", "new", "d") | ||
| assert "key" not in kwargs | ||
|
|
||
|
|
||
| def test_injects_as_kwarg_when_pos_not_in_args(): | ||
| args, kwargs = modify_arguments(("a", "b"), {}, 5, "key", "new") | ||
| assert args == ("a", "b") | ||
| assert kwargs["key"] == "new" | ||
|
|
||
|
|
||
| def test_overwrites_existing_kwarg(): | ||
| args, kwargs = modify_arguments((), {"key": "old"}, 0, "key", "new") | ||
| assert kwargs["key"] == "new" | ||
|
|
||
|
|
||
| def test_does_not_mutate_original_kwargs(): | ||
| original = {"other": 1} | ||
| _, kwargs = modify_arguments((), original, 0, "key", "val") | ||
| assert "key" not in original | ||
| assert kwargs["other"] == 1 | ||
|
|
||
|
|
||
| def test_does_not_mutate_original_args(): | ||
| original = ("a", "b", "c") | ||
| new_args, _ = modify_arguments(original, {}, 1, "key", "val") | ||
| assert original == ("a", "b", "c") | ||
| assert new_args == ("a", "val", "c") | ||
|
|
||
|
|
||
| def test_empty_args_and_kwargs(): | ||
| args, kwargs = modify_arguments((), {}, 3, "key", 42) | ||
| assert args == () | ||
| assert kwargs == {"key": 42} | ||
|
|
||
|
|
||
| def test_preserves_other_kwargs(): | ||
| args, kwargs = modify_arguments((), {"a": 1, "b": 2}, 5, "c", 3) | ||
| assert kwargs["a"] == 1 | ||
| assert kwargs["b"] == 2 | ||
| assert kwargs["c"] == 3 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,71 @@ | ||
| import sqlite3 as _sqlite3 | ||
| import sys | ||
| from aikido_zen.helpers.get_argument import get_argument | ||
| from aikido_zen.helpers.modify_arguments import modify_arguments | ||
| import aikido_zen.vulnerabilities as vulns | ||
| from aikido_zen.helpers.register_call import register_call | ||
| from aikido_zen.sinks import ( | ||
| patch_function, | ||
| on_import, | ||
| before, | ||
| patch_immutable_class, | ||
| ) | ||
|
|
||
|
|
||
| @before | ||
| def _execute(func, instance, args, kwargs): | ||
| op = f"sqlite3.{type(instance).__name__}.{func.__name__}" | ||
| query = get_argument(args, kwargs, 0, "sql") | ||
| register_call(op, "sql_op") | ||
| vulns.run_vulnerability_scan(kind="sql_injection", op=op, args=(query, "sqlite")) | ||
|
|
||
|
|
||
| @before | ||
| def _executescript(func, instance, args, kwargs): | ||
| op = f"sqlite3.{type(instance).__name__}.{func.__name__}" | ||
| query = get_argument(args, kwargs, 0, "sql_script") | ||
| register_call(op, "sql_op") | ||
| vulns.run_vulnerability_scan(kind="sql_injection", op=op, args=(query, "sqlite")) | ||
|
|
||
|
|
||
| def _cursor_patch(func, instance, args, kwargs): | ||
| factory = get_argument(args, kwargs, 0, "factory") or _sqlite3.Cursor | ||
| patched_factory = patch_immutable_class( | ||
| factory, | ||
| { | ||
| "execute": _execute, | ||
| "executemany": _execute, | ||
| "executescript": _executescript, | ||
| }, | ||
| ) | ||
|
|
||
| new_args, new_kwargs = modify_arguments(args, kwargs, 0, "factory", patched_factory) | ||
| return func(*new_args, **new_kwargs) | ||
|
|
||
|
|
||
| def _connect(func, instance, args, kwargs): | ||
| factory = get_argument(args, kwargs, 5, "factory") or _sqlite3.Connection | ||
| connection_patches = {"cursor": _cursor_patch} | ||
|
|
||
| # In Python 3.11, the sqlite3 module was fully moved to C. Hence the extra patches | ||
| if sys.version_info >= (3, 11): | ||
| connection_patches.update( | ||
| { | ||
| "execute": _execute, | ||
| "executemany": _execute, | ||
| "executescript": _executescript, | ||
| } | ||
| ) | ||
|
|
||
| patched_factory = patch_immutable_class(factory, connection_patches) | ||
| new_args, new_kwargs = modify_arguments(args, kwargs, 5, "factory", patched_factory) | ||
| return func(*new_args, **new_kwargs) | ||
|
|
||
|
|
||
| @on_import("sqlite3") | ||
| def patch(m): | ||
| """ | ||
| patches sqlite3, a c library; the "connect" function is not c, after that we use patch_immutable_class to | ||
| patch the factory parameter of the connect function. In this factory we patch the cursor function. | ||
| """ | ||
| patch_function(m, "connect", _connect) | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.