Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -223,16 +223,16 @@ services:
command: |-
bash --login -c '
source /run/secrets/credentials
rabbitmqctl --node rabbit@${HTTP_HOST_NAME_RABBIT} list_users
rabbitmqctl --node rabbit@$${HTTP_HOST_NAME_RABBIT} list_users

# create admin
rabbitmqctl --node rabbit@${HTTP_HOST_NAME_RABBIT} delete_user $${HTTP_ADMIN_USER_RABBIT} 2> /dev/null
rabbitmqctl --node rabbit@${HTTP_HOST_NAME_RABBIT} add_user $${HTTP_ADMIN_USER_RABBIT} $${HTTP_ADMIN_PASSWORD_RABBIT}
rabbitmqctl --node rabbit@${HTTP_HOST_NAME_RABBIT} set_permissions -p / $${HTTP_ADMIN_USER_RABBIT} ".*" ".*" ".*"
rabbitmqctl --node rabbit@${HTTP_HOST_NAME_RABBIT} set_user_tags $${HTTP_ADMIN_USER_RABBIT} administrator
rabbitmqctl --node rabbit@$${HTTP_HOST_NAME_RABBIT} delete_user $${HTTP_ADMIN_USER_RABBIT} 2> /dev/null
rabbitmqctl --node rabbit@$${HTTP_HOST_NAME_RABBIT} add_user $${HTTP_ADMIN_USER_RABBIT} $${HTTP_ADMIN_PASSWORD_RABBIT}
rabbitmqctl --node rabbit@$${HTTP_HOST_NAME_RABBIT} set_permissions -p / $${HTTP_ADMIN_USER_RABBIT} ".*" ".*" ".*"
rabbitmqctl --node rabbit@$${HTTP_HOST_NAME_RABBIT} set_user_tags $${HTTP_ADMIN_USER_RABBIT} administrator

# create guest
rabbitmqctl --node rabbit@${HTTP_HOST_NAME_RABBIT} delete_user $${HTTP_GUEST_USER_RABBIT} 2> /dev/null
rabbitmqctl --node rabbit@${HTTP_HOST_NAME_RABBIT} add_user $${HTTP_GUEST_USER_RABBIT} $${HTTP_GUEST_PASSWORD_RABBIT}
rabbitmqctl --node rabbit@${HTTP_HOST_NAME_RABBIT} set_permissions -p / $${HTTP_GUEST_USER_RABBIT} ".*" ".*" ".*"
rabbitmqctl --node rabbit@$${HTTP_HOST_NAME_RABBIT} delete_user $${HTTP_GUEST_USER_RABBIT} 2> /dev/null
rabbitmqctl --node rabbit@$${HTTP_HOST_NAME_RABBIT} add_user $${HTTP_GUEST_USER_RABBIT} $${HTTP_GUEST_PASSWORD_RABBIT}
rabbitmqctl --node rabbit@$${HTTP_HOST_NAME_RABBIT} set_permissions -p / $${HTTP_GUEST_USER_RABBIT} ".*" ".*" ".*"
'
43 changes: 4 additions & 39 deletions src/models/datasources/any.py → src/_core/utils/any.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
# IMPORTS
# ----------------------------------------------------------------

import json
from typing import Any

from pydantic import BaseModel
Expand All @@ -21,7 +20,6 @@
"AnyDataFrame",
"AnyDictionary",
"AnyEntity",
"serialise_any_element",
]

# ----------------------------------------------------------------
Expand All @@ -36,6 +34,7 @@ class AnyEntity(BaseModel):

model_config = ConfigDict(
use_enum_values=True,
arbitrary_types_allowed=True,
)

value: Any
Expand All @@ -50,6 +49,7 @@ class AnyDictionary(BaseModel):
extra="allow",
populate_by_name=True,
use_enum_values=True,
arbitrary_types_allowed=True,
)


Expand All @@ -60,6 +60,7 @@ class AnyArray(RootModel[list[BaseModel]]):

model_config = ConfigDict(
use_enum_values=True,
arbitrary_types_allowed=True,
)

root: list[Any]
Expand All @@ -73,42 +74,6 @@ class AnyDataFrame(RootModel[list[AnyDictionary]]):
model_config = ConfigDict(
populate_by_name=True,
use_enum_values=True,
arbitrary_types_allowed=True,
)
root: list[AnyDictionary]


# ----------------------------------------------------------------
# METHODS
# ----------------------------------------------------------------


def serialise_any_element(x: Any, /) -> bytes:
"""
Uses pydantic classes to as cleanly as possible JSON-serialise any element.
"""
match x:
case list():
x = AnyArray(root=x)

case dict():
x = AnyDictionary(root=x)

if isinstance(x, BaseModel):
contents = x.model_dump_json(
by_alias=True,
exclude_none=True,
exclude_unset=False,
exclude_defaults=False,
warnings="none",
).encode()
return contents

try:
contents = json.dumps(x).encode()
return contents

except Exception as _:
pass

contents = str(x).encode()
return contents
123 changes: 123 additions & 0 deletions src/_core/utils/serialise.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# ----------------------------------------------------------------
# IMPORTS
# ----------------------------------------------------------------

import json
from datetime import datetime
from typing import Any

from pydantic import BaseModel
from safetywrap import Err
from safetywrap import Ok
from safetywrap import Result

from .any import *

# ----------------------------------------------------------------
# EXPORTS
# ----------------------------------------------------------------

__all__ = [
"JSON_TYPE",
"JSON_TYPE_BASIC",
"serialise_any_as_object",
"serialise_any_as_text",
]

# ----------------------------------------------------------------
# TYPES
# ----------------------------------------------------------------

# NOTE: although datetime is not a primitive JSON type it can be used in code
JSON_TYPE_BASIC = None | bool | str | int | float | datetime
JSON_TYPE = JSON_TYPE_BASIC | list["JSON_TYPE"] | dict[str, "JSON_TYPE"]

# ----------------------------------------------------------------
# METHODS
# ----------------------------------------------------------------


def serialise_any_as_object(x: Any, /) -> Result[JSON_TYPE, None]:
"""
Converts any element to a JSON-serialisable object.

NOTE: uses safety wrapping
"""
# preprocess bytes + all complex JSON-types
match x:
case bytes():
obj = x.decode()

case list():
obj = AnyArray(root=x)

case dict():
obj = AnyDictionary(root=x)

case _:
obj = x

match obj:
# cover base JSON-types + datetime
case None | bool() | str() | int() | float() | datetime():
return Ok(obj)

# cover all complex JSON-types - use pydantic's serialisation
case BaseModel():
obj = obj.model_dump(
mode="json",
by_alias=True,
exclude_none=True,
exclude_unset=False,
exclude_defaults=False,
warnings="none",
)

# NOTE: collapse "root" if result is {"root": ...}
if isinstance(obj, dict) and list(obj.keys()) == ["root"]:
obj = obj.get("root", obj)

return Ok(obj)

raise Err(None)


def serialise_any_as_text(x: Any, /) -> Result[str, None]:
"""
Serialises any element to a JSON text.

NOTE: uses safety wrapping
"""
obj = serialise_any_as_object(x)
if isinstance(obj, Err):
return Err(None)

obj = obj.unwrap()

# attempt JSON-serialisation
try:
text = json.dumps(
obj,
skipkeys=False,
ensure_ascii=False,
allow_nan=True,
sort_keys=False,
)
return Ok(text)

except Exception as _:
pass

# otherwise attempt to convert original object to string
try:
text = str(x)
return Ok(text)

except Exception as _:
pass

# otherwise fail
return Err(None)
35 changes: 3 additions & 32 deletions src/app/endpoints_fastapi/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
# ----------------------------------------------------------------

import logging
from datetime import datetime
from functools import wraps
from typing import Any
from typing import Awaitable
Expand All @@ -25,8 +24,9 @@

from ..._core.constants import *
from ..._core.logging import *
from ..._core.utils.any import *
from ..._core.utils.serialise import *
from ...guards.http import *
from ...models.datasources import *
from ...models.filesmanager import *
from ...models.internal import *
from ...setup import *
Expand Down Expand Up @@ -136,37 +136,8 @@ async def wrapped_action(
case Ok():
result = result.unwrap()

# pre-handle jsonisable objects
match result:
case list():
result = AnyArray(root=result)

case dict():
result = AnyDictionary(root=result)

# serialise result
match result:
case None:
contents = None

case BaseModel():
contents = result.model_dump(
mode="json",
by_alias=True,
exclude_none=True,
exclude_unset=False,
exclude_defaults=False,
warnings="none",
)

case bool() | str() | int() | float() | datetime():
contents = result

case bytes():
contents = result.decode()

case None:
contents = None
contents = serialise_any_as_object(result).unwrap_or(None) # fmt: skip

# prepare response
response = JSONResponse(contents, status_code=code)
Expand Down
6 changes: 3 additions & 3 deletions src/features/feat_searchfs/feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
# IMPORTS
# ----------------------------------------------------------------

import json
from datetime import datetime
from datetime import timedelta
from functools import partial

from pika.adapters.blocking_connection import BlockingChannel

from ..._core.logging import *
from ..._core.utils.serialise import *
from ..._core.utils.time import *
from ...algorithms.filesmanager import *
from ...models.apis.queue import *
Expand Down Expand Up @@ -89,12 +89,12 @@ def feature(
"path": subpath,
"filename": filename,
}
contents = json.dumps(body).encode()
contents = serialise_any_as_text(body).unwrap_or("")
chan.basic_publish(
exchange=msg_exchange,
routing_key=msg_route,
body=contents,
properties=RABBIG_LOG_LEVEL_INFO,
properties=RABBIT_LOG_LEVEL_INFO,
)

return
Expand Down
20 changes: 10 additions & 10 deletions src/features/feat_searchfs/superfeature.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
from safetywrap import Ok
from safetywrap import Result

from ..._core.utils.serialise import *
from ..._core.utils.time import *
from ...models.apis.queue import *
from ...models.application import *
from ...models.datasources import *
from ...models.internal.errors import *
from ...setup import *
from .feature import *
Expand All @@ -35,7 +35,7 @@
def superfeature(
tasks: list[RequestTask],
/,
) -> Result[str, list[str]]:
) -> Result[str, list[JSON_TYPE]]:
"""
Calls `SEARCH-FS` features for a list of tasks
"""
Expand All @@ -51,7 +51,7 @@ def superfeature(
Establish connection to message queue
"""

with ChannelContext(settings=settings) as chan:
with ChannelContext(settings) as chan:
# FIXME: publication to exchages fails when msg_exchange is not ""
# chan.exchange_declare(exchange=msg_exchange, exchange_type="direct")

Expand All @@ -60,7 +60,7 @@ def superfeature(
msg_route = f"[{feat.value}].[{task.label}]"

# ensure case has its own route and that it is cleared
chan.queue_declare(queue=msg_route)
chan.queue_declare(queue=msg_route, durable=False, exclusive=False)
Comment thread
raj-open marked this conversation as resolved.
if task.options.reset_queue:
chan.queue_purge(queue=msg_route)

Expand Down Expand Up @@ -89,8 +89,8 @@ def superfeature(
"data": err.data,
}
errors.append(body)
contents = serialise_any_element(body)
chan.basic_publish(exchange=msg_exchange, routing_key=msg_route, body=contents, properties=RABBIG_LOG_LEVEL_ERROR) # fmt: skip
contents = serialise_any_as_text(body).unwrap_or("")
chan.basic_publish(exchange=msg_exchange, routing_key=msg_route, body=contents, properties=RABBIT_LOG_LEVEL_ERROR) # fmt: skip

except Exception as err:
msg = str(err)
Expand All @@ -103,8 +103,8 @@ def superfeature(
},
}
errors.append(body)
contents = serialise_any_element(body)
chan.basic_publish(exchange=msg_exchange, routing_key=msg_route, body=contents, properties=RABBIG_LOG_LEVEL_ERROR) # fmt: skip
contents = serialise_any_as_text(body).unwrap_or("")
chan.basic_publish(exchange=msg_exchange, routing_key=msg_route, body=contents, properties=RABBIT_LOG_LEVEL_ERROR) # fmt: skip

except BaseException as err:
# DEV-NOTE: pass on all other kinds of exceptions
Expand All @@ -117,8 +117,8 @@ def superfeature(
},
}
errors.append(body)
contents = serialise_any_element(body)
chan.basic_publish(exchange=msg_exchange, routing_key=msg_route, body=contents, properties=RABBIG_LOG_LEVEL_ERROR) # fmt: skip
contents = serialise_any_as_text(body).unwrap_or("")
chan.basic_publish(exchange=msg_exchange, routing_key=msg_route, body=contents, properties=RABBIT_LOG_LEVEL_ERROR) # fmt: skip
raise err

"""
Expand Down
Loading