ansible: fix ci-instances playbook for Incus
[lttng-ci.git] / automation / ansible / scripts / update_dns_entry.py
... / ...
CommitLineData
1#!/usr/bin/env python3
2#
3
4import argparse
5import ipaddress
6import subprocess
7
8import dns.message
9import dns.query
10import dns.resolver
11
12
def get_argument_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for this script.

    Returns:
        An ``argparse.ArgumentParser`` with the required options
        ``--server``, ``--zone``, ``--name`` and ``--value``, and the
        optional ``--user`` (defaults to ``None``) and ``--type``
        (defaults to ``"A"``).
    """
    parser = argparse.ArgumentParser(
        prog="update_dns_entry.py",
        description=(
            "Create or update a DNS entry (and the matching reverse PTR "
            "entry for A records) using samba-tool"
        ),
    )
    # Required options have no meaningful default, so none is declared.
    parser.add_argument(
        "-s", "--server", required=True, help="Server for DNS updates"
    )
    parser.add_argument(
        "-u", "--user", default=None, help="The user to use with samba-tool"
    )
    parser.add_argument(
        "-z", "--zone", required=True, help="The zone in which to update the entry"
    )
    parser.add_argument("-n", "--name", required=True, help="DNS entry name")
    parser.add_argument("-v", "--value", required=True, help="DNS entry value")
    parser.add_argument("-t", "--type", default="A", help="Entry type")
    return parser
31
32
def _plan_record_command(
    server, server_ip, zone, name, entry_type, fqdn, value, user=None
):
    """Return the samba-tool argv that makes one record hold *value*.

    The record ``fqdn`` of type ``entry_type`` is queried over UDP on
    ``server_ip``. When there is no answer an ``add`` command is built;
    when the single existing value differs from ``value`` an ``update``
    command is built; when it already matches, ``None`` is returned.

    Raises:
        RuntimeError: if the response carries more than one answer set.
    """
    query = dns.message.make_query(fqdn, entry_type)
    response = dns.query.udp(query, server_ip)

    if not response.answer:
        argv = ["samba-tool", "dns", "add", server, zone, name, entry_type, value]
    else:
        if len(response.answer) != 1:
            raise RuntimeError(
                "Expected exactly one answer for {} {}, got {}".format(
                    fqdn, entry_type, len(response.answer)
                )
            )
        existing = response.answer[0][0].to_text()
        if existing == value:
            # Already up to date: nothing to run.
            return None
        argv = [
            "samba-tool",
            "dns",
            "update",
            server,
            zone,
            name,
            entry_type,
            existing,
            value,
        ]

    if user is not None:
        argv += ["-U", user]
    return argv


def update_dns_entry(
    server, zone, name, entry_type, value, user=None, with_reverse=True
):
    """Create or update a DNS entry, and its PTR entry for A records.

    The current state is read by querying ``server`` directly, and any
    required changes are applied by running ``samba-tool dns add`` or
    ``samba-tool dns update``. All queries are made before any command
    is run.

    Args:
        server: Host name or IP address of the DNS server; passed as-is to
            samba-tool and resolved to an IP address for the queries.
        zone: Zone holding the entry.
        name: Entry name, relative to ``zone``.
        entry_type: Record type, e.g. ``"A"``.
        value: Desired record value. Must parse as an IP address when
            ``entry_type`` is ``"A"``.
        user: Optional user passed to samba-tool with ``-U``.
        with_reverse: When true and ``entry_type`` is ``"A"``, also create
            or update the PTR entry pointing back at ``name.zone.``.

    Raises:
        ValueError: if ``entry_type`` is ``"A"`` and ``value`` is not a
            valid IP address.
        RuntimeError: if a query returns more than one answer set.
        subprocess.CalledProcessError: if a samba-tool command fails.
    """
    address = None
    if entry_type == "A":
        # Explicit validation: ip_address() raises ValueError on bad input,
        # and unlike an assert it is not stripped when running with -O.
        address = ipaddress.ip_address(value)

    try:
        server_ip = str(ipaddress.ip_address(server))
    except ValueError:
        # Not an IP literal: resolve the server name first.
        server_ip = dns.resolver.resolve(server)[0].to_text()

    fqdn = ".".join([name, zone])
    commands = [
        _plan_record_command(
            server, server_ip, zone, name, entry_type, fqdn, value, user
        )
    ]

    if with_reverse and address is not None:
        reverse_pointer = address.reverse_pointer
        # First label is used as the record name, the rest as the zone.
        rname, rzone = reverse_pointer.split(".", 1)
        # The PTR target is the absolute host name; it is what the existing
        # PTR value must be compared against (not the IP address).
        rvalue = fqdn + "."
        commands.append(
            _plan_record_command(
                server, server_ip, rzone, rname, "PTR", reverse_pointer, rvalue, user
            )
        )

    # Run commands
    for command in commands:
        if command is not None:
            subprocess.run(command, check=True)
110
111
if __name__ == "__main__":
    # Script entry point: read the command line and apply the DNS change.
    cli = get_argument_parser().parse_args()
    update_dns_entry(
        server=cli.server,
        zone=cli.zone,
        name=cli.name,
        entry_type=cli.type,
        value=cli.value,
        user=cli.user,
    )
This page took 0.023313 seconds and 5 git commands to generate.