From 951439b9a352c00ad01cf0beecd180746e4745f6 Mon Sep 17 00:00:00 2001 From: Minsu O Date: Fri, 28 Aug 2020 16:23:07 -0700 Subject: [PATCH 1/6] test 4.1 --- databuilder/extractor/neo4j_extractor.py | 5 +++-- databuilder/publisher/neo4j_csv_publisher.py | 13 +++++++------ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/databuilder/extractor/neo4j_extractor.py b/databuilder/extractor/neo4j_extractor.py index c0ae42442..892ef6c93 100644 --- a/databuilder/extractor/neo4j_extractor.py +++ b/databuilder/extractor/neo4j_extractor.py @@ -67,12 +67,13 @@ def _get_driver(self) -> Any: trust = neo4j.TRUST_SYSTEM_CA_SIGNED_CERTIFICATES if self.conf.get_bool(Neo4jExtractor.NEO4J_VALIDATE_SSL) \ else neo4j.TRUST_ALL_CERTIFICATES return GraphDatabase.driver(self.graph_url, - max_connection_life_time=self.conf.get_int( + max_connection_lifetime=self.conf.get_int( Neo4jExtractor.NEO4J_MAX_CONN_LIFE_TIME_SEC), auth=(self.conf.get_string(Neo4jExtractor.NEO4J_AUTH_USER), self.conf.get_string(Neo4jExtractor.NEO4J_AUTH_PW)), encrypted=self.conf.get_bool(Neo4jExtractor.NEO4J_ENCRYPTED), - trust=trust) + #trust=trust + ) def _execute_query(self, tx: Any) -> Any: """ diff --git a/databuilder/publisher/neo4j_csv_publisher.py b/databuilder/publisher/neo4j_csv_publisher.py index 8095b5c32..dd726690d 100644 --- a/databuilder/publisher/neo4j_csv_publisher.py +++ b/databuilder/publisher/neo4j_csv_publisher.py @@ -13,7 +13,7 @@ from neo4j import GraphDatabase, Transaction import neo4j -from neo4j.exceptions import CypherError +from neo4j.exceptions import Neo4jError from pyhocon import ConfigFactory from pyhocon import ConfigTree from typing import Set, List @@ -145,10 +145,11 @@ def init(self, conf: ConfigTree) -> None: else neo4j.TRUST_ALL_CERTIFICATES self._driver = \ GraphDatabase.driver(conf.get_string(NEO4J_END_POINT_KEY), - max_connection_life_time=conf.get_int(NEO4J_MAX_CONN_LIFE_TIME_SEC), + max_connection_lifetime=conf.get_int(NEO4J_MAX_CONN_LIFE_TIME_SEC), auth=(conf.get_string(NEO4J_USER), conf.get_string(NEO4J_PASSWORD)), encrypted=conf.get_bool(NEO4J_ENCRYPTED), - trust=trust) + #trust=trust + ) self._transaction_size = conf.get_int(NEO4J_TRANSCATION_SIZE) self._session = self._driver.session() self._confirm_rel_created = conf.get_bool(NEO4J_RELATIONSHIP_CREATION_CONFIRM) @@ -408,7 +409,7 @@ def _create_props_body(self, def _execute_statement(self, stmt: str, tx: Transaction, - params: bool=None, + params: dict=None, expect_result: bool=False) -> Transaction: """ Executes statement against Neo4j. If execution fails, it rollsback and raise exception. @@ -423,7 +424,7 @@ def _execute_statement(self, if LOGGER.isEnabledFor(logging.DEBUG): LOGGER.debug('Executing statement: {} with params {}'.format(stmt, params)) - result = tx.run(str(stmt).encode('utf-8', 'ignore'), parameters=params) + result = tx.run(str(stmt), parameters=params) if expect_result and not result.single(): raise RuntimeError('Failed to executed statement: {}'.format(stmt)) @@ -456,7 +457,7 @@ def _try_create_index(self, label: str) -> None: with self._driver.session() as session: try: session.run(stmt) - except CypherError as e: + except Neo4jError as e: if 'An equivalent constraint already exists' not in e.__str__(): raise # Else, swallow the exception, to make this function idempotent. From f99537b19e14d81cf37f625feff37ced1064e4d4 Mon Sep 17 00:00:00 2001 From: Minsu O Date: Sat, 29 Aug 2020 08:24:37 -0700 Subject: [PATCH 2/6] testing 4.1.1 --- requirements.txt | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index cb2b42f26..f2e477361 100644 --- a/requirements.txt +++ b/requirements.txt @@ -47,8 +47,8 @@ pyparsing==2.2.0 six>=1.11.0,<2.0.0 sqlalchemy>=1.3.0,<2.0 wheel==0.31.1 -neo4j-driver==1.7.2 -neotime==1.7.1 + + mypy==0.782 pytz==2018.4 statsd==3.2.1 @@ -59,3 +59,4 @@ httplib2>=0.18.0 unidecode requests==2.23.0,<3.0 +neo4j==4.1.1 From 40b00b9cb44334d9f8e75010ea59352e20bafac7 Mon Sep 17 00:00:00 2001 From: Minsu O Date: Sat, 29 Aug 2020 08:45:02 -0700 Subject: [PATCH 3/6] 4.1.1 --- requirements.txt | 5 ++--- setup.py | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/requirements.txt b/requirements.txt index f2e477361..cb2b42f26 100644 --- a/requirements.txt +++ b/requirements.txt @@ -47,8 +47,8 @@ pyparsing==2.2.0 six>=1.11.0,<2.0.0 sqlalchemy>=1.3.0,<2.0 wheel==0.31.1 - - +neo4j-driver==1.7.2 +neotime==1.7.1 mypy==0.782 pytz==2018.4 statsd==3.2.1 @@ -59,4 +59,3 @@ httplib2>=0.18.0 unidecode requests==2.23.0,<3.0 -neo4j==4.1.1 diff --git a/setup.py b/setup.py index e5396ffc3..f45ece076 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ requirements = [ - "neo4j-driver>=1.7.2,<4.0", + "neo4j==4.1.1", "pytz>=2018.4", "statsd>=3.2.1", "retrying>=1.3.3", From 3017cfc2d3f94d217a68f7d6719f2dbd0d335e05 Mon Sep 17 00:00:00 2001 From: Minsu O Date: Mon, 31 Aug 2020 13:43:06 -0700 Subject: [PATCH 4/6] return result set as list, possibly cause oom so maybe leave as iterator and update receivers --- databuilder/extractor/neo4j_extractor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/databuilder/extractor/neo4j_extractor.py b/databuilder/extractor/neo4j_extractor.py index 892ef6c93..f149f25f1 100644 --- a/databuilder/extractor/neo4j_extractor.py +++ b/databuilder/extractor/neo4j_extractor.py @@ -81,7 +81,8 @@ def _execute_query(self, tx: Any) -> Any: """ LOGGER.info('Executing query {}'.format(self.cypher_query)) result = tx.run(self.cypher_query) - return result + #return result + return [record for record in result] def _get_extract_iter(self) -> Iterator[Any]: """ From 945b22c61f6f0877975fb5e73abbf31d623ed45b Mon Sep 17 00:00:00 2001 From: Minsu O Date: Wed, 2 Sep 2020 01:50:48 -0700 Subject: [PATCH 5/6] update param 4.1 --- databuilder/task/neo4j_staleness_removal_task.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/databuilder/task/neo4j_staleness_removal_task.py b/databuilder/task/neo4j_staleness_removal_task.py index dc1512716..1337855f7 100644 --- a/databuilder/task/neo4j_staleness_removal_task.py +++ b/databuilder/task/neo4j_staleness_removal_task.py @@ -95,10 +95,11 @@ def init(self, conf: ConfigTree) -> None: else neo4j.TRUST_ALL_CERTIFICATES self._driver = \ GraphDatabase.driver(conf.get_string(NEO4J_END_POINT_KEY), - max_connection_life_time=conf.get_int(NEO4J_MAX_CONN_LIFE_TIME_SEC), + max_connection_lifetime=conf.get_int(NEO4J_MAX_CONN_LIFE_TIME_SEC), auth=(conf.get_string(NEO4J_USER), conf.get_string(NEO4J_PASSWORD)), encrypted=conf.get_bool(NEO4J_ENCRYPTED), - trust=trust) + #trust=trust + ) def run(self) -> None: """ From e606f88d4c309efbc244b9625eecdbbf952c0b7e Mon Sep 17 00:00:00 2001 From: Minsu O Date: Sat, 21 Nov 2020 19:27:00 -0800 Subject: [PATCH 6/6] update removal task --- databuilder/task/neo4j_staleness_removal_task.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/databuilder/task/neo4j_staleness_removal_task.py b/databuilder/task/neo4j_staleness_removal_task.py index 1337855f7..0ac858270 100644 --- a/databuilder/task/neo4j_staleness_removal_task.py +++ b/databuilder/task/neo4j_staleness_removal_task.py @@ -266,7 +266,8 @@ def _execute_cypher_query(self, start = time.time() try: with self._driver.session() as session: - return session.run(statement, **param_dict) + result = session.run(statement, **param_dict) + return [record for record in result] finally: if LOGGER.isEnabledFor(logging.DEBUG):