Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ type StorageConfigType = {
storageS3InternalTracesEnabled?: boolean
storageS3MaxSockets: number
storageS3DisableChecksum: boolean
storageS3UploadQueueSize: number
storageS3Bucket: string
storageS3Endpoint?: string
storageS3ForcePathStyle?: boolean
Expand Down Expand Up @@ -365,8 +364,6 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {
10
),
storageS3DisableChecksum: getOptionalConfigFromEnv('STORAGE_S3_DISABLE_CHECKSUM') === 'true',
storageS3UploadQueueSize:
envNumber(getOptionalConfigFromEnv('STORAGE_S3_UPLOAD_QUEUE_SIZE')) ?? 2,
storageS3InternalTracesEnabled:
getOptionalConfigFromEnv('STORAGE_S3_ENABLED_METRICS') === 'true',
storageS3Bucket: getOptionalConfigFromEnv('STORAGE_S3_BUCKET', 'GLOBAL_S3_BUCKET'),
Expand Down
22 changes: 21 additions & 1 deletion src/http/routes/s3/commands/put-object.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ export default function PutObject(s3Router: S3Router) {
throw ERRORS.InvalidParameter('internalIcebergBucketName')
}

const decodedContentLength = ctx.req.headers['x-amz-decoded-content-length']
const rawContentLength = decodedContentLength || contentLength
const icebergContentLength =
rawContentLength !== undefined && rawContentLength !== null
? Number(rawContentLength)
: undefined

return pipeline(
uploadRequest.body,
new ByteLimitTransformStream(MAX_PART_SIZE), // 5GB limit for iceberg objects
Expand All @@ -93,7 +100,8 @@ export default function PutObject(s3Router: S3Router) {
fileStream as Readable,
uploadRequest.mimeType,
uploadRequest.cacheControl,
ctx.signals.body
ctx.signals.body,
Number.isFinite(icebergContentLength) ? icebergContentLength : undefined
)

return {
Expand Down Expand Up @@ -135,6 +143,17 @@ export default function PutObject(s3Router: S3Router) {
fileSizeLimit: bucket.file_size_limit || undefined,
})

const decodedContentLength = ctx.req.headers['x-amz-decoded-content-length'] as
| string
| undefined
const rawContentLength = decodedContentLength ? Number(decodedContentLength) : contentLength
const resolvedContentLength =
rawContentLength !== undefined &&
rawContentLength !== null &&
Number.isFinite(Number(rawContentLength))
? Number(rawContentLength)
: undefined

return pipeline(
uploadRequest.body,
new ByteLimitTransformStream(uploadRequest.maxFileSize),
Expand All @@ -147,6 +166,7 @@ export default function PutObject(s3Router: S3Router) {
Key: key,
CacheControl: uploadRequest.cacheControl,
ContentType: uploadRequest.mimeType,
ContentLength: resolvedContentLength,
Expires: req.Headers?.['expires'] ? new Date(req.Headers?.['expires']) : undefined,
ContentEncoding: req.Headers?.['content-encoding'],
Metadata: metadata,
Expand Down
3 changes: 2 additions & 1 deletion src/storage/backend/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ export abstract class StorageBackendAdapter {
body: NodeJS.ReadableStream,
contentType: string,
cacheControl: string,
signal?: AbortSignal
signal?: AbortSignal,
contentLength?: number
): Promise<ObjectMetadata> {
throw new Error('uploadObject not implemented')
}
Expand Down
4 changes: 3 additions & 1 deletion src/storage/backend/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,9 @@ export class FileBackend implements StorageBackendAdapter {
version: string | undefined,
body: NodeJS.ReadableStream,
contentType: string,
cacheControl: string
cacheControl: string,
signal?: AbortSignal,
contentLength?: number
): Promise<ObjectMetadata> {
try {
const file = this.resolveSecurePath(withOptionalVersion(`${bucketName}/${key}`, version))
Expand Down
90 changes: 84 additions & 6 deletions src/storage/backend/s3/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
HeadObjectCommand,
ListObjectsV2Command,
ListPartsCommand,
PutObjectCommand,
S3Client,
S3ClientConfig,
UploadPartCommand,
Expand All @@ -34,13 +35,14 @@ import {
} from './../adapter'

const {
storageS3UploadQueueSize,
tracingFeatures,
storageS3MaxSockets,
tracingEnabled,
storageS3DisableChecksum,
} = getConfig()

const MAX_PUT_OBJECT_SIZE = 5 * 1024 * 1024 * 1024 // 5GB

export interface S3ClientOptions {
endpoint?: string
region?: string
Expand Down Expand Up @@ -126,13 +128,15 @@ export class S3Backend implements StorageBackendAdapter {

/**
* Uploads and store an object
* Max 5GB
* @param bucketName
* @param key
* @param version
* @param body
* @param contentType
* @param cacheControl
* @param signal
* @param contentLength
*/
async uploadObject(
bucketName: string,
Expand All @@ -141,17 +145,94 @@ export class S3Backend implements StorageBackendAdapter {
body: Readable,
contentType: string,
cacheControl: string,
signal?: AbortSignal
signal?: AbortSignal,
contentLength?: number
): Promise<ObjectMetadata> {
if (signal?.aborted) {
throw ERRORS.Aborted('Upload was aborted')
}

if (typeof contentLength !== 'number') {
// If content length is unknown, use streaming multipart upload which does not require buffering the entire stream in memory.
return this.bufferedMultipartUpload(bucketName, key, version, body, contentType, cacheControl, signal)
}

if (contentLength > MAX_PUT_OBJECT_SIZE) {
throw ERRORS.EntityTooLarge(undefined)
}

// Use PutObject directly when content-length is known and within S3's single-object limit (5GB).
// This avoids the buffering overhead of the Upload class which buffers each part in memory.
return this.putObject(
bucketName,
key,
version,
body,
contentType,
cacheControl,
signal,
contentLength
)
}

protected async putObject(
bucketName: string,
key: string,
version: string | undefined,
body: Readable,
contentType: string,
cacheControl: string,
signal?: AbortSignal,
contentLength?: number
): Promise<ObjectMetadata> {
const dataStream = tracingFeatures?.upload ? monitorStream(body) : body

const command = new PutObjectCommand({
Bucket: bucketName,
Key: withOptionalVersion(key, version),
Body: dataStream,
ContentType: contentType,
CacheControl: cacheControl,
ContentLength: contentLength,
})

try {
const data = await this.client.send(command, {
abortSignal: signal,
})

return {
httpStatusCode: data.$metadata.httpStatusCode || 200,
cacheControl,
eTag: data.ETag || '',
mimetype: contentType,
contentLength: contentLength || 0,
lastModified: new Date(),
size: contentLength || 0,
contentRange: undefined,
}
} catch (err) {
if (err instanceof Error && err.name === 'AbortError') {
throw ERRORS.AbortedTerminate('Upload was aborted', err)
}
throw StorageBackendError.fromError(err)
}
}

protected async bufferedMultipartUpload(
bucketName: string,
key: string,
version: string | undefined,
body: Readable,
contentType: string,
cacheControl: string,
signal?: AbortSignal
): Promise<ObjectMetadata> {
const dataStream = tracingFeatures?.upload ? monitorStream(body) : body

const upload = new Upload({
client: this.client,
queueSize: storageS3UploadQueueSize,
queueSize: 1,
params: {
Bucket: bucketName,
Key: withOptionalVersion(key, version),
Expand All @@ -177,11 +258,8 @@ export class S3Backend implements StorageBackendAdapter {
try {
const data = await upload.done()

// Remove event listener to allow GC of upload and dataStream references
upload.off('httpUploadProgress', progressHandler)

// Only call head for objects that are > 0 bytes
// for some reason headObject can take a really long time to resolve on zero byte uploads, this was causing requests to timeout
const metadata = hasUploadedBytes
? await this.headObject(bucketName, key, version)
: {
Expand Down
1 change: 1 addition & 0 deletions src/storage/protocols/s3/s3-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,7 @@ export class S3ProtocolHandler {
body: command.Body as Readable,
cacheControl: command.CacheControl!,
mimeType: command.ContentType!,
contentLength: command.ContentLength,
isTruncated: options.isTruncated,
userMetadata: command.Metadata,
},
Expand Down
9 changes: 8 additions & 1 deletion src/storage/uploader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ interface FileUpload {
body: Readable
mimeType: string
cacheControl: string
contentLength?: number
isTruncated: () => boolean
xRobotsTag?: string
userMetadata?: Record<string, unknown>
Expand Down Expand Up @@ -110,7 +111,8 @@ export class Uploader {
file.body,
file.mimeType,
file.cacheControl,
request.signal
request.signal,
file.contentLength
)

if (request.file.xRobotsTag) {
Expand Down Expand Up @@ -320,6 +322,7 @@ export async function fileUploadFromRequest(
let userMetadata: Record<string, unknown> | undefined
let mimeType: string
let isTruncated: () => boolean
let fileContentLength: number | undefined
let maxFileSize = 0

// When is an empty folder we restrict it to 0 bytes
Expand Down Expand Up @@ -394,6 +397,9 @@ export async function fileUploadFromRequest(

// Extract content-length value to avoid capturing entire request object in closure
const contentLength = Number(request.headers['content-length'])
if (Number.isFinite(contentLength) && contentLength >= 0) {
fileContentLength = contentLength
}
isTruncated = () => {
// @todo more secure to get this from the stream or from s3 in the next step
return contentLength > maxFileSize
Expand All @@ -411,6 +417,7 @@ export async function fileUploadFromRequest(
body,
mimeType,
cacheControl,
contentLength: fileContentLength,
isTruncated,
userMetadata,
maxFileSize,
Expand Down
Loading