From b45e3cdcbc44dcf6438a6e2d5382dcf824a0e2f3 Mon Sep 17 00:00:00 2001 From: Yuan Date: Tue, 17 Mar 2026 22:11:31 +0000 Subject: [PATCH 1/2] [VL] adding kfaka read support Signed-off-by: Yuan --- backends-velox/pom.xml | 26 +++++++++ cpp/velox/operators/reader/KafkaReader.cc | 30 ++++++++++ cpp/velox/operators/reader/KafkaReader.h | 65 +++++++++++++++++++++ cpp/velox/substrait/SubstraitToVeloxPlan.cc | 7 +++ 4 files changed, 128 insertions(+) create mode 100644 cpp/velox/operators/reader/KafkaReader.cc create mode 100644 cpp/velox/operators/reader/KafkaReader.h diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml index eb5830bddb61..bf4c731903bc 100644 --- a/backends-velox/pom.xml +++ b/backends-velox/pom.xml @@ -592,5 +592,31 @@ + + kafka + + false + + + + org.apache.gluten + gluten-kafka + ${project.version} + + + org.apache.gluten + gluten-kafka + ${project.version} + test-jar + test + + + org.apache.spark + spark-sql-kafka-0-10_${scala.binary.version} + ${spark.version} + provided + + + diff --git a/cpp/velox/operators/reader/KafkaReader.cc b/cpp/velox/operators/reader/KafkaReader.cc new file mode 100644 index 000000000000..1bc4a9680b79 --- /dev/null +++ b/cpp/velox/operators/reader/KafkaReader.cc @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "KafkaReader.h" + +namespace gluten { + +// Implementation placeholder for KafkaReader +// This file will contain the actual implementation of Kafka reading logic +// including: +// - Kafka consumer initialization +// - Message polling and deserialization +// - Offset management +// - Error handling and retry logic + +} // namespace gluten diff --git a/cpp/velox/operators/reader/KafkaReader.h b/cpp/velox/operators/reader/KafkaReader.h new file mode 100644 index 000000000000..32d8b41bebcb --- /dev/null +++ b/cpp/velox/operators/reader/KafkaReader.h @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/connectors/Connector.h" +#include "velox/exec/Operator.h" + +namespace gluten { + +/// Kafka reader operator for streaming Kafka data +/// This is a placeholder implementation that will be extended +/// to support actual Kafka consumption in the Velox backend +class KafkaReader : public facebook::velox::exec::SourceOperator { + public: + KafkaReader( + int32_t operatorId, + facebook::velox::exec::DriverCtx* driverCtx, + const std::shared_ptr& planNode) + : SourceOperator( + driverCtx, + planNode->outputType(), + operatorId, + planNode->id(), + "KafkaReader") {} + + facebook::velox::RowVectorPtr getOutput() override { + // TODO: Implement actual Kafka reading logic + // This should: + // 1. Connect to Kafka broker + // 2. Read messages from the specified topic/partition + // 3. Convert Kafka messages to RowVector format + // 4. Handle offset management + return nullptr; + } + + facebook::velox::BlockingReason isBlocked( + facebook::velox::ContinueFuture* future) override { + return facebook::velox::BlockingReason::kNotBlocked; + } + + bool isFinished() override { + return noMoreSplits_ && !hasSplit_; + } + + private: + bool noMoreSplits_ = false; + bool hasSplit_ = false; +}; + +} // namespace gluten diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index adb7fc5f45b6..86a0f9a85979 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -1682,6 +1682,13 @@ std::string SubstraitToVeloxPlanConverter::findFuncSpec(uint64_t id) { } int32_t SubstraitToVeloxPlanConverter::getStreamIndex(const ::substrait::ReadRel& sRead) { + // Check if this is a Kafka stream + if (sRead.stream_kafka()) { + // For Kafka streams, we don't use the iterator pattern + // Return -1 to indicate this should be handled as a regular scan + return -1; + } + if (sRead.has_local_files()) { const auto& fileList = sRead.local_files().items(); if (fileList.size() == 0) { From ad5eee77bfee15ca60e52f4357f951b71c355b5f Mon Sep 17 00:00:00 2001 From: Yuan Date: Mon, 30 Mar 2026 14:22:22 +0100 Subject: [PATCH 2/2] implement kafka read Signed-off-by: Yuan --- cpp/velox/operators/reader/KafkaReader.cc | 442 +++++++++++++++++++++- cpp/velox/operators/reader/KafkaReader.h | 102 +++-- 2 files changed, 512 insertions(+), 32 deletions(-) diff --git a/cpp/velox/operators/reader/KafkaReader.cc b/cpp/velox/operators/reader/KafkaReader.cc index 1bc4a9680b79..b48b0d7dfefb 100644 --- a/cpp/velox/operators/reader/KafkaReader.cc +++ b/cpp/velox/operators/reader/KafkaReader.cc @@ -16,15 +16,443 @@ */ #include "KafkaReader.h" +#include +#include "velox/vector/FlatVector.h" namespace gluten { -// Implementation placeholder for KafkaReader -// This file will contain the actual implementation of Kafka reading logic -// including: -// - Kafka consumer initialization -// - Message polling and deserialization -// - Offset management -// - Error handling and retry logic +namespace { + +// Event callback for Kafka consumer +class KafkaEventCb : public RdKafka::EventCb { + public: + void event_cb(RdKafka::Event& event) override { + switch (event.type()) { + case RdKafka::Event::EVENT_ERROR: + LOG(ERROR) << "Kafka error: " << RdKafka::err2str(event.err()) + << " - " << event.str(); + break; + case RdKafka::Event::EVENT_STATS: + LOG(INFO) << "Kafka stats: " << event.str(); + break; + case RdKafka::Event::EVENT_LOG: + LOG(INFO) << "Kafka log: " << event.str(); + break; + default: + LOG(INFO) << "Kafka event: " << event.type() << " - " << event.str(); + break; + } + } +}; + +// Rebalance callback for consumer group +class KafkaRebalanceCb : public RdKafka::RebalanceCb { + public: + void rebalance_cb( + RdKafka::KafkaConsumer* consumer, + RdKafka::ErrorCode err, + std::vector& partitions) override { + if (err == RdKafka::ERR__ASSIGN_PARTITIONS) { + LOG(INFO) << "Kafka rebalance: assigning " << partitions.size() << " partitions"; + consumer->assign(partitions); + } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) { + LOG(INFO) << "Kafka rebalance: revoking " << partitions.size() << " partitions"; + consumer->unassign(); + } else { + LOG(ERROR) << "Kafka rebalance error: " << RdKafka::err2str(err); + consumer->unassign(); + } + } +}; + +} // anonymous namespace + +KafkaReader::KafkaReader( + int32_t operatorId, + facebook::velox::exec::DriverCtx* driverCtx, + const std::shared_ptr& planNode) + : SourceOperator( + driverCtx, + planNode->outputType(), + operatorId, + planNode->id(), + "KafkaReader") { + LOG(INFO) << "KafkaReader created with operator ID: " << operatorId; +} + +KafkaReader::~KafkaReader() { + cleanup(); +} + +void KafkaReader::initializeConsumer() { + if (consumerInitialized_) { + return; + } + + std::string errstr; + + // Create global configuration + conf_.reset(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)); + if (!conf_) { + throw std::runtime_error("Failed to create Kafka global configuration"); + } + + // Set broker list + if (conf_->set("bootstrap.servers", config_.brokers, errstr) != RdKafka::Conf::CONF_OK) { + throw std::runtime_error("Failed to set bootstrap.servers: " + errstr); + } + + // Set group ID if provided + if (!config_.groupId.empty()) { + if (conf_->set("group.id", config_.groupId, errstr) != RdKafka::Conf::CONF_OK) { + throw std::runtime_error("Failed to set group.id: " + errstr); + } + } + + // Set event callback + static KafkaEventCb eventCb; + if (conf_->set("event_cb", &eventCb, errstr) != RdKafka::Conf::CONF_OK) { + LOG(WARNING) << "Failed to set event callback: " << errstr; + } + + // Set rebalance callback + static KafkaRebalanceCb rebalanceCb; + if (conf_->set("rebalance_cb", &rebalanceCb, errstr) != RdKafka::Conf::CONF_OK) { + LOG(WARNING) << "Failed to set rebalance callback: " << errstr; + } + + // Disable auto-commit for manual offset management + if (conf_->set("enable.auto.commit", "false", errstr) != RdKafka::Conf::CONF_OK) { + LOG(WARNING) << "Failed to disable auto-commit: " << errstr; + } + + // Set additional properties + for (const auto& [key, value] : config_.additionalProps) { + if (conf_->set(key, value, errstr) != RdKafka::Conf::CONF_OK) { + LOG(WARNING) << "Failed to set property " << key << ": " << errstr; + } + } + + // Create consumer + consumer_.reset(RdKafka::KafkaConsumer::create(conf_.get(), errstr)); + if (!consumer_) { + throw std::runtime_error("Failed to create Kafka consumer: " + errstr); + } + + consumerInitialized_ = true; + LOG(INFO) << "Kafka consumer initialized successfully"; +} + +void KafkaReader::connectToKafka() { + if (!consumerInitialized_) { + initializeConsumer(); + } + + // Create topic partition for assignment + std::vector partitions; + RdKafka::TopicPartition* partition = + RdKafka::TopicPartition::create(config_.topic, config_.partition, config_.startOffset); + partitions.push_back(partition); + + // Assign partition + RdKafka::ErrorCode err = consumer_->assign(partitions); + if (err != RdKafka::ERR_NO_ERROR) { + delete partition; + throw std::runtime_error("Failed to assign partition: " + RdKafka::err2str(err)); + } + + currentOffset_ = config_.startOffset; + LOG(INFO) << "Connected to Kafka topic: " << config_.topic + << ", partition: " << config_.partition + << ", start offset: " << config_.startOffset; + + delete partition; +} + +std::vector KafkaReader::pollMessages() { + std::vector messages; + + if (!consumer_) { + return messages; + } + + int32_t remainingRecords = config_.maxPollRecords; + auto startTime = std::chrono::steady_clock::now(); + + while (remainingRecords > 0) { + // Check if we've exceeded the poll timeout + auto elapsed = std::chrono::duration_cast( + std::chrono::steady_clock::now() - startTime).count(); + if (elapsed >= config_.pollTimeoutMs) { + break; + } + + // Poll for a single message + RdKafka::Message* msg = consumer_->consume(100); // 100ms timeout per consume + + if (!msg) { + continue; + } + + switch (msg->err()) { + case RdKafka::ERR_NO_ERROR: + // Valid message + messages.push_back(msg); + currentOffset_ = msg->offset(); + messagesRead_++; + remainingRecords--; + + // Check if we've reached the end offset + if (config_.endOffset != RdKafka::Topic::OFFSET_END && + currentOffset_ >= config_.endOffset) { + finished_ = true; + return messages; + } + break; + + case RdKafka::ERR__PARTITION_EOF: + // End of partition + LOG(INFO) << "Reached end of partition at offset: " << currentOffset_; + delete msg; + finished_ = true; + return messages; + + case RdKafka::ERR__TIMED_OUT: + // Timeout, no message available + delete msg; + break; + + default: + // Error + LOG(ERROR) << "Kafka consume error: " << msg->errstr(); + delete msg; + break; + } + } + + return messages; +} + +facebook::velox::RowVectorPtr KafkaReader::convertMessagesToRowVector( + const std::vector& messages) { + if (messages.empty()) { + return nullptr; + } + + auto rowType = outputType_->asRow(); + size_t numRows = messages.size(); + + // Create vectors for each column + std::vector childVectors; + for (size_t i = 0; i < rowType.size(); ++i) { + childVectors.push_back(createColumnVector(rowType.childAt(i), messages, i)); + } + + // Create and return RowVector + return std::make_shared( + pool(), + rowType.asRow(), + facebook::velox::BufferPtr(nullptr), + numRows, + childVectors); +} + +facebook::velox::VectorPtr KafkaReader::createColumnVector( + const facebook::velox::TypePtr& type, + const std::vector& messages, + size_t columnIndex) { + size_t numRows = messages.size(); + + // Handle common Kafka message fields based on column index + // Column 0: key (VARBINARY) + // Column 1: value (VARBINARY) + // Column 2: topic (VARCHAR) + // Column 3: partition (INTEGER) + // Column 4: offset (BIGINT) + // Column 5: timestamp (BIGINT) + + if (type->isVarbinary() || type->isVarchar()) { + auto flatVector = std::dynamic_pointer_cast>( + facebook::velox::BaseVector::create(type, numRows, pool())); + + for (size_t i = 0; i < numRows; ++i) { + const auto* msg = messages[i]; + + if (columnIndex == 0) { + // Key + if (msg->key()) { + flatVector->set(i, facebook::velox::StringView( + static_cast(msg->key()->c_str()), msg->key()->size())); + } else { + flatVector->setNull(i, true); + } + } else if (columnIndex == 1) { + // Value + if (msg->payload()) { + flatVector->set(i, facebook::velox::StringView( + static_cast(msg->payload()), msg->len())); + } else { + flatVector->setNull(i, true); + } + } else if (columnIndex == 2) { + // Topic + flatVector->set(i, facebook::velox::StringView(msg->topic_name())); + } + } + + return flatVector; + } else if (type->isInteger()) { + auto flatVector = std::dynamic_pointer_cast>( + facebook::velox::BaseVector::create(type, numRows, pool())); + + for (size_t i = 0; i < numRows; ++i) { + const auto* msg = messages[i]; + + if (columnIndex == 3) { + // Partition + flatVector->set(i, msg->partition()); + } + } + + return flatVector; + } else if (type->isBigint()) { + auto flatVector = std::dynamic_pointer_cast>( + facebook::velox::BaseVector::create(type, numRows, pool())); + + for (size_t i = 0; i < numRows; ++i) { + const auto* msg = messages[i]; + + if (columnIndex == 4) { + // Offset + flatVector->set(i, msg->offset()); + } else if (columnIndex == 5) { + // Timestamp + flatVector->set(i, msg->timestamp().timestamp); + } + } + + return flatVector; + } + + // Default: create empty vector + return facebook::velox::BaseVector::create(type, numRows, pool()); +} + +void KafkaReader::commitOffset(int64_t offset) { + if (!consumer_) { + return; + } + + std::vector partitions; + RdKafka::TopicPartition* partition = + RdKafka::TopicPartition::create(config_.topic, config_.partition, offset + 1); + partitions.push_back(partition); + + RdKafka::ErrorCode err = consumer_->commitSync(partitions); + if (err != RdKafka::ERR_NO_ERROR) { + LOG(ERROR) << "Failed to commit offset: " << RdKafka::err2str(err); + } else { + LOG(INFO) << "Committed offset: " << offset; + } + + delete partition; +} + +void KafkaReader::cleanup() { + if (consumer_) { + // Commit final offset + if (currentOffset_ > 0) { + commitOffset(currentOffset_); + } + + // Close consumer + consumer_->close(); + consumer_.reset(); + LOG(INFO) << "Kafka consumer closed. Total messages read: " << messagesRead_; + } + + conf_.reset(); + tconf_.reset(); +} + +facebook::velox::RowVectorPtr KafkaReader::getOutput() { + if (finished_ || !hasSplit_) { + return nullptr; + } + + // Initialize consumer on first call + if (!consumerInitialized_) { + try { + connectToKafka(); + } catch (const std::exception& e) { + LOG(ERROR) << "Failed to connect to Kafka: " << e.what(); + finished_ = true; + return nullptr; + } + } + + // Poll messages from Kafka + auto messages = pollMessages(); + + if (messages.empty()) { + // No messages available, but not finished yet + if (!finished_) { + return nullptr; + } + // Finished reading + return nullptr; + } + + // Convert messages to RowVector + auto result = convertMessagesToRowVector(messages); + + // Clean up messages + for (auto* msg : messages) { + delete msg; + } + + // Commit offset periodically (every batch) + if (currentOffset_ > 0) { + commitOffset(currentOffset_); + } + + return result; +} + +facebook::velox::BlockingReason KafkaReader::isBlocked( + facebook::velox::ContinueFuture* future) { + // Kafka consumer is non-blocking with timeout + return facebook::velox::BlockingReason::kNotBlocked; +} + +bool KafkaReader::isFinished() { + return (noMoreSplits_ && !hasSplit_) || finished_; +} + +void KafkaReader::addSplit( + std::shared_ptr split) { + // Extract Kafka configuration from split + // This is a simplified implementation - in practice, you'd parse the split + // to extract broker, topic, partition, and offset information + + hasSplit_ = true; + + // TODO: Parse split to populate config_ + // For now, using default configuration + config_.brokers = "localhost:9092"; + config_.topic = "test-topic"; + config_.partition = 0; + config_.groupId = "gluten-kafka-reader"; + + LOG(INFO) << "Added Kafka split: topic=" << config_.topic + << ", partition=" << config_.partition; +} + +void KafkaReader::noMoreSplits() { + noMoreSplits_ = true; + LOG(INFO) << "No more splits for KafkaReader"; +} } // namespace gluten + +// Made with Bob diff --git a/cpp/velox/operators/reader/KafkaReader.h b/cpp/velox/operators/reader/KafkaReader.h index 32d8b41bebcb..cd9e6a164bfe 100644 --- a/cpp/velox/operators/reader/KafkaReader.h +++ b/cpp/velox/operators/reader/KafkaReader.h @@ -17,49 +17,101 @@ #pragma once +#include +#include +#include +#include #include "velox/connectors/Connector.h" #include "velox/exec/Operator.h" +#include "velox/vector/ComplexVector.h" namespace gluten { +/// Configuration for Kafka consumer +struct KafkaReaderConfig { + std::string brokers; + std::string topic; + int32_t partition; + int64_t startOffset; + int64_t endOffset; + std::string groupId; + int32_t maxPollRecords; + int32_t pollTimeoutMs; + std::unordered_map additionalProps; + + KafkaReaderConfig() + : partition(0), + startOffset(RdKafka::Topic::OFFSET_BEGINNING), + endOffset(RdKafka::Topic::OFFSET_END), + maxPollRecords(1000), + pollTimeoutMs(1000) {} +}; + /// Kafka reader operator for streaming Kafka data -/// This is a placeholder implementation that will be extended -/// to support actual Kafka consumption in the Velox backend +/// Integrates with librdkafka to consume messages from Kafka topics class KafkaReader : public facebook::velox::exec::SourceOperator { public: KafkaReader( int32_t operatorId, facebook::velox::exec::DriverCtx* driverCtx, - const std::shared_ptr& planNode) - : SourceOperator( - driverCtx, - planNode->outputType(), - operatorId, - planNode->id(), - "KafkaReader") {} - - facebook::velox::RowVectorPtr getOutput() override { - // TODO: Implement actual Kafka reading logic - // This should: - // 1. Connect to Kafka broker - // 2. Read messages from the specified topic/partition - // 3. Convert Kafka messages to RowVector format - // 4. Handle offset management - return nullptr; - } + const std::shared_ptr& planNode); + + ~KafkaReader() override; + + facebook::velox::RowVectorPtr getOutput() override; facebook::velox::BlockingReason isBlocked( - facebook::velox::ContinueFuture* future) override { - return facebook::velox::BlockingReason::kNotBlocked; - } + facebook::velox::ContinueFuture* future) override; + + bool isFinished() override; - bool isFinished() override { - return noMoreSplits_ && !hasSplit_; - } + void addSplit(std::shared_ptr split) override; + + void noMoreSplits() override; private: + /// Initialize Kafka consumer with configuration + void initializeConsumer(); + + /// Connect to Kafka broker and subscribe to topic/partition + void connectToKafka(); + + /// Poll messages from Kafka + std::vector pollMessages(); + + /// Convert Kafka messages to RowVector format + facebook::velox::RowVectorPtr convertMessagesToRowVector( + const std::vector& messages); + + /// Handle offset management + void commitOffset(int64_t offset); + + /// Cleanup Kafka resources + void cleanup(); + + /// Create vector for a specific column based on type + facebook::velox::VectorPtr createColumnVector( + const facebook::velox::TypePtr& type, + const std::vector& messages, + size_t columnIndex); + + // Kafka consumer and configuration + std::unique_ptr consumer_; + std::unique_ptr conf_; + std::unique_ptr tconf_; + KafkaReaderConfig config_; + + // State management bool noMoreSplits_ = false; bool hasSplit_ = false; + bool consumerInitialized_ = false; + bool finished_ = false; + int64_t currentOffset_ = 0; + int64_t messagesRead_ = 0; + + // Buffer for batching + std::vector messageBuffer_; + size_t maxBatchSize_ = 1000; }; } // namespace gluten