Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 66 additions & 7 deletions hrms/api/roster.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,39 @@
from hrms.hr.doctype.shift_assignment_tool.shift_assignment_tool import create_shift_assignment
from hrms.hr.doctype.shift_schedule.shift_schedule import get_or_insert_shift_schedule

ALLOWED_EMPLOYEE_FILTERS = {
"status",
"company",
"department",
"branch",
"designation",
"employee_name",
}

ALLOWED_SHIFT_FILTERS = {
"shift_type",
"status",
"shift_location",
}


def _validate_employee_filters(employee_filters: dict[str, str]) -> None:
for key in employee_filters:
if key not in ALLOWED_EMPLOYEE_FILTERS:
frappe.throw(
_("Invalid employee filter: {0}").format(frappe.bold(key)),
frappe.PermissionError,
)


def _validate_shift_filters(shift_filters: dict[str, str]) -> None:
for key in shift_filters:
if key not in ALLOWED_SHIFT_FILTERS:
frappe.throw(
_("Invalid shift filter: {0}").format(frappe.bold(key)),
frappe.PermissionError,
)


@frappe.whitelist()
def get_default_company() -> str:
Expand All @@ -18,6 +51,8 @@ def get_default_company() -> str:
def get_events(
month_start: str, month_end: str, employee_filters: dict[str, str], shift_filters: dict[str, str]
) -> dict[str, list[dict]]:
_validate_employee_filters(employee_filters)
_validate_shift_filters(shift_filters)
holidays = get_holidays(month_start, month_end, employee_filters)
leaves = get_leaves(month_start, month_end, employee_filters)
shifts = get_shifts(month_start, month_end, employee_filters, shift_filters)
Expand Down Expand Up @@ -54,6 +89,8 @@ def create_shift_schedule_assignment(
frequency: str,
shift_location: str | None = None,
) -> None:
frappe.has_permission("Employee", "read", employee, throw=True)
frappe.has_permission("Shift Schedule Assignment", "create", throw=True)
shift_schedule = get_or_insert_shift_schedule(shift_type, frequency, repeat_on_days)
shift_schedule_assignment = frappe.get_doc(
{
Expand All @@ -77,14 +114,19 @@ def create_shift_schedule_assignment(

@frappe.whitelist()
def delete_shift_schedule_assignment(shift_schedule_assignment: str) -> None:
for shift_assignment in frappe.get_all(
shift_schedule_assignment_doc = frappe.get_doc("Shift Schedule Assignment", shift_schedule_assignment)
shift_schedule_assignment_doc.check_permission("delete")

for shift_assignment_name in frappe.get_all(
"Shift Assignment", {"shift_schedule_assignment": shift_schedule_assignment}, pluck="name"
):
doc = frappe.get_doc("Shift Assignment", shift_assignment)
if doc.docstatus == 1:
doc.cancel()
frappe.delete_doc("Shift Assignment", shift_assignment)
frappe.delete_doc("Shift Schedule Assignment", shift_schedule_assignment)
shift_assignment_doc = frappe.get_doc("Shift Assignment", shift_assignment_name)
frappe.has_permission("Employee", "read", shift_assignment_doc.employee, throw=True)
shift_assignment_doc.check_permission("cancel" if shift_assignment_doc.docstatus == 1 else "delete")
if shift_assignment_doc.docstatus == 1:
shift_assignment_doc.cancel()
frappe.delete_doc("Shift Assignment", shift_assignment_name, ignore_permissions=True)
frappe.delete_doc("Shift Schedule Assignment", shift_schedule_assignment, ignore_permissions=True)


@frappe.whitelist()
Expand All @@ -94,14 +136,22 @@ def swap_shift(
if src_shift == tgt_shift:
frappe.throw(_("Source and target shifts cannot be the same"))

src_shift_doc = frappe.get_doc("Shift Assignment", src_shift)
frappe.has_permission("Employee", "read", src_shift_doc.employee, throw=True)
src_shift_doc.check_permission("write")

frappe.has_permission("Employee", "read", tgt_employee, throw=True)
frappe.has_permission("Shift Assignment", "create", throw=True)

if tgt_shift:
tgt_shift_doc = frappe.get_doc("Shift Assignment", tgt_shift)
frappe.has_permission("Employee", "read", tgt_shift_doc.employee, throw=True)
tgt_shift_doc.check_permission("write")
tgt_company = tgt_shift_doc.company
break_shift(tgt_shift_doc, tgt_date)
else:
tgt_company = frappe.db.get_value("Employee", tgt_employee, "company")

src_shift_doc = frappe.get_doc("Shift Assignment", src_shift)
break_shift(src_shift_doc, src_date)
insert_shift(
tgt_employee,
Expand Down Expand Up @@ -130,6 +180,9 @@ def break_shift(assignment: str | ShiftAssignment, date: str) -> None:
if isinstance(assignment, str):
assignment = frappe.get_doc("Shift Assignment", assignment)

frappe.has_permission("Employee", "read", assignment.employee, throw=True)
assignment.check_permission("write")

if assignment.end_date and date_diff(assignment.end_date, date) < 0:
frappe.throw(_("Cannot break shift after end date"))
if date_diff(assignment.start_date, date) > 0:
Expand Down Expand Up @@ -165,6 +218,8 @@ def insert_shift(
status: str,
shift_location: str | None = None,
) -> None:
frappe.has_permission("Employee", "read", employee, throw=True)
frappe.has_permission("Shift Assignment", "create", throw=True)
filters = {
"doctype": "Shift Assignment",
"employee": employee,
Expand Down Expand Up @@ -193,6 +248,7 @@ def insert_shift(


def get_holidays(month_start: str, month_end: str, employee_filters: dict[str, str]) -> dict[str, list[dict]]:
_validate_employee_filters(employee_filters)
holidays = {}
holiday_lists = {}

Expand All @@ -213,6 +269,7 @@ def get_holidays(month_start: str, month_end: str, employee_filters: dict[str, s


def get_leaves(month_start: str, month_end: str, employee_filters: dict[str, str]) -> dict[str, list[dict]]:
_validate_employee_filters(employee_filters)
LeaveApplication = frappe.qb.DocType("Leave Application")
Employee = frappe.qb.DocType("Employee")

Expand Down Expand Up @@ -244,6 +301,8 @@ def get_leaves(month_start: str, month_end: str, employee_filters: dict[str, str
def get_shifts(
month_start: str, month_end: str, employee_filters: dict[str, str], shift_filters: dict[str, str]
) -> dict[str, list[dict]]:
_validate_employee_filters(employee_filters)
_validate_shift_filters(shift_filters)
ShiftAssignment = frappe.qb.DocType("Shift Assignment")
ShiftType = frappe.qb.DocType("Shift Type")
Employee = frappe.qb.DocType("Employee")
Expand Down
37 changes: 36 additions & 1 deletion hrms/hr/doctype/attendance_request/attendance_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,25 @@ def create_or_update_attendance(self, date: str):
),
title=_("Attendance Updated"),
)
elif status == "Half Day" and doc.half_day_status == "Absent" and self.half_day:
old_half_day_status = doc.half_day_status
doc.db_set({"half_day_status": "Present", "attendance_request": self.name})
text = _(
"Changed the Status for Other Half from {0} to {1} via Attendance Request as the status is Half Day"
).format(frappe.bold(old_half_day_status), frappe.bold("Present"))
doc.add_comment(comment_type="Info", text=text)

frappe.msgprint(
_(
"Updated Status for Other Half from {0} to {1} for date {2} in the attendance record {3}"
).format(
frappe.bold(old_half_day_status),
frappe.bold("Present"),
frappe.bold(format_date(date)),
get_link_to_form("Attendance", doc.name),
),
title=_("Attendance Updated"),
)
else:
# submit a new attendance record
doc = frappe.new_doc("Attendance")
Expand Down Expand Up @@ -221,6 +240,19 @@ def should_mark_attendance(self, attendance_date: str) -> bool:
return True

def has_leave_record(self, attendance_date: str) -> str | None:
filters = {
"employee": self.employee,
"docstatus": 1,
"from_date": ("<=", attendance_date),
"to_date": (">=", attendance_date),
"status": "Approved",
}
if self.half_day_date == attendance_date:
filters["half_day"] = 0

return frappe.db.exists("Leave Application", filters)

def has_half_day_leave_record(self, attendance_date: str) -> str | None:
return frappe.db.exists(
"Leave Application",
{
Expand All @@ -229,6 +261,8 @@ def has_leave_record(self, attendance_date: str) -> str | None:
"from_date": ("<=", attendance_date),
"to_date": (">=", attendance_date),
"status": "Approved",
"half_day": 1,
"half_day_date": attendance_date,
},
)

Expand Down Expand Up @@ -256,6 +290,8 @@ def status_unchanged(self, attendance_date):
new_status = self.get_attendance_status(attendance_date)
attendance_doc = self.get_attendance_doc(attendance_date)
if attendance_doc and attendance_doc.status == new_status:
if new_status == "Half Day" and self.half_day and attendance_doc.half_day_status == "Absent":
return False
return True
return False

Expand All @@ -277,7 +313,6 @@ def get_attendance_warnings(self) -> list:

for day in range(request_days):
attendance_date = add_days(self.from_date, day)

if not self.include_holidays and is_holiday(self.employee, attendance_date):
attendance_warnings.append({"date": attendance_date, "reason": "Holiday", "action": "Skip"})
elif self.has_leave_record(attendance_date):
Expand Down
97 changes: 94 additions & 3 deletions hrms/hr/doctype/attendance_request/test_attendance_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,7 @@ def test_skip_attendance_on_leave(self):
self.to_date = get_year_ending(getdate())

frappe.delete_doc_if_exists("Leave Type", "Test Skip Attendance", force=1)
leave_type = frappe.get_doc(
dict(leave_type_name="Test Skip Attendance", doctype="Leave Type")
).insert()
leave_type = frappe.get_doc(leave_type_name="Test Skip Attendance", doctype="Leave Type").insert()

make_allocation_record(leave_type=leave_type.name, from_date=self.from_date, to_date=self.to_date)
today = getdate()
Expand Down Expand Up @@ -243,6 +241,99 @@ def test_half_day_status_change_when_existing_attendance_is_updated(self):
)
self.assertEqual(half_day_status, "Absent")

def test_half_day_absent_half_to_present(self):
"""Test attendance request updates half_day_status from Absent to Present when existing Half Day attendance has the other half marked absent"""
today = getdate()

mark_attendance(self.employee.name, today, "Half Day", half_day_status="Absent")

attendance_request = frappe.get_doc(
{
"doctype": "Attendance Request",
"employee": self.employee.name,
"from_date": today,
"to_date": today,
"reason": "On Duty",
"half_day": 1,
"half_day_date": today,
"company": "_Test Company",
}
).save()
attendance_request.submit()

updated = frappe.db.get_value(
"Attendance",
{"employee": self.employee.name, "attendance_date": today, "docstatus": 1},
["status", "half_day_status"],
as_dict=True,
)
self.assertEqual(updated.status, "Half Day")
self.assertEqual(updated.half_day_status, "Present")

def test_half_day_with_shift_auto_absent(self):
"""Test half-day attendance request when shift_type auto-flags the other half as absent due to missing checkins"""
from_date = get_year_start(add_months(getdate(), -1))
to_date = get_year_ending(getdate())
today = getdate()

frappe.delete_doc_if_exists("Leave Type", "Test Half Day Leave", force=1)
leave_type = frappe.get_doc(leave_type_name="Test Half Day Leave", doctype="Leave Type").insert()
make_allocation_record(leave_type=leave_type.name, from_date=from_date, to_date=to_date)
frappe.db.delete("Holiday", {"parent": self.holiday_list})

# 1) Submit half-day leave
leave_application = frappe.get_doc(
{
"doctype": "Leave Application",
"employee": self.employee.name,
"leave_type": leave_type.name,
"from_date": today,
"to_date": today,
"half_day": 1,
"half_day_date": today,
"status": "Approved",
}
).insert()
leave_application.submit()

# 2) Create shift type + assignment
shift_type = create_shift("Test Half Day Shift", "09:00:00", "17:00:00")
shift_type.process_attendance_after = add_days(today, -1)
shift_type.last_sync_of_checkin = add_days(today, 1)
shift_type.enable_auto_attendance = 1
shift_type.save()
create_shift_assignment(self.employee.name, shift_type.name, add_days(today, -1), add_days(today, 1))

# 3) Attendance request for the other half — creates half-day attendance
attendance_request = frappe.get_doc(
{
"doctype": "Attendance Request",
"employee": self.employee.name,
"from_date": today,
"to_date": today,
"reason": "On Duty",
"half_day": 1,
"half_day_date": today,
"company": "_Test Company",
}
).save()
attendance_request.submit()

# 4) Shift auto-attendance marks the other half absent when no checkins exist
frappe.get_doc("Shift Type", shift_type.name).mark_absent_for_half_day_dates(self.employee.name)

# Verify
attendance = frappe.db.get_value(
"Attendance",
{"attendance_request": attendance_request.name},
["name", "status", "half_day_status", "modify_half_day_status"],
as_dict=True,
)
self.assertTrue(attendance)
self.assertEqual(attendance.status, "Half Day")
self.assertEqual(attendance.half_day_status, "Absent")
self.assertEqual(attendance.modify_half_day_status, 0)

@HRMSTestSuite.change_settings("HR Settings", {"allow_multiple_shift_assignments": True})
def test_overlap_with_different_shifts(self):
shift_1 = create_shift("Morning Shift", "08:00:00", "12:00:00")
Expand Down
Loading
Loading