From 08ce30d91a218049938c1e44a42e3a3b1201e95a Mon Sep 17 00:00:00 2001 From: EvgeniiMekhanik Date: Thu, 7 Aug 2025 19:14:19 +0300 Subject: [PATCH 1/3] Add some tests --- http2_general/test_h2_block_action.py | 38 +++++ t_stress/test_stress.py | 68 ++++++++ tools/close_connection_flood/main.go | 232 ++++++++++++++++++++++++++ 3 files changed, 338 insertions(+) create mode 100644 tools/close_connection_flood/main.go diff --git a/http2_general/test_h2_block_action.py b/http2_general/test_h2_block_action.py index bb9467bf1..499599292 100644 --- a/http2_general/test_h2_block_action.py +++ b/http2_general/test_h2_block_action.py @@ -4,11 +4,14 @@ __copyright__ = "Copyright (C) 2023 Tempesta Technologies, Inc." __license__ = "GPL2" +import time from h2.errors import ErrorCodes from hpack import HeaderTuple +from helpers import util from http2_general.helpers import BlockActionH2Base, H2Base, generate_custom_error_page from test_suite import marks +from helpers.deproxy import HttpMessage @marks.parameterize_class( @@ -224,3 +227,38 @@ def test_block_action_error_drop(self): self.assertTrue(client.wait_for_connection_close()) self.assertIsNone(client.last_response) self.check_rst_no_fin_in_sniffer(sniffer) + + +class TestDroppingConnection(H2Base): + def test(self): + self.start_all_services() + client = self.get_client("deproxy") + client.auto_flow_control = False + client.update_initial_settings(initial_window_size=0) + client.send_bytes(client.h2_connection.data_to_send()) + client.wait_for_ack_settings() + + response_body = "x" * 200000 + + server = self.get_server("deproxy") + server.set_response( + "HTTP/1.1 200 OK\r\n" + + "Server: Debian\r\n" + + f"Date: {HttpMessage.date_time_string()}\r\n" + + f"Content-Length: {len(response_body)}\r\n\r\n" + + response_body + ) + client.last_response_buffer = bytes() # clearing the buffer after exchanging settings + + client.make_request(self.get_request) + self.assertTrue(client.wait_for_headers_frame(stream_id=1)) + client.increment_flow_control_window(stream_id=1, flow_controlled_length=1000) + + self.assertTrue( + util.wait_until(lambda: not (b"x" * 1000) in client.last_response_buffer), + "Tempesta did not send first DATA frame after receiving WindowUpdate frame.", + ) + client.stop() + print(util.get_used_memory()) + time.sleep(5) + print(util.get_used_memory()) diff --git a/t_stress/test_stress.py b/t_stress/test_stress.py index f6f42ecc4..c1adefa75 100644 --- a/t_stress/test_stress.py +++ b/t_stress/test_stress.py @@ -6,6 +6,7 @@ import time from pathlib import Path +from helpers.deproxy import HttpMessage from helpers import dmesg, remote, tf_cfg from helpers.networker import NetWorker from helpers.deproxy import HttpMessage @@ -1201,4 +1202,71 @@ def test(self, name, cmd_args, check_func): check_func(self) +class TestConnectionFlood(tester.TempestaTest): + backends = [ + { + "id": "deproxy", + "type": "deproxy", + "port": "8000", + "response": "static", + "response_content": ( + "HTTP/1.1 200 OK\r\n" + + f"Date: {HttpMessage.date_time_string()}\r\n" + + "Server: debian\r\n" + + "Content-Length: 0\r\n\r\n" + ), + } + ] + + tempesta = { + "config": """ + listen 443 proto=h2,https; + + server ${server_ip}:8000; + + tls_certificate ${tempesta_workdir}/tempesta.crt; + tls_certificate_key ${tempesta_workdir}/tempesta.key; + tls_match_any_server_name; + cache 0; + """ + } + + clients = [ + { + "id": "close_connection_flood", + "type": "external", + "binary": "close_connection_flood", + "ssl": True, + "cmd_args": "", + }, + ] + + @marks.Parameterize.expand( + [ + marks.Param( + name="Http2ZeroWindow", + cmd_args=f"-address %s:443 -threads 4 -iterations 100 -connections 100 -debug 1 -close_type FIN -flood_type http2_zero_window", + ), + ] + ) + def test(self, name, cmd_args): + self.start_all_services(client=False) + response_body = "x" * 200000 + server = self.get_server("deproxy") + server.set_response( + "HTTP/1.1 200 OK\r\n" + + "Server: Debian\r\n" + + f"Date: {HttpMessage.date_time_string()}\r\n" + + f"Content-Length: {len(response_body)}\r\n\r\n" + + response_body + ) + flood_client = self.get_client("close_connection_flood") + flood_client.options = [cmd_args % tf_cfg.cfg.get("Tempesta", "ip")] + flood_client.start() + self.wait_while_busy(flood_client) + flood_client.stop() + + print(flood_client.stdout) + print(flood_client.stderr) + # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/tools/close_connection_flood/main.go b/tools/close_connection_flood/main.go new file mode 100644 index 000000000..cbb640e6b --- /dev/null +++ b/tools/close_connection_flood/main.go @@ -0,0 +1,232 @@ +package main + +import ( + "bytes" + "flag" + "log" + "crypto/tls" + "sync/atomic" + "time" + "net" + "os" + "fmt" + "syscall" + "reflect" + "unsafe" + + "golang.org/x/net/http2" + "golang.org/x/net/http2/hpack" +) + +type TcpCloseType int + +const ( + TcpCloseTypeUnknown TcpCloseType = iota + TcpCloseTypeFin + TcpCloseTypeRst +) + +const ( + transportDefaultStreamFlow = 4 << 20 + maxFrameSize = 1 << 24 - 1 + maxHeaderListSize = 10 << 20 + headerTableSize = 4096 +) + +var ( + iterations int + threads int + address string + host string + close_type string + flood_type string + connections int + debug int +) + +var finished int64 + +func init() { + flag.StringVar(&address, "address", ":443", "server address") + flag.StringVar(&host, "host", "localhost", "host/authority header") + flag.IntVar(&threads, "threads", 1, "number of threads to start") + flag.IntVar(&iterations, "iterations", 1, "number of flood iterations") + flag.IntVar(&connections, "connections", 100, "count of connections to flood") + flag.StringVar(&close_type, "close_type", "unknown", "type of connection close (FIN/RST)") + flag.StringVar(&flood_type, "flood_type", "unknown", "type of flood") + flag.IntVar(&debug, "debug", 0, "debug level") + flag.Parse() +} + +func main() { + log.Printf("Starting %d iterations in %d threads", iterations, threads) + var iter_per_thread int = iterations / threads + var rem int = iterations % threads + + for i := 0; i < threads; i++ { + inc := 0 + if rem > 0 { + inc = 1 + rem-- + } + go func(i int, inc int) { + for j := 0; j < iter_per_thread + inc; j++ { + switch flood_type { + case "http2_zero_window": + flood_http2_zero_window(i, close_type) + default: + panic(fmt.Errorf("Unknown flood type: %s", flood_type)) + } + nfinished := atomic.AddInt64(&finished, 1) + if (nfinished == int64(iterations)) { + log.Printf("All connections are finished, stopping program\n") + os.Exit(0) + } + } + }(i, inc) + } + + done := make(chan struct{}) + <-done +} + +func sendRst(conn net.Conn) error { + // Extract syscall.RawConn from net.Conn + tcpConn, ok := conn.(*net.TCPConn) + if !ok { + return fmt.Errorf("not a TCPConn") + } + + rawConn, err := tcpConn.SyscallConn() + if err != nil { + return err + } + + var serr error + rawConn.Control(func(fd uintptr) { + linger := syscall.Linger{ + Onoff: 1, // Enable linger + Linger: 0, // Timeout = 0, causes RST on close + } + serr = syscall.SetsockoptLinger(int(fd), syscall.SOL_SOCKET, syscall.SO_LINGER, &linger) + if serr != nil { + return + } + serr = syscall.Close(int(fd)) // Close socket to send RST + }) + return serr +} + +func getNetConn(tlsConn *tls.Conn) net.Conn { + v := reflect.ValueOf(tlsConn).Elem() + connField := v.FieldByName("conn") + conn := reflect.NewAt(connField.Type(), unsafe.Pointer(connField.UnsafeAddr())).Elem().Interface() + return conn.(net.Conn) +} + +func flood_http2_zero_window(cid int, close_type string) { + conns := []*tls.Conn{} + initialSettings := []http2.Setting{ + {ID: http2.SettingEnablePush, Val: 0}, + {ID: http2.SettingInitialWindowSize, Val: 0}, + {ID: http2.SettingMaxFrameSize, Val: maxFrameSize}, + {ID: http2.SettingMaxHeaderListSize, Val: maxHeaderListSize}, + {ID: http2.SettingHeaderTableSize, Val: headerTableSize}, + } + conf := &tls.Config { + InsecureSkipVerify: true, + NextProtos: []string{"h2"}, + } + + for conn_n := 0; conn_n < connections; conn_n++ { + conn, err := tls.DialWithDialer(&net.Dialer{Timeout: 2 * time.Second}, + "tcp", address, conf) + if err != nil { + panic(err) + } + + conns = append(conns, conn) + _, err = conn.Write([]byte(http2.ClientPreface)) + if err != nil { + panic(err) + } + + framer := http2.NewFramer(conn, conn) + err = framer.WriteSettings(initialSettings...) + if err != nil { + panic(err) + } + + for i := 0; i < 2; i++ { + _, err := readPrintFrame(cid, framer) + if err != nil { + panic(err) + } + } + + blockBuffer := bytes.Buffer{} + henc := hpack.NewEncoder(&blockBuffer) + henc.WriteField(hpack.HeaderField{Name: ":method", Value: "POST"}) + henc.WriteField(hpack.HeaderField{Name: ":path", Value: "/"}) + henc.WriteField(hpack.HeaderField{Name: ":scheme", Value: "https"}) + henc.WriteField(hpack.HeaderField{Name: ":authority", Value: "localhost"}) + + err = framer.WriteHeaders(http2.HeadersFrameParam{ + StreamID: 1, + EndStream: true, + EndHeaders: true, + BlockFragment: blockBuffer.Bytes(), + }) + if (err != nil) { + panic(err) + } + + for { + frame, err := readPrintFrame(cid, framer) + if err != nil { + panic(err) + } + + if hf, ok := frame.(*http2.HeadersFrame); ok { + if hf.HeadersEnded() { + break + } + } + } + } + + for conn_n := 0; conn_n < connections; conn_n++ { + var conn net.Conn = getNetConn(conns[conn_n]) + var err error = nil + switch close_type { + case "FIN": + err = conn.Close() + case "RST": + err = sendRst(conn) + default: + panic("Invalid close type") + } + + if err != nil { + panic(err) + } + } +} + +func readPrintFrame(cid int, framer *http2.Framer) (http2.Frame, error) { + frame, err := framer.ReadFrame() + if err != nil { + log.Printf("[%d] Error reading %+v. Reason: %s\n", cid, frame, err) + return frame, err + } + + if debug > 1 { + log.Printf("[%d] Read frame %+v\n", cid, frame) + } + + if frame.Header().Type == http2.FrameGoAway { + log.Println(frame.(*http2.GoAwayFrame).ErrCode) + } + + return frame, nil +} From 7db4edc25ec43a1821760d599c4a2f72c28c725d Mon Sep 17 00:00:00 2001 From: symstu Date: Mon, 18 Aug 2025 18:31:15 +0200 Subject: [PATCH 2/3] tests --- framework/deproxy_client.py | 13 +- framework/deproxy_server.py | 45 ++- framework/nginx_server.py | 4 - framework/stateful.py | 7 + t_stress/test_connection_flood.py | 501 +++++++++++++++++++++++++++ t_stress/test_stress.py | 71 ---- test_suite/tester.py | 9 + tools/close_connection_flood/main.go | 232 ------------- 8 files changed, 557 insertions(+), 325 deletions(-) create mode 100644 t_stress/test_connection_flood.py delete mode 100644 tools/close_connection_flood/main.go diff --git a/framework/deproxy_client.py b/framework/deproxy_client.py index c5a647498..244c6cfbc 100644 --- a/framework/deproxy_client.py +++ b/framework/deproxy_client.py @@ -51,6 +51,7 @@ def __init__( conn_addr: Optional[str], is_ssl: bool, server_hostname: str, + is_http2: bool = None, ): # Initialize the `BaseDeproxy` super().__init__( @@ -64,7 +65,11 @@ def __init__( ) self.ssl = is_ssl - self._is_http2 = isinstance(self, DeproxyClientH2) + self._is_http2 = is_http2 + + if self._is_http2 is None: + self._is_http2 = isinstance(self, DeproxyClientH2) + self._create_context() self.server_hostname = server_hostname @@ -459,6 +464,12 @@ class ReqBodyBuffer: class DeproxyClientH2(BaseDeproxyClient): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.h2_connection: Optional[h2.connection.H2Connection] = None + self.encoder = None + @property def ping_received(self) -> int: return self._ping_received diff --git a/framework/deproxy_server.py b/framework/deproxy_server.py index d6ce52593..2ed80863a 100644 --- a/framework/deproxy_server.py +++ b/framework/deproxy_server.py @@ -5,7 +5,6 @@ import time from typing import Optional -import run_config from framework import stateful from framework.deproxy_base import BaseDeproxy from helpers import deproxy, error, tempesta, tf_cfg, util @@ -36,6 +35,7 @@ def __init__(self, *, server: "StaticDeproxyServer", sock: socket.socket): self._cur_responses_list = [] self.__time_to_send: float = 0 self.__new_response: bool = True + self.last_request: Optional[deproxy.Request] = None self.nrreq: int = 0 self._tcp_logger.debug("New server connection") @@ -91,7 +91,7 @@ def handle_read(self): while self._request_buffer: try: - request = deproxy.Request(self._request_buffer) + self.last_request = deproxy.Request(self._request_buffer) self.nrreq += 1 except deproxy.IncompleteMessage: return None @@ -102,8 +102,8 @@ def handle_read(self): return None self._http_logger.info("Receive request") - self._http_logger.debug(request) - response, need_close = self._server.receive_request(request) + self._http_logger.debug(self.last_request) + response, need_close = self._server.receive_request(self.last_request) if self._server.drop_conn_when_request_received: self.handle_close() if response: @@ -116,18 +116,21 @@ def handle_read(self): if need_close: self.close() - self._request_buffer = self._request_buffer[len(request.msg) :] + self._request_buffer = self._request_buffer[len(self.last_request.msg) :] # Handler will be called even if buffer is empty. else: return None + def send_data(self, request: deproxy.Request, data: bytes) -> int: + return self.socket.send(data) + def handle_write(self): if self._server.delay_before_sending_response and self.__new_response: self.__new_response = False return self.sleep() resp = self._response_buffer[self._responses_done] - sent = self.socket.send(resp[: self._server.segment_size]) + sent = self.send_data(self.last_request, resp[: self._server.segment_size]) if sent < 0: return @@ -188,19 +191,27 @@ def _reinit_variables(self): self._connections: list[ServerConnection] = list() self._requests: list[deproxy.Request] = list() + def create_new_connection(self, sock): + return ServerConnection(server=self, sock=sock) + def handle_accept(self): pair = self.accept() - if pair is not None: - sock, _ = pair - if self.segment_size: - sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) - handler = ServerConnection(server=self, sock=sock) - self._connections.append(handler) - # ATTENTION - # Due to the polling cycle, creating new connection can be - # performed before removing old connection. - # So we can have case with > expected amount of connections - # It's not a error case, it's a problem of polling + + if pair is None: + return + + sock, _ = pair + + if self.segment_size: + sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) + + handler = self.create_new_connection(sock=sock) + self._connections.append(handler) + # ATTENTION + # Due to the polling cycle, creating new connection can be + # performed before removing old connection. + # So we can have case with > expected amount of connections + # It's not a error case, it's a problem of polling def handle_error(self): type_, v, _ = sys.exc_info() diff --git a/framework/nginx_server.py b/framework/nginx_server.py index f110de68f..160f5108a 100644 --- a/framework/nginx_server.py +++ b/framework/nginx_server.py @@ -34,7 +34,6 @@ def __init__(self, id_, props): self.conns_n = tempesta.server_conns_default() self.active_conns = 0 self.requests = 0 - self.name = id_ self.status_uri = fill_template(props["status_uri"], props) self.stop_procedures = [self.stop_nginx, self.remove_config] self.weight = int(props["weight"]) if "weight" in props else None @@ -42,9 +41,6 @@ def __init__(self, id_, props): self.clear_stats() - def get_name(self): - return self.name - def clear_stats(self): self.active_conns = 0 self.requests = 0 diff --git a/framework/stateful.py b/framework/stateful.py index d77072057..11ee16fe2 100644 --- a/framework/stateful.py +++ b/framework/stateful.py @@ -22,6 +22,7 @@ class Stateful(abc.ABC): def __init__(self, *, id_: str): self._state = STATE_STOPPED self.stop_procedures = [] + self.id_ = id_ self._exceptions = [] self._generate_service_id(id_) self._logger = logging.LoggerAdapter( @@ -34,6 +35,12 @@ def _generate_service_id(self, id_: str) -> None: def __str__(self): return f"{self.__class__.__name__}" + def get_name(self): + return self.id_ + + def get_id(self): + return self.id_ + @property def state(self) -> str: return self._state diff --git a/t_stress/test_connection_flood.py b/t_stress/test_connection_flood.py new file mode 100644 index 000000000..1f3f71840 --- /dev/null +++ b/t_stress/test_connection_flood.py @@ -0,0 +1,501 @@ +""" +HTTP Stress tests - load Tempesta FW with multiple connections. +""" + +import socket +import struct +import time + +from h2.connection import H2Connection +from h2.settings import SettingCodes, Settings + +from framework.deproxy_client import DeproxyClient, DeproxyClientH2, HuffmanEncoder +from framework.deproxy_server import ServerConnection, StaticDeproxyServer +from helpers import deproxy, tf_cfg +from helpers.deproxy import HttpMessage +from test_suite import marks, tester + +__author__ = "Tempesta Technologies, Inc." +__copyright__ = "Copyright (C) 2022-2026 Tempesta Technologies, Inc." +__license__ = "GPL2" + + +class BaseClientWaitForFinish: + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.connection_closed = False + self.initialized_at = time.time() + + def is_time_out(self, timeout: int): + return (time.time() - self.initialized_at) > timeout + + def wait_for_finish(self, timeout): + + while not self.connection_closed and not self.is_time_out(timeout): + time.sleep(0.001) + + return True + + +class FloodClientH1SmallRequest(BaseClientWaitForFinish, DeproxyClient): + def flood(self, client_id: str): + self.make_request(self.create_request(method="POST", headers=[("client-id", client_id)])) + + def is_stream_ready(self): + return True + + +class FloodClientH2SmallRequest(BaseClientWaitForFinish, DeproxyClientH2): + """ + Initiate the connection closing from client side after + the half of small response sent + """ + + def update_initial_settings(self, *_, **__) -> None: + self.h2_connection = H2Connection() + self.h2_connection.encoder = HuffmanEncoder() + self.encoder = HuffmanEncoder() + + self.h2_connection.local_settings = Settings( + initial_values={ + SettingCodes.ENABLE_PUSH: 0, + SettingCodes.INITIAL_WINDOW_SIZE: 0, + SettingCodes.MAX_FRAME_SIZE: 1 << 24 - 1, + SettingCodes.MAX_HEADER_LIST_SIZE: 10 << 20, + SettingCodes.HEADER_TABLE_SIZE: 4096, + } + ) + + def flood(self, client_id: int): + self.h2_connection.initiate_connection() + self.send_bytes(self.h2_connection.data_to_send()) + + self.h2_connection.send_headers( + stream_id=1, + end_stream=True, + headers=[ + (":authority", "example.com"), + (":path", "/"), + (":scheme", "https"), + (":method", "POST"), + ("client-id", str(client_id)), + ], + ) + + data_to_send = self.h2_connection.data_to_send() + self.send_bytes(data_to_send) + + def is_stream_ready(self): + return self.h2_connection.streams.get(1) + + +class FloodClientH1LargeRequest(FloodClientH1SmallRequest): + def flood(self, client_id: int): + self.make_request( + self.create_request( + method="POST", + headers=[("client-id", str(client_id))], + body="H" * 16384, + ) + ) + + +class FloodClientH2LargeRequest(FloodClientH2SmallRequest): + + def flood(self, client_id: int): + self.h2_connection.initiate_connection() + self.send_bytes(self.h2_connection.data_to_send()) + + request_body = b"H" * 16384 + + self.h2_connection.send_headers( + stream_id=1, + end_stream=False, + headers=[ + (":authority", "example.com"), + (":path", "/"), + (":scheme", "https"), + (":method", "POST"), + ("client-id", str(client_id)), + ("content-type", "plain/text"), + ("content-length", str(len(request_body))), + ], + ) + + data_to_send = self.h2_connection.data_to_send() + self.send_bytes(data_to_send) + + self.h2_connection.send_data( + stream_id=1, + data=request_body, + end_stream=True, + ) + data_to_send = self.h2_connection.data_to_send() + self.send_bytes(data_to_send) + + +class LifespanServerConnection(ServerConnection): + """ + Initiate the connection closing from client side after + the half of huge response sent + """ + + def __init__( + self, + *args, + flood_clients: dict[str, FloodClientH2SmallRequest | FloodClientH2LargeRequest] = None, + **kwargs, + ) -> None: + super().__init__(*args, **kwargs) + self.flood_clients = flood_clients + + def close_client_connection(self, client_id: str): + if self._server.rst_close: + self.flood_clients[client_id].socket.setsockopt( + socket.SOL_SOCKET, + socket.SO_LINGER, + struct.pack("ii", 1, 0), + ) + + self.flood_clients[client_id].socket.shutdown(socket.SHUT_WR) + self.flood_clients[client_id].socket.close() + self.flood_clients[client_id].connection_closed = True + + def send_data(self, request: deproxy.Request, data: bytes) -> int: + client_id = request.headers["client-id"] + half_of_response = len(data) // 2 + + sent = self.socket.send(data[:half_of_response]) + self.close_client_connection(client_id) + sent += self.socket.send(data[half_of_response:]) + return sent + + +class DeproxyServerWithCallback(StaticDeproxyServer): + + def __init__( + self, + *args, + flood_clients: dict[str, FloodClientH2SmallRequest | FloodClientH2LargeRequest] = None, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.flood_clients = flood_clients + self.response_size = 0 + self.response_body = "" + self.rst_close = False + + def create_new_connection(self, sock): + return LifespanServerConnection(server=self, sock=sock, flood_clients=self.flood_clients) + + def set_response_size(self, size: int) -> None: + self.response_size = size + self.response_body = "x" * size + self.set_response( + "HTTP/1.1 200 OK\r\n" + "Server: Debian\r\n" + f"Date: {HttpMessage.date_time_string()}\r\n" + f"Content-Length: {self.response_size}\r\n\r\n" + f"{self.response_body}" + ) + + def close_with_rst(self, rst_close: bool): + self.rst_close = rst_close + + +class LifespanServerCloseAfterHeaders(LifespanServerConnection): + """ + Initiate the connection closing from client side + the response headers sent + """ + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.headers_sent = False + + def send_data(self, request: deproxy.Request, data: bytes) -> int: + client_id = request.headers["client-id"] + + if not self.headers_sent: + headers = ( + "HTTP/1.1 200 OK\r\n" + "Server: Debian\r\n" + f"Date: {HttpMessage.date_time_string()}\r\n" + f"Content-Length: {self._server.response_size}\r\n\r\n" + ) + sent = self.socket.send(headers.encode()) + self.headers_sent = True + return sent + + if not self.flood_clients[client_id].is_stream_ready(): + return 0 + + self.close_client_connection(client_id) + + sent = self.socket.send(self._server.response_body.encode()) + return sent + + +class DeproxyServerHeaders(DeproxyServerWithCallback): + def create_new_connection(self, sock): + return LifespanServerCloseAfterHeaders( + server=self, sock=sock, flood_clients=self.flood_clients + ) + + +class LifespanServerSegmented(LifespanServerConnection): + """ + Initiate the connection closing from client side after + half of response segments sent. Each segment sends with + 1-second delay. + """ + + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.headers_sent = False + self.segment_size = 2000 + self.total_segments_to_stop = None + self.segments_sent = 0 + self.last_sent = time.time() + self.delay_between_segments = 1 + + def send_data(self, request: deproxy.Request, data: bytes) -> int: + time_since_last_sent = time.time() - self.last_sent + + if time_since_last_sent < self.delay_between_segments: + return 0 + + if not self.total_segments_to_stop: + self.total_segments_to_stop = len(data) // self.segment_size // 2 + + client_id = request.headers["client-id"] + + sent = self.socket.send(data[: self.segment_size]) + + if not sent: + return 0 + + self.segments_sent += 1 + self.last_sent = time.time() + + if self.segments_sent >= self.total_segments_to_stop: + self.close_client_connection(client_id) + + return sent + + +class DeproxyServerSegmented(DeproxyServerWithCallback): + def create_new_connection(self, sock): + return LifespanServerSegmented(server=self, sock=sock, flood_clients=self.flood_clients) + + +@marks.parameterize_class( + [ + { + "name": "SmallReqSmallResp", + "client": FloodClientH1SmallRequest, + "server": DeproxyServerWithCallback, + "response_size": 200, + }, + { + "name": "SmallReqLargeResp", + "client": FloodClientH1SmallRequest, + "server": DeproxyServerWithCallback, + "response_size": 200_000, + }, + { + "name": "LargeReqSmallResp", + "client": FloodClientH1LargeRequest, + "server": DeproxyServerWithCallback, + "response_size": 200, + }, + { + "name": "LargeReqLargeResp", + "client": FloodClientH1LargeRequest, + "server": DeproxyServerWithCallback, + "response_size": 200_000, + }, + { + "name": "AfterHeaders", + "client": FloodClientH1SmallRequest, + "server": DeproxyServerHeaders, + "response_size": 200_000, + }, + { + "name": "Segmented", + "client": FloodClientH1SmallRequest, + "server": DeproxyServerSegmented, + "response_size": 200_000, + }, + ] +) +class TestConnectionFloodH1(tester.TempestaTest): + client = None + server = None + response_size = 0 + + tempesta = { + "config": """ + listen 443 proto=https; + + access_log dmesg; + server ${server_ip}:8000; + frang_limits {http_methods GET HEAD POST PUT DELETE;} + + tls_certificate ${tempesta_workdir}/tempesta.crt; + tls_certificate_key ${tempesta_workdir}/tempesta.key; + tls_match_any_server_name; + + cache 0; + """ + } + + def setUp(self): + super().setUp() + + for index in enumerate(range(1000)): + client = self.client( + id_=f"deproxy_flood_{index}", + deproxy_auto_parser=self.get_deproxy_auto_parser(), + port=443, + bind_addr=tf_cfg.cfg.get("Server", "ip"), + segment_gap=0, + segment_size=0, + is_ipv6=False, + conn_addr=tf_cfg.cfg.get("Tempesta", "ip"), + is_ssl=True, + server_hostname="tempesta-tech.com", + ) + self.add_client(client) + self.deproxy_manager.add_client(client) + + server = self.server( + id_="deproxy", + deproxy_auto_parser=self.get_deproxy_auto_parser(), + flood_clients={client.get_name(): client for client in self.get_clients()}, + port=8000, + bind_addr=tf_cfg.cfg.get("Server", "ip"), + segment_size=0, + segment_gap=0, + is_ipv6=False, + response=b"", + keep_alive=True, + drop_conn_when_request_received=False, + delay_before_sending_response=0.0, + hang_on_req_num=0, + pipelined=0, + ) + self.add_server(server) + self.deproxy_manager.add_server(server) + + def run_test(self, rst_close: bool): + self.start_all_services(client=False) + + server: DeproxyServerWithCallback = self.get_server("deproxy") + server.set_response_size(self.response_size) + server.close_with_rst(rst_close) + + self.assertTrue(self.wait_all_connections()) + + for client in self.get_clients(): + client.start() + client.flood(client_id=client.get_name()) + + self.wait_while_busy(*self.get_clients()) + + self.loggers.dmesg.update() + + logs = self.loggers.dmesg.log_findall( + "request dropped: non-idempotent requests " + "aren't re-forwarded or re-scheduled, status 504" + ) + self.assertEqual(len(logs), 0, "Dropped requests") + + logs = self.loggers.dmesg.log_findall("Close TCP socket w/o sending alert to the peer") + self.assertEqual(len(logs), 0, "Problems with closing TCP connections") + + logs = self.loggers.dmesg.log_findall("Cannot send TLS alert") + self.assertEqual(len(logs), 0, "TLS alerts exists") + + logs = self.loggers.dmesg.log_findall(" 504 ") + self.assertEqual(len(logs), 0, "Timeout requests exists") + + logs = self.loggers.dmesg.log_findall(" 200 ") + self.assertGreater(len(logs), 0, "No successful responses") + + @marks.Parameterize.expand( + [ + marks.Param(name="FIN", rst_close=False), + marks.Param(name="RST", rst_close=True), + ] + ) + def test_connection_flood(self, *_, rst_close: bool = False, **__): + self.run_test(rst_close=rst_close) + + +@marks.parameterize_class( + [ + { + "name": "SmallReqSmallResp", + "client": FloodClientH2SmallRequest, + "server": DeproxyServerWithCallback, + "response_size": 200, + }, + { + "name": "SmallReqLargeResp", + "client": FloodClientH2SmallRequest, + "server": DeproxyServerWithCallback, + "response_size": 200_000, + }, + { + "name": "LargeReqSmallResp", + "client": FloodClientH2LargeRequest, + "server": DeproxyServerWithCallback, + "response_size": 200, + }, + { + "name": "LargeReqLargeResp", + "client": FloodClientH2LargeRequest, + "server": DeproxyServerWithCallback, + "response_size": 200_000, + }, + { + "name": "AfterHeaders", + "client": FloodClientH2SmallRequest, + "server": DeproxyServerHeaders, + "response_size": 200_000, + }, + { + "name": "H2Segmented", + "client": FloodClientH2SmallRequest, + "server": DeproxyServerSegmented, + "response_size": 200_000, + }, + ] +) +class TestConnectionFloodH2(TestConnectionFloodH1): + tempesta = { + "config": """ + listen 443 proto=h2,https; + + access_log dmesg; + server ${server_ip}:8000; + frang_limits {http_methods GET HEAD POST PUT DELETE;} + + tls_certificate ${tempesta_workdir}/tempesta.crt; + tls_certificate_key ${tempesta_workdir}/tempesta.key; + tls_match_any_server_name; + + cache 0; + """ + } + + @marks.Parameterize.expand( + [ + marks.Param(name="FIN", rst_close=False), + marks.Param(name="RST", rst_close=True), + ] + ) + def test_connection_flood(self, *_, rst_close: bool = False, **__): + self.run_test(rst_close=rst_close) diff --git a/t_stress/test_stress.py b/t_stress/test_stress.py index c1adefa75..f80a49172 100644 --- a/t_stress/test_stress.py +++ b/t_stress/test_stress.py @@ -6,7 +6,6 @@ import time from pathlib import Path -from helpers.deproxy import HttpMessage from helpers import dmesg, remote, tf_cfg from helpers.networker import NetWorker from helpers.deproxy import HttpMessage @@ -1200,73 +1199,3 @@ def test(self, name, cmd_args, check_func): ) self._test(cmd_args) check_func(self) - - -class TestConnectionFlood(tester.TempestaTest): - backends = [ - { - "id": "deproxy", - "type": "deproxy", - "port": "8000", - "response": "static", - "response_content": ( - "HTTP/1.1 200 OK\r\n" - + f"Date: {HttpMessage.date_time_string()}\r\n" - + "Server: debian\r\n" - + "Content-Length: 0\r\n\r\n" - ), - } - ] - - tempesta = { - "config": """ - listen 443 proto=h2,https; - - server ${server_ip}:8000; - - tls_certificate ${tempesta_workdir}/tempesta.crt; - tls_certificate_key ${tempesta_workdir}/tempesta.key; - tls_match_any_server_name; - cache 0; - """ - } - - clients = [ - { - "id": "close_connection_flood", - "type": "external", - "binary": "close_connection_flood", - "ssl": True, - "cmd_args": "", - }, - ] - - @marks.Parameterize.expand( - [ - marks.Param( - name="Http2ZeroWindow", - cmd_args=f"-address %s:443 -threads 4 -iterations 100 -connections 100 -debug 1 -close_type FIN -flood_type http2_zero_window", - ), - ] - ) - def test(self, name, cmd_args): - self.start_all_services(client=False) - response_body = "x" * 200000 - server = self.get_server("deproxy") - server.set_response( - "HTTP/1.1 200 OK\r\n" - + "Server: Debian\r\n" - + f"Date: {HttpMessage.date_time_string()}\r\n" - + f"Content-Length: {len(response_body)}\r\n\r\n" - + response_body - ) - flood_client = self.get_client("close_connection_flood") - flood_client.options = [cmd_args % tf_cfg.cfg.get("Tempesta", "ip")] - flood_client.start() - self.wait_while_busy(flood_client) - flood_client.stop() - - print(flood_client.stdout) - print(flood_client.stderr) - -# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/test_suite/tester.py b/test_suite/tester.py index 28fc1491c..585886765 100644 --- a/test_suite/tester.py +++ b/test_suite/tester.py @@ -188,6 +188,9 @@ def disable_deproxy_auto_parser(self) -> None: """ self._deproxy_auto_parser.parsing = False + def get_deproxy_auto_parser(self) -> DeproxyAutoParser: + return self._deproxy_auto_parser + def __create_client_deproxy(self, client: dict, ssl: bool, bind_addr: str): client_factories = { "deproxy_h2": deproxy_client.DeproxyClientH2, @@ -295,6 +298,9 @@ def __create_servers(self): # Copy description to keep it clean between several tests. self.__create_backend(server.copy()) + def add_server(self, server: StaticDeproxyServer | Nginx | LXCServer | DockerServer): + self.__servers[server.get_name()] = server + def get_server(self, sid) -> StaticDeproxyServer | Nginx | LXCServer | DockerServer | None: """Return client with specified id""" return self.__servers.get(sid) @@ -311,6 +317,9 @@ def __create_clients(self): # Copy description to keep it clean between several tests. self.__create_client(client.copy()) + def add_client(self, client: deproxy_client.DeproxyClient | deproxy_client.DeproxyClientH2): + self.__clients[client.get_name()] = client + def get_client(self, cid) -> typing.Union[ deproxy_client.DeproxyClientH2, deproxy_client.DeproxyClient, diff --git a/tools/close_connection_flood/main.go b/tools/close_connection_flood/main.go deleted file mode 100644 index cbb640e6b..000000000 --- a/tools/close_connection_flood/main.go +++ /dev/null @@ -1,232 +0,0 @@ -package main - -import ( - "bytes" - "flag" - "log" - "crypto/tls" - "sync/atomic" - "time" - "net" - "os" - "fmt" - "syscall" - "reflect" - "unsafe" - - "golang.org/x/net/http2" - "golang.org/x/net/http2/hpack" -) - -type TcpCloseType int - -const ( - TcpCloseTypeUnknown TcpCloseType = iota - TcpCloseTypeFin - TcpCloseTypeRst -) - -const ( - transportDefaultStreamFlow = 4 << 20 - maxFrameSize = 1 << 24 - 1 - maxHeaderListSize = 10 << 20 - headerTableSize = 4096 -) - -var ( - iterations int - threads int - address string - host string - close_type string - flood_type string - connections int - debug int -) - -var finished int64 - -func init() { - flag.StringVar(&address, "address", ":443", "server address") - flag.StringVar(&host, "host", "localhost", "host/authority header") - flag.IntVar(&threads, "threads", 1, "number of threads to start") - flag.IntVar(&iterations, "iterations", 1, "number of flood iterations") - flag.IntVar(&connections, "connections", 100, "count of connections to flood") - flag.StringVar(&close_type, "close_type", "unknown", "type of connection close (FIN/RST)") - flag.StringVar(&flood_type, "flood_type", "unknown", "type of flood") - flag.IntVar(&debug, "debug", 0, "debug level") - flag.Parse() -} - -func main() { - log.Printf("Starting %d iterations in %d threads", iterations, threads) - var iter_per_thread int = iterations / threads - var rem int = iterations % threads - - for i := 0; i < threads; i++ { - inc := 0 - if rem > 0 { - inc = 1 - rem-- - } - go func(i int, inc int) { - for j := 0; j < iter_per_thread + inc; j++ { - switch flood_type { - case "http2_zero_window": - flood_http2_zero_window(i, close_type) - default: - panic(fmt.Errorf("Unknown flood type: %s", flood_type)) - } - nfinished := atomic.AddInt64(&finished, 1) - if (nfinished == int64(iterations)) { - log.Printf("All connections are finished, stopping program\n") - os.Exit(0) - } - } - }(i, inc) - } - - done := make(chan struct{}) - <-done -} - -func sendRst(conn net.Conn) error { - // Extract syscall.RawConn from net.Conn - tcpConn, ok := conn.(*net.TCPConn) - if !ok { - return fmt.Errorf("not a TCPConn") - } - - rawConn, err := tcpConn.SyscallConn() - if err != nil { - return err - } - - var serr error - rawConn.Control(func(fd uintptr) { - linger := syscall.Linger{ - Onoff: 1, // Enable linger - Linger: 0, // Timeout = 0, causes RST on close - } - serr = syscall.SetsockoptLinger(int(fd), syscall.SOL_SOCKET, syscall.SO_LINGER, &linger) - if serr != nil { - return - } - serr = syscall.Close(int(fd)) // Close socket to send RST - }) - return serr -} - -func getNetConn(tlsConn *tls.Conn) net.Conn { - v := reflect.ValueOf(tlsConn).Elem() - connField := v.FieldByName("conn") - conn := reflect.NewAt(connField.Type(), unsafe.Pointer(connField.UnsafeAddr())).Elem().Interface() - return conn.(net.Conn) -} - -func flood_http2_zero_window(cid int, close_type string) { - conns := []*tls.Conn{} - initialSettings := []http2.Setting{ - {ID: http2.SettingEnablePush, Val: 0}, - {ID: http2.SettingInitialWindowSize, Val: 0}, - {ID: http2.SettingMaxFrameSize, Val: maxFrameSize}, - {ID: http2.SettingMaxHeaderListSize, Val: maxHeaderListSize}, - {ID: http2.SettingHeaderTableSize, Val: headerTableSize}, - } - conf := &tls.Config { - InsecureSkipVerify: true, - NextProtos: []string{"h2"}, - } - - for conn_n := 0; conn_n < connections; conn_n++ { - conn, err := tls.DialWithDialer(&net.Dialer{Timeout: 2 * time.Second}, - "tcp", address, conf) - if err != nil { - panic(err) - } - - conns = append(conns, conn) - _, err = conn.Write([]byte(http2.ClientPreface)) - if err != nil { - panic(err) - } - - framer := http2.NewFramer(conn, conn) - err = framer.WriteSettings(initialSettings...) - if err != nil { - panic(err) - } - - for i := 0; i < 2; i++ { - _, err := readPrintFrame(cid, framer) - if err != nil { - panic(err) - } - } - - blockBuffer := bytes.Buffer{} - henc := hpack.NewEncoder(&blockBuffer) - henc.WriteField(hpack.HeaderField{Name: ":method", Value: "POST"}) - henc.WriteField(hpack.HeaderField{Name: ":path", Value: "/"}) - henc.WriteField(hpack.HeaderField{Name: ":scheme", Value: "https"}) - henc.WriteField(hpack.HeaderField{Name: ":authority", Value: "localhost"}) - - err = framer.WriteHeaders(http2.HeadersFrameParam{ - StreamID: 1, - EndStream: true, - EndHeaders: true, - BlockFragment: blockBuffer.Bytes(), - }) - if (err != nil) { - panic(err) - } - - for { - frame, err := readPrintFrame(cid, framer) - if err != nil { - panic(err) - } - - if hf, ok := frame.(*http2.HeadersFrame); ok { - if hf.HeadersEnded() { - break - } - } - } - } - - for conn_n := 0; conn_n < connections; conn_n++ { - var conn net.Conn = getNetConn(conns[conn_n]) - var err error = nil - switch close_type { - case "FIN": - err = conn.Close() - case "RST": - err = sendRst(conn) - default: - panic("Invalid close type") - } - - if err != nil { - panic(err) - } - } -} - -func readPrintFrame(cid int, framer *http2.Framer) (http2.Frame, error) { - frame, err := framer.ReadFrame() - if err != nil { - log.Printf("[%d] Error reading %+v. Reason: %s\n", cid, frame, err) - return frame, err - } - - if debug > 1 { - log.Printf("[%d] Read frame %+v\n", cid, frame) - } - - if frame.Header().Type == http2.FrameGoAway { - log.Println(frame.(*http2.GoAwayFrame).ErrCode) - } - - return frame, nil -} From 3b55307447da8af58d76c0431604819ebb9dfda1 Mon Sep 17 00:00:00 2001 From: EvgeniiMekhanik Date: Fri, 19 Sep 2025 14:05:28 +0300 Subject: [PATCH 3/3] TMP --- http2_general/test_h2_block_action.py | 35 --------------------------- 1 file changed, 35 deletions(-) diff --git a/http2_general/test_h2_block_action.py b/http2_general/test_h2_block_action.py index 499599292..478ef3e47 100644 --- a/http2_general/test_h2_block_action.py +++ b/http2_general/test_h2_block_action.py @@ -227,38 +227,3 @@ def test_block_action_error_drop(self): self.assertTrue(client.wait_for_connection_close()) self.assertIsNone(client.last_response) self.check_rst_no_fin_in_sniffer(sniffer) - - -class TestDroppingConnection(H2Base): - def test(self): - self.start_all_services() - client = self.get_client("deproxy") - client.auto_flow_control = False - client.update_initial_settings(initial_window_size=0) - client.send_bytes(client.h2_connection.data_to_send()) - client.wait_for_ack_settings() - - response_body = "x" * 200000 - - server = self.get_server("deproxy") - server.set_response( - "HTTP/1.1 200 OK\r\n" - + "Server: Debian\r\n" - + f"Date: {HttpMessage.date_time_string()}\r\n" - + f"Content-Length: {len(response_body)}\r\n\r\n" - + response_body - ) - client.last_response_buffer = bytes() # clearing the buffer after exchanging settings - - client.make_request(self.get_request) - self.assertTrue(client.wait_for_headers_frame(stream_id=1)) - client.increment_flow_control_window(stream_id=1, flow_controlled_length=1000) - - self.assertTrue( - util.wait_until(lambda: not (b"x" * 1000) in client.last_response_buffer), - "Tempesta did not send first DATA frame after receiving WindowUpdate frame.", - ) - client.stop() - print(util.get_used_memory()) - time.sleep(5) - print(util.get_used_memory())