From f5260f502891b97438a608a83142b471da0f8485 Mon Sep 17 00:00:00 2001 From: "Sakith B." Date: Tue, 5 May 2026 10:39:27 +0530 Subject: [PATCH 1/3] queueing and sending --- backend/migrations/Version20260505100305.php | 29 +++++++++++++++++++ backend/src/Entity/Send.php | 15 ++++++++++ .../src/Repository/IpAddressRepository.php | 17 +++++++++++ backend/src/Service/Send/SendService.php | 3 ++ .../tests/Api/Console/Send/SendEmailTest.php | 5 +++- worker/email_worker.go | 2 +- worker/send_pg.go | 6 ++-- 7 files changed, 72 insertions(+), 5 deletions(-) create mode 100644 backend/migrations/Version20260505100305.php diff --git a/backend/migrations/Version20260505100305.php b/backend/migrations/Version20260505100305.php new file mode 100644 index 00000000..4ae2a2a3 --- /dev/null +++ b/backend/migrations/Version20260505100305.php @@ -0,0 +1,29 @@ +addSql('ALTER TABLE sends ADD ip_address_id INT DEFAULT NULL REFERENCES ip_addresses(id) ON DELETE SET NULL'); + } + + public function down(Schema $schema): void + { + $this->addSql('ALTER TABLE sends DROP ip_address_id'); + } +} diff --git a/backend/src/Entity/Send.php b/backend/src/Entity/Send.php index 9e7c94e0..7dcf9185 100644 --- a/backend/src/Entity/Send.php +++ b/backend/src/Entity/Send.php @@ -43,6 +43,10 @@ class Send #[ORM\JoinColumn] private Queue $queue; + #[ORM\ManyToOne(targetEntity: IpAddress::class)] + #[ORM\JoinColumn(onDelete: "SET NULL")] + private ?IpAddress $ip_address = null; + #[ORM\Column()] private string $queue_name; // denormalized for easier access @@ -187,6 +191,17 @@ public function setQueue(Queue $queue): static return $this; } + public function getIpAddress(): ?IpAddress + { + return $this->ip_address; + } + + public function setIpAddress(?IpAddress $ipAddress): static + { + $this->ip_address = $ipAddress; + return $this; + } + public function getQueueName(): string { return $this->queue_name; diff --git a/backend/src/Repository/IpAddressRepository.php b/backend/src/Repository/IpAddressRepository.php index d73d96e0..873d90ee 100644 --- a/backend/src/Repository/IpAddressRepository.php +++ b/backend/src/Repository/IpAddressRepository.php @@ -6,6 +6,8 @@ use Doctrine\Persistence\ManagerRegistry; use Doctrine\Bundle\DoctrineBundle\Repository\ServiceEntityRepository; +use App\Entity\Queue; + /** * @extends ServiceEntityRepository */ @@ -15,4 +17,19 @@ public function __construct(ManagerRegistry $registry) { parent::__construct($registry, IpAddress::class); } + + public function getRandomIpForQueue(Queue $queue): ?IpAddress + { + $conn = $this->getEntityManager()->getConnection(); + $sql = 'SELECT id FROM ip_addresses WHERE queue_id = :queue_id ORDER BY RANDOM() LIMIT 1'; + $stmt = $conn->prepare($sql); + $result = $stmt->executeQuery(['queue_id' => $queue->getId()]); + $id = $result->fetchOne(); + + if ($id) { + return $this->find($id); + } + + return null; + } } \ No newline at end of file diff --git a/backend/src/Service/Send/SendService.php b/backend/src/Service/Send/SendService.php index 34b2f8a7..d1ddb888 100644 --- a/backend/src/Service/Send/SendService.php +++ b/backend/src/Service/Send/SendService.php @@ -9,6 +9,7 @@ use App\Entity\SendRecipient; use App\Entity\Type\SendRecipientStatus; use App\Entity\Type\SendRecipientType; +use App\Repository\IpAddressRepository; use App\Repository\SendRepository; use App\Service\Send\Dto\SendingAttachment; use App\Service\Send\Exception\EmailTooLargeException; @@ -28,6 +29,7 @@ public function __construct( private EmailBuilder $emailBuilder, private SendRepository $sendRepository, private RecipientFactory $recipientFactory, + private IpAddressRepository $ipAddressRepository, ) { } @@ -150,6 +152,7 @@ public function createSend( $send->setDomain($domain); $send->setQueue($queue); $send->setQueueName($queue->getName()); + $send->setIpAddress($this->ipAddressRepository->getRandomIpForQueue($queue)); $send->setFromAddress($from->getAddress()); $send->setFromName($from->getName()); $send->setSubject($subject); diff --git a/backend/tests/Api/Console/Send/SendEmailTest.php b/backend/tests/Api/Console/Send/SendEmailTest.php index 7411c008..6fe73d83 100644 --- a/backend/tests/Api/Console/Send/SendEmailTest.php +++ b/backend/tests/Api/Console/Send/SendEmailTest.php @@ -21,6 +21,7 @@ use App\Service\Suppression\SuppressionService; use App\Tests\Case\WebTestCase; use App\Tests\Factory\DomainFactory; +use App\Tests\Factory\IpAddressFactory; use App\Tests\Factory\ProjectFactory; use App\Tests\Factory\QueueFactory; use App\Tests\Factory\SuppressionFactory; @@ -382,7 +383,8 @@ public function test_validation( #[TestWith([false])] public function test_queues_mail(bool $useArrayAddress): void { - QueueFactory::createTransactional(); + $queue = QueueFactory::createTransactional(); + $ip = IpAddressFactory::createOne(['queue' => $queue]); $project = ProjectFactory::createOne(); DomainFactory::createOne([ @@ -434,6 +436,7 @@ public function test_queues_mail(bool $useArrayAddress): void $send = $send[0]; $this->assertSame(true, $send->getQueued()); + $this->assertSame($ip->getId(), $send->getIpAddress()?->getId()); $this->assertSame("Test Email", $send->getSubject()); $this->assertSame("This is a test email.", $send->getBodyText()); $this->assertSame("

This is a test email.

", $send->getBodyHtml()); diff --git a/worker/email_worker.go b/worker/email_worker.go index 3a91dd40..616cfeb4 100644 --- a/worker/email_worker.go +++ b/worker/email_worker.go @@ -194,7 +194,7 @@ func (worker *EmailWorker) processSend(conn *sql.DB) error { return err } - send, recipients, err := sendTx.FetchSend(worker.ip.QueueId) + send, recipients, err := sendTx.FetchSend(worker.ip.Id) if err != nil { diff --git a/worker/send_pg.go b/worker/send_pg.go index 5bacc84f..99c17375 100644 --- a/worker/send_pg.go +++ b/worker/send_pg.go @@ -44,13 +44,13 @@ func NewSendTransaction( }, nil } -func (b *SendTransaction) FetchSend(queueId int) (*SendRow, []*RecipientRow, error) { +func (b *SendTransaction) FetchSend(ipId int) (*SendRow, []*RecipientRow, error) { row := b.tx.QueryRowContext(b.ctx, ` WITH ids AS MATERIALIZED ( SELECT id, uuid, from_address, raw, queue_name FROM sends - WHERE queued = true AND queue_id = $1 AND send_after < NOW() + WHERE queued = true AND ip_address_id = $1 AND send_after < NOW() FOR UPDATE SKIP LOCKED LIMIT 1 ) @@ -58,7 +58,7 @@ func (b *SendTransaction) FetchSend(queueId int) (*SendRow, []*RecipientRow, err SET queued = false, updated_at = NOW() WHERE id = ANY(SELECT id FROM ids) RETURNING id, uuid, from_address, raw, queue_name - `, queueId) + `, ipId) var send SendRow From 66838168d55992f81d9229f97b4263ad67a932c3 Mon Sep 17 00:00:00 2001 From: "Sakith B." Date: Tue, 5 May 2026 11:03:17 +0530 Subject: [PATCH 2/3] fix tests --- worker/email_worker_test.go | 2 ++ worker/pg_test.go | 22 +++++++++++++++------- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/worker/email_worker_test.go b/worker/email_worker_test.go index 043a6810..b19ecc28 100644 --- a/worker/email_worker_test.go +++ b/worker/email_worker_test.go @@ -243,6 +243,7 @@ func TestEmailWorker_ProcessSend(t *testing.T) { ctx: context.Background(), logger: slog.New(slog.NewTextHandler(io.Discard, nil)), ip: GoStateIp{ + Id: send.IpAddressId, QueueId: send.QueueId, }, AttemptSendToDomainFunc: func( @@ -370,6 +371,7 @@ func TestEmailWorker_ProcessSend_Requeuing(t *testing.T) { ctx: context.Background(), logger: slogDiscard(), ip: GoStateIp{ + Id: send.IpAddressId, QueueId: send.QueueId, }, AttemptSendToDomainFunc: func( diff --git a/worker/pg_test.go b/worker/pg_test.go index 6ab315b2..a876e719 100644 --- a/worker/pg_test.go +++ b/worker/pg_test.go @@ -168,6 +168,7 @@ type FactorySend struct { ProjectId int DomainId int QueueId int + IpAddressId int Queued bool SendAfter time.Time FromAddress string @@ -203,24 +204,30 @@ func (m *TestFactory) Send(send *FactorySend) (*FactorySend, error) { return nil, err } + ipId, err := m.IpAddress() + if err != nil { + return nil, err + } + send.ProjectId = projectId send.DomainId = domainId send.QueueId = queueId + send.IpAddressId = ipId now := time.Now() err = m.conn.QueryRow(` INSERT INTO sends ( - created_at, updated_at, send_after, project_id, domain_id, queue_id, + created_at, updated_at, send_after, project_id, domain_id, queue_id, ip_address_id, queue_name, from_address, subject, body_html, body_text, headers, message_id, raw, size_bytes, queued ) VALUES ( - $1, $2, $3, $4, $5, $6, - $7, $8, $9, $10, $11, - $12, $13, $14, - 0, $15 + $1, $2, $3, $4, $5, $6, $7, + $8, $9, $10, $11, $12, + $13, $14, $15, + 0, $16 ) RETURNING id, uuid - `, now, now, send.SendAfter, projectId, send.DomainId, queueId, + `, now, now, send.SendAfter, projectId, send.DomainId, queueId, ipId, "test-queue", send.FromAddress, send.Subject, send.BodyHtml, send.BodyText, nil, "test-message-id", "raw-email-content", send.Queued, @@ -239,7 +246,7 @@ func (f *TestFactory) GetSendById(id int) (*FactorySend, error) { var send FactorySend row := f.conn.QueryRow(` SELECT - id, uuid, project_id, domain_id, queue_id, queued, send_after, + id, uuid, project_id, domain_id, queue_id, ip_address_id, queued, send_after, from_address, subject, body_html, body_text FROM sends WHERE id = $1 `, id) @@ -250,6 +257,7 @@ func (f *TestFactory) GetSendById(id int) (*FactorySend, error) { &send.ProjectId, &send.DomainId, &send.QueueId, + &send.IpAddressId, &send.Queued, &send.SendAfter, &send.FromAddress, From 71e79562cd7cb228c0611bbdcd888c98ac810ccf Mon Sep 17 00:00:00 2001 From: "Sakith B." Date: Thu, 7 May 2026 10:33:11 +0530 Subject: [PATCH 3/3] wip --- backend/src/Api/Console/Object/SendObject.php | 2 + backend/src/Repository/SendRepository.php | 11 ++ .../src/Service/Ip/Event/IpRemovedEvent.php | 20 +++ .../Ip/IpAddressQueueAssignedListener.php | 40 ++++++ backend/src/Service/Ip/IpAddressService.php | 3 + backend/src/Service/Ip/IpRemovedListener.php | 30 +++++ .../Send/Message/RouteNullIpsMessage.php | 16 +++ .../Message/RouteQueueNullIpsToIpMessage.php | 17 +++ .../RouteNullIpsMessageHandler.php | 38 ++++++ .../RouteQueueNullIpsToIpMessageHandler.php | 40 ++++++ .../Api/Console/Send/GetSendByUuidTest.php | 1 + .../Ip/IpAddressQueueAssignedListenerTest.php | 81 ++++++++++++ .../Service/Ip/IpRemovedListenerTest.php | 46 +++++++ .../RouteNullIpsMessageHandlerTest.php | 98 +++++++++++++++ ...outeQueueNullIpsToIpMessageHandlerTest.php | 118 ++++++++++++++++++ .../console/[id]/sends/[uuid]/Overview.svelte | 2 + frontend/src/routes/console/types.ts | 1 + 17 files changed, 564 insertions(+) create mode 100644 backend/src/Service/Ip/Event/IpRemovedEvent.php create mode 100644 backend/src/Service/Ip/IpAddressQueueAssignedListener.php create mode 100644 backend/src/Service/Ip/IpRemovedListener.php create mode 100644 backend/src/Service/Send/Message/RouteNullIpsMessage.php create mode 100644 backend/src/Service/Send/Message/RouteQueueNullIpsToIpMessage.php create mode 100644 backend/src/Service/Send/MessageHandler/RouteNullIpsMessageHandler.php create mode 100644 backend/src/Service/Send/MessageHandler/RouteQueueNullIpsToIpMessageHandler.php create mode 100644 backend/tests/Service/Ip/IpAddressQueueAssignedListenerTest.php create mode 100644 backend/tests/Service/Ip/IpRemovedListenerTest.php create mode 100644 backend/tests/Service/Send/MessageHandler/RouteNullIpsMessageHandlerTest.php create mode 100644 backend/tests/Service/Send/MessageHandler/RouteQueueNullIpsToIpMessageHandlerTest.php diff --git a/backend/src/Api/Console/Object/SendObject.php b/backend/src/Api/Console/Object/SendObject.php index f135cac8..f3dbe944 100644 --- a/backend/src/Api/Console/Object/SendObject.php +++ b/backend/src/Api/Console/Object/SendObject.php @@ -22,6 +22,7 @@ class SendObject public int $size_bytes; public bool $queued; public int $send_after; + public ?string $ip_address; /** * @var SendRecipientObject[] @@ -61,6 +62,7 @@ public function __construct( $this->size_bytes = $send->getSizeBytes(); $this->queued = $send->getQueued(); $this->send_after = $send->getSendAfter()->getTimestamp(); + $this->ip_address = $send->getIpAddress()?->getIpAddress(); $this->recipients = array_map(fn($recipient) => new SendRecipientObject($recipient), $send->getRecipients()->toArray()); diff --git a/backend/src/Repository/SendRepository.php b/backend/src/Repository/SendRepository.php index 3b3986ca..6031577f 100644 --- a/backend/src/Repository/SendRepository.php +++ b/backend/src/Repository/SendRepository.php @@ -15,4 +15,15 @@ public function __construct(ManagerRegistry $registry) { parent::__construct($registry, Send::class); } + + public function updateNullIpSendsForQueue(int $queueId, ?int $ipAddressId): int + { + $conn = $this->getEntityManager()->getConnection(); + $sql = 'UPDATE sends SET ip_address_id = :ip_id, updated_at = NOW() WHERE queue_id = :queue_id AND ip_address_id IS NULL'; + $stmt = $conn->prepare($sql); + return $stmt->executeStatement([ + 'ip_id' => $ipAddressId, + 'queue_id' => $queueId, + ]); + } } \ No newline at end of file diff --git a/backend/src/Service/Ip/Event/IpRemovedEvent.php b/backend/src/Service/Ip/Event/IpRemovedEvent.php new file mode 100644 index 00000000..57bdc9ca --- /dev/null +++ b/backend/src/Service/Ip/Event/IpRemovedEvent.php @@ -0,0 +1,20 @@ +ipAddress; + } + +} diff --git a/backend/src/Service/Ip/IpAddressQueueAssignedListener.php b/backend/src/Service/Ip/IpAddressQueueAssignedListener.php new file mode 100644 index 00000000..7547e51f --- /dev/null +++ b/backend/src/Service/Ip/IpAddressQueueAssignedListener.php @@ -0,0 +1,40 @@ +getUpdates(); + + if (!$updates->queueSet) { + return; + } + + $ipAddress = $event->getIpAddress(); + $queue = $ipAddress->getQueue(); + + if ($queue === null) { + return; + } + + $this->bus->dispatch(new RouteQueueNullIpsToIpMessage( + $queue->getId(), + $ipAddress->getId() + )); + } + +} diff --git a/backend/src/Service/Ip/IpAddressService.php b/backend/src/Service/Ip/IpAddressService.php index 6d9b2f61..e023d79e 100644 --- a/backend/src/Service/Ip/IpAddressService.php +++ b/backend/src/Service/Ip/IpAddressService.php @@ -7,6 +7,7 @@ use App\Service\Ip\Dto\PtrValidationDto; use App\Service\Ip\Dto\UpdateIpAddressDto; use App\Service\Ip\Event\IpAddressUpdatedEvent; +use App\Service\Ip\Event\IpRemovedEvent; use App\Service\Queue\QueueService; use Doctrine\ORM\EntityManagerInterface; use Symfony\Component\Clock\ClockAwareTrait; @@ -103,6 +104,8 @@ public function createIpAddress(Server $server, string $ipAddress): IpAddress public function deleteIpAddress(IpAddress $ipAddress): void { + $this->ed->dispatch(new IpRemovedEvent($ipAddress)); + $this->em->remove($ipAddress); $this->em->flush(); } diff --git a/backend/src/Service/Ip/IpRemovedListener.php b/backend/src/Service/Ip/IpRemovedListener.php new file mode 100644 index 00000000..b67b09a0 --- /dev/null +++ b/backend/src/Service/Ip/IpRemovedListener.php @@ -0,0 +1,30 @@ +getIpAddress()->getQueue(); + + if ($queue === null) { + return; + } + + $this->bus->dispatch(new RouteNullIpsMessage($queue->getId())); + } + +} diff --git a/backend/src/Service/Send/Message/RouteNullIpsMessage.php b/backend/src/Service/Send/Message/RouteNullIpsMessage.php new file mode 100644 index 00000000..e1766f8b --- /dev/null +++ b/backend/src/Service/Send/Message/RouteNullIpsMessage.php @@ -0,0 +1,16 @@ +queueService->getQueueById($message->queueId); + + if ($queue === null) { + return; + } + + $newIp = $this->ipAddressRepository->getRandomIpForQueue($queue); + + $this->sendRepository->updateNullIpSendsForQueue( + $queue->getId(), + $newIp?->getId() + ); + } + +} diff --git a/backend/src/Service/Send/MessageHandler/RouteQueueNullIpsToIpMessageHandler.php b/backend/src/Service/Send/MessageHandler/RouteQueueNullIpsToIpMessageHandler.php new file mode 100644 index 00000000..c60f0888 --- /dev/null +++ b/backend/src/Service/Send/MessageHandler/RouteQueueNullIpsToIpMessageHandler.php @@ -0,0 +1,40 @@ +ipAddressRepository->find($message->ipAddressId); + + if ($ipAddress === null) { + return; + } + + $queue = $ipAddress->getQueue(); + + if ($queue === null || $queue->getId() !== $message->queueId) { + return; + } + + $this->sendRepository->updateNullIpSendsForQueue( + $message->queueId, + $message->ipAddressId + ); + } + +} diff --git a/backend/tests/Api/Console/Send/GetSendByUuidTest.php b/backend/tests/Api/Console/Send/GetSendByUuidTest.php index 062972eb..df9f0049 100644 --- a/backend/tests/Api/Console/Send/GetSendByUuidTest.php +++ b/backend/tests/Api/Console/Send/GetSendByUuidTest.php @@ -55,6 +55,7 @@ public function test_get_specific_email(): void $this->assertArrayHasKey('id', $json); $this->assertSame($send->getId(), $json['id']); + $this->assertArrayHasKey('ip_address', $json); $attempts = $json['attempts']; $this->assertIsArray($attempts); diff --git a/backend/tests/Service/Ip/IpAddressQueueAssignedListenerTest.php b/backend/tests/Service/Ip/IpAddressQueueAssignedListenerTest.php new file mode 100644 index 00000000..1730c283 --- /dev/null +++ b/backend/tests/Service/Ip/IpAddressQueueAssignedListenerTest.php @@ -0,0 +1,81 @@ + $queue]); + + $updates = new UpdateIpAddressDto(); + $updates->queue = $queue->_real(); + + $event = new IpAddressUpdatedEvent( + $ipAddress->_real(), + $ipAddress->_real(), + $updates + ); + + $this->getEd()->dispatch($event); + + $transport = $this->transport('async'); + $sent = $transport->dispatched(); + + $this->assertCount(1, $sent); + $message = $sent->first()->getMessage(); + $this->assertInstanceOf(RouteQueueNullIpsToIpMessage::class, $message); + $this->assertSame($queue->getId(), $message->queueId); + $this->assertSame($ipAddress->getId(), $message->ipAddressId); + } + + public function test_no_message_when_queue_not_set(): void + { + $ipAddress = IpAddressFactory::createOne(); + + $updates = new UpdateIpAddressDto(); + + $event = new IpAddressUpdatedEvent( + $ipAddress->_real(), + $ipAddress->_real(), + $updates + ); + + $this->getEd()->dispatch($event); + + $transport = $this->transport('async'); + $this->assertCount(0, $transport->dispatched()); + } + + public function test_no_message_when_queue_is_null(): void + { + $ipAddress = IpAddressFactory::createOne(['queue' => null]); + + $updates = new UpdateIpAddressDto(); + $updates->queue = null; + + $event = new IpAddressUpdatedEvent( + $ipAddress->_real(), + $ipAddress->_real(), + $updates + ); + + $this->getEd()->dispatch($event); + + $transport = $this->transport('async'); + $this->assertCount(0, $transport->dispatched()); + } + +} diff --git a/backend/tests/Service/Ip/IpRemovedListenerTest.php b/backend/tests/Service/Ip/IpRemovedListenerTest.php new file mode 100644 index 00000000..6fc4a6a7 --- /dev/null +++ b/backend/tests/Service/Ip/IpRemovedListenerTest.php @@ -0,0 +1,46 @@ + $queue]); + + $event = new IpRemovedEvent($ipAddress->_real()); + $this->getEd()->dispatch($event); + + $transport = $this->transport('async'); + $sent = $transport->dispatched(); + + $this->assertCount(1, $sent); + $message = $sent->first()->getMessage(); + $this->assertInstanceOf(RouteNullIpsMessage::class, $message); + $this->assertSame($queue->getId(), $message->queueId); + } + + public function test_no_message_when_queue_is_null(): void + { + $ipAddress = IpAddressFactory::createOne(['queue' => null]); + + $event = new IpRemovedEvent($ipAddress->_real()); + $this->getEd()->dispatch($event); + + $transport = $this->transport('async'); + $this->assertCount(0, $transport->dispatched()); + } + +} diff --git a/backend/tests/Service/Send/MessageHandler/RouteNullIpsMessageHandlerTest.php b/backend/tests/Service/Send/MessageHandler/RouteNullIpsMessageHandlerTest.php new file mode 100644 index 00000000..30570388 --- /dev/null +++ b/backend/tests/Service/Send/MessageHandler/RouteNullIpsMessageHandlerTest.php @@ -0,0 +1,98 @@ + $queue]); + $ip2 = IpAddressFactory::createOne(['queue' => $queue]); + + $send = SendFactory::createOne([ + 'queue' => $queue, + 'ip_address' => $ip1, + ]); + + // Simulate IP removal (delete IP and nullify sends) + $this->em->remove($ip1->_real()); + $this->em->flush(); + + $transport = $this->transport('async'); + $transport->send(new RouteNullIpsMessage($queue->getId())); + $transport->throwExceptions()->process(); + + $this->em->clear(); + + /** @var SendRepository $sendRepo */ + $sendRepo = $this->em->getRepository(Send::class); + $updatedSend = $sendRepo->find($send->getId()); + + $this->assertNotNull($updatedSend); + $this->assertNotNull($updatedSend->getIpAddress()); + $this->assertSame($ip2->getId(), $updatedSend->getIpAddress()->getId()); + } + + public function test_leaves_null_when_no_ip_available(): void + { + $queue = QueueFactory::createOne(); + + $send = SendFactory::createOne([ + 'queue' => $queue, + 'ip_address' => null, + ]); + + $transport = $this->transport('async'); + $transport->send(new RouteNullIpsMessage($queue->getId())); + $transport->throwExceptions()->process(); + + $this->em->clear(); + + /** @var SendRepository $sendRepo */ + $sendRepo = $this->em->getRepository(Send::class); + $updatedSend = $sendRepo->find($send->getId()); + + $this->assertNotNull($updatedSend); + $this->assertNull($updatedSend->getIpAddress()); + } + + public function test_does_not_affect_sends_with_existing_ip(): void + { + $queue = QueueFactory::createOne(); + $ip1 = IpAddressFactory::createOne(['queue' => $queue]); + $ip2 = IpAddressFactory::createOne(['queue' => $queue]); + + $send = SendFactory::createOne([ + 'queue' => $queue, + 'ip_address' => $ip1, + ]); + + $transport = $this->transport('async'); + $transport->send(new RouteNullIpsMessage($queue->getId())); + $transport->throwExceptions()->process(); + + $this->em->clear(); + + /** @var SendRepository $sendRepo */ + $sendRepo = $this->em->getRepository(Send::class); + $updatedSend = $sendRepo->find($send->getId()); + + $this->assertNotNull($updatedSend); + $this->assertNotNull($updatedSend->getIpAddress()); + $this->assertSame($ip1->getId(), $updatedSend->getIpAddress()->getId()); + } + +} diff --git a/backend/tests/Service/Send/MessageHandler/RouteQueueNullIpsToIpMessageHandlerTest.php b/backend/tests/Service/Send/MessageHandler/RouteQueueNullIpsToIpMessageHandlerTest.php new file mode 100644 index 00000000..db2f98b8 --- /dev/null +++ b/backend/tests/Service/Send/MessageHandler/RouteQueueNullIpsToIpMessageHandlerTest.php @@ -0,0 +1,118 @@ + $queue]); + + $send = SendFactory::createOne([ + 'queue' => $queue, + 'ip_address' => null, + ]); + + $transport = $this->transport('async'); + $transport->send(new RouteQueueNullIpsToIpMessage($queue->getId(), $ipAddress->getId())); + $transport->throwExceptions()->process(); + + $this->em->clear(); + + /** @var SendRepository $sendRepo */ + $sendRepo = $this->em->getRepository(Send::class); + $updatedSend = $sendRepo->find($send->getId()); + + $this->assertNotNull($updatedSend); + $this->assertNotNull($updatedSend->getIpAddress()); + $this->assertSame($ipAddress->getId(), $updatedSend->getIpAddress()->getId()); + } + + public function test_does_not_affect_sends_with_existing_ip(): void + { + $queue = QueueFactory::createOne(); + $ip1 = IpAddressFactory::createOne(['queue' => $queue]); + $ip2 = IpAddressFactory::createOne(['queue' => $queue]); + + $send = SendFactory::createOne([ + 'queue' => $queue, + 'ip_address' => $ip1, + ]); + + $transport = $this->transport('async'); + $transport->send(new RouteQueueNullIpsToIpMessage($queue->getId(), $ip2->getId())); + $transport->throwExceptions()->process(); + + $this->em->clear(); + + /** @var SendRepository $sendRepo */ + $sendRepo = $this->em->getRepository(Send::class); + $updatedSend = $sendRepo->find($send->getId()); + + $this->assertNotNull($updatedSend); + $this->assertNotNull($updatedSend->getIpAddress()); + $this->assertSame($ip1->getId(), $updatedSend->getIpAddress()->getId()); + } + + public function test_no_op_when_ip_does_not_exist(): void + { + $queue = QueueFactory::createOne(); + + $send = SendFactory::createOne([ + 'queue' => $queue, + 'ip_address' => null, + ]); + + $transport = $this->transport('async'); + $transport->send(new RouteQueueNullIpsToIpMessage($queue->getId(), 99999)); + $transport->throwExceptions()->process(); + + $this->em->clear(); + + /** @var SendRepository $sendRepo */ + $sendRepo = $this->em->getRepository(Send::class); + $updatedSend = $sendRepo->find($send->getId()); + + $this->assertNotNull($updatedSend); + $this->assertNull($updatedSend->getIpAddress()); + } + + public function test_no_op_when_ip_belongs_to_different_queue(): void + { + $queue1 = QueueFactory::createOne(); + $queue2 = QueueFactory::createOne(); + $ipAddress = IpAddressFactory::createOne(['queue' => $queue2]); + + $send = SendFactory::createOne([ + 'queue' => $queue1, + 'ip_address' => null, + ]); + + $transport = $this->transport('async'); + $transport->send(new RouteQueueNullIpsToIpMessage($queue1->getId(), $ipAddress->getId())); + $transport->throwExceptions()->process(); + + $this->em->clear(); + + /** @var SendRepository $sendRepo */ + $sendRepo = $this->em->getRepository(Send::class); + $updatedSend = $sendRepo->find($send->getId()); + + $this->assertNotNull($updatedSend); + $this->assertNull($updatedSend->getIpAddress()); + } + +} diff --git a/frontend/src/routes/console/[id]/sends/[uuid]/Overview.svelte b/frontend/src/routes/console/[id]/sends/[uuid]/Overview.svelte index 09d0274c..da576ca8 100644 --- a/frontend/src/routes/console/[id]/sends/[uuid]/Overview.svelte +++ b/frontend/src/routes/console/[id]/sends/[uuid]/Overview.svelte @@ -120,6 +120,8 @@ + + diff --git a/frontend/src/routes/console/types.ts b/frontend/src/routes/console/types.ts index fd15c3c4..a66af05f 100644 --- a/frontend/src/routes/console/types.ts +++ b/frontend/src/routes/console/types.ts @@ -78,6 +78,7 @@ export type Send = { size_bytes: number; queued: boolean; send_after: number; + ip_address: string | null; recipients: SendRecipient[]; attempts: SendAttempt[];