0%

昨天睡前了看了一本收藏已久的书,是关于投资的,叫《伟大的时代-深度解读价值投资》,这是一本采访了国内价值投资者的书籍,从这些投资者的话语里,看到了一些共性的东西,寻找垄断企业持续发展的根因,也就获得了投资收益,这个果。

今天起床后,就想到了因果关系、面试、个人能力,在这些角度进行了一些思考,在此记录下思考的成果,这篇文章会介绍:

  • 因果关系应该关注因,还是关注果?
  • 如何从因果关系角度,建设个人能力?
  • 如何从因果关系角度,发现优秀的面试者?

价值投资中的因果关系

这些投资者的共性是,都提到了要寻找垄断,并且能够持续垄断的企业,并投资这些企业。

垄断是“果”,持续垄断也是过,它们需要“因”

怎么才能有垄断,并且持续垄断呢?

需要找到企业的文化、价值观、制度,这些软性的东西、虚的东西,是企业不断发展和进化的根基,这些是企业保持垄断,或成长为垄断的基石,垄断创造收益,收益是实。应了阿里一句话:把虚做实,把实做虚。

所以,企业文化、价值观和制度是“因”,垄断是“果”。

如果垄断是“因”,企业收益就是“果”。

收益的因不只有垄断,但垄断可以带来巨大收益。

关注因,还是关注果?

从企业文化、垄断和收益,这3者看,因果关系可以形成链条,组成一条因果链,一个元素即可以是因,又可以是果。

比如,垄断是企业文化的果,是收益的因。

说关注因是对的,关注果也是对的,关注因果链中,关注最根本的“因”,才是最对的

说一个开发者最容易体会的例子:解决bug,需要定位问题的“根因”,只解决中间原因,并不能真正解决bug。

如何从因果关系角度,建设个人能力?

我把中级技术开发者的能力,分成5个维度:技术深度、技术广度、商业思维能力、管理能力和演讲能力。

不同岗位、层次看到的能力维度是不一样的,比如CTO在找技术总监时的岗位时,需要有体系建设的能力。所以上面强调的是中级开发者。

这5个维度的能力是因,项目、职位、收入这些是果。

果是我们的目标,是我们要达到的地方。而因才是我们要关注的地方。

建设能力,能力就转变成了“果”,那对应的因是什么呢?

建设能力的“因”是持续学习

这几年的付费学习,可把持续学习给玩坏了,总是弥漫着一种贩卖焦虑的气息,但不可否认的是,持续学习的人,总有机会。

持续学习,有很多种通俗的说法:

  1. 活到老,学到老。
  2. 永不止步。
  3. 不给自己设限。
  4. Stay foolish, Stay hungry。——乔布斯

关于技术上的持续学习,曹大(Xargin)最近这篇文章值得一看《工程师应该怎么学习》

如何从因果关系角度,发现优秀的面试者?

我考察候选人的经历不是特别多,1年下来,简历筛选过几百份,候选人也面过几十个了,有一些体会,今天就借着因果关系,浅谈一下。

面试的本质,是挖掘面试者当前的能力和持续学习的能力

上面这句代表2个观点:

  1. 简历是“果”,能力是“因”。
  2. 招进来能持续创建价值是未来的“果”,持续学习是“因”。

阿里有句土话,用来招聘很适合:没有过程的结果是垃圾,没有结果的过程是放屁。

上半句用来筛选简历,如果简历只写自己参与、做过了什么,而没有成果,这份简历就是垃圾,而写不出好简历的面试者,能力大概率也不优秀。

下半句用来面试,面试者是怎么取得这些成果的,TA具有哪些能力才取得了这些成果?

结语

因果关系,还让我深刻的想到一句话:抓住事物的主要矛盾

咱们国家的发展,不一直就是党和政府在抓主要矛盾,解决主要矛盾的过程吗。

远见

在世界读书日那天,美团技术团队推荐了许多书,其中有一本关于职业生涯的书籍:《远见:如何规划职业生涯3大阶段》,书很薄,也很容易理解,读完之后对职业生涯的认知提升了几个Level,值得深度阅读并实践,推荐给不断让自己更优秀的你。

下面是美团点评酒旅事业群前端团队负责人郭凯的推荐语:

我们不仅要找到热爱的工作,而且要建立热爱的生活。职业生涯就像是一场至少长达45年的马拉松,这本书介绍了远见思维和三大职业生涯阶段,并介绍了如何应对职场和生活的冲突。我辈应该多行动、少忧虑,并且提前做好中长期的职业思考和职业规划。如果用“远见”的思维看待眼前的影响和困难,就根本不值得一提。用“远见”的思维,长期有耐心,每天前进30公里。

区块链最核心的是可信数据,所有的功能与设计根源都是数据。本次从数据存储的角度,看一看Peer。

账本

区块链的数据存储在账本中,账本包含:

  • 区块存储
    • 区块文件
    • 区块索引数据库
  • 世界状态数据库
  • 历史数据库
  • 私有数据数据库

关于账本以上各数据库的工具,官方文档中对区块存储和世界状态数据库介绍的比较详细了,但我们介绍下它没有提到的。

区块文件和区块索引数据库

区块是保存在文件中的,为了快速查找区块、交易,Fabric建立了索引,指明某通道某区块高度的第x个交易,是存在哪个文件,偏移量是多少。当然,索引还包含了区块高度、区块hash等,方便根据高度、hash查询区块。

上图展示了一个区块文件存储区块的情况,每个区块包含:

  • 区块长度
  • 区块头
  • 每条交易长度、交易数据

每个区块的开始位置、交易的开始位置,在写区块的时候记录下来,然后写到索引数据库(Index DB)。

整个Fabric网络只有1个区块索引数据库,也就是多通道共用一个

历史数据库

用来记录交易中每个状态数据的历史信息,直白点可以理解为链码中某个key的历史数值。它的key实际是{通道id+链码id, key, 区块高度, 交易在区块中的序号}组成的复合key,值为空,并且只包含有效的交易。

有这样一个问题:值为空,到底怎么查询到历史状态呢?

答:通过历史数据库合成复合key,但复合key中没有交易在区块中的序号,创建一个迭代器,迭代器可以获取包含key的复合key,然后从复合key中提取到交易在区块的序号,然后去区块文件中提取交易,再提取到写集的Value,就可以合成某个key的所有历史值。

因此查询历史状态,需要结合历史数据库和区块文件

各数据库实现

区块文件使用文件直接存储区块,没有使用数据库的原因是:区块是一种自然的追加操作,写入后不再修改,即不会覆盖历史区块,使用文件系统直接存储区块,可以达到区块最快落盘的目的,因为向文件写区块是顺序写,而写数据库是随机写,磁盘(包含HD、SSD)的顺序写性能要高于随机写。

世界状态数据库可以使用leveldb或者CouchDB,CouchDB支持富查询功能,当链码数据按JSON建模时,CouchDB可以提供更好的数据查询,更多CouchDB的信息见文档使用 CouchDB

其他数据库都使用leveldb作为底层存储。

提醒:Fabric支持多通道,逻辑上每个通道拥有一个账本。实现上区块文件是按通道名隔离开了,使用leveldb的各数据库,被各通道共用。

从数据看Peer功能

和账本相关的概念还有区块、交易和状态,从账本的角度看,账本向上支撑了2类功能:

  1. 数据同步:广播与同步区块
  2. 交易背书:模拟执行交易

在下图中,数据同步和交易背书分别使用蓝色和橙色的线圈出,底部剩下的2层为账本。

账本

core/ledger实现了Peer的账本功能,包含了账本中的各项数据库,它依赖common/ledger实现区块文件存储,区块文件存储包含3类:

  • File:把区块保存在文件中,生产环境使用,orderer和peer皆可使用
  • Json:把文件保证JSON格式的文件中,使用在非生产环境,仅供orderer使用
  • Ram :把区块保存在内存中,使用在非生产环境,仅供orderer使用

core/ledger中的:

  • PeerLedger接口,代表Peer账本,主要用来向账本写区块和私有数据,查询区块、交易和私有数据
  • Txsimulator接口,代表交易模拟器,用来模拟执行1条交易
  • QueryExecutor接口用来查询最新的数据
  • HistoryQueryExecutor接口用来查询历史状态

同步数据

同步数据有2种方式:

  • Deliver服务,Peer使用事件从Orderer获取区块
  • Peer向其他节点请求获取某个区间的区块

虽然Peer获取区块的方式有2种,但收到区块,处理区块的方式只有1种,所以下面分3小节介绍。

使用Deliver同步区块

Deliver用来以事件的方式获取区块,场景有2点:

  • Peer从Oderer获取区块
  • 客户端/SDK从Peer获取区块

Fabric 1.4源码解读 8:Orderer和Peer的交互中已经介绍了Peer从Orderer获取区块,这里再做一点补充。

Deliver服务是Orderer和Peer都使用的功能,但Orderer并没有core/ledger,所以从设计和实现上,common/deliver是从common/ledger中直接读区块,而不是core/ledger读区块。

Peer请求区块

每个Peer可以通过Gossip得知同通道的、所连接的Peer信息,其中一项就是对方Peer账本的高度。账本高度低的Peer可以向高度高的Peer发送StateRequest,请求获取某个连续区间的区块。

Peer上负责StateRequest的是gossip/state模块,它负责:

  • 创建StateRequest请求
  • 处理StateRequest请求,生成StateRequest响应
  • 处理StateRequest响应

创建请求:假设Peer1比Peer2少50个区块,并且配置了Peer每次最多取10个区块,Peer1会创建5个StateRequest请求,顺序的向Peer2进行请求,Peer1收到前一个请求的响应后,才发出下一个请求。

处理请求:实际是从账本读取所请求区块的过程,这个过程主要是读取区块文件,如果区块涉及私密数据,也涉及读取私密数据库,这部分功能主要由gossip/privdata完成,gossip/state把读到的区块和私密数据生成请求响应。

Peer处理收到的区块

Peer从Orderer和其他Peer哪获取的区块,最终都会进入到gossip/state,区块会被放入到一个区块缓冲区:PayloadsBuffer,默认大小为存储200个区块。

每个通道账本都有一个goroutine,从各自的PayloadsBuffer拿下一个高度的区块,交给gossip/privdata进行区块的验证和写入。

验证区块

这部分功能由core/handler/validation完成。在Fabric 1.4中,StateImpl会调用QueryExecutor查询状态,但实际StateImpl没有被调用。

验证区块主要是并发验证区块中的交易:

  • 验证交易中的字段
  • 验证是否满足背书策略
  • 验证交易是否调用最新版本的链码
  • 验证交易是否重复

交易验证的结果,即交易是否有效,并不会保存在交易中,这样区块中记录所有交易的DataHash就变化了。区块中所有交易的有效性存储在区块的元数据中,区块元数据中有一个有效性数组,依次存放了每个交易的有效性,使用数组的下标,与交易在数组中的顺序,一一对应。

交易验证后,会修改区块的元数据,把无效的交易设置为响应的无效序号。

如果缺失区块的私有数据,gossip/privdata会创建获取私有数据的请求,并获取私有数据,当区块和私有数据都准备齐全后,开始commit区块和私有数据。

区块写入账本

包含2大块:

  • 交易MVCC验证
    • Fabric要求世界状态数据库支持MVCC,即多版本并发控制,以便交易能够并发执行(背书),在真正修改状态的时候,才判断读写的数据是否冲突,冲突的交易会被标记为无效。关于MVCC我们在下文的背书部分再详细介绍。
  • 把区块写入数据库,以及修改各数据库:
    • 把区块写入到区块文件
    • 把区块、交易的索引写入到索引数据库
    • 有效交易的写集更新到世界状态
    • 提交历史数据库
    • 提交私密数据库
写区块完成后

写区块完成后,还需要做一些修剪操作:私密数据是有有效期的,比如存活100个区块时间,假如在1000高度写入了某私有数据,第1100写入账本后,私密数据就要从私密数据库被抹除。

背书

Peer除了记账的另外一个角色就是背书,背书很重要的一个环节就是模拟执行交易。

MVCC

Fabric为了提供更高的系统性能,支持并发的执行交易,交易在执行过程中会读写世界状态数据库,也就存在并发访问数据库的场景,为了安全的访问数据库数据,就需要对数据库的并发进行限制。

Draveness在浅谈数据库并发控制 - 锁和 MVCC种介绍了并发控制3种手段:悲观锁、乐观锁和MVCC。

Fabric选择了MVCC,它要求世界状态数据库支持MVCC,本质上讲任何支持MVCC的数据库,都可以用来实现状态数据库。

MVCC是多版本并发控制的缩写,它是一种思想,而不是一种具体的算法,所以不同的数据库实现的MVCC不同。

在MVCC的数据存储中,数据有版本的概念,写一个数据的值,实际上是创建了一个新的版本来保存数据。

MVCC可以实现并发读写的能力,当读数据时,先确定待读数据的版本,然后从该版本读取数据,写数据时,创建新的版本保存数据。读数据必然是已经存在的版本,而写数据是新的版本,因此读写可以并行。

Fabric对MVCC的使用

背书节点在模拟执行交易的过程中,会生成读写集,读集和写集分别是所有待写key读出来时的版本和待写入的新值

交易并发执行到写入区块的过程中存在2种读写冲突的情况:

  1. 同一个区块中的前后两笔交易,后面的交易读集包含某个key,但key在前面交易的写集:也就说后面交易读的是老版本的数据,是一种脏读的情况
  2. 区块中交易的读集的某个key,某之前区块的交易写集修改:背书跟写区块是并发执行的,背书时产生的写集,直到写区块才会更新到世界状态数据库,这里存在一段时间,即key已经有了新版本的数据,只是还没有提交到数据库。如果这期间有新的交易模拟执行,就会读到老版本数据,也是一种脏读的情况

有效交易的写集会被应用到世界状态数据库,被修改数据都会有一个新的版本,这个版本是逻辑版本,成为Hight,由{区块高度,交易在区块内的顺序}组成。

注:验证函数为 validateTx,读写集冲突错误为 TxValidationCode_MVCC_READ_CONFLICT ,另一个读写冲突错误为 TxValidationCode_PHANTOM_READ_CONFLICT, 因为执行过程中有RangeQuery,查询某个区间的Key,也需要验证这些Key是否冲突,底层本质还是读写集的验证。

总结

本文从账本的视角,介绍了Peer的账本,以及和账本打交道的功能。

真正企业级的区块链、大用户规模的区块链,必然能够支撑大量的并发交易,这对账本以及底层存储,都会提出更高的性能要求、磁盘利用率要求,所以理解和掌握账本和存储机制是非常有必要的。

参考

引言

Write Ahead Logging,简称WAL,也被翻译成预写式日志,是数据库技术中实现事务日志(Transaction Journal)的一种标准方法,可以实现单机事务的原子性,同时可以提高数据库的写入效率。

思考如下场景,如何确保原子性:写操作修改数据库中a和b的值,二者是一个事务,需要把a和b的最新值持久化到磁盘,假如保存完a的值,系统宕机了,重新启动后,a的值已经写入,但b待写入的值已经丢失,如何发现事务没有完成呢?如何保证事务的原子性呢?

可以为事务加锁,也为事务增加标志位,修改完磁盘数据后,标志位设置事务为完成,事务状态保存在磁盘中,假使保存事务状态的过程中宕机了,就把事务回滚掉。实现REDO和UNDO,就能实现原子性。

数据库中针对CrashRecovery的解决方案是WAL。

原理

WAL的核心思想是先写日志再写数据文件,修改数据文件必须发生在修改操作记录在日志文件之后。

本文的日志指事务的操作日志,本文提到的日志都是指事务日志,不再特殊声明。

WAL

我们看WAL怎么解决宕机和恢复的问题

  • 写WAL前宕机了,重启后,数据处于事务未执行的状态。
  • 写WAL时宕机了,重启后,可以检查到WAL数据不正确,回滚当事务前的状态。
  • 写WAL后宕机了,重启后,把WAL中记录的操作,应用到数据库文件中,得到事务执行后的状态。

如此,保证了数据的恢复和事务的原子性。

上面提到的都是写操作,看一下使用WAL时的读操作。WAL中可能包含了未写入到数据库文件中的最新值,如果读最新值就需要从WAL中读取,如果WAL中未读到,从数据库读到的就是最新的数据。

检查点:写入到WAL文件中的操作记录并不一定会立刻应用到数据库文件上,这个过程是异步的,设计检查点来记录已经被应用到数据库文件上的操作序号,检查点后面的操作记录等待被应用到数据库文件上。

优点

WAL的作用是解决宕机和恢复的问题,同时也有其他优点:

  1. 提高写数据的性能
    1. WAL是顺序写,数据库文件是随机写,顺序写性能高于随机写
    2. 减少写磁盘次数
      1. 不直接修改数据库真实数据
      2. 合并若干小的事务,一次性commit到数据库
  2. 保证事务原子性
  3. 保证事务一致性
  4. 并发读写,比如SQLite中,读写、读读都是可以并行的,比如读时需要找到WAL某个值最后写入的位置,就可以从该位置读数据,而写操作是在WAL文件后Append,二者并行。但写写不能并行,因为2次写操作都要向WAL文件Append数据,无法同时进行。
  5. WAL文件中记录了数据的历史版本,因此可以读取历史版本的值,甚至把状态回滚到某个历史版本。

缺点

SQLite提到了WAL的几项缺点:

  1. WAL需要VFS的支持。
  2. 所有使用数据库的进程必须在同一个机器上,以为WAL是单机的。
  3. 多读少写的场景WAL比rollback-journal类型要慢1%~2%。

使用场景

WAL几乎是数据存储(数据库只是数据存储的一个类别,只不过这个类别很大)的标配:

  • Raft可以使用WAL保存log Entry以及状态
  • 数据库
    • PgSQL使用WAL实现事务日志实现事务原子性、一致性,提升性能
    • SQLite使用WAL实现原子事务和回滚
    • MySQL使用WAL保证数据不丢失的情况下提升性能
    • leveldb也使用WAL提升性能,保证操作原子性

资料

Peer与Orderer的交互主要是组织的Peer主节点从Orderer获取区块,本文就来介绍,Peer是如何从Orderer获取区块的,顺带介绍为何Peer从Orderer获取的区块“好慢”。

网络拓扑

假设存在如下的Fabric网络拓扑情况,本文使用此拓扑进行介绍Orderer到Peer的区块传播情况:

网络中存在两家组织:Org1和Org2,它们分别拥有Peer1作为主节点,连向了排序服务的Orderer1节点。

网络中存在2个应用channel:channel1和channel2,它们的账本分别是channel1 ledger和channel2 ledger,Org1和Org2都加入了这2个channel。

channel间是隔离的,所以Peer和Orderer对不同的channel都会分别处理

宏观视角

下图展示了Orderer向Peer传递区块的宏观视角,能够展示多个通道在Orderer和Peer间传递区块的情况

  1. Orderer上有2个通道的账本,每个Peer分别有2个Deliver Server对应2个通道的账本,从账本读取区块,发送给Peer。
  2. 每个Peer有2个Deliver Client,也对应2个通道,接收Orderer发来的区块,加入到缓冲区Payloads Buffer,然后再从Payloads Buffer中提取区块,验证后写入对应的通道账本。

后面,介绍区块同步某个通道区块的情况。

单通道区块同步

Peer利用Deliver从Orderer获取区块,就像SDK利用Deliver从Peer获取区块一样,Deliver服务端的处理是一样的,Deliver客户端的处理就由SDK、Peer自行处理了。

Deliver本质是一个事件订阅接口,Leading Peer启动后,会为每个通道,分别向Orderer节点注册区块事件,并且指定结束的区块高度为uint类型的最大值,这是为了不停的从orderer获取区块。

通过建立的gRPC连接,Orderer源源不断的向Peer发送区块,具体流程,如下图所示:

  1. Orderer调用deliverBlock函数,该函数是循环函数,获取区块直到指定高度。
  2. 每当有新区块产生,deliverBlock能利用NextBlock从通道账本中读到最新的区块,如果没有最新区块,NextBlock会阻塞。
  3. deliverBlock把获取的区块封装成区块事件,发送给Peer(写入到gRPC缓冲区)。
  4. Peer从gRPC读到区块事件,把区块提取出来后,加入到Payloads Buffer,Payloads Buffer默认大小为200(通过源码和日志发现,Payloads Buffer实际存储202个区块),如果Orderer想向Peer发送更多的区块,必须等Payloads Buffer被消费,有空闲的位置才可以。
  5. deliverPayloads为循环函数,不断消费Payloads Buffer中的区块,执行区块验证,添加区块剩余元数据,最后写入通道账本。
  6. 写通道账本包含区块写入区块账本,修改世界状态数据库,历史索引等。

为何Peer从Orderer获取区块慢?

在性能测试过程中,我们发现Orderer排序完成后,Peer还在不断的从Orderer获取区块,而不是所有排序后的区块都先发送给Peer,Peer缓存起来,慢慢去验证?

上面提到Orderer向Peer发送的区块,Peer收到后先存到Payloads Buffer中,Buffer有空闲位置的时候,Orderer发送的区块才能写入Buffer,deliverBlock 1次循环才能完成,才可以发送下一个区块。

但Payloads Buffer大小是有限的,当Buffer满后,Orderer发送区块的操作也会收到阻塞。

我们可以把Orderer和Peer间发送区块可以抽象一下,它们就是生产者-消费者模型,它们中间是缓冲区,Orderer是生产者,向缓冲区写数据,Peer是消费者,从缓冲区读数据,缓冲区满了会阻塞生产者写数据。

所以Orderer向Peer发送数据的快慢,取决消费者的速度,即取决于deliverPayloads处理一个区块的快慢

deliverPayloads慢在把区块写入区块账本,也就是写账本,成了整个网络的瓶颈。

为何不让Peer缓存所有未处理的区块?

从我们测试的情况看,Orderer排序的速度远快于Peer,Peer和Orderer的高度差可以达到10万+,如果让Peer来缓存这些区块,然后再做处理是需要耗费大量的空间。

在生产者-消费者模型中,只需要要消费者时刻都有数据处理即可。虽然Orderer和Peer之间是网络传输,测试网络比较可靠,传输速度远比Peer处理区块要快。

Payloads Buffer可以让网络传输区块和Peer处理区块并行,这样缩短了一个区块从Orderer中发出,到Peer写入区块到账本的总时间,提升Fabric网络整体性能。

Orderer介绍

排序服务由一组排序节点组成,它接收客户端提交的交易,把交易打包成区块,确保排序节点间达成一致的区块内容和顺序,提供区块链的一致性服务。

图片源自《区块链原理、设计与应用》,当时Fabric还不支持raft

排序服务所提供的一致性,依赖确定性的共识算法,而非比特币、以太坊等公有链,所采用的概率性共识算法。确定性的共识算法是区块上链,即不可修改。Fabric所采用的共识算法有Solo、Kafka、EtcdRaft。

客户端通过Broadcast接口向Orderer提交背书过的交易,客户端(此处广义指用户客户端和Peer节点通过Deliver接口订阅区块事件,从Orderer获取区块

更多的排序服务介绍请参考这篇官方文档排序服务

架构

Architecture of Orderer

本图依赖 Fabric 1.4 源码分析而得

Orderer由:多通道、共识插件、消息处理器、本地配置、区块元数据、gRPC服务端、账本等组成,其中gRPC中的Deliver、Ledger是通用的(Peer也有),其余都是Orderer独有的。

多通道

Fabric 支持多通道特性,而Orderer是多通道的核心组成部分。多通道由Registrar、ChainSupport、BlockWriter等一些重要部件组成。

Registrar是所有通道资源的汇总,访问每一条通道,都要经由Registrar,更多信息请看Registrar

ChainSupport代表了每一条通道,它融合了一条通道所有的资源,更多信息请看ChainSupport

BlockWriter 是区块达成共识后,Orderer写入区块到账本需要使用的接口。

共识插件

Fabric的共识是插件化的,抽象出了Orderer所使用的共识接口,任何一种共识插件,只要满足给定的接口,就可以配合Fabric Orderer使用。

当前共识有3种插件:Solo、Kafka、EtcdRaft。Solo用于实验环境,Kafka和EtcdRaft用于生产环境,Kafka和EtcdRaft都是CFT算法,但EtcdRaft比Kafka更易配置。

EtcdRaft实在Fabric 1.4开始引入的,如果之前的生产环境使用Kafka作为共识,可以遵循Fabric给的指导,把Kafka共识,迁移到Raft共识。

gRPC通信

Orderer只有2个gRPC接口:

  • Broadcast:用来接收客户端提交的待排序交易
  • Deliver:客户端(包括Peer节点)用来从Orderer节点获取已经达成一致的区块

其中,Broadcast是Orderer独有的,而Devliver是通用的,因为客户端也可以利用Deliver接口从Peer节点获取区块、交易等。

关于Broadcast和Orderer更多介绍可以参考杨保华的2篇笔记:

用来解析orderer节点的配置文件: orderer.yaml,并保存入内存。

该配置文件中的配置,是节点本地的配置,不需要Orderer节点间统一的配置,因此不需要上链,相关配置有:

  • 网络相关配置
  • 账本类型、位置
  • raft文件位置

而上链的配置,被称为通道配置,需要使用配置交易进行更新,这部分配置,写在configtx.yaml中,和Orderer相关的有:

  • 共识类型
  • 区块大小
  • 切区块的时间
  • 区块内交易数
  • 各种共识的相关配置

Metadata

区块中有4个元数据:

  • 区块签名,存放orderer对区块的SignatureHeader
  • 最新配置区块的高度,方便获取当前通道最新配置
  • 交易过滤,为数组,存放区块内所有交易的有效性,使用数字代表无效的原因,由验证交易的记账节点填写
  • orderer相关元数据,不同的共识类型,该元数据不同

区块Header中记录了Data.Hash(),Data是所有交易后序列化的结果,但不包含区块元数据,所以区块元数据是可以在产生区块后修改的。即,即使元数据上链了,但这数据是可以修改的,只不过修改也没有什么意义。

MsgProcessor

orderer收到交易后需要对交易进行多项检查,不同的通道可以设置不同的MsgProcessor,也就可以进行不同的检查。

当前Processor分2个:

  • 应用通道的叫StandardChannel
  • 系统通道的叫SystemChannel

StandardChannel会对交易进行以下检查:

  • 交易内容不能为空
  • 交易大小不能超过区块大小最大值(默认10MB)
  • 交易交易签名不符合签名策略
  • 签名者证书是否过期

SystemChannel只比StandardChannel多一项:系统配置检查,用来检查以下交易中包含的配置,配置项是否有缺失,或者此项配置是否允许更新等。

BlockCutter

BlockCutter用来把收到的交易分成多个组,每组交易会打包到一个区块中。而分组的过程,就是切块,每组交易被称为一个Batch,它有一个缓冲区用来存放待切块交易。

切块有3个可配置条件:

  • 缓冲区内交易数,达到区块包含的交易上限(默认500)
  • 缓冲区内交易总大小,达到区块大小上限(默认10MB)
  • 缓冲区存在交易,并且未出块的时间,达到切块超时时间(默认2s)

切块有1个不可配置条件:

  • 缓冲区收到配置交易,配置交易要放到单独区块,如果缓冲区有交易,缓冲区已有交易会切到1个区块

超多刚接触Fabric的人有这样一个疑问:排序节点是按什么规则对交易排序的?

按什么顺序对交易排序并不重要,只要交易在区块内的顺序是一致的,然后所有记账节点,按交易在区块内的顺序,处理交易,最后得到的状态必然是一致的,这也是区块链保持一致性的原理。

再回过头来说一下实现是什么顺序:哪个交易先写入BlockCutter的缓冲区,哪个交易就在前面,仅此而已。

BlockWriter

Orderer的BlockWriter是基于common/ledger实现的,它用来保存区块文件,不包含状态数据库等其他数据库,其中有3类区块文件:ram,json和file,file是Orderer和Peer都可使用的,另外2个只能Orderer使用。

BlockWriter用来向Peer的账本追加区块,但追加区块之前,还需要做另外1件事情,设置区块的元数据。

区块元数据包含:

  • 区块签名,存放orderer对区块的SignatureHeader
  • 最新配置区块的高度,方便获取当前通道最新配置
  • 交易过滤,为数组,存放区块内所有交易的有效性,使用数字代表无效的原因,由验证交易的记账节点填写
  • orderer相关元数据,不同的共识类型,该元数据不同

但此时只设置其中的3个:区块签名、配置区块高度、orderer相关的元数据。因为交易的有效性在记账节点检查后才能设置。

为何不在创建区块的时候就设置这些元数据信息,而是在区块经过共识之后?

共识的过程会传播区块,只让区块包含必要的信息,可以减少区块大小,降低通信量。但元数据占用大小非常小,所以这未必是真实原因。

BlockWriter还有另外一个功能:根据一个Batch创建下一个高度的区块。一个区块包含了:

  • Header:区块高度、前一个区块Hash、Data的哈希值
  • Data:被序列化的交易列表
  • Metadata:区块元数据

Header只记录Data的哈希值,不包含Metadata哈希值,这样的目的是,在区块创建之后,仍能修改区块。

Orderer节点启动

根据Fabric 1.4源码梳理Orderer启动步骤:

  • 加载配置文件
  • 设置Logger
  • 设置本地MSP
  • 核心启动部分:
    • 加载创世块
    • 创建账本工厂
    • 创建本机gRPCServer
    • 如果共识需要集群(raft),创建集群gRPCServer
    • 创建Registrar:设置好共识插件,启动各通道,如果共识是raft,还会设置集群的gRPC接口处理函数Step
    • 创建本机server:它是原子广播的处理服务,融合了Broadcast处理函数、deliver处理函数和registrar
    • 开启profile
    • 启动集群gRPC服务
    • 启动本机gRPC服务

启动流程图可请参考杨宝华的笔记Orderer 节点启动过程,笔记可能是老版本的Fabric,但依然有参考价值。

Orderer处理交易的流程

普通交易在Orderer中的流程

交易是区块链的核心,交易在Orderer中的流程分3阶段:

  1. Orderer 的 Broadcast 接口收到来自客户端提交的交易,会获取交易所在的链的资源,并进行首次检查,然后提交给该链的共识,对交易进行排序,最后向客户端发送响应,为下图蓝色部分。
  2. 共识实例是单独运行的,也就是说Orderer把交易交给共识后,共识可能还在处理交易,然而Orderer已经开始向客户端发送提交交易的响应。共识如果发现排序服务的配置如果进行了更新,会再次检查交易,然后利用把Pending的交易分割成一组,然后打包成区块,然后共识机制确保各Orderer节点对区块达成一致,最后将区块写入账本。为下图绿色部分。
  3. Peer会向Orderer订阅区块事件,每当新区块被Orderer写入账本时,Orderer会把新区块以区块事件的方式,发送给Peer。为下图换色部分。

上面提到Orderer和共识实例分别会对交易进行2次检查,这些检查是相同的,为何要进行两次检查呢?

代码如下:ProcessMessage 会调用ProcessNormalMsg,对交易进行第一次检查,如果有错误,会向客户端返回错误响应。 SomeConsensurFunc 是一个假的函数名称,但3种共识插件实现,都包含相同的代码片,当消息中 configSeq < seq 时,再次对交易进行检查,如果错误,则丢次此条交易。configSeq是Order函数传入的,即第一次检查交易时的配置号,seq为共识当前运行时的配置号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (bh *Handler) ProcessMessage(msg *cb.Envelope, addr string) (resp *ab.BroadcastResponse) {
// ...
configSeq, err := processor.ProcessNormalMsg(msg)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of normal message from %s because of error: %s", chdr.ChannelId, addr, err)
return &ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()}
}
// ...
err = processor.Order(msg, configSeq)
// ...
}

func SomeConsensurFunc() {
// ...
if msg.configSeq < seq {
_, err = ch.support.ProcessNormalMsg(msg.normalMsg)
if err != nil {
logger.Warningf("Discarding bad normal message: %s", err)
continue
}
}
// ...
}

我认为如此设计的原因,考量如下:
共识插件应当尽量高效,orderer尽量把能做的做掉,把不能做的交给共识插件,而交易检查就是orderer能做的。共识插件只有在排序服务配置更新后,才需要重新检查交易,以判断是否依然满足规则。排序服务的配置通常是比较稳定的,更新频率很低,所以进行2次校验的频率也是非常低。这种方式,比只在共识插件校验,会拥有更高的整体性能。

配置交易在Orderer中的流程

配置交易可以用来创建通道、更新通道配置,与普通交易的处理流程总体是相似的,只不过多了一些地方或者使用不同的函数,比如:

  • 交易检查函数不是ProcessNormalMsg,而是ProcessConfigMsg
  • 配置交易单独打包在1个区块
  • 配置交易写入账本后,要让配置生效,即Orderer应用最新的配置

使用Raft共识,交易在Orderer中的流程

上面2中流程都是与具体共识算法无关的,这里补充一个Raft共识的。

使用Raft共识的链处理交易包含了上图中的4步:

  • 交易:处理交易
  • 区块:创建区块
  • Raft:使用Raft对区块达成共识
  • 账本:写区块元数据,把区块写入到账本

如果把图中提到的:转发和Raft去掉,就是以Solo为共识的链的过程。

下图是更加细化一层的,如果看不懂,建议先读下Etcd Raft架构设计和源码剖析2:数据流这篇文章。

红色圈出来的是etcd/raft的实现,蓝色圈出来的是Fabric使用raft为共识的部分,外面的Broadcast、Deliver是属于Orderer但不属于某条链。

这张图和etcd与raft交互没有太多不同,只有2个地方:

  1. chains要把交易转化为区块,再交给raft去共识
  2. chains的Apply并不是去修改状态机,而是把取消写到账本

源码简介

Orderer的代码位于fabric/orderer,其目录结构如下,标注了每个目录结构的功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
➜  fabric git:(readCode) ✗ tree -L 2 orderer
orderer
├── README.md
├── common
│   ├── blockcutter 缓存待打包的交易,切块
│   ├── bootstrap 启动时替换通道创世块
│   ├── broadcast orderer的Broadcast接口
│   ├── cluster (Raft)集群服务
│   ├── localconfig 解析orderer配置文件orderer.yaml
│   ├── metadata 区块元数据填写
│   ├── msgprocessor 交易检查
│   ├── multichannel 多通道支持:Registrar、chainSupport、写区块
│   └── server Orderer节点的服务端程序
├── consensus 共识插件
│   ├── consensus.go 共识插件需要实现的接口等定义
│   ├── etcdraft raft共识插件
│   ├── inactive 未激活时的raft
│   ├── kafka kafka共识插件
│   ├── mocks 测试用的共识插件
│   └── solo solo共识插件
├── main.go orderer程序入口
├── mocks
│   ├── common
│   └── util
└── sample_clients orderer的客户端程序样例
├── broadcast_config
├── broadcast_msg
└── deliver_stdout

23 directories, 3 files

阅读Orderer源码,深入学习Orderer的时候,建议以下顺序:

  • 核心的数据结构,主要在multichannel、consensus.go:Fabric 1.4源码解读 6:Orderer核心数据结构
  • Orderer的启动
  • Broadcast接口
  • msgprocessor
  • 通过Solo掌握共识插件需要做哪些工作
  • 切块:blockcutter
  • 写区块:BlockWriter、metadata

总结

本文从宏观的角度介绍了Orderer的功能、核心组成,以及交易在Orderer中的流程,Peer如何从Orderer获取区块。

前言

许多Orderer的文章,都是从Orderer的启动过程讲起,今天换一种“乐高”角度,先看看有哪些“零件”,再看这些零件怎么配合。

Orderer负责接收交易,把交易打包成区块,然后区块在所有Orderer节点之间达成一致,再分发给Peer的功能,这涉及了:

  • 网络:gRPC接收交易,向Peer发送区块
  • 切块:把交易打包成区块
  • 共识:所有Orderer节点达成一致

这些功能是由Orderer核心数据结构组织起来。

在Fabric中,通道和链在概念上都是一条区块链,所以本文中也会可能会混用链和通道。

核心数据结构

Registrar

Registrar

代码中,这样描述Registrar:

Registrar serves as a point of access and control for the individual channel resources.

可见它负责了每个channel资源的访问和控制点,也就说,要对某个通道怎么样,得从这入手。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type Registrar struct {
lock sync.RWMutex
// 保存了多条链
chains map[string]*ChainSupport

// 共识插件
consenters map[string]consensus.Consenter
ledgerFactory blockledger.Factory
signer crypto.LocalSigner

systemChannelID string
systemChannel *ChainSupport
...
}
  • chains保存了每一条链,每一条链在Orderer中都以ChainSupport代表。
  • consenters保存了所有的共识插件,每个共识插件都是一个Consenter,Fabric 1.4中共识插件有Solo、Kafka、EtcdRaft。
  • ledgerFactory用来读取和创建链的账本。
  • signer用来对Orderer中的数据进行签名,以及创建SignatureHeader
  • systemChannelIDsystemChannel分别是系统链ID、系统链实例。

ChainSupport

chainsupport

ChainSupport汇集了一条通道所需要的所有资源,所以说一个ChainSupport代表了一条链。

1
2
3
4
5
6
7
8
type ChainSupport struct {
*ledgerResources
msgprocessor.Processor
*BlockWriter
consensus.Chain
cutter blockcutter.Receiver
crypto.LocalSigner
}

ChainSupport 是一堆接口的集合,这些接口构成一条链所有的操作,接口可以分为4类:

  • 账本:ledgerResourcesBlockWriter分别是账本读写和把区块写入到账本。
  • 消息:msgprocessor.Processorcutter分别是处理交易和把交易切块。
  • 共识:consensus.Chain是Orderer的共识实例,比如每条链都有自己的Raft共识实例,它们互不干扰。
  • 签名:crypto.LocalSigner,同Registrar中的介绍。

Chain

Chain

Chain是接口,它的实现并不一条链,而是一条链的共识实例,可以是Solo、Kafka和EtcdRaft,它运行在单独的协程,使用Channel和ChainSupport通信,它调用其它接口完成切块,以及让所有的Orderer节点对交易达成一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// Chain defines a way to inject messages for ordering.
// Note, that in order to allow flexibility in the implementation, it is the responsibility of the implementer
// to take the ordered messages, send them through the blockcutter.Receiver supplied via HandleChain to cut blocks,
// and ultimately write the ledger also supplied via HandleChain. This design allows for two primary flows
// 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka)
// 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft)
type Chain interface {
// 普通消息/交易排序
// Order accepts a message which has been processed at a given configSeq.
// If the configSeq advances, it is the responsibility of the consenter
// to revalidate and potentially discard the message
// The consenter may return an error, indicating the message was not accepted
Order(env *cb.Envelope, configSeq uint64) error

// 配置消息/交易排序
// Configure accepts a message which reconfigures the channel and will
// trigger an update to the configSeq if committed. The configuration must have
// been triggered by a ConfigUpdate message. If the config sequence advances,
// it is the responsibility of the consenter to recompute the resulting config,
// discarding the message if the reconfiguration is no longer valid.
// The consenter may return an error, indicating the message was not accepted
Configure(config *cb.Envelope, configSeq uint64) error

// 等待排序集群可用
// WaitReady blocks waiting for consenter to be ready for accepting new messages.
// This is useful when consenter needs to temporarily block ingress messages so
// that in-flight messages can be consumed. It could return error if consenter is
// in erroneous states. If this blocking behavior is not desired, consenter could
// simply return nil.
WaitReady() error

// 当排序集群发送错误时,会关闭返回的通道
// Errored returns a channel which will close when an error has occurred.
// This is especially useful for the Deliver client, who must terminate waiting
// clients when the consenter is not up to date.
Errored() <-chan struct{}

// 启动当前链
// Start should allocate whatever resources are needed for staying up to date with the chain.
// Typically, this involves creating a thread which reads from the ordering source, passes those
// messages to a block cutter, and writes the resulting blocks to the ledger.
Start()

// 停止当前链,并释放资源
// Halt frees the resources which were allocated for this Chain.
Halt()
}

Consenter

Consenter

1
2
3
type Consenter interface {
HandleChain(support ConsenterSupport, metadata *cb.Metadata) (Chain, error)
}

Consenter也是接口,它只有1个功能用来创建Chain。每种共识插件,都有自己单独的consenter实现,分别用来创建solo实例、kafka实例或etcdraft实例。

ConsenterSupport

ConsenterSupport

ConsenterSupport为consenter实现提供所需的资源,其实就是共识用来访问外部数据的接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// ConsenterSupport provides the resources available to a Consenter implementation.
type ConsenterSupport interface {
crypto.LocalSigner
msgprocessor.Processor

// VerifyBlockSignature verifies a signature of a block with a given optional
// configuration (can be nil).
VerifyBlockSignature([]*cb.SignedData, *cb.ConfigEnvelope) error

// 提供把消息切成块的接口
// BlockCutter returns the block cutting helper for this channel.
BlockCutter() blockcutter.Receiver

// 当前链的orderer配置
// SharedConfig provides the shared config from the channel's current config block.
SharedConfig() channelconfig.Orderer

// 当前链的通道配置
// ChannelConfig provides the channel config from the channel's current config block.
ChannelConfig() channelconfig.Channel

// 生成区块
// CreateNextBlock takes a list of messages and creates the next block based on the block with highest block number committed to the ledger
// Note that either WriteBlock or WriteConfigBlock must be called before invoking this method a second time.
CreateNextBlock(messages []*cb.Envelope) *cb.Block

// 读区块
// Block returns a block with the given number,
// or nil if such a block doesn't exist.
Block(number uint64) *cb.Block

// 写区块
// WriteBlock commits a block to the ledger.
WriteBlock(block *cb.Block, encodedMetadataValue []byte)

// 写配置区块并更新配置
// WriteConfigBlock commits a block to the ledger, and applies the config update inside.
WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte)

// Sequence returns the current config squence.
Sequence() uint64

// ChainID returns the channel ID this support is associated with.
ChainID() string

// Height returns the number of blocks in the chain this channel is associated with.
Height() uint64

// 以原始数据的格式追加区块,不像WriteBlock那样会修改元数据
// Append appends a new block to the ledger in its raw form,
// unlike WriteBlock that also mutates its metadata.
Append(block *cb.Block) error
}

宏观视角

把上面介绍的各项,融合在一幅图中:

  • Registrar 包容万象,主要是ChainSupport和Consenter,Consenter是可插拔的
  • ChainSupport 代表了一条链,能够指向属于本条链的共识实例,该共识实例由对应共识类型的Consenter创建
  • 共识实例使用ConsenterSupport访问共识外部资源

使用fabric-sdk-go操作链码,介绍了使用官方Go SDK,安装、实例化和升级链码,调用和查询链码,本文介绍使用fabric-sdk-go订阅事件。

事件介绍

本质上就3种事件:

  • BlockEvent:获取区块信息
  • TransactionEvent:获取交易信息
  • ChainCodeEvnet:链码中自定义的链码事件

但每种事件都有2 种类型:

  • Filtered:事件订阅时默认的类型,获取的信息“不全”,不同的事件缺失的数据不同,比如链码事件,如果是Filtered的,其响应字段中的Payload是空的,也就是不知道链码事件携带的数据。这种方式能够降低fabric网络和SDK之间的流量,当Filtered后的字段信息就足够时,这种方式非常适合。关于Filtered的更多信息,这篇文章 Fabric 1.4源码解读 3:Event原理解读 非常有帮助。
  • 非Filtered :可以获取完整的区块、交易、链码事件信息,这种方式在SDK想获取更多信息时,是非常必要的。

4 个注册事件的接口1个取消注册的接口如下:

接口名称 描述 参数值 返回值
RegisterBlockEvent 注册块事件 filter …fab.BlockFilter fab.Registration, <-chan *fab.BlockEvent, error
RegisterFilteredBlockEvent 注册过滤块事件 fab.Registration, <-chan *fab.FilteredBlockEvent, error
RegisterTxStatusEvent 注册交易状态事件 txID string fab.Registration, <-chan *fab.TxStatusEvent, error
RegisterChaincodeEvent 注册链码事件 ccID, eventFilter string fab.Registration, <-chan *fab.CCEvent, error
Unregister 取消事件订阅 fab.Registration

注册会得到管理可以管理订阅的Registration、接收事件的通道,以及可能注册时发生的错误,关于每个接口的具体介绍、使用,可以参考官方的Event文档,其中包含了样例代码,如果想看真实的样例代码,可以参考示例项目

Option介绍

注册事件需要使用EventClient,创建EventClient时可以指定一些选项,这些选项其实就是事件订阅的选项。

有3个Option:

  • func WithBlockEvents() ClientOption

    指定了此选项,事件就是非“filtered”,fabric会向调用SDK客户端发送完整的区块,可以获得订阅事件完整的信息。

  • func WithSeekType(seek seek.Type) ClientOption

    使用此选项可以指定从哪个区块高度获取事件seek.TypeOldestNewestFromBlock 3种取值,分别代表从第1个区块、最后一个区块和指定区块号开始获取事件,FromBlock需要结合WithBlockNum使用。So,可以通过这个选项获取历史事件

  • func WithBlockNum(from uint64) ClientOption

    指定区块高度,只有WithSeekType(FromBlock)时才生效。

链码事件多说几句

链码如何发链码事件

ChaincodeStubInterfaceSetEvent的方法,入参分别为事件名称和事件锁携带的信息payload。

1
2
3
4
5
6
7
8
9
// ChaincodeStubInterface is used by deployable chaincode apps to access and
// modify their ledgers
type ChaincodeStubInterface interface {
// SetEvent allows the chaincode to set an event on the response to the
// proposal to be included as part of a transaction. The event will be
// available within the transaction in the committed block regardless of the
// validity of the transaction.
SetEvent(name string, payload []byte) error
}

通过ChannelClient订阅链码事件介绍

SDK的channel client也有订阅链码事件的接口:channel.Client.RegisterChaincodeEvent(),它的定义和event client提供的接口完全一样,但功能上有所差别。

channel client没有指定 WithBlockEvents,所以这是Filtered的事件链码,获取的事件链码中,其Payload为空。

示例项目

示例项目fabric-sdk-go-sample是结合Fabric的BYFN展示如何使用fabric-sdk-go的项目,它的Event样例部分,介绍了如何使用以上接口订阅Fabric事件,具体请参加该部分README

理论知识

如果不清楚数字证书、公私钥与签名的关系,建议阅读阮一峰的数字签名是什么?

Fabric证书和密钥文件

使用Fabric CA或者 cryptogen 工具可以生成证书和私钥文件,这里取 BYFN 例子的文件做介绍,Org1 Admin 账户的文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
➜  first-network git:(release-1.4) ✗ tree crypto-config/peerOrganizations/org1.example.com/users/Admin@org1.example.com
crypto-config/peerOrganizations/org1.example.com/users/Admin@org1.example.com
├── msp
│   ├── admincerts
│   │   └── Admin@org1.example.com-cert.pem
│   ├── cacerts
│   │   └── ca.org1.example.com-cert.pem
│   ├── keystore
│   │   └── f9f3dddb7fcc40086de6d5ae77f1481abbb99bff7a74839b950720d3dca0d8ee_sk
│   ├── signcerts
│   │   └── Admin@org1.example.com-cert.pem
│   └── tlscacerts
│   └── tlsca.org1.example.com-cert.pem
└── tls
├── ca.crt
├── client.crt
└── client.key

msp目录,为Admin的身份信息:

  • admincerts:组织管理员的身份验证证书。
  • cacerts:组织的根证书。
  • keystore:该用户的私钥,用来对消息签名。
  • signcerts:该用户的身份验证证书,被组织根证书签名。
  • tlscacerts:TLS通信用的身份证书,为组织的TLS证书。

tls目录,为TLS通信相关的证书:

  • ca.crt:组织根证书
  • client.crt:验证当前用户身份的证书,当前为验证管理员的证书
  • client.key:当前用户的身份私钥,用来签名

整体逻辑

交易是区块链的核心,一切状态的转移都是一条交易,交易的真伪需要使用数字签名进行保证。

在Fabric中,交易涉及两个概念:

  • Proposal:提案
  • Transaction:交易

所以 Proposal 和 Transaction 都需要使用数字签名进行保护,它们相关的消息中,都包含了发送方的身份信息:mspid、证书(证书中实际包含了公钥)。

提案的实际消息是 SignedProposal,其中包含了:

  • 数字签名:Signature
  • 证书、公钥等签名者身份信息:ProposalBytes.Proposal.Header.SignatureHeader.Creator

signed_proposal

图来自杨保华的hyperledger_code_fabric

交易中最重要的是Envelope结构体,SDK向Orderer提交交易时,会发送Envelope消息,它包含了:

  • 数字签名:Signature
  • 交易发送方的身份信息:Payload.Header.SignatureHeader.Creator
  • 可选背书节点的身份信息,不同的交易类型,Data包含了不同的信息,如果是需要背书的,可以包含背书的信息、签名和身份信息:Payload.Data.SignedChainccodeDeploymentSpec.OwnerEndorsements.signingidentity

Signed transaction

图来自《区块链原理、设计与应用》,为升级链码的交易Envelope结构。

在验证消息的签名时,会从中提取出数字签名Signature,身份信息(证书、公钥)和被签名消息体,完成以下验证:

  • 使用证书验证发送方的身份,发送方是否属于它所在的组织,以及发送方的公钥没有修改和替换
  • 使用公钥验证消息是否为发送方签名,并且消息没有被修改

验证的整体流程如下:

Verify signature

验证签名的函数

core/common/validation/msgvalidation.go 提供了2验证消息签名的函数,用来验证Proposal和Transaction,它们会调用相同的函数checkSignatureFromCreator进行数字签名的验证。

验证Porposal签名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func ValidateProposalMessage(signedProp *pb.SignedProposal) (*pb.Proposal, *common.Header, *pb.ChaincodeHeaderExtension, error) {
...

// 从SignatureHeader交易客户端的签名
// validate the signature
err = checkSignatureFromCreator(shdr.Creator, signedProp.Signature, signedProp.ProposalBytes, chdr.ChannelId)
if err != nil {
// log the exact message on the peer but return a generic error message to
// avoid malicious users scanning for channels
putilsLogger.Warningf("channel [%s]: %s", chdr.ChannelId, err)
sId := &msp.SerializedIdentity{}
err := proto.Unmarshal(shdr.Creator, sId)
if err != nil {
// log the error here as well but still only return the generic error
err = errors.Wrap(err, "could not deserialize a SerializedIdentity")
putilsLogger.Warningf("channel [%s]: %s", chdr.ChannelId, err)
}
return nil, nil, nil, errors.Errorf("access denied: channel [%s] creator org [%s]", chdr.ChannelId, sId.Mspid)
}
}

验证Transaction签名

Commit阶段会对交易进行验证,会调用此函数,该函数完成了对Transaction的验证,包含发送方数字签名的验证。

交易是包含背书结果和背书签名的,背书相关的验证并不包含在此,而是专门的背书验证,具体请看Fabric 1.4源码解读 1:背书策略是怎么使用的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// ValidateTransaction checks that the transaction envelope is properly formed
func ValidateTransaction(e *common.Envelope, c channelconfig.ApplicationCapabilities) (*common.Payload, pb.TxValidationCode) {
putilsLogger.Debugf("ValidateTransactionEnvelope starts for envelope %p", e)

...

// validate the header
chdr, shdr, err := validateCommonHeader(payload.Header)
if err != nil {
putilsLogger.Errorf("validateCommonHeader returns err %s", err)
return nil, pb.TxValidationCode_BAD_COMMON_HEADER
}

// validate the signature in the envelope
err = checkSignatureFromCreator(shdr.Creator, e.Signature, e.Payload, chdr.ChannelId)
if err != nil {
putilsLogger.Errorf("checkSignatureFromCreator returns err %s", err)
return nil, pb.TxValidationCode_BAD_CREATOR_SIGNATURE
}

// continue the validation in a way that depends on the type specified in the header
switch common.HeaderType(chdr.Type) {
case common.HeaderType_ENDORSER_TRANSACTION:
// Verify that the transaction ID has been computed properly.
// This check is needed to ensure that the lookup into the ledger
// for the same TxID catches duplicates.
err = utils.CheckTxID(
chdr.TxId,
shdr.Nonce,
shdr.Creator)

if err != nil {
putilsLogger.Errorf("CheckTxID returns err %s", err)
return nil, pb.TxValidationCode_BAD_PROPOSAL_TXID
}

// 如果是背书交易,背书的签名不在此验证,由背书策略模块进行验证
err = validateEndorserTransaction(payload.Data, payload.Header)
putilsLogger.Debugf("ValidateTransactionEnvelope returns err %s", err)

验证签名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// given a creator, a message and a signature,
// this function returns nil if the creator
// is a valid cert and the signature is valid
func checkSignatureFromCreator(creatorBytes []byte, sig []byte, msg []byte, ChainID string) error {
putilsLogger.Debugf("begin")

// check for nil argument
if creatorBytes == nil || sig == nil || msg == nil {
return errors.New("nil arguments")
}

// 每个链有各自的msp
mspObj := mspmgmt.GetIdentityDeserializer(ChainID)
if mspObj == nil {
return errors.Errorf("could not get msp for channel [%s]", ChainID)
}

// 获取proposal创建者/发送方的Identity
// creatorBytes 中是序列化后的mspid、证书、公钥等信息
creator, err := mspObj.DeserializeIdentity(creatorBytes)
if err != nil {
return errors.WithMessage(err, "MSP error")
}

putilsLogger.Debugf("creator is %s", creator.GetIdentifier())

// 验证证书是否有效
// ensure that creator is a valid certificate
err = creator.Validate()
if err != nil {
return errors.WithMessage(err, "creator certificate is not valid")
}

putilsLogger.Debugf("creator is valid")

// validate the signature
// 验证签名
err = creator.Verify(msg, sig)
if err != nil {
return errors.WithMessage(err, "creator's signature over the proposal is not valid")
}

putilsLogger.Debugf("exits successfully")

return nil
}

获取Identity

获取当前通道的MSP manager:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// GetIdentityDeserializer returns the IdentityDeserializer for the given chain
func GetIdentityDeserializer(chainID string) msp.IdentityDeserializer {
if chainID == "" {
return GetLocalMSP()
}

return GetManagerForChain(chainID)
}

// GetManagerForChain returns the msp manager for the supplied
// chain; if no such manager exists, one is created
func GetManagerForChain(chainID string) msp.MSPManager {
m.Lock()
defer m.Unlock()

// 先从缓存查找
mspMgr, ok := mspMap[chainID]
if !ok {
// 找不到则新建立当前通道Msp manager
mspLogger.Debugf("Created new msp manager for channel `%s`", chainID)
mspMgmtMgr := &mspMgmtMgr{msp.NewMSPManager(), false}
mspMap[chainID] = mspMgmtMgr
mspMgr = mspMgmtMgr
} else {
// check for internal mspManagerImpl and mspMgmtMgr types. if a different
// type is found, it's because a developer has added a new type that
// implements the MSPManager interface and should add a case to the logic
// above to handle it.
if !(reflect.TypeOf(mspMgr).Elem().Name() == "mspManagerImpl" || reflect.TypeOf(mspMgr).Elem().Name() == "mspMgmtMgr") {
panic("Found unexpected MSPManager type.")
}
mspLogger.Debugf("Returning existing manager for channel '%s'", chainID)
}
return mspMgr
}

// MSPManager has been setup for a channel, which indicates whether the channel
// exists or not
type mspMgmtMgr struct {
msp.MSPManager
// track whether this MSPManager has been setup successfully
up bool
}

msp.MSPManager是一个接口,从上面代码可以得知,它是利用NewMSPManager创建的:

1
2
3
4
// 创建等待Setup的MSPManager
func NewMSPManager() MSPManager {
return &mspManagerImpl{}
}

疑问是,啥时候Setup的,当前调用路径上没发现这个路径,可能从系统整体流程上,已经保证了,当前调用时,已经创建好了。

获取Identity,是一个剥洋葱的过程:

1
2
3
4
5
6
func (mgr *mspMgmtMgr) DeserializeIdentity(serializedIdentity []byte) (msp.Identity, error) {
if !mgr.up {
return nil, errors.New("channel doesn't exist")
}
return mgr.MSPManager.DeserializeIdentity(serializedIdentity)
}

实际调用mspManagerImplDeserializeIdentity

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// DeserializeIdentity returns an identity given its serialized version supplied as argument
func (mgr *mspManagerImpl) DeserializeIdentity(serializedID []byte) (Identity, error) {
// We first deserialize to a SerializedIdentity to get the MSP ID
sId := &msp.SerializedIdentity{}
err := proto.Unmarshal(serializedID, sId)
if err != nil {
return nil, errors.Wrap(err, "could not deserialize a SerializedIdentity")
}

// 获取发送方的msp实例
// we can now attempt to obtain the MSP
msp := mgr.mspsMap[sId.Mspid]
if msp == nil {
return nil, errors.Errorf("MSP %s is unknown", sId.Mspid)
}

switch t := msp.(type) {
case *bccspmsp:
return t.deserializeIdentityInternal(sId.IdBytes)
case *idemixmsp:
return t.deserializeIdentityInternal(sId.IdBytes)
default:
return t.DeserializeIdentity(serializedID)
}
}

转到bccspmsp的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 反序列化二进制,得到证书,然后用证书获取公钥,使用证书、公钥和msp,创建Identity
// deserializeIdentityInternal returns an identity given its byte-level representation
func (msp *bccspmsp) deserializeIdentityInternal(serializedIdentity []byte) (Identity, error) {
// This MSP will always deserialize certs this way
bl, _ := pem.Decode(serializedIdentity)
if bl == nil {
return nil, errors.New("could not decode the PEM structure")
}
cert, err := x509.ParseCertificate(bl.Bytes)
if err != nil {
return nil, errors.Wrap(err, "parseCertificate failed")
}

// Now we have the certificate; make sure that its fields
// (e.g. the Issuer.OU or the Subject.OU) match with the
// MSP id that this MSP has; otherwise it might be an attack
// TODO!
// We can't do it yet because there is no standardized way
// (yet) to encode the MSP ID into the x.509 body of a cert

// 从证书中提取公钥,封装一下,满足bccsp.Key接口
pub, err := msp.bccsp.KeyImport(cert, &bccsp.X509PublicKeyImportOpts{Temporary: true})
if err != nil {
return nil, errors.WithMessage(err, "failed to import certificate's public key")
}

// 利用证书、公钥和msp建立角色身份
return newIdentity(cert, pub, msp)
}

Identity包含了Identity标示符,证书、公钥和所在的msp,创建Identity就是计算以上几项信息的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
type identity struct {
// id contains the identifier (MSPID and identity identifier) for this instance
id *IdentityIdentifier

// cert contains the x.509 certificate that signs the public key of this instance
cert *x509.Certificate

// this is the public key of this instance
pk bccsp.Key

// reference to the MSP that "owns" this identity
msp *bccspmsp
}

func newIdentity(cert *x509.Certificate, pk bccsp.Key, msp *bccspmsp) (Identity, error) {
if mspIdentityLogger.IsEnabledFor(zapcore.DebugLevel) {
mspIdentityLogger.Debugf("Creating identity instance for cert %s", certToPEM(cert))
}

// 检查证书
// Sanitize first the certificate
cert, err := msp.sanitizeCert(cert)
if err != nil {
return nil, err
}

// Compute identity identifier

// Use the hash of the identity's certificate as id in the IdentityIdentifier
hashOpt, err := bccsp.GetHashOpt(msp.cryptoConfig.IdentityIdentifierHashFunction)
if err != nil {
return nil, errors.WithMessage(err, "failed getting hash function options")
}

digest, err := msp.bccsp.Hash(cert.Raw, hashOpt)
if err != nil {
return nil, errors.WithMessage(err, "failed hashing raw certificate to compute the id of the IdentityIdentifier")
}

id := &IdentityIdentifier{
Mspid: msp.name,
Id: hex.EncodeToString(digest)}

return &identity{id: id, cert: cert, pk: pk, msp: msp}, nil
}

验证数字签名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Verify checks against a signature and a message
// to determine whether this identity produced the
// signature; it returns nil if so or an error otherwise
func (id *identity) Verify(msg []byte, sig []byte) error {
// mspIdentityLogger.Infof("Verifying signature")

// Compute Hash
hashOpt, err := id.getHashOpt(id.msp.cryptoConfig.SignatureHashFamily)
if err != nil {
return errors.WithMessage(err, "failed getting hash function options")
}

digest, err := id.msp.bccsp.Hash(msg, hashOpt)
if err != nil {
return errors.WithMessage(err, "failed computing digest")
}

if mspIdentityLogger.IsEnabledFor(zapcore.DebugLevel) {
mspIdentityLogger.Debugf("Verify: digest = %s", hex.Dump(digest))
mspIdentityLogger.Debugf("Verify: sig = %s", hex.Dump(sig))
}

// 最终会调用bccsp的接口验证签名,SW或者国密
valid, err := id.msp.bccsp.Verify(id.pk, sig, digest, nil)
if err != nil {
return errors.WithMessage(err, "could not determine the validity of the signature")
} else if !valid {
return errors.New("The signature is invalid")
}

return nil
}

解密SignatureHeader

Fabric 使用 SignatureHeader 保存发送方的身份信息,Creator即为序列化后的信息。

SignatureHeaderMaker 接口定义了创建一个 SignatureHeader 的方法,搜索起来实现该接口的结构体很多,本质上只有2个:mspSignerSignatureHeaderCreator

1
2
3
4
5
6
7
8
9
10
11
// SignatureHeaderMaker creates a new SignatureHeader
type SignatureHeaderMaker interface {
// NewSignatureHeader creates a SignatureHeader with the correct signing identity and a valid nonce
NewSignatureHeader() (*cb.SignatureHeader, error)
}

// localmsp
func (s *mspSigner) NewSignatureHeader() (*cb.SignatureHeader, error) {}

// crypto
func (bs *SignatureHeaderCreator) NewSignatureHeader() (*cb.SignatureHeader, error){}

两个实现本质上是一样的,以 mspSigner 为例进行介绍。首先获取实现SigningIdentity接口的实例,然后调用Serialize得到序列化后的身份信息,再随机生成一个Nonce,创建出SignatureHeader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// NewSignatureHeader creates a SignatureHeader with the correct signing identity and a valid nonce
func (s *mspSigner) NewSignatureHeader() (*cb.SignatureHeader, error) {
// 获得SigningIdentity接口实例
signer, err := mspmgmt.GetLocalMSP().GetDefaultSigningIdentity()
if err != nil {
return nil, fmt.Errorf("Failed getting MSP-based signer [%s]", err)
}

// 序列化得到creator
creatorIdentityRaw, err := signer.Serialize()
if err != nil {
return nil, fmt.Errorf("Failed serializing creator public identity [%s]", err)
}

// 获取一个随机nonce
nonce, err := crypto.GetRandomNonce()
if err != nil {
return nil, fmt.Errorf("Failed creating nonce [%s]", err)
}

sh := &cb.SignatureHeader{}
sh.Creator = creatorIdentityRaw
sh.Nonce = nonce

return sh, nil
}

SigningIdentity接口包含了Identity接口,Identity声明了跟证书相关的方法,SigningIdentity则增加了对消息签名的函数Sign

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type SigningIdentity interface {

// Extends Identity
Identity

// Sign the message
Sign(msg []byte) ([]byte, error)

// GetPublicVersion returns the public parts of this identity
GetPublicVersion() Identity
}

type Identity interface {
...
// Serialize converts an identity to bytes
Serialize() ([]byte, error)
...
}

Serialize的实现,实际只包含了证书和MSPID,说明了消息中携带的只包含MSPID和证书作为身份信息,而不是signingidentity的所有字段(signingidentity实现了SigningIdentity接口)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Serialize returns a byte array representation of this identity
func (id *identity) Serialize() ([]byte, error) {
// mspIdentityLogger.Infof("Serializing identity %s", id.id)

// Raw格式证书
pb := &pem.Block{Bytes: id.cert.Raw, Type: "CERTIFICATE"}
pemBytes := pem.EncodeToMemory(pb)
if pemBytes == nil {
return nil, errors.New("encoding of identity failed")
}

// 使用MSPID和序列化后的证书,再次序列化得到身份信息
sId := &msp.SerializedIdentity{Mspid: id.id.Mspid, IdBytes: pemBytes}
idBytes, err := proto.Marshal(sId)
if err != nil {
return nil, errors.Wrapf(err, "could not marshal a SerializedIdentity structure for identity %s", id.id)
}

return idBytes, nil
}

参考资料

  1. https://github.com/yeasy/hyperledger_code_fabric
  2. 《区块链原理、设计与应用》第9章、第10章

教程资料

问题汇总

replace 使用http或https

在使用go replace时,有2点注意:

  • 目标仓库不能带协议头,比如http、https,要从域名或者IP开始
  • 版本号格式要符合语义格式化,测试版本是否符合规则:Go playground 样例代码

直接修改 go.mod 文件格式:

1
replace github.com/hyperledger/fabric v1.4.1 => 192.168.9.251/hyperledger/fabric v1.4.1-alpha.11-yx

或使用命令:

1
go mod edit -replace=github.com/hyperledger/fabric@v1.4.1=192.168.9.251/hyperledger/fabric@v1.4.1

Gitlab 仓库没开启https

go mod 默认使用 go get 下载依赖,而 go get 默认使用 https,如果 Gitlab 仓库没有启用 https,需要使用 -insecure 让go get走http。

问题:

1
2
3
GOPROXY="" go get github.com/hyperledger/fabric@v1.4.1
go: 192.168.9.251/hyperledger/fabric@v1.4.1-alpha.11-yx: unrecognized import path "192.168.9.251/hyperledger/fabric" (https fetch: Get https://192.168.9.251/hyperledger/fabric?go-get=1: dial tcp 192.168.9.251:443: connect: connection refused)
go: error loading module requirements

方案:

1
GOPROXY="" go get -insecure github.com/hyperledger/fabric@v1.4.1

注解:遇到问题时,使用 go get -v 可以看到更多信息,有助分析问题。

Go Modules 代理

由于某些网络原因,国内下载 Github 等处的依赖,不够流程,需要设置代理,不同版本的设置如下:

  • go1.12

    1
    $ export GOPROXY=https://goproxy.cn
  • go1.13

    1
    2
    $ export GOPRIVATE=192.168.9.251
    $ export GOPROXY=https://goproxy.cn,direct

私有仓库

如果仓库设置为私有,这要求用户必须登录才能访问仓库。

Go Modules 默认使用 go get 下载依赖,go get 利用 https 或者 http, 但下载过程没有设置用户名和密码的地方,下载依赖时,可能遇到一下错误:

  • connection refused
  • unkown revision

可以通过设置Github/Gitlab Access Token结果,通过token的方式,访问仓库,token的获取方式为,登录Gitlab仓库,进入以下页面:

Gitlab User Setting -> Access Tokens

在此页面复制下顶端的 Your New Personal Access Token, 然后填写token名字和勾选下方的权限进行创建 Token。

然后执行以下命令:

1
2
3
git config --global \
url."http://oauth2:${your_access_token}@ip_address_or_domain".insteadOf \
"http://ip_address_or_domain"

后面再去 go get 的时候,就可顺利下载依赖。