0%

前言

许多Orderer的文章,都是从Orderer的启动过程讲起,今天换一种“乐高”角度,先看看有哪些“零件”,再看这些零件怎么配合。

Orderer负责接收交易,把交易打包成区块,然后区块在所有Orderer节点之间达成一致,再分发给Peer的功能,这涉及了:

  • 网络:gRPC接收交易,向Peer发送区块
  • 切块:把交易打包成区块
  • 共识:所有Orderer节点达成一致

这些功能是由Orderer核心数据结构组织起来。

在Fabric中,通道和链在概念上都是一条区块链,所以本文中也会可能会混用链和通道。

核心数据结构

Registrar

Registrar

代码中,这样描述Registrar:

Registrar serves as a point of access and control for the individual channel resources.

可见它负责了每个channel资源的访问和控制点,也就说,要对某个通道怎么样,得从这入手。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type Registrar struct {
lock sync.RWMutex
// 保存了多条链
chains map[string]*ChainSupport

// 共识插件
consenters map[string]consensus.Consenter
ledgerFactory blockledger.Factory
signer crypto.LocalSigner

systemChannelID string
systemChannel *ChainSupport
...
}
  • chains保存了每一条链,每一条链在Orderer中都以ChainSupport代表。
  • consenters保存了所有的共识插件,每个共识插件都是一个Consenter,Fabric 1.4中共识插件有Solo、Kafka、EtcdRaft。
  • ledgerFactory用来读取和创建链的账本。
  • signer用来对Orderer中的数据进行签名,以及创建SignatureHeader
  • systemChannelIDsystemChannel分别是系统链ID、系统链实例。

ChainSupport

chainsupport

ChainSupport汇集了一条通道所需要的所有资源,所以说一个ChainSupport代表了一条链。

1
2
3
4
5
6
7
8
type ChainSupport struct {
*ledgerResources
msgprocessor.Processor
*BlockWriter
consensus.Chain
cutter blockcutter.Receiver
crypto.LocalSigner
}

ChainSupport 是一堆接口的集合,这些接口构成一条链所有的操作,接口可以分为4类:

  • 账本:ledgerResourcesBlockWriter分别是账本读写和把区块写入到账本。
  • 消息:msgprocessor.Processorcutter分别是处理交易和把交易切块。
  • 共识:consensus.Chain是Orderer的共识实例,比如每条链都有自己的Raft共识实例,它们互不干扰。
  • 签名:crypto.LocalSigner,同Registrar中的介绍。

Chain

Chain

Chain是接口,它的实现并不一条链,而是一条链的共识实例,可以是Solo、Kafka和EtcdRaft,它运行在单独的协程,使用Channel和ChainSupport通信,它调用其它接口完成切块,以及让所有的Orderer节点对交易达成一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// Chain defines a way to inject messages for ordering.
// Note, that in order to allow flexibility in the implementation, it is the responsibility of the implementer
// to take the ordered messages, send them through the blockcutter.Receiver supplied via HandleChain to cut blocks,
// and ultimately write the ledger also supplied via HandleChain. This design allows for two primary flows
// 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka)
// 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft)
type Chain interface {
// 普通消息/交易排序
// Order accepts a message which has been processed at a given configSeq.
// If the configSeq advances, it is the responsibility of the consenter
// to revalidate and potentially discard the message
// The consenter may return an error, indicating the message was not accepted
Order(env *cb.Envelope, configSeq uint64) error

// 配置消息/交易排序
// Configure accepts a message which reconfigures the channel and will
// trigger an update to the configSeq if committed. The configuration must have
// been triggered by a ConfigUpdate message. If the config sequence advances,
// it is the responsibility of the consenter to recompute the resulting config,
// discarding the message if the reconfiguration is no longer valid.
// The consenter may return an error, indicating the message was not accepted
Configure(config *cb.Envelope, configSeq uint64) error

// 等待排序集群可用
// WaitReady blocks waiting for consenter to be ready for accepting new messages.
// This is useful when consenter needs to temporarily block ingress messages so
// that in-flight messages can be consumed. It could return error if consenter is
// in erroneous states. If this blocking behavior is not desired, consenter could
// simply return nil.
WaitReady() error

// 当排序集群发送错误时,会关闭返回的通道
// Errored returns a channel which will close when an error has occurred.
// This is especially useful for the Deliver client, who must terminate waiting
// clients when the consenter is not up to date.
Errored() <-chan struct{}

// 启动当前链
// Start should allocate whatever resources are needed for staying up to date with the chain.
// Typically, this involves creating a thread which reads from the ordering source, passes those
// messages to a block cutter, and writes the resulting blocks to the ledger.
Start()

// 停止当前链,并释放资源
// Halt frees the resources which were allocated for this Chain.
Halt()
}

Consenter

Consenter

1
2
3
type Consenter interface {
HandleChain(support ConsenterSupport, metadata *cb.Metadata) (Chain, error)
}

Consenter也是接口,它只有1个功能用来创建Chain。每种共识插件,都有自己单独的consenter实现,分别用来创建solo实例、kafka实例或etcdraft实例。

ConsenterSupport

ConsenterSupport

ConsenterSupport为consenter实现提供所需的资源,其实就是共识用来访问外部数据的接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// ConsenterSupport provides the resources available to a Consenter implementation.
type ConsenterSupport interface {
crypto.LocalSigner
msgprocessor.Processor

// VerifyBlockSignature verifies a signature of a block with a given optional
// configuration (can be nil).
VerifyBlockSignature([]*cb.SignedData, *cb.ConfigEnvelope) error

// 提供把消息切成块的接口
// BlockCutter returns the block cutting helper for this channel.
BlockCutter() blockcutter.Receiver

// 当前链的orderer配置
// SharedConfig provides the shared config from the channel's current config block.
SharedConfig() channelconfig.Orderer

// 当前链的通道配置
// ChannelConfig provides the channel config from the channel's current config block.
ChannelConfig() channelconfig.Channel

// 生成区块
// CreateNextBlock takes a list of messages and creates the next block based on the block with highest block number committed to the ledger
// Note that either WriteBlock or WriteConfigBlock must be called before invoking this method a second time.
CreateNextBlock(messages []*cb.Envelope) *cb.Block

// 读区块
// Block returns a block with the given number,
// or nil if such a block doesn't exist.
Block(number uint64) *cb.Block

// 写区块
// WriteBlock commits a block to the ledger.
WriteBlock(block *cb.Block, encodedMetadataValue []byte)

// 写配置区块并更新配置
// WriteConfigBlock commits a block to the ledger, and applies the config update inside.
WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte)

// Sequence returns the current config squence.
Sequence() uint64

// ChainID returns the channel ID this support is associated with.
ChainID() string

// Height returns the number of blocks in the chain this channel is associated with.
Height() uint64

// 以原始数据的格式追加区块,不像WriteBlock那样会修改元数据
// Append appends a new block to the ledger in its raw form,
// unlike WriteBlock that also mutates its metadata.
Append(block *cb.Block) error
}

宏观视角

把上面介绍的各项,融合在一幅图中:

  • Registrar 包容万象,主要是ChainSupport和Consenter,Consenter是可插拔的
  • ChainSupport 代表了一条链,能够指向属于本条链的共识实例,该共识实例由对应共识类型的Consenter创建
  • 共识实例使用ConsenterSupport访问共识外部资源

使用fabric-sdk-go操作链码,介绍了使用官方Go SDK,安装、实例化和升级链码,调用和查询链码,本文介绍使用fabric-sdk-go订阅事件。

事件介绍

本质上就3种事件:

  • BlockEvent:获取区块信息
  • TransactionEvent:获取交易信息
  • ChainCodeEvnet:链码中自定义的链码事件

但每种事件都有2 种类型:

  • Filtered:事件订阅时默认的类型,获取的信息“不全”,不同的事件缺失的数据不同,比如链码事件,如果是Filtered的,其响应字段中的Payload是空的,也就是不知道链码事件携带的数据。这种方式能够降低fabric网络和SDK之间的流量,当Filtered后的字段信息就足够时,这种方式非常适合。关于Filtered的更多信息,这篇文章 Fabric 1.4源码解读 3:Event原理解读 非常有帮助。
  • 非Filtered :可以获取完整的区块、交易、链码事件信息,这种方式在SDK想获取更多信息时,是非常必要的。

4 个注册事件的接口1个取消注册的接口如下:

接口名称 描述 参数值 返回值
RegisterBlockEvent 注册块事件 filter …fab.BlockFilter fab.Registration, <-chan *fab.BlockEvent, error
RegisterFilteredBlockEvent 注册过滤块事件 fab.Registration, <-chan *fab.FilteredBlockEvent, error
RegisterTxStatusEvent 注册交易状态事件 txID string fab.Registration, <-chan *fab.TxStatusEvent, error
RegisterChaincodeEvent 注册链码事件 ccID, eventFilter string fab.Registration, <-chan *fab.CCEvent, error
Unregister 取消事件订阅 fab.Registration

注册会得到管理可以管理订阅的Registration、接收事件的通道,以及可能注册时发生的错误,关于每个接口的具体介绍、使用,可以参考官方的Event文档,其中包含了样例代码,如果想看真实的样例代码,可以参考示例项目

Option介绍

注册事件需要使用EventClient,创建EventClient时可以指定一些选项,这些选项其实就是事件订阅的选项。

有3个Option:

  • func WithBlockEvents() ClientOption

    指定了此选项,事件就是非“filtered”,fabric会向调用SDK客户端发送完整的区块,可以获得订阅事件完整的信息。

  • func WithSeekType(seek seek.Type) ClientOption

    使用此选项可以指定从哪个区块高度获取事件seek.TypeOldestNewestFromBlock 3种取值,分别代表从第1个区块、最后一个区块和指定区块号开始获取事件,FromBlock需要结合WithBlockNum使用。So,可以通过这个选项获取历史事件

  • func WithBlockNum(from uint64) ClientOption

    指定区块高度,只有WithSeekType(FromBlock)时才生效。

链码事件多说几句

链码如何发链码事件

ChaincodeStubInterfaceSetEvent的方法,入参分别为事件名称和事件锁携带的信息payload。

1
2
3
4
5
6
7
8
9
// ChaincodeStubInterface is used by deployable chaincode apps to access and
// modify their ledgers
type ChaincodeStubInterface interface {
// SetEvent allows the chaincode to set an event on the response to the
// proposal to be included as part of a transaction. The event will be
// available within the transaction in the committed block regardless of the
// validity of the transaction.
SetEvent(name string, payload []byte) error
}

通过ChannelClient订阅链码事件介绍

SDK的channel client也有订阅链码事件的接口:channel.Client.RegisterChaincodeEvent(),它的定义和event client提供的接口完全一样,但功能上有所差别。

channel client没有指定 WithBlockEvents,所以这是Filtered的事件链码,获取的事件链码中,其Payload为空。

示例项目

示例项目fabric-sdk-go-sample是结合Fabric的BYFN展示如何使用fabric-sdk-go的项目,它的Event样例部分,介绍了如何使用以上接口订阅Fabric事件,具体请参加该部分README

理论知识

如果不清楚数字证书、公私钥与签名的关系,建议阅读阮一峰的数字签名是什么?

Fabric证书和密钥文件

使用Fabric CA或者 cryptogen 工具可以生成证书和私钥文件,这里取 BYFN 例子的文件做介绍,Org1 Admin 账户的文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
➜  first-network git:(release-1.4) ✗ tree crypto-config/peerOrganizations/org1.example.com/users/Admin@org1.example.com
crypto-config/peerOrganizations/org1.example.com/users/Admin@org1.example.com
├── msp
│   ├── admincerts
│   │   └── Admin@org1.example.com-cert.pem
│   ├── cacerts
│   │   └── ca.org1.example.com-cert.pem
│   ├── keystore
│   │   └── f9f3dddb7fcc40086de6d5ae77f1481abbb99bff7a74839b950720d3dca0d8ee_sk
│   ├── signcerts
│   │   └── Admin@org1.example.com-cert.pem
│   └── tlscacerts
│   └── tlsca.org1.example.com-cert.pem
└── tls
├── ca.crt
├── client.crt
└── client.key

msp目录,为Admin的身份信息:

  • admincerts:组织管理员的身份验证证书。
  • cacerts:组织的根证书。
  • keystore:该用户的私钥,用来对消息签名。
  • signcerts:该用户的身份验证证书,被组织根证书签名。
  • tlscacerts:TLS通信用的身份证书,为组织的TLS证书。

tls目录,为TLS通信相关的证书:

  • ca.crt:组织根证书
  • client.crt:验证当前用户身份的证书,当前为验证管理员的证书
  • client.key:当前用户的身份私钥,用来签名

整体逻辑

交易是区块链的核心,一切状态的转移都是一条交易,交易的真伪需要使用数字签名进行保证。

在Fabric中,交易涉及两个概念:

  • Proposal:提案
  • Transaction:交易

所以 Proposal 和 Transaction 都需要使用数字签名进行保护,它们相关的消息中,都包含了发送方的身份信息:mspid、证书(证书中实际包含了公钥)。

提案的实际消息是 SignedProposal,其中包含了:

  • 数字签名:Signature
  • 证书、公钥等签名者身份信息:ProposalBytes.Proposal.Header.SignatureHeader.Creator

signed_proposal

图来自杨保华的hyperledger_code_fabric

交易中最重要的是Envelope结构体,SDK向Orderer提交交易时,会发送Envelope消息,它包含了:

  • 数字签名:Signature
  • 交易发送方的身份信息:Payload.Header.SignatureHeader.Creator
  • 可选背书节点的身份信息,不同的交易类型,Data包含了不同的信息,如果是需要背书的,可以包含背书的信息、签名和身份信息:Payload.Data.SignedChainccodeDeploymentSpec.OwnerEndorsements.signingidentity

Signed transaction

图来自《区块链原理、设计与应用》,为升级链码的交易Envelope结构。

在验证消息的签名时,会从中提取出数字签名Signature,身份信息(证书、公钥)和被签名消息体,完成以下验证:

  • 使用证书验证发送方的身份,发送方是否属于它所在的组织,以及发送方的公钥没有修改和替换
  • 使用公钥验证消息是否为发送方签名,并且消息没有被修改

验证的整体流程如下:

Verify signature

验证签名的函数

core/common/validation/msgvalidation.go 提供了2验证消息签名的函数,用来验证Proposal和Transaction,它们会调用相同的函数checkSignatureFromCreator进行数字签名的验证。

验证Porposal签名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func ValidateProposalMessage(signedProp *pb.SignedProposal) (*pb.Proposal, *common.Header, *pb.ChaincodeHeaderExtension, error) {
...

// 从SignatureHeader交易客户端的签名
// validate the signature
err = checkSignatureFromCreator(shdr.Creator, signedProp.Signature, signedProp.ProposalBytes, chdr.ChannelId)
if err != nil {
// log the exact message on the peer but return a generic error message to
// avoid malicious users scanning for channels
putilsLogger.Warningf("channel [%s]: %s", chdr.ChannelId, err)
sId := &msp.SerializedIdentity{}
err := proto.Unmarshal(shdr.Creator, sId)
if err != nil {
// log the error here as well but still only return the generic error
err = errors.Wrap(err, "could not deserialize a SerializedIdentity")
putilsLogger.Warningf("channel [%s]: %s", chdr.ChannelId, err)
}
return nil, nil, nil, errors.Errorf("access denied: channel [%s] creator org [%s]", chdr.ChannelId, sId.Mspid)
}
}

验证Transaction签名

Commit阶段会对交易进行验证,会调用此函数,该函数完成了对Transaction的验证,包含发送方数字签名的验证。

交易是包含背书结果和背书签名的,背书相关的验证并不包含在此,而是专门的背书验证,具体请看Fabric 1.4源码解读 1:背书策略是怎么使用的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// ValidateTransaction checks that the transaction envelope is properly formed
func ValidateTransaction(e *common.Envelope, c channelconfig.ApplicationCapabilities) (*common.Payload, pb.TxValidationCode) {
putilsLogger.Debugf("ValidateTransactionEnvelope starts for envelope %p", e)

...

// validate the header
chdr, shdr, err := validateCommonHeader(payload.Header)
if err != nil {
putilsLogger.Errorf("validateCommonHeader returns err %s", err)
return nil, pb.TxValidationCode_BAD_COMMON_HEADER
}

// validate the signature in the envelope
err = checkSignatureFromCreator(shdr.Creator, e.Signature, e.Payload, chdr.ChannelId)
if err != nil {
putilsLogger.Errorf("checkSignatureFromCreator returns err %s", err)
return nil, pb.TxValidationCode_BAD_CREATOR_SIGNATURE
}

// continue the validation in a way that depends on the type specified in the header
switch common.HeaderType(chdr.Type) {
case common.HeaderType_ENDORSER_TRANSACTION:
// Verify that the transaction ID has been computed properly.
// This check is needed to ensure that the lookup into the ledger
// for the same TxID catches duplicates.
err = utils.CheckTxID(
chdr.TxId,
shdr.Nonce,
shdr.Creator)

if err != nil {
putilsLogger.Errorf("CheckTxID returns err %s", err)
return nil, pb.TxValidationCode_BAD_PROPOSAL_TXID
}

// 如果是背书交易,背书的签名不在此验证,由背书策略模块进行验证
err = validateEndorserTransaction(payload.Data, payload.Header)
putilsLogger.Debugf("ValidateTransactionEnvelope returns err %s", err)

验证签名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// given a creator, a message and a signature,
// this function returns nil if the creator
// is a valid cert and the signature is valid
func checkSignatureFromCreator(creatorBytes []byte, sig []byte, msg []byte, ChainID string) error {
putilsLogger.Debugf("begin")

// check for nil argument
if creatorBytes == nil || sig == nil || msg == nil {
return errors.New("nil arguments")
}

// 每个链有各自的msp
mspObj := mspmgmt.GetIdentityDeserializer(ChainID)
if mspObj == nil {
return errors.Errorf("could not get msp for channel [%s]", ChainID)
}

// 获取proposal创建者/发送方的Identity
// creatorBytes 中是序列化后的mspid、证书、公钥等信息
creator, err := mspObj.DeserializeIdentity(creatorBytes)
if err != nil {
return errors.WithMessage(err, "MSP error")
}

putilsLogger.Debugf("creator is %s", creator.GetIdentifier())

// 验证证书是否有效
// ensure that creator is a valid certificate
err = creator.Validate()
if err != nil {
return errors.WithMessage(err, "creator certificate is not valid")
}

putilsLogger.Debugf("creator is valid")

// validate the signature
// 验证签名
err = creator.Verify(msg, sig)
if err != nil {
return errors.WithMessage(err, "creator's signature over the proposal is not valid")
}

putilsLogger.Debugf("exits successfully")

return nil
}

获取Identity

获取当前通道的MSP manager:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// GetIdentityDeserializer returns the IdentityDeserializer for the given chain
func GetIdentityDeserializer(chainID string) msp.IdentityDeserializer {
if chainID == "" {
return GetLocalMSP()
}

return GetManagerForChain(chainID)
}

// GetManagerForChain returns the msp manager for the supplied
// chain; if no such manager exists, one is created
func GetManagerForChain(chainID string) msp.MSPManager {
m.Lock()
defer m.Unlock()

// 先从缓存查找
mspMgr, ok := mspMap[chainID]
if !ok {
// 找不到则新建立当前通道Msp manager
mspLogger.Debugf("Created new msp manager for channel `%s`", chainID)
mspMgmtMgr := &mspMgmtMgr{msp.NewMSPManager(), false}
mspMap[chainID] = mspMgmtMgr
mspMgr = mspMgmtMgr
} else {
// check for internal mspManagerImpl and mspMgmtMgr types. if a different
// type is found, it's because a developer has added a new type that
// implements the MSPManager interface and should add a case to the logic
// above to handle it.
if !(reflect.TypeOf(mspMgr).Elem().Name() == "mspManagerImpl" || reflect.TypeOf(mspMgr).Elem().Name() == "mspMgmtMgr") {
panic("Found unexpected MSPManager type.")
}
mspLogger.Debugf("Returning existing manager for channel '%s'", chainID)
}
return mspMgr
}

// MSPManager has been setup for a channel, which indicates whether the channel
// exists or not
type mspMgmtMgr struct {
msp.MSPManager
// track whether this MSPManager has been setup successfully
up bool
}

msp.MSPManager是一个接口,从上面代码可以得知,它是利用NewMSPManager创建的:

1
2
3
4
// 创建等待Setup的MSPManager
func NewMSPManager() MSPManager {
return &mspManagerImpl{}
}

疑问是,啥时候Setup的,当前调用路径上没发现这个路径,可能从系统整体流程上,已经保证了,当前调用时,已经创建好了。

获取Identity,是一个剥洋葱的过程:

1
2
3
4
5
6
func (mgr *mspMgmtMgr) DeserializeIdentity(serializedIdentity []byte) (msp.Identity, error) {
if !mgr.up {
return nil, errors.New("channel doesn't exist")
}
return mgr.MSPManager.DeserializeIdentity(serializedIdentity)
}

实际调用mspManagerImplDeserializeIdentity

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// DeserializeIdentity returns an identity given its serialized version supplied as argument
func (mgr *mspManagerImpl) DeserializeIdentity(serializedID []byte) (Identity, error) {
// We first deserialize to a SerializedIdentity to get the MSP ID
sId := &msp.SerializedIdentity{}
err := proto.Unmarshal(serializedID, sId)
if err != nil {
return nil, errors.Wrap(err, "could not deserialize a SerializedIdentity")
}

// 获取发送方的msp实例
// we can now attempt to obtain the MSP
msp := mgr.mspsMap[sId.Mspid]
if msp == nil {
return nil, errors.Errorf("MSP %s is unknown", sId.Mspid)
}

switch t := msp.(type) {
case *bccspmsp:
return t.deserializeIdentityInternal(sId.IdBytes)
case *idemixmsp:
return t.deserializeIdentityInternal(sId.IdBytes)
default:
return t.DeserializeIdentity(serializedID)
}
}

转到bccspmsp的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 反序列化二进制,得到证书,然后用证书获取公钥,使用证书、公钥和msp,创建Identity
// deserializeIdentityInternal returns an identity given its byte-level representation
func (msp *bccspmsp) deserializeIdentityInternal(serializedIdentity []byte) (Identity, error) {
// This MSP will always deserialize certs this way
bl, _ := pem.Decode(serializedIdentity)
if bl == nil {
return nil, errors.New("could not decode the PEM structure")
}
cert, err := x509.ParseCertificate(bl.Bytes)
if err != nil {
return nil, errors.Wrap(err, "parseCertificate failed")
}

// Now we have the certificate; make sure that its fields
// (e.g. the Issuer.OU or the Subject.OU) match with the
// MSP id that this MSP has; otherwise it might be an attack
// TODO!
// We can't do it yet because there is no standardized way
// (yet) to encode the MSP ID into the x.509 body of a cert

pub, err := msp.bccsp.KeyImport(cert, &bccsp.X509PublicKeyImportOpts{Temporary: true})
if err != nil {
return nil, errors.WithMessage(err, "failed to import certificate's public key")
}

return newIdentity(cert, pub, msp)
}

Identity包含了Identity标示符,证书、公钥和所在的msp,创建Identity就是计算以上几项信息的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
type identity struct {
// id contains the identifier (MSPID and identity identifier) for this instance
id *IdentityIdentifier

// cert contains the x.509 certificate that signs the public key of this instance
cert *x509.Certificate

// this is the public key of this instance
pk bccsp.Key

// reference to the MSP that "owns" this identity
msp *bccspmsp
}

func newIdentity(cert *x509.Certificate, pk bccsp.Key, msp *bccspmsp) (Identity, error) {
if mspIdentityLogger.IsEnabledFor(zapcore.DebugLevel) {
mspIdentityLogger.Debugf("Creating identity instance for cert %s", certToPEM(cert))
}

// 检查证书
// Sanitize first the certificate
cert, err := msp.sanitizeCert(cert)
if err != nil {
return nil, err
}

// Compute identity identifier

// Use the hash of the identity's certificate as id in the IdentityIdentifier
hashOpt, err := bccsp.GetHashOpt(msp.cryptoConfig.IdentityIdentifierHashFunction)
if err != nil {
return nil, errors.WithMessage(err, "failed getting hash function options")
}

digest, err := msp.bccsp.Hash(cert.Raw, hashOpt)
if err != nil {
return nil, errors.WithMessage(err, "failed hashing raw certificate to compute the id of the IdentityIdentifier")
}

id := &IdentityIdentifier{
Mspid: msp.name,
Id: hex.EncodeToString(digest)}

return &identity{id: id, cert: cert, pk: pk, msp: msp}, nil
}

验证数字签名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Verify checks against a signature and a message
// to determine whether this identity produced the
// signature; it returns nil if so or an error otherwise
func (id *identity) Verify(msg []byte, sig []byte) error {
// mspIdentityLogger.Infof("Verifying signature")

// Compute Hash
hashOpt, err := id.getHashOpt(id.msp.cryptoConfig.SignatureHashFamily)
if err != nil {
return errors.WithMessage(err, "failed getting hash function options")
}

digest, err := id.msp.bccsp.Hash(msg, hashOpt)
if err != nil {
return errors.WithMessage(err, "failed computing digest")
}

if mspIdentityLogger.IsEnabledFor(zapcore.DebugLevel) {
mspIdentityLogger.Debugf("Verify: digest = %s", hex.Dump(digest))
mspIdentityLogger.Debugf("Verify: sig = %s", hex.Dump(sig))
}

// 最终会调用bccsp的接口验证签名,SW或者国密
valid, err := id.msp.bccsp.Verify(id.pk, sig, digest, nil)
if err != nil {
return errors.WithMessage(err, "could not determine the validity of the signature")
} else if !valid {
return errors.New("The signature is invalid")
}

return nil
}

解密SignatureHeader

Fabric 使用 SignatureHeader 保存发送方的身份信息,Creator即为序列化后的信息。

SignatureHeaderMaker 接口定义了创建一个 SignatureHeader 的方法,搜索起来实现该接口的结构体很多,本质上只有2个:mspSignerSignatureHeaderCreator

1
2
3
4
5
6
7
8
9
10
11
// SignatureHeaderMaker creates a new SignatureHeader
type SignatureHeaderMaker interface {
// NewSignatureHeader creates a SignatureHeader with the correct signing identity and a valid nonce
NewSignatureHeader() (*cb.SignatureHeader, error)
}

// localmsp
func (s *mspSigner) NewSignatureHeader() (*cb.SignatureHeader, error) {}

// crypto
func (bs *SignatureHeaderCreator) NewSignatureHeader() (*cb.SignatureHeader, error){}

两个实现本质上是一样的,以 mspSigner 为例进行介绍。首先获取实现SigningIdentity接口的实例,然后调用Serialize得到序列化后的身份信息,再随机生成一个Nonce,创建出SignatureHeader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// NewSignatureHeader creates a SignatureHeader with the correct signing identity and a valid nonce
func (s *mspSigner) NewSignatureHeader() (*cb.SignatureHeader, error) {
// 获得SigningIdentity接口实例
signer, err := mspmgmt.GetLocalMSP().GetDefaultSigningIdentity()
if err != nil {
return nil, fmt.Errorf("Failed getting MSP-based signer [%s]", err)
}

// 序列化得到creator
creatorIdentityRaw, err := signer.Serialize()
if err != nil {
return nil, fmt.Errorf("Failed serializing creator public identity [%s]", err)
}

// 获取一个随机nonce
nonce, err := crypto.GetRandomNonce()
if err != nil {
return nil, fmt.Errorf("Failed creating nonce [%s]", err)
}

sh := &cb.SignatureHeader{}
sh.Creator = creatorIdentityRaw
sh.Nonce = nonce

return sh, nil
}

SigningIdentity接口包含了Identity接口,Identity声明了跟证书相关的方法,SigningIdentity则增加了对消息签名的函数Sign

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type SigningIdentity interface {

// Extends Identity
Identity

// Sign the message
Sign(msg []byte) ([]byte, error)

// GetPublicVersion returns the public parts of this identity
GetPublicVersion() Identity
}

type Identity interface {
...
// Serialize converts an identity to bytes
Serialize() ([]byte, error)
...
}

Serialize的实现,实际只包含了证书和MSPID,说明了消息中携带的只包含MSPID和证书作为身份信息,而不是signingidentity的所有字段(signingidentity实现了SigningIdentity接口)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Serialize returns a byte array representation of this identity
func (id *identity) Serialize() ([]byte, error) {
// mspIdentityLogger.Infof("Serializing identity %s", id.id)

// Raw格式证书
pb := &pem.Block{Bytes: id.cert.Raw, Type: "CERTIFICATE"}
pemBytes := pem.EncodeToMemory(pb)
if pemBytes == nil {
return nil, errors.New("encoding of identity failed")
}

// 使用MSPID和序列化后的证书,再次序列化得到身份信息
sId := &msp.SerializedIdentity{Mspid: id.id.Mspid, IdBytes: pemBytes}
idBytes, err := proto.Marshal(sId)
if err != nil {
return nil, errors.Wrapf(err, "could not marshal a SerializedIdentity structure for identity %s", id.id)
}

return idBytes, nil
}

参考资料

  1. https://github.com/yeasy/hyperledger_code_fabric
  2. 《区块链原理、设计与应用》第9章、第10章

教程资料

问题汇总

replace 使用http或https

在使用go replace时,有2点注意:

  • 目标仓库不能带协议头,比如http、https,要从域名或者IP开始
  • 版本号格式要符合语义格式化,测试版本是否符合规则:Go playground 样例代码

直接修改 go.mod 文件格式:

1
replace github.com/hyperledger/fabric v1.4.1 => 192.168.9.251/hyperledger/fabric v1.4.1-alpha.11-yx

或使用命令:

1
go mod edit -replace=github.com/hyperledger/fabric@v1.4.1=192.168.9.251/hyperledger/fabric@v1.4.1

Gitlab 仓库没开启https

go mod 默认使用 go get 下载依赖,而 go get 默认使用 https,如果 Gitlab 仓库没有启用 https,需要使用 -insecure 让go get走http。

问题:

1
2
3
GOPROXY="" go get github.com/hyperledger/fabric@v1.4.1
go: 192.168.9.251/hyperledger/fabric@v1.4.1-alpha.11-yx: unrecognized import path "192.168.9.251/hyperledger/fabric" (https fetch: Get https://192.168.9.251/hyperledger/fabric?go-get=1: dial tcp 192.168.9.251:443: connect: connection refused)
go: error loading module requirements

方案:

1
GOPROXY="" go get -insecure github.com/hyperledger/fabric@v1.4.1

注解:遇到问题时,使用 go get -v 可以看到更多信息,有助分析问题。

Go Modules 代理

由于某些网络原因,国内下载 Github 等处的依赖,不够流程,需要设置代理,不同版本的设置如下:

  • go1.12

    1
    $ export GOPROXY=https://goproxy.cn
  • go1.13

    1
    2
    $ export GOPRIVATE=192.168.9.251
    $ export GOPROXY=https://goproxy.cn,direct

私有仓库

如果仓库设置为私有,这要求用户必须登录才能访问仓库。

Go Modules 默认使用 go get 下载依赖,go get 利用 https 或者 http, 但下载过程没有设置用户名和密码的地方,下载依赖时,可能遇到一下错误:

  • connection refused
  • unkown revision

可以通过设置Github/Gitlab Access Token结果,通过token的方式,访问仓库,token的获取方式为,登录Gitlab仓库,进入以下页面:

Gitlab User Setting -> Access Tokens

在此页面复制下顶端的 Your New Personal Access Token, 然后填写token名字和勾选下方的权限进行创建 Token。

然后执行以下命令:

1
2
3
git config --global \
url."http://oauth2:${your_access_token}@ip_address_or_domain".insteadOf \
"http://ip_address_or_domain"

后面再去 go get 的时候,就可顺利下载依赖。

流程

快速入门Fabric核心概念和框架 这篇文章中,介绍了 Fabric 的很多概念,其中也包含了交易、提案(Proposal)和链码。同时也介绍了,交易的执行流程,链码的调用流程等。

本文聚焦介绍交易流程的一个环节:交易背书,以下的3幅图,在快速入门Fabric核心概念和框架 中都有介绍,有必要的话,去读一下上下文信息。

交易宏观流程

交易的详细流程请阅读 交易流程,了解交易流程的几大环节。

链码调用流程

上图,展示了客户端、Peer,以及链码容器 3大主体在交易流程中的背书过程,请关注一下Peer中的 Handler,它负责和链码容器交互。

提案背书流程

上图,从接近源码的层面,展示了交易背书过程。其中Fabric、Shim 是 Peer 中的模块,ChainCode 代表链码容器,Endorser Chaincode 代表 Peer 对交易提案和模拟执行结果进行背书。

如果了解过Chaincode,你会知道 Shim 是链码容器和 Peer 交互所依赖的模块。

最后推荐一份保华大佬整理的 Peer 提案背书过程,是读源码前,必读的资料。虽然精简,但把重要的核心流程都串联起来了。

源码分析

Proposal定义

客户端发送被背书节点的是 SignedProposal ,它包含了签名和Proposal,这是它在proposal.proto中的组成简介,:

1
2
3
4
5
SignedProposal
|\_ Signature (signature on the Proposal message by the creator specified in the header)
\_ Proposal
|\_ Header (the header for this proposal)
\_ Payload (the payload for this proposal)

proposal.proto这个文件还简要介绍了Client和背书节点之间通信的消息类型和过程。

Proposal

1
2
3
4
5
6
7
8
type SignedProposal struct {
// The bytes of Proposal
ProposalBytes []byte `protobuf:"bytes,1,opt,name=proposal_bytes,json=proposalBytes,proto3" json:"proposal_bytes,omitempty"`
// Signaure over proposalBytes; this signature is to be verified against
// the creator identity contained in the header of the Proposal message
// marshaled as proposalBytes
Signature []byte `protobuf:"bytes,2,opt,name=signature,proto3" json:"signature,omitempty"`
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
type Proposal struct {
// The header of the proposal. It is the bytes of the Header
Header []byte `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"`
// The payload of the proposal as defined by the type in the proposal
// header.
Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
// Optional extensions to the proposal. Its content depends on the Header's
// type field. For the type CHAINCODE, it might be the bytes of a
// ChaincodeAction message.
Extension []byte `protobuf:"bytes,3,opt,name=extension,proto3" json:"extension,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
type Header struct {
ChannelHeader []byte `protobuf:"bytes,1,opt,name=channel_header,json=channelHeader,proto3" json:"channel_header,omitempty"`
SignatureHeader []byte `protobuf:"bytes,2,opt,name=signature_header,json=signatureHeader,proto3" json:"signature_header,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}


// Header is a generic replay prevention and identity message to include in a signed payload
type ChannelHeader struct {
Type int32 `protobuf:"varint,1,opt,name=type,proto3" json:"type,omitempty"`
// Version indicates message protocol version
Version int32 `protobuf:"varint,2,opt,name=version,proto3" json:"version,omitempty"`
// Timestamp is the local time when the message was created
// by the sender
Timestamp *timestamp.Timestamp `protobuf:"bytes,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// Identifier of the channel this message is bound for
ChannelId string `protobuf:"bytes,4,opt,name=channel_id,json=channelId,proto3" json:"channel_id,omitempty"`
// An unique identifier that is used end-to-end.
// - set by higher layers such as end user or SDK
// - passed to the endorser (which will check for uniqueness)
// - as the header is passed along unchanged, it will be
// be retrieved by the committer (uniqueness check here as well)
// - to be stored in the ledger
TxId string `protobuf:"bytes,5,opt,name=tx_id,json=txId,proto3" json:"tx_id,omitempty"`
// The epoch in which this header was generated, where epoch is defined based on block height
// Epoch in which the response has been generated. This field identifies a
// logical window of time. A proposal response is accepted by a peer only if
// two conditions hold:
// 1. the epoch specified in the message is the current epoch
// 2. this message has been only seen once during this epoch (i.e. it hasn't
// been replayed)
Epoch uint64 `protobuf:"varint,6,opt,name=epoch,proto3" json:"epoch,omitempty"`
// Extension that may be attached based on the header type
Extension []byte `protobuf:"bytes,7,opt,name=extension,proto3" json:"extension,omitempty"`
// If mutual TLS is employed, this represents
// the hash of the client's TLS certificate
TlsCertHash []byte `protobuf:"bytes,8,opt,name=tls_cert_hash,json=tlsCertHash,proto3" json:"tls_cert_hash,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}

type SignatureHeader struct {
// Creator of the message, a marshaled msp.SerializedIdentity
Creator []byte `protobuf:"bytes,1,opt,name=creator,proto3" json:"creator,omitempty"`
// Arbitrary number that may only be used once. Can be used to detect replay attacks.
Nonce []byte `protobuf:"bytes,2,opt,name=nonce,proto3" json:"nonce,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}

gRPC定义

1
2
3
4
5
6
7
8
9
10
11
// EndorserClient is the client API for Endorser service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type EndorserClient interface {
ProcessProposal(ctx context.Context, in *SignedProposal, opts ...grpc.CallOption) (*ProposalResponse, error)
}

// EndorserServer is the server API for Endorser service.
type EndorserServer interface {
ProcessProposal(context.Context, *SignedProposal) (*ProposalResponse, error)
}

SDK发送Proposal

Peer接收Proposal

Peer处理Proposal主流程

主要是把背书节点的背书工作聚合一下:

  1. Proposal预处理
  2. 获取交易执行模拟器,模拟执行Proposal
  3. 如果模拟执行成功,调用ESCC对Proposal和结果进行背书,如果模拟执行失败直接返回背书失败的响应
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
func (e *Endorser) ProcessProposal(ctx context.Context, signedProp *pb.SignedProposal) (*pb.ProposalResponse, error) {
...
// 0 -- check and validate
// 这里有相当多的工作量
vr, err := e.preProcess(signedProp)
if err != nil {
resp := vr.resp
return resp, err
}

prop, hdrExt, chainID, txid := vr.prop, vr.hdrExt, vr.chainID, vr.txid

// 获取指定账本模拟器
// obtaining once the tx simulator for this proposal. This will be nil
// for chainless proposals
// Also obtain a history query executor for history queries, since tx simulator does not cover history
var txsim ledger.TxSimulator
var historyQueryExecutor ledger.HistoryQueryExecutor
if acquireTxSimulator(chainID, vr.hdrExt.ChaincodeId) {
if txsim, err = e.s.GetTxSimulator(chainID, txid); err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil
}
...
defer txsim.Done()

if historyQueryExecutor, err = e.s.GetHistoryQueryExecutor(chainID); err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil
}
}

txParams := &ccprovider.TransactionParams{
ChannelID: chainID,
TxID: txid,
SignedProp: signedProp,
Proposal: prop,
TXSimulator: txsim,
HistoryQueryExecutor: historyQueryExecutor,
}
// this could be a request to a chainless SysCC

// 模拟执行交易,失败则返回背书失败的响应
// 1 -- simulate
cd, res, simulationResult, ccevent, err := e.SimulateProposal(txParams, hdrExt.ChaincodeId)
if err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil
}
if res != nil {
if res.Status >= shim.ERROR {
endorserLogger.Errorf("[%s][%s] simulateProposal() resulted in chaincode %s response status %d for txid: %s", chainID, shorttxid(txid), hdrExt.ChaincodeId, res.Status, txid)
var cceventBytes []byte
if ccevent != nil {
cceventBytes, err = putils.GetBytesChaincodeEvent(ccevent)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal event bytes")
}
}
pResp, err := putils.CreateProposalResponseFailure(prop.Header, prop.Payload, res, simulationResult, cceventBytes, hdrExt.ChaincodeId, hdrExt.PayloadVisibility)
if err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil
}

return pResp, nil
}
}

// 对模拟执行的结果进行签名背书
// 2 -- endorse and get a marshalled ProposalResponse message
var pResp *pb.ProposalResponse

// TODO till we implement global ESCC, CSCC for system chaincodes
// chainless proposals (such as CSCC) don't have to be endorsed
if chainID == "" {
pResp = &pb.ProposalResponse{Response: res}
} else {
// Note: To endorseProposal(), we pass the released txsim. Hence, an error would occur if we try to use this txsim
pResp, err = e.endorseProposal(ctx, chainID, txid, signedProp, prop, res, simulationResult, ccevent, hdrExt.PayloadVisibility, hdrExt.ChaincodeId, txsim, cd)

...
}

// Set the proposal response payload - it
// contains the "return value" from the
// chaincode invocation
pResp.Response = res

// total failed proposals = ProposalsReceived-SuccessfulProposals
e.Metrics.SuccessfulProposals.Add(1)
success = true

return pResp, nil
}

preProcess 检查和获取信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
// preProcess checks the tx proposal headers, uniqueness and ACL
// 检查proposal、ACL
func (e *Endorser) preProcess(signedProp *pb.SignedProposal) (*validateResult, error) {
// 包含proposal、header、chainID、txid等信息
vr := &validateResult{}
// at first, we check whether the message is valid
// 检查proposal,并获取各种需要的信息
prop, hdr, hdrExt, err := validation.ValidateProposalMessage(signedProp)

if err != nil {
e.Metrics.ProposalValidationFailed.Add(1)
vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
return vr, err
}

// 获取Header中的2个Header
chdr, err := putils.UnmarshalChannelHeader(hdr.ChannelHeader)
if err != nil {
vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
return vr, err
}

shdr, err := putils.GetSignatureHeader(hdr.SignatureHeader)
if err != nil {
vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
return vr, err
}

// 检查是否调用了不可外部(用户)的系统链码
// 先找到链码实例,然后调用链码的方法判断本身是否可调用
// block invocations to security-sensitive system chaincodes
if e.s.IsSysCCAndNotInvokableExternal(hdrExt.ChaincodeId.Name) {
endorserLogger.Errorf("Error: an attempt was made by %#v to invoke system chaincode %s", shdr.Creator, hdrExt.ChaincodeId.Name)
err = errors.Errorf("chaincode %s cannot be invoked through a proposal", hdrExt.ChaincodeId.Name)
vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
return vr, err
}

chainID := chdr.ChannelId
txid := chdr.TxId
endorserLogger.Debugf("[%s][%s] processing txid: %s", chainID, shorttxid(txid), txid)

if chainID != "" {
// labels that provide context for failure metrics
meterLabels := []string{
"channel", chainID,
"chaincode", hdrExt.ChaincodeId.Name + ":" + hdrExt.ChaincodeId.Version,
}

// 检查交易是否已上链
// Here we handle uniqueness check and ACLs for proposals targeting a chain
// Notice that ValidateProposalMessage has already verified that TxID is computed properly
if _, err = e.s.GetTransactionByID(chainID, txid); err == nil {
// increment failure due to duplicate transactions. Useful for catching replay attacks in
// addition to benign retries
e.Metrics.DuplicateTxsFailure.With(meterLabels...).Add(1)
err = errors.Errorf("duplicate transaction found [%s]. Creator [%x]", txid, shdr.Creator)
vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
return vr, err
}

// 用户链码检查ACL
// check ACL only for application chaincodes; ACLs
// for system chaincodes are checked elsewhere
if !e.s.IsSysCC(hdrExt.ChaincodeId.Name) {
// check that the proposal complies with the Channel's writers
if err = e.s.CheckACL(signedProp, chdr, shdr, hdrExt); err != nil {
e.Metrics.ProposalACLCheckFailed.With(meterLabels...).Add(1)
vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
return vr, err
}
}
} else {
// chainless proposals do not/cannot affect ledger and cannot be submitted as transactions
// ignore uniqueness checks; also, chainless proposals are not validated using the policies
// of the chain since by definition there is no chain; they are validated against the local
// MSP of the peer instead by the call to ValidateProposalMessage above
}

// 保存提取的各信息
vr.prop, vr.hdrExt, vr.chainID, vr.txid = prop, hdrExt, chainID, txid
return vr, nil
}

// ValidateProposalMessage checks the validity of a SignedProposal message
// this function returns Header and ChaincodeHeaderExtension messages since they
// have been unmarshalled and validated
func ValidateProposalMessage(signedProp *pb.SignedProposal) (*pb.Proposal, *common.Header, *pb.ChaincodeHeaderExtension, error) {
if signedProp == nil {
return nil, nil, nil, errors.New("nil arguments")
}

putilsLogger.Debugf("ValidateProposalMessage starts for signed proposal %p", signedProp)

// extract the Proposal message from signedProp
prop, err := utils.GetProposal(signedProp.ProposalBytes)
if err != nil {
return nil, nil, nil, err
}

// 1) look at the ProposalHeader
hdr, err := utils.GetHeader(prop.Header)
if err != nil {
return nil, nil, nil, err
}

// validate the header
chdr, shdr, err := validateCommonHeader(hdr)
if err != nil {
return nil, nil, nil, err
}

// 从SignatureHeader交易客户端的签名
// validate the signature
err = checkSignatureFromCreator(shdr.Creator, signedProp.Signature, signedProp.ProposalBytes, chdr.ChannelId)
if err != nil {
// log the exact message on the peer but return a generic error message to
// avoid malicious users scanning for channels
putilsLogger.Warningf("channel [%s]: %s", chdr.ChannelId, err)
sId := &msp.SerializedIdentity{}
err := proto.Unmarshal(shdr.Creator, sId)
if err != nil {
// log the error here as well but still only return the generic error
err = errors.Wrap(err, "could not deserialize a SerializedIdentity")
putilsLogger.Warningf("channel [%s]: %s", chdr.ChannelId, err)
}
return nil, nil, nil, errors.Errorf("access denied: channel [%s] creator org [%s]", chdr.ChannelId, sId.Mspid)
}

// 检查txid的计算是否符合规则
// Verify that the transaction ID has been computed properly.
// This check is needed to ensure that the lookup into the ledger
// for the same TxID catches duplicates.
err = utils.CheckTxID(
chdr.TxId,
shdr.Nonce,
shdr.Creator)
if err != nil {
return nil, nil, nil, err
}

// 依据不同的proposal类型对proposal分别进行检查
// continue the validation in a way that depends on the type specified in the header
switch common.HeaderType(chdr.Type) {
case common.HeaderType_CONFIG:
//which the types are different the validation is the same
//viz, validate a proposal to a chaincode. If we need other
//special validation for confguration, we would have to implement
//special validation
fallthrough
case common.HeaderType_ENDORSER_TRANSACTION:
// 主要是提取ChaincodeHeaderExtension
// validation of the proposal message knowing it's of type CHAINCODE
chaincodeHdrExt, err := validateChaincodeProposalMessage(prop, hdr)
if err != nil {
return nil, nil, nil, err
}

return prop, hdr, chaincodeHdrExt, err
default:
//NOTE : we proably need a case
return nil, nil, nil, errors.Errorf("unsupported proposal type %d", common.HeaderType(chdr.Type))
}
}

背书节点模拟执行交易

获取模拟器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// Support contains functions that the endorser requires to execute its tasks
type Support interface {
crypto.SignerSupport
// IsSysCCAndNotInvokableExternal returns true if the supplied chaincode is
// ia system chaincode and it NOT invokable
IsSysCCAndNotInvokableExternal(name string) bool

// GetTxSimulator returns the transaction simulator for the specified ledger
// a client may obtain more than one such simulator; they are made unique
// by way of the supplied txid
GetTxSimulator(ledgername string, txid string) (ledger.TxSimulator, error)

}

// GetTxSimulator returns the transaction simulator for the specified ledger
// a client may obtain more than one such simulator; they are made unique
// by way of the supplied txid
func (s *SupportImpl) GetTxSimulator(ledgername string, txid string) (ledger.TxSimulator, error) {
// 使用账本和txid创建模拟器,每个交易有单独的模拟器
lgr := s.Peer.GetLedger(ledgername)
if lgr == nil {
return nil, errors.Errorf("Channel does not exist: %s", ledgername)
}
return lgr.NewTxSimulator(txid)
}

// NewTxSimulator returns new `ledger.TxSimulator`
func (l *kvLedger) NewTxSimulator(txid string) (ledger.TxSimulator, error) {
return l.txtmgmt.NewTxSimulator(txid)
}

// NewTxSimulator implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) NewTxSimulator(txid string) (ledger.TxSimulator, error) {
logger.Debugf("constructing new tx simulator")
s, err := newLockBasedTxSimulator(txmgr, txid)
if err != nil {
return nil, err
}
txmgr.commitRWLock.RLock()
return s, nil
}

// 就2项重要的:查询执行器、读写集构建器
// LockBasedTxSimulator is a transaction simulator used in `LockBasedTxMgr`
type lockBasedTxSimulator struct {
lockBasedQueryExecutor
rwsetBuilder *rwsetutil.RWSetBuilder
writePerformed bool
pvtdataQueriesPerformed bool
simulationResultsComputed bool
paginatedQueriesPerformed bool
}

func newLockBasedTxSimulator(txmgr *LockBasedTxMgr, txid string) (*lockBasedTxSimulator, error) {
// 创建读写集构建器,能帮助构建读写集
rwsetBuilder := rwsetutil.NewRWSetBuilder()
helper := newQueryHelper(txmgr, rwsetBuilder)
logger.Debugf("constructing new tx simulator txid = [%s]", txid)
return &lockBasedTxSimulator{lockBasedQueryExecutor{helper, txid}, rwsetBuilder, false, false, false, false}, nil
}

// LockBasedQueryExecutor is a query executor used in `LockBasedTxMgr`
// "只读",不包含写相关的操作
type lockBasedQueryExecutor struct {
helper *queryHelper
txid string
}

模拟执行

endorser部分
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
if acquireTxSimulator(chainID, vr.hdrExt.ChaincodeId) {
if txsim, err = e.s.GetTxSimulator(chainID, txid); err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil
}
defer txsim.Done()

// 历史查询器
if historyQueryExecutor, err = e.s.GetHistoryQueryExecutor(chainID); err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil
}
}

txParams := &ccprovider.TransactionParams{
ChannelID: chainID,
TxID: txid,
SignedProp: signedProp,
Proposal: prop,
TXSimulator: txsim, // 模拟器在此
HistoryQueryExecutor: historyQueryExecutor,
}
// this could be a request to a chainless SysCC

// TODO: if the proposal has an extension, it will be of type ChaincodeAction;
// if it's present it means that no simulation is to be performed because
// we're trying to emulate a submitting peer. On the other hand, we need
// to validate the supplied action before endorsing it

// 模拟执行交易,失败则返回背书失败的响应
// 1 -- simulate
cd, res, simulationResult, ccevent, err := e.SimulateProposal(txParams, hdrExt.ChaincodeId)

调用chaincode模块模拟执行交易,获取交易执行的公开和私密数据读写集,以及交易执行产生的事件,并把结果返回给上层进行背书。

其中还包含了私密数据的处理,会把它取出来,然后通过Gossip传播给在私密数据中的Peer节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// SimulateProposal simulates the proposal by calling the chaincode
func (e *Endorser) SimulateProposal(txParams *ccprovider.TransactionParams, cid *pb.ChaincodeID) (ccprovider.ChaincodeDefinition, *pb.Response, []byte, *pb.ChaincodeEvent, error) {
endorserLogger.Debugf("[%s][%s] Entry chaincode: %s", txParams.ChannelID, shorttxid(txParams.TxID), cid)
defer endorserLogger.Debugf("[%s][%s] Exit", txParams.ChannelID, shorttxid(txParams.TxID))
// we do expect the payload to be a ChaincodeInvocationSpec
// if we are supporting other payloads in future, this be glaringly point
// as something that should change
// 根据Proposal生成Invoke需要的信息
cis, err := putils.GetChaincodeInvocationSpec(txParams.Proposal)
if err != nil {
return nil, nil, nil, nil, err
}

// 链码的元数据
var cdLedger ccprovider.ChaincodeDefinition
var version string

// 设置version
if !e.s.IsSysCC(cid.Name) {
// 根据要调用的链码名称,从lscc获取链码的元数据
cdLedger, err = e.s.GetChaincodeDefinition(cid.Name, txParams.TXSimulator)
if err != nil {
return nil, nil, nil, nil, errors.WithMessage(err, fmt.Sprintf("make sure the chaincode %s has been successfully instantiated and try again", cid.Name))
}
version = cdLedger.CCVersion()

// 实际被打桩了,无实现
err = e.s.CheckInstantiationPolicy(cid.Name, version, cdLedger)
if err != nil {
return nil, nil, nil, nil, err
}
} else {
// scc版本是固定的"latest"
version = util.GetSysCCVersion()
}

// ---3. execute the proposal and get simulation results
var simResult *ledger.TxSimulationResults
var pubSimResBytes []byte
var res *pb.Response
var ccevent *pb.ChaincodeEvent
// 模拟执行,执行结果保存在模拟器
res, ccevent, err = e.callChaincode(txParams, version, cis.ChaincodeSpec.Input, cid)
if err != nil {
endorserLogger.Errorf("[%s][%s] failed to invoke chaincode %s, error: %+v", txParams.ChannelID, shorttxid(txParams.TxID), cid, err)
return nil, nil, nil, nil, err
}

if txParams.TXSimulator != nil {
// 通过模拟器获取模拟执行结果,包含公开和私密数据2份读写集
if simResult, err = txParams.TXSimulator.GetTxSimulationResults(); err != nil {
txParams.TXSimulator.Done()
return nil, nil, nil, nil, err
}

// 存在私密数据
if simResult.PvtSimulationResults != nil {
if cid.Name == "lscc" {
// TODO: remove once we can store collection configuration outside of LSCC
txParams.TXSimulator.Done()
return nil, nil, nil, nil, errors.New("Private data is forbidden to be used in instantiate")
}
// 获取要通过Gossip传播的私密数据
pvtDataWithConfig, err := e.AssemblePvtRWSet(simResult.PvtSimulationResults, txParams.TXSimulator)
// To read collection config need to read collection updates before
// releasing the lock, hence txParams.TXSimulator.Done() moved down here
txParams.TXSimulator.Done()

if err != nil {
return nil, nil, nil, nil, errors.WithMessage(err, "failed to obtain collections config")
}
endorsedAt, err := e.s.GetLedgerHeight(txParams.ChannelID)
if err != nil {
return nil, nil, nil, nil, errors.WithMessage(err, fmt.Sprint("failed to obtain ledger height for channel", txParams.ChannelID))
}
// Add ledger height at which transaction was endorsed,
// `endorsedAt` is obtained from the block storage and at times this could be 'endorsement Height + 1'.
// However, since we use this height only to select the configuration (3rd parameter in distributePrivateData) and
// manage transient store purge for orphaned private writesets (4th parameter in distributePrivateData), this works for now.
// Ideally, ledger should add support in the simulator as a first class function `GetHeight()`.
pvtDataWithConfig.EndorsedAt = endorsedAt
// 把私密数据同通道id、交易id和区块高度发出去,代表私密数据所属的区块和交易
if err := e.distributePrivateData(txParams.ChannelID, txParams.TxID, pvtDataWithConfig, endorsedAt); err != nil {
return nil, nil, nil, nil, err
}
}

// 交易模拟完成,释放模拟器占用的资源
txParams.TXSimulator.Done()
// 获取模拟执行的公开结果
if pubSimResBytes, err = simResult.GetPubSimulationBytes(); err != nil {
return nil, nil, nil, nil, err
}
}

// 返回链码元数据、模拟执行结果、交易执行产生的事件
return cdLedger, res, pubSimResBytes, ccevent, nil
}

callChaincode 调用chaincode模块执行链码。在前面的流程中,还没有区分系统链码SCC和用户链码UCC,SCC和UCC都会通过Execute函数被传递给chaincode模块而执行。

如果是调用lscc部署或升级UCC,会调用ExecuteLegacyInit执行链码容器的初始化。

最后返回链码模拟执行结果和事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// call specified chaincode (system or user)
func (e *Endorser) callChaincode(txParams *ccprovider.TransactionParams, version string, input *pb.ChaincodeInput, cid *pb.ChaincodeID) (*pb.Response, *pb.ChaincodeEvent, error) {
...
// scc也在这执行
// is this a system chaincode
res, ccevent, err = e.s.Execute(txParams, txParams.ChannelID, cid.Name, version, txParams.TxID, txParams.SignedProp, txParams.Proposal, input)
if err != nil {
return nil, nil, err
}
...

// 如果是调用lscc部署或升级链码,会走这段流程
if cid.Name == "lscc" && len(input.Args) >= 3 && (string(input.Args[0]) == "deploy" || string(input.Args[0]) == "upgrade") {
userCDS, err := putils.GetChaincodeDeploymentSpec(input.Args[2], e.PlatformRegistry)
...
// 进行链码容器初始化,最后会调用链码的Init的函数
_, _, err = e.s.ExecuteLegacyInit(txParams, txParams.ChannelID, cds.ChaincodeSpec.ChaincodeId.Name, cds.ChaincodeSpec.ChaincodeId.Version, txParams.TxID, txParams.SignedProp, txParams.Proposal, cds)
...
}
// ----- END -------

return res, ccevent, err
}

Support 接口实际集合了众多背书节点需要的外部模块功能,比如链码、系统链码、ACL等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Support contains functions that the endorser requires to execute its tasks
type Support interface

// SupportImpl provides an implementation of the endorser.Support interface
// issuing calls to various static methods of the peer
type SupportImpl struct {
*PluginEndorser
crypto.SignerSupport
Peer peer.Operations
PeerSupport peer.Support
ChaincodeSupport *chaincode.ChaincodeSupport
SysCCProvider *scc.Provider
ACLProvider aclmgmt.ACLProvider
}

Execute就是调用ChaincodeSupport.Execute实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Execute a proposal and return the chaincode response
func (s *SupportImpl) Execute(txParams *ccprovider.TransactionParams, cid, name, version, txid string, signedProp *pb.SignedProposal, prop *pb.Proposal, input *pb.ChaincodeInput) (*pb.Response, *pb.ChaincodeEvent, error) {
cccid := &ccprovider.CCContext{
Name: name,
Version: version,
}

// decorate the chaincode input
decorators := library.InitRegistry(library.Config{}).Lookup(library.Decoration).([]decoration.Decorator)
input.Decorations = make(map[string][]byte)
input = decoration.Apply(prop, input, decorators...)
txParams.ProposalDecorations = input.Decorations

return s.ChaincodeSupport.Execute(txParams, cccid, input)
}
chaincode部分

通过上面的接口,跨入chaincode模块的大门。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (cs *ChaincodeSupport) Execute(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, input *pb.ChaincodeInput) (*pb.Response, *pb.ChaincodeEvent, error) {
// Invoke得到ChaincodeMessage
resp, err := cs.Invoke(txParams, cccid, input)
// 根据ChaincodeMessage得到Response和事件
return processChaincodeExecutionResult(txParams.TxID, cccid.Name, resp, err)
}

func (cs *ChaincodeSupport) Invoke(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, input *pb.ChaincodeInput) (*pb.ChaincodeMessage, error) {
h, err := cs.Launch(txParams.ChannelID, cccid.Name, cccid.Version, txParams.TXSimulator)
if err != nil {
return nil, err
}

// 执行调用链码的交易(和链码之间的消息为ChaincodeMessage_TRANSACTION)
cctype := pb.ChaincodeMessage_TRANSACTION
return cs.execute(cctype, txParams, cccid, input, h)
}
获取链码执行环境

Launch 可以获取链码执行环境,即用户链码容器,如果已实例化的链码,在当前背书节点上,链码容器未启动,则启动链码容器,Launch会返回一个跟链码容器交互Handler。

某个 Peer 上可以部署多个链码容器,Peer 为了和这些链码容器交互/通信,给每个链码容器都创建了一个 Handler,Handler 携带了 Peer 和链码容器交互的资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Launch starts executing chaincode if it is not already running. This method
// blocks until the peer side handler gets into ready state or encounters a fatal
// error. If the chaincode is already running, it simply returns.
func (cs *ChaincodeSupport) Launch(chainID, chaincodeName, chaincodeVersion string, qe ledger.QueryExecutor) (*Handler, error) {
cname := chaincodeName + ":" + chaincodeVersion
if h := cs.HandlerRegistry.Handler(cname); h != nil {
return h, nil
}

// 启动链码容器 ...

h := cs.HandlerRegistry.Handler(cname)
if h == nil {
return nil, errors.Wrapf(err, "[channel %s] claimed to start chaincode container for %s but could not find handler", chainID, cname)
}

return h, nil
}

链码容器的启动过程,不是本文的重点,所以不继续深入Launch的调用细节。

模拟执行交易

execute封装出执行交易的消息,然后使用 Handler 执行交易。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// execute executes a transaction and waits for it to complete until a timeout value.
func (cs *ChaincodeSupport) execute(cctyp pb.ChaincodeMessage_Type, txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, input *pb.ChaincodeInput, h *Handler) (*pb.ChaincodeMessage, error) {
input.Decorations = txParams.ProposalDecorations
// 创建消息
ccMsg, err := createCCMessage(cctyp, txParams.ChannelID, txParams.TxID, input)
if err != nil {
return nil, errors.WithMessage(err, "failed to create chaincode message")
}

// 执行交易
ccresp, err := h.Execute(txParams, cccid, ccMsg, cs.ExecuteTimeout)
if err != nil {
return nil, errors.WithMessage(err, fmt.Sprintf("error sending"))
}

return ccresp, nil
}

注意以下这个参数 txParams *ccprovider.TransactionParams 其类型定义如下,它包含了一条交易执行过程中的信息和资源,所以交易传递的过程中,一直有这个参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// TransactionParams are parameters which are tied to a particular transaction
// and which are required for invoking chaincode.
type TransactionParams struct {
TxID string
ChannelID string
SignedProp *pb.SignedProposal
Proposal *pb.Proposal
TXSimulator ledger.TxSimulator
HistoryQueryExecutor ledger.HistoryQueryExecutor
CollectionStore privdata.CollectionStore
IsInitTransaction bool

// this is additional data passed to the chaincode
ProposalDecorations map[string][]byte
}

Handler 执行交易的过程如下,创建交易执行的上下文 Context,因为链码容器在执行交易的时候,会和 Peer 之间进行多次通信,进行数据的读写,上下文可以让数据读写获取到正确的信息。

之后 Handler 把消息发送给链码容器,并等待链码容器发来包含执行结果的消息,或者执行超时,默认执行时间是 30s。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (h *Handler) Execute(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, msg *pb.ChaincodeMessage, timeout time.Duration) (*pb.ChaincodeMessage, error) {
chaincodeLogger.Debugf("Entry")
defer chaincodeLogger.Debugf("Exit")

// 私密数据
txParams.CollectionStore = h.getCollectionStore(msg.ChannelId)
// 是否是执行链码初始化
txParams.IsInitTransaction = (msg.Type == pb.ChaincodeMessage_INIT)

// 创建交易context
txctx, err := h.TXContexts.Create(txParams)
if err != nil {
return nil, err
}
// 退出时(执行交易完毕),释放交易上下文资源
defer h.TXContexts.Delete(msg.ChannelId, msg.Txid)

// proposal保存到msg
if err := h.setChaincodeProposal(txParams.SignedProp, txParams.Proposal, msg); err != nil {
return nil, err
}

// 向链码容器发送msg
h.serialSendAsync(msg)

// 等待链码容器响应,或者超时
var ccresp *pb.ChaincodeMessage
select {
case ccresp = <-txctx.ResponseNotifier:
// response is sent to user or calling chaincode. ChaincodeMessage_ERROR
// are typically treated as error
case <-time.After(timeout):
err = errors.New("timeout expired while executing transaction")
ccName := cccid.Name + ":" + cccid.Version
h.Metrics.ExecuteTimeouts.With(
"chaincode", ccName,
).Add(1)
}

return ccresp, err
}
处理链码容器模拟响应

链码容器执行的响应会向上传递,直到 ChaincodeSupport.Execute,它调用 processChaincodeExecutionResult 把链码容器返回的响应,转化为交易模拟执行的 Response,而 Response 最终会返回给Endorser,大家可去调用流程上翻。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func processChaincodeExecutionResult(txid, ccName string, resp *pb.ChaincodeMessage, err error) (*pb.Response, *pb.ChaincodeEvent, error) {
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to execute transaction %s", txid)
}
if resp == nil {
return nil, nil, errors.Errorf("nil response from transaction %s", txid)
}

if resp.ChaincodeEvent != nil {
resp.ChaincodeEvent.ChaincodeId = ccName
resp.ChaincodeEvent.TxId = txid
}

switch resp.Type {
// 交易执行成功则提取Payload中保存的Response
case pb.ChaincodeMessage_COMPLETED:
res := &pb.Response{}
err := proto.Unmarshal(resp.Payload, res)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to unmarshal response for transaction %s", txid)
}
return res, resp.ChaincodeEvent, nil

// 失败,则提取Payload中保存的错误信息
case pb.ChaincodeMessage_ERROR:
return nil, resp.ChaincodeEvent, errors.Errorf("transaction returned with failure: %s", resp.Payload)

default:
return nil, nil, errors.Errorf("unexpected response type %d for transaction %s", resp.Type, txid)
}
}

释放模拟器资源

回想一下,在 Endorser.SimulateProposal 中,它获取了 交易模拟执行器 TXSimulator,这里面可是有很多资源的,如果不及时释放,在高 TPS 下,Peer压力上大,资源泄露,性能低下等问题会爆发出来。

txParams.TXSimulator.Done() 用来释放资源,上文提到 TxSimulator 包含了 QueryExecutor, lockBasedQueryExecutor 实现了 QueryExecutor,也就是说,主要是释放查询操作相关的资源。

从源码可以看到会释放读写锁以及迭代器资源,如果不及时释放,后果果然不堪。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Done implements method in interface `ledger.QueryExecutor`
func (q *lockBasedQueryExecutor) Done() {
logger.Debugf("Done with transaction simulation / query execution [%s]", q.txid)
q.helper.done()
}

func (h *queryHelper) done() {
if h.doneInvoked {
return
}

defer func() {
// 释放锁
h.txmgr.commitRWLock.RUnlock()
h.doneInvoked = true
// 释放迭代器
for _, itr := range h.itrs {
itr.Close()
}
}()
}

ESCC处理模拟执行结果

上文提到,模拟执行的 Response 会最终回到 Endorser,Endorser 会调用 ESCC 对结果进行背书,最终生成 ProposalResponse,我们看一下这个过程。

1
2
3
4
5
6
7
8
9
10
11
12
func (e *Endorser) ProcessProposal(ctx context.Context, signedProp *pb.SignedProposal) (*pb.ProposalResponse, error) {
// Pre-process, simulate

if chainID == "" {
pResp = &pb.ProposalResponse{Response: res}
} else {
// Note: To endorseProposal(), we pass the released txsim. Hence, an error would occur if we try to use this txsim
pResp, err = e.endorseProposal(ctx, chainID, txid, signedProp, prop, res, simulationResult, ccevent, hdrExt.PayloadVisibility, hdrExt.ChaincodeId, txsim, cd)
// ...
}
// ...
}
Endorser.endorseProposal

背书链码实现了可插拔,可以使用不同的ESCC,系统链码和用户链码的背书过程是不同的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// endorse the proposal by calling the ESCC
func (e *Endorser) endorseProposal(_ context.Context, chainID string, txid string, signedProp *pb.SignedProposal, proposal *pb.Proposal, response *pb.Response, simRes []byte, event *pb.ChaincodeEvent, visibility []byte, ccid *pb.ChaincodeID, txsim ledger.TxSimulator, cd ccprovider.ChaincodeDefinition) (*pb.ProposalResponse, error) {
endorserLogger.Debugf("[%s][%s] Entry chaincode: %s", chainID, shorttxid(txid), ccid)
defer endorserLogger.Debugf("[%s][%s] Exit", chainID, shorttxid(txid))

// 系统链码和用户链码使用不同的ESCC
isSysCC := cd == nil
// 1) extract the name of the escc that is requested to endorse this chaincode
var escc string
// ie, "lscc" or system chaincodes
if isSysCC {
escc = "escc"
} else {
escc = cd.Endorsement()
}

endorserLogger.Debugf("[%s][%s] escc for chaincode %s is %s", chainID, shorttxid(txid), ccid, escc)

// marshalling event bytes
var err error
var eventBytes []byte
if event != nil {
eventBytes, err = putils.GetBytesChaincodeEvent(event)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal event bytes")
}
}

// set version of executing chaincode
if isSysCC {
// if we want to allow mixed fabric levels we should
// set syscc version to ""
ccid.Version = util.GetSysCCVersion()
} else {
ccid.Version = cd.CCVersion()
}

// 创建背书上下文信息
ctx := Context{
PluginName: escc, // 插件名称
Channel: chainID,
SignedProposal: signedProp,
ChaincodeID: ccid,
Event: eventBytes,
SimRes: simRes,
Response: response,
Visibility: visibility,
Proposal: proposal,
TxID: txid,
}

// 调用插件背书
return e.s.EndorseWithPlugin(ctx)
}

背书插件实现下面的接口即可。

1
2
3
4
5
6
7
8
9
10
11
12
// Plugin endorses a proposal response
type Plugin interface {
// Endorse signs the given payload(ProposalResponsePayload bytes), and optionally mutates it.
// Returns:
// The Endorsement: A signature over the payload, and an identity that is used to verify the signature
// The payload that was given as input (could be modified within this function)
// Or error on failure
Endorse(payload []byte, sp *peer.SignedProposal) (*peer.Endorsement, []byte, error)

// Init injects dependencies into the instance of the Plugin
Init(dependencies ...Dependency) error
}

使用插件背书,需获取插件实例,然后组装响应Payload,它包含了交易执行的多种结果,然后对Payload以及签名的Proposal背书。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// EndorseWithPlugin endorses the response with a plugin
func (pe *PluginEndorser) EndorseWithPlugin(ctx Context) (*pb.ProposalResponse, error) {
endorserLogger.Debug("Entering endorsement for", ctx)

if ctx.Response == nil {
return nil, errors.New("response is nil")
}

if ctx.Response.Status >= shim.ERRORTHRESHOLD {
return &pb.ProposalResponse{Response: ctx.Response}, nil
}

// 获取插件
plugin, err := pe.getOrCreatePlugin(PluginName(ctx.PluginName), ctx.Channel)
if err != nil {
endorserLogger.Warning("Endorsement with plugin for", ctx, " failed:", err)
return nil, errors.Errorf("plugin with name %s could not be used: %v", ctx.PluginName, err)
}

// 把模拟执行的信息组成生成背书响应Payload
prpBytes, err := proposalResponsePayloadFromContext(ctx)
if err != nil {
endorserLogger.Warning("Endorsement with plugin for", ctx, " failed:", err)
return nil, errors.Wrap(err, "failed assembling proposal response payload")
}

// 对Payload和签名的Proposal进行背书
endorsement, prpBytes, err := plugin.Endorse(prpBytes, ctx.SignedProposal)
if err != nil {
endorserLogger.Warning("Endorsement with plugin for", ctx, " failed:", err)
return nil, errors.WithStack(err)
}

resp := &pb.ProposalResponse{
Version: 1,
Endorsement: endorsement,
Payload: prpBytes,
Response: ctx.Response,
}
endorserLogger.Debug("Exiting", ctx)
return resp, nil
}

系统提供的默认背书插件如下,本质是对交易执行结果和Proposal签名人信息进行签名。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Endorse signs the given payload(ProposalResponsePayload bytes), and optionally mutates it.
// Returns:
// The Endorsement: A signature over the payload, and an identity that is used to verify the signature
// The payload that was given as input (could be modified within this function)
// Or error on failure
func (e *DefaultEndorsement) Endorse(prpBytes []byte, sp *peer.SignedProposal) (*peer.Endorsement, []byte, error) {
// 提取Proposal的签名人
signer, err := e.SigningIdentityForRequest(sp)
if err != nil {
return nil, nil, errors.New(fmt.Sprintf("failed fetching signing identity: %v", err))
}

// 得到签名人身份
// serialize the signing identity
identityBytes, err := signer.Serialize()
if err != nil {
return nil, nil, errors.New(fmt.Sprintf("could not serialize the signing identity: %v", err))
}

// 对Payload和身份进行签名
// sign the concatenation of the proposal response and the serialized endorser identity with this endorser's key
signature, err := signer.Sign(append(prpBytes, identityBytes...))
if err != nil {
return nil, nil, errors.New(fmt.Sprintf("could not sign the proposal response payload: %v", err))
}
endorsement := &peer.Endorsement{Signature: signature, Endorser: identityBytes}
return endorsement, prpBytes, nil
}

发送Response

ProcessProposal 会把 ProposalResponse 作为返回值,剩下的就交给 gRPC,发送给请求方了。

总结

本文从宏观和源码层面,解读了交易提案背书涉及的数据结构,以及其主要背书流程,核心可以主要包含以下几步:

  1. 检查Proposal
  2. 为交易创建模拟器,并调用模拟器模拟执行交易,生成执行结果
  3. 背书模块对执行结果和Proposal身份信息背书(签名),然后生成背书响应发送给客户端

关于背书流程,本文未涉及的环节有:

  1. Proposal中各字段,层层递进的含义
  2. 模拟执行交易,是链码执行函数,并和Peer交互的过程,以及模拟执行的各种资源
  3. 2种插件化ESCC的实现

后面的章节,会对相关源码实现做进一步分析。

新老朋友好久不见,我是大彬。今天为大家带来的分享是Go语言垃圾回收,这篇文章筹划的了很久,因为GC也是很强大的一个话题,关于GC已经有很多篇论文还有书籍,想通过一篇文章来介绍Go语言的垃圾回收是困难的,所以决定分几篇文章来完成Go语言垃圾回收的相关话题:

  1. Go垃圾回收 1: 历史和原理
  2. Go垃圾回收 2: GC主要流程
  3. Go垃圾回收 3: 源码分析
  4. Go垃圾回收 4: GC对性能的影响与优化

虽然划分成了3部分,但每个子话题依然很大,依然难写,依然大而不全,每一篇文章都会有宏观与细节,这样的大而不全对于不了解GC的朋友是好事,即可以有宏观上的认识,又可以有重要细节的感知。

这篇文章就是第一个话题:Go垃圾回收历史和原理,希望各位有所收获。

Go语言垃圾回收简介

垃圾指内存中不再使用的内存区域,自动发现与释放这种内存区域的过程就是垃圾回收。

内存资源是有限的,而垃圾回收可以让内存重复使用,并且减轻开发者对内存管理的负担,减少程序中的内存问题。

以下是从网上对垃圾回收的2个定义:

  1. Garbage consists of objects that are dead.
  2. In tracing garbage collection, the term is sometimes used to mean objects that are known to be dead; that is, objects that are unreachable.

Go垃圾回收发展史

  • go1.1,提高效率和垃圾回收精确度。
  • go1.3,提高了垃圾回收的精确度。
  • go1.4,之前版本的runtime大部分是使用C写的,这个版本大量使用Go进行了重写,让GC有了扫描stack的能力,进一步提高了垃圾回收的精确度。

  • go1.5,目标是降低GC延迟,采用了并发标记和并发清除,三色标记write barrier,以及实现了更好的回收器调度,设计文档1文档2,以及这个版本的Go talk

  • go1.6,小优化,当程序使用大量内存时,GC暂停时间有所降低。
  • go1.7,小优化,当程序有大量空闲goroutine,stack大小波动比较大时,GC暂停时间有显著降低。
  • go1.8write barrier切换到hybrid write barrier,以消除STW中的re-scan,把STW的最差情况降低到50us,设计文档
  • go1.9,提升指标比较多,1)过去 runtime.GC, debug.SetGCPercent, 和 debug.FreeOSMemory都不能触发并发GC,他们触发的GC都是阻塞的,go1.9可以了,变成了在垃圾回收之前只阻塞调用GC的goroutine。2)debug.SetGCPercent只在有必要的情况下才会触发GC。
  • go.1.10,小优化,加速了GC,程序应当运行更快一点点
  • go1.12,显著提高了堆内存存在大碎片情况下的sweeping性能,能够降低GC后立即分配内存的延迟。

以上的历史版本信息都来自Go release归档,有兴趣可以去翻阅一下。

Go垃圾回收主要流程

下面这幅图来自Go1.5的go talk,虽然go1.12的GC与go1.5有了许多改变,但总体的流程没有较大改变,并且也找不到官方更新的图了,所有就用这幅图介绍GC主流程。

Go GC

Go 垃圾回收是分轮次的,每一轮GC都是从 Off 状态开始,如果不是 Off 状态,则代表上一轮GC还未完成,如果这时修改指针的值,是直接修改的。

Go 垃圾回收的主要分2部分,第1部分是扫描所有对象进行三色标记,标记为黑色、灰色和白色,标记完成后只有黑色和白色对象,黑色代表使用中对象,白色对象代表垃圾,灰色是白色过渡到黑色的中间临时状态,第2部分是清扫垃圾,即清理白色对象。

第1部分包含了栈扫描、标记和标记结束3个阶段。在栈扫描之前有2个重要的准备:STW(Stop The World)和开启写屏障(WB,Write Barrier)。

STW是为了暂停当前所有运行中的goroutine,进行一些准备工作,比如开启WB,把全局变量,以及每个goroutine中的 Root对象 收集起来,Root对象是标记扫描的源头,可以从Root对象依次索引到使用中的对象。

Objects Reference Tree

假设内存中的对象用圆圈表示,那根据对象的指向关系,所有的对象可以组成若干依赖树,每一个 Root对象 都是树根,按图索骥能找到每一个使用中的对象。但树根不一定是Root对象,也有可能是垃圾,使用灰色树根代表Root对象,白色树根代表垃圾。

每个P都有一个 mcache ,每个 mcache 都有1个Span用来存放 TinyObject,TinyObject 都是不包含指针的对象,所以这些对象可以直接标记为黑色,然后关闭 STW。

如果不了解mcache和Tiny对象,赶紧翻一下这篇文章Go内存分配那些事

每个P都有1个进行扫描标记的 goroutine,可以进行并发标记,关闭STW后,这些 goroutine 就变成可运行状态,接收 Go Scheduler 的调度,被调度时执行1轮标记,它负责第1部分任务:栈扫描、标记和标记结束。

栈扫描阶段就是把前面搜集的Root对象找出来,标记为黑色,然后把它们引用的对象也找出来,标记为灰色,并且加入到gcWork队列,gcWork队列保存了灰色的对象,每个灰色的对象都是一个Work。

后面可以进入标记阶段,它是一个循环,不断的从gcWork队列中取出work,所指向的对象标记为黑色,该对象指向的对象标记为灰色,然后加入队列,直到队列为空。

然后进入标记结束阶段,再次开启STW,不同的版本处理方式是不同的。

在Go1.7的版本是Dijkstra写屏障,这个写屏障只监控堆上指针数据的变动,由于成本原因,没有监控栈上指针的变动,由于应用goroutine和GC的标记goroutine都在运行,当栈上的指针指向的对象变更为白色对象时,这个白色对象应当标记为黑色,需要再次扫描全局变量和栈,以免释放这类不该释放的对象。

在Go1.8及以后的版本引入了混合写屏障,这个写屏障依然不监控栈上指针的变动,但是它的策略,使得无需再次扫描栈和全局变量,但依然需要STW然后进行一些检查。

标记结束阶段的最后会关闭写屏障,然后关闭STW,唤醒熟睡已久的负责清扫垃圾的goroutine。

清扫goroutine是应用启动后立即创建的一个后台goroutine,它会立刻进入睡眠,等待被唤醒,然后执行垃圾清理:把白色对象挨个清理掉,清扫goroutine和应用goroutine是并发进行的。清扫完成之后,它再次进入睡眠状态,等待下次被唤醒。

最后执行一些数据统计和状态修改的工作,并且设置好触发下一轮GC的阈值,把GC状态设置为Off。

以上就是Go垃圾回收的主要流程,但和go1.12的源码稍微有一些不同,比如标记结束后,就开始设置各种状态数据以及把GC状态成了Off,在开启一轮GC时,会自动检测当前是否处于Off,如果不是Off,则当前goroutine会调用清扫函数,帮助清扫goroutine一起清扫span,实际的Go垃圾回收流程以源码为准。

主要流程是宏观一点的角度,接下去会扩散一下,介绍主要流程中提到的各种概念,比如三色标记、并发标记清理、STW、写屏障、辅助GC、GC persent。

几类垃圾回收思想

垃圾回收的研究已经存在了几十年,远在Go诞生之前,就存在了多种垃圾回收的思想,我们这里看几个跟Go垃圾回收相关的几个。

Tracing GC

WIKI介绍:https://en.wikipedia.org/wiki/Tracing_garbage_collection

Tracing GC 是垃圾回收的一个大类,另外一个大类是引用计数,关于各种垃圾回收的类别可以看下这个系列文章深入浅出垃圾回收

本文主要介绍Tracing GC的简要原理,我们首先看一下引用树的概念。把内存中所有的对象,都作为一个节点,对象A中的指针,指向了对象B,就存在从对象A指向对象B的一条边,对象B也可能指向了其他对象,那么根据指向关系就能生成一颗对象引用树。

Objects Reference Tree

把内存中所有的对象引用树组合起来,就组成了一幅图。

Memory Objects

Tracing GC中有2类对象:

  1. 可到达对象,即使用中对象
  2. 不可到达对象,即垃圾

Tracing GC使用对象引用树找到所有可到达的对象,找到可到达对象有2个原则。

原则1:被程序中调用栈,或者全局变量指向的对象是可到达对象。

Root Objects

原则2:被可到达对象指向的对象也是可到达对象。

A是可到达的,并且B被A引用,所以B也是可到达的。

Reachable Objects

Tracing GC使用任何一种图论的遍历算法,都可以从Root对象,根据引用关系找到所有的可到达对象,并把他们做标记。Tracing GC扫描后,黑色对象为可到达对象,剩下的白色对象为不可到达对象。

原生的 Tracing GC 只有黑色和白色2种颜色。

Tracing GC

增量式垃圾回收思想

垃圾回收离不开STW,STW是Stop The World,指会暂停所有正在执行的用户线程/协程,进行垃圾回收的操作,STW为垃圾对象的扫描和标记提供了必要的条件。

非增量式垃圾回收需要STW,在STW期间完成所有垃圾对象的标记,STW结束后慢慢的执行垃圾对象的清理。

增量式垃圾回收也需要STW,在STW期间完成部分垃圾对象的标记,然后结束STW继续执行用户线程,一段时间后再次执行STW再标记部分垃圾对象,这个过程会多次重复执行,直到所有垃圾对象标记完成。

Increment GC

GC算法有3大性能指标:吞吐量、最大暂停时间(最大的STW占时)、内存占用率。增量式垃圾回收不能提高吞吐量,但和非增量式垃圾回收相比,每次STW的时间更短,能够降低最大暂停时间,就是Go每个版本Release Note中提到的GC延迟、GC暂停时间。

下图是非增量式GC和增量式GC的对比:

Normal V.S. Increment GC

以上图片来自 Incremental Garbage Collection in Ruby 2.2 ,它也很好的介绍了增量式垃圾回收的思想。

并发垃圾回收

减少最大暂停时间还有一种思路:并发垃圾回收,注意不是并行垃圾回收。

并行垃圾回收是每个核上都跑垃圾回收的线程,同时进行垃圾回收,这期间为STW,会暂停用户线程的执行。

并发垃圾回收是先STW找到所有的Root对象,然后结束STW,让垃圾标记线程和用户线程并发执行,垃圾标记完成后,再次开启STW,再次扫描和标记,以免释放使用中的内存。

并发垃圾回收和并行垃圾回收的重要区别就是不会持续暂停用户线程,并发垃圾回收也降低了STW的时间,达到了减少最大暂停时间的目的。

图片来自 Reducing Garbage-Collection Pause Time ,橙色线条为垃圾回收线程的运行,蓝色线条为用户线程。

Go垃圾回收主要原理

三色标记

为什么需要三色标记?

三色标记的目的,主要是利用Tracing GC做增量式垃圾回收,降低最大暂停时间。原生Tracing GC只有黑色和白色,没有中间的状态,这就要求GC扫描过程必须一次性完成,得到最后的黑色和白色对象。在前面增量式GC中介绍到了,这种方式会存在较大的暂停时间。

三色标记增加了中间状态灰色,增量式GC运行过程中,应用线程的运行可能改变了对象引用树,只要让黑色对象不直接引用白色对象,GC就可以增量式的运行,减少停顿时间。

什么是三色标记?

三色标记,望文生义可以知道它由3种颜色组成:

  1. 黑色 Black:表示对象是可达的,即使用中的对象,黑色是已经被扫描的对象。
  2. 灰色 Gary:表示被黑色对象直接引用的对象,但还没对它进行扫描。
  3. 白色 White:白色是对象的初始颜色,如果扫描完成后,对象依然还是白色的,说明此对象是垃圾对象。

三色标记规则:黑色不能指向白色对象。即黑色可以指向灰色,灰色可以指向白色。

三色标记主要流程:

  1. 初始所有对象被标记为白色。
  2. 寻找所有Root对象,比如被线程直接引用的对象,把Root对象标记为灰色。
  3. 把灰色对象标记为黑色,并它们引用的对象标记为灰色。
  4. 持续遍历每一个灰色对象,直到没有灰色对象。
  5. 剩余白色对象为垃圾对象。

推荐一篇结合Go代码展示了三色标记的过程的优秀文章:
Golang’s Real-time GC in Theory and Practice

记录三色的方法简介

Go1.12 使用位图和队列结合表示三种颜色状态:

  1. 白色:位图没有标记被扫描。
  2. 灰色:位图被标记已扫描,并且对象在队列。
  3. 黑色:位图被标记已扫描,并且对象已从队列弹出。

位图是全局的,表示了Heap中内存块是否被扫描,是否包含指针等。

队列有全局的一个和每个P有一个本地队列,扫描对象进行标记的过程,优先处理本P的队列,其思想与P的g本地队列和全局队列类似,减少资源竞争,提高并行化。

写屏障

我们结合一段用户代码介绍写屏障:

1
2
A.Next = B
A.Next = &C{}

三色标记的扫描线程是跟用户线程并发执行的,考虑这种情况:

用户线程执行完 A.Next = B 后,扫描线程把A标记为黑色,B标记为灰色,用户线程执行 A.Next = &C{} ,C是新对象,被标记为白色,由于A已经被扫描,不会重复扫描,所以C不会被标记为灰色,造成了黑色对象指向白色对象的情况,这种三色标记中是不允许的,结果是C被认为是垃圾对象,最终被清扫掉,当访问C时会造成非法内存访问而Panic。

写屏障可以解决这个问题,当对象引用树发生改变时,即对象指向关系发生变化时,将被指向的对象标记为灰色,维护了三色标记的约束:黑色对象不能直接引用白色对象,这避免了使用中的对象被释放。

有写屏障后,用户线程执行 A.Next = &C{} 后,写屏障把C标记为灰色。

并发标记

并发垃圾回收的主要思想上文已经介绍,Go的垃圾回收为每个P都分配了一个gcMarker协程,用于并发标记对象,这样有些P在标记对象,而有些P上继续运行用户协程。

Go的并发标记有4种运行模式,还没深入研究,这里举一个并发标记的场景:在goroutine的调度过程中,如果当前P上已经没有g可以执行,也偷不到g时,P就空闲下来了,这时候可以运行当前P的gcMarker协程。

触发GC

GC有3种触发方式:

  • 辅助GC

    在分配内存时,会判断当前的Heap内存分配量是否达到了触发一轮GC的阈值(每轮GC完成后,该阈值会被动态设置),如果超过阈值,则启动一轮GC。

  • 调用runtime.GC()强制启动一轮GC。

  • sysmon是运行时的守护进程,当超过 forcegcperiod (2分钟)没有运行GC会启动一轮GC。

GC调节参数

Go垃圾回收不像Java垃圾回收那样,有很多参数可供调节,Go为了保证使用GC的简洁性,只提供了一个参数GOGC

GOGC代表了占用中的内存增长比率,达到该比率时应当触发1次GC,该参数可以通过环境变量设置。

它的单位是百分比,取值范围并不是 [0, 100],可以是1000,甚至2000,2000时代表2000%,即20倍。

假如当前heap占用内存为4MB,GOGC = 75

1
4 * (1+75%) = 7MB

等heap占用内存大小达到7MB时会触发1轮GC。

GOGC还有2个特殊值:

  1. "off" : 代表关闭GC
  2. 0 : 代表持续进行垃圾回收,只用于调试

总结

本文主要介绍了Go垃圾回收的发展史,以及Go垃圾回收的一些主要概念,是为掌握Go垃圾回收提供一个基础。下期文章将把本文提到的概念串起来,介绍Go垃圾回收的主要流程,下期见。

参考资料

JAVA (JAVA8 HOTSPOT VM) GO
Collector Several collectors (Serial, Parallel, CMS, G1) CMS
Compaction Compacts Does not compact
Generational GC Generational GC Non-generational GC
Tuning parameters Depends on the collector.Multiple parameters available. Go垃圾回收 only
  • 【译】 Golang 中的垃圾回收(一)

    这篇文章是William Kennedy垃圾回收系列文章的第一篇的译文,这个文章从宏观的角度介绍了垃圾回收的原理,把垃圾回收跟调度结合起来介绍,分析了Go GC是如何实现低延时的。并且详细介绍了并发标记、STW、并发清除等。

  • 图解Golang的GC算法

    RyuGou用图的方式简述了三色标记法的标记清除过程以及写屏障。

  • Golang’s Real-time GC in Theory and Practice

    这篇文章有一个非常棒的GC动画。

  • 学习 Golang GC

    这篇文章对GC的历史、原理、goroutine栈,Go GC历史,基础原理,触发时间都有介绍,是一篇大而全的文章,但每个部分确实也都不详细,值得再参考。

  • Golang 垃圾回收剖析

  1. 如果这篇文章对你有帮助,不妨关注下我的Github,有文章会收到通知。
  2. 本文作者:大彬
  3. 如果喜欢本文,随意转载,但请保留此原文链接:http://lessisbetter.site/2019/10/20/go-gc-1-history-and-priciple/

关注公众号,获取最新Golang文章

前言

Event是应用和Fabric网络交互的一种方式,应用可以通过SDKPeer订阅某种类型的事件,当Peer发现事件发生时,可以把Event发送给应用,应用获取到通知信息。

Event功能介绍

Event从来源上可以分为2类:

  1. 链码容器发出的Event
  2. Peer上账本变更发出的Event

fabric event

图源自Tutorial Chaincode Event Listener on Hyperledger Fabric Java SDK

翻阅Node SDK和Go SDK的文档,发现SDK提供了4类事件:

  1. BlockEvent,可以用来监控被添加到账本上的区块。客户端需要Admin权限,这样才能读取完整的区块,每产生一个区块,它都会接收到通知。区块中有交易,交易中有chaincode event,所以可以通过BlockEvent获取其他事件。
  2. FilteredBlockEvent,可以用来监控简要的区块信息,当不只关心区块包含了哪些交易,交易是否成功时,它非常实用,还可以降低网络负载。它包含区块的部分信息,所以被称为filtered,信息有channel ID,区块号,交易的validation code。
  3. TransactionStatusEvent,可以用来监控某个交易在当前组织的peer何时完成。可以得到交易的validation code和交易所在区块。
  4. ChaincodeEvent,用来监听Chaincode发出的事件,不同的链码可以自定义自己的事件,所以这个更具有个性化。包含了交易id、区块号、链码id、事件名称,事件内容。如果想要查看事件内容,客户端所使用的账号,必须是Admin权限。

另外,订阅事件时可以指定开启和结束的区块号范围,如果开始的区块号已经产生,即区块已经写入账本,可以重放事件,更多信息可以看下面的文档。

关于Event的2篇重要文档,深深感觉Node SDK的文档,比Go SDK的文档丰富。

架构

上一节的介绍能够知道有哪些Event,各有什么作用,这一节介绍SDK和Peer是如何进行事件交互的。

SDK和Peer之间是通过gRPC通信的,gRPC的protos的定义文件4种message:

1
2
3
4
FilteredBlock,给FilteredBlockEvent使用
FilteredTransaction,结合下一个,给FilteredTransactionEvent使用
FilteredTransactionActions
FilteredChaincodeAction,给ChaincodeEvent使用

和1个Response,其中使用了oneof。

  • status,指http status,成功的时候无需使用,错误的时候可以使用指明错误。
  • block,给BlockEvent使用
  • FilteredBlock,给另外3种事件使用
1
2
3
4
5
6
7
8
// DeliverResponse
message DeliverResponse {
oneof Type {
common.Status status = 1;
common.Block block = 2;
FilteredBlock filtered_block = 3;
}
}

以及2个gRPC通信接口:

1
2
3
4
5
6
7
8
9
10
11
12
service Deliver {
// deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with
// Payload data as a marshaled orderer.SeekInfo message,
// then a stream of block replies is received
rpc Deliver (stream common.Envelope) returns (stream DeliverResponse) {
}
// deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with
// Payload data as a marshaled orderer.SeekInfo message,
// then a stream of **filtered** block replies is received
rpc DeliverFiltered (stream common.Envelope) returns (stream DeliverResponse) {
}
}

咦,4个Event,怎么只有2个接口?

配合下图,我们做一番讲解。

fabric sdk go event

对于Peer而言,只有2中类型的订阅:

  1. BlockEvent,即完整的区块
  2. FilteredBlockEvent,即不完整的区块,可以根据FilteredBlockEvent中的信息,生成FilteredTransactionEvent信息和ChainCodeEvent信息

图中深蓝色和绿色的线分别代表BlockEvent和FilteredBlockEvent相关的数据流,BlockEvent使用的是Deliver函数,FilteredBlockEvent使用的是DeliverFiltered函数。

每一个事件订阅,都是一个gRPC连接,Peer会不断的从账本读区块,然后根据区块生成事件,发送给客户端。

Go SDK中实现了一个Dispatcher,就是提供这么一个中转的功能,对上层应用提供4中类型的事件,把4种事件注册请求转换为2种,调用DeliverClient把事件订阅请求发送给Peer,又把Peer发来的2种事件,转换为应用订阅的事件响应。

Peer启动时,启动gRPC服务后,会注册好DeliverServer接收事件订阅,然后调用deliverBlocks进入循环,在新区块产生后,会生成订阅的BlockEvent或FilteredBlockEvent,利用ResponseSender把事件发送给SDK。

event.pb.go源码

这就是根据events.proto生成的Go文件,负责创建gRPC通信的客户端和服务端,以及两边的消息发送。

主要关注下2个接口:

deliverClient实现了DeliverClient,已经在该源文件实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
// DeliverClient is the client API for Deliver service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type DeliverClient interface {
// deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with
// Payload data as a marshaled orderer.SeekInfo message,
// then a stream of block replies is received
Deliver(ctx context.Context, opts ...grpc.CallOption) (Deliver_DeliverClient, error)
// deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with
// Payload data as a marshaled orderer.SeekInfo message,
// then a stream of **filtered** block replies is received
DeliverFiltered(ctx context.Context, opts ...grpc.CallOption) (Deliver_DeliverFilteredClient, error)
}

DeliverServer是服务端的接口,需要Peer实现。

1
2
3
4
5
6
7
8
9
10
11
// DeliverServer is the server API for Deliver service.
type DeliverServer interface {
// deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with
// Payload data as a marshaled orderer.SeekInfo message,
// then a stream of block replies is received
Deliver(Deliver_DeliverServer) error
// deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with
// Payload data as a marshaled orderer.SeekInfo message,
// then a stream of **filtered** block replies is received
DeliverFiltered(Deliver_DeliverFilteredServer) error
}

Peer event源码

Peer干了这么几件事:

  1. 注册gRPC服务,即注册接受客户端发来的事件订阅的函数
  2. gRPC收到消息,订阅相应事件注册处理函数
  3. 处理函数持续向客户端发送区块事件,直到结束

添加Deliver服务

serve是Peer启动后的运行的主函数,它会创建gRPC server,以及创建DeliverEvent server,并把它绑定到gRPC server上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// peer/node/start.go
func serve(args []string) error {
...
// 创建peer的gRPC server
peerServer, err := peer.NewPeerServer(listenAddr, serverConfig)
if err != nil {
logger.Fatalf("Failed to create peer server (%s)", err)
}
...
// 创建和启动基于gRPC的event deliver server
abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider)
pb.RegisterDeliverServer(peerServer.Server(), abServer)
...
}

创建DeliverEventsServer,实际是创建好处理事件订阅的handler。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// core/peer/deliverevents.go
// NewDeliverEventsServer creates a peer.Deliver server to deliver block and
// filtered block events
func NewDeliverEventsServer(mutualTLS bool, policyCheckerProvider PolicyCheckerProvider, chainManager deliver.ChainManager, metricsProvider metrics.Provider) peer.DeliverServer {
timeWindow := viper.GetDuration("peer.authentication.timewindow")
if timeWindow == 0 {
defaultTimeWindow := 15 * time.Minute
logger.Warningf("`peer.authentication.timewindow` not set; defaulting to %s", defaultTimeWindow)
timeWindow = defaultTimeWindow
}
metrics := deliver.NewMetrics(metricsProvider)
return &server{
// 创建handler
dh: deliver.NewHandler(chainManager, timeWindow, mutualTLS, metrics),
policyCheckerProvider: policyCheckerProvider,
}
}

// NewHandler creates an implementation of the Handler interface.
func NewHandler(cm ChainManager, timeWindow time.Duration, mutualTLS bool, metrics *Metrics) *Handler {
return &Handler{
ChainManager: cm,
TimeWindow: timeWindow,
BindingInspector: InspectorFunc(comm.NewBindingInspector(mutualTLS, ExtractChannelHeaderCertHash)),
Metrics: metrics,
}
}

server实现了DeliverServer接口,当gRPC接收到事件注册时,就可以调用Deliver或者FilteredDeliver被调用时,就调用server的DeliverFiltered或者Deliver函数。

1
2
3
4
5
// server holds the dependencies necessary to create a deliver server
type server struct {
dh *deliver.Handler
policyCheckerProvider PolicyCheckerProvider
}

接收事件订阅

BlockEvent的注册和事件处理主要流程如下:

1
2
server.Deliver -> Handler.Handle ->
deliverBlocks -> SendBlockResponse -> blockResponseSender.SendBlockResponse -> gRPC生成的server Send函数

FilteredBlockEvent的注册和事件处理主要流程如下:

1
2
server.DeliverFiltered -> Handler.Handle ->
deliverBlocks -> SendBlockResponse -> filteredBlockResponseSender.SendBlockResponseg -> RPC生成的server Send函数

它们2个流程是类似的,下面就以BlockEvent的流程介绍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// Deliver sends a stream of blocks to a client after commitment
func (s *server) Deliver(srv peer.Deliver_DeliverServer) (err error) {
logger.Debugf("Starting new Deliver handler")
defer dumpStacktraceOnPanic()
// getting policy checker based on resources.Event_Block resource name
deliverServer := &deliver.Server{
PolicyChecker: s.policyCheckerProvider(resources.Event_Block),
Receiver: srv,
// 创建了sender
ResponseSender: &blockResponseSender{
Deliver_DeliverServer: srv,
},
}
return s.dh.Handle(srv.Context(), deliverServer)
}

// Handle receives incoming deliver requests.
func (h *Handler) Handle(ctx context.Context, srv *Server) error {
addr := util.ExtractRemoteAddress(ctx)
logger.Debugf("Starting new deliver loop for %s", addr)
h.Metrics.StreamsOpened.Add(1)
defer h.Metrics.StreamsClosed.Add(1)
for {
logger.Debugf("Attempting to read seek info message from %s", addr)
envelope, err := srv.Recv()
if err == io.EOF {
logger.Debugf("Received EOF from %s, hangup", addr)
return nil
}
if err != nil {
logger.Warningf("Error reading from %s: %s", addr, err)
return err
}

// 主体
status, err := h.deliverBlocks(ctx, srv, envelope)
if err != nil {
return err
}

err = srv.SendStatusResponse(status)
if status != cb.Status_SUCCESS {
return err
}
if err != nil {
logger.Warningf("Error sending to %s: %s", addr, err)
return err
}

logger.Debugf("Waiting for new SeekInfo from %s", addr)
}
}

deliverBlocks的主要作用就是不停的获取区块,然后调用sender发送事件,其中还包含了事件订阅信息的获取,错误处理等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.Envelope) (status cb.Status, err error) {
...
for {
...
var block *cb.Block
var status cb.Status

iterCh := make(chan struct{})
go func() {
// 获取下一个区块,当账本Append Block时,就可以拿到要写入到账本的区块
block, status = cursor.Next()
close(iterCh)
}()
...
// 发送区块
if err := srv.SendBlockResponse(block); err != nil {
logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
return cb.Status_INTERNAL_SERVER_ERROR, err
}

h.Metrics.BlocksSent.With(labels...).Add(1)

// 停止判断
if stopNum == block.Header.Number {
break
}
}
...
}

Iterator接口用来获取区块.

1
2
3
4
5
6
7
8
// Iterator is useful for a chain Reader to stream blocks as they are created
type Iterator interface {
// Next blocks until there is a new block available, or returns an error if
// the next block is no longer retrievable
Next() (*cb.Block, cb.Status)
// Close releases resources acquired by the Iterator
Close()
}

Fabric有3种类型的账本:ram、json和file,它们都实现了这个接口,这里主要是为了辅助解释事件机制,我们看一个最简单的:ram的实现。

Next()拿到的区块是从simpleList.SetNext()存进去的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Next blocks until there is a new block available, or returns an error if the
// next block is no longer retrievable
func (cu *cursor) Next() (*cb.Block, cb.Status) {
// This only loops once, as signal reading indicates non-nil next
// 实际只执行1次
for {
// 拿到区块
next := cu.list.getNext()
if next != nil {
cu.list = next
return cu.list.block, cb.Status_SUCCESS
}
<-cu.list.signal
}
}

func (s *simpleList) getNext() *simpleList {
s.lock.RLock()
defer s.lock.RUnlock()
return s.next
}

// 设置
func (s *simpleList) setNext(n *simpleList) {
s.lock.Lock()
defer s.lock.Unlock()
s.next = n
}

Append()是账本对外提供的接口,当要把区块追加到账本时,会调用此函数,该函数会调用setNext()设置待追加的区块。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Append appends a new block to the ledger
func (rl *ramLedger) Append(block *cb.Block) error {
rl.lock.Lock()
defer rl.lock.Unlock()
// ...
rl.appendBlock(block)
return nil
}

func (rl *ramLedger) appendBlock(block *cb.Block) {
next := &simpleList{
signal: make(chan struct{}),
block: block,
}
// 设置最新的区块
rl.newest.setNext(next)
// ...
}

发送事件消息

blockResponseSender.SendBlockResponse是BlockEvent的事件发送函数,实际就是调用gRPC生成的函数。

blockResponseSender是在server.Deliver中创建的,它实际就是peer.Deliver_DeliverServer

1
2
3
4
5
6
7
8
9
10
11
12
// blockResponseSender structure used to send block responses
type blockResponseSender struct {
peer.Deliver_DeliverServer
}

// SendBlockResponse generates deliver response with block message
func (brs *blockResponseSender) SendBlockResponse(block *common.Block) error {
response := &peer.DeliverResponse{
Type: &peer.DeliverResponse_Block{Block: block},
}
return brs.Send(response)
}

Go SDK源码

社区正在重构fabric-sdk-go,所以这里不着重介绍sdk的源码了,提醒几个重要的点,可能以后还有。

DeliverDeliverFiltered被封装成了2个全局函数:

1
2
3
4
5
6
7
8
9
10
11
var (
// Deliver creates a Deliver stream
Deliver = func(client pb.DeliverClient) (deliverStream, error) {
return client.Deliver(context.Background())
}

// DeliverFiltered creates a DeliverFiltered stream
DeliverFiltered = func(client pb.DeliverClient) (deliverStream, error) {
return client.DeliverFiltered(context.Background())
}
)

它们会被调用,进一步封装成provider,provider会为dispatch服务:

1
2
3
4
5
6
7
8
9
10
11
12
// deliverProvider is the connection provider used for connecting to the Deliver service
var deliverProvider = func(context fabcontext.Client, chConfig fab.ChannelCfg, peer fab.Peer) (api.Connection, error) {
if peer == nil {
return nil, errors.New("Peer is nil")
}

eventEndpoint, ok := peer.(api.EventEndpoint)
if !ok {
panic("peer is not an EventEndpoint")
}
return deliverconn.New(context, chConfig, deliverconn.Deliver, peer.URL(), eventEndpoint.Opts()...)
}

Dispatcher

Dispatcher会保存BlockEvent和FilteredBlockEvent的注册,以及用2个maptxRegistrationsccRegistrations保存交易和Chaincode Event的注册,handlers是各种注册事件的处理函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Dispatcher is responsible for handling all events, including connection and registration events originating from the client,
// and events originating from the channel event service. All events are processed in a single Go routine
// in order to avoid any race conditions and to ensure that events are processed in the order in which they are received.
// This also avoids the need for synchronization.
// The lastBlockNum member MUST be first to ensure it stays 64-bit aligned on 32-bit machines.
type Dispatcher struct {
lastBlockNum uint64 // Must be first, do not move
params
updateLastBlockInfoOnly bool
state int32
eventch chan interface{}
blockRegistrations []*BlockReg
filteredBlockRegistrations []*FilteredBlockReg
handlers map[reflect.Type]Handler
txRegistrations map[string]*TxStatusReg
ccRegistrations map[string]*ChaincodeReg
}

注册事件

这是Dispatcher的事件注册函数,在它眼里,不止有4个事件:

1
2
3
4
5
6
7
8
9
10
// RegisterHandler registers an event handler
func (ed *Dispatcher) RegisterHandler(t interface{}, h Handler) {
htype := reflect.TypeOf(t)
if _, ok := ed.handlers[htype]; !ok {
logger.Debugf("Registering handler for %s on dispatcher %T", htype, ed)
ed.handlers[htype] = h
} else {
logger.Debugf("Cannot register handler %s on dispatcher %T since it's already registered", htype, ed)
}
}

注册各注册事件的处理函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// RegisterHandlers registers all of the handlers by event type
func (ed *Dispatcher) RegisterHandlers() {
ed.RegisterHandler(&RegisterChaincodeEvent{}, ed.handleRegisterCCEvent)
ed.RegisterHandler(&RegisterTxStatusEvent{}, ed.handleRegisterTxStatusEvent)
ed.RegisterHandler(&RegisterBlockEvent{}, ed.handleRegisterBlockEvent)
ed.RegisterHandler(&RegisterFilteredBlockEvent{}, ed.handleRegisterFilteredBlockEvent)
ed.RegisterHandler(&UnregisterEvent{}, ed.handleUnregisterEvent)
ed.RegisterHandler(&StopEvent{}, ed.HandleStopEvent)
ed.RegisterHandler(&TransferEvent{}, ed.HandleTransferEvent)
ed.RegisterHandler(&StopAndTransferEvent{}, ed.HandleStopAndTransferEvent)
ed.RegisterHandler(&RegistrationInfoEvent{}, ed.handleRegistrationInfoEvent)

// The following events are used for testing only
ed.RegisterHandler(&fab.BlockEvent{}, ed.handleBlockEvent)
ed.RegisterHandler(&fab.FilteredBlockEvent{}, ed.handleFilteredBlockEvent)
}

接收Peer事件

handleEvent用来处理来自Peer的事件,不同的类型调用不同的handler。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (ed *Dispatcher) handleEvent(e esdispatcher.Event) {
delevent := e.(*connection.Event)
evt := delevent.Event.(*pb.DeliverResponse)
switch response := evt.Type.(type) {
case *pb.DeliverResponse_Status:
ed.handleDeliverResponseStatus(response)
case *pb.DeliverResponse_Block:
ed.HandleBlock(response.Block, delevent.SourceURL)
case *pb.DeliverResponse_FilteredBlock:
ed.HandleFilteredBlock(response.FilteredBlock, delevent.SourceURL)
default:
logger.Errorf("handler not found for deliver response type %T", response)
}
}

HandleBlock把Event封装是BlockEvent退给应用。可以看到BlockEvent也会发布FilteredBlockEvent。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// HandleBlock handles a block event
func (ed *Dispatcher) HandleBlock(block *cb.Block, sourceURL string) {
logger.Debugf("Handling block event - Block #%d", block.Header.Number)

if err := ed.updateLastBlockNum(block.Header.Number); err != nil {
logger.Error(err.Error())
return
}

if ed.updateLastBlockInfoOnly {
ed.updateLastBlockInfoOnly = false
return
}

logger.Debug("Publishing block event...")
ed.publishBlockEvents(block, sourceURL)
ed.publishFilteredBlockEvents(toFilteredBlock(block), sourceURL)
}

func (ed *Dispatcher) publishBlockEvents(block *cb.Block, sourceURL string) {
for _, reg := range ed.blockRegistrations {
if !reg.Filter(block) {
logger.Debugf("Not sending block event for block #%d since it was filtered out.", block.Header.Number)
continue
}

if ed.eventConsumerTimeout < 0 {
select {
case reg.Eventch <- NewBlockEvent(block, sourceURL):
default:
logger.Warn("Unable to send to block event channel.")
}
} else if ed.eventConsumerTimeout == 0 {
reg.Eventch <- NewBlockEvent(block, sourceURL)
} else {
select {
case reg.Eventch <- NewBlockEvent(block, sourceURL):
case <-time.After(ed.eventConsumerTimeout):
logger.Warn("Timed out sending block event.")
}
}
}
}

FilteredBlockEvent能解析出TransactionEvent和ChaincodeEvent:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
func (ed *Dispatcher) publishFilteredBlockEvents(fblock *pb.FilteredBlock, sourceURL string) {
if fblock == nil {
logger.Warn("Filtered block is nil. Event will not be published")
return
}

logger.Debugf("Publishing filtered block event: %#v", fblock)

checkFilteredBlockRegistrations(ed, fblock, sourceURL)

for _, tx := range fblock.FilteredTransactions {
// 发布交易订阅
ed.publishTxStatusEvents(tx, fblock.Number, sourceURL)

// Only send a chaincode event if the transaction has committed
if tx.TxValidationCode == pb.TxValidationCode_VALID {
txActions := tx.GetTransactionActions()
if txActions == nil {
continue
}
if len(txActions.ChaincodeActions) == 0 {
logger.Debugf("No chaincode action found for TxID[%s], block[%d], source URL[%s]", tx.Txid, fblock.Number, sourceURL)
}
for _, action := range txActions.ChaincodeActions {
if action.ChaincodeEvent != nil {
// 发布chaincode event订阅
ed.publishCCEvents(action.ChaincodeEvent, fblock.Number, sourceURL)
}
}
} else {
logger.Debugf("Cannot publish CCEvents for block[%d] and source URL[%s] since Tx Validation Code[%d] is not valid", fblock.Number, sourceURL, tx.TxValidationCode)
}
}
}

func (ed *Dispatcher) publishTxStatusEvents(tx *pb.FilteredTransaction, blockNum uint64, sourceURL string) {
logger.Debugf("Publishing Tx Status event for TxID [%s]...", tx.Txid)
if reg, ok := ed.txRegistrations[tx.Txid]; ok {
logger.Debugf("Sending Tx Status event for TxID [%s] to registrant...", tx.Txid)

if ed.eventConsumerTimeout < 0 {
select {
case reg.Eventch <- NewTxStatusEvent(tx.Txid, tx.TxValidationCode, blockNum, sourceURL):
default:
logger.Warn("Unable to send to Tx Status event channel.")
}
} else if ed.eventConsumerTimeout == 0 {
reg.Eventch <- NewTxStatusEvent(tx.Txid, tx.TxValidationCode, blockNum, sourceURL)
} else {
select {
case reg.Eventch <- NewTxStatusEvent(tx.Txid, tx.TxValidationCode, blockNum, sourceURL):
case <-time.After(ed.eventConsumerTimeout):
logger.Warn("Timed out sending Tx Status event.")
}
}
}
}

func (ed *Dispatcher) publishCCEvents(ccEvent *pb.ChaincodeEvent, blockNum uint64, sourceURL string) {
for _, reg := range ed.ccRegistrations {
logger.Debugf("Matching CCEvent[%s,%s] against Reg[%s,%s] ...", ccEvent.ChaincodeId, ccEvent.EventName, reg.ChaincodeID, reg.EventFilter)
if reg.ChaincodeID == ccEvent.ChaincodeId && reg.EventRegExp.MatchString(ccEvent.EventName) {
logger.Debugf("... matched CCEvent[%s,%s] against Reg[%s,%s]", ccEvent.ChaincodeId, ccEvent.EventName, reg.ChaincodeID, reg.EventFilter)

if ed.eventConsumerTimeout < 0 {
select {
case reg.Eventch <- NewChaincodeEvent(ccEvent.ChaincodeId, ccEvent.EventName, ccEvent.TxId, ccEvent.Payload, blockNum, sourceURL):
default:
logger.Warn("Unable to send to CC event channel.")
}
} else if ed.eventConsumerTimeout == 0 {
reg.Eventch <- NewChaincodeEvent(ccEvent.ChaincodeId, ccEvent.EventName, ccEvent.TxId, ccEvent.Payload, blockNum, sourceURL)
} else {
select {
case reg.Eventch <- NewChaincodeEvent(ccEvent.ChaincodeId, ccEvent.EventName, ccEvent.TxId, ccEvent.Payload, blockNum, sourceURL):
case <-time.After(ed.eventConsumerTimeout):
logger.Warn("Timed out sending CC event.")
}
}
}
}
}

总结

本文介绍了:

  1. Peer支持的2类Even,
  2. Peer是如何支持事件订阅,和发送事件的,
  3. SDK支持的4类Event,这4类Event和Peer的2类Event的关系
  4. SDK和Peer之间的gRPC通信

更多SDK事件的使用,请参考文档

Fabric事件介绍的[官方文档(https://stone-fabric.readthedocs.io/zh/latest/peer_event_services.html)。

Fabric在examples中还提供了一个eventclient样例,看这个样例更有助于理解Fabric event的原理,以及是如何交互的。

前言

一份Peer节点启动的INFO级别日志如下,可以发现:

  1. 先注册了scc目录下的lscc, cscc, qscc,未注册chaincode目录下的lifecycle
  2. 然后又依次部署了上述scc。

本文的目的就是梳理出,系统链码的部署流程,这是peer节点提供背书、链码管理、配置、查询等功能的基础。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
2019-09-09 07:52:09.409 UTC [gossip.gossip] start -> INFO 013 Gossip instance peer1.org1.example.com:8051 started
2019-09-09 07:52:09.418 UTC [sccapi] deploySysCC -> INFO 014 system chaincode lscc/(github.com/hyperledger/fabric/core/scc/lscc) deployed
2019-09-09 07:52:09.420 UTC [cscc] Init -> INFO 015 Init CSCC
2019-09-09 07:52:09.422 UTC [sccapi] deploySysCC -> INFO 016 system chaincode cscc/(github.com/hyperledger/fabric/core/scc/cscc) deployed
2019-09-09 07:52:09.424 UTC [qscc] Init -> INFO 017 Init QSCC
2019-09-09 07:52:09.424 UTC [sccapi] deploySysCC -> INFO 018 system chaincode qscc/(github.com/hyperledger/fabric/core/scc/qscc) deployed
2019-09-09 07:52:09.425 UTC [sccapi] deploySysCC -> INFO 019 system chaincode (+lifecycle,github.com/hyperledger/fabric/core/chaincode/lifecycle) disabled
...
2019-09-09 07:52:14.386 UTC [sccapi] deploySysCC -> INFO 031 system chaincode lscc/mychannel(github.com/hyperledger/fabric/core/scc/lscc) deployed
2019-09-09 07:52:14.386 UTC [cscc] Init -> INFO 032 Init CSCC
2019-09-09 07:52:14.386 UTC [sccapi] deploySysCC -> INFO 033 system chaincode cscc/mychannel(github.com/hyperledger/fabric/core/scc/cscc) deployed
2019-09-09 07:52:14.387 UTC [qscc] Init -> INFO 034 Init QSCC
2019-09-09 07:52:14.387 UTC [sccapi] deploySysCC -> INFO 035 system chaincode qscc/mychannel(github.com/hyperledger/fabric/core/scc/qscc) deployed
2019-09-09 07:52:14.387 UTC [sccapi] deploySysCC -> INFO 036 system chaincode (+lifecycle,github.com/hyperledger/fabric/core/chaincode/lifecycle) disabled

宏观流程

提醒,本文使用SCC代指系统链码,使用scc代指core.scc模块。

在介绍源码之前,先给出总体流程,以便看源码的时候不会迷失。

部署SCC会涉及到4个模块:

  1. peer.node,它是peer的主程序,可以调用core.scc进行注册和部署SCC
  2. core.scc,它包含了lscc、qscc、cscc这3个scc,以及SCC的注册和部署
  3. core.chaincode,它是链码管理,普通链码和SCC都会走该模块,去部署和调用链码,和链码容器交互,并且它还提供了1个链码容器的工具shim
  4. core.container,它是实现链码容器,有2种链码容器,SCC使用的InprocVM,和普通链码使用的DockerVM

注册和部署的简要流程如下:

  1. peer运行启动程序
  2. 注册scc
    1. peer.node创建好lscc、cscc、qscc等scc实例,以及从配置文件读取的scc
    2. peer.node调用core.scc依次注册每一个scc实例
    3. core.scc调用core.container把scc实例信息注册到container
  3. 部署scc
    1. peer.node调用core.scc依次部署每一个注册的scc
    2. core.scc部署scc的流程复用普通链码部署流程,调用core.chaincode
    3. core.chaincode执行启动链码容器,scc也有链码容器是Inproc类型,不是Docker类型
    4. core.chaincode会调用core.container建立scc的Inproc容器实例
    5. core.container调用core.chaincode.shim启动容器内的程序,并负责和peer通信
    6. 启动完成后,core.chaincode向容器发送Init消息,让容器初始化,容器初始化完成会发送响应消息给core.chaincode,core.chaincode部署scc完成

总流程

列出源码的过程,会省略大量不相关代码,用...代替。

peer启动过程中,会调用node.serve,其中包含了为系统链码注册SCC和部署SCC。之后,还会为应用通道部署SCC,说明每个通道有各自的SCC,这里省略掉这部分。

1
2
3
4
5
6
7
8
9
10
11
12
func serve(args []string) error {
...
// 获取support,会注册SCC
// Initialize chaincode service
chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem)
...
// 为系统通道部署已经注册的SCC
// deploy system chaincodes
sccp.DeploySysCCs("", ccp)
logger.Infof("Deployed system chaincodes")
...
}

注册SCC

注册SCC的流程:

peer.node -> core.scc -> core.container

peer.node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// startChaincodeServer will finish chaincode related initialization, including:
// 1) setup local chaincode install path
// 2) create chaincode specific tls CA
// 3) start the chaincode specific gRPC listening service
func startChaincodeServer(
peerHost string,
aclProvider aclmgmt.ACLProvider,
pr *platforms.Registry,
ops *operations.System,
) (*chaincode.ChaincodeSupport, ccprovider.ChaincodeProvider, *scc.Provider, *persistence.PackageProvider) {
...
// 会注册SCC
chaincodeSupport, ccp, sccp := registerChaincodeSupport(
ccSrv,
ccEndpoint,
ca,
packageProvider,
aclProvider,
pr,
lifecycleSCC,
ops,
)
go ccSrv.Start()
return chaincodeSupport, ccp, sccp, packageProvider
}

func registerChaincodeSupport(
grpcServer *comm.GRPCServer,
ccEndpoint string,
ca tlsgen.CA,
packageProvider *persistence.PackageProvider,
aclProvider aclmgmt.ACLProvider,
pr *platforms.Registry,
lifecycleSCC *lifecycle.SCC,
ops *operations.System,
) (*chaincode.ChaincodeSupport, ccprovider.ChaincodeProvider, *scc.Provider) {
...
// SCC的VM provider
ipRegistry := inproccontroller.NewRegistry()

// 创建SCC provider
sccp := scc.NewProvider(peer.Default, peer.DefaultSupport, ipRegistry)
// 创建lscc实例
lsccInst := lscc.New(sccp, aclProvider, pr)

// 普通链码,docker容器类型的VM provider
dockerProvider := dockercontroller.NewProvider(
viper.GetString("peer.id"),
viper.GetString("peer.networkId"),
ops.Provider,
)
dockerVM := dockercontroller.NewDockerVM(
dockerProvider.PeerID,
dockerProvider.NetworkID,
dockerProvider.BuildMetrics,
)
...
chaincodeSupport := chaincode.NewChaincodeSupport(
chaincode.GlobalConfig(),
ccEndpoint,
userRunsCC,
ca.CertBytes(),
authenticator,
packageProvider,
lsccInst, // chaincodeSupport的声明周期管理使用了lscc,而不是lifecycle
aclProvider,
container.NewVMController(
map[string]container.VMProvider{
dockercontroller.ContainerType: dockerProvider,
inproccontroller.ContainerType: ipRegistry,
},
),
sccp,
pr,
peer.DefaultSupport,
ops.Provider,
)
ipRegistry.ChaincodeSupport = chaincodeSupport
// chaincode provider,可以用来创建cscc
ccp := chaincode.NewProvider(chaincodeSupport)
...
// 创建cscc、qscc
csccInst := cscc.New(ccp, sccp, aclProvider)
qsccInst := qscc.New(aclProvider)

//Now that chaincode is initialized, register all system chaincodes.
sccs := scc.CreatePluginSysCCs(sccp)
// 加入lscc、cscc、qscc
// lifecycleSCC在1.4中disable了
// sccs是用户自定义的系统链码
for _, cc := range append([]scc.SelfDescribingSysCC{lsccInst, csccInst, qsccInst, lifecycleSCC}, sccs...) {
// 注册每一个SCC
sccp.RegisterSysCC(cc)
}
...
return chaincodeSupport, ccp, sccp
}

core.scc

注册某1个系统合约。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// Provider implements sysccprovider.SystemChaincodeProvider
type Provider struct {
Peer peer.Operations
PeerSupport peer.Support
Registrar Registrar // 注册
SysCCs []SelfDescribingSysCC // 注册的scc,包含失败的
}

// RegisterSysCC registers a system chaincode with the syscc provider.
func (p *Provider) RegisterSysCC(scc SelfDescribingSysCC) {
// 收集/注册scc到scc provider
p.SysCCs = append(p.SysCCs, scc)
_, err := p.registerSysCC(scc)
if err != nil {
sysccLogger.Panicf("Could not register system chaincode: %s", err)
}
}

// registerSysCC registers the given system chaincode with the peer
func (p *Provider) registerSysCC(syscc SelfDescribingSysCC) (bool, error) {
// 检测该scc是否开启或不在白名单
if !syscc.Enabled() || !isWhitelisted(syscc) {
sysccLogger.Info(fmt.Sprintf("system chaincode (%s,%s,%t) disabled", syscc.Name(), syscc.Path(), syscc.Enabled()))
return false, nil
}

// XXX This is an ugly hack, version should be tied to the chaincode instance, not he peer binary
version := util.GetSysCCVersion()

// cc的描述信息
ccid := &ccintf.CCID{
Name: syscc.Name(),
Version: version,
}
// 注册scc的chaincode
err := p.Registrar.Register(ccid, syscc.Chaincode())
if err != nil {
//if the type is registered, the instance may not be... keep going
if _, ok := err.(inproccontroller.SysCCRegisteredErr); !ok {
errStr := fmt.Sprintf("could not register (%s,%v): %s", syscc.Path(), syscc, err)
sysccLogger.Error(errStr)
return false, fmt.Errorf(errStr)
}
}

sysccLogger.Infof("system chaincode %s(%s) registered", syscc.Name(), syscc.Path())
return true, err
}

// Registrar provides a way for system chaincodes to be registered
type Registrar interface {
// Register registers a system chaincode
Register(ccid *ccintf.CCID, cc shim.Chaincode) error
}

core.container

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//Register registers system chaincode with given path. The deploy should be called to initialize
func (r *Registry) Register(ccid *ccintf.CCID, cc shim.Chaincode) error {
r.mutex.Lock()
defer r.mutex.Unlock()

// 注册系统链码
name := ccid.GetName()
inprocLogger.Debugf("Registering chaincode instance: %s", name)
tmp := r.typeRegistry[name]
if tmp != nil {
return SysCCRegisteredErr(name)
}

r.typeRegistry[name] = &inprocContainer{chaincode: cc}
return nil
}


// Registry stores registered system chaincodes.
// It implements container.VMProvider and scc.Registrar
type Registry struct {
mutex sync.Mutex
typeRegistry map[string]*inprocContainer // 已注册链码映射
instRegistry map[string]*inprocContainer // 链码示例映射

ChaincodeSupport ccintf.CCSupport
}

部署SCC

部署SCC的流程:

peer.node -> core.scc -> core.chaincode -> core.container

peer.node

1
2
3
4
5
6
7
8
func serve(args []string) error {
...
// 为系统通道部署已经注册的SCC
// deploy system chaincodes
sccp.DeploySysCCs("", ccp)
logger.Infof("Deployed system chaincodes")
...
}

core.scc

DeploySysCCs会为chainID对应的channel,部署注册过程中收集的每一个SCC,它们在p.SysCCs中。

部署链码实际是一笔交易,为了复用普通链码的部署流程,core.scc使用deploySysCC封装部署链码需要的参数,链码是实际部署,走core.chaincode流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
//DeploySysCCs is the hook for system chaincodes where system chaincodes are registered with the fabric
//note the chaincode must still be deployed and launched like a user chaincode will be
func (p *Provider) DeploySysCCs(chainID string, ccp ccprovider.ChaincodeProvider) {
// 部署每一个scc
for _, sysCC := range p.SysCCs {
deploySysCC(chainID, ccp, sysCC)
}
}

// deploySysCC deploys the given system chaincode on a chain
func deploySysCC(chainID string, ccprov ccprovider.ChaincodeProvider, syscc SelfDescribingSysCC) error {
// disable或不在白名单的scc不执行部署
if !syscc.Enabled() || !isWhitelisted(syscc) {
sysccLogger.Info(fmt.Sprintf("system chaincode (%s,%s) disabled", syscc.Name(), syscc.Path()))
return nil
}

// 为scc生成txid,因为部署链码的过程需要txParams,与普通链码的流程相同
txid := util.GenerateUUID()

// Note, this structure is barely initialized,
// we omit the history query executor, the proposal
// and the signed proposal
txParams := &ccprovider.TransactionParams{
TxID: txid,
ChannelID: chainID,
}

// 设置交易执行模拟器,系统通道chainID为"",所以系统通道的scc没有模拟器
if chainID != "" {
// 获取链/通道的账本
lgr := peer.GetLedger(chainID)
if lgr == nil {
panic(fmt.Sprintf("syschain %s start up failure - unexpected nil ledger for channel %s", syscc.Name(), chainID))
}

// 根据交易id创建链码模拟器
txsim, err := lgr.NewTxSimulator(txid)
if err != nil {
return err
}

// 指定链码执行模拟器
txParams.TXSimulator = txsim
defer txsim.Done()
}

chaincodeID := &pb.ChaincodeID{Path: syscc.Path(), Name: syscc.Name()}
spec := &pb.ChaincodeSpec{Type: pb.ChaincodeSpec_Type(pb.ChaincodeSpec_Type_value["GOLANG"]), ChaincodeId: chaincodeID, Input: &pb.ChaincodeInput{Args: syscc.InitArgs()}}

// ChaincodeDeploymentSpec_SYSTEM标明:部署SCC
chaincodeDeploymentSpec := &pb.ChaincodeDeploymentSpec{ExecEnv: pb.ChaincodeDeploymentSpec_SYSTEM, ChaincodeSpec: spec}

// XXX This is an ugly hack, version should be tied to the chaincode instance, not he peer binary
version := util.GetSysCCVersion()

cccid := &ccprovider.CCContext{
Name: chaincodeDeploymentSpec.ChaincodeSpec.ChaincodeId.Name,
Version: version,
}

// 部署SCC
resp, _, err := ccprov.ExecuteLegacyInit(txParams, cccid, chaincodeDeploymentSpec)
if err == nil && resp.Status != shim.OK {
err = errors.New(resp.Message)
}

sysccLogger.Infof("system chaincode %s/%s(%s) deployed", syscc.Name(), chainID, syscc.Path())

return err
}


// ChaincodeProvider provides an abstraction layer that is
// used for different packages to interact with code in the
// chaincode package without importing it; more methods
// should be added below if necessary
type ChaincodeProvider interface {
// Execute executes a standard chaincode invocation for a chaincode and an input
Execute(txParams *TransactionParams, cccid *CCContext, input *pb.ChaincodeInput) (*pb.Response, *pb.ChaincodeEvent, error)
// ExecuteLegacyInit is a special case for executing chaincode deployment specs,
// which are not already in the LSCC, needed for old lifecycle
ExecuteLegacyInit(txParams *TransactionParams, cccid *CCContext, spec *pb.ChaincodeDeploymentSpec) (*pb.Response, *pb.ChaincodeEvent, error)
// Stop stops the chaincode give
Stop(ccci *ChaincodeContainerInfo) error
}

core.chaincode

CCProviderImpl实现了ChaincodeProvider接口,可以用来部署链码,ExecuteLegacyInit会执行2项:

  1. 启动链码容器
  2. 执行链码Init函数,链码容器启动后,peer和链码容器通过消息通信,ChaincodeMessage_INIT是执行链码容器的Init函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// ExecuteLegacyInit executes a chaincode which is not in the LSCC table
func (c *CCProviderImpl) ExecuteLegacyInit(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, spec *pb.ChaincodeDeploymentSpec) (*pb.Response, *pb.ChaincodeEvent, error) {
return c.cs.ExecuteLegacyInit(txParams, cccid, spec)
}


// ExecuteLegacyInit is a temporary method which should be removed once the old style lifecycle
// is entirely deprecated. Ideally one release after the introduction of the new lifecycle.
// It does not attempt to start the chaincode based on the information from lifecycle, but instead
// accepts the container information directly in the form of a ChaincodeDeploymentSpec.
func (cs *ChaincodeSupport) ExecuteLegacyInit(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, spec *pb.ChaincodeDeploymentSpec) (*pb.Response, *pb.ChaincodeEvent, error) {
// 部署链码需要的信息
ccci := ccprovider.DeploymentSpecToChaincodeContainerInfo(spec)
ccci.Version = cccid.Version

// 启动容器
err := cs.LaunchInit(ccci)
if err != nil {
return nil, nil, err
}

cname := ccci.Name + ":" + ccci.Version
h := cs.HandlerRegistry.Handler(cname)
if h == nil {
return nil, nil, errors.Wrapf(err, "[channel %s] claimed to start chaincode container for %s but could not find handler", txParams.ChannelID, cname)
}

// 调用链码Init
resp, err := cs.execute(pb.ChaincodeMessage_INIT, txParams, cccid, spec.GetChaincodeSpec().Input, h)
return processChaincodeExecutionResult(txParams.TxID, cccid.Name, resp, err)
}

LaunchInit是启动容器的一层检查,实际启动由Launcher.Launch完成。启动链码容器是异步的,会创建单独的goroutine去执行。

core.chaincode使用Runtime接口操控链码容器的启停。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// LaunchInit bypasses getting the chaincode spec from the LSCC table
// as in the case of v1.0-v1.2 lifecycle, the chaincode will not yet be
// defined in the LSCC table
func (cs *ChaincodeSupport) LaunchInit(ccci *ccprovider.ChaincodeContainerInfo) error {
cname := ccci.Name + ":" + ccci.Version
// 已经有handler,即容器已经启动。调用链码的时候,也会获取handler
if cs.HandlerRegistry.Handler(cname) != nil {
return nil
}

// 否则启动容器,设置handler
return cs.Launcher.Launch(ccci)
}

func (r *RuntimeLauncher) Launch(ccci *ccprovider.ChaincodeContainerInfo) error {
var startFailCh chan error
var timeoutCh <-chan time.Time

startTime := time.Now()
cname := ccci.Name + ":" + ccci.Version
launchState, alreadyStarted := r.Registry.Launching(cname)
// 链码容器未启动,启动容器
if !alreadyStarted {
startFailCh = make(chan error, 1)
timeoutCh = time.NewTimer(r.StartupTimeout).C

codePackage, err := r.getCodePackage(ccci)
if err != nil {
return err
}

go func() {
// 启动容器
if err := r.Runtime.Start(ccci, codePackage); err != nil {
startFailCh <- errors.WithMessage(err, "error starting container")
return
}
exitCode, err := r.Runtime.Wait(ccci)
if err != nil {
launchState.Notify(errors.Wrap(err, "failed to wait on container exit"))
}
launchState.Notify(errors.Errorf("container exited with %d", exitCode))
}()
}
...
}

// Runtime is used to manage chaincode runtime instances.
type Runtime interface {
Start(ccci *ccprovider.ChaincodeContainerInfo, codePackage []byte) error
Stop(ccci *ccprovider.ChaincodeContainerInfo) error
Wait(ccci *ccprovider.ChaincodeContainerInfo) (int, error)
}

ContainerRuntime是core.chaincode封装出来和core.container交互的,在这里它会创建启动链码请求,交给container。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// Start launches chaincode in a runtime environment.
func (c *ContainerRuntime) Start(ccci *ccprovider.ChaincodeContainerInfo, codePackage []byte) error {
cname := ccci.Name + ":" + ccci.Version

lc, err := c.LaunchConfig(cname, ccci.Type)
if err != nil {
return err
}

chaincodeLogger.Debugf("start container: %s", cname)
chaincodeLogger.Debugf("start container with args: %s", strings.Join(lc.Args, " "))
chaincodeLogger.Debugf("start container with env:\n\t%s", strings.Join(lc.Envs, "\n\t"))

// 启动链码的请求
scr := container.StartContainerReq{
Builder: &container.PlatformBuilder{
Type: ccci.Type,
Name: ccci.Name,
Version: ccci.Version,
Path: ccci.Path,
CodePackage: codePackage,
PlatformRegistry: c.PlatformRegistry,
},
Args: lc.Args,
Env: lc.Envs,
FilesToUpload: lc.Files,
CCID: ccintf.CCID{
Name: ccci.Name,
Version: ccci.Version,
},
}

// 处理容器操作请求
if err := c.Processor.Process(ccci.ContainerType, scr); err != nil {
return errors.WithMessage(err, "error starting container")
}

return nil
}


// Processor processes vm and container requests.
type Processor interface {
Process(vmtype string, req container.VMCReq) error
}

core.container

VMController实现了Processor,它会按指定的类型建立虚拟机,明明就是容器,为啥内部又叫VM,VM有2种:

  1. InprocVM,意思是运行在单独进程中的虚拟机,但不是指操作系统的进程,而是指一个隔离的环境,SCC是这类。
  2. DockerVM,指利用Docker启动的容器,普通链码就是这类。

类型是存在ccci.ContainerType中的,ccci包含了部署链码所需要的信息,这个信息在core.chaincode很早就获取到了,可以往前翻。

Process就是创建VM,然后利用VM处理请求的过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 根据请求对VM进行某种操作
func (vmc *VMController) Process(vmtype string, req VMCReq) error {
// 创建vm
v := vmc.newVM(vmtype)
ccid := req.GetCCID()
id := ccid.GetName()

vmc.lockContainer(id)
defer vmc.unlockContainer(id)

// 把vm传递给请求,即用该vm执行请求内容
return req.Do(v)
}

虚拟机创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 利用指定类型的vm provider创建vm
func (vmc *VMController) newVM(typ string) VM {
v, ok := vmc.vmProviders[typ]
if !ok {
vmLogger.Panicf("Programming error: unsupported VM type: %s", typ)
}
return v.NewVM()
}

// NewVMController creates a new instance of VMController
func NewVMController(vmProviders map[string]VMProvider) *VMController {
return &VMController{
containerLocks: make(map[string]*refCountedLock),
vmProviders: vmProviders,
}
}

创建VM需要使用NewVMController,回过去找它的创建地方。

在注册SCC的过程中,调用registerChaincodeSupport创建了chaincodeSupport,其中一个字段为创建NewVMController,就包含了2类Vm provider:

  1. ipRegistry,SCC的
  2. dockerProvider,普通链码的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func registerChaincodeSupport(
grpcServer *comm.GRPCServer,
ccEndpoint string,
ca tlsgen.CA,
packageProvider *persistence.PackageProvider,
aclProvider aclmgmt.ACLProvider,
pr *platforms.Registry,
lifecycleSCC *lifecycle.SCC,
ops *operations.System,
) (*chaincode.ChaincodeSupport, ccprovider.ChaincodeProvider, *scc.Provider) {
...
// SCC的VM provider
ipRegistry := inproccontroller.NewRegistry()
...
// 普通链码,docker容器类型的VM provider
dockerProvider := dockercontroller.NewProvider(
viper.GetString("peer.id"),
viper.GetString("peer.networkId"),
ops.Provider,
)
...
chaincodeSupport := chaincode.NewChaincodeSupport(
chaincode.GlobalConfig(),
ccEndpoint,
userRunsCC,
ca.CertBytes(),
authenticator,
packageProvider,
lsccInst, // chaincodeSupport的声明周期管理使用了lscc,而不是lifecycle
aclProvider,
// 创建了VM controller,controller提供了inproc和docker 2中子controller,
// 即2中链码运行方式
container.NewVMController(
map[string]container.VMProvider{
dockercontroller.ContainerType: dockerProvider,
inproccontroller.ContainerType: ipRegistry,
},
),
sccp,
pr,
peer.DefaultSupport,
ops.Provider,
)
...
}

VM处理操作虚拟机的请求

core.container的请求,都实现了VMCReq接口,StartContainerReq、StopContainerReq、WaitContainerReq是实现VMCReq的3类请求。

启动实际是启动虚拟机接口,处理请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//VMCReq - all requests should implement this interface.
//The context should be passed and tested at each layer till we stop
//note that we'd stop on the first method on the stack that does not
//take context
type VMCReq interface {
Do(v VM) error
GetCCID() ccintf.CCID
}


// 启动容器
func (si StartContainerReq) Do(v VM) error {
return v.Start(si.CCID, si.Args, si.Env, si.FilesToUpload, si.Builder)
}

//VM is an abstract virtual image for supporting arbitrary virual machines
type VM interface {
Start(ccid ccintf.CCID, args []string, env []string, filesToUpload map[string][]byte, builder Builder) error
Stop(ccid ccintf.CCID, timeout uint, dontkill bool, dontremove bool) error
Wait(ccid ccintf.CCID) (int, error)
HealthCheck(context.Context) error
}

DockerVM和InprocVM都实现了VM接口,本文只关注InprocVM类型,即SCC的。

InprocVM会得到一个容器实例ipc,用它来运行SCC。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//Start starts a previously registered system codechain
func (vm *InprocVM) Start(ccid ccintf.CCID, args []string, env []string, filesToUpload map[string][]byte, builder container.Builder) error {
path := ccid.GetName()

ipctemplate := vm.registry.getType(path)
if ipctemplate == nil {
return fmt.Errorf(fmt.Sprintf("%s not registered", path))
}

// 即ccid.Name
instName := vm.GetVMName(ccid)

// 获取容器实例
ipc, err := vm.getInstance(ipctemplate, instName, args, env)
if err != nil {
return fmt.Errorf(fmt.Sprintf("could not create instance for %s", instName))
}

// 已经在运行了,还部署个啥!
if ipc.running {
return fmt.Errorf(fmt.Sprintf("chaincode running %s", path))
}

ipc.running = true

go func() {
defer func() {
if r := recover(); r != nil {
inprocLogger.Criticalf("caught panic from chaincode %s", instName)
}
}()
// 启动进程级容器
ipc.launchInProc(instName, args, env)
}()

return nil
}

inprocContainer开启2个goroutine:

  1. 第一个调用shimStartInProc,即利用core.chaincode.shim启动InProc类型的容器。
  2. 第二个调用HandleChaincodeStream,处理peer和Inproc容器间的通信数据,此处的stream是peer端的。

这里可以看到创建了2个通道peerRcvCCSendccRcvPeerSend,它们表明了peer和scc的链码容器是通过通道直接通信的。peer和docker链码容器之间是走gRPC通信的,这个到普通链码的时候再介绍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 从进程启动链码
func (ipc *inprocContainer) launchInProc(id string, args []string, env []string) error {
if ipc.ChaincodeSupport == nil {
inprocLogger.Panicf("Chaincode support is nil, most likely you forgot to set it immediately after calling inproccontroller.NewRegsitry()")
}

// 和调用链码的上层通信的2个通道
peerRcvCCSend := make(chan *pb.ChaincodeMessage)
ccRcvPeerSend := make(chan *pb.ChaincodeMessage)
var err error
ccchan := make(chan struct{}, 1)
ccsupportchan := make(chan struct{}, 1)
shimStartInProc := _shimStartInProc // shadow to avoid race in test
go func() {
defer close(ccchan)
// 启动链码
inprocLogger.Debugf("chaincode started for %s", id)
if args == nil {
args = ipc.args
}
if env == nil {
env = ipc.env
}
// 利用shim启动
err := shimStartInProc(env, args, ipc.chaincode, ccRcvPeerSend, peerRcvCCSend)
if err != nil {
err = fmt.Errorf("chaincode-support ended with err: %s", err)
_inprocLoggerErrorf("%s", err)
}
inprocLogger.Debugf("chaincode ended for %s with err: %s", id, err)
}()

// shadow function to avoid data race
inprocLoggerErrorf := _inprocLoggerErrorf
go func() {
defer close(ccsupportchan)
// 处理scc和外部通信的消息流
inprocStream := newInProcStream(peerRcvCCSend, ccRcvPeerSend)
inprocLogger.Debugf("chaincode-support started for %s", id)
err := ipc.ChaincodeSupport.HandleChaincodeStream(inprocStream)
if err != nil {
err = fmt.Errorf("chaincode ended with err: %s", err)
inprocLoggerErrorf("%s", err)
}
inprocLogger.Debugf("chaincode-support ended for %s with err: %s", id, err)
}()
}

利用shim启动Inproc链码容器中的程序

shim是chaincode提供给容器,运行链码的工具,它运行在容器里。

利用shim启动InprocVM使用的函数是StartInProc,提取一些运行链码需要的数据,比如又一个stream,此处的stream是容器端的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 启动SCC的入口
// StartInProc is an entry point for system chaincodes bootstrap. It is not an
// API for chaincodes.
func StartInProc(env []string, args []string, cc Chaincode, recv <-chan *pb.ChaincodeMessage, send chan<- *pb.ChaincodeMessage) error {
// 有点奇怪,这些日志都没有看到,因为已经在shim,不属于peer日志了
chaincodeLogger.Debugf("in proc %v", args)

// 从环境变量获取cc name
var chaincodename string
for _, v := range env {
if strings.Index(v, "CORE_CHAINCODE_ID_NAME=") == 0 {
p := strings.SplitAfter(v, "CORE_CHAINCODE_ID_NAME=")
chaincodename = p[1]
break
}
}
if chaincodename == "" {
return errors.New("error chaincode id not provided")
}

// 创建peer和chaincode通信的通道
stream := newInProcStream(recv, send)
chaincodeLogger.Debugf("starting chat with peer using name=%s", chaincodename)
// 与peer进行通信
err := chatWithPeer(chaincodename, stream, cc)
return err
}

chatWithPeer是通用的,普通的链码也调用这个程序。它创建了一个handler,用来处理消息(发送和接收),以及操作(调用)链码。

这个过程,它会向peer发送REGISTER消息,和peer先“握手”,也会从peer读消息,消息的处理函数就是里面的for循环,这样链码容器就运行起来了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// 通用,SCC和CC都使用这个函数
func chatWithPeer(chaincodename string, stream PeerChaincodeStream, cc Chaincode) error {
// 把stream和cc交给handler,handler可以发送和接收数据,即读写通道
// Create the shim handler responsible for all control logic
handler := newChaincodeHandler(stream, cc)
defer stream.CloseSend()

// Send the ChaincodeID during register.
chaincodeID := &pb.ChaincodeID{Name: chaincodename}
payload, err := proto.Marshal(chaincodeID)
if err != nil {
return errors.Wrap(err, "error marshalling chaincodeID during chaincode registration")
}

// 在stream上向peer发送注册消息
// Register on the stream
chaincodeLogger.Debugf("Registering.. sending %s", pb.ChaincodeMessage_REGISTER)
if err = handler.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTER, Payload: payload}); err != nil {
return errors.WithMessage(err, "error sending chaincode REGISTER")
}

// holds return values from gRPC Recv below
type recvMsg struct {
msg *pb.ChaincodeMessage
err error
}
msgAvail := make(chan *recvMsg, 1)
errc := make(chan error)

receiveMessage := func() {
in, err := stream.Recv()
msgAvail <- &recvMsg{in, err}
}

// 异步读取1个消息
go receiveMessage()

// 循环处理peer发送的消息
for {
select {
case rmsg := <-msgAvail:
switch {
case rmsg.err == io.EOF:
err = errors.Wrapf(rmsg.err, "received EOF, ending chaincode stream")
chaincodeLogger.Debugf("%+v", err)
return err
case rmsg.err != nil:
err := errors.Wrap(rmsg.err, "receive failed")
chaincodeLogger.Errorf("Received error from server, ending chaincode stream: %+v", err)
return err
case rmsg.msg == nil:
err := errors.New("received nil message, ending chaincode stream")
chaincodeLogger.Debugf("%+v", err)
return err
default:
// 处理消息
chaincodeLogger.Debugf("[%s]Received message %s from peer", shorttxid(rmsg.msg.Txid), rmsg.msg.Type)
err := handler.handleMessage(rmsg.msg, errc)
if err != nil {
err = errors.WithMessage(err, "error handling message")
return err
}

// 读取下一个消息
go receiveMessage()
}

case sendErr := <-errc:
if sendErr != nil {
err := errors.Wrap(sendErr, "error sending")
return err
}
}
}
}

具体的消息处理函数,先跳过,回过头来,关注scc容器和peer的通信。

SCC和Peer的通信通道

链码容器和Peer之间使用Stream进行通信,Stream有2种实现:

  1. 使用channel封装的Stream
  2. gRPC的Stream

链码容器和Peer通信的接口是:

1
2
3
4
5
6
// PeerChaincodeStream interface for stream between Peer and chaincode instance.
type PeerChaincodeStream interface {
Send(*pb.ChaincodeMessage) error
Recv() (*pb.ChaincodeMessage, error)
CloseSend() error
}

普通链码使用gRPC:

1
2
3
type chaincodeSupportRegisterClient struct {
grpc.ClientStream
}

系统链码直接使用通道通信,发送和接收消息都在下面了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// peer和chaincode之间通信的通道
// PeerChaincodeStream interface for stream between Peer and chaincode instance.
type inProcStream struct {
recv <-chan *pb.ChaincodeMessage
send chan<- *pb.ChaincodeMessage
}

func newInProcStream(recv <-chan *pb.ChaincodeMessage, send chan<- *pb.ChaincodeMessage) *inProcStream {
return &inProcStream{recv, send}
}

// 发送其实就是向send写数据
func (s *inProcStream) Send(msg *pb.ChaincodeMessage) (err error) {
err = nil

//send may happen on a closed channel when the system is
//shutting down. Just catch the exception and return error
defer func() {
if r := recover(); r != nil {
err = SendPanicFailure(fmt.Sprintf("%s", r))
return
}
}()
s.send <- msg
return
}

// 接收是从recv读数据
func (s *inProcStream) Recv() (*pb.ChaincodeMessage, error) {
msg, ok := <-s.recv
if !ok {
return nil, errors.New("channel is closed")
}
return msg, nil
}

func (s *inProcStream) CloseSend() error {
// 实际啥也没做
return nil
}

Peer和链码容器的交互,完成链码容器启动

部署链码需要Peer和链码容器交互,不然Peer怎么知道链码容器已经启动。以下是一份peer的DEBUG日志,在下面标注了启动容器和链码Init过程中的消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
2019-09-09 07:52:09.915 UTC [chaincode] LaunchConfig -> DEBU 098 launchConfig: executable:"chaincode",Args:[chaincode,-peer.address=peer0.org1.example.com:7052],Envs:[CORE_CHAINCODE_LOGGING_LEVEL=info,CORE_CHAINCODE_LOGGING_SHIM=warning,CORE_CHAINCODE_LOGGING_FORMAT=%{color}%{time:2006-01-02 15:04:05.000 MST} [%{module}] %{shortfunc} -> %{level:.4s} %{id:03x}%{color:reset} %{message},CORE_CHAINCODE_ID_NAME=lscc:1.4.3,CORE_PEER_TLS_ENABLED=true,CORE_TLS_CLIENT_KEY_PATH=/etc/hyperledger/fabric/client.key,CORE_TLS_CLIENT_CERT_PATH=/etc/hyperledger/fabric/client.crt,CORE_PEER_TLS_ROOTCERT_FILE=/etc/hyperledger/fabric/peer.crt],Files:[/etc/hyperledger/fabric/client.crt /etc/hyperledger/fabric/client.key /etc/hyperledger/fabric/peer.crt]
2019-09-09 07:52:09.915 UTC [chaincode] Start -> DEBU 099 start container: lscc:1.4.3
2019-09-09 07:52:09.915 UTC [chaincode] Start -> DEBU 09a start container with args: chaincode -peer.address=peer0.org1.example.com:7052
2019-09-09 07:52:09.915 UTC [chaincode] Start -> DEBU 09b start container with env:
CORE_CHAINCODE_LOGGING_LEVEL=info
CORE_CHAINCODE_LOGGING_SHIM=warning
CORE_CHAINCODE_LOGGING_FORMAT=%{color}%{time:2006-01-02 15:04:05.000 MST} [%{module}] %{shortfunc} -> %{level:.4s} %{id:03x}%{color:reset} %{message}
CORE_CHAINCODE_ID_NAME=lscc:1.4.3
CORE_PEER_TLS_ENABLED=true
CORE_TLS_CLIENT_KEY_PATH=/etc/hyperledger/fabric/client.key
CORE_TLS_CLIENT_CERT_PATH=/etc/hyperledger/fabric/client.crt
CORE_PEER_TLS_ROOTCERT_FILE=/etc/hyperledger/fabric/peer.crt
2019-09-09 07:52:09.915 UTC [container] lockContainer -> DEBU 09c waiting for container(lscc-1.4.3) lock
2019-09-09 07:52:09.915 UTC [container] lockContainer -> DEBU 09d got container (lscc-1.4.3) lock
2019-09-09 07:52:09.915 UTC [inproccontroller] getInstance -> DEBU 09e chaincode instance created for lscc-1.4.3
2019-09-09 07:52:09.915 UTC [container] unlockContainer -> DEBU 09f container lock deleted(lscc-1.4.3)
2019-09-09 07:52:09.915 UTC [container] lockContainer -> DEBU 0a0 waiting for container(lscc-1.4.3) lock
2019-09-09 07:52:09.915 UTC [container] lockContainer -> DEBU 0a1 got container (lscc-1.4.3) lock
2019-09-09 07:52:09.915 UTC [container] unlockContainer -> DEBU 0a2 container lock deleted(lscc-1.4.3)
2019-09-09 07:52:09.915 UTC [inproccontroller] func2 -> DEBU 0a3 chaincode-support started for lscc-1.4.3
2019-09-09 07:52:09.915 UTC [inproccontroller] func1 -> DEBU 0a4 chaincode started for lscc-1.4.3
// 以上日志对应的代码流程在上文都讲到了

// 以下是交互过程peer日志
// peer收到容器的注册消息
2019-09-09 07:52:09.916 UTC [chaincode] handleMessage -> DEBU 0a5 [] Fabric side handling ChaincodeMessage of type: REGISTER in state created
2019-09-09 07:52:09.916 UTC [chaincode] HandleRegister -> DEBU 0a6 Received REGISTER in state created
2019-09-09 07:52:09.916 UTC [chaincode] Register -> DEBU 0a7 registered handler complete for chaincode lscc:1.4.3
2019-09-09 07:52:09.916 UTC [chaincode] HandleRegister -> DEBU 0a8 Got REGISTER for chaincodeID = name:"lscc:1.4.3" , sending back REGISTERED
2019-09-09 07:52:09.920 UTC [grpc] HandleSubConnStateChange -> DEBU 0a9 pickfirstBalancer: HandleSubConnStateChange: 0xc0026318c0, READY
2019-09-09 07:52:09.923 UTC [chaincode] HandleRegister -> DEBU 0aa Changed state to established for name:"lscc:1.4.3"

// peer发送ready消息
2019-09-09 07:52:09.923 UTC [chaincode] sendReady -> DEBU 0ab sending READY for chaincode name:"lscc:1.4.3"
2019-09-09 07:52:09.923 UTC [chaincode] sendReady -> DEBU 0ac Changed to state ready for chaincode name:"lscc:1.4.3"

// 已经完成启动容器
2019-09-09 07:52:09.923 UTC [chaincode] Launch -> DEBU 0ad launch complete
2019-09-09 07:52:09.924 UTC [chaincode] Execute -> DEBU 0ae Entry
// 收到容器COMPLETED消息
2019-09-09 07:52:09.925 UTC [chaincode] handleMessage -> DEBU 0af [01b03aae] Fabric side handling ChaincodeMessage of type: COMPLETED in state ready

// 通知scc,部署已经完成
2019-09-09 07:52:09.925 UTC [chaincode] Notify -> DEBU 0b0 [01b03aae] notifying Txid:01b03aae-17a6-4b63-874e-dc20d6f5df0c, channelID:
2019-09-09 07:52:09.925 UTC [chaincode] Execute -> DEBU 0b1 Exit
2019-09-09 07:52:09.925 UTC [sccapi] deploySysCC -> INFO 0b2 system chaincode lscc/(github.com/hyperledger/fabric/core/scc/lscc) deployed

可以到REGISTER、READY、COMPLETED等消息,以及状态的改变:created、ready。

但前面还没有介绍Peer和链码容器之间的通信,所以不展示代码了,展示一下Peer和链码容器的消息交互图:

endorser policy

背书策略是Fabric中的一个重要一环,想梳理一下背书策略的上链和使用流程。

背书策略是部署和升级链码时使用的,需要发送配置交易,所以尝试了从背书节点收到交易,然后处理交易的流程入手,找到背书策略的入口,结果毫无头绪。

换一种思路,从使用入手,向上追溯,这种就非常顺利了。

从背书策略的使用入手

VSCC会利用背书策略,并且背书策略不满足时会返回一个:背书策略不满足的错误,每一个上链的交易详细中都有这么一个Validation字段,为0代表有效交易,否则是无效交易,并用数字表示原因,背书策略不满足的序号就是10。

1
2
3
4
5
6
7
type TxValidationCode int32

const (
...
TxValidationCode_ENDORSEMENT_POLICY_FAILURE TxValidationCode = 10
...
)

TxValidationCode_ENDORSEMENT_POLICY_FAILUREVSCCValidateTx使用,系统链码和普通链码都有背书策略需要满足,下面代码片是普通链码部分,可以发现调用VSCCValidateTxForCC验证交易。

1
2
3
4
5
6
7
8
9
10
11
12
13
// VSCCValidateTx executes vscc validation for transaction
func (v *VsccValidatorImpl) VSCCValidateTx(seq int, payload *common.Payload, envBytes []byte, block *common.Block) (error, peer.TxValidationCode) {
...
if err = v.VSCCValidateTxForCC(ctx); err != nil {
switch err.(type) {
case *commonerrors.VSCCEndorsementPolicyError:
return err, peer.TxValidationCode_ENDORSEMENT_POLICY_FAILURE
default:
return err, peer.TxValidationCode_INVALID_OTHER_REASON
}
}
...
}

每个chaincode都会提供escc和vscc,现在都是默认的,也就是说escc和vscc都可以是具备可插拔的。

1
2
3
peer chaincode list -C mychannel --instantiated
Get instantiated chaincodes on channel mychannel:
Name: mycc, Version: 1.1, Path: github.com/chaincode/chaincode_example02/go/, Escc: escc, Vscc: vscc

VSCCValidateTxForCC会从交易的context中获取验证插件,然后利用插件验证交易。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (v *VsccValidatorImpl) VSCCValidateTxForCC(ctx *Context) error {
logger.Debug("Validating", ctx, "with plugin")
// 使用插件验证交易
err := v.pluginValidator.ValidateWithPlugin(ctx)
if err == nil {
return nil
}
// If the error is a pluggable validation execution error, cast it to the common errors ExecutionFailureError.
if e, isExecutionError := err.(*validation.ExecutionFailureError); isExecutionError {
return &commonerrors.VSCCExecutionFailureError{Err: e}
}
// Else, treat it as an endorsement error.
return &commonerrors.VSCCEndorsementPolicyError{Err: err}
}

func (pv *PluginValidator) ValidateWithPlugin(ctx *Context) error {
// 获取vscc插件
plugin, err := pv.getOrCreatePlugin(ctx)
if err != nil {
return &validation.ExecutionFailureError{
Reason: fmt.Sprintf("plugin with name %s couldn't be used: %v", ctx.VSCCName, err),
}
}
// 利用插件验证
err = plugin.Validate(ctx.Block, ctx.Namespace, ctx.Seq, 0, SerializedPolicy(ctx.Policy))
validityStatus := "valid"
if err != nil {
validityStatus = fmt.Sprintf("invalid: %v", err)
}
logger.Debug("Transaction", ctx.TxID, "appears to be", validityStatus)
return err
}

// Plugin validates transactions
type Plugin interface {
// Validate returns nil if the action at the given position inside the transaction
// at the given position in the given block is valid, or an error if not.
Validate(block *common.Block, namespace string, txPosition int, actionPosition int, contextData ...ContextDatum) error

// Init injects dependencies into the instance of the Plugin
Init(dependencies ...Dependency) error
}

当前验证插件有2种实现,TxValidatorV1_2V1_3Validation,Validate还从context取出了序列化的背书策略,vscc会调用PolicyEvalutor交易的背书是否满足背书策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
func (v *DefaultValidation) Validate(block *common.Block, namespace string, txPosition int, actionPosition int, contextData ...validation.ContextDatum) error {
if len(contextData) == 0 {
logger.Panicf("Expected to receive policy bytes in context data")
}

// 拿到序列化后的policy
serializedPolicy, isSerializedPolicy := contextData[0].(SerializedPolicy)
if !isSerializedPolicy {
logger.Panicf("Expected to receive a serialized policy in the first context data")
}
if block == nil || block.Data == nil {
return errors.New("empty block")
}
if txPosition >= len(block.Data.Data) {
return errors.Errorf("block has only %d transactions, but requested tx at position %d", len(block.Data.Data), txPosition)
}
if block.Header == nil {
return errors.Errorf("no block header")
}

// 调用不同版本的validator进行验证
var err error
switch {
case v.Capabilities.V1_3Validation():
err = v.TxValidatorV1_3.Validate(block, namespace, txPosition, actionPosition, serializedPolicy.Bytes())

case v.Capabilities.V1_2Validation():
fallthrough

default:
err = v.TxValidatorV1_2.Validate(block, namespace, txPosition, actionPosition, serializedPolicy.Bytes())
}

logger.Debugf("block %d, namespace: %s, tx %d validation results is: %v", block.Header.Number, namespace, txPosition, err)
return convertErrorTypeOrPanic(err)
}

// 验证代码使用v2/validation_logic.go中的实现
// Validate validates the given envelope corresponding to a transaction with an endorsement
// policy as given in its serialized form
func (vscc *Validator) Validate(
block *common.Block,
namespace string,
txPosition int,
actionPosition int,
policyBytes []byte,
) commonerrors.TxValidationError {
...
// evaluate the signature set against the policy
err = vscc.policyEvaluator.Evaluate(policyBytes, signatureSet)
if err != nil {
logger.Warningf("Endorsement policy failure for transaction txid=%s, err: %s", chdr.GetTxId(), err.Error())
if len(signatureSet) < len(cap.Action.Endorsements) {
// Warning: duplicated identities exist, endorsement failure might be cause by this reason
return policyErr(errors.New(DUPLICATED_IDENTITY_ERROR))
}
return policyErr(fmt.Errorf("VSCC error: endorsement policy failure, err: %s", err))
}
...
}

// PolicyEvaluator evaluates policies
type PolicyEvaluator interface {
validation.Dependency

// Evaluate takes a set of SignedData and evaluates whether this set of signatures satisfies
// the policy with the given bytes
Evaluate(policyBytes []byte, signatureSet []*common.SignedData) error
}

Evaluate会创建背书策略实例,然后利用背书策略验证背书签名。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Evaluate takes a set of SignedData and evaluates whether this set of signatures satisfies the policy
func (id *PolicyEvaluator) Evaluate(policyBytes []byte, signatureSet []*common.SignedData) error {
pp := cauthdsl.NewPolicyProvider(id.IdentityDeserializer)
policy, _, err := pp.NewPolicy(policyBytes)
if err != nil {
return err
}
return policy.Evaluate(signatureSet)
}

// Policy is used to determine if a signature is valid
type Policy interface {
// Evaluate takes a set of SignedData and evaluates whether this set of signatures satisfies the policy
Evaluate(signatureSet []*cb.SignedData) error
}

// Evaluate takes a set of SignedData and evaluates whether this set of signatures satisfies the policy
func (p *policy) Evaluate(signatureSet []*cb.SignedData) error {
if p == nil {
return fmt.Errorf("No such policy")
}
idAndS := make([]IdentityAndSignature, len(signatureSet))
for i, sd := range signatureSet {
idAndS[i] = &deserializeAndVerify{
signedData: sd,
deserializer: p.deserializer,
}
}

ok := p.evaluator(deduplicate(idAndS), make([]bool, len(signatureSet)))
if !ok {
return errors.New("signature set did not satisfy policy")
}
return nil
}

具体背书验证签名的实现,当下就先不关心了。回过头来想一下,VSCC从哪拿到了背书策略?

VSCC的背书策略哪来的?

回到上文第一次出现背书策略的地方:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (pv *PluginValidator) ValidateWithPlugin(ctx *Context) error {
err = plugin.Validate(ctx.Block, ctx.Namespace, ctx.Seq, 0, SerializedPolicy(ctx.Policy))
}

// Context defines information about a transaction
// that is being validated
type Context struct {
Seq int
Envelope []byte
TxID string
Channel string
VSCCName string
Policy []byte // 背书策略
Namespace string
Block *common.Block
}

VSCCValidateTx函数会创建Context,填写policy字段,其中policy是调用GetInfoForValidate获取的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (v *VsccValidatorImpl) VSCCValidateTx(seq int, payload *common.Payload, envBytes []byte, block *common.Block) (error, peer.TxValidationCode) {
...
// 普通链码
if !v.sccprovider.IsSysCC(ccID) {
...
// 获取policy、vscc等
// Get latest chaincode version, vscc and validate policy
txcc, vscc, policy, err := v.GetInfoForValidate(chdr, ns)
...
// do VSCC validation
ctx := &Context{
Seq: seq,
Envelope: envBytes,
Block: block,
TxID: chdr.TxId,
Channel: chdr.ChannelId,
Namespace: ns,
Policy: policy, // Here
VSCCName: vscc.ChaincodeName,
}
if err = v.VSCCValidateTxForCC(ctx); err != nil {
switch err.(type) {
case *commonerrors.VSCCEndorsementPolicyError:
return err, peer.TxValidationCode_ENDORSEMENT_POLICY_FAILURE
default:
return err, peer.TxValidationCode_INVALID_OTHER_REASON
}
}
} else {
// SCC
}
}

GetInfoForValidate先是获取了ChaincodeDefinition,它记录了peer对某个链码的proposal背书和验证的必要信息,然后利用ChaincodeDefinition.Validation获取了policy。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// GetInfoForValidate gets the ChaincodeInstance(with latest version) of tx, vscc and policy from lscc
func (v *VsccValidatorImpl) GetInfoForValidate(chdr *common.ChannelHeader, ccID string) (*sysccprovider.ChaincodeInstance, *sysccprovider.ChaincodeInstance, []byte, error) {
cc := &sysccprovider.ChaincodeInstance{
ChainID: chdr.ChannelId,
ChaincodeName: ccID,
ChaincodeVersion: coreUtil.GetSysCCVersion(),
}
vscc := &sysccprovider.ChaincodeInstance{
ChainID: chdr.ChannelId,
ChaincodeName: "vscc", // default vscc for system chaincodes
ChaincodeVersion: coreUtil.GetSysCCVersion(), // Get vscc version
}
var policy []byte
var err error
if !v.sccprovider.IsSysCC(ccID) {
// when we are validating a chaincode that is not a
// system CC, we need to ask the CC to give us the name
// of VSCC and of the policy that should be used

// obtain name of the VSCC and the policy
// 获取cc 定义
cd, err := v.getCDataForCC(chdr.ChannelId, ccID)
if err != nil {
msg := fmt.Sprintf("Unable to get chaincode data from ledger for txid %s, due to %s", chdr.TxId, err)
logger.Errorf(msg)
return nil, nil, nil, err
}
cc.ChaincodeName = cd.CCName()
cc.ChaincodeVersion = cd.CCVersion()
// 拿到policy
vscc.ChaincodeName, policy = cd.Validation()
} else {
// when we are validating a system CC, we use the default
// VSCC and a default policy that requires one signature
// from any of the members of the channel
p := cauthdsl.SignedByAnyMember(v.support.GetMSPIDs(chdr.ChannelId))
policy, err = utils.Marshal(p)
if err != nil {
return nil, nil, nil, err
}
}

return cc, vscc, policy, nil
}

//-------- ChaincodeDefinition - interface for ChaincodeData ------
// ChaincodeDefinition describes all of the necessary information for a peer to decide whether to endorse
// a proposal and whether to validate a transaction, for a particular chaincode.
type ChaincodeDefinition interface {
// CCName returns the name of this chaincode (the name it was put in the ChaincodeRegistry with).
CCName() string

// Hash returns the hash of the chaincode.
Hash() []byte

// CCVersion returns the version of the chaincode.
CCVersion() string

// Validation returns how to validate transactions for this chaincode.
// The string returned is the name of the validation method (usually 'vscc')
// and the bytes returned are the argument to the validation (in the case of
// 'vscc', this is a marshaled pb.VSCCArgs message).
Validation() (string, []byte)

// Endorsement returns how to endorse proposals for this chaincode.
// The string returns is the name of the endorsement method (usually 'escc').
Endorsement() string
}

ChaincodeData实现了ChaincodeDefinition接口,ChaincodeData是LSCC保存的数据,它其中有1个字段就是Policy。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// Validation returns how to validate transactions for this chaincode.
// The string returned is the name of the validation method (usually 'vscc')
// and the bytes returned are the argument to the validation (in the case of
// 'vscc', this is a marshaled pb.VSCCArgs message).
func (cd *ChaincodeData) Validation() (string, []byte) {
return cd.Vscc, cd.Policy
}

//-------- ChaincodeData is stored on the LSCC -------

// ChaincodeData defines the datastructure for chaincodes to be serialized by proto
// Type provides an additional check by directing to use a specific package after instantiation
// Data is Type specifc (see CDSPackage and SignedCDSPackage)
type ChaincodeData struct {
// Name of the chaincode
Name string `protobuf:"bytes,1,opt,name=name"`

// Version of the chaincode
Version string `protobuf:"bytes,2,opt,name=version"`

// Escc for the chaincode instance
Escc string `protobuf:"bytes,3,opt,name=escc"`

// Vscc for the chaincode instance
Vscc string `protobuf:"bytes,4,opt,name=vscc"`

// 背书策略
// Policy endorsement policy for the chaincode instance
Policy []byte `protobuf:"bytes,5,opt,name=policy,proto3"`

// Data data specific to the package
Data []byte `protobuf:"bytes,6,opt,name=data,proto3"`

// Id of the chaincode that's the unique fingerprint for the CC This is not
// currently used anywhere but serves as a good eyecatcher
Id []byte `protobuf:"bytes,7,opt,name=id,proto3"`

// InstantiationPolicy for the chaincode
InstantiationPolicy []byte `protobuf:"bytes,8,opt,name=instantiation_policy,proto3"`
}

LSCC的Policy哪来的?

提醒:链码实例化在代码里使用Deploy,而不是Instantiate,这样可以让代码更简洁,所以链码实例化也常称为链码部署。

executeDeploy为部署链码,也就是在部署链码的时候会保存背书策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// executeDeploy implements the "instantiate" Invoke transaction
func (lscc *LifeCycleSysCC) executeDeploy(
stub shim.ChaincodeStubInterface,
chainname string,
cds *pb.ChaincodeDeploymentSpec,
policy []byte,
escc []byte,
vscc []byte,
cdfs *ccprovider.ChaincodeData,
ccpackfs ccprovider.CCPackage,
collectionConfigBytes []byte,
) (*ccprovider.ChaincodeData, error) {
//just test for existence of the chaincode in the LSCC
chaincodeName := cds.ChaincodeSpec.ChaincodeId.Name
_, err := lscc.getCCInstance(stub, chaincodeName)
if err == nil {
return nil, ExistsErr(chaincodeName)
}

//retain chaincode specific data and fill channel specific ones
cdfs.Escc = string(escc)
cdfs.Vscc = string(vscc)
// 保存背书策略
cdfs.Policy = policy
}

executeDeployOrUpgrade是执行链码实例化和升级时调用,它会传递Policy,在链码部署和升级时都会保存背书策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// executeDeployOrUpgrade routes the code path either to executeDeploy or executeUpgrade
// depending on its function argument
func (lscc *LifeCycleSysCC) executeDeployOrUpgrade(
stub shim.ChaincodeStubInterface,
chainname string,
cds *pb.ChaincodeDeploymentSpec,
policy, escc, vscc, collectionConfigBytes []byte,
function string,
) (*ccprovider.ChaincodeData, error) {

chaincodeName := cds.ChaincodeSpec.ChaincodeId.Name
chaincodeVersion := cds.ChaincodeSpec.ChaincodeId.Version

if err := lscc.isValidChaincodeName(chaincodeName); err != nil {
return nil, err
}

if err := lscc.isValidChaincodeVersion(chaincodeName, chaincodeVersion); err != nil {
return nil, err
}

ccpack, err := lscc.Support.GetChaincodeFromLocalStorage(chaincodeName, chaincodeVersion)
if err != nil {
retErrMsg := fmt.Sprintf("cannot get package for chaincode (%s:%s)", chaincodeName, chaincodeVersion)
logger.Errorf("%s-err:%s", retErrMsg, err)
return nil, fmt.Errorf("%s", retErrMsg)
}
cd := ccpack.GetChaincodeData()

switch function {
case DEPLOY:
return lscc.executeDeploy(stub, chainname, cds, policy, escc, vscc, cd, ccpack, collectionConfigBytes)
case UPGRADE:
return lscc.executeUpgrade(stub, chainname, cds, policy, escc, vscc, cd, ccpack, collectionConfigBytes)
default:
logger.Panicf("Programming error, unexpected function '%s'", function)
panic("") // unreachable code
}
}

LSCC也实现了ChainCode接口,与普通链码的实现并没有区别,只不过LSCC并不运行在容器中。LifeCycleSysCC.Invoke会根据参数调用不同的函数,而部署和升级时,会调用executeDeployOrUpgrade部署链码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// Invoke implements lifecycle functions "deploy", "start", "stop", "upgrade".
// Deploy's arguments - {[]byte("deploy"), []byte(<chainname>), <unmarshalled pb.ChaincodeDeploymentSpec>}
//
// Invoke also implements some query-like functions
// Get chaincode arguments - {[]byte("getid"), []byte(<chainname>), []byte(<chaincodename>)}
func (lscc *LifeCycleSysCC) Invoke(stub shim.ChaincodeStubInterface) pb.Response {
...
switch function {
case INSTALL:
...
case DEPLOY, UPGRADE:
// 提取背书策略
// optional arguments here (they can each be nil and may or may not be present)
// args[3] is a marshalled SignaturePolicyEnvelope representing the endorsement policy
// args[4] is the name of escc
// args[5] is the name of vscc
// args[6] is a marshalled CollectionConfigPackage struct
var EP []byte
if len(args) > 3 && len(args[3]) > 0 {
EP = args[3]
} else {
p := cauthdsl.SignedByAnyMember(peer.GetMSPIDs(channel))
EP, err = utils.Marshal(p)
if err != nil {
return shim.Error(err.Error())
}
}
...
cd, err := lscc.executeDeployOrUpgrade(stub, channel, cds, EP, escc, vscc, collectionsConfig, function)
...
case ...:
...
}
}

总结

我们终于知道Policy是哪来的,又是如何被使用的了。管理和查看链码信息,本质是创建一个调用LSCC的Proposal或者交易,链码的信息会保存在LSCC,当VSCC验证链码的交易时,会从LSCC获取信息,包括背书策略、vscc插件等,以验证交易。

最后,ESCC、VSCC也是进行了可插拔设计的。

endorser policy