From ab8ce4ddf739cf12d9bfe8b99cb5f5f39ccdbb67 Mon Sep 17 00:00:00 2001 From: Chandan Date: Tue, 30 Jun 2020 16:14:12 +0530 Subject: [PATCH 1/2] Add demo3 --- Completed/demo3/build.gradle | 18 ++++++++++++ .../example/kafka/KafkaDemo3Application.java | 11 ++++++++ .../kafka/controllers/EventController.java | 28 +++++++++++++++++++ .../java/com/example/kafka/models/Event.java | 17 +++++++++++ .../kafka/services/EventPublisher.java | 25 +++++++++++++++++ .../demo3/src/main/resources/application.yml | 8 ++++++ settings.gradle | 4 ++- 7 files changed, 110 insertions(+), 1 deletion(-) create mode 100644 Completed/demo3/build.gradle create mode 100644 Completed/demo3/src/main/java/com/example/kafka/KafkaDemo3Application.java create mode 100644 Completed/demo3/src/main/java/com/example/kafka/controllers/EventController.java create mode 100644 Completed/demo3/src/main/java/com/example/kafka/models/Event.java create mode 100644 Completed/demo3/src/main/java/com/example/kafka/services/EventPublisher.java create mode 100644 Completed/demo3/src/main/resources/application.yml diff --git a/Completed/demo3/build.gradle b/Completed/demo3/build.gradle new file mode 100644 index 0000000..ad4cb6d --- /dev/null +++ b/Completed/demo3/build.gradle @@ -0,0 +1,18 @@ +plugins { + id 'java' +} + +group 'learning.kafka' +version '1.0-SNAPSHOT' + +sourceCompatibility = 1.8 + +repositories { + mavenCentral() +} + +dependencies { + compile group: 'org.springframework.boot', name: 'spring-boot-starter-web', version: '2.0.5.RELEASE' + compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.1.5.RELEASE' + testCompile group: 'junit', name: 'junit', version: '4.12' +} diff --git a/Completed/demo3/src/main/java/com/example/kafka/KafkaDemo3Application.java b/Completed/demo3/src/main/java/com/example/kafka/KafkaDemo3Application.java new file mode 100644 index 0000000..a2d699b --- /dev/null +++ b/Completed/demo3/src/main/java/com/example/kafka/KafkaDemo3Application.java @@ -0,0 +1,11 @@ +package com.example.kafka; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class KafkaDemo3Application { + public static void main(String[] args) { + SpringApplication.run(KafkaDemo3Application.class, args); + } +} diff --git a/Completed/demo3/src/main/java/com/example/kafka/controllers/EventController.java b/Completed/demo3/src/main/java/com/example/kafka/controllers/EventController.java new file mode 100644 index 0000000..d6f58bf --- /dev/null +++ b/Completed/demo3/src/main/java/com/example/kafka/controllers/EventController.java @@ -0,0 +1,28 @@ +package com.example.kafka.controllers; + +import com.example.kafka.models.Event; +import com.example.kafka.services.EventPublisher; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.util.Date; +import java.util.UUID; +import java.util.concurrent.ExecutionException; + +@RestController +@RequestMapping("event") +public class EventController { + + @Autowired + private EventPublisher eventPublisher; + + @RequestMapping(method = RequestMethod.GET) + public RecordMetadata publishEvent(@RequestParam int user, @RequestParam String createdBy) throws ExecutionException, InterruptedException { + Event event = new Event(UUID.randomUUID().toString(), user, createdBy, new Date()); + return eventPublisher.publish(event); + } +} diff --git a/Completed/demo3/src/main/java/com/example/kafka/models/Event.java b/Completed/demo3/src/main/java/com/example/kafka/models/Event.java new file mode 100644 index 0000000..04137eb --- /dev/null +++ b/Completed/demo3/src/main/java/com/example/kafka/models/Event.java @@ -0,0 +1,17 @@ +package com.example.kafka.models; + +import java.util.Date; + +public class Event { + public String EventId; + public int UserId; + public String CreatedBy; + public Date CreatedAt; + + public Event(String eventId, int userId, String createdBy, Date createdAt) { + this.EventId = eventId; + this.UserId = userId; + this.CreatedBy = createdBy; + this.CreatedAt = createdAt; + } +} diff --git a/Completed/demo3/src/main/java/com/example/kafka/services/EventPublisher.java b/Completed/demo3/src/main/java/com/example/kafka/services/EventPublisher.java new file mode 100644 index 0000000..943dae2 --- /dev/null +++ b/Completed/demo3/src/main/java/com/example/kafka/services/EventPublisher.java @@ -0,0 +1,25 @@ +package com.example.kafka.services; + +import com.example.kafka.models.Event; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Service; + +import java.util.concurrent.ExecutionException; + +@Service +public class EventPublisher { + @Value("${eventTopic}") + private String topicName; + + @Autowired + private KafkaTemplate kafkaTemplate; + + public RecordMetadata publish(Event event) throws ExecutionException, InterruptedException { + SendResult result = kafkaTemplate.send(topicName, event.UserId, event).get(); + return result.getRecordMetadata(); + } +} diff --git a/Completed/demo3/src/main/resources/application.yml b/Completed/demo3/src/main/resources/application.yml new file mode 100644 index 0000000..909fad1 --- /dev/null +++ b/Completed/demo3/src/main/resources/application.yml @@ -0,0 +1,8 @@ +eventTopic: event-topic +spring: + kafka: + bootstrap-servers: localhost:9092 + producer: + key-serializer: org.apache.kafka.common.serialization.IntegerSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + diff --git a/settings.gradle b/settings.gradle index 0ed0e20..f9738f8 100644 --- a/settings.gradle +++ b/settings.gradle @@ -14,4 +14,6 @@ findProject(':Completed:reactor.kafka.json')?.name = 'reactor.kafka.json' findProject(':Completed:reactor.kafka')?.name = 'reactor.kafka' findProject(':Todo:todo.apache.kafka')?.name = 'todo.apache.kafka' findProject(':Todo:todo.spring.kafka')?.name = 'todo.spring.kafka' -findProject(':Todo:todo.spring.kafka.json')?.name = 'todo.spring.kafka.json' \ No newline at end of file +findProject(':Todo:todo.spring.kafka.json')?.name = 'todo.spring.kafka.json' +include 'Completed:demo3' +findProject(':Completed:demo3')?.name = 'demo3' From 520b9935b62a3f3023340a36c11246c9d04a24fc Mon Sep 17 00:00:00 2001 From: Chandan Date: Wed, 1 Jul 2020 12:07:34 +0530 Subject: [PATCH 2/2] Add Kafka consumer --- .../kafka/controllers/EventController.java | 5 +++-- .../java/com/example/kafka/models/Event.java | 8 ++++++++ .../example/kafka/services/EventConsumer.java | 15 +++++++++++++++ .../kafka/services/EventPublisher.java | 19 ++++++++++++++++--- .../demo3/src/main/resources/application.yml | 11 ++++++++++- 5 files changed, 52 insertions(+), 6 deletions(-) create mode 100644 Completed/demo3/src/main/java/com/example/kafka/services/EventConsumer.java diff --git a/Completed/demo3/src/main/java/com/example/kafka/controllers/EventController.java b/Completed/demo3/src/main/java/com/example/kafka/controllers/EventController.java index d6f58bf..afca7d5 100644 --- a/Completed/demo3/src/main/java/com/example/kafka/controllers/EventController.java +++ b/Completed/demo3/src/main/java/com/example/kafka/controllers/EventController.java @@ -21,8 +21,9 @@ public class EventController { private EventPublisher eventPublisher; @RequestMapping(method = RequestMethod.GET) - public RecordMetadata publishEvent(@RequestParam int user, @RequestParam String createdBy) throws ExecutionException, InterruptedException { + public boolean publishEvent(@RequestParam int user, @RequestParam String createdBy) throws ExecutionException, InterruptedException { Event event = new Event(UUID.randomUUID().toString(), user, createdBy, new Date()); - return eventPublisher.publish(event); + eventPublisher.publish(event); + return true; } } diff --git a/Completed/demo3/src/main/java/com/example/kafka/models/Event.java b/Completed/demo3/src/main/java/com/example/kafka/models/Event.java index 04137eb..ef10ffa 100644 --- a/Completed/demo3/src/main/java/com/example/kafka/models/Event.java +++ b/Completed/demo3/src/main/java/com/example/kafka/models/Event.java @@ -8,10 +8,18 @@ public class Event { public String CreatedBy; public Date CreatedAt; + public Event() { + } + public Event(String eventId, int userId, String createdBy, Date createdAt) { this.EventId = eventId; this.UserId = userId; this.CreatedBy = createdBy; this.CreatedAt = createdAt; } + + @Override + public String toString() { + return "Event(" + this.UserId + ", " + this.CreatedBy + ")"; + } } diff --git a/Completed/demo3/src/main/java/com/example/kafka/services/EventConsumer.java b/Completed/demo3/src/main/java/com/example/kafka/services/EventConsumer.java new file mode 100644 index 0000000..ed16b69 --- /dev/null +++ b/Completed/demo3/src/main/java/com/example/kafka/services/EventConsumer.java @@ -0,0 +1,15 @@ +package com.example.kafka.services; + +import com.example.kafka.models.Event; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Service; + +@Service +public class EventConsumer { + + @KafkaListener(topics = "event-topic", groupId = "group3") + public void consume(@Payload Event event) { + System.out.println(event); + } +} diff --git a/Completed/demo3/src/main/java/com/example/kafka/services/EventPublisher.java b/Completed/demo3/src/main/java/com/example/kafka/services/EventPublisher.java index 943dae2..9cc6628 100644 --- a/Completed/demo3/src/main/java/com/example/kafka/services/EventPublisher.java +++ b/Completed/demo3/src/main/java/com/example/kafka/services/EventPublisher.java @@ -7,6 +7,9 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; +import org.springframework.util.concurrent.FailureCallback; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.SuccessCallback; import java.util.concurrent.ExecutionException; @@ -18,8 +21,18 @@ public class EventPublisher { @Autowired private KafkaTemplate kafkaTemplate; - public RecordMetadata publish(Event event) throws ExecutionException, InterruptedException { - SendResult result = kafkaTemplate.send(topicName, event.UserId, event).get(); - return result.getRecordMetadata(); + public void publish(Event event) throws ExecutionException, InterruptedException { + ListenableFuture> resultFuture = kafkaTemplate.send(topicName, event.UserId, event); + resultFuture.addCallback(new SuccessCallback>() { + @Override + public void onSuccess(SendResult result) { + System.out.println("********** success ***********"); + } + }, new FailureCallback() { + @Override + public void onFailure(Throwable ex) { + System.out.println("failure"); + } + }); } } diff --git a/Completed/demo3/src/main/resources/application.yml b/Completed/demo3/src/main/resources/application.yml index 909fad1..b1071db 100644 --- a/Completed/demo3/src/main/resources/application.yml +++ b/Completed/demo3/src/main/resources/application.yml @@ -5,4 +5,13 @@ spring: producer: key-serializer: org.apache.kafka.common.serialization.IntegerSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer - + acks: 1 + consumer: + key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer + value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + properties: + spring: + json: + trusted: + packages: '*' + auto-offset-reset: earliest \ No newline at end of file