Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions App/controllers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# App/controllers/_init_.py
from .user import *
from .auth import *
from .initialize import *
from .admin import *
from .staff import *
from .staff import *
from .auth import *
from .initialize import *
97 changes: 57 additions & 40 deletions App/controllers/admin.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,75 @@
from App.models import Shift
from App.database import db
# App/controllers/admin.py
from App.controllers.scheduling_logic import ScheduleStrategyFactory, AutoScheduler
from datetime import datetime
from App.controllers.user import get_user

from App.models import Shift, Schedule
from App.models import Admin, Staff, Shift
from App.database import db
from datetime import datetime
from App.controllers.user import get_user

def create_schedule(admin_id, scheduleName): #Not sure why this was missing
admin = get_user(admin_id)
if not admin or admin.role != "admin":
raise PermissionError("Only admins can create schedules")

new_schedule = Schedule(
created_by=admin_id,
name=scheduleName,
created_at=datetime.utcnow()
)
def generate_auto_schedule(schedule_id: int, method_type: str):
"""
Executes the automatic scheduling logic using the Strategy pattern
by leveraging the StrategyFactory and AutoScheduler directly in the Controller.

db.session.add(new_schedule)
db.session.commit()
The logic now flows: Controller -> StrategyFactory (select strategy) -> AutoScheduler (execute strategy).

Args:
schedule_id (str): The ID of the schedule to be processed.
method_type (str): The strategy to use (e.g., 'priority', 'random').

return new_schedule
Returns:
dict: The result of the scheduling operation.
"""
# collect staff and unassigned shift templates for the schedule
staff_list = Staff.query.all()
shift_templates = Shift.query.filter_by(schedule_id=schedule_id, staff_id=None).all()

def schedule_shift(admin_id, staff_id, schedule_id, start_time, end_time):
admin = get_user(admin_id)
staff = get_user(staff_id)
try:
strategy = ScheduleStrategyFactory.create_strategy(method_type)
except ValueError as e:
return {"status": "error", "message": str(e)}

schedule = db.session.get(Schedule, schedule_id)
scheduler = AutoScheduler(strategy, staff_list, [
{"start_time": s.start_time, "end_time": s.end_time} for s in shift_templates
], schedule_id)

if not admin or admin.role != "admin":
raise PermissionError("Only admins can schedule shifts")
if not staff or staff.role != "staff":
raise ValueError("Invalid staff member")
if not schedule:
raise ValueError("Invalid schedule ID")
try:
result = scheduler.generate_schedule()
return {"status": "success", "data": result}
except Exception as e:
return {"status": "error", "message": f"Auto-scheduling failed: {e}"}

new_shift = Shift(

def schedule_shift(admin_id: int, staff_id: int, schedule_id: int, start_time, end_time):
"""Create a shift under a schedule assigned to a staff member.

This is a thin controller wrapper that persists the Shift.
"""
# basic validation could be added (check admin role) in future
if not staff_id or not schedule_id:
raise ValueError("staff_id and schedule_id are required")

# parse datetimes if strings
if isinstance(start_time, str):
start_time = datetime.fromisoformat(start_time)
if isinstance(end_time, str):
end_time = datetime.fromisoformat(end_time)

shift = Shift(
staff_id=staff_id,
schedule_id=schedule_id,
start_time=start_time,
end_time=end_time
end_time=end_time,
)

db.session.add(new_shift)
db.session.add(shift)
db.session.commit()

return new_shift
return shift.get_json()


def get_shift_report(admin_id):
admin = get_user(admin_id)
if not admin or admin.role != "admin":
raise PermissionError("Only admins can view shift reports")
def get_shift_report(admin_id: int):
"""Return all shifts (simple report) for an admin dashboard.

return [shift.get_json() for shift in Shift.query.order_by(Shift.start_time).all()]
No strict admin checks are enforced here; controllers that call this
should ensure permissions.
"""
shifts = Shift.query.order_by(Shift.start_time).all()
return [s.get_json() for s in shifts]
74 changes: 74 additions & 0 deletions App/controllers/scheduling_logic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
from App.controllers.scheduling_logic import ScheduleStrategyFactory, AutoScheduler
from datetime import datetime
from App.models import Admin, Staff, Shift
from App.database import db


def generate_auto_schedule(schedule_id: int, method_type: str):
"""
Executes the automatic scheduling logic using the Strategy pattern
by leveraging the StrategyFactory and AutoScheduler directly in the Controller.

The logic now flows: Controller -> StrategyFactory (select strategy) -> AutoScheduler (execute strategy).

Args:
schedule_id (str): The ID of the schedule to be processed.
method_type (str): The strategy to use (e.g., 'priority', 'random').

Returns:
dict: The result of the scheduling operation.
"""
# collect staff and unassigned shift templates for the schedule
staff_list = Staff.query.all()
shift_templates = Shift.query.filter_by(schedule_id=schedule_id, staff_id=None).all()

try:
strategy = ScheduleStrategyFactory.create_strategy(method_type)
except ValueError as e:
return {"status": "error", "message": str(e)}

scheduler = AutoScheduler(strategy, staff_list, [
{"start_time": s.start_time, "end_time": s.end_time} for s in shift_templates
], schedule_id)

try:
result = scheduler.generate_schedule()
return {"status": "success", "data": result}
except Exception as e:
return {"status": "error", "message": f"Auto-scheduling failed: {e}"}


def schedule_shift(admin_id: int, staff_id: int, schedule_id: int, start_time, end_time):
"""Create a shift under a schedule assigned to a staff member.

This is a thin controller wrapper that persists the Shift.
"""
# basic validation could be added (check admin role) in future
if not staff_id or not schedule_id:
raise ValueError("staff_id and schedule_id are required")

# parse datetimes if strings
if isinstance(start_time, str):
start_time = datetime.fromisoformat(start_time)
if isinstance(end_time, str):
end_time = datetime.fromisoformat(end_time)

shift = Shift(
staff_id=staff_id,
schedule_id=schedule_id,
start_time=start_time,
end_time=end_time,
)
db.session.add(shift)
db.session.commit()
return shift.get_json()


def get_shift_report(admin_id: int):
"""Return all shifts (simple report) for an admin dashboard.

No strict admin checks are enforced here; controllers that call this
should ensure permissions.
"""
shifts = Shift.query.order_by(Shift.start_time).all()
return [s.get_json() for s in shifts]
86 changes: 55 additions & 31 deletions App/controllers/staff.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,67 @@
from App.models import Shift
# App/controllers/staff.py
from App.models import Staff
from App.database import db
from datetime import datetime
from App.controllers.user import get_user
from App.controllers.user import get_user_by_id

def get_combined_roster(staff_id):
staff = get_user(staff_id)
if not staff or staff.role != "staff":
raise PermissionError("Only staff can view roster")
return [shift.get_json() for shift in Shift.query.order_by(Shift.start_time).all()]
# Helper function to get the specific Staff object
def get_staff_user(staff_id):
"""Retrieves a Staff user instance by ID and checks for role."""
user = get_user_by_id(staff_id)
if not user or user.role != "staff":
raise PermissionError("Access denied: Only staff members can perform this action.")
# Cast the generic User to the specific Staff subclass
return db.session.get(Staff, staff_id)

def get_staff_roster(staff_id):
"""
Returns the roster (list of shifts) specifically assigned to this staff member.
Delegates logic to the Staff model's view_roster method.
"""
try:
staff = get_staff_user(staff_id) # Raises PermissionError if not staff
return staff.view_roster()
except PermissionError as e:
return {"error": str(e)}, 403

def clock_in(staff_id, shift_id):
staff = get_user(staff_id)
if not staff or staff.role != "staff":
raise PermissionError("Only staff can clock in")

shift = db.session.get(Shift, shift_id)

if not shift or shift.staff_id != staff_id:
raise ValueError("Invalid shift for staff")
def get_combined_roster(staff_id):
"""Compatibility wrapper expected by other modules (e.g. wsgi).

shift.clock_in = datetime.now()
db.session.commit()
return shift
Delegates to get_staff_roster which returns the roster JSON or an error tuple.
"""
return get_staff_roster(staff_id)

def clock_in(staff_id, shift_id):
"""
Clocks in a staff member for an assigned shift.
Delegates persistence logic and checks to the Staff model's method.
"""
try:
staff = get_staff_user(staff_id)
# The Staff model handles all clock-in validation and persistence
return staff.clock_in(shift_id).get_json()
except PermissionError as e:
return {"error": str(e)}, 403
except ValueError as e:
return {"error": str(e)}, 400

def clock_out(staff_id, shift_id):
staff = get_user(staff_id)
if not staff or staff.role != "staff":
raise PermissionError("Only staff can clock out")

shift = db.session.get(Shift, shift_id)
if not shift or shift.staff_id != staff_id:
raise ValueError("Invalid shift for staff")

shift.clock_out = datetime.now()
db.session.commit()
return shift

"""
Clocks out a staff member from an assigned shift.
Delegates persistence logic and checks to the Staff model's method.
"""
try:
staff = get_staff_user(staff_id)
# The Staff model handles all clock-out validation and persistence
return staff.clock_out(shift_id).get_json()
except PermissionError as e:
return {"error": str(e)}, 403
except ValueError as e:
return {"error": str(e)}, 400

def get_shift(shift_id):
"""Retrieves a single shift object (helper, unchanged)."""
from App.models import Shift
shift = db.session.get(Shift, shift_id)
return shift
return shift.get_json() if shift else None
55 changes: 51 additions & 4 deletions App/controllers/user.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# App/controllers/user.py
from App.models import User, Admin, Staff, Shift
from App.database import db
from datetime import datetime
Expand All @@ -7,7 +8,7 @@
def create_user(username, password, role):
role = role.lower().strip()
if role not in VALID_ROLES:
print(f"⚠ Invalid role '{role}'. Must be one of {VALID_ROLES}")
print(f"⚠ Invalid role '{role}'. Must be one of {VALID_ROLES}")
return None
if role == "admin":
newuser = Admin(username=username, password=password)
Expand All @@ -23,7 +24,8 @@ def create_user(username, password, role):
def get_user_by_username(username):
return User.query.filter_by(username=username).first()

def get_user(id):
# Renamed from get_user to get_user_by_id for clarity
def get_user_by_id(id):
return db.session.get(User, id)

def get_all_users():
Expand All @@ -36,9 +38,54 @@ def get_all_users_json():
return [user.get_json() for user in users]

def update_user(id, username):
user = get_user(id)
user = get_user_by_id(id)
if user:
user.username = username
db.session.commit()
return user
return None
return None# App/controllers/user.py
from App.models import User, Admin, Staff, Shift
from App.database import db
from datetime import datetime

VALID_ROLES = {"user", "staff", "admin"}

def create_user(username, password, role):
role = role.lower().strip()
if role not in VALID_ROLES:
print(f"⚠ Invalid role '{role}'. Must be one of {VALID_ROLES}")
return None
if role == "admin":
newuser = Admin(username=username, password=password)
elif role == "staff":
newuser = Staff(username=username, password=password)
else:
newuser = User(username=username, password=password, role="user")

db.session.add(newuser)
db.session.commit()
return newuser

def get_user_by_username(username):
return User.query.filter_by(username=username).first()

# Renamed from get_user to get_user_by_id for clarity
def get_user_by_id(id):
return db.session.get(User, id)

def get_all_users():
return User.query.all()

def get_all_users_json():
users = get_all_users()
if not users:
return []
return [user.get_json() for user in users]

def update_user(id, username):
user = get_user_by_id(id)
if user:
user.username = username
db.session.commit()
return user
return None