gotelem/broker.go

77 lines
1.8 KiB
Go
Raw Normal View History

2023-05-09 15:25:01 +00:00
package gotelem
2023-05-25 18:01:50 +00:00
import (
"errors"
"sync"
2023-05-26 20:24:51 +00:00
"log/slog"
2023-05-29 00:39:03 +00:00
"github.com/kschamplin/gotelem/skylab"
2023-05-25 18:01:50 +00:00
)
2023-05-09 15:25:01 +00:00
2024-03-07 19:30:32 +00:00
// Broker is a Bus Event broadcast system. You can subscribe to events,
// and send events.
2023-06-30 04:59:16 +00:00
type Broker struct {
2023-05-29 00:39:03 +00:00
subs map[string]chan skylab.BusEvent // contains the channel for each subsciber
2023-05-25 18:01:50 +00:00
2023-05-29 00:39:03 +00:00
logger *slog.Logger
lock sync.RWMutex
2023-05-26 20:24:51 +00:00
bufsize int // size of chan buffer in elements.
}
2024-03-07 19:30:32 +00:00
// NewBroker creates a new broker with a given logger.
2023-06-30 04:59:16 +00:00
func NewBroker(bufsize int, logger *slog.Logger) *Broker {
return &Broker{
2023-05-29 00:39:03 +00:00
subs: make(map[string]chan skylab.BusEvent),
logger: logger,
2023-05-26 20:24:51 +00:00
bufsize: bufsize,
}
2023-05-25 18:01:50 +00:00
}
2024-03-07 19:30:32 +00:00
// Subscribe joins the broker with the given name. The name must be unique.
2023-06-30 04:59:16 +00:00
func (b *Broker) Subscribe(name string) (ch chan skylab.BusEvent, err error) {
2023-05-25 18:01:50 +00:00
// get rw lock.
b.lock.Lock()
defer b.lock.Unlock()
_, ok := b.subs[name]
if ok {
return nil, errors.New("name already in use")
}
2024-03-07 21:05:51 +00:00
b.logger.Info("subscribe", "name", name)
2023-05-29 00:39:03 +00:00
ch = make(chan skylab.BusEvent, b.bufsize)
2023-05-25 18:01:50 +00:00
2023-05-26 22:22:44 +00:00
b.subs[name] = ch
2023-05-25 18:01:50 +00:00
return
}
2024-03-07 19:30:32 +00:00
// Unsubscribe removes a subscriber matching the name. It doesn't do anything
// if there's nobody subscribed with that name
2023-06-30 04:59:16 +00:00
func (b *Broker) Unsubscribe(name string) {
2023-05-26 20:24:51 +00:00
// remove the channel from the map. We don't need to close it.
2023-05-25 18:01:50 +00:00
b.lock.Lock()
defer b.lock.Unlock()
2024-03-07 21:05:51 +00:00
b.logger.Debug("unsubscribe", "name", name)
2023-05-25 18:01:50 +00:00
delete(b.subs, name)
}
2024-03-07 19:30:32 +00:00
// Publish sends a bus event to all subscribers. It includes a sender
// string which prevents loopback.
2023-06-30 04:59:16 +00:00
func (b *Broker) Publish(sender string, message skylab.BusEvent) {
2023-05-26 20:24:51 +00:00
b.lock.RLock()
defer b.lock.RUnlock()
2024-03-07 21:05:51 +00:00
b.logger.Debug("publish", "sender", sender, "message", message)
2023-05-26 20:24:51 +00:00
for name, ch := range b.subs {
if name == sender {
continue
2023-05-25 18:01:50 +00:00
}
2023-05-26 20:24:51 +00:00
// non blocking send.
select {
case ch <- message:
default:
b.logger.Warn("recipient buffer full", "dest", name)
}
}
2023-05-25 18:01:50 +00:00
}