Files
schuetz3-erpnext/erpnext/controllers/stock_controller.py
2025-06-20 21:06:43 +05:30

1974 lines
60 KiB
Python

# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# License: GNU General Public License v3. See license.txt
import json
from collections import defaultdict
import frappe
from frappe import _, bold
from frappe.query_builder.functions import Sum
from frappe.utils import cint, cstr, flt, get_link_to_form, getdate
import erpnext
from erpnext.accounts.general_ledger import (
make_gl_entries,
make_reverse_gl_entries,
process_gl_map,
)
from erpnext.accounts.utils import cancel_exchange_gain_loss_journal, get_fiscal_year
from erpnext.controllers.accounts_controller import AccountsController
from erpnext.controllers.sales_and_purchase_return import (
available_serial_batch_for_return,
filter_serial_batches,
make_serial_batch_bundle_for_return,
)
from erpnext.stock import get_warehouse_account_map
from erpnext.stock.doctype.inventory_dimension.inventory_dimension import (
get_evaluated_inventory_dimension,
)
from erpnext.stock.doctype.serial_and_batch_bundle.serial_and_batch_bundle import (
get_type_of_transaction,
)
from erpnext.stock.stock_ledger import get_items_to_be_repost
class QualityInspectionRequiredError(frappe.ValidationError):
pass
class QualityInspectionRejectedError(frappe.ValidationError):
pass
class QualityInspectionNotSubmittedError(frappe.ValidationError):
pass
class BatchExpiredError(frappe.ValidationError):
pass
class StockController(AccountsController):
def validate(self):
super().validate()
if self.docstatus == 0:
for table_name in ["items", "packed_items", "supplied_items"]:
self.validate_duplicate_serial_and_batch_bundle(table_name)
if not self.get("is_return"):
self.validate_inspection()
self.validate_serialized_batch()
self.clean_serial_nos()
self.validate_customer_provided_item()
self.set_rate_of_stock_uom()
self.validate_internal_transfer()
self.validate_putaway_capacity()
self.reset_conversion_factor()
self.check_zero_rate()
def reset_conversion_factor(self):
for row in self.get("items"):
if row.uom != row.stock_uom:
continue
if row.conversion_factor != 1.0:
row.conversion_factor = 1.0
frappe.msgprint(
_(
"Conversion factor for item {0} has been reset to 1.0 as the uom {1} is same as stock uom {2}."
).format(bold(row.item_code), bold(row.uom), bold(row.stock_uom)),
alert=True,
)
def check_zero_rate(self):
if self.doctype in [
"POS Invoice",
"Purchase Invoice",
"Sales Invoice",
"Delivery Note",
"Purchase Receipt",
"Stock Entry",
"Stock Reconciliation",
]:
for item in self.get("items"):
if (item.get("valuation_rate") == 0 or item.get("incoming_rate") == 0) and item.get(
"allow_zero_valuation_rate"
) == 0:
frappe.toast(
_(
"Row #{0}: Item {1} has zero rate but 'Allow Zero Valuation Rate' is not enabled."
).format(item.idx, frappe.bold(item.item_code)),
indicator="orange",
)
def validate_items_exist(self):
if not self.get("items"):
return
items = [d.item_code for d in self.get("items")]
exists_items = frappe.get_all("Item", filters={"name": ("in", items)}, pluck="name")
non_exists_items = set(items) - set(exists_items)
if non_exists_items:
frappe.throw(_("Items {0} do not exist in the Item master.").format(", ".join(non_exists_items)))
def validate_duplicate_serial_and_batch_bundle(self, table_name):
if not self.get(table_name):
return
sbb_list = []
for item in self.get(table_name):
if item.get("serial_and_batch_bundle"):
sbb_list.append(item.get("serial_and_batch_bundle"))
if item.get("rejected_serial_and_batch_bundle"):
sbb_list.append(item.get("rejected_serial_and_batch_bundle"))
if sbb_list:
SLE = frappe.qb.DocType("Stock Ledger Entry")
data = (
frappe.qb.from_(SLE)
.select(SLE.voucher_type, SLE.voucher_no, SLE.serial_and_batch_bundle)
.where(
(SLE.docstatus == 1)
& (SLE.serial_and_batch_bundle.notnull())
& (SLE.serial_and_batch_bundle.isin(sbb_list))
)
.limit(1)
).run(as_dict=True)
if data:
data = data[0]
frappe.throw(
_("Serial and Batch Bundle {0} is already used in {1} {2}.").format(
frappe.bold(data.serial_and_batch_bundle), data.voucher_type, data.voucher_no
)
)
def make_gl_entries(self, gl_entries=None, from_repost=False, via_landed_cost_voucher=False):
if self.docstatus == 2:
make_reverse_gl_entries(voucher_type=self.doctype, voucher_no=self.name)
provisional_accounting_for_non_stock_items = cint(
frappe.get_cached_value(
"Company", self.company, "enable_provisional_accounting_for_non_stock_items"
)
)
is_asset_pr = any(d.get("is_fixed_asset") for d in self.get("items"))
if (
cint(erpnext.is_perpetual_inventory_enabled(self.company))
or provisional_accounting_for_non_stock_items
or is_asset_pr
):
warehouse_account = get_warehouse_account_map(self.company)
if self.docstatus == 1:
if not gl_entries:
gl_entries = (
self.get_gl_entries(warehouse_account, via_landed_cost_voucher)
if self.doctype == "Purchase Receipt"
else self.get_gl_entries(warehouse_account)
)
make_gl_entries(gl_entries, from_repost=from_repost)
def validate_serialized_batch(self):
from erpnext.stock.doctype.serial_no.serial_no import get_serial_nos
is_material_issue = False
if self.doctype == "Stock Entry" and self.purpose in ["Material Issue", "Material Transfer"]:
is_material_issue = True
for d in self.get("items"):
if hasattr(d, "serial_no") and hasattr(d, "batch_no") and d.serial_no and d.batch_no:
serial_nos = frappe.get_all(
"Serial No",
fields=["batch_no", "name", "warehouse"],
filters={"name": ("in", get_serial_nos(d.serial_no))},
)
for row in serial_nos:
if row.warehouse and row.batch_no != d.batch_no:
frappe.throw(
_("Row #{0}: Serial No {1} does not belong to Batch {2}").format(
d.idx, row.name, d.batch_no
)
)
if is_material_issue:
continue
if flt(d.qty) > 0.0 and d.get("batch_no") and self.get("posting_date") and self.docstatus < 2:
expiry_date = frappe.get_cached_value("Batch", d.get("batch_no"), "expiry_date")
if expiry_date and getdate(expiry_date) < getdate(self.posting_date):
frappe.throw(
_("Row #{0}: The batch {1} has already expired.").format(
d.idx, get_link_to_form("Batch", d.get("batch_no"))
),
BatchExpiredError,
)
def clean_serial_nos(self):
from erpnext.stock.doctype.serial_no.serial_no import clean_serial_no_string
for row in self.get("items"):
if hasattr(row, "serial_no") and row.serial_no:
# remove extra whitespace and store one serial no on each line
row.serial_no = clean_serial_no_string(row.serial_no)
for row in self.get("packed_items") or []:
if hasattr(row, "serial_no") and row.serial_no:
# remove extra whitespace and store one serial no on each line
row.serial_no = clean_serial_no_string(row.serial_no)
def make_bundle_using_old_serial_batch_fields(self, table_name=None, via_landed_cost_voucher=False):
if self.get("_action") == "update_after_submit":
return
# To handle test cases
if frappe.flags.in_test and frappe.flags.use_serial_and_batch_fields:
return
if not table_name:
table_name = "items"
if self.doctype == "Asset Capitalization":
table_name = "stock_items"
parent_details = frappe._dict()
if table_name == "packed_items":
parent_details = self.get_parent_details_for_packed_items()
for row in self.get(table_name):
if (
not via_landed_cost_voucher
and row.serial_and_batch_bundle
and (row.serial_no or row.batch_no)
):
self.validate_serial_nos_and_batches_with_bundle(row)
if not row.serial_no and not row.batch_no and not row.get("rejected_serial_no"):
continue
if not row.use_serial_batch_fields and (
row.serial_no or row.batch_no or row.get("rejected_serial_no")
):
row.use_serial_batch_fields = 1
if row.use_serial_batch_fields and (
not row.serial_and_batch_bundle and not row.get("rejected_serial_and_batch_bundle")
):
bundle_details = {
"item_code": row.get("rm_item_code") or row.item_code,
"posting_date": self.posting_date,
"posting_time": self.posting_time,
"voucher_type": self.doctype,
"voucher_no": self.name,
"voucher_detail_no": row.name,
"company": self.company,
"is_rejected": 1 if row.get("rejected_warehouse") else 0,
"use_serial_batch_fields": row.use_serial_batch_fields,
"via_landed_cost_voucher": via_landed_cost_voucher,
"do_not_submit": True if not via_landed_cost_voucher else False,
}
if row.get("qty") or row.get("consumed_qty") or row.get("stock_qty"):
self.update_bundle_details(bundle_details, table_name, row, parent_details=parent_details)
self.create_serial_batch_bundle(bundle_details, row)
if row.get("rejected_qty"):
self.update_bundle_details(bundle_details, table_name, row, is_rejected=True)
self.create_serial_batch_bundle(bundle_details, row)
def get_parent_details_for_packed_items(self):
parent_details = frappe._dict()
for row in self.get("items"):
parent_details[row.name] = row
return parent_details
def make_bundle_for_sales_purchase_return(self, table_name=None):
if not self.get("is_return"):
return
if not table_name:
table_name = "items"
self.make_bundle_for_non_rejected_qty(table_name)
if self.doctype in ["Purchase Invoice", "Purchase Receipt"]:
self.make_bundle_for_rejected_qty(table_name)
def make_bundle_for_rejected_qty(self, table_name=None):
field, reference_ids = self.get_reference_ids(
table_name, "rejected_qty", "rejected_serial_and_batch_bundle"
)
if not reference_ids:
return
child_doctype = self.doctype + " Item"
available_dict = available_serial_batch_for_return(
field, child_doctype, reference_ids, is_rejected=True
)
for row in self.get(table_name):
if data := available_dict.get(row.get(field)):
qty_field = "rejected_qty"
warehouse_field = "rejected_warehouse"
if row.get("return_qty_from_rejected_warehouse"):
qty_field = "qty"
warehouse_field = "warehouse"
if not data.get("qty"):
frappe.throw(
_("For the {0}, no stock is available for the return in the warehouse {1}.").format(
frappe.bold(row.item_code), row.get(warehouse_field)
)
)
data = filter_serial_batches(
self, data, row, warehouse_field=warehouse_field, qty_field=qty_field
)
bundle = make_serial_batch_bundle_for_return(data, row, self, warehouse_field, qty_field)
if row.get("return_qty_from_rejected_warehouse"):
row.db_set(
{
"serial_and_batch_bundle": bundle,
"batch_no": "",
"serial_no": "",
}
)
else:
row.db_set(
{
"rejected_serial_and_batch_bundle": bundle,
"batch_no": "",
"rejected_serial_no": "",
}
)
def make_bundle_for_non_rejected_qty(self, table_name):
field, reference_ids = self.get_reference_ids(table_name)
if not reference_ids:
return
child_doctype = self.doctype + " Item"
available_dict = available_serial_batch_for_return(field, child_doctype, reference_ids)
for row in self.get(table_name):
if data := available_dict.get(row.get(field)):
data = filter_serial_batches(self, data, row)
bundle = make_serial_batch_bundle_for_return(data, row, self)
row.db_set(
{
"serial_and_batch_bundle": bundle,
"batch_no": "",
"serial_no": "",
}
)
if self.doctype in ["Sales Invoice", "Delivery Note"]:
row.db_set(
"incoming_rate", frappe.db.get_value("Serial and Batch Bundle", bundle, "avg_rate")
)
def get_reference_ids(self, table_name, qty_field=None, bundle_field=None) -> tuple[str, list[str]]:
field = {
"Sales Invoice": "sales_invoice_item",
"Delivery Note": "dn_detail",
"Purchase Receipt": "purchase_receipt_item",
"Purchase Invoice": "purchase_invoice_item",
"POS Invoice": "pos_invoice_item",
}.get(self.doctype)
if not bundle_field:
bundle_field = "serial_and_batch_bundle"
if not qty_field:
qty_field = "qty"
reference_ids = []
for row in self.get(table_name):
if not self.is_serial_batch_item(row.item_code):
continue
if (
row.get(field)
and (
qty_field == "qty"
and not row.get("return_qty_from_rejected_warehouse")
or qty_field == "rejected_qty"
and (row.get("return_qty_from_rejected_warehouse") or row.get("rejected_warehouse"))
)
and not row.get("use_serial_batch_fields")
and not row.get(bundle_field)
):
reference_ids.append(row.get(field))
return field, reference_ids
@frappe.request_cache
def is_serial_batch_item(self, item_code) -> bool:
if not frappe.db.exists("Item", item_code):
frappe.throw(_("Item {0} does not exist.").format(bold(item_code)))
item_details = frappe.db.get_value("Item", item_code, ["has_serial_no", "has_batch_no"], as_dict=1)
if item_details.has_serial_no or item_details.has_batch_no:
return True
return False
def update_bundle_details(self, bundle_details, table_name, row, is_rejected=False, parent_details=None):
from erpnext.stock.doctype.serial_no.serial_no import get_serial_nos
# Since qty field is different for different doctypes
qty = row.get("qty")
warehouse = row.get("warehouse")
if table_name == "packed_items":
type_of_transaction = "Inward"
if not self.is_return:
type_of_transaction = "Outward"
elif table_name == "supplied_items":
qty = row.consumed_qty
warehouse = self.supplier_warehouse
type_of_transaction = "Outward"
if self.is_return:
type_of_transaction = "Inward"
else:
type_of_transaction = get_type_of_transaction(self, row)
if hasattr(row, "stock_qty"):
qty = row.stock_qty
if self.doctype == "Stock Entry":
qty = row.transfer_qty
warehouse = row.s_warehouse or row.t_warehouse
serial_nos = row.serial_no
if is_rejected:
serial_nos = row.get("rejected_serial_no")
type_of_transaction = "Inward" if not self.is_return else "Outward"
qty = row.get("rejected_qty")
warehouse = row.get("rejected_warehouse")
if (
self.is_internal_transfer()
and self.doctype in ["Sales Invoice", "Delivery Note"]
and self.is_return
):
warehouse = row.get("target_warehouse") or row.get("warehouse")
type_of_transaction = "Outward"
if table_name == "packed_items":
if not warehouse:
warehouse = parent_details[row.parent_detail_docname].warehouse
bundle_details["voucher_detail_no"] = parent_details[row.parent_detail_docname].name
bundle_details.update(
{
"qty": qty,
"is_rejected": is_rejected,
"type_of_transaction": type_of_transaction,
"warehouse": warehouse,
"batches": frappe._dict({row.batch_no: qty}) if row.batch_no else None,
"serial_nos": get_serial_nos(serial_nos) if serial_nos else None,
"batch_no": row.batch_no,
}
)
def create_serial_batch_bundle(self, bundle_details, row):
from erpnext.stock.serial_batch_bundle import SerialBatchCreation
sn_doc = SerialBatchCreation(bundle_details).make_serial_and_batch_bundle()
field = "serial_and_batch_bundle"
if bundle_details.get("is_rejected"):
field = "rejected_serial_and_batch_bundle"
row.set(field, sn_doc.name)
row.db_set({field: sn_doc.name})
def validate_serial_nos_and_batches_with_bundle(self, row):
from erpnext.stock.doctype.serial_no.serial_no import get_serial_nos
throw_error = False
if row.serial_no:
serial_nos = frappe.get_all(
"Serial and Batch Entry",
fields=["serial_no"],
filters={"parent": row.serial_and_batch_bundle},
)
serial_nos = sorted([cstr(d.serial_no) for d in serial_nos])
parsed_serial_nos = get_serial_nos(row.serial_no)
if len(serial_nos) != len(parsed_serial_nos):
throw_error = True
elif serial_nos != parsed_serial_nos:
for serial_no in serial_nos:
if serial_no not in parsed_serial_nos:
throw_error = True
break
elif row.batch_no:
batches = frappe.get_all(
"Serial and Batch Entry", fields=["batch_no"], filters={"parent": row.serial_and_batch_bundle}
)
batches = sorted([d.batch_no for d in batches])
if batches != [row.batch_no]:
throw_error = True
if throw_error:
frappe.throw(
_(
"At row {0}: Serial and Batch Bundle {1} has already created. Please remove the values from the serial no or batch no fields."
).format(row.idx, row.serial_and_batch_bundle)
)
def set_use_serial_batch_fields(self):
if frappe.get_single_value("Stock Settings", "use_serial_batch_fields"):
for row in self.items:
row.use_serial_batch_fields = 1
def get_gl_entries(self, warehouse_account=None, default_expense_account=None, default_cost_center=None):
if not warehouse_account:
warehouse_account = get_warehouse_account_map(self.company)
sle_map = self.get_stock_ledger_details()
voucher_details = self.get_voucher_details(default_expense_account, default_cost_center, sle_map)
gl_list = []
warehouse_with_no_account = []
precision = self.get_debit_field_precision()
for item_row in voucher_details:
sle_list = sle_map.get(item_row.name)
sle_rounding_diff = 0.0
if sle_list:
for sle in sle_list:
if warehouse_account.get(sle.warehouse):
# from warehouse account
sle_rounding_diff += flt(sle.stock_value_difference)
self.check_expense_account(item_row)
# expense account/ target_warehouse / source_warehouse
if item_row.get("target_warehouse"):
warehouse = item_row.get("target_warehouse")
expense_account = warehouse_account[warehouse]["account"]
else:
expense_account = item_row.expense_account
gl_list.append(
self.get_gl_dict(
{
"account": warehouse_account[sle.warehouse]["account"],
"against": expense_account,
"cost_center": item_row.cost_center,
"project": sle.get("project") or item_row.project or self.get("project"),
"remarks": self.get("remarks") or _("Accounting Entry for Stock"),
"debit": flt(sle.stock_value_difference, precision),
"is_opening": item_row.get("is_opening")
or self.get("is_opening")
or "No",
},
warehouse_account[sle.warehouse]["account_currency"],
item=item_row,
)
)
gl_list.append(
self.get_gl_dict(
{
"account": expense_account,
"against": warehouse_account[sle.warehouse]["account"],
"cost_center": item_row.cost_center,
"remarks": self.get("remarks") or _("Accounting Entry for Stock"),
"debit": -1 * flt(sle.stock_value_difference, precision),
"project": sle.get("project")
or item_row.get("project")
or self.get("project"),
"is_opening": item_row.get("is_opening")
or self.get("is_opening")
or "No",
},
item=item_row,
)
)
elif sle.warehouse not in warehouse_with_no_account:
warehouse_with_no_account.append(sle.warehouse)
if abs(sle_rounding_diff) > (1.0 / (10**precision)) and self.is_internal_transfer():
warehouse_asset_account = ""
if self.get("is_internal_customer"):
warehouse_asset_account = warehouse_account[item_row.get("target_warehouse")]["account"]
elif self.get("is_internal_supplier"):
warehouse_asset_account = warehouse_account[item_row.get("warehouse")]["account"]
expense_account = frappe.get_cached_value("Company", self.company, "default_expense_account")
if not expense_account:
frappe.throw(
_(
"Please set default cost of goods sold account in company {0} for booking rounding gain and loss during stock transfer"
).format(frappe.bold(self.company))
)
gl_list.append(
self.get_gl_dict(
{
"account": expense_account,
"against": warehouse_asset_account,
"cost_center": item_row.cost_center,
"project": item_row.project or self.get("project"),
"remarks": _("Rounding gain/loss Entry for Stock Transfer"),
"debit": sle_rounding_diff,
"is_opening": item_row.get("is_opening") or self.get("is_opening") or "No",
},
warehouse_account[sle.warehouse]["account_currency"],
item=item_row,
)
)
gl_list.append(
self.get_gl_dict(
{
"account": warehouse_asset_account,
"against": expense_account,
"cost_center": item_row.cost_center,
"remarks": _("Rounding gain/loss Entry for Stock Transfer"),
"credit": sle_rounding_diff,
"project": item_row.get("project") or self.get("project"),
"is_opening": item_row.get("is_opening") or self.get("is_opening") or "No",
},
item=item_row,
)
)
if warehouse_with_no_account:
for wh in warehouse_with_no_account:
if frappe.get_cached_value("Warehouse", wh, "company"):
frappe.throw(
_(
"Warehouse {0} is not linked to any account, please mention the account in the warehouse record or set default inventory account in company {1}."
).format(wh, self.company)
)
return process_gl_map(gl_list, precision=precision)
def get_debit_field_precision(self):
if not frappe.flags.debit_field_precision:
frappe.flags.debit_field_precision = frappe.get_precision("GL Entry", "debit_in_account_currency")
return frappe.flags.debit_field_precision
def get_voucher_details(self, default_expense_account, default_cost_center, sle_map):
if self.doctype == "Stock Reconciliation":
reconciliation_purpose = frappe.db.get_value(self.doctype, self.name, "purpose")
is_opening = "Yes" if reconciliation_purpose == "Opening Stock" else "No"
details = []
for voucher_detail_no in sle_map:
details.append(
frappe._dict(
{
"name": voucher_detail_no,
"expense_account": default_expense_account,
"cost_center": default_cost_center,
"is_opening": is_opening,
}
)
)
return details
else:
details = self.get("items")
if default_expense_account or default_cost_center:
for d in details:
if default_expense_account and not d.get("expense_account"):
d.expense_account = default_expense_account
if default_cost_center and not d.get("cost_center"):
d.cost_center = default_cost_center
return details
def get_items_and_warehouses(self) -> tuple[list[str], list[str]]:
"""Get list of items and warehouses affected by a transaction"""
if not (hasattr(self, "items") or hasattr(self, "packed_items")):
return [], []
item_rows = (self.get("items") or []) + (self.get("packed_items") or [])
items = {d.item_code for d in item_rows if d.item_code}
warehouses = set()
for d in item_rows:
if d.get("warehouse"):
warehouses.add(d.warehouse)
if self.doctype == "Stock Entry":
if d.get("s_warehouse"):
warehouses.add(d.s_warehouse)
if d.get("t_warehouse"):
warehouses.add(d.t_warehouse)
return list(items), list(warehouses)
def get_stock_ledger_details(self):
stock_ledger = {}
table = frappe.qb.DocType("Stock Ledger Entry")
stock_ledger_entries = (
frappe.qb.from_(table)
.select(
table.name,
table.warehouse,
table.stock_value_difference,
table.valuation_rate,
table.voucher_detail_no,
table.item_code,
table.posting_date,
table.posting_time,
table.actual_qty,
table.qty_after_transaction,
table.project,
)
.where(
(table.voucher_type == self.doctype)
& (table.voucher_no == self.name)
& (table.is_cancelled == 0)
)
).run(as_dict=True)
for sle in stock_ledger_entries:
stock_ledger.setdefault(sle.voucher_detail_no, []).append(sle)
return stock_ledger
def check_expense_account(self, item):
if not item.get("expense_account"):
msg = _("Please set an Expense Account in the Items table")
frappe.throw(
_("Row #{0}: Expense Account not set for the Item {1}. {2}").format(
item.idx, frappe.bold(item.item_code), msg
),
title=_("Expense Account Missing"),
)
else:
is_expense_account = (
frappe.get_cached_value("Account", item.get("expense_account"), "report_type")
== "Profit and Loss"
)
if (
self.doctype
not in (
"Purchase Receipt",
"Purchase Invoice",
"Stock Reconciliation",
"Stock Entry",
"Subcontracting Receipt",
)
and not is_expense_account
):
frappe.throw(
_("Expense / Difference account ({0}) must be a 'Profit or Loss' account").format(
item.get("expense_account")
)
)
if is_expense_account and not item.get("cost_center"):
frappe.throw(
_("{0} {1}: Cost Center is mandatory for Item {2}").format(
_(self.doctype), self.name, item.get("item_code")
)
)
def delete_auto_created_batches(self):
for table_name in ["items", "packed_items", "supplied_items"]:
if not self.get(table_name):
continue
for row in self.get(table_name):
update_values = {}
if row.get("batch_no"):
update_values["batch_no"] = None
if row.get("serial_and_batch_bundle"):
update_values["serial_and_batch_bundle"] = None
frappe.db.set_value(
"Serial and Batch Bundle", row.serial_and_batch_bundle, {"is_cancelled": 1}
)
if update_values:
row.db_set(update_values)
if table_name == "items" and row.get("rejected_serial_and_batch_bundle"):
frappe.db.set_value(
"Serial and Batch Bundle", row.rejected_serial_and_batch_bundle, {"is_cancelled": 1}
)
row.db_set("rejected_serial_and_batch_bundle", None)
if row.get("current_serial_and_batch_bundle"):
row.db_set("current_serial_and_batch_bundle", None)
def set_serial_and_batch_bundle(self, table_name=None, ignore_validate=False):
if not table_name:
table_name = "items"
QTY_FIELD = {
"serial_and_batch_bundle": "qty",
"current_serial_and_batch_bundle": "current_qty",
"rejected_serial_and_batch_bundle": "rejected_qty",
}
for row in self.get(table_name):
for field in QTY_FIELD.keys():
if row.get(field):
frappe.get_doc("Serial and Batch Bundle", row.get(field)).set_serial_and_batch_values(
self, row, qty_field=QTY_FIELD[field]
)
def make_package_for_transfer(
self, serial_and_batch_bundle, warehouse, type_of_transaction=None, do_not_submit=None, qty=0
):
return make_bundle_for_material_transfer(
is_new=self.is_new(),
docstatus=self.docstatus,
voucher_type=self.doctype,
voucher_no=self.name,
serial_and_batch_bundle=serial_and_batch_bundle,
warehouse=warehouse,
type_of_transaction=type_of_transaction,
do_not_submit=do_not_submit,
qty=qty,
)
def get_sl_entries(self, d, args):
sl_dict = frappe._dict(
{
"item_code": d.get("item_code", None),
"warehouse": d.get("warehouse", None),
"serial_and_batch_bundle": d.get("serial_and_batch_bundle"),
"posting_date": self.posting_date,
"posting_time": self.posting_time,
"fiscal_year": get_fiscal_year(self.posting_date, company=self.company)[0],
"voucher_type": self.doctype,
"voucher_no": self.name,
"voucher_detail_no": d.name,
"actual_qty": (self.docstatus == 1 and 1 or -1) * flt(d.get("stock_qty")),
"stock_uom": frappe.get_cached_value(
"Item", args.get("item_code") or d.get("item_code"), "stock_uom"
),
"incoming_rate": 0,
"company": self.company,
"project": d.get("project") or self.get("project"),
"is_cancelled": 1 if self.docstatus == 2 else 0,
}
)
sl_dict.update(args)
self.update_inventory_dimensions(d, sl_dict)
if self.docstatus == 2:
from erpnext.deprecation_dumpster import deprecation_warning
deprecation_warning("unknown", "v16", "No instructions.")
# To handle denormalized serial no records, will br deprecated in v16
for field in ["serial_no", "batch_no"]:
if d.get(field):
sl_dict[field] = d.get(field)
return sl_dict
def set_landed_cost_voucher_amount(self):
for d in self.get("items"):
lcv_item = frappe.qb.DocType("Landed Cost Item")
query = (
frappe.qb.from_(lcv_item)
.select(Sum(lcv_item.applicable_charges), lcv_item.cost_center)
.where((lcv_item.docstatus == 1) & (lcv_item.receipt_document == self.name))
)
if self.doctype == "Stock Entry":
query = query.where(lcv_item.stock_entry_item == d.name)
else:
query = query.where(lcv_item.purchase_receipt_item == d.name)
lc_voucher_data = query.run(as_list=True)
d.landed_cost_voucher_amount = lc_voucher_data[0][0] if lc_voucher_data else 0.0
if not d.cost_center and lc_voucher_data and lc_voucher_data[0][1]:
d.db_set("cost_center", lc_voucher_data[0][1])
def has_landed_cost_amount(self):
for row in self.items:
if row.get("landed_cost_voucher_amount"):
return True
return False
def get_item_account_wise_lcv_entries(self):
if not self.has_landed_cost_amount():
return
landed_cost_vouchers = frappe.get_all(
"Landed Cost Purchase Receipt",
fields=["parent"],
filters={"receipt_document": self.name, "docstatus": 1},
)
if not landed_cost_vouchers:
return
item_account_wise_cost = {}
row_fieldname = "purchase_receipt_item"
if self.doctype == "Stock Entry":
row_fieldname = "stock_entry_item"
for lcv in landed_cost_vouchers:
landed_cost_voucher_doc = frappe.get_doc("Landed Cost Voucher", lcv.parent)
based_on_field = "applicable_charges"
# Use amount field for total item cost for manually cost distributed LCVs
if landed_cost_voucher_doc.distribute_charges_based_on != "Distribute Manually":
based_on_field = frappe.scrub(landed_cost_voucher_doc.distribute_charges_based_on)
total_item_cost = 0
if based_on_field:
for item in landed_cost_voucher_doc.items:
total_item_cost += item.get(based_on_field)
for item in landed_cost_voucher_doc.items:
if item.receipt_document == self.name:
for account in landed_cost_voucher_doc.taxes:
exchange_rate = account.exchange_rate or 1
item_account_wise_cost.setdefault((item.item_code, item.get(row_fieldname)), {})
item_account_wise_cost[(item.item_code, item.get(row_fieldname))].setdefault(
account.expense_account, {"amount": 0.0, "base_amount": 0.0}
)
item_row = item_account_wise_cost[(item.item_code, item.get(row_fieldname))][
account.expense_account
]
if total_item_cost > 0:
item_row["amount"] += account.amount * item.get(based_on_field) / total_item_cost
item_row["base_amount"] += (
account.base_amount * item.get(based_on_field) / total_item_cost
)
else:
item_row["amount"] += item.applicable_charges / exchange_rate
item_row["base_amount"] += item.applicable_charges
return item_account_wise_cost
def update_inventory_dimensions(self, row, sl_dict) -> None:
# To handle delivery note and sales invoice
if row.get("item_row"):
row = row.get("item_row")
dimensions = get_evaluated_inventory_dimension(row, sl_dict, parent_doc=self)
for dimension in dimensions:
if not dimension:
continue
if (
self.doctype in ["Purchase Invoice", "Purchase Receipt"]
and row.get("rejected_warehouse")
and sl_dict.get("warehouse") == row.get("rejected_warehouse")
):
fieldname = f"rejected_{dimension.source_fieldname}"
sl_dict[dimension.target_fieldname] = row.get(fieldname)
continue
if self.doctype in [
"Purchase Invoice",
"Purchase Receipt",
"Sales Invoice",
"Delivery Note",
"Stock Entry",
]:
if (
(
sl_dict.actual_qty > 0
and not self.get("is_return")
or sl_dict.actual_qty < 0
and self.get("is_return")
)
and self.doctype in ["Purchase Invoice", "Purchase Receipt", "Stock Entry"]
) or (
(
sl_dict.actual_qty < 0
and not self.get("is_return")
or sl_dict.actual_qty > 0
and self.get("is_return")
)
and self.doctype in ["Sales Invoice", "Delivery Note", "Stock Entry"]
):
if self.doctype == "Stock Entry":
if row.get("t_warehouse") == sl_dict.warehouse and sl_dict.get("actual_qty") > 0:
fieldname = f"to_{dimension.source_fieldname}"
if dimension.source_fieldname.startswith("to_"):
fieldname = f"{dimension.source_fieldname}"
sl_dict[dimension.target_fieldname] = row.get(fieldname)
continue
sl_dict[dimension.target_fieldname] = row.get(dimension.source_fieldname)
else:
fieldname_start_with = "to"
if self.doctype in ["Purchase Invoice", "Purchase Receipt"]:
fieldname_start_with = "from"
fieldname = f"{fieldname_start_with}_{dimension.source_fieldname}"
sl_dict[dimension.target_fieldname] = row.get(fieldname)
if not sl_dict.get(dimension.target_fieldname):
sl_dict[dimension.target_fieldname] = row.get(dimension.source_fieldname)
elif row.get(dimension.source_fieldname):
sl_dict[dimension.target_fieldname] = row.get(dimension.source_fieldname)
if not sl_dict.get(dimension.target_fieldname) and dimension.fetch_from_parent:
sl_dict[dimension.target_fieldname] = self.get(dimension.fetch_from_parent)
# Get value based on doctype name
if not sl_dict.get(dimension.target_fieldname):
fieldname = next(
(
field.fieldname
for field in frappe.get_meta(self.doctype).fields
if field.options == dimension.fetch_from_parent
),
None,
)
if fieldname and self.get(fieldname):
sl_dict[dimension.target_fieldname] = self.get(fieldname)
if sl_dict[dimension.target_fieldname] and self.docstatus == 1:
row.db_set(dimension.source_fieldname, sl_dict[dimension.target_fieldname])
def make_sl_entries(self, sl_entries, allow_negative_stock=False, via_landed_cost_voucher=False):
from erpnext.stock.serial_batch_bundle import update_batch_qty
from erpnext.stock.stock_ledger import make_sl_entries
make_sl_entries(sl_entries, allow_negative_stock, via_landed_cost_voucher)
update_batch_qty(self.doctype, self.name, via_landed_cost_voucher=via_landed_cost_voucher)
def make_gl_entries_on_cancel(self):
cancel_exchange_gain_loss_journal(frappe._dict(doctype=self.doctype, name=self.name))
if frappe.db.sql(
"""select name from `tabGL Entry` where voucher_type=%s
and voucher_no=%s""",
(self.doctype, self.name),
):
self.make_gl_entries()
def get_serialized_items(self):
serialized_items = []
item_codes = list(set(d.item_code for d in self.get("items")))
if item_codes:
serialized_items = frappe.db.sql_list(
"""select name from `tabItem`
where has_serial_no=1 and name in ({})""".format(", ".join(["%s"] * len(item_codes))),
tuple(item_codes),
)
return serialized_items
def validate_warehouse(self):
from erpnext.stock.utils import validate_disabled_warehouse, validate_warehouse_company
warehouses = list(set(d.warehouse for d in self.get("items") if getattr(d, "warehouse", None)))
target_warehouses = list(
set([d.target_warehouse for d in self.get("items") if getattr(d, "target_warehouse", None)])
)
warehouses.extend(target_warehouses)
from_warehouse = list(
set([d.from_warehouse for d in self.get("items") if getattr(d, "from_warehouse", None)])
)
warehouses.extend(from_warehouse)
for w in warehouses:
validate_disabled_warehouse(w)
validate_warehouse_company(w, self.company)
def update_billing_percentage(self, update_modified=True):
target_ref_field = "amount"
if self.doctype == "Delivery Note":
total_amount = total_returned = 0
for item in self.items:
total_amount += flt(item.amount)
total_returned += flt(item.returned_qty * item.rate)
if total_returned < total_amount:
target_ref_field = "(amount - (returned_qty * rate))"
self._update_percent_field(
{
"target_dt": self.doctype + " Item",
"target_parent_dt": self.doctype,
"target_parent_field": "per_billed",
"target_ref_field": target_ref_field,
"target_field": "billed_amt",
"name": self.name,
},
update_modified,
)
def validate_inspection(self):
"""Checks if quality inspection is set/ is valid for Items that require inspection."""
inspection_fieldname_map = {
"Purchase Receipt": "inspection_required_before_purchase",
"Purchase Invoice": "inspection_required_before_purchase",
"Subcontracting Receipt": "inspection_required_before_purchase",
"Sales Invoice": "inspection_required_before_delivery",
"Delivery Note": "inspection_required_before_delivery",
}
inspection_required_fieldname = inspection_fieldname_map.get(self.doctype)
# return if inspection is not required on document level
if (
(not inspection_required_fieldname and self.doctype != "Stock Entry")
or (self.doctype == "Stock Entry" and not self.inspection_required)
or (self.doctype in ["Sales Invoice", "Purchase Invoice"] and not self.update_stock)
):
return
for row in self.get("items"):
qi_required = False
if inspection_required_fieldname and frappe.get_cached_value(
"Item", row.item_code, inspection_required_fieldname
):
qi_required = True
elif self.doctype == "Stock Entry" and row.t_warehouse:
qi_required = True # inward stock needs inspection
if row.get("is_scrap_item"):
continue
if qi_required: # validate row only if inspection is required on item level
self.validate_qi_presence(row)
if self.docstatus == 1:
self.validate_qi_submission(row)
self.validate_qi_rejection(row)
def validate_qi_presence(self, row):
"""Check if QI is present on row level. Warn on save and stop on submit if missing."""
if self.doctype in [
"Purchase Receipt",
"Purchase Invoice",
"Sales Invoice",
"Delivery Note",
] and frappe.get_single_value(
"Stock Settings", "allow_to_make_quality_inspection_after_purchase_or_delivery"
):
return
if not row.quality_inspection:
msg = _("Row #{0}: Quality Inspection is required for Item {1}").format(
row.idx, frappe.bold(row.item_code)
)
if self.docstatus == 1:
frappe.throw(msg, title=_("Inspection Required"), exc=QualityInspectionRequiredError)
else:
frappe.msgprint(msg, title=_("Inspection Required"), indicator="blue")
def validate_qi_submission(self, row):
"""Check if QI is submitted on row level, during submission"""
action = frappe.get_single_value("Stock Settings", "action_if_quality_inspection_is_not_submitted")
qa_docstatus = frappe.db.get_value("Quality Inspection", row.quality_inspection, "docstatus")
if qa_docstatus != 1:
link = frappe.utils.get_link_to_form("Quality Inspection", row.quality_inspection)
msg = _("Row #{0}: Quality Inspection {1} is not submitted for the item: {2}").format(
row.idx, link, row.item_code
)
if action == "Stop":
frappe.throw(msg, title=_("Inspection Submission"), exc=QualityInspectionNotSubmittedError)
else:
frappe.msgprint(msg, alert=True, indicator="orange")
def validate_qi_rejection(self, row):
"""Check if QI is rejected on row level, during submission"""
action = frappe.get_single_value("Stock Settings", "action_if_quality_inspection_is_rejected")
qa_status = frappe.db.get_value("Quality Inspection", row.quality_inspection, "status")
if qa_status == "Rejected":
link = frappe.utils.get_link_to_form("Quality Inspection", row.quality_inspection)
msg = _("Row #{0}: Quality Inspection {1} was rejected for item {2}").format(
row.idx, link, row.item_code
)
if action == "Stop":
frappe.throw(msg, title=_("Inspection Rejected"), exc=QualityInspectionRejectedError)
else:
frappe.msgprint(msg, alert=True, indicator="orange")
def update_blanket_order(self):
blanket_orders = list(set([d.blanket_order for d in self.items if d.blanket_order]))
for blanket_order in blanket_orders:
frappe.get_doc("Blanket Order", blanket_order).update_ordered_qty()
def validate_customer_provided_item(self):
for d in self.get("items"):
# Customer Provided parts will have zero valuation rate
if frappe.get_cached_value("Item", d.item_code, "is_customer_provided_item"):
d.allow_zero_valuation_rate = 1
def set_rate_of_stock_uom(self):
if self.doctype in [
"Purchase Receipt",
"Purchase Invoice",
"Purchase Order",
"Sales Invoice",
"Sales Order",
"Delivery Note",
"Quotation",
]:
for d in self.get("items"):
d.stock_uom_rate = d.rate / (d.conversion_factor or 1)
def validate_internal_transfer(self):
if self.doctype in ("Sales Invoice", "Delivery Note", "Purchase Invoice", "Purchase Receipt"):
if self.is_internal_transfer():
self.validate_in_transit_warehouses()
self.validate_multi_currency()
self.validate_packed_items()
if self.get("is_internal_supplier") and self.docstatus == 1:
self.validate_internal_transfer_qty()
else:
self.validate_internal_transfer_warehouse()
def validate_internal_transfer_warehouse(self):
for row in self.items:
if row.get("target_warehouse"):
row.target_warehouse = None
if row.get("from_warehouse"):
row.from_warehouse = None
def validate_in_transit_warehouses(self):
if (self.doctype == "Sales Invoice" and self.get("update_stock")) or self.doctype == "Delivery Note":
for item in self.get("items"):
if not item.target_warehouse:
frappe.throw(
_("Row {0}: Target Warehouse is mandatory for internal transfers").format(item.idx)
)
if (
self.doctype == "Purchase Invoice" and self.get("update_stock")
) or self.doctype == "Purchase Receipt":
for item in self.get("items"):
if not item.from_warehouse:
frappe.throw(
_("Row {0}: From Warehouse is mandatory for internal transfers").format(item.idx)
)
def validate_multi_currency(self):
if self.currency != self.company_currency:
frappe.throw(_("Internal transfers can only be done in company's default currency"))
def validate_packed_items(self):
if self.doctype in ("Sales Invoice", "Delivery Note Item") and self.get("packed_items"):
frappe.throw(_("Packed Items cannot be transferred internally"))
def validate_internal_transfer_qty(self):
if self.doctype not in ["Purchase Invoice", "Purchase Receipt"]:
return
self.__inter_company_reference = (
self.get("inter_company_reference")
if self.doctype == "Purchase Invoice"
else self.get("inter_company_invoice_reference")
)
item_wise_transfer_qty = self.get_item_wise_inter_transfer_qty()
if not item_wise_transfer_qty:
return
item_wise_received_qty = self.get_item_wise_inter_received_qty()
precision = frappe.get_precision(self.doctype + " Item", "qty")
over_receipt_allowance = frappe.get_single_value("Stock Settings", "over_delivery_receipt_allowance")
parent_doctype = {
"Purchase Receipt": "Delivery Note",
"Purchase Invoice": "Sales Invoice",
}.get(self.doctype)
for key, transferred_qty in item_wise_transfer_qty.items():
recevied_qty = flt(item_wise_received_qty.get(key), precision)
if over_receipt_allowance:
transferred_qty = transferred_qty + flt(
transferred_qty * over_receipt_allowance / 100, precision
)
if recevied_qty > flt(transferred_qty, precision):
frappe.throw(
_("For Item {0} cannot be received more than {1} qty against the {2} {3}").format(
bold(key[1]),
bold(flt(transferred_qty, precision)),
bold(parent_doctype),
get_link_to_form(parent_doctype, self.__inter_company_reference),
)
)
def get_item_wise_inter_transfer_qty(self):
parent_doctype = {
"Purchase Receipt": "Delivery Note",
"Purchase Invoice": "Sales Invoice",
}.get(self.doctype)
child_doctype = parent_doctype + " Item"
parent_tab = frappe.qb.DocType(parent_doctype)
child_tab = frappe.qb.DocType(child_doctype)
query = (
frappe.qb.from_(parent_doctype)
.inner_join(child_tab)
.on(child_tab.parent == parent_tab.name)
.select(
child_tab.name,
child_tab.item_code,
child_tab.qty,
)
.where((parent_tab.name == self.__inter_company_reference) & (parent_tab.docstatus == 1))
)
data = query.run(as_dict=True)
item_wise_transfer_qty = defaultdict(float)
for row in data:
item_wise_transfer_qty[(row.name, row.item_code)] += flt(row.qty)
return item_wise_transfer_qty
def get_item_wise_inter_received_qty(self):
child_doctype = self.doctype + " Item"
parent_tab = frappe.qb.DocType(self.doctype)
child_tab = frappe.qb.DocType(child_doctype)
query = (
frappe.qb.from_(self.doctype)
.inner_join(child_tab)
.on(child_tab.parent == parent_tab.name)
.select(
child_tab.item_code,
child_tab.qty,
)
.where(parent_tab.docstatus == 1)
)
if self.doctype == "Purchase Invoice":
query = query.select(
child_tab.sales_invoice_item.as_("name"),
)
query = query.where(
parent_tab.inter_company_invoice_reference == self.inter_company_invoice_reference
)
else:
query = query.select(
child_tab.delivery_note_item.as_("name"),
)
query = query.where(parent_tab.inter_company_reference == self.inter_company_reference)
data = query.run(as_dict=True)
item_wise_transfer_qty = defaultdict(float)
for row in data:
item_wise_transfer_qty[(row.name, row.item_code)] += flt(row.qty)
return item_wise_transfer_qty
def validate_putaway_capacity(self):
# if over receipt is attempted while 'apply putaway rule' is disabled
# and if rule was applied on the transaction, validate it.
from erpnext.stock.doctype.putaway_rule.putaway_rule import get_available_putaway_capacity
valid_doctype = self.doctype in (
"Purchase Receipt",
"Stock Entry",
"Purchase Invoice",
"Stock Reconciliation",
)
if not frappe.get_all("Putaway Rule", limit=1):
return
if self.doctype == "Purchase Invoice" and self.get("update_stock") == 0:
valid_doctype = False
if valid_doctype:
rule_map = defaultdict(dict)
for item in self.get("items"):
warehouse_field = "t_warehouse" if self.doctype == "Stock Entry" else "warehouse"
rule = frappe.db.get_value(
"Putaway Rule",
{"item_code": item.get("item_code"), "warehouse": item.get(warehouse_field)},
["name", "disable"],
as_dict=True,
)
if rule:
if rule.get("disabled"):
continue # dont validate for disabled rule
if self.doctype == "Stock Reconciliation":
stock_qty = flt(item.qty)
else:
stock_qty = (
flt(item.transfer_qty) if self.doctype == "Stock Entry" else flt(item.stock_qty)
)
rule_name = rule.get("name")
if not rule_map[rule_name]:
rule_map[rule_name]["warehouse"] = item.get(warehouse_field)
rule_map[rule_name]["item"] = item.get("item_code")
rule_map[rule_name]["qty_put"] = 0
rule_map[rule_name]["capacity"] = get_available_putaway_capacity(rule_name)
rule_map[rule_name]["qty_put"] += flt(stock_qty)
for rule, values in rule_map.items():
if flt(values["qty_put"]) > flt(values["capacity"]):
message = self.prepare_over_receipt_message(rule, values)
frappe.throw(msg=message, title=_("Over Receipt"))
def prepare_over_receipt_message(self, rule, values):
message = _("{0} qty of Item {1} is being received into Warehouse {2} with capacity {3}.").format(
frappe.bold(values["qty_put"]),
frappe.bold(values["item"]),
frappe.bold(values["warehouse"]),
frappe.bold(values["capacity"]),
)
message += "<br><br>"
rule_link = frappe.utils.get_link_to_form("Putaway Rule", rule)
message += _("Please adjust the qty or edit {0} to proceed.").format(rule_link)
return message
def repost_future_sle_and_gle(self, force=False, via_landed_cost_voucher=False):
args = frappe._dict(
{
"posting_date": self.posting_date,
"posting_time": self.posting_time,
"voucher_type": self.doctype,
"voucher_no": self.name,
"company": self.company,
"via_landed_cost_voucher": via_landed_cost_voucher,
}
)
if self.docstatus == 2:
force = True
if force or future_sle_exists(args) or repost_required_for_queue(self):
item_based_reposting = frappe.get_single_value("Stock Reposting Settings", "item_based_reposting")
if item_based_reposting:
create_item_wise_repost_entries(
voucher_type=self.doctype,
voucher_no=self.name,
via_landed_cost_voucher=via_landed_cost_voucher,
)
else:
create_repost_item_valuation_entry(args)
def add_gl_entry(
self,
gl_entries,
account,
cost_center,
debit,
credit,
remarks,
against_account,
debit_in_account_currency=None,
credit_in_account_currency=None,
account_currency=None,
project=None,
voucher_detail_no=None,
item=None,
posting_date=None,
):
gl_entry = {
"account": account,
"cost_center": cost_center,
"debit": debit,
"credit": credit,
"against": against_account,
"remarks": remarks,
}
if voucher_detail_no:
gl_entry.update({"voucher_detail_no": voucher_detail_no})
if debit_in_account_currency:
gl_entry.update({"debit_in_account_currency": debit_in_account_currency})
if credit_in_account_currency:
gl_entry.update({"credit_in_account_currency": credit_in_account_currency})
if posting_date:
gl_entry.update({"posting_date": posting_date})
gl_entries.append(self.get_gl_dict(gl_entry, item=item))
@frappe.whitelist()
def show_accounting_ledger_preview(company, doctype, docname):
filters = frappe._dict(company=company, include_dimensions=1)
doc = frappe.get_lazy_doc(doctype, docname)
doc.run_method("before_gl_preview")
gl_columns, gl_data = get_accounting_ledger_preview(doc, filters)
frappe.db.rollback()
return {"gl_columns": gl_columns, "gl_data": gl_data}
@frappe.whitelist()
def show_stock_ledger_preview(company, doctype, docname):
filters = frappe._dict(company=company)
doc = frappe.get_lazy_doc(doctype, docname)
doc.run_method("before_sl_preview")
sl_columns, sl_data = get_stock_ledger_preview(doc, filters)
frappe.db.rollback()
return {
"sl_columns": sl_columns,
"sl_data": sl_data,
}
def get_accounting_ledger_preview(doc, filters):
from erpnext.accounts.report.general_ledger.general_ledger import get_columns as get_gl_columns
gl_columns, gl_data = [], []
fields = [
"posting_date",
"account",
"debit",
"credit",
"against",
"party_type",
"party",
"cost_center",
"against_voucher_type",
"against_voucher",
]
doc.docstatus = 1
if doc.get("update_stock") or doc.doctype in ("Purchase Receipt", "Delivery Note"):
doc.update_stock_ledger()
doc.make_gl_entries()
columns = get_gl_columns(filters)
gl_entries = get_gl_entries_for_preview(doc.doctype, doc.name, fields)
gl_columns = get_columns(columns, fields)
gl_data = get_data(fields, gl_entries)
return gl_columns, gl_data
def get_stock_ledger_preview(doc, filters):
from erpnext.stock.report.stock_ledger.stock_ledger import get_columns as get_sl_columns
sl_columns, sl_data = [], []
fields = [
"item_code",
"stock_uom",
"actual_qty",
"qty_after_transaction",
"warehouse",
"incoming_rate",
"valuation_rate",
"stock_value",
"stock_value_difference",
]
columns_fields = [
"item_code",
"stock_uom",
"in_qty",
"out_qty",
"qty_after_transaction",
"warehouse",
"incoming_rate",
"in_out_rate",
"stock_value",
"stock_value_difference",
]
if doc.get("update_stock") or doc.doctype in ("Purchase Receipt", "Delivery Note"):
doc.docstatus = 1
doc.update_stock_ledger()
columns = get_sl_columns(filters)
sl_entries = get_sl_entries_for_preview(doc.doctype, doc.name, fields)
sl_columns = get_columns(columns, columns_fields)
sl_data = get_data(columns_fields, sl_entries)
return sl_columns, sl_data
def get_sl_entries_for_preview(doctype, docname, fields):
sl_entries = frappe.get_all(
"Stock Ledger Entry", filters={"voucher_type": doctype, "voucher_no": docname}, fields=fields
)
for entry in sl_entries:
if entry.actual_qty > 0:
entry["in_qty"] = entry.actual_qty
entry["out_qty"] = 0
else:
entry["out_qty"] = abs(entry.actual_qty)
entry["in_qty"] = 0
entry["in_out_rate"] = entry["valuation_rate"]
return sl_entries
def get_gl_entries_for_preview(doctype, docname, fields):
return frappe.get_all("GL Entry", filters={"voucher_type": doctype, "voucher_no": docname}, fields=fields)
def get_columns(raw_columns, fields):
return [
{"name": d.get("label"), "editable": False, "width": 110}
for d in raw_columns
if not d.get("hidden") and d.get("fieldname") in fields
]
def get_data(raw_columns, raw_data):
datatable_data = []
for row in raw_data:
data_row = []
for column in raw_columns:
data_row.append(row.get(column) or "")
datatable_data.append(data_row)
return datatable_data
def repost_required_for_queue(doc: StockController) -> bool:
"""check if stock document contains repeated item-warehouse with queue based valuation.
if queue exists for repeated items then SLEs need to reprocessed in background again.
"""
consuming_sles = frappe.db.get_all(
"Stock Ledger Entry",
filters={
"voucher_type": doc.doctype,
"voucher_no": doc.name,
"actual_qty": ("<", 0),
"is_cancelled": 0,
},
fields=["item_code", "warehouse", "stock_queue"],
)
item_warehouses = [(sle.item_code, sle.warehouse) for sle in consuming_sles]
unique_item_warehouses = set(item_warehouses)
if len(unique_item_warehouses) == len(item_warehouses):
return False
for sle in consuming_sles:
if sle.stock_queue != "[]": # using FIFO/LIFO valuation
return True
return False
@frappe.whitelist()
def check_item_quality_inspection(doctype, items):
if isinstance(items, str):
items = json.loads(items)
inspection_fieldname_map = {
"Purchase Receipt": "inspection_required_before_purchase",
"Purchase Invoice": "inspection_required_before_purchase",
"Subcontracting Receipt": "inspection_required_before_purchase",
"Sales Invoice": "inspection_required_before_delivery",
"Delivery Note": "inspection_required_before_delivery",
}
items_to_remove = []
for item in items:
if not frappe.db.get_value("Item", item.get("item_code"), inspection_fieldname_map.get(doctype)):
items_to_remove.append(item)
items = [item for item in items if item not in items_to_remove]
return items
@frappe.whitelist()
def make_quality_inspections(doctype, docname, items, inspection_type):
if isinstance(items, str):
items = json.loads(items)
inspections = []
for item in items:
if flt(item.get("sample_size")) > flt(item.get("qty")):
frappe.throw(
_(
"{item_name}'s Sample Size ({sample_size}) cannot be greater than the Accepted Quantity ({accepted_quantity})"
).format(
item_name=item.get("item_name"),
sample_size=item.get("sample_size"),
accepted_quantity=item.get("qty"),
)
)
quality_inspection = frappe.get_doc(
{
"doctype": "Quality Inspection",
"inspection_type": inspection_type,
"inspected_by": frappe.session.user,
"reference_type": doctype,
"reference_name": docname,
"item_code": item.get("item_code"),
"description": item.get("description"),
"sample_size": flt(item.get("sample_size")),
"item_serial_no": item.get("serial_no").split("\n")[0] if item.get("serial_no") else None,
"batch_no": item.get("batch_no"),
}
).insert()
quality_inspection.save()
inspections.append(quality_inspection.name)
return inspections
def is_reposting_pending():
return frappe.db.exists(
"Repost Item Valuation", {"docstatus": 1, "status": ["in", ["Queued", "In Progress"]]}
)
def future_sle_exists(args, sl_entries=None, allow_force_reposting=True):
from erpnext.stock.utils import get_combine_datetime
if allow_force_reposting and frappe.get_single_value(
"Stock Reposting Settings", "do_reposting_for_each_stock_transaction"
):
return True
key = (args.voucher_type, args.voucher_no)
if not hasattr(frappe.local, "future_sle"):
frappe.local.future_sle = {}
if validate_future_sle_not_exists(args, key, sl_entries):
return False
elif get_cached_data(args, key):
return True
if not sl_entries:
sl_entries = get_sle_entries_against_voucher(args)
if not sl_entries:
return
or_conditions = get_conditions_to_validate_future_sle(sl_entries)
args["posting_datetime"] = get_combine_datetime(args["posting_date"], args["posting_time"])
data = frappe.db.sql(
"""
select item_code, warehouse, count(name) as total_row
from `tabStock Ledger Entry`
where
({})
and posting_datetime >= %(posting_datetime)s
and voucher_no != %(voucher_no)s
and is_cancelled = 0
GROUP BY
item_code, warehouse
""".format(" or ".join(or_conditions)),
args,
as_dict=1,
)
for d in data:
frappe.local.future_sle[key][(d.item_code, d.warehouse)] = d.total_row
return len(data)
def validate_future_sle_not_exists(args, key, sl_entries=None):
item_key = ""
if args.get("item_code"):
item_key = (args.get("item_code"), args.get("warehouse"))
if not sl_entries and hasattr(frappe.local, "future_sle"):
if key not in frappe.local.future_sle:
return False
if not frappe.local.future_sle.get(key) or (
item_key and item_key not in frappe.local.future_sle.get(key)
):
return True
def get_cached_data(args, key):
if key not in frappe.local.future_sle:
frappe.local.future_sle[key] = frappe._dict({})
if args.get("item_code"):
item_key = (args.get("item_code"), args.get("warehouse"))
count = frappe.local.future_sle[key].get(item_key)
return True if (count or count == 0) else False
else:
return frappe.local.future_sle[key]
def get_sle_entries_against_voucher(args):
return frappe.get_all(
"Stock Ledger Entry",
filters={"voucher_type": args.voucher_type, "voucher_no": args.voucher_no},
fields=["item_code", "warehouse"],
order_by="creation asc",
)
def get_conditions_to_validate_future_sle(sl_entries):
warehouse_items_map = {}
for entry in sl_entries:
if entry.warehouse not in warehouse_items_map:
warehouse_items_map[entry.warehouse] = set()
warehouse_items_map[entry.warehouse].add(entry.item_code)
or_conditions = []
for warehouse, items in warehouse_items_map.items():
or_conditions.append(
f"""warehouse = {frappe.db.escape(warehouse)}
and item_code in ({", ".join(frappe.db.escape(item) for item in items)})"""
)
return or_conditions
def create_repost_item_valuation_entry(args):
args = frappe._dict(args)
repost_entry = frappe.new_doc("Repost Item Valuation")
repost_entry.based_on = args.based_on
if not args.based_on:
repost_entry.based_on = "Transaction" if args.voucher_no else "Item and Warehouse"
repost_entry.voucher_type = args.voucher_type
repost_entry.voucher_no = args.voucher_no
repost_entry.item_code = args.item_code
repost_entry.warehouse = args.warehouse
repost_entry.posting_date = args.posting_date
repost_entry.posting_time = args.posting_time
repost_entry.company = args.company
repost_entry.allow_zero_rate = args.allow_zero_rate
repost_entry.flags.ignore_links = True
repost_entry.flags.ignore_permissions = True
repost_entry.via_landed_cost_voucher = args.via_landed_cost_voucher
repost_entry.save()
repost_entry.submit()
def create_item_wise_repost_entries(
voucher_type, voucher_no, allow_zero_rate=False, via_landed_cost_voucher=False
):
"""Using a voucher create repost item valuation records for all item-warehouse pairs."""
stock_ledger_entries = get_items_to_be_repost(voucher_type, voucher_no)
distinct_item_warehouses = set()
repost_entries = []
for sle in stock_ledger_entries:
item_wh = (sle.item_code, sle.warehouse)
if item_wh in distinct_item_warehouses:
continue
distinct_item_warehouses.add(item_wh)
repost_entry = frappe.new_doc("Repost Item Valuation")
repost_entry.based_on = "Item and Warehouse"
repost_entry.item_code = sle.item_code
repost_entry.warehouse = sle.warehouse
repost_entry.posting_date = sle.posting_date
repost_entry.posting_time = sle.posting_time
repost_entry.allow_zero_rate = allow_zero_rate
repost_entry.flags.ignore_links = True
repost_entry.flags.ignore_permissions = True
repost_entry.via_landed_cost_voucher = via_landed_cost_voucher
repost_entry.submit()
repost_entries.append(repost_entry)
return repost_entries
def make_bundle_for_material_transfer(**kwargs):
if isinstance(kwargs, dict):
kwargs = frappe._dict(kwargs)
bundle_doc = frappe.get_doc("Serial and Batch Bundle", kwargs.serial_and_batch_bundle)
if not kwargs.type_of_transaction:
kwargs.type_of_transaction = "Inward"
bundle_doc = frappe.copy_doc(bundle_doc)
bundle_doc.docstatus = 0
bundle_doc.warehouse = kwargs.warehouse
bundle_doc.type_of_transaction = kwargs.type_of_transaction
bundle_doc.voucher_type = kwargs.voucher_type
bundle_doc.voucher_no = "" if kwargs.is_new or kwargs.docstatus == 2 else kwargs.voucher_no
bundle_doc.is_cancelled = 0
qty = 0
if (
len(bundle_doc.entries) == 1
and flt(kwargs.qty) < flt(bundle_doc.total_qty)
and not bundle_doc.has_serial_no
):
qty = kwargs.qty
for row in bundle_doc.entries:
row.is_outward = 0
row.qty = abs(qty or row.qty)
row.stock_value_difference = abs(row.stock_value_difference)
if kwargs.type_of_transaction == "Outward":
row.qty *= -1
row.stock_value_difference *= row.stock_value_difference
row.is_outward = 1
row.warehouse = kwargs.warehouse
bundle_doc.calculate_qty_and_amount()
bundle_doc.flags.ignore_permissions = True
bundle_doc.flags.ignore_validate = True
if kwargs.do_not_submit:
bundle_doc.save(ignore_permissions=True)
else:
bundle_doc.submit()
return bundle_doc.name