diff --git a/backend/live-vote-poller/internal/adapter/artifacts/repository.go b/backend/live-vote-poller/internal/adapter/artifacts/repository.go new file mode 100644 index 00000000..9890934a --- /dev/null +++ b/backend/live-vote-poller/internal/adapter/artifacts/repository.go @@ -0,0 +1,77 @@ +package artifacts + +import ( + "bytes" + "context" + "os" + "path/filepath" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" +) + +type Repository struct { + Dir string + Bucket string + Prefix string + Client *s3.Client +} + +func (r *Repository) Exists(ctx context.Context, key string) (bool, error) { + if r.Dir != "" { + path := filepath.Join(r.Dir, filepath.FromSlash(key)) + _, err := os.Stat(path) + if err == nil { + return true, nil + } + if os.IsNotExist(err) { + return false, nil + } + return false, err + } + + if r.Bucket == "" || r.Client == nil { + return false, nil + } + + s3Key := key + if r.Prefix != "" { + s3Key = r.Prefix + "/" + key + } + + _, err := r.Client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(r.Bucket), + Key: aws.String(s3Key), + }) + if err != nil { + return false, nil + } + return true, nil +} + +func (r *Repository) Write(ctx context.Context, key string, payload []byte) error { + if r.Dir != "" { + path := filepath.Join(r.Dir, filepath.FromSlash(key)) + if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { + return err + } + return os.WriteFile(path, payload, 0644) + } + + if r.Bucket == "" || r.Client == nil { + return nil + } + + s3Key := key + if r.Prefix != "" { + s3Key = r.Prefix + "/" + key + } + + _, err := r.Client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(r.Bucket), + Key: aws.String(s3Key), + Body: bytes.NewReader(payload), + ContentType: aws.String("application/json"), + }) + return err +} diff --git a/backend/live-vote-poller/internal/adapter/ourcommons/divisions_client.go b/backend/live-vote-poller/internal/adapter/ourcommons/divisions_client.go new file mode 100644 index 00000000..98031da4 --- /dev/null +++ b/backend/live-vote-poller/internal/adapter/ourcommons/divisions_client.go @@ -0,0 +1,41 @@ +package ourcommons + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + + "epac/backend/live-vote-poller/internal/usecase" +) + +type DivisionsClient struct { + URL string + Client *http.Client +} + +func (c *DivisionsClient) FetchDivisions(ctx context.Context) ([]usecase.Division, error) { + req, err := http.NewRequestWithContext(ctx, "GET", c.URL, nil) + if err != nil { + return nil, err + } + + resp, err := c.Client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("parliament API returned %d", resp.StatusCode) + } + + var data struct { + Divisions []usecase.Division `json:"divisions"` + } + if err := json.NewDecoder(resp.Body).Decode(&data); err != nil { + return nil, fmt.Errorf("decode parliament response: %w", err) + } + + return data.Divisions, nil +} diff --git a/backend/live-vote-poller/internal/adapter/push/dispatcher.go b/backend/live-vote-poller/internal/adapter/push/dispatcher.go new file mode 100644 index 00000000..5f2d2877 --- /dev/null +++ b/backend/live-vote-poller/internal/adapter/push/dispatcher.go @@ -0,0 +1,37 @@ +package push + +import ( + "bytes" + "context" + "fmt" + "net/http" +) + +type Dispatcher struct { + URL string + Client *http.Client +} + +func (d *Dispatcher) Dispatch(ctx context.Context, payload []byte) error { + if d.URL == "" { + return nil + } + + req, err := http.NewRequestWithContext(ctx, "POST", d.URL, bytes.NewReader(payload)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + res, err := d.Client.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + + if res.StatusCode != http.StatusAccepted && res.StatusCode != http.StatusOK { + return fmt.Errorf("dispatcher returned %d", res.StatusCode) + } + + return nil +} diff --git a/backend/live-vote-poller/internal/usecase/poll_live_divisions.go b/backend/live-vote-poller/internal/usecase/poll_live_divisions.go new file mode 100644 index 00000000..e3377706 --- /dev/null +++ b/backend/live-vote-poller/internal/usecase/poll_live_divisions.go @@ -0,0 +1,113 @@ +package usecase + +import ( + "context" + "encoding/json" + "fmt" + "time" +) + +type Division map[string]any + +type DivisionsFetching interface { + FetchDivisions(ctx context.Context) ([]Division, error) +} + +type ArtifactRepository interface { + Exists(ctx context.Context, key string) (bool, error) + Write(ctx context.Context, key string, payload []byte) error +} + +type PushDispatching interface { + Dispatch(ctx context.Context, payload []byte) error +} + +type Clock interface { + Now() time.Time +} + +type PollLiveDivisions struct { + Fetcher DivisionsFetching + Repository ArtifactRepository + Dispatcher PushDispatching + Clock Clock +} + +func (u *PollLiveDivisions) Execute(ctx context.Context) error { + if !isSittingHours(u.Clock.Now()) { + return nil + } + + divisions, err := u.Fetcher.FetchDivisions(ctx) + if err != nil { + return fmt.Errorf("fetch parliament divisions: %w", err) + } + + for _, div := range divisions { + status, _ := div["status"].(string) + if status != "concluded" { + continue + } + + divID, _ := div["division_id"].(float64) + parl, _ := div["parliament"].(float64) + sess, _ := div["session"].(float64) + + if divID == 0 || parl == 0 || sess == 0 { + // Skip malformed division + continue + } + + if err := u.processConcludedDivision(ctx, div, int(parl), int(sess), int(divID)); err != nil { + return fmt.Errorf("process division %d: %w", int(divID), err) + } + } + + return nil +} + +func isSittingHours(now time.Time) bool { + loc, err := time.LoadLocation("America/Toronto") + if err != nil { + return true // fail open if tzdata is missing + } + t := now.In(loc) + offset := t.Hour()*60 + t.Minute() + + switch t.Weekday() { + case time.Monday: + return offset >= 10*60+30 && offset <= 19*60 + case time.Tuesday, time.Wednesday, time.Thursday: + return offset >= 9*60+30 && offset <= 19*60 + case time.Friday: + return offset >= 9*60+30 && offset <= 15*60 + default: + return false + } +} + +func (u *PollLiveDivisions) processConcludedDivision(ctx context.Context, div Division, parl, sess, divID int) error { + key := fmt.Sprintf("votes/live/%d-%d-%d.json", parl, sess, divID) + + exists, err := u.Repository.Exists(ctx, key) + if err != nil { + return err + } + if exists { + return nil + } + + payload, err := json.Marshal(div) + if err != nil { + return err + } + if err := u.Repository.Write(ctx, key, payload); err != nil { + return err + } + + if err := u.Dispatcher.Dispatch(ctx, payload); err != nil { + return err + } + + return nil +} diff --git a/backend/live-vote-poller/main.go b/backend/live-vote-poller/main.go index 153db6b3..549e69dc 100644 --- a/backend/live-vote-poller/main.go +++ b/backend/live-vote-poller/main.go @@ -1,19 +1,18 @@ package main import ( - "bytes" "context" - "encoding/json" - "fmt" "net/http" "os" - "path/filepath" "time" + "epac/backend/live-vote-poller/internal/adapter/artifacts" + "epac/backend/live-vote-poller/internal/adapter/ourcommons" + "epac/backend/live-vote-poller/internal/adapter/push" + "epac/backend/live-vote-poller/internal/usecase" "epac/observability" "github.com/aws/aws-lambda-go/lambda" - "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/s3" ) @@ -26,188 +25,49 @@ func main() { var clock = time.Now -func isSittingHours(now time.Time) bool { - loc, err := time.LoadLocation("America/Toronto") - if err != nil { - return true // fail open if tzdata is missing - } - t := now.In(loc) - offset := t.Hour()*60 + t.Minute() +type funcClock func() time.Time - switch t.Weekday() { - case time.Monday: - return offset >= 10*60+30 && offset <= 19*60 - case time.Tuesday, time.Wednesday, time.Thursday: - return offset >= 9*60+30 && offset <= 19*60 - case time.Friday: - return offset >= 9*60+30 && offset <= 15*60 - default: - return false - } +func (f funcClock) Now() time.Time { + return f() } func HandleRequest(ctx context.Context) error { - if !isSittingHours(clock()) { - return nil - } - parliamentURL := os.Getenv("EPAC_PARLIAMENT_DIVISIONS_URL") if parliamentURL == "" { parliamentURL = "https://www.ourcommons.ca/members/en/votes/api/divisions" } - resp, err := http.Get(parliamentURL) - if err != nil { - return fmt.Errorf("fetch parliament divisions: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("parliament API returned %d", resp.StatusCode) - } - - var data struct { - Divisions []map[string]any `json:"divisions"` - } - if err := json.NewDecoder(resp.Body).Decode(&data); err != nil { - return fmt.Errorf("decode parliament response: %w", err) - } - - for _, div := range data.Divisions { - status, _ := div["status"].(string) - if status != "concluded" { - continue - } - - divID, _ := div["division_id"].(float64) - parl, _ := div["parliament"].(float64) - sess, _ := div["session"].(float64) - - if divID == 0 || parl == 0 || sess == 0 { - // Skip malformed division - continue - } - - if err := processConcludedDivision(ctx, div, int(parl), int(sess), int(divID)); err != nil { - return fmt.Errorf("process division %d: %w", int(divID), err) - } - } - - return nil -} - -func processConcludedDivision(ctx context.Context, div map[string]any, parl, sess, divID int) error { - key := fmt.Sprintf("votes/live/%d-%d-%d.json", parl, sess, divID) - - exists, err := artifactExists(ctx, key) - if err != nil { - return err - } - if exists { - return nil - } - - payload, err := json.Marshal(div) - if err != nil { - return err - } - if err := writeArtifact(ctx, key, payload); err != nil { - return err - } - dispatcherURL := os.Getenv("EPAC_PUSH_DISPATCHER_URL") - if dispatcherURL != "" { - req, err := http.NewRequestWithContext(ctx, "POST", dispatcherURL, bytes.NewReader(payload)) - if err != nil { - return err - } - req.Header.Set("Content-Type", "application/json") - res, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - defer res.Body.Close() - if res.StatusCode != http.StatusAccepted && res.StatusCode != http.StatusOK { - return fmt.Errorf("dispatcher returned %d", res.StatusCode) - } - } - - return nil -} - -func artifactExists(ctx context.Context, key string) (bool, error) { artifactsDir := os.Getenv("EPAC_ARTIFACTS_DIR") - if artifactsDir != "" { - path := filepath.Join(artifactsDir, filepath.FromSlash(key)) - _, err := os.Stat(path) - if err == nil { - return true, nil - } - if os.IsNotExist(err) { - return false, nil - } - return false, err - } - bucket := os.Getenv("EPAC_ARTIFACT_BUCKET") - if bucket == "" { - return false, nil - } - - cfg, err := config.LoadDefaultConfig(ctx) - if err != nil { - return false, err - } - client := s3.NewFromConfig(cfg) - prefix := os.Getenv("EPAC_ARTIFACT_PREFIX") - s3Key := key - if prefix != "" { - s3Key = prefix + "/" + key - } - - _, err = client.HeadObject(ctx, &s3.HeadObjectInput{ - Bucket: aws.String(bucket), - Key: aws.String(s3Key), - }) - if err != nil { - return false, nil - } - return true, nil -} -func writeArtifact(ctx context.Context, key string, payload []byte) error { - artifactsDir := os.Getenv("EPAC_ARTIFACTS_DIR") - if artifactsDir != "" { - path := filepath.Join(artifactsDir, filepath.FromSlash(key)) - if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { + var s3Client *s3.Client + if bucket != "" { + cfg, err := config.LoadDefaultConfig(ctx) + if err != nil { return err } - return os.WriteFile(path, payload, 0644) - } - - bucket := os.Getenv("EPAC_ARTIFACT_BUCKET") - if bucket == "" { - return nil - } - - cfg, err := config.LoadDefaultConfig(ctx) - if err != nil { - return err - } - client := s3.NewFromConfig(cfg) - - prefix := os.Getenv("EPAC_ARTIFACT_PREFIX") - s3Key := key - if prefix != "" { - s3Key = prefix + "/" + key - } - - _, err = client.PutObject(ctx, &s3.PutObjectInput{ - Bucket: aws.String(bucket), - Key: aws.String(s3Key), - Body: bytes.NewReader(payload), - ContentType: aws.String("application/json"), - }) - return err + s3Client = s3.NewFromConfig(cfg) + } + + uc := &usecase.PollLiveDivisions{ + Fetcher: &ourcommons.DivisionsClient{ + URL: parliamentURL, + Client: http.DefaultClient, + }, + Repository: &artifacts.Repository{ + Dir: artifactsDir, + Bucket: bucket, + Prefix: prefix, + Client: s3Client, + }, + Dispatcher: &push.Dispatcher{ + URL: dispatcherURL, + Client: http.DefaultClient, + }, + Clock: funcClock(clock), + } + + return uc.Execute(ctx) } diff --git a/docs/architecture/use-case-catalog.md b/docs/architecture/use-case-catalog.md index ebdd44e6..93ffcfe3 100644 --- a/docs/architecture/use-case-catalog.md +++ b/docs/architecture/use-case-catalog.md @@ -138,6 +138,9 @@ to the issue that will build the missing artifact. | `MembersSource` | backend Go | outbound | Implemented: `backend/members-indexer/internal/usecase/usecase.go`; adapter: `backend/members-indexer/internal/adapter/ourcommons/fetcher.go`. | Fetch House member roster details, biographies, attendance rows, and private member's bill sponsorship links for the relational members artifact. | | `SQLiteWriter` | backend Go | outbound | Implemented: `backend/members-indexer/internal/usecase/usecase.go`; adapter: `backend/members-indexer/internal/adapter/sqlite/writer.go`. | Write members, biographies, attendance records, PMB sponsorships, and build metadata into `members.db`. | | `SQLiteUploader` | backend Go | outbound | Implemented: `backend/members-indexer/internal/usecase/usecase.go`; adapter: `backend/members-indexer/internal/adapter/s3/s3.go`. | Upload the members SQLite artifact to S3 and return immutable size/hash metadata for the manifest. | +| `DivisionsFetching` | backend Go | outbound | Implemented: `backend/live-vote-poller/internal/usecase/poll_live_divisions.go`; adapter: `backend/live-vote-poller/internal/adapter/ourcommons/divisions_client.go`. | Fetch live parliamentary divisions from ourcommons.ca. | +| `ArtifactRepository` | backend Go | outbound | Implemented: `backend/live-vote-poller/internal/usecase/poll_live_divisions.go`; adapter: `backend/live-vote-poller/internal/adapter/artifacts/repository.go`. | Check existence and persist completed vote payload artifacts. | +| `PushDispatching` | backend Go | outbound | Implemented: `backend/live-vote-poller/internal/usecase/poll_live_divisions.go`; adapter: `backend/live-vote-poller/internal/adapter/push/dispatcher.go`. | Forward concluded division payloads to the push-notification dispatcher. | ## Use Cases @@ -1208,6 +1211,28 @@ Current implementation: --- +### PollLiveDivisions + +``` +Actor: System (cron scheduler) +Goal: Fetch active parliamentary divisions, identify concluded votes, save them as artifacts, and dispatch a push notification payload. +Inputs: Current time (to verify sitting hours). +Outputs: Concluded division artifacts written to storage, push notifications dispatched. +Entities / values: Division. +Ports: backend Go: `DivisionsFetching`, `ArtifactRepository`, `PushDispatching`, `Clock`. +Primary adapters: backend/live-vote-poller Lambda handler, ourcommons API client, S3/local-disk artifact repository, HTTP push dispatcher client. +Current implementation: + backend/live-vote-poller/main.go + backend/live-vote-poller/internal/usecase/poll_live_divisions.go + backend/live-vote-poller/internal/adapter/ourcommons/divisions_client.go + backend/live-vote-poller/internal/adapter/artifacts/repository.go + backend/live-vote-poller/internal/adapter/push/dispatcher.go +``` + +> Boundary rule: The usecase checks sitting hours and division status independently of how AWS schedules the lambda or how HTTP requests are formed. + +--- + ## Boundary Check Run locally to verify inward files do not import framework types: