Skip to content
13 changes: 11 additions & 2 deletions sdk/storage/azblob/blob/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,19 @@ const (
// DefaultDownloadBlockSize is default block size
DefaultDownloadBlockSize = int64(4 * 1024 * 1024) // 4MB

// DefaultConcurrency is the default number of blocks downloaded or uploaded in parallel
DefaultConcurrency = shared.DefaultConcurrency
// DefaultConcurrency is the legacy default number of blocks downloaded or uploaded in parallel.
//
// Deprecated: Use DefaultConcurrencyValue() instead, which returns a value based on CPU core count.
DefaultConcurrency = shared.DefaultConcurrency //nolint:staticcheck // intentional re-export of deprecated const for backward compat
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add changelog entries in all the three packages

)

// DefaultConcurrencyValue returns the default concurrency for parallel uploads/downloads.
// The value is based on CPU core count, clamped between 8 and 96.
Comment thread
tanyasethi-msft marked this conversation as resolved.
// Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default of 5.
func DefaultConcurrencyValue() uint16 {
return shared.DefaultConcurrencyValue()
}

// BlobType defines values for BlobType
type BlobType = generated.BlobType

Expand Down
9 changes: 6 additions & 3 deletions sdk/storage/azblob/blob/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ type downloadOptions struct {
CPKInfo *CPKInfo
CPKScopeInfo *CPKScopeInfo

// Concurrency indicates the maximum number of blocks to download in parallel (0=default).
// Concurrency indicates the maximum number of blocks to download in parallel.
// The default is based on CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default.
Concurrency uint16

// RetryReaderOptionsPerBlock is used when downloading each block.
Expand Down Expand Up @@ -176,7 +177,8 @@ type DownloadBufferOptions struct {
// CPKScopeInfo contains a group of parameters for client provided encryption scope.
CPKScopeInfo *CPKScopeInfo

// Concurrency indicates the maximum number of blocks to download in parallel (0=default).
// Concurrency indicates the maximum number of blocks to download in parallel.
// The default is based on CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default.
Concurrency uint16

// RetryReaderOptionsPerBlock is used when downloading each block.
Expand Down Expand Up @@ -204,7 +206,8 @@ type DownloadFileOptions struct {
CPKInfo *CPKInfo
CPKScopeInfo *CPKScopeInfo

// Concurrency indicates the maximum number of blocks to download in parallel. The default value is 5.
// Concurrency indicates the maximum number of blocks to download in parallel.
// The default is based on CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default.
Concurrency uint16

// RetryReaderOptionsPerBlock is used when downloading each block.
Expand Down
2 changes: 1 addition & 1 deletion sdk/storage/azblob/blockblob/chunkwriting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func TestSlowDestCopyFrom(t *testing.T) {
require.ErrorIs(t, err, io.ErrNoProgress)
}

require.Equal(t, 1, tracker.Count)
require.GreaterOrEqual(t, tracker.Count, 1)
require.True(t, tracker.Freed)
}

Expand Down
8 changes: 5 additions & 3 deletions sdk/storage/azblob/blockblob/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ type uploadFromReaderOptions struct {
CPKInfo *blob.CPKInfo
CPKScopeInfo *blob.CPKScopeInfo

// Concurrency indicates the maximum number of blocks to upload in parallel (0=default)
// Concurrency indicates the maximum number of blocks to upload in parallel.
// The default is based on CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default.
Concurrency uint16

TransactionalValidation blob.TransferValidationType
Expand Down Expand Up @@ -334,7 +335,8 @@ type UploadStreamOptions struct {
BlockSize int64

// Concurrency defines the max number of concurrent uploads to be performed to upload the file.
// Each concurrent upload will create a buffer of size BlockSize. The default value is one.
// Each concurrent upload will create a buffer of size BlockSize. The default is based on
// CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default.
Concurrency int
Comment on lines +338 to 340
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check if data validation tests are already there for all the affected upload and download methods.


TransactionalValidation blob.TransferValidationType
Expand All @@ -350,7 +352,7 @@ type UploadStreamOptions struct {

func (u *UploadStreamOptions) setDefaults() {
if u.Concurrency == 0 {
u.Concurrency = 1
u.Concurrency = int(shared.DefaultConcurrencyValue())
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method returns 5 if AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY environment variable is set. Whereas the previous default value for concurrency in UploadStreamOptions is 1.

}

if u.BlockSize < _1MiB {
Expand Down
18 changes: 17 additions & 1 deletion sdk/storage/azblob/internal/shared/batch_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,28 @@ package shared
import (
"context"
"errors"
"os"
"runtime"
"strings"
)

const (
// DefaultConcurrency is the legacy default concurrency value.
//
// Deprecated: Use DefaultConcurrencyValue() instead, which returns a value based on CPU core count.
DefaultConcurrency = 5
)

// DefaultConcurrencyValue returns the default concurrency based on CPU count,
// clamped between 8 and 96. Set the AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY
// environment variable to "true" to revert to the previous default of 5.
func DefaultConcurrencyValue() uint16 {
if strings.EqualFold(os.Getenv("AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY"), "true") {
return DefaultConcurrency
}
return uint16(max(8, min(96, runtime.NumCPU())))
}

// BatchTransferOptions identifies options used by doBatchTransfer.
type BatchTransferOptions struct {
TransferSize int64
Expand All @@ -30,7 +46,7 @@ func DoBatchTransfer(ctx context.Context, o *BatchTransferOptions) error {
}

if o.Concurrency == 0 {
o.Concurrency = DefaultConcurrency // default concurrency
o.Concurrency = DefaultConcurrencyValue()
}

// Prepare and do parallel operations.
Expand Down
46 changes: 46 additions & 0 deletions sdk/storage/azblob/internal/shared/shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package shared

import (
"runtime"
"strings"
"testing"

Expand Down Expand Up @@ -207,3 +208,48 @@ func TestIsIPEndpointStyle(t *testing.T) {
require.True(t, IsIPEndpointStyle("127.0.0.1"))
require.True(t, IsIPEndpointStyle("127.0.0.1:80"))
}

func TestDefaultConcurrencyValue_InBounds(t *testing.T) {
t.Setenv("AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY", "")
val := DefaultConcurrencyValue()
require.GreaterOrEqual(t, val, uint16(8))
require.LessOrEqual(t, val, uint16(96))
}

func TestDefaultConcurrencyValue_Deterministic(t *testing.T) {
t.Setenv("AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY", "")
val1 := DefaultConcurrencyValue()
val2 := DefaultConcurrencyValue()
require.Equal(t, val1, val2)
}

func TestDefaultConcurrencyValue_MatchesCPU(t *testing.T) {
Comment thread
tanyasethi-msft marked this conversation as resolved.
t.Setenv("AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY", "")
cpus := runtime.NumCPU()
val := DefaultConcurrencyValue()
if cpus < 8 {
require.Equal(t, uint16(8), val)
} else if cpus > 96 {
require.Equal(t, uint16(96), val)
} else {
require.Equal(t, uint16(cpus), val)
}
}

func TestDefaultConcurrencyValue_LegacyEnvVar(t *testing.T) {
t.Setenv("AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY", "true")
require.Equal(t, uint16(DefaultConcurrency), DefaultConcurrencyValue())

t.Setenv("AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY", "TRUE")
require.Equal(t, uint16(DefaultConcurrency), DefaultConcurrencyValue())

t.Setenv("AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY", "false")
val := DefaultConcurrencyValue()
require.GreaterOrEqual(t, val, uint16(8))
require.LessOrEqual(t, val, uint16(96))

t.Setenv("AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY", "")
val = DefaultConcurrencyValue()
require.GreaterOrEqual(t, val, uint16(8))
require.LessOrEqual(t, val, uint16(96))
}
14 changes: 9 additions & 5 deletions sdk/storage/azdatalake/file/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ type uploadFromReaderOptions struct {
// Progress is a function that is invoked periodically as bytes are sent to the FileClient.
// Note that the progress reporting is not always increasing; it can go down when retrying a request.
Progress func(bytesTransferred int64)
// Concurrency indicates the maximum number of chunks to upload in parallel (default is 5)
// Concurrency indicates the maximum number of chunks to upload in parallel.
// The default is based on CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default.
Concurrency uint16
// AccessConditions contains optional parameters to access leased entity.
AccessConditions *AccessConditions
Expand All @@ -157,7 +158,8 @@ type uploadFromReaderOptions struct {
type UploadStreamOptions struct {
// ChunkSize specifies the chunk size to use in bytes; the default (and maximum size) is MaxAppendBytes.
ChunkSize int64
// Concurrency indicates the maximum number of chunks to upload in parallel (default is 5)
// Concurrency indicates the maximum number of chunks to upload in parallel.
// The default is based on CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default.
Concurrency uint16
// AccessConditions contains optional parameters to access leased entity.
AccessConditions *AccessConditions
Expand Down Expand Up @@ -324,7 +326,7 @@ func (o *AppendDataOptions) format(offset int64, body io.ReadSeekCloser) (*gener

func (u *UploadStreamOptions) setDefaults() {
if u.Concurrency == 0 {
u.Concurrency = 1
u.Concurrency = shared.DefaultConcurrencyValue()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as previous, this returns the previous default value as 5 instead of 1

}

if u.ChunkSize < _1MiB {
Expand Down Expand Up @@ -428,7 +430,8 @@ type DownloadBufferOptions struct {
CPKInfo *CPKInfo
// CPKScopeInfo contains a group of parameters for client provided encryption scope.
CPKScopeInfo *CPKScopeInfo
// Concurrency indicates the maximum number of chunks to download in parallel (0=default).
// Concurrency indicates the maximum number of chunks to download in parallel.
// The default is based on CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default.
Concurrency uint16
// RetryReaderOptionsPerChunk is used when downloading each chunk.
RetryReaderOptionsPerChunk *RetryReaderOptions
Expand Down Expand Up @@ -482,7 +485,8 @@ type DownloadFileOptions struct {
CPKInfo *CPKInfo
// CPKScopeInfo contains a group of parameters for client provided encryption scope.
CPKScopeInfo *CPKScopeInfo
// Concurrency indicates the maximum number of chunks to download in parallel. The default value is 5.
// Concurrency indicates the maximum number of chunks to download in parallel.
// The default is based on CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default.
Concurrency uint16
// RetryReaderOptionsPerChunk is used when downloading each chunk.
RetryReaderOptionsPerChunk *RetryReaderOptions
Expand Down
22 changes: 21 additions & 1 deletion sdk/storage/azdatalake/internal/shared/batch_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,28 @@ package shared
import (
"context"
"errors"
"os"
"runtime"
"strings"
)

const (
// DefaultConcurrency is the legacy default concurrency value.
//
// Deprecated: Use DefaultConcurrencyValue() instead, which returns a value based on CPU core count.
DefaultConcurrency = 5
)

// DefaultConcurrencyValue returns the default concurrency based on CPU count,
// clamped between 8 and 96. Set the AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY
// environment variable to "true" to revert to the previous default of 5.
func DefaultConcurrencyValue() uint16 {
if strings.EqualFold(os.Getenv("AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY"), "true") {
return DefaultConcurrency
}
return uint16(max(8, min(96, runtime.NumCPU())))
}

// BatchTransferOptions identifies options used by doBatchTransfer.
type BatchTransferOptions struct {
TransferSize int64
Expand All @@ -25,7 +45,7 @@ func DoBatchTransfer(ctx context.Context, o *BatchTransferOptions) error {
}

if o.Concurrency == 0 {
o.Concurrency = 5 // default concurrency
o.Concurrency = DefaultConcurrencyValue()
}

// Prepare and do parallel operations.
Expand Down
17 changes: 11 additions & 6 deletions sdk/storage/azfile/file/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,8 @@ type downloadOptions struct {
// LeaseAccessConditions contains optional parameters to access leased entity.
LeaseAccessConditions *LeaseAccessConditions

// Concurrency indicates the maximum number of chunks to download in parallel (0=default).
// Concurrency indicates the maximum number of chunks to download in parallel.
// The default is based on CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default.
Concurrency uint16

// RetryReaderOptionsPerChunk is used when downloading each chunk.
Expand Down Expand Up @@ -646,7 +647,8 @@ type DownloadBufferOptions struct {
// LeaseAccessConditions contains optional parameters to access leased entity.
LeaseAccessConditions *LeaseAccessConditions

// Concurrency indicates the maximum number of chunks to download in parallel (0=default).
// Concurrency indicates the maximum number of chunks to download in parallel.
// The default is based on CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default.
Concurrency uint16

// RetryReaderOptionsPerChunk is used when downloading each chunk.
Expand All @@ -669,7 +671,8 @@ type DownloadFileOptions struct {
// LeaseAccessConditions contains optional parameters to access leased entity.
LeaseAccessConditions *LeaseAccessConditions

// Concurrency indicates the maximum number of chunks to download in parallel (0=default).
// Concurrency indicates the maximum number of chunks to download in parallel.
// The default is based on CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default.
Concurrency uint16

// RetryReaderOptionsPerChunk is used when downloading each chunk.
Expand Down Expand Up @@ -1050,7 +1053,8 @@ type uploadFromReaderOptions struct {
// Note that the progress reporting is not always increasing; it can go down when retrying a request.
Progress func(bytesTransferred int64)

// Concurrency indicates the maximum number of chunks to upload in parallel (default is 5)
// Concurrency indicates the maximum number of chunks to upload in parallel.
// The default is based on CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default.
Concurrency uint16

// LeaseAccessConditions contains optional parameters to access leased entity.
Expand Down Expand Up @@ -1078,7 +1082,8 @@ type UploadStreamOptions struct {
ChunkSize int64

// Concurrency defines the max number of concurrent uploads to be performed to upload the file.
// Each concurrent upload will create a buffer of size ChunkSize. The default value is one.
// Each concurrent upload will create a buffer of size ChunkSize. The default is based on
// CPU core count (min 8, max 96). Set AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY=true to revert to the previous default.
Concurrency int

// LeaseAccessConditions contains optional parameters to access leased entity.
Expand All @@ -1087,7 +1092,7 @@ type UploadStreamOptions struct {

func (u *UploadStreamOptions) setDefaults() {
if u.Concurrency == 0 {
u.Concurrency = 1
u.Concurrency = int(shared.DefaultConcurrencyValue())
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as previous

}

if u.ChunkSize < _1MiB {
Expand Down
22 changes: 21 additions & 1 deletion sdk/storage/azfile/internal/shared/batch_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,28 @@ package shared
import (
"context"
"errors"
"os"
"runtime"
"strings"
)

const (
// DefaultConcurrency is the legacy default concurrency value.
//
// Deprecated: Use DefaultConcurrencyValue() instead, which returns a value based on CPU core count.
DefaultConcurrency = 5
)

// DefaultConcurrencyValue returns the default concurrency based on CPU count,
// clamped between 8 and 96. Set the AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY
// environment variable to "true" to revert to the previous default of 5.
func DefaultConcurrencyValue() uint16 {
if strings.EqualFold(os.Getenv("AZURE_STORAGE_USE_LEGACY_DEFAULT_CONCURRENCY"), "true") {
return DefaultConcurrency
}
return uint16(max(8, min(96, runtime.NumCPU())))
}

// BatchTransferOptions identifies options used by doBatchTransfer.
type BatchTransferOptions struct {
TransferSize int64
Expand All @@ -25,7 +45,7 @@ func DoBatchTransfer(ctx context.Context, o *BatchTransferOptions) error {
}

if o.Concurrency == 0 {
o.Concurrency = 5 // default concurrency
o.Concurrency = DefaultConcurrencyValue()
}

// Prepare and do parallel operations.
Expand Down
Loading