Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions buffalogs/impossible_travel/ingestion/base_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ class SupportedIngestionSources(Enum):
* ELASTICSEARCH: The login data is extracted from Elasticsearch
* SPLUNK: The login data is extracted from Splunk
* OPENSEARCH: The login data is extracted from Opensearch
* CLOUDTRAIL: The login data is extracted from AWS CloudTrail S3 logs
"""

ELASTICSEARCH = "elasticsearch"
SPLUNK = "splunk"
OPENSEARCH = "opensearch"
CLOUDTRAIL = "cloudtrail"

def __init__(self, ingestion_config, mapping):
super().__init__()
Expand All @@ -29,12 +31,13 @@ def __init__(self, ingestion_config, mapping):

@abstractmethod
def process_users(self, start_date: datetime, end_date: datetime) -> list:
"""Abstract method that implement the extraction of the users logged in between the time range considered defined by (start_date, end_date).
This method will be different implemented based on the ingestion source used.
"""Abstract method that implement the extraction of the users logged in
between the time range considered defined by (start_date, end_date).
This method is implemented differently depending on the ingestion source used.

:param start_date: the initial datetime from which the users are considered
:param start_date: the initial datetime from which the users are considered
:type start_date: datetime (with tzinfo=datetime.timezone.utc)
:param end_date: the final datetime within which the users are considered
:param end_date: the final datetime within which the users are considered
:type end_date: datetime (with tzinfo=datetime.timezone.utc)

:return: list of users strings that logged in the system
Expand All @@ -44,14 +47,17 @@ def process_users(self, start_date: datetime, end_date: datetime) -> list:

@abstractmethod
def process_user_logins(self, start_date: datetime, end_date: datetime, username: str) -> list:
"""Abstract method that implement the extraction of the logins of the given user in the time range defined by (start_date, end_date)
This method will be different implemented based on the ingestion source used.
"""Abstract method that implement the extraction of the logins of the
given user in the time range defined by (start_date, end_date)
This method is implemented differently depending on the ingestion source used.

:param username: username of the user that logged in
:type username: str
:param start_date: the initial datetime from which the logins of the user are considered
:param start_date: the initial datetime from which the logins of the
user are considered
:type start_date: datetime (with tzinfo=datetime.timezone.utc)
:param end_date: the final datetime within which the logins of the user are considered
:param end_date: the final datetime within which the logins of the user
are considered
:type end_date: datetime (with tzinfo=datetime.timezone.utc)

:return: list of logins of a user
Expand All @@ -60,8 +66,10 @@ def process_user_logins(self, start_date: datetime, end_date: datetime, username
raise NotImplementedError

def normalize_fields(self, logins: list) -> list:
"""Concrete method that manage the mapping into the required BuffaLogs mapping.
The mapping used is defined into the ingestion.json file "custom_mapping" if defined, otherwise it is used the default one
"""Concrete method that manage the mapping into the required BuffaLogs
mapping.
The mapping used is the "custom_mapping" defined in the ingestion.json file,
if present; otherwise the default mapping is used

:param logins: the logins to be normalized into the mapping fields
:type logins: list
Expand Down
201 changes: 201 additions & 0 deletions buffalogs/impossible_travel/ingestion/cloudtrail_ingestion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
import gzip
import ipaddress
import json
import logging
from datetime import datetime, timedelta

import boto3
import geoip2.database
from dateutil import parser
from geoip2.errors import AddressNotFoundError
from impossible_travel.ingestion.base_ingestion import BaseIngestion


class CloudTrailIngestion(BaseIngestion):
    """
    Concrete implementation of the BaseIngestion class for the AWS CloudTrail
    ingestion source.

    Login events are read from gzipped CloudTrail JSON log files stored on S3,
    filtered down to successful authentication-like events, and normalized into
    the same flat key layout the Elasticsearch ingestion produces
    (e.g. "@timestamp", "user.name", "source.ip", ...).
    """

    def __init__(self, ingestion_config: dict, mapping: dict):
        """Constructor for the CloudTrail ingestion object.

        :param ingestion_config: the "cloudtrail" section of ingestion.json;
            must contain "region" and "bucket_name", may contain
            "aws_access_key_id", "aws_secret_access_key", "account_id",
            "prefix_template" and "geo_db_path"
        :type ingestion_config: dict
        :param mapping: the field mapping used by BaseIngestion.normalize_fields
        :type mapping: dict
        """
        super().__init__(ingestion_config, mapping)
        self.s3 = boto3.client(
            "s3",
            # If the key id/secret are absent, boto3 falls back to its default
            # credential chain (env vars, instance profile, ...)
            aws_access_key_id=ingestion_config.get("aws_access_key_id"),
            aws_secret_access_key=ingestion_config.get("aws_secret_access_key"),
            region_name=ingestion_config["region"],
        )
        self.bucket = ingestion_config["bucket_name"]
        # User can override the standard CloudTrail key layout
        self.prefix_template = ingestion_config.get("prefix_template", "AWSLogs/{account_id}/CloudTrail/{region}/")
        self.geo_db_path = ingestion_config.get("geo_db_path", "/etc/buffalogs/GeoLite2-City.mmdb")
        self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
        try:
            self.geo_reader = geoip2.database.Reader(self.geo_db_path)
        except FileNotFoundError:
            # Without a GeoIP database every login is skipped by parse_login
            # (it requires a country name), so warn loudly here
            msg = f"GeoIP database not found at {self.geo_db_path}. " "Geo-enrichment disabled."
            self.logger.warning(msg)
            self.geo_reader = None

    def get_log_files(self, start_date: datetime, end_date: datetime) -> list:
        """Return the S3 keys of the CloudTrail log files in the time range.

        Assumes the standard CloudTrail prefix structure
        ``{prefix_template}YYYY/MM/DD/`` and handles list_objects_v2 pagination.
        A listing error for one day is logged and the scan continues with the
        next day.

        :param start_date: start of the range (floored to midnight of its day)
        :type start_date: datetime
        :param end_date: end of the range; a day is scanned only if its
            midnight is strictly before end_date
        :type end_date: datetime
        :return: list of S3 object keys
        :rtype: list
        """
        files = []
        current = start_date.replace(hour=0, minute=0, second=0, microsecond=0)
        account_id = self.ingestion_config.get("account_id", "")
        region = self.ingestion_config["region"]  # Required in config

        while current < end_date:
            date_part = f"{current.year}/{current.month:02d}/{current.day:02d}/"
            prefix = self.prefix_template.format(account_id=account_id, region=region) + date_part

            try:
                paginator = self.s3.get_paginator("list_objects_v2")
                for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
                    for obj in page.get("Contents", []):
                        files.append(obj["Key"])
            except Exception as e:
                err_msg = f"Error listing S3 objects for prefix " f"{prefix}: {e}"
                self.logger.error(err_msg)
            current += timedelta(days=1)
        return files

    def extract_logins(self, files: list) -> list:
        """Download, decompress and parse the given S3 files into login dicts.

        Files that fail to download or parse are logged and skipped, so a
        single corrupted log file does not abort the whole ingestion run.

        :param files: S3 keys as returned by get_log_files
        :type files: list
        :return: list of normalized login dicts (see parse_login)
        :rtype: list
        """
        logins = []
        for key in files:
            try:
                obj = self.s3.get_object(Bucket=self.bucket, Key=key)
                # CloudTrail objects are gzip-compressed JSON documents with a
                # top-level "Records" array
                with gzip.GzipFile(fileobj=obj["Body"]) as f:
                    data = json.load(f)
                for record in data.get("Records", []):
                    if self.is_login_event(record):
                        login = self.parse_login(record)
                        if login:
                            logins.append(login)
            except Exception as e:
                self.logger.error(f"Error processing S3 file {key}: {e}")
        return logins

    def is_login_event(self, record: dict) -> bool:
        """Return True if the record is a relevant, successful login-like event.

        Equivalent to the ES filters: authentication, success, start.

        :param record: a single CloudTrail record
        :type record: dict
        :rtype: bool
        """
        if record.get("errorCode") is not None or record.get("errorMessage") is not None:
            return False  # Only successful events

        event_name = record.get("eventName")
        event_source = record.get("eventSource")

        # Console sign-ins
        if event_source == "signin.amazonaws.com" and event_name in ["ConsoleLogin", "CheckMfa"]:
            return True

        # Role assumptions (like starting a session)
        if event_name in [
            "AssumeRole",
            "AssumeRoleWithSAML",
            "AssumeRoleWithWebIdentity",
            "GetSessionToken",
        ]:
            return True

        # Add more if needed, e.g., 'SwitchRole'
        return False

    @staticmethod
    def _is_public_ip(ip: str) -> bool:
        """Return True if *ip* is a valid, globally routable IP address.

        CloudTrail's "sourceIPAddress" may contain an AWS service DNS name
        (e.g. "cloudtrail.amazonaws.com") instead of an IP; those, as well as
        private (incl. the full 172.16.0.0/12 range), loopback, link-local and
        otherwise non-global addresses, are rejected.
        """
        if not ip:
            return False
        try:
            return ipaddress.ip_address(ip).is_global
        except ValueError:
            # Not an IP address at all (e.g. an AWS service DNS name)
            return False

    def parse_login(self, record: dict) -> dict | None:
        """Parse and geo-enrich a single CloudTrail event into an ES-hit-like dict.

        Keys match the mapping (e.g. '@timestamp', 'user.name', ...).

        :param record: a CloudTrail record that passed is_login_event
        :type record: dict
        :return: the normalized login dict, or None when the event has no
            public source IP or cannot be geo-located (BuffaLogs needs a
            country to detect impossible travel)
        :rtype: dict | None
        """
        data = {}
        data["@timestamp"] = record.get("eventTime", "")

        user_identity = record.get("userIdentity", {})
        session_issuer = user_identity.get("sessionContext", {}).get("sessionIssuer", {})

        arn_last = user_identity.get("arn", "").split("/")[-1]
        principal_last = user_identity.get("principalId", "").split(":")[-1]

        # Best-available username: explicit userName, then the role session
        # issuer, then the last ARN / principalId component
        data["user.name"] = user_identity.get("userName") or session_issuer.get("userName") or arn_last or principal_last or ""

        data["source.ip"] = record.get("sourceIPAddress", "")
        data["user_agent.original"] = record.get("userAgent", "")
        data["source.as.organization.name"] = ""  # Not available in CloudTrail; can enrich externally if needed

        ip = data["source.ip"]
        # Skip non-IP values (AWS service names) and private/loopback/other
        # non-global addresses — they cannot be geo-located meaningfully
        if not self._is_public_ip(ip):
            return None

        # Geo-enrichment
        data["source.geo.country_name"] = ""
        data["source.geo.location.lat"] = ""
        data["source.geo.location.lon"] = ""

        if self.geo_reader:
            try:
                geo = self.geo_reader.city(ip)
                data["source.geo.country_name"] = geo.country.name or ""
                # "is not None" (not "or") so legitimate 0.0 coordinates
                # (equator / prime meridian) are not discarded
                if geo.location.latitude is not None:
                    data["source.geo.location.lat"] = geo.location.latitude
                if geo.location.longitude is not None:
                    data["source.geo.location.lon"] = geo.location.longitude
            except AddressNotFoundError:
                self.logger.debug(f"GeoIP not found for IP: {ip}")
            except Exception as e:
                self.logger.error(f"GeoIP error for IP {ip}: {e}")

        if not data["source.geo.country_name"]:  # Skip if no geo
            return None

        # Intelligence category (e.g., anonymous)
        ua_lower = data["user_agent.original"].lower()
        is_anonymous = user_identity.get("type") == "AnonymousUser" or "tor" in ua_lower
        data["source.intelligence_category"] = "anonymous" if is_anonymous else ""

        # Other fields
        data["_id"] = record.get("eventID", "")
        data["_index"] = "cloudtrail"
        return data

    def process_users(self, start_date: datetime, end_date: datetime) -> list:
        """Concrete implementation of the BaseIngestion.process_users abstract method.

        :param start_date: the initial datetime from which the users are considered
        :type start_date: datetime
        :param end_date: the final datetime within which the users are considered
        :type end_date: datetime
        :return: deduplicated list of usernames that logged in within the range
        :rtype: list
        """
        msg = f"Processing users from {start_date} to {end_date}"
        self.logger.info(msg)

        files = self.get_log_files(start_date, end_date)
        logins = self.extract_logins(files)

        users = set()
        for login in logins:
            name = login.get("user.name")
            if name:
                users.add(name)
        return list(users)

    def process_user_logins(self, start_date: datetime, end_date: datetime, username: str) -> list:
        """Concrete implementation of the BaseIngestion.process_user_logins abstract method.

        :param start_date: the initial datetime from which the logins are considered
        :type start_date: datetime
        :param end_date: the final datetime within which the logins are considered
        :type end_date: datetime
        :param username: username of the user that logged in
        :type username: str
        :return: the user's logins, sorted by timestamp ascending
        :rtype: list
        """
        msg = f"Processing logins for user {username} " f"from {start_date} to {end_date}"
        self.logger.info(msg)

        files = self.get_log_files(start_date, end_date)
        logins = self.extract_logins(files)

        user_logins = []
        for login in logins:
            if login.get("user.name") == username:
                user_logins.append(login)

        # Sort by timestamp, similar to ES sort
        user_logins.sort(key=lambda x: parser.parse(x["@timestamp"]))
        return user_logins
9 changes: 6 additions & 3 deletions buffalogs/impossible_travel/ingestion/ingestion_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from django.conf import settings
from impossible_travel.ingestion.base_ingestion import BaseIngestion
from impossible_travel.ingestion.cloudtrail_ingestion import CloudTrailIngestion
from impossible_travel.ingestion.elasticsearch_ingestion import ElasticsearchIngestion
from impossible_travel.ingestion.opensearch_ingestion import OpensearchIngestion
from impossible_travel.ingestion.splunk_ingestion import SplunkIngestion
Expand All @@ -24,15 +25,15 @@ def _read_config(self) -> dict:
:rtype: dict
"""
with open(
os.path.join(settings.CERTEGO_BUFFALOGS_CONFIG_PATH, "buffalogs/ingestion.json"),
os.path.join(settings.CERTEGO_BUFFALOGS_CONFIG_PATH, "buffalogs/ingestion.json"), # noqa: E501
mode="r",
encoding="utf-8",
) as f:
config = json.load(f)
if config["active_ingestion"] not in [i.value for i in BaseIngestion.SupportedIngestionSources]:
raise ValueError(f"The ingestion source: {config['active_ingestion']} is not supported")
raise ValueError(f"The ingestion source: {config['active_ingestion']} " "is not supported")
if not config.get(config["active_ingestion"]):
raise ValueError(f"The configuration for the {config['active_ingestion']} must be implemented")
raise ValueError(f"The configuration for the {config['active_ingestion']} " "must be implemented")
return config

def get_ingestion_class(self):
Expand All @@ -46,5 +47,7 @@ def get_ingestion_class(self):
return OpensearchIngestion(self.ingestion_config, self.mapping)
case BaseIngestion.SupportedIngestionSources.SPLUNK:
return SplunkIngestion(self.ingestion_config, self.mapping)
case BaseIngestion.SupportedIngestionSources.CLOUDTRAIL:
return CloudTrailIngestion(self.ingestion_config, self.mapping)
case _:
raise ValueError(f"Unsupported ingestion source: {self.active_ingestion}")
Loading
Loading