diff --git a/Library/Client/Command.php b/Library/Client/Command.php new file mode 100644 index 0000000..e78d9d8 --- /dev/null +++ b/Library/Client/Command.php @@ -0,0 +1,74 @@ + + * @license https://github.com/vpg/disturb/blob/master/LICENSE MIT Licence + */ +class Command extends Component +{ + /** + * Start workflow by sending a message in related topic + * + * @param String $workflowProcessId Workflow id + * @param Array $payloadHash List of params + * @param String $brokers broker list + * @param String $topicName topic name + * + * @return void + */ + public static function start(string $workflowProcessId, array $payloadHash, string $brokers, string $topicName) + { + $messageHash = [ + 'id' => $workflowProcessId, + 'type' => Message\MessageDto::TYPE_WF_CTRL, + 'action' => 'start', + 'payload' => $payloadHash + ]; + //send message with givens params + $kafkaProducer = new \RdKafka\Producer(); + $kafkaProducer->addBrokers($brokers); + $kafkaTopic = $kafkaProducer->newTopic($topicName); + $kafkaTopic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($messageHash)); + } + + /** + * Get status for a specified workflow process id + * + * @param string $workflowProcessId workflow process id + * @param string $workflowConfigFilePath workflow config file path + * + * @return string + */ + public static function getStatus(string $workflowProcessId, string $workflowConfigFilePath) + { + $workflowConfigDto = WorkflowConfigDtoFactory::get($workflowConfigFilePath); + $contextStorage = new Context\ContextStorageService($workflowConfigDto); + return $contextStorage->get($workflowProcessId)->getWorkflowStatus(); + } + + /** + * Get context for a specified workflow process id + * + * @param string $workflowProcessId workflow process id + * @param string $workflowConfigFilePath workflow config file path + * + * @return array + */ + public static function getContext(string $workflowProcessId, string $workflowConfigFilePath) + { + $workflowConfigDto = WorkflowConfigDtoFactory::get($workflowConfigFilePath); + $contextStorage = new Context\ContextStorageService($workflowConfigDto); + return $contextStorage->get($workflowProcessId)->getStepResultData(); + } +} \ No newline at end of file diff --git a/Library/Workflow/ManagerService.php b/Library/Workflow/ManagerService.php index f771f47..66a9870 100644 --- a/Library/Workflow/ManagerService.php +++ b/Library/Workflow/ManagerService.php @@ -214,9 +214,11 @@ public function getNextStepList(string $workflowProcessId) : array */ public function hasNextStep(string $workflowProcessId) : bool { - $this->di->get('logr')->debug(json_encode(func_get_args())); + $this->di->get('logr')->info(json_encode(func_get_args())); $contextDto = $this->di->get('contextStorage')->get($workflowProcessId); + $nextStepPos = $contextDto->getWorkflowCurrentPosition() + 1; + return !empty($this->workflowConfig->getStepList()[$nextStepPos]); } diff --git a/Library/Workflow/ManagerWorker.php b/Library/Workflow/ManagerWorker.php index e17f179..77fccf0 100644 --- a/Library/Workflow/ManagerWorker.php +++ b/Library/Workflow/ManagerWorker.php @@ -141,12 +141,14 @@ protected function processMessage(Message\MessageDto $messageDto) break; case ManagerService::STATUS_SUCCESS: // go to next step if there is a next step and while the current step has no job - while ($hasNext = $this->workflowManagerService->hasNextStep($messageDto->getId()) && + while (($hasNext = $this->workflowManagerService->hasNextStep($messageDto->getId())) && false == $this->runNextStep($messageDto->getId()) ) { $this->getDI()->get('logr')->warning("Current step skipped"); }; + + if (!$hasNext) { $this->workflowManagerService->finalize( $messageDto->getId(), diff --git a/bin/configCommand.php b/bin/configCommand.php new file mode 100644 index 0000000..90279f2 --- /dev/null +++ b/bin/configCommand.php @@ -0,0 +1,22 @@ +registerNamespaces( +[ +'Vpg\Disturb' => realpath(__DIR__ . '/../Library/') +], +true +); + +$loader->registerFiles([ +__DIR__ . '/../../vendor/autoload.php', +__DIR__ . '/../../../autoload.php' +]); +$loader->register(); + +require_once(__DIR__ . '/../Library/Core/DI.php'); + +$di->setShared('loader', $loader);