214 lines
7.5 KiB
Python
214 lines
7.5 KiB
Python
from peewee import *
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Optional, List, Dict, Any, Iterable, Union
|
|
import os
|
|
import logging
|
|
import traceback
|
|
|
|
# Configure database path
|
|
DB_DIR = os.path.expanduser('~/hero/var/logdb/')
|
|
DB_FILE = os.path.join(DB_DIR, 'logs.db')
|
|
|
|
# Create directory if it doesn't exist
|
|
os.makedirs(DB_DIR, exist_ok=True)
|
|
|
|
# Initialize database
|
|
database = SqliteDatabase(DB_FILE, pragmas={'journal_mode': 'wal'})
|
|
|
|
class BaseModel(Model):
|
|
"""Base model class for Peewee."""
|
|
class Meta:
|
|
database = database
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
"""Convert model instance to dictionary."""
|
|
data = {}
|
|
for field_name in self._meta.fields:
|
|
field_value = getattr(self, field_name)
|
|
if field_name in ('time', 'last_seen') and isinstance(field_value, int):
|
|
# Convert epoch to a readable format for the frontend
|
|
data[field_name] = datetime.fromtimestamp(field_value).strftime('%d-%m %H:%M')
|
|
else:
|
|
data[field_name] = field_value
|
|
return data
|
|
|
|
class Log(BaseModel):
|
|
"""Model for INFO logs."""
|
|
time = IntegerField(default=lambda: int(time.time()), index=True)
|
|
email = CharField(max_length=255, null=True)
|
|
logmsg = TextField()
|
|
level = IntegerField(default=100)
|
|
cat = CharField(max_length=100, index=True, default="general")
|
|
payload = TextField(null=True)
|
|
payload_cat = CharField(max_length=100, null=True)
|
|
|
|
class Meta:
|
|
table_name = 'logs'
|
|
|
|
class Error(BaseModel):
|
|
"""Model for ERROR logs."""
|
|
time = IntegerField(default=lambda: int(time.time()), index=True)
|
|
last_seen = IntegerField(default=lambda: int(time.time()), index=True)
|
|
email = CharField(max_length=255, null=True)
|
|
logmsg = TextField()
|
|
stacktrace = TextField(null=True)
|
|
count = IntegerField(default=1)
|
|
cat = CharField(max_length=100, index=True, default="general")
|
|
payload = TextField(null=True)
|
|
payload_cat = CharField(max_length=100, null=True)
|
|
|
|
class Meta:
|
|
table_name = 'errors'
|
|
|
|
def init_db_logging():
|
|
"""Create tables if they don't exist."""
|
|
with database:
|
|
database.create_tables([Log, Error], safe=True)
|
|
|
|
class DatabaseLogHandler(logging.Handler):
|
|
"""A logging handler that writes logs to the Peewee database."""
|
|
def emit(self, record):
|
|
stacktrace = None
|
|
if record.exc_info:
|
|
stacktrace = logging.Formatter().formatException(record.exc_info)
|
|
|
|
if record.levelno >= logging.ERROR:
|
|
log_error(
|
|
msg=record.getMessage(),
|
|
cat=record.name,
|
|
stacktrace=stacktrace
|
|
)
|
|
else:
|
|
log_info(
|
|
msg=record.getMessage(),
|
|
level=record.levelno,
|
|
cat=record.name
|
|
)
|
|
|
|
def log_error(msg: str, cat: str = "general", email: Optional[str] = None, stacktrace: Optional[str] = None, payload: Optional[str] = None, payload_cat: Optional[str] = None):
|
|
"""Log an ERROR message to the database, handling duplicates."""
|
|
try:
|
|
log_info(msg=msg, cat=cat, email=email, payload=payload, payload_cat=payload_cat)
|
|
except Exception as e:
|
|
pass
|
|
try:
|
|
if not stacktrace:
|
|
# Capture the current stack trace if not provided
|
|
stacktrace = "".join(traceback.format_stack())
|
|
|
|
# Filter out irrelevant lines from the stack trace
|
|
if stacktrace:
|
|
lines = stacktrace.split('\n')
|
|
filtered_lines = [
|
|
line for line in lines
|
|
if 'python3.13/logging' not in line and 'src/mylogging.py' not in line
|
|
]
|
|
stacktrace = '\n'.join(filtered_lines)
|
|
|
|
one_day_ago = int(time.time()) - (24 * 3600)
|
|
|
|
# Look for a similar error in the last 24 hours from the same user
|
|
existing_error = Error.select().where(
|
|
(Error.logmsg == msg) &
|
|
(Error.email == email) &
|
|
(Error.last_seen >= one_day_ago)
|
|
).first()
|
|
|
|
if existing_error:
|
|
# If found, increment counter and update last_seen
|
|
existing_error.count += 1
|
|
existing_error.last_seen = int(time.time())
|
|
existing_error.stacktrace = stacktrace
|
|
existing_error.save()
|
|
print(existing_error)
|
|
else:
|
|
# Otherwise, create a new error record
|
|
Error.create(
|
|
logmsg=msg,
|
|
cat=cat,
|
|
email=email,
|
|
stacktrace=stacktrace,
|
|
payload=payload,
|
|
payload_cat=payload_cat
|
|
)
|
|
logging.info(f"Successfully logged new error: {msg}")
|
|
|
|
except Exception as e:
|
|
logging.error(f"Failed to log error to {DB_FILE}: {e}")
|
|
|
|
def log_info(msg: str, level: int = 0, cat: str = "general", email: Optional[str] = None, payload: Optional[str] = None, payload_cat: Optional[str] = None):
|
|
"""Log an INFO message to the database."""
|
|
try:
|
|
Log.create(logmsg=msg, level=level, cat=cat, email=email, payload=payload, payload_cat=payload_cat)
|
|
except Exception as e:
|
|
print(f"Failed to log info to {DB_FILE}: {e}")
|
|
|
|
def get_errors(search: Optional[str] = None, cat: Optional[str] = None) -> List[Dict[str, Any]]:
|
|
"""Get errors from the database with optional filters. Category search is prefix-based."""
|
|
query = Error.select().order_by(Error.last_seen.desc())
|
|
if search:
|
|
query = query.where(Error.logmsg.contains(search))
|
|
if cat and cat.strip():
|
|
query = query.where(Error.cat.startswith(cat.strip()))
|
|
return [e.to_dict() for e in query]
|
|
|
|
def get_logs(
|
|
search: Optional[str] = None,
|
|
cat: Optional[str] = None,
|
|
level: Optional[int] = None,
|
|
hours_ago: Optional[int] = None,
|
|
) -> List[Dict[str, Any]]:
|
|
"""Get logs from the database with optional filters. Category search is prefix-based."""
|
|
query = Log.select().order_by(Log.time.desc())
|
|
|
|
if search and search.strip():
|
|
query = query.where(Log.logmsg.contains(search))
|
|
|
|
if cat and cat.strip():
|
|
query = query.where(Log.cat.startswith(cat.strip()))
|
|
|
|
if level is not None:
|
|
query = query.where(Log.level <= level)
|
|
|
|
if hours_ago is not None:
|
|
time_ago = int(time.time()) - (hours_ago * 3600)
|
|
query = query.where(Log.time >= time_ago)
|
|
|
|
return [l.to_dict() for l in query]
|
|
|
|
def get_log_by_id(log_id: int) -> Optional[Dict[str, Any]]:
|
|
"""Get a single log by its ID."""
|
|
try:
|
|
log = Log.get_by_id(log_id)
|
|
return log.to_dict()
|
|
except Log.DoesNotExist:
|
|
return None
|
|
|
|
def delete_logs_older_than(minutes: int):
|
|
"""Delete logs older than a specified number of minutes."""
|
|
time_ago = int(time.time()) - (minutes * 60)
|
|
Log.delete().where(Log.time < time_ago).execute()
|
|
|
|
def delete_errors_older_than(minutes: int):
|
|
"""Delete errors older than a specified number of minutes."""
|
|
time_ago = int(time.time()) - (minutes * 60)
|
|
Error.delete().where(Error.time < time_ago).execute()
|
|
|
|
def get_unique_log_categories() -> List[str]:
|
|
"""Get unique log categories from the database."""
|
|
query = (Log
|
|
.select(Log.cat)
|
|
.where(Log.cat.is_null(False))
|
|
.distinct()
|
|
.order_by(Log.cat))
|
|
return [l.cat for l in query]
|
|
|
|
def get_unique_error_categories() -> List[str]:
|
|
"""Get unique error categories from the database."""
|
|
query = (Error
|
|
.select(Error.cat)
|
|
.where(Error.cat.is_null(False))
|
|
.distinct()
|
|
.order_by(Error.cat))
|
|
return [e.cat for e in query] |