Publish & Subscribe

Redis provides a Publish/Subscribe messaging paradigm, and Redigo offers a convenient wrapper, PubSubConn, to work with it.

The PubSubConn Wrapper

A standard redis.Conn can be used for Pub/Sub, but it requires manual handling of message types. The PubSubConn struct wraps a redis.Conn and simplifies the process of receiving and identifying different kinds of notifications.

To create one, set its Conn field to an open connection:

c, err := redis.Dial("tcp", ":6379")
if err != nil {
    // handle err
}
defer c.Close()
psc := redis.PubSubConn{Conn: c}

Subscribing to Channels

You can subscribe to channels by name or by pattern.

  • psc.Subscribe(channel ...interface{}) error: Subscribes to one or more channels by their exact names.
  • psc.PSubscribe(channel ...interface{}) error: Subscribes to all channels whose names match the given glob-style patterns.

// Subscribe to 'channel1' and 'channel2'
if err := psc.Subscribe("channel1", "channel2"); err != nil {
    // handle err
}

// Subscribe to all channels starting with 'news:'
if err := psc.PSubscribe("news:*"); err != nil {
    // handle err
}

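You can unsubscribe with psc.Unsubscribe() and psc.PUnsubscribe(). Like the subscribe methods, they accept channel names or patterns; called with no arguments, they unsubscribe from all channels or all patterns, respectively.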
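
To get a feel for which channel names a glob-style pattern such as news:* would select, the sketch below uses Go's path.Match as a stand-in. It is only an approximation and not Redis's own matcher: path.Match never lets * or ? match a '/' character, whereas Redis patterns have no such restriction. The helper name matchesPattern is hypothetical and is not part of Redigo.

```go
package main

import (
	"fmt"
	"path"
)

// matchesPattern reports whether channel matches a glob-style pattern.
// Hypothetical helper: it approximates Redis pattern matching with
// path.Match, which differs from Redis in that '*' and '?' never match '/'.
// A malformed pattern is treated as "no match".
func matchesPattern(pattern, channel string) bool {
	ok, err := path.Match(pattern, channel)
	return err == nil && ok
}

func main() {
	for _, ch := range []string{"news:sports", "news:", "weather:today"} {
		fmt.Printf("%q matches news:*: %v\n", ch, matchesPattern("news:*", ch))
	}
}
```

The actual matching is done by the Redis server; a local check like this is useful only for reasoning about a pattern before subscribing.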

Receiving Notifications

Once subscribed, the connection is dedicated to Pub/Sub: apart from further subscribe, unsubscribe, and PING commands, it can only receive notifications, so use a separate connection to publish. The psc.Receive() method blocks until a notification arrives (or the read timeout expires) and returns it as an interface{}. Use a type switch to handle the different notification types.

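Receive returns one of three notification types, or an error if the connection fails:

  • redis.Message: A message published to a channel you are subscribed to. For pattern subscriptions, the Pattern field holds the pattern that matched.
  • redis.Subscription: A confirmation of a subscribe, unsubscribe, psubscribe, or punsubscribe request. Kind names the request, and Count is the number of subscriptions still active on the connection.
  • redis.Pong: A reply to a PING command sent on the subscribed connection.

for {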
    switch v := psc.Receive().(type) {
    case redis.Message:
        fmt.Printf("Channel: %s, Message: %s\n", v.Channel, v.Data)
    case redis.Subscription:
        fmt.Printf("Subscription: %s to %s (%d total)\n", v.Kind, v.Channel, v.Count)
    case error:
        fmt.Println("Error:", v)
        return
    }
}
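
The type switch can be extended to cover redis.Pong and pattern-matched messages as well. The following is a minimal sketch that runs without Redigo or a Redis server: Message, Subscription, and Pong are local stand-ins that mirror the fields of the corresponding Redigo types, and describe is a hypothetical helper. In real code the values would come from psc.Receive().

```go
package main

import "fmt"

// Local stand-ins mirroring the fields of redis.Message, redis.Subscription,
// and redis.Pong, so the sketch compiles without Redigo or a server.
type Message struct {
	Channel string
	Pattern string
	Data    []byte
}

type Subscription struct {
	Kind    string
	Channel string
	Count   int
}

type Pong struct{ Data string }

// describe is a hypothetical helper that formats a single notification.
func describe(n interface{}) string {
	switch v := n.(type) {
	case Message:
		if v.Pattern != "" {
			// Delivered through a pattern subscription.
			return fmt.Sprintf("message on %s (pattern %s): %s", v.Channel, v.Pattern, v.Data)
		}
		return fmt.Sprintf("message on %s: %s", v.Channel, v.Data)
	case Subscription:
		return fmt.Sprintf("%s %s (%d active)", v.Kind, v.Channel, v.Count)
	case Pong:
		return fmt.Sprintf("pong %q", v.Data)
	case error:
		return "error: " + v.Error()
	default:
		return fmt.Sprintf("unexpected %T", v)
	}
}

func main() {
	notifications := []interface{}{
		Subscription{Kind: "subscribe", Channel: "channel1", Count: 1},
		Message{Channel: "channel1", Data: []byte("hello")},
		Message{Channel: "news:go", Pattern: "news:*", Data: []byte("release")},
		Pong{},
		fmt.Errorf("connection lost"),
	}
	for _, n := range notifications {
		fmt.Println(describe(n))
	}
}
```

Keeping the formatting in one function, separate from the receive loop, makes each case easy to test in isolation.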

A Complete Example

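This example demonstrates a robust Pub/Sub listener with context-based cancellation and periodic health checks using PING. The read timeout is set slightly longer than the health check period, so if the server stops answering pings, Receive fails with a timeout and the listener returns the error.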

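import (
    "context"
    "time"

    "github.com/gomodule/redigo/redis"
)

// listenPubSubChannels subscribes to the given channels and calls onMessage
// for each message until ctx is cancelled or an error occurs.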
func listenPubSubChannels(ctx context.Context, onMessage func(channel string, data []byte) error, channels ...string) error {
    const healthCheckPeriod = time.Minute

    c, err := redis.Dial("tcp", ":6379",
        redis.DialReadTimeout(healthCheckPeriod+10*time.Second),
    )
    if err != nil {
        return err
    }
    defer c.Close()

    psc := redis.PubSubConn{Conn: c}
    if err := psc.Subscribe(redis.Args{}.AddFlat(channels)...); err != nil {
        return err
    }

    done := make(chan error, 1)

    // Goroutine to receive notifications.
    go func() {
        for {
            switch n := psc.Receive().(type) {
            case error:
                done <- n
                return
            case redis.Message:
                if err := onMessage(n.Channel, n.Data); err != nil {
                    done <- err
                    return
                }
            case redis.Subscription:
                if n.Count == 0 {
                    done <- nil
                    return
                }
            }
        }
    }()

    ticker := time.NewTicker(healthCheckPeriod)
    defer ticker.Stop()

loop:
    for {
        select {
        case <-ticker.C:
            // Send PING to check the health of the connection. If no PONG
            // arrives, Receive hits the read timeout and the goroutine exits.
            if err = psc.Ping(""); err != nil {
                break loop
            }
        case <-ctx.Done():
            break loop
        case err := <-done:
            // Return error from the receive goroutine.
            return err
        }
    }

    // Unsubscribe from all channels so the receive goroutine sees a
    // Subscription with Count == 0 and exits, then wait for it.
    if err := psc.Unsubscribe(); err != nil {
        return err
    }
    return <-done
}
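
The shutdown handshake in the listener (cancel, unsubscribe, then wait for the final confirmation) can be tried without a Redis server. The sketch below is a simplified model and not Redigo code: a Go channel replaces the connection, []byte values stand in for message payloads, the subscription type stands in for redis.Subscription, and listen, demo, and errClosed are hypothetical names. The health check ticker is left out to keep the focus on cancellation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// subscription stands in for redis.Subscription; only Count is modelled.
type subscription struct{ Count int }

// errClosed is reported when the notification stream ends unexpectedly.
var errClosed = errors.New("notification stream closed")

// listen follows the same shape as the listener above, with the Redis
// connection replaced by a channel. unsubscribe stands in for
// psc.Unsubscribe.
func listen(ctx context.Context, notifications <-chan interface{},
	unsubscribe func(), onMessage func([]byte) error) error {

	// Buffered so the goroutine can always deliver its result and exit.
	done := make(chan error, 1)

	go func() {
		for n := range notifications {
			switch v := n.(type) {
			case error:
				done <- v
				return
			case []byte:
				if err := onMessage(v); err != nil {
					done <- err
					return
				}
			case subscription:
				if v.Count == 0 {
					done <- nil // all subscriptions removed: clean exit
					return
				}
			}
		}
		done <- errClosed
	}()

	select {
	case <-ctx.Done():
	case err := <-done:
		return err
	}

	// Cancelled: request the final confirmation, then wait for the goroutine.
	unsubscribe()
	return <-done
}

// demo runs one cancel-and-drain cycle and reports what happened.
func demo() (received int, err error) {
	notifications := make(chan interface{}, 2)
	notifications <- []byte("hello")

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // request shutdown immediately

	err = listen(ctx, notifications,
		func() { notifications <- subscription{Count: 0} },
		func([]byte) error { received++; return nil },
	)
	return received, err
}

func main() {
	received, err := demo()
	fmt.Printf("received=%d err=%v\n", received, err)
}
```

Because the queued message is processed before the zero-count confirmation, nothing already delivered is dropped when the context is cancelled.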