49 lines
925 B
Go
49 lines
925 B
Go
package handlers
|
|
|
|
import (
|
|
"sync"
|
|
)
|
|
|
|
// Broker keeps, for each path, the set of channels registered for that path.
// The methods on Broker lock mu before reading or modifying subs.
type Broker struct {
	// mu guards subs.
	mu sync.Mutex

	// subs maps a path to the set of channels subscribed to that path.
	subs map[string]map[chan struct{}]struct{}
}
|
|
|
|
func NewBroker() *Broker {
|
|
return &Broker{subs: make(map[string]map[chan struct{}]struct{})}
|
|
}
|
|
|
|
func (b *Broker) Subscribe(path string) chan struct{} {
|
|
ch := make(chan struct{}, 1)
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
if b.subs[path] == nil {
|
|
b.subs[path] = make(map[chan struct{}]struct{})
|
|
}
|
|
b.subs[path][ch] = struct{}{}
|
|
return ch
|
|
}
|
|
|
|
func (b *Broker) Unsubscribe(path string, ch chan struct{}) {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
if set, ok := b.subs[path]; ok {
|
|
delete(set, ch)
|
|
if len(set) == 0 {
|
|
delete(b.subs, path)
|
|
}
|
|
}
|
|
close(ch)
|
|
}
|
|
|
|
func (b *Broker) Publish(path string) {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
for ch := range b.subs[path] {
|
|
select {
|
|
case ch <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
}
|