Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.obsidian
user.creds
.DS_STORE
65 changes: 65 additions & 0 deletions 21-nats-overview/client/cli-durable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package client

import (
"bufio"
"fmt"
"os"
"strings"

"github.com/nats-io/nats.go"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use the new jetstream api at github.com/nats-io/nats.go/jetstream

)

func RunCLIDurable() {
// Connect to NATS
fmt.Println(nats.DefaultURL)
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
fmt.Println(err)
return
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
fmt.Println(err)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sure you log.Fatal here instead

}

_, err = js.AddStream(&nats.StreamConfig{
Name: "CHAT",
Subjects: []string{"chat.>"},
})
if err != nil {
fmt.Println(err)
}

// Subscribe to chat messages
nc.Subscribe("chat.messages", func(m *nats.Msg) {
fmt.Printf("Received: %s\n", string(m.Data))
})

// Retrieve chat history
consumer, err := js.PullSubscribe("chat.messages", "")
if err != nil {
fmt.Println(err)
return
}

msgs, err := consumer.Fetch(100)
if err != nil {
fmt.Println(err)
} else {
for _, msg := range msgs {
fmt.Printf("Received: %s\n", string(msg.Data))
}
}

// Publish chat messages
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
msg := scanner.Text()
if strings.ToLower(msg) == "quit" {
break
}
js.Publish("chat.messages", []byte(msg))
}
}
35 changes: 35 additions & 0 deletions 21-nats-overview/client/cli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package client
import (
"bufio"
"fmt"
"os"
"strings"

"github.com/nats-io/nats.go"
)

func RunCLI() {
// Connect to NATS
fmt.Println(nats.DefaultURL)
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
fmt.Println(err)
return
}
defer nc.Close()

// Subscribe to chat messages
nc.Subscribe("chat", func(m *nats.Msg) {
fmt.Printf("Received: %s\n", string(m.Data))
})

// Publish chat messages
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
msg := scanner.Text()
if strings.ToLower(msg) == "quit" {
break
}
nc.Publish("chat", []byte(msg))
}
}
39 changes: 39 additions & 0 deletions 21-nats-overview/client/twitch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package client

import (
"fmt"

"github.com/gempir/go-twitch-irc/v4"
"github.com/nats-io/nats.go"
)

func RunTwitch(channel string) {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
fmt.Println(err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
fmt.Println(err)
}

client := twitch.NewAnonymousClient()

client.OnPrivateMessage(func(message twitch.PrivateMessage) {
chatMessage := fmt.Sprintf("%s: %s (twitch)", message.User.DisplayName, message.Message)

_, err := js.Publish("chat.messages", []byte(chatMessage))
if err != nil {
fmt.Println("Error publishing to NATS:", err)
}
})

client.Join(channel)

err = client.Connect()
if err != nil {
panic(err)
}
}
85 changes: 85 additions & 0 deletions 21-nats-overview/client/web-durable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package client

import (
"fmt"
"net/http"

"github.com/gorilla/websocket"
"github.com/nats-io/nats.go"
)

var upgraderJS = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}

func RunWebDurable() {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
fmt.Println(err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
fmt.Println(err)
}

_, err = js.AddStream(&nats.StreamConfig{
Name: "CHAT",
Subjects: []string{"chat.>"},
})
if err != nil {
fmt.Println(err)
}

http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
conn, err := upgraderJS.Upgrade(w, r, nil)
if err != nil {
fmt.Println(err)
return
}
defer conn.Close()

sub, err := nc.Subscribe("chat.messages", func(msg *nats.Msg) {
conn.WriteMessage(websocket.TextMessage, msg.Data)
})
if err != nil {
fmt.Println(err)
return
}
defer sub.Unsubscribe()

// Retrieve chat history
consumer, err := js.PullSubscribe("chat.messages", "")
if err != nil {
fmt.Println(err)
return
}

msgs, err := consumer.Fetch(100)
if err != nil {
fmt.Println(err)
} else {
for _, msg := range msgs {
conn.WriteMessage(websocket.TextMessage, msg.Data)
}
}

for {
_, msg, err := conn.ReadMessage()
if err != nil {
fmt.Println(err)
break
}
_, err = js.Publish("chat.messages", msg)
if err != nil {
fmt.Println(err)
}
}
})

http.Handle("/", http.FileServer(http.Dir("./static")))
fmt.Println(http.ListenAndServe(":8080", nil))
}
53 changes: 53 additions & 0 deletions 21-nats-overview/client/web.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package client

import (
"log"
"net/http"

"github.com/gorilla/websocket"
"github.com/nats-io/nats.go"
)

var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}

func RunWeb() {
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Close()

http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
defer conn.Close()

sub, err := nc.Subscribe("chat", func(msg *nats.Msg) {
conn.WriteMessage(websocket.TextMessage, msg.Data)
})
if err != nil {
log.Println(err)
return
}
defer sub.Unsubscribe()

for {
_, message, err := conn.ReadMessage()
if err != nil {
log.Println(err)
break
}
nc.Publish("chat", message)
}
})

http.Handle("/", http.FileServer(http.Dir("./static")))
log.Fatal(http.ListenAndServe(":8080", nil))
}
19 changes: 19 additions & 0 deletions 21-nats-overview/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
module nats-chat

go 1.23.1

require (
github.com/gempir/go-twitch-irc/v4 v4.0.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/minio/highwayhash v1.0.3 // indirect
github.com/nats-io/jwt/v2 v2.5.8 // indirect
github.com/nats-io/nats-server/v2 v2.10.21 // indirect
github.com/nats-io/nats.go v1.37.0 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
go.uber.org/automaxprocs v1.5.3 // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/time v0.6.0 // indirect
)
33 changes: 33 additions & 0 deletions 21-nats-overview/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
github.com/gempir/go-twitch-irc/v4 v4.0.0 h1:sHVIvbWOv9nHXGEErilclxASv0AaQEr/r/f9C0B9aO8=
github.com/gempir/go-twitch-irc/v4 v4.0.0/go.mod h1:QsOMMAk470uxQ7EYD9GJBGAVqM/jDrXBNbuePfTauzg=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
github.com/nats-io/jwt/v2 v2.5.8/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats-server/v2 v2.10.21 h1:gfG6T06wBdI25XyY2IsauarOc2srWoFxxfsOKjrzoRA=
github.com/nats-io/nats-server/v2 v2.10.21/go.mod h1:I1YxSAEWbXCfy0bthwvNb5X43WwIWMz7gx5ZVPDr5Rc=
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
38 changes: 38 additions & 0 deletions 21-nats-overview/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"fmt"
"os"

"nats-chat/client"
)

func main() {
if len(os.Args) < 2 {
fmt.Println("Usage: go run main.go [cli|web]")
return
}

clientType := os.Args[1]
switch clientType {
case "cli":
client.RunCLI()
case "cli-durable":
client.RunCLIDurable()
case "web":
client.RunWeb()
case "web-durable":
client.RunWebDurable()
case "twitch":
if len(os.Args) < 3 {
fmt.Println("Please provide channel name for twitch")
return
}
channelName := os.Args[2]
client.RunTwitch(channelName)
fmt.Printf("Connecting to channel: %s\n", channelName)
default:
fmt.Printf("Unknown client type: %s\n", clientType)
fmt.Println("Usage: go run main.go [cli|web]")
}
}
Loading