diff --git a/README.md b/README.md index 0947cd0..2188204 100644 --- a/README.md +++ b/README.md @@ -156,6 +156,37 @@ for line in data: ``` + +### As a python logging.Filter + +```python +import logging + +from anonip import AnonipFilterMsg + +if __name__ == '__main__': + handler = logging.StreamHandler() + handler.addFilter(AnonipFilterMsg()) + logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[handler] + ) + + logging.debug('192.0.2.123 - call from root logger') + + logger = logging.getLogger('child') + logger.info('2001:db8:abcd:ef01:2345:6789:abcd:ef01 - call from child logger') +``` + +The following implementations are available, each targeting different parts of +logging.LogRecord: + +* `AnonipFilterMsg`: Operates on the log format string. +* `AnonipFilterArg`: Operates on a positional argument. +* `AnonipFilterExtraField`: Operates on a specific extra field. + + ### Python 2 or 3? For compatibility reasons, anonip uses the shebang `#! /usr/bin/env python`. This will default to python2 on all Linux distributions except for Arch Linux. diff --git a/anonip.py b/anonip.py index 2850784..fcbb7ad 100755 --- a/anonip.py +++ b/anonip.py @@ -60,6 +60,7 @@ if sys.version_info[0] >= 3: # pragma: no cover # compatibility for python < 3 unicode = str + basestring = str __title__ = "anonip" __description__ = "Anonip is a tool to anonymize IP-addresses in log files." @@ -303,6 +304,95 @@ def truncate_address(self, ip): return ip.supernet(new_prefix=self._prefixes[ip.version])[0] +class AnonipFilterMsg(object): + def __init__(self, anonip=None): + """ + An implementation of Python logging.Filter using anonip. + + Operates on the `msg` field of a log record. + + :param anonip: dict of parameters for Anonip instance + """ + self.anonip = Anonip(**(anonip or {})) + + def filter(self, record): + """ + Apply anonip IP masking to `record.msg`. + + :param record: logging.LogRecord + :return: bool + """ + + record.msg = self.anonip.process_line(record.msg) + + return True + + +class AnonipFilterExtraField(object): + def __init__(self, key, anonip=None): + """ + An implementation of Python logging.Filter using anonip. + + Operates on one extra field of a log record. + + :param key: str name of extra field to operate on. + :param anonip: dict of parameters for Anonip instance + """ + self.key = key + self.anonip = Anonip(**(anonip or {})) + + def filter(self, record): + """ + Apply anonip IP masking to `record[self.key]`. + + :param record: logging.LogRecord + :return: bool + """ + + if hasattr(record, self.key): + value = getattr(record, self.key) + if isinstance(value, basestring): + ip = self.anonip.extract_ip(value)[1] + if ip: + setattr(record, self.key, str(self.anonip.process_ip(ip))) + + return True + + +class AnonipFilterArg(object): + def __init__(self, index, anonip=None): + """ + An implementation of Python logging.Filter using anonip. + + Operates on one positional argument of a log record. + + :param index: int index of argument to operate on. + :param anonip: dict of parameters for Anonip instance + """ + self.index = index + self.anonip = Anonip(**(anonip or {})) + + def filter(self, record): + """ + Apply anonip IP masking to `record.args[self.index]`. + + :param record: logging.LogRecord + :return: bool + """ + + if self.index < len(record.args): + value = record.args[self.index] + if isinstance(value, basestring): + ip = self.anonip.extract_ip(value)[1] + if ip: + head = record.args[: self.index] + result = (str(self.anonip.process_ip(ip)),) + tail = record.args[self.index + 1 :] + record.args = head + result + tail + + return True + + def _validate_ipmask(mask, bits=32): """ Verify if the supplied ip mask is valid. diff --git a/tests.py b/tests.py index cb07d6a..7e57b1b 100755 --- a/tests.py +++ b/tests.py @@ -407,3 +407,103 @@ def test_properties_columns(): assert a.columns == [0] a.columns = [5, 6] assert a.columns == [4, 5] + + +def test_logging_filter_msg(caplog): + logger = logging.getLogger("filter_msg") + logger.addFilter(anonip.AnonipFilterMsg()) + logger.setLevel(logging.INFO) + + logger.info("192.168.100.200 string") + logger.info("1.2.3.4 string") + logger.info("2001:0db8:85a3:0000:0000:8a2e:0370:7334 string") + logger.info("2a00:1450:400a:803::200e string") + + assert caplog.record_tuples == [ + ("filter_msg", logging.INFO, "192.168.96.0 string"), + ("filter_msg", logging.INFO, "1.2.0.0 string"), + ("filter_msg", logging.INFO, "2001:db8:85a0:: string"), + ("filter_msg", logging.INFO, "2a00:1450:4000:: string"), + ] + + +def test_logging_filter_args(caplog): + logger = logging.getLogger("filter_args") + logger.addFilter(anonip.AnonipFilterArg(0)) + logger.setLevel(logging.INFO) + + logger.info("string %s", "192.168.100.200") + logger.info("%s string", "1.2.3.4") + logger.info("string %s", "2001:0db8:85a3:0000:0000:8a2e:0370:7334") + logger.info("some %s string", "2a00:1450:400a:803::200e") + logger.info("invalid %s string", "not-an-ip") + logger.info("skip complex %s arg", ("not", "a", "string")) + + expected = [ + "192.168.96.0", + "1.2.0.0", + "2001:db8:85a0::", + "2a00:1450:4000::", + "not-an-ip", + ("not", "a", "string"), + ] + + actual = [ + record.args[0] for record in caplog.records if record.name == "filter_args" + ] + + assert actual == expected + + +def test_logging_filter_args_missing(caplog): + logger = logging.getLogger("filter_extra") + logger.addFilter( + anonip.AnonipFilterArg(42, anonip={"ipv4mask": 16, "ipv6mask": 64}) + ) + logger.setLevel(logging.INFO) + + logger.info("string") + + assert caplog.record_tuples == [ + ("filter_extra", logging.INFO, "string"), + ] + + +def test_logging_filter_extra(caplog): + logger = logging.getLogger("filter_extra") + logger.addFilter( + anonip.AnonipFilterExtraField(key="ip", anonip={"ipv4mask": 16, "ipv6mask": 64}) + ) + logger.setLevel(logging.INFO) + + logger.info("string", extra={"ip": "192.168.100.200"}) + logger.info("string", extra={"ip": "1.2.3.4"}) + logger.info("string", extra={"ip": "2001:0db8:85a3:0000:0000:8a2e:0370:7334"}) + logger.info("string", extra={"ip": "2a00:1450:400a:803::200e"}) + logger.info("string", extra={"ip": "not-an-ip"}) + logger.info("string", extra={"ip": ("not", "a", "string")}) + + expected = [ + "192.168.0.0", + "1.2.0.0", + "2001:db8:85a3::", + "2a00:1450:400a:803::", + "not-an-ip", + ("not", "a", "string"), + ] + + actual = [record.ip for record in caplog.records if record.name == "filter_extra"] + + assert actual == expected + + +def test_logging_filter_extra_non_existing(caplog): + logger = logging.getLogger("filter_extra") + logger.addFilter(anonip.AnonipFilterExtraField("non-existing-field")) + logger.setLevel(logging.INFO) + + logger.info("string") + + assert caplog.record_tuples == [ + ("filter_extra", logging.INFO, "string"), + ]