131 lines
5.2 KiB
Python
131 lines
5.2 KiB
Python
import asyncio
|
|
import os
|
|
import csv
|
|
import time
|
|
import schedule
|
|
from playwright.async_api import async_playwright
|
|
from PIL import Image, ImageChops
|
|
|
|
# --- Konfiguration ---
|
|
# Runtime configuration, read once from the environment at import time.
# Path of the CSV file that lists the pages to capture.
URLS_FILE = os.getenv("URLS_FILE", "/app/urls.csv")
# Directory that receives the finished screenshots.
OUTPUT_DIR = os.getenv("OUTPUT_DIR", "/output")
# Fallback capture interval in minutes for rows without their own value.
DEFAULT_INTERVAL = int(os.getenv("INTERVAL_MINUTES", "60"))
# Fallback resize factor for rows without their own value (1.0 = unscaled).
DEFAULT_SCALE = float(os.getenv("SCALE", "1.0"))
|
|
|
|
# --- CSV laden ---
|
|
def load_urls():
    """Read the screenshot jobs from the CSV file at ``URLS_FILE``.

    The columns ``url``, ``filename``, ``scale``, ``selector``,
    ``element_width``, ``element_height`` and ``interval_minutes`` are read
    by header name. Rows without a ``url`` or a ``filename`` are skipped.
    Empty ``scale`` / ``interval_minutes`` cells fall back to
    ``DEFAULT_SCALE`` / ``DEFAULT_INTERVAL``; empty element sizes become
    ``None``.

    Returns:
        A list of dicts with the keys ``url``, ``filename``, ``scale``,
        ``selector``, ``element_width``, ``element_height`` and
        ``interval_minutes``. The list is empty when the file does not exist.

    Raises:
        ValueError: If a non-empty numeric cell cannot be converted.
    """
    if not os.path.exists(URLS_FILE):
        return []

    def cell(row, key):
        # DictReader fills fields missing from a short row with None, so a
        # bare .strip() on the looked-up value is not safe.
        value = row.get(key)
        return value.strip() if value else ""

    entries = []
    # "utf-8-sig" also reads plain UTF-8 but drops a leading BOM, which would
    # otherwise become part of the first header name and hide that column.
    with open(URLS_FILE, newline="", encoding="utf-8-sig") as f:
        for row in csv.DictReader(f):
            url = cell(row, "url")
            filename = cell(row, "filename")
            if not (url and filename):
                # Unusable row: skip it before converting its other cells.
                continue

            scale = cell(row, "scale")
            element_width = cell(row, "element_width")
            element_height = cell(row, "element_height")
            interval_minutes = cell(row, "interval_minutes")

            entries.append({
                "url": url,
                "filename": filename,
                "scale": float(scale) if scale else DEFAULT_SCALE,
                "selector": cell(row, "selector"),
                "element_width": int(element_width) if element_width else None,
                "element_height": int(element_height) if element_height else None,
                "interval_minutes": int(interval_minutes) if interval_minutes else DEFAULT_INTERVAL,
            })
    return entries
|
|
|
|
# --- Prüfen, ob Bild sich geändert hat ---
|
|
def images_different(path1, path2):
    """Return True when the image at *path1* differs from the one at *path2*.

    A missing *path2* counts as "different", so a first screenshot is always
    stored. Images of different pixel size are different by definition;
    otherwise the pixel data is compared after normalising both images to
    RGBA.

    Args:
        path1: Path of the newly captured image (must exist).
        path2: Path of the previously stored image (may be missing).

    Returns:
        bool: True if the images differ or *path2* does not exist.
    """
    if not os.path.exists(path2):
        return True

    # Context managers release the file handles that Image.open() keeps.
    with Image.open(path1) as new_img, Image.open(path2) as old_img:
        # ImageChops.difference() raises ValueError for images whose size or
        # mode do not match, e.g. when a page's height changed between runs.
        if new_img.size != old_img.size:
            return True
        # Comparing raw RGBA bytes includes the alpha channel and does not
        # depend on how getbbox() treats a difference image whose alpha is 0
        # (the case for two fully opaque RGBA screenshots).
        return new_img.convert("RGBA").tobytes() != old_img.convert("RGBA").tobytes()
|
|
|
|
# --- Screenshot aufnehmen ---
|
|
def _rescale_in_place(path, scale):
    """Resize the image file at *path* by the factor *scale*, overwriting it."""
    with Image.open(path) as img:
        # Clamp to 1px so a very small scale cannot yield a zero-sized target.
        new_size = (
            max(1, int(img.width * scale)),
            max(1, int(img.height * scale)),
        )
        # resize() returns an independent image, so the source handle can be
        # closed before the same path is written.
        resized = img.resize(new_size, Image.Resampling.LANCZOS)
    resized.save(path, optimize=True, quality=90)


async def capture_page(entry):
    """Capture one screenshot for *entry* and store it if it changed.

    The page is loaded in headless Chromium with a 1920x1080 viewport. If
    ``entry["selector"]`` matches an element, only that element is captured
    (optionally forced to ``element_width`` / ``element_height`` pixels via
    inline CSS); otherwise the full page is captured. The image is written to
    a temporary file in ``OUTPUT_DIR``, rescaled by ``entry["scale"]`` when
    that is not 1.0, and moved onto ``OUTPUT_DIR/filename`` only when
    ``images_different`` reports a change.

    Args:
        entry: Dict with the keys ``url``, ``filename`` and ``scale``, and
            optionally ``selector``, ``element_width`` and ``element_height``.

    Errors raised from page navigation onwards are printed, not re-raised.
    """
    url = entry["url"]
    filename = entry["filename"]
    scale = entry["scale"]
    selector = entry.get("selector")
    width = entry.get("element_width")
    height = entry.get("element_height")

    print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Screenshot {url} → {filename} (Selector: '{selector}')")

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context(viewport={"width": 1920, "height": 1080})
        page = await context.new_page()

        temp_path = None
        try:
            try:
                await page.goto(url, wait_until="networkidle", timeout=60000)

                # Playwright picks the image format from the file extension,
                # so the temporary file always gets a supported one.
                # NOTE(review): output_path keeps the original filename even
                # when its extension was replaced by ".png" here.
                base, ext = os.path.splitext(filename)
                if ext.lower() not in (".png", ".jpg", ".jpeg"):
                    ext = ".png"
                temp_path = os.path.join(OUTPUT_DIR, f"{base}.tmp{ext}")
                output_path = os.path.join(OUTPUT_DIR, filename)

                element = await page.query_selector(selector) if selector else None
                if element:
                    # Force the requested element size before capturing it.
                    if width or height:
                        js_width = f"{width}px" if width else "auto"
                        js_height = f"{height}px" if height else "auto"
                        await page.eval_on_selector(
                            selector,
                            f"(el) => {{ el.style.width = '{js_width}'; el.style.height = '{js_height}'; }}",
                        )
                    await element.screenshot(path=temp_path)
                else:
                    if selector:
                        print(f"❌ Selector '{selector}' nicht gefunden, ganze Seite wird genutzt")
                    await page.screenshot(path=temp_path, full_page=True)
            finally:
                # Close the browser exactly once, on success and on failure,
                # and before the image post-processing below.
                await browser.close()

            if abs(scale - 1.0) > 0.001:
                _rescale_in_place(temp_path, scale)

            # Only replace the stored file when the screenshot changed.
            if images_different(temp_path, output_path):
                os.replace(temp_path, output_path)
                print(f"→ {filename} gespeichert ({scale*100:.0f}% Größe).")
            else:
                os.remove(temp_path)
                print(f"→ {filename} unverändert, nicht gespeichert.")

        except Exception as e:
            print(f"❌ Fehler bei {url}: {e}")
        finally:
            # Best-effort cleanup so a failed run leaves no temp file behind;
            # after a successful run the file is already gone.
            if temp_path is not None:
                try:
                    os.remove(temp_path)
                except OSError:
                    pass
|
|
|
|
# --- Scheduler pro URL ---
|
|
def schedule_screenshots(entries):
    """Capture every entry once, then keep re-capturing on its interval.

    Each entry is captured immediately and registered with ``schedule`` to
    run every ``interval_minutes`` minutes (``DEFAULT_INTERVAL`` when the key
    is absent). The function then polls the scheduler once per second and
    never returns.

    Args:
        entries: Iterable of entry dicts as accepted by ``capture_page``.
    """
    def run_capture(item):
        # Every run gets its own event loop through asyncio.run().
        return asyncio.run(capture_page(item))

    for item in entries:
        # Initial capture right away, before the first interval elapses.
        run_capture(item)
        minutes = item.get("interval_minutes", DEFAULT_INTERVAL)
        schedule.every(minutes).minutes.do(run_capture, item)

    while True:
        schedule.run_pending()
        time.sleep(1)
|
|
|
|
if __name__ == "__main__":
    # Script entry point: make sure the output directory exists, load the
    # jobs from the CSV and hand them to the scheduler loop.
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    entries = load_urls()
    if entries:
        schedule_screenshots(entries)
    else:
        print("Keine Einträge in CSV gefunden!")
|