From 210b892e7df2ffcdba7977757d8a101bada3ee6c Mon Sep 17 00:00:00 2001
From: Dennis Weil
Date: Wed, 30 Jul 2025 10:54:56 +0200
Subject: [PATCH 1/8] Fix invalid escape sequences on newer Python versions

Regex patterns should always be raw strings.
---
 source/ftrack_api/query.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/source/ftrack_api/query.py b/source/ftrack_api/query.py
index 2443cf08..442f2098 100644
--- a/source/ftrack_api/query.py
+++ b/source/ftrack_api/query.py
@@ -10,8 +10,8 @@
 class QueryResult(collections.abc.Sequence):
     """Results from a query."""
 
-    OFFSET_EXPRESSION = re.compile("(?P<offset>offset (?P<value>\d+))")
-    LIMIT_EXPRESSION = re.compile("(?P<limit>limit (?P<value>\d+))")
+    OFFSET_EXPRESSION = re.compile(r"(?P<offset>offset (?P<value>\d+))")
+    LIMIT_EXPRESSION = re.compile(r"(?P<limit>limit (?P<value>\d+))")
 
     def __init__(self, session, expression, page_size=500):
         """Initialise result set.

From 39599575b01f24a64cf82b1edde430754bcb57de Mon Sep 17 00:00:00 2001
From: Dennis Weil
Date: Wed, 30 Jul 2025 12:57:28 +0200
Subject: [PATCH 2/8] Add lock decorator to critical methods

---
 source/ftrack_api/session.py | 65 +++++++++++++++++++-----------------
 1 file changed, 35 insertions(+), 30 deletions(-)

diff --git a/source/ftrack_api/session.py b/source/ftrack_api/session.py
index 71b38f3f..ae9eaf40 100644
--- a/source/ftrack_api/session.py
+++ b/source/ftrack_api/session.py
@@ -54,6 +54,13 @@
 from weakref import WeakMethod
 
+def synchronous(func):
+    """ Decorator to synchronize access to a method or function."""
+    lock = threading.RLock()
+    def wrapper(*args, **kwargs):
+        with lock:
+            return func(*args, **kwargs)
+    return wrapper
 
 class SessionAuthentication(requests.auth.AuthBase):
     """Attach ftrack session authentication information to requests."""
 
@@ -241,10 +248,6 @@ def __init__(
         if cache is not None:
             self.cache.caches.append(cache)
 
-        # Lock used for making sure only one thread at a time
-        # merges into the cache, updates or creates entities.
- self.merge_lock = threading.RLock() - self._managed_request = None self._request = requests.Session() @@ -895,46 +898,46 @@ def merge(self, value, merged=None): with self.operation_recording(False): return self._merge(value, merged) + @synchronous def _merge(self, value, merged): """Return merged *value*.""" log_debug = self.logger.isEnabledFor(logging.DEBUG) - with self.merge_lock: - if isinstance(value, ftrack_api.entity.base.Entity): - log_debug and self.logger.debug( - "Merging entity into session: {0} at {1}".format(value, id(value)) - ) + if isinstance(value, ftrack_api.entity.base.Entity): + log_debug and self.logger.debug( + "Merging entity into session: {0} at {1}".format(value, id(value)) + ) - return self._merge_entity(value, merged=merged) + return self._merge_entity(value, merged=merged) - elif isinstance(value, ftrack_api.collection.Collection): - log_debug and self.logger.debug( - "Merging collection into session: {0!r} at {1}".format( - value, id(value) - ) + elif isinstance(value, ftrack_api.collection.Collection): + log_debug and self.logger.debug( + "Merging collection into session: {0!r} at {1}".format( + value, id(value) ) + ) - merged_collection = [] - for entry in value: - merged_collection.append(self._merge(entry, merged=merged)) + merged_collection = [] + for entry in value: + merged_collection.append(self._merge(entry, merged=merged)) - return merged_collection + return merged_collection - elif isinstance(value, ftrack_api.collection.MappedCollectionProxy): - log_debug and self.logger.debug( - "Merging mapped collection into session: {0!r} at {1}".format( - value, id(value) - ) + elif isinstance(value, ftrack_api.collection.MappedCollectionProxy): + log_debug and self.logger.debug( + "Merging mapped collection into session: {0!r} at {1}".format( + value, id(value) ) + ) - merged_collection = [] - for entry in value.collection: - merged_collection.append(self._merge(entry, merged=merged)) + merged_collection = [] + for entry in value.collection: + merged_collection.append(self._merge(entry, merged=merged)) - return merged_collection + return merged_collection - else: - return value + else: + return value def _merge_recursive(self, entity, merged=None): """Merge *entity* and all its attributes recursivly.""" @@ -1151,6 +1154,7 @@ def populate(self, entities, projections): # repeated calls or perhaps raise an error? # TODO: Make atomic. + @synchronous def commit(self): """Commit all local changes to the server.""" batch = [] @@ -1331,6 +1335,7 @@ def commit(self): for entity in list(self._local_cache.values()): entity.clear() + @synchronous def rollback(self): """Clear all recorded operations and local state. 
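Note on the patch above: the lock in `synchronous` is created once, in the decorator's closure, at the moment the method is defined. Every `Session` instance therefore shares one lock per decorated method, so threads working on unrelated sessions still serialize against each other. A minimal, runnable sketch of that behaviour (the `Worker` class is hypothetical, standing in for `Session`):

    import threading

    def synchronous(func):
        # Created at decoration time: one lock per decorated function,
        # not per call and not per instance.
        lock = threading.RLock()

        def wrapper(*args, **kwargs):
            with lock:
                return func(*args, **kwargs)

        return wrapper

    class Worker:  # hypothetical stand-in for Session
        @synchronous
        def commit(self):
            print(threading.current_thread().name, "commits", id(self))

    # Two unrelated instances still take turns, because the lock belongs
    # to the shared function object, not to either instance.
    a, b = Worker(), Worker()
    threads = [threading.Thread(target=a.commit), threading.Thread(target=b.commit)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()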
From b87dd22b0e1aa71f96db815c3e0bce36357bac61 Mon Sep 17 00:00:00 2001
From: Dennis Weil
Date: Wed, 30 Jul 2025 15:07:56 +0200
Subject: [PATCH 3/8] Fix formatting

---
 source/ftrack_api/session.py | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/source/ftrack_api/session.py b/source/ftrack_api/session.py
index ae9eaf40..4ea88f22 100644
--- a/source/ftrack_api/session.py
+++ b/source/ftrack_api/session.py
@@ -54,14 +54,18 @@
 from weakref import WeakMethod
 
+
 def synchronous(func):
-    """ Decorator to synchronize access to a method or function."""
+    """Decorator to synchronize access to a method or function."""
     lock = threading.RLock()
+
     def wrapper(*args, **kwargs):
         with lock:
             return func(*args, **kwargs)
+
     return wrapper
 
+
 class SessionAuthentication(requests.auth.AuthBase):
     """Attach ftrack session authentication information to requests."""
 
@@ -912,9 +916,7 @@ def _merge(self, value, merged):
 
         elif isinstance(value, ftrack_api.collection.Collection):
             log_debug and self.logger.debug(
-                "Merging collection into session: {0!r} at {1}".format(
-                    value, id(value)
-                )
+                "Merging collection into session: {0!r} at {1}".format(value, id(value))
             )
 
             merged_collection = []

From 39c3675433c7710826f419554d5ee36d00292722 Mon Sep 17 00:00:00 2001
From: Dennis Weil
Date: Thu, 31 Jul 2025 12:44:46 +0200
Subject: [PATCH 4/8] Minor changes to docstring

---
 pyproject.toml               |  2 +-
 source/ftrack_api/session.py | 11 +++++------
 2 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 8bd20e08..df11bb2e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "ftrack-python-api"
-version = "0.1.0"
+version = "3.0.5.rc1"
 description = "Python API for ftrack."
 authors = ["ftrack "]
 repository = "https://github.com/ftrackhq/ftrack-python"

diff --git a/source/ftrack_api/session.py b/source/ftrack_api/session.py
index 4ea88f22..6feddd08 100644
--- a/source/ftrack_api/session.py
+++ b/source/ftrack_api/session.py
@@ -55,8 +55,8 @@
 from weakref import WeakMethod
 
 
-def synchronous(func):
-    """Decorator to synchronize access to a method or function."""
+def _synchronous(func):
+    """Decorator to synchronize access to a method or function across threads."""
     lock = threading.RLock()
 
     def wrapper(*args, **kwargs):
@@ -902,7 +902,7 @@ def merge(self, value, merged=None):
         with self.operation_recording(False):
             return self._merge(value, merged)
 
-    @synchronous
+    @_synchronous
     def _merge(self, value, merged):
         """Return merged *value*."""
         log_debug = self.logger.isEnabledFor(logging.DEBUG)
@@ -1155,8 +1155,7 @@ def populate(self, entities, projections):
         # actually populated? If some weren't would we mark that to avoid
         # repeated calls or perhaps raise an error?
 
-    # TODO: Make atomic.
-    @synchronous
+    @_synchronous
     def commit(self):
         """Commit all local changes to the server."""
         batch = []
@@ -1337,7 +1336,7 @@ def commit(self):
         for entity in list(self._local_cache.values()):
             entity.clear()
 
-    @synchronous
+    @_synchronous
     def rollback(self):
         """Clear all recorded operations and local state.

From 06d2b844f7e3cbb01ace0b9e63961b3a49e11f9e Mon Sep 17 00:00:00 2001
From: Dennis Weil
Date: Thu, 31 Jul 2025 16:12:51 +0200
Subject: [PATCH 5/8] Revert to using a context manager for locking

The implementation as a decorator had problems with different sessions
getting out of sync and not committing the same data, effectively
nullifying any database changes.
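The shape this patch moves to is one reentrant lock per session, created in `__init__` and taken with a `with` block inside each critical method. A simplified sketch of that pattern (the real methods follow in the diff); note that `RLock` rather than `Lock` matters here, because `commit()` holds the lock while merging server results via `_merge()`, which acquires the same lock again on the same thread:

    import threading

    class Session:  # simplified sketch, not the full class
        def __init__(self):
            # One lock per session: threads sharing this session
            # serialize, while independent sessions run concurrently.
            self._thread_lock = threading.RLock()

        def _merge(self, value, merged=None):
            with self._thread_lock:  # safely re-entered on the same thread
                return value  # real merge logic elided

        def commit(self):
            with self._thread_lock:
                return self._merge({})  # a plain Lock would deadlock here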
--- source/ftrack_api/session.py | 424 ++++++++++++++++++----------------- 1 file changed, 213 insertions(+), 211 deletions(-) diff --git a/source/ftrack_api/session.py b/source/ftrack_api/session.py index 6feddd08..8fc67d38 100644 --- a/source/ftrack_api/session.py +++ b/source/ftrack_api/session.py @@ -55,17 +55,6 @@ from weakref import WeakMethod -def _synchronous(func): - """Decorator to synchronize access to a method or function across threads.""" - lock = threading.RLock() - - def wrapper(*args, **kwargs): - with lock: - return func(*args, **kwargs) - - return wrapper - - class SessionAuthentication(requests.auth.AuthBase): """Attach ftrack session authentication information to requests.""" @@ -189,6 +178,7 @@ def __init__( super(Session, self).__init__() self.logger = logging.getLogger(__name__ + "." + self.__class__.__name__) self._closed = False + self._thread_lock = threading.RLock() if server_url is None: server_url = os.environ.get("FTRACK_SERVER") @@ -902,44 +892,46 @@ def merge(self, value, merged=None): with self.operation_recording(False): return self._merge(value, merged) - @_synchronous def _merge(self, value, merged): """Return merged *value*.""" log_debug = self.logger.isEnabledFor(logging.DEBUG) - if isinstance(value, ftrack_api.entity.base.Entity): - log_debug and self.logger.debug( - "Merging entity into session: {0} at {1}".format(value, id(value)) - ) + with self._thread_lock: + if isinstance(value, ftrack_api.entity.base.Entity): + log_debug and self.logger.debug( + "Merging entity into session: {0} at {1}".format(value, id(value)) + ) - return self._merge_entity(value, merged=merged) + return self._merge_entity(value, merged=merged) - elif isinstance(value, ftrack_api.collection.Collection): - log_debug and self.logger.debug( - "Merging collection into session: {0!r} at {1}".format(value, id(value)) - ) + elif isinstance(value, ftrack_api.collection.Collection): + log_debug and self.logger.debug( + "Merging collection into session: {0!r} at {1}".format( + value, id(value) + ) + ) - merged_collection = [] - for entry in value: - merged_collection.append(self._merge(entry, merged=merged)) + merged_collection = [] + for entry in value: + merged_collection.append(self._merge(entry, merged=merged)) - return merged_collection + return merged_collection - elif isinstance(value, ftrack_api.collection.MappedCollectionProxy): - log_debug and self.logger.debug( - "Merging mapped collection into session: {0!r} at {1}".format( - value, id(value) + elif isinstance(value, ftrack_api.collection.MappedCollectionProxy): + log_debug and self.logger.debug( + "Merging mapped collection into session: {0!r} at {1}".format( + value, id(value) + ) ) - ) - merged_collection = [] - for entry in value.collection: - merged_collection.append(self._merge(entry, merged=merged)) + merged_collection = [] + for entry in value.collection: + merged_collection.append(self._merge(entry, merged=merged)) - return merged_collection + return merged_collection - else: - return value + else: + return value def _merge_recursive(self, entity, merged=None): """Merge *entity* and all its attributes recursivly.""" @@ -1155,188 +1147,197 @@ def populate(self, entities, projections): # actually populated? If some weren't would we mark that to avoid # repeated calls or perhaps raise an error? - @_synchronous def commit(self): """Commit all local changes to the server.""" batch = [] - with self.auto_populating(False): - for operation in self.recorded_operations: - # Convert operation to payload. 
- if isinstance(operation, ftrack_api.operation.CreateEntityOperation): - # At present, data payload requires duplicating entity - # type in data and also ensuring primary key added. - entity_data = { - "__entity_type__": operation.entity_type, - } - entity_data.update(operation.entity_key) - entity_data.update(operation.entity_data) - - payload = OperationPayload( - { - "action": "create", - "entity_type": operation.entity_type, - "entity_key": list(operation.entity_key.values()), - "entity_data": entity_data, - } - ) - - elif isinstance(operation, ftrack_api.operation.UpdateEntityOperation): - entity_data = { + with self._thread_lock: + with self.auto_populating(False): + for operation in self.recorded_operations: + # Convert operation to payload. + if isinstance( + operation, ftrack_api.operation.CreateEntityOperation + ): # At present, data payload requires duplicating entity - # type. - "__entity_type__": operation.entity_type, - operation.attribute_name: operation.new_value, - } - - payload = OperationPayload( - { - "action": "update", - "entity_type": operation.entity_type, - "entity_key": list(operation.entity_key.values()), - "entity_data": entity_data, + # type in data and also ensuring primary key added. + entity_data = { + "__entity_type__": operation.entity_type, } - ) + entity_data.update(operation.entity_key) + entity_data.update(operation.entity_data) + + payload = OperationPayload( + { + "action": "create", + "entity_type": operation.entity_type, + "entity_key": list(operation.entity_key.values()), + "entity_data": entity_data, + } + ) - elif isinstance(operation, ftrack_api.operation.DeleteEntityOperation): - payload = OperationPayload( - { - "action": "delete", - "entity_type": operation.entity_type, - "entity_key": list(operation.entity_key.values()), + elif isinstance( + operation, ftrack_api.operation.UpdateEntityOperation + ): + entity_data = { + # At present, data payload requires duplicating entity + # type. + "__entity_type__": operation.entity_type, + operation.attribute_name: operation.new_value, } - ) - else: - raise ValueError( - "Cannot commit. Unrecognised operation type {0} " - "detected.".format(type(operation)) - ) + payload = OperationPayload( + { + "action": "update", + "entity_type": operation.entity_type, + "entity_key": list(operation.entity_key.values()), + "entity_data": entity_data, + } + ) - batch.append(payload) + elif isinstance( + operation, ftrack_api.operation.DeleteEntityOperation + ): + payload = OperationPayload( + { + "action": "delete", + "entity_type": operation.entity_type, + "entity_key": list(operation.entity_key.values()), + } + ) - # Optimise batch. - # TODO: Might be better to perform these on the operations list instead - # so all operation contextual information available. + else: + raise ValueError( + "Cannot commit. Unrecognised operation type {0} " + "detected.".format(type(operation)) + ) - # If entity was created and deleted in one batch then remove all - # payloads for that entity. - created = set() - deleted = set() + batch.append(payload) - for payload in batch: - if payload["action"] == "create": - created.add((payload["entity_type"], str(payload["entity_key"]))) + # Optimise batch. + # TODO: Might be better to perform these on the operations list instead + # so all operation contextual information available. - elif payload["action"] == "delete": - deleted.add((payload["entity_type"], str(payload["entity_key"]))) + # If entity was created and deleted in one batch then remove all + # payloads for that entity. 
+ created = set() + deleted = set() - created_then_deleted = deleted.intersection(created) - if created_then_deleted: - optimised_batch = [] for payload in batch: - entity_type = payload.get("entity_type") - entity_key = str(payload.get("entity_key")) + if payload["action"] == "create": + created.add((payload["entity_type"], str(payload["entity_key"]))) - if (entity_type, entity_key) in created_then_deleted: - continue + elif payload["action"] == "delete": + deleted.add((payload["entity_type"], str(payload["entity_key"]))) - optimised_batch.append(payload) - - batch = optimised_batch + created_then_deleted = deleted.intersection(created) + if created_then_deleted: + optimised_batch = [] + for payload in batch: + entity_type = payload.get("entity_type") + entity_key = str(payload.get("entity_key")) - # Remove early update operations so that only last operation on - # attribute is applied server side. - updates_map = set() - for payload in reversed(batch): - if payload["action"] in ("update",): - for key, value in list(payload["entity_data"].items()): - if key == "__entity_type__": + if (entity_type, entity_key) in created_then_deleted: continue - identity = (payload["entity_type"], str(payload["entity_key"]), key) - if identity in updates_map: - del payload["entity_data"][key] - else: - updates_map.add(identity) - - # Remove NOT_SET values from entity_data. - for payload in batch: - entity_data = payload.get("entity_data", {}) - for key, value in list(entity_data.items()): - if value is ftrack_api.symbol.NOT_SET: - del entity_data[key] - - # Remove payloads with redundant entity_data. - optimised_batch = [] - for payload in batch: - entity_data = payload.get("entity_data") - if entity_data is not None: - keys = list(entity_data.keys()) - if not keys or keys == ["__entity_type__"]: - continue + optimised_batch.append(payload) - optimised_batch.append(payload) + batch = optimised_batch - batch = optimised_batch + # Remove early update operations so that only last operation on + # attribute is applied server side. + updates_map = set() + for payload in reversed(batch): + if payload["action"] in ("update",): + for key, value in list(payload["entity_data"].items()): + if key == "__entity_type__": + continue - # Collapse updates that are consecutive into one payload. Also, collapse - # updates that occur immediately after creation into the create payload. - optimised_batch = [] - previous_payload = None + identity = ( + payload["entity_type"], + str(payload["entity_key"]), + key, + ) + if identity in updates_map: + del payload["entity_data"][key] + else: + updates_map.add(identity) - for payload in batch: - if ( - previous_payload is not None - and payload["action"] == "update" - and previous_payload["action"] in ("create", "update") - and previous_payload["entity_type"] == payload["entity_type"] - and previous_payload["entity_key"] == payload["entity_key"] - ): - previous_payload["entity_data"].update(payload["entity_data"]) - continue + # Remove NOT_SET values from entity_data. + for payload in batch: + entity_data = payload.get("entity_data", {}) + for key, value in list(entity_data.items()): + if value is ftrack_api.symbol.NOT_SET: + del entity_data[key] + + # Remove payloads with redundant entity_data. 
+ optimised_batch = [] + for payload in batch: + entity_data = payload.get("entity_data") + if entity_data is not None: + keys = list(entity_data.keys()) + if not keys or keys == ["__entity_type__"]: + continue - else: optimised_batch.append(payload) - previous_payload = payload - batch = optimised_batch + batch = optimised_batch - # Process batch. - if batch: - result = self.call(batch) + # Collapse updates that are consecutive into one payload. Also, collapse + # updates that occur immediately after creation into the create payload. + optimised_batch = [] + previous_payload = None - # Clear recorded operations. - self.recorded_operations.clear() + for payload in batch: + if ( + previous_payload is not None + and payload["action"] == "update" + and previous_payload["action"] in ("create", "update") + and previous_payload["entity_type"] == payload["entity_type"] + and previous_payload["entity_key"] == payload["entity_key"] + ): + previous_payload["entity_data"].update(payload["entity_data"]) + continue - # As optimisation, clear local values which are not primary keys to - # avoid redundant merges when merging references. Note: primary keys - # remain as needed for cache retrieval on new entities. - with self.auto_populating(False): - with self.operation_recording(False): - for entity in list(self._local_cache.values()): - for attribute in entity: - if attribute not in entity.primary_key_attributes: - del entity[attribute] - - # Process results merging into cache relevant data. - for entry in result: - if entry["action"] in ("create", "update"): - # Merge returned entities into local cache. - self.merge(entry["data"]) - - elif entry["action"] == "delete": - # TODO: Detach entity - need identity returned? - # TODO: Expunge entity from cache. - pass - # Clear remaining local state, including local values for primary - # keys on entities that were merged. - with self.auto_populating(False): - with self.operation_recording(False): - for entity in list(self._local_cache.values()): - entity.clear() + else: + optimised_batch.append(payload) + previous_payload = payload + + batch = optimised_batch + + # Process batch. + if batch: + result = self.call(batch) + + # Clear recorded operations. + self.recorded_operations.clear() + + # As optimisation, clear local values which are not primary keys to + # avoid redundant merges when merging references. Note: primary keys + # remain as needed for cache retrieval on new entities. + with self.auto_populating(False): + with self.operation_recording(False): + for entity in list(self._local_cache.values()): + for attribute in entity: + if attribute not in entity.primary_key_attributes: + del entity[attribute] + + # Process results merging into cache relevant data. + for entry in result: + if entry["action"] in ("create", "update"): + # Merge returned entities into local cache. + self.merge(entry["data"]) + + elif entry["action"] == "delete": + # TODO: Detach entity - need identity returned? + # TODO: Expunge entity from cache. + pass + # Clear remaining local state, including local values for primary + # keys on entities that were merged. + with self.auto_populating(False): + with self.operation_recording(False): + for entity in list(self._local_cache.values()): + entity.clear() - @_synchronous def rollback(self): """Clear all recorded operations and local state. @@ -1349,34 +1350,35 @@ def rollback(self): doing so could cause errors. 
""" - with self.auto_populating(False): - with self.operation_recording(False): - # Detach all newly created entities and remove from cache. This - # is done because simply clearing the local values of newly - # created entities would result in entities with no identity as - # primary key was local while not persisted. In addition, it - # makes no sense for failed created entities to exist in session - # or cache. - for operation in self.recorded_operations: - if isinstance( - operation, ftrack_api.operation.CreateEntityOperation - ): - entity_key = str( - ( - str(operation.entity_type), - list(operation.entity_key.values()), + with self._thread_lock: + with self.auto_populating(False): + with self.operation_recording(False): + # Detach all newly created entities and remove from cache. This + # is done because simply clearing the local values of newly + # created entities would result in entities with no identity as + # primary key was local while not persisted. In addition, it + # makes no sense for failed created entities to exist in session + # or cache. + for operation in self.recorded_operations: + if isinstance( + operation, ftrack_api.operation.CreateEntityOperation + ): + entity_key = str( + ( + str(operation.entity_type), + list(operation.entity_key.values()), + ) ) - ) - try: - self.cache.remove(entity_key) - except KeyError: - pass + try: + self.cache.remove(entity_key) + except KeyError: + pass - # Clear locally stored modifications on remaining entities. - for entity in list(self._local_cache.values()): - entity.clear() + # Clear locally stored modifications on remaining entities. + for entity in list(self._local_cache.values()): + entity.clear() - self.recorded_operations.clear() + self.recorded_operations.clear() def _fetch_server_information(self): """Return server information.""" From f1e2d8eb881d40a956d478b50d16ec68ee8426f2 Mon Sep 17 00:00:00 2001 From: Dennis Weil Date: Thu, 31 Jul 2025 16:17:49 +0200 Subject: [PATCH 6/8] Move lock to original location in init --- source/ftrack_api/session.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/ftrack_api/session.py b/source/ftrack_api/session.py index 8fc67d38..3e28da58 100644 --- a/source/ftrack_api/session.py +++ b/source/ftrack_api/session.py @@ -178,7 +178,6 @@ def __init__( super(Session, self).__init__() self.logger = logging.getLogger(__name__ + "." + self.__class__.__name__) self._closed = False - self._thread_lock = threading.RLock() if server_url is None: server_url = os.environ.get("FTRACK_SERVER") @@ -242,6 +241,7 @@ def __init__( if cache is not None: self.cache.caches.append(cache) + self._thread_lock = threading.RLock() self._managed_request = None self._request = requests.Session() From f9106a7d1bf88bd1e336c4fe466a731d7489c381 Mon Sep 17 00:00:00 2001 From: Dennis Weil Date: Mon, 4 Aug 2025 11:30:44 +0200 Subject: [PATCH 7/8] Use modern context manager syntax --- source/ftrack_api/session.py | 358 +++++++++++++++++------------------ 1 file changed, 174 insertions(+), 184 deletions(-) diff --git a/source/ftrack_api/session.py b/source/ftrack_api/session.py index 3e28da58..4204de37 100644 --- a/source/ftrack_api/session.py +++ b/source/ftrack_api/session.py @@ -1151,192 +1151,183 @@ def commit(self): """Commit all local changes to the server.""" batch = [] - with self._thread_lock: - with self.auto_populating(False): - for operation in self.recorded_operations: - # Convert operation to payload. 
- if isinstance( - operation, ftrack_api.operation.CreateEntityOperation - ): + with self._thread_lock, self.auto_populating(False): + for operation in self.recorded_operations: + # Convert operation to payload. + if isinstance(operation, ftrack_api.operation.CreateEntityOperation): + # At present, data payload requires duplicating entity + # type in data and also ensuring primary key added. + entity_data = { + "__entity_type__": operation.entity_type, + } + entity_data.update(operation.entity_key) + entity_data.update(operation.entity_data) + + payload = OperationPayload( + { + "action": "create", + "entity_type": operation.entity_type, + "entity_key": list(operation.entity_key.values()), + "entity_data": entity_data, + } + ) + + elif isinstance(operation, ftrack_api.operation.UpdateEntityOperation): + entity_data = { # At present, data payload requires duplicating entity - # type in data and also ensuring primary key added. - entity_data = { - "__entity_type__": operation.entity_type, + # type. + "__entity_type__": operation.entity_type, + operation.attribute_name: operation.new_value, + } + + payload = OperationPayload( + { + "action": "update", + "entity_type": operation.entity_type, + "entity_key": list(operation.entity_key.values()), + "entity_data": entity_data, } - entity_data.update(operation.entity_key) - entity_data.update(operation.entity_data) - - payload = OperationPayload( - { - "action": "create", - "entity_type": operation.entity_type, - "entity_key": list(operation.entity_key.values()), - "entity_data": entity_data, - } - ) + ) - elif isinstance( - operation, ftrack_api.operation.UpdateEntityOperation - ): - entity_data = { - # At present, data payload requires duplicating entity - # type. - "__entity_type__": operation.entity_type, - operation.attribute_name: operation.new_value, + elif isinstance(operation, ftrack_api.operation.DeleteEntityOperation): + payload = OperationPayload( + { + "action": "delete", + "entity_type": operation.entity_type, + "entity_key": list(operation.entity_key.values()), } + ) - payload = OperationPayload( - { - "action": "update", - "entity_type": operation.entity_type, - "entity_key": list(operation.entity_key.values()), - "entity_data": entity_data, - } - ) + else: + raise ValueError( + "Cannot commit. Unrecognised operation type {0} " + "detected.".format(type(operation)) + ) - elif isinstance( - operation, ftrack_api.operation.DeleteEntityOperation - ): - payload = OperationPayload( - { - "action": "delete", - "entity_type": operation.entity_type, - "entity_key": list(operation.entity_key.values()), - } - ) + batch.append(payload) - else: - raise ValueError( - "Cannot commit. Unrecognised operation type {0} " - "detected.".format(type(operation)) - ) + # Optimise batch. + # TODO: Might be better to perform these on the operations list instead + # so all operation contextual information available. - batch.append(payload) + # If entity was created and deleted in one batch then remove all + # payloads for that entity. + created = set() + deleted = set() - # Optimise batch. - # TODO: Might be better to perform these on the operations list instead - # so all operation contextual information available. + for payload in batch: + if payload["action"] == "create": + created.add((payload["entity_type"], str(payload["entity_key"]))) - # If entity was created and deleted in one batch then remove all - # payloads for that entity. 
- created = set() - deleted = set() + elif payload["action"] == "delete": + deleted.add((payload["entity_type"], str(payload["entity_key"]))) + created_then_deleted = deleted.intersection(created) + if created_then_deleted: + optimised_batch = [] for payload in batch: - if payload["action"] == "create": - created.add((payload["entity_type"], str(payload["entity_key"]))) + entity_type = payload.get("entity_type") + entity_key = str(payload.get("entity_key")) - elif payload["action"] == "delete": - deleted.add((payload["entity_type"], str(payload["entity_key"]))) + if (entity_type, entity_key) in created_then_deleted: + continue - created_then_deleted = deleted.intersection(created) - if created_then_deleted: - optimised_batch = [] - for payload in batch: - entity_type = payload.get("entity_type") - entity_key = str(payload.get("entity_key")) + optimised_batch.append(payload) - if (entity_type, entity_key) in created_then_deleted: - continue + batch = optimised_batch - optimised_batch.append(payload) + # Remove early update operations so that only last operation on + # attribute is applied server side. + updates_map = set() + for payload in reversed(batch): + if payload["action"] in ("update",): + for key, value in list(payload["entity_data"].items()): + if key == "__entity_type__": + continue - batch = optimised_batch + identity = ( + payload["entity_type"], + str(payload["entity_key"]), + key, + ) + if identity in updates_map: + del payload["entity_data"][key] + else: + updates_map.add(identity) + + # Remove NOT_SET values from entity_data. + for payload in batch: + entity_data = payload.get("entity_data", {}) + for key, value in list(entity_data.items()): + if value is ftrack_api.symbol.NOT_SET: + del entity_data[key] + + # Remove payloads with redundant entity_data. + optimised_batch = [] + for payload in batch: + entity_data = payload.get("entity_data") + if entity_data is not None: + keys = list(entity_data.keys()) + if not keys or keys == ["__entity_type__"]: + continue - # Remove early update operations so that only last operation on - # attribute is applied server side. - updates_map = set() - for payload in reversed(batch): - if payload["action"] in ("update",): - for key, value in list(payload["entity_data"].items()): - if key == "__entity_type__": - continue + optimised_batch.append(payload) - identity = ( - payload["entity_type"], - str(payload["entity_key"]), - key, - ) - if identity in updates_map: - del payload["entity_data"][key] - else: - updates_map.add(identity) + batch = optimised_batch - # Remove NOT_SET values from entity_data. - for payload in batch: - entity_data = payload.get("entity_data", {}) - for key, value in list(entity_data.items()): - if value is ftrack_api.symbol.NOT_SET: - del entity_data[key] + # Collapse updates that are consecutive into one payload. Also, collapse + # updates that occur immediately after creation into the create payload. + optimised_batch = [] + previous_payload = None - # Remove payloads with redundant entity_data. 
- optimised_batch = [] - for payload in batch: - entity_data = payload.get("entity_data") - if entity_data is not None: - keys = list(entity_data.keys()) - if not keys or keys == ["__entity_type__"]: - continue + for payload in batch: + if ( + previous_payload is not None + and payload["action"] == "update" + and previous_payload["action"] in ("create", "update") + and previous_payload["entity_type"] == payload["entity_type"] + and previous_payload["entity_key"] == payload["entity_key"] + ): + previous_payload["entity_data"].update(payload["entity_data"]) + continue + else: optimised_batch.append(payload) + previous_payload = payload - batch = optimised_batch + batch = optimised_batch - # Collapse updates that are consecutive into one payload. Also, collapse - # updates that occur immediately after creation into the create payload. - optimised_batch = [] - previous_payload = None + # Process batch. + if batch: + result = self.call(batch) - for payload in batch: - if ( - previous_payload is not None - and payload["action"] == "update" - and previous_payload["action"] in ("create", "update") - and previous_payload["entity_type"] == payload["entity_type"] - and previous_payload["entity_key"] == payload["entity_key"] - ): - previous_payload["entity_data"].update(payload["entity_data"]) - continue - - else: - optimised_batch.append(payload) - previous_payload = payload - - batch = optimised_batch + # Clear recorded operations. + self.recorded_operations.clear() - # Process batch. - if batch: - result = self.call(batch) - - # Clear recorded operations. - self.recorded_operations.clear() - - # As optimisation, clear local values which are not primary keys to - # avoid redundant merges when merging references. Note: primary keys - # remain as needed for cache retrieval on new entities. - with self.auto_populating(False): - with self.operation_recording(False): - for entity in list(self._local_cache.values()): - for attribute in entity: - if attribute not in entity.primary_key_attributes: - del entity[attribute] - - # Process results merging into cache relevant data. - for entry in result: - if entry["action"] in ("create", "update"): - # Merge returned entities into local cache. - self.merge(entry["data"]) - - elif entry["action"] == "delete": - # TODO: Detach entity - need identity returned? - # TODO: Expunge entity from cache. - pass - # Clear remaining local state, including local values for primary - # keys on entities that were merged. - with self.auto_populating(False): - with self.operation_recording(False): - for entity in list(self._local_cache.values()): - entity.clear() + # As optimisation, clear local values which are not primary keys to + # avoid redundant merges when merging references. Note: primary keys + # remain as needed for cache retrieval on new entities. + with self.auto_populating(False), self.operation_recording(False): + for entity in list(self._local_cache.values()): + for attribute in entity: + if attribute not in entity.primary_key_attributes: + del entity[attribute] + + # Process results merging into cache relevant data. + for entry in result: + if entry["action"] in ("create", "update"): + # Merge returned entities into local cache. + self.merge(entry["data"]) + + elif entry["action"] == "delete": + # TODO: Detach entity - need identity returned? + # TODO: Expunge entity from cache. + pass + # Clear remaining local state, including local values for primary + # keys on entities that were merged. 
+ with self.auto_populating(False), self.operation_recording(False): + for entity in list(self._local_cache.values()): + entity.clear() def rollback(self): """Clear all recorded operations and local state. @@ -1351,32 +1342,31 @@ def rollback(self): """ with self._thread_lock: - with self.auto_populating(False): - with self.operation_recording(False): - # Detach all newly created entities and remove from cache. This - # is done because simply clearing the local values of newly - # created entities would result in entities with no identity as - # primary key was local while not persisted. In addition, it - # makes no sense for failed created entities to exist in session - # or cache. - for operation in self.recorded_operations: - if isinstance( - operation, ftrack_api.operation.CreateEntityOperation - ): - entity_key = str( - ( - str(operation.entity_type), - list(operation.entity_key.values()), - ) + with self.auto_populating(False), self.operation_recording(False): + # Detach all newly created entities and remove from cache. This + # is done because simply clearing the local values of newly + # created entities would result in entities with no identity as + # primary key was local while not persisted. In addition, it + # makes no sense for failed created entities to exist in session + # or cache. + for operation in self.recorded_operations: + if isinstance( + operation, ftrack_api.operation.CreateEntityOperation + ): + entity_key = str( + ( + str(operation.entity_type), + list(operation.entity_key.values()), ) - try: - self.cache.remove(entity_key) - except KeyError: - pass - - # Clear locally stored modifications on remaining entities. - for entity in list(self._local_cache.values()): - entity.clear() + ) + try: + self.cache.remove(entity_key) + except KeyError: + pass + + # Clear locally stored modifications on remaining entities. + for entity in list(self._local_cache.values()): + entity.clear() self.recorded_operations.clear() From 7fbc6f3f559523f42f4772e77e8062169ecd12a1 Mon Sep 17 00:00:00 2001 From: Dennis Weil Date: Mon, 4 Aug 2025 15:36:35 +0200 Subject: [PATCH 8/8] Revert project version to default --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index df11bb2e..8bd20e08 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "ftrack-python-api" -version = "3.0.5.rc1" +version = "0.1.0" description = "Python API for ftrack." authors = ["ftrack "] repository = "https://github.com/ftrackhq/ftrack-python"
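For reference, the "modern context manager syntax" adopted in patch 7 is the multi-manager form of the `with` statement: `with a, b:` is equivalent to nesting `with a:` around `with b:`, entering managers left to right and exiting them right to left. A small self-contained illustration (the `manager` helper is a toy stand-in for `_thread_lock`, `auto_populating` and `operation_recording`):

    from contextlib import contextmanager

    @contextmanager
    def manager(name):
        print("enter", name)
        yield
        print("exit", name)

    # Nested form, as in patches 5 and 6:
    with manager("lock"):
        with manager("auto_populating"):
            pass

    # Equivalent flat form, as used in patch 7:
    with manager("lock"), manager("auto_populating"):
        pass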