Skip to content

Commit

Permalink
use pool
Browse files Browse the repository at this point in the history
  • Loading branch information
mattn committed Oct 7, 2024
1 parent c77a589 commit 73e909a
Showing 1 changed file with 16 additions and 39 deletions.
55 changes: 16 additions & 39 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,20 @@ var revision = "HEAD"

var (
//feedRelay = "wss://universe.nostrich.land/?lang=ja"
feedRelay = "wss://relay-jp.nostr.wirednet.jp"
feedRelays = []string{
"wss://relay-jp.nostr.wirednet.jp",
"wss://yabu.me",
}

postRelays = []string{
"wss://nostr-relay.nokotaro.com",
//"wss://nostr-relay.nokotaro.com",
"wss://relay-jp.nostr.wirednet.jp",
"wss://nostr.holybea.com",
"wss://relay.snort.social",
"wss://yabu.me",
//"wss://nostr.holybea.com",
//"wss://relay.snort.social",
"wss://relay.damus.io",
"wss://relay.nostrich.land",
"wss://nostr.h3z.jp",
//"wss://relay.nostrich.land",
//"wss://nostr.h3z.jp",
}

nsec string
Expand Down Expand Up @@ -207,26 +211,15 @@ func server(from *time.Time) {
enc := json.NewEncoder(os.Stdout)

log.Println("Connecting to relay")
relay, err := nostr.RelayConnect(context.Background(), feedRelay)
if err != nil {
log.Println(err)
return
}
defer relay.Close()

log.Println("Connected to relay")
pool := nostr.NewSimplePool(context.Background())

events := make(chan *nostr.Event, 100)
timestamp := nostr.Timestamp(from.Unix())
filters := []nostr.Filter{{
Kinds: []int{nostr.KindTextNote, nostr.KindChannelMessage},
Since: &timestamp,
}}
sub, err := relay.Subscribe(context.Background(), filters)
if err != nil {
log.Println(err)
return
}
sub := pool.SubMany(context.Background(), feedRelays, filters)

hbtimer := time.NewTicker(5 * time.Minute)
defer hbtimer.Stop()
Expand All @@ -236,7 +229,6 @@ func server(from *time.Time) {
go func(wg *sync.WaitGroup, events chan *nostr.Event) {
defer wg.Done()

retry := 0
log.Println("Start")
events_loop:
for {
Expand Down Expand Up @@ -271,33 +263,18 @@ func server(from *time.Time) {
continue events_loop
}
}
err = analyze(ev)
err := analyze(ev)
if err != nil {
log.Println(err)
continue
}
if ev.CreatedAt.Time().After(*from) {
*from = ev.CreatedAt.Time()
}
retry = 0
case <-hbtimer.C:
if url := os.Getenv("HEARTBEAT_URL"); url != "" {
go heartbeatPush(url)
}
case <-time.After(10 * time.Second):
if relay.ConnectionError != nil {
log.Println(err)
close(events)
sub.Unsub()
break events_loop
}
retry++
log.Println("Health check", retry)
if retry > 60 {
close(events)
sub.Unsub()
break events_loop
}
}
}
log.Println("Finish")
Expand All @@ -307,11 +284,11 @@ func server(from *time.Time) {

loop:
for {
ev, ok := <-sub.Events
if !ok || ev == nil {
ev, ok := <-sub
if !ok {
break loop
}
events <- ev
events <- ev.Event
}
wg.Wait()

Expand Down

0 comments on commit 73e909a

Please sign in to comment.