main.go
package main

import (
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	_ "github.com/mattn/go-sqlite3"
)
// scrapeStats tracks per-cycle scrape statistics.
type scrapeStats struct {
	feedsProcessed int64
	feedsCached    int64
	feedsErrored   int64
	feedsSkipped   int64
	postsPublished int64
}
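
// startWorkers runs one pass of the given job ("metadata" or "scrape")
// over every feed in the database, fanning the work out to maxWorkers
// goroutines via an unbuffered channel.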
func (a *Atomstr) startWorkers(work string) error {
	feeds, err := a.dbGetAllFeeds()
	if err != nil {
		return fmt.Errorf("failed to get feeds: %w", err)
	}
	if len(*feeds) == 0 {
		log.Println("[WARN] No feeds found")
	}
	log.Printf("[DEBUG] Start %s (%d feeds)", work, len(*feeds))

	if work == "scrape" {
		prunePublishedPosts(1 * time.Hour)
	}

	stats := &scrapeStats{}
	ch := make(chan feedStruct)
	wg := sync.WaitGroup{}

	// start the workers
	for t := 0; t < maxWorkers; t++ {
		wg.Add(1)
		switch work {
		case "metadata":
			go a.processFeedMetadata(ch, &wg, stats)
		default:
			go a.processFeedURL(ch, &wg, stats)
		}
	}

	// push each feed onto the channel for the workers to process
	for _, feedItem := range *feeds {
		ch <- feedItem
	}
	close(ch) // this causes the workers to exit their receive loop
	wg.Wait() // wait until they have all finished

	// log the cycle summary at INFO level
	if work == "scrape" {
		log.Printf("[INFO] Scrape complete: %d feeds (%d cached, %d errors, %d skipped), %d posts published",
			stats.feedsProcessed, stats.feedsCached, stats.feedsErrored, stats.feedsSkipped, stats.postsPublished)
	} else {
		log.Printf("[INFO] Metadata update complete: %d feeds (%d errors)",
			stats.feedsProcessed, stats.feedsErrored)
	}
	return nil
}
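
// Usage sketch, assuming the binary is built as "atomstr" (a hypothetical
// name taken from the atomstrVersion variable; adjust to your build):
//
//	atomstr -a https://example.com/feed.xml   # add a feed to scrape
//	atomstr -d <feed url>                     # remove a feed from the db
//	atomstr -l                                # list all feeds with npubs
//	atomstr -v                                # show the version
//	atomstr [--dry-run]                       # run as a daemon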
func main() {
	dryRun := flag.Bool("dry-run", false, "Enable dry-run mode (log JSON instead of publishing to relays)")
	feedNew := flag.String("a", "", "Add a new URL to scrape")
	feedDelete := flag.String("d", "", "Remove a feed from db")
	flag.Bool("l", false, "List all feeds with npubs")
	flag.Bool("v", false, "Shows version")
	flag.Parse()
	dryRunMode = *dryRun

	// record which flags were explicitly set on the command line
	flagset := make(map[string]bool)
	flag.Visit(func(f *flag.Flag) { flagset[f.Name] = true })

	logger()
	a := &Atomstr{db: dbInit()}

	if flagset["a"] {
		a.addSource(*feedNew)
	} else if flagset["l"] {
		if err := a.listFeeds(); err != nil {
			log.Printf("[ERROR] %v", err)
		}
	} else if flagset["d"] {
		if err := a.deleteSource(*feedDelete); err != nil {
			log.Printf("[ERROR] %v", err)
		}
	} else if flagset["v"] {
		log.Println("[INFO] atomstr version", atomstrVersion)
	} else {
		log.Println("[INFO] Starting atomstr v", atomstrVersion)
		go a.webserver()

		// first run: update metadata and scrape once before the tickers take over
		if err := a.startWorkers("metadata"); err != nil {
			log.Printf("[ERROR] %v", err)
		}
		if err := a.startWorkers("scrape"); err != nil {
			log.Printf("[ERROR] %v", err)
		}
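
		// periodic re-runs: metadata refresh and feed scraping on independent tickers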
		metadataTicker := time.NewTicker(metadataInterval)
		updateTicker := time.NewTicker(fetchInterval)

		// catch SIGTERM or SIGINT for a clean shutdown
		cancelChan := make(chan os.Signal, 1)
		signal.Notify(cancelChan, syscall.SIGTERM, syscall.SIGINT)
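
		// re-run the matching worker pass whenever either ticker fires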
		go func() {
			for {
				select {
				case <-metadataTicker.C:
					if err := a.startWorkers("metadata"); err != nil {
						log.Printf("[ERROR] %v", err)
					}
				case <-updateTicker.C:
					if err := a.startWorkers("scrape"); err != nil {
						log.Printf("[ERROR] %v", err)
					}
				}
			}
		}()

		// block until a shutdown signal arrives, then clean up
		sig := <-cancelChan
		log.Printf("[DEBUG] Caught signal %v", sig)
		metadataTicker.Stop()
		updateTicker.Stop()
		log.Println("[INFO] Closing DB")
		a.db.Close()
		log.Println("[INFO] Shutting down")
	}
}