Compare commits
No commits in common. "cf112ef561151c848cbf59890e0c127a60ddf674" and "d5381a3c33a85f2873a38be0e233759e0a287d7c" have entirely different histories.
cf112ef561
...
d5381a3c33
|
@ -37,7 +37,7 @@ func (b *Broker) Subscribe(name string) (ch chan skylab.BusEvent, err error) {
|
||||||
if ok {
|
if ok {
|
||||||
return nil, errors.New("name already in use")
|
return nil, errors.New("name already in use")
|
||||||
}
|
}
|
||||||
b.logger.Info("subscribe", "name", name)
|
b.logger.Info("new subscriber", "name", name)
|
||||||
ch = make(chan skylab.BusEvent, b.bufsize)
|
ch = make(chan skylab.BusEvent, b.bufsize)
|
||||||
|
|
||||||
b.subs[name] = ch
|
b.subs[name] = ch
|
||||||
|
@ -51,7 +51,6 @@ func (b *Broker) Unsubscribe(name string) {
|
||||||
// remove the channel from the map. We don't need to close it.
|
// remove the channel from the map. We don't need to close it.
|
||||||
b.lock.Lock()
|
b.lock.Lock()
|
||||||
defer b.lock.Unlock()
|
defer b.lock.Unlock()
|
||||||
b.logger.Debug("unsubscribe", "name", name)
|
|
||||||
delete(b.subs, name)
|
delete(b.subs, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,7 +59,6 @@ func (b *Broker) Unsubscribe(name string) {
|
||||||
func (b *Broker) Publish(sender string, message skylab.BusEvent) {
|
func (b *Broker) Publish(sender string, message skylab.BusEvent) {
|
||||||
b.lock.RLock()
|
b.lock.RLock()
|
||||||
defer b.lock.RUnlock()
|
defer b.lock.RUnlock()
|
||||||
b.logger.Debug("publish", "sender", sender, "message", message)
|
|
||||||
for name, ch := range b.subs {
|
for name, ch := range b.subs {
|
||||||
if name == sender {
|
if name == sender {
|
||||||
continue
|
continue
|
||||||
|
@ -68,6 +66,7 @@ func (b *Broker) Publish(sender string, message skylab.BusEvent) {
|
||||||
// non blocking send.
|
// non blocking send.
|
||||||
select {
|
select {
|
||||||
case ch <- message:
|
case ch <- message:
|
||||||
|
b.logger.Debug("sent message", "dest", name, "src", sender)
|
||||||
default:
|
default:
|
||||||
b.logger.Warn("recipient buffer full", "dest", name)
|
b.logger.Warn("recipient buffer full", "dest", name)
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,7 +92,7 @@ func serve(cCtx *cli.Context) error {
|
||||||
broker := gotelem.NewBroker(20, logger.WithGroup("broker"))
|
broker := gotelem.NewBroker(20, logger.WithGroup("broker"))
|
||||||
|
|
||||||
// open database
|
// open database
|
||||||
dbPath := "gotelem.db"
|
dbPath := "file::memory:?cache=shared"
|
||||||
if cCtx.IsSet("db") {
|
if cCtx.IsSet("db") {
|
||||||
dbPath = cCtx.Path("db")
|
dbPath = cCtx.Path("db")
|
||||||
}
|
}
|
||||||
|
|
|
@ -144,8 +144,6 @@ func (sck *CanSocket) Send(msg *can.Frame) error {
|
||||||
idToWrite |= unix.CAN_RTR_FLAG
|
idToWrite |= unix.CAN_RTR_FLAG
|
||||||
case can.CanErrFrame:
|
case can.CanErrFrame:
|
||||||
return errors.New("you can't send error frames")
|
return errors.New("you can't send error frames")
|
||||||
case can.CanDataFrame:
|
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return errors.New("unknown frame type")
|
return errors.New("unknown frame type")
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,7 +51,7 @@ func TestCanSocket(t *testing.T) {
|
||||||
err := sock.Send(testFrame)
|
err := sock.Send(testFrame)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue