herolib_python/_archive/git_poller.py
2025-08-05 15:15:36 +02:00

123 lines
4.0 KiB
Python

import sys
import os
import redis
import subprocess
import time
def find_git_root(path):
    """Return the nearest directory at or above *path* that contains ``.git``.

    The search is lexical: it repeatedly applies ``os.path.dirname`` to
    *path* and never resolves it against the current working directory.
    Pass an absolute path to search all the way up to the filesystem root.

    Args:
        path: Directory to start searching from.

    Returns:
        The first directory (spelled the same way as *path*) that contains a
        ``.git`` entry, or ``None`` if no such directory is found.
    """
    current = path
    # An empty string means a relative path ran out of components.
    while current:
        if os.path.exists(os.path.join(current, '.git')):
            return current
        parent = os.path.dirname(current)
        if parent == current:
            # dirname() made no progress: this is the filesystem root
            # ('/' or a drive root), so there is nothing left to check.
            break
        current = parent
    return None
def get_git_hash(path):
    """Return the output of ``git rev-parse HEAD`` run in *path*.

    Args:
        path: Working directory in which git is invoked.

    Returns:
        The command's stdout decoded as UTF-8 with surrounding whitespace
        removed.

    Raises:
        subprocess.CalledProcessError: If git exits with a non-zero status.
    """
    raw_output = subprocess.check_output(['git', 'rev-parse', 'HEAD'], cwd=path)
    head = raw_output.decode('utf-8')
    return head.strip()
def get_changes(path, old_hash, new_hash):
    """List the file names reported by ``git log`` for ``old_hash..new_hash``.

    Args:
        path: Working directory in which git is invoked.
        old_hash: Revision at the start of the range (exclusive).
        new_hash: Revision at the end of the range (inclusive).

    Returns:
        The command output split on newlines. The list is returned raw, so
        it can contain empty strings and repeated names; callers are expected
        to filter it.

    Raises:
        subprocess.CalledProcessError: If git exits with a non-zero status.
    """
    revision_range = f'{old_hash}..{new_hash}'
    argv = ['git', 'log', revision_range, '--name-only', '--pretty=format:']
    listing = subprocess.check_output(argv, cwd=path).decode('utf-8')
    return listing.split('\n')
def _is_within(candidate, boundary):
    """Return True if absolute path *candidate* is *boundary* or lies below it."""
    if candidate == boundary:
        return True
    # Compare whole path components so that '/repo/app2' is not treated as
    # being inside '/repo/app'.
    prefix = boundary if boundary.endswith(os.sep) else boundary + os.sep
    return candidate.startswith(prefix)


def find_heroscript_callers(path, changes, start_path):
    """Find the ``.heroscript_caller`` files responsible for changed files.

    For every changed file, walk upwards from the file's directory and pick
    the first ``.heroscript_caller`` found, without ever leaving *start_path*.

    Args:
        path: Directory the entries of *changes* are relative to.
        changes: Iterable of changed file paths; empty entries are skipped.
        start_path: Upper boundary of the upward search (inclusive).

    Returns:
        A set of paths to the ``.heroscript_caller`` files that were found.
    """
    # Containment is decided on absolute, normalized paths, so a trailing
    # slash or a relative spelling of start_path does not affect the result.
    boundary = os.path.abspath(start_path)
    callers = set()
    for change in changes:
        if not change:
            continue
        current = os.path.dirname(os.path.join(path, change))
        while _is_within(os.path.abspath(current), boundary):
            marker = os.path.join(current, '.heroscript_caller')
            if os.path.exists(marker):
                callers.add(marker)
                break
            parent = os.path.dirname(current)
            if parent == current:
                # Reached the top of the path; stop instead of looping forever.
                break
            current = parent
    return callers
def find_all_heroscript_callers(path):
    """Collect every ``.heroscript_caller`` file in the tree rooted at *path*.

    Args:
        path: Directory to walk recursively.

    Returns:
        A set of paths to all ``.heroscript_caller`` files found. ``.git``
        directories are not searched.
    """
    callers = set()
    for root, dirs, files in os.walk(path):
        # Prune in place so os.walk does not descend into git's internal
        # storage, which can be large and never holds project callers.
        if '.git' in dirs:
            dirs.remove('.git')
        if '.heroscript_caller' in files:
            callers.add(os.path.join(root, '.heroscript_caller'))
    return callers
def read_heroscript_caller(file_path):
    """Read the entries listed in a ``.heroscript_caller`` file.

    Args:
        file_path: Path of the file to read; it is decoded as UTF-8.

    Returns:
        The file's non-blank lines, stripped of surrounding whitespace, with
        duplicates removed and first-occurrence order preserved.
    """
    # Explicit encoding: the default depends on the platform locale.
    with open(file_path, 'r', encoding='utf-8') as file:
        stripped = (line.strip() for line in file)
        # dict.fromkeys() de-duplicates while preserving insertion order.
        return list(dict.fromkeys(entry for entry in stripped if entry))
def main(start_path, reset=False):
    """Pull the repository containing *start_path* and run affected heroscripts.

    The last successfully processed commit is stored in the Redis hash
    ``git.lastcommit`` (localhost:6379, db 0) under the git root path. After
    ``git pull``:

    * if no commit is stored, every ``.heroscript_caller`` below *start_path*
      is processed;
    * if the stored commit equals the new ``HEAD``, nothing is done;
    * otherwise only the callers responsible for the changed files are
      processed.

    The stored commit is updated only when every hero command succeeded.

    Args:
        start_path: Directory to operate on; falls back to the current
            working directory when empty.
        reset: When true, forget the stored commit first, which forces all
            callers to be processed.

    Raises:
        subprocess.CalledProcessError: If ``git pull`` or ``git rev-parse``
            fails.
    """
    if not start_path:
        start_path = os.getcwd()
    # The helpers compare and walk paths textually, so give them an absolute,
    # normalized path to work with.
    start_path = os.path.abspath(start_path)

    git_root = find_git_root(start_path)
    if not git_root:
        print(f"Error: No git repository found in {start_path} or its parent directories.")
        return

    r = redis.Redis(host='localhost', port=6379, db=0)
    if reset:
        r.hdel('git.lastcommit', git_root)
        print(f"Reset Redis hash for {git_root}")

    # Perform git pull
    subprocess.run(['git', 'pull'], cwd=git_root, check=True)
    new_hash = get_git_hash(git_root)

    old_hash = r.hget('git.lastcommit', git_root)
    if isinstance(old_hash, bytes):
        old_hash = old_hash.decode('utf-8')

    callers = None
    if old_hash:
        if old_hash == new_hash:
            print("No changes detected.")
            return
        try:
            changes = get_changes(git_root, old_hash, new_hash)
        except subprocess.CalledProcessError as e:
            # The stored commit may no longer exist (e.g. after a force-push).
            # Without this fallback every run would fail until --reset.
            print(f"Could not list changes since {old_hash} ({e}); running all callers.")
        else:
            callers = find_heroscript_callers(git_root, changes, start_path)
    if callers is None:
        callers = find_all_heroscript_callers(start_path)

    all_succeeded = True
    # Sorted so callers are processed in a reproducible order.
    for caller in sorted(callers):
        for heroscripturl in read_heroscript_caller(caller):
            print(f"{heroscripturl}:{new_hash}")
            if not run_hero_command(heroscripturl):
                all_succeeded = False

    # Only record the commit when everything ran, so failures are retried on
    # the next poll.
    if all_succeeded:
        r.hset('git.lastcommit', git_root, new_hash)
def run_hero_command(url: str) -> bool:
    """Run ``hero run -u <url>`` and report whether it succeeded.

    The command is executed without a shell and *url* is passed as a single
    argument, so characters such as ``&``, ``;`` or ``$`` in the URL are not
    interpreted by a shell.

    Args:
        url: The heroscript URL handed to ``hero run -u``.

    Returns:
        ``True`` if the command exited with status 0; ``False`` if it exited
        non-zero or could not be started. Failure details are printed.
    """
    # Construct the command as an argv list (no shell involved)
    command = ['hero', 'run', '-u', url]
    try:
        # Run the command and capture output
        result = subprocess.run(command, check=True,
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                text=True)
    except subprocess.CalledProcessError as e:
        print(f"Error running command: {e}")
        print(f"Command output: {e.output}")
        print(f"Command stderr: {e.stderr}")
        return False
    except (OSError, ValueError) as e:
        # The process could not be started (e.g. `hero` is not installed or
        # an argument is invalid). These exceptions carry no captured output.
        print(f"An unexpected error occurred: {e}")
        return False
    print("Command Output (stdout):")
    print(result.stdout)
    return True
if __name__ == "__main__":
    # Command line: <path> [--reset]
    cli_args = sys.argv[1:]
    if len(cli_args) == 1:
        main(cli_args[0])
    elif len(cli_args) == 2 and cli_args[1] == '--reset':
        main(cli_args[0], reset=True)
    else:
        # Anything else is a usage error.
        print("Usage: python script.py <path> [--reset]")
        sys.exit(1)