fix: get_query for batch number and incorrect batch qty

This commit is contained in:
Rohit Waghchaure
2023-06-04 16:09:01 +05:30
parent 14292ffc6f
commit acd12c5830
5 changed files with 259 additions and 136 deletions

View File

@@ -3,12 +3,13 @@
import json
from collections import defaultdict
from collections import OrderedDict, defaultdict
import frappe
from frappe import scrub
from frappe.desk.reportview import get_filters_cond, get_match_cond
from frappe.utils import nowdate, unique
from frappe.query_builder.functions import Concat, Sum
from frappe.utils import nowdate, today, unique
import erpnext
from erpnext.stock.get_item_details import _get_item_tax_template
@@ -412,95 +413,136 @@ def get_delivery_notes_to_be_billed(doctype, txt, searchfield, start, page_len,
@frappe.validate_and_sanitize_search_inputs
def get_batch_no(doctype, txt, searchfield, start, page_len, filters):
doctype = "Batch"
cond = ""
if filters.get("posting_date"):
cond = "and (batch.expiry_date is null or batch.expiry_date >= %(posting_date)s)"
batch_nos = None
args = {
"item_code": filters.get("item_code"),
"warehouse": filters.get("warehouse"),
"posting_date": filters.get("posting_date"),
"txt": "%{0}%".format(txt),
"start": start,
"page_len": page_len,
}
having_clause = "having sum(sle.actual_qty) > 0"
if filters.get("is_return"):
having_clause = ""
meta = frappe.get_meta(doctype, cached=True)
searchfields = meta.get_search_fields()
search_columns = ""
search_cond = ""
query = get_batches_from_stock_ledger_entries(searchfields, txt, filters)
bundle_query = get_batches_from_serial_and_batch_bundle(searchfields, txt, filters)
if searchfields:
search_columns = ", " + ", ".join(searchfields)
search_cond = " or " + " or ".join([field + " like %(txt)s" for field in searchfields])
data = (
frappe.qb.from_((query) + (bundle_query))
.select("batch_no", "qty", "manufacturing_date", "expiry_date")
.offset(start)
.limit(page_len)
)
if args.get("warehouse"):
searchfields = ["batch." + field for field in searchfields]
if searchfields:
search_columns = ", " + ", ".join(searchfields)
search_cond = " or " + " or ".join([field + " like %(txt)s" for field in searchfields])
for field in searchfields:
data = data.select(field)
batch_nos = frappe.db.sql(
"""select sle.batch_no, round(sum(sle.actual_qty),2), sle.stock_uom,
concat('MFG-',batch.manufacturing_date), concat('EXP-',batch.expiry_date)
{search_columns}
from `tabStock Ledger Entry` sle
INNER JOIN `tabBatch` batch on sle.batch_no = batch.name
where
batch.disabled = 0
and sle.is_cancelled = 0
and sle.item_code = %(item_code)s
and sle.warehouse = %(warehouse)s
and (sle.batch_no like %(txt)s
or batch.expiry_date like %(txt)s
or batch.manufacturing_date like %(txt)s
{search_cond})
and batch.docstatus < 2
{cond}
{match_conditions}
group by batch_no {having_clause}
order by batch.expiry_date, sle.batch_no desc
limit %(page_len)s offset %(start)s""".format(
search_columns=search_columns,
cond=cond,
match_conditions=get_match_cond(doctype),
having_clause=having_clause,
search_cond=search_cond,
),
args,
data = data.run()
data = get_filterd_batches(data)
return data
def get_filterd_batches(data):
batches = OrderedDict()
for batch_data in data:
if batch_data[0] not in batches:
batches[batch_data[0]] = list(batch_data)
else:
batches[batch_data[0]][1] += batch_data[1]
filterd_batch = []
for batch, batch_data in batches.items():
if batch_data[1] > 0:
filterd_batch.append(tuple(batch_data))
return filterd_batch
def get_batches_from_stock_ledger_entries(searchfields, txt, filters):
stock_ledger_entry = frappe.qb.DocType("Stock Ledger Entry")
batch_table = frappe.qb.DocType("Batch")
expiry_date = filters.get("posting_date") or today()
query = (
frappe.qb.from_(stock_ledger_entry)
.inner_join(batch_table)
.on(batch_table.name == stock_ledger_entry.batch_no)
.select(
stock_ledger_entry.batch_no,
Sum(stock_ledger_entry.actual_qty).as_("qty"),
)
return batch_nos
else:
return frappe.db.sql(
"""select name, concat('MFG-', manufacturing_date), concat('EXP-',expiry_date)
{search_columns}
from `tabBatch` batch
where batch.disabled = 0
and item = %(item_code)s
and (name like %(txt)s
or expiry_date like %(txt)s
or manufacturing_date like %(txt)s
{search_cond})
and docstatus < 2
{0}
{match_conditions}
order by expiry_date, name desc
limit %(page_len)s offset %(start)s""".format(
cond,
search_columns=search_columns,
search_cond=search_cond,
match_conditions=get_match_cond(doctype),
),
args,
.where(((batch_table.expiry_date >= expiry_date) | (batch_table.expiry_date.isnull())))
.where(stock_ledger_entry.is_cancelled == 0)
.where(
(stock_ledger_entry.item_code == filters.get("item_code"))
& (batch_table.disabled == 0)
& (stock_ledger_entry.batch_no.isnotnull())
)
.groupby(stock_ledger_entry.batch_no, stock_ledger_entry.warehouse)
)
query = query.select(
Concat("MFG-", batch_table.manufacturing_date).as_("manufacturing_date"),
Concat("EXP-", batch_table.expiry_date).as_("expiry_date"),
)
if filters.get("warehouse"):
query = query.where(stock_ledger_entry.warehouse == filters.get("warehouse"))
for field in searchfields:
query = query.select(batch_table[field])
if txt:
txt_condition = batch_table.name.like(txt)
for field in searchfields + ["name"]:
txt_condition |= batch_table[field].like(txt)
query = query.where(txt_condition)
return query
def get_batches_from_serial_and_batch_bundle(searchfields, txt, filters):
bundle = frappe.qb.DocType("Serial and Batch Entry")
stock_ledger_entry = frappe.qb.DocType("Stock Ledger Entry")
batch_table = frappe.qb.DocType("Batch")
expiry_date = filters.get("posting_date") or today()
bundle_query = (
frappe.qb.from_(bundle)
.inner_join(stock_ledger_entry)
.on(bundle.parent == stock_ledger_entry.serial_and_batch_bundle)
.inner_join(batch_table)
.on(batch_table.name == bundle.batch_no)
.select(
bundle.batch_no,
Sum(bundle.qty).as_("qty"),
)
.where(((batch_table.expiry_date >= expiry_date) | (batch_table.expiry_date.isnull())))
.where(stock_ledger_entry.is_cancelled == 0)
.where(
(stock_ledger_entry.item_code == filters.get("item_code"))
& (batch_table.disabled == 0)
& (stock_ledger_entry.serial_and_batch_bundle.isnotnull())
)
.groupby(bundle.batch_no, bundle.warehouse)
)
bundle_query = bundle_query.select(
Concat("MFG-", batch_table.manufacturing_date),
Concat("EXP-", batch_table.expiry_date),
)
if filters.get("warehouse"):
bundle_query = bundle_query.where(stock_ledger_entry.warehouse == filters.get("warehouse"))
for field in searchfields:
bundle_query = bundle_query.select(batch_table[field])
if txt:
txt_condition = batch_table.name.like(txt)
for field in searchfields + ["name"]:
txt_condition |= batch_table[field].like(txt)
bundle_query = bundle_query.where(txt_condition)
return bundle_query
@frappe.whitelist()