Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ jobs:
- cp311-cp311
- cp312-cp312
- cp313-cp313
- cp313-cp313t
- cp314-cp314
- cp314-cp314t
permissions:
id-token: write # This is required for requesting the JWT
steps:
Expand All @@ -81,6 +84,9 @@ jobs:
- cp311-cp311
- cp312-cp312
- cp313-cp313
- cp313-cp313t
- cp314-cp314
- cp314-cp314t
permissions:
id-token: write # This is required for requesting the JWT
steps:
Expand Down Expand Up @@ -108,6 +114,7 @@ jobs:
- cp311-cp311
- cp312-cp312
- cp313-cp313
- cp313-cp313t
permissions:
id-token: write # This is required for requesting the JWT
steps:
Expand All @@ -133,6 +140,7 @@ jobs:
- cp311-cp311
- cp312-cp312
- cp313-cp313
- cp313-cp313t
permissions:
id-token: write # This is required for requesting the JWT
steps:
Expand Down
55 changes: 34 additions & 21 deletions awscrt/aio/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from io import BytesIO
from concurrent.futures import Future
from typing import List, Tuple, Optional, Callable, AsyncIterator
import threading


class AIOHttpClientConnectionUnified(HttpClientConnectionBase):
Expand Down Expand Up @@ -328,7 +329,8 @@ class AIOHttpClientStreamUnified(HttpClientStreamBase):
'_completion_future',
'_stream_completed',
'_status_code',
'_loop')
'_loop',
'_deque_lock')

def __init__(self,
connection: AIOHttpClientConnection,
Expand All @@ -347,8 +349,8 @@ def __init__(self,
raise TypeError("loop must be an instance of asyncio.AbstractEventLoop")
self._loop = loop

# deque is thread-safe for appending and popping, so that we don't need
# locks to handle the callbacks from the C thread
# Lock to protect check-then-act sequences on deques for thread safety in free-threaded Python
self._deque_lock = threading.Lock()
self._chunk_futures = deque()
self._received_chunks = deque()
self._stream_completed = False
Expand All @@ -373,12 +375,16 @@ def _on_response(self, status_code: int, name_value_pairs: List[Tuple[str, str]]
self._response_headers_future.set_result(name_value_pairs)

def _on_body(self, chunk: bytes) -> None:
"""Process body chunk on the correct event loop thread."""
if self._chunk_futures:
future = self._chunk_futures.popleft()
future.set_result(chunk)
else:
self._received_chunks.append(chunk)
"""Process body chunk - called from C thread."""
with self._deque_lock:
if self._chunk_futures:
future = self._chunk_futures.popleft()
else:
self._received_chunks.append(chunk)
return

# Set result outside lock (Future is thread-safe)
future.set_result(chunk)

def _on_complete(self, error_code: int) -> None:
"""Set the completion status of the stream."""
Expand All @@ -387,10 +393,14 @@ def _on_complete(self, error_code: int) -> None:
else:
self._completion_future.set_exception(awscrt.exceptions.from_code(error_code))

# Resolve all pending chunk futures with an empty string to indicate end of stream
while self._chunk_futures:
future = self._chunk_futures.popleft()
future.set_result("")
# Resolve all pending chunk futures with lock protection
with self._deque_lock:
pending_futures = list(self._chunk_futures)
self._chunk_futures.clear()

# Set results outside lock (Future is thread-safe)
for future in pending_futures:
future.set_result(b"")

async def _set_request_body_generator(self, body_iterator: AsyncIterator[bytes]):
...
Expand Down Expand Up @@ -418,14 +428,17 @@ async def get_next_response_chunk(self) -> bytes:
bytes: The next chunk of data from the response body.
Returns empty bytes when the stream is completed and no more chunks are left.
"""
if self._received_chunks:
return self._received_chunks.popleft()
elif self._completion_future.done():
return b""
else:
future = Future()
self._chunk_futures.append(future)
return await asyncio.wrap_future(future, loop=self._loop)
with self._deque_lock:
if self._received_chunks:
return self._received_chunks.popleft()
elif self._completion_future.done():
return b""
else:
future = Future()
self._chunk_futures.append(future)

# Await outside lock
return await asyncio.wrap_future(future, loop=self._loop)

async def wait_for_completion(self) -> int:
"""Wait asynchronously for the stream to complete.
Expand Down
14 changes: 12 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
# sysconfig.get_config_var('MACOSX_DEPLOYMENT_TARGET').
MACOS_DEPLOYMENT_TARGET_MIN = "10.15"

# True if this is a free-threaded Python build.
FREE_THREADED_BUILD = sysconfig.get_config_var("Py_GIL_DISABLED") == 1

# This is the minimum version of the Windows SDK needed for schannel.h with SCH_CREDENTIALS and
# TLS_PARAMETERS. These are required to build Windows Binaries with TLS 1.3 support.
WINDOWS_SDK_VERSION_TLS1_3_SUPPORT = "10.0.17763.0"
Expand Down Expand Up @@ -428,7 +431,10 @@ class bdist_wheel_abi3(bdist_wheel):
def get_tag(self):
python, abi, plat = super().get_tag()
# on CPython, our wheels are abi3 and compatible back to 3.11
if python.startswith("cp") and sys.version_info >= (3, 13):
if FREE_THREADED_BUILD:
# free-threaded builds don't use limited API, so skip abi3 tag
return python, abi, plat
elif python.startswith("cp") and sys.version_info >= (3, 13):
# 3.13 deprecates PyWeakref_GetObject(), adds alternative
return "cp313", "abi3", plat
elif python.startswith("cp") and sys.version_info >= (3, 11):
Expand Down Expand Up @@ -532,7 +538,11 @@ def awscrt_ext():
extra_link_args += ['-Wl,--fatal-warnings']

# prefer building with stable ABI, so a wheel can work with multiple major versions
if sys.version_info >= (3, 13):
if FREE_THREADED_BUILD and sys.version_info[:2] <= (3, 14):
# 3.14 free threaded (aka no gil) does not support limited api.
# disable it for now. 3.15 promises to support limited api + free threading combo
py_limited_api = False
elif sys.version_info >= (3, 13):
# 3.13 deprecates PyWeakref_GetObject(), adds alternative
define_macros.append(('Py_LIMITED_API', '0x030D0000'))
py_limited_api = True
Expand Down
2 changes: 2 additions & 0 deletions source/cbor.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ static PyObject *s_cbor_encoder_write_pydict(struct aws_cbor_encoder *encoder, P
PyObject *value = NULL;
Py_ssize_t pos = 0;

/* Accessing the pydict without lock, since the cbor_decoder and encoder are not thread-safe. It's user's
* responsibility to not modifying the map from another thread. */
while (PyDict_Next(py_dict, &pos, &key, &value)) {
PyObject *key_result = s_cbor_encoder_write_pyobject(encoder, key);
if (!key_result) {
Expand Down
15 changes: 14 additions & 1 deletion source/http_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,17 @@ static int s_convert_http2_settings(
}

*out_settings = aws_mem_calloc(allocator, py_list_size, sizeof(struct aws_http2_setting));
PyObject *setting_py = NULL;
bool strong_ref = false;

for (Py_ssize_t i = 0; i < py_list_size; i++) {
PyObject *setting_py = PyList_GetItem(initial_settings_py, i);
#ifdef Py_GIL_DISABLED
setting_py = PyList_GetItemRef(initial_settings_py, i);
strong_ref = true;
#else
setting_py = PyList_GetItem(initial_settings_py, i); // Borrowed Reference

#endif
/* Get id attribute */
enum aws_http2_settings_id id = PyObject_GetAttrAsIntEnum(setting_py, "Http2Setting", "id");
if (PyErr_Occurred()) {
Expand All @@ -169,11 +176,17 @@ static int s_convert_http2_settings(
}
(*out_settings)[i].id = id;
(*out_settings)[i].value = value;
if (strong_ref) {
Py_DECREF(setting_py);
}
}

*out_size = (size_t)py_list_size;
return AWS_OP_SUCCESS;
error:
if (setting_py && strong_ref) {
Py_DECREF(setting_py);
}
*out_size = 0;
aws_mem_release(allocator, *out_settings);
*out_settings = NULL;
Expand Down
10 changes: 10 additions & 0 deletions source/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,13 @@ PyObject *aws_py_weakref_get_ref(PyObject *ref) {
}

int aws_py_gilstate_ensure(PyGILState_STATE *out_state) {
/* If Python >= 3.13 */
#if PY_VERSION_HEX >= 0x030D0000
// Py_IsFinalizing is part of the Stable ABI since version 3.13.
if (AWS_LIKELY(!Py_IsFinalizing())) {
#else
if (AWS_LIKELY(Py_IsInitialized())) {
#endif
*out_state = PyGILState_Ensure();
return AWS_OP_SUCCESS;
}
Expand Down Expand Up @@ -1023,6 +1029,10 @@ PyMODINIT_FUNC PyInit__awscrt(void) {
return NULL;
}

#ifdef Py_GIL_DISABLED
PyUnstable_Module_SetGIL(m, Py_MOD_GIL_NOT_USED);
#endif

s_init_allocator();

/* Don't report this memory when dumping possible leaks. */
Expand Down
14 changes: 13 additions & 1 deletion source/s3_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -365,15 +365,27 @@ PyObject *aws_py_s3_client_new(PyObject *self, PyObject *args) {
network_interface_names =
aws_mem_calloc(allocator, num_network_interface_names, sizeof(struct aws_byte_cursor));
for (size_t i = 0; i < num_network_interface_names; ++i) {
PyObject *str_obj = PyList_GetItem(network_interface_names_py, i); /* Borrowed reference */
bool strong_ref = false;
#ifdef Py_GIL_DISABLED
PyObject *str_obj = PyList_GetItemRef(network_interface_names_py, i);
strong_ref = true;
#else
PyObject *str_obj = PyList_GetItem(network_interface_names_py, i); // Borrowed Reference
#endif
if (!str_obj) {
goto cleanup;
}
network_interface_names[i] = aws_byte_cursor_from_pyunicode(str_obj);
if (network_interface_names[i].ptr == NULL) {
if (strong_ref) {
Py_DECREF(str_obj);
}
PyErr_SetString(PyExc_TypeError, "Expected all network_interface_names elements to be strings.");
goto cleanup;
}
if (strong_ref) {
Py_DECREF(str_obj);
}
}
}
struct aws_s3_file_io_options fio_opts = {
Expand Down
Loading
Loading