diff --git a/celery-java/pom.xml b/celery-java/pom.xml index 4b186f8..7d3c2a1 100644 --- a/celery-java/pom.xml +++ b/celery-java/pom.xml @@ -7,7 +7,9 @@ 4.0.0 + org.sedlakovi.celery celery-java + 1.3-SNAPSHOT jar Celery-Java @@ -137,4 +139,22 @@ + + 1.8 + 1.8 + + + + + + spring-releases + https://repo.spring.io/libs-release + + + + + spring-releases + https://repo.spring.io/libs-release + + diff --git a/celery-java/src/main/java/com/geneea/celery/Celery.java b/celery-java/src/main/java/com/geneea/celery/Celery.java index 7413a9a..acbcb92 100644 --- a/celery-java/src/main/java/com/geneea/celery/Celery.java +++ b/celery-java/src/main/java/com/geneea/celery/Celery.java @@ -3,8 +3,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; +import com.geneea.celery.backends.rabbit.RabbitResultConsumer; +import com.geneea.celery.brokers.rabbit.RabbitBroker; import com.google.common.base.Joiner; import com.google.common.base.Suppliers; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; import lombok.Builder; import lombok.extern.java.Log; import com.geneea.celery.backends.CeleryBackends; @@ -36,12 +41,12 @@ public class Celery { private final ObjectMapper jsonMapper = new ObjectMapper(); private final String queue; - // Memoized suppliers help us to deal with a connection that can't be established yet. It may fail several times + // Memorized suppliers help us to deal with a connection that can't be established yet. It may fail several times // with an exception but when it succeeds, it then always returns the same instance. // // This is tailored for the RabbitMQ connections - they fail to be created if the host can't be reached but they // can heal automatically. If other brokers/backends don't work this way, we might need to rework it. - private final Supplier> resultsProvider; + public final Supplier> resultsProvider; private final Supplier broker; /** @@ -49,13 +54,15 @@ public class Celery { * * @param brokerUri connection to broker that will dispatch messages * @param backendUri connection to backend providing responses + * @param maxPriority the max priority of the queue if any, otherwise set to zero * @param queue routing tag (specifies into which Rabbit queue the messages will go) */ @Builder private Celery(final String brokerUri, @Nullable final String queue, @Nullable final String backendUri, - @Nullable final ExecutorService executor) { + @Nullable final ExecutorService executor, + Optional maxPriority) { this.queue = queue == null ? "celery" : queue; ExecutorService executorService = executor != null ? executor : Executors.newCachedThreadPool(); @@ -63,7 +70,13 @@ private Celery(final String brokerUri, broker = Suppliers.memoize(() -> { Broker b = CeleryBrokers.createBroker(brokerUri, executorService); try { - b.declareQueue(Celery.this.queue); + if( maxPriority.isPresent()){ + b.declareQueue(Celery.this.queue, maxPriority.get()); + } + else { + b.declareQueue(Celery.this.queue); + } + } catch (IOException e) { throw new RuntimeException(e); } @@ -95,6 +108,30 @@ private String getLocalHostName() { } } + + public Connection getBrokerConnection(){ + try{ + RabbitBroker b = (RabbitBroker)broker.get(); + Connection con = b.getChannel().getConnection(); + return con; + }catch (Exception ex){ + System.out.println(String.format("Can not get celery broker connection with ex:%s", ex.toString())); + return null; + } + } + + public Connection getBackendConnection(){ + try{ + RabbitResultConsumer df = (RabbitResultConsumer)resultsProvider.get().get() ; + + Connection conn = df.getChannel().getConnection(); + return conn; + }catch (Exception ex){ + System.out.println(String.format("Can not get celery backend connection with ex:%s", ex.toString())); + return null; + } + } + /** * Submit a Java task for processing. You'll probably not need to call this method. rather use @{@link CeleryTask} * annotation. @@ -110,6 +147,22 @@ public AsyncResult submit(Class taskClass, String method, Object[] args) t return submit(taskClass.getName() + "#" + method, args); } + /** + * Submit a Java task for processing with priority. You'll probably not need to call this method. rather use @{@link CeleryTask} + * annotation. + * + * @param taskClass task implementing class + * @param method method in {@code taskClass} that does the work + * @param priority the priority of the task + * @param args positional arguments for the method (need to be JSON serializable) + * @return asynchronous result + * + * @throws IOException if the message couldn't be sent + */ + public AsyncResult submit(Class taskClass, String method, int priority, Object[] args) throws IOException { + return submit(taskClass.getName() + "#" + method, priority, args); + } + /** * Submit a task by name. A low level method for submitting arbitrary tasks that don't have their proxies * generated by @{@link CeleryTask} annotation. @@ -142,6 +195,62 @@ public AsyncResult submit(String name, Object[] args) throws IOException { .putNull("errbacks"); Message message = broker.get().newMessage(); + + message.setBody(jsonMapper.writeValueAsBytes(payload)); + message.setContentEncoding("utf-8"); + message.setContentType("application/json"); + + Message.Headers headers = message.getHeaders(); + headers.setId(taskId); + headers.setTaskName(name); + headers.setArgsRepr("(" + Joiner.on(", ").join(args) + ")"); + headers.setOrigin(clientName); + if (rp.isPresent()) { + headers.setReplyTo(clientId); + } + + message.send(queue); + + Future result; + if (rp.isPresent()) { + result = rp.get().getResult(taskId); + } else { + result = CompletableFuture.completedFuture(null); + } + return new AsyncResultImpl<>(result, taskId); + } + + /** + * Submit a task by name with priority. + * + * @param name task name as understood by the worker + * @param priority the priority of the message + * @param args positional arguments for the method (need to be JSON serializable) + * @return asynchronous result + * @throws IOException + */ + public AsyncResult submit(String name, int priority, Object[] args) throws IOException { + // Get the provider early to increase the chance to find out there is a connection problem before actually + // sending the message. + // + // This will help for example in the case when the connection can't be established at all. The connection may + // still drop after sending the message but there isn't much we can do about it. + Optional rp = resultsProvider.get(); + String taskId = UUID.randomUUID().toString(); + + ArrayNode payload = jsonMapper.createArrayNode(); + ArrayNode argsArr = payload.addArray(); + for (Object arg : args) { + argsArr.addPOJO(arg); + } + payload.addObject(); + payload.addObject() + .putNull("callbacks") + .putNull("chain") + .putNull("chord") + .putNull("errbacks"); + + Message message = broker.get().newMessage(priority); message.setBody(jsonMapper.writeValueAsBytes(payload)); message.setContentEncoding("utf-8"); message.setContentType("application/json"); @@ -163,23 +272,31 @@ public AsyncResult submit(String name, Object[] args) throws IOException { } else { result = CompletableFuture.completedFuture(null); } - return new AsyncResultImpl<>(result); + return new AsyncResultImpl<>(result, taskId); } public interface AsyncResult { boolean isDone(); T get() throws ExecutionException, InterruptedException; + + String getTaskId(); } private class AsyncResultImpl implements AsyncResult { private final Future future; + private String taskId; AsyncResultImpl(Future future) { this.future = future; } + AsyncResultImpl(Future future, String taskId) { + this.future = future; + this.taskId = taskId; + } + @Override public boolean isDone() { return future.isDone(); @@ -189,5 +306,13 @@ public boolean isDone() { public T get() throws ExecutionException, InterruptedException { return future.get(); } + + public String getTaskId(){ + if(taskId != null) { + return taskId; + }else { + return ""; + } + } } } diff --git a/celery-java/src/main/java/com/geneea/celery/backends/rabbit/RabbitResultConsumer.java b/celery-java/src/main/java/com/geneea/celery/backends/rabbit/RabbitResultConsumer.java index 5853588..846dde9 100644 --- a/celery-java/src/main/java/com/geneea/celery/backends/rabbit/RabbitResultConsumer.java +++ b/celery-java/src/main/java/com/geneea/celery/backends/rabbit/RabbitResultConsumer.java @@ -18,7 +18,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; -class RabbitResultConsumer extends DefaultConsumer implements RabbitBackend.ResultsProvider { +public class RabbitResultConsumer extends DefaultConsumer implements RabbitBackend.ResultsProvider { private final LoadingCache> tasks = CacheBuilder diff --git a/celery-java/src/main/java/com/geneea/celery/brokers/rabbit/RabbitBroker.java b/celery-java/src/main/java/com/geneea/celery/brokers/rabbit/RabbitBroker.java index 69f5613..0064e31 100644 --- a/celery-java/src/main/java/com/geneea/celery/brokers/rabbit/RabbitBroker.java +++ b/celery-java/src/main/java/com/geneea/celery/brokers/rabbit/RabbitBroker.java @@ -10,7 +10,7 @@ import java.util.HashMap; import java.util.Map; -class RabbitBroker implements Broker { +public class RabbitBroker implements Broker { private final Channel channel; public RabbitBroker(Channel channel) { @@ -22,18 +22,45 @@ public void declareQueue(String name) throws IOException { channel.queueDeclare(name, true, false, false, null); } + @Override + public void declareQueue(String name, int maxPriority) throws IOException { + Map props = new HashMap<>(); + props.put("x-max-priority", maxPriority); + channel.queueDeclare(name, true, false, false, props); + } + @Override public Message newMessage() { return new RabbitMessage(); } + public Channel getChannel() { + return channel; + } + + @Override + public Message newMessage(int priority) { + return new RabbitMessage(priority); + } + class RabbitMessage implements Message { private byte[] body; - private final AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder() - .deliveryMode(2) - .priority(0); + private final AMQP.BasicProperties.Builder props; + private final RabbitMessageHeaders headers = new RabbitMessageHeaders(); + public RabbitMessage(){ + props = new AMQP.BasicProperties.Builder() + .deliveryMode(2) + .priority(0); + } + + public RabbitMessage(int priority){ + props = new AMQP.BasicProperties.Builder() + .deliveryMode(2) + .priority(priority); + } + @Override public void setBody(byte[] body) { this.body = body; diff --git a/celery-java/src/main/java/com/geneea/celery/spi/Broker.java b/celery-java/src/main/java/com/geneea/celery/spi/Broker.java index cedd66f..5483ceb 100644 --- a/celery-java/src/main/java/com/geneea/celery/spi/Broker.java +++ b/celery-java/src/main/java/com/geneea/celery/spi/Broker.java @@ -20,8 +20,21 @@ public interface Broker { */ void declareQueue(String name) throws IOException; + /** + * @param name queue name + * @param maxPriority the max priority of the queue with priority + * @throws IOException + */ + void declareQueue(String name, int maxPriority) throws IOException; + /** * @return message that can be constructed and later sent */ Message newMessage(); + + /** + * @param priority the priority of the message that is executed + * @return message that can be constructed and later sent + */ + Message newMessage(int priority); } diff --git a/celery-java/src/test/groovy/com/geneea/celery/MockBrokerFactory.groovy b/celery-java/src/test/groovy/com/geneea/celery/MockBrokerFactory.groovy index 5e08aa7..b6b7ad6 100644 --- a/celery-java/src/test/groovy/com/geneea/celery/MockBrokerFactory.groovy +++ b/celery-java/src/test/groovy/com/geneea/celery/MockBrokerFactory.groovy @@ -31,12 +31,31 @@ public class MockBrokerFactory implements BrokerFactory { queuesDeclared.add(name) } + /** + * @param name queue name + * @param maxPriority the max priority of the queue with priority + * @throws IOException + */ + @Override + void declareQueue(String name, int maxPriority) throws IOException { + + } + @Override Message newMessage() { def message = messages[messageNum % messages.size()] messageNum++ return message } + + /** + * @param priority the priority of the message that is executed + * @return message that can be constructed and later sent + */ + @Override + Message newMessage(int priority) { + return null + } } } } diff --git a/celery-java/src/test/java/TestCases.java b/celery-java/src/test/java/TestCases.java new file mode 100644 index 0000000..a98bb03 --- /dev/null +++ b/celery-java/src/test/java/TestCases.java @@ -0,0 +1,38 @@ +import com.geneea.celery.Celery; +import org.junit.Test; + +import java.util.Optional; + +/** + * Created by stone on 18-3-7. + */ +public class TestCases { + + @Test + public void testPriority(){ + try{ + Optional p = Optional.of(10); + + Celery client = Celery.builder() + .queue("test_idata1") + .brokerUri("amqp://guest:123456@10.12.6.19:5672/10.12.6.19") + .backendUri("rpc://guest:123456@10.12.6.19:5672/10.12.6.19") + .maxPriority(p) + .build(); + + for(int i=0 ;i <100 ; i++){ + Celery.AsyncResult a = client.submit("app_tasks.add", i % 10, new Object[]{i % 10, 0}); + while(!a.isDone()){ + Thread.sleep(100); + } + + + System.out.println(a.getTaskId() + "~~~~ result is :"+ a.get()); + } + + }catch (Exception ex){ + + } + + } +} diff --git a/examples/pom.xml b/examples/pom.xml index 9eda9be..c731fb0 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -61,6 +61,11 @@ 1.2.1 test + + org.springframework + spring-core + RELEASE + diff --git a/examples/src/main/java/com/geneea/celery/examples/Main.java b/examples/src/main/java/com/geneea/celery/examples/Main.java index 3448f1a..2403653 100644 --- a/examples/src/main/java/com/geneea/celery/examples/Main.java +++ b/examples/src/main/java/com/geneea/celery/examples/Main.java @@ -6,6 +6,7 @@ import com.geneea.celery.Celery; import com.geneea.celery.CeleryWorker; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -13,32 +14,51 @@ public class Main { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); - factory.setHost("localhost"); + //factory.setHost("localhost"); ExecutorService executor = Executors.newCachedThreadPool(); - Connection connection = factory.newConnection(executor); + factory.setUri("amqp://guest:123456@10.12.6.19:5672/10.12.6.19"); + factory.setUsername("guest"); + factory.setPassword("123456"); + factory.setVirtualHost("10.12.6.19"); - CeleryWorker worker = CeleryWorker.create("celery", connection); + Connection connection = factory.newConnection(); + + connection.close(); + + //CeleryWorker worker = CeleryWorker.create("celery", connection); Celery client = Celery.builder() - .brokerUri("amqp://localhost/%2F") - .backendUri("rpc://localhost/%2F") + .queue("test_idata1") + .brokerUri("amqp://guest:123456@10.12.6.19:5672/10.12.6.19") + .backendUri("rpc://guest:123456@10.12.6.19:5672/10.12.6.19") + .maxPriority(Optional.of(10)) .build(); try { - for (int i = 0; i < 20; i++) { + /*for (int i = 0; i < 20; i++) { Stopwatch sw = Stopwatch.createStarted(); Integer result = TestTaskProxy.with(client).sum(1, i).get(); System.out.printf("CeleryTask #%d's result was: %s. The task took %s end-to-end.\n", i, result, sw); - } + }*/ + + //System.out.println("Testing result of void task: " + TestVoidTaskProxy.with(client).run(1, 2).get()); + - System.out.println("Testing result of void task: " + TestVoidTaskProxy.with(client).run(1, 2).get()); + String re="hh"; System.out.println("Testing task that should fail and throw exception:"); - client.submit(TestTask.class, "sum", new Object[]{"a", "b"}).get(); + for(int i=0;i <10000; i++){ + Celery.AsyncResult t=client.submit(TestTask.class, "sum", new Object[]{re, 'd'}); + System.out.println( String.format("current is %s with id :%s", i, t.getTaskId())); + + //boolean res= executor.isShutdown(); + } + } finally { - connection.close(); - worker.close(); - worker.join(); - executor.shutdown(); + + Connection cb= client.getBackendConnection(); + Connection cbr = client.getBrokerConnection(); + cb.close(); + cbr.close(); } // The worker threads hang waiting for the messages for some reason for quite a long time but eventually,