feat: Show Ledger view for Purchase & Sales Register (#38801)

* feat: fetch payments and journals

* refactor: use single qb query for PE and PI

* feat: fetch PE along with SI

* refactor: move repeating code to common controller

* fix: fetch cost center for PE

* feat: fetch JV with PE

* fix: validate party filter for fetching payments

* fix: linting issues

* refactor: use qb to fetch PE JV and Inv

* refactor: move fn to fetch advance taxes to utils & use qb

* fix: filtering through accounting dimensions

* chore: remove debugging print statements

* fix: modify rows and columns for ledger view

* refactor: filter accounting dimensions using qb

* fix: show additional table cols from india compliance api call

* chore: fix typo

* test: purchase register and ledger view

* fix: filter by party in opening row calculation

* fix: exclude cancelled gl entries for opening balance

* chore: change column format for report

* fix: check gl entry status using is_cancelled

* fix: running balance after sorting

* fix: gst itemised registers for india compliance api call

* fix: additional query cols for gst itemised registers

* fix: additional query cols for sales register

* fix: PE in sales register

* fix: tax accounts in sales register

* fix: Sales/Purchase register showing duplicate records

* fix: avoid fetching advance account for party

* refactor: remove unnecessary condition

* fix: remove checking for advance accounts in payments

---------

Co-authored-by: Deepesh Garg <deepeshgarg6@gmail.com>
This commit is contained in:
Gursheen Kaur Anand
2023-12-29 13:02:17 +05:30
committed by GitHub
parent a2cba1bf23
commit 04fb215ff5
8 changed files with 982 additions and 387 deletions

View File

@@ -309,7 +309,8 @@ def get_conditions(filters):
def get_items(filters, additional_query_columns): def get_items(filters, additional_query_columns):
conditions = get_conditions(filters) conditions = get_conditions(filters)
if additional_query_columns:
additional_query_columns = "," + ",".join(additional_query_columns)
return frappe.db.sql( return frappe.db.sql(
""" """
select select

View File

@@ -381,7 +381,8 @@ def get_group_by_conditions(filters, doctype):
def get_items(filters, additional_query_columns, additional_conditions=None): def get_items(filters, additional_query_columns, additional_conditions=None):
conditions = get_conditions(filters, additional_conditions) conditions = get_conditions(filters, additional_conditions)
if additional_query_columns:
additional_query_columns = "," + ",".join(additional_query_columns)
return frappe.db.sql( return frappe.db.sql(
""" """
select select

View File

@@ -52,6 +52,12 @@ frappe.query_reports["Purchase Register"] = {
"label": __("Item Group"), "label": __("Item Group"),
"fieldtype": "Link", "fieldtype": "Link",
"options": "Item Group" "options": "Item Group"
},
{
"fieldname": "include_payments",
"label": __("Show Ledger View"),
"fieldtype": "Check",
"default": 0
} }
] ]
} }

View File

@@ -4,13 +4,22 @@
import frappe import frappe
from frappe import _, msgprint from frappe import _, msgprint
from frappe.utils import flt from frappe.query_builder.custom import ConstantColumn
from frappe.utils import flt, getdate
from pypika import Order
from erpnext.accounts.doctype.accounting_dimension.accounting_dimension import ( from erpnext.accounts.party import get_party_account
get_accounting_dimensions, from erpnext.accounts.report.utils import (
get_dimension_with_children, apply_common_conditions,
get_advance_taxes_and_charges,
get_journal_entries,
get_opening_row,
get_party_details,
get_payment_entries,
get_query_columns,
get_taxes_query,
get_values_for_columns,
) )
from erpnext.accounts.report.utils import get_query_columns, get_values_for_columns
def execute(filters=None): def execute(filters=None):
@@ -21,9 +30,15 @@ def _execute(filters=None, additional_table_columns=None):
if not filters: if not filters:
filters = {} filters = {}
include_payments = filters.get("include_payments")
if filters.get("include_payments") and not filters.get("supplier"):
frappe.throw(_("Please select a supplier for fetching payments."))
invoice_list = get_invoices(filters, get_query_columns(additional_table_columns)) invoice_list = get_invoices(filters, get_query_columns(additional_table_columns))
if filters.get("include_payments"):
invoice_list += get_payments(filters)
columns, expense_accounts, tax_accounts, unrealized_profit_loss_accounts = get_columns( columns, expense_accounts, tax_accounts, unrealized_profit_loss_accounts = get_columns(
invoice_list, additional_table_columns invoice_list, additional_table_columns, include_payments
) )
if not invoice_list: if not invoice_list:
@@ -33,14 +48,28 @@ def _execute(filters=None, additional_table_columns=None):
invoice_expense_map = get_invoice_expense_map(invoice_list) invoice_expense_map = get_invoice_expense_map(invoice_list)
internal_invoice_map = get_internal_invoice_map(invoice_list) internal_invoice_map = get_internal_invoice_map(invoice_list)
invoice_expense_map, invoice_tax_map = get_invoice_tax_map( invoice_expense_map, invoice_tax_map = get_invoice_tax_map(
invoice_list, invoice_expense_map, expense_accounts invoice_list, invoice_expense_map, expense_accounts, include_payments
) )
invoice_po_pr_map = get_invoice_po_pr_map(invoice_list) invoice_po_pr_map = get_invoice_po_pr_map(invoice_list)
suppliers = list(set(d.supplier for d in invoice_list)) suppliers = list(set(d.supplier for d in invoice_list))
supplier_details = get_supplier_details(suppliers) supplier_details = get_party_details("Supplier", suppliers)
company_currency = frappe.get_cached_value("Company", filters.company, "default_currency") company_currency = frappe.get_cached_value("Company", filters.company, "default_currency")
res = []
if include_payments:
opening_row = get_opening_row(
"Supplier", filters.supplier, getdate(filters.from_date), filters.company
)[0]
res.append(
{
"payable_account": opening_row.account,
"debit": flt(opening_row.debit),
"credit": flt(opening_row.credit),
"balance": flt(opening_row.balance),
}
)
data = [] data = []
for inv in invoice_list: for inv in invoice_list:
# invoice details # invoice details
@@ -48,24 +77,23 @@ def _execute(filters=None, additional_table_columns=None):
purchase_receipt = list(set(invoice_po_pr_map.get(inv.name, {}).get("purchase_receipt", []))) purchase_receipt = list(set(invoice_po_pr_map.get(inv.name, {}).get("purchase_receipt", [])))
project = list(set(invoice_po_pr_map.get(inv.name, {}).get("project", []))) project = list(set(invoice_po_pr_map.get(inv.name, {}).get("project", [])))
row = [ row = {
inv.name, "voucher_type": inv.doctype,
inv.posting_date, "voucher_no": inv.name,
inv.supplier, "posting_date": inv.posting_date,
inv.supplier_name, "supplier_id": inv.supplier,
*get_values_for_columns(additional_table_columns, inv).values(), "supplier_name": inv.supplier_name,
supplier_details.get(inv.supplier), # supplier_group **get_values_for_columns(additional_table_columns, inv),
inv.tax_id, "supplier_group": supplier_details.get(inv.supplier).get("supplier_group"), # supplier_group
inv.credit_to, "tax_id": supplier_details.get(inv.supplier).get("tax_id"),
inv.mode_of_payment, "payable_account": inv.credit_to,
", ".join(project), "mode_of_payment": inv.mode_of_payment,
inv.bill_no, "project": ", ".join(project) if inv.doctype == "Purchase Invoice" else inv.project,
inv.bill_date, "remarks": inv.remarks,
inv.remarks, "purchase_order": ", ".join(purchase_order),
", ".join(purchase_order), "purchase_receipt": ", ".join(purchase_receipt),
", ".join(purchase_receipt), "currency": company_currency,
company_currency, }
]
# map expense values # map expense values
base_net_total = 0 base_net_total = 0
@@ -75,14 +103,16 @@ def _execute(filters=None, additional_table_columns=None):
else: else:
expense_amount = flt(invoice_expense_map.get(inv.name, {}).get(expense_acc)) expense_amount = flt(invoice_expense_map.get(inv.name, {}).get(expense_acc))
base_net_total += expense_amount base_net_total += expense_amount
row.append(expense_amount) row.update({frappe.scrub(expense_acc): expense_amount})
# Add amount in unrealized account # Add amount in unrealized account
for account in unrealized_profit_loss_accounts: for account in unrealized_profit_loss_accounts:
row.append(flt(internal_invoice_map.get((inv.name, account)))) row.update(
{frappe.scrub(account + "_unrealized"): flt(internal_invoice_map.get((inv.name, account)))}
)
# net total # net total
row.append(base_net_total or inv.base_net_total) row.update({"net_total": base_net_total or inv.base_net_total})
# tax account # tax account
total_tax = 0 total_tax = 0
@@ -90,45 +120,190 @@ def _execute(filters=None, additional_table_columns=None):
if tax_acc not in expense_accounts: if tax_acc not in expense_accounts:
tax_amount = flt(invoice_tax_map.get(inv.name, {}).get(tax_acc)) tax_amount = flt(invoice_tax_map.get(inv.name, {}).get(tax_acc))
total_tax += tax_amount total_tax += tax_amount
row.append(tax_amount) row.update({frappe.scrub(tax_acc): tax_amount})
# total tax, grand total, rounded total & outstanding amount # total tax, grand total, rounded total & outstanding amount
row += [total_tax, inv.base_grand_total, flt(inv.base_grand_total, 0), inv.outstanding_amount] row.update(
{
"total_tax": total_tax,
"grand_total": inv.base_grand_total,
"rounded_total": inv.base_rounded_total,
"outstanding_amount": inv.outstanding_amount,
}
)
if inv.doctype == "Purchase Invoice":
row.update({"debit": inv.base_grand_total, "credit": 0.0})
else:
row.update({"debit": 0.0, "credit": inv.base_grand_total})
data.append(row) data.append(row)
return columns, data res += sorted(data, key=lambda x: x["posting_date"])
if include_payments:
running_balance = flt(opening_row.balance)
for row in range(1, len(res)):
running_balance += res[row]["debit"] - res[row]["credit"]
res[row].update({"balance": running_balance})
return columns, res, None, None, None, include_payments
def get_columns(invoice_list, additional_table_columns): def get_columns(invoice_list, additional_table_columns, include_payments=False):
"""return columns based on filters""" """return columns based on filters"""
columns = [ columns = [
_("Invoice") + ":Link/Purchase Invoice:120", {
_("Posting Date") + ":Date:80", "label": _("Voucher Type"),
_("Supplier Id") + "::120", "fieldname": "voucher_type",
_("Supplier Name") + "::120", "width": 120,
},
{
"label": _("Voucher"),
"fieldname": "voucher_no",
"fieldtype": "Dynamic Link",
"options": "voucher_type",
"width": 120,
},
{"label": _("Posting Date"), "fieldname": "posting_date", "fieldtype": "Date", "width": 80},
{
"label": _("Supplier"),
"fieldname": "supplier_id",
"fieldtype": "Link",
"options": "Supplier",
"width": 120,
},
{"label": _("Supplier Name"), "fieldname": "supplier_name", "fieldtype": "Data", "width": 120},
] ]
if additional_table_columns: if additional_table_columns and not include_payments:
columns += additional_table_columns columns += additional_table_columns
columns += [ if not include_payments:
_("Supplier Group") + ":Link/Supplier Group:120", columns += [
_("Tax Id") + "::80", {
_("Payable Account") + ":Link/Account:120", "label": _("Supplier Group"),
_("Mode of Payment") + ":Link/Mode of Payment:80", "fieldname": "supplier_group",
_("Project") + ":Link/Project:80", "fieldtype": "Link",
_("Bill No") + "::120", "options": "Supplier Group",
_("Bill Date") + ":Date:80", "width": 120,
_("Remarks") + "::150", },
_("Purchase Order") + ":Link/Purchase Order:100", {"label": _("Tax Id"), "fieldname": "tax_id", "fieldtype": "Data", "width": 80},
_("Purchase Receipt") + ":Link/Purchase Receipt:100", {
{"fieldname": "currency", "label": _("Currency"), "fieldtype": "Data", "width": 80}, "label": _("Payable Account"),
] "fieldname": "payable_account",
"fieldtype": "Link",
"options": "Account",
"width": 100,
},
{
"label": _("Mode Of Payment"),
"fieldname": "mode_of_payment",
"fieldtype": "Data",
"width": 120,
},
{
"label": _("Project"),
"fieldname": "project",
"fieldtype": "Link",
"options": "Project",
"width": 80,
},
{"label": _("Bill No"), "fieldname": "bill_no", "fieldtype": "Data", "width": 120},
{"label": _("Bill Date"), "fieldname": "bill_date", "fieldtype": "Date", "width": 80},
{
"label": _("Purchase Order"),
"fieldname": "purchase_order",
"fieldtype": "Link",
"options": "Purchase Order",
"width": 100,
},
{
"label": _("Purchase Receipt"),
"fieldname": "purchase_receipt",
"fieldtype": "Link",
"options": "Purchase Receipt",
"width": 100,
},
{"fieldname": "currency", "label": _("Currency"), "fieldtype": "Data", "width": 80},
]
else:
columns += [
{
"fieldname": "payable_account",
"label": _("Payable Account"),
"fieldtype": "Link",
"options": "Account",
"width": 120,
},
{"fieldname": "debit", "label": _("Debit"), "fieldtype": "Currency", "width": 120},
{"fieldname": "credit", "label": _("Credit"), "fieldtype": "Currency", "width": 120},
{"fieldname": "balance", "label": _("Balance"), "fieldtype": "Currency", "width": 120},
]
account_columns, accounts = get_account_columns(invoice_list, include_payments)
columns = (
columns
+ account_columns[0]
+ account_columns[1]
+ [
{
"label": _("Net Total"),
"fieldname": "net_total",
"fieldtype": "Currency",
"options": "currency",
"width": 120,
}
]
+ account_columns[2]
+ [
{
"label": _("Total Tax"),
"fieldname": "total_tax",
"fieldtype": "Currency",
"options": "currency",
"width": 120,
}
]
)
if not include_payments:
columns += [
{
"label": _("Grand Total"),
"fieldname": "grand_total",
"fieldtype": "Currency",
"options": "currency",
"width": 120,
},
{
"label": _("Rounded Total"),
"fieldname": "rounded_total",
"fieldtype": "Currency",
"options": "currency",
"width": 120,
},
{
"label": _("Outstanding Amount"),
"fieldname": "outstanding_amount",
"fieldtype": "Currency",
"options": "currency",
"width": 120,
},
]
columns += [{"label": _("Remarks"), "fieldname": "remarks", "fieldtype": "Data", "width": 120}]
return columns, accounts[0], accounts[2], accounts[1]
def get_account_columns(invoice_list, include_payments):
expense_accounts = [] expense_accounts = []
tax_accounts = [] tax_accounts = []
unrealized_profit_loss_accounts = [] unrealized_profit_loss_accounts = []
expense_columns = []
tax_columns = []
unrealized_profit_loss_account_columns = []
if invoice_list: if invoice_list:
expense_accounts = frappe.db.sql_list( expense_accounts = frappe.db.sql_list(
"""select distinct expense_account """select distinct expense_account
@@ -139,15 +314,18 @@ def get_columns(invoice_list, additional_table_columns):
tuple([inv.name for inv in invoice_list]), tuple([inv.name for inv in invoice_list]),
) )
tax_accounts = frappe.db.sql_list( purchase_taxes_query = get_taxes_query(
"""select distinct account_head invoice_list, "Purchase Taxes and Charges", "Purchase Invoice"
from `tabPurchase Taxes and Charges` where parenttype = 'Purchase Invoice'
and docstatus = 1 and (account_head is not null and account_head != '')
and category in ('Total', 'Valuation and Total')
and parent in (%s) order by account_head"""
% ", ".join(["%s"] * len(invoice_list)),
tuple(inv.name for inv in invoice_list),
) )
purchase_tax_accounts = purchase_taxes_query.run(as_dict=True, pluck="account_head")
tax_accounts = purchase_tax_accounts
if include_payments:
advance_taxes_query = get_taxes_query(
invoice_list, "Advance Taxes and Charges", "Payment Entry"
)
advance_tax_accounts = advance_taxes_query.run(as_dict=True, pluck="account_head")
tax_accounts = set(tax_accounts + advance_tax_accounts)
unrealized_profit_loss_accounts = frappe.db.sql_list( unrealized_profit_loss_accounts = frappe.db.sql_list(
"""SELECT distinct unrealized_profit_loss_account """SELECT distinct unrealized_profit_loss_account
@@ -158,108 +336,109 @@ def get_columns(invoice_list, additional_table_columns):
tuple(inv.name for inv in invoice_list), tuple(inv.name for inv in invoice_list),
) )
expense_columns = [(account + ":Currency/currency:120") for account in expense_accounts] for account in expense_accounts:
unrealized_profit_loss_account_columns = [ expense_columns.append(
(account + ":Currency/currency:120") for account in unrealized_profit_loss_accounts {
] "label": account,
tax_columns = [ "fieldname": frappe.scrub(account),
(account + ":Currency/currency:120") "fieldtype": "Currency",
for account in tax_accounts "options": "currency",
if account not in expense_accounts "width": 120,
] }
)
columns = ( for account in tax_accounts:
columns if account not in expense_accounts:
+ expense_columns tax_columns.append(
+ unrealized_profit_loss_account_columns {
+ [_("Net Total") + ":Currency/currency:120"] "label": account,
+ tax_columns "fieldname": frappe.scrub(account),
+ [ "fieldtype": "Currency",
_("Total Tax") + ":Currency/currency:120", "options": "currency",
_("Grand Total") + ":Currency/currency:120", "width": 120,
_("Rounded Total") + ":Currency/currency:120", }
_("Outstanding Amount") + ":Currency/currency:120", )
]
)
return columns, expense_accounts, tax_accounts, unrealized_profit_loss_accounts for account in unrealized_profit_loss_accounts:
unrealized_profit_loss_account_columns.append(
{
"label": account,
"fieldname": frappe.scrub(account),
"fieldtype": "Currency",
"options": "currency",
"width": 120,
}
)
columns = [expense_columns, unrealized_profit_loss_account_columns, tax_columns]
accounts = [expense_accounts, unrealized_profit_loss_accounts, tax_accounts]
def get_conditions(filters): return columns, accounts
conditions = ""
if filters.get("company"):
conditions += " and company=%(company)s"
if filters.get("supplier"):
conditions += " and supplier = %(supplier)s"
if filters.get("from_date"):
conditions += " and posting_date>=%(from_date)s"
if filters.get("to_date"):
conditions += " and posting_date<=%(to_date)s"
if filters.get("mode_of_payment"):
conditions += " and ifnull(mode_of_payment, '') = %(mode_of_payment)s"
if filters.get("cost_center"):
conditions += """ and exists(select name from `tabPurchase Invoice Item`
where parent=`tabPurchase Invoice`.name
and ifnull(`tabPurchase Invoice Item`.cost_center, '') = %(cost_center)s)"""
if filters.get("warehouse"):
conditions += """ and exists(select name from `tabPurchase Invoice Item`
where parent=`tabPurchase Invoice`.name
and ifnull(`tabPurchase Invoice Item`.warehouse, '') = %(warehouse)s)"""
if filters.get("item_group"):
conditions += """ and exists(select name from `tabPurchase Invoice Item`
where parent=`tabPurchase Invoice`.name
and ifnull(`tabPurchase Invoice Item`.item_group, '') = %(item_group)s)"""
accounting_dimensions = get_accounting_dimensions(as_list=False)
if accounting_dimensions:
common_condition = """
and exists(select name from `tabPurchase Invoice Item`
where parent=`tabPurchase Invoice`.name
"""
for dimension in accounting_dimensions:
if filters.get(dimension.fieldname):
if frappe.get_cached_value("DocType", dimension.document_type, "is_tree"):
filters[dimension.fieldname] = get_dimension_with_children(
dimension.document_type, filters.get(dimension.fieldname)
)
conditions += (
common_condition
+ "and ifnull(`tabPurchase Invoice`.{0}, '') in %({0})s)".format(dimension.fieldname)
)
else:
conditions += (
common_condition
+ "and ifnull(`tabPurchase Invoice`.{0}, '') in %({0})s)".format(dimension.fieldname)
)
return conditions
def get_invoices(filters, additional_query_columns): def get_invoices(filters, additional_query_columns):
conditions = get_conditions(filters) pi = frappe.qb.DocType("Purchase Invoice")
return frappe.db.sql( query = (
""" frappe.qb.from_(pi)
select .select(
name, posting_date, credit_to, supplier, supplier_name, tax_id, bill_no, bill_date, ConstantColumn("Purchase Invoice").as_("doctype"),
remarks, base_net_total, base_grand_total, outstanding_amount, pi.name,
mode_of_payment {0} pi.posting_date,
from `tabPurchase Invoice` pi.credit_to,
where docstatus = 1 {1} pi.supplier,
order by posting_date desc, name desc""".format( pi.supplier_name,
additional_query_columns, conditions pi.tax_id,
), pi.bill_no,
filters, pi.bill_date,
as_dict=1, pi.remarks,
pi.base_net_total,
pi.base_grand_total,
pi.base_rounded_total,
pi.outstanding_amount,
pi.mode_of_payment,
)
.where((pi.docstatus == 1))
.orderby(pi.posting_date, pi.name, order=Order.desc)
) )
if additional_query_columns:
for col in additional_query_columns:
query = query.select(col)
if filters.get("supplier"):
query = query.where(pi.supplier == filters.supplier)
query = get_conditions(filters, query, "Purchase Invoice")
query = apply_common_conditions(
filters, query, doctype="Purchase Invoice", child_doctype="Purchase Invoice Item"
)
invoices = query.run(as_dict=True)
return invoices
def get_conditions(filters, query, doctype):
parent_doc = frappe.qb.DocType(doctype)
if filters.get("mode_of_payment"):
query = query.where(parent_doc.mode_of_payment == filters.mode_of_payment)
return query
def get_payments(filters):
args = frappe._dict(
account="credit_to",
account_fieldname="paid_to",
party="supplier",
party_name="supplier_name",
party_account=get_party_account("Supplier", filters.supplier, filters.company),
)
payment_entries = get_payment_entries(filters, args)
journal_entries = get_journal_entries(filters, args)
return payment_entries + journal_entries
def get_invoice_expense_map(invoice_list): def get_invoice_expense_map(invoice_list):
expense_details = frappe.db.sql( expense_details = frappe.db.sql(
@@ -300,7 +479,9 @@ def get_internal_invoice_map(invoice_list):
return internal_invoice_map return internal_invoice_map
def get_invoice_tax_map(invoice_list, invoice_expense_map, expense_accounts): def get_invoice_tax_map(
invoice_list, invoice_expense_map, expense_accounts, include_payments=False
):
tax_details = frappe.db.sql( tax_details = frappe.db.sql(
""" """
select parent, account_head, case add_deduct_tax when "Add" then sum(base_tax_amount_after_discount_amount) select parent, account_head, case add_deduct_tax when "Add" then sum(base_tax_amount_after_discount_amount)
@@ -315,6 +496,9 @@ def get_invoice_tax_map(invoice_list, invoice_expense_map, expense_accounts):
as_dict=1, as_dict=1,
) )
if include_payments:
tax_details += get_advance_taxes_and_charges(invoice_list)
invoice_tax_map = {} invoice_tax_map = {}
for d in tax_details: for d in tax_details:
if d.account_head in expense_accounts: if d.account_head in expense_accounts:
@@ -382,17 +566,3 @@ def get_account_details(invoice_list):
account_map[acc.name] = acc.parent_account account_map[acc.name] = acc.parent_account
return account_map return account_map
def get_supplier_details(suppliers):
supplier_details = {}
for supp in frappe.db.sql(
"""select name, supplier_group from `tabSupplier`
where name in (%s)"""
% ", ".join(["%s"] * len(suppliers)),
tuple(suppliers),
as_dict=1,
):
supplier_details.setdefault(supp.name, supp.supplier_group)
return supplier_details

View File

@@ -0,0 +1,128 @@
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors
# MIT License. See license.txt
import frappe
from frappe.tests.utils import FrappeTestCase
from frappe.utils import add_months, getdate, today
from erpnext.accounts.report.purchase_register.purchase_register import execute
class TestPurchaseRegister(FrappeTestCase):
def test_purchase_register(self):
frappe.db.sql("delete from `tabPurchase Invoice` where company='_Test Company 6'")
frappe.db.sql("delete from `tabGL Entry` where company='_Test Company 6'")
filters = frappe._dict(
company="_Test Company 6", from_date=add_months(today(), -1), to_date=today()
)
pi = make_purchase_invoice()
report_results = execute(filters)
first_row = frappe._dict(report_results[1][0])
self.assertEqual(first_row.voucher_type, "Purchase Invoice")
self.assertEqual(first_row.voucher_no, pi.name)
self.assertEqual(first_row.payable_account, "Creditors - _TC6")
self.assertEqual(first_row.net_total, 1000)
self.assertEqual(first_row.total_tax, 100)
self.assertEqual(first_row.grand_total, 1100)
def test_purchase_register_ledger_view(self):
frappe.db.sql("delete from `tabPurchase Invoice` where company='_Test Company 6'")
frappe.db.sql("delete from `tabGL Entry` where company='_Test Company 6'")
filters = frappe._dict(
company="_Test Company 6",
from_date=add_months(today(), -1),
to_date=today(),
include_payments=True,
supplier="_Test Supplier",
)
pi = make_purchase_invoice()
pe = make_payment_entry()
report_results = execute(filters)
first_row = frappe._dict(report_results[1][2])
self.assertEqual(first_row.voucher_type, "Payment Entry")
self.assertEqual(first_row.voucher_no, pe.name)
self.assertEqual(first_row.payable_account, "Creditors - _TC6")
self.assertEqual(first_row.debit, 0)
self.assertEqual(first_row.credit, 600)
self.assertEqual(first_row.balance, 500)
def make_purchase_invoice():
from erpnext.accounts.doctype.account.test_account import create_account
from erpnext.accounts.doctype.cost_center.test_cost_center import create_cost_center
from erpnext.stock.doctype.warehouse.test_warehouse import create_warehouse
gst_acc = create_account(
account_name="GST",
account_type="Tax",
parent_account="Duties and Taxes - _TC6",
company="_Test Company 6",
account_currency="INR",
)
create_warehouse(warehouse_name="_Test Warehouse - _TC6", company="_Test Company 6")
create_cost_center(cost_center_name="_Test Cost Center", company="_Test Company 6")
pi = create_purchase_invoice_with_taxes()
pi.submit()
return pi
def create_purchase_invoice_with_taxes():
return frappe.get_doc(
{
"doctype": "Purchase Invoice",
"posting_date": today(),
"supplier": "_Test Supplier",
"company": "_Test Company 6",
"cost_center": "_Test Cost Center - _TC6",
"taxes_and_charges": "",
"currency": "INR",
"credit_to": "Creditors - _TC6",
"items": [
{
"doctype": "Purchase Invoice Item",
"cost_center": "_Test Cost Center - _TC6",
"item_code": "_Test Item",
"qty": 1,
"rate": 1000,
"expense_account": "Stock Received But Not Billed - _TC6",
}
],
"taxes": [
{
"account_head": "GST - _TC6",
"cost_center": "_Test Cost Center - _TC6",
"add_deduct_tax": "Add",
"category": "Valuation and Total",
"charge_type": "Actual",
"description": "Shipping Charges",
"doctype": "Purchase Taxes and Charges",
"parentfield": "taxes",
"rate": 100,
"tax_amount": 100.0,
}
],
}
)
def make_payment_entry():
frappe.set_user("Administrator")
from erpnext.accounts.doctype.payment_entry.test_payment_entry import create_payment_entry
return create_payment_entry(
company="_Test Company 6",
party_type="Supplier",
party="_Test Supplier",
payment_type="Pay",
paid_from="Cash - _TC6",
paid_to="Creditors - _TC6",
paid_amount=600,
save=1,
submit=1,
)

View File

@@ -64,6 +64,12 @@ frappe.query_reports["Sales Register"] = {
"label": __("Item Group"), "label": __("Item Group"),
"fieldtype": "Link", "fieldtype": "Link",
"options": "Item Group" "options": "Item Group"
},
{
"fieldname": "include_payments",
"label": __("Show Ledger View"),
"fieldtype": "Check",
"default": 0
} }
] ]
} }

View File

@@ -5,13 +5,22 @@
import frappe import frappe
from frappe import _, msgprint from frappe import _, msgprint
from frappe.model.meta import get_field_precision from frappe.model.meta import get_field_precision
from frappe.utils import flt from frappe.query_builder.custom import ConstantColumn
from frappe.utils import flt, getdate
from pypika import Order
from erpnext.accounts.doctype.accounting_dimension.accounting_dimension import ( from erpnext.accounts.party import get_party_account
get_accounting_dimensions, from erpnext.accounts.report.utils import (
get_dimension_with_children, apply_common_conditions,
get_advance_taxes_and_charges,
get_journal_entries,
get_opening_row,
get_party_details,
get_payment_entries,
get_query_columns,
get_taxes_query,
get_values_for_columns,
) )
from erpnext.accounts.report.utils import get_query_columns, get_values_for_columns
def execute(filters=None): def execute(filters=None):
@@ -22,9 +31,15 @@ def _execute(filters, additional_table_columns=None):
if not filters: if not filters:
filters = frappe._dict({}) filters = frappe._dict({})
include_payments = filters.get("include_payments")
if filters.get("include_payments") and not filters.get("customer"):
frappe.throw(_("Please select a customer for fetching payments."))
invoice_list = get_invoices(filters, get_query_columns(additional_table_columns)) invoice_list = get_invoices(filters, get_query_columns(additional_table_columns))
columns, income_accounts, tax_accounts, unrealized_profit_loss_accounts = get_columns( if filters.get("include_payments"):
invoice_list, additional_table_columns invoice_list += get_payments(filters)
columns, income_accounts, unrealized_profit_loss_accounts, tax_accounts = get_columns(
invoice_list, additional_table_columns, include_payments
) )
if not invoice_list: if not invoice_list:
@@ -34,13 +49,29 @@ def _execute(filters, additional_table_columns=None):
invoice_income_map = get_invoice_income_map(invoice_list) invoice_income_map = get_invoice_income_map(invoice_list)
internal_invoice_map = get_internal_invoice_map(invoice_list) internal_invoice_map = get_internal_invoice_map(invoice_list)
invoice_income_map, invoice_tax_map = get_invoice_tax_map( invoice_income_map, invoice_tax_map = get_invoice_tax_map(
invoice_list, invoice_income_map, income_accounts invoice_list, invoice_income_map, income_accounts, include_payments
) )
# Cost Center & Warehouse Map # Cost Center & Warehouse Map
invoice_cc_wh_map = get_invoice_cc_wh_map(invoice_list) invoice_cc_wh_map = get_invoice_cc_wh_map(invoice_list)
invoice_so_dn_map = get_invoice_so_dn_map(invoice_list) invoice_so_dn_map = get_invoice_so_dn_map(invoice_list)
company_currency = frappe.get_cached_value("Company", filters.get("company"), "default_currency") company_currency = frappe.get_cached_value("Company", filters.get("company"), "default_currency")
mode_of_payments = get_mode_of_payments([inv.name for inv in invoice_list]) mode_of_payments = get_mode_of_payments([inv.name for inv in invoice_list])
customers = list(set(d.customer for d in invoice_list))
customer_details = get_party_details("Customer", customers)
res = []
if include_payments:
opening_row = get_opening_row(
"Customer", filters.customer, getdate(filters.from_date), filters.company
)[0]
res.append(
{
"receivable_account": opening_row.account,
"debit": flt(opening_row.debit),
"credit": flt(opening_row.credit),
"balance": flt(opening_row.balance),
}
)
data = [] data = []
for inv in invoice_list: for inv in invoice_list:
@@ -51,14 +82,15 @@ def _execute(filters, additional_table_columns=None):
warehouse = list(set(invoice_cc_wh_map.get(inv.name, {}).get("warehouse", []))) warehouse = list(set(invoice_cc_wh_map.get(inv.name, {}).get("warehouse", [])))
row = { row = {
"invoice": inv.name, "voucher_type": inv.doctype,
"voucher_no": inv.name,
"posting_date": inv.posting_date, "posting_date": inv.posting_date,
"customer": inv.customer, "customer": inv.customer,
"customer_name": inv.customer_name, "customer_name": inv.customer_name,
**get_values_for_columns(additional_table_columns, inv), **get_values_for_columns(additional_table_columns, inv),
"customer_group": inv.get("customer_group"), "customer_group": customer_details.get(inv.customer).get("customer_group"),
"territory": inv.get("territory"), "territory": customer_details.get(inv.customer).get("territory"),
"tax_id": inv.get("tax_id"), "tax_id": customer_details.get(inv.customer).get("tax_id"),
"receivable_account": inv.debit_to, "receivable_account": inv.debit_to,
"mode_of_payment": ", ".join(mode_of_payments.get(inv.name, [])), "mode_of_payment": ", ".join(mode_of_payments.get(inv.name, [])),
"project": inv.project, "project": inv.project,
@@ -66,7 +98,7 @@ def _execute(filters, additional_table_columns=None):
"remarks": inv.remarks, "remarks": inv.remarks,
"sales_order": ", ".join(sales_order), "sales_order": ", ".join(sales_order),
"delivery_note": ", ".join(delivery_note), "delivery_note": ", ".join(delivery_note),
"cost_center": ", ".join(cost_center), "cost_center": ", ".join(cost_center) if inv.doctype == "Sales Invoice" else inv.cost_center,
"warehouse": ", ".join(warehouse), "warehouse": ", ".join(warehouse),
"currency": company_currency, "currency": company_currency,
} }
@@ -116,19 +148,36 @@ def _execute(filters, additional_table_columns=None):
} }
) )
if inv.doctype == "Sales Invoice":
row.update({"debit": inv.base_grand_total, "credit": 0.0})
else:
row.update({"debit": 0.0, "credit": inv.base_grand_total})
data.append(row) data.append(row)
return columns, data res += sorted(data, key=lambda x: x["posting_date"])
if include_payments:
running_balance = flt(opening_row.balance)
for row in range(1, len(res)):
running_balance += res[row]["debit"] - res[row]["credit"]
res[row].update({"balance": running_balance})
return columns, res, None, None, None, include_payments
def get_columns(invoice_list, additional_table_columns): def get_columns(invoice_list, additional_table_columns, include_payments=False):
"""return columns based on filters""" """return columns based on filters"""
columns = [ columns = [
{ {
"label": _("Invoice"), "label": _("Voucher Type"),
"fieldname": "invoice", "fieldname": "voucher_type",
"fieldtype": "Link", "width": 120,
"options": "Sales Invoice", },
{
"label": _("Voucher"),
"fieldname": "voucher_no",
"fieldtype": "Dynamic Link",
"options": "voucher_type",
"width": 120, "width": 120,
}, },
{"label": _("Posting Date"), "fieldname": "posting_date", "fieldtype": "Date", "width": 80}, {"label": _("Posting Date"), "fieldname": "posting_date", "fieldtype": "Date", "width": 80},
@@ -142,83 +191,156 @@ def get_columns(invoice_list, additional_table_columns):
{"label": _("Customer Name"), "fieldname": "customer_name", "fieldtype": "Data", "width": 120}, {"label": _("Customer Name"), "fieldname": "customer_name", "fieldtype": "Data", "width": 120},
] ]
if additional_table_columns: if additional_table_columns and not include_payments:
columns += additional_table_columns columns += additional_table_columns
columns += [ if not include_payments:
columns += [
{
"label": _("Customer Group"),
"fieldname": "customer_group",
"fieldtype": "Link",
"options": "Customer Group",
"width": 120,
},
{
"label": _("Territory"),
"fieldname": "territory",
"fieldtype": "Link",
"options": "Territory",
"width": 80,
},
{"label": _("Tax Id"), "fieldname": "tax_id", "fieldtype": "Data", "width": 80},
{
"label": _("Receivable Account"),
"fieldname": "receivable_account",
"fieldtype": "Link",
"options": "Account",
"width": 100,
},
{
"label": _("Mode Of Payment"),
"fieldname": "mode_of_payment",
"fieldtype": "Data",
"width": 120,
},
{
"label": _("Project"),
"fieldname": "project",
"fieldtype": "Link",
"options": "Project",
"width": 80,
},
{"label": _("Owner"), "fieldname": "owner", "fieldtype": "Data", "width": 100},
{
"label": _("Sales Order"),
"fieldname": "sales_order",
"fieldtype": "Link",
"options": "Sales Order",
"width": 100,
},
{
"label": _("Delivery Note"),
"fieldname": "delivery_note",
"fieldtype": "Link",
"options": "Delivery Note",
"width": 100,
},
{
"label": _("Cost Center"),
"fieldname": "cost_center",
"fieldtype": "Link",
"options": "Cost Center",
"width": 100,
},
{
"label": _("Warehouse"),
"fieldname": "warehouse",
"fieldtype": "Link",
"options": "Warehouse",
"width": 100,
},
{"fieldname": "currency", "label": _("Currency"), "fieldtype": "Data", "width": 80},
]
else:
columns += [
{
"fieldname": "receivable_account",
"label": _("Receivable Account"),
"fieldtype": "Link",
"options": "Account",
"width": 120,
},
{"fieldname": "debit", "label": _("Debit"), "fieldtype": "Currency", "width": 120},
{"fieldname": "credit", "label": _("Credit"), "fieldtype": "Currency", "width": 120},
{"fieldname": "balance", "label": _("Balance"), "fieldtype": "Currency", "width": 120},
]
account_columns, accounts = get_account_columns(invoice_list, include_payments)
net_total_column = [
{ {
"label": _("Customer Group"), "label": _("Net Total"),
"fieldname": "customer_group", "fieldname": "net_total",
"fieldtype": "Link", "fieldtype": "Currency",
"options": "Customer Group", "options": "currency",
"width": 120, "width": 120,
}, }
{
"label": _("Territory"),
"fieldname": "territory",
"fieldtype": "Link",
"options": "Territory",
"width": 80,
},
{"label": _("Tax Id"), "fieldname": "tax_id", "fieldtype": "Data", "width": 120},
{
"label": _("Receivable Account"),
"fieldname": "receivable_account",
"fieldtype": "Link",
"options": "Account",
"width": 80,
},
{
"label": _("Mode Of Payment"),
"fieldname": "mode_of_payment",
"fieldtype": "Data",
"width": 120,
},
{
"label": _("Project"),
"fieldname": "project",
"fieldtype": "Link",
"options": "Project",
"width": 80,
},
{"label": _("Owner"), "fieldname": "owner", "fieldtype": "Data", "width": 150},
{"label": _("Remarks"), "fieldname": "remarks", "fieldtype": "Data", "width": 150},
{
"label": _("Sales Order"),
"fieldname": "sales_order",
"fieldtype": "Link",
"options": "Sales Order",
"width": 100,
},
{
"label": _("Delivery Note"),
"fieldname": "delivery_note",
"fieldtype": "Link",
"options": "Delivery Note",
"width": 100,
},
{
"label": _("Cost Center"),
"fieldname": "cost_center",
"fieldtype": "Link",
"options": "Cost Center",
"width": 100,
},
{
"label": _("Warehouse"),
"fieldname": "warehouse",
"fieldtype": "Link",
"options": "Warehouse",
"width": 100,
},
{"fieldname": "currency", "label": _("Currency"), "fieldtype": "Data", "width": 80},
] ]
total_columns = [
{
"label": _("Tax Total"),
"fieldname": "tax_total",
"fieldtype": "Currency",
"options": "currency",
"width": 120,
}
]
if not include_payments:
total_columns += [
{
"label": _("Grand Total"),
"fieldname": "grand_total",
"fieldtype": "Currency",
"options": "currency",
"width": 120,
},
{
"label": _("Rounded Total"),
"fieldname": "rounded_total",
"fieldtype": "Currency",
"options": "currency",
"width": 120,
},
{
"label": _("Outstanding Amount"),
"fieldname": "outstanding_amount",
"fieldtype": "Currency",
"options": "currency",
"width": 120,
},
]
columns = (
columns
+ account_columns[0]
+ account_columns[2]
+ net_total_column
+ account_columns[1]
+ total_columns
)
columns += [{"label": _("Remarks"), "fieldname": "remarks", "fieldtype": "Data", "width": 150}]
return columns, accounts[0], accounts[1], accounts[2]
def get_account_columns(invoice_list, include_payments):
income_accounts = [] income_accounts = []
tax_accounts = [] tax_accounts = []
unrealized_profit_loss_accounts = []
income_columns = [] income_columns = []
tax_columns = [] tax_columns = []
unrealized_profit_loss_accounts = []
unrealized_profit_loss_account_columns = [] unrealized_profit_loss_account_columns = []
if invoice_list: if invoice_list:
@@ -230,14 +352,16 @@ def get_columns(invoice_list, additional_table_columns):
tuple(inv.name for inv in invoice_list), tuple(inv.name for inv in invoice_list),
) )
tax_accounts = frappe.db.sql_list( sales_taxes_query = get_taxes_query(invoice_list, "Sales Taxes and Charges", "Sales Invoice")
"""select distinct account_head sales_tax_accounts = sales_taxes_query.run(as_dict=True, pluck="account_head")
from `tabSales Taxes and Charges` where parenttype = 'Sales Invoice' tax_accounts = sales_tax_accounts
and docstatus = 1 and base_tax_amount_after_discount_amount != 0
and parent in (%s) order by account_head""" if include_payments:
% ", ".join(["%s"] * len(invoice_list)), advance_taxes_query = get_taxes_query(
tuple(inv.name for inv in invoice_list), invoice_list, "Advance Taxes and Charges", "Payment Entry"
) )
advance_tax_accounts = advance_taxes_query.run(as_dict=True, pluck="account_head")
tax_accounts = set(tax_accounts + advance_tax_accounts)
unrealized_profit_loss_accounts = frappe.db.sql_list( unrealized_profit_loss_accounts = frappe.db.sql_list(
"""SELECT distinct unrealized_profit_loss_account """SELECT distinct unrealized_profit_loss_account
@@ -283,134 +407,82 @@ def get_columns(invoice_list, additional_table_columns):
} }
) )
net_total_column = [ columns = [income_columns, unrealized_profit_loss_account_columns, tax_columns]
{ accounts = [income_accounts, unrealized_profit_loss_accounts, tax_accounts]
"label": _("Net Total"),
"fieldname": "net_total",
"fieldtype": "Currency",
"options": "currency",
"width": 120,
}
]
total_columns = [ return columns, accounts
{
"label": _("Tax Total"),
"fieldname": "tax_total",
"fieldtype": "Currency",
"options": "currency",
"width": 120,
},
{
"label": _("Grand Total"),
"fieldname": "grand_total",
"fieldtype": "Currency",
"options": "currency",
"width": 120,
},
{
"label": _("Rounded Total"),
"fieldname": "rounded_total",
"fieldtype": "Currency",
"options": "currency",
"width": 120,
},
{
"label": _("Outstanding Amount"),
"fieldname": "outstanding_amount",
"fieldtype": "Currency",
"options": "currency",
"width": 120,
},
]
columns = (
columns
+ income_columns
+ unrealized_profit_loss_account_columns
+ net_total_column
+ tax_columns
+ total_columns
)
return columns, income_accounts, tax_accounts, unrealized_profit_loss_accounts
def get_conditions(filters):
conditions = ""
accounting_dimensions = get_accounting_dimensions(as_list=False) or []
accounting_dimensions_list = [d.fieldname for d in accounting_dimensions]
if filters.get("company"):
conditions += " and company=%(company)s"
if filters.get("customer") and "customer" not in accounting_dimensions_list:
conditions += " and customer = %(customer)s"
if filters.get("from_date"):
conditions += " and posting_date >= %(from_date)s"
if filters.get("to_date"):
conditions += " and posting_date <= %(to_date)s"
if filters.get("owner"):
conditions += " and owner = %(owner)s"
def get_sales_invoice_item_field_condition(field, table="Sales Invoice Item") -> str:
if not filters.get(field) or field in accounting_dimensions_list:
return ""
return f""" and exists(select name from `tab{table}`
where parent=`tabSales Invoice`.name
and ifnull(`tab{table}`.{field}, '') = %({field})s)"""
conditions += get_sales_invoice_item_field_condition("mode_of_payment", "Sales Invoice Payment")
conditions += get_sales_invoice_item_field_condition("cost_center")
conditions += get_sales_invoice_item_field_condition("warehouse")
conditions += get_sales_invoice_item_field_condition("brand")
conditions += get_sales_invoice_item_field_condition("item_group")
if accounting_dimensions:
common_condition = """
and exists(select name from `tabSales Invoice Item`
where parent=`tabSales Invoice`.name
"""
for dimension in accounting_dimensions:
if filters.get(dimension.fieldname):
if frappe.get_cached_value("DocType", dimension.document_type, "is_tree"):
filters[dimension.fieldname] = get_dimension_with_children(
dimension.document_type, filters.get(dimension.fieldname)
)
conditions += (
common_condition
+ "and ifnull(`tabSales Invoice`.{0}, '') in %({0})s)".format(dimension.fieldname)
)
else:
conditions += (
common_condition
+ "and ifnull(`tabSales Invoice`.{0}, '') in %({0})s)".format(dimension.fieldname)
)
return conditions
def get_invoices(filters, additional_query_columns): def get_invoices(filters, additional_query_columns):
conditions = get_conditions(filters) si = frappe.qb.DocType("Sales Invoice")
return frappe.db.sql( query = (
""" frappe.qb.from_(si)
select name, posting_date, debit_to, project, customer, .select(
customer_name, owner, remarks, territory, tax_id, customer_group, ConstantColumn("Sales Invoice").as_("doctype"),
base_net_total, base_grand_total, base_rounded_total, outstanding_amount, si.name,
is_internal_customer, represents_company, company {0} si.posting_date,
from `tabSales Invoice` si.debit_to,
where docstatus = 1 {1} si.project,
order by posting_date desc, name desc""".format( si.customer,
additional_query_columns, conditions si.customer_name,
), si.owner,
filters, si.remarks,
as_dict=1, si.territory,
si.tax_id,
si.customer_group,
si.base_net_total,
si.base_grand_total,
si.base_rounded_total,
si.outstanding_amount,
si.is_internal_customer,
si.represents_company,
si.company,
)
.where((si.docstatus == 1))
.orderby(si.posting_date, si.name, order=Order.desc)
) )
if additional_query_columns:
for col in additional_query_columns:
query = query.select(col)
if filters.get("customer"):
query = query.where(si.customer == filters.customer)
query = get_conditions(filters, query, "Sales Invoice")
query = apply_common_conditions(
filters, query, doctype="Sales Invoice", child_doctype="Sales Invoice Item"
)
invoices = query.run(as_dict=True)
return invoices
def get_conditions(filters, query, doctype):
parent_doc = frappe.qb.DocType(doctype)
if filters.get("owner"):
query = query.where(parent_doc.owner == filters.owner)
if filters.get("mode_of_payment"):
payment_doc = frappe.qb.DocType("Sales Invoice Payment")
query = query.inner_join(payment_doc).on(parent_doc.name == payment_doc.parent)
query = query.where(payment_doc.mode_of_payment == filters.mode_of_payment).distinct()
return query
def get_payments(filters):
args = frappe._dict(
account="debit_to",
account_fieldname="paid_from",
party="customer",
party_name="customer_name",
party_account=get_party_account("Customer", filters.customer, filters.company),
)
payment_entries = get_payment_entries(filters, args)
journal_entries = get_journal_entries(filters, args)
return payment_entries + journal_entries
def get_invoice_income_map(invoice_list): def get_invoice_income_map(invoice_list):
income_details = frappe.db.sql( income_details = frappe.db.sql(
@@ -447,7 +519,7 @@ def get_internal_invoice_map(invoice_list):
return internal_invoice_map return internal_invoice_map
def get_invoice_tax_map(invoice_list, invoice_income_map, income_accounts): def get_invoice_tax_map(invoice_list, invoice_income_map, income_accounts, include_payments=False):
tax_details = frappe.db.sql( tax_details = frappe.db.sql(
"""select parent, account_head, """select parent, account_head,
sum(base_tax_amount_after_discount_amount) as tax_amount sum(base_tax_amount_after_discount_amount) as tax_amount
@@ -457,6 +529,9 @@ def get_invoice_tax_map(invoice_list, invoice_income_map, income_accounts):
as_dict=1, as_dict=1,
) )
if include_payments:
tax_details += get_advance_taxes_and_charges(invoice_list)
invoice_tax_map = {} invoice_tax_map = {}
for d in tax_details: for d in tax_details:
if d.account_head in income_accounts: if d.account_head in income_accounts:

View File

@@ -1,8 +1,16 @@
import frappe import frappe
from frappe.query_builder.custom import ConstantColumn
from frappe.query_builder.functions import Sum
from frappe.utils import flt, formatdate, get_datetime_str, get_table_name from frappe.utils import flt, formatdate, get_datetime_str, get_table_name
from pypika import Order
from erpnext import get_company_currency, get_default_company from erpnext import get_company_currency, get_default_company
from erpnext.accounts.doctype.accounting_dimension.accounting_dimension import (
get_accounting_dimensions,
get_dimension_with_children,
)
from erpnext.accounts.doctype.fiscal_year.fiscal_year import get_from_and_to_date from erpnext.accounts.doctype.fiscal_year.fiscal_year import get_from_and_to_date
from erpnext.accounts.party import get_party_account
from erpnext.setup.utils import get_exchange_rate from erpnext.setup.utils import get_exchange_rate
__exchange_rates = {} __exchange_rates = {}
@@ -165,7 +173,7 @@ def get_query_columns(report_columns):
else: else:
columns.append(fieldname) columns.append(fieldname)
return ", " + ", ".join(columns) return columns
def get_values_for_columns(report_columns, report_row): def get_values_for_columns(report_columns, report_row):
@@ -179,3 +187,203 @@ def get_values_for_columns(report_columns, report_row):
values[fieldname] = report_row.get(fieldname) values[fieldname] = report_row.get(fieldname)
return values return values
def get_party_details(party_type, party_list):
party_details = {}
party = frappe.qb.DocType(party_type)
query = frappe.qb.from_(party).select(party.name, party.tax_id).where(party.name.isin(party_list))
if party_type == "Supplier":
query = query.select(party.supplier_group)
else:
query = query.select(party.customer_group, party.territory)
party_detail_list = query.run(as_dict=True)
for party_dict in party_detail_list:
party_details[party_dict.name] = party_dict
return party_details
def get_taxes_query(invoice_list, doctype, parenttype):
taxes = frappe.qb.DocType(doctype)
query = (
frappe.qb.from_(taxes)
.select(taxes.account_head)
.distinct()
.where(
(taxes.parenttype == parenttype)
& (taxes.docstatus == 1)
& (taxes.account_head.isnotnull())
& (taxes.parent.isin([inv.name for inv in invoice_list]))
)
.orderby(taxes.account_head)
)
if doctype == "Purchase Taxes and Charges":
return query.where(taxes.category.isin(["Total", "Valuation and Total"]))
elif doctype == "Sales Taxes and Charges":
return query
return query.where(taxes.charge_type.isin(["On Paid Amount", "Actual"]))
def get_journal_entries(filters, args):
je = frappe.qb.DocType("Journal Entry")
journal_account = frappe.qb.DocType("Journal Entry Account")
query = (
frappe.qb.from_(je)
.inner_join(journal_account)
.on(je.name == journal_account.parent)
.select(
je.voucher_type.as_("doctype"),
je.name,
je.posting_date,
journal_account.account.as_(args.account),
journal_account.party.as_(args.party),
journal_account.party.as_(args.party_name),
je.bill_no,
je.bill_date,
je.remark.as_("remarks"),
je.total_amount.as_("base_net_total"),
je.total_amount.as_("base_grand_total"),
je.mode_of_payment,
journal_account.project,
)
.where(
(je.voucher_type == "Journal Entry")
& (journal_account.party == filters.get(args.party))
& (journal_account.account == args.party_account)
)
.orderby(je.posting_date, je.name, order=Order.desc)
)
query = apply_common_conditions(filters, query, doctype="Journal Entry", payments=True)
journal_entries = query.run(as_dict=True)
return journal_entries
def get_payment_entries(filters, args):
pe = frappe.qb.DocType("Payment Entry")
query = (
frappe.qb.from_(pe)
.select(
ConstantColumn("Payment Entry").as_("doctype"),
pe.name,
pe.posting_date,
pe[args.account_fieldname].as_(args.account),
pe.party.as_(args.party),
pe.party_name.as_(args.party_name),
pe.remarks,
pe.paid_amount.as_("base_net_total"),
pe.paid_amount_after_tax.as_("base_grand_total"),
pe.mode_of_payment,
pe.project,
pe.cost_center,
)
.where(
(pe.party == filters.get(args.party)) & (pe[args.account_fieldname] == args.party_account)
)
.orderby(pe.posting_date, pe.name, order=Order.desc)
)
query = apply_common_conditions(filters, query, doctype="Payment Entry", payments=True)
payment_entries = query.run(as_dict=True)
return payment_entries
def apply_common_conditions(filters, query, doctype, child_doctype=None, payments=False):
parent_doc = frappe.qb.DocType(doctype)
if child_doctype:
child_doc = frappe.qb.DocType(child_doctype)
join_required = False
if filters.get("company"):
query = query.where(parent_doc.company == filters.company)
if filters.get("from_date"):
query = query.where(parent_doc.posting_date >= filters.from_date)
if filters.get("to_date"):
query = query.where(parent_doc.posting_date <= filters.to_date)
if payments:
if filters.get("cost_center"):
query = query.where(parent_doc.cost_center == filters.cost_center)
else:
if filters.get("cost_center"):
query = query.where(child_doc.cost_center == filters.cost_center)
join_required = True
if filters.get("warehouse"):
query = query.where(child_doc.warehouse == filters.warehouse)
join_required = True
if filters.get("item_group"):
query = query.where(child_doc.item_group == filters.item_group)
join_required = True
if not payments:
if filters.get("brand"):
query = query.where(child_doc.brand == filters.brand)
join_required = True
if join_required:
query = query.inner_join(child_doc).on(parent_doc.name == child_doc.parent)
query = query.distinct()
if parent_doc.get_table_name() != "tabJournal Entry":
query = filter_invoices_based_on_dimensions(filters, query, parent_doc)
return query
def get_advance_taxes_and_charges(invoice_list):
adv_taxes = frappe.qb.DocType("Advance Taxes and Charges")
return (
frappe.qb.from_(adv_taxes)
.select(
adv_taxes.parent,
adv_taxes.account_head,
(
frappe.qb.terms.Case()
.when(adv_taxes.add_deduct_tax == "Add", Sum(adv_taxes.base_tax_amount))
.else_(Sum(adv_taxes.base_tax_amount) * -1)
).as_("tax_amount"),
)
.where(
(adv_taxes.parent.isin([inv.name for inv in invoice_list]))
& (adv_taxes.charge_type.isin(["On Paid Amount", "Actual"]))
& (adv_taxes.base_tax_amount != 0)
)
.groupby(adv_taxes.parent, adv_taxes.account_head, adv_taxes.add_deduct_tax)
).run(as_dict=True)
def filter_invoices_based_on_dimensions(filters, query, parent_doc):
accounting_dimensions = get_accounting_dimensions(as_list=False)
if accounting_dimensions:
for dimension in accounting_dimensions:
if filters.get(dimension.fieldname):
if frappe.get_cached_value("DocType", dimension.document_type, "is_tree"):
filters[dimension.fieldname] = get_dimension_with_children(
dimension.document_type, filters.get(dimension.fieldname)
)
fieldname = dimension.fieldname
query = query.where(parent_doc[fieldname] == filters.fieldname)
return query
def get_opening_row(party_type, party, from_date, company):
party_account = get_party_account(party_type, party, company)
gle = frappe.qb.DocType("GL Entry")
return (
frappe.qb.from_(gle)
.select(
ConstantColumn("Opening").as_("account"),
Sum(gle.debit).as_("debit"),
Sum(gle.credit).as_("credit"),
(Sum(gle.debit) - Sum(gle.credit)).as_("balance"),
)
.where(
(gle.account == party_account)
& (gle.party == party)
& (gle.posting_date < from_date)
& (gle.is_cancelled == 0)
)
).run(as_dict=True)