pfc-kafka-consumer

Kafka consumer for PFC-JSONL log compression — consume messages from Kafka topics and compress them directly to .pfc format.

Commits Kafka offsets only after successful PFC compression — no data loss if the process crashes mid-flight.



How it fits in your pipeline

Kafka / Redpanda
      │  topic: app-logs, access-logs, ...
      ▼
pfc-kafka-consumer          ← this service
      │  pfc_jsonl compress  (after each rotation)
      │  commit offsets      (only on success)
      ▼
kafka_20260115_100000.pfc   →  local disk or S3
      │
      ▼
Query with DuckDB / pfc-gateway

Quickstart

1. Install

pip install confluent-kafka toml
# Optional S3 upload:
pip install boto3

2. Download pfc_jsonl binary

# Linux x86_64
curl -L https://github.com/ImpossibleForge/pfc-jsonl/releases/latest/download/pfc_jsonl-linux-x86_64 \
     -o /usr/local/bin/pfc_jsonl && chmod +x /usr/local/bin/pfc_jsonl

# macOS ARM64
curl -L https://github.com/ImpossibleForge/pfc-jsonl/releases/latest/download/pfc_jsonl-macos-arm64 \
     -o /usr/local/bin/pfc_jsonl && chmod +x /usr/local/bin/pfc_jsonl

3. Configure

cp config/config.toml ./config.toml
# Edit brokers, topics, group_id

4. Start

python pfc_kafka_consumer.py --config config.toml
# 2026-01-15T10:00:00 [pfc-kafka] INFO pfc-kafka-consumer v0.1.0 started
# 2026-01-15T10:00:00 [pfc-kafka] INFO Topics: ['app-logs'] | Group: pfc-consumer

Configuration

[kafka]
brokers           = ["localhost:9092"]
topics            = ["app-logs", "access-logs"]
group_id          = "pfc-consumer"
auto_offset_reset = "earliest"   # or "latest"
poll_timeout_sec  = 1.0
batch_size        = 500

# Optional auth
security_protocol = "PLAINTEXT"  # PLAINTEXT | SSL | SASL_PLAINTEXT | SASL_SSL
sasl_mechanism    = ""           # PLAIN | SCRAM-SHA-256 | SCRAM-SHA-512
sasl_username     = ""
sasl_password     = ""
ssl_ca_location   = ""

[buffer]
rotate_mb             = 64
rotate_sec            = 3600
output_dir            = "/tmp/pfc-kafka"
prefix                = "kafka"
commit_after_compress = true     # safe default — commit only after successful compress

[pfc]
binary = "/usr/local/bin/pfc_jsonl"

[s3]
enabled = false
bucket  = "my-log-archive"
prefix  = "kafka-logs/"
region  = "us-east-1"
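
At startup the service reads this file and translates the [kafka] table into librdkafka settings for confluent-kafka. A minimal sketch of that mapping (the key names on the Python side are standard librdkafka settings; the shipped code may structure this differently):

import toml

cfg = toml.load("config.toml")
kafka = cfg["kafka"]

consumer_conf = {
    "bootstrap.servers": ",".join(kafka["brokers"]),
    "group.id": kafka["group_id"],
    "auto.offset.reset": kafka["auto_offset_reset"],
    "security.protocol": kafka.get("security_protocol", "PLAINTEXT"),
}
if kafka.get("sasl_mechanism"):
    consumer_conf["sasl.mechanism"] = kafka["sasl_mechanism"]
    consumer_conf["sasl.username"] = kafka["sasl_username"]
    consumer_conf["sasl.password"] = kafka["sasl_password"]
if kafka.get("ssl_ca_location"):
    consumer_conf["ssl.ca.location"] = kafka["ssl_ca_location"]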

Output format

Each Kafka message becomes one flat JSONL line. JSON messages keep their original fields and gain Kafka metadata; plain strings are wrapped in a message field with the same metadata attached.

JSON message:

{"timestamp": "2026-01-15T10:00:00.123Z", "level": "ERROR", "service": "payment"}

→ becomes:

{
  "timestamp": "2026-01-15T10:00:00.123Z",
  "level": "ERROR",
  "service": "payment",
  "_topic": "app-logs",
  "_partition": 2,
  "_offset": 84712,
  "_kafka_timestamp": "2026-01-15T10:00:00.123Z"
}

Plain string message:

2026-01-15T10:00:00 ERROR payment failed

→ becomes:

{
  "message": "2026-01-15T10:00:00 ERROR payment failed",
  "timestamp": "2026-01-15T10:00:00.123Z",
  "_topic": "app-logs",
  "_partition": 0,
  "_offset": 12345,
  "_kafka_timestamp": "2026-01-15T10:00:00.123Z"
}
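
A sketch of the per-message transform shown in the two examples above (illustrative, not the service's actual source; timestamp formatting details may differ):

import json
from datetime import datetime, timezone

def to_jsonl_record(msg) -> str:
    """Flatten one confluent_kafka Message into a JSONL line."""
    raw = msg.value().decode("utf-8", errors="replace")
    try:
        record = json.loads(raw)
        if not isinstance(record, dict):
            record = {"message": raw}
    except json.JSONDecodeError:
        record = {"message": raw}               # plain string: wrap it
    _, ts_ms = msg.timestamp()                  # (type, epoch millis)
    iso = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).isoformat()
    record.setdefault("timestamp", iso)
    record.update({
        "_topic": msg.topic(),
        "_partition": msg.partition(),
        "_offset": msg.offset(),
        "_kafka_timestamp": iso,
    })
    return json.dumps(record)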

Offset commit safety

commit_after_compress = true (default):

  • Messages are not committed to Kafka until the PFC file is written successfully
  • If the process crashes before compression completes, messages are re-consumed on restart
  • No data loss — at-least-once delivery guarantee

commit_after_compress = false:

  • Offsets committed immediately after polling
  • Higher throughput, but messages may be lost if compression fails

Confluent Cloud / MSK / Redpanda Cloud

[kafka]
brokers           = ["pkc-xxxx.us-east-1.aws.confluent.cloud:9092"]
security_protocol = "SASL_SSL"
sasl_mechanism    = "PLAIN"
sasl_username     = "YOUR_API_KEY"
sasl_password     = "YOUR_API_SECRET"

Querying compressed logs

-- DuckDB
INSTALL pfc FROM community;
LOAD pfc;

SELECT level, service, count(*)
FROM read_pfc_jsonl('kafka_20260115_100000.pfc',
                    ts_from=1768471200::BIGINT,
                    ts_to=1768471500::BIGINT)
WHERE line LIKE '%ERROR%'
GROUP BY level, service
ORDER BY 3 DESC;
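
The same query works from Python via the duckdb package, assuming the pfc community extension installs as above:

import duckdb

con = duckdb.connect()
con.execute("INSTALL pfc FROM community; LOAD pfc;")
rows = con.execute("""
    SELECT level, service, count(*)
    FROM read_pfc_jsonl('kafka_20260115_100000.pfc')
    GROUP BY level, service
    ORDER BY 3 DESC
""").fetchall()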

Running tests

pip install pytest confluent-kafka toml
pytest tests/test_kafka_consumer.py tests/test_resilience.py -v

# Full E2E (requires Docker):
python3 tests/e2e_integration_test.py

Part of the PFC-JSONL Ecosystem

Repo                 What it does
pfc-jsonl            Core compressor (BWT + rANS)
pfc-duckdb           DuckDB community extension
pfc-fluentbit        Native Fluent Bit output plugin
pfc-vector           High-performance HTTP ingest daemon
pfc-otel-collector   OpenTelemetry OTLP/HTTP exporter
pfc-gateway          HTTP query gateway
pfc-migrate          Migrate from gzip/zstd/S3/Azure/GCS
pfc-kafka-consumer   Kafka / Redpanda consumer (this repo)
pfc-grafana          Grafana data source plugin for PFC archives


Disclaimer

PFC-Kafka-Consumer is an independent open-source project and is not affiliated with, endorsed by, or associated with the Apache Software Foundation, Apache Kafka, or Confluent.

License

pfc-kafka-consumer (this repository) is released under the MIT License — see LICENSE.

The PFC-JSONL binary (pfc_jsonl) is proprietary software — free for personal and open-source use. Commercial use requires a license: info@impossibleforge.com
