Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 39 additions & 9 deletions src/http/routes/s3/commands/put-object.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { ERRORS } from '@internal/errors'
import { ByteLimitTransformStream } from '@storage/protocols/s3/byte-limit-stream'
import { MAX_PART_SIZE, S3ProtocolHandler } from '@storage/protocols/s3/s3-handler'
import { fileUploadFromRequest, getStandardMaxFileSizeLimit } from '@storage/uploader'
import stream, { PassThrough, Readable } from 'stream'
import stream, { Readable, Transform } from 'stream'
import { pipeline } from 'stream/promises'
import { ROUTE_OPERATIONS } from '../../operations'
import { S3Router } from '../router'
Expand Down Expand Up @@ -49,6 +49,36 @@ const PostFormInput = {
},
} as const

type PipelineBody = NodeJS.ReadableStream
type PipelineHandlerInput = AsyncIterable<unknown>

/**
 * Adapts a Readable-based handler into the destination-callback shape that
 * `stream/promises` `pipeline` expects.
 *
 * `pipeline` hands its final destination a generic async iterable; in these
 * routes the upstream stage is always a Node readable, so the narrowing cast
 * is performed once here instead of at every call site.
 */
function withReadableStreamHandler<T>(handler: (fileStream: Readable) => Promise<T>) {
  return (fileStream: PipelineHandlerInput) => handler(fileStream as Readable)
}

/**
 * Runs `body` through the upload pipeline, inserting the AWS SigV4 streaming
 * signature transform when one is present for the request.
 *
 * @param body raw request body stream
 * @param limit maximum number of bytes allowed through the pipeline
 * @param streamingSignatureV4 transform validating aws-chunked signatures;
 *   only set for SigV4 streaming uploads
 * @param handler destination consuming the resulting file stream
 */
function pipelineWithOptionalStreamingSignature<T>(
  body: PipelineBody,
  limit: number,
  streamingSignatureV4: Transform | undefined,
  handler: (fileStream: Readable) => Promise<T>
) {
  const byteLimiter = new ByteLimitTransformStream(limit)
  const destination = withReadableStreamHandler(handler)

  if (!streamingSignatureV4) {
    return pipeline(body, byteLimiter, destination)
  }

  // The signature stage runs ahead of the byte limiter, same stage order as
  // the non-streaming path plus the verifier in front.
  return pipeline(body, streamingSignatureV4, byteLimiter, destination)
}

export default function PutObject(s3Router: S3Router) {
s3Router.put(
'/:Bucket/*',
Expand Down Expand Up @@ -81,16 +111,16 @@ export default function PutObject(s3Router: S3Router) {
throw ERRORS.InvalidParameter('internalIcebergBucketName')
}

return pipeline(
return pipelineWithOptionalStreamingSignature(
uploadRequest.body,
new ByteLimitTransformStream(MAX_PART_SIZE), // 5GB limit for iceberg objects
ctx.req.streamingSignatureV4 || new PassThrough(),
MAX_PART_SIZE,
ctx.req.streamingSignatureV4,
async (fileStream) => {
const u = await ctx.req.storage.backend.uploadObject(
icebergBucket,
key,
undefined,
fileStream as Readable,
fileStream,
uploadRequest.mimeType,
uploadRequest.cacheControl,
ctx.signals.body
Expand Down Expand Up @@ -135,14 +165,14 @@ export default function PutObject(s3Router: S3Router) {
fileSizeLimit: bucket.file_size_limit || undefined,
})

return pipeline(
return pipelineWithOptionalStreamingSignature(
uploadRequest.body,
new ByteLimitTransformStream(uploadRequest.maxFileSize),
ctx.req.streamingSignatureV4 || new PassThrough(),
uploadRequest.maxFileSize,
ctx.req.streamingSignatureV4,
async (fileStream) => {
return s3Protocol.putObject(
{
Body: fileStream as Readable,
Body: fileStream,
Bucket: req.Params.Bucket,
Key: key,
CacheControl: uploadRequest.cacheControl,
Expand Down
4 changes: 2 additions & 2 deletions src/http/routes/s3/commands/upload-part.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ export default function UploadPart(s3Router: S3Router) {

return pipeline(
passThrough,
new ByteLimitTransformStream(MAX_PART_SIZE), // 5GB max part size
ctx.req.streamingSignatureV4,
new ByteLimitTransformStream(MAX_PART_SIZE), // 5GB max part size
async (body) => {
const part = await ctx.req.storage.backend.uploadPart(
icebergBucketName!,
Expand Down Expand Up @@ -131,8 +131,8 @@ export default function UploadPart(s3Router: S3Router) {

return pipeline(
passThrough,
new ByteLimitTransformStream(MAX_PART_SIZE),
ctx.req.streamingSignatureV4,
new ByteLimitTransformStream(MAX_PART_SIZE),
async (body) => {
return s3Protocol.uploadPart(
{
Expand Down
17 changes: 14 additions & 3 deletions src/storage/uploader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,18 @@ export function validateMimeType(mimeType: string, allowedMimeTypes: string[]) {
throw ERRORS.InvalidMimeType(mimeType)
}

/**
 * Reads the payload size the client declared for the request, if any.
 *
 * Prefers `x-amz-decoded-content-length` (for AWS SigV4 streaming uploads,
 * where plain `content-length` also counts the chunk framing) and falls back
 * to `content-length`.
 *
 * @param request incoming Fastify request whose headers are inspected
 * @returns the declared size as a non-negative integer, or `undefined` when
 *   no header is present or its value is not a valid length (negative,
 *   non-numeric, non-integer, or non-finite)
 */
function getKnownRequestContentLength(request: FastifyRequest): number | undefined {
  const rawHeader =
    request.headers['x-amz-decoded-content-length'] ?? request.headers['content-length']

  // Node may expose repeated headers as string[]; use the first value instead
  // of letting Number() coerce the whole array.
  const headerValue = Array.isArray(rawHeader) ? rawHeader[0] : rawHeader
  if (typeof headerValue !== 'string') {
    return undefined
  }

  const contentLength = Number(headerValue)

  // A valid length is a non-negative integer; Number.isInteger also rejects
  // NaN and Infinity, so fractional or garbage values are treated as unknown.
  if (!Number.isInteger(contentLength) || contentLength < 0) {
    return undefined
  }

  return contentLength
}

/**
* Extracts the file information from the request
* @param request
Expand Down Expand Up @@ -392,11 +404,10 @@ export async function fileUploadFromRequest(
userMetadata = parseUserMetadata(customMd)
}

// Extract content-length value to avoid capturing entire request object in closure
const contentLength = Number(request.headers['content-length'])
const contentLength = getKnownRequestContentLength(request)
isTruncated = () => {
// @todo more secure to get this from the stream or from s3 in the next step
return contentLength > maxFileSize
return typeof contentLength === 'number' && contentLength > maxFileSize
}
}

Expand Down
Loading