Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions buffalogs/impossible_travel/ingestion/base_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,27 @@ def __init__(self, ingestion_config, mapping):
self.ingestion_config = ingestion_config
self.mapping = mapping
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
self.retry_config = self._read_retry_config()

def _read_retry_config(self) -> dict:
"""
Read retry configuration from ingestion_config.
Provides default values if not specified.

:return: retry configuration dictionary
:rtype: dict
"""
default_retry_config = {
"enabled": True,
"max_retries": 10,
"initial_backoff": 1,
"max_backoff": 30,
"max_elapsed_time": 60,
"jitter": True
}

retry_config = self.ingestion_config.get("retry", {})
return {**default_retry_config, **retry_config}

@abstractmethod
def process_users(self, start_date: datetime, end_date: datetime) -> list:
Expand Down
119 changes: 83 additions & 36 deletions buffalogs/impossible_travel/ingestion/elasticsearch_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@
from datetime import datetime

from elasticsearch.dsl import Search, connections
from elasticsearch.exceptions import ConnectionError as ESConnectionError
from elasticsearch.exceptions import ConnectionTimeout
from elastic_transport import ConnectionError as TransportConnectionError
from elastic_transport import ConnectionTimeout as TransportConnectionTimeout
from impossible_travel.ingestion.base_ingestion import BaseIngestion
from impossible_travel.utils.connection_retry import create_retry_decorator


class ElasticsearchIngestion(BaseIngestion):
Expand All @@ -15,9 +20,62 @@ def __init__(self, ingestion_config: dict, mapping: dict):
Constructor for the Elasticsearch Ingestion object
"""
super().__init__(ingestion_config, mapping)
# create the elasticsearch host connection
connections.create_connection(hosts=self.ingestion_config["url"], request_timeout=self.ingestion_config["timeout"], verify_certs=False)
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
self._initialize_connection()

def _initialize_connection(self):
    """Create the Elasticsearch connection, retrying on connection-level errors.

    The connection is registered with ``connections.create_connection`` and a
    cluster health request is issued on it; both steps run inside the retry
    wrapper built from ``self.retry_config``. Any exception that escapes the
    wrapper is logged and re-raised to the caller.
    """
    retryable_errors = (
        TransportConnectionError,
        TransportConnectionTimeout,
        ESConnectionError,
        ConnectionTimeout,
        ConnectionError,
        TimeoutError,
        OSError,
    )
    with_retry = create_retry_decorator(
        retry_config=self.retry_config,
        exception_types=retryable_errors,
        operation_name="Elasticsearch connection",
    )

    def _connect():
        connections.create_connection(
            hosts=self.ingestion_config["url"],
            request_timeout=self.ingestion_config["timeout"],
            verify_certs=False,
        )
        # The health request runs inside the retried call, so a failure
        # raised by it is subject to the same retry policy.
        es_client = connections.get_connection()
        es_client.cluster.health()
        self.logger.info(f"Successfully connected to Elasticsearch at {self.ingestion_config['url']}")

    _connect = with_retry(_connect)

    try:
        _connect()
    except Exception as exc:
        self.logger.error(f"Failed to connect to Elasticsearch after all retry attempts: {exc}")
        raise

def _execute_search(self, search_obj):
    """Run ``search_obj.execute()`` under the configured retry policy.

    :param search_obj: object exposing ``execute()``; the callers in this class pass a ``Search``
    :return: the value returned by ``search_obj.execute()``
    """
    retryable_errors = (
        TransportConnectionError,
        TransportConnectionTimeout,
        ESConnectionError,
        ConnectionTimeout,
        ConnectionError,
        TimeoutError,
        OSError,
    )
    with_retry = create_retry_decorator(
        retry_config=self.retry_config,
        exception_types=retryable_errors,
        operation_name="Elasticsearch search",
    )

    def _execute():
        return search_obj.execute()

    # Wrap and invoke in one step; equivalent to decorating `_execute`.
    return with_retry(_execute)()

def process_users(self, start_date: datetime, end_date: datetime) -> list:
"""
Expand All @@ -31,7 +89,6 @@ def process_users(self, start_date: datetime, end_date: datetime) -> list:
:return: list of users strings that logged in Elasticsearch
:rtype: list
"""
response = None
self.logger.info(f"Starting at: {start_date} Finishing at: {end_date}")
users_list = []
s = (
Expand All @@ -43,21 +100,17 @@ def process_users(self, start_date: datetime, end_date: datetime) -> list:
.query("exists", field="user.name")
)
s.aggs.bucket("login_user", "terms", field="user.name", size=self.ingestion_config["bucket_size"])
try:
response = s.execute()
except ConnectionError:
self.logger.error(f"Failed to establish a connection with host: {connections.get_connection()}")
except TimeoutError:
self.logger.error(f"Timeout reached for the host: {connections.get_connection()}")
except Exception as e:
self.logger.error(f"Exception while quering elasticsearch: {e}")

if response:
if response.aggregations:
try:
response = self._execute_search(s)
if response and response.aggregations:
self.logger.info(f"Successfully got {len(response.aggregations.login_user.buckets)} users")
for user in response.aggregations.login_user.buckets:
if user.key: # exclude not well-formatted usernames (e.g. "")
if user.key:
users_list.append(user.key)
except Exception as e:
self.logger.error(f"Failed to retrieve users from Elasticsearch: {e}")
raise

return users_list

Expand All @@ -75,7 +128,6 @@ def process_user_logins(self, start_date: datetime, end_date: datetime, username
:return: list of the logins (dictionaries) for that username
:rtype: list of dicts
"""
response = None
user_logins = []
s = (
Search(index=self.ingestion_config["indexes"])
Expand All @@ -100,29 +152,24 @@ def process_user_logins(self, start_date: datetime, end_date: datetime, username
"source.intelligence_category",
]
)
.sort("@timestamp") # from the oldest to the most recent login
.sort("@timestamp")
.extra(size=self.ingestion_config["bucket_size"])
)

try:
response = s.execute()
except ConnectionError:
self.logger.error(f"Failed to establish a connection with host: {connections.get_connection()}")
except TimeoutError:
self.logger.error(f"Timeout reached for the host: {connections.get_connection()}")
response = self._execute_search(s)
if response:
self.logger.info(f"Got {len(response)} logins for the user {username} to be normalized")
for hit in response.hits.hits:
hit_dict = hit.to_dict()
tmp = {
"_index": "fw-proxy" if hit_dict.get("_index", "").startswith("fw-") else hit_dict.get("_index", "").split("-")[0],
"_id": hit_dict["_id"],
}
tmp.update(hit_dict["_source"])
user_logins.append(tmp)
except Exception as e:
self.logger.error(f"Exception while quering elasticsearch: {e}")

# create a single standard dict (with the required fields listed in the ingestion.json config file) for each login
if response:
self.logger.info(f"Got {len(response)} logins for the user {username} to be normalized")

for hit in response.hits.hits:
hit_dict = hit.to_dict()
tmp = {
"_index": "fw-proxy" if hit_dict.get("_index", "").startswith("fw-") else hit_dict.get("_index", "").split("-")[0],
"_id": hit_dict["_id"],
}
tmp.update(hit_dict["_source"])
user_logins.append(tmp)
self.logger.error(f"Failed to retrieve logins for user {username}: {e}")
raise

return user_logins
100 changes: 61 additions & 39 deletions buffalogs/impossible_travel/ingestion/opensearch_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from datetime import datetime

from impossible_travel.ingestion.base_ingestion import BaseIngestion
from impossible_travel.utils.connection_retry import create_retry_decorator

try:
from opensearchpy import OpenSearch
Expand All @@ -19,13 +20,45 @@ def __init__(self, ingestion_config: dict, mapping: dict):
Constructor for the Opensearch Ingestion object
"""
super().__init__(ingestion_config, mapping)
# create the opensearch host connection
self.client = OpenSearch(
hosts=[self.ingestion_config["url"]],
timeout=self.ingestion_config["timeout"],
verify_certs=False,
self._initialize_connection()

def _initialize_connection(self):
    """Initialize OpenSearch connection with retry logic.

    Builds ``self.client`` and issues a cluster health request on it, both
    inside the retry wrapper built from ``self.retry_config``. Any exception
    that escapes the wrapper is logged and re-raised to the caller.
    """
    # Local import: the module-level opensearchpy import is guarded by a
    # try block, so the client's exception classes are resolved only here.
    from opensearchpy.exceptions import ConnectionError as OSConnectionError
    from opensearchpy.exceptions import ConnectionTimeout as OSConnectionTimeout

    retry_decorator = create_retry_decorator(
        retry_config=self.retry_config,
        # The client raises its own ConnectionError / ConnectionTimeout,
        # which are not subclasses of the builtins, so they must be listed
        # explicitly for the retry to trigger on them.
        exception_types=(
            OSConnectionError,
            OSConnectionTimeout,
            ConnectionError,
            TimeoutError,
            OSError,
        ),
        operation_name="OpenSearch connection",
    )

    @retry_decorator
    def _connect():
        self.client = OpenSearch(
            hosts=[self.ingestion_config["url"]],
            timeout=self.ingestion_config["timeout"],
            verify_certs=False,
        )
        # The health request runs inside the retried call, so a failure
        # raised by it is subject to the same retry policy.
        self.client.cluster.health()
        self.logger.info(f"Successfully connected to OpenSearch at {self.ingestion_config['url']}")

    try:
        _connect()
    except Exception as e:
        self.logger.error(f"Failed to connect to OpenSearch after all retry attempts: {e}")
        raise

def _execute_search(self, query: dict):
    """Execute search with retry logic.

    :param query: request body passed to ``self.client.search``
    :type query: dict

    :return: the value returned by ``self.client.search`` for the configured indexes
    """
    # Local import: the module-level opensearchpy import is guarded by a
    # try block, so the client's exception classes are resolved only here.
    from opensearchpy.exceptions import ConnectionError as OSConnectionError
    from opensearchpy.exceptions import ConnectionTimeout as OSConnectionTimeout

    retry_decorator = create_retry_decorator(
        retry_config=self.retry_config,
        # The client raises its own ConnectionError / ConnectionTimeout,
        # which are not subclasses of the builtins, so they must be listed
        # explicitly for the retry to trigger on them.
        exception_types=(
            OSConnectionError,
            OSConnectionTimeout,
            ConnectionError,
            TimeoutError,
            OSError,
        ),
        operation_name="OpenSearch search",
    )

    @retry_decorator
    def _execute():
        return self.client.search(index=self.ingestion_config["indexes"], body=query)

    return _execute()

def process_users(self, start_date: datetime, end_date) -> list:
"""
Expand All @@ -39,7 +72,6 @@ def process_users(self, start_date: datetime, end_date) -> list:
:return: list of users strings that logged in Opensearch
:rtype: list
"""
response = None # Initialize the response variable
self.logger.info(f"Starting at: {start_date} Finishing at: {end_date}")
users_list = []
query = {
Expand All @@ -54,22 +86,19 @@ def process_users(self, start_date: datetime, end_date) -> list:
]
}
},
# making change to already committed code on the basis that the dynamic templates stores all strings as keywords so having user.name.keyword will cause program to fail
"aggs": {"login_user": {"terms": {"field": "user.name", "size": self.ingestion_config["bucket_size"]}}},
}

try:
response = self.client.search(index=self.ingestion_config["indexes"], body=query)
except ConnectionError:
self.logger.error(f"Failed to establish a connection with host: {self.client}")
except TimeoutError:
self.logger.error(f"Timeout reached for the host: {self.client}")
response = self._execute_search(query)
if response and "aggregations" in response and "login_user" in response["aggregations"]:
self.logger.info(f"Successfully got {len(response['aggregations']['login_user']['buckets'])} users")
for user in response["aggregations"]["login_user"]["buckets"]:
users_list.append(user["key"])
except Exception as e:
self.logger.error(f"Exception while quering opensearch: {e}")
# Only access response if it exists
if response and "aggregations" in response and "login_user" in response["aggregations"]:
self.logger.info(f"Successfully got {len(response['aggregations']['login_user']['buckets'])} users")
for user in response["aggregations"]["login_user"]["buckets"]:
users_list.append(user["key"])
self.logger.error(f"Failed to retrieve users from OpenSearch: {e}")
raise

return users_list

def process_user_logins(self, start_date: datetime, end_date: datetime, username: str) -> list:
Expand All @@ -82,7 +111,6 @@ def process_user_logins(self, start_date: datetime, end_date: datetime, username
:return: list of the logins (dictionaries) for that specified username
:rtype: list of dicts
"""
response = None # Initialize the response variable
user_logins = []
query = {
"query": {
Expand Down Expand Up @@ -112,26 +140,20 @@ def process_user_logins(self, start_date: datetime, end_date: datetime, username
],
"size": self.ingestion_config["bucket_size"],
}

try:
response = self.client.search(index=self.ingestion_config["indexes"], body=query)
except ConnectionError:
self.logger.error(f"Failed to establish a connection with host:{self.client}")
except TimeoutError:
self.logger.error(f"Timeout reached for the host:{self.client}")
response = self._execute_search(query)
if response and "hits" in response and "hits" in response["hits"]:
self.logger.info(f"Got {len(response['hits']['hits'])} logins for the user {username} to be normalized")
for hit in response["hits"]["hits"]:
tmp = {
"_index": "fw-proxy" if hit.get("_index", "").startswith("fw-") else hit.get("_index", "").split("-")[0],
"_id": hit["_id"],
}
tmp.update(hit["_source"])
user_logins.append(tmp)
except Exception as e:
self.logger.error(f"Exception while querying opensearch:{e}")
# only access response if it exists and has the expected structure
if response and "hits" in response and "hits" in response["hits"]:
# Process hits into standardized format
self.logger.info(f"Got {len(response['hits']['hits'])} logins or the user {username} to be normalized")

for hit in response["hits"]["hits"]:
tmp = {
"_index": "fw-proxy" if hit.get("_index", "").startswith("fw-") else hit.get("_index", "").split("-")[0],
"_id": hit["_id"],
}
# Add source data to the tmp dict
tmp.update(hit["_source"])
user_logins.append(tmp)
self.logger.error(f"Failed to retrieve logins for user {username}: {e}")
raise

return user_logins
Loading