From 806100b5929eb4d3bb4fffc886dc0d0409199cff Mon Sep 17 00:00:00 2001 From: Paolo Insogna Date: Wed, 1 Jul 2026 15:07:02 +0000 Subject: [PATCH 1/2] feat: Detect missing TLS configuration. Signed-off-by: Paolo Insogna --- src/network/connection.ts | 42 ++++++++++++++++++++++++++- test/network/connection.test.ts | 50 ++++++++++++++++++++++++++++++++- 2 files changed, 90 insertions(+), 2 deletions(-) diff --git a/src/network/connection.ts b/src/network/connection.ts index 2149b197..6423c884 100644 --- a/src/network/connection.ts +++ b/src/network/connection.ts @@ -144,6 +144,7 @@ export class Connection extends TypedEventEmitter { #responseReader: Reader #socket!: Socket #socketMustBeDrained: boolean + #detectMissingTLS: boolean #reauthenticationTimeout!: NodeJS.Timeout constructor (clientId?: string, options: ConnectionOptions = {}) { @@ -171,6 +172,7 @@ export class Connection extends TypedEventEmitter { this.#responseBuffer = new DynamicBuffer() this.#responseReader = new Reader(this.#responseBuffer) this.#socketMustBeDrained = false + this.#detectMissingTLS = !this.#options.tls notifyCreation('connection', this) } @@ -232,6 +234,7 @@ export class Connection extends TypedEventEmitter { } this.#status = ConnectionStatuses.CONNECTING + this.#detectMissingTLS = !this.#options.tls const connectionOptions: Partial = { timeout: this.#options.connectTimeout @@ -425,7 +428,7 @@ export class Connection extends TypedEventEmitter { callback: Callback ) { // Correlation ID is a 32-bit integer in the protocol, so we need to wrap around after 2^31 - 1 - const correlationId = (this.#correlationId + 1) & 0x7FFFFFFF + const correlationId = (this.#correlationId + 1) & 0x7fffffff this.#correlationId = correlationId const diagnostic = createDiagnosticContext({ @@ -720,6 +723,34 @@ export class Connection extends TypedEventEmitter { correlation_id => INT32 */ #onData (chunk: Buffer): void { + if (this.#detectMissingTLS) { + this.#detectMissingTLS = false + + if (this.#isTLSRecord(chunk)) { + const error = new UserError('Broker requires TLS.') + this.#status = ConnectionStatuses.ERROR + + for (const request of this.#afterDrainRequests) { + if (!request.noResponse) { + clearTimeout(request.timeoutHandle!) + request.callback(error, undefined) + } + } + + this.#afterDrainRequests = [] + + for (const request of this.#inflightRequests.values()) { + clearTimeout(request.timeoutHandle!) + request.callback(error, undefined) + } + + this.#inflightRequests.clear() + + this.#socket.destroy() + return + } + } + this.#responseBuffer.append(chunk) // There is at least one message size to add @@ -826,6 +857,15 @@ export class Connection extends TypedEventEmitter { this.emit('drain') } + #isTLSRecord (chunk: Buffer): boolean { + return ( + chunk.length >= 3 && + (chunk[0] === 0x14 || chunk[0] === 0x15 || chunk[0] === 0x16 || chunk[0] === 0x17) && + chunk[1] === 0x03 && + chunk[2] <= 0x04 + ) + } + #onClose (): void { this.#status = ConnectionStatuses.CLOSED this.emit('close') diff --git a/test/network/connection.test.ts b/test/network/connection.test.ts index 0f18fcdc..98984a95 100644 --- a/test/network/connection.test.ts +++ b/test/network/connection.test.ts @@ -32,6 +32,7 @@ import { saslPlain, saslScramSha, UnexpectedCorrelationIdError, + UserError, Writer } from '../../src/index.ts' import { defaultCrypto, type ScramAlgorithm } from '../../src/protocol/sasl/scram-sha.ts' @@ -217,7 +218,7 @@ test('Connection.connect should handle connection timeout', async t => { // This IP is not routable due to RFC 5737 await rejects(() => connection.connect('192.0.2.1', 9092) as Promise, { - code: 'PLT_KFK_NETWORK', + code: 'PLT_KFK_TIMEOUT', message: 'Connection to 192.0.2.1:9092 timed out.' }) }) @@ -1398,6 +1399,53 @@ test('Connection.connect should prefer tls over ssl when both are provided', asy deepStrictEqual(await hostPromise.promise, 'localhost') }) +test('Connection.send should detect TLS broker when tls is not configured', async t => { + const { server, port } = await createServer(t) + const connection = new Connection('test-client', { requestTimeout: 1000 }) + t.after(() => connection.close()) + + server.on('connection', socket => { + socket.on('data', () => { + socket.write(Buffer.from([0x15, 0x03, 0x03, 0x00, 0x02, 0x02, 0x50])) + }) + }) + + await connection.connect('localhost', port) + + function payloadFn () { + const writer = Writer.create() + writer.appendInt32(42) + return writer + } + + await rejects( + () => + new Promise((resolve, reject) => { + connection.send( + 0, + 0, + payloadFn, + function () { + return 'Success' + }, + false, + false, + (err, returnValue) => { + if (err) { + reject(err) + } else { + resolve(returnValue!) + } + } + ) + }), + { + code: UserError.code, + message: 'Broker requires TLS.' + } + ) +}) + test('Connection.isConnected should return false when connection is not connected', () => { const connection = new Connection('test-client') From e1f410bca87a4cb2009260e782229bbae118728b Mon Sep 17 00:00:00 2001 From: Paolo Insogna Date: Wed, 1 Jul 2026 15:47:56 +0000 Subject: [PATCH 2/2] fixup Signed-off-by: Paolo Insogna --- test/network/connection.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/network/connection.test.ts b/test/network/connection.test.ts index 98984a95..81f9c1ee 100644 --- a/test/network/connection.test.ts +++ b/test/network/connection.test.ts @@ -218,7 +218,7 @@ test('Connection.connect should handle connection timeout', async t => { // This IP is not routable due to RFC 5737 await rejects(() => connection.connect('192.0.2.1', 9092) as Promise, { - code: 'PLT_KFK_TIMEOUT', + code: 'PLT_KFK_NETWORK', message: 'Connection to 192.0.2.1:9092 timed out.' }) })