From 7f03fd994aa95bad960583f44c52d21b98687ad5 Mon Sep 17 00:00:00 2001 From: dirksteynberg Date: Fri, 18 Mar 2022 16:29:56 +0200 Subject: [PATCH 1/3] Adding postgres writing functionality --- turbo_stream/__init__.py | 10 + turbo_stream/google_analyitcs/reader.py | 62 +++++- turbo_stream/google_search_console/reader.py | 51 +++++ turbo_stream/postgresql/__init__.py | 0 turbo_stream/postgresql/writer.py | 201 +++++++++++++++++++ 5 files changed, 316 insertions(+), 8 deletions(-) create mode 100644 turbo_stream/postgresql/__init__.py create mode 100644 turbo_stream/postgresql/writer.py diff --git a/turbo_stream/__init__.py b/turbo_stream/__init__.py index 4013b53..7324ec2 100644 --- a/turbo_stream/__init__.py +++ b/turbo_stream/__init__.py @@ -149,3 +149,13 @@ def write_date_to_local(self, file_location): """ logging.info(f"Writing data to local path: {file_location}.") write_file(data=self._data_set, file_location=file_location) + + +class _WriterInterface: + """ + Turbo Stream Writer Class Interface + """ + + def __init__(self, credentials: (dict, str), configuration: dict = None, **kwargs): + self._configuration: dict = configuration + self._credentials: (dict, str) = credentials diff --git a/turbo_stream/google_analyitcs/reader.py b/turbo_stream/google_analyitcs/reader.py index e309200..faad37a 100644 --- a/turbo_stream/google_analyitcs/reader.py +++ b/turbo_stream/google_analyitcs/reader.py @@ -5,6 +5,8 @@ import logging from socket import timeout +from turbo_stream.postgresql.writer import _PostgreSQLWriter + from googleapiclient.discovery import build from googleapiclient.errors import HttpError from oauth2client.service_account import ServiceAccountCredentials @@ -24,11 +26,11 @@ class GoogleAnalyticsReader(ReaderInterface): """ def __init__( - self, - configuration: dict, - credentials: str, - service_account_email: str, - **kwargs, + self, + configuration: dict, + credentials: str, + service_account_email: str, + **kwargs, ): super().__init__(configuration, credentials) @@ -147,9 +149,16 @@ def _iterate_report(self, reports, view_id): else: row_dict[metric.get("name")] = int(value) - # add additional data - row_dict["ga:viewId"] = view_id - self._data_set.append(row_dict) + # add additional data & clean up field names + row_dict["viewId"] = view_id + new_row_dict = {} + + for key, value in row_dict.items(): + new_row_dict[key.replace("ga:", "")] = value + + print(new_row_dict) + + self._data_set.append(new_row_dict) def run_query(self): """ @@ -190,3 +199,40 @@ def run_query(self): logging.info(f"{self.__class__.__name__} process complete!") return self._data_set + + def write_data_to_postgresql( + self, credentials: dict, table_name: str, truncate_on_insert=False + ): + _writer = _PostgreSQLWriter(credentials=credentials) + + _dimensions = [dim.replace("ga:", "") for dim in self._configuration.get("dimensions", [])] + _metrics = [met.replace("ga:", "") for met in self._configuration.get("metrics", [])] + + _schema = { + 'viewId': { + "type": "VARCHAR", + "not_null": True, + } + } + + for _metric in _metrics: + _schema[_metric] = { + "type": "NUMERIC", + "not_null": True, + } + + for _dimension in _dimensions: + _schema[_dimension] = { + "type": "VARCHAR", + "not_null": True, + } + + _writer._create_table( + table_name=table_name, schema=_schema + ) + + _writer._insert_table( + table_name=table_name, + dataset=self._data_set, + truncate_on_insert=truncate_on_insert, + ) diff --git a/turbo_stream/google_search_console/reader.py b/turbo_stream/google_search_console/reader.py index 79c74c0..a3bd0e4 100644 --- a/turbo_stream/google_search_console/reader.py +++ b/turbo_stream/google_search_console/reader.py @@ -9,6 +9,8 @@ from googleapiclient.errors import HttpError from oauth2client.client import OAuth2WebServerFlow +from turbo_stream.postgresql.writer import _PostgreSQLWriter + from turbo_stream import ReaderInterface, write_file, write_file_to_s3 from turbo_stream.utils.date_handlers import date_range from turbo_stream.utils.request_handlers import request_handler, retry_handler @@ -216,3 +218,52 @@ def write_partition_data_to_s3( key=f"{path}/{partition_name}_{dimension}.{fmt}", data=partition_data, ) + + def write_data_to_postgresql( + self, credentials: dict, table_name: str, truncate_on_insert=False + ): + _writer = _PostgreSQLWriter(credentials=credentials) + _dimensions = self._configuration.get("dimensions", []) + _metrics = self._configuration.get("metrics", []) + + # for gsc, we have a base schema that has each dimension added as + # multi-metric databases in an rds setting + for _dimension in _dimensions: + _schema = { + "site_url": { + "type": "VARCHAR", + "not_null": True, + }, + "search_type": { + "type": "VARCHAR", + "not_null": True, + }, + } + + for _metric in _metrics: + _schema[_metric] = { + "type": "NUMERIC", + "not_null": True, + } + + _schema[_dimension] = { + "type": "VARCHAR", + "not_null": True, + } + + if "date" not in _schema: + _schema["date"] = { + "type": "VARCHAR", + "not_null": True, + } + + _writer._create_table( + table_name=f"{table_name}_{_dimension}", schema=_schema + ) + + for _dimension, _dimension_dataset in self._data_set[0].items(): + _writer._insert_table( + table_name=f"{table_name}_{_dimension}", + dataset=_dimension_dataset, + truncate_on_insert=truncate_on_insert, + ) diff --git a/turbo_stream/postgresql/__init__.py b/turbo_stream/postgresql/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/turbo_stream/postgresql/writer.py b/turbo_stream/postgresql/writer.py new file mode 100644 index 0000000..ed7abc5 --- /dev/null +++ b/turbo_stream/postgresql/writer.py @@ -0,0 +1,201 @@ +""" +Writer class for PostgreSQL that suits the expectations of the turbo stream services. +""" +import logging + +import psycopg2 +from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT + +from turbo_stream import _WriterInterface + +logging.basicConfig( + format="%(asctime)s %(name)-12s %(levelname)-8s %(message)s", level=logging.INFO +) + + +class _PostgreSQLWriter(_WriterInterface): + def __init__(self, credentials: (dict, str), configuration: dict = None, **kwargs): + super().__init__(configuration, credentials, **kwargs) + + self._credentials = credentials + self._configuration = configuration + + self._test_cursor() + + def _connect(self): + """ + Establish connection to PostgreSQL database. + The basic connection parameters are: + - *dbname*: the database name + - *database*: the database name (only as keyword argument) + - *user*: user name used to authenticate (defaults to postgres if not provided) + - *password*: password used to authenticate + - *host*: database host address (defaults to UNIX socket if not provided) + - *port*: connection port number (defaults to 5432 if not provided) + """ + logging.info(f"Establishing a connection to PostgreSQL database.") + + if "dbname" in self._credentials: + return psycopg2.connect( + dbname=self._credentials.get("dbname"), + user=self._credentials.get("user", "postgres"), + password=self._credentials.get("password"), + host=self._credentials.get("host"), + port=self._credentials.get("port", 5432), + ) + + return psycopg2.connect( + database=self._credentials.get("database", "postgres"), + user=self._credentials.get("user", "postgres"), + password=self._credentials.get("password"), + host=self._credentials.get("host", "localhost"), + port=self._credentials.get("port", 5432), + ) + + def _test_cursor(self): + """ + Get PostgreSQL cursor for querying. + :return: Cursor object. + """ + # test the connection before returning it + try: + self._execute_query("SELECT 1") + logging.info("Connection to PostgreSQL successful.") + + except psycopg2.OperationalError as err: + logging.info("Connection to PostgreSQL failed.") + raise err + + def _execute_query(self, query, dataset=None): + # Some PostgreSQL command such as CREATE DATABASE or VACUUM can’t run into a transaction. + _connection = self._connect() + _connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) + _cursor = _connection.cursor() + logging.info(query) + if dataset is None: + _cursor.execute(query) + else: + _cursor.executemany(query, dataset) + _connection.close() + + def _create_table(self, table_name: str, schema: dict): + """ + Converts a standardised dict object that represents the typical PostgreSQL + Table Schema and generates a physical database from it. + :param table_name: The name of the new table. + :param schema: The dict schema object of the new table which looks like: + schema = { + "user_id": { + "type": "INT", + "not_null": True, + "primary_key": True, + "unique": True + }, + "user_name": { + "type": "INT", + "not_null": True, + "primary_key": False, + "unique": False, + }, + } + """ + logging.info( + f"Attempting to create {table_name} table for PostgreSQL database." + ) + field_set_string = [] + count = 1 + for field_name, field_meta in schema.items(): + field_set_string.append(field_name) + + if field_meta.get("type", False): + field_set_string.append(field_meta.get("type")) + + if field_meta.get("not null", False): + field_set_string.append("NOT NULL") + + if field_meta.get("primary_key", False): + field_set_string.append("PRIMARY KEY") + + if field_meta.get("unique", False): + field_set_string.append("UNIQUE") + + if len(schema) > count: + # break each statement except for the last one + field_set_string.append(",") + + count += 1 + + query_frame_string = ( + f"CREATE TABLE IF NOT EXISTS " + f"{table_name} (ts_id SERIAL, ts_date DATE DEFAULT now(), {' '.join(field_set_string)});" + ) + + self._execute_query(query_frame_string) + + logging.info( + f"Attempting to create {table_name} table for PostgreSQL database complete." + ) + + def _insert_table(self, table_name: str, dataset: list, truncate_on_insert=False): + """ + Submits a dataset in a record orientation that fists the tables schema. + :param table_name: The table to write to. + :param dataset: The dataset that fits the table schema. + :param truncate_on_insert: Bool to clear out table before inserting dataset. + """ + + if truncate_on_insert: + logging.info(f"Attempting to truncate {table_name} table.") + query = f"TRUNCATE TABLE {table_name};" + self._execute_query(query) + + logging.info(f"Attempting to insert data into {table_name} table.") + fields = [str(f) for f in dataset[0].keys()] + + value_insert_set = [] + for field in fields: + value_insert_set.append(f"%({field})s") + + query = ( + f"INSERT INTO {table_name}({', '.join(fields)}) " + f"VALUES ({', '.join(value_insert_set)})" + ) + self._execute_query(query=query, dataset=dataset) + logging.info(f"Attempting to insert data into {table_name} table complete.") + + def _drop_duplicates(self, table_name: str, schema: dict): + """ + Query to drop all fields that may have duplicates. + This is faster because this runs only 2 queries. + First one to select all the duplicates, then one to delete all items from the table. + :param table_name: The name of the new table. + :param schema: The dict schema object of the new table which looks like: + schema = { + "user_id": { + "type": "INT", + "not_null": True, + "primary_key": True, + "unique": True + }, + "user_name": { + "type": "INT", + "not_null": True, + "primary_key": False, + "unique": False, + }, + } + """ + + logging.info(f"Attempting to drop duplicates for {table_name} table.") + fields = schema.keys() + comparison_clauses = [] + for field in fields: + comparison_clauses.append(f"a.{field} = b.{field}") + query = ( + f"DELETE FROM {table_name} a USING (SELECT MIN(ctid) as ctid, {', '.join(fields)} " + f"FROM {table_name} GROUP BY {', '.join(fields)} HAVING COUNT(*) > 1) b WHERE " + f"{' AND '.join(comparison_clauses)} AND a.ctid <> b.ctid;" + ) + logging.info(query) + self._execute_query(query) + logging.info(f"Attempting to drop duplicates for {table_name} table complete.") From 584952f866b32364bfa7691ba8532908bf3e4b88 Mon Sep 17 00:00:00 2001 From: dirksteynberg Date: Tue, 22 Mar 2022 13:28:35 +0200 Subject: [PATCH 2/3] Adding postgres writing functionality --- turbo_stream/google_analyitcs/reader.py | 35 ++++++++++++-------- turbo_stream/google_search_console/reader.py | 9 ++++- 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/turbo_stream/google_analyitcs/reader.py b/turbo_stream/google_analyitcs/reader.py index faad37a..39ed24c 100644 --- a/turbo_stream/google_analyitcs/reader.py +++ b/turbo_stream/google_analyitcs/reader.py @@ -26,11 +26,11 @@ class GoogleAnalyticsReader(ReaderInterface): """ def __init__( - self, - configuration: dict, - credentials: str, - service_account_email: str, - **kwargs, + self, + configuration: dict, + credentials: str, + service_account_email: str, + **kwargs, ): super().__init__(configuration, credentials) @@ -156,8 +156,6 @@ def _iterate_report(self, reports, view_id): for key, value in row_dict.items(): new_row_dict[key.replace("ga:", "")] = value - print(new_row_dict) - self._data_set.append(new_row_dict) def run_query(self): @@ -201,15 +199,23 @@ def run_query(self): return self._data_set def write_data_to_postgresql( - self, credentials: dict, table_name: str, truncate_on_insert=False + self, + credentials: dict, + table_name: str, + truncate_on_insert: bool = False, + deduplicate: bool = False, ): _writer = _PostgreSQLWriter(credentials=credentials) - _dimensions = [dim.replace("ga:", "") for dim in self._configuration.get("dimensions", [])] - _metrics = [met.replace("ga:", "") for met in self._configuration.get("metrics", [])] + _dimensions = [ + dim.replace("ga:", "") for dim in self._configuration.get("dimensions", []) + ] + _metrics = [ + met.replace("ga:", "") for met in self._configuration.get("metrics", []) + ] _schema = { - 'viewId': { + "viewId": { "type": "VARCHAR", "not_null": True, } @@ -227,12 +233,13 @@ def write_data_to_postgresql( "not_null": True, } - _writer._create_table( - table_name=table_name, schema=_schema - ) + _writer._create_table(table_name=table_name, schema=_schema) _writer._insert_table( table_name=table_name, dataset=self._data_set, truncate_on_insert=truncate_on_insert, ) + + if deduplicate: + _writer._drop_duplicates(table_name=table_name, schema=_schema) diff --git a/turbo_stream/google_search_console/reader.py b/turbo_stream/google_search_console/reader.py index a3bd0e4..64b8e35 100644 --- a/turbo_stream/google_search_console/reader.py +++ b/turbo_stream/google_search_console/reader.py @@ -220,7 +220,11 @@ def write_partition_data_to_s3( ) def write_data_to_postgresql( - self, credentials: dict, table_name: str, truncate_on_insert=False + self, + credentials: dict, + table_name: str, + truncate_on_insert: bool = False, + deduplicate: bool = False, ): _writer = _PostgreSQLWriter(credentials=credentials) _dimensions = self._configuration.get("dimensions", []) @@ -267,3 +271,6 @@ def write_data_to_postgresql( dataset=_dimension_dataset, truncate_on_insert=truncate_on_insert, ) + + if deduplicate: + _writer._drop_duplicates(table_name=table_name, schema=_schema) From 74db32375bdb9b16d5f63c91ebabd731749e7aa5 Mon Sep 17 00:00:00 2001 From: dirksteynberg Date: Tue, 22 Mar 2022 13:55:14 +0200 Subject: [PATCH 3/3] Adding postgres writing functionality --- environments/postgres-database-docker-compose.yml | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 environments/postgres-database-docker-compose.yml diff --git a/environments/postgres-database-docker-compose.yml b/environments/postgres-database-docker-compose.yml new file mode 100644 index 0000000..18bf680 --- /dev/null +++ b/environments/postgres-database-docker-compose.yml @@ -0,0 +1,15 @@ +version: '3.8' +services: + db: + image: postgres:14.1-alpine + restart: always + environment: + - POSTGRES_USER=postgres + - POSTGRES_PASSWORD=postgres + ports: + - '5432:5432' + volumes: + - db:/var/lib/postgresql/data +volumes: + db: + driver: local \ No newline at end of file