From 275d7823ae241c537e04ddbdcbb36ad240257cb5 Mon Sep 17 00:00:00 2001 From: sup3rDav3 Date: Sun, 5 Apr 2026 23:16:45 -0400 Subject: [PATCH 1/2] feat: add --shares-depth flag for subdirectory write permission checking - Adds --shares-depth argument to SMB protocol (default: 0, root only) - When depth > 0, recursively checks subdirectories for write access - Only runs subdir check when root write check fails (no performance impact by default) - Reports writable subdirectories inline e.g. READ,WRITE (ad.local\scripts) - Respects --no-write-check flag - Compatible with --shares write filter - Fixes: write permissions were only checked on top-level share directories - Tested against Windows Server 2022 - detected writable SYSVOL\scripts subdir Closes #1186 --- nxc/protocols/smb.py | 1178 ++++++++++++++++++++++++------- nxc/protocols/smb/proto_args.py | 499 +++++++++++-- 2 files changed, 1363 insertions(+), 314 deletions(-) diff --git a/nxc/protocols/smb.py b/nxc/protocols/smb.py index 3270356d52..14dffae6b4 100755 --- a/nxc/protocols/smb.py +++ b/nxc/protocols/smb.py @@ -18,7 +18,7 @@ from impacket.examples.regsecrets import ( RemoteOperations as RegSecretsRemoteOperations, SAMHashes as RegSecretsSAMHashes, - LSASecrets as RegSecretsLSASecrets + LSASecrets as RegSecretsLSASecrets, ) from impacket.nmb import NetBIOSError, NetBIOSTimeout from impacket.dcerpc.v5 import transport, lsat, lsad, scmr, rrp, srvs, wkst @@ -34,17 +34,33 @@ from impacket.krb5 import constants from impacket.dcerpc.v5.dtypes import NULL from impacket.dcerpc.v5.dcomrt import DCOMConnection -from impacket.dcerpc.v5.dcom.wmi import CLSID_WbemLevel1Login, IID_IWbemLevel1Login, IWbemLevel1Login -from impacket.smb3structs import FILE_SHARE_WRITE, FILE_SHARE_DELETE, SMB2_0_IOCTL_IS_FSCTL +from impacket.dcerpc.v5.dcom.wmi import ( + CLSID_WbemLevel1Login, + IID_IWbemLevel1Login, + IWbemLevel1Login, +) +from impacket.smb3structs import ( + FILE_SHARE_WRITE, + FILE_SHARE_DELETE, + SMB2_0_IOCTL_IS_FSCTL, +) from impacket.dcerpc.v5 import tsts as TSTS from nxc.config import process_secret, host_info_colors, check_guest_account from nxc.connection import connection, sem, requires_admin, dcom_FirewallChecker from nxc.helpers.misc import gen_random_string, validate_ntlm from nxc.logger import NXCAdapter -from nxc.protocols.smb.dpapi import collect_masterkeys_from_target, get_domain_backup_key, upgrade_to_dploot_connection +from nxc.protocols.smb.dpapi import ( + collect_masterkeys_from_target, + get_domain_backup_key, + upgrade_to_dploot_connection, +) from nxc.protocols.smb.firefox import FirefoxCookie, FirefoxData, FirefoxTriage -from nxc.protocols.smb.kerberos import kerberos_login_with_S4U, kerberos_altservice, get_realm_from_ticket +from nxc.protocols.smb.kerberos import ( + kerberos_login_with_S4U, + kerberos_altservice, + get_realm_from_ticket, +) from nxc.protocols.smb.wmiexec import WMIEXEC from nxc.protocols.smb.atexec import TSCH_EXEC from nxc.protocols.smb.smbexec import SMBEXEC @@ -118,8 +134,8 @@ def __init__(self, args, db, host): self.nthash = "" self.remote_ops = None self.bootkey = None - self.smbv1 = None # Check if SMBv1 is supported - self.smbv3 = None # Check if SMBv3 is supported + self.smbv1 = None # Check if SMBv1 is supported + self.smbv3 = None # Check if SMBv3 is supported self.is_timed_out = False self.signing = False self.smb_share_name = smb_share_name @@ -153,7 +169,10 @@ def get_os_arch(self): dce.set_auth_type(RPC_C_AUTHN_GSS_NEGOTIATE) dce.connect() try: - dce.bind(MSRPC_UUID_PORTMAP, transfer_syntax=("71710533-BEBA-4937-8319-B5DBEF9CCC36", "1.0")) + dce.bind( + MSRPC_UUID_PORTMAP, + transfer_syntax=("71710533-BEBA-4937-8319-B5DBEF9CCC36", "1.0"), + ) except DCERPCException as e: if str(e).find("syntaxes_not_supported") >= 0: dce.disconnect() @@ -194,21 +213,28 @@ def enum_host_info(self): if not self.no_ntlm: self.hostname = self.conn.getServerName() self.targetDomain = self.conn.getServerDNSDomainName() - if not self.targetDomain: # Not sure if that can even happen but now we are safe + if ( + not self.targetDomain + ): # Not sure if that can even happen but now we are safe self.targetDomain = self.hostname else: try: self.is_host_dc() # If we know the host is a DC we can still get the hostname over LDAP if NTLM is not available if self.isdc and detect_if_ip(self.host): - self.hostname, self.domain = LDAPResolution(self.host).get_resolution() + self.hostname, self.domain = LDAPResolution( + self.host + ).get_resolution() self.targetDomain = self.domain # If we can't authenticate with NTLM and the target is supplied as a FQDN we must parse it else: # Check if the host is a valid IP address, if not we parse the FQDN in the Exception import socket + socket.inet_aton(self.host) - self.logger.debug("NTLM authentication not available! Authentication will fail without a valid hostname and domain name") + self.logger.debug( + "NTLM authentication not available! Authentication will fail without a valid hostname and domain name" + ) self.hostname = self.host self.targetDomain = self.host except OSError: @@ -225,7 +251,9 @@ def enum_host_info(self): if self.args.domain: self.domain = self.args.domain - elif self.args.use_kcache: # Fixing domain trust, just pull the auth domain out of the ticket + elif ( + self.args.use_kcache + ): # Fixing domain trust, just pull the auth domain out of the ticket self.domain = CCache.parseFile()[0] else: self.domain = self.targetDomain @@ -249,16 +277,28 @@ def enum_host_info(self): if isinstance(self.server_os.lower(), bytes): self.server_os = self.server_os.decode("utf-8") - if "Windows 6.1" in self.server_os and self.server_os_build == 0 and self.os_arch == 0: + if ( + "Windows 6.1" in self.server_os + and self.server_os_build == 0 + and self.os_arch == 0 + ): self.server_os = "Unix - Samba" elif self.server_os_build == 0 and self.os_arch == 0: self.server_os = "Unix" - self.logger.debug(f"Server OS: {self.server_os} {self.server_os_major}.{self.server_os_minor} build {self.server_os_build}") + self.logger.debug( + f"Server OS: {self.server_os} {self.server_os_major}.{ + self.server_os_minor + } build {self.server_os_build}" + ) self.logger.extra["hostname"] = self.hostname try: - self.signing = self.conn.isSigningRequired() if self.smbv1 else self.conn._SMBConnection._Connection["RequireSigning"] + self.signing = ( + self.conn.isSigningRequired() + if self.smbv1 + else self.conn._SMBConnection._Connection["RequireSigning"] + ) except Exception as e: self.logger.debug(e) @@ -283,21 +323,55 @@ def enum_host_info(self): self.logger.debug(f"Error adding host {self.host} into db: {e!s}") # DCOM connection with kerberos needed - self.remoteName = self.host if not self.kerberos else f"{self.hostname}.{self.targetDomain}" + self.remoteName = ( + self.host if not self.kerberos else f"{self.hostname}.{self.targetDomain}" + ) # using kdcHost is buggy on impacket when using trust relation between ad so we kdcHost must stay to none if targetdomain is not equal to domain if not self.kdcHost and self.domain and self.domain == self.targetDomain: result = self.resolver(self.domain) self.kdcHost = result["host"] if result else None - self.logger.info(f"Resolved domain: {self.domain} with dns, kdcHost: {self.kdcHost}") + self.logger.info( + f"Resolved domain: {self.domain} with dns, kdcHost: {self.kdcHost}" + ) def print_host_info(self): - signing = colored(f"signing:{self.signing}", host_info_colors[0], attrs=["bold"]) if self.signing else colored(f"signing:{self.signing}", host_info_colors[1], attrs=["bold"]) - smbv1 = colored(f"SMBv1:{self.smbv1}", host_info_colors[2], attrs=["bold"]) if self.smbv1 else colored(f"SMBv1:{self.smbv1}", host_info_colors[3], attrs=["bold"]) - ntlm = colored(f" (NTLM:{not self.no_ntlm})", host_info_colors[2], attrs=["bold"]) if self.no_ntlm else "" - null_auth = colored(f" (Null Auth:{self.null_auth})", host_info_colors[2], attrs=["bold"]) if self.null_auth else "" - guest = colored(f" (Guest Auth:{self.is_guest})", host_info_colors[1], attrs=["bold"]) if self.is_guest else "" - self.logger.display(f"{self.server_os}{f' x{self.os_arch}' if self.os_arch else ''} (name:{self.hostname}) (domain:{self.targetDomain}) ({signing}) ({smbv1}){ntlm}{null_auth}{guest}") + signing = ( + colored(f"signing:{self.signing}", host_info_colors[0], attrs=["bold"]) + if self.signing + else colored(f"signing:{self.signing}", host_info_colors[1], attrs=["bold"]) + ) + smbv1 = ( + colored(f"SMBv1:{self.smbv1}", host_info_colors[2], attrs=["bold"]) + if self.smbv1 + else colored(f"SMBv1:{self.smbv1}", host_info_colors[3], attrs=["bold"]) + ) + ntlm = ( + colored(f" (NTLM:{not self.no_ntlm})", host_info_colors[2], attrs=["bold"]) + if self.no_ntlm + else "" + ) + null_auth = ( + colored( + f" (Null Auth:{self.null_auth})", host_info_colors[2], attrs=["bold"] + ) + if self.null_auth + else "" + ) + guest = ( + colored( + f" (Guest Auth:{self.is_guest})", host_info_colors[1], attrs=["bold"] + ) + if self.is_guest + else "" + ) + self.logger.display( + f"{self.server_os}{f' x{self.os_arch}' if self.os_arch else ''} (name:{ + self.hostname + }) (domain:{self.targetDomain}) ({signing}) ({smbv1}){ntlm}{null_auth}{ + guest + }" + ) if self.args.generate_hosts_file or self.args.generate_krb5_file: if self.isdc is None: @@ -305,8 +379,16 @@ def print_host_info(self): if self.args.generate_hosts_file: with open(self.args.generate_hosts_file, "a+") as host_file: dc_part = f" {self.targetDomain}" if self.isdc else "" - host_file.write(f"{self.host} {self.hostname}.{self.targetDomain}{dc_part} {self.hostname}\n") - self.logger.debug(f"Line added to {self.args.generate_hosts_file} {self.host} {self.hostname}.{self.targetDomain}{dc_part} {self.hostname}") + host_file.write( + f"{self.host} {self.hostname}.{self.targetDomain}{dc_part} { + self.hostname + }\n" + ) + self.logger.debug( + f"Line added to {self.args.generate_hosts_file} {self.host} { + self.hostname + }.{self.targetDomain}{dc_part} {self.hostname}" + ) elif self.args.generate_krb5_file and self.isdc: with open(self.args.generate_krb5_file, "w+") as host_file: data = dedent(f""" @@ -328,12 +410,27 @@ def print_host_info(self): """).strip() host_file.write(data) self.logger.debug(data) - self.logger.success(f"krb5 conf saved to: {self.args.generate_krb5_file}") - self.logger.success(f"Run the following command to use the conf file: export KRB5_CONFIG={self.args.generate_krb5_file}") + self.logger.success( + f"krb5 conf saved to: {self.args.generate_krb5_file}" + ) + self.logger.success( + f"Run the following command to use the conf file: export KRB5_CONFIG={ + self.args.generate_krb5_file + }" + ) return self.host, self.hostname, self.targetDomain - def kerberos_login(self, domain, username, password="", ntlm_hash="", aesKey="", kdcHost="", useCache=False): + def kerberos_login( + self, + domain, + username, + password="", + ntlm_hash="", + aesKey="", + kdcHost="", + useCache=False, + ): self.logger.debug(f"KDC set to: {kdcHost}") # Re-connect since we logged off self.create_conn_obj() @@ -359,15 +456,37 @@ def kerberos_login(self, domain, username, password="", ntlm_hash="", aesKey="", kerb_pass = next(s for s in [self.nthash, password, aesKey] if s) else: kerb_pass = "" - self.logger.debug(f"Attempting to do Kerberos Login with useCache: {useCache}") + self.logger.debug( + f"Attempting to do Kerberos Login with useCache: {useCache}" + ) tgs = None if self.args.delegate: kerb_pass = "" self.username = self.args.delegate - serverName = Principal(self.args.delegate_spn if self.args.delegate_spn else f"cifs/{self.remoteName}", type=constants.PrincipalNameType.NT_SRV_INST.value) - tgs, sk = kerberos_login_with_S4U(domain, self.hostname, username, password, nthash, lmhash, aesKey, kdcHost, self.args.delegate, serverName, useCache, no_s4u2proxy=self.args.no_s4u2proxy) - self.logger.debug(f"TGS obtained for {self.args.delegate} for {serverName}") + serverName = Principal( + self.args.delegate_spn + if self.args.delegate_spn + else f"cifs/{self.remoteName}", + type=constants.PrincipalNameType.NT_SRV_INST.value, + ) + tgs, sk = kerberos_login_with_S4U( + domain, + self.hostname, + username, + password, + nthash, + lmhash, + aesKey, + kdcHost, + self.args.delegate, + serverName, + useCache, + no_s4u2proxy=self.args.no_s4u2proxy, + ) + self.logger.debug( + f"TGS obtained for {self.args.delegate} for {serverName}" + ) spn = f"cifs/{self.remoteName}" if self.args.delegate_spn: @@ -377,7 +496,17 @@ def kerberos_login(self, domain, username, password="", ntlm_hash="", aesKey="", if self.args.generate_st: self.save_st(tgs, sk, spn if self.args.delegate_spn else None) - self.conn.kerberosLogin(self.username, password, domain, lmhash, nthash, aesKey, kdcHost, useCache=useCache, TGS=tgs) + self.conn.kerberosLogin( + self.username, + password, + domain, + lmhash, + nthash, + aesKey, + kdcHost, + useCache=useCache, + TGS=tgs, + ) if "Unix" not in self.server_os: self.check_if_admin() @@ -386,17 +515,25 @@ def kerberos_login(self, domain, username, password="", ntlm_hash="", aesKey="", elif not self.args.delegate: self.username = username - used_ccache = " from ccache" if useCache else f":{process_secret(kerb_pass)}" + used_ccache = ( + " from ccache" if useCache else f":{process_secret(kerb_pass)}" + ) if self.args.delegate: used_ccache = f" through S4U with {username}" if self.args.delegate_spn: - used_ccache = f" through S4U with {username} (w/ SPN {self.args.delegate_spn})" + used_ccache = ( + f" through S4U with {username} (w/ SPN {self.args.delegate_spn})" + ) out = f"{self.domain}\\{self.username}{used_ccache} {self.mark_pwned()}" self.logger.success(out) - if not self.args.local_auth and self.username != "" and not self.args.delegate: + if ( + not self.args.local_auth + and self.username != "" + and not self.args.delegate + ): add_user_bh(self.username, domain, self.logger, self.config) if self.admin_privs: add_user_bh(f"{self.hostname}$", domain, self.logger, self.config) @@ -417,25 +554,33 @@ def kerberos_login(self, domain, username, password="", ntlm_hash="", aesKey="", self.logger.fail(f"CCache Error: {e}") return False except OSError as e: - used_ccache = " from ccache" if useCache else f":{process_secret(kerb_pass)}" + used_ccache = ( + " from ccache" if useCache else f":{process_secret(kerb_pass)}" + ) if self.args.delegate: used_ccache = f" through S4U with {username}" self.logger.fail(f"{domain}\\{self.username}{used_ccache} {e}") return False except SessionError as e: error, desc = e.getErrorString() - used_ccache = " from ccache" if useCache else f":{process_secret(kerb_pass)}" + used_ccache = ( + " from ccache" if useCache else f":{process_secret(kerb_pass)}" + ) if self.args.delegate: used_ccache = f" through S4U with {username}" self.logger.fail( - f"{domain}\\{self.username}{used_ccache} {error} {f'({desc})' if self.args.verbose else ''}", + f"{domain}\\{self.username}{used_ccache} {error} { + f'({desc})' if self.args.verbose else '' + }", color="magenta" if error in smb_error_status else "red", ) if error not in smb_error_status: self.inc_failed_login(username) return False except (ConnectionResetError, NetBIOSTimeout, NetBIOSError) as e: - used_ccache = " from ccache" if useCache else f":{process_secret(kerb_pass)}" + used_ccache = ( + " from ccache" if useCache else f":{process_secret(kerb_pass)}" + ) if self.args.delegate: used_ccache = f" through S4U with {username}" desc = e.getErrorString() if hasattr(e, "getErrorString") else str(e) @@ -451,7 +596,9 @@ def plaintext_login(self, domain, username, password): self.domain = domain self.conn.login(self.username, self.password, domain) - self.logger.debug(f"Logged in with password to SMB with {domain}/{self.username}") + self.logger.debug( + f"Logged in with password to SMB with {domain}/{self.username}" + ) self.is_guest = bool(self.conn.isGuestSession()) self.logger.debug(f"{self.is_guest=}") if "Unix" not in self.server_os: @@ -460,19 +607,40 @@ def plaintext_login(self, domain, username, password): # Only do database/bloodhound stuff if we don't have guest/null auth valid_auth = not self.is_guest and self.username if valid_auth: - self.logger.debug(f"Adding credential: {domain}/{self.username}:{self.password}") - self.db.add_credential("plaintext", domain, self.username, self.password) - user_id = self.db.get_credential("plaintext", domain, self.username, self.password) + self.logger.debug( + f"Adding credential: {domain}/{self.username}:{self.password}" + ) + self.db.add_credential( + "plaintext", domain, self.username, self.password + ) + user_id = self.db.get_credential( + "plaintext", domain, self.username, self.password + ) host_id = self.db.get_hosts(self.host)[0].id self.db.add_loggedin_relation(user_id, host_id) if self.admin_privs and valid_auth: - self.logger.debug(f"Adding admin user: {self.domain}/{self.username}:{self.password}@{self.host}") - self.db.add_admin_user("plaintext", domain, self.username, self.password, self.host, user_id=user_id) + self.logger.debug( + f"Adding admin user: {self.domain}/{self.username}:{self.password}@{ + self.host + }" + ) + self.db.add_admin_user( + "plaintext", + domain, + self.username, + self.password, + self.host, + user_id=user_id, + ) add_user_bh(f"{self.hostname}$", domain, self.logger, self.config) if not self.args.local_auth and valid_auth: add_user_bh(self.username, self.domain, self.logger, self.config) - self.logger.success(f"{domain}\\{self.username}:{process_secret(self.password)} {self.mark_guest()}{self.mark_pwned()}") + self.logger.success( + f"{domain}\\{self.username}:{process_secret(self.password)} { + self.mark_guest() + }{self.mark_pwned()}" + ) # check https://github.com/byt3bl33d3r/CrackMapExec/issues/321 if self.args.continue_on_success and self.signing: @@ -482,17 +650,25 @@ def plaintext_login(self, domain, username, password): except SessionError as e: error, desc = e.getErrorString() self.logger.fail( - f'{domain}\\{self.username}:{process_secret(self.password)} {error} {f"({desc})" if self.args.verbose else ""}', + f"{domain}\\{self.username}:{process_secret(self.password)} {error} { + f'({desc})' if self.args.verbose else '' + }", color="magenta" if error in smb_error_status else "red", ) - if error in ["STATUS_PASSWORD_MUST_CHANGE", "STATUS_PASSWORD_EXPIRED", "STATUS_NOLOGON_WORKSTATION_TRUST_ACCOUNT"] and self.args.module == ["change-password"]: + if error in [ + "STATUS_PASSWORD_MUST_CHANGE", + "STATUS_PASSWORD_EXPIRED", + "STATUS_NOLOGON_WORKSTATION_TRUST_ACCOUNT", + ] and self.args.module == ["change-password"]: return True if error not in smb_error_status: self.inc_failed_login(username) return False except (ConnectionResetError, NetBIOSTimeout, NetBIOSError) as e: desc = e.getErrorString() if hasattr(e, "getErrorString") else str(e) - self.logger.fail(f"{domain}\\{self.username}:{process_secret(self.password)} {desc}") + self.logger.fail( + f"{domain}\\{self.username}:{process_secret(self.password)} {desc}" + ) return False except BrokenPipeError: self.logger.fail("Broken Pipe Error while attempting to login") @@ -519,7 +695,9 @@ def hash_login(self, domain, username, ntlm_hash): self.nthash = nthash self.conn.login(self.username, "", domain, lmhash, nthash) - self.logger.debug(f"Logged in with hash to SMB with {domain}/{self.username}") + self.logger.debug( + f"Logged in with hash to SMB with {domain}/{self.username}" + ) self.is_guest = bool(self.conn.isGuestSession()) self.logger.debug(f"{self.is_guest=}") if "Unix" not in self.server_os: @@ -529,16 +707,24 @@ def hash_login(self, domain, username, ntlm_hash): valid_auth = not self.is_guest and self.username and self.hash if valid_auth: self.db.add_credential("hash", domain, self.username, self.hash) - user_id = self.db.get_credential("hash", domain, self.username, self.hash) + user_id = self.db.get_credential( + "hash", domain, self.username, self.hash + ) host_id = self.db.get_hosts(self.host)[0].id self.db.add_loggedin_relation(user_id, host_id) if self.admin_privs and valid_auth: - self.db.add_admin_user("hash", domain, self.username, nthash, self.host, user_id=user_id) + self.db.add_admin_user( + "hash", domain, self.username, nthash, self.host, user_id=user_id + ) add_user_bh(f"{self.hostname}$", domain, self.logger, self.config) if not self.args.local_auth and valid_auth: add_user_bh(self.username, self.domain, self.logger, self.config) - self.logger.success(f"{domain}\\{self.username}:{process_secret(self.hash)} {self.mark_guest()}{self.mark_pwned()}") + self.logger.success( + f"{domain}\\{self.username}:{process_secret(self.hash)} { + self.mark_guest() + }{self.mark_pwned()}" + ) # check https://github.com/byt3bl33d3r/CrackMapExec/issues/321 if self.args.continue_on_success and self.signing: @@ -548,17 +734,25 @@ def hash_login(self, domain, username, ntlm_hash): except SessionError as e: error, desc = e.getErrorString() self.logger.fail( - f"{domain}\\{self.username}:{process_secret(self.hash)} {error} {f'({desc})' if self.args.verbose else ''}", + f"{domain}\\{self.username}:{process_secret(self.hash)} {error} { + f'({desc})' if self.args.verbose else '' + }", color="magenta" if error in smb_error_status else "red", ) - if error in ["STATUS_PASSWORD_MUST_CHANGE", "STATUS_PASSWORD_EXPIRED", "STATUS_NOLOGON_WORKSTATION_TRUST_ACCOUNT"] and self.args.module == ["change-password"]: + if error in [ + "STATUS_PASSWORD_MUST_CHANGE", + "STATUS_PASSWORD_EXPIRED", + "STATUS_NOLOGON_WORKSTATION_TRUST_ACCOUNT", + ] and self.args.module == ["change-password"]: return True if error not in smb_error_status: self.inc_failed_login(self.username) return False except (ConnectionResetError, NetBIOSTimeout, NetBIOSError) as e: desc = e.getErrorString() if hasattr(e, "getErrorString") else str(e) - self.logger.fail(f"{domain}\\{self.username}:{process_secret(self.password)} {desc}") + self.logger.fail( + f"{domain}\\{self.username}:{process_secret(self.password)} {desc}" + ) return False except BrokenPipeError: self.logger.fail("Broken Pipe Error while attempting to login") @@ -645,7 +839,9 @@ def check_if_admin(self): if self.args.no_admin_check: return self.logger.debug(f"Checking if user is admin on {self.host}") - rpctransport = SMBTransport(self.conn.getRemoteHost(), 445, r"\svcctl", smb_connection=self.conn) + rpctransport = SMBTransport( + self.conn.getRemoteHost(), 445, r"\svcctl", smb_connection=self.conn + ) dce = rpctransport.get_dce_rpc() try: dce.connect() @@ -657,7 +853,9 @@ def check_if_admin(self): try: # 0xF003F - SC_MANAGER_ALL_ACCESS # http://msdn.microsoft.com/en-us/library/windows/desktop/ms685981(v=vs.85).aspx - scmrobj = scmr.hROpenSCManagerW(dce, f"{self.host}\x00", "ServicesActive\x00", 0xF003F) + scmrobj = scmr.hROpenSCManagerW( + dce, f"{self.host}\x00", "ServicesActive\x00", 0xF003F + ) scmr.hREnumServicesStatusW(dce, scmrobj["lpScHandle"]) self.logger.debug(f"User is admin on {self.host}!") self.admin_privs = True @@ -687,7 +885,9 @@ def save_st(self, st, sk, new_spn=None): if new_spn: # there is a new principal, likely from tampering the SPN during S4U2proxy realm = get_realm_from_ticket(st) - principal = Principal(f"{new_spn}@{realm}", type=constants.PrincipalNameType.NT_SRV_INST.value) + principal = Principal( + f"{new_spn}@{realm}", type=constants.PrincipalNameType.NT_SRV_INST.value + ) self.logger.debug(f"Using principal {principal} for ST") ccache.credentials[0]["server"].fromPrincipal(principal) @@ -697,7 +897,9 @@ def save_st(self, st, sk, new_spn=None): def generate_tgt(self): self.logger.info(f"Attempting to get TGT for {self.username}@{self.domain}") - userName = Principal(self.username, type=constants.PrincipalNameType.NT_PRINCIPAL.value) + userName = Principal( + self.username, type=constants.PrincipalNameType.NT_PRINCIPAL.value + ) try: tgt, cipher, oldSessionKey, sessionKey = getKerberosTGT( @@ -707,10 +909,12 @@ def generate_tgt(self): lmhash=binascii.unhexlify(self.lmhash) if self.lmhash else "", nthash=binascii.unhexlify(self.nthash) if self.nthash else "", aesKey=self.aesKey, - kdcHost=self.kdcHost + kdcHost=self.kdcHost, ) - self.logger.debug(f"TGT successfully obtained for {self.username}@{self.domain}") + self.logger.debug( + f"TGT successfully obtained for {self.username}@{self.domain}" + ) self.logger.debug(f"Using cipher: {cipher}") ccache = CCache() @@ -719,14 +923,18 @@ def generate_tgt(self): ccache.saveFile(tgt_file) self.logger.success(f"TGT saved to: {tgt_file}") - self.logger.success(f"Run the following command to use the TGT: export KRB5CCNAME={tgt_file}") + self.logger.success( + f"Run the following command to use the TGT: export KRB5CCNAME={tgt_file}" + ) except Exception as e: self.logger.fail(f"Failed to get TGT: {e}") def check_dc_ports(self, timeout=1): """Check multiple DC-specific ports in case first check fails""" import socket - dc_ports = [88, 389, 636, 3268, 9389] # Kerberos, LDAP, LDAPS, Global Catalog, ADWS + + # Kerberos, LDAP, LDAPS, Global Catalog, ADWS + dc_ports = [88, 389, 636, 3268, 9389] open_ports = 0 for port in dc_ports: @@ -759,9 +967,13 @@ def is_host_dc(self): self.isdc = True return True except DCERPCException: - self.logger.debug("Error while connecting to host: DCERPCException, which means this is probably not a DC!") + self.logger.debug( + "Error while connecting to host: DCERPCException, which means this is probably not a DC!" + ) except TimeoutError: - self.logger.debug("Timeout while connecting to host: likely not a DC or host is unreachable.") + self.logger.debug( + "Timeout while connecting to host: likely not a DC or host is unreachable." + ) except Exception as e: self.logger.debug(f"Error while connecting to host: {e}") self.isdc = False @@ -779,6 +991,7 @@ def is_host_dc(self): def _is_port_open(self, port, timeout=1): """Check if a specific port is open on the target host.""" import socket + try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.settimeout(timeout) @@ -806,11 +1019,19 @@ def trigger_winreg(self): if "STATUS_PIPE_NOT_AVAILABLE" not in str(e): raise else: - self.logger.debug(f"Received expected error while triggering winreg: {e}") + self.logger.debug( + f"Received expected error while triggering winreg: {e}" + ) # Give remote registry time to start sleep(1) return True - except (SessionError, BrokenPipeError, ConnectionResetError, NetBIOSError, OSError) as e: + except ( + SessionError, + BrokenPipeError, + ConnectionResetError, + NetBIOSError, + OSError, + ) as e: self.logger.debug(f"Received unexpected error while triggering winreg: {e}") return False @@ -859,7 +1080,7 @@ def execute(self, payload=None, get_output=False, methods=None) -> str: self.args.share, logger=self.logger, timeout=self.args.dcom_timeout, - tries=self.args.get_output_tries + tries=self.args.get_output_tries, ) self.logger.info("Executed command via wmiexec") break @@ -887,7 +1108,7 @@ def execute(self, payload=None, get_output=False, methods=None) -> str: self.args.share, logger=self.logger, timeout=self.args.dcom_timeout, - tries=self.args.get_output_tries + tries=self.args.get_output_tries, ) self.logger.info("Executed command via mmcexec") break @@ -898,7 +1119,9 @@ def execute(self, payload=None, get_output=False, methods=None) -> str: elif method == "atexec": try: exec_method = TSCH_EXEC( - self.host if not self.kerberos else self.hostname + "." + self.domain, + self.host + if not self.kerberos + else self.hostname + "." + self.domain, self.smb_share_name, self.username, self.password, @@ -910,7 +1133,7 @@ def execute(self, payload=None, get_output=False, methods=None) -> str: self.hash, self.logger, self.args.get_output_tries, - self.args.share + self.args.share, ) self.logger.info("Executed command via atexec") break @@ -921,7 +1144,9 @@ def execute(self, payload=None, get_output=False, methods=None) -> str: elif method == "smbexec": try: exec_method = SMBEXEC( - self.host if not self.kerberos else self.hostname + "." + self.domain, + self.host + if not self.kerberos + else self.hostname + "." + self.domain, self.smb_share_name, self.conn, self.username, @@ -935,7 +1160,7 @@ def execute(self, payload=None, get_output=False, methods=None) -> str: self.args.share, self.port, self.logger, - self.args.get_output_tries + self.args.get_output_tries, ) self.logger.info("Executed command via smbexec") break @@ -953,18 +1178,22 @@ def execute(self, payload=None, get_output=False, methods=None) -> str: if not isinstance(output, str): output = output.decode(self.args.codec) except UnicodeDecodeError: - self.logger.debug("Decoding error detected, consider running chcp.com at the target, map the result with https://docs.python.org/3/library/codecs.html#standard-encodings") + self.logger.debug( + "Decoding error detected, consider running chcp.com at the target, map the result with https://docs.python.org/3/library/codecs.html#standard-encodings" + ) output = output.decode("cp437") self.logger.debug(f"Raw Output: {output}") - output = "\n".join([ll.rstrip() for ll in output.splitlines() if ll.strip()]) + output = "\n".join([ + ll.rstrip() for ll in output.splitlines() if ll.strip() + ]) self.logger.debug(f"Cleaned Output: {output}") if "This script contains malicious content" in output: self.logger.fail("Command execution blocked by AMSI") return "" - if (self.args.execute or self.args.ps_execute): + if self.args.execute or self.args.ps_execute: self.logger.success(f"Executed command via {current_method}") if output: for line in output.split("\n"): @@ -975,7 +1204,15 @@ def execute(self, payload=None, get_output=False, methods=None) -> str: return "" @requires_admin - def ps_execute(self, payload=None, get_output=False, methods=None, force_ps32=False, obfs=False, encode=False) -> list: + def ps_execute( + self, + payload=None, + get_output=False, + methods=None, + force_ps32=False, + obfs=False, + encode=False, + ) -> list: """ Wrapper for executing a PowerShell command on the target host. This still uses the execute() method internally, but creates a PowerShell command together with possible AMSI bypasses and other options. @@ -991,7 +1228,9 @@ def ps_execute(self, payload=None, get_output=False, methods=None, force_ps32=Fa ------- list: A list containing the lines of the output of the command """ - payload = self.args.ps_execute if not payload and self.args.ps_execute else payload + payload = ( + self.args.ps_execute if not payload and self.args.ps_execute else payload + ) if not payload: self.logger.error("No command to execute specified!") return [] @@ -1002,16 +1241,44 @@ def ps_execute(self, payload=None, get_output=False, methods=None, force_ps32=Fa force_ps32 = force_ps32 if force_ps32 else self.args.force_ps32 get_output = True if not self.args.no_output else get_output - self.logger.debug(f"Starting ps_execute(): {payload=} {get_output=} {methods=} {force_ps32=} {obfs=} {encode=}") + self.logger.debug( + f"Starting ps_execute(): {payload=} {get_output=} { + methods=} {force_ps32=} {obfs=} {encode=}" + ) amsi_bypass = self.args.amsi_bypass[0] if self.args.amsi_bypass else None self.logger.debug(f"AMSI Bypass: {amsi_bypass}") if os.path.isfile(payload): self.logger.debug(f"File payload set: {payload}") with open(payload) as commands: - response = [self.execute(create_ps_command(c.strip(), force_ps32=force_ps32, obfs=obfs, custom_amsi=amsi_bypass, encode=encode), get_output, methods) for c in commands] + response = [ + self.execute( + create_ps_command( + c.strip(), + force_ps32=force_ps32, + obfs=obfs, + custom_amsi=amsi_bypass, + encode=encode, + ), + get_output, + methods, + ) + for c in commands + ] else: - response = [self.execute(create_ps_command(payload, force_ps32=force_ps32, obfs=obfs, custom_amsi=amsi_bypass, encode=encode), get_output, methods)] + response = [ + self.execute( + create_ps_command( + payload, + force_ps32=force_ps32, + obfs=obfs, + custom_amsi=amsi_bypass, + encode=encode, + ), + get_output, + methods, + ) + ] self.logger.debug(f"ps_execute response: {response}") return response @@ -1024,7 +1291,9 @@ def get_session_list(self): sessions = {} for i in rsessions: sess = i["SessionInfo"]["SessionEnum_Level1"] - state = TSTS.enum2value(TSTS.WINSTATIONSTATECLASS, sess["State"]).split("_")[-1] + state = TSTS.enum2value(TSTS.WINSTATIONSTATECLASS, sess["State"]).split( + "_" + )[-1] sessions[sess["SessionId"]] = { "state": state, "SessionName": sess["Name"], @@ -1033,27 +1302,46 @@ def get_session_list(self): "Username": "", "Domain": "", "Resolution": "", - "ClientTimeZone": "" + "ClientTimeZone": "", } return sessions def enumerate_sessions_info(self, sessions): if len(sessions): - with TSTS.TermSrvSession(self.conn, self.host, self.kerberos) as TermSrvSession: + with TSTS.TermSrvSession( + self.conn, self.host, self.kerberos + ) as TermSrvSession: for SessionId in sessions: sessdata = TermSrvSession.hRpcGetSessionInformationEx(SessionId) - sessflags = TSTS.enum2value(TSTS.SESSIONFLAGS, sessdata["LSMSessionInfoExPtr"]["LSM_SessionInfo_Level1"]["SessionFlags"]) + sessflags = TSTS.enum2value( + TSTS.SESSIONFLAGS, + sessdata["LSMSessionInfoExPtr"]["LSM_SessionInfo_Level1"][ + "SessionFlags" + ], + ) sessions[SessionId]["flags"] = sessflags - domain = sessdata["LSMSessionInfoExPtr"]["LSM_SessionInfo_Level1"]["DomainName"] + domain = sessdata["LSMSessionInfoExPtr"]["LSM_SessionInfo_Level1"][ + "DomainName" + ] if not len(sessions[SessionId]["Domain"]) and len(domain): sessions[SessionId]["Domain"] = domain - username = sessdata["LSMSessionInfoExPtr"]["LSM_SessionInfo_Level1"]["UserName"] + username = sessdata["LSMSessionInfoExPtr"][ + "LSM_SessionInfo_Level1" + ]["UserName"] if not len(sessions[SessionId]["Username"]) and len(username): sessions[SessionId]["Username"] = username - sessions[SessionId]["ConnectTime"] = sessdata["LSMSessionInfoExPtr"]["LSM_SessionInfo_Level1"]["ConnectTime"] - sessions[SessionId]["DisconnectTime"] = sessdata["LSMSessionInfoExPtr"]["LSM_SessionInfo_Level1"]["DisconnectTime"] - sessions[SessionId]["LogonTime"] = sessdata["LSMSessionInfoExPtr"]["LSM_SessionInfo_Level1"]["LogonTime"] - sessions[SessionId]["LastInputTime"] = sessdata["LSMSessionInfoExPtr"]["LSM_SessionInfo_Level1"]["LastInputTime"] + sessions[SessionId]["ConnectTime"] = sessdata[ + "LSMSessionInfoExPtr" + ]["LSM_SessionInfo_Level1"]["ConnectTime"] + sessions[SessionId]["DisconnectTime"] = sessdata[ + "LSMSessionInfoExPtr" + ]["LSM_SessionInfo_Level1"]["DisconnectTime"] + sessions[SessionId]["LogonTime"] = sessdata["LSMSessionInfoExPtr"][ + "LSM_SessionInfo_Level1" + ]["LogonTime"] + sessions[SessionId]["LastInputTime"] = sessdata[ + "LSMSessionInfoExPtr" + ]["LSM_SessionInfo_Level1"]["LastInputTime"] try: with TSTS.RCMPublic(self.conn, self.host, self.kerberos) as rcm: @@ -1062,11 +1350,17 @@ def enumerate_sessions_info(self, sessions): client = rcm.hRpcGetRemoteAddress(SessionId) if not client: continue - sessions[SessionId]["RemoteIp"] = client["pRemoteAddress"]["ipv4"]["in_addr"] + sessions[SessionId]["RemoteIp"] = client["pRemoteAddress"][ + "ipv4" + ]["in_addr"] except Exception as e: - self.logger.debug(f"Error getting client address for session {SessionId}: {e}") + self.logger.debug( + f"Error getting client address for session {SessionId}: {e}" + ) except SessionError: - self.logger.fail("RDP is probably not enabled, cannot list remote IPv4 addresses.") + self.logger.fail( + "RDP is probably not enabled, cannot list remote IPv4 addresses." + ) @requires_admin def taskkill(self): @@ -1080,15 +1374,23 @@ def taskkill(self): self.logger.error("Could not get process list") return - pidList = [i["UniqueProcessId"] for i in res if i["ImageName"].lower() == self.args.taskkill.lower()] + pidList = [ + i["UniqueProcessId"] + for i in res + if i["ImageName"].lower() == self.args.taskkill.lower() + ] if not pidList: - self.logger.fail(f"Could not find process named {self.args.taskkill}") + self.logger.fail( + f"Could not find process named {self.args.taskkill}" + ) return for pid in pidList: try: if legacy.hRpcWinStationTerminateProcess(handle, pid)["ErrorCode"]: - self.logger.highlight(f"Terminated PID {pid} ({self.args.taskkill})") + self.logger.highlight( + f"Terminated PID {pid} ({self.args.taskkill})" + ) else: self.logger.fail(f"Failed terminating PID {pid}") except Exception as e: @@ -1111,7 +1413,13 @@ def qwinsta(self): # Calculate max lengths for formatting maxSessionNameLen = max(len(sessions[i]["SessionName"]) + 1 for i in sessions) maxSessionNameLen = max(maxSessionNameLen, len("SESSIONNAME") + 1) - maxUsernameLen = max(len(sessions[i]["Username"] + sessions[i]["Domain"]) + 1 for i in sessions) + 1 + maxUsernameLen = ( + max( + len(sessions[i]["Username"] + sessions[i]["Domain"]) + 1 + for i in sessions + ) + + 1 + ) maxUsernameLen = max(maxUsernameLen, len("USERNAME") + 1) maxIdLen = max(len(str(i)) for i in sessions) maxIdLen = max(maxIdLen, len("ID") + 1) @@ -1119,14 +1427,16 @@ def qwinsta(self): maxStateLen = max(maxStateLen, len("STATE") + 1) # Create the template for formatting - template = (f"{{SESSIONNAME: <{maxSessionNameLen}}} " - f"{{USERNAME: <{maxUsernameLen}}} " - f"{{ID: <{maxIdLen}}} " - "{IPv4: <16} " - f"{{STATE: <{maxStateLen}}} " - "{DSTATE: <9} " - "{CONNTIME: <20} " - "{DISCTIME: <20} ") + template = ( + f"{{SESSIONNAME: <{maxSessionNameLen}}} " + f"{{USERNAME: <{maxUsernameLen}}} " + f"{{ID: <{maxIdLen}}} " + "{IPv4: <16} " + f"{{STATE: <{maxStateLen}}} " + "{DSTATE: <9} " + "{CONNTIME: <20} " + "{DISCTIME: <20} " + ) header = template.format( SESSIONNAME="SESSIONNAME", USERNAME="USERNAME", @@ -1169,10 +1479,18 @@ def qwinsta(self): continue connectTime = sessions[i]["ConnectTime"] - connectTime = connectTime.strftime(r"%Y/%m/%d %H:%M:%S") if connectTime.year > 1601 else "None" + connectTime = ( + connectTime.strftime(r"%Y/%m/%d %H:%M:%S") + if connectTime.year > 1601 + else "None" + ) disconnectTime = sessions[i]["DisconnectTime"] - disconnectTime = disconnectTime.strftime(r"%Y/%m/%d %H:%M:%S") if disconnectTime.year > 1601 else "None" + disconnectTime = ( + disconnectTime.strftime(r"%Y/%m/%d %H:%M:%S") + if disconnectTime.year > 1601 + else "None" + ) row = template.format( SESSIONNAME=sessions[i]["SessionName"], @@ -1210,16 +1528,24 @@ def format_row(procInfo): res = legacy.hRpcWinStationGetAllProcesses(handle) except Exception as e: # TODO: Issue https://github.com/fortra/impacket/issues/1816 - self.logger.debug(f"Exception while calling hRpcWinStationGetAllProcesses: {e}") + self.logger.debug( + f"Exception while calling hRpcWinStationGetAllProcesses: {e}" + ) return if not res: return self.logger.success("Enumerated processes") maxImageNameLen = max(len(i["ImageName"]) for i in res) maxSidLen = max(len(i["pSid"]) for i in res) - template = f"{{: <{maxImageNameLen}}} {{: <8}} {{: <11}} {{: <{maxSidLen}}} {{: >12}}" - self.logger.highlight(template.format("Image Name", "PID", "Session#", "SID", "Mem Usage")) - self.logger.highlight(template.replace(": ", ":=").format("", "", "", "", "")) + template = f"{{: <{maxImageNameLen}}} {{: <8}} {{: <11}} {{: <{ + maxSidLen + }}} {{: >12}}" + self.logger.highlight( + template.format("Image Name", "PID", "Session#", "SID", "Mem Usage") + ) + self.logger.highlight( + template.replace(": ", ":=").format("", "", "", "", "") + ) found_task = False # For each process on the remote host @@ -1248,11 +1574,17 @@ def output(sessions): # Calculate max lengths for formatting maxSidLen = max(len(key) + 1 for key in sessions) maxSidLen = max(maxSidLen, len("SID") + 1) - maxUsernameLen = max(len(str(vals["Username"]) + str(vals["Domain"])) + 1 for vals in sessions.values()) + 1 + maxUsernameLen = ( + max( + len(str(vals["Username"]) + str(vals["Domain"])) + 1 + for vals in sessions.values() + ) + + 1 + ) maxUsernameLen = max(maxUsernameLen, len("USERNAME") + 1) # Create the template for formatting - template = (f"{{USERNAME: <{maxUsernameLen}}} {{SID: <{maxSidLen}}}") + template = f"{{USERNAME: <{maxUsernameLen}}} {{SID: <{maxSidLen}}}" # Create headers header = template.format(USERNAME="USERNAME", SID="SID") @@ -1273,10 +1605,19 @@ def output(sessions): for row in result: self.logger.highlight(row) else: - self.logger.info(f"No active session found for specified user(s) using the Remote Registry service on {self.hostname}.") + self.logger.info( + f"No active session found for specified user(s) using the Remote Registry service on { + self.hostname + }." + ) # Bind to the Remote Registry Pipe - rpctransport = transport.SMBTransport(self.conn.getRemoteName(), self.conn.getRemoteHost(), filename=r"\winreg", smb_connection=self.conn) + rpctransport = transport.SMBTransport( + self.conn.getRemoteName(), + self.conn.getRemoteHost(), + filename=r"\winreg", + smb_connection=self.conn, + ) for binding_attempts in range(2, 0, -1): dce = rpctransport.get_dce_rpc() try: @@ -1284,9 +1625,15 @@ def output(sessions): dce.bind(rrp.MSRPC_UUID_RRP) break except SessionError as e: - self.logger.debug(f"Could not bind to the Remote Registry on {self.hostname}: {e}") - if binding_attempts == 1: # Last attempt - self.logger.info(f"The Remote Registry service seems to be disabled on {self.hostname}.") + self.logger.debug( + f"Could not bind to the Remote Registry on {self.hostname}: {e}" + ) + if binding_attempts == 1: # Last attempt + self.logger.info( + f"The Remote Registry service seems to be disabled on { + self.hostname + }." + ) return # STATUS_PIPE_NOT_AVAILABLE : Waiting 1 second for the service to start (if idle and set to 'Automatic' startup type) sleep(1) @@ -1296,7 +1643,11 @@ def output(sessions): resp = rrp.hOpenUsers(dce) except DCERPCException as e: if "rpc_s_access_denied" in str(e).lower(): - self.logger.info(f"Access denied while enumerating session using the Remote Registry on {self.hostname}.") + self.logger.info( + f"Access denied while enumerating session using the Remote Registry on { + self.hostname + }." + ) return else: self.logger.fail(f"Exception connecting to RPC on {self.hostname}: {e}") @@ -1316,7 +1667,9 @@ def output(sessions): resp = rrp.hBaseRegEnumKey(dce, key_handle, index) sid = resp["lpNameOut"].rstrip("\0") if re.match(sid_filter, sid) and sid not in exclude_sid: - self.logger.info(f"User with SID {sid} is logged in on {self.hostname}") + self.logger.info( + f"User with SID {sid} is logged in on {self.hostname}" + ) sessions.setdefault(sid, {"Username": "", "Domain": ""}) index += 1 except rrp.DCERPCSessionError as e: @@ -1324,31 +1677,51 @@ def output(sessions): self.logger.debug(f"No more items found in HKU on {self.hostname}.") break else: - self.logger.fail(f"Error enumerating HKU subkeys on {self.hostname}: {e}") + self.logger.fail( + f"Error enumerating HKU subkeys on {self.hostname}: {e}" + ) break rrp.hBaseRegCloseKey(dce, key_handle) dce.disconnect() if not sessions: - self.logger.info(f"No sessions found via the Remote Registry service on {self.hostname}.") + self.logger.info( + f"No sessions found via the Remote Registry service on {self.hostname}." + ) return # Bind to the LSARPC Pipe for SID resolution - rpctransport = transport.SMBTransport(self.conn.getRemoteName(), self.conn.getRemoteHost(), filename=r"\lsarpc", smb_connection=self.conn) + rpctransport = transport.SMBTransport( + self.conn.getRemoteName(), + self.conn.getRemoteHost(), + filename=r"\lsarpc", + smb_connection=self.conn, + ) dce = rpctransport.get_dce_rpc() try: dce.connect() dce.bind(lsat.MSRPC_UUID_LSAT) except Exception as e: - self.logger.debug(f"Failed to connect to LSARPC for SID resolution on {self.hostname}: {e}") + self.logger.debug( + f"Failed to connect to LSARPC for SID resolution on {self.hostname}: { + e + }" + ) output(sessions) return # Resolve SIDs with names - policy_handle = lsad.hLsarOpenPolicy2(dce, MAXIMUM_ALLOWED | lsat.POLICY_LOOKUP_NAMES)["PolicyHandle"] + policy_handle = lsad.hLsarOpenPolicy2( + dce, MAXIMUM_ALLOWED | lsat.POLICY_LOOKUP_NAMES + )["PolicyHandle"] try: - resp = lsat.hLsarLookupSids(dce, policy_handle, sessions.keys(), lsat.LSAP_LOOKUP_LEVEL.LsapLookupWksta) + resp = lsat.hLsarLookupSids( + dce, + policy_handle, + sessions.keys(), + lsat.LSAP_LOOKUP_LEVEL.LsapLookupWksta, + ) except DCERPCException as e: if str(e).find("STATUS_SOME_NOT_MAPPED") >= 0: resp = e.get_packet() @@ -1358,10 +1731,14 @@ def output(sessions): self.logger.debug(f"Could not resolve SID(s): {e}") if resp: - for sid, item in zip(sessions.keys(), resp["TranslatedNames"]["Names"], strict=False): + for sid, item in zip( + sessions.keys(), resp["TranslatedNames"]["Names"], strict=False + ): if item["DomainIndex"] >= 0: sessions[sid]["Username"] = item["Name"] - sessions[sid]["Domain"] = resp["ReferencedDomains"]["Domains"][item["DomainIndex"]]["Name"] + sessions[sid]["Domain"] = resp["ReferencedDomains"]["Domains"][ + item["DomainIndex"] + ]["Name"] # Filter for usernames if self.args.reg_sessions: @@ -1382,6 +1759,47 @@ def output(sessions): else: output(sessions) + def _check_subdir_write(self, share_name, path, depth, temp_dir_name): + """Recursively check write access in subdirectories up to specified depth.""" + if depth == 0: + return [] + writable = [] + try: + search_path = path + "*" if path else "*" + entries = self.conn.listPath(share_name, search_path) + except SessionError: + return writable + + for entry in entries: + if not entry.is_directory(): + continue + name = entry.get_longname() + if name in [".", ".."]: + continue + + subdir_path = f"{path}{name}\\" if path else f"{name}\\" + temp_path = ntpath.normpath("\\" + subdir_path + temp_dir_name) + + try: + self.conn.createDirectory(share_name, temp_path) + with contextlib.suppress(SessionError): + self.conn.deleteDirectory(share_name, temp_path) + writable.append(subdir_path.rstrip("\\")) + self.logger.debug( + f"WRITE access confirmed in subdirectory: {subdir_path}" + ) + continue + except SessionError: + pass + + if depth > 1: + deeper = self._check_subdir_write( + share_name, subdir_path, depth - 1, temp_dir_name + ) + writable.extend(deeper) + + return writable + def shares(self): temp_dir = ntpath.normpath("\\" + gen_random_string()) temp_file = ntpath.normpath("\\" + gen_random_string() + ".txt") @@ -1439,19 +1857,29 @@ def shares(self): share_info["access"].append("READ") except SessionError as e: error = get_error_string(e) - self.logger.debug(f"Error checking READ access on share {share_name}: {error}") + self.logger.debug( + f"Error checking READ access on share {share_name}: {error}" + ) except (NetBIOSError, UnicodeEncodeError) as e: write_check = False share_info["access"].append("UNKNOWN (try '--no-smbv1')") error = get_error_string(e) - self.logger.debug(f"Error checking READ access on share {share_name}: {error}. This exception always caused by special character in share name with SMBv1") - self.logger.info(f"Skipping WRITE permission check on share {share_name}") + self.logger.debug( + f"Error checking READ access on share {share_name}: { + error + }. This exception always caused by special character in share name with SMBv1" + ) + self.logger.info( + f"Skipping WRITE permission check on share {share_name}" + ) if write_check: try: self.conn.createDirectory(share_name, temp_dir) write_dir = True - self.logger.debug(f"WRITE access with DIR creation on share: {share_name}") + self.logger.debug( + f"WRITE access with DIR creation on share: {share_name}" + ) try: self.conn.deleteDirectory(share_name, temp_dir) except SessionError as e: @@ -1459,17 +1887,32 @@ def shares(self): if error == "STATUS_OBJECT_NAME_NOT_FOUND": pass else: - self.logger.debug(f"Error DELETING created temp dir {temp_dir} on share {share_name}: {error}") + self.logger.debug( + f"Error DELETING created temp dir {temp_dir} on share { + share_name + }: {error}" + ) except SessionError as e: error = get_error_string(e) - self.logger.debug(f"Error checking WRITE access with DIR creation on share {share_name}: {error}") + self.logger.debug( + f"Error checking WRITE access with DIR creation on share { + share_name + }: {error}" + ) try: tid = self.conn.connectTree(share_name) - fid = self.conn.createFile(tid, temp_file, desiredAccess=FILE_SHARE_WRITE, shareMode=FILE_SHARE_DELETE) + fid = self.conn.createFile( + tid, + temp_file, + desiredAccess=FILE_SHARE_WRITE, + shareMode=FILE_SHARE_DELETE, + ) self.conn.closeFile(tid, fid) write_file = True - self.logger.debug(f"WRITE access with FILE creation on share: {share_name}") + self.logger.debug( + f"WRITE access with FILE creation on share: {share_name}" + ) try: self.conn.deleteFile(share_name, temp_file) except SessionError as e: @@ -1477,28 +1920,50 @@ def shares(self): if error == "STATUS_OBJECT_NAME_NOT_FOUND": pass else: - self.logger.debug(f"Error DELETING created temp file {temp_file} on share {share_name}") + self.logger.debug( + f"Error DELETING created temp file { + temp_file + } on share {share_name}" + ) except SessionError as e: error = get_error_string(e) - self.logger.debug(f"Error checking WRITE access with FILE creation on share {share_name}: {error}") + self.logger.debug( + f"Error checking WRITE access with FILE creation on share { + share_name + }: {error}" + ) # If we either can create a file or a directory we add the write privs to the output. Agreed on in https://github.com/Pennyw0rth/NetExec/pull/404 if write_dir or write_file: write = True share_info["access"].append("WRITE") + # If root write check failed but we have read access, check subdirectories + if not write and read and write_check and self.args.shares_depth > 0: + writable_subdirs = self._check_subdir_write( + share_name, "", self.args.shares_depth, gen_random_string() + ) + if writable_subdirs: + write = True + for subdir in writable_subdirs: + share_info["access"].append(f"WRITE ({subdir})") + permissions.append(share_info) if share_name != "IPC$": try: # TODO: check if this already exists in DB before adding - self.db.add_share(self.hostname, user_id, share_name, share_remark, read, write) + self.db.add_share( + self.hostname, user_id, share_name, share_remark, read, write + ) except Exception as e: error = get_error_string(e) self.logger.debug(f"Error adding share: {error}") if self.args.filter_shares: - self.logger.display("[REMOVED] Use the --shares read,write options instead.") + self.logger.display( + "[REMOVED] Use the --shares read,write options instead." + ) self.logger.display("Enumerated shares") self.logger.highlight(f"{'Share':<15} {'Permissions':<15} {'Remark'}") @@ -1528,11 +1993,20 @@ def dir(self): if not contents: return - self.logger.highlight(f"{'Perms':<9}{'File Size':<15}{'Date':<30}{'File Path':<45}") - self.logger.highlight(f"{'-----':<9}{'---------':<15}{'----':<30}{'---------':<45}") + self.logger.highlight( + f"{'Perms':<9}{'File Size':<15}{'Date':<30}{'File Path':<45}" + ) + self.logger.highlight( + f"{'-----':<9}{'---------':<15}{'----':<30}{'---------':<45}" + ) for content in contents: full_path = ntpath.join(self.args.dir, content.get_longname()) - self.logger.highlight(f"{'d' if content.is_directory() else 'f'}{'rw-' if content.is_readonly() > 0 else 'r--':<8}{content.get_filesize():<15}{ctime(float(content.get_mtime_epoch())):<30}{full_path:<45}") + self.logger.highlight( + f"{'d' if content.is_directory() else 'f'}{ + 'rw-' if content.is_readonly() > 0 else 'r--':<8}{ + content.get_filesize():<15}{ + ctime(float(content.get_mtime_epoch())):<30}{full_path:<45}" + ) def interfaces(self): """ @@ -1552,7 +2026,7 @@ def interfaces(self): ctlCode=FSCTL_QUERY_NETWORK_INTERFACE_INFO, flags=SMB2_0_IOCTL_IS_FSCTL, inputBlob=b"", - maxOutputResponse=8192 + maxOutputResponse=8192, ) if response: @@ -1570,20 +2044,30 @@ def interfaces(self): while offset < len(response) and offset + 152 <= len(response): try: # Parse NETWORK_INTERFACE_INFO structure - next_offset = struct.unpack(" offset else offset + next_offset + offset = ( + next_offset + if next_offset > offset + else offset + next_offset + ) if offset >= len(response): break except (struct.error, IndexError) as e: - self.logger.fail(f"Error parsing interface at offset {offset}: {e}") + self.logger.fail( + f"Error parsing interface at offset {offset}: {e}" + ) break # Display interfaces @@ -1621,11 +2111,17 @@ def interfaces(self): self.logger.fail("No network interfaces found") return - self.logger.highlight(f"Found {len(grouped_interfaces)} network interface(s)") + self.logger.highlight( + f"Found {len(grouped_interfaces)} network interface(s)" + ) for i, if_index in enumerate(sorted(grouped_interfaces.keys())): iface = grouped_interfaces[if_index] - caps_str = ", ".join(iface["capabilities"]) if iface["capabilities"] else "None" + caps_str = ( + ", ".join(iface["capabilities"]) + if iface["capabilities"] + else "None" + ) speed_mbps = iface["link_speed"] / 1000000 self.logger.display(f"Interface {i + 1} (Index: {if_index}):") @@ -1652,12 +2148,19 @@ def get_dc_ips(self): return dc_ips def smb_sessions(self): - self.logger.fail("[REMOVED] Use option --reg-sessions --qwinsta or --loggedon-users") + self.logger.fail( + "[REMOVED] Use option --reg-sessions --qwinsta or --loggedon-users" + ) return def disks(self): try: - rpctransport = transport.SMBTransport(self.conn.getRemoteName(), self.conn.getRemoteHost(), filename=r"\srvsvc", smb_connection=self.conn) + rpctransport = transport.SMBTransport( + self.conn.getRemoteName(), + self.conn.getRemoteHost(), + filename=r"\srvsvc", + smb_connection=self.conn, + ) dce = rpctransport.get_dce_rpc() dce.connect() dce.bind(srvs.MSRPC_UUID_SRVS) @@ -1685,12 +2188,18 @@ def local_groups(self): for group_name, group_rid in groups.items(): self.logger.highlight(f"{group_rid} - {group_name}") - group_id = self.db.add_group(self.hostname, group_name, rid=group_rid)[0] + group_id = self.db.add_group(self.hostname, group_name, rid=group_rid)[ + 0 + ] self.logger.debug(f"Added group, returned id: {group_id}") elif groups and members: - self.logger.success(f"Enumerated users of local groups: {groups.popitem()[0]}") + self.logger.success( + f"Enumerated users of local groups: {groups.popitem()[0]}" + ) - members = dict(sorted(members.items(), key=lambda item: int(item[0].split("-")[-1]))) + members = dict( + sorted(members.items(), key=lambda item: int(item[0].split("-")[-1])) + ) for member in members: self.logger.highlight(f"{member} - {members[member]}") @@ -1701,7 +2210,9 @@ def groups(self): def users(self): if self.args.users: self.logger.debug(f"Dumping users: {', '.join(self.args.users)}") - return UserSamrDump(self).dump(requested_users=self.args.users, dump_path=self.args.users_export) + return UserSamrDump(self).dump( + requested_users=self.args.users, dump_path=self.args.users_export + ) def users_export(self): self.users() @@ -1712,25 +2223,40 @@ def computers(self): def loggedon_users(self): if self.args.loggedon_users_filter: - self.logger.fail("[REMOVED] Use option '--loggedon-users ' for filtering") + self.logger.fail( + "[REMOVED] Use option '--loggedon-users ' for filtering" + ) logged_on = set() try: - rpctransport = transport.SMBTransport(self.conn.getRemoteName(), self.conn.getRemoteHost(), filename=r"\wkssvc", smb_connection=self.conn) + rpctransport = transport.SMBTransport( + self.conn.getRemoteName(), + self.conn.getRemoteHost(), + filename=r"\wkssvc", + smb_connection=self.conn, + ) dce = rpctransport.get_dce_rpc() dce.connect() dce.bind(wkst.MSRPC_UUID_WKST) response = wkst.hNetrWkstaUserEnum(dce, 1) for user in response["UserInfo"]["WkstaUserInfo"]["Level1"]["Buffer"]: - user_info = (user["wkui1_logon_domain"][:-1], user["wkui1_username"][:-1], user["wkui1_logon_server"][:-1]) + user_info = ( + user["wkui1_logon_domain"][:-1], + user["wkui1_username"][:-1], + user["wkui1_logon_server"][:-1], + ) if user_info not in logged_on: logged_on.add(user_info) if self.args.loggedon_users: if re.match(self.args.loggedon_users, user_info[1]): - self.logger.highlight(f"{user_info[0]}\\{user_info[1]:<25} logon_server: {user_info[2]}") + self.logger.highlight( + f"{user_info[0]}\\{user_info[1]:<25} logon_server: {user_info[2]}" + ) else: - self.logger.highlight(f"{user_info[0]}\\{user_info[1]:<25} logon_server: {user_info[2]}") + self.logger.highlight( + f"{user_info[0]}\\{user_info[1]:<25} logon_server: {user_info[2]}" + ) except Exception as e: self.logger.fail(f"Error enumerating logged on users: {e}") @@ -1747,16 +2273,36 @@ def wmi_query(self, wql=None, namespace=None, callback_func=None): namespace = self.args.wmi_namespace try: - dcom = DCOMConnection(self.remoteName, self.username, self.password, self.domain, self.lmhash, self.nthash, oxidResolver=True, doKerberos=self.kerberos, kdcHost=self.kdcHost, aesKey=self.aesKey, remoteHost=self.host) - iInterface = dcom.CoCreateInstanceEx(CLSID_WbemLevel1Login, IID_IWbemLevel1Login) - flag, stringBinding = dcom_FirewallChecker(iInterface, self.host, self.args.dcom_timeout) + dcom = DCOMConnection( + self.remoteName, + self.username, + self.password, + self.domain, + self.lmhash, + self.nthash, + oxidResolver=True, + doKerberos=self.kerberos, + kdcHost=self.kdcHost, + aesKey=self.aesKey, + remoteHost=self.host, + ) + iInterface = dcom.CoCreateInstanceEx( + CLSID_WbemLevel1Login, IID_IWbemLevel1Login + ) + flag, stringBinding = dcom_FirewallChecker( + iInterface, self.host, self.args.dcom_timeout + ) if not flag or not stringBinding: - error_msg = f"WMI Query: Dcom initialization failed on connection with stringbinding: '{stringBinding}', please increase the timeout with the option '--dcom-timeout'. If it's still failing maybe something is blocking the RPC connection, try another exec method" + error_msg = f"WMI Query: Dcom initialization failed on connection with stringbinding: '{ + stringBinding + }', please increase the timeout with the option '--dcom-timeout'. If it's still failing maybe something is blocking the RPC connection, try another exec method" if not stringBinding: error_msg = "WMI Query: Dcom initialization failed: can't get target stringbinding, maybe cause by IPv6 or any other issues, please check your target again" - self.logger.fail(error_msg) if not flag else self.logger.debug(error_msg) + self.logger.fail(error_msg) if not flag else self.logger.debug( + error_msg + ) # Make it force break function dcom.disconnect() iWbemLevel1Login = IWbemLevel1Login(iInterface) @@ -1776,7 +2322,9 @@ def wmi_query(self, wql=None, namespace=None, callback_func=None): record = wmi_results.getProperties() records.append(record) for k, v in record.items(): - if k != "TimeGenerated": # from the wcc module, but this is a small hack to get it to stop spamming - TODO: add in method to disable output for this function + if ( + k != "TimeGenerated" + ): # from the wcc module, but this is a small hack to get it to stop spamming - TODO: add in method to disable output for this function self.logger.highlight(f"{k} => {v['value']}") else: callback_func(iEnumWbemClassObject, records) @@ -1796,7 +2344,7 @@ def spider( depth=None, content=False, only_files=True, - silent=True + silent=True, ): if exclude_dirs is None: exclude_dirs = [] @@ -1818,10 +2366,20 @@ def spider( self.args.depth, self.args.content, self.args.only_files, - self.args.silent + self.args.silent, ) else: - spider.spider(share, folder, pattern, regex, exclude_dirs, depth, content, only_files, silent) + spider.spider( + share, + folder, + pattern, + regex, + exclude_dirs, + depth, + content, + only_files, + silent, + ) if not silent: self.logger.display(f"Done spidering (Completed in {time() - start_time})") @@ -1846,7 +2404,14 @@ def rid_brute(self, max_rid=None): if hasattr(rpc_transport, "set_credentials"): # This method exists only for selected protocol sequences. - rpc_transport.set_credentials(self.username, self.password, self.domain, self.lmhash, self.nthash, self.aesKey) + rpc_transport.set_credentials( + self.username, + self.password, + self.domain, + self.lmhash, + self.nthash, + self.aesKey, + ) if self.kerberos: rpc_transport.set_kerberos(self.kerberos, self.kdcHost) @@ -1867,7 +2432,9 @@ def rid_brute(self, max_rid=None): dce.bind(lsat.MSRPC_UUID_LSAT) try: - resp = lsad.hLsarOpenPolicy2(dce, MAXIMUM_ALLOWED | lsat.POLICY_LOOKUP_NAMES) + resp = lsad.hLsarOpenPolicy2( + dce, MAXIMUM_ALLOWED | lsat.POLICY_LOOKUP_NAMES + ) except lsad.DCERPCSessionError as e: self.logger.fail(f"Error connecting: {e}") return entries @@ -1875,7 +2442,11 @@ def rid_brute(self, max_rid=None): policy_handle = resp["PolicyHandle"] try: - resp = lsad.hLsarQueryInformationPolicy2(dce, policy_handle, lsad.POLICY_INFORMATION_CLASS.PolicyAccountDomainInformation) + resp = lsad.hLsarQueryInformationPolicy2( + dce, + policy_handle, + lsad.POLICY_INFORMATION_CLASS.PolicyAccountDomainInformation, + ) except lsad.DCERPCException as e: if e.error_string == "nca_s_op_rng_error": self.logger.fail("RPC lookup failed: RPC method not implemented") @@ -1883,19 +2454,29 @@ def rid_brute(self, max_rid=None): self.logger.fail(f"Error querying policy information: {e}") return entries - domain_sid = resp["PolicyInformation"]["PolicyAccountDomainInfo"]["DomainSid"].formatCanonical() + domain_sid = resp["PolicyInformation"]["PolicyAccountDomainInfo"][ + "DomainSid" + ].formatCanonical() so_far = 0 simultaneous = 1000 for _j in range(max_rid // simultaneous + 1): - sids_to_check = (max_rid - so_far) % simultaneous if (max_rid - so_far) // simultaneous == 0 else simultaneous + sids_to_check = ( + (max_rid - so_far) % simultaneous + if (max_rid - so_far) // simultaneous == 0 + else simultaneous + ) if sids_to_check == 0: break - sids = [f"{domain_sid}-{i:d}" for i in range(so_far, so_far + sids_to_check)] + sids = [ + f"{domain_sid}-{i:d}" for i in range(so_far, so_far + sids_to_check) + ] try: - lsat.hLsarLookupSids(dce, policy_handle, sids, lsat.LSAP_LOOKUP_LEVEL.LsapLookupWksta) + lsat.hLsarLookupSids( + dce, policy_handle, sids, lsat.LSAP_LOOKUP_LEVEL.LsapLookupWksta + ) except DCERPCException as e: if str(e).find("STATUS_NONE_MAPPED") >= 0: so_far += simultaneous @@ -1908,18 +2489,18 @@ def rid_brute(self, max_rid=None): for n, item in enumerate(resp["TranslatedNames"]["Names"]): if item["Use"] != SID_NAME_USE.SidTypeUnknown: rid = so_far + n - domain = resp["ReferencedDomains"]["Domains"][item["DomainIndex"]]["Name"] + domain = resp["ReferencedDomains"]["Domains"][item["DomainIndex"]][ + "Name" + ] user = item["Name"] sid_type = SID_NAME_USE.enumItems(item["Use"]).name self.logger.highlight(f"{rid}: {domain}\\{user} ({sid_type})") - entries.append( - { - "rid": rid, - "domain": domain, - "username": user, - "sidtype": sid_type, - } - ) + entries.append({ + "rid": rid, + "domain": domain, + "username": user, + "sidtype": sid_type, + }) so_far += simultaneous dce.disconnect() return entries @@ -1929,7 +2510,9 @@ def put_file_single(self, src, dst): with open(src, "rb") as file: try: self.conn.putFile(self.args.share, dst, file.read) - self.logger.success(f"Created file {src} on \\\\{self.args.share}\\{dst}") + self.logger.success( + f"Created file {src} on \\\\{self.args.share}\\{dst}" + ) except Exception as e: self.logger.fail(f"Error writing file to share {self.args.share}: {e}") @@ -1945,9 +2528,13 @@ def get_file_single(self, remote_path, download_path): with open(download_path, "wb+") as file: try: self.conn.getFile(share_name, remote_path, file.write) - self.logger.success(f'File "{remote_path}" was downloaded to "{download_path}"') + self.logger.success( + f'File "{remote_path}" was downloaded to "{download_path}"' + ) except Exception as e: - self.logger.fail(f'Error writing file "{remote_path}" from share "{share_name}": {e}') + self.logger.fail( + f'Error writing file "{remote_path}" from share "{share_name}": {e}' + ) if os.path.getsize(download_path) == 0: os.remove(download_path) @@ -1958,9 +2545,13 @@ def get_file(self): def enable_remoteops(self, regsecret=False): try: if regsecret: - self.remote_ops = RegSecretsRemoteOperations(self.conn, self.kerberos, self.kdcHost) + self.remote_ops = RegSecretsRemoteOperations( + self.conn, self.kerberos, self.kdcHost + ) else: - self.remote_ops = RemoteOperations(self.conn, self.kerberos, self.kdcHost) + self.remote_ops = RemoteOperations( + self.conn, self.kerberos, self.kdcHost + ) self.remote_ops.enableRegistry() if self.bootkey is None: self.bootkey = self.remote_ops.getBootKey() @@ -2008,10 +2599,14 @@ def add_sam_hash(sam_hash, host_id): ) self.logger.display("Dumping SAM hashes") - self.output_filename = self.output_file_template.format(output_folder="sam") + self.output_filename = self.output_file_template.format( + output_folder="sam" + ) SAM.dump() SAM.export(self.output_filename) - self.logger.success(f"Added {highlight(add_sam_hash.sam_hashes)} SAM hashes to the database") + self.logger.success( + f"Added {highlight(add_sam_hash.sam_hashes)} SAM hashes to the database" + ) try: self.remote_ops.finish() @@ -2022,7 +2617,9 @@ def add_sam_hash(sam_hash, host_id): SAM.finish() except SessionError as e: if "STATUS_ACCESS_DENIED" in e.getErrorString(): - self.logger.fail('Error "STATUS_ACCESS_DENIED" while dumping SAM. This is likely due to an endpoint protection.') + self.logger.fail( + 'Error "STATUS_ACCESS_DENIED" while dumping SAM. This is likely due to an endpoint protection.' + ) except Exception as e: self.logger.exception(str(e)) @@ -2052,12 +2649,20 @@ def sccm(self): self.logger.fail("No masterkeys looted") return - self.logger.success(f"Got {highlight(len(masterkeys))} decrypted masterkeys. Looting SCCM Credentials through {self.args.sccm}") + self.logger.success( + f"Got { + highlight(len(masterkeys)) + } decrypted masterkeys. Looting SCCM Credentials through {self.args.sccm}" + ) def sccm_callback(secret): if isinstance(secret, SCCMCred): tag = "NAA Account" - self.logger.highlight(f"[{tag}] {secret.username.decode('latin-1')}:{secret.password.decode('latin-1')}") + self.logger.highlight( + f"[{tag}] {secret.username.decode('latin-1')}:{ + secret.password.decode('latin-1') + }" + ) self.db.add_dpapi_secrets( target.address, f"SCCM - {tag}", @@ -2079,7 +2684,11 @@ def sccm_callback(secret): ) elif isinstance(secret, SCCMCollection): tag = "Collection Variable" - self.logger.highlight(f"[{tag}] {secret.variable.decode('latin-1')}:{secret.value.decode('latin-1')}") + self.logger.highlight( + f"[{tag}] {secret.variable.decode('latin-1')}:{ + secret.value.decode('latin-1') + }" + ) self.db.add_dpapi_secrets( target.address, f"SCCM - {tag}", @@ -2088,9 +2697,17 @@ def sccm_callback(secret): secret.value.decode("latin-1"), "N/A", ) + try: - sccm_triage = SCCMTriage(target=target, conn=conn, masterkeys=masterkeys, per_secret_callback=sccm_callback) - sccm_triage.triage_sccm(use_wmi=self.args.sccm == "wmi", ) + sccm_triage = SCCMTriage( + target=target, + conn=conn, + masterkeys=masterkeys, + per_secret_callback=sccm_callback, + ) + sccm_triage.triage_sccm( + use_wmi=self.args.sccm == "wmi", + ) except Exception as e: self.logger.debug(f"Error while looting sccm: {e}") @@ -2121,25 +2738,35 @@ def dpapi(self): use_kcache=self.use_kcache, ) - self.output_file = open(self.output_file_template.format(output_folder="dpapi"), "w", encoding="utf-8") # noqa: SIM115 + self.output_file = open( + self.output_file_template.format(output_folder="dpapi"), + "w", + encoding="utf-8", + ) # noqa: SIM115 conn = upgrade_to_dploot_connection(connection=self.conn, target=target) if conn is None: self.logger.debug("Could not upgrade connection") return - masterkeys = collect_masterkeys_from_target(self, target, conn, system=dump_system) + masterkeys = collect_masterkeys_from_target( + self, target, conn, system=dump_system + ) if len(masterkeys) == 0: self.logger.fail("No masterkeys looted") return - self.logger.success(f"Got {highlight(len(masterkeys))} decrypted masterkeys. Looting secrets...") + self.logger.success( + f"Got {highlight(len(masterkeys))} decrypted masterkeys. Looting secrets..." + ) # Collect User and Machine Credentials Manager secrets def credential_callback(credential): tag = "CREDENTIAL" - line = f"[{credential.winuser}][{tag}] {credential.target} - {credential.username}:{credential.password}" + line = f"[{credential.winuser}][{tag}] {credential.target} - { + credential.username + }:{credential.password}" self.logger.highlight(line) if self.output_file: self.output_file.write(line + "\n") @@ -2153,7 +2780,12 @@ def credential_callback(credential): ) try: - credentials_triage = CredentialsTriage(target=target, conn=conn, masterkeys=masterkeys, per_credential_callback=credential_callback) + credentials_triage = CredentialsTriage( + target=target, + conn=conn, + masterkeys=masterkeys, + per_credential_callback=credential_callback, + ) self.logger.debug(f"Credentials Triage Object: {credentials_triage}") credentials_triage.triage_credentials() if dump_system: @@ -2167,7 +2799,10 @@ def credential_callback(credential): try: cng_triage = CngTriage(target=target, conn=conn, masterkeys=masterkeys) for cng_file in cng_triage.triage_system_cng(): - if cng_file.cng_blob["Name"].decode("utf-16le").rstrip("\0") == "Google Chromekey1": + if ( + cng_file.cng_blob["Name"].decode("utf-16le").rstrip("\0") + == "Google Chromekey1" + ): self.logger.debug("Found CNG Google ChromeKey1\n") cng_chromekey = cng_file.decrypted_private_key except Exception as e: @@ -2177,7 +2812,9 @@ def credential_callback(credential): def browser_callback(secret): if isinstance(secret, LoginData): secret_url = secret.url + " -" if secret.url != "" else "-" - line = f"[{secret.winuser}][{secret.browser.upper()}] {secret_url} {secret.username}:{secret.password}" + line = f"[{secret.winuser}][{secret.browser.upper()}] {secret_url} { + secret.username + }:{secret.password}" self.logger.highlight(line) if self.output_file: self.output_file.write(line + "\n") @@ -2190,7 +2827,9 @@ def browser_callback(secret): secret.url, ) elif isinstance(secret, GoogleRefreshToken): - line = f"[{secret.winuser}][{secret.browser.upper()}] Google Refresh Token: {secret.service}:{secret.token}" + line = f"[{secret.winuser}][{ + secret.browser.upper() + }] Google Refresh Token: {secret.service}:{secret.token}" self.logger.highlight(line) if self.output_file: self.output_file.write(line + "\n") @@ -2203,14 +2842,23 @@ def browser_callback(secret): "Google Refresh Token", ) elif isinstance(secret, Cookie): - line = f"[{secret.winuser}][{secret.browser.upper()}] {secret.host}{secret.path} - {secret.cookie_name}:{secret.cookie_value}" + line = f"[{secret.winuser}][{secret.browser.upper()}] {secret.host}{ + secret.path + } - {secret.cookie_name}:{secret.cookie_value}" self.logger.highlight(line) if self.output_file: self.output_file.write(line + "\n") try: - browser_triage = BrowserTriage(target=target, conn=conn, masterkeys=masterkeys, per_secret_callback=browser_callback) - browser_triage.triage_browsers(gather_cookies=dump_cookies, cng_chromekey=cng_chromekey) + browser_triage = BrowserTriage( + target=target, + conn=conn, + masterkeys=masterkeys, + per_secret_callback=browser_callback, + ) + browser_triage.triage_browsers( + gather_cookies=dump_cookies, cng_chromekey=cng_chromekey + ) except Exception as e: self.logger.debug(f"Error while looting browsers: {e}") @@ -2218,7 +2866,9 @@ def vault_callback(secret): tag = "IEX" if secret.type == "Internet Explorer": resource = secret.resource + " -" if secret.resource != "" else "-" - line = f"[{secret.winuser}][{tag}] {resource} - {secret.username}:{secret.password}" + line = f"[{secret.winuser}][{tag}] {resource} - {secret.username}:{ + secret.password + }" self.logger.highlight(line) if self.output_file: self.output_file.write(line + "\n") @@ -2233,7 +2883,12 @@ def vault_callback(secret): try: # Collect User Internet Explorer stored secrets - vaults_triage = VaultsTriage(target=target, conn=conn, masterkeys=masterkeys, per_vault_callback=vault_callback) + vaults_triage = VaultsTriage( + target=target, + conn=conn, + masterkeys=masterkeys, + per_vault_callback=vault_callback, + ) vaults_triage.triage_vaults() except Exception as e: self.logger.debug(f"Error while looting vaults: {e}") @@ -2242,7 +2897,9 @@ def firefox_callback(secret): tag = "FIREFOX" if isinstance(secret, FirefoxData): url = secret.url + " -" if secret.url != "" else "-" - line = f"[{secret.winuser}][{tag}] {url} {secret.username}:{secret.password}" + line = f"[{secret.winuser}][{tag}] {url} {secret.username}:{ + secret.password + }" self.logger.highlight(line) if self.output_file: self.output_file.write(line + "\n") @@ -2255,14 +2912,21 @@ def firefox_callback(secret): secret.url, ) elif isinstance(secret, FirefoxCookie): - line = f"[{secret.winuser}][{tag}] {secret.host}{secret.path} {secret.cookie_name}:{secret.cookie_value}" + line = f"[{secret.winuser}][{tag}] {secret.host}{secret.path} { + secret.cookie_name + }:{secret.cookie_value}" self.logger.highlight(line) if self.output_file: self.output_file.write(line + "\n") try: # Collect Firefox stored secrets - firefox_triage = FirefoxTriage(target=target, logger=self.logger, conn=conn, per_secret_callback=firefox_callback) + firefox_triage = FirefoxTriage( + target=target, + logger=self.logger, + conn=conn, + per_secret_callback=firefox_callback, + ) firefox_triage.run(gather_cookies=dump_cookies) except Exception as e: self.logger.debug(f"Error while looting firefox: {e}") @@ -2310,7 +2974,9 @@ def add_lsa_secret(secret): LSA = RegSecretsLSASecrets( self.bootkey, self.remote_ops, - perSecretCallback=lambda secret_type, secret: add_lsa_secret(secret), + perSecretCallback=lambda secret_type, secret: add_lsa_secret( + secret + ), ) else: SECURITYFileName = self.remote_ops.saveSECURITY() @@ -2319,15 +2985,23 @@ def add_lsa_secret(secret): self.bootkey, self.remote_ops, isRemote=True, - perSecretCallback=lambda secret_type, secret: add_lsa_secret(secret), + perSecretCallback=lambda secret_type, secret: add_lsa_secret( + secret + ), ) self.logger.display("Dumping LSA secrets") - self.output_filename = self.output_file_template.format(output_folder="lsa") + self.output_filename = self.output_file_template.format( + output_folder="lsa" + ) LSA.dumpCachedHashes() LSA.exportCached(self.output_filename) LSA.dumpSecrets() LSA.exportSecrets(self.output_filename) - self.logger.success(f"Dumped {highlight(add_lsa_secret.secrets)} LSA secrets to {self.output_filename + '.secrets'} and {self.output_filename + '.cached'}") + self.logger.success( + f"Dumped {highlight(add_lsa_secret.secrets)} LSA secrets to { + self.output_filename + '.secrets' + } and {self.output_filename + '.cached'}" + ) try: self.remote_ops.finish() except Exception as e: @@ -2336,7 +3010,9 @@ def add_lsa_secret(secret): LSA.finish() except SessionError as e: if "STATUS_ACCESS_DENIED" in e.getErrorString(): - self.logger.fail('Error "STATUS_ACCESS_DENIED" while dumping LSA. This is likely due to an endpoint protection.') + self.logger.fail( + 'Error "STATUS_ACCESS_DENIED" while dumping LSA. This is likely due to an endpoint protection.' + ) except Exception as e: self.logger.exception(str(e)) @@ -2349,7 +3025,11 @@ def ntds(self): def add_hash(secret_type, secret, host_id): nonlocal printed_kerb_keys_banner - if self.args.kerberos_keys and not printed_kerb_keys_banner and secret_type == NTDSHashes.SECRET_TYPE.NTDS_KERBEROS: + if ( + self.args.kerberos_keys + and not printed_kerb_keys_banner + and secret_type == NTDSHashes.SECRET_TYPE.NTDS_KERBEROS + ): self.logger.display("Kerberos keys:") printed_kerb_keys_banner = True @@ -2369,7 +3049,11 @@ def add_hash(secret_type, secret, host_id): self.logger.highlight(secret) # Filter out computer accounts, history hashes and kerberos keys for adding to db - if secret.find("$") == -1 and secret_type == NTDSHashes.SECRET_TYPE.NTDS and "_history" not in secret: + if ( + secret.find("$") == -1 + and secret_type == NTDSHashes.SECRET_TYPE.NTDS + and "_history" not in secret + ): if secret.find("\\") != -1: domain, clean_hash = secret.split("\\") else: @@ -2380,12 +3064,16 @@ def add_hash(secret_type, secret, host_id): username, _, lmhash, nthash, _, _, _ = clean_hash.split(":") parsed_hash = f"{lmhash}:{nthash}" if validate_ntlm(parsed_hash): - self.db.add_credential("hash", domain, username, parsed_hash, pillaged_from=host_id) + self.db.add_credential( + "hash", domain, username, parsed_hash, pillaged_from=host_id + ) add_hash.added_to_db += 1 return raise except Exception: - self.logger.debug("Dumped hash is not NTLM, not adding to db for now ;)") + self.logger.debug( + "Dumped hash is not NTLM, not adding to db for now ;)" + ) else: self.logger.debug("Dumped hash is a computer account, not adding to db") @@ -2417,17 +3105,31 @@ def add_hash(secret_type, secret, host_id): outputFileName=self.output_filename, justUser=self.args.userntds if self.args.userntds else None, printUserStatus=True, - perSecretCallback=lambda secret_type, secret: add_hash(secret_type, secret, host_id), + perSecretCallback=lambda secret_type, secret: add_hash( + secret_type, secret, host_id + ), ) try: - self.logger.success("Dumping the NTDS, this could take a while so go grab a redbull...") + self.logger.success( + "Dumping the NTDS, this could take a while so go grab a redbull..." + ) NTDS.dump() ntds_outfile = f"{self.output_filename}.ntds" - self.logger.success(f"Dumped {highlight(add_hash.nt_lm_secrets)} NTDS hashes to {ntds_outfile} of which {highlight(add_hash.added_to_db)} were added to the database") + self.logger.success( + f"Dumped {highlight(add_hash.nt_lm_secrets)} NTDS hashes to { + ntds_outfile + } of which {highlight(add_hash.added_to_db)} were added to the database" + ) if self.args.kerberos_keys: - self.logger.success(f"Dumped {highlight(add_hash.kerb_secrets)} Kerberos keys to {ntds_outfile}.kerberos") - self.logger.display("To extract only enabled accounts from the output file, run the following command: ") + self.logger.success( + f"Dumped {highlight(add_hash.kerb_secrets)} Kerberos keys to { + ntds_outfile + }.kerberos" + ) + self.logger.display( + "To extract only enabled accounts from the output file, run the following command: " + ) self.logger.display(f"grep -iv disabled {ntds_outfile} | cut -d ':' -f1") except Exception as e: # if str(e).find('ERROR_DS_DRA_BAD_DN') >= 0: diff --git a/nxc/protocols/smb/proto_args.py b/nxc/protocols/smb/proto_args.py index 6765559c24..7a1fc51db4 100644 --- a/nxc/protocols/smb/proto_args.py +++ b/nxc/protocols/smb/proto_args.py @@ -1,113 +1,460 @@ from argparse import _StoreTrueAction, _StoreAction -from nxc.helpers.args import DisplayDefaultsNotNone, DefaultTrackingAction, get_conditional_action +from nxc.helpers.args import ( + DisplayDefaultsNotNone, + DefaultTrackingAction, + get_conditional_action, +) def proto_args(parser, parents): - smb_parser = parser.add_parser("smb", help="own stuff using SMB", parents=parents, formatter_class=DisplayDefaultsNotNone) - smb_parser.add_argument("-H", "--hash", metavar="HASH", dest="hash", nargs="+", default=[], help="NTLM hash(es) or file(s) containing NTLM hashes") + smb_parser = parser.add_parser( + "smb", + help="own stuff using SMB", + parents=parents, + formatter_class=DisplayDefaultsNotNone, + ) + smb_parser.add_argument( + "-H", + "--hash", + metavar="HASH", + dest="hash", + nargs="+", + default=[], + help="NTLM hash(es) or file(s) containing NTLM hashes", + ) - delegate_arg = smb_parser.add_argument("--delegate", action="store", help="Impersonate user with S4U2Self + S4U2Proxy") - delegate_spn_arg = smb_parser.add_argument("--delegate-spn", action=get_conditional_action(_StoreAction), make_required=[], help="SPN to use for S4U2Proxy, if not specified the SPN used will be cifs/", type=str) - generate_st = smb_parser.add_argument("--generate-st", type=str, dest="generate_st", action=get_conditional_action(_StoreAction), make_required=[], help="Store the S4U Service Ticket in the specified file") - self_delegate_arg = smb_parser.add_argument("--self", dest="no_s4u2proxy", action=get_conditional_action(_StoreTrueAction), make_required=[], help="Only do S4U2Self, no S4U2Proxy (use with delegate)") + delegate_arg = smb_parser.add_argument( + "--delegate", action="store", help="Impersonate user with S4U2Self + S4U2Proxy" + ) + delegate_spn_arg = smb_parser.add_argument( + "--delegate-spn", + action=get_conditional_action(_StoreAction), + make_required=[], + help="SPN to use for S4U2Proxy, if not specified the SPN used will be cifs/", + type=str, + ) + generate_st = smb_parser.add_argument( + "--generate-st", + type=str, + dest="generate_st", + action=get_conditional_action(_StoreAction), + make_required=[], + help="Store the S4U Service Ticket in the specified file", + ) + self_delegate_arg = smb_parser.add_argument( + "--self", + dest="no_s4u2proxy", + action=get_conditional_action(_StoreTrueAction), + make_required=[], + help="Only do S4U2Self, no S4U2Proxy (use with delegate)", + ) dgroup = smb_parser.add_mutually_exclusive_group() - dgroup.add_argument("-d", "--domain", metavar="DOMAIN", dest="domain", type=str, help="domain to authenticate to") - dgroup.add_argument("--local-auth", action="store_true", help="authenticate locally to each target") + dgroup.add_argument( + "-d", + "--domain", + metavar="DOMAIN", + dest="domain", + type=str, + help="domain to authenticate to", + ) + dgroup.add_argument( + "--local-auth", action="store_true", help="authenticate locally to each target" + ) smb_parser.add_argument("--port", type=int, default=445, help="SMB port") - smb_parser.add_argument("--share", metavar="SHARE", default="C$", help="specify a share") - smb_parser.add_argument("--smb-server-port", default="445", help="specify a server port for SMB", type=int) - smb_parser.add_argument("--no-smbv1", action="store_true", help="Force to disable SMBv1 in connection") - smb_parser.add_argument("--no-admin-check", action="store_true", help="Avoid checking admin which queries the Service Control Manager") - smb_parser.add_argument("--gen-relay-list", metavar="OUTPUT_FILE", help="outputs all hosts that don't require SMB signing to the specified file") - smb_parser.add_argument("--smb-timeout", help="SMB connection timeout", type=int, default=2) - smb_parser.add_argument("--laps", dest="laps", metavar="LAPS", type=str, help="LAPS authentification", nargs="?", const="administrator") - smb_parser.add_argument("--generate-hosts-file", type=str, help="Generate a hosts file like from a range of IP") - smb_parser.add_argument("--generate-krb5-file", type=str, help="Generate a krb5 file like from a range of IP") + smb_parser.add_argument( + "--share", metavar="SHARE", default="C$", help="specify a share" + ) + smb_parser.add_argument( + "--smb-server-port", + default="445", + help="specify a server port for SMB", + type=int, + ) + smb_parser.add_argument( + "--no-smbv1", action="store_true", help="Force to disable SMBv1 in connection" + ) + smb_parser.add_argument( + "--no-admin-check", + action="store_true", + help="Avoid checking admin which queries the Service Control Manager", + ) + smb_parser.add_argument( + "--gen-relay-list", + metavar="OUTPUT_FILE", + help="outputs all hosts that don't require SMB signing to the specified file", + ) + smb_parser.add_argument( + "--smb-timeout", help="SMB connection timeout", type=int, default=2 + ) + smb_parser.add_argument( + "--laps", + dest="laps", + metavar="LAPS", + type=str, + help="LAPS authentification", + nargs="?", + const="administrator", + ) + smb_parser.add_argument( + "--generate-hosts-file", + type=str, + help="Generate a hosts file like from a range of IP", + ) + smb_parser.add_argument( + "--generate-krb5-file", + type=str, + help="Generate a krb5 file like from a range of IP", + ) smb_parser.add_argument("--generate-tgt", type=str, help="Generate a tgt ticket") self_delegate_arg.make_required = [delegate_arg] generate_st.make_required = [delegate_arg] delegate_spn_arg.make_required = [delegate_arg] cred_gathering_group = smb_parser.add_argument_group("Credential Gathering") - cred_gathering_group.add_argument("--sam", choices={"regdump", "secdump"}, nargs="?", const="regdump", help="dump SAM hashes from target systems") - cred_gathering_group.add_argument("--lsa", choices={"regdump", "secdump"}, nargs="?", const="regdump", help="dump LSA secrets from target systems") - ntds_arg = cred_gathering_group.add_argument("--ntds", choices={"vss", "drsuapi"}, nargs="?", const="drsuapi", help="dump the NTDS.dit from target DCs using the specifed method") - cred_gathering_group.add_argument("--history", action="store_true", help="Also retrieve password history (NTDS.dit or SAM)") + cred_gathering_group.add_argument( + "--sam", + choices={"regdump", "secdump"}, + nargs="?", + const="regdump", + help="dump SAM hashes from target systems", + ) + cred_gathering_group.add_argument( + "--lsa", + choices={"regdump", "secdump"}, + nargs="?", + const="regdump", + help="dump LSA secrets from target systems", + ) + ntds_arg = cred_gathering_group.add_argument( + "--ntds", + choices={"vss", "drsuapi"}, + nargs="?", + const="drsuapi", + help="dump the NTDS.dit from target DCs using the specifed method", + ) + cred_gathering_group.add_argument( + "--history", + action="store_true", + help="Also retrieve password history (NTDS.dit or SAM)", + ) # NTDS options - kerb_keys_arg = cred_gathering_group.add_argument("--kerberos-keys", action=get_conditional_action(_StoreTrueAction), make_required=[], help="Also dump Kerberos AES and DES keys from target DC (NTDS.dit)") + kerb_keys_arg = cred_gathering_group.add_argument( + "--kerberos-keys", + action=get_conditional_action(_StoreTrueAction), + make_required=[], + help="Also dump Kerberos AES and DES keys from target DC (NTDS.dit)", + ) exclusive = cred_gathering_group.add_mutually_exclusive_group() - enabled_arg = exclusive.add_argument("--enabled", action=get_conditional_action(_StoreTrueAction), make_required=[], help="Only dump enabled targets from DC (NTDS.dit)") + enabled_arg = exclusive.add_argument( + "--enabled", + action=get_conditional_action(_StoreTrueAction), + make_required=[], + help="Only dump enabled targets from DC (NTDS.dit)", + ) kerb_keys_arg.make_required = [ntds_arg] enabled_arg.make_required = [ntds_arg] - cred_gathering_group.add_argument("--user", dest="userntds", type=str, help="Dump selected user from DC (NTDS.dit)") - cred_gathering_group.add_argument("--dpapi", choices={"cookies", "nosystem"}, nargs="*", help="dump DPAPI secrets from target systems, can dump cookies if you add 'cookies', will not dump SYSTEM dpapi if you add nosystem") - cred_gathering_group.add_argument("--sccm", choices={"wmi", "disk"}, nargs="?", const="disk", help="dump SCCM secrets from target systems") - cred_gathering_group.add_argument("--mkfile", action="store", help="DPAPI option. File with masterkeys in form of {GUID}:SHA1") - cred_gathering_group.add_argument("--pvk", action="store", help="DPAPI option. File with domain backupkey") - cred_gathering_group.add_argument("--list-snapshots", nargs="?", dest="list_snapshots", const="ADMIN$", help="Lists the VSS snapshots (default: %(const)s)") + cred_gathering_group.add_argument( + "--user", + dest="userntds", + type=str, + help="Dump selected user from DC (NTDS.dit)", + ) + cred_gathering_group.add_argument( + "--dpapi", + choices={"cookies", "nosystem"}, + nargs="*", + help="dump DPAPI secrets from target systems, can dump cookies if you add 'cookies', will not dump SYSTEM dpapi if you add nosystem", + ) + cred_gathering_group.add_argument( + "--sccm", + choices={"wmi", "disk"}, + nargs="?", + const="disk", + help="dump SCCM secrets from target systems", + ) + cred_gathering_group.add_argument( + "--mkfile", + action="store", + help="DPAPI option. File with masterkeys in form of {GUID}:SHA1", + ) + cred_gathering_group.add_argument( + "--pvk", action="store", help="DPAPI option. File with domain backupkey" + ) + cred_gathering_group.add_argument( + "--list-snapshots", + nargs="?", + dest="list_snapshots", + const="ADMIN$", + help="Lists the VSS snapshots (default: %(const)s)", + ) mapping_enum_group = smb_parser.add_argument_group("Mapping/Enumeration") - mapping_enum_group.add_argument("--shares", type=str, nargs="?", const="", help="Enumerate shares and access, filter on specified argument (read ; write ; read,write)") - mapping_enum_group.add_argument("--exclude-shares", nargs="+", help="List of shares to exclude from enumeration (e.g., C$ Admin$ IPC$)") - mapping_enum_group.add_argument("--dir", nargs="?", type=str, const="", help="List the content of a path (default path: '%(const)s')") - mapping_enum_group.add_argument("--interfaces", action="store_true", help="Enumerate network interfaces") - mapping_enum_group.add_argument("--no-write-check", action="store_true", help="Skip write check on shares (avoid leaving traces when missing delete permissions)") - mapping_enum_group.add_argument("--filter-shares", nargs="+", help="Filter share by access, option 'READ' 'WRITE' or 'READ,WRITE'") - mapping_enum_group.add_argument("--disks", action="store_true", help="Enumerate disks") - mapping_enum_group.add_argument("--users", nargs="*", metavar="USER", help="Enumerate domain users, if a user is specified than only its information is queried.") - mapping_enum_group.add_argument("--users-export", help="Enumerate domain users and export them to the specified file") - mapping_enum_group.add_argument("--groups", nargs="?", const="", metavar="GROUP", help="Enumerate domain groups, if a group is specified than its members are Enumerated") - mapping_enum_group.add_argument("--local-groups", nargs="?", const="", metavar="GROUP", help="Enumerate local groups, if a group is specified then its members are Enumerated") - mapping_enum_group.add_argument("--computers", nargs="?", const="", metavar="COMPUTER", help="Enumerate computer users") - mapping_enum_group.add_argument("--pass-pol", action="store_true", help="dump password policy") - mapping_enum_group.add_argument("--rid-brute", nargs="?", type=int, const=4000, metavar="MAX_RID", help="Enumerate users by bruteforcing RIDs") - mapping_enum_group.add_argument("--smb-sessions", action="store_true", help="Enumerate active smb sessions") - mapping_enum_group.add_argument("--reg-sessions", type=str, nargs="?", const="", help="Enumerate users sessions using the Remote Registry. If a username is given, filter for it. If a file is given, filter for listed usernames. If no value is given, list all.") - mapping_enum_group.add_argument("--loggedon-users", nargs="?", const="", help="Enumerate logged on users, if a user is specified than a regex filter is applied.") - mapping_enum_group.add_argument("--loggedon-users-filter", action="store", help="only search for specific user, works with regex") - mapping_enum_group.add_argument("--qwinsta", type=str, nargs="?", const="", help="Enumerate user sessions. If a username is given, filter for it; if a file is given, filter for listed usernames. If no value is given, list all.") - mapping_enum_group.add_argument("--tasklist", type=str, nargs="?", const=True, help="Enumerate running processes and filter for the specified one if specified") - mapping_enum_group.add_argument("--taskkill", type=str, help="Kills a specific PID or a proces name's PID's") + mapping_enum_group.add_argument( + "--shares", + type=str, + nargs="?", + const="", + help="Enumerate shares and access, filter on specified argument (read ; write ; read,write)", + ) + mapping_enum_group.add_argument( + "--exclude-shares", + nargs="+", + help="List of shares to exclude from enumeration (e.g., C$ Admin$ IPC$)", + ) + mapping_enum_group.add_argument( + "--dir", + nargs="?", + type=str, + const="", + help="List the content of a path (default path: '%(const)s')", + ) + mapping_enum_group.add_argument( + "--interfaces", action="store_true", help="Enumerate network interfaces" + ) + mapping_enum_group.add_argument( + "--no-write-check", + action="store_true", + help="Skip write check on shares (avoid leaving traces when missing delete permissions)", + ) + mapping_enum_group.add_argument( + "--shares-depth", + type=int, + default=0, + metavar="DEPTH", + help="Depth to check write permissions in subdirectories (default: 0, only root)", + ) + mapping_enum_group.add_argument( + "--filter-shares", + nargs="+", + help="Filter share by access, option 'READ' 'WRITE' or 'READ,WRITE'", + ) + mapping_enum_group.add_argument( + "--disks", action="store_true", help="Enumerate disks" + ) + mapping_enum_group.add_argument( + "--users", + nargs="*", + metavar="USER", + help="Enumerate domain users, if a user is specified than only its information is queried.", + ) + mapping_enum_group.add_argument( + "--users-export", + help="Enumerate domain users and export them to the specified file", + ) + mapping_enum_group.add_argument( + "--groups", + nargs="?", + const="", + metavar="GROUP", + help="Enumerate domain groups, if a group is specified than its members are Enumerated", + ) + mapping_enum_group.add_argument( + "--local-groups", + nargs="?", + const="", + metavar="GROUP", + help="Enumerate local groups, if a group is specified then its members are Enumerated", + ) + mapping_enum_group.add_argument( + "--computers", + nargs="?", + const="", + metavar="COMPUTER", + help="Enumerate computer users", + ) + mapping_enum_group.add_argument( + "--pass-pol", action="store_true", help="dump password policy" + ) + mapping_enum_group.add_argument( + "--rid-brute", + nargs="?", + type=int, + const=4000, + metavar="MAX_RID", + help="Enumerate users by bruteforcing RIDs", + ) + mapping_enum_group.add_argument( + "--smb-sessions", action="store_true", help="Enumerate active smb sessions" + ) + mapping_enum_group.add_argument( + "--reg-sessions", + type=str, + nargs="?", + const="", + help="Enumerate users sessions using the Remote Registry. If a username is given, filter for it. If a file is given, filter for listed usernames. If no value is given, list all.", + ) + mapping_enum_group.add_argument( + "--loggedon-users", + nargs="?", + const="", + help="Enumerate logged on users, if a user is specified than a regex filter is applied.", + ) + mapping_enum_group.add_argument( + "--loggedon-users-filter", + action="store", + help="only search for specific user, works with regex", + ) + mapping_enum_group.add_argument( + "--qwinsta", + type=str, + nargs="?", + const="", + help="Enumerate user sessions. If a username is given, filter for it; if a file is given, filter for listed usernames. If no value is given, list all.", + ) + mapping_enum_group.add_argument( + "--tasklist", + type=str, + nargs="?", + const=True, + help="Enumerate running processes and filter for the specified one if specified", + ) + mapping_enum_group.add_argument( + "--taskkill", type=str, help="Kills a specific PID or a proces name's PID's" + ) wmi_group = smb_parser.add_argument_group("WMI Queries") - wmi_group.add_argument("--wmi-query", metavar="QUERY", dest="wmi_query", type=str, help="Issues the specified WMI query") - wmi_group.add_argument("--wmi-namespace", metavar="NAMESPACE", default="root\\cimv2", help="WMI Namespace (default: %(default)s)") + wmi_group.add_argument( + "--wmi-query", + metavar="QUERY", + dest="wmi_query", + type=str, + help="Issues the specified WMI query", + ) + wmi_group.add_argument( + "--wmi-namespace", + metavar="NAMESPACE", + default="root\\cimv2", + help="WMI Namespace (default: %(default)s)", + ) spidering_group = smb_parser.add_argument_group("Spidering Shares") - spidering_group.add_argument("--spider", metavar="SHARE", type=str, help="share to spider") - spidering_group.add_argument("--spider-folder", metavar="FOLDER", default=".", type=str, help="folder to spider") - spidering_group.add_argument("--content", action="store_true", help="enable file content searching") - spidering_group.add_argument("--exclude-dirs", type=str, metavar="DIR_LIST", default="", help="directories to exclude from spidering") + spidering_group.add_argument( + "--spider", metavar="SHARE", type=str, help="share to spider" + ) + spidering_group.add_argument( + "--spider-folder", + metavar="FOLDER", + default=".", + type=str, + help="folder to spider", + ) + spidering_group.add_argument( + "--content", action="store_true", help="enable file content searching" + ) + spidering_group.add_argument( + "--exclude-dirs", + type=str, + metavar="DIR_LIST", + default="", + help="directories to exclude from spidering", + ) spidering_group.add_argument("--depth", type=int, help="max spider recursion depth") - spidering_group.add_argument("--only-files", action="store_true", help="only spider files") - spidering_group.add_argument("--silent", action="store_true", help="Do not print found files/directories", default=False) + spidering_group.add_argument( + "--only-files", action="store_true", help="only spider files" + ) + spidering_group.add_argument( + "--silent", + action="store_true", + help="Do not print found files/directories", + default=False, + ) segroup = spidering_group.add_mutually_exclusive_group() - segroup.add_argument("--pattern", nargs="+", help="pattern(s) to search for in folders, filenames and file content") - segroup.add_argument("--regex", nargs="+", help="regex(s) to search for in folders, filenames and file content") + segroup.add_argument( + "--pattern", + nargs="+", + help="pattern(s) to search for in folders, filenames and file content", + ) + segroup.add_argument( + "--regex", + nargs="+", + help="regex(s) to search for in folders, filenames and file content", + ) files_group = smb_parser.add_argument_group("File Operations") - files_group.add_argument("--put-file", action="append", nargs=2, metavar="FILE", help="Put a local file into remote target, ex: whoami.txt \\\\Windows\\\\Temp\\\\whoami.txt") - files_group.add_argument("--get-file", action="append", nargs=2, metavar="FILE", help="Get a remote file, ex: \\\\Windows\\\\Temp\\\\whoami.txt whoami.txt") - files_group.add_argument("--append-host", action="store_true", help="append the host to the get-file filename") + files_group.add_argument( + "--put-file", + action="append", + nargs=2, + metavar="FILE", + help="Put a local file into remote target, ex: whoami.txt \\\\Windows\\\\Temp\\\\whoami.txt", + ) + files_group.add_argument( + "--get-file", + action="append", + nargs=2, + metavar="FILE", + help="Get a remote file, ex: \\\\Windows\\\\Temp\\\\whoami.txt whoami.txt", + ) + files_group.add_argument( + "--append-host", + action="store_true", + help="append the host to the get-file filename", + ) cmd_exec_group = smb_parser.add_argument_group("Command Execution") - cmd_exec_group.add_argument("--exec-method", choices={"wmiexec", "mmcexec", "smbexec", "atexec"}, default="wmiexec", help="method to execute the command. Ignored if in MSSQL mode", action=DefaultTrackingAction) - cmd_exec_group.add_argument("--dcom-timeout", help="DCOM connection timeout", type=int, default=5) - cmd_exec_group.add_argument("--get-output-tries", help="Number of times atexec/smbexec/mmcexec tries to get results", type=int, default=10) - cmd_exec_group.add_argument("--codec", default="utf-8", help="Set encoding used (codec) from the target's output. If errors are detected, run chcp.com at the target & map the result with https://docs.python.org/3/library/codecs.html#standard-encodings and then execute again with --codec and the corresponding codec") - cmd_exec_group.add_argument("--no-output", action="store_true", help="do not retrieve command output") + cmd_exec_group.add_argument( + "--exec-method", + choices={"wmiexec", "mmcexec", "smbexec", "atexec"}, + default="wmiexec", + help="method to execute the command. Ignored if in MSSQL mode", + action=DefaultTrackingAction, + ) + cmd_exec_group.add_argument( + "--dcom-timeout", help="DCOM connection timeout", type=int, default=5 + ) + cmd_exec_group.add_argument( + "--get-output-tries", + help="Number of times atexec/smbexec/mmcexec tries to get results", + type=int, + default=10, + ) + cmd_exec_group.add_argument( + "--codec", + default="utf-8", + help="Set encoding used (codec) from the target's output. If errors are detected, run chcp.com at the target & map the result with https://docs.python.org/3/library/codecs.html#standard-encodings and then execute again with --codec and the corresponding codec", + ) + cmd_exec_group.add_argument( + "--no-output", action="store_true", help="do not retrieve command output" + ) cmd_exec_method_group = cmd_exec_group.add_mutually_exclusive_group() - cmd_exec_method_group.add_argument("-x", metavar="COMMAND", dest="execute", help="execute the specified CMD command") - cmd_exec_method_group.add_argument("-X", metavar="PS_COMMAND", dest="ps_execute", help="execute the specified PowerShell command") + cmd_exec_method_group.add_argument( + "-x", + metavar="COMMAND", + dest="execute", + help="execute the specified CMD command", + ) + cmd_exec_method_group.add_argument( + "-X", + metavar="PS_COMMAND", + dest="ps_execute", + help="execute the specified PowerShell command", + ) posh_group = smb_parser.add_argument_group("Powershell Script Obfuscation") - posh_group.add_argument("--obfs", action="store_true", help="Obfuscate PowerShell scripts") - posh_group.add_argument("--amsi-bypass", nargs=1, metavar="FILE", help="File with a custom AMSI bypass") - posh_group.add_argument("--clear-obfscripts", action="store_true", help="Clear all cached obfuscated PowerShell scripts") - posh_group.add_argument("--force-ps32", action="store_true", help="force PowerShell commands to run in a 32-bit process (may not apply to modules)") - posh_group.add_argument("--no-encode", action="store_true", default=False, help="Do not encode the PowerShell command ran on target") + posh_group.add_argument( + "--obfs", action="store_true", help="Obfuscate PowerShell scripts" + ) + posh_group.add_argument( + "--amsi-bypass", nargs=1, metavar="FILE", help="File with a custom AMSI bypass" + ) + posh_group.add_argument( + "--clear-obfscripts", + action="store_true", + help="Clear all cached obfuscated PowerShell scripts", + ) + posh_group.add_argument( + "--force-ps32", + action="store_true", + help="force PowerShell commands to run in a 32-bit process (may not apply to modules)", + ) + posh_group.add_argument( + "--no-encode", + action="store_true", + default=False, + help="Do not encode the PowerShell command ran on target", + ) return parser From c68598190354f00ca242e6302575e1fee319b4bb Mon Sep 17 00:00:00 2001 From: sup3rDav3 Date: Sun, 5 Apr 2026 23:31:25 -0400 Subject: [PATCH 2/2] test: add shares-depth to e2e test commands --- tests/e2e_commands.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/e2e_commands.txt b/tests/e2e_commands.txt index f9318889b8..700cb20204 100644 --- a/tests/e2e_commands.txt +++ b/tests/e2e_commands.txt @@ -328,3 +328,4 @@ netexec nfs TARGET_HOST -u "" -p "" --shares netexec nfs TARGET_HOST -u "" -p "" --enum-shares netexec nfs TARGET_HOST -u "" -p "" --get-file /NFStest/test/test.txt ../test.txt netexec nfs TARGET_HOST -u "" -p "" --put-file ../test.txt /NFStest/test +netexec smb TARGET_HOST -u LOGIN_USERNAME -p LOGIN_PASSWORD --shares --shares-depth 1