1618 lines
50 KiB
Python
1618 lines
50 KiB
Python
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
|
|
# License: GNU General Public License v3. See license.txt
|
|
|
|
|
|
import json
|
|
import typing
|
|
from functools import WRAPPER_ASSIGNMENTS, wraps
|
|
|
|
import frappe
|
|
from frappe import _, throw
|
|
from frappe.model import child_table_fields, default_fields
|
|
from frappe.model.document import Document
|
|
from frappe.model.meta import get_field_precision
|
|
from frappe.model.utils import get_fetch_values
|
|
from frappe.query_builder.functions import IfNull, Sum
|
|
from frappe.utils import add_days, add_months, cint, cstr, flt, getdate, parse_json
|
|
|
|
import erpnext
|
|
from erpnext import get_company_currency
|
|
from erpnext.accounts.doctype.pricing_rule.pricing_rule import (
|
|
get_pricing_rule_for_item,
|
|
set_transaction_type,
|
|
)
|
|
from erpnext.setup.doctype.brand.brand import get_brand_defaults
|
|
from erpnext.setup.doctype.item_group.item_group import get_item_group_defaults
|
|
from erpnext.setup.utils import get_exchange_rate
|
|
from erpnext.stock.doctype.item.item import get_item_defaults, get_uom_conv_factor
|
|
from erpnext.stock.doctype.item_manufacturer.item_manufacturer import get_item_manufacturer_part_no
|
|
from erpnext.stock.doctype.price_list.price_list import get_price_list_details
|
|
|
|
ItemDetails = frappe._dict
|
|
ItemDetailsCtx = frappe._dict
|
|
ItemPriceCtx = frappe._dict
|
|
|
|
sales_doctypes = ["Quotation", "Sales Order", "Delivery Note", "Sales Invoice", "POS Invoice"]
|
|
purchase_doctypes = [
|
|
"Material Request",
|
|
"Supplier Quotation",
|
|
"Purchase Order",
|
|
"Purchase Receipt",
|
|
"Purchase Invoice",
|
|
]
|
|
|
|
|
|
def _preprocess_ctx(ctx):
|
|
if not ctx.price_list:
|
|
ctx.price_list = ctx.selling_price_list or ctx.buying_price_list
|
|
|
|
if not ctx.item_code and ctx.barcode:
|
|
ctx.item_code = get_item_code(barcode=ctx.barcode)
|
|
elif not ctx.item_code and ctx.serial_no:
|
|
ctx.item_code = get_item_code(serial_no=ctx.serial_no)
|
|
|
|
set_transaction_type(ctx)
|
|
|
|
|
|
@frappe.whitelist()
|
|
@erpnext.normalize_ctx_input(ItemDetailsCtx)
|
|
def get_item_details(
|
|
ctx: ItemDetailsCtx, doc=None, for_validate=False, overwrite_warehouse=True
|
|
) -> ItemDetails:
|
|
"""
|
|
ctx = {
|
|
"item_code": "",
|
|
"warehouse": None,
|
|
"customer": "",
|
|
"conversion_rate": 1.0,
|
|
"selling_price_list": None,
|
|
"price_list_currency": None,
|
|
"plc_conversion_rate": 1.0,
|
|
"doctype": "",
|
|
"name": "",
|
|
"supplier": None,
|
|
"transaction_date": None,
|
|
"conversion_rate": 1.0,
|
|
"buying_price_list": None,
|
|
"is_subcontracted": 0/1,
|
|
"ignore_pricing_rule": 0/1
|
|
"project": ""
|
|
"set_warehouse": ""
|
|
}
|
|
"""
|
|
_preprocess_ctx(ctx)
|
|
for_validate = parse_json(for_validate)
|
|
overwrite_warehouse = parse_json(overwrite_warehouse)
|
|
item = frappe.get_cached_doc("Item", ctx.item_code)
|
|
validate_item_details(ctx, item)
|
|
|
|
if isinstance(doc, str):
|
|
doc = json.loads(doc)
|
|
|
|
if doc:
|
|
ctx.transaction_date = doc.get("transaction_date") or doc.get("posting_date")
|
|
|
|
if doc.get("doctype") == "Purchase Invoice":
|
|
ctx.bill_date = doc.get("bill_date")
|
|
|
|
out: ItemDetails = get_basic_details(ctx, item, overwrite_warehouse)
|
|
|
|
get_item_tax_template(ctx, item, out)
|
|
out.item_tax_rate = get_item_tax_map(
|
|
doc=doc or ctx,
|
|
tax_template=out.item_tax_template or ctx.item_tax_template,
|
|
as_json=True,
|
|
)
|
|
|
|
get_party_item_code(ctx, item, out)
|
|
|
|
if ctx.doctype in ["Sales Order", "Quotation"]:
|
|
set_valuation_rate(out, ctx)
|
|
|
|
update_party_blanket_order(ctx, out)
|
|
|
|
# Never try to find a customer price if customer is set in these Doctype
|
|
current_customer = ctx.customer
|
|
if ctx.doctype in ["Purchase Order", "Purchase Receipt", "Purchase Invoice"]:
|
|
ctx.customer = None
|
|
|
|
out.update(get_price_list_rate(ctx, item))
|
|
|
|
ctx.customer = current_customer
|
|
|
|
if ctx.customer and cint(ctx.is_pos):
|
|
out.update(get_pos_profile_item_details_(ctx, ctx.company, update_data=True))
|
|
|
|
if item.is_stock_item:
|
|
update_bin_details(ctx, out, doc)
|
|
|
|
# update ctx with out, if key or value not exists
|
|
for key, value in out.items():
|
|
if ctx.get(key) is None:
|
|
ctx[key] = value
|
|
|
|
data = get_pricing_rule_for_item(ctx, doc=doc, for_validate=for_validate)
|
|
|
|
out.update(data)
|
|
|
|
if (
|
|
frappe.get_single_value("Stock Settings", "auto_create_serial_and_batch_bundle_for_outward")
|
|
and not ctx.get("serial_and_batch_bundle")
|
|
and (ctx.get("use_serial_batch_fields") or ctx.get("doctype") == "POS Invoice")
|
|
):
|
|
update_stock(ctx, out, doc)
|
|
|
|
if ctx.transaction_date and item.lead_time_days:
|
|
out.schedule_date = out.lead_time_date = add_days(ctx.transaction_date, item.lead_time_days)
|
|
|
|
if ctx.is_subcontracted:
|
|
out.bom = ctx.bom or get_default_bom(ctx.item_code)
|
|
|
|
get_gross_profit(out)
|
|
if ctx.doctype == "Material Request":
|
|
out.rate = ctx.rate or out.price_list_rate
|
|
out.amount = flt(ctx.qty) * flt(out.rate)
|
|
|
|
out = remove_standard_fields(out)
|
|
return out
|
|
|
|
|
|
def remove_standard_fields(out: ItemDetails):
|
|
for key in child_table_fields + default_fields:
|
|
out.pop(key, None)
|
|
return out
|
|
|
|
|
|
def set_valuation_rate(out: ItemDetails | dict, ctx: ItemDetailsCtx):
|
|
if frappe.db.exists("Product Bundle", {"name": ctx.item_code, "disabled": 0}, cache=True):
|
|
valuation_rate = 0.0
|
|
bundled_items = frappe.get_doc("Product Bundle", ctx.item_code)
|
|
|
|
for bundle_item in bundled_items.items:
|
|
valuation_rate += flt(
|
|
get_valuation_rate(bundle_item.item_code, ctx.company, out.get("warehouse")).get(
|
|
"valuation_rate"
|
|
)
|
|
* bundle_item.qty
|
|
)
|
|
|
|
out.update({"valuation_rate": valuation_rate})
|
|
|
|
else:
|
|
out.update(get_valuation_rate(ctx.item_code, ctx.company, out.get("warehouse")))
|
|
|
|
|
|
def update_stock(ctx, out, doc=None):
|
|
from erpnext.stock.doctype.batch.batch import get_available_batches
|
|
from erpnext.stock.doctype.serial_no.serial_no import get_serial_nos_for_outward
|
|
|
|
if (
|
|
(
|
|
ctx.get("doctype") in ["Delivery Note", "POS Invoice"]
|
|
or (ctx.get("doctype") == "Sales Invoice" and ctx.get("update_stock"))
|
|
)
|
|
and out.warehouse
|
|
and out.stock_qty > 0
|
|
):
|
|
if doc and isinstance(doc, dict):
|
|
doc = frappe._dict(doc)
|
|
|
|
kwargs = frappe._dict(
|
|
{
|
|
"item_code": ctx.item_code,
|
|
"warehouse": ctx.warehouse,
|
|
"based_on": frappe.get_single_value("Stock Settings", "pick_serial_and_batch_based_on"),
|
|
"sabb_voucher_no": doc.get("name"),
|
|
"sabb_voucher_detail_no": ctx.child_docname,
|
|
"sabb_voucher_type": ctx.doctype,
|
|
}
|
|
)
|
|
|
|
if ctx.get("doctype") == "Delivery Note":
|
|
kwargs["against_sales_order"] = ctx.get("against_sales_order")
|
|
|
|
if ctx.get("ignore_serial_nos"):
|
|
kwargs["ignore_serial_nos"] = ctx.get("ignore_serial_nos")
|
|
|
|
qty = out.stock_qty
|
|
batches = []
|
|
if out.has_batch_no and not ctx.get("batch_no"):
|
|
batches = get_available_batches(kwargs)
|
|
if doc:
|
|
filter_batches(batches, doc)
|
|
|
|
for batch_no, batch_qty in batches.items():
|
|
rate = get_batch_based_item_price(
|
|
{"price_list": doc.get("selling_price_list"), "uom": out.uom, "batch_no": batch_no},
|
|
out.item_code,
|
|
)
|
|
if batch_qty >= qty:
|
|
out.update({"batch_no": batch_no, "actual_batch_qty": qty})
|
|
if rate:
|
|
out.update({"rate": rate, "price_list_rate": rate})
|
|
break
|
|
else:
|
|
qty -= batch_qty
|
|
|
|
out.update({"batch_no": batch_no, "actual_batch_qty": batch_qty})
|
|
if rate:
|
|
out.update({"rate": rate, "price_list_rate": rate})
|
|
|
|
if out.has_serial_no and out.has_batch_no and has_incorrect_serial_nos(ctx, out):
|
|
kwargs["batches"] = [ctx.get("batch_no")] if ctx.get("batch_no") else [out.get("batch_no")]
|
|
serial_nos = get_serial_nos_for_outward(kwargs)
|
|
serial_nos = get_filtered_serial_nos(serial_nos, doc)
|
|
|
|
out["serial_no"] = "\n".join(serial_nos[: cint(out.stock_qty)])
|
|
|
|
elif out.has_serial_no and not ctx.get("serial_no"):
|
|
serial_nos = get_serial_nos_for_outward(kwargs)
|
|
serial_nos = get_filtered_serial_nos(serial_nos, doc)
|
|
|
|
out["serial_no"] = "\n".join(serial_nos[: cint(out.stock_qty)])
|
|
|
|
|
|
def has_incorrect_serial_nos(ctx, out):
|
|
from erpnext.stock.doctype.serial_no.serial_no import get_serial_nos
|
|
|
|
if not ctx.get("serial_no"):
|
|
return True
|
|
|
|
serial_nos = get_serial_nos(ctx.get("serial_no"))
|
|
if len(serial_nos) != out.get("stock_qty"):
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
def filter_batches(batches, doc):
|
|
for row in doc.get("items"):
|
|
if row.get("batch_no") in batches:
|
|
batches[row.get("batch_no")] -= row.get("qty")
|
|
if batches[row.get("batch_no")] <= 0:
|
|
del batches[row.get("batch_no")]
|
|
|
|
|
|
def get_filtered_serial_nos(serial_nos, doc):
|
|
from erpnext.stock.doctype.serial_no.serial_no import get_serial_nos
|
|
|
|
for row in doc.get("items"):
|
|
if row.get("serial_no"):
|
|
for serial_no in get_serial_nos(row.get("serial_no")):
|
|
if serial_no in serial_nos:
|
|
serial_nos.remove(serial_no)
|
|
|
|
return serial_nos
|
|
|
|
|
|
def update_bin_details(ctx: ItemDetailsCtx, out: ItemDetails, doc):
|
|
if ctx.doctype == "Material Request" and ctx.material_request_type == "Material Transfer":
|
|
out.update(get_bin_details(ctx.item_code, ctx.from_warehouse))
|
|
|
|
elif out.get("warehouse"):
|
|
company = ctx.company if (doc and doc.get("doctype") == "Purchase Order") else None
|
|
|
|
# calculate company_total_stock only for po
|
|
bin_details = get_bin_details(ctx.item_code, out.warehouse, company, include_child_warehouses=True)
|
|
|
|
out.update(bin_details)
|
|
|
|
|
|
def get_item_code(barcode=None, serial_no=None):
|
|
if barcode:
|
|
item_code = frappe.db.get_value("Item Barcode", {"barcode": barcode}, fieldname=["parent"])
|
|
if not item_code:
|
|
frappe.throw(_("No Item with Barcode {0}").format(barcode))
|
|
elif serial_no:
|
|
item_code = frappe.db.get_value("Serial No", serial_no, "item_code")
|
|
if not item_code:
|
|
frappe.throw(_("No Item with Serial No {0}").format(serial_no))
|
|
|
|
return item_code
|
|
|
|
|
|
def validate_item_details(ctx: ItemDetailsCtx, item):
|
|
if not ctx.company:
|
|
throw(_("Please specify Company"))
|
|
|
|
from erpnext.stock.doctype.item.item import validate_end_of_life
|
|
|
|
validate_end_of_life(item.name, item.end_of_life, item.disabled)
|
|
|
|
if cint(item.has_variants):
|
|
msg = f"Item {item.name} is a template, please select one of its variants"
|
|
|
|
throw(_(msg), title=_("Template Item Selected"))
|
|
|
|
elif ctx.transaction_type == "buying" and ctx.doctype != "Material Request":
|
|
if ctx.is_subcontracted:
|
|
if ctx.is_old_subcontracting_flow:
|
|
if item.is_sub_contracted_item != 1:
|
|
throw(_("Item {0} must be a Sub-contracted Item").format(item.name))
|
|
else:
|
|
if item.is_stock_item:
|
|
throw(_("Item {0} must be a Non-Stock Item").format(item.name))
|
|
|
|
|
|
def get_basic_details(ctx: ItemDetailsCtx, item, overwrite_warehouse=True) -> ItemDetails:
|
|
"""
|
|
:param ctx: {
|
|
"item_code": "",
|
|
"warehouse": None,
|
|
"customer": "",
|
|
"conversion_rate": 1.0,
|
|
"selling_price_list": None,
|
|
"price_list_currency": None,
|
|
"price_list_uom_dependant": None,
|
|
"plc_conversion_rate": 1.0,
|
|
"doctype": "",
|
|
"name": "",
|
|
"supplier": None,
|
|
"transaction_date": None,
|
|
"conversion_rate": 1.0,
|
|
"buying_price_list": None,
|
|
"is_subcontracted": 0/1,
|
|
"ignore_pricing_rule": 0/1
|
|
"project": "",
|
|
barcode: "",
|
|
serial_no: "",
|
|
currency: "",
|
|
update_stock: "",
|
|
price_list: "",
|
|
company: "",
|
|
order_type: "",
|
|
is_pos: "",
|
|
project: "",
|
|
qty: "",
|
|
stock_qty: "",
|
|
conversion_factor: "",
|
|
against_blanket_order: 0/1
|
|
}
|
|
:param item: `item_code` of Item object
|
|
:return: frappe._dict
|
|
"""
|
|
|
|
if not item:
|
|
item = frappe.get_cached_doc("Item", ctx.item_code)
|
|
|
|
if item.variant_of and not item.taxes and frappe.db.exists("Item Tax", {"parent": item.variant_of}):
|
|
item.update_template_tables()
|
|
|
|
item_defaults = get_item_defaults(item.name, ctx.company)
|
|
item_group_defaults = get_item_group_defaults(item.name, ctx.company)
|
|
brand_defaults = get_brand_defaults(item.name, ctx.company)
|
|
|
|
defaults = frappe._dict(
|
|
{
|
|
"item_defaults": item_defaults,
|
|
"item_group_defaults": item_group_defaults,
|
|
"brand_defaults": brand_defaults,
|
|
}
|
|
)
|
|
|
|
warehouse = get_item_warehouse_(ctx, item, overwrite_warehouse, defaults)
|
|
|
|
if ctx.doctype == "Material Request" and not ctx.material_request_type:
|
|
ctx["material_request_type"] = frappe.db.get_value(
|
|
"Material Request", ctx.name, "material_request_type", cache=True
|
|
)
|
|
|
|
expense_account = None
|
|
|
|
if item.is_fixed_asset:
|
|
from erpnext.assets.doctype.asset.asset import get_asset_account, is_cwip_accounting_enabled
|
|
|
|
if is_cwip_accounting_enabled(item.asset_category):
|
|
expense_account = get_asset_account(
|
|
"capital_work_in_progress_account",
|
|
asset_category=item.asset_category,
|
|
company=ctx.company,
|
|
)
|
|
elif ctx.doctype in (
|
|
"Purchase Invoice",
|
|
"Purchase Receipt",
|
|
"Purchase Order",
|
|
"Material Request",
|
|
):
|
|
from erpnext.assets.doctype.asset_category.asset_category import get_asset_category_account
|
|
|
|
expense_account = get_asset_category_account(
|
|
fieldname="fixed_asset_account", item=ctx.item_code, company=ctx.company
|
|
)
|
|
|
|
# Set the UOM to the Default Sales UOM or Default Purchase UOM if configured in the Item Master
|
|
if not ctx.uom:
|
|
if ctx.doctype in sales_doctypes:
|
|
ctx.uom = item.sales_uom if item.sales_uom else item.stock_uom
|
|
elif (ctx.doctype in ["Purchase Order", "Purchase Receipt", "Purchase Invoice"]) or (
|
|
ctx.doctype == "Material Request" and ctx.material_request_type == "Purchase"
|
|
):
|
|
ctx.uom = item.purchase_uom if item.purchase_uom else item.stock_uom
|
|
else:
|
|
ctx.uom = item.stock_uom
|
|
|
|
# Set stock UOM in ctx, so that it can be used while fetching item price
|
|
ctx.stock_uom = item.stock_uom
|
|
|
|
if ctx.batch_no and item.name != frappe.get_cached_value("Batch", ctx.batch_no, "item"):
|
|
ctx.batch_no = ""
|
|
|
|
out = ItemDetails(
|
|
{
|
|
"item_code": item.name,
|
|
"item_name": item.item_name,
|
|
"description": cstr(item.description).strip(),
|
|
"image": cstr(item.image).strip(),
|
|
"warehouse": warehouse,
|
|
"income_account": get_default_income_account(
|
|
ctx, item_defaults, item_group_defaults, brand_defaults
|
|
),
|
|
"expense_account": expense_account
|
|
or get_default_expense_account(ctx, item_defaults, item_group_defaults, brand_defaults),
|
|
"discount_account": get_default_discount_account(
|
|
ctx, item_defaults, item_group_defaults, brand_defaults
|
|
),
|
|
"provisional_expense_account": get_provisional_account(
|
|
ctx, item_defaults, item_group_defaults, brand_defaults
|
|
),
|
|
"cost_center": get_default_cost_center(ctx, item_defaults, item_group_defaults, brand_defaults),
|
|
"has_serial_no": item.has_serial_no,
|
|
"has_batch_no": item.has_batch_no,
|
|
"batch_no": ctx.batch_no,
|
|
"uom": ctx.uom,
|
|
"stock_uom": item.stock_uom,
|
|
"min_order_qty": flt(item.min_order_qty) if ctx.doctype == "Material Request" else "",
|
|
"qty": flt(ctx.qty) or 1.0,
|
|
"stock_qty": flt(ctx.qty) or 1.0,
|
|
"price_list_rate": 0.0,
|
|
"base_price_list_rate": 0.0,
|
|
"rate": 0.0,
|
|
"base_rate": 0.0,
|
|
"amount": 0.0,
|
|
"base_amount": 0.0,
|
|
"net_rate": 0.0,
|
|
"net_amount": 0.0,
|
|
"discount_percentage": 0.0,
|
|
"discount_amount": flt(ctx.discount_amount) or 0.0,
|
|
"update_stock": ctx.update_stock if ctx.doctype in ["Sales Invoice", "Purchase Invoice"] else 0,
|
|
"delivered_by_supplier": item.delivered_by_supplier
|
|
if ctx.doctype in ["Sales Order", "Sales Invoice"]
|
|
else 0,
|
|
"is_fixed_asset": item.is_fixed_asset,
|
|
"last_purchase_rate": item.last_purchase_rate if ctx.doctype in ["Purchase Order"] else 0,
|
|
"transaction_date": ctx.transaction_date,
|
|
"against_blanket_order": ctx.against_blanket_order,
|
|
"bom_no": item.get("default_bom"),
|
|
"weight_per_unit": ctx.weight_per_unit or item.get("weight_per_unit"),
|
|
"weight_uom": ctx.weight_uom or item.get("weight_uom"),
|
|
"grant_commission": item.get("grant_commission"),
|
|
}
|
|
)
|
|
|
|
default_supplier = get_default_supplier(ctx, item_defaults, item_group_defaults, brand_defaults)
|
|
if default_supplier:
|
|
out.supplier = default_supplier
|
|
|
|
if item.get("enable_deferred_revenue") or item.get("enable_deferred_expense"):
|
|
out.update(calculate_service_end_date(ctx, item))
|
|
|
|
# calculate conversion factor
|
|
if item.stock_uom == ctx.uom:
|
|
out.conversion_factor = 1.0
|
|
else:
|
|
out.conversion_factor = ctx.conversion_factor or get_conversion_factor(item.name, ctx.uom).get(
|
|
"conversion_factor"
|
|
)
|
|
|
|
ctx.conversion_factor = out.conversion_factor
|
|
out.stock_qty = out.qty * out.conversion_factor
|
|
ctx.stock_qty = out.stock_qty
|
|
|
|
# calculate last purchase rate
|
|
if ctx.doctype in purchase_doctypes and not frappe.db.get_single_value(
|
|
"Buying Settings", "disable_last_purchase_rate"
|
|
):
|
|
from erpnext.buying.doctype.purchase_order.purchase_order import item_last_purchase_rate
|
|
|
|
out.last_purchase_rate = item_last_purchase_rate(
|
|
ctx.name, ctx.conversion_rate, item.name, out.conversion_factor
|
|
)
|
|
|
|
# if default specified in item is for another company, fetch from company
|
|
for d in [
|
|
["Account", "income_account", "default_income_account"],
|
|
["Account", "expense_account", "default_expense_account"],
|
|
["Cost Center", "cost_center", "cost_center"],
|
|
["Warehouse", "warehouse", ""],
|
|
]:
|
|
if not out[d[1]]:
|
|
out[d[1]] = frappe.get_cached_value("Company", ctx.company, d[2]) if d[2] else None
|
|
|
|
for fieldname in ("item_name", "item_group", "brand", "stock_uom"):
|
|
out[fieldname] = item.get(fieldname)
|
|
|
|
if ctx.manufacturer:
|
|
part_no = get_item_manufacturer_part_no(ctx.item_code, ctx.manufacturer)
|
|
if part_no:
|
|
out.manufacturer_part_no = part_no
|
|
else:
|
|
out.manufacturer_part_no = None
|
|
out.manufacturer = None
|
|
else:
|
|
data = frappe.get_cached_value(
|
|
"Item", item.name, ["default_item_manufacturer", "default_manufacturer_part_no"], as_dict=True
|
|
)
|
|
|
|
if data:
|
|
out.update(
|
|
{
|
|
"manufacturer": data.default_item_manufacturer,
|
|
"manufacturer_part_no": data.default_manufacturer_part_no,
|
|
}
|
|
)
|
|
|
|
child_doctype = ctx.doctype + " Item"
|
|
meta = frappe.get_meta(child_doctype)
|
|
if meta.get_field("barcode"):
|
|
update_barcode_value(out)
|
|
|
|
if out.weight_per_unit:
|
|
out.total_weight = out.weight_per_unit * out.stock_qty
|
|
|
|
return out
|
|
|
|
|
|
from erpnext.deprecation_dumpster import get_item_warehouse
|
|
|
|
|
|
@erpnext.normalize_ctx_input(ItemDetailsCtx)
|
|
def get_item_warehouse_(ctx: ItemDetailsCtx, item, overwrite_warehouse, defaults=None):
|
|
if not defaults:
|
|
defaults = frappe._dict(
|
|
{
|
|
"item_defaults": get_item_defaults(item.name, ctx.company),
|
|
"item_group_defaults": get_item_group_defaults(item.name, ctx.company),
|
|
"brand_defaults": get_brand_defaults(item.name, ctx.company),
|
|
}
|
|
)
|
|
|
|
if overwrite_warehouse or not ctx.warehouse:
|
|
warehouse = (
|
|
ctx.set_warehouse
|
|
or defaults.item_defaults.get("default_warehouse")
|
|
or defaults.item_group_defaults.get("default_warehouse")
|
|
or defaults.brand_defaults.get("default_warehouse")
|
|
or ctx.warehouse
|
|
)
|
|
|
|
if not warehouse:
|
|
defaults = frappe.defaults.get_defaults() or {}
|
|
warehouse_exists = frappe.db.exists(
|
|
"Warehouse", {"name": defaults.default_warehouse, "company": ctx.company}
|
|
)
|
|
if defaults.get("default_warehouse") and warehouse_exists:
|
|
warehouse = defaults.default_warehouse
|
|
|
|
else:
|
|
warehouse = ctx.warehouse
|
|
|
|
if not warehouse:
|
|
default_warehouse = frappe.get_single_value("Stock Settings", "default_warehouse")
|
|
if frappe.db.get_value("Warehouse", default_warehouse, "company") == ctx.company:
|
|
return default_warehouse
|
|
|
|
return warehouse
|
|
|
|
|
|
def update_barcode_value(out):
|
|
barcode_data = get_barcode_data([out])
|
|
|
|
# If item has one barcode then update the value of the barcode field
|
|
if barcode_data and len(barcode_data.get(out.item_code)) == 1:
|
|
out["barcode"] = barcode_data.get(out.item_code)[0]
|
|
|
|
|
|
def get_barcode_data(items_list=None, item_code=None):
|
|
# get item-wise batch no data
|
|
# example: {'LED-GRE': [Batch001, Batch002]}
|
|
# where LED-GRE is item code, SN0001 is serial no and Pune is warehouse
|
|
|
|
itemwise_barcode = {}
|
|
if not items_list and item_code:
|
|
_dict_item_code = frappe._dict(
|
|
{
|
|
"item_code": item_code,
|
|
}
|
|
)
|
|
|
|
items_list = [frappe._dict(_dict_item_code)]
|
|
|
|
for item in items_list:
|
|
barcodes = frappe.db.get_all("Item Barcode", filters={"parent": item.item_code}, fields="barcode")
|
|
|
|
for barcode in barcodes:
|
|
if item.item_code not in itemwise_barcode:
|
|
itemwise_barcode.setdefault(item.item_code, [])
|
|
itemwise_barcode[item.item_code].append(barcode.get("barcode"))
|
|
|
|
return itemwise_barcode
|
|
|
|
|
|
@frappe.whitelist()
|
|
def get_item_tax_info(doc, tax_category, item_codes, item_rates=None, item_tax_templates=None):
|
|
out = {}
|
|
|
|
if item_tax_templates is None:
|
|
item_tax_templates = {}
|
|
|
|
if item_rates is None:
|
|
item_rates = {}
|
|
|
|
doc = parse_json(doc)
|
|
item_codes = parse_json(item_codes)
|
|
item_rates = parse_json(item_rates)
|
|
item_tax_templates = parse_json(item_tax_templates)
|
|
|
|
for item_code in item_codes:
|
|
if not item_code or item_code[1] in out or not item_tax_templates.get(item_code[1]):
|
|
continue
|
|
|
|
out[item_code[1]] = ItemDetails()
|
|
item = frappe.get_cached_doc("Item", item_code[0])
|
|
ctx: ItemDetailsCtx = {
|
|
"company": doc.company,
|
|
"tax_category": tax_category,
|
|
"base_net_rate": item_rates.get(item_code[1]),
|
|
}
|
|
|
|
if item_tax_templates:
|
|
ctx.update({"item_tax_template": item_tax_templates.get(item_code[1])})
|
|
|
|
get_item_tax_template(ctx, item, out[item_code[1]])
|
|
out[item_code[1]]["item_tax_rate"] = get_item_tax_map(
|
|
doc=doc,
|
|
tax_template=out[item_code[1]].get("item_tax_template"),
|
|
as_json=True,
|
|
)
|
|
|
|
return out
|
|
|
|
|
|
@frappe.whitelist()
|
|
@erpnext.normalize_ctx_input(ItemDetailsCtx)
|
|
def get_item_tax_template(ctx: ItemDetailsCtx, item=None, out: ItemDetails | None = None):
|
|
"""
|
|
Determines item_tax template from item or parent item groups.
|
|
|
|
Accesses:
|
|
ctx = {
|
|
"child_doctype": str
|
|
}
|
|
Passes:
|
|
ctx = {
|
|
"company": str
|
|
"bill_date": str
|
|
"transaction_date": str
|
|
"tax_category": None
|
|
"item_tax_template": None
|
|
"base_net_rate": float
|
|
}
|
|
"""
|
|
if not item:
|
|
if not ctx.get("item_code"):
|
|
frappe.throw(_("Item/Item Code required to get Item Tax Template."))
|
|
else:
|
|
item = frappe.get_cached_doc("Item", ctx.item_code)
|
|
|
|
item_tax_template = None
|
|
if item.taxes:
|
|
item_tax_template = _get_item_tax_template(ctx, item.taxes, out)
|
|
|
|
if not item_tax_template:
|
|
item_group = item.item_group
|
|
while item_group and not item_tax_template:
|
|
item_group_doc = frappe.get_cached_doc("Item Group", item_group)
|
|
item_tax_template = _get_item_tax_template(ctx, item_group_doc.taxes, out)
|
|
item_group = item_group_doc.parent_item_group
|
|
|
|
if out and ctx.get("child_doctype") and item_tax_template:
|
|
out.update(get_fetch_values(ctx.get("child_doctype"), "item_tax_template", item_tax_template))
|
|
|
|
return item_tax_template
|
|
|
|
|
|
@erpnext.normalize_ctx_input(ItemDetailsCtx)
|
|
def _get_item_tax_template(
|
|
ctx: ItemDetailsCtx, taxes, out: ItemDetails | None = None, for_validate=False
|
|
) -> None | str | list[str]:
|
|
"""
|
|
Accesses:
|
|
ctx = {
|
|
"company": str
|
|
"bill_date": str
|
|
"transaction_date": str
|
|
"tax_category": None
|
|
"item_tax_template": None
|
|
}
|
|
Passes:
|
|
ctx = {
|
|
"base_net_rate": float
|
|
}
|
|
"""
|
|
if out is None:
|
|
out = ItemDetails()
|
|
taxes_with_validity = []
|
|
taxes_with_no_validity = []
|
|
|
|
for tax in taxes:
|
|
tax_company = frappe.get_cached_value("Item Tax Template", tax.item_tax_template, "company")
|
|
if tax_company == ctx["company"]:
|
|
if tax.valid_from or tax.maximum_net_rate:
|
|
# In purchase Invoice first preference will be given to supplier invoice date
|
|
# if supplier date is not present then posting date
|
|
validation_date = (
|
|
ctx.get("bill_date") or ctx.get("posting_date") or ctx.get("transaction_date")
|
|
)
|
|
|
|
if getdate(tax.valid_from) <= getdate(validation_date) and is_within_valid_range(ctx, tax):
|
|
taxes_with_validity.append(tax)
|
|
else:
|
|
taxes_with_no_validity.append(tax)
|
|
|
|
if taxes_with_validity:
|
|
taxes = sorted(taxes_with_validity, key=lambda i: i.valid_from or tax.maximum_net_rate, reverse=True)
|
|
else:
|
|
taxes = taxes_with_no_validity
|
|
|
|
if for_validate:
|
|
return [
|
|
tax.item_tax_template
|
|
for tax in taxes
|
|
if (
|
|
cstr(tax.tax_category) == cstr(ctx.get("tax_category"))
|
|
and (tax.item_tax_template not in taxes)
|
|
)
|
|
]
|
|
|
|
# all templates have validity and no template is valid
|
|
if not taxes_with_validity and (not taxes_with_no_validity):
|
|
return None
|
|
|
|
# do not change if already a valid template
|
|
if ctx.get("item_tax_template") in {t.item_tax_template for t in taxes}:
|
|
out.item_tax_template = ctx.get("item_tax_template")
|
|
return ctx.get("item_tax_template")
|
|
|
|
for tax in taxes:
|
|
if cstr(tax.tax_category) == cstr(ctx.get("tax_category")):
|
|
out.item_tax_template = tax.item_tax_template
|
|
return tax.item_tax_template
|
|
return None
|
|
|
|
|
|
@erpnext.normalize_ctx_input(ItemDetailsCtx)
|
|
def is_within_valid_range(ctx: ItemDetailsCtx, tax) -> bool:
|
|
"""
|
|
Accesses:
|
|
ctx = {
|
|
"base_net_rate": float
|
|
}
|
|
"""
|
|
|
|
if not flt(tax.maximum_net_rate):
|
|
# No range specified, just ignore
|
|
return True
|
|
elif flt(tax.minimum_net_rate) <= flt(ctx.get("base_net_rate")) <= flt(tax.maximum_net_rate):
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
@frappe.whitelist()
|
|
def get_item_tax_map(*, doc: str | dict | Document, tax_template: str | None = None, as_json=True):
|
|
doc = parse_json(doc)
|
|
item_tax_map = {}
|
|
for t in (t for t in (doc.get("taxes") or []) if not t.get("set_by_item_tax_template")):
|
|
item_tax_map[t.get("account_head")] = t.get("rate")
|
|
|
|
if tax_template:
|
|
template = frappe.get_cached_doc("Item Tax Template", tax_template)
|
|
for d in template.taxes:
|
|
if frappe.get_cached_value("Account", d.tax_type, "company") == doc.get("company"):
|
|
item_tax_map[d.tax_type] = d.tax_rate
|
|
|
|
return json.dumps(item_tax_map) if as_json else item_tax_map
|
|
|
|
|
|
@frappe.whitelist()
|
|
@erpnext.normalize_ctx_input(ItemDetailsCtx)
|
|
def calculate_service_end_date(ctx: ItemDetailsCtx, item=None):
|
|
_preprocess_ctx(ctx)
|
|
if not item:
|
|
item = frappe.get_cached_doc("Item", ctx.item_code)
|
|
|
|
doctype = ctx.parenttype or ctx.doctype
|
|
if doctype == "Sales Invoice":
|
|
enable_deferred = "enable_deferred_revenue"
|
|
no_of_months = "no_of_months"
|
|
account = "deferred_revenue_account"
|
|
else:
|
|
enable_deferred = "enable_deferred_expense"
|
|
no_of_months = "no_of_months_exp"
|
|
account = "deferred_expense_account"
|
|
|
|
service_start_date = ctx.service_start_date if ctx.service_start_date else ctx.transaction_date
|
|
service_end_date = add_months(service_start_date, item.get(no_of_months))
|
|
deferred_detail = {"service_start_date": service_start_date, "service_end_date": service_end_date}
|
|
deferred_detail[enable_deferred] = item.get(enable_deferred)
|
|
deferred_detail[account] = get_default_deferred_account(ctx, item, fieldname=account)
|
|
|
|
return deferred_detail
|
|
|
|
|
|
def get_default_income_account(ctx: ItemDetailsCtx, item, item_group, brand):
|
|
return (
|
|
item.get("income_account")
|
|
or item_group.get("income_account")
|
|
or brand.get("income_account")
|
|
or ctx.income_account
|
|
)
|
|
|
|
|
|
def get_default_expense_account(ctx: ItemDetailsCtx, item, item_group, brand):
|
|
return (
|
|
item.get("expense_account")
|
|
or item_group.get("expense_account")
|
|
or brand.get("expense_account")
|
|
or ctx.expense_account
|
|
)
|
|
|
|
|
|
def get_provisional_account(ctx: ItemDetailsCtx, item, item_group, brand):
|
|
return (
|
|
item.get("default_provisional_account")
|
|
or item_group.get("default_provisional_account")
|
|
or brand.get("default_provisional_account")
|
|
or ctx.default_provisional_account
|
|
)
|
|
|
|
|
|
def get_default_discount_account(ctx: ItemDetailsCtx, item, item_group, brand):
|
|
return (
|
|
item.get("default_discount_account")
|
|
or item_group.get("default_discount_account")
|
|
or brand.get("default_discount_account")
|
|
or ctx.discount_account
|
|
)
|
|
|
|
|
|
def get_default_deferred_account(ctx: ItemDetailsCtx, item, fieldname=None):
|
|
if item.get("enable_deferred_revenue") or item.get("enable_deferred_expense"):
|
|
return (
|
|
frappe.get_cached_value(
|
|
"Item Default",
|
|
{"parent": ctx.item_code, "company": ctx.company},
|
|
fieldname,
|
|
)
|
|
or ctx.get(fieldname)
|
|
or frappe.get_cached_value("Company", ctx.company, "default_" + fieldname)
|
|
)
|
|
else:
|
|
return None
|
|
|
|
|
|
@erpnext.normalize_ctx_input(ItemDetailsCtx)
|
|
def get_default_cost_center(ctx: ItemDetailsCtx, item=None, item_group=None, brand=None, company=None):
|
|
cost_center = None
|
|
|
|
if not company and ctx.get("company"):
|
|
company = ctx.get("company")
|
|
|
|
if ctx.get("project"):
|
|
cost_center = frappe.db.get_value("Project", ctx.get("project"), "cost_center", cache=True)
|
|
|
|
if not cost_center and (item and item_group and brand):
|
|
if ctx.get("customer"):
|
|
cost_center = (
|
|
item.get("selling_cost_center")
|
|
or item_group.get("selling_cost_center")
|
|
or brand.get("selling_cost_center")
|
|
)
|
|
else:
|
|
cost_center = (
|
|
item.get("buying_cost_center")
|
|
or item_group.get("buying_cost_center")
|
|
or brand.get("buying_cost_center")
|
|
)
|
|
|
|
elif not cost_center and ctx.get("item_code") and company:
|
|
for method in ["get_item_defaults", "get_item_group_defaults", "get_brand_defaults"]:
|
|
path = f"erpnext.stock.get_item_details.{method}"
|
|
data = frappe.get_attr(path)(ctx.get("item_code"), company)
|
|
|
|
if data and (data.selling_cost_center or data.buying_cost_center):
|
|
if ctx.get("customer") and data.selling_cost_center:
|
|
return data.selling_cost_center
|
|
|
|
elif ctx.get("supplier") and data.buying_cost_center:
|
|
return data.buying_cost_center
|
|
|
|
return data.selling_cost_center or data.buying_cost_center
|
|
|
|
if not cost_center and ctx.get("cost_center"):
|
|
cost_center = ctx.get("cost_center")
|
|
|
|
if company and cost_center and frappe.get_cached_value("Cost Center", cost_center, "company") != company:
|
|
return None
|
|
|
|
if not cost_center and company:
|
|
cost_center = frappe.get_cached_value("Company", company, "cost_center")
|
|
|
|
return cost_center
|
|
|
|
|
|
def get_default_supplier(_ctx: ItemDetailsCtx, item, item_group, brand):
|
|
return item.get("default_supplier") or item_group.get("default_supplier") or brand.get("default_supplier")
|
|
|
|
|
|
def get_price_list_rate(ctx: ItemDetailsCtx, item_doc, out: ItemDetails = None):
|
|
if out is None:
|
|
out = ItemDetails()
|
|
|
|
meta = frappe.get_meta(ctx.parenttype or ctx.doctype)
|
|
|
|
if meta.get_field("currency") or ctx.get("currency"):
|
|
if not ctx.price_list_currency or not ctx.plc_conversion_rate:
|
|
# if currency and plc_conversion_rate exist then
|
|
# `get_price_list_currency_and_exchange_rate` has already been called
|
|
pl_details = get_price_list_currency_and_exchange_rate(ctx)
|
|
ctx.update(pl_details)
|
|
|
|
if meta.get_field("currency"):
|
|
validate_conversion_rate(ctx, meta)
|
|
|
|
price_list_rate = get_price_list_rate_for(ctx, item_doc.name)
|
|
|
|
# variant
|
|
if price_list_rate is None and item_doc.variant_of:
|
|
price_list_rate = get_price_list_rate_for(ctx, item_doc.variant_of)
|
|
|
|
# insert in database
|
|
if price_list_rate is None or frappe.get_cached_value(
|
|
"Stock Settings", "Stock Settings", "update_existing_price_list_rate"
|
|
):
|
|
insert_item_price(ctx)
|
|
|
|
if price_list_rate is None:
|
|
return out
|
|
|
|
out.price_list_rate = flt(price_list_rate) * flt(ctx.plc_conversion_rate) / flt(ctx.conversion_rate)
|
|
|
|
if frappe.db.get_single_value("Buying Settings", "disable_last_purchase_rate"):
|
|
return out
|
|
|
|
if not ctx.is_internal_supplier and not out.price_list_rate and ctx.transaction_type == "buying":
|
|
from erpnext.stock.doctype.item.item import get_last_purchase_details
|
|
|
|
out.update(get_last_purchase_details(item_doc.name, ctx.name, ctx.conversion_rate))
|
|
|
|
return out
|
|
|
|
|
|
def insert_item_price(ctx: ItemDetailsCtx):
|
|
"""Insert Item Price if Price List and Price List Rate are specified and currency is the same"""
|
|
if not ctx.price_list or not ctx.rate or ctx.is_internal_supplier or ctx.is_internal_customer:
|
|
return
|
|
|
|
stock_settings = frappe.get_cached_doc("Stock Settings")
|
|
|
|
if (
|
|
not frappe.db.get_value("Price List", ctx.price_list, "currency", cache=True) == ctx.currency
|
|
or not stock_settings.auto_insert_price_list_rate_if_missing
|
|
or not frappe.has_permission("Item Price", "write")
|
|
):
|
|
return
|
|
|
|
item_price = frappe.db.get_value(
|
|
"Item Price",
|
|
{
|
|
"item_code": ctx.item_code,
|
|
"price_list": ctx.price_list,
|
|
"currency": ctx.currency,
|
|
"uom": ctx.stock_uom,
|
|
},
|
|
["name", "price_list_rate"],
|
|
as_dict=1,
|
|
)
|
|
|
|
update_based_on_price_list_rate = stock_settings.update_price_list_based_on == "Price List Rate"
|
|
|
|
if item_price and item_price.name:
|
|
if not stock_settings.update_existing_price_list_rate:
|
|
return
|
|
|
|
rate_to_consider = flt(ctx.price_list_rate) if update_based_on_price_list_rate else flt(ctx.rate)
|
|
price_list_rate = _get_stock_uom_rate(rate_to_consider, ctx)
|
|
|
|
if not price_list_rate or item_price.price_list_rate == price_list_rate:
|
|
return
|
|
|
|
frappe.db.set_value("Item Price", item_price.name, "price_list_rate", price_list_rate)
|
|
frappe.msgprint(
|
|
_("Item Price updated for {0} in Price List {1}").format(ctx.item_code, ctx.price_list),
|
|
alert=True,
|
|
)
|
|
else:
|
|
rate_to_consider = (
|
|
(flt(ctx.price_list_rate) or flt(ctx.rate)) if update_based_on_price_list_rate else flt(ctx.rate)
|
|
)
|
|
price_list_rate = _get_stock_uom_rate(rate_to_consider, ctx)
|
|
|
|
item_price = frappe.get_doc(
|
|
{
|
|
"doctype": "Item Price",
|
|
"price_list": ctx.price_list,
|
|
"item_code": ctx.item_code,
|
|
"currency": ctx.currency,
|
|
"price_list_rate": price_list_rate,
|
|
"uom": ctx.stock_uom,
|
|
}
|
|
)
|
|
item_price.insert()
|
|
frappe.msgprint(
|
|
_("Item Price added for {0} in Price List {1}").format(ctx.item_code, ctx.price_list),
|
|
alert=True,
|
|
)
|
|
|
|
|
|
def _get_stock_uom_rate(rate: float, ctx: ItemDetailsCtx):
|
|
return rate / ctx.conversion_factor if ctx.conversion_factor else rate
|
|
|
|
|
|
def get_item_price(
|
|
pctx: ItemPriceCtx | dict, item_code, ignore_party=False, force_batch_no=False
|
|
) -> list[dict]:
|
|
"""
|
|
Get name, price_list_rate from Item Price based on conditions
|
|
Check if the desired qty is within the increment of the packing list.
|
|
:param pctx: dict (or frappe._dict) with mandatory fields price_list, uom
|
|
optional fields transaction_date, customer, supplier
|
|
:param item_code: str, Item Doctype field item_code
|
|
"""
|
|
pctx: ItemPriceCtx = frappe._dict(pctx)
|
|
|
|
ip = frappe.qb.DocType("Item Price")
|
|
query = (
|
|
frappe.qb.from_(ip)
|
|
.select(ip.name, ip.price_list_rate, ip.uom)
|
|
.where(
|
|
(ip.item_code == item_code)
|
|
& (ip.price_list == pctx.price_list)
|
|
& (IfNull(ip.uom, "").isin(["", pctx.uom]))
|
|
)
|
|
.orderby(ip.valid_from, order=frappe.qb.desc)
|
|
.orderby(IfNull(ip.batch_no, ""), order=frappe.qb.desc)
|
|
.orderby(ip.uom, order=frappe.qb.desc)
|
|
.limit(1)
|
|
)
|
|
|
|
if force_batch_no:
|
|
query = query.where(ip.batch_no == pctx.batch_no)
|
|
else:
|
|
query = query.where(IfNull(ip.batch_no, "").isin(["", pctx.batch_no]))
|
|
|
|
if not ignore_party:
|
|
if pctx.customer:
|
|
query = query.where(ip.customer == pctx.customer)
|
|
elif pctx.supplier:
|
|
query = query.where(ip.supplier == pctx.supplier)
|
|
else:
|
|
query = query.where((IfNull(ip.customer, "") == "") & (IfNull(ip.supplier, "") == ""))
|
|
|
|
if pctx.transaction_date:
|
|
query = query.where(
|
|
(IfNull(ip.valid_from, "2000-01-01") <= pctx.transaction_date)
|
|
& (IfNull(ip.valid_upto, "2500-12-31") >= pctx.transaction_date)
|
|
)
|
|
|
|
return query.run(as_dict=True)
|
|
|
|
|
|
@frappe.whitelist()
|
|
def get_batch_based_item_price(pctx: ItemPriceCtx | dict | str, item_code) -> float:
|
|
pctx = parse_json(pctx)
|
|
|
|
item_price = get_item_price(pctx, item_code, force_batch_no=True)
|
|
if not item_price:
|
|
item_price = get_item_price(pctx, item_code, ignore_party=True, force_batch_no=True)
|
|
|
|
is_free_item = pctx.get("items", [{}])[0].get("is_free_item")
|
|
|
|
if item_price and item_price[0].uom == pctx.uom and not is_free_item:
|
|
return item_price[0].price_list_rate
|
|
|
|
return 0.0
|
|
|
|
|
|
@erpnext.normalize_ctx_input(ItemDetailsCtx)
|
|
def get_price_list_rate_for(ctx: ItemDetailsCtx, item_code):
|
|
"""
|
|
:param customer: link to Customer DocType
|
|
:param supplier: link to Supplier DocType
|
|
:param price_list: str (Standard Buying or Standard Selling)
|
|
:param item_code: str, Item Doctype field item_code
|
|
:param qty: Desired Qty
|
|
:param transaction_date: Date of the price
|
|
"""
|
|
pctx = ItemPriceCtx(
|
|
{
|
|
"item_code": item_code,
|
|
"price_list": ctx.get("price_list"),
|
|
"customer": ctx.get("customer"),
|
|
"supplier": ctx.get("supplier"),
|
|
"uom": ctx.get("uom"),
|
|
"transaction_date": ctx.get("transaction_date"),
|
|
"batch_no": ctx.get("batch_no"),
|
|
}
|
|
)
|
|
|
|
item_price_data = 0
|
|
price_list_rate = get_item_price(pctx, item_code)
|
|
if price_list_rate:
|
|
desired_qty = ctx.get("qty")
|
|
if desired_qty and check_packing_list(price_list_rate[0].name, desired_qty, item_code):
|
|
item_price_data = price_list_rate
|
|
else:
|
|
for field in ["customer", "supplier"]:
|
|
del pctx[field]
|
|
|
|
general_price_list_rate = get_item_price(pctx, item_code, ignore_party=ctx.get("ignore_party"))
|
|
|
|
if not general_price_list_rate and ctx.get("uom") != ctx.get("stock_uom"):
|
|
pctx.uom = ctx.get("stock_uom")
|
|
general_price_list_rate = get_item_price(pctx, item_code, ignore_party=ctx.get("ignore_party"))
|
|
|
|
if general_price_list_rate:
|
|
item_price_data = general_price_list_rate
|
|
|
|
if item_price_data:
|
|
if item_price_data[0].uom == ctx.get("uom"):
|
|
return item_price_data[0].price_list_rate
|
|
elif not ctx.get("price_list_uom_dependant"):
|
|
return flt(item_price_data[0].price_list_rate * flt(ctx.get("conversion_factor", 1)))
|
|
else:
|
|
return item_price_data[0].price_list_rate
|
|
|
|
|
|
def check_packing_list(price_list_rate_name, desired_qty, item_code):
|
|
"""
|
|
Check if the desired qty is within the increment of the packing list.
|
|
:param price_list_rate_name: Name of Item Price
|
|
:param desired_qty: Desired Qt
|
|
:param item_code: str, Item Doctype field item_code
|
|
:param qty: Desired Qt
|
|
"""
|
|
|
|
flag = True
|
|
if packing_unit := frappe.db.get_value("Item Price", price_list_rate_name, "packing_unit", cache=True):
|
|
packing_increment = desired_qty % packing_unit
|
|
|
|
if packing_increment != 0:
|
|
flag = False
|
|
|
|
return flag
|
|
|
|
|
|
def validate_conversion_rate(ctx: ItemDetailsCtx, meta):
|
|
from erpnext.controllers.accounts_controller import validate_conversion_rate
|
|
|
|
company_currency = frappe.get_cached_value("Company", ctx.company, "default_currency")
|
|
if not ctx.conversion_rate and ctx.currency == company_currency:
|
|
ctx.conversion_rate = 1.0
|
|
|
|
if not ctx.ignore_conversion_rate and ctx.conversion_rate == 1 and ctx.currency != company_currency:
|
|
ctx.conversion_rate = (
|
|
get_exchange_rate(ctx.currency, company_currency, ctx.transaction_date, "for_buying") or 1.0
|
|
)
|
|
|
|
# validate currency conversion rate
|
|
validate_conversion_rate(
|
|
ctx.currency, ctx.conversion_rate, meta.get_label("conversion_rate"), ctx.company
|
|
)
|
|
|
|
ctx.conversion_rate = flt(
|
|
ctx.conversion_rate,
|
|
get_field_precision(meta.get_field("conversion_rate"), frappe._dict({"fields": ctx})),
|
|
)
|
|
|
|
if ctx.price_list:
|
|
if not ctx.plc_conversion_rate and ctx.price_list_currency == frappe.db.get_value(
|
|
"Price List", ctx.price_list, "currency", cache=True
|
|
):
|
|
ctx.plc_conversion_rate = 1.0
|
|
|
|
# validate price list currency conversion rate
|
|
if not ctx.price_list_currency:
|
|
throw(_("Price List Currency not selected"))
|
|
else:
|
|
validate_conversion_rate(
|
|
ctx.price_list_currency,
|
|
ctx.plc_conversion_rate,
|
|
meta.get_label("plc_conversion_rate"),
|
|
ctx.company,
|
|
)
|
|
|
|
if meta.get_field("plc_conversion_rate"):
|
|
ctx.plc_conversion_rate = flt(
|
|
ctx.plc_conversion_rate,
|
|
get_field_precision(meta.get_field("plc_conversion_rate"), frappe._dict({"fields": ctx})),
|
|
)
|
|
|
|
|
|
def get_party_item_code(ctx: ItemDetailsCtx, item_doc, out: ItemDetails):
|
|
if ctx.transaction_type == "selling" and ctx.customer:
|
|
out.customer_item_code = None
|
|
|
|
if ctx.quotation_to and ctx.quotation_to != "Customer":
|
|
return
|
|
|
|
customer_item_code = item_doc.get("customer_items", {"customer_name": ctx.customer})
|
|
|
|
if customer_item_code:
|
|
out.customer_item_code = customer_item_code[0].ref_code
|
|
else:
|
|
customer_group = frappe.get_cached_value("Customer", ctx.customer, "customer_group")
|
|
customer_group_item_code = item_doc.get("customer_items", {"customer_group": customer_group})
|
|
if customer_group_item_code and not customer_group_item_code[0].customer_name:
|
|
out.customer_item_code = customer_group_item_code[0].ref_code
|
|
|
|
if ctx.transaction_type == "buying" and ctx.supplier:
|
|
item_supplier = item_doc.get("supplier_items", {"supplier": ctx.supplier})
|
|
out.supplier_part_no = item_supplier[0].supplier_part_no if item_supplier else None
|
|
|
|
|
|
from erpnext.deprecation_dumpster import get_pos_profile_item_details
|
|
|
|
|
|
@erpnext.normalize_ctx_input(ItemDetailsCtx)
|
|
def get_pos_profile_item_details_(ctx: ItemDetailsCtx, company, pos_profile=None, update_data=False):
|
|
res = frappe._dict()
|
|
|
|
if not frappe.flags.pos_profile and not pos_profile:
|
|
pos_profile = frappe.flags.pos_profile = get_pos_profile(company, ctx.pos_profile)
|
|
|
|
if pos_profile:
|
|
for fieldname in ("income_account", "cost_center", "warehouse", "expense_account"):
|
|
if (not ctx.get(fieldname) or update_data) and pos_profile.get(fieldname):
|
|
res[fieldname] = pos_profile.get(fieldname)
|
|
|
|
if res.get("warehouse"):
|
|
res.actual_qty = get_bin_details(ctx.item_code, res.warehouse, include_child_warehouses=True).get(
|
|
"actual_qty"
|
|
)
|
|
|
|
return res
|
|
|
|
|
|
@frappe.whitelist()
|
|
def get_pos_profile(company, pos_profile=None, user=None):
|
|
if pos_profile:
|
|
return frappe.get_cached_doc("POS Profile", pos_profile)
|
|
|
|
if not user:
|
|
user = frappe.session["user"]
|
|
|
|
pf = frappe.qb.DocType("POS Profile")
|
|
pfu = frappe.qb.DocType("POS Profile User")
|
|
|
|
query = (
|
|
frappe.qb.from_(pf)
|
|
.left_join(pfu)
|
|
.on(pf.name == pfu.parent)
|
|
.select(pf.star)
|
|
.where((pfu.user == user) & (pfu.default == 1))
|
|
)
|
|
|
|
if company:
|
|
query = query.where(pf.company == company)
|
|
|
|
pos_profile = query.run(as_dict=True)
|
|
|
|
if not pos_profile and company:
|
|
pos_profile = (
|
|
frappe.qb.from_(pf)
|
|
.left_join(pfu)
|
|
.on(pf.name == pfu.parent)
|
|
.select(pf.star)
|
|
.where((pf.company == company) & (pf.disabled == 0))
|
|
).run(as_dict=True)
|
|
|
|
return pos_profile and pos_profile[0] or None
|
|
|
|
|
|
@frappe.whitelist()
|
|
def get_conversion_factor(item_code, uom):
|
|
item = frappe.get_cached_value("Item", item_code, ["variant_of", "stock_uom"], as_dict=True)
|
|
if not item_code or not item:
|
|
return {"conversion_factor": 1.0}
|
|
|
|
if uom == item.stock_uom:
|
|
return {"conversion_factor": 1.0}
|
|
|
|
filters = {"parent": item_code, "uom": uom}
|
|
|
|
if item.variant_of:
|
|
filters["parent"] = ("in", (item_code, item.variant_of))
|
|
conversion_factor = frappe.db.get_value("UOM Conversion Detail", filters, "conversion_factor")
|
|
if not conversion_factor:
|
|
conversion_factor = get_uom_conv_factor(uom, item.stock_uom)
|
|
|
|
return {"conversion_factor": conversion_factor or 1.0}
|
|
|
|
|
|
@frappe.whitelist()
|
|
def get_projected_qty(item_code, warehouse):
|
|
return {
|
|
"projected_qty": frappe.db.get_value(
|
|
"Bin", {"item_code": item_code, "warehouse": warehouse}, "projected_qty"
|
|
)
|
|
}
|
|
|
|
|
|
@frappe.whitelist()
|
|
def get_bin_details(item_code, warehouse, company=None, include_child_warehouses=False):
|
|
bin_details = {"projected_qty": 0, "actual_qty": 0, "reserved_qty": 0}
|
|
|
|
if warehouse:
|
|
from frappe.query_builder.functions import Coalesce, Sum
|
|
|
|
from erpnext.stock.doctype.warehouse.warehouse import get_child_warehouses
|
|
|
|
warehouses = get_child_warehouses(warehouse) if include_child_warehouses else [warehouse]
|
|
|
|
bin = frappe.qb.DocType("Bin")
|
|
bin_details = (
|
|
frappe.qb.from_(bin)
|
|
.select(
|
|
Coalesce(Sum(bin.projected_qty), 0).as_("projected_qty"),
|
|
Coalesce(Sum(bin.actual_qty), 0).as_("actual_qty"),
|
|
Coalesce(Sum(bin.reserved_qty), 0).as_("reserved_qty"),
|
|
)
|
|
.where((bin.item_code == item_code) & (bin.warehouse.isin(warehouses)))
|
|
).run(as_dict=True)[0]
|
|
|
|
if company:
|
|
bin_details["company_total_stock"] = get_company_total_stock(item_code, company)
|
|
|
|
return bin_details
|
|
|
|
|
|
def get_company_total_stock(item_code, company):
|
|
bin = frappe.qb.DocType("Bin")
|
|
wh = frappe.qb.DocType("Warehouse")
|
|
|
|
return (
|
|
frappe.qb.from_(bin)
|
|
.inner_join(wh)
|
|
.on(bin.warehouse == wh.name)
|
|
.select(Sum(bin.actual_qty))
|
|
.where((wh.company == company) & (bin.item_code == item_code))
|
|
).run()[0][0]
|
|
|
|
|
|
@frappe.whitelist()
|
|
def get_batch_qty(batch_no, warehouse, item_code):
|
|
from erpnext.stock.doctype.batch import batch
|
|
|
|
if batch_no:
|
|
return {"actual_batch_qty": batch.get_batch_qty(batch_no, warehouse)}
|
|
|
|
|
|
@frappe.whitelist()
|
|
@erpnext.normalize_ctx_input(ItemDetailsCtx)
|
|
def apply_price_list(ctx: ItemDetailsCtx, as_doc=False, doc=None):
|
|
"""Apply pricelist on a document-like dict object and return as
|
|
{'parent': dict, 'children': list}
|
|
|
|
:param ctx: See below
|
|
:param as_doc: Updates value in the passed dict
|
|
|
|
ctx = {
|
|
"doctype": "",
|
|
"name": "",
|
|
"items": [{"doctype": "", "name": "", "item_code": "", "brand": "", "item_group": ""}, ...],
|
|
"conversion_rate": 1.0,
|
|
"selling_price_list": None,
|
|
"price_list_currency": None,
|
|
"price_list_uom_dependant": None,
|
|
"plc_conversion_rate": 1.0,
|
|
"doctype": "",
|
|
"name": "",
|
|
"supplier": None,
|
|
"transaction_date": None,
|
|
"conversion_rate": 1.0,
|
|
"buying_price_list": None,
|
|
"ignore_pricing_rule": 0/1
|
|
}
|
|
"""
|
|
_preprocess_ctx(ctx)
|
|
parent = get_price_list_currency_and_exchange_rate(ctx)
|
|
ctx.update(parent)
|
|
|
|
children = []
|
|
|
|
if "items" in ctx:
|
|
item_list = ctx.get("items")
|
|
ctx.update(parent)
|
|
|
|
for item in item_list:
|
|
ctx_copy = ItemDetailsCtx(ctx.copy())
|
|
ctx_copy.update(item)
|
|
item_details = apply_price_list_on_item(ctx_copy, doc=doc)
|
|
children.append(item_details)
|
|
|
|
if as_doc:
|
|
ctx.price_list_currency = (parent.price_list_currency,)
|
|
ctx.plc_conversion_rate = parent.plc_conversion_rate
|
|
if ctx.get("items"):
|
|
for i, item in enumerate(ctx.get("items")):
|
|
for fieldname in children[i]:
|
|
# if the field exists in the original doc
|
|
# update the value
|
|
if fieldname in item and fieldname not in ("name", "doctype"):
|
|
item[fieldname] = children[i][fieldname]
|
|
return ctx
|
|
else:
|
|
return {"parent": parent, "children": children}
|
|
|
|
|
|
def apply_price_list_on_item(ctx, doc=None):
|
|
item_doc = frappe.get_cached_doc("Item", ctx.item_code)
|
|
item_details = get_price_list_rate(ctx, item_doc)
|
|
item_details.update(get_pricing_rule_for_item(ctx, doc=doc))
|
|
|
|
return item_details
|
|
|
|
|
|
def get_price_list_currency_and_exchange_rate(ctx: ItemDetailsCtx):
|
|
if not ctx.price_list:
|
|
return {}
|
|
|
|
if ctx.doctype in ["Quotation", "Sales Order", "Delivery Note", "Sales Invoice"]:
|
|
ctx.update({"exchange_rate": "for_selling"})
|
|
elif ctx.doctype in ["Purchase Order", "Purchase Receipt", "Purchase Invoice"]:
|
|
ctx.update({"exchange_rate": "for_buying"})
|
|
|
|
price_list_details = get_price_list_details(ctx.price_list)
|
|
|
|
price_list_currency = price_list_details.get("currency")
|
|
price_list_uom_dependant = price_list_details.get("price_list_uom_dependant")
|
|
|
|
plc_conversion_rate = ctx.plc_conversion_rate
|
|
company_currency = get_company_currency(ctx.company)
|
|
|
|
if (not plc_conversion_rate) or (
|
|
price_list_currency and ctx.price_list_currency and price_list_currency != ctx.price_list_currency
|
|
):
|
|
# cksgb 19/09/2016: added args.transaction_date as posting_date argument for get_exchange_rate
|
|
plc_conversion_rate = (
|
|
get_exchange_rate(price_list_currency, company_currency, ctx.transaction_date, ctx.exchange_rate)
|
|
or plc_conversion_rate
|
|
)
|
|
|
|
return frappe._dict(
|
|
{
|
|
"price_list_currency": price_list_currency,
|
|
"price_list_uom_dependant": price_list_uom_dependant,
|
|
"plc_conversion_rate": plc_conversion_rate or 1,
|
|
}
|
|
)
|
|
|
|
|
|
@frappe.whitelist()
|
|
def get_default_bom(item_code=None):
|
|
def _get_bom(item):
|
|
bom = frappe.get_all("BOM", dict(item=item, is_active=True, is_default=True, docstatus=1), limit=1)
|
|
return bom[0].name if bom else None
|
|
|
|
if not item_code:
|
|
return
|
|
|
|
bom_name = _get_bom(item_code)
|
|
|
|
template_item = frappe.db.get_value("Item", item_code, "variant_of")
|
|
if not bom_name and template_item:
|
|
bom_name = _get_bom(template_item)
|
|
|
|
return bom_name
|
|
|
|
|
|
@frappe.whitelist()
|
|
def get_valuation_rate(item_code, company, warehouse=None):
|
|
if frappe.get_cached_value("Warehouse", warehouse, "is_group"):
|
|
return {"valuation_rate": 0.0}
|
|
|
|
item = get_item_defaults(item_code, company)
|
|
item_group = get_item_group_defaults(item_code, company)
|
|
brand = get_brand_defaults(item_code, company)
|
|
if item.get("is_stock_item"):
|
|
if not warehouse:
|
|
warehouse = (
|
|
item.get("default_warehouse")
|
|
or item_group.get("default_warehouse")
|
|
or brand.get("default_warehouse")
|
|
)
|
|
|
|
return frappe.db.get_value(
|
|
"Bin", {"item_code": item_code, "warehouse": warehouse}, ["valuation_rate"], as_dict=True
|
|
) or {"valuation_rate": 0}
|
|
|
|
elif not item.get("is_stock_item"):
|
|
pi_item = frappe.qb.DocType("Purchase Invoice Item")
|
|
valuation_rate = (
|
|
frappe.qb.from_(pi_item)
|
|
.select(Sum(pi_item.base_net_amount) / Sum(pi_item.qty * pi_item.conversion_factor))
|
|
.where((pi_item.docstatus == 1) & (pi_item.item_code == item_code))
|
|
).run()
|
|
|
|
if valuation_rate:
|
|
return {"valuation_rate": valuation_rate[0][0] or 0.0}
|
|
else:
|
|
return {"valuation_rate": 0.0}
|
|
|
|
|
|
def get_gross_profit(out: ItemDetails):
|
|
if out.valuation_rate:
|
|
out.update({"gross_profit": ((out.base_rate - out.valuation_rate) * out.stock_qty)})
|
|
|
|
return out
|
|
|
|
|
|
@frappe.whitelist()
|
|
def get_serial_no(_args, serial_nos=None, sales_order=None):
|
|
serial_nos = serial_nos or []
|
|
return serial_nos
|
|
|
|
|
|
def update_party_blanket_order(ctx: ItemDetailsCtx, out: ItemDetails | dict):
|
|
if out["against_blanket_order"]:
|
|
blanket_order_details = get_blanket_order_details(ctx)
|
|
if blanket_order_details:
|
|
out.update(blanket_order_details)
|
|
|
|
|
|
@frappe.whitelist()
|
|
@erpnext.normalize_ctx_input(ItemDetailsCtx)
|
|
def get_blanket_order_details(ctx: ItemDetailsCtx):
|
|
blanket_order_details = None
|
|
|
|
if ctx.item_code:
|
|
bo = frappe.qb.DocType("Blanket Order")
|
|
bo_item = frappe.qb.DocType("Blanket Order Item")
|
|
|
|
query = (
|
|
frappe.qb.from_(bo)
|
|
.from_(bo_item)
|
|
.select(bo_item.rate.as_("blanket_order_rate"), bo.name.as_("blanket_order"))
|
|
.where(
|
|
(bo.company == ctx.company)
|
|
& (bo_item.item_code == ctx.item_code)
|
|
& (bo.docstatus == 1)
|
|
& (bo.name == bo_item.parent)
|
|
)
|
|
)
|
|
|
|
if ctx.customer and ctx.doctype == "Sales Order":
|
|
query = query.where(bo.customer == ctx.customer)
|
|
elif ctx.supplier and ctx.doctype == "Purchase Order":
|
|
query = query.where(bo.supplier == ctx.supplier)
|
|
if ctx.blanket_order:
|
|
query = query.where(bo.name == ctx.blanket_order)
|
|
if ctx.transaction_date:
|
|
query = query.where(bo.to_date >= ctx.transaction_date)
|
|
|
|
blanket_order_details = query.run(as_dict=True)
|
|
blanket_order_details = blanket_order_details[0] if blanket_order_details else ""
|
|
|
|
return blanket_order_details
|