From 9f423acc124d811c94532be60401d27f320cd017 Mon Sep 17 00:00:00 2001 From: ferhat elmas Date: Thu, 12 Mar 2026 12:24:51 +0100 Subject: [PATCH] fix: s3 stream pipeline ordering Signed-off-by: ferhat elmas --- src/http/routes/s3/commands/put-object.ts | 48 ++++- src/http/routes/s3/commands/upload-part.ts | 4 +- src/storage/uploader.ts | 17 +- src/test/s3-protocol.test.ts | 226 ++++++++++++++++++++- src/test/uploader.test.ts | 25 +++ 5 files changed, 304 insertions(+), 16 deletions(-) create mode 100644 src/test/uploader.test.ts diff --git a/src/http/routes/s3/commands/put-object.ts b/src/http/routes/s3/commands/put-object.ts index 08b056c7a..b1695d9cd 100644 --- a/src/http/routes/s3/commands/put-object.ts +++ b/src/http/routes/s3/commands/put-object.ts @@ -3,7 +3,7 @@ import { ERRORS } from '@internal/errors' import { ByteLimitTransformStream } from '@storage/protocols/s3/byte-limit-stream' import { MAX_PART_SIZE, S3ProtocolHandler } from '@storage/protocols/s3/s3-handler' import { fileUploadFromRequest, getStandardMaxFileSizeLimit } from '@storage/uploader' -import stream, { PassThrough, Readable } from 'stream' +import stream, { Readable, Transform } from 'stream' import { pipeline } from 'stream/promises' import { ROUTE_OPERATIONS } from '../../operations' import { S3Router } from '../router' @@ -49,6 +49,36 @@ const PostFormInput = { }, } as const +type PipelineBody = NodeJS.ReadableStream +type PipelineHandlerInput = AsyncIterable + +function withReadableStreamHandler(handler: (fileStream: Readable) => Promise) { + return async (fileStream: PipelineHandlerInput) => { + // stream/promises exposes the final stream to the destination callback + // as a generic async iterable. In these handlers the upstream is always + // a Node readable, so narrow once here. + return handler(fileStream as Readable) + } +} + +function pipelineWithOptionalStreamingSignature( + body: PipelineBody, + limit: number, + streamingSignatureV4: Transform | undefined, + handler: (fileStream: Readable) => Promise +) { + if (streamingSignatureV4) { + return pipeline( + body, + streamingSignatureV4, + new ByteLimitTransformStream(limit), + withReadableStreamHandler(handler) + ) + } + + return pipeline(body, new ByteLimitTransformStream(limit), withReadableStreamHandler(handler)) +} + export default function PutObject(s3Router: S3Router) { s3Router.put( '/:Bucket/*', @@ -81,16 +111,16 @@ export default function PutObject(s3Router: S3Router) { throw ERRORS.InvalidParameter('internalIcebergBucketName') } - return pipeline( + return pipelineWithOptionalStreamingSignature( uploadRequest.body, - new ByteLimitTransformStream(MAX_PART_SIZE), // 5GB limit for iceberg objects - ctx.req.streamingSignatureV4 || new PassThrough(), + MAX_PART_SIZE, + ctx.req.streamingSignatureV4, async (fileStream) => { const u = await ctx.req.storage.backend.uploadObject( icebergBucket, key, undefined, - fileStream as Readable, + fileStream, uploadRequest.mimeType, uploadRequest.cacheControl, ctx.signals.body @@ -135,14 +165,14 @@ export default function PutObject(s3Router: S3Router) { fileSizeLimit: bucket.file_size_limit || undefined, }) - return pipeline( + return pipelineWithOptionalStreamingSignature( uploadRequest.body, - new ByteLimitTransformStream(uploadRequest.maxFileSize), - ctx.req.streamingSignatureV4 || new PassThrough(), + uploadRequest.maxFileSize, + ctx.req.streamingSignatureV4, async (fileStream) => { return s3Protocol.putObject( { - Body: fileStream as Readable, + Body: fileStream, Bucket: req.Params.Bucket, Key: key, CacheControl: uploadRequest.cacheControl, diff --git a/src/http/routes/s3/commands/upload-part.ts b/src/http/routes/s3/commands/upload-part.ts index c6d90815d..d1fd34804 100644 --- a/src/http/routes/s3/commands/upload-part.ts +++ b/src/http/routes/s3/commands/upload-part.ts @@ -59,8 +59,8 @@ export default function UploadPart(s3Router: S3Router) { return pipeline( passThrough, - new ByteLimitTransformStream(MAX_PART_SIZE), // 5GB max part size ctx.req.streamingSignatureV4, + new ByteLimitTransformStream(MAX_PART_SIZE), // 5GB max part size async (body) => { const part = await ctx.req.storage.backend.uploadPart( icebergBucketName!, @@ -131,8 +131,8 @@ export default function UploadPart(s3Router: S3Router) { return pipeline( passThrough, - new ByteLimitTransformStream(MAX_PART_SIZE), ctx.req.streamingSignatureV4, + new ByteLimitTransformStream(MAX_PART_SIZE), async (body) => { return s3Protocol.uploadPart( { diff --git a/src/storage/uploader.ts b/src/storage/uploader.ts index 17f721560..6190c8183 100644 --- a/src/storage/uploader.ts +++ b/src/storage/uploader.ts @@ -296,6 +296,18 @@ export function validateMimeType(mimeType: string, allowedMimeTypes: string[]) { throw ERRORS.InvalidMimeType(mimeType) } +function getKnownRequestContentLength(request: FastifyRequest): number | undefined { + const contentLengthHeader = + request.headers['x-amz-decoded-content-length'] ?? request.headers['content-length'] + const contentLength = Number(contentLengthHeader) + + if (!Number.isFinite(contentLength) || contentLength < 0) { + return undefined + } + + return contentLength +} + /** * Extracts the file information from the request * @param request @@ -392,11 +404,10 @@ export async function fileUploadFromRequest( userMetadata = parseUserMetadata(customMd) } - // Extract content-length value to avoid capturing entire request object in closure - const contentLength = Number(request.headers['content-length']) + const contentLength = getKnownRequestContentLength(request) isTruncated = () => { // @todo more secure to get this from the stream or from s3 in the next step - return contentLength > maxFileSize + return typeof contentLength === 'number' && contentLength > maxFileSize } } diff --git a/src/test/s3-protocol.test.ts b/src/test/s3-protocol.test.ts index dcb311969..f6b994e77 100644 --- a/src/test/s3-protocol.test.ts +++ b/src/test/s3-protocol.test.ts @@ -29,13 +29,16 @@ import { createPresignedPost } from '@aws-sdk/s3-presigned-post' import { getSignedUrl } from '@aws-sdk/s3-request-presigner' import { wait } from '@internal/concurrency' import axios from 'axios' -import { randomUUID } from 'crypto' +import { createHash, createHmac, randomUUID } from 'crypto' import { FastifyInstance } from 'fastify' import { ReadableStreamBuffer } from 'stream-buffers' import app from '../app' import { getConfig, mergeConfig } from '../config' +import { SignatureV4, SignatureV4Service } from '../storage/protocols/s3' const { s3ProtocolAccessKeySecret, s3ProtocolAccessKeyId, storageS3Region } = getConfig() +const STREAMING_PAYLOAD_ALGORITHM = 'STREAMING-AWS4-HMAC-SHA256-PAYLOAD' +const EMPTY_SHA256_HASH = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' async function createBucket(client: S3Client, name?: string, publicRead = true) { let bucketName: string @@ -76,18 +79,170 @@ async function uploadFile( return await uploader.done() } +function formatAwsDate(date = new Date()) { + return date.toISOString().replace(/[:-]|\.\d{3}/g, '') +} + +function hmacSha256(key: string | Buffer, value: string) { + return createHmac('sha256', key).update(value).digest() +} + +function sha256Hex(value: Buffer) { + return createHash('sha256').update(value).digest('hex') +} + +function deriveSigningKey(secretKey: string, shortDate: string, region: string, service: string) { + const dateKey = hmacSha256(`AWS4${secretKey}`, shortDate) + const regionKey = hmacSha256(dateKey, region) + const serviceKey = hmacSha256(regionKey, service) + return hmacSha256(serviceKey, 'aws4_request') +} + +function createSignedChunk( + payload: Buffer, + previousSignature: string, + options: { + longDate: string + shortDate: string + region: string + service: string + secretKey: string + } +) { + const signingKey = deriveSigningKey( + options.secretKey, + options.shortDate, + options.region, + options.service + ) + const scope = `${options.shortDate}/${options.region}/${options.service}/aws4_request` + const chunkHash = sha256Hex(payload) + const stringToSign = [ + 'AWS4-HMAC-SHA256-PAYLOAD', + options.longDate, + scope, + previousSignature, + EMPTY_SHA256_HASH, + chunkHash, + ].join('\n') + const signature = createHmac('sha256', signingKey).update(stringToSign).digest('hex') + + return { + signature, + encoded: Buffer.concat([ + Buffer.from(`${payload.length.toString(16)};chunk-signature=${signature}\r\n`), + payload, + Buffer.from('\r\n'), + ]), + } +} + +async function sendAwsChunkedRequest(options: { + baseUrl: string + path: string + payload: Buffer + query?: Record +}) { + const longDate = formatAwsDate() + const shortDate = longDate.slice(0, 8) + const host = new URL(options.baseUrl).host + const signedHeaders = [ + 'host', + 'x-amz-content-sha256', + 'x-amz-date', + 'x-amz-decoded-content-length', + ] + const service = SignatureV4Service.S3 + const signer = new SignatureV4({ + enforceRegion: false, + credentials: { + accessKey: s3ProtocolAccessKeyId!, + secretKey: s3ProtocolAccessKeySecret!, + region: storageS3Region, + service, + }, + }) + const headers = { + host, + 'content-encoding': 'aws-chunked', + 'x-amz-content-sha256': STREAMING_PAYLOAD_ALGORITHM, + 'x-amz-date': longDate, + 'x-amz-decoded-content-length': options.payload.length.toString(), + } + const clientSignature = { + credentials: { + accessKey: s3ProtocolAccessKeyId!, + shortDate, + region: storageS3Region, + service, + }, + signedHeaders, + signature: '', + longDate, + contentSha: STREAMING_PAYLOAD_ALGORITHM, + } + const { signature } = await signer.sign(clientSignature, { + url: options.path, + method: 'PUT', + headers, + query: options.query, + }) + const chunk = createSignedChunk(options.payload, signature, { + longDate, + shortDate, + region: storageS3Region, + service, + secretKey: s3ProtocolAccessKeySecret!, + }) + const endChunk = createSignedChunk(Buffer.alloc(0), chunk.signature, { + longDate, + shortDate, + region: storageS3Region, + service, + secretKey: s3ProtocolAccessKeySecret!, + }) + const encodedBody = Buffer.concat([chunk.encoded, endChunk.encoded]) + const requestUrl = new URL(`${options.baseUrl}${options.path}`) + + if (options.query) { + for (const [key, value] of Object.entries(options.query)) { + requestUrl.searchParams.set(key, value) + } + } + + const response = await fetch(requestUrl, { + method: 'PUT', + headers: { + ...headers, + authorization: + `AWS4-HMAC-SHA256 Credential=${s3ProtocolAccessKeyId}/${shortDate}/` + + `${storageS3Region}/${service}/aws4_request, SignedHeaders=${signedHeaders.join(';')}, ` + + `Signature=${signature}`, + 'content-length': encodedBody.length.toString(), + }, + body: encodedBody, + }) + + return { + status: response.status, + data: await response.text(), + } +} + jest.setTimeout(10 * 1000) describe('S3 Protocol', () => { describe('Bucket', () => { let testApp: FastifyInstance let client: S3Client + let baseUrl: string beforeAll(async () => { testApp = app() const listener = await testApp.listen() + baseUrl = listener.replace('[::1]', 'localhost') client = new S3Client({ - endpoint: `${listener.replace('[::1]', 'localhost')}/s3`, + endpoint: `${baseUrl}/s3`, forcePathStyle: true, region: storageS3Region, credentials: { @@ -750,6 +905,33 @@ describe('S3 Protocol', () => { } }) + it('accepts aws-chunked putObject bodies when decoded size is within the limit', async () => { + const bucketName = await createBucket(client) + const key = 'test-aws-chunked-put-object.jpg' + const payload = Buffer.alloc(123, 1) + + mergeConfig({ + uploadFileSizeLimit: 150, + }) + + const response = await sendAwsChunkedRequest({ + baseUrl, + path: `/s3/${bucketName}/${key}`, + payload, + }) + + expect(response.status).toBe(200) + + const headObject = await client.send( + new HeadObjectCommand({ + Bucket: bucketName, + Key: key, + }) + ) + + expect(headObject.ContentLength).toBe(payload.length) + }) + it('will not allow uploading a file that exceeded the maxFileSize', async () => { const bucketName = await createBucket(client) @@ -828,6 +1010,46 @@ describe('S3 Protocol', () => { } }) + it('accepts aws-chunked uploadPart bodies when decoded size is within the limit', async () => { + const bucketName = await createBucket(client, 'chunked-part') + const payload = Buffer.alloc(123, 2) + + mergeConfig({ + uploadFileSizeLimit: 150, + }) + + const multipart = await client.send( + new CreateMultipartUploadCommand({ + Bucket: bucketName, + Key: 'test-aws-chunked-upload-part.jpg', + ContentType: 'image/jpg', + CacheControl: 'max-age=2000', + }) + ) + + const response = await sendAwsChunkedRequest({ + baseUrl, + path: `/s3/${bucketName}/test-aws-chunked-upload-part.jpg`, + payload, + query: { + uploadId: multipart.UploadId!, + partNumber: '1', + }, + }) + + expect(response.status).toBe(200) + + const listedParts = await client.send( + new ListPartsCommand({ + Bucket: bucketName, + Key: 'test-aws-chunked-upload-part.jpg', + UploadId: multipart.UploadId, + }) + ) + + expect(listedParts.Parts?.map((part) => part.PartNumber)).toEqual([1]) + }) + it('upload a file using multipart upload', async () => { const bucketName = await createBucket(client) diff --git a/src/test/uploader.test.ts b/src/test/uploader.test.ts new file mode 100644 index 000000000..3cc52a8dd --- /dev/null +++ b/src/test/uploader.test.ts @@ -0,0 +1,25 @@ +import { FastifyRequest } from 'fastify' +import { Readable } from 'stream' +import { fileUploadFromRequest } from '../storage/uploader' + +describe('fileUploadFromRequest', () => { + test('prefers x-amz-decoded-content-length for aws-chunked truncation checks', async () => { + const upload = await fileUploadFromRequest( + { + headers: { + 'content-type': 'application/octet-stream', + 'content-length': '177', + 'x-amz-decoded-content-length': '123', + }, + raw: Readable.from(['payload']), + tenantId: 'stub-tenant', + } as unknown as FastifyRequest, + { + objectName: 'test.txt', + fileSizeLimit: 150, + } + ) + + expect(upload.isTruncated()).toBe(false) + }) +})