From 19438f43535f79e7cf70fb0d58e4613f08edb1b8 Mon Sep 17 00:00:00 2001 From: shaoyongzhi Date: Mon, 8 Feb 2021 17:25:38 +0800 Subject: [PATCH] issues-470 fix --- .../eagle/common/util/KafkaZKPoolUtils.java | 82 ++++++------------- 1 file changed, 25 insertions(+), 57 deletions(-) diff --git a/kafka-eagle-common/src/main/java/org/smartloli/kafka/eagle/common/util/KafkaZKPoolUtils.java b/kafka-eagle-common/src/main/java/org/smartloli/kafka/eagle/common/util/KafkaZKPoolUtils.java index 80043b28..38a72fd8 100644 --- a/kafka-eagle-common/src/main/java/org/smartloli/kafka/eagle/common/util/KafkaZKPoolUtils.java +++ b/kafka-eagle-common/src/main/java/org/smartloli/kafka/eagle/common/util/KafkaZKPoolUtils.java @@ -17,17 +17,12 @@ */ package org.smartloli.kafka.eagle.common.util; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - +import kafka.zk.KafkaZkClient; import org.apache.kafka.common.security.JaasUtils; import org.apache.kafka.common.utils.Time; -import org.smartloli.kafka.eagle.common.util.KConstants.OperateSystem; -import kafka.zk.KafkaZkClient; +import java.util.*; +import java.util.concurrent.ThreadLocalRandom; /** * KafkaZkClient pool utils. @@ -50,15 +45,12 @@ public final class KafkaZKPoolUtils { public static final int ZK_SESSION_TIMEOUT_MS = 30_000; private static final String METRIC_GROUP_NAME = "topic-management-service"; - private static String errorMessageByZookeeper = "Get pool,and available size [{}]"; - private static String releaseMessageByZookeeper = "Release pool,and available size [{}]"; - /** Init ZkClient pool numbers. */ static { for (String clusterAlias : SystemConfigUtils.getPropertyArray("kafka.eagle.zk.cluster.alias", ",")) { clusterAliass.put(clusterAlias, SystemConfigUtils.getProperty(clusterAlias + ".zk.list")); } - for (Entry entry : clusterAliass.entrySet()) { + for (Map.Entry entry : clusterAliass.entrySet()) { List zkCliPool = new ArrayList<>(zkCliPoolSize); KafkaZkClient zkc = null; for (int i = 0; i < zkCliPoolSize; i++) { @@ -94,60 +86,36 @@ public static synchronized KafkaZKPoolUtils getInstance() { return instance; } + private boolean zkClientIsActive(KafkaZkClient zkc){ + if(zkc==null){ + return false; + } + try{ + zkc.getAllBrokersInCluster(); + return true; + }catch (Exception e){ + ErrorUtils.print(KafkaZKPoolUtils.class).error("zookeeper client not alive!", e); + } + return false; + } + /** Reback pool one of ZkClient object. */ - public synchronized KafkaZkClient getZkClient(String clusterAlias) { + public KafkaZkClient getZkClient(String clusterAlias) { + //fix issues-470,ZooKeeper client is thread safe. + //So, in general, we can keep the persistent connection. List zkCliPool = zkCliPools.get(clusterAlias); - KafkaZkClient zkc = null; - try { - if (!zkCliPool.isEmpty()) { - zkc = zkCliPool.get(0); - zkCliPool.remove(0); - String osName = System.getProperties().getProperty(OperateSystem.OS_NAME.getValue()); - if (osName.contains(OperateSystem.LINUX.getValue())) { - ErrorUtils.print(KafkaZKPoolUtils.class).debug(errorMessageByZookeeper, zkCliPool.size()); - } else { - ErrorUtils.print(KafkaZKPoolUtils.class).info(errorMessageByZookeeper, zkCliPool.size()); - } - } else { - for (int i = 0; i < zkCliPoolSize; i++) { - zkc = KafkaZkClient.apply(clusterAliass.get(clusterAlias), JaasUtils.isZkSecurityEnabled(), ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM, METRIC_GROUP_NAME, "SessionExpireListener"); - if (zkc != null) { - zkCliPool.add(zkc); - } - } - - zkc = zkCliPool.get(0); - zkCliPool.remove(0); - String osName = System.getProperties().getProperty(OperateSystem.OS_NAME.getValue()); - if (osName.contains(OperateSystem.LINUX.getValue())) { - ErrorUtils.print(KafkaZKPoolUtils.class).debug(errorMessageByZookeeper, zkCliPool.size()); - } else { - ErrorUtils.print(KafkaZKPoolUtils.class).warn(errorMessageByZookeeper, zkCliPool.size()); - } - } - } catch (Exception e) { - ErrorUtils.print(KafkaZKPoolUtils.class).error("Error initializing zookeeper, msg is ", e); - ErrorUtils.print(KafkaZKPoolUtils.class).error("Kafka cluster[" + clusterAlias + ".zk.list] address has null."); - } - return zkc; + int n = ThreadLocalRandom.current().nextInt(zkCliPool.size()); + return zkCliPool.get(n); } /** Release ZkClient object. */ - public synchronized void release(String clusterAlias, KafkaZkClient zkc) { - List zkCliPool = zkCliPools.get(clusterAlias); - if (zkCliPool != null && zkCliPool.size() < zkCliPoolSize) { - zkCliPool.add(zkc); - } - String osName = System.getProperties().getProperty(OperateSystem.OS_NAME.getValue()); - if (osName.contains(OperateSystem.LINUX.getValue())) { - ErrorUtils.print(KafkaZKPoolUtils.class).debug(releaseMessageByZookeeper, (zkCliPool == null ? 0 : zkCliPool.size())); - } else { - ErrorUtils.print(KafkaZKPoolUtils.class).info(releaseMessageByZookeeper, (zkCliPool == null ? 0 : zkCliPool.size())); - } + public void release(String clusterAlias, KafkaZkClient zkc) { + //do nothing.just keep code style } /** Construction method. */ private KafkaZKPoolUtils() { } + }