From 7d38a392fa81c500af267369754610887d8c7e3f Mon Sep 17 00:00:00 2001 From: Yilei Yang Date: Tue, 24 Feb 2026 00:21:27 +0000 Subject: [PATCH 01/15] Build Python 3.14 free-threaded wheels. --- .github/workflows/ci.yml | 8 ++++++++ setup.py | 14 ++++++++++++-- source/module.c | 4 ++++ 3 files changed, 24 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9e319e063..e1f12cb05 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -56,6 +56,8 @@ jobs: - cp311-cp311 - cp312-cp312 - cp313-cp313 + - cp314-cp314 + - cp314-cp314t permissions: id-token: write # This is required for requesting the JWT steps: @@ -81,6 +83,8 @@ jobs: - cp311-cp311 - cp312-cp312 - cp313-cp313 + - cp314-cp314 + - cp314-cp314t permissions: id-token: write # This is required for requesting the JWT steps: @@ -108,6 +112,8 @@ jobs: - cp311-cp311 - cp312-cp312 - cp313-cp313 + - cp314-cp314 + - cp314-cp314t permissions: id-token: write # This is required for requesting the JWT steps: @@ -133,6 +139,8 @@ jobs: - cp311-cp311 - cp312-cp312 - cp313-cp313 + - cp314-cp314 + - cp314-cp314t permissions: id-token: write # This is required for requesting the JWT steps: diff --git a/setup.py b/setup.py index 68e0f617b..ae00a7829 100644 --- a/setup.py +++ b/setup.py @@ -25,6 +25,9 @@ # sysconfig.get_config_var('MACOSX_DEPLOYMENT_TARGET'). MACOS_DEPLOYMENT_TARGET_MIN = "10.15" +# True if this is a free-threaded Python build. +FREE_THREADED_BUILD = sysconfig.get_config_var("Py_GIL_DISABLED") + # This is the minimum version of the Windows SDK needed for schannel.h with SCH_CREDENTIALS and # TLS_PARAMETERS. These are required to build Windows Binaries with TLS 1.3 support. WINDOWS_SDK_VERSION_TLS1_3_SUPPORT = "10.0.17763.0" @@ -428,7 +431,10 @@ class bdist_wheel_abi3(bdist_wheel): def get_tag(self): python, abi, plat = super().get_tag() # on CPython, our wheels are abi3 and compatible back to 3.11 - if python.startswith("cp") and sys.version_info >= (3, 13): + # but free-threaded builds don't use limited API, so skip abi3 tag + if FREE_THREADED_BUILD: + return python, abi, plat + elif python.startswith("cp") and sys.version_info >= (3, 13): # 3.13 deprecates PyWeakref_GetObject(), adds alternative return "cp313", "abi3", plat elif python.startswith("cp") and sys.version_info >= (3, 11): @@ -532,7 +538,11 @@ def awscrt_ext(): extra_link_args += ['-Wl,--fatal-warnings'] # prefer building with stable ABI, so a wheel can work with multiple major versions - if sys.version_info >= (3, 13): + if FREE_THREADED_BUILD and sys.version_info[:2] == (3, 14): + # 3.14 free threaded (aka no gil) does not support limited api. + # disable it for now. 3.15 promises to support limited api + free threading combo + py_limited_api = False + elif sys.version_info >= (3, 13): # 3.13 deprecates PyWeakref_GetObject(), adds alternative define_macros.append(('Py_LIMITED_API', '0x030D0000')) py_limited_api = True diff --git a/source/module.c b/source/module.c index dd346747e..2c61da64b 100644 --- a/source/module.c +++ b/source/module.c @@ -1013,6 +1013,10 @@ PyMODINIT_FUNC PyInit__awscrt(void) { return NULL; } +#ifdef Py_GIL_DISABLED + PyUnstable_Module_SetGIL(m, Py_MOD_GIL_NOT_USED); +#endif + s_init_allocator(); /* Don't report this memory when dumping possible leaks. */ From 73e606396c76ea83278dbb14aa376c8b60409f73 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Mon, 23 Mar 2026 16:21:11 -0700 Subject: [PATCH 02/15] let's make sure it built with 13t as well --- .github/workflows/ci.yml | 4 ++++ setup.py | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e1f12cb05..51844a547 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -56,6 +56,7 @@ jobs: - cp311-cp311 - cp312-cp312 - cp313-cp313 + - cp313-cp313t - cp314-cp314 - cp314-cp314t permissions: @@ -83,6 +84,7 @@ jobs: - cp311-cp311 - cp312-cp312 - cp313-cp313 + - cp313-cp313t - cp314-cp314 - cp314-cp314t permissions: @@ -112,6 +114,7 @@ jobs: - cp311-cp311 - cp312-cp312 - cp313-cp313 + - cp313-cp313t - cp314-cp314 - cp314-cp314t permissions: @@ -139,6 +142,7 @@ jobs: - cp311-cp311 - cp312-cp312 - cp313-cp313 + - cp313-cp313t - cp314-cp314 - cp314-cp314t permissions: diff --git a/setup.py b/setup.py index ae00a7829..c271c1da1 100644 --- a/setup.py +++ b/setup.py @@ -431,8 +431,8 @@ class bdist_wheel_abi3(bdist_wheel): def get_tag(self): python, abi, plat = super().get_tag() # on CPython, our wheels are abi3 and compatible back to 3.11 - # but free-threaded builds don't use limited API, so skip abi3 tag if FREE_THREADED_BUILD: + # free-threaded builds don't use limited API, so skip abi3 tag return python, abi, plat elif python.startswith("cp") and sys.version_info >= (3, 13): # 3.13 deprecates PyWeakref_GetObject(), adds alternative @@ -538,7 +538,7 @@ def awscrt_ext(): extra_link_args += ['-Wl,--fatal-warnings'] # prefer building with stable ABI, so a wheel can work with multiple major versions - if FREE_THREADED_BUILD and sys.version_info[:2] == (3, 14): + if FREE_THREADED_BUILD and sys.version_info[:2] <= (3, 14): # 3.14 free threaded (aka no gil) does not support limited api. # disable it for now. 3.15 promises to support limited api + free threading combo py_limited_api = False From 5fe1d5f423b413aa7594e9e4451a832ae51040ce Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Tue, 24 Mar 2026 10:40:40 -0700 Subject: [PATCH 03/15] explicitly check for 1 or not --- setup.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index c271c1da1..756290d47 100644 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ MACOS_DEPLOYMENT_TARGET_MIN = "10.15" # True if this is a free-threaded Python build. -FREE_THREADED_BUILD = sysconfig.get_config_var("Py_GIL_DISABLED") +FREE_THREADED_BUILD = sysconfig.get_config_var("Py_GIL_DISABLED") == 1 # This is the minimum version of the Windows SDK needed for schannel.h with SCH_CREDENTIALS and # TLS_PARAMETERS. These are required to build Windows Binaries with TLS 1.3 support. @@ -431,8 +431,8 @@ class bdist_wheel_abi3(bdist_wheel): def get_tag(self): python, abi, plat = super().get_tag() # on CPython, our wheels are abi3 and compatible back to 3.11 + # but free-threaded builds don't use limited API, so skip abi3 tag if FREE_THREADED_BUILD: - # free-threaded builds don't use limited API, so skip abi3 tag return python, abi, plat elif python.startswith("cp") and sys.version_info >= (3, 13): # 3.13 deprecates PyWeakref_GetObject(), adds alternative @@ -538,7 +538,7 @@ def awscrt_ext(): extra_link_args += ['-Wl,--fatal-warnings'] # prefer building with stable ABI, so a wheel can work with multiple major versions - if FREE_THREADED_BUILD and sys.version_info[:2] <= (3, 14): + if FREE_THREADED_BUILD and sys.version_info[:2] == (3, 14): # 3.14 free threaded (aka no gil) does not support limited api. # disable it for now. 3.15 promises to support limited api + free threading combo py_limited_api = False From 9b5010bd55cef9814b9a53c456c66f574dd4b2c9 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Tue, 24 Mar 2026 10:49:44 -0700 Subject: [PATCH 04/15] Revert "explicitly check for 1 or not" This reverts commit 5fe1d5f423b413aa7594e9e4451a832ae51040ce. --- setup.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/setup.py b/setup.py index 756290d47..c271c1da1 100644 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ MACOS_DEPLOYMENT_TARGET_MIN = "10.15" # True if this is a free-threaded Python build. -FREE_THREADED_BUILD = sysconfig.get_config_var("Py_GIL_DISABLED") == 1 +FREE_THREADED_BUILD = sysconfig.get_config_var("Py_GIL_DISABLED") # This is the minimum version of the Windows SDK needed for schannel.h with SCH_CREDENTIALS and # TLS_PARAMETERS. These are required to build Windows Binaries with TLS 1.3 support. @@ -431,8 +431,8 @@ class bdist_wheel_abi3(bdist_wheel): def get_tag(self): python, abi, plat = super().get_tag() # on CPython, our wheels are abi3 and compatible back to 3.11 - # but free-threaded builds don't use limited API, so skip abi3 tag if FREE_THREADED_BUILD: + # free-threaded builds don't use limited API, so skip abi3 tag return python, abi, plat elif python.startswith("cp") and sys.version_info >= (3, 13): # 3.13 deprecates PyWeakref_GetObject(), adds alternative @@ -538,7 +538,7 @@ def awscrt_ext(): extra_link_args += ['-Wl,--fatal-warnings'] # prefer building with stable ABI, so a wheel can work with multiple major versions - if FREE_THREADED_BUILD and sys.version_info[:2] == (3, 14): + if FREE_THREADED_BUILD and sys.version_info[:2] <= (3, 14): # 3.14 free threaded (aka no gil) does not support limited api. # disable it for now. 3.15 promises to support limited api + free threading combo py_limited_api = False From 3889cc31cc8e3d1ec6bd8db0c3f3db5bc16ece89 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Tue, 24 Mar 2026 10:50:11 -0700 Subject: [PATCH 05/15] check for 1 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index c271c1da1..c71b7db08 100644 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ MACOS_DEPLOYMENT_TARGET_MIN = "10.15" # True if this is a free-threaded Python build. -FREE_THREADED_BUILD = sysconfig.get_config_var("Py_GIL_DISABLED") +FREE_THREADED_BUILD = sysconfig.get_config_var("Py_GIL_DISABLED") == 1 # This is the minimum version of the Windows SDK needed for schannel.h with SCH_CREDENTIALS and # TLS_PARAMETERS. These are required to build Windows Binaries with TLS 1.3 support. From 5d186665ec2a19e63df9dea625373ddfd9e39eb5 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Tue, 24 Mar 2026 11:49:04 -0700 Subject: [PATCH 06/15] fixing the tests --- test/test_aiohttp_client.py | 295 +++++++++++++++++++--------- test/test_http_client.py | 380 ++++++++++++++++++++++-------------- 2 files changed, 431 insertions(+), 244 deletions(-) diff --git a/test/test_aiohttp_client.py b/test/test_aiohttp_client.py index 757d2dde5..efe31f68f 100644 --- a/test/test_aiohttp_client.py +++ b/test/test_aiohttp_client.py @@ -62,11 +62,13 @@ def do_PUT(self): self.end_headers() -class TestAsyncClient(NativeResourceTest): +class AsyncLocalServerTestBase(NativeResourceTest): + """Base class for async tests that use local HTTP/1.x server""" hostname = 'localhost' timeout = 5 # seconds def _start_server(self, secure, http_1_0=False): + """Start local HTTP server""" # HTTP/1.0 closes the connection at the end of each request # HTTP/1.1 will keep the connection alive if http_1_0: @@ -89,10 +91,14 @@ def _start_server(self, secure, http_1_0=False): self.server_thread.start() def _stop_server(self): + """Stop local HTTP server""" self.server.shutdown() self.server.server_close() self.server_thread.join() + +class TestAsyncClient(AsyncLocalServerTestBase): + async def _new_client_connection(self, secure, proxy_options=None): if secure: tls_ctx_opt = TlsContextOptions() @@ -473,8 +479,8 @@ def test_cross_thread_http2_client(self): asyncio.run(self._test_cross_thread_http2_client()) -@unittest.skipUnless(os.environ.get('AWS_TEST_LOCALHOST'), 'set env var to run test: AWS_TEST_LOCALHOST') -class TestAsyncClientMockServer(NativeResourceTest): +class AsyncMockServerTestBase(NativeResourceTest): + """Base class for async tests that use the H2 mock server""" timeout = 5 # seconds p_server = None mock_server_url = None @@ -499,7 +505,6 @@ def setUp(self): def _wait_for_server_ready(self): """Wait until server is accepting connections.""" max_attempts = 20 - for attempt in range(max_attempts): try: with socket.create_connection(("127.0.0.1", self.mock_server_url.port), timeout=1): @@ -520,6 +525,47 @@ def tearDown(self): self.p_server.kill() super().tearDown() + async def _new_mock_h2_connection( + self, + manual_window_management=False, + initial_window_size=None, + initial_settings=None): + """Create HTTP/2 async client connection to local mock server""" + event_loop_group = EventLoopGroup() + host_resolver = DefaultHostResolver(event_loop_group) + bootstrap = ClientBootstrap(event_loop_group, host_resolver) + + port = self.mock_server_url.port + if port is None: + port = 443 + + tls_ctx_options = TlsContextOptions() + tls_ctx_options.verify_peer = False + tls_ctx = ClientTlsContext(tls_ctx_options) + tls_conn_opt = tls_ctx.new_connection_options() + tls_conn_opt.set_server_name(self.mock_server_url.hostname) + tls_conn_opt.set_alpn_list(["h2"]) + + if initial_settings is None: + initial_settings = [Http2Setting(Http2SettingID.ENABLE_PUSH, 0)] + + kwargs = { + 'host_name': self.mock_server_url.hostname, + 'port': port, + 'bootstrap': bootstrap, + 'tls_connection_options': tls_conn_opt, + 'initial_settings': initial_settings, + 'manual_window_management': manual_window_management + } + if initial_window_size is not None: + kwargs['initial_window_size'] = initial_window_size + + return await AIOHttp2ClientConnection.new(**kwargs) + + +@unittest.skipUnless(os.environ.get('AWS_TEST_LOCALHOST'), 'set env var to run test: AWS_TEST_LOCALHOST') +class TestAsyncClientMockServer(AsyncMockServerTestBase): + def _on_remote_settings_changed(self, settings): # The mock server has the default settings with # ENABLE_PUSH = 0 @@ -659,91 +705,159 @@ def test_h2_mock_server_settings(self): asyncio.run(self._test_h2_mock_server_settings()) -class AIOFlowControlTest(NativeResourceTest): +class AIOFlowControlTest(AsyncLocalServerTestBase): + """HTTP/1.1 async flow control tests using local server""" timeout = 10.0 + async def _new_h1_client_connection( + self, + secure, + manual_window_management=False, + initial_window_size=None, + read_buffer_capacity=None): + """Create HTTP/1.1 async client connection to local server""" + if secure: + tls_ctx_opt = TlsContextOptions() + tls_ctx_opt.verify_peer = False + tls_ctx = ClientTlsContext(tls_ctx_opt) + tls_conn_opt = tls_ctx.new_connection_options() + tls_conn_opt.set_server_name(self.hostname) + else: + tls_conn_opt = None + + event_loop_group = EventLoopGroup() + host_resolver = DefaultHostResolver(event_loop_group) + bootstrap = ClientBootstrap(event_loop_group, host_resolver) + + kwargs = { + 'host_name': self.hostname, + 'port': self.port, + 'bootstrap': bootstrap, + 'tls_connection_options': tls_conn_opt, + 'manual_window_management': manual_window_management + } + if initial_window_size is not None: + kwargs['initial_window_size'] = initial_window_size + if read_buffer_capacity is not None: + kwargs['read_buffer_capacity'] = read_buffer_capacity + + return await AIOHttpClientConnection.new(**kwargs) + async def _test_h1_manual_window_management_happy_path(self): """Test HTTP/1.1 manual window management happy path""" - tls_ctx_opt = TlsContextOptions() - tls_ctx_opt.verify_peer = False - tls_ctx_opt.alpn_list = ['http/1.1'] - tls_ctx = ClientTlsContext(tls_ctx_opt) - tls_options = tls_ctx.new_connection_options() - tls_options.set_server_name("httpbin.org") - - connection = await AIOHttpClientConnection.new( - host_name="httpbin.org", - port=443, - tls_connection_options=tls_options, - manual_window_management=True, - initial_window_size=5, - read_buffer_capacity=1000 - ) + self._start_server(secure=True) + try: + connection = await self._new_h1_client_connection( + secure=True, + manual_window_management=True, + initial_window_size=5, + read_buffer_capacity=1000 + ) - request = HttpRequest('GET', '/bytes/10') - request.headers.add('host', 'httpbin.org') - stream = connection.request(request) + # Create a file with known size for testing + test_data = b'0123456789' # 10 bytes + test_file_path = 'test_aio_flow_control_data.txt' + with open(test_file_path, 'wb') as f: + f.write(test_data) - response = Response() - status_code = await response.collect_response(stream) + try: + request = HttpRequest('GET', '/' + test_file_path) + request.headers.add('host', self.hostname) + stream = connection.request(request) - self.assertEqual(200, status_code) - self.assertEqual(10, len(response.body)) - await connection.close() + chunks_received = [] + body = bytearray() + + await stream.get_response_status_code() + while True: + chunk = await stream.get_next_response_chunk() + if not chunk: + break + chunks_received.append(len(chunk)) + body.extend(chunk) + stream.update_window(len(chunk)) + + self.assertEqual(test_data, bytes(body)) + self.assertGreater(len(chunks_received), 0, "No data chunks received") + + await connection.close() + finally: + # Clean up test file + if os.path.exists(test_file_path): + os.remove(test_file_path) + finally: + self._stop_server() def test_h1_manual_window_management_happy_path(self): asyncio.run(self._test_h1_manual_window_management_happy_path()) - async def _test_h2_manual_window_management_happy_path(self): - """Test HTTP/2 manual window management happy path""" - tls_ctx_opt = TlsContextOptions() - tls_ctx_opt.verify_peer = False - tls_ctx_opt.alpn_list = ['h2'] - tls_ctx = ClientTlsContext(tls_ctx_opt) - tls_options = tls_ctx.new_connection_options() - tls_options.set_server_name("httpbin.org") + async def _test_h1_stream_flow_control_blocks_and_resumes(self): + """Test that HTTP/1.1 stream flow control actually blocks and resumes""" + self._start_server(secure=True) + try: + connection = await self._new_h1_client_connection( + secure=True, + manual_window_management=True, + initial_window_size=1, + read_buffer_capacity=1000 + ) - connection = await AIOHttp2ClientConnection.new( - host_name="httpbin.org", - port=443, - tls_connection_options=tls_options, - manual_window_management=True, - initial_window_size=65536 - ) + # Create a file with 100 bytes for testing + test_data = bytes(range(100)) + test_file_path = 'test_aio_flow_control_100.txt' + with open(test_file_path, 'wb') as f: + f.write(test_data) - request = HttpRequest('GET', '/get') - request.headers.add('host', 'httpbin.org') - stream = connection.request(request) + try: + request = HttpRequest('GET', '/' + test_file_path) + request.headers.add('host', self.hostname) + stream = connection.request(request) - response = Response() - status_code = await response.collect_response(stream) + chunks_received = [] + body = bytearray() + + await stream.get_response_status_code() + while True: + chunk = await stream.get_next_response_chunk() + if not chunk: + break + chunks_received.append(len(chunk)) + body.extend(chunk) + stream.update_window(len(chunk)) + + self.assertEqual(test_data, bytes(body)) + # With window=1, we should receive many small chunks + self.assertGreater(len(chunks_received), 1, "Should receive multiple chunks with tiny window") + + await connection.close() + finally: + # Clean up test file + if os.path.exists(test_file_path): + os.remove(test_file_path) + finally: + self._stop_server() - self.assertEqual(200, status_code) - self.assertGreater(len(response.body), 0, "No data received") - await connection.close() + def test_h1_stream_flow_control_blocks_and_resumes(self): + asyncio.run(self._test_h1_stream_flow_control_blocks_and_resumes()) - def test_h2_manual_window_management_happy_path(self): - asyncio.run(self._test_h2_manual_window_management_happy_path()) - async def _test_h2_stream_flow_control_blocks_and_resumes(self): - """Test that stream flow control actually blocks and resumes""" - tls_ctx_opt = TlsContextOptions() - tls_ctx_opt.verify_peer = False - tls_ctx_opt.alpn_list = ['h2'] - tls_ctx = ClientTlsContext(tls_ctx_opt) - tls_options = tls_ctx.new_connection_options() - tls_options.set_server_name("httpbin.org") +@unittest.skipUnless(os.environ.get('AWS_TEST_LOCALHOST'), 'set env var to run test: AWS_TEST_LOCALHOST') +class AIOFlowControlH2Test(AsyncMockServerTestBase): + """HTTP/2 async flow control tests using local mock server""" + timeout = 10.0 - connection = await AIOHttp2ClientConnection.new( - host_name="httpbin.org", - port=443, - tls_connection_options=tls_options, + async def _test_h2_manual_window_management_happy_path(self): + """Test HTTP/2 manual window management happy path""" + connection = await self._new_mock_h2_connection( manual_window_management=True, - initial_window_size=10 # Tiny window + initial_window_size=65536 ) - request = HttpRequest('GET', '/bytes/100') - request.headers.add('host', 'httpbin.org') + # GET request with x-repeat-data header to download data + request = HttpRequest('GET', self.mock_server_url.path) + request.headers.add('host', self.mock_server_url.hostname) + request.headers.add('x-repeat-data', '100') # Request 100 bytes of data + stream = connection.request(request) chunks_received = [] @@ -758,33 +872,26 @@ async def _test_h2_stream_flow_control_blocks_and_resumes(self): body.extend(chunk) stream.update_window(len(chunk)) - self.assertEqual(100, len(body)) - self.assertEqual(len(chunks_received), 10, "Should receive exactly 10 chunks") + self.assertGreater(len(body), 0, "No response body received") + self.assertGreater(len(chunks_received), 0, "No data chunks received") + await connection.close() - def test_h2_stream_flow_control_blocks_and_resumes(self): - asyncio.run(self._test_h2_stream_flow_control_blocks_and_resumes()) + def test_h2_manual_window_management_happy_path(self): + asyncio.run(self._test_h2_manual_window_management_happy_path()) - async def _test_h1_stream_flow_control_blocks_and_resumes(self): - """Test that HTTP/1.1 stream flow control actually blocks and resumes""" - tls_ctx_opt = TlsContextOptions() - tls_ctx_opt.verify_peer = False - tls_ctx_opt.alpn_list = ['http/1.1'] - tls_ctx = ClientTlsContext(tls_ctx_opt) - tls_options = tls_ctx.new_connection_options() - tls_options.set_server_name("httpbin.org") - - connection = await AIOHttpClientConnection.new( - host_name="httpbin.org", - port=443, - tls_connection_options=tls_options, + async def _test_h2_stream_flow_control_blocks_and_resumes(self): + """Test that HTTP/2 stream flow control actually blocks and resumes""" + connection = await self._new_mock_h2_connection( manual_window_management=True, - initial_window_size=1, # Tiny window - read_buffer_capacity=1000 + initial_window_size=10 # Small window to force multiple chunks ) - request = HttpRequest('GET', '/bytes/100') - request.headers.add('host', 'httpbin.org') + # GET request with x-repeat-data header to download data + request = HttpRequest('GET', self.mock_server_url.path) + request.headers.add('host', self.mock_server_url.hostname) + request.headers.add('x-repeat-data', '100') # Request 100 bytes of data + stream = connection.request(request) chunks_received = [] @@ -799,12 +906,14 @@ async def _test_h1_stream_flow_control_blocks_and_resumes(self): body.extend(chunk) stream.update_window(len(chunk)) - self.assertEqual(100, len(body)) - self.assertEqual(len(chunks_received), 100, "Should receive exactly 100 chunks") + self.assertGreater(len(body), 0, "No response body received") + # With small window, we should receive multiple chunks + self.assertGreater(len(chunks_received), 1, "Should receive multiple chunks with small window") + await connection.close() - def test_h1_stream_flow_control_blocks_and_resumes(self): - asyncio.run(self._test_h1_stream_flow_control_blocks_and_resumes()) + def test_h2_stream_flow_control_blocks_and_resumes(self): + asyncio.run(self._test_h2_stream_flow_control_blocks_and_resumes()) if __name__ == '__main__': diff --git a/test/test_http_client.py b/test/test_http_client.py index a8c217698..e0bbbb941 100644 --- a/test/test_http_client.py +++ b/test/test_http_client.py @@ -48,11 +48,13 @@ def do_PUT(self): self.end_headers() -class TestClient(NativeResourceTest): +class LocalServerTestBase(NativeResourceTest): + """Base class for tests that use local HTTP/1.x server""" hostname = 'localhost' timeout = 5 # seconds def _start_server(self, secure, http_1_0=False): + """Start local HTTP server""" # HTTP/1.0 closes the connection at the end of each request # HTTP/1.1 will keep the connection alive if http_1_0: @@ -74,10 +76,14 @@ def _start_server(self, secure, http_1_0=False): self.server_thread.start() def _stop_server(self): + """Stop local HTTP server""" self.server.shutdown() self.server.server_close() self.server_thread.join() + +class TestClient(LocalServerTestBase): + def _new_client_connection(self, secure, proxy_options=None, cipher_pref=TlsCipherPref.DEFAULT): if secure: tls_ctx_opt = TlsContextOptions() @@ -402,16 +408,15 @@ def test_connect_pq_default(self): self._test_connect(secure=True, cipher_pref=TlsCipherPref.PQ_DEFAULT) -@unittest.skipUnless(os.environ.get('AWS_TEST_LOCALHOST'), 'set env var to run test: AWS_TEST_LOCALHOST') -class TestClientMockServer(NativeResourceTest): - +class MockServerTestBase(NativeResourceTest): + """Base class for tests that use the H2 mock server""" timeout = 5 # seconds p_server = None mock_server_url = None def setUp(self): super().setUp() - # Start the mock server from the aws-c-http. + # Start the mock server from the aws-c-http server_path = os.path.join( os.path.dirname(__file__), '..', @@ -429,7 +434,6 @@ def setUp(self): def _wait_for_server_ready(self): """Wait until server is accepting connections.""" max_attempts = 20 - for attempt in range(max_attempts): try: with socket.create_connection(("127.0.0.1", self.mock_server_url.port), timeout=1): @@ -450,13 +454,49 @@ def tearDown(self): self.p_server.kill() super().tearDown() + def _new_mock_h2_connection(self, manual_window_management=False, initial_window_size=None, initial_settings=None): + """Create HTTP/2 client connection to local mock server""" + event_loop_group = EventLoopGroup() + host_resolver = DefaultHostResolver(event_loop_group) + bootstrap = ClientBootstrap(event_loop_group, host_resolver) + + port = self.mock_server_url.port + if port is None: + port = 443 + + tls_ctx_options = TlsContextOptions() + tls_ctx_options.verify_peer = False + tls_ctx = ClientTlsContext(tls_ctx_options) + tls_conn_opt = tls_ctx.new_connection_options() + tls_conn_opt.set_server_name(self.mock_server_url.hostname) + tls_conn_opt.set_alpn_list(["h2"]) + + if initial_settings is None: + initial_settings = [Http2Setting(Http2SettingID.ENABLE_PUSH, 0)] + + kwargs = { + 'host_name': self.mock_server_url.hostname, + 'port': port, + 'bootstrap': bootstrap, + 'tls_connection_options': tls_conn_opt, + 'initial_settings': initial_settings, + 'manual_window_management': manual_window_management + } + if initial_window_size is not None: + kwargs['initial_window_size'] = initial_window_size + + connection_future = Http2ClientConnection.new(**kwargs) + return connection_future.result(self.timeout) + + +@unittest.skipUnless(os.environ.get('AWS_TEST_LOCALHOST'), 'set env var to run test: AWS_TEST_LOCALHOST') +class TestClientMockServer(MockServerTestBase): + def _on_remote_settings_changed(self, settings): # The mock server has the default settings with # ENABLE_PUSH = 0 # MAX_CONCURRENT_STREAMS = 100 # MAX_HEADER_LIST_SIZE = 2**16 - # using h2@4.1.0, code can be found in - # https://github.com/python-hyper/h2/blob/191ac06e0949fcfe3367b06eeb101a5a1a335964/src/h2/connection.py#L340-L359 # Check the settings here self.assertEqual(len(settings), 3) for i in settings: @@ -468,31 +508,32 @@ def _on_remote_settings_changed(self, settings): self.assertEqual(i.value, 2**16) def _new_mock_connection(self, initial_settings=None): + """Create connection with remote settings callback""" + kwargs = {'initial_settings': initial_settings} + connection_future = Http2ClientConnection.new( + host_name=self.mock_server_url.hostname, + port=self.mock_server_url.port if self.mock_server_url.port else 443, + bootstrap=self._create_client_bootstrap(), + tls_connection_options=self._create_tls_connection_options(), + initial_settings=initial_settings if initial_settings else [Http2Setting(Http2SettingID.ENABLE_PUSH, 0)], + on_remote_settings_changed=self._on_remote_settings_changed) + return connection_future.result(self.timeout) + def _create_client_bootstrap(self): + """Create client bootstrap""" event_loop_group = EventLoopGroup() host_resolver = DefaultHostResolver(event_loop_group) - bootstrap = ClientBootstrap(event_loop_group, host_resolver) + return ClientBootstrap(event_loop_group, host_resolver) - port = self.mock_server_url.port - # only test https - if port is None: - port = 443 + def _create_tls_connection_options(self): + """Create TLS connection options for mock server""" tls_ctx_options = TlsContextOptions() - tls_ctx_options.verify_peer = False # allow localhost + tls_ctx_options.verify_peer = False tls_ctx = ClientTlsContext(tls_ctx_options) tls_conn_opt = tls_ctx.new_connection_options() tls_conn_opt.set_server_name(self.mock_server_url.hostname) tls_conn_opt.set_alpn_list(["h2"]) - if initial_settings is None: - initial_settings = [Http2Setting(Http2SettingID.ENABLE_PUSH, 0)] - - connection_future = Http2ClientConnection.new(host_name=self.mock_server_url.hostname, - port=port, - bootstrap=bootstrap, - tls_connection_options=tls_conn_opt, - initial_settings=initial_settings, - on_remote_settings_changed=self._on_remote_settings_changed) - return connection_future.result(self.timeout) + return tls_conn_opt def test_h2_mock_server_manual_write(self): connection = self._new_mock_connection() @@ -652,177 +693,214 @@ def on_body(self, http_stream, chunk, **kwargs): self.body.extend(chunk) -class FlowControlTest(NativeResourceTest): +class FlowControlTest(LocalServerTestBase): + """HTTP/1.1 flow control tests using local server""" timeout = 10.0 - def setUp(self): - super().setUp() - tls_ctx_opt = TlsContextOptions() - tls_ctx_opt.verify_peer = False - tls_ctx_opt.alpn_list = ['h2', 'http/1.1'] - tls_ctx = ClientTlsContext(tls_ctx_opt) - self.tls_options = tls_ctx.new_connection_options() - self.tls_options.set_server_name("httpbin.org") + def _new_h1_client_connection( + self, + secure, + manual_window_management=False, + initial_window_size=None, + read_buffer_capacity=None): + """Create HTTP/1.1 client connection to local server""" + if secure: + tls_ctx_opt = TlsContextOptions() + tls_ctx_opt.verify_peer = False + tls_ctx = ClientTlsContext(tls_ctx_opt) + tls_conn_opt = tls_ctx.new_connection_options() + tls_conn_opt.set_server_name(self.hostname) + else: + tls_conn_opt = None + + event_loop_group = EventLoopGroup() + host_resolver = DefaultHostResolver(event_loop_group) + bootstrap = ClientBootstrap(event_loop_group, host_resolver) + + kwargs = { + 'host_name': self.hostname, + 'port': self.port, + 'bootstrap': bootstrap, + 'tls_connection_options': tls_conn_opt, + 'manual_window_management': manual_window_management + } + if initial_window_size is not None: + kwargs['initial_window_size'] = initial_window_size + if read_buffer_capacity is not None: + kwargs['read_buffer_capacity'] = read_buffer_capacity + + connection_future = HttpClientConnection.new(**kwargs) + return connection_future.result(self.timeout) def test_h1_manual_window_management_happy_path(self): """Test HTTP/1.1 manual window management happy path""" - connection_future = HttpClientConnection.new( - host_name="httpbin.org", - port=443, - tls_connection_options=self.tls_options, - manual_window_management=True, - initial_window_size=5, - read_buffer_capacity=1000 - ) - + self._start_server(secure=True) try: - connection = connection_future.result(timeout=self.timeout) - request = HttpRequest('GET', '/bytes/10') - request.headers.add('host', 'httpbin.org') + connection = self._new_h1_client_connection( + secure=True, + manual_window_management=True, + initial_window_size=5, + read_buffer_capacity=1000 + ) + + # Create a file with known size for testing + test_data = b'0123456789' # 10 bytes + test_file_path = 'test_flow_control_data.txt' + with open(test_file_path, 'wb') as f: + f.write(test_data) - response = Response() - received_chunks = [] - window_updates_sent = [] + try: + request = HttpRequest('GET', '/' + test_file_path) + request.headers.add('host', self.hostname) - def on_body_with_window_update(http_stream, chunk, **kwargs): - received_chunks.append(len(chunk)) - response.body.extend(chunk) - if hasattr(http_stream, '_binding') and http_stream._binding: + response = Response() + received_chunks = [] + window_updates_sent = [] + + def on_body_with_window_update(http_stream, chunk, **kwargs): + received_chunks.append(len(chunk)) + response.body.extend(chunk) http_stream.update_window(len(chunk)) window_updates_sent.append(len(chunk)) - stream = connection.request(request, response.on_response, on_body_with_window_update) - stream.activate() - stream_completion_result = stream.completion_future.result(timeout=self.timeout) - - self.assertEqual(200, response.status_code) - self.assertEqual(200, stream_completion_result) - self.assertEqual(10, len(response.body)) + stream = connection.request(request, response.on_response, on_body_with_window_update) + stream.activate() + stream_completion_result = stream.completion_future.result(timeout=self.timeout) - if len(response.body) > 0: + self.assertEqual(200, response.status_code) + self.assertEqual(200, stream_completion_result) + self.assertEqual(test_data, bytes(response.body)) self.assertGreater(len(received_chunks), 0, "No data chunks received") self.assertGreater(len(window_updates_sent), 0, "No window updates sent") self.assertEqual(sum(received_chunks), sum(window_updates_sent), "Window updates don't match received data") - connection.close() - except Exception as e: - self.skipTest(f"HTTP/1.1 flow control test skipped due to connection issue: {e}") - - def test_h2_manual_window_management_happy_path(self): - """Test HTTP/2 manual window management happy path""" - connection_future = Http2ClientConnection.new( - host_name="nghttp2.org", - port=443, - tls_connection_options=self.tls_options, - manual_window_management=True, - initial_window_size=65536 - ) + connection.close() + finally: + # Clean up test file + if os.path.exists(test_file_path): + os.remove(test_file_path) + finally: + self._stop_server() + def test_h1_stream_flow_control_blocks_and_resumes(self): + """Test that HTTP/1.1 stream flow control actually blocks and resumes""" + self._start_server(secure=True) try: - connection = connection_future.result(timeout=self.timeout) - request = HttpRequest('GET', '/httpbin/get') - request.headers.add('host', 'nghttp2.org') + connection = self._new_h1_client_connection( + secure=True, + manual_window_management=True, + initial_window_size=1, + read_buffer_capacity=1000 + ) + + # Create a file with 100 bytes for testing + test_data = bytes(range(100)) + test_file_path = 'test_flow_control_100.txt' + with open(test_file_path, 'wb') as f: + f.write(test_data) - response = Response() - received_chunks = [] - window_updates_sent = [] + try: + request = HttpRequest('GET', '/' + test_file_path) + request.headers.add('host', self.hostname) + + response = Response() + chunks_received = [] - def on_body_with_window_update(http_stream, chunk, **kwargs): - received_chunks.append(len(chunk)) - response.body.extend(chunk) - if hasattr(http_stream, '_binding') and http_stream._binding: + def on_body(http_stream, chunk, **kwargs): + chunks_received.append(len(chunk)) + response.body.extend(chunk) http_stream.update_window(len(chunk)) - window_updates_sent.append(len(chunk)) - stream = connection.request(request, response.on_response, on_body_with_window_update) - stream.activate() - stream_completion_result = stream.completion_future.result(timeout=self.timeout) + stream = connection.request(request, response.on_response, on_body) + stream.activate() + stream.completion_future.result(timeout=self.timeout) - self.assertEqual(200, response.status_code) - self.assertEqual(200, stream_completion_result) - self.assertGreater(len(received_chunks), 0, "No data chunks received") - self.assertGreater(len(window_updates_sent), 0, "No window updates sent") + self.assertEqual(test_data, bytes(response.body)) + # With window=1, we should receive many small chunks + self.assertGreater(len(chunks_received), 1, "Should receive multiple chunks with tiny window") - connection.close() - except Exception as e: - self.skipTest(f"HTTP/2 flow control test skipped due to connection issue: {e}") + connection.close() + finally: + # Clean up test file + if os.path.exists(test_file_path): + os.remove(test_file_path) + finally: + self._stop_server() - def test_h2_stream_flow_control_blocks_and_resumes(self): - """Test that stream flow control actually blocks and resumes""" - connection_future = Http2ClientConnection.new( - host_name="httpbin.org", - port=443, - tls_connection_options=self.tls_options, + +@unittest.skipUnless(os.environ.get('AWS_TEST_LOCALHOST'), 'set env var to run test: AWS_TEST_LOCALHOST') +class FlowControlH2Test(MockServerTestBase): + """HTTP/2 flow control tests using local mock server""" + timeout = 10.0 + + def test_h2_manual_window_management_happy_path(self): + """Test HTTP/2 manual window management happy path""" + connection = self._new_mock_h2_connection( manual_window_management=True, - initial_window_size=1 # Tiny window - will block immediately + initial_window_size=65536 ) - try: - connection = connection_future.result(timeout=self.timeout) - request = HttpRequest('GET', '/bytes/100') - request.headers.add('host', 'httpbin.org') + # GET request with x-repeat-data header to download data + request = HttpRequest('GET', self.mock_server_url.path) + request.headers.add('host', self.mock_server_url.hostname) + request.headers.add('x-repeat-data', '100') # Request 100 bytes of data - response = Response() - chunks_received = [] + response = Response() + received_chunks = [] + window_updates_sent = [] - def on_body(http_stream, chunk, **kwargs): - chunks_received.append(len(chunk)) - response.body.extend(chunk) - # Update window to allow more data - http_stream.update_window(len(chunk)) + def on_body_with_window_update(http_stream, chunk, **kwargs): + received_chunks.append(len(chunk)) + response.body.extend(chunk) + http_stream.update_window(len(chunk)) + window_updates_sent.append(len(chunk)) - stream = connection.request(request, response.on_response, on_body) - stream.activate() - stream.completion_future.result(timeout=self.timeout) + stream = connection.request(request, response.on_response, on_body_with_window_update) + stream.activate() - self.assertEqual(100, len(response.body)) - # With window=10, we should receive many small chunks - self.assertEqual(len(chunks_received), 100, "Expected multiple chunks with tiny window") + stream_completion_result = stream.completion_future.result(timeout=self.timeout) - connection.close() - except Exception as e: - self.skipTest(f"HTTP/2 flow control test skipped: {e}") + self.assertEqual(200, response.status_code) + self.assertEqual(200, stream_completion_result) + self.assertGreater(len(response.body), 0, "No response body received") + self.assertGreater(len(received_chunks), 0, "No data chunks received") + self.assertGreater(len(window_updates_sent), 0, "No window updates sent") - def test_h1_stream_flow_control_blocks_and_resumes(self): - """Test that HTTP/1.1 stream flow control actually blocks and resumes""" - connection_future = HttpClientConnection.new( - host_name="httpbin.org", - port=443, - tls_connection_options=self.tls_options, + connection.close() + + def test_h2_stream_flow_control_blocks_and_resumes(self): + """Test that HTTP/2 stream flow control actually blocks and resumes""" + connection = self._new_mock_h2_connection( manual_window_management=True, - initial_window_size=1, # Tiny window - read_buffer_capacity=1000 + initial_window_size=10 # Small window to force multiple chunks ) - try: - connection = connection_future.result(timeout=self.timeout) - request = HttpRequest('GET', '/bytes/100') - request.headers.add('host', 'httpbin.org') + # GET request with x-repeat-data header to download data + request = HttpRequest('GET', self.mock_server_url.path) + request.headers.add('host', self.mock_server_url.hostname) + request.headers.add('x-repeat-data', '100') # Request 100 bytes of data - response = Response() - chunks_received = [] + response = Response() + chunks_received = [] - def on_body(http_stream, chunk, **kwargs): - chunks_received.append(len(chunk)) - response.body.extend(chunk) - http_stream.update_window(len(chunk)) + def on_body(http_stream, chunk, **kwargs): + chunks_received.append(len(chunk)) + response.body.extend(chunk) + # Update window to allow more data + http_stream.update_window(len(chunk)) - stream = connection.request(request, response.on_response, on_body) - stream.activate() - stream.completion_future.result(timeout=self.timeout) + stream = connection.request(request, response.on_response, on_body) + stream.activate() - self.assertEqual(100, len(response.body)) - # With window=1, we should receive many small chunks - self.assertEqual(len(chunks_received), 100, "Should receive exactly 100 chunks") + stream.completion_future.result(timeout=self.timeout) - connection.close() - except Exception as e: - self.skipTest(f"HTTP/1.1 flow control test skipped: {e}") + self.assertEqual(200, response.status_code) + self.assertGreater(len(response.body), 0, "No response body received") + # With small window, we should receive multiple chunks + self.assertGreater(len(chunks_received), 1, "Should receive multiple chunks with small window") - def tearDown(self): - self.tls_options = None - super().tearDown() + connection.close() if __name__ == '__main__': From 7b26d91f1162b93b5e131dca1b1d973df67c4b2b Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Tue, 24 Mar 2026 13:13:50 -0700 Subject: [PATCH 07/15] kickoff again From 2cc6f1bd682e3fbabb5cf239d052a016a5471891 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Tue, 24 Mar 2026 13:45:24 -0700 Subject: [PATCH 08/15] i don't know, but that seems to be the only thing different than it's doc --- source/module.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/source/module.c b/source/module.c index 799115979..d22076081 100644 --- a/source/module.c +++ b/source/module.c @@ -619,7 +619,13 @@ PyObject *aws_py_weakref_get_ref(PyObject *ref) { } int aws_py_gilstate_ensure(PyGILState_STATE *out_state) { + /* If Python >= 3.13 */ +#if PY_VERSION_HEX >= 0x030D0000 + // Py_IsFinalizing is part of the Stable ABI since version 3.13. + if (AWS_LIKELY(Py_IsFinalizing())) { +#else if (AWS_LIKELY(Py_IsInitialized())) { +#endif *out_state = PyGILState_Ensure(); return AWS_OP_SUCCESS; } From 1f709e784ff3319500b818bf9a009d76899f859e Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Tue, 24 Mar 2026 14:00:20 -0700 Subject: [PATCH 09/15] eh, if not --- source/module.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/module.c b/source/module.c index d22076081..1691a92a6 100644 --- a/source/module.c +++ b/source/module.c @@ -621,8 +621,10 @@ PyObject *aws_py_weakref_get_ref(PyObject *ref) { int aws_py_gilstate_ensure(PyGILState_STATE *out_state) { /* If Python >= 3.13 */ #if PY_VERSION_HEX >= 0x030D0000 + printf("NEW PYTHON!!!!!\n"); // Py_IsFinalizing is part of the Stable ABI since version 3.13. - if (AWS_LIKELY(Py_IsFinalizing())) { + if (AWS_LIKELY(!Py_IsFinalizing())) { + printf("NEW Py_IsFinalizing!!!!!\n"); #else if (AWS_LIKELY(Py_IsInitialized())) { #endif From f387afd34773e46c95beca1b0fe21a5d5f38b60a Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Tue, 24 Mar 2026 15:05:08 -0700 Subject: [PATCH 10/15] add a lock --- awscrt/aio/http.py | 55 ++++++++++++++++++++++++++++------------------ source/module.c | 2 -- 2 files changed, 34 insertions(+), 23 deletions(-) diff --git a/awscrt/aio/http.py b/awscrt/aio/http.py index 6cf26836e..c267638bb 100644 --- a/awscrt/aio/http.py +++ b/awscrt/aio/http.py @@ -22,6 +22,7 @@ from io import BytesIO from concurrent.futures import Future from typing import List, Tuple, Optional, Callable, AsyncIterator +import threading class AIOHttpClientConnectionUnified(HttpClientConnectionBase): @@ -328,7 +329,8 @@ class AIOHttpClientStreamUnified(HttpClientStreamBase): '_completion_future', '_stream_completed', '_status_code', - '_loop') + '_loop', + '_deque_lock') def __init__(self, connection: AIOHttpClientConnection, @@ -347,8 +349,8 @@ def __init__(self, raise TypeError("loop must be an instance of asyncio.AbstractEventLoop") self._loop = loop - # deque is thread-safe for appending and popping, so that we don't need - # locks to handle the callbacks from the C thread + # Lock to protect check-then-act sequences on deques for thread safety in free-threaded Python + self._deque_lock = threading.Lock() self._chunk_futures = deque() self._received_chunks = deque() self._stream_completed = False @@ -373,12 +375,16 @@ def _on_response(self, status_code: int, name_value_pairs: List[Tuple[str, str]] self._response_headers_future.set_result(name_value_pairs) def _on_body(self, chunk: bytes) -> None: - """Process body chunk on the correct event loop thread.""" - if self._chunk_futures: - future = self._chunk_futures.popleft() - future.set_result(chunk) - else: - self._received_chunks.append(chunk) + """Process body chunk - called from C thread.""" + with self._deque_lock: + if self._chunk_futures: + future = self._chunk_futures.popleft() + else: + self._received_chunks.append(chunk) + return + + # Set result outside lock (Future is thread-safe) + future.set_result(chunk) def _on_complete(self, error_code: int) -> None: """Set the completion status of the stream.""" @@ -387,10 +393,14 @@ def _on_complete(self, error_code: int) -> None: else: self._completion_future.set_exception(awscrt.exceptions.from_code(error_code)) - # Resolve all pending chunk futures with an empty string to indicate end of stream - while self._chunk_futures: - future = self._chunk_futures.popleft() - future.set_result("") + # Resolve all pending chunk futures with lock protection + with self._deque_lock: + pending_futures = list(self._chunk_futures) + self._chunk_futures.clear() + + # Set results outside lock (Future is thread-safe) + for future in pending_futures: + future.set_result(b"") async def _set_request_body_generator(self, body_iterator: AsyncIterator[bytes]): ... @@ -418,14 +428,17 @@ async def get_next_response_chunk(self) -> bytes: bytes: The next chunk of data from the response body. Returns empty bytes when the stream is completed and no more chunks are left. """ - if self._received_chunks: - return self._received_chunks.popleft() - elif self._completion_future.done(): - return b"" - else: - future = Future() - self._chunk_futures.append(future) - return await asyncio.wrap_future(future, loop=self._loop) + with self._deque_lock: + if self._received_chunks: + return self._received_chunks.popleft() + elif self._completion_future.done(): + return b"" + else: + future = Future() + self._chunk_futures.append(future) + + # Await outside lock + return await asyncio.wrap_future(future, loop=self._loop) async def wait_for_completion(self) -> int: """Wait asynchronously for the stream to complete. diff --git a/source/module.c b/source/module.c index 1691a92a6..c2a0c111f 100644 --- a/source/module.c +++ b/source/module.c @@ -621,10 +621,8 @@ PyObject *aws_py_weakref_get_ref(PyObject *ref) { int aws_py_gilstate_ensure(PyGILState_STATE *out_state) { /* If Python >= 3.13 */ #if PY_VERSION_HEX >= 0x030D0000 - printf("NEW PYTHON!!!!!\n"); // Py_IsFinalizing is part of the Stable ABI since version 3.13. if (AWS_LIKELY(!Py_IsFinalizing())) { - printf("NEW Py_IsFinalizing!!!!!\n"); #else if (AWS_LIKELY(Py_IsInitialized())) { #endif From d8f3197a25cebb713aca29e8d4b6c820c0be793a Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Tue, 24 Mar 2026 15:12:59 -0700 Subject: [PATCH 11/15] musl 1-1 is end of life, doesn't have python3.14 --- .github/workflows/ci.yml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 51844a547..d2f683f00 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -115,8 +115,6 @@ jobs: - cp312-cp312 - cp313-cp313 - cp313-cp313t - - cp314-cp314 - - cp314-cp314t permissions: id-token: write # This is required for requesting the JWT steps: @@ -143,8 +141,6 @@ jobs: - cp312-cp312 - cp313-cp313 - cp313-cp313t - - cp314-cp314 - - cp314-cp314t permissions: id-token: write # This is required for requesting the JWT steps: From 113388d64393bb4553456338fb981891b2d172cc Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Tue, 24 Mar 2026 16:01:53 -0700 Subject: [PATCH 12/15] for the sake of being totally safe --- source/cbor.c | 2 ++ source/http_connection.c | 7 ++++++- source/s3_client.c | 4 +++- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/source/cbor.c b/source/cbor.c index 602ccb21d..57a966602 100644 --- a/source/cbor.c +++ b/source/cbor.c @@ -160,6 +160,8 @@ static PyObject *s_cbor_encoder_write_pydict(struct aws_cbor_encoder *encoder, P PyObject *value = NULL; Py_ssize_t pos = 0; + /* Accessing the pydict without lock, since the cbor_decoder and encoder are not thread-safe. It's user's + * responsibility to not modifying the map from another thread. */ while (PyDict_Next(py_dict, &pos, &key, &value)) { PyObject *key_result = s_cbor_encoder_write_pyobject(encoder, key); if (!key_result) { diff --git a/source/http_connection.c b/source/http_connection.c index 656cf8500..b243dda5b 100644 --- a/source/http_connection.c +++ b/source/http_connection.c @@ -152,9 +152,10 @@ static int s_convert_http2_settings( } *out_settings = aws_mem_calloc(allocator, py_list_size, sizeof(struct aws_http2_setting)); + PyObject *setting_py = NULL; for (Py_ssize_t i = 0; i < py_list_size; i++) { - PyObject *setting_py = PyList_GetItem(initial_settings_py, i); + setting_py = PyList_GetItemRef(initial_settings_py, i); /* Get id attribute */ enum aws_http2_settings_id id = PyObject_GetAttrAsIntEnum(setting_py, "Http2Setting", "id"); @@ -169,11 +170,15 @@ static int s_convert_http2_settings( } (*out_settings)[i].id = id; (*out_settings)[i].value = value; + Py_DECREF(setting_py); } *out_size = (size_t)py_list_size; return AWS_OP_SUCCESS; error: + if (setting_py) { + Py_DECREF(setting_py); + } *out_size = 0; aws_mem_release(allocator, *out_settings); *out_settings = NULL; diff --git a/source/s3_client.c b/source/s3_client.c index bc36760bf..0411d256d 100644 --- a/source/s3_client.c +++ b/source/s3_client.c @@ -365,15 +365,17 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) { network_interface_names = aws_mem_calloc(allocator, num_network_interface_names, sizeof(struct aws_byte_cursor)); for (size_t i = 0; i < num_network_interface_names; ++i) { - PyObject *str_obj = PyList_GetItem(network_interface_names_py, i); /* Borrowed reference */ + PyObject *str_obj = PyList_GetItemRef(network_interface_names_py, i); if (!str_obj) { goto cleanup; } network_interface_names[i] = aws_byte_cursor_from_pyunicode(str_obj); if (network_interface_names[i].ptr == NULL) { + Py_DECREF(str_object); PyErr_SetString(PyExc_TypeError, "Expected all network_interface_names elements to be strings."); goto cleanup; } + Py_DECREF(str_object); } } struct aws_s3_file_io_options fio_opts = { From 2cdddbb4c22a634c8ce713191b748374d4757fc5 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Tue, 24 Mar 2026 16:11:01 -0700 Subject: [PATCH 13/15] awkward --- source/http_connection.c | 12 ++++++++++-- source/s3_client.c | 15 +++++++++++++-- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/source/http_connection.c b/source/http_connection.c index b243dda5b..6a000fe40 100644 --- a/source/http_connection.c +++ b/source/http_connection.c @@ -153,10 +153,16 @@ static int s_convert_http2_settings( *out_settings = aws_mem_calloc(allocator, py_list_size, sizeof(struct aws_http2_setting)); PyObject *setting_py = NULL; + bool strong_ref = false; for (Py_ssize_t i = 0; i < py_list_size; i++) { +#ifdef Py_GIL_DISABLED setting_py = PyList_GetItemRef(initial_settings_py, i); + strong_ref = true; +#else + setting_py = PyList_GetItem(initial_settings_py, i); // Borrowed Reference +#endif /* Get id attribute */ enum aws_http2_settings_id id = PyObject_GetAttrAsIntEnum(setting_py, "Http2Setting", "id"); if (PyErr_Occurred()) { @@ -170,13 +176,15 @@ static int s_convert_http2_settings( } (*out_settings)[i].id = id; (*out_settings)[i].value = value; - Py_DECREF(setting_py); + if (strong_ref) { + Py_DECREF(setting_py); + } } *out_size = (size_t)py_list_size; return AWS_OP_SUCCESS; error: - if (setting_py) { + if (setting_py && strong_ref) { Py_DECREF(setting_py); } *out_size = 0; diff --git a/source/s3_client.c b/source/s3_client.c index 0411d256d..22aacfd53 100644 --- a/source/s3_client.c +++ b/source/s3_client.c @@ -365,17 +365,28 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) { network_interface_names = aws_mem_calloc(allocator, num_network_interface_names, sizeof(struct aws_byte_cursor)); for (size_t i = 0; i < num_network_interface_names; ++i) { + bool strong_ref = false; +#ifdef Py_GIL_DISABLED PyObject *str_obj = PyList_GetItemRef(network_interface_names_py, i); + strong_ref = true; +#else + PyObject *str_obj = PyList_GetItem(network_interface_names_py, i); // Borrowed Reference + +#endif if (!str_obj) { goto cleanup; } network_interface_names[i] = aws_byte_cursor_from_pyunicode(str_obj); if (network_interface_names[i].ptr == NULL) { - Py_DECREF(str_object); + if (strong_ref) { + Py_DECREF(str_object); + } PyErr_SetString(PyExc_TypeError, "Expected all network_interface_names elements to be strings."); goto cleanup; } - Py_DECREF(str_object); + if (strong_ref) { + Py_DECREF(str_object); + } } } struct aws_s3_file_io_options fio_opts = { From e9c3fcf39a9065f7b58c648cfb563d45270fb2b7 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Tue, 24 Mar 2026 16:14:16 -0700 Subject: [PATCH 14/15] fix typo --- source/s3_client.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/source/s3_client.c b/source/s3_client.c index 22aacfd53..a6f93203d 100644 --- a/source/s3_client.c +++ b/source/s3_client.c @@ -371,7 +371,6 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) { strong_ref = true; #else PyObject *str_obj = PyList_GetItem(network_interface_names_py, i); // Borrowed Reference - #endif if (!str_obj) { goto cleanup; @@ -379,13 +378,13 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) { network_interface_names[i] = aws_byte_cursor_from_pyunicode(str_obj); if (network_interface_names[i].ptr == NULL) { if (strong_ref) { - Py_DECREF(str_object); + Py_DECREF(str_obj); } PyErr_SetString(PyExc_TypeError, "Expected all network_interface_names elements to be strings."); goto cleanup; } if (strong_ref) { - Py_DECREF(str_object); + Py_DECREF(str_obj); } } } From ba7434f5af1c49a34f9425a43978595b27df19dd Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Tue, 24 Mar 2026 16:48:03 -0700 Subject: [PATCH 15/15] we should wait for the connection close to finish --- test/test_http_client.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/test_http_client.py b/test/test_http_client.py index e0bbbb941..b195daf74 100644 --- a/test/test_http_client.py +++ b/test/test_http_client.py @@ -775,7 +775,7 @@ def on_body_with_window_update(http_stream, chunk, **kwargs): self.assertEqual(sum(received_chunks), sum(window_updates_sent), "Window updates don't match received data") - connection.close() + self.assertEqual(None, connection.close().result(self.timeout)) finally: # Clean up test file if os.path.exists(test_file_path): @@ -820,7 +820,7 @@ def on_body(http_stream, chunk, **kwargs): # With window=1, we should receive many small chunks self.assertGreater(len(chunks_received), 1, "Should receive multiple chunks with tiny window") - connection.close() + self.assertEqual(None, connection.close().result(self.timeout)) finally: # Clean up test file if os.path.exists(test_file_path): @@ -867,7 +867,7 @@ def on_body_with_window_update(http_stream, chunk, **kwargs): self.assertGreater(len(received_chunks), 0, "No data chunks received") self.assertGreater(len(window_updates_sent), 0, "No window updates sent") - connection.close() + self.assertEqual(None, connection.close().result(self.timeout)) def test_h2_stream_flow_control_blocks_and_resumes(self): """Test that HTTP/2 stream flow control actually blocks and resumes""" @@ -900,7 +900,7 @@ def on_body(http_stream, chunk, **kwargs): # With small window, we should receive multiple chunks self.assertGreater(len(chunks_received), 1, "Should receive multiple chunks with small window") - connection.close() + self.assertEqual(None, connection.close().result(self.timeout)) if __name__ == '__main__':