From bbb282d5b222abd82045bb4536d039a8fad62fda Mon Sep 17 00:00:00 2001 From: Niklas Gustavsson Date: Mon, 11 Feb 2019 15:42:37 +0100 Subject: [PATCH] WIP resolver API --- .../com/spotify/folsom/ConstantResolver.java | 45 +++++++ .../spotify/folsom/MemcacheClientBuilder.java | 114 +++++++++--------- .../java/com/spotify/folsom/Resolver.java | 48 ++++++++ .../folsom/ketama/SrvKetamaClient.java | 44 ++++--- .../spotify/folsom/ketama/SrvResolver.java | 28 +++++ .../com/spotify/folsom/IntegrationTest.java | 1 + .../spotify/folsom/SrvKetamaClientTest.java | 32 ++--- 7 files changed, 217 insertions(+), 95 deletions(-) create mode 100644 folsom/src/main/java/com/spotify/folsom/ConstantResolver.java create mode 100644 folsom/src/main/java/com/spotify/folsom/Resolver.java create mode 100644 folsom/src/main/java/com/spotify/folsom/ketama/SrvResolver.java diff --git a/folsom/src/main/java/com/spotify/folsom/ConstantResolver.java b/folsom/src/main/java/com/spotify/folsom/ConstantResolver.java new file mode 100644 index 00000000..22fd1b37 --- /dev/null +++ b/folsom/src/main/java/com/spotify/folsom/ConstantResolver.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2019 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.spotify.folsom; + +import com.google.common.collect.ImmutableList; +import com.spotify.folsom.guava.HostAndPort; +import java.util.List; + +public class ConstantResolver implements Resolver { + private final ImmutableList results; + + public ConstantResolver(ImmutableList results) { + this.results = results; + } + + static ConstantResolver fromAddresses(List addresses) { + final long ttl = Long.MAX_VALUE; // can use a very long TTL since the resolver is constant + final ImmutableList results = + addresses + .stream() + .map( + hostAndPort -> + new ResolveResult(hostAndPort.getHostText(), hostAndPort.getPort(), ttl)) + .collect(ImmutableList.toImmutableList()); + return new ConstantResolver(results); + } + + @Override + public List resolve() { + return results; + } +} diff --git a/folsom/src/main/java/com/spotify/folsom/MemcacheClientBuilder.java b/folsom/src/main/java/com/spotify/folsom/MemcacheClientBuilder.java index 18166991..f672d73a 100644 --- a/folsom/src/main/java/com/spotify/folsom/MemcacheClientBuilder.java +++ b/folsom/src/main/java/com/spotify/folsom/MemcacheClientBuilder.java @@ -18,7 +18,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; import static com.spotify.folsom.client.MemcacheEncoder.MAX_KEY_LEN; import com.google.common.base.Charsets; @@ -38,9 +37,8 @@ import com.spotify.folsom.client.ascii.DefaultAsciiMemcacheClient; import com.spotify.folsom.client.binary.DefaultBinaryMemcacheClient; import com.spotify.folsom.guava.HostAndPort; -import com.spotify.folsom.ketama.AddressAndClient; -import com.spotify.folsom.ketama.KetamaMemcacheClient; import com.spotify.folsom.ketama.SrvKetamaClient; +import com.spotify.folsom.ketama.SrvResolver; import com.spotify.folsom.reconnect.ReconnectingClient; import com.spotify.folsom.retry.RetryingClient; import com.spotify.folsom.roundrobin.RoundRobinMemcacheClient; @@ -110,6 +108,7 @@ public class MemcacheClientBuilder { private Supplier executor = DEFAULT_REPLY_EXECUTOR; private Charset charset = Charsets.UTF_8; + private Resolver resolver; private DnsSrvResolver srvResolver; private String srvRecord; private long dnsRefreshPeriod = 60 * 1000L; @@ -210,12 +209,18 @@ public MemcacheClientBuilder withAddress(final String host, final int port) { return this; } + public MemcacheClientBuilder withResolver(final Resolver resolver) { + this.resolver = resolver; + return this; + } + /** * Use SRV to lookup nodes instead of a fixed set of addresses. This means that the set of nodes * can change dynamically over time. * * @param srvRecord the SRV record to use. * @return itself + * @deprecated Use #withResolver */ public MemcacheClientBuilder withSRVRecord(final String srvRecord) { this.srvRecord = checkNotNull(srvRecord); @@ -252,9 +257,11 @@ public MemcacheClientBuilder withSRVShutdownDelay(final long shutdownDelay) { * @param srvResolver the resolver to use. Default is a caching resolver from {@link * com.spotify.dns.DnsSrvResolvers} * @return itself + * @deprecated Use #withResolver */ public MemcacheClientBuilder withSrvResolver(final DnsSrvResolver srvResolver) { - this.srvResolver = checkNotNull(srvResolver, "srvResolver"); + checkNotNull(srvResolver, "srvResolver"); + this.srvResolver = srvResolver; return this; } @@ -514,70 +521,67 @@ public AsciiMemcacheClient connectAscii() { * @return A raw memcached client. */ protected RawMemcacheClient connectRaw(boolean binary, Authenticator authenticator) { - List addresses = this.addresses; - RawMemcacheClient client; - if (srvRecord != null) { - if (!addresses.isEmpty()) { - throw new IllegalStateException("You may not specify both srvRecord and addresses"); - } - client = createSRVClient(binary, authenticator); - } else { - if (addresses.isEmpty()) { - addresses = ImmutableList.of(HostAndPort.fromParts(DEFAULT_HOSTNAME, DEFAULT_PORT)); - } + final Resolver resolver = getResolver(); - final List clients = createClients(addresses, binary, authenticator); - if (addresses.size() > 1) { - checkState(clients.size() == addresses.size()); - - final List aac = Lists.newArrayListWithCapacity(clients.size()); - for (int i = 0; i < clients.size(); i++) { - final HostAndPort address = addresses.get(i); - aac.add(new AddressAndClient(address, clients.get(i))); - } - - client = new KetamaMemcacheClient(aac); - } else { - client = clients.get(0); - } - } + final SrvKetamaClient client = + new SrvKetamaClient( + resolver, + DEFAULT_SCHEDULED_EXECUTOR.get(), + dnsRefreshPeriod, + TimeUnit.MILLISECONDS, + input -> createClient(input, binary, authenticator), + shutdownDelay, + TimeUnit.MILLISECONDS); + client.start(); if (retry) { return new RetryingClient(client); } return client; } - private List createClients( - final List addresses, final boolean binary, final Authenticator authenticator) { - - final List clients = Lists.newArrayListWithCapacity(addresses.size()); - for (final HostAndPort address : addresses) { - clients.add(createClient(address, binary, authenticator)); + private Resolver getResolver() { + if (resolver != null) { + if (srvRecord != null) { + throw new IllegalStateException( + "withResolver() can not be used together with withSrvRecord()"); + } + if (srvResolver != null) { + throw new IllegalStateException( + "withResolver() can not be used together with withSrvResolver()"); + } + if (!addresses.isEmpty()) { + throw new IllegalStateException( + "withResolver() can not be used together with withAddresses()"); + } + return resolver; } - return clients; - } - private RawMemcacheClient createSRVClient( - final boolean binary, final Authenticator authenticator) { - DnsSrvResolver resolver = srvResolver; - if (resolver == null) { - resolver = DEFAULT_SRV_RESOLVER_EXECUTOR.get(); + if (srvResolver != null) { + if (srvRecord == null) { + throw new IllegalStateException( + "withSrvResolver() can not be used without withSrvRecord()"); + } + if (!addresses.isEmpty()) { + throw new IllegalStateException( + "withSrvResolver() can not be used together with withAddresses()"); + } + return new SrvResolver(srvResolver, srvRecord); } - SrvKetamaClient client = - new SrvKetamaClient( - srvRecord, - resolver, - DEFAULT_SCHEDULED_EXECUTOR.get(), - dnsRefreshPeriod, - TimeUnit.MILLISECONDS, - input -> createClient(input, binary, authenticator), - shutdownDelay, - TimeUnit.MILLISECONDS); + if (srvRecord != null) { + if (!addresses.isEmpty()) { + throw new IllegalStateException( + "withSrvRecord() can not be used together with withAddresses()"); + } + return new SrvResolver(DEFAULT_SRV_RESOLVER_EXECUTOR.get(), srvRecord); + } - client.start(); - return client; + if (!addresses.isEmpty()) { + return ConstantResolver.fromAddresses(addresses); + } + return ConstantResolver.fromAddresses( + ImmutableList.of(HostAndPort.fromParts(DEFAULT_HOSTNAME, DEFAULT_PORT))); } private RawMemcacheClient createClient( diff --git a/folsom/src/main/java/com/spotify/folsom/Resolver.java b/folsom/src/main/java/com/spotify/folsom/Resolver.java new file mode 100644 index 00000000..9189bebe --- /dev/null +++ b/folsom/src/main/java/com/spotify/folsom/Resolver.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2019 Spotify AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.spotify.folsom; + +import java.util.List; + +public interface Resolver { + + class ResolveResult { + private final String host; + private final int port; + private final long ttl; + + public ResolveResult(String host, int port, long ttl) { + this.host = host; + this.port = port; + this.ttl = ttl; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public long getTtl() { + return ttl; + } + } + + List resolve(); +} diff --git a/folsom/src/main/java/com/spotify/folsom/ketama/SrvKetamaClient.java b/folsom/src/main/java/com/spotify/folsom/ketama/SrvKetamaClient.java index 35043aa4..7bee3b4a 100644 --- a/folsom/src/main/java/com/spotify/folsom/ketama/SrvKetamaClient.java +++ b/folsom/src/main/java/com/spotify/folsom/ketama/SrvKetamaClient.java @@ -17,13 +17,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import com.spotify.dns.DnsSrvResolver; -import com.spotify.dns.LookupResult; -import com.spotify.folsom.AbstractRawMemcacheClient; -import com.spotify.folsom.ConnectionChangeListener; -import com.spotify.folsom.ObservableClient; -import com.spotify.folsom.RawMemcacheClient; +import com.spotify.folsom.*; +import com.spotify.folsom.Resolver.ResolveResult; import com.spotify.folsom.client.NotConnectedClient; import com.spotify.folsom.client.Request; import com.spotify.folsom.guava.HostAndPort; @@ -47,8 +44,7 @@ public class SrvKetamaClient extends AbstractRawMemcacheClient { public static final int MAX_DNS_WAIT_TIME = 3600; private final ScheduledExecutorService executor; - private final String srvRecord; - private final DnsSrvResolver srvResolver; + private final Resolver resolver; private final long ttl; private final Connector connector; @@ -67,16 +63,14 @@ public class SrvKetamaClient extends AbstractRawMemcacheClient { private boolean shutdown = false; public SrvKetamaClient( - final String srvRecord, - DnsSrvResolver srvResolver, + Resolver resolver, ScheduledExecutorService executor, long period, TimeUnit periodUnit, final Connector connector, long shutdownDelay, TimeUnit shutdownUnit) { - this.srvRecord = srvRecord; - this.srvResolver = srvResolver; + this.resolver = resolver; this.connector = connector; this.shutdownDelay = shutdownDelay; this.shutdownUnit = shutdownUnit; @@ -99,20 +93,24 @@ public void updateDNS() { } long ttl = this.ttl; // Default ttl to use if resolve fails try { - final List lookupResults = srvResolver.resolve(srvRecord); - if (lookupResults.isEmpty()) { - // Just ignore empty results + List resolveResults = resolver.resolve(); + if (resolveResults.isEmpty()) { return; } + List hosts = Lists.newArrayListWithCapacity(resolveResults.size()); + for (ResolveResult resolveResult : resolveResults) { + hosts.add(HostAndPort.fromParts(resolveResult.getHost(), resolveResult.getPort())); + ttl = Math.min(ttl, resolveResult.getTtl()); + } final ImmutableSet newAddresses = - lookupResults + resolveResults .stream() - .map(result -> HostAndPort.fromParts(result.host(), result.port())) + .map(result -> HostAndPort.fromParts(result.getHost(), result.getPort())) .collect(ImmutableSet.toImmutableSet()); final long resolvedTtl = - lookupResults.stream().mapToLong(LookupResult::ttl).min().orElse(Long.MAX_VALUE); + resolveResults.stream().mapToLong(ResolveResult::getTtl).min().orElse(Long.MAX_VALUE); ttl = Math.min(ttl, resolvedTtl); final Set currentAddresses = clients.keySet(); @@ -176,7 +174,15 @@ public boolean isConnected() { @Override public Throwable getConnectionFailure() { - return currentClient.getConnectionFailure(); + final Throwable e = currentClient.getConnectionFailure(); + if (e != null) { + return e; + } + final RawMemcacheClient pendingClient = this.pendingClient; + if (pendingClient != null) { + return pendingClient.getConnectionFailure(); + } + return null; } @Override diff --git a/folsom/src/main/java/com/spotify/folsom/ketama/SrvResolver.java b/folsom/src/main/java/com/spotify/folsom/ketama/SrvResolver.java new file mode 100644 index 00000000..b62581f7 --- /dev/null +++ b/folsom/src/main/java/com/spotify/folsom/ketama/SrvResolver.java @@ -0,0 +1,28 @@ +package com.spotify.folsom.ketama; + +import com.google.common.collect.Lists; +import com.spotify.dns.DnsSrvResolver; +import com.spotify.dns.LookupResult; +import com.spotify.folsom.Resolver; +import java.util.List; + +public class SrvResolver implements Resolver { + + private final DnsSrvResolver srvResolver; + private final String srvRecord; + + public SrvResolver(DnsSrvResolver srvResolver, String srvRecord) { + this.srvResolver = srvResolver; + this.srvRecord = srvRecord; + } + + @Override + public List resolve() { + List lookupResults = srvResolver.resolve(srvRecord); + List results = Lists.newArrayListWithCapacity(lookupResults.size()); + for (LookupResult lookupResult : lookupResults) { + results.add(new ResolveResult(lookupResult.host(), lookupResult.port(), lookupResult.ttl())); + } + return results; + } +} diff --git a/folsom/src/test/java/com/spotify/folsom/IntegrationTest.java b/folsom/src/test/java/com/spotify/folsom/IntegrationTest.java index 1390a5ee..57a429c0 100644 --- a/folsom/src/test/java/com/spotify/folsom/IntegrationTest.java +++ b/folsom/src/test/java/com/spotify/folsom/IntegrationTest.java @@ -40,6 +40,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import junit.framework.AssertionFailedError; +import org.junit.*; import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; diff --git a/folsom/src/test/java/com/spotify/folsom/SrvKetamaClientTest.java b/folsom/src/test/java/com/spotify/folsom/SrvKetamaClientTest.java index ace34ed0..7daef2e4 100644 --- a/folsom/src/test/java/com/spotify/folsom/SrvKetamaClientTest.java +++ b/folsom/src/test/java/com/spotify/folsom/SrvKetamaClientTest.java @@ -21,8 +21,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; -import com.spotify.dns.DnsSrvResolver; -import com.spotify.dns.LookupResult; import com.spotify.folsom.client.test.FakeRawMemcacheClient; import com.spotify.folsom.guava.HostAndPort; import com.spotify.folsom.ketama.SrvKetamaClient; @@ -43,7 +41,7 @@ public void testSimple() throws Exception { HostAndPort hostNameC = HostAndPort.fromString("c:1"); HostAndPort hostNameD = HostAndPort.fromString("d:1"); - DnsSrvResolver resolver = Mockito.mock(DnsSrvResolver.class); + Resolver resolver = Mockito.mock(Resolver.class); // Run shutdown code immediately DeterministicScheduler executor = new DeterministicScheduler(); @@ -59,7 +57,6 @@ public void testSimple() throws Exception { SrvKetamaClient ketamaClient = new SrvKetamaClient( - "the-srv-record", resolver, executor, 1000, @@ -71,8 +68,7 @@ public void testSimple() throws Exception { assertFalse(ketamaClient.isConnected()); - Mockito.when(resolver.resolve(Mockito.anyString())) - .thenReturn(ImmutableList.of(result("a"), result("b"))); + Mockito.when(resolver.resolve()).thenReturn(ImmutableList.of(result("a"), result("b"))); ketamaClient.updateDNS(); executor.tick(1000, TimeUnit.SECONDS); @@ -80,8 +76,7 @@ public void testSimple() throws Exception { assertTrue(knownClients.get(hostNameA).isConnected()); assertTrue(knownClients.get(hostNameB).isConnected()); - Mockito.when(resolver.resolve(Mockito.anyString())) - .thenReturn(ImmutableList.of(result("b"), result("c"))); + Mockito.when(resolver.resolve()).thenReturn(ImmutableList.of(result("b"), result("c"))); ketamaClient.updateDNS(); executor.tick(1000, TimeUnit.SECONDS); @@ -90,8 +85,7 @@ public void testSimple() throws Exception { assertTrue(knownClients.get(hostNameB).isConnected()); assertTrue(knownClients.get(hostNameC).isConnected()); - Mockito.when(resolver.resolve(Mockito.anyString())) - .thenReturn(ImmutableList.of(result("c"), result("d"))); + Mockito.when(resolver.resolve()).thenReturn(ImmutableList.of(result("c"), result("d"))); ketamaClient.updateDNS(); executor.tick(1000, TimeUnit.SECONDS); @@ -102,7 +96,7 @@ public void testSimple() throws Exception { assertTrue(knownClients.get(hostNameD).isConnected()); // Ignore empty dns results - Mockito.when(resolver.resolve(Mockito.anyString())).thenReturn(ImmutableList.of()); + Mockito.when(resolver.resolve()).thenReturn(ImmutableList.of()); ketamaClient.updateDNS(); executor.tick(1000, TimeUnit.SECONDS); @@ -117,7 +111,7 @@ public void testSimple() throws Exception { @Test public void testReusingConnections() { - DnsSrvResolver resolver = Mockito.mock(DnsSrvResolver.class); + Resolver resolver = Mockito.mock(Resolver.class); // Run shutdown code immediately DeterministicScheduler executor = new DeterministicScheduler(); @@ -136,7 +130,6 @@ public void testReusingConnections() { SrvKetamaClient ketamaClient = new SrvKetamaClient( - "the-srv-record", resolver, executor, 1000, @@ -148,26 +141,23 @@ public void testReusingConnections() { assertFalse(ketamaClient.isConnected()); - Mockito.when(resolver.resolve(Mockito.anyString())) - .thenReturn(ImmutableList.of(result("a"), result("b"))); + Mockito.when(resolver.resolve()).thenReturn(ImmutableList.of(result("a"), result("b"))); ketamaClient.updateDNS(); assertEquals(2, allClients.size()); executor.tick(1000, TimeUnit.SECONDS); - Mockito.when(resolver.resolve(Mockito.anyString())) - .thenReturn(ImmutableList.of(result("b"), result("c"))); + Mockito.when(resolver.resolve()).thenReturn(ImmutableList.of(result("b"), result("c"))); ketamaClient.updateDNS(); assertEquals(3, allClients.size()); executor.tick(1000, TimeUnit.SECONDS); - Mockito.when(resolver.resolve(Mockito.anyString())) - .thenReturn(ImmutableList.of(result("c"), result("d"))); + Mockito.when(resolver.resolve()).thenReturn(ImmutableList.of(result("c"), result("d"))); ketamaClient.updateDNS(); assertEquals(4, allClients.size()); executor.tick(1000, TimeUnit.SECONDS); } - private LookupResult result(final String hostname) { - return LookupResult.create(hostname, 1, 100, 100, 100); + private Resolver.ResolveResult result(final String hostname) { + return new Resolver.ResolveResult(hostname, 1, 100); } }