Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 43 additions & 10 deletions sssd_test_framework/misc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,34 +198,67 @@ def seconds_to_timespan(seconds: int) -> str:
return f"{d:02d}:{h:02d}:{m:02d}:{s:02d}:00"


def ip_to_ptr(ip_address: str) -> str:
def ip_to_ptr(ip_address: str, prefixlen: int | None = None) -> str:
"""
Get the reverse pointer from given address.

:param ip_address: Address.
:type ip_address: str
:param prefixlen: Prefix length, optional
:type prefixlen: int | None = None
:return: Reverse pointer.
:rtype: str
"""
ip = ipaddress.ip_address(ip_address)
network = ipaddress.ip_network(ip)

if prefixlen is not None:
network = network.supernet(new_prefix=prefixlen)
elif ip.version == 4:
prefixlen = 24
else:
prefixlen = 64

if prefixlen > network.max_prefixlen:
raise ValueError(f"Prefix {prefixlen} too large for {ip.version}!")

subnet = network.supernet(new_prefix=prefixlen)

if ip.version == 4:
octets = ip.packed
ptr = f"{octets[2]}.{octets[1]}.{octets[0]}.in-addr.arpa."
elif ip.version == 6:
hex_parts = ip.exploded.replace(":", "").lower()
ptr = f"{hex_parts[::-1]}.ip6.arpa."
octets = subnet.network_address.packed
parts = [str(octets[i]) for i in range((prefixlen // 8) - 1, -1, -1)]
return ".".join(parts) + ".in-addr.arpa"
else:
raise ValueError("Unsupported IP version")
return ptr
hex_str = subnet.network_address.exploded.replace(":", "").lower()
nibbles = list(hex_str)
relevant_nibbles = nibbles[: prefixlen // 4]
reversed_nibbles = ".".join(relevant_nibbles[::-1])
return f"{reversed_nibbles}.ip6.arpa"


def ip_is_valid(ip: str) -> bool:
"""
Check ip is valid.

:param ip: IP address.
:type ip: str
:return: True, false if str is not an ip.
:rtype: bool
"""
try:
ipaddress.ip_address(ip)
return True
except ValueError:
return False


def ip_version(ip_address: str) -> int | None:
"""
Parse str and return the IP version.

::param ip_address: IP address.
:param ip_address: IP address.
:type ip_address: str
:return: IP version or None if not found.
:return: IP version or None if not found.
:rtype: int | None
"""
try:
Expand Down
54 changes: 40 additions & 14 deletions sssd_test_framework/roles/ad.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
attrs_include_value,
attrs_parse,
attrs_to_hash,
ip_to_ptr,
ip_version,
seconds_to_timespan,
)
Expand Down Expand Up @@ -2347,32 +2348,57 @@ def add_record(self, name: str, data: str | int) -> ADDNSZone:
"""
args = ""

if self.domain not in name:
name = f"{name}.{self.domain}"
short_name = name.split(".")[0]

if isinstance(data, int):
args = f"-Ptr -Name {str(data)} -AllowUpdateAny -PtrDomainName {name}.{self.zone_name}"
args = f"-Ptr -Name {str(data)} -AllowUpdateAny -PtrDomainName {name}."
elif isinstance(data, str) and ip_version(data) == 4:
args = f"-A -Name {name} -IPv4Address {data}"
args = f"-A -Name {short_name} -IPv4Address {data}"
elif isinstance(data, str) and ip_version(data) == 6:
args = f"-A -Name {name} -IPv6Address {data}"
args = f"-A -Name {short_name} -IPv6Address {data}"

self.host.conn.run(f"Add-DnsServerResourceRecord -ZoneName {self.zone_name} {args} ")
return self

def delete_record(self, name: str) -> None:
"""
Delete DNS record.
Delete DNS record, both forward and reverse records are deleted.

:param name: Name of the record.
:param name: Name or IP of the record.
:type name: str
"""
if "in-addr" in self.zone_name:
record_type = "PTR"
else:
data = self.host.conn.run(f"dig +short {name}").stdout.strip()
record_type = "AAAA" if ":" in data else "A"
if self.domain not in name:
name = f"{name}.{self.domain}"

self.host.conn.run(
f"Remove-DnsServerResourceRecord -ZoneName {self.zone_name} -Name {name} -RRType {record_type} -Force"
)
records = self.host.conn.run(f"dig +short +norecurse {name} '@{self.server}'").stdout_lines
records = [s.rstrip("\r") for s in records]

if not isinstance(records, list) or records is None:
return None

if len(records) > 1:
for record in records:
if ip_version(record) == 4:
self.role.host.conn.run(
f"Remove-DnsServerResourceRecord -RRType A -Force -ZoneName {self.zone_name} -Name {name}"
)
if ip_version(record) == 6:
self.host.conn.run(
f"Remove-DnsServerResourceRecord -RRType AAAA -Force -ZoneName {self.zone_name} -Name {name}"
)

for ptr_records in records:
ptr_record = self.host.conn.run(f"dig +short -x +norecurse {ptr_records} '@{self.server}'").stdout_lines
ptr_record = [r.rstrip("\r") for r in ptr_record]
if ptr_record:
self.host.conn.run(
f"Remove-DnsServerResourceRecord -RRType PTR "
f"-Force -ZoneName {ip_to_ptr(ptr_record[0])} "
f"-Name {'.'.join(ptr_record).split()[-1]}",
)
return None

def print(self) -> str:
"""
Expand All @@ -2381,7 +2407,7 @@ def print(self) -> str:
:return: Print zone data.
:rtype: str
"""
return self.host.conn.run(f"Get-DnsServerResourceRecord -ZoneName {self.zone_name}").stdout
return self.host.conn.run(f"Get-DnsServerResourceRecord -ZoneName {self.zone_name} | Format-List").stdout


ADNetgroupMember: TypeAlias = LDAPNetgroupMember[ADUser, ADNetgroup]
Expand Down
2 changes: 1 addition & 1 deletion sssd_test_framework/roles/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1552,7 +1552,7 @@ def add_record(self, name: str, data: str | int) -> GenericDNSZone:
@abstractmethod
def delete_record(self, name: str) -> None:
"""
Delete DNS record.
Delete DNS record, both forward and reverse records are deleted.

:param name: Name of the record.
:type name: str
Expand Down
39 changes: 33 additions & 6 deletions sssd_test_framework/roles/ipa.py
Original file line number Diff line number Diff line change
Expand Up @@ -2867,7 +2867,8 @@ def create(self) -> IPADNSZone:
:return: IPADNSServer object.
:rtype: IPADNSServer
"""
self.host.conn.run(f"ipa dnszone-add {self.zone_name} --dynamic-update=TRUE --skip-overlap-check")
self.host.conn.run(f"ipa dnszone-add {self.zone_name} --skip-overlap-check")
self.host.conn.run(f"ipa dnszone-mod {self.zone_name} --dynamic-update=TRUE --allow-sync-ptr=TRUE")
return self

def delete(self) -> None:
Expand All @@ -2891,26 +2892,52 @@ def add_record(self, name: str, data: str | int) -> IPADNSZone:
:rtype: IPADNSZone
"""
args = ""
if self.domain not in name:
name = f"{name}.{self.domain}"
short_name = name.split(".")[0]

if isinstance(data, int):
args = f"{str(data)} --ptr-rec={name}."
elif isinstance(data, str) and ip_version(data) == 4:
args = f"{name} --a-rec={data}"
args = f"{short_name} --a-rec={data}"
elif isinstance(data, str) and ip_version(data) == 6:
args = f"{name} --aaaa-rec={data}"
args = f"{short_name} --aaaa-rec={data}"

self.host.conn.run(f"ipa dnsrecord-add {self.zone_name} {args}")

return self

def delete_record(self, name: str) -> None:
"""
Delete DNS record.
Delete DNS record, both forward and reverse records are deleted.

:param name: Name of the record.
:param name: Name.
:type name: str
"""
self.host.conn.run(f"ipa dnsrecord-del {self.zone_name} {name}")
if self.domain not in name:
name = f"{name}.{self.domain}"

records = self.host.conn.run(f"dig +short +norecurse {name} '@{self.server}'").stdout_lines
records = [s.rstrip("\r") for s in records]

if not isinstance(records, list) or records is None:
return None

if len(records) > 1:
for record in records:
if ip_version(record) == 4:
self.host.conn.run(f"ipa dnsrecord-del {self.zone_name} --a-rec={record}")
if ip_version(record) == 6:
self.host.conn.run(f"ipa dnsrecord-del {self.zone_name} --aaaa-rec={record}")

for ptr_records in records:
ptr_record = self.host.conn.run(f"dig +short -x +norecurse {ptr_records} '@{self.server}'").stdout_lines
ptr_record = [r.rstrip("\r") for r in ptr_record]
if ptr_record:
self.host.conn.run(f"ipa dnsrecord-del {self.zone_name} --ptr-rec={record}")
return None

time.sleep(5) # Wait for the record to be deleted

def print(self) -> str:
"""
Expand Down
55 changes: 37 additions & 18 deletions sssd_test_framework/roles/samba.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pytest_mh.conn import ProcessResult

from ..hosts.samba import SambaHost
from ..misc import attrs_parse, ip_version, to_list_of_strings
from ..misc import attrs_parse, ip_to_ptr, ip_version, to_list_of_strings
from ..utils.ldap import LDAPRecordAttributes
from .base import BaseLinuxLDAPRole, BaseObject, DeleteAttribute
from .generic import GenericPasswordPolicy
Expand Down Expand Up @@ -1360,37 +1360,56 @@ def add_record(self, name: str, data: str) -> SambaDNSZone:
:rtype: SambaDNSZone
"""
args = ""
if self.domain not in name:
name = f"{name}.{self.domain}"
short_name = name.split(".")[0]

if isinstance(data, int):
args = f" {name} PTR {str(data)} {self.credentials}"
args = f" {name}. PTR {str(data)} {self.credentials}"
elif isinstance(data, str) and ip_version(data) == 4:
args = f" {name} A {data} {self.credentials}"
args = f" {short_name} A {data} {self.credentials}"
elif isinstance(data, str) and ip_version(data) == 6:
args = f" {name} AAAA {data} {self.credentials}"
args = f" {short_name} AAAA {data} {self.credentials}"

self.host.conn.run(f"samba-tool dns add {self.server} {self.zone_name} {args}")
return self

def delete_record(self, name: str) -> None:
"""
Delete DNS record.
Delete DNS record, both forward and reverse records are deleted.

:param name: Name of the record.
:type name: str
"""
if "in-addr" in self.zone_name:
record_type = "PTR"
data = self.host.conn.run(f"dig -x +short {name}").stdout.strip()
else:
data = self.host.conn.run(f"dig +short {name}").stdout.strip()
record_type = "AAAA" if ":" in data else "A"

self.role.host.conn.run(
f"samba-tool dns delete "
f"{self.server} {self.zone_name} "
f"{name} {record_type} {data} "
f"{self.credentials}"
)
if self.domain not in name:
name = f"{name}.{self.domain}"

records = self.host.conn.run(f"dig +short +norecurse {name} '@{self.server}'").stdout_lines
records = [s.rstrip("\r") for s in records]

if not isinstance(records, list) or records is None:
return None

if len(records) > 1:
for record in records:
if ip_version(record) == 4:
self.role.host.conn.run(
f"samba-tool dns delete {self.server} {self.zone_name} {name} A {record} {self.credentials}"
)
if ip_version(record) == 6:
self.host.conn.run(
f"samba-tool dns delete {self.server} {self.zone_name} {name} AAAA {record} {self.credentials}"
)

for ptr_records in records:
ptr_record = self.host.conn.run(f"dig +short -x +norecurse {ptr_records} '@{self.server}'").stdout_lines
ptr_record = [r.rstrip("\r") for r in ptr_record]
if ptr_record:
self.host.conn.run(
f"samba-tool dns delete {self.server} {ip_to_ptr(ptr_record[0])} {name} "
f"PTR {ptr_record} {self.credentials}"
)
return None

def print(self) -> str:
"""
Expand Down
Loading
Loading