22995b68f4
Ermittelt Zonendatei aus named.conf, fügt Record hinzu, erhöht SOA-Serial, validiert mit named-checkzone und lädt per rndc reload neu. Läuft als root. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
181 lines
6.0 KiB
Python
181 lines
6.0 KiB
Python
#!/usr/bin/env python3
|
||
"""Legt einen DNS-Record in einer lokalen BIND9-Master-Zone an.
|
||
|
||
Ablauf: Zonendatei aus named.conf.local ermitteln -> Record (falls noch nicht
|
||
vorhanden) anhängen -> SOA-Serial erhöhen -> mit named-checkzone validieren ->
|
||
mit rndc reload neu laden. Bei DNSSEC-Zonen (inline-signing) signiert BIND nach
|
||
dem Reload automatisch neu.
|
||
|
||
Muss als root laufen (Zonendatei schreiben + rndc): sudo python3 bind9_add_record.py ...
|
||
"""
|
||
import argparse
|
||
import datetime
|
||
import os
|
||
import re
|
||
import shutil
|
||
import subprocess
|
||
import sys
|
||
|
||
# --- KONFIGURATION ---
|
||
NAMED_CONF_LOCAL = "/etc/bind/named.conf.local"
|
||
NAMED_CONF_OPTIONS = "/etc/bind/named.conf.options"
|
||
RNDC = "/usr/sbin/rndc"
|
||
NAMED_CHECKZONE = "/usr/bin/named-checkzone"
|
||
|
||
|
||
def read_file(path):
|
||
with open(path, "r") as f:
|
||
return f.read()
|
||
|
||
|
||
def get_directory():
|
||
"""Liest options { directory "..."; } aus named.conf.options."""
|
||
text = read_file(NAMED_CONF_OPTIONS)
|
||
m = re.search(r'directory\s+"([^"]+)"', text)
|
||
return m.group(1) if m else "."
|
||
|
||
|
||
def get_zone_file(zone):
|
||
"""Ermittelt den Pfad der Zonendatei für eine Zone aus named.conf.local."""
|
||
text = read_file(NAMED_CONF_LOCAL)
|
||
# Beginn des zone-Blocks finden: zone "<zone>" [in] {
|
||
head = re.search(r'zone\s+"' + re.escape(zone) + r'"\s*(?:in\s+)?\{', text, re.I)
|
||
if not head:
|
||
sys.exit(f"❌ Zone '{zone}' nicht in {NAMED_CONF_LOCAL} gefunden.")
|
||
# Passende schließende Klammer per Klammerzählung suchen
|
||
# (der Block enthält verschachtelte { } wie bei allow-transfer { ... }).
|
||
depth, i = 0, head.end() - 1
|
||
while i < len(text):
|
||
if text[i] == "{":
|
||
depth += 1
|
||
elif text[i] == "}":
|
||
depth -= 1
|
||
if depth == 0:
|
||
break
|
||
i += 1
|
||
block = text[head.end():i]
|
||
fm = re.search(r'file\s+"([^"]+)"', block)
|
||
if not fm:
|
||
sys.exit(f"❌ Keine 'file'-Angabe für Zone '{zone}' gefunden.")
|
||
path = fm.group(1)
|
||
if not os.path.isabs(path):
|
||
path = os.path.join(get_directory(), path)
|
||
return path
|
||
|
||
|
||
def bump_serial(text):
|
||
"""Erhöht die SOA-Serial (Format YYYYMMDDnn). Liefert (text, alt, neu)."""
|
||
today = int(datetime.date.today().strftime("%Y%m%d")) * 100
|
||
|
||
# 1) explizit kommentierte Serial: 2026053101 ; Serial
|
||
m = re.search(r'(\d+)(\s*;\s*[Ss]erial)', text)
|
||
# 2) mehrzeilige SOA mit Klammer: SOA ... ( \n 2026053101
|
||
if not m:
|
||
m = re.search(r'(\bSOA\b[^(]*\(\s*)(\d+)', text, re.I)
|
||
if m:
|
||
old = int(m.group(2))
|
||
new = max(old + 1, today + 1)
|
||
text = text[:m.start(2)] + str(new) + text[m.end(2):]
|
||
return text, old, new
|
||
# 3) einzeilige SOA: SOA mname rname serial refresh ...
|
||
m = re.search(r'(\bSOA\b\s+\S+\s+\S+\s+)(\d+)', text, re.I)
|
||
if not m:
|
||
sys.exit("❌ Konnte die SOA-Serial nicht finden.")
|
||
old = int(m.group(2))
|
||
new = max(old + 1, today + 1)
|
||
text = text[:m.start(2)] + str(new) + text[m.end(2):]
|
||
return text, old, new
|
||
|
||
old = int(m.group(1))
|
||
new = max(old + 1, today + 1)
|
||
text = text[:m.start(1)] + str(new) + text[m.end(1):]
|
||
return text, old, new
|
||
|
||
|
||
def record_exists(text, name, rtype, content):
|
||
"""Grobe Prüfung, ob ein gleichwertiger Record bereits in der Zone steht."""
|
||
rtype = rtype.upper()
|
||
for line in text.splitlines():
|
||
s = line.strip()
|
||
if not s or s.startswith(";"):
|
||
continue
|
||
toks = s.split()
|
||
if not toks:
|
||
continue
|
||
if toks[0] == name and rtype in (t.upper() for t in toks) and content in s:
|
||
return True
|
||
return False
|
||
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description="DNS-Record in einer BIND9-Master-Zone anlegen.")
|
||
parser.add_argument("zone", help="Zonenname (z.B. maierch.de)")
|
||
parser.add_argument("name", help="Record-Name relativ zur Zone (z.B. srv9) oder @ / FQDN mit Punkt")
|
||
parser.add_argument("type", help="Record-Typ (A, AAAA, CNAME, TXT, MX, ...)")
|
||
parser.add_argument("content", help="Record-Wert (z.B. die IP-Adresse)")
|
||
parser.add_argument("--ttl", type=int, default=None, help="TTL in Sekunden (Standard: Zonen-$TTL)")
|
||
args = parser.parse_args()
|
||
|
||
if os.geteuid() != 0:
|
||
sys.exit("❌ Bitte mit root-Rechten ausführen: sudo python3 bind9_add_record.py ...")
|
||
|
||
rtype = args.type.upper()
|
||
zone_file = get_zone_file(args.zone)
|
||
print(f"📄 Zonendatei: {zone_file}")
|
||
|
||
text = read_file(zone_file)
|
||
|
||
# Duplikatsprüfung
|
||
if record_exists(text, args.name, rtype, args.content):
|
||
print(f"ℹ️ Record existiert bereits: {args.name} {rtype} {args.content} – keine Aktion.")
|
||
return
|
||
|
||
# Record-Zeile bauen
|
||
if args.ttl is not None:
|
||
record = f"{args.name}\t{args.ttl}\tIN\t{rtype}\t{args.content}"
|
||
else:
|
||
record = f"{args.name}\tIN\t{rtype}\t{args.content}"
|
||
|
||
if not text.endswith("\n"):
|
||
text += "\n"
|
||
text += record + "\n"
|
||
|
||
# SOA-Serial erhöhen
|
||
text, old_serial, new_serial = bump_serial(text)
|
||
print(f"🔢 Serial: {old_serial} -> {new_serial}")
|
||
print(f"➕ Neuer Record: {record.replace(chr(9), ' ')}")
|
||
|
||
# Sicherung anlegen
|
||
backup = zone_file + ".bak"
|
||
shutil.copy2(zone_file, backup)
|
||
|
||
# Schreiben
|
||
with open(zone_file, "w") as f:
|
||
f.write(text)
|
||
|
||
# Validieren
|
||
check = subprocess.run(
|
||
[NAMED_CHECKZONE, args.zone, zone_file],
|
||
capture_output=True, text=True,
|
||
)
|
||
if check.returncode != 0:
|
||
print("❌ named-checkzone meldet Fehler – stelle die Sicherung wieder her:")
|
||
print(check.stdout + check.stderr)
|
||
shutil.move(backup, zone_file)
|
||
sys.exit(1)
|
||
print(f"✅ named-checkzone OK")
|
||
|
||
# Neu laden
|
||
reload_res = subprocess.run([RNDC, "reload", args.zone], capture_output=True, text=True)
|
||
if reload_res.returncode != 0:
|
||
print("❌ rndc reload fehlgeschlagen:")
|
||
print(reload_res.stdout + reload_res.stderr)
|
||
sys.exit(1)
|
||
|
||
os.remove(backup)
|
||
print(f"✅ Zone '{args.zone}' neu geladen. Record ist aktiv.")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|