diff --git a/buffalogs/impossible_travel/ingestion/base_ingestion.py b/buffalogs/impossible_travel/ingestion/base_ingestion.py index 07498628..41576977 100644 --- a/buffalogs/impossible_travel/ingestion/base_ingestion.py +++ b/buffalogs/impossible_travel/ingestion/base_ingestion.py @@ -26,6 +26,27 @@ def __init__(self, ingestion_config, mapping): self.ingestion_config = ingestion_config self.mapping = mapping self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") + self.retry_config = self._read_retry_config() + + def _read_retry_config(self) -> dict: + """ + Read retry configuration from ingestion_config. + Provides default values if not specified. + + :return: retry configuration dictionary + :rtype: dict + """ + default_retry_config = { + "enabled": True, + "max_retries": 10, + "initial_backoff": 1, + "max_backoff": 30, + "max_elapsed_time": 60, + "jitter": True + } + + retry_config = self.ingestion_config.get("retry", {}) + return {**default_retry_config, **retry_config} @abstractmethod def process_users(self, start_date: datetime, end_date: datetime) -> list: diff --git a/buffalogs/impossible_travel/ingestion/elasticsearch_ingestion.py b/buffalogs/impossible_travel/ingestion/elasticsearch_ingestion.py index 765e3d33..f94be362 100644 --- a/buffalogs/impossible_travel/ingestion/elasticsearch_ingestion.py +++ b/buffalogs/impossible_travel/ingestion/elasticsearch_ingestion.py @@ -2,7 +2,12 @@ from datetime import datetime from elasticsearch.dsl import Search, connections +from elasticsearch.exceptions import ConnectionError as ESConnectionError +from elasticsearch.exceptions import ConnectionTimeout +from elastic_transport import ConnectionError as TransportConnectionError +from elastic_transport import ConnectionTimeout as TransportConnectionTimeout from impossible_travel.ingestion.base_ingestion import BaseIngestion +from impossible_travel.utils.connection_retry import create_retry_decorator class ElasticsearchIngestion(BaseIngestion): @@ -15,9 
+20,62 @@ def __init__(self, ingestion_config: dict, mapping: dict): Constructor for the Elasticsearch Ingestion object """ super().__init__(ingestion_config, mapping) - # create the elasticsearch host connection - connections.create_connection(hosts=self.ingestion_config["url"], request_timeout=self.ingestion_config["timeout"], verify_certs=False) - self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") + self._initialize_connection() + + def _initialize_connection(self): + """Initialize Elasticsearch connection with retry logic.""" + retry_decorator = create_retry_decorator( + retry_config=self.retry_config, + exception_types=( + TransportConnectionError, + TransportConnectionTimeout, + ESConnectionError, + ConnectionTimeout, + ConnectionError, + TimeoutError, + OSError, + ), + operation_name="Elasticsearch connection" + ) + + @retry_decorator + def _connect(): + connections.create_connection( + hosts=self.ingestion_config["url"], + request_timeout=self.ingestion_config["timeout"], + verify_certs=False, + ) + conn = connections.get_connection() + conn.cluster.health() + self.logger.info(f"Successfully connected to Elasticsearch at {self.ingestion_config['url']}") + + try: + _connect() + except Exception as e: + self.logger.error(f"Failed to connect to Elasticsearch after all retry attempts: {e}") + raise + + def _execute_search(self, search_obj): + """Execute search with retry logic.""" + retry_decorator = create_retry_decorator( + retry_config=self.retry_config, + exception_types=( + TransportConnectionError, + TransportConnectionTimeout, + ESConnectionError, + ConnectionTimeout, + ConnectionError, + TimeoutError, + OSError, + ), + operation_name="Elasticsearch search" + ) + + @retry_decorator + def _execute(): + return search_obj.execute() + + return _execute() def process_users(self, start_date: datetime, end_date: datetime) -> list: """ @@ -31,7 +89,6 @@ def process_users(self, start_date: datetime, end_date: datetime) -> list: :return: list 
of users strings that logged in Elasticsearch :rtype: list """ - response = None self.logger.info(f"Starting at: {start_date} Finishing at: {end_date}") users_list = [] s = ( @@ -43,21 +100,17 @@ def process_users(self, start_date: datetime, end_date: datetime) -> list: .query("exists", field="user.name") ) s.aggs.bucket("login_user", "terms", field="user.name", size=self.ingestion_config["bucket_size"]) - try: - response = s.execute() - except ConnectionError: - self.logger.error(f"Failed to establish a connection with host: {connections.get_connection()}") - except TimeoutError: - self.logger.error(f"Timeout reached for the host: {connections.get_connection()}") - except Exception as e: - self.logger.error(f"Exception while quering elasticsearch: {e}") - if response: - if response.aggregations: + try: + response = self._execute_search(s) + if response and response.aggregations: self.logger.info(f"Successfully got {len(response.aggregations.login_user.buckets)} users") for user in response.aggregations.login_user.buckets: - if user.key: # exclude not well-formatted usernames (e.g. 
"") + if user.key: users_list.append(user.key) + except Exception as e: + self.logger.error(f"Failed to retrieve users from Elasticsearch: {e}") + raise return users_list @@ -75,7 +128,6 @@ def process_user_logins(self, start_date: datetime, end_date: datetime, username :return: list of the logins (dictionaries) for that username :rtype: list of dicts """ - response = None user_logins = [] s = ( Search(index=self.ingestion_config["indexes"]) @@ -100,29 +152,24 @@ def process_user_logins(self, start_date: datetime, end_date: datetime, username "source.intelligence_category", ] ) - .sort("@timestamp") # from the oldest to the most recent login + .sort("@timestamp") .extra(size=self.ingestion_config["bucket_size"]) ) + try: - response = s.execute() - except ConnectionError: - self.logger.error(f"Failed to establish a connection with host: {connections.get_connection()}") - except TimeoutError: - self.logger.error(f"Timeout reached for the host: {connections.get_connection()}") + response = self._execute_search(s) + if response: + self.logger.info(f"Got {len(response)} logins for the user {username} to be normalized") + for hit in response.hits.hits: + hit_dict = hit.to_dict() + tmp = { + "_index": "fw-proxy" if hit_dict.get("_index", "").startswith("fw-") else hit_dict.get("_index", "").split("-")[0], + "_id": hit_dict["_id"], + } + tmp.update(hit_dict["_source"]) + user_logins.append(tmp) except Exception as e: - self.logger.error(f"Exception while quering elasticsearch: {e}") - - # create a single standard dict (with the required fields listed in the ingestion.json config file) for each login - if response: - self.logger.info(f"Got {len(response)} logins for the user {username} to be normalized") - - for hit in response.hits.hits: - hit_dict = hit.to_dict() - tmp = { - "_index": "fw-proxy" if hit_dict.get("_index", "").startswith("fw-") else hit_dict.get("_index", "").split("-")[0], - "_id": hit_dict["_id"], - } - tmp.update(hit_dict["_source"]) - 
user_logins.append(tmp) + self.logger.error(f"Failed to retrieve logins for user {username}: {e}") + raise return user_logins diff --git a/buffalogs/impossible_travel/ingestion/opensearch_ingestion.py b/buffalogs/impossible_travel/ingestion/opensearch_ingestion.py index ec042943..56d0660e 100644 --- a/buffalogs/impossible_travel/ingestion/opensearch_ingestion.py +++ b/buffalogs/impossible_travel/ingestion/opensearch_ingestion.py @@ -2,6 +2,7 @@ from datetime import datetime from impossible_travel.ingestion.base_ingestion import BaseIngestion +from impossible_travel.utils.connection_retry import create_retry_decorator try: from opensearchpy import OpenSearch @@ -19,13 +20,45 @@ def __init__(self, ingestion_config: dict, mapping: dict): Constructor for the Opensearch Ingestion object """ super().__init__(ingestion_config, mapping) - # create the opensearch host connection - self.client = OpenSearch( - hosts=[self.ingestion_config["url"]], - timeout=self.ingestion_config["timeout"], - verify_certs=False, + self._initialize_connection() + + def _initialize_connection(self): + """Initialize OpenSearch connection with retry logic.""" + retry_decorator = create_retry_decorator( + retry_config=self.retry_config, + exception_types=(ConnectionError, TimeoutError, OSError), + operation_name="OpenSearch connection" + ) + + @retry_decorator + def _connect(): + self.client = OpenSearch( + hosts=[self.ingestion_config["url"]], + timeout=self.ingestion_config["timeout"], + verify_certs=False, + ) + self.client.cluster.health() + self.logger.info(f"Successfully connected to OpenSearch at {self.ingestion_config['url']}") + + try: + _connect() + except Exception as e: + self.logger.error(f"Failed to connect to OpenSearch after all retry attempts: {e}") + raise + + def _execute_search(self, query: dict): + """Execute search with retry logic.""" + retry_decorator = create_retry_decorator( + retry_config=self.retry_config, + exception_types=(ConnectionError, TimeoutError, OSError), + 
operation_name="OpenSearch search" ) - self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") + + @retry_decorator + def _execute(): + return self.client.search(index=self.ingestion_config["indexes"], body=query) + + return _execute() def process_users(self, start_date: datetime, end_date) -> list: """ @@ -39,7 +72,6 @@ def process_users(self, start_date: datetime, end_date) -> list: :return: list of users strings that logged in Opensearch :rtype: list """ - response = None # Initialize the response variable self.logger.info(f"Starting at: {start_date} Finishing at: {end_date}") users_list = [] query = { @@ -54,22 +86,19 @@ def process_users(self, start_date: datetime, end_date) -> list: ] } }, - # making change to already committed code on the basis that the dynamic templates stores all strings as keywords so having user.name.keyword will cause program to fail "aggs": {"login_user": {"terms": {"field": "user.name", "size": self.ingestion_config["bucket_size"]}}}, } + try: - response = self.client.search(index=self.ingestion_config["indexes"], body=query) - except ConnectionError: - self.logger.error(f"Failed to establish a connection with host: {self.client}") - except TimeoutError: - self.logger.error(f"Timeout reached for the host: {self.client}") + response = self._execute_search(query) + if response and "aggregations" in response and "login_user" in response["aggregations"]: + self.logger.info(f"Successfully got {len(response['aggregations']['login_user']['buckets'])} users") + for user in response["aggregations"]["login_user"]["buckets"]: + users_list.append(user["key"]) except Exception as e: - self.logger.error(f"Exception while quering opensearch: {e}") - # Only access response if it exists - if response and "aggregations" in response and "login_user" in response["aggregations"]: - self.logger.info(f"Successfully got {len(response['aggregations']['login_user']['buckets'])} users") - for user in 
response["aggregations"]["login_user"]["buckets"]: - users_list.append(user["key"]) + self.logger.error(f"Failed to retrieve users from OpenSearch: {e}") + raise + return users_list def process_user_logins(self, start_date: datetime, end_date: datetime, username: str) -> list: @@ -82,7 +111,6 @@ def process_user_logins(self, start_date: datetime, end_date: datetime, username :return: list of the logins (dictionaries) for that specified username :rtype: list of dicts """ - response = None # Initialize the response variable user_logins = [] query = { "query": { @@ -112,26 +140,20 @@ def process_user_logins(self, start_date: datetime, end_date: datetime, username ], "size": self.ingestion_config["bucket_size"], } + try: - response = self.client.search(index=self.ingestion_config["indexes"], body=query) - except ConnectionError: - self.logger.error(f"Failed to establish a connection with host:{self.client}") - except TimeoutError: - self.logger.error(f"Timeout reached for the host:{self.client}") + response = self._execute_search(query) + if response and "hits" in response and "hits" in response["hits"]: + self.logger.info(f"Got {len(response['hits']['hits'])} logins for the user {username} to be normalized") + for hit in response["hits"]["hits"]: + tmp = { + "_index": "fw-proxy" if hit.get("_index", "").startswith("fw-") else hit.get("_index", "").split("-")[0], + "_id": hit["_id"], + } + tmp.update(hit["_source"]) + user_logins.append(tmp) except Exception as e: - self.logger.error(f"Exception while querying opensearch:{e}") - # only access response if it exists and has the expected structure - if response and "hits" in response and "hits" in response["hits"]: - # Process hits into standardized format - self.logger.info(f"Got {len(response['hits']['hits'])} logins or the user {username} to be normalized") - - for hit in response["hits"]["hits"]: - tmp = { - "_index": "fw-proxy" if hit.get("_index", "").startswith("fw-") else hit.get("_index", "").split("-")[0], - 
"_id": hit["_id"], - } - # Add source data to the tmp dict - tmp.update(hit["_source"]) - user_logins.append(tmp) + self.logger.error(f"Failed to retrieve logins for user {username}: {e}") + raise return user_logins diff --git a/buffalogs/impossible_travel/ingestion/splunk_ingestion.py b/buffalogs/impossible_travel/ingestion/splunk_ingestion.py index 4f095b05..9f7d04d3 100644 --- a/buffalogs/impossible_travel/ingestion/splunk_ingestion.py +++ b/buffalogs/impossible_travel/ingestion/splunk_ingestion.py @@ -1,11 +1,13 @@ import logging from datetime import datetime +from impossible_travel.ingestion.base_ingestion import BaseIngestion +from impossible_travel.utils.connection_retry import create_retry_decorator + try: from splunklib import client, results except ImportError: pass -from impossible_travel.ingestion.base_ingestion import BaseIngestion class SplunkIngestion(BaseIngestion): @@ -18,8 +20,18 @@ def __init__(self, ingestion_config: dict, mapping: dict): Constructor for the Splunk Ingestion object """ super().__init__(ingestion_config, mapping) - try: - # Create the Splunk host connection + self._initialize_connection() + + def _initialize_connection(self): + """Initialize Splunk connection with retry logic.""" + retry_decorator = create_retry_decorator( + retry_config=self.retry_config, + exception_types=(ConnectionError, TimeoutError, OSError), + operation_name="Splunk connection" + ) + + @retry_decorator + def _connect(): self.service = client.connect( host=self.ingestion_config.get("host", "localhost"), port=self.ingestion_config.get("port", 8089), @@ -27,11 +39,31 @@ def __init__(self, ingestion_config: dict, mapping: dict): password=self.ingestion_config.get("password"), scheme=self.ingestion_config.get("scheme", "http"), ) - except ConnectionError as e: - logging.error("Failed to establish a connection: %s", e) - self.service = None + self.service.apps.list() + self.logger.info(f"Successfully connected to Splunk at {self.ingestion_config.get('host')}") + 
+ try: + _connect() + except Exception as e: + self.logger.error(f"Failed to connect to Splunk after all retry attempts: {e}") + raise + + def _execute_search(self, query: str, search_kwargs: dict): + """Execute Splunk search with retry logic.""" + retry_decorator = create_retry_decorator( + retry_config=self.retry_config, + exception_types=(ConnectionError, TimeoutError, OSError), + operation_name="Splunk search" + ) + + @retry_decorator + def _execute(): + search_job = self.service.jobs.create(query, **search_kwargs) + while not search_job.is_done(): + search_job.refresh() + return results.ResultsReader(search_job.results()) - self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") + return _execute() def process_users(self, start_date: datetime, end_date: datetime) -> list: """ @@ -64,24 +96,17 @@ def process_users(self, start_date: datetime, end_date: datetime) -> list: "count": self.ingestion_config.get("bucket_size", 10000), } - search_job = self.service.jobs.create(query, **search_kwargs) - - while not search_job.is_done(): - search_job.refresh() - - results_reader = results.ResultsReader(search_job.results()) + results_reader = self._execute_search(query, search_kwargs) for result in results_reader: if isinstance(result, dict) and "user.name" in result: users_list.append(result["user.name"]) self.logger.info(f"Successfully got {len(users_list)} users") - except ConnectionError: - self.logger.error(f"Failed to establish a connection with host: {self.ingestion_config.get('host')}") - except TimeoutError: - self.logger.error(f"Timeout reached for the host: {self.ingestion_config.get('host')}") except Exception as e: - self.logger.error(f"Exception while querying Splunk: {e}") + self.logger.error(f"Failed to retrieve users from Splunk: {e}") + raise + return users_list def process_user_logins(self, start_date: datetime, end_date: datetime, username: str) -> list: @@ -110,6 +135,7 @@ def process_user_logins(self, start_date: datetime, end_date: 
datetime, username source.intelligence_category | sort 0 @timestamp """ + try: search_kwargs = { "earliest_time": start_date_str, @@ -118,23 +144,15 @@ def process_user_logins(self, start_date: datetime, end_date: datetime, username "count": self.ingestion_config.get("bucket_size", 10000), } - search_job = self.service.jobs.create(query, **search_kwargs) - - # Wait for the job to complete - while not search_job.is_done(): - search_job.refresh() - - results_reader = results.ResultsReader(search_job.results()) + results_reader = self._execute_search(query, search_kwargs) for result in results_reader: if isinstance(result, dict): response.append(result) self.logger.info(f"Got {len(response)} logins for user {username} to be normalized") - except ConnectionError: - self.logger.error(f"Failed to establish a connection with host: {self.ingestion_config.get('host')}") - except TimeoutError: - self.logger.error(f"Timeout reached for the host: {self.ingestion_config.get('host')}") except Exception as e: - self.logger.error(f"Exception while querying Splunk: {e}") + self.logger.error(f"Failed to retrieve logins for user {username}: {e}") + raise + return response diff --git a/buffalogs/impossible_travel/tests/ingestion/test_elasticsearch_ingestion.py b/buffalogs/impossible_travel/tests/ingestion/test_elasticsearch_ingestion.py index b9c6969c..1bef00bb 100644 --- a/buffalogs/impossible_travel/tests/ingestion/test_elasticsearch_ingestion.py +++ b/buffalogs/impossible_travel/tests/ingestion/test_elasticsearch_ingestion.py @@ -21,13 +21,11 @@ def setUp(self): self.template = load_index_template("example_template") connections.create_connection(hosts=self.elastic_config["url"], request_timeout=self.ingestion_config["elasticsearch"]["timeout"]) self._load_elastic_template_on_elastic(template_to_be_added=self.template) - # load test data into the 2 indexes: cloud-* and fw-proxy-* self._load_test_data_on_elastic(data_to_be_added=self.list_to_be_added_cloud, index="cloud-test_data") 
self._load_test_data_on_elastic(data_to_be_added=self.list_to_be_added_fw_proxy, index="fw-proxy-test_data") def _load_elastic_template_on_elastic(self, template_to_be_added): response = self.es.indices.put_index_template(name="example_template", body=template_to_be_added) - # check that the template has been uploaded correctly self.assertTrue(response["acknowledged"]) def tearDown(self) -> None: @@ -41,39 +39,34 @@ def _bulk_gendata(self, index: str, data_list: list): def _load_test_data_on_elastic(self, data_to_be_added: List[dict], index: str): bulk(self.es, self._bulk_gendata(index, data_to_be_added), refresh="true") - # check that the data on the Elastic index has been uploaded correctly count = self.es.count(index=index)["count"] self.assertTrue(count > 0) def test_process_users_ConnectionError(self): - # test the function process_users with the exception ConnectionError + self.elastic_config["retry"] = {"enabled": False} self.elastic_config["url"] = "http://unexisting-url:8888" start_date = datetime(2025, 2, 26, 11, 30, tzinfo=timezone.utc) end_date = datetime(2025, 2, 26, 12, 00, tzinfo=timezone.utc) - elastic_ingestor = ElasticsearchIngestion(ingestion_config=self.elastic_config, mapping=self.elastic_config["custom_mapping"]) - with self.assertLogs(elastic_ingestor.logger, level="ERROR"): - elastic_ingestor.process_users(start_date, end_date) + with self.assertRaises(Exception): + elastic_ingestor = ElasticsearchIngestion(ingestion_config=self.elastic_config, mapping=self.elastic_config["custom_mapping"]) def test_process_users_TimeoutError(self): - # test the function process_users with the exception TimeoutError + self.elastic_config["retry"] = {"enabled": False} self.elastic_config["timeout"] = 0.001 start_date = datetime(2025, 2, 26, 11, 30, tzinfo=timezone.utc) end_date = datetime(2025, 2, 26, 12, 00, tzinfo=timezone.utc) - elastic_ingestor = ElasticsearchIngestion(ingestion_config=self.elastic_config, mapping=self.elastic_config["custom_mapping"]) - 
with self.assertLogs(elastic_ingestor.logger, level="ERROR"): - elastic_ingestor.process_users(start_date, end_date) + with self.assertRaises(Exception): + elastic_ingestor = ElasticsearchIngestion(ingestion_config=self.elastic_config, mapping=self.elastic_config["custom_mapping"]) def test_process_users_Exception(self): - # test the function process_users with a generic exception (e.g. for wrong indexes) self.elastic_config["indexes"] = "unexisting-index" start_date = datetime(2025, 2, 26, 11, 30, tzinfo=timezone.utc) end_date = datetime(2025, 2, 26, 12, 00, tzinfo=timezone.utc) elastic_ingestor = ElasticsearchIngestion(ingestion_config=self.elastic_config, mapping=self.elastic_config["custom_mapping"]) - with self.assertLogs(elastic_ingestor.logger, level="ERROR"): + with self.assertRaises(Exception): elastic_ingestor.process_users(start_date, end_date) def test_process_users_no_data(self): - # test the function process_users with no data in that range time start_date = datetime(2025, 2, 26, 11, 30, tzinfo=timezone.utc) end_date = datetime(2025, 2, 26, 12, 00, tzinfo=timezone.utc) elastic_ingestor = ElasticsearchIngestion(ingestion_config=self.elastic_config, mapping=self.elastic_config["custom_mapping"]) diff --git a/buffalogs/impossible_travel/utils/connection_retry.py b/buffalogs/impossible_travel/utils/connection_retry.py new file mode 100644 index 00000000..089ddae7 --- /dev/null +++ b/buffalogs/impossible_travel/utils/connection_retry.py @@ -0,0 +1,57 @@ +import logging +from typing import Tuple, Type + +import backoff + +logger = logging.getLogger(__name__) + + +def create_retry_decorator( + retry_config: dict, + exception_types: Tuple[Type[Exception], ...], + operation_name: str = "Connection" +): + """ + Create a retry decorator with exponential backoff based on configuration. 
+ + :param retry_config: Dictionary containing retry configuration + :param exception_types: Tuple of exception types to retry on + :param operation_name: Name of the operation for logging + :return: Configured backoff decorator + """ + def on_giveup_handler(details): + exception = details.get("exception") + exception_msg = str(exception) if exception is not None else "" + logger.error( + f"{operation_name} failed after all retry attempts. " + f"Last error: {exception_msg}" + ) + + def on_backoff_handler(details): + exception = details.get("exception") + exception_msg = str(exception) if exception is not None else "" + logger.warning( + f"{operation_name} attempt {details['tries']} failed. " + f"Retrying in {details['wait']:.2f}s... " + f"(elapsed: {details['elapsed']:.2f}s). " + f"Error: {exception_msg}" + ) + + if not retry_config.get("enabled", True): + return lambda func: func + + decorator_kwargs = { + "wait_gen": backoff.expo, + "exception": exception_types, + "max_tries": retry_config.get("max_retries", 10), + "max_time": retry_config.get("max_elapsed_time", 60), + "base": retry_config.get("initial_backoff", 1), + "max_value": retry_config.get("max_backoff", 30), + "on_backoff": on_backoff_handler, + "on_giveup": on_giveup_handler, + } + + # backoff.on_exception defaults jitter to full_jitter, so "jitter": false must pass None explicitly to disable it + decorator_kwargs["jitter"] = backoff.full_jitter if retry_config.get("jitter", True) else None + + return backoff.on_exception(**decorator_kwargs) diff --git a/build/Dockerfile b/build/Dockerfile index 0f08eee1..e5b7a053 100644 --- a/build/Dockerfile +++ b/build/Dockerfile @@ -18,7 +18,13 @@ RUN pip install --no-cache-dir -r requirements.txt \ COPY buffalogs/ /opt/certego/buffalogs/ -RUN chmod +x /opt/certego/buffalogs/run.sh; chmod +x /opt/certego/buffalogs/run_worker.sh; chmod +x /opt/certego/buffalogs/run_beat.sh; + +# Fix line endings and set execute permissions +RUN apt-get update && apt-get install -y dos2unix \ + && dos2unix /opt/certego/buffalogs/run.sh /opt/certego/buffalogs/run_worker.sh
/opt/certego/buffalogs/run_beat.sh \ + && chmod +x /opt/certego/buffalogs/run.sh /opt/certego/buffalogs/run_worker.sh /opt/certego/buffalogs/run_beat.sh \ + && apt-get -y purge dos2unix && apt-get -y autoremove && apt-get -y clean + WORKDIR /opt/certego/buffalogs/ -CMD [ "./run.sh" ] +CMD ["/bin/bash", "/opt/certego/buffalogs/run.sh"] diff --git a/config/buffalogs/ingestion.json b/config/buffalogs/ingestion.json index bc27d82d..5c681023 100644 --- a/config/buffalogs/ingestion.json +++ b/config/buffalogs/ingestion.json @@ -1,71 +1,103 @@ { - "active_ingestion": "elasticsearch", - "elasticsearch": { - "url": "http://elasticsearch:9200/", - "username": "foobar", - "password": "bar", - "timeout": 90, - "indexes": "cloud-*,fw-proxy-*", - "bucket_size": 10000, - "custom_mapping": { - "@timestamp": "timestamp", - "_id": "id", - "_index": "index", - "user.name": "username", - "source.ip": "ip", - "user_agent.original": "agent", - "source.as.organization.name": "organization", - "source.geo.country_name": "country", - "source.geo.location.lat": "lat", - "source.geo.location.lon": "lon", - "source.intelligence_category": "intelligence_category" - }, - "__custom_fields__" : ["url", "username", "password", "timeout", "indexes"] + "active_ingestion": "elasticsearch", + "elasticsearch": { + "url": "http://elasticsearch:9200/", + "username": "foobar", + "password": "bar", + "timeout": 90, + "retry": { + "enabled": true, + "max_retries": 10, + "initial_backoff": 1, + "max_backoff": 30, + "max_elapsed_time": 60, + "jitter": true }, - "opensearch": { - "url": "http://opensearch:9200/", - "username": "foobar", - "password": "bar", - "timeout": 90, - "indexes": "cloud-*,fw-proxy-*", - "bucket_size": 10000, - "custom_mapping": { - "@timestamp": "timestamp", - "_id": "id", - "_index": "index", - "user.name": "username", - "source.ip": "ip", - "user_agent.original": "agent", - "source.as.organization.name": "organization", - "source.geo.country_name": "country", - 
"source.geo.location.lat": "lat", - "source.geo.location.lon": "lon", - "source.intelligence_category": "intelligence_category" - }, - "__custom_fields__" : ["url", "username", "password", "timeout", "indexes"] + "indexes": "cloud-*,fw-proxy-*", + "bucket_size": 10000, + "custom_mapping": { + "@timestamp": "timestamp", + "_id": "id", + "_index": "index", + "user.name": "username", + "source.ip": "ip", + "user_agent.original": "agent", + "source.as.organization.name": "organization", + "source.geo.country_name": "country", + "source.geo.location.lat": "lat", + "source.geo.location.lon": "lon", + "source.intelligence_category": "intelligence_category" }, - "splunk": { - "host": "splunk", - "port": 8089, - "scheme": "http", - "username": "foobar", - "password": "bar", - "timeout": 90, - "indexes": "main", - "bucket_size": 10000, - "custom_mapping": { - "user.name": "username", - "@timestamp": "timestamp", - "source.ip": "ip", - "source.geo.country_name": "country", - "source.geo.location.lat": "lat", - "source.geo.location.lon": "lon", - "user_agent.original": "agent", - "source.as.organization.name": "organization", - "_id": "id", - "index": "index", - "source.intelligence_category": "intelligence_category" - }, - "__custom_fields__" : ["host","port", "scheme", "username", "password", "timeout", "indexes"] - } + "__custom_fields__": ["url", "username", "password", "timeout", "indexes"] + }, + "opensearch": { + "url": "http://opensearch:9200/", + "username": "foobar", + "password": "bar", + "timeout": 90, + "retry": { + "enabled": true, + "max_retries": 10, + "initial_backoff": 1, + "max_backoff": 30, + "max_elapsed_time": 60, + "jitter": true + }, + "indexes": "cloud-*,fw-proxy-*", + "bucket_size": 10000, + "custom_mapping": { + "@timestamp": "timestamp", + "_id": "id", + "_index": "index", + "user.name": "username", + "source.ip": "ip", + "user_agent.original": "agent", + "source.as.organization.name": "organization", + "source.geo.country_name": "country", + 
"source.geo.location.lat": "lat", + "source.geo.location.lon": "lon", + "source.intelligence_category": "intelligence_category" + }, + "__custom_fields__": ["url", "username", "password", "timeout", "indexes"] + }, + "splunk": { + "host": "splunk", + "port": 8089, + "scheme": "http", + "username": "foobar", + "password": "bar", + "timeout": 90, + "retry": { + "enabled": true, + "max_retries": 10, + "initial_backoff": 1, + "max_backoff": 30, + "max_elapsed_time": 60, + "jitter": true + }, + "indexes": "main", + "bucket_size": 10000, + "custom_mapping": { + "user.name": "username", + "@timestamp": "timestamp", + "source.ip": "ip", + "source.geo.country_name": "country", + "source.geo.location.lat": "lat", + "source.geo.location.lon": "lon", + "user_agent.original": "agent", + "source.as.organization.name": "organization", + "_id": "id", + "index": "index", + "source.intelligence_category": "intelligence_category" + }, + "__custom_fields__": [ + "host", + "port", + "scheme", + "username", + "password", + "timeout", + "indexes" + ] + } }