Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package artifacts

import (
"bytes"
"context"
"os"
"path/filepath"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
)

type Repository struct {
Dir string
Bucket string
Prefix string
Client *s3.Client
}

func (r *Repository) Exists(ctx context.Context, key string) (bool, error) {
if r.Dir != "" {
path := filepath.Join(r.Dir, filepath.FromSlash(key))
_, err := os.Stat(path)
if err == nil {
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return false, err
}

if r.Bucket == "" || r.Client == nil {
return false, nil
}

s3Key := key
if r.Prefix != "" {
s3Key = r.Prefix + "/" + key
}

_, err := r.Client.HeadObject(ctx, &s3.HeadObjectInput{
Bucket: aws.String(r.Bucket),
Key: aws.String(s3Key),
})
if err != nil {
return false, nil
}
return true, nil
}

func (r *Repository) Write(ctx context.Context, key string, payload []byte) error {
if r.Dir != "" {
path := filepath.Join(r.Dir, filepath.FromSlash(key))
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
return err
}
return os.WriteFile(path, payload, 0644)
}

if r.Bucket == "" || r.Client == nil {
return nil
}

s3Key := key
if r.Prefix != "" {
s3Key = r.Prefix + "/" + key
}

_, err := r.Client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(r.Bucket),
Key: aws.String(s3Key),
Body: bytes.NewReader(payload),
ContentType: aws.String("application/json"),
})
return err
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package ourcommons

import (
"context"
"encoding/json"
"fmt"
"net/http"

"epac/backend/live-vote-poller/internal/usecase"
)

type DivisionsClient struct {
URL string
Client *http.Client
}

func (c *DivisionsClient) FetchDivisions(ctx context.Context) ([]usecase.Division, error) {
req, err := http.NewRequestWithContext(ctx, "GET", c.URL, nil)
if err != nil {
return nil, err
}

resp, err := c.Client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("parliament API returned %d", resp.StatusCode)
}

var data struct {
Divisions []usecase.Division `json:"divisions"`
}
if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
return nil, fmt.Errorf("decode parliament response: %w", err)
}

return data.Divisions, nil
}
37 changes: 37 additions & 0 deletions backend/live-vote-poller/internal/adapter/push/dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package push

import (
"bytes"
"context"
"fmt"
"net/http"
)

type Dispatcher struct {
URL string
Client *http.Client
}

func (d *Dispatcher) Dispatch(ctx context.Context, payload []byte) error {
if d.URL == "" {
return nil
}

req, err := http.NewRequestWithContext(ctx, "POST", d.URL, bytes.NewReader(payload))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")

res, err := d.Client.Do(req)
if err != nil {
return err
}
defer res.Body.Close()

if res.StatusCode != http.StatusAccepted && res.StatusCode != http.StatusOK {
return fmt.Errorf("dispatcher returned %d", res.StatusCode)
}

return nil
}
113 changes: 113 additions & 0 deletions backend/live-vote-poller/internal/usecase/poll_live_divisions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package usecase

import (
"context"
"encoding/json"
"fmt"
"time"
)

type Division map[string]any

type DivisionsFetching interface {
FetchDivisions(ctx context.Context) ([]Division, error)
}

type ArtifactRepository interface {
Exists(ctx context.Context, key string) (bool, error)
Write(ctx context.Context, key string, payload []byte) error
}

type PushDispatching interface {
Dispatch(ctx context.Context, payload []byte) error
}

type Clock interface {
Now() time.Time
}

type PollLiveDivisions struct {
Fetcher DivisionsFetching
Repository ArtifactRepository
Dispatcher PushDispatching
Clock Clock
}

func (u *PollLiveDivisions) Execute(ctx context.Context) error {
if !isSittingHours(u.Clock.Now()) {
return nil
}

divisions, err := u.Fetcher.FetchDivisions(ctx)
if err != nil {
return fmt.Errorf("fetch parliament divisions: %w", err)
}

for _, div := range divisions {
status, _ := div["status"].(string)
if status != "concluded" {
continue
}

divID, _ := div["division_id"].(float64)
parl, _ := div["parliament"].(float64)
sess, _ := div["session"].(float64)

if divID == 0 || parl == 0 || sess == 0 {
// Skip malformed division
continue
}

if err := u.processConcludedDivision(ctx, div, int(parl), int(sess), int(divID)); err != nil {
return fmt.Errorf("process division %d: %w", int(divID), err)
}
}

return nil
}

func isSittingHours(now time.Time) bool {
loc, err := time.LoadLocation("America/Toronto")
if err != nil {
return true // fail open if tzdata is missing
}
t := now.In(loc)
offset := t.Hour()*60 + t.Minute()

switch t.Weekday() {
case time.Monday:
return offset >= 10*60+30 && offset <= 19*60
case time.Tuesday, time.Wednesday, time.Thursday:
return offset >= 9*60+30 && offset <= 19*60
case time.Friday:
return offset >= 9*60+30 && offset <= 15*60
default:
return false
}
}

func (u *PollLiveDivisions) processConcludedDivision(ctx context.Context, div Division, parl, sess, divID int) error {
key := fmt.Sprintf("votes/live/%d-%d-%d.json", parl, sess, divID)

exists, err := u.Repository.Exists(ctx, key)
if err != nil {
return err
}
if exists {
return nil
}

payload, err := json.Marshal(div)
if err != nil {
return err
}
if err := u.Repository.Write(ctx, key, payload); err != nil {
return err
}

if err := u.Dispatcher.Dispatch(ctx, payload); err != nil {
return err
}

return nil
}
Loading
Loading