diff --git a/docker-compose.yaml b/docker-compose.yaml index 9789af2..e4e76ec 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -223,16 +223,16 @@ services: command: |- bash --login -c ' source /run/secrets/credentials - rabbitmqctl --node rabbit@${HTTP_HOST_NAME_RABBIT} list_users + rabbitmqctl --node rabbit@$${HTTP_HOST_NAME_RABBIT} list_users # create admin - rabbitmqctl --node rabbit@${HTTP_HOST_NAME_RABBIT} delete_user $${HTTP_ADMIN_USER_RABBIT} 2> /dev/null - rabbitmqctl --node rabbit@${HTTP_HOST_NAME_RABBIT} add_user $${HTTP_ADMIN_USER_RABBIT} $${HTTP_ADMIN_PASSWORD_RABBIT} - rabbitmqctl --node rabbit@${HTTP_HOST_NAME_RABBIT} set_permissions -p / $${HTTP_ADMIN_USER_RABBIT} ".*" ".*" ".*" - rabbitmqctl --node rabbit@${HTTP_HOST_NAME_RABBIT} set_user_tags $${HTTP_ADMIN_USER_RABBIT} administrator + rabbitmqctl --node rabbit@$${HTTP_HOST_NAME_RABBIT} delete_user $${HTTP_ADMIN_USER_RABBIT} 2> /dev/null + rabbitmqctl --node rabbit@$${HTTP_HOST_NAME_RABBIT} add_user $${HTTP_ADMIN_USER_RABBIT} $${HTTP_ADMIN_PASSWORD_RABBIT} + rabbitmqctl --node rabbit@$${HTTP_HOST_NAME_RABBIT} set_permissions -p / $${HTTP_ADMIN_USER_RABBIT} ".*" ".*" ".*" + rabbitmqctl --node rabbit@$${HTTP_HOST_NAME_RABBIT} set_user_tags $${HTTP_ADMIN_USER_RABBIT} administrator # create guest - rabbitmqctl --node rabbit@${HTTP_HOST_NAME_RABBIT} delete_user $${HTTP_GUEST_USER_RABBIT} 2> /dev/null - rabbitmqctl --node rabbit@${HTTP_HOST_NAME_RABBIT} add_user $${HTTP_GUEST_USER_RABBIT} $${HTTP_GUEST_PASSWORD_RABBIT} - rabbitmqctl --node rabbit@${HTTP_HOST_NAME_RABBIT} set_permissions -p / $${HTTP_GUEST_USER_RABBIT} ".*" ".*" ".*" + rabbitmqctl --node rabbit@$${HTTP_HOST_NAME_RABBIT} delete_user $${HTTP_GUEST_USER_RABBIT} 2> /dev/null + rabbitmqctl --node rabbit@$${HTTP_HOST_NAME_RABBIT} add_user $${HTTP_GUEST_USER_RABBIT} $${HTTP_GUEST_PASSWORD_RABBIT} + rabbitmqctl --node rabbit@$${HTTP_HOST_NAME_RABBIT} set_permissions -p / $${HTTP_GUEST_USER_RABBIT} ".*" ".*" ".*" ' diff --git a/src/models/datasources/any.py b/src/_core/utils/any.py similarity index 62% rename from src/models/datasources/any.py rename to src/_core/utils/any.py index 1838c12..2ca13ae 100644 --- a/src/models/datasources/any.py +++ b/src/_core/utils/any.py @@ -5,7 +5,6 @@ # IMPORTS # ---------------------------------------------------------------- -import json from typing import Any from pydantic import BaseModel @@ -21,7 +20,6 @@ "AnyDataFrame", "AnyDictionary", "AnyEntity", - "serialise_any_element", ] # ---------------------------------------------------------------- @@ -36,6 +34,7 @@ class AnyEntity(BaseModel): model_config = ConfigDict( use_enum_values=True, + arbitrary_types_allowed=True, ) value: Any @@ -50,6 +49,7 @@ class AnyDictionary(BaseModel): extra="allow", populate_by_name=True, use_enum_values=True, + arbitrary_types_allowed=True, ) @@ -60,6 +60,7 @@ class AnyArray(RootModel[list[BaseModel]]): model_config = ConfigDict( use_enum_values=True, + arbitrary_types_allowed=True, ) root: list[Any] @@ -73,42 +74,6 @@ class AnyDataFrame(RootModel[list[AnyDictionary]]): model_config = ConfigDict( populate_by_name=True, use_enum_values=True, + arbitrary_types_allowed=True, ) root: list[AnyDictionary] - - -# ---------------------------------------------------------------- -# METHODS -# ---------------------------------------------------------------- - - -def serialise_any_element(x: Any, /) -> bytes: - """ - Uses pydantic classes to as cleanly as possible JSON-serialise any element. - """ - match x: - case list(): - x = AnyArray(root=x) - - case dict(): - x = AnyDictionary(root=x) - - if isinstance(x, BaseModel): - contents = x.model_dump_json( - by_alias=True, - exclude_none=True, - exclude_unset=False, - exclude_defaults=False, - warnings="none", - ).encode() - return contents - - try: - contents = json.dumps(x).encode() - return contents - - except Exception as _: - pass - - contents = str(x).encode() - return contents diff --git a/src/_core/utils/serialise.py b/src/_core/utils/serialise.py new file mode 100644 index 0000000..9541556 --- /dev/null +++ b/src/_core/utils/serialise.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# ---------------------------------------------------------------- +# IMPORTS +# ---------------------------------------------------------------- + +import json +from datetime import datetime +from typing import Any + +from pydantic import BaseModel +from safetywrap import Err +from safetywrap import Ok +from safetywrap import Result + +from .any import * + +# ---------------------------------------------------------------- +# EXPORTS +# ---------------------------------------------------------------- + +__all__ = [ + "JSON_TYPE", + "JSON_TYPE_BASIC", + "serialise_any_as_object", + "serialise_any_as_text", +] + +# ---------------------------------------------------------------- +# TYPES +# ---------------------------------------------------------------- + +# NOTE: although datetime is not a primitive JSON type it can be used in code +JSON_TYPE_BASIC = None | bool | str | int | float | datetime +JSON_TYPE = JSON_TYPE_BASIC | list["JSON_TYPE"] | dict[str, "JSON_TYPE"] + +# ---------------------------------------------------------------- +# METHODS +# ---------------------------------------------------------------- + + +def serialise_any_as_object(x: Any, /) -> Result[JSON_TYPE, None]: + """ + Converts any element to a JSON-serialisable object. + + NOTE: uses safety wrapping + """ + # preprocess bytes + all complex JSON-types + match x: + case bytes(): + obj = x.decode() + + case list(): + obj = AnyArray(root=x) + + case dict(): + obj = AnyDictionary(root=x) + + case _: + obj = x + + match obj: + # cover base JSON-types + datetime + case None | bool() | str() | int() | float() | datetime(): + return Ok(obj) + + # cover all complex JSON-types - use pydantic's serialisation + case BaseModel(): + obj = obj.model_dump( + mode="json", + by_alias=True, + exclude_none=True, + exclude_unset=False, + exclude_defaults=False, + warnings="none", + ) + + # NOTE: collapse "root" if result is {"root": ...} + if isinstance(obj, dict) and list(obj.keys()) == ["root"]: + obj = obj.get("root", obj) + + return Ok(obj) + + raise Err(None) + + +def serialise_any_as_text(x: Any, /) -> Result[str, None]: + """ + Serialises any element to a JSON text. + + NOTE: uses safety wrapping + """ + obj = serialise_any_as_object(x) + if isinstance(obj, Err): + return Err(None) + + obj = obj.unwrap() + + # attempt JSON-serialisation + try: + text = json.dumps( + obj, + skipkeys=False, + ensure_ascii=False, + allow_nan=True, + sort_keys=False, + ) + return Ok(text) + + except Exception as _: + pass + + # otherwise attempt to convert original object to string + try: + text = str(x) + return Ok(text) + + except Exception as _: + pass + + # otherwise fail + return Err(None) diff --git a/src/app/endpoints_fastapi/decorators.py b/src/app/endpoints_fastapi/decorators.py index 58edb5a..e67aa77 100644 --- a/src/app/endpoints_fastapi/decorators.py +++ b/src/app/endpoints_fastapi/decorators.py @@ -6,7 +6,6 @@ # ---------------------------------------------------------------- import logging -from datetime import datetime from functools import wraps from typing import Any from typing import Awaitable @@ -25,8 +24,9 @@ from ..._core.constants import * from ..._core.logging import * +from ..._core.utils.any import * +from ..._core.utils.serialise import * from ...guards.http import * -from ...models.datasources import * from ...models.filesmanager import * from ...models.internal import * from ...setup import * @@ -136,37 +136,8 @@ async def wrapped_action( case Ok(): result = result.unwrap() - # pre-handle jsonisable objects - match result: - case list(): - result = AnyArray(root=result) - - case dict(): - result = AnyDictionary(root=result) - # serialise result - match result: - case None: - contents = None - - case BaseModel(): - contents = result.model_dump( - mode="json", - by_alias=True, - exclude_none=True, - exclude_unset=False, - exclude_defaults=False, - warnings="none", - ) - - case bool() | str() | int() | float() | datetime(): - contents = result - - case bytes(): - contents = result.decode() - - case None: - contents = None + contents = serialise_any_as_object(result).unwrap_or(None) # fmt: skip # prepare response response = JSONResponse(contents, status_code=code) diff --git a/src/features/feat_searchfs/feature.py b/src/features/feat_searchfs/feature.py index 3aeeb89..550c051 100644 --- a/src/features/feat_searchfs/feature.py +++ b/src/features/feat_searchfs/feature.py @@ -5,7 +5,6 @@ # IMPORTS # ---------------------------------------------------------------- -import json from datetime import datetime from datetime import timedelta from functools import partial @@ -13,6 +12,7 @@ from pika.adapters.blocking_connection import BlockingChannel from ..._core.logging import * +from ..._core.utils.serialise import * from ..._core.utils.time import * from ...algorithms.filesmanager import * from ...models.apis.queue import * @@ -89,12 +89,12 @@ def feature( "path": subpath, "filename": filename, } - contents = json.dumps(body).encode() + contents = serialise_any_as_text(body).unwrap_or("") chan.basic_publish( exchange=msg_exchange, routing_key=msg_route, body=contents, - properties=RABBIG_LOG_LEVEL_INFO, + properties=RABBIT_LOG_LEVEL_INFO, ) return diff --git a/src/features/feat_searchfs/superfeature.py b/src/features/feat_searchfs/superfeature.py index 2fb00fe..0fd9518 100644 --- a/src/features/feat_searchfs/superfeature.py +++ b/src/features/feat_searchfs/superfeature.py @@ -11,10 +11,10 @@ from safetywrap import Ok from safetywrap import Result +from ..._core.utils.serialise import * from ..._core.utils.time import * from ...models.apis.queue import * from ...models.application import * -from ...models.datasources import * from ...models.internal.errors import * from ...setup import * from .feature import * @@ -35,7 +35,7 @@ def superfeature( tasks: list[RequestTask], /, -) -> Result[str, list[str]]: +) -> Result[str, list[JSON_TYPE]]: """ Calls `SEARCH-FS` features for a list of tasks """ @@ -51,7 +51,7 @@ def superfeature( Establish connection to message queue """ - with ChannelContext(settings=settings) as chan: + with ChannelContext(settings) as chan: # FIXME: publication to exchages fails when msg_exchange is not "" # chan.exchange_declare(exchange=msg_exchange, exchange_type="direct") @@ -60,7 +60,7 @@ def superfeature( msg_route = f"[{feat.value}].[{task.label}]" # ensure case has its own route and that it is cleared - chan.queue_declare(queue=msg_route) + chan.queue_declare(queue=msg_route, durable=False, exclusive=False) if task.options.reset_queue: chan.queue_purge(queue=msg_route) @@ -89,8 +89,8 @@ def superfeature( "data": err.data, } errors.append(body) - contents = serialise_any_element(body) - chan.basic_publish(exchange=msg_exchange, routing_key=msg_route, body=contents, properties=RABBIG_LOG_LEVEL_ERROR) # fmt: skip + contents = serialise_any_as_text(body).unwrap_or("") + chan.basic_publish(exchange=msg_exchange, routing_key=msg_route, body=contents, properties=RABBIT_LOG_LEVEL_ERROR) # fmt: skip except Exception as err: msg = str(err) @@ -103,8 +103,8 @@ def superfeature( }, } errors.append(body) - contents = serialise_any_element(body) - chan.basic_publish(exchange=msg_exchange, routing_key=msg_route, body=contents, properties=RABBIG_LOG_LEVEL_ERROR) # fmt: skip + contents = serialise_any_as_text(body).unwrap_or("") + chan.basic_publish(exchange=msg_exchange, routing_key=msg_route, body=contents, properties=RABBIT_LOG_LEVEL_ERROR) # fmt: skip except BaseException as err: # DEV-NOTE: pass on all other kinds of exceptions @@ -117,8 +117,8 @@ def superfeature( }, } errors.append(body) - contents = serialise_any_element(body) - chan.basic_publish(exchange=msg_exchange, routing_key=msg_route, body=contents, properties=RABBIG_LOG_LEVEL_ERROR) # fmt: skip + contents = serialise_any_as_text(body).unwrap_or("") + chan.basic_publish(exchange=msg_exchange, routing_key=msg_route, body=contents, properties=RABBIT_LOG_LEVEL_ERROR) # fmt: skip raise err """ diff --git a/src/models/apis/queue/__init__.py b/src/models/apis/queue/__init__.py index 57f17f5..79bc888 100644 --- a/src/models/apis/queue/__init__.py +++ b/src/models/apis/queue/__init__.py @@ -13,8 +13,11 @@ # ---------------------------------------------------------------- __all__ = [ - "RABBIG_LOG_LEVEL_ERROR", - "RABBIG_LOG_LEVEL_INFO", - "RABBIG_LOG_LEVEL_WARNING", + "RABBIT_LOG_LEVEL_ERROR", + "RABBIT_LOG_LEVEL_INFO", + "RABBIT_LOG_LEVEL_WARNING", + "RABBIT_ROUTE_ERROR", + "RABBIT_ROUTE_INFO", + "RABBIT_ROUTE_WARNING", "ChannelContext", ] diff --git a/src/models/apis/queue/channels.py b/src/models/apis/queue/channels.py index 8960964..63efeae 100644 --- a/src/models/apis/queue/channels.py +++ b/src/models/apis/queue/channels.py @@ -6,14 +6,12 @@ # ---------------------------------------------------------------- import logging +from contextlib import contextmanager +from typing import Generator from pika import BlockingConnection from pika import ConnectionParameters from pika.adapters.blocking_connection import BlockingChannel -from pydantic import BaseModel -from pydantic import ConfigDict -from pydantic import Field -from pydantic import SkipValidation # ---------------------------------------------------------------- # EXPORTS @@ -28,38 +26,17 @@ # ---------------------------------------------------------------- -class ChannelStruct(BaseModel): - """ - A struct which contains settings + dynamically determined properties of channel - """ - - model_config = ConfigDict( - populate_by_name=True, - arbitrary_types_allowed=True, - ) - - settings: SkipValidation[ConnectionParameters] - connection: SkipValidation[BlockingConnection] | None = Field(default=None, init=False) - channel: SkipValidation[BlockingChannel] | None = Field(default=None, init=False) - - -class ChannelContext(ChannelStruct): +@contextmanager +def ChannelContext(settings: ConnectionParameters, /) -> Generator[BlockingChannel]: """ Provides a Channel as a context manager """ - - def __enter__(self) -> BlockingChannel: - self.connection = BlockingConnection(self.settings) - self.channel = self.connection.channel() - return self.channel - - def __exit__(self, *_, **__): - logging.info("gracefully terminating channel") - - if isinstance(self.channel, BlockingChannel): - self.channel.close() - - if isinstance(self.connection, BlockingConnection): - self.connection.close() - - return + with BlockingConnection(settings) as connection: + try: + chan = connection.channel() + yield chan + + finally: + # DEV-NOTE: this is carried out regardless of (base)exceptions - which are rethrown + logging.info("gracefully terminating channel") + chan.close() diff --git a/src/models/apis/queue/logging.py b/src/models/apis/queue/logging.py index cc2afdf..a4efe91 100644 --- a/src/models/apis/queue/logging.py +++ b/src/models/apis/queue/logging.py @@ -12,15 +12,21 @@ # ---------------------------------------------------------------- __all__ = [ - "RABBIG_LOG_LEVEL_ERROR", - "RABBIG_LOG_LEVEL_INFO", - "RABBIG_LOG_LEVEL_WARNING", + "RABBIT_LOG_LEVEL_ERROR", + "RABBIT_LOG_LEVEL_INFO", + "RABBIT_LOG_LEVEL_WARNING", + "RABBIT_ROUTE_ERROR", + "RABBIT_ROUTE_INFO", + "RABBIT_ROUTE_WARNING", ] # ---------------------------------------------------------------- # CONSTANTS # ---------------------------------------------------------------- -RABBIG_LOG_LEVEL_INFO = BasicProperties(type="info", priority=1) -RABBIG_LOG_LEVEL_WARNING = BasicProperties(type="warning", priority=10) -RABBIG_LOG_LEVEL_ERROR = BasicProperties(type="error", priority=100) +RABBIT_ROUTE_INFO = "[INFO]" +RABBIT_ROUTE_ERROR = "[ERROR]" +RABBIT_ROUTE_WARNING = "[WARNING]" +RABBIT_LOG_LEVEL_INFO = BasicProperties(type="info", priority=1) +RABBIT_LOG_LEVEL_WARNING = BasicProperties(type="warning", priority=10) +RABBIT_LOG_LEVEL_ERROR = BasicProperties(type="error", priority=100) diff --git a/src/models/datasources/__init__.py b/src/models/datasources/__init__.py deleted file mode 100644 index c2e5368..0000000 --- a/src/models/datasources/__init__.py +++ /dev/null @@ -1,20 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -# ---------------------------------------------------------------- -# IMPORTS -# ---------------------------------------------------------------- - -from .any import * - -# ---------------------------------------------------------------- -# EXPORTS -# ---------------------------------------------------------------- - -__all__ = [ - "AnyArray", - "AnyDataFrame", - "AnyDictionary", - "AnyEntity", - "serialise_any_element", -] diff --git a/src/models/internal/errors.py b/src/models/internal/errors.py index 3a10b40..1270229 100644 --- a/src/models/internal/errors.py +++ b/src/models/internal/errors.py @@ -10,14 +10,14 @@ from typing import Generic from typing import ParamSpec from typing import TypeVar -from typing import Union + +from ..._core.utils.serialise import * # ---------------------------------------------------------------- # EXPORTS # ---------------------------------------------------------------- __all__ = [ - "JSON_TYPE", "NOTES", "ExceptionWithData", "convert_notes_to_exception", @@ -28,8 +28,6 @@ # ---------------------------------------------------------------- PARAMS = ParamSpec("PARAMS") -JSON_TYPE_BASIC = Union[None, bool, str, int, float] -JSON_TYPE = Union[JSON_TYPE_BASIC, list[JSON_TYPE_BASIC], dict[str, JSON_TYPE_BASIC]] NOTES = TypeVar("NOTES", bound=JSON_TYPE) # ---------------------------------------------------------------- diff --git a/src/queries/_console/api.py b/src/queries/_console/api.py index e6bcd2a..7a1ef1c 100644 --- a/src/queries/_console/api.py +++ b/src/queries/_console/api.py @@ -47,7 +47,6 @@ def create_parser(self) -> ArgumentParser: nargs="?", type=str, help="path to files for logging", - default="logs", ) parser.add_argument( "--verbose", diff --git a/src/queries/_console/cli.py b/src/queries/_console/cli.py index 526441e..929ce0e 100644 --- a/src/queries/_console/cli.py +++ b/src/queries/_console/cli.py @@ -67,7 +67,6 @@ def create_parser(self) -> ArgumentParser: nargs="?", type=str, help="path to files for logging", - default="logs", ) parser.add_argument( "--verbose", diff --git a/src/queries/environment/mode.py b/src/queries/environment/mode.py index 057d713..0a85542 100644 --- a/src/queries/environment/mode.py +++ b/src/queries/environment/mode.py @@ -26,7 +26,10 @@ def get_path_logs( path: str, env: dict[str, str], # end decorator args - default: str = "logs", -) -> str: - value = env.get("PATH_LOGS", default) +) -> str | None: + """ + Returns logging path set in environment + """ + # NOTE: ensure that even the empty string is converted to null + value = env.get("PATH_LOGS") or None return value diff --git a/src/setup/config.py b/src/setup/config.py index 1713b32..d33e731 100644 --- a/src/setup/config.py +++ b/src/setup/config.py @@ -41,7 +41,7 @@ pid = ContextVar[int]("pid") # fmt: skip path_env = ContextVar[str]("path env", default=".env") # fmt: skip -path_logging = Property[str](label="path logging", factory=lambda: get_path_logs(path_env.get())) # fmt: skip +path_logging = Property[str | None](label="path logging", factory=lambda: get_path_logs(path_env.get())) # fmt: skip path_config = Property[str](label="path application config", factory=lambda: get_root_path("setup", "config.yaml")) # fmt: skip path_requests = Property[str](label="path user requests", factory=lambda: get_root_path("setup", "requests.yaml")) # fmt: skip