diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml
index eb5830bddb61..bf4c731903bc 100644
--- a/backends-velox/pom.xml
+++ b/backends-velox/pom.xml
@@ -592,5 +592,31 @@
+    <profile>
+      <id>kafka</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.gluten</groupId>
+          <artifactId>gluten-kafka</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.gluten</groupId>
+          <artifactId>gluten-kafka</artifactId>
+          <version>${project.version}</version>
+          <type>test-jar</type>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
+          <version>${spark.version}</version>
+          <scope>provided</scope>
+        </dependency>
+      </dependencies>
+    </profile>
diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc
index ddf417405eb8..fbf8fbad4b47 100644
--- a/cpp/velox/compute/VeloxBackend.cc
+++ b/cpp/velox/compute/VeloxBackend.cc
@@ -22,6 +22,7 @@
#include "operators/functions/RegistrationAllFunctions.h"
#include "operators/plannodes/RowVectorStream.h"
+#include "operators/reader/KafkaConnector.h"
#include "utils/ConfigExtractor.h"
#ifdef GLUTEN_ENABLE_QAT
@@ -325,6 +326,10 @@ void VeloxBackend::initConnector(const std::shared_ptr(kIteratorConnectorId, hiveConf, valueStreamDynamicFilterEnabled));
+ // Register Kafka connector for streaming data from Kafka topics
+  velox::connector::registerConnector(
+      std::make_shared<KafkaConnector>(kKafkaConnectorId, hiveConf));
+
#ifdef GLUTEN_ENABLE_GPU
if (backendConf_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault) &&
backendConf_->get(kCudfEnabled, kCudfEnabledDefault)) {
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index 54db73303184..544a133dab11 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -111,6 +111,7 @@ const std::string kVeloxMemReclaimMaxWaitMs = "spark.gluten.sql.columnar.backend
const uint64_t kVeloxMemReclaimMaxWaitMsDefault = 3600000; // 60min
const std::string kHiveConnectorId = "test-hive";
+const std::string kKafkaConnectorId = "kafka-stream";
const std::string kVeloxCacheEnabled = "spark.gluten.sql.columnar.backend.velox.cacheEnabled";
const std::string kExprMaxCompiledRegexes = "spark.gluten.sql.columnar.backend.velox.maxCompiledRegexes";
diff --git a/cpp/velox/operators/reader/KAFKA_INTEGRATION.md b/cpp/velox/operators/reader/KAFKA_INTEGRATION.md
new file mode 100644
index 000000000000..728cb213d352
--- /dev/null
+++ b/cpp/velox/operators/reader/KAFKA_INTEGRATION.md
@@ -0,0 +1,236 @@
+# Kafka Reader Integration Guide
+
+## Overview
+
+This document explains how Kafka streams are mapped to use the KafkaReader in the Gluten Velox backend. The integration allows reading streaming data from Kafka topics through the Substrait plan.
+
+## Architecture
+
+### Components
+
+1. **KafkaSplit** (`KafkaSplit.h`) - Connector split containing Kafka connection parameters
+2. **KafkaConnector** (`KafkaConnector.h/cc`) - Velox connector for Kafka streams
+3. **KafkaReader** (`KafkaReader.h/cc`) - Source operator that reads from Kafka using librdkafka
+4. **SubstraitToVeloxPlan** - Modified to handle `stream_kafka()` in ReadRel
+
+### Flow
+
+```
+Substrait ReadRel (stream_kafka=true)
+ ↓
+SubstraitToVeloxPlanConverter::toVeloxPlan()
+ ↓
+constructKafkaStreamNode()
+ ↓
+TableScanNode with kKafkaConnectorId
+ ↓
+KafkaSplit created with broker/topic/partition info
+ ↓
+KafkaReader operator consumes messages
+ ↓
+RowVector output
+```
+
+## Key Changes
+
+### 1. SubstraitToVeloxPlan.cc
+
+The `getStreamIndex()` method now returns -1 for Kafka streams, but they are handled specially:
+
+```cpp
+// Line 1684-1690
+int32_t SubstraitToVeloxPlanConverter::getStreamIndex(const ::substrait::ReadRel& sRead) {
+ // Check if this is a Kafka stream
+ if (sRead.stream_kafka()) {
+ // For Kafka streams, we don't use the iterator pattern
+ // Return -1 to indicate this should be handled as a regular scan
+ return -1;
+ }
+ // ... rest of the method
+}
+```
+
+In `toVeloxPlan()`, Kafka streams are now detected and routed to `constructKafkaStreamNode()`:
+
+```cpp
+// Line 1440-1452
+core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::ReadRel& readRel) {
+ // ... validation code ...
+
+ // Check if this is a Kafka stream - handle it specially
+ if (readRel.stream_kafka()) {
+ return constructKafkaStreamNode(readRel);
+ }
+
+ // ... rest of the method for regular streams and table scans ...
+}
+```
+
+### 2. Connector Registration
+
+In `VeloxBackend.cc`, the Kafka connector is registered alongside Hive and ValueStream connectors:
+
+```cpp
+// Register Kafka connector for streaming data from Kafka topics
+velox::connector::registerConnector(
+    std::make_shared<KafkaConnector>(kKafkaConnectorId, hiveConf));
+```
+
+### 3. Split Creation
+
+When creating splits for Kafka streams, use `KafkaConnectorSplit`:
+
+```cpp
+auto kafkaSplit = std::make_shared<KafkaConnectorSplit>(
+ kKafkaConnectorId,
+ "localhost:9092", // brokers
+ "my-topic", // topic
+ 0, // partition
+ 0, // startOffset
+ -1, // endOffset (-1 for latest)
+ "my-consumer-group", // groupId
+ additionalProps // additional Kafka properties
+);
+```
+
+## Usage Example
+
+### From Substrait Plan
+
+When a Substrait ReadRel has `stream_kafka()` set to true, it will automatically:
+
+1. Create a TableScanNode with `kKafkaConnectorId`
+2. Extract schema from `base_schema`
+3. Create appropriate column handles
+4. Mark the split info as `TABLE_SCAN` type
+
+### Creating Kafka Splits Programmatically
+
+```cpp
+#include "operators/reader/KafkaSplit.h"
+
+// Create Kafka split
+std::unordered_map<std::string, std::string> kafkaProps;
+kafkaProps["auto.offset.reset"] = "earliest";
+kafkaProps["enable.auto.commit"] = "false";
+
+auto split = std::make_shared<KafkaConnectorSplit>(
+ kKafkaConnectorId,
+ "broker1:9092,broker2:9092", // Kafka brokers
+ "events-topic", // Topic name
+ 0, // Partition number
+ 100, // Start offset
+ 1000, // End offset
+ "gluten-consumer", // Consumer group ID
+ kafkaProps // Additional properties
+);
+
+// Add split to task
+task->addSplit(planNodeId, std::move(split));
+```
+
+### Kafka Message Schema
+
+The KafkaReader produces RowVectors with the following default schema:
+
+- Column 0: `key` (VARBINARY) - Message key
+- Column 1: `value` (VARBINARY) - Message payload
+- Column 2: `topic` (VARCHAR) - Topic name
+- Column 3: `partition` (INTEGER) - Partition number
+- Column 4: `offset` (BIGINT) - Message offset
+- Column 5: `timestamp` (BIGINT) - Message timestamp
+
+You can customize the schema in the Substrait ReadRel's `base_schema`.
+
+## Configuration
+
+### Kafka Consumer Properties
+
+Configure Kafka consumer through `KafkaReaderConfig`:
+
+```cpp
+config_.brokers = "localhost:9092";
+config_.topic = "my-topic";
+config_.partition = 0;
+config_.startOffset = 0;
+config_.endOffset = -1; // Read until latest
+config_.groupId = "my-group";
+config_.maxPollRecords = 1000;
+config_.pollTimeoutMs = 1000;
+config_.additionalProps["security.protocol"] = "SASL_SSL";
+```
+
+### Velox Configuration
+
+Add to your Velox configuration:
+
+```cpp
+const std::string kKafkaConnectorId = "kafka-stream";
+```
+
+## Implementation Details
+
+### KafkaReader Operator
+
+The `KafkaReader` is a `SourceOperator` that:
+
+1. Initializes librdkafka consumer on first `getOutput()` call
+2. Polls messages in batches (configurable via `maxPollRecords`)
+3. Converts Kafka messages to Velox RowVectors
+4. Manages offset commits
+5. Handles partition EOF and errors
+
+### Offset Management
+
+- Offsets are committed after each batch
+- Manual offset management (auto-commit disabled)
+- Supports reading from specific offset ranges
+- Handles partition EOF gracefully
+
+### Error Handling
+
+- Connection errors logged and operator marked as finished
+- Message-level errors logged but don't stop processing
+- Timeout errors handled gracefully (returns empty batch)
+
+## Limitations and Future Work
+
+1. **Single Partition**: Currently reads from one partition per split
+2. **No Rebalancing**: Manual partition assignment (no consumer group rebalancing)
+3. **Schema**: Fixed schema mapping (could be made configurable)
+4. **Deserialization**: Messages returned as raw bytes (no built-in Avro/JSON support)
+
+## Testing
+
+To test the Kafka integration:
+
+1. Start a Kafka broker
+2. Create a test topic with data
+3. Create a Substrait plan with `stream_kafka()` set
+4. Execute the plan through Gluten
+
+Example test setup:
+
+```bash
+# Start Kafka
+docker run -d --name kafka -p 9092:9092 apache/kafka
+
+# Create topic
+kafka-topics --create --topic test-topic --bootstrap-server localhost:9092
+
+# Produce test data
+echo "test message" | kafka-console-producer --topic test-topic --bootstrap-server localhost:9092
+```
+
+## Dependencies
+
+- librdkafka (C/C++ Kafka client library)
+- Velox connector framework
+- Substrait plan support
+
+## See Also
+
+- `KafkaReader.h` - Main reader implementation
+- `KafkaSplit.h` - Split definition
+- `KafkaConnector.h` - Connector interface
+- `SubstraitToVeloxPlan.cc` - Plan conversion logic
\ No newline at end of file
diff --git a/cpp/velox/operators/reader/KafkaConnector.cc b/cpp/velox/operators/reader/KafkaConnector.cc
new file mode 100644
index 000000000000..29ec1d47c13a
--- /dev/null
+++ b/cpp/velox/operators/reader/KafkaConnector.cc
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "KafkaConnector.h"
+#include "KafkaReader.h"
+
+namespace gluten {
+
+std::unique_ptr<facebook::velox::exec::Operator>
+KafkaDataSourceFactory::createKafkaReader(
+ int32_t operatorId,
+ facebook::velox::exec::DriverCtx* driverCtx,
+ const std::shared_ptr& planNode) {
+ return std::make_unique(operatorId, driverCtx, planNode);
+}
+
+} // namespace gluten
+
+// Made with Bob
diff --git a/cpp/velox/operators/reader/KafkaConnector.h b/cpp/velox/operators/reader/KafkaConnector.h
new file mode 100644
index 000000000000..6b94c8eebbcc
--- /dev/null
+++ b/cpp/velox/operators/reader/KafkaConnector.h
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/connectors/Connector.h"
+#include "velox/exec/Operator.h"
+
+namespace gluten {
+
+/// Kafka connector for streaming data from Kafka topics
+class KafkaConnector : public facebook::velox::connector::Connector {
+ public:
+ explicit KafkaConnector(
+ const std::string& id,
+ std::shared_ptr config)
+ : Connector(id), config_(std::move(config)) {}
+
+  std::unique_ptr<facebook::velox::connector::DataSource> createDataSource(
+      const facebook::velox::RowTypePtr& outputType,
+      const std::shared_ptr<facebook::velox::connector::ConnectorTableHandle>& tableHandle,
+      const std::unordered_map<
+          std::string,
+          std::shared_ptr<facebook::velox::connector::ColumnHandle>>& columnHandles,
+      facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx) override {
+ VELOX_UNSUPPORTED("KafkaConnector does not support createDataSource");
+ }
+
+  std::unique_ptr<facebook::velox::connector::DataSink> createDataSink(
+      facebook::velox::RowTypePtr inputType,
+      std::shared_ptr<facebook::velox::connector::ConnectorInsertTableHandle> connectorInsertTableHandle,
+ facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx,
+ facebook::velox::connector::CommitStrategy commitStrategy) override {
+ VELOX_UNSUPPORTED("KafkaConnector does not support createDataSink");
+ }
+
+ const std::shared_ptr& getConfig() const {
+ return config_;
+ }
+
+ private:
+ std::shared_ptr config_;
+};
+
+/// Factory for creating Kafka data sources
+/// This is used by the Velox execution engine to create KafkaReader operators
+class KafkaDataSourceFactory {
+ public:
+  static std::unique_ptr<facebook::velox::exec::Operator> createKafkaReader(
+ int32_t operatorId,
+ facebook::velox::exec::DriverCtx* driverCtx,
+ const std::shared_ptr& planNode);
+};
+
+} // namespace gluten
+
+// Made with Bob
diff --git a/cpp/velox/operators/reader/KafkaReader.cc b/cpp/velox/operators/reader/KafkaReader.cc
new file mode 100644
index 000000000000..b038a7b1bc1f
--- /dev/null
+++ b/cpp/velox/operators/reader/KafkaReader.cc
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "KafkaReader.h"
+#include "KafkaSplit.h"
+#include <librdkafka/rdkafkacpp.h>
+#include "velox/vector/FlatVector.h"
+
+namespace gluten {
+
+namespace {
+
+// Event callback for Kafka consumer
+class KafkaEventCb : public RdKafka::EventCb {
+ public:
+ void event_cb(RdKafka::Event& event) override {
+ switch (event.type()) {
+ case RdKafka::Event::EVENT_ERROR:
+ LOG(ERROR) << "Kafka error: " << RdKafka::err2str(event.err())
+ << " - " << event.str();
+ break;
+ case RdKafka::Event::EVENT_STATS:
+ LOG(INFO) << "Kafka stats: " << event.str();
+ break;
+ case RdKafka::Event::EVENT_LOG:
+ LOG(INFO) << "Kafka log: " << event.str();
+ break;
+ default:
+ LOG(INFO) << "Kafka event: " << event.type() << " - " << event.str();
+ break;
+ }
+ }
+};
+
+// Rebalance callback for consumer group
+class KafkaRebalanceCb : public RdKafka::RebalanceCb {
+ public:
+ void rebalance_cb(
+ RdKafka::KafkaConsumer* consumer,
+ RdKafka::ErrorCode err,
+      std::vector<RdKafka::TopicPartition*>& partitions) override {
+ if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
+ LOG(INFO) << "Kafka rebalance: assigning " << partitions.size() << " partitions";
+ consumer->assign(partitions);
+ } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
+ LOG(INFO) << "Kafka rebalance: revoking " << partitions.size() << " partitions";
+ consumer->unassign();
+ } else {
+ LOG(ERROR) << "Kafka rebalance error: " << RdKafka::err2str(err);
+ consumer->unassign();
+ }
+ }
+};
+
+} // anonymous namespace
+
+KafkaReader::KafkaReader(
+ int32_t operatorId,
+ facebook::velox::exec::DriverCtx* driverCtx,
+ const std::shared_ptr& planNode)
+ : SourceOperator(
+ driverCtx,
+ planNode->outputType(),
+ operatorId,
+ planNode->id(),
+ "KafkaReader") {
+ LOG(INFO) << "KafkaReader created with operator ID: " << operatorId;
+}
+
+KafkaReader::~KafkaReader() {
+ cleanup();
+}
+
+void KafkaReader::initializeConsumer() {
+ if (consumerInitialized_) {
+ return;
+ }
+
+ std::string errstr;
+
+ // Create global configuration
+ conf_.reset(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
+ if (!conf_) {
+ throw std::runtime_error("Failed to create Kafka global configuration");
+ }
+
+ // Set broker list
+ if (conf_->set("bootstrap.servers", config_.brokers, errstr) != RdKafka::Conf::CONF_OK) {
+ throw std::runtime_error("Failed to set bootstrap.servers: " + errstr);
+ }
+
+ // Set group ID if provided
+ if (!config_.groupId.empty()) {
+ if (conf_->set("group.id", config_.groupId, errstr) != RdKafka::Conf::CONF_OK) {
+ throw std::runtime_error("Failed to set group.id: " + errstr);
+ }
+ }
+
+ // Set event callback
+ static KafkaEventCb eventCb;
+ if (conf_->set("event_cb", &eventCb, errstr) != RdKafka::Conf::CONF_OK) {
+ LOG(WARNING) << "Failed to set event callback: " << errstr;
+ }
+
+ // Set rebalance callback
+ static KafkaRebalanceCb rebalanceCb;
+ if (conf_->set("rebalance_cb", &rebalanceCb, errstr) != RdKafka::Conf::CONF_OK) {
+ LOG(WARNING) << "Failed to set rebalance callback: " << errstr;
+ }
+
+ // Disable auto-commit for manual offset management
+ if (conf_->set("enable.auto.commit", "false", errstr) != RdKafka::Conf::CONF_OK) {
+ LOG(WARNING) << "Failed to disable auto-commit: " << errstr;
+ }
+
+ // Set additional properties
+ for (const auto& [key, value] : config_.additionalProps) {
+ if (conf_->set(key, value, errstr) != RdKafka::Conf::CONF_OK) {
+ LOG(WARNING) << "Failed to set property " << key << ": " << errstr;
+ }
+ }
+
+ // Create consumer
+ consumer_.reset(RdKafka::KafkaConsumer::create(conf_.get(), errstr));
+ if (!consumer_) {
+ throw std::runtime_error("Failed to create Kafka consumer: " + errstr);
+ }
+
+ consumerInitialized_ = true;
+ LOG(INFO) << "Kafka consumer initialized successfully";
+}
+
+void KafkaReader::connectToKafka() {
+ if (!consumerInitialized_) {
+ initializeConsumer();
+ }
+
+ // Create topic partition for assignment
+  std::vector<RdKafka::TopicPartition*> partitions;
+ RdKafka::TopicPartition* partition =
+ RdKafka::TopicPartition::create(config_.topic, config_.partition, config_.startOffset);
+ partitions.push_back(partition);
+
+ // Assign partition
+ RdKafka::ErrorCode err = consumer_->assign(partitions);
+ if (err != RdKafka::ERR_NO_ERROR) {
+ delete partition;
+ throw std::runtime_error("Failed to assign partition: " + RdKafka::err2str(err));
+ }
+
+ currentOffset_ = config_.startOffset;
+ LOG(INFO) << "Connected to Kafka topic: " << config_.topic
+ << ", partition: " << config_.partition
+ << ", start offset: " << config_.startOffset;
+
+ delete partition;
+}
+
+std::vector<RdKafka::Message*> KafkaReader::pollMessages() {
+  std::vector<RdKafka::Message*> messages;
+
+ if (!consumer_) {
+ return messages;
+ }
+
+ int32_t remainingRecords = config_.maxPollRecords;
+ auto startTime = std::chrono::steady_clock::now();
+
+ while (remainingRecords > 0) {
+ // Check if we've exceeded the poll timeout
+    auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now() - startTime).count();
+ if (elapsed >= config_.pollTimeoutMs) {
+ break;
+ }
+
+ // Poll for a single message
+ RdKafka::Message* msg = consumer_->consume(100); // 100ms timeout per consume
+
+ if (!msg) {
+ continue;
+ }
+
+ switch (msg->err()) {
+ case RdKafka::ERR_NO_ERROR:
+ // Valid message
+ messages.push_back(msg);
+ currentOffset_ = msg->offset();
+ messagesRead_++;
+ remainingRecords--;
+
+ // Check if we've reached the end offset
+ if (config_.endOffset != RdKafka::Topic::OFFSET_END &&
+ currentOffset_ >= config_.endOffset) {
+ finished_ = true;
+ return messages;
+ }
+ break;
+
+ case RdKafka::ERR__PARTITION_EOF:
+ // End of partition
+ LOG(INFO) << "Reached end of partition at offset: " << currentOffset_;
+ delete msg;
+ finished_ = true;
+ return messages;
+
+ case RdKafka::ERR__TIMED_OUT:
+ // Timeout, no message available
+ delete msg;
+ break;
+
+ default:
+ // Error
+ LOG(ERROR) << "Kafka consume error: " << msg->errstr();
+ delete msg;
+ break;
+ }
+ }
+
+ return messages;
+}
+
+facebook::velox::RowVectorPtr KafkaReader::convertMessagesToRowVector(
+    const std::vector<RdKafka::Message*>& messages) {
+ if (messages.empty()) {
+ return nullptr;
+ }
+
+ auto rowType = outputType_->asRow();
+ size_t numRows = messages.size();
+
+ // Create vectors for each column
+  std::vector<facebook::velox::VectorPtr> childVectors;
+ for (size_t i = 0; i < rowType.size(); ++i) {
+ childVectors.push_back(createColumnVector(rowType.childAt(i), messages, i));
+ }
+
+ // Create and return RowVector
+  return std::make_shared<facebook::velox::RowVector>(
+ pool(),
+ rowType.asRow(),
+ facebook::velox::BufferPtr(nullptr),
+ numRows,
+ childVectors);
+}
+
+facebook::velox::VectorPtr KafkaReader::createColumnVector(
+ const facebook::velox::TypePtr& type,
+    const std::vector<RdKafka::Message*>& messages,
+ size_t columnIndex) {
+ size_t numRows = messages.size();
+
+ // Handle common Kafka message fields based on column index
+ // Column 0: key (VARBINARY)
+ // Column 1: value (VARBINARY)
+ // Column 2: topic (VARCHAR)
+ // Column 3: partition (INTEGER)
+ // Column 4: offset (BIGINT)
+ // Column 5: timestamp (BIGINT)
+
+ if (type->isVarbinary() || type->isVarchar()) {
+    auto flatVector = std::dynamic_pointer_cast<facebook::velox::FlatVector<facebook::velox::StringView>>(
+ facebook::velox::BaseVector::create(type, numRows, pool()));
+
+ for (size_t i = 0; i < numRows; ++i) {
+ const auto* msg = messages[i];
+
+ if (columnIndex == 0) {
+ // Key
+ if (msg->key()) {
+ flatVector->set(i, facebook::velox::StringView(
+              static_cast<const char*>(msg->key()->c_str()), msg->key()->size()));
+ } else {
+ flatVector->setNull(i, true);
+ }
+ } else if (columnIndex == 1) {
+ // Value
+ if (msg->payload()) {
+ flatVector->set(i, facebook::velox::StringView(
+              static_cast<const char*>(msg->payload()), msg->len()));
+ } else {
+ flatVector->setNull(i, true);
+ }
+ } else if (columnIndex == 2) {
+ // Topic
+ flatVector->set(i, facebook::velox::StringView(msg->topic_name()));
+ }
+ }
+
+ return flatVector;
+ } else if (type->isInteger()) {
+    auto flatVector = std::dynamic_pointer_cast<facebook::velox::FlatVector<int32_t>>(
+ facebook::velox::BaseVector::create(type, numRows, pool()));
+
+ for (size_t i = 0; i < numRows; ++i) {
+ const auto* msg = messages[i];
+
+ if (columnIndex == 3) {
+ // Partition
+ flatVector->set(i, msg->partition());
+ }
+ }
+
+ return flatVector;
+ } else if (type->isBigint()) {
+    auto flatVector = std::dynamic_pointer_cast<facebook::velox::FlatVector<int64_t>>(
+ facebook::velox::BaseVector::create(type, numRows, pool()));
+
+ for (size_t i = 0; i < numRows; ++i) {
+ const auto* msg = messages[i];
+
+ if (columnIndex == 4) {
+ // Offset
+ flatVector->set(i, msg->offset());
+ } else if (columnIndex == 5) {
+ // Timestamp
+ flatVector->set(i, msg->timestamp().timestamp);
+ }
+ }
+
+ return flatVector;
+ }
+
+ // Default: create empty vector
+ return facebook::velox::BaseVector::create(type, numRows, pool());
+}
+
+void KafkaReader::commitOffset(int64_t offset) {
+ if (!consumer_) {
+ return;
+ }
+
+  std::vector<RdKafka::TopicPartition*> partitions;
+ RdKafka::TopicPartition* partition =
+ RdKafka::TopicPartition::create(config_.topic, config_.partition, offset + 1);
+ partitions.push_back(partition);
+
+ RdKafka::ErrorCode err = consumer_->commitSync(partitions);
+ if (err != RdKafka::ERR_NO_ERROR) {
+ LOG(ERROR) << "Failed to commit offset: " << RdKafka::err2str(err);
+ } else {
+ LOG(INFO) << "Committed offset: " << offset;
+ }
+
+ delete partition;
+}
+
+void KafkaReader::cleanup() {
+ if (consumer_) {
+ // Commit final offset
+ if (currentOffset_ > 0) {
+ commitOffset(currentOffset_);
+ }
+
+ // Close consumer
+ consumer_->close();
+ consumer_.reset();
+ LOG(INFO) << "Kafka consumer closed. Total messages read: " << messagesRead_;
+ }
+
+ conf_.reset();
+ tconf_.reset();
+}
+
+facebook::velox::RowVectorPtr KafkaReader::getOutput() {
+ if (finished_ || !hasSplit_) {
+ return nullptr;
+ }
+
+ // Initialize consumer on first call
+ if (!consumerInitialized_) {
+ try {
+ connectToKafka();
+ } catch (const std::exception& e) {
+ LOG(ERROR) << "Failed to connect to Kafka: " << e.what();
+ finished_ = true;
+ return nullptr;
+ }
+ }
+
+ // Poll messages from Kafka
+ auto messages = pollMessages();
+
+ if (messages.empty()) {
+ // No messages available, but not finished yet
+ if (!finished_) {
+ return nullptr;
+ }
+ // Finished reading
+ return nullptr;
+ }
+
+ // Convert messages to RowVector
+ auto result = convertMessagesToRowVector(messages);
+
+ // Clean up messages
+ for (auto* msg : messages) {
+ delete msg;
+ }
+
+ // Commit offset periodically (every batch)
+ if (currentOffset_ > 0) {
+ commitOffset(currentOffset_);
+ }
+
+ return result;
+}
+
+facebook::velox::BlockingReason KafkaReader::isBlocked(
+ facebook::velox::ContinueFuture* future) {
+ // Kafka consumer is non-blocking with timeout
+ return facebook::velox::BlockingReason::kNotBlocked;
+}
+
+bool KafkaReader::isFinished() {
+ return (noMoreSplits_ && !hasSplit_) || finished_;
+}
+
+void KafkaReader::addSplit(
+    std::shared_ptr<facebook::velox::connector::ConnectorSplit> split) {
+  // Extract Kafka configuration from split
+  auto kafkaSplit = std::dynamic_pointer_cast<KafkaConnectorSplit>(split);
+ if (!kafkaSplit) {
+ throw std::runtime_error("Split is not a KafkaConnectorSplit");
+ }
+
+ hasSplit_ = true;
+
+ // Populate config from split
+ config_.brokers = kafkaSplit->getBrokers();
+ config_.topic = kafkaSplit->getTopic();
+ config_.partition = kafkaSplit->getPartition();
+ config_.startOffset = kafkaSplit->getStartOffset();
+ config_.endOffset = kafkaSplit->getEndOffset();
+ config_.groupId = kafkaSplit->getGroupId();
+ config_.additionalProps = kafkaSplit->getAdditionalProps();
+
+ LOG(INFO) << "Added Kafka split: " << kafkaSplit->toString();
+}
+
+void KafkaReader::noMoreSplits() {
+ noMoreSplits_ = true;
+ LOG(INFO) << "No more splits for KafkaReader";
+}
+
+} // namespace gluten
+
+// Made with Bob
diff --git a/cpp/velox/operators/reader/KafkaReader.h b/cpp/velox/operators/reader/KafkaReader.h
new file mode 100644
index 000000000000..cd9e6a164bfe
--- /dev/null
+++ b/cpp/velox/operators/reader/KafkaReader.h
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <librdkafka/rdkafkacpp.h>
+#include <memory>
+#include <string>
+#include <vector>
+#include "velox/connectors/Connector.h"
+#include "velox/exec/Operator.h"
+#include "velox/vector/ComplexVector.h"
+
+namespace gluten {
+
+/// Configuration for Kafka consumer
+struct KafkaReaderConfig {
+ std::string brokers;
+ std::string topic;
+ int32_t partition;
+ int64_t startOffset;
+ int64_t endOffset;
+ std::string groupId;
+ int32_t maxPollRecords;
+ int32_t pollTimeoutMs;
+ std::unordered_map additionalProps;
+
+ KafkaReaderConfig()
+ : partition(0),
+ startOffset(RdKafka::Topic::OFFSET_BEGINNING),
+ endOffset(RdKafka::Topic::OFFSET_END),
+ maxPollRecords(1000),
+ pollTimeoutMs(1000) {}
+};
+
+/// Kafka reader operator for streaming Kafka data
+/// Integrates with librdkafka to consume messages from Kafka topics
+class KafkaReader : public facebook::velox::exec::SourceOperator {
+ public:
+ KafkaReader(
+ int32_t operatorId,
+ facebook::velox::exec::DriverCtx* driverCtx,
+ const std::shared_ptr& planNode);
+
+ ~KafkaReader() override;
+
+ facebook::velox::RowVectorPtr getOutput() override;
+
+ facebook::velox::BlockingReason isBlocked(
+ facebook::velox::ContinueFuture* future) override;
+
+ bool isFinished() override;
+
+  void addSplit(std::shared_ptr<facebook::velox::connector::ConnectorSplit> split) override;
+
+ void noMoreSplits() override;
+
+ private:
+ /// Initialize Kafka consumer with configuration
+ void initializeConsumer();
+
+ /// Connect to Kafka broker and subscribe to topic/partition
+ void connectToKafka();
+
+ /// Poll messages from Kafka
+  std::vector<RdKafka::Message*> pollMessages();
+
+ /// Convert Kafka messages to RowVector format
+ facebook::velox::RowVectorPtr convertMessagesToRowVector(
+      const std::vector<RdKafka::Message*>& messages);
+
+ /// Handle offset management
+ void commitOffset(int64_t offset);
+
+ /// Cleanup Kafka resources
+ void cleanup();
+
+ /// Create vector for a specific column based on type
+ facebook::velox::VectorPtr createColumnVector(
+ const facebook::velox::TypePtr& type,
+      const std::vector<RdKafka::Message*>& messages,
+ size_t columnIndex);
+
+ // Kafka consumer and configuration
+  std::unique_ptr<RdKafka::KafkaConsumer> consumer_;
+  std::unique_ptr<RdKafka::Conf> conf_;
+  std::unique_ptr<RdKafka::Conf> tconf_;
+ KafkaReaderConfig config_;
+
+ // State management
+ bool noMoreSplits_ = false;
+ bool hasSplit_ = false;
+ bool consumerInitialized_ = false;
+ bool finished_ = false;
+ int64_t currentOffset_ = 0;
+ int64_t messagesRead_ = 0;
+
+ // Buffer for batching
+  std::vector<RdKafka::Message*> messageBuffer_;
+ size_t maxBatchSize_ = 1000;
+};
+
+} // namespace gluten
diff --git a/cpp/velox/operators/reader/KafkaSplit.h b/cpp/velox/operators/reader/KafkaSplit.h
new file mode 100644
index 000000000000..bd54b35fddf5
--- /dev/null
+++ b/cpp/velox/operators/reader/KafkaSplit.h
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <unordered_map>
+#include "velox/connectors/Connector.h"
+
+namespace gluten {
+
+/// Split for Kafka streaming data
+/// Contains all necessary information to connect to and consume from a Kafka topic/partition
+class KafkaConnectorSplit : public facebook::velox::connector::ConnectorSplit {
+ public:
+ KafkaConnectorSplit(
+ const std::string& connectorId,
+ const std::string& brokers,
+ const std::string& topic,
+ int32_t partition,
+ int64_t startOffset,
+ int64_t endOffset,
+ const std::string& groupId = "",
+      const std::unordered_map<std::string, std::string>& additionalProps = {})
+ : ConnectorSplit(connectorId),
+ brokers_(brokers),
+ topic_(topic),
+ partition_(partition),
+ startOffset_(startOffset),
+ endOffset_(endOffset),
+ groupId_(groupId),
+ additionalProps_(additionalProps) {}
+
+ const std::string& getBrokers() const {
+ return brokers_;
+ }
+
+ const std::string& getTopic() const {
+ return topic_;
+ }
+
+ int32_t getPartition() const {
+ return partition_;
+ }
+
+ int64_t getStartOffset() const {
+ return startOffset_;
+ }
+
+ int64_t getEndOffset() const {
+ return endOffset_;
+ }
+
+ const std::string& getGroupId() const {
+ return groupId_;
+ }
+
+  const std::unordered_map<std::string, std::string>& getAdditionalProps() const {
+ return additionalProps_;
+ }
+
+ std::string toString() const override {
+ return fmt::format(
+ "KafkaConnectorSplit[brokers={}, topic={}, partition={}, startOffset={}, endOffset={}]",
+ brokers_,
+ topic_,
+ partition_,
+ startOffset_,
+ endOffset_);
+ }
+
+ private:
+ std::string brokers_;
+ std::string topic_;
+ int32_t partition_;
+ int64_t startOffset_;
+ int64_t endOffset_;
+ std::string groupId_;
+  std::unordered_map<std::string, std::string> additionalProps_;
+};
+
+} // namespace gluten
+
+// Made with Bob
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
index adb7fc5f45b6..c04e51d333f7 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -1415,6 +1415,67 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructCudfValueStreamNode(
splitInfoMap_[node->id()] = splitInfo;
return node;
}
+core::PlanNodePtr SubstraitToVeloxPlanConverter::constructKafkaStreamNode(
+ const ::substrait::ReadRel& readRel) {
+ // Get output schema from ReadRel
+  std::vector<std::string> colNameList;
+  std::vector<TypePtr> veloxTypeList;
+ bool asLowerCase = !veloxCfg_->get(kCaseSensitive, false);
+
+ if (readRel.has_base_schema()) {
+ const auto& baseSchema = readRel.base_schema();
+ colNameList.reserve(baseSchema.names().size());
+ for (const auto& name : baseSchema.names()) {
+ std::string fieldName = name;
+ if (asLowerCase) {
+ folly::toLowerAscii(fieldName);
+ }
+ colNameList.emplace_back(fieldName);
+ }
+ veloxTypeList = SubstraitParser::parseNamedStruct(baseSchema, asLowerCase);
+ }
+
+ // Create output names for the plan node
+  std::vector<std::string> outNames;
+ outNames.reserve(colNameList.size());
+ connector::ColumnHandleMap assignments;
+
+ for (int idx = 0; idx < colNameList.size(); idx++) {
+ auto outName = SubstraitParser::makeNodeName(planNodeId_, idx);
+    assignments[outName] = std::make_shared<connector::hive::HiveColumnHandle>(
+ colNameList[idx],
+ connector::hive::HiveColumnHandle::ColumnType::kRegular,
+ veloxTypeList[idx],
+ veloxTypeList[idx]);
+ outNames.emplace_back(outName);
+ }
+
+ auto outputType = ROW(std::move(outNames), std::move(veloxTypeList));
+
+ // Create Kafka table handle
+ common::SubfieldFilters subfieldFilters;
+  auto tableHandle = std::make_shared<connector::hive::HiveTableHandle>(
+ kKafkaConnectorId,
+ "kafka_stream",
+ std::move(subfieldFilters),
+ nullptr, // remainingFilter
+ outputType);
+
+ // Create TableScanNode for Kafka
+  auto tableScanNode = std::make_shared<core::TableScanNode>(
+ nextPlanNodeId(),
+ std::move(outputType),
+ std::move(tableHandle),
+ assignments);
+
+ // Set split info for Kafka stream
+  auto splitInfo = std::make_shared<SplitInfo>();
+ splitInfo->leafType = SplitInfo::LeafType::TABLE_SCAN;
+ splitInfoMap_[tableScanNode->id()] = splitInfo;
+
+ return tableScanNode;
+}
+
#endif
core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValuesNode(
@@ -1445,6 +1506,11 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
!readRel.common().has_emit(), "Emit not supported for ValuesNode and TableScanNode related Substrait plans.");
}
+ // Check if this is a Kafka stream - handle it specially
+ if (readRel.stream_kafka()) {
+ return constructKafkaStreamNode(readRel);
+ }
+
auto streamIdx = getStreamIndex(readRel);
if (streamIdx >= 0) {
// Check if the ReadRel specifies an input of stream. If yes, build TableScanNode with iterator connector.
@@ -1682,6 +1748,13 @@ std::string SubstraitToVeloxPlanConverter::findFuncSpec(uint64_t id) {
}
int32_t SubstraitToVeloxPlanConverter::getStreamIndex(const ::substrait::ReadRel& sRead) {
+ // Check if this is a Kafka stream
+ if (sRead.stream_kafka()) {
+ // For Kafka streams, we don't use the iterator pattern
+ // Return -1 to indicate this should be handled as a regular scan
+ return -1;
+ }
+
if (sRead.has_local_files()) {
const auto& fileList = sRead.local_files().items();
if (fileList.size() == 0) {
diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h
index 47bf3a0525b1..730fc36d2e2d 100644
--- a/cpp/velox/substrait/SubstraitToVeloxPlan.h
+++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -150,6 +150,9 @@ class SubstraitToVeloxPlanConverter {
// Construct a cuDF value stream node.
core::PlanNodePtr constructCudfValueStreamNode(const ::substrait::ReadRel& sRead, int32_t streamIdx);
+ // Construct a Kafka stream node for reading from Kafka topics.
+ core::PlanNodePtr constructKafkaStreamNode(const ::substrait::ReadRel& sRead);
+
// This is only used in benchmark and enable query trace, which will load all the data to ValuesNode.
core::PlanNodePtr constructValuesNode(const ::substrait::ReadRel& sRead, int32_t streamIdx);