0%

引言

Write Ahead Logging,简称WAL,也被翻译成预写式日志,是数据库技术中实现事务日志(Transaction Journal)的一种标准方法,可以实现单机事务的原子性,同时可以提高数据库的写入效率。

思考如下场景,如何确保原子性:写操作修改数据库中a和b的值,二者是一个事务,需要把a和b的最新值持久化到磁盘,假如保存完a的值,系统宕机了,重新启动后,a的值已经写入,但b待写入的值已经丢失,如何发现事务没有完成呢?如何保证事务的原子性呢?

可以为事务加锁,也为事务增加标志位,修改完磁盘数据后,标志位设置事务为完成,事务状态保存在磁盘中,假使保存事务状态的过程中宕机了,就把事务回滚掉。实现REDO和UNDO,就能实现原子性。

数据库中针对CrashRecovery的解决方案是WAL。

原理

WAL的核心思想是先写日志再写数据文件,修改数据文件必须发生在修改操作记录在日志文件之后。

本文的日志指事务的操作日志,本文提到的日志都是指事务日志,不再特殊声明。

WAL

我们看WAL怎么解决宕机和恢复的问题

  • 写WAL前宕机了,重启后,数据处于事务未执行的状态。
  • 写WAL时宕机了,重启后,可以检查到WAL数据不正确,回滚当事务前的状态。
  • 写WAL后宕机了,重启后,把WAL中记录的操作,应用到数据库文件中,得到事务执行后的状态。

如此,保证了数据的恢复和事务的原子性。

上面提到的都是写操作,看一下使用WAL时的读操作。WAL中可能包含了未写入到数据库文件中的最新值,如果读最新值就需要从WAL中读取,如果WAL中未读到,从数据库读到的就是最新的数据。

检查点:写入到WAL文件中的操作记录并不一定会立刻应用到数据库文件上,这个过程是异步的,设计检查点来记录已经被应用到数据库文件上的操作序号,检查点后面的操作记录等待被应用到数据库文件上。

优点

WAL的作用是解决宕机和恢复的问题,同时也有其他优点:

  1. 提高写数据的性能
    1. WAL是顺序写,数据库文件是随机写,顺序写性能高于随机写
    2. 减少写磁盘次数
      1. 不直接修改数据库真实数据
      2. 合并若干小的事务,一次性commit到数据库
  2. 保证事务原子性
  3. 保证事务一致性
  4. 并发读写,比如SQLite中,读写、读读都是可以并行的,比如读时需要找到WAL某个值最后写入的位置,就可以从该位置读数据,而写操作是在WAL文件后Append,二者并行。但写写不能并行,因为2次写操作都要向WAL文件Append数据,无法同时进行。
  5. WAL文件中记录了数据的历史版本,因此可以读取历史版本的值,甚至把状态回滚到某个历史版本。

缺点

SQLite提到了WAL的几项缺点:

  1. WAL需要VFS的支持。
  2. 所有使用数据库的进程必须在同一个机器上,以为WAL是单机的。
  3. 多读少写的场景WAL比rollback-journal类型要慢1%~2%。

使用场景

WAL几乎是数据存储(数据库只是数据存储的一个类别,只不过这个类别很大)的标配:

  • Raft可以使用WAL保存log Entry以及状态
  • 数据库
    • PgSQL使用WAL实现事务日志实现事务原子性、一致性,提升性能
    • SQLite使用WAL实现原子事务和回滚
    • MySQL使用WAL保证数据不丢失的情况下提升性能
    • leveldb也使用WAL提升性能,保证操作原子性

资料

Peer与Orderer的交互主要是组织的Peer主节点从Orderer获取区块,本文就来介绍,Peer是如何从Orderer获取区块的,顺带介绍为何Peer从Orderer获取的区块“好慢”。

网络拓扑

假设存在如下的Fabric网络拓扑情况,本文使用此拓扑进行介绍Orderer到Peer的区块传播情况:

网络中存在两家组织:Org1和Org2,它们分别拥有Peer1作为主节点,连向了排序服务的Orderer1节点。

网络中存在2个应用channel:channel1和channel2,它们的账本分别是channel1 ledger和channel2 ledger,Org1和Org2都加入了这2个channel。

channel间是隔离的,所以Peer和Orderer对不同的channel都会分别处理

宏观视角

下图展示了Orderer向Peer传递区块的宏观视角,能够展示多个通道在Orderer和Peer间传递区块的情况

  1. Orderer上有2个通道的账本,每个Peer分别有2个Deliver Server对应2个通道的账本,从账本读取区块,发送给Peer。
  2. 每个Peer有2个Deliver Client,也对应2个通道,接收Orderer发来的区块,加入到缓冲区Payloads Buffer,然后再从Payloads Buffer中提取区块,验证后写入对应的通道账本。

后面,介绍区块同步某个通道区块的情况。

单通道区块同步

Peer利用Deliver从Orderer获取区块,就像SDK利用Deliver从Peer获取区块一样,Deliver服务端的处理是一样的,Deliver客户端的处理就由SDK、Peer自行处理了。

Deliver本质是一个事件订阅接口,Leading Peer启动后,会为每个通道,分别向Orderer节点注册区块事件,并且指定结束的区块高度为uint类型的最大值,这是为了不停的从orderer获取区块。

通过建立的gRPC连接,Orderer源源不断的向Peer发送区块,具体流程,如下图所示:

  1. Orderer调用deliverBlock函数,该函数是循环函数,获取区块直到指定高度。
  2. 每当有新区块产生,deliverBlock能利用NextBlock从通道账本中读到最新的区块,如果没有最新区块,NextBlock会阻塞。
  3. deliverBlock把获取的区块封装成区块事件,发送给Peer(写入到gRPC缓冲区)。
  4. Peer从gRPC读到区块事件,把区块提取出来后,加入到Payloads Buffer,Payloads Buffer默认大小为200(通过源码和日志发现,Payloads Buffer实际存储202个区块),如果Orderer想向Peer发送更多的区块,必须等Payloads Buffer被消费,有空闲的位置才可以。
  5. deliverPayloads为循环函数,不断消费Payloads Buffer中的区块,执行区块验证,添加区块剩余元数据,最后写入通道账本。
  6. 写通道账本包含区块写入区块账本,修改世界状态数据库,历史索引等。

为何Peer从Orderer获取区块慢?

在性能测试过程中,我们发现Orderer排序完成后,Peer还在不断的从Orderer获取区块,而不是所有排序后的区块都先发送给Peer,Peer缓存起来,慢慢去验证?

上面提到Orderer向Peer发送的区块,Peer收到后先存到Payloads Buffer中,Buffer有空闲位置的时候,Orderer发送的区块才能写入Buffer,deliverBlock 1次循环才能完成,才可以发送下一个区块。

但Payloads Buffer大小是有限的,当Buffer满后,Orderer发送区块的操作也会收到阻塞。

我们可以把Orderer和Peer间发送区块可以抽象一下,它们就是生产者-消费者模型,它们中间是缓冲区,Orderer是生产者,向缓冲区写数据,Peer是消费者,从缓冲区读数据,缓冲区满了会阻塞生产者写数据。

所以Orderer向Peer发送数据的快慢,取决消费者的速度,即取决于deliverPayloads处理一个区块的快慢

deliverPayloads慢在把区块写入区块账本,也就是写账本,成了整个网络的瓶颈。

为何不让Peer缓存所有未处理的区块?

从我们测试的情况看,Orderer排序的速度远快于Peer,Peer和Orderer的高度差可以达到10万+,如果让Peer来缓存这些区块,然后再做处理是需要耗费大量的空间。

在生产者-消费者模型中,只需要要消费者时刻都有数据处理即可。虽然Orderer和Peer之间是网络传输,测试网络比较可靠,传输速度远比Peer处理区块要快。

Payloads Buffer可以让网络传输区块和Peer处理区块并行,这样缩短了一个区块从Orderer中发出,到Peer写入区块到账本的总时间,提升Fabric网络整体性能。

Orderer介绍

排序服务由一组排序节点组成,它接收客户端提交的交易,把交易打包成区块,确保排序节点间达成一致的区块内容和顺序,提供区块链的一致性服务。

图片源自《区块链原理、设计与应用》,当时Fabric还不支持raft

排序服务所提供的一致性,依赖确定性的共识算法,而非比特币、以太坊等公有链,所采用的概率性共识算法。确定性的共识算法是区块上链,即不可修改。Fabric所采用的共识算法有Solo、Kafka、EtcdRaft。

客户端通过Broadcast接口向Orderer提交背书过的交易,客户端(此处广义指用户客户端和Peer节点通过Deliver接口订阅区块事件,从Orderer获取区块

更多的排序服务介绍请参考这篇官方文档排序服务

架构

Architecture of Orderer

本图依赖 Fabric 1.4 源码分析而得

Orderer由:多通道、共识插件、消息处理器、本地配置、区块元数据、gRPC服务端、账本等组成,其中gRPC中的Deliver、Ledger是通用的(Peer也有),其余都是Orderer独有的。

多通道

Fabric 支持多通道特性,而Orderer是多通道的核心组成部分。多通道由Registrar、ChainSupport、BlockWriter等一些重要部件组成。

Registrar是所有通道资源的汇总,访问每一条通道,都要经由Registrar,更多信息请看Registrar

ChainSupport代表了每一条通道,它融合了一条通道所有的资源,更多信息请看ChainSupport

BlockWriter 是区块达成共识后,Orderer写入区块到账本需要使用的接口。

共识插件

Fabric的共识是插件化的,抽象出了Orderer所使用的共识接口,任何一种共识插件,只要满足给定的接口,就可以配合Fabric Orderer使用。

当前共识有3种插件:Solo、Kafka、EtcdRaft。Solo用于实验环境,Kafka和EtcdRaft用于生产环境,Kafka和EtcdRaft都是CFT算法,但EtcdRaft比Kafka更易配置。

EtcdRaft实在Fabric 1.4开始引入的,如果之前的生产环境使用Kafka作为共识,可以遵循Fabric给的指导,把Kafka共识,迁移到Raft共识。

gRPC通信

Orderer只有2个gRPC接口:

  • Broadcast:用来接收客户端提交的待排序交易
  • Deliver:客户端(包括Peer节点)用来从Orderer节点获取已经达成一致的区块

其中,Broadcast是Orderer独有的,而Devliver是通用的,因为客户端也可以利用Deliver接口从Peer节点获取区块、交易等。

关于Broadcast和Orderer更多介绍可以参考杨保华的2篇笔记:

用来解析orderer节点的配置文件: orderer.yaml,并保存入内存。

该配置文件中的配置,是节点本地的配置,不需要Orderer节点间统一的配置,因此不需要上链,相关配置有:

  • 网络相关配置
  • 账本类型、位置
  • raft文件位置

而上链的配置,被称为通道配置,需要使用配置交易进行更新,这部分配置,写在configtx.yaml中,和Orderer相关的有:

  • 共识类型
  • 区块大小
  • 切区块的时间
  • 区块内交易数
  • 各种共识的相关配置

Metadata

区块中有4个元数据:

  • 区块签名,存放orderer对区块的SignatureHeader
  • 最新配置区块的高度,方便获取当前通道最新配置
  • 交易过滤,为数组,存放区块内所有交易的有效性,使用数字代表无效的原因,由验证交易的记账节点填写
  • orderer相关元数据,不同的共识类型,该元数据不同

区块Header中记录了Data.Hash(),Data是所有交易后序列化的结果,但不包含区块元数据,所以区块元数据是可以在产生区块后修改的。即,即使元数据上链了,但这数据是可以修改的,只不过修改也没有什么意义。

MsgProcessor

orderer收到交易后需要对交易进行多项检查,不同的通道可以设置不同的MsgProcessor,也就可以进行不同的检查。

当前Processor分2个:

  • 应用通道的叫StandardChannel
  • 系统通道的叫SystemChannel

StandardChannel会对交易进行以下检查:

  • 交易内容不能为空
  • 交易大小不能超过区块大小最大值(默认10MB)
  • 交易交易签名不符合签名策略
  • 签名者证书是否过期

SystemChannel只比StandardChannel多一项:系统配置检查,用来检查以下交易中包含的配置,配置项是否有缺失,或者此项配置是否允许更新等。

BlockCutter

BlockCutter用来把收到的交易分成多个组,每组交易会打包到一个区块中。而分组的过程,就是切块,每组交易被称为一个Batch,它有一个缓冲区用来存放待切块交易。

切块有3个可配置条件:

  • 缓冲区内交易数,达到区块包含的交易上限(默认500)
  • 缓冲区内交易总大小,达到区块大小上限(默认10MB)
  • 缓冲区存在交易,并且未出块的时间,达到切块超时时间(默认2s)

切块有1个不可配置条件:

  • 缓冲区收到配置交易,配置交易要放到单独区块,如果缓冲区有交易,缓冲区已有交易会切到1个区块

超多刚接触Fabric的人有这样一个疑问:排序节点是按什么规则对交易排序的?

按什么顺序对交易排序并不重要,只要交易在区块内的顺序是一致的,然后所有记账节点,按交易在区块内的顺序,处理交易,最后得到的状态必然是一致的,这也是区块链保持一致性的原理。

再回过头来说一下实现是什么顺序:哪个交易先写入BlockCutter的缓冲区,哪个交易就在前面,仅此而已。

BlockWriter

Orderer的BlockWriter是基于common/ledger实现的,它用来保存区块文件,不包含状态数据库等其他数据库,其中有3类区块文件:ram,json和file,file是Orderer和Peer都可使用的,另外2个只能Orderer使用。

BlockWriter用来向Peer的账本追加区块,但追加区块之前,还需要做另外1件事情,设置区块的元数据。

区块元数据包含:

  • 区块签名,存放orderer对区块的SignatureHeader
  • 最新配置区块的高度,方便获取当前通道最新配置
  • 交易过滤,为数组,存放区块内所有交易的有效性,使用数字代表无效的原因,由验证交易的记账节点填写
  • orderer相关元数据,不同的共识类型,该元数据不同

但此时只设置其中的3个:区块签名、配置区块高度、orderer相关的元数据。因为交易的有效性在记账节点检查后才能设置。

为何不在创建区块的时候就设置这些元数据信息,而是在区块经过共识之后?

共识的过程会传播区块,只让区块包含必要的信息,可以减少区块大小,降低通信量。但元数据占用大小非常小,所以这未必是真实原因。

BlockWriter还有另外一个功能:根据一个Batch创建下一个高度的区块。一个区块包含了:

  • Header:区块高度、前一个区块Hash、Data的哈希值
  • Data:被序列化的交易列表
  • Metadata:区块元数据

Header只记录Data的哈希值,不包含Metadata哈希值,这样的目的是,在区块创建之后,仍能修改区块。

Orderer节点启动

根据Fabric 1.4源码梳理Orderer启动步骤:

  • 加载配置文件
  • 设置Logger
  • 设置本地MSP
  • 核心启动部分:
    • 加载创世块
    • 创建账本工厂
    • 创建本机gRPCServer
    • 如果共识需要集群(raft),创建集群gRPCServer
    • 创建Registrar:设置好共识插件,启动各通道,如果共识是raft,还会设置集群的gRPC接口处理函数Step
    • 创建本机server:它是原子广播的处理服务,融合了Broadcast处理函数、deliver处理函数和registrar
    • 开启profile
    • 启动集群gRPC服务
    • 启动本机gRPC服务

启动流程图可请参考杨宝华的笔记Orderer 节点启动过程,笔记可能是老版本的Fabric,但依然有参考价值。

Orderer处理交易的流程

普通交易在Orderer中的流程

交易是区块链的核心,交易在Orderer中的流程分3阶段:

  1. Orderer 的 Broadcast 接口收到来自客户端提交的交易,会获取交易所在的链的资源,并进行首次检查,然后提交给该链的共识,对交易进行排序,最后向客户端发送响应,为下图蓝色部分。
  2. 共识实例是单独运行的,也就是说Orderer把交易交给共识后,共识可能还在处理交易,然而Orderer已经开始向客户端发送提交交易的响应。共识如果发现排序服务的配置如果进行了更新,会再次检查交易,然后利用把Pending的交易分割成一组,然后打包成区块,然后共识机制确保各Orderer节点对区块达成一致,最后将区块写入账本。为下图绿色部分。
  3. Peer会向Orderer订阅区块事件,每当新区块被Orderer写入账本时,Orderer会把新区块以区块事件的方式,发送给Peer。为下图换色部分。

上面提到Orderer和共识实例分别会对交易进行2次检查,这些检查是相同的,为何要进行两次检查呢?

代码如下:ProcessMessage 会调用ProcessNormalMsg,对交易进行第一次检查,如果有错误,会向客户端返回错误响应。 SomeConsensurFunc 是一个假的函数名称,但3种共识插件实现,都包含相同的代码片,当消息中 configSeq < seq 时,再次对交易进行检查,如果错误,则丢次此条交易。configSeq是Order函数传入的,即第一次检查交易时的配置号,seq为共识当前运行时的配置号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (bh *Handler) ProcessMessage(msg *cb.Envelope, addr string) (resp *ab.BroadcastResponse) {
// ...
configSeq, err := processor.ProcessNormalMsg(msg)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s because of error: %s", chdr.ChannelId, addr, err)
return &ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()}
}
// ...
err = processor.Order(msg, configSeq)
// ...
}

func SomeConsensurFunc() {
// ...
if msg.configSeq < seq {
_, err = ch.support.ProcessNormalMsg(msg.normalMsg)
if err != nil {
logger.Warningf("Discarding bad normal message: %s", err)
continue
}
}
// ...
}

我认为如此设计的原因,考量如下:
共识插件应当尽量高效,orderer尽量把能做的做掉,把不能做的交给共识插件,而交易检查就是orderer能做的。共识插件只有在排序服务配置更新后,才需要重新检查交易,以判断是否依然满足规则。排序服务的配置通常是比较稳定的,更新频率很低,所以进行2次校验的频率也是非常低。这种方式,比只在共识插件校验,会拥有更高的整体性能。

配置交易在Orderer中的流程

配置交易可以用来创建通道、更新通道配置,与普通交易的处理流程总体是相似的,只不过多了一些地方或者使用不同的函数,比如:

  • 交易检查函数不是ProcessNormalMsg,而是ProcessConfigMsg
  • 配置交易单独打包在1个区块
  • 配置交易写入账本后,要让配置生效,即Orderer应用最新的配置

使用Raft共识,交易在Orderer中的流程

上面2中流程都是与具体共识算法无关的,这里补充一个Raft共识的。

使用Raft共识的链处理交易包含了上图中的4步:

  • 交易:处理交易
  • 区块:创建区块
  • Raft:使用Raft对区块达成共识
  • 账本:写区块元数据,把区块写入到账本

如果把图中提到的:转发和Raft去掉,就是以Solo为共识的链的过程。

下图是更加细化一层的,如果看不懂,建议先读下Etcd Raft架构设计和源码剖析2:数据流这篇文章。

红色圈出来的是etcd/raft的实现,蓝色圈出来的是Fabric使用raft为共识的部分,外面的Broadcast、Deliver是属于Orderer但不属于某条链。

这张图和etcd与raft交互没有太多不同,只有2个地方:

  1. chains要把交易转化为区块,再交给raft去共识
  2. chains的Apply并不是去修改状态机,而是把取消写到账本

源码简介

Orderer的代码位于fabric/orderer,其目录结构如下,标注了每个目录结构的功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
➜  fabric git:(readCode) ✗ tree -L 2 orderer
orderer
├── README.md
├── common
│   ├── blockcutter 缓存待打包的交易,切块
│   ├── bootstrap 启动时替换通道创世块
│   ├── broadcast orderer的Broadcast接口
│   ├── cluster (Raft)集群服务
│   ├── localconfig 解析orderer配置文件orderer.yaml
│   ├── metadata 区块元数据填写
│   ├── msgprocessor 交易检查
│   ├── multichannel 多通道支持:Registrar、chainSupport、写区块
│   └── server Orderer节点的服务端程序
├── consensus 共识插件
│   ├── consensus.go 共识插件需要实现的接口等定义
│   ├── etcdraft raft共识插件
│   ├── inactive 未激活时的raft
│   ├── kafka kafka共识插件
│   ├── mocks 测试用的共识插件
│   └── solo solo共识插件
├── main.go orderer程序入口
├── mocks
│   ├── common
│   └── util
└── sample_clients orderer的客户端程序样例
├── broadcast_config
├── broadcast_msg
└── deliver_stdout

23 directories, 3 files

阅读Orderer源码,深入学习Orderer的时候,建议以下顺序:

  • 核心的数据结构,主要在multichannel、consensus.go:Fabric 1.4源码解读 6:Orderer核心数据结构
  • Orderer的启动
  • Broadcast接口
  • msgprocessor
  • 通过Solo掌握共识插件需要做哪些工作
  • 切块:blockcutter
  • 写区块:BlockWriter、metadata

总结

本文从宏观的角度介绍了Orderer的功能、核心组成,以及交易在Orderer中的流程,Peer如何从Orderer获取区块。

前言

许多Orderer的文章,都是从Orderer的启动过程讲起,今天换一种“乐高”角度,先看看有哪些“零件”,再看这些零件怎么配合。

Orderer负责接收交易,把交易打包成区块,然后区块在所有Orderer节点之间达成一致,再分发给Peer的功能,这涉及了:

  • 网络:gRPC接收交易,向Peer发送区块
  • 切块:把交易打包成区块
  • 共识:所有Orderer节点达成一致

这些功能是由Orderer核心数据结构组织起来。

在Fabric中,通道和链在概念上都是一条区块链,所以本文中也会可能会混用链和通道。

核心数据结构

Registrar

Registrar

代码中,这样描述Registrar:

Registrar serves as a point of access and control for the individual channel resources.

可见它负责了每个channel资源的访问和控制点,也就说,要对某个通道怎么样,得从这入手。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type Registrar struct {
lock sync.RWMutex
// 保存了多条链
chains map[string]*ChainSupport

// 共识插件
consenters map[string]consensus.Consenter
ledgerFactory blockledger.Factory
signer crypto.LocalSigner

systemChannelID string
systemChannel *ChainSupport
...
}
  • chains保存了每一条链,每一条链在Orderer中都以ChainSupport代表。
  • consenters保存了所有的共识插件,每个共识插件都是一个Consenter,Fabric 1.4中共识插件有Solo、Kafka、EtcdRaft。
  • ledgerFactory用来读取和创建链的账本。
  • signer用来对Orderer中的数据进行签名,以及创建SignatureHeader
  • systemChannelIDsystemChannel分别是系统链ID、系统链实例。

ChainSupport

chainsupport

ChainSupport汇集了一条通道所需要的所有资源,所以说一个ChainSupport代表了一条链。

1
2
3
4
5
6
7
8
type ChainSupport struct {
*ledgerResources
msgprocessor.Processor
*BlockWriter
consensus.Chain
cutter blockcutter.Receiver
crypto.LocalSigner
}

ChainSupport 是一堆接口的集合,这些接口构成一条链所有的操作,接口可以分为4类:

  • 账本:ledgerResourcesBlockWriter分别是账本读写和把区块写入到账本。
  • 消息:msgprocessor.Processorcutter分别是处理交易和把交易切块。
  • 共识:consensus.Chain是Orderer的共识实例,比如每条链都有自己的Raft共识实例,它们互不干扰。
  • 签名:crypto.LocalSigner,同Registrar中的介绍。

Chain

Chain

Chain是接口,它的实现并不一条链,而是一条链的共识实例,可以是Solo、Kafka和EtcdRaft,它运行在单独的协程,使用Channel和ChainSupport通信,它调用其它接口完成切块,以及让所有的Orderer节点对交易达成一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// Chain defines a way to inject messages for ordering.
// Note, that in order to allow flexibility in the implementation, it is the responsibility of the implementer
// to take the ordered messages, send them through the blockcutter.Receiver supplied via HandleChain to cut blocks,
// and ultimately write the ledger also supplied via HandleChain. This design allows for two primary flows
// 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka)
// 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft)
type Chain interface {
// 普通消息/交易排序
// Order accepts a message which has been processed at a given configSeq.
// If the configSeq advances, it is the responsibility of the consenter
// to revalidate and potentially discard the message
// The consenter may return an error, indicating the message was not accepted
Order(env *cb.Envelope, configSeq uint64) error

// 配置消息/交易排序
// Configure accepts a message which reconfigures the channel and will
// trigger an update to the configSeq if committed. The configuration must have
// been triggered by a ConfigUpdate message. If the config sequence advances,
// it is the responsibility of the consenter to recompute the resulting config,
// discarding the message if the reconfiguration is no longer valid.
// The consenter may return an error, indicating the message was not accepted
Configure(config *cb.Envelope, configSeq uint64) error

// 等待排序集群可用
// WaitReady blocks waiting for consenter to be ready for accepting new messages.
// This is useful when consenter needs to temporarily block ingress messages so
// that in-flight messages can be consumed. It could return error if consenter is
// in erroneous states. If this blocking behavior is not desired, consenter could
// simply return nil.
WaitReady() error

// 当排序集群发送错误时,会关闭返回的通道
// Errored returns a channel which will close when an error has occurred.
// This is especially useful for the Deliver client, who must terminate waiting
// clients when the consenter is not up to date.
Errored() <-chan struct{}

// 启动当前链
// Start should allocate whatever resources are needed for staying up to date with the chain.
// Typically, this involves creating a thread which reads from the ordering source, passes those
// messages to a block cutter, and writes the resulting blocks to the ledger.
Start()

// 停止当前链,并释放资源
// Halt frees the resources which were allocated for this Chain.
Halt()
}

Consenter

Consenter

1
2
3
type Consenter interface {
HandleChain(support ConsenterSupport, metadata *cb.Metadata) (Chain, error)
}

Consenter也是接口,它只有1个功能用来创建Chain。每种共识插件,都有自己单独的consenter实现,分别用来创建solo实例、kafka实例或etcdraft实例。

ConsenterSupport

ConsenterSupport

ConsenterSupport为consenter实现提供所需的资源,其实就是共识用来访问外部数据的接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// ConsenterSupport provides the resources available to a Consenter implementation.
type ConsenterSupport interface {
crypto.LocalSigner
msgprocessor.Processor

// VerifyBlockSignature verifies a signature of a block with a given optional
// configuration (can be nil).
VerifyBlockSignature([]*cb.SignedData, *cb.ConfigEnvelope) error

// 提供把消息切成块的接口
// BlockCutter returns the block cutting helper for this channel.
BlockCutter() blockcutter.Receiver

// 当前链的orderer配置
// SharedConfig provides the shared config from the channel's current config block.
SharedConfig() channelconfig.Orderer

// 当前链的通道配置
// ChannelConfig provides the channel config from the channel's current config block.
ChannelConfig() channelconfig.Channel

// 生成区块
// CreateNextBlock takes a list of messages and creates the next block based on the block with highest block number committed to the ledger
// Note that either WriteBlock or WriteConfigBlock must be called before invoking this method a second time.
CreateNextBlock(messages []*cb.Envelope) *cb.Block

// 读区块
// Block returns a block with the given number,
// or nil if such a block doesn't exist.
Block(number uint64) *cb.Block

// 写区块
// WriteBlock commits a block to the ledger.
WriteBlock(block *cb.Block, encodedMetadataValue []byte)

// 写配置区块并更新配置
// WriteConfigBlock commits a block to the ledger, and applies the config update inside.
WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte)

// Sequence returns the current config squence.
Sequence() uint64

// ChainID returns the channel ID this support is associated with.
ChainID() string

// Height returns the number of blocks in the chain this channel is associated with.
Height() uint64

// 以原始数据的格式追加区块,不像WriteBlock那样会修改元数据
// Append appends a new block to the ledger in its raw form,
// unlike WriteBlock that also mutates its metadata.
Append(block *cb.Block) error
}

宏观视角

把上面介绍的各项,融合在一幅图中:

  • Registrar 包容万象,主要是ChainSupport和Consenter,Consenter是可插拔的
  • ChainSupport 代表了一条链,能够指向属于本条链的共识实例,该共识实例由对应共识类型的Consenter创建
  • 共识实例使用ConsenterSupport访问共识外部资源

使用fabric-sdk-go操作链码,介绍了使用官方Go SDK,安装、实例化和升级链码,调用和查询链码,本文介绍使用fabric-sdk-go订阅事件。

事件介绍

本质上就3种事件:

  • BlockEvent:获取区块信息
  • TransactionEvent:获取交易信息
  • ChainCodeEvnet:链码中自定义的链码事件

但每种事件都有2 种类型:

  • Filtered:事件订阅时默认的类型,获取的信息“不全”,不同的事件缺失的数据不同,比如链码事件,如果是Filtered的,其响应字段中的Payload是空的,也就是不知道链码事件携带的数据。这种方式能够降低fabric网络和SDK之间的流量,当Filtered后的字段信息就足够时,这种方式非常适合。关于Filtered的更多信息,这篇文章 Fabric 1.4源码解读 3:Event原理解读 非常有帮助。
  • 非Filtered :可以获取完整的区块、交易、链码事件信息,这种方式在SDK想获取更多信息时,是非常必要的。

4 个注册事件的接口1个取消注册的接口如下:

接口名称 描述 参数值 返回值
RegisterBlockEvent 注册块事件 filter …fab.BlockFilter fab.Registration, <-chan *fab.BlockEvent, error
RegisterFilteredBlockEvent 注册过滤块事件 fab.Registration, <-chan *fab.FilteredBlockEvent, error
RegisterTxStatusEvent 注册交易状态事件 txID string fab.Registration, <-chan *fab.TxStatusEvent, error
RegisterChaincodeEvent 注册链码事件 ccID, eventFilter string fab.Registration, <-chan *fab.CCEvent, error
Unregister 取消事件订阅 fab.Registration

注册会得到管理可以管理订阅的Registration、接收事件的通道,以及可能注册时发生的错误,关于每个接口的具体介绍、使用,可以参考官方的Event文档,其中包含了样例代码,如果想看真实的样例代码,可以参考示例项目

Option介绍

注册事件需要使用EventClient,创建EventClient时可以指定一些选项,这些选项其实就是事件订阅的选项。

有3个Option:

  • func WithBlockEvents() ClientOption

    指定了此选项,事件就是非“filtered”,fabric会向调用SDK客户端发送完整的区块,可以获得订阅事件完整的信息。

  • func WithSeekType(seek seek.Type) ClientOption

    使用此选项可以指定从哪个区块高度获取事件seek.TypeOldestNewestFromBlock 3种取值,分别代表从第1个区块、最后一个区块和指定区块号开始获取事件,FromBlock需要结合WithBlockNum使用。So,可以通过这个选项获取历史事件

  • func WithBlockNum(from uint64) ClientOption

    指定区块高度,只有WithSeekType(FromBlock)时才生效。

链码事件多说几句

链码如何发链码事件

ChaincodeStubInterfaceSetEvent的方法,入参分别为事件名称和事件锁携带的信息payload。

1
2
3
4
5
6
7
8
9
// ChaincodeStubInterface is used by deployable chaincode apps to access and
// modify their ledgers
type ChaincodeStubInterface interface {
// SetEvent allows the chaincode to set an event on the response to the
// proposal to be included as part of a transaction. The event will be
// available within the transaction in the committed block regardless of the
// validity of the transaction.
SetEvent(name string, payload []byte) error
}

通过ChannelClient订阅链码事件介绍

SDK的channel client也有订阅链码事件的接口:channel.Client.RegisterChaincodeEvent(),它的定义和event client提供的接口完全一样,但功能上有所差别。

channel client没有指定 WithBlockEvents,所以这是Filtered的事件链码,获取的事件链码中,其Payload为空。

示例项目

示例项目fabric-sdk-go-sample是结合Fabric的BYFN展示如何使用fabric-sdk-go的项目,它的Event样例部分,介绍了如何使用以上接口订阅Fabric事件,具体请参加该部分README

理论知识

如果不清楚数字证书、公私钥与签名的关系,建议阅读阮一峰的数字签名是什么?

Fabric证书和密钥文件

使用Fabric CA或者 cryptogen 工具可以生成证书和私钥文件,这里取 BYFN 例子的文件做介绍,Org1 Admin 账户的文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
➜  first-network git:(release-1.4) ✗ tree crypto-config/peerOrganizations/org1.example.com/users/Admin@org1.example.com
crypto-config/peerOrganizations/org1.example.com/users/Admin@org1.example.com
├── msp
│   ├── admincerts
│   │   └── Admin@org1.example.com-cert.pem
│   ├── cacerts
│   │   └── ca.org1.example.com-cert.pem
│   ├── keystore
│   │   └── f9f3dddb7fcc40086de6d5ae77f1481abbb99bff7a74839b950720d3dca0d8ee_sk
│   ├── signcerts
│   │   └── Admin@org1.example.com-cert.pem
│   └── tlscacerts
│   └── tlsca.org1.example.com-cert.pem
└── tls
├── ca.crt
├── client.crt
└── client.key

msp目录,为Admin的身份信息:

  • admincerts:组织管理员的身份验证证书。
  • cacerts:组织的根证书。
  • keystore:该用户的私钥,用来对消息签名。
  • signcerts:该用户的身份验证证书,被组织根证书签名。
  • tlscacerts:TLS通信用的身份证书,为组织的TLS证书。

tls目录,为TLS通信相关的证书:

  • ca.crt:组织根证书
  • client.crt:验证当前用户身份的证书,当前为验证管理员的证书
  • client.key:当前用户的身份私钥,用来签名

整体逻辑

交易是区块链的核心,一切状态的转移都是一条交易,交易的真伪需要使用数字签名进行保证。

在Fabric中,交易涉及两个概念:

  • Proposal:提案
  • Transaction:交易

所以 Proposal 和 Transaction 都需要使用数字签名进行保护,它们相关的消息中,都包含了发送方的身份信息:mspid、证书(证书中实际包含了公钥)。

提案的实际消息是 SignedProposal,其中包含了:

  • 数字签名:Signature
  • 证书、公钥等签名者身份信息:ProposalBytes.Proposal.Header.SignatureHeader.Creator

signed_proposal

图来自杨保华的hyperledger_code_fabric

交易中最重要的是Envelope结构体,SDK向Orderer提交交易时,会发送Envelope消息,它包含了:

  • 数字签名:Signature
  • 交易发送方的身份信息:Payload.Header.SignatureHeader.Creator
  • 可选背书节点的身份信息,不同的交易类型,Data包含了不同的信息,如果是需要背书的,可以包含背书的信息、签名和身份信息:Payload.Data.SignedChainccodeDeploymentSpec.OwnerEndorsements.signingidentity

Signed transaction

图来自《区块链原理、设计与应用》,为升级链码的交易Envelope结构。

在验证消息的签名时,会从中提取出数字签名Signature,身份信息(证书、公钥)和被签名消息体,完成以下验证:

  • 使用证书验证发送方的身份,发送方是否属于它所在的组织,以及发送方的公钥没有修改和替换
  • 使用公钥验证消息是否为发送方签名,并且消息没有被修改

验证的整体流程如下:

Verify signature

验证签名的函数

core/common/validation/msgvalidation.go 提供了2验证消息签名的函数,用来验证Proposal和Transaction,它们会调用相同的函数checkSignatureFromCreator进行数字签名的验证。

验证Porposal签名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func ValidateProposalMessage(signedProp *pb.SignedProposal) (*pb.Proposal, *common.Header, *pb.ChaincodeHeaderExtension, error) {
...

// 从SignatureHeader交易客户端的签名
// validate the signature
err = checkSignatureFromCreator(shdr.Creator, signedProp.Signature, signedProp.ProposalBytes, chdr.ChannelId)
if err != nil {
// log the exact message on the peer but return a generic error message to
// avoid malicious users scanning for channels
putilsLogger.Warningf("channel [%s]: %s", chdr.ChannelId, err)
sId := &msp.SerializedIdentity{}
err := proto.Unmarshal(shdr.Creator, sId)
if err != nil {
// log the error here as well but still only return the generic error
err = errors.Wrap(err, "could not deserialize a SerializedIdentity")
putilsLogger.Warningf("channel [%s]: %s", chdr.ChannelId, err)
}
return nil, nil, nil, errors.Errorf("access denied: channel [%s] creator org [%s]", chdr.ChannelId, sId.Mspid)
}
}

验证Transaction签名

Commit阶段会对交易进行验证,会调用此函数,该函数完成了对Transaction的验证,包含发送方数字签名的验证。

交易是包含背书结果和背书签名的,背书相关的验证并不包含在此,而是专门的背书验证,具体请看Fabric 1.4源码解读 1:背书策略是怎么使用的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// ValidateTransaction checks that the transaction envelope is properly formed
func ValidateTransaction(e *common.Envelope, c channelconfig.ApplicationCapabilities) (*common.Payload, pb.TxValidationCode) {
putilsLogger.Debugf("ValidateTransactionEnvelope starts for envelope %p", e)

...

// validate the header
chdr, shdr, err := validateCommonHeader(payload.Header)
if err != nil {
putilsLogger.Errorf("validateCommonHeader returns err %s", err)
return nil, pb.TxValidationCode_BAD_COMMON_HEADER
}

// validate the signature in the envelope
err = checkSignatureFromCreator(shdr.Creator, e.Signature, e.Payload, chdr.ChannelId)
if err != nil {
putilsLogger.Errorf("checkSignatureFromCreator returns err %s", err)
return nil, pb.TxValidationCode_BAD_CREATOR_SIGNATURE
}

// continue the validation in a way that depends on the type specified in the header
switch common.HeaderType(chdr.Type) {
case common.HeaderType_ENDORSER_TRANSACTION:
// Verify that the transaction ID has been computed properly.
// This check is needed to ensure that the lookup into the ledger
// for the same TxID catches duplicates.
err = utils.CheckTxID(
chdr.TxId,
shdr.Nonce,
shdr.Creator)

if err != nil {
putilsLogger.Errorf("CheckTxID returns err %s", err)
return nil, pb.TxValidationCode_BAD_PROPOSAL_TXID
}

// 如果是背书交易,背书的签名不在此验证,由背书策略模块进行验证
err = validateEndorserTransaction(payload.Data, payload.Header)
putilsLogger.Debugf("ValidateTransactionEnvelope returns err %s", err)

验证签名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// given a creator, a message and a signature,
// this function returns nil if the creator
// is a valid cert and the signature is valid
func checkSignatureFromCreator(creatorBytes []byte, sig []byte, msg []byte, ChainID string) error {
putilsLogger.Debugf("begin")

// check for nil argument
if creatorBytes == nil || sig == nil || msg == nil {
return errors.New("nil arguments")
}

// 每个链有各自的msp
mspObj := mspmgmt.GetIdentityDeserializer(ChainID)
if mspObj == nil {
return errors.Errorf("could not get msp for channel [%s]", ChainID)
}

// 获取proposal创建者/发送方的Identity
// creatorBytes 中是序列化后的mspid、证书、公钥等信息
creator, err := mspObj.DeserializeIdentity(creatorBytes)
if err != nil {
return errors.WithMessage(err, "MSP error")
}

putilsLogger.Debugf("creator is %s", creator.GetIdentifier())

// 验证证书是否有效
// ensure that creator is a valid certificate
err = creator.Validate()
if err != nil {
return errors.WithMessage(err, "creator certificate is not valid")
}

putilsLogger.Debugf("creator is valid")

// validate the signature
// 验证签名
err = creator.Verify(msg, sig)
if err != nil {
return errors.WithMessage(err, "creator's signature over the proposal is not valid")
}

putilsLogger.Debugf("exits successfully")

return nil
}

获取Identity

获取当前通道的MSP manager:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// GetIdentityDeserializer returns the IdentityDeserializer for the given chain
func GetIdentityDeserializer(chainID string) msp.IdentityDeserializer {
if chainID == "" {
return GetLocalMSP()
}

return GetManagerForChain(chainID)
}

// GetManagerForChain returns the msp manager for the supplied
// chain; if no such manager exists, one is created
func GetManagerForChain(chainID string) msp.MSPManager {
m.Lock()
defer m.Unlock()

// 先从缓存查找
mspMgr, ok := mspMap[chainID]
if !ok {
// 找不到则新建立当前通道Msp manager
mspLogger.Debugf("Created new msp manager for channel `%s`", chainID)
mspMgmtMgr := &mspMgmtMgr{msp.NewMSPManager(), false}
mspMap[chainID] = mspMgmtMgr
mspMgr = mspMgmtMgr
} else {
// check for internal mspManagerImpl and mspMgmtMgr types. if a different
// type is found, it's because a developer has added a new type that
// implements the MSPManager interface and should add a case to the logic
// above to handle it.
if !(reflect.TypeOf(mspMgr).Elem().Name() == "mspManagerImpl" || reflect.TypeOf(mspMgr).Elem().Name() == "mspMgmtMgr") {
panic("Found unexpected MSPManager type.")
}
mspLogger.Debugf("Returning existing manager for channel '%s'", chainID)
}
return mspMgr
}

// MSPManager has been setup for a channel, which indicates whether the channel
// exists or not
type mspMgmtMgr struct {
msp.MSPManager
// track whether this MSPManager has been setup successfully
up bool
}

msp.MSPManager是一个接口,从上面代码可以得知,它是利用NewMSPManager创建的:

1
2
3
4
// 创建等待Setup的MSPManager
func NewMSPManager() MSPManager {
return &mspManagerImpl{}
}

疑问是,啥时候Setup的,当前调用路径上没发现这个路径,可能从系统整体流程上,已经保证了,当前调用时,已经创建好了。

获取Identity,是一个剥洋葱的过程:

1
2
3
4
5
6
func (mgr *mspMgmtMgr) DeserializeIdentity(serializedIdentity []byte) (msp.Identity, error) {
if !mgr.up {
return nil, errors.New("channel doesn't exist")
}
return mgr.MSPManager.DeserializeIdentity(serializedIdentity)
}

实际调用mspManagerImplDeserializeIdentity

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// DeserializeIdentity returns an identity given its serialized version supplied as argument
func (mgr *mspManagerImpl) DeserializeIdentity(serializedID []byte) (Identity, error) {
// We first deserialize to a SerializedIdentity to get the MSP ID
sId := &msp.SerializedIdentity{}
err := proto.Unmarshal(serializedID, sId)
if err != nil {
return nil, errors.Wrap(err, "could not deserialize a SerializedIdentity")
}

// 获取发送方的msp实例
// we can now attempt to obtain the MSP
msp := mgr.mspsMap[sId.Mspid]
if msp == nil {
return nil, errors.Errorf("MSP %s is unknown", sId.Mspid)
}

switch t := msp.(type) {
case *bccspmsp:
return t.deserializeIdentityInternal(sId.IdBytes)
case *idemixmsp:
return t.deserializeIdentityInternal(sId.IdBytes)
default:
return t.DeserializeIdentity(serializedID)
}
}

转到bccspmsp的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 反序列化二进制,得到证书,然后用证书获取公钥,使用证书、公钥和msp,创建Identity
// deserializeIdentityInternal returns an identity given its byte-level representation
func (msp *bccspmsp) deserializeIdentityInternal(serializedIdentity []byte) (Identity, error) {
// This MSP will always deserialize certs this way
bl, _ := pem.Decode(serializedIdentity)
if bl == nil {
return nil, errors.New("could not decode the PEM structure")
}
cert, err := x509.ParseCertificate(bl.Bytes)
if err != nil {
return nil, errors.Wrap(err, "parseCertificate failed")
}

// Now we have the certificate; make sure that its fields
// (e.g. the Issuer.OU or the Subject.OU) match with the
// MSP id that this MSP has; otherwise it might be an attack
// TODO!
// We can't do it yet because there is no standardized way
// (yet) to encode the MSP ID into the x.509 body of a cert

// 从证书中提取公钥,封装一下,满足bccsp.Key接口
pub, err := msp.bccsp.KeyImport(cert, &bccsp.X509PublicKeyImportOpts{Temporary: true})
if err != nil {
return nil, errors.WithMessage(err, "failed to import certificate's public key")
}

// 利用证书、公钥和msp建立角色身份
return newIdentity(cert, pub, msp)
}

Identity包含了Identity标示符,证书、公钥和所在的msp,创建Identity就是计算以上几项信息的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
type identity struct {
// id contains the identifier (MSPID and identity identifier) for this instance
id *IdentityIdentifier

// cert contains the x.509 certificate that signs the public key of this instance
cert *x509.Certificate

// this is the public key of this instance
pk bccsp.Key

// reference to the MSP that "owns" this identity
msp *bccspmsp
}

func newIdentity(cert *x509.Certificate, pk bccsp.Key, msp *bccspmsp) (Identity, error) {
if mspIdentityLogger.IsEnabledFor(zapcore.DebugLevel) {
mspIdentityLogger.Debugf("Creating identity instance for cert %s", certToPEM(cert))
}

// 检查证书
// Sanitize first the certificate
cert, err := msp.sanitizeCert(cert)
if err != nil {
return nil, err
}

// Compute identity identifier

// Use the hash of the identity's certificate as id in the IdentityIdentifier
hashOpt, err := bccsp.GetHashOpt(msp.cryptoConfig.IdentityIdentifierHashFunction)
if err != nil {
return nil, errors.WithMessage(err, "failed getting hash function options")
}

digest, err := msp.bccsp.Hash(cert.Raw, hashOpt)
if err != nil {
return nil, errors.WithMessage(err, "failed hashing raw certificate to compute the id of the IdentityIdentifier")
}

id := &IdentityIdentifier{
Mspid: msp.name,
Id: hex.EncodeToString(digest)}

return &identity{id: id, cert: cert, pk: pk, msp: msp}, nil
}

验证数字签名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Verify checks against a signature and a message
// to determine whether this identity produced the
// signature; it returns nil if so or an error otherwise
func (id *identity) Verify(msg []byte, sig []byte) error {
// mspIdentityLogger.Infof("Verifying signature")

// Compute Hash
hashOpt, err := id.getHashOpt(id.msp.cryptoConfig.SignatureHashFamily)
if err != nil {
return errors.WithMessage(err, "failed getting hash function options")
}

digest, err := id.msp.bccsp.Hash(msg, hashOpt)
if err != nil {
return errors.WithMessage(err, "failed computing digest")
}

if mspIdentityLogger.IsEnabledFor(zapcore.DebugLevel) {
mspIdentityLogger.Debugf("Verify: digest = %s", hex.Dump(digest))
mspIdentityLogger.Debugf("Verify: sig = %s", hex.Dump(sig))
}

// 最终会调用bccsp的接口验证签名,SW或者国密
valid, err := id.msp.bccsp.Verify(id.pk, sig, digest, nil)
if err != nil {
return errors.WithMessage(err, "could not determine the validity of the signature")
} else if !valid {
return errors.New("The signature is invalid")
}

return nil
}

解密SignatureHeader

Fabric 使用 SignatureHeader 保存发送方的身份信息,Creator即为序列化后的信息。

SignatureHeaderMaker 接口定义了创建一个 SignatureHeader 的方法,搜索起来实现该接口的结构体很多,本质上只有2个:mspSignerSignatureHeaderCreator

1
2
3
4
5
6
7
8
9
10
11
// SignatureHeaderMaker creates a new SignatureHeader
type SignatureHeaderMaker interface {
// NewSignatureHeader creates a SignatureHeader with the correct signing identity and a valid nonce
NewSignatureHeader() (*cb.SignatureHeader, error)
}

// localmsp
func (s *mspSigner) NewSignatureHeader() (*cb.SignatureHeader, error) {}

// crypto
func (bs *SignatureHeaderCreator) NewSignatureHeader() (*cb.SignatureHeader, error){}

两个实现本质上是一样的,以 mspSigner 为例进行介绍。首先获取实现SigningIdentity接口的实例,然后调用Serialize得到序列化后的身份信息,再随机生成一个Nonce,创建出SignatureHeader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// NewSignatureHeader creates a SignatureHeader with the correct signing identity and a valid nonce
func (s *mspSigner) NewSignatureHeader() (*cb.SignatureHeader, error) {
// 获得SigningIdentity接口实例
signer, err := mspmgmt.GetLocalMSP().GetDefaultSigningIdentity()
if err != nil {
return nil, fmt.Errorf("Failed getting MSP-based signer [%s]", err)
}

// 序列化得到creator
creatorIdentityRaw, err := signer.Serialize()
if err != nil {
return nil, fmt.Errorf("Failed serializing creator public identity [%s]", err)
}

// 获取一个随机nonce
nonce, err := crypto.GetRandomNonce()
if err != nil {
return nil, fmt.Errorf("Failed creating nonce [%s]", err)
}

sh := &cb.SignatureHeader{}
sh.Creator = creatorIdentityRaw
sh.Nonce = nonce

return sh, nil
}

SigningIdentity接口包含了Identity接口,Identity声明了跟证书相关的方法,SigningIdentity则增加了对消息签名的函数Sign

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type SigningIdentity interface {

// Extends Identity
Identity

// Sign the message
Sign(msg []byte) ([]byte, error)

// GetPublicVersion returns the public parts of this identity
GetPublicVersion() Identity
}

type Identity interface {
...
// Serialize converts an identity to bytes
Serialize() ([]byte, error)
...
}

Serialize的实现,实际只包含了证书和MSPID,说明了消息中携带的只包含MSPID和证书作为身份信息,而不是signingidentity的所有字段(signingidentity实现了SigningIdentity接口)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Serialize returns a byte array representation of this identity
func (id *identity) Serialize() ([]byte, error) {
// mspIdentityLogger.Infof("Serializing identity %s", id.id)

// Raw格式证书
pb := &pem.Block{Bytes: id.cert.Raw, Type: "CERTIFICATE"}
pemBytes := pem.EncodeToMemory(pb)
if pemBytes == nil {
return nil, errors.New("encoding of identity failed")
}

// 使用MSPID和序列化后的证书,再次序列化得到身份信息
sId := &msp.SerializedIdentity{Mspid: id.id.Mspid, IdBytes: pemBytes}
idBytes, err := proto.Marshal(sId)
if err != nil {
return nil, errors.Wrapf(err, "could not marshal a SerializedIdentity structure for identity %s", id.id)
}

return idBytes, nil
}

参考资料

  1. https://github.com/yeasy/hyperledger_code_fabric
  2. 《区块链原理、设计与应用》第9章、第10章

教程资料

问题汇总

replace 使用http或https

在使用go replace时,有2点注意:

  • 目标仓库不能带协议头,比如http、https,要从域名或者IP开始
  • 版本号格式要符合语义格式化,测试版本是否符合规则:Go playground 样例代码

直接修改 go.mod 文件格式:

1
replace github.com/hyperledger/fabric v1.4.1 => 192.168.9.251/hyperledger/fabric v1.4.1-alpha.11-yx

或使用命令:

1
go mod edit -replace=github.com/hyperledger/fabric@v1.4.1=192.168.9.251/hyperledger/fabric@v1.4.1

Gitlab 仓库没开启https

go mod 默认使用 go get 下载依赖,而 go get 默认使用 https,如果 Gitlab 仓库没有启用 https,需要使用 -insecure 让go get走http。

问题:

1
2
3
GOPROXY="" go get github.com/hyperledger/fabric@v1.4.1
go: 192.168.9.251/hyperledger/fabric@v1.4.1-alpha.11-yx: unrecognized import path "192.168.9.251/hyperledger/fabric" (https fetch: Get https://192.168.9.251/hyperledger/fabric?go-get=1: dial tcp 192.168.9.251:443: connect: connection refused)
go: error loading module requirements

方案:

1
GOPROXY="" go get -insecure github.com/hyperledger/fabric@v1.4.1

注解:遇到问题时,使用 go get -v 可以看到更多信息,有助分析问题。

Go Modules 代理

由于某些网络原因,国内下载 Github 等处的依赖,不够流程,需要设置代理,不同版本的设置如下:

  • go1.12

    1
    $ export GOPROXY=https://goproxy.cn
  • go1.13

    1
    2
    $ export GOPRIVATE=192.168.9.251
    $ export GOPROXY=https://goproxy.cn,direct

私有仓库

如果仓库设置为私有,这要求用户必须登录才能访问仓库。

Go Modules 默认使用 go get 下载依赖,go get 利用 https 或者 http, 但下载过程没有设置用户名和密码的地方,下载依赖时,可能遇到一下错误:

  • connection refused
  • unkown revision

可以通过设置Github/Gitlab Access Token结果,通过token的方式,访问仓库,token的获取方式为,登录Gitlab仓库,进入以下页面:

Gitlab User Setting -> Access Tokens

在此页面复制下顶端的 Your New Personal Access Token, 然后填写token名字和勾选下方的权限进行创建 Token。

然后执行以下命令:

1
2
3
git config --global \
url."http://oauth2:${your_access_token}@ip_address_or_domain".insteadOf \
"http://ip_address_or_domain"

后面再去 go get 的时候,就可顺利下载依赖。

流程

快速入门Fabric核心概念和框架 这篇文章中,介绍了 Fabric 的很多概念,其中也包含了交易、提案(Proposal)和链码。同时也介绍了,交易的执行流程,链码的调用流程等。

本文聚焦介绍交易流程的一个环节:交易背书,以下的3幅图,在快速入门Fabric核心概念和框架 中都有介绍,有必要的话,去读一下上下文信息。

交易宏观流程

交易的详细流程请阅读 交易流程,了解交易流程的几大环节。

链码调用流程

上图,展示了客户端、Peer,以及链码容器 3大主体在交易流程中的背书过程,请关注一下Peer中的 Handler,它负责和链码容器交互。

提案背书流程

上图,从接近源码的层面,展示了交易背书过程。其中Fabric、Shim 是 Peer 中的模块,ChainCode 代表链码容器,Endorser Chaincode 代表 Peer 对交易提案和模拟执行结果进行背书。

如果了解过Chaincode,你会知道 Shim 是链码容器和 Peer 交互所依赖的模块。

最后推荐一份保华大佬整理的 Peer 提案背书过程,是读源码前,必读的资料。虽然精简,但把重要的核心流程都串联起来了。

源码分析

Proposal定义

客户端发送被背书节点的是 SignedProposal ,它包含了签名和Proposal,这是它在proposal.proto中的组成简介,:

1
2
3
4
5
SignedProposal
|\_ Signature (signature on the Proposal message by the creator specified in the header)
\_ Proposal
|\_ Header (the header for this proposal)
\_ Payload (the payload for this proposal)

proposal.proto这个文件还简要介绍了Client和背书节点之间通信的消息类型和过程。

Proposal

1
2
3
4
5
6
7
8
type SignedProposal struct {
// The bytes of Proposal
ProposalBytes []byte `protobuf:"bytes,1,opt,name=proposal_bytes,json=proposalBytes,proto3" json:"proposal_bytes,omitempty"`
// Signaure over proposalBytes; this signature is to be verified against
// the creator identity contained in the header of the Proposal message
// marshaled as proposalBytes
Signature []byte `protobuf:"bytes,2,opt,name=signature,proto3" json:"signature,omitempty"`
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
type Proposal struct {
// The header of the proposal. It is the bytes of the Header
Header []byte `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"`
// The payload of the proposal as defined by the type in the proposal
// header.
Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
// Optional extensions to the proposal. Its content depends on the Header's
// type field. For the type CHAINCODE, it might be the bytes of a
// ChaincodeAction message.
Extension []byte `protobuf:"bytes,3,opt,name=extension,proto3" json:"extension,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
type Header struct {
ChannelHeader []byte `protobuf:"bytes,1,opt,name=channel_header,json=channelHeader,proto3" json:"channel_header,omitempty"`
SignatureHeader []byte `protobuf:"bytes,2,opt,name=signature_header,json=signatureHeader,proto3" json:"signature_header,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}


// Header is a generic replay prevention and identity message to include in a signed payload
type ChannelHeader struct {
Type int32 `protobuf:"varint,1,opt,name=type,proto3" json:"type,omitempty"`
// Version indicates message protocol version
Version int32 `protobuf:"varint,2,opt,name=version,proto3" json:"version,omitempty"`
// Timestamp is the local time when the message was created
// by the sender
Timestamp *timestamp.Timestamp `protobuf:"bytes,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// Identifier of the channel this message is bound for
ChannelId string `protobuf:"bytes,4,opt,name=channel_id,json=channelId,proto3" json:"channel_id,omitempty"`
// An unique identifier that is used end-to-end.
// - set by higher layers such as end user or SDK
// - passed to the endorser (which will check for uniqueness)
// - as the header is passed along unchanged, it will be
// be retrieved by the committer (uniqueness check here as well)
// - to be stored in the ledger
TxId string `protobuf:"bytes,5,opt,name=tx_id,json=txId,proto3" json:"tx_id,omitempty"`
// The epoch in which this header was generated, where epoch is defined based on block height
// Epoch in which the response has been generated. This field identifies a
// logical window of time. A proposal response is accepted by a peer only if
// two conditions hold:
// 1. the epoch specified in the message is the current epoch
// 2. this message has been only seen once during this epoch (i.e. it hasn't
// been replayed)
Epoch uint64 `protobuf:"varint,6,opt,name=epoch,proto3" json:"epoch,omitempty"`
// Extension that may be attached based on the header type
Extension []byte `protobuf:"bytes,7,opt,name=extension,proto3" json:"extension,omitempty"`
// If mutual TLS is employed, this represents
// the hash of the client's TLS certificate
TlsCertHash []byte `protobuf:"bytes,8,opt,name=tls_cert_hash,json=tlsCertHash,proto3" json:"tls_cert_hash,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}

type SignatureHeader struct {
// Creator of the message, a marshaled msp.SerializedIdentity
Creator []byte `protobuf:"bytes,1,opt,name=creator,proto3" json:"creator,omitempty"`
// Arbitrary number that may only be used once. Can be used to detect replay attacks.
Nonce []byte `protobuf:"bytes,2,opt,name=nonce,proto3" json:"nonce,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}

gRPC定义

1
2
3
4
5
6
7
8
9
10
11
// EndorserClient is the client API for Endorser service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type EndorserClient interface {
ProcessProposal(ctx context.Context, in *SignedProposal, opts ...grpc.CallOption) (*ProposalResponse, error)
}

// EndorserServer is the server API for Endorser service.
type EndorserServer interface {
ProcessProposal(context.Context, *SignedProposal) (*ProposalResponse, error)
}

SDK发送Proposal

Peer接收Proposal

Peer处理Proposal主流程

主要是把背书节点的背书工作聚合一下:

  1. Proposal预处理
  2. 获取交易执行模拟器,模拟执行Proposal
  3. 如果模拟执行成功,调用ESCC对Proposal和结果进行背书,如果模拟执行失败直接返回背书失败的响应
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
func (e *Endorser) ProcessProposal(ctx context.Context, signedProp *pb.SignedProposal) (*pb.ProposalResponse, error) {
...
// 0 -- check and validate
// 这里有相当多的工作量
vr, err := e.preProcess(signedProp)
if err != nil {
resp := vr.resp
return resp, err
}

prop, hdrExt, chainID, txid := vr.prop, vr.hdrExt, vr.chainID, vr.txid

// 获取指定账本模拟器
// obtaining once the tx simulator for this proposal. This will be nil
// for chainless proposals
// Also obtain a history query executor for history queries, since tx simulator does not cover history
var txsim ledger.TxSimulator
var historyQueryExecutor ledger.HistoryQueryExecutor
if acquireTxSimulator(chainID, vr.hdrExt.ChaincodeId) {
if txsim, err = e.s.GetTxSimulator(chainID, txid); err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil
}
...
defer txsim.Done()

if historyQueryExecutor, err = e.s.GetHistoryQueryExecutor(chainID); err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil
}
}

txParams := &ccprovider.TransactionParams{
ChannelID: chainID,
TxID: txid,
SignedProp: signedProp,
Proposal: prop,
TXSimulator: txsim,
HistoryQueryExecutor: historyQueryExecutor,
}
// this could be a request to a chainless SysCC

// 模拟执行交易,失败则返回背书失败的响应
// 1 -- simulate
cd, res, simulationResult, ccevent, err := e.SimulateProposal(txParams, hdrExt.ChaincodeId)
if err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil
}
if res != nil {
if res.Status >= shim.ERROR {
endorserLogger.Errorf("[%s][%s] simulateProposal() resulted in chaincode %s response status %d for txid: %s", chainID, shorttxid(txid), hdrExt.ChaincodeId, res.Status, txid)
var cceventBytes []byte
if ccevent != nil {
cceventBytes, err = putils.GetBytesChaincodeEvent(ccevent)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal event bytes")
}
}
pResp, err := putils.CreateProposalResponseFailure(prop.Header, prop.Payload, res, simulationResult, cceventBytes, hdrExt.ChaincodeId, hdrExt.PayloadVisibility)
if err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil
}

return pResp, nil
}
}

// 对模拟执行的结果进行签名背书
// 2 -- endorse and get a marshalled ProposalResponse message
var pResp *pb.ProposalResponse

// TODO till we implement global ESCC, CSCC for system chaincodes
// chainless proposals (such as CSCC) don't have to be endorsed
if chainID == "" {
pResp = &pb.ProposalResponse{Response: res}
} else {
// Note: To endorseProposal(), we pass the released txsim. Hence, an error would occur if we try to use this txsim
pResp, err = e.endorseProposal(ctx, chainID, txid, signedProp, prop, res, simulationResult, ccevent, hdrExt.PayloadVisibility, hdrExt.ChaincodeId, txsim, cd)

...
}

// Set the proposal response payload - it
// contains the "return value" from the
// chaincode invocation
pResp.Response = res

// total failed proposals = ProposalsReceived-SuccessfulProposals
e.Metrics.SuccessfulProposals.Add(1)
success = true

return pResp, nil
}

preProcess 检查和获取信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
// preProcess checks the tx proposal headers, uniqueness and ACL
// 检查proposal、ACL
func (e *Endorser) preProcess(signedProp *pb.SignedProposal) (*validateResult, error) {
// 包含proposal、header、chainID、txid等信息
vr := &validateResult{}
// at first, we check whether the message is valid
// 检查proposal,并获取各种需要的信息
prop, hdr, hdrExt, err := validation.ValidateProposalMessage(signedProp)

if err != nil {
e.Metrics.ProposalValidationFailed.Add(1)
vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
return vr, err
}

// 获取Header中的2个Header
chdr, err := putils.UnmarshalChannelHeader(hdr.ChannelHeader)
if err != nil {
vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
return vr, err
}

shdr, err := putils.GetSignatureHeader(hdr.SignatureHeader)
if err != nil {
vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
return vr, err
}

// 检查是否调用了不可外部(用户)的系统链码
// 先找到链码实例,然后调用链码的方法判断本身是否可调用
// block invocations to security-sensitive system chaincodes
if e.s.IsSysCCAndNotInvokableExternal(hdrExt.ChaincodeId.Name) {
endorserLogger.Errorf("Error: an attempt was made by %#v to invoke system chaincode %s", shdr.Creator, hdrExt.ChaincodeId.Name)
err = errors.Errorf("chaincode %s cannot be invoked through a proposal", hdrExt.ChaincodeId.Name)
vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
return vr, err
}

chainID := chdr.ChannelId
txid := chdr.TxId
endorserLogger.Debugf("[%s][%s] processing txid: %s", chainID, shorttxid(txid), txid)

if chainID != "" {
// labels that provide context for failure metrics
meterLabels := []string{
"channel", chainID,
"chaincode", hdrExt.ChaincodeId.Name + ":" + hdrExt.ChaincodeId.Version,
}

// 检查交易是否已上链
// Here we handle uniqueness check and ACLs for proposals targeting a chain
// Notice that ValidateProposalMessage has already verified that TxID is computed properly
if _, err = e.s.GetTransactionByID(chainID, txid); err == nil {
// increment failure due to duplicate transactions. Useful for catching replay attacks in
// addition to benign retries
e.Metrics.DuplicateTxsFailure.With(meterLabels...).Add(1)
err = errors.Errorf("duplicate transaction found [%s]. Creator [%x]", txid, shdr.Creator)
vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
return vr, err
}

// 用户链码检查ACL
// check ACL only for application chaincodes; ACLs
// for system chaincodes are checked elsewhere
if !e.s.IsSysCC(hdrExt.ChaincodeId.Name) {
// check that the proposal complies with the Channel's writers
if err = e.s.CheckACL(signedProp, chdr, shdr, hdrExt); err != nil {
e.Metrics.ProposalACLCheckFailed.With(meterLabels...).Add(1)
vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
return vr, err
}
}
} else {
// chainless proposals do not/cannot affect ledger and cannot be submitted as transactions
// ignore uniqueness checks; also, chainless proposals are not validated using the policies
// of the chain since by definition there is no chain; they are validated against the local
// MSP of the peer instead by the call to ValidateProposalMessage above
}

// 保存提取的各信息
vr.prop, vr.hdrExt, vr.chainID, vr.txid = prop, hdrExt, chainID, txid
return vr, nil
}

// ValidateProposalMessage checks the validity of a SignedProposal message
// this function returns Header and ChaincodeHeaderExtension messages since they
// have been unmarshalled and validated
func ValidateProposalMessage(signedProp *pb.SignedProposal) (*pb.Proposal, *common.Header, *pb.ChaincodeHeaderExtension, error) {
if signedProp == nil {
return nil, nil, nil, errors.New("nil arguments")
}

putilsLogger.Debugf("ValidateProposalMessage starts for signed proposal %p", signedProp)

// extract the Proposal message from signedProp
prop, err := utils.GetProposal(signedProp.ProposalBytes)
if err != nil {
return nil, nil, nil, err
}

// 1) look at the ProposalHeader
hdr, err := utils.GetHeader(prop.Header)
if err != nil {
return nil, nil, nil, err
}

// validate the header
chdr, shdr, err := validateCommonHeader(hdr)
if err != nil {
return nil, nil, nil, err
}

// 从SignatureHeader交易客户端的签名
// validate the signature
err = checkSignatureFromCreator(shdr.Creator, signedProp.Signature, signedProp.ProposalBytes, chdr.ChannelId)
if err != nil {
// log the exact message on the peer but return a generic error message to
// avoid malicious users scanning for channels
putilsLogger.Warningf("channel [%s]: %s", chdr.ChannelId, err)
sId := &msp.SerializedIdentity{}
err := proto.Unmarshal(shdr.Creator, sId)
if err != nil {
// log the error here as well but still only return the generic error
err = errors.Wrap(err, "could not deserialize a SerializedIdentity")
putilsLogger.Warningf("channel [%s]: %s", chdr.ChannelId, err)
}
return nil, nil, nil, errors.Errorf("access denied: channel [%s] creator org [%s]", chdr.ChannelId, sId.Mspid)
}

// 检查txid的计算是否符合规则
// Verify that the transaction ID has been computed properly.
// This check is needed to ensure that the lookup into the ledger
// for the same TxID catches duplicates.
err = utils.CheckTxID(
chdr.TxId,
shdr.Nonce,
shdr.Creator)
if err != nil {
return nil, nil, nil, err
}

// 依据不同的proposal类型对proposal分别进行检查
// continue the validation in a way that depends on the type specified in the header
switch common.HeaderType(chdr.Type) {
case common.HeaderType_CONFIG:
//which the types are different the validation is the same
//viz, validate a proposal to a chaincode. If we need other
//special validation for confguration, we would have to implement
//special validation
fallthrough
case common.HeaderType_ENDORSER_TRANSACTION:
// 主要是提取ChaincodeHeaderExtension
// validation of the proposal message knowing it's of type CHAINCODE
chaincodeHdrExt, err := validateChaincodeProposalMessage(prop, hdr)
if err != nil {
return nil, nil, nil, err
}

return prop, hdr, chaincodeHdrExt, err
default:
//NOTE : we proably need a case
return nil, nil, nil, errors.Errorf("unsupported proposal type %d", common.HeaderType(chdr.Type))
}
}

背书节点模拟执行交易

获取模拟器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// Support contains functions that the endorser requires to execute its tasks
type Support interface {
crypto.SignerSupport
// IsSysCCAndNotInvokableExternal returns true if the supplied chaincode is
// ia system chaincode and it NOT invokable
IsSysCCAndNotInvokableExternal(name string) bool

// GetTxSimulator returns the transaction simulator for the specified ledger
// a client may obtain more than one such simulator; they are made unique
// by way of the supplied txid
GetTxSimulator(ledgername string, txid string) (ledger.TxSimulator, error)

}

// GetTxSimulator returns the transaction simulator for the specified ledger
// a client may obtain more than one such simulator; they are made unique
// by way of the supplied txid
func (s *SupportImpl) GetTxSimulator(ledgername string, txid string) (ledger.TxSimulator, error) {
// 使用账本和txid创建模拟器,每个交易有单独的模拟器
lgr := s.Peer.GetLedger(ledgername)
if lgr == nil {
return nil, errors.Errorf("Channel does not exist: %s", ledgername)
}
return lgr.NewTxSimulator(txid)
}

// NewTxSimulator returns new `ledger.TxSimulator`
func (l *kvLedger) NewTxSimulator(txid string) (ledger.TxSimulator, error) {
return l.txtmgmt.NewTxSimulator(txid)
}

// NewTxSimulator implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) NewTxSimulator(txid string) (ledger.TxSimulator, error) {
logger.Debugf("constructing new tx simulator")
s, err := newLockBasedTxSimulator(txmgr, txid)
if err != nil {
return nil, err
}
txmgr.commitRWLock.RLock()
return s, nil
}

// 就2项重要的:查询执行器、读写集构建器
// LockBasedTxSimulator is a transaction simulator used in `LockBasedTxMgr`
type lockBasedTxSimulator struct {
lockBasedQueryExecutor
rwsetBuilder *rwsetutil.RWSetBuilder
writePerformed bool
pvtdataQueriesPerformed bool
simulationResultsComputed bool
paginatedQueriesPerformed bool
}

func newLockBasedTxSimulator(txmgr *LockBasedTxMgr, txid string) (*lockBasedTxSimulator, error) {
// 创建读写集构建器,能帮助构建读写集
rwsetBuilder := rwsetutil.NewRWSetBuilder()
helper := newQueryHelper(txmgr, rwsetBuilder)
logger.Debugf("constructing new tx simulator txid = [%s]", txid)
return &lockBasedTxSimulator{lockBasedQueryExecutor{helper, txid}, rwsetBuilder, false, false, false, false}, nil
}

// LockBasedQueryExecutor is a query executor used in `LockBasedTxMgr`
// "只读",不包含写相关的操作
type lockBasedQueryExecutor struct {
helper *queryHelper
txid string
}

模拟执行

endorser部分
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
if acquireTxSimulator(chainID, vr.hdrExt.ChaincodeId) {
if txsim, err = e.s.GetTxSimulator(chainID, txid); err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil
}
defer txsim.Done()

// 历史查询器
if historyQueryExecutor, err = e.s.GetHistoryQueryExecutor(chainID); err != nil {
return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil
}
}

txParams := &ccprovider.TransactionParams{
ChannelID: chainID,
TxID: txid,
SignedProp: signedProp,
Proposal: prop,
TXSimulator: txsim, // 模拟器在此
HistoryQueryExecutor: historyQueryExecutor,
}
// this could be a request to a chainless SysCC

// TODO: if the proposal has an extension, it will be of type ChaincodeAction;
// if it's present it means that no simulation is to be performed because
// we're trying to emulate a submitting peer. On the other hand, we need
// to validate the supplied action before endorsing it

// 模拟执行交易,失败则返回背书失败的响应
// 1 -- simulate
cd, res, simulationResult, ccevent, err := e.SimulateProposal(txParams, hdrExt.ChaincodeId)

调用chaincode模块模拟执行交易,获取交易执行的公开和私密数据读写集,以及交易执行产生的事件,并把结果返回给上层进行背书。

其中还包含了私密数据的处理,会把它取出来,然后通过Gossip传播给在私密数据中的Peer节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// SimulateProposal simulates the proposal by calling the chaincode
func (e *Endorser) SimulateProposal(txParams *ccprovider.TransactionParams, cid *pb.ChaincodeID) (ccprovider.ChaincodeDefinition, *pb.Response, []byte, *pb.ChaincodeEvent, error) {
endorserLogger.Debugf("[%s][%s] Entry chaincode: %s", txParams.ChannelID, shorttxid(txParams.TxID), cid)
defer endorserLogger.Debugf("[%s][%s] Exit", txParams.ChannelID, shorttxid(txParams.TxID))
// we do expect the payload to be a ChaincodeInvocationSpec
// if we are supporting other payloads in future, this be glaringly point
// as something that should change
// 根据Proposal生成Invoke需要的信息
cis, err := putils.GetChaincodeInvocationSpec(txParams.Proposal)
if err != nil {
return nil, nil, nil, nil, err
}

// 链码的元数据
var cdLedger ccprovider.ChaincodeDefinition
var version string

// 设置version
if !e.s.IsSysCC(cid.Name) {
// 根据要调用的链码名称,从lscc获取链码的元数据
cdLedger, err = e.s.GetChaincodeDefinition(cid.Name, txParams.TXSimulator)
if err != nil {
return nil, nil, nil, nil, errors.WithMessage(err, fmt.Sprintf("make sure the chaincode %s has been successfully instantiated and try again", cid.Name))
}
version = cdLedger.CCVersion()

// 实际被打桩了,无实现
err = e.s.CheckInstantiationPolicy(cid.Name, version, cdLedger)
if err != nil {
return nil, nil, nil, nil, err
}
} else {
// scc版本是固定的"latest"
version = util.GetSysCCVersion()
}

// ---3. execute the proposal and get simulation results
var simResult *ledger.TxSimulationResults
var pubSimResBytes []byte
var res *pb.Response
var ccevent *pb.ChaincodeEvent
// 模拟执行,执行结果保存在模拟器
res, ccevent, err = e.callChaincode(txParams, version, cis.ChaincodeSpec.Input, cid)
if err != nil {
endorserLogger.Errorf("[%s][%s] failed to invoke chaincode %s, error: %+v", txParams.ChannelID, shorttxid(txParams.TxID), cid, err)
return nil, nil, nil, nil, err
}

if txParams.TXSimulator != nil {
// 通过模拟器获取模拟执行结果,包含公开和私密数据2份读写集
if simResult, err = txParams.TXSimulator.GetTxSimulationResults(); err != nil {
txParams.TXSimulator.Done()
return nil, nil, nil, nil, err
}

// 存在私密数据
if simResult.PvtSimulationResults != nil {
if cid.Name == "lscc" {
// TODO: remove once we can store collection configuration outside of LSCC
txParams.TXSimulator.Done()
return nil, nil, nil, nil, errors.New("Private data is forbidden to be used in instantiate")
}
// 获取要通过Gossip传播的私密数据
pvtDataWithConfig, err := e.AssemblePvtRWSet(simResult.PvtSimulationResults, txParams.TXSimulator)
// To read collection config need to read collection updates before
// releasing the lock, hence txParams.TXSimulator.Done() moved down here
txParams.TXSimulator.Done()

if err != nil {
return nil, nil, nil, nil, errors.WithMessage(err, "failed to obtain collections config")
}
endorsedAt, err := e.s.GetLedgerHeight(txParams.ChannelID)
if err != nil {
return nil, nil, nil, nil, errors.WithMessage(err, fmt.Sprint("failed to obtain ledger height for channel", txParams.ChannelID))
}
// Add ledger height at which transaction was endorsed,
// `endorsedAt` is obtained from the block storage and at times this could be 'endorsement Height + 1'.
// However, since we use this height only to select the configuration (3rd parameter in distributePrivateData) and
// manage transient store purge for orphaned private writesets (4th parameter in distributePrivateData), this works for now.
// Ideally, ledger should add support in the simulator as a first class function `GetHeight()`.
pvtDataWithConfig.EndorsedAt = endorsedAt
// 把私密数据同通道id、交易id和区块高度发出去,代表私密数据所属的区块和交易
if err := e.distributePrivateData(txParams.ChannelID, txParams.TxID, pvtDataWithConfig, endorsedAt); err != nil {
return nil, nil, nil, nil, err
}
}

// 交易模拟完成,释放模拟器占用的资源
txParams.TXSimulator.Done()
// 获取模拟执行的公开结果
if pubSimResBytes, err = simResult.GetPubSimulationBytes(); err != nil {
return nil, nil, nil, nil, err
}
}

// 返回链码元数据、模拟执行结果、交易执行产生的事件
return cdLedger, res, pubSimResBytes, ccevent, nil
}

callChaincode 调用chaincode模块执行链码。在前面的流程中,还没有区分系统链码SCC和用户链码UCC,SCC和UCC都会通过Execute函数被传递给chaincode模块而执行。

如果是调用lscc部署或升级UCC,会调用ExecuteLegacyInit执行链码容器的初始化。

最后返回链码模拟执行结果和事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// call specified chaincode (system or user)
func (e *Endorser) callChaincode(txParams *ccprovider.TransactionParams, version string, input *pb.ChaincodeInput, cid *pb.ChaincodeID) (*pb.Response, *pb.ChaincodeEvent, error) {
...
// scc也在这执行
// is this a system chaincode
res, ccevent, err = e.s.Execute(txParams, txParams.ChannelID, cid.Name, version, txParams.TxID, txParams.SignedProp, txParams.Proposal, input)
if err != nil {
return nil, nil, err
}
...

// 如果是调用lscc部署或升级链码,会走这段流程
if cid.Name == "lscc" && len(input.Args) >= 3 && (string(input.Args[0]) == "deploy" || string(input.Args[0]) == "upgrade") {
userCDS, err := putils.GetChaincodeDeploymentSpec(input.Args[2], e.PlatformRegistry)
...
// 进行链码容器初始化,最后会调用链码的Init的函数
_, _, err = e.s.ExecuteLegacyInit(txParams, txParams.ChannelID, cds.ChaincodeSpec.ChaincodeId.Name, cds.ChaincodeSpec.ChaincodeId.Version, txParams.TxID, txParams.SignedProp, txParams.Proposal, cds)
...
}
// ----- END -------

return res, ccevent, err
}

Support 接口实际集合了众多背书节点需要的外部模块功能,比如链码、系统链码、ACL等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Support contains functions that the endorser requires to execute its tasks
type Support interface

// SupportImpl provides an implementation of the endorser.Support interface
// issuing calls to various static methods of the peer
type SupportImpl struct {
*PluginEndorser
crypto.SignerSupport
Peer peer.Operations
PeerSupport peer.Support
ChaincodeSupport *chaincode.ChaincodeSupport
SysCCProvider *scc.Provider
ACLProvider aclmgmt.ACLProvider
}

Execute就是调用ChaincodeSupport.Execute实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Execute a proposal and return the chaincode response
func (s *SupportImpl) Execute(txParams *ccprovider.TransactionParams, cid, name, version, txid string, signedProp *pb.SignedProposal, prop *pb.Proposal, input *pb.ChaincodeInput) (*pb.Response, *pb.ChaincodeEvent, error) {
cccid := &ccprovider.CCContext{
Name: name,
Version: version,
}

// decorate the chaincode input
decorators := library.InitRegistry(library.Config{}).Lookup(library.Decoration).([]decoration.Decorator)
input.Decorations = make(map[string][]byte)
input = decoration.Apply(prop, input, decorators...)
txParams.ProposalDecorations = input.Decorations

return s.ChaincodeSupport.Execute(txParams, cccid, input)
}
chaincode部分

通过上面的接口,跨入chaincode模块的大门。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (cs *ChaincodeSupport) Execute(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, input *pb.ChaincodeInput) (*pb.Response, *pb.ChaincodeEvent, error) {
// Invoke得到ChaincodeMessage
resp, err := cs.Invoke(txParams, cccid, input)
// 根据ChaincodeMessage得到Response和事件
return processChaincodeExecutionResult(txParams.TxID, cccid.Name, resp, err)
}

func (cs *ChaincodeSupport) Invoke(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, input *pb.ChaincodeInput) (*pb.ChaincodeMessage, error) {
h, err := cs.Launch(txParams.ChannelID, cccid.Name, cccid.Version, txParams.TXSimulator)
if err != nil {
return nil, err
}

// 执行调用链码的交易(和链码之间的消息为ChaincodeMessage_TRANSACTION)
cctype := pb.ChaincodeMessage_TRANSACTION
return cs.execute(cctype, txParams, cccid, input, h)
}
获取链码执行环境

Launch 可以获取链码执行环境,即用户链码容器,如果已实例化的链码,在当前背书节点上,链码容器未启动,则启动链码容器,Launch会返回一个跟链码容器交互Handler。

某个 Peer 上可以部署多个链码容器,Peer 为了和这些链码容器交互/通信,给每个链码容器都创建了一个 Handler,Handler 携带了 Peer 和链码容器交互的资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Launch starts executing chaincode if it is not already running. This method
// blocks until the peer side handler gets into ready state or encounters a fatal
// error. If the chaincode is already running, it simply returns.
func (cs *ChaincodeSupport) Launch(chainID, chaincodeName, chaincodeVersion string, qe ledger.QueryExecutor) (*Handler, error) {
cname := chaincodeName + ":" + chaincodeVersion
if h := cs.HandlerRegistry.Handler(cname); h != nil {
return h, nil
}

// 启动链码容器 ...

h := cs.HandlerRegistry.Handler(cname)
if h == nil {
return nil, errors.Wrapf(err, "[channel %s] claimed to start chaincode container for %s but could not find handler", chainID, cname)
}

return h, nil
}

链码容器的启动过程,不是本文的重点,所以不继续深入Launch的调用细节。

模拟执行交易

execute封装出执行交易的消息,然后使用 Handler 执行交易。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// execute executes a transaction and waits for it to complete until a timeout value.
func (cs *ChaincodeSupport) execute(cctyp pb.ChaincodeMessage_Type, txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, input *pb.ChaincodeInput, h *Handler) (*pb.ChaincodeMessage, error) {
input.Decorations = txParams.ProposalDecorations
// 创建消息
ccMsg, err := createCCMessage(cctyp, txParams.ChannelID, txParams.TxID, input)
if err != nil {
return nil, errors.WithMessage(err, "failed to create chaincode message")
}

// 执行交易
ccresp, err := h.Execute(txParams, cccid, ccMsg, cs.ExecuteTimeout)
if err != nil {
return nil, errors.WithMessage(err, fmt.Sprintf("error sending"))
}

return ccresp, nil
}

注意以下这个参数 txParams *ccprovider.TransactionParams 其类型定义如下,它包含了一条交易执行过程中的信息和资源,所以交易传递的过程中,一直有这个参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// TransactionParams are parameters which are tied to a particular transaction
// and which are required for invoking chaincode.
type TransactionParams struct {
TxID string
ChannelID string
SignedProp *pb.SignedProposal
Proposal *pb.Proposal
TXSimulator ledger.TxSimulator
HistoryQueryExecutor ledger.HistoryQueryExecutor
CollectionStore privdata.CollectionStore
IsInitTransaction bool

// this is additional data passed to the chaincode
ProposalDecorations map[string][]byte
}

Handler 执行交易的过程如下,创建交易执行的上下文 Context,因为链码容器在执行交易的时候,会和 Peer 之间进行多次通信,进行数据的读写,上下文可以让数据读写获取到正确的信息。

之后 Handler 把消息发送给链码容器,并等待链码容器发来包含执行结果的消息,或者执行超时,默认执行时间是 30s。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (h *Handler) Execute(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, msg *pb.ChaincodeMessage, timeout time.Duration) (*pb.ChaincodeMessage, error) {
chaincodeLogger.Debugf("Entry")
defer chaincodeLogger.Debugf("Exit")

// 私密数据
txParams.CollectionStore = h.getCollectionStore(msg.ChannelId)
// 是否是执行链码初始化
txParams.IsInitTransaction = (msg.Type == pb.ChaincodeMessage_INIT)

// 创建交易context
txctx, err := h.TXContexts.Create(txParams)
if err != nil {
return nil, err
}
// 退出时(执行交易完毕),释放交易上下文资源
defer h.TXContexts.Delete(msg.ChannelId, msg.Txid)

// proposal保存到msg
if err := h.setChaincodeProposal(txParams.SignedProp, txParams.Proposal, msg); err != nil {
return nil, err
}

// 向链码容器发送msg
h.serialSendAsync(msg)

// 等待链码容器响应,或者超时
var ccresp *pb.ChaincodeMessage
select {
case ccresp = <-txctx.ResponseNotifier:
// response is sent to user or calling chaincode. ChaincodeMessage_ERROR
// are typically treated as error
case <-time.After(timeout):
err = errors.New("timeout expired while executing transaction")
ccName := cccid.Name + ":" + cccid.Version
h.Metrics.ExecuteTimeouts.With(
"chaincode", ccName,
).Add(1)
}

return ccresp, err
}
处理链码容器模拟响应

链码容器执行的响应会向上传递,直到 ChaincodeSupport.Execute,它调用 processChaincodeExecutionResult 把链码容器返回的响应,转化为交易模拟执行的 Response,而 Response 最终会返回给Endorser,大家可去调用流程上翻。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func processChaincodeExecutionResult(txid, ccName string, resp *pb.ChaincodeMessage, err error) (*pb.Response, *pb.ChaincodeEvent, error) {
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to execute transaction %s", txid)
}
if resp == nil {
return nil, nil, errors.Errorf("nil response from transaction %s", txid)
}

if resp.ChaincodeEvent != nil {
resp.ChaincodeEvent.ChaincodeId = ccName
resp.ChaincodeEvent.TxId = txid
}

switch resp.Type {
// 交易执行成功则提取Payload中保存的Response
case pb.ChaincodeMessage_COMPLETED:
res := &pb.Response{}
err := proto.Unmarshal(resp.Payload, res)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to unmarshal response for transaction %s", txid)
}
return res, resp.ChaincodeEvent, nil

// 失败,则提取Payload中保存的错误信息
case pb.ChaincodeMessage_ERROR:
return nil, resp.ChaincodeEvent, errors.Errorf("transaction returned with failure: %s", resp.Payload)

default:
return nil, nil, errors.Errorf("unexpected response type %d for transaction %s", resp.Type, txid)
}
}

释放模拟器资源

回想一下,在 Endorser.SimulateProposal 中,它获取了 交易模拟执行器 TXSimulator,这里面可是有很多资源的,如果不及时释放,在高 TPS 下,Peer压力上大,资源泄露,性能低下等问题会爆发出来。

txParams.TXSimulator.Done() 用来释放资源,上文提到 TxSimulator 包含了 QueryExecutor, lockBasedQueryExecutor 实现了 QueryExecutor,也就是说,主要是释放查询操作相关的资源。

从源码可以看到会释放读写锁以及迭代器资源,如果不及时释放,后果果然不堪。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Done implements method in interface `ledger.QueryExecutor`
func (q *lockBasedQueryExecutor) Done() {
logger.Debugf("Done with transaction simulation / query execution [%s]", q.txid)
q.helper.done()
}

func (h *queryHelper) done() {
if h.doneInvoked {
return
}

defer func() {
// 释放锁
h.txmgr.commitRWLock.RUnlock()
h.doneInvoked = true
// 释放迭代器
for _, itr := range h.itrs {
itr.Close()
}
}()
}

ESCC处理模拟执行结果

上文提到,模拟执行的 Response 会最终回到 Endorser,Endorser 会调用 ESCC 对结果进行背书,最终生成 ProposalResponse,我们看一下这个过程。

1
2
3
4
5
6
7
8
9
10
11
12
func (e *Endorser) ProcessProposal(ctx context.Context, signedProp *pb.SignedProposal) (*pb.ProposalResponse, error) {
// Pre-process, simulate

if chainID == "" {
pResp = &pb.ProposalResponse{Response: res}
} else {
// Note: To endorseProposal(), we pass the released txsim. Hence, an error would occur if we try to use this txsim
pResp, err = e.endorseProposal(ctx, chainID, txid, signedProp, prop, res, simulationResult, ccevent, hdrExt.PayloadVisibility, hdrExt.ChaincodeId, txsim, cd)
// ...
}
// ...
}
Endorser.endorseProposal

背书链码实现了可插拔,可以使用不同的ESCC,系统链码和用户链码的背书过程是不同的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// endorse the proposal by calling the ESCC
func (e *Endorser) endorseProposal(_ context.Context, chainID string, txid string, signedProp *pb.SignedProposal, proposal *pb.Proposal, response *pb.Response, simRes []byte, event *pb.ChaincodeEvent, visibility []byte, ccid *pb.ChaincodeID, txsim ledger.TxSimulator, cd ccprovider.ChaincodeDefinition) (*pb.ProposalResponse, error) {
endorserLogger.Debugf("[%s][%s] Entry chaincode: %s", chainID, shorttxid(txid), ccid)
defer endorserLogger.Debugf("[%s][%s] Exit", chainID, shorttxid(txid))

// 系统链码和用户链码使用不同的ESCC
isSysCC := cd == nil
// 1) extract the name of the escc that is requested to endorse this chaincode
var escc string
// ie, "lscc" or system chaincodes
if isSysCC {
escc = "escc"
} else {
escc = cd.Endorsement()
}

endorserLogger.Debugf("[%s][%s] escc for chaincode %s is %s", chainID, shorttxid(txid), ccid, escc)

// marshalling event bytes
var err error
var eventBytes []byte
if event != nil {
eventBytes, err = putils.GetBytesChaincodeEvent(event)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal event bytes")
}
}

// set version of executing chaincode
if isSysCC {
// if we want to allow mixed fabric levels we should
// set syscc version to ""
ccid.Version = util.GetSysCCVersion()
} else {
ccid.Version = cd.CCVersion()
}

// 创建背书上下文信息
ctx := Context{
PluginName: escc, // 插件名称
Channel: chainID,
SignedProposal: signedProp,
ChaincodeID: ccid,
Event: eventBytes,
SimRes: simRes,
Response: response,
Visibility: visibility,
Proposal: proposal,
TxID: txid,
}

// 调用插件背书
return e.s.EndorseWithPlugin(ctx)
}

背书插件实现下面的接口即可。

1
2
3
4
5
6
7
8
9
10
11
12
// Plugin endorses a proposal response
type Plugin interface {
// Endorse signs the given payload(ProposalResponsePayload bytes), and optionally mutates it.
// Returns:
// The Endorsement: A signature over the payload, and an identity that is used to verify the signature
// The payload that was given as input (could be modified within this function)
// Or error on failure
Endorse(payload []byte, sp *peer.SignedProposal) (*peer.Endorsement, []byte, error)

// Init injects dependencies into the instance of the Plugin
Init(dependencies ...Dependency) error
}

使用插件背书,需获取插件实例,然后组装响应Payload,它包含了交易执行的多种结果,然后对Payload以及签名的Proposal背书。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// EndorseWithPlugin endorses the response with a plugin
func (pe *PluginEndorser) EndorseWithPlugin(ctx Context) (*pb.ProposalResponse, error) {
endorserLogger.Debug("Entering endorsement for", ctx)

if ctx.Response == nil {
return nil, errors.New("response is nil")
}

if ctx.Response.Status >= shim.ERRORTHRESHOLD {
return &pb.ProposalResponse{Response: ctx.Response}, nil
}

// 获取插件
plugin, err := pe.getOrCreatePlugin(PluginName(ctx.PluginName), ctx.Channel)
if err != nil {
endorserLogger.Warning("Endorsement with plugin for", ctx, " failed:", err)
return nil, errors.Errorf("plugin with name %s could not be used: %v", ctx.PluginName, err)
}

// 把模拟执行的信息组成生成背书响应Payload
prpBytes, err := proposalResponsePayloadFromContext(ctx)
if err != nil {
endorserLogger.Warning("Endorsement with plugin for", ctx, " failed:", err)
return nil, errors.Wrap(err, "failed assembling proposal response payload")
}

// 对Payload和签名的Proposal进行背书
endorsement, prpBytes, err := plugin.Endorse(prpBytes, ctx.SignedProposal)
if err != nil {
endorserLogger.Warning("Endorsement with plugin for", ctx, " failed:", err)
return nil, errors.WithStack(err)
}

resp := &pb.ProposalResponse{
Version: 1,
Endorsement: endorsement,
Payload: prpBytes,
Response: ctx.Response,
}
endorserLogger.Debug("Exiting", ctx)
return resp, nil
}

系统提供的默认背书插件如下,本质是对交易执行结果和Proposal签名人信息进行签名。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Endorse signs the given payload(ProposalResponsePayload bytes), and optionally mutates it.
// Returns:
// The Endorsement: A signature over the payload, and an identity that is used to verify the signature
// The payload that was given as input (could be modified within this function)
// Or error on failure
func (e *DefaultEndorsement) Endorse(prpBytes []byte, sp *peer.SignedProposal) (*peer.Endorsement, []byte, error) {
// 提取Proposal的签名人
signer, err := e.SigningIdentityForRequest(sp)
if err != nil {
return nil, nil, errors.New(fmt.Sprintf("failed fetching signing identity: %v", err))
}

// 得到签名人身份
// serialize the signing identity
identityBytes, err := signer.Serialize()
if err != nil {
return nil, nil, errors.New(fmt.Sprintf("could not serialize the signing identity: %v", err))
}

// 对Payload和身份进行签名
// sign the concatenation of the proposal response and the serialized endorser identity with this endorser's key
signature, err := signer.Sign(append(prpBytes, identityBytes...))
if err != nil {
return nil, nil, errors.New(fmt.Sprintf("could not sign the proposal response payload: %v", err))
}
endorsement := &peer.Endorsement{Signature: signature, Endorser: identityBytes}
return endorsement, prpBytes, nil
}

发送Response

ProcessProposal 会把 ProposalResponse 作为返回值,剩下的就交给 gRPC,发送给请求方了。

总结

本文从宏观和源码层面,解读了交易提案背书涉及的数据结构,以及其主要背书流程,核心可以主要包含以下几步:

  1. 检查Proposal
  2. 为交易创建模拟器,并调用模拟器模拟执行交易,生成执行结果
  3. 背书模块对执行结果和Proposal身份信息背书(签名),然后生成背书响应发送给客户端

关于背书流程,本文未涉及的环节有:

  1. Proposal中各字段,层层递进的含义
  2. 模拟执行交易,是链码执行函数,并和Peer交互的过程,以及模拟执行的各种资源
  3. 2种插件化ESCC的实现

后面的章节,会对相关源码实现做进一步分析。