From 46940c2d04bc3319220acdc6c7a426264ffb6747 Mon Sep 17 00:00:00 2001 From: zhaizhibo Date: Tue, 10 Feb 2026 19:37:31 +0800 Subject: [PATCH 1/4] fix race condition in ZK session reconnecting leading to SESSIONEXPIRED Errors. --- .../metadata/impl/PulsarZooKeeperClient.java | 121 ++++++++++-------- .../metadata/impl/ZKSessionWatcher.java | 5 +- 2 files changed, 72 insertions(+), 54 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java index 6a995f20e745a..b43b1f12ba7a4 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java @@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.OpStatsLogger; @@ -110,6 +111,8 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo private final OpStatsLogger syncStats; private final OpStatsLogger createClientStats; + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final Runnable clientCreator = new Runnable() { @Override @@ -120,21 +123,26 @@ public void run() { @Override public ZooKeeper call() throws KeeperException, InterruptedException { log.info("Reconnecting zookeeper {}.", connectString); - // close the previous one - closeZkHandle(); - ZooKeeper newZk; + lock.writeLock().lock(); try { - newZk = createZooKeeper(); - } catch (IOException | QuorumPeerConfig.ConfigException e) { - log.error("Failed to create zookeeper instance to {} with config path {}", - connectString, configPath, e); - throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS); + // close the previous one + closeZkHandle(); + ZooKeeper newZk; + try { + newZk = createZooKeeper(); + } catch (IOException | QuorumPeerConfig.ConfigException e) { + log.error("Failed to create zookeeper instance to {} with config path {}", + connectString, configPath, e); + throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS); + } + waitForConnection(); + zk.set(newZk); + log.info("ZooKeeper session {} is created to {}.", + Long.toHexString(newZk.getSessionId()), connectString); + return newZk; + } finally { + lock.writeLock().unlock(); } - waitForConnection(); - zk.set(newZk); - log.info("ZooKeeper session {} is created to {}.", - Long.toHexString(newZk.getSessionId()), connectString); - return newZk; } @Override @@ -151,6 +159,15 @@ public String toString() { }; + private ZooKeeper getZkHandle() { + lock.readLock().lock(); + try { + return zk.get(); + } finally { + lock.readLock().unlock(); + } + } + @VisibleForTesting static PulsarZooKeeperClient createConnectedZooKeeperClient( String connectString, int sessionTimeoutMs, Set childWatchers, @@ -342,7 +359,7 @@ public void close() throws InterruptedException { } private void closeZkHandle() throws InterruptedException { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { super.close(); } else { @@ -421,7 +438,7 @@ public void run() { @Override public long getSessionId() { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { return super.getSessionId(); } @@ -430,7 +447,7 @@ public long getSessionId() { @Override public byte[] getSessionPasswd() { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { return super.getSessionPasswd(); } @@ -439,7 +456,7 @@ public byte[] getSessionPasswd() { @Override public int getSessionTimeout() { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { return super.getSessionTimeout(); } @@ -448,7 +465,7 @@ public int getSessionTimeout() { @Override public void addAuthInfo(String scheme, byte[] auth) { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { super.addAuthInfo(scheme, auth); return; @@ -486,7 +503,7 @@ public String toString() { @Override public List call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { return PulsarZooKeeperClient.super.multi(ops); } @@ -518,7 +535,7 @@ public void processResult(int rc, String path, Object ctx, List result @Override void zkRun() { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { PulsarZooKeeperClient.super.multi(ops, multiCb, worker); } else { @@ -541,7 +558,7 @@ public Transaction transaction() { // since there is no reference about which client that the transaction could use // so just use ZooKeeper instance directly. // you'd better to use {@link #multi}. - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { return super.transaction(); } @@ -559,7 +576,7 @@ public String toString() { @Override public List call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { return PulsarZooKeeperClient.super.getACL(path, stat); } @@ -594,7 +611,7 @@ public String toString() { @Override void zkRun() { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { PulsarZooKeeperClient.super.getACL(path, stat, aclCb, worker); } else { @@ -618,7 +635,7 @@ public String toString() { @Override public Stat call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { return PulsarZooKeeperClient.super.setACL(path, acl, version); } @@ -654,7 +671,7 @@ public String toString() { @Override void zkRun() { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { PulsarZooKeeperClient.super.setACL(path, acl, version, stCb, worker); } else { @@ -691,7 +708,7 @@ public String toString() { @Override void zkRun() { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { PulsarZooKeeperClient.super.sync(path, vCb, worker); } else { @@ -705,7 +722,7 @@ void zkRun() { @Override public States getState() { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { return PulsarZooKeeperClient.super.getState(); } else { @@ -715,7 +732,7 @@ public States getState() { @Override public String toString() { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { return PulsarZooKeeperClient.super.toString(); } else { @@ -731,7 +748,7 @@ public String create(final String path, final byte[] data, @Override public String call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { return PulsarZooKeeperClient.super.create(path, data, acl, createMode); } @@ -767,7 +784,7 @@ public void processResult(int rc, String path, Object ctx, String name) { @Override void zkRun() { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { PulsarZooKeeperClient.super.create(path, data, acl, createMode, createCb, worker); } else { @@ -790,7 +807,7 @@ public void delete(final String path, final int version) throws KeeperException, @Override public Void call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { PulsarZooKeeperClient.super.delete(path, version); } else { @@ -827,7 +844,7 @@ public void processResult(int rc, String path, Object ctx) { @Override void zkRun() { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { PulsarZooKeeperClient.super.delete(path, version, deleteCb, worker); } else { @@ -850,7 +867,7 @@ public Stat exists(final String path, final Watcher watcher) throws KeeperExcept @Override public Stat call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { return PulsarZooKeeperClient.super.exists(path, watcher); } @@ -871,7 +888,7 @@ public Stat exists(final String path, final boolean watch) throws KeeperExceptio @Override public Stat call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { return PulsarZooKeeperClient.super.exists(path, watch); } @@ -906,7 +923,7 @@ public void processResult(int rc, String path, Object ctx, Stat stat) { @Override void zkRun() { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { PulsarZooKeeperClient.super.exists(path, watcher, stCb, worker); } else { @@ -943,7 +960,7 @@ public void processResult(int rc, String path, Object ctx, Stat stat) { @Override void zkRun() { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { PulsarZooKeeperClient.super.exists(path, watch, stCb, worker); } else { @@ -967,7 +984,7 @@ public byte[] getData(final String path, final Watcher watcher, final Stat stat) @Override public byte[] call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { return PulsarZooKeeperClient.super.getData(path, watcher, stat); } @@ -989,7 +1006,7 @@ public byte[] getData(final String path, final boolean watch, final Stat stat) @Override public byte[] call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { return PulsarZooKeeperClient.super.getData(path, watch, stat); } @@ -1024,7 +1041,7 @@ public void processResult(int rc, String path, Object ctx, byte[] data, Stat sta @Override void zkRun() { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { PulsarZooKeeperClient.super.getData(path, watcher, dataCb, worker); } else { @@ -1061,7 +1078,7 @@ public void processResult(int rc, String path, Object ctx, byte[] data, Stat sta @Override void zkRun() { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { PulsarZooKeeperClient.super.getData(path, watch, dataCb, worker); } else { @@ -1085,7 +1102,7 @@ public Stat setData(final String path, final byte[] data, final int version) @Override public Stat call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { return PulsarZooKeeperClient.super.setData(path, data, version); } @@ -1121,7 +1138,7 @@ public void processResult(int rc, String path, Object ctx, Stat stat) { @Override void zkRun() { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { PulsarZooKeeperClient.super.setData(path, data, version, stCb, worker); } else { @@ -1145,7 +1162,7 @@ public void addWatch(String basePath, Watcher watcher, AddWatchMode mode) @Override public Void call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { PulsarZooKeeperClient.super.addWatch(basePath, watcher, mode); } else { @@ -1182,7 +1199,7 @@ public void processResult(int rc, String path, Object ctx) { @Override void zkRun() { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { PulsarZooKeeperClient.super.addWatch(basePath, watcher, mode, cb, ctx); } else { @@ -1206,7 +1223,7 @@ public List getChildren(final String path, final Watcher watcher, final @Override public List call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { return PulsarZooKeeperClient.super.getChildren(path, watcher, stat); } @@ -1228,7 +1245,7 @@ public List getChildren(final String path, final boolean watch, final St @Override public List call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { return PulsarZooKeeperClient.super.getChildren(path, watch, stat); } @@ -1265,7 +1282,7 @@ public void processResult(int rc, String path, Object ctx, @Override void zkRun() { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { PulsarZooKeeperClient.super.getChildren(path, watcher, childCb, worker); } else { @@ -1304,7 +1321,7 @@ public void processResult(int rc, String path, Object ctx, @Override void zkRun() { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { PulsarZooKeeperClient.super.getChildren(path, watch, childCb, worker); } else { @@ -1329,7 +1346,7 @@ public List getChildren(final String path, final Watcher watcher) @Override public List call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { return PulsarZooKeeperClient.super.getChildren(path, watcher); } @@ -1351,7 +1368,7 @@ public List getChildren(final String path, final boolean watch) @Override public List call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { return PulsarZooKeeperClient.super.getChildren(path, watch); } @@ -1388,7 +1405,7 @@ public void processResult(int rc, String path, Object ctx, @Override void zkRun() { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { PulsarZooKeeperClient.super.getChildren(path, watcher, childCb, worker); } else { @@ -1427,7 +1444,7 @@ public void processResult(int rc, String path, Object ctx, @Override void zkRun() { - ZooKeeper zkHandle = zk.get(); + ZooKeeper zkHandle = getZkHandle(); if (null == zkHandle) { PulsarZooKeeperClient.super.getChildren(path, watch, childCb, worker); } else { diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java index a840721023080..ac6c35c9b56c0 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java @@ -169,11 +169,12 @@ private synchronized void checkState(Watcher.Event.KeeperState zkClientState) { log.info("ZooKeeper client reconnection with server quorum. Current status: {}", currentStatus); disconnectedAt = 0; + SessionEvent previousStatus = currentStatus; + currentStatus = SessionEvent.SessionReestablished; sessionListener.accept(SessionEvent.Reconnected); - if (currentStatus == SessionEvent.SessionLost) { + if (previousStatus == SessionEvent.SessionLost) { sessionListener.accept(SessionEvent.SessionReestablished); } - currentStatus = SessionEvent.SessionReestablished; } break; } From a51ec818632bf8c1e045c298a49f1e1367081f20 Mon Sep 17 00:00:00 2001 From: zhaizhibo Date: Tue, 10 Feb 2026 20:13:40 +0800 Subject: [PATCH 2/4] fix race condition in ZK session reconnecting leading to SESSIONEXPIRED Errors. --- .../org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java index b43b1f12ba7a4..02766f0af3d53 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java @@ -122,8 +122,8 @@ public void run() { @Override public ZooKeeper call() throws KeeperException, InterruptedException { - log.info("Reconnecting zookeeper {}.", connectString); lock.writeLock().lock(); + log.info("Reconnecting zookeeper {}.", connectString); try { // close the previous one closeZkHandle(); From fdb3b7093b2442393798a798f40efcd2c0b6e692 Mon Sep 17 00:00:00 2001 From: zhaizhibo Date: Mon, 20 Apr 2026 14:13:37 +0800 Subject: [PATCH 3/4] fix. --- .../org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java index 02766f0af3d53..1bab0134c3ee3 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java @@ -359,7 +359,7 @@ public void close() throws InterruptedException { } private void closeZkHandle() throws InterruptedException { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { super.close(); } else { From 15cfe8738274f77162a7b0ba6e66f5821ab97370 Mon Sep 17 00:00:00 2001 From: zhaizhibo Date: Mon, 20 Apr 2026 14:20:56 +0800 Subject: [PATCH 4/4] fix. --- .../metadata/impl/PulsarZooKeeperClient.java | 119 ++++++++---------- 1 file changed, 51 insertions(+), 68 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java index 1bab0134c3ee3..6a995f20e745a 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java @@ -34,7 +34,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantReadWriteLock; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.OpStatsLogger; @@ -111,8 +110,6 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo private final OpStatsLogger syncStats; private final OpStatsLogger createClientStats; - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private final Runnable clientCreator = new Runnable() { @Override @@ -122,27 +119,22 @@ public void run() { @Override public ZooKeeper call() throws KeeperException, InterruptedException { - lock.writeLock().lock(); log.info("Reconnecting zookeeper {}.", connectString); + // close the previous one + closeZkHandle(); + ZooKeeper newZk; try { - // close the previous one - closeZkHandle(); - ZooKeeper newZk; - try { - newZk = createZooKeeper(); - } catch (IOException | QuorumPeerConfig.ConfigException e) { - log.error("Failed to create zookeeper instance to {} with config path {}", - connectString, configPath, e); - throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS); - } - waitForConnection(); - zk.set(newZk); - log.info("ZooKeeper session {} is created to {}.", - Long.toHexString(newZk.getSessionId()), connectString); - return newZk; - } finally { - lock.writeLock().unlock(); + newZk = createZooKeeper(); + } catch (IOException | QuorumPeerConfig.ConfigException e) { + log.error("Failed to create zookeeper instance to {} with config path {}", + connectString, configPath, e); + throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS); } + waitForConnection(); + zk.set(newZk); + log.info("ZooKeeper session {} is created to {}.", + Long.toHexString(newZk.getSessionId()), connectString); + return newZk; } @Override @@ -159,15 +151,6 @@ public String toString() { }; - private ZooKeeper getZkHandle() { - lock.readLock().lock(); - try { - return zk.get(); - } finally { - lock.readLock().unlock(); - } - } - @VisibleForTesting static PulsarZooKeeperClient createConnectedZooKeeperClient( String connectString, int sessionTimeoutMs, Set childWatchers, @@ -438,7 +421,7 @@ public void run() { @Override public long getSessionId() { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { return super.getSessionId(); } @@ -447,7 +430,7 @@ public long getSessionId() { @Override public byte[] getSessionPasswd() { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { return super.getSessionPasswd(); } @@ -456,7 +439,7 @@ public byte[] getSessionPasswd() { @Override public int getSessionTimeout() { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { return super.getSessionTimeout(); } @@ -465,7 +448,7 @@ public int getSessionTimeout() { @Override public void addAuthInfo(String scheme, byte[] auth) { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { super.addAuthInfo(scheme, auth); return; @@ -503,7 +486,7 @@ public String toString() { @Override public List call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { return PulsarZooKeeperClient.super.multi(ops); } @@ -535,7 +518,7 @@ public void processResult(int rc, String path, Object ctx, List result @Override void zkRun() { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { PulsarZooKeeperClient.super.multi(ops, multiCb, worker); } else { @@ -558,7 +541,7 @@ public Transaction transaction() { // since there is no reference about which client that the transaction could use // so just use ZooKeeper instance directly. // you'd better to use {@link #multi}. - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { return super.transaction(); } @@ -576,7 +559,7 @@ public String toString() { @Override public List call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { return PulsarZooKeeperClient.super.getACL(path, stat); } @@ -611,7 +594,7 @@ public String toString() { @Override void zkRun() { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { PulsarZooKeeperClient.super.getACL(path, stat, aclCb, worker); } else { @@ -635,7 +618,7 @@ public String toString() { @Override public Stat call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { return PulsarZooKeeperClient.super.setACL(path, acl, version); } @@ -671,7 +654,7 @@ public String toString() { @Override void zkRun() { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { PulsarZooKeeperClient.super.setACL(path, acl, version, stCb, worker); } else { @@ -708,7 +691,7 @@ public String toString() { @Override void zkRun() { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { PulsarZooKeeperClient.super.sync(path, vCb, worker); } else { @@ -722,7 +705,7 @@ void zkRun() { @Override public States getState() { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { return PulsarZooKeeperClient.super.getState(); } else { @@ -732,7 +715,7 @@ public States getState() { @Override public String toString() { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { return PulsarZooKeeperClient.super.toString(); } else { @@ -748,7 +731,7 @@ public String create(final String path, final byte[] data, @Override public String call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { return PulsarZooKeeperClient.super.create(path, data, acl, createMode); } @@ -784,7 +767,7 @@ public void processResult(int rc, String path, Object ctx, String name) { @Override void zkRun() { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { PulsarZooKeeperClient.super.create(path, data, acl, createMode, createCb, worker); } else { @@ -807,7 +790,7 @@ public void delete(final String path, final int version) throws KeeperException, @Override public Void call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { PulsarZooKeeperClient.super.delete(path, version); } else { @@ -844,7 +827,7 @@ public void processResult(int rc, String path, Object ctx) { @Override void zkRun() { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { PulsarZooKeeperClient.super.delete(path, version, deleteCb, worker); } else { @@ -867,7 +850,7 @@ public Stat exists(final String path, final Watcher watcher) throws KeeperExcept @Override public Stat call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { return PulsarZooKeeperClient.super.exists(path, watcher); } @@ -888,7 +871,7 @@ public Stat exists(final String path, final boolean watch) throws KeeperExceptio @Override public Stat call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { return PulsarZooKeeperClient.super.exists(path, watch); } @@ -923,7 +906,7 @@ public void processResult(int rc, String path, Object ctx, Stat stat) { @Override void zkRun() { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { PulsarZooKeeperClient.super.exists(path, watcher, stCb, worker); } else { @@ -960,7 +943,7 @@ public void processResult(int rc, String path, Object ctx, Stat stat) { @Override void zkRun() { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { PulsarZooKeeperClient.super.exists(path, watch, stCb, worker); } else { @@ -984,7 +967,7 @@ public byte[] getData(final String path, final Watcher watcher, final Stat stat) @Override public byte[] call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { return PulsarZooKeeperClient.super.getData(path, watcher, stat); } @@ -1006,7 +989,7 @@ public byte[] getData(final String path, final boolean watch, final Stat stat) @Override public byte[] call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { return PulsarZooKeeperClient.super.getData(path, watch, stat); } @@ -1041,7 +1024,7 @@ public void processResult(int rc, String path, Object ctx, byte[] data, Stat sta @Override void zkRun() { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { PulsarZooKeeperClient.super.getData(path, watcher, dataCb, worker); } else { @@ -1078,7 +1061,7 @@ public void processResult(int rc, String path, Object ctx, byte[] data, Stat sta @Override void zkRun() { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { PulsarZooKeeperClient.super.getData(path, watch, dataCb, worker); } else { @@ -1102,7 +1085,7 @@ public Stat setData(final String path, final byte[] data, final int version) @Override public Stat call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { return PulsarZooKeeperClient.super.setData(path, data, version); } @@ -1138,7 +1121,7 @@ public void processResult(int rc, String path, Object ctx, Stat stat) { @Override void zkRun() { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { PulsarZooKeeperClient.super.setData(path, data, version, stCb, worker); } else { @@ -1162,7 +1145,7 @@ public void addWatch(String basePath, Watcher watcher, AddWatchMode mode) @Override public Void call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { PulsarZooKeeperClient.super.addWatch(basePath, watcher, mode); } else { @@ -1199,7 +1182,7 @@ public void processResult(int rc, String path, Object ctx) { @Override void zkRun() { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { PulsarZooKeeperClient.super.addWatch(basePath, watcher, mode, cb, ctx); } else { @@ -1223,7 +1206,7 @@ public List getChildren(final String path, final Watcher watcher, final @Override public List call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { return PulsarZooKeeperClient.super.getChildren(path, watcher, stat); } @@ -1245,7 +1228,7 @@ public List getChildren(final String path, final boolean watch, final St @Override public List call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { return PulsarZooKeeperClient.super.getChildren(path, watch, stat); } @@ -1282,7 +1265,7 @@ public void processResult(int rc, String path, Object ctx, @Override void zkRun() { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { PulsarZooKeeperClient.super.getChildren(path, watcher, childCb, worker); } else { @@ -1321,7 +1304,7 @@ public void processResult(int rc, String path, Object ctx, @Override void zkRun() { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { PulsarZooKeeperClient.super.getChildren(path, watch, childCb, worker); } else { @@ -1346,7 +1329,7 @@ public List getChildren(final String path, final Watcher watcher) @Override public List call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { return PulsarZooKeeperClient.super.getChildren(path, watcher); } @@ -1368,7 +1351,7 @@ public List getChildren(final String path, final boolean watch) @Override public List call() throws KeeperException, InterruptedException { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { return PulsarZooKeeperClient.super.getChildren(path, watch); } @@ -1405,7 +1388,7 @@ public void processResult(int rc, String path, Object ctx, @Override void zkRun() { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { PulsarZooKeeperClient.super.getChildren(path, watcher, childCb, worker); } else { @@ -1444,7 +1427,7 @@ public void processResult(int rc, String path, Object ctx, @Override void zkRun() { - ZooKeeper zkHandle = getZkHandle(); + ZooKeeper zkHandle = zk.get(); if (null == zkHandle) { PulsarZooKeeperClient.super.getChildren(path, watch, childCb, worker); } else {