This commit is contained in:
2025-08-20 04:15:43 +02:00
parent 6b9f0cf291
commit e4bb201181
95 changed files with 194 additions and 907 deletions

0
herolib/core/__init__.py Normal file
View File

Binary file not shown.

View File

View File

@@ -0,0 +1,38 @@
from pydantic import BaseModel, Field
from typing import Dict, Any, Type, TypeVar
from heroscript.heroscript import *
class User(BaseModel, HeroScriptMixin):
oid: str = Field()
name: str = Field(min_length=2, description="Chosen name by user", example="myname")
city: str = Field()
age: int = Field()
description: str = Field()
# Example usage
u1 = User(oid="abc123", name="John", age=30, city="New York",
description="""
this is a multiline
we need to remove the
this will stay 4 chars in
end
""")
myheroscript = u1.heroscript()
print(myheroscript)
u2 = User.from_heroscript(heroscript=myheroscript)
myprint(u2)
# p1 = Product(id=1, name="Phone", price=999.99, description="A smart phone")
# product_heroscript = p1.heroscript()
# print(product_heroscript)
# p2 = Product.from_heroscript(product_heroscript)
# print(p2)

View File

@@ -0,0 +1,78 @@
from pydantic import BaseModel, Field
from typing import Dict, Any, Type, TypeVar, List
from heroscript.heroscript import *
class Comment(BaseModel):
description: str = Field(default="")
class HeroBase(BaseModel, HeroScriptMixin):
oid: str = Field(default="",metadata={"unique": True})
name: str = Field(min_length=2, description="Chosen name by user", example="myname",metadata={"unique": True})
comments: List[Comment] = Field(..., description="Comment which can be attached to obj")
class User(HeroBase):
city: str = Field(metadata={"index": True})
age: int = Field(metadata={"index": True})
description: str = Field(default="")
class Product(BaseModel, HeroScriptMixin):
id: int = Field(default="",metadata={"unique": True})
name: str = Field(metadata={"unique": True})
price: float = Field()
description: str = Field()
myheroscript="""
```hero
!!user.define
oid:abc123
name:John
description:'
this is a multiline
we need to remove the
this will stay 4 chars in
end
'
age:30
city:'New York'
!!product.define
id:33
name:aproduct
description:'
this is a multiline
we need to remove the
this will stay 4 chars in
end
'
price:10.0
```
"""
# hs=HeroScripts(class_types={"user":User,"product":Product},content=myheroscript)
mypath="~/code/git.threefold.info/tfgrid/hero_research/hero/osis/heroscript/example"
hs=HeroScripts(class_types={"user":User,"product":Product},path=mypath)
objs=hs.get_objects()
for o in objs:
myprint(o)
for item in hs.heroscripts:
print(item)
query = "john*"
results = hs.search(User, query)
# Print the search results
for r in results:
# print(f"User: {r["path"]}")
print(r)

View File

@@ -0,0 +1 @@
{"/Users/despiegk/code/git.threefold.info/tfgrid/hero_research/hero/osis/heroscript/example/testFile.md": "f6e8b6a32349c262cb9afbea771c5add", "/Users/despiegk/code/git.threefold.info/tfgrid/hero_research/hero/osis/heroscript/example/sub/test file 2.md": "0ecc29046b6ef743481358e4c5630a6d"}

View File

@@ -0,0 +1,15 @@
# header
!!product.define
id:33
name:aproduct
description:'
this is a multiline
we need to remove the
this will stay 4 chars in
end
'
price:10.0
something else

View File

@@ -0,0 +1,22 @@
!!user.define
oid:abc123
name:John
description:'
this is a multiline
we need to remove the
this will stay 4 chars in
end
'
age:30
city:'New York'
```heroscript
!!user.define
oid:4nd
name:John2
age:40
city:bxl
```

View File

@@ -0,0 +1,207 @@
from herotools.texttools import dedent
from typing import List, Dict, Tuple
import re
from heroscript.tools import action_blocks,format_multiline_text,heroscript_repr
import textwrap
class HeroActions:
def __init__(self, path: str = "", content:str = ""):
blocks=action_blocks(path=path,content=content)
self.actions : List[HeroAction] = []
for block in blocks:
self.actions.append(HeroAction(block))
def __repr__(self):
out=""
for item in self.actions:
out+=item.__repr__()+"\n"
return out
class HeroAction:
def __init__(self, content: str):
blocks=action_blocks(content=content)
if len(blocks)==0:
raise ValueError(f"don't find actions in {content}")
elif len(blocks)>1:
raise ValueError(f"Found more than one action in {content}")
content=blocks[0]
self.name, content = _name_paramstr(content)
self.params = Params(content)
def __str__(self):
param_str=textwrap.indent(self.params.__str__()," ")
return f"!!{self.name}\n{param_str}"
def __repr__(self):
#return self.__str__()
return heroscript_repr(self.__str__())
class Params:
def __init__(self, content: str):
self.__params = params_parse(content)
def __str__(self):
sorted_params = sorted(self.__params.items())
param_str=""
for key,value in sorted_params:
if "'" in value:
param_str+=f"{key}: {value}\n"
elif "\n" in value:
v=format_multiline_text(value)
param_str+=f"{key}: {v}\n"
elif " " in value:
param_str+=f"{key}: '{value}'\n"
else:
param_str+=f"{key}: {value}\n"
return param_str
def get_int(self, key: str, defval: int = 99999999) -> int:
if key not in self.__params:
if defval == 99999999:
raise KeyError(f"Key '{key}' must exist in parameters")
return defval
return int(self.__params[key])
def get_float(self, key: str, defval: float = 99999999.0) -> float:
if key not in self.__params:
if defval == 99999999.0:
raise KeyError(f"Key '{key}' must exist in parameters")
return defval
return float(self.__params[key])
def get(self, key: str, defval: str = "99999999") -> str:
if key not in self.__params:
if defval == "99999999":
raise KeyError(f"Key '{key}' must exist in parameters")
return defval
return self.__params[key]
def get_list(self, key: str, defval: List[str] = [], needtoexist: bool = True) -> List[str]:
if defval is None:
defval = []
if key not in self.__params:
if needtoexist:
raise KeyError(f"Key '{key}' must exist in parameters")
return defval
return [item.strip().strip("'").strip() for item in self.__params[key].split(",")]
def get_list_int(self, key: str, defval: List[int] = [], needtoexist: bool = True) -> List[int]:
if defval is None:
defval = []
if key not in self.__params:
if needtoexist:
raise KeyError(f"Key '{key}' must exist in parameters")
return defval
return [int(item.strip()) for item in self.__params[key].split(",")]
def get_list_float(self, key: str, defval: List[float] = [], needtoexist: bool = True) -> List[float]:
if defval is None:
defval = []
if key not in self.__params:
if needtoexist:
raise KeyError(f"Key '{key}' must exist in parameters")
return defval
return [float(item.strip()) for item in self.__params[key].split(",")]
def get_all(self) -> Dict[str, str]:
return self.__params
def _name_paramstr(heroscript: str) -> Tuple[str, str]:
if not isinstance(heroscript, str):
raise ValueError("Input must be a string")
heroscript = dedent(heroscript)
lines = heroscript.strip().split("\n")
if not lines or "!!" not in lines[0]:
raise ValueError("The first line must contain '!!' to indicate the class name")
try:
class_name = lines[0].split("!!")[1].lower().strip()
except IndexError:
raise ValueError("Invalid format for class name extraction")
rest_of_text = dedent("\n".join(lines[1:]))
return class_name, rest_of_text
def params_parse(content: str) -> Dict[str, str]:
lines = dedent(content).strip().split("\n")
props = {}
multiline_prop = None
multiline_value : List[str] = list()
for line in lines:
if multiline_prop:
if line.strip() == "'":
props[prop] = dedent("\n".join(multiline_value))
multiline_prop = None
multiline_value = []
else:
multiline_value.append(line)
else:
if ":" in line:
prop, value = line.split(":", 1)
prop = prop.strip()
value = value.strip()
if value == "'":
multiline_prop = prop
else:
if value.startswith("'") and value.endswith("'"):
value1 = value[1:-1]
if not "'" in value1:
value=value1
props[prop] = value
return props
if __name__ == "__main__":
# Example usage
text = """
!!obj1.define
myname: 'mymama'
mylist: '20,200'
mylist2: 20,'a bbb'
mylist3: 20,200
myint:2
!!obj2.color
mother: 'mymama'
name:'aurelie'
length:60
description:'
multiline is supported
now for aurelie
'
color:green
"""
hero_actions = HeroActions(content=text)
print(hero_actions)
a2=hero_actions.actions[1]
assert a2.params.get_list(key="color")==["green"]
assert a2.params.get_list(key="mother")==["mymama"]
assert a2.params.get(key="color")=="green"
assert a2.params.get_int(key="length")==60
assert a2.params.get_list_int(key="length")==[60]
#now some non existing ones
assert a2.params.get_int(key="lengtha",defval=3)==3
assert a2.params.get(key="lengtha",defval="3")=="3"
a1=hero_actions.actions[0]
#print(a1.params.get_list(key="mylist2"))
assert a1.params.get_list(key="mylist")==["20","200"]
assert a1.params.get_list_int(key="mylist")==[20,200]
assert a1.params.get_list(key="mylist2")==["20","a bbb"]

View File

@@ -0,0 +1,129 @@
from pydantic import BaseModel, Field
from typing import Any, Type, TypeVar
import re
import hashlib
import json
import os
from types import List,Dict
T = TypeVar("T", bound=BaseModel)
class HeroScripts:
def __init__(self, class_types: dict, path:str = "", content:str = "", indexpath: str = ""):
self.class_types = class_types
self.heroscripts = List(HeroScript)
self.path = os.path.expanduser(path)
self.indexpath = os.path.expanduser(indexpath)
self.done = Dict[str,str] = {}
# self.done_load()
if self.path:
try:
# self.done_load()
self.load(self.path)
self.done_save()
except FileNotFoundError as e:
print(f"Directory not found: {self.path}")
print(f"Error: {str(e)}")
self.create_indexes()
self.index_objects()
if content:
blocks = extract_heroscript_blocks(content)
self.heroscripts.extend(HeroScript(block) for block in blocks)
def done_load(self):
if self.path:
done_file = os.path.join(self.path, "done.json")
if os.path.exists(done_file):
with open(done_file, "r") as f:
self.done = json.load(f)
def done_save(self):
if self.path:
done_file = os.path.join(self.path, "done.json")
with open(done_file, "w") as f:
json.dump(self.done, f)
def load(self, path):
for root, _, files in os.walk(path):
for filename in files:
print(f" - load {path}/{filename}")
path=f"{path}/{filename}"
if filename.endswith(".md"):
filepath = os.path.join(root, filename)
with open(filepath, "r") as file:
content = file.read()
md5hash = hashlib.md5(content.encode()).hexdigest()
if filepath not in self.done or self.done[filepath] != md5hash:
blocks = self.extract_heroscript_blocks(content)
self.heroscripts.extend(HeroScript(block,path) for block in blocks)
self.done[filepath] = md5hash
@staticmethod
def get_objects(self):
objects = []
for heroscript in self.heroscripts:
if heroscript.content:
try:
class_name = heroscript.content.split("\n")[0].split("!!")[1].split(".")[0].lower()
if class_name in self.class_types:
class_type = self.class_types[class_name]
try:
obj = class_type.from_heroscript(heroscript.content)
objects.append(obj)
except Exception as e:
print(f"Error parsing HeroScript: {e}")
except (IndexError, ValueError):
print(f"Invalid HeroScript format: {heroscript.content}")
return objects
def create_indexes(self):
for class_type in self.class_types.values():
schema = self.create_schema(class_type)
index_dir = os.path.join(self.indexpath, class_type.__name__.lower())
if not os.path.exists(index_dir):
os.makedirs(index_dir)
index.create_in(index_dir, schema)
def create_schema(self, class_type):
schema_fields = {"path": STORED()}
for field_name, field in class_type.__fields__.items():
json_schema_extra = getattr(field, "json_schema_extra", None)
if json_schema_extra is not None:
metadata = json_schema_extra.get("metadata", {})
if isinstance(metadata, list):
metadata = {item: True for item in metadata}
if metadata.get("unique") or metadata.get("indexed"):
if field.annotation == str :
schema_fields[field_name] = ID(stored=True, unique=metadata.get("unique", False))
elif field.annotation == int or field.annotation == float :
schema_fields[field_name] = NUMERIC(stored=True, unique=metadata.get("unique", False))
else:
schema_fields[field_name] = TEXT(stored=True,lowercase=True)
return Schema(**schema_fields)
def index_objects(self):
for heroscript in self.heroscripts:
for obj in self.get_objects():
index_dir = os.path.join(self.indexpath, type(obj).__name__.lower())
ix = index.open_dir(index_dir)
writer = ix.writer()
writer.add_document(path=heroscript.path, **{k: str(v).lower() for k, v in obj.dict().items() if k in ix.schema.names()})
writer.commit()
def search(self, class_type, query):
index_dir = os.path.join(self.indexpath, class_type.__name__.lower())
ix = index.open_dir(index_dir)
qp = QueryParser("name", schema=ix.schema)
q = qp.parse(query)
with ix.searcher() as searcher:
results = searcher.search(q)
# return results
return [result["path"] for result in results]

View File

@@ -0,0 +1,82 @@
from pydantic import BaseModel, Field
from typing import Dict, Any, Type, TypeVar
import re
from colorama import Fore, Style
import hashlib
import json
import os
from types import List
from heroscript.heroaction import HeroAction
from heroscript.tools import format_multiline_text
class HeroScriptMixin:
def heroscript(self) -> HeroAction:
class_name = self.__class__.__name__.lower()
prop_order = ["id", "oid", "name", "title", "description", "content"]
# Get all the properties of the object
props = list(self.__fields__.keys())
# Separate properties into those in prop_order and the rest
ordered_props = [prop for prop in prop_order if prop in props]
remaining_props = [prop for prop in props if prop not in prop_order]
# Sort the remaining properties
sorted_remaining_props = sorted(remaining_props)
# Combine the ordered properties and sorted remaining properties
sorted_props = ordered_props + sorted_remaining_props
lines = [f"!!{class_name}.define"]
for prop in sorted_props:
if prop in self.__fields__:
val = getattr(self, prop)
if isinstance(val, str):
if "\n" in val:
val = format_multiline_text(text=val)
elif any(c.isspace() for c in val):
val = f"'{val}'"
lines.append(f" {prop}:{val}")
result = "\n".join(lines)
return HeroAction(content=result)
@classmethod
def from_heroscript(cls, heroscript: str):
lines = heroscript.strip().split("\n")
class_name = lines[0].split("!!")[1].split(".")[0]
props = {}
multiline_prop = None
multiline_value = List(str)
for line in lines[1:]:
if multiline_prop:
if line.strip() == "'":
# End of multiline text
min_indent = min(len(ml) - len(ml.lstrip()) for ml in multiline_value if ml.strip())
unindented_lines = [ml[min_indent:] for ml in multiline_value]
props[multiline_prop] = "\n".join(unindented_lines)
multiline_prop = None
multiline_value = []
else:
multiline_value.append(line)
else:
if ":" in line:
prop, value = line.split(":", 1)
prop = prop.strip()
value = value.strip()
if value == "'":
# Start of multiline text
multiline_prop = prop
else:
if value.startswith("'") and value.endswith("'"):
value = value[1:-1]
props[prop] = value
return cls(**props)

View File

@@ -0,0 +1,4 @@
## heroscript
> not to be used yet

View File

@@ -0,0 +1,145 @@
from typing import List
import os
from colorama import Fore, Style
from herotools.texttools import dedent
import textwrap
#load the heroscripts from filesystem
def heroscript_blocks(path: str) -> List[str]:
heroscript_blocks = list()
for root, dirs, files in os.walk(path):
for file in files:
if file.endswith(".md"):
file_path = os.path.join(root, file)
with open(file_path, "r") as f:
content = f.read()
blocks = _extract_heroscript_blocks(content)
heroscript_blocks.extend(blocks)
return heroscript_blocks
def _extract_heroscript_blocks(content: str):
content=dedent(content)
blocks = []
lines = content.split("\n")
in_block = False
block_lines : List[str] = list()
for line in lines:
if line.startswith("```hero"):
in_block = True
block_lines = []
elif line.startswith("```") and in_block:
in_block = False
block = "\n".join(block_lines)
blocks.append(block)
elif in_block:
block_lines.append(line)
return blocks
def action_blocks(path: str = "", content:str = "") -> List[str]:
if content!="":
return __action_blocks_get(content)
res : List[str] = list()
for hscript in heroscript_blocks(path):
for actionscript in __action_blocks_get(hscript):
res.append(actionscript)
return res
def __action_blocks_get(content: str) -> List[str]:
content=dedent(content)
blocks = list()
lines = content.split("\n")
block_lines : List[str] = list()
herofound=False
for line in lines:
# print(line)
if line.startswith("!!"):
herofound=True
if block_lines: #means we found before
block = "\n".join(block_lines)
blocks.append(block)
block_lines = []
# print("f1")
block_lines.append(line)
elif line.strip() and not line.startswith(" ") and not line.startswith("\t") and block_lines:
block = "\n".join(block_lines)
blocks.append(block)
block_lines = []
herofound=False
elif herofound:
block_lines.append(line)
# print("append")
if block_lines:
block = "\n".join(block_lines)
blocks.append(block)
return blocks
def myprint(obj):
class_name = f"{Fore.YELLOW}{obj.__class__.__name__}{Style.RESET_ALL}"
fields = [field for field in obj.__fields__ if field in obj.__dict__]
attributes = ', '.join(f"{Fore.LIGHTBLACK_EX}{field}{Style.RESET_ALL}={Fore.GREEN}'{getattr(obj, field)}'{Style.RESET_ALL}" for field in fields)
print( f"{class_name}({attributes})" )
#format text to be ready to be set in heroscript
def format_multiline_text(text: str) -> str:
text = dedent(text)
text = textwrap.indent(text, " ")
# Join the formatted lines with newline characters and add the required indentation
formatted_text = "'\n" + text + "\n '"
return formatted_text
#representation with colors of heroscript
def heroscript_repr(content:str) ->str:
lines = content.split("\n")
formatted_lines = []
for line in lines:
if line.startswith("!!"):
formatted_line = f"{Fore.RED}{line}{Style.RESET_ALL}"
elif ":" in line:
prop, value = line.split(":", 1)
prop = prop.strip()
value = value.strip()
if value.startswith("'") and value.endswith("'"):
value = f" {Fore.GREEN}{value}{Style.RESET_ALL}"
else:
value = f" {Fore.YELLOW}{value}{Style.RESET_ALL}"
formatted_line = f" {Fore.CYAN}{prop}{Style.RESET_ALL}:{value}"
else:
formatted_line = line
formatted_lines.append(formatted_line)
return "\n".join(formatted_lines)
def heroscript_print(content:str):
o=heroscript_repr(content)
print(o)
if __name__ == "__main__":
t=" something\n a\n\n bbbb"
print(dedent(t))
print(format_multiline_text(t))

View File

View File

@@ -0,0 +1,9 @@
from herolib.core.pathlib.pathlib import get_dir
from herolib.core.logger.model import Logger
def new(path: str) -> Logger:
p = get_dir(path=path, create=True)
return Logger(
path=p,
lastlog_time=0
)

View File

@@ -0,0 +1,3 @@
# This file is now empty as the log function has been moved to model.py
# It can be removed or kept as a placeholder if needed for future extensions.
# For now, we will keep it empty.

View File

@@ -0,0 +1,150 @@
import unittest
import os
import shutil
from lib.core.logger.factory import new
from lib.core.logger.model import LogItemArgs, LogType, Logger # Import Logger class
from lib.data.ourtime.ourtime import new as ourtime_new, now as ourtime_now
from lib.core.pathlib.pathlib import get_file, ls, rmdir_all
class TestLogger(unittest.TestCase):
def setUp(self):
# Corresponds to testsuite_begin()
if os.path.exists('/tmp/testlogs'):
rmdir_all('/tmp/testlogs')
def tearDown(self):
# Corresponds to testsuite_end()
# if os.path.exists('/tmp/testlogs'):
# rmdir_all('/tmp/testlogs')
pass
def test_logger_functionality(self):
logger = new('/tmp/testlogs')
# Test stdout logging
logger.log(LogItemArgs(
cat='test-app',
log='This is a test message\nWith a second line\nAnd a third line',
logtype=LogType.STDOUT,
timestamp=ourtime_new('2022-12-05 20:14:35')
))
# Test error logging
logger.log(LogItemArgs(
cat='error-test',
log='This is an error\nWith details',
logtype=LogType.ERROR,
timestamp=ourtime_new('2022-12-05 20:14:35')
))
logger.log(LogItemArgs(
cat='test-app',
log='This is a test message\nWith a second line\nAnd a third line',
logtype=LogType.STDOUT,
timestamp=ourtime_new('2022-12-05 20:14:36')
))
logger.log(LogItemArgs(
cat='error-test',
log='''
This is an error
With details
''',
logtype=LogType.ERROR,
timestamp=ourtime_new('2022-12-05 20:14:36')
))
logger.log(LogItemArgs(
cat='error-test',
log='''
aaa
bbb
''',
logtype=LogType.ERROR,
timestamp=ourtime_new('2022-12-05 22:14:36')
))
logger.log(LogItemArgs(
cat='error-test',
log='''
aaa2
bbb2
''',
logtype=LogType.ERROR,
timestamp=ourtime_new('2022-12-05 22:14:36')
))
# Verify log directory exists
self.assertTrue(os.path.exists('/tmp/testlogs'), 'Log directory should exist')
# Get log file
files = ls('/tmp/testlogs')
self.assertEqual(len(files), 2) # Expecting two files: 2022-12-05-20.log and 2022-12-05-22.log
# Test search functionality
items_stdout = logger.search(
timestamp_from=ourtime_new('2022-11-01 20:14:35'),
timestamp_to=ourtime_new('2025-11-01 20:14:35'),
logtype=LogType.STDOUT
)
self.assertEqual(len(items_stdout), 2)
items_error = logger.search(
timestamp_from=ourtime_new('2022-11-01 20:14:35'),
timestamp_to=ourtime_new('2025-11-01 20:14:35'),
logtype=LogType.ERROR
)
self.assertEqual(len(items_error), 4)
# Test specific log content
found_error_log = False
for item in items_error:
if "This is an error\nWith details" in item.log:
found_error_log = True
break
self.assertTrue(found_error_log, "Expected error log content not found")
found_stdout_log = False
for item in items_stdout:
if "This is a test message\nWith a second line\nAnd a third line" in item.log:
found_stdout_log = True
break
self.assertTrue(found_stdout_log, "Expected stdout log content not found")
# Test search by category
items_test_app = logger.search(
timestamp_from=ourtime_new('2022-11-01 20:14:35'),
timestamp_to=ourtime_new('2025-11-01 20:14:35'),
cat='test-app'
)
self.assertEqual(len(items_test_app), 2)
items_error_test = logger.search(
timestamp_from=ourtime_new('2022-11-01 20:14:35'),
timestamp_to=ourtime_new('2025-11-01 20:14:35'),
cat='error-test'
)
self.assertEqual(len(items_error_test), 4)
# Test search by log content
items_with_aaa = logger.search(
timestamp_from=ourtime_new('2022-11-01 20:14:35'),
timestamp_to=ourtime_new('2025-11-01 20:14:35'),
log='aaa'
)
self.assertEqual(len(items_with_aaa), 2)
# Test search with timestamp range
items_specific_time = logger.search(
timestamp_from=ourtime_new('2022-12-05 22:00:00'),
timestamp_to=ourtime_new('2022-12-05 23:00:00'),
logtype=LogType.ERROR
)
self.assertEqual(len(items_specific_time), 2)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,72 @@
from enum import Enum
from typing import Optional
from herolib.data.ourtime.ourtime import OurTime
from herolib.core.pathlib.pathlib import Path
class LogType(Enum):
STDOUT = "stdout"
ERROR = "error"
class LogItemArgs:
def __init__(self, cat: str, log: str, logtype: LogType, timestamp: Optional[OurTime] = None):
self.timestamp = timestamp
self.cat = cat
self.log = log
self.logtype = logtype
import os
from herolib.core.texttools.texttools import name_fix, expand, dedent
from herolib.data.ourtime.ourtime import OurTime, now as ourtime_now
class Logger:
def __init__(self, path: Path, lastlog_time: int = 0):
self.path = path
self.lastlog_time = lastlog_time
def log(self, args_: LogItemArgs):
args = args_
t = args.timestamp if args.timestamp else ourtime_now()
# Format category (max 10 chars, ascii only)
args.cat = name_fix(args.cat)
if len(args.cat) > 10:
raise ValueError('category cannot be longer than 10 chars')
args.cat = expand(args.cat, 10, ' ')
args.log = dedent(args.log).strip()
logfile_path = os.path.join(self.path.path, f"{t.dayhour()}.log")
# Create log file if it doesn't exist
if not os.path.exists(logfile_path):
with open(logfile_path, 'w') as f:
pass # Create empty file
self.lastlog_time = 0 # make sure we put time again
with open(logfile_path, 'a') as f:
content = ''
# Add timestamp if we're in a new second
if t.unix() > self.lastlog_time:
content += f"\n{t.time().format_ss()}\n"
self.lastlog_time = t.unix()
# Format log lines
error_prefix = 'E' if args.logtype == LogType.ERROR else ' '
lines = args.log.split('\n')
for i, line in enumerate(lines):
if i == 0:
content += f"{error_prefix} {args.cat} - {line}\n"
else:
content += f"{error_prefix} {line}\n"
f.write(content.rstrip()) # Use rstrip to remove trailing whitespace
f.write('\n') # Add a newline after each log entry for consistency
class LogItem:
def __init__(self, timestamp: OurTime, cat: str, log: str, logtype: LogType):
self.timestamp = timestamp
self.cat = cat
self.log = log
self.logtype = logtype

View File

@@ -0,0 +1,137 @@
import os
from typing import Optional, List
from herolib.core.texttools.texttools import name_fix
from herolib.data.ourtime.ourtime import OurTime, new as ourtime_new
from herolib.core.logger.model import Logger, LogItem, LogType
class SearchArgs:
def __init__(self, timestamp_from: Optional[OurTime] = None,
timestamp_to: Optional[OurTime] = None,
cat: str = "", log: str = "", logtype: Optional[LogType] = None,
maxitems: int = 10000):
self.timestamp_from = timestamp_from
self.timestamp_to = timestamp_to
self.cat = cat
self.log = log
self.logtype = logtype
self.maxitems = maxitems
def process(result: List[LogItem], current_item: LogItem, current_time: OurTime,
args: SearchArgs, from_time: int, to_time: int):
# Add previous item if it matches filters
log_epoch = current_item.timestamp.unix()
if log_epoch < from_time or log_epoch > to_time:
return
cat_match = (args.cat == '' or current_item.cat.strip() == args.cat)
log_match = (args.log == '' or args.log.lower() in current_item.log.lower())
logtype_match = (args.logtype is None or current_item.logtype == args.logtype)
if cat_match and log_match and logtype_match:
result.append(current_item)
def search(l: Logger, args_: SearchArgs) -> List[LogItem]:
args = args_
# Format category (max 10 chars, ascii only)
args.cat = name_fix(args.cat)
if len(args.cat) > 10:
raise ValueError('category cannot be longer than 10 chars')
timestamp_from = args.timestamp_from if args.timestamp_from else OurTime()
timestamp_to = args.timestamp_to if args.timestamp_to else OurTime()
# Get time range
from_time = timestamp_from.unix()
to_time = timestamp_to.unix()
if from_time > to_time:
raise ValueError(f'from_time cannot be after to_time: {from_time} < {to_time}')
result: List[LogItem] = []
# Find log files in time range
files = sorted(os.listdir(l.path.path))
for file in files:
if not file.endswith('.log'):
continue
# Parse dayhour from filename
dayhour = file[:-4] # remove .log
try:
file_time = ourtime_new(dayhour)
except ValueError:
continue # Skip if filename is not a valid time format
current_time = OurTime()
current_item = LogItem(OurTime(), "", "", LogType.STDOUT) # Initialize with dummy values
collecting = False
# Skip if file is outside time range
if file_time.unix() < from_time or file_time.unix() > to_time:
continue
# Read and parse log file
content = ""
try:
with open(os.path.join(l.path.path, file), 'r') as f:
content = f.read()
except FileNotFoundError:
continue
lines = content.split('\n')
for line in lines:
if len(result) >= args.maxitems:
return result
line_trim = line.strip()
if not line_trim:
continue
# Check if this is a timestamp line
if not (line.startswith(' ') or line.startswith('E')):
try:
current_time = ourtime_new(line_trim)
except ValueError:
continue # Skip if not a valid timestamp line
if collecting:
process(result, current_item, current_time, args, from_time, to_time)
collecting = False
continue
if collecting and len(line) > 14 and line[13] == '-':
process(result, current_item, current_time, args, from_time, to_time)
collecting = False
# Parse log line
is_error = line.startswith('E')
if not collecting:
# Start new item
cat_start = 2
cat_end = 12
log_start = 15
if len(line) < log_start:
continue # Line too short to contain log content
current_item = LogItem(
timestamp=current_time,
cat=line[cat_start:cat_end].strip(),
log=line[log_start:].strip(),
logtype=LogType.ERROR if is_error else LogType.STDOUT
)
collecting = True
else:
# Continuation line
if len(line_trim) < 16: # Check for minimum length for continuation line
current_item.log += '\n' + line_trim
else:
current_item.log += '\n' + line[15:].strip() # Use strip for continuation lines
# Add last item if collecting
if collecting:
process(result, current_item, current_time, args, from_time, to_time)
return result

View File

View File

@@ -0,0 +1,214 @@
from peewee import *
import time
from datetime import datetime
from typing import Optional, List, Dict, Any, Iterable, Union
import os
import logging
import traceback
# Configure database path
DB_DIR = os.path.expanduser('~/hero/var/logdb/')
DB_FILE = os.path.join(DB_DIR, 'logs.db')
# Create directory if it doesn't exist
os.makedirs(DB_DIR, exist_ok=True)
# Initialize database
database = SqliteDatabase(DB_FILE, pragmas={'journal_mode': 'wal'})
class BaseModel(Model):
"""Base model class for Peewee."""
class Meta:
database = database
def to_dict(self) -> Dict[str, Any]:
"""Convert model instance to dictionary."""
data = {}
for field_name in self._meta.fields:
field_value = getattr(self, field_name)
if field_name in ('time', 'last_seen') and isinstance(field_value, int):
# Convert epoch to a readable format for the frontend
data[field_name] = datetime.fromtimestamp(field_value).strftime('%d-%m %H:%M')
else:
data[field_name] = field_value
return data
class Log(BaseModel):
"""Model for INFO logs."""
time = IntegerField(default=lambda: int(time.time()), index=True)
email = CharField(max_length=255, null=True)
logmsg = TextField()
level = IntegerField(default=100)
cat = CharField(max_length=100, index=True, default="general")
payload = TextField(null=True)
payload_cat = CharField(max_length=100, null=True)
class Meta:
table_name = 'logs'
class Error(BaseModel):
"""Model for ERROR logs."""
time = IntegerField(default=lambda: int(time.time()), index=True)
last_seen = IntegerField(default=lambda: int(time.time()), index=True)
email = CharField(max_length=255, null=True)
logmsg = TextField()
stacktrace = TextField(null=True)
count = IntegerField(default=1)
cat = CharField(max_length=100, index=True, default="general")
payload = TextField(null=True)
payload_cat = CharField(max_length=100, null=True)
class Meta:
table_name = 'errors'
def init_db_logging():
"""Create tables if they don't exist."""
with database:
database.create_tables([Log, Error], safe=True)
class DatabaseLogHandler(logging.Handler):
"""A logging handler that writes logs to the Peewee database."""
def emit(self, record):
stacktrace = None
if record.exc_info:
stacktrace = logging.Formatter().formatException(record.exc_info)
if record.levelno >= logging.ERROR:
log_error(
msg=record.getMessage(),
cat=record.name,
stacktrace=stacktrace
)
else:
log_info(
msg=record.getMessage(),
level=record.levelno,
cat=record.name
)
def log_error(msg: str, cat: str = "general", email: Optional[str] = None, stacktrace: Optional[str] = None, payload: Optional[str] = None, payload_cat: Optional[str] = None):
"""Log an ERROR message to the database, handling duplicates."""
try:
log_info(msg=msg, cat=cat, email=email, payload=payload, payload_cat=payload_cat)
except Exception as e:
pass
try:
if not stacktrace:
# Capture the current stack trace if not provided
stacktrace = "".join(traceback.format_stack())
# Filter out irrelevant lines from the stack trace
if stacktrace:
lines = stacktrace.split('\n')
filtered_lines = [
line for line in lines
if 'python3.13/logging' not in line and 'src/mylogging.py' not in line
]
stacktrace = '\n'.join(filtered_lines)
one_day_ago = int(time.time()) - (24 * 3600)
# Look for a similar error in the last 24 hours from the same user
existing_error = Error.select().where(
(Error.logmsg == msg) &
(Error.email == email) &
(Error.last_seen >= one_day_ago)
).first()
if existing_error:
# If found, increment counter and update last_seen
existing_error.count += 1
existing_error.last_seen = int(time.time())
existing_error.stacktrace = stacktrace
existing_error.save()
print(existing_error)
else:
# Otherwise, create a new error record
Error.create(
logmsg=msg,
cat=cat,
email=email,
stacktrace=stacktrace,
payload=payload,
payload_cat=payload_cat
)
logging.info(f"Successfully logged new error: {msg}")
except Exception as e:
logging.error(f"Failed to log error to {DB_FILE}: {e}")
def log_info(msg: str, level: int = 0, cat: str = "general", email: Optional[str] = None, payload: Optional[str] = None, payload_cat: Optional[str] = None):
"""Log an INFO message to the database."""
try:
Log.create(logmsg=msg, level=level, cat=cat, email=email, payload=payload, payload_cat=payload_cat)
except Exception as e:
print(f"Failed to log info to {DB_FILE}: {e}")
def get_errors(search: Optional[str] = None, cat: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get errors from the database with optional filters. Category search is prefix-based."""
query = Error.select().order_by(Error.last_seen.desc())
if search:
query = query.where(Error.logmsg.contains(search))
if cat and cat.strip():
query = query.where(Error.cat.startswith(cat.strip()))
return [e.to_dict() for e in query]
def get_logs(
search: Optional[str] = None,
cat: Optional[str] = None,
level: Optional[int] = None,
hours_ago: Optional[int] = None,
) -> List[Dict[str, Any]]:
"""Get logs from the database with optional filters. Category search is prefix-based."""
query = Log.select().order_by(Log.time.desc())
if search and search.strip():
query = query.where(Log.logmsg.contains(search))
if cat and cat.strip():
query = query.where(Log.cat.startswith(cat.strip()))
if level is not None:
query = query.where(Log.level <= level)
if hours_ago is not None:
time_ago = int(time.time()) - (hours_ago * 3600)
query = query.where(Log.time >= time_ago)
return [l.to_dict() for l in query]
def get_log_by_id(log_id: int) -> Optional[Dict[str, Any]]:
"""Get a single log by its ID."""
try:
log = Log.get_by_id(log_id)
return log.to_dict()
except Log.DoesNotExist:
return None
def delete_logs_older_than(minutes: int):
"""Delete logs older than a specified number of minutes."""
time_ago = int(time.time()) - (minutes * 60)
Log.delete().where(Log.time < time_ago).execute()
def delete_errors_older_than(minutes: int):
"""Delete errors older than a specified number of minutes."""
time_ago = int(time.time()) - (minutes * 60)
Error.delete().where(Error.time < time_ago).execute()
def get_unique_log_categories() -> List[str]:
"""Get unique log categories from the database."""
query = (Log
.select(Log.cat)
.where(Log.cat.is_null(False))
.distinct()
.order_by(Log.cat))
return [l.cat for l in query]
def get_unique_error_categories() -> List[str]:
"""Get unique error categories from the database."""
query = (Error
.select(Error.cat)
.where(Error.cat.is_null(False))
.distinct()
.order_by(Error.cat))
return [e.cat for e in query]

View File

View File

@@ -0,0 +1,80 @@
import os
class Path:
def __init__(self, path: str):
self.path = os.path.expanduser(path)
def exists(self) -> bool:
return os.path.exists(self.path)
def is_file(self) -> bool:
return os.path.isfile(self.path)
def is_dir(self) -> bool:
return os.path.isdir(self.path)
def read(self) -> str:
with open(self.path, 'r') as f:
return f.read()
def write(self, content: str):
os.makedirs(os.path.dirname(self.path), exist_ok=True)
with open(self.path, 'w') as f:
f.write(content)
def delete(self):
if self.is_file():
os.remove(self.path)
elif self.is_dir():
os.rmdir(self.path)
def list(self, recursive: bool = False, regex: list = None) -> list[str]:
files = []
if self.is_dir():
if recursive:
for root, _, filenames in os.walk(self.path):
for filename in filenames:
full_path = os.path.join(root, filename)
relative_path = os.path.relpath(full_path, self.path)
if regex:
import re
if any(re.match(r, relative_path) for r in regex):
files.append(relative_path)
else:
files.append(relative_path)
else:
for entry in os.listdir(self.path):
full_path = os.path.join(self.path, entry)
if os.path.isfile(full_path):
if regex:
import re
if any(re.match(r, entry) for r in regex):
files.append(entry)
else:
files.append(entry)
return files
def get(path: str) -> Path:
return Path(path)
def get_dir(path: str, create: bool = False) -> Path:
p = Path(path)
if create and not p.exists():
os.makedirs(p.path, exist_ok=True)
return p
def get_file(path: str, create: bool = False) -> Path:
p = Path(path)
if create and not p.exists():
os.makedirs(os.path.dirname(p.path), exist_ok=True)
with open(p.path, 'w') as f:
pass # Create empty file
return p
def rmdir_all(path: str):
if os.path.exists(path):
import shutil
shutil.rmtree(path)
def ls(path: str) -> list[str]:
return os.listdir(path)

View File

View File

@@ -0,0 +1,142 @@
import re
def name_fix(name: str) -> str:
# VLang's name_fix converts '-' to '_' and cleans up special chars.
# Python's re.sub can handle this.
name = re.sub(r'[^a-zA-Z0-9_ ]', '', name.replace('-', '_'))
return name.strip()
def expand(txt: str, length: int, expand_with: str) -> str:
# Pads the string to the specified length.
return txt.ljust(length, expand_with)
def dedent(text: str) -> str:
# Removes common leading whitespace from every line.
# This is a simplified version of textwrap.dedent
lines = text.splitlines()
if not lines:
return ""
# Find the minimum indentation of non-empty lines
min_indent = float('inf')
for line in lines:
if line.strip():
indent = len(line) - len(line.lstrip())
min_indent = min(min_indent, indent)
if min_indent == float('inf'): # All lines are empty or just whitespace
return "\n".join([line.strip() for line in lines])
dedented_lines = [line[min_indent:] for line in lines]
return "\n".join(dedented_lines)
def remove_empty_lines(text: str) -> str:
lines = text.splitlines()
return "\n".join([line for line in lines if line.strip()])
def remove_double_lines(text: str) -> str:
lines = text.splitlines()
cleaned_lines = []
prev_empty = False
for line in lines:
is_empty = not line.strip()
if is_empty and prev_empty:
continue
cleaned_lines.append(line)
prev_empty = is_empty
return "\n".join(cleaned_lines)
def ascii_clean(r: str) -> str:
return r.encode('ascii', 'ignore').decode('ascii')
def name_clean(r: str) -> str:
return re.sub(r'[^a-zA-Z0-9]', '', r)
def name_fix_keepspace(name_: str) -> str:
# Similar to name_fix but keeps spaces.
return re.sub(r'[^a-zA-Z0-9 ]', '', name_.replace('-', '_')).strip()
def name_fix_no_ext(name_: str) -> str:
return os.path.splitext(name_)[0]
def name_fix_snake_to_pascal(name: str) -> str:
return ''.join(word.capitalize() for word in name.split('_'))
def snake_case(name: str) -> str:
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
def name_split(name: str) -> tuple[str, str]:
parts = name.split('.')
if len(parts) > 1:
return parts[0], '.'.join(parts[1:])
return name, ""
def cmd_line_args_parser(text: str) -> list[str]:
# A simple parser, might need more robust solution for complex cases
import shlex
return shlex.split(text)
def text_remove_quotes(text: str) -> str:
return re.sub(r'["\'].*?["\']', '', text)
def check_exists_outside_quotes(text: str, items: list[str]) -> bool:
# This is a simplified implementation. A full implementation would require
# more complex parsing to correctly identify text outside quotes.
cleaned_text = text_remove_quotes(text)
for item in items:
if item in cleaned_text:
return True
return False
def is_int(text: str) -> bool:
return text.isdigit()
def is_upper_text(text: str) -> bool:
return text.isupper() and text.isalpha()
def multiline_to_single(text: str) -> str:
return text.replace('\n', '\\n').replace('\r', '')
def split_smart(t: str, delimiter_: str) -> list[str]:
# This is a placeholder, a smart split would need to handle quotes and escapes
return t.split(delimiter_)
def version(text_: str) -> int:
# Converts version strings like "v0.4.36" to 4036 or "v1.4.36" to 1004036
match = re.match(r'v?(\d+)\.(\d+)\.(\d+)', text_)
if match:
major, minor, patch = int(match.group(1)), int(match.group(2)), int(match.group(3))
if major == 0:
return minor * 100 + patch
else:
return major * 1000000 + minor * 100 + patch
return 0
def format_rfc1123(dt: datetime) -> str:
return dt.strftime('%a, %d %b %Y %H:%M:%S GMT')
def to_array(r: str) -> list[str]:
if ',' in r:
return [item.strip() for item in r.split(',')]
return [item.strip() for item in r.splitlines() if item.strip()]
def to_array_int(r: str) -> list[int]:
return [int(item) for item in to_array(r) if item.isdigit()]
def to_map(mapstring: str, line: str, delimiter_: str = ' ') -> dict[str, str]:
# This is a simplified implementation. The VLang version is more complex.
# It assumes a space delimiter for now.
keys = [k.strip() for k in mapstring.split(',')]
values = line.split(delimiter_)
result = {}
val_idx = 0
for key in keys:
if key == '-':
val_idx += 1
continue
if val_idx < len(values):
result[key] = values[val_idx]
val_idx += 1
return result