From f8a30b17a0fd927302c831ae240a8d758d19cbd8 Mon Sep 17 00:00:00 2001 From: EvgeniiMekhanik Date: Tue, 30 Jul 2024 14:57:21 +0300 Subject: [PATCH 01/12] Add script to dump traffic on production --- helpers/tcpdump.py | 88 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 helpers/tcpdump.py diff --git a/helpers/tcpdump.py b/helpers/tcpdump.py new file mode 100644 index 000000000..0d6ba509a --- /dev/null +++ b/helpers/tcpdump.py @@ -0,0 +1,88 @@ +import datetime +import os +import time +import subprocess +import signal +import argparse +import shutil + +class Logger(): + def __run_tcpdump(self, ethname) -> None: + """ + Run `tcpdump` before the test if `-s` (--save-tcpdump) option is used. + Save result in a .pcap file, where is name of test. + """ + path = f"/var/tcpdump/{datetime.date.today()}" + file_name = datetime.datetime.now().strftime('%H:%M:%S') + + if not os.path.isdir(path): + os.makedirs(path) + self.__tcpdump = subprocess.Popen( + [ + "tcpdump", + "-U", + "-i", + f"{ethname}", + "-w", + f"{path}/{file_name}.pcap", + ], + shell=False, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + def __clear_tcpdump_files(self, directory, cur_time, time_to_clear_dump) -> None: + for _f in os.listdir(directory): + f = os.path.join(directory, _f) + t = os.path.getctime(f) + if cur_time - t > time_to_clear_dump: + os.remove(f) + + def __clear_tcpdump(self, cur_time, time_to_clear_dump) -> None: + path = f"/var/tcpdump/" + directory = os.fsencode(path) + + for _d in os.listdir(directory): + d = os.path.join(directory, _d) + if d.decode('UTF-8') != f"/var/tcpdump/{datetime.date.today()}": + shutil.rmtree(d) + else: + self.__clear_tcpdump_files(d, cur_time, time_to_clear_dump) + + def __stop_tcpdump(self) -> None: + """ + Stop tcpdump. + `wait()` always causes `TimeoutExpired` error because `tcpdump` cannot terminate on + its own. But it requires a timeout to flush data from buffer. + """ + try: + self.__tcpdump.send_signal(signal.SIGUSR2) + self.__tcpdump.wait(timeout=3) + except subprocess.TimeoutExpired: + self.__tcpdump.kill() + self.__tcpdump.wait() + + self.__tcpdump = None + + def run(self, ethname, time_to_new_dump=3600, time_to_clear_dump=21600) -> None: + t0 = time.time() + self.__run_tcpdump(ethname) + + while True: + t = time.time() + self.__clear_tcpdump(t, time_to_clear_dump) + if t - t0 > time_to_new_dump: + self.__stop_tcpdump() + self.__run_tcpdump(ethname) + t0 = t + time.sleep(time_to_new_dump / 10 if time_to_new_dump > 10 else 1) + +parser = argparse.ArgumentParser() +parser.add_argument("-e", "--ethname", type=str) +parser.add_argument("-tn", "--time-to-new-dump", type=int) +parser.add_argument("-tc", "--time-to-clear-dump", type=int) +args = parser.parse_args() + +Log = Logger() +Log.run(args.ethname, args.time_to_new_dump, args.time_to_clear_dump) + From b1e6100320cac2d043298f8f723196cfa25e2d3a Mon Sep 17 00:00:00 2001 From: EvgeniiMekhanik Date: Tue, 30 Jul 2024 17:47:29 +0300 Subject: [PATCH 02/12] Add tests to replay pcap files --- t_replay/__init__.py | 5 +++ t_replay/test_replay.py | 85 ++++++++++++++++++++++++++++++++++++++ tests_disabled.json | 4 ++ tests_disabled_remote.json | 4 ++ tests_disabled_tcpseg.json | 4 ++ 5 files changed, 102 insertions(+) create mode 100644 t_replay/__init__.py create mode 100644 t_replay/test_replay.py diff --git a/t_replay/__init__.py b/t_replay/__init__.py new file mode 100644 index 000000000..055d3f1ef --- /dev/null +++ b/t_replay/__init__.py @@ -0,0 +1,5 @@ +__all__ = [ + "test_replay", +] + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/t_replay/test_replay.py b/t_replay/test_replay.py new file mode 100644 index 000000000..bbaf58fcb --- /dev/null +++ b/t_replay/test_replay.py @@ -0,0 +1,85 @@ +""" +Test TempestaFW reeboot under load. +""" + +__author__ = "Tempesta Technologies, Inc." +__copyright__ = "Copyright (C) 2017-2024 Tempesta Technologies, Inc." +__license__ = "GPL2" + +from framework import tester +from helpers import remote, sysnet, tf_cfg + + +class TestReplay(tester.TempestaTest): + clients = [ + {"id": "tcpreplay", "type": "external", "binary": "tcpreplay", "ssl": True, "cmd_args": ""}, + ] + + backends = [ + { + "id": "deproxy", + "type": "deproxy", + "port": "8080", + "response": "static", + "response_content": ("HTTP/1.1 200 OK\r\n" "Content-length: 0\r\n" "\r\n"), + }, + { + "id": "deproxy_h2", + "type": "deproxy", + "port": "8443", + "response": "static", + "response_content": ("HTTP/1.1 200 OK\r\n" "Content-length: 0\r\n" "\r\n"), + }, + ] + + tempesta = { + "config": """ + listen 443 proto=https,h2; + listen 80 proto=http; + + access_log on; + + block_action attack reply; + block_action error reply; + + tls_certificate ${tempesta_workdir}/tempesta.crt; + tls_certificate_key ${tempesta_workdir}/tempesta.key; + + srv_group h2 { + server ${server_ip}:8443; + } + srv_group http { + server ${server_ip}:8080; + } + + vhost h2 { + proxy_pass h2; + } + + vhost http { + proxy_pass http; + } + + http_chain { + mark == 1 -> http; + ->h2; + } + """ + } + + def test_replay(self) -> None: + self.start_all_servers() + self.start_tempesta() + + ETH = sysnet.route_dst_ip(remote.tempesta, tf_cfg.cfg.get("Tempesta", "ip")) + tcpreplay = self.get_client("tcpreplay") + tcpreplay.options = [f"-i {ETH} /tmp/tcpdump/replay.pcap"] + + tcpreplay.start() + self.wait_while_busy(tcpreplay) + tcpreplay.stop() + + print(tcpreplay.response_msg) + + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/tests_disabled.json b/tests_disabled.json index 8bc839d8e..1415900f3 100644 --- a/tests_disabled.json +++ b/tests_disabled.json @@ -1,6 +1,10 @@ { "disable" : true, "disabled" : [ + { + "name": "t_replay", + "reason": "These tests should not be run on CI. Run it only to reproduce bugs on production" + }, { "name" : "tcp_connection.test_connection_close.CloseClientConnectiononInvalidReq", "reason" : "Invalid request is parsed as two requests. The second response is not supported by current deproxy architecture." diff --git a/tests_disabled_remote.json b/tests_disabled_remote.json index 0434a56c5..349bfd39a 100644 --- a/tests_disabled_remote.json +++ b/tests_disabled_remote.json @@ -1,6 +1,10 @@ { "disable" : true, "disabled" : [ + { + "name": "t_replay", + "reason": "These tests should not be run on CI. Run it only to reproduce bugs on production" + }, { "name": "t_stress.test_nginx", "reason": "These tests should not be run with remote setup." diff --git a/tests_disabled_tcpseg.json b/tests_disabled_tcpseg.json index 0aecd4f28..21f28672a 100644 --- a/tests_disabled_tcpseg.json +++ b/tests_disabled_tcpseg.json @@ -1,6 +1,10 @@ { "disable" : true, "disabled" : [ + { + "name": "t_replay", + "reason": "These tests should not be run on CI. Run it only to reproduce bugs on production" + }, { "name": "t_fault_injection", "reason": "These tests should not be run with TCP segmentation." From 7016bb0aa01949b6124bbacf1f3546fed15f2b63 Mon Sep 17 00:00:00 2001 From: EvgeniiMekhanik Date: Tue, 30 Jul 2024 19:45:24 +0300 Subject: [PATCH 03/12] Fix according review --- helpers/tcpdump.py | 178 ++++++++++++++++++++++++++++++--------------- 1 file changed, 118 insertions(+), 60 deletions(-) diff --git a/helpers/tcpdump.py b/helpers/tcpdump.py index 0d6ba509a..fc4a49f52 100644 --- a/helpers/tcpdump.py +++ b/helpers/tcpdump.py @@ -6,83 +6,141 @@ import argparse import shutil -class Logger(): - def __run_tcpdump(self, ethname) -> None: + +class Logger: + def __run_tcpdump( + self, ethname, file_size, file_count, file_name, ports, src, dst, direction + ) -> None: """ - Run `tcpdump` before the test if `-s` (--save-tcpdump) option is used. - Save result in a .pcap file, where is name of test. + Save result in a .pcap file. """ path = f"/var/tcpdump/{datetime.date.today()}" - file_name = datetime.datetime.now().strftime('%H:%M:%S') if not os.path.isdir(path): os.makedirs(path) - self.__tcpdump = subprocess.Popen( - [ - "tcpdump", - "-U", - "-i", - f"{ethname}", - "-w", - f"{path}/{file_name}.pcap", - ], - shell=False, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - - def __clear_tcpdump_files(self, directory, cur_time, time_to_clear_dump) -> None: - for _f in os.listdir(directory): - f = os.path.join(directory, _f) - t = os.path.getctime(f) - if cur_time - t > time_to_clear_dump: - os.remove(f) - - def __clear_tcpdump(self, cur_time, time_to_clear_dump) -> None: - path = f"/var/tcpdump/" - directory = os.fsencode(path) - - for _d in os.listdir(directory): - d = os.path.join(directory, _d) - if d.decode('UTF-8') != f"/var/tcpdump/{datetime.date.today()}": - shutil.rmtree(d) - else: - self.__clear_tcpdump_files(d, cur_time, time_to_clear_dump) - - def __stop_tcpdump(self) -> None: + + args = [ + "tcpdump", + "-U", + "-i", + f"{ethname}", + "-C", + f"{file_size}", + "-W", + f"{file_count}", + "-w", + f"{path}/{file_name}-{direction}.pcap", + "-Q", + f"{direction}", + "-Z", + "root", + ] + + if ports: + args += ["port"] + args += [f"{ports[0]}"] + + for i in range(1, len(ports)): + args += ["and"] + args += ["port"] + args += [f"{ports[i]}"] + + if src: + args += [f"ip src {src}"] + if dst: + args += [f"ip dst {dst}"] + + if direction == "in": + self.__tcpdump_in = subprocess.Popen( + args=args, + shell=False, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + elif direction == "out": + self.__tcpdump_out = subprocess.Popen( + args=args, + shell=False, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + else: + print("Fail to start tcpdump - invalid direction") + return + + def __stop_tcpdump(self, direction) -> None: """ Stop tcpdump. - `wait()` always causes `TimeoutExpired` error because `tcpdump` cannot terminate on - its own. But it requires a timeout to flush data from buffer. + `wait()` should never causes `TimeoutExpired` error because `tcpdump` can + be successfully terminate by SIGINT. But it requires a timeout to flush + data from buffer. """ + + __tcpdump = None + if direction == "in": + __tcpdump = self.__tcpdump_in + elif direction == "out": + __tcpdump = self.__tcpdump_out + else: + print("Fail to stop tcpdump - invalid direction") + return + try: - self.__tcpdump.send_signal(signal.SIGUSR2) - self.__tcpdump.wait(timeout=3) + __tcpdump.send_signal(signal.SIGINT) + __tcpdump.wait(timeout=3) except subprocess.TimeoutExpired: - self.__tcpdump.kill() - self.__tcpdump.wait() + __tcpdump.kill() + __tcpdump.wait() - self.__tcpdump = None + if direction == "in": + self.__tcpdump_in = None + elif direction == "out": + self.__tcpdump_out = None - def run(self, ethname, time_to_new_dump=3600, time_to_clear_dump=21600) -> None: - t0 = time.time() - self.__run_tcpdump(ethname) + def run( + self, + ethname=None, + exec_time=None, + file_size=None, + file_count=None, + file_name=None, + ports=None, + src=None, + dst=None, + ) -> None: + ethname = ethname if ethname else "lo" + exec_time = exec_time if exec_time else 60 + file_size = file_size if file_size else 50 + file_count = file_count if file_count else 10 + file_name = file_name if file_name else f"{datetime.datetime.now().strftime('%H:%M:%S')}" + + self.__run_tcpdump(ethname, file_size, file_count, file_name, ports, src, dst, "in") + self.__run_tcpdump(ethname, file_size, file_count, file_name, ports, src, dst, "out") + time.sleep(exec_time * 60) + self.__stop_tcpdump("in") + self.__stop_tcpdump("out") - while True: - t = time.time() - self.__clear_tcpdump(t, time_to_clear_dump) - if t - t0 > time_to_new_dump: - self.__stop_tcpdump() - self.__run_tcpdump(ethname) - t0 = t - time.sleep(time_to_new_dump / 10 if time_to_new_dump > 10 else 1) parser = argparse.ArgumentParser() parser.add_argument("-e", "--ethname", type=str) -parser.add_argument("-tn", "--time-to-new-dump", type=int) -parser.add_argument("-tc", "--time-to-clear-dump", type=int) +parser.add_argument("-t", "--exec-time", type=int) +parser.add_argument("-s", "--file-size", type=int) +parser.add_argument("-c", "--file-count", type=int) +parser.add_argument("-n", "--file-name", type=str) +parser.add_argument("-p", "--port", action="append", type=int) +parser.add_argument("--src", type=str) +parser.add_argument("--dst", type=str) + args = parser.parse_args() Log = Logger() -Log.run(args.ethname, args.time_to_new_dump, args.time_to_clear_dump) - +Log.run( + args.ethname, + exec_time=args.exec_time, + file_size=args.file_size, + file_count=args.file_count, + file_name=args.file_name, + ports=args.port, + src=args.src, + dst=args.dst, +) From d57320f9ebbc0d45d0fa44f944e06abd5a5bb2fa Mon Sep 17 00:00:00 2001 From: EvgeniiMekhanik Date: Wed, 31 Jul 2024 11:03:09 +0300 Subject: [PATCH 04/12] TMP --- helpers/tcpdump.py | 48 +++++++++++++++++++++++++++++++++-------- t_replay/test_replay.py | 14 ++++++------ 2 files changed, 46 insertions(+), 16 deletions(-) diff --git a/helpers/tcpdump.py b/helpers/tcpdump.py index fc4a49f52..8a343a1f6 100644 --- a/helpers/tcpdump.py +++ b/helpers/tcpdump.py @@ -41,13 +41,15 @@ def __run_tcpdump( args += [f"{ports[0]}"] for i in range(1, len(ports)): - args += ["and"] + args += ["or"] args += ["port"] args += [f"{ports[i]}"] if src: args += [f"ip src {src}"] if dst: + if src: + args += ["or"] args += [f"ip dst {dst}"] if direction == "in": @@ -122,14 +124,42 @@ def run( parser = argparse.ArgumentParser() -parser.add_argument("-e", "--ethname", type=str) -parser.add_argument("-t", "--exec-time", type=int) -parser.add_argument("-s", "--file-size", type=int) -parser.add_argument("-c", "--file-count", type=int) -parser.add_argument("-n", "--file-name", type=str) -parser.add_argument("-p", "--port", action="append", type=int) -parser.add_argument("--src", type=str) -parser.add_argument("--dst", type=str) +parser.add_argument("-e", "--ethname", type=str, help="Device name to capture packets.") +parser.add_argument( + "-t", "--exec-time", type=int, help="Execution time in minutes (60 by default)." +) +parser.add_argument( + "-s", + "--file-size", + type=int, + help="Dump file size in megabytes (50 by default). When size is exceeded new file will be created.", +) +parser.add_argument( + "-c", + "--file-count", + type=int, + help="Count of dump files (10 by default). When count is exceeded new file overwrite old file.", +) +parser.add_argument( + "-n", "--file-name", type=str, help="Dump file name (current time in H:M:S by default)." +) +parser.add_argument( + "-p", + "--port", + action="append", + type=int, + help="Ports, used in tcpdump filtration (empty by default, dump for all ports).", +) +parser.add_argument( + "--src", + type=str, + help="Source ip address, used in tcpdump filtration (empty by default, dump for all source ip).", +) +parser.add_argument( + "--dst", + type=str, + help="Destination ip address, used in tcpdump filtration (empty by default, dump for all destination ip).", +) args = parser.parse_args() diff --git a/t_replay/test_replay.py b/t_replay/test_replay.py index bbaf58fcb..9f0bf9668 100644 --- a/t_replay/test_replay.py +++ b/t_replay/test_replay.py @@ -12,7 +12,7 @@ class TestReplay(tester.TempestaTest): clients = [ - {"id": "tcpreplay", "type": "external", "binary": "tcpreplay", "ssl": True, "cmd_args": ""}, + {"id": "tcpreplay", "type": "external", "binary": "tcpreplay", "ssl": True, "cmd_args": ""} ] backends = [ @@ -34,8 +34,8 @@ class TestReplay(tester.TempestaTest): tempesta = { "config": """ - listen 443 proto=https,h2; - listen 80 proto=http; + listen 192.168.122.100:443 proto=https,h2; + listen 192.168.122.100:80 proto=http; access_log on; @@ -44,6 +44,7 @@ class TestReplay(tester.TempestaTest): tls_certificate ${tempesta_workdir}/tempesta.crt; tls_certificate_key ${tempesta_workdir}/tempesta.key; + tls_match_any_server_name; srv_group h2 { server ${server_ip}:8443; @@ -68,15 +69,14 @@ class TestReplay(tester.TempestaTest): } def test_replay(self) -> None: - self.start_all_servers() - self.start_tempesta() + self.start_all_services(client=False) ETH = sysnet.route_dst_ip(remote.tempesta, tf_cfg.cfg.get("Tempesta", "ip")) tcpreplay = self.get_client("tcpreplay") - tcpreplay.options = [f"-i {ETH} /tmp/tcpdump/replay.pcap"] + tcpreplay.options = [f"-i ens4 /tmp/tcpdump/replay.pcap"] tcpreplay.start() - self.wait_while_busy(tcpreplay) + self.wait_while_busy(tcpreplay, timeout=100) tcpreplay.stop() print(tcpreplay.response_msg) From 1bf6c376d7ee3610c8b8b927d8aa9d2a43956b1a Mon Sep 17 00:00:00 2001 From: EvgeniiMekhanik Date: Wed, 31 Jul 2024 15:39:49 +0300 Subject: [PATCH 05/12] TMP1 --- helpers/tcpdump.py | 80 +++++++++++++++++++++++++++++++++------------- 1 file changed, 57 insertions(+), 23 deletions(-) diff --git a/helpers/tcpdump.py b/helpers/tcpdump.py index 8a343a1f6..4e8d8101b 100644 --- a/helpers/tcpdump.py +++ b/helpers/tcpdump.py @@ -8,8 +8,28 @@ class Logger: + def __add_ports(self, ports, args): + if ports: + args += ["tcp port"] + args += [f"{ports[0]}"] + + for i in range(1, len(ports)): + args += ["or"] + args += ["tcp port"] + args += [f"{ports[i]}"] + + def __add_ip(self, ip, direction, args): + if ip: + args += [f"{direction}"] + args += [f"{ip[0]}"] + + for i in range(1, len(ip)): + args += ["or"] + args += [f"{direction}"] + args += [f"{ip[i]}"] + def __run_tcpdump( - self, ethname, file_size, file_count, file_name, ports, src, dst, direction + self, ethname, file_size, file_count, file_name, src_ports, dst_ports, src, dst, direction ) -> None: """ Save result in a .pcap file. @@ -36,21 +56,20 @@ def __run_tcpdump( "root", ] - if ports: - args += ["port"] - args += [f"{ports[0]}"] + combine = src_ports and dst_ports + self.__add_ports(src_ports, args) + if combine: + args += ["or"] + self.__add_ports(dst_ports, args) - for i in range(1, len(ports)): - args += ["or"] - args += ["port"] - args += [f"{ports[i]}"] + if (src_ports or dst_ports) and (src or dst): + args += ["or"] - if src: - args += [f"ip src {src}"] - if dst: - if src: - args += ["or"] - args += [f"ip dst {dst}"] + combine = src and dst + self.__add_ip(src, "src", args) + if combine: + args += ["or"] + self.__add_ip(dst, "dst", args) if direction == "in": self.__tcpdump_in = subprocess.Popen( @@ -106,7 +125,8 @@ def run( file_size=None, file_count=None, file_name=None, - ports=None, + src_ports=None, + dst_ports=None, src=None, dst=None, ) -> None: @@ -116,8 +136,12 @@ def run( file_count = file_count if file_count else 10 file_name = file_name if file_name else f"{datetime.datetime.now().strftime('%H:%M:%S')}" - self.__run_tcpdump(ethname, file_size, file_count, file_name, ports, src, dst, "in") - self.__run_tcpdump(ethname, file_size, file_count, file_name, ports, src, dst, "out") + self.__run_tcpdump( + ethname, file_size, file_count, file_name, src_ports, dst_ports, src, dst, "in" + ) + self.__run_tcpdump( + ethname, file_size, file_count, file_name, src_ports, dst_ports, src, dst, "out" + ) time.sleep(exec_time * 60) self.__stop_tcpdump("in") self.__stop_tcpdump("out") @@ -144,21 +168,30 @@ def run( "-n", "--file-name", type=str, help="Dump file name (current time in H:M:S by default)." ) parser.add_argument( - "-p", - "--port", + "-ps", + "--src-port", action="append", type=int, - help="Ports, used in tcpdump filtration (empty by default, dump for all ports).", + help="Source ports, used in tcpdump filtration (empty by default, dump for all ports).", +) +parser.add_argument( + "-pd", + "--dst-port", + action="append", + type=int, + help="Destination ports, used in tcpdump filtration (empty by default, dump for all ports).", ) parser.add_argument( "--src", type=str, - help="Source ip address, used in tcpdump filtration (empty by default, dump for all source ip).", + help="Source ip addresses, used in tcpdump filtration (empty by default, dump for all source ip).", + action="append", ) parser.add_argument( "--dst", type=str, - help="Destination ip address, used in tcpdump filtration (empty by default, dump for all destination ip).", + help="Destination ip addresses, used in tcpdump filtration (empty by default, dump for all destination ip).", + action="append", ) args = parser.parse_args() @@ -170,7 +203,8 @@ def run( file_size=args.file_size, file_count=args.file_count, file_name=args.file_name, - ports=args.port, + src_ports=args.src_port, + dst_ports=args.dst_port, src=args.src, dst=args.dst, ) From caf5f61c13951990c835a65faf35e66eaee111b9 Mon Sep 17 00:00:00 2001 From: EvgeniiMekhanik Date: Wed, 31 Jul 2024 16:03:53 +0300 Subject: [PATCH 06/12] TMP2 --- helpers/tcpdump.py | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/helpers/tcpdump.py b/helpers/tcpdump.py index 4e8d8101b..637da76bb 100644 --- a/helpers/tcpdump.py +++ b/helpers/tcpdump.py @@ -20,12 +20,12 @@ def __add_ports(self, ports, args): def __add_ip(self, ip, direction, args): if ip: - args += [f"{direction}"] + args += [f"ip {direction}"] args += [f"{ip[0]}"] for i in range(1, len(ip)): args += ["or"] - args += [f"{direction}"] + args += [f"ip {direction}"] args += [f"{ip[i]}"] def __run_tcpdump( @@ -63,7 +63,7 @@ def __run_tcpdump( self.__add_ports(dst_ports, args) if (src_ports or dst_ports) and (src or dst): - args += ["or"] + args += ["and"] combine = src and dst self.__add_ip(src, "src", args) @@ -129,19 +129,27 @@ def run( dst_ports=None, src=None, dst=None, + direction=None, ) -> None: ethname = ethname if ethname else "lo" exec_time = exec_time if exec_time else 60 file_size = file_size if file_size else 50 file_count = file_count if file_count else 10 file_name = file_name if file_name else f"{datetime.datetime.now().strftime('%H:%M:%S')}" + direction = direction if direction else ["in", "out"] + + if "in" in direction: + self.__run_tcpdump( + ethname, file_size, file_count, file_name, src_ports, dst_ports, src, dst, "in" + ) + if "out" in direction: + self.__run_tcpdump( + ethname, file_size, file_count, file_name, src_ports, dst_ports, src, dst, "out" + ) + if (not "in" in direction) and (not "out" in direction): + print("Invalid direction, (in|out) supported") + return - self.__run_tcpdump( - ethname, file_size, file_count, file_name, src_ports, dst_ports, src, dst, "in" - ) - self.__run_tcpdump( - ethname, file_size, file_count, file_name, src_ports, dst_ports, src, dst, "out" - ) time.sleep(exec_time * 60) self.__stop_tcpdump("in") self.__stop_tcpdump("out") @@ -193,6 +201,12 @@ def run( help="Destination ip addresses, used in tcpdump filtration (empty by default, dump for all destination ip).", action="append", ) +parser.add_argument( + "--direction", + type=str, + help="Type of collected traffic(in|put)", + action="append", +) args = parser.parse_args() @@ -207,4 +221,5 @@ def run( dst_ports=args.dst_port, src=args.src, dst=args.dst, + direction=args.direction, ) From 6a57093cdcf779d6f9f9152aad1b44142b3b3fa5 Mon Sep 17 00:00:00 2001 From: Roman Date: Fri, 9 Aug 2024 17:51:58 +0400 Subject: [PATCH 07/12] fix `apply_proto_settings` method for https/1.1 client. Deproxy must set the alpn protocol for context to correct work with `proto=h2,https` config in TempestaFW. --- helpers/deproxy.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/helpers/deproxy.py b/helpers/deproxy.py index 4ac81f675..9425c81da 100644 --- a/helpers/deproxy.py +++ b/helpers/deproxy.py @@ -965,6 +965,8 @@ def apply_proto_settings(self): # RFC 9113 Section 9.2.1: A deployment of HTTP/2 over TLS 1.2 MUST disable # compression. self.context.options |= ssl.OP_NO_COMPRESSION + elif self.proto == "http/1.1": + self.context.set_alpn_protocols(["http/1.1"]) class Client(TlsClient, stateful.Stateful): From 3f0fbdea9395ebddbbf0fe2c226bd655527cb7a0 Mon Sep 17 00:00:00 2001 From: Roman Date: Fri, 9 Aug 2024 17:57:19 +0400 Subject: [PATCH 08/12] typo fix --- framework/deproxy_client.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/framework/deproxy_client.py b/framework/deproxy_client.py index 61b1932c5..4135a8fa4 100644 --- a/framework/deproxy_client.py +++ b/framework/deproxy_client.py @@ -503,7 +503,7 @@ def update_initial_settings( self, header_table_size: int = None, enable_push: int = None, - max_concurrent_stream: int = None, + max_concurrent_streams: int = None, initial_window_size: int = None, max_frame_size: int = None, max_header_list_size: int = None, @@ -515,7 +515,7 @@ def update_initial_settings( new_settings = self.__generate_new_settings( header_table_size, enable_push, - max_concurrent_stream, + max_concurrent_streams, initial_window_size, max_frame_size, max_header_list_size, @@ -532,7 +532,7 @@ def send_settings_frame( self, header_table_size: int = None, enable_push: int = None, - max_concurrent_stream: int = None, + max_concurrent_streams: int = None, initial_window_size: int = None, max_frame_size: int = None, max_header_list_size: int = None, @@ -542,7 +542,7 @@ def send_settings_frame( new_settings = self.__generate_new_settings( header_table_size, enable_push, - max_concurrent_stream, + max_concurrent_streams, initial_window_size, max_frame_size, max_header_list_size, @@ -733,7 +733,7 @@ def __binary_headers_to_string(headers): def __generate_new_settings( header_table_size: int = None, enable_push: int = None, - max_concurrent_stream: int = None, + max_concurrent_streams: int = None, initial_window_size: int = None, max_frame_size: int = None, max_header_list_size: int = None, @@ -742,9 +742,9 @@ def __generate_new_settings( if header_table_size is not None: new_settings[SettingCodes.HEADER_TABLE_SIZE] = header_table_size if enable_push is not None: - new_settings[SettingCodes.ENABLE_PUSH] = header_table_size - if max_concurrent_stream is not None: - new_settings[SettingCodes.MAX_CONCURRENT_STREAMS] = max_concurrent_stream + new_settings[SettingCodes.ENABLE_PUSH] = enable_push + if max_concurrent_streams is not None: + new_settings[SettingCodes.MAX_CONCURRENT_STREAMS] = max_concurrent_streams if initial_window_size is not None: new_settings[SettingCodes.INITIAL_WINDOW_SIZE] = initial_window_size if max_frame_size is not None: From f6ea6b9e35594f21f6d861bfe89c037daf214abc Mon Sep 17 00:00:00 2001 From: Roman Date: Mon, 12 Aug 2024 12:23:40 +0400 Subject: [PATCH 09/12] add TCP reader for extracting http messages from .pcap files. --- README.md | 2 +- helpers/tcpreplay.py | 220 +++++++++++++++++++++++++++++++++++++++++++ setup.sh | 2 +- 3 files changed, 222 insertions(+), 2 deletions(-) create mode 100644 helpers/tcpreplay.py diff --git a/README.md b/README.md index 17b523a3c..27050cb48 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ as root: ## Requirements - Host for testing framework: `python3`, `wrk`, `ab`, `nghttp2`, `h2spec`, -`curl`, `h2load`, `tls-perf`, `netstat`, `lxc`, `nginx`, web content +`curl`, `h2load`, `tls-perf`, `netstat`, `lxc`, `nginx`, `tshark`, web content directory accessible by nginx, nginx should not be running before the tests start. See Python libraries in `requirements.txt` - All hosts except previous one: `sftp-server` diff --git a/helpers/tcpreplay.py b/helpers/tcpreplay.py new file mode 100644 index 000000000..0e3e75b21 --- /dev/null +++ b/helpers/tcpreplay.py @@ -0,0 +1,220 @@ +__author__ = "Tempesta Technologies, Inc." +__copyright__ = "Copyright (C) 2024 Tempesta Technologies, Inc." +__license__ = "GPL2" + + +import json +import os +import subprocess as sp +from collections import defaultdict +from dataclasses import dataclass + + +@dataclass +class HttpRequest: + method: str + uri: str + version: str + headers: list[tuple[str, str]] + body: str | None + + +@dataclass +class HeadersFrame: + stream_id: int + headers: list[tuple[str, str]] + flags: str + + +@dataclass +class DataFrame: + stream_id: int + body: str + flags: str + + +@dataclass +class SettingsFrame: + header_table_size: int | None + enable_push: int | None + initial_window_size: int | None + max_frame_size: int | None + max_concurrent_streams: int | None + max_header_list_size: int | None + + +class HttpReader: + """Read tcp packets from .pcap files and prepare HTTP requests.""" + + def __init__( + self, + file_names: list[str], + tempesta_tls_ports: tuple[str] = ("443",), + tempesta_http_ports: tuple[str] = ("80",), + output_suffix: str = "", + home_dir: str = "", + ): + self.__file_names: list[str] = file_names + self.__tempesta_tls_ports: tuple[str] = tempesta_tls_ports + self.__tempesta_https_ports: tuple[str] = tempesta_http_ports + self.__output_file: str = f"{home_dir}output{output_suffix}.json" + self.__http2_file: str = f"{home_dir}http2_requests{output_suffix}.json" + self.__https_file: str = f"{home_dir}https_requests{output_suffix}.json" + self.__http_file: str = f"{home_dir}http_requests{output_suffix}.json" + self.http2_requests: dict = defaultdict(dict) + self.https_requests: dict = defaultdict(dict) + self.http_requests: dict = defaultdict(dict) + self.__remove_old_files() + self.__extract_http_and_http2_packets() + + def prepare_http_messages(self) -> None: + """Prepare h2, https, http requests for sending or saving to files.""" + with open(self.__output_file, "rb") as file: + for line in file: + packet = json.loads(line) + if packet.get("index") is not None: + continue + + layers: dict = packet["layers"] + con_id: str = f"{layers['ip']['ip_ip_src']}:{layers['tcp']['tcp_tcp_srcport']}" + dstport: str = layers["tcp"]["tcp_tcp_dstport"] + if dstport in self.__tempesta_tls_ports and layers.get("http2") is not None: + self._process_http2_request(packet, con_id) + elif dstport in self.__tempesta_tls_ports and layers.get("http") is not None: + self._process_http_request(packet, self.https_requests, con_id) + elif dstport in self.__tempesta_https_ports and layers.get("http") is not None: + self._process_http_request(packet, self.http_requests, con_id) + + def save_to_files(self) -> None: + """Save completed messages to separate json files.""" + for messages, name in zip( + [self.http2_requests, self.https_requests, self.http_requests], + [self.__http2_file, self.__https_file, self.__http_file], + ): + with open(name, "w") as file: + json.dump(messages, file, indent=2) + + def __remove_old_files(self) -> None: + """Remove old files if they exist""" + for file in [ + self.__output_file, + self.__http2_file, + self.__https_file, + self.__http_file, + ]: + try: + os.remove(file) + except FileNotFoundError: + pass + + def __extract_http_and_http2_packets(self) -> None: + """Extract decrypted http and http2 messages from .pcap files""" + for name in self.__file_names: + sp.run(f'tshark -r {name} -T ek -Y "http2 or http" >> {self.__output_file}', shell=True) + + @staticmethod + def __get_segments(packet: dict, proto: str) -> list[dict]: + """ + Some TCP segment may contain some h2 frames. + For example - TempestaFW return 2 DATA frames in one TCP frame.""" + segments = packet["layers"][proto] + return segments if type(segments) is list else [segments] + + @staticmethod + def __prepare_field(field: list[str] | str) -> list[str]: + """As for `__get_segments` method.""" + return field if type(field) is list else field + + def _process_http2_request(self, packet: dict, con_id: str) -> None: + for frame in self.__get_segments(packet, "http2"): + if not frame: + continue + + frame_types = self.__prepare_field(frame["http2_http2_type"]) + stream_ids = self.__prepare_field(frame["http2_http2_streamid"]) + flags = self.__prepare_field(frame["http2_http2_flags"]) + + if self.http2_requests.get(con_id) is None: + self.http2_requests[con_id] = list() + + for type_, stream_id, frame_flags in zip(frame_types, stream_ids, flags): + if type_ == "4": + # SETTINGS frame + settings = {} + for s in [ + "http2_http2_settings_header_table_size", + "http2_http2_settings_enable_push", + "http2_http2_settings_initial_window_size", + "http2_http2_settings_max_frame_size", + "http2_http2_settings_max_concurrent_streams", + "http2_http2_settings_max_header_list_size", + ]: + if frame.get(s) is not None: + settings[s.replace("http2_http2_settings_", "")] = int(frame.get(s)) + if settings: + self.http2_requests[con_id].append(SettingsFrame(**settings)) + + elif type_ == "1": + # HEADERS frame + self.http2_requests[con_id].append( + HeadersFrame( + stream_id=int(stream_id), + headers=[ + (h_name, h_value) + for h_name, h_value in zip( + frame["http2_http2_header_name"], + frame["http2_http2_header_value"], + ) + ], + flags=frame_flags, + ) + ) + + elif type_ == "0": + # DATA frame + if frame.get("http2_http2_body_reassembled_data") is not None: + body = frame["http2_http2_body_reassembled_data"] + else: + body = frame["http2_http2_data_data"] + self.http2_requests[con_id].append( + DataFrame( + stream_id=int(stream_id), + body=bytes.fromhex(body.replace(":", "")).decode(), + flags=frame_flags, + ) + ) + elif type_ == "9": + # CONTINUATION frame + for f in self.http2_requests[con_id]: + if type(f) is HeadersFrame and f.stream_id == int(stream_id): + f.headers.append( + ( + frame["http2_http2_header_name"], + frame["http2_http2_header_value"], + ) + ) + + def _process_http_request(self, packet: dict, requests_dict: dict, con_id: str) -> None: + for segment in self.__get_segments(packet, "http"): + if not segment: + continue + + if segment.get("data") is not None: + body = bytes.fromhex(segment["data"]["data_data_data"].replace(":", "")).decode() + else: + body = "" + + request = HttpRequest( + method=segment["http_http_request_method"], + uri=segment["http_http_request_uri"], + version=segment["http_http_request_version"], + headers=[ + tuple(h.rstrip("\r\n").split(": ")) for h in segment["http_http_request_line"] + ], + body=body, + ) + + if self.https_requests.get(con_id): + requests_dict[con_id].append(request) + else: + requests_dict[con_id] = [request] diff --git a/setup.sh b/setup.sh index 2c5a341a3..9bb290bda 100755 --- a/setup.sh +++ b/setup.sh @@ -5,7 +5,7 @@ CURRENT_DIR=$(pwd) apt install python3-pip nginx libnginx-mod-http-echo libtool net-tools libssl-dev \ apache2-utils nghttp2-client libnghttp2-dev autoconf unzip libtemplate-perl \ - tcpdump lxc -y + tcpdump lxc tshark -y # stop and disable installed nginx systemctl stop nginx From 8df84494e19cc177cb4bbadf46e58ff10332aac6 Mon Sep 17 00:00:00 2001 From: Roman Date: Mon, 12 Aug 2024 16:18:23 +0400 Subject: [PATCH 10/12] add a test for repeating traffic --- t_replay/test_replay.py | 220 ++++++++++++++++++++++++++++++++-------- 1 file changed, 175 insertions(+), 45 deletions(-) diff --git a/t_replay/test_replay.py b/t_replay/test_replay.py index 9f0bf9668..c7d594a5f 100644 --- a/t_replay/test_replay.py +++ b/t_replay/test_replay.py @@ -1,43 +1,81 @@ -""" -Test TempestaFW reeboot under load. -""" - __author__ = "Tempesta Technologies, Inc." -__copyright__ = "Copyright (C) 2017-2024 Tempesta Technologies, Inc." +__copyright__ = "Copyright (C) 2024 Tempesta Technologies, Inc." __license__ = "GPL2" -from framework import tester -from helpers import remote, sysnet, tf_cfg + +from threading import Thread + +from framework import deproxy_client, tester +from helpers.tcpreplay import DataFrame, HeadersFrame, HttpReader, SettingsFrame + +NGINX_CONFIG = """ +pid ${pid}; +worker_processes auto; + +events { + worker_connections 1024; + use epoll; +} + +http { + keepalive_timeout ${server_keepalive_timeout}; + keepalive_requests ${server_keepalive_requests}; + sendfile on; + tcp_nopush on; + tcp_nodelay on; + + open_file_cache max=1000; + open_file_cache_valid 30s; + open_file_cache_min_uses 2; + open_file_cache_errors off; + client_max_body_size 10000M; + + # [ debug | info | notice | warn | error | crit | alert | emerg ] + # Fully disable log errors. + error_log /dev/null emerg; + + # Disable access log altogether. + access_log off; + + server { + listen ${server_ip}:8443; + + location / { + return 200; + } + location /nginx_status { + stub_status on; + } + } +} +""" class TestReplay(tester.TempestaTest): - clients = [ - {"id": "tcpreplay", "type": "external", "binary": "tcpreplay", "ssl": True, "cmd_args": ""} - ] + + h2_https_port = "443" + http_port = "80" + server_port = "8000" + + clients = [] backends = [ { - "id": "deproxy", - "type": "deproxy", - "port": "8080", - "response": "static", - "response_content": ("HTTP/1.1 200 OK\r\n" "Content-length: 0\r\n" "\r\n"), - }, - { - "id": "deproxy_h2", - "type": "deproxy", + "id": "nginx", + "type": "nginx", "port": "8443", - "response": "static", - "response_content": ("HTTP/1.1 200 OK\r\n" "Content-length: 0\r\n" "\r\n"), + "status_uri": "http://${server_ip}:8443/nginx_status", + "config": NGINX_CONFIG, }, ] tempesta = { "config": """ - listen 192.168.122.100:443 proto=https,h2; - listen 192.168.122.100:80 proto=http; + listen ${tempesta_ip}:443 proto=h2,https; + listen ${tempesta_ip}:80 proto=http; access_log on; + client_tbl_size 134217728; block_action attack reply; block_action error reply; @@ -46,40 +84,132 @@ class TestReplay(tester.TempestaTest): tls_certificate_key ${tempesta_workdir}/tempesta.key; tls_match_any_server_name; - srv_group h2 { + srv_group main { server ${server_ip}:8443; } - srv_group http { - server ${server_ip}:8080; - } - - vhost h2 { - proxy_pass h2; - } - vhost http { - proxy_pass http; + vhost main { + proxy_pass main; } http_chain { - mark == 1 -> http; - ->h2; + -> main; } """ } - def test_replay(self) -> None: - self.start_all_services(client=False) + def setUp(self): + self.http_reader = HttpReader( + file_names=[ + "/mnt/other/tempesta-test/tcpdump/" + "selftests.test_deproxy.DeproxyTestH2.test_make_request.pcap" + ], + output_suffix="-test", + ) + self.http_reader.prepare_http_messages() + client_n = 0 + for _ in self.http_reader.http2_requests.keys(): + self.clients.append( + { + "id": f"h2-{client_n}", + "type": "deproxy_h2", + "addr": "${tempesta_ip}", + "port": str(self.h2_https_port), + "ssl": True, + "ssl_hostname": "main", + } + ) + client_n += 1 + + client_n = 0 + for _ in self.http_reader.https_requests.keys(): + self.clients.append( + { + "id": f"https-{client_n}", + "type": "deproxy", + "addr": "${tempesta_ip}", + "port": str(self.h2_https_port), + "ssl": True, + "ssl_hostname": "main", + } + ) + client_n += 1 + + client_n = 0 + for _ in self.http_reader.http_requests.keys(): + self.clients.append( + { + "id": f"http-{client_n}", + "type": "deproxy", + "addr": "${tempesta_ip}", + "port": str(self.http_port), + "ssl": False, + } + ) + client_n += 1 + + super().setUp() + + def send_http_requests(self, http_clients: list) -> None: + for client, request_list in zip(http_clients, self.http_reader.http_requests.values()): + for request in request_list: + client.make_request(client.create_request(authority=None, **request.__dict__)) + + for client in http_clients: + self.assertTrue(client.wait_for_response()) + + def send_https_requests(self, https_clients: list) -> None: + for client, request_list in zip(https_clients, self.http_reader.https_requests.values()): + for request in request_list: + client.make_request(client.create_request(authority=None, **request.__dict__)) + + for client in https_clients: + self.assertTrue(client.wait_for_response()) + + def send_h2_requests(self, h2_clients: list) -> None: + for client, frames in zip(h2_clients, self.http_reader.http2_requests.values()): + client: deproxy_client.DeproxyClientH2 + client.send_bytes(client.h2_connection.data_to_send()) + client.wait_for_ack_settings() + + for frame in frames: + frame_type = type(frame) + if frame_type is SettingsFrame: + frame: SettingsFrame + client.send_settings_frame( + **{k: v for k, v in frame.__dict__.items() if v is not None} + ) + elif frame_type is HeadersFrame: + frame: HeadersFrame + end_stream = True if frame.flags in ["0x05"] else False + client.stream_id = frame.stream_id + client.make_request(frame.headers, end_stream) + elif frame_type is DataFrame: + frame: DataFrame + client.stream_id = frame.stream_id + end_stream = True if frame.flags in ["0x01", "0x05"] else False + client.make_request(frame.body, end_stream) + + for client in h2_clients: + self.assertTrue(client.wait_for_response()) - ETH = sysnet.route_dst_ip(remote.tempesta, tf_cfg.cfg.get("Tempesta", "ip")) - tcpreplay = self.get_client("tcpreplay") - tcpreplay.options = [f"-i ens4 /tmp/tcpdump/replay.pcap"] + def test_replay(self) -> None: + self.start_all_services(client=True) - tcpreplay.start() - self.wait_while_busy(tcpreplay, timeout=100) - tcpreplay.stop() + h2_clients = [client for client in self.get_clients() if client.proto == "h2"] + https_clients = [ + client for client in self.get_clients() if client.proto == "http/1.1" and client.ssl + ] + http_clients = [ + client for client in self.get_clients() if client.proto == "http/1.1" and not client.ssl + ] - print(tcpreplay.response_msg) + t_h2 = Thread(target=self.send_h2_requests, args=(h2_clients,)) + t_https = Thread(target=self.send_https_requests, args=(https_clients,)) + t_http = Thread(target=self.send_http_requests, args=(http_clients,)) + for t in [t_h2, t_https, t_http]: + t.start() -# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 + for t in [t_h2, t_https, t_http]: + t.join() From c2bf752324c36e5d2c02f15aef1d9040460379f1 Mon Sep 17 00:00:00 2001 From: Roman Date: Fri, 16 Aug 2024 16:33:07 +0400 Subject: [PATCH 11/12] deproxy_client.py: Now DeproxyClientH2 can send body as bytes. This is necessary to bypass unnecessary decoding of the body received in as bytes. tcpreplay.py: HttpReader is reworked. Now it can work with the captured files from tcpdump and tshark --- framework/deproxy_client.py | 14 +++++--- helpers/tcpreplay.py | 72 ++++++++++++++++++++++--------------- t_replay/test_replay.py | 38 ++++++-------------- 3 files changed, 63 insertions(+), 61 deletions(-) diff --git a/framework/deproxy_client.py b/framework/deproxy_client.py index 4135a8fa4..02129aa62 100644 --- a/framework/deproxy_client.py +++ b/framework/deproxy_client.py @@ -217,7 +217,7 @@ def send_request(self, request, expected_status_code: Optional[str] = None, time ) def send_bytes(self, data: bytes, expect_response=False): - self._add_to_request_buffers(data=data, end_stream=None) + self._add_to_request_buffers(data=data, end_stream=None, is_body=False) self.nrreq += 1 if expect_response: self.valid_req_num += 1 @@ -438,7 +438,7 @@ def make_requests(self, requests, huffman=True, *args, **kwargs): def make_request( self, - request: Union[tuple, list, str, deproxy.H2Request], + request: Union[tuple, list, str, bytes, deproxy.H2Request], end_stream=True, priority_weight=None, priority_depends_on=None, @@ -449,7 +449,7 @@ def make_request( Add request to buffers and change counters. Args: request: - str - send data frame; + str or bytes - send data frame; list - send headers frame; tuple - send headers and data frame in one TCP-packet; end_stream (bool) - set END_STREAM flag for frame; @@ -470,6 +470,7 @@ def make_request( priority_weight=priority_weight, priority_depends_on=priority_depends_on, priority_exclusive=priority_exclusive, + is_body=True if isinstance(request, bytes) else False, ) self.nrreq += 1 @@ -792,11 +793,16 @@ def _add_to_request_buffers( priority_weight=None, priority_depends_on=None, priority_exclusive=None, + is_body: bool, ) -> None: - if isinstance(data, bytes): + if isinstance(data, bytes) and not is_body: # in case when you use `send_bytes` method self._request_buffers.append(data) self._add_to_body_buffers(body=None, stream_id=None, end_stream=None) + elif isinstance(data, bytes) and is_body: + # in case when you use `mak_request` method + self._request_buffers.append(b"") + self._add_to_body_buffers(body=data, stream_id=self.stream_id, end_stream=end_stream) elif isinstance(data, str): # in case when you use `make_request` to sending body self._request_buffers.append(b"") diff --git a/helpers/tcpreplay.py b/helpers/tcpreplay.py index 0e3e75b21..494396ac8 100644 --- a/helpers/tcpreplay.py +++ b/helpers/tcpreplay.py @@ -29,18 +29,18 @@ class HeadersFrame: @dataclass class DataFrame: stream_id: int - body: str + body: bytes flags: str @dataclass class SettingsFrame: - header_table_size: int | None - enable_push: int | None - initial_window_size: int | None - max_frame_size: int | None - max_concurrent_streams: int | None - max_header_list_size: int | None + header_table_size: int | None = None + enable_push: int | None = None + initial_window_size: int | None = None + max_frame_size: int | None = None + max_concurrent_streams: int | None = None + max_header_list_size: int | None = None class HttpReader: @@ -48,13 +48,19 @@ class HttpReader: def __init__( self, - file_names: list[str], + # Example command: tcpdump -U -i any host tempesta_ip -w file_path.pcap + tcpdump_files: list[str] | None = None, + # Example command: tshark -i any -T ek -J "tcp http2" -Y "http2" >> output + tshark_files: list[str] | None = None, tempesta_tls_ports: tuple[str] = ("443",), tempesta_http_ports: tuple[str] = ("80",), output_suffix: str = "", home_dir: str = "", ): - self.__file_names: list[str] = file_names + if tcpdump_files is None and tshark_files is None: + raise AttributeError("You must set `tcpdump_files` or `tshark_files` args, or both.") + self.__tcpdump_files: list[str] = tcpdump_files + self.__tshark_files: list[str] = tshark_files self.__tempesta_tls_ports: tuple[str] = tempesta_tls_ports self.__tempesta_https_ports: tuple[str] = tempesta_http_ports self.__output_file: str = f"{home_dir}output{output_suffix}.json" @@ -65,25 +71,33 @@ def __init__( self.https_requests: dict = defaultdict(dict) self.http_requests: dict = defaultdict(dict) self.__remove_old_files() - self.__extract_http_and_http2_packets() + if self.__tcpdump_files: + self.__extract_http_and_http2_packets() def prepare_http_messages(self) -> None: """Prepare h2, https, http requests for sending or saving to files.""" - with open(self.__output_file, "rb") as file: - for line in file: - packet = json.loads(line) - if packet.get("index") is not None: - continue - - layers: dict = packet["layers"] - con_id: str = f"{layers['ip']['ip_ip_src']}:{layers['tcp']['tcp_tcp_srcport']}" - dstport: str = layers["tcp"]["tcp_tcp_dstport"] - if dstport in self.__tempesta_tls_ports and layers.get("http2") is not None: - self._process_http2_request(packet, con_id) - elif dstport in self.__tempesta_tls_ports and layers.get("http") is not None: - self._process_http_request(packet, self.https_requests, con_id) - elif dstport in self.__tempesta_https_ports and layers.get("http") is not None: - self._process_http_request(packet, self.http_requests, con_id) + files = self.__tshark_files + if os.path.exists(self.__output_file): + files.append(self.__output_file) + + for file_name in files: + with open(file_name, "rb") as file: + for line in file: + packet = json.loads(line.decode(encoding="utf-8", errors="replace")) + if packet.get("index") is not None: + continue + + layers: dict = packet["layers"] + con_id: str = ( + f"{layers['tcp']['tcp_tcp_stream']}:{layers['tcp']['tcp_tcp_srcport']}" + ) + dstport: str = layers["tcp"]["tcp_tcp_dstport"] + if dstport in self.__tempesta_tls_ports and layers.get("http2") is not None: + self._process_http2_request(packet, con_id) + elif dstport in self.__tempesta_tls_ports and layers.get("http") is not None: + self._process_http_request(packet, self.https_requests, con_id) + elif dstport in self.__tempesta_https_ports and layers.get("http") is not None: + self._process_http_request(packet, self.http_requests, con_id) def save_to_files(self) -> None: """Save completed messages to separate json files.""" @@ -109,7 +123,7 @@ def __remove_old_files(self) -> None: def __extract_http_and_http2_packets(self) -> None: """Extract decrypted http and http2 messages from .pcap files""" - for name in self.__file_names: + for name in self.__tcpdump_files: sp.run(f'tshark -r {name} -T ek -Y "http2 or http" >> {self.__output_file}', shell=True) @staticmethod @@ -123,11 +137,11 @@ def __get_segments(packet: dict, proto: str) -> list[dict]: @staticmethod def __prepare_field(field: list[str] | str) -> list[str]: """As for `__get_segments` method.""" - return field if type(field) is list else field + return field if type(field) is list else [field] def _process_http2_request(self, packet: dict, con_id: str) -> None: for frame in self.__get_segments(packet, "http2"): - if not frame: + if not frame or frame.get("http2_http2_magic") is not None: continue frame_types = self.__prepare_field(frame["http2_http2_type"]) @@ -179,7 +193,7 @@ def _process_http2_request(self, packet: dict, con_id: str) -> None: self.http2_requests[con_id].append( DataFrame( stream_id=int(stream_id), - body=bytes.fromhex(body.replace(":", "")).decode(), + body=bytes.fromhex(body.replace(":", "")), flags=frame_flags, ) ) diff --git a/t_replay/test_replay.py b/t_replay/test_replay.py index c7d594a5f..3185b0c35 100644 --- a/t_replay/test_replay.py +++ b/t_replay/test_replay.py @@ -2,9 +2,6 @@ __copyright__ = "Copyright (C) 2024 Tempesta Technologies, Inc." __license__ = "GPL2" - -from threading import Thread - from framework import deproxy_client, tester from helpers.tcpreplay import DataFrame, HeadersFrame, HttpReader, SettingsFrame @@ -83,6 +80,7 @@ class TestReplay(tester.TempestaTest): tls_certificate ${tempesta_workdir}/tempesta.crt; tls_certificate_key ${tempesta_workdir}/tempesta.key; tls_match_any_server_name; + max_concurrent_streams 1000; srv_group main { server ${server_ip}:8443; @@ -99,13 +97,7 @@ class TestReplay(tester.TempestaTest): } def setUp(self): - self.http_reader = HttpReader( - file_names=[ - "/mnt/other/tempesta-test/tcpdump/" - "selftests.test_deproxy.DeproxyTestH2.test_make_request.pcap" - ], - output_suffix="-test", - ) + self.http_reader = HttpReader(tshark_files=["tcpdump/output0.txt"]) self.http_reader.prepare_http_messages() client_n = 0 for _ in self.http_reader.http2_requests.keys(): @@ -169,6 +161,7 @@ def send_https_requests(self, https_clients: list) -> None: def send_h2_requests(self, h2_clients: list) -> None: for client, frames in zip(h2_clients, self.http_reader.http2_requests.values()): client: deproxy_client.DeproxyClientH2 + client.start() client.send_bytes(client.h2_connection.data_to_send()) client.wait_for_ack_settings() @@ -191,25 +184,14 @@ def send_h2_requests(self, h2_clients: list) -> None: client.make_request(frame.body, end_stream) for client in h2_clients: - self.assertTrue(client.wait_for_response()) + client.wait_for_response() def test_replay(self) -> None: - self.start_all_services(client=True) + self.start_all_services(client=False) h2_clients = [client for client in self.get_clients() if client.proto == "h2"] - https_clients = [ - client for client in self.get_clients() if client.proto == "http/1.1" and client.ssl - ] - http_clients = [ - client for client in self.get_clients() if client.proto == "http/1.1" and not client.ssl - ] - - t_h2 = Thread(target=self.send_h2_requests, args=(h2_clients,)) - t_https = Thread(target=self.send_https_requests, args=(https_clients,)) - t_http = Thread(target=self.send_http_requests, args=(http_clients,)) - - for t in [t_h2, t_https, t_http]: - t.start() - - for t in [t_h2, t_https, t_http]: - t.join() + self.send_h2_requests(h2_clients) + + tempesta = self.get_tempesta() + tempesta.get_stats() + print(tempesta.stats.__dict__) From accfff8829cb72f511fa8b938e0d6098ecfa42bb Mon Sep 17 00:00:00 2001 From: Roman Date: Fri, 16 Aug 2024 16:52:13 +0400 Subject: [PATCH 12/12] move scripts to a separate directory. --- {t_replay => scripts}/__init__.py | 0 {helpers => scripts}/tcpdump.py | 8 ++++---- {helpers => scripts}/tcpreplay.py | 0 {t_replay => scripts}/test_replay.py | 2 +- tests_disabled.json | 2 +- tests_disabled_remote.json | 2 +- tests_disabled_tcpseg.json | 2 +- 7 files changed, 8 insertions(+), 8 deletions(-) rename {t_replay => scripts}/__init__.py (100%) rename {helpers => scripts}/tcpdump.py (100%) rename {helpers => scripts}/tcpreplay.py (100%) rename {t_replay => scripts}/test_replay.py (98%) diff --git a/t_replay/__init__.py b/scripts/__init__.py similarity index 100% rename from t_replay/__init__.py rename to scripts/__init__.py diff --git a/helpers/tcpdump.py b/scripts/tcpdump.py similarity index 100% rename from helpers/tcpdump.py rename to scripts/tcpdump.py index 637da76bb..572f3a4b0 100644 --- a/helpers/tcpdump.py +++ b/scripts/tcpdump.py @@ -1,10 +1,10 @@ +import argparse import datetime import os -import time -import subprocess -import signal -import argparse import shutil +import signal +import subprocess +import time class Logger: diff --git a/helpers/tcpreplay.py b/scripts/tcpreplay.py similarity index 100% rename from helpers/tcpreplay.py rename to scripts/tcpreplay.py diff --git a/t_replay/test_replay.py b/scripts/test_replay.py similarity index 98% rename from t_replay/test_replay.py rename to scripts/test_replay.py index 3185b0c35..bd949f796 100644 --- a/t_replay/test_replay.py +++ b/scripts/test_replay.py @@ -3,7 +3,7 @@ __license__ = "GPL2" from framework import deproxy_client, tester -from helpers.tcpreplay import DataFrame, HeadersFrame, HttpReader, SettingsFrame +from scripts.tcpreplay import DataFrame, HeadersFrame, HttpReader, SettingsFrame NGINX_CONFIG = """ pid ${pid}; diff --git a/tests_disabled.json b/tests_disabled.json index 1415900f3..e9c532804 100644 --- a/tests_disabled.json +++ b/tests_disabled.json @@ -2,7 +2,7 @@ "disable" : true, "disabled" : [ { - "name": "t_replay", + "name": "scripts", "reason": "These tests should not be run on CI. Run it only to reproduce bugs on production" }, { diff --git a/tests_disabled_remote.json b/tests_disabled_remote.json index 349bfd39a..cacf9e75c 100644 --- a/tests_disabled_remote.json +++ b/tests_disabled_remote.json @@ -2,7 +2,7 @@ "disable" : true, "disabled" : [ { - "name": "t_replay", + "name": "scripts", "reason": "These tests should not be run on CI. Run it only to reproduce bugs on production" }, { diff --git a/tests_disabled_tcpseg.json b/tests_disabled_tcpseg.json index 21f28672a..f39ad7871 100644 --- a/tests_disabled_tcpseg.json +++ b/tests_disabled_tcpseg.json @@ -2,7 +2,7 @@ "disable" : true, "disabled" : [ { - "name": "t_replay", + "name": "scripts", "reason": "These tests should not be run on CI. Run it only to reproduce bugs on production" }, {