Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
328 changes: 14 additions & 314 deletions backend/src/api/routes/invoices.py
Original file line number Diff line number Diff line change
@@ -1,318 +1,34 @@
from io import BytesIO
from datetime import date, datetime
from collections import defaultdict

from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from sqlalchemy import case, func, or_
from sqlalchemy.orm import Session
from sqlalchemy.orm import joinedload
from decimal import Decimal, ROUND_HALF_UP
from sqlalchemy import case, func
from sqlalchemy.orm import Session, joinedload
from decimal import Decimal

import weasyprint

from fastapi import Query

from src.db.session import get_db
from src.models.buyer import Buyer as Ledger
from src.models.company_account import CompanyAccount
from src.models.company import CompanyProfile
from src.models.invoice import Invoice, InvoiceItem
from src.models.inventory import Inventory
from src.models.product import Product
from src.models.user import User
from src.schemas.invoice import InvoiceCreate, InvoiceOut, PaginatedInvoiceOut
from src.api.deps import get_active_company, get_current_user
from src.services.series import generate_next_number
from src.services.financial_year import get_active_fy, get_fy_for_date
from src.services.invoice_payments import build_invoice_payment_summaries
from src.services.pdf_templates import (
_build_invoice_html,
_build_multi_copy_invoice_html,
_copy_label,
)
from src.services.gst_tax_service import money as _money, is_interstate_supply, assign_item_tax_split, assign_invoice_tax_totals
from src.services.invoice_processor import InvoiceProcessor

router = APIRouter()




def _generate_next_number(
db: Session,
voucher_type: str,
financial_year_id: int | None = None,
invoice_date: date | None = None,
active_financial_year_id: int | None = None,
company_id: int | None = None,
) -> str:
return generate_next_number(
db,
voucher_type,
financial_year_id,
invoice_date,
active_financial_year_id,
company_id=company_id,
)


def _require_ledger(db: Session, ledger_id: int, company_id: int | None) -> Ledger:
query = db.query(Ledger).filter(Ledger.id == ledger_id)
if company_id is not None:
query = query.filter(or_(Ledger.company_id == company_id, Ledger.company_id.is_(None)))
ledger = query.first()
if not ledger:
raise HTTPException(status_code=404, detail=f"Ledger {ledger_id} not found")
return ledger


def _change_inventory_quantity(
db: Session,
product_id: int,
quantity_delta: Decimal,
*,
company_id: int | None,
context: str,
) -> None:
query = db.query(Inventory).filter(Inventory.product_id == product_id)
if company_id is not None:
query = query.filter(or_(Inventory.company_id == company_id, Inventory.company_id.is_(None)))
inventory = query.first()
if not inventory:
inventory = Inventory(company_id=company_id, product_id=product_id, quantity=0)
db.add(inventory)
db.flush()

inventory.quantity = Decimal(str(inventory.quantity or 0)) + quantity_delta
if Decimal(str(inventory.quantity or 0)) < 0:
raise HTTPException(status_code=400, detail=f"Insufficient inventory while {context}")




def _reverse_existing_invoice_inventory(db: Session, invoice: Invoice) -> None:
for item in invoice.items:
product_query = db.query(Product).filter(Product.id == item.product_id)
if invoice.company_id is not None:
product_query = product_query.filter(or_(Product.company_id == invoice.company_id, Product.company_id.is_(None)))
product = product_query.first()
if not product:
raise HTTPException(status_code=404, detail=f"Product {item.product_id} not found")
if not product.maintain_inventory:
continue

reverse_delta = item.quantity if invoice.voucher_type == "sales" else -item.quantity
_change_inventory_quantity(
db,
item.product_id,
reverse_delta,
company_id=invoice.company_id,
context=f"reversing invoice {invoice.id}",
)


def _inventory_effect_for_voucher_type(quantity: float, voucher_type: str) -> Decimal:
quantity_value = Decimal(str(quantity))
return -quantity_value if voucher_type == "sales" else quantity_value


def _apply_inventory_delta_for_invoice_update(
db: Session,
invoice: Invoice,
payload: InvoiceCreate,
*,
company_id: int | None,
) -> None:
existing_effect_by_product: dict[int, Decimal] = defaultdict(lambda: Decimal("0"))
for item in invoice.items:
existing_effect_by_product[item.product_id] += _inventory_effect_for_voucher_type(
item.quantity,
invoice.voucher_type,
)

next_effect_by_product: dict[int, Decimal] = defaultdict(lambda: Decimal("0"))
for item in payload.items:
next_effect_by_product[item.product_id] += _inventory_effect_for_voucher_type(
item.quantity,
payload.voucher_type,
)

for product_id in set(existing_effect_by_product) | set(next_effect_by_product):
quantity_delta = next_effect_by_product[product_id] - existing_effect_by_product[product_id]
if quantity_delta == 0:
continue

product_query = db.query(Product).filter(Product.id == product_id)
if company_id is not None:
product_query = product_query.filter(or_(Product.company_id == company_id, Product.company_id.is_(None)))
product = product_query.first()
if not product:
raise HTTPException(status_code=404, detail=f"Product {product_id} not found")
if not product.maintain_inventory:
continue

_change_inventory_quantity(
db,
product_id,
quantity_delta,
company_id=company_id,
context=f"editing invoice {invoice.id}",
)


def _apply_payload_to_invoice(
db: Session,
invoice: Invoice,
payload: InvoiceCreate,
active_company: CompanyProfile | None = None,
created_by: int | None = None,
financial_year_id: int | None = None,
active_financial_year_id: int | None = None,
regenerate_number: bool = True,
apply_inventory_changes: bool = True,
) -> None:
company = active_company or db.query(CompanyProfile).order_by(CompanyProfile.id.asc()).first()
company_id = company.id if company else None
ledger = _require_ledger(db, payload.ledger_id, company_id)

invoice.company_id = company_id
invoice.ledger_id = ledger.id
invoice.ledger_name = ledger.name
invoice.ledger_address = ledger.address
invoice.ledger_gst = ledger.gst
invoice.ledger_phone = ledger.phone_number
invoice.company_name = company.name if company else None
invoice.company_address = company.address if company else None
invoice.company_gst = company.gst if company else None
invoice.company_phone = company.phone_number if company else None
invoice.company_email = company.email if company else None
invoice.company_website = company.website if company else None
invoice.company_currency_code = company.currency_code if company else None
invoice.company_bank_name = company.bank_name if company else None
invoice.company_branch_name = company.branch_name if company else None
invoice.company_account_name = company.account_name if company else None
invoice.company_account_number = company.account_number if company else None
invoice.company_ifsc_code = company.ifsc_code if company else None
invoice.voucher_type = payload.voucher_type
invoice.supplier_invoice_number = payload.supplier_invoice_number
invoice.reference_notes = payload.reference_notes
if created_by is not None:
invoice.created_by = created_by
if financial_year_id is not None:
invoice.financial_year_id = financial_year_id

if payload.invoice_date is not None:
invoice.invoice_date = datetime.combine(payload.invoice_date, datetime.min.time())

invoice.due_date = datetime.combine(payload.due_date, datetime.min.time()) if payload.due_date is not None else None

invoice.tax_inclusive = payload.tax_inclusive
invoice.apply_round_off = payload.apply_round_off
if regenerate_number:
invoice.invoice_number = _generate_next_number(
db, invoice.voucher_type, financial_year_id, payload.invoice_date,
active_financial_year_id,
company_id=company_id,
)

if not payload.items:
raise HTTPException(status_code=400, detail="Invoice must have at least one line item")

interstate_supply = is_interstate_supply(invoice.company_gst, invoice.ledger_gst)

taxable_total = Decimal("0")
created_items: list[InvoiceItem] = []
for item in payload.items:
quantity_value = Decimal(str(item.quantity))
if quantity_value <= 0:
raise HTTPException(status_code=400, detail="Item quantity must be greater than zero")

product_query = db.query(Product).filter(Product.id == item.product_id)
if company_id is not None:
product_query = product_query.filter(or_(Product.company_id == company_id, Product.company_id.is_(None)))
product = product_query.first()
if not product:
raise HTTPException(status_code=404, detail=f"Product {item.product_id} not found")

if not product.allow_decimal and quantity_value != quantity_value.to_integral_value():
raise HTTPException(
status_code=400,
detail=f"Quantity for {product.name} must be a whole number",
)

if apply_inventory_changes and product.maintain_inventory:
inventory_query = db.query(Inventory).filter(Inventory.product_id == item.product_id)
if company_id is not None:
inventory_query = inventory_query.filter(or_(Inventory.company_id == company_id, Inventory.company_id.is_(None)))
inventory = inventory_query.first()
if payload.voucher_type == "sales" and (not inventory or Decimal(str(inventory.quantity or 0)) < quantity_value):
raise HTTPException(status_code=400, detail=f"Insufficient inventory for {product.name}")

quantity_delta = _inventory_effect_for_voucher_type(item.quantity, payload.voucher_type)
_change_inventory_quantity(
db,
item.product_id,
quantity_delta,
company_id=company_id,
context=f"applying invoice {invoice.id or 'new'}",
)

# Use custom unit_price if provided, otherwise use product price.
# GST rate is snapshotted from the product at invoice time.
unit_price = Decimal(str(item.unit_price)) if item.unit_price is not None else Decimal(str(product.price))
gst_rate = Decimal(str(product.gst_rate or 0))

if payload.tax_inclusive:
# Entered price already includes tax; back-calculate taxable amount
line_total = _money(unit_price * quantity_value)
taxable_amount = _money(line_total / (1 + gst_rate / Decimal("100")))
tax_amount = _money(line_total - taxable_amount)
else:
taxable_amount = _money(unit_price * quantity_value)
tax_amount = _money(taxable_amount * gst_rate / Decimal("100"))
line_total = _money(taxable_amount + tax_amount)

taxable_total += taxable_amount

invoice_item = InvoiceItem(
invoice_id=invoice.id,
product_id=product.id,
quantity=float(quantity_value),
hsn_sac=product.hsn_sac,
unit_price=float(unit_price),
gst_rate=float(gst_rate),
taxable_amount=float(taxable_amount),
tax_amount=float(tax_amount),
line_total=float(line_total),
description=item.description,
)
created_items.append(invoice_item)
db.add(invoice_item)

taxable_total = _money(taxable_total)
invoice.taxable_amount = float(taxable_total)

assign_item_tax_split(
created_items,
interstate_supply=interstate_supply,
)

tax_total = assign_invoice_tax_totals(
invoice,
created_items,
interstate_supply=interstate_supply,
)
raw_total = _money(taxable_total + tax_total)
if invoice.apply_round_off:
rounded_total = raw_total.quantize(Decimal("1"), rounding=ROUND_HALF_UP)
round_off_amount = _money(rounded_total - raw_total)
invoice.round_off_amount = float(round_off_amount)
invoice.total_amount = float(_money(rounded_total))
else:
invoice.round_off_amount = 0
invoice.total_amount = float(raw_total)


def _to_invoice_out(
invoice: Invoice,
*,
Expand Down Expand Up @@ -355,8 +71,8 @@ def create_invoice(
)
db.add(invoice)
db.flush()
_apply_payload_to_invoice(
db,
processor = InvoiceProcessor(db)
processor.apply_payload(
invoice,
payload,
active_company,
Expand Down Expand Up @@ -583,8 +299,8 @@ def update_invoice(
try:
active_fy = get_active_fy(db, company_id=active_company.id)

_apply_inventory_delta_for_invoice_update(
db,
processor = InvoiceProcessor(db)
processor.apply_inventory_delta_for_update(
invoice,
payload,
company_id=active_company.id,
Expand All @@ -594,8 +310,7 @@ def update_invoice(
db.delete(item)
db.flush()

_apply_payload_to_invoice(
db,
processor.apply_payload(
invoice,
payload,
active_company,
Expand Down Expand Up @@ -710,7 +425,8 @@ def cancel_invoice(
raise HTTPException(status_code=400, detail="Invoice is already cancelled")

try:
_reverse_existing_invoice_inventory(db, invoice)
processor = InvoiceProcessor(db)
processor.reverse_inventory(invoice)
invoice.status = "cancelled"
db.commit()
db.refresh(invoice)
Expand Down Expand Up @@ -743,24 +459,8 @@ def restore_invoice(
raise HTTPException(status_code=400, detail="Invoice is not cancelled")

try:
# Re-apply inventory changes (reverse the reversal)
for item in invoice.items:
product_query = db.query(Product).filter(Product.id == item.product_id)
product_query = product_query.filter(or_(Product.company_id == active_company.id, Product.company_id.is_(None)))
product = product_query.first()
if not product:
raise HTTPException(status_code=404, detail=f"Product {item.product_id} not found")
if not product.maintain_inventory:
continue

restore_delta = -item.quantity if invoice.voucher_type == "sales" else item.quantity
_change_inventory_quantity(
db,
item.product_id,
restore_delta,
company_id=active_company.id,
context=f"restoring invoice {invoice.id}",
)
processor = InvoiceProcessor(db)
processor.restore_inventory(invoice, company_id=active_company.id)
invoice.status = "active"
db.commit()
db.refresh(invoice)
Expand Down
Loading
Loading