diff --git a/framework/deproxy_client.py b/framework/deproxy_client.py index c5a647498..244c6cfbc 100644 --- a/framework/deproxy_client.py +++ b/framework/deproxy_client.py @@ -51,6 +51,7 @@ def __init__( conn_addr: Optional[str], is_ssl: bool, server_hostname: str, + is_http2: bool = None, ): # Initialize the `BaseDeproxy` super().__init__( @@ -64,7 +65,11 @@ def __init__( ) self.ssl = is_ssl - self._is_http2 = isinstance(self, DeproxyClientH2) + self._is_http2 = is_http2 + + if self._is_http2 is None: + self._is_http2 = isinstance(self, DeproxyClientH2) + self._create_context() self.server_hostname = server_hostname @@ -459,6 +464,12 @@ class ReqBodyBuffer: class DeproxyClientH2(BaseDeproxyClient): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.h2_connection: Optional[h2.connection.H2Connection] = None + self.encoder = None + @property def ping_received(self) -> int: return self._ping_received diff --git a/framework/deproxy_server.py b/framework/deproxy_server.py index d6ce52593..2ed80863a 100644 --- a/framework/deproxy_server.py +++ b/framework/deproxy_server.py @@ -5,7 +5,6 @@ import time from typing import Optional -import run_config from framework import stateful from framework.deproxy_base import BaseDeproxy from helpers import deproxy, error, tempesta, tf_cfg, util @@ -36,6 +35,7 @@ def __init__(self, *, server: "StaticDeproxyServer", sock: socket.socket): self._cur_responses_list = [] self.__time_to_send: float = 0 self.__new_response: bool = True + self.last_request: Optional[deproxy.Request] = None self.nrreq: int = 0 self._tcp_logger.debug("New server connection") @@ -91,7 +91,7 @@ def handle_read(self): while self._request_buffer: try: - request = deproxy.Request(self._request_buffer) + self.last_request = deproxy.Request(self._request_buffer) self.nrreq += 1 except deproxy.IncompleteMessage: return None @@ -102,8 +102,8 @@ def handle_read(self): return None self._http_logger.info("Receive request") - 
self._http_logger.debug(request) - response, need_close = self._server.receive_request(request) + self._http_logger.debug(self.last_request) + response, need_close = self._server.receive_request(self.last_request) if self._server.drop_conn_when_request_received: self.handle_close() if response: @@ -116,18 +116,21 @@ def handle_read(self): if need_close: self.close() - self._request_buffer = self._request_buffer[len(request.msg) :] + self._request_buffer = self._request_buffer[len(self.last_request.msg) :] # Handler will be called even if buffer is empty. else: return None + def send_data(self, request: deproxy.Request, data: bytes) -> int: + return self.socket.send(data) + def handle_write(self): if self._server.delay_before_sending_response and self.__new_response: self.__new_response = False return self.sleep() resp = self._response_buffer[self._responses_done] - sent = self.socket.send(resp[: self._server.segment_size]) + sent = self.send_data(self.last_request, resp[: self._server.segment_size]) if sent < 0: return @@ -188,19 +191,27 @@ def _reinit_variables(self): self._connections: list[ServerConnection] = list() self._requests: list[deproxy.Request] = list() + def create_new_connection(self, sock): + return ServerConnection(server=self, sock=sock) + def handle_accept(self): pair = self.accept() - if pair is not None: - sock, _ = pair - if self.segment_size: - sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) - handler = ServerConnection(server=self, sock=sock) - self._connections.append(handler) - # ATTENTION - # Due to the polling cycle, creating new connection can be - # performed before removing old connection. 
- # So we can have case with > expected amount of connections - # It's not a error case, it's a problem of polling + + if pair is None: + return + + sock, _ = pair + + if self.segment_size: + sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) + + handler = self.create_new_connection(sock=sock) + self._connections.append(handler) + # ATTENTION + # Due to the polling cycle, creating new connection can be + # performed before removing old connection. + # So we can have case with > expected amount of connections + # It's not a error case, it's a problem of polling def handle_error(self): type_, v, _ = sys.exc_info() diff --git a/framework/nginx_server.py b/framework/nginx_server.py index f110de68f..160f5108a 100644 --- a/framework/nginx_server.py +++ b/framework/nginx_server.py @@ -34,7 +34,6 @@ def __init__(self, id_, props): self.conns_n = tempesta.server_conns_default() self.active_conns = 0 self.requests = 0 - self.name = id_ self.status_uri = fill_template(props["status_uri"], props) self.stop_procedures = [self.stop_nginx, self.remove_config] self.weight = int(props["weight"]) if "weight" in props else None @@ -42,9 +41,6 @@ def __init__(self, id_, props): self.clear_stats() - def get_name(self): - return self.name - def clear_stats(self): self.active_conns = 0 self.requests = 0 diff --git a/framework/stateful.py b/framework/stateful.py index d77072057..11ee16fe2 100644 --- a/framework/stateful.py +++ b/framework/stateful.py @@ -22,6 +22,7 @@ class Stateful(abc.ABC): def __init__(self, *, id_: str): self._state = STATE_STOPPED self.stop_procedures = [] + self.id_ = id_ self._exceptions = [] self._generate_service_id(id_) self._logger = logging.LoggerAdapter( @@ -34,6 +35,12 @@ def _generate_service_id(self, id_: str) -> None: def __str__(self): return f"{self.__class__.__name__}" + def get_name(self): + return self.id_ + + def get_id(self): + return self.id_ + @property def state(self) -> str: return self._state diff --git 
a/http2_general/test_h2_block_action.py b/http2_general/test_h2_block_action.py index bb9467bf1..478ef3e47 100644 --- a/http2_general/test_h2_block_action.py +++ b/http2_general/test_h2_block_action.py @@ -4,11 +4,14 @@ __copyright__ = "Copyright (C) 2023 Tempesta Technologies, Inc." __license__ = "GPL2" +import time from h2.errors import ErrorCodes from hpack import HeaderTuple +from helpers import util from http2_general.helpers import BlockActionH2Base, H2Base, generate_custom_error_page from test_suite import marks +from helpers.deproxy import HttpMessage @marks.parameterize_class( diff --git a/t_stress/test_connection_flood.py b/t_stress/test_connection_flood.py new file mode 100644 index 000000000..1f3f71840 --- /dev/null +++ b/t_stress/test_connection_flood.py @@ -0,0 +1,501 @@ +""" +HTTP Stress tests - load Tempesta FW with multiple connections. +""" + +import socket +import struct +import time + +from h2.connection import H2Connection +from h2.settings import SettingCodes, Settings + +from framework.deproxy_client import DeproxyClient, DeproxyClientH2, HuffmanEncoder +from framework.deproxy_server import ServerConnection, StaticDeproxyServer +from helpers import deproxy, tf_cfg +from helpers.deproxy import HttpMessage +from test_suite import marks, tester + +__author__ = "Tempesta Technologies, Inc." +__copyright__ = "Copyright (C) 2022-2026 Tempesta Technologies, Inc." 
class BaseClientWaitForFinish:
    """Mixin for flood clients: track when the peer closed our connection.

    ``connection_closed`` is flipped by the server-side connection object
    (``LifespanServerConnection.close_client_connection``), which closes the
    client's socket directly in-process.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Set to True by the server side once it has closed our socket.
        self.connection_closed = False
        # Creation time — the reference point for the wait timeout.
        self.initialized_at = time.time()

    def is_time_out(self, timeout: int) -> bool:
        """Return True once more than `timeout` seconds passed since creation."""
        return (time.time() - self.initialized_at) > timeout

    def wait_for_finish(self, timeout):
        """Poll in 1 ms steps until the connection is closed or `timeout` expires.

        NOTE(review): always returns True, even on timeout — callers cannot
        distinguish success from expiry; confirm this is intended.
        """
        while not self.connection_closed and not self.is_time_out(timeout):
            time.sleep(0.001)

        return True


class FloodClientH1SmallRequest(BaseClientWaitForFinish, DeproxyClient):
    """HTTP/1 flood client that sends a single small POST request."""

    def flood(self, client_id: str):
        # The client-id header lets the server map the request back to the
        # in-process client object whose socket it must close.
        self.make_request(self.create_request(method="POST", headers=[("client-id", client_id)]))

    def is_stream_ready(self):
        # HTTP/1 has no streams; the connection is always "ready".
        return True


class FloodClientH2SmallRequest(BaseClientWaitForFinish, DeproxyClientH2):
    """
    HTTP/2 flood client that sends a single small request; the server side
    initiates the connection closing after half of the response is sent.
    """

    def update_initial_settings(self, *_, **__) -> None:
        """Build a fresh H2 connection advertising custom local SETTINGS."""
        self.h2_connection = H2Connection()
        self.h2_connection.encoder = HuffmanEncoder()
        self.encoder = HuffmanEncoder()

        self.h2_connection.local_settings = Settings(
            initial_values={
                SettingCodes.ENABLE_PUSH: 0,
                SettingCodes.INITIAL_WINDOW_SIZE: 0,
                # BUG FIX: the original `1 << 24 - 1` parses as `1 << 23`
                # (8388608) because `-` binds tighter than `<<`.  The intended
                # value is the HTTP/2 protocol maximum 2**24 - 1 = 16777215.
                SettingCodes.MAX_FRAME_SIZE: (1 << 24) - 1,
                SettingCodes.MAX_HEADER_LIST_SIZE: 10 << 20,
                SettingCodes.HEADER_TABLE_SIZE: 4096,
            }
        )

    def flood(self, client_id: int):
        """Send the connection preface/SETTINGS and one headers-only request."""
        self.h2_connection.initiate_connection()
        self.send_bytes(self.h2_connection.data_to_send())

        self.h2_connection.send_headers(
            stream_id=1,
            end_stream=True,
            headers=[
                (":authority", "example.com"),
                (":path", "/"),
                (":scheme", "https"),
                (":method", "POST"),
                ("client-id", str(client_id)),
            ],
        )

        data_to_send = self.h2_connection.data_to_send()
        self.send_bytes(data_to_send)

    def is_stream_ready(self):
        # Truthy once stream 1 exists in the local h2 state machine.
        return self.h2_connection.streams.get(1)


class FloodClientH1LargeRequest(FloodClientH1SmallRequest):
    """HTTP/1 flood client that sends a single POST with a 16 KiB body."""

    def flood(self, client_id: int):
        self.make_request(
            self.create_request(
                method="POST",
                headers=[("client-id", str(client_id))],
                body="H" * 16384,
            )
        )


class FloodClientH2LargeRequest(FloodClientH2SmallRequest):
    """HTTP/2 flood client that sends a single request with a 16 KiB body."""

    def flood(self, client_id: int):
        self.h2_connection.initiate_connection()
        self.send_bytes(self.h2_connection.data_to_send())

        request_body = b"H" * 16384

        self.h2_connection.send_headers(
            stream_id=1,
            end_stream=False,
            headers=[
                (":authority", "example.com"),
                (":path", "/"),
                (":scheme", "https"),
                (":method", "POST"),
                ("client-id", str(client_id)),
                # NOTE(review): "plain/text" looks transposed ("text/plain");
                # kept as-is since the header value is part of test traffic.
                ("content-type", "plain/text"),
                ("content-length", str(len(request_body))),
            ],
        )

        data_to_send = self.h2_connection.data_to_send()
        self.send_bytes(data_to_send)

        # Body goes in a separate DATA frame closing the stream.
        self.h2_connection.send_data(
            stream_id=1,
            data=request_body,
            end_stream=True,
        )
        data_to_send = self.h2_connection.data_to_send()
        self.send_bytes(data_to_send)


class LifespanServerConnection(ServerConnection):
    """
    Server-side connection that initiates closing of the client connection
    after half of the response has been sent.
    """

    def __init__(
        self,
        *args,
        flood_clients: dict[str, FloodClientH2SmallRequest | FloodClientH2LargeRequest] = None,
        **kwargs,
    ) -> None:
        super().__init__(*args, **kwargs)
        # Maps client-id header values to in-process flood client objects.
        self.flood_clients = flood_clients

    def close_client_connection(self, client_id: str):
        """Close the flood client's socket; SO_LINGER(on, 0) makes it an RST."""
        if self._server.rst_close:
            self.flood_clients[client_id].socket.setsockopt(
                socket.SOL_SOCKET,
                socket.SO_LINGER,
                struct.pack("ii", 1, 0),
            )

        self.flood_clients[client_id].socket.shutdown(socket.SHUT_WR)
        self.flood_clients[client_id].socket.close()
        self.flood_clients[client_id].connection_closed = True

    def send_data(self, request: deproxy.Request, data: bytes) -> int:
        """Send half the response, kill the client connection, send the rest."""
        client_id = request.headers["client-id"]
        half_of_response = len(data) // 2

        sent = self.socket.send(data[:half_of_response])
        self.close_client_connection(client_id)
        sent += self.socket.send(data[half_of_response:])
        return sent
class DeproxyServerWithCallback(StaticDeproxyServer):
    """Deproxy server whose connections can close flood-client sockets."""

    def __init__(
        self,
        *args,
        flood_clients: dict[str, FloodClientH2SmallRequest | FloodClientH2LargeRequest] = None,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        # client-id -> in-process flood client object; passed down to every
        # connection created by this server.
        self.flood_clients = flood_clients
        self.response_size = 0
        self.response_body = ""
        # When True, client sockets are closed with RST (SO_LINGER 0), not FIN.
        self.rst_close = False

    def create_new_connection(self, sock):
        # Factory hook: wrap accepted sockets in LifespanServerConnection.
        return LifespanServerConnection(server=self, sock=sock, flood_clients=self.flood_clients)

    def set_response_size(self, size: int) -> None:
        """Install a 200 response with a `size`-byte 'x' body and matching Content-Length."""
        self.response_size = size
        self.response_body = "x" * size
        self.set_response(
            "HTTP/1.1 200 OK\r\n"
            "Server: Debian\r\n"
            f"Date: {HttpMessage.date_time_string()}\r\n"
            f"Content-Length: {self.response_size}\r\n\r\n"
            f"{self.response_body}"
        )

    def close_with_rst(self, rst_close: bool):
        """Select FIN (False) or RST (True) for closing client connections."""
        self.rst_close = rst_close


class LifespanServerCloseAfterHeaders(LifespanServerConnection):
    """
    Connection that initiates closing of the client connection right after
    the response headers are sent.
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # Becomes True once the status line + headers went out.
        self.headers_sent = False

    def send_data(self, request: deproxy.Request, data: bytes) -> int:
        # Phase 1: send only the headers, remember we did, report bytes sent.
        # Phase 2 (next write): once the client's stream is ready, close the
        # client connection and send the body.
        client_id = request.headers["client-id"]

        if not self.headers_sent:
            headers = (
                "HTTP/1.1 200 OK\r\n"
                "Server: Debian\r\n"
                f"Date: {HttpMessage.date_time_string()}\r\n"
                f"Content-Length: {self._server.response_size}\r\n\r\n"
            )
            sent = self.socket.send(headers.encode())
            self.headers_sent = True
            return sent

        # H2 clients report ready once stream 1 exists; H1 clients always do.
        if not self.flood_clients[client_id].is_stream_ready():
            return 0

        self.close_client_connection(client_id)

        sent = self.socket.send(self._server.response_body.encode())
        return sent


class DeproxyServerHeaders(DeproxyServerWithCallback):
    """Server variant whose connections close clients right after headers."""

    def create_new_connection(self, sock):
        return LifespanServerCloseAfterHeaders(
            server=self, sock=sock, flood_clients=self.flood_clients
        )


class LifespanServerSegmented(LifespanServerConnection):
    """
    Connection that sends the response in 2000-byte segments, one segment per
    second at most, and initiates closing of the client connection after half
    of the segments were sent.
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.headers_sent = False
        self.segment_size = 2000
        # Lazily computed on first send: half the number of full segments.
        self.total_segments_to_stop = None
        self.segments_sent = 0
        self.last_sent = time.time()
        self.delay_between_segments = 1

    def send_data(self, request: deproxy.Request, data: bytes) -> int:
        time_since_last_sent = time.time() - self.last_sent

        # Throttle: at most one segment per delay_between_segments seconds.
        if time_since_last_sent < self.delay_between_segments:
            return 0

        if not self.total_segments_to_stop:
            # NOTE(review): falsy check — if the response is shorter than two
            # segments this evaluates to 0 and is recomputed every call, so
            # the close below would never trigger; confirm responses are
            # always >= 2 * segment_size here.
            self.total_segments_to_stop = len(data) // self.segment_size // 2

        client_id = request.headers["client-id"]

        sent = self.socket.send(data[: self.segment_size])

        if not sent:
            return 0

        self.segments_sent += 1
        self.last_sent = time.time()

        if self.segments_sent >= self.total_segments_to_stop:
            self.close_client_connection(client_id)

        return sent


class DeproxyServerSegmented(DeproxyServerWithCallback):
    """Server variant whose connections send segmented, delayed responses."""

    def create_new_connection(self, sock):
        return LifespanServerSegmented(server=self, sock=sock, flood_clients=self.flood_clients)
@marks.parameterize_class(
    [
        {
            "name": "SmallReqSmallResp",
            "client": FloodClientH1SmallRequest,
            "server": DeproxyServerWithCallback,
            "response_size": 200,
        },
        {
            "name": "SmallReqLargeResp",
            "client": FloodClientH1SmallRequest,
            "server": DeproxyServerWithCallback,
            "response_size": 200_000,
        },
        {
            "name": "LargeReqSmallResp",
            "client": FloodClientH1LargeRequest,
            "server": DeproxyServerWithCallback,
            "response_size": 200,
        },
        {
            "name": "LargeReqLargeResp",
            "client": FloodClientH1LargeRequest,
            "server": DeproxyServerWithCallback,
            "response_size": 200_000,
        },
        {
            "name": "AfterHeaders",
            "client": FloodClientH1SmallRequest,
            "server": DeproxyServerHeaders,
            "response_size": 200_000,
        },
        {
            "name": "Segmented",
            "client": FloodClientH1SmallRequest,
            "server": DeproxyServerSegmented,
            "response_size": 200_000,
        },
    ]
)
class TestConnectionFloodH1(tester.TempestaTest):
    """Flood Tempesta FW with 1000 HTTP/1 connections whose client side is
    torn down (FIN or RST) mid-response, and verify no dmesg anomalies."""

    # Filled in by @parameterize_class.
    client = None
    server = None
    response_size = 0

    tempesta = {
        "config": """
        listen 443 proto=https;

        access_log dmesg;
        server ${server_ip}:8000;
        frang_limits {http_methods GET HEAD POST PUT DELETE;}

        tls_certificate ${tempesta_workdir}/tempesta.crt;
        tls_certificate_key ${tempesta_workdir}/tempesta.key;
        tls_match_any_server_name;

        cache 0;
        """
    }

    def setUp(self):
        super().setUp()

        # BUG FIX: the original looped `for index in enumerate(range(1000))`,
        # making `index` a (i, i) tuple and producing client ids like
        # "deproxy_flood_(0, 0)".  Iterate the range directly.
        for index in range(1000):
            client = self.client(
                id_=f"deproxy_flood_{index}",
                deproxy_auto_parser=self.get_deproxy_auto_parser(),
                port=443,
                bind_addr=tf_cfg.cfg.get("Server", "ip"),
                segment_gap=0,
                segment_size=0,
                is_ipv6=False,
                conn_addr=tf_cfg.cfg.get("Tempesta", "ip"),
                is_ssl=True,
                server_hostname="tempesta-tech.com",
            )
            self.add_client(client)
            self.deproxy_manager.add_client(client)

        # One backend that knows every flood client so it can close their
        # sockets from the server side of the test.
        server = self.server(
            id_="deproxy",
            deproxy_auto_parser=self.get_deproxy_auto_parser(),
            flood_clients={client.get_name(): client for client in self.get_clients()},
            port=8000,
            bind_addr=tf_cfg.cfg.get("Server", "ip"),
            segment_size=0,
            segment_gap=0,
            is_ipv6=False,
            response=b"",
            keep_alive=True,
            drop_conn_when_request_received=False,
            delay_before_sending_response=0.0,
            hang_on_req_num=0,
            pipelined=0,
        )
        self.add_server(server)
        self.deproxy_manager.add_server(server)

    def run_test(self, rst_close: bool):
        """Start services, flood, then assert on dmesg log contents."""
        self.start_all_services(client=False)

        server: DeproxyServerWithCallback = self.get_server("deproxy")
        server.set_response_size(self.response_size)
        server.close_with_rst(rst_close)

        self.assertTrue(self.wait_all_connections())

        for client in self.get_clients():
            client.start()
            client.flood(client_id=client.get_name())

        self.wait_while_busy(*self.get_clients())

        self.loggers.dmesg.update()

        logs = self.loggers.dmesg.log_findall(
            "request dropped: non-idempotent requests "
            "aren't re-forwarded or re-scheduled, status 504"
        )
        self.assertEqual(len(logs), 0, "Dropped requests")

        logs = self.loggers.dmesg.log_findall("Close TCP socket w/o sending alert to the peer")
        self.assertEqual(len(logs), 0, "Problems with closing TCP connections")

        logs = self.loggers.dmesg.log_findall("Cannot send TLS alert")
        self.assertEqual(len(logs), 0, "TLS alerts exists")

        logs = self.loggers.dmesg.log_findall(" 504 ")
        self.assertEqual(len(logs), 0, "Timeout requests exists")

        logs = self.loggers.dmesg.log_findall(" 200 ")
        self.assertGreater(len(logs), 0, "No successful responses")

    @marks.Parameterize.expand(
        [
            marks.Param(name="FIN", rst_close=False),
            marks.Param(name="RST", rst_close=True),
        ]
    )
    def test_connection_flood(self, *_, rst_close: bool = False, **__):
        self.run_test(rst_close=rst_close)


@marks.parameterize_class(
    [
        {
            "name": "SmallReqSmallResp",
            "client": FloodClientH2SmallRequest,
            "server": DeproxyServerWithCallback,
            "response_size": 200,
        },
        {
            "name": "SmallReqLargeResp",
            "client": FloodClientH2SmallRequest,
            "server": DeproxyServerWithCallback,
            "response_size": 200_000,
        },
        {
            "name": "LargeReqSmallResp",
            "client": FloodClientH2LargeRequest,
            "server": DeproxyServerWithCallback,
            "response_size": 200,
        },
        {
            "name": "LargeReqLargeResp",
            "client": FloodClientH2LargeRequest,
            "server": DeproxyServerWithCallback,
            "response_size": 200_000,
        },
        {
            "name": "AfterHeaders",
            "client": FloodClientH2SmallRequest,
            "server": DeproxyServerHeaders,
            "response_size": 200_000,
        },
        {
            "name": "H2Segmented",
            "client": FloodClientH2SmallRequest,
            "server": DeproxyServerSegmented,
            "response_size": 200_000,
        },
    ]
)
class TestConnectionFloodH2(TestConnectionFloodH1):
    """Same flood scenarios over HTTP/2 (h2 listener)."""

    tempesta = {
        "config": """
        listen 443 proto=h2,https;

        access_log dmesg;
        server ${server_ip}:8000;
        frang_limits {http_methods GET HEAD POST PUT DELETE;}

        tls_certificate ${tempesta_workdir}/tempesta.crt;
        tls_certificate_key ${tempesta_workdir}/tempesta.key;
        tls_match_any_server_name;

        cache 0;
        """
    }

    # Re-declared because the @expand decorator does not inherit.
    @marks.Parameterize.expand(
        [
            marks.Param(name="FIN", rst_close=False),
            marks.Param(name="RST", rst_close=True),
        ]
    )
    def test_connection_flood(self, *_, rst_close: bool = False, **__):
        self.run_test(rst_close=rst_close)
frang_limits {http_methods GET HEAD POST PUT DELETE;} + + tls_certificate ${tempesta_workdir}/tempesta.crt; + tls_certificate_key ${tempesta_workdir}/tempesta.key; + tls_match_any_server_name; + + cache 0; + """ + } + + @marks.Parameterize.expand( + [ + marks.Param(name="FIN", rst_close=False), + marks.Param(name="RST", rst_close=True), + ] + ) + def test_connection_flood(self, *_, rst_close: bool = False, **__): + self.run_test(rst_close=rst_close) diff --git a/t_stress/test_stress.py b/t_stress/test_stress.py index f6f42ecc4..f80a49172 100644 --- a/t_stress/test_stress.py +++ b/t_stress/test_stress.py @@ -1199,6 +1199,3 @@ def test(self, name, cmd_args, check_func): ) self._test(cmd_args) check_func(self) - - -# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/test_suite/tester.py b/test_suite/tester.py index 28fc1491c..585886765 100644 --- a/test_suite/tester.py +++ b/test_suite/tester.py @@ -188,6 +188,9 @@ def disable_deproxy_auto_parser(self) -> None: """ self._deproxy_auto_parser.parsing = False + def get_deproxy_auto_parser(self) -> DeproxyAutoParser: + return self._deproxy_auto_parser + def __create_client_deproxy(self, client: dict, ssl: bool, bind_addr: str): client_factories = { "deproxy_h2": deproxy_client.DeproxyClientH2, @@ -295,6 +298,9 @@ def __create_servers(self): # Copy description to keep it clean between several tests. self.__create_backend(server.copy()) + def add_server(self, server: StaticDeproxyServer | Nginx | LXCServer | DockerServer): + self.__servers[server.get_name()] = server + def get_server(self, sid) -> StaticDeproxyServer | Nginx | LXCServer | DockerServer | None: """Return client with specified id""" return self.__servers.get(sid) @@ -311,6 +317,9 @@ def __create_clients(self): # Copy description to keep it clean between several tests. 
self.__create_client(client.copy()) + def add_client(self, client: deproxy_client.DeproxyClient | deproxy_client.DeproxyClientH2): + self.__clients[client.get_name()] = client + def get_client(self, cid) -> typing.Union[ deproxy_client.DeproxyClientH2, deproxy_client.DeproxyClient,