0%

新老朋友好久不见,我是大彬。今天为大家带来的分享是Go语言垃圾回收,这篇文章筹划的了很久,因为GC也是很强大的一个话题,关于GC已经有很多篇论文还有书籍,想通过一篇文章来介绍Go语言的垃圾回收是困难的,所以决定分几篇文章来完成Go语言垃圾回收的相关话题:

  1. Go垃圾回收 1: 历史和原理
  2. Go垃圾回收 2: GC主要流程
  3. Go垃圾回收 3: 源码分析
  4. Go垃圾回收 4: GC对性能的影响与优化

虽然划分成了3部分,但每个子话题依然很大,依然难写,依然大而不全,每一篇文章都会有宏观与细节,这样的大而不全对于不了解GC的朋友是好事,即可以有宏观上的认识,又可以有重要细节的感知。

这篇文章就是第一个话题:Go垃圾回收历史和原理,希望各位有所收获。

Go语言垃圾回收简介

垃圾指内存中不再使用的内存区域,自动发现与释放这种内存区域的过程就是垃圾回收。

内存资源是有限的,而垃圾回收可以让内存重复使用,并且减轻开发者对内存管理的负担,减少程序中的内存问题。

以下是从网上对垃圾回收的2个定义:

  1. Garbage consists of objects that are dead.
  2. In tracing garbage collection, the term is sometimes used to mean objects that are known to be dead; that is, objects that are unreachable.

Go垃圾回收发展史

  • go1.1,提高效率和垃圾回收精确度。
  • go1.3,提高了垃圾回收的精确度。
  • go1.4,之前版本的runtime大部分是使用C写的,这个版本大量使用Go进行了重写,让GC有了扫描stack的能力,进一步提高了垃圾回收的精确度。

  • go1.5,目标是降低GC延迟,采用了并发标记和并发清除,三色标记write barrier,以及实现了更好的回收器调度,设计文档1文档2,以及这个版本的Go talk

  • go1.6,小优化,当程序使用大量内存时,GC暂停时间有所降低。
  • go1.7,小优化,当程序有大量空闲goroutine,stack大小波动比较大时,GC暂停时间有显著降低。
  • go1.8write barrier切换到hybrid write barrier,以消除STW中的re-scan,把STW的最差情况降低到50us,设计文档
  • go1.9,提升指标比较多,1)过去 runtime.GC, debug.SetGCPercent, 和 debug.FreeOSMemory都不能触发并发GC,他们触发的GC都是阻塞的,go1.9可以了,变成了在垃圾回收之前只阻塞调用GC的goroutine。2)debug.SetGCPercent只在有必要的情况下才会触发GC。
  • go.1.10,小优化,加速了GC,程序应当运行更快一点点
  • go1.12,显著提高了堆内存存在大碎片情况下的sweeping性能,能够降低GC后立即分配内存的延迟。

以上的历史版本信息都来自Go release归档,有兴趣可以去翻阅一下。

Go垃圾回收主要流程

下面这幅图来自Go1.5的go talk,虽然go1.12的GC与go1.5有了许多改变,但总体的流程没有较大改变,并且也找不到官方更新的图了,所有就用这幅图介绍GC主流程。

Go GC

Go 垃圾回收是分轮次的,每一轮GC都是从 Off 状态开始,如果不是 Off 状态,则代表上一轮GC还未完成,如果这时修改指针的值,是直接修改的。

Go 垃圾回收的主要分2部分,第1部分是扫描所有对象进行三色标记,标记为黑色、灰色和白色,标记完成后只有黑色和白色对象,黑色代表使用中对象,白色对象代表垃圾,灰色是白色过渡到黑色的中间临时状态,第2部分是清扫垃圾,即清理白色对象。

第1部分包含了栈扫描、标记和标记结束3个阶段。在栈扫描之前有2个重要的准备:STW(Stop The World)和开启写屏障(WB,Write Barrier)。

STW是为了暂停当前所有运行中的goroutine,进行一些准备工作,比如开启WB,把全局变量,以及每个goroutine中的 Root对象 收集起来,Root对象是标记扫描的源头,可以从Root对象依次索引到使用中的对象。

Objects Reference Tree

假设内存中的对象用圆圈表示,那根据对象的指向关系,所有的对象可以组成若干依赖树,每一个 Root对象 都是树根,按图索骥能找到每一个使用中的对象。但树根不一定是Root对象,也有可能是垃圾,使用灰色树根代表Root对象,白色树根代表垃圾。

每个P都有一个 mcache ,每个 mcache 都有1个Span用来存放 TinyObject,TinyObject 都是不包含指针的对象,所以这些对象可以直接标记为黑色,然后关闭 STW。

如果不了解mcache和Tiny对象,赶紧翻一下这篇文章Go内存分配那些事

每个P都有1个进行扫描标记的 goroutine,可以进行并发标记,关闭STW后,这些 goroutine 就变成可运行状态,接收 Go Scheduler 的调度,被调度时执行1轮标记,它负责第1部分任务:栈扫描、标记和标记结束。

栈扫描阶段就是把前面搜集的Root对象找出来,标记为黑色,然后把它们引用的对象也找出来,标记为灰色,并且加入到gcWork队列,gcWork队列保存了灰色的对象,每个灰色的对象都是一个Work。

后面可以进入标记阶段,它是一个循环,不断的从gcWork队列中取出work,所指向的对象标记为黑色,该对象指向的对象标记为灰色,然后加入队列,直到队列为空。

然后进入标记结束阶段,再次开启STW,不同的版本处理方式是不同的。

在Go1.7的版本是Dijkstra写屏障,这个写屏障只监控堆上指针数据的变动,由于成本原因,没有监控栈上指针的变动,由于应用goroutine和GC的标记goroutine都在运行,当栈上的指针指向的对象变更为白色对象时,这个白色对象应当标记为黑色,需要再次扫描全局变量和栈,以免释放这类不该释放的对象。

在Go1.8及以后的版本引入了混合写屏障,这个写屏障依然不监控栈上指针的变动,但是它的策略,使得无需再次扫描栈和全局变量,但依然需要STW然后进行一些检查。

标记结束阶段的最后会关闭写屏障,然后关闭STW,唤醒熟睡已久的负责清扫垃圾的goroutine。

清扫goroutine是应用启动后立即创建的一个后台goroutine,它会立刻进入睡眠,等待被唤醒,然后执行垃圾清理:把白色对象挨个清理掉,清扫goroutine和应用goroutine是并发进行的。清扫完成之后,它再次进入睡眠状态,等待下次被唤醒。

最后执行一些数据统计和状态修改的工作,并且设置好触发下一轮GC的阈值,把GC状态设置为Off。

以上就是Go垃圾回收的主要流程,但和go1.12的源码稍微有一些不同,比如标记结束后,就开始设置各种状态数据以及把GC状态成了Off,在开启一轮GC时,会自动检测当前是否处于Off,如果不是Off,则当前goroutine会调用清扫函数,帮助清扫goroutine一起清扫span,实际的Go垃圾回收流程以源码为准。

主要流程是宏观一点的角度,接下去会扩散一下,介绍主要流程中提到的各种概念,比如三色标记、并发标记清理、STW、写屏障、辅助GC、GC persent。

几类垃圾回收思想

垃圾回收的研究已经存在了几十年,远在Go诞生之前,就存在了多种垃圾回收的思想,我们这里看几个跟Go垃圾回收相关的几个。

Tracing GC

WIKI介绍:https://en.wikipedia.org/wiki/Tracing_garbage_collection

Tracing GC 是垃圾回收的一个大类,另外一个大类是引用计数,关于各种垃圾回收的类别可以看下这个系列文章深入浅出垃圾回收

本文主要介绍Tracing GC的简要原理,我们首先看一下引用树的概念。把内存中所有的对象,都作为一个节点,对象A中的指针,指向了对象B,就存在从对象A指向对象B的一条边,对象B也可能指向了其他对象,那么根据指向关系就能生成一颗对象引用树。

Objects Reference Tree

把内存中所有的对象引用树组合起来,就组成了一幅图。

Memory Objects

Tracing GC中有2类对象:

  1. 可到达对象,即使用中对象
  2. 不可到达对象,即垃圾

Tracing GC使用对象引用树找到所有可到达的对象,找到可到达对象有2个原则。

原则1:被程序中调用栈,或者全局变量指向的对象是可到达对象。

Root Objects

原则2:被可到达对象指向的对象也是可到达对象。

A是可到达的,并且B被A引用,所以B也是可到达的。

Reachable Objects

Tracing GC使用任何一种图论的遍历算法,都可以从Root对象,根据引用关系找到所有的可到达对象,并把他们做标记。Tracing GC扫描后,黑色对象为可到达对象,剩下的白色对象为不可到达对象。

原生的 Tracing GC 只有黑色和白色2种颜色。

Tracing GC

增量式垃圾回收思想

垃圾回收离不开STW,STW是Stop The World,指会暂停所有正在执行的用户线程/协程,进行垃圾回收的操作,STW为垃圾对象的扫描和标记提供了必要的条件。

非增量式垃圾回收需要STW,在STW期间完成所有垃圾对象的标记,STW结束后慢慢的执行垃圾对象的清理。

增量式垃圾回收也需要STW,在STW期间完成部分垃圾对象的标记,然后结束STW继续执行用户线程,一段时间后再次执行STW再标记部分垃圾对象,这个过程会多次重复执行,直到所有垃圾对象标记完成。

Increment GC

GC算法有3大性能指标:吞吐量、最大暂停时间(最大的STW占时)、内存占用率。增量式垃圾回收不能提高吞吐量,但和非增量式垃圾回收相比,每次STW的时间更短,能够降低最大暂停时间,就是Go每个版本Release Note中提到的GC延迟、GC暂停时间。

下图是非增量式GC和增量式GC的对比:

Normal V.S. Increment GC

以上图片来自 Incremental Garbage Collection in Ruby 2.2 ,它也很好的介绍了增量式垃圾回收的思想。

并发垃圾回收

减少最大暂停时间还有一种思路:并发垃圾回收,注意不是并行垃圾回收。

并行垃圾回收是每个核上都跑垃圾回收的线程,同时进行垃圾回收,这期间为STW,会暂停用户线程的执行。

并发垃圾回收是先STW找到所有的Root对象,然后结束STW,让垃圾标记线程和用户线程并发执行,垃圾标记完成后,再次开启STW,再次扫描和标记,以免释放使用中的内存。

并发垃圾回收和并行垃圾回收的重要区别就是不会持续暂停用户线程,并发垃圾回收也降低了STW的时间,达到了减少最大暂停时间的目的。

图片来自 Reducing Garbage-Collection Pause Time ,橙色线条为垃圾回收线程的运行,蓝色线条为用户线程。

Go垃圾回收主要原理

三色标记

为什么需要三色标记?

三色标记的目的,主要是利用Tracing GC做增量式垃圾回收,降低最大暂停时间。原生Tracing GC只有黑色和白色,没有中间的状态,这就要求GC扫描过程必须一次性完成,得到最后的黑色和白色对象。在前面增量式GC中介绍到了,这种方式会存在较大的暂停时间。

三色标记增加了中间状态灰色,增量式GC运行过程中,应用线程的运行可能改变了对象引用树,只要让黑色对象不直接引用白色对象,GC就可以增量式的运行,减少停顿时间。

什么是三色标记?

三色标记,望文生义可以知道它由3种颜色组成:

  1. 黑色 Black:表示对象是可达的,即使用中的对象,黑色是已经被扫描的对象。
  2. 灰色 Gary:表示被黑色对象直接引用的对象,但还没对它进行扫描。
  3. 白色 White:白色是对象的初始颜色,如果扫描完成后,对象依然还是白色的,说明此对象是垃圾对象。

三色标记规则:黑色不能指向白色对象。即黑色可以指向灰色,灰色可以指向白色。

三色标记主要流程:

  1. 初始所有对象被标记为白色。
  2. 寻找所有Root对象,比如被线程直接引用的对象,把Root对象标记为灰色。
  3. 把灰色对象标记为黑色,并它们引用的对象标记为灰色。
  4. 持续遍历每一个灰色对象,直到没有灰色对象。
  5. 剩余白色对象为垃圾对象。

推荐一篇结合Go代码展示了三色标记的过程的优秀文章:
Golang’s Real-time GC in Theory and Practice

记录三色的方法简介

Go1.12 使用位图和队列结合表示三种颜色状态:

  1. 白色:位图没有标记被扫描。
  2. 灰色:位图被标记已扫描,并且对象在队列。
  3. 黑色:位图被标记已扫描,并且对象已从队列弹出。

位图是全局的,表示了Heap中内存块是否被扫描,是否包含指针等。

队列有全局的一个和每个P有一个本地队列,扫描对象进行标记的过程,优先处理本P的队列,其思想与P的g本地队列和全局队列类似,减少资源竞争,提高并行化。

写屏障

我们结合一段用户代码介绍写屏障:

1
2
A.Next = B
A.Next = &C{}

三色标记的扫描线程是跟用户线程并发执行的,考虑这种情况:

用户线程执行完 A.Next = B 后,扫描线程把A标记为黑色,B标记为灰色,用户线程执行 A.Next = &C{} ,C是新对象,被标记为白色,由于A已经被扫描,不会重复扫描,所以C不会被标记为灰色,造成了黑色对象指向白色对象的情况,这种三色标记中是不允许的,结果是C被认为是垃圾对象,最终被清扫掉,当访问C时会造成非法内存访问而Panic。

写屏障可以解决这个问题,当对象引用树发生改变时,即对象指向关系发生变化时,将被指向的对象标记为灰色,维护了三色标记的约束:黑色对象不能直接引用白色对象,这避免了使用中的对象被释放。

有写屏障后,用户线程执行 A.Next = &C{} 后,写屏障把C标记为灰色。

并发标记

并发垃圾回收的主要思想上文已经介绍,Go的垃圾回收为每个P都分配了一个gcMarker协程,用于并发标记对象,这样有些P在标记对象,而有些P上继续运行用户协程。

Go的并发标记有4种运行模式,还没深入研究,这里举一个并发标记的场景:在goroutine的调度过程中,如果当前P上已经没有g可以执行,也偷不到g时,P就空闲下来了,这时候可以运行当前P的gcMarker协程。

触发GC

GC有3种触发方式:

  • 辅助GC

    在分配内存时,会判断当前的Heap内存分配量是否达到了触发一轮GC的阈值(每轮GC完成后,该阈值会被动态设置),如果超过阈值,则启动一轮GC。

  • 调用runtime.GC()强制启动一轮GC。

  • sysmon是运行时的守护进程,当超过 forcegcperiod (2分钟)没有运行GC会启动一轮GC。

GC调节参数

Go垃圾回收不像Java垃圾回收那样,有很多参数可供调节,Go为了保证使用GC的简洁性,只提供了一个参数GOGC

GOGC代表了占用中的内存增长比率,达到该比率时应当触发1次GC,该参数可以通过环境变量设置。

它的单位是百分比,取值范围并不是 [0, 100],可以是1000,甚至2000,2000时代表2000%,即20倍。

假如当前heap占用内存为4MB,GOGC = 75

1
4 * (1+75%) = 7MB

等heap占用内存大小达到7MB时会触发1轮GC。

GOGC还有2个特殊值:

  1. "off" : 代表关闭GC
  2. 0 : 代表持续进行垃圾回收,只用于调试

总结

本文主要介绍了Go垃圾回收的发展史,以及Go垃圾回收的一些主要概念,是为掌握Go垃圾回收提供一个基础。下期文章将把本文提到的概念串起来,介绍Go垃圾回收的主要流程,下期见。

参考资料

JAVA (JAVA8 HOTSPOT VM) GO
Collector Several collectors (Serial, Parallel, CMS, G1) CMS
Compaction Compacts Does not compact
Generational GC Generational GC Non-generational GC
Tuning parameters Depends on the collector.Multiple parameters available. Go垃圾回收 only
  • 【译】 Golang 中的垃圾回收(一)

    这篇文章是William Kennedy垃圾回收系列文章的第一篇的译文,这个文章从宏观的角度介绍了垃圾回收的原理,把垃圾回收跟调度结合起来介绍,分析了Go GC是如何实现低延时的。并且详细介绍了并发标记、STW、并发清除等。

  • 图解Golang的GC算法

    RyuGou用图的方式简述了三色标记法的标记清除过程以及写屏障。

  • Golang’s Real-time GC in Theory and Practice

    这篇文章有一个非常棒的GC动画。

  • 学习 Golang GC

    这篇文章对GC的历史、原理、goroutine栈,Go GC历史,基础原理,触发时间都有介绍,是一篇大而全的文章,但每个部分确实也都不详细,值得再参考。

  • Golang 垃圾回收剖析

  1. 如果这篇文章对你有帮助,不妨关注下我的Github,有文章会收到通知。
  2. 本文作者:大彬
  3. 如果喜欢本文,随意转载,但请保留此原文链接:http://lessisbetter.site/2019/10/20/go-gc-1-history-and-priciple/

关注公众号,获取最新Golang文章

前言

Event是应用和Fabric网络交互的一种方式,应用可以通过SDKPeer订阅某种类型的事件,当Peer发现事件发生时,可以把Event发送给应用,应用获取到通知信息。

Event功能介绍

Event从来源上可以分为2类:

  1. 链码容器发出的Event
  2. Peer上账本变更发出的Event

fabric event

图源自Tutorial Chaincode Event Listener on Hyperledger Fabric Java SDK

翻阅Node SDK和Go SDK的文档,发现SDK提供了4类事件:

  1. BlockEvent,可以用来监控被添加到账本上的区块。客户端需要Admin权限,这样才能读取完整的区块,每产生一个区块,它都会接收到通知。区块中有交易,交易中有chaincode event,所以可以通过BlockEvent获取其他事件。
  2. FilteredBlockEvent,可以用来监控简要的区块信息,当不只关心区块包含了哪些交易,交易是否成功时,它非常实用,还可以降低网络负载。它包含区块的部分信息,所以被称为filtered,信息有channel ID,区块号,交易的validation code。
  3. TransactionStatusEvent,可以用来监控某个交易在当前组织的peer何时完成。可以得到交易的validation code和交易所在区块。
  4. ChaincodeEvent,用来监听Chaincode发出的事件,不同的链码可以自定义自己的事件,所以这个更具有个性化。包含了交易id、区块号、链码id、事件名称,事件内容。如果想要查看事件内容,客户端所使用的账号,必须是Admin权限。

另外,订阅事件时可以指定开启和结束的区块号范围,如果开始的区块号已经产生,即区块已经写入账本,可以重放事件,更多信息可以看下面的文档。

关于Event的2篇重要文档,深深感觉Node SDK的文档,比Go SDK的文档丰富。

架构

上一节的介绍能够知道有哪些Event,各有什么作用,这一节介绍SDK和Peer是如何进行事件交互的。

SDK和Peer之间是通过gRPC通信的,gRPC的protos的定义文件4种message:

1
2
3
4
FilteredBlock,给FilteredBlockEvent使用
FilteredTransaction,结合下一个,给FilteredTransactionEvent使用
FilteredTransactionActions
FilteredChaincodeAction,给ChaincodeEvent使用

和1个Response,其中使用了oneof。

  • status,指http status,成功的时候无需使用,错误的时候可以使用指明错误。
  • block,给BlockEvent使用
  • FilteredBlock,给另外3种事件使用
1
2
3
4
5
6
7
8
// DeliverResponse
message DeliverResponse {
oneof Type {
common.Status status = 1;
common.Block block = 2;
FilteredBlock filtered_block = 3;
}
}

以及2个gRPC通信接口:

1
2
3
4
5
6
7
8
9
10
11
12
service Deliver {
// deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with
// Payload data as a marshaled orderer.SeekInfo message,
// then a stream of block replies is received
rpc Deliver (stream common.Envelope) returns (stream DeliverResponse) {
}
// deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with
// Payload data as a marshaled orderer.SeekInfo message,
// then a stream of **filtered** block replies is received
rpc DeliverFiltered (stream common.Envelope) returns (stream DeliverResponse) {
}
}

咦,4个Event,怎么只有2个接口?

配合下图,我们做一番讲解。

fabric sdk go event

对于Peer而言,只有2中类型的订阅:

  1. BlockEvent,即完整的区块
  2. FilteredBlockEvent,即不完整的区块,可以根据FilteredBlockEvent中的信息,生成FilteredTransactionEvent信息和ChainCodeEvent信息

图中深蓝色和绿色的线分别代表BlockEvent和FilteredBlockEvent相关的数据流,BlockEvent使用的是Deliver函数,FilteredBlockEvent使用的是DeliverFiltered函数。

每一个事件订阅,都是一个gRPC连接,Peer会不断的从账本读区块,然后根据区块生成事件,发送给客户端。

Go SDK中实现了一个Dispatcher,就是提供这么一个中转的功能,对上层应用提供4中类型的事件,把4种事件注册请求转换为2种,调用DeliverClient把事件订阅请求发送给Peer,又把Peer发来的2种事件,转换为应用订阅的事件响应。

Peer启动时,启动gRPC服务后,会注册好DeliverServer接收事件订阅,然后调用deliverBlocks进入循环,在新区块产生后,会生成订阅的BlockEvent或FilteredBlockEvent,利用ResponseSender把事件发送给SDK。

event.pb.go源码

这就是根据events.proto生成的Go文件,负责创建gRPC通信的客户端和服务端,以及两边的消息发送。

主要关注下2个接口:

deliverClient实现了DeliverClient,已经在该源文件实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
// DeliverClient is the client API for Deliver service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type DeliverClient interface {
// deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with
// Payload data as a marshaled orderer.SeekInfo message,
// then a stream of block replies is received
Deliver(ctx context.Context, opts ...grpc.CallOption) (Deliver_DeliverClient, error)
// deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with
// Payload data as a marshaled orderer.SeekInfo message,
// then a stream of **filtered** block replies is received
DeliverFiltered(ctx context.Context, opts ...grpc.CallOption) (Deliver_DeliverFilteredClient, error)
}

DeliverServer是服务端的接口,需要Peer实现。

1
2
3
4
5
6
7
8
9
10
11
// DeliverServer is the server API for Deliver service.
type DeliverServer interface {
// deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with
// Payload data as a marshaled orderer.SeekInfo message,
// then a stream of block replies is received
Deliver(Deliver_DeliverServer) error
// deliver first requires an Envelope of type ab.DELIVER_SEEK_INFO with
// Payload data as a marshaled orderer.SeekInfo message,
// then a stream of **filtered** block replies is received
DeliverFiltered(Deliver_DeliverFilteredServer) error
}

Peer event源码

Peer干了这么几件事:

  1. 注册gRPC服务,即注册接受客户端发来的事件订阅的函数
  2. gRPC收到消息,订阅相应事件注册处理函数
  3. 处理函数持续向客户端发送区块事件,直到结束

添加Deliver服务

serve是Peer启动后的运行的主函数,它会创建gRPC server,以及创建DeliverEvent server,并把它绑定到gRPC server上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// peer/node/start.go
func serve(args []string) error {
...
// 创建peer的gRPC server
peerServer, err := peer.NewPeerServer(listenAddr, serverConfig)
if err != nil {
logger.Fatalf("Failed to create peer server (%s)", err)
}
...
// 创建和启动基于gRPC的event deliver server
abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider)
pb.RegisterDeliverServer(peerServer.Server(), abServer)
...
}

创建DeliverEventsServer,实际是创建好处理事件订阅的handler。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// core/peer/deliverevents.go
// NewDeliverEventsServer creates a peer.Deliver server to deliver block and
// filtered block events
func NewDeliverEventsServer(mutualTLS bool, policyCheckerProvider PolicyCheckerProvider, chainManager deliver.ChainManager, metricsProvider metrics.Provider) peer.DeliverServer {
timeWindow := viper.GetDuration("peer.authentication.timewindow")
if timeWindow == 0 {
defaultTimeWindow := 15 * time.Minute
logger.Warningf("`peer.authentication.timewindow` not set; defaulting to %s", defaultTimeWindow)
timeWindow = defaultTimeWindow
}
metrics := deliver.NewMetrics(metricsProvider)
return &server{
// 创建handler
dh: deliver.NewHandler(chainManager, timeWindow, mutualTLS, metrics),
policyCheckerProvider: policyCheckerProvider,
}
}

// NewHandler creates an implementation of the Handler interface.
func NewHandler(cm ChainManager, timeWindow time.Duration, mutualTLS bool, metrics *Metrics) *Handler {
return &Handler{
ChainManager: cm,
TimeWindow: timeWindow,
BindingInspector: InspectorFunc(comm.NewBindingInspector(mutualTLS, ExtractChannelHeaderCertHash)),
Metrics: metrics,
}
}

server实现了DeliverServer接口,当gRPC接收到事件注册时,就可以调用Deliver或者FilteredDeliver被调用时,就调用server的DeliverFiltered或者Deliver函数。

1
2
3
4
5
// server holds the dependencies necessary to create a deliver server
type server struct {
dh *deliver.Handler
policyCheckerProvider PolicyCheckerProvider
}

接收事件订阅

BlockEvent的注册和事件处理主要流程如下:

1
2
server.Deliver -> Handler.Handle ->
deliverBlocks -> SendBlockResponse -> blockResponseSender.SendBlockResponse -> gRPC生成的server Send函数

FilteredBlockEvent的注册和事件处理主要流程如下:

1
2
server.DeliverFiltered -> Handler.Handle ->
deliverBlocks -> SendBlockResponse -> filteredBlockResponseSender.SendBlockResponseg -> RPC生成的server Send函数

它们2个流程是类似的,下面就以BlockEvent的流程介绍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// Deliver sends a stream of blocks to a client after commitment
func (s *server) Deliver(srv peer.Deliver_DeliverServer) (err error) {
logger.Debugf("Starting new Deliver handler")
defer dumpStacktraceOnPanic()
// getting policy checker based on resources.Event_Block resource name
deliverServer := &deliver.Server{
PolicyChecker: s.policyCheckerProvider(resources.Event_Block),
Receiver: srv,
// 创建了sender
ResponseSender: &blockResponseSender{
Deliver_DeliverServer: srv,
},
}
return s.dh.Handle(srv.Context(), deliverServer)
}

// Handle receives incoming deliver requests.
func (h *Handler) Handle(ctx context.Context, srv *Server) error {
addr := util.ExtractRemoteAddress(ctx)
logger.Debugf("Starting new deliver loop for %s", addr)
h.Metrics.StreamsOpened.Add(1)
defer h.Metrics.StreamsClosed.Add(1)
for {
logger.Debugf("Attempting to read seek info message from %s", addr)
envelope, err := srv.Recv()
if err == io.EOF {
logger.Debugf("Received EOF from %s, hangup", addr)
return nil
}
if err != nil {
logger.Warningf("Error reading from %s: %s", addr, err)
return err
}

// 主体
status, err := h.deliverBlocks(ctx, srv, envelope)
if err != nil {
return err
}

err = srv.SendStatusResponse(status)
if status != cb.Status_SUCCESS {
return err
}
if err != nil {
logger.Warningf("Error sending to %s: %s", addr, err)
return err
}

logger.Debugf("Waiting for new SeekInfo from %s", addr)
}
}

deliverBlocks的主要作用就是不停的获取区块,然后调用sender发送事件,其中还包含了事件订阅信息的获取,错误处理等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.Envelope) (status cb.Status, err error) {
...
for {
...
var block *cb.Block
var status cb.Status

iterCh := make(chan struct{})
go func() {
// 获取下一个区块,当账本Append Block时,就可以拿到要写入到账本的区块
block, status = cursor.Next()
close(iterCh)
}()
...
// 发送区块
if err := srv.SendBlockResponse(block); err != nil {
logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
return cb.Status_INTERNAL_SERVER_ERROR, err
}

h.Metrics.BlocksSent.With(labels...).Add(1)

// 停止判断
if stopNum == block.Header.Number {
break
}
}
...
}

Iterator接口用来获取区块.

1
2
3
4
5
6
7
8
// Iterator is useful for a chain Reader to stream blocks as they are created
type Iterator interface {
// Next blocks until there is a new block available, or returns an error if
// the next block is no longer retrievable
Next() (*cb.Block, cb.Status)
// Close releases resources acquired by the Iterator
Close()
}

Fabric有3种类型的账本:ram、json和file,它们都实现了这个接口,这里主要是为了辅助解释事件机制,我们看一个最简单的:ram的实现。

Next()拿到的区块是从simpleList.SetNext()存进去的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Next blocks until there is a new block available, or returns an error if the
// next block is no longer retrievable
func (cu *cursor) Next() (*cb.Block, cb.Status) {
// This only loops once, as signal reading indicates non-nil next
// 实际只执行1次
for {
// 拿到区块
next := cu.list.getNext()
if next != nil {
cu.list = next
return cu.list.block, cb.Status_SUCCESS
}
<-cu.list.signal
}
}

func (s *simpleList) getNext() *simpleList {
s.lock.RLock()
defer s.lock.RUnlock()
return s.next
}

// 设置
func (s *simpleList) setNext(n *simpleList) {
s.lock.Lock()
defer s.lock.Unlock()
s.next = n
}

Append()是账本对外提供的接口,当要把区块追加到账本时,会调用此函数,该函数会调用setNext()设置待追加的区块。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Append appends a new block to the ledger
func (rl *ramLedger) Append(block *cb.Block) error {
rl.lock.Lock()
defer rl.lock.Unlock()
// ...
rl.appendBlock(block)
return nil
}

func (rl *ramLedger) appendBlock(block *cb.Block) {
next := &simpleList{
signal: make(chan struct{}),
block: block,
}
// 设置最新的区块
rl.newest.setNext(next)
// ...
}

发送事件消息

blockResponseSender.SendBlockResponse是BlockEvent的事件发送函数,实际就是调用gRPC生成的函数。

blockResponseSender是在server.Deliver中创建的,它实际就是peer.Deliver_DeliverServer

1
2
3
4
5
6
7
8
9
10
11
12
// blockResponseSender structure used to send block responses
type blockResponseSender struct {
peer.Deliver_DeliverServer
}

// SendBlockResponse generates deliver response with block message
func (brs *blockResponseSender) SendBlockResponse(block *common.Block) error {
response := &peer.DeliverResponse{
Type: &peer.DeliverResponse_Block{Block: block},
}
return brs.Send(response)
}

Go SDK源码

社区正在重构fabric-sdk-go,所以这里不着重介绍sdk的源码了,提醒几个重要的点,可能以后还有。

DeliverDeliverFiltered被封装成了2个全局函数:

1
2
3
4
5
6
7
8
9
10
11
var (
// Deliver creates a Deliver stream
Deliver = func(client pb.DeliverClient) (deliverStream, error) {
return client.Deliver(context.Background())
}

// DeliverFiltered creates a DeliverFiltered stream
DeliverFiltered = func(client pb.DeliverClient) (deliverStream, error) {
return client.DeliverFiltered(context.Background())
}
)

它们会被调用,进一步封装成provider,provider会为dispatch服务:

1
2
3
4
5
6
7
8
9
10
11
12
// deliverProvider is the connection provider used for connecting to the Deliver service
var deliverProvider = func(context fabcontext.Client, chConfig fab.ChannelCfg, peer fab.Peer) (api.Connection, error) {
if peer == nil {
return nil, errors.New("Peer is nil")
}

eventEndpoint, ok := peer.(api.EventEndpoint)
if !ok {
panic("peer is not an EventEndpoint")
}
return deliverconn.New(context, chConfig, deliverconn.Deliver, peer.URL(), eventEndpoint.Opts()...)
}

Dispatcher

Dispatcher会保存BlockEvent和FilteredBlockEvent的注册,以及用2个maptxRegistrationsccRegistrations保存交易和Chaincode Event的注册,handlers是各种注册事件的处理函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Dispatcher is responsible for handling all events, including connection and registration events originating from the client,
// and events originating from the channel event service. All events are processed in a single Go routine
// in order to avoid any race conditions and to ensure that events are processed in the order in which they are received.
// This also avoids the need for synchronization.
// The lastBlockNum member MUST be first to ensure it stays 64-bit aligned on 32-bit machines.
type Dispatcher struct {
lastBlockNum uint64 // Must be first, do not move
params
updateLastBlockInfoOnly bool
state int32
eventch chan interface{}
blockRegistrations []*BlockReg
filteredBlockRegistrations []*FilteredBlockReg
handlers map[reflect.Type]Handler
txRegistrations map[string]*TxStatusReg
ccRegistrations map[string]*ChaincodeReg
}

注册事件

这是Dispatcher的事件注册函数,在它眼里,不止有4个事件:

1
2
3
4
5
6
7
8
9
10
// RegisterHandler registers an event handler
func (ed *Dispatcher) RegisterHandler(t interface{}, h Handler) {
htype := reflect.TypeOf(t)
if _, ok := ed.handlers[htype]; !ok {
logger.Debugf("Registering handler for %s on dispatcher %T", htype, ed)
ed.handlers[htype] = h
} else {
logger.Debugf("Cannot register handler %s on dispatcher %T since it's already registered", htype, ed)
}
}

注册各注册事件的处理函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// RegisterHandlers registers all of the handlers by event type
func (ed *Dispatcher) RegisterHandlers() {
ed.RegisterHandler(&RegisterChaincodeEvent{}, ed.handleRegisterCCEvent)
ed.RegisterHandler(&RegisterTxStatusEvent{}, ed.handleRegisterTxStatusEvent)
ed.RegisterHandler(&RegisterBlockEvent{}, ed.handleRegisterBlockEvent)
ed.RegisterHandler(&RegisterFilteredBlockEvent{}, ed.handleRegisterFilteredBlockEvent)
ed.RegisterHandler(&UnregisterEvent{}, ed.handleUnregisterEvent)
ed.RegisterHandler(&StopEvent{}, ed.HandleStopEvent)
ed.RegisterHandler(&TransferEvent{}, ed.HandleTransferEvent)
ed.RegisterHandler(&StopAndTransferEvent{}, ed.HandleStopAndTransferEvent)
ed.RegisterHandler(&RegistrationInfoEvent{}, ed.handleRegistrationInfoEvent)

// The following events are used for testing only
ed.RegisterHandler(&fab.BlockEvent{}, ed.handleBlockEvent)
ed.RegisterHandler(&fab.FilteredBlockEvent{}, ed.handleFilteredBlockEvent)
}

接收Peer事件

handleEvent用来处理来自Peer的事件,不同的类型调用不同的handler。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (ed *Dispatcher) handleEvent(e esdispatcher.Event) {
delevent := e.(*connection.Event)
evt := delevent.Event.(*pb.DeliverResponse)
switch response := evt.Type.(type) {
case *pb.DeliverResponse_Status:
ed.handleDeliverResponseStatus(response)
case *pb.DeliverResponse_Block:
ed.HandleBlock(response.Block, delevent.SourceURL)
case *pb.DeliverResponse_FilteredBlock:
ed.HandleFilteredBlock(response.FilteredBlock, delevent.SourceURL)
default:
logger.Errorf("handler not found for deliver response type %T", response)
}
}

HandleBlock把Event封装是BlockEvent退给应用。可以看到BlockEvent也会发布FilteredBlockEvent。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// HandleBlock handles a block event
func (ed *Dispatcher) HandleBlock(block *cb.Block, sourceURL string) {
logger.Debugf("Handling block event - Block #%d", block.Header.Number)

if err := ed.updateLastBlockNum(block.Header.Number); err != nil {
logger.Error(err.Error())
return
}

if ed.updateLastBlockInfoOnly {
ed.updateLastBlockInfoOnly = false
return
}

logger.Debug("Publishing block event...")
ed.publishBlockEvents(block, sourceURL)
ed.publishFilteredBlockEvents(toFilteredBlock(block), sourceURL)
}

func (ed *Dispatcher) publishBlockEvents(block *cb.Block, sourceURL string) {
for _, reg := range ed.blockRegistrations {
if !reg.Filter(block) {
logger.Debugf("Not sending block event for block #%d since it was filtered out.", block.Header.Number)
continue
}

if ed.eventConsumerTimeout < 0 {
select {
case reg.Eventch <- NewBlockEvent(block, sourceURL):
default:
logger.Warn("Unable to send to block event channel.")
}
} else if ed.eventConsumerTimeout == 0 {
reg.Eventch <- NewBlockEvent(block, sourceURL)
} else {
select {
case reg.Eventch <- NewBlockEvent(block, sourceURL):
case <-time.After(ed.eventConsumerTimeout):
logger.Warn("Timed out sending block event.")
}
}
}
}

FilteredBlockEvent能解析出TransactionEvent和ChaincodeEvent:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
func (ed *Dispatcher) publishFilteredBlockEvents(fblock *pb.FilteredBlock, sourceURL string) {
if fblock == nil {
logger.Warn("Filtered block is nil. Event will not be published")
return
}

logger.Debugf("Publishing filtered block event: %#v", fblock)

checkFilteredBlockRegistrations(ed, fblock, sourceURL)

for _, tx := range fblock.FilteredTransactions {
// 发布交易订阅
ed.publishTxStatusEvents(tx, fblock.Number, sourceURL)

// Only send a chaincode event if the transaction has committed
if tx.TxValidationCode == pb.TxValidationCode_VALID {
txActions := tx.GetTransactionActions()
if txActions == nil {
continue
}
if len(txActions.ChaincodeActions) == 0 {
logger.Debugf("No chaincode action found for TxID[%s], block[%d], source URL[%s]", tx.Txid, fblock.Number, sourceURL)
}
for _, action := range txActions.ChaincodeActions {
if action.ChaincodeEvent != nil {
// 发布chaincode event订阅
ed.publishCCEvents(action.ChaincodeEvent, fblock.Number, sourceURL)
}
}
} else {
logger.Debugf("Cannot publish CCEvents for block[%d] and source URL[%s] since Tx Validation Code[%d] is not valid", fblock.Number, sourceURL, tx.TxValidationCode)
}
}
}

func (ed *Dispatcher) publishTxStatusEvents(tx *pb.FilteredTransaction, blockNum uint64, sourceURL string) {
logger.Debugf("Publishing Tx Status event for TxID [%s]...", tx.Txid)
if reg, ok := ed.txRegistrations[tx.Txid]; ok {
logger.Debugf("Sending Tx Status event for TxID [%s] to registrant...", tx.Txid)

if ed.eventConsumerTimeout < 0 {
select {
case reg.Eventch <- NewTxStatusEvent(tx.Txid, tx.TxValidationCode, blockNum, sourceURL):
default:
logger.Warn("Unable to send to Tx Status event channel.")
}
} else if ed.eventConsumerTimeout == 0 {
reg.Eventch <- NewTxStatusEvent(tx.Txid, tx.TxValidationCode, blockNum, sourceURL)
} else {
select {
case reg.Eventch <- NewTxStatusEvent(tx.Txid, tx.TxValidationCode, blockNum, sourceURL):
case <-time.After(ed.eventConsumerTimeout):
logger.Warn("Timed out sending Tx Status event.")
}
}
}
}

func (ed *Dispatcher) publishCCEvents(ccEvent *pb.ChaincodeEvent, blockNum uint64, sourceURL string) {
for _, reg := range ed.ccRegistrations {
logger.Debugf("Matching CCEvent[%s,%s] against Reg[%s,%s] ...", ccEvent.ChaincodeId, ccEvent.EventName, reg.ChaincodeID, reg.EventFilter)
if reg.ChaincodeID == ccEvent.ChaincodeId && reg.EventRegExp.MatchString(ccEvent.EventName) {
logger.Debugf("... matched CCEvent[%s,%s] against Reg[%s,%s]", ccEvent.ChaincodeId, ccEvent.EventName, reg.ChaincodeID, reg.EventFilter)

if ed.eventConsumerTimeout < 0 {
select {
case reg.Eventch <- NewChaincodeEvent(ccEvent.ChaincodeId, ccEvent.EventName, ccEvent.TxId, ccEvent.Payload, blockNum, sourceURL):
default:
logger.Warn("Unable to send to CC event channel.")
}
} else if ed.eventConsumerTimeout == 0 {
reg.Eventch <- NewChaincodeEvent(ccEvent.ChaincodeId, ccEvent.EventName, ccEvent.TxId, ccEvent.Payload, blockNum, sourceURL)
} else {
select {
case reg.Eventch <- NewChaincodeEvent(ccEvent.ChaincodeId, ccEvent.EventName, ccEvent.TxId, ccEvent.Payload, blockNum, sourceURL):
case <-time.After(ed.eventConsumerTimeout):
logger.Warn("Timed out sending CC event.")
}
}
}
}
}

总结

本文介绍了:

  1. Peer支持的2类Even,
  2. Peer是如何支持事件订阅,和发送事件的,
  3. SDK支持的4类Event,这4类Event和Peer的2类Event的关系
  4. SDK和Peer之间的gRPC通信

更多SDK事件的使用,请参考文档

Fabric事件介绍的[官方文档(https://stone-fabric.readthedocs.io/zh/latest/peer_event_services.html)。

Fabric在examples中还提供了一个eventclient样例,看这个样例更有助于理解Fabric event的原理,以及是如何交互的。

前言

一份Peer节点启动的INFO级别日志如下,可以发现:

  1. 先注册了scc目录下的lscc, cscc, qscc,未注册chaincode目录下的lifecycle
  2. 然后又依次部署了上述scc。

本文的目的就是梳理出,系统链码的部署流程,这是peer节点提供背书、链码管理、配置、查询等功能的基础。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
2019-09-09 07:52:09.409 UTC [gossip.gossip] start -> INFO 013 Gossip instance peer1.org1.example.com:8051 started
2019-09-09 07:52:09.418 UTC [sccapi] deploySysCC -> INFO 014 system chaincode lscc/(github.com/hyperledger/fabric/core/scc/lscc) deployed
2019-09-09 07:52:09.420 UTC [cscc] Init -> INFO 015 Init CSCC
2019-09-09 07:52:09.422 UTC [sccapi] deploySysCC -> INFO 016 system chaincode cscc/(github.com/hyperledger/fabric/core/scc/cscc) deployed
2019-09-09 07:52:09.424 UTC [qscc] Init -> INFO 017 Init QSCC
2019-09-09 07:52:09.424 UTC [sccapi] deploySysCC -> INFO 018 system chaincode qscc/(github.com/hyperledger/fabric/core/scc/qscc) deployed
2019-09-09 07:52:09.425 UTC [sccapi] deploySysCC -> INFO 019 system chaincode (+lifecycle,github.com/hyperledger/fabric/core/chaincode/lifecycle) disabled
...
2019-09-09 07:52:14.386 UTC [sccapi] deploySysCC -> INFO 031 system chaincode lscc/mychannel(github.com/hyperledger/fabric/core/scc/lscc) deployed
2019-09-09 07:52:14.386 UTC [cscc] Init -> INFO 032 Init CSCC
2019-09-09 07:52:14.386 UTC [sccapi] deploySysCC -> INFO 033 system chaincode cscc/mychannel(github.com/hyperledger/fabric/core/scc/cscc) deployed
2019-09-09 07:52:14.387 UTC [qscc] Init -> INFO 034 Init QSCC
2019-09-09 07:52:14.387 UTC [sccapi] deploySysCC -> INFO 035 system chaincode qscc/mychannel(github.com/hyperledger/fabric/core/scc/qscc) deployed
2019-09-09 07:52:14.387 UTC [sccapi] deploySysCC -> INFO 036 system chaincode (+lifecycle,github.com/hyperledger/fabric/core/chaincode/lifecycle) disabled

宏观流程

提醒,本文使用SCC代指系统链码,使用scc代指core.scc模块。

在介绍源码之前,先给出总体流程,以便看源码的时候不会迷失。

部署SCC会涉及到4个模块:

  1. peer.node,它是peer的主程序,可以调用core.scc进行注册和部署SCC
  2. core.scc,它包含了lscc、qscc、cscc这3个scc,以及SCC的注册和部署
  3. core.chaincode,它是链码管理,普通链码和SCC都会走该模块,去部署和调用链码,和链码容器交互,并且它还提供了1个链码容器的工具shim
  4. core.container,它是实现链码容器,有2种链码容器,SCC使用的InprocVM,和普通链码使用的DockerVM

注册和部署的简要流程如下:

  1. peer运行启动程序
  2. 注册scc
    1. peer.node创建好lscc、cscc、qscc等scc实例,以及从配置文件读取的scc
    2. peer.node调用core.scc依次注册每一个scc实例
    3. core.scc调用core.container把scc实例信息注册到container
  3. 部署scc
    1. peer.node调用core.scc依次部署每一个注册的scc
    2. core.scc部署scc的流程复用普通链码部署流程,调用core.chaincode
    3. core.chaincode执行启动链码容器,scc也有链码容器是Inproc类型,不是Docker类型
    4. core.chaincode会调用core.container建立scc的Inproc容器实例
    5. core.container调用core.chaincode.shim启动容器内的程序,并负责和peer通信
    6. 启动完成后,core.chaincode向容器发送Init消息,让容器初始化,容器初始化完成会发送响应消息给core.chaincode,core.chaincode部署scc完成

总流程

列出源码的过程,会省略大量不相关代码,用...代替。

peer启动过程中,会调用node.serve,其中包含了为系统链码注册SCC和部署SCC。之后,还会为应用通道部署SCC,说明每个通道有各自的SCC,这里省略掉这部分。

1
2
3
4
5
6
7
8
9
10
11
12
func serve(args []string) error {
...
// 获取support,会注册SCC
// Initialize chaincode service
chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem)
...
// 为系统通道部署已经注册的SCC
// deploy system chaincodes
sccp.DeploySysCCs("", ccp)
logger.Infof("Deployed system chaincodes")
...
}

注册SCC

注册SCC的流程:

peer.node -> core.scc -> core.container

peer.node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// startChaincodeServer will finish chaincode related initialization, including:
// 1) setup local chaincode install path
// 2) create chaincode specific tls CA
// 3) start the chaincode specific gRPC listening service
func startChaincodeServer(
peerHost string,
aclProvider aclmgmt.ACLProvider,
pr *platforms.Registry,
ops *operations.System,
) (*chaincode.ChaincodeSupport, ccprovider.ChaincodeProvider, *scc.Provider, *persistence.PackageProvider) {
...
// 会注册SCC
chaincodeSupport, ccp, sccp := registerChaincodeSupport(
ccSrv,
ccEndpoint,
ca,
packageProvider,
aclProvider,
pr,
lifecycleSCC,
ops,
)
go ccSrv.Start()
return chaincodeSupport, ccp, sccp, packageProvider
}

func registerChaincodeSupport(
grpcServer *comm.GRPCServer,
ccEndpoint string,
ca tlsgen.CA,
packageProvider *persistence.PackageProvider,
aclProvider aclmgmt.ACLProvider,
pr *platforms.Registry,
lifecycleSCC *lifecycle.SCC,
ops *operations.System,
) (*chaincode.ChaincodeSupport, ccprovider.ChaincodeProvider, *scc.Provider) {
...
// SCC的VM provider
ipRegistry := inproccontroller.NewRegistry()

// 创建SCC provider
sccp := scc.NewProvider(peer.Default, peer.DefaultSupport, ipRegistry)
// 创建lscc实例
lsccInst := lscc.New(sccp, aclProvider, pr)

// 普通链码,docker容器类型的VM provider
dockerProvider := dockercontroller.NewProvider(
viper.GetString("peer.id"),
viper.GetString("peer.networkId"),
ops.Provider,
)
dockerVM := dockercontroller.NewDockerVM(
dockerProvider.PeerID,
dockerProvider.NetworkID,
dockerProvider.BuildMetrics,
)
...
chaincodeSupport := chaincode.NewChaincodeSupport(
chaincode.GlobalConfig(),
ccEndpoint,
userRunsCC,
ca.CertBytes(),
authenticator,
packageProvider,
lsccInst, // chaincodeSupport的声明周期管理使用了lscc,而不是lifecycle
aclProvider,
container.NewVMController(
map[string]container.VMProvider{
dockercontroller.ContainerType: dockerProvider,
inproccontroller.ContainerType: ipRegistry,
},
),
sccp,
pr,
peer.DefaultSupport,
ops.Provider,
)
ipRegistry.ChaincodeSupport = chaincodeSupport
// chaincode provider,可以用来创建cscc
ccp := chaincode.NewProvider(chaincodeSupport)
...
// 创建cscc、qscc
csccInst := cscc.New(ccp, sccp, aclProvider)
qsccInst := qscc.New(aclProvider)

//Now that chaincode is initialized, register all system chaincodes.
sccs := scc.CreatePluginSysCCs(sccp)
// 加入lscc、cscc、qscc
// lifecycleSCC在1.4中disable了
// sccs是用户自定义的系统链码
for _, cc := range append([]scc.SelfDescribingSysCC{lsccInst, csccInst, qsccInst, lifecycleSCC}, sccs...) {
// 注册每一个SCC
sccp.RegisterSysCC(cc)
}
...
return chaincodeSupport, ccp, sccp
}

core.scc

注册某1个系统合约。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// Provider implements sysccprovider.SystemChaincodeProvider
type Provider struct {
Peer peer.Operations
PeerSupport peer.Support
Registrar Registrar // 注册
SysCCs []SelfDescribingSysCC // 注册的scc,包含失败的
}

// RegisterSysCC registers a system chaincode with the syscc provider.
func (p *Provider) RegisterSysCC(scc SelfDescribingSysCC) {
// 收集/注册scc到scc provider
p.SysCCs = append(p.SysCCs, scc)
_, err := p.registerSysCC(scc)
if err != nil {
sysccLogger.Panicf("Could not register system chaincode: %s", err)
}
}

// registerSysCC registers the given system chaincode with the peer
func (p *Provider) registerSysCC(syscc SelfDescribingSysCC) (bool, error) {
// 检测该scc是否开启或不在白名单
if !syscc.Enabled() || !isWhitelisted(syscc) {
sysccLogger.Info(fmt.Sprintf("system chaincode (%s,%s,%t) disabled", syscc.Name(), syscc.Path(), syscc.Enabled()))
return false, nil
}

// XXX This is an ugly hack, version should be tied to the chaincode instance, not he peer binary
version := util.GetSysCCVersion()

// cc的描述信息
ccid := &ccintf.CCID{
Name: syscc.Name(),
Version: version,
}
// 注册scc的chaincode
err := p.Registrar.Register(ccid, syscc.Chaincode())
if err != nil {
//if the type is registered, the instance may not be... keep going
if _, ok := err.(inproccontroller.SysCCRegisteredErr); !ok {
errStr := fmt.Sprintf("could not register (%s,%v): %s", syscc.Path(), syscc, err)
sysccLogger.Error(errStr)
return false, fmt.Errorf(errStr)
}
}

sysccLogger.Infof("system chaincode %s(%s) registered", syscc.Name(), syscc.Path())
return true, err
}

// Registrar provides a way for system chaincodes to be registered
type Registrar interface {
// Register registers a system chaincode
Register(ccid *ccintf.CCID, cc shim.Chaincode) error
}

core.container

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//Register registers system chaincode with given path. The deploy should be called to initialize
func (r *Registry) Register(ccid *ccintf.CCID, cc shim.Chaincode) error {
r.mutex.Lock()
defer r.mutex.Unlock()

// 注册系统链码
name := ccid.GetName()
inprocLogger.Debugf("Registering chaincode instance: %s", name)
tmp := r.typeRegistry[name]
if tmp != nil {
return SysCCRegisteredErr(name)
}

r.typeRegistry[name] = &inprocContainer{chaincode: cc}
return nil
}


// Registry stores registered system chaincodes.
// It implements container.VMProvider and scc.Registrar
type Registry struct {
mutex sync.Mutex
typeRegistry map[string]*inprocContainer // 已注册链码映射
instRegistry map[string]*inprocContainer // 链码示例映射

ChaincodeSupport ccintf.CCSupport
}

部署SCC

部署SCC的流程:

peer.node -> core.scc -> core.chaincode -> core.container

peer.node

1
2
3
4
5
6
7
8
func serve(args []string) error {
...
// 为系统通道部署已经注册的SCC
// deploy system chaincodes
sccp.DeploySysCCs("", ccp)
logger.Infof("Deployed system chaincodes")
...
}

core.scc

DeploySysCCs会为chainID对应的channel,部署注册过程中收集的每一个SCC,它们在p.SysCCs中。

部署链码实际是一笔交易,为了复用普通链码的部署流程,core.scc使用deploySysCC封装部署链码需要的参数,链码是实际部署,走core.chaincode流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
//DeploySysCCs is the hook for system chaincodes where system chaincodes are registered with the fabric
//note the chaincode must still be deployed and launched like a user chaincode will be
func (p *Provider) DeploySysCCs(chainID string, ccp ccprovider.ChaincodeProvider) {
// 部署每一个scc
for _, sysCC := range p.SysCCs {
deploySysCC(chainID, ccp, sysCC)
}
}

// deploySysCC deploys the given system chaincode on a chain
func deploySysCC(chainID string, ccprov ccprovider.ChaincodeProvider, syscc SelfDescribingSysCC) error {
// disable或不在白名单的scc不执行部署
if !syscc.Enabled() || !isWhitelisted(syscc) {
sysccLogger.Info(fmt.Sprintf("system chaincode (%s,%s) disabled", syscc.Name(), syscc.Path()))
return nil
}

// 为scc生成txid,因为部署链码的过程需要txParams,与普通链码的流程相同
txid := util.GenerateUUID()

// Note, this structure is barely initialized,
// we omit the history query executor, the proposal
// and the signed proposal
txParams := &ccprovider.TransactionParams{
TxID: txid,
ChannelID: chainID,
}

// 设置交易执行模拟器,系统通道chainID为"",所以系统通道的scc没有模拟器
if chainID != "" {
// 获取链/通道的账本
lgr := peer.GetLedger(chainID)
if lgr == nil {
panic(fmt.Sprintf("syschain %s start up failure - unexpected nil ledger for channel %s", syscc.Name(), chainID))
}

// 根据交易id创建链码模拟器
txsim, err := lgr.NewTxSimulator(txid)
if err != nil {
return err
}

// 指定链码执行模拟器
txParams.TXSimulator = txsim
defer txsim.Done()
}

chaincodeID := &pb.ChaincodeID{Path: syscc.Path(), Name: syscc.Name()}
spec := &pb.ChaincodeSpec{Type: pb.ChaincodeSpec_Type(pb.ChaincodeSpec_Type_value["GOLANG"]), ChaincodeId: chaincodeID, Input: &pb.ChaincodeInput{Args: syscc.InitArgs()}}

// ChaincodeDeploymentSpec_SYSTEM标明:部署SCC
chaincodeDeploymentSpec := &pb.ChaincodeDeploymentSpec{ExecEnv: pb.ChaincodeDeploymentSpec_SYSTEM, ChaincodeSpec: spec}

// XXX This is an ugly hack, version should be tied to the chaincode instance, not he peer binary
version := util.GetSysCCVersion()

cccid := &ccprovider.CCContext{
Name: chaincodeDeploymentSpec.ChaincodeSpec.ChaincodeId.Name,
Version: version,
}

// 部署SCC
resp, _, err := ccprov.ExecuteLegacyInit(txParams, cccid, chaincodeDeploymentSpec)
if err == nil && resp.Status != shim.OK {
err = errors.New(resp.Message)
}

sysccLogger.Infof("system chaincode %s/%s(%s) deployed", syscc.Name(), chainID, syscc.Path())

return err
}


// ChaincodeProvider provides an abstraction layer that is
// used for different packages to interact with code in the
// chaincode package without importing it; more methods
// should be added below if necessary
type ChaincodeProvider interface {
// Execute executes a standard chaincode invocation for a chaincode and an input
Execute(txParams *TransactionParams, cccid *CCContext, input *pb.ChaincodeInput) (*pb.Response, *pb.ChaincodeEvent, error)
// ExecuteLegacyInit is a special case for executing chaincode deployment specs,
// which are not already in the LSCC, needed for old lifecycle
ExecuteLegacyInit(txParams *TransactionParams, cccid *CCContext, spec *pb.ChaincodeDeploymentSpec) (*pb.Response, *pb.ChaincodeEvent, error)
// Stop stops the chaincode give
Stop(ccci *ChaincodeContainerInfo) error
}

core.chaincode

CCProviderImpl实现了ChaincodeProvider接口,可以用来部署链码,ExecuteLegacyInit会执行2项:

  1. 启动链码容器
  2. 执行链码Init函数,链码容器启动后,peer和链码容器通过消息通信,ChaincodeMessage_INIT是执行链码容器的Init函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// ExecuteLegacyInit executes a chaincode which is not in the LSCC table
func (c *CCProviderImpl) ExecuteLegacyInit(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, spec *pb.ChaincodeDeploymentSpec) (*pb.Response, *pb.ChaincodeEvent, error) {
return c.cs.ExecuteLegacyInit(txParams, cccid, spec)
}


// ExecuteLegacyInit is a temporary method which should be removed once the old style lifecycle
// is entirely deprecated. Ideally one release after the introduction of the new lifecycle.
// It does not attempt to start the chaincode based on the information from lifecycle, but instead
// accepts the container information directly in the form of a ChaincodeDeploymentSpec.
func (cs *ChaincodeSupport) ExecuteLegacyInit(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, spec *pb.ChaincodeDeploymentSpec) (*pb.Response, *pb.ChaincodeEvent, error) {
// 部署链码需要的信息
ccci := ccprovider.DeploymentSpecToChaincodeContainerInfo(spec)
ccci.Version = cccid.Version

// 启动容器
err := cs.LaunchInit(ccci)
if err != nil {
return nil, nil, err
}

cname := ccci.Name + ":" + ccci.Version
h := cs.HandlerRegistry.Handler(cname)
if h == nil {
return nil, nil, errors.Wrapf(err, "[channel %s] claimed to start chaincode container for %s but could not find handler", txParams.ChannelID, cname)
}

// 调用链码Init
resp, err := cs.execute(pb.ChaincodeMessage_INIT, txParams, cccid, spec.GetChaincodeSpec().Input, h)
return processChaincodeExecutionResult(txParams.TxID, cccid.Name, resp, err)
}

LaunchInit是启动容器的一层检查,实际启动由Launcher.Launch完成。启动链码容器是异步的,会创建单独的goroutine去执行。

core.chaincode使用Runtime接口操控链码容器的启停。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// LaunchInit bypasses getting the chaincode spec from the LSCC table
// as in the case of v1.0-v1.2 lifecycle, the chaincode will not yet be
// defined in the LSCC table
func (cs *ChaincodeSupport) LaunchInit(ccci *ccprovider.ChaincodeContainerInfo) error {
cname := ccci.Name + ":" + ccci.Version
// 已经有handler,即容器已经启动。调用链码的时候,也会获取handler
if cs.HandlerRegistry.Handler(cname) != nil {
return nil
}

// 否则启动容器,设置handler
return cs.Launcher.Launch(ccci)
}

func (r *RuntimeLauncher) Launch(ccci *ccprovider.ChaincodeContainerInfo) error {
var startFailCh chan error
var timeoutCh <-chan time.Time

startTime := time.Now()
cname := ccci.Name + ":" + ccci.Version
launchState, alreadyStarted := r.Registry.Launching(cname)
// 链码容器未启动,启动容器
if !alreadyStarted {
startFailCh = make(chan error, 1)
timeoutCh = time.NewTimer(r.StartupTimeout).C

codePackage, err := r.getCodePackage(ccci)
if err != nil {
return err
}

go func() {
// 启动容器
if err := r.Runtime.Start(ccci, codePackage); err != nil {
startFailCh <- errors.WithMessage(err, "error starting container")
return
}
exitCode, err := r.Runtime.Wait(ccci)
if err != nil {
launchState.Notify(errors.Wrap(err, "failed to wait on container exit"))
}
launchState.Notify(errors.Errorf("container exited with %d", exitCode))
}()
}
...
}

// Runtime is used to manage chaincode runtime instances.
type Runtime interface {
Start(ccci *ccprovider.ChaincodeContainerInfo, codePackage []byte) error
Stop(ccci *ccprovider.ChaincodeContainerInfo) error
Wait(ccci *ccprovider.ChaincodeContainerInfo) (int, error)
}

ContainerRuntime是core.chaincode封装出来和core.container交互的,在这里它会创建启动链码请求,交给container。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// Start launches chaincode in a runtime environment.
func (c *ContainerRuntime) Start(ccci *ccprovider.ChaincodeContainerInfo, codePackage []byte) error {
cname := ccci.Name + ":" + ccci.Version

lc, err := c.LaunchConfig(cname, ccci.Type)
if err != nil {
return err
}

chaincodeLogger.Debugf("start container: %s", cname)
chaincodeLogger.Debugf("start container with args: %s", strings.Join(lc.Args, " "))
chaincodeLogger.Debugf("start container with env:\n\t%s", strings.Join(lc.Envs, "\n\t"))

// 启动链码的请求
scr := container.StartContainerReq{
Builder: &container.PlatformBuilder{
Type: ccci.Type,
Name: ccci.Name,
Version: ccci.Version,
Path: ccci.Path,
CodePackage: codePackage,
PlatformRegistry: c.PlatformRegistry,
},
Args: lc.Args,
Env: lc.Envs,
FilesToUpload: lc.Files,
CCID: ccintf.CCID{
Name: ccci.Name,
Version: ccci.Version,
},
}

// 处理容器操作请求
if err := c.Processor.Process(ccci.ContainerType, scr); err != nil {
return errors.WithMessage(err, "error starting container")
}

return nil
}


// Processor processes vm and container requests.
type Processor interface {
Process(vmtype string, req container.VMCReq) error
}

core.container

VMController实现了Processor,它会按指定的类型建立虚拟机,明明就是容器,为啥内部又叫VM,VM有2种:

  1. InprocVM,意思是运行在单独进程中的虚拟机,但不是指操作系统的进程,而是指一个隔离的环境,SCC是这类。
  2. DockerVM,指利用Docker启动的容器,普通链码就是这类。

类型是存在ccci.ContainerType中的,ccci包含了部署链码所需要的信息,这个信息在core.chaincode很早就获取到了,可以往前翻。

Process就是创建VM,然后利用VM处理请求的过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 根据请求对VM进行某种操作
func (vmc *VMController) Process(vmtype string, req VMCReq) error {
// 创建vm
v := vmc.newVM(vmtype)
ccid := req.GetCCID()
id := ccid.GetName()

vmc.lockContainer(id)
defer vmc.unlockContainer(id)

// 把vm传递给请求,即用该vm执行请求内容
return req.Do(v)
}

虚拟机创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 利用指定类型的vm provider创建vm
func (vmc *VMController) newVM(typ string) VM {
v, ok := vmc.vmProviders[typ]
if !ok {
vmLogger.Panicf("Programming error: unsupported VM type: %s", typ)
}
return v.NewVM()
}

// NewVMController creates a new instance of VMController
func NewVMController(vmProviders map[string]VMProvider) *VMController {
return &VMController{
containerLocks: make(map[string]*refCountedLock),
vmProviders: vmProviders,
}
}

创建VM需要使用NewVMController,回过去找它的创建地方。

在注册SCC的过程中,调用registerChaincodeSupport创建了chaincodeSupport,其中一个字段为创建NewVMController,就包含了2类Vm provider:

  1. ipRegistry,SCC的
  2. dockerProvider,普通链码的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func registerChaincodeSupport(
grpcServer *comm.GRPCServer,
ccEndpoint string,
ca tlsgen.CA,
packageProvider *persistence.PackageProvider,
aclProvider aclmgmt.ACLProvider,
pr *platforms.Registry,
lifecycleSCC *lifecycle.SCC,
ops *operations.System,
) (*chaincode.ChaincodeSupport, ccprovider.ChaincodeProvider, *scc.Provider) {
...
// SCC的VM provider
ipRegistry := inproccontroller.NewRegistry()
...
// 普通链码,docker容器类型的VM provider
dockerProvider := dockercontroller.NewProvider(
viper.GetString("peer.id"),
viper.GetString("peer.networkId"),
ops.Provider,
)
...
chaincodeSupport := chaincode.NewChaincodeSupport(
chaincode.GlobalConfig(),
ccEndpoint,
userRunsCC,
ca.CertBytes(),
authenticator,
packageProvider,
lsccInst, // chaincodeSupport的声明周期管理使用了lscc,而不是lifecycle
aclProvider,
// 创建了VM controller,controller提供了inproc和docker 2中子controller,
// 即2中链码运行方式
container.NewVMController(
map[string]container.VMProvider{
dockercontroller.ContainerType: dockerProvider,
inproccontroller.ContainerType: ipRegistry,
},
),
sccp,
pr,
peer.DefaultSupport,
ops.Provider,
)
...
}

VM处理操作虚拟机的请求

core.container的请求,都实现了VMCReq接口,StartContainerReq、StopContainerReq、WaitContainerReq是实现VMCReq的3类请求。

启动实际是启动虚拟机接口,处理请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//VMCReq - all requests should implement this interface.
//The context should be passed and tested at each layer till we stop
//note that we'd stop on the first method on the stack that does not
//take context
type VMCReq interface {
Do(v VM) error
GetCCID() ccintf.CCID
}


// 启动容器
func (si StartContainerReq) Do(v VM) error {
return v.Start(si.CCID, si.Args, si.Env, si.FilesToUpload, si.Builder)
}

//VM is an abstract virtual image for supporting arbitrary virual machines
type VM interface {
Start(ccid ccintf.CCID, args []string, env []string, filesToUpload map[string][]byte, builder Builder) error
Stop(ccid ccintf.CCID, timeout uint, dontkill bool, dontremove bool) error
Wait(ccid ccintf.CCID) (int, error)
HealthCheck(context.Context) error
}

DockerVM和InprocVM都实现了VM接口,本文只关注InprocVM类型,即SCC的。

InprocVM会得到一个容器实例ipc,用它来运行SCC。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//Start starts a previously registered system codechain
func (vm *InprocVM) Start(ccid ccintf.CCID, args []string, env []string, filesToUpload map[string][]byte, builder container.Builder) error {
path := ccid.GetName()

ipctemplate := vm.registry.getType(path)
if ipctemplate == nil {
return fmt.Errorf(fmt.Sprintf("%s not registered", path))
}

// 即ccid.Name
instName := vm.GetVMName(ccid)

// 获取容器实例
ipc, err := vm.getInstance(ipctemplate, instName, args, env)
if err != nil {
return fmt.Errorf(fmt.Sprintf("could not create instance for %s", instName))
}

// 已经在运行了,还部署个啥!
if ipc.running {
return fmt.Errorf(fmt.Sprintf("chaincode running %s", path))
}

ipc.running = true

go func() {
defer func() {
if r := recover(); r != nil {
inprocLogger.Criticalf("caught panic from chaincode %s", instName)
}
}()
// 启动进程级容器
ipc.launchInProc(instName, args, env)
}()

return nil
}

inprocContainer开启2个goroutine:

  1. 第一个调用shimStartInProc,即利用core.chaincode.shim启动InProc类型的容器。
  2. 第二个调用HandleChaincodeStream,处理peer和Inproc容器间的通信数据,此处的stream是peer端的。

这里可以看到创建了2个通道peerRcvCCSendccRcvPeerSend,它们表明了peer和scc的链码容器是通过通道直接通信的。peer和docker链码容器之间是走gRPC通信的,这个到普通链码的时候再介绍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 从进程启动链码
func (ipc *inprocContainer) launchInProc(id string, args []string, env []string) error {
if ipc.ChaincodeSupport == nil {
inprocLogger.Panicf("Chaincode support is nil, most likely you forgot to set it immediately after calling inproccontroller.NewRegsitry()")
}

// 和调用链码的上层通信的2个通道
peerRcvCCSend := make(chan *pb.ChaincodeMessage)
ccRcvPeerSend := make(chan *pb.ChaincodeMessage)
var err error
ccchan := make(chan struct{}, 1)
ccsupportchan := make(chan struct{}, 1)
shimStartInProc := _shimStartInProc // shadow to avoid race in test
go func() {
defer close(ccchan)
// 启动链码
inprocLogger.Debugf("chaincode started for %s", id)
if args == nil {
args = ipc.args
}
if env == nil {
env = ipc.env
}
// 利用shim启动
err := shimStartInProc(env, args, ipc.chaincode, ccRcvPeerSend, peerRcvCCSend)
if err != nil {
err = fmt.Errorf("chaincode-support ended with err: %s", err)
_inprocLoggerErrorf("%s", err)
}
inprocLogger.Debugf("chaincode ended for %s with err: %s", id, err)
}()

// shadow function to avoid data race
inprocLoggerErrorf := _inprocLoggerErrorf
go func() {
defer close(ccsupportchan)
// 处理scc和外部通信的消息流
inprocStream := newInProcStream(peerRcvCCSend, ccRcvPeerSend)
inprocLogger.Debugf("chaincode-support started for %s", id)
err := ipc.ChaincodeSupport.HandleChaincodeStream(inprocStream)
if err != nil {
err = fmt.Errorf("chaincode ended with err: %s", err)
inprocLoggerErrorf("%s", err)
}
inprocLogger.Debugf("chaincode-support ended for %s with err: %s", id, err)
}()
}

利用shim启动Inproc链码容器中的程序

shim是chaincode提供给容器,运行链码的工具,它运行在容器里。

利用shim启动InprocVM使用的函数是StartInProc,提取一些运行链码需要的数据,比如又一个stream,此处的stream是容器端的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 启动SCC的入口
// StartInProc is an entry point for system chaincodes bootstrap. It is not an
// API for chaincodes.
func StartInProc(env []string, args []string, cc Chaincode, recv <-chan *pb.ChaincodeMessage, send chan<- *pb.ChaincodeMessage) error {
// 有点奇怪,这些日志都没有看到,因为已经在shim,不属于peer日志了
chaincodeLogger.Debugf("in proc %v", args)

// 从环境变量获取cc name
var chaincodename string
for _, v := range env {
if strings.Index(v, "CORE_CHAINCODE_ID_NAME=") == 0 {
p := strings.SplitAfter(v, "CORE_CHAINCODE_ID_NAME=")
chaincodename = p[1]
break
}
}
if chaincodename == "" {
return errors.New("error chaincode id not provided")
}

// 创建peer和chaincode通信的通道
stream := newInProcStream(recv, send)
chaincodeLogger.Debugf("starting chat with peer using name=%s", chaincodename)
// 与peer进行通信
err := chatWithPeer(chaincodename, stream, cc)
return err
}

chatWithPeer是通用的,普通的链码也调用这个程序。它创建了一个handler,用来处理消息(发送和接收),以及操作(调用)链码。

这个过程,它会向peer发送REGISTER消息,和peer先“握手”,也会从peer读消息,消息的处理函数就是里面的for循环,这样链码容器就运行起来了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// 通用,SCC和CC都使用这个函数
func chatWithPeer(chaincodename string, stream PeerChaincodeStream, cc Chaincode) error {
// 把stream和cc交给handler,handler可以发送和接收数据,即读写通道
// Create the shim handler responsible for all control logic
handler := newChaincodeHandler(stream, cc)
defer stream.CloseSend()

// Send the ChaincodeID during register.
chaincodeID := &pb.ChaincodeID{Name: chaincodename}
payload, err := proto.Marshal(chaincodeID)
if err != nil {
return errors.Wrap(err, "error marshalling chaincodeID during chaincode registration")
}

// 在stream上向peer发送注册消息
// Register on the stream
chaincodeLogger.Debugf("Registering.. sending %s", pb.ChaincodeMessage_REGISTER)
if err = handler.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTER, Payload: payload}); err != nil {
return errors.WithMessage(err, "error sending chaincode REGISTER")
}

// holds return values from gRPC Recv below
type recvMsg struct {
msg *pb.ChaincodeMessage
err error
}
msgAvail := make(chan *recvMsg, 1)
errc := make(chan error)

receiveMessage := func() {
in, err := stream.Recv()
msgAvail <- &recvMsg{in, err}
}

// 异步读取1个消息
go receiveMessage()

// 循环处理peer发送的消息
for {
select {
case rmsg := <-msgAvail:
switch {
case rmsg.err == io.EOF:
err = errors.Wrapf(rmsg.err, "received EOF, ending chaincode stream")
chaincodeLogger.Debugf("%+v", err)
return err
case rmsg.err != nil:
err := errors.Wrap(rmsg.err, "receive failed")
chaincodeLogger.Errorf("Received error from server, ending chaincode stream: %+v", err)
return err
case rmsg.msg == nil:
err := errors.New("received nil message, ending chaincode stream")
chaincodeLogger.Debugf("%+v", err)
return err
default:
// 处理消息
chaincodeLogger.Debugf("[%s]Received message %s from peer", shorttxid(rmsg.msg.Txid), rmsg.msg.Type)
err := handler.handleMessage(rmsg.msg, errc)
if err != nil {
err = errors.WithMessage(err, "error handling message")
return err
}

// 读取下一个消息
go receiveMessage()
}

case sendErr := <-errc:
if sendErr != nil {
err := errors.Wrap(sendErr, "error sending")
return err
}
}
}
}

具体的消息处理函数,先跳过,回过头来,关注scc容器和peer的通信。

SCC和Peer的通信通道

链码容器和Peer之间使用Stream进行通信,Stream有2种实现:

  1. 使用channel封装的Stream
  2. gRPC的Stream

链码容器和Peer通信的接口是:

1
2
3
4
5
6
// PeerChaincodeStream interface for stream between Peer and chaincode instance.
type PeerChaincodeStream interface {
Send(*pb.ChaincodeMessage) error
Recv() (*pb.ChaincodeMessage, error)
CloseSend() error
}

普通链码使用gRPC:

1
2
3
type chaincodeSupportRegisterClient struct {
grpc.ClientStream
}

系统链码直接使用通道通信,发送和接收消息都在下面了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// peer和chaincode之间通信的通道
// PeerChaincodeStream interface for stream between Peer and chaincode instance.
type inProcStream struct {
recv <-chan *pb.ChaincodeMessage
send chan<- *pb.ChaincodeMessage
}

func newInProcStream(recv <-chan *pb.ChaincodeMessage, send chan<- *pb.ChaincodeMessage) *inProcStream {
return &inProcStream{recv, send}
}

// 发送其实就是向send写数据
func (s *inProcStream) Send(msg *pb.ChaincodeMessage) (err error) {
err = nil

//send may happen on a closed channel when the system is
//shutting down. Just catch the exception and return error
defer func() {
if r := recover(); r != nil {
err = SendPanicFailure(fmt.Sprintf("%s", r))
return
}
}()
s.send <- msg
return
}

// 接收是从recv读数据
func (s *inProcStream) Recv() (*pb.ChaincodeMessage, error) {
msg, ok := <-s.recv
if !ok {
return nil, errors.New("channel is closed")
}
return msg, nil
}

func (s *inProcStream) CloseSend() error {
// 实际啥也没做
return nil
}

Peer和链码容器的交互,完成链码容器启动

部署链码需要Peer和链码容器交互,不然Peer怎么知道链码容器已经启动。以下是一份peer的DEBUG日志,在下面标注了启动容器和链码Init过程中的消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
2019-09-09 07:52:09.915 UTC [chaincode] LaunchConfig -> DEBU 098 launchConfig: executable:"chaincode",Args:[chaincode,-peer.address=peer0.org1.example.com:7052],Envs:[CORE_CHAINCODE_LOGGING_LEVEL=info,CORE_CHAINCODE_LOGGING_SHIM=warning,CORE_CHAINCODE_LOGGING_FORMAT=%{color}%{time:2006-01-02 15:04:05.000 MST} [%{module}] %{shortfunc} -> %{level:.4s} %{id:03x}%{color:reset} %{message},CORE_CHAINCODE_ID_NAME=lscc:1.4.3,CORE_PEER_TLS_ENABLED=true,CORE_TLS_CLIENT_KEY_PATH=/etc/hyperledger/fabric/client.key,CORE_TLS_CLIENT_CERT_PATH=/etc/hyperledger/fabric/client.crt,CORE_PEER_TLS_ROOTCERT_FILE=/etc/hyperledger/fabric/peer.crt],Files:[/etc/hyperledger/fabric/client.crt /etc/hyperledger/fabric/client.key /etc/hyperledger/fabric/peer.crt]
2019-09-09 07:52:09.915 UTC [chaincode] Start -> DEBU 099 start container: lscc:1.4.3
2019-09-09 07:52:09.915 UTC [chaincode] Start -> DEBU 09a start container with args: chaincode -peer.address=peer0.org1.example.com:7052
2019-09-09 07:52:09.915 UTC [chaincode] Start -> DEBU 09b start container with env:
CORE_CHAINCODE_LOGGING_LEVEL=info
CORE_CHAINCODE_LOGGING_SHIM=warning
CORE_CHAINCODE_LOGGING_FORMAT=%{color}%{time:2006-01-02 15:04:05.000 MST} [%{module}] %{shortfunc} -> %{level:.4s} %{id:03x}%{color:reset} %{message}
CORE_CHAINCODE_ID_NAME=lscc:1.4.3
CORE_PEER_TLS_ENABLED=true
CORE_TLS_CLIENT_KEY_PATH=/etc/hyperledger/fabric/client.key
CORE_TLS_CLIENT_CERT_PATH=/etc/hyperledger/fabric/client.crt
CORE_PEER_TLS_ROOTCERT_FILE=/etc/hyperledger/fabric/peer.crt
2019-09-09 07:52:09.915 UTC [container] lockContainer -> DEBU 09c waiting for container(lscc-1.4.3) lock
2019-09-09 07:52:09.915 UTC [container] lockContainer -> DEBU 09d got container (lscc-1.4.3) lock
2019-09-09 07:52:09.915 UTC [inproccontroller] getInstance -> DEBU 09e chaincode instance created for lscc-1.4.3
2019-09-09 07:52:09.915 UTC [container] unlockContainer -> DEBU 09f container lock deleted(lscc-1.4.3)
2019-09-09 07:52:09.915 UTC [container] lockContainer -> DEBU 0a0 waiting for container(lscc-1.4.3) lock
2019-09-09 07:52:09.915 UTC [container] lockContainer -> DEBU 0a1 got container (lscc-1.4.3) lock
2019-09-09 07:52:09.915 UTC [container] unlockContainer -> DEBU 0a2 container lock deleted(lscc-1.4.3)
2019-09-09 07:52:09.915 UTC [inproccontroller] func2 -> DEBU 0a3 chaincode-support started for lscc-1.4.3
2019-09-09 07:52:09.915 UTC [inproccontroller] func1 -> DEBU 0a4 chaincode started for lscc-1.4.3
// 以上日志对应的代码流程在上文都讲到了

// 以下是交互过程peer日志
// peer收到容器的注册消息
2019-09-09 07:52:09.916 UTC [chaincode] handleMessage -> DEBU 0a5 [] Fabric side handling ChaincodeMessage of type: REGISTER in state created
2019-09-09 07:52:09.916 UTC [chaincode] HandleRegister -> DEBU 0a6 Received REGISTER in state created
2019-09-09 07:52:09.916 UTC [chaincode] Register -> DEBU 0a7 registered handler complete for chaincode lscc:1.4.3
2019-09-09 07:52:09.916 UTC [chaincode] HandleRegister -> DEBU 0a8 Got REGISTER for chaincodeID = name:"lscc:1.4.3" , sending back REGISTERED
2019-09-09 07:52:09.920 UTC [grpc] HandleSubConnStateChange -> DEBU 0a9 pickfirstBalancer: HandleSubConnStateChange: 0xc0026318c0, READY
2019-09-09 07:52:09.923 UTC [chaincode] HandleRegister -> DEBU 0aa Changed state to established for name:"lscc:1.4.3"

// peer发送ready消息
2019-09-09 07:52:09.923 UTC [chaincode] sendReady -> DEBU 0ab sending READY for chaincode name:"lscc:1.4.3"
2019-09-09 07:52:09.923 UTC [chaincode] sendReady -> DEBU 0ac Changed to state ready for chaincode name:"lscc:1.4.3"

// 已经完成启动容器
2019-09-09 07:52:09.923 UTC [chaincode] Launch -> DEBU 0ad launch complete
2019-09-09 07:52:09.924 UTC [chaincode] Execute -> DEBU 0ae Entry
// 收到容器COMPLETED消息
2019-09-09 07:52:09.925 UTC [chaincode] handleMessage -> DEBU 0af [01b03aae] Fabric side handling ChaincodeMessage of type: COMPLETED in state ready

// 通知scc,部署已经完成
2019-09-09 07:52:09.925 UTC [chaincode] Notify -> DEBU 0b0 [01b03aae] notifying Txid:01b03aae-17a6-4b63-874e-dc20d6f5df0c, channelID:
2019-09-09 07:52:09.925 UTC [chaincode] Execute -> DEBU 0b1 Exit
2019-09-09 07:52:09.925 UTC [sccapi] deploySysCC -> INFO 0b2 system chaincode lscc/(github.com/hyperledger/fabric/core/scc/lscc) deployed

可以到REGISTER、READY、COMPLETED等消息,以及状态的改变:created、ready。

但前面还没有介绍Peer和链码容器之间的通信,所以不展示代码了,展示一下Peer和链码容器的消息交互图:

endorser policy

背书策略是Fabric中的一个重要一环,想梳理一下背书策略的上链和使用流程。

背书策略是部署和升级链码时使用的,需要发送配置交易,所以尝试了从背书节点收到交易,然后处理交易的流程入手,找到背书策略的入口,结果毫无头绪。

换一种思路,从使用入手,向上追溯,这种就非常顺利了。

从背书策略的使用入手

VSCC会利用背书策略,并且背书策略不满足时会返回一个:背书策略不满足的错误,每一个上链的交易详细中都有这么一个Validation字段,为0代表有效交易,否则是无效交易,并用数字表示原因,背书策略不满足的序号就是10。

1
2
3
4
5
6
7
type TxValidationCode int32

const (
...
TxValidationCode_ENDORSEMENT_POLICY_FAILURE TxValidationCode = 10
...
)

TxValidationCode_ENDORSEMENT_POLICY_FAILUREVSCCValidateTx使用,系统链码和普通链码都有背书策略需要满足,下面代码片是普通链码部分,可以发现调用VSCCValidateTxForCC验证交易。

1
2
3
4
5
6
7
8
9
10
11
12
13
// VSCCValidateTx executes vscc validation for transaction
func (v *VsccValidatorImpl) VSCCValidateTx(seq int, payload *common.Payload, envBytes []byte, block *common.Block) (error, peer.TxValidationCode) {
...
if err = v.VSCCValidateTxForCC(ctx); err != nil {
switch err.(type) {
case *commonerrors.VSCCEndorsementPolicyError:
return err, peer.TxValidationCode_ENDORSEMENT_POLICY_FAILURE
default:
return err, peer.TxValidationCode_INVALID_OTHER_REASON
}
}
...
}

每个chaincode都会提供escc和vscc,现在都是默认的,也就是说escc和vscc都可以是具备可插拔的。

1
2
3
peer chaincode list -C mychannel --instantiated
Get instantiated chaincodes on channel mychannel:
Name: mycc, Version: 1.1, Path: github.com/chaincode/chaincode_example02/go/, Escc: escc, Vscc: vscc

VSCCValidateTxForCC会从交易的context中获取验证插件,然后利用插件验证交易。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func (v *VsccValidatorImpl) VSCCValidateTxForCC(ctx *Context) error {
logger.Debug("Validating", ctx, "with plugin")
// 使用插件验证交易
err := v.pluginValidator.ValidateWithPlugin(ctx)
if err == nil {
return nil
}
// If the error is a pluggable validation execution error, cast it to the common errors ExecutionFailureError.
if e, isExecutionError := err.(*validation.ExecutionFailureError); isExecutionError {
return &commonerrors.VSCCExecutionFailureError{Err: e}
}
// Else, treat it as an endorsement error.
return &commonerrors.VSCCEndorsementPolicyError{Err: err}
}

func (pv *PluginValidator) ValidateWithPlugin(ctx *Context) error {
// 获取vscc插件
plugin, err := pv.getOrCreatePlugin(ctx)
if err != nil {
return &validation.ExecutionFailureError{
Reason: fmt.Sprintf("plugin with name %s couldn't be used: %v", ctx.VSCCName, err),
}
}
// 利用插件验证
err = plugin.Validate(ctx.Block, ctx.Namespace, ctx.Seq, 0, SerializedPolicy(ctx.Policy))
validityStatus := "valid"
if err != nil {
validityStatus = fmt.Sprintf("invalid: %v", err)
}
logger.Debug("Transaction", ctx.TxID, "appears to be", validityStatus)
return err
}

// Plugin validates transactions
type Plugin interface {
// Validate returns nil if the action at the given position inside the transaction
// at the given position in the given block is valid, or an error if not.
Validate(block *common.Block, namespace string, txPosition int, actionPosition int, contextData ...ContextDatum) error

// Init injects dependencies into the instance of the Plugin
Init(dependencies ...Dependency) error
}

当前验证插件有2种实现,TxValidatorV1_2V1_3Validation,Validate还从context取出了序列化的背书策略,vscc会调用PolicyEvalutor交易的背书是否满足背书策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
func (v *DefaultValidation) Validate(block *common.Block, namespace string, txPosition int, actionPosition int, contextData ...validation.ContextDatum) error {
if len(contextData) == 0 {
logger.Panicf("Expected to receive policy bytes in context data")
}

// 拿到序列化后的policy
serializedPolicy, isSerializedPolicy := contextData[0].(SerializedPolicy)
if !isSerializedPolicy {
logger.Panicf("Expected to receive a serialized policy in the first context data")
}
if block == nil || block.Data == nil {
return errors.New("empty block")
}
if txPosition >= len(block.Data.Data) {
return errors.Errorf("block has only %d transactions, but requested tx at position %d", len(block.Data.Data), txPosition)
}
if block.Header == nil {
return errors.Errorf("no block header")
}

// 调用不同版本的validator进行验证
var err error
switch {
case v.Capabilities.V1_3Validation():
err = v.TxValidatorV1_3.Validate(block, namespace, txPosition, actionPosition, serializedPolicy.Bytes())

case v.Capabilities.V1_2Validation():
fallthrough

default:
err = v.TxValidatorV1_2.Validate(block, namespace, txPosition, actionPosition, serializedPolicy.Bytes())
}

logger.Debugf("block %d, namespace: %s, tx %d validation results is: %v", block.Header.Number, namespace, txPosition, err)
return convertErrorTypeOrPanic(err)
}

// 验证代码使用v2/validation_logic.go中的实现
// Validate validates the given envelope corresponding to a transaction with an endorsement
// policy as given in its serialized form
func (vscc *Validator) Validate(
block *common.Block,
namespace string,
txPosition int,
actionPosition int,
policyBytes []byte,
) commonerrors.TxValidationError {
...
// evaluate the signature set against the policy
err = vscc.policyEvaluator.Evaluate(policyBytes, signatureSet)
if err != nil {
logger.Warningf("Endorsement policy failure for transaction txid=%s, err: %s", chdr.GetTxId(), err.Error())
if len(signatureSet) < len(cap.Action.Endorsements) {
// Warning: duplicated identities exist, endorsement failure might be cause by this reason
return policyErr(errors.New(DUPLICATED_IDENTITY_ERROR))
}
return policyErr(fmt.Errorf("VSCC error: endorsement policy failure, err: %s", err))
}
...
}

// PolicyEvaluator evaluates policies
type PolicyEvaluator interface {
validation.Dependency

// Evaluate takes a set of SignedData and evaluates whether this set of signatures satisfies
// the policy with the given bytes
Evaluate(policyBytes []byte, signatureSet []*common.SignedData) error
}

Evaluate会创建背书策略实例,然后利用背书策略验证背书签名。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Evaluate takes a set of SignedData and evaluates whether this set of signatures satisfies the policy
func (id *PolicyEvaluator) Evaluate(policyBytes []byte, signatureSet []*common.SignedData) error {
pp := cauthdsl.NewPolicyProvider(id.IdentityDeserializer)
policy, _, err := pp.NewPolicy(policyBytes)
if err != nil {
return err
}
return policy.Evaluate(signatureSet)
}

// Policy is used to determine if a signature is valid
type Policy interface {
// Evaluate takes a set of SignedData and evaluates whether this set of signatures satisfies the policy
Evaluate(signatureSet []*cb.SignedData) error
}

// Evaluate takes a set of SignedData and evaluates whether this set of signatures satisfies the policy
func (p *policy) Evaluate(signatureSet []*cb.SignedData) error {
if p == nil {
return fmt.Errorf("No such policy")
}
idAndS := make([]IdentityAndSignature, len(signatureSet))
for i, sd := range signatureSet {
idAndS[i] = &deserializeAndVerify{
signedData: sd,
deserializer: p.deserializer,
}
}

ok := p.evaluator(deduplicate(idAndS), make([]bool, len(signatureSet)))
if !ok {
return errors.New("signature set did not satisfy policy")
}
return nil
}

具体背书验证签名的实现,当下就先不关心了。回过头来想一下,VSCC从哪拿到了背书策略?

VSCC的背书策略哪来的?

回到上文第一次出现背书策略的地方:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (pv *PluginValidator) ValidateWithPlugin(ctx *Context) error {
err = plugin.Validate(ctx.Block, ctx.Namespace, ctx.Seq, 0, SerializedPolicy(ctx.Policy))
}

// Context defines information about a transaction
// that is being validated
type Context struct {
Seq int
Envelope []byte
TxID string
Channel string
VSCCName string
Policy []byte // 背书策略
Namespace string
Block *common.Block
}

VSCCValidateTx函数会创建Context,填写policy字段,其中policy是调用GetInfoForValidate获取的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (v *VsccValidatorImpl) VSCCValidateTx(seq int, payload *common.Payload, envBytes []byte, block *common.Block) (error, peer.TxValidationCode) {
...
// 普通链码
if !v.sccprovider.IsSysCC(ccID) {
...
// 获取policy、vscc等
// Get latest chaincode version, vscc and validate policy
txcc, vscc, policy, err := v.GetInfoForValidate(chdr, ns)
...
// do VSCC validation
ctx := &Context{
Seq: seq,
Envelope: envBytes,
Block: block,
TxID: chdr.TxId,
Channel: chdr.ChannelId,
Namespace: ns,
Policy: policy, // Here
VSCCName: vscc.ChaincodeName,
}
if err = v.VSCCValidateTxForCC(ctx); err != nil {
switch err.(type) {
case *commonerrors.VSCCEndorsementPolicyError:
return err, peer.TxValidationCode_ENDORSEMENT_POLICY_FAILURE
default:
return err, peer.TxValidationCode_INVALID_OTHER_REASON
}
}
} else {
// SCC
}
}

GetInfoForValidate先是获取了ChaincodeDefinition,它记录了peer对某个链码的proposal背书和验证的必要信息,然后利用ChaincodeDefinition.Validation获取了policy。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// GetInfoForValidate gets the ChaincodeInstance(with latest version) of tx, vscc and policy from lscc
func (v *VsccValidatorImpl) GetInfoForValidate(chdr *common.ChannelHeader, ccID string) (*sysccprovider.ChaincodeInstance, *sysccprovider.ChaincodeInstance, []byte, error) {
cc := &sysccprovider.ChaincodeInstance{
ChainID: chdr.ChannelId,
ChaincodeName: ccID,
ChaincodeVersion: coreUtil.GetSysCCVersion(),
}
vscc := &sysccprovider.ChaincodeInstance{
ChainID: chdr.ChannelId,
ChaincodeName: "vscc", // default vscc for system chaincodes
ChaincodeVersion: coreUtil.GetSysCCVersion(), // Get vscc version
}
var policy []byte
var err error
if !v.sccprovider.IsSysCC(ccID) {
// when we are validating a chaincode that is not a
// system CC, we need to ask the CC to give us the name
// of VSCC and of the policy that should be used

// obtain name of the VSCC and the policy
// 获取cc 定义
cd, err := v.getCDataForCC(chdr.ChannelId, ccID)
if err != nil {
msg := fmt.Sprintf("Unable to get chaincode data from ledger for txid %s, due to %s", chdr.TxId, err)
logger.Errorf(msg)
return nil, nil, nil, err
}
cc.ChaincodeName = cd.CCName()
cc.ChaincodeVersion = cd.CCVersion()
// 拿到policy
vscc.ChaincodeName, policy = cd.Validation()
} else {
// when we are validating a system CC, we use the default
// VSCC and a default policy that requires one signature
// from any of the members of the channel
p := cauthdsl.SignedByAnyMember(v.support.GetMSPIDs(chdr.ChannelId))
policy, err = utils.Marshal(p)
if err != nil {
return nil, nil, nil, err
}
}

return cc, vscc, policy, nil
}

//-------- ChaincodeDefinition - interface for ChaincodeData ------
// ChaincodeDefinition describes all of the necessary information for a peer to decide whether to endorse
// a proposal and whether to validate a transaction, for a particular chaincode.
type ChaincodeDefinition interface {
// CCName returns the name of this chaincode (the name it was put in the ChaincodeRegistry with).
CCName() string

// Hash returns the hash of the chaincode.
Hash() []byte

// CCVersion returns the version of the chaincode.
CCVersion() string

// Validation returns how to validate transactions for this chaincode.
// The string returned is the name of the validation method (usually 'vscc')
// and the bytes returned are the argument to the validation (in the case of
// 'vscc', this is a marshaled pb.VSCCArgs message).
Validation() (string, []byte)

// Endorsement returns how to endorse proposals for this chaincode.
// The string returns is the name of the endorsement method (usually 'escc').
Endorsement() string
}

ChaincodeData实现了ChaincodeDefinition接口,ChaincodeData是LSCC保存的数据,它其中有1个字段就是Policy。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// Validation returns how to validate transactions for this chaincode.
// The string returned is the name of the validation method (usually 'vscc')
// and the bytes returned are the argument to the validation (in the case of
// 'vscc', this is a marshaled pb.VSCCArgs message).
func (cd *ChaincodeData) Validation() (string, []byte) {
return cd.Vscc, cd.Policy
}

//-------- ChaincodeData is stored on the LSCC -------

// ChaincodeData defines the datastructure for chaincodes to be serialized by proto
// Type provides an additional check by directing to use a specific package after instantiation
// Data is Type specifc (see CDSPackage and SignedCDSPackage)
type ChaincodeData struct {
// Name of the chaincode
Name string `protobuf:"bytes,1,opt,name=name"`

// Version of the chaincode
Version string `protobuf:"bytes,2,opt,name=version"`

// Escc for the chaincode instance
Escc string `protobuf:"bytes,3,opt,name=escc"`

// Vscc for the chaincode instance
Vscc string `protobuf:"bytes,4,opt,name=vscc"`

// 背书策略
// Policy endorsement policy for the chaincode instance
Policy []byte `protobuf:"bytes,5,opt,name=policy,proto3"`

// Data data specific to the package
Data []byte `protobuf:"bytes,6,opt,name=data,proto3"`

// Id of the chaincode that's the unique fingerprint for the CC This is not
// currently used anywhere but serves as a good eyecatcher
Id []byte `protobuf:"bytes,7,opt,name=id,proto3"`

// InstantiationPolicy for the chaincode
InstantiationPolicy []byte `protobuf:"bytes,8,opt,name=instantiation_policy,proto3"`
}

LSCC的Policy哪来的?

提醒:链码实例化在代码里使用Deploy,而不是Instantiate,这样可以让代码更简洁,所以链码实例化也常称为链码部署。

executeDeploy为部署链码,也就是在部署链码的时候会保存背书策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// executeDeploy implements the "instantiate" Invoke transaction
func (lscc *LifeCycleSysCC) executeDeploy(
stub shim.ChaincodeStubInterface,
chainname string,
cds *pb.ChaincodeDeploymentSpec,
policy []byte,
escc []byte,
vscc []byte,
cdfs *ccprovider.ChaincodeData,
ccpackfs ccprovider.CCPackage,
collectionConfigBytes []byte,
) (*ccprovider.ChaincodeData, error) {
//just test for existence of the chaincode in the LSCC
chaincodeName := cds.ChaincodeSpec.ChaincodeId.Name
_, err := lscc.getCCInstance(stub, chaincodeName)
if err == nil {
return nil, ExistsErr(chaincodeName)
}

//retain chaincode specific data and fill channel specific ones
cdfs.Escc = string(escc)
cdfs.Vscc = string(vscc)
// 保存背书策略
cdfs.Policy = policy
}

executeDeployOrUpgrade是执行链码实例化和升级时调用,它会传递Policy,在链码部署和升级时都会保存背书策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// executeDeployOrUpgrade routes the code path either to executeDeploy or executeUpgrade
// depending on its function argument
func (lscc *LifeCycleSysCC) executeDeployOrUpgrade(
stub shim.ChaincodeStubInterface,
chainname string,
cds *pb.ChaincodeDeploymentSpec,
policy, escc, vscc, collectionConfigBytes []byte,
function string,
) (*ccprovider.ChaincodeData, error) {

chaincodeName := cds.ChaincodeSpec.ChaincodeId.Name
chaincodeVersion := cds.ChaincodeSpec.ChaincodeId.Version

if err := lscc.isValidChaincodeName(chaincodeName); err != nil {
return nil, err
}

if err := lscc.isValidChaincodeVersion(chaincodeName, chaincodeVersion); err != nil {
return nil, err
}

ccpack, err := lscc.Support.GetChaincodeFromLocalStorage(chaincodeName, chaincodeVersion)
if err != nil {
retErrMsg := fmt.Sprintf("cannot get package for chaincode (%s:%s)", chaincodeName, chaincodeVersion)
logger.Errorf("%s-err:%s", retErrMsg, err)
return nil, fmt.Errorf("%s", retErrMsg)
}
cd := ccpack.GetChaincodeData()

switch function {
case DEPLOY:
return lscc.executeDeploy(stub, chainname, cds, policy, escc, vscc, cd, ccpack, collectionConfigBytes)
case UPGRADE:
return lscc.executeUpgrade(stub, chainname, cds, policy, escc, vscc, cd, ccpack, collectionConfigBytes)
default:
logger.Panicf("Programming error, unexpected function '%s'", function)
panic("") // unreachable code
}
}

LSCC也实现了ChainCode接口,与普通链码的实现并没有区别,只不过LSCC并不运行在容器中。LifeCycleSysCC.Invoke会根据参数调用不同的函数,而部署和升级时,会调用executeDeployOrUpgrade部署链码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// Invoke implements lifecycle functions "deploy", "start", "stop", "upgrade".
// Deploy's arguments - {[]byte("deploy"), []byte(<chainname>), <unmarshalled pb.ChaincodeDeploymentSpec>}
//
// Invoke also implements some query-like functions
// Get chaincode arguments - {[]byte("getid"), []byte(<chainname>), []byte(<chaincodename>)}
func (lscc *LifeCycleSysCC) Invoke(stub shim.ChaincodeStubInterface) pb.Response {
...
switch function {
case INSTALL:
...
case DEPLOY, UPGRADE:
// 提取背书策略
// optional arguments here (they can each be nil and may or may not be present)
// args[3] is a marshalled SignaturePolicyEnvelope representing the endorsement policy
// args[4] is the name of escc
// args[5] is the name of vscc
// args[6] is a marshalled CollectionConfigPackage struct
var EP []byte
if len(args) > 3 && len(args[3]) > 0 {
EP = args[3]
} else {
p := cauthdsl.SignedByAnyMember(peer.GetMSPIDs(channel))
EP, err = utils.Marshal(p)
if err != nil {
return shim.Error(err.Error())
}
}
...
cd, err := lscc.executeDeployOrUpgrade(stub, channel, cds, EP, escc, vscc, collectionsConfig, function)
...
case ...:
...
}
}

总结

我们终于知道Policy是哪来的,又是如何被使用的了。管理和查看链码信息,本质是创建一个调用LSCC的Proposal或者交易,链码的信息会保存在LSCC,当VSCC验证链码的交易时,会从LSCC获取信息,包括背书策略、vscc插件等,以验证交易。

最后,ESCC、VSCC也是进行了可插拔设计的。

endorser policy

序言

etcd raft定义了一些重要的结构体,来传递和表示raft使用到的数据。

在介绍各结构体之前,先澄清一下raft、log和state machine的关系,它们三个是独立的,没有隶属关系,尤其是state machine并不属于raft。

State machine

Consensus Module指raft算法,它输出一致的Log Entry序列,State machine指应用Entry后得到的状态,状态机是并不是raft的一部分,而是用来存储数据的模块。

Entry

每个Raft集群节点都是一个状态机,每个节点都使用相同的log entry序列修改状态机的数据,Entry就是每一个操作项,raft的核心能力就是为应用层提供序列相同的entry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type Entry struct {
Term uint64 `protobuf:"varint,2,opt,name=Term" json:"Term"`
Index uint64 `protobuf:"varint,3,opt,name=Index" json:"Index"`
Type EntryType `protobuf:"varint,1,opt,name=Type,enum=raftpb.EntryType" json:"Type"`
Data []byte `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"`
}

type EntryType int32

const (
EntryNormal EntryType = 0
EntryConfChange EntryType = 1
EntryConfChangeV2 EntryType = 2
)

每一个Entry,都可以使用(Term, Index)进行唯一标记,相当于Entry的ID:

  • Term:即raft论文中的Term,表明了当前Entry所属的Term。raft不使用绝对时间,而是使用相对时间,它把时间分割成了大小不等的term,每一轮选举都会开启一个新的term,term值会连续累加。如果当前的节点已经是Term 10缺收到了Term 8的Entry,Term 8的Entry已经过时,会被丢弃。
  • Index:每一个Entry都有一个的Index,代表当前Entry在log entry序列中的位置,每个index上最终只有1个达成共识的Entry。

除了用于达成一致的Term和Index外,Entry还携带了数据:

  • Type:表明当前Entry的类型,EntryNormal代表是Entry携带的是修改状态机的操作数据,EntryConfChangeEntryConfChangeV2代表的是Entry携带的是修改当前raft集群的配置。
  • Data:是序列化的数据,不同的Type类型,对应不同的Data。

Snapshot

在Entry特别多的场景下,会存在一些问题,比如现在有1亿条已经达成一致Entry,后面还有源源不断的Entry产生,是否有以下问题:

  1. 这些Entry占用了大量的磁盘空间,但实际上过去的Entry已经对已经拥有这些Entry的节点没有意义了,只对那些没有Entry的节点有意义,leader把Entry发送给没有这些Entry节点,以让这些节点最终能和leader保持一致的状态。
  2. 有些follower非常慢,或者刚启动,或者重启过,与leader的当前状态已经严重脱节,让他们从Entry 0开始同步,然后应用到状态机,这种操作时间效率是不是非常慢?然后每一个Entry都会产生一个历史的状态,当产生新的状态之后,历史状态对当前节点也没有意义。

解决这种问题的办法就是快照,比如虚拟机的快照,或者docker镜像(镜像本质也是一种快照),有了快照就可以把状态机快速恢复到快照时的状态,空间和时间上效率都能提高很多

Raft可以定期产生一些快照,然后在这些快照上按序应用快照之后的Entry就能得到一致的状态。1亿个Entry + 1亿01个Entry得到的状态,跟第1亿个Entry后所产生的快照+1亿零1个Entry得到的状态是一致的。

1
2
3
4
5
6
7
8
9
10
type Snapshot struct {
Data []byte `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"`
Metadata SnapshotMetadata `protobuf:"bytes,2,opt,name=metadata" json:"metadata"`
}

type SnapshotMetadata struct {
ConfState ConfState `protobuf:"bytes,1,opt,name=conf_state,json=confState" json:"conf_state"`
Index uint64 `protobuf:"varint,2,opt,name=index" json:"index"`
Term uint64 `protobuf:"varint,3,opt,name=term" json:"term"`
}
  • Data:是状态机中状态的快照。
  • Metadata:是快照自身相关的数据。
    • ConfState:是快照时,当前raft的配置状态,这些状态数据并不在状态机中,所以需要进行保存。
    • Index、Term:快照所依据的Entry所在的Index和Term。

Message

Raft集群节点之间的通信只使用了1个结构体Message,Message中有一个Type成员,表明了当前的Message是哪种消息,比如可以是Raft论文中提到的AppendEntries,RequestVotes等,目前实际可以容纳19种类型的消息,每种消息对Raft都有不同的作用,具体见这篇文章

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 不同的Message类型会用到不同的字段
type Message struct {
Type MessageType `protobuf:"varint,1,opt,name=type,enum=raftpb.MessageType" json:"type"`
To uint64 `protobuf:"varint,2,opt,name=to" json:"to"`
From uint64 `protobuf:"varint,3,opt,name=from" json:"from"`
Term uint64 `protobuf:"varint,4,opt,name=term" json:"term"`
LogTerm uint64 `protobuf:"varint,5,opt,name=logTerm" json:"logTerm"`
Index uint64 `protobuf:"varint,6,opt,name=index" json:"index"`
Entries []Entry `protobuf:"bytes,7,rep,name=entries" json:"entries"`
Commit uint64 `protobuf:"varint,8,opt,name=commit" json:"commit"`
Snapshot Snapshot `protobuf:"bytes,9,opt,name=snapshot" json:"snapshot"`
Reject bool `protobuf:"varint,10,opt,name=reject" json:"reject"`
RejectHint uint64 `protobuf:"varint,11,opt,name=rejectHint" json:"rejectHint"`
Context []byte `protobuf:"bytes,12,opt,name=context" json:"context,omitempty"`
}

Message中包含了很多字段,不同的消息类型使用的字段组合不相同,可以从不同消息的处理逻辑中看出来。

  • To, From:是消息的接收节点和发送节点的的Raft ID。
  • Term:创建Message时,发送节点所在的Term。
  • LogTerm:创建Message时,发送节点本地所保存的log entry序列中最大的Term,在选举的时候会使用。
  • Index:不同的消息类型,Index的含义不同。Term和Index与Entry中的Term和Index不一定会相同,因为某个follower可能比较慢,leader向follower发送已经committed的Entry。
  • Entries:发送给follower,待follower处理的Entry。
  • Commit:创建Message时,不同消息含义不同,Append时是发送节点本地已committed的Index,Heartbeat时是committed Index或者与follower匹配的Index。
  • Snapshot:leader传递给follower的snapshot。
  • Reject:投票和Append的响应消息使用,Reject表示拒绝leader发来的消息。
  • RejectHint:拒绝Append消息的响应消息使用,用来给leader提示,发送follower已有的最后一个Index。
  • Context:某些消息的附加信息,即用来存储通用的数据。比如竞选时,存放campaignTransfer

Storage

etcd/raft不负责持久化数据存储和网络通信,网络数据都是通过Node接口的函数传入和传出raft。持久化数据存储由创建raft.Node的应用层负责,包含:

  • 应用层使用Entry生成的状态机,即一致的应用数据。
  • WAL:Write Ahead Log,历史的Entry(包含还未达成一致的Entry)和快照数据。

Snapshot是已在节点间达成一致Entry的快照,快照之前的Entry必然都是已经达成一致的,而快照之后,有达成一致的,也有写入磁盘还未达成一致的Entry。etcd/raft会使用到这些Entry和快照,而Storage接口,就是用来读这些数据的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Storage is an interface that may be implemented by the application
// to retrieve log entries from storage.
//
// If any Storage method returns an error, the raft instance will
// become inoperable and refuse to participate in elections; the
// application is responsible for cleanup and recovery in this case.
type Storage interface {
// TODO(tbg): split this into two interfaces, LogStorage and StateStorage.

// InitialState returns the saved HardState and ConfState information.
InitialState() (pb.HardState, pb.ConfState, error)
// Entries returns a slice of log entries in the range [lo,hi).
// MaxSize limits the total size of the log entries returned, but
// Entries returns at least one entry if any.
Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
// Term returns the term of entry i, which must be in the range
// [FirstIndex()-1, LastIndex()]. The term of the entry before
// FirstIndex is retained for matching purposes even though the
// rest of that entry may not be available.
Term(i uint64) (uint64, error)
// LastIndex returns the index of the last entry in the log.
LastIndex() (uint64, error)
// FirstIndex returns the index of the first log entry that is
// possibly available via Entries (older entries have been incorporated
// into the latest Snapshot; if storage only contains the dummy entry the
// first log entry is not available).
FirstIndex() (uint64, error)
// Snapshot returns the most recent snapshot.
// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
// so raft state machine could know that Storage needs some time to prepare
// snapshot and call Snapshot later.
Snapshot() (pb.Snapshot, error)
}

使用这个接口,从应用层读取:

  • InitialState:HardState和配置状态Confstate
  • Entries:根据Index获取连续的Entry
  • Term:获取某个Entry所在的Term
  • LastIndex:获取本节点已存储的最新的Entry的Index
  • FirstIndex:获取本节点已存储的第一个Entry的Index
  • Snapshot:获取本节点最近生成的Snapshot,Snapshot是由应用层创建的,并暂时保存起来,raft调用此接口读取

每次都从磁盘文件读取这些数据,效率必然是不高的,所以etcd/raft内定义了MemoryStorage,它实现了Storage接口,并且提供了函数来维护最新快照后的Entry,关于MemoryStorageraftLog小节,其中的storage即为MemoryStorage

unstable

因为Entry的存储是由应用层负责的,所以raft需要暂时存储还未存到Storage中的Entry或者Snapshot,在创建Ready时,Entry和Snapshot会被封装到Ready,由应用层写入到storage。

1
2
3
4
5
6
7
8
9
10
11
12
// unstable.entries[i] has raft log position i+unstable.offset.
// Note that unstable.offset may be less than the highest log
// position in storage; this means that the next write to storage
// might need to truncate the log before persisting unstable.entries.
type unstable struct {
// the incoming unstable snapshot, if any.
snapshot *pb.Snapshot
// all entries that have not yet been written to storage.
entries []pb.Entry
offset uint64
...
}
  • Snapshot:是follower从leader收到的最新的Snapshot。
  • entries:对leader而已,是raft刚利用请求创建的Entry,对follower而言是从leader收到的Entry。
  • offset:Entries[i].Index = i + offset。

raftLog

raft使用raftLog来管理当前Entry序列和Snapshot等信息,它由Storage、unstable、committed和applied组成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type raftLog struct {
// storage contains all stable entries since the last snapshot.
storage Storage

// unstable contains all unstable entries and snapshot.
// they will be saved into storage.
unstable unstable

// committed和applied是storage的2个整数下标
// committed到applied需要Ready
// committed is the highest log position that is known to be in
// stable storage on a quorum of nodes.
committed uint64
// applied is the highest log position that the application has
// been instructed to apply to its state machine.
// Invariant: applied <= committed
applied uint64
...
}

Storage和unstable前面已经介绍过了,所以介绍下committed和applied。

committed指最后一个在raft集群多数节点之间达成一致的Entry Index。

applied指当前节点被应用层应用到状态机的最后一个Entry Index。applied和committed之间的Entry就是等待被应用层应用到状态机的Entry。

前面提到Storage接口可以获取第一个索引firstIdx,最后一个索引lastIdx,在生成snapshot之后签名的Entry就可以删除了,所以firstidx是storage中snapshot后的第一个Entry的Index,lastIndex是storage中保存的最后一个Entry的Index,这个Entry可能还没有在raft集群多数节点之间达成一致,所以在committed之后,这些Entry是等待commit的Entry,leader发现某个Entry Index已经在多数节点之间达成一致,就会把committed移动到该Entry Index。

raftLog

SoftState

SoftState指易变的状态数据,记录了当前的Leader的Node ID,以及当前节点的角色。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// SoftState provides state that is useful for logging and debugging.
// The state is volatile and does not need to be persisted to the WAL.
type SoftState struct {
// leader的Node ID
Lead uint64 // must use atomic operations to access; keep 64-bit aligned.
// 节点是什么角色:leader、follower...
RaftState StateType
}

// StateType represents the role of a node in a cluster.
type StateType uint64

var stmap = [...]string{
"StateFollower",
"StateCandidate",
"StateLeader",
"StatePreCandidate",
}

HardState

HardState是写入到WAL(存储Entry的文件)的状态,可以在节点重启时恢复raft的状态,它了记录:

  • Term:节点当前所在的Term。
  • Vote:节点在竞选期间所投的候选节点ID。
  • Commit:当前已经committed Entry Index。
1
2
3
4
5
type HardState struct {
Term uint64 `protobuf:"varint,1,opt,name=term" json:"term"`
Vote uint64 `protobuf:"varint,2,opt,name=vote" json:"vote"`
Commit uint64 `protobuf:"varint,3,opt,name=commit" json:"commit"`
}

Ready

终于到etcd raft最重要的一个结构体了。raft使用Ready结构体对外传递数据,是多种数据的打包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
// The current volatile state of a Node.
// SoftState will be nil if there is no update.
// It is not required to consume or store SoftState.
*SoftState

// The current state of a Node to be saved to stable storage BEFORE
// Messages are sent.
// HardState will be equal to empty state if there is no update.
pb.HardState

// ReadStates can be used for node to serve linearizable read requests locally
// when its applied index is greater than the index in ReadState.
// Note that the readState will be returned when raft receives msgReadIndex.
// The returned is only valid for the request that requested to read.
ReadStates []ReadState

// unstable的entry,即待写入到storage的entry
// Entries specifies entries to be saved to stable storage BEFORE
// Messages are sent.
Entries []pb.Entry

// Snapshot specifies the snapshot to be saved to stable storage.
Snapshot pb.Snapshot

// 待applied的entry
// CommittedEntries specifies entries to be committed to a
// store/state-machine. These have previously been committed to stable
// store.
CommittedEntries []pb.Entry

// Messages specifies outbound messages to be sent AFTER Entries are
// committed to stable storage.
// If it contains a MsgSnap message, the application MUST report back to raft
// when the snapshot has been received or has failed by calling ReportSnapshot.
Messages []pb.Message

// MustSync indicates whether the HardState and Entries must be synchronously
// written to disk or if an asynchronous write is permissible.
MustSync bool
}

SoftState、HardState、Entry、Snapshot、Message都已经介绍过,不再单独介绍含义。

Entries和CommittedEntries的区别是,Entries保存的是从unstable读取的Entry,它们即将被应用层写入storage,CommittedEntries是已经被Committed,还没有applied,应用层会把他们应用到状态机。

ReadStates用来处理读请求,MustSync用来指明应用层是否采用异步的方式写数据。

应用层在接收到Ready后,应当处理Ready中的每一个有效字段,处理完毕后,调用Advance()通知raft Ready已处理完毕。

前言

社区里在讨论一个问题,是由官方的文档引发的,文档上讲不同的peer可以使用不同语言的链码,前提是2份链码功能、接口等必须一致。

fabric chaincode error

大家的问题是:

一个链码可以采用不同的语言实现,不同peer上使用不同的链码真的可行吗?

经过实证,这是不可行的。

分2种情况,2种都有问题:

  1. 不同peer安装不同语言链码,然后同时实例化:实例化后,只能启动发送实例化交易的peer拥有的语言的链码
  2. 部分peer先实例化,另外peer再安装不同语言链码:调用链码时报指纹不匹配错误

不同peer安装不同语言链码,然后同时实例化

1、修改BFYN,只在peer0.org1和peer0.org2上安装Go语言链码,不进行后续操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Installing chaincode on peer0.org1...
+ peer chaincode install -n mycc -v 1.0 -l golang -p github.com/chaincode/chaincode_example02/go/
+ res=0
+ set +x
2019-09-03 02:08:43.813 UTC [chaincodeCmd] checkChaincodeCmdParams -> INFO 001 Using default escc
2019-09-03 02:08:43.813 UTC [chaincodeCmd] checkChaincodeCmdParams -> INFO 002 Using default vscc
2019-09-03 02:08:44.108 UTC [chaincodeCmd] install -> INFO 003 Installed remotely response:<status:200 payload:"OK" >
===================== Chaincode is installed on peer0.org1 =====================

Install chaincode on peer0.org2...
+ peer chaincode install -n mycc -v 1.0 -l golang -p github.com/chaincode/chaincode_example02/go/
+ res=0
+ set +x
2019-09-03 02:08:44.260 UTC [chaincodeCmd] checkChaincodeCmdParams -> INFO 001 Using default escc
2019-09-03 02:08:44.260 UTC [chaincodeCmd] checkChaincodeCmdParams -> INFO 002 Using default vscc
2019-09-03 02:08:44.529 UTC [chaincodeCmd] install -> INFO 003 Installed remotely response:<status:200 payload:"OK" >
===================== Chaincode is installed on peer0.org2 =====================


========= All GOOD, BYFN execution completed ===========

2、在peer1.org1上安装Java语言链码

1
2
3
4
root@6cec20eb7502:/opt/gopath/src/github.com/hyperledger/fabric/peer# peer chaincode install -n mycc -v 1.0 -l java -p /opt/gopath/src/github.com/chaincode/chaincode_example02/java/
2019-09-03 03:19:44.710 UTC [chaincodeCmd] checkChaincodeCmdParams -> INFO 001 Using default escc
2019-09-03 03:19:44.711 UTC [chaincodeCmd] checkChaincodeCmdParams -> INFO 002 Using default vscc
2019-09-03 03:19:44.754 UTC [chaincodeCmd] install -> INFO 003 Installed remotely response:<status:200 payload:"OK" >

3、在peer1.org1上发起实例化链码

1
2
3
root@6cec20eb7502:/opt/gopath/src/github.com/hyperledger/fabric/peer# peer chaincode instantiate -o orderer.example.com:7050 --tls true --cafile /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/ordererOrganizations/example.com/orderers/orderer.example.com/msp/tlscacerts/tlsca.example.com-cert.pem -C mychannel -n mycc -l golang -v 1.0 -c '{"Args":["init","a","100","b","200"]}' -P 'OR ('\''Org1MSP.peer'\'','\''Org2MSP.peer'\'')'
2019-09-03 03:22:12.430 UTC [chaincodeCmd] checkChaincodeCmdParams -> INFO 001 Using default escc
2019-09-03 03:22:12.431 UTC [chaincodeCmd] checkChaincodeCmdParams -> INFO 002 Using default vscc

4、查看链码容器,只有peer1.org1的链码容器,peer0.org1和peer0.org2的链码容器都没有起来。

1
2
3
4
5
6
7
8
9
➜  fabric-sdk-go-sample git:(master) ✗ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
f8f6aa8b5da6 dev-peer1.org1.example.com-mycc-1.0-cd123150154e6bf2df7ce682e0b1bcbea40499416f37a6da3aae14c4eb51b08d "/root/chaincode-jav…" 37 seconds ago Up 36 seconds dev-peer1.org1.example.com-mycc-1.0
6cec20eb7502 hyperledger/fabric-tools:latest "/bin/bash" About an hour ago Up About an hour cli
7e134fe7e8e9 hyperledger/fabric-peer:latest "peer node start" About an hour ago Up About an hour 0.0.0.0:8051->8051/tcp peer1.org1.example.com
ed6f5511d938 hyperledger/fabric-peer:latest "peer node start" About an hour ago Up About an hour 0.0.0.0:10051->10051/tcp peer1.org2.example.com
025a71178777 hyperledger/fabric-peer:latest "peer node start" About an hour ago Up About an hour 0.0.0.0:7051->7051/tcp peer0.org1.example.com
8687dfd14e7b hyperledger/fabric-peer:latest "peer node start" About an hour ago Up About an hour 0.0.0.0:9051->9051/tcp peer0.org2.example.com
e9cc8b410d7f hyperledger/fabric-orderer:latest "orderer" About an hour ago Up About an hour 0.0.0.0:7050->7050/tcp orderer.example.com

部分peer先实例化,另外peer再安装不同语言链码

不改造BYFN,原生启动。peer0.org1,peer0.org2,peer1.org2都已经实例化了Go语言链码。

然后在peer1.org1上安装Java语言的链码,在执行Invoke或者查询,报指纹不匹配-数据不匹配的错误。

原因分析:操作链码时,会调用LSCC的LifeCycleSysCC.getCCCode获取链码,一份链码是从数据库取的,即当前链码容器的,一份链码是本地存储的,会对2份进行匹配,如果不匹配就会报指纹不匹配错误。

匹配函数为CDSPackage.ValidateCC,匹配项为:

  1. 名称、版本
  2. CodeHash、元数据Hash

调用链码时报的指纹不匹配错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
root@d0533ffe1864:/opt/gopath/src/github.com/hyperledger/fabric/peer# peer chaincode install -n mycc -v 1.0 -l java -p /opt/gopath/src/github.com/chaincode/chaincode_example02/java/
2019-09-03 01:52:15.714 UTC [chaincodeCmd] checkChaincodeCmdParams -> INFO 001 Using default escc
2019-09-03 01:52:15.714 UTC [chaincodeCmd] checkChaincodeCmdParams -> INFO 002 Using default vscc
2019-09-03 01:52:15.755 UTC [chaincodeCmd] install -> INFO 003 Installed remotely response:<status:200 payload:"OK" >
root@d0533ffe1864:/opt/gopath/src/github.com/hyperledger/fabric/peer#
root@d0533ffe1864:/opt/gopath/src/github.com/hyperledger/fabric/peer#
root@d0533ffe1864:/opt/gopath/src/github.com/hyperledger/fabric/peer#
root@d0533ffe1864:/opt/gopath/src/github.com/hyperledger/fabric/peer#
root@d0533ffe1864:/opt/gopath/src/github.com/hyperledger/fabric/peer# peer chaincode query -C mychannel -n mycc -c '{"Args":["query","a"]}'
Error: endorsement failure during query. response: status:500 message:"failed to execute transaction b8b740aab0e6dd10cfe62416240ef94bfb90a55358904233c4d60dd5a39e6fe3: [channel mychannel] failed to get chaincode container info for mycc:1.0: could not get chaincode code: chaincode fingerprint mismatch: data mismatch"
root@d0533ffe1864:/opt/gopath/src/github.com/hyperledger/fabric/peer#
root@d0533ffe1864:/opt/gopath/src/github.com/hyperledger/fabric/peer# peer chaincode invoke -o orderer.example.com:7050 --tls true --cafile /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/ordererOrganizations/example.com/orderers/orderer.example.com/msp/tlscacerts/tlsca.example.com-cert.pem -C mychannel -n mycc --peerAddresses peer1.org1.example.com:8051 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.example.com/peers/peer1.org1.example.com/tls/ca.crt -c '{"Args":["invoke","a","b","50"]}'
Error: endorsement failure during invoke. response: status:500 message:"failed to execute transaction aec5a0ccbcf86032774dc80220b90419d2816cc3f050a104c1cfcde55a2247cb: [channel mychannel] failed to get chaincode container info for mycc:1.0: could not get chaincode code: chaincode fingerprint mismatch: data mismatch"

文章

另外,社区里的hucg编写了一篇源码文章:https://blog.csdn.net/love_feng_forever/article/details/100532324

Hyperledger下有许多区块链相关的项目,曾以为它们分别是不同的类别,专注不同的功能,梳理一下其实并不是这样,比如光区块链框架就有Fabric、Iroha和Sawtooth。

hyperledger projects

各项目的简要介绍以及TWGC的介绍,一定要看看Baohua Yang大佬的PPT,ppt介绍Hyperledger社区的现状,各子项目概览,以及发展方向,以及Hyperledger 中国技术工作组的最新动态。。

做Fabric相关的工作,需要关注Fabric、Cello、Caliper、Explorer等几个项目。

fabric projects arch

前言

本文把与fabric网络交互的baas、应用程序、客户端统称成为客户端,它们可以使用sdk和fabric网络进行交互,sdk调用grpc可以与指定的peer和orderer进行通信,本文的目的是在BYFN搭建的fabric网络的基础之上,展示如何使用fabric-sdk-go操作链码。

fabric sdk

fabric-sdk-go项目简介

fabric-sdk-go是Fabric官方的Go语言SDK,它的目录结构如下:

fabric sdk go

有2个目录需要注意一下,internal和third_party,它们两个包含了sdk依赖的一些代码,来自于fabric、fabric-ca,当使用到fabric的一些类型时,应当使用以下的方式,而不是直接导入fabric或者fabric-ca:

1
import "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/xxx"

pkg目录是sdk的主要实现,doc 文档介绍了不同目录所提供的功能,以及给出了接口调用样例:

  • pkg/fabsdk:主package,主要用来生成fabsdk以及各种其他pkg使用的option context。
  • pkg/client/channel:主要用来调用、查询链码,或者注册链码事件。
  • pkg/client/resmgmt:主要用来fabric网络的管理,比如创建、加入通道,安装、实例化和升级链码。
  • pkg/client/event:配合channel模块来进行链码事件注册和过滤。
  • pkg/client/ledger:主要用来账本的查询,查询区块、交易、配置等。
  • pkg/client/msp:主要用来管理fabric的成员关系。

想用好fabric-go-sdk,建议仔细看看doc 文档

使用SDK步骤

  1. 为client编写配置文件config.yaml
  2. 为client创建fabric sdk实例fabsdk
  3. 为client创建resource manage client,简称RC,RC用来进行管理操作的client,比如通道的创建,链码的安装、实例化和升级等
  4. 为client创建channel client,简称CC,CC用来链码的调用、查询以及链码事件的注册和取消注册

SDK配置文件config.yaml

client使用sdk与fabric网络交互,需要告诉sdk两类信息:

  1. 我是谁:即当前client的信息,包含所属组织、密钥和证书文件的路径等,这是每个client专用的信息。
  2. 对方是谁:即fabric网络结构的信息,channel、org、orderer和peer等的怎么组合起当前fabric网络的,这些结构信息应当与configytx.yaml中是一致的。这是通用配置,每个客户端都可以拿来使用。另外,这部分信息并不需要是完整fabric网络信息,如果当前client只和部分节点交互,那配置文件中只需要包含所使用到的网络信息。

fabric sdk config

这里提供一个适合BFYN的精简配置文件fabric-sdk-go-sample/config.yaml

使用go mod管理依赖

fabric-sdk-go目前本身使用go modules管理依赖,从go.mod可知,依赖的一些包指定了具体的版本,如果项目依赖的版本和sdk依赖的版本不同,会产生编译问题。

建议项目也使用go moudles管理依赖,然后相同的软件包可以使用相同的版本,可以这样操作:

  1. go mod init初始化好项目的go.mod文件。
  2. 编写代码,完成后运行go mod run,会自动下载依赖的项目,但版本可能与fabric-sdk-go中的依赖版本不同,编译存在问题。
  3. go.mod中的内容复制到项目的go.mod中,然后保存,go mod会自动合并相同的依赖,运行go mod tidy,会自动添加新的依赖或删除不需要的依赖。

项目的go mod样例可以参考securekey/fabric-examples … /go.modshitaibin/fabric-sdk-go-sample/go.mod

创建Client

利用config.yaml创建fabsdk

通过config.FromFile解析配置文件,然后通过fabsdk.New创建sdk实例。

1
2
3
4
5
6
7
import "github.com/hyperledger/fabric-sdk-go/pkg/core/config"
import "github.com/hyperledger/fabric-sdk-go/pkg/fabsdk"

sdk, err := fabsdk.New(config.FromFile(c.ConfigPath))
if err != nil {
log.Panicf("failed to create fabric sdk: %s", err)
}

创建RC

管理员账号才能进行fabric网络的管理操作,所以创建rc一定要使用管理员账号。

通过fabsdk.WithOrg("Org1")fabsdk.WithUser("Admin")指定Org1的Admin账户,使用sdk.Context创建clientProvider,然后通过resmgmt.New创建rc。

1
2
3
4
5
6
7
import 	"github.com/hyperledger/fabric-sdk-go/pkg/client/resmgmt"

rcp := sdk.Context(fabsdk.WithUser("Admin"), fabsdk.WithOrg("Org1"))
rc, err := resmgmt.New(rcp)
if err != nil {
log.Panicf("failed to create resource client: %s", err)
}

创建CC

创建cc使用用户账号,进行链码的调用和查询,使用sdk.ChannelContext创建channelProvider,需要指定channelID和用户User1,然后通过channel.New创建cc,此cc就是调用channelID对应channel上链码的channel client。

1
2
3
4
5
6
7
import 	"github.com/hyperledger/fabric-sdk-go/pkg/client/channel"

ccp := sdk.ChannelContext(ChannelID, fabsdk.WithUser("User1"))
cc, err := channel.New(ccp)
if err != nil {
log.Panicf("failed to create channel client: %s", err)
}

管理操作

安装链码

安装链码使用rc.InstallCC接口,需要指定resmgmt.InstallCCRequest以及在哪些peers上面安装。resmgmt.InstallCCRequest指明了链码ID、链码路径、链码版本以及打包后的链码。

打包链码需要使用到链码路径CCPathGoPathGoPath即本机的$GOPATHCCPath是相对于GoPath相对路径,如果路径设置不对,会造成sdk找不到链码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// pack the chaincode
ccPkg, err := gopackager.NewCCPackage("github.com/hyperledger/fabric-samples/chaincode/chaincode_example02/go/", "/Users/shitaibin/go")
if err != nil {
return errors.WithMessage(err, "pack chaincode error")
}

// new request of installing chaincode
req := resmgmt.InstallCCRequest{
Name: c.CCID,
Path: c.CCPath,
Version: v,
Package: ccPkg,
}

reqPeers := resmgmt.WithTargetEndpoints("peer0.org1.example.com")
resps, err := rc.InstallCC(req, reqPeers)
if err != nil {
return errors.WithMessage(err, "installCC error")
}

实例化链码

实例化链码需要使用rc.InstantiateCC接口,需要通过ChannelID、resmgmt.InstantiateCCRequest和peers,指明在哪个channel上实例化链码,请求包含了链码的ID、路径、版本,以及初始化参数和背书策略,背书策略可以通过cauthdsl.FromString生成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// endorser policy
org1OrOrg2 := "OR('Org1MSP.member','Org2MSP.member')"
ccPolicy, err := cauthdsl.FromString(org1OrOrg2)
if err != nil {
return errors.WithMessage(err, "gen policy from string error")
}

// new request
args := packArgs([]string{"init", "a", "100", "b", "200"})
req := resmgmt.InstantiateCCRequest{
Name: c.CCID,
Path: c.CCPath,
Version: v,
Args: args,
Policy: ccPolicy,
}

// send request and handle response
reqPeers := resmgmt.WithTargetEndpoints("peer0.org1.example.com")
resp, err := rc.InstantiateCC(ChannelID, req, reqPeers)
if err != nil {
return errors.WithMessage(err, "instantiate chaincode error")
}

升级链码

升级链码和实例化链码是非常相似的,不同点只在请求是resmgmt.UpgradeCCRequest,调用的接口是rc.UpgradeCC

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// endorser policy
org1AndOrg2 := "AND('Org1MSP.member','Org2MSP.member')"
ccPolicy, err := c.genPolicy(org1AndOrg2)
if err != nil {
return errors.WithMessage(err, "gen policy from string error")
}

// new request
args := packArgs([]string{"init", "a", "100", "b", "200"})
req := resmgmt.UpgradeCCRequest{
Name: c.CCID,
Path: c.CCPath,
Version: v,
Args: args,
Policy: ccPolicy,
}

// send request and handle response
reqPeers := resmgmt.WithTargetEndpoints("peer0.org1.example.com")
resp, err := rc.UpgradeCC(ChannelID, req, reqPeers)
if err != nil {
return errors.WithMessage(err, "instantiate chaincode error")
}

查询操作

调用链码

调用链码使用cc.Execute接口,使用入参channel.Request和peers指明要让哪些peer上执行链码,进行背书。channel.Request指明了要调用的链码,以及链码内要Invoke的函数args,args是序列化的结果,序列化是自定义的,只要链码能够按相同的规则进行反序列化即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// new channel request for invoke
args := packArgs([]string{"a", "b", "10"})
req := channel.Request{
ChaincodeID: c.CCID,
Fcn: "invoke",
Args: args,
}

// send request and handle response
// peers is needed
reqPeers := channel.WithTargetEndpoints("peer0.org1.example.com")
resp, err := cc.Execute(req, reqPeers)
if err != nil {
return errors.WithMessage(err, "invoke chaincode error")
}
log.Printf("invoke chaincode tx: %s", resp.TransactionID)

查询链码

查询和调用链码是非常相似的,使用相同的channel.Request,指明了Invoke链码中的query函数,然后调用cc.Query进行查询操作,这样节点不会对请求进行背书。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// new channel request for query
req := channel.Request{
ChaincodeID: c.CCID,
Fcn: "query",
Args: packArgs([]string{keys}),
}

// send request and handle response
reqPeers := channel.WithTargetEndpoints(peer)
resp, err := cc.Query(req, reqPeers)
if err != nil {
return errors.WithMessage(err, "query chaincode error")
}

log.Printf("query chaincode tx: %s", resp.TransactionID)
log.Printf("result: %v", string(resp.Payload))

示例项目

本文的基础是创建了一个结合fabric byfn的示例项目,在byfn的基础之上对链码进行安装、实例化、升级,调用和查询等操作,项目的使用可见项目README文档,项目地址:https://github.com/Shitaibin/fabric-sdk-go-sample ,项目样例执行后,可见新部署和升级成功的链码容器,操作日志可见项目。

byfn-sdk

这是一篇姊妹篇文章,浅析一下Go是如何实现protobuf编解码的:

  1. Go是如何实现protobuf的编解码的(1): 原理
  2. Go是如何实现protobuf的编解码的(2): 源码

本编是第二篇。

前言

上一篇文章Go是如何实现protobuf的编解码的(1):原理
中已经指出了Go语言数据和Protobuf数据的编解码是由包github.com/golang/protobuf/proto完成的,本编就来分析一下proto包是如何实现编解码的。

编解码原理

编解码包都有支持的编解码类型,我们暂且把这些类型称为底层类型,编解码的本质是:

  1. 为每一个底层类型配备一个或多个编解码函数
  2. 把一个结构体的字段,递归的拆解成底层类型,然后选择合适的函数进行编码或解码操作

接下来先看编码,再看解码。

编码

约定:以下所有的代码片,如果是request.pb.go或main.go中的代码,会在第一行标记文件名,否则都是proto包的源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// main.go
package main

import (
"fmt"

"./types"
"github.com/golang/protobuf/proto"
)

func main() {
req := &types.Request{Data: "Hello Dabin"}

// Marshal
encoded, err := proto.Marshal(req)
if err != nil {
fmt.Printf("Encode to protobuf data error: %v", err)
}
...
}

编码调用的是proto.Marshal函数,它可以完成的是Go语言数据序列化成protobuf数据,返回序列化结果或错误。

proto编译成的Go结构体都是符合Message接口的,从Marshal可知Go结构体有3种序列化方式:

  1. pb Message满足newMarshaler接口,则调用XXX_Marshal()进行序列化。
  2. pb满足Marshaler接口,则调用Marshal()进行序列化,这种方式适合某类型自定义序列化规则的情况。
  3. 否则,使用默认的序列化方式,创建一个Warpper,利用wrapper对pb进行序列化,后面会介绍方式1实际就是使用方式3。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Marshal takes a protocol buffer message
// and encodes it into the wire format, returning the data.
// This is the main entry point.
func Marshal(pb Message) ([]byte, error) {
if m, ok := pb.(newMarshaler); ok {
siz := m.XXX_Size()
b := make([]byte, 0, siz)
return m.XXX_Marshal(b, false)
}
if m, ok := pb.(Marshaler); ok {
// If the message can marshal itself, let it do it, for compatibility.
// NOTE: This is not efficient.
return m.Marshal()
}
// in case somehow we didn't generate the wrapper
if pb == nil {
return nil, ErrNil
}
var info InternalMessageInfo
siz := info.Size(pb)
b := make([]byte, 0, siz)
return info.Marshal(b, pb, false)
}

newMarshalerMarshaler如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// newMarshaler is the interface representing objects that can marshal themselves.
//
// This exists to support protoc-gen-go generated messages.
// The proto package will stop type-asserting to this interface in the future.
//
// DO NOT DEPEND ON THIS.
type newMarshaler interface {
XXX_Size() int
XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
}

// Marshaler is the interface representing objects that can marshal themselves.
type Marshaler interface {
Marshal() ([]byte, error)
}

Request实现了newMarshaler接口,XXX_Marshal实现如下,它实际是调用了xxx_messageInfo_Request.Marshalxxx_messageInfo_Request是定义在request.pb.go中的一个全局变量,类型就是InternalMessageInfo,实际就是前文提到的wrapper。

1
2
3
4
5
6
7
8
// request.pb.go
func (m *Request) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
print("Called xxx marshal\n")
panic("I want see stack trace")
return xxx_messageInfo_Request.Marshal(b, m, deterministic)
}

var xxx_messageInfo_Request proto.InternalMessageInfo

本质上,XXX_Marshal也是wrapper,后面才是真正序列化的主体函数在proto包中。

InternalMessageInfo主要是用来缓存序列化和反序列化需要用到的信息。

1
2
3
4
5
6
7
8
9
// InternalMessageInfo is a type used internally by generated .pb.go files.
// This type is not intended to be used by non-generated code.
// This type is not subject to any compatibility guarantee.
type InternalMessageInfo struct {
marshal *marshalInfo // marshal信息
unmarshal *unmarshalInfo // unmarshal信息
merge *mergeInfo
discard *discardInfo
}

InternalMessageInfo.Marshal首先是获取待序列化类型的序列化信息u marshalInfo,然后利用u.marshal进行序列化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Marshal is the entry point from generated code,
// and should be ONLY called by generated code.
// It marshals msg to the end of b.
// a is a pointer to a place to store cached marshal info.
func (a *InternalMessageInfo) Marshal(b []byte, msg Message, deterministic bool) ([]byte, error) {
// 获取该message类型的MarshalInfo,这些信息都缓存起来了
// 大量并发时无需重复创建
u := getMessageMarshalInfo(msg, a)
// 入参校验
ptr := toPointer(&msg)
if ptr.isNil() {
// We get here if msg is a typed nil ((*SomeMessage)(nil)),
// so it satisfies the interface, and msg == nil wouldn't
// catch it. We don't want crash in this case.
return b, ErrNil
}
// 根据MarshalInfo对数据进行marshal
return u.marshal(b, ptr, deterministic)
}

由于每种类型的序列化信息是一致的,所以getMessageMarshalInfo对序列化信息进行了缓存,缓存在a.marshal中,如果a中不存在marshal信息,则去生成,但不进行初始化,然后保存到a中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func getMessageMarshalInfo(msg interface{}, a *InternalMessageInfo) *marshalInfo {
// u := a.marshal, but atomically.
// We use an atomic here to ensure memory consistency.
// 从InternalMessageInfo中读取
u := atomicLoadMarshalInfo(&a.marshal)
// 读取不到代表未保存过
if u == nil {
// Get marshal information from type of message.
t := reflect.ValueOf(msg).Type()
if t.Kind() != reflect.Ptr {
panic(fmt.Sprintf("cannot handle non-pointer message type %v", t))
}
u = getMarshalInfo(t.Elem())
// Store it in the cache for later users.
// a.marshal = u, but atomically.
atomicStoreMarshalInfo(&a.marshal, u)
}
return u
}

getMarshalInfo只是创建了一个marshalInfo对象,填充了字段typ,剩余的字段未填充。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// getMarshalInfo returns the information to marshal a given type of message.
// The info it returns may not necessarily initialized.
// t is the type of the message (NOT the pointer to it).
// 获取MarshalInfo结构体,如果不存在则使用message类型t创建1个
func getMarshalInfo(t reflect.Type) *marshalInfo {
marshalInfoLock.Lock()
u, ok := marshalInfoMap[t]
if !ok {
u = &marshalInfo{typ: t}
marshalInfoMap[t] = u
}
marshalInfoLock.Unlock()
return u
}

// marshalInfo is the information used for marshaling a message.
type marshalInfo struct {
typ reflect.Type
fields []*marshalFieldInfo
unrecognized field // offset of XXX_unrecognized
extensions field // offset of XXX_InternalExtensions
v1extensions field // offset of XXX_extensions
sizecache field // offset of XXX_sizecache
initialized int32 // 0 -- only typ is set, 1 -- fully initialized
messageset bool // uses message set wire format
hasmarshaler bool // has custom marshaler
sync.RWMutex // protect extElems map, also for initialization
extElems map[int32]*marshalElemInfo // info of extension elements
}

marshalInfo.marshal是Marshal真实主体,会判断u是否已经初始化,如果未初始化调用computeMarshalInfo计算Marshal需要的信息,实际就是填充marshalInfo中的各种字段。

u.hasmarshaler代表当前类型是否实现了Marshaler接口,直接调用Marshal函数进行序列化。可以确定Marshal函数的序列化方式2,即实现Marshaler接口的方法,最后肯定也会调用marshalInfo.marshal

该函数的主体是一个for循环,依次遍历该类型的每一个字段,对required属性进行校验,然后按字段类型,调用f.marshaler对该字段类型进行序列化。这个f.marshaler哪来的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// marshal is the main function to marshal a message. It takes a byte slice and appends
// the encoded data to the end of the slice, returns the slice and error (if any).
// ptr is the pointer to the message.
// If deterministic is true, map is marshaled in deterministic order.
// 该函数是Marshal的主体函数,把消息编码为数据后,追加到b之后,最后返回b。
// deterministic为true代表map会以确定的顺序进行编码。
func (u *marshalInfo) marshal(b []byte, ptr pointer, deterministic bool) ([]byte, error) {
// 初始化marshalInfo的基础信息
// 主要是根据已有信息填充该结构体的一些字段
if atomic.LoadInt32(&u.initialized) == 0 {
u.computeMarshalInfo()
}

// If the message can marshal itself, let it do it, for compatibility.
// NOTE: This is not efficient.
// 如果该类型实现了Marshaler接口,即能够对自己Marshal,则自行Marshal
// 结果追加到b
if u.hasmarshaler {
m := ptr.asPointerTo(u.typ).Interface().(Marshaler)
b1, err := m.Marshal()
b = append(b, b1...)
return b, err
}

var err, errLater error
// The old marshaler encodes extensions at beginning.
// 检查扩展字段,把message的扩展字段追加到b
if u.extensions.IsValid() {
// offset函数用来根据指针偏移量获取message的指定字段
e := ptr.offset(u.extensions).toExtensions()
if u.messageset {
b, err = u.appendMessageSet(b, e, deterministic)
} else {
b, err = u.appendExtensions(b, e, deterministic)
}
if err != nil {
return b, err
}
}
if u.v1extensions.IsValid() {
m := *ptr.offset(u.v1extensions).toOldExtensions()
b, err = u.appendV1Extensions(b, m, deterministic)
if err != nil {
return b, err
}
}

// 遍历message的每一个字段,检查并做编码,然后追加到b
for _, f := range u.fields {
if f.required {
// 如果required的字段未设置,则记录错误,所有的marshal工作完成后再处理
if ptr.offset(f.field).getPointer().isNil() {
// Required field is not set.
// We record the error but keep going, to give a complete marshaling.
if errLater == nil {
errLater = &RequiredNotSetError{f.name}
}
continue
}
}
// 字段为指针类型,并且为nil,代表未设置,该字段无需编码
if f.isPointer && ptr.offset(f.field).getPointer().isNil() {
// nil pointer always marshals to nothing
continue
}
// 利用这个字段的marshaler进行编码
b, err = f.marshaler(b, ptr.offset(f.field), f.wiretag, deterministic)
if err != nil {
if err1, ok := err.(*RequiredNotSetError); ok {
// required字段但未设置错误
// Required field in submessage is not set.
// We record the error but keep going, to give a complete marshaling.
if errLater == nil {
errLater = &RequiredNotSetError{f.name + "." + err1.field}
}
continue
}
// “动态数组”中包含nil元素
if err == errRepeatedHasNil {
err = errors.New("proto: repeated field " + f.name + " has nil element")
}
if err == errInvalidUTF8 {
if errLater == nil {
fullName := revProtoTypes[reflect.PtrTo(u.typ)] + "." + f.name
errLater = &invalidUTF8Error{fullName}
}
continue
}
return b, err
}
}
// 为识别的类型字段,直接转为bytes,追加到b
// computeMarshalInfo中已经收集这些字段
if u.unrecognized.IsValid() {
s := *ptr.offset(u.unrecognized).toBytes()
b = append(b, s...)
}
return b, errLater
}

computeMarshalInfo实际上就是对要序列化的类型,进行一次全面检查,设置好序列化要使用的数据,这其中就包含了各字段的序列化函数f.marshaler。我们就重点关注下这部分,struct的每一个字段都会分配一个marshalFieldInfo,代表这个字段序列化需要的信息,会调用computeMarshalFieldInfo会填充这个对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// computeMarshalInfo initializes the marshal info.
func (u *marshalInfo) computeMarshalInfo() {
// 加锁,代表了不能同时计算marshal信息
u.Lock()
defer u.Unlock()
// 计算1次即可
if u.initialized != 0 { // non-atomic read is ok as it is protected by the lock
return
}

// 获取要marshal的message类型
t := u.typ
u.unrecognized = invalidField
u.extensions = invalidField
u.v1extensions = invalidField
u.sizecache = invalidField

// If the message can marshal itself, let it do it, for compatibility.
// 判断当前类型是否实现了Marshal接口,如果实现标记为类型自有marshaler
// 没用类型断言是因为t是Type类型,不是保存在某个接口的变量
// NOTE: This is not efficient.
if reflect.PtrTo(t).Implements(marshalerType) {
u.hasmarshaler = true
atomic.StoreInt32(&u.initialized, 1)
// 可以直接返回了,后面使用自有的marshaler编码
return
}

// get oneof implementers
// 看*t实现了以下哪个接口,oneof特性
var oneofImplementers []interface{}
switch m := reflect.Zero(reflect.PtrTo(t)).Interface().(type) {
case oneofFuncsIface:
_, _, _, oneofImplementers = m.XXX_OneofFuncs()
case oneofWrappersIface:
oneofImplementers = m.XXX_OneofWrappers()
}

n := t.NumField()

// deal with XXX fields first
// 遍历t的每一个XXX字段
for i := 0; i < t.NumField(); i++ {
f := t.Field(i)
// 跳过非XXX开头的字段
if !strings.HasPrefix(f.Name, "XXX_") {
continue
}
// 处理以下几个protobuf自带的字段
switch f.Name {
case "XXX_sizecache":
u.sizecache = toField(&f)
case "XXX_unrecognized":
u.unrecognized = toField(&f)
case "XXX_InternalExtensions":
u.extensions = toField(&f)
u.messageset = f.Tag.Get("protobuf_messageset") == "1"
case "XXX_extensions":
u.v1extensions = toField(&f)
case "XXX_NoUnkeyedLiteral":
// nothing to do
default:
panic("unknown XXX field: " + f.Name)
}
n--
}

// normal fields
// 处理message的普通字段
fields := make([]marshalFieldInfo, n) // batch allocation
u.fields = make([]*marshalFieldInfo, 0, n)
for i, j := 0, 0; i < t.NumField(); i++ {
f := t.Field(i)

// 跳过XXX字段
if strings.HasPrefix(f.Name, "XXX_") {
continue
}

// 取fields的下一个有效字段,指针类型
// j代表了fields有效字段数量,n是包含了XXX字段的总字段数量
field := &fields[j]
j++
field.name = f.Name
// 填充到u.fields
u.fields = append(u.fields, field)
// 字段的tag里包含“protobuf_oneof”特殊处理
if f.Tag.Get("protobuf_oneof") != "" {
field.computeOneofFieldInfo(&f, oneofImplementers)
continue
}
// 字段里不包含“protobuf”,代表不是protoc自动生成的字段
if f.Tag.Get("protobuf") == "" {
// field has no tag (not in generated message), ignore it
// 删除刚刚保存的字段信息
u.fields = u.fields[:len(u.fields)-1]
j--
continue
}
// 填充字段的marshal信息
field.computeMarshalFieldInfo(&f)
}

// fields are marshaled in tag order on the wire.
// 字段排序
sort.Sort(byTag(u.fields))

// 初始化完成
atomic.StoreInt32(&u.initialized, 1)
}

回顾一下Request的定义,它包含1个字段Data,后面protobuf:...描述了protobuf要使用的信息,"bytes,..."这段被称为tags,用逗号进行分割后,其中:

  • tags[0]: bytes,代表Data类型的数据要被转换为bytes
  • tags[1]: 1,代表了字段的ID
  • tags[2]: opt,代表可行,非必须
  • tags[3]: name=data,proto文件中的名称
  • tags[4]: proto3,代表使用的protobuf版本
1
2
3
4
5
// request.pb.go
type Request struct{
Data string `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
...
}

computeMarshalFieldInfo首先要获取字段ID和要转换的类型,填充到marshalFieldInfo,然后调用setMarshaler利用字段f和tags获取该字段类型的序列化函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// computeMarshalFieldInfo fills up the information to marshal a field.
func (fi *marshalFieldInfo) computeMarshalFieldInfo(f *reflect.StructField) {
// parse protobuf tag of the field.
// tag has format of "bytes,49,opt,name=foo,def=hello!"
// 获取"protobuf"的完整tag,然后使用,分割,得到上面的格式
tags := strings.Split(f.Tag.Get("protobuf"), ",")
if tags[0] == "" {
return
}
// tag的编号,即message中设置的string name = x,则x就是这个字段的tag id
tag, err := strconv.Atoi(tags[1])
if err != nil {
panic("tag is not an integer")
}
// 要转换成的类型,bytes,varint等等
wt := wiretype(tags[0])
// 设置字段是required还是opt
if tags[2] == "req" {
fi.required = true
}
// 设置field和tag信息到marshalFieldInfo
fi.setTag(f, tag, wt)
// 根据当前的tag信息(类型等),选择marshaler函数
fi.setMarshaler(f, tags)
}

setMarshaler的重点是typeMarshalertypeMarshaler这个函数非常长,其实就是根据类型设置返回对于的序列化函数,比如Bool、Int32、Uint32…,如果是结构体、切片等复合类型,就可以形成递归了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// setMarshaler fills up the sizer and marshaler in the info of a field.
func (fi *marshalFieldInfo) setMarshaler(f *reflect.StructField, tags []string) {
// map类型字段特殊处理
switch f.Type.Kind() {
case reflect.Map:
// map field
fi.isPointer = true
fi.sizer, fi.marshaler = makeMapMarshaler(f)
return
case reflect.Ptr, reflect.Slice:
// 指针字段和切片字段标记指针类型
fi.isPointer = true
}

// 根据字段类型和tag选择marshaler
fi.sizer, fi.marshaler = typeMarshaler(f.Type, tags, true, false)
}

// typeMarshaler returns the sizer and marshaler of a given field.
// t is the type of the field.
// tags is the generated "protobuf" tag of the field.
// If nozero is true, zero value is not marshaled to the wire.
// If oneof is true, it is a oneof field.
// 函数非常长,省略内容
func typeMarshaler(t reflect.Type, tags []string, nozero, oneof bool) (sizer, marshaler) {
...
switch t.Kind() {
case reflect.Bool:
if pointer {
return sizeBoolPtr, appendBoolPtr
}
if slice {
if packed {
return sizeBoolPackedSlice, appendBoolPackedSlice
}
return sizeBoolSlice, appendBoolSlice
}
if nozero {
return sizeBoolValueNoZero, appendBoolValueNoZero
}
return sizeBoolValue, appendBoolValue
case reflect.Uint32:
...
case reflect.Int32:
....
case reflect.Struct:
...
}

以下是Bool和String类型的2个序列化函数示例:

1
2
3
4
5
6
7
8
9
10
func appendBoolValue(b []byte, ptr pointer, wiretag uint64, _ bool) ([]byte, error) {
v := *ptr.toBool()
b = appendVarint(b, wiretag)
if v {
b = append(b, 1)
} else {
b = append(b, 0)
}
return b, nil
}
1
2
3
4
5
6
7
func appendStringValue(b []byte, ptr pointer, wiretag uint64, _ bool) ([]byte, error) {
v := *ptr.toString()
b = appendVarint(b, wiretag)
b = appendVarint(b, uint64(len(v)))
b = append(b, v...)
return b, nil
}

所以序列化后的[]byte,应当是符合这种模式:

1
| wiretag | data | wiretag | data | ... | data |

OK,以上就是编码的主要流程,简单回顾一下:

  1. proto.Marshal会调用*.pb.go中自动生成的Wrapper函数,Wrapper函数会调用InternalMessageInfo进行序列化,然后才步入序列化的正题
  2. 首先获取要序列化类型的marshal信息u,如果u没有初始化,则进行初始化,即设置好结构体每个字段的序列化函数,以及其他信息
  3. 遍历结构体的每个字段,使用u中的信息为每个字段进行编码,并把加过追加到[]byte,所以字段编码完成,则返回序列化的结果[]byte或者错误。

解码

解码的流程其实与编码很类似,会是上面回顾的3大步骤,主要的区别在步骤2:它要获取的是序列化类型的unmarshal信息u,如果u没有初始化,会进行初始化,设置的是结构体每个字段的反序列化函数,以及其他信息。

所以解码的函数解析会简要的过一遍,不再有编码那么详细的解释。

下面是proto包中反序列化的接口和函数定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Unmarshaler is the interface representing objects that can
// unmarshal themselves. The argument points to data that may be
// overwritten, so implementations should not keep references to the
// buffer.
// Unmarshal implementations should not clear the receiver.
// Any unmarshaled data should be merged into the receiver.
// Callers of Unmarshal that do not want to retain existing data
// should Reset the receiver before calling Unmarshal.
type Unmarshaler interface {
Unmarshal([]byte) error
}

// newUnmarshaler is the interface representing objects that can
// unmarshal themselves. The semantics are identical to Unmarshaler.
//
// This exists to support protoc-gen-go generated messages.
// The proto package will stop type-asserting to this interface in the future.
//
// DO NOT DEPEND ON THIS.
type newUnmarshaler interface {
// 实现了XXX_Unmarshal
XXX_Unmarshal([]byte) error
}

// Unmarshal parses the protocol buffer representation in buf and places the
// decoded result in pb. If the struct underlying pb does not match
// the data in buf, the results can be unpredictable.
//
// Unmarshal resets pb before starting to unmarshal, so any
// existing data in pb is always removed. Use UnmarshalMerge
// to preserve and append to existing data.
func Unmarshal(buf []byte, pb Message) error {
pb.Reset()
// pb自己有unmarshal函数,实现了newUnmarshaler接口
if u, ok := pb.(newUnmarshaler); ok {
return u.XXX_Unmarshal(buf)
}
// pb自己有unmarshal函数,实现了Unmarshaler接口
if u, ok := pb.(Unmarshaler); ok {
return u.Unmarshal(buf)
}
// 使用默认的Unmarshal
return NewBuffer(buf).Unmarshal(pb)
}

Request实现了Unmarshaler接口:

1
2
3
4
// request.pb.go
func (m *Request) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Request.Unmarshal(m, b)
}

反序列化也是使用InternalMessageInfo进行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Unmarshal is the entry point from the generated .pb.go files.
// This function is not intended to be used by non-generated code.
// This function is not subject to any compatibility guarantee.
// msg contains a pointer to a protocol buffer struct.
// b is the data to be unmarshaled into the protocol buffer.
// a is a pointer to a place to store cached unmarshal information.
func (a *InternalMessageInfo) Unmarshal(msg Message, b []byte) error {
// Load the unmarshal information for this message type.
// The atomic load ensures memory consistency.
// 获取保存在a中的unmarshal信息
u := atomicLoadUnmarshalInfo(&a.unmarshal)
if u == nil {
// Slow path: find unmarshal info for msg, update a with it.
u = getUnmarshalInfo(reflect.TypeOf(msg).Elem())
atomicStoreUnmarshalInfo(&a.unmarshal, u)
}
// Then do the unmarshaling.
// 执行unmarshal
err := u.unmarshal(toPointer(&msg), b)
return err
}

以下是反序列化的主题函数,u未初始化时会调用computeUnmarshalInfo设置反序列化需要的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// unmarshal does the main work of unmarshaling a message.
// u provides type information used to unmarshal the message.
// m is a pointer to a protocol buffer message.
// b is a byte stream to unmarshal into m.
// This is top routine used when recursively unmarshaling submessages.
func (u *unmarshalInfo) unmarshal(m pointer, b []byte) error {
if atomic.LoadInt32(&u.initialized) == 0 {
// 为u填充unmarshal信息,以及设置每个字段类型的unmarshaler函数
u.computeUnmarshalInfo()
}
if u.isMessageSet {
return unmarshalMessageSet(b, m.offset(u.extensions).toExtensions())
}
var reqMask uint64 // bitmask of required fields we've seen.
var errLater error
for len(b) > 0 {
// Read tag and wire type.
// Special case 1 and 2 byte varints.
var x uint64
if b[0] < 128 {
x = uint64(b[0])
b = b[1:]
} else if len(b) >= 2 && b[1] < 128 {
x = uint64(b[0]&0x7f) + uint64(b[1])<<7
b = b[2:]
} else {
var n int
x, n = decodeVarint(b)
if n == 0 {
return io.ErrUnexpectedEOF
}
b = b[n:]
}
// 获取tag和wire标记
tag := x >> 3
wire := int(x) & 7

// Dispatch on the tag to one of the unmarshal* functions below.
// 根据tag选择该类型的unmarshalFieldInfo:f
var f unmarshalFieldInfo
if tag < uint64(len(u.dense)) {
f = u.dense[tag]
} else {
f = u.sparse[tag]
}
// 如果该类型有unmarshaler函数,则执行解码和错误处理
if fn := f.unmarshal; fn != nil {
var err error
// 从b解析,然后填充到f的对应字段
b, err = fn(b, m.offset(f.field), wire)
if err == nil {
reqMask |= f.reqMask
continue
}
if r, ok := err.(*RequiredNotSetError); ok {
// Remember this error, but keep parsing. We need to produce
// a full parse even if a required field is missing.
if errLater == nil {
errLater = r
}
reqMask |= f.reqMask
continue
}
if err != errInternalBadWireType {
if err == errInvalidUTF8 {
if errLater == nil {
fullName := revProtoTypes[reflect.PtrTo(u.typ)] + "." + f.name
errLater = &invalidUTF8Error{fullName}
}
continue
}
return err
}
// Fragments with bad wire type are treated as unknown fields.
}

// Unknown tag.
// 跳过未知tag,可能是proto中的message定义升级了,增加了一些字段,使用老版本的,就不识别新的字段
if !u.unrecognized.IsValid() {
// Don't keep unrecognized data; just skip it.
var err error
b, err = skipField(b, wire)
if err != nil {
return err
}
continue
}
// 检查未识别字段是不是extension
// Keep unrecognized data around.
// maybe in extensions, maybe in the unrecognized field.
z := m.offset(u.unrecognized).toBytes()
var emap map[int32]Extension
var e Extension
for _, r := range u.extensionRanges {
if uint64(r.Start) <= tag && tag <= uint64(r.End) {
if u.extensions.IsValid() {
mp := m.offset(u.extensions).toExtensions()
emap = mp.extensionsWrite()
e = emap[int32(tag)]
z = &e.enc
break
}
if u.oldExtensions.IsValid() {
p := m.offset(u.oldExtensions).toOldExtensions()
emap = *p
if emap == nil {
emap = map[int32]Extension{}
*p = emap
}
e = emap[int32(tag)]
z = &e.enc
break
}
panic("no extensions field available")
}
}

// Use wire type to skip data.
var err error
b0 := b
b, err = skipField(b, wire)
if err != nil {
return err
}
*z = encodeVarint(*z, tag<<3|uint64(wire))
*z = append(*z, b0[:len(b0)-len(b)]...)

if emap != nil {
emap[int32(tag)] = e
}
}
// 校验解析到的required字段的数量,如果与u中记录的不匹配,则报错
if reqMask != u.reqMask && errLater == nil {
// A required field of this message is missing.
for _, n := range u.reqFields {
if reqMask&1 == 0 {
errLater = &RequiredNotSetError{n}
}
reqMask >>= 1
}
}
return errLater
}

设置字段反序列化函数的过程不看了,看一下怎么选函数的,typeUnmarshaler是为字段类型,选择反序列化函数,这些函数选择与序列化函数是一一对应的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// typeUnmarshaler returns an unmarshaler for the given field type / field tag pair.
func typeUnmarshaler(t reflect.Type, tags string) unmarshaler {
...
// Figure out packaging (pointer, slice, or both)
slice := false
pointer := false
if t.Kind() == reflect.Slice && t.Elem().Kind() != reflect.Uint8 {
slice = true
t = t.Elem()
}
if t.Kind() == reflect.Ptr {
pointer = true
t = t.Elem()
}
...
switch t.Kind() {
case reflect.Bool:
if pointer {
return unmarshalBoolPtr
}
if slice {
return unmarshalBoolSlice
}
return unmarshalBoolValue
}
}

unmarshalBoolValue是默认的Bool类型反序列化函数,会把protobuf数据b解码,然后转换为bool类型v,最后赋值给字段f。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func unmarshalBoolValue(b []byte, f pointer, w int) ([]byte, error) {
if w != WireVarint {
return b, errInternalBadWireType
}
// Note: any length varint is allowed, even though any sane
// encoder will use one byte.
// See https://github.com/golang/protobuf/issues/76
x, n := decodeVarint(b)
if n == 0 {
return nil, io.ErrUnexpectedEOF
}
// TODO: check if x>1? Tests seem to indicate no.
// toBool是返回bool类型的指针
// 完成对字段f的赋值
v := x != 0
*f.toBool() = v
return b[n:], nil
}

总结

本文分析了Go语言protobuf数据的序列化和反序列过程,可以简要概括为:

  1. proto.Marshalproto.Unmarshal会调用*.pb.go中自动生成的Wrapper函数,Wrapper函数会调用InternalMessageInfo进行(反)序列化,然后才步入(反)序列化的正题
  2. 首先获取要目标类型的(un)marshal信息u,如果u没有初始化,则进行初始化,即设置好结构体每个字段的(反)序列化函数,以及其他信息
  3. 遍历结构体的每个字段,使用u中的信息为每个字段进行编码,生成序列化的结果,或进行解码,给结构体成员进行赋值

参考文章

以下参考文章都值得阅读:

这是一篇姊妹篇文章,浅析一下Go是如何实现protobuf编解码的:

  1. Go是如何实现protobuf的编解码的(1): 原理
  2. Go是如何实现protobuf的编解码的(2): 源码

本编是第一篇。

Protocol Buffers介绍

Protocol buffers缩写为protobuf,是由Google创造的一种用于序列化的标记语言,项目Github仓库:https://github.com/protocolbuffers/protobuf。

Protobuf主要用于不同的编程语言的协作RPC场景下,定义需要序列化的数据格式。Protobuf本质上仅仅是一种用于交互的结构式定义,从功能上和XML、JSON等各种其他的交互形式都并无本质不同,只负责定义不负责数据编解码

其官方介绍如下:

Protocol buffers are Google’s language-neutral, platform-neutral, extensible mechanism for serializing structured data – think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use special generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages.

Protocol buffers的多语言支持

protobuf是支持多种编程语言的,即多种编程语言的类型数据可以转换成protobuf定义的类型数据,各种语言的类型对应可以看此介绍

我们介绍一下protobuf对多语言的支持原理。protobuf有个程序叫protoc,它是一个编译程序,负责把proto文件编译成对应语言的文件,它已经支持了C++、C#、Java、Python,而对于Go和Dart需要安装插件才能配合生成对于语言的文件。

对于C++,protoc可以把a.proto,编译成a.pb.ha.pb.cc

对于Go,protoc需要使用插件protoc-gen-go,把a.proto,编译成a.pb.go,其中包含了定义的数据类型,它的序列化和反序列化函数等。

敲黑板,对Go语言,protoc只负责利用protoc-gen-go把proto文件编译成Go语言文件,并不负责序列化和反序列化,生成的Go语言文件中的序列化和反序列化操作都是只是wrapper。

那Go语言对protobuf的序列化和反序列化,是由谁完成的?

github.com/golang/protobuf/proto完成,它负责把结构体等序列化成proto数据([]byte),把proto数据反序列化成Go结构体。

OK,原理部分就铺垫这些,看一个简单样例,了解protoc和protoc-gen-go的使用,以及进行序列化和反序列化操作。

一个Hello World样例

根据上面的介绍,Go语言使用protobuf我们要先安装2个工具:protoc和protoc-gen-go。

安装protoc和protoc-gen-go

首先去下载页下载符合你系统的protoc,本文示例版本如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
➜  protoc-3.9.0-osx-x86_64 tree .
.
├── bin
│   └── protoc
├── include
│   └── google
│   └── protobuf
│   ├── any.proto
│   ├── api.proto
│   ├── compiler
│   │   └── plugin.proto
│   ├── descriptor.proto
│   ├── duration.proto
│   ├── empty.proto
│   ├── field_mask.proto
│   ├── source_context.proto
│   ├── struct.proto
│   ├── timestamp.proto
│   ├── type.proto
│   └── wrappers.proto
└── readme.txt

5 directories, 14 files

protoc的安装步骤在readme.txt中:

To install, simply place this binary somewhere in your PATH.

protoc-3.9.0-osx-x86_64/bin加入到PATH。

If you intend to use the included well known types then don’t forget to
copy the contents of the ‘include’ directory somewhere as well, for example
into ‘/usr/local/include/‘.

如果使用已经定义好的类型,即上面include目录*.proto文件中的类型,把include目录下文件,拷贝到/usr/local/include/

安装protoc-gen-go:

1
go get –u github.com/golang/protobuf/protoc-gen-go

检查安装,应该能查到这2个程序的位置:

1
2
3
4
➜  fabric git:(release-1.4) which protoc
/usr/local/bin/protoc
➜ fabric git:(release-1.4) which protoc-gen-go
/Users/shitaibin/go/bin/protoc-gen-go

Hello world

创建了一个使用protoc的小玩具,项目地址Github: golang_step_by_step

它的目录结构如下:

1
2
3
4
5
6
➜  protobuf git:(master) tree helloworld1
helloworld1
├── main.go
├── request.proto
└── types
└── request.pb.go

定义proto文件

使用proto3,定义一个Request,request.proto内容如下:

1
2
3
4
5
6
7
8
// file: request.proto
syntax = "proto3";
package helloworld;
option go_package="./types";

message Request {
string data = 1;
}
  • syntax:protobuf版本,现在是proto3
  • package:不完全等价于Go的package,最好另行设定go_package,指定根据protoc文件生成的go语言文件的package名称。
  • message:会编译成Go的struct
    • string data = 1:代表request的成员data是string类型,该成员的id是1,protoc给每个成员都定义一个编号,编解码的时候使用编号代替使用成员名称,压缩数据量。

编译proto文件

1
$ protoc --go_out=. ./request.proto

--go_out指明了要把./request.proto编译成Go语言文件,生成的是./types/request.pb.go,注意观察一下为Request结构体生产的2个方法XXX_UnmarshalXXX_Marshal,文件内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// file: ./types/request.pb.go
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: request.proto

package types

import (
fmt "fmt"
math "math"

proto "github.com/golang/protobuf/proto"
)

// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package

type Request struct {
Data string `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
// 以下是protobuf自动填充的字段,protobuf需要使用
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}

func (m *Request) Reset() { *m = Request{} }
func (m *Request) String() string { return proto.CompactTextString(m) }
func (*Request) ProtoMessage() {}
func (*Request) Descriptor() ([]byte, []int) {
return fileDescriptor_7f73548e33e655fe, []int{0}
}

// 反序列化函数
func (m *Request) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Request.Unmarshal(m, b)
}
// 序列化函数
func (m *Request) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Request.Marshal(b, m, deterministic)
}
func (m *Request) XXX_Merge(src proto.Message) {
xxx_messageInfo_Request.Merge(m, src)
}
func (m *Request) XXX_Size() int {
return xxx_messageInfo_Request.Size(m)
}
func (m *Request) XXX_DiscardUnknown() {
xxx_messageInfo_Request.DiscardUnknown(m)
}

var xxx_messageInfo_Request proto.InternalMessageInfo

// 获取字段
func (m *Request) GetData() string {
if m != nil {
return m.Data
}
return ""
}

func init() {
proto.RegisterType((*Request)(nil), "helloworld.Request")
}

func init() { proto.RegisterFile("request.proto", fileDescriptor_7f73548e33e655fe) }

var fileDescriptor_7f73548e33e655fe = []byte{
// 91 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2d, 0x4a, 0x2d, 0x2c,
0x4d, 0x2d, 0x2e, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0xca, 0x48, 0xcd, 0xc9, 0xc9,
0x2f, 0xcf, 0x2f, 0xca, 0x49, 0x51, 0x92, 0xe5, 0x62, 0x0f, 0x82, 0x48, 0x0a, 0x09, 0x71, 0xb1,
0xa4, 0x24, 0x96, 0x24, 0x4a, 0x30, 0x2a, 0x30, 0x6a, 0x70, 0x06, 0x81, 0xd9, 0x4e, 0x9c, 0x51,
0xec, 0x7a, 0xfa, 0x25, 0x95, 0x05, 0xa9, 0xc5, 0x49, 0x6c, 0x60, 0xcd, 0xc6, 0x80, 0x00, 0x00,
0x00, 0xff, 0xff, 0x2e, 0x52, 0x69, 0xb5, 0x4d, 0x00, 0x00, 0x00,
}

编写Go语言程序

下面这段测试程序就是创建了一个请求,序列化又反序列化的过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// file: main.go
package main

import (
"fmt"

"./types"
"github.com/golang/protobuf/proto"
)

func main() {
req := &types.Request{Data: "Hello LIB"}

// Marshal
encoded, err := proto.Marshal(req)
if err != nil {
fmt.Printf("Encode to protobuf data error: %v", err)
}

// Unmarshal
var unmarshaledReq types.Request
err = proto.Unmarshal(encoded, &unmarshaledReq)
if err != nil {
fmt.Printf("Unmarshal to struct error: %v", err)
}

fmt.Printf("req: %v\n", req.String())
fmt.Printf("unmarshaledReq: %v\n", unmarshaledReq.String())
}

运行结果:

1
2
3
➜  helloworld1 git:(master) go run main.go
req: data:"Hello LIB"
unmarshaledReq: data:"Hello LIB"

以上都是铺垫,下一节的proto包怎么实现编解码才是重点,protobuf用法可以去翻:

  1. 官方介绍:protoc3介绍编码介绍Go教程
  2. 煎鱼grpc系列文章

参考文章