import os from typing import Optional, List from herolib.core.texttools.texttools import name_fix from herolib.data.ourtime.ourtime import OurTime, new as ourtime_new from herolib.core.logger.model import Logger, LogItem, LogType class SearchArgs: def __init__(self, timestamp_from: Optional[OurTime] = None, timestamp_to: Optional[OurTime] = None, cat: str = "", log: str = "", logtype: Optional[LogType] = None, maxitems: int = 10000): self.timestamp_from = timestamp_from self.timestamp_to = timestamp_to self.cat = cat self.log = log self.logtype = logtype self.maxitems = maxitems def search(l: Logger, args_: SearchArgs) -> List[LogItem]: args = args_ args.cat = name_fix(args.cat) if len(args.cat) > 10: raise ValueError('category cannot be longer than 10 chars') from_time = args.timestamp_from.unix() if args.timestamp_from else 0 to_time = args.timestamp_to.unix() if args.timestamp_to else ourtime_new('2100-01-01').unix() if from_time > to_time: raise ValueError(f'from_time cannot be after to_time: {from_time} > {to_time}') result: List[LogItem] = [] if not os.path.exists(l.path.path): return [] files = sorted(os.listdir(l.path.path)) for file in files: if not file.endswith('.log'): continue dayhour = file[:-4] try: file_time = ourtime_new(dayhour) except ValueError: continue file_hour_start_unix = file_time.unix() file_hour_end_unix = file_hour_start_unix + 3599 if file_hour_end_unix < from_time or file_hour_start_unix > to_time: continue try: with open(os.path.join(l.path.path, file), 'r') as f: content = f.read() except FileNotFoundError: continue current_time = None current_item = None for line in content.splitlines(): if len(result) >= args.maxitems: break if not line.strip() and current_item: if from_time <= current_item.timestamp.unix() <= to_time: if (not args.cat or args.cat == current_item.cat) and \ (not args.log or args.log.lower() in current_item.log.lower()) and \ (args.logtype is None or args.logtype == current_item.logtype): result.append(current_item) current_item = None continue if not line.startswith(' ') and not line.startswith('E'): if current_item: if from_time <= current_item.timestamp.unix() <= to_time: if (not args.cat or args.cat == current_item.cat) and \ (not args.log or args.log.lower() in current_item.log.lower()) and \ (args.logtype is None or args.logtype == current_item.logtype): result.append(current_item) try: current_time = ourtime_new(f"{file_time.day()} {line.strip()}") current_item = None except ValueError: current_time = None current_item = None elif current_time: if line.startswith(' ') or line.startswith('E'): if len(line) > 14 and line[13] == '-': if current_item: if from_time <= current_item.timestamp.unix() <= to_time: if (not args.cat or args.cat == current_item.cat) and \ (not args.log or args.log.lower() in current_item.log.lower()) and \ (args.logtype is None or args.logtype == current_item.logtype): result.append(current_item) is_error = line.startswith('E') logtype = LogType.ERROR if is_error else LogType.STDOUT cat = line[2:12].strip() log_content = line[15:] current_item = LogItem(timestamp=current_time, cat=cat, log=log_content.strip(), logtype=logtype) elif current_item: current_item.log += "\n" + (line[15:] if len(line) >15 else line) if current_item: if from_time <= current_item.timestamp.unix() <= to_time: if (not args.cat or args.cat == current_item.cat) and \ (not args.log or args.log.lower() in current_item.log.lower()) and \ (args.logtype is None or args.logtype == current_item.logtype): result.append(current_item) return result