// Chain defines a way to inject messages for ordering. // Note, that in order to allow flexibility in the implementation, it is the responsibility of the implementer // to take the ordered messages, send them through the blockcutter.Receiver supplied via HandleChain to cut blocks, // and ultimately write the ledger also supplied via HandleChain. This design allows for two primary flows // 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka) // 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft) type Chain interface { // 普通消息/交易排序 // Order accepts a message which has been processed at a given configSeq. // If the configSeq advances, it is the responsibility of the consenter // to revalidate and potentially discard the message // The consenter may return an error, indicating the message was not accepted Order(env *cb.Envelope, configSeq uint64) error
// 配置消息/交易排序 // Configure accepts a message which reconfigures the channel and will // trigger an update to the configSeq if committed. The configuration must have // been triggered by a ConfigUpdate message. If the config sequence advances, // it is the responsibility of the consenter to recompute the resulting config, // discarding the message if the reconfiguration is no longer valid. // The consenter may return an error, indicating the message was not accepted Configure(config *cb.Envelope, configSeq uint64) error
// 等待排序集群可用 // WaitReady blocks waiting for consenter to be ready for accepting new messages. // This is useful when consenter needs to temporarily block ingress messages so // that in-flight messages can be consumed. It could return error if consenter is // in erroneous states. If this blocking behavior is not desired, consenter could // simply return nil. WaitReady() error
// 当排序集群发送错误时,会关闭返回的通道 // Errored returns a channel which will close when an error has occurred. // This is especially useful for the Deliver client, who must terminate waiting // clients when the consenter is not up to date. Errored() <-chanstruct{}
// 启动当前链 // Start should allocate whatever resources are needed for staying up to date with the chain. // Typically, this involves creating a thread which reads from the ordering source, passes those // messages to a block cutter, and writes the resulting blocks to the ledger. Start()
// 停止当前链,并释放资源 // Halt frees the resources which were allocated for this Chain. Halt() }
// ConsenterSupport provides the resources available to a Consenter implementation. type ConsenterSupport interface { crypto.LocalSigner msgprocessor.Processor
// VerifyBlockSignature verifies a signature of a block with a given optional // configuration (can be nil). VerifyBlockSignature([]*cb.SignedData, *cb.ConfigEnvelope) error
// 提供把消息切成块的接口 // BlockCutter returns the block cutting helper for this channel. BlockCutter() blockcutter.Receiver
// 当前链的orderer配置 // SharedConfig provides the shared config from the channel's current config block. SharedConfig() channelconfig.Orderer
// 当前链的通道配置 // ChannelConfig provides the channel config from the channel's current config block. ChannelConfig() channelconfig.Channel
// 生成区块 // CreateNextBlock takes a list of messages and creates the next block based on the block with highest block number committed to the ledger // Note that either WriteBlock or WriteConfigBlock must be called before invoking this method a second time. CreateNextBlock(messages []*cb.Envelope) *cb.Block
// 读区块 // Block returns a block with the given number, // or nil if such a block doesn't exist. Block(number uint64) *cb.Block
// 写区块 // WriteBlock commits a block to the ledger. WriteBlock(block *cb.Block, encodedMetadataValue []byte)
// 写配置区块并更新配置 // WriteConfigBlock commits a block to the ledger, and applies the config update inside. WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte)
// Sequence returns the current config squence. Sequence() uint64
// ChainID returns the channel ID this support is associated with. ChainID() string
// Height returns the number of blocks in the chain this channel is associated with. Height() uint64
// 以原始数据的格式追加区块,不像WriteBlock那样会修改元数据 // Append appends a new block to the ledger in its raw form, // unlike WriteBlock that also mutates its metadata. Append(block *cb.Block) error }
// ChaincodeStubInterface is the handle through which deployable chaincode
// apps access and modify their ledgers. Only the event-related method is
// shown in this excerpt.
type ChaincodeStubInterface interface {
	// SetEvent lets the chaincode attach an event to the proposal response so
	// that it is included as part of the transaction. The event is available
	// within the transaction in the committed block regardless of the validity
	// of the transaction.
	SetEvent(name string, payload []byte) error
}
// 从SignatureHeader交易客户端的签名 // validate the signature err = checkSignatureFromCreator(shdr.Creator, signedProp.Signature, signedProp.ProposalBytes, chdr.ChannelId) if err != nil { // log the exact message on the peer but return a generic error message to // avoid malicious users scanning for channels putilsLogger.Warningf("channel [%s]: %s", chdr.ChannelId, err) sId := &msp.SerializedIdentity{} err := proto.Unmarshal(shdr.Creator, sId) if err != nil { // log the error here as well but still only return the generic error err = errors.Wrap(err, "could not deserialize a SerializedIdentity") putilsLogger.Warningf("channel [%s]: %s", chdr.ChannelId, err) } returnnil, nil, nil, errors.Errorf("access denied: channel [%s] creator org [%s]", chdr.ChannelId, sId.Mspid) } }
// ValidateTransaction checks that the transaction envelope is properly formed funcValidateTransaction(e *common.Envelope, c channelconfig.ApplicationCapabilities)(*common.Payload, pb.TxValidationCode) { putilsLogger.Debugf("ValidateTransactionEnvelope starts for envelope %p", e)
// validate the signature in the envelope err = checkSignatureFromCreator(shdr.Creator, e.Signature, e.Payload, chdr.ChannelId) if err != nil { putilsLogger.Errorf("checkSignatureFromCreator returns err %s", err) returnnil, pb.TxValidationCode_BAD_CREATOR_SIGNATURE }
// continue the validation in a way that depends on the type specified in the header switch common.HeaderType(chdr.Type) { case common.HeaderType_ENDORSER_TRANSACTION: // Verify that the transaction ID has been computed properly. // This check is needed to ensure that the lookup into the ledger // for the same TxID catches duplicates. err = utils.CheckTxID( chdr.TxId, shdr.Nonce, shdr.Creator)
// given a creator, a message and a signature, // this function returns nil if the creator // is a valid cert and the signature is valid funccheckSignatureFromCreator(creatorBytes []byte, sig []byte, msg []byte, ChainID string)error { putilsLogger.Debugf("begin")
// check for nil argument if creatorBytes == nil || sig == nil || msg == nil { return errors.New("nil arguments") }
// 每个链有各自的msp mspObj := mspmgmt.GetIdentityDeserializer(ChainID) if mspObj == nil { return errors.Errorf("could not get msp for channel [%s]", ChainID) }
putilsLogger.Debugf("creator is %s", creator.GetIdentifier())
// 验证证书是否有效 // ensure that creator is a valid certificate err = creator.Validate() if err != nil { return errors.WithMessage(err, "creator certificate is not valid") }
putilsLogger.Debugf("creator is valid")
// validate the signature // 验证签名 err = creator.Verify(msg, sig) if err != nil { return errors.WithMessage(err, "creator's signature over the proposal is not valid") }
// GetIdentityDeserializer returns the IdentityDeserializer for the given chain funcGetIdentityDeserializer(chainID string)msp.IdentityDeserializer { if chainID == "" { return GetLocalMSP() }
return GetManagerForChain(chainID) }
// GetManagerForChain returns the msp manager for the supplied // chain; if no such manager exists, one is created funcGetManagerForChain(chainID string)msp.MSPManager { m.Lock() defer m.Unlock()
// 先从缓存查找 mspMgr, ok := mspMap[chainID] if !ok { // 找不到则新建立当前通道Msp manager mspLogger.Debugf("Created new msp manager for channel `%s`", chainID) mspMgmtMgr := &mspMgmtMgr{msp.NewMSPManager(), false} mspMap[chainID] = mspMgmtMgr mspMgr = mspMgmtMgr } else { // check for internal mspManagerImpl and mspMgmtMgr types. if a different // type is found, it's because a developer has added a new type that // implements the MSPManager interface and should add a case to the logic // above to handle it. if !(reflect.TypeOf(mspMgr).Elem().Name() == "mspManagerImpl" || reflect.TypeOf(mspMgr).Elem().Name() == "mspMgmtMgr") { panic("Found unexpected MSPManager type.") } mspLogger.Debugf("Returning existing manager for channel '%s'", chainID) } return mspMgr }
// MSPManager has been setup for a channel, which indicates whether the channel // exists or not type mspMgmtMgr struct { msp.MSPManager // track whether this MSPManager has been setup successfully up bool }
// DeserializeIdentity returns an identity given its serialized version supplied as argument func(mgr *mspManagerImpl)DeserializeIdentity(serializedID []byte)(Identity, error) { // We first deserialize to a SerializedIdentity to get the MSP ID sId := &msp.SerializedIdentity{} err := proto.Unmarshal(serializedID, sId) if err != nil { returnnil, errors.Wrap(err, "could not deserialize a SerializedIdentity") }
// 获取发送方的msp实例 // we can now attempt to obtain the MSP msp := mgr.mspsMap[sId.Mspid] if msp == nil { returnnil, errors.Errorf("MSP %s is unknown", sId.Mspid) }
switch t := msp.(type) { case *bccspmsp: return t.deserializeIdentityInternal(sId.IdBytes) case *idemixmsp: return t.deserializeIdentityInternal(sId.IdBytes) default: return t.DeserializeIdentity(serializedID) } }
// 反序列化二进制,得到证书,然后用证书获取公钥,使用证书、公钥和msp,创建Identity // deserializeIdentityInternal returns an identity given its byte-level representation func(msp *bccspmsp)deserializeIdentityInternal(serializedIdentity []byte)(Identity, error) { // This MSP will always deserialize certs this way bl, _ := pem.Decode(serializedIdentity) if bl == nil { returnnil, errors.New("could not decode the PEM structure") } cert, err := x509.ParseCertificate(bl.Bytes) if err != nil { returnnil, errors.Wrap(err, "parseCertificate failed") }
// Now we have the certificate; make sure that its fields // (e.g. the Issuer.OU or the Subject.OU) match with the // MSP id that this MSP has; otherwise it might be an attack // TODO! // We can't do it yet because there is no standardized way // (yet) to encode the MSP ID into the x.509 body of a cert
// 从证书中提取公钥,封装一下,满足bccsp.Key接口 pub, err := msp.bccsp.KeyImport(cert, &bccsp.X509PublicKeyImportOpts{Temporary: true}) if err != nil { returnnil, errors.WithMessage(err, "failed to import certificate's public key") }
type identity struct { // id contains the identifier (MSPID and identity identifier) for this instance id *IdentityIdentifier
// cert contains the x.509 certificate that signs the public key of this instance cert *x509.Certificate
// this is the public key of this instance pk bccsp.Key
// reference to the MSP that "owns" this identity msp *bccspmsp }
funcnewIdentity(cert *x509.Certificate, pk bccsp.Key, msp *bccspmsp)(Identity, error) { if mspIdentityLogger.IsEnabledFor(zapcore.DebugLevel) { mspIdentityLogger.Debugf("Creating identity instance for cert %s", certToPEM(cert)) }
// 检查证书 // Sanitize first the certificate cert, err := msp.sanitizeCert(cert) if err != nil { returnnil, err }
// Compute identity identifier
// Use the hash of the identity's certificate as id in the IdentityIdentifier hashOpt, err := bccsp.GetHashOpt(msp.cryptoConfig.IdentityIdentifierHashFunction) if err != nil { returnnil, errors.WithMessage(err, "failed getting hash function options") }
digest, err := msp.bccsp.Hash(cert.Raw, hashOpt) if err != nil { returnnil, errors.WithMessage(err, "failed hashing raw certificate to compute the id of the IdentityIdentifier") }
id := &IdentityIdentifier{ Mspid: msp.name, Id: hex.EncodeToString(digest)}
// Verify checks against a signature and a message // to determine whether this identity produced the // signature; it returns nil if so or an error otherwise func(id *identity)Verify(msg []byte, sig []byte)error { // mspIdentityLogger.Infof("Verifying signature")
// Compute Hash hashOpt, err := id.getHashOpt(id.msp.cryptoConfig.SignatureHashFamily) if err != nil { return errors.WithMessage(err, "failed getting hash function options") }
// SignatureHeaderMaker creates a new SignatureHeader type SignatureHeaderMaker interface { // NewSignatureHeader creates a SignatureHeader with the correct signing identity and a valid nonce NewSignatureHeader() (*cb.SignatureHeader, error) }
SignedProposal
|\_ Signature (signature on the Proposal message by the creator specified in the header)
 \_ Proposal
    |\_ Header (the header for this proposal)
     \_ Payload (the payload for this proposal)
proposal.proto这个文件还简要介绍了Client和背书节点之间通信的消息类型和过程。
Proposal
1 2 3 4 5 6 7 8
// SignedProposal carries a marshaled Proposal together with a signature over
// those bytes.
type SignedProposal struct {
	// ProposalBytes holds the bytes of the Proposal.
	ProposalBytes []byte `protobuf:"bytes,1,opt,name=proposal_bytes,json=proposalBytes,proto3" json:"proposal_bytes,omitempty"`

	// Signature is the signature over ProposalBytes; it is to be verified
	// against the creator identity contained in the header of the Proposal
	// message marshaled as ProposalBytes.
	Signature []byte `protobuf:"bytes,2,opt,name=signature,proto3" json:"signature,omitempty"`
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Proposal is the message that a SignedProposal wraps: a header, a payload
// whose meaning depends on the header type, and an optional extension.
type Proposal struct {
	// Header is the header of the proposal. It is the bytes of the Header.
	Header []byte `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"`

	// Payload is the payload of the proposal as defined by the type in the
	// proposal header.
	Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`

	// Extension holds optional extensions to the proposal. Its content depends
	// on the Header's type field. For the type CHAINCODE, it might be the bytes
	// of a ChaincodeAction message.
	Extension []byte `protobuf:"bytes,3,opt,name=extension,proto3" json:"extension,omitempty"`

	// Bookkeeping fields; all three are excluded from JSON by their tags.
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}
// Header is a generic replay prevention and identity message to include in a signed payload type ChannelHeader struct { Type int32`protobuf:"varint,1,opt,name=type,proto3" json:"type,omitempty"` // Version indicates message protocol version Version int32`protobuf:"varint,2,opt,name=version,proto3" json:"version,omitempty"` // Timestamp is the local time when the message was created // by the sender Timestamp *timestamp.Timestamp `protobuf:"bytes,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` // Identifier of the channel this message is bound for ChannelId string`protobuf:"bytes,4,opt,name=channel_id,json=channelId,proto3" json:"channel_id,omitempty"` // An unique identifier that is used end-to-end. // - set by higher layers such as end user or SDK // - passed to the endorser (which will check for uniqueness) // - as the header is passed along unchanged, it will be // be retrieved by the committer (uniqueness check here as well) // - to be stored in the ledger TxId string`protobuf:"bytes,5,opt,name=tx_id,json=txId,proto3" json:"tx_id,omitempty"` // The epoch in which this header was generated, where epoch is defined based on block height // Epoch in which the response has been generated. This field identifies a // logical window of time. A proposal response is accepted by a peer only if // two conditions hold: // 1. the epoch specified in the message is the current epoch // 2. this message has been only seen once during this epoch (i.e. 
it hasn't // been replayed) Epoch uint64`protobuf:"varint,6,opt,name=epoch,proto3" json:"epoch,omitempty"` // Extension that may be attached based on the header type Extension []byte`protobuf:"bytes,7,opt,name=extension,proto3" json:"extension,omitempty"` // If mutual TLS is employed, this represents // the hash of the client's TLS certificate TlsCertHash []byte`protobuf:"bytes,8,opt,name=tls_cert_hash,json=tlsCertHash,proto3" json:"tls_cert_hash,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte`json:"-"` XXX_sizecache int32`json:"-"` }
// SignatureHeader identifies who created a message and carries the nonce that
// accompanies it.
type SignatureHeader struct {
	// Creator of the message, a marshaled msp.SerializedIdentity
	Creator []byte `protobuf:"bytes,1,opt,name=creator,proto3" json:"creator,omitempty"`

	// Nonce is an arbitrary number that may only be used once. Can be used to
	// detect replay attacks.
	Nonce []byte `protobuf:"bytes,2,opt,name=nonce,proto3" json:"nonce,omitempty"`

	// Bookkeeping fields; all three are excluded from JSON by their tags.
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}
gRPC定义
1 2 3 4 5 6 7 8 9 10 11
// EndorserClient is the client API for Endorser service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type EndorserClient interface { ProcessProposal(ctx context.Context, in *SignedProposal, opts ...grpc.CallOption) (*ProposalResponse, error) }
// EndorserServer is the server API for Endorser service. type EndorserServer interface { ProcessProposal(context.Context, *SignedProposal) (*ProposalResponse, error) }
// 获取指定账本模拟器 // obtaining once the tx simulator for this proposal. This will be nil // for chainless proposals // Also obtain a history query executor for history queries, since tx simulator does not cover history var txsim ledger.TxSimulator var historyQueryExecutor ledger.HistoryQueryExecutor if acquireTxSimulator(chainID, vr.hdrExt.ChaincodeId) { if txsim, err = e.s.GetTxSimulator(chainID, txid); err != nil { return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil } ... defer txsim.Done()
txParams := &ccprovider.TransactionParams{ ChannelID: chainID, TxID: txid, SignedProp: signedProp, Proposal: prop, TXSimulator: txsim, HistoryQueryExecutor: historyQueryExecutor, } // this could be a request to a chainless SysCC
// 模拟执行交易,失败则返回背书失败的响应 // 1 -- simulate cd, res, simulationResult, ccevent, err := e.SimulateProposal(txParams, hdrExt.ChaincodeId) if err != nil { return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil } if res != nil { if res.Status >= shim.ERROR { endorserLogger.Errorf("[%s][%s] simulateProposal() resulted in chaincode %s response status %d for txid: %s", chainID, shorttxid(txid), hdrExt.ChaincodeId, res.Status, txid) var cceventBytes []byte if ccevent != nil { cceventBytes, err = putils.GetBytesChaincodeEvent(ccevent) if err != nil { returnnil, errors.Wrap(err, "failed to marshal event bytes") } } pResp, err := putils.CreateProposalResponseFailure(prop.Header, prop.Payload, res, simulationResult, cceventBytes, hdrExt.ChaincodeId, hdrExt.PayloadVisibility) if err != nil { return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil }
return pResp, nil } }
// 对模拟执行的结果进行签名背书 // 2 -- endorse and get a marshalled ProposalResponse message var pResp *pb.ProposalResponse
// TODO till we implement global ESCC, CSCC for system chaincodes // chainless proposals (such as CSCC) don't have to be endorsed if chainID == "" { pResp = &pb.ProposalResponse{Response: res} } else { // Note: To endorseProposal(), we pass the released txsim. Hence, an error would occur if we try to use this txsim pResp, err = e.endorseProposal(ctx, chainID, txid, signedProp, prop, res, simulationResult, ccevent, hdrExt.PayloadVisibility, hdrExt.ChaincodeId, txsim, cd)
... }
// Set the proposal response payload - it // contains the "return value" from the // chaincode invocation pResp.Response = res
// total failed proposals = ProposalsReceived-SuccessfulProposals e.Metrics.SuccessfulProposals.Add(1) success = true
// Check whether the proposal targets a system chaincode that must not be
// invoked externally (by users).
// NOTE(review): the article says the chaincode instance is looked up first and then
// asked whether it is invokable — confirm against IsSysCCAndNotInvokableExternal.
// block invocations to security-sensitive system chaincodes
if e.s.IsSysCCAndNotInvokableExternal(hdrExt.ChaincodeId.Name) {
	endorserLogger.Errorf("Error: an attempt was made by %#v to invoke system chaincode %s", shdr.Creator, hdrExt.ChaincodeId.Name)
	err = errors.Errorf("chaincode %s cannot be invoked through a proposal", hdrExt.ChaincodeId.Name)
	vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
	return vr, err
}

if chainID != "" {
	// labels that provide context for failure metrics
	meterLabels := []string{
		"channel", chainID,
		"chaincode", hdrExt.ChaincodeId.Name + ":" + hdrExt.ChaincodeId.Version,
	}

	// Check whether the transaction is already on the chain.
	// Here we handle uniqueness check and ACLs for proposals targeting a chain
	// Notice that ValidateProposalMessage has already verified that TxID is computed properly
	// A nil error from the lookup means the txid was found, i.e. it is a duplicate.
	if _, err = e.s.GetTransactionByID(chainID, txid); err == nil {
		// increment failure due to duplicate transactions. Useful for catching replay attacks in
		// addition to benign retries
		e.Metrics.DuplicateTxsFailure.With(meterLabels...).Add(1)
		err = errors.Errorf("duplicate transaction found [%s]. Creator [%x]", txid, shdr.Creator)
		vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
		return vr, err
	}

	// ACL check for user (application) chaincodes.
	// check ACL only for application chaincodes; ACLs
	// for system chaincodes are checked elsewhere
	if !e.s.IsSysCC(hdrExt.ChaincodeId.Name) {
		// check that the proposal complies with the Channel's writers
		if err = e.s.CheckACL(signedProp, chdr, shdr, hdrExt); err != nil {
			e.Metrics.ProposalACLCheckFailed.With(meterLabels...).Add(1)
			vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
			return vr, err
		}
	}
} else {
	// chainless proposals do not/cannot affect ledger and cannot be submitted as transactions
	// ignore uniqueness checks; also, chainless proposals are not validated using the policies
	// of the chain since by definition there is no chain; they are validated against the local
	// MSP of the peer instead by the call to ValidateProposalMessage above
}
// ValidateProposalMessage checks the validity of a SignedProposal message // this function returns Header and ChaincodeHeaderExtension messages since they // have been unmarshalled and validated funcValidateProposalMessage(signedProp *pb.SignedProposal)(*pb.Proposal, *common.Header, *pb.ChaincodeHeaderExtension, error) { if signedProp == nil { returnnil, nil, nil, errors.New("nil arguments") }
putilsLogger.Debugf("ValidateProposalMessage starts for signed proposal %p", signedProp)
// extract the Proposal message from signedProp prop, err := utils.GetProposal(signedProp.ProposalBytes) if err != nil { returnnil, nil, nil, err }
// 1) look at the ProposalHeader hdr, err := utils.GetHeader(prop.Header) if err != nil { returnnil, nil, nil, err }
// validate the header chdr, shdr, err := validateCommonHeader(hdr) if err != nil { returnnil, nil, nil, err }
// 从SignatureHeader交易客户端的签名 // validate the signature err = checkSignatureFromCreator(shdr.Creator, signedProp.Signature, signedProp.ProposalBytes, chdr.ChannelId) if err != nil { // log the exact message on the peer but return a generic error message to // avoid malicious users scanning for channels putilsLogger.Warningf("channel [%s]: %s", chdr.ChannelId, err) sId := &msp.SerializedIdentity{} err := proto.Unmarshal(shdr.Creator, sId) if err != nil { // log the error here as well but still only return the generic error err = errors.Wrap(err, "could not deserialize a SerializedIdentity") putilsLogger.Warningf("channel [%s]: %s", chdr.ChannelId, err) } returnnil, nil, nil, errors.Errorf("access denied: channel [%s] creator org [%s]", chdr.ChannelId, sId.Mspid) }
// 检查txid的计算是否符合规则 // Verify that the transaction ID has been computed properly. // This check is needed to ensure that the lookup into the ledger // for the same TxID catches duplicates. err = utils.CheckTxID( chdr.TxId, shdr.Nonce, shdr.Creator) if err != nil { returnnil, nil, nil, err }
// 依据不同的proposal类型对proposal分别进行检查 // continue the validation in a way that depends on the type specified in the header switch common.HeaderType(chdr.Type) { case common.HeaderType_CONFIG: //which the types are different the validation is the same //viz, validate a proposal to a chaincode. If we need other //special validation for confguration, we would have to implement //special validation fallthrough case common.HeaderType_ENDORSER_TRANSACTION: // 主要是提取ChaincodeHeaderExtension // validation of the proposal message knowing it's of type CHAINCODE chaincodeHdrExt, err := validateChaincodeProposalMessage(prop, hdr) if err != nil { returnnil, nil, nil, err }
return prop, hdr, chaincodeHdrExt, err default: //NOTE : we proably need a case returnnil, nil, nil, errors.Errorf("unsupported proposal type %d", common.HeaderType(chdr.Type)) } }
// Support contains functions that the endorser requires to execute its tasks type Support interface { crypto.SignerSupport // IsSysCCAndNotInvokableExternal returns true if the supplied chaincode is // ia system chaincode and it NOT invokable IsSysCCAndNotInvokableExternal(name string) bool
// GetTxSimulator returns the transaction simulator for the specified ledger // a client may obtain more than one such simulator; they are made unique // by way of the supplied txid GetTxSimulator(ledgername string, txid string) (ledger.TxSimulator, error)
}
// GetTxSimulator returns the transaction simulator for the specified ledger // a client may obtain more than one such simulator; they are made unique // by way of the supplied txid func(s *SupportImpl)GetTxSimulator(ledgername string, txid string)(ledger.TxSimulator, error) { // 使用账本和txid创建模拟器,每个交易有单独的模拟器 lgr := s.Peer.GetLedger(ledgername) if lgr == nil { returnnil, errors.Errorf("Channel does not exist: %s", ledgername) } return lgr.NewTxSimulator(txid) }
// LockBasedQueryExecutor is a query executor used in `LockBasedTxMgr` // "只读",不包含写相关的操作 type lockBasedQueryExecutor struct { helper *queryHelper txid string }
// Bundle the per-transaction values that are handed on for chaincode invocation.
txParams := &ccprovider.TransactionParams{
	ChannelID:            chainID,
	TxID:                 txid,
	SignedProp:           signedProp,
	Proposal:             prop,
	TXSimulator:          txsim, // the simulator is carried here
	HistoryQueryExecutor: historyQueryExecutor,
}
// this could be a request to a chainless SysCC
// TODO: if the proposal has an extension, it will be of type ChaincodeAction; // if it's present it means that no simulation is to be performed because // we're trying to emulate a submitting peer. On the other hand, we need // to validate the supplied action before endorsing it
// SimulateProposal simulates the proposal by calling the chaincode func(e *Endorser)SimulateProposal(txParams *ccprovider.TransactionParams, cid *pb.ChaincodeID)(ccprovider.ChaincodeDefinition, *pb.Response, []byte, *pb.ChaincodeEvent, error) { endorserLogger.Debugf("[%s][%s] Entry chaincode: %s", txParams.ChannelID, shorttxid(txParams.TxID), cid) defer endorserLogger.Debugf("[%s][%s] Exit", txParams.ChannelID, shorttxid(txParams.TxID)) // we do expect the payload to be a ChaincodeInvocationSpec // if we are supporting other payloads in future, this be glaringly point // as something that should change // 根据Proposal生成Invoke需要的信息 cis, err := putils.GetChaincodeInvocationSpec(txParams.Proposal) if err != nil { returnnil, nil, nil, nil, err }
// 链码的元数据 var cdLedger ccprovider.ChaincodeDefinition var version string
// 设置version if !e.s.IsSysCC(cid.Name) { // 根据要调用的链码名称,从lscc获取链码的元数据 cdLedger, err = e.s.GetChaincodeDefinition(cid.Name, txParams.TXSimulator) if err != nil { returnnil, nil, nil, nil, errors.WithMessage(err, fmt.Sprintf("make sure the chaincode %s has been successfully instantiated and try again", cid.Name)) } version = cdLedger.CCVersion()
// 存在私密数据 if simResult.PvtSimulationResults != nil { if cid.Name == "lscc" { // TODO: remove once we can store collection configuration outside of LSCC txParams.TXSimulator.Done() returnnil, nil, nil, nil, errors.New("Private data is forbidden to be used in instantiate") } // 获取要通过Gossip传播的私密数据 pvtDataWithConfig, err := e.AssemblePvtRWSet(simResult.PvtSimulationResults, txParams.TXSimulator) // To read collection config need to read collection updates before // releasing the lock, hence txParams.TXSimulator.Done() moved down here txParams.TXSimulator.Done()
if err != nil { returnnil, nil, nil, nil, errors.WithMessage(err, "failed to obtain collections config") } endorsedAt, err := e.s.GetLedgerHeight(txParams.ChannelID) if err != nil { returnnil, nil, nil, nil, errors.WithMessage(err, fmt.Sprint("failed to obtain ledger height for channel", txParams.ChannelID)) } // Add ledger height at which transaction was endorsed, // `endorsedAt` is obtained from the block storage and at times this could be 'endorsement Height + 1'. // However, since we use this height only to select the configuration (3rd parameter in distributePrivateData) and // manage transient store purge for orphaned private writesets (4th parameter in distributePrivateData), this works for now. // Ideally, ledger should add support in the simulator as a first class function `GetHeight()`. pvtDataWithConfig.EndorsedAt = endorsedAt // 把私密数据同通道id、交易id和区块高度发出去,代表私密数据所属的区块和交易 if err := e.distributePrivateData(txParams.ChannelID, txParams.TxID, pvtDataWithConfig, endorsedAt); err != nil { returnnil, nil, nil, nil, err } }
// Support contains functions that the endorser requires to execute its tasks type Support interface
// SupportImpl provides an implementation of the endorser.Support interface // issuing calls to various static methods of the peer type SupportImpl struct { *PluginEndorser crypto.SignerSupport Peer peer.Operations PeerSupport peer.Support ChaincodeSupport *chaincode.ChaincodeSupport SysCCProvider *scc.Provider ACLProvider aclmgmt.ACLProvider }
// Launch starts executing chaincode if it is not already running. This method // blocks until the peer side handler gets into ready state or encounters a fatal // error. If the chaincode is already running, it simply returns. func(cs *ChaincodeSupport)Launch(chainID, chaincodeName, chaincodeVersion string, qe ledger.QueryExecutor)(*Handler, error) { cname := chaincodeName + ":" + chaincodeVersion if h := cs.HandlerRegistry.Handler(cname); h != nil { return h, nil }
// 启动链码容器 ...
h := cs.HandlerRegistry.Handler(cname) if h == nil { returnnil, errors.Wrapf(err, "[channel %s] claimed to start chaincode container for %s but could not find handler", chainID, cname) }
return h, nil }
链码容器的启动过程,不是本文的重点,所以不继续深入Launch的调用细节。
模拟执行交易
execute封装出执行交易的消息,然后使用 Handler 执行交易。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// execute executes a transaction and waits for it to complete until a timeout value. func(cs *ChaincodeSupport)execute(cctyp pb.ChaincodeMessage_Type, txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, input *pb.ChaincodeInput, h *Handler)(*pb.ChaincodeMessage, error) { input.Decorations = txParams.ProposalDecorations // 创建消息 ccMsg, err := createCCMessage(cctyp, txParams.ChannelID, txParams.TxID, input) if err != nil { returnnil, errors.WithMessage(err, "failed to create chaincode message") }
// TransactionParams are parameters which are tied to a particular transaction // and which are required for invoking chaincode. type TransactionParams struct { TxID string ChannelID string SignedProp *pb.SignedProposal Proposal *pb.Proposal TXSimulator ledger.TxSimulator HistoryQueryExecutor ledger.HistoryQueryExecutor CollectionStore privdata.CollectionStore IsInitTransaction bool
// this is additional data passed to the chaincode ProposalDecorations map[string][]byte }
// Wait for the chaincode container to respond, or for the timeout to expire.
var ccresp *pb.ChaincodeMessage
select {
case ccresp = <-txctx.ResponseNotifier:
	// response is sent to user or calling chaincode. ChaincodeMessage_ERROR
	// are typically treated as error
case <-time.After(timeout):
	// No response arrived in time: record the error and count the timeout
	// against the "name:version" label of the chaincode.
	err = errors.New("timeout expired while executing transaction")
	ccName := cccid.Name + ":" + cccid.Version
	h.Metrics.ExecuteTimeouts.With(
		"chaincode", ccName,
	).Add(1)
}
switch resp.Type { // 交易执行成功则提取Payload中保存的Response case pb.ChaincodeMessage_COMPLETED: res := &pb.Response{} err := proto.Unmarshal(resp.Payload, res) if err != nil { returnnil, nil, errors.Wrapf(err, "failed to unmarshal response for transaction %s", txid) } return res, resp.ChaincodeEvent, nil
// 失败,则提取Payload中保存的错误信息 case pb.ChaincodeMessage_ERROR: returnnil, resp.ChaincodeEvent, errors.Errorf("transaction returned with failure: %s", resp.Payload)
default: returnnil, nil, errors.Errorf("unexpected response type %d for transaction %s", resp.Type, txid) } }
if chainID == "" { pResp = &pb.ProposalResponse{Response: res} } else { // Note: To endorseProposal(), we pass the released txsim. Hence, an error would occur if we try to use this txsim pResp, err = e.endorseProposal(ctx, chainID, txid, signedProp, prop, res, simulationResult, ccevent, hdrExt.PayloadVisibility, hdrExt.ChaincodeId, txsim, cd) // ... } // ... }
// 系统链码和用户链码使用不同的ESCC isSysCC := cd == nil // 1) extract the name of the escc that is requested to endorse this chaincode var escc string // ie, "lscc" or system chaincodes if isSysCC { escc = "escc" } else { escc = cd.Endorsement() }
endorserLogger.Debugf("[%s][%s] escc for chaincode %s is %s", chainID, shorttxid(txid), ccid, escc)
// marshalling event bytes var err error var eventBytes []byte if event != nil { eventBytes, err = putils.GetBytesChaincodeEvent(event) if err != nil { returnnil, errors.Wrap(err, "failed to marshal event bytes") } }
// set version of executing chaincode if isSysCC { // if we want to allow mixed fabric levels we should // set syscc version to "" ccid.Version = util.GetSysCCVersion() } else { ccid.Version = cd.CCVersion() }
// Plugin endorses a proposal response
type Plugin interface {
	// Endorse signs the given payload(ProposalResponsePayload bytes), and optionally mutates it.
	// Returns:
	// The Endorsement: A signature over the payload, and an identity that is used to verify the signature
	// The payload that was given as input (could be modified within this function)
	// Or error on failure
	Endorse(payload []byte, sp *peer.SignedProposal) (*peer.Endorsement, []byte, error)

	// Init injects dependencies into the instance of the Plugin
	Init(dependencies ...Dependency) error
}
// EndorseWithPlugin endorses the response with a plugin func(pe *PluginEndorser)EndorseWithPlugin(ctx Context)(*pb.ProposalResponse, error) { endorserLogger.Debug("Entering endorsement for", ctx)
if ctx.Response == nil { returnnil, errors.New("response is nil") }
if ctx.Response.Status >= shim.ERRORTHRESHOLD { return &pb.ProposalResponse{Response: ctx.Response}, nil }
// 获取插件 plugin, err := pe.getOrCreatePlugin(PluginName(ctx.PluginName), ctx.Channel) if err != nil { endorserLogger.Warning("Endorsement with plugin for", ctx, " failed:", err) returnnil, errors.Errorf("plugin with name %s could not be used: %v", ctx.PluginName, err) }
// Endorse signs the given payload(ProposalResponsePayload bytes), and optionally mutates it. // Returns: // The Endorsement: A signature over the payload, and an identity that is used to verify the signature // The payload that was given as input (could be modified within this function) // Or error on failure func(e *DefaultEndorsement)Endorse(prpBytes []byte, sp *peer.SignedProposal)(*peer.Endorsement, []byte, error) { // 提取Proposal的签名人 signer, err := e.SigningIdentityForRequest(sp) if err != nil { returnnil, nil, errors.New(fmt.Sprintf("failed fetching signing identity: %v", err)) } // 得到签名人身份 // serialize the signing identity identityBytes, err := signer.Serialize() if err != nil { returnnil, nil, errors.New(fmt.Sprintf("could not serialize the signing identity: %v", err)) }
// 对Payload和身份进行签名 // sign the concatenation of the proposal response and the serialized endorser identity with this endorser's key signature, err := signer.Sign(append(prpBytes, identityBytes...)) if err != nil { returnnil, nil, errors.New(fmt.Sprintf("could not sign the proposal response payload: %v", err)) } endorsement := &peer.Endorsement{Signature: signature, Endorser: identityBytes} return endorsement, prpBytes, nil }
service Deliver {
    // deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with
    // Payload data as a marshaled orderer.SeekInfo message,
    // then a stream of block replies is received
    rpc Deliver (stream common.Envelope) returns (stream DeliverResponse) {
    }
    // deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with
    // Payload data as a marshaled orderer.SeekInfo message,
    // then a stream of **filtered** block replies is received
    rpc DeliverFiltered (stream common.Envelope) returns (stream DeliverResponse) {
    }
}
// DeliverClient is the client API for Deliver service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type DeliverClient interface {
	// deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with
	// Payload data as a marshaled orderer.SeekInfo message,
	// then a stream of block replies is received
	Deliver(ctx context.Context, opts ...grpc.CallOption) (Deliver_DeliverClient, error)
	// deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with
	// Payload data as a marshaled orderer.SeekInfo message,
	// then a stream of **filtered** block replies is received
	DeliverFiltered(ctx context.Context, opts ...grpc.CallOption) (Deliver_DeliverFilteredClient, error)
}
DeliverServer是服务端的接口,需要Peer实现。
1 2 3 4 5 6 7 8 9 10 11
// DeliverServer is the server API for Deliver service.
type DeliverServer interface {
	// deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with
	// Payload data as a marshaled orderer.SeekInfo message,
	// then a stream of block replies is received
	Deliver(Deliver_DeliverServer) error
	// deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with
	// Payload data as a marshaled orderer.SeekInfo message,
	// then a stream of **filtered** block replies is received
	DeliverFiltered(Deliver_DeliverFilteredServer) error
}
// server holds the dependencies necessary to create a deliver server
type server struct {
	// dh is the deliver handler this server relies on.
	dh *deliver.Handler
	// policyCheckerProvider supplies policy checkers.
	// NOTE(review): how and when it is consulted is not visible here.
	policyCheckerProvider PolicyCheckerProvider
}
// Iterator is useful for a chain Reader to stream blocks as they are created
type Iterator interface {
	// Next blocks until there is a new block available, or returns an error if
	// the next block is no longer retrievable
	Next() (*cb.Block, cb.Status)
	// Close releases resources acquired by the Iterator
	Close()
}
// Next blocks until there is a new block available, or returns an error if the // next block is no longer retrievable func(cu *cursor)Next()(*cb.Block, cb.Status) { // This only loops once, as signal reading indicates non-nil next // 实际只执行1次 for { // 拿到区块 next := cu.list.getNext() if next != nil { cu.list = next return cu.list.block, cb.Status_SUCCESS } <-cu.list.signal } }
// deliverProvider is the connection provider used for connecting to the Deliver service var deliverProvider = func(context fabcontext.Client, chConfig fab.ChannelCfg, peer fab.Peer)(api.Connection, error) { if peer == nil { returnnil, errors.New("Peer is nil") }
eventEndpoint, ok := peer.(api.EventEndpoint) if !ok { panic("peer is not an EventEndpoint") } return deliverconn.New(context, chConfig, deliverconn.Deliver, peer.URL(), eventEndpoint.Opts()...) }
// Dispatcher is responsible for handling all events, including connection and registration events originating from the client, // and events originating from the channel event service. All events are processed in a single Go routine // in order to avoid any race conditions and to ensure that events are processed in the order in which they are received. // This also avoids the need for synchronization. // The lastBlockNum member MUST be first to ensure it stays 64-bit aligned on 32-bit machines. type Dispatcher struct { lastBlockNum uint64// Must be first, do not move params updateLastBlockInfoOnly bool state int32 eventch chaninterface{} blockRegistrations []*BlockReg filteredBlockRegistrations []*FilteredBlockReg handlers map[reflect.Type]Handler txRegistrations map[string]*TxStatusReg ccRegistrations map[string]*ChaincodeReg }
注册事件
这是Dispatcher的事件注册函数,在它眼里,不止有4个事件:
1 2 3 4 5 6 7 8 9 10
// RegisterHandler registers an event handler func(ed *Dispatcher)RegisterHandler(t interface{}, h Handler) { htype := reflect.TypeOf(t) if _, ok := ed.handlers[htype]; !ok { logger.Debugf("Registering handler for %s on dispatcher %T", htype, ed) ed.handlers[htype] = h } else { logger.Debugf("Cannot register handler %s on dispatcher %T since it's already registered", htype, ed) } }
为各类事件注册对应的处理函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// RegisterHandlers registers all of the handlers by event type func(ed *Dispatcher)RegisterHandlers() { ed.RegisterHandler(&RegisterChaincodeEvent{}, ed.handleRegisterCCEvent) ed.RegisterHandler(&RegisterTxStatusEvent{}, ed.handleRegisterTxStatusEvent) ed.RegisterHandler(&RegisterBlockEvent{}, ed.handleRegisterBlockEvent) ed.RegisterHandler(&RegisterFilteredBlockEvent{}, ed.handleRegisterFilteredBlockEvent) ed.RegisterHandler(&UnregisterEvent{}, ed.handleUnregisterEvent) ed.RegisterHandler(&StopEvent{}, ed.HandleStopEvent) ed.RegisterHandler(&TransferEvent{}, ed.HandleTransferEvent) ed.RegisterHandler(&StopAndTransferEvent{}, ed.HandleStopAndTransferEvent) ed.RegisterHandler(&RegistrationInfoEvent{}, ed.handleRegistrationInfoEvent)
// The following events are used for testing only ed.RegisterHandler(&fab.BlockEvent{}, ed.handleBlockEvent) ed.RegisterHandler(&fab.FilteredBlockEvent{}, ed.handleFilteredBlockEvent) }
接收Peer事件
handleEvent用来处理来自Peer的事件,不同的类型调用不同的handler。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
func(ed *Dispatcher)handleEvent(e esdispatcher.Event) { delevent := e.(*connection.Event) evt := delevent.Event.(*pb.DeliverResponse) switch response := evt.Type.(type) { case *pb.DeliverResponse_Status: ed.handleDeliverResponseStatus(response) case *pb.DeliverResponse_Block: ed.HandleBlock(response.Block, delevent.SourceURL) case *pb.DeliverResponse_FilteredBlock: ed.HandleFilteredBlock(response.FilteredBlock, delevent.SourceURL) default: logger.Errorf("handler not found for deliver response type %T", response) } }
func(ed *Dispatcher)publishBlockEvents(block *cb.Block, sourceURL string) { for _, reg := range ed.blockRegistrations { if !reg.Filter(block) { logger.Debugf("Not sending block event for block #%d since it was filtered out.", block.Header.Number) continue }
if ed.eventConsumerTimeout < 0 { select { case reg.Eventch <- NewBlockEvent(block, sourceURL): default: logger.Warn("Unable to send to block event channel.") } } elseif ed.eventConsumerTimeout == 0 { reg.Eventch <- NewBlockEvent(block, sourceURL) } else { select { case reg.Eventch <- NewBlockEvent(block, sourceURL): case <-time.After(ed.eventConsumerTimeout): logger.Warn("Timed out sending block event.") } } } }
func(ed *Dispatcher)publishFilteredBlockEvents(fblock *pb.FilteredBlock, sourceURL string) { if fblock == nil { logger.Warn("Filtered block is nil. Event will not be published") return }
for _, tx := range fblock.FilteredTransactions { // 发布交易订阅 ed.publishTxStatusEvents(tx, fblock.Number, sourceURL)
// Only send a chaincode event if the transaction has committed if tx.TxValidationCode == pb.TxValidationCode_VALID { txActions := tx.GetTransactionActions() if txActions == nil { continue } iflen(txActions.ChaincodeActions) == 0 { logger.Debugf("No chaincode action found for TxID[%s], block[%d], source URL[%s]", tx.Txid, fblock.Number, sourceURL) } for _, action := range txActions.ChaincodeActions { if action.ChaincodeEvent != nil { // 发布chaincode event订阅 ed.publishCCEvents(action.ChaincodeEvent, fblock.Number, sourceURL) } } } else { logger.Debugf("Cannot publish CCEvents for block[%d] and source URL[%s] since Tx Validation Code[%d] is not valid", fblock.Number, sourceURL, tx.TxValidationCode) } } }
func(ed *Dispatcher)publishTxStatusEvents(tx *pb.FilteredTransaction, blockNum uint64, sourceURL string) { logger.Debugf("Publishing Tx Status event for TxID [%s]...", tx.Txid) if reg, ok := ed.txRegistrations[tx.Txid]; ok { logger.Debugf("Sending Tx Status event for TxID [%s] to registrant...", tx.Txid)
if ed.eventConsumerTimeout < 0 { select { case reg.Eventch <- NewTxStatusEvent(tx.Txid, tx.TxValidationCode, blockNum, sourceURL): default: logger.Warn("Unable to send to Tx Status event channel.") } } elseif ed.eventConsumerTimeout == 0 { reg.Eventch <- NewTxStatusEvent(tx.Txid, tx.TxValidationCode, blockNum, sourceURL) } else { select { case reg.Eventch <- NewTxStatusEvent(tx.Txid, tx.TxValidationCode, blockNum, sourceURL): case <-time.After(ed.eventConsumerTimeout): logger.Warn("Timed out sending Tx Status event.") } } } }
func(ed *Dispatcher)publishCCEvents(ccEvent *pb.ChaincodeEvent, blockNum uint64, sourceURL string) { for _, reg := range ed.ccRegistrations { logger.Debugf("Matching CCEvent[%s,%s] against Reg[%s,%s] ...", ccEvent.ChaincodeId, ccEvent.EventName, reg.ChaincodeID, reg.EventFilter) if reg.ChaincodeID == ccEvent.ChaincodeId && reg.EventRegExp.MatchString(ccEvent.EventName) { logger.Debugf("... matched CCEvent[%s,%s] against Reg[%s,%s]", ccEvent.ChaincodeId, ccEvent.EventName, reg.ChaincodeID, reg.EventFilter)
if ed.eventConsumerTimeout < 0 { select { case reg.Eventch <- NewChaincodeEvent(ccEvent.ChaincodeId, ccEvent.EventName, ccEvent.TxId, ccEvent.Payload, blockNum, sourceURL): default: logger.Warn("Unable to send to CC event channel.") } } elseif ed.eventConsumerTimeout == 0 { reg.Eventch <- NewChaincodeEvent(ccEvent.ChaincodeId, ccEvent.EventName, ccEvent.TxId, ccEvent.Payload, blockNum, sourceURL) } else { select { case reg.Eventch <- NewChaincodeEvent(ccEvent.ChaincodeId, ccEvent.EventName, ccEvent.TxId, ccEvent.Payload, blockNum, sourceURL): case <-time.After(ed.eventConsumerTimeout): logger.Warn("Timed out sending CC event.") } } } } }
2019-09-09 07:52:09.409 UTC [gossip.gossip] start -> INFO 013 Gossip instance peer1.org1.example.com:8051 started 2019-09-09 07:52:09.418 UTC [sccapi] deploySysCC -> INFO 014 system chaincode lscc/(github.com/hyperledger/fabric/core/scc/lscc) deployed 2019-09-09 07:52:09.420 UTC [cscc] Init -> INFO 015 Init CSCC 2019-09-09 07:52:09.422 UTC [sccapi] deploySysCC -> INFO 016 system chaincode cscc/(github.com/hyperledger/fabric/core/scc/cscc) deployed 2019-09-09 07:52:09.424 UTC [qscc] Init -> INFO 017 Init QSCC 2019-09-09 07:52:09.424 UTC [sccapi] deploySysCC -> INFO 018 system chaincode qscc/(github.com/hyperledger/fabric/core/scc/qscc) deployed 2019-09-09 07:52:09.425 UTC [sccapi] deploySysCC -> INFO 019 system chaincode (+lifecycle,github.com/hyperledger/fabric/core/chaincode/lifecycle) disabled ... 2019-09-09 07:52:14.386 UTC [sccapi] deploySysCC -> INFO 031 system chaincode lscc/mychannel(github.com/hyperledger/fabric/core/scc/lscc) deployed 2019-09-09 07:52:14.386 UTC [cscc] Init -> INFO 032 Init CSCC 2019-09-09 07:52:14.386 UTC [sccapi] deploySysCC -> INFO 033 system chaincode cscc/mychannel(github.com/hyperledger/fabric/core/scc/cscc) deployed 2019-09-09 07:52:14.387 UTC [qscc] Init -> INFO 034 Init QSCC 2019-09-09 07:52:14.387 UTC [sccapi] deploySysCC -> INFO 035 system chaincode qscc/mychannel(github.com/hyperledger/fabric/core/scc/qscc) deployed 2019-09-09 07:52:14.387 UTC [sccapi] deploySysCC -> INFO 036 system chaincode (+lifecycle,github.com/hyperledger/fabric/core/chaincode/lifecycle) disabled
// Registrar provides a way for system chaincodes to be registered
type Registrar interface {
	// Register registers a system chaincode
	Register(ccid *ccintf.CCID, cc shim.Chaincode) error
}
//Register registers system chaincode with given path. The deploy should be called to initialize func(r *Registry)Register(ccid *ccintf.CCID, cc shim.Chaincode)error { r.mutex.Lock() defer r.mutex.Unlock()
//DeploySysCCs is the hook for system chaincodes where system chaincodes are registered with the fabric //note the chaincode must still be deployed and launched like a user chaincode will be func(p *Provider)DeploySysCCs(chainID string, ccp ccprovider.ChaincodeProvider) { // 部署每一个scc for _, sysCC := range p.SysCCs { deploySysCC(chainID, ccp, sysCC) } }
// deploySysCC deploys the given system chaincode on a chain funcdeploySysCC(chainID string, ccprov ccprovider.ChaincodeProvider, syscc SelfDescribingSysCC)error { // disable或不在白名单的scc不执行部署 if !syscc.Enabled() || !isWhitelisted(syscc) { sysccLogger.Info(fmt.Sprintf("system chaincode (%s,%s) disabled", syscc.Name(), syscc.Path())) returnnil }
// Note, this structure is barely initialized, // we omit the history query executor, the proposal // and the signed proposal txParams := &ccprovider.TransactionParams{ TxID: txid, ChannelID: chainID, }
// 设置交易执行模拟器,系统通道chainID为"",所以系统通道的scc没有模拟器 if chainID != "" { // 获取链/通道的账本 lgr := peer.GetLedger(chainID) if lgr == nil { panic(fmt.Sprintf("syschain %s start up failure - unexpected nil ledger for channel %s", syscc.Name(), chainID)) }
// ChaincodeProvider provides an abstraction layer that is
// used for different packages to interact with code in the
// chaincode package without importing it; more methods
// should be added below if necessary
type ChaincodeProvider interface {
	// Execute executes a standard chaincode invocation for a chaincode and an input
	Execute(txParams *TransactionParams, cccid *CCContext, input *pb.ChaincodeInput) (*pb.Response, *pb.ChaincodeEvent, error)
	// ExecuteLegacyInit is a special case for executing chaincode deployment specs,
	// which are not already in the LSCC, needed for old lifecycle
	ExecuteLegacyInit(txParams *TransactionParams, cccid *CCContext, spec *pb.ChaincodeDeploymentSpec) (*pb.Response, *pb.ChaincodeEvent, error)
	// Stop stops the given chaincode
	Stop(ccci *ChaincodeContainerInfo) error
}
// ExecuteLegacyInit executes a chaincode which is not in the LSCC table func(c *CCProviderImpl)ExecuteLegacyInit(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, spec *pb.ChaincodeDeploymentSpec)(*pb.Response, *pb.ChaincodeEvent, error) { return c.cs.ExecuteLegacyInit(txParams, cccid, spec) }
// ExecuteLegacyInit is a temporary method which should be removed once the old style lifecycle // is entirely deprecated. Ideally one release after the introduction of the new lifecycle. // It does not attempt to start the chaincode based on the information from lifecycle, but instead // accepts the container information directly in the form of a ChaincodeDeploymentSpec. func(cs *ChaincodeSupport)ExecuteLegacyInit(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, spec *pb.ChaincodeDeploymentSpec)(*pb.Response, *pb.ChaincodeEvent, error) { // 部署链码需要的信息 ccci := ccprovider.DeploymentSpecToChaincodeContainerInfo(spec) ccci.Version = cccid.Version
cname := ccci.Name + ":" + ccci.Version h := cs.HandlerRegistry.Handler(cname) if h == nil { returnnil, nil, errors.Wrapf(err, "[channel %s] claimed to start chaincode container for %s but could not find handler", txParams.ChannelID, cname) }
// LaunchInit bypasses getting the chaincode spec from the LSCC table // as in the case of v1.0-v1.2 lifecycle, the chaincode will not yet be // defined in the LSCC table func(cs *ChaincodeSupport)LaunchInit(ccci *ccprovider.ChaincodeContainerInfo)error { cname := ccci.Name + ":" + ccci.Version // 已经有handler,即容器已经启动。调用链码的时候,也会获取handler if cs.HandlerRegistry.Handler(cname) != nil { returnnil }
// VMCReq - all requests should implement this interface.
// The context should be passed and tested at each layer till we stop
// note that we'd stop on the first method on the stack that does not
// take context
type VMCReq interface {
	// Do carries out the request against the given VM.
	Do(v VM) error
	// GetCCID returns the chaincode ID associated with the request.
	GetCCID() ccintf.CCID
}
// 从进程启动链码 func(ipc *inprocContainer)launchInProc(id string, args []string, env []string)error { if ipc.ChaincodeSupport == nil { inprocLogger.Panicf("Chaincode support is nil, most likely you forgot to set it immediately after calling inproccontroller.NewRegsitry()") }
// 和调用链码的上层通信的2个通道 peerRcvCCSend := make(chan *pb.ChaincodeMessage) ccRcvPeerSend := make(chan *pb.ChaincodeMessage) var err error ccchan := make(chanstruct{}, 1) ccsupportchan := make(chanstruct{}, 1) shimStartInProc := _shimStartInProc // shadow to avoid race in test gofunc() { deferclose(ccchan) // 启动链码 inprocLogger.Debugf("chaincode started for %s", id) if args == nil { args = ipc.args } if env == nil { env = ipc.env } // 利用shim启动 err := shimStartInProc(env, args, ipc.chaincode, ccRcvPeerSend, peerRcvCCSend) if err != nil { err = fmt.Errorf("chaincode-support ended with err: %s", err) _inprocLoggerErrorf("%s", err) } inprocLogger.Debugf("chaincode ended for %s with err: %s", id, err) }()
// shadow function to avoid data race inprocLoggerErrorf := _inprocLoggerErrorf gofunc() { deferclose(ccsupportchan) // 处理scc和外部通信的消息流 inprocStream := newInProcStream(peerRcvCCSend, ccRcvPeerSend) inprocLogger.Debugf("chaincode-support started for %s", id) err := ipc.ChaincodeSupport.HandleChaincodeStream(inprocStream) if err != nil { err = fmt.Errorf("chaincode ended with err: %s", err) inprocLoggerErrorf("%s", err) } inprocLogger.Debugf("chaincode-support ended for %s with err: %s", id, err) }() }
// 启动SCC的入口 // StartInProc is an entry point for system chaincodes bootstrap. It is not an // API for chaincodes. funcStartInProc(env []string, args []string, cc Chaincode, recv <-chan *pb.ChaincodeMessage, send chan<- *pb.ChaincodeMessage)error { // 有点奇怪,这些日志都没有看到,因为已经在shim,不属于peer日志了 chaincodeLogger.Debugf("in proc %v", args)
// 从环境变量获取cc name var chaincodename string for _, v := range env { if strings.Index(v, "CORE_CHAINCODE_ID_NAME=") == 0 { p := strings.SplitAfter(v, "CORE_CHAINCODE_ID_NAME=") chaincodename = p[1] break } } if chaincodename == "" { return errors.New("error chaincode id not provided") }
//send may happen on a closed channel when the system is //shutting down. Just catch the exception and return error deferfunc() { if r := recover(); r != nil { err = SendPanicFailure(fmt.Sprintf("%s", r)) return } }() s.send <- msg return }
// 接收是从recv读数据 func(s *inProcStream)Recv()(*pb.ChaincodeMessage, error) { msg, ok := <-s.recv if !ok { returnnil, errors.New("channel is closed") } return msg, nil }
2019-09-09 07:52:09.915 UTC [chaincode] LaunchConfig -> DEBU 098 launchConfig: executable:"chaincode",Args:[chaincode,-peer.address=peer0.org1.example.com:7052],Envs:[CORE_CHAINCODE_LOGGING_LEVEL=info,CORE_CHAINCODE_LOGGING_SHIM=warning,CORE_CHAINCODE_LOGGING_FORMAT=%{color}%{time:2006-01-02 15:04:05.000 MST} [%{module}] %{shortfunc} -> %{level:.4s} %{id:03x}%{color:reset} %{message},CORE_CHAINCODE_ID_NAME=lscc:1.4.3,CORE_PEER_TLS_ENABLED=true,CORE_TLS_CLIENT_KEY_PATH=/etc/hyperledger/fabric/client.key,CORE_TLS_CLIENT_CERT_PATH=/etc/hyperledger/fabric/client.crt,CORE_PEER_TLS_ROOTCERT_FILE=/etc/hyperledger/fabric/peer.crt],Files:[/etc/hyperledger/fabric/client.crt /etc/hyperledger/fabric/client.key /etc/hyperledger/fabric/peer.crt] 2019-09-09 07:52:09.915 UTC [chaincode] Start -> DEBU 099 start container: lscc:1.4.3 2019-09-09 07:52:09.915 UTC [chaincode] Start -> DEBU 09a start container with args: chaincode -peer.address=peer0.org1.example.com:7052 2019-09-09 07:52:09.915 UTC [chaincode] Start -> DEBU 09b start container with env: CORE_CHAINCODE_LOGGING_LEVEL=info CORE_CHAINCODE_LOGGING_SHIM=warning CORE_CHAINCODE_LOGGING_FORMAT=%{color}%{time:2006-01-02 15:04:05.000 MST} [%{module}] %{shortfunc} -> %{level:.4s} %{id:03x}%{color:reset} %{message} CORE_CHAINCODE_ID_NAME=lscc:1.4.3 CORE_PEER_TLS_ENABLED=true CORE_TLS_CLIENT_KEY_PATH=/etc/hyperledger/fabric/client.key CORE_TLS_CLIENT_CERT_PATH=/etc/hyperledger/fabric/client.crt CORE_PEER_TLS_ROOTCERT_FILE=/etc/hyperledger/fabric/peer.crt 2019-09-09 07:52:09.915 UTC [container] lockContainer -> DEBU 09c waiting for container(lscc-1.4.3) lock 2019-09-09 07:52:09.915 UTC [container] lockContainer -> DEBU 09d got container (lscc-1.4.3) lock 2019-09-09 07:52:09.915 UTC [inproccontroller] getInstance -> DEBU 09e chaincode instance created for lscc-1.4.3 2019-09-09 07:52:09.915 UTC [container] unlockContainer -> DEBU 09f container lock deleted(lscc-1.4.3) 2019-09-09 07:52:09.915 UTC [container] lockContainer -> DEBU 0a0 
waiting for container(lscc-1.4.3) lock 2019-09-09 07:52:09.915 UTC [container] lockContainer -> DEBU 0a1 got container (lscc-1.4.3) lock 2019-09-09 07:52:09.915 UTC [container] unlockContainer -> DEBU 0a2 container lock deleted(lscc-1.4.3) 2019-09-09 07:52:09.915 UTC [inproccontroller] func2 -> DEBU 0a3 chaincode-support started for lscc-1.4.3 2019-09-09 07:52:09.915 UTC [inproccontroller] func1 -> DEBU 0a4 chaincode started for lscc-1.4.3 // 以上日志对应的代码流程在上文都讲到了
// 以下是交互过程peer日志 // peer收到容器的注册消息 2019-09-09 07:52:09.916 UTC [chaincode] handleMessage -> DEBU 0a5 [] Fabric side handling ChaincodeMessage of type: REGISTER in state created 2019-09-09 07:52:09.916 UTC [chaincode] HandleRegister -> DEBU 0a6 Received REGISTER in state created 2019-09-09 07:52:09.916 UTC [chaincode] Register -> DEBU 0a7 registered handler complete for chaincode lscc:1.4.3 2019-09-09 07:52:09.916 UTC [chaincode] HandleRegister -> DEBU 0a8 Got REGISTER for chaincodeID = name:"lscc:1.4.3" , sending back REGISTERED 2019-09-09 07:52:09.920 UTC [grpc] HandleSubConnStateChange -> DEBU 0a9 pickfirstBalancer: HandleSubConnStateChange: 0xc0026318c0, READY 2019-09-09 07:52:09.923 UTC [chaincode] HandleRegister -> DEBU 0aa Changed state to established for name:"lscc:1.4.3"
// peer发送ready消息 2019-09-09 07:52:09.923 UTC [chaincode] sendReady -> DEBU 0ab sending READY for chaincode name:"lscc:1.4.3" 2019-09-09 07:52:09.923 UTC [chaincode] sendReady -> DEBU 0ac Changed to state ready for chaincode name:"lscc:1.4.3"
// 已经完成启动容器 2019-09-09 07:52:09.923 UTC [chaincode] Launch -> DEBU 0ad launch complete 2019-09-09 07:52:09.924 UTC [chaincode] Execute -> DEBU 0ae Entry // 收到容器COMPLETED消息 2019-09-09 07:52:09.925 UTC [chaincode] handleMessage -> DEBU 0af [01b03aae] Fabric side handling ChaincodeMessage of type: COMPLETED in state ready
// 通知scc,部署已经完成 2019-09-09 07:52:09.925 UTC [chaincode] Notify -> DEBU 0b0 [01b03aae] notifying Txid:01b03aae-17a6-4b63-874e-dc20d6f5df0c, channelID: 2019-09-09 07:52:09.925 UTC [chaincode] Execute -> DEBU 0b1 Exit 2019-09-09 07:52:09.925 UTC [sccapi] deploySysCC -> INFO 0b2 system chaincode lscc/(github.com/hyperledger/fabric/core/scc/lscc) deployed
func(v *VsccValidatorImpl)VSCCValidateTxForCC(ctx *Context)error { logger.Debug("Validating", ctx, "with plugin") // 使用插件验证交易 err := v.pluginValidator.ValidateWithPlugin(ctx) if err == nil { returnnil } // If the error is a pluggable validation execution error, cast it to the common errors ExecutionFailureError. if e, isExecutionError := err.(*validation.ExecutionFailureError); isExecutionError { return &commonerrors.VSCCExecutionFailureError{Err: e} } // Else, treat it as an endorsement error. return &commonerrors.VSCCEndorsementPolicyError{Err: err} }
// Plugin validates transactions
type Plugin interface {
	// Validate returns nil if the action at the given position inside the transaction
	// at the given position in the given block is valid, or an error if not.
	Validate(block *common.Block, namespace string, txPosition int, actionPosition int, contextData ...ContextDatum) error

	// Init injects dependencies into the instance of the Plugin
	Init(dependencies ...Dependency) error
}
// 拿到序列化后的policy serializedPolicy, isSerializedPolicy := contextData[0].(SerializedPolicy) if !isSerializedPolicy { logger.Panicf("Expected to receive a serialized policy in the first context data") } if block == nil || block.Data == nil { return errors.New("empty block") } if txPosition >= len(block.Data.Data) { return errors.Errorf("block has only %d transactions, but requested tx at position %d", len(block.Data.Data), txPosition) } if block.Header == nil { return errors.Errorf("no block header") }
// 调用不同版本的validator进行验证 var err error switch { case v.Capabilities.V1_3Validation(): err = v.TxValidatorV1_3.Validate(block, namespace, txPosition, actionPosition, serializedPolicy.Bytes())
// 验证代码使用v2/validation_logic.go中的实现 // Validate validates the given envelope corresponding to a transaction with an endorsement // policy as given in its serialized form func(vscc *Validator)Validate( block *common.Block, namespace string, txPosition int, actionPosition int, policyBytes []byte, )commonerrors.TxValidationError { ... // evaluate the signature set against the policy err = vscc.policyEvaluator.Evaluate(policyBytes, signatureSet) if err != nil { logger.Warningf("Endorsement policy failure for transaction txid=%s, err: %s", chdr.GetTxId(), err.Error()) iflen(signatureSet) < len(cap.Action.Endorsements) { // Warning: duplicated identities exist, endorsement failure might be cause by this reason return policyErr(errors.New(DUPLICATED_IDENTITY_ERROR)) } return policyErr(fmt.Errorf("VSCC error: endorsement policy failure, err: %s", err)) } ... }
// PolicyEvaluator evaluates policies
type PolicyEvaluator interface {
	validation.Dependency

	// Evaluate takes a set of SignedData and evaluates whether this set of signatures satisfies
	// the policy with the given bytes
	Evaluate(policyBytes []byte, signatureSet []*common.SignedData) error
}
// Evaluate takes a set of SignedData and evaluates whether this set of signatures satisfies the policy func (id *PolicyEvaluator) Evaluate(policyBytes []byte, signatureSet []*common.SignedData) error { pp := cauthdsl.NewPolicyProvider(id.IdentityDeserializer) policy, _, err := pp.NewPolicy(policyBytes) if err != nil { return err } return policy.Evaluate(signatureSet) }
// Policy is used to determine if a signature is valid
type Policy interface {
	// Evaluate takes a set of SignedData and evaluates whether this set of signatures satisfies the policy
	Evaluate(signatureSet []*cb.SignedData) error
}
// Evaluate takes a set of SignedData and evaluates whether this set of signatures satisfies the policy func (p *policy) Evaluate(signatureSet []*cb.SignedData) error { if p == nil { return fmt.Errorf("No such policy") } idAndS := make([]IdentityAndSignature, len(signatureSet)) for i, sd := range signatureSet { idAndS[i] = &deserializeAndVerify{ signedData: sd, deserializer: p.deserializer, } }
ok := p.evaluator(deduplicate(idAndS), make([]bool, len(signatureSet))) if !ok { return errors.New("signature set did not satisfy policy") } return nil }
// Context defines information about a transaction
// that is being validated
type Context struct {
	Seq       int
	Envelope  []byte
	TxID      string
	Channel   string
	VSCCName  string
	Policy    []byte // endorsement policy
	Namespace string
	Block     *common.Block
}
// GetInfoForValidate gets the ChaincodeInstance(with latest version) of tx, vscc and policy from lscc func(v *VsccValidatorImpl)GetInfoForValidate(chdr *common.ChannelHeader, ccID string)(*sysccprovider.ChaincodeInstance, *sysccprovider.ChaincodeInstance, []byte, error) { cc := &sysccprovider.ChaincodeInstance{ ChainID: chdr.ChannelId, ChaincodeName: ccID, ChaincodeVersion: coreUtil.GetSysCCVersion(), } vscc := &sysccprovider.ChaincodeInstance{ ChainID: chdr.ChannelId, ChaincodeName: "vscc", // default vscc for system chaincodes ChaincodeVersion: coreUtil.GetSysCCVersion(), // Get vscc version } var policy []byte var err error if !v.sccprovider.IsSysCC(ccID) { // when we are validating a chaincode that is not a // system CC, we need to ask the CC to give us the name // of VSCC and of the policy that should be used
// obtain name of the VSCC and the policy // 获取cc 定义 cd, err := v.getCDataForCC(chdr.ChannelId, ccID) if err != nil { msg := fmt.Sprintf("Unable to get chaincode data from ledger for txid %s, due to %s", chdr.TxId, err) logger.Errorf(msg) returnnil, nil, nil, err } cc.ChaincodeName = cd.CCName() cc.ChaincodeVersion = cd.CCVersion() // 拿到policy vscc.ChaincodeName, policy = cd.Validation() } else { // when we are validating a system CC, we use the default // VSCC and a default policy that requires one signature // from any of the members of the channel p := cauthdsl.SignedByAnyMember(v.support.GetMSPIDs(chdr.ChannelId)) policy, err = utils.Marshal(p) if err != nil { returnnil, nil, nil, err } }
return cc, vscc, policy, nil }
//-------- ChaincodeDefinition - interface for ChaincodeData ------

// ChaincodeDefinition describes all of the necessary information for a peer to decide whether to endorse
// a proposal and whether to validate a transaction, for a particular chaincode.
type ChaincodeDefinition interface {
	// CCName returns the name of this chaincode (the name it was put in the ChaincodeRegistry with).
	CCName() string

	// Hash returns the hash of the chaincode.
	Hash() []byte

	// CCVersion returns the version of the chaincode.
	CCVersion() string

	// Validation returns how to validate transactions for this chaincode.
	// The string returned is the name of the validation method (usually 'vscc')
	// and the bytes returned are the argument to the validation (in the case of
	// 'vscc', this is a marshaled pb.VSCCArgs message).
	Validation() (string, []byte)

	// Endorsement returns how to endorse proposals for this chaincode.
	// The string returned is the name of the endorsement method (usually 'escc').
	Endorsement() string
}
// Validation returns how to validate transactions for this chaincode. // The string returned is the name of the validation method (usually 'vscc') // and the bytes returned are the argument to the validation (in the case of // 'vscc', this is a marshaled pb.VSCCArgs message). func(cd *ChaincodeData)Validation()(string, []byte) { return cd.Vscc, cd.Policy }
//-------- ChaincodeData is stored on the LSCC -------
// ChaincodeData defines the data structure for chaincodes to be serialized by proto.
// Type provides an additional check by directing to use a specific package after instantiation.
// Data is Type specific (see CDSPackage and SignedCDSPackage).
type ChaincodeData struct {
	// Name of the chaincode
	Name string `protobuf:"bytes,1,opt,name=name"`

	// Version of the chaincode
	Version string `protobuf:"bytes,2,opt,name=version"`

	// Escc for the chaincode instance
	Escc string `protobuf:"bytes,3,opt,name=escc"`

	// Vscc for the chaincode instance
	Vscc string `protobuf:"bytes,4,opt,name=vscc"`

	// Policy is the endorsement policy for the chaincode instance
	Policy []byte `protobuf:"bytes,5,opt,name=policy,proto3"`

	// Data is data specific to the package
	Data []byte `protobuf:"bytes,6,opt,name=data,proto3"`

	// Id of the chaincode that's the unique fingerprint for the CC. This is not
	// currently used anywhere but serves as a good eyecatcher.
	Id []byte `protobuf:"bytes,7,opt,name=id,proto3"`

	// InstantiationPolicy for the chaincode
	InstantiationPolicy []byte `protobuf:"bytes,8,opt,name=instantiation_policy,proto3"`
}
// executeDeployOrUpgrade routes the code path either to executeDeploy or executeUpgrade // depending on its function argument func(lscc *LifeCycleSysCC)executeDeployOrUpgrade( stub shim.ChaincodeStubInterface, chainname string, cds *pb.ChaincodeDeploymentSpec, policy, escc, vscc, collectionConfigBytes []byte, function string, )(*ccprovider.ChaincodeData, error) {
// Invoke implements lifecycle functions "deploy", "start", "stop", "upgrade".
// Deploy's arguments - {[]byte("deploy"), []byte(<chainname>), <unmarshalled pb.ChaincodeDeploymentSpec>}
//
// Invoke also implements some query-like functions
// Get chaincode arguments - {[]byte("getid"), []byte(<chainname>), []byte(<chaincodename>)}
//
// NOTE: "..." below marks code that is elided in this excerpt.
func (lscc *LifeCycleSysCC) Invoke(stub shim.ChaincodeStubInterface) pb.Response {
	...
	switch function {
	case INSTALL:
		...
	case DEPLOY, UPGRADE:
		// Extract the endorsement policy.
		// optional arguments here (they can each be nil and may or may not be present)
		// args[3] is a marshalled SignaturePolicyEnvelope representing the endorsement policy
		// args[4] is the name of escc
		// args[5] is the name of vscc
		// args[6] is a marshalled CollectionConfigPackage struct
		var EP []byte
		if len(args) > 3 && len(args[3]) > 0 {
			// The caller supplied a non-empty policy: use it as-is.
			EP = args[3]
		} else {
			// No policy supplied: fall back to a marshalled SignedByAnyMember
			// policy built from the channel's MSP IDs.
			p := cauthdsl.SignedByAnyMember(peer.GetMSPIDs(channel))
			EP, err = utils.Marshal(p)
			if err != nil {
				return shim.Error(err.Error())
			}
		}
		...
		// EP is handed on as the policy argument of executeDeployOrUpgrade.
		cd, err := lscc.executeDeployOrUpgrade(stub, channel, cds, EP, escc, vscc, collectionsConfig, function)
		...
	case ...:
		...
	}
}