This commit is contained in:
despiegk 2025-04-19 08:39:40 +02:00
parent 82052ef385
commit 9c7baa3b4e
10 changed files with 1167 additions and 0 deletions

View File

@ -0,0 +1,131 @@
# Business Models Python Port
This directory contains a Python port of the business models from the Rust codebase, using SQLModel for database integration.
## Overview
This project includes:
1. Python port of Rust business models using SQLModel
2. FastAPI server with OpenAPI/Swagger documentation
3. CRUD operations for all models
4. Convenience endpoints for common operations
The models ported from Rust to Python include:
- **Currency**: Represents a monetary value with amount and currency code
- **Customer**: Represents a customer who can purchase products or services
- **Product**: Represents a product or service offered
- **ProductComponent**: Represents a component of a product
- **SaleItem**: Represents an item in a sale
- **Sale**: Represents a sale of products or services
## Structure
- `models.py`: Contains the SQLModel definitions for all business models
- `example.py`: Demonstrates how to use the models with a sample application
- `install_and_run.sh`: Bash script to install dependencies using `uv` and run the example
- `api.py`: FastAPI server providing CRUD operations for all models
- `server.sh`: Bash script to start the FastAPI server
## Requirements
- Python 3.7+
- [uv](https://github.com/astral-sh/uv) for dependency management
## Installation
The project uses `uv` for dependency management. To install dependencies and run the example:
```bash
./install_and_run.sh
```
## API Server
The project includes a FastAPI server that provides CRUD operations for all models and some convenience endpoints.
### Starting the Server
To start the API server:
```bash
./server.sh
```
This script will:
1. Create a virtual environment if it doesn't exist
2. Install the required dependencies using `uv`
3. Start the FastAPI server with hot reloading enabled
### API Documentation
Once the server is running, you can access the OpenAPI documentation at:
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
### Available Endpoints
The API provides the following endpoints:
#### Currencies
- `GET /currencies/`: List all currencies
- `POST /currencies/`: Create a new currency
- `GET /currencies/{currency_id}`: Get a specific currency
- `PUT /currencies/{currency_id}`: Update a currency
- `DELETE /currencies/{currency_id}`: Delete a currency
#### Customers
- `GET /customers/`: List all customers
- `POST /customers/`: Create a new customer
- `GET /customers/{customer_id}`: Get a specific customer
- `PUT /customers/{customer_id}`: Update a customer
- `DELETE /customers/{customer_id}`: Delete a customer
- `GET /customers/{customer_id}/sales/`: Get all sales for a customer
#### Products
- `GET /products/`: List all products
- `POST /products/`: Create a new product
- `GET /products/{product_id}`: Get a specific product
- `PUT /products/{product_id}`: Update a product
- `DELETE /products/{product_id}`: Delete a product
- `GET /products/available/`: Get all available products
- `POST /products/{product_id}/components/`: Add a component to a product
- `GET /products/{product_id}/components/`: Get all components for a product
#### Sales
- `GET /sales/`: List all sales
- `POST /sales/`: Create a new sale
- `GET /sales/{sale_id}`: Get a specific sale
- `PUT /sales/{sale_id}`: Update a sale
- `DELETE /sales/{sale_id}`: Delete a sale
- `PUT /sales/{sale_id}/status/{status}`: Update the status of a sale
- `POST /sales/{sale_id}/items/`: Add an item to a sale
- `GET /sales/{sale_id}/items/`: Get all items for a sale
## Dependencies
- SQLModel: For database models and ORM functionality
- Pydantic: For data validation (used by SQLModel)
- FastAPI: For creating the API server
- Uvicorn: ASGI server for running FastAPI applications
## Example Usage
The `example.py` script demonstrates:
1. Creating an SQLite database
2. Defining and creating tables for the models
3. Creating sample data (customers, products, sales)
4. Performing operations on the data
5. Querying and displaying the data
To run the example manually (after activating the virtual environment):
```bash
# From the py directory
python example.py
# Or from the parent directory
cd py && python example.py

View File

@ -0,0 +1,3 @@
"""
Python port of the business models from Rust.
"""

Binary file not shown.

455
herodb/src/models/py/api.py Executable file
View File

@ -0,0 +1,455 @@
#!/usr/bin/env python3
"""
FastAPI server providing CRUD operations for business models.
"""
import os
from datetime import datetime
from typing import List, Optional
from fastapi import Depends, FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from sqlmodel import Session, SQLModel, create_engine, select
from models import (
Currency,
Customer,
Product,
ProductComponent,
ProductStatus,
ProductType,
Sale,
SaleItem,
SaleStatus,
)
# Create database
DATABASE_URL = os.environ.get("DATABASE_URL", "sqlite:///business.db")
engine = create_engine(DATABASE_URL, echo=False)
# Create tables
SQLModel.metadata.create_all(engine)
# Create FastAPI app
app = FastAPI(
title="Business API",
description="API for business models",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json",
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Dependency to get database session
def get_session():
with Session(engine) as session:
yield session
# Root endpoint
@app.get("/")
async def root():
return {"message": "Welcome to the Business API"}
# Currency endpoints
@app.post("/currencies/", response_model=Currency, tags=["Currencies"])
def create_currency(currency: Currency, session: Session = Depends(get_session)):
"""Create a new currency"""
session.add(currency)
session.commit()
session.refresh(currency)
return currency
@app.get("/currencies/", response_model=List[Currency], tags=["Currencies"])
def read_currencies(
skip: int = 0, limit: int = 100, session: Session = Depends(get_session)
):
"""Get all currencies"""
currencies = session.exec(select(Currency).offset(skip).limit(limit)).all()
return currencies
@app.get("/currencies/{currency_id}", response_model=Currency, tags=["Currencies"])
def read_currency(currency_id: int, session: Session = Depends(get_session)):
"""Get a currency by ID"""
currency = session.get(Currency, currency_id)
if not currency:
raise HTTPException(status_code=404, detail="Currency not found")
return currency
@app.put("/currencies/{currency_id}", response_model=Currency, tags=["Currencies"])
def update_currency(
currency_id: int, currency_data: Currency, session: Session = Depends(get_session)
):
"""Update a currency"""
currency = session.get(Currency, currency_id)
if not currency:
raise HTTPException(status_code=404, detail="Currency not found")
# Update currency attributes
currency_data_dict = currency_data.dict(exclude_unset=True)
for key, value in currency_data_dict.items():
setattr(currency, key, value)
session.add(currency)
session.commit()
session.refresh(currency)
return currency
@app.delete("/currencies/{currency_id}", tags=["Currencies"])
def delete_currency(currency_id: int, session: Session = Depends(get_session)):
"""Delete a currency"""
currency = session.get(Currency, currency_id)
if not currency:
raise HTTPException(status_code=404, detail="Currency not found")
session.delete(currency)
session.commit()
return {"message": "Currency deleted successfully"}
# Customer endpoints
@app.post("/customers/", response_model=Customer, tags=["Customers"])
def create_customer(customer: Customer, session: Session = Depends(get_session)):
"""Create a new customer"""
session.add(customer)
session.commit()
session.refresh(customer)
return customer
@app.get("/customers/", response_model=List[Customer], tags=["Customers"])
def read_customers(
skip: int = 0, limit: int = 100, session: Session = Depends(get_session)
):
"""Get all customers"""
customers = session.exec(select(Customer).offset(skip).limit(limit)).all()
return customers
@app.get("/customers/{customer_id}", response_model=Customer, tags=["Customers"])
def read_customer(customer_id: int, session: Session = Depends(get_session)):
"""Get a customer by ID"""
customer = session.get(Customer, customer_id)
if not customer:
raise HTTPException(status_code=404, detail="Customer not found")
return customer
@app.put("/customers/{customer_id}", response_model=Customer, tags=["Customers"])
def update_customer(
customer_id: int, customer_data: Customer, session: Session = Depends(get_session)
):
"""Update a customer"""
customer = session.get(Customer, customer_id)
if not customer:
raise HTTPException(status_code=404, detail="Customer not found")
# Update customer attributes
customer_data_dict = customer_data.dict(exclude_unset=True)
for key, value in customer_data_dict.items():
setattr(customer, key, value)
session.add(customer)
session.commit()
session.refresh(customer)
return customer
@app.delete("/customers/{customer_id}", tags=["Customers"])
def delete_customer(customer_id: int, session: Session = Depends(get_session)):
"""Delete a customer"""
customer = session.get(Customer, customer_id)
if not customer:
raise HTTPException(status_code=404, detail="Customer not found")
session.delete(customer)
session.commit()
return {"message": "Customer deleted successfully"}
# Product endpoints
@app.post("/products/", response_model=Product, tags=["Products"])
def create_product(product: Product, session: Session = Depends(get_session)):
"""Create a new product"""
session.add(product)
session.commit()
session.refresh(product)
return product
@app.get("/products/", response_model=List[Product], tags=["Products"])
def read_products(
skip: int = 0,
limit: int = 100,
category: Optional[str] = None,
status: Optional[ProductStatus] = None,
istemplate: Optional[bool] = None,
session: Session = Depends(get_session)
):
"""Get all products with optional filtering"""
query = select(Product)
if category:
query = query.where(Product.category == category)
if status:
query = query.where(Product.status == status)
if istemplate is not None:
query = query.where(Product.istemplate == istemplate)
products = session.exec(query.offset(skip).limit(limit)).all()
return products
@app.get("/products/{product_id}", response_model=Product, tags=["Products"])
def read_product(product_id: int, session: Session = Depends(get_session)):
"""Get a product by ID"""
product = session.get(Product, product_id)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
return product
@app.put("/products/{product_id}", response_model=Product, tags=["Products"])
def update_product(
product_id: int, product_data: Product, session: Session = Depends(get_session)
):
"""Update a product"""
product = session.get(Product, product_id)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
# Update product attributes
product_data_dict = product_data.dict(exclude_unset=True)
for key, value in product_data_dict.items():
setattr(product, key, value)
session.add(product)
session.commit()
session.refresh(product)
return product
@app.delete("/products/{product_id}", tags=["Products"])
def delete_product(product_id: int, session: Session = Depends(get_session)):
"""Delete a product"""
product = session.get(Product, product_id)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
session.delete(product)
session.commit()
return {"message": "Product deleted successfully"}
# Product Component endpoints
@app.post("/products/{product_id}/components/", response_model=ProductComponent, tags=["Product Components"])
def create_product_component(
product_id: int, component: ProductComponent, session: Session = Depends(get_session)
):
"""Add a component to a product"""
product = session.get(Product, product_id)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
component.product_id = product_id
session.add(component)
session.commit()
session.refresh(component)
return component
@app.get("/products/{product_id}/components/", response_model=List[ProductComponent], tags=["Product Components"])
def read_product_components(
product_id: int, session: Session = Depends(get_session)
):
"""Get all components for a product"""
product = session.get(Product, product_id)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
return product.components
# Sale endpoints
@app.post("/sales/", response_model=Sale, tags=["Sales"])
def create_sale(sale: Sale, session: Session = Depends(get_session)):
"""Create a new sale"""
# Ensure customer exists if customer_id is provided
if sale.customer_id:
customer = session.get(Customer, sale.customer_id)
if not customer:
raise HTTPException(status_code=404, detail="Customer not found")
session.add(sale)
session.commit()
session.refresh(sale)
return sale
@app.get("/sales/", response_model=List[Sale], tags=["Sales"])
def read_sales(
skip: int = 0,
limit: int = 100,
status: Optional[SaleStatus] = None,
customer_id: Optional[int] = None,
session: Session = Depends(get_session)
):
"""Get all sales with optional filtering"""
query = select(Sale)
if status:
query = query.where(Sale.status == status)
if customer_id:
query = query.where(Sale.customer_id == customer_id)
sales = session.exec(query.offset(skip).limit(limit)).all()
return sales
@app.get("/sales/{sale_id}", response_model=Sale, tags=["Sales"])
def read_sale(sale_id: int, session: Session = Depends(get_session)):
"""Get a sale by ID"""
sale = session.get(Sale, sale_id)
if not sale:
raise HTTPException(status_code=404, detail="Sale not found")
return sale
@app.put("/sales/{sale_id}", response_model=Sale, tags=["Sales"])
def update_sale(
sale_id: int, sale_data: Sale, session: Session = Depends(get_session)
):
"""Update a sale"""
sale = session.get(Sale, sale_id)
if not sale:
raise HTTPException(status_code=404, detail="Sale not found")
# Update sale attributes
sale_data_dict = sale_data.dict(exclude_unset=True)
for key, value in sale_data_dict.items():
setattr(sale, key, value)
session.add(sale)
session.commit()
session.refresh(sale)
return sale
@app.delete("/sales/{sale_id}", tags=["Sales"])
def delete_sale(sale_id: int, session: Session = Depends(get_session)):
"""Delete a sale"""
sale = session.get(Sale, sale_id)
if not sale:
raise HTTPException(status_code=404, detail="Sale not found")
session.delete(sale)
session.commit()
return {"message": "Sale deleted successfully"}
# Sale Item endpoints
@app.post("/sales/{sale_id}/items/", response_model=SaleItem, tags=["Sale Items"])
def create_sale_item(
sale_id: int, item: SaleItem, session: Session = Depends(get_session)
):
"""Add an item to a sale"""
sale = session.get(Sale, sale_id)
if not sale:
raise HTTPException(status_code=404, detail="Sale not found")
item.sale_id = sale_id
session.add(item)
session.commit()
session.refresh(item)
# Update the sale's total amount
sale.add_item(item)
session.add(sale)
session.commit()
return item
@app.get("/sales/{sale_id}/items/", response_model=List[SaleItem], tags=["Sale Items"])
def read_sale_items(
sale_id: int, session: Session = Depends(get_session)
):
"""Get all items for a sale"""
sale = session.get(Sale, sale_id)
if not sale:
raise HTTPException(status_code=404, detail="Sale not found")
return sale.items
# Convenience endpoints
@app.put("/sales/{sale_id}/status/{status}", response_model=Sale, tags=["Convenience"])
def update_sale_status(
sale_id: int, status: SaleStatus, session: Session = Depends(get_session)
):
"""Update the status of a sale"""
sale = session.get(Sale, sale_id)
if not sale:
raise HTTPException(status_code=404, detail="Sale not found")
sale.update_status(status)
session.add(sale)
session.commit()
session.refresh(sale)
return sale
@app.get("/products/available/", response_model=List[Product], tags=["Convenience"])
def get_available_products(
istemplate: Optional[bool] = False,
session: Session = Depends(get_session)
):
"""Get all available products"""
query = select(Product).where(
Product.status == ProductStatus.AVAILABLE,
Product.purchase_till > datetime.utcnow(),
Product.istemplate == istemplate
)
products = session.exec(query).all()
return products
@app.get("/customers/{customer_id}/sales/", response_model=List[Sale], tags=["Convenience"])
def get_customer_sales(
customer_id: int,
status: Optional[SaleStatus] = None,
session: Session = Depends(get_session)
):
"""Get all sales for a customer"""
customer = session.get(Customer, customer_id)
if not customer:
raise HTTPException(status_code=404, detail="Customer not found")
query = select(Sale).where(Sale.customer_id == customer_id)
if status:
query = query.where(Sale.status == status)
sales = session.exec(query).all()
return sales
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

Binary file not shown.

190
herodb/src/models/py/example.py Executable file
View File

@ -0,0 +1,190 @@
#!/usr/bin/env python3
"""
Example script demonstrating the use of the business models.
"""
import datetime
from typing import List
from sqlmodel import Session, SQLModel, create_engine, select
from models import (
Currency,
Customer,
Product,
ProductComponent,
ProductStatus,
ProductType,
Sale,
SaleItem,
SaleStatus,
)
def create_tables(engine):
"""Create all tables in the database"""
SQLModel.metadata.create_all(engine)
def create_sample_data(session: Session) -> None:
"""Create sample data for demonstration"""
# Create currencies
usd = Currency(currency_code="USD", amount=0.0)
eur = Currency(currency_code="EUR", amount=0.0)
session.add(usd)
session.add(eur)
session.commit()
# Create a customer
customer = Customer.new(
name="Acme Corporation",
description="A fictional company",
pubkey="acme123456",
contact_sids=["circle1_contact123", "circle2_contact456"]
)
session.add(customer)
session.commit()
# Create product components
cpu_component = ProductComponent.new(
name="CPU",
description="Central Processing Unit",
quantity=1,
)
ram_component = ProductComponent.new(
name="RAM",
description="Random Access Memory",
quantity=2,
)
session.add(cpu_component)
session.add(ram_component)
session.commit()
# Create products
laptop_price = Currency(currency_code="USD", amount=1200.0)
session.add(laptop_price)
session.commit()
laptop = Product.new(
name="Laptop",
description="High-performance laptop",
price=laptop_price,
type_=ProductType.PRODUCT,
category="Electronics",
status=ProductStatus.AVAILABLE,
max_amount=100,
validity_days=365,
istemplate=False,
)
laptop.add_component(cpu_component)
laptop.add_component(ram_component)
session.add(laptop)
session.commit()
support_price = Currency(currency_code="USD", amount=50.0)
session.add(support_price)
session.commit()
support = Product.new(
name="Technical Support",
description="24/7 technical support",
price=support_price,
type_=ProductType.SERVICE,
category="Support",
status=ProductStatus.AVAILABLE,
max_amount=1000,
validity_days=30,
istemplate=True, # This is a template product
)
session.add(support)
session.commit()
# Create a sale
sale = Sale.new(
customer=customer,
currency_code="USD",
)
session.add(sale)
session.commit()
# Create sale items
laptop_unit_price = Currency(currency_code="USD", amount=1200.0)
session.add(laptop_unit_price)
session.commit()
laptop_item = SaleItem.new(
product=laptop,
quantity=1,
unit_price=laptop_unit_price,
active_till=datetime.datetime.utcnow() + datetime.timedelta(days=365),
)
sale.add_item(laptop_item)
support_unit_price = Currency(currency_code="USD", amount=50.0)
session.add(support_unit_price)
session.commit()
support_item = SaleItem.new(
product=support,
quantity=2,
unit_price=support_unit_price,
active_till=datetime.datetime.utcnow() + datetime.timedelta(days=30),
)
sale.add_item(support_item)
# Complete the sale
sale.update_status(SaleStatus.COMPLETED)
session.commit()
def query_data(session: Session) -> None:
"""Query and display data from the database"""
print("\n=== Customers ===")
customers = session.exec(select(Customer)).all()
for customer in customers:
print(f"Customer: {customer.name} ({customer.pubkey})")
print(f" Description: {customer.description}")
print(f" Contact SIDs: {', '.join(customer.contact_sids)}")
print(f" Created at: {customer.created_at}")
print("\n=== Products ===")
products = session.exec(select(Product)).all()
for product in products:
print(f"Product: {product.name} ({product.type_.value})")
print(f" Description: {product.description}")
print(f" Price: {product.price.amount} {product.price.currency_code}")
print(f" Status: {product.status.value}")
print(f" Is Template: {product.istemplate}")
print(f" Components:")
for component in product.components:
print(f" - {component.name}: {component.quantity}")
print("\n=== Sales ===")
sales = session.exec(select(Sale)).all()
for sale in sales:
print(f"Sale to: {sale.customer.name}")
print(f" Status: {sale.status.value}")
print(f" Total: {sale.total_amount.amount} {sale.total_amount.currency_code}")
print(f" Items:")
for item in sale.items:
print(f" - {item.name}: {item.quantity} x {item.unit_price.amount} = {item.subtotal.amount} {item.subtotal.currency_code}")
def main():
"""Main function"""
print("Creating in-memory SQLite database...")
engine = create_engine("sqlite:///business.db", echo=False)
print("Creating tables...")
create_tables(engine)
print("Creating sample data...")
with Session(engine) as session:
create_sample_data(session)
print("Querying data...")
with Session(engine) as session:
query_data(session)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,49 @@
#!/bin/bash
# Script to install dependencies using uv and run the example script
set -e # Exit on error
# Change to the directory where this script is located
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
cd "$SCRIPT_DIR"
echo "Changed to directory: $SCRIPT_DIR"
# Define variables
VENV_DIR=".venv"
REQUIREMENTS="sqlmodel pydantic"
# Check if uv is installed
if ! command -v uv &> /dev/null; then
echo "Error: uv is not installed."
echo "Please install it using: curl -LsSf https://astral.sh/uv/install.sh | sh"
exit 1
fi
# Create virtual environment if it doesn't exist
if [ ! -d "$VENV_DIR" ]; then
echo "Creating virtual environment..."
uv venv "$VENV_DIR"
fi
# Activate virtual environment
echo "Activating virtual environment..."
source "$VENV_DIR/bin/activate"
# Install dependencies
echo "Installing dependencies using uv..."
uv pip install $REQUIREMENTS
# Make example.py executable
chmod +x example.py
# Remove existing database file if it exists
if [ -f "business.db" ]; then
echo "Removing existing database file..."
rm business.db
fi
# Run the example script
echo "Running example script..."
python example.py
echo "Done!"

View File

@ -0,0 +1,297 @@
"""
Python port of the business models from Rust using SQLModel.
"""
from datetime import datetime, timedelta
from enum import Enum
import json
from typing import List, Optional
from sqlmodel import Field, Relationship, SQLModel
class SaleStatus(str, Enum):
"""SaleStatus represents the status of a sale"""
PENDING = "pending"
COMPLETED = "completed"
CANCELLED = "cancelled"
class ProductType(str, Enum):
"""ProductType represents the type of a product"""
PRODUCT = "product"
SERVICE = "service"
class ProductStatus(str, Enum):
"""ProductStatus represents the status of a product"""
AVAILABLE = "available"
UNAVAILABLE = "unavailable"
class Currency(SQLModel, table=True):
"""Currency represents a monetary value with amount and currency code"""
id: Optional[int] = Field(default=None, primary_key=True)
amount: float
currency_code: str
@classmethod
def new(cls, amount: float, currency_code: str) -> "Currency":
"""Create a new currency with amount and code"""
return cls(amount=amount, currency_code=currency_code)
class Customer(SQLModel, table=True):
"""Customer represents a customer who can purchase products or services"""
id: Optional[int] = Field(default=None, primary_key=True)
name: str
description: str
pubkey: str
contact_sids_json: str = Field(default="[]")
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
# Relationships
sales: List["Sale"] = Relationship(back_populates="customer")
@property
def contact_sids(self) -> List[str]:
"""Get the contact SIDs as a list"""
return json.loads(self.contact_sids_json)
@contact_sids.setter
def contact_sids(self, value: List[str]) -> None:
"""Set the contact SIDs from a list"""
self.contact_sids_json = json.dumps(value)
@classmethod
def new(cls, name: str, description: str, pubkey: str, contact_sids: List[str] = None) -> "Customer":
"""Create a new customer with default timestamps"""
customer = cls(
name=name,
description=description,
pubkey=pubkey,
)
if contact_sids:
customer.contact_sids = contact_sids
return customer
def add_contact(self, contact_id: int) -> None:
"""Add a contact ID to the customer"""
# In a real implementation, this would add a relationship to a Contact model
# For simplicity, we're not implementing the Contact model in this example
self.updated_at = datetime.utcnow()
def add_contact_sid(self, circle_id: str, object_id: str) -> None:
"""Add a smart ID (sid) to the customer's contact_sids list"""
sid = f"{circle_id}_{object_id}"
sids = self.contact_sids
if sid not in sids:
sids.append(sid)
self.contact_sids = sids
self.updated_at = datetime.utcnow()
class ProductComponent(SQLModel, table=True):
"""ProductComponent represents a component of a product"""
id: Optional[int] = Field(default=None, primary_key=True)
name: str
description: str
quantity: int
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
# Relationships
product_id: Optional[int] = Field(default=None, foreign_key="product.id")
product: Optional["Product"] = Relationship(back_populates="components")
@classmethod
def new(cls, name: str, description: str, quantity: int) -> "ProductComponent":
"""Create a new product component with default timestamps"""
return cls(
name=name,
description=description,
quantity=quantity,
)
class Product(SQLModel, table=True):
"""Product represents a product or service offered"""
id: Optional[int] = Field(default=None, primary_key=True)
name: str
description: str
type_: ProductType = Field(sa_column_kwargs={"name": "type"})
category: str
status: ProductStatus
max_amount: int
purchase_till: datetime
active_till: datetime
istemplate: bool = Field(default=False)
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
# Price relationship
price_id: Optional[int] = Field(default=None, foreign_key="currency.id")
price: Optional[Currency] = Relationship()
# Relationships
components: List[ProductComponent] = Relationship(back_populates="product")
sale_items: List["SaleItem"] = Relationship(back_populates="product")
@classmethod
def new(
cls,
name: str,
description: str,
price: Currency,
type_: ProductType,
category: str,
status: ProductStatus,
max_amount: int,
validity_days: int,
istemplate: bool = False,
) -> "Product":
"""Create a new product with default timestamps"""
now = datetime.utcnow()
return cls(
name=name,
description=description,
price=price,
type_=type_,
category=category,
status=status,
max_amount=max_amount,
purchase_till=now + timedelta(days=365),
active_till=now + timedelta(days=validity_days),
istemplate=istemplate,
)
def add_component(self, component: ProductComponent) -> None:
"""Add a component to this product"""
component.product = self
self.components.append(component)
self.updated_at = datetime.utcnow()
def set_purchase_period(self, purchase_till: datetime) -> None:
"""Update the purchase availability timeframe"""
self.purchase_till = purchase_till
self.updated_at = datetime.utcnow()
def set_active_period(self, active_till: datetime) -> None:
"""Update the active timeframe"""
self.active_till = active_till
self.updated_at = datetime.utcnow()
def is_purchasable(self) -> bool:
"""Check if the product is available for purchase"""
return self.status == ProductStatus.AVAILABLE and datetime.utcnow() <= self.purchase_till
def is_active(self) -> bool:
"""Check if the product is still active (for services)"""
return datetime.utcnow() <= self.active_till
class SaleItem(SQLModel, table=True):
"""SaleItem represents an item in a sale"""
id: Optional[int] = Field(default=None, primary_key=True)
name: str
quantity: int
active_till: datetime
# Relationships
sale_id: Optional[int] = Field(default=None, foreign_key="sale.id")
sale: Optional["Sale"] = Relationship(back_populates="items")
product_id: Optional[int] = Field(default=None, foreign_key="product.id")
product: Optional[Product] = Relationship(back_populates="sale_items")
unit_price_id: Optional[int] = Field(default=None, foreign_key="currency.id")
unit_price: Optional[Currency] = Relationship(sa_relationship_kwargs={"foreign_keys": "[SaleItem.unit_price_id]"})
subtotal_id: Optional[int] = Field(default=None, foreign_key="currency.id")
subtotal: Optional[Currency] = Relationship(sa_relationship_kwargs={"foreign_keys": "[SaleItem.subtotal_id]"})
@classmethod
def new(
cls,
product: Product,
quantity: int,
unit_price: Currency,
active_till: datetime,
) -> "SaleItem":
"""Create a new sale item"""
# Calculate subtotal
amount = unit_price.amount * quantity
subtotal = Currency(
amount=amount,
currency_code=unit_price.currency_code,
)
return cls(
name=product.name,
product=product,
quantity=quantity,
unit_price=unit_price,
subtotal=subtotal,
active_till=active_till,
)
class Sale(SQLModel, table=True):
"""Sale represents a sale of products or services"""
id: Optional[int] = Field(default=None, primary_key=True)
status: SaleStatus
sale_date: datetime = Field(default_factory=datetime.utcnow)
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
# Relationships
customer_id: Optional[int] = Field(default=None, foreign_key="customer.id")
customer: Optional[Customer] = Relationship(back_populates="sales")
total_amount_id: Optional[int] = Field(default=None, foreign_key="currency.id")
total_amount: Optional[Currency] = Relationship()
items: List[SaleItem] = Relationship(back_populates="sale")
@classmethod
def new(
cls,
customer: Customer,
currency_code: str,
status: SaleStatus = SaleStatus.PENDING,
) -> "Sale":
"""Create a new sale with default timestamps"""
total_amount = Currency(amount=0.0, currency_code=currency_code)
return cls(
customer=customer,
total_amount=total_amount,
status=status,
)
def add_item(self, item: SaleItem) -> None:
"""Add an item to the sale and update the total amount"""
item.sale = self
# Update the total amount
if not self.items:
# First item, initialize the total amount with the same currency
self.total_amount = Currency(
amount=item.subtotal.amount,
currency_code=item.subtotal.currency_code,
)
else:
# Add to the existing total
# (Assumes all items have the same currency)
self.total_amount.amount += item.subtotal.amount
# Add the item to the list
self.items.append(item)
# Update the sale timestamp
self.updated_at = datetime.utcnow()
def update_status(self, status: SaleStatus) -> None:
"""Update the status of the sale"""
self.status = status
self.updated_at = datetime.utcnow()

42
herodb/src/models/py/server.sh Executable file
View File

@ -0,0 +1,42 @@
#!/bin/bash
# Script to start the FastAPI server
set -e # Exit on error
# Change to the directory where this script is located
SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
cd "$SCRIPT_DIR"
echo "Changed to directory: $SCRIPT_DIR"
# Define variables
VENV_DIR=".venv"
REQUIREMENTS="sqlmodel pydantic fastapi uvicorn"
# Check if uv is installed
if ! command -v uv &> /dev/null; then
echo "Error: uv is not installed."
echo "Please install it using: curl -LsSf https://astral.sh/uv/install.sh | sh"
exit 1
fi
# Create virtual environment if it doesn't exist
if [ ! -d "$VENV_DIR" ]; then
echo "Creating virtual environment..."
uv venv "$VENV_DIR"
fi
# Activate virtual environment
echo "Activating virtual environment..."
source "$VENV_DIR/bin/activate"
# Install dependencies
echo "Installing dependencies using uv..."
uv pip install $REQUIREMENTS
# Make api.py executable
chmod +x api.py
# Start the FastAPI server
echo "Starting FastAPI server..."
echo "API documentation available at: http://localhost:8000/docs"
uvicorn api:app --host 0.0.0.0 --port 8000 --reload