Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 95 additions & 38 deletions hrms/hr/report/monthly_attendance_sheet/monthly_attendance_sheet.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@
from frappe import _
from frappe.query_builder import Case
from frappe.query_builder.functions import Count, Extract, Sum
from frappe.utils import cint, cstr, formatdate, getdate
from frappe.utils import add_days, cint, cstr, formatdate, getdate
from frappe.utils.nestedset import get_descendants_of

from hrms.utils import date_diff, get_date_range
from hrms.utils.holiday_list import (
fill_employee_holiday_list_date_gaps_with_company_holiday_list,
get_assigned_holiday_lists_to_employee_and_company,
)

Filters = frappe._dict

Expand Down Expand Up @@ -235,7 +239,16 @@ def get_date_condition(docfield: Field, filters: Filters) -> Criterion:

def get_data(filters: Filters, attendance_map: dict) -> list[dict]:
employee_details, group_by_param_values = get_employee_related_details(filters)
holiday_map = get_holiday_map(filters)

# flatten grouped structure so get_employee_holiday_map always gets {emp: details}
if filters.group_by:
ungrouped_employee_details = {}
for details in employee_details.values():
ungrouped_employee_details.update(details)
else:
ungrouped_employee_details = employee_details

employee_holiday_map = get_employee_holiday_map(ungrouped_employee_details, filters)
data = []

if filters.group_by:
Expand All @@ -245,14 +258,14 @@ def get_data(filters: Filters, attendance_map: dict) -> list[dict]:
if not value:
continue

records = get_rows(employee_details[value], filters, holiday_map, attendance_map)
records = get_rows(employee_details[value], filters, employee_holiday_map, attendance_map)

if records:
data.append({group_by_column: value})
data.extend(records)

else:
data = get_rows(employee_details, filters, holiday_map, attendance_map)
data = get_rows(employee_details, filters, employee_holiday_map, attendance_map)

return data

Expand Down Expand Up @@ -305,6 +318,7 @@ def get_attendance_map(filters: Filters) -> dict:

def get_attendance_records(filters: Filters) -> list[dict]:
Attendance = frappe.qb.DocType("Attendance")
Employee = frappe.qb.DocType("Employee")
attendance_date_condition = get_date_condition(Attendance.attendance_date, filters)
status = (
frappe.qb.terms.Case()
Expand Down Expand Up @@ -335,6 +349,14 @@ def get_attendance_records(filters: Filters) -> list[dict]:

if filters.employee:
query = query.where(Attendance.employee == filters.employee)

if filters.department or filters.branch:
query = query.join(Employee).on(Attendance.employee == Employee.name)
if filters.department and filters.department != "All Departments":
query = query.where(Employee.department == filters.department)
if filters.branch:
query = query.where(Employee.branch == filters.branch)

query = query.orderby(Attendance.employee, Attendance.attendance_date)

return query.run(as_dict=1)
Expand Down Expand Up @@ -404,52 +426,87 @@ def get_employee_related_details(filters: Filters) -> tuple[dict, list]:
return emp_map, group_by_param_values


def get_holiday_map(filters: Filters) -> dict[str, list[dict]]:
def get_employee_holiday_map(employee_details: dict, filters: Filters) -> dict[str, list[dict]]:
"""
Returns a dict of holidays falling in the filter month and year
with list name as key and list of holidays as values like
{
'Holiday List 1': [
{'day_of_month': '0' , 'weekly_off': 1},
{'day_of_month': '1', 'weekly_off': 0}
],
'Holiday List 2': [
{'day_of_month': '0' , 'weekly_off': 1},
{'day_of_month': '1', 'weekly_off': 0}
]
}
Builds {employee: [holidays]} for all employees in two queries.

Query 1 — bulk HLA fetch for all employees + their companies.
Query 2 — holidays for only the holiday lists employees are actually assigned to.

Per-employee lookup after this call is an O(1) dict access.
"""
# add default holiday list too
holiday_lists = frappe.db.get_all("Holiday List", pluck="name")
default_holiday_list = frappe.get_cached_value("Company", filters.company, "default_holiday_list")
holiday_lists.append(default_holiday_list)
if not employee_details:
return {}

start_date, end_date = get_date_range_from_filters(filters)

employees = list(employee_details.keys())
companies = list({d.company for d in employee_details.values() if d.get("company")})

assigned_holiday_lists = get_assigned_holiday_lists_to_employee_and_company(
employees + companies, start_date, end_date
)

# gaps in employee-level assignments are filled with company-level assignments,
employee_hl_ranges = {}
for employee, details in employee_details.items():
employee_ranges = assigned_holiday_lists.get(employee, [])
company_ranges = assigned_holiday_lists.get(details.get("company"), [])
ranges = fill_employee_holiday_list_date_gaps_with_company_holiday_list(
employee_ranges, company_ranges, start_date, end_date
)
if ranges:
employee_hl_ranges[employee] = ranges

if not employee_hl_ranges:
return {}

# collect only the HL names employees are actually assigned to
used_hl_names = {r["holiday_list"] for ranges in employee_hl_ranges.values() for r in ranges}

holiday_map = frappe._dict()
Holiday = frappe.qb.DocType("Holiday")
holiday_rows = (
frappe.qb.from_(Holiday)
.select(Holiday.parent, Holiday.holiday_date, Holiday.weekly_off)
.where(Holiday.parent.isin(list(used_hl_names)))
.where(Holiday.holiday_date.between(start_date, end_date))
).run(as_dict=True)

holiday_condition = get_date_condition(Holiday.holiday_date, filters)
hl_holidays = {}
for h in holiday_rows:
hl_holidays.setdefault(h.parent, []).append(h)

# filter holidays to each employee's effective ranges
employee_holiday_map = {}
for employee, ranges in employee_hl_ranges.items():
holidays = [
h
for r in ranges
for h in hl_holidays.get(r["holiday_list"], [])
if r["from_date"] <= getdate(h.holiday_date) <= r["to_date"]
]
if holidays:
employee_holiday_map[employee] = holidays

for d in holiday_lists:
if not d:
continue
return employee_holiday_map

holidays = (
frappe.qb.from_(Holiday)
.select(Holiday.holiday_date, Holiday.weekly_off)
.where((Holiday.parent == d) & (holiday_condition))
).run(as_dict=True)
holiday_map.setdefault(d, holidays)

return holiday_map
def get_date_range_from_filters(filters: Filters) -> tuple:
"""Returns (start_date, end_date) as date objects from filters."""
if filters.filter_based_on == "Month":
total_days = get_total_days_in_month(filters)
start_date = getdate(f"{cstr(filters.year)}-{cstr(filters.month)}-01")
end_date = getdate(f"{cstr(filters.year)}-{cstr(filters.month)}-{total_days}")
return start_date, end_date
return getdate(filters.start_date), getdate(filters.end_date)


def get_rows(employee_details: dict, filters: Filters, holiday_map: dict, attendance_map: dict) -> list[dict]:
def get_rows(
employee_details: dict, filters: Filters, employee_holiday_map: dict, attendance_map: dict
) -> list[dict]:
records = []
default_holiday_list = frappe.get_cached_value("Company", filters.company, "default_holiday_list")

for employee, details in employee_details.items():
emp_holiday_list = details.holiday_list or default_holiday_list
holidays = holiday_map.get(emp_holiday_list)
holidays = employee_holiday_map.get(employee, [])

if filters.summarized_view:
attendance = get_attendance_status_for_summarized_view(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
from erpnext.setup.doctype.employee.test_employee import make_employee

from hrms.hr.doctype.attendance.attendance import mark_attendance
from hrms.hr.doctype.holiday_list_assignment.test_holiday_list_assignment import assign_holiday_list
from hrms.hr.doctype.holiday_list_assignment.test_holiday_list_assignment import (
assign_holiday_list,
create_holiday_list_assignment,
)
from hrms.hr.doctype.leave_allocation.leave_allocation import OverlapError
from hrms.hr.doctype.leave_application.test_leave_application import make_allocation_record
from hrms.hr.doctype.shift_type.test_shift_type import setup_shift_type
Expand Down Expand Up @@ -625,6 +628,105 @@ def test_attendance_with_department_and_branch_filter_combined(self):
self.assertNotIn(emp_wrong_branch, employees_in_report)
self.assertNotIn(emp_wrong_dept, employees_in_report)

def test_multiple_holiday_list_assignments_in_detailed_view(self):
"""
Employee switches holiday lists mid-month.
Holidays from each list must appear only within their effective date range.

HL-1 active days 1-15: holiday on day 5 AND day 20 (day 20 should NOT appear)
HL-2 active days 16-end: holiday on day 10 AND day 25 (day 10 should NOT appear)
"""
previous_month_first = get_first_day_for_prev_month()
year_start = getdate(get_year_start(previous_month_first))
year_end = getdate(get_year_ending(previous_month_first))

hl1_day = previous_month_first.replace(day=5) # in HL-1's range
hl1_bleed = previous_month_first.replace(day=20) # HL-1 holiday outside its range
hl2_bleed = previous_month_first.replace(day=10) # HL-2 holiday before it becomes active
hl2_day = previous_month_first.replace(day=25) # in HL-2's range
hl2_start = previous_month_first.replace(day=16)

hl1 = make_holiday_list(
"Test Multi HL-1", from_date=year_start, to_date=year_end, add_weekly_offs=False
)
hl2 = make_holiday_list(
"Test Multi HL-2", from_date=year_start, to_date=year_end, add_weekly_offs=False
)

add_holiday_to_list(hl1, hl1_day)
add_holiday_to_list(hl1, hl1_bleed) # outside effective range — must not show
add_holiday_to_list(hl2, hl2_bleed) # before HL-2 becomes active — must not show
add_holiday_to_list(hl2, hl2_day)

frappe.db.delete("Holiday List Assignment", {"assigned_to": self.employee})
create_holiday_list_assignment("Employee", self.employee, hl1, from_date=previous_month_first)
create_holiday_list_assignment("Employee", self.employee, hl2, from_date=hl2_start)

mark_attendance(self.employee, previous_month_first, "Present")

filters = frappe._dict(
{
"month": previous_month_first.month,
"year": previous_month_first.year,
"company": self.company,
"filter_based_on": "Month",
}
)
report = execute(filters=filters)
row = report[1][0]

# holiday in HL-1's active window → must appear
self.assertEqual(row[date_key(hl1_day)], "H")
# holiday in HL-2's active window → must appear
self.assertEqual(row[date_key(hl2_day)], "H")
# HL-1 holiday after HL-2 takes over → must NOT appear
self.assertNotEqual(row[date_key(hl1_bleed)], "H")
# HL-2 holiday before HL-2 becomes active → must NOT appear
self.assertNotEqual(row[date_key(hl2_bleed)], "H")

def test_multiple_holiday_list_assignments_in_summarized_view(self):
"""
Total holiday count must combine holidays from all active holiday list assignments.
"""
previous_month_first = get_first_day_for_prev_month()
year_start = getdate(get_year_start(previous_month_first))
year_end = getdate(get_year_ending(previous_month_first))

hl1_day = previous_month_first.replace(day=5)
hl2_day = previous_month_first.replace(day=25)
hl2_start = previous_month_first.replace(day=16)

hl1 = make_holiday_list(
"Test Multi HL-1", from_date=year_start, to_date=year_end, add_weekly_offs=False
)
hl2 = make_holiday_list(
"Test Multi HL-2", from_date=year_start, to_date=year_end, add_weekly_offs=False
)

add_holiday_to_list(hl1, hl1_day)
add_holiday_to_list(hl2, hl2_day)

frappe.db.delete("Holiday List Assignment", {"assigned_to": self.employee})
create_holiday_list_assignment("Employee", self.employee, hl1, from_date=previous_month_first)
create_holiday_list_assignment("Employee", self.employee, hl2, from_date=hl2_start)

mark_attendance(self.employee, previous_month_first.replace(day=3), "Present")

filters = frappe._dict(
{
"month": previous_month_first.month,
"year": previous_month_first.year,
"company": self.company,
"filter_based_on": "Month",
"summarized_view": 1,
}
)
report = execute(filters=filters)
row = report[1][0]

# one holiday from each list → total must be 2
self.assertEqual(row["total_holidays"], 2)

def test_detailed_view_with_date_range_and_group_by_filter(self):
today = getdate()
mark_attendance(self.employee, today, "Absent", "Day Shift")
Expand Down Expand Up @@ -701,3 +803,9 @@ def create_branch(branch_name):
if not frappe.db.exists("Branch", branch_name):
frappe.get_doc({"doctype": "Branch", "branch": branch_name}).insert(ignore_permissions=True)
return branch_name


def add_holiday_to_list(holiday_list_name, holiday_date, description="Test Holiday"):
hl = frappe.get_doc("Holiday List", holiday_list_name)
hl.append("holidays", {"holiday_date": holiday_date, "description": description, "weekly_off": 0})
hl.save()
Loading
Loading