class BaseIngestion(ABC):
    """Abstract base class shared by every BuffaLogs ingestion source.

    Concrete subclasses (Elasticsearch, Splunk, Opensearch, CloudTrail)
    implement the extraction of users and of their logins for a time range.
    """

    class SupportedIngestionSources(Enum):
        """Enumeration of the supported sources from which login data is extracted.

        * ELASTICSEARCH: The login data is extracted from Elasticsearch
        * SPLUNK: The login data is extracted from Splunk
        * OPENSEARCH: The login data is extracted from Opensearch
        * CLOUDTRAIL: The login data is extracted from AWS CloudTrail S3 logs
        """

        ELASTICSEARCH = "elasticsearch"
        SPLUNK = "splunk"
        OPENSEARCH = "opensearch"
        CLOUDTRAIL = "cloudtrail"

    def __init__(self, ingestion_config, mapping):
        """Store the source configuration and the field mapping.

        :param ingestion_config: the source-specific section of ingestion.json
        :type ingestion_config: dict
        :param mapping: mapping from BuffaLogs field names to source field names
        :type mapping: dict
        """
        # NOTE(review): only the first statement of __init__ is visible in
        # this chunk; the attribute assignments below are inferred from the
        # subclasses' use of self.ingestion_config / self.mapping — confirm.
        super().__init__()
        self.ingestion_config = ingestion_config
        self.mapping = mapping

    @abstractmethod
    def process_users(self, start_date: datetime, end_date: datetime) -> list:
        """Extract the users that logged in between start_date and end_date.

        This method is implemented differently by each ingestion source.

        :param start_date: the initial datetime from which the users are considered
        :type start_date: datetime (with tzinfo=datetime.timezone.utc)
        :param end_date: the final datetime within which the users are considered
        :type end_date: datetime (with tzinfo=datetime.timezone.utc)
        :return: list of users strings that logged in the system
        :rtype: list
        """
        raise NotImplementedError

    @abstractmethod
    def process_user_logins(self, start_date: datetime, end_date: datetime, username: str) -> list:
        """Extract the logins of the given user between start_date and end_date.

        This method is implemented differently by each ingestion source.

        :param username: username of the user that logged in
        :type username: str
        :param start_date: the initial datetime from which the logins of the user are considered
        :type start_date: datetime (with tzinfo=datetime.timezone.utc)
        :param end_date: the final datetime within which the logins of the user are considered
        :type end_date: datetime (with tzinfo=datetime.timezone.utc)
        :return: list of logins of a user
        :rtype: list
        """
        raise NotImplementedError
import gzip
import ipaddress
import json
import logging
from datetime import datetime, timedelta

import boto3
import geoip2.database
from dateutil import parser
from geoip2.errors import AddressNotFoundError
from impossible_travel.ingestion.base_ingestion import BaseIngestion


class CloudTrailIngestion(BaseIngestion):
    """
    Concrete implementation of the BaseIngestion class for the AWS CloudTrail
    ingestion source (gzipped JSON log files read from an S3 bucket).
    """

    def __init__(self, ingestion_config: dict, mapping: dict):
        """
        Constructor for the CloudTrail Ingestion object.

        :param ingestion_config: the "cloudtrail" section of ingestion.json
        :type ingestion_config: dict
        :param mapping: the field mapping used by normalize_fields
        :type mapping: dict
        """
        super().__init__(ingestion_config, mapping)
        self.s3 = boto3.client(
            "s3",
            aws_access_key_id=ingestion_config.get("aws_access_key_id"),
            aws_secret_access_key=ingestion_config.get("aws_secret_access_key"),
            region_name=ingestion_config["region"],
        )
        self.bucket = ingestion_config["bucket_name"]
        # The S3 key layout can be overridden for non-standard bucket setups
        self.prefix_template = ingestion_config.get("prefix_template", "AWSLogs/{account_id}/CloudTrail/{region}/")
        self.geo_db_path = ingestion_config.get("geo_db_path", "/etc/buffalogs/GeoLite2-City.mmdb")
        self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
        try:
            self.geo_reader = geoip2.database.Reader(self.geo_db_path)
        except FileNotFoundError:
            # A missing GeoIP DB is not fatal: geo fields stay empty and the
            # affected logins are then skipped by parse_login
            self.logger.warning("GeoIP database not found at %s. Geo-enrichment disabled.", self.geo_db_path)
            self.geo_reader = None

    def get_log_files(self, start_date: datetime, end_date: datetime) -> list:
        """
        Get the list of S3 keys for CloudTrail logs in the time range.

        Assumes the standard CloudTrail prefix structure and scans the bucket
        day-by-day; list_objects_v2 pagination is handled. Because the scan
        has day granularity, callers must filter the extracted events by the
        exact timestamps (see _filter_time_range).

        :param start_date: the initial datetime of the scan
        :type start_date: datetime (with tzinfo=datetime.timezone.utc)
        :param end_date: the final datetime of the scan
        :type end_date: datetime (with tzinfo=datetime.timezone.utc)
        :return: list of S3 object keys
        :rtype: list
        """
        files = []
        current = start_date.replace(hour=0, minute=0, second=0, microsecond=0)
        account_id = self.ingestion_config.get("account_id", "")
        region = self.ingestion_config["region"]  # Required in config

        while current < end_date:
            date_part = f"{current.year}/{current.month:02d}/{current.day:02d}/"
            prefix = self.prefix_template.format(account_id=account_id, region=region) + date_part

            try:
                paginator = self.s3.get_paginator("list_objects_v2")
                for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
                    for obj in page.get("Contents", []):
                        files.append(obj["Key"])
            except Exception as e:
                # Best-effort: a failure on one day must not abort the scan
                self.logger.error(f"Error listing S3 objects for prefix {prefix}: {e}")
            current += timedelta(days=1)
        return files

    def extract_logins(self, files: list) -> list:
        """
        Download, decompress and parse login events from the given S3 files.

        :param files: S3 object keys of CloudTrail log files (.json.gz)
        :type files: list
        :return: list of parsed login dicts
        :rtype: list
        """
        logins = []
        for key in files:
            try:
                obj = self.s3.get_object(Bucket=self.bucket, Key=key)
                with gzip.GzipFile(fileobj=obj["Body"]) as f:
                    data = json.load(f)
                    for record in data.get("Records", []):
                        if self.is_login_event(record):
                            login = self.parse_login(record)
                            if login:
                                logins.append(login)
            except Exception as e:
                # A corrupted/unreadable file must not abort the whole ingestion
                self.logger.error(f"Error processing S3 file {key}: {e}")
        return logins

    def is_login_event(self, record: dict) -> bool:
        """
        Filter for relevant security/login events.
        Equivalent to the ES filters: authentication, success, start.

        :param record: a single CloudTrail event record
        :type record: dict
        :return: True if the record is a successful login-like event
        :rtype: bool
        """
        if record.get("errorCode") is not None or record.get("errorMessage") is not None:
            return False  # Only successful events

        event_name = record.get("eventName")
        event_source = record.get("eventSource")

        # Console sign-ins
        if event_source == "signin.amazonaws.com" and event_name in ["ConsoleLogin", "CheckMfa"]:
            return True

        # Role assumptions (like starting a session)
        if event_name in [
            "AssumeRole",
            "AssumeRoleWithSAML",
            "AssumeRoleWithWebIdentity",
            "GetSessionToken",
        ]:
            return True

        # Add more if needed, e.g., 'SwitchRole'
        return False

    @staticmethod
    def _is_public_ip(ip: str) -> bool:
        """
        Return True if ``ip`` is a usable public (global) IP address.

        CloudTrail may report service names (e.g. "AWS Internal",
        "ec2.amazonaws.com") in sourceIPAddress: those raise ValueError and
        are treated as non-public. This also fixes the previous prefix-based
        check, which covered only 172.16.* of the 172.16.0.0/12 private block
        and ignored other non-routable ranges.
        """
        if not ip:
            return False
        try:
            addr = ipaddress.ip_address(ip)
        except ValueError:
            return False
        # is_global excludes private, loopback, link-local and reserved ranges
        return addr.is_global

    def parse_login(self, record: dict) -> dict | None:
        """
        Parse and enrich a single CloudTrail event into a format similar to
        Elasticsearch hits; keys match the mapping (e.g. '@timestamp', 'user.name').

        :param record: a single CloudTrail event record
        :type record: dict
        :return: the parsed login, or None if the event must be skipped
        :rtype: dict | None
        """
        data = {}
        data["@timestamp"] = record.get("eventTime", "")

        user_identity = record.get("userIdentity", {})
        session_issuer = user_identity.get("sessionContext", {}).get("sessionIssuer", {})

        # Fallback chain: IAM users carry userName directly, assumed roles
        # carry it in the session issuer, otherwise derive a stable name from
        # the ARN or the principal id
        arn_last = user_identity.get("arn", "").split("/")[-1]
        principal_last = user_identity.get("principalId", "").split(":")[-1]

        data["user.name"] = user_identity.get("userName") or session_issuer.get("userName") or arn_last or principal_last or ""

        data["source.ip"] = record.get("sourceIPAddress", "")
        data["user_agent.original"] = record.get("userAgent", "")
        data["source.as.organization.name"] = ""  # Not available in CloudTrail; can enrich externally if needed

        if not self._is_public_ip(data["source.ip"]):
            return None  # Skip missing, private, local or non-IP source addresses

        # Geo-enrichment
        data["source.geo.country_name"] = ""
        data["source.geo.location.lat"] = ""
        data["source.geo.location.lon"] = ""

        if self.geo_reader:
            try:
                geo = self.geo_reader.city(data["source.ip"])
                data["source.geo.country_name"] = geo.country.name or ""
                data["source.geo.location.lat"] = geo.location.latitude or ""
                data["source.geo.location.lon"] = geo.location.longitude or ""
            except AddressNotFoundError:
                self.logger.debug(f"GeoIP not found for IP: {data['source.ip']}")
            except Exception as e:
                self.logger.error(f"GeoIP error for IP {data['source.ip']}: {e}")

        if not data["source.geo.country_name"]:  # Skip if no geo: detection needs a location
            return None

        # Intelligence category (e.g. anonymous access / Tor)
        ua_lower = data["user_agent.original"].lower()
        is_anonymous = user_identity.get("type") == "AnonymousUser" or "tor" in ua_lower
        data["source.intelligence_category"] = "anonymous" if is_anonymous else ""

        # Other fields
        data["_id"] = record.get("eventID", "")
        data["_index"] = "cloudtrail"
        return data

    def _filter_time_range(self, logins: list, start_date: datetime, end_date: datetime) -> list:
        """
        Keep only the logins whose @timestamp falls within [start_date, end_date].

        The S3 scan in get_log_files has day granularity, so the extracted
        records can include events outside the exact requested range; this
        restores the precise filtering the other ingestion sources apply.
        """
        in_range = []
        for login in logins:
            try:
                ts = parser.parse(login["@timestamp"])
                if start_date <= ts <= end_date:
                    in_range.append(login)
            except (KeyError, ValueError, TypeError):
                # Unparseable timestamps would also break the final sort; drop them
                self.logger.debug(f"Skipping login with unparsable timestamp: {login.get('@timestamp')}")
        return in_range

    def process_users(self, start_date: datetime, end_date: datetime) -> list:
        """
        Concrete implementation of the BaseIngestion.process_users abstract method.

        :param start_date: the initial datetime from which the users are considered
        :type start_date: datetime (with tzinfo=datetime.timezone.utc)
        :param end_date: the final datetime within which the users are considered
        :type end_date: datetime (with tzinfo=datetime.timezone.utc)
        :return: list of users strings that logged in the system
        :rtype: list
        """
        self.logger.info(f"Processing users from {start_date} to {end_date}")

        files = self.get_log_files(start_date, end_date)
        logins = self._filter_time_range(self.extract_logins(files), start_date, end_date)

        # Sorted for deterministic output; still a plain list of usernames
        return sorted({login["user.name"] for login in logins if login.get("user.name")})

    def process_user_logins(self, start_date: datetime, end_date: datetime, username: str) -> list:
        """
        Concrete implementation of the BaseIngestion.process_user_logins
        abstract method.

        :param username: username of the user that logged in
        :type username: str
        :param start_date: the initial datetime from which the logins of the user are considered
        :type start_date: datetime (with tzinfo=datetime.timezone.utc)
        :param end_date: the final datetime within which the logins of the user are considered
        :type end_date: datetime (with tzinfo=datetime.timezone.utc)
        :return: list of logins of the user, sorted chronologically
        :rtype: list
        """
        self.logger.info(f"Processing logins for user {username} from {start_date} to {end_date}")

        files = self.get_log_files(start_date, end_date)
        logins = self._filter_time_range(self.extract_logins(files), start_date, end_date)

        user_logins = [login for login in logins if login.get("user.name") == username]

        # Sort by timestamp, similar to the ES sort order
        user_logins.sort(key=lambda x: parser.parse(x["@timestamp"]))
        return user_logins
b/buffalogs/impossible_travel/ingestion/ingestion_factory.py @@ -3,6 +3,7 @@ from django.conf import settings from impossible_travel.ingestion.base_ingestion import BaseIngestion +from impossible_travel.ingestion.cloudtrail_ingestion import CloudTrailIngestion from impossible_travel.ingestion.elasticsearch_ingestion import ElasticsearchIngestion from impossible_travel.ingestion.opensearch_ingestion import OpensearchIngestion from impossible_travel.ingestion.splunk_ingestion import SplunkIngestion @@ -24,15 +25,15 @@ def _read_config(self) -> dict: :rtype: dict """ with open( - os.path.join(settings.CERTEGO_BUFFALOGS_CONFIG_PATH, "buffalogs/ingestion.json"), + os.path.join(settings.CERTEGO_BUFFALOGS_CONFIG_PATH, "buffalogs/ingestion.json"), # noqa: E501 mode="r", encoding="utf-8", ) as f: config = json.load(f) if config["active_ingestion"] not in [i.value for i in BaseIngestion.SupportedIngestionSources]: - raise ValueError(f"The ingestion source: {config['active_ingestion']} is not supported") + raise ValueError(f"The ingestion source: {config['active_ingestion']} " "is not supported") if not config.get(config["active_ingestion"]): - raise ValueError(f"The configuration for the {config['active_ingestion']} must be implemented") + raise ValueError(f"The configuration for the {config['active_ingestion']} " "must be implemented") return config def get_ingestion_class(self): @@ -46,5 +47,7 @@ def get_ingestion_class(self): return OpensearchIngestion(self.ingestion_config, self.mapping) case BaseIngestion.SupportedIngestionSources.SPLUNK: return SplunkIngestion(self.ingestion_config, self.mapping) + case BaseIngestion.SupportedIngestionSources.CLOUDTRAIL: + return CloudTrailIngestion(self.ingestion_config, self.mapping) case _: raise ValueError(f"Unsupported ingestion source: {self.active_ingestion}") diff --git a/buffalogs/impossible_travel/tests/ingestion/test_cloudtrail_ingestion.py b/buffalogs/impossible_travel/tests/ingestion/test_cloudtrail_ingestion.py new file 
import gzip
import json
from datetime import datetime, timezone
from unittest.mock import patch

import boto3
from impossible_travel.ingestion.cloudtrail_ingestion import CloudTrailIngestion
from moto import mock_aws


@mock_aws
def test_integration():
    """Integration test of the CloudTrail ingestion against a moto-mocked S3 bucket.

    Builds a fake bucket holding one gzipped CloudTrail log file, then checks
    that process_users() and process_user_logins() extract the expected data.
    parse_login is patched so the test does not depend on a GeoIP database.
    """
    # Setup config (mimics ingestion.json)
    config = {
        "bucket_name": "test-bucket",
        "region": "us-east-1",
        "account_id": "123456789012",
        "prefix_template": "AWSLogs/{account_id}/CloudTrail/{region}/",
        "geo_db_path": "/fake/path.mmdb",  # Fake path - geo success is forced via the patched parse_login
        "timeout": 90,
        "bucket_size": 10000,
        "custom_mapping": {
            "@timestamp": "timestamp",
            "_id": "id",
            "_index": "index",
            "user.name": "username",
            "source.ip": "ip",
            "user_agent.original": "agent",
            "source.as.organization.name": "organization",
            "source.geo.country_name": "country",
            "source.geo.location.lat": "lat",
            "source.geo.location.lon": "lon",
            "source.intelligence_category": "intelligence_category",
        },
    }

    # Create mock S3 bucket
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket="test-bucket")

    # Mock CloudTrail log records: one console sign-in, one role assumption
    mock_record_alice = {
        "eventTime": "2025-10-15T10:00:00Z",
        "eventSource": "signin.amazonaws.com",
        "eventName": "ConsoleLogin",
        "userIdentity": {"userName": "alice"},
        "sourceIPAddress": "203.0.113.55",
        "userAgent": "Mozilla/5.0",
        "errorCode": None,
    }
    mock_record_bob = {
        "eventTime": "2025-10-15T12:00:00Z",
        "eventSource": "sts.amazonaws.com",
        "eventName": "AssumeRole",
        "userIdentity": {
            "type": "AssumedRole",
            "sessionContext": {"sessionIssuer": {"userName": "bob"}},
        },
        "sourceIPAddress": "198.51.100.77",
        "userAgent": "Mozilla/5.0",
        "errorCode": None,
    }
    mock_log = {"Records": [mock_record_alice, mock_record_bob]}
    gz_log = gzip.compress(json.dumps(mock_log).encode())
    s3.put_object(
        Bucket="test-bucket",
        Key=("AWSLogs/123456789012/CloudTrail/us-east-1/" "2025/10/15/file.json.gz"),
        Body=gz_log,
    )

    # Create ingestion instance
    ingestion = CloudTrailIngestion(config, config["custom_mapping"])

    def mock_parse_login(self, record):
        """Deterministic parse_login stand-in: fixed geo data, no GeoIP lookup."""
        data = {
            "@timestamp": record.get("eventTime", ""),
            "user.name": (
                record.get("userIdentity", {}).get("userName")
                or record.get("userIdentity", {}).get("sessionContext", {}).get("sessionIssuer", {}).get("userName")
                or ""
            ),
            "source.ip": record.get("sourceIPAddress", ""),
            "user_agent.original": record.get("userAgent", ""),
            "source.as.organization.name": "",
            "source.geo.country_name": "MockCountry",
            "source.geo.location.lat": 0.0,
            "source.geo.location.lon": 0.0,
            "source.intelligence_category": "",
            "_id": record.get("eventID", ""),
            "_index": "cloudtrail",
        }

        # Skip invalid/local IPs, mirroring the real implementation
        ip = data["source.ip"]
        if not ip or ip == "127.0.0.1" or ip.startswith(("10.", "192.168.", "172.16.")):
            return None

        # Skip if no country (forced to MockCountry above)
        if not data["source.geo.country_name"]:
            return None

        return data

    with patch.object(CloudTrailIngestion, "parse_login", mock_parse_login):
        # process_users must find both users
        users = ingestion.process_users(
            start_date=datetime(2025, 10, 15, tzinfo=timezone.utc),
            end_date=datetime(2025, 10, 16, tzinfo=timezone.utc),
        )
        assert sorted(users) == ["alice", "bob"], "process_users failed"

        # process_user_logins must return exactly alice's login
        alice_logins = ingestion.process_user_logins(
            start_date=datetime(2025, 10, 15, tzinfo=timezone.utc),
            end_date=datetime(2025, 10, 16, tzinfo=timezone.utc),
            username="alice",
        )
        assert len(alice_logins) == 1, "process_user_logins failed for alice"
        assert alice_logins[0]["user.name"] == "alice", "Wrong username"
        assert alice_logins[0]["source.ip"] == "203.0.113.55", "Wrong IP"


if __name__ == "__main__":
    # Runnable without pytest; the test function itself must not return a
    # value (pytest treats non-None returns as an error in recent versions).
    test_integration()
    print("Integration test PASSED!")
+ "source.geo.location.lon": "lon", + "source.intelligence_category": "intelligence_category" + }, + "__custom_fields__": [ + "bucket_name", + "region", + "account_id", + "aws_access_key_id", + "aws_secret_access_key", + "prefix_template", + "geo_db_path" + ] + } +} \ No newline at end of file diff --git a/django-buffalogs/setup.cfg b/django-buffalogs/setup.cfg index ce626846..25ca0bd3 100644 --- a/django-buffalogs/setup.cfg +++ b/django-buffalogs/setup.cfg @@ -31,4 +31,6 @@ install_requires = python-dateutil>=2.9.0 requests>=2.32.5 ua-parser>=1.0.1 + boto3>=1.35.0 + geoip2>=4.8.0 uwsgi>=2.0.31 diff --git a/docs/ingestion/cloudtrail.md b/docs/ingestion/cloudtrail.md new file mode 100644 index 00000000..a671de41 --- /dev/null +++ b/docs/ingestion/cloudtrail.md @@ -0,0 +1,111 @@ +```markdown +# AWS CloudTrail Ingestion + +BuffaLogs now supports ingesting login-related events directly from **AWS CloudTrail** logs stored in an S3 bucket. +This enables detection of impossible travel, anomalous logins from new countries or IPs, root activity, anonymous access attempts, role assumption abuse, and other suspicious identity behavior in AWS environments. + +## Why CloudTrail? + +- Captures console sign-ins (`ConsoleLogin`), role assumptions (`AssumeRole`, `AssumeRoleWithSAML`, `AssumeRoleWithWebIdentity`), temporary credentials (`GetSessionToken`), and more. +- Fills a major gap for cloud-native environments — current sources are mostly on-prem (SSH, Apache) or specific IdPs. +- Combined with GeoIP enrichment, it allows full impossible travel and anomaly detection in AWS accounts. + +## Prerequisites + +1. **CloudTrail must be enabled and logging to S3** + - Create a **trail** (preferably multi-region) + - Enable **management events** (and optionally global service events) + - Choose an S3 bucket to store the logs + +2. 
**IAM permissions** + - `s3:ListBucket` + `s3:GetObject` on the CloudTrail bucket + - Best practice: Use an **IAM role** (if BuffaLogs runs in AWS/EC2/Lambda/Fargate) instead of long-lived access keys + +3. **GeoLite2 City database for IP → country/lat/lon enrichment** + - Free download: https://www.maxmind.com/en/accounts/current/geoip/downloads (requires free account signup) + - File: `GeoLite2-City.mmdb` + - Recommended location: `/etc/buffalogs/GeoLite2-City.mmdb` (or any path you specify in config) + +## Configuration + +Edit `config/buffalogs/ingestion.json` and add/replace with: + +```json +{ + "active_ingestion": "cloudtrail", + "cloudtrail": { + "bucket_name": "your-cloudtrail-logs-bucket-name", + "region": "us-east-1", + "account_id": "123456789012", + "aws_access_key_id": "AKIAXXXXXXXXXXXXXXXX", // optional — remove if using IAM role / env vars + "aws_secret_access_key": "your-secret-key-here", // optional — remove if using IAM role / env vars + "prefix_template": "AWSLogs/{account_id}/CloudTrail/{region}/", + "geo_db_path": "/etc/buffalogs/GeoLite2-City.mmdb", + "timeout": 90, + "bucket_size": 10000 + } +} +``` + +### Docker Compose volume example (for GeoLite2 DB) + +```yaml +services: + buffalogs: + volumes: + - ./GeoLite2-City.mmdb:/etc/buffalogs/GeoLite2-City.mmdb:ro +``` + +## How It Works + +- Scans S3 day-by-day for the requested time range +- Downloads and decompresses `.json.gz` files +- Filters only **successful** login/identity events: + - `ConsoleLogin` (main console sign-in) + - `AssumeRole`, `AssumeRoleWithSAML`, `AssumeRoleWithWebIdentity`, `GetSessionToken` +- Enriches public source IPs with country + lat/lon using MaxMind GeoLite2 +- Skips: + - Failed events (have `errorCode`) + - Internal AWS IPs / localhost / private ranges + - Events without usable geo data +- Normalizes fields to match the format expected by impossible travel detection + +## Testing & Verification + +1. Set `"active_ingestion": "cloudtrail"` in `ingestion.json` +2. 
Restart services (`docker compose down && docker compose up -d`) +3. Trigger analysis (via UI or celery beat/task) +4. Check container logs for lines like: + ``` + Processing users from 2025-10-01 to 2025-10-02 + Processing logins for user alice from ... + ``` +5. Look in the BuffaLogs dashboard for new anomalies (especially impossible travel between distant locations) + +## Troubleshooting + +| Issue | Possible Cause / Fix | +|------------------------------------|---------------------------------------------------------------------------------------| +| No events ingested | CloudTrail not writing management events, wrong bucket/region/account_id, empty time range | +| Geo fields empty / anomalies missing | GeoLite2-City.mmdb missing, wrong path, or IP not found in database | +| S3 access denied | Wrong credentials, missing IAM permissions, or bucket policy blocking | +| Slow ingestion on large ranges | CloudTrail logs can be voluminous — test with 1–3 days first | +| "GeoIP database not found" warning | Mount the `.mmdb` file correctly or update `geo_db_path` | + +## Security Recommendations + +- **Never commit access keys** to git — use environment variables, AWS Secrets Manager, or IAM roles. +- Use least-privilege IAM policy. +- Consider enabling CloudTrail log file validation and S3 bucket encryption. + +## Future Enhancements (ideas for follow-up PRs) + +- Support for data events or CloudTrail Lake queries +- AS/ISP enrichment (requires paid MaxMind or external service) +- Caching / incremental ingestion +- Filtering specific event sources or users via config + +Enjoy cloud threat hunting with BuffaLogs! + +Feel free to open issues or contribute improvements. +``` \ No newline at end of file