diff --git a/src/config.ts b/src/config.ts index 7b573c1c7..1b7764c48 100644 --- a/src/config.ts +++ b/src/config.ts @@ -66,7 +66,6 @@ type StorageConfigType = { storageS3InternalTracesEnabled?: boolean storageS3MaxSockets: number storageS3DisableChecksum: boolean - storageS3UploadQueueSize: number storageS3Bucket: string storageS3Endpoint?: string storageS3ForcePathStyle?: boolean @@ -365,8 +364,6 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType { 10 ), storageS3DisableChecksum: getOptionalConfigFromEnv('STORAGE_S3_DISABLE_CHECKSUM') === 'true', - storageS3UploadQueueSize: - envNumber(getOptionalConfigFromEnv('STORAGE_S3_UPLOAD_QUEUE_SIZE')) ?? 2, storageS3InternalTracesEnabled: getOptionalConfigFromEnv('STORAGE_S3_ENABLED_METRICS') === 'true', storageS3Bucket: getOptionalConfigFromEnv('STORAGE_S3_BUCKET', 'GLOBAL_S3_BUCKET'), diff --git a/src/http/routes/s3/commands/put-object.ts b/src/http/routes/s3/commands/put-object.ts index 08b056c7a..66cd63591 100644 --- a/src/http/routes/s3/commands/put-object.ts +++ b/src/http/routes/s3/commands/put-object.ts @@ -81,6 +81,13 @@ export default function PutObject(s3Router: S3Router) { throw ERRORS.InvalidParameter('internalIcebergBucketName') } + const decodedContentLength = ctx.req.headers['x-amz-decoded-content-length'] + const rawContentLength = decodedContentLength || contentLength + const icebergContentLength = + rawContentLength !== undefined && rawContentLength !== null + ? Number(rawContentLength) + : undefined + return pipeline( uploadRequest.body, new ByteLimitTransformStream(MAX_PART_SIZE), // 5GB limit for iceberg objects @@ -93,7 +100,8 @@ export default function PutObject(s3Router: S3Router) { fileStream as Readable, uploadRequest.mimeType, uploadRequest.cacheControl, - ctx.signals.body + ctx.signals.body, + Number.isFinite(icebergContentLength) ? 
icebergContentLength : undefined ) return { @@ -135,6 +143,17 @@ export default function PutObject(s3Router: S3Router) { fileSizeLimit: bucket.file_size_limit || undefined, }) + const decodedContentLength = ctx.req.headers['x-amz-decoded-content-length'] as + | string + | undefined + const rawContentLength = decodedContentLength ? Number(decodedContentLength) : contentLength + const resolvedContentLength = + rawContentLength !== undefined && + rawContentLength !== null && + Number.isFinite(Number(rawContentLength)) + ? Number(rawContentLength) + : undefined + return pipeline( uploadRequest.body, new ByteLimitTransformStream(uploadRequest.maxFileSize), @@ -147,6 +166,7 @@ export default function PutObject(s3Router: S3Router) { Key: key, CacheControl: uploadRequest.cacheControl, ContentType: uploadRequest.mimeType, + ContentLength: resolvedContentLength, Expires: req.Headers?.['expires'] ? new Date(req.Headers?.['expires']) : undefined, ContentEncoding: req.Headers?.['content-encoding'], Metadata: metadata, diff --git a/src/storage/backend/adapter.ts b/src/storage/backend/adapter.ts index a897283be..b4cb61e40 100644 --- a/src/storage/backend/adapter.ts +++ b/src/storage/backend/adapter.ts @@ -97,7 +97,8 @@ export abstract class StorageBackendAdapter { body: NodeJS.ReadableStream, contentType: string, cacheControl: string, - signal?: AbortSignal + signal?: AbortSignal, + contentLength?: number ): Promise { throw new Error('uploadObject not implemented') } diff --git a/src/storage/backend/file.ts b/src/storage/backend/file.ts index f2754b1e3..78f0eeb17 100644 --- a/src/storage/backend/file.ts +++ b/src/storage/backend/file.ts @@ -180,7 +180,9 @@ export class FileBackend implements StorageBackendAdapter { version: string | undefined, body: NodeJS.ReadableStream, contentType: string, - cacheControl: string + cacheControl: string, + signal?: AbortSignal, + contentLength?: number ): Promise { try { const file = 
this.resolveSecurePath(withOptionalVersion(`${bucketName}/${key}`, version)) diff --git a/src/storage/backend/s3/adapter.ts b/src/storage/backend/s3/adapter.ts index 5c8fc4736..9fc8ce3ba 100644 --- a/src/storage/backend/s3/adapter.ts +++ b/src/storage/backend/s3/adapter.ts @@ -11,6 +11,7 @@ import { HeadObjectCommand, ListObjectsV2Command, ListPartsCommand, + PutObjectCommand, S3Client, S3ClientConfig, UploadPartCommand, @@ -34,13 +35,14 @@ import { } from './../adapter' const { - storageS3UploadQueueSize, tracingFeatures, storageS3MaxSockets, tracingEnabled, storageS3DisableChecksum, } = getConfig() +const MAX_PUT_OBJECT_SIZE = 5 * 1024 * 1024 * 1024 // 5GB + export interface S3ClientOptions { endpoint?: string region?: string @@ -126,6 +128,7 @@ export class S3Backend implements StorageBackendAdapter { /** * Uploads and store an object + * Max 5GB * @param bucketName * @param key * @param version @@ -133,6 +136,7 @@ * @param contentType * @param cacheControl * @param signal + * @param contentLength */ async uploadObject( bucketName: string, @@ -141,17 +145,94 @@ body: Readable, contentType: string, cacheControl: string, - signal?: AbortSignal + signal?: AbortSignal, + contentLength?: number ): Promise { if (signal?.aborted) { throw ERRORS.Aborted('Upload was aborted') } + + if (typeof contentLength !== 'number') { + // Content length is unknown: fall back to multipart upload via the Upload helper, which buffers each part in memory but does not require the total size up front. + return this.bufferedMultipartUpload(bucketName, key, version, body, contentType, cacheControl, signal) + } + + if (contentLength > MAX_PUT_OBJECT_SIZE) { + throw ERRORS.EntityTooLarge(undefined) + } + + // Use PutObject directly when content-length is known and within S3's single-object limit (5GB). + // This avoids the buffering overhead of the Upload class which buffers each part in memory.
+ return this.putObject( + bucketName, + key, + version, + body, + contentType, + cacheControl, + signal, + contentLength + ) + } + + protected async putObject( + bucketName: string, + key: string, + version: string | undefined, + body: Readable, + contentType: string, + cacheControl: string, + signal?: AbortSignal, + contentLength?: number + ): Promise { + const dataStream = tracingFeatures?.upload ? monitorStream(body) : body + + const command = new PutObjectCommand({ + Bucket: bucketName, + Key: withOptionalVersion(key, version), + Body: dataStream, + ContentType: contentType, + CacheControl: cacheControl, + ContentLength: contentLength, + }) + + try { + const data = await this.client.send(command, { + abortSignal: signal, + }) + + return { + httpStatusCode: data.$metadata.httpStatusCode || 200, + cacheControl, + eTag: data.ETag || '', + mimetype: contentType, + contentLength: contentLength || 0, + lastModified: new Date(), + size: contentLength || 0, + contentRange: undefined, + } + } catch (err) { + if (err instanceof Error && err.name === 'AbortError') { + throw ERRORS.AbortedTerminate('Upload was aborted', err) + } + throw StorageBackendError.fromError(err) + } + } + + protected async bufferedMultipartUpload( + bucketName: string, + key: string, + version: string | undefined, + body: Readable, + contentType: string, + cacheControl: string, + signal?: AbortSignal + ): Promise { const dataStream = tracingFeatures?.upload ? 
monitorStream(body) : body const upload = new Upload({ client: this.client, - queueSize: storageS3UploadQueueSize, + queueSize: 1, params: { Bucket: bucketName, Key: withOptionalVersion(key, version), @@ -177,11 +258,11 @@ try { const data = await upload.done() // Remove event listener to allow GC of upload and dataStream references upload.off('httpUploadProgress', progressHandler) // Only call head for objects that are > 0 bytes // for some reason headObject can take a really long time to resolve on zero byte uploads, this was causing requests to timeout const metadata = hasUploadedBytes ? await this.headObject(bucketName, key, version) : { diff --git a/src/storage/protocols/s3/s3-handler.ts b/src/storage/protocols/s3/s3-handler.ts index 10f22424d..7ff9b3b60 100644 --- a/src/storage/protocols/s3/s3-handler.ts +++ b/src/storage/protocols/s3/s3-handler.ts @@ -694,6 +694,7 @@ export class S3ProtocolHandler { body: command.Body as Readable, cacheControl: command.CacheControl!, mimeType: command.ContentType!, + contentLength: command.ContentLength, isTruncated: options.isTruncated, userMetadata: command.Metadata, }, diff --git a/src/storage/uploader.ts b/src/storage/uploader.ts index 17f721560..181e76ef7 100644 --- a/src/storage/uploader.ts +++ b/src/storage/uploader.ts @@ -18,6 +18,7 @@ interface FileUpload { body: Readable mimeType: string cacheControl: string + contentLength?: number isTruncated: () => boolean xRobotsTag?: string userMetadata?: Record @@ -110,7 +111,8 @@ file.body, file.mimeType, file.cacheControl, - request.signal + request.signal, + file.contentLength ) if (request.file.xRobotsTag) { @@ -320,6 +322,7 @@ let userMetadata: Record | undefined let mimeType: string let isTruncated: () => boolean + let fileContentLength: number | undefined let maxFileSize = 0 // When is an empty folder we restrict it to 0 bytes @@ -394,6
+397,9 @@ export async function fileUploadFromRequest( // Extract content-length value to avoid capturing entire request object in closure const contentLength = Number(request.headers['content-length']) + if (Number.isFinite(contentLength) && contentLength >= 0) { + fileContentLength = contentLength + } isTruncated = () => { // @todo more secure to get this from the stream or from s3 in the next step return contentLength > maxFileSize @@ -411,6 +417,7 @@ export async function fileUploadFromRequest( body, mimeType, cacheControl, + contentLength: fileContentLength, isTruncated, userMetadata, maxFileSize,