Compare commits

...

1 Commits

Author SHA1 Message Date
Mohammad Hussain Nagaria
282a30144a feat: Employee reminders (#25735)
* feat: Add reminders section to HR Settings

* refactor: Extract generic function for getting Employees

* feat: Employee Work Anniversary Reminder

* feat: Daily Holiday Reminder

* fix: Unnecessary params and replace [] with .get()

* test: Daily Holiday Reminders

* test: is_holiday basic tests

* refactor: Move employee reminders code to separate module

* feat: Add advance reminder to HR settings

* feat: Advance Holiday Reminders

* refactor: get_holidays_for_employee

* feat: Email holiday reminders in advance + tests

* fix: Remove unused import

* refactor: HR Setting Reminder Section

* refactor: Remove Daily Holiday Reminders feat

* feat: Reminder miss warning

* fix: Failing test and function name change

* chore: Add patch for field rename

* chore: Rename frequency label

* fix: Failing patch test

* fix: sider and removed description of fields

* fix: email alignment

Co-authored-by: pateljannat <pateljannat2308@gmail.com>
Co-authored-by: Jannat Patel <31363128+pateljannat@users.noreply.github.com>
(cherry picked from commit 24b2a31581)

# Conflicts:
#	erpnext/hooks.py
#	erpnext/hr/doctype/compensatory_leave_request/compensatory_leave_request.py
#	erpnext/hr/doctype/employee/employee.py
#	erpnext/hr/doctype/employee/test_employee.py
#	erpnext/hr/doctype/hr_settings/hr_settings.json
#	erpnext/hr/doctype/hr_settings/hr_settings.py
#	erpnext/hr/doctype/upload_attendance/upload_attendance.py
#	erpnext/hr/utils.py
#	erpnext/patches.txt
#	erpnext/payroll/doctype/employee_benefit_application/employee_benefit_application.py
#	erpnext/payroll/doctype/payroll_period/payroll_period.py
#	erpnext/payroll/doctype/salary_slip/salary_slip.py
#	erpnext/public/js/utils.js
#	erpnext/setup/doctype/employee/employee_reminders.py
#	erpnext/setup/doctype/employee/test_employee_reminders.py
2025-01-12 00:13:13 +00:00
18 changed files with 3921 additions and 0 deletions

View File

@@ -436,6 +436,11 @@ scheduler_events = {
"erpnext.crm.doctype.opportunity.opportunity.auto_close_opportunity",
"erpnext.controllers.accounts_controller.update_invoice_status",
"erpnext.accounts.doctype.fiscal_year.fiscal_year.auto_create_fiscal_year",
<<<<<<< HEAD
=======
"erpnext.hr.doctype.employee.employee_reminders.send_work_anniversary_reminders",
"erpnext.hr.doctype.employee.employee_reminders.send_birthday_reminders",
>>>>>>> 24b2a31581 (feat: Employee reminders (#25735))
"erpnext.projects.doctype.task.task.set_tasks_as_overdue",
"erpnext.stock.doctype.serial_no.serial_no.update_maintenance_status",
"erpnext.buying.doctype.supplier_scorecard.supplier_scorecard.refresh_scorecards",
@@ -466,6 +471,12 @@ scheduler_events = {
"erpnext.crm.utils.open_leads_opportunities_based_on_todays_event",
"erpnext.assets.doctype.asset.depreciation.post_depreciation_entries",
],
"weekly": [
"erpnext.hr.doctype.employee.employee_reminders.send_reminders_in_advance_weekly"
],
"monthly": [
"erpnext.hr.doctype.employee.employee_reminders.send_reminders_in_advance_monthly"
],
"monthly_long": [
"erpnext.accounts.deferred_revenue.process_deferred_accounting",
"erpnext.accounts.utils.auto_create_exchange_rate_revaluation_monthly",

View File

@@ -0,0 +1,128 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2018, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
from __future__ import unicode_literals
import frappe
from frappe import _
from frappe.utils import date_diff, add_days, getdate, cint, format_date
from frappe.model.document import Document
from erpnext.hr.utils import validate_dates, validate_overlap, get_leave_period, validate_active_employee, \
create_additional_leave_ledger_entry, get_holiday_dates_for_employee
class CompensatoryLeaveRequest(Document):
def validate(self):
validate_active_employee(self.employee)
validate_dates(self, self.work_from_date, self.work_end_date)
if self.half_day:
if not self.half_day_date:
frappe.throw(_("Half Day Date is mandatory"))
if not getdate(self.work_from_date)<=getdate(self.half_day_date)<=getdate(self.work_end_date):
frappe.throw(_("Half Day Date should be in between Work From Date and Work End Date"))
validate_overlap(self, self.work_from_date, self.work_end_date)
self.validate_holidays()
self.validate_attendance()
if not self.leave_type:
frappe.throw(_("Leave Type is madatory"))
def validate_attendance(self):
attendance = frappe.get_all('Attendance',
filters={
'attendance_date': ['between', (self.work_from_date, self.work_end_date)],
'status': 'Present',
'docstatus': 1,
'employee': self.employee
}, fields=['attendance_date', 'status'])
if len(attendance) < date_diff(self.work_end_date, self.work_from_date) + 1:
frappe.throw(_("You are not present all day(s) between compensatory leave request days"))
def validate_holidays(self):
holidays = get_holiday_dates_for_employee(self.employee, self.work_from_date, self.work_end_date)
if len(holidays) < date_diff(self.work_end_date, self.work_from_date) + 1:
if date_diff(self.work_end_date, self.work_from_date):
msg = _("The days between {0} to {1} are not valid holidays.").format(frappe.bold(format_date(self.work_from_date)), frappe.bold(format_date(self.work_end_date)))
else:
msg = _("{0} is not a holiday.").format(frappe.bold(format_date(self.work_from_date)))
frappe.throw(msg)
def on_submit(self):
company = frappe.db.get_value("Employee", self.employee, "company")
date_difference = date_diff(self.work_end_date, self.work_from_date) + 1
if self.half_day:
date_difference -= 0.5
leave_period = get_leave_period(self.work_from_date, self.work_end_date, company)
if leave_period:
leave_allocation = self.get_existing_allocation_for_period(leave_period)
if leave_allocation:
leave_allocation.new_leaves_allocated += date_difference
leave_allocation.validate()
leave_allocation.db_set("new_leaves_allocated", leave_allocation.total_leaves_allocated)
leave_allocation.db_set("total_leaves_allocated", leave_allocation.total_leaves_allocated)
# generate additional ledger entry for the new compensatory leaves off
create_additional_leave_ledger_entry(leave_allocation, date_difference, add_days(self.work_end_date, 1))
else:
leave_allocation = self.create_leave_allocation(leave_period, date_difference)
self.db_set("leave_allocation", leave_allocation.name)
else:
frappe.throw(_("There is no leave period in between {0} and {1}").format(format_date(self.work_from_date), format_date(self.work_end_date)))
def on_cancel(self):
if self.leave_allocation:
date_difference = date_diff(self.work_end_date, self.work_from_date) + 1
if self.half_day:
date_difference -= 0.5
leave_allocation = frappe.get_doc("Leave Allocation", self.leave_allocation)
if leave_allocation:
leave_allocation.new_leaves_allocated -= date_difference
if leave_allocation.new_leaves_allocated - date_difference <= 0:
leave_allocation.new_leaves_allocated = 0
leave_allocation.validate()
leave_allocation.db_set("new_leaves_allocated", leave_allocation.total_leaves_allocated)
leave_allocation.db_set("total_leaves_allocated", leave_allocation.total_leaves_allocated)
# create reverse entry on cancelation
create_additional_leave_ledger_entry(leave_allocation, date_difference * -1, add_days(self.work_end_date, 1))
def get_existing_allocation_for_period(self, leave_period):
leave_allocation = frappe.db.sql("""
select name
from `tabLeave Allocation`
where employee=%(employee)s and leave_type=%(leave_type)s
and docstatus=1
and (from_date between %(from_date)s and %(to_date)s
or to_date between %(from_date)s and %(to_date)s
or (from_date < %(from_date)s and to_date > %(to_date)s))
""", {
"from_date": leave_period[0].from_date,
"to_date": leave_period[0].to_date,
"employee": self.employee,
"leave_type": self.leave_type
}, as_dict=1)
if leave_allocation:
return frappe.get_doc("Leave Allocation", leave_allocation[0].name)
else:
return False
def create_leave_allocation(self, leave_period, date_difference):
is_carry_forward = frappe.db.get_value("Leave Type", self.leave_type, "is_carry_forward")
allocation = frappe.get_doc(dict(
doctype="Leave Allocation",
employee=self.employee,
employee_name=self.employee_name,
leave_type=self.leave_type,
from_date=add_days(self.work_end_date, 1),
to_date=leave_period[0].to_date,
carry_forward=cint(is_carry_forward),
new_leaves_allocated=date_difference,
total_leaves_allocated=date_difference,
description=self.reason
))
allocation.insert(ignore_permissions=True)
allocation.submit()
return allocation

View File

@@ -0,0 +1,456 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt
import frappe
from frappe.utils import getdate, validate_email_address, today, add_years, cstr
from frappe.model.naming import set_name_by_naming_series
from frappe import throw, _, scrub
from frappe.permissions import add_user_permission, remove_user_permission, \
set_user_permission_if_allowed, has_permission, get_doc_permissions
from erpnext.utilities.transaction_base import delete_events
from frappe.utils.nestedset import NestedSet
class EmployeeUserDisabledError(frappe.ValidationError):
pass
class InactiveEmployeeStatusError(frappe.ValidationError):
pass
class Employee(NestedSet):
nsm_parent_field = 'reports_to'
def autoname(self):
naming_method = frappe.db.get_value("HR Settings", None, "emp_created_by")
if not naming_method:
throw(_("Please setup Employee Naming System in Human Resource > HR Settings"))
else:
if naming_method == 'Naming Series':
set_name_by_naming_series(self)
elif naming_method == 'Employee Number':
self.name = self.employee_number
elif naming_method == 'Full Name':
self.set_employee_name()
self.name = self.employee_name
self.employee = self.name
def validate(self):
from erpnext.controllers.status_updater import validate_status
validate_status(self.status, ["Active", "Inactive", "Suspended", "Left"])
self.employee = self.name
self.set_employee_name()
self.validate_date()
self.validate_email()
self.validate_status()
self.validate_reports_to()
self.validate_preferred_email()
if self.job_applicant:
self.validate_onboarding_process()
if self.user_id:
self.validate_user_details()
else:
existing_user_id = frappe.db.get_value("Employee", self.name, "user_id")
if existing_user_id:
remove_user_permission(
"Employee", self.name, existing_user_id)
def after_rename(self, old, new, merge):
self.db_set("employee", new)
def set_employee_name(self):
self.employee_name = ' '.join(filter(lambda x: x, [self.first_name, self.middle_name, self.last_name]))
def validate_user_details(self):
data = frappe.db.get_value('User',
self.user_id, ['enabled', 'user_image'], as_dict=1)
if data.get("user_image") and self.image == '':
self.image = data.get("user_image")
self.validate_for_enabled_user_id(data.get("enabled", 0))
self.validate_duplicate_user_id()
def update_nsm_model(self):
frappe.utils.nestedset.update_nsm(self)
def on_update(self):
self.update_nsm_model()
if self.user_id:
self.update_user()
self.update_user_permissions()
self.reset_employee_emails_cache()
self.update_approver_role()
def update_user_permissions(self):
if not self.create_user_permission: return
if not has_permission('User Permission', ptype='write', raise_exception=False): return
employee_user_permission_exists = frappe.db.exists('User Permission', {
'allow': 'Employee',
'for_value': self.name,
'user': self.user_id
})
if employee_user_permission_exists: return
employee_user_permission_exists = frappe.db.exists('User Permission', {
'allow': 'Employee',
'for_value': self.name,
'user': self.user_id
})
if employee_user_permission_exists: return
add_user_permission("Employee", self.name, self.user_id)
set_user_permission_if_allowed("Company", self.company, self.user_id)
def update_user(self):
# add employee role if missing
user = frappe.get_doc("User", self.user_id)
user.flags.ignore_permissions = True
if "Employee" not in user.get("roles"):
user.append_roles("Employee")
# copy details like Fullname, DOB and Image to User
if self.employee_name and not (user.first_name and user.last_name):
employee_name = self.employee_name.split(" ")
if len(employee_name) >= 3:
user.last_name = " ".join(employee_name[2:])
user.middle_name = employee_name[1]
elif len(employee_name) == 2:
user.last_name = employee_name[1]
user.first_name = employee_name[0]
if self.date_of_birth:
user.birth_date = self.date_of_birth
if self.gender:
user.gender = self.gender
if self.image:
if not user.user_image:
user.user_image = self.image
try:
frappe.get_doc({
"doctype": "File",
"file_url": self.image,
"attached_to_doctype": "User",
"attached_to_name": self.user_id
}).insert()
except frappe.DuplicateEntryError:
# already exists
pass
user.save()
def update_approver_role(self):
if self.leave_approver:
user = frappe.get_doc("User", self.leave_approver)
user.flags.ignore_permissions = True
user.add_roles("Leave Approver")
if self.expense_approver:
user = frappe.get_doc("User", self.expense_approver)
user.flags.ignore_permissions = True
user.add_roles("Expense Approver")
def validate_date(self):
if self.date_of_birth and getdate(self.date_of_birth) > getdate(today()):
throw(_("Date of Birth cannot be greater than today."))
if self.date_of_birth and self.date_of_joining and getdate(self.date_of_birth) >= getdate(self.date_of_joining):
throw(_("Date of Joining must be greater than Date of Birth"))
elif self.date_of_retirement and self.date_of_joining and (getdate(self.date_of_retirement) <= getdate(self.date_of_joining)):
throw(_("Date Of Retirement must be greater than Date of Joining"))
elif self.relieving_date and self.date_of_joining and (getdate(self.relieving_date) < getdate(self.date_of_joining)):
throw(_("Relieving Date must be greater than or equal to Date of Joining"))
elif self.contract_end_date and self.date_of_joining and (getdate(self.contract_end_date) <= getdate(self.date_of_joining)):
throw(_("Contract End Date must be greater than Date of Joining"))
def validate_email(self):
if self.company_email:
validate_email_address(self.company_email, True)
if self.personal_email:
validate_email_address(self.personal_email, True)
def set_preferred_email(self):
preferred_email_field = frappe.scrub(self.prefered_contact_email)
if preferred_email_field:
preferred_email = self.get(preferred_email_field)
self.prefered_email = preferred_email
def validate_status(self):
if self.status == 'Left':
reports_to = frappe.db.get_all('Employee',
filters={'reports_to': self.name, 'status': "Active"},
fields=['name','employee_name']
)
if reports_to:
link_to_employees = [frappe.utils.get_link_to_form('Employee', employee.name, label=employee.employee_name) for employee in reports_to]
message = _("The following employees are currently still reporting to {0}:").format(frappe.bold(self.employee_name))
message += "<br><br><ul><li>" + "</li><li>".join(link_to_employees)
message += "</li></ul><br>"
message += _("Please make sure the employees above report to another Active employee.")
throw(message, InactiveEmployeeStatusError, _("Cannot Relieve Employee"))
if not self.relieving_date:
throw(_("Please enter relieving date."))
def validate_for_enabled_user_id(self, enabled):
if not self.status == 'Active':
return
if enabled is None:
frappe.throw(_("User {0} does not exist").format(self.user_id))
if enabled == 0:
frappe.throw(_("User {0} is disabled").format(self.user_id), EmployeeUserDisabledError)
def validate_duplicate_user_id(self):
employee = frappe.db.sql_list("""select name from `tabEmployee` where
user_id=%s and status='Active' and name!=%s""", (self.user_id, self.name))
if employee:
throw(_("User {0} is already assigned to Employee {1}").format(
self.user_id, employee[0]), frappe.DuplicateEntryError)
def validate_reports_to(self):
if self.reports_to == self.name:
throw(_("Employee cannot report to himself."))
def on_trash(self):
self.update_nsm_model()
delete_events(self.doctype, self.name)
if frappe.db.exists("Employee Transfer", {'new_employee_id': self.name, 'docstatus': 1}):
emp_transfer = frappe.get_doc("Employee Transfer", {'new_employee_id': self.name, 'docstatus': 1})
emp_transfer.db_set("new_employee_id", '')
def validate_preferred_email(self):
if self.prefered_contact_email and not self.get(scrub(self.prefered_contact_email)):
frappe.msgprint(_("Please enter {0}").format(self.prefered_contact_email))
def validate_onboarding_process(self):
employee_onboarding = frappe.get_all("Employee Onboarding",
filters={"job_applicant": self.job_applicant, "docstatus": 1, "boarding_status": ("!=", "Completed")})
if employee_onboarding:
doc = frappe.get_doc("Employee Onboarding", employee_onboarding[0].name)
doc.validate_employee_creation()
doc.db_set("employee", self.name)
def reset_employee_emails_cache(self):
prev_doc = self.get_doc_before_save() or {}
cell_number = cstr(self.get('cell_number'))
prev_number = cstr(prev_doc.get('cell_number'))
if (cell_number != prev_number or
self.get('user_id') != prev_doc.get('user_id')):
frappe.cache().hdel('employees_with_number', cell_number)
frappe.cache().hdel('employees_with_number', prev_number)
def get_timeline_data(doctype, name):
'''Return timeline for attendance'''
return dict(frappe.db.sql('''select unix_timestamp(attendance_date), count(*)
from `tabAttendance` where employee=%s
and attendance_date > date_sub(curdate(), interval 1 year)
and status in ('Present', 'Half Day')
group by attendance_date''', name))
@frappe.whitelist()
def get_retirement_date(date_of_birth=None):
ret = {}
if date_of_birth:
try:
retirement_age = int(frappe.db.get_single_value("HR Settings", "retirement_age") or 60)
dt = add_years(getdate(date_of_birth),retirement_age)
ret = {'date_of_retirement': dt.strftime('%Y-%m-%d')}
except ValueError:
# invalid date
ret = {}
return ret
def validate_employee_role(doc, method):
# called via User hook
if "Employee" in [d.role for d in doc.get("roles")]:
if not frappe.db.get_value("Employee", {"user_id": doc.name}):
frappe.msgprint(_("Please set User ID field in an Employee record to set Employee Role"))
doc.get("roles").remove(doc.get("roles", {"role": "Employee"})[0])
def update_user_permissions(doc, method):
# called via User hook
if "Employee" in [d.role for d in doc.get("roles")]:
if not has_permission('User Permission', ptype='write', raise_exception=False): return
employee = frappe.get_doc("Employee", {"user_id": doc.name})
employee.update_user_permissions()
def get_employee_email(employee_doc):
return employee_doc.get("user_id") or employee_doc.get("personal_email") or employee_doc.get("company_email")
def get_holiday_list_for_employee(employee, raise_exception=True):
if employee:
holiday_list, company = frappe.db.get_value("Employee", employee, ["holiday_list", "company"])
else:
holiday_list=''
company=frappe.db.get_value("Global Defaults", None, "default_company")
if not holiday_list:
holiday_list = frappe.get_cached_value('Company', company, "default_holiday_list")
if not holiday_list and raise_exception:
frappe.throw(_('Please set a default Holiday List for Employee {0} or Company {1}').format(employee, company))
return holiday_list
def is_holiday(employee, date=None, raise_exception=True, only_non_weekly=False, with_description=False):
'''
Returns True if given Employee has an holiday on the given date
:param employee: Employee `name`
:param date: Date to check. Will check for today if None
:param raise_exception: Raise an exception if no holiday list found, default is True
:param only_non_weekly: Check only non-weekly holidays, default is False
'''
holiday_list = get_holiday_list_for_employee(employee, raise_exception)
if not date:
date = today()
if not holiday_list:
return False
filters = {
'parent': holiday_list,
'holiday_date': date
}
if only_non_weekly:
filters['weekly_off'] = False
holidays = frappe.get_all(
'Holiday',
fields=['description'],
filters=filters,
pluck='description'
)
if with_description:
return len(holidays) > 0, holidays
return len(holidays) > 0
@frappe.whitelist()
def deactivate_sales_person(status = None, employee = None):
if status == "Left":
sales_person = frappe.db.get_value("Sales Person", {"Employee": employee})
if sales_person:
frappe.db.set_value("Sales Person", sales_person, "enabled", 0)
@frappe.whitelist()
def create_user(employee, user = None, email=None):
emp = frappe.get_doc("Employee", employee)
employee_name = emp.employee_name.split(" ")
middle_name = last_name = ""
if len(employee_name) >= 3:
last_name = " ".join(employee_name[2:])
middle_name = employee_name[1]
elif len(employee_name) == 2:
last_name = employee_name[1]
first_name = employee_name[0]
if email:
emp.prefered_email = email
user = frappe.new_doc("User")
user.update({
"name": emp.employee_name,
"email": emp.prefered_email,
"enabled": 1,
"first_name": first_name,
"middle_name": middle_name,
"last_name": last_name,
"gender": emp.gender,
"birth_date": emp.date_of_birth,
"phone": emp.cell_number,
"bio": emp.bio
})
user.insert()
return user.name
def get_all_employee_emails(company):
'''Returns list of employee emails either based on user_id or company_email'''
employee_list = frappe.get_all('Employee',
fields=['name','employee_name'],
filters={
'status': 'Active',
'company': company
}
)
employee_emails = []
for employee in employee_list:
if not employee:
continue
user, company_email, personal_email = frappe.db.get_value('Employee',
employee, ['user_id', 'company_email', 'personal_email'])
email = user or company_email or personal_email
if email:
employee_emails.append(email)
return employee_emails
def get_employee_emails(employee_list):
'''Returns list of employee emails either based on user_id or company_email'''
employee_emails = []
for employee in employee_list:
if not employee:
continue
user, company_email, personal_email = frappe.db.get_value('Employee', employee,
['user_id', 'company_email', 'personal_email'])
email = user or company_email or personal_email
if email:
employee_emails.append(email)
return employee_emails
@frappe.whitelist()
def get_children(doctype, parent=None, company=None, is_root=False, is_tree=False):
filters = [['status', '=', 'Active']]
if company and company != 'All Companies':
filters.append(['company', '=', company])
fields = ['name as value', 'employee_name as title']
if is_root:
parent = ''
if parent and company and parent!=company:
filters.append(['reports_to', '=', parent])
else:
filters.append(['reports_to', '=', ''])
employees = frappe.get_list(doctype, fields=fields,
filters=filters, order_by='name')
for employee in employees:
is_expandable = frappe.get_all(doctype, filters=[
['reports_to', '=', employee.get('value')]
])
employee.expandable = 1 if is_expandable else 0
return employees
def on_doctype_update():
frappe.db.add_index("Employee", ["lft", "rgt"])
def has_user_permission_for_employee(user_name, employee_name):
return frappe.db.exists({
'doctype': 'User Permission',
'user': user_name,
'allow': 'Employee',
'for_value': employee_name
})
def has_upload_permission(doc, ptype='read', user=None):
if not user:
user = frappe.session.user
if get_doc_permissions(doc, user=user, ptype=ptype).get(ptype):
return True
return doc.user_id == user

View File

@@ -0,0 +1,82 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt
import frappe
import erpnext
import unittest
import frappe.utils
from erpnext.hr.doctype.employee.employee import InactiveEmployeeStatusError
test_records = frappe.get_test_records('Employee')
class TestEmployee(unittest.TestCase):
def test_employee_status_left(self):
employee1 = make_employee("test_employee_1@company.com")
employee2 = make_employee("test_employee_2@company.com")
employee1_doc = frappe.get_doc("Employee", employee1)
employee2_doc = frappe.get_doc("Employee", employee2)
employee2_doc.reload()
employee2_doc.reports_to = employee1_doc.name
employee2_doc.save()
employee1_doc.reload()
employee1_doc.status = 'Left'
self.assertRaises(InactiveEmployeeStatusError, employee1_doc.save)
def test_employee_status_inactive(self):
from erpnext.payroll.doctype.salary_structure.test_salary_structure import make_salary_structure
from erpnext.payroll.doctype.salary_structure.salary_structure import make_salary_slip
from erpnext.payroll.doctype.salary_slip.test_salary_slip import make_holiday_list
employee = make_employee("test_employee_status@company.com")
employee_doc = frappe.get_doc("Employee", employee)
employee_doc.status = "Inactive"
employee_doc.save()
employee_doc.reload()
make_holiday_list()
frappe.db.set_value("Company", erpnext.get_default_company(), "default_holiday_list", "Salary Slip Test Holiday List")
frappe.db.sql("""delete from `tabSalary Structure` where name='Test Inactive Employee Salary Slip'""")
salary_structure = make_salary_structure("Test Inactive Employee Salary Slip", "Monthly",
employee=employee_doc.name, company=employee_doc.company)
salary_slip = make_salary_slip(salary_structure.name, employee=employee_doc.name)
self.assertRaises(InactiveEmployeeStatusError, salary_slip.save)
def tearDown(self):
frappe.db.rollback()
def make_employee(user, company=None, **kwargs):
if not frappe.db.get_value("User", user):
frappe.get_doc({
"doctype": "User",
"email": user,
"first_name": user,
"new_password": "password",
"roles": [{"doctype": "Has Role", "role": "Employee"}]
}).insert()
if not frappe.db.get_value("Employee", {"user_id": user}):
employee = frappe.get_doc({
"doctype": "Employee",
"naming_series": "EMP-",
"first_name": user,
"company": company or erpnext.get_default_company(),
"user_id": user,
"date_of_birth": "1990-05-08",
"date_of_joining": "2013-01-01",
"department": frappe.get_all("Department", fields="name")[0].name,
"gender": "Female",
"company_email": user,
"prefered_contact_email": "Company Email",
"prefered_email": user,
"status": "Active",
"employment_type": "Intern"
})
if kwargs:
employee.update(kwargs)
employee.insert()
return employee.name
else:
frappe.db.set_value("Employee", {"employee_name":user}, "status", "Active")
return frappe.get_value("Employee", {"employee_name":user}, "name")

View File

@@ -0,0 +1,209 @@
{
"actions": [],
"creation": "2013-08-02 13:45:23",
"doctype": "DocType",
"document_type": "Other",
"editable_grid": 1,
"engine": "InnoDB",
"field_order": [
"employee_settings",
"retirement_age",
"emp_created_by",
"column_break_4",
"standard_working_hours",
"expense_approver_mandatory_in_expense_claim",
"reminders_section",
"send_birthday_reminders",
"column_break_9",
"send_work_anniversary_reminders",
"column_break_11",
"send_holiday_reminders",
"frequency",
"leave_settings",
"send_leave_notification",
"leave_approval_notification_template",
"leave_status_notification_template",
"role_allowed_to_create_backdated_leave_application",
"column_break_18",
"leave_approver_mandatory_in_leave_application",
"show_leaves_of_all_department_members_in_calendar",
"auto_leave_encashment",
"restrict_backdated_leave_application",
"hiring_settings",
"check_vacancies"
],
"fields": [
{
"fieldname": "employee_settings",
"fieldtype": "Section Break",
"label": "Employee Settings"
},
{
"description": "Enter retirement age in years",
"fieldname": "retirement_age",
"fieldtype": "Data",
"label": "Retirement Age"
},
{
"default": "Naming Series",
"description": "Employee records are created using the selected field",
"fieldname": "emp_created_by",
"fieldtype": "Select",
"label": "Employee Records to be created by",
"options": "Naming Series\nEmployee Number\nFull Name"
},
{
"fieldname": "column_break_4",
"fieldtype": "Column Break"
},
{
"default": "1",
"fieldname": "expense_approver_mandatory_in_expense_claim",
"fieldtype": "Check",
"label": "Expense Approver Mandatory In Expense Claim"
},
{
"collapsible": 1,
"fieldname": "leave_settings",
"fieldtype": "Section Break",
"label": "Leave Settings"
},
{
"depends_on": "eval: doc.send_leave_notification == 1",
"fieldname": "leave_approval_notification_template",
"fieldtype": "Link",
"label": "Leave Approval Notification Template",
"mandatory_depends_on": "eval: doc.send_leave_notification == 1",
"options": "Email Template"
},
{
"depends_on": "eval: doc.send_leave_notification == 1",
"fieldname": "leave_status_notification_template",
"fieldtype": "Link",
"label": "Leave Status Notification Template",
"mandatory_depends_on": "eval: doc.send_leave_notification == 1",
"options": "Email Template"
},
{
"fieldname": "column_break_18",
"fieldtype": "Column Break"
},
{
"default": "1",
"fieldname": "leave_approver_mandatory_in_leave_application",
"fieldtype": "Check",
"label": "Leave Approver Mandatory In Leave Application"
},
{
"default": "0",
"fieldname": "show_leaves_of_all_department_members_in_calendar",
"fieldtype": "Check",
"label": "Show Leaves Of All Department Members In Calendar"
},
{
"collapsible": 1,
"fieldname": "hiring_settings",
"fieldtype": "Section Break",
"label": "Hiring Settings"
},
{
"default": "0",
"fieldname": "check_vacancies",
"fieldtype": "Check",
"label": "Check Vacancies On Job Offer Creation"
},
{
"default": "0",
"fieldname": "auto_leave_encashment",
"fieldtype": "Check",
"label": "Auto Leave Encashment"
},
{
"default": "0",
"fieldname": "restrict_backdated_leave_application",
"fieldtype": "Check",
"label": "Restrict Backdated Leave Application"
},
{
"depends_on": "eval:doc.restrict_backdated_leave_application == 1",
"fieldname": "role_allowed_to_create_backdated_leave_application",
"fieldtype": "Link",
"label": "Role Allowed to Create Backdated Leave Application",
"options": "Role"
},
{
"default": "1",
"fieldname": "send_leave_notification",
"fieldtype": "Check",
"label": "Send Leave Notification"
},
{
"fieldname": "standard_working_hours",
"fieldtype": "Int",
"label": "Standard Working Hours"
},
{
"collapsible": 1,
"fieldname": "reminders_section",
"fieldtype": "Section Break",
"label": "Reminders"
},
{
"default": "1",
"fieldname": "send_holiday_reminders",
"fieldtype": "Check",
"label": "Holidays"
},
{
"default": "1",
"fieldname": "send_work_anniversary_reminders",
"fieldtype": "Check",
"label": "Work Anniversaries "
},
{
"default": "Weekly",
"depends_on": "eval:doc.send_holiday_reminders",
"fieldname": "frequency",
"fieldtype": "Select",
"label": "Set the frequency for holiday reminders",
"options": "Weekly\nMonthly"
},
{
"default": "1",
"fieldname": "send_birthday_reminders",
"fieldtype": "Check",
"label": "Birthdays"
},
{
"fieldname": "column_break_9",
"fieldtype": "Column Break"
},
{
"fieldname": "column_break_11",
"fieldtype": "Column Break"
}
],
"icon": "fa fa-cog",
"idx": 1,
"issingle": 1,
"links": [],
"modified": "2021-08-24 14:54:12.834162",
"modified_by": "Administrator",
"module": "HR",
"name": "HR Settings",
"owner": "Administrator",
"permissions": [
{
"create": 1,
"email": 1,
"print": 1,
"read": 1,
"role": "System Manager",
"share": 1,
"write": 1
}
],
"sort_field": "modified",
"sort_order": "ASC",
"track_changes": 1
}

View File

@@ -0,0 +1,79 @@
# Copyright (c) 2021, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt
# For license information, please see license.txt
import frappe
from frappe.model.document import Document
from frappe.utils import format_date
# Wether to proceed with frequency change
PROCEED_WITH_FREQUENCY_CHANGE = False
class HRSettings(Document):
def validate(self):
self.set_naming_series()
# Based on proceed flag
global PROCEED_WITH_FREQUENCY_CHANGE
if not PROCEED_WITH_FREQUENCY_CHANGE:
self.validate_frequency_change()
PROCEED_WITH_FREQUENCY_CHANGE = False
def set_naming_series(self):
from erpnext.setup.doctype.naming_series.naming_series import set_by_naming_series
set_by_naming_series("Employee", "employee_number",
self.get("emp_created_by")=="Naming Series", hide_name_field=True)
def validate_frequency_change(self):
weekly_job, monthly_job = None, None
try:
weekly_job = frappe.get_doc(
'Scheduled Job Type',
'employee_reminders.send_reminders_in_advance_weekly'
)
monthly_job = frappe.get_doc(
'Scheduled Job Type',
'employee_reminders.send_reminders_in_advance_monthly'
)
except frappe.DoesNotExistError:
return
next_weekly_trigger = weekly_job.get_next_execution()
next_monthly_trigger = monthly_job.get_next_execution()
if self.freq_changed_from_monthly_to_weekly():
if next_monthly_trigger < next_weekly_trigger:
self.show_freq_change_warning(next_monthly_trigger, next_weekly_trigger)
elif self.freq_changed_from_weekly_to_monthly():
if next_monthly_trigger > next_weekly_trigger:
self.show_freq_change_warning(next_weekly_trigger, next_monthly_trigger)
def freq_changed_from_weekly_to_monthly(self):
return self.has_value_changed("frequency") and self.frequency == "Monthly"
def freq_changed_from_monthly_to_weekly(self):
return self.has_value_changed("frequency") and self.frequency == "Weekly"
def show_freq_change_warning(self, from_date, to_date):
from_date = frappe.bold(format_date(from_date))
to_date = frappe.bold(format_date(to_date))
frappe.msgprint(
msg=frappe._('Employees will miss holiday reminders from {} until {}. <br> Do you want to proceed with this change?').format(from_date, to_date),
title='Confirm change in Frequency',
primary_action={
'label': frappe._('Yes, Proceed'),
'client_action': 'erpnext.proceed_save_with_reminders_frequency_change'
},
raise_exception=frappe.ValidationError
)
@frappe.whitelist()
def set_proceed_with_frequency_change():
'''Enables proceed with frequency change'''
global PROCEED_WITH_FREQUENCY_CHANGE
PROCEED_WITH_FREQUENCY_CHANGE = True

View File

@@ -0,0 +1,204 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt
# For license information, please see license.txt
from __future__ import unicode_literals
import frappe
from frappe.utils import cstr, add_days, date_diff, getdate
from frappe import _
from frappe.utils.csvutils import UnicodeWriter
from frappe.model.document import Document
from erpnext.hr.doctype.employee.employee import get_holiday_list_for_employee
from erpnext.hr.utils import get_holiday_dates_for_employee
class UploadAttendance(Document):
pass
@frappe.whitelist()
def get_template():
if not frappe.has_permission("Attendance", "create"):
raise frappe.PermissionError
args = frappe.local.form_dict
if getdate(args.from_date) > getdate(args.to_date):
frappe.throw(_("To Date should be greater than From Date"))
w = UnicodeWriter()
w = add_header(w)
try:
w = add_data(w, args)
except Exception as e:
frappe.clear_messages()
frappe.respond_as_web_page("Holiday List Missing", html=e)
return
# write out response as a type csv
frappe.response['result'] = cstr(w.getvalue())
frappe.response['type'] = 'csv'
frappe.response['doctype'] = "Attendance"
def add_header(w):
status = ", ".join((frappe.get_meta("Attendance").get_field("status").options or "").strip().split("\n"))
w.writerow(["Notes:"])
w.writerow(["Please do not change the template headings"])
w.writerow(["Status should be one of these values: " + status])
w.writerow(["If you are overwriting existing attendance records, 'ID' column mandatory"])
w.writerow(["ID", "Employee", "Employee Name", "Date", "Status", "Leave Type",
"Company", "Naming Series"])
return w
def add_data(w, args):
data = get_data(args)
writedata(w, data)
return w
def get_data(args):
dates = get_dates(args)
employees = get_active_employees()
holidays = get_holidays_for_employees([employee.name for employee in employees], args["from_date"], args["to_date"])
existing_attendance_records = get_existing_attendance_records(args)
data = []
for date in dates:
for employee in employees:
if getdate(date) < getdate(employee.date_of_joining):
continue
if employee.relieving_date:
if getdate(date) > getdate(employee.relieving_date):
continue
existing_attendance = {}
if existing_attendance_records \
and tuple([getdate(date), employee.name]) in existing_attendance_records \
and getdate(employee.date_of_joining) <= getdate(date) \
and getdate(employee.relieving_date) >= getdate(date):
existing_attendance = existing_attendance_records[tuple([getdate(date), employee.name])]
employee_holiday_list = get_holiday_list_for_employee(employee.name)
row = [
existing_attendance and existing_attendance.name or "",
employee.name, employee.employee_name, date,
existing_attendance and existing_attendance.status or "",
existing_attendance and existing_attendance.leave_type or "", employee.company,
existing_attendance and existing_attendance.naming_series or get_naming_series(),
]
if date in holidays[employee_holiday_list]:
row[4] = "Holiday"
data.append(row)
return data
def get_holidays_for_employees(employees, from_date, to_date):
holidays = {}
for employee in employees:
holiday_list = get_holiday_list_for_employee(employee)
holiday = get_holiday_dates_for_employee(employee, getdate(from_date), getdate(to_date))
if holiday_list not in holidays:
holidays[holiday_list] = holiday
return holidays
def writedata(w, data):
for row in data:
w.writerow(row)
def get_dates(args):
"""get list of dates in between from date and to date"""
no_of_days = date_diff(add_days(args["to_date"], 1), args["from_date"])
dates = [add_days(args["from_date"], i) for i in range(0, no_of_days)]
return dates
def get_active_employees():
employees = frappe.db.get_all('Employee',
fields=['name', 'employee_name', 'date_of_joining', 'company', 'relieving_date'],
filters={
'docstatus': ['<', 2],
'status': 'Active'
}
)
return employees
def get_existing_attendance_records(args):
attendance = frappe.db.sql("""select name, attendance_date, employee, status, leave_type, naming_series
from `tabAttendance` where attendance_date between %s and %s and docstatus < 2""",
(args["from_date"], args["to_date"]), as_dict=1)
existing_attendance = {}
for att in attendance:
existing_attendance[tuple([att.attendance_date, att.employee])] = att
return existing_attendance
def get_naming_series():
series = frappe.get_meta("Attendance").get_field("naming_series").options.strip().split("\n")
if not series:
frappe.throw(_("Please setup numbering series for Attendance via Setup > Numbering Series"))
return series[0]
@frappe.whitelist()
def upload():
if not frappe.has_permission("Attendance", "create"):
raise frappe.PermissionError
from frappe.utils.csvutils import read_csv_content
rows = read_csv_content(frappe.local.uploaded_file)
if not rows:
frappe.throw(_("Please select a csv file"))
frappe.enqueue(import_attendances, rows=rows, now=True if len(rows) < 200 else False)
def import_attendances(rows):
def remove_holidays(rows):
rows = [ row for row in rows if row[4] != "Holiday"]
return rows
from frappe.modules import scrub
rows = list(filter(lambda x: x and any(x), rows))
columns = [scrub(f) for f in rows[4]]
columns[0] = "name"
columns[3] = "attendance_date"
rows = rows[5:]
ret = []
error = False
rows = remove_holidays(rows)
from frappe.utils.csvutils import check_record, import_doc
for i, row in enumerate(rows):
if not row: continue
row_idx = i + 5
d = frappe._dict(zip(columns, row))
d["doctype"] = "Attendance"
if d.name:
d["docstatus"] = frappe.db.get_value("Attendance", d.name, "docstatus")
try:
check_record(d)
ret.append(import_doc(d, "Attendance", 1, row_idx, submit=True))
frappe.publish_realtime('import_attendance', dict(
progress=i,
total=len(rows)
))
except AttributeError:
pass
except Exception as e:
error = True
ret.append('Error for row (#%d) %s : %s' % (row_idx,
len(row)>1 and row[1] or "", cstr(e)))
frappe.errprint(frappe.get_traceback())
if error:
frappe.db.rollback()
else:
frappe.db.commit()
frappe.publish_realtime('import_attendance', dict(
messages=ret,
error=error
))

552
erpnext/hr/utils.py Normal file
View File

@@ -0,0 +1,552 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt
import erpnext
import frappe
from erpnext.hr.doctype.employee.employee import get_holiday_list_for_employee, InactiveEmployeeStatusError
from frappe import _
from frappe.desk.form import assign_to
from frappe.model.document import Document
from frappe.utils import (add_days, cstr, flt, format_datetime, formatdate,
get_datetime, getdate, nowdate, today, unique, get_link_to_form)
class DuplicateDeclarationError(frappe.ValidationError): pass
class EmployeeBoardingController(Document):
'''
Create the project and the task for the boarding process
Assign to the concerned person and roles as per the onboarding/separation template
'''
def validate(self):
validate_active_employee(self.employee)
# remove the task if linked before submitting the form
if self.amended_from:
for activity in self.activities:
activity.task = ''
def on_submit(self):
# create the project for the given employee onboarding
project_name = _(self.doctype) + " : "
if self.doctype == "Employee Onboarding":
project_name += self.job_applicant
else:
project_name += self.employee
project = frappe.get_doc({
"doctype": "Project",
"project_name": project_name,
"expected_start_date": self.date_of_joining if self.doctype == "Employee Onboarding" else self.resignation_letter_date,
"department": self.department,
"company": self.company
}).insert(ignore_permissions=True, ignore_mandatory=True)
self.db_set("project", project.name)
self.db_set("boarding_status", "Pending")
self.reload()
self.create_task_and_notify_user()
def create_task_and_notify_user(self):
# create the task for the given project and assign to the concerned person
for activity in self.activities:
if activity.task:
continue
task = frappe.get_doc({
"doctype": "Task",
"project": self.project,
"subject": activity.activity_name + " : " + self.employee_name,
"description": activity.description,
"department": self.department,
"company": self.company,
"task_weight": activity.task_weight
}).insert(ignore_permissions=True)
activity.db_set("task", task.name)
users = [activity.user] if activity.user else []
if activity.role:
user_list = frappe.db.sql_list('''
SELECT
DISTINCT(has_role.parent)
FROM
`tabHas Role` has_role
LEFT JOIN `tabUser` user
ON has_role.parent = user.name
WHERE
has_role.parenttype = 'User'
AND user.enabled = 1
AND has_role.role = %s
''', activity.role)
users = unique(users + user_list)
if "Administrator" in users:
users.remove("Administrator")
# assign the task the users
if users:
self.assign_task_to_users(task, users)
def assign_task_to_users(self, task, users):
for user in users:
args = {
'assign_to': [user],
'doctype': task.doctype,
'name': task.name,
'description': task.description or task.subject,
'notify': self.notify_users_by_email
}
assign_to.add(args)
def on_cancel(self):
# delete task project
for task in frappe.get_all("Task", filters={"project": self.project}):
frappe.delete_doc("Task", task.name, force=1)
frappe.delete_doc("Project", self.project, force=1)
self.db_set('project', '')
for activity in self.activities:
activity.db_set("task", "")
@frappe.whitelist()
def get_onboarding_details(parent, parenttype):
return frappe.get_all("Employee Boarding Activity",
fields=["activity_name", "role", "user", "required_for_employee_creation", "description", "task_weight"],
filters={"parent": parent, "parenttype": parenttype},
order_by= "idx")
@frappe.whitelist()
def get_boarding_status(project):
status = 'Pending'
if project:
doc = frappe.get_doc('Project', project)
if flt(doc.percent_complete) > 0.0 and flt(doc.percent_complete) < 100.0:
status = 'In Process'
elif flt(doc.percent_complete) == 100.0:
status = 'Completed'
return status
def set_employee_name(doc):
if doc.employee and not doc.employee_name:
doc.employee_name = frappe.db.get_value("Employee", doc.employee, "employee_name")
def update_employee(employee, details, date=None, cancel=False):
internal_work_history = {}
for item in details:
fieldtype = frappe.get_meta("Employee").get_field(item.fieldname).fieldtype
new_data = item.new if not cancel else item.current
if fieldtype == "Date" and new_data:
new_data = getdate(new_data)
elif fieldtype =="Datetime" and new_data:
new_data = get_datetime(new_data)
setattr(employee, item.fieldname, new_data)
if item.fieldname in ["department", "designation", "branch"]:
internal_work_history[item.fieldname] = item.new
if internal_work_history and not cancel:
internal_work_history["from_date"] = date
employee.append("internal_work_history", internal_work_history)
return employee
@frappe.whitelist()
def get_employee_fields_label():
fields = []
for df in frappe.get_meta("Employee").get("fields"):
if df.fieldname in ["salutation", "user_id", "employee_number", "employment_type",
"holiday_list", "branch", "department", "designation", "grade",
"notice_number_of_days", "reports_to", "leave_policy", "company_email"]:
fields.append({"value": df.fieldname, "label": df.label})
return fields
@frappe.whitelist()
def get_employee_field_property(employee, fieldname):
if employee and fieldname:
field = frappe.get_meta("Employee").get_field(fieldname)
value = frappe.db.get_value("Employee", employee, fieldname)
options = field.options
if field.fieldtype == "Date":
value = formatdate(value)
elif field.fieldtype == "Datetime":
value = format_datetime(value)
return {
"value" : value,
"datatype" : field.fieldtype,
"label" : field.label,
"options" : options
}
else:
return False
def validate_dates(doc, from_date, to_date):
date_of_joining, relieving_date = frappe.db.get_value("Employee", doc.employee, ["date_of_joining", "relieving_date"])
if getdate(from_date) > getdate(to_date):
frappe.throw(_("To date can not be less than from date"))
elif getdate(from_date) > getdate(nowdate()):
frappe.throw(_("Future dates not allowed"))
elif date_of_joining and getdate(from_date) < getdate(date_of_joining):
frappe.throw(_("From date can not be less than employee's joining date"))
elif relieving_date and getdate(to_date) > getdate(relieving_date):
frappe.throw(_("To date can not greater than employee's relieving date"))
def validate_overlap(doc, from_date, to_date, company = None):
query = """
select name
from `tab{0}`
where name != %(name)s
"""
query += get_doc_condition(doc.doctype)
if not doc.name:
# hack! if name is null, it could cause problems with !=
doc.name = "New "+doc.doctype
overlap_doc = frappe.db.sql(query.format(doc.doctype),{
"employee": doc.get("employee"),
"from_date": from_date,
"to_date": to_date,
"name": doc.name,
"company": company
}, as_dict = 1)
if overlap_doc:
if doc.get("employee"):
exists_for = doc.employee
if company:
exists_for = company
throw_overlap_error(doc, exists_for, overlap_doc[0].name, from_date, to_date)
def get_doc_condition(doctype):
if doctype == "Compensatory Leave Request":
return "and employee = %(employee)s and docstatus < 2 \
and (work_from_date between %(from_date)s and %(to_date)s \
or work_end_date between %(from_date)s and %(to_date)s \
or (work_from_date < %(from_date)s and work_end_date > %(to_date)s))"
elif doctype == "Leave Period":
return "and company = %(company)s and (from_date between %(from_date)s and %(to_date)s \
or to_date between %(from_date)s and %(to_date)s \
or (from_date < %(from_date)s and to_date > %(to_date)s))"
def throw_overlap_error(doc, exists_for, overlap_doc, from_date, to_date):
msg = _("A {0} exists between {1} and {2} (").format(doc.doctype,
formatdate(from_date), formatdate(to_date)) \
+ """ <b><a href="/app/Form/{0}/{1}">{1}</a></b>""".format(doc.doctype, overlap_doc) \
+ _(") for {0}").format(exists_for)
frappe.throw(msg)
def validate_duplicate_exemption_for_payroll_period(doctype, docname, payroll_period, employee):
existing_record = frappe.db.exists(doctype, {
"payroll_period": payroll_period,
"employee": employee,
'docstatus': ['<', 2],
'name': ['!=', docname]
})
if existing_record:
frappe.throw(_("{0} already exists for employee {1} and period {2}")
.format(doctype, employee, payroll_period), DuplicateDeclarationError)
def validate_tax_declaration(declarations):
subcategories = []
for d in declarations:
if d.exemption_sub_category in subcategories:
frappe.throw(_("More than one selection for {0} not allowed").format(d.exemption_sub_category))
subcategories.append(d.exemption_sub_category)
def get_total_exemption_amount(declarations):
exemptions = frappe._dict()
for d in declarations:
exemptions.setdefault(d.exemption_category, frappe._dict())
category_max_amount = exemptions.get(d.exemption_category).max_amount
if not category_max_amount:
category_max_amount = frappe.db.get_value("Employee Tax Exemption Category", d.exemption_category, "max_amount")
exemptions.get(d.exemption_category).max_amount = category_max_amount
sub_category_exemption_amount = d.max_amount \
if (d.max_amount and flt(d.amount) > flt(d.max_amount)) else d.amount
exemptions.get(d.exemption_category).setdefault("total_exemption_amount", 0.0)
exemptions.get(d.exemption_category).total_exemption_amount += flt(sub_category_exemption_amount)
if category_max_amount and exemptions.get(d.exemption_category).total_exemption_amount > category_max_amount:
exemptions.get(d.exemption_category).total_exemption_amount = category_max_amount
total_exemption_amount = sum([flt(d.total_exemption_amount) for d in exemptions.values()])
return total_exemption_amount
@frappe.whitelist()
def get_leave_period(from_date, to_date, company):
leave_period = frappe.db.sql("""
select name, from_date, to_date
from `tabLeave Period`
where company=%(company)s and is_active=1
and (from_date between %(from_date)s and %(to_date)s
or to_date between %(from_date)s and %(to_date)s
or (from_date < %(from_date)s and to_date > %(to_date)s))
""", {
"from_date": from_date,
"to_date": to_date,
"company": company
}, as_dict=1)
if leave_period:
return leave_period
def generate_leave_encashment():
''' Generates a draft leave encashment on allocation expiry '''
from erpnext.hr.doctype.leave_encashment.leave_encashment import create_leave_encashment
if frappe.db.get_single_value('HR Settings', 'auto_leave_encashment'):
leave_type = frappe.get_all('Leave Type', filters={'allow_encashment': 1}, fields=['name'])
leave_type=[l['name'] for l in leave_type]
leave_allocation = frappe.get_all("Leave Allocation", filters={
'to_date': add_days(today(), -1),
'leave_type': ('in', leave_type)
}, fields=['employee', 'leave_period', 'leave_type', 'to_date', 'total_leaves_allocated', 'new_leaves_allocated'])
create_leave_encashment(leave_allocation=leave_allocation)
def allocate_earned_leaves():
'''Allocate earned leaves to Employees'''
e_leave_types = get_earned_leaves()
today = getdate()
for e_leave_type in e_leave_types:
leave_allocations = get_leave_allocations(today, e_leave_type.name)
for allocation in leave_allocations:
if not allocation.leave_policy_assignment and not allocation.leave_policy:
continue
leave_policy = allocation.leave_policy if allocation.leave_policy else frappe.db.get_value(
"Leave Policy Assignment", allocation.leave_policy_assignment, ["leave_policy"])
annual_allocation = frappe.db.get_value("Leave Policy Detail", filters={
'parent': leave_policy,
'leave_type': e_leave_type.name
}, fieldname=['annual_allocation'])
from_date=allocation.from_date
if e_leave_type.based_on_date_of_joining_date:
from_date = frappe.db.get_value("Employee", allocation.employee, "date_of_joining")
if check_effective_date(from_date, today, e_leave_type.earned_leave_frequency, e_leave_type.based_on_date_of_joining_date):
update_previous_leave_allocation(allocation, annual_allocation, e_leave_type)
def update_previous_leave_allocation(allocation, annual_allocation, e_leave_type):
earned_leaves = get_monthly_earned_leave(annual_allocation, e_leave_type.earned_leave_frequency, e_leave_type.rounding)
allocation = frappe.get_doc('Leave Allocation', allocation.name)
new_allocation = flt(allocation.total_leaves_allocated) + flt(earned_leaves)
if new_allocation > e_leave_type.max_leaves_allowed and e_leave_type.max_leaves_allowed > 0:
new_allocation = e_leave_type.max_leaves_allowed
if new_allocation != allocation.total_leaves_allocated:
allocation.db_set("total_leaves_allocated", new_allocation, update_modified=False)
today_date = today()
create_additional_leave_ledger_entry(allocation, earned_leaves, today_date)
def get_monthly_earned_leave(annual_leaves, frequency, rounding):
earned_leaves = 0.0
divide_by_frequency = {"Yearly": 1, "Half-Yearly": 6, "Quarterly": 4, "Monthly": 12}
if annual_leaves:
earned_leaves = flt(annual_leaves) / divide_by_frequency[frequency]
if rounding:
if rounding == "0.25":
earned_leaves = round(earned_leaves * 4) / 4
elif rounding == "0.5":
earned_leaves = round(earned_leaves * 2) / 2
else:
earned_leaves = round(earned_leaves)
return earned_leaves
def get_leave_allocations(date, leave_type):
return frappe.db.sql("""select name, employee, from_date, to_date, leave_policy_assignment, leave_policy
from `tabLeave Allocation`
where
%s between from_date and to_date and docstatus=1
and leave_type=%s""",
(date, leave_type), as_dict=1)
def get_earned_leaves():
return frappe.get_all("Leave Type",
fields=["name", "max_leaves_allowed", "earned_leave_frequency", "rounding", "based_on_date_of_joining"],
filters={'is_earned_leave' : 1})
def create_additional_leave_ledger_entry(allocation, leaves, date):
''' Create leave ledger entry for leave types '''
allocation.new_leaves_allocated = leaves
allocation.from_date = date
allocation.unused_leaves = 0
allocation.create_leave_ledger_entry()
def check_effective_date(from_date, to_date, frequency, based_on_date_of_joining_date):
import calendar
from dateutil import relativedelta
from_date = get_datetime(from_date)
to_date = get_datetime(to_date)
rd = relativedelta.relativedelta(to_date, from_date)
#last day of month
last_day = calendar.monthrange(to_date.year, to_date.month)[1]
if (from_date.day == to_date.day and based_on_date_of_joining_date) or (not based_on_date_of_joining_date and to_date.day == last_day):
if frequency == "Monthly":
return True
elif frequency == "Quarterly" and rd.months % 3:
return True
elif frequency == "Half-Yearly" and rd.months % 6:
return True
elif frequency == "Yearly" and rd.months % 12:
return True
if frappe.flags.in_test:
return True
return False
def get_salary_assignment(employee, date):
assignment = frappe.db.sql("""
select * from `tabSalary Structure Assignment`
where employee=%(employee)s
and docstatus = 1
and %(on_date)s >= from_date order by from_date desc limit 1""", {
'employee': employee,
'on_date': date,
}, as_dict=1)
return assignment[0] if assignment else None
def get_sal_slip_total_benefit_given(employee, payroll_period, component=False):
total_given_benefit_amount = 0
query = """
select sum(sd.amount) as 'total_amount'
from `tabSalary Slip` ss, `tabSalary Detail` sd
where ss.employee=%(employee)s
and ss.docstatus = 1 and ss.name = sd.parent
and sd.is_flexible_benefit = 1 and sd.parentfield = "earnings"
and sd.parenttype = "Salary Slip"
and (ss.start_date between %(start_date)s and %(end_date)s
or ss.end_date between %(start_date)s and %(end_date)s
or (ss.start_date < %(start_date)s and ss.end_date > %(end_date)s))
"""
if component:
query += "and sd.salary_component = %(component)s"
sum_of_given_benefit = frappe.db.sql(query, {
'employee': employee,
'start_date': payroll_period.start_date,
'end_date': payroll_period.end_date,
'component': component
}, as_dict=True)
if sum_of_given_benefit and flt(sum_of_given_benefit[0].total_amount) > 0:
total_given_benefit_amount = sum_of_given_benefit[0].total_amount
return total_given_benefit_amount
def get_holiday_dates_for_employee(employee, start_date, end_date):
"""return a list of holiday dates for the given employee between start_date and end_date"""
# return only date
holidays = get_holidays_for_employee(employee, start_date, end_date)
return [cstr(h.holiday_date) for h in holidays]
def get_holidays_for_employee(employee, start_date, end_date, raise_exception=True, only_non_weekly=False):
"""Get Holidays for a given employee
`employee` (str)
`start_date` (str or datetime)
`end_date` (str or datetime)
`raise_exception` (bool)
`only_non_weekly` (bool)
return: list of dicts with `holiday_date` and `description`
"""
holiday_list = get_holiday_list_for_employee(employee, raise_exception=raise_exception)
if not holiday_list:
return []
filters = {
'parent': holiday_list,
'holiday_date': ('between', [start_date, end_date])
}
if only_non_weekly:
filters['weekly_off'] = False
holidays = frappe.get_all(
'Holiday',
fields=['description', 'holiday_date'],
filters=filters
)
return holidays
@erpnext.allow_regional
def calculate_annual_eligible_hra_exemption(doc):
# Don't delete this method, used for localization
# Indian HRA Exemption Calculation
return {}
@erpnext.allow_regional
def calculate_hra_exemption_for_period(doc):
# Don't delete this method, used for localization
# Indian HRA Exemption Calculation
return {}
def get_previous_claimed_amount(employee, payroll_period, non_pro_rata=False, component=False):
total_claimed_amount = 0
query = """
select sum(claimed_amount) as 'total_amount'
from `tabEmployee Benefit Claim`
where employee=%(employee)s
and docstatus = 1
and (claim_date between %(start_date)s and %(end_date)s)
"""
if non_pro_rata:
query += "and pay_against_benefit_claim = 1"
if component:
query += "and earning_component = %(component)s"
sum_of_claimed_amount = frappe.db.sql(query, {
'employee': employee,
'start_date': payroll_period.start_date,
'end_date': payroll_period.end_date,
'component': component
}, as_dict=True)
if sum_of_claimed_amount and flt(sum_of_claimed_amount[0].total_amount) > 0:
total_claimed_amount = sum_of_claimed_amount[0].total_amount
return total_claimed_amount
def share_doc_with_approver(doc, user):
# if approver does not have permissions, share
if not frappe.has_permission(doc=doc, ptype="submit", user=user):
frappe.share.add(doc.doctype, doc.name, user, submit=1,
flags={"ignore_share_permission": True})
frappe.msgprint(_("Shared with the user {0} with {1} access").format(
user, frappe.bold("submit"), alert=True))
# remove shared doc if approver changes
doc_before_save = doc.get_doc_before_save()
if doc_before_save:
approvers = {
"Leave Application": "leave_approver",
"Expense Claim": "expense_approver",
"Shift Request": "approver"
}
approver = approvers.get(doc.doctype)
if doc_before_save.get(approver) != doc.get(approver):
frappe.share.remove(doc.doctype, doc.name, doc_before_save.get(approver))
def validate_active_employee(employee):
if frappe.db.get_value("Employee", employee, "status") == "Inactive":
frappe.throw(_("Transactions cannot be created for an Inactive Employee {0}.").format(
get_link_to_form("Employee", employee)), InactiveEmployeeStatusError)

View File

@@ -199,6 +199,11 @@ erpnext.patches.v12_0.add_document_type_field_for_italy_einvoicing
erpnext.patches.v13_0.update_shipment_status
erpnext.patches.v13_0.remove_attribute_field_from_item_variant_setting
erpnext.patches.v13_0.set_pos_closing_as_failed
<<<<<<< HEAD
=======
erpnext.patches.v13_0.rename_stop_to_send_birthday_reminders
execute:frappe.rename_doc("Workspace", "Loan Management", "Loans", force=True)
>>>>>>> 24b2a31581 (feat: Employee reminders (#25735))
erpnext.patches.v13_0.update_timesheet_changes
erpnext.patches.v13_0.add_doctype_to_sla #14-06-2021
erpnext.patches.v13_0.bill_for_rejected_quantity_in_purchase_invoice

View File

@@ -0,0 +1,23 @@
import frappe
from frappe.model.utils.rename_field import rename_field
def execute():
frappe.reload_doc('hr', 'doctype', 'hr_settings')
try:
# Rename the field
rename_field('HR Settings', 'stop_birthday_reminders', 'send_birthday_reminders')
# Reverse the value
old_value = frappe.db.get_single_value('HR Settings', 'send_birthday_reminders')
frappe.db.set_value(
'HR Settings',
'HR Settings',
'send_birthday_reminders',
1 if old_value == 0 else 0
)
except Exception as e:
if e.args[0] != 1054:
raise

View File

@@ -0,0 +1,256 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2018, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
from __future__ import unicode_literals
import frappe
from frappe import _
from frappe.utils import date_diff, getdate, rounded, add_days, cstr, cint, flt
from frappe.model.document import Document
from erpnext.payroll.doctype.payroll_period.payroll_period import get_payroll_period_days, get_period_factor
from erpnext.payroll.doctype.salary_structure_assignment.salary_structure_assignment import get_assigned_salary_structure
from erpnext.hr.utils import get_sal_slip_total_benefit_given, get_holiday_dates_for_employee, get_previous_claimed_amount, validate_active_employee
class EmployeeBenefitApplication(Document):
def validate(self):
validate_active_employee(self.employee)
self.validate_duplicate_on_payroll_period()
if not self.max_benefits:
self.max_benefits = get_max_benefits_remaining(self.employee, self.date, self.payroll_period)
if self.max_benefits and self.max_benefits > 0:
self.validate_max_benefit_for_component()
self.validate_prev_benefit_claim()
if self.remaining_benefit > 0:
self.validate_remaining_benefit_amount()
else:
frappe.throw(_("As per your assigned Salary Structure you cannot apply for benefits").format(self.employee))
def validate_prev_benefit_claim(self):
if self.employee_benefits:
for benefit in self.employee_benefits:
if benefit.pay_against_benefit_claim == 1:
payroll_period = frappe.get_doc("Payroll Period", self.payroll_period)
benefit_claimed = get_previous_claimed_amount(self.employee, payroll_period, component = benefit.earning_component)
benefit_given = get_sal_slip_total_benefit_given(self.employee, payroll_period, component = benefit.earning_component)
benefit_claim_remining = benefit_claimed - benefit_given
if benefit_claimed > 0 and benefit_claim_remining > benefit.amount:
frappe.throw(_("An amount of {0} already claimed for the component {1}, set the amount equal or greater than {2}").format(
benefit_claimed, benefit.earning_component, benefit_claim_remining))
def validate_remaining_benefit_amount(self):
# check salary structure earnings have flexi component (sum of max_benefit_amount)
# without pro-rata which satisfy the remaining_benefit
# else pro-rata component for the amount
# again comes the same validation and satisfy or throw
benefit_components = []
if self.employee_benefits:
for employee_benefit in self.employee_benefits:
benefit_components.append(employee_benefit.earning_component)
salary_struct_name = get_assigned_salary_structure(self.employee, self.date)
if salary_struct_name:
non_pro_rata_amount = 0
pro_rata_amount = 0
salary_structure = frappe.get_doc("Salary Structure", salary_struct_name)
if salary_structure.earnings:
for earnings in salary_structure.earnings:
if earnings.is_flexible_benefit == 1 and earnings.salary_component not in benefit_components:
pay_against_benefit_claim, max_benefit_amount = frappe.db.get_value("Salary Component", earnings.salary_component, ["pay_against_benefit_claim", "max_benefit_amount"])
if pay_against_benefit_claim != 1:
pro_rata_amount += max_benefit_amount
else:
non_pro_rata_amount += max_benefit_amount
if pro_rata_amount == 0 and non_pro_rata_amount == 0:
frappe.throw(_("Please add the remaining benefits {0} to any of the existing component").format(self.remaining_benefit))
elif non_pro_rata_amount > 0 and non_pro_rata_amount < rounded(self.remaining_benefit):
frappe.throw(_("You can claim only an amount of {0}, the rest amount {1} should be in the application as pro-rata component").format(
non_pro_rata_amount, self.remaining_benefit - non_pro_rata_amount))
elif non_pro_rata_amount == 0:
frappe.throw(_("Please add the remaining benefits {0} to the application as pro-rata component").format(
self.remaining_benefit))
def validate_max_benefit_for_component(self):
if self.employee_benefits:
max_benefit_amount = 0
for employee_benefit in self.employee_benefits:
self.validate_max_benefit(employee_benefit.earning_component)
max_benefit_amount += employee_benefit.amount
if max_benefit_amount > self.max_benefits:
frappe.throw(_("Maximum benefit amount of employee {0} exceeds {1}").format(self.employee, self.max_benefits))
def validate_max_benefit(self, earning_component_name):
max_benefit_amount = frappe.db.get_value("Salary Component", earning_component_name, "max_benefit_amount")
benefit_amount = 0
for employee_benefit in self.employee_benefits:
if employee_benefit.earning_component == earning_component_name:
benefit_amount += employee_benefit.amount
prev_sal_slip_flexi_amount = get_sal_slip_total_benefit_given(self.employee, frappe.get_doc("Payroll Period", self.payroll_period), earning_component_name)
benefit_amount += prev_sal_slip_flexi_amount
if rounded(benefit_amount, 2) > max_benefit_amount:
frappe.throw(_("Maximum benefit amount of component {0} exceeds {1}").format(earning_component_name, max_benefit_amount))
def validate_duplicate_on_payroll_period(self):
application = frappe.db.exists(
"Employee Benefit Application",
{
'employee': self.employee,
'payroll_period': self.payroll_period,
'docstatus': 1
}
)
if application:
frappe.throw(_("Employee {0} already submited an apllication {1} for the payroll period {2}").format(self.employee, application, self.payroll_period))
@frappe.whitelist()
def get_max_benefits(employee, on_date):
sal_struct = get_assigned_salary_structure(employee, on_date)
if sal_struct:
max_benefits = frappe.db.get_value("Salary Structure", sal_struct, "max_benefits")
if max_benefits > 0:
return max_benefits
return False
@frappe.whitelist()
def get_max_benefits_remaining(employee, on_date, payroll_period):
max_benefits = get_max_benefits(employee, on_date)
if max_benefits and max_benefits > 0:
have_depends_on_payment_days = False
per_day_amount_total = 0
payroll_period_days = get_payroll_period_days(on_date, on_date, employee)[1]
payroll_period_obj = frappe.get_doc("Payroll Period", payroll_period)
# Get all salary slip flexi amount in the payroll period
prev_sal_slip_flexi_total = get_sal_slip_total_benefit_given(employee, payroll_period_obj)
if prev_sal_slip_flexi_total > 0:
# Check salary structure hold depends_on_payment_days component
# If yes then find the amount per day of each component and find the sum
sal_struct_name = get_assigned_salary_structure(employee, on_date)
if sal_struct_name:
sal_struct = frappe.get_doc("Salary Structure", sal_struct_name)
for sal_struct_row in sal_struct.get("earnings"):
salary_component = frappe.get_doc("Salary Component", sal_struct_row.salary_component)
if salary_component.depends_on_payment_days == 1 and salary_component.pay_against_benefit_claim != 1:
have_depends_on_payment_days = True
benefit_amount = get_benefit_amount_based_on_pro_rata(sal_struct, salary_component.max_benefit_amount)
amount_per_day = benefit_amount / payroll_period_days
per_day_amount_total += amount_per_day
# Then the sum multiply with the no of lwp in that period
# Include that amount to the prev_sal_slip_flexi_total to get the actual
if have_depends_on_payment_days and per_day_amount_total > 0:
holidays = get_holiday_dates_for_employee(employee, payroll_period_obj.start_date, on_date)
working_days = date_diff(on_date, payroll_period_obj.start_date) + 1
leave_days = calculate_lwp(employee, payroll_period_obj.start_date, holidays, working_days)
leave_days_amount = leave_days * per_day_amount_total
prev_sal_slip_flexi_total += leave_days_amount
return max_benefits - prev_sal_slip_flexi_total
return max_benefits
def calculate_lwp(employee, start_date, holidays, working_days):
lwp = 0
holidays = "','".join(holidays)
for d in range(working_days):
dt = add_days(cstr(getdate(start_date)), d)
leave = frappe.db.sql("""
select t1.name, t1.half_day
from `tabLeave Application` t1, `tabLeave Type` t2
where t2.name = t1.leave_type
and t2.is_lwp = 1
and t1.docstatus = 1
and t1.employee = %(employee)s
and CASE WHEN t2.include_holiday != 1 THEN %(dt)s not in ('{0}') and %(dt)s between from_date and to_date
WHEN t2.include_holiday THEN %(dt)s between from_date and to_date
END
""".format(holidays), {"employee": employee, "dt": dt})
if leave:
lwp = cint(leave[0][1]) and (lwp + 0.5) or (lwp + 1)
return lwp
def get_benefit_component_amount(employee, start_date, end_date, salary_component, sal_struct, payroll_frequency, payroll_period):
if not payroll_period:
frappe.msgprint(_("Start and end dates not in a valid Payroll Period, cannot calculate {0}")
.format(salary_component))
return False
# Considering there is only one application for a year
benefit_application = frappe.db.sql("""
select name
from `tabEmployee Benefit Application`
where
payroll_period=%(payroll_period)s
and employee=%(employee)s
and docstatus = 1
""", {
'employee': employee,
'payroll_period': payroll_period.name
})
current_benefit_amount = 0.0
component_max_benefit, depends_on_payment_days = frappe.db.get_value("Salary Component",
salary_component, ["max_benefit_amount", "depends_on_payment_days"])
benefit_amount = 0
if benefit_application:
benefit_amount = frappe.db.get_value("Employee Benefit Application Detail",
{"parent": benefit_application[0][0], "earning_component": salary_component}, "amount")
elif component_max_benefit:
benefit_amount = get_benefit_amount_based_on_pro_rata(sal_struct, component_max_benefit)
current_benefit_amount = 0
if benefit_amount:
total_sub_periods = get_period_factor(employee,
start_date, end_date, payroll_frequency, payroll_period, depends_on_payment_days)[0]
current_benefit_amount = benefit_amount / total_sub_periods
return current_benefit_amount
def get_benefit_amount_based_on_pro_rata(sal_struct, component_max_benefit):
max_benefits_total = 0
benefit_amount = 0
for d in sal_struct.get("earnings"):
if d.is_flexible_benefit == 1:
component = frappe.db.get_value("Salary Component", d.salary_component, ["max_benefit_amount", "pay_against_benefit_claim"], as_dict=1)
if not component.pay_against_benefit_claim:
max_benefits_total += component.max_benefit_amount
if max_benefits_total > 0:
benefit_amount = sal_struct.max_benefits * component.max_benefit_amount / max_benefits_total
if benefit_amount > component_max_benefit:
benefit_amount = component_max_benefit
return benefit_amount
@frappe.whitelist()
@frappe.validate_and_sanitize_search_inputs
def get_earning_components(doctype, txt, searchfield, start, page_len, filters):
if len(filters) < 2:
return {}
salary_structure = get_assigned_salary_structure(filters['employee'], filters['date'])
if salary_structure:
return frappe.db.sql("""
select salary_component
from `tabSalary Detail`
where parent = %s and is_flexible_benefit = 1
order by name
""", salary_structure)
else:
frappe.throw(_("Salary Structure not found for employee {0} and date {1}")
.format(filters['employee'], filters['date']))
@frappe.whitelist()
def get_earning_components_max_benefits(employee, date, earning_component):
salary_structure = get_assigned_salary_structure(employee, date)
amount = frappe.db.sql("""
select amount
from `tabSalary Detail`
where parent = %s and is_flexible_benefit = 1
and salary_component = %s
order by name
""", salary_structure, earning_component)
return amount if amount else 0

View File

@@ -0,0 +1,108 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2018, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
from __future__ import unicode_literals
import frappe
from frappe import _
from frappe.utils import date_diff, getdate, formatdate, cint, month_diff, flt, add_months
from frappe.model.document import Document
from erpnext.hr.utils import get_holiday_dates_for_employee
class PayrollPeriod(Document):
def validate(self):
self.validate_dates()
self.validate_overlap()
def validate_dates(self):
if getdate(self.start_date) > getdate(self.end_date):
frappe.throw(_("End date can not be less than start date"))
def validate_overlap(self):
query = """
select name
from `tab{0}`
where name != %(name)s
and company = %(company)s and (start_date between %(start_date)s and %(end_date)s \
or end_date between %(start_date)s and %(end_date)s \
or (start_date < %(start_date)s and end_date > %(end_date)s))
"""
if not self.name:
# hack! if name is null, it could cause problems with !=
self.name = "New "+self.doctype
overlap_doc = frappe.db.sql(query.format(self.doctype),{
"start_date": self.start_date,
"end_date": self.end_date,
"name": self.name,
"company": self.company
}, as_dict = 1)
if overlap_doc:
msg = _("A {0} exists between {1} and {2} (").format(self.doctype,
formatdate(self.start_date), formatdate(self.end_date)) \
+ """ <b><a href="/app/Form/{0}/{1}">{1}</a></b>""".format(self.doctype, overlap_doc[0].name) \
+ _(") for {0}").format(self.company)
frappe.throw(msg)
def get_payroll_period_days(start_date, end_date, employee, company=None):
if not company:
company = frappe.db.get_value("Employee", employee, "company")
payroll_period = frappe.db.sql("""
select name, start_date, end_date
from `tabPayroll Period`
where
company=%(company)s
and %(start_date)s between start_date and end_date
and %(end_date)s between start_date and end_date
""", {
'company': company,
'start_date': start_date,
'end_date': end_date
})
if len(payroll_period) > 0:
actual_no_of_days = date_diff(getdate(payroll_period[0][2]), getdate(payroll_period[0][1])) + 1
working_days = actual_no_of_days
if not cint(frappe.db.get_value("Payroll Settings", None, "include_holidays_in_total_working_days")):
holidays = get_holiday_dates_for_employee(employee, getdate(payroll_period[0][1]), getdate(payroll_period[0][2]))
working_days -= len(holidays)
return payroll_period[0][0], working_days, actual_no_of_days
return False, False, False
def get_payroll_period(from_date, to_date, company):
payroll_period = frappe.db.sql("""
select name, start_date, end_date
from `tabPayroll Period`
where start_date<=%s and end_date>= %s and company=%s
""", (from_date, to_date, company), as_dict=1)
return payroll_period[0] if payroll_period else None
def get_period_factor(employee, start_date, end_date, payroll_frequency, payroll_period, depends_on_payment_days=0):
# TODO if both deduct checked update the factor to make tax consistent
period_start, period_end = payroll_period.start_date, payroll_period.end_date
joining_date, relieving_date = frappe.db.get_value("Employee", employee, ["date_of_joining", "relieving_date"])
if getdate(joining_date) > getdate(period_start):
period_start = joining_date
if relieving_date and getdate(relieving_date) < getdate(period_end):
period_end = relieving_date
if month_diff(period_end, start_date) > 1:
start_date = add_months(start_date, - (month_diff(period_end, start_date)+1))
total_sub_periods, remaining_sub_periods = 0.0, 0.0
if payroll_frequency == "Monthly" and not depends_on_payment_days:
total_sub_periods = month_diff(payroll_period.end_date, payroll_period.start_date)
remaining_sub_periods = month_diff(period_end, start_date)
else:
salary_days = date_diff(end_date, start_date) + 1
days_in_payroll_period = date_diff(payroll_period.end_date, payroll_period.start_date) + 1
total_sub_periods = flt(days_in_payroll_period) / flt(salary_days)
remaining_days_in_payroll_period = date_diff(period_end, start_date) + 1
remaining_sub_periods = flt(remaining_days_in_payroll_period) / flt(salary_days)
return total_sub_periods, remaining_sub_periods

File diff suppressed because it is too large Load Diff

View File

@@ -74,9 +74,22 @@ $.extend(erpnext, {
});
},
<<<<<<< HEAD
route_to_pending_reposts: (args) => {
frappe.set_route("List", "Repost Item Valuation", args);
},
=======
proceed_save_with_reminders_frequency_change: () => {
frappe.ui.hide_open_dialog();
frappe.call({
method: 'erpnext.hr.doctype.hr_settings.hr_settings.set_proceed_with_frequency_change',
callback: () => {
cur_frm.save();
}
});
}
>>>>>>> 24b2a31581 (feat: Employee reminders (#25735))
});
$.extend(erpnext.utils, {

View File

@@ -0,0 +1,247 @@
# Copyright (c) 2021, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt
import frappe
from frappe import _
from frappe.utils import comma_sep, getdate, today, add_months, add_days
from erpnext.hr.doctype.employee.employee import get_all_employee_emails, get_employee_email
from erpnext.hr.utils import get_holidays_for_employee
# -----------------
# HOLIDAY REMINDERS
# -----------------
def send_reminders_in_advance_weekly():
to_send_in_advance = int(frappe.db.get_single_value("HR Settings", "send_holiday_reminders") or 1)
frequency = frappe.db.get_single_value("HR Settings", "frequency")
if not (to_send_in_advance and frequency == "Weekly"):
return
send_advance_holiday_reminders("Weekly")
def send_reminders_in_advance_monthly():
to_send_in_advance = int(frappe.db.get_single_value("HR Settings", "send_holiday_reminders") or 1)
frequency = frappe.db.get_single_value("HR Settings", "frequency")
if not (to_send_in_advance and frequency == "Monthly"):
return
send_advance_holiday_reminders("Monthly")
def send_advance_holiday_reminders(frequency):
"""Send Holiday Reminders in Advance to Employees
`frequency` (str): 'Weekly' or 'Monthly'
"""
if frequency == "Weekly":
start_date = getdate()
end_date = add_days(getdate(), 7)
elif frequency == "Monthly":
# Sent on 1st of every month
start_date = getdate()
end_date = add_months(getdate(), 1)
else:
return
employees = frappe.db.get_all('Employee', pluck='name')
for employee in employees:
holidays = get_holidays_for_employee(
employee,
start_date, end_date,
only_non_weekly=True,
raise_exception=False
)
if not (holidays is None):
send_holidays_reminder_in_advance(employee, holidays)
def send_holidays_reminder_in_advance(employee, holidays):
employee_doc = frappe.get_doc('Employee', employee)
employee_email = get_employee_email(employee_doc)
frequency = frappe.db.get_single_value("HR Settings", "frequency")
email_header = _("Holidays this Month.") if frequency == "Monthly" else _("Holidays this Week.")
frappe.sendmail(
recipients=[employee_email],
subject=_("Upcoming Holidays Reminder"),
template="holiday_reminder",
args=dict(
reminder_text=_("Hey {}! This email is to remind you about the upcoming holidays.").format(employee_doc.get('first_name')),
message=_("Below is the list of upcoming holidays for you:"),
advance_holiday_reminder=True,
holidays=holidays,
frequency=frequency[:-2]
),
header=email_header
)
# ------------------
# BIRTHDAY REMINDERS
# ------------------
def send_birthday_reminders():
"""Send Employee birthday reminders if no 'Stop Birthday Reminders' is not set."""
to_send = int(frappe.db.get_single_value("HR Settings", "send_birthday_reminders") or 1)
if not to_send:
return
employees_born_today = get_employees_who_are_born_today()
for company, birthday_persons in employees_born_today.items():
employee_emails = get_all_employee_emails(company)
birthday_person_emails = [get_employee_email(doc) for doc in birthday_persons]
recipients = list(set(employee_emails) - set(birthday_person_emails))
reminder_text, message = get_birthday_reminder_text_and_message(birthday_persons)
send_birthday_reminder(recipients, reminder_text, birthday_persons, message)
if len(birthday_persons) > 1:
# special email for people sharing birthdays
for person in birthday_persons:
person_email = person["user_id"] or person["personal_email"] or person["company_email"]
others = [d for d in birthday_persons if d != person]
reminder_text, message = get_birthday_reminder_text_and_message(others)
send_birthday_reminder(person_email, reminder_text, others, message)
def get_birthday_reminder_text_and_message(birthday_persons):
if len(birthday_persons) == 1:
birthday_person_text = birthday_persons[0]['name']
else:
# converts ["Jim", "Rim", "Dim"] to Jim, Rim & Dim
person_names = [d['name'] for d in birthday_persons]
birthday_person_text = comma_sep(person_names, frappe._("{0} & {1}"), False)
reminder_text = _("Today is {0}'s birthday 🎉").format(birthday_person_text)
message = _("A friendly reminder of an important date for our team.")
message += "<br>"
message += _("Everyone, lets congratulate {0} on their birthday.").format(birthday_person_text)
return reminder_text, message
def send_birthday_reminder(recipients, reminder_text, birthday_persons, message):
frappe.sendmail(
recipients=recipients,
subject=_("Birthday Reminder"),
template="birthday_reminder",
args=dict(
reminder_text=reminder_text,
birthday_persons=birthday_persons,
message=message,
),
header=_("Birthday Reminder 🎂")
)
def get_employees_who_are_born_today():
"""Get all employee born today & group them based on their company"""
return get_employees_having_an_event_today("birthday")
def get_employees_having_an_event_today(event_type):
"""Get all employee who have `event_type` today
& group them based on their company. `event_type`
can be `birthday` or `work_anniversary`"""
from collections import defaultdict
# Set column based on event type
if event_type == 'birthday':
condition_column = 'date_of_birth'
elif event_type == 'work_anniversary':
condition_column = 'date_of_joining'
else:
return
employees_born_today = frappe.db.multisql({
"mariadb": f"""
SELECT `personal_email`, `company`, `company_email`, `user_id`, `employee_name` AS 'name', `image`, `date_of_joining`
FROM `tabEmployee`
WHERE
DAY({condition_column}) = DAY(%(today)s)
AND
MONTH({condition_column}) = MONTH(%(today)s)
AND
`status` = 'Active'
""",
"postgres": f"""
SELECT "personal_email", "company", "company_email", "user_id", "employee_name" AS 'name', "image"
FROM "tabEmployee"
WHERE
DATE_PART('day', {condition_column}) = date_part('day', %(today)s)
AND
DATE_PART('month', {condition_column}) = date_part('month', %(today)s)
AND
"status" = 'Active'
""",
}, dict(today=today(), condition_column=condition_column), as_dict=1)
grouped_employees = defaultdict(lambda: [])
for employee_doc in employees_born_today:
grouped_employees[employee_doc.get('company')].append(employee_doc)
return grouped_employees
# --------------------------
# WORK ANNIVERSARY REMINDERS
# --------------------------
def send_work_anniversary_reminders():
"""Send Employee Work Anniversary Reminders if 'Send Work Anniversary Reminders' is checked"""
to_send = int(frappe.db.get_single_value("HR Settings", "send_work_anniversary_reminders") or 1)
if not to_send:
return
employees_joined_today = get_employees_having_an_event_today("work_anniversary")
for company, anniversary_persons in employees_joined_today.items():
employee_emails = get_all_employee_emails(company)
anniversary_person_emails = [get_employee_email(doc) for doc in anniversary_persons]
recipients = list(set(employee_emails) - set(anniversary_person_emails))
reminder_text, message = get_work_anniversary_reminder_text_and_message(anniversary_persons)
send_work_anniversary_reminder(recipients, reminder_text, anniversary_persons, message)
if len(anniversary_persons) > 1:
# email for people sharing work anniversaries
for person in anniversary_persons:
person_email = person["user_id"] or person["personal_email"] or person["company_email"]
others = [d for d in anniversary_persons if d != person]
reminder_text, message = get_work_anniversary_reminder_text_and_message(others)
send_work_anniversary_reminder(person_email, reminder_text, others, message)
def get_work_anniversary_reminder_text_and_message(anniversary_persons):
if len(anniversary_persons) == 1:
anniversary_person = anniversary_persons[0]['name']
persons_name = anniversary_person
# Number of years completed at the company
completed_years = getdate().year - anniversary_persons[0]['date_of_joining'].year
anniversary_person += f" completed {completed_years} years"
else:
person_names_with_years = []
names = []
for person in anniversary_persons:
person_text = person['name']
names.append(person_text)
# Number of years completed at the company
completed_years = getdate().year - person['date_of_joining'].year
person_text += f" completed {completed_years} years"
person_names_with_years.append(person_text)
# converts ["Jim", "Rim", "Dim"] to Jim, Rim & Dim
anniversary_person = comma_sep(person_names_with_years, frappe._("{0} & {1}"), False)
persons_name = comma_sep(names, frappe._("{0} & {1}"), False)
reminder_text = _("Today {0} at our Company! 🎉").format(anniversary_person)
message = _("A friendly reminder of an important date for our team.")
message += "<br>"
message += _("Everyone, lets congratulate {0} on their work anniversary!").format(persons_name)
return reminder_text, message
def send_work_anniversary_reminder(recipients, reminder_text, anniversary_persons, message):
frappe.sendmail(
recipients=recipients,
subject=_("Work Anniversary Reminder"),
template="anniversary_reminder",
args=dict(
reminder_text=reminder_text,
anniversary_persons=anniversary_persons,
message=message,
),
header=_("🎊️🎊️ Work Anniversary Reminder 🎊️🎊️")
)

View File

@@ -0,0 +1,173 @@
# Copyright (c) 2021, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt
import frappe
import unittest
from frappe.utils import getdate
from datetime import timedelta
from erpnext.hr.doctype.employee.test_employee import make_employee
from erpnext.hr.doctype.hr_settings.hr_settings import set_proceed_with_frequency_change
class TestEmployeeReminders(unittest.TestCase):
@classmethod
def setUpClass(cls):
from erpnext.hr.doctype.holiday_list.test_holiday_list import make_holiday_list
# Create a test holiday list
test_holiday_dates = cls.get_test_holiday_dates()
test_holiday_list = make_holiday_list(
'TestHolidayRemindersList',
holiday_dates=[
{'holiday_date': test_holiday_dates[0], 'description': 'test holiday1'},
{'holiday_date': test_holiday_dates[1], 'description': 'test holiday2'},
{'holiday_date': test_holiday_dates[2], 'description': 'test holiday3', 'weekly_off': 1},
{'holiday_date': test_holiday_dates[3], 'description': 'test holiday4'},
{'holiday_date': test_holiday_dates[4], 'description': 'test holiday5'},
{'holiday_date': test_holiday_dates[5], 'description': 'test holiday6'},
],
from_date=getdate()-timedelta(days=10),
to_date=getdate()+timedelta(weeks=5)
)
# Create a test employee
test_employee = frappe.get_doc(
'Employee',
make_employee('test@gopher.io', company="_Test Company")
)
# Attach the holiday list to employee
test_employee.holiday_list = test_holiday_list.name
test_employee.save()
# Attach to class
cls.test_employee = test_employee
cls.test_holiday_dates = test_holiday_dates
@classmethod
def get_test_holiday_dates(cls):
today_date = getdate()
return [
today_date,
today_date-timedelta(days=4),
today_date-timedelta(days=3),
today_date+timedelta(days=1),
today_date+timedelta(days=3),
today_date+timedelta(weeks=3)
]
def setUp(self):
# Clear Email Queue
frappe.db.sql("delete from `tabEmail Queue`")
def test_is_holiday(self):
from erpnext.hr.doctype.employee.employee import is_holiday
self.assertTrue(is_holiday(self.test_employee.name))
self.assertTrue(is_holiday(self.test_employee.name, date=self.test_holiday_dates[1]))
self.assertFalse(is_holiday(self.test_employee.name, date=getdate()-timedelta(days=1)))
# Test weekly_off holidays
self.assertTrue(is_holiday(self.test_employee.name, date=self.test_holiday_dates[2]))
self.assertFalse(is_holiday(self.test_employee.name, date=self.test_holiday_dates[2], only_non_weekly=True))
# Test with descriptions
has_holiday, descriptions = is_holiday(self.test_employee.name, with_description=True)
self.assertTrue(has_holiday)
self.assertTrue('test holiday1' in descriptions)
def test_birthday_reminders(self):
employee = frappe.get_doc("Employee", frappe.db.sql_list("select name from tabEmployee limit 1")[0])
employee.date_of_birth = "1992" + frappe.utils.nowdate()[4:]
employee.company_email = "test@example.com"
employee.company = "_Test Company"
employee.save()
from erpnext.hr.doctype.employee.employee_reminders import get_employees_who_are_born_today, send_birthday_reminders
employees_born_today = get_employees_who_are_born_today()
self.assertTrue(employees_born_today.get("_Test Company"))
hr_settings = frappe.get_doc("HR Settings", "HR Settings")
hr_settings.send_birthday_reminders = 1
hr_settings.save()
send_birthday_reminders()
email_queue = frappe.db.sql("""select * from `tabEmail Queue`""", as_dict=True)
self.assertTrue("Subject: Birthday Reminder" in email_queue[0].message)
def test_work_anniversary_reminders(self):
employee = frappe.get_doc("Employee", frappe.db.sql_list("select name from tabEmployee limit 1")[0])
employee.date_of_joining = "1998" + frappe.utils.nowdate()[4:]
employee.company_email = "test@example.com"
employee.company = "_Test Company"
employee.save()
from erpnext.hr.doctype.employee.employee_reminders import get_employees_having_an_event_today, send_work_anniversary_reminders
employees_having_work_anniversary = get_employees_having_an_event_today('work_anniversary')
self.assertTrue(employees_having_work_anniversary.get("_Test Company"))
hr_settings = frappe.get_doc("HR Settings", "HR Settings")
hr_settings.send_work_anniversary_reminders = 1
hr_settings.save()
send_work_anniversary_reminders()
email_queue = frappe.db.sql("""select * from `tabEmail Queue`""", as_dict=True)
self.assertTrue("Subject: Work Anniversary Reminder" in email_queue[0].message)
def test_send_holidays_reminder_in_advance(self):
from erpnext.hr.utils import get_holidays_for_employee
from erpnext.hr.doctype.employee.employee_reminders import send_holidays_reminder_in_advance
# Get HR settings and enable advance holiday reminders
hr_settings = frappe.get_doc("HR Settings", "HR Settings")
hr_settings.send_holiday_reminders = 1
set_proceed_with_frequency_change()
hr_settings.frequency = 'Weekly'
hr_settings.save()
holidays = get_holidays_for_employee(
self.test_employee.get('name'),
getdate(), getdate() + timedelta(days=3),
only_non_weekly=True,
raise_exception=False
)
send_holidays_reminder_in_advance(
self.test_employee.get('name'),
holidays
)
email_queue = frappe.db.sql("""select * from `tabEmail Queue`""", as_dict=True)
self.assertEqual(len(email_queue), 1)
def test_advance_holiday_reminders_monthly(self):
from erpnext.hr.doctype.employee.employee_reminders import send_reminders_in_advance_monthly
# Get HR settings and enable advance holiday reminders
hr_settings = frappe.get_doc("HR Settings", "HR Settings")
hr_settings.send_holiday_reminders = 1
set_proceed_with_frequency_change()
hr_settings.frequency = 'Monthly'
hr_settings.save()
send_reminders_in_advance_monthly()
email_queue = frappe.db.sql("""select * from `tabEmail Queue`""", as_dict=True)
self.assertTrue(len(email_queue) > 0)
def test_advance_holiday_reminders_weekly(self):
from erpnext.hr.doctype.employee.employee_reminders import send_reminders_in_advance_weekly
# Get HR settings and enable advance holiday reminders
hr_settings = frappe.get_doc("HR Settings", "HR Settings")
hr_settings.send_holiday_reminders = 1
hr_settings.frequency = 'Weekly'
hr_settings.save()
send_reminders_in_advance_weekly()
email_queue = frappe.db.sql("""select * from `tabEmail Queue`""", as_dict=True)
self.assertTrue(len(email_queue) > 0)

View File

@@ -0,0 +1,25 @@
<div class="gray-container text-center">
<div>
{% for person in anniversary_persons %}
{% if person.image %}
<img
class="avatar-frame standard-image"
src="{{ person.image }}"
style="{{ css_style or '' }}"
title="{{ person.name }}">
</span>
{% else %}
<span
class="avatar-frame standard-image"
style="{{ css_style or '' }}"
title="{{ person.name }}">
{{ frappe.utils.get_abbr(person.name) }}
</span>
{% endif %}
{% endfor %}
</div>
<div style="margin-top: 15px">
<span>{{ reminder_text }}</span>
<p class="text-muted">{{ message }}</p>
</div>
</div>

View File

@@ -0,0 +1,16 @@
<div>
<span>{{ reminder_text }}</span>
<p class="text-muted">{{ message }}</p>
</div>
{% if advance_holiday_reminder %}
{% if holidays | len > 0 %}
<ol>
{% for holiday in holidays %}
<li>{{ frappe.format(holiday.holiday_date, 'Date') }} - {{ holiday.description }}</li>
{% endfor %}
</ol>
{% else %}
<p>You don't have no upcoming holidays this {{ frequency }}.</p>
{% endif %}
{% endif %}