service Deliver { // deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with // Payload data as a marshaled orderer.SeekInfo message, // then a stream of block replies is received rpc Deliver (stream common.Envelope) returns (stream DeliverResponse) { } // deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with // Payload data as a marshaled orderer.SeekInfo message, // then a stream of **filtered** block replies is received rpc DeliverFiltered (stream common.Envelope) returns (stream DeliverResponse) { } }
// DeliverClient is the client API for Deliver service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type DeliverClient interface { // deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with // Payload data as a marshaled orderer.SeekInfo message, // then a stream of block replies is received Deliver(ctx context.Context, opts ...grpc.CallOption) (Deliver_DeliverClient, error) // deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with // Payload data as a marshaled orderer.SeekInfo message, // then a stream of **filtered** block replies is received DeliverFiltered(ctx context.Context, opts ...grpc.CallOption) (Deliver_DeliverFilteredClient, error) }
DeliverServer是服务端的接口,需要Peer实现。
1 2 3 4 5 6 7 8 9 10 11
// DeliverServer is the server API for Deliver service. type DeliverServer interface { // deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with // Payload data as a marshaled orderer.SeekInfo message, // then a stream of block replies is received Deliver(Deliver_DeliverServer) error // deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with // Payload data as a marshaled orderer.SeekInfo message, // then a stream of **filtered** block replies is received DeliverFiltered(Deliver_DeliverFilteredServer) error }
// server holds the dependencies necessary to create a deliver server type server struct { dh *deliver.Handler policyCheckerProvider PolicyCheckerProvider }
// Iterator is useful for a chain Reader to stream blocks as they are created type Iterator interface { // Next blocks until there is a new block available, or returns an error if // the next block is no longer retrievable Next() (*cb.Block, cb.Status) // Close releases resources acquired by the Iterator Close() }
// Next blocks until there is a new block available, or returns an error if the // next block is no longer retrievable func(cu *cursor)Next()(*cb.Block, cb.Status) { // This only loops once, as signal reading indicates non-nil next // 实际只执行1次 for { // 拿到区块 next := cu.list.getNext() if next != nil { cu.list = next return cu.list.block, cb.Status_SUCCESS } <-cu.list.signal } }
// deliverProvider is the connection provider used for connecting to the Deliver service var deliverProvider = func(context fabcontext.Client, chConfig fab.ChannelCfg, peer fab.Peer)(api.Connection, error) { if peer == nil { returnnil, errors.New("Peer is nil") }
eventEndpoint, ok := peer.(api.EventEndpoint) if !ok { panic("peer is not an EventEndpoint") } return deliverconn.New(context, chConfig, deliverconn.Deliver, peer.URL(), eventEndpoint.Opts()...) }
// Dispatcher is responsible for handling all events, including connection and registration events originating from the client, // and events originating from the channel event service. All events are processed in a single Go routine // in order to avoid any race conditions and to ensure that events are processed in the order in which they are received. // This also avoids the need for synchronization. // The lastBlockNum member MUST be first to ensure it stays 64-bit aligned on 32-bit machines. type Dispatcher struct { lastBlockNum uint64// Must be first, do not move params updateLastBlockInfoOnly bool state int32 eventch chaninterface{} blockRegistrations []*BlockReg filteredBlockRegistrations []*FilteredBlockReg handlers map[reflect.Type]Handler txRegistrations map[string]*TxStatusReg ccRegistrations map[string]*ChaincodeReg }
注册事件
这是Dispatcher的事件注册函数,在它眼里,不止有4个事件:
1 2 3 4 5 6 7 8 9 10
// RegisterHandler registers an event handler func(ed *Dispatcher)RegisterHandler(t interface{}, h Handler) { htype := reflect.TypeOf(t) if _, ok := ed.handlers[htype]; !ok { logger.Debugf("Registering handler for %s on dispatcher %T", htype, ed) ed.handlers[htype] = h } else { logger.Debugf("Cannot register handler %s on dispatcher %T since it's already registered", htype, ed) } }
注册各注册事件的处理函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// RegisterHandlers registers all of the handlers by event type func(ed *Dispatcher)RegisterHandlers() { ed.RegisterHandler(&RegisterChaincodeEvent{}, ed.handleRegisterCCEvent) ed.RegisterHandler(&RegisterTxStatusEvent{}, ed.handleRegisterTxStatusEvent) ed.RegisterHandler(&RegisterBlockEvent{}, ed.handleRegisterBlockEvent) ed.RegisterHandler(&RegisterFilteredBlockEvent{}, ed.handleRegisterFilteredBlockEvent) ed.RegisterHandler(&UnregisterEvent{}, ed.handleUnregisterEvent) ed.RegisterHandler(&StopEvent{}, ed.HandleStopEvent) ed.RegisterHandler(&TransferEvent{}, ed.HandleTransferEvent) ed.RegisterHandler(&StopAndTransferEvent{}, ed.HandleStopAndTransferEvent) ed.RegisterHandler(&RegistrationInfoEvent{}, ed.handleRegistrationInfoEvent)
// The following events are used for testing only ed.RegisterHandler(&fab.BlockEvent{}, ed.handleBlockEvent) ed.RegisterHandler(&fab.FilteredBlockEvent{}, ed.handleFilteredBlockEvent) }
接收Peer事件
handleEvent用来处理来自Peer的事件,不同的类型调用不同的handler。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
func(ed *Dispatcher)handleEvent(e esdispatcher.Event) { delevent := e.(*connection.Event) evt := delevent.Event.(*pb.DeliverResponse) switch response := evt.Type.(type) { case *pb.DeliverResponse_Status: ed.handleDeliverResponseStatus(response) case *pb.DeliverResponse_Block: ed.HandleBlock(response.Block, delevent.SourceURL) case *pb.DeliverResponse_FilteredBlock: ed.HandleFilteredBlock(response.FilteredBlock, delevent.SourceURL) default: logger.Errorf("handler not found for deliver response type %T", response) } }
func(ed *Dispatcher)publishBlockEvents(block *cb.Block, sourceURL string) { for _, reg := range ed.blockRegistrations { if !reg.Filter(block) { logger.Debugf("Not sending block event for block #%d since it was filtered out.", block.Header.Number) continue }
if ed.eventConsumerTimeout < 0 { select { case reg.Eventch <- NewBlockEvent(block, sourceURL): default: logger.Warn("Unable to send to block event channel.") } } elseif ed.eventConsumerTimeout == 0 { reg.Eventch <- NewBlockEvent(block, sourceURL) } else { select { case reg.Eventch <- NewBlockEvent(block, sourceURL): case <-time.After(ed.eventConsumerTimeout): logger.Warn("Timed out sending block event.") } } } }
func(ed *Dispatcher)publishFilteredBlockEvents(fblock *pb.FilteredBlock, sourceURL string) { if fblock == nil { logger.Warn("Filtered block is nil. Event will not be published") return }
for _, tx := range fblock.FilteredTransactions { // 发布交易订阅 ed.publishTxStatusEvents(tx, fblock.Number, sourceURL)
// Only send a chaincode event if the transaction has committed if tx.TxValidationCode == pb.TxValidationCode_VALID { txActions := tx.GetTransactionActions() if txActions == nil { continue } iflen(txActions.ChaincodeActions) == 0 { logger.Debugf("No chaincode action found for TxID[%s], block[%d], source URL[%s]", tx.Txid, fblock.Number, sourceURL) } for _, action := range txActions.ChaincodeActions { if action.ChaincodeEvent != nil { // 发布chaincode event订阅 ed.publishCCEvents(action.ChaincodeEvent, fblock.Number, sourceURL) } } } else { logger.Debugf("Cannot publish CCEvents for block[%d] and source URL[%s] since Tx Validation Code[%d] is not valid", fblock.Number, sourceURL, tx.TxValidationCode) } } }
func(ed *Dispatcher)publishTxStatusEvents(tx *pb.FilteredTransaction, blockNum uint64, sourceURL string) { logger.Debugf("Publishing Tx Status event for TxID [%s]...", tx.Txid) if reg, ok := ed.txRegistrations[tx.Txid]; ok { logger.Debugf("Sending Tx Status event for TxID [%s] to registrant...", tx.Txid)
if ed.eventConsumerTimeout < 0 { select { case reg.Eventch <- NewTxStatusEvent(tx.Txid, tx.TxValidationCode, blockNum, sourceURL): default: logger.Warn("Unable to send to Tx Status event channel.") } } elseif ed.eventConsumerTimeout == 0 { reg.Eventch <- NewTxStatusEvent(tx.Txid, tx.TxValidationCode, blockNum, sourceURL) } else { select { case reg.Eventch <- NewTxStatusEvent(tx.Txid, tx.TxValidationCode, blockNum, sourceURL): case <-time.After(ed.eventConsumerTimeout): logger.Warn("Timed out sending Tx Status event.") } } } }
func(ed *Dispatcher)publishCCEvents(ccEvent *pb.ChaincodeEvent, blockNum uint64, sourceURL string) { for _, reg := range ed.ccRegistrations { logger.Debugf("Matching CCEvent[%s,%s] against Reg[%s,%s] ...", ccEvent.ChaincodeId, ccEvent.EventName, reg.ChaincodeID, reg.EventFilter) if reg.ChaincodeID == ccEvent.ChaincodeId && reg.EventRegExp.MatchString(ccEvent.EventName) { logger.Debugf("... matched CCEvent[%s,%s] against Reg[%s,%s]", ccEvent.ChaincodeId, ccEvent.EventName, reg.ChaincodeID, reg.EventFilter)
if ed.eventConsumerTimeout < 0 { select { case reg.Eventch <- NewChaincodeEvent(ccEvent.ChaincodeId, ccEvent.EventName, ccEvent.TxId, ccEvent.Payload, blockNum, sourceURL): default: logger.Warn("Unable to send to CC event channel.") } } elseif ed.eventConsumerTimeout == 0 { reg.Eventch <- NewChaincodeEvent(ccEvent.ChaincodeId, ccEvent.EventName, ccEvent.TxId, ccEvent.Payload, blockNum, sourceURL) } else { select { case reg.Eventch <- NewChaincodeEvent(ccEvent.ChaincodeId, ccEvent.EventName, ccEvent.TxId, ccEvent.Payload, blockNum, sourceURL): case <-time.After(ed.eventConsumerTimeout): logger.Warn("Timed out sending CC event.") } } } } }