diff --git a/.classpath b/.classpath
index 9fc2de7..bf615c1 100644
--- a/.classpath
+++ b/.classpath
@@ -6,11 +6,6 @@
-
-
-
-
-
@@ -22,12 +17,12 @@
-
+
-
+
diff --git a/.gitignore b/.gitignore
index f868956..56c5f31 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,16 +1,58 @@
-*.class
+# Node #
+######################
+lib-cov
+*.seed
+*.log
+*.csv
+*.dat
+*.out
+*.pid
+*.gz
-# Package Files #
-*.jar
-*.war
-*.ear
+pids
+logs
+results
-# Eclipse settings
-.settings
+npm-debug.log
+node_modules
-# IDEA settings
-*.iml
-.idea
+# Logs and databases #
+######################
+*.log
+*.sql
+*.sqlite
-# Maven output
-/target
+# OS generated files #
+######################
+.DS_Store
+.DS_Store?
+._*
+.Spotlight-V100
+.Trashes
+ehthumbs.db
+Thumbs.db
+
+# Editor #
+######################
+.idea/
+iron.iml
+*.sublime-workspace
+*.sublime-project
+iron.iml
+/nbproject/
+.tm_properties
+
+/kdt.zip
+.tags
+.tags_sorted_by_file
+
+.buildpath
+.project
+.classpath
+.settings/
+koala-config.json
+
+.externalToolBuilders/
+.gitignore
+
+target/
diff --git a/pom.xml b/pom.xml
index b230c0b..5dbb07f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2,7 +2,7 @@
4.0.0
ly.bit
nsq
- 0.0.1-SNAPSHOT
+ 0.0.4-SNAPSHOT
Java NSQ Client
@@ -57,10 +57,23 @@
maven-compiler-plugin
3.0
- 1.6
- 1.6
+ 1.7
+ 1.7
+
+
+
+ releases
+ Nexus Releases Repository
+ http://192.168.66.204:8081/content/repositories/releases/
+
+
+ snapshots
+ Nexus Snapshots Repository
+ http://192.168.66.204:8081/content/repositories/snapshots/
+
+
diff --git a/src/main/java/ly/bit/nsq/Connection.java b/src/main/java/ly/bit/nsq/Connection.java
index 121c907..43845ee 100644
--- a/src/main/java/ly/bit/nsq/Connection.java
+++ b/src/main/java/ly/bit/nsq/Connection.java
@@ -3,7 +3,6 @@
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
diff --git a/src/main/java/ly/bit/nsq/BasicConnection.java b/src/main/java/ly/bit/nsq/DefaultConnection.java
similarity index 92%
rename from src/main/java/ly/bit/nsq/BasicConnection.java
rename to src/main/java/ly/bit/nsq/DefaultConnection.java
index 8350a34..83e82d7 100644
--- a/src/main/java/ly/bit/nsq/BasicConnection.java
+++ b/src/main/java/ly/bit/nsq/DefaultConnection.java
@@ -1,22 +1,20 @@
package ly.bit.nsq;
import java.io.BufferedInputStream;
-import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
-import java.util.Arrays;
import ly.bit.nsq.exceptions.NSQException;
import ly.bit.nsq.util.ConnectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BasicConnection extends Connection {
- private static final Logger log = LoggerFactory.getLogger(BasicConnection.class);
+public class DefaultConnection extends Connection {
+ private static final Logger log = LoggerFactory.getLogger(DefaultConnection.class);
private Socket sock;
private InputStream inputStream;
diff --git a/src/main/java/ly/bit/nsq/NSQProducer.java b/src/main/java/ly/bit/nsq/NSQProducer.java
index 1b616cf..38ecec8 100644
--- a/src/main/java/ly/bit/nsq/NSQProducer.java
+++ b/src/main/java/ly/bit/nsq/NSQProducer.java
@@ -1,13 +1,15 @@
package ly.bit.nsq;
import ly.bit.nsq.exceptions.NSQException;
+import ly.bit.nsq.lookupd.DefaultLookup;
+import ly.bit.nsq.util.StringUtils;
+
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.params.ClientPNames;
import org.apache.http.client.params.CookiePolicy;
-import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.conn.scheme.PlainSocketFactory;
import org.apache.http.conn.scheme.Scheme;
import org.apache.http.conn.scheme.SchemeRegistry;
@@ -21,7 +23,10 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
@@ -33,29 +38,37 @@ public class NSQProducer {
private static final String PUT_URL = "/put?topic=";
private static final int DEFAULT_SOCKET_TIMEOUT = 2000;
private static final int DEFAULT_CONNECTION_TIMEOUT = 2000;
-
- private String url;
- private String topic;
+ private static final int MAX_PER_ROUTE_CONNECTIONS = 32;
+ private static final int MAX_CONNECTIONS = 64;
+ private static final int MAX_RETRY_COUNT = 3;
+
+ private String defaultNsqdAddr;
+ private DefaultLookup lookup;
+ private ConcurrentHashMap> hostIndex;
+ private ConcurrentHashMap reTryCountMap;
protected ExecutorService executor = Executors.newCachedThreadPool();
-
protected HttpClient httpclient;
+ protected PoolingClientConnectionManager cm;
// TODO add timeout config / allow setting any httpclient param via getHtttpClient
-
- // Convenience constructor assuming local nsqd on standard port
- public NSQProducer(String topic) {
- this("http://127.0.0.1:4151", topic);
+
+ public NSQProducer(String defaultNsqdAddr, String lookupAddr) {
+ this(lookupAddr);
+ this.defaultNsqdAddr = StringUtils.trimRight("/", defaultNsqdAddr);
}
- public NSQProducer(String url, String topic) {
- this.topic = topic;
- this.url = url + PUT_URL + topic;
+ public NSQProducer(String lookupAddr) {
+ this.lookup = new DefaultLookup(lookupAddr);
+ this.hostIndex = new ConcurrentHashMap>();
+ this.reTryCountMap = new ConcurrentHashMap();
SchemeRegistry schemeRegistry = new SchemeRegistry();
schemeRegistry.register(
new Scheme("http", 80, PlainSocketFactory.getSocketFactory()));
- ClientConnectionManager cm = new PoolingClientConnectionManager(schemeRegistry);
+ cm = new PoolingClientConnectionManager(schemeRegistry);
+ cm.setDefaultMaxPerRoute(MAX_PER_ROUTE_CONNECTIONS);
+ cm.setMaxTotal(MAX_CONNECTIONS);
this.httpclient = new DefaultHttpClient(cm);
this.setSocketTimeout(DEFAULT_SOCKET_TIMEOUT);
@@ -78,9 +91,13 @@ public void run(){
* @param message
* @throws NSQException
*/
- public void put(String message) throws NSQException {
+ public void put(String message, String topic) throws NSQException {
HttpPost post = null;
try {
+ String url = getUrl(topic);
+ if (url == null) {
+ throw new NSQException("can't get topic:("+topic+") http producer");
+ }
post = new HttpPost(url);
post.setEntity(new StringEntity(message));
HttpResponse response = this.httpclient.execute(post);
@@ -90,12 +107,25 @@ public void put(String message) throws NSQException {
if (response.getEntity() != null) {
EntityUtils.consume(response.getEntity());
}
+ reTryCountMap.put(topic, 0);
} catch (UnsupportedEncodingException e) {
throw new NSQException(e);
} catch (ClientProtocolException e) {
throw new NSQException(e);
} catch (IOException e) {
- throw new NSQException(e);
+ Integer reTryCount = reTryCountMap.get(topic);
+ if (reTryCount != null && reTryCount.intValue() < MAX_RETRY_COUNT) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e1) {
+ throw new NSQException(e1);
+ }
+ reTryCountMap.put(topic, MAX_RETRY_COUNT);
+ hostIndex.remove(topic);
+ put(message, topic);// retry
+ } else {
+ throw new NSQException(e);
+ }
} finally {
if (post != null) {
post.releaseConnection();
@@ -109,8 +139,8 @@ public void put(String message) throws NSQException {
* @param message
* @return
*/
- public FutureTask putAsync(String message) {
- FutureTask task = new FutureTask(new NSQAsyncWriter(message));
+ public FutureTask putAsync(String message, String topic) {
+ FutureTask task = new FutureTask(new NSQAsyncWriter(message, topic));
executor.execute(task);
return task;
@@ -118,13 +148,15 @@ public FutureTask putAsync(String message) {
public class NSQAsyncWriter implements Callable {
private String message = null;
+ private String topic = null;
- NSQAsyncWriter(String message) {
+ NSQAsyncWriter(String message, String topic) {
this.message = message;
+ this.topic = topic;
}
public Void call() throws NSQException {
try {
- NSQProducer.this.put(message);
+ NSQProducer.this.put(message, topic);
} catch (NSQException e) {
// Log the error here since caller probably won't ever check the future.
log.error("Error posting NSQ message:", e);
@@ -144,19 +176,26 @@ public void shutdown() {
}
}
- public String toString(){
- return "Writer<" + this.url + ">";
- }
- public String getUrl() {
- return url;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
+ public String getUrl(String topic) {
+ List urls = hostIndex.get(topic);
+ if (urls == null) {
+ if (StringUtils.isBlank(lookup.getLookupAddr()) && !StringUtils.isBlank(defaultNsqdAddr)) {
+ return new StringBuffer(defaultNsqdAddr).append(PUT_URL).append(topic).toString();
+ } else {
+ List httpAddrs = lookup.getHttpAddrs(topic);
+ if (httpAddrs == null) httpAddrs = lookup.getHttpAddrs();
+ if (httpAddrs == null) return null;
+ urls = new ArrayList(httpAddrs.size());
+ for (String httpAddr : httpAddrs) {
+ String url = new StringBuffer(httpAddr).append(PUT_URL).append(topic).toString();
+ urls.add(url);
+ }
+ hostIndex.put(topic, urls);
+ reTryCountMap.put(topic, 0);
+ }
+ }
+
+ return urls.get((int)(Math.random()*100) % urls.size());
}
/**
@@ -178,5 +217,21 @@ public void setSocketTimeout(int timeout) {
public void setConnectionTimeout(int timeout) {
this.httpclient.getParams().setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, timeout);
}
+
+ /**
+ * default 2
+ * @param max
+ */
+ public void setDefaultMaxPerRoute(int max) {
+ this.cm.setDefaultMaxPerRoute(max);
+ }
+
+ /**
+ * default 20
+ * @param max
+ */
+ public void setMaxTotal(int max) {
+ this.cm.setMaxTotal(max);
+ }
}
diff --git a/src/main/java/ly/bit/nsq/NSQReader.java b/src/main/java/ly/bit/nsq/NSQReader.java
index 1ad21d8..c3538f0 100644
--- a/src/main/java/ly/bit/nsq/NSQReader.java
+++ b/src/main/java/ly/bit/nsq/NSQReader.java
@@ -6,16 +6,16 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
import ly.bit.nsq.exceptions.NSQException;
-import ly.bit.nsq.lookupd.AbstractLookupd;
-import ly.bit.nsq.lookupd.BasicLookupdJob;
+import ly.bit.nsq.lookupd.ReaderLookupJob;
+import ly.bit.nsq.lookupd.DefaultLookup;
import ly.bit.nsq.util.ConnectionUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public abstract class NSQReader {
private static final Logger log = LoggerFactory.getLogger(NSQReader.class);
@@ -33,7 +33,7 @@ public abstract class NSQReader {
protected Class extends Connection> connClass;
protected ConcurrentHashMap connections;
- protected ConcurrentHashMap lookupdConnections;
+ protected ConcurrentHashMap lookupdConnections;
private ScheduledExecutorService lookupdScheduler;
@@ -55,8 +55,8 @@ public void init(String topic, String channel){
String[] hostParts = this.hostname.split("\\.");
this.shortHostname = hostParts[0];
- this.connClass = BasicConnection.class; // TODO can be passed by caller
- this.lookupdConnections = new ConcurrentHashMap();
+ this.connClass = DefaultConnection.class; // TODO can be passed by caller
+ this.lookupdConnections = new ConcurrentHashMap();
this.lookupdScheduler = Executors.newScheduledThreadPool(1);
// register action for shutdown
@@ -136,13 +136,13 @@ public void connectToNsqd(String address, int port) throws NSQException{
// lookupd stuff
- public void addLookupd(AbstractLookupd lookupd) {
- String addr = lookupd.getAddr();
- AbstractLookupd stored = this.lookupdConnections.putIfAbsent(addr, lookupd);
+ public void addLookupd(DefaultLookup lookupd) {
+ String lookupAddr = lookupd.getLookupAddr();
+ DefaultLookup stored = this.lookupdConnections.putIfAbsent(lookupAddr, lookupd);
if (stored != null){
return;
}
- lookupdScheduler.scheduleAtFixedRate(new BasicLookupdJob(addr, this), 30, 30, SECONDS);
+ lookupdScheduler.scheduleAtFixedRate(new ReaderLookupJob(lookupAddr, this), 0, 30, SECONDS);
}
public String toString(){
@@ -153,7 +153,7 @@ public String getTopic() {
return topic;
}
- public ConcurrentHashMap getLookupdConnections() {
+ public ConcurrentHashMap getLookupdConnections() {
return lookupdConnections;
}
diff --git a/src/main/java/ly/bit/nsq/example/ExampleProducer.java b/src/main/java/ly/bit/nsq/example/ExampleProducer.java
index 3e6aa07..52e8570 100644
--- a/src/main/java/ly/bit/nsq/example/ExampleProducer.java
+++ b/src/main/java/ly/bit/nsq/example/ExampleProducer.java
@@ -6,13 +6,14 @@
public class ExampleProducer {
public static void main(String... args){
- NSQProducer producer = new NSQProducer("http://127.0.0.1:4151", "testTopic");
+ NSQProducer producer = new NSQProducer("http://127.0.0.1:4151");
+ String topic = "testTopic";
for(int i=0; i<100; i++) {
try {
String message = "{\"foo\":\"bar\"}";
System.out.println("Sending: " + message);
- producer.put(message);
+ producer.put(message, topic);
Thread.sleep(1000);
} catch (NSQException e) {
e.printStackTrace();
diff --git a/src/main/java/ly/bit/nsq/example/PrintReader.java b/src/main/java/ly/bit/nsq/example/PrintReader.java
index db8835e..746ecd4 100644
--- a/src/main/java/ly/bit/nsq/example/PrintReader.java
+++ b/src/main/java/ly/bit/nsq/example/PrintReader.java
@@ -11,7 +11,7 @@
import ly.bit.nsq.Message;
import ly.bit.nsq.exceptions.NSQException;
-import ly.bit.nsq.lookupd.BasicLookupd;
+import ly.bit.nsq.lookupd.DefaultLookup;
import ly.bit.nsq.syncresponse.SyncResponseHandler;
import ly.bit.nsq.syncresponse.SyncResponseReader;
@@ -31,7 +31,7 @@ public static void main(String... args){
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
- reader.addLookupd(new BasicLookupd("http://127.0.0.1:4161"));
+ reader.addLookupd(new DefaultLookup("http://127.0.0.1:4161"));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
diff --git a/src/main/java/ly/bit/nsq/exceptions/NSQException.java b/src/main/java/ly/bit/nsq/exceptions/NSQException.java
index 94f8304..874e085 100644
--- a/src/main/java/ly/bit/nsq/exceptions/NSQException.java
+++ b/src/main/java/ly/bit/nsq/exceptions/NSQException.java
@@ -1,6 +1,7 @@
package ly.bit.nsq.exceptions;
public class NSQException extends Exception {
+ private static final long serialVersionUID = 7412096973790239373L;
public NSQException(Throwable t) {
super(t);
diff --git a/src/main/java/ly/bit/nsq/exceptions/RequeueWithoutBackoff.java b/src/main/java/ly/bit/nsq/exceptions/RequeueWithoutBackoff.java
index dcd836b..0542970 100644
--- a/src/main/java/ly/bit/nsq/exceptions/RequeueWithoutBackoff.java
+++ b/src/main/java/ly/bit/nsq/exceptions/RequeueWithoutBackoff.java
@@ -8,6 +8,7 @@
*
*/
public class RequeueWithoutBackoff extends NSQException {
+ private static final long serialVersionUID = 7482757109732670199L;
public RequeueWithoutBackoff(Throwable t) {
super(t);
diff --git a/src/main/java/ly/bit/nsq/lookupd/AbstractLookupd.java b/src/main/java/ly/bit/nsq/lookupd/AbstractLookupd.java
deleted file mode 100644
index 2e2f76c..0000000
--- a/src/main/java/ly/bit/nsq/lookupd/AbstractLookupd.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package ly.bit.nsq.lookupd;
-
-import java.io.IOException;
-import java.io.Reader;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public abstract class AbstractLookupd {
- private static final Logger log = LoggerFactory.getLogger(AbstractLookupd.class);
-
- protected String addr;
-
- public String getAddr() {
- return addr;
- }
-
- /**
- * This should handle making a request to lookupd, and returning which producers match the channel we want
- * Netty presumably can wait on the future or something, who knows...
- */
- public abstract List query(String topic);
-
- public static List parseResponseForProducers(Reader response){
- ObjectMapper mapper = new ObjectMapper();
- List outputs = new ArrayList();
- try {
- JsonNode rootNode = mapper.readTree(response);
- JsonNode producers = rootNode.path("data").path("producers");
- Iterator prodItr = producers.getElements();
- while(prodItr.hasNext()){
- JsonNode producer = prodItr.next();
- String addr = producer.path("broadcast_address").getTextValue();
- if ( addr == null ) { // We're keeping previous field compatibility, just in case
- addr = producer.path("address").getTextValue();
- }
- int tcpPort = producer.path("tcp_port").getIntValue();
- outputs.add(addr + ":" + tcpPort);
- }
- } catch (JsonParseException e) {
- log.error("Error parsing json from lookupd:", e);
- } catch (JsonMappingException e) {
- log.error("Error mapping json from lookupd:", e);
- } catch (IOException e) {
- log.error("Error reading response from lookupd:", e);
- }
- return outputs;
- }
-
- public static void main(String... args){
- String response = "{\"status_code\":200,\"status_txt\":\"OK\",\"data\":{\"channels\":[\"social_graph_input\"],\"producers\":[{\"address\":\"dev.bitly.org\",\"tcp_port\":4150,\"http_port\":4151,\"version\":\"0.2.16-alpha\"}]}}";
-// for (String addr : parseResponseForProducers(response)){
-// System.out.println(addr);
-// }
- }
-
-}
diff --git a/src/main/java/ly/bit/nsq/lookupd/BasicLookupd.java b/src/main/java/ly/bit/nsq/lookupd/BasicLookupd.java
deleted file mode 100644
index edc66bc..0000000
--- a/src/main/java/ly/bit/nsq/lookupd/BasicLookupd.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package ly.bit.nsq.lookupd;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.LinkedList;
-import java.util.List;
-
-public class BasicLookupd extends AbstractLookupd {
- private static final Logger log = LoggerFactory.getLogger(AbstractLookupd.class);
-
- @Override
- public List query(String topic) {
- String urlString = this.addr + "/lookup?topic=" + topic;
- URL url = null;
- try {
- url = new URL(urlString);
- InputStream is = url.openStream();
- BufferedReader br = new BufferedReader(new InputStreamReader(is));
- return parseResponseForProducers(br);
- } catch (MalformedURLException e) {
- log.error("Malformed Lookupd URL: {}", urlString);
- } catch (IOException e) {
- log.error("Problem reading lookupd response: ", e);
- }
- return new LinkedList();
- }
-
- public BasicLookupd(String addr){
- this.addr = addr;
- }
-
-}
diff --git a/src/main/java/ly/bit/nsq/lookupd/DefaultLookup.java b/src/main/java/ly/bit/nsq/lookupd/DefaultLookup.java
new file mode 100644
index 0000000..f8ba225
--- /dev/null
+++ b/src/main/java/ly/bit/nsq/lookupd/DefaultLookup.java
@@ -0,0 +1,143 @@
+package ly.bit.nsq.lookupd;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import ly.bit.nsq.util.StringUtils;
+
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DefaultLookup {
+ private static final Logger log = LoggerFactory.getLogger(DefaultLookup.class);
+ private static final int DEFAULT_CONNECT_TIME_OUT = 5000;
+ private static final int DEFAULT_READ_TIME_OUT = 5000;
+ private String lookupAddr;
+
+ public DefaultLookup(String lookupAddr) {
+ this.lookupAddr = StringUtils.trimRight("/", lookupAddr);
+ }
+
+ public String getLookupAddr() {
+ return lookupAddr;
+ }
+
+ public List getTcpAddrs(String topic) {
+ Map> allAddr = getLookup(topic);
+ if (allAddr == null) return null;
+ return allAddr.get("tcp");
+ }
+
+ public List getHttpAddrs(String topic) {
+ Map> allAddr = getLookup(topic);
+ if (allAddr == null) return null;
+ return allAddr.get("http");
+ }
+
+ public List getHttpAddrs() {
+ Map> allAddr = getNodes();
+ if (allAddr == null) return null;
+ return allAddr.get("http");
+ }
+
+ public String getAvailableHttpAddr(String topic) {
+ List allHttpAddr = getHttpAddrs(topic);
+ if (allHttpAddr == null || allHttpAddr.size() < 1) return null;
+ return allHttpAddr.get(0);
+ }
+
+ public String getAvailableHttpAddr() {
+ List allHttpAddr = getHttpAddrs();
+ if (allHttpAddr == null || allHttpAddr.size() < 1) return null;
+ return allHttpAddr.get(0);
+ }
+
+ private Map> getLookup(String topic) {
+ String lookupUrl = lookupAddr + "/lookup?topic=" + topic;
+ return parse(httpReader(lookupUrl));
+ }
+
+ private Map> getNodes() {
+ String nodesUrl = lookupAddr + "/nodes";
+ return parse(httpReader(nodesUrl));
+ }
+
+ private Map> parse(Reader reader) {
+ if (reader == null) return null;
+
+ Map> allAvailableAddr = new HashMap>();
+ ObjectMapper mapper = new ObjectMapper();
+ try {
+ JsonNode rootNode = mapper.readTree(reader);
+ int statusCode = rootNode.path("status_code").getIntValue();
+ if (statusCode != 200) return null;
+
+ JsonNode producers = rootNode.path("data").path("producers");
+ List tcpProducers = new ArrayList();
+ List httpProducers = new ArrayList();
+ Iterator prodItr = producers.getElements();
+ while(prodItr.hasNext()){
+ JsonNode producer = prodItr.next();
+ String addr = producer.path("broadcast_address").getTextValue();
+ if (addr == null) { // We're keeping previous field compatibility, just in case
+ addr = producer.path("address").getTextValue();
+ }
+ int tcpPort = producer.path("tcp_port").getIntValue();
+ int httpPort = producer.path("http_port").getIntValue();
+ tcpProducers.add(addr + ":" + tcpPort);
+ httpProducers.add("http://" + addr + ":" + httpPort);
+ }
+ allAvailableAddr.put("tcp", tcpProducers);
+ allAvailableAddr.put("http", httpProducers);
+ return allAvailableAddr;
+ } catch (JsonParseException e) {
+ log.error("Error parsing json from lookupd:", e);
+ } catch (JsonMappingException e) {
+ log.error("Error mapping json from lookupd:", e);
+ } catch (IOException e) {
+ log.error("Error reading response from lookupd:", e);
+ } finally {
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ log.error("lookup close reader error:", e);
+ }
+ }
+ }
+ return null;
+ }
+
+ private Reader httpReader(String uri) {
+ try {
+ URL url = new URL(uri);
+ URLConnection conn = url.openConnection();
+ conn.setConnectTimeout(DEFAULT_CONNECT_TIME_OUT);
+ conn.setReadTimeout(DEFAULT_READ_TIME_OUT);
+ InputStream is = url.openStream();
+ BufferedReader br = new BufferedReader(new InputStreamReader(is));
+ return br;
+ } catch (MalformedURLException e) {
+ log.error("Malformed Lookupd URL: {}", uri);
+ } catch (IOException e) {
+ log.error("Problem reading lookupd response: ", e);
+ }
+ return null;
+ }
+}
diff --git a/src/main/java/ly/bit/nsq/lookupd/BasicLookupdJob.java b/src/main/java/ly/bit/nsq/lookupd/ReaderLookupJob.java
similarity index 65%
rename from src/main/java/ly/bit/nsq/lookupd/BasicLookupdJob.java
rename to src/main/java/ly/bit/nsq/lookupd/ReaderLookupJob.java
index 1bb4eba..2298c92 100644
--- a/src/main/java/ly/bit/nsq/lookupd/BasicLookupdJob.java
+++ b/src/main/java/ly/bit/nsq/lookupd/ReaderLookupJob.java
@@ -8,21 +8,21 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BasicLookupdJob implements Runnable {
- private static final Logger log = LoggerFactory.getLogger(BasicLookupdJob.class);
+public class ReaderLookupJob implements Runnable {
+ private static final Logger log = LoggerFactory.getLogger(ReaderLookupJob.class);
final private String lookupdAddress;
final private NSQReader reader;
- public BasicLookupdJob(String lookupdAddress, NSQReader reader) {
+ public ReaderLookupJob(String lookupdAddress, NSQReader reader) {
this.lookupdAddress = lookupdAddress;
this.reader = reader;
}
@Override
public void run() {
- Map lookupdConnections = reader.getLookupdConnections();
- AbstractLookupd lookupd = lookupdConnections.get(lookupdAddress);
- List producers = lookupd.query(reader.getTopic());
+ Map lookupdConnections = reader.getLookupdConnections();
+ DefaultLookup lookupd = lookupdConnections.get(lookupdAddress);
+ List producers = lookupd.getTcpAddrs(reader.getTopic());
for(String producer : producers) {
String[] components = producer.split(":");
String nsqdAddress = components[0];
diff --git a/src/main/java/ly/bit/nsq/syncresponse/SyncResponseReader.java b/src/main/java/ly/bit/nsq/syncresponse/SyncResponseReader.java
index a071dc8..6def40c 100644
--- a/src/main/java/ly/bit/nsq/syncresponse/SyncResponseReader.java
+++ b/src/main/java/ly/bit/nsq/syncresponse/SyncResponseReader.java
@@ -3,8 +3,6 @@
import ly.bit.nsq.Message;
import ly.bit.nsq.NSQReader;
import ly.bit.nsq.exceptions.RequeueWithoutBackoff;
-import ly.bit.nsq.lookupd.AbstractLookupd;
-import ly.bit.nsq.lookupd.BasicLookupd;
public class SyncResponseReader extends NSQReader {
diff --git a/src/main/java/ly/bit/nsq/util/StringUtils.java b/src/main/java/ly/bit/nsq/util/StringUtils.java
new file mode 100644
index 0000000..7773976
--- /dev/null
+++ b/src/main/java/ly/bit/nsq/util/StringUtils.java
@@ -0,0 +1,18 @@
+package ly.bit.nsq.util;
+
+public class StringUtils {
+ public static String trimRight(String trimStr, String sourceStr) {
+ if (trimStr == null || sourceStr == null
+ || trimStr.equals("") || sourceStr.equals("")) return sourceStr;
+
+ int len = sourceStr.length();
+ if (sourceStr.lastIndexOf(trimStr) == (len-1)) return sourceStr.substring(0, len-1);
+ return sourceStr;
+ }
+
+ public static boolean isBlank(String str) {
+ if (str == null) return true;
+ if (str.trim().equals("")) return true;
+ return false;
+ }
+}
diff --git a/src/test/java/ly/bit/nsq/NSQProducerTest.java b/src/test/java/ly/bit/nsq/NSQProducerTest.java
index f120ded..6b3461e 100644
--- a/src/test/java/ly/bit/nsq/NSQProducerTest.java
+++ b/src/test/java/ly/bit/nsq/NSQProducerTest.java
@@ -9,7 +9,6 @@
import org.apache.http.*;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.message.BasicStatusLine;
import org.junit.After;
import org.junit.Before;
@@ -26,15 +25,16 @@
public class NSQProducerTest {
private Logger log = LoggerFactory.getLogger(NSQProducerTest.class);
NSQProducer producer;
+ String host = "http://127.0.0.1:4151";
String topic = "andy_wuz_ere";
HttpClient mockClient;
@Before
public void setUp() {
mockClient = mock(HttpClient.class);
- producer = new NSQProducer(topic);
+ producer = new NSQProducer(host);
producer.httpclient = mockClient;
- assertEquals("http://127.0.0.1:4151/put?topic=" + topic, producer.getUrl());
+ assertEquals("http://127.0.0.1:4151/put?topic=" + topic, producer.getUrl(topic));
}
@After
@@ -49,7 +49,7 @@ public void testPut_success() throws Exception {
StatusLine statusLine = new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), HttpStatus.SC_OK, "OK");
when(successResponse.getStatusLine()).thenReturn(statusLine);
when(mockClient.execute(any(HttpPost.class))).thenReturn(successResponse);
- producer.put("{foo:\"bar\"}");
+ producer.put("{foo:\"bar\"}", topic);
}
@Test(expected = NSQException.class)
@@ -60,7 +60,7 @@ public void testPut_error() throws Exception {
"OH NOES, NSQD CRASHED");
when(errorResponse.getStatusLine()).thenReturn(statusLine);
when(mockClient.execute(any(HttpPost.class))).thenReturn(errorResponse);
- producer.put("{foo:\"bar\"}");
+ producer.put("{foo:\"bar\"}", topic);
}
@Test
@@ -70,7 +70,7 @@ public void testPutAsync_success() throws Exception {
StatusLine statusLine = new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), HttpStatus.SC_OK, "OK");
when(successResponse.getStatusLine()).thenReturn(statusLine);
when(mockClient.execute(any(HttpPost.class))).thenReturn(successResponse);
- FutureTask future = producer.putAsync("{foo:\"bar\"}");
+ FutureTask future = producer.putAsync("{foo:\"bar\"}", topic);
future.get();
}
@@ -82,7 +82,7 @@ public void testPutAsync_error() throws Exception {
"OH NOES, NSQD CRASHED");
when(errorResponse.getStatusLine()).thenReturn(statusLine);
when(mockClient.execute(any(HttpPost.class))).thenReturn(errorResponse);
- FutureTask future = producer.putAsync("{foo:\"bar\"}");
+ FutureTask future = producer.putAsync("{foo:\"bar\"}", topic);
future.get();
}
@@ -93,7 +93,7 @@ public void testShutdown() throws Exception {
StatusLine statusLine = new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), HttpStatus.SC_OK, "OK");
when(successResponse.getStatusLine()).thenReturn(statusLine);
when(mockClient.execute(any(HttpPost.class))).thenReturn(successResponse);
- FutureTask future = producer.putAsync("{foo:\"bar\"}");
+ FutureTask future = producer.putAsync("{foo:\"bar\"}", topic);
producer.shutdown();
assertTrue(producer.executor.isShutdown());