diff --git a/migrations/tenant/0057-s3-multipart-uploads-metadata.sql b/migrations/tenant/0057-s3-multipart-uploads-metadata.sql new file mode 100644 index 000000000..ef9496bb8 --- /dev/null +++ b/migrations/tenant/0057-s3-multipart-uploads-metadata.sql @@ -0,0 +1 @@ +ALTER TABLE storage.s3_multipart_uploads ADD COLUMN IF NOT EXISTS metadata jsonb NULL; \ No newline at end of file diff --git a/src/http/routes/object/getSignedUploadURL.ts b/src/http/routes/object/getSignedUploadURL.ts index b84c113bf..0de648d0a 100644 --- a/src/http/routes/object/getSignedUploadURL.ts +++ b/src/http/routes/object/getSignedUploadURL.ts @@ -1,6 +1,7 @@ import { FastifyInstance } from 'fastify' import { FromSchema } from 'json-schema-to-ts' import { getConfig } from '../../../config' +import { parseUserMetadata } from '../../../storage/uploader' import { createDefaultSchema } from '../../routes-helper' import { AuthenticatedRequest } from '../../types' import { ROUTE_OPERATIONS } from '../operations' @@ -20,6 +21,9 @@ const getSignedUploadURLHeadersSchema = { type: 'object', properties: { 'x-upsert': { type: 'string' }, + 'x-metadata': { type: 'string' }, + 'content-type': { type: 'string' }, + 'content-length': { type: 'string' }, authorization: { type: 'string' }, }, required: ['authorization'], @@ -69,10 +73,29 @@ export default async function routes(fastify: FastifyInstance) { const urlPath = `${bucketName}/${objectName}` + let userMetadata: Record | undefined + + const customMd = request.headers['x-metadata'] + + if (typeof customMd === 'string') { + // TODO: parseUserMetadata casts to Record but values could be anything; + // validation should be added in a follow-up + userMetadata = parseUserMetadata(customMd) + } + + const contentType = request.headers['content-type'] + const contentLengthHeader = request.headers['content-length'] + const contentLength = contentLengthHeader ? Number(contentLengthHeader) : undefined + const signedUpload = await request.storage .from(bucketName) .signUploadObjectUrl(objectName, urlPath as string, uploadSignedUrlExpirationTime, owner, { upsert: request.headers['x-upsert'] === 'true', + userMetadata, + metadata: { + mimetype: contentType, + contentLength, + }, }) return response.status(200).send({ url: signedUpload.url, token: signedUpload.token }) diff --git a/src/http/routes/tus/index.ts b/src/http/routes/tus/index.ts index b75f88cab..efe5a3de6 100644 --- a/src/http/routes/tus/index.ts +++ b/src/http/routes/tus/index.ts @@ -137,7 +137,7 @@ function createTusServer( namingFunction, onUploadCreate: onCreate, onUploadFinish, - onIncomingRequest, + onIncomingRequest: (req, id) => onIncomingRequest(req, id, datastore), generateUrl, getFileIdFromRequest, onResponseError, diff --git a/src/http/routes/tus/lifecycle.ts b/src/http/routes/tus/lifecycle.ts index 6ab40b714..f28c274fa 100644 --- a/src/http/routes/tus/lifecycle.ts +++ b/src/http/routes/tus/lifecycle.ts @@ -3,7 +3,7 @@ import { ERRORS, isRenderableError } from '@internal/errors' import { UploadId } from '@storage/protocols/tus' import { Storage } from '@storage/storage' import { Uploader, validateMimeType } from '@storage/uploader' -import { Upload } from '@tus/server' +import { DataStore, Metadata, Upload } from '@tus/server' import { randomUUID } from 'crypto' import http from 'http' import { BaseLogger } from 'pino' @@ -44,7 +44,7 @@ export type MultiPartRequest = http.IncomingMessage & { /** * Runs on every TUS incoming request */ -export async function onIncomingRequest(rawReq: Request, id: string) { +export async function onIncomingRequest(rawReq: Request, id: string, datastore: DataStore) { const req = getNodeRequest(rawReq) const res = rawReq.node?.res as http.ServerResponse @@ -91,11 +91,53 @@ export async function onIncomingRequest(rawReq: Request, id: string) { req.upload.storage.location ) + let contentType: string | undefined + let contentLength: number | undefined + let rawMetadata: string | null | undefined + + if (req.method === 'POST') { + const uploadMetadataHeader = req.headers['upload-metadata'] + if (uploadMetadataHeader && typeof uploadMetadataHeader === 'string') { + try { + const parsedMetadata = Metadata.parse(uploadMetadataHeader) + contentType = parsedMetadata?.contentType ?? undefined + rawMetadata = parsedMetadata?.metadata + } catch (e) { + req.log.warn({ error: e }, 'Failed to parse upload metadata') + throw ERRORS.InvalidParameter('upload-metadata', { + error: e as Error, + message: 'Invalid Upload-Metadata header', + }) + } + } + const uploadLength = req.headers['upload-length'] + contentLength = uploadLength ? Number(uploadLength) : undefined + } else { + const upload = await datastore.getUpload(id) + contentType = upload.metadata?.contentType ?? undefined + contentLength = upload.size ?? undefined + rawMetadata = upload.metadata?.metadata + } + + let customMd: Record | undefined + if (rawMetadata) { + try { + customMd = JSON.parse(rawMetadata) + } catch (e) { + req.log.warn({ error: e }, 'Failed to parse user metadata') + } + } + await uploader.canUpload({ owner: req.upload.owner, bucketId: uploadID.bucket, objectName: uploadID.objectName, isUpsert, + userMetadata: customMd, + metadata: { + mimetype: contentType, + contentLength, + }, }) } diff --git a/src/internal/database/migrations/types.ts b/src/internal/database/migrations/types.ts index 35bd839c0..1abbd291c 100644 --- a/src/internal/database/migrations/types.ts +++ b/src/internal/database/migrations/types.ts @@ -56,4 +56,5 @@ export const DBMigration = { 'drop-index-object-level': 54, 'prevent-direct-deletes': 55, 'fix-optimized-search-function': 56, + 's3-multipart-uploads-metadata': 57, } diff --git a/src/storage/database/adapter.ts b/src/storage/database/adapter.ts index db190f483..d90b0beaf 100644 --- a/src/storage/database/adapter.ts +++ b/src/storage/database/adapter.ts @@ -195,7 +195,8 @@ export interface Database { version: string, signature: string, owner?: string, - metadata?: Record + userMetadata?: Record, + metadata?: Partial ): Promise findMultipartUpload( diff --git a/src/storage/database/knex.ts b/src/storage/database/knex.ts index 546657109..439159b88 100644 --- a/src/storage/database/knex.ts +++ b/src/storage/database/knex.ts @@ -912,22 +912,33 @@ export class StorageKnexDB implements Database { version: string, signature: string, owner?: string, - metadata?: Record + userMetadata?: Record, + metadata?: Partial ) { return this.runQuery('CreateMultipartUpload', async (knex, signal) => { + const data: Record = { + id: uploadId, + bucket_id: bucketId, + key: objectName, + version, + upload_signature: signature, + owner_id: owner, + user_metadata: userMetadata, + } + + // TODO: move this guard into normalizeColumns once it is table-aware. + // metadata was added to s3_multipart_uploads in migration 57 but has existed on + // objects since much earlier, so a table-agnostic rule would incorrectly strip it. + if ( + !this.latestMigration || + DBMigration[this.latestMigration] >= DBMigration['s3-multipart-uploads-metadata'] + ) { + data.metadata = metadata + } + const multipart = await knex .table('s3_multipart_uploads') - .insert( - this.normalizeColumns({ - id: uploadId, - bucket_id: bucketId, - key: objectName, - version, - upload_signature: signature, - owner_id: owner, - user_metadata: metadata, - }) - ) + .insert(this.normalizeColumns(data)) .returning('*') .abortOnSignal(signal) @@ -937,10 +948,18 @@ export class StorageKnexDB implements Database { async findMultipartUpload(uploadId: string, columns = 'id', options?: { forUpdate?: boolean }) { const multiPart = await this.runQuery('FindMultipartUpload', async (knex, signal) => { - const query = knex - .from('s3_multipart_uploads') - .select(columns.split(',')) - .where('id', uploadId) + // TODO: move this guard into normalizeColumns once it is table-aware. + // metadata was added to s3_multipart_uploads in migration 57 but has existed on + // objects since much earlier, so a table-agnostic rule would incorrectly strip it. + const hasMetadataColumn = + !this.latestMigration || + DBMigration[this.latestMigration] >= DBMigration['s3-multipart-uploads-metadata'] + + const cols = hasMetadataColumn + ? columns.split(',') + : columns.split(',').filter((col) => col.trim() !== 'metadata') + + const query = knex.from('s3_multipart_uploads').select(cols).where('id', uploadId) if (options?.forUpdate) { return query.abortOnSignal(signal).forUpdate().first() diff --git a/src/storage/object.ts b/src/storage/object.ts index 9a8781c50..1529919e5 100644 --- a/src/storage/object.ts +++ b/src/storage/object.ts @@ -17,7 +17,7 @@ import { ObjectUpdatedMetadata, } from './events' import { mustBeValidKey } from './limits' -import { fileUploadFromRequest, Uploader, UploadRequest } from './uploader' +import { CanUploadMetadata, fileUploadFromRequest, Uploader, UploadRequest } from './uploader' const { requestUrlLengthLimit } = getConfig() @@ -97,6 +97,7 @@ export class ObjectStorage { owner: file.owner, isUpsert: Boolean(file.isUpsert), signal: file.signal, + userMetadata: uploadRequest.userMetadata, }) } @@ -332,11 +333,15 @@ export class ObjectStorage { ...(fileMetadata || {}), } + const destinationUserMetadata = copyMetadata ? originObject.user_metadata : userMetadata + await this.uploader.canUpload({ bucketId: destinationBucket, objectName: destinationKey, owner, isUpsert: upsert, + userMetadata: destinationUserMetadata || undefined, + metadata: destinationMetadata, }) try { @@ -381,7 +386,7 @@ export class ObjectStorage { lastModified: copyResult.lastModified, eTag: copyResult.eTag, }, - user_metadata: copyMetadata ? originObject.user_metadata : userMetadata, + user_metadata: destinationUserMetadata, version: newVersion, }) @@ -790,7 +795,11 @@ export class ObjectStorage { url: string, expiresIn: number, owner?: string, - options?: { upsert?: boolean } + options?: { + upsert?: boolean + userMetadata?: Record + metadata?: CanUploadMetadata + } ) { // check if user has INSERT permissions await this.uploader.canUpload({ @@ -798,6 +807,8 @@ export class ObjectStorage { objectName, owner, isUpsert: options?.upsert ?? false, + userMetadata: options?.userMetadata, + metadata: options?.metadata, }) const { urlSigningKey } = await getJwtSecret(this.db.tenantId) diff --git a/src/storage/protocols/s3/s3-handler.ts b/src/storage/protocols/s3/s3-handler.ts index 10f22424d..42172c7da 100644 --- a/src/storage/protocols/s3/s3-handler.ts +++ b/src/storage/protocols/s3/s3-handler.ts @@ -413,6 +413,10 @@ export class S3ProtocolHandler { objectName: command.Key as string, isUpsert: true, owner: this.owner, + userMetadata: command.Metadata, + metadata: { + mimetype: command.ContentType, + }, }) const uploadId = await this.storage.backend.createMultiPartUpload( @@ -441,7 +445,8 @@ export class S3ProtocolHandler { version, signature, this.owner, - command.Metadata + command.Metadata, + { mimetype: command.ContentType } ) return { @@ -470,17 +475,19 @@ export class S3ProtocolHandler { throw ERRORS.InvalidUploadId() } + const multiPartUpload = await this.storage.db + .asSuperUser() + .findMultipartUpload(UploadId, 'id,version,user_metadata,metadata') + await uploader.canUpload({ bucketId: Bucket as string, objectName: Key as string, isUpsert: true, owner: this.owner, + userMetadata: multiPartUpload.user_metadata || undefined, + metadata: multiPartUpload.metadata || undefined, }) - const multiPartUpload = await this.storage.db - .asSuperUser() - .findMultipartUpload(UploadId, 'id,version,user_metadata') - const parts = command.MultipartUpload?.Parts || [] if (parts.length === 0) { @@ -578,11 +585,18 @@ export class S3ProtocolHandler { const maxFileSize = await getFileSizeLimit(this.storage.db.tenantId, bucket?.file_size_limit) const uploader = new Uploader(this.storage.backend, this.storage.db, this.storage.location) + + const multipartData = await this.storage.db + .asSuperUser() + .findMultipartUpload(UploadId, 'version,user_metadata,metadata') + await uploader.canUpload({ bucketId: Bucket as string, objectName: Key as string, owner: this.owner, isUpsert: true, + userMetadata: multipartData.user_metadata || undefined, + metadata: multipartData.metadata || undefined, }) const multipart = await this.shouldAllowPartUpload(UploadId, ContentLength, maxFileSize) @@ -695,9 +709,10 @@ export class S3ProtocolHandler { cacheControl: command.CacheControl!, mimeType: command.ContentType!, isTruncated: options.isTruncated, - userMetadata: command.Metadata, + contentLength: command.ContentLength, }, objectName: command.Key as string, + userMetadata: command.Metadata, owner: this.owner, isUpsert: true, uploadType: 's3', @@ -735,7 +750,7 @@ export class S3ProtocolHandler { const multipart = await this.storage.db .asSuperUser() - .findMultipartUpload(UploadId, 'id,version') + .findMultipartUpload(UploadId, 'id,version,user_metadata,metadata') const uploader = new Uploader(this.storage.backend, this.storage.db, this.storage.location) await uploader.canUpload({ @@ -743,6 +758,8 @@ export class S3ProtocolHandler { objectName: Key, owner: this.owner, isUpsert: true, + userMetadata: multipart.user_metadata || undefined, + metadata: multipart.metadata || undefined, }) await this.storage.backend.abortMultipartUpload( @@ -1233,13 +1250,6 @@ export class S3ProtocolHandler { const uploader = new Uploader(this.storage.backend, this.storage.db, this.storage.location) - await uploader.canUpload({ - bucketId: Bucket, - objectName: Key, - owner: this.owner, - isUpsert: true, - }) - const [destinationBucket] = await this.storage.db.asSuperUser().withTransaction(async (db) => { return Promise.all([ db.findBucketById(Bucket, 'file_size_limit'), @@ -1251,6 +1261,19 @@ export class S3ProtocolHandler { destinationBucket?.file_size_limit ) + const multipartData = await this.storage.db + .asSuperUser() + .findMultipartUpload(UploadId, 'version,user_metadata,metadata') + + await uploader.canUpload({ + bucketId: Bucket, + objectName: Key, + owner: this.owner, + isUpsert: true, + userMetadata: multipartData.user_metadata || undefined, + metadata: multipartData.metadata || undefined, + }) + const multipart = await this.shouldAllowPartUpload(UploadId, Number(copySize), maxFileSize) const uploadPart = await this.storage.backend.uploadPartCopy( @@ -1324,7 +1347,7 @@ export class S3ProtocolHandler { return this.storage.db.asSuperUser().withTransaction(async (db) => { const multipart = await db.findMultipartUpload( uploadId, - 'in_progress_size,version,upload_signature', + 'in_progress_size,version,upload_signature,user_metadata,metadata', { forUpdate: true, } diff --git a/src/storage/schemas/multipart.ts b/src/storage/schemas/multipart.ts index 1ca6eb296..3a1849ac8 100644 --- a/src/storage/schemas/multipart.ts +++ b/src/storage/schemas/multipart.ts @@ -15,6 +15,9 @@ export const multipartUploadSchema = { user_metadata: { anyOf: [{ type: 'object', additionalProperties: true }, { type: 'null' }], }, + metadata: { + anyOf: [{ type: 'object', additionalProperties: true }, { type: 'null' }], + }, }, required: [ 'id', diff --git a/src/storage/uploader.ts b/src/storage/uploader.ts index 8b185487e..7c5a4f4ed 100644 --- a/src/storage/uploader.ts +++ b/src/storage/uploader.ts @@ -20,19 +20,32 @@ interface FileUpload { cacheControl: string isTruncated: () => boolean xRobotsTag?: string - userMetadata?: Record + contentLength?: number } export interface UploadRequest { bucketId: string objectName: string file: FileUpload + userMetadata?: Record owner?: string isUpsert?: boolean uploadType?: 'standard' | 's3' | 'resumable' signal?: AbortSignal } +export type CanUploadMetadata = Partial> & + Record + +export interface CanUploadOptions { + bucketId: string + objectName: string + owner: string | undefined + isUpsert: boolean | undefined + userMetadata: Record | undefined + metadata: CanUploadMetadata | undefined +} + const MAX_CUSTOM_METADATA_SIZE = 1024 * 1024 /** @@ -46,7 +59,7 @@ export class Uploader { private readonly location: StorageObjectLocator ) {} - async canUpload(options: Pick) { + async canUpload(options: CanUploadOptions) { const shouldCreateObject = !options.isUpsert if (shouldCreateObject) { @@ -56,6 +69,8 @@ export class Uploader { name: options.objectName, version: '1', owner: options.owner, + metadata: options.metadata, + user_metadata: options.userMetadata, }) }) } else { @@ -65,6 +80,8 @@ export class Uploader { name: options.objectName, version: '1', owner: options.owner, + metadata: options.metadata, + user_metadata: options.userMetadata, }) }) } @@ -75,7 +92,7 @@ export class Uploader { * We check RLS policies before proceeding * @param options */ - async prepareUpload(options: Omit) { + async prepareUpload(options: CanUploadOptions & { uploadType?: string }) { await this.canUpload(options) fileUploadStarted.add(1, { uploadType: options.uploadType, @@ -92,7 +109,15 @@ export class Uploader { * @param options */ async upload(request: UploadRequest) { - const version = await this.prepareUpload(request) + const version = await this.prepareUpload({ + bucketId: request.bucketId, + objectName: request.objectName, + owner: request.owner, + isUpsert: request.isUpsert, + userMetadata: request.userMetadata, + metadata: { mimetype: request.file.mimeType, contentLength: request.file.contentLength }, + uploadType: request.uploadType, + }) try { const file = request.file @@ -125,7 +150,7 @@ export class Uploader { ...request, version, objectMetadata, - userMetadata: { ...file.userMetadata }, + userMetadata: { ...request.userMetadata }, }) } catch (e) { await ObjectAdminDelete.send({ @@ -323,7 +348,14 @@ export async function fileUploadFromRequest( allowedMimeTypes?: string[] objectName: string } -): Promise { +): Promise< + FileUpload & { + mimeType: string + maxFileSize: number + userMetadata: Record | undefined + contentLength: number | undefined + } +> { const contentType = request.headers['content-type'] const xRobotsTag = request.headers['x-robots-tag'] as string | undefined @@ -421,6 +453,10 @@ export async function fileUploadFromRequest( throw ERRORS.NoContentProvided(new Error('Request stream closed before upload could begin')) } + const contentLength = request.headers['content-length'] + ? Number(request.headers['content-length']) + : undefined + return { body, mimeType, @@ -429,6 +465,7 @@ export async function fileUploadFromRequest( userMetadata, maxFileSize, xRobotsTag, + contentLength, } } diff --git a/src/test/cdn.test.ts b/src/test/cdn.test.ts index cee987215..c25b64b72 100644 --- a/src/test/cdn.test.ts +++ b/src/test/cdn.test.ts @@ -94,12 +94,12 @@ describe('CDN Cache Manager', () => { await storageHook.storage.from(bucketName).uploadNewObject({ isUpsert: true, objectName, + userMetadata: {}, file: { body: Readable.from(Buffer.from('test')), cacheControl: 'public, max-age=31536000', mimeType: 'text/plain', isTruncated: () => false, - userMetadata: {}, }, }) diff --git a/src/test/rls.test.ts b/src/test/rls.test.ts index 4f572d58e..15f246971 100644 --- a/src/test/rls.test.ts +++ b/src/test/rls.test.ts @@ -1,4 +1,11 @@ -import { CreateBucketCommand, S3Client } from '@aws-sdk/client-s3' +import { + CompleteMultipartUploadCommand, + CreateBucketCommand, + CreateMultipartUploadCommand, + S3Client, + S3ServiceException, + UploadPartCommand, +} from '@aws-sdk/client-s3' import { signJWT } from '@internal/auth' import { wait } from '@internal/concurrency' import { getPostgresConnection, getServiceKeyUser } from '@internal/database' @@ -13,6 +20,8 @@ import yaml from 'js-yaml' import { Knex, knex } from 'knex' import Mustache from 'mustache' import path from 'path' +import * as tus from 'tus-js-client' +import { DetailedError } from 'tus-js-client' import app from '../app' import { getConfig } from '../config' import { Storage } from '../storage' @@ -39,6 +48,9 @@ interface TestCaseAssert { operation: | 'upload' | 'upload.upsert' + | 'upload.tus' + | 'upload.signed' + | 'upload.s3.multipart' | 'bucket.create' | 'bucket.get' | 'bucket.list' @@ -55,6 +67,11 @@ interface TestCaseAssert { useExistingBucketName?: string role?: string policies?: string[] + userMetadata?: Record + mimeType?: string + contentLength?: number + copyMetadata?: boolean + destinationObjectName?: string status: number error?: string } @@ -68,8 +85,16 @@ const testSpec = yaml.load( fs.readFileSync(path.resolve(__dirname, 'rls_tests.yaml'), 'utf8') ) as RlsTestSpec -const { serviceKeyAsync, tenantId, jwtSecret, databaseURL, storageS3Bucket, storageBackendType } = - getConfig() +const { + serviceKeyAsync, + anonKeyAsync, + tenantId, + jwtSecret, + databaseURL, + storageS3Bucket, + storageBackendType, + storageS3Region, +} = getConfig() const backend = createStorageBackend(storageBackendType) const client = backend.client let appInstance: FastifyInstance @@ -233,6 +258,11 @@ describe('RLS policies', () => { bucket: bucketName, objectName, jwt: assert.role === 'service' ? await serviceKeyAsync : jwt, + userMetadata: assert.userMetadata, + mimeType: assert.mimeType, + contentLength: assert.contentLength, + copyMetadata: assert.copyMetadata, + destinationObjectName: assert.destinationObjectName, }) console.log( @@ -251,8 +281,7 @@ describe('RLS policies', () => { } if (assert.error) { - const body = await response.json() - + const body = response.json() expect(body.message).toBe(assert.error) } } finally { @@ -294,15 +323,39 @@ describe('RLS policies', () => { async function runOperation( operation: TestCaseAssert['operation'], - options: { bucket: string; jwt: string; objectName: string } + options: { + bucket: string + jwt: string + objectName: string + userMetadata?: Record + mimeType?: string + contentLength?: number + copyMetadata?: boolean + destinationObjectName?: string + } ) { - const { jwt, bucket, objectName } = options + const { + jwt, + bucket, + objectName, + userMetadata, + mimeType, + contentLength, + copyMetadata, + destinationObjectName, + } = options switch (operation) { case 'upload': - return uploadFile(bucket, objectName, jwt) + return uploadFile(bucket, objectName, jwt, false, userMetadata, mimeType, contentLength) case 'upload.upsert': - return uploadFile(bucket, objectName, jwt, true) + return uploadFile(bucket, objectName, jwt, true, userMetadata, mimeType, contentLength) + case 'upload.tus': + return tusUploadFile(bucket, objectName, jwt, userMetadata, mimeType, contentLength) + case 'upload.signed': + return signUploadUrl(bucket, objectName, jwt, userMetadata) + case 'upload.s3.multipart': + return s3MultipartUpload(bucket, objectName, jwt, userMetadata) case 'bucket.list': return appInstance.inject({ method: 'GET', @@ -400,11 +453,15 @@ async function runOperation( url: `/object/copy`, headers: { authorization: `Bearer ${jwt}`, + ...(userMetadata + ? { 'x-metadata': Buffer.from(JSON.stringify(userMetadata)).toString('base64') } + : {}), }, payload: { bucketId: bucket, sourceKey: objectName, - destinationKey: 'copied_' + objectName, + destinationKey: destinationObjectName ?? 'copied_' + objectName, + copyMetadata: copyMetadata ?? true, }, }) default: @@ -454,13 +511,31 @@ async function createPolicy(db: Knex, policy: Policy) { return Promise.all(created) } -async function uploadFile(bucket: string, fileName: string, jwt: string, upsert?: boolean) { +async function uploadFile( + bucket: string, + fileName: string, + jwt: string, + upsert?: boolean, + userMetadata?: Record, + mimeType?: string, + contentLength?: number +) { const testFile = fs.createReadStream(path.resolve(__dirname, 'assets', 'sadcat.jpg')) const form = new FormData() form.append('file', testFile) + + if (userMetadata) { + form.append('metadata', JSON.stringify(userMetadata)) + } + + if (mimeType) { + form.append('contentType', mimeType) + } + const headers = Object.assign({}, form.getHeaders(), { authorization: `Bearer ${jwt}`, ...(upsert ? { 'x-upsert': 'true' } : {}), + ...(contentLength ? { 'content-length': contentLength.toString() } : {}), }) return appInstance.inject({ @@ -470,3 +545,158 @@ async function uploadFile(bucket: string, fileName: string, jwt: string, upsert? payload: form, }) } + +async function tusUploadFile( + bucket: string, + objectName: string, + jwt: string, + userMetadata?: Record, + mimeType?: string, + contentLength?: number +) { + if (!appInstance.server.listening) { + await appInstance.listen({ port: 0 }) + } + + const addressInfo = appInstance.server.address() + if (!addressInfo || typeof addressInfo === 'string') { + throw new Error('Unable to resolve local server address') + } + + const localServerAddress = `http://127.0.0.1:${addressInfo.port}` + + const file = fs.createReadStream(path.resolve(__dirname, 'assets', 'sadcat.jpg')) + + let statusCode = 200 + let message = '' + + try { + await new Promise((resolve, reject) => { + const upload = new tus.Upload(file, { + endpoint: `${localServerAddress}/upload/resumable`, + uploadSize: contentLength || undefined, + onShouldRetry: () => false, + uploadDataDuringCreation: false, + headers: { + authorization: `Bearer ${jwt}`, + }, + metadata: { + bucketName: bucket, + objectName, + contentType: mimeType || 'application/octet-stream', + cacheControl: '3600', + ...(userMetadata ? { metadata: JSON.stringify(userMetadata) } : {}), + }, + onError(error) { + console.log('Failed because: ' + error) + reject(error) + }, + onSuccess: () => { + resolve(true) + }, + }) + + upload.start() + }) + } catch (e) { + if (!(e instanceof DetailedError)) throw e + + statusCode = e.originalResponse.getStatus() + message = e.originalResponse.getBody() + } + + const body = message ? { message } : {} + return { statusCode, body: JSON.stringify(body), json: () => body } +} + +async function signUploadUrl( + bucket: string, + objectName: string, + jwt: string, + userMetadata?: Record +) { + const metadata = userMetadata + ? Buffer.from(JSON.stringify(userMetadata)).toString('base64') + : undefined + + return appInstance.inject({ + method: 'POST', + url: `/object/upload/sign/${bucket}/${objectName}`, + headers: { + authorization: `Bearer ${jwt}`, + ...(metadata ? { 'x-metadata': metadata } : {}), + }, + }) +} + +async function s3MultipartUpload( + bucket: string, + objectName: string, + jwt: string, + userMetadata?: Record +) { + if (!appInstance.server.listening) { + await appInstance.listen({ port: 0 }) + } + + const listener = appInstance.server.address() as { port: number } + const anonKey = await anonKeyAsync + const s3Client = new S3Client({ + endpoint: `http://127.0.0.1:${listener.port}/s3`, + forcePathStyle: true, + region: storageS3Region, + credentials: { + accessKeyId: tenantId, + secretAccessKey: anonKey, + sessionToken: jwt, + }, + }) + + let statusCode = 200 + let message = '' + + try { + const s3Metadata = userMetadata + ? Object.fromEntries(Object.entries(userMetadata).map(([k, v]) => [k, String(v)])) + : undefined + + const createResp = await s3Client.send( + new CreateMultipartUploadCommand({ + Bucket: bucket, + Key: objectName, + ContentType: 'image/jpg', + ...(s3Metadata ? { Metadata: s3Metadata } : {}), + }) + ) + + const data = Buffer.alloc(5 * 1024) + const partResp = await s3Client.send( + new UploadPartCommand({ + Bucket: bucket, + Key: objectName, + UploadId: createResp.UploadId, + PartNumber: 1, + Body: data, + ContentLength: data.length, + }) + ) + + await s3Client.send( + new CompleteMultipartUploadCommand({ + Bucket: bucket, + Key: objectName, + UploadId: createResp.UploadId, + MultipartUpload: { Parts: [{ PartNumber: 1, ETag: partResp.ETag }] }, + }) + ) + } catch (e: unknown) { + if (!(e instanceof S3ServiceException)) throw e + + statusCode = e.$metadata.httpStatusCode ?? 400 + message = e.message + } finally { + s3Client.destroy() + } + const body = message ? { message } : {} + return { statusCode, body: JSON.stringify(body), json: () => body } +} diff --git a/src/test/rls_tests.yaml b/src/test/rls_tests.yaml index 2581bc9d0..2cffb64f3 100644 --- a/src/test/rls_tests.yaml +++ b/src/test/rls_tests.yaml @@ -47,6 +47,24 @@ policies: permissions: ["delete"] content: "USING(owner = '{{uid}}')" + - name: insert_with_metadata_check + tables: ["storage.objects"] + roles: ["authenticated"] + permissions: ["insert"] + content: "WITH CHECK(user_metadata->>'department' = 'engineering')" + + - name: insert_only_images + tables: ["storage.objects"] + roles: ["authenticated"] + permissions: ["insert"] + content: "WITH CHECK(metadata->>'mimetype' LIKE 'image/%')" + + - name: insert_max_size_limit + tables: ["storage.objects"] + roles: ["authenticated"] + permissions: ["insert"] + content: "WITH CHECK((metadata->>'contentLength')::int <= 100000)" + tests: - description: "Will only able to read objects" policies: @@ -475,3 +493,128 @@ tests: - operation: bucket.delete status: 400 error: "Bucket not found" + + - description: "Will only upload files with correct user metadata" + policies: + - insert_with_metadata_check + - read_only_all_objects + asserts: + - operation: upload + objectName: "test_file.jpg" + userMetadata: + department: "engineering" + status: 200 + + - operation: upload + status: 400 + error: "new row violates row-level security policy" + + - operation: upload.tus + objectName: "test_file_tus.jpg" + userMetadata: + department: "engineering" + status: 200 + + - operation: upload.tus + status: 403 + error: "new row violates row-level security policy" + + - operation: upload + objectName: "source_copy_meta.jpg" + userMetadata: + department: "engineering" + status: 200 + + - operation: object.copy + objectName: "source_copy_meta.jpg" + destinationObjectName: "copied_source_copy_meta_1.jpg" + copyMetadata: true + status: 200 + + - operation: object.copy + objectName: "source_copy_meta.jpg" + destinationObjectName: "copied_source_copy_meta_2.jpg" + copyMetadata: false + userMetadata: + department: "engineering" + status: 200 + + - operation: object.copy + objectName: "source_copy_meta.jpg" + destinationObjectName: "copied_source_copy_meta_3.jpg" + copyMetadata: false + userMetadata: + department: "marketing" + status: 400 + error: "new row violates row-level security policy" + + - operation: upload.signed + objectName: "signed_file.jpg" + userMetadata: + department: "engineering" + status: 200 + + - operation: upload.signed + objectName: "signed_file_no_meta.jpg" + status: 400 + error: "new row violates row-level security policy" + + - operation: upload.s3.multipart + objectName: "s3_multi_file.jpg" + userMetadata: + department: "engineering" + status: 200 + + - operation: upload.s3.multipart + objectName: "s3_multi_file_no_meta.jpg" + status: 403 + + - description: "Will only upload image files based on mimetype" + policies: + - insert_only_images + asserts: + - operation: upload + objectName: "test_image.jpg" + mimeType: "image/jpeg" + status: 200 + + - operation: upload + objectName: "test_file.txt" + mimeType: "text/plain" + status: 400 + error: "new row violates row-level security policy" + + - operation: upload.tus + objectName: "test_image_tus.jpg" + mimeType: "image/jpeg" + status: 200 + + - operation: upload.tus + objectName: "test_file_tus.txt" + mimeType: "text/plain" + status: 403 + error: "new row violates row-level security policy" + + - description: "Will only upload files under size limit based on contentLength" + policies: + - insert_max_size_limit + asserts: + - operation: upload + objectName: "small_file.jpg" + status: 200 + + - operation: upload + objectName: "large_file.jpg" + contentLength: 200000 + status: 400 + error: "new row violates row-level security policy" + + - operation: upload.tus + objectName: "small_file_tus.jpg" + status: 200 + + - operation: upload.tus + objectName: "large_file_tus.jpg" + contentLength: 200000 + status: 403 + error: "new row violates row-level security policy" diff --git a/src/test/s3-protocol.test.ts b/src/test/s3-protocol.test.ts index f6b994e77..346bcf702 100644 --- a/src/test/s3-protocol.test.ts +++ b/src/test/s3-protocol.test.ts @@ -28,6 +28,11 @@ import { Upload } from '@aws-sdk/lib-storage' import { createPresignedPost } from '@aws-sdk/s3-presigned-post' import { getSignedUrl } from '@aws-sdk/s3-request-presigner' import { wait } from '@internal/concurrency' +import { getPostgresConnection, getServiceKeyUser, TenantConnection } from '@internal/database' +import { DBMigration } from '@internal/database/migrations' +import { ERRORS } from '@internal/errors' +import { StorageKnexDB } from '@storage/database' +import { Uploader } from '@storage/uploader' import axios from 'axios' import { createHash, createHmac, randomUUID } from 'crypto' import { FastifyInstance } from 'fastify' @@ -1067,6 +1072,78 @@ describe('S3 Protocol', () => { expect(resp.$metadata).toBeTruthy() }) + + it('does not mutate in_progress_size when canUpload (RLS) fails', async () => { + /* + Calling shouldAllowPartUpload mutates the in_progress_size so we have to ensure + canUpload is called beforehand or else it can cause issues for valid uploads. + + This test sets the fileSizeLimit to 10kb and each part at 5kb. It simulates + first request successful, second failed, third passes. If the in_progress_size + was mutated on the second request it would be at 10kb causing the third to fail. + */ + const bucketName = await createBucket(client) + const key = 'rls-ordering-test.jpg' + const partSize = 1024 * 5 + + mergeConfig({ uploadFileSizeLimit: partSize * 2 }) + + const createResp = await client.send( + new CreateMultipartUploadCommand({ + Bucket: bucketName, + Key: key, + ContentType: 'image/jpg', + }) + ) + expect(createResp.UploadId).toBeTruthy() + const uploadId = createResp.UploadId + + const part1Resp = await client.send( + new UploadPartCommand({ + Bucket: bucketName, + Key: key, + UploadId: uploadId, + PartNumber: 1, + Body: Buffer.alloc(partSize), + ContentLength: partSize, + }) + ) + expect(part1Resp.ETag).toBeTruthy() + + const canUploadSpy = jest + .spyOn(Uploader.prototype, 'canUpload') + .mockRejectedValueOnce(ERRORS.AccessDenied('upload')) + + try { + await client.send( + new UploadPartCommand({ + Bucket: bucketName, + Key: key, + UploadId: uploadId, + PartNumber: 2, + Body: Buffer.alloc(partSize), + ContentLength: partSize, + }) + ) + throw new Error('Should not reach here') + } catch (e) { + expect((e as Error).message).not.toEqual('Should not reach here') + } finally { + canUploadSpy.mockRestore() + } + + const part2Resp = await client.send( + new UploadPartCommand({ + Bucket: bucketName, + Key: key, + UploadId: uploadId, + PartNumber: 2, + Body: Buffer.alloc(partSize), + ContentLength: partSize, + }) + ) + expect(part2Resp.ETag).toBeTruthy() + }) }) describe('GetObject', () => { @@ -2022,3 +2099,130 @@ describe('S3 Protocol', () => { }) }) }) + +describe('Migration compatibility', () => { + describe('integration', () => { + const { tenantId } = getConfig() + let connection: TenantConnection + let bucketId: string + + beforeAll(async () => { + const adminUser = await getServiceKeyUser(tenantId) + connection = await getPostgresConnection({ + tenantId, + user: adminUser, + superUser: adminUser, + host: 'localhost', + }) + + bucketId = randomUUID() + const db = new StorageKnexDB(connection, { tenantId, host: 'localhost' }) + await db.createBucket({ id: bucketId, name: `migration-test-${bucketId}`, public: false }) + }) + + afterAll(async () => { + const db = new StorageKnexDB(connection, { tenantId, host: 'localhost' }) + await db.deleteBucket(bucketId) + await connection.dispose() + }) + + const makeDB = (latestMigration?: keyof typeof DBMigration) => + new StorageKnexDB(connection, { tenantId, host: 'localhost', latestMigration }) + + describe('createMultipartUpload', () => { + it('does not store metadata when latestMigration is before s3-multipart-uploads-metadata', async () => { + const db = makeDB('fix-optimized-search-function') // migration 56 + const uploadId = randomUUID() + try { + const result = await db.createMultipartUpload( + uploadId, + bucketId, + 'test-pre-migration.txt', + randomUUID(), + 'sig', + undefined, + undefined, + { + cacheControl: 'no-cache', + contentLength: 0, + size: 0, + mimetype: 'text/plain', + eTag: 'abc', + } + ) + expect(result.metadata).toBeNull() + } finally { + await makeDB().deleteMultipartUpload(uploadId) + } + }) + + it('stores metadata when latestMigration is s3-multipart-uploads-metadata', async () => { + const db = makeDB('s3-multipart-uploads-metadata') // migration 57 + const uploadId = randomUUID() + const metadata = { + cacheControl: 'no-cache', + contentLength: 0, + size: 0, + mimetype: 'text/plain', + eTag: 'abc', + } + try { + const result = await db.createMultipartUpload( + uploadId, + bucketId, + 'test-post-migration.txt', + randomUUID(), + 'sig', + undefined, + undefined, + metadata + ) + expect(result.metadata).toEqual(metadata) + } finally { + await makeDB().deleteMultipartUpload(uploadId) + } + }) + }) + + describe('findMultipartUpload', () => { + let uploadId: string + + beforeAll(async () => { + uploadId = randomUUID() + const db = makeDB('s3-multipart-uploads-metadata') + await db.createMultipartUpload( + uploadId, + bucketId, + 'test-find.txt', + randomUUID(), + 'sig', + undefined, + undefined, + { + cacheControl: 'no-cache', + contentLength: 0, + size: 0, + mimetype: 'text/plain', + eTag: 'abc', + } + ) + }) + + afterAll(async () => { + await makeDB().deleteMultipartUpload(uploadId) + }) + + it('excludes metadata from result when latestMigration is before s3-multipart-uploads-metadata', async () => { + const db = makeDB('fix-optimized-search-function') // migration 56 + const result = await db.findMultipartUpload(uploadId, 'id,version,metadata') + expect(result).not.toHaveProperty('metadata') + }) + + it('includes metadata in result when latestMigration is s3-multipart-uploads-metadata', async () => { + const db = makeDB('s3-multipart-uploads-metadata') // migration 57 + const result = await db.findMultipartUpload(uploadId, 'id,version,metadata') + expect(result).toHaveProperty('metadata') + }) + }) + }) +}) diff --git a/src/test/scanner.test.ts b/src/test/scanner.test.ts index dd3ebf266..db2c5a5ee 100644 --- a/src/test/scanner.test.ts +++ b/src/test/scanner.test.ts @@ -28,7 +28,6 @@ describe('ObjectScanner', () => { body: Readable.from(Buffer.from('test')), mimeType: 'text/plain', cacheControl: 'no-cache', - userMetadata: {}, isTruncated: () => false, }, }) @@ -95,7 +94,6 @@ describe('ObjectScanner', () => { body: Readable.from(Buffer.from('test')), mimeType: 'text/plain', cacheControl: 'no-cache', - userMetadata: {}, isTruncated: () => false, }, })