e31bdeb59d
- inwx_common.py: Login mit Session-Cache, KP_PW-Option - inwx_list/add: Domain als Argument, gemeinsamer Helfer - inwx_domains(_owner).py: Domains alphabetisch, NS, Gruppen via citeq-TXT (DNS) - hosting.kdbx aus Tracking genommen und in .gitignore Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
160 lines
5.3 KiB
Python
160 lines
5.3 KiB
Python
#!/usr/bin/env python3
|
||
"""Gemeinsame Helfer für die INWX-Skripte.
|
||
|
||
Kapselt das Laden der Zugangsdaten aus KeePassXC sowie den Login mit
|
||
Session-Caching: Nach dem ersten Login wird das Session-Cookie von INWX
|
||
lokal zwischengespeichert. Folgeaufrufe nutzen diese Session wieder und
|
||
benötigen daher weder das KeePass-Master-Passwort noch einen neuen Login,
|
||
solange die Session bei INWX noch gültig ist.
|
||
"""
|
||
import json
|
||
import os
|
||
import subprocess
|
||
import sys
|
||
|
||
import requests
|
||
|
||
# Import-Fix für die Library-Struktur (Linux/Mac)
|
||
try:
|
||
from INWX.Domrobot import ApiClient
|
||
except ImportError:
|
||
from inwx.Domrobot import ApiClient
|
||
|
||
# --- KONFIGURATION ---
|
||
API_URL = 'https://api.domrobot.com'
|
||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||
KP_DB_PATH = os.path.join(BASE_DIR, "hosting.kdbx")
|
||
KP_ENTRY_NAME = "inwx"
|
||
# Datei mit der zwischengespeicherten Session (enthält nur ein kurzlebiges
|
||
# Session-Cookie, niemals das Passwort).
|
||
SESSION_CACHE_PATH = os.path.join(BASE_DIR, ".inwx_session")
|
||
|
||
|
||
def get_creds():
|
||
"""Holt Benutzername und Passwort aus KeePassXC.
|
||
|
||
Das KeePass-Master-Passwort wird interaktiv abgefragt. Ist die
|
||
Umgebungsvariable KP_PW gesetzt, wird deren Wert stattdessen automatisch
|
||
an keepassxc-cli übergeben (praktisch in nicht-interaktiven Umgebungen).
|
||
"""
|
||
try:
|
||
# -s sorgt dafür, dass das Passwort nicht als 'PROTECTED' ausgegeben wird
|
||
cmd = ["keepassxc-cli", "show", "-s", KP_DB_PATH, KP_ENTRY_NAME]
|
||
|
||
master_pw = os.environ.get("KP_PW")
|
||
if master_pw:
|
||
# Master-Passwort über stdin an keepassxc-cli durchreichen.
|
||
result = subprocess.run(
|
||
cmd, input=master_pw + "\n",
|
||
capture_output=True, text=True,
|
||
)
|
||
if result.returncode != 0:
|
||
print(f"❌ KeePass-Fehler: {result.stderr.strip()}")
|
||
return None, None
|
||
output = result.stdout
|
||
else:
|
||
# Kein KP_PW gesetzt -> keepassxc-cli fragt interaktiv nach.
|
||
output = subprocess.check_output(cmd, text=True)
|
||
|
||
creds = {}
|
||
for line in output.splitlines():
|
||
if ":" in line:
|
||
k, v = line.split(":", 1)
|
||
creds[k.strip().lower()] = v.strip()
|
||
|
||
user = creds.get("username") or creds.get("benutzername") or creds.get("user")
|
||
password = creds.get("password") or creds.get("passwort")
|
||
return user, password
|
||
except Exception as e:
|
||
print(f"❌ KeePass-Fehler: {e}")
|
||
return None, None
|
||
|
||
|
||
def _load_cached_cookies(api):
|
||
"""Lädt gespeicherte Session-Cookies in den ApiClient. True bei Erfolg."""
|
||
if not os.path.exists(SESSION_CACHE_PATH):
|
||
return False
|
||
try:
|
||
with open(SESSION_CACHE_PATH, "r") as f:
|
||
cookies = json.load(f)
|
||
if not cookies:
|
||
return False
|
||
api.api_session.cookies = requests.utils.cookiejar_from_dict(cookies)
|
||
return True
|
||
except Exception:
|
||
return False
|
||
|
||
|
||
def _save_cached_cookies(api):
|
||
"""Speichert die Session-Cookies (Datei nur für den Benutzer lesbar, 0600)."""
|
||
try:
|
||
cookies = requests.utils.dict_from_cookiejar(api.api_session.cookies)
|
||
fd = os.open(SESSION_CACHE_PATH, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
|
||
with os.fdopen(fd, "w") as f:
|
||
json.dump(cookies, f)
|
||
except Exception as e:
|
||
print(f"⚠️ Konnte Session nicht zwischenspeichern: {e}")
|
||
|
||
|
||
def _clear_cached_session():
|
||
"""Entfernt eine (abgelaufene) zwischengespeicherte Session."""
|
||
try:
|
||
os.remove(SESSION_CACHE_PATH)
|
||
except FileNotFoundError:
|
||
pass
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
def _do_login(api):
|
||
"""Frischer Login via KeePass-Zugangsdaten; speichert die Session. True bei Erfolg."""
|
||
user, password = get_creds()
|
||
if not user or not password:
|
||
return False
|
||
|
||
print(f"Versuche Login für: {user}")
|
||
login_res = api.login(user, password)
|
||
if login_res['code'] != 1000:
|
||
print(f"❌ Login fehlgeschlagen: {login_res['msg']} (Code: {login_res['code']})")
|
||
return False
|
||
|
||
_save_cached_cookies(api)
|
||
print("✅ Login erfolgreich (Session zwischengespeichert).")
|
||
return True
|
||
|
||
|
||
def get_client():
|
||
"""Liefert einen einsatzbereiten ApiClient.
|
||
|
||
Nutzt eine zwischengespeicherte Session, falls vorhanden. Ob diese noch
|
||
gültig ist, wird erst beim ersten echten Aufruf via call_api() geprüft –
|
||
so sparen wir uns eine zusätzliche Anfrage. Ist keine Session vorhanden,
|
||
wird sofort via KeePass eingeloggt.
|
||
"""
|
||
api = ApiClient(api_url=API_URL)
|
||
if _load_cached_cookies(api):
|
||
print("ℹ️ Verwende zwischengespeicherte Session.")
|
||
return api
|
||
if not _do_login(api):
|
||
sys.exit(1)
|
||
return api
|
||
|
||
|
||
def call_api(api, method, params=None):
|
||
"""Wie ApiClient.call_api, meldet sich aber bei abgelaufener Session neu an.
|
||
|
||
Liefert INWX einen Autorisierungsfehler (Code 2200–2299), wird die
|
||
zwischengespeicherte Session verworfen, via KeePass neu eingeloggt und
|
||
der Aufruf einmal wiederholt.
|
||
"""
|
||
params = params or {}
|
||
res = api.call_api(method, params)
|
||
code = res.get('code')
|
||
if code is not None and 2200 <= code < 2300:
|
||
print("ℹ️ Session abgelaufen, melde neu an...")
|
||
_clear_cached_session()
|
||
if not _do_login(api):
|
||
sys.exit(1)
|
||
res = api.call_api(method, params)
|
||
return res
|