Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions Completed/demo3/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
plugins {
id 'java'
}

group 'learning.kafka'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
mavenCentral()
}

dependencies {
compile group: 'org.springframework.boot', name: 'spring-boot-starter-web', version: '2.0.5.RELEASE'
compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.1.5.RELEASE'
testCompile group: 'junit', name: 'junit', version: '4.12'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.example.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaDemo3Application {
public static void main(String[] args) {
SpringApplication.run(KafkaDemo3Application.class, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.example.kafka.controllers;

import com.example.kafka.models.Event;
import com.example.kafka.services.EventPublisher;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

@RestController
@RequestMapping("event")
public class EventController {

@Autowired
private EventPublisher eventPublisher;

@RequestMapping(method = RequestMethod.GET)
public boolean publishEvent(@RequestParam int user, @RequestParam String createdBy) throws ExecutionException, InterruptedException {
Event event = new Event(UUID.randomUUID().toString(), user, createdBy, new Date());
eventPublisher.publish(event);
return true;
}
}
25 changes: 25 additions & 0 deletions Completed/demo3/src/main/java/com/example/kafka/models/Event.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.example.kafka.models;

import java.util.Date;

public class Event {
public String EventId;
public int UserId;
public String CreatedBy;
public Date CreatedAt;

public Event() {
}

public Event(String eventId, int userId, String createdBy, Date createdAt) {
this.EventId = eventId;
this.UserId = userId;
this.CreatedBy = createdBy;
this.CreatedAt = createdAt;
}

@Override
public String toString() {
return "Event(" + this.UserId + ", " + this.CreatedBy + ")";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.example.kafka.services;

import com.example.kafka.models.Event;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class EventConsumer {

@KafkaListener(topics = "event-topic", groupId = "group3")
public void consume(@Payload Event event) {
System.out.println(event);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.example.kafka.services;

import com.example.kafka.models.Event;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SuccessCallback;

import java.util.concurrent.ExecutionException;

@Service
public class EventPublisher {
@Value("${eventTopic}")
private String topicName;

@Autowired
private KafkaTemplate<Integer, Event> kafkaTemplate;

public void publish(Event event) throws ExecutionException, InterruptedException {
ListenableFuture<SendResult<Integer, Event>> resultFuture = kafkaTemplate.send(topicName, event.UserId, event);
resultFuture.addCallback(new SuccessCallback<SendResult<Integer, Event>>() {
@Override
public void onSuccess(SendResult<Integer, Event> result) {
System.out.println("********** success ***********");
}
}, new FailureCallback() {
@Override
public void onFailure(Throwable ex) {
System.out.println("failure");
}
});
}
}
17 changes: 17 additions & 0 deletions Completed/demo3/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
eventTopic: event-topic
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: 1
consumer:
key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring:
json:
trusted:
packages: '*'
auto-offset-reset: earliest
4 changes: 3 additions & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ findProject(':Completed:reactor.kafka.json')?.name = 'reactor.kafka.json'
findProject(':Completed:reactor.kafka')?.name = 'reactor.kafka'
findProject(':Todo:todo.apache.kafka')?.name = 'todo.apache.kafka'
findProject(':Todo:todo.spring.kafka')?.name = 'todo.spring.kafka'
findProject(':Todo:todo.spring.kafka.json')?.name = 'todo.spring.kafka.json'
findProject(':Todo:todo.spring.kafka.json')?.name = 'todo.spring.kafka.json'
include 'Completed:demo3'
findProject(':Completed:demo3')?.name = 'demo3'