Session-Caching, Domain-/Gruppen-Skripte; KeePass-DB aus Repo entfernen
- inwx_common.py: Login mit Session-Cache, KP_PW-Option - inwx_list/add: Domain als Argument, gemeinsamer Helfer - inwx_domains(_owner).py: Domains alphabetisch, NS, Gruppen via citeq-TXT (DNS) - hosting.kdbx aus Tracking genommen und in .gitignore Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
+10
@@ -0,0 +1,10 @@
|
||||
# KeePass-Datenbank mit Zugangsdaten – gehört NICHT ins Repo
|
||||
hosting.kdbx
|
||||
|
||||
# Zwischengespeicherte INWX-Session (enthält ein kurzlebiges Session-Cookie)
|
||||
.inwx_session
|
||||
|
||||
# Python-venv und Cache
|
||||
venv/
|
||||
__pycache__/
|
||||
*.pyc
|
||||
@@ -0,0 +1,3 @@
|
||||
10 gruppe1
|
||||
20 gruppe2
|
||||
30 gruppe3
|
||||
Binary file not shown.
+14
-47
@@ -1,40 +1,17 @@
|
||||
#!/usr/bin/env python3
|
||||
import sys
|
||||
import os
|
||||
import subprocess
|
||||
import argparse
|
||||
|
||||
# Import-Fix für die Library-Struktur
|
||||
try:
|
||||
from INWX.Domrobot import ApiClient
|
||||
except ImportError:
|
||||
from inwx.Domrobot import ApiClient
|
||||
from inwx_common import get_client, call_api
|
||||
|
||||
# --- KONFIGURATION ---
|
||||
API_URL = 'https://api.domrobot.com'
|
||||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
KP_DB_PATH = os.path.join(BASE_DIR, "hosting.kdbx")
|
||||
KP_ENTRY_NAME = "inwx"
|
||||
DOMAIN = "ma151.de"
|
||||
# Login/Session-Handling steckt in inwx_common.py.
|
||||
|
||||
def get_creds():
|
||||
"""Holt die Daten sicher aus KeePassXC."""
|
||||
try:
|
||||
cmd = ["keepassxc-cli", "show", "-s", KP_DB_PATH, KP_ENTRY_NAME]
|
||||
output = subprocess.check_output(cmd, text=True)
|
||||
creds = {}
|
||||
for line in output.splitlines():
|
||||
if ":" in line:
|
||||
k, v = line.split(":", 1)
|
||||
creds[k.strip().lower()] = v.strip()
|
||||
return creds.get("username") or creds.get("benutzername"), creds.get("password") or creds.get("passwort")
|
||||
except Exception as e:
|
||||
print(f"❌ KeePass-Fehler: {e}")
|
||||
return None, None
|
||||
|
||||
def main():
|
||||
# 1. Argument-Parser einrichten
|
||||
parser = argparse.ArgumentParser(description=f"DNS-Record zu {DOMAIN} hinzufügen.")
|
||||
parser = argparse.ArgumentParser(description="DNS-Record zu einer Domain hinzufügen.")
|
||||
parser.add_argument("domain", help="Domain (z.B. ma151.de)")
|
||||
parser.add_argument("type", help="Typ des Records (z.B. A, AAAA, CNAME, TXT)")
|
||||
parser.add_argument("name", help="Name/Subdomain (z.B. srv1)")
|
||||
parser.add_argument("content", help="Wert des Records (z.B. die IP-Adresse)")
|
||||
@@ -42,27 +19,16 @@ def main():
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# 2. Zugangsdaten laden
|
||||
user, password = get_creds()
|
||||
if not user or not password:
|
||||
sys.exit(1)
|
||||
# 2. Client holen (nutzt ggf. die zwischengespeicherte Session)
|
||||
api = get_client()
|
||||
|
||||
api = ApiClient(api_url=API_URL)
|
||||
|
||||
# 3. Login
|
||||
login_res = api.login(user, password)
|
||||
if login_res['code'] != 1000:
|
||||
print(f"❌ Login fehlgeschlagen: {login_res['msg']}")
|
||||
sys.exit(1)
|
||||
|
||||
# 4. Prüfen, ob der Record existiert
|
||||
full_name = f"{args.name}.{DOMAIN}"
|
||||
# 3. Prüfen, ob der Record existiert
|
||||
full_name = f"{args.name}.{args.domain}"
|
||||
print(f"🔍 Prüfe {full_name} ({args.type} -> {args.content})...")
|
||||
|
||||
info_res = api.call_api('nameserver.info', {'domain': DOMAIN})
|
||||
info_res = call_api(api, 'nameserver.info', {'domain': args.domain})
|
||||
if info_res['code'] != 1000:
|
||||
print(f"❌ Fehler bei Domain-Info: {info_res['msg']}")
|
||||
api.logout()
|
||||
sys.exit(1)
|
||||
|
||||
records = info_res['resData'].get('record', [])
|
||||
@@ -77,10 +43,10 @@ def main():
|
||||
if already_exists:
|
||||
print(f"ℹ️ Record existiert bereits. Keine Aktion erforderlich.")
|
||||
else:
|
||||
# 5. Record erstellen
|
||||
# 4. Record erstellen
|
||||
print(f"🚀 Erstelle neuen Record...")
|
||||
create_res = api.call_api('nameserver.createRecord', {
|
||||
'domain': DOMAIN,
|
||||
create_res = call_api(api, 'nameserver.createRecord', {
|
||||
'domain': args.domain,
|
||||
'name': args.name,
|
||||
'type': args.type.upper(),
|
||||
'content': args.content,
|
||||
@@ -92,7 +58,8 @@ def main():
|
||||
else:
|
||||
print(f"❌ Fehler: {create_res['msg']} (Code: {create_res['code']})")
|
||||
|
||||
api.logout()
|
||||
# Kein logout() -> die Session bleibt gültig und kann wiederverwendet werden.
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
+159
@@ -0,0 +1,159 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Gemeinsame Helfer für die INWX-Skripte.
|
||||
|
||||
Kapselt das Laden der Zugangsdaten aus KeePassXC sowie den Login mit
|
||||
Session-Caching: Nach dem ersten Login wird das Session-Cookie von INWX
|
||||
lokal zwischengespeichert. Folgeaufrufe nutzen diese Session wieder und
|
||||
benötigen daher weder das KeePass-Master-Passwort noch einen neuen Login,
|
||||
solange die Session bei INWX noch gültig ist.
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
import requests
|
||||
|
||||
# Import-Fix für die Library-Struktur (Linux/Mac)
|
||||
try:
|
||||
from INWX.Domrobot import ApiClient
|
||||
except ImportError:
|
||||
from inwx.Domrobot import ApiClient
|
||||
|
||||
# --- KONFIGURATION ---
|
||||
API_URL = 'https://api.domrobot.com'
|
||||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
KP_DB_PATH = os.path.join(BASE_DIR, "hosting.kdbx")
|
||||
KP_ENTRY_NAME = "inwx"
|
||||
# Datei mit der zwischengespeicherten Session (enthält nur ein kurzlebiges
|
||||
# Session-Cookie, niemals das Passwort).
|
||||
SESSION_CACHE_PATH = os.path.join(BASE_DIR, ".inwx_session")
|
||||
|
||||
|
||||
def get_creds():
|
||||
"""Holt Benutzername und Passwort aus KeePassXC.
|
||||
|
||||
Das KeePass-Master-Passwort wird interaktiv abgefragt. Ist die
|
||||
Umgebungsvariable KP_PW gesetzt, wird deren Wert stattdessen automatisch
|
||||
an keepassxc-cli übergeben (praktisch in nicht-interaktiven Umgebungen).
|
||||
"""
|
||||
try:
|
||||
# -s sorgt dafür, dass das Passwort nicht als 'PROTECTED' ausgegeben wird
|
||||
cmd = ["keepassxc-cli", "show", "-s", KP_DB_PATH, KP_ENTRY_NAME]
|
||||
|
||||
master_pw = os.environ.get("KP_PW")
|
||||
if master_pw:
|
||||
# Master-Passwort über stdin an keepassxc-cli durchreichen.
|
||||
result = subprocess.run(
|
||||
cmd, input=master_pw + "\n",
|
||||
capture_output=True, text=True,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
print(f"❌ KeePass-Fehler: {result.stderr.strip()}")
|
||||
return None, None
|
||||
output = result.stdout
|
||||
else:
|
||||
# Kein KP_PW gesetzt -> keepassxc-cli fragt interaktiv nach.
|
||||
output = subprocess.check_output(cmd, text=True)
|
||||
|
||||
creds = {}
|
||||
for line in output.splitlines():
|
||||
if ":" in line:
|
||||
k, v = line.split(":", 1)
|
||||
creds[k.strip().lower()] = v.strip()
|
||||
|
||||
user = creds.get("username") or creds.get("benutzername") or creds.get("user")
|
||||
password = creds.get("password") or creds.get("passwort")
|
||||
return user, password
|
||||
except Exception as e:
|
||||
print(f"❌ KeePass-Fehler: {e}")
|
||||
return None, None
|
||||
|
||||
|
||||
def _load_cached_cookies(api):
|
||||
"""Lädt gespeicherte Session-Cookies in den ApiClient. True bei Erfolg."""
|
||||
if not os.path.exists(SESSION_CACHE_PATH):
|
||||
return False
|
||||
try:
|
||||
with open(SESSION_CACHE_PATH, "r") as f:
|
||||
cookies = json.load(f)
|
||||
if not cookies:
|
||||
return False
|
||||
api.api_session.cookies = requests.utils.cookiejar_from_dict(cookies)
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _save_cached_cookies(api):
|
||||
"""Speichert die Session-Cookies (Datei nur für den Benutzer lesbar, 0600)."""
|
||||
try:
|
||||
cookies = requests.utils.dict_from_cookiejar(api.api_session.cookies)
|
||||
fd = os.open(SESSION_CACHE_PATH, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
|
||||
with os.fdopen(fd, "w") as f:
|
||||
json.dump(cookies, f)
|
||||
except Exception as e:
|
||||
print(f"⚠️ Konnte Session nicht zwischenspeichern: {e}")
|
||||
|
||||
|
||||
def _clear_cached_session():
|
||||
"""Entfernt eine (abgelaufene) zwischengespeicherte Session."""
|
||||
try:
|
||||
os.remove(SESSION_CACHE_PATH)
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def _do_login(api):
|
||||
"""Frischer Login via KeePass-Zugangsdaten; speichert die Session. True bei Erfolg."""
|
||||
user, password = get_creds()
|
||||
if not user or not password:
|
||||
return False
|
||||
|
||||
print(f"Versuche Login für: {user}")
|
||||
login_res = api.login(user, password)
|
||||
if login_res['code'] != 1000:
|
||||
print(f"❌ Login fehlgeschlagen: {login_res['msg']} (Code: {login_res['code']})")
|
||||
return False
|
||||
|
||||
_save_cached_cookies(api)
|
||||
print("✅ Login erfolgreich (Session zwischengespeichert).")
|
||||
return True
|
||||
|
||||
|
||||
def get_client():
|
||||
"""Liefert einen einsatzbereiten ApiClient.
|
||||
|
||||
Nutzt eine zwischengespeicherte Session, falls vorhanden. Ob diese noch
|
||||
gültig ist, wird erst beim ersten echten Aufruf via call_api() geprüft –
|
||||
so sparen wir uns eine zusätzliche Anfrage. Ist keine Session vorhanden,
|
||||
wird sofort via KeePass eingeloggt.
|
||||
"""
|
||||
api = ApiClient(api_url=API_URL)
|
||||
if _load_cached_cookies(api):
|
||||
print("ℹ️ Verwende zwischengespeicherte Session.")
|
||||
return api
|
||||
if not _do_login(api):
|
||||
sys.exit(1)
|
||||
return api
|
||||
|
||||
|
||||
def call_api(api, method, params=None):
|
||||
"""Wie ApiClient.call_api, meldet sich aber bei abgelaufener Session neu an.
|
||||
|
||||
Liefert INWX einen Autorisierungsfehler (Code 2200–2299), wird die
|
||||
zwischengespeicherte Session verworfen, via KeePass neu eingeloggt und
|
||||
der Aufruf einmal wiederholt.
|
||||
"""
|
||||
params = params or {}
|
||||
res = api.call_api(method, params)
|
||||
code = res.get('code')
|
||||
if code is not None and 2200 <= code < 2300:
|
||||
print("ℹ️ Session abgelaufen, melde neu an...")
|
||||
_clear_cached_session()
|
||||
if not _do_login(api):
|
||||
sys.exit(1)
|
||||
res = api.call_api(method, params)
|
||||
return res
|
||||
+156
@@ -0,0 +1,156 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Listet alle für den Benutzer sichtbaren Domains alphabetisch sortiert auf."""
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
from inwx_common import get_client, call_api
|
||||
|
||||
PAGE_LIMIT = 100 # Domains pro API-Seite
|
||||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
OWNER_FILE = os.path.join(BASE_DIR, "domain_owner")
|
||||
TXT_PREFIX = "citeq:" # Präfix der Gruppen-TXT-Einträge
|
||||
|
||||
|
||||
def fetch_all_domains(api):
|
||||
"""Holt alle Domains über alle Seiten hinweg (domain.list ist paginiert)."""
|
||||
domains = []
|
||||
page = 1
|
||||
while True:
|
||||
res = call_api(api, 'domain.list', {'page': page, 'pageLimit': PAGE_LIMIT})
|
||||
if res['code'] != 1000:
|
||||
print(f"❌ API Fehler: {res['msg']} (Code: {res['code']})")
|
||||
sys.exit(1)
|
||||
|
||||
data = res['resData']
|
||||
domains.extend(data.get('domain', []))
|
||||
|
||||
# Solange wir noch nicht alle laut 'count' geladen haben, weiterblättern.
|
||||
if len(domains) >= data.get('count', 0) or not data.get('domain'):
|
||||
break
|
||||
page += 1
|
||||
|
||||
return domains
|
||||
|
||||
|
||||
def get_nameservers(api, domain):
|
||||
"""Ermittelt die zuständigen Nameserver (Registry-Delegation) einer Domain.
|
||||
|
||||
Nutzt domain.info -> Feld 'ns', das die delegierten Nameserver enthält und
|
||||
auch dann funktioniert, wenn die DNS-Zone nicht bei INWX gehostet wird.
|
||||
Liefert eine sortierte Liste der NS-Hostnamen oder None bei Fehler.
|
||||
"""
|
||||
res = call_api(api, 'domain.info', {'domain': domain})
|
||||
if res['code'] != 1000:
|
||||
return None
|
||||
|
||||
ns = res['resData'].get('ns') or []
|
||||
return sorted(n.rstrip('.') for n in ns)
|
||||
|
||||
|
||||
def load_owners():
|
||||
"""Liest die Zuordnung Ziffer -> Klarname aus der Datei domain_owner.
|
||||
|
||||
Format pro Zeile: <ziffer><Whitespace><klarname>, wobei als Trenner
|
||||
beliebiger Whitespace dient (ein oder mehrere Tabs/Leerzeichen). Der
|
||||
Klarname darf selbst Leerzeichen enthalten.
|
||||
Leere Zeilen und Kommentarzeilen (#) werden ignoriert.
|
||||
"""
|
||||
owners = {}
|
||||
if not os.path.exists(OWNER_FILE):
|
||||
print(f"⚠️ Datei {OWNER_FILE} nicht gefunden – Gruppen werden ohne Klarnamen angezeigt.")
|
||||
return owners
|
||||
try:
|
||||
with open(OWNER_FILE, "r") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
parts = line.split(None, 1) # an erstem Whitespace (Tab/Space) trennen
|
||||
if len(parts) == 2:
|
||||
ziffer, klarname = parts
|
||||
owners[ziffer.strip()] = klarname.strip()
|
||||
except Exception as e:
|
||||
print(f"⚠️ Konnte {OWNER_FILE} nicht lesen: {e}")
|
||||
return owners
|
||||
|
||||
|
||||
def get_group(domain):
|
||||
"""Ermittelt die Gruppen-Ziffer aus dem citeq:-TXT-Eintrag einer Domain.
|
||||
|
||||
Nutzt eine echte DNS-Abfrage (dig +short TXT) und funktioniert daher
|
||||
unabhängig davon, wo die DNS-Zone gehostet wird. Liefert die Ziffer als
|
||||
String oder None, wenn kein passender TXT-Eintrag gefunden wird.
|
||||
"""
|
||||
try:
|
||||
out = subprocess.run(
|
||||
["dig", "+short", "TXT", domain],
|
||||
capture_output=True, text=True, timeout=10,
|
||||
)
|
||||
except FileNotFoundError:
|
||||
print("⚠️ 'dig' nicht gefunden – Gruppen können nicht ermittelt werden.")
|
||||
return None
|
||||
except subprocess.TimeoutExpired:
|
||||
return None
|
||||
|
||||
if out.returncode != 0:
|
||||
return None
|
||||
|
||||
for line in out.stdout.splitlines():
|
||||
# dig liefert TXT-Werte in Anführungszeichen, lange Werte ggf. in
|
||||
# mehreren Teilen ("teil1" "teil2") -> zusammenfügen und entkleiden.
|
||||
content = line.replace('" "', '').strip().strip('"')
|
||||
if content.startswith(TXT_PREFIX):
|
||||
return content[len(TXT_PREFIX):].strip()
|
||||
return None
|
||||
|
||||
|
||||
def main():
|
||||
api = get_client()
|
||||
|
||||
print("Rufe Domain-Liste ab...")
|
||||
domains = fetch_all_domains(api)
|
||||
|
||||
if not domains:
|
||||
print("ℹ️ Keine Domains gefunden.")
|
||||
return
|
||||
|
||||
# Zuordnung Ziffer -> Klarname laden
|
||||
owners = load_owners()
|
||||
|
||||
# Alphabetisch sortieren (Groß-/Kleinschreibung ignorieren)
|
||||
domains.sort(key=lambda d: d['domain'].lower())
|
||||
|
||||
print(f"\n{'DOMAIN':<35} {'STATUS':<8} {'GRUPPE':<18} {'NAMESERVER'}")
|
||||
print("-" * 100)
|
||||
for d in domains:
|
||||
name = d.get('domain', '')
|
||||
status = d.get('status', '')
|
||||
|
||||
# Gruppen-Ziffer aus citeq:-TXT (per DNS) ermitteln und Klarnamen zuordnen
|
||||
ziffer = get_group(name)
|
||||
if ziffer is None:
|
||||
group = "-"
|
||||
elif ziffer in owners:
|
||||
group = f"{ziffer} ({owners[ziffer]})"
|
||||
else:
|
||||
group = f"{ziffer} (?)"
|
||||
|
||||
# Zuständige Nameserver pro Domain ermitteln
|
||||
ns_list = get_nameservers(api, name)
|
||||
if ns_list is None:
|
||||
ns = "(nicht abrufbar)"
|
||||
elif not ns_list:
|
||||
ns = "(keine NS-Einträge gefunden)"
|
||||
else:
|
||||
ns = ", ".join(ns_list)
|
||||
|
||||
print(f"{name:<35} {status:<8} {group:<18} {ns}")
|
||||
|
||||
print(f"\nGesamt: {len(domains)} Domain(s)")
|
||||
|
||||
# Kein logout() -> die Session bleibt gültig und kann wiederverwendet werden.
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,181 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Listet die Domains gruppiert nach den Einträgen in domain_owner auf.
|
||||
|
||||
Reihenfolge richtet sich nach der Datei domain_owner: zuerst der 1. Eintrag
|
||||
(Ziffer + Klarname), darunter alle Domains mit dieser Ziffer, dann eine
|
||||
Leerzeile, dann der 2. Eintrag usw. Domains ohne passende Zuordnung werden
|
||||
am Ende separat aufgeführt.
|
||||
"""
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
from inwx_common import get_client, call_api
|
||||
|
||||
PAGE_LIMIT = 100 # Domains pro API-Seite
|
||||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
OWNER_FILE = os.path.join(BASE_DIR, "domain_owner")
|
||||
TXT_PREFIX = "citeq:" # Präfix der Gruppen-TXT-Einträge
|
||||
|
||||
|
||||
def fetch_all_domains(api):
|
||||
"""Holt alle Domains über alle Seiten hinweg (domain.list ist paginiert)."""
|
||||
domains = []
|
||||
page = 1
|
||||
while True:
|
||||
res = call_api(api, 'domain.list', {'page': page, 'pageLimit': PAGE_LIMIT})
|
||||
if res['code'] != 1000:
|
||||
print(f"❌ API Fehler: {res['msg']} (Code: {res['code']})")
|
||||
sys.exit(1)
|
||||
|
||||
data = res['resData']
|
||||
domains.extend(data.get('domain', []))
|
||||
|
||||
# Solange wir noch nicht alle laut 'count' geladen haben, weiterblättern.
|
||||
if len(domains) >= data.get('count', 0) or not data.get('domain'):
|
||||
break
|
||||
page += 1
|
||||
|
||||
return domains
|
||||
|
||||
|
||||
def get_nameservers(api, domain):
|
||||
"""Ermittelt die zuständigen Nameserver (Registry-Delegation) einer Domain.
|
||||
|
||||
Nutzt domain.info -> Feld 'ns', das die delegierten Nameserver enthält und
|
||||
auch dann funktioniert, wenn die DNS-Zone nicht bei INWX gehostet wird.
|
||||
Liefert eine sortierte Liste der NS-Hostnamen oder None bei Fehler.
|
||||
"""
|
||||
res = call_api(api, 'domain.info', {'domain': domain})
|
||||
if res['code'] != 1000:
|
||||
return None
|
||||
|
||||
ns = res['resData'].get('ns') or []
|
||||
return sorted(n.rstrip('.') for n in ns)
|
||||
|
||||
|
||||
def load_owners():
|
||||
"""Liest die Zuordnung Ziffer -> Klarname aus der Datei domain_owner.
|
||||
|
||||
Die Reihenfolge der Einträge aus der Datei bleibt erhalten (dict behält
|
||||
seit Python 3.7 die Einfügereihenfolge). Format pro Zeile:
|
||||
<ziffer><Whitespace><klarname>, Trenner ist beliebiger Whitespace (ein
|
||||
oder mehrere Tabs/Leerzeichen); der Klarname darf Leerzeichen enthalten.
|
||||
Leere Zeilen und Kommentarzeilen (#) werden ignoriert.
|
||||
"""
|
||||
owners = {}
|
||||
if not os.path.exists(OWNER_FILE):
|
||||
print(f"⚠️ Datei {OWNER_FILE} nicht gefunden – keine Gruppierung möglich.")
|
||||
return owners
|
||||
try:
|
||||
with open(OWNER_FILE, "r") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
parts = line.split(None, 1) # an erstem Whitespace trennen
|
||||
if len(parts) == 2:
|
||||
ziffer, klarname = parts
|
||||
owners[ziffer.strip()] = klarname.strip()
|
||||
except Exception as e:
|
||||
print(f"⚠️ Konnte {OWNER_FILE} nicht lesen: {e}")
|
||||
return owners
|
||||
|
||||
|
||||
def get_group(domain):
|
||||
"""Ermittelt die Gruppen-Ziffer aus dem citeq:-TXT-Eintrag einer Domain.
|
||||
|
||||
Nutzt eine echte DNS-Abfrage (dig +short TXT) und funktioniert daher
|
||||
unabhängig davon, wo die DNS-Zone gehostet wird. Liefert die Ziffer als
|
||||
String oder None, wenn kein passender TXT-Eintrag gefunden wird.
|
||||
"""
|
||||
try:
|
||||
out = subprocess.run(
|
||||
["dig", "+short", "TXT", domain],
|
||||
capture_output=True, text=True, timeout=10,
|
||||
)
|
||||
except FileNotFoundError:
|
||||
print("⚠️ 'dig' nicht gefunden – Gruppen können nicht ermittelt werden.")
|
||||
return None
|
||||
except subprocess.TimeoutExpired:
|
||||
return None
|
||||
|
||||
if out.returncode != 0:
|
||||
return None
|
||||
|
||||
for line in out.stdout.splitlines():
|
||||
# dig liefert TXT-Werte in Anführungszeichen, lange Werte ggf. in
|
||||
# mehreren Teilen ("teil1" "teil2") -> zusammenfügen und entkleiden.
|
||||
content = line.replace('" "', '').strip().strip('"')
|
||||
if content.startswith(TXT_PREFIX):
|
||||
return content[len(TXT_PREFIX):].strip()
|
||||
return None
|
||||
|
||||
|
||||
def format_domain_line(rec):
|
||||
"""Einzeilige Darstellung einer Domain (eingerückt unter der Gruppe)."""
|
||||
if rec['ns'] is None:
|
||||
ns = "(nicht abrufbar)"
|
||||
elif not rec['ns']:
|
||||
ns = "(keine NS-Einträge gefunden)"
|
||||
else:
|
||||
ns = ", ".join(rec['ns'])
|
||||
return f" {rec['name']:<35} {rec['status']:<8} {ns}"
|
||||
|
||||
|
||||
def main():
|
||||
api = get_client()
|
||||
|
||||
print("Rufe Domain-Liste ab...")
|
||||
domains = fetch_all_domains(api)
|
||||
|
||||
if not domains:
|
||||
print("ℹ️ Keine Domains gefunden.")
|
||||
return
|
||||
|
||||
# Zuordnung Ziffer -> Klarname laden (in Dateireihenfolge)
|
||||
owners = load_owners()
|
||||
|
||||
# Pro Domain einmal Ziffer (DNS) und Nameserver (API) ermitteln
|
||||
records = []
|
||||
for d in domains:
|
||||
name = d.get('domain', '')
|
||||
records.append({
|
||||
'name': name,
|
||||
'status': d.get('status', ''),
|
||||
'ziffer': get_group(name),
|
||||
'ns': get_nameservers(api, name),
|
||||
})
|
||||
|
||||
# Alphabetisch sortieren (Groß-/Kleinschreibung ignorieren)
|
||||
records.sort(key=lambda r: r['name'].lower())
|
||||
|
||||
# Ausgabe in der Reihenfolge der Einträge aus domain_owner
|
||||
print()
|
||||
for ziffer, klarname in owners.items():
|
||||
print(f"{ziffer} ({klarname})")
|
||||
matching = [r for r in records if r['ziffer'] == ziffer]
|
||||
if matching:
|
||||
for rec in matching:
|
||||
print(format_domain_line(rec))
|
||||
else:
|
||||
print(" (keine Domains)")
|
||||
print() # Leerzeile zwischen den Gruppen
|
||||
|
||||
# Domains, die keiner Ziffer aus domain_owner zugeordnet sind
|
||||
rest = [r for r in records if r['ziffer'] not in owners]
|
||||
if rest:
|
||||
print("Ohne Zuordnung")
|
||||
for rec in rest:
|
||||
ziffer = rec['ziffer']
|
||||
hinweis = "" if ziffer is None else f" [citeq:{ziffer} unbekannt]"
|
||||
print(format_domain_line(rec) + hinweis)
|
||||
print()
|
||||
|
||||
print(f"Gesamt: {len(records)} Domain(s)")
|
||||
|
||||
# Kein logout() -> die Session bleibt gültig und kann wiederverwendet werden.
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
+25
-59
@@ -1,78 +1,44 @@
|
||||
#!/usr/bin/env python3
|
||||
import sys
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
# Import-Fix für Linux/Mac
|
||||
try:
|
||||
from INWX.Domrobot import ApiClient
|
||||
except ImportError:
|
||||
from inwx.Domrobot import ApiClient
|
||||
from inwx_common import get_client, call_api
|
||||
|
||||
# --- KONFIGURATION ---
|
||||
API_URL = 'https://api.domrobot.com'
|
||||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
KP_DB_PATH = os.path.join(BASE_DIR, "hosting.kdbx")
|
||||
KP_ENTRY_NAME = "inwx"
|
||||
DOMAIN = "ma151.de"
|
||||
# Login/Session-Handling steckt in inwx_common.py.
|
||||
|
||||
def get_creds():
|
||||
"""Holt die echten Daten (inkl. Passwort) aus KeePassXC."""
|
||||
try:
|
||||
# -s sorgt dafür, dass das Passwort nicht als 'PROTECTED' ausgegeben wird
|
||||
cmd = ["keepassxc-cli", "show", "-s", KP_DB_PATH, KP_ENTRY_NAME]
|
||||
output = subprocess.check_output(cmd, text=True)
|
||||
|
||||
creds = {}
|
||||
for line in output.splitlines():
|
||||
if ":" in line:
|
||||
k, v = line.split(":", 1)
|
||||
creds[k.strip().lower()] = v.strip()
|
||||
|
||||
user = creds.get("username") or creds.get("benutzername") or creds.get("user")
|
||||
password = creds.get("password") or creds.get("passwort")
|
||||
|
||||
return user, password
|
||||
except Exception as e:
|
||||
print(f"❌ KeePass-Fehler: {e}")
|
||||
return None, None
|
||||
|
||||
def main():
|
||||
user, password = get_creds()
|
||||
if not user or not password:
|
||||
# Domain als Argument einlesen
|
||||
if len(sys.argv) < 2:
|
||||
script = os.path.basename(sys.argv[0])
|
||||
print(f"Verwendung: {script} <domain>")
|
||||
print(f"Beispiel: {script} ma151.de")
|
||||
sys.exit(1)
|
||||
|
||||
# ApiClient mit der korrekten URL (Library fügt /xmlrpc/ an)
|
||||
api = ApiClient(api_url=API_URL)
|
||||
domain = sys.argv[1]
|
||||
|
||||
print(f"Versuche Login für: {user}")
|
||||
# Holt einen einsatzbereiten Client (nutzt ggf. die zwischengespeicherte Session)
|
||||
api = get_client()
|
||||
|
||||
# Login
|
||||
login_res = api.login(user, password)
|
||||
print(f"Rufe Records für {domain} ab...")
|
||||
try:
|
||||
res = call_api(api, 'nameserver.info', {'domain': domain})
|
||||
|
||||
if login_res['code'] == 1000:
|
||||
print("✅ Login erfolgreich!")
|
||||
print(f"Rufe Records für {DOMAIN} ab...")
|
||||
if res['code'] == 1000:
|
||||
records = res['resData']['record']
|
||||
print(f"\n{'TYP':<8} {'NAME':<35} {'WERT'}")
|
||||
print("-" * 75)
|
||||
for rec in records:
|
||||
print(f"{rec['type']:<8} {rec['name']:<35} {rec['content']}")
|
||||
else:
|
||||
print(f"❌ API Fehler: {res['msg']} (Code: {res['code']})")
|
||||
|
||||
try:
|
||||
# Deine Library-Version nutzt laut DEBUG 'call_api'
|
||||
res = api.call_api('nameserver.info', {'domain': DOMAIN})
|
||||
except Exception as e:
|
||||
print(f"❌ Fehler beim Abruf mit call_api: {e}")
|
||||
|
||||
if res['code'] == 1000:
|
||||
records = res['resData']['record']
|
||||
print(f"\n{'TYP':<8} {'NAME':<35} {'WERT'}")
|
||||
print("-" * 75)
|
||||
for rec in records:
|
||||
print(f"{rec['type']:<8} {rec['name']:<35} {rec['content']}")
|
||||
else:
|
||||
print(f"❌ API Fehler: {res['msg']} (Code: {res['code']})")
|
||||
# Kein logout() -> die Session bleibt gültig und kann wiederverwendet werden.
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Fehler beim Abruf mit call_api: {e}")
|
||||
|
||||
api.logout()
|
||||
else:
|
||||
print(f"❌ Login fehlgeschlagen: {login_res['msg']} (Code: {login_res['code']})")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user