github.com/ethereum/go-ethereum/event 包实现了一个事件发布订阅的库,使用接口主要是 event.Feed 类型,以前还有 event.TypeMux 类型,看代码注释,说过时了,目前主要使用 Feed 类型。
package main ?import ( ?"fmt" ?"sync" ?"github.com/ethereum/go-ethereum/event" ?) ?func main() { ?type someEvent struct{ I int } ?var feed event.Feed ?var wg sync.WaitGroup ?ch := make(chan someEvent) ?sub := feed.Subscribe(ch) ?wg.Add(1) ?go func() { ?defer wg.Done() ?for event := range ch { ?fmt.Printf("Received: %#v\n", event.I) ?} ?sub.Unsubscribe() ?fmt.Println("done") ?}() ?feed.Send(someEvent{5}) ?feed.Send(someEvent{10}) ?feed.Send(someEvent{7}) ?feed.Send(someEvent{14}) ?close(ch) ?wg.Wait() ?} |
通过调用 event.Feed 类型的Subscrible方法订阅事件通知,需要使用者提前指定接收事件的 channel,Subscribe 返回 Subscription 对象,是一个接口类型:
Err() 返回获取error 的channel,调用Unsubscribe()取消事件订阅。事件的发布者调用 Send() 方法,发送事件。type Subscription interface {
?Err() // returns the error channel
?Unsubscribe()
?// cancels sending of events, closing the error channel
?}
可以使用同一个channel实例,多次调用Feed 的Subscrible()方法:
这个例子中, 有三个订阅者, 有三个发送者, 每个发送者发送三次1, 同一个channel ch 里面被推送了9个1。package main
?import (
?"fmt"
?"sync"
?"github.com/ethereum/go-ethereum/event"
?)
?func main() {
?var (
?feed event.Feed
?recv sync.WaitGroup
?sender sync.WaitGroup
?)
?ch := make(chan int)
?feed.Subscribe(ch)
?feed.Subscribe(ch)
?feed.Subscribe(ch)
?expectSends := func(value, n int) {
?defer sender.Done()
?if nsent := feed.Send(value); nsent != n {
?fmt.Printf("send delivered %d times, want %d\n", nsent, n)
?}
?}
?expectRecv := func(wantValue, n int) {
?defer recv.Done()
?for v := range ch {
?if v != wantValue {
?fmt.Printf("received %d, want %d\n", v, wantValue)
?} else {
?fmt.Printf("recv v = %d\n", v)
?}
?}
?}
?sender.Add(3)
?for i := 0; i < 3; i++ {
?go expectSends(1, 3)
?}
?go func() {
?sender.Wait()
?close(ch)
?}()
?recv.Add(1)
?go expectRecv(1, 3)
?recv.Wait()
?}
ethereum event 库还提供了一些高级别的方便接口, 比如event.NewSubscription函数,接收一个函数类型,作为数据的生产者, producer本身在后台一个单独的goroutine内执行, 后台goroutine往用户的channel 发送数据:package main
?import (
?"fmt"
您可能想查找下面的文章: