refactor: stock closing balance -> stock closing entry (#44489)

* refactor: stock closing balance

* perf: batchwise balance history report

* fix: stock ageing data for stock balance report
This commit is contained in:
rohitwaghchaure
2024-12-26 22:57:58 +05:30
committed by GitHub
parent ab1cca0c40
commit c9088f4955
21 changed files with 1176 additions and 476 deletions

View File

@@ -392,4 +392,5 @@ erpnext.patches.v15_0.migrate_old_item_wise_tax_detail_data_format
erpnext.patches.v15_0.set_is_exchange_gain_loss_in_payment_entry_deductions
erpnext.patches.v14_0.update_stock_uom_in_work_order_item
erpnext.patches.v15_0.enable_allow_existing_serial_no
erpnext.patches.v15_0.update_cc_in_process_statement_of_accounts
erpnext.patches.v15_0.update_cc_in_process_statement_of_accounts
erpnext.patches.v15_0.refactor_closing_stock_balance #5

View File

@@ -0,0 +1,60 @@
import frappe
from frappe import _
from frappe.custom.doctype.custom_field.custom_field import create_custom_fields
from erpnext.stock.doctype.inventory_dimension.inventory_dimension import (
field_exists,
get_inventory_dimensions,
)
def execute():
add_inventory_dimensions_to_stock_closing_balance()
create_stock_closing_entries()
def add_inventory_dimensions_to_stock_closing_balance():
inventory_dimensions = get_inventory_dimensions()
dimension_fields_list = []
for inv_dim in inventory_dimensions:
if not frappe.db.get_value(
"Custom Field", {"dt": "Stock Closing Balance", "fieldname": inv_dim.fieldname}
) and not field_exists("Stock Closing Balance", inv_dim.fieldname):
dimension_field = frappe._dict()
dimension_field["mandatory_depends_on"] = ""
dimension_field["reqd"] = 0
dimension_field["fieldname"] = inv_dim.fieldname
dimension_field["label"] = inv_dim.dimension_name
dimension_field["fieldtype"] = "Link"
dimension_field["options"] = inv_dim.doctype
dimension_field["read_only"] = 1
dimension_field["insert_after"] = "inventory_dimension_section"
dimension_field["search_index"] = 1
dimension_fields_list.append(dimension_field)
if dimension_fields_list:
dimension_fields_list.insert(
0,
{
"label": _("Inventory Dimension"),
"fieldtype": "Section Break",
"fieldname": "inventory_dimension_section",
"insert_after": "stock_uom",
},
)
create_custom_fields({"Stock Closing Balance": dimension_fields_list})
def create_stock_closing_entries():
for row in frappe.get_all(
"Closing Stock Balance",
fields=["company", "status", "from_date", "to_date"],
filters={"docstatus": 1},
group_by="company",
order_by="creation desc",
):
new_entry = frappe.new_doc("Stock Closing Entry")
new_entry.update(row)
new_entry.save(ignore_permissions=True)
new_entry.submit()

View File

@@ -1,154 +0,0 @@
# Copyright (c) 2023, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
import gzip
import json
import frappe
from frappe import _
from frappe.core.doctype.prepared_report.prepared_report import create_json_gz_file
from frappe.desk.form.load import get_attachments
from frappe.model.document import Document
from frappe.utils import get_link_to_form, parse_json
from frappe.utils.background_jobs import enqueue
from erpnext.stock.report.stock_balance.stock_balance import execute
class ClosingStockBalance(Document):
# begin: auto-generated types
# This code is auto-generated. Do not modify anything in this block.
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from frappe.types import DF
amended_from: DF.Link | None
company: DF.Link | None
from_date: DF.Date | None
include_uom: DF.Link | None
item_code: DF.Link | None
item_group: DF.Link | None
naming_series: DF.Literal["CBAL-.#####"]
status: DF.Literal["Draft", "Queued", "In Progress", "Completed", "Failed", "Canceled"]
to_date: DF.Date | None
warehouse: DF.Link | None
warehouse_type: DF.Link | None
# end: auto-generated types
def before_save(self):
self.set_status()
def set_status(self, save=False):
self.status = "Queued"
if self.docstatus == 2:
self.status = "Canceled"
if self.docstatus == 0:
self.status = "Draft"
if save:
self.db_set("status", self.status)
def validate(self):
self.validate_duplicate()
def validate_duplicate(self):
table = frappe.qb.DocType("Closing Stock Balance")
query = (
frappe.qb.from_(table)
.select(table.name)
.where(
(table.docstatus == 1)
& (table.company == self.company)
& (
(table.from_date.between(self.from_date, self.to_date))
| (table.to_date.between(self.from_date, self.to_date))
| ((table.from_date >= self.from_date) & (table.to_date >= self.to_date))
)
)
)
for fieldname in ["warehouse", "item_code", "item_group", "warehouse_type"]:
if self.get(fieldname):
query = query.where(table[fieldname] == self.get(fieldname))
query = query.run(as_dict=True)
if query and query[0].name:
name = get_link_to_form("Closing Stock Balance", query[0].name)
msg = f"Closing Stock Balance {name} already exists for the selected date range"
frappe.throw(_(msg), title=_("Duplicate Closing Stock Balance"))
def on_submit(self):
self.set_status(save=True)
self.enqueue_job()
def on_cancel(self):
self.set_status(save=True)
self.clear_attachment()
@frappe.whitelist()
def enqueue_job(self):
self.db_set("status", "In Progress")
self.clear_attachment()
enqueue(prepare_closing_stock_balance, name=self.name, queue="long", timeout=1500)
@frappe.whitelist()
def regenerate_closing_balance(self):
self.enqueue_job()
def clear_attachment(self):
if attachments := get_attachments(self.doctype, self.name):
attachment = attachments[0]
frappe.delete_doc("File", attachment.name)
def create_closing_stock_balance_entries(self):
columns, data = execute(
filters=frappe._dict(
{
"company": self.company,
"from_date": self.from_date,
"to_date": self.to_date,
"warehouse": self.warehouse,
"item_code": self.item_code,
"item_group": self.item_group,
"warehouse_type": self.warehouse_type,
"include_uom": self.include_uom,
"ignore_closing_balance": 1,
"show_variant_attributes": 1,
"show_stock_ageing_data": 1,
}
)
)
create_json_gz_file(
{"columns": columns, "data": data}, self.doctype, self.name, "closing-stock-balance"
)
def get_prepared_data(self):
if attachments := get_attachments(self.doctype, self.name):
attachment = attachments[0]
attached_file = frappe.get_doc("File", attachment.name)
data = gzip.decompress(attached_file.get_content())
if data := json.loads(data.decode("utf-8")):
data = data
return parse_json(data)
return frappe._dict({})
def prepare_closing_stock_balance(name):
doc = frappe.get_doc("Closing Stock Balance", name)
doc.db_set("status", "In Progress")
try:
doc.create_closing_stock_balance_entries()
doc.db_set("status", "Completed")
except Exception:
doc.db_set("status", "Failed")
doc.log_error(title="Closing Stock Balance Failed")

View File

@@ -1,18 +0,0 @@
# Copyright (c) 2023, Frappe Technologies Pvt. Ltd. and Contributors
# See license.txt
# import frappe
from frappe.tests import IntegrationTestCase, UnitTestCase
class UnitTestClosingStockBalance(UnitTestCase):
"""
Unit tests for ClosingStockBalance.
Use this class for testing individual functions and methods.
"""
pass
class TestClosingStockBalance(IntegrationTestCase):
pass

View File

@@ -223,18 +223,17 @@ class InventoryDimension(Document):
self.add_transfer_field(self.document_type, dimension_fields)
custom_fields.setdefault(self.document_type, dimension_fields)
if (
dimension_fields
and not frappe.db.get_value(
"Custom Field", {"dt": "Stock Ledger Entry", "fieldname": self.target_fieldname}
)
and not field_exists("Stock Ledger Entry", self.target_fieldname)
):
dimension_field = dimension_fields[1]
dimension_field["mandatory_depends_on"] = ""
dimension_field["reqd"] = 0
dimension_field["fieldname"] = self.target_fieldname
custom_fields["Stock Ledger Entry"] = dimension_field
for dt in ["Stock Ledger Entry", "Stock Closing Balance"]:
if (
dimension_fields
and not frappe.db.get_value("Custom Field", {"dt": dt, "fieldname": self.target_fieldname})
and not field_exists(dt, self.target_fieldname)
):
dimension_field = dimension_fields[1]
dimension_field["mandatory_depends_on"] = ""
dimension_field["reqd"] = 0
dimension_field["fieldname"] = self.target_fieldname
custom_fields[dt] = dimension_field
filter_custom_fields = {}
if custom_fields:
@@ -390,8 +389,10 @@ def get_inventory_dimensions():
"distinct target_fieldname as fieldname",
"reference_document as doctype",
"validate_negative_stock",
"name as dimension_name",
],
filters={"disabled": 0},
order_by="creation",
)
frappe.local.inventory_dimensions = dimensions

View File

@@ -97,27 +97,27 @@ class RepostItemValuation(Document):
]
)
# Closing Stock Balance
# Stock Closing Balance
closing_stock = self.get_closing_stock_balance()
if closing_stock and closing_stock[0].name:
name = get_link_to_form("Closing Stock Balance", closing_stock[0].name)
to_date = frappe.format(closing_stock[0].to_date, "Date")
msg = f"Due to closing stock balance {name}, you cannot repost item valuation before {to_date}"
frappe.throw(_(msg))
name = get_link_to_form("Stock Closing Entry", closing_stock[0].name)
to_date = frappe.format(closing_stock[0].posting_date, "Date")
frappe.throw(
_("Due to stock closing entry {0}, you cannot repost item valuation before {1}").format(
name, to_date
)
)
def get_closing_stock_balance(self):
filters = {
"company": self.company,
"status": "Completed",
"docstatus": 1,
"to_date": (">=", self.posting_date),
"status": "Completed",
}
for field in ["warehouse", "item_code"]:
if self.get(field):
filters.update({field: ("in", ["", self.get(field)])})
return frappe.get_all("Closing Stock Balance", fields=["name", "to_date"], filters=filters)
return frappe.get_all(
"Stock Closing Entry", fields=["name", "to_date as posting_date"], filters=filters, limit=1
)
@staticmethod
def get_max_period_closing_date(company):

View File

@@ -391,17 +391,18 @@ class TestRepostItemValuation(IntegrationTestCase, StockTestMixin):
self.assertTrue(frappe.db.exists("Repost Item Valuation", {"voucher_no": pr.name}))
def test_repost_item_valuation_for_closing_stock_balance(self):
from erpnext.stock.doctype.closing_stock_balance.closing_stock_balance import (
from erpnext.stock.doctype.stock_closing_entry.stock_closing_entry import (
prepare_closing_stock_balance,
)
doc = frappe.new_doc("Closing Stock Balance")
doc = frappe.new_doc("Stock Closing Entry")
doc.company = "_Test Company"
doc.from_date = today()
doc.to_date = today()
doc.submit()
prepare_closing_stock_balance(doc.name)
doc.load_from_db()
self.assertEqual(doc.docstatus, 1)
self.assertEqual(doc.status, "Completed")

View File

@@ -0,0 +1,8 @@
// Copyright (c) 2024, Frappe Technologies Pvt. Ltd. and contributors
// For license information, please see license.txt
// frappe.ui.form.on("Stock Closing Balance", {
// refresh(frm) {
// },
// });

View File

@@ -0,0 +1,262 @@
{
"actions": [],
"allow_copy": 1,
"creation": "2024-12-02 16:56:15.241776",
"default_view": "List",
"doctype": "DocType",
"document_type": "Other",
"engine": "InnoDB",
"field_order": [
"item_code",
"warehouse",
"batch_no",
"column_break_rvdz",
"posting_date",
"posting_time",
"posting_datetime",
"section_break_11",
"actual_qty",
"valuation_rate",
"column_break_17",
"stock_value",
"stock_value_difference",
"section_break_21",
"company",
"column_break_usgq",
"stock_closing_entry",
"section_break_fxjm",
"item_name",
"item_group",
"column_break_ljle",
"stock_uom",
"inventory_dimension_key",
"stock_ageing_section",
"fifo_queue"
],
"fields": [
{
"fieldname": "item_code",
"fieldtype": "Link",
"in_filter": 1,
"in_list_view": 1,
"in_standard_filter": 1,
"label": "Item Code",
"oldfieldname": "item_code",
"oldfieldtype": "Link",
"options": "Item",
"print_width": "100px",
"read_only": 1,
"search_index": 1,
"width": "100px"
},
{
"fieldname": "warehouse",
"fieldtype": "Link",
"in_filter": 1,
"in_list_view": 1,
"in_standard_filter": 1,
"label": "Warehouse",
"oldfieldname": "warehouse",
"oldfieldtype": "Link",
"options": "Warehouse",
"print_width": "100px",
"read_only": 1,
"search_index": 1,
"width": "100px"
},
{
"fieldname": "posting_date",
"fieldtype": "Date",
"in_filter": 1,
"in_list_view": 1,
"label": "Posting Date",
"oldfieldname": "posting_date",
"oldfieldtype": "Date",
"print_width": "100px",
"read_only": 1,
"search_index": 1,
"width": "100px"
},
{
"fieldname": "posting_time",
"fieldtype": "Time",
"label": "Posting Time",
"oldfieldname": "posting_time",
"oldfieldtype": "Time",
"print_width": "100px",
"read_only": 1,
"width": "100px"
},
{
"fieldname": "posting_datetime",
"fieldtype": "Datetime",
"label": "Posting Datetime",
"read_only": 1,
"search_index": 1
},
{
"fieldname": "section_break_11",
"fieldtype": "Section Break"
},
{
"fieldname": "actual_qty",
"fieldtype": "Float",
"in_filter": 1,
"in_list_view": 1,
"label": "Qty Change",
"oldfieldname": "actual_qty",
"oldfieldtype": "Currency",
"print_width": "150px",
"read_only": 1,
"width": "150px"
},
{
"fieldname": "column_break_17",
"fieldtype": "Column Break"
},
{
"fieldname": "valuation_rate",
"fieldtype": "Currency",
"label": "Valuation Rate",
"oldfieldname": "valuation_rate",
"oldfieldtype": "Currency",
"options": "Company:company:default_currency",
"print_width": "150px",
"read_only": 1,
"width": "150px"
},
{
"fieldname": "stock_value",
"fieldtype": "Currency",
"label": "Balance Stock Value",
"oldfieldname": "stock_value",
"oldfieldtype": "Currency",
"options": "Company:company:default_currency",
"read_only": 1
},
{
"fieldname": "section_break_21",
"fieldtype": "Section Break"
},
{
"fieldname": "company",
"fieldtype": "Link",
"in_filter": 1,
"label": "Company",
"oldfieldname": "company",
"oldfieldtype": "Data",
"options": "Company",
"print_width": "150px",
"read_only": 1,
"search_index": 1,
"width": "150px"
},
{
"fieldname": "stock_uom",
"fieldtype": "Link",
"label": "Stock UOM",
"oldfieldname": "stock_uom",
"oldfieldtype": "Data",
"options": "UOM",
"print_width": "150px",
"read_only": 1,
"width": "150px"
},
{
"fieldname": "column_break_rvdz",
"fieldtype": "Column Break"
},
{
"fieldname": "column_break_usgq",
"fieldtype": "Column Break"
},
{
"fieldname": "stock_value_difference",
"fieldtype": "Currency",
"label": "Change in Stock Value",
"read_only": 1
},
{
"fieldname": "section_break_fxjm",
"fieldtype": "Section Break"
},
{
"fieldname": "item_name",
"fieldtype": "Data",
"label": "Item Name",
"read_only": 1
},
{
"fieldname": "item_group",
"fieldtype": "Link",
"label": "Item Group",
"options": "Item Group",
"read_only": 1
},
{
"fieldname": "column_break_ljle",
"fieldtype": "Column Break"
},
{
"fieldname": "stock_closing_entry",
"fieldtype": "Link",
"label": "Stock Closing Entry",
"options": "Stock Closing Entry",
"read_only": 1,
"search_index": 1
},
{
"fieldname": "inventory_dimension_key",
"fieldtype": "Small Text",
"label": "Inventory Dimension key",
"read_only": 1
},
{
"fieldname": "batch_no",
"fieldtype": "Link",
"label": "Batch No",
"options": "Batch",
"read_only": 1,
"search_index": 1
},
{
"fieldname": "stock_ageing_section",
"fieldtype": "Section Break",
"label": "Stock Ageing"
},
{
"fieldname": "fifo_queue",
"fieldtype": "Long Text",
"label": "FIFO Queue",
"read_only": 1
}
],
"hide_toolbar": 1,
"icon": "fa fa-list",
"in_create": 1,
"index_web_pages_for_search": 1,
"links": [],
"modified": "2024-12-10 21:56:36.633567",
"modified_by": "Administrator",
"module": "Stock",
"name": "Stock Closing Balance",
"owner": "Administrator",
"permissions": [
{
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "Stock User"
},
{
"export": 1,
"read": 1,
"report": 1,
"role": "Accounts Manager"
}
],
"sort_field": "creation",
"sort_order": "DESC",
"states": []
}

View File

@@ -0,0 +1,36 @@
# Copyright (c) 2024, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
# import frappe
from frappe.model.document import Document
class StockClosingBalance(Document):
# begin: auto-generated types
# This code is auto-generated. Do not modify anything in this block.
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from frappe.types import DF
actual_qty: DF.Float
batch_no: DF.Link | None
company: DF.Link | None
fifo_queue: DF.LongText | None
inventory_dimension_key: DF.SmallText | None
item_code: DF.Link | None
item_group: DF.Link | None
item_name: DF.Data | None
posting_date: DF.Date | None
posting_datetime: DF.Datetime | None
posting_time: DF.Time | None
stock_closing_entry: DF.Link | None
stock_uom: DF.Link | None
stock_value: DF.Currency
stock_value_difference: DF.Currency
valuation_rate: DF.Currency
warehouse: DF.Link | None
# end: auto-generated types
pass

View File

@@ -0,0 +1,29 @@
# Copyright (c) 2024, Frappe Technologies Pvt. Ltd. and Contributors
# See license.txt
# import frappe
from frappe.tests import IntegrationTestCase, UnitTestCase
# On IntegrationTestCase, the doctype test records and all
# link-field test record depdendencies are recursively loaded
# Use these module variables to add/remove to/from that list
EXTRA_TEST_RECORD_DEPENDENCIES = [] # eg. ["User"]
IGNORE_TEST_RECORD_DEPENDENCIES = [] # eg. ["User"]
class UnitTestStockClosingBalance(UnitTestCase):
"""
Unit tests for StockClosingBalance.
Use this class for testing individual functions and methods.
"""
pass
class IntegrationTestStockClosingBalance(IntegrationTestCase):
"""
Integration tests for StockClosingBalance.
Use this class for testing interactions between multiple components.
"""
pass

View File

@@ -1,7 +1,7 @@
// Copyright (c) 2023, Frappe Technologies Pvt. Ltd. and contributors
// For license information, please see license.txt
frappe.ui.form.on("Closing Stock Balance", {
frappe.ui.form.on("Stock Closing Entry", {
refresh(frm) {
frm.trigger("generate_closing_balance");
frm.trigger("regenerate_closing_balance");
@@ -9,7 +9,7 @@ frappe.ui.form.on("Closing Stock Balance", {
generate_closing_balance(frm) {
if (["Queued", "Failed"].includes(frm.doc.status)) {
frm.add_custom_button(__("Generate Closing Stock Balance"), () => {
frm.add_custom_button(__("Generate Stock Closing Entry"), () => {
frm.call({
method: "enqueue_job",
doc: frm.doc,
@@ -24,7 +24,7 @@ frappe.ui.form.on("Closing Stock Balance", {
regenerate_closing_balance(frm) {
if (frm.doc.status == "Completed") {
frm.add_custom_button(__("Regenerate Closing Stock Balance"), () => {
frm.add_custom_button(__("Regenerate Stock Closing Entry"), () => {
frm.call({
method: "regenerate_closing_balance",
doc: frm.doc,

View File

@@ -2,7 +2,7 @@
"actions": [],
"allow_rename": 1,
"autoname": "naming_series:",
"creation": "2023-05-17 09:58:42.086911",
"creation": "2024-12-07 14:07:01.356963",
"default_view": "List",
"doctype": "DocType",
"editable_grid": 1,
@@ -14,13 +14,6 @@
"column_break_p0s0",
"from_date",
"to_date",
"filters_section",
"item_code",
"item_group",
"include_uom",
"column_break_rm5w",
"warehouse",
"warehouse_type",
"amended_from"
],
"fields": [
@@ -34,7 +27,8 @@
"fieldname": "company",
"fieldtype": "Link",
"label": "Company",
"options": "Company"
"options": "Company",
"search_index": 1
},
{
"default": "Draft",
@@ -60,67 +54,29 @@
"fieldtype": "Date",
"label": "To Date"
},
{
"collapsible": 1,
"fieldname": "filters_section",
"fieldtype": "Section Break",
"label": "Filters"
},
{
"fieldname": "item_code",
"fieldtype": "Link",
"label": "Item Code",
"options": "Item"
},
{
"fieldname": "item_group",
"fieldtype": "Link",
"label": "Item Group",
"options": "Item Group"
},
{
"fieldname": "column_break_rm5w",
"fieldtype": "Column Break"
},
{
"fieldname": "warehouse",
"fieldtype": "Link",
"label": "Warehouse",
"options": "Warehouse"
},
{
"fieldname": "warehouse_type",
"fieldtype": "Link",
"label": "Warehouse Type",
"options": "Warehouse Type"
},
{
"fieldname": "amended_from",
"fieldtype": "Link",
"label": "Amended From",
"no_copy": 1,
"options": "Closing Stock Balance",
"options": "Stock Closing Entry",
"print_hide": 1,
"read_only": 1
},
{
"fieldname": "include_uom",
"fieldtype": "Link",
"label": "Include UOM",
"options": "UOM"
"read_only": 1,
"search_index": 1
}
],
"index_web_pages_for_search": 1,
"is_submittable": 1,
"links": [],
"modified": "2024-12-19 13:48:46.618066",
"modified": "2024-12-20 13:48:46.618066",
"modified_by": "Administrator",
"module": "Stock",
"name": "Closing Stock Balance",
"name": "Stock Closing Entry",
"naming_rule": "By \"Naming Series\" field",
"owner": "Administrator",
"permissions": [
{
"amend": 1,
"cancel": 1,
"create": 1,
"delete": 1,
@@ -135,20 +91,7 @@
"write": 1
},
{
"cancel": 1,
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "Stock Manager",
"share": 1,
"submit": 1,
"write": 1
},
{
"amend": 1,
"cancel": 1,
"create": 1,
"delete": 1,
@@ -161,6 +104,21 @@
"share": 1,
"submit": 1,
"write": 1
},
{
"amend": 1,
"cancel": 1,
"create": 1,
"delete": 1,
"email": 1,
"export": 1,
"print": 1,
"read": 1,
"report": 1,
"role": "Stock Manager",
"share": 1,
"submit": 1,
"write": 1
}
],
"sort_field": "creation",

View File

@@ -0,0 +1,423 @@
# Copyright (c) 2023, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt
import gzip
import json
import frappe
from frappe import _
from frappe.core.doctype.prepared_report.prepared_report import create_json_gz_file
from frappe.desk.form.load import get_attachments
from frappe.model.document import Document
from frappe.utils import add_days, get_date_str, get_link_to_form, nowtime, parse_json
from frappe.utils.background_jobs import enqueue
from erpnext.stock.doctype.inventory_dimension.inventory_dimension import get_inventory_dimensions
class StockClosingEntry(Document):
# begin: auto-generated types
# This code is auto-generated. Do not modify anything in this block.
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from frappe.types import DF
amended_from: DF.Link | None
company: DF.Link | None
from_date: DF.Date | None
naming_series: DF.Literal["CBAL-.#####"]
status: DF.Literal["Draft", "Queued", "In Progress", "Completed", "Failed", "Canceled"]
to_date: DF.Date | None
# end: auto-generated types
def before_save(self):
self.set_status()
def set_status(self, save=False):
self.status = "Queued"
if self.docstatus == 2:
self.status = "Canceled"
if self.docstatus == 0:
self.status = "Draft"
if save:
self.db_set("status", self.status)
def validate(self):
self.validate_duplicate()
def validate_duplicate(self):
table = frappe.qb.DocType("Stock Closing Entry")
query = (
frappe.qb.from_(table)
.select(table.name)
.where(
(table.docstatus == 1)
& (table.company == self.company)
& (
(table.from_date.between(self.from_date, self.to_date))
| (table.to_date.between(self.from_date, self.to_date))
| ((table.from_date >= self.from_date) & (table.to_date >= self.to_date))
)
)
)
for fieldname in ["warehouse", "item_code", "item_group", "warehouse_type"]:
if self.get(fieldname):
query = query.where(table[fieldname] == self.get(fieldname))
query = query.run(as_dict=True)
if query and query[0].name:
name = get_link_to_form("Stock Closing Entry", query[0].name)
frappe.throw(
_("Stock Closing Entry {0} already exists for the selected date range").format(name),
title=_("Duplicate Stock Closing Entry"),
)
def on_submit(self):
self.set_status(save=True)
self.enqueue_job()
def on_cancel(self):
self.set_status(save=True)
self.remove_stock_closing()
def remove_stock_closing(self):
table = frappe.qb.DocType("Stock Closing Balance")
frappe.qb.from_(table).delete().where(table.stock_closing_entry == self.name).run()
@frappe.whitelist()
def enqueue_job(self):
self.db_set("status", "In Progress")
enqueue(prepare_closing_stock_balance, name=self.name, queue="long", timeout=1500)
frappe.msgprint(
_(
"Stock Closing Entry {0} has been queued for processing, system will take sometime to complete it."
).format(self.name)
)
@frappe.whitelist()
def regenerate_closing_balance(self):
self.remove_stock_closing()
self.enqueue_job()
def create_stock_closing_balance_entries(self):
from erpnext.stock.utils import get_combine_datetime
stk_cl_obj = StockClosing(self.company, self.from_date, self.to_date)
entries = stk_cl_obj.get_stock_closing_entries()
for key in entries:
row = entries[key]
if row.actual_qty == 0.0 and row.stock_value_difference == 0.0:
continue
if row.fifo_queue is not None:
row.fifo_queue = json.dumps(row.fifo_queue)
new_doc = frappe.new_doc("Stock Closing Balance")
new_doc.update(row)
new_doc.posting_date = self.to_date
new_doc.posting_time = nowtime()
new_doc.posting_datetime = get_combine_datetime(self.to_date, new_doc.posting_time)
new_doc.stock_closing_entry = self.name
new_doc.company = self.company
new_doc.save()
def get_prepared_data(self):
if attachments := get_attachments(self.doctype, self.name):
attachment = attachments[0]
attached_file = frappe.get_doc("File", attachment.name)
data = gzip.decompress(attached_file.get_content())
if data := json.loads(data.decode("utf-8")):
data = data
return parse_json(data)
return frappe._dict({})
def prepare_closing_stock_balance(name):
doc = frappe.get_doc("Stock Closing Entry", name)
doc.db_set("status", "In Progress")
try:
doc.create_stock_closing_balance_entries()
doc.db_set("status", "Completed")
except Exception:
doc.db_set("status", "Failed")
doc.log_error(title="Stock Closing Entry Failed")
class StockClosing:
def __init__(self, company, from_date, to_date, **kwargs):
self.company = company
self.from_date = from_date
self.to_date = to_date
self.kwargs = kwargs
self.inv_dimensions = get_inventory_dimensions()
self.last_closing_balance = self.get_last_stock_closing_entry()
def get_stock_closing_entries(self):
sl_entries = self.get_sle_entries()
closing_stock = frappe._dict()
for row in sl_entries:
dimensions_keys = self.get_keys(row)
for dimension_key in dimensions_keys:
for dimension_fields, dimension_values in dimension_key.items():
key = dimension_values
if key in closing_stock:
actual_qty = row.sabb_qty or row.actual_qty
closing_stock[key].actual_qty += actual_qty
closing_stock[key].stock_value_difference += (
row.sabb_stock_value_difference or row.stock_value_difference
)
if not row.actual_qty and row.qty_after_transaction:
closing_stock[key].actual_qty = row.qty_after_transaction
fifo_queue = closing_stock[key].fifo_queue
if fifo_queue:
self.update_fifo_queue(fifo_queue, actual_qty, row.posting_date)
closing_stock[key].fifo_queue = fifo_queue
else:
entries = self.get_initialized_entry(row, dimension_fields)
closing_stock[key] = entries
return closing_stock
def update_fifo_queue(self, fifo_queue, actual_qty, posting_date):
if actual_qty > 0:
fifo_queue.append([actual_qty, get_date_str(posting_date)])
else:
remaining_qty = actual_qty
for idx, queue in enumerate(fifo_queue):
if queue[0] + remaining_qty >= 0:
queue[0] += remaining_qty
if queue[0] == 0:
fifo_queue.pop(idx)
break
else:
remaining_qty += queue[0]
fifo_queue.pop(0)
def get_initialized_entry(self, row, dimension_fields):
item_details = frappe.get_cached_value(
"Item", row.item_code, ["item_group", "item_name", "stock_uom", "has_serial_no"], as_dict=1
)
inventory_dimension_key = None
if dimension_fields not in [("item_code", "warehouse"), ("item_code", "warehouse", "batch_no")]:
inventory_dimension_key = json.dumps(dimension_fields)
actual_qty = row.sabb_qty or row.actual_qty or row.qty_after_transaction
entry = frappe._dict(
{
"item_code": row.item_code,
"warehouse": row.warehouse,
"actual_qty": actual_qty,
"stock_value_difference": row.sabb_stock_value_difference or row.stock_value_difference,
"item_group": item_details.item_group,
"item_name": item_details.item_name,
"stock_uom": item_details.stock_uom,
"inventory_dimension_key": inventory_dimension_key,
"fifo_queue": [[actual_qty, get_date_str(row.posting_date)]]
if not item_details.has_serial_no
else [],
}
)
if row.sabb_batch_no:
row.batch_no = row.sabb_batch_no
# To update dimensions
for field in dimension_fields:
if row.get(field):
entry[field] = row.get(field)
return entry
def get_sle_entries(self):
sl_entries = []
if self.last_closing_balance:
self.from_date = add_days(self.last_closing_balance.to_date, 1)
sl_entries += self.get_entries(
"Stock Closing Balance",
fields=[
"item_code",
"warehouse",
"posting_date",
"posting_time",
"posting_datetime",
"batch_no",
"actual_qty",
"valuation_rate",
"stock_value",
"stock_value_difference",
],
filters={
"company": self.company,
"closing_stock_balance": self.last_closing_balance.name,
},
)
if not self.last_closing_balance:
self.from_date = "1900-01-01"
sl_entries += self.get_entries(
"Stock Ledger Entry",
fields=[
"item_code",
"warehouse",
"posting_date",
"posting_time",
"posting_datetime",
"batch_no",
"actual_qty",
"valuation_rate",
"stock_value",
"stock_value_difference",
"qty_after_transaction",
"stock_uom",
],
filters={
"company": self.company,
"posting_date": [self.from_date, self.to_date],
"is_cancelled": 0,
"docstatus": 1,
},
)
return sl_entries
def get_entries(self, doctype, fields, filters, **kwargs):
"""Get Stock Ledger Entries for the given filters."""
for dimension in self.inv_dimensions:
if dimension.fieldname not in fields:
fields.append(dimension.fieldname)
table = frappe.qb.DocType(doctype)
query = frappe.qb.from_(table).select(*fields).orderby(table.posting_datetime)
if filters:
for field, value in filters.items():
if field == "posting_date":
query = query.where(table[field].between(value[0], value[1]))
elif isinstance(value, list) or isinstance(value, tuple):
query = query.where(table[field].isin(value))
else:
query = query.where(table[field] == value)
for key, value in kwargs.items():
if value:
if isinstance(value, list) or isinstance(value, tuple):
query = query.where(table[key].isin(value))
else:
query = query.where(table[key] == value)
if doctype == "Stock Ledger Entry":
sabb_table = frappe.qb.DocType("Serial and Batch Entry")
query = query.left_join(sabb_table).on(
(sabb_table.parent == table.serial_and_batch_bundle) & (table.has_batch_no == 1)
)
query = query.select(sabb_table.batch_no.as_("sabb_batch_no"))
query = query.select(sabb_table.qty.as_("sabb_qty"))
query = query.select(sabb_table.stock_value_difference.as_("sabb_stock_value_difference"))
return query.run(as_dict=True)
def get_last_stock_closing_entry(self):
entries = frappe.get_all(
"Stock Closing Entry",
fields=["name", "to_date"],
filters={"company": self.company, "to_date": ["<", self.from_date], "docstatus": 1},
order_by="to_date desc, creation desc",
limit=1,
)
return entries[0] if entries else frappe._dict()
def get_keys(self, row):
keys = []
keys.append({("item_code", "warehouse"): (row.item_code, row.warehouse)})
if row.batch_no:
keys.append(
{("item_code", "warehouse", "batch_no"): (row.item_code, row.warehouse, row.batch_no)}
)
if row.sabb_batch_no:
keys.append(
{("item_code", "warehouse", "batch_no"): (row.item_code, row.warehouse, row.sabb_batch_no)}
)
dimension_fields = []
dimension_values = []
for dimension in self.inv_dimensions:
if row.get(dimension.fieldname):
keys.append(
{
("item_code", "warehouse", dimension.fieldname): (
row.item_code,
row.warehouse,
row.get(dimension.fieldname),
)
}
)
dimension_fields.append(dimension.fieldname)
dimension_values.append(row.get(dimension.fieldname))
if dimension_fields and len(dimension_fields) > 1:
keys.append(
{
("item_code", "warehouse", *dimension_fields): (
row.item_code,
row.warehouse,
*dimension_values,
)
}
)
return keys
def get_stock_closing_balance(self, kwargs, for_batch=False):
if not self.last_closing_balance:
return []
stock_closing_entry = self.last_closing_balance.name
if isinstance(kwargs, dict):
kwargs = frappe._dict(kwargs)
table = frappe.qb.DocType("Stock Closing Balance")
query = frappe.qb.from_(table).select("*").where(table.stock_closing_entry == stock_closing_entry)
for key, value in kwargs.items():
if key == "inventory_dimension_key":
if isinstance(value, tuple) and value[0] == "is" and value[1] == "not set":
query = query.where(
table.inventory_dimension_key.isnull() | (table.inventory_dimension_key == "")
)
elif isinstance(value, list) or isinstance(value, tuple):
query = query.where(table[key].isin(value))
else:
query = query.where(table[key] == value)
if for_batch:
query = query.where(table.batch_no.isnotnull())
query = query.where(
table.inventory_dimension_key.isnull() | (table.inventory_dimension_key == "")
)
return query.run(as_dict=True)

View File

@@ -0,0 +1,13 @@
from frappe import _
def get_data():
return {
"fieldname": "stock_closing_entry",
"transactions": [
{
"label": _("Stock Closing Log"),
"items": ["Stock Closing Balance"],
},
],
}

View File

@@ -0,0 +1,6 @@
frappe.listview_settings["Stock Closing Entry"] = {
add_fields: ["status"],
get_indicator: function (doc) {
return [__(doc.status), frappe.utils.guess_colour(doc.status), "status,=," + doc.status];
},
};

View File

@@ -0,0 +1,29 @@
# Copyright (c) 2024, Frappe Technologies Pvt. Ltd. and Contributors
# See license.txt
# import frappe
from frappe.tests import IntegrationTestCase, UnitTestCase
# On IntegrationTestCase, the doctype test records and all
# link-field test record depdendencies are recursively loaded
# Use these module variables to add/remove to/from that list
EXTRA_TEST_RECORD_DEPENDENCIES = [] # eg. ["User"]
IGNORE_TEST_RECORD_DEPENDENCIES = [] # eg. ["User"]
class UnitTestStockClosingEntry(UnitTestCase):
"""
Unit tests for StockClosingEntry.
Use this class for testing individual functions and methods.
"""
pass
class IntegrationTestStockClosingEntry(IntegrationTestCase):
"""
Integration tests for StockClosingEntry.
Use this class for testing interactions between multiple components.
"""
pass

View File

@@ -8,6 +8,7 @@ from frappe.utils import add_to_date, cint, flt, get_datetime, get_table_name, g
from pypika import functions as fn
from erpnext.deprecation_dumpster import deprecated
from erpnext.stock.doctype.stock_closing_entry.stock_closing_entry import StockClosing
from erpnext.stock.doctype.warehouse.warehouse import apply_warehouse_filter
SLE_COUNT_LIMIT = 10_000
@@ -94,12 +95,36 @@ def get_columns(filters):
def get_stock_ledger_entries(filters):
entries = get_stock_ledger_entries_for_batch_no(filters)
entries = []
stk_cl_obj = StockClosing(filters.company, filters.from_date, filters.from_date)
if stk_cl_obj.last_closing_balance:
entries += get_stock_closing_balance(stk_cl_obj, filters)
filters.start_from = stk_cl_obj.last_closing_balance.to_date
entries += get_stock_ledger_entries_for_batch_no(filters)
entries += get_stock_ledger_entries_for_batch_bundle(filters)
return entries
def get_stock_closing_balance(stk_cl_obj, filters):
query_filters = {}
for field in ["item_code", "warehouse", "company", "batch_no"]:
if filters.get(field):
query_filters[field] = filters.get(field)
if filters.warehouse_type:
warehouses = frappe.get_all(
"Warehouse",
filters={"warehouse_type": filters.warehouse_type, "is_group": 0},
pluck="name",
)
query_filters["warehouse"] = warehouses
return stk_cl_obj.get_stock_closing_balance(query_filters, for_batch=True)
@deprecated(f"{__name__}.get_stock_ledger_entries_for_batch_no", "unknown", "v16", "No known instructions.")
def get_stock_ledger_entries_for_batch_no(filters):
if not filters.get("from_date"):
@@ -144,6 +169,9 @@ def get_stock_ledger_entries_for_batch_no(filters):
if filters.get(field):
query = query.where(sle[field] == filters.get(field))
if filters.start_from:
query = query.where(sle.posting_datetime > get_datetime(filters.start_from))
return query.run(as_dict=True) or []
@@ -190,6 +218,9 @@ def get_stock_ledger_entries_for_batch_bundle(filters):
else:
query = query.where(sle[field] == filters.get(field))
if filters.start_from:
query = query.where(sle.posting_date > getdate(filters.start_from))
return query.run(as_dict=True) or []

View File

@@ -2,6 +2,7 @@
# License: GNU General Public License v3. See license.txt
import json
from operator import itemgetter
from typing import Any, TypedDict
@@ -14,6 +15,7 @@ from frappe.utils.nestedset import get_descendants_of
import erpnext
from erpnext.stock.doctype.inventory_dimension.inventory_dimension import get_inventory_dimensions
from erpnext.stock.doctype.stock_closing_entry.stock_closing_entry import StockClosing
from erpnext.stock.doctype.warehouse.warehouse import apply_warehouse_filter
from erpnext.stock.report.stock_ageing.stock_ageing import FIFOSlots, get_average_age
from erpnext.stock.utils import add_additional_uom_columns
@@ -60,9 +62,11 @@ class StockBalanceReport:
def run(self):
self.float_precision = cint(frappe.db.get_default("float_precision")) or 3
self.item_warehouse_map = frappe._dict({})
self.inventory_dimensions = self.get_inventory_dimension_fields()
self.prepare_opening_data_from_closing_balance()
self.prepare_stock_ledger_entries()
self.prepare_opening_stock()
self.prepare_sle_query()
self.prepare_item_warehouse_map_for_current_period()
self.prepare_new_data()
if not self.columns:
@@ -72,220 +76,72 @@ class StockBalanceReport:
return self.columns, self.data
def prepare_opening_data_from_closing_balance(self) -> None:
self.opening_data = frappe._dict({})
def prepare_opening_stock(self) -> None:
opening_entries = self.get_entries_from_stock_closing_balance()
closing_balance = self.get_closing_balance()
if not closing_balance:
return
for entry in opening_entries:
key = self.get_group_by_key(entry)
self.start_from = add_days(closing_balance[0].to_date, 1)
res = frappe.get_doc("Closing Stock Balance", closing_balance[0].name).get_prepared_data()
for entry in res.data:
entry = frappe._dict(entry)
group_by_key = self.get_group_by_key(entry)
if group_by_key not in self.opening_data:
self.opening_data.setdefault(group_by_key, entry)
def prepare_new_data(self):
self.item_warehouse_map = self.get_item_warehouse_map()
if self.filters.get("show_stock_ageing_data"):
self.filters["show_warehouse_wise_stock"] = True
item_wise_fifo_queue = FIFOSlots(self.filters, self.sle_entries).generate()
_func = itemgetter(1)
del self.sle_entries
sre_details = self.get_sre_reserved_qty_details()
variant_values = {}
if self.filters.get("show_variant_attributes"):
variant_values = self.get_variant_values_for()
for _key, report_data in self.item_warehouse_map.items():
if variant_data := variant_values.get(report_data.item_code):
report_data.update(variant_data)
if self.filters.get("show_stock_ageing_data"):
opening_fifo_queue = self.get_opening_fifo_queue(report_data) or []
fifo_queue = []
if fifo_queue := item_wise_fifo_queue.get((report_data.item_code, report_data.warehouse)):
fifo_queue = fifo_queue.get("fifo_queue")
if fifo_queue:
opening_fifo_queue.extend(fifo_queue)
stock_ageing_data = {"average_age": 0, "earliest_age": 0, "latest_age": 0}
if opening_fifo_queue:
fifo_queue = sorted(filter(_func, opening_fifo_queue), key=_func)
if not fifo_queue:
continue
to_date = self.to_date
stock_ageing_data["average_age"] = get_average_age(fifo_queue, to_date)
stock_ageing_data["earliest_age"] = date_diff(to_date, fifo_queue[0][1])
stock_ageing_data["latest_age"] = date_diff(to_date, fifo_queue[-1][1])
stock_ageing_data["fifo_queue"] = fifo_queue
report_data.update(stock_ageing_data)
report_data.update(
{"reserved_stock": sre_details.get((report_data.item_code, report_data.warehouse), 0.0)}
self.item_warehouse_map[key] = frappe._dict(
{
"item_code": entry.item_code,
"warehouse": entry.warehouse,
"item_group": entry.item_group,
"company": entry.company,
"currency": self.company_currency,
"stock_uom": entry.stock_uom,
"item_name": entry.item_name,
"opening_qty": entry.actual_qty,
"opening_val": entry.stock_value_difference,
"opening_fifo_queue": json.loads(entry.fifo_queue) if entry.fifo_queue else [],
"in_qty": 0.0,
"in_val": 0.0,
"out_qty": 0.0,
"out_val": 0.0,
"bal_qty": entry.actual_qty,
"bal_val": entry.stock_value_difference,
"val_rate": 0.0,
}
)
if (
not self.filters.get("include_zero_stock_items")
and report_data
and report_data.bal_qty == 0
and report_data.bal_val == 0
):
continue
self.data.append(report_data)
def get_item_warehouse_map(self):
item_warehouse_map = {}
self.opening_vouchers = self.get_opening_vouchers()
if self.filters.get("show_stock_ageing_data"):
self.sle_entries = self.sle_query.run(as_dict=True)
# HACK: This is required to avoid causing db query in flt
_system_settings = frappe.get_cached_doc("System Settings")
with frappe.db.unbuffered_cursor():
if not self.filters.get("show_stock_ageing_data"):
self.sle_entries = self.sle_query.run(as_dict=True, as_iterator=True)
for entry in self.sle_entries:
group_by_key = self.get_group_by_key(entry)
if group_by_key not in item_warehouse_map:
self.initialize_data(item_warehouse_map, group_by_key, entry)
self.prepare_item_warehouse_map(item_warehouse_map, entry, group_by_key)
if self.opening_data.get(group_by_key):
del self.opening_data[group_by_key]
for group_by_key, entry in self.opening_data.items():
if group_by_key not in item_warehouse_map:
self.initialize_data(item_warehouse_map, group_by_key, entry)
item_warehouse_map = filter_items_with_no_transactions(
item_warehouse_map, self.float_precision, self.inventory_dimensions
)
return item_warehouse_map
def get_sre_reserved_qty_details(self) -> dict:
from erpnext.stock.doctype.stock_reservation_entry.stock_reservation_entry import (
get_sre_reserved_qty_for_items_and_warehouses as get_reserved_qty_details,
)
item_code_list, warehouse_list = [], []
for d in self.item_warehouse_map:
item_code_list.append(d[1])
warehouse_list.append(d[2])
return get_reserved_qty_details(item_code_list, warehouse_list)
def prepare_item_warehouse_map(self, item_warehouse_map, entry, group_by_key):
qty_dict = item_warehouse_map[group_by_key]
for field in self.inventory_dimensions:
qty_dict[field] = entry.get(field)
if entry.voucher_type == "Stock Reconciliation" and (not entry.batch_no or entry.serial_no):
qty_diff = flt(entry.qty_after_transaction) - flt(qty_dict.bal_qty)
else:
qty_diff = flt(entry.actual_qty)
value_diff = flt(entry.stock_value_difference)
if entry.posting_date < self.from_date or entry.voucher_no in self.opening_vouchers.get(
entry.voucher_type, []
):
qty_dict.opening_qty += qty_diff
qty_dict.opening_val += value_diff
elif entry.posting_date >= self.from_date and entry.posting_date <= self.to_date:
if flt(qty_diff, self.float_precision) >= 0:
qty_dict.in_qty += qty_diff
qty_dict.in_val += value_diff
else:
qty_dict.out_qty += abs(qty_diff)
qty_dict.out_val += abs(value_diff)
qty_dict.val_rate = entry.valuation_rate
qty_dict.bal_qty += qty_diff
qty_dict.bal_val += value_diff
def initialize_data(self, item_warehouse_map, group_by_key, entry):
opening_data = self.opening_data.get(group_by_key, {})
item_warehouse_map[group_by_key] = frappe._dict(
{
"item_code": entry.item_code,
"warehouse": entry.warehouse,
"item_group": entry.item_group,
"company": entry.company,
"currency": self.company_currency,
"stock_uom": entry.stock_uom,
"item_name": entry.item_name,
"opening_qty": opening_data.get("bal_qty") or 0.0,
"opening_val": opening_data.get("bal_val") or 0.0,
"opening_fifo_queue": opening_data.get("fifo_queue") or [],
"in_qty": 0.0,
"in_val": 0.0,
"out_qty": 0.0,
"out_val": 0.0,
"bal_qty": opening_data.get("bal_qty") or 0.0,
"bal_val": opening_data.get("bal_val") or 0.0,
"val_rate": 0.0,
}
)
def get_group_by_key(self, row) -> tuple:
group_by_key = [row.company, row.item_code, row.warehouse]
for fieldname in self.inventory_dimensions:
if not row.get(fieldname):
continue
if self.filters.get(fieldname) or self.filters.get("show_dimension_wise_stock"):
group_by_key.append(row.get(fieldname))
return tuple(group_by_key)
def get_closing_balance(self) -> list[dict[str, Any]]:
if self.filters.get("ignore_closing_balance"):
def get_entries_from_stock_closing_balance(self) -> list:
stk_cl_obj = StockClosing(self.filters.company, self.from_date, self.from_date)
if not stk_cl_obj.last_closing_balance:
return []
table = frappe.qb.DocType("Closing Stock Balance")
self.start_from = add_days(stk_cl_obj.last_closing_balance.to_date, 1)
query = (
frappe.qb.from_(table)
.select(table.name, table.to_date)
.where(
(table.docstatus == 1)
& (table.company == self.filters.company)
& (table.to_date <= self.from_date)
& (table.status == "Completed")
)
.orderby(table.to_date, order=Order.desc)
.limit(1)
)
query_filters = {}
dimenion_keys = []
for field in self.filter_fields():
if not self.filters.get(field):
continue
for fieldname in ["warehouse", "item_code", "item_group", "warehouse_type"]:
if self.filters.get(fieldname):
query = query.where(table[fieldname] == self.filters.get(fieldname))
if field in self.inventory_dimensions:
dimenion_keys.append(field)
return query.run(as_dict=True)
query_filters[field] = self.filters.get(field)
def prepare_stock_ledger_entries(self):
if dimenion_keys:
query_filters["inventory_dimension_key"] = json.dumps(("item_code", "warehouse", *dimenion_keys))
else:
query_filters["inventory_dimension_key"] = ("is", "not set")
opening_entries = stk_cl_obj.get_stock_closing_balance(query_filters)
if not opening_entries:
return []
return opening_entries
def filter_fields(self) -> list[str]:
fields = ["item_code", "warehouse"]
for field in self.inventory_dimensions:
fields.append(field)
return fields
def prepare_sle_query(self):
sle = frappe.qb.DocType("Stock Ledger Entry")
item_table = frappe.qb.DocType("Item")
@@ -330,6 +186,164 @@ class StockBalanceReport:
self.sle_query = query
def prepare_item_warehouse_map_for_current_period(self):
self.opening_vouchers = self.get_opening_vouchers()
if self.filters.get("show_stock_ageing_data"):
self.sle_entries = self.sle_query.run(as_dict=True)
# HACK: This is required to avoid causing db query in flt
_system_settings = frappe.get_cached_doc("System Settings")
with frappe.db.unbuffered_cursor():
if not self.filters.get("show_stock_ageing_data"):
self.sle_entries = self.sle_query.run(as_dict=True, as_iterator=True)
for entry in self.sle_entries:
group_by_key = self.get_group_by_key(entry)
if group_by_key not in self.item_warehouse_map:
self.initialize_data(group_by_key, entry)
self.prepare_item_warehouse_map(entry, group_by_key)
self.item_warehouse_map = filter_items_with_no_transactions(
self.item_warehouse_map, self.float_precision, self.inventory_dimensions
)
def prepare_new_data(self):
if self.filters.get("show_stock_ageing_data"):
self.filters["show_warehouse_wise_stock"] = True
item_wise_fifo_queue = FIFOSlots(self.filters).generate()
_func = itemgetter(1)
del self.sle_entries
sre_details = self.get_sre_reserved_qty_details()
variant_values = {}
if self.filters.get("show_variant_attributes"):
variant_values = self.get_variant_values_for()
for _key, report_data in self.item_warehouse_map.items():
if variant_data := variant_values.get(report_data.item_code):
report_data.update(variant_data)
if self.filters.get("show_stock_ageing_data"):
opening_fifo_queue = self.get_opening_fifo_queue(report_data) or []
fifo_queue = []
if fifo_queue := item_wise_fifo_queue.get((report_data.item_code, report_data.warehouse)):
fifo_queue = fifo_queue.get("fifo_queue")
if fifo_queue:
opening_fifo_queue.extend(fifo_queue)
stock_ageing_data = {"average_age": 0, "earliest_age": 0, "latest_age": 0}
if opening_fifo_queue:
fifo_queue = sorted(filter(_func, opening_fifo_queue), key=_func)
if not fifo_queue:
continue
to_date = self.to_date
stock_ageing_data["average_age"] = get_average_age(fifo_queue, to_date)
stock_ageing_data["earliest_age"] = date_diff(to_date, fifo_queue[0][1])
stock_ageing_data["latest_age"] = date_diff(to_date, fifo_queue[-1][1])
stock_ageing_data["fifo_queue"] = fifo_queue
report_data.update(stock_ageing_data)
report_data.update(
{"reserved_stock": sre_details.get((report_data.item_code, report_data.warehouse), 0.0)}
)
if (
not self.filters.get("include_zero_stock_items")
and report_data
and report_data.bal_qty == 0
and report_data.bal_val == 0
):
continue
self.data.append(report_data)
def get_sre_reserved_qty_details(self) -> dict:
from erpnext.stock.doctype.stock_reservation_entry.stock_reservation_entry import (
get_sre_reserved_qty_for_items_and_warehouses as get_reserved_qty_details,
)
item_code_list, warehouse_list = [], []
for d in self.item_warehouse_map:
item_code_list.append(d[0])
warehouse_list.append(d[1])
return get_reserved_qty_details(item_code_list, warehouse_list)
def prepare_item_warehouse_map(self, entry, group_by_key):
qty_dict = self.item_warehouse_map[group_by_key]
for field in self.inventory_dimensions:
qty_dict[field] = entry.get(field)
if entry.voucher_type == "Stock Reconciliation" and (not entry.batch_no or entry.serial_no):
qty_diff = flt(entry.qty_after_transaction) - flt(qty_dict.bal_qty)
else:
qty_diff = flt(entry.actual_qty)
value_diff = flt(entry.stock_value_difference)
if entry.posting_date < self.from_date or entry.voucher_no in self.opening_vouchers.get(
entry.voucher_type, []
):
qty_dict.opening_qty += qty_diff
qty_dict.opening_val += value_diff
elif entry.posting_date >= self.from_date and entry.posting_date <= self.to_date:
if flt(qty_diff, self.float_precision) >= 0:
qty_dict.in_qty += qty_diff
qty_dict.in_val += value_diff
else:
qty_dict.out_qty += abs(qty_diff)
qty_dict.out_val += abs(value_diff)
qty_dict.val_rate = entry.valuation_rate
qty_dict.bal_qty += qty_diff
qty_dict.bal_val += value_diff
def initialize_data(self, group_by_key, entry):
self.item_warehouse_map[group_by_key] = frappe._dict(
{
"item_code": entry.item_code,
"warehouse": entry.warehouse,
"item_group": entry.item_group,
"company": entry.company,
"currency": self.company_currency,
"stock_uom": entry.stock_uom,
"item_name": entry.item_name,
"opening_qty": 0.0,
"opening_val": 0.0,
"opening_fifo_queue": [],
"in_qty": 0.0,
"in_val": 0.0,
"out_qty": 0.0,
"out_val": 0.0,
"bal_qty": 0.0,
"bal_val": 0.0,
"val_rate": 0.0,
}
)
def get_group_by_key(self, row) -> tuple:
group_by_key = [row.item_code, row.warehouse]
for fieldname in self.inventory_dimensions:
if not row.get(fieldname):
continue
if self.filters.get(fieldname) or self.filters.get("show_dimension_wise_stock"):
group_by_key.append(row.get(fieldname))
return tuple(group_by_key)
def apply_inventory_dimensions_filters(self, query, sle) -> str:
inventory_dimension_fields = self.get_inventory_dimension_fields()
if inventory_dimension_fields: