Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,6 @@ docs/plans/
# CodeQL
.codeql-db
codeql-results.sarif

# ralphex progress logs
.ralphex/progress/
7 changes: 6 additions & 1 deletion emails/backend/smtp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@

from .backend import SMTPBackend
from .backend import SMTPBackend

try:
from .aio_backend import AsyncSMTPBackend
except ImportError:
pass
152 changes: 152 additions & 0 deletions emails/backend/smtp/aio_backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
from __future__ import annotations

import asyncio
import logging
from typing import Any

import aiosmtplib

from ..response import SMTPResponse
from .aio_client import AsyncSMTPClientWithResponse
from ...utils import DNS_NAME
from .exceptions import SMTPConnectNetworkError


__all__ = ['AsyncSMTPBackend']

logger = logging.getLogger(__name__)


class AsyncSMTPBackend:

"""
AsyncSMTPBackend manages an async SMTP connection using aiosmtplib.
"""

DEFAULT_SOCKET_TIMEOUT = 5

response_cls = SMTPResponse

def __init__(self, ssl: bool = False, fail_silently: bool = True,
mail_options: list[str] | None = None, **kwargs: Any) -> None:

self.ssl = ssl
self.tls = kwargs.get('tls')
if self.ssl and self.tls:
raise ValueError(
"ssl/tls are mutually exclusive, so only set "
"one of those settings to True.")

kwargs.setdefault('timeout', self.DEFAULT_SOCKET_TIMEOUT)
kwargs.setdefault('local_hostname', DNS_NAME.get_fqdn())
kwargs['port'] = int(kwargs.get('port', 0))

self.smtp_cls_kwargs = kwargs

self.host: str | None = kwargs.get('host')
self.port: int = kwargs['port']
self.fail_silently = fail_silently
self.mail_options = mail_options or []

self._client: AsyncSMTPClientWithResponse | None = None
self._lock = asyncio.Lock()

async def get_client(self) -> AsyncSMTPClientWithResponse:
async with self._lock:
return await self._get_client_unlocked()

async def _get_client_unlocked(self) -> AsyncSMTPClientWithResponse:
if self._client is None:
client = AsyncSMTPClientWithResponse(
parent=self, ssl=self.ssl, **self.smtp_cls_kwargs
)
await client.initialize()
self._client = client
return self._client

async def close(self) -> None:
"""Closes the connection to the email server."""
async with self._lock:
await self._close_unlocked()

async def _close_unlocked(self) -> None:
if self._client:
try:
await self._client.quit()
except Exception:
if self.fail_silently:
return
raise
finally:
self._client = None

def make_response(self, exception: Exception | None = None) -> SMTPResponse:
return self.response_cls(backend=self, exception=exception)

async def _send(self, **kwargs: Any) -> SMTPResponse | None:
response = None
try:
client = await self._get_client_unlocked()
except aiosmtplib.SMTPConnectError as exc:
cause = exc.__cause__
if isinstance(cause, IOError):
response = self.make_response(
exception=SMTPConnectNetworkError.from_ioerror(cause))
else:
response = self.make_response(exception=exc)
if not self.fail_silently:
raise
except aiosmtplib.SMTPException as exc:
response = self.make_response(exception=exc)
if not self.fail_silently:
raise
except IOError as exc:
response = self.make_response(
exception=SMTPConnectNetworkError.from_ioerror(exc))
if not self.fail_silently:
raise

if response:
return response
else:
return await client.sendmail(**kwargs)

async def _send_with_retry(self, **kwargs: Any) -> SMTPResponse | None:
async with self._lock:
try:
return await self._send(**kwargs)
except aiosmtplib.SMTPServerDisconnected:
logger.debug('SMTPServerDisconnected, retry once')
await self._close_unlocked()
return await self._send(**kwargs)

async def sendmail(self, from_addr: str, to_addrs: str | list[str],
msg: Any, mail_options: list[str] | None = None,
rcpt_options: list[str] | None = None) -> SMTPResponse | None:

if not to_addrs:
return None

if not isinstance(to_addrs, (list, tuple)):
to_addrs = [to_addrs]

response = await self._send_with_retry(
from_addr=from_addr,
to_addrs=to_addrs,
msg=msg.as_bytes(),
mail_options=mail_options or self.mail_options,
rcpt_options=rcpt_options,
)

if response and not self.fail_silently:
response.raise_if_needed()

return response

async def __aenter__(self) -> AsyncSMTPBackend:
return self

async def __aexit__(self, exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: Any | None) -> None:
await self.close()
148 changes: 148 additions & 0 deletions emails/backend/smtp/aio_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
from __future__ import annotations

__all__ = ['AsyncSMTPClientWithResponse']

import logging
from typing import TYPE_CHECKING

import aiosmtplib

from ..response import SMTPResponse
from ...utils import sanitize_email

if TYPE_CHECKING:
from .aio_backend import AsyncSMTPBackend

logger = logging.getLogger(__name__)


class AsyncSMTPClientWithResponse:
"""Async SMTP client built on aiosmtplib that returns SMTPResponse objects."""

def __init__(self, parent: AsyncSMTPBackend, **kwargs):
self.parent = parent
self.make_response = parent.make_response

self.tls = kwargs.pop('tls', False)
self.ssl = kwargs.pop('ssl', False)
self.debug = kwargs.pop('debug', 0)
self.user = kwargs.pop('user', None)
self.password = kwargs.pop('password', None)

# aiosmtplib uses use_tls for implicit TLS (SMTPS) and
# start_tls for STARTTLS after connect
smtp_kwargs = dict(kwargs)
smtp_kwargs['use_tls'] = self.ssl
smtp_kwargs['start_tls'] = self.tls

# aiosmtplib uses 'hostname' instead of 'host'
if 'host' in smtp_kwargs:
smtp_kwargs['hostname'] = smtp_kwargs.pop('host')

self._smtp = aiosmtplib.SMTP(**smtp_kwargs)

async def initialize(self):
await self._smtp.connect()
try:
if self._smtp.is_ehlo_or_helo_needed:
await self._smtp.ehlo()
if self.user:
await self._smtp.login(self.user, self.password)
except Exception:
await self.quit()
raise

async def quit(self):
"""Closes the connection to the email server."""
try:
await self._smtp.quit()
except (aiosmtplib.SMTPServerDisconnected, ConnectionError):
self._smtp.close()

async def _rset(self):
try:
await self._smtp.rset()
except (aiosmtplib.SMTPServerDisconnected, ConnectionError):
pass

async def sendmail(self, from_addr: str, to_addrs: list[str] | str,
msg: bytes, mail_options: list[str] | None = None,
rcpt_options: list[str] | None = None) -> SMTPResponse | None:

if not to_addrs:
return None

rcpt_options = rcpt_options or []
mail_options = mail_options or []
esmtp_opts = []
if self._smtp.supports_esmtp:
if self._smtp.supports_extension('size'):
esmtp_opts.append("size=%d" % len(msg))
for option in mail_options:
esmtp_opts.append(option)

response = self.make_response()

from_addr = sanitize_email(from_addr)

response.from_addr = from_addr
response.esmtp_opts = esmtp_opts[:]

try:
resp = await self._smtp.mail(from_addr, options=esmtp_opts)
except aiosmtplib.SMTPSenderRefused as exc:
response.set_status('mail', exc.code, exc.message.encode() if isinstance(exc.message, str) else exc.message)
response.set_exception(exc)
await self._rset()
return response

response.set_status('mail', resp.code, resp.message.encode() if isinstance(resp.message, str) else resp.message)

if resp.code != 250:
await self._rset()
response.set_exception(
aiosmtplib.SMTPSenderRefused(resp.code, resp.message, from_addr))
return response

if not isinstance(to_addrs, (list, tuple)):
to_addrs = [to_addrs]

to_addrs = [sanitize_email(e) for e in to_addrs]

response.to_addrs = to_addrs
response.rcpt_options = rcpt_options[:]
response.refused_recipients = {}

for a in to_addrs:
try:
resp = await self._smtp.rcpt(a, options=rcpt_options)
code = resp.code
resp_msg = resp.message.encode() if isinstance(resp.message, str) else resp.message
except aiosmtplib.SMTPRecipientRefused as exc:
code = exc.code
resp_msg = exc.message.encode() if isinstance(exc.message, str) else exc.message

response.set_status('rcpt', code, resp_msg, recipient=a)
if (code != 250) and (code != 251):
response.refused_recipients[a] = (code, resp_msg)

if len(response.refused_recipients) == len(to_addrs):
await self._rset()
refused_list = [
aiosmtplib.SMTPRecipientRefused(code, msg.decode() if isinstance(msg, bytes) else msg, addr)
for addr, (code, msg) in response.refused_recipients.items()
]
response.set_exception(aiosmtplib.SMTPRecipientsRefused(refused_list))
return response

resp = await self._smtp.data(msg)
resp_msg = resp.message.encode() if isinstance(resp.message, str) else resp.message
response.set_status('data', resp.code, resp_msg)
if resp.code != 250:
await self._rset()
response.set_exception(
aiosmtplib.SMTPDataError(resp.code, resp.message))
return response

response._finished = True
return response
6 changes: 2 additions & 4 deletions emails/backend/smtp/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def close(self) -> None:
if self._client:
try:
self._client.quit()
except:
except Exception:
if self.fail_silently:
return
raise
Expand All @@ -86,7 +86,7 @@ def wrapper(*args: Any, **kwargs: Any) -> SMTPResponse | None:
return func(*args, **kwargs)
except smtplib.SMTPServerDisconnected:
# If server disconected, clear old client
logging.debug('SMTPServerDisconnected, retry once')
logger.debug('SMTPServerDisconnected, retry once')
self.close()
return func(*args, **kwargs)
return wrapper
Expand All @@ -106,8 +106,6 @@ def _send(self, **kwargs: Any) -> SMTPResponse | None:
raise

if response:
if not self.fail_silently:
response.raise_if_needed()
return response
else:
return client.sendmail(**kwargs)
Expand Down
Loading
Loading