Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions folsom/src/main/java/com/spotify/folsom/ConstantResolver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2019 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.spotify.folsom;

import com.google.common.collect.ImmutableList;
import com.spotify.folsom.guava.HostAndPort;
import java.util.List;

public class ConstantResolver implements Resolver {
private final ImmutableList<ResolveResult> results;

public ConstantResolver(ImmutableList<ResolveResult> results) {
this.results = results;
}

static ConstantResolver fromAddresses(List<HostAndPort> addresses) {
final long ttl = Long.MAX_VALUE; // can use a very long TTL since the resolver is constant
final ImmutableList<ResolveResult> results =
addresses
.stream()
.map(
hostAndPort ->
new ResolveResult(hostAndPort.getHostText(), hostAndPort.getPort(), ttl))
.collect(ImmutableList.toImmutableList());
return new ConstantResolver(results);
}

@Override
public List<ResolveResult> resolve() {
return results;
}
}
114 changes: 59 additions & 55 deletions folsom/src/main/java/com/spotify/folsom/MemcacheClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.spotify.folsom.client.MemcacheEncoder.MAX_KEY_LEN;

import com.google.common.base.Charsets;
Expand All @@ -38,9 +37,8 @@
import com.spotify.folsom.client.ascii.DefaultAsciiMemcacheClient;
import com.spotify.folsom.client.binary.DefaultBinaryMemcacheClient;
import com.spotify.folsom.guava.HostAndPort;
import com.spotify.folsom.ketama.AddressAndClient;
import com.spotify.folsom.ketama.KetamaMemcacheClient;
import com.spotify.folsom.ketama.SrvKetamaClient;
import com.spotify.folsom.ketama.SrvResolver;
import com.spotify.folsom.reconnect.ReconnectingClient;
import com.spotify.folsom.retry.RetryingClient;
import com.spotify.folsom.roundrobin.RoundRobinMemcacheClient;
Expand Down Expand Up @@ -110,6 +108,7 @@ public class MemcacheClientBuilder<V> {
private Supplier<Executor> executor = DEFAULT_REPLY_EXECUTOR;
private Charset charset = Charsets.UTF_8;

private Resolver resolver;
private DnsSrvResolver srvResolver;
private String srvRecord;
private long dnsRefreshPeriod = 60 * 1000L;
Expand Down Expand Up @@ -210,12 +209,18 @@ public MemcacheClientBuilder<V> withAddress(final String host, final int port) {
return this;
}

public MemcacheClientBuilder<V> withResolver(final Resolver resolver) {
this.resolver = resolver;
return this;
}

/**
* Use SRV to lookup nodes instead of a fixed set of addresses. This means that the set of nodes
* can change dynamically over time.
*
* @param srvRecord the SRV record to use.
* @return itself
* @deprecated Use #withResolver
*/
public MemcacheClientBuilder<V> withSRVRecord(final String srvRecord) {
this.srvRecord = checkNotNull(srvRecord);
Expand Down Expand Up @@ -252,9 +257,11 @@ public MemcacheClientBuilder<V> withSRVShutdownDelay(final long shutdownDelay) {
* @param srvResolver the resolver to use. Default is a caching resolver from {@link
* com.spotify.dns.DnsSrvResolvers}
* @return itself
* @deprecated Use #withResolver
*/
public MemcacheClientBuilder<V> withSrvResolver(final DnsSrvResolver srvResolver) {
this.srvResolver = checkNotNull(srvResolver, "srvResolver");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I screwed with this one, but this change is no longer needed

checkNotNull(srvResolver, "srvResolver");
this.srvResolver = srvResolver;
return this;
}

Expand Down Expand Up @@ -514,70 +521,67 @@ public AsciiMemcacheClient<V> connectAscii() {
* @return A raw memcached client.
*/
protected RawMemcacheClient connectRaw(boolean binary, Authenticator authenticator) {
List<HostAndPort> addresses = this.addresses;
RawMemcacheClient client;
if (srvRecord != null) {
if (!addresses.isEmpty()) {
throw new IllegalStateException("You may not specify both srvRecord and addresses");
}
client = createSRVClient(binary, authenticator);
} else {
if (addresses.isEmpty()) {
addresses = ImmutableList.of(HostAndPort.fromParts(DEFAULT_HOSTNAME, DEFAULT_PORT));
}
final Resolver resolver = getResolver();

final List<RawMemcacheClient> clients = createClients(addresses, binary, authenticator);
if (addresses.size() > 1) {
checkState(clients.size() == addresses.size());

final List<AddressAndClient> aac = Lists.newArrayListWithCapacity(clients.size());
for (int i = 0; i < clients.size(); i++) {
final HostAndPort address = addresses.get(i);
aac.add(new AddressAndClient(address, clients.get(i)));
}

client = new KetamaMemcacheClient(aac);
} else {
client = clients.get(0);
}
}
final SrvKetamaClient client =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this should not be named SrvKetamaClient now

new SrvKetamaClient(
resolver,
DEFAULT_SCHEDULED_EXECUTOR.get(),
dnsRefreshPeriod,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should also be renamed

TimeUnit.MILLISECONDS,
input -> createClient(input, binary, authenticator),
shutdownDelay,
TimeUnit.MILLISECONDS);

client.start();
if (retry) {
return new RetryingClient(client);
}
return client;
}

private List<RawMemcacheClient> createClients(
final List<HostAndPort> addresses, final boolean binary, final Authenticator authenticator) {

final List<RawMemcacheClient> clients = Lists.newArrayListWithCapacity(addresses.size());
for (final HostAndPort address : addresses) {
clients.add(createClient(address, binary, authenticator));
private Resolver getResolver() {
if (resolver != null) {
if (srvRecord != null) {
throw new IllegalStateException(
"withResolver() can not be used together with withSrvRecord()");
}
if (srvResolver != null) {
throw new IllegalStateException(
"withResolver() can not be used together with withSrvResolver()");
}
if (!addresses.isEmpty()) {
throw new IllegalStateException(
"withResolver() can not be used together with withAddresses()");
}
return resolver;
}
return clients;
}

private RawMemcacheClient createSRVClient(
final boolean binary, final Authenticator authenticator) {
DnsSrvResolver resolver = srvResolver;
if (resolver == null) {
resolver = DEFAULT_SRV_RESOLVER_EXECUTOR.get();
if (srvResolver != null) {
if (srvRecord == null) {
throw new IllegalStateException(
"withSrvResolver() can not be used without withSrvRecord()");
}
if (!addresses.isEmpty()) {
throw new IllegalStateException(
"withSrvResolver() can not be used together with withAddresses()");
}
return new SrvResolver(srvResolver, srvRecord);
}

SrvKetamaClient client =
new SrvKetamaClient(
srvRecord,
resolver,
DEFAULT_SCHEDULED_EXECUTOR.get(),
dnsRefreshPeriod,
TimeUnit.MILLISECONDS,
input -> createClient(input, binary, authenticator),
shutdownDelay,
TimeUnit.MILLISECONDS);
if (srvRecord != null) {
if (!addresses.isEmpty()) {
throw new IllegalStateException(
"withSrvRecord() can not be used together with withAddresses()");
}
return new SrvResolver(DEFAULT_SRV_RESOLVER_EXECUTOR.get(), srvRecord);
}

client.start();
return client;
if (!addresses.isEmpty()) {
return ConstantResolver.fromAddresses(addresses);
}
return ConstantResolver.fromAddresses(
ImmutableList.of(HostAndPort.fromParts(DEFAULT_HOSTNAME, DEFAULT_PORT)));
}

private RawMemcacheClient createClient(
Expand Down
48 changes: 48 additions & 0 deletions folsom/src/main/java/com/spotify/folsom/Resolver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2019 Spotify AB
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.spotify.folsom;

import java.util.List;

public interface Resolver {

class ResolveResult {
private final String host;
private final int port;
private final long ttl;

public ResolveResult(String host, int port, long ttl) {
this.host = host;
this.port = port;
this.ttl = ttl;
}

public String getHost() {
return host;
}

public int getPort() {
return port;
}

public long getTtl() {
return ttl;
}
}

List<ResolveResult> resolve();
}
44 changes: 25 additions & 19 deletions folsom/src/main/java/com/spotify/folsom/ketama/SrvKetamaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.spotify.dns.DnsSrvResolver;
import com.spotify.dns.LookupResult;
import com.spotify.folsom.AbstractRawMemcacheClient;
import com.spotify.folsom.ConnectionChangeListener;
import com.spotify.folsom.ObservableClient;
import com.spotify.folsom.RawMemcacheClient;
import com.spotify.folsom.*;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to use wildcard imports?

import com.spotify.folsom.Resolver.ResolveResult;
import com.spotify.folsom.client.NotConnectedClient;
import com.spotify.folsom.client.Request;
import com.spotify.folsom.guava.HostAndPort;
Expand All @@ -47,8 +44,7 @@ public class SrvKetamaClient extends AbstractRawMemcacheClient {
public static final int MAX_DNS_WAIT_TIME = 3600;

private final ScheduledExecutorService executor;
private final String srvRecord;
private final DnsSrvResolver srvResolver;
private final Resolver resolver;
private final long ttl;

private final Connector connector;
Expand All @@ -67,16 +63,14 @@ public class SrvKetamaClient extends AbstractRawMemcacheClient {
private boolean shutdown = false;

public SrvKetamaClient(
final String srvRecord,
DnsSrvResolver srvResolver,
Resolver resolver,
ScheduledExecutorService executor,
long period,
TimeUnit periodUnit,
final Connector connector,
long shutdownDelay,
TimeUnit shutdownUnit) {
this.srvRecord = srvRecord;
this.srvResolver = srvResolver;
this.resolver = resolver;
this.connector = connector;
this.shutdownDelay = shutdownDelay;
this.shutdownUnit = shutdownUnit;
Expand All @@ -99,20 +93,24 @@ public void updateDNS() {
}
long ttl = this.ttl; // Default ttl to use if resolve fails
try {
final List<LookupResult> lookupResults = srvResolver.resolve(srvRecord);
if (lookupResults.isEmpty()) {
// Just ignore empty results
List<ResolveResult> resolveResults = resolver.resolve();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be final

if (resolveResults.isEmpty()) {
return;
}
List<HostAndPort> hosts = Lists.newArrayListWithCapacity(resolveResults.size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be final

for (ResolveResult resolveResult : resolveResults) {
hosts.add(HostAndPort.fromParts(resolveResult.getHost(), resolveResult.getPort()));
ttl = Math.min(ttl, resolveResult.getTtl());
}

final ImmutableSet<HostAndPort> newAddresses =
lookupResults
resolveResults
.stream()
.map(result -> HostAndPort.fromParts(result.host(), result.port()))
.map(result -> HostAndPort.fromParts(result.getHost(), result.getPort()))
.collect(ImmutableSet.toImmutableSet());

final long resolvedTtl =
lookupResults.stream().mapToLong(LookupResult::ttl).min().orElse(Long.MAX_VALUE);
resolveResults.stream().mapToLong(ResolveResult::getTtl).min().orElse(Long.MAX_VALUE);
ttl = Math.min(ttl, resolvedTtl);

final Set<HostAndPort> currentAddresses = clients.keySet();
Expand Down Expand Up @@ -176,7 +174,15 @@ public boolean isConnected() {

@Override
public Throwable getConnectionFailure() {
return currentClient.getConnectionFailure();
final Throwable e = currentClient.getConnectionFailure();
if (e != null) {
return e;
}
final RawMemcacheClient pendingClient = this.pendingClient;
if (pendingClient != null) {
return pendingClient.getConnectionFailure();
}
return null;
}

@Override
Expand Down
28 changes: 28 additions & 0 deletions folsom/src/main/java/com/spotify/folsom/ketama/SrvResolver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.spotify.folsom.ketama;

import com.google.common.collect.Lists;
import com.spotify.dns.DnsSrvResolver;
import com.spotify.dns.LookupResult;
import com.spotify.folsom.Resolver;
import java.util.List;

public class SrvResolver implements Resolver {

private final DnsSrvResolver srvResolver;
private final String srvRecord;

public SrvResolver(DnsSrvResolver srvResolver, String srvRecord) {
this.srvResolver = srvResolver;
this.srvRecord = srvRecord;
}

@Override
public List<ResolveResult> resolve() {
List<LookupResult> lookupResults = srvResolver.resolve(srvRecord);
List<ResolveResult> results = Lists.newArrayListWithCapacity(lookupResults.size());
for (LookupResult lookupResult : lookupResults) {
results.add(new ResolveResult(lookupResult.host(), lookupResult.port(), lookupResult.ttl()));
}
return results;
}
}
Loading