Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ is expected to be used by Storage library maintainers.
- [Delete a Retry Test resource](#delete-a-retry-test-resource)
- [Causing a failure using x-retry-test-id header](#causing-a-failure-using-x-retry-test-id-header)
- [Forced Failures Supported](#forced-failures-supported)
- [Storage Control API Stall Support](#storage-control-api-stall-support)
- [Developing for the testbench](#developing-for-the-testbench)
- [Writing and running tests](#writing-and-running-tests)
- [Releasing the testbench](#releasing-the-testbench)
Expand Down Expand Up @@ -277,11 +278,42 @@ curl -H "x-retry-test-id: 1d05c20627844214a9ff7cbcf696317d" "http://localhost:91
| redirect-send-token-T | [HTTP] Unsupported [GRPC] Testbench will fail the RPC with `ABORTED` and include appropriate redirection error details.
| redirect-send-handle-and-token-T | [HTTP] Unsupported [GRPC] Testbench will fail the RPC with `ABORTED` and include appropriate redirection error details.

## Storage Control API Stall Support

The testbench supports stall functionality for the Storage Control API (gRPC only) to test client retry behavior. All folder operations and storage layout operations can be delayed using the `x-goog-emulator-instructions` metadata header.

**Supported operations:**
- **Folder operations:** `CreateFolder`, `DeleteFolder`, `GetFolder`, `ListFolders`, `RenameFolder`
- **Storage layout operations:** `GetStorageLayout`

> **Note:** The Storage Control API uses the **same gRPC server** as the Storage API. Both services are available on the same port (e.g., port 8888 if started with `curl "http://localhost:9000/start_grpc?port=8888"`).

**Supported stall instruction:**
- `stall-for-Ns`: Stalls for N seconds (e.g., `stall-for-3s` stalls for 3 seconds)

**Example usage in Python:**
```python
import grpc
from google.storage.control.v2 import storage_control_pb2, storage_control_pb2_grpc

# Connect to the same gRPC server port (8888) started earlier
channel = grpc.insecure_channel('localhost:8888')
stub = storage_control_pb2_grpc.StorageControlStub(channel)

# Create folder with 2-second stall
metadata = [('x-goog-emulator-instructions', 'stall-for-2s')]
request = storage_control_pb2.CreateFolderRequest(
parent="projects/_/buckets/test-bucket",
folder_id="test-folder"
)
response = stub.CreateFolder(request, metadata=metadata)
```

## Developing for the testbench

### Writing and running tests

Tests are located in the `tests/` directory. To run the tests locally, use
Tests are located in the `tests/` directory. To run the tests locally, use

```bash
python -m unittest [test_module.py] # runs all the tests in test_module.py
Expand All @@ -299,4 +331,4 @@ Steps:
1. Title "v0.x.x"
1. Click Generate release notes
1. Make sure "Set as the latest release" is checked
1. Click "Publish Release" to release
1. Click "Publish Release" to release
418 changes: 418 additions & 0 deletions google/storage/control/v2/storage_control_pb2.py

Large diffs are not rendered by default.

1,275 changes: 1,275 additions & 0 deletions google/storage/control/v2/storage_control_pb2_grpc.py

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
"Operating System :: OS Independent",
],
packages=[
"google",
"google/storage",
"google/storage/control",
"google/storage/control/v2",
"google/storage/v2",
"google/iam/v1",
"testbench",
Expand Down
63 changes: 62 additions & 1 deletion testbench/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def __init__(
retry_tests,
supported_methods,
soft_deleted_objects,
folders=None,
):
self._resources_lock = threading.RLock()
self._buckets = buckets
Expand All @@ -59,9 +60,12 @@ def __init__(
self._projects_lock = threading.RLock()
self._projects = {}

self._folders_lock = threading.RLock()
self._folders = folders if folders is not None else {}

@classmethod
def init(cls):
return cls({}, {}, {}, {}, {}, {}, [], {})
return cls({}, {}, {}, {}, {}, {}, [], {}, {})

def clear(self):
"""Clear all data except for the supported method list."""
Expand All @@ -76,6 +80,8 @@ def clear(self):
self._rewrites = {}
with self._retry_tests_lock:
self._retry_tests = {}
with self._folders_lock:
self._folders = {}
# The list of supported methods for `retry_test` is defined via flask
# decorators, it should remain unchanged after the test or application
# is initialized. Arguably this means it should be in a global variable.
Expand Down Expand Up @@ -790,3 +796,58 @@ def delete_retry_test(self, retry_test_id):
with self._retry_tests_lock:
self.get_retry_test(retry_test_id)
del self._retry_tests[retry_test_id]

# === FOLDER OPERATIONS === #

def insert_folder(self, folder_name, folder, context):
"""Insert a folder into the database."""
with self._folders_lock:
if folder_name in self._folders:
testbench.error.already_exists(
"Folder %s already exists" % folder_name, context
)
self._folders[folder_name] = folder
return folder

def get_folder(self, folder_name, context):
"""Get a folder from the database."""
with self._folders_lock:
folder = self._folders.get(folder_name)
if folder is None:
testbench.error.notfound("Folder %s" % folder_name, context)
return folder

def delete_folder(self, folder_name, context):
"""Delete a folder from the database."""
with self._folders_lock:
if folder_name not in self._folders:
testbench.error.notfound("Folder %s" % folder_name, context)
del self._folders[folder_name]

def list_folders(self, bucket_name, prefix, context):
"""List folders in a bucket with optional prefix filter."""
with self._folders_lock:
folders = []
for folder_name, folder in self._folders.items():
# Filter by bucket
if not folder_name.startswith(bucket_name):
continue
# Filter by prefix if provided
if prefix and not folder_name.startswith(f"{bucket_name}/{prefix}"):
continue
folders.append(folder)
return folders

def rename_folder(self, src_folder_name, dst_folder_name, context):
"""Rename a folder."""
with self._folders_lock:
if src_folder_name not in self._folders:
testbench.error.notfound("Source folder %s" % src_folder_name, context)
if dst_folder_name in self._folders:
testbench.error.already_exists(
"Destination folder %s already exists" % dst_folder_name, context
)
folder = self._folders[src_folder_name]
del self._folders[src_folder_name]
self._folders[dst_folder_name] = folder
return folder
110 changes: 110 additions & 0 deletions testbench/grpc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import gcs
import testbench
from google.iam.v1 import iam_policy_pb2
from google.storage.control.v2 import storage_control_pb2, storage_control_pb2_grpc
from google.storage.v2 import storage_pb2, storage_pb2_grpc

_GRPC_SERVER_THREAD_COUNT = 2
Expand Down Expand Up @@ -1163,13 +1164,122 @@ def QueryWriteStatus(self, request, context):
return storage_pb2.QueryWriteStatusResponse(persisted_size=len(upload.media))


# === STORAGE CONTROL SERVICER === #


@decorate_all_rpc_methods
class StorageControlServicer(storage_control_pb2_grpc.StorageControlServicer):
"""Implements the google.storage.control.v2.StorageControl gRPC service."""

def __init__(self, db, echo_metadata=False):
self.db = db
self.db.insert_test_bucket()
self.echo_metadata = echo_metadata

def _apply_stall(self, context):
"""Check for stall instructions and apply delay if needed."""
import time
Comment thread
gargnitingoogle marked this conversation as resolved.

instruction = testbench.common.extract_instruction(None, context)
if instruction and "stall" in instruction:
# Parse stall instruction (e.g., "stall-for-1s" or "stall-for-500ms")
if instruction.startswith("stall-for-"):
# Check for milliseconds.
match_ms = re.match(r"stall-for-(\d+)ms", instruction)
if match_ms:
time.sleep(int(match_ms.group(1)) / 1000.0)
return
# Check for seconds.
match_s = re.match(r"stall-for-(\d+)s", instruction)
if match_s:
time.sleep(int(match_s.group(1)))

@retry_test(method="storage.folders.create")
def CreateFolder(self, request, context):
self._apply_stall(context)
# Create a simple folder metadata
folder = storage_control_pb2.Folder()
# The name should include the full path
folder.name = f"{request.parent}/folders/{request.folder_id}"
folder.metageneration = 1
folder.create_time.FromDatetime(datetime.datetime.now(datetime.timezone.utc))
folder.update_time.CopyFrom(folder.create_time)

# Store in database using full name as key
self.db.insert_folder(folder.name, folder, context)
return folder

@retry_test(method="storage.folders.delete")
def DeleteFolder(self, request, context):
self._apply_stall(context)
folder_key = request.name
self.db.delete_folder(folder_key, context)
return empty_pb2.Empty()

@retry_test(method="storage.folders.get")
def GetFolder(self, request, context):
self._apply_stall(context)
folder_key = request.name
return self.db.get_folder(folder_key, context)

@retry_test(method="storage.folders.list")
def ListFolders(self, request, context):
self._apply_stall(context)
# Extract bucket from parent (format: "projects/_/buckets/{bucket}")
bucket_name = request.parent
prefix = request.prefix if hasattr(request, "prefix") else ""
Comment thread
gargnitingoogle marked this conversation as resolved.

folders = self.db.list_folders(bucket_name, prefix, context)
return storage_control_pb2.ListFoldersResponse(folders=folders)

@retry_test(method="storage.folders.rename")
def RenameFolder(self, request, context):
self._apply_stall(context)
src_folder = request.name
dst_folder = request.destination_folder_id

folder = self.db.rename_folder(src_folder, dst_folder, context)
return folder

@retry_test(method="storage.storageLayout.get")
def GetStorageLayout(self, request, context):
self._apply_stall(context)

# Extract bucket path from request.name which is "projects/_/buckets/bucket_name/storageLayout"
bucket_path = request.name.replace("/storageLayout", "")
bucket = self.db.get_bucket(bucket_path, context)

if bucket is None:
return None
Comment thread
gargnitingoogle marked this conversation as resolved.

# Create a simple storage layout response
layout = storage_control_pb2.StorageLayout()
layout.name = request.name

# Set default location and location_type
layout.location = bucket.metadata.location if bucket.metadata.location else "US"
layout.location_type = "multi-region"
# Set hierarchical namespace enabled flag based on bucket metadata
if (
bucket.metadata.hierarchical_namespace
and bucket.metadata.hierarchical_namespace.enabled
):
layout.hierarchical_namespace.enabled = True
else:
layout.hierarchical_namespace.enabled = False
return layout


def run(port, database, echo_metadata=False):
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=_GRPC_SERVER_THREAD_COUNT)
)
storage_pb2_grpc.add_StorageServicer_to_server(
StorageServicer(database, echo_metadata), server
)
storage_control_pb2_grpc.add_StorageControlServicer_to_server(
StorageControlServicer(database, echo_metadata), server
)
port = server.add_insecure_port("0.0.0.0:%d" % port)
server.start()
return port, server
Loading
Loading