Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/content/docs/integrations/kafka.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,4 @@ to_kafka "topic"

You can control the message encoding with the `message` argument in
[`to_kafka`](/reference/operators/to_kafka) that defaults to
`this.print_json()`.
`this.print_ndjson()`.
17 changes: 12 additions & 5 deletions src/content/docs/reference/operators/from_kafka.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Receives events from an Apache Kafka topic.

```tql
from_kafka topic:string, [count=int, exit=bool, offset=int|string, options=record,
aws_iam=record, aws_region=string, commit_batch_size=int]
aws_iam=record, aws_region=string, batch_size=int, batch_timeout=duration]
```

## Description
Expand Down Expand Up @@ -84,14 +84,21 @@ options][librdkafka-options] to configure Kafka according to your needs.
We recommend factoring these options into the plugin-specific `kafka.yaml` so
that they are independent of the `from_kafka` arguments.

### `commit_batch_size = int (optional)`
### `batch_size = int (optional)`

The operator commits offsets after receiving `commit_batch_size` messages
to improve throughput. If you need to ensure exactly-once semantics for your
pipeline, set this option to `1` to commit every message individually.
The number of messages to accumulate before emitting a batch. The operator
commits offsets after each batch to improve throughput.

Defaults to `1000`.

### `batch_timeout = duration (optional)`

The maximum time to wait before flushing a partial batch. If the operator
receives fewer than `batch_size` messages within this interval, it emits
and commits whatever it has collected so far.

Defaults to `10s`.

## Amazon MSK

The operator supports [Amazon MSK](/integrations/amazon/msk) with IAM
Expand Down
15 changes: 8 additions & 7 deletions src/content/docs/reference/operators/to_kafka.mdx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
title: to_kafka
category: Outputs/Events
example: 'to_kafka "topic", message=this.print_json()'
example: 'to_kafka "topic", message=this.print_ndjson()'
---

import Op from '@components/see-also/Op.astro';
Expand Down Expand Up @@ -41,7 +41,7 @@ The Kafka topic to send messages to.

An expression that evaluates to the message content for each row.

Defaults to `this.print_json()` when not specified.
Defaults to `this.print_ndjson()` when not specified.

### `key = string (optional)`

Expand Down Expand Up @@ -72,7 +72,7 @@ connecting to MSK with IAM authentication.

## Examples

### Send JSON-formatted events to topic `events` (using default)
### Send NDJSON-formatted events to topic `events` (using default)

Stream security events to a Kafka topic with automatic JSON formatting:

Expand All @@ -85,7 +85,8 @@ to_kafka "events"

This pipeline subscribes to security alerts, filters for high-severity events,
selects relevant fields, and sends them to Kafka as JSON. Each event is
automatically formatted using `this.print_json()`, producing messages like:
automatically formatted using `this.print_ndjson()`, producing one NDJSON line per
event:

```json
{
Expand All @@ -96,11 +97,11 @@ automatically formatted using `this.print_json()`, producing messages like:
}
```

### Send JSON-formatted events with explicit message
### Send NDJSON-formatted events with explicit message

```tql
subscribe "logs"
to_kafka "events", message=this.print_json()
to_kafka "events", message=this.print_ndjson()
```

### Send specific field values with a timestamp
Expand All @@ -114,7 +115,7 @@ to_kafka "alerts", message=alert_msg, timestamp=2024-01-01T00:00:00

```tql
metrics
to_kafka "metrics", message=this.print_json(), key="server-01"
to_kafka "metrics", message=this.print_ndjson(), key="server-01"
```

## See Also
Expand Down