# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt
import json
from collections import defaultdict
import frappe
from frappe import _, bold, qb, throw
from frappe.model.workflow import get_workflow_name, is_transition_condition_satisfied
from frappe.query_builder import Criterion, DocType
from frappe.query_builder.custom import ConstantColumn
from frappe.query_builder.functions import Abs, Sum
from frappe.utils import (
add_days,
add_months,
cint,
comma_and,
flt,
fmt_money,
formatdate,
get_last_day,
get_link_to_form,
getdate,
nowdate,
parse_json,
today,
)
import erpnext
from erpnext.accounts.doctype.accounting_dimension.accounting_dimension import (
get_accounting_dimensions,
get_dimensions,
)
from erpnext.accounts.doctype.pricing_rule.utils import (
apply_pricing_rule_for_free_items,
apply_pricing_rule_on_transaction,
get_applied_pricing_rules,
)
from erpnext.accounts.general_ledger import get_round_off_account_and_cost_center
from erpnext.accounts.party import (
get_party_account,
get_party_account_currency,
get_party_gle_currency,
validate_party_frozen_disabled,
)
from erpnext.accounts.utils import (
create_gain_loss_journal,
get_account_currency,
get_currency_precision,
get_fiscal_years,
validate_fiscal_year,
)
from erpnext.buying.utils import update_last_purchase_rate
from erpnext.controllers.print_settings import (
set_print_templates_for_item_table,
set_print_templates_for_taxes,
)
from erpnext.controllers.sales_and_purchase_return import validate_return
from erpnext.exceptions import InvalidCurrency
from erpnext.setup.utils import get_exchange_rate
from erpnext.stock.doctype.item.item import get_uom_conv_factor
from erpnext.stock.doctype.packed_item.packed_item import make_packing_list
from erpnext.stock.get_item_details import (
ItemDetailsCtx,
_get_item_tax_template,
get_conversion_factor,
get_item_details,
get_item_tax_map,
get_item_warehouse_,
)
from erpnext.utilities.regional import temporary_flag
from erpnext.utilities.transaction_base import TransactionBase
class AccountMissingError(frappe.ValidationError):
    """Validation error type for a missing account.

    NOTE(review): the raising sites are outside this view; the meaning is
    inferred from the class name only.
    """
class InvalidQtyError(frappe.ValidationError):
    """Validation error type for an invalid quantity.

    NOTE(review): the raising sites are outside this view; the meaning is
    inferred from the class name only.
    """
# Item-row fields that AccountsController.set_missing_item_details() overwrites
# with the value returned by get_item_details() whenever that value is not
# None, even if the row already carries a value for the field.
force_item_fields = (
    "item_group",
    "brand",
    "stock_uom",
    "is_fixed_asset",
    "pricing_rules",
    "weight_per_unit",
    "weight_uom",
    "total_weight",
    "valuation_rate",
)
class AccountsController(TransactionBase):
def get_print_settings(self):
    """Return the print-setting fieldnames that apply to this document.

    Settings are contributed per child table: the item-table settings are
    included only when the doctype has an ``items`` Table field, and the tax
    setting only when it has a ``taxes`` Table field.
    """
    settings_by_table = (
        ("items", ["compact_item_print", "print_uom_after_quantity"]),
        ("taxes", ["print_taxes_with_zero_amount"]),
    )

    applicable = []
    for table_fieldname, settings in settings_by_table:
        df = self.meta.get_field(table_fieldname)
        if df and df.fieldtype == "Table":
            applicable.extend(settings)

    return applicable
@property
def company_currency(self):
if not hasattr(self, "__company_currency"):
self.__company_currency = erpnext.get_company_currency(self.company)
return self.__company_currency
def onload(self):
    """Populate onload data and, for new documents, the payment schedule.

    Always exposes the "make_payment_via_journal_entry" flag from Accounts
    Settings. For unsaved Quotation / order / invoice documents it also
    builds the payment schedule.
    """
    accounts_settings = frappe.client_cache.get_doc("Accounts Settings")
    self.set_onload(
        "make_payment_via_journal_entry",
        accounts_settings.make_payment_via_journal_entry,
    )

    if not self.is_new():
        return

    doctypes_with_schedule = (
        "Quotation",
        "Purchase Order",
        "Sales Order",
        "Purchase Invoice",
        "Sales Invoice",
    )
    if self.doctype in doctypes_with_schedule:
        self.set_payment_schedule()
def remove_bundle_for_non_stock_invoices(self):
    """Detach serial/batch bundles from invoices that do not update stock.

    Applies only to Sales/Purchase Invoices with ``update_stock`` unset. Any
    row-level ``serial_and_batch_bundle`` link is cleared, and if at least
    one was cleared the bundles are removed via
    ``remove_serial_and_batch_bundle``.
    """
    if self.doctype not in ("Sales Invoice", "Purchase Invoice") or self.update_stock:
        return

    cleared_any = False
    for row in self.get("items"):
        if not row.serial_and_batch_bundle:
            continue
        row.serial_and_batch_bundle = None
        cleared_any = True

    if cleared_any:
        self.remove_serial_and_batch_bundle()
def ensure_supplier_is_not_blocked(self):
    """Stop the transaction when the supplier is on hold for this kind of document.

    Purchase Invoices/Orders are blocked by hold types "All" and "Invoices";
    supplier Payment Entries by "All" and "Payments". The hold applies while
    no release date is set or today is on or before the release date.
    """
    is_supplier_payment = self.doctype == "Payment Entry" and self.party_type == "Supplier"
    is_buying_invoice = self.doctype in ["Purchase Invoice", "Purchase Order"]

    if is_buying_invoice:
        supplier_name = self.supplier
    elif is_supplier_payment:
        supplier_name = self.party
    else:
        supplier_name = None

    if not supplier_name:
        return

    supplier = frappe.get_lazy_doc("Supplier", supplier_name)
    if not (supplier and supplier.on_hold):
        return

    hold_applies = (is_buying_invoice and supplier.hold_type in ["All", "Invoices"]) or (
        is_supplier_payment and supplier.hold_type in ["All", "Payments"]
    )
    if not hold_applies:
        return

    still_on_hold = not supplier.release_date or getdate(nowdate()) <= supplier.release_date
    if still_on_hold:
        frappe.msgprint(
            _("{0} is blocked so this transaction cannot proceed").format(supplier_name),
            raise_exception=1,
        )
def validate_against_voucher_outstanding(self):
    """Tell the user where the outstanding of a non-POS return will be tracked.

    Does nothing for doctypes without an ``outstanding_amount`` field or for
    documents that are not non-POS returns against another voucher.

    * If "Update Outstanding for Self" is already checked, a hint is shown on
      how to update the original voucher instead.
    * Otherwise, if the return total exceeds the outstanding of the original
      voucher, ``update_outstanding_for_self`` is switched on and the user is
      informed.

    Fix: the two message literals were split across physical lines, which is
    a syntax error. They are restored with an HTML line break between the
    sentences.
    NOTE(review): "<br><br>" is assumed to be the original separator — confirm.
    """
    from frappe.model.meta import get_meta

    if not get_meta(self.doctype).has_field("outstanding_amount"):
        return

    if self.get("is_return") and self.return_against and not self.get("is_pos"):
        against_voucher_outstanding = frappe.get_value(
            self.doctype, self.return_against, "outstanding_amount"
        )
        document_type = "Credit Note" if self.doctype == "Sales Invoice" else "Debit Note"

        msg = ""
        if self.get("update_outstanding_for_self"):
            msg = (
                "We can see {0} is made against {1}. If you want {1}'s outstanding to be updated, "
                "uncheck '{2}' checkbox.<br><br>Or"
            ).format(
                frappe.bold(document_type),
                get_link_to_form(self.doctype, self.get("return_against")),
                frappe.bold(_("Update Outstanding for Self")),
            )

        elif not self.update_outstanding_for_self and (
            abs(flt(self.rounded_total) or flt(self.grand_total)) > flt(against_voucher_outstanding)
        ):
            # The original voucher cannot absorb the full return amount, so
            # the outstanding is kept on this document instead.
            self.update_outstanding_for_self = 1
            msg = (
                "The outstanding amount {} in {} is lesser than {}. Updating the outstanding to this invoice."
                "<br><br>And"
            ).format(
                against_voucher_outstanding,
                get_link_to_form(self.doctype, self.get("return_against")),
                flt(abs(self.outstanding_amount)),
            )

        if msg:
            # Both variants share the same closing sentence.
            msg += " you can use {} tool to reconcile against {} later.".format(
                get_link_to_form("Payment Reconciliation"),
                get_link_to_form(self.doctype, self.get("return_against")),
            )
            frappe.msgprint(_(msg))
def validate(self):
    """Run the shared validation pipeline for accounting transactions.

    The steps are order-dependent: missing values and internal-transfer
    adjustments come first, taxes are set before totals are calculated, and
    schedule/party/currency checks run on the calculated totals. Individual
    checks raise through frappe when they fail.

    NOTE(review): block nesting was reconstructed from a source whose
    indentation was lost — confirm against the canonical file.
    """
    # Zero-quantity rows are only tolerated on returns and debit notes.
    if not self.get("is_return") and not self.get("is_debit_note"):
        self.validate_qty_is_not_zero()

    if (
        self.doctype in ["Sales Invoice", "Purchase Invoice", "POS Invoice"]
        and self.get("is_return")
        and self.get("update_stock")
    ):
        self.validate_zero_qty_for_return_invoices_with_stock()

    if self.get("_action") and self._action != "update_after_submit":
        self.set_missing_values(for_validate=True)

    if self.get("_action") == "submit":
        self.remove_bundle_for_non_stock_invoices()

    self.ensure_supplier_is_not_blocked()

    self.validate_date_with_fiscal_year()
    self.validate_party_accounts()

    self.validate_inter_company_reference()
    # validate inter company transaction rate
    self.validate_internal_transaction()

    self.disable_pricing_rule_on_internal_transfer()
    self.disable_tax_included_prices_for_internal_transfer()
    self.set_incoming_rate()
    self.init_internal_values()
    self.validate_against_voucher_outstanding()

    # Need to set taxes based on taxes_and_charges template
    # before calculating taxes and totals
    if self.meta.get_field("taxes_and_charges"):
        self.validate_enabled_taxes_and_charges()
        self.validate_tax_account_company()

    self.set_taxes_and_charges()

    if self.meta.get_field("currency"):
        self.calculate_taxes_and_totals()

        # A negative grand total is only acceptable on returns.
        if not self.meta.get_field("is_return") or not self.is_return:
            self.validate_value("base_grand_total", ">=", 0)

        validate_return(self)

    self.validate_all_documents_schedule()

    self.validate_party()
    self.validate_currency()
    self.validate_party_account_currency()
    self.validate_return_against_account()

    if self.doctype in ["Purchase Invoice", "Sales Invoice"]:
        # Every advance row must reference a concrete voucher.
        if invalid_advances := [x for x in self.advances if not x.reference_type or not x.reference_name]:
            frappe.throw(
                _(
                    "Rows: {0} in {1} section are Invalid. Reference Name should point to a valid Payment Entry or Journal Entry."
                ).format(
                    frappe.bold(comma_and([x.idx for x in invalid_advances])),
                    frappe.bold(_("Advance Payments")),
                )
            )

        # Advances are auto-allocated only when the invoice is not paid
        # directly (is_pos on Sales Invoice, is_paid on Purchase Invoice).
        pos_check_field = "is_pos" if self.doctype == "Sales Invoice" else "is_paid"
        if cint(self.allocate_advances_automatically) and not cint(self.get(pos_check_field)):
            self.set_advances()

        self.set_advance_gain_or_loss()

        if self.is_return:
            self.validate_qty()
        else:
            self.validate_deferred_start_and_end_date()

        self.validate_deferred_income_expense_account()
        self.set_inter_company_account()

    if self.doctype == "Purchase Invoice":
        self.calculate_paid_amount()
        # apply tax withholding only if checked and applicable
        self.set_tax_withholding()

    # Regional hooks run with the "company" flag temporarily set to this
    # document's company.
    with temporary_flag("company", self.company):
        validate_regional(self)
        validate_einvoice_fields(self)

    if self.doctype != "Material Request" and not self.ignore_pricing_rule:
        apply_pricing_rule_on_transaction(self)

    self.set_total_in_words()
    self.set_default_letter_head()
    self.validate_company_in_accounting_dimension()
    self.validate_party_address_and_contact()
def set_default_letter_head(self):
    """Fill an empty ``letter_head`` with the company's default letter head.

    Documents without a ``letter_head`` attribute, or with one already set,
    are left untouched.
    """
    if not hasattr(self, "letter_head") or self.letter_head:
        return

    self.letter_head = frappe.db.get_value("Company", self.company, "default_letter_head")
def init_internal_values(self):
    """Reset internally maintained item counters to 0 while the document is a draft.

    Only fields that exist on the item row are touched.
    """
    if not self.docstatus.is_draft():
        return

    # TODO: Add all such pending values here
    internal_fields = ("billed_amt", "delivered_qty")
    for row in self.get("items"):
        for fieldname in internal_fields:
            if hasattr(row, fieldname):
                row.set(fieldname, 0)
def before_cancel(self):
    """Run the e-invoice field validation before the document is cancelled."""
    validate_einvoice_fields(self)
def _remove_references_in_unreconcile(self):
    """Detach this document from Unreconcile Payment records.

    First removes allocation rows in other Unreconcile Payment documents that
    reference this document, then cancels (if submitted) and deletes every
    Unreconcile Payment whose ``voucher_no`` is this document.
    """
    entries = frappe.qb.DocType("Unreconcile Payment Entries")
    linked_rows = (
        frappe.qb.from_(entries)
        .select(entries.name, entries.parent)
        .where((entries.reference_doctype == self.doctype) & (entries.reference_name == self.name))
        .run(as_dict=True)
    )

    if linked_rows:
        # Group the child row names by parent so each parent is saved once.
        rows_by_parent = frappe._dict()
        for linked in linked_rows:
            rows_by_parent.setdefault(linked.parent, []).append(linked.name)

        for parent_name, row_names in rows_by_parent.items():
            unreconcile_doc = frappe.get_doc("Unreconcile Payment", parent_name)
            for row_name in row_names:
                unreconcile_doc.remove(unreconcile_doc.get("allocations", {"name": row_name})[0])

            unreconcile_doc.flags.ignore_validate_update_after_submit = True
            unreconcile_doc.flags.ignore_links = True
            unreconcile_doc.save(ignore_permissions=True)

    # delete docs upon parent doc deletion
    for record in frappe.db.get_all("Unreconcile Payment", filters={"voucher_no": self.name}):
        owned_doc = frappe.get_doc("Unreconcile Payment", record.name)
        if owned_doc.docstatus == 1:
            owned_doc.cancel()
        owned_doc.delete()
def _remove_references_in_repost_doctypes(self):
    """Remove rows referencing this document from repost documents.

    Handles both repost child doctypes. The rows live in the parent's
    ``repost_vouchers`` table for "Repost Payment Ledger Items" and in
    ``vouchers`` for "Repost Accounting Ledger Items".
    """
    table_field_by_child_doctype = {
        "Repost Payment Ledger Items": "repost_vouchers",
        "Repost Accounting Ledger Items": "vouchers",
    }

    for child_doctype, table_field in table_field_by_child_doctype.items():
        child = frappe.qb.DocType(child_doctype)
        linked_rows = (
            frappe.qb.from_(child)
            .select(child.name, child.parent, child.parenttype)
            .where((child.voucher_type == self.doctype) & (child.voucher_no == self.name))
            .run(as_dict=True)
        )
        if not linked_rows:
            continue

        # Group the child row names by (parenttype, parent) so each parent
        # document is loaded and saved once.
        rows_by_parent = frappe._dict()
        for linked in linked_rows:
            rows_by_parent.setdefault((linked.parenttype, linked.parent), []).append(linked.name)

        for parent_key, row_names in rows_by_parent.items():
            repost_doc = frappe.get_doc(parent_key[0], parent_key[1])
            for row_name in row_names:
                repost_doc.remove(repost_doc.get(table_field, {"name": row_name})[0])

            repost_doc.flags.ignore_validate_update_after_submit = True
            repost_doc.flags.ignore_links = True
            repost_doc.save(ignore_permissions=True)
def _remove_advance_payment_ledger_entries(self):
    """Delete Advance Payment Ledger Entries tied to this document.

    Entries where this document is the voucher are always deleted. Entries
    where it is the *against* voucher are deleted only when the doctype is
    registered through the advance-payment receivable/payable hooks.
    """
    ledger = qb.DocType("Advance Payment Ledger Entry")

    as_voucher = ledger.voucher_type.eq(self.doctype) & ledger.voucher_no.eq(self.name)
    qb.from_(ledger).delete().where(as_voucher).run()

    advance_payment_doctypes = frappe.get_hooks("advance_payment_receivable_doctypes") + frappe.get_hooks(
        "advance_payment_payable_doctypes"
    )
    if self.doctype not in advance_payment_doctypes:
        return

    as_against_voucher = ledger.against_voucher_type.eq(self.doctype) & ledger.against_voucher_no.eq(
        self.name
    )
    qb.from_(ledger).delete().where(as_against_voucher).run()
def on_trash(self):
    """Clean up dependent records when the document is deleted.

    Always removes advance payment ledger entries, repost references,
    unreconcile references and serial/batch bundles. When Accounts Settings
    has "delete_linked_ledger_entries" enabled, it also deletes the linked
    exchange gain/loss journal and the Payment Ledger, GL and Stock Ledger
    entries of this voucher.

    Fix: the delinked filter was written as ``... & ple.delinked == 1``.
    Because ``==`` binds looser than ``&``, that compared the whole AND
    expression with 1 instead of comparing ``ple.delinked`` with 1. The
    comparison is now parenthesized explicitly.
    """
    from erpnext.accounts.utils import delete_exchange_gain_loss_journal

    self._remove_advance_payment_ledger_entries()
    self._remove_references_in_repost_doctypes()
    self._remove_references_in_unreconcile()
    self.remove_serial_and_batch_bundle()

    # delete sl and gl entries on deletion of transaction
    if frappe.get_single_value("Accounts Settings", "delete_linked_ledger_entries"):
        # delete linked exchange gain/loss journal
        delete_exchange_gain_loss_journal(self)

        ple = frappe.qb.DocType("Payment Ledger Entry")
        # Entries posted by this voucher, plus delinked entries posted
        # against it.
        own_entries = (ple.voucher_type == self.doctype) & (ple.voucher_no == self.name)
        delinked_against_entries = (
            (ple.against_voucher_type == self.doctype)
            & (ple.against_voucher_no == self.name)
            & (ple.delinked == 1)
        )
        frappe.qb.from_(ple).delete().where(own_entries | delinked_against_entries).run()

        gle = frappe.qb.DocType("GL Entry")
        frappe.qb.from_(gle).delete().where(
            (gle.voucher_type == self.doctype) & (gle.voucher_no == self.name)
        ).run()

        sle = frappe.qb.DocType("Stock Ledger Entry")
        frappe.qb.from_(sle).delete().where(
            (sle.voucher_type == self.doctype) & (sle.voucher_no == self.name)
        ).run()
def remove_serial_and_batch_bundle(self):
    """Delete non-submitted serial/batch bundles and batches created for this document.

    Bundles are matched on voucher type/number with docstatus other than 1;
    batches on their reference doctype/name. Bundles are fetched and deleted
    before batches are looked up.
    """
    cleanup_targets = (
        (
            "Serial and Batch Bundle",
            {"voucher_type": self.doctype, "voucher_no": self.name, "docstatus": ("!=", 1)},
        ),
        (
            "Batch",
            {"reference_doctype": self.doctype, "reference_name": self.name},
        ),
    )

    for target_doctype, target_filters in cleanup_targets:
        for record in frappe.get_all(target_doctype, filters=target_filters):
            frappe.delete_doc(target_doctype, record.name)
def validate_company_in_accounting_dimension(self):
    """Check that company-bound dimensions on the document belong to its company.

    The dimensions checked are "Project", "Cost Center" and every Accounting
    Dimension whose document type has a ``company`` field. The parent and
    each child row are validated.
    """
    dimension = DocType("Accounting Dimension")
    docfield = DocType("DocField")

    rows = (
        frappe.qb.from_(dimension)
        .select(dimension.document_type)
        .join(docfield)
        .on(docfield.parent == dimension.document_type)
        .where(docfield.fieldname == "company")
    ).run(as_list=True)

    # Flatten the single-column result rows behind the built-in dimensions.
    company_bound_dimensions = ["Project", "Cost Center"] + [
        document_type for row in rows for document_type in row
    ]

    self.validate_company(company_bound_dimensions)
    for child_row in self.get_all_children() or []:
        self.validate_company(company_bound_dimensions, child_row)
def validate_company(self, dimension_list, child=None):
    """Throw if a dimension value on the document (or child row) belongs to another company.

    :param dimension_list: doctypes of the dimensions to check; the field
        read for each one is the scrubbed doctype name.
    :param child: optional child row to read values from instead of the parent.
    """
    source = child if child else self

    for dimension in dimension_list:
        value = source.get(frappe.scrub(dimension))
        if not value:
            continue

        owner_company = frappe.get_cached_value(dimension, value, "company")
        if owner_company and owner_company != self.company:
            frappe.throw(
                _("{0}: {1} does not belong to the Company: {2}").format(
                    dimension, frappe.bold(value), self.company
                )
            )
def validate_party_address_and_contact(self):
    """Validate that the selected addresses and contact belong to the party.

    Customers are checked on billing and shipping address, suppliers on the
    billing address only. The contact is checked for every party type.
    """
    party_type, party = self.get_party()
    if not party_type or not party:
        return

    if party_type == "Customer":
        billing_address = self.get("customer_address")
        shipping_address = self.get("shipping_address_name")
        self.validate_party_address(party, party_type, billing_address, shipping_address)
    elif party_type == "Supplier":
        self.validate_party_address(party, party_type, self.get("supplier_address"))

    self.validate_party_contact(party, party_type)
def validate_party_address(self, party, party_type, billing_address, shipping_address=None):
    """Throw if the billing or shipping address is not linked to the party.

    Linked addresses are read from the Dynamic Link rows of Address
    documents. Nothing is queried when neither address is given.
    """
    if not (billing_address or shipping_address):
        return

    linked_addresses = frappe.get_all(
        "Dynamic Link",
        {"link_doctype": party_type, "link_name": party, "parenttype": "Address"},
        pluck="parent",
    )

    if billing_address and billing_address not in linked_addresses:
        frappe.throw(_("Billing Address does not belong to the {0}").format(party))
    elif shipping_address and shipping_address not in linked_addresses:
        frappe.throw(_("Shipping Address does not belong to the {0}").format(party))
def validate_party_contact(self, party, party_type):
    """Throw if the document's contact person is not linked to the party.

    Linked contacts are read from the Dynamic Link rows of Contact
    documents. Nothing is queried when no contact person is set.
    """
    if not self.get("contact_person"):
        return

    linked_contacts = frappe.get_all(
        "Dynamic Link",
        {"link_doctype": party_type, "link_name": party, "parenttype": "Contact"},
        pluck="parent",
    )

    if self.contact_person and self.contact_person not in linked_contacts:
        frappe.throw(_("Contact Person does not belong to the {0}").format(party))
def validate_return_against_account(self):
    """Ensure a return invoice uses the same party account as the original invoice.

    Compares ``debit_to`` (Sales Invoice) or ``credit_to`` (Purchase Invoice)
    with the value stored on ``return_against`` and throws on mismatch.
    """
    is_invoice_return = (
        self.doctype in ["Sales Invoice", "Purchase Invoice"] and self.is_return and self.return_against
    )
    if not is_invoice_return:
        return

    account_field = "debit_to" if self.doctype == "Sales Invoice" else "credit_to"
    original_account = frappe.get_value(self.doctype, self.return_against, account_field)
    if original_account == self.get(account_field):
        return

    frappe.throw(
        _("Please set {0} to {1}, the same account that was used in the original invoice {2}.").format(
            frappe.bold(_(self.meta.get_label(account_field), context=self.doctype)),
            frappe.bold(original_account),
            frappe.bold(self.return_against),
        )
    )
def validate_deferred_income_expense_account(self):
    """Make sure every deferred item row has a deferred revenue/expense account.

    Rows with deferred accounting enabled but no account fall back to the
    company default ("default_deferred_revenue_account" or
    "default_deferred_expense_account"). An error is thrown when that default
    is not set either.
    """
    account_field_by_doctype = {
        "Sales Invoice": "deferred_revenue_account",
        "Purchase Invoice": "deferred_expense_account",
    }

    for row in self.get("items"):
        is_deferred = row.get("enable_deferred_revenue") or row.get("enable_deferred_expense")
        if not is_deferred:
            continue
        if row.get(account_field_by_doctype.get(self.doctype)):
            continue

        company_default = frappe.get_cached_value(
            "Company", self.company, "default_" + account_field_by_doctype.get(self.doctype)
        )
        if company_default:
            row.set(account_field_by_doctype.get(self.doctype), company_default)
        else:
            frappe.throw(
                _(
                    "Row #{0}: Please update deferred revenue/expense account in item row or default account in company master"
                ).format(row.idx)
            )
def validate_auto_repeat_subscription_dates(self):
    """Throw if both auto-repeat dates are set and ``from_date`` is after ``to_date``."""
    if not (self.get("from_date") and self.get("to_date")):
        return

    if getdate(self.from_date) > getdate(self.to_date):
        frappe.throw(_("To Date cannot be before From Date"), title=_("Invalid Auto Repeat Date"))
def validate_deferred_start_and_end_date(self):
    """Validate service dates on item rows that use deferred accounting.

    For each such row both dates are required, the start date must not be
    after the end date, and the end date must not be before the posting date.
    The first violation found throws.
    """
    for row in self.items:
        if not (row.get("enable_deferred_revenue") or row.get("enable_deferred_expense")):
            continue

        if not (row.service_start_date and row.service_end_date):
            frappe.throw(
                _("Row #{0}: Service Start and End Date is required for deferred accounting").format(
                    row.idx
                )
            )
        elif getdate(row.service_start_date) > getdate(row.service_end_date):
            frappe.throw(
                _("Row #{0}: Service Start Date cannot be greater than Service End Date").format(row.idx)
            )
        elif getdate(self.posting_date) > getdate(row.service_end_date):
            frappe.throw(
                _("Row #{0}: Service End Date cannot be before Invoice Posting Date").format(row.idx)
            )
def validate_invoice_documents_schedule(self):
    """Build and validate the payment schedule of a Sales/Purchase Invoice.

    Returns, directly paid invoices (``is_paid`` / ``is_pos``) and opening
    invoices get their payment terms template and schedule cleared first.
    Returns stop there; all other invoices go on to the schedule, due date
    and advance checks.

    NOTE(review): block nesting was reconstructed from a source whose
    indentation was lost; ``validate_due_date`` is assumed to sit under the
    ``ignore_default_payment_terms_template`` check — confirm.
    """
    schedule_not_applicable = (
        self.is_return
        or (self.doctype == "Purchase Invoice" and self.is_paid)
        or (self.doctype == "Sales Invoice" and self.is_pos)
        or self.get("is_opening") == "Yes"
    )
    if schedule_not_applicable:
        self.payment_terms_template = ""
        self.payment_schedule = []

    if self.is_return:
        return

    self.validate_payment_schedule_dates()
    self.set_due_date()
    self.set_payment_schedule()

    if not self.get("ignore_default_payment_terms_template"):
        self.validate_payment_schedule_amount()
        self.validate_due_date()

    self.validate_advance_entries()
def validate_non_invoice_documents_schedule(self):
    """Build the payment schedule, then validate its dates and amounts."""
    schedule_steps = (
        "set_payment_schedule",
        "validate_payment_schedule_dates",
        "validate_payment_schedule_amount",
    )
    for step_name in schedule_steps:
        getattr(self, step_name)()
def validate_all_documents_schedule(self):
    """Dispatch payment-schedule validation by doctype.

    Invoices use the invoice flow; Quotation, Purchase Order and Sales Order
    use the simpler non-invoice flow. Other doctypes are ignored.
    """
    doctype = self.doctype

    if doctype in ("Sales Invoice", "Purchase Invoice"):
        self.validate_invoice_documents_schedule()
        return

    if doctype in ("Quotation", "Purchase Order", "Sales Order"):
        self.validate_non_invoice_documents_schedule()
def before_print(self, settings=None):
    """Prepare the document for printing.

    For the listed transaction doctypes, identical items are grouped when
    ``group_same_items`` is set, and the discount amount is shown (negated)
    only when a discount exists and the document has an empty ``taxes``
    table. Print templates for the item and tax tables are always set.
    """
    printable_transactions = [
        "Purchase Order",
        "Sales Order",
        "Sales Invoice",
        "Purchase Invoice",
        "Supplier Quotation",
        "Purchase Receipt",
        "Delivery Note",
        "Quotation",
    ]

    if self.doctype in printable_transactions:
        if self.get("group_same_items"):
            self.group_similar_items()

        discount_df = self.meta.get_field("discount_amount")
        show_discount = self.get("discount_amount") and hasattr(self, "taxes") and not len(self.taxes)
        if show_discount:
            discount_df.set("print_hide", 0)
            self.discount_amount = -self.discount_amount
        else:
            discount_df.set("print_hide", 1)

    set_print_templates_for_item_table(self, settings)
    set_print_templates_for_taxes(self, settings)
def calculate_paid_amount(self):
    """Derive ``paid_amount`` / ``base_paid_amount`` for directly paid documents.

    Documents without ``is_pos`` / ``is_paid`` attributes are ignored.
    Unpaid documents get both amounts reset to 0. Paid documents require a
    cash/bank account; the paid amount defaults to the grand total on
    returns or to the outstanding amount otherwise, and the base amount is
    converted with ``conversion_rate``.

    NOTE(review): block nesting was reconstructed from a source whose
    indentation was lost; the reset branch is assumed to pair with the
    paid check — confirm.
    """
    if not (hasattr(self, "is_pos") or hasattr(self, "is_paid")):
        return

    paid_directly = self.get("is_pos") or self.get("is_paid")
    if not paid_directly:
        self.paid_amount = 0
        self.base_paid_amount = 0
        return

    if not self.cash_bank_account:
        # show message that the amount is not paid
        frappe.throw(
            _("Note: Payment Entry will not be created since 'Cash or Bank Account' was not specified")
        )

    paid_precision_field = "paid_amount"
    if cint(self.is_return) and self.grand_total > self.paid_amount:
        self.paid_amount = flt(flt(self.grand_total), self.precision(paid_precision_field))
    elif not flt(self.paid_amount) and flt(self.outstanding_amount) > 0:
        self.paid_amount = flt(flt(self.outstanding_amount), self.precision(paid_precision_field))

    self.base_paid_amount = flt(
        self.paid_amount * self.conversion_rate, self.precision("base_paid_amount")
    )
def set_missing_values(self, for_validate=False):
    """In test runs, default the first empty date field to today.

    Looks at ``posting_date`` then ``transaction_date`` and fills only the
    first one that exists on the doctype and has no value. ``for_validate``
    is not used here.
    """
    if not frappe.in_test:
        return

    for date_field in ("posting_date", "transaction_date"):
        if self.meta.get_field(date_field) and not self.get(date_field):
            self.set(date_field, today())
            return
def calculate_taxes_and_totals(self):
    """Recalculate taxes and totals; selling documents also get commission and contribution."""
    # Imported locally, as in the original implementation.
    from erpnext.controllers.taxes_and_totals import calculate_taxes_and_totals

    calculate_taxes_and_totals(self)

    selling_doctypes = ("Sales Order", "Delivery Note", "Sales Invoice", "POS Invoice")
    if self.doctype not in selling_doctypes:
        return

    self.calculate_commission()
    self.calculate_contribution()
def validate_date_with_fiscal_year(self):
    """Validate the document date against its fiscal year.

    Applies only to doctypes with a ``fiscal_year`` field. The date checked
    is ``posting_date`` if the doctype has it, otherwise
    ``transaction_date``; nothing happens when that date is empty.
    """
    if not self.meta.get_field("fiscal_year"):
        return

    date_field = next(
        (
            candidate
            for candidate in ("posting_date", "transaction_date")
            if self.meta.get_field(candidate)
        ),
        None,
    )
    if not date_field or not self.get(date_field):
        return

    validate_fiscal_year(
        self.get(date_field),
        self.fiscal_year,
        self.company,
        self.meta.get_label(date_field),
        self,
    )
def validate_party_accounts(self):
    """Throw if an invoice item row books to the party account.

    Sales Invoice rows are checked on ``income_account`` against
    ``debit_to``; Purchase Invoice rows on ``expense_account`` against
    ``credit_to``. Other doctypes are ignored.
    """
    fields_by_doctype = {
        "Sales Invoice": ("debit_to", "income_account"),
        "Purchase Invoice": ("credit_to", "expense_account"),
    }
    if self.doctype not in fields_by_doctype:
        return

    party_account_field, item_field = fields_by_doctype[self.doctype]

    for row in self.get("items"):
        if row.get(item_field) != self.get(party_account_field):
            continue

        frappe.throw(
            _("Row {0}: {1} {2} cannot be same as {3} (Party Account) {4}").format(
                row.idx,
                frappe.bold(frappe.unscrub(item_field)),
                row.get(item_field),
                frappe.bold(frappe.unscrub(party_account_field)),
                self.get(party_account_field),
            )
        )
def validate_inter_company_reference(self):
    """Require internal-transfer purchases to reference their internal sale.

    Applies to non-return Purchase Invoices and Purchase Receipts that are
    internal transfers. The document must carry one of the inter-company
    reference fields, and every item row must link to the originating
    Delivery Note Item (receipts) or Sales Invoice Item (invoices).
    """
    if self.get("is_return"):
        return

    if self.doctype not in ("Purchase Invoice", "Purchase Receipt"):
        return

    if not self.is_internal_transfer():
        return

    has_reference = (
        self.get("inter_company_reference")
        or self.get("inter_company_invoice_reference")
        or self.get("inter_company_order_reference")
    )
    if not has_reference and not self.get("is_return"):
        msg = _("Internal Sale or Delivery Reference missing.")
        msg += _("Please create purchase from internal sale or delivery document itself")
        frappe.throw(msg, title=_("Internal Sales Reference Missing"))

    if self.doctype == "Purchase Receipt":
        label = "Delivery Note Item"
    else:
        label = "Sales Invoice Item"
    field = frappe.scrub(label)

    for row in self.get("items"):
        if row.get(field):
            continue
        msg = f"At Row {row.idx}: The field {bold(label)} is mandatory for internal transfer"
        frappe.throw(_(msg), title=_("Internal Transfer Reference Missing"))
def validate_internal_transaction(self):
    """Validate rates on internal orders/invoices when the setting demands it.

    Runs only if Accounts Settings has "maintain_same_internal_transaction_rate"
    enabled and the document is a Sales/Purchase Order or Invoice for an
    internal customer or supplier.
    """
    keep_same_rate = cint(
        frappe.get_single_value("Accounts Settings", "maintain_same_internal_transaction_rate")
    )
    if not keep_same_rate:
        return

    applicable_doctypes = ["Sales Order", "Sales Invoice", "Purchase Order", "Purchase Invoice"]
    if self.doctype not in applicable_doctypes:
        return

    if self.get("is_internal_customer") or self.get("is_internal_supplier"):
        self.validate_internal_transaction_based_on_voucher_type()
def validate_internal_transaction_based_on_voucher_type(self):
    """Validate item rates against the linked counterpart order or invoice.

    Orders are compared with the counterpart order when
    ``inter_company_order_reference`` is set; invoices with the counterpart
    invoice when ``inter_company_invoice_reference`` is set.
    """
    # doctype -> (linked doctype, reference fieldname, reference item fieldname)
    order_links = {
        "Purchase Order": ("Sales Order", "sales_order", "sales_order_item"),
        "Sales Order": ("Purchase Order", "purchase_order", "purchase_order_item"),
    }
    invoice_links = {
        "Purchase Invoice": ("Sales Invoice", "sales_invoice", "sales_invoice_item"),
        "Sales Invoice": ("Purchase Invoice", "purchase_invoice", "purchase_invoice_item"),
    }

    if self.doctype in order_links and self.get("inter_company_order_reference"):
        # Fetch the linked order
        linked_doctype, reference_field, reference_item_field = order_links[self.doctype]
        self.validate_line_items(linked_doctype, reference_field, reference_item_field)
    elif self.doctype in invoice_links and self.get("inter_company_invoice_reference"):
        # Fetch the linked invoice
        linked_doctype, reference_field, reference_item_field = invoice_links[self.doctype]
        self.validate_line_items(linked_doctype, reference_field, reference_item_field)
def validate_line_items(self, ref_dt, ref_dn_field, ref_link_field):
    """Compare item rates with the rates on the linked reference rows.

    A row mismatches when its rate differs from the reference rate by 0.01
    or more. With the "Stop" action the mismatches are collected and thrown
    together, unless the user holds the override role; with any other action
    a warning is shown per row.

    :param ref_dt: doctype of the linked document.
    :param ref_dn_field: item field naming the linked document.
    :param ref_link_field: item field naming the linked item row.
    """
    action, role_allowed_to_override = frappe.get_cached_value(
        "Accounts Settings", "None", ["maintain_same_rate_action", "role_to_override_stop_action"]
    )

    linked_row_names = [
        row.get(ref_link_field) for row in self.get("items") if row.get(ref_link_field)
    ]
    reference_details = self.get_reference_details(linked_row_names, ref_dt + " Item")

    def mismatch_message(row, expected_rate):
        # Invoices report the inter-company invoice, other documents the row's own reference.
        return _("Row #{0}: Rate must be same as {1}: {2} ({3} / {4})").format(
            row.idx,
            ref_dt,
            self.inter_company_invoice_reference
            if row.parenttype in ("Sales Invoice", "Purchase Invoice")
            else row.get(ref_dn_field),
            row.rate,
            expected_rate,
        )

    blocking_messages = []
    for row in self.get("items"):
        if not row.get(ref_link_field):
            continue

        expected_rate = reference_details.get(row.get(ref_link_field))
        rate_differs = (
            expected_rate is not None
            and abs(flt(row.rate - expected_rate, row.precision("rate"))) >= 0.01
        )
        if not rate_differs:
            continue

        if action == "Stop":
            user_roles = [
                r["role"]
                for r in frappe.get_all(
                    "Has Role", filters={"parent": frappe.session.user}, fields=["role"]
                )
            ]
            if role_allowed_to_override not in user_roles:
                blocking_messages.append(mismatch_message(row, expected_rate))
        else:
            frappe.msgprint(
                mismatch_message(row, expected_rate),
                title=_("Warning"),
                indicator="orange",
            )

    if blocking_messages:
        frappe.throw(blocking_messages, as_list=True)
def disable_pricing_rule_on_internal_transfer(self):
    """Switch on ``ignore_pricing_rule`` for internal transfers and notify the user."""
    if self.get("ignore_pricing_rule") or not self.is_internal_transfer():
        return

    self.ignore_pricing_rule = 1
    frappe.msgprint(
        _("Disabled pricing rules since this {} is an internal transfer").format(self.doctype),
        alert=1,
    )
def disable_tax_included_prices_for_internal_transfer(self):
    """Clear ``included_in_print_rate`` on tax rows of internal transfers.

    The user is notified only when at least one tax row was changed.
    """
    if not self.is_internal_transfer():
        return

    changed = False
    for tax_row in self.get("taxes"):
        if not tax_row.get("included_in_print_rate"):
            continue
        tax_row.included_in_print_rate = 0
        changed = True

    if not changed:
        return

    frappe.msgprint(
        _("Disabled tax included prices since this {} is an internal transfer").format(self.doctype),
        alert=1,
    )
def validate_due_date(self):
    """Validate the due date of a non-POS Sales/Purchase Invoice.

    Purchase Invoices measure from the bill date when it is set, otherwise
    from the posting date. During a data import a due date earlier than that
    date is silently moved to it; otherwise the party-level due date
    validation is run.
    """
    if self.get("is_pos") or self.doctype not in ["Sales Invoice", "Purchase Invoice"]:
        return

    from erpnext.accounts.party import validate_due_date

    # After the guard above the doctype is one of the two invoice types.
    is_purchase_invoice = self.doctype == "Purchase Invoice"

    if is_purchase_invoice:
        posting_date = self.bill_date or self.posting_date
    else:
        posting_date = self.posting_date

    # skip due date validation for records via Data Import
    if frappe.flags.in_import and getdate(self.due_date) < getdate(posting_date):
        self.due_date = posting_date
        return

    validate_due_date(
        posting_date=posting_date,
        due_date=self.due_date,
        bill_date=self.bill_date if is_purchase_invoice else None,
        template_name=self.payment_terms_template,
        doctype=self.doctype,
    )
def set_price_list_currency(self, buying_or_selling):
    """Set price list currency, document currency and their conversion rates.

    :param buying_or_selling: "Selling" selects ``selling_price_list``
        (case-insensitive); any other value selects ``buying_price_list``.

    Nothing is changed for doctypes without a ``currency`` field.

    NOTE(review): block nesting was reconstructed from a source whose
    indentation was lost — confirm against the canonical file.
    """
    # Exchange rates are looked up for the posting date when the doctype has
    # one, otherwise for the transaction date.
    if self.meta.get_field("posting_date"):
        transaction_date = self.posting_date
    else:
        transaction_date = self.transaction_date

    if self.meta.get_field("currency"):
        # price list part
        if buying_or_selling.lower() == "selling":
            fieldname = "selling_price_list"
            args = "for_selling"
        else:
            fieldname = "buying_price_list"
            args = "for_buying"

        if self.meta.get_field(fieldname) and self.get(fieldname):
            self.price_list_currency = frappe.db.get_value("Price List", self.get(fieldname), "currency")

            # Same currency as the company: rate is 1. Otherwise fetch a rate
            # only when none has been set yet.
            if self.price_list_currency == self.company_currency:
                self.plc_conversion_rate = 1.0

            elif not self.plc_conversion_rate:
                self.plc_conversion_rate = get_exchange_rate(
                    self.price_list_currency, self.company_currency, transaction_date, args
                )

        # currency
        if not self.currency:
            # No document currency yet: inherit it from the price list.
            self.currency = self.price_list_currency
            self.conversion_rate = self.plc_conversion_rate
        elif self.currency == self.company_currency:
            self.conversion_rate = 1.0
        elif not self.conversion_rate:
            self.conversion_rate = get_exchange_rate(
                self.currency, self.company_currency, transaction_date, args
            )

        # Purchase Invoices can be forced to use the transaction-date rate via
        # Buying Settings; this overrides any conversion rate set above.
        # Note the comparison with "Buying" here is case-sensitive.
        if (
            self.currency
            and buying_or_selling == "Buying"
            and frappe.db.get_single_value("Buying Settings", "use_transaction_date_exchange_rate")
            and self.doctype == "Purchase Invoice"
        ):
            self.use_transaction_date_exchange_rate = True
            self.conversion_rate = get_exchange_rate(
                self.currency, self.company_currency, transaction_date, args
            )
def set_missing_item_details(self, for_validate=False):
    """Fill missing values on item rows from ``get_item_details``.

    For rows with an item code, the parent values and the row values are
    merged into a context, item details are fetched, and each returned field
    is applied to the row according to the rules in the loop below. Rows
    without an item code only get their UOM fields and conversion factor
    normalised.

    :param for_validate: passed through to ``get_item_details`` and, for
        Purchase Invoices, to ``set_expense_account``.

    NOTE(review): block nesting was reconstructed from a source whose
    indentation was lost — confirm against the canonical file.
    """
    from erpnext.stock.doctype.serial_no.serial_no import get_serial_nos

    if hasattr(self, "items"):
        # Snapshot of the parent's column values, shared by every row context.
        parent_dict = {}
        for fieldname in self.meta.get_valid_columns():
            parent_dict[fieldname] = self.get(fieldname)

        if self.doctype in ["Quotation", "Sales Order", "Delivery Note", "Sales Invoice"]:
            document_type = f"{self.doctype} Item"
            parent_dict.update({"document_type": document_type})

        # party_name field used for customer in quotation
        if (
            self.doctype == "Quotation"
            and self.quotation_to == "Customer"
            and parent_dict.get("party_name")
        ):
            parent_dict.update({"customer": parent_dict.get("party_name")})

        self.pricing_rules = []

        for item in self.get("items"):
            if item.get("item_code"):
                # Row values override parent values; identity keys are set last.
                ctx: ItemDetailsCtx = ItemDetailsCtx(parent_dict.copy())
                ctx.update(item.as_dict())
                ctx.update(
                    {
                        "doctype": self.doctype,
                        "name": self.name,
                        "child_doctype": item.doctype,
                        "child_docname": item.name,
                        "ignore_pricing_rule": (
                            self.ignore_pricing_rule if hasattr(self, "ignore_pricing_rule") else 0
                        ),
                    }
                )

                if not ctx.transaction_date:
                    ctx.transaction_date = ctx.posting_date

                if self.get("is_subcontracted"):
                    ctx.is_subcontracted = self.is_subcontracted

                ret = get_item_details(ctx, self, for_validate=for_validate, overwrite_warehouse=False)
                for fieldname, value in ret.items():
                    # Only fields that exist on the row and came back non-None.
                    if item.meta.get_field(fieldname) and value is not None:
                        # Overwrite when the row has no value, the field is in
                        # force_item_fields, or serial/batch fields are in use.
                        if (
                            item.get(fieldname) is None
                            or fieldname in force_item_fields
                            or (
                                fieldname in ["serial_no", "batch_no"]
                                and item.get("use_serial_batch_fields")
                            )
                        ):
                            item.set(fieldname, value)
                            # A batch on a non-free item also brings its rate
                            # and, if missing, its price list rate.
                            if fieldname == "batch_no" and item.batch_no and not item.is_free_item:
                                if ret.get("rate"):
                                    item.set("rate", ret.get("rate"))
                                if not item.get("price_list_rate") and ret.get("price_list_rate"):
                                    item.set("price_list_rate", ret.get("price_list_rate"))
                        elif fieldname in ["cost_center", "conversion_factor"] and not item.get(
                            fieldname
                        ):
                            item.set(fieldname, value)
                        # Item tax rates are not refreshed on returns made
                        # against another document.
                        elif fieldname == "item_tax_rate" and not (
                            self.get("is_return") and self.get("return_against")
                        ):
                            item.set(fieldname, value)
                        elif fieldname == "serial_no":
                            # Ensure that serial numbers are matched against Stock UOM
                            item_conversion_factor = item.get("conversion_factor") or 1.0
                            item_qty = abs(item.get("qty")) * item_conversion_factor
                            if item_qty != len(get_serial_nos(item.get("serial_no"))):
                                item.set(fieldname, value)
                        elif (
                            ret.get("pricing_rule_removed")
                            and value is not None
                            and fieldname
                            in [
                                "discount_percentage",
                                "discount_amount",
                                "rate",
                                "margin_rate_or_amount",
                                "margin_type",
                                "remove_free_item",
                            ]
                        ):
                            # reset pricing rule fields if pricing_rule_removed
                            item.set(fieldname, value)
                        elif fieldname == "expense_account" and not item.get("expense_account"):
                            item.expense_account = value

                # On invoices the fixed-asset flag always follows the item
                # details, defaulting to 0.
                if self.doctype in ["Purchase Invoice", "Sales Invoice"] and item.meta.get_field(
                    "is_fixed_asset"
                ):
                    item.set("is_fixed_asset", ret.get("is_fixed_asset", 0))

                # Double check for cost center
                # Items add via promotional scheme may not have cost center set
                if hasattr(item, "cost_center") and not item.get("cost_center"):
                    item.set(
                        "cost_center",
                        self.get("cost_center") or erpnext.get_default_cost_center(self.company),
                    )

                if ret.get("pricing_rules"):
                    self.apply_pricing_rule_on_items(item, ret)
                    self.set_pricing_rule_details(item, ret)
            else:
                # Transactions line item without item code
                uom = item.get("uom")
                stock_uom = item.get("stock_uom")
                # When exactly one of the two UOMs is set, copy it to the other.
                if bool(uom) != bool(stock_uom):  # xor
                    item.stock_uom = item.uom = uom or stock_uom
                # UOM cannot be zero so substitute as 1
                item.conversion_factor = (
                    get_uom_conv_factor(item.get("uom"), item.get("stock_uom"))
                    or item.get("conversion_factor")
                    or 1
                )

        if self.doctype == "Purchase Invoice":
            self.set_expense_account(for_validate)
def apply_pricing_rule_on_items(self, item, pricing_rule_args):
    """Apply an evaluated pricing rule to ``item``, or warn when a rule was not honoured.

    When ``validate_applied_rule`` is falsy, a "Price" rule copies the discount
    and rate values onto the row, while any other rule that carries
    ``free_item_data`` is forwarded to ``apply_pricing_rule_for_free_items``.
    When it is truthy, the row's discount and rate values are compared with
    every applied Pricing Rule and a message is shown for each value that is
    lower than the rule's value.

    Args:
        item: Transaction row to update or validate.
        pricing_rule_args: Mapping with the pricing-rule result for this row.
    """
    if pricing_rule_args.get("validate_applied_rule", 0):
        for pricing_rule in get_applied_pricing_rules(item.get("pricing_rules")):
            pricing_rule_doc = frappe.get_cached_doc("Pricing Rule", pricing_rule)
            for field in ("discount_percentage", "discount_amount", "rate"):
                # Coerce both sides with flt(), as is already done for
                # discount_percentage below, so that an unset value on the row
                # or on the rule does not break the comparison.
                if flt(item.get(field)) < flt(pricing_rule_doc.get(field)):
                    title = get_link_to_form("Pricing Rule", pricing_rule)
                    frappe.msgprint(
                        _("Row {0}: user has not applied the rule {1} on the item {2}").format(
                            item.idx, frappe.bold(title), frappe.bold(item.item_code)
                        )
                    )
        return

    if pricing_rule_args.get("price_or_product_discount") != "Price":
        free_item_data = pricing_rule_args.get("free_item_data")
        if free_item_data:
            apply_pricing_rule_for_free_items(self, free_item_data)
        return

    # if user changed the discount percentage then set user's discount percentage ?
    item.set("pricing_rules", pricing_rule_args.get("pricing_rules"))

    if pricing_rule_args.get("apply_rule_on_other_items"):
        other_items = json.loads(pricing_rule_args.get("apply_rule_on_other_items"))
        # The rule targets other items only: keep the reference, skip the discount.
        if other_items and item.item_code not in other_items:
            return

    item.set("discount_percentage", pricing_rule_args.get("discount_percentage"))
    item.set("discount_amount", pricing_rule_args.get("discount_amount"))
    if pricing_rule_args.get("pricing_rule_for") == "Rate":
        item.set("price_list_rate", pricing_rule_args.get("price_list_rate"))

    if item.get("price_list_rate"):
        item.rate = flt(
            item.price_list_rate * (1.0 - (flt(item.discount_percentage) / 100.0)),
            item.precision("rate"),
        )

        # A flat discount amount takes precedence over the percentage-based rate.
        if item.get("discount_amount"):
            item.rate = item.price_list_rate - item.discount_amount

    if item.get("apply_discount_on_discounted_rate") and pricing_rule_args.get("rate"):
        item.rate = pricing_rule_args.get("rate")
def set_pricing_rule_details(self, item_row, args):
    """Add one ``pricing_rules`` child row per pricing rule applied to ``item_row``.

    Args:
        item_row: Transaction row the rules were applied to.
        args: Mapping whose ``pricing_rules`` value lists the applied rules.
    """
    applied_rules = get_applied_pricing_rules(args.get("pricing_rules"))
    for rule_name in applied_rules or ():
        self.append(
            "pricing_rules",
            dict(
                pricing_rule=rule_name,
                item_code=item_row.item_code,
                child_docname=item_row.name,
                rule_applied=True,
            ),
        )
def set_taxes(self):
    """Fill ``taxes`` from the tax template when the document has none yet.

    Runs only for new documents or after a POS profile change. When no
    template is selected, the company's default template is looked up first.
    """
    meta = self.meta
    if not meta.get_field("taxes"):
        return

    tax_master_doctype = meta.get_field("taxes_and_charges").options

    should_reset = self.is_new() or self.is_pos_profile_changed()
    if not should_reset or self.get("taxes"):
        return

    if self.company and not self.get("taxes_and_charges"):
        # get the default tax master
        default_filters = {"is_default": 1, "company": self.company}
        self.taxes_and_charges = frappe.db.get_value(tax_master_doctype, default_filters)

    self.append_taxes_from_master(tax_master_doctype)
def is_pos_profile_changed(self):
    """Return True when a POS Sales Invoice's ``pos_profile`` differs from the stored one.

    Returns None in every other case.
    """
    if self.doctype != "Sales Invoice" or not self.is_pos:
        return None

    stored_profile = frappe.db.get_value("Sales Invoice", self.name, "pos_profile")
    if self.pos_profile != stored_profile:
        return True
    return None
def set_taxes_and_charges(self):
    """Append tax rows from the tax template and/or item tax templates.

    Nothing is done for Material Requests, for documents that already have
    tax rows, or for POS documents. Each source is controlled by its own
    Accounts Settings flag.
    """
    # Material Request does not have taxes
    if self.doctype == "Material Request":
        return

    if self.get("taxes") or self.get("is_pos"):
        return

    use_template = frappe.get_single_value(
        "Accounts Settings", "add_taxes_from_taxes_and_charges_template"
    )
    if use_template and hasattr(self, "taxes_and_charges"):
        tax_master_doctype = self.meta.get_field("taxes_and_charges").options
        if tax_master_doctype:
            self.append_taxes_from_master(tax_master_doctype)

    use_item_templates = frappe.get_single_value(
        "Accounts Settings", "add_taxes_from_item_tax_template"
    )
    if use_item_templates:
        self.append_taxes_from_item_tax_template()
def append_taxes_from_master(self, tax_master_doctype=None):
    """Extend ``taxes`` with the rows of the selected taxes and charges template.

    Args:
        tax_master_doctype: Template doctype; read from the ``taxes_and_charges``
            field options when omitted.
    """
    if not self.get("taxes_and_charges"):
        return

    master_doctype = tax_master_doctype or self.meta.get_field("taxes_and_charges").options
    template_rows = get_taxes_and_charges(master_doctype, self.get("taxes_and_charges"))
    self.extend("taxes", template_rows)
def append_taxes_from_item_tax_template(self):
    """Add a zero-rate tax row for each item tax account not yet present in ``taxes``.

    Does nothing unless the ``add_taxes_from_item_tax_template`` Accounts
    Setting is enabled.
    """
    if not frappe.get_single_value("Accounts Settings", "add_taxes_from_item_tax_template"):
        return

    for item_row in self.items:
        item_tax_rate = item_row.get("item_tax_rate")
        if not item_tax_rate:
            continue

        # The value may still be a JSON string.
        if isinstance(item_tax_rate, str):
            item_tax_rate = parse_json(item_tax_rate)

        for account_head in item_tax_rate.keys():
            if self.get_tax_row(account_head):
                continue

            self.append(
                "taxes",
                dict(
                    charge_type="On Net Total",
                    account_head=account_head,
                    rate=0,
                    description=account_head,
                    set_by_item_tax_template=1,
                    category="Total",
                    add_deduct_tax="Add",
                ),
            )
def get_tax_row(self, account_head):
    """Return the first row of ``taxes`` whose ``account_head`` matches, else None."""
    matching_rows = (tax for tax in self.taxes if tax.account_head == account_head)
    return next(matching_rows, None)
def set_other_charges(self):
    """Clear the ``taxes`` table, then repopulate it through ``set_taxes``."""
    no_taxes = []
    self.set("taxes", no_taxes)
    self.set_taxes()
def validate_enabled_taxes_and_charges(self):
    """Throw when the selected taxes and charges template is marked as disabled."""
    taxes_and_charges_doctype = self.meta.get_options("taxes_and_charges")
    if not self.taxes_and_charges:
        return

    is_disabled = frappe.get_cached_value(
        taxes_and_charges_doctype, self.taxes_and_charges, "disabled"
    )
    if is_disabled:
        frappe.throw(_("{0} '{1}' is disabled").format(taxes_and_charges_doctype, self.taxes_and_charges))
def validate_tax_account_company(self):
    """Throw when a tax row's account head belongs to a company other than ``self.company``."""
    for tax in self.get("taxes"):
        if not tax.account_head:
            continue

        tax_account_company = frappe.get_cached_value("Account", tax.account_head, "company")
        if tax_account_company == self.company:
            continue

        frappe.throw(
            _("Row #{0}: Account {1} does not belong to company {2}").format(
                tax.idx, tax.account_head, self.company
            )
        )
def get_gl_dict(self, args, account_currency=None, item=None):
"""Build a GL entry dict pre-filled with the common properties of this document.

Defaults are taken from the document (company, posting date, fiscal year,
voucher type/number, remarks, project, accounting dimensions). The values
in ``args`` are applied afterwards via ``gl_dict.update(args)`` and therefore
override those defaults.

Args:
args: Mapping with the entry-specific values (account, debit/credit, ...).
account_currency: Currency of ``args["account"]``; looked up with
``get_account_currency`` when not given.
item: Optional row whose accounting-dimension values, when set, take
precedence over the document's values.

Returns:
A ``frappe._dict`` describing the GL entry.
"""
posting_date = args.get("posting_date") or self.get("posting_date")
fiscal_years = get_fiscal_years(posting_date, company=self.company)
# Exactly one fiscal year is expected for the posting date.
if len(fiscal_years) > 1:
frappe.throw(
_("Multiple fiscal years exist for the date {0}. Please set company in Fiscal Year").format(
formatdate(posting_date)
)
)
else:
fiscal_year = fiscal_years[0][0]
gl_dict = frappe._dict(
{
"company": self.company,
"posting_date": posting_date,
"fiscal_year": fiscal_year,
"voucher_type": self.doctype,
"voucher_no": self.name,
"remarks": self.get("remarks") or self.get("remark"),
"debit": 0,
"credit": 0,
"debit_in_account_currency": 0,
"credit_in_account_currency": 0,
"is_opening": self.get("is_opening") or "No",
"party_type": None,
"party": None,
"project": self.get("project"),
"post_net_value": args.get("post_net_value"),
"voucher_detail_no": args.get("voucher_detail_no"),
"voucher_subtype": self.get_voucher_subtype(),
}
)
# Let regional and app-specific code add their own fields.
with temporary_flag("company", self.company):
update_gl_dict_with_regional_fields(self, gl_dict)
update_gl_dict_with_app_based_fields(self, gl_dict)
# Accounting dimensions: document value first, overridden by the item's value when set.
accounting_dimensions = get_accounting_dimensions()
dimension_dict = frappe._dict()
for dimension in accounting_dimensions:
dimension_dict[dimension] = self.get(dimension)
if item and item.get(dimension):
dimension_dict[dimension] = item.get(dimension)
gl_dict.update(dimension_dict)
# Caller-supplied values win over every default set above.
gl_dict.update(args)
if not account_currency:
account_currency = get_account_currency(gl_dict.account)
# The account currency is validated only for doctypes not listed here.
if gl_dict.account and self.doctype not in [
"Journal Entry",
"Period Closing Voucher",
"Payment Entry",
"Purchase Receipt",
"Purchase Invoice",
"Stock Entry",
]:
self.validate_account_currency(gl_dict.account, account_currency)
# The listed doctypes are excluded from the account-currency balance calculation.
if gl_dict.account and self.doctype not in [
"Journal Entry",
"Period Closing Voucher",
"Payment Entry",
]:
set_balance_in_account_currency(
gl_dict, account_currency, self.get("conversion_rate"), self.company_currency
)
# Update details in transaction currency
# NOTE(review): indentation is not visible in this view - confirm whether this
# block is nested inside the preceding `if` or stands on its own.
if self.doctype not in ["Purchase Invoice", "Sales Invoice", "Journal Entry", "Payment Entry"]:
gl_dict.update(
{
"transaction_currency": self.get("currency") or self.company_currency,
"transaction_exchange_rate": self.get("conversion_rate", 1),
"debit_in_transaction_currency": self.get_value_in_transaction_currency(
account_currency, gl_dict, "debit"
),
"credit_in_transaction_currency": self.get_value_in_transaction_currency(
account_currency, gl_dict, "credit"
),
}
)
# Fall back to the document's "against voucher" reference when args has none.
if not args.get("against_voucher_type") and self.get("against_voucher_type"):
gl_dict.update({"against_voucher_type": self.get("against_voucher_type")})
if not args.get("against_voucher") and self.get("against_voucher"):
gl_dict.update({"against_voucher": self.get("against_voucher")})
return gl_dict
def get_voucher_subtype(self):
    """Return the voucher subtype used on this document's GL entries.

    Resolution order: the first truthy value returned by a ``voucher_subtypes``
    hook, then a doctype-specific field, then a label for return / debit-note
    documents, and finally the doctype itself.
    """
    for method_name in frappe.get_hooks("voucher_subtypes"):
        if hooked_subtype := frappe.get_attr(method_name)(self):
            return hooked_subtype

    # Doctypes that store their subtype in a field of their own.
    subtype_fields = {
        "Journal Entry": "voucher_type",
        "Payment Entry": "payment_type",
        "Stock Entry": "stock_entry_type",
        "Asset Capitalization": "entry_type",
    }
    if self.doctype in subtype_fields:
        return self.get(subtype_fields[self.doctype])

    # (doctype, flag attribute, subtype); order matters for Sales Invoice.
    flagged_subtypes = (
        ("Purchase Receipt", "is_return", "Purchase Return"),
        ("Delivery Note", "is_return", "Sales Return"),
        ("Sales Invoice", "is_return", "Credit Note"),
        ("Sales Invoice", "is_debit_note", "Debit Note"),
        ("Purchase Invoice", "is_return", "Debit Note"),
    )
    for doctype, flag, subtype in flagged_subtypes:
        # The flag is read only after the doctype matched, as not every
        # doctype has these attributes.
        if self.doctype == doctype and getattr(self, flag):
            return subtype

    return self.doctype
def get_value_in_transaction_currency(self, account_currency, gl_dict, field):
    """Return the ``field`` ("debit" / "credit") amount of ``gl_dict`` in transaction currency.

    When the account currency equals the document currency, the
    ``<field>_in_account_currency`` value is returned as is; otherwise the
    ``field`` amount is divided by the document's ``conversion_rate``.
    """
    if account_currency != self.get("currency"):
        return flt(gl_dict.get(field, 0) / self.get("conversion_rate", 1))

    return gl_dict.get(field + "_in_account_currency")
def validate_zero_qty_for_return_invoices_with_stock(self):
    """Throw, listing the affected row numbers, when any item row has zero quantity."""
    zero_qty_rows = [row for row in self.items if not flt(row.qty)]
    if not zero_qty_rows:
        return

    row_labels = ["#" + str(row.idx) for row in zero_qty_rows]
    frappe.throw(
        _(
            "For Return Invoices with Stock effect, '0' qty Items are not allowed. Following rows are affected: {0}"
        ).format(frappe.bold(comma_and(row_labels)))
    )
def validate_qty_is_not_zero(self):
    """Throw ``InvalidQtyError`` for the first item row whose quantity is zero.

    Skipped entirely when ``flags.allow_zero_qty`` is set. Purchase Receipt
    rows that carry a rejected quantity are exempt.
    """
    if self.flags.allow_zero_qty:
        return

    is_purchase_receipt = self.doctype == "Purchase Receipt"
    for row in self.items:
        if is_purchase_receipt and row.rejected_qty:
            continue

        if flt(row.qty):
            continue

        frappe.throw(
            msg=_("Row #{0}: Quantity for Item {1} cannot be zero.").format(
                row.idx, frappe.bold(row.item_code)
            ),
            title=_("Invalid Quantity"),
            exc=InvalidQtyError,
        )
def validate_account_currency(self, account, account_currency=None):
    """Throw unless ``account_currency`` is the company currency or the document currency."""
    valid_currency = [self.company_currency]
    if self.get("currency") and self.currency != self.company_currency:
        valid_currency.append(self.currency)

    if account_currency in valid_currency:
        return

    separator = " " + _("or") + " "
    frappe.throw(
        _("Account {0} is invalid. Account Currency must be {1}").format(
            account, separator.join(valid_currency)
        )
    )
def clear_unallocated_advances(self, childtype, parentfield):
    """Drop advance rows without an allocation, in memory and in the database.

    Args:
        childtype: Child doctype that stores the advance rows.
        parentfield: Name of the child table field on this document.
    """
    # In memory: keep only rows whose allocated_amount is not 0 / None / "".
    allocated_rows = self.get(parentfield, {"allocated_amount": ["not in", [0, None, ""]]})
    self.set(parentfield, allocated_rows)

    # In the database: delete this document's rows whose allocated_amount is 0.
    advance = frappe.qb.DocType(childtype)
    unallocated = (
        (advance.parentfield == parentfield)
        & (advance.parent == self.name)
        & (advance.allocated_amount == 0)
    )
    frappe.qb.from_(advance).delete().where(unallocated).run()
@frappe.whitelist()
def apply_shipping_rule(self):
    """Apply the selected Shipping Rule to this document and recalculate its totals."""
    if not self.shipping_rule:
        return

    frappe.get_doc("Shipping Rule", self.shipping_rule).apply(self)
    self.calculate_taxes_and_totals()
def get_shipping_address(self):
    """Return the Address document linked as shipping address, or ``{}`` when there is none."""
    # The link can be stored in either `shipping_address_name` or
    # `shipping_address`, so both are tried in that order.
    for fieldname in ("shipping_address_name", "shipping_address"):
        shipping_field = self.meta.get_field(fieldname)
        is_link_field = shipping_field and shipping_field.fieldtype == "Link"
        if is_link_field and self.get(fieldname):
            return frappe.get_doc("Address", self.get(fieldname))

    return {}
@frappe.whitelist()
def set_advances(self):
    """Rebuild the ``advances`` table from the entries returned by ``get_advance_entries``.

    Each advance is allocated up to the part of the invoice total that earlier
    advances in the list have not consumed yet.
    """
    include_unallocated = not cint(self.get("only_include_allocated_payments"))
    advance_entries = self.get_advance_entries(include_unallocated=include_unallocated)

    self.set("advances", [])
    allocated_so_far = 0
    for entry in advance_entries:
        # Use the total expressed in the party account's currency.
        if self.get("party_account_currency") == self.company_currency:
            invoice_total = self.get("base_rounded_total") or self.base_grand_total
        else:
            invoice_total = self.get("rounded_total") or self.grand_total

        allocated_amount = min(invoice_total - allocated_so_far, entry.amount)
        allocated_so_far += flt(allocated_amount)

        advance_row = dict(
            doctype=self.doctype + " Advance",
            reference_type=entry.reference_type,
            reference_name=entry.reference_name,
            reference_row=entry.reference_row,
            remarks=entry.remarks,
            advance_amount=flt(entry.amount),
            allocated_amount=allocated_amount,
            ref_exchange_rate=flt(entry.exchange_rate),  # exchange_rate of advance entry
            difference_posting_date=self.posting_date,
        )

        # `paid_to` wins over `paid_from` when both are set.
        account = entry.get("paid_to") or entry.get("paid_from")
        if account:
            advance_row["account"] = account

        self.append("advances", advance_row)
def get_advance_entries(self, include_unallocated=True):
    """Return advance Journal Entries followed by advance Payment Entries for the party.

    Args:
        include_unallocated: Passed through to both advance lookups.
    """
    is_sales = self.doctype in ("Sales Invoice", "POS Invoice")
    if is_sales:
        party_type, party = "Customer", self.customer
        amount_field = "credit_in_account_currency"
        order_field, order_doctype = "sales_order", "Sales Order"
        party_account = [self.debit_to]
    else:
        party_type, party = "Supplier", self.supplier
        amount_field = "debit_in_account_currency"
        order_field, order_doctype = "purchase_order", "Purchase Order"
        party_account = [self.credit_to]

    default_advance_account = None
    party_accounts = get_party_account(
        party_type, party=party, company=self.company, include_advance=True
    )
    if party_accounts:
        party_account.append(party_accounts[0])
        if len(party_accounts) == 2:
            default_advance_account = party_accounts[1]

    # Distinct orders referenced by the item rows.
    order_list = list({row.get(order_field) for row in self.get("items") if row.get(order_field)})

    journal_entries = get_advance_journal_entries(
        party_type, party, party_account, amount_field, order_doctype, order_list, include_unallocated
    )
    payment_entries = get_advance_payment_entries_for_regional(
        party_type,
        party,
        party_account,
        order_doctype,
        order_list,
        default_advance_account,
        include_unallocated,
    )
    return journal_entries + payment_entries
def is_inclusive_tax(self):
    """Return 1 when inclusive taxes should be shown in print and such a tax row exists, else 0."""
    show_inclusive = cint(frappe.get_single_value("Accounts Settings", "show_inclusive_tax_in_print"))
    if not show_inclusive:
        return show_inclusive

    inclusive_rows = self.get("taxes", filters={"included_in_print_rate": 1})
    return 1 if inclusive_rows else 0
def should_show_taxes_as_table_in_print(self):
    """Return the ``show_taxes_as_table_in_print`` Accounts Setting as an int."""
    setting = frappe.get_single_value("Accounts Settings", "show_taxes_as_table_in_print")
    return cint(setting)
def validate_advance_entries(self):
    """Show a message for each order advance that is not listed in ``advances``."""
    order_field = "purchase_order"
    if self.doctype == "Sales Invoice":
        order_field = "sales_order"

    order_list = list({row.get(order_field) for row in self.get("items") if row.get(order_field)})
    if not order_list:
        return

    advance_entries = self.get_advance_entries(include_unallocated=False)
    if not advance_entries:
        return

    pulled_references = [advance.reference_name for advance in self.get("advances")]
    for entry in advance_entries:
        # An empty list of pulled references means every entry is reported.
        if entry.reference_name in pulled_references:
            continue

        frappe.msgprint(
            _(
                "Payment Entry {0} is linked against Order {1}, check if it should be pulled as advance in this invoice."
            ).format(entry.reference_name, entry.against_order)
        )
def set_advance_gain_or_loss(self):
    """Set ``exchange_gain_loss`` on advances booked at a different exchange rate.

    Nothing is done when the conversion rate is 1, when there are no advances,
    or when the party account currency differs from the document currency.
    """
    if self.get("conversion_rate") == 1 or not self.get("advances"):
        return

    party_account = self.credit_to if self.doctype == "Purchase Invoice" else self.debit_to
    if get_account_currency(party_account) != self.currency:
        return

    for advance in self.get("advances"):
        advance_exchange_rate = advance.ref_exchange_rate
        if not advance.allocated_amount or self.conversion_rate == advance_exchange_rate:
            continue

        # Base value of the allocation at the advance's rate minus its base
        # value at the invoice's rate.
        amount_at_advance_rate = advance_exchange_rate * advance.allocated_amount
        amount_at_invoice_rate = self.conversion_rate * advance.allocated_amount
        advance.exchange_gain_loss = amount_at_advance_rate - amount_at_invoice_rate
def make_precision_loss_gl_entry(self, gl_entries):
    """Append a round-off GL entry for the precision loss on the net total.

    The loss is ``base_net_total`` minus ``net_total * conversion_rate`` rounded
    to the precision of ``net_total``. It is credited for Purchase Invoices and
    debited for any other doctype.

    Args:
        gl_entries: List of GL entries; extended in place when there is a loss.
    """
    # Look the round-off account up for this voucher's own doctype. The lookup
    # used to be hard-coded to "Purchase Invoice" although this method also
    # serves other doctypes (see the doctype checks below).
    (
        round_off_account,
        round_off_cost_center,
        _round_off_for_opening,
    ) = get_round_off_account_and_cost_center(
        self.company, self.doctype, self.name, self.use_company_roundoff_cost_center
    )

    precision_loss = self.get("base_net_total") - flt(
        self.get("net_total") * self.conversion_rate, self.precision("net_total")
    )
    if not precision_loss:
        return

    is_purchase_invoice = self.doctype == "Purchase Invoice"
    credit_or_debit = "credit" if is_purchase_invoice else "debit"
    against = self.supplier if is_purchase_invoice else self.customer

    if self.use_company_roundoff_cost_center:
        cost_center = round_off_cost_center
    else:
        cost_center = self.cost_center or round_off_cost_center

    gl_entries.append(
        self.get_gl_dict(
            {
                "account": round_off_account,
                "against": against,
                credit_or_debit: precision_loss,
                "cost_center": cost_center,
                "remarks": _("Net total calculation precision loss"),
            }
        )
    )
def gain_loss_journal_already_booked(
    self,
    gain_loss_account,
    exc_gain_loss,
    ref2_dt,
    ref2_dn,
    ref2_detail_no,
) -> bool:
    """
    Check if gain/loss is booked

    Returns True when a submitted "Exchange Gain Or Loss" Journal Entry that
    references the given voucher row exists and its total debit and total
    credit both equal ``exc_gain_loss``.
    """
    linked_journals = frappe.db.get_all(
        "Journal Entry Account",
        filters={
            "docstatus": 1,
            "account": gain_loss_account,
            "reference_type": ref2_dt,  # this will be Journal Entry
            "reference_name": ref2_dn,
            "reference_detail_no": ref2_detail_no,
        },
        pluck="parent",
    )
    if not linked_journals:
        return False

    # deduplicate
    journal_names = list(set(linked_journals))
    exc_vouchers = frappe.db.get_all(
        "Journal Entry",
        filters={"name": ["in", journal_names], "voucher_type": "Exchange Gain Or Loss"},
        fields=["voucher_type", "total_debit", "total_credit"],
    )
    if not exc_vouchers:
        return False

    # Only the first matching journal is inspected.
    booked_voucher = exc_vouchers[0]
    return bool(
        booked_voucher.total_debit == exc_gain_loss
        and booked_voucher.total_credit == exc_gain_loss
        and booked_voucher.voucher_type == "Exchange Gain Or Loss"
    )
def make_exchange_gain_loss_journal(
self, args: dict | None = None, dimensions_dict: dict | None = None
) -> None:
"""
Make Exchange Gain/Loss journal for Invoices and Payments
"""
# Cancelling existing exchange gain/loss journals is handled during the `on_cancel` event.
# see accounts/utils.py:cancel_exchange_gain_loss_journal()
if self.docstatus == 1:
if dimensions_dict is None:
dimensions_dict = frappe._dict()
active_dimensions = get_dimensions()[0]
for dim in active_dimensions:
dimensions_dict[dim.fieldname] = self.get(dim.fieldname)
if self.get("doctype") == "Journal Entry":
# 'args' is populated with exchange gain/loss account and the amount to be booked.
# These are generated by Sales/Purchase Invoice during reconciliation and advance allocation.
# and below logic is only for such scenarios
if args:
precision = get_currency_precision()
for arg in args:
# Advance section uses `exchange_gain_loss` and reconciliation uses `difference_amount`
if (
flt(arg.get("difference_amount", 0), precision) != 0
or flt(arg.get("exchange_gain_loss", 0), precision) != 0
) and arg.get("difference_account"):
party_account = arg.get("account")
gain_loss_account = arg.get("difference_account")
difference_amount = arg.get("difference_amount") or arg.get("exchange_gain_loss")
if difference_amount > 0:
dr_or_cr = "debit" if arg.get("party_type") == "Customer" else "credit"
else:
dr_or_cr = "credit" if arg.get("party_type") == "Customer" else "debit"
reverse_dr_or_cr = "debit" if dr_or_cr == "credit" else "credit"
if not self.gain_loss_journal_already_booked(
gain_loss_account,
difference_amount,
self.doctype,
self.name,
arg.get("referenced_row"),
):
posting_date = arg.get("difference_posting_date") or frappe.db.get_value(
arg.voucher_type, arg.voucher_no, "posting_date"
)
je = create_gain_loss_journal(
self.company,
posting_date,
arg.get("party_type"),
arg.get("party"),
party_account,
gain_loss_account,
difference_amount,
dr_or_cr,
reverse_dr_or_cr,
arg.get("against_voucher_type"),
arg.get("against_voucher"),
arg.get("idx"),
self.doctype,
self.name,
arg.get("referenced_row"),
arg.get("cost_center"),
dimensions_dict,
)
frappe.msgprint(
_("Exchange Gain/Loss amount has been booked through {0}").format(
get_link_to_form("Journal Entry", je)
)
)
if self.get("doctype") == "Payment Entry":
# For Payment Entry, exchange_gain_loss field in the `references` table is the trigger for journal creation
gain_loss_to_book = [x for x in self.references if x.exchange_gain_loss != 0]
booked = []
if gain_loss_to_book:
[x.reference_doctype for x in gain_loss_to_book]
[x.reference_name for x in gain_loss_to_book]
je = qb.DocType("Journal Entry")
jea = qb.DocType("Journal Entry Account")
parents = (
qb.from_(jea)
.select(jea.parent)
.where(
(jea.reference_type == "Payment Entry")
& (jea.reference_name == self.name)
& (jea.docstatus == 1)
)
.run()
)
booked = []
if parents:
booked = (
qb.from_(je)
.inner_join(jea)
.on(je.name == jea.parent)
.select(jea.reference_type, jea.reference_name, jea.reference_detail_no)
.where(
(je.docstatus == 1)
& (je.name.isin(parents))
& (je.voucher_type == "Exchange Gain or Loss")
)
.run()
)
for d in gain_loss_to_book:
# Filter out References for which Gain/Loss is already booked
if d.exchange_gain_loss and (
(d.reference_doctype, d.reference_name, str(d.idx)) not in booked
):
if self.book_advance_payments_in_separate_party_account:
party_account = d.account
else:
if self.payment_type == "Receive":
party_account = self.paid_from
elif self.payment_type == "Pay":
party_account = self.paid_to
dr_or_cr = "debit" if d.exchange_gain_loss > 0 else "credit"
# Inverse debit/credit for payable accounts
if self.is_payable_account(d.reference_doctype, party_account):
dr_or_cr = "debit" if dr_or_cr == "credit" else "credit"
reverse_dr_or_cr = "debit" if dr_or_cr == "credit" else "credit"
gain_loss_account = frappe.get_cached_value(
"Company", self.company, "exchange_gain_loss_account"
)
je = create_gain_loss_journal(
self.company,
args.get("difference_posting_date") if args else self.posting_date,
self.party_type,
self.party,
party_account,
gain_loss_account,
d.exchange_gain_loss,
dr_or_cr,
reverse_dr_or_cr,
d.reference_doctype,
d.reference_name,
d.idx,
self.doctype,
self.name,
d.idx,
self.cost_center,
dimensions_dict,
)
frappe.msgprint(
_("Exchange Gain/Loss amount has been booked through {0}").format(
get_link_to_form("Journal Entry", je)
)
)
def is_payable_account(self, reference_doctype, account):
    """Return True for Purchase Invoice references and for Journal Entry references on a Payable account."""
    if reference_doctype == "Purchase Invoice":
        return True

    if reference_doctype != "Journal Entry":
        return False

    return frappe.get_cached_value("Account", account, "account_type") == "Payable"
def update_against_document_in_jv(self):
    """
    Links invoice and advance voucher:
            1. cancel advance voucher
            2. split into multiple rows if partially adjusted, assign against voucher
            3. submit advance voucher

    One reconciliation entry is built per advance row with a positive
    allocated amount, and the entries are handed to
    ``reconcile_against_document``.
    """
    if self.doctype == "Sales Invoice":
        party_type, party = "Customer", self.customer
        party_account = self.debit_to
        dr_or_cr = "credit_in_account_currency"
    else:
        party_type, party = "Supplier", self.supplier
        party_account = self.credit_to
        dr_or_cr = "debit_in_account_currency"

    entries = []
    for advance in self.get("advances"):
        if not flt(advance.allocated_amount) > 0:
            continue

        in_company_currency = self.party_account_currency == self.company_currency
        entries.append(
            frappe._dict(
                {
                    "voucher_type": advance.reference_type,
                    "voucher_no": advance.reference_name,
                    "voucher_detail_no": advance.reference_row,
                    "against_voucher_type": self.doctype,
                    "against_voucher": self.name,
                    "account": party_account,
                    "party_type": party_type,
                    "party": party,
                    "is_advance": "Yes",
                    "dr_or_cr": dr_or_cr,
                    "unadjusted_amount": flt(advance.advance_amount),
                    "allocated_amount": flt(advance.allocated_amount),
                    "precision": advance.precision("advance_amount"),
                    "exchange_rate": 1 if in_company_currency else self.conversion_rate,
                    "grand_total": self.base_grand_total if in_company_currency else self.grand_total,
                    "outstanding_amount": self.outstanding_amount,
                    "difference_account": frappe.get_cached_value(
                        "Company", self.company, "exchange_gain_loss_account"
                    ),
                    "exchange_gain_loss": flt(advance.get("exchange_gain_loss")),
                    "difference_posting_date": advance.get("difference_posting_date"),
                }
            )
        )

    if not entries:
        return

    from erpnext.accounts.utils import reconcile_against_document

    # pass dimension values to utility method
    active_dimensions = get_dimensions()[0]
    for entry in entries:
        for dim in active_dimensions:
            dimension_value = self.get(dim.fieldname)
            if dimension_value:
                entry.update({dim.fieldname: dimension_value})

    reconcile_against_document(entries, active_dimensions=active_dimensions)
def cancel_system_generated_credit_debit_notes(self):
    """Cancel submitted, system-generated Credit/Debit Note journals that reference this invoice."""
    # Cancel 'Credit/Debit' Note Journal Entries, if found.
    if self.doctype not in ("Sales Invoice", "Purchase Invoice"):
        return

    voucher_type = "Debit Note"
    if self.doctype == "Sales Invoice":
        voucher_type = "Credit Note"

    journal_filters = {
        "is_system_generated": 1,
        "reference_type": self.doctype,
        "reference_name": self.name,
        "voucher_type": voucher_type,
        "docstatus": 1,
    }
    for journal_name in frappe.db.get_all("Journal Entry", filters=journal_filters, pluck="name"):
        frappe.get_doc("Journal Entry", journal_name).cancel()
def on_cancel(self):
    """Undo accounting links when the document is cancelled.

    Detaches the document from bank transactions, then, depending on the
    doctype, cancels related system journals and unlinks payment entries.
    """
    from erpnext.accounts.doctype.bank_transaction.bank_transaction import (
        remove_from_bank_transaction,
    )
    from erpnext.accounts.utils import (
        cancel_common_party_journal,
        cancel_exchange_gain_loss_journal,
        unlink_ref_doc_from_payment_entries,
    )

    remove_from_bank_transaction(self.doctype, self.name)

    accounting_vouchers = ("Sales Invoice", "Purchase Invoice", "Payment Entry", "Journal Entry")
    orders = ("Sales Order", "Purchase Order")

    if self.doctype in accounting_vouchers:
        self.cancel_system_generated_credit_debit_notes()

        # Cancel Exchange Gain/Loss Journal before unlinking
        cancel_exchange_gain_loss_journal(self)
        cancel_common_party_journal(self)

        unlink_payments = frappe.get_single_value(
            "Accounts Settings", "unlink_payment_on_cancellation_of_invoice"
        )
        if unlink_payments:
            unlink_ref_doc_from_payment_entries(self)

    elif self.doctype in orders:
        unlink_advances = frappe.get_single_value(
            "Accounts Settings", "unlink_advance_payment_on_cancelation_of_order"
        )
        if unlink_advances:
            unlink_ref_doc_from_payment_entries(self)

        if self.doctype == "Sales Order":
            self.unlink_ref_doc_from_po()
def unlink_ref_doc_from_po(self):
    """Remove this Sales Order's references from non-cancelled Purchase Order Items."""
    so_items = [row.name for row in self.items]

    po_item_filters = {
        "sales_order": self.name,
        "sales_order_item": ["in", so_items],
        "docstatus": ["<", 2],
    }
    linked_po = list(set(frappe.get_all("Purchase Order Item", filters=po_item_filters, pluck="parent")))
    if not linked_po:
        return

    frappe.db.set_value(
        "Purchase Order Item",
        {"sales_order": self.name, "sales_order_item": ["in", so_items], "docstatus": ["<", 2]},
        {"sales_order": None, "sales_order_item": None},
    )
    frappe.msgprint(_("Purchase Orders {0} are un-linked").format("\n".join(linked_po)))
def get_tax_map(self):
    """Return a dict mapping each tax account head to the sum of its ``tax_amount`` values."""
    totals = {}
    for tax_row in self.get("taxes"):
        head = tax_row.account_head
        totals[head] = totals.get(head, 0.0) + tax_row.tax_amount
    return totals
def get_amount_and_base_amount(self, item, enable_discount_accounting):
    """Return ``(amount, base_amount)`` for ``item``.

    The item's distributed discount is added back to its net amounts when
    discount accounting is enabled and the document has both a discount amount
    and an additional discount account.
    """
    add_back_discount = (
        enable_discount_accounting
        and self.get("discount_amount")
        and self.get("additional_discount_account")
    )
    if not add_back_discount:
        return item.net_amount, item.base_net_amount

    base_discount = flt(
        item.distributed_discount_amount * self.get("conversion_rate"),
        item.precision("distributed_discount_amount"),
    )
    return (
        item.net_amount + item.distributed_discount_amount,
        item.base_net_amount + base_discount,
    )
def get_tax_amounts(self, tax, enable_discount_accounting):
    """Return ``(amount, base_amount)`` for a tax row.

    The amounts before discount are used when discount accounting is enabled
    and an additional discount with its own account is applied on the Grand
    Total; otherwise the amounts after discount are used.
    """
    use_amounts_before_discount = (
        enable_discount_accounting
        and self.get("discount_amount")
        and self.get("additional_discount_account")
        and self.get("apply_discount_on") == "Grand Total"
    )
    if use_amounts_before_discount:
        return tax.tax_amount, tax.base_tax_amount

    return tax.tax_amount_after_discount_amount, tax.base_tax_amount_after_discount_amount
def make_discount_gl_entries(self, gl_entries):
    """Append the GL entries for item-level and additional discounts.

    With discount accounting enabled in Selling Settings, every item that has
    a discount amount and a discount account gets a debit on the discount
    account and a matching credit on its income (or deferred revenue)
    account. A debit for the document-level discount is added when an
    additional discount account and discount amount are set and either
    discount accounting or ``is_cash_or_non_trade_discount`` is enabled.

    Args:
        gl_entries: List of GL entries; extended in place.
    """
    enable_discount_accounting = cint(
        frappe.get_single_value("Selling Settings", "enable_discount_accounting")
    )

    if enable_discount_accounting:
        for item in self.get("items"):
            if not (item.get("discount_amount") and item.get("discount_account")):
                continue

            discount_amount = item.discount_amount * item.qty
            discount_precision = item.precision("discount_amount")
            base_discount_amount = flt(discount_amount * self.get("conversion_rate"), discount_precision)
            discount_in_account_currency = flt(discount_amount, discount_precision)

            # Deferred revenue applies only to non-return documents.
            income_account = (
                item.income_account
                if (not item.enable_deferred_revenue or self.is_return)
                else item.deferred_revenue_account
            )

            # Both legs of the pair carry the same project: the debit leg used
            # to take only item.project, while the credit leg fell back to the
            # document's project.
            project = item.project or self.project

            account_currency = get_account_currency(item.discount_account)
            gl_entries.append(
                self.get_gl_dict(
                    {
                        "account": item.discount_account,
                        "against": self.customer,
                        "debit": base_discount_amount,
                        "debit_in_account_currency": discount_in_account_currency,
                        "cost_center": item.cost_center,
                        "project": project,
                    },
                    account_currency,
                    item=item,
                )
            )

            account_currency = get_account_currency(income_account)
            gl_entries.append(
                self.get_gl_dict(
                    {
                        "account": income_account,
                        "against": self.customer,
                        "credit": base_discount_amount,
                        "credit_in_account_currency": discount_in_account_currency,
                        "cost_center": item.cost_center,
                        "project": project,
                    },
                    account_currency,
                    item=item,
                )
            )

    if (
        (enable_discount_accounting or self.get("is_cash_or_non_trade_discount"))
        and self.get("additional_discount_account")
        and self.get("discount_amount")
    ):
        gl_entries.append(
            self.get_gl_dict(
                {
                    "account": self.additional_discount_account,
                    "against": self.customer,
                    "debit": self.base_discount_amount,
                    "cost_center": self.cost_center or erpnext.get_default_cost_center(self.company),
                },
                item=self,
            )
        )
def validate_multiple_billing(self, ref_dt, item_ref_dn, based_on):
    """Check the billed amount per referenced row against the allowed amount.

    Throws through ``throw_overbill_exception`` for over-billed rows unless
    the user holds the role allowed to over-bill; in that case an alert is
    shown when the total over-billed amount exceeds 0.1.

    Args:
        ref_dt: Doctype of the referenced document.
        item_ref_dn: Item field that holds the referenced row name.
        based_on: Item field whose value is compared.
    """
    from erpnext.controllers.status_updater import get_allowance_for

    ref_wise_billed_amount = self.get_reference_wise_billed_amt(ref_dt, item_ref_dn, based_on)
    if not ref_wise_billed_amount:
        return

    precision = self.precision(based_on, "items")
    precision_allowance = 1 / (10**precision)

    role_allowed_to_overbill = frappe.get_single_value("Accounts Settings", "role_allowed_to_over_bill")
    is_overbilling_allowed = role_allowed_to_overbill in frappe.get_roles()

    total_overbilled_amt = 0.0
    overbilled_items = []
    for row in ref_wise_billed_amount.values():
        billed = row.billed_amt
        allowance = get_allowance_for(row.item_code, {}, None, None, "amount")[0]
        allowed = flt(row.ref_amt * (100 + allowance) / 100)

        if billed < 0 and allowed < 0:
            # while making debit note against purchase return entry(purchase receipt) getting overbill error
            billed, allowed = abs(billed), abs(allowed)

        overbill_amt = billed - allowed
        row["max_allowed_amt"] = allowed
        total_overbilled_amt += overbill_amt

        # The Buying Settings lookup stays last so it runs only for over-billed
        # Purchase Invoice rows.
        if (
            overbill_amt > precision_allowance
            and not is_overbilling_allowed
            and (
                self.doctype != "Purchase Invoice"
                or not cint(
                    frappe.db.get_single_value(
                        "Buying Settings", "bill_for_rejected_quantity_in_purchase_invoice"
                    )
                )
            )
        ):
            overbilled_items.append(row)

    if overbilled_items:
        self.throw_overbill_exception(overbilled_items, precision)

    if is_overbilling_allowed and total_overbilled_amt > 0.1:
        frappe.msgprint(
            _("Overbilling of {} ignored because you have {} role.").format(
                total_overbilled_amt, role_allowed_to_overbill
            ),
            indicator="orange",
            alert=True,
        )
def get_billing_reference_details(self, reference_names, reference_doctype, based_on):
    """Return a ``frappe._dict`` mapping each referenced row name to its ``based_on`` value."""
    name_value_pairs = frappe.get_all(
        reference_doctype,
        filters={"name": ("in", reference_names)},
        fields=["name", based_on],
        as_list=1,
    )
    return frappe._dict(name_value_pairs)
def get_reference_wise_billed_amt(self, ref_dt, item_ref_dn, based_on):
"""
Returns Sum of Amount of
Sales/Purchase Invoice Items
that are linked to `item_ref_dn` (`dn_detail` / `pr_detail`)
that are submitted OR not submitted but are under current invoice

The result is a dict keyed by the referenced row name; each value holds
`item_code`, `billed_amt`, `ref_amt` and `rows` (the idx of every row of
this document that points to the reference). Returns None when no item row
carries a reference.
"""
reference_names = [d.get(item_ref_dn) for d in self.items if d.get(item_ref_dn)]
if not reference_names:
return
ref_wise_billed_amount = {}
precision = self.precision(based_on, "items")
# Amount of each referenced row, and what other submitted documents already billed against it.
reference_details = self.get_billing_reference_details(reference_names, ref_dt + " Item", based_on)
already_billed = self.get_already_billed_amount(reference_names, item_ref_dn, based_on)
for item in self.items:
key = item.get(item_ref_dn)
# Rows without a reference are not part of the check.
if not key:
continue
ref_amt = flt(reference_details.get(key), precision)
current_amount = flt(item.get(based_on), precision)
if not ref_amt:
if current_amount: # Skip warning for free items
frappe.msgprint(
_(
"System will not check over billing since amount for Item {0} in {1} is zero"
).format(item.item_code, ref_dt),
title=_("Warning"),
indicator="orange",
)
# NOTE(review): indentation is not visible in this view - confirm whether this
# `continue` belongs to `if not ref_amt` or to `if current_amount`.
continue
ref_wise_billed_amount.setdefault(
key,
frappe._dict(item_code=item.item_code, billed_amt=0.0, ref_amt=ref_amt, rows=[]),
)
ref_wise_billed_amount[key]["rows"].append(item.idx)
ref_wise_billed_amount[key]["ref_amt"] = ref_amt
ref_wise_billed_amount[key]["billed_amt"] += current_amount
# pop() so the already-billed amount is added only once per reference.
if key in already_billed:
ref_wise_billed_amount[key]["billed_amt"] += flt(already_billed.pop(key, 0), precision)
return ref_wise_billed_amount
def get_already_billed_amount(self, reference_names, item_ref_dn, based_on):
    """
    Sum what other submitted documents have billed against each reference.

    Queries the child doctype of this document's first item row, keeping rows
    whose `item_ref_dn` is in `reference_names`, whose `docstatus` is 1 and
    whose `parent` is not this document, grouped by `item_ref_dn`.

    Returns a `frappe._dict` built from the (reference name, summed
    `based_on`) result rows.
    """
    child_table = frappe.qb.DocType(self.items[0].doctype)
    reference_column = frappe.qb.Field(item_ref_dn)
    amount_column = frappe.qb.Field(based_on)

    # Only submitted rows that belong to a different parent document count.
    submitted_elsewhere = (child_table.docstatus == 1) & (child_table.parent != self.name)

    query = (
        frappe.qb.from_(child_table)
        .select(reference_column, Sum(amount_column))
        .where(reference_column.isin(reference_names))
        .where(submitted_elsewhere)
        .groupby(reference_column)
    )
    return frappe._dict(query.run())
def throw_overbill_exception(self, overbilled_items, precision):
message = (
_("
Cannot overbill for the following Items:
") + "To allow over-billing, please set allowance in Accounts Settings.
") frappe.throw(_(message)) def get_company_default(self, fieldname, ignore_validation=False): from erpnext.accounts.utils import get_company_default return get_company_default(self.company, fieldname, ignore_validation=ignore_validation) def get_stock_items(self): stock_items = [] item_codes = list(set(item.item_code for item in self.get("items"))) if item_codes: stock_items = frappe.db.get_values( "Item", {"name": ["in", item_codes], "is_stock_item": 1}, pluck="name", cache=True ) return stock_items def calculate_total_advance_from_ledger(self): adv = frappe.qb.DocType("Advance Payment Ledger Entry") advance = ( frappe.qb.from_(adv) .select(adv.currency.as_("account_currency"), Abs(Sum(adv.amount)).as_("amount")) .where( (adv.against_voucher_type == self.doctype) & (adv.against_voucher_no == self.name) & (adv.company == self.company) ) .run(as_dict=True) ) return advance def set_total_advance_paid(self): advance = self.calculate_total_advance_from_ledger() advance_paid, order_total = None, None if advance: advance = advance[0] advance_paid = flt(advance.amount, self.precision("advance_paid")) formatted_advance_paid = fmt_money( advance_paid, precision=self.precision("advance_paid"), currency=advance.account_currency ) if advance.account_currency: frappe.db.set_value( self.doctype, self.name, "party_account_currency", advance.account_currency ) if advance.account_currency == self.currency: order_total = self.get("rounded_total") or self.grand_total precision = "rounded_total" if self.get("rounded_total") else "grand_total" else: order_total = self.get("base_rounded_total") or self.base_grand_total precision = "base_rounded_total" if self.get("base_rounded_total") else "base_grand_total" formatted_order_total = fmt_money( order_total, precision=self.precision(precision), currency=advance.account_currency ) if self.currency == self.company_currency and advance_paid > order_total: frappe.throw( _( "Total advance ({0}) against Order {1} cannot be greater than the 
Grand Total ({2})" ).format(formatted_advance_paid, self.name, formatted_order_total) ) self.db_set("advance_paid", advance_paid) self.set_advance_payment_status() def set_advance_payment_status(self): new_status = None PaymentRequest = frappe.qb.DocType("Payment Request") paid_amount = frappe.get_value( doctype="Payment Request", filters={ "reference_doctype": self.doctype, "reference_name": self.name, "docstatus": 1, }, fieldname=Sum(PaymentRequest.grand_total - PaymentRequest.outstanding_amount), ) if not paid_amount: if self.doctype in frappe.get_hooks("advance_payment_receivable_doctypes"): new_status = "Not Requested" if paid_amount is None else "Requested" elif self.doctype in frappe.get_hooks("advance_payment_payable_doctypes"): new_status = "Not Initiated" if paid_amount is None else "Initiated" else: total_amount = self.get("rounded_total") or self.get("grand_total") new_status = "Fully Paid" if paid_amount == total_amount else "Partially Paid" if new_status == self.advance_payment_status: return self.db_set("advance_payment_status", new_status, update_modified=False) self.set_status(update=True) self.notify_update() @property def company_abbr(self): if not hasattr(self, "_abbr"): self._abbr = frappe.get_cached_value("Company", self.company, "abbr") return self._abbr def raise_missing_debit_credit_account_error(self, party_type, party): """Raise an error if debit to/credit to account does not exist.""" db_or_cr = ( frappe.bold(_("Debit To")) if self.doctype == "Sales Invoice" else frappe.bold(_("Credit To")) ) rec_or_pay = "Receivable" if self.doctype == "Sales Invoice" else "Payable" link_to_party = frappe.utils.get_link_to_form(party_type, party) link_to_company = frappe.utils.get_link_to_form("Company", self.company) message = _("{0} Account not found against Customer {1}.").format(db_or_cr, frappe.bold(party) or "") message += "