Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,12 @@
*/
package org.smartloli.kafka.eagle.common.util;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.utils.Time;
import org.smartloli.kafka.eagle.common.util.KConstants.OperateSystem;

import kafka.zk.KafkaZkClient;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;

/**
* KafkaZkClient pool utils.
Expand All @@ -50,15 +45,12 @@ public final class KafkaZKPoolUtils {
public static final int ZK_SESSION_TIMEOUT_MS = 30_000;
private static final String METRIC_GROUP_NAME = "topic-management-service";

private static String errorMessageByZookeeper = "Get pool,and available size [{}]";
private static String releaseMessageByZookeeper = "Release pool,and available size [{}]";

/** Init ZkClient pool numbers. */
static {
for (String clusterAlias : SystemConfigUtils.getPropertyArray("kafka.eagle.zk.cluster.alias", ",")) {
clusterAliass.put(clusterAlias, SystemConfigUtils.getProperty(clusterAlias + ".zk.list"));
}
for (Entry<String, String> entry : clusterAliass.entrySet()) {
for (Map.Entry<String, String> entry : clusterAliass.entrySet()) {
List<KafkaZkClient> zkCliPool = new ArrayList<>(zkCliPoolSize);
KafkaZkClient zkc = null;
for (int i = 0; i < zkCliPoolSize; i++) {
Expand Down Expand Up @@ -94,60 +86,36 @@ public static synchronized KafkaZKPoolUtils getInstance() {
return instance;
}

private boolean zkClientIsActive(KafkaZkClient zkc){
if(zkc==null){
return false;
}
try{
zkc.getAllBrokersInCluster();
return true;
}catch (Exception e){
ErrorUtils.print(KafkaZKPoolUtils.class).error("zookeeper client not alive!", e);
}
return false;
}

/** Reback pool one of ZkClient object. */
public synchronized KafkaZkClient getZkClient(String clusterAlias) {
public KafkaZkClient getZkClient(String clusterAlias) {
//fix issues-470,ZooKeeper client is thread safe.
//So, in general, we can keep the persistent connection.
List<KafkaZkClient> zkCliPool = zkCliPools.get(clusterAlias);
KafkaZkClient zkc = null;
try {
if (!zkCliPool.isEmpty()) {
zkc = zkCliPool.get(0);
zkCliPool.remove(0);
String osName = System.getProperties().getProperty(OperateSystem.OS_NAME.getValue());
if (osName.contains(OperateSystem.LINUX.getValue())) {
ErrorUtils.print(KafkaZKPoolUtils.class).debug(errorMessageByZookeeper, zkCliPool.size());
} else {
ErrorUtils.print(KafkaZKPoolUtils.class).info(errorMessageByZookeeper, zkCliPool.size());
}
} else {
for (int i = 0; i < zkCliPoolSize; i++) {
zkc = KafkaZkClient.apply(clusterAliass.get(clusterAlias), JaasUtils.isZkSecurityEnabled(), ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, Integer.MAX_VALUE, Time.SYSTEM, METRIC_GROUP_NAME, "SessionExpireListener");
if (zkc != null) {
zkCliPool.add(zkc);
}
}

zkc = zkCliPool.get(0);
zkCliPool.remove(0);
String osName = System.getProperties().getProperty(OperateSystem.OS_NAME.getValue());
if (osName.contains(OperateSystem.LINUX.getValue())) {
ErrorUtils.print(KafkaZKPoolUtils.class).debug(errorMessageByZookeeper, zkCliPool.size());
} else {
ErrorUtils.print(KafkaZKPoolUtils.class).warn(errorMessageByZookeeper, zkCliPool.size());
}
}
} catch (Exception e) {
ErrorUtils.print(KafkaZKPoolUtils.class).error("Error initializing zookeeper, msg is ", e);
ErrorUtils.print(KafkaZKPoolUtils.class).error("Kafka cluster[" + clusterAlias + ".zk.list] address has null.");
}
return zkc;
int n = ThreadLocalRandom.current().nextInt(zkCliPool.size());
return zkCliPool.get(n);
}

/** Release ZkClient object. */
public synchronized void release(String clusterAlias, KafkaZkClient zkc) {
List<KafkaZkClient> zkCliPool = zkCliPools.get(clusterAlias);
if (zkCliPool != null && zkCliPool.size() < zkCliPoolSize) {
zkCliPool.add(zkc);
}
String osName = System.getProperties().getProperty(OperateSystem.OS_NAME.getValue());
if (osName.contains(OperateSystem.LINUX.getValue())) {
ErrorUtils.print(KafkaZKPoolUtils.class).debug(releaseMessageByZookeeper, (zkCliPool == null ? 0 : zkCliPool.size()));
} else {
ErrorUtils.print(KafkaZKPoolUtils.class).info(releaseMessageByZookeeper, (zkCliPool == null ? 0 : zkCliPool.size()));
}
public void release(String clusterAlias, KafkaZkClient zkc) {
//do nothing.just keep code style
}

/** Construction method. */
private KafkaZKPoolUtils() {
}


}