Publish & Subscribe
Redis provides a Publish/Subscribe messaging paradigm, and Redigo offers a convenient wrapper, PubSubConn, to work with it.
The PubSubConn Wrapper
A standard redis.Conn can be used for Pub/Sub, but each reply must then be decoded and classified by hand. The PubSubConn struct wraps a redis.Conn and returns each notification as a typed value, so the caller can tell the kinds apart with a type switch.
To create one, set its Conn field to an open connection:
c, err := redis.Dial("tcp", ":6379")
if err != nil {
	// handle err
}
defer c.Close()
psc := redis.PubSubConn{Conn: c}
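For a sense of what the wrapper spares you, the sketch below decodes raw Pub/Sub replies by hand. It assumes a reply arrives as a slice whose first element names the kind, with bulk strings as []byte and integers as int64. The decodePush function and the lowercase message and subscription types are hypothetical stand-ins written for this illustration, not part of Redigo.

```go
package main

import "fmt"

// message and subscription are hypothetical stand-ins for the typed
// notifications a wrapper such as PubSubConn would return.
type message struct {
	Channel string
	Pattern string // set only for pattern-matched messages
	Data    []byte
}

type subscription struct {
	Kind    string
	Channel string
	Count   int
}

// decodePush classifies one raw Pub/Sub reply.
// Assumed layouts:
//   ["message", channel, payload]
//   ["pmessage", pattern, channel, payload]
//   [kind, channel, count] for (p)subscribe and (p)unsubscribe
func decodePush(reply []interface{}) (interface{}, error) {
	if len(reply) < 3 {
		return nil, fmt.Errorf("pubsub: reply has %d elements, want at least 3", len(reply))
	}
	// field returns element i as a string; anything that is not a bulk
	// string becomes "", which is good enough for a sketch.
	field := func(i int) string {
		b, _ := reply[i].([]byte)
		return string(b)
	}
	switch kind := field(0); kind {
	case "message":
		return message{Channel: field(1), Data: []byte(field(2))}, nil
	case "pmessage":
		if len(reply) < 4 {
			return nil, fmt.Errorf("pubsub: pmessage has %d elements, want 4", len(reply))
		}
		return message{Pattern: field(1), Channel: field(2), Data: []byte(field(3))}, nil
	case "subscribe", "unsubscribe", "psubscribe", "punsubscribe":
		n, ok := reply[2].(int64)
		if !ok {
			return nil, fmt.Errorf("pubsub: %s count has type %T, want int64", kind, reply[2])
		}
		return subscription{Kind: kind, Channel: field(1), Count: int(n)}, nil
	default:
		return nil, fmt.Errorf("pubsub: unknown kind %q", kind)
	}
}

func main() {
	raw := []interface{}{[]byte("pmessage"), []byte("news:*"), []byte("news:sports"), []byte("goal")}
	n, err := decodePush(raw)
	fmt.Printf("%+v %v\n", n, err)
}
```

With PubSubConn, none of this decoding is needed in application code: Receive hands back the typed value directly.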
Subscribing to Channels
You can subscribe to channels by name or by pattern.
psc.Subscribe(channel ...interface{}): Subscribes to one or more channels by their exact names.
psc.PSubscribe(pattern ...interface{}): Subscribes to channels matching the given glob-style patterns.
// Subscribe to 'channel1' and 'channel2'
psc.Subscribe("channel1", "channel2")
// Subscribe to all channels starting with 'news:'
psc.PSubscribe("news:*")
To unsubscribe, use psc.Unsubscribe() and psc.PUnsubscribe(). Called with no arguments, they unsubscribe from all channels or all patterns, respectively.
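To get a feel for which channel names a glob-style pattern selects, the sketch below tests a few names locally. It is only an approximation: the real matching happens on the Redis server, and path.Match from the Go standard library is used here as a stand-in. The two agree on `*`, `?`, and `[...]` for simple names, but path.Match never lets `*` or `?` match a `/`, so avoid relying on it for channel names that contain slashes. The matchChannel helper is hypothetical.

```go
package main

import (
	"fmt"
	"path"
)

// matchChannel reports whether channel matches a glob-style pattern.
// Assumption: path.Match approximates the server's matcher for names
// without '/'. A malformed pattern is treated as "no match".
func matchChannel(pattern, channel string) bool {
	ok, err := path.Match(pattern, channel)
	return err == nil && ok
}

func main() {
	for _, ch := range []string{"news:sports", "news:", "weather:today"} {
		fmt.Printf("%-14s matches news:* = %v\n", ch, matchChannel("news:*", ch))
	}
}
```

A subscriber to news:* would therefore see messages published to news:sports but not to weather:today.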
Receiving Notifications
Once subscribed, the connection enters a mode in which it can only issue Pub/Sub commands (and PING) and receive notifications. The psc.Receive() method blocks until a notification arrives or the read fails, and returns the result as an interface{}. Use a type switch to handle the different notification types.
Besides an error value on failure, Receive returns one of three notification types:
redis.Message: A message published to a channel you are subscribed to, either by name or through a pattern.
redis.Subscription: A confirmation of a subscribe, unsubscribe, psubscribe, or punsubscribe request.
redis.Pong: A reply to a PING command sent on the subscribed connection.
for {
	switch v := psc.Receive().(type) {
	case redis.Message:
		fmt.Printf("Channel: %s, Message: %s\n", v.Channel, v.Data)
	case redis.Subscription:
		fmt.Printf("Subscription: %s to %s (%d total)\n", v.Kind, v.Channel, v.Count)
	case error:
		fmt.Println("Error:", v)
		return
	}
}
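The loop above skips redis.Pong and does not distinguish pattern matches. The sketch below shows one way a handler might cover every case. So that it runs without a Redis server, it uses local stand-in types; their field names (Channel, Pattern, Data, Kind, Count) are assumed to mirror Redigo's and should be checked against the package documentation. The describe function is hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for redis.Message, redis.Subscription, and redis.Pong.
// The field names are assumptions made for this sketch.
type Message struct {
	Channel string
	Pattern string
	Data    []byte
}

type Subscription struct {
	Kind    string
	Channel string
	Count   int
}

type Pong struct {
	Data string
}

// describe turns one value, as Receive might return it, into a log line.
func describe(n interface{}) string {
	switch v := n.(type) {
	case Message:
		if v.Pattern != "" {
			return fmt.Sprintf("message on %s (pattern %s): %s", v.Channel, v.Pattern, v.Data)
		}
		return fmt.Sprintf("message on %s: %s", v.Channel, v.Data)
	case Subscription:
		return fmt.Sprintf("%s %s (%d active)", v.Kind, v.Channel, v.Count)
	case Pong:
		return fmt.Sprintf("pong %q", v.Data)
	case error:
		return "error: " + v.Error()
	default:
		return fmt.Sprintf("unexpected %T", n)
	}
}

func main() {
	for _, n := range []interface{}{
		Subscription{Kind: "psubscribe", Channel: "news:*", Count: 1},
		Message{Channel: "news:sports", Pattern: "news:*", Data: []byte("goal")},
		Pong{},
		errors.New("connection closed"),
	} {
		fmt.Println(describe(n))
	}
}
```

Including a default case is a defensive choice: it surfaces any value the switch does not expect instead of dropping it silently.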
A Complete Example
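This example shows a Pub/Sub listener with context-based cancellation and periodic health checks using PING. The read timeout is set slightly longer than the ping period, so if a PONG fails to arrive, Receive times out and the listener returns an error instead of blocking forever.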
import (
	"context"
	"time"

	"github.com/gomodule/redigo/redis"
)

// listenPubSubChannels listens for messages on Redis pubsub channels.
func listenPubSubChannels(ctx context.Context, onMessage func(channel string, data []byte) error, channels ...string) error {
	const healthCheckPeriod = time.Minute
	// The read timeout must exceed the ping period, or an idle but
	// healthy connection would time out between pings.
	c, err := redis.Dial("tcp", ":6379",
		redis.DialReadTimeout(healthCheckPeriod+10*time.Second),
	)
	if err != nil {
		return err
	}
	defer c.Close()
	psc := redis.PubSubConn{Conn: c}
	if err := psc.Subscribe(redis.Args{}.AddFlat(channels)...); err != nil {
		return err
	}
	// Buffered so the receive goroutine can report its result and exit
	// even if nothing is reading from the channel yet.
	done := make(chan error, 1)
	// Goroutine to receive notifications.
	go func() {
		for {
			switch n := psc.Receive().(type) {
			case error:
				done <- n
				return
			case redis.Message:
				if err := onMessage(n.Channel, n.Data); err != nil {
					done <- err
					return
				}
			case redis.Subscription:
				// A count of zero means every channel has been unsubscribed.
				if n.Count == 0 {
					done <- nil
					return
				}
			}
		}
	}()
	ticker := time.NewTicker(healthCheckPeriod)
	defer ticker.Stop()
loop:
	for err == nil {
		select {
		case <-ticker.C:
			// Send PING to check health of connection.
			if err = psc.Ping(""); err != nil {
				break loop
			}
		case <-ctx.Done():
			break loop
		case err := <-done:
			// Return error from the receive goroutine.
			return err
		}
	}
	// Unsubscribe and wait for goroutine to exit.
	psc.Unsubscribe() // nolint: errcheck
	return <-done
}
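The coordination pattern in this listener (a goroutine that blocks on receive and reports its outcome on a buffered channel, while the caller waits on either that channel or the context) can be tried without a Redis server. The sketch below is a simplified, hypothetical model of it: listen, drain, errSourceClosed, and the receive and stop callbacks are all invented for illustration, and the periodic PING is left out.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errSourceClosed is a hypothetical sentinel returned by the fake source
// once it has no more messages.
var errSourceClosed = errors.New("source closed")

// listen runs the blocking receive function in a goroutine and waits for
// either the context to end or the goroutine to report a result. The stop
// callback plays the role of Unsubscribe: it must make receive return.
func listen(ctx context.Context, receive func() (string, error), stop func(), onMessage func(string)) error {
	// Buffered so the goroutine can report its result and exit even if
	// the caller is not reading yet.
	done := make(chan error, 1)
	go func() {
		for {
			msg, err := receive()
			if err != nil {
				done <- err
				return
			}
			onMessage(msg)
		}
	}()

	select {
	case <-ctx.Done():
		// Ask the source to stop, then wait for the goroutine to exit.
		stop()
		return <-done
	case err := <-done:
		return err
	}
}

// drain feeds a fixed queue of messages through listen and returns what
// the callback saw, along with listen's result.
func drain(queue []string) ([]string, error) {
	var got []string
	receive := func() (string, error) {
		if len(queue) == 0 {
			return "", errSourceClosed
		}
		msg := queue[0]
		queue = queue[1:]
		return msg, nil
	}
	err := listen(context.Background(), receive, func() {}, func(m string) {
		got = append(got, m)
	})
	return got, err
}

func main() {
	got, err := drain([]string{"a", "b", "c"})
	fmt.Println(got, err) // prints "[a b c] source closed"
}
```

Waiting on done after calling stop is what keeps the goroutine from outliving the function, which is the same reason the listener above ends with return <-done.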