...
This commit is contained in:
@@ -1,64 +0,0 @@
|
||||
from typing import Dict, List, Optional
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
|
||||
@dataclass
class ProcessMetrics:
    """Metrics for a running process and its children."""

    # CPU usage of the pane's main process, in percent (psutil reading).
    cpu_percent: float = 0.0
    memory_rss: int = 0  # Resident Set Size in bytes
    memory_vms: int = 0  # Virtual Memory Size in bytes
    memory_percent: float = 0.0  # main-process memory as a percentage of system memory
    num_threads: int = 0  # thread count of the main process
    num_children: int = 0  # number of recursive child processes found
    children_cpu_percent: float = 0.0  # CPU percent summed across all children
    children_memory_rss: int = 0  # RSS summed across all children, in bytes
    last_updated: str = ""  # ISO-8601 timestamp of the last sample
|
||||
|
||||
@dataclass
class TaskStatus:
    """Status of an individual task (script)."""

    script_path: str  # full path to the task's .sh script
    script_name: str  # basename of script_path
    state: str = "PENDING"  # PENDING, WAITING, RUNNING, DONE, ERROR, CRASHED, TIMED_OUT
    start_time: Optional[str] = None  # ISO-8601, set when the task first enters RUNNING
    end_time: Optional[str] = None  # ISO-8601, set when a terminal state is reached
    duration_seconds: float = 0.0  # computed from start_time/end_time on completion
    exit_code: Optional[int] = None  # script exit code, if known
    error_message: Optional[str] = None  # populated for ERROR/CRASHED outcomes
    pane_id: Optional[str] = None  # tmux pane the task runs in, if assigned
    # Latest sampled CPU/memory metrics for the task's process tree.
    process_metrics: ProcessMetrics = field(default_factory=ProcessMetrics)
|
||||
|
||||
@dataclass
class DirectoryStatus:
    """Status of a directory containing tasks."""

    directory_num: int  # numeric directory name; defines execution order
    directory_path: str  # path of the directory on disk
    state: str = "PENDING"  # PENDING, RUNNING, DONE, ERROR, TIMED_OUT
    timeout: int = 600  # seconds allowed before the directory's tasks are cancelled
    start_time: Optional[str] = None  # ISO-8601, set when the directory starts RUNNING
    end_time: Optional[str] = None  # ISO-8601, set on a terminal state
    duration_seconds: float = 0.0  # computed from start_time/end_time on completion
    tasks: List[TaskStatus] = field(default_factory=list)  # one entry per script
    window_name: Optional[str] = None  # tmux window hosting this directory's panes
|
||||
|
||||
@dataclass
class DAGStructure:
    """Complete DAG structure for the task run."""

    run_name: str  # derived from the tasks root directory name
    run_id: str  # unique identifier (uuid4) for this run
    state: str = "INITIALIZING"  # INITIALIZING, RUNNING, COMPLETED, FAILED
    start_time: str = ""  # ISO-8601 timestamp set at DAG creation
    end_time: Optional[str] = None  # ISO-8601, set when the run finishes
    duration_seconds: float = 0.0  # total wall-clock run time
    total_directories: int = 0  # number of task directories discovered
    completed_directories: int = 0  # directories that reached DONE
    failed_directories: int = 0  # directories that reached a non-DONE terminal state
    directories: List[DirectoryStatus] = field(default_factory=list)  # per-directory detail
    last_updated: str = ""  # ISO-8601 timestamp of the last state change
|
||||
|
||||
class MetaData:
    """Per-directory task metadata loaded from an optional .meta.toml file."""

    def __init__(self, timeout: int = 600):  # Default timeout to 10 minutes (600 seconds)
        # Seconds the directory's tasks may run before being cancelled.
        self.timeout = timeout
        # Add more attributes here in the future
|
||||
@@ -1,89 +0,0 @@
|
||||
import psutil
|
||||
from typing import List, Optional, Tuple
|
||||
from datetime import datetime
|
||||
from libtmux.pane import Pane
|
||||
from .model import ProcessMetrics
|
||||
|
||||
class ProcessMonitor:
    """Monitor processes running in tmux panes using psutil."""

    @staticmethod
    def get_pane_process_tree(pane: Pane) -> Tuple[Optional[psutil.Process], List[psutil.Process]]:
        """Get the main process and all child processes for a tmux pane.

        Returns (main_process, children); (None, []) when the pane has no
        PID or the process has already exited.
        """
        try:
            pane_pid = pane.pane_pid
            if pane_pid is None:
                return None, []

            # Get the main process
            try:
                main_process = psutil.Process(int(pane_pid))
            except (psutil.NoSuchProcess, ValueError):
                # Process died, or pane_pid was not a valid integer.
                return None, []

            # Get all children recursively
            children = []
            try:
                children = main_process.children(recursive=True)
            except psutil.NoSuchProcess:
                # Main process exited between lookup and children() call.
                pass

            return main_process, children
        except Exception as e:
            # Best-effort: any unexpected psutil/libtmux failure degrades to "no process".
            print(f"Error getting process tree: {e}")
            return None, []

    @staticmethod
    def get_process_metrics(pane: Pane) -> ProcessMetrics:
        """Get CPU and memory metrics for all processes in a pane.

        NOTE: cpu_percent(interval=0.1) blocks ~0.1s per sampled process,
        so a pane with many children takes proportionally longer to sample.
        Returns a zeroed ProcessMetrics (with last_updated set) when the
        pane has no live process.
        """
        metrics = ProcessMetrics()
        metrics.last_updated = datetime.now().isoformat()

        main_proc, children = ProcessMonitor.get_pane_process_tree(pane)

        if main_proc is None:
            return metrics

        try:
            # Get main process metrics
            if main_proc.is_running():
                metrics.cpu_percent = main_proc.cpu_percent(interval=0.1)
                mem_info = main_proc.memory_info()
                metrics.memory_rss = mem_info.rss
                metrics.memory_vms = mem_info.vms
                metrics.memory_percent = main_proc.memory_percent()
                metrics.num_threads = main_proc.num_threads()

            # Get children metrics (summed across all recursive children)
            metrics.num_children = len(children)
            for child in children:
                try:
                    if child.is_running():
                        metrics.children_cpu_percent += child.cpu_percent(interval=0.1)
                        child_mem = child.memory_info()
                        metrics.children_memory_rss += child_mem.rss
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    # Child exited or is inaccessible mid-scan; skip it.
                    pass

        except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
            print(f"Error getting process metrics: {e}")

        return metrics

    @staticmethod
    def is_process_running_command(pane: Pane, command_pattern: str) -> bool:
        """Check if a specific command is running in the pane.

        Substring-matches command_pattern against the joined command line of
        the pane's main process and every recursive child.
        """
        main_proc, children = ProcessMonitor.get_pane_process_tree(pane)

        # Conditional binds as: ([main_proc] + children) if main_proc else children
        all_processes = [main_proc] + children if main_proc else children

        for proc in all_processes:
            try:
                if proc and proc.is_running():
                    cmdline = " ".join(proc.cmdline())
                    if command_pattern in cmdline:
                        return True
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # Process vanished or is unreadable; treat as not matching.
                pass

        return False
|
||||
@@ -1,559 +0,0 @@
|
||||
import os
|
||||
import time
|
||||
import re
|
||||
import toml
|
||||
import libtmux
|
||||
from libtmux.pane import Pane
|
||||
from libtmux.window import Window
|
||||
from libtmux.session import Session
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
from dataclasses import asdict
|
||||
from datetime import datetime
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
import threading
|
||||
|
||||
from .model import DAGStructure, DirectoryStatus, TaskStatus, MetaData
|
||||
from .process_monitor import ProcessMonitor
|
||||
|
||||
# Configuration
WAITING_MESSAGE = "WAITING FOR JOBS"  # placeholder echoed in panes of not-yet-started directories
HPY_SH_PATH = "/root/heromonkey/functions/hpy.sh"  # Path to hpy.sh
|
||||
|
||||
|
||||
class TaskRunner:
    """Run numbered task directories sequentially in tmux, tracking a DAG.

    Each digit-named subdirectory of ``tasks_root_dir`` becomes one tmux
    window; every ``*.sh`` script inside it runs in its own pane via hpy.
    Run state is persisted to ``<tasks_root_dir>/.dag.toml`` after every
    state change so external tools (e.g. the API) can observe progress.

    Fixes over the previous revision: the duplicate ``reset`` definition
    (which silently shadowed the first) and the commented-out ``run``
    duplicate were removed; the bare ``except:`` in ``_check_task_status``
    was narrowed; marker-file paths and the hpy command string are built
    in single helpers instead of being duplicated.
    """

    def __init__(self, tasks_root_dir: str):
        self.tasks_root_dir = tasks_root_dir
        self.run_name = os.path.basename(os.path.abspath(tasks_root_dir))  # Derive run_name
        self.session = self._get_current_tmux_session()
        self.all_tasks_with_meta = self._get_sorted_tasks_with_meta(tasks_root_dir)
        self.window_panes = {}  # {window_idx: [pane1, pane2, ...]}
        self.run_id = str(uuid.uuid4())
        self.dag = self._initialize_dag()
        self.dag_file_path = Path(tasks_root_dir) / ".dag.toml"
        self.process_monitor = ProcessMonitor()
        self._save_dag()

    @staticmethod
    def _marker_files(script_path: str) -> Tuple[str, str, str]:
        """Return the (.done, .error, .ok) marker file paths for a script."""
        return f"{script_path}.done", f"{script_path}.error", f"{script_path}.ok"

    def _hpy_command(self, script_basename: str) -> str:
        """Shell command that sources hpy.sh, runs the script, and echoes completion."""
        return f"source {HPY_SH_PATH} && hpy {script_basename}; echo \"Script {script_basename} finished.\""

    def _initialize_dag(self) -> DAGStructure:
        """Initialize the DAG structure from the discovered tasks."""
        dag = DAGStructure(
            run_name=self.run_name,
            run_id=self.run_id,
            state="INITIALIZING",
            start_time=datetime.now().isoformat(),
            total_directories=len(self.all_tasks_with_meta)
        )

        # Create directory entries
        for dir_num, (scripts, metadata) in self.all_tasks_with_meta.items():
            dir_status = DirectoryStatus(
                directory_num=dir_num,
                directory_path=os.path.dirname(scripts[0]) if scripts else "",
                timeout=metadata.timeout,
                window_name=f"{self.run_name}_{dir_num}"
            )

            # Create task entries
            for script_path in scripts:
                task = TaskStatus(
                    script_path=script_path,
                    script_name=os.path.basename(script_path)
                )
                dir_status.tasks.append(task)

            dag.directories.append(dir_status)

        return dag

    def _save_dag(self):
        """Save the DAG structure to a TOML file (best-effort)."""
        try:
            dag_dict = asdict(self.dag)
            with open(self.dag_file_path, 'w') as f:
                toml.dump(dag_dict, f)
        except Exception as e:
            # Persisting the DAG must never abort the run itself.
            print(f"Error saving DAG: {e}")

    def _update_task_state(self, dir_idx: int, task_idx: int,
                           state: str, error_message: Optional[str] = None):
        """Update a task's state, stamp start/end times, and save the DAG."""
        task = self.dag.directories[dir_idx].tasks[task_idx]
        old_state = task.state
        task.state = state

        if state == "RUNNING" and old_state != "RUNNING":
            task.start_time = datetime.now().isoformat()
        elif state in ["DONE", "ERROR", "CRASHED", "TIMED_OUT"]:
            task.end_time = datetime.now().isoformat()
            if task.start_time:
                start = datetime.fromisoformat(task.start_time)
                end = datetime.fromisoformat(task.end_time)
                task.duration_seconds = (end - start).total_seconds()

        if error_message:
            task.error_message = error_message

        self.dag.last_updated = datetime.now().isoformat()
        self._save_dag()

    def _update_directory_state(self, dir_idx: int, state: str):
        """Update a directory's state, maintain run counters, and save the DAG."""
        directory = self.dag.directories[dir_idx]
        old_state = directory.state
        directory.state = state

        if state == "RUNNING" and old_state != "RUNNING":
            directory.start_time = datetime.now().isoformat()
        elif state in ["DONE", "ERROR", "TIMED_OUT"]:
            directory.end_time = datetime.now().isoformat()
            if directory.start_time:
                start = datetime.fromisoformat(directory.start_time)
                end = datetime.fromisoformat(directory.end_time)
                directory.duration_seconds = (end - start).total_seconds()

            # Count only terminal states: DONE is success, anything else failure.
            if state == "DONE":
                self.dag.completed_directories += 1
            else:
                self.dag.failed_directories += 1

        self._save_dag()

    def _check_task_status(self, script_path: str, pane: Pane) -> Tuple[str, Optional[str]]:
        """
        Comprehensive task status checking.
        Returns: (state, error_message)
        """
        script_basename = os.path.basename(script_path)
        done_file, error_file, ok_file = self._marker_files(script_path)

        # Check file markers
        if os.path.exists(done_file) or os.path.exists(ok_file):
            # Create .ok file if it doesn't exist
            if not os.path.exists(ok_file):
                Path(ok_file).touch()
            return "DONE", None

        if os.path.exists(error_file):
            error_msg = None
            try:
                with open(error_file, 'r') as f:
                    error_msg = f.read().strip()
            except Exception:
                # Narrowed from a bare except: unreadable marker still means ERROR.
                error_msg = "Unknown error"
            return "ERROR", error_msg

        # Check if hpy command is running
        if self.process_monitor.is_process_running_command(pane, f"hpy {script_basename}"):
            return "RUNNING", None

        # Check if pane has any running process
        if self._is_pane_running(pane):
            # Might be setting up or running something else
            return "RUNNING", None

        # If we get here, the process finished without markers.
        # This is likely a crash; record an error marker so the state sticks.
        error_msg = "Process terminated without completion marker"
        with open(error_file, 'w') as f:
            f.write(error_msg)
        return "CRASHED", error_msg

    def _monitor_directory_tasks(self, dir_idx: int, timeout: int) -> bool:
        """
        Monitor tasks in a directory with comprehensive status checking.
        Returns: True if all tasks completed successfully, False otherwise.
        """
        directory = self.dag.directories[dir_idx]
        scripts, metadata = self.all_tasks_with_meta[directory.directory_num]
        panes = self.window_panes[dir_idx]

        self._update_directory_state(dir_idx, "RUNNING")

        start_time = time.time()
        all_success = True

        while True:
            all_finished = True
            has_errors = False

            for task_idx, (script_path, pane) in enumerate(zip(scripts, panes)):
                task = directory.tasks[task_idx]

                # Get process metrics if running
                if task.state == "RUNNING":
                    metrics = self.process_monitor.get_process_metrics(pane)
                    task.process_metrics = metrics

                # Check task status
                new_state, error_msg = self._check_task_status(script_path, pane)

                if new_state != task.state:
                    self._update_task_state(dir_idx, task_idx, new_state, error_msg)
                    print(f" Task {task.script_name}: {task.state}")

                if new_state == "RUNNING":
                    all_finished = False
                elif new_state in ["ERROR", "CRASHED", "TIMED_OUT"]:
                    has_errors = True
                    all_success = False

            # Save DAG periodically
            self._save_dag()

            if all_finished:
                if has_errors:
                    self._update_directory_state(dir_idx, "ERROR")
                else:
                    self._update_directory_state(dir_idx, "DONE")
                break

            # Check timeout
            elapsed = time.time() - start_time
            if elapsed > timeout:
                print(f" Directory {directory.directory_num} timed out!")
                for task_idx, task in enumerate(directory.tasks):
                    if task.state == "RUNNING":
                        self._update_task_state(dir_idx, task_idx, "TIMED_OUT")
                        # Interrupt whatever is still running in the pane.
                        panes[task_idx].send_keys("C-c", literal=True)
                self._update_directory_state(dir_idx, "TIMED_OUT")
                all_success = False
                break

            time.sleep(2)  # Check every 2 seconds

        return all_success

    def reset(self):
        """Kills all processes and panes inside task windows, removes windows, and deletes .done/.error/.ok files."""
        print(f"\n--- Resetting run '{self.run_name}' ---")
        self.cleanup()  # First, kill all associated tmux windows

        # Then, remove all .done, .error, and .ok files
        print(" Removing .done, .error, and .ok files...")
        for dir_num, (scripts, metadata) in self.all_tasks_with_meta.items():
            for script_path in scripts:
                for marker in self._marker_files(script_path):
                    if os.path.exists(marker):
                        os.remove(marker)
                        print(f" Removed: {marker}")

        # Also remove the .dag.toml file if it exists
        if hasattr(self, 'dag_file_path') and self.dag_file_path.exists():
            os.remove(self.dag_file_path)
            print(f" Removed: {self.dag_file_path}")
        print("Reset complete.")

    def _get_sorted_tasks_with_meta(self, tasks_root):
        """
        Reads all scripts and .meta.toml from the tasks_root, sorts them by directory,
        and then by script name within each directory.
        Returns a dictionary where keys are directory numbers (e.g., 1, 2)
        and values are tuples of (list_of_full_script_paths, MetaData_object).
        """
        tasks_with_meta = {}
        for dirpath, dirnames, filenames in os.walk(tasks_root):
            if dirpath == tasks_root:
                # Only descend into purely numeric subdirectories, in numeric order.
                dirnames[:] = sorted([d for d in dirnames if d.isdigit()], key=int)

            relative_path = os.path.relpath(dirpath, tasks_root)
            if relative_path != "." and relative_path.isdigit():
                dir_num = int(relative_path)
                scripts = sorted([os.path.join(dirpath, f) for f in filenames if f.endswith(".sh")])

                metadata_file = os.path.join(dirpath, ".meta.toml")
                metadata = MetaData()  # Default metadata
                if os.path.exists(metadata_file):
                    try:
                        with open(metadata_file, 'r') as f:
                            meta_data_dict = toml.load(f)
                        if 'timeout' in meta_data_dict:
                            metadata.timeout = int(meta_data_dict['timeout'])
                    except Exception as e:
                        # Bad metadata falls back to defaults rather than failing the run.
                        print(f"Warning: Could not read or parse .meta.toml for directory {dir_num}: {e}")

                if scripts:
                    tasks_with_meta[dir_num] = (scripts, metadata)

        return dict(sorted(tasks_with_meta.items()))

    def _get_current_tmux_session(self) -> Session:
        """Gets the current tmux session based on TMUX environment variable."""
        server = libtmux.Server()
        tmux_env = os.environ.get('TMUX')

        if not tmux_env:
            raise Exception("Not running inside a tmux session. The 'TMUX' environment variable is not set.")

        try:
            # TMUX variable format: /tmp/tmux-1000/default,12345,0
            # The last part '0' is the session index.
            match = re.search(r',(\d+)$', tmux_env)
            if not match:
                raise Exception(f"Could not parse session index from TMUX environment variable: {tmux_env}")

            session_index_from_env = match.group(1)

            found_session = None
            for s in server.sessions:
                # libtmux session ids carry a '$' prefix.
                if s.session_id == f"${session_index_from_env}":
                    found_session = s
                    break

            if not found_session:
                raise Exception(f"Could not find tmux session with ID: ${session_index_from_env}")

            print(f"Attached to current tmux session: '{found_session.name}' via TMUX env var.")
            return found_session
        except Exception as e:
            raise Exception(f"Error getting current tmux session: {e}")

    def _create_tmux_window(self, window_name: str) -> Window:
        """Creates a new tmux window."""
        window = self.session.new_window(attach=False, window_name=window_name)
        print(f" Tmux window '{window_name}' created.")
        return window

    def _create_tmux_pane(self, window: Window, pane_index: int, command: str) -> Pane:
        """Creates a tmux pane and sends a command."""
        if pane_index == 0:
            # First task reuses the window's initial pane.
            pane = window.active_pane
            pane.send_keys("clear", enter=True)
        else:
            pane = window.split(attach=False)

        pane.send_keys(command, enter=True)
        print(f" Pane {pane_index}: Command sent: '{command}'")
        return pane

    def _is_pane_running(self, pane: Pane) -> bool:
        """Checks if a tmux pane is still running a process."""
        try:
            pane_pid = pane.pane_pid
            if pane_pid is not None:
                try:
                    pid_int = int(pane_pid)
                    if pid_int > 0:
                        # Signal 0 probes for existence without sending anything.
                        os.kill(pid_int, 0)
                        return True
                except (ValueError, OSError):
                    return False
            return False
        except Exception as e:
            print(f"Error checking pane status for {pane.window_name}:{pane.pane_index}: {e}")
            return False

    def _setup_windows_and_panes(self):
        """Initial setup of tmux windows and panes for all tasks."""
        all_dir_nums = sorted(self.all_tasks_with_meta.keys())
        print("\n--- Initial Tmux Setup: Creating windows and panes ---")
        for window_idx, dir_num in enumerate(all_dir_nums):
            scripts, metadata = self.all_tasks_with_meta[dir_num]
            window_name = f"{self.run_name}_{dir_num}"
            window = self._create_tmux_window(window_name)
            self.window_panes[window_idx] = []

            for pane_idx, script_path in enumerate(scripts):
                script_dir = os.path.dirname(script_path)
                script_basename = os.path.basename(script_path)

                if window_idx == 0:
                    # Send cd command first, then the hpy command
                    pane = self._create_tmux_pane(window, pane_idx, f"cd {script_dir}")
                    pane.send_keys(self._hpy_command(script_basename), enter=True)
                    print(f" Pane {pane_idx}: Command sent: 'cd {script_dir}' and 'source {HPY_SH_PATH} && hpy {script_basename}'")
                else:
                    # Later directories wait until their turn.
                    command = f"echo '{WAITING_MESSAGE} for {script_basename}'"
                    pane = self._create_tmux_pane(window, pane_idx, command)
                self.window_panes[window_idx].append(pane)

            if window_idx == 0:
                print(f" Window '{window_name}' (Directory {dir_num}) tasks started.")
            else:
                print(f" Window '{window_name}' (Directory {dir_num}) panes set to '{WAITING_MESSAGE}'.")

    def _start_directory_tasks(self, dir_idx: int):
        """Starts tasks in a specific directory (window)."""
        directory = self.dag.directories[dir_idx]
        scripts, metadata = self.all_tasks_with_meta[directory.directory_num]
        panes_in_current_window = self.window_panes[dir_idx]

        print(f"\n--- Activating tasks in window '{directory.window_name}' (Directory {directory.directory_num}) ---")
        for pane_idx, script_path in enumerate(scripts):
            script_dir = os.path.dirname(script_path)
            script_basename = os.path.basename(script_path)

            pane = panes_in_current_window[pane_idx]
            pane.send_keys("C-c", literal=True)  # Clear any previous command/output

            # Send cd command first, then the hpy command
            pane.send_keys(f"cd {script_dir}", enter=True)
            pane.send_keys(self._hpy_command(script_basename), enter=True)
            print(f" Pane {pane_idx}: Command sent: 'cd {script_dir}' and 'source {HPY_SH_PATH} && hpy {script_basename}'")

    def run(self):
        """Enhanced run method with DAG tracking."""
        print(f"Starting enhanced task orchestration for '{self.run_name}'")
        print(f"Run ID: {self.run_id}")
        print(f"DAG file: {self.dag_file_path}")

        self.dag.state = "RUNNING"
        self._save_dag()

        if not self.all_tasks_with_meta:
            print("No tasks found to execute.")
            return

        print("Detected tasks:")
        for dir_num, (scripts, metadata) in self.all_tasks_with_meta.items():
            print(f" Directory {dir_num} (Timeout: {metadata.timeout}s):")
            for script in scripts:
                print(f" - {script}")

        # Initialize windows and panes
        self._setup_windows_and_panes()

        # Process directories sequentially
        overall_success = True
        for dir_idx in range(len(self.dag.directories)):
            directory = self.dag.directories[dir_idx]
            print(f"\n--- Processing Directory {directory.directory_num} ---")

            # Start tasks if not the first directory (directory 0 starts during setup)
            if dir_idx > 0:
                self._start_directory_tasks(dir_idx)

            # Monitor tasks
            success = self._monitor_directory_tasks(
                dir_idx,
                directory.timeout
            )

            if not success:
                overall_success = False

        # Update final DAG state
        self.dag.state = "COMPLETED" if overall_success else "FAILED"
        self.dag.end_time = datetime.now().isoformat()
        if self.dag.start_time:
            start = datetime.fromisoformat(self.dag.start_time)
            end = datetime.fromisoformat(self.dag.end_time)
            self.dag.duration_seconds = (end - start).total_seconds()
        self._save_dag()

        print(f"\nTask orchestration completed: {self.dag.state}")
        print(f"Total duration: {self.dag.duration_seconds:.2f} seconds")
        print(f"You can attach to the tmux session to review: tmux attach -t {self.session.name}")

    def cleanup(self):
        """Removes all tmux windows created by this run."""
        print(f"\n--- Cleaning up tmux windows for run '{self.run_name}' ---")
        print(f" Current session name: '{self.session.name}'")
        all_session_windows = [w.name for w in self.session.windows if w.name]
        print(f" All windows in current session: {all_session_windows}")

        windows_to_kill = []
        expected_prefix = f"{self.run_name}_"
        print(f" Looking for windows starting with prefix: '{expected_prefix}'")

        for window in self.session.windows:
            if window.name and window.name.startswith(expected_prefix):
                windows_to_kill.append(window)

        if not windows_to_kill:
            print(f" No windows found to kill with prefix '{expected_prefix}'.")
            print("Cleanup complete.")
            return

        print(f" Identified {len(windows_to_kill)} windows to kill: {[w.name for w in windows_to_kill]}")
        for window in windows_to_kill:
            try:
                window.kill()
                print(f" Killed window: '{window.name}'")
            except Exception as e:
                # A vanished window must not stop the rest of the cleanup.
                print(f" Error killing window '{window.name}': {e}")
        print("Cleanup complete.")
|
||||
@@ -1,167 +0,0 @@
|
||||
import threading
|
||||
import time
|
||||
from typing import Dict, List, Optional
|
||||
from dataclasses import asdict
|
||||
from datetime import datetime
|
||||
import uvicorn
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
|
||||
from .task_runner import TaskRunner
|
||||
|
||||
|
||||
class TaskRunnerAPI:
    """FastAPI interface for the task runner.

    Exposes the runner's in-memory DAG as read-only JSON endpoints.
    """

    def __init__(self, runner: TaskRunner):
        self.runner = runner
        self.app = FastAPI(title="Task Runner API", version="1.0.0")

        # Add CORS middleware (wide open: any origin/method/header)
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

        self._setup_routes()

    def _setup_routes(self):
        """Setup API routes.

        Routes are closures over self.runner, so every request reads the
        runner's live DAG state.
        """

        @self.app.get("/")
        async def root():
            """Get API information."""
            return {
                "name": "Task Runner API",
                "version": "1.0.0",
                "run_id": self.runner.run_id,
                "run_name": self.runner.run_name
            }

        @self.app.get("/status")
        async def get_status():
            """Get current run status."""
            return {
                "run_id": self.runner.run_id,
                "run_name": self.runner.run_name,
                "state": self.runner.dag.state,
                "start_time": self.runner.dag.start_time,
                "end_time": self.runner.dag.end_time,
                "duration_seconds": self.runner.dag.duration_seconds,
                "total_directories": self.runner.dag.total_directories,
                "completed_directories": self.runner.dag.completed_directories,
                "failed_directories": self.runner.dag.failed_directories
            }

        @self.app.get("/directories")
        async def get_directories():
            """Get all directory statuses (summary view with task counts)."""
            return [
                {
                    "directory_num": d.directory_num,
                    "directory_path": d.directory_path,
                    "state": d.state,
                    "timeout": d.timeout,
                    "start_time": d.start_time,
                    "end_time": d.end_time,
                    "duration_seconds": d.duration_seconds,
                    "task_count": len(d.tasks),
                    "tasks_done": sum(1 for t in d.tasks if t.state == "DONE"),
                    "tasks_error": sum(1 for t in d.tasks if t.state in ["ERROR", "CRASHED", "TIMED_OUT"])
                }
                for d in self.runner.dag.directories
            ]

        @self.app.get("/directories/{dir_num}/tasks")
        async def get_directory_tasks(dir_num: int):
            """Get tasks for a specific directory."""
            for d in self.runner.dag.directories:
                if d.directory_num == dir_num:
                    return d.tasks
            raise HTTPException(status_code=404, detail="Directory not found")

        @self.app.get("/tasks/{dir_num}/{task_name}")
        async def get_task_details(dir_num: int, task_name: str):
            """Get detailed information about a specific task."""
            for d in self.runner.dag.directories:
                if d.directory_num == dir_num:
                    for t in d.tasks:
                        if t.script_name == task_name:
                            return t
            raise HTTPException(status_code=404, detail="Task not found")

        @self.app.get("/metrics")
        async def get_metrics():
            """Get current process metrics for all running tasks."""
            metrics = []
            for d in self.runner.dag.directories:
                for t in d.tasks:
                    if t.state == "RUNNING":
                        metrics.append({
                            "directory": d.directory_num,
                            "task": t.script_name,
                            "cpu_percent": t.process_metrics.cpu_percent,
                            "memory_rss_mb": t.process_metrics.memory_rss / (1024 * 1024),
                            "memory_percent": t.process_metrics.memory_percent,
                            "num_threads": t.process_metrics.num_threads,
                            "num_children": t.process_metrics.num_children
                        })
            return metrics

        @self.app.get("/dag")
        async def get_full_dag():
            """Get the complete DAG structure."""
            return asdict(self.runner.dag)

    def start(self, host: str = "0.0.0.0", port: int = 8000):
        """Start the FastAPI server (blocking call)."""
        uvicorn.run(self.app, host=host, port=port)
|
||||
|
||||
|
||||
class TaskOrchestrator:
    """Main orchestrator that runs tasks and API server."""

    def __init__(self, tasks_dir: str, api_port: int = 8000):
        self.runner = TaskRunner(tasks_dir)
        self.api = TaskRunnerAPI(self.runner)
        self.api_thread = None  # set by start_api_server()
        self.api_port = api_port

    def start_api_server(self, port: int = 8000):
        """Start API server in a separate thread.

        Runs as a daemon thread so the server dies with the main process.
        """
        self.api_thread = threading.Thread(
            target=self.api.start,
            args=("0.0.0.0", port),
            daemon=True
        )
        self.api_thread.start()
        print(f"API server started on http://0.0.0.0:{port}")

    def run(self):
        """Run the task orchestration: API server, reset, then the task run."""
        # Start API server
        self.start_api_server(self.api_port)

        # Reset and run tasks
        self.runner.reset()
        try:
            self.runner.run()
        except Exception as e:
            # Record the failure in the DAG so the API reflects it.
            print(f"Error during execution: {e}")
            self.runner.dag.state = "FAILED"
            self.runner.dag.end_time = datetime.now().isoformat()
            self.runner._save_dag()

        print("\nExecution completed. API server still running.")
        print(f"Access API at: http://localhost:{self.api_port}")
        print("Press Ctrl+C to stop the API server.")

        try:
            # Keep the main thread alive for API access
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("\nShutting down...")
||||
@@ -1,94 +0,0 @@
|
||||
from herotools.logger import logger
|
||||
from bs4 import BeautifulSoup
|
||||
import re
|
||||
from typing import Callable
|
||||
from herotools.texttools import name_fix
|
||||
|
||||
# Define the type for the content and link fetching functions
|
||||
LinkFetcher = Callable[[str, str, str, str, str], str]
|
||||
ContentFetcher = Callable[[str, str, str, str], str]
|
||||
|
||||
# Private functions to be used internally
|
||||
|
||||
def _get_link(language: str, prefix: str, site_name: str, pagename: str, name: str) -> str:
    """Build the asset URL for an image placeholder (stub implementation)."""
    # Stub: real deployments resolve the link from stored site content.
    logger.debug(f"_get_link: {language[:10]:<10} {site_name}:{pagename}:{name}")
    link = f"{prefix}{language}/{site_name}/{pagename}/{name}.jpg"
    return link
|
||||
|
||||
def _get_content(language: str, site_name: str, pagename: str, name: str) -> str:
    """Fetch the replacement text for a !!txt: placeholder (stub implementation)."""
    # Stub: real deployments look the text up per language/site/page.
    logger.debug(f"_get_content: {language[:10]:<10} {site_name}:{pagename}:{name}")
    text = f"Replaced text for {name} on page {pagename} in {language} language on {site_name} site"
    return text
|
||||
|
||||
def _process_html(language: str, prefix: str, site_name: str, pagename: str, html_content: str) -> str:
    """
    Rewrite template placeholders in an HTML document.

    Elements carrying a class of the form ``!!img:<name>`` get their ``src``
    attribute replaced with a generated link; elements with ``!!txt:<name>``
    get their text content replaced with language-specific content.
    """
    # Normalize identifiers so lookups are consistent.
    language = name_fix(language)
    site_name = name_fix(site_name)
    pagename = name_fix(pagename)
    prefix = prefix.strip()
    prefix = prefix if prefix.endswith('/') else prefix + '/'

    soup = BeautifulSoup(html_content, 'html.parser')
    marker = re.compile(r'!!(img|txt):(.+)')

    # Visit every element whose class list contains a placeholder marker.
    for element in soup.find_all(class_=marker):
        for cls in element['class']:
            is_img = cls.startswith('!!img:')
            is_txt = cls.startswith('!!txt:')
            if not (is_img or is_txt):
                continue
            placeholder = name_fix(cls.split(':')[1])
            if is_img:
                link = _get_link(language=language, prefix=prefix, site_name=site_name, pagename=pagename, name=placeholder)
                # Replace src on <img> tags, or on any other tag that carries one.
                if element.name == 'img' or 'src' in element.attrs:
                    element['src'] = link
            else:
                text = _get_content(language=language, site_name=site_name, pagename=pagename, name=placeholder)
                element.string = text

    # Output the modified HTML
    return str(soup)
|
||||
|
||||
# Public function to process the HTML content
|
||||
def process(language: str, prefix: str, site_name: str, pagename: str, html_content: str) -> str:
    """
    Public function to process HTML and replace content based on tags.
    This function wraps the internal _process_html function.

    Args:
        language: Language code used to select replacement content (e.g. "en").
        prefix: Base URL prefix for generated asset links.
        site_name: Site identifier.
        pagename: Page identifier.
        html_content: Raw HTML to transform.

    Returns:
        The transformed HTML as a string.
    """
    return _process_html(language=language, prefix=prefix, site_name=site_name, pagename=pagename, html_content=html_content)
|
||||
|
||||
# Sample usage with a given language, site name, page name, and HTML content
if __name__ == "__main__":
    # Example HTML content exercising both !!txt: and !!img: placeholders.
    html_content = '''
    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <meta name="viewport" content="width=device-width, initial-scale=1.0">
        <title>Sample Page</title>
    </head>
    <body>
        <h2 class="mb-6 is-size-1 is-size-3-mobile has-text-weight-bold !!txt:title1">Take care of your performance every day.</h2>
        <img class="responsive !!img:logo" src="old-link.jpg" alt="Company Logo">
        <p class="content !!txt:description">This is a sample description text.</p>
    </body>
    </html>
    '''

    # Process the HTML content for a specific language, site name, and page
    language: str = "en"
    site_name: str = "ExampleSite"
    pagename: str = "HomePage"
    prefix: str = "http://localhost/images/"
    processed_html: str = process(language=language, prefix=prefix, site_name=site_name, pagename=pagename, html_content=html_content)

    # Print the modified HTML
    print(processed_html)
|
||||
@@ -1,172 +0,0 @@
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Add the parent directory of herotools to the Python module search path
|
||||
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
||||
|
||||
from herotools.logger import logger
|
||||
from markdown_it import MarkdownIt
|
||||
from markdown_it.tree import SyntaxTreeNode
|
||||
import re
|
||||
from enum import Enum
|
||||
from herotools.texttools import name_fix
|
||||
from mdformat.renderer import MDRenderer
|
||||
from urllib.parse import urlparse
|
||||
|
||||
class ImageType(Enum):
    """Closed set of image formats recognized by the link generators."""
    JPEG = 'jpeg'
    PNG = 'png'
    GIF = 'gif'
    OTHER = 'other'  # any format not explicitly listed above
|
||||
|
||||
|
||||
def get_link_page(prefix: str, linkname: str, sitename: str, name: str) -> str:
    """
    Generates a markdown page link based on sitename and name.

    Args:
        prefix (str): Base URL prepended to the generated path.
        linkname (str): Text displayed for the link.
        sitename (str): The name of the site.
        name (str): The name of the page.

    Returns:
        str: The generated markdown link, e.g. ``[linkname](prefix/sitename/name)``.
    """
    logger.debug(f"get_link_page: {prefix[:60]:<60} {linkname} {sitename}:{name}")
    return f"[{linkname}]({prefix}/{sitename}/{name})"
|
||||
|
||||
def get_link_image(prefix: str, sitename: str, name: str, image_type: ImageType) -> str:
    """
    Generates a markdown image link based on sitename and name.

    Args:
        prefix (str): Base URL prepended to the generated path.
        sitename (str): The name of the site.
        name (str): The image name.
        image_type (ImageType): The type of the image.
            NOTE(review): currently unused in the generated link — confirm
            whether a file extension should be derived from it.

    Returns:
        str: The generated markdown image link.
    """
    logger.debug(f"get_link_image: {prefix[:60]:<60} {sitename}:{name}")
    return f""
|
||||
|
||||
def get_include(sitename: str, name: str) -> str:
    """
    Generates an include directive link based on sitename and name.

    Args:
        sitename (str): The name of the site.
        name (str): The name of the page to include.

    Returns:
        str: The generated include directive, e.g. ``include: site/page``.
    """
    logger.debug(f"get_include: {sitename}:{name}")
    return f"include: {sitename}/{name}"
|
||||
|
||||
def replace(prefix: str, markdown: str) -> str:
    """
    Finds all image links, markdown page links, and custom include directives
    in the provided markdown text and replaces them using the appropriate
    functions.

    Parses the markdown into an AST, prepends *prefix* to the path component
    of every image ``src`` and link ``href``, then renders the AST back to
    markdown. (The include-directive substitution is currently disabled.)

    Args:
        prefix (str): Base URL to prepend to each link/image path.
        markdown (str): The markdown content.

    Returns:
        str: The modified markdown content with updated links.
    """
    # Initialize the Markdown parser and build a tree view over its tokens.
    md = MarkdownIt()
    tokens = md.parse(markdown)
    ast = SyntaxTreeNode(tokens)

    print(ast.pretty(indent=2, show_text=True))

    def get_new_url(url: str) -> str:
        """Join *prefix* and the path component of *url* with a single slash."""
        logger.debug(f"url: {url}")
        parsed_url = urlparse(url)
        image_path = parsed_url.path
        logger.debug(f"parsed_url: {parsed_url}")
        new_url = f"{prefix.rstrip('/')}/{image_path.strip('/')}"
        logger.debug(f"new_url: {new_url}")
        return new_url

    def process_node(node: SyntaxTreeNode) -> None:
        """Recursively rewrite image/link URLs on *node* and its children."""
        if node.type == 'image':
            # Process image link
            node.attrs['src'] = get_new_url(node.attrs.get('src', ''))
        elif node.type == 'link':
            # Process markdown page link
            node.attrs['href'] = get_new_url(node.attrs.get('href', ''))
        # Recursively process child nodes (children may be None on leaves).
        for child in node.children or []:
            process_node(child)

    def replace_include_directives(match: re.Match) -> str:
        """
        Replaces custom include directives with appropriate links.

        Args:
            match (re.Match): The match object containing the found include directive.

        Returns:
            str: The generated link for the include directive.
        """
        url = match.group(1)
        if ':' in url:
            site_name, page = url.split(':', 1)
            page_name = page.split('/')[-1]
        else:
            site_name = ""
            page_name = url
        # BUG FIX: the no-site branch previously referenced the undefined
        # name `page` (NameError), and get_include() was called with a stray
        # `prefix` argument although it only accepts (sitename, name).
        if not page_name.endswith('.md'):
            page_name += '.md'
        return get_include(site_name, page_name)

    # Process the root node
    process_node(ast)

    # Convert the AST back to markdown
    renderer = MDRenderer()
    options = {}
    env = {}
    rendered_markdown = renderer.render(tokens, options, env)

    # include_pattern = re.compile(r"!!include page:'(.*?)'")
    # rendered_markdown = include_pattern.sub(replace_include_directives, rendered_markdown)

    return rendered_markdown
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Demo input covering images, page links, and the (currently disabled)
    # !!include directive in several spellings, including a malformed one.
    text = """
![Image](images/pic1.png)
[Page link](sitename:some/path/to/page.md)
!!include page:'mypage'
!!include page:'mypage.md'
!!include page:'mysite:mypage
!!include page:'mysite:mypage'
!!include page:'mysite:mypage.md'
"""

    print(text)
    text2=replace("http://localhost:8080/pre/", text)
    print(text2)
|
||||
|
||||
|
||||
@@ -1,94 +0,0 @@
|
||||
import os
|
||||
import re
|
||||
from typing import Callable
|
||||
|
||||
from herotools.logger import logger
|
||||
from herotools.md5 import file_md5
|
||||
from herotools.texttools import name_fix
|
||||
|
||||
|
||||
def _example_set_file(site_name: str, path: str, md5: str) -> None:
    """Example sink for generic (non-markdown, non-image) files; logs only."""
    # Placeholder for actual implementation
    logger.debug(f"set_file : site_name={site_name[:20]:<20} {path}")
|
||||
|
||||
|
||||
def _example_set_img(site_name: str, path: str, md5: str) -> None:
    """Example sink for image files; logs only."""
    # Placeholder for actual implementation
    logger.debug(f"set_img : site_name={site_name[:20]:<20} {path}")
|
||||
|
||||
|
||||
def _example_set_markdown(
    site_name: str, path: str, md5: str, content: str
) -> None:
    """Example sink for markdown files; receives the full file content, logs only."""
    # Placeholder for actual implementation
    logger.debug(f"set_markdown : site_name={site_name[:20]:<20} {path}")
|
||||
|
||||
|
||||
def _example_set_site(site_name: str, path: str) -> None:
    """Example sink called once per discovered site root; logs only."""
    # Placeholder for actual implementation
    logger.info(f"set_site : site_name={site_name[:20]:<20} {path}")
|
||||
|
||||
|
||||
def _site_process_action(
    site_name: str,
    site_path: str,
    set_file: Callable[[str, str, str], None],
    set_img: Callable[[str, str, str], None],
    set_markdown: Callable[[str, str, str, str], None],
) -> None:
    """Walk one site directory and dispatch every file to the matching callback.

    Markdown files are read and handed to set_markdown; recognized image
    extensions go to set_img; bookkeeping markers are skipped; everything
    else goes to set_file. Paths are reported relative to site_path.
    """
    logger.debug(f"site process: {site_path[:60]:<60} -> {site_name}")
    image_pattern = re.compile(r"\.(jpg|jpeg|png|gif|bmp|tiff|webp)$", re.IGNORECASE)
    marker_names = {".collection", ".site", ".done"}
    for root, _, entries in os.walk(site_path):
        for entry in entries:
            full_path = os.path.join(root, entry)
            rel_path = os.path.relpath(full_path, site_path)
            base_name = os.path.basename(entry)
            digest = file_md5(full_path)
            if entry.lower().endswith(".md"):
                # Markdown is the only kind whose content is read and passed on.
                with open(full_path, "r", encoding="utf-8") as fh:
                    set_markdown(site_name, rel_path, digest, fh.read())
            elif base_name in marker_names:
                continue  # bookkeeping markers, not site content
            elif image_pattern.search(entry):
                set_img(site_name, rel_path, digest)
            else:
                set_file(site_name, rel_path, digest)
|
||||
|
||||
|
||||
def process(
    path: str,
    set_site: Callable[[str, str], None],
    set_file: Callable[[str, str, str], None],
    set_img: Callable[[str, str, str], None],
    set_markdown: Callable[[str, str, str, str], None],
) -> None:
    """
    Walk *path* and apply set_file(), set_img() and set_markdown() per site.

    A directory counts as a site root when it contains a ``.site`` or
    ``.collection`` marker file; each site gets set_site() called once
    before its files are dispatched.
    """
    path = os.path.abspath(os.path.expanduser(path))
    logger.info(f"sites process: {path}")
    for root, dirs, files in os.walk(path):
        # Guard clause: only directories carrying a site marker are processed.
        if {".site", ".collection"}.isdisjoint(files):
            continue
        site_name = name_fix(os.path.basename(root))
        set_site(site_name, root)
        _site_process_action(
            site_name, root, set_file, set_img, set_markdown
        )
        # Prevent os.walk from descending into this site's subdirectories.
        dirs[:] = []
|
||||
|
||||
|
||||
if __name__ == "__main__":
    mypath = "~/code/git.threefold.info/projectmycelium/info_projectmycelium/collections"

    # logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
    # Run the walker with the example (logging-only) callbacks.
    process(
        mypath,
        _example_set_site,
        _example_set_file,
        _example_set_img,
        _example_set_markdown,
    )
|
||||
Reference in New Issue
Block a user