diff --git a/.dockerignore b/.dockerignore index 7497d22..fc54bca 100644 --- a/.dockerignore +++ b/.dockerignore @@ -54,9 +54,3 @@ **/__archive__* **/__pycache__* /build - -# ---------------------------------------------------------------- -# FORCE RETAIN -# ---------------------------------------------------------------- - -!**/.gitkeep diff --git a/README.md b/README.md index fca58d1..007dc33 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ # Example Rabbit MQ # -This repository provides an example implementation of a tool with a single feature `SEARCH-FILESYSTEM`, +This repository provides an example implementation of a tool with a single feature `SEARCH-FS`, which upon performs the following: - given a request payload; @@ -29,6 +29,10 @@ which upon performs the following: - [docker + CLI tools](https://docs.docker.com/engine/install) +- (optional) [Postman](https://www.postman.com). + Cf. the [wiki](https://github.com/raj-open/example-rabbit-mq/wiki/Postman-Setup) + for a prepared environment + collection. + NOTE: We primarily use Docker for local testing, in particular to spin up a Rabbit MQ server. ## Basic setup ## @@ -192,6 +196,7 @@ Fill in `setup/requests.yaml` as follows: label: 'Mock example' options: + reset-queue: true # default is false - whether to clear (sub)queue for task at start of run # skip-empty: true # false (default) => includes empty files; true => skips them max-depth: 100 # limits depth of folder structure max-items: 1_000_000 # limits number of items that can be logged diff --git a/demo/README.md b/demo/README.md index 80a666a..ac356fe 100644 --- a/demo/README.md +++ b/demo/README.md @@ -25,14 +25,44 @@ For ease of use one can also run the demos as follows: where `{name}` is the name of the subfolder, e.g. `"example-case-1"`. -> [!TIP] -> Call -> -> ```bash -> just demo "example-all" -> ``` -> -> to run all the examples. 
+ > [!TIP] + > Call one of + > + > ```bash + > just demo "example-all" + > just demos # equivalent command + > ``` + > + > to run all the examples. + +## Alternative via FastApi ## + +In step 2 one can alternatively start the FastApi server +(via `just start-server` or `just docker-start-server`) +and send a POST-request to the endpoint `/feature/search-fs` +with JSON-body e.g. + +```json +{ + "ref": { + "location": "OS", + "path": "demo/example-case-1/requests.yaml" + } +} +``` + +to run `SEARCH-FS` against `example-case-1` or + +```json +{ + "ref": { + "location": "OS", + "path": "demo/example-all/requests.yaml" + } +} +``` + +to run `SEARCH-FS` against all cases. ## Results ## diff --git a/demo/example-case-1/requests.yaml b/demo/example-case-1/requests.yaml index 574de72..c490bbd 100644 --- a/demo/example-case-1/requests.yaml +++ b/demo/example-case-1/requests.yaml @@ -1,6 +1,7 @@ label: 'Demo Case 1' options: + reset-queue: true skip-empty: true max-depth: 100 max-items: 10_000_000 diff --git a/demo/example-case-2/requests.yaml b/demo/example-case-2/requests.yaml index c0b4231..529298c 100644 --- a/demo/example-case-2/requests.yaml +++ b/demo/example-case-2/requests.yaml @@ -1,6 +1,7 @@ label: 'Demo Case 2' options: + reset-queue: true skip-empty: false max-depth: 100 max-items: 10_000_000 diff --git a/demo/example-case-3/requests.yaml b/demo/example-case-3/requests.yaml index f4ca6bc..283f68e 100644 --- a/demo/example-case-3/requests.yaml +++ b/demo/example-case-3/requests.yaml @@ -1,6 +1,7 @@ label: 'Demo Case 3' options: + reset-queue: true skip-empty: true max-depth: 100 max-items: 10_000_000 diff --git a/demo/example-case-4/requests.yaml b/demo/example-case-4/requests.yaml index 4cdaccc..7595ebe 100644 --- a/demo/example-case-4/requests.yaml +++ b/demo/example-case-4/requests.yaml @@ -1,6 +1,7 @@ label: 'Demo Case 4' options: + reset-queue: true skip-empty: false max-depth: 100 max-items: 10_000_000 diff --git a/demo/example-empty/requests.yaml 
b/demo/example-empty/requests.yaml index 75de3f9..e8b027d 100644 --- a/demo/example-empty/requests.yaml +++ b/demo/example-empty/requests.yaml @@ -1,6 +1,7 @@ label: 'Demo Empty' options: + reset-queue: true skip-empty: true max-depth: 100 max-items: 10_000_000 diff --git a/demo/example-flat/requests.yaml b/demo/example-flat/requests.yaml index 9aa5b9a..892661f 100644 --- a/demo/example-flat/requests.yaml +++ b/demo/example-flat/requests.yaml @@ -1,6 +1,7 @@ label: 'Demo Flat' options: + reset-queue: true skip-empty: true max-depth: 100 max-items: 10_000_000 diff --git a/docker-compose.yaml b/docker-compose.yaml index 3319762..9789af2 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -17,7 +17,7 @@ secrets: services: # -------------------------------- - # SERVICE: builds container + # SERVICES: main code base # -------------------------------- base: @@ -44,10 +44,6 @@ services: stdin_open: true user: root - # -------------------------------- - # SERVICE: builds container - # -------------------------------- - build: image: local/examplerabbitmq:build @@ -70,10 +66,6 @@ services: echo "success" ' - # -------------------------------- - # SERVICE: runs qa steps - # -------------------------------- - qa: image: local/examplerabbitmq:build # <- i.e. 
use this image env_file: @@ -93,7 +85,7 @@ services: ' # -------------------------------- - # SERVICE: starts server + # SERVICES: server # -------------------------------- server: @@ -111,16 +103,18 @@ services: - type: bind source: ${PATH_LOGS} target: //home/basicuser/app/logs - - ./setup://home/basicuser/app/setup:ro + - ./demo://home/basicuser/app/demo:ro - ./data://home/basicuser/app/data:rw + - ./setup://home/basicuser/app/setup:ro # # for debugging # volumes: # - type: bind # source: ${PATH_LOGS} # target: //home/basicuser/app/logs - # - ./setup://home/basicuser/app/setup:ro + # - ./demo://home/basicuser/app/demo:ro # - ./data://home/basicuser/app/data:rw + # - ./setup://home/basicuser/app/setup:ro # - ./scripts://home/basicuser/app/scripts:ro # - ./src://home/basicuser/app/src:ro # - ./tests://home/basicuser/app/tests:ro @@ -160,7 +154,7 @@ services: ' # -------------------------------- - # SERVICE: queue + # SERVICES: queue # -------------------------------- queue-build: diff --git a/docs/models/application/Models/RequestTaskOptions.md b/docs/models/application/Models/RequestTaskOptions.md index e1c492d..b279119 100644 --- a/docs/models/application/Models/RequestTaskOptions.md +++ b/docs/models/application/Models/RequestTaskOptions.md @@ -3,6 +3,7 @@ | Name | Type | Description | Notes | |------------ | ------------- | ------------- | -------------| +| **reset-queue** | **Boolean** | Whether to clear queue before execution | [optional] [default to false] | | **skip-empty** | **Boolean** | Whether to only include non-empty files | [optional] [default to false] | | **max-depth** | **Integer** | Limits the search depth | [optional] [default to 50] | | **max-items** | **Integer** | Limits the amount of items that can be found | [optional] [default to 1000000] | diff --git a/justfile b/justfile index 89c96ec..cdf9013 100644 --- a/justfile +++ b/justfile @@ -300,6 +300,9 @@ create-mocks *args: demo name: @just run SEARCH-FS --requests 
"demo/{{name}}/requests.yaml" +demos: + @just demo "example-all" + # -------------------------------- # TARGETS: terminate execution # -------------------------------- diff --git a/models/schema-application.yaml b/models/schema-application.yaml index 3f22174..4c9bc0c 100644 --- a/models/schema-application.yaml +++ b/models/schema-application.yaml @@ -151,6 +151,11 @@ components: - max-duration additionalProperties: true properties: + reset-queue: + description: |- + Whether to clear queue before execution + type: boolean + default: false skip-empty: description: |- Whether to only include non-empty files diff --git a/src/features/feat_searchfs/feature.py b/src/features/feat_searchfs/feature.py index ccffcfe..3aeeb89 100644 --- a/src/features/feat_searchfs/feature.py +++ b/src/features/feat_searchfs/feature.py @@ -6,22 +6,18 @@ # ---------------------------------------------------------------- import json -import logging from datetime import datetime from datetime import timedelta from functools import partial -from pika import BasicProperties -from safetywrap import Err -from safetywrap import Ok -from safetywrap import Result +from pika.adapters.blocking_connection import BlockingChannel from ..._core.logging import * +from ..._core.utils.time import * from ...algorithms.filesmanager import * from ...models.apis.queue import * from ...models.application import * from ...models.filesmanager import * -from ...models.internal.errors import * from ...setup import * # ---------------------------------------------------------------- @@ -43,19 +39,27 @@ depth=0, ) def feature( + chan: BlockingChannel, + /, *, label: str, ref: FileRef, options: RequestTaskOptions, -) -> Result[str, str]: + msg_exchange: str, + msg_route: str, +): """ Feature `SEARCH-FS` """ - feat = EnumFeatures.SEARCH_FS # NOTE: currently unused # cfg_general = config.parser_config().parse() managers = config.get_managers() + # locate directory in file system + root = ref.path + loc = ref.location + 
manager = managers[loc] + # create guard to safeguard against computational limits guard = partial( guard_limits, @@ -65,68 +69,35 @@ def feature( t_max=datetime.now() + options.max_duration, ) - try: - """ - connect to message queue and perform task - """ - # FIXME: publication to exchages fails - # msg_exchange = feat.value - msg_exchange = "" - msg_route = label - msg_properties = BasicProperties(type="info") - - settings = config.get_queue_parameters() - with ChannelContext(settings=settings) as chan: - # FIXME: publication to exchages fails - # chan.exchange_declare(exchange=msg_exchange, exchange_type="direct") - chan.queue_declare(queue=msg_route) - - # locate directory in file system - root = ref.path - loc = ref.location - manager = managers[loc] - - """ - run search algorithm and apply guards to prevent unlimited search duration - """ - for count, (d, subpath, filename) in enumerate( - # NOTE: algorithm returns a generator - recursive_file_search( - manager, - path=root, - skip_empty=options.skip_empty, - ), - # keep track of number of items found - start=1, - ): - # apply guard - guard(d=d, count=count) - - # if not blocked by guard log to queue - body = {"path": subpath, "filename": filename} - contents = json.dumps(body).encode() - chan.basic_publish( - exchange=msg_exchange, - routing_key=msg_route, - body=contents, - properties=msg_properties, - ) - - return Ok("success") - - except ExceptionWithData as err: - msg = f"task '{label}' failed with error code {err.code or 500} - {err}" # fmt: skip - logging.error(msg) - return Err(msg) - - except Exception as err: - msg = f"task '{label}' failed - {err}" # fmt: skip - logging.error(msg) - return Err(msg) - - except BaseException as err: - # DEV-NOTE: pass on all other kinds of exceptions - raise err + # run search algorithm and apply guards to prevent unlimited search duration + for count, (d, subpath, filename) in enumerate( + # NOTE: algorithm returns a generator + recursive_file_search( + manager, + 
path=root, + skip_empty=options.skip_empty, + ), + # keep track of number of items found + start=1, + ): + # apply guard + guard(d=d, count=count) + + # if not blocked by guard log to queue + body = { + "timestamp": get_datetime_stamp(), + "path": subpath, + "filename": filename, + } + contents = json.dumps(body).encode() + chan.basic_publish( + exchange=msg_exchange, + routing_key=msg_route, + body=contents, + properties=RABBIG_LOG_LEVEL_INFO, + ) + + return # ---------------------------------------------------------------- @@ -147,7 +118,7 @@ def guard_limits( Applies guard clauses to terminate search algorithm if limits are breached """ # terminate if search takes too long - if datetime.now() >= t_max: + if datetime.now() > t_max: raise TimeoutError(f"search algorithm terminated - exceeded maximum tolerated duration of {max_duration}") # fmt: skip # terminate if depth exceeds limits @@ -156,4 +127,4 @@ def guard_limits( # terminate if number of items exceeds limits if count > max_items: - raise Exception(f"search algorithm terminated - item count exceeededs maximum tolerated value of {max_items}") # fmt: skip + raise Exception(f"search algorithm terminated - item count exceeeded maximum tolerated value of {max_items}") # fmt: skip diff --git a/src/features/feat_searchfs/superfeature.py b/src/features/feat_searchfs/superfeature.py index b55470b..2fb00fe 100644 --- a/src/features/feat_searchfs/superfeature.py +++ b/src/features/feat_searchfs/superfeature.py @@ -11,8 +11,12 @@ from safetywrap import Ok from safetywrap import Result -from ..._core.utils.code import * +from ..._core.utils.time import * +from ...models.apis.queue import * from ...models.application import * +from ...models.datasources import * +from ...models.internal.errors import * +from ...setup import * from .feature import * # ---------------------------------------------------------------- @@ -35,18 +39,91 @@ def superfeature( """ Calls `SEARCH-FS` features for a list of tasks """ - errors = 
list[str]() + feat = EnumFeatures.SEARCH_FS + # NOTE: currently unused + # cfg_general = config.parser_config().parse() + errors = list[JSON_TYPE]() + msg_exchange = "" n_tot = len(tasks) + settings = config.get_queue_parameters() - for task in tasks: - result = feature( - label=task.label, - ref=task.data.inputs, - options=task.options, - ) + """ + Establish connection to message queue + """ - if isinstance(result, Err): - errors.append(result.unwrap_err()) + with ChannelContext(settings=settings) as chan: + # FIXME: publication to exchages fails when msg_exchange is not "" + # chan.exchange_declare(exchange=msg_exchange, exchange_type="direct") + + # perform each task an log each case to different route + for task in tasks: + msg_route = f"[{feat.value}].[{task.label}]" + + # ensure case has its own route and that it is cleared + chan.queue_declare(queue=msg_route) + if task.options.reset_queue: + chan.queue_purge(queue=msg_route) + + """ + Run feature with error handling + """ + + try: + feature( + chan, + label=task.label, + ref=task.data.inputs, + options=task.options, + msg_exchange=msg_exchange, + msg_route=msg_route, + ) + + except ExceptionWithData as err: + msg = str(err) + logging.error(msg) + err.add_data("label", task.label) + body = { + "timestamp": get_datetime_stamp(), + "message": str(err), + "code": err.code or 500, + "data": err.data, + } + errors.append(body) + contents = serialise_any_element(body) + chan.basic_publish(exchange=msg_exchange, routing_key=msg_route, body=contents, properties=RABBIG_LOG_LEVEL_ERROR) # fmt: skip + + except Exception as err: + msg = str(err) + logging.error(msg) + body = { + "timestamp": get_datetime_stamp(), + "message": msg, + "data": { + "label": task.label, + }, + } + errors.append(body) + contents = serialise_any_element(body) + chan.basic_publish(exchange=msg_exchange, routing_key=msg_route, body=contents, properties=RABBIG_LOG_LEVEL_ERROR) # fmt: skip + + except BaseException as err: + # DEV-NOTE: pass on all 
other kinds of exceptions + msg = f"task terminated - {err}" + body = { + "timestamp": get_datetime_stamp(), + "message": msg, + "data": { + "label": task.label, + }, + } + errors.append(body) + contents = serialise_any_element(body) + chan.basic_publish(exchange=msg_exchange, routing_key=msg_route, body=contents, properties=RABBIG_LOG_LEVEL_ERROR) # fmt: skip + raise err + + """ + Finally error handling + """ if (n := len(errors)) > 0: match n, n_tot: @@ -62,4 +139,8 @@ def superfeature( return Err(errors) + """ + No errors - all tasks successful + """ + return Ok("success") diff --git a/src/models/apis/queue/__init__.py b/src/models/apis/queue/__init__.py index 3ef62d8..57f17f5 100644 --- a/src/models/apis/queue/__init__.py +++ b/src/models/apis/queue/__init__.py @@ -6,11 +6,15 @@ # ---------------------------------------------------------------- from .channels import * +from .logging import * # ---------------------------------------------------------------- # EXPORTS # ---------------------------------------------------------------- __all__ = [ + "RABBIG_LOG_LEVEL_ERROR", + "RABBIG_LOG_LEVEL_INFO", + "RABBIG_LOG_LEVEL_WARNING", "ChannelContext", ] diff --git a/src/models/apis/queue/logging.py b/src/models/apis/queue/logging.py new file mode 100644 index 0000000..cc2afdf --- /dev/null +++ b/src/models/apis/queue/logging.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# ---------------------------------------------------------------- +# IMPORTS +# ---------------------------------------------------------------- + +from pika import BasicProperties + +# ---------------------------------------------------------------- +# EXPORTS +# ---------------------------------------------------------------- + +__all__ = [ + "RABBIG_LOG_LEVEL_ERROR", + "RABBIG_LOG_LEVEL_INFO", + "RABBIG_LOG_LEVEL_WARNING", +] + +# ---------------------------------------------------------------- +# CONSTANTS +# 
---------------------------------------------------------------- + +RABBIG_LOG_LEVEL_INFO = BasicProperties(type="info", priority=1) +RABBIG_LOG_LEVEL_WARNING = BasicProperties(type="warning", priority=10) +RABBIG_LOG_LEVEL_ERROR = BasicProperties(type="error", priority=100) diff --git a/src/models/datasources/__init__.py b/src/models/datasources/__init__.py index 978ab5c..c2e5368 100644 --- a/src/models/datasources/__init__.py +++ b/src/models/datasources/__init__.py @@ -16,4 +16,5 @@ "AnyDataFrame", "AnyDictionary", "AnyEntity", + "serialise_any_element", ] diff --git a/src/models/datasources/any.py b/src/models/datasources/any.py index a9591ba..1838c12 100644 --- a/src/models/datasources/any.py +++ b/src/models/datasources/any.py @@ -5,6 +5,7 @@ # IMPORTS # ---------------------------------------------------------------- +import json from typing import Any from pydantic import BaseModel @@ -20,10 +21,11 @@ "AnyDataFrame", "AnyDictionary", "AnyEntity", + "serialise_any_element", ] # ---------------------------------------------------------------- -# EXPORTS +# CLASSES # ---------------------------------------------------------------- @@ -73,3 +75,40 @@ class AnyDataFrame(RootModel[list[AnyDictionary]]): use_enum_values=True, ) root: list[AnyDictionary] + + +# ---------------------------------------------------------------- +# METHODS +# ---------------------------------------------------------------- + + +def serialise_any_element(x: Any, /) -> bytes: + """ + Uses pydantic classes to as cleanly as possible JSON-serialise any element. 
+ """ + match x: + case list(): + x = AnyArray(root=x) + + case dict(): + x = AnyDictionary(root=x) + + if isinstance(x, BaseModel): + contents = x.model_dump_json( + by_alias=True, + exclude_none=True, + exclude_unset=False, + exclude_defaults=False, + warnings="none", + ).encode() + return contents + + try: + contents = json.dumps(x).encode() + return contents + + except Exception as _: + pass + + contents = str(x).encode() + return contents diff --git a/src/models/generated/application.py b/src/models/generated/application.py index fcc46d8..85e6884 100644 --- a/src/models/generated/application.py +++ b/src/models/generated/application.py @@ -60,6 +60,11 @@ class RequestTaskOptions(BaseModel): extra="allow", populate_by_name=True, ) + reset_queue: bool = Field( + default=False, + alias="reset-queue", + description="Whether to clear queue before execution", + ) skip_empty: bool = Field( default=False, alias="skip-empty", diff --git a/src/models/internal/errors.py b/src/models/internal/errors.py index 8fe71c2..3a10b40 100644 --- a/src/models/internal/errors.py +++ b/src/models/internal/errors.py @@ -17,6 +17,8 @@ # ---------------------------------------------------------------- __all__ = [ + "JSON_TYPE", + "NOTES", "ExceptionWithData", "convert_notes_to_exception", ] diff --git a/templates/template-requests-multiple.yaml b/templates/template-requests-multiple.yaml index c3ce784..1ac00a7 100644 --- a/templates/template-requests-multiple.yaml +++ b/templates/template-requests-multiple.yaml @@ -4,7 +4,8 @@ - label: 'First task' options: &ref_options - # skip-empty: true # default is false + reset-queue: true # default is false - whether to clear (sub)queue for task at start of run + # skip-empty: true # false (default) => includes empty files; true => skips them max-depth: 100 max-items: 10_000_000 max-duration: 00:30:00 diff --git a/templates/template-requests.yaml b/templates/template-requests.yaml index 859cb7e..64bf59a 100644 --- 
a/templates/template-requests.yaml +++ b/templates/template-requests.yaml @@ -3,7 +3,8 @@ label: 'Some label' # Special options and safeguards options: - # skip-empty: true # default is false + reset-queue: true # default is false - whether to clear (sub)queue for task at start of run + # skip-empty: true # false (default) => includes empty files; true => skips them max-depth: 100 max-items: 10_000_000 max-duration: 00:30:00