diff --git a/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml b/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml index 3a8920a67b35..130a81d23571 100644 --- a/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml +++ b/sdk/storage/azure-storage-blob-stress/scenarios-matrix.yaml @@ -108,6 +108,180 @@ matrix: durationMin: 60 imageBuildDir: "../../.." + # content validation downloads using BlobDownloadStreamOptions with CRC64 validation + cvdownloadstreamsm: + testScenario: contentvalidationdownloadstream + sync: true + sizeBytes: 1024 + downloadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + # content validation downloads using BlobDownloadStreamOptions with CRC64 validation and async client + cvdownloadstreamasyncsm: + testScenario: contentvalidationdownloadstream + sync: false + sizeBytes: 1024 + downloadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + # content validation downloads using BlobDownloadStreamOptions with CRC64 validation and large payload + cvdownloadstreamlg: + testScenario: contentvalidationdownloadstream + sync: true + sizeBytes: "52428800" + downloadFaults: true + durationMin: 60 + imageBuildDir: "../../.." + + # content validation downloads using BlobDownloadStreamOptions with CRC64 validation, async client, and large payload + cvdownloadstreamasynclg: + testScenario: contentvalidationdownloadstream + sync: false + sizeBytes: "52428800" + downloadFaults: true + durationMin: 60 + imageBuildDir: "../../.." + + # content validation downloads using BlobDownloadStreamOptions with AUTO validation + cvdownloadstreamauto: + testScenario: contentvalidationdownloadstream + sync: true + sizeBytes: "10485760" + contentValidationAlgorithm: AUTO + downloadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + # content validation downloads using BlobDownloadContentOptions with CRC64 validation + cvdownloadcontentsm: + testScenario: contentvalidationdownloadcontent + sync: true + sizeBytes: 1024 + downloadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + # content validation downloads using BlobDownloadContentOptions with CRC64 validation and async client + cvdownloadcontentasyncsm: + testScenario: contentvalidationdownloadcontent + sync: false + sizeBytes: 1024 + downloadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + # content validation downloads using BlobDownloadContentOptions with CRC64 validation and large payload + cvdownloadcontentlg: + testScenario: contentvalidationdownloadcontent + sync: true + sizeBytes: "52428800" + downloadFaults: true + durationMin: 60 + imageBuildDir: "../../.." + + # content validation downloads using BlobDownloadContentOptions with CRC64 validation, async client, and large payload + cvdownloadcontentasynclg: + testScenario: contentvalidationdownloadcontent + sync: false + sizeBytes: "52428800" + downloadFaults: true + durationMin: 60 + imageBuildDir: "../../.." + + # content validation downloads using BlobDownloadContentOptions with AUTO validation + cvdownloadcontentauto: + testScenario: contentvalidationdownloadcontent + sync: true + sizeBytes: "10485760" + contentValidationAlgorithm: AUTO + downloadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + # content validation downloads using BlobDownloadToFileOptions with CRC64 validation + cvdownloadfilesm: + testScenario: contentvalidationdownloadtofile + sync: true + sizeBytes: 1024 + downloadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + # content validation downloads using BlobDownloadToFileOptions with CRC64 validation and async client + cvdownloadfileasyncsm: + testScenario: contentvalidationdownloadtofile + sync: false + sizeBytes: 1024 + downloadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + # content validation downloads using BlobDownloadToFileOptions with CRC64 validation and multi-block payload + cvdownloadfilemd: + testScenario: contentvalidationdownloadtofile + sync: true + sizeBytes: "16777216" + downloadFaults: true + durationMin: 60 + imageBuildDir: "../../.." + + # content validation downloads using BlobDownloadToFileOptions with CRC64 validation, async client, and multi-block payload + cvdownloadfileasyncmd: + testScenario: contentvalidationdownloadtofile + sync: false + sizeBytes: "16777216" + downloadFaults: true + durationMin: 60 + imageBuildDir: "../../.." + + # content validation downloads using BlobDownloadToFileOptions with AUTO validation + cvdownloadfileauto: + testScenario: contentvalidationdownloadtofile + sync: true + sizeBytes: "10485760" + contentValidationAlgorithm: AUTO + downloadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + # content validation downloads using BlobInputStreamOptions with CRC64 validation + cvinputstreamsm: + testScenario: contentvalidationopeninputstream + sync: true + sizeBytes: 1024 + downloadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + # content validation downloads using BlobInputStreamOptions with CRC64 validation and large payload + cvinputstreamlg: + testScenario: contentvalidationopeninputstream + sync: true + sizeBytes: "52428800" + downloadFaults: true + durationMin: 60 + imageBuildDir: "../../.." + + # content validation downloads using BlobSeekableByteChannelReadOptions with CRC64 validation + cvbytechannelreadsm: + testScenario: contentvalidationopenseekablebytechannelread + sync: true + sizeBytes: 1024 + downloadFaults: true + durationMin: 25 + imageBuildDir: "../../.." + + # content validation downloads using BlobSeekableByteChannelReadOptions with CRC64 validation and large payload + cvbytechannelreadlg: + testScenario: contentvalidationopenseekablebytechannelread + sync: true + sizeBytes: "52428800" + downloadFaults: true + durationMin: 60 + imageBuildDir: "../../.." + # this test uploads 1KB (1024 bytes) to append blob, no chunking appendblocksmall: testScenario: appendblock diff --git a/sdk/storage/azure-storage-blob-stress/scripts/fault-injector.sh b/sdk/storage/azure-storage-blob-stress/scripts/fault-injector.sh index ed834fc131d6..2d6b39967ab1 100644 --- a/sdk/storage/azure-storage-blob-stress/scripts/fault-injector.sh +++ b/sdk/storage/azure-storage-blob-stress/scripts/fault-injector.sh @@ -1,4 +1,4 @@ #!/bin/sh set -ex; -dotnet dev-certs https --export-path /mnt/outputs/dev-cert.pfx; +dotnet dev-certs https --export-path /mnt/outputs/dev-cert.crt --format PEM --no-password; /root/.dotnet/tools/http-fault-injector; diff --git a/sdk/storage/azure-storage-blob-stress/scripts/stress-run.sh b/sdk/storage/azure-storage-blob-stress/scripts/stress-run.sh index 20a7669c46e9..7604926a6782 100644 --- a/sdk/storage/azure-storage-blob-stress/scripts/stress-run.sh +++ b/sdk/storage/azure-storage-blob-stress/scripts/stress-run.sh @@ -1,4 +1,14 @@ #!/bin/sh set -ex; set -exa; -keytool -import -alias test -file /mnt/outputs/dev-cert.pfx -keystore ${JAVA_HOME}/lib/security/cacerts -noprompt -keypass changeit -storepass changeit; +attempts=0; +while [ ! -s /mnt/outputs/dev-cert.crt ]; do + attempts=$((attempts + 1)); + if [ "$attempts" -gt 60 ]; then + echo "Timed out waiting for fault injector certificate" >&2; + exit 1; + fi; + sleep 1; +done; +keytool -delete -alias HttpFaultInject -keystore "${JAVA_HOME}/lib/security/cacerts" -storepass changeit >/dev/null 2>&1 || true; +keytool -importcert -trustcacerts -alias HttpFaultInject -file /mnt/outputs/dev-cert.crt -keystore "${JAVA_HOME}/lib/security/cacerts" -noprompt -storepass changeit; diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/App.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/App.java index e38bd16791ca..d562bbe51363 100644 --- a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/App.java +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/App.java @@ -15,6 +15,11 @@ public static void main(String[] args) { BlockBlobOutputStream.class, BlockBlobUpload.class, CommitBlockList.class, + ContentValidationDownloadContent.class, + ContentValidationDownloadStream.class, + ContentValidationDownloadToFile.class, + ContentValidationOpenInputStream.class, + ContentValidationOpenSeekableByteChannelRead.class, DownloadToFile.class, DownloadStream.class, DownloadContent.class, diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationDecoderStressOptions.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationDecoderStressOptions.java new file mode 100644 index 000000000000..bc16742ac621 --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationDecoderStressOptions.java @@ -0,0 +1,26 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.storage.common.ContentValidationAlgorithm; +import com.azure.storage.stress.StorageStressOptions; +import com.beust.jcommander.Parameter; + +/** + * Options for stress scenarios that enable transactional response content validation on downloads + * (CRC64 / structured message). See {@link com.azure.storage.blob.BlobContentValidationDownloadTests}. + */ +public class ContentValidationDecoderStressOptions extends StorageStressOptions { + /** + * Response content validation behavior for download APIs. Use CRC64 or AUTO to exercise content validation. + * NONE disables response validation. + */ + @Parameter(names = { "--contentValidationAlgorithm" }, + description = "CRC64 (default), AUTO, or NONE") + private ContentValidationAlgorithm contentValidationAlgorithm = ContentValidationAlgorithm.CRC64; + + public ContentValidationAlgorithm getContentValidationAlgorithm() { + return contentValidationAlgorithm; + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationDownloadContent.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationDownloadContent.java new file mode 100644 index 000000000000..07c549475e33 --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationDownloadContent.java @@ -0,0 +1,69 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.http.HttpHeaderName; +import com.azure.core.util.BinaryData; +import com.azure.core.util.Context; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.options.BlobDownloadContentOptions; +import com.azure.storage.blob.options.BlobDownloadStreamOptions; +import com.azure.storage.blob.stress.utils.OriginalContent; +import reactor.core.publisher.Mono; + +/** + * Download content with + * {@link BlobDownloadContentOptions#setContentValidationAlgorithm} enabled. + * Verifies the correctness of the download response content via CRC. + */ +public class ContentValidationDownloadContent extends BlobScenarioBase { + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncClient; + private final BlobAsyncClient asyncNoFaultClient; + + public ContentValidationDownloadContent(ContentValidationDecoderStressOptions options) { + super(options); + String blobName = generateBlobName(); + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.asyncClient = getAsyncContainerClient().getBlobAsyncClient(blobName); + } + + @Override + protected void runInternal(Context span) { + originalContent.checkMatch( + syncClient.downloadContentWithResponse( + new BlobDownloadContentOptions() + .setContentValidationAlgorithm(options.getContentValidationAlgorithm()), + null, span).getValue(), + span).block(); + } + + @Override + protected Mono runInternalAsync(Context span) { + // TODO return downloadContent once it stops buffering. + return asyncClient.downloadStreamWithResponse( + new BlobDownloadStreamOptions() + .setContentValidationAlgorithm(options.getContentValidationAlgorithm())) + .flatMap(response -> { + long contentLength = Long.valueOf(response.getHeaders().getValue(HttpHeaderName.CONTENT_LENGTH)); + return BinaryData.fromFlux(response.getValue(), contentLength, false); + }) + .flatMap(bd -> originalContent.checkMatch(bd, span)); + } + + @Override + public Mono setupAsync() { + return super.setupAsync() + .then(originalContent.setupBlob(asyncNoFaultClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.deleteIfExists() + .then(super.cleanupAsync()); + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationDownloadStream.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationDownloadStream.java new file mode 100644 index 000000000000..1fcaf549657a --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationDownloadStream.java @@ -0,0 +1,66 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.Context; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.options.BlobDownloadStreamOptions; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.stress.CrcOutputStream; +import reactor.core.publisher.Mono; + +import java.io.IOException; + +/** + * Streaming blob download with + * {@link BlobDownloadStreamOptions#setContentValidationAlgorithm} enabled. + * Verifies the correctness of the download response content via CRC. + */ +public class ContentValidationDownloadStream extends BlobScenarioBase { + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncClient; + private final BlobAsyncClient asyncNoFaultClient; + + public ContentValidationDownloadStream(ContentValidationDecoderStressOptions options) { + super(options); + String blobName = generateBlobName(); + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.asyncClient = getAsyncContainerClient().getBlobAsyncClient(blobName); + } + + @Override + protected void runInternal(Context span) throws IOException { + try (CrcOutputStream outputStream = new CrcOutputStream()) { + syncClient.downloadStreamWithResponse(outputStream, + new BlobDownloadStreamOptions() + .setContentValidationAlgorithm(options.getContentValidationAlgorithm()), + null, span); + outputStream.close(); + originalContent.checkMatch(outputStream.getContentInfo(), span).block(); + } + } + + @Override + protected Mono runInternalAsync(Context span) { + return asyncClient.downloadStreamWithResponse( + new BlobDownloadStreamOptions() + .setContentValidationAlgorithm(options.getContentValidationAlgorithm())) + .flatMap(response -> originalContent.checkMatch(response.getValue(), span)); + } + + @Override + public Mono setupAsync() { + return super.setupAsync() + .then(originalContent.setupBlob(asyncNoFaultClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.deleteIfExists() + .then(super.cleanupAsync()); + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationDownloadToFile.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationDownloadToFile.java new file mode 100644 index 000000000000..da52199b1339 --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationDownloadToFile.java @@ -0,0 +1,104 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.BinaryData; +import com.azure.core.util.Context; +import com.azure.core.util.logging.ClientLogger; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.options.BlobDownloadToFileOptions; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.common.ParallelTransferOptions; +import reactor.core.publisher.Mono; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.UUID; + +/** + * Download to file with + * {@link BlobDownloadToFileOptions#setContentValidationAlgorithm} enabled. + * Verifies the correctness of the download response content via CRC. + */ +public class ContentValidationDownloadToFile extends BlobScenarioBase { + private static final ClientLogger LOGGER = new ClientLogger(ContentValidationDownloadToFile.class); + private final Path directoryPath; + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncClient; + private final BlobAsyncClient asyncNoFaultClient; + private final ParallelTransferOptions parallelTransferOptions; + + public ContentValidationDownloadToFile(ContentValidationDecoderStressOptions options) { + super(options); + this.directoryPath = getTempPath("test"); + String blobName = generateBlobName(); + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.asyncClient = getAsyncContainerClient().getBlobAsyncClient(blobName); + this.parallelTransferOptions = new ParallelTransferOptions() + .setMaxConcurrency(options.getMaxConcurrency()); + } + + @Override + protected void runInternal(Context span) { + Path downloadPath = directoryPath.resolve(UUID.randomUUID() + ".txt"); + BlobDownloadToFileOptions blobOptions = new BlobDownloadToFileOptions(downloadPath.toString()) + .setParallelTransferOptions(parallelTransferOptions) + .setContentValidationAlgorithm(options.getContentValidationAlgorithm()); + + try { + syncClient.downloadToFileWithResponse(blobOptions, Duration.ofSeconds(options.getDuration()), span); + originalContent.checkMatch(BinaryData.fromFile(downloadPath), span).block(); + } finally { + deleteFile(downloadPath); + } + } + + @Override + protected Mono runInternalAsync(Context span) { + return Mono.using( + () -> directoryPath.resolve(UUID.randomUUID() + ".txt"), + path -> asyncClient.downloadToFileWithResponse( + new BlobDownloadToFileOptions(path.toString()) + .setParallelTransferOptions(parallelTransferOptions) + .setContentValidationAlgorithm(options.getContentValidationAlgorithm())) + .flatMap(ignored -> originalContent.checkMatch(BinaryData.fromFile(path), span)), + ContentValidationDownloadToFile::deleteFile); + } + + @Override + public Mono setupAsync() { + return super.setupAsync() + .then(originalContent.setupBlob(asyncNoFaultClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.deleteIfExists() + .then(super.cleanupAsync()); + } + + private Path getTempPath(String prefix) { + try { + return Files.createTempDirectory(prefix); + } catch (IOException e) { + throw LOGGER.logExceptionAsError(new UncheckedIOException(e)); + } + } + + private static void deleteFile(Path path) { + try { + Files.deleteIfExists(path); + } catch (Throwable e) { + LOGGER.atError() + .addKeyValue("path", path) + .log("failed to delete file", e); + } + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationOpenInputStream.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationOpenInputStream.java new file mode 100644 index 000000000000..5d282c7e9c7a --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationOpenInputStream.java @@ -0,0 +1,69 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.Context; +import com.azure.core.util.logging.ClientLogger; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.options.BlobInputStreamOptions; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.stress.CrcInputStream; +import reactor.core.publisher.Mono; + +import java.io.IOException; +import java.io.InputStream; + +import static com.azure.core.util.FluxUtil.monoError; + +/** + * Open input stream with {@link BlobInputStreamOptions#setContentValidationAlgorithm} enabled (sync only). + * Verifies the correctness of the download response content via CRC. + */ +public class ContentValidationOpenInputStream extends BlobScenarioBase { + private static final ClientLogger LOGGER = new ClientLogger(ContentValidationOpenInputStream.class); + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncNoFaultClient; + + public ContentValidationOpenInputStream(ContentValidationDecoderStressOptions options) { + super(options); + String blobName = generateBlobName(); + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + } + + @Override + protected void runInternal(Context span) throws IOException { + try (InputStream stream = syncClient.openInputStream( + new BlobInputStreamOptions() + .setContentValidationAlgorithm(options.getContentValidationAlgorithm()), + span)) { + try (CrcInputStream crcStream = new CrcInputStream(stream)) { + byte[] buffer = new byte[8192]; + while (crcStream.read(buffer) != -1) { + // do nothing + } + originalContent.checkMatch(crcStream.getContentInfo(), span).block(); + } + } + } + + @Override + protected Mono runInternalAsync(Context span) { + return monoError(LOGGER, new RuntimeException("openInputStream() does not exist on the async client")); + } + + @Override + public Mono setupAsync() { + return super.setupAsync() + .then(originalContent.setupBlob(asyncNoFaultClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.deleteIfExists() + .then(super.cleanupAsync()); + } +} diff --git a/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationOpenSeekableByteChannelRead.java b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationOpenSeekableByteChannelRead.java new file mode 100644 index 000000000000..8de4aefd8832 --- /dev/null +++ b/sdk/storage/azure-storage-blob-stress/src/main/java/com/azure/storage/blob/stress/ContentValidationOpenSeekableByteChannelRead.java @@ -0,0 +1,72 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.stress; + +import com.azure.core.util.Context; +import com.azure.core.util.logging.ClientLogger; +import com.azure.storage.blob.BlobAsyncClient; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.models.BlobSeekableByteChannelReadResult; +import com.azure.storage.blob.options.BlobSeekableByteChannelReadOptions; +import com.azure.storage.blob.stress.utils.OriginalContent; +import com.azure.storage.stress.CrcInputStream; +import reactor.core.publisher.Mono; + +import java.io.IOException; +import java.nio.channels.Channels; + +import static com.azure.core.util.FluxUtil.monoError; + +/** + * Seekable byte channel read with {@link BlobSeekableByteChannelReadOptions#setContentValidationAlgorithm} + * enabled (sync only). + * Verifies the correctness of the download response content via CRC. + */ +public class ContentValidationOpenSeekableByteChannelRead + extends BlobScenarioBase { + private static final ClientLogger LOGGER = new ClientLogger(ContentValidationOpenSeekableByteChannelRead.class); + private final OriginalContent originalContent = new OriginalContent(); + private final BlobClient syncClient; + private final BlobAsyncClient asyncNoFaultClient; + + public ContentValidationOpenSeekableByteChannelRead(ContentValidationDecoderStressOptions options) { + super(options); + String blobName = generateBlobName(); + this.asyncNoFaultClient = getAsyncContainerClientNoFault().getBlobAsyncClient(blobName); + this.syncClient = getSyncContainerClient().getBlobClient(blobName); + } + + @Override + protected void runInternal(Context span) throws IOException { + BlobSeekableByteChannelReadResult result = syncClient.openSeekableByteChannelRead( + new BlobSeekableByteChannelReadOptions() + .setContentValidationAlgorithm(options.getContentValidationAlgorithm()), + span); + try (CrcInputStream crcStream = new CrcInputStream(Channels.newInputStream(result.getChannel()))) { + byte[] buffer = new byte[8192]; + while (crcStream.read(buffer) != -1) { + // do nothing + } + originalContent.checkMatch(crcStream.getContentInfo(), span).block(); + } + } + + @Override + protected Mono runInternalAsync(Context span) { + return monoError(LOGGER, + new RuntimeException("openSeekableByteChannelRead() does not exist on the async client")); + } + + @Override + public Mono setupAsync() { + return super.setupAsync() + .then(originalContent.setupBlob(asyncNoFaultClient, options.getSize())); + } + + @Override + public Mono cleanupAsync() { + return asyncNoFaultClient.deleteIfExists() + .then(super.cleanupAsync()); + } +} diff --git a/sdk/storage/azure-storage-blob-stress/templates/stress-test-job.yaml b/sdk/storage/azure-storage-blob-stress/templates/stress-test-job.yaml index 2a22302d24ed..31c44b03cc0e 100644 --- a/sdk/storage/azure-storage-blob-stress/templates/stress-test-job.yaml +++ b/sdk/storage/azure-storage-blob-stress/templates/stress-test-job.yaml @@ -16,7 +16,7 @@ spec: args: - | set -ex; - dotnet dev-certs https --export-path /mnt/outputs/dev-cert.pfx; + dotnet dev-certs https --export-path /mnt/outputs/dev-cert.crt --format PEM --no-password; /root/.dotnet/tools/http-fault-injector; resources: limits: @@ -30,7 +30,17 @@ spec: - | set -xa; set -o pipefail; - keytool -import -alias test -file /mnt/outputs/dev-cert.pfx -keystore ${JAVA_HOME}/lib/security/cacerts -noprompt -keypass changeit -storepass changeit; + attempts=0; + while [ ! -s /mnt/outputs/dev-cert.crt ]; do + attempts=$((attempts + 1)); + if [ "$attempts" -gt 60 ]; then + echo "Timed out waiting for fault injector certificate" >&2; + exit 1; + fi; + sleep 1; + done; + keytool -delete -alias HttpFaultInject -keystore "${JAVA_HOME}/lib/security/cacerts" -storepass changeit >/dev/null 2>&1 || true; + keytool -importcert -trustcacerts -alias HttpFaultInject -file /mnt/outputs/dev-cert.crt -keystore "${JAVA_HOME}/lib/security/cacerts" -noprompt -storepass changeit || exit 1; mkdir -p "$DEBUG_SHARE"; . /mnt/outputs/.env; export AZURE_HTTP_CLIENT_IMPLEMENTATION=com.azure.core.http.netty.NettyAsyncHttpClientProvider; @@ -53,6 +63,7 @@ spec: {{ ternary "--sync" "" .Stress.sync }} \ {{ ternary "--downloadFaults" "" (default false .Stress.downloadFaults) }} \ {{ ternary "--uploadFaults" "" (default false .Stress.uploadFaults) }} \ + {{ with .Stress.contentValidationAlgorithm }}--contentValidationAlgorithm {{ . }}{{ end }} \ --warmup 0 \ 2>&1 | tee -a "${DEBUG_SHARE}/{{ .Stress.testScenario }}-`date +%s`.log"; code=$?; diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/StorageSeekableByteChannelBlobReadBehavior.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/StorageSeekableByteChannelBlobReadBehavior.java index 840dd6dda6be..47ef361690ff 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/StorageSeekableByteChannelBlobReadBehavior.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/StorageSeekableByteChannelBlobReadBehavior.java @@ -11,6 +11,7 @@ import com.azure.storage.blob.models.BlobRange; import com.azure.storage.blob.models.BlobRequestConditions; import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.models.DownloadRetryOptions; import com.azure.storage.common.implementation.StorageSeekableByteChannel; import java.io.IOException; @@ -75,7 +76,7 @@ public int read(ByteBuffer dst, long sourceOffset) throws IOException { try (ByteBufferBackedOutputStreamUtil dstStream = new ByteBufferBackedOutputStreamUtil(dst)) { BlobDownloadResponse response = client.downloadStreamWithResponse(dstStream, new BlobRange(sourceOffset, (long) dst.remaining()), - null /*downloadRetryOptions*/, requestConditions, false, null, null); + new DownloadRetryOptions(), requestConditions, false, null, null); resourceLength = CoreUtils.extractSizeFromContentRange(response.getDeserializedHeaders().getContentRange()); return dst.position() - initialPosition; } catch (BlobStorageException e) { @@ -89,6 +90,11 @@ public int read(ByteBuffer dst, long sourceOffset) throws IOException { return sourceOffset < resourceLength ? 0 : -1; } throw LOGGER.logExceptionAsError(e); + } catch (RuntimeException e) { + if (resourceLength > 0 && sourceOffset >= resourceLength && e.getCause() instanceof IOException) { + return -1; + } + throw e; } } diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/StorageSeekableByteChannelBlobReadBehaviorUnitTests.java b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/StorageSeekableByteChannelBlobReadBehaviorUnitTests.java new file mode 100644 index 000000000000..f20e6cc9190c --- /dev/null +++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/StorageSeekableByteChannelBlobReadBehaviorUnitTests.java @@ -0,0 +1,87 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.blob.specialized; + +import com.azure.core.http.HttpHeaders; +import com.azure.storage.blob.models.BlobDownloadAsyncResponse; +import com.azure.storage.blob.models.BlobDownloadHeaders; +import com.azure.storage.blob.models.BlobDownloadResponse; +import com.azure.storage.blob.models.DownloadRetryOptions; +import com.azure.storage.common.implementation.Constants; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.verify; + +public class StorageSeekableByteChannelBlobReadBehaviorUnitTests { + + private BlobDownloadResponse createMockDownloadResponse(String contentRange) { + Map headers = new HashMap<>(); + headers.put("Content-Range", contentRange); + return new BlobDownloadResponse(new BlobDownloadAsyncResponse(null, 206, new HttpHeaders(headers), null, + new BlobDownloadHeaders().setContentRange(contentRange))); + } + + @ParameterizedTest + @MethodSource("truncatedErrorResponseAtEofSupplier") + void readReturnEofWhenErrorResponseTruncatedAtKnownEof(long blobSize) throws IOException { + BlobClientBase client = Mockito.mock(BlobClientBase.class); + RuntimeException reactorWrapped = new RuntimeException(new IOException("connection reset by peer")); + Mockito.when(client.downloadStreamWithResponse(any(), any(), any(), any(), anyBoolean(), any(), any())) + .thenThrow(reactorWrapped); + + StorageSeekableByteChannelBlobReadBehavior behavior + = new StorageSeekableByteChannelBlobReadBehavior(client, ByteBuffer.allocate(0), -1, blobSize, null); + + assertEquals(-1, behavior.read(ByteBuffer.allocate(Constants.KB), blobSize)); + } + + private static Stream truncatedErrorResponseAtEofSupplier() { + return Stream.of(Arguments.of(Constants.KB), Arguments.of(50L * Constants.MB)); + } + + @Test + void readRethrowsRuntimeExceptionWhenNotAtEof() { + BlobClientBase client = Mockito.mock(BlobClientBase.class); + RuntimeException reactorWrapped = new RuntimeException(new IOException("connection reset by peer")); + Mockito.when(client.downloadStreamWithResponse(any(), any(), any(), any(), anyBoolean(), any(), any())) + .thenThrow(reactorWrapped); + + StorageSeekableByteChannelBlobReadBehavior behavior + = new StorageSeekableByteChannelBlobReadBehavior(client, ByteBuffer.allocate(0), -1, Constants.KB, null); + + assertThrows(RuntimeException.class, () -> behavior.read(ByteBuffer.allocate(Constants.KB), 0)); + } + + @Test + void readPassesNonNullDownloadRetryOptionsToClient() throws IOException { + BlobClientBase client = Mockito.mock(BlobClientBase.class); + ArgumentCaptor retryCaptor = ArgumentCaptor.forClass(DownloadRetryOptions.class); + Mockito.when(client.downloadStreamWithResponse(any(), any(), any(), any(), anyBoolean(), any(), any())) + .thenReturn(createMockDownloadResponse("bytes 0-1023/1024")); + + StorageSeekableByteChannelBlobReadBehavior behavior + = new StorageSeekableByteChannelBlobReadBehavior(client, ByteBuffer.allocate(0), -1, Constants.KB, null); + behavior.read(ByteBuffer.allocate(Constants.KB), 0); + + verify(client).downloadStreamWithResponse(any(), any(), retryCaptor.capture(), any(), anyBoolean(), any(), + any()); + assertNotNull(retryCaptor.getValue()); + } +} diff --git a/sdk/storage/azure-storage-file-datalake-stress/templates/stress-test-job.yaml b/sdk/storage/azure-storage-file-datalake-stress/templates/stress-test-job.yaml index e86f52638947..c10c4ded6d64 100644 --- a/sdk/storage/azure-storage-file-datalake-stress/templates/stress-test-job.yaml +++ b/sdk/storage/azure-storage-file-datalake-stress/templates/stress-test-job.yaml @@ -16,7 +16,7 @@ spec: args: - | set -ex; - dotnet dev-certs https --export-path /mnt/outputs/dev-cert.pfx; + dotnet dev-certs https --export-path /mnt/outputs/dev-cert.crt --format PEM --no-password; /root/.dotnet/tools/http-fault-injector; resources: limits: @@ -30,7 +30,17 @@ spec: - | set -xa; set -o pipefail; - keytool -import -alias test -file /mnt/outputs/dev-cert.pfx -keystore ${JAVA_HOME}/lib/security/cacerts -noprompt -keypass changeit -storepass changeit; + attempts=0; + while [ ! -s /mnt/outputs/dev-cert.crt ]; do + attempts=$((attempts + 1)); + if [ "$attempts" -gt 60 ]; then + echo "Timed out waiting for fault injector certificate" >&2; + exit 1; + fi; + sleep 1; + done; + keytool -delete -alias HttpFaultInject -keystore "${JAVA_HOME}/lib/security/cacerts" -storepass changeit >/dev/null 2>&1 || true; + keytool -importcert -trustcacerts -alias HttpFaultInject -file /mnt/outputs/dev-cert.crt -keystore "${JAVA_HOME}/lib/security/cacerts" -noprompt -storepass changeit || exit 1; mkdir -p "$DEBUG_SHARE"; . /mnt/outputs/.env; export AZURE_HTTP_CLIENT_IMPLEMENTATION=com.azure.core.http.netty.NettyAsyncHttpClientProvider; diff --git a/sdk/storage/azure-storage-file-share-stress/templates/stress-test-job.yaml b/sdk/storage/azure-storage-file-share-stress/templates/stress-test-job.yaml index b558feecdcb8..fe77e9cb6f63 100644 --- a/sdk/storage/azure-storage-file-share-stress/templates/stress-test-job.yaml +++ b/sdk/storage/azure-storage-file-share-stress/templates/stress-test-job.yaml @@ -16,7 +16,7 @@ spec: args: - | set -ex; - dotnet dev-certs https --export-path /mnt/outputs/dev-cert.pfx; + dotnet dev-certs https --export-path /mnt/outputs/dev-cert.crt --format PEM --no-password; /root/.dotnet/tools/http-fault-injector; resources: limits: @@ -30,7 +30,17 @@ spec: - | set -xa; set -o pipefail; - keytool -import -alias test -file /mnt/outputs/dev-cert.pfx -keystore ${JAVA_HOME}/lib/security/cacerts -noprompt -keypass changeit -storepass changeit; + attempts=0; + while [ ! -s /mnt/outputs/dev-cert.crt ]; do + attempts=$((attempts + 1)); + if [ "$attempts" -gt 60 ]; then + echo "Timed out waiting for fault injector certificate" >&2; + exit 1; + fi; + sleep 1; + done; + keytool -delete -alias HttpFaultInject -keystore "${JAVA_HOME}/lib/security/cacerts" -storepass changeit >/dev/null 2>&1 || true; + keytool -importcert -trustcacerts -alias HttpFaultInject -file /mnt/outputs/dev-cert.crt -keystore "${JAVA_HOME}/lib/security/cacerts" -noprompt -storepass changeit || exit 1; mkdir -p "$DEBUG_SHARE"; . /mnt/outputs/.env; export AZURE_HTTP_CLIENT_IMPLEMENTATION=com.azure.core.http.netty.NettyAsyncHttpClientProvider; diff --git a/sdk/storage/azure-storage-stress/delete-matching-resource-groups.ps1 b/sdk/storage/azure-storage-stress/delete-matching-resource-groups.ps1 new file mode 100644 index 000000000000..5bb8afa4cb8a --- /dev/null +++ b/sdk/storage/azure-storage-stress/delete-matching-resource-groups.ps1 @@ -0,0 +1,59 @@ +param( + [string] $Alias, + [string] $SubscriptionId, + [switch] $Execute +) + +$ErrorActionPreference = "Stop" + +Get-Command az | Out-Null + +if ($SubscriptionId) { + az account set --subscription $SubscriptionId +} + +$currentSubscription = az account show --query "{name:name, id:id}" --output tsv +Write-Host "Using subscription: $currentSubscription" +Write-Host "Looking for resource groups starting with 'SSS3PT_$Alias'..." + +$resourceGroups = @(az group list ` + --query "[?starts_with(name, 'SSS3PT_$Alias')].name" ` + --output tsv) + +if ($resourceGroups.Count -eq 0) { + Write-Host "No matching resource groups found." + exit 0 +} + +Write-Host "" +Write-Host "Matching resource groups:" +$resourceGroups | ForEach-Object { Write-Host " $_" } + +if (-not $Execute) { + Write-Host "" + Write-Host "Dry run only. To delete these resource groups, run:" + Write-Host " .\delete-matching-resource-groups.ps1 -Execute" + Write-Host "" + Write-Host "To target a specific subscription, run:" + Write-Host " .\delete-matching-resource-groups.ps1 -SubscriptionId -Execute" + exit 0 +} + +Write-Host "" +$confirmation = Read-Host "Type DELETE to permanently delete these resource groups" + +if ($confirmation -ne "DELETE") { + Write-Host "Cancelled." + exit 1 +} + +foreach ($resourceGroup in $resourceGroups) { + Write-Host "Deleting resource group: $resourceGroup" + az group delete ` + --name $resourceGroup ` + --yes ` + --no-wait +} + +Write-Host "" +Write-Host "Delete operations submitted."