diff --git a/src/content/docs/integrations/kafka.mdx b/src/content/docs/integrations/kafka.mdx
index 0581cbe8d..d5e0ae4c7 100644
--- a/src/content/docs/integrations/kafka.mdx
+++ b/src/content/docs/integrations/kafka.mdx
@@ -44,4 +44,4 @@ to_kafka "topic"
 
 You can control the message encoding with the `message` argument in
 [`to_kafka`](/reference/operators/to_kafka) that defaults to
-`this.print_json()`.
+`this.print_ndjson()`.
diff --git a/src/content/docs/reference/operators/from_kafka.mdx b/src/content/docs/reference/operators/from_kafka.mdx
index 052193a91..62111c75d 100644
--- a/src/content/docs/reference/operators/from_kafka.mdx
+++ b/src/content/docs/reference/operators/from_kafka.mdx
@@ -12,7 +12,7 @@ Receives events from an Apache Kafka topic.
 
 ```tql
 from_kafka topic:string, [count=int, exit=bool, offset=int|string, options=record,
-           aws_iam=record, aws_region=string, commit_batch_size=int]
+           aws_iam=record, aws_region=string, batch_size=int, batch_timeout=duration]
 ```
 
 ## Description
@@ -84,14 +84,21 @@ options][librdkafka-options] to configure Kafka according to your needs.
 We recommend factoring these options into the plugin-specific `kafka.yaml` so
 that they are independent of the `from_kafka` arguments.
 
-### `commit_batch_size = int (optional)`
+### `batch_size = int (optional)`
 
-The operator commits offsets after receiving `commit_batch_size` messages
-to improve throughput. If you need to ensure exactly-once semantics for your
-pipeline, set this option to `1` to commit every message individually.
+The number of messages to accumulate before emitting a batch. The operator
+commits offsets after each batch to improve throughput. Defaults to `1000`.
 
+### `batch_timeout = duration (optional)`
+
+The maximum time to wait before flushing a partial batch. If the operator
+receives fewer than `batch_size` messages within this interval, it emits
+and commits whatever it has collected so far.
+
+Defaults to `10s`.
+
 ## Amazon MSK
 
 The operator supports [Amazon MSK](/integrations/amazon/msk) with IAM
diff --git a/src/content/docs/reference/operators/to_kafka.mdx b/src/content/docs/reference/operators/to_kafka.mdx
index d2f210ae6..bf84a50e0 100644
--- a/src/content/docs/reference/operators/to_kafka.mdx
+++ b/src/content/docs/reference/operators/to_kafka.mdx
@@ -1,7 +1,7 @@
 ---
 title: to_kafka
 category: Outputs/Events
-example: 'to_kafka "topic", message=this.print_json()'
+example: 'to_kafka "topic", message=this.print_ndjson()'
 ---
 
 import Op from '@components/see-also/Op.astro';
@@ -41,7 +41,7 @@ The Kafka topic to send messages to.
 
 An expression that evaluates to the message content for each row.
 
-Defaults to `this.print_json()` when not specified.
+Defaults to `this.print_ndjson()` when not specified.
 
 ### `key = string (optional)`
 
@@ -72,7 +72,7 @@ connecting to MSK with IAM authentication.
 
 ## Examples
 
-### Send JSON-formatted events to topic `events` (using default)
+### Send NDJSON-formatted events to topic `events` (using default)
 
 Stream security events to a Kafka topic with automatic JSON formatting:
 
@@ -85,7 +85,8 @@ to_kafka "events"
 
 This pipeline subscribes to security alerts, filters for high-severity
 events, selects relevant fields, and sends them to Kafka as JSON. Each event is
-automatically formatted using `this.print_json()`, producing messages like:
+automatically formatted using `this.print_ndjson()`, producing one NDJSON line per
+event:
 
 ```json
 {
@@ -96,11 +97,11 @@ automatically formatted using `this.print_json()`, producing messages like:
 }
 ```
 
-### Send JSON-formatted events with explicit message
+### Send NDJSON-formatted events with explicit message
 
 ```tql
 subscribe "logs"
-to_kafka "events", message=this.print_json()
+to_kafka "events", message=this.print_ndjson()
 ```
 
 ### Send specific field values with a timestamp
 
@@ -114,7 +115,7 @@ to_kafka "alerts", message=alert_msg, timestamp=2024-01-01T00:00:00
 
 ```tql
 metrics
-to_kafka "metrics", message=this.print_json(), key="server-01"
+to_kafka "metrics", message=this.print_ndjson(), key="server-01"
 ```
 
 ## See Also
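A usage sketch combining the two `from_kafka` options this change documents, with hypothetical topic name and values (the documented defaults are `batch_size=1000` and `batch_timeout=10s`):

```tql
// Emit a batch once 500 messages arrive or 5s elapse, whichever comes
// first; per the new docs, offsets are committed after each batch.
from_kafka "events", batch_size=500, batch_timeout=5s
publish "kafka-events"
```

Smaller values of either option trade throughput for fresher, more frequently committed offsets.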