From 0db92c36af6755c68fdeea0de882429c710bad95 Mon Sep 17 00:00:00 2001 From: mbrenguier Date: Wed, 14 Feb 2018 18:26:06 +0100 Subject: [PATCH 1/6] refs #74795 - adding disturb starter --- Library/Client/DisturbStarter.php | 44 +++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 Library/Client/DisturbStarter.php diff --git a/Library/Client/DisturbStarter.php b/Library/Client/DisturbStarter.php new file mode 100644 index 0000000..686048e --- /dev/null +++ b/Library/Client/DisturbStarter.php @@ -0,0 +1,44 @@ + $workflowId, + 'type' => Message\MessageDto::TYPE_WF_CTRL, + 'action' => 'start', + 'payload' => $payloadHash + ]; + //send message with givens params + $kafkaProducer = new \RdKafka\Producer(); + $kafkaProducer->addBrokers($brokers); + $kafkaTopic = $kafkaProducer->newTopic($topicName); + error_log($topicName); + error_log(print_r($messageHash,1)); + $kafkaTopic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($messageHash)); + } +} \ No newline at end of file From 15c51e987fddc7bcd350926d8066c393ac834a1b Mon Sep 17 00:00:00 2001 From: mbrenguier Date: Wed, 14 Feb 2018 18:36:44 +0100 Subject: [PATCH 2/6] remove logs --- Library/Client/DisturbStarter.php | 2 -- 1 file changed, 2 deletions(-) diff --git a/Library/Client/DisturbStarter.php b/Library/Client/DisturbStarter.php index 686048e..b646e24 100644 --- a/Library/Client/DisturbStarter.php +++ b/Library/Client/DisturbStarter.php @@ -37,8 +37,6 @@ public static function start(string $workflowId, array $payloadHash, string $bro $kafkaProducer = new \RdKafka\Producer(); $kafkaProducer->addBrokers($brokers); $kafkaTopic = $kafkaProducer->newTopic($topicName); - error_log($topicName); - error_log(print_r($messageHash,1)); $kafkaTopic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($messageHash)); } } \ No newline at end of file From 4e4a83471d9b8720c824416409f604a059419238 Mon Sep 17 00:00:00 2001 From: mbrenguier Date: Thu, 15 Feb 2018 15:36:30 +0100 Subject: [PATCH 3/6] add get status command --- Library/Client/Command.php | 49 +++++++++++++++++++++++++++++++ Library/Client/DisturbStarter.php | 42 -------------------------- bin/configCommand.php | 22 ++++++++++++++ 3 files changed, 71 insertions(+), 42 deletions(-) create mode 100644 Library/Client/Command.php delete mode 100644 Library/Client/DisturbStarter.php create mode 100644 bin/configCommand.php diff --git a/Library/Client/Command.php b/Library/Client/Command.php new file mode 100644 index 0000000..935eef4 --- /dev/null +++ b/Library/Client/Command.php @@ -0,0 +1,49 @@ + $workflowProcessId, + 'type' => Message\MessageDto::TYPE_WF_CTRL, + 'action' => 'start', + 'payload' => $payloadHash + ]; + //send message with givens params + $kafkaProducer = new \RdKafka\Producer(); + $kafkaProducer->addBrokers($brokers); + $kafkaTopic = $kafkaProducer->newTopic($topicName); + $kafkaTopic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($messageHash)); + } + + /** + * Get status for a specified workflow process id + * + * @param string $workflowProcessId workflow process id + * @param string $workflowConfigFilePath workflow config file path + */ + public static function getStatus(string $workflowProcessId, string $workflowConfigFilePath) + { + $workflowConfigDto = WorkflowConfigDtoFactory::get($workflowConfigFilePath); + $contextStorage = new Context\ContextStorageService($workflowConfigDto); + return $contextStorage->get($workflowProcessId)->getWorkflowStatus(); + } +} \ No newline at end of file diff --git a/Library/Client/DisturbStarter.php b/Library/Client/DisturbStarter.php deleted file mode 100644 index b646e24..0000000 --- a/Library/Client/DisturbStarter.php +++ /dev/null @@ -1,42 +0,0 @@ - $workflowId, - 'type' => Message\MessageDto::TYPE_WF_CTRL, - 'action' => 'start', - 'payload' => $payloadHash - ]; - //send message with givens params - $kafkaProducer = new \RdKafka\Producer(); - $kafkaProducer->addBrokers($brokers); - $kafkaTopic = $kafkaProducer->newTopic($topicName); - $kafkaTopic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($messageHash)); - } -} \ No newline at end of file diff --git a/bin/configCommand.php b/bin/configCommand.php new file mode 100644 index 0000000..90279f2 --- /dev/null +++ b/bin/configCommand.php @@ -0,0 +1,22 @@ +registerNamespaces( +[ +'Vpg\Disturb' => realpath(__DIR__ . '/../Library/') +], +true +); + +$loader->registerFiles([ +__DIR__ . '/../../vendor/autoload.php', +__DIR__ . '/../../../autoload.php' +]); +$loader->register(); + +require_once(__DIR__ . '/../Library/Core/DI.php'); + +$di->setShared('loader', $loader); From 46f50864f0940a51f613b16cae37670306838ab2 Mon Sep 17 00:00:00 2001 From: mbrenguier Date: Thu, 15 Feb 2018 15:47:14 +0100 Subject: [PATCH 4/6] fixing doc --- Library/Client/Command.php | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/Library/Client/Command.php b/Library/Client/Command.php index 935eef4..8d03263 100644 --- a/Library/Client/Command.php +++ b/Library/Client/Command.php @@ -8,6 +8,13 @@ include realpath(__DIR__ . '/../../bin/configCommand.php'); +/** + * Class Disturb Client Command + * + * @package Disturb\Client + * @author Maxime BRENGUIER + * @license https://github.com/vpg/disturb/blob/master/LICENSE MIT Licence + */ class Command extends Component { /** @@ -18,6 +25,7 @@ class Command extends Component * @param String $brokers broker list * @param String $topicName topic name * + * @return void */ public static function start(string $workflowProcessId, array $payloadHash, string $brokers, string $topicName) { @@ -39,6 +47,8 @@ public static function start(string $workflowProcessId, array $payloadHash, stri * * @param string $workflowProcessId workflow process id * @param string $workflowConfigFilePath workflow config file path + * + * @return string */ public static function getStatus(string $workflowProcessId, string $workflowConfigFilePath) { From 1b5816a1e782c7b649c554f95dc7d665dbae6b8f Mon Sep 17 00:00:00 2001 From: mbrenguier Date: Tue, 6 Mar 2018 12:17:52 +0100 Subject: [PATCH 5/6] fix workflow --- Library/Workflow/ManagerService.php | 4 +++- Library/Workflow/ManagerWorker.php | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/Library/Workflow/ManagerService.php b/Library/Workflow/ManagerService.php index f771f47..66a9870 100644 --- a/Library/Workflow/ManagerService.php +++ b/Library/Workflow/ManagerService.php @@ -214,9 +214,11 @@ public function getNextStepList(string $workflowProcessId) : array */ public function hasNextStep(string $workflowProcessId) : bool { - $this->di->get('logr')->debug(json_encode(func_get_args())); + $this->di->get('logr')->info(json_encode(func_get_args())); $contextDto = $this->di->get('contextStorage')->get($workflowProcessId); + $nextStepPos = $contextDto->getWorkflowCurrentPosition() + 1; + return !empty($this->workflowConfig->getStepList()[$nextStepPos]); } diff --git a/Library/Workflow/ManagerWorker.php b/Library/Workflow/ManagerWorker.php index e17f179..77fccf0 100644 --- a/Library/Workflow/ManagerWorker.php +++ b/Library/Workflow/ManagerWorker.php @@ -141,12 +141,14 @@ protected function processMessage(Message\MessageDto $messageDto) break; case ManagerService::STATUS_SUCCESS: // go to next step if there is a next step and while the current step has no job - while ($hasNext = $this->workflowManagerService->hasNextStep($messageDto->getId()) && + while (($hasNext = $this->workflowManagerService->hasNextStep($messageDto->getId())) && false == $this->runNextStep($messageDto->getId()) ) { $this->getDI()->get('logr')->warning("Current step skipped"); }; + + if (!$hasNext) { $this->workflowManagerService->finalize( $messageDto->getId(), From 5d9431928473b0a39547b4e8a1e1ff7ec1058a28 Mon Sep 17 00:00:00 2001 From: mbrenguier Date: Wed, 7 Mar 2018 17:18:06 +0100 Subject: [PATCH 6/6] add get context command --- Library/Client/Command.php | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/Library/Client/Command.php b/Library/Client/Command.php index 8d03263..e78d9d8 100644 --- a/Library/Client/Command.php +++ b/Library/Client/Command.php @@ -56,4 +56,19 @@ public static function getStatus(string $workflowProcessId, string $workflowConf $contextStorage = new Context\ContextStorageService($workflowConfigDto); return $contextStorage->get($workflowProcessId)->getWorkflowStatus(); } + + /** + * Get context for a specified workflow process id + * + * @param string $workflowProcessId workflow process id + * @param string $workflowConfigFilePath workflow config file path + * + * @return array + */ + public static function getContext(string $workflowProcessId, string $workflowConfigFilePath) + { + $workflowConfigDto = WorkflowConfigDtoFactory::get($workflowConfigFilePath); + $contextStorage = new Context\ContextStorageService($workflowConfigDto); + return $contextStorage->get($workflowProcessId)->getStepResultData(); + } } \ No newline at end of file