Skip to content

dementev-dev/nifi-kafka-postgres-lab

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

6 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Последнее обновление: 2026-01-08 | Версии: NiFi 1.27.0 / Kafka 3.8.0 / Postgres 16.4

Учебный стенд для знакомства с Apache NiFi и Kafka: NiFi + NiFi Registry + Postgres + Kafka + Kafka UI.

Содержание

Показать/скрыть

Требования

  • Docker Engine / Docker Desktop с поддержкой docker compose (Compose v2).
  • Свободные порты на машине: 18443, 18080, 5437, 9092, 8082.
  • Ресурсы (примерно): 2 CPU, 6–8 GB RAM.

Быстрый старт

Если вы запускаете стенд впервые — пройдите сценарий ниже. Если нужно просто запустить/остановить/сбросить — используйте шпаргалку в “Команды управления”.

Все команды docker compose ... запускайте в терминале ОС на хосте из корня репозитория (рядом с docker-compose.yml).

Сценарий первого запуска (5-10 минут)

  1. Поднимите стенд:
    docker compose up -d
    docker compose ps
  2. Дождитесь запуска NiFi (может занять 1–3 минуты), затем откройте:
  3. Инициализируйте демо-схемы/таблицы в Postgres (требуется один раз после первого старта или после docker compose down -v):
    docker compose exec -T postgres psql -U postgres -d app -f /nifi-templates/SampleKafka2Postgres.sql
  4. Импортируйте flow в NiFi из файлов:
    • nifi-templates/Sample2Kafka.json
    • nifi-templates/SampleKafka2Postgres.json Подробный гайд: MENTEE_GUIDE_JSON_FLOWS.md.
  5. Внутри каждого process group:
    • включите Controller Services;
    • запустите процессоры.
  6. Проверьте результат:
    • в Kafka UI в topic Sample2Kafka появляются сообщения;
    • в Postgres растёт число строк:
      docker compose exec -it postgres bash -c "export PGPASSWORD=postgres; psql -U postgres -d app -c 'SELECT COUNT(*) FROM ods.samplekafka2postgres;'"

Команды управления

docker compose up -d
docker compose ps

Остановить стенд:

docker compose stop

Запустить обратно (после stop состояние сохраняется):

docker compose start

Удалить контейнеры и сеть:

docker compose down

Затем поднять обратно (с сохранённым состоянием в томах):

docker compose up -d

Удалить ещё и тома (volumes) — полный сброс (деструктивно):

docker compose down -v

После docker compose down -v потребуется заново выполнить SQL-инициализацию и импортировать flow (см. сценарий выше).

Что сохраняется между перезапусками

  • docker compose stop/start — сохраняется всё (контейнеры не удаляются).
  • docker compose down — контейнеры удаляются, но тома (volumes) остаются: сохраняются NiFi (conf/state), NiFi Registry, Postgres.
  • docker compose down -v — полный сброс: удаляются и контейнеры, и тома (volumes).

Примечание: в этой конфигурации Kafka-сообщения/топики не сохраняются между docker compose downup (чтобы не копить дисковое пространство). Между stopstart Kafka сохраняется. Пример подключения тома (volume) для Kafka есть в docker-compose.yml.

Адреса и доступы

Важно про адреса (внутри Docker и с хоста)

⚠️ Критически важно: Если вы настраиваете подключение в NiFi, то localhost почти всегда будет неправильным (NiFi живёт в контейнере).

Почему так: внутри контейнера localhost — это сам контейнер. Контейнеры из одного docker compose видят друг друга по именам сервисов (Docker DNS), поэтому postgres, kafka, registry работают как “хосты”.

Используйте имена сервисов из docker-compose.yml:

  • Postgres (из NiFi): jdbc:postgresql://postgres:5432/app
  • Kafka (из NiFi): kafka:29092
  • Registry (из NiFi): http://registry:18080

А с локальной машины:

  • Postgres: jdbc:postgresql://localhost:5437/app
  • Kafka: localhost:9092

PostgreSQL

JDBC-драйвер для Postgres лежит в drivers/ и монтируется в контейнер NiFi как /opt/nifi/nifi-current/drivers/.

Параметры для DBCP в NiFi:

  • Database Connection URL: jdbc:postgresql://postgres:5432/app
  • Database Driver Class Name: org.postgresql.Driver
  • Database Driver Location(s): /opt/nifi/nifi-current/drivers/postgresql-42.7.4.jar
  • Database User: postgres
  • Password: postgres

Подключение через DBeaver (удобнее всего)

Postgres проброшен наружу на порт 5437, поэтому в DBeaver создайте подключение со следующими параметрами:

  • Host: localhost
  • Port: 5437
  • Database: app
  • Username: postgres
  • Password: postgres

Если DBeaver предложит скачать драйвер — соглашайтесь скачать/установить драйвер PostgreSQL.

Инициализация демо-схем/таблиц:

docker compose exec -T postgres psql -U postgres -d app -f /nifi-templates/SampleKafka2Postgres.sql

Запускайте это после первого старта или после docker compose down -v (скрипт не идемпотентный: при повторном запуске будут ошибки про существующие схемы/таблицы).

Через консоль (если нужно)

Просмотр данных:

docker compose exec -it postgres bash -c "export PGPASSWORD=postgres; psql -U postgres -d app"
select * from ods.samplekafka2postgres order by id desc limit 10;

Kafka

  • С локальной машины (например, для консольных утилит): localhost:9092
  • Из NiFi (внутри Docker): kafka:29092

Работа с Kafka

💡 Рекомендация: Для большинства задач используйте Kafka UI (http://localhost:8082/) — это удобный и наглядный веб-интерфейс для просмотра топиков, сообщений и consumer groups.

Kafka CLI (командная строка) полезен для:

  • Автоматизации и скриптов
  • Продвинутых операций (например, изменение конфигурации топиков)
  • Быстрой проверки без браузера

Базовые команды CLI (опционально)

⚠️ Важно: В Apache Kafka 3.8.0 скрипты находятся в /opt/kafka/bin/, поэтому нужно указывать полный путь.

Команды ниже выполняются внутри контейнера Kafka (через docker compose exec ...). Если вы запускаете Kafka CLI на хосте — используйте --bootstrap-server localhost:9092.

Чтение сообщений:

docker compose exec kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:29092 \
  --topic Sample2Kafka \
  --from-beginning

Запись сообщений:

Интерактивный режим (ввод сообщений вручную):

docker compose exec kafka /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server kafka:29092 \
  --topic Sample2Kafka

После запуска введите сообщение и нажмите Enter. Нажмите Ctrl+C для выхода.

Отправка одной командой (через echo и pipe):

echo '{"dttm": 1704698992000, "txt": "Тестовое сообщение"}' | \
  docker compose exec -T kafka /opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server kafka:29092 \
  --topic Sample2Kafka

💡 Примечание: Флаг -T отключает TTY, что позволяет использовать pipe.

Просмотр списка топиков:

docker compose exec kafka /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka:29092 \
  --list

Примеры flow (шаблоны)

Короткий гайд для менти по JSON-версии потоков: MENTEE_GUIDE_JSON_FLOWS.md.

В nifi-templates/ лежат примеры:

  • Sample2Kafka.json — публикует JSON-сообщения в Kafka topic Sample2Kafka
  • SampleKafka2Postgres.json — читает JSON из Kafka topic Sample2Kafka и пишет в Postgres (в stg.samplekafka2postgres, затем вызывает ods.load_samplekafka2postgres())
  • Sample2Kafka-avro.json / SampleKafka2Postgres-avro.json — Avro-версии потоков

Памятка: как импортировать Process Group / Flow в NiFi

Файлы нужно загружать через браузер с вашей машины (каталог nifi-templates/).

Название пункта может отличаться в зависимости от UI/версии, но смысл один — “загрузить process group/flow definition из файла”:

  1. В верхнем меню найдите действие вроде Upload / Import / Process Group → выберите загрузку из файла.
  2. Выберите nifi-templates/SampleKafka2Postgres.json (или Sample2Kafka.json) и разместите process group на канвасе.

После импорта, как правило, нужно:

  • перейти внутрь process group;
  • включить Controller Services (Configure → Controller Services → Enable, или “enable all controller services”);
  • затем запустить процессоры.

Рекомендуемый минимальный сценарий:

  1. topic Sample2Kafka вручную создавать обычно не требуется: он создаётся автоматически при первой попытке записи (если в Kafka включено автосоздание топиков; по умолчанию оно включено). Если по какой-то причине topic не появился — создайте его в Kafka UI.
  2. Импортируйте flow в NiFi (в зависимости от UI: import/upload template для .xml или import flow definition для .json). Если вы импортировали раньше, то после docker compose down flow сохранится.
  3. Внутри flow включите Controller Services, затем запустите процессоры.

Kafka UI уже настроен в docker-compose.yml:

  • Cluster name: Kafka Cluster
  • Bootstrap Servers: kafka:29092

Shared folder (общая папка)

Каталог shared-folder/ на хосте смонтирован в контейнер NiFi как /opt/nifi/nifi-current/ls-target (удобно для ListFile/GetFile). Если после запуска контейнеров возникают проблемы с правами: sudo chown -R $USER shared-folder.

Полезные команды

docker compose logs -f nifi
docker compose logs -f kafka
docker compose exec postgres bash

Для доступа из NiFi к сервисам на локальной машине используйте host.docker.internal вместо localhost.

Проверка работоспособности

После запуска стенда проверьте, что все компоненты работают корректно:

Checklist

  • NiFi UI открывается по адресу http://localhost:18443/nifi/ (аутентификация не требуется)
  • Kafka UI показывает кластер по адресу http://localhost:8082/
  • В Kafka UI виден topic Sample2Kafka (если flow уже запущен)
  • Postgres отвечает на подключение через DBeaver или консоль:
    docker compose exec -it postgres bash -c "export PGPASSWORD=postgres; psql -U postgres -d app -c 'SELECT version();'"
  • Контейнеры запущены (все в статусе Up):
    docker compose ps

Проверка flow

Если вы импортировали и запустили примеры flow:

  • В NiFi процессоры запущены (зелёный индикатор)
  • В Kafka UI в topic Sample2Kafka появляются сообщения
  • В Postgres таблица ods.samplekafka2postgres заполняется данными:
    docker compose exec -it postgres bash -c "export PGPASSWORD=postgres; psql -U postgres -d app -c 'SELECT COUNT(*) FROM ods.samplekafka2postgres;'"

Если что-то не работает

Проблемы с запуском контейнеров

Симптом: Контейнеры не запускаются или сразу падают.

  • Решение: Проверьте логи:
    docker compose logs -f nifi
    docker compose logs -f kafka
    docker compose logs -f postgres
  • NiFi может запускаться 1–3 минуты — это нормально.

Симптом: Ошибка "port is already allocated".

  • Решение: Порты заняты. Измените проброс портов в docker-compose.yml или остановите процессы, использующие эти порты.

Проблемы с подключением в NiFi

Симптом: Процессор в NiFi не подключается к Kafka или Postgres.

  • Решение: Проверьте, что используете адреса "из NiFi" (см. раздел Важно про адреса):
    • Postgres: postgres:5432 (не localhost:5437)
    • Kafka: kafka:29092 (не localhost:9092)

Симптом: Controller Services выключены.

  • Решение: Внутри process group откройте Controller Services → выберите сервисы → Enable.

Проблемы с Kafka

Симптом: Сообщения не появляются в Kafka UI.

  • Решение:
    • Проверьте, что процессор PublishKafkaRecord_2_6 запущен и растёт счётчик "out"
    • Убедитесь, что topic Sample2Kafka существует (создаётся автоматически при первой записи)
    • Проверьте логи Kafka: docker compose logs -f kafka

Симптом: Consumer в NiFi не читает сообщения.

  • Решение:
    • Проверьте, что используете правильный bootstrap.servers: kafka:29092
    • Убедитесь, что consumer group не зафиксирован на другом offset (можно создать новый consumer group)

Проблемы с Postgres

Симптом: Ошибка при вставке данных в Postgres.

  • Решение:
    • Проверьте, что таблицы существуют: выполните SQL-скрипт SampleKafka2Postgres.sql
    • Убедитесь, что JDBC-драйвер подключён: /opt/nifi/nifi-current/drivers/postgresql-42.7.4.jar
    • Проверьте параметры подключения: jdbc:postgresql://postgres:5432/app, пользователь postgres, пароль postgres

Симптом: Таблица пустая, хотя flow работает.

  • Решение:
    • Проверьте, что сообщения проходят фильтр QueryRecord (relationship GOOD_DATE)
    • Посмотрите логи процессора PutDatabaseRecord на наличие ошибок
    • Убедитесь, что процедура ods.load_samplekafka2postgres() существует

Проблемы с правами доступа

Симптом: Ошибки доступа к shared-folder/.

  • Решение: Исправьте права:
    sudo chown -R $USER shared-folder

Дополнительная диагностика

Если проблема не решена:

  1. Посмотрите логи всех сервисов: docker compose logs
  2. Перезапустите проблемный контейнер: docker compose restart nifi (или другой сервис)
  3. Попробуйте полный сброс: docker compose down -v && docker compose up -d

    ⚠️ Внимание: Это удалит все данные из Postgres, NiFi и Registry. После этого потребуется заново выполнить /nifi-templates/SampleKafka2Postgres.sql и импортировать flow.

About

Учебный стенд NiFi/Kafka

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors