From fd528365a293e5eb4bac106a6dcf73e47f76c56b Mon Sep 17 00:00:00 2001 From: Michael Pisman Date: Wed, 8 Nov 2023 23:37:32 -0700 Subject: [PATCH 01/21] feat: Created Event document Event document represents time series for storing various events --- src/unipoll_api/documents.py | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/src/unipoll_api/documents.py b/src/unipoll_api/documents.py index 66cfacc..7f241f3 100644 --- a/src/unipoll_api/documents.py +++ b/src/unipoll_api/documents.py @@ -1,8 +1,18 @@ # from typing import ForwardRef, NewType, TypeAlias, Optional +from datetime import datetime from typing import Literal from bson import DBRef from beanie import Document as BeanieDocument -from beanie import BackLink, WriteRules, after_event, Insert, Link, PydanticObjectId # BackLink +from beanie import ( + BackLink, + WriteRules, + after_event, + Insert, + Link, + PydanticObjectId, + TimeSeriesConfig, + Granularity, +) from fastapi_users_db_beanie import BeanieBaseUser from pydantic import Field from unipoll_api.utils import colored_dbg as Debug @@ -210,3 +220,18 @@ class Member(Document): workspace: BackLink[Workspace] = Field(original_field="members") groups: list[BackLink[Group]] = Field(original_field="members") policies: list[Link[Policy]] = [] + + +# https://docs.mongodb.com/manual/core/timeseries-collections +class Event(Document): + ts: datetime = Field(default_factory=datetime.now) + resource: BackLink[Resource] = Field(original_field="event_log") + resource_id: str = Field(default_factory=str) + data: dict + + class Settings: + timeseries = TimeSeriesConfig( + time_field="ts", + meta_field="resource_id", + # expire_after_seconds=60 * 60 * 24 # 24 hours + ) \ No newline at end of file From 71ef74dfd96c9e08ca54863dffad4f12eb1a1356 Mon Sep 17 00:00:00 2001 From: Michael Pisman Date: Wed, 8 Nov 2023 23:47:01 -0700 Subject: [PATCH 02/21] feat: Added event list and log_event to Resource Added list of linked events to Resource model and created log_event() method to record new events into the database --- src/unipoll_api/documents.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/unipoll_api/documents.py b/src/unipoll_api/documents.py index 7f241f3..38d5fdb 100644 --- a/src/unipoll_api/documents.py +++ b/src/unipoll_api/documents.py @@ -54,6 +54,7 @@ class Resource(Document): title="Name", description="Name of the resource", min_length=3, max_length=50) description: str = Field(default="", title="Description", max_length=1000) policies: list[Link["Policy"]] = [] + events: list[Link["Event"]] = [] @after_event(Insert) def create_group(self) -> None: @@ -85,6 +86,14 @@ async def remove_policy_by_holder(self, policy_holder: "Group | Member", save: b if save: await self.save(link_rule=WriteRules.WRITE) # type: ignore + async def log_event(self, data: dict, save: bool = True) -> "Event": + new_event = await Event(resource_id=self.id, data=data).create() # type: ignore + + self.events.append(new_event) # type: ignore + if save: + await self.save(link_rule=WriteRules.WRITE) # type: ignore + return new_event + class Account(BeanieBaseUser, Document): # type: ignore id: ResourceID = Field(default_factory=ResourceID, alias="_id") @@ -234,4 +243,4 @@ class Settings: time_field="ts", meta_field="resource_id", # expire_after_seconds=60 * 60 * 24 # 24 hours - ) \ No newline at end of file + ) From 03470e31ec632166f9e58d9a96f8e2baea511e18 Mon Sep 17 00:00:00 2001 From: Michael Pisman Date: Mon, 13 Nov 2023 23:28:33 -0700 Subject: [PATCH 03/21] feat: Add event and timeseries generators --- src/unipoll_api/utils/events.py | 37 ++++++++++++++++++++++++++++++++ src/unipoll_api/utils/streams.py | 16 -------------- 2 files changed, 37 insertions(+), 16 deletions(-) create mode 100644 src/unipoll_api/utils/events.py delete mode 100644 src/unipoll_api/utils/streams.py diff --git a/src/unipoll_api/utils/events.py b/src/unipoll_api/utils/events.py new file mode 100644 index 0000000..7376a57 --- /dev/null +++ b/src/unipoll_api/utils/events.py @@ -0,0 +1,37 @@ +import asyncio +from datetime import datetime +from unipoll_api.documents import Resource, Event +from . import colored_dbg as Debug + + +async def get_updates(resource: Resource, since: datetime): + events = await Event.find(Event.resource_id == str(resource.id), Event.ts > since).to_list() + return events + + +async def timeseries_generator(resource: Resource): + try: + time = datetime.now() + while True: + events = await get_updates(resource, time) + if events: + time = events[-1].ts + yield events + except asyncio.CancelledError as e: + Debug.info("Disconnected from client (via refresh/close)") + raise e + + +async def event_generator(resource: Resource): + try: + while True: + print(resource.events) + await asyncio.sleep(1) + if resource.events: + print("Event found") + event = resource.events.pop() + print(event) + yield True + except asyncio.CancelledError as e: + Debug.info("Disconnected from client (via refresh/close)") + raise e diff --git a/src/unipoll_api/utils/streams.py b/src/unipoll_api/utils/streams.py deleted file mode 100644 index 00362d2..0000000 --- a/src/unipoll_api/utils/streams.py +++ /dev/null @@ -1,16 +0,0 @@ -import asyncio -from unipoll_api.documents import Resource -from . import colored_dbg as Debug - - -# async def update_generator(resource: Resource): -async def update_generator(): - i = 0 - try: - while True: - i += 1 - yield dict(data=i) - await asyncio.sleep(0.2) - except asyncio.CancelledError as e: - Debug.info("Disconnected from client (via refresh/close)") - raise e From cf726a3230dd4714c64b1a261bbb006925b7c350 Mon Sep 17 00:00:00 2001 From: Michael Pisman Date: Mon, 13 Nov 2023 23:29:07 -0700 Subject: [PATCH 04/21] feat: Add new routes for event logging and subscription --- src/unipoll_api/routes/streams.py | 60 ++++++++++++++++++++++++++----- 1 file changed, 52 insertions(+), 8 deletions(-) diff --git a/src/unipoll_api/routes/streams.py b/src/unipoll_api/routes/streams.py index 1025ce9..ce0b317 100644 --- a/src/unipoll_api/routes/streams.py +++ b/src/unipoll_api/routes/streams.py @@ -1,13 +1,57 @@ -from fastapi import APIRouter +from fastapi import APIRouter, HTTPException +from fastapi import Body from sse_starlette.sse import EventSourceResponse -from unipoll_api.utils.streams import update_generator - +from datetime import datetime +from unipoll_api.documents import ResourceID, Workspace, Event +from unipoll_api.utils.events import event_generator, get_updates, timeseries_generator router = APIRouter() -@router.get("/updates", - response_class=EventSourceResponse) -async def event_log(): - updates = update_generator() - return EventSourceResponse(updates) +@router.get("/updates/{resource_id}") +async def event_log(resource_id: ResourceID, + since: str = ""): + try: + if since == "": + from unipoll_api.app import start_time + time = start_time + else: + time = datetime.fromisoformat(since) + workspace = await Workspace.get(resource_id) + return await get_updates(workspace, time) + except Exception as e: + print(e) + return HTTPException(status_code=404, detail="Resource not found") + +@router.post("/updates/{workspace_id}") +async def generate_event(workspace_id: ResourceID, + event: dict = Body(...)): + try: + workspace = await Workspace.get(workspace_id) + new_event = await workspace.log_event(data={"message": "Event generated"}) + + # BUG: Does not work, since workspace.events reference is not updated + # FIXME: Find a way to keep a reference to the workspace.events list outside the function + + return new_event + except Exception as e: + print(e) + + +@router.get("/subscribe/{workspace_id}") +async def subscribe(workspace_id: ResourceID): + try: + workspace = await Workspace.get(workspace_id) + # get_updates = event_generator(workspace) + return EventSourceResponse(event_generator(workspace)) + except Exception as e: + print(e) + + +@router.get("/timeseries/{resource_id}") +async def timeseries(resource_id: ResourceID): + try: + workspace = await Workspace.get(resource_id) + return EventSourceResponse(timeseries_generator(workspace)) + except Exception as e: + print(e) From d5cede11c7e6ad1fdbcde46409376c78bb707e57 Mon Sep 17 00:00:00 2001 From: Michael Pisman Date: Mon, 13 Nov 2023 23:29:34 -0700 Subject: [PATCH 05/21] feat: Add start time to application --- src/unipoll_api/app.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/unipoll_api/app.py b/src/unipoll_api/app.py index 64eadad..3c8a92a 100644 --- a/src/unipoll_api/app.py +++ b/src/unipoll_api/app.py @@ -1,3 +1,4 @@ +from datetime import datetime import json import uvicorn import os @@ -14,6 +15,9 @@ from unipoll_api.utils import cli_args, colored_dbg +# Application start time +start_time = datetime.now() + # Apply setting from configuration file settings = get_settings() From a0318423c8c2aec4fd5de3eed458d091c5a384bc Mon Sep 17 00:00:00 2001 From: Michael Pisman Date: Mon, 13 Nov 2023 23:29:59 -0700 Subject: [PATCH 06/21] refactor: Add deque data structure for events in Resource class --- src/unipoll_api/documents.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/unipoll_api/documents.py b/src/unipoll_api/documents.py index 38d5fdb..003bb0c 100644 --- a/src/unipoll_api/documents.py +++ b/src/unipoll_api/documents.py @@ -2,6 +2,8 @@ from datetime import datetime from typing import Literal from bson import DBRef +from collections import deque +from queue import Queue from beanie import Document as BeanieDocument from beanie import ( BackLink, @@ -54,7 +56,8 @@ class Resource(Document): title="Name", description="Name of the resource", min_length=3, max_length=50) description: str = Field(default="", title="Description", max_length=1000) policies: list[Link["Policy"]] = [] - events: list[Link["Event"]] = [] + # events: Queue = Queue() + events: deque = deque() @after_event(Insert) def create_group(self) -> None: @@ -87,9 +90,9 @@ async def remove_policy_by_holder(self, policy_holder: "Group | Member", save: b await self.save(link_rule=WriteRules.WRITE) # type: ignore async def log_event(self, data: dict, save: bool = True) -> "Event": - new_event = await Event(resource_id=self.id, data=data).create() # type: ignore + new_event = await Event(resource_id=str(self.id), data=data).create() # type: ignore - self.events.append(new_event) # type: ignore + self.events.append({"time": datetime.now(), "event": data}) if save: await self.save(link_rule=WriteRules.WRITE) # type: ignore return new_event @@ -234,10 +237,13 @@ class Member(Document): # https://docs.mongodb.com/manual/core/timeseries-collections class Event(Document): ts: datetime = Field(default_factory=datetime.now) - resource: BackLink[Resource] = Field(original_field="event_log") + # resource: BackLink[Resource] = Field(original_field="event_log") resource_id: str = Field(default_factory=str) data: dict + # @after_event(Insert) + # def + class Settings: timeseries = TimeSeriesConfig( time_field="ts", From 9d4c0a891a5f82a4fb2f28936bc1ed6b2a22d367 Mon Sep 17 00:00:00 2001 From: Michael Pisman Date: Mon, 13 Nov 2023 23:30:12 -0700 Subject: [PATCH 07/21] feat: Add Event document model to mongo_db.py --- src/unipoll_api/mongo_db.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/unipoll_api/mongo_db.py b/src/unipoll_api/mongo_db.py index f317a07..a137e4f 100644 --- a/src/unipoll_api/mongo_db.py +++ b/src/unipoll_api/mongo_db.py @@ -19,5 +19,6 @@ Documents.Group, Documents.Workspace, Documents.Policy, - Documents.Poll + Documents.Poll, + Documents.Event ] From 2609e105238348ed43a2bfdef48abb618f07fce7 Mon Sep 17 00:00:00 2001 From: Michael Pisman Date: Wed, 15 Nov 2023 21:38:18 -0700 Subject: [PATCH 08/21] feat: Add Redis configuration options to Settings class --- src/unipoll_api/config.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/unipoll_api/config.py b/src/unipoll_api/config.py index 1716e5c..dd883fa 100644 --- a/src/unipoll_api/config.py +++ b/src/unipoll_api/config.py @@ -28,6 +28,8 @@ class Settings(BaseSettings): # type: ignore port: int = 9000 reload: bool = True model_config = SettingsConfigDict(env_file=".env") + redis_host: str = Field(default="localhost", title="Redis Host", description="The host of the Redis database.") + redis_port: int = Field(default=6379, title="Redis Port", description="The port of the Redis database.") @lru_cache() From 63bb57cccc317dabd170c30305f00673eba1305e Mon Sep 17 00:00:00 2001 From: Michael Pisman Date: Wed, 15 Nov 2023 21:41:35 -0700 Subject: [PATCH 09/21] feat: Added pubsub Redis functionality Created to functions to publish and listen to the messages from redis --- src/unipoll_api/redis.py | 47 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 src/unipoll_api/redis.py diff --git a/src/unipoll_api/redis.py b/src/unipoll_api/redis.py new file mode 100644 index 0000000..63a9c50 --- /dev/null +++ b/src/unipoll_api/redis.py @@ -0,0 +1,47 @@ +# import asyncio +import json +import redis.exceptions +import redis.asyncio +from redis.asyncio.client import Redis + +from unipoll_api.config import get_settings + +settings = get_settings() + + +PUSH_NOTIFICATIONS_CHANNEL = "PUSH_NOTIFICATIONS_CHANNEL" + + +connection: Redis = redis.asyncio.from_url( + f"redis://{settings.redis_host}:{settings.redis_port}", + encoding="utf8", + decode_responses=True, +) + + +async def publish_message(message: dict): + try: + await connection.publish(PUSH_NOTIFICATIONS_CHANNEL, json.dumps(message)) + except redis.exceptions.ConnectionError as e: + print("Connection error:", e) + except Exception as e: + print("An unexpected error occurred:", e) + + +async def listen_to_channel(user_id: str): + # Create message listener and subscribe on the event source channel + try: + async with connection.pubsub() as listener: + await listener.subscribe(PUSH_NOTIFICATIONS_CHANNEL) + # Create a message generator + while True: + message = await listener.get_message() + if message is None: + continue + if message.get("type") == "message": + message = json.loads(message["data"]) + # Checking, if the user is recipient of the message + if user_id == message.get("recipient_id"): + yield {"data": message} + except redis.exceptions.ConnectionError as e: + print("Connection error:", e) From 583405640ae4ad723b53e17aaeaa937d36f618f1 Mon Sep 17 00:00:00 2001 From: Michael Pisman Date: Wed, 15 Nov 2023 21:55:43 -0700 Subject: [PATCH 10/21] feat: Added endpoints to test Redis pubsub --- src/unipoll_api/routes/streams.py | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/src/unipoll_api/routes/streams.py b/src/unipoll_api/routes/streams.py index ce0b317..ce56cf4 100644 --- a/src/unipoll_api/routes/streams.py +++ b/src/unipoll_api/routes/streams.py @@ -1,9 +1,12 @@ -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, Depends, HTTPException, Request from fastapi import Body from sse_starlette.sse import EventSourceResponse from datetime import datetime -from unipoll_api.documents import ResourceID, Workspace, Event +from unipoll_api.redis import listen_to_channel, publish_message, connection +from unipoll_api.documents import Account, ResourceID, Workspace, Event from unipoll_api.utils.events import event_generator, get_updates, timeseries_generator +from unipoll_api.dependencies import get_current_active_user + router = APIRouter() @@ -55,3 +58,23 @@ async def timeseries(resource_id: ResourceID): return EventSourceResponse(timeseries_generator(workspace)) except Exception as e: print(e) + + +@router.post("/redis/push") +async def redis_push(user: Account = Depends(get_current_active_user)): + try: + message = { + "time": datetime.now().isoformat(), + "message": "Hello World!" + } + await publish_message({"recipient_id": str(user.id), "message": message}) + except Exception as e: + print(e) + + +@router.get("/redis/subscribe") +async def redis_subscribe(user: Account = Depends(get_current_active_user)): + try: + return EventSourceResponse(listen_to_channel(str(user.id))) + except Exception as e: + print(e) From ead43ae9fb463a64455fe0c0120b702d13cf7dba Mon Sep 17 00:00:00 2001 From: Michael Pisman Date: Wed, 15 Nov 2023 21:56:01 -0700 Subject: [PATCH 11/21] refactor: Refactor event logging and subscription endpoints for testing purposes --- src/unipoll_api/routes/streams.py | 50 ++++++++++++++----------------- 1 file changed, 23 insertions(+), 27 deletions(-) diff --git a/src/unipoll_api/routes/streams.py b/src/unipoll_api/routes/streams.py index ce56cf4..a6d91de 100644 --- a/src/unipoll_api/routes/streams.py +++ b/src/unipoll_api/routes/streams.py @@ -1,16 +1,19 @@ -from fastapi import APIRouter, Depends, HTTPException, Request +from fastapi import APIRouter, Depends, HTTPException from fastapi import Body from sse_starlette.sse import EventSourceResponse from datetime import datetime -from unipoll_api.redis import listen_to_channel, publish_message, connection -from unipoll_api.documents import Account, ResourceID, Workspace, Event -from unipoll_api.utils.events import event_generator, get_updates, timeseries_generator +from unipoll_api.redis import listen_to_channel, publish_message +from unipoll_api.documents import Account, ResourceID, Workspace # Event +from unipoll_api.utils.events import get_updates, get_event_stream from unipoll_api.dependencies import get_current_active_user - +from unipoll_api.exceptions import ResourceExceptions router = APIRouter() +# For testing purposes only +# Endpoint to get all events for a resource +# Accepts a query parameter "since" to get all events after a certain time @router.get("/updates/{resource_id}") async def event_log(resource_id: ResourceID, since: str = ""): @@ -20,46 +23,38 @@ async def event_log(resource_id: ResourceID, time = start_time else: time = datetime.fromisoformat(since) - workspace = await Workspace.get(resource_id) - return await get_updates(workspace, time) + return await get_updates(resource_id, time) except Exception as e: print(e) return HTTPException(status_code=404, detail="Resource not found") -@router.post("/updates/{workspace_id}") + +# For testing purposes only +# Endpoint to log an event for a resource(workspace) +@router.post("/workspace/{workspace_id}/log") async def generate_event(workspace_id: ResourceID, event: dict = Body(...)): try: workspace = await Workspace.get(workspace_id) - new_event = await workspace.log_event(data={"message": "Event generated"}) - - # BUG: Does not work, since workspace.events reference is not updated - # FIXME: Find a way to keep a reference to the workspace.events list outside the function - + if not workspace: + raise ResourceExceptions.ResourceNotFound("Workspace", workspace_id) + new_event = await workspace.log_event(data={"message": event}) return new_event except Exception as e: print(e) -@router.get("/subscribe/{workspace_id}") -async def subscribe(workspace_id: ResourceID): - try: - workspace = await Workspace.get(workspace_id) - # get_updates = event_generator(workspace) - return EventSourceResponse(event_generator(workspace)) - except Exception as e: - print(e) - - -@router.get("/timeseries/{resource_id}") -async def timeseries(resource_id: ResourceID): +# For testing purposes only +# Endpoint to get new events, that occur after this request +@router.get("/resource/{resource_id}/subscribe") +async def mongodb_subscribe(resource_id: ResourceID): try: - workspace = await Workspace.get(resource_id) - return EventSourceResponse(timeseries_generator(workspace)) + return EventSourceResponse(get_event_stream(resource_id)) except Exception as e: print(e) +# Endpoint to push notifications to a user @router.post("/redis/push") async def redis_push(user: Account = Depends(get_current_active_user)): try: @@ -72,6 +67,7 @@ async def redis_push(user: Account = Depends(get_current_active_user)): print(e) +# Endpoint to user notifications @router.get("/redis/subscribe") async def redis_subscribe(user: Account = Depends(get_current_active_user)): try: From ca491e21b25b1654356ad8023de9b3490875796f Mon Sep 17 00:00:00 2001 From: Michael Pisman Date: Wed, 15 Nov 2023 21:56:52 -0700 Subject: [PATCH 12/21] refactor: Refactor events Changed get_updates function to use ResourceID instead of Resource object, removed event_generator --- src/unipoll_api/utils/events.py | 25 +++++-------------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/src/unipoll_api/utils/events.py b/src/unipoll_api/utils/events.py index 7376a57..8a5c2bd 100644 --- a/src/unipoll_api/utils/events.py +++ b/src/unipoll_api/utils/events.py @@ -1,37 +1,22 @@ import asyncio from datetime import datetime -from unipoll_api.documents import Resource, Event +from unipoll_api.documents import ResourceID, Event from . import colored_dbg as Debug -async def get_updates(resource: Resource, since: datetime): - events = await Event.find(Event.resource_id == str(resource.id), Event.ts > since).to_list() +async def get_updates(resource_id: ResourceID, since: datetime): + events = await Event.find(Event.resource_id == str(resource_id), Event.ts > since).to_list() return events -async def timeseries_generator(resource: Resource): +async def get_event_stream(resource_id: ResourceID): try: time = datetime.now() while True: - events = await get_updates(resource, time) + events = await get_updates(resource_id, time) if events: time = events[-1].ts yield events except asyncio.CancelledError as e: Debug.info("Disconnected from client (via refresh/close)") raise e - - -async def event_generator(resource: Resource): - try: - while True: - print(resource.events) - await asyncio.sleep(1) - if resource.events: - print("Event found") - event = resource.events.pop() - print(event) - yield True - except asyncio.CancelledError as e: - Debug.info("Disconnected from client (via refresh/close)") - raise e From 3966d8c292bf66530fd374828adbf6cdbc9fbbce Mon Sep 17 00:00:00 2001 From: Michael Pisman Date: Wed, 15 Nov 2023 21:57:49 -0700 Subject: [PATCH 13/21] refactor: Removed unnecessary code in app.py operation_id is not set inside router module --- src/unipoll_api/app.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/unipoll_api/app.py b/src/unipoll_api/app.py index 3c8a92a..24929e8 100644 --- a/src/unipoll_api/app.py +++ b/src/unipoll_api/app.py @@ -52,9 +52,9 @@ async def on_startup() -> None: # Simplify operation IDs so that generated API clients have simpler function names # Each route will have its operation ID set to the method name - for route in app.routes: - if isinstance(route, APIRoute): - route.operation_id = route.name + # for route in app.routes: + # if isinstance(route, APIRoute): + # route.operation_id = route.name await init_beanie( database=mainDB, # type: ignore From b1aeefd2f50b05307a3aceddb34c5427390a366f Mon Sep 17 00:00:00 2001 From: Michael Pisman Date: Wed, 15 Nov 2023 21:58:38 -0700 Subject: [PATCH 14/21] refactor: Removed events fielf from the Resource There is no point in storing events inside of the Resource --- src/unipoll_api/documents.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/unipoll_api/documents.py b/src/unipoll_api/documents.py index 003bb0c..2a42f84 100644 --- a/src/unipoll_api/documents.py +++ b/src/unipoll_api/documents.py @@ -2,8 +2,6 @@ from datetime import datetime from typing import Literal from bson import DBRef -from collections import deque -from queue import Queue from beanie import Document as BeanieDocument from beanie import ( BackLink, @@ -56,8 +54,6 @@ class Resource(Document): title="Name", description="Name of the resource", min_length=3, max_length=50) description: str = Field(default="", title="Description", max_length=1000) policies: list[Link["Policy"]] = [] - # events: Queue = Queue() - events: deque = deque() @after_event(Insert) def create_group(self) -> None: From c23fd2fa3c988f9ae9d9ddd32ad5a1a31d425f13 Mon Sep 17 00:00:00 2001 From: Michael Pisman Date: Wed, 15 Nov 2023 22:33:14 -0700 Subject: [PATCH 15/21] feat: Add function to notify all members of a resource --- src/unipoll_api/utils/events.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/unipoll_api/utils/events.py b/src/unipoll_api/utils/events.py index 8a5c2bd..aebcf68 100644 --- a/src/unipoll_api/utils/events.py +++ b/src/unipoll_api/utils/events.py @@ -1,6 +1,7 @@ import asyncio from datetime import datetime -from unipoll_api.documents import ResourceID, Event +from unipoll_api.documents import ResourceID, Event, Resource +from unipoll_api.redis import publish_message from . import colored_dbg as Debug @@ -20,3 +21,16 @@ async def get_event_stream(resource_id: ResourceID): except asyncio.CancelledError as e: Debug.info("Disconnected from client (via refresh/close)") raise e + + +async def notify_members(resource: Resource, message: dict): + try: + timestamp = datetime.now() + for member in resource.members: + # print(member) + data = {"recipient_id": str(member.account.id), + "timestamp": str(timestamp), + "message": message} + await publish_message(data) + except Exception as e: + print(e) From 4f4dc0d57138bdb97b6ac1ff20dabd122ee6bbeb Mon Sep 17 00:00:00 2001 From: Michael Pisman Date: Wed, 15 Nov 2023 22:33:51 -0700 Subject: [PATCH 16/21] refactor: Add message parameter and timestamp to redis_push endpoint --- src/unipoll_api/routes/streams.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/unipoll_api/routes/streams.py b/src/unipoll_api/routes/streams.py index a6d91de..badf451 100644 --- a/src/unipoll_api/routes/streams.py +++ b/src/unipoll_api/routes/streams.py @@ -56,13 +56,15 @@ async def mongodb_subscribe(resource_id: ResourceID): # Endpoint to push notifications to a user @router.post("/redis/push") -async def redis_push(user: Account = Depends(get_current_active_user)): +async def redis_push(user: Account = Depends(get_current_active_user), + message: dict = Body(...)): try: - message = { - "time": datetime.now().isoformat(), - "message": "Hello World!" + data = { + "recipient_id": str(user.id), + "timestamp": datetime.now().isoformat(), + "message": message } - await publish_message({"recipient_id": str(user.id), "message": message}) + await publish_message(data) except Exception as e: print(e) From 02ec499824e37ecaac1fb9a0e7e383f36d8affba Mon Sep 17 00:00:00 2001 From: Michael Pisman Date: Wed, 15 Nov 2023 22:34:05 -0700 Subject: [PATCH 17/21] refactor: Refactor log_event method in Resource class --- src/unipoll_api/documents.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/unipoll_api/documents.py b/src/unipoll_api/documents.py index 2a42f84..77d2354 100644 --- a/src/unipoll_api/documents.py +++ b/src/unipoll_api/documents.py @@ -85,12 +85,8 @@ async def remove_policy_by_holder(self, policy_holder: "Group | Member", save: b if save: await self.save(link_rule=WriteRules.WRITE) # type: ignore - async def log_event(self, data: dict, save: bool = True) -> "Event": + async def log_event(self, data: dict) -> "Event": new_event = await Event(resource_id=str(self.id), data=data).create() # type: ignore - - self.events.append({"time": datetime.now(), "event": data}) - if save: - await self.save(link_rule=WriteRules.WRITE) # type: ignore return new_event From 4592d49b14eb7bd277396a823e8af6d62498d4e6 Mon Sep 17 00:00:00 2001 From: Michael Pisman Date: Wed, 15 Nov 2023 22:35:03 -0700 Subject: [PATCH 18/21] style: Changed publish_message parameter name --- src/unipoll_api/redis.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/unipoll_api/redis.py b/src/unipoll_api/redis.py index 63a9c50..b5cad9f 100644 --- a/src/unipoll_api/redis.py +++ b/src/unipoll_api/redis.py @@ -19,9 +19,9 @@ ) -async def publish_message(message: dict): +async def publish_message(data: dict): try: - await connection.publish(PUSH_NOTIFICATIONS_CHANNEL, json.dumps(message)) + await connection.publish(PUSH_NOTIFICATIONS_CHANNEL, json.dumps(data)) except redis.exceptions.ConnectionError as e: print("Connection error:", e) except Exception as e: From a1448ab1ad4e57d1a2a5615db7d868a63ac7eef5 Mon Sep 17 00:00:00 2001 From: Michael Pisman Date: Wed, 15 Nov 2023 22:35:17 -0700 Subject: [PATCH 19/21] feat: Add event logging and member notification to workspace update --- src/unipoll_api/actions/workspace.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/unipoll_api/actions/workspace.py b/src/unipoll_api/actions/workspace.py index 9949302..84af368 100644 --- a/src/unipoll_api/actions/workspace.py +++ b/src/unipoll_api/actions/workspace.py @@ -1,8 +1,9 @@ +import asyncio from bson import DBRef from unipoll_api import AccountManager from unipoll_api import actions from unipoll_api.documents import Workspace, Account, Policy, Member -from unipoll_api.utils import Permissions +from unipoll_api.utils import Permissions, events from unipoll_api.schemas import WorkspaceSchemas from unipoll_api.exceptions import WorkspaceExceptions # from unipoll_api.dependencies import get_member @@ -87,6 +88,12 @@ async def update_workspace(workspace: Workspace, # Save the updated workspace if save_changes: await Workspace.save(workspace) + + # Log the event + await workspace.log_event(data={"message": "Workspace updated"}) + asyncio.create_task(events.notify_members(workspace, {"message": "Workspace updated"})) + # BackgroundTasks.add_task(events.notify_members, workspace, {"message": "Workspace updated"}) + # Return the updated workspace return WorkspaceSchemas.Workspace(**workspace.model_dump(include={'id', 'name', 'description'})) From 7290cab4e96ce72374fbc4dc299453f113acdae5 Mon Sep 17 00:00:00 2001 From: Michael Pisman Date: Wed, 20 Dec 2023 23:49:48 -0700 Subject: [PATCH 20/21] refactor: Update Redis subscribe endpoint route --- src/unipoll_api/routes/streams.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/unipoll_api/routes/streams.py b/src/unipoll_api/routes/streams.py index badf451..73e4433 100644 --- a/src/unipoll_api/routes/streams.py +++ b/src/unipoll_api/routes/streams.py @@ -70,7 +70,8 @@ async def redis_push(user: Account = Depends(get_current_active_user), # Endpoint to user notifications -@router.get("/redis/subscribe") +# @router.get("/redis/subscribe") +@router.get("/subscribe") async def redis_subscribe(user: Account = Depends(get_current_active_user)): try: return EventSourceResponse(listen_to_channel(str(user.id))) From 2c07b3c46f4511817bb9f449329769d71eb06016 Mon Sep 17 00:00:00 2001 From: Michael Pisman Date: Wed, 20 Dec 2023 23:50:01 -0700 Subject: [PATCH 21/21] fix: Fix JSON serialization in listen_to_channel function --- src/unipoll_api/redis.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/unipoll_api/redis.py b/src/unipoll_api/redis.py index b5cad9f..6e96400 100644 --- a/src/unipoll_api/redis.py +++ b/src/unipoll_api/redis.py @@ -42,6 +42,6 @@ async def listen_to_channel(user_id: str): message = json.loads(message["data"]) # Checking, if the user is recipient of the message if user_id == message.get("recipient_id"): - yield {"data": message} + yield {"data": json.dumps(message)} except redis.exceptions.ConnectionError as e: print("Connection error:", e)