Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 29 additions & 25 deletions backend/push-notification-dispatcher/acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"epac/observability"

"github.com/aws/aws-lambda-go/events"
"github.com/jackc/pgx/v5"
)

type apnsStub struct {
Expand Down Expand Up @@ -55,42 +54,47 @@ func (s *apnsStub) snapshot() []map[string]any {
}

func TestAcceptancePushNotificationDispatcherCallsAPNsThroughCompositionRoot(t *testing.T) {
_testdb.WithTx(t, func(conn *pgx.Conn) {
t.Setenv("DATABASE_URL", os.Getenv("DATABASE_URL"))
conn := _testdb.Connect(t)
token := "test-token-epac-2275"
_, _ = conn.Exec(context.Background(), `DELETE FROM device_subscriptions WHERE token = $1`, token)
t.Cleanup(func() {
_, _ = conn.Exec(context.Background(), `DELETE FROM device_subscriptions WHERE token = $1`, token)
})

t.Setenv("DATABASE_URL", os.Getenv("DATABASE_URL"))

_testdb.SeedDeviceSubscription(t, conn, "test-token-123", "", nil, nil)
_testdb.SeedDeviceSubscription(t, conn, token, "", nil, nil)

apns := &apnsStub{}
server := httptest.NewServer(apns)
defer server.Close()
apns := &apnsStub{}
server := httptest.NewServer(apns)
defer server.Close()

t.Setenv("EPAC_APNS_URL", server.URL)
t.Setenv("EPAC_APNS_URL", server.URL)

payload := `{
payload := `{
"division_id": 42,
"parliament": 45,
"session": 1,
"result": "carried",
"status": "concluded"
}`

req := events.APIGatewayProxyRequest{
Body: payload,
}
req := events.APIGatewayProxyRequest{
Body: payload,
}

wrapped := observability.WrapAPIGateway(pipelineName, HandleRequest)
resp, err := wrapped(context.Background(), req)
if err != nil {
t.Fatalf("wrapped HandleRequest error: %v", err)
}
wrapped := observability.WrapAPIGateway(pipelineName, HandleRequest)
resp, err := wrapped(context.Background(), req)
if err != nil {
t.Fatalf("wrapped HandleRequest error: %v", err)
}

if resp.StatusCode != 202 {
t.Errorf("expected status 202, got %d. body=%v", resp.StatusCode, resp.Body)
}
if resp.StatusCode != 202 {
t.Errorf("expected status 202, got %d. body=%v", resp.StatusCode, resp.Body)
}

hits := apns.snapshot()
if len(hits) != 1 {
t.Fatalf("apns hit count = %d, want 1", len(hits))
}
})
hits := apns.snapshot()
if len(hits) != 1 {
t.Fatalf("apns hit count = %d, want 1", len(hits))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package apns

import (
"bytes"
"context"
"fmt"
"net/http"
"net/url"
"strings"

"epac/push-notification-dispatcher/internal/domain"
)

const DefaultBaseURL = "https://api.push.apple.com"

type Client struct {
baseURL string
httpClient *http.Client
}

func NewClient(baseURL string) *Client {
return NewClientWithHTTPClient(baseURL, nil)
}

func NewClientWithHTTPClient(baseURL string, httpClient *http.Client) *Client {
baseURL = strings.TrimSpace(baseURL)
if baseURL == "" {
baseURL = DefaultBaseURL
}
if httpClient == nil {
httpClient = http.DefaultClient
}
return &Client{baseURL: strings.TrimRight(baseURL, "/"), httpClient: httpClient}
}

func (c *Client) Deliver(ctx context.Context, subscription domain.DeviceSubscription, payload domain.PushNotificationPayload) error {
body, err := payload.JSON()
if err != nil {
return err
}

req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.endpoint(subscription.Token), bytes.NewReader(body))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")

resp, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
return fmt.Errorf("apns returned status %d", resp.StatusCode)
}
return nil
}

func (c *Client) endpoint(token domain.DeviceToken) string {
if !strings.Contains(c.baseURL, "apple") {
return c.baseURL
}
return fmt.Sprintf("%s/3/device/%s", c.baseURL, url.PathEscape(token.String()))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package postgres

import (
"context"
"errors"
"strings"

"epac/push-notification-dispatcher/internal/domain"

"github.com/jackc/pgx/v5"
)

func Connect(ctx context.Context, connStr string) (*pgx.Conn, error) {
connStr = strings.TrimSpace(connStr)
if connStr == "" {
return nil, errors.New("DATABASE_URL not set")
}
return pgx.Connect(ctx, connStr)
}

type DeviceSubscriptionRepository struct {
conn *pgx.Conn
}

func NewDeviceSubscriptionRepository(conn *pgx.Conn) *DeviceSubscriptionRepository {
return &DeviceSubscriptionRepository{conn: conn}
}

func (r *DeviceSubscriptionRepository) ListDeviceSubscriptions(ctx context.Context) ([]domain.DeviceSubscription, error) {
rows, err := r.conn.Query(ctx, `
SELECT
token,
COALESCE(my_mp_member_id, ''),
COALESCE(topic_ids, '{}'::text[]),
COALESCE(bill_ids, '{}'::text[])
FROM device_subscriptions
ORDER BY token`)
if err != nil {
return nil, err
}
defer rows.Close()

var subscriptions []domain.DeviceSubscription
for rows.Next() {
var (
token string
myMPMemberID string
topicIDs []string
billIDs []string
)
if err := rows.Scan(&token, &myMPMemberID, &topicIDs, &billIDs); err != nil {
return nil, err
}
subscriptions = append(subscriptions, domain.NewDeviceSubscription(token, myMPMemberID, topicIDs, billIDs))
}
if err := rows.Err(); err != nil {
return nil, err
}
return subscriptions, nil
}
103 changes: 103 additions & 0 deletions backend/push-notification-dispatcher/internal/domain/domain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package domain

import (
"bytes"
"encoding/json"
"errors"
"strings"
)

var ErrInvalidPushNotificationPayload = errors.New("invalid push notification payload")

type PushNotificationPayload struct {
DivisionID int `json:"division_id,omitempty"`
Parliament int `json:"parliament,omitempty"`
Session int `json:"session,omitempty"`
Result string `json:"result,omitempty"`
Status string `json:"status,omitempty"`
rawDocument json.RawMessage
}

func ParsePushNotificationPayload(raw []byte) (PushNotificationPayload, error) {
trimmed := bytes.TrimSpace(raw)
if len(trimmed) == 0 || trimmed[0] != '{' {
return PushNotificationPayload{}, ErrInvalidPushNotificationPayload
}

var fields struct {
DivisionID int `json:"division_id"`
Parliament int `json:"parliament"`
Session int `json:"session"`
Result string `json:"result"`
Status string `json:"status"`
}
if err := json.Unmarshal(trimmed, &fields); err != nil {
return PushNotificationPayload{}, ErrInvalidPushNotificationPayload
}

var compacted bytes.Buffer
if err := json.Compact(&compacted, trimmed); err != nil {
return PushNotificationPayload{}, ErrInvalidPushNotificationPayload
}

return PushNotificationPayload{
DivisionID: fields.DivisionID,
Parliament: fields.Parliament,
Session: fields.Session,
Result: fields.Result,
Status: fields.Status,
rawDocument: append(json.RawMessage(nil), compacted.Bytes()...),
}, nil
}

func (p PushNotificationPayload) Valid() bool {
return len(p.rawDocument) > 0
}

func (p PushNotificationPayload) JSON() ([]byte, error) {
if !p.Valid() {
return nil, ErrInvalidPushNotificationPayload
}
return append([]byte(nil), p.rawDocument...), nil
}

type DeviceToken string

func NewDeviceToken(value string) DeviceToken {
return DeviceToken(strings.TrimSpace(value))
}

func (t DeviceToken) String() string {
return string(t)
}

type DeviceSubscription struct {
Token DeviceToken
MyMPMemberID string
TopicIDs []string
BillIDs []string
}

func NewDeviceSubscription(token, myMPMemberID string, topicIDs, billIDs []string) DeviceSubscription {
return DeviceSubscription{
Token: NewDeviceToken(token),
MyMPMemberID: strings.TrimSpace(myMPMemberID),
TopicIDs: copyStrings(topicIDs),
BillIDs: copyStrings(billIDs),
}
}

type DispatchResult struct {
Subscriptions int
Delivered int
Failed int
}

func copyStrings(values []string) []string {
if len(values) == 0 {
return nil
}
out := make([]string, len(values))
copy(out, values)
return out
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package usecase

import (
"context"
"errors"

"epac/push-notification-dispatcher/internal/domain"
)

var ErrInvalidPayload = errors.New("invalid push notification payload")

type DeviceSubscriptionRepository interface {
ListDeviceSubscriptions(ctx context.Context) ([]domain.DeviceSubscription, error)
}

type PushNotificationClient interface {
Deliver(ctx context.Context, subscription domain.DeviceSubscription, payload domain.PushNotificationPayload) error
}

type DispatchPushNotification struct {
subscriptions DeviceSubscriptionRepository
client PushNotificationClient
}

func NewDispatchPushNotification(subscriptions DeviceSubscriptionRepository, client PushNotificationClient) *DispatchPushNotification {
return &DispatchPushNotification{subscriptions: subscriptions, client: client}
}

func (u *DispatchPushNotification) Execute(ctx context.Context, payload domain.PushNotificationPayload) (domain.DispatchResult, error) {
if !payload.Valid() {
return domain.DispatchResult{}, ErrInvalidPayload
}

subscriptions, err := u.subscriptions.ListDeviceSubscriptions(ctx)
if err != nil {
return domain.DispatchResult{}, err
}

result := domain.DispatchResult{Subscriptions: len(subscriptions)}
for _, subscription := range subscriptions {
if err := u.client.Deliver(ctx, subscription, payload); err != nil {
result.Failed++
continue
}
result.Delivered++
}

return result, nil
}
Loading
Loading