diff --git a/sdk/storage/azblob/blob/constants.go b/sdk/storage/azblob/blob/constants.go index ef47450b5ea6..956e5163dc09 100644 --- a/sdk/storage/azblob/blob/constants.go +++ b/sdk/storage/azblob/blob/constants.go @@ -17,10 +17,19 @@ const ( // DefaultDownloadBlockSize is default block size DefaultDownloadBlockSize = int64(4 * 1024 * 1024) // 4MB - // DefaultConcurrency is the default number of blocks downloaded or uploaded in parallel - DefaultConcurrency = shared.DefaultConcurrency + // DefaultConcurrency is the legacy default number of blocks downloaded or uploaded in parallel. + // + // Deprecated: Use DefaultConcurrencyValue() instead, which returns a value based on CPU core count. + DefaultConcurrency = shared.DefaultConcurrency //nolint:staticcheck // intentional re-export of deprecated const for backward compat ) +// DefaultConcurrencyValue returns the default concurrency for parallel uploads/downloads. +// The value is based on CPU core count, clamped between 8 and 96. +// Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default of 5. +func DefaultConcurrencyValue() uint16 { + return shared.DefaultConcurrencyValue() +} + // BlobType defines values for BlobType type BlobType = generated.BlobType diff --git a/sdk/storage/azblob/blob/models.go b/sdk/storage/azblob/blob/models.go index 9acec4594fb3..a23329c19f70 100644 --- a/sdk/storage/azblob/blob/models.go +++ b/sdk/storage/azblob/blob/models.go @@ -122,7 +122,8 @@ type downloadOptions struct { CPKInfo *CPKInfo CPKScopeInfo *CPKScopeInfo - // Concurrency indicates the maximum number of blocks to download in parallel (0=default). + // Concurrency indicates the maximum number of blocks to download in parallel. + // The default is based on CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default. Concurrency uint16 // RetryReaderOptionsPerBlock is used when downloading each block. 
@@ -176,7 +177,8 @@ type DownloadBufferOptions struct { // CPKScopeInfo contains a group of parameters for client provided encryption scope. CPKScopeInfo *CPKScopeInfo - // Concurrency indicates the maximum number of blocks to download in parallel (0=default). + // Concurrency indicates the maximum number of blocks to download in parallel. + // The default is based on CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default. Concurrency uint16 // RetryReaderOptionsPerBlock is used when downloading each block. @@ -204,7 +206,8 @@ type DownloadFileOptions struct { CPKInfo *CPKInfo CPKScopeInfo *CPKScopeInfo - // Concurrency indicates the maximum number of blocks to download in parallel. The default value is 5. + // Concurrency indicates the maximum number of blocks to download in parallel. + // The default is based on CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default. Concurrency uint16 // RetryReaderOptionsPerBlock is used when downloading each block. 
diff --git a/sdk/storage/azblob/blockblob/chunkwriting_test.go b/sdk/storage/azblob/blockblob/chunkwriting_test.go index d8f5473ebde1..ed6ee6e90961 100644 --- a/sdk/storage/azblob/blockblob/chunkwriting_test.go +++ b/sdk/storage/azblob/blockblob/chunkwriting_test.go @@ -177,7 +177,7 @@ func TestSlowDestCopyFrom(t *testing.T) { require.ErrorIs(t, err, io.ErrNoProgress) } - require.Equal(t, 1, tracker.Count) + require.GreaterOrEqual(t, tracker.Count, 1) require.True(t, tracker.Freed) } diff --git a/sdk/storage/azblob/blockblob/models.go b/sdk/storage/azblob/blockblob/models.go index 868637ffc481..4e3bac2863f8 100644 --- a/sdk/storage/azblob/blockblob/models.go +++ b/sdk/storage/azblob/blockblob/models.go @@ -273,7 +273,8 @@ type uploadFromReaderOptions struct { CPKInfo *blob.CPKInfo CPKScopeInfo *blob.CPKScopeInfo - // Concurrency indicates the maximum number of blocks to upload in parallel (0=default) + // Concurrency indicates the maximum number of blocks to upload in parallel. + // The default is based on CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default. Concurrency uint16 TransactionalValidation blob.TransferValidationType @@ -334,7 +335,8 @@ type UploadStreamOptions struct { BlockSize int64 // Concurrency defines the max number of concurrent uploads to be performed to upload the file. - // Each concurrent upload will create a buffer of size BlockSize. The default value is one. + // Each concurrent upload will create a buffer of size BlockSize. The default is based on + // CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to use the legacy value of 5 (UploadStream previously defaulted to 1).
Concurrency int TransactionalValidation blob.TransferValidationType @@ -350,7 +352,7 @@ type UploadStreamOptions struct { func (u *UploadStreamOptions) setDefaults() { if u.Concurrency == 0 { - u.Concurrency = 1 + u.Concurrency = int(shared.DefaultConcurrencyValue()) } if u.BlockSize < _1MiB { diff --git a/sdk/storage/azblob/internal/shared/batch_transfer.go b/sdk/storage/azblob/internal/shared/batch_transfer.go index 7fc26f543d06..e6cc0a03039e 100644 --- a/sdk/storage/azblob/internal/shared/batch_transfer.go +++ b/sdk/storage/azblob/internal/shared/batch_transfer.go @@ -6,12 +6,28 @@ package shared import ( "context" "errors" + "os" + "runtime" + "strings" ) const ( + // DefaultConcurrency is the legacy default concurrency value. + // + // Deprecated: Use DefaultConcurrencyValue() instead, which returns a value based on CPU core count. DefaultConcurrency = 5 ) +// DefaultConcurrencyValue returns the default concurrency based on CPU count, +// clamped between 8 and 96. Set the AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY +// environment variable to "true" to revert to the previous default of 5. +func DefaultConcurrencyValue() uint16 { + if strings.EqualFold(os.Getenv("AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY"), "true") { + return DefaultConcurrency + } + return uint16(max(8, min(96, runtime.NumCPU()))) +} + // BatchTransferOptions identifies options used by doBatchTransfer. type BatchTransferOptions struct { TransferSize int64 @@ -30,7 +46,7 @@ func DoBatchTransfer(ctx context.Context, o *BatchTransferOptions) error { } if o.Concurrency == 0 { - o.Concurrency = DefaultConcurrency // default concurrency + o.Concurrency = DefaultConcurrencyValue() } // Prepare and do parallel operations. 
diff --git a/sdk/storage/azblob/internal/shared/shared_test.go b/sdk/storage/azblob/internal/shared/shared_test.go index d4cd2c7b2e7a..e9eafee8699e 100644 --- a/sdk/storage/azblob/internal/shared/shared_test.go +++ b/sdk/storage/azblob/internal/shared/shared_test.go @@ -4,6 +4,7 @@ package shared import ( + "runtime" "strings" "testing" @@ -207,3 +208,48 @@ func TestIsIPEndpointStyle(t *testing.T) { require.True(t, IsIPEndpointStyle("127.0.0.1")) require.True(t, IsIPEndpointStyle("127.0.0.1:80")) } + +func TestDefaultConcurrencyValue_InBounds(t *testing.T) { + t.Setenv("AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY", "") + val := DefaultConcurrencyValue() + require.GreaterOrEqual(t, val, uint16(8)) + require.LessOrEqual(t, val, uint16(96)) +} + +func TestDefaultConcurrencyValue_Deterministic(t *testing.T) { + t.Setenv("AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY", "") + val1 := DefaultConcurrencyValue() + val2 := DefaultConcurrencyValue() + require.Equal(t, val1, val2) +} + +func TestDefaultConcurrencyValue_MatchesCPU(t *testing.T) { + t.Setenv("AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY", "") + cpus := runtime.NumCPU() + val := DefaultConcurrencyValue() + if cpus < 8 { + require.Equal(t, uint16(8), val) + } else if cpus > 96 { + require.Equal(t, uint16(96), val) + } else { + require.Equal(t, uint16(cpus), val) + } +} + +func TestDefaultConcurrencyValue_LegacyEnvVar(t *testing.T) { + t.Setenv("AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY", "true") + require.Equal(t, uint16(DefaultConcurrency), DefaultConcurrencyValue()) + + t.Setenv("AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY", "TRUE") + require.Equal(t, uint16(DefaultConcurrency), DefaultConcurrencyValue()) + + t.Setenv("AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY", "false") + val := DefaultConcurrencyValue() + require.GreaterOrEqual(t, val, uint16(8)) + require.LessOrEqual(t, val, uint16(96)) + + t.Setenv("AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY", "") + val = DefaultConcurrencyValue() + 
require.GreaterOrEqual(t, val, uint16(8)) + require.LessOrEqual(t, val, uint16(96)) +} diff --git a/sdk/storage/azdatalake/file/models.go b/sdk/storage/azdatalake/file/models.go index 4ff0c58b897a..5d8857888af3 100644 --- a/sdk/storage/azdatalake/file/models.go +++ b/sdk/storage/azdatalake/file/models.go @@ -141,7 +141,8 @@ type uploadFromReaderOptions struct { // Progress is a function that is invoked periodically as bytes are sent to the FileClient. // Note that the progress reporting is not always increasing; it can go down when retrying a request. Progress func(bytesTransferred int64) - // Concurrency indicates the maximum number of chunks to upload in parallel (default is 5) + // Concurrency indicates the maximum number of chunks to upload in parallel. + // The default is based on CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default. Concurrency uint16 // AccessConditions contains optional parameters to access leased entity. AccessConditions *AccessConditions @@ -157,7 +158,8 @@ type uploadFromReaderOptions struct { type UploadStreamOptions struct { // ChunkSize specifies the chunk size to use in bytes; the default (and maximum size) is MaxAppendBytes. ChunkSize int64 - // Concurrency indicates the maximum number of chunks to upload in parallel (default is 5) + // Concurrency indicates the maximum number of chunks to upload in parallel. + // The default is based on CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to use the legacy value of 5 (UploadStream previously used 1 when Concurrency was 0). Concurrency uint16 // AccessConditions contains optional parameters to access leased entity.
AccessConditions *AccessConditions @@ -324,7 +326,7 @@ func (o *AppendDataOptions) format(offset int64, body io.ReadSeekCloser) (*gener func (u *UploadStreamOptions) setDefaults() { if u.Concurrency == 0 { - u.Concurrency = 1 + u.Concurrency = shared.DefaultConcurrencyValue() } if u.ChunkSize < _1MiB { @@ -428,7 +430,8 @@ type DownloadBufferOptions struct { CPKInfo *CPKInfo // CPKScopeInfo contains a group of parameters for client provided encryption scope. CPKScopeInfo *CPKScopeInfo - // Concurrency indicates the maximum number of chunks to download in parallel (0=default). + // Concurrency indicates the maximum number of chunks to download in parallel. + // The default is based on CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default. Concurrency uint16 // RetryReaderOptionsPerChunk is used when downloading each chunk. RetryReaderOptionsPerChunk *RetryReaderOptions @@ -482,7 +485,8 @@ type DownloadFileOptions struct { CPKInfo *CPKInfo // CPKScopeInfo contains a group of parameters for client provided encryption scope. CPKScopeInfo *CPKScopeInfo - // Concurrency indicates the maximum number of chunks to download in parallel. The default value is 5. + // Concurrency indicates the maximum number of chunks to download in parallel. + // The default is based on CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default. Concurrency uint16 // RetryReaderOptionsPerChunk is used when downloading each chunk. 
RetryReaderOptionsPerChunk *RetryReaderOptions diff --git a/sdk/storage/azdatalake/internal/shared/batch_transfer.go b/sdk/storage/azdatalake/internal/shared/batch_transfer.go index 2c5cf54c07c9..98f0239d2005 100644 --- a/sdk/storage/azdatalake/internal/shared/batch_transfer.go +++ b/sdk/storage/azdatalake/internal/shared/batch_transfer.go @@ -6,8 +6,28 @@ package shared import ( "context" "errors" + "os" + "runtime" + "strings" ) +const ( + // DefaultConcurrency is the legacy default concurrency value. + // + // Deprecated: Use DefaultConcurrencyValue() instead, which returns a value based on CPU core count. + DefaultConcurrency = 5 +) + +// DefaultConcurrencyValue returns the default concurrency based on CPU count, +// clamped between 8 and 96. Set the AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY +// environment variable to "true" to revert to the previous default of 5. +func DefaultConcurrencyValue() uint16 { + if strings.EqualFold(os.Getenv("AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY"), "true") { + return DefaultConcurrency + } + return uint16(max(8, min(96, runtime.NumCPU()))) +} + // BatchTransferOptions identifies options used by doBatchTransfer. type BatchTransferOptions struct { TransferSize int64 @@ -25,7 +45,7 @@ func DoBatchTransfer(ctx context.Context, o *BatchTransferOptions) error { } if o.Concurrency == 0 { - o.Concurrency = 5 // default concurrency + o.Concurrency = DefaultConcurrencyValue() } // Prepare and do parallel operations. diff --git a/sdk/storage/azfile/file/models.go b/sdk/storage/azfile/file/models.go index 0dd5122c1556..bfdce6de2de1 100644 --- a/sdk/storage/azfile/file/models.go +++ b/sdk/storage/azfile/file/models.go @@ -606,7 +606,8 @@ type downloadOptions struct { // LeaseAccessConditions contains optional parameters to access leased entity. LeaseAccessConditions *LeaseAccessConditions - // Concurrency indicates the maximum number of chunks to download in parallel (0=default). 
+ // Concurrency indicates the maximum number of chunks to download in parallel. + // The default is based on CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default. Concurrency uint16 // RetryReaderOptionsPerChunk is used when downloading each chunk. @@ -646,7 +647,8 @@ type DownloadBufferOptions struct { // LeaseAccessConditions contains optional parameters to access leased entity. LeaseAccessConditions *LeaseAccessConditions - // Concurrency indicates the maximum number of chunks to download in parallel (0=default). + // Concurrency indicates the maximum number of chunks to download in parallel. + // The default is based on CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default. Concurrency uint16 // RetryReaderOptionsPerChunk is used when downloading each chunk. @@ -669,7 +671,8 @@ type DownloadFileOptions struct { // LeaseAccessConditions contains optional parameters to access leased entity. LeaseAccessConditions *LeaseAccessConditions - // Concurrency indicates the maximum number of chunks to download in parallel (0=default). + // Concurrency indicates the maximum number of chunks to download in parallel. + // The default is based on CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default. Concurrency uint16 // RetryReaderOptionsPerChunk is used when downloading each chunk. @@ -1050,7 +1053,8 @@ type uploadFromReaderOptions struct { // Note that the progress reporting is not always increasing; it can go down when retrying a request. Progress func(bytesTransferred int64) - // Concurrency indicates the maximum number of chunks to upload in parallel (default is 5) + // Concurrency indicates the maximum number of chunks to upload in parallel. + // The default is based on CPU core count (min 8, max 96). 
Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default. Concurrency uint16 // LeaseAccessConditions contains optional parameters to access leased entity. @@ -1078,7 +1082,8 @@ type UploadStreamOptions struct { ChunkSize int64 // Concurrency defines the max number of concurrent uploads to be performed to upload the file. - // Each concurrent upload will create a buffer of size ChunkSize. The default value is one. + // Each concurrent upload will create a buffer of size ChunkSize. The default is based on + // CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to use the legacy value of 5 (UploadStream previously defaulted to 1). Concurrency int // LeaseAccessConditions contains optional parameters to access leased entity. @@ -1087,7 +1092,7 @@ type UploadStreamOptions struct { func (u *UploadStreamOptions) setDefaults() { if u.Concurrency == 0 { - u.Concurrency = 1 + u.Concurrency = int(shared.DefaultConcurrencyValue()) } if u.ChunkSize < _1MiB { diff --git a/sdk/storage/azfile/internal/shared/batch_transfer.go b/sdk/storage/azfile/internal/shared/batch_transfer.go index 2c5cf54c07c9..98f0239d2005 100644 --- a/sdk/storage/azfile/internal/shared/batch_transfer.go +++ b/sdk/storage/azfile/internal/shared/batch_transfer.go @@ -6,8 +6,28 @@ package shared import ( "context" "errors" + "os" + "runtime" + "strings" ) +const ( + // DefaultConcurrency is the legacy default concurrency value. + // + // Deprecated: Use DefaultConcurrencyValue() instead, which returns a value based on CPU core count. + DefaultConcurrency = 5 +) + +// DefaultConcurrencyValue returns the default concurrency based on CPU count, +// clamped between 8 and 96. Set the AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY +// environment variable to "true" to revert to the previous default of 5.
+func DefaultConcurrencyValue() uint16 { + if strings.EqualFold(os.Getenv("AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY"), "true") { + return DefaultConcurrency + } + return uint16(max(8, min(96, runtime.NumCPU()))) +} + // BatchTransferOptions identifies options used by doBatchTransfer. type BatchTransferOptions struct { TransferSize int64 @@ -25,7 +45,7 @@ func DoBatchTransfer(ctx context.Context, o *BatchTransferOptions) error { } if o.Concurrency == 0 { - o.Concurrency = 5 // default concurrency + o.Concurrency = DefaultConcurrencyValue() } // Prepare and do parallel operations.