diff --git a/gcs/upload.py b/gcs/upload.py index b1985e8c..7bc5025b 100644 --- a/gcs/upload.py +++ b/gcs/upload.py @@ -18,6 +18,7 @@ import hashlib import itertools import json +import threading import types import uuid @@ -58,6 +59,7 @@ def init(cls, request, metadata, bucket, location, upload_id): media=b"", complete=False, transfer=set(), + lock=threading.Lock(), ) @classmethod @@ -585,7 +587,8 @@ def update_upload_checksums(upload_metadata, object_checksums): # Currently, the testbench will always checkpoint and flush data for testing purposes, # instead of the 15 seconds interval used in the GCS server. # TODO(#592): Refactor testbench checkpointing to more closely follow GCS server behavior. - upload.media += content + with upload.lock: + upload.media += content # Update appendable blob size and media here, as part of #720. # TODO(#720): (a) Update crc and update_time in object metadata. @@ -595,9 +598,11 @@ def update_upload_checksums(upload_metadata, object_checksums): update_upload_checksums(upload.metadata, object_checksums) def update_appendable_blob(blob, unused_generation): - blob.media = upload.media - blob.metadata.size = len(upload.media) - blob.metadata.checksums.crc32c = crc32c.crc32c(upload.media) + with upload.lock: + snapshot = bytes(upload.media) + blob.media = snapshot + blob.metadata.size = len(snapshot) + blob.metadata.checksums.crc32c = crc32c.crc32c(snapshot) return blob blob = db.do_update_object( @@ -632,11 +637,13 @@ def update_appendable_blob(blob, unused_generation): if is_appendable: def finalize_blob(blob, unused_generation): - blob.media = upload.media + with upload.lock: + snapshot = bytes(upload.media) + blob.media = snapshot blob.metadata.finalize_time.FromDatetime( datetime.datetime.now(datetime.timezone.utc) ) - blob.metadata.size = len(upload.media) + blob.metadata.size = len(snapshot) blob.upload = None blob.upload_gen = 0 return blob