Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 80 additions & 115 deletions modules/scanners/tcp_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,166 +8,131 @@

import socket
import threading
from queue import Queue, Empty
from BaseModule import AuxiliaryModule


class TCPScanner(AuxiliaryModule):
def __init__(self):
super().__init__()
# metadata - use lowercase keys to match BaseModule

self.info.update({
'name': 'TCP Port Scanner',
'description': 'Performs TCP port scanning against targets',
'author': 'Abhay Pratap Singh',
'version': '1.0'
'version': '4.0'
})

# module options (flat, compatible with BaseModule.get_option / set_option)
self.options.update({
'RHOSTS': '', # inherited but re-stated for clarity
'PORTS': '1-1000', # port range string (e.g., "1-1000,80,443")
'RPORT': 0, # kept for compatibility; not used if PORTS provided
'THREADS': 10, # controls concurrency across targets (run_threaded)
'TIMEOUT': 5 # per-socket timeout in seconds (fallback)
'RHOSTS': '',
'RPORT': '1-1000',
'THREADS': 100,
'TIMEOUT': 1
})

# AuxiliaryModule already adds RHOSTS to required_options, but ensure it:
self.required_options.add('RHOSTS')
self.required_options.add('RPORT')

def parse_ports(self, port_string):
"""Parse port string into list of ports (supports ranges and comma lists)."""
ports = []
if not port_string:
return ports
self.q = None
self.timeout = 1


def parse_ports(self, port_string):
ports = set()
for part in str(port_string).split(','):
part = part.strip()
if not part:
continue
if '-' in part:
try:
start_s, end_s = part.split('-', 1)
start = int(start_s.strip())
end = int(end_s.strip())
if start <= 0:
start = 1
if end > 65535:
end = 65535
if end >= start:
ports.extend(range(start, end + 1))
start, end = part.split('-', 1)
for p in range(int(start), int(end) + 1):
if 0 < p <= 65535:
ports.add(p)
except ValueError:
self.print_error(f"Invalid port range: {part}")
else:
try:
p = int(part)
if 0 < p <= 65535:
ports.append(p)
else:
self.print_error(f"Invalid port: {part}")
ports.add(p)
except ValueError:
self.print_error(f"Invalid port: {part}")
return sorted(ports)

return sorted(list(set(ports)))

def banner_grab(self, target, port):
"""Try to grab a banner from an open TCP port (non-blocking safe)."""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(int(self.get_option('TIMEOUT') or 5))
s.connect((target, port))
# try to receive a short banner (some services send immediately)

def worker(self, target):
while self.running:
try:
data = s.recv(1024)
banner = data.decode(errors='ignore').strip()
except Exception:
banner = ""
try:
s.close()
except Exception:
pass
return banner
except Exception:
return ""

def scan_target_ports(self, target):
"""
Scan multiple ports on a single target. This method is run per-target,
and returns list of open-port info dicts for that target.
"""
ports = self.parse_ports(self.get_option('PORTS'))
if not ports:
self.print_error("No valid ports specified in PORTS option")
return []

results = []
timeout = int(self.get_option('TIMEOUT') or 5)

# For each port, check connect_ex (fast)
for port in ports:
port = self.q.get(timeout=0.3)
except Empty:
return

try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
res = sock.connect_ex((target, port))
try:
sock.close()
except Exception:
pass

if res == 0:
banner = self.banner_grab(target, port)
info = {
'target': target,
'port': port,
'status': 'open',
'banner': banner
}
sock.settimeout(self.timeout)

if sock.connect_ex((target, port)) == 0:
banner = ""
try:
sock.settimeout(0.2) # FAST banner grab
data = sock.recv(1024)
banner = data.decode(errors='ignore').strip()
except Exception:
pass

if banner:
self.print_good(f"{target}:{port} - Open - {banner}")
self.print_good(f"{target}:{port} OPEN | {banner}")
else:
self.print_good(f"{target}:{port} - Open")
results.append(info)
except Exception as e:
if self.get_option('VERBOSE'):
self.print_error(f"Error scanning {target}:{port} - {e}")
self.print_good(f"{target}:{port} OPEN")

return results
sock.close()
except Exception:
pass
finally:
self.q.task_done()


def scan_target(self, target):
"""
Adapter for run_threaded: called per-target.
Returns None (run_threaded expects functions that don't return to collector),
but we keep behavior of printing results inside scan_target_ports.
"""
self.print_status(f"Scanning {target} ...")
self.scan_target_ports(target)
self.print_status(f"Scanning {target}")
ports = self.parse_ports(self.get_option('RPORT'))
self.timeout = int(self.get_option('TIMEOUT'))

def run(self):
"""Main run() for AuxiliaryModule style: iterate RHOSTS and scan ports."""
if super().run() == False:
return False
self.q = Queue()
for p in ports:
self.q.put(p)

targets_string = self.get_option('RHOSTS')
if not targets_string:
self.print_error("RHOSTS not set. Use 'set RHOSTS <target[,target2,...]>'")
return False
threads = []
for _ in range(int(self.get_option('THREADS'))):
t = threading.Thread(target=self.worker, args=(target,))
t.start()
threads.append(t)

try:
while not self.q.empty() and self.running:
pass
except KeyboardInterrupt:
self.running = False
self.print_status("Scan interrupted by user (Ctrl+C)")

for t in threads:
t.join(timeout=1)

targets = [t.strip() for t in targets_string.split(",") if t.strip()]
if not targets:
self.print_error("No valid targets in RHOSTS")

def run(self):
if super().run() is False:
return False

# Use run_threaded to run scan_target concurrently across multiple targets
max_threads = int(self.get_option('THREADS') or 5)
self.print_status(f"Starting TCP port scan of {len(targets)} target(s) with {max_threads} threads...")
self.run_threaded(self.scan_target, targets, max_threads=max_threads)
targets = [t.strip() for t in self.get_option('RHOSTS').split(',') if t.strip()]

try:
for target in targets:
if not self.running:
break
self.scan_target(target)
except KeyboardInterrupt:
self.running = False
self.print_status("Execution stopped by user")

self.print_good("TCP port scan completed")
self.cleanup()
return True


if __name__ == "__main__":
# quick local test
m = TCPScanner()
m.set_option('RHOSTS', '127.0.0.1')
m.set_option('PORTS', '21,22,80,443')
m.run()