Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,11 @@ public void alarmTopicListAjax(HttpServletResponse response, HttpServletRequest
obj.put("operate", "<a name='remove' href='#" + alertInfo.getId() + "' class='btn btn-danger btn-xs'>Remove</a>&nbsp<a name='modify' href='#" + alertInfo.getId() + "' class='btn btn-warning btn-xs'>Modify</a>&nbsp");
aaDatas.add(obj);
}

int count = alertService.alertCount(map);
JSONObject target = new JSONObject();
target.put("sEcho", sEcho);
target.put("iTotalRecords", alertService.alertCount(map));
target.put("iTotalDisplayRecords", alertService.alertCount(map));
target.put("iTotalRecords", count);
target.put("iTotalDisplayRecords", count);
target.put("aaData", aaDatas);
try {
byte[] output = target.toJSONString().getBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;

import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
Expand All @@ -36,6 +37,7 @@
import org.smartloli.kafka.eagle.common.util.KConstants;
import org.smartloli.kafka.eagle.common.util.SystemConfigUtils;
import org.smartloli.kafka.eagle.web.service.ConsumerService;
import scala.Tuple2;

/**
* Kafka consumer controller to viewer data.
Expand Down Expand Up @@ -105,8 +107,8 @@ public void consumerTableAjax(HttpServletResponse response, HttpServletRequest r
String clusterAlias = session.getAttribute(KConstants.SessionAlias.CLUSTER_ALIAS).toString();

String formatter = SystemConfigUtils.getProperty(clusterAlias + ".kafka.eagle.offset.storage");
int count = consumerService.getConsumerCount(clusterAlias, formatter);
JSONArray consumers = JSON.parseArray(consumerService.getConsumer(clusterAlias, formatter, page));
Tuple2<String,Integer> reTuple = consumerService.getConsumer(clusterAlias, formatter, page);
JSONArray consumers = JSON.parseArray(reTuple._1);
JSONArray aaDatas = new JSONArray();
for (Object object : consumers) {
JSONObject consumer = (JSONObject) object;
Expand All @@ -126,8 +128,10 @@ public void consumerTableAjax(HttpServletResponse response, HttpServletRequest r

JSONObject target = new JSONObject();
target.put("sEcho", sEcho);
target.put("iTotalRecords", count);
target.put("iTotalDisplayRecords", count);
target.put("iTotalRecords", reTuple._2);
target.put("iTotalDisplayRecords", reTuple._2);


target.put("aaData", aaDatas);
try {
byte[] output = target.toJSONString().getBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,19 @@
<where>
`cluster`=#{cluster}
<if test="search!=null and search!=''">
and (`group`=#{search} or `topic`=#{search})
and (`group` like '%'||#{search}||'%' or `topic` like '%'||#{search}||'%')
</if>
</where>
order by `id` desc limit #{start},#{size}
</select>

<select id="alertCount" parameterType="map" resultType="Integer">
select
count(*) from ke_alarm where `cluster`=#{cluster}
count(*) from ke_alarm
where `cluster`=#{cluster}
<!-- NOTE(review): '||' is string concatenation only in SQLite/Oracle. Under MySQL's
     default sql_mode it is logical OR, which would turn this LIKE pattern into a
     boolean expression and silently break the search filter. Confirm the target
     database, or prefer concat('%', #{search}, '%') for MySQL. The same pattern is
     used in the alert-list select above — keep the two in sync. -->
<if test="search!=null and search!=''">
and (`group` like '%'||#{search}||'%' or `topic` like '%'||#{search}||'%')
</if>
</select>

<select id="isExistAlertByCGT" parameterType="map" resultType="Integer">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.smartloli.kafka.eagle.web.service;

import org.smartloli.kafka.eagle.common.protocol.DisplayInfo;
import scala.Tuple2;

/**
* Kafka consumer data interface.
Expand All @@ -43,7 +44,7 @@ public interface ConsumerService {
public String getConsumerDetail(String clusterAlias, String formatter, String group);

/** Judge consumers storage offset in kafka or zookeeper interface. */
public String getConsumer(String clusterAlias, String formatter, DisplayInfo page);
public Tuple2<String,Integer> getConsumer(String clusterAlias, String formatter, DisplayInfo page);

/** Get consumer size from kafka topic interface. */
public int getConsumerCount(String clusterAlias, String formatter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,9 @@
*/
package org.smartloli.kafka.eagle.web.service.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import org.smartloli.kafka.eagle.common.protocol.ConsumerInfo;
import org.smartloli.kafka.eagle.common.protocol.DisplayInfo;
import org.smartloli.kafka.eagle.common.protocol.TopicConsumerInfo;
Expand All @@ -35,6 +28,13 @@
import org.smartloli.kafka.eagle.core.factory.KafkaService;
import org.smartloli.kafka.eagle.web.service.ConsumerService;
import org.springframework.stereotype.Service;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

/**
* Kafka consumer data interface, and set up the return data set.
Expand Down Expand Up @@ -114,7 +114,7 @@ public String getActiveTopic(String clusterAlias, String formatter) {
}

/** Get consumers from zookeeper. */
private String getConsumer(String clusterAlias, DisplayInfo page) {
private Tuple2<String, Integer> getConsumerZk(String clusterAlias, DisplayInfo page) {
Map<String, List<String>> consumers = kafkaService.getConsumers(clusterAlias, page);
List<ConsumerInfo> consumerPages = new ArrayList<ConsumerInfo>();
int id = 0;
Expand All @@ -127,15 +127,15 @@ private String getConsumer(String clusterAlias, DisplayInfo page) {
consumer.setActiveNumber(getActiveNumber(clusterAlias, entry.getKey(), entry.getValue()));
consumerPages.add(consumer);
}
return consumerPages.toString();
return new Tuple2<String,Integer>(consumerPages.toString(),consumers.size());
}

/** Judge consumers storage offset in kafka or zookeeper. */
public String getConsumer(String clusterAlias, String formatter, DisplayInfo page) {
/**
 * Routes the consumer lookup to the storage backend selected by the offset
 * formatter: "kafka" reads consumer groups from Kafka's internal offsets
 * storage, any other value falls back to ZooKeeper.
 *
 * @param clusterAlias alias of the cluster whose consumers are listed
 * @param formatter    configured offset storage type ("kafka" or zookeeper)
 * @param page         paging and search criteria from the DataTables request
 * @return tuple of (JSON-array string of consumer rows, total matching count)
 */
public Tuple2<String, Integer> getConsumer(String clusterAlias, String formatter, DisplayInfo page) {
	boolean offsetsInKafka = "kafka".equals(formatter);
	return offsetsInKafka ? getKafkaConsumer(page, clusterAlias) : getConsumerZk(clusterAlias, page);
}

Expand Down Expand Up @@ -220,23 +220,25 @@ private String getKafkaActiveTopic(String clusterAlias) {
}

/** Get kafka consumer & storage offset in kafka topic. */
private String getKafkaConsumer(DisplayInfo page, String clusterAlias) {
private Tuple2<String,Integer> getKafkaConsumer(DisplayInfo page, String clusterAlias) {
List<ConsumerInfo> kafkaConsumerPages = new ArrayList<ConsumerInfo>();
JSONArray consumerGroups = JSON.parseArray(kafkaService.getKafkaConsumer(clusterAlias));
int offset = 0;
int id = 0;
for (Object object : consumerGroups) {
JSONObject consumerGroup = (JSONObject) object;
String group = consumerGroup.getString("group");
if (page.getSearch().length() > 0 && page.getSearch().equals(group)) {
ConsumerInfo consumer = new ConsumerInfo();
consumer.setGroup(group);
consumer.setId(++id);
consumer.setNode(consumerGroup.getString("node"));
consumer.setActiveNumber(JSON.parseObject(kafkaService.getKafkaActiverSize(clusterAlias, group)).getInteger("activers"));
consumer.setTopics(JSON.parseObject(kafkaService.getKafkaActiverSize(clusterAlias, group)).getInteger("topics"));
kafkaConsumerPages.add(consumer);
break;
if (page.getSearch().length() > 0 && group.contains(page.getSearch())) {
if (offset < (page.getiDisplayLength() + page.getiDisplayStart()) && offset >= page.getiDisplayStart()) {
ConsumerInfo consumer = new ConsumerInfo();
consumer.setGroup(group);
consumer.setId(++id);
consumer.setNode(consumerGroup.getString("node"));
consumer.setActiveNumber(JSON.parseObject(kafkaService.getKafkaActiverSize(clusterAlias, group)).getInteger("activers"));
consumer.setTopics(JSON.parseObject(kafkaService.getKafkaActiverSize(clusterAlias, group)).getInteger("topics"));
kafkaConsumerPages.add(consumer);
}
offset++;
} else if (page.getSearch().length() == 0) {
if (offset < (page.getiDisplayLength() + page.getiDisplayStart()) && offset >= page.getiDisplayStart()) {
ConsumerInfo consumer = new ConsumerInfo();
Expand All @@ -250,7 +252,7 @@ private String getKafkaConsumer(DisplayInfo page, String clusterAlias) {
offset++;
}
}
return kafkaConsumerPages.toString();
return new Tuple2<>(kafkaConsumerPages.toString(),offset);
}

/** Get consumer detail from kafka topic. */
Expand Down