Event Conductor is a Python library for event-driven architectures. It provides priority subscriptions, middleware hooks, wildcard matching, and event sourcing for managing events across an application.
- Priority Subscriptions: Define the execution order of callbacks with numeric priorities.
- Wildcard/Regex Matching: Subscribe to multiple events using patterns like `USER.*`.
- Sync and Async Modes: Publish events in a blocking or non-blocking manner.
- Middleware Hooks: Add `before` and `after` hooks to transform or monitor events.
- Traceability: Includes correlation IDs and timestamps for enhanced debugging and tracking.
- Event Sourcing: Record and replay events for debugging or state reconstruction.
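
To make the feature list more concrete, here is a toy sketch of how regex matching, priority ordering, and record-and-replay can fit together in a single dispatcher. It is not Event Conductor's implementation: `MiniBus`, its methods, and the rule that a larger priority number runs first are all assumptions made for illustration.

```python
import asyncio
import re
from typing import Awaitable, Callable

Handler = Callable[[dict], Awaitable[None]]


class MiniBus:
    """Hypothetical toy dispatcher; NOT Event Conductor's real API."""

    def __init__(self) -> None:
        # Each subscription is stored as (compiled pattern, priority, handler).
        self._subs = []
        # Recorded events, oldest first, kept for later replay.
        self.history = []

    def subscribe(self, pattern: str, handler: Handler, priority: int = 0) -> None:
        self._subs.append((re.compile(pattern), priority, handler))

    async def publish(self, event: dict, record: bool = True) -> None:
        if record:
            self.history.append(event)
        # Keep only subscriptions whose pattern matches the whole event name.
        matching = [s for s in self._subs if s[0].fullmatch(event["event_name"])]
        # Assumption: a larger priority number means the handler runs earlier.
        for _, _, handler in sorted(matching, key=lambda s: s[1], reverse=True):
            await handler(event)

    async def replay(self) -> None:
        # Re-dispatch recorded events without recording them a second time.
        for event in list(self.history):
            await self.publish(event, record=False)


async def demo() -> list:
    bus = MiniBus()
    calls = []

    async def audit(event: dict) -> None:
        calls.append(f"audit:{event['event_name']}")

    async def notify(event: dict) -> None:
        calls.append(f"notify:{event['event_name']}")

    bus.subscribe(r"USER\..*", notify, priority=1)
    bus.subscribe(r"USER\..*", audit, priority=10)
    await bus.publish({"event_name": "USER.CREATED"})
    await bus.publish({"event_name": "ORDER.PLACED"})  # no subscriber matches
    await bus.replay()
    return calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch `audit` runs before `notify` on each dispatch because of its higher priority, and `replay()` dispatches the recorded events again in their original order. Whether the real library orders priorities the same way should be checked against its source.
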

- Clone the repository:

      git clone https://github.com/jaintp/event-conductor.git
      cd event-conductor

- Install dependencies with `uv`:

      uv install

  Alternatively, use `pip`:

      pip install -r requirements.txt

- Install testing dependencies:

      pip install pytest pytest-asyncio

    import asyncio

    from event_conductor import EventConductor


    async def main():
        # Create an instance of EventConductor
        conductor = EventConductor()

        async def on_user_created(event):
            print(f"User created: {event}")

        # Subscribe to an event
        await conductor.subscribe("USER.CREATED", on_user_created)

        # Publish an event
        await conductor.publish({"event_name": "USER.CREATED", "user_id": 42})


    if __name__ == '__main__':
        asyncio.run(main())

Or, using a typed event class:

    import asyncio

    from event_conductor import BaseEvent, EventConductor


    class UserCreated(BaseEvent):
        event_name: str = 'USER.CREATED'
        user_id: int


    async def main():
        # Create an instance of EventConductor
        conductor = EventConductor()

        async def on_user_created(event):
            print(f"User created: {event}")

        # Create event instance
        event = UserCreated(user_id=42)

        # Subscribe to an event
        await conductor.subscribe("USER.CREATED", on_user_created)

        # Publish an event
        await conductor.publish(event)


    if __name__ == '__main__':
        asyncio.run(main())

    async def order_handler(event):
        print(f"Order event: {event.event_name}")

    # Subscribe to all events matching the pattern
    # (in a raw string, a single backslash escapes the dot)
    await conductor.subscribe(r"^ORDER\..*", order_handler)

    # Publish events
    await conductor.publish({"event_name": "ORDER.PLACED"})
    await conductor.publish({"event_name": "ORDER.CANCELLED"})

    async def high_priority(event):
        print("High priority executed")

    async def low_priority(event):
        print("Low priority executed")

    # Subscribe with priorities
    await conductor.subscribe("TEST.EVENT", high_priority, priority=10)
    await conductor.subscribe("TEST.EVENT", low_priority, priority=1)

    await conductor.publish({"event_name": "TEST.EVENT"})

    def before_hook(event):
        event.event_name = event.event_name.upper()
        return event

    def after_hook(event, exc):
        if exc:
            print(f"Error: {exc}")
        else:
            print(f"Successfully handled {event.event_name}")

    conductor.add_before_hook(before_hook)
    conductor.add_after_hook(after_hook)

    await conductor.publish({"event_name": "middleware.test"})

    # Publish events
    await conductor.publish({"event_name": "REPLAY.ONE"})
    await conductor.publish({"event_name": "REPLAY.TWO"})

    # Replay stored events
    await conductor.replay_events()

Run tests using pytest:

    pytest

For coverage:

    pytest --cov=event_conductor --cov-report=term-missing

- Fork the repository.
- Create a feature branch:

      git checkout -b feature/your-feature

- Commit your changes:

      git commit -m "Add your feature description"

- Push your branch:

      git push origin feature/your-feature

- Create a pull request.

This project is licensed under the MIT License. See the LICENSE file for details.
Developed by Jai Brown (JaINTP) (jaintp.dev@gmail.com)