From 51f17d4125c25b3bf8d3beb96a0a07fd7039a244 Mon Sep 17 00:00:00 2001 From: frommymind Date: Tue, 1 Dec 2020 09:08:13 +0800 Subject: [PATCH 1/2] Fix more zookeeper connection isure [#216](https://github.com/smartloli/kafka-eagle/issues/216) --- .../eagle/common/util/KafkaZKPoolUtils.java | 78 +++++++++++++------ 1 file changed, 55 insertions(+), 23 deletions(-) diff --git a/kafka-eagle-common/src/main/java/org/smartloli/kafka/eagle/common/util/KafkaZKPoolUtils.java b/kafka-eagle-common/src/main/java/org/smartloli/kafka/eagle/common/util/KafkaZKPoolUtils.java index 80043b28..e29d185b 100644 --- a/kafka-eagle-common/src/main/java/org/smartloli/kafka/eagle/common/util/KafkaZKPoolUtils.java +++ b/kafka-eagle-common/src/main/java/org/smartloli/kafka/eagle/common/util/KafkaZKPoolUtils.java @@ -40,7 +40,9 @@ public final class KafkaZKPoolUtils { private static KafkaZKPoolUtils instance = null; /** Zookeeper client connection pool. */ - private static Map> zkCliPools = new HashMap<>(); + private static Map> zkCliIdlePools = new HashMap<>(); + /** Zookeeper client current using connection poll. */ + private static Map> zkCliUsedPools = new HashMap<>(); /** Set pool max size. */ private static int zkCliPoolSize = SystemConfigUtils.getIntProperty("kafka.zk.limit.size"); @@ -51,6 +53,7 @@ public final class KafkaZKPoolUtils { private static final String METRIC_GROUP_NAME = "topic-management-service"; private static String errorMessageByZookeeper = "Get pool,and available size [{}]"; + private static String warningMessageByZookeeper = "Current used pool size [{}], need wait."; private static String releaseMessageByZookeeper = "Release pool,and available size [{}]"; /** Init ZkClient pool numbers. */ @@ -59,7 +62,8 @@ public final class KafkaZKPoolUtils { clusterAliass.put(clusterAlias, SystemConfigUtils.getProperty(clusterAlias + ".zk.list")); } for (Entry entry : clusterAliass.entrySet()) { - List zkCliPool = new ArrayList<>(zkCliPoolSize); + List zkCliIdlePool = new ArrayList<>(zkCliPoolSize); + List zkCliUsedPool = new ArrayList<>(zkCliPoolSize); KafkaZkClient zkc = null; for (int i = 0; i < zkCliPoolSize; i++) { try { @@ -75,13 +79,14 @@ public final class KafkaZKPoolUtils { ErrorUtils.print(KafkaZKPoolUtils.class).error("ClusterAlias[" + entry.getKey() + "] add acl has error, msg is ", e); } } - zkCliPool.add(zkc); + zkCliIdlePool.add(zkc); } } catch (Exception e) { ErrorUtils.print(KafkaZKPoolUtils.class).error("Error initializing zookeeper, msg is ", e); } } - zkCliPools.put(entry.getKey(), zkCliPool); + zkCliIdlePools.put(entry.getKey(), zkCliIdlePool); + zkCliUsedPools.put(entry.getKey(), zkCliUsedPool); } } @@ -96,33 +101,58 @@ public static synchronized KafkaZKPoolUtils getInstance() { /** Reback pool one of ZkClient object. */ public synchronized KafkaZkClient getZkClient(String clusterAlias) { - List zkCliPool = zkCliPools.get(clusterAlias); + long nowTime = System.currentTimeMillis(); + List zkCliIdlePool = zkCliIdlePools.get(clusterAlias); + // Connecting pool + List zkCliUsedPool = zkCliUsedPools.get(clusterAlias); KafkaZkClient zkc = null; try { - if (!zkCliPool.isEmpty()) { - zkc = zkCliPool.get(0); - zkCliPool.remove(0); + // if idle pool is not none, client can get one connection from it. + if (!zkCliIdlePool.isEmpty()) { + zkc = zkCliIdlePool.get(0); + zkCliIdlePool.remove(0); + zkCliUsedPool.add(zkc); String osName = System.getProperties().getProperty(OperateSystem.OS_NAME.getValue()); if (osName.contains(OperateSystem.LINUX.getValue())) { - ErrorUtils.print(KafkaZKPoolUtils.class).debug(errorMessageByZookeeper, zkCliPool.size()); + ErrorUtils.print(KafkaZKPoolUtils.class).debug(errorMessageByZookeeper, zkCliIdlePool.size()); } else { - ErrorUtils.print(KafkaZKPoolUtils.class).info(errorMessageByZookeeper, zkCliPool.size()); + ErrorUtils.print(KafkaZKPoolUtils.class).info(errorMessageByZookeeper, zkCliIdlePool.size()); } - } else { - for (int i = 0; i < zkCliPoolSize; i++) { + } + // else if idle pool is null, and used pool have the same size as zkCliPoolSize, then we should wait. + else if (zkCliUsedPool.size() == zkCliPoolSize){ + ErrorUtils.print(KafkaZKPoolUtils.class).warn(warningMessageByZookeeper, zkCliIdlePool.size()); + long waitUsed = System.currentTimeMillis() - nowTime; + boolean getResource = false; + while (waitUsed< ZK_CONNECTION_TIMEOUT_MS && !getResource){ + if (!zkCliIdlePool.isEmpty()){ + zkc = zkCliIdlePool.get(0); + zkCliIdlePool.remove(0); + zkCliUsedPool.add(zkc); + getResource = true; + }else { + Thread.sleep(1000); + System.out.println("Sleep 1000 to wait "); + } + waitUsed = 1000 + waitUsed; + } + if(!getResource) { + System.out.println("Cannot get zookeeper connection, please increase zookeeper pool size ."); + } + } + // else if idle pool is null ,and used pool size less than zkCliPoolSize, then we can create new connection. + else{ + for (int i = 0; i < zkCliPoolSize-zkCliUsedPool.size(); i++) { zkc = KafkaZkClient.apply(clusterAliass.get(clusterAlias), JaasUtils.isZkSecurityEnabled(), ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM, METRIC_GROUP_NAME, "SessionExpireListener"); if (zkc != null) { - zkCliPool.add(zkc); + zkCliIdlePool.add(zkc); } } - - zkc = zkCliPool.get(0); - zkCliPool.remove(0); String osName = System.getProperties().getProperty(OperateSystem.OS_NAME.getValue()); if (osName.contains(OperateSystem.LINUX.getValue())) { - ErrorUtils.print(KafkaZKPoolUtils.class).debug(errorMessageByZookeeper, zkCliPool.size()); + ErrorUtils.print(KafkaZKPoolUtils.class).debug(errorMessageByZookeeper, zkCliIdlePool.size()); } else { - ErrorUtils.print(KafkaZKPoolUtils.class).warn(errorMessageByZookeeper, zkCliPool.size()); + ErrorUtils.print(KafkaZKPoolUtils.class).warn(errorMessageByZookeeper, zkCliIdlePool.size()); } } } catch (Exception e) { @@ -134,15 +164,17 @@ public synchronized KafkaZkClient getZkClient(String clusterAlias) { /** Release ZkClient object. */ public synchronized void release(String clusterAlias, KafkaZkClient zkc) { - List zkCliPool = zkCliPools.get(clusterAlias); - if (zkCliPool != null && zkCliPool.size() < zkCliPoolSize) { - zkCliPool.add(zkc); + List zkCliIdlePool = zkCliIdlePools.get(clusterAlias); + List zkCliUsedPool = zkCliUsedPools.get(clusterAlias); + if (zkCliIdlePool != null && zkCliIdlePool.size() < zkCliPoolSize && zkCliUsedPool !=null) { + zkCliUsedPool.remove(zkc); + zkCliIdlePool.add(zkc); } String osName = System.getProperties().getProperty(OperateSystem.OS_NAME.getValue()); if (osName.contains(OperateSystem.LINUX.getValue())) { - ErrorUtils.print(KafkaZKPoolUtils.class).debug(releaseMessageByZookeeper, (zkCliPool == null ? 0 : zkCliPool.size())); + ErrorUtils.print(KafkaZKPoolUtils.class).debug(releaseMessageByZookeeper, (zkCliIdlePool == null ? 0 : zkCliIdlePool.size())); } else { - ErrorUtils.print(KafkaZKPoolUtils.class).info(releaseMessageByZookeeper, (zkCliPool == null ? 0 : zkCliPool.size())); + ErrorUtils.print(KafkaZKPoolUtils.class).info(releaseMessageByZookeeper, (zkCliIdlePool == null ? 0 : zkCliIdlePool.size())); } } From c7e3e8080bfa6783cb322048459300644f0fbe0d Mon Sep 17 00:00:00 2001 From: frommymind Date: Tue, 1 Dec 2020 16:39:35 +0800 Subject: [PATCH 2/2] =?UTF-8?q?Fix=20BScreen=E6=98=BE=E7=A4=BA=E5=BC=82?= =?UTF-8?q?=E5=B8=B8=20[#404]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../smartloli/kafka/eagle/web/quartz/TopicRankSubTask.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/kafka-eagle-web/src/main/java/org/smartloli/kafka/eagle/web/quartz/TopicRankSubTask.java b/kafka-eagle-web/src/main/java/org/smartloli/kafka/eagle/web/quartz/TopicRankSubTask.java index baecc9dc..dd84f7da 100644 --- a/kafka-eagle-web/src/main/java/org/smartloli/kafka/eagle/web/quartz/TopicRankSubTask.java +++ b/kafka-eagle-web/src/main/java/org/smartloli/kafka/eagle/web/quartz/TopicRankSubTask.java @@ -254,9 +254,12 @@ private void topicProducerLogSizeStats() { TopicLogSize lastTopicLogSize = dashboardServiceImpl.readLastTopicLogSize(params); TopicLogSize topicLogSize = new TopicLogSize(); if (lastTopicLogSize == null || lastTopicLogSize.getLogsize() == 0) { - topicLogSize.setDiffval(0); + topicLogSize.setDiffval(logsize); } else { - topicLogSize.setDiffval(logsize - lastTopicLogSize.getLogsize()); + // if we get the logsize and it bigger than lasttime, we can set diffval. + if (logsize - lastTopicLogSize.getLogsize() > 0){ + topicLogSize.setDiffval(logsize - lastTopicLogSize.getLogsize()); + } } // stats producer threads if (topicLogSize.getDiffval() > 0) {