diff --git a/Completed/demo3/build.gradle b/Completed/demo3/build.gradle new file mode 100644 index 0000000..ad4cb6d --- /dev/null +++ b/Completed/demo3/build.gradle @@ -0,0 +1,18 @@ +plugins { + id 'java' +} + +group 'learning.kafka' +version '1.0-SNAPSHOT' + +sourceCompatibility = 1.8 + +repositories { + mavenCentral() +} + +dependencies { + compile group: 'org.springframework.boot', name: 'spring-boot-starter-web', version: '2.0.5.RELEASE' + compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.1.5.RELEASE' + testCompile group: 'junit', name: 'junit', version: '4.12' +} diff --git a/Completed/demo3/src/main/java/com/example/kafka/KafkaDemo3Application.java b/Completed/demo3/src/main/java/com/example/kafka/KafkaDemo3Application.java new file mode 100644 index 0000000..a2d699b --- /dev/null +++ b/Completed/demo3/src/main/java/com/example/kafka/KafkaDemo3Application.java @@ -0,0 +1,11 @@ +package com.example.kafka; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class KafkaDemo3Application { + public static void main(String[] args) { + SpringApplication.run(KafkaDemo3Application.class, args); + } +} diff --git a/Completed/demo3/src/main/java/com/example/kafka/controllers/EventController.java b/Completed/demo3/src/main/java/com/example/kafka/controllers/EventController.java new file mode 100644 index 0000000..afca7d5 --- /dev/null +++ b/Completed/demo3/src/main/java/com/example/kafka/controllers/EventController.java @@ -0,0 +1,29 @@ +package com.example.kafka.controllers; + +import com.example.kafka.models.Event; +import com.example.kafka.services.EventPublisher; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.util.Date; +import java.util.UUID; +import java.util.concurrent.ExecutionException; + +@RestController +@RequestMapping("event") +public class EventController { + + @Autowired + private EventPublisher eventPublisher; + + @RequestMapping(method = RequestMethod.GET) + public boolean publishEvent(@RequestParam int user, @RequestParam String createdBy) throws ExecutionException, InterruptedException { + Event event = new Event(UUID.randomUUID().toString(), user, createdBy, new Date()); + eventPublisher.publish(event); + return true; + } +} diff --git a/Completed/demo3/src/main/java/com/example/kafka/models/Event.java b/Completed/demo3/src/main/java/com/example/kafka/models/Event.java new file mode 100644 index 0000000..ef10ffa --- /dev/null +++ b/Completed/demo3/src/main/java/com/example/kafka/models/Event.java @@ -0,0 +1,25 @@ +package com.example.kafka.models; + +import java.util.Date; + +public class Event { + public String EventId; + public int UserId; + public String CreatedBy; + public Date CreatedAt; + + public Event() { + } + + public Event(String eventId, int userId, String createdBy, Date createdAt) { + this.EventId = eventId; + this.UserId = userId; + this.CreatedBy = createdBy; + this.CreatedAt = createdAt; + } + + @Override + public String toString() { + return "Event(" + this.UserId + ", " + this.CreatedBy + ")"; + } +} diff --git a/Completed/demo3/src/main/java/com/example/kafka/services/EventConsumer.java b/Completed/demo3/src/main/java/com/example/kafka/services/EventConsumer.java new file mode 100644 index 0000000..ed16b69 --- /dev/null +++ b/Completed/demo3/src/main/java/com/example/kafka/services/EventConsumer.java @@ -0,0 +1,15 @@ +package com.example.kafka.services; + +import com.example.kafka.models.Event; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Service; + +@Service +public class EventConsumer { + + @KafkaListener(topics = "event-topic", groupId = "group3") + public void consume(@Payload Event event) { + System.out.println(event); + } +} diff --git a/Completed/demo3/src/main/java/com/example/kafka/services/EventPublisher.java b/Completed/demo3/src/main/java/com/example/kafka/services/EventPublisher.java new file mode 100644 index 0000000..9cc6628 --- /dev/null +++ b/Completed/demo3/src/main/java/com/example/kafka/services/EventPublisher.java @@ -0,0 +1,38 @@ +package com.example.kafka.services; + +import com.example.kafka.models.Event; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Service; +import org.springframework.util.concurrent.FailureCallback; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.SuccessCallback; + +import java.util.concurrent.ExecutionException; + +@Service +public class EventPublisher { + @Value("${eventTopic}") + private String topicName; + + @Autowired + private KafkaTemplate kafkaTemplate; + + public void publish(Event event) throws ExecutionException, InterruptedException { + ListenableFuture> resultFuture = kafkaTemplate.send(topicName, event.UserId, event); + resultFuture.addCallback(new SuccessCallback>() { + @Override + public void onSuccess(SendResult result) { + System.out.println("********** success ***********"); + } + }, new FailureCallback() { + @Override + public void onFailure(Throwable ex) { + System.out.println("failure"); + } + }); + } +} diff --git a/Completed/demo3/src/main/resources/application.yml b/Completed/demo3/src/main/resources/application.yml new file mode 100644 index 0000000..b1071db --- /dev/null +++ b/Completed/demo3/src/main/resources/application.yml @@ -0,0 +1,17 @@ +eventTopic: event-topic +spring: + kafka: + bootstrap-servers: localhost:9092 + producer: + key-serializer: org.apache.kafka.common.serialization.IntegerSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + acks: 1 + consumer: + key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer + value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + properties: + spring: + json: + trusted: + packages: '*' + auto-offset-reset: earliest \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 0ed0e20..f9738f8 100644 --- a/settings.gradle +++ b/settings.gradle @@ -14,4 +14,6 @@ findProject(':Completed:reactor.kafka.json')?.name = 'reactor.kafka.json' findProject(':Completed:reactor.kafka')?.name = 'reactor.kafka' findProject(':Todo:todo.apache.kafka')?.name = 'todo.apache.kafka' findProject(':Todo:todo.spring.kafka')?.name = 'todo.spring.kafka' -findProject(':Todo:todo.spring.kafka.json')?.name = 'todo.spring.kafka.json' \ No newline at end of file +findProject(':Todo:todo.spring.kafka.json')?.name = 'todo.spring.kafka.json' +include 'Completed:demo3' +findProject(':Completed:demo3')?.name = 'demo3'