0%

序言

etcd raft定义了一些重要的结构体,来传递和表示raft使用到的数据。

在介绍各结构体之前,先澄清一下raft、log和state machine的关系,它们三个是独立的,没有隶属关系,尤其是state machine并不属于raft。

State machine

Consensus Module指raft算法,它输出一致的Log Entry序列,State machine指应用Entry后得到的状态,状态机是并不是raft的一部分,而是用来存储数据的模块。

Entry

每个Raft集群节点都是一个状态机,每个节点都使用相同的log entry序列修改状态机的数据,Entry就是每一个操作项,raft的核心能力就是为应用层提供序列相同的entry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type Entry struct {
Term uint64 `protobuf:"varint,2,opt,name=Term" json:"Term"`
Index uint64 `protobuf:"varint,3,opt,name=Index" json:"Index"`
Type EntryType `protobuf:"varint,1,opt,name=Type,enum=raftpb.EntryType" json:"Type"`
Data []byte `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"`
}

type EntryType int32

const (
EntryNormal EntryType = 0
EntryConfChange EntryType = 1
EntryConfChangeV2 EntryType = 2
)

每一个Entry,都可以使用(Term, Index)进行唯一标记,相当于Entry的ID:

  • Term:即raft论文中的Term,表明了当前Entry所属的Term。raft不使用绝对时间,而是使用相对时间,它把时间分割成了大小不等的term,每一轮选举都会开启一个新的term,term值会连续累加。如果当前的节点已经是Term 10缺收到了Term 8的Entry,Term 8的Entry已经过时,会被丢弃。
  • Index:每一个Entry都有一个的Index,代表当前Entry在log entry序列中的位置,每个index上最终只有1个达成共识的Entry。

除了用于达成一致的Term和Index外,Entry还携带了数据:

  • Type:表明当前Entry的类型,EntryNormal代表是Entry携带的是修改状态机的操作数据,EntryConfChangeEntryConfChangeV2代表的是Entry携带的是修改当前raft集群的配置。
  • Data:是序列化的数据,不同的Type类型,对应不同的Data。

Snapshot

在Entry特别多的场景下,会存在一些问题,比如现在有1亿条已经达成一致Entry,后面还有源源不断的Entry产生,是否有以下问题:

  1. 这些Entry占用了大量的磁盘空间,但实际上过去的Entry已经对已经拥有这些Entry的节点没有意义了,只对那些没有Entry的节点有意义,leader把Entry发送给没有这些Entry节点,以让这些节点最终能和leader保持一致的状态。
  2. 有些follower非常慢,或者刚启动,或者重启过,与leader的当前状态已经严重脱节,让他们从Entry 0开始同步,然后应用到状态机,这种操作时间效率是不是非常慢?然后每一个Entry都会产生一个历史的状态,当产生新的状态之后,历史状态对当前节点也没有意义。

解决这种问题的办法就是快照,比如虚拟机的快照,或者docker镜像(镜像本质也是一种快照),有了快照就可以把状态机快速恢复到快照时的状态,空间和时间上效率都能提高很多

Raft可以定期产生一些快照,然后在这些快照上按序应用快照之后的Entry就能得到一致的状态。1亿个Entry + 1亿01个Entry得到的状态,跟第1亿个Entry后所产生的快照+1亿零1个Entry得到的状态是一致的。

1
2
3
4
5
6
7
8
9
10
type Snapshot struct {
Data []byte `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"`
Metadata SnapshotMetadata `protobuf:"bytes,2,opt,name=metadata" json:"metadata"`
}

type SnapshotMetadata struct {
ConfState ConfState `protobuf:"bytes,1,opt,name=conf_state,json=confState" json:"conf_state"`
Index uint64 `protobuf:"varint,2,opt,name=index" json:"index"`
Term uint64 `protobuf:"varint,3,opt,name=term" json:"term"`
}
  • Data:是状态机中状态的快照。
  • Metadata:是快照自身相关的数据。
    • ConfState:是快照时,当前raft的配置状态,这些状态数据并不在状态机中,所以需要进行保存。
    • Index、Term:快照所依据的Entry所在的Index和Term。

Message

Raft集群节点之间的通信只使用了1个结构体Message,Message中有一个Type成员,表明了当前的Message是哪种消息,比如可以是Raft论文中提到的AppendEntries,RequestVotes等,目前实际可以容纳19种类型的消息,每种消息对Raft都有不同的作用,具体见这篇文章

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 不同的Message类型会用到不同的字段
type Message struct {
Type MessageType `protobuf:"varint,1,opt,name=type,enum=raftpb.MessageType" json:"type"`
To uint64 `protobuf:"varint,2,opt,name=to" json:"to"`
From uint64 `protobuf:"varint,3,opt,name=from" json:"from"`
Term uint64 `protobuf:"varint,4,opt,name=term" json:"term"`
LogTerm uint64 `protobuf:"varint,5,opt,name=logTerm" json:"logTerm"`
Index uint64 `protobuf:"varint,6,opt,name=index" json:"index"`
Entries []Entry `protobuf:"bytes,7,rep,name=entries" json:"entries"`
Commit uint64 `protobuf:"varint,8,opt,name=commit" json:"commit"`
Snapshot Snapshot `protobuf:"bytes,9,opt,name=snapshot" json:"snapshot"`
Reject bool `protobuf:"varint,10,opt,name=reject" json:"reject"`
RejectHint uint64 `protobuf:"varint,11,opt,name=rejectHint" json:"rejectHint"`
Context []byte `protobuf:"bytes,12,opt,name=context" json:"context,omitempty"`
}

Message中包含了很多字段,不同的消息类型使用的字段组合不相同,可以从不同消息的处理逻辑中看出来。

  • To, From:是消息的接收节点和发送节点的的Raft ID。
  • Term:创建Message时,发送节点所在的Term。
  • LogTerm:创建Message时,发送节点本地所保存的log entry序列中最大的Term,在选举的时候会使用。
  • Index:不同的消息类型,Index的含义不同。Term和Index与Entry中的Term和Index不一定会相同,因为某个follower可能比较慢,leader向follower发送已经committed的Entry。
  • Entries:发送给follower,待follower处理的Entry。
  • Commit:创建Message时,不同消息含义不同,Append时是发送节点本地已committed的Index,Heartbeat时是committed Index或者与follower匹配的Index。
  • Snapshot:leader传递给follower的snapshot。
  • Reject:投票和Append的响应消息使用,Reject表示拒绝leader发来的消息。
  • RejectHint:拒绝Append消息的响应消息使用,用来给leader提示,发送follower已有的最后一个Index。
  • Context:某些消息的附加信息,即用来存储通用的数据。比如竞选时,存放campaignTransfer

Storage

etcd/raft不负责持久化数据存储和网络通信,网络数据都是通过Node接口的函数传入和传出raft。持久化数据存储由创建raft.Node的应用层负责,包含:

  • 应用层使用Entry生成的状态机,即一致的应用数据。
  • WAL:Write Ahead Log,历史的Entry(包含还未达成一致的Entry)和快照数据。

Snapshot是已在节点间达成一致Entry的快照,快照之前的Entry必然都是已经达成一致的,而快照之后,有达成一致的,也有写入磁盘还未达成一致的Entry。etcd/raft会使用到这些Entry和快照,而Storage接口,就是用来读这些数据的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Storage is an interface that may be implemented by the application
// to retrieve log entries from storage.
//
// If any Storage method returns an error, the raft instance will
// become inoperable and refuse to participate in elections; the
// application is responsible for cleanup and recovery in this case.
type Storage interface {
// TODO(tbg): split this into two interfaces, LogStorage and StateStorage.

// InitialState returns the saved HardState and ConfState information.
InitialState() (pb.HardState, pb.ConfState, error)
// Entries returns a slice of log entries in the range [lo,hi).
// MaxSize limits the total size of the log entries returned, but
// Entries returns at least one entry if any.
Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
// Term returns the term of entry i, which must be in the range
// [FirstIndex()-1, LastIndex()]. The term of the entry before
// FirstIndex is retained for matching purposes even though the
// rest of that entry may not be available.
Term(i uint64) (uint64, error)
// LastIndex returns the index of the last entry in the log.
LastIndex() (uint64, error)
// FirstIndex returns the index of the first log entry that is
// possibly available via Entries (older entries have been incorporated
// into the latest Snapshot; if storage only contains the dummy entry the
// first log entry is not available).
FirstIndex() (uint64, error)
// Snapshot returns the most recent snapshot.
// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
// so raft state machine could know that Storage needs some time to prepare
// snapshot and call Snapshot later.
Snapshot() (pb.Snapshot, error)
}

使用这个接口,从应用层读取:

  • InitialState:HardState和配置状态Confstate
  • Entries:根据Index获取连续的Entry
  • Term:获取某个Entry所在的Term
  • LastIndex:获取本节点已存储的最新的Entry的Index
  • FirstIndex:获取本节点已存储的第一个Entry的Index
  • Snapshot:获取本节点最近生成的Snapshot,Snapshot是由应用层创建的,并暂时保存起来,raft调用此接口读取

每次都从磁盘文件读取这些数据,效率必然是不高的,所以etcd/raft内定义了MemoryStorage,它实现了Storage接口,并且提供了函数来维护最新快照后的Entry,关于MemoryStorageraftLog小节,其中的storage即为MemoryStorage

unstable

因为Entry的存储是由应用层负责的,所以raft需要暂时存储还未存到Storage中的Entry或者Snapshot,在创建Ready时,Entry和Snapshot会被封装到Ready,由应用层写入到storage。

1
2
3
4
5
6
7
8
9
10
11
12
// unstable.entries[i] has raft log position i+unstable.offset.
// Note that unstable.offset may be less than the highest log
// position in storage; this means that the next write to storage
// might need to truncate the log before persisting unstable.entries.
type unstable struct {
// the incoming unstable snapshot, if any.
snapshot *pb.Snapshot
// all entries that have not yet been written to storage.
entries []pb.Entry
offset uint64
...
}
  • Snapshot:是follower从leader收到的最新的Snapshot。
  • entries:对leader而已,是raft刚利用请求创建的Entry,对follower而言是从leader收到的Entry。
  • offset:Entries[i].Index = i + offset。

raftLog

raft使用raftLog来管理当前Entry序列和Snapshot等信息,它由Storage、unstable、committed和applied组成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type raftLog struct {
// storage contains all stable entries since the last snapshot.
storage Storage

// unstable contains all unstable entries and snapshot.
// they will be saved into storage.
unstable unstable

// committed和applied是storage的2个整数下标
// committed到applied需要Ready
// committed is the highest log position that is known to be in
// stable storage on a quorum of nodes.
committed uint64
// applied is the highest log position that the application has
// been instructed to apply to its state machine.
// Invariant: applied <= committed
applied uint64
...
}

Storage和unstable前面已经介绍过了,所以介绍下committed和applied。

committed指最后一个在raft集群多数节点之间达成一致的Entry Index。

applied指当前节点被应用层应用到状态机的最后一个Entry Index。applied和committed之间的Entry就是等待被应用层应用到状态机的Entry。

前面提到Storage接口可以获取第一个索引firstIdx,最后一个索引lastIdx,在生成snapshot之后签名的Entry就可以删除了,所以firstidx是storage中snapshot后的第一个Entry的Index,lastIndex是storage中保存的最后一个Entry的Index,这个Entry可能还没有在raft集群多数节点之间达成一致,所以在committed之后,这些Entry是等待commit的Entry,leader发现某个Entry Index已经在多数节点之间达成一致,就会把committed移动到该Entry Index。

raftLog

SoftState

SoftState指易变的状态数据,记录了当前的Leader的Node ID,以及当前节点的角色。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// SoftState provides state that is useful for logging and debugging.
// The state is volatile and does not need to be persisted to the WAL.
type SoftState struct {
// leader的Node ID
Lead uint64 // must use atomic operations to access; keep 64-bit aligned.
// 节点是什么角色:leader、follower...
RaftState StateType
}

// StateType represents the role of a node in a cluster.
type StateType uint64

var stmap = [...]string{
"StateFollower",
"StateCandidate",
"StateLeader",
"StatePreCandidate",
}

HardState

HardState是写入到WAL(存储Entry的文件)的状态,可以在节点重启时恢复raft的状态,它了记录:

  • Term:节点当前所在的Term。
  • Vote:节点在竞选期间所投的候选节点ID。
  • Commit:当前已经committed Entry Index。
1
2
3
4
5
type HardState struct {
Term uint64 `protobuf:"varint,1,opt,name=term" json:"term"`
Vote uint64 `protobuf:"varint,2,opt,name=vote" json:"vote"`
Commit uint64 `protobuf:"varint,3,opt,name=commit" json:"commit"`
}

Ready

终于到etcd raft最重要的一个结构体了。raft使用Ready结构体对外传递数据,是多种数据的打包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
// The current volatile state of a Node.
// SoftState will be nil if there is no update.
// It is not required to consume or store SoftState.
*SoftState

// The current state of a Node to be saved to stable storage BEFORE
// Messages are sent.
// HardState will be equal to empty state if there is no update.
pb.HardState

// ReadStates can be used for node to serve linearizable read requests locally
// when its applied index is greater than the index in ReadState.
// Note that the readState will be returned when raft receives msgReadIndex.
// The returned is only valid for the request that requested to read.
ReadStates []ReadState

// unstable的entry,即待写入到storage的entry
// Entries specifies entries to be saved to stable storage BEFORE
// Messages are sent.
Entries []pb.Entry

// Snapshot specifies the snapshot to be saved to stable storage.
Snapshot pb.Snapshot

// 待applied的entry
// CommittedEntries specifies entries to be committed to a
// store/state-machine. These have previously been committed to stable
// store.
CommittedEntries []pb.Entry

// Messages specifies outbound messages to be sent AFTER Entries are
// committed to stable storage.
// If it contains a MsgSnap message, the application MUST report back to raft
// when the snapshot has been received or has failed by calling ReportSnapshot.
Messages []pb.Message

// MustSync indicates whether the HardState and Entries must be synchronously
// written to disk or if an asynchronous write is permissible.
MustSync bool
}

SoftState、HardState、Entry、Snapshot、Message都已经介绍过,不再单独介绍含义。

Entries和CommittedEntries的区别是,Entries保存的是从unstable读取的Entry,它们即将被应用层写入storage,CommittedEntries是已经被Committed,还没有applied,应用层会把他们应用到状态机。

ReadStates用来处理读请求,MustSync用来指明应用层是否采用异步的方式写数据。

应用层在接收到Ready后,应当处理Ready中的每一个有效字段,处理完毕后,调用Advance()通知raft Ready已处理完毕。

前言

社区里在讨论一个问题,是由官方的文档引发的,文档上讲不同的peer可以使用不同语言的链码,前提是2份链码功能、接口等必须一致。

fabric chaincode error

大家的问题是:

一个链码可以采用不同的语言实现,不同peer上使用不同的链码真的可行吗?

经过实证,这是不可行的。

分2种情况,2种都有问题:

  1. 不同peer安装不同语言链码,然后同时实例化:实例化后,只能启动发送实例化交易的peer拥有的语言的链码
  2. 部分peer先实例化,另外peer再安装不同语言链码:调用链码时报指纹不匹配错误

不同peer安装不同语言链码,然后同时实例化

1、修改BFYN,只在peer0.org1和peer0.org2上安装Go语言链码,不进行后续操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Installing chaincode on peer0.org1...
+ peer chaincode install -n mycc -v 1.0 -l golang -p github.com/chaincode/chaincode_example02/go/
+ res=0
+ set +x
2019-09-03 02:08:43.813 UTC [chaincodeCmd] checkChaincodeCmdParams -> INFO 001 Using default escc
2019-09-03 02:08:43.813 UTC [chaincodeCmd] checkChaincodeCmdParams -> INFO 002 Using default vscc
2019-09-03 02:08:44.108 UTC [chaincodeCmd] install -> INFO 003 Installed remotely response:<status:200 payload:"OK" >
===================== Chaincode is installed on peer0.org1 =====================

Install chaincode on peer0.org2...
+ peer chaincode install -n mycc -v 1.0 -l golang -p github.com/chaincode/chaincode_example02/go/
+ res=0
+ set +x
2019-09-03 02:08:44.260 UTC [chaincodeCmd] checkChaincodeCmdParams -> INFO 001 Using default escc
2019-09-03 02:08:44.260 UTC [chaincodeCmd] checkChaincodeCmdParams -> INFO 002 Using default vscc
2019-09-03 02:08:44.529 UTC [chaincodeCmd] install -> INFO 003 Installed remotely response:<status:200 payload:"OK" >
===================== Chaincode is installed on peer0.org2 =====================


========= All GOOD, BYFN execution completed ===========

2、在peer1.org1上安装Java语言链码

1
2
3
4
root@6cec20eb7502:/opt/gopath/src/github.com/hyperledger/fabric/peer# peer chaincode install -n mycc -v 1.0 -l java -p /opt/gopath/src/github.com/chaincode/chaincode_example02/java/
2019-09-03 03:19:44.710 UTC [chaincodeCmd] checkChaincodeCmdParams -> INFO 001 Using default escc
2019-09-03 03:19:44.711 UTC [chaincodeCmd] checkChaincodeCmdParams -> INFO 002 Using default vscc
2019-09-03 03:19:44.754 UTC [chaincodeCmd] install -> INFO 003 Installed remotely response:<status:200 payload:"OK" >

3、在peer1.org1上发起实例化链码

1
2
3
root@6cec20eb7502:/opt/gopath/src/github.com/hyperledger/fabric/peer# peer chaincode instantiate -o orderer.example.com:7050 --tls true --cafile /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/ordererOrganizations/example.com/orderers/orderer.example.com/msp/tlscacerts/tlsca.example.com-cert.pem -C mychannel -n mycc -l golang -v 1.0 -c '{"Args":["init","a","100","b","200"]}' -P 'OR ('\''Org1MSP.peer'\'','\''Org2MSP.peer'\'')'
2019-09-03 03:22:12.430 UTC [chaincodeCmd] checkChaincodeCmdParams -> INFO 001 Using default escc
2019-09-03 03:22:12.431 UTC [chaincodeCmd] checkChaincodeCmdParams -> INFO 002 Using default vscc

4、查看链码容器,只有peer1.org1的链码容器,peer0.org1和peer0.org2的链码容器都没有起来。

1
2
3
4
5
6
7
8
9
➜  fabric-sdk-go-sample git:(master) ✗ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
f8f6aa8b5da6 dev-peer1.org1.example.com-mycc-1.0-cd123150154e6bf2df7ce682e0b1bcbea40499416f37a6da3aae14c4eb51b08d "/root/chaincode-jav…" 37 seconds ago Up 36 seconds dev-peer1.org1.example.com-mycc-1.0
6cec20eb7502 hyperledger/fabric-tools:latest "/bin/bash" About an hour ago Up About an hour cli
7e134fe7e8e9 hyperledger/fabric-peer:latest "peer node start" About an hour ago Up About an hour 0.0.0.0:8051->8051/tcp peer1.org1.example.com
ed6f5511d938 hyperledger/fabric-peer:latest "peer node start" About an hour ago Up About an hour 0.0.0.0:10051->10051/tcp peer1.org2.example.com
025a71178777 hyperledger/fabric-peer:latest "peer node start" About an hour ago Up About an hour 0.0.0.0:7051->7051/tcp peer0.org1.example.com
8687dfd14e7b hyperledger/fabric-peer:latest "peer node start" About an hour ago Up About an hour 0.0.0.0:9051->9051/tcp peer0.org2.example.com
e9cc8b410d7f hyperledger/fabric-orderer:latest "orderer" About an hour ago Up About an hour 0.0.0.0:7050->7050/tcp orderer.example.com

部分peer先实例化,另外peer再安装不同语言链码

不改造BYFN,原生启动。peer0.org1,peer0.org2,peer1.org2都已经实例化了Go语言链码。

然后在peer1.org1上安装Java语言的链码,在执行Invoke或者查询,报指纹不匹配-数据不匹配的错误。

原因分析:操作链码时,会调用LSCC的LifeCycleSysCC.getCCCode获取链码,一份链码是从数据库取的,即当前链码容器的,一份链码是本地存储的,会对2份进行匹配,如果不匹配就会报指纹不匹配错误。

匹配函数为CDSPackage.ValidateCC,匹配项为:

  1. 名称、版本
  2. CodeHash、元数据Hash

调用链码时报的指纹不匹配错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
root@d0533ffe1864:/opt/gopath/src/github.com/hyperledger/fabric/peer# peer chaincode install -n mycc -v 1.0 -l java -p /opt/gopath/src/github.com/chaincode/chaincode_example02/java/
2019-09-03 01:52:15.714 UTC [chaincodeCmd] checkChaincodeCmdParams -> INFO 001 Using default escc
2019-09-03 01:52:15.714 UTC [chaincodeCmd] checkChaincodeCmdParams -> INFO 002 Using default vscc
2019-09-03 01:52:15.755 UTC [chaincodeCmd] install -> INFO 003 Installed remotely response:<status:200 payload:"OK" >
root@d0533ffe1864:/opt/gopath/src/github.com/hyperledger/fabric/peer#
root@d0533ffe1864:/opt/gopath/src/github.com/hyperledger/fabric/peer#
root@d0533ffe1864:/opt/gopath/src/github.com/hyperledger/fabric/peer#
root@d0533ffe1864:/opt/gopath/src/github.com/hyperledger/fabric/peer#
root@d0533ffe1864:/opt/gopath/src/github.com/hyperledger/fabric/peer# peer chaincode query -C mychannel -n mycc -c '{"Args":["query","a"]}'
Error: endorsement failure during query. response: status:500 message:"failed to execute transaction b8b740aab0e6dd10cfe62416240ef94bfb90a55358904233c4d60dd5a39e6fe3: [channel mychannel] failed to get chaincode container info for mycc:1.0: could not get chaincode code: chaincode fingerprint mismatch: data mismatch"
root@d0533ffe1864:/opt/gopath/src/github.com/hyperledger/fabric/peer#
root@d0533ffe1864:/opt/gopath/src/github.com/hyperledger/fabric/peer# peer chaincode invoke -o orderer.example.com:7050 --tls true --cafile /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/ordererOrganizations/example.com/orderers/orderer.example.com/msp/tlscacerts/tlsca.example.com-cert.pem -C mychannel -n mycc --peerAddresses peer1.org1.example.com:8051 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.example.com/peers/peer1.org1.example.com/tls/ca.crt -c '{"Args":["invoke","a","b","50"]}'
Error: endorsement failure during invoke. response: status:500 message:"failed to execute transaction aec5a0ccbcf86032774dc80220b90419d2816cc3f050a104c1cfcde55a2247cb: [channel mychannel] failed to get chaincode container info for mycc:1.0: could not get chaincode code: chaincode fingerprint mismatch: data mismatch"

文章

另外,社区里的hucg编写了一篇源码文章:https://blog.csdn.net/love_feng_forever/article/details/100532324

Hyperledger下有许多区块链相关的项目,曾以为它们分别是不同的类别,专注不同的功能,梳理一下其实并不是这样,比如光区块链框架就有Fabric、Iroha和Sawtooth。

hyperledger projects

各项目的简要介绍以及TWGC的介绍,一定要看看Baohua Yang大佬的PPT,ppt介绍Hyperledger社区的现状,各子项目概览,以及发展方向,以及Hyperledger 中国技术工作组的最新动态。。

做Fabric相关的工作,需要关注Fabric、Cello、Caliper、Explorer等几个项目。

fabric projects arch

前言

本文把与fabric网络交互的baas、应用程序、客户端统称成为客户端,它们可以使用sdk和fabric网络进行交互,sdk调用grpc可以与指定的peer和orderer进行通信,本文的目的是在BYFN搭建的fabric网络的基础之上,展示如何使用fabric-sdk-go操作链码。

fabric sdk

fabric-sdk-go项目简介

fabric-sdk-go是Fabric官方的Go语言SDK,它的目录结构如下:

fabric sdk go

有2个目录需要注意一下,internal和third_party,它们两个包含了sdk依赖的一些代码,来自于fabric、fabric-ca,当使用到fabric的一些类型时,应当使用以下的方式,而不是直接导入fabric或者fabric-ca:

1
import "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/xxx"

pkg目录是sdk的主要实现,doc 文档介绍了不同目录所提供的功能,以及给出了接口调用样例:

  • pkg/fabsdk:主package,主要用来生成fabsdk以及各种其他pkg使用的option context。
  • pkg/client/channel:主要用来调用、查询链码,或者注册链码事件。
  • pkg/client/resmgmt:主要用来fabric网络的管理,比如创建、加入通道,安装、实例化和升级链码。
  • pkg/client/event:配合channel模块来进行链码事件注册和过滤。
  • pkg/client/ledger:主要用来账本的查询,查询区块、交易、配置等。
  • pkg/client/msp:主要用来管理fabric的成员关系。

想用好fabric-go-sdk,建议仔细看看doc 文档

使用SDK步骤

  1. 为client编写配置文件config.yaml
  2. 为client创建fabric sdk实例fabsdk
  3. 为client创建resource manage client,简称RC,RC用来进行管理操作的client,比如通道的创建,链码的安装、实例化和升级等
  4. 为client创建channel client,简称CC,CC用来链码的调用、查询以及链码事件的注册和取消注册

SDK配置文件config.yaml

client使用sdk与fabric网络交互,需要告诉sdk两类信息:

  1. 我是谁:即当前client的信息,包含所属组织、密钥和证书文件的路径等,这是每个client专用的信息。
  2. 对方是谁:即fabric网络结构的信息,channel、org、orderer和peer等的怎么组合起当前fabric网络的,这些结构信息应当与configytx.yaml中是一致的。这是通用配置,每个客户端都可以拿来使用。另外,这部分信息并不需要是完整fabric网络信息,如果当前client只和部分节点交互,那配置文件中只需要包含所使用到的网络信息。

fabric sdk config

这里提供一个适合BFYN的精简配置文件fabric-sdk-go-sample/config.yaml

使用go mod管理依赖

fabric-sdk-go目前本身使用go modules管理依赖,从go.mod可知,依赖的一些包指定了具体的版本,如果项目依赖的版本和sdk依赖的版本不同,会产生编译问题。

建议项目也使用go moudles管理依赖,然后相同的软件包可以使用相同的版本,可以这样操作:

  1. go mod init初始化好项目的go.mod文件。
  2. 编写代码,完成后运行go mod run,会自动下载依赖的项目,但版本可能与fabric-sdk-go中的依赖版本不同,编译存在问题。
  3. go.mod中的内容复制到项目的go.mod中,然后保存,go mod会自动合并相同的依赖,运行go mod tidy,会自动添加新的依赖或删除不需要的依赖。

项目的go mod样例可以参考securekey/fabric-examples … /go.modshitaibin/fabric-sdk-go-sample/go.mod

创建Client

利用config.yaml创建fabsdk

通过config.FromFile解析配置文件,然后通过fabsdk.New创建sdk实例。

1
2
3
4
5
6
7
import "github.com/hyperledger/fabric-sdk-go/pkg/core/config"
import "github.com/hyperledger/fabric-sdk-go/pkg/fabsdk"

sdk, err := fabsdk.New(config.FromFile(c.ConfigPath))
if err != nil {
log.Panicf("failed to create fabric sdk: %s", err)
}

创建RC

管理员账号才能进行fabric网络的管理操作,所以创建rc一定要使用管理员账号。

通过fabsdk.WithOrg("Org1")fabsdk.WithUser("Admin")指定Org1的Admin账户,使用sdk.Context创建clientProvider,然后通过resmgmt.New创建rc。

1
2
3
4
5
6
7
import 	"github.com/hyperledger/fabric-sdk-go/pkg/client/resmgmt"

rcp := sdk.Context(fabsdk.WithUser("Admin"), fabsdk.WithOrg("Org1"))
rc, err := resmgmt.New(rcp)
if err != nil {
log.Panicf("failed to create resource client: %s", err)
}

创建CC

创建cc使用用户账号,进行链码的调用和查询,使用sdk.ChannelContext创建channelProvider,需要指定channelID和用户User1,然后通过channel.New创建cc,此cc就是调用channelID对应channel上链码的channel client。

1
2
3
4
5
6
7
import 	"github.com/hyperledger/fabric-sdk-go/pkg/client/channel"

ccp := sdk.ChannelContext(ChannelID, fabsdk.WithUser("User1"))
cc, err := channel.New(ccp)
if err != nil {
log.Panicf("failed to create channel client: %s", err)
}

管理操作

安装链码

安装链码使用rc.InstallCC接口,需要指定resmgmt.InstallCCRequest以及在哪些peers上面安装。resmgmt.InstallCCRequest指明了链码ID、链码路径、链码版本以及打包后的链码。

打包链码需要使用到链码路径CCPathGoPathGoPath即本机的$GOPATHCCPath是相对于GoPath相对路径,如果路径设置不对,会造成sdk找不到链码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// pack the chaincode
ccPkg, err := gopackager.NewCCPackage("github.com/hyperledger/fabric-samples/chaincode/chaincode_example02/go/", "/Users/shitaibin/go")
if err != nil {
return errors.WithMessage(err, "pack chaincode error")
}

// new request of installing chaincode
req := resmgmt.InstallCCRequest{
Name: c.CCID,
Path: c.CCPath,
Version: v,
Package: ccPkg,
}

reqPeers := resmgmt.WithTargetEndpoints("peer0.org1.example.com")
resps, err := rc.InstallCC(req, reqPeers)
if err != nil {
return errors.WithMessage(err, "installCC error")
}

实例化链码

实例化链码需要使用rc.InstantiateCC接口,需要通过ChannelID、resmgmt.InstantiateCCRequest和peers,指明在哪个channel上实例化链码,请求包含了链码的ID、路径、版本,以及初始化参数和背书策略,背书策略可以通过cauthdsl.FromString生成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// endorser policy
org1OrOrg2 := "OR('Org1MSP.member','Org2MSP.member')"
ccPolicy, err := cauthdsl.FromString(org1OrOrg2)
if err != nil {
return errors.WithMessage(err, "gen policy from string error")
}

// new request
args := packArgs([]string{"init", "a", "100", "b", "200"})
req := resmgmt.InstantiateCCRequest{
Name: c.CCID,
Path: c.CCPath,
Version: v,
Args: args,
Policy: ccPolicy,
}

// send request and handle response
reqPeers := resmgmt.WithTargetEndpoints("peer0.org1.example.com")
resp, err := rc.InstantiateCC(ChannelID, req, reqPeers)
if err != nil {
return errors.WithMessage(err, "instantiate chaincode error")
}

升级链码

升级链码和实例化链码是非常相似的,不同点只在请求是resmgmt.UpgradeCCRequest,调用的接口是rc.UpgradeCC

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// endorser policy
org1AndOrg2 := "AND('Org1MSP.member','Org2MSP.member')"
ccPolicy, err := c.genPolicy(org1AndOrg2)
if err != nil {
return errors.WithMessage(err, "gen policy from string error")
}

// new request
args := packArgs([]string{"init", "a", "100", "b", "200"})
req := resmgmt.UpgradeCCRequest{
Name: c.CCID,
Path: c.CCPath,
Version: v,
Args: args,
Policy: ccPolicy,
}

// send request and handle response
reqPeers := resmgmt.WithTargetEndpoints("peer0.org1.example.com")
resp, err := rc.UpgradeCC(ChannelID, req, reqPeers)
if err != nil {
return errors.WithMessage(err, "instantiate chaincode error")
}

查询操作

调用链码

调用链码使用cc.Execute接口,使用入参channel.Request和peers指明要让哪些peer上执行链码,进行背书。channel.Request指明了要调用的链码,以及链码内要Invoke的函数args,args是序列化的结果,序列化是自定义的,只要链码能够按相同的规则进行反序列化即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// new channel request for invoke
args := packArgs([]string{"a", "b", "10"})
req := channel.Request{
ChaincodeID: c.CCID,
Fcn: "invoke",
Args: args,
}

// send request and handle response
// peers is needed
reqPeers := channel.WithTargetEndpoints("peer0.org1.example.com")
resp, err := cc.Execute(req, reqPeers)
if err != nil {
return errors.WithMessage(err, "invoke chaincode error")
}
log.Printf("invoke chaincode tx: %s", resp.TransactionID)

查询链码

查询和调用链码是非常相似的,使用相同的channel.Request,指明了Invoke链码中的query函数,然后调用cc.Query进行查询操作,这样节点不会对请求进行背书。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// new channel request for query
req := channel.Request{
ChaincodeID: c.CCID,
Fcn: "query",
Args: packArgs([]string{keys}),
}

// send request and handle response
reqPeers := channel.WithTargetEndpoints(peer)
resp, err := cc.Query(req, reqPeers)
if err != nil {
return errors.WithMessage(err, "query chaincode error")
}

log.Printf("query chaincode tx: %s", resp.TransactionID)
log.Printf("result: %v", string(resp.Payload))

示例项目

本文的基础是创建了一个结合fabric byfn的示例项目,在byfn的基础之上对链码进行安装、实例化、升级,调用和查询等操作,项目的使用可见项目README文档,项目地址:https://github.com/Shitaibin/fabric-sdk-go-sample ,项目样例执行后,可见新部署和升级成功的链码容器,操作日志可见项目。

byfn-sdk

这是一篇姊妹篇文章,浅析一下Go是如何实现protobuf编解码的:

  1. Go是如何实现protobuf的编解码的(1): 原理
  2. Go是如何实现protobuf的编解码的(2): 源码

本编是第二篇。

前言

上一篇文章Go是如何实现protobuf的编解码的(1):原理
中已经指出了Go语言数据和Protobuf数据的编解码是由包github.com/golang/protobuf/proto完成的,本编就来分析一下proto包是如何实现编解码的。

编解码原理

编解码包都有支持的编解码类型,我们暂且把这些类型称为底层类型,编解码的本质是:

  1. 为每一个底层类型配备一个或多个编解码函数
  2. 把一个结构体的字段,递归的拆解成底层类型,然后选择合适的函数进行编码或解码操作

接下来先看编码,再看解码。

编码

约定:以下所有的代码片,如果是request.pb.go或main.go中的代码,会在第一行标记文件名,否则都是proto包的源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// main.go
package main

import (
"fmt"

"./types"
"github.com/golang/protobuf/proto"
)

func main() {
req := &types.Request{Data: "Hello Dabin"}

// Marshal
encoded, err := proto.Marshal(req)
if err != nil {
fmt.Printf("Encode to protobuf data error: %v", err)
}
...
}

编码调用的是proto.Marshal函数,它可以完成的是Go语言数据序列化成protobuf数据,返回序列化结果或错误。

proto编译成的Go结构体都是符合Message接口的,从Marshal可知Go结构体有3种序列化方式:

  1. pb Message满足newMarshaler接口,则调用XXX_Marshal()进行序列化。
  2. pb满足Marshaler接口,则调用Marshal()进行序列化,这种方式适合某类型自定义序列化规则的情况。
  3. 否则,使用默认的序列化方式,创建一个Warpper,利用wrapper对pb进行序列化,后面会介绍方式1实际就是使用方式3。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Marshal takes a protocol buffer message
// and encodes it into the wire format, returning the data.
// This is the main entry point.
func Marshal(pb Message) ([]byte, error) {
if m, ok := pb.(newMarshaler); ok {
siz := m.XXX_Size()
b := make([]byte, 0, siz)
return m.XXX_Marshal(b, false)
}
if m, ok := pb.(Marshaler); ok {
// If the message can marshal itself, let it do it, for compatibility.
// NOTE: This is not efficient.
return m.Marshal()
}
// in case somehow we didn't generate the wrapper
if pb == nil {
return nil, ErrNil
}
var info InternalMessageInfo
siz := info.Size(pb)
b := make([]byte, 0, siz)
return info.Marshal(b, pb, false)
}

newMarshalerMarshaler如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// newMarshaler is the interface representing objects that can marshal themselves.
//
// This exists to support protoc-gen-go generated messages.
// The proto package will stop type-asserting to this interface in the future.
//
// DO NOT DEPEND ON THIS.
type newMarshaler interface {
XXX_Size() int
XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
}

// Marshaler is the interface representing objects that can marshal themselves.
type Marshaler interface {
Marshal() ([]byte, error)
}

Request实现了newMarshaler接口,XXX_Marshal实现如下,它实际是调用了xxx_messageInfo_Request.Marshalxxx_messageInfo_Request是定义在request.pb.go中的一个全局变量,类型就是InternalMessageInfo,实际就是前文提到的wrapper。

1
2
3
4
5
6
7
8
// request.pb.go
func (m *Request) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
print("Called xxx marshal\n")
panic("I want see stack trace")
return xxx_messageInfo_Request.Marshal(b, m, deterministic)
}

var xxx_messageInfo_Request proto.InternalMessageInfo

本质上,XXX_Marshal也是wrapper,后面才是真正序列化的主体函数在proto包中。

InternalMessageInfo主要是用来缓存序列化和反序列化需要用到的信息。

1
2
3
4
5
6
7
8
9
// InternalMessageInfo is a type used internally by generated .pb.go files.
// This type is not intended to be used by non-generated code.
// This type is not subject to any compatibility guarantee.
type InternalMessageInfo struct {
marshal *marshalInfo // marshal信息
unmarshal *unmarshalInfo // unmarshal信息
merge *mergeInfo
discard *discardInfo
}

InternalMessageInfo.Marshal首先是获取待序列化类型的序列化信息u marshalInfo,然后利用u.marshal进行序列化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Marshal is the entry point from generated code,
// and should be ONLY called by generated code.
// It marshals msg to the end of b.
// a is a pointer to a place to store cached marshal info.
func (a *InternalMessageInfo) Marshal(b []byte, msg Message, deterministic bool) ([]byte, error) {
// 获取该message类型的MarshalInfo,这些信息都缓存起来了
// 大量并发时无需重复创建
u := getMessageMarshalInfo(msg, a)
// 入参校验
ptr := toPointer(&msg)
if ptr.isNil() {
// We get here if msg is a typed nil ((*SomeMessage)(nil)),
// so it satisfies the interface, and msg == nil wouldn't
// catch it. We don't want crash in this case.
return b, ErrNil
}
// 根据MarshalInfo对数据进行marshal
return u.marshal(b, ptr, deterministic)
}

由于每种类型的序列化信息是一致的,所以getMessageMarshalInfo对序列化信息进行了缓存,缓存在a.marshal中,如果a中不存在marshal信息,则去生成,但不进行初始化,然后保存到a中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func getMessageMarshalInfo(msg interface{}, a *InternalMessageInfo) *marshalInfo {
// u := a.marshal, but atomically.
// We use an atomic here to ensure memory consistency.
// 从InternalMessageInfo中读取
u := atomicLoadMarshalInfo(&a.marshal)
// 读取不到代表未保存过
if u == nil {
// Get marshal information from type of message.
t := reflect.ValueOf(msg).Type()
if t.Kind() != reflect.Ptr {
panic(fmt.Sprintf("cannot handle non-pointer message type %v", t))
}
u = getMarshalInfo(t.Elem())
// Store it in the cache for later users.
// a.marshal = u, but atomically.
atomicStoreMarshalInfo(&a.marshal, u)
}
return u
}

getMarshalInfo只是创建了一个marshalInfo对象,填充了字段typ,剩余的字段未填充。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// getMarshalInfo returns the information to marshal a given type of message.
// The info it returns may not necessarily initialized.
// t is the type of the message (NOT the pointer to it).
// 获取MarshalInfo结构体,如果不存在则使用message类型t创建1个
func getMarshalInfo(t reflect.Type) *marshalInfo {
marshalInfoLock.Lock()
u, ok := marshalInfoMap[t]
if !ok {
u = &marshalInfo{typ: t}
marshalInfoMap[t] = u
}
marshalInfoLock.Unlock()
return u
}

// marshalInfo is the information used for marshaling a message.
type marshalInfo struct {
typ reflect.Type
fields []*marshalFieldInfo
unrecognized field // offset of XXX_unrecognized
extensions field // offset of XXX_InternalExtensions
v1extensions field // offset of XXX_extensions
sizecache field // offset of XXX_sizecache
initialized int32 // 0 -- only typ is set, 1 -- fully initialized
messageset bool // uses message set wire format
hasmarshaler bool // has custom marshaler
sync.RWMutex // protect extElems map, also for initialization
extElems map[int32]*marshalElemInfo // info of extension elements
}

marshalInfo.marshal是Marshal真实主体,会判断u是否已经初始化,如果未初始化调用computeMarshalInfo计算Marshal需要的信息,实际就是填充marshalInfo中的各种字段。

u.hasmarshaler代表当前类型是否实现了Marshaler接口,直接调用Marshal函数进行序列化。可以确定Marshal函数的序列化方式2,即实现Marshaler接口的方法,最后肯定也会调用marshalInfo.marshal

该函数的主体是一个for循环,依次遍历该类型的每一个字段,对required属性进行校验,然后按字段类型,调用f.marshaler对该字段类型进行序列化。这个f.marshaler哪来的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// marshal is the main function to marshal a message. It takes a byte slice and appends
// the encoded data to the end of the slice, returns the slice and error (if any).
// ptr is the pointer to the message.
// If deterministic is true, map is marshaled in deterministic order.
// 该函数是Marshal的主体函数,把消息编码为数据后,追加到b之后,最后返回b。
// deterministic为true代表map会以确定的顺序进行编码。
func (u *marshalInfo) marshal(b []byte, ptr pointer, deterministic bool) ([]byte, error) {
// 初始化marshalInfo的基础信息
// 主要是根据已有信息填充该结构体的一些字段
if atomic.LoadInt32(&u.initialized) == 0 {
u.computeMarshalInfo()
}

// If the message can marshal itself, let it do it, for compatibility.
// NOTE: This is not efficient.
// 如果该类型实现了Marshaler接口,即能够对自己Marshal,则自行Marshal
// 结果追加到b
if u.hasmarshaler {
m := ptr.asPointerTo(u.typ).Interface().(Marshaler)
b1, err := m.Marshal()
b = append(b, b1...)
return b, err
}

var err, errLater error
// The old marshaler encodes extensions at beginning.
// 检查扩展字段,把message的扩展字段追加到b
if u.extensions.IsValid() {
// offset函数用来根据指针偏移量获取message的指定字段
e := ptr.offset(u.extensions).toExtensions()
if u.messageset {
b, err = u.appendMessageSet(b, e, deterministic)
} else {
b, err = u.appendExtensions(b, e, deterministic)
}
if err != nil {
return b, err
}
}
if u.v1extensions.IsValid() {
m := *ptr.offset(u.v1extensions).toOldExtensions()
b, err = u.appendV1Extensions(b, m, deterministic)
if err != nil {
return b, err
}
}

// 遍历message的每一个字段,检查并做编码,然后追加到b
for _, f := range u.fields {
if f.required {
// 如果required的字段未设置,则记录错误,所有的marshal工作完成后再处理
if ptr.offset(f.field).getPointer().isNil() {
// Required field is not set.
// We record the error but keep going, to give a complete marshaling.
if errLater == nil {
errLater = &RequiredNotSetError{f.name}
}
continue
}
}
// 字段为指针类型,并且为nil,代表未设置,该字段无需编码
if f.isPointer && ptr.offset(f.field).getPointer().isNil() {
// nil pointer always marshals to nothing
continue
}
// 利用这个字段的marshaler进行编码
b, err = f.marshaler(b, ptr.offset(f.field), f.wiretag, deterministic)
if err != nil {
if err1, ok := err.(*RequiredNotSetError); ok {
// required字段但未设置错误
// Required field in submessage is not set.
// We record the error but keep going, to give a complete marshaling.
if errLater == nil {
errLater = &RequiredNotSetError{f.name + "." + err1.field}
}
continue
}
// “动态数组”中包含nil元素
if err == errRepeatedHasNil {
err = errors.New("proto: repeated field " + f.name + " has nil element")
}
if err == errInvalidUTF8 {
if errLater == nil {
fullName := revProtoTypes[reflect.PtrTo(u.typ)] + "." + f.name
errLater = &invalidUTF8Error{fullName}
}
continue
}
return b, err
}
}
// 为识别的类型字段,直接转为bytes,追加到b
// computeMarshalInfo中已经收集这些字段
if u.unrecognized.IsValid() {
s := *ptr.offset(u.unrecognized).toBytes()
b = append(b, s...)
}
return b, errLater
}

computeMarshalInfo实际上就是对要序列化的类型,进行一次全面检查,设置好序列化要使用的数据,这其中就包含了各字段的序列化函数f.marshaler。我们就重点关注下这部分,struct的每一个字段都会分配一个marshalFieldInfo,代表这个字段序列化需要的信息,会调用computeMarshalFieldInfo会填充这个对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// computeMarshalInfo initializes the marshal info.
func (u *marshalInfo) computeMarshalInfo() {
// 加锁,代表了不能同时计算marshal信息
u.Lock()
defer u.Unlock()
// 计算1次即可
if u.initialized != 0 { // non-atomic read is ok as it is protected by the lock
return
}

// 获取要marshal的message类型
t := u.typ
u.unrecognized = invalidField
u.extensions = invalidField
u.v1extensions = invalidField
u.sizecache = invalidField

// If the message can marshal itself, let it do it, for compatibility.
// 判断当前类型是否实现了Marshal接口,如果实现标记为类型自有marshaler
// 没用类型断言是因为t是Type类型,不是保存在某个接口的变量
// NOTE: This is not efficient.
if reflect.PtrTo(t).Implements(marshalerType) {
u.hasmarshaler = true
atomic.StoreInt32(&u.initialized, 1)
// 可以直接返回了,后面使用自有的marshaler编码
return
}

// get oneof implementers
// 看*t实现了以下哪个接口,oneof特性
var oneofImplementers []interface{}
switch m := reflect.Zero(reflect.PtrTo(t)).Interface().(type) {
case oneofFuncsIface:
_, _, _, oneofImplementers = m.XXX_OneofFuncs()
case oneofWrappersIface:
oneofImplementers = m.XXX_OneofWrappers()
}

n := t.NumField()

// deal with XXX fields first
// 遍历t的每一个XXX字段
for i := 0; i < t.NumField(); i++ {
f := t.Field(i)
// 跳过非XXX开头的字段
if !strings.HasPrefix(f.Name, "XXX_") {
continue
}
// 处理以下几个protobuf自带的字段
switch f.Name {
case "XXX_sizecache":
u.sizecache = toField(&f)
case "XXX_unrecognized":
u.unrecognized = toField(&f)
case "XXX_InternalExtensions":
u.extensions = toField(&f)
u.messageset = f.Tag.Get("protobuf_messageset") == "1"
case "XXX_extensions":
u.v1extensions = toField(&f)
case "XXX_NoUnkeyedLiteral":
// nothing to do
default:
panic("unknown XXX field: " + f.Name)
}
n--
}

// normal fields
// 处理message的普通字段
fields := make([]marshalFieldInfo, n) // batch allocation
u.fields = make([]*marshalFieldInfo, 0, n)
for i, j := 0, 0; i < t.NumField(); i++ {
f := t.Field(i)

// 跳过XXX字段
if strings.HasPrefix(f.Name, "XXX_") {
continue
}

// 取fields的下一个有效字段,指针类型
// j代表了fields有效字段数量,n是包含了XXX字段的总字段数量
field := &fields[j]
j++
field.name = f.Name
// 填充到u.fields
u.fields = append(u.fields, field)
// 字段的tag里包含“protobuf_oneof”特殊处理
if f.Tag.Get("protobuf_oneof") != "" {
field.computeOneofFieldInfo(&f, oneofImplementers)
continue
}
// 字段里不包含“protobuf”,代表不是protoc自动生成的字段
if f.Tag.Get("protobuf") == "" {
// field has no tag (not in generated message), ignore it
// 删除刚刚保存的字段信息
u.fields = u.fields[:len(u.fields)-1]
j--
continue
}
// 填充字段的marshal信息
field.computeMarshalFieldInfo(&f)
}

// fields are marshaled in tag order on the wire.
// 字段排序
sort.Sort(byTag(u.fields))

// 初始化完成
atomic.StoreInt32(&u.initialized, 1)
}

回顾一下Request的定义,它包含1个字段Data,后面protobuf:...描述了protobuf要使用的信息,"bytes,..."这段被称为tags,用逗号进行分割后,其中:

  • tags[0]: bytes,代表Data类型的数据要被转换为bytes
  • tags[1]: 1,代表了字段的ID
  • tags[2]: opt,代表可行,非必须
  • tags[3]: name=data,proto文件中的名称
  • tags[4]: proto3,代表使用的protobuf版本
1
2
3
4
5
// request.pb.go
type Request struct{
Data string `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
...
}

computeMarshalFieldInfo首先要获取字段ID和要转换的类型,填充到marshalFieldInfo,然后调用setMarshaler利用字段f和tags获取该字段类型的序列化函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// computeMarshalFieldInfo fills up the information to marshal a field.
func (fi *marshalFieldInfo) computeMarshalFieldInfo(f *reflect.StructField) {
// parse protobuf tag of the field.
// tag has format of "bytes,49,opt,name=foo,def=hello!"
// 获取"protobuf"的完整tag,然后使用,分割,得到上面的格式
tags := strings.Split(f.Tag.Get("protobuf"), ",")
if tags[0] == "" {
return
}
// tag的编号,即message中设置的string name = x,则x就是这个字段的tag id
tag, err := strconv.Atoi(tags[1])
if err != nil {
panic("tag is not an integer")
}
// 要转换成的类型,bytes,varint等等
wt := wiretype(tags[0])
// 设置字段是required还是opt
if tags[2] == "req" {
fi.required = true
}
// 设置field和tag信息到marshalFieldInfo
fi.setTag(f, tag, wt)
// 根据当前的tag信息(类型等),选择marshaler函数
fi.setMarshaler(f, tags)
}

setMarshaler的重点是typeMarshalertypeMarshaler这个函数非常长,其实就是根据类型设置返回对于的序列化函数,比如Bool、Int32、Uint32…,如果是结构体、切片等复合类型,就可以形成递归了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// setMarshaler fills up the sizer and marshaler in the info of a field.
func (fi *marshalFieldInfo) setMarshaler(f *reflect.StructField, tags []string) {
// map类型字段特殊处理
switch f.Type.Kind() {
case reflect.Map:
// map field
fi.isPointer = true
fi.sizer, fi.marshaler = makeMapMarshaler(f)
return
case reflect.Ptr, reflect.Slice:
// 指针字段和切片字段标记指针类型
fi.isPointer = true
}

// 根据字段类型和tag选择marshaler
fi.sizer, fi.marshaler = typeMarshaler(f.Type, tags, true, false)
}

// typeMarshaler returns the sizer and marshaler of a given field.
// t is the type of the field.
// tags is the generated "protobuf" tag of the field.
// If nozero is true, zero value is not marshaled to the wire.
// If oneof is true, it is a oneof field.
// 函数非常长,省略内容
func typeMarshaler(t reflect.Type, tags []string, nozero, oneof bool) (sizer, marshaler) {
...
switch t.Kind() {
case reflect.Bool:
if pointer {
return sizeBoolPtr, appendBoolPtr
}
if slice {
if packed {
return sizeBoolPackedSlice, appendBoolPackedSlice
}
return sizeBoolSlice, appendBoolSlice
}
if nozero {
return sizeBoolValueNoZero, appendBoolValueNoZero
}
return sizeBoolValue, appendBoolValue
case reflect.Uint32:
...
case reflect.Int32:
....
case reflect.Struct:
...
}

以下是Bool和String类型的2个序列化函数示例:

1
2
3
4
5
6
7
8
9
10
func appendBoolValue(b []byte, ptr pointer, wiretag uint64, _ bool) ([]byte, error) {
v := *ptr.toBool()
b = appendVarint(b, wiretag)
if v {
b = append(b, 1)
} else {
b = append(b, 0)
}
return b, nil
}
1
2
3
4
5
6
7
func appendStringValue(b []byte, ptr pointer, wiretag uint64, _ bool) ([]byte, error) {
v := *ptr.toString()
b = appendVarint(b, wiretag)
b = appendVarint(b, uint64(len(v)))
b = append(b, v...)
return b, nil
}

所以序列化后的[]byte,应当是符合这种模式:

1
| wiretag | data | wiretag | data | ... | data |

OK,以上就是编码的主要流程,简单回顾一下:

  1. proto.Marshal会调用*.pb.go中自动生成的Wrapper函数,Wrapper函数会调用InternalMessageInfo进行序列化,然后才步入序列化的正题
  2. 首先获取要序列化类型的marshal信息u,如果u没有初始化,则进行初始化,即设置好结构体每个字段的序列化函数,以及其他信息
  3. 遍历结构体的每个字段,使用u中的信息为每个字段进行编码,并把加过追加到[]byte,所以字段编码完成,则返回序列化的结果[]byte或者错误。

解码

解码的流程其实与编码很类似,会是上面回顾的3大步骤,主要的区别在步骤2:它要获取的是序列化类型的unmarshal信息u,如果u没有初始化,会进行初始化,设置的是结构体每个字段的反序列化函数,以及其他信息。

所以解码的函数解析会简要的过一遍,不再有编码那么详细的解释。

下面是proto包中反序列化的接口和函数定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Unmarshaler is the interface representing objects that can
// unmarshal themselves. The argument points to data that may be
// overwritten, so implementations should not keep references to the
// buffer.
// Unmarshal implementations should not clear the receiver.
// Any unmarshaled data should be merged into the receiver.
// Callers of Unmarshal that do not want to retain existing data
// should Reset the receiver before calling Unmarshal.
type Unmarshaler interface {
Unmarshal([]byte) error
}

// newUnmarshaler is the interface representing objects that can
// unmarshal themselves. The semantics are identical to Unmarshaler.
//
// This exists to support protoc-gen-go generated messages.
// The proto package will stop type-asserting to this interface in the future.
//
// DO NOT DEPEND ON THIS.
type newUnmarshaler interface {
// 实现了XXX_Unmarshal
XXX_Unmarshal([]byte) error
}

// Unmarshal parses the protocol buffer representation in buf and places the
// decoded result in pb. If the struct underlying pb does not match
// the data in buf, the results can be unpredictable.
//
// Unmarshal resets pb before starting to unmarshal, so any
// existing data in pb is always removed. Use UnmarshalMerge
// to preserve and append to existing data.
func Unmarshal(buf []byte, pb Message) error {
pb.Reset()
// pb自己有unmarshal函数,实现了newUnmarshaler接口
if u, ok := pb.(newUnmarshaler); ok {
return u.XXX_Unmarshal(buf)
}
// pb自己有unmarshal函数,实现了Unmarshaler接口
if u, ok := pb.(Unmarshaler); ok {
return u.Unmarshal(buf)
}
// 使用默认的Unmarshal
return NewBuffer(buf).Unmarshal(pb)
}

Request实现了Unmarshaler接口:

1
2
3
4
// request.pb.go
func (m *Request) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Request.Unmarshal(m, b)
}

反序列化也是使用InternalMessageInfo进行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Unmarshal is the entry point from the generated .pb.go files.
// This function is not intended to be used by non-generated code.
// This function is not subject to any compatibility guarantee.
// msg contains a pointer to a protocol buffer struct.
// b is the data to be unmarshaled into the protocol buffer.
// a is a pointer to a place to store cached unmarshal information.
func (a *InternalMessageInfo) Unmarshal(msg Message, b []byte) error {
// Load the unmarshal information for this message type.
// The atomic load ensures memory consistency.
// 获取保存在a中的unmarshal信息
u := atomicLoadUnmarshalInfo(&a.unmarshal)
if u == nil {
// Slow path: find unmarshal info for msg, update a with it.
u = getUnmarshalInfo(reflect.TypeOf(msg).Elem())
atomicStoreUnmarshalInfo(&a.unmarshal, u)
}
// Then do the unmarshaling.
// 执行unmarshal
err := u.unmarshal(toPointer(&msg), b)
return err
}

以下是反序列化的主题函数,u未初始化时会调用computeUnmarshalInfo设置反序列化需要的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// unmarshal does the main work of unmarshaling a message.
// u provides type information used to unmarshal the message.
// m is a pointer to a protocol buffer message.
// b is a byte stream to unmarshal into m.
// This is top routine used when recursively unmarshaling submessages.
func (u *unmarshalInfo) unmarshal(m pointer, b []byte) error {
if atomic.LoadInt32(&u.initialized) == 0 {
// 为u填充unmarshal信息,以及设置每个字段类型的unmarshaler函数
u.computeUnmarshalInfo()
}
if u.isMessageSet {
return unmarshalMessageSet(b, m.offset(u.extensions).toExtensions())
}
var reqMask uint64 // bitmask of required fields we've seen.
var errLater error
for len(b) > 0 {
// Read tag and wire type.
// Special case 1 and 2 byte varints.
var x uint64
if b[0] < 128 {
x = uint64(b[0])
b = b[1:]
} else if len(b) >= 2 && b[1] < 128 {
x = uint64(b[0]&0x7f) + uint64(b[1])<<7
b = b[2:]
} else {
var n int
x, n = decodeVarint(b)
if n == 0 {
return io.ErrUnexpectedEOF
}
b = b[n:]
}
// 获取tag和wire标记
tag := x >> 3
wire := int(x) & 7

// Dispatch on the tag to one of the unmarshal* functions below.
// 根据tag选择该类型的unmarshalFieldInfo:f
var f unmarshalFieldInfo
if tag < uint64(len(u.dense)) {
f = u.dense[tag]
} else {
f = u.sparse[tag]
}
// 如果该类型有unmarshaler函数,则执行解码和错误处理
if fn := f.unmarshal; fn != nil {
var err error
// 从b解析,然后填充到f的对应字段
b, err = fn(b, m.offset(f.field), wire)
if err == nil {
reqMask |= f.reqMask
continue
}
if r, ok := err.(*RequiredNotSetError); ok {
// Remember this error, but keep parsing. We need to produce
// a full parse even if a required field is missing.
if errLater == nil {
errLater = r
}
reqMask |= f.reqMask
continue
}
if err != errInternalBadWireType {
if err == errInvalidUTF8 {
if errLater == nil {
fullName := revProtoTypes[reflect.PtrTo(u.typ)] + "." + f.name
errLater = &invalidUTF8Error{fullName}
}
continue
}
return err
}
// Fragments with bad wire type are treated as unknown fields.
}

// Unknown tag.
// 跳过未知tag,可能是proto中的message定义升级了,增加了一些字段,使用老版本的,就不识别新的字段
if !u.unrecognized.IsValid() {
// Don't keep unrecognized data; just skip it.
var err error
b, err = skipField(b, wire)
if err != nil {
return err
}
continue
}
// 检查未识别字段是不是extension
// Keep unrecognized data around.
// maybe in extensions, maybe in the unrecognized field.
z := m.offset(u.unrecognized).toBytes()
var emap map[int32]Extension
var e Extension
for _, r := range u.extensionRanges {
if uint64(r.Start) <= tag && tag <= uint64(r.End) {
if u.extensions.IsValid() {
mp := m.offset(u.extensions).toExtensions()
emap = mp.extensionsWrite()
e = emap[int32(tag)]
z = &e.enc
break
}
if u.oldExtensions.IsValid() {
p := m.offset(u.oldExtensions).toOldExtensions()
emap = *p
if emap == nil {
emap = map[int32]Extension{}
*p = emap
}
e = emap[int32(tag)]
z = &e.enc
break
}
panic("no extensions field available")
}
}

// Use wire type to skip data.
var err error
b0 := b
b, err = skipField(b, wire)
if err != nil {
return err
}
*z = encodeVarint(*z, tag<<3|uint64(wire))
*z = append(*z, b0[:len(b0)-len(b)]...)

if emap != nil {
emap[int32(tag)] = e
}
}
// 校验解析到的required字段的数量,如果与u中记录的不匹配,则报错
if reqMask != u.reqMask && errLater == nil {
// A required field of this message is missing.
for _, n := range u.reqFields {
if reqMask&1 == 0 {
errLater = &RequiredNotSetError{n}
}
reqMask >>= 1
}
}
return errLater
}

设置字段反序列化函数的过程不看了,看一下怎么选函数的,typeUnmarshaler是为字段类型,选择反序列化函数,这些函数选择与序列化函数是一一对应的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// typeUnmarshaler returns an unmarshaler for the given field type / field tag pair.
func typeUnmarshaler(t reflect.Type, tags string) unmarshaler {
...
// Figure out packaging (pointer, slice, or both)
slice := false
pointer := false
if t.Kind() == reflect.Slice && t.Elem().Kind() != reflect.Uint8 {
slice = true
t = t.Elem()
}
if t.Kind() == reflect.Ptr {
pointer = true
t = t.Elem()
}
...
switch t.Kind() {
case reflect.Bool:
if pointer {
return unmarshalBoolPtr
}
if slice {
return unmarshalBoolSlice
}
return unmarshalBoolValue
}
}

unmarshalBoolValue是默认的Bool类型反序列化函数,会把protobuf数据b解码,然后转换为bool类型v,最后赋值给字段f。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func unmarshalBoolValue(b []byte, f pointer, w int) ([]byte, error) {
if w != WireVarint {
return b, errInternalBadWireType
}
// Note: any length varint is allowed, even though any sane
// encoder will use one byte.
// See https://github.com/golang/protobuf/issues/76
x, n := decodeVarint(b)
if n == 0 {
return nil, io.ErrUnexpectedEOF
}
// TODO: check if x>1? Tests seem to indicate no.
// toBool是返回bool类型的指针
// 完成对字段f的赋值
v := x != 0
*f.toBool() = v
return b[n:], nil
}

总结

本文分析了Go语言protobuf数据的序列化和反序列过程,可以简要概括为:

  1. proto.Marshalproto.Unmarshal会调用*.pb.go中自动生成的Wrapper函数,Wrapper函数会调用InternalMessageInfo进行(反)序列化,然后才步入(反)序列化的正题
  2. 首先获取要目标类型的(un)marshal信息u,如果u没有初始化,则进行初始化,即设置好结构体每个字段的(反)序列化函数,以及其他信息
  3. 遍历结构体的每个字段,使用u中的信息为每个字段进行编码,生成序列化的结果,或进行解码,给结构体成员进行赋值

参考文章

以下参考文章都值得阅读:

这是一篇姊妹篇文章,浅析一下Go是如何实现protobuf编解码的:

  1. Go是如何实现protobuf的编解码的(1): 原理
  2. Go是如何实现protobuf的编解码的(2): 源码

本编是第一篇。

Protocol Buffers介绍

Protocol buffers缩写为protobuf,是由Google创造的一种用于序列化的标记语言,项目Github仓库:https://github.com/protocolbuffers/protobuf。

Protobuf主要用于不同的编程语言的协作RPC场景下,定义需要序列化的数据格式。Protobuf本质上仅仅是一种用于交互的结构式定义,从功能上和XML、JSON等各种其他的交互形式都并无本质不同,只负责定义不负责数据编解码

其官方介绍如下:

Protocol buffers are Google’s language-neutral, platform-neutral, extensible mechanism for serializing structured data – think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use special generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages.

Protocol buffers的多语言支持

protobuf是支持多种编程语言的,即多种编程语言的类型数据可以转换成protobuf定义的类型数据,各种语言的类型对应可以看此介绍

我们介绍一下protobuf对多语言的支持原理。protobuf有个程序叫protoc,它是一个编译程序,负责把proto文件编译成对应语言的文件,它已经支持了C++、C#、Java、Python,而对于Go和Dart需要安装插件才能配合生成对于语言的文件。

对于C++,protoc可以把a.proto,编译成a.pb.ha.pb.cc

对于Go,protoc需要使用插件protoc-gen-go,把a.proto,编译成a.pb.go,其中包含了定义的数据类型,它的序列化和反序列化函数等。

敲黑板,对Go语言,protoc只负责利用protoc-gen-go把proto文件编译成Go语言文件,并不负责序列化和反序列化,生成的Go语言文件中的序列化和反序列化操作都是只是wrapper。

那Go语言对protobuf的序列化和反序列化,是由谁完成的?

github.com/golang/protobuf/proto完成,它负责把结构体等序列化成proto数据([]byte),把proto数据反序列化成Go结构体。

OK,原理部分就铺垫这些,看一个简单样例,了解protoc和protoc-gen-go的使用,以及进行序列化和反序列化操作。

一个Hello World样例

根据上面的介绍,Go语言使用protobuf我们要先安装2个工具:protoc和protoc-gen-go。

安装protoc和protoc-gen-go

首先去下载页下载符合你系统的protoc,本文示例版本如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
➜  protoc-3.9.0-osx-x86_64 tree .
.
├── bin
│   └── protoc
├── include
│   └── google
│   └── protobuf
│   ├── any.proto
│   ├── api.proto
│   ├── compiler
│   │   └── plugin.proto
│   ├── descriptor.proto
│   ├── duration.proto
│   ├── empty.proto
│   ├── field_mask.proto
│   ├── source_context.proto
│   ├── struct.proto
│   ├── timestamp.proto
│   ├── type.proto
│   └── wrappers.proto
└── readme.txt

5 directories, 14 files

protoc的安装步骤在readme.txt中:

To install, simply place this binary somewhere in your PATH.

protoc-3.9.0-osx-x86_64/bin加入到PATH。

If you intend to use the included well known types then don’t forget to
copy the contents of the ‘include’ directory somewhere as well, for example
into ‘/usr/local/include/‘.

如果使用已经定义好的类型,即上面include目录*.proto文件中的类型,把include目录下文件,拷贝到/usr/local/include/

安装protoc-gen-go:

1
go get –u github.com/golang/protobuf/protoc-gen-go

检查安装,应该能查到这2个程序的位置:

1
2
3
4
➜  fabric git:(release-1.4) which protoc
/usr/local/bin/protoc
➜ fabric git:(release-1.4) which protoc-gen-go
/Users/shitaibin/go/bin/protoc-gen-go

Hello world

创建了一个使用protoc的小玩具,项目地址Github: golang_step_by_step

它的目录结构如下:

1
2
3
4
5
6
➜  protobuf git:(master) tree helloworld1
helloworld1
├── main.go
├── request.proto
└── types
└── request.pb.go

定义proto文件

使用proto3,定义一个Request,request.proto内容如下:

1
2
3
4
5
6
7
8
// file: request.proto
syntax = "proto3";
package helloworld;
option go_package="./types";

message Request {
string data = 1;
}
  • syntax:protobuf版本,现在是proto3
  • package:不完全等价于Go的package,最好另行设定go_package,指定根据protoc文件生成的go语言文件的package名称。
  • message:会编译成Go的struct
    • string data = 1:代表request的成员data是string类型,该成员的id是1,protoc给每个成员都定义一个编号,编解码的时候使用编号代替使用成员名称,压缩数据量。

编译proto文件

1
$ protoc --go_out=. ./request.proto

--go_out指明了要把./request.proto编译成Go语言文件,生成的是./types/request.pb.go,注意观察一下为Request结构体生产的2个方法XXX_UnmarshalXXX_Marshal,文件内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// file: ./types/request.pb.go
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: request.proto

package types

import (
fmt "fmt"
math "math"

proto "github.com/golang/protobuf/proto"
)

// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package

type Request struct {
Data string `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
// 以下是protobuf自动填充的字段,protobuf需要使用
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}

func (m *Request) Reset() { *m = Request{} }
func (m *Request) String() string { return proto.CompactTextString(m) }
func (*Request) ProtoMessage() {}
func (*Request) Descriptor() ([]byte, []int) {
return fileDescriptor_7f73548e33e655fe, []int{0}
}

// 反序列化函数
func (m *Request) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Request.Unmarshal(m, b)
}
// 序列化函数
func (m *Request) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Request.Marshal(b, m, deterministic)
}
func (m *Request) XXX_Merge(src proto.Message) {
xxx_messageInfo_Request.Merge(m, src)
}
func (m *Request) XXX_Size() int {
return xxx_messageInfo_Request.Size(m)
}
func (m *Request) XXX_DiscardUnknown() {
xxx_messageInfo_Request.DiscardUnknown(m)
}

var xxx_messageInfo_Request proto.InternalMessageInfo

// 获取字段
func (m *Request) GetData() string {
if m != nil {
return m.Data
}
return ""
}

func init() {
proto.RegisterType((*Request)(nil), "helloworld.Request")
}

func init() { proto.RegisterFile("request.proto", fileDescriptor_7f73548e33e655fe) }

var fileDescriptor_7f73548e33e655fe = []byte{
// 91 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2d, 0x4a, 0x2d, 0x2c,
0x4d, 0x2d, 0x2e, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0xca, 0x48, 0xcd, 0xc9, 0xc9,
0x2f, 0xcf, 0x2f, 0xca, 0x49, 0x51, 0x92, 0xe5, 0x62, 0x0f, 0x82, 0x48, 0x0a, 0x09, 0x71, 0xb1,
0xa4, 0x24, 0x96, 0x24, 0x4a, 0x30, 0x2a, 0x30, 0x6a, 0x70, 0x06, 0x81, 0xd9, 0x4e, 0x9c, 0x51,
0xec, 0x7a, 0xfa, 0x25, 0x95, 0x05, 0xa9, 0xc5, 0x49, 0x6c, 0x60, 0xcd, 0xc6, 0x80, 0x00, 0x00,
0x00, 0xff, 0xff, 0x2e, 0x52, 0x69, 0xb5, 0x4d, 0x00, 0x00, 0x00,
}

编写Go语言程序

下面这段测试程序就是创建了一个请求,序列化又反序列化的过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// file: main.go
package main

import (
"fmt"

"./types"
"github.com/golang/protobuf/proto"
)

func main() {
req := &types.Request{Data: "Hello LIB"}

// Marshal
encoded, err := proto.Marshal(req)
if err != nil {
fmt.Printf("Encode to protobuf data error: %v", err)
}

// Unmarshal
var unmarshaledReq types.Request
err = proto.Unmarshal(encoded, &unmarshaledReq)
if err != nil {
fmt.Printf("Unmarshal to struct error: %v", err)
}

fmt.Printf("req: %v\n", req.String())
fmt.Printf("unmarshaledReq: %v\n", unmarshaledReq.String())
}

运行结果:

1
2
3
➜  helloworld1 git:(master) go run main.go
req: data:"Hello LIB"
unmarshaledReq: data:"Hello LIB"

以上都是铺垫,下一节的proto包怎么实现编解码才是重点,protobuf用法可以去翻:

  1. 官方介绍:protoc3介绍编码介绍Go教程
  2. 煎鱼grpc系列文章

参考文章

前言

之前看到一幅描述etcd raft的流程图,感觉非常直观,但和自己看源码的又有些不同,所以自己模仿着画了一下,再介绍一下。

下图从左到右依次分为4个部分:

  1. raft:raft主体功能部分
  2. Node:raft提供的接口,raft跟上层的通信接口,会运行一个run函数,持续循环处理通道上的数据
  3. raftNode:上层应用逻辑
  4. 其他:Client、Network、State

etcd raft workflow

图中的箭头为数据的流向,这幅图包含了多个流程,接下来会分成4个流程介绍:

  1. 客户端请求
  2. 发送消息给其他节点
  3. 接收其他节点消息及处理
  4. 应用达成一致的日志

客户端请求

客户端请求的流程,在下图已经使用红色箭头标出,流程如下:

  1. 客户端将请求发送给应用层raftNode
  2. raftNode使用Propose方法,请求写入到propc通道
  3. raft.Step接收到通道数据,会通过append等函数加入到raftLog
  4. raftLog用来暂时存储和查询日志,请求会先加入到unstable

etcd raft request flow

发送消息

发送消息的数据流,已经用红色箭头标出,流程如下:

  1. raft发现有数据发送给其他节点,数据可以是leader要发送给follower的日志、snapshot,或者其他类型的消息,比如follower给leader的响应消息
  2. 利用NewReady创建结构体Ready,并写入到readyc通道
  3. raftNode从通道读到Ready,取出其中的消息,交给Network发送给其他节点

etcd raft send message flow

接收消息

接收消息的数据流,已经在下图用红色箭头标出,流程如下:

  1. 从Network收到消息,可以是leader给follower的消息,也可以是follower发给leader的响应消息,Network的handler函数将数据回传给raftNode
  2. raftNode调用Step函数,将数据发给raft,数据被写入recvc通道
  3. raft的Step从recvc收到消息,并修改raftLog中的日志

etcd raft receive msg flow

应用日志

raft会将达成一致的log通知给raftNode,让它应用到上层的数据库,数据流已经在下图用红色箭头标出,流程如下:

  1. raft发现有日志需要交给raftNode,调用NewReady创建Ready,从raftLog读取日志,并存到Ready结构体
  2. Ready结构体写入到readyc通道
  3. raftNode读到Ready结构体,发现Ready结构体中包含日志
  4. raftNode会把日志写入到storage和WAL,把需要应用的日志,提交给状态机或数据库,去修改数据
  5. raftNode处理完Ready后,调用Advance函数,通过advancec发送一个信号给raft,告知raft传出来的Ready已经处理完毕

可以发现有2个storage,1个是raftLog.Storage,一个是raftNode.storage,Storage是一个接口,可以用来读取storage中的数据,但不写入,storage的数据写入是由raftNode完成的,但raftNode.storage就是raft.MemoryStorage,所以不稳定的、稳定的都由raft存储,持久化存储由WAL负责,etcd中有现成实现的WAL操作可用,用来存储历史Entry、快照。

Storage接口更多信息请看Storage接口介绍

etcd raft apply logs flow

序言

Etcd提供了一个样例contrib/raftexample,用来展示如何使用etcd raft。这篇文章通过raftexample介绍如何使用etcd raft。

raft服务

raftexample是一个分布式KV数据库,客户端可以向集群的节点发送写数据和读数据,以及修改集群配置的请求,它使用etcd raft保持各集群之间数据的一致性。

cluster

etcd raft

etcd raft实现了raft论文的核心,所有的IO(磁盘存储、网络通信)它都没有实现,它做了解耦。

它是一个状态机,有数据作为输入,经过当前状态和输入,得到确定性的输出,即每个节点上都是一样的。

etcd-raft

raft应用架构

raft集群会由多个节点组成,客户端的请求发送给raft leader,再由raft leader通过网络通信在集群之中对请求达成共识。

集群中的每个节点从架构上都可以分为两层:

  • 应用层,负责处理用户请求,数据存储以及集群节点间的网络通信,
  • 共识层,负责相同和输入数据和状态,生成确定性的、一致的输出,

共识层由etcd raft负责,应用层要负责业务逻辑,数据存储和网络通信不需要应用层实现,而是由不同的模块负责,应用层负责起衔接存储存储和网络通信即可。

app-arch

应用层有3个重要组成部分:http API、kv store和raftNode。

http API

每个节点都会启动一个http API用来接受客户端请求,它只是接收请求,不对请求做处理。它会把客户端的写入请求PUT和查询请求GET都交给kv store。

对于修改raft集群配置请求,它会生成ConfChange交给raftNode。

kv store

一个kv数据库服务,它保存有一个kv db,用来存储用户数据

  • 对于查询请求,它直接从db中读取数据。
  • 对于写入请求,需要修改用户数据,这就需要集群节点使用raft对请求达成共识,它把请求传递给raftNode。

raftNode

raftNode用来跟etcd raft交互,他需要:

  • 把客户端的写请求,修改raft配置的请求交给etcd raft
  • 衔接网络通信跟etcd raft之间的桥梁,把etcd raft的消息发送出去,或接受到的raft消息交给raft
  • 保存raft的WAL和snapshot。

对于写请求,它会把请求数据编码后发送给etcd raft,etcd raft会把写请求封装成raft的Propose消息MsgProp,编码后的数据成为log Entry。因为raft并不关心具体的请求内容,它只需要保证每个集群节点在相同的log index拥有相同的log Entry,即请求即可。

raftNode还会启动1个http server,用来集群节点之间的通信,传递raft消息,让集群节点达成共识。它与http api是不同的,http api用来接收用户请求。

raftNode与raft交互

raft模块内部定义了一个Node接口,它代表了raft集群中的一个raft节点,它是应用层跟共识层交互的接口。

其中有几个与数据传递相关函数的是:

  • Propose:应用层通过此函数把客户端写请求传递raft。
  • ProposeConfChange:应用层通过此函数把客户端修改raft集群配置的请求传递raft。
  • Step:应用层把收到的raft集群之间通信的消息传递给raft。
  • Ready:raft对外的出口只有1个,就是Ready函数,Ready函数返回一个通道,应用层可以从这个通道中读到raft要输出的所有数据,这个数据被称为Ready结构体,包括log entry,集群间的通信消息等。
  • Advance:应用层处理完Ready结构体后,调用Advance通知raft,它已处理完刚读到的Ready结构体,raft可以根据最新状态生成下一个Ready结构体。

还有一个ApplyConfChange函数,当Ready结构体中包含修改raft集群配置的log entry时,应用层会调用此函数,把配置应用到raft。

raft架构

瞄完raft应用架构,可以从宏观角度看一下raft是如何跟应用层对接的。

raft包内部有2个很重要的结构体:node和raft。

node结构体

node结构体(后续称为raft.node)实现了Node接口,负责跟应用层对接,raft.node有个goroutine持续运行,应用层raftNode也有goroutine持续运行,raftNode调用raft.node的函数,每个函数都有对应的一个channel,用来把raftNode要传递给raft的数据,发送给raft.node。比如Propose函数的通道是proc,Step函数的通道是recvc。

raft结构体

raft结构体(后续称为raft.raft)是raft算法的主要实现。

raft.node把输入推给raft.raft,raft.raft根据输入和当前的状态数据生成输出,输出临时保存在raft内,raft.node会检查raft.raft是否有输出,如果有输出数据,就把输出生成Ready结构体,并传递给应用层。

raft.raft应用层有一个storage,存放的是当前的状态数据,包含了保存在内存中的log entry,但这个storage并不是raft.raft的,是应用层的,raft.raft只从中读取数据,log entry的写入由应用层负责。

raft-arch

几个存储相关的概念

WAL是Write Ahead Logs的缩写,存储的是log entry记录,即所有写请求的记录。

storage也是存的log entry,只不过是保存在内存中的。

kv db是保存了所有数据的最新值,而log entry是修改数据值的操作记录。

log entry在集群节点之间达成共识之后,log entry会写入WAL文件,也会写入storage,然后会被应用到kv store中,改变kv db中的数据。

Snapshot是kv db是某个log entry被应用后生成的快照,可以根据快照快速回复kv db,而无需从所有的历史log entry依次应用,恢复kv db。

一个写请求的处理过程

有了上面架构层面的了解,我们从宏观的角度看一下一个写请求被处理的过程。

  1. 客户端把写请求发给leader节点
  2. leader节点的http api接收请求,并把请求传递给kv store,kv sotre把写请求发送给raftNode,raftNode把写请求传递给raft.node
  3. leader节点的raft.node把写请求转化为log entry,并交给raft.raft,raft.raft生成发送给每一个follower的Append消息
  4. leader节点的raft.node取出raft.raft中的Append消息以及其他数据,封装成Ready传递给raft.Node
  5. leader节点的raft.Node把Ready中的entry保存到storage,然后把Ready中的消息,发送给相应的节点
  6. follower节点的raft.Node收到消息,把消息传递给raft.node,raft.node推给raft.raft
  7. follower的raft.raft处理Append消息,进行匹配和校验后,生成Append Response消息和保存log entry
  8. follower的raft.node从raft.raft获取数据,然后生成Ready传递给raft.Node
  9. follower节点的raft.Node把Ready中的entry保存到storage,然后把Ready中的消息,发送给相应的节点
  10. leader节点的raft.Node收到消息,把消息传递给raft.node,raft.node推给raft.raft
  11. leader节点的raft.raft处理Append Response消息,然后检查已经达成半数以上同意的log entry,更新已经被commit的log entry的index
  12. leader节点的raft.raft在创建Append等消息的时候,填写了已被commited的log index,所以下次在生成消息,并发送给follower后,follower就根据committed log index提交本地的log entry
  13. 无论是leader,还是follower在生成Ready的时候,会包含已经被committed的log entry,这些entry是等待应用到kv store的,raftNode拿到Ready后,会把这些entry取出来,传递给kv store,kv store会修改key-value的最新值。

总结

本文从宏观角度介绍了:

  • 使用etcd raft应用的架构
  • 使用etcd raft应用应当提供哪些功能供raft使用
  • 应用是如何和etcd raft交互的
  • etcd raft涉及到的存储概念
  • 一个写请求从客户端到在节点之间达成一致,应用到状态机的过程

前言

本文在 https://github.com/maemual/raft-zh_cn/blob/master/raft-zh_cn.md 基础上修改、标注重点。如果遇到中文别扭的地方、不懂的地方,建议配合英文原文一起阅读。

Raft与PBFT、Paxos等其他一致性算法相比,确实简单不少,易于理解和实现,还有很强的可用性。

Raft的3大核心是强领导者、领导者选举和成员关系变更(增加和减少Raft节点),论文就是围绕着3个核心进行介绍,以及论证安全性(正确性、一致性)和可用性。但成员关系变更的细节,依然还有一些疑问,做了一些注释在文中,需要去看源码和讨论弄清楚。

除了3大核心,论文还提到了一些优化策略:

  1. 日志快照能够解决2个问题:
    1. 日志不断积累占用空间不断增长,带来了可用性问题
    2. 性能低下的机器跟不上领导者的节奏,领导者可以把快照发送给跟随者,慢的跟随者就能“大跨步”到比较新的状态
  2. 在领导人接到大量请求时,领导人批量发送日志,可以提高系统的吞吐率

除了论文提到的优化,工业实践中也有几大类优化策略:

  1. Batch and Pipeline
  2. Append Log Parallelly
  3. Asynchronous Apply
  4. Asynchronous Lease Read
  5. Lease Read

相关优化文章:

  1. TiKV 源码解析系列 - Raft 的优化
  2. TiKV 源码解析系列 - Lease Read
  3. 详解蚂蚁金服 SOFAJRaft:生产级高性能 Java 实现
  4. 蚂蚁金服开源 SOFAJRaft:生产级 Java Raft 算法库

寻找一种易于理解的一致性算法(扩展版)

摘要

Raft 是一种为了管理复制日志的一致性算法。它提供了和 Paxos 算法相同的功能和性能,但是它的算法结构和 Paxos 不同,使得 Raft 算法更加容易理解并且更容易构建实际的系统。为了提升可理解性,Raft 将一致性算法分解成了几个关键模块,例如领导人选举、日志复制和安全性。同时它通过实施一个更强的一致性来减少需要考虑的状态的数量。从一个用户研究的结果可以证明,对于学生而言,Raft 算法比 Paxos 算法更加容易学习。Raft 算法还包括一个新的机制来允许集群成员的动态改变,它利用重叠的大多数来保证安全性。

1 介绍

一致性算法允许一组机器像一个整体一样工作,即使其中一些机器出现故障也能够继续工作下去。正因为如此,一致性算法在构建可信赖的大规模软件系统中扮演着重要的角色。在过去的 10 年里,Paxos 算法统治着一致性算法这一领域:绝大多数的实现都是基于 Paxos 或者受其影响。同时 Paxos 也成为了教学领域里讲解一致性问题时的示例。

但是不幸的是,尽管有很多工作都在尝试降低它的复杂性,但是 Paxos 算法依然十分难以理解。并且,Paxos 自身的算法结构需要进行大幅的修改才能够应用到实际的系统中。这些都导致了工业界和学术界都对 Paxos 算法感到十分头疼。

和 Paxos 算法进行过努力之后,我们开始寻找一种新的一致性算法,可以为构建实际的系统和教学提供更好的基础。我们的做法是不寻常的,我们的首要目标是可理解性:我们是否可以在实际系统中定义一个一致性算法,并且能够比 Paxos 算法以一种更加容易的方式来学习。此外,我们希望该算法方便系统构建者的直觉的发展。不仅一个算法能够工作很重要,而且能够显而易见的知道为什么能工作也很重要。

Raft 一致性算法就是这些工作的结果。在设计 Raft 算法的时候,我们使用一些特别的技巧来提升它的可理解性,包括算法分解(Raft 主要被分成了领导人选举,日志复制和安全三个模块)和减少状态机的状态(相对于 Paxos,Raft 减少了非确定性和服务器互相处于非一致性的方式)。一份针对两所大学 43 个学生的研究表明 Raft 明显比 Paxos 算法更加容易理解。在这些学生同时学习了这两种算法之后,和 Paxos 比起来,其中 33 个学生能够回答有关于 Raft 的问题。

Raft 算法在许多方面和现有的一致性算法都很相似(主要是 Oki 和 Liskov 的 Viewstamped Replication),但是它也有一些独特的特性:

  • 强领导者:和其他一致性算法相比,Raft 使用一种更强的领导能力形式。比如,日志条目只从领导者发送给其他的服务器。这种方式简化了对复制日志的管理并且使得 Raft 算法更加易于理解。
  • 领导选举:Raft 算法使用一个随机计时器来选举领导者。这种方式只是在任何一致性算法都必须实现的心跳机制上增加了一点机制。在解决冲突的时候会更加简单快捷。
  • 成员关系调整:Raft 使用一种联结一致(joint consensus)的方法来处理集群成员变换的问题,在这种方法下,处于调整过程中的两种不同的配置集群中大多数机器会有重叠,这就使得集群在成员变换的时候依然可以继续工作。

我们相信,Raft 算法不论出于教学目的还是作为实践项目的基础都是要比 Paxos 或者其他一致性算法要优异的。它比其他算法更加简单,更加容易理解;它的算法描述足以实现一个现实的系统;它有好多开源的实现并且在很多公司里使用;它的安全性已经被证明;它的效率和其他算法比起来也不相上下。

接下来,这篇论文会介绍以下内容:复制状态机问题(第 2 节),讨论 Paxos 的优点和缺点(第 3 节),讨论我们为了可理解性而采取的方法(第 4 节),阐述 Raft 一致性算法(第 5-8 节),评价 Raft 算法(第 9 节),以及一些相关的工作(第 10 节)。

2 复制状态机

一致性算法是从复制状态机的背景下提出的(参考英文原文引用37)。在这种方法中,一组服务器上的状态机产生相同状态的副本,并且在一些机器宕掉的情况下也可以继续运行。复制状态机在分布式系统中被用于解决很多容错的问题。例如,大规模的系统中通常都有一个集群领导者,像 GFS、HDFS 和 RAMCloud,典型应用就是一个独立的的复制状态机去管理领导选举和存储配置信息并且在领导人宕机的情况下也要存活下来。比如 Chubby 和 ZooKeeper。

图 1

图 1 :复制状态机的结构。一致性算法管理着来自客户端指令的复制日志。状态机从日志中处理相同顺序的相同指令,所以产生的结果也是相同的。

复制状态机通常都是基于复制日志实现的,如图 1。每一个服务器存储一个包含一系列指令的日志,并且按照日志的顺序进行执行。每一个日志都按照相同的顺序包含相同的指令,所以每一个服务器都执行相同的指令序列。因为每个状态机都是确定的,每一次执行操作都产生相同的状态和同样的序列。

保证复制日志相同就是一致性算法的工作了。在一台服务器上,一致性模块接收客户端发送来的指令然后增加到自己的日志中去。它和其他服务器上的一致性模块进行通信来保证每一个服务器上的日志最终都以相同的顺序包含相同的请求,尽管有些服务器会宕机。一旦指令被正确的复制,每一个服务器的状态机按照日志顺序处理他们,然后输出结果被返回给客户端。因此,服务器集群看起来形成一个高可靠的状态机。

实际系统中使用的一致性算法通常含有以下特性:

  • 安全性保证(绝对不会返回一个错误的结果):在非拜占庭错误情况下,包括网络延迟、分区、丢包、冗余和乱序等错误都可以保证正确。
  • 可用性:集群中只要有大多数的机器可运行并且能够相互通信、和客户端通信,就可以保证可用。因此,一个典型的包含 5 个节点的集群可以容忍两个节点的失败。服务器被停止就认为是失败。他们当有稳定的存储的时候可以从状态中恢复回来并重新加入集群。
  • 不依赖时序来保证一致性:物理时钟错误或者极端的消息延迟只有在最坏情况下才会导致可用性问题
  • 通常情况下,一条指令可以尽可能快的在集群中大多数节点响应一轮远程过程调用时完成。小部分比较慢的节点不会影响系统整体的性能

3 Paxos 算法的问题

在过去的 10 年里,Leslie Lamport 的 Paxos 算法几乎已经成为一致性的代名词:Paxos 是在课程教学中最经常使用的算法,同时也是大多数一致性算法实现的起点。Paxos 首先定义了一个能够达成单一决策一致的协议,比如单条的复制日志项。我们把这一子集叫做单决策 Paxos。然后通过组合多个 Paxos 协议的实例来促进一系列决策的达成。Paxos 保证安全性和活性,同时也支持集群成员关系的变更。Paxos 的正确性已经被证明,在通常情况下也很高效。

不幸的是,Paxos 有两个明显的缺点。第一个缺点是 Paxos 算法特别的难以理解。完整的解释是出了名的不透明;通过极大的努力之后,也只有少数人成功理解了这个算法。因此,有了几次用更简单的术语来解释 Paxos 的尝试。尽管这些解释都只关注了单决策的子集问题,但依然很具有挑战性。在 2012 年 NSDI 的会议中的一次调查显示,很少有人对 Paxos 算法感到满意,甚至在经验老道的研究者中也是如此。我们自己也尝试去理解 Paxos;我们一直没能理解 Paxos 直到我们读了很多对 Paxos 的简化解释并且设计了我们自己的算法之后,这一过程花了近一年时间。

我们假设 Paxos 的不透明性来自它选择单决策问题作为它的基础。单决策 Paxos 是晦涩微妙的,它被划分成了两种没有简单直观解释和无法独立理解的情景。因此,这导致了很难建立起直观的感受为什么单决策 Paxos 算法能够工作。构成多决策 Paxos 增加了很多错综复杂的规则。我们相信,在多决策上达成一致性的问题(一份日志而不是单一的日志记录)能够被分解成其他的方式并且更加直接和明显。

Paxos算法的第二个问题就是它没有提供一个足够好的用来构建一个现实系统的基础。一个原因是还没有一种被广泛认同的多决策问题的算法。Lamport 的描述基本上都是关于单决策 Paxos 的;他简要描述了实施多决策 Paxos 的方法,但是缺乏很多细节。当然也有很多具体化 Paxos 的尝试,但是他们都互相不一样,和 Paxos 的概述也不同。例如 Chubby 这样的系统实现了一个类似于 Paxos 的算法,但是大多数的细节并没有被公开。

而且,Paxos 算法的结构也不是十分易于构建实践的系统;单决策分解也会产生其他的结果。例如,独立的选择一组日志条目然后合并成一个序列化的日志并没有带来太多的好处,仅仅增加了不少复杂性。围绕着日志来设计一个系统是更加简单高效的;新日志条目以严格限制的顺序增添到日志中去。另一个问题是,Paxos 使用了一种对等的点对点的方式作为它的核心(尽管它最终提议了一种弱领导人的方法来优化性能)。在只有一个决策会被制定的简化世界中是很有意义的,但是很少有现实的系统使用这种方式。如果有一系列的决策需要被制定,首先选择一个领导人,然后让他去协调所有的决议,会更加简单快速。

因此,实际的系统中很少有和 Paxos 相似的实践。每一种实现都是从 Paxos 开始研究,然后发现很多实现上的难题,再然后开发了一种和 Paxos 明显不一样的结构。这样是非常费时和容易出错的,并且理解 Paxos 的难度使得这个问题更加糟糕。Paxos 算法在理论上被证明是正确可行的,但是现实的系统和 Paxos 差别是如此的大,以至于这些证明没有什么太大的价值。下面来自 Chubby 实现非常典型:

在Paxos算法描述和实现现实系统中间有着巨大的鸿沟。最终的系统建立在一种没有经过证明的算法之上。

大彬注:Paxos 只提供了多进程如何确定出一个共同的值。没有提供像Raft一样的日志、快照、节点成员变更等,如果用Paxos实现一致性,就需要开发者自己设计,也就出现了上面提到的,各家的实现有非常大的不同,整套系统也没有经过完整的证明。

由于以上问题,我们认为 Paxos 算法既没有提供一个良好的基础给实践的系统,也没有给教学很好的帮助。基于一致性问题在大规模软件系统中的重要性,我们决定看看我们是否可以设计一个拥有更好特性的替代 Paxos 的一致性算法。Raft算法就是这次实验的结果。

4 为了可理解性的设计

设计 Raft 算法我们有几个初衷:它必须提供一个完整的实际的系统实现基础,这样才能大大减少开发者的工作;它必须在任何情况下都是安全的并且在大多数的情况下都是可用的;并且它的大部分操作必须是高效的。但是我们最重要也是最大的挑战是可理解性。它必须保证对于普遍的人群都可以十分容易的去理解。另外,它必须能够让人形成直观的认识,这样系统的构建者才能够在现实中进行必然的扩展。

在设计 Raft 算法的时候,有很多的点需要我们在各种备选方案中进行选择。在这种情况下,我们评估备选方案基于可理解性原则:解释各个备选方案有多大的难度(例如,Raft 的状态空间有多复杂,是否有微妙的暗示)?对于一个读者而言,完全理解这个方案和暗示是否容易?

我们意识到对这种可理解性分析上具有高度的主观性;尽管如此,我们使用了两种通常适用的技术来解决这个问题。第一个技术就是众所周知的问题分解:只要有可能,我们就将问题分解成几个相对独立的,可被解决的、可解释的和可理解的子问题。例如,Raft 算法被我们分成领导人选举,日志复制,安全性和角色改变几个部分

我们使用的第二个方法是通过减少状态的数量来简化需要考虑的状态空间,使得系统更加连贯并且在可能的时候消除不确定性。特别的,所有的日志是不允许有空洞的,并且 Raft 限制了日志之间变成不一致状态的可能。尽管在大多数情况下我们都试图去消除不确定性,但是也有一些情况下不确定性可以提升可理解性。尤其是,随机化方法增加了不确定性,但是他们有利于减少状态空间数量,通过处理所有可能选择时使用相似的方法。我们使用随机化去简化 Raft 中领导人选举算法

5 Raft 一致性算法

Raft 是一种用来管理章节 2 中描述的复制日志的算法。图 2 为了参考之用,总结这个算法的简略版本,图 3 列举了这个算法的一些关键特性。图中的这些元素会在剩下的章节逐一介绍。

Raft 通过选举一个权威的领导人,然后给予他全部的管理复制日志的责任来实现一致性。领导人从客户端接收日志条目,把日志条目复制到其他服务器上,并且当保证安全性的时候告诉其他的服务器应用日志条目到他们的状态机中。拥有一个领导人大大简化了对复制日志的管理。例如,领导人可以决定新的日志条目需要放在日志中的什么位置而不需要和其他服务器商议,并且数据都从领导人流向其他服务器。一个领导人可以宕机,可以和其他服务器失去连接,这时一个新的领导人会被选举出来。

通过领导人的方式,Raft 将一致性问题分解成了三个相对独立的子问题,这些问题会在接下来的子章节中进行讨论:

  • 领导选举:一个新的领导人需要被选举出来,当现存的领导人宕机的时候(章节 5.2)
  • 日志复制:领导人必须从客户端接收日志然后复制到集群中的其他节点,并且强制要求其他节点的日志保持和自己相同。
  • 安全性:在 Raft 中安全性的关键是在图 3 中展示的状态机安全:如果有任何的服务器节点已经应用了一个确定的日志条目到它的状态机中,那么其他服务器节点不能在同一个日志索引位置应用一个不同的指令。章节 5.4 阐述了 Raft 算法是如何保证这个特性的;这个解决方案涉及到一个额外的选举机制(5.2 节)上的限制。

在展示一致性算法之后,这一章节会讨论可用性的一些问题和计时在系统的作用。

状态

状态 所有服务器上持久存在的状态
currentTerm 服务器最后一次知道的任期号(初始化为 0,持续递增)
votedFor 在当前任期获得选票的候选人的 Id
log[] 日志条目集;每一个条目包含一个用户状态机执行的指令,和收到时的任期号。(即WAL,Write Ahead Log
状态 所有服务器上经常变的状态
commitIndex 已知的最大的已经被提交的日志条目的索引
lastApplied 最后被应用到状态机的日志条目索引值(初始化为 0,持续递增)
状态 在领导人上经常改变的状态 (选举后重新初始化)
nextIndex[] 为每个服务器保存的,需要发送给他的下一个日志条目的索引值(初始化为领导人最后索引值加1)
matchIndex[] 为每个服务器保存的,已经复制给他的日志的最高索引值(初始值为0)

追加日志的RPC

由领导人负责调用来复制日志指令;也会用作heartbeat

RPC请求参数 解释
term 领导人的任期号
leaderId 领导人的 Id,以便于跟随者重定向请求
prevLogIndex 前一条日志条目索引
prevLogTerm prevLogIndex 条目的所在的任期
entries[] 准备存储的日志条目(表示心跳时为空;一次性发送多个是为了提高效率,这是Batch优化)
leaderCommit 领导人已经提交的日志的索引值
RPC结果返回值 解释
term 当前的任期号,用于领导人去更新自己
success 跟随者包含了匹配上 prevLogIndex 和 prevLogTerm 的日志时为真

接收者实现:

  1. 如果 term < currentTerm 就返回 false (5.1 节)
  2. 如果日志在 prevLogIndex 位置处的日志条目的任期号和 prevLogTerm 不匹配,则返回 false (5.3 节)
  3. 如果已经存在的日志条目和新的产生冲突(相同索引值但是任期号不同),删除这一条和之后所有的条目 (5.3 节)
  4. 追加日志中不存在的任何新条目
  5. 如果 leaderCommit > commitIndex,令 commitIndex 等于 leaderCommit 和 新日志条目索引值中较小的一个

请求投票的RPC

由候选人负责调用用来征集选票(5.2 节)

请求的参数 解释
term 候选人的任期号
candidateId 请求选票的候选人的 Id
lastLogIndex 候选人的最后日志条目的索引值
lastLogTerm 候选人最后日志条目的任期号
请求的结果 解释
term 当前任期号,以便于候选人去更新自己的任期号
voteGranted 候选人赢得了此张选票时为真

接收者实现:

  1. 如果term < currentTerm返回 false (5.2 节)
  2. 如果 votedFor 为空或者为 candidateId,并且候选人的日志至少和自己一样新,那么就投票给他(5.2 节,5.4 节)

所有服务器需遵守的规则

所有服务器:

  • 如果commitIndex > lastApplied,那么就 lastApplied 加一,并把log[lastApplied]应用到状态机中(5.3 节)
  • 如果接收到的 RPC 请求或响应中,任期号T > currentTerm,那么就令 currentTerm 等于 T,并切换状态为跟随者(5.1 节)

跟随者(5.2 节):

  • 响应来自候选人和领导者的请求
  • 如果在超过选举超时时间的情况之前都没有收到领导人的心跳,或者是候选人请求投票的,就自己变成候选人

候选人(5.2 节):

  • 在转变成候选人后就立即开始选举过程
    • 自增当前的任期号(currentTerm)
    • 给自己投票
    • 重置选举超时计时器
    • 发送请求投票的 RPC 给其他所有服务器
  • 如果接收到大多数服务器的选票,那么就变成领导人
  • 如果接收到来自新的领导人的追加日志 RPC,转变成跟随者
  • 如果选举过程超时,再次发起一轮选举

领导人:

  • 一旦成为领导人:发送空的追加日志 RPC(心跳)给其他所有的服务器;在一定的空余时间之后不停的重复发送,以阻止跟随者超时(5.2 节)
  • 如果接收到来自客户端的请求:追加条目到本地日志中,在条目被应用到状态机后响应客户端(5.3 节)
  • 如果对于一个跟随者,最后日志条目的索引值大于等于 nextIndex,那么:发送从 nextIndex 开始的所有日志条目:
    • 如果成功:更新相应跟随者的 nextIndex 和 matchIndex
    • 如果因为日志不一致而失败,减少 nextIndex 重试
  • 如果存在一个满足N > commitIndex的 N,并且大多数的matchIndex[i] ≥ N成立,并且log[N].term == currentTerm成立,那么令 commitIndex 等于这个 N (5.3 和 5.4 节)

图 2

图 2:一个关于 Raft 一致性算法的浓缩总结(不包括成员变换和日志压缩)。

特性 解释
选举安全特性 对于一个给定的任期号,最多只会有一个领导人被选举出来(5.2 节)
领导人只追加原则 领导人绝对不会删除或者覆盖自己的日志,只会增加(5.3 节)
日志匹配原则 如果两个日志在相同的索引位置的日志条目的任期号相同,那么我们就认为这个日志从头到这个索引位置之间全部完全相同(5.3 节)
领导人完全特性 如果某个日志条目在某个任期号中已经被提交,那么这个条目必然出现在更大任期号的所有领导人中(5.4 节)
状态机安全特性 如果一个领导人已经在给定的索引值位置的日志条目应用到状态机中,那么其他任何的服务器在这个索引位置不会提交一个不同的日志(5.4.3 节)

图 3

图 3:Raft 在任何时候都保证以上的各个特性。

5.1 Raft 基础

一个 Raft 集群包含若干个服务器节点;通常是 5 个,这允许整个系统容忍 2 个节点的失效。在任何时刻,每一个服务器节点都处于这三个状态之一:领导人、跟随者或者候选人。在通常情况下,系统中只有一个领导人并且其他的节点全部都是跟随者。跟随者都是被动的:他们不会发送任何请求,只是简单的响应来自领导者或者候选人的请求。领导人处理所有的客户端请求(如果一个客户端和跟随者联系,那么跟随者会把请求重定向给领导人)。第三种状态,候选人,是用来在 5.2 节描述的选举新领导人时使用。图 4 展示了这些状态和他们之间的转换关系;这些转换关系会在接下来进行讨论。

图 4

图 4:服务器状态。跟随者只响应来自其他服务器的请求。如果跟随者接收不到消息,那么他就会变成候选人并发起一次选举。获得集群中大多数选票的候选人将成为领导者。在一个任期内,领导人一直都会是领导人直到自己宕机了。

图 5

图 5:时间被划分成一个个的任期,每个任期开始都是一次选举。在选举成功后,领导人会管理整个集群直到任期结束。有时候选举会失败,那么这个任期就会没有领导人而结束。任期之间的切换可以在不同的时间不同的服务器上观察到。

Raft 把时间分割成任意长度的任期,如图 5。任期用连续的整数标记。每一段任期从一次选举开始,就像章节 5.2 描述的一样,一个或者多个候选人尝试成为领导者。如果一个候选人赢得选举,然后他就在接下来的任期内充当领导人的职责。在某些情况下,一次选举过程会造成选票的瓜分。在这种情况下,这一任期会以没有领导人结束;一个新的任期(和一次新的选举)会很快重新开始。Raft 保证了在一个给定的任期内,最多只有一个领导者。

不同的服务器节点可能多次观察到任期之间的转换,但在某些情况下,一个节点也可能观察不到任何一次选举或者整个任期全程。任期在 Raft 算法中充当逻辑时钟的作用,这会允许服务器节点查明一些过期的信息比如陈旧的领导者。每一个节点存储一个当前任期号,这一编号在整个时期内单调的增长。当服务器之间通信的时候会交换当前任期号;如果一个服务器的当前任期号比其他人小,那么他会更新自己的编号到较大的编号值。如果一个候选人或者领导者发现自己的任期号过期了,那么他会立即恢复成跟随者状态。如果一个节点接收到一个包含过期的任期号的请求,那么他会直接拒绝这个请求。

Raft 算法中服务器节点之间通信使用远程过程调用(RPCs),并且基本的一致性算法只需要两种类型的 RPCs。请求投票(RequestVote) RPCs 由候选人在选举期间发起(章节 5.2),然后追加条目(AppendEntries)RPCs 由领导人发起,用来复制日志和提供一种心跳机制(章节 5.3)。第 7 节为了在服务器之间传输快照增加了第三种 RPC。当服务器没有及时的收到 RPC 的响应时,会进行重试, 并且他们能够并行的发起 RPCs 来获得最佳的性能

5.2 领导人选举

Raft 使用一种心跳机制来触发领导人选举。当服务器程序启动时,他们都是跟随者身份。一个服务器节点继续保持着跟随者状态只要他从领导人或者候选者处接收到有效的 RPCs。领导者周期性的向所有跟随者发送心跳包(即不包含日志项内容的追加日志项 RPCs)来维持自己的权威如果一个跟随者在一段时间里没有接收到任何消息,也就是选举超时,那么他就会认为系统中没有可用的领导者,并且发起选举以选出新的领导者

要开始一次选举过程,跟随者先要增加自己的当前任期号并且转换到候选人状态。然后他会并行的向集群中的其他服务器节点发送请求投票的 RPCs 来给自己投票。候选人会继续保持着当前状态直到以下三件事情之一发生:(a) 他自己赢得了这次的选举,(b) 其他的服务器成为领导者,(c) 一段时间之后没有任何一个获胜的人。这些结果会分别的在下面的段落里进行讨论。

当一个候选人从整个集群的大多数服务器节点获得了针对同一个任期号的选票,那么他就赢得了这次选举并成为领导人。每一个服务器最多会对一个任期号投出一张选票,按照先来先服务的原则(注意:5.4 节在投票上增加了一点额外的限制)。要求大多数选票的规则确保了最多只会有一个候选人赢得此次选举(图 3 中的选举安全性)。一旦候选人赢得选举,他就立即成为领导人。然后他会向其他的服务器发送心跳消息来建立自己的权威并且阻止新的领导人的产生

在等待投票的时候,候选人可能会从其他的服务器接收到声明它是领导人的追加日志项 RPC。如果这个领导人的任期号(包含在此次的 RPC中)不小于候选人当前的任期号,那么候选人会承认领导人合法并回到跟随者状态。 如果此次 RPC 中的任期号比自己小,那么候选人就会拒绝这次的 RPC 并且继续保持候选人状态。

第三种可能的结果是候选人既没有赢得选举也没有输:如果有多个跟随者同时成为候选人,那么选票可能会被瓜分以至于没有候选人可以赢得大多数人的支持。当这种情况发生的时候,每一个候选人都会超时,然后通过增加当前任期号来开始一轮新的选举。然而,没有其他机制的话,选票可能会被无限的重复瓜分。

Raft 算法使用随机选举超时时间的方法来确保很少会发生选票瓜分的情况,就算发生也能很快的解决。为了阻止选票起初就被瓜分,选举超时时间是从一个固定的区间(例如 150-300 毫秒)随机选择。这样可以把服务器都分散开以至于在大多数情况下只有一个服务器会选举超时;然后他赢得选举并在其他服务器超时之前发送心跳包。同样的机制被用在选票瓜分的情况下。每一个候选人在开始一次选举的时候会重置一个随机的选举超时时间,然后在超时时间内等待投票的结果;这样减少了在新的选举中另外的选票瓜分的可能性。9.3 节展示了这种方案能够快速的选出一个领导人。

领导人选举这个例子,体现了可理解性原则是如何指导我们进行方案设计的。起初我们计划使用一种排名系统:每一个候选人都被赋予一个唯一的排名,供候选人之间竞争时进行选择。如果一个候选人发现另一个候选人拥有更高的排名,那么他就会回到跟随者状态,这样高排名的候选人能够更加容易的赢得下一次选举。但是我们发现这种方法在可用性方面会有一点问题(如果高排名的服务器宕机了,那么低排名的服务器可能会超时并再次进入候选人状态。而且如果这个行为发生得足够快,则可能会导致整个选举过程都被重置掉)。我们针对算法进行了多次调整,但是每次调整之后都会有新的问题。最终我们认为随机重试的方法是更加明显和易于理解的。

5.3 日志复制

一旦一个领导人被选举出来,他就开始为客户端提供服务。客户端的每一个请求都包含一条被复制状态机执行的指令。领导人把这条指令作为一条新的日志条目追加到日志中去,然后并行的发起追加条目 RPCs 给其他的服务器,让他们复制这条日志条目。当这条日志条目被安全的复制(下面会介绍),领导人会应用这条日志条目到它的状态机中然后把执行的结果返回给客户端。如果跟随者崩溃或者运行缓慢,再或者网络丢包,领导人会不断的重复尝试追加日志条目 RPCs (尽管已经回复了客户端)直到所有的跟随者都最终存储了所有的日志条目。

所以这时并没有真正的写入日志,如果所有的follower都没收到这个日志,并且leader这时候挂了,其实这条日志可能丢失了。

图 6

图 6:日志由有序序号标记的条目组成。每个条目都包含创建时的任期号(图中框中的数字),和一个状态机需要执行的指令。一个条目当可以安全的被应用到状态机中去的时候,就认为是可以提交了。

日志以图 6 展示的方式组织。每一个日志条目存储一条状态机指令和从领导人收到这条指令时的任期号。日志中的任期号用来检查是否出现不一致的情况,同时也用来保证图 3 中的某些性质。每一条日志条目同时也都有一个整数索引值来表明它在日志中的位置

领导人来决定什么时候把日志条目应用到状态机中是安全的:这种日志条目被称为已提交。Raft 算法保证所有已提交的日志条目都是持久化的并且最终会被所有可用的状态机执行。在领导人将创建的日志条目复制到大多数的服务器上的时候,日志条目就会被提交(例如在图 6 中的条目 7)。同时,领导人的日志中之前的所有日志条目也都会被提交,包括由其他领导人创建的条目。5.4 节会讨论某些当在领导人改变之后应用这条规则的隐晦内容,同时他也展示了这种提交的定义是安全的。领导人跟踪了最大的将会被提交的日志项的索引,并且索引值会被包含在未来的所有追加日志 RPCs (包括心跳包),这样其他的服务器才能最终知道领导人的提交位置。一旦跟随者知道一条日志条目已经被提交,那么他也会将这个日志条目应用到本地的状态机中(按照日志的顺序)。

我们设计了 Raft 的日志机制来维护一个不同服务器的日志之间的高层次的一致性。这么做不仅简化了系统的行为也使得更加可预计,同时他也是安全性保证的一个重要组件。Raft 维护着以下的特性,这些同时也组成了图 3 中的日志匹配特性

  • 如果在不同的日志中的两个条目拥有相同的索引和任期号,那么他们存储了相同的指令。
  • 如果在不同的日志中的两个条目拥有相同的索引和任期号,那么他们之前的所有日志条目也全部相同。

第一个特性来自这样的一个事实,领导人最多在一个任期里在指定的一个日志索引位置创建一条日志条目,同时日志条目在日志中的位置也从来不会改变。第二个特性由追加日志 RPC 的一个简单的一致性检查所保证。在发送追加日志 RPC 的时候,领导人会把新的日志条目紧接着之前的条目的索引位置和任期号包含在里面。如果跟随者在它的日志中找不到包含相同索引位置和任期号的条目,那么他就会拒绝接收新的日志条目。一致性检查就像一个归纳步骤:一开始空的日志状态肯定是满足日志匹配特性的,当日志扩展的时候,一致性检查也确保了日志匹配特性。因此,每当追加日志 RPC 返回成功时,领导人就知道跟随者的日志一定是和自己相同的了。

在正常的操作中,领导人和跟随者的日志保持一致性,所以追加日志 RPC 的一致性检查从来不会失败。然而,领导人崩溃的情况会使得日志处于不一致的状态(老的领导人可能还没有完全复制所有的日志条目)。这种不一致问题会在领导人和跟随者的一系列崩溃下加剧。图 7 展示了跟随者的日志可能和新的领导人不同的方式。跟随者可能会丢失一些在新的领导人中有的日志条目,他也可能拥有一些领导人没有的日志条目,或者两者都发生。丢失或者多出日志条目可能会持续多个任期

图 7

图 7:当一个领导人成功当选时,跟随者可能是任何情况(a-f)。每一个盒子表示是一个日志条目;里面的数字表示任期号。跟随者可能会缺少一些日志条目(a-b),可能会有一些未被提交的日志条目(c-d),或者两种情况都存在(e-f)。例如,场景 f 可能会这样发生,某服务器在任期 2 的时候是领导人,已追加了一些日志条目到自己的日志中,但在提交之前就崩溃了;很快这个机器就被重启了,在任期 3 重新被选为领导人,并且又增加了一些日志条目到自己的日志中;在任期 2 和任期 3 的日志被提交之前,这个服务器又宕机了,并且在接下来的几个任期里一直处于宕机状态。

在 Raft 算法中,领导人处理不一致是通过强制跟随者直接复制自己的日志来解决了。这意味着在跟随者中的冲突的日志条目会被领导人的日志覆盖。5.4 节会阐述如何通过增加一些限制来使得这样的操作是安全的。

要使得跟随者的日志进入和自己一致的状态,领导人必须找到最后两者达成一致的地方,然后删除跟随者从那个点之后的所有日志条目,发送自己的日志给跟随者。所有的这些操作都在进行追加日志 RPCs 的一致性检查时完成。领导人针对每一个跟随者维护了一个 nextIndex,这表示下一个需要发送给跟随者的日志条目的索引地址。当一个领导人刚获得权力的时候,他初始化所有的 nextIndex 值为自己的最后一条日志的index加1(图 7 中的 11)。如果一个跟随者的日志和领导人不一致,那么在下一次的追加日志 RPC 时的一致性检查就会失败。在被跟随者拒绝之后,领导人就会减小 nextIndex 值并进行重试。最终 nextIndex 会在某个位置使得领导人和跟随者的日志达成一致。当这种情况发生,追加日志 RPC 就会成功,这时就会把跟随者冲突的日志条目全部删除并且加上领导人的日志。一旦追加日志 RPC 成功,那么跟随者的日志就会和领导人保持一致,并且在接下来的任期里一直继续保持。

如果需要的话,算法可以通过减少被拒绝的追加日志 RPCs 的次数来优化。例如,当追加日志 RPC 的请求被拒绝的时候,跟随者可以包含冲突的条目的任期号和自己存储的那个任期的最早的索引地址。借助这些信息,领导人可以减小 nextIndex 越过所有那个任期冲突的所有日志条目;这样就变成每个任期需要一次追加条目 RPC 而不是每个条目一次。在实践中,我们十分怀疑这种优化是否是必要的,因为失败是很少发生的并且也不大可能会有这么多不一致的日志。

通过这种机制,领导人在获得权力的时候就不需要任何特殊的操作来恢复一致性。他只需要进行正常的操作,然后日志就能自动的在回复追加日志 RPC 的一致性检查失败的时候自动趋于一致。领导人从来不会覆盖或者删除自己的日志(图 3 的领导人只追加特性)。

日志复制机制展示出了第 2 节中形容的一致性特性:Raft 能够接受,复制并应用新的日志条目只要大部分的机器是工作的;在通常的情况下,新的日志条目可以在一次 RPC 中被复制给集群中的大多数机器;并且单个的缓慢的跟随者不会影响整体的性能。

5.4 安全性

前面的章节里描述了 Raft 算法是如何选举和复制日志的。然而,到目前为止描述的机制并不能充分的保证每一个状态机会按照相同的顺序执行相同的指令。例如,一个跟随者可能会进入不可用状态同时领导人已经提交了若干的日志条目,然后这个跟随者可能会被选举为领导人并且覆盖这些日志条目;因此,不同的状态机可能会执行不同的指令序列

这一节通过在领导选举的时候增加一些限制来完善 Raft 算法。这一限制保证了任何的领导人对于给定的任期号,都拥有了之前任期的所有被提交的日志条目(图 3 中的领导人完整特性)。增加这一选举时的限制,我们对于提交时的规则也更加清晰。最终,我们将展示对于领导人完整特性的简要证明,并且说明领导人是如何领导复制状态机的做出正确行为的。

5.4.1 选举限制

在任何基于领导人的一致性算法中,领导人都必须存储所有已经提交的日志条目。在某些一致性算法中,例如 Viewstamped Replication,某个节点即使是一开始并没有包含所有已经提交的日志条目,它也能被选为领导者。这些算法都包含一些额外的机制来识别丢失的日志条目并把他们传送给新的领导人,要么是在选举阶段要么在之后很快进行。不幸的是,这种方法会导致相当大的额外的机制和复杂性。Raft 使用了一种更加简单的方法,它可以保证所有之前的任期号中已经提交的日志条目在选举的时候都会出现在新的领导人中,不需要传送这些日志条目给领导人。这意味着日志条目的传送是单向的,只从领导人传给跟随者,并且领导人从不会覆盖自身本地日志中已经存在的条目。

Raft 使用投票的方式来阻止一个候选人赢得选举,除非这个候选人包含了所有已经提交的日志条目。候选人为了赢得选举必须联系集群中的大部分节点,这意味着每一个已经提交的日志条目在这些服务器节点中肯定存在于至少一个节点上。如果候选人的日志至少和大多数的服务器节点一样新(这个新的定义会在下面讨论),那么他一定持有了所有已经提交的日志条目。请求投票 RPC 实现了这样的限制: RPC 中包含了候选人的日志信息,然后投票人会拒绝掉那些日志没有自己的日志新的投票请求

这样选举出来的leader,包含的日志数也是尽量多的,可能比某几个少,但比大多数节点多。

Raft 通过比较两份日志中最后一条日志条目的索引值和任期号定义谁的日志比较新。如果两份日志最后的条目的任期号不同,那么任期号大的日志更加新。如果两份日志最后的条目任期号相同,那么日志比较长的那个就更加新。

5.4.2 提交之前任期内的日志条目

如同 5.3 节介绍的那样,领导人知道一条当前任期内的日志记录是可以被提交的,只要它被存储到了大多数的服务器上。如果一个领导人在提交日志条目之前崩溃了,未来后续的领导人会继续尝试复制这条日志记录。然而,一个领导人不能断定一个之前任期里的日志条目被保存到大多数服务器上的时候就一定已经提交了。图 8 展示了一种情况,一条已经被存储到大多数节点上的老日志条目,也依然有可能会被未来的领导人覆盖掉。

图 8

图 8:如图的时间序列展示了为什么领导人无法决定对老任期号的日志条目进行提交。在 (a) 中,S1 是领导者,部分的复制了索引位置 2 的日志条目。在 (b) 中,S1 崩溃了,然后 S5 在任期 3 里通过 S3、S4 和自己的选票赢得选举,然后从客户端接收了一条不一样的日志条目放在了索引 2 处。然后到 (c),S5 又崩溃了;S1 重新启动,选举成功,开始复制日志。在这时,来自任期 2 的那条日志已经被复制到了集群中的大多数机器上,但是还没有被提交。如果 S1 在 (d) 中又崩溃了,S5 可以重新被选举成功(通过来自 S2,S3 和 S4 的选票),然后覆盖了他们在索引 2 处的日志。反之,如果在崩溃之前,S1 把自己主导的新任期里产生的日志条目复制到了大多数机器上,就如 (e) 中那样,那么在后面任期里面这些新的日志条目就会被提交(因为S5 就不可能选举成功)。 这样在同一时刻就同时保证了,之前的所有老的日志条目就会被提交。

为了消除图 8 里描述的情况,Raft 永远不会通过计算副本数目的方式去提交一个之前任期内的日志条目。只有领导人当前任期里的日志条目通过计算副本数目可以被提交;一旦当前任期的日志条目以这种方式被提交,那么由于日志匹配特性,之前的日志条目也都会被间接的提交。在某些情况下,领导人可以安全的知道一个老的日志条目是否已经被提交(例如,该条目是否存储到所有服务器上),但是 Raft 为了简化问题使用一种更加保守的方法。

补充:获得多数投票的节点被选举为leader,leader并不一定包含上一个任期未提交的log。
如果包含,当前新日志提交的时候,新日志之前的日志也被确认,那些没有前一个任期日志的节点,会跟leader进行同步。

当领导人复制之前任期里的日志时,Raft 会为所有日志保留原始的任期号, 这在提交规则上产生了额外的复杂性。在其他的一致性算法中,如果一个新的领导人要重新复制之前的任期里的日志时,它必须使用当前新的任期号。Raft 使用的方法更加容易辨别出日志,因为它可以随着时间和日志的变化对日志维护着同一个任期编号。另外,和其他的算法相比,Raft 中的新领导人只需要发送更少日志条目(其他算法中必须在他们被提交之前发送更多的冗余日志条目来为他们重新编号)。

5.4.3 安全性论证

补:本节证明被提交的log是不会被删除的,即使发生leader切换,以及怎么确保所有机器数据一致。

在给定了完整的 Raft 算法之后,我们现在可以更加精确的讨论领导人完整性特性(这一讨论基于 9.2 节的安全性证明)。我们假设领导人完全性特性是不存在的,然后我们推出矛盾来。假设任期 T 的领导人(领导人 T)在任期内提交了一条日志条目,但是这条日志条目没有被存储到未来某个任期的领导人的日志中。设大于 T 的最小任期 U 的领导人 U 没有这条日志条目。

图 9

图 9:如果 S1 (任期 T 的领导者)提交了一条新的日志在它的任期里,然后 S5 在之后的任期 U 里被选举为领导人,然后至少会有一个机器,如 S3,既拥有来自 S1 的日志,也给 S5 投票了。

  1. 在领导人 U 选举的时候一定没有那条被提交的日志条目(领导人从不会删除或者覆盖任何条目)。
  2. 领导人 T 复制这条日志条目给集群中的大多数节点,同时,领导人U 从集群中的大多数节点赢得了选票。因此,至少有一个节点(投票者、选民)同时接受了来自领导人T 的日志条目,并且给领导人U 投票了,如图 9。这个投票者是产生这个矛盾的关键。
  3. 这个投票者必须在给领导人 U 投票之前先接受了从领导人 T 发来的已经被提交的日志条目;否则他就会拒绝来自领导人 T 的追加日志请求(因为此时他的任期号会比 T 大)。
  4. 投票者在给领导人 U 投票时依然保存有这条日志条目,因为任何中间的领导人都包含该日志条目(根据上述的假设),领导人从不会删除条目,并且跟随者只有在和领导人冲突的时候才会删除条目。
  5. 投票者把自己选票投给领导人 U 时,领导人 U 的日志必须和投票者自己一样新。这就导致了两者矛盾之一。
  6. 首先,如果投票者和领导人 U 的最后一条日志的任期号相同,那么领导人 U 的日志至少和投票者一样长,所以领导人 U 的日志一定包含所有投票者的日志。这是另一处矛盾,因为投票者包含了那条已经被提交的日志条目,但是在上述的假设里,领导人 U 是不包含的。
  7. 除此之外,领导人 U 的最后一条日志的任期号就必须比投票人大了。此外,他也比 T 大,因为投票人的最后一条日志的任期号至少和 T 一样大(他包含了来自任期 T 的已提交的日志)。创建了领导人 U 最后一条日志的之前领导人一定已经包含了那条被提交的日志(根据上述假设,领导人 U 是第一个不包含该日志条目的领导人)。所以,根据日志匹配特性,领导人 U 一定也包含那条被提交的日志,这里产生矛盾。
  8. 这里完成了矛盾。因此,所有比 T 大的领导人一定包含了所有来自 T 的已经被提交的日志。
  9. 日志匹配原则保证了未来的领导人也同时会包含被间接提交的条目,例如图 8 (d) 中的索引 2。

通过领导人完全特性,我们就能证明图 3 中的状态机安全特性,即如果服务器已经在某个给定的索引值应用了日志条目到自己的状态机里,那么其他的服务器不会应用一个不一样的日志到同一个索引值上。在一个服务器应用一条日志条目到他自己的状态机中时,他的日志必须和领导人的日志,在该条目和之前的条目上相同,并且已经被提交。现在我们来考虑在任何一个服务器应用一个指定索引位置的日志的最小任期;日志完全特性保证拥有更高任期号的领导人会存储相同的日志条目,所以之后的任期里应用某个索引位置的日志条目也会是相同的值。因此,状态机安全特性是成立的。

最后,Raft 要求服务器按照日志中索引位置顺序应用日志条目,和状态机安全特性结合起来看,这就意味着所有的服务器会应用相同的日志序列集到自己的状态机中,并且是按照相同的顺序

5.5 跟随者和候选人崩溃

到目前为止,我们都只关注了领导人崩溃的情况。跟随者和候选人崩溃后的处理方式比领导人要简单的多,并且他们的处理方式是相同的。如果跟随者或者候选人崩溃了,那么后续发送给他们的 RPCs 都会失败。Raft 中处理这种失败就是简单的通过无限的重试;如果崩溃的机器重启了,那么这些 RPC 就会完整的成功。如果一个服务器在完成了一个 RPC,但是还没有响应的时候崩溃了,那么在他重新启动之后就会再次收到同样的请求。Raft 的 RPCs 都是幂等的,所以这样重试不会造成任何问题。例如一个跟随者如果收到追加日志请求但是他已经包含了这一日志,那么他就会直接忽略这个新的请求。

5.6 时间和可用性

Raft 的要求之一就是安全性不能依赖时间:整个系统不能因为某些事件运行的比预期快一点或者慢一点就产生了错误的结果。但是,可用性(系统可以及时的响应客户端)不可避免的要依赖于时间。例如,如果消息交换比服务器故障间隔时间长,候选人将没有足够长的时间来赢得选举;没有一个稳定的领导人,Raft 将无法工作。

领导人选举是 Raft 中对时间要求最为关键的方面。Raft 可以选举并维持一个稳定的领导人,只要系统满足下面的时间要求

广播时间(broadcastTime) << 选举超时时间(electionTimeout) << 平均故障间隔时间(MTBF)

在这个不等式中,广播时间指的是从一个服务器并行的发送 RPCs 给集群中的其他服务器并接收响应的平均时间;选举超时时间就是在 5.2 节中介绍的选举的超时时间限制;然后平均故障间隔时间就是对于一台服务器而言,两次故障之间的平均时间。广播时间必须比选举超时时间小一个量级,这样领导人才能够发送稳定的心跳消息来阻止跟随者开始进入选举状态;通过随机化选举超时时间的方法,这个不等式也使得选票瓜分的情况变得不可能。选举超时时间应该要比平均故障间隔时间小上几个数量级,这样整个系统才能稳定的运行。当领导人崩溃后,整个系统会大约相当于选举超时的时间里不可用;我们希望这种情况在整个系统的运行中很少出现。

广播时间和平均故障间隔时间是由系统决定的,但是选举超时时间是我们自己选择的。Raft 的 RPCs 需要接收方将信息持久化的保存到稳定存储中去,所以广播时间大约是 0.5 毫秒到 20 毫秒,取决于存储的技术。因此,选举超时时间可能需要在 10 毫秒到 500 毫秒之间。大多数的服务器的平均故障间隔时间都在几个月甚至更长,很容易满足时间的需求。

6 集群成员变化

到目前为止,我们都假设集群的配置(加入到一致性算法的服务器集合)是固定不变的。但是在实践中,偶尔是会改变集群的配置的,例如替换那些宕机的机器或者改变复制级别。尽管可以通过暂停整个集群,更新所有配置,然后重启整个集群的方式来实现,但是在更改的时候集群会不可用。另外,如果存在手工操作步骤,那么就会有操作失误的风险。为了避免这样的问题,我们决定自动化配置改变并且将其纳入到 Raft 一致性算法中来。

为了让配置修改机制能够安全,那么在转换的过程中不能够存在任何时间点使得两个领导人同时被选举成功在同一个任期里。不幸的是,任何服务器直接从旧的配置直接转换到新的配置的方案都是不安全的。一次性自动的转换所有服务器是不可能的,所以在转换期间整个集群存在划分成两个独立的大多数群体的可能性(见图 10)。

补:成员变更有脑裂的风险。

图 10

图 10:直接从一种配置转到新的配置是十分不安全的,因为各个机器可能在任何的时候进行转换。在这个例子中,集群配额从 3 台机器变成了 5 台。不幸的是,存在这样的一个时间点,两个不同的领导人在同一个任期里都可以被选举成功。一个是通过旧的配置,一个通过新的配置。

为了保证安全性,配置更改必须使用两阶段方法。目前有很多种两阶段的实现。例如,有些系统在第一阶段停掉旧的配置所以集群就不能处理客户端请求;然后在第二阶段在启用新的配置。**在 Raft 中,集群先切换到一个过渡的配置,我们称之为联结一致(joint consensus)**;一旦联结一致(joint consensus)已经被提交了,那么系统就切换到新的配置上。联结一致(joint consensus)是老配置和新配置的结合:

  • 日志条目被复制给集群中新、老配置的所有服务器。
  • 新、旧配置的服务器都可以成为领导人。
  • 达成一致(针对选举和提交)需要分别在两种配置上获得大多数的支持。

联结一致(joint consensus)允许独立的服务器在不影响安全性的前提下,在不同的时间进行配置转换过程。此外,联结一致(joint consensus)可以让集群在配置转换的过程人依然响应客户端的请求。

集群配置在复制日志中以特殊的日志条目来存储和通信;图 11 展示了配置转换的过程。当一个领导人接收到一个改变配置从 C-old 到 C-new 的请求,他会为了联结一致(joint consensus)存储配置(图中的 C-old,new),以前面描述的日志条目和副本的形式。一旦一个服务器将新的配置日志条目增加到它的日志中,他就会用这个配置来做出未来所有的决定(服务器总是使用最新的配置,无论他是否已经被提交)。这意味着领导人要使用 C-old,new 的规则来决定日志条目 C-old,new 什么时候需要被提交。如果领导人崩溃了,被选出来的新领导人可能是使用 C-old 配置也可能是 C-old,new 配置,这取决于赢得选举的候选人是否已经接收到了 C-old,new 配置。在任何情况下, C-new 配置在这一时期都不会单方面的做出决定。

一旦 C-old,new 被提交,那么无论是 C-old 还是 C-new,在没有经过他人批准的情况下都不可能做出决定,并且领导人完全特性保证了只有拥有 C-old,new 日志条目的服务器才有可能被选举为领导人。这个时候,领导人创建一条关于 C-new 配置的日志条目并复制给集群就是安全的了。再者,每个服务器在见到新的配置的时候就会立即生效。当新的配置在 C-new 的规则下被提交,旧的配置就变得无关紧要,同时不使用新的配置的服务器就可以被关闭了。如图 11,C-old 和 C-new 没有任何机会同时做出单方面的决定;这保证了安全性。

图 11

图 11:一个配置切换的时间线。虚线表示已经被创建但是还没有被提交的条目,实线表示最后被提交的日志条目。领导人首先创建了 C-old,new 的配置条目在自己的日志中,并提交到 C-old,new 中(C-old 的大多数和 C-new 的大多数)。然后他创建 C-new 条目并提交到 C-new 中的大多数。这样就不存在 C-new 和 C-old 可以同时做出决定的时间点。

在关于重新配置还有三个问题需要解决。第一个问题是,新的服务器可能初始化没有存储任何的日志条目。当这些服务器以这种状态加入到集群中,那么他们需要一段时间来更新追赶,这时还不能提交新的日志条目。为了避免这种可用性的间隔时间,Raft 在配置更新的时候使用了一种额外的阶段,在这个阶段,新的服务器以没有投票权身份加入到集群中来(领导人复制日志给他们,但是不考虑他们是大多数)。一旦新的服务器追赶上了集群中的其他机器,重新配置可以像上面描述的一样处理。

第二个问题是,集群的领导人可能不是新配置的一员。在这种情况下,领导人就会在提交了 C-new 日志之后退位(回到跟随者状态)。这意味着有这样的一段时间,领导人管理着集群,但是不包括他自己;他复制日志但是不把他自己算作是大多数之一。当 C-new 被提交时,会发生领导人过渡,因为这时是最早新的配置可以独立工作的时间点(将总是能够在 C-new 配置下选出新的领导人)。在此之前,可能只能从 C-old 中选出领导人。

第三个问题是,移除不在 C-new 中的服务器可能会扰乱集群。这些服务器将不会再接收到心跳,所以当选举超时,他们就会进行新的选举过程。他们会发送拥有新的任期号的请求投票 RPCs,这样会导致当前的领导人回退成跟随者状态。新的领导人最终会被选出来,但是被移除的服务器将会再次超时,然后这个过程会再次重复,导致整体可用性大幅降低。为了避免这个问题,当服务器确认当前领导人存在时,服务器会忽略请求投票 RPCs。特别的,当服务器在当前最小选举超时时间内收到一个请求投票 RPC,他不会更新当前的任期号或者投出选票。这不会影响正常的选举,每个服务器在开始一次选举之前,至少等待一个最小选举超时时间。然而,这有利于避免被移除的服务器扰乱:如果领导人能够发送心跳给集群,那么他就不会被更大的任期号废黜。

简述版本:

  1. 客户端向leader发送更新配置的请求C-new,
  2. 领导人生成联结一致的配置C-old,new,即一条特殊的log,
  3. 并分发给所有follower,包含old和new2部分,
  4. 节点始终使用最新的配置,即使是未提交的,
  5. 所以leader把C-old,new分发给所有follower,包含old和new两部分,
  6. follower收到C-old,new后,follower就开始使用新配置,向leader发送响应,
  7. leader收到old和new多数的响应后,提交C-old,new,
  8. leader创建一个包含C-new的log,
  9. 按照“节点始终使用最新的配置,即使是未提交的”原则,那leader只需要发送给new集群的节点了,那old咋能知道leader发送新日志了?old咋知道切换到新配置?还得给old follower发送新log,
  10. follower按收到的新log 配置进行,new集群的follower向leader发送响应,(问题:按新配置了,leader不在新配置中,为啥还得向leader发送响应,但此时也不能进行选举,会造成存在2个leader的情况),
  11. leader将新log提交,
  12. 如果leader不在新集群中,leader主动退位,新集群进行选举。

7 日志压缩

Raft 的日志在正常操作中不断的增长,但是在实际的系统中,日志不能无限制的增长。随着日志不断增长,他会占用越来越多的空间,花费越来越多的时间来重置。如果没有一定的机制去清除日志里积累的陈旧的信息,那么会带来可用性问题。

快照是最简单的压缩方法。在快照系统中,整个系统的状态都以快照的形式写入到稳定的持久化存储中,然后到那个时间点之前的日志全部丢弃。快照技术被使用在 Chubby 和 ZooKeeper 中,接下来的章节会介绍 Raft 中的快照技术。

增量压缩的方法,例如日志清理或者日志结构合并树,都是可行的。这些方法每次只对一小部分数据进行操作,这样就分散了压缩的负载压力。首先,他们先选择一个已经积累的大量已经被删除或者被覆盖对象的区域,然后重写那个区域还活跃的对象,之后释放那个区域。和简单操作整个数据集合的快照相比,需要增加复杂的机制来实现。状态机可以实现 LSM tree 使用和快照相同的接口,但是日志清除方法就需要修改 Raft 了。

图 12

图 12:一个服务器用新的快照替换了从 1 到 5 的条目,快照值存储了当前的状态。快照中包含了最后的索引位置和任期号。

图 12 展示了 Raft 中快照的基础思想。每个服务器独立的创建快照,只包括已经被提交的日志。主要的工作包括将状态机的状态写入到快照中。Raft 也包含一些少量的元数据到快照中:最后被包含索引指的是被快照取代的最后的条目在日志中的索引值(状态机最后应用的日志),最后被包含的任期指的是该条目的任期号。保留这些数据是为了支持快照后紧接着的第一个条目的追加日志请求时的一致性检查,因为这个条目需要前一日志条目的索引值和任期号。为了支持集群成员更新(第 6 节),快照中也将最后的一次配置作为最后一个条目存下来。一旦服务器完成一次快照,他就可以删除最后索引位置之前的所有日志和快照了。

尽管通常服务器都是独立的创建快照,但是领导人必须偶尔的发送快照给一些落后的跟随者。这通常发生在当领导人已经丢弃了下一条需要发送给跟随者的日志条目的时候。幸运的是这种情况不是常规操作:一个与领导人保持同步的跟随者通常都会有这个条目。然而一个运行非常缓慢的跟随者或者新加入集群的服务器(第 6 节)将不会有这个条目。这时让这个跟随者更新到最新的状态的方式就是通过网络把快照发送给他们。

安装快照 RPC

由领导人调用以将快照的分块发送给跟随者。领导者总是按顺序发送分块。

参数 解释
term 领导人的任期号
leaderId 领导人的 Id,以便于跟随者重定向请求
lastIncludedIndex 快照中包含的最后日志条目的索引值
lastIncludedTerm 快照中包含的最后日志条目的任期号
offset 分块在快照中的字节偏移量
data[] 原始数据
done 如果这是最后一个分块则为 true
结果 解释
term 当前任期号(currentTerm),便于领导人更新自己

接收者实现

  1. 如果term < currentTerm就立即回复
  2. 如果是第一个分块(offset 为 0)就创建一个新的快照
  3. 在指定偏移量写入数据
  4. 如果 done 是 false,则继续等待更多的数据
  5. 保存快照文件,丢弃具有较小索引的任何现有或部分快照
  6. 如果现存的日志条目与快照中最后包含的日志条目具有相同的索引值和任期号,则保留其后的日志条目并进行回复
  7. 丢弃整个日志
  8. 使用快照重置状态机(并加载快照的集群配置)

图 13

图 13:一个关于安装快照的简要概述。为了便于传输,快照都是被分成分块的;每个分块都给了跟随者生命的迹象,所以跟随者可以重置选举超时计时器

快照可能比较大,分片传输可以降低快照传输对log传输的影响,同时可以让跟随者重置超时定时器,避免发生选举。

在这种情况下领导人使用一种叫做安装快照的新的 RPC 来发送快照给太落后的跟随者;见图 13。当跟随者通过这种 RPC 接收到快照时,他必须自己决定对于已经存在的日志该如何处理。通常快照会包含没有在接收者日志中存在的信息。在这种情况下,跟随者丢弃其整个日志;它全部被快照取代,并且可能包含与快照冲突的未提交条目。如果接收到的快照是自己日志的前面部分(由于网络重传或者错误),那么被快照包含的条目将会被全部删除,但是快照后面的条目仍然有效,必须保留。

这种快照的方式背离了 Raft 的强领导人原则,因为跟随者可以在不知道领导人情况下创建快照。但是我们认为这种背离是值得的。领导人的存在,是为了解决在达成一致性的时候的冲突,但是在创建快照的时候,一致性已经达成,这时不存在冲突了,所以没有领导人也是可以的。数据依然是从领导人传给跟随者,只是跟随者可以重新组织他们的数据了

我们考虑过一种替代的基于领导人的快照方案,即只有领导人创建快照,然后发送给所有的跟随者。但是这样做有两个缺点。第一,发送快照会浪费网络带宽并且延缓了快照处理的时间。每个跟随者都已经拥有了所有产生快照需要的信息,而且很显然,自己从本地的状态中创建快照比通过网络接收别人发来的要经济。第二,领导人的实现会更加复杂。例如,领导人需要发送快照的同时并行的将新的日志条目发送给跟随者,这样才不会阻塞新的客户端请求。

还有两个问题影响了快照的性能。首先,服务器必须决定什么时候应该创建快照。如果快照创建的过于频繁,那么就会浪费大量的磁盘带宽和其他资源;如果创建快照频率太低,他就要承受耗尽存储容量的风险,同时也增加了从日志重建的时间。一个简单的策略就是当日志大小达到一个固定大小的时候就创建一次快照。如果这个阈值设置的显著大于期望的快照的大小,那么快照对磁盘压力的影响就会很小了。

第二个影响性能的问题就是写入快照需要花费显著的一段时间,并且我们还不希望影响到正常操作。解决方案是通过写时复制的技术,这样新的更新就可以被接收而不影响到快照。例如,具有函数式数据结构的状态机天然支持这样的功能。另外,操作系统的写时复制技术的支持(如 Linux 上的 fork)可以被用来创建完整的状态机的内存快照(我们的实现就是这样的)

8 客户端交互

这一节将介绍客户端是如何和 Raft 进行交互的,包括客户端如何发现领导人和 Raft 是如何支持线性化语义的。这些问题对于所有基于一致性的系统都存在,并且 Raft 的解决方案和其他的也差不多。

Raft 中的客户端发送所有请求给领导人。当客户端启动的时候,他会随机挑选一个服务器进行通信。如果客户端第一次挑选的服务器不是领导人,那么那个服务器会拒绝客户端的请求并且提供他最近接收到的领导人的信息(追加条目请求包含了领导人的网络地址)。如果领导人已经崩溃了,那么客户端的请求就会超时;客户端之后会再次重试随机挑选服务器的过程。

我们 Raft 的目标是要实现线性化语义(每一次操作立即执行,只执行一次,在他调用和收到回复之间)。但是,如上述,Raft 是可以执行同一条命令多次的:例如,如果领导人在提交了这条日志之后,但是在响应客户端之前崩溃了,那么客户端会和新的领导人重试这条指令,导致这条命令就被再次执行了。解决方案就是客户端对于每一条指令都赋予一个唯一的序列号。然后,状态机跟踪每条指令最新的序列号和相应的响应。如果接收到一条指令,它的序列号已经被执行了,那么就立即返回结果,而不重新执行指令

log的是有状态的,无需重复计算状态。

只读的操作可以直接处理而不需要记录日志。但是,在不增加任何限制的情况下,这么做可能会冒着返回脏数据的风险,因为领导人响应客户端请求时可能已经被新的领导人作废了,但是他还不知道。线性化的读操作必须不能返回脏数据,Raft 需要使用两个额外的措施在不使用日志的情况下保证这一点。首先,领导人必须有关于被提交日志的最新信息。领导人完全特性保证了领导人一定拥有所有已经被提交的日志条目,但是在他任期开始的时候,他可能不知道哪些是已经被提交的。为了知道这些信息,他需要在他的任期里提交一条日志条目。Raft 中通过领导人在任期开始的时候提交一个空白的没有任何操作的日志条目到日志中去来实现第二,领导人在处理只读的请求之前必须检查自己是否已经被废黜了(他自己的信息已经变脏了如果一个更新的领导人被选举出来)。Raft 中通过让领导人在响应只读请求之前,先和集群中的大多数节点交换一次心跳信息来处理这个问题。可选的,领导人可以依赖心跳机制来实现一种租约的机制,但是这种方法依赖时间来保证安全性(假设时间误差是有界的)。

9 算法实现和评估

我们已经为 RAMCloud 实现了 Raft 算法作为存储配置信息的复制状态机的一部分,并且帮助 RAMCloud 协调故障转移。这个 Raft 实现包含大约 2000 行 C++ 代码,其中不包括测试、注释和空行。这些代码是开源的。同时也有大约 25 个其他独立的第三方的基于这篇论文草稿的开源实现,针对不同的开发场景。同时,很多公司已经部署了基于 Raft 的系统。

这一节会从三个方面来评估 Raft 算法:可理解性、正确性和性能。

9.1 可理解性

为了和 Paxos 比较 Raft 算法的可理解能力,我们针对高层次的本科生和研究生,在斯坦福大学的高级操作系统课程和加州大学伯克利分校的分布式计算课程上,进行了一次学习的实验。我们分别拍了针对 Raft 和 Paxos 的视频课程,并准备了相应的小测验。Raft 的视频讲课覆盖了这篇论文的所有内容除了日志压缩;Paxos 讲课包含了足够的资料来创建一个等价的复制状态机,包括单决策 Paxos,多决策 Paxos,重新配置和一些实际系统需要的性能优化(例如领导人选举)。小测验测试一些对算法的基本理解和解释一些边角的示例。每个学生都是看完第一个视频,回答相应的测试,再看第二个视频,回答相应的测试。大约有一半的学生先进行 Paxos 部分,然后另一半先进行 Raft 部分,这是为了说明两者从第一部分的算法学习中获得的表现和经验的差异。我们计算参加人员的每一个小测验的得分来看参与者是否在 Raft 算法上更加容易理解。

我们尽可能的使得 Paxos 和 Raft 的比较更加公平。这个实验偏爱 Paxos 表现在两个方面:43 个参加者中有 15 个人在之前有一些 Paxos 的经验,并且 Paxos 的视频要长 14%。如表格 1 总结的那样,我们采取了一些措施来减轻这种潜在的偏见。我们所有的材料都可供审查。

关心 缓和偏见采取的手段 可供查看的材料
相同的讲课质量 两者使用同一个讲师。Paxos 使用的是现在很多大学里经常使用的。Paxos 会长 14%。 视频
相同的测验难度 问题以难度分组,在两个测验里成对出现。 小测验
公平评分 使用评价量规。随机顺序打分,两个测验交替进行。 评价量规(rubric)

表 1:考虑到可能会存在的偏见,对于每种情况的解决方法,和相应的材料。

参加者平均在 Raft 的测验中比 Paxos 高 4.9 分(总分 60,那么 Raft 的平均得分是 25.7,而 Paxos 是 20.8);图 14 展示了每个参与者的得分。配置t-检验(又称student‘s t-test)表明,在 95% 的可信度下,真实的 Raft 分数分布至少比 Paxos 高 2.5 分。

图 14

图 14:一个散点图表示了 43 个学生在 Paxos 和 Raft 的小测验中的成绩。在对角线之上的点表示在 Raft 获得了更高分数的学生。

我们也建立了一个线性回归模型来预测一个新的学生的测验成绩,基于以下三个因素:他们使用的是哪个小测验,之前对 Paxos 的经验,和学习算法的顺序。模型预测,对小测验的选择会产生 12.5 分的差别。这显著的高于之前的 4.9 分,因为很多学生在之前都已经有了对于 Paxos 的经验,这相当明显的帮助 Paxos,对 Raft 就没什么太大影响了。但是奇怪的是,模型预测对于先进行 Paxos 小测验的人而言,Raft的得分低了6.3分; 虽然我们不知道为什么,这似乎在统计上是有意义的。

我们同时也在测验之后调查了参与者,他们认为哪个算法更加容易实现和解释;这个的结果在图 15 上。压倒性的结果表明 Raft 算法更加容易实现和解释(41 人中的 33个)。但是,这种自己报告的结果不如参与者的成绩更加可信,并且参与者可能因为我们的 Raft 更加易于理解的假说而产生偏见。

图 15

图 15:通过一个 5 分制的问题,参与者(左边)被问哪个算法他们觉得在一个高效正确的系统里更容易实现,右边被问哪个更容易向学生解释。

关于 Raft 用户学习有一个更加详细的讨论。

9.2 正确性

在第 5 节,我们已经制定了正式的规范,和对一致性机制的安全性证明。这个正式规范使用 TLA+ 规范语言使图 2 中总结的信息非常清晰。它长约400行,并作为证明的主题。同时对于任何想实现 Raft 的人也是十分有用的。我们通过 TLA 证明系统非常机械的证明了日志完全特性。然而,这个证明依赖的约束前提还没有被机械证明(例如,我们还没有证明规范的类型安全)。而且,我们已经写了一个非正式的证明关于状态机安全性是完备的,并且是相当清晰的(大约 3500 个词)。

9.3 性能

Raft 和其他一致性算法例如 Paxos 有着差不多的性能。在性能方面,最重要的关注点是,当领导人被选举成功时,什么时候复制新的日志条目。Raft 通过很少数量的消息包(一轮从领导人到集群大多数机器的消息)就达成了这个目的。同时,进一步提升 Raft 的性能也是可行的。例如,很容易通过支持批量操作和管道操作来提高吞吐量和降低延迟。对于其他一致性算法已经提出过很多性能优化方案;其中有很多也可以应用到 Raft 中来,但是我们暂时把这个问题放到未来的工作中去。

我们使用我们自己的 Raft 实现来衡量 Raft 领导人选举的性能并且回答两个问题。首先,领导人选举的过程收敛是否快速?第二,在领导人宕机之后,最小的系统宕机时间是多久?

图 16

图 16:发现并替换一个已经崩溃的领导人的时间。上面的图考察了在选举超时时间上的随机化程度,下面的图考察了最小选举超时时间。每条线代表了 1000 次实验(除了 150-150 毫秒只试了 100 次),和相应的确定的选举超时时间。例如,150-155 毫秒意思是,选举超时时间从这个区间范围内随机选择并确定下来。这个实验在一个拥有 5 个节点的集群上进行,其广播时延大约是 15 毫秒。对于 9 个节点的集群,结果也差不多。

为了衡量领导人选举,我们反复的使一个拥有五个节点的服务器集群的领导人宕机,并计算需要多久才能发现领导人已经宕机并选出一个新的领导人(见图 16)。为了构建一个最坏的场景,在每一次的尝试里,服务器都有不同长度的日志,意味着有些候选人是没有成为领导人的资格的。另外,为了促成选票瓜分的情况,我们的测试脚本在终止领导人之前同步的发送了一次心跳广播(这大约和领导人在崩溃前复制一个新的日志给其他机器很像)。领导人均匀的随机的在心跳间隔里宕机,也就是最小选举超时时间的一半。因此,最小宕机时间大约就是最小选举超时时间的一半。

图 16 中上面的图表明,只需要在选举超时时间上使用很少的随机化就可以大大避免选票被瓜分的情况。在没有随机化的情况下,在我们的测试里,选举过程往往都需要花费超过 10 秒钟由于太多的选票瓜分的情况。仅仅增加 5 毫秒的随机化时间,就大大的改善了选举过程,现在平均的宕机时间只有 287 毫秒。增加更多的随机化时间可以大大改善最坏情况:通过增加 50 毫秒的随机化时间,最坏的完成情况(1000 次尝试)只要 513 毫秒。

图 16 中下面的图显示,通过减少选举超时时间可以减少系统的宕机时间。在选举超时时间为 12-24 毫秒的情况下,只需要平均 35 毫秒就可以选举出新的领导人(最长的一次花费了 152 毫秒)。然而,进一步降低选举超时时间的话就会违反 Raft 的时间不等式需求:在选举新领导人之前,领导人就很难发送完心跳包。这会导致没有意义的领导人改变并降低了系统整体的可用性。我们建议使用更为保守的选举超时时间,比如 150-300 毫秒;这样的时间不大可能导致没有意义的领导人改变,而且依然提供不错的可用性。

10 相关工作

已经有很多关于一致性算法的工作被发表出来,其中很多都可以归到下面的类别中:

  • Lamport 关于 Paxos 的原始描述,和尝试描述的更清晰。
  • 关于 Paxos 的更详尽的描述,补充遗漏的细节并修改算法,使得可以提供更加容易的实现基础。
  • 实现一致性算法的系统,例如 Chubby,ZooKeeper 和 Spanner。对于 Chubby 和 Spanner 的算法并没有公开发表其技术细节,尽管他们都声称是基于 Paxos 的。ZooKeeper 的算法细节已经发表,但是和 Paxos 着实有着很大的差别。
  • Paxos 可以应用的性能优化。
  • Oki 和 Liskov 的 Viewstamped Replication(VR),一种和 Paxos 差不多的替代算法。原始的算法描述和分布式传输协议耦合在了一起,但是核心的一致性算法在最近的更新里被分离了出来。VR 使用了一种基于领导人的方法,和 Raft 有很多相似之处。

Raft 和 Paxos 最大的不同之处就在于 Raft 的强领导特性:Raft 使用领导人选举作为一致性协议里必不可少的部分,并且将尽可能多的功能集中到了领导人身上。这样就可以使得算法更加容易理解。例如,在 Paxos 中,领导人选举和基本的一致性协议是正交的:领导人选举仅仅是性能优化的手段,而且不是一致性所必须要求的。但是,这样就增加了多余的机制:Paxos 同时包含了针对基本一致性要求的两阶段提交协议和针对领导人选举的独立的机制。相比较而言,Raft 就直接将领导人选举纳入到一致性算法中,并作为两阶段一致性的第一步。这样就减少了很多机制。

像 Raft 一样,VR 和 ZooKeeper 也是基于领导人的,因此他们也拥有一些 Raft 的优点。但是,Raft 比 VR 和 ZooKeeper 拥有更少的机制因为 Raft 尽可能的减少了非领导人的功能。例如,Raft 中日志条目都遵循着从领导人发送给其他人这一个方向:追加条目 RPC 是向外发送的。在 VR 中,日志条目的流动是双向的(领导人可以在选举过程中接收日志);这就导致了额外的机制和复杂性。根据 ZooKeeper 公开的资料看,它的日志条目也是双向传输的,但是它的实现更像 Raft。

和上述我们提及的其他基于一致性的日志复制算法中,Raft 的消息类型更少。例如,我们数了一下 VR 和 ZooKeeper 使用的用来基本一致性需要和成员改变的消息数(排除了日志压缩和客户端交互,因为这些都比较独立且和算法关系不大)。VR 和 ZooKeeper 都分别定义了 10 中不同的消息类型,相对的,Raft 只有 4 中消息类型(两种 RPC 请求和对应的响应)。Raft 的消息都稍微比其他算法的要信息量大,但是都很简单。另外,VR 和 ZooKeeper 都在领导人改变时传输了整个日志;所以为了能够实践中使用,额外的消息类型就很必要了。

Raft 的强领导人模型简化了整个算法,但是同时也排斥了一些性能优化的方法。例如,平等主义 Paxos (EPaxos)在某些没有领导人的情况下可以达到很高的性能。平等主义 Paxos 充分发挥了在状态机指令中的交换性。任何服务器都可以在一轮通信下就提交指令,除非其他指令同时被提出了。然而,如果指令都是并发的被提出,并且互相之间不通信沟通,那么 EPaxos 就需要额外的一轮通信。因为任何服务器都可以提交指令,所以 EPaxos 在服务器之间的负载均衡做的很好,并且很容易在 WAN 网络环境下获得很低的延迟。但是,他在 Paxos 上增加了非常明显的复杂性。

一些集群成员变换的方法已经被提出或者在其他的工作中被实现,包括 Lamport 的原始的讨论,VR 和 SMART。我们选择使用联结一致(joint consensus)的方法因为他对一致性协议的其他部分影响很小,这样我们只需要很少的一些机制就可以实现成员变换。Lamport 的基于 α 的方法之所以没有被 Raft 选择是因为它假设在没有领导人的情况下也可以达到一致性。和 VR 和 SMART 相比较,Raft 的重新配置算法可以在不限制正常请求处理的情况下进行;相比较的,VR 需要停止所有的处理过程,SMART 引入了一个和 α 类似的方法,限制了请求处理的数量。Raft 的方法同时也需要更少的额外机制来实现,和 VR、SMART 比较而言。

11 结论

算法的设计通常会把正确性,效率或者简洁作为主要的目标。尽管这些都是很有意义的目标,但是我们相信,可理解性也是一样的重要。在开发者把算法应用到实际的系统中之前,这些目标没有一个会被实现,这些都会必然的偏离发表时的形式。除非开发人员对这个算法有着很深的理解并且有着直观的感觉,否则将会对他们而言很难在实现的时候保持原有期望的特性。

在这篇论文中,我们尝试解决分布式一致性问题,但是一个广为接受但是十分令人费解的算法 Paxos 已经困扰了无数学生和开发者很多年了。我们创造了一种新的算法 Raft,显而易见的比 Paxos 要容易理解。我们同时也相信,Raft 也可以为实际的实现提供坚实的基础。把可理解性作为设计的目标改变了我们设计 Raft 的方式;随着设计的进展,我们发现自己重复使用了一些技术,比如分解问题和简化状态空间。这些技术不仅提升了 Raft 的可理解性,同时也使我们坚信其正确性。

12 感谢

这项研究必须感谢以下人员的支持:Ali Ghodsi,David Mazie`res,和伯克利 CS 294-91 课程、斯坦福 CS 240 课程的学生。Scott Klemmer 帮我们设计了用户调查,Nelson Ray 建议我们进行统计学的分析。在用户调查时使用的关于 Paxos 的幻灯片很大一部分是从 Lorenzo Alvisi 的幻灯片上借鉴过来的。特别的,非常感谢 DavidMazieres 和 Ezra Hoch,他们找到了 Raft 中一些难以发现的漏洞。许多人提供了关于这篇论文十分有用的反馈和用户调查材料,包括 Ed Bugnion,Michael Chan,Hugues Evrard,Daniel Giffin,Arjun Gopalan,Jon Howell,Vimalkumar Jeyakumar,Ankita Kejriwal,Aleksandar Kracun,Amit Levy,Joel Martin,Satoshi Matsushita,Oleg Pesok,David Ramos,Robbert van Renesse,Mendel Rosenblum,Nicolas Schiper,Deian Stefan,Andrew Stone,Ryan Stutsman,David Terei,Stephen Yang,Matei Zaharia 以及 24 位匿名的会议审查人员(可能有重复),并且特别感谢我们的领导人 Eddie Kohler。Werner Vogels 发了一条早期草稿链接的推特,给 Raft 带来了极大的关注。我们的工作由 Gigascale 系统研究中心和 Multiscale 系统研究中心给予支持,这两个研究中心由关注中心研究程序资金支持,一个是半导体研究公司的程序,由 STARnet 支持,一个半导体研究公司的程序由 MARCO 和 DARPA 支持,在国家科学基金会的 0963859 号批准,并且获得了来自 Facebook,Google,Mellanox,NEC,NetApp,SAP 和 Samsung 的支持。Diego Ongaro 由 Junglee 公司,斯坦福的毕业团体支持。

Paxos和Raft类比

用2个小故事场景,来对比Paxos和Raft,Paxos和Raft都是非拜占庭算法,所以就不要考虑故事中的作弊、欺骗等问题。

故事背景

小岛上有个村子,村里的大事必须进行提案,如果想让所有村民都执行同1个提案,必须得到大部门村民的同意,但是,它们有2种达成一致的办法,分别称为Paxos和Raft。

Paxos

Paxos是一种原始的民主,每一个人可以提案,提案人把提案发送给所有村民,村民投赞成或者反对,提案人收集投票,收到一半投票的时候,把投票结果分发给所有人,所有人执行提案。

但是,因为每个人都可以提案,假如有个村民提案了,在村民执行提案之前,收到其他村民的新提案,老提案就会被打断,被废弃掉了。如果这种情况持续,就一直没有办法达成一致了。

Raft

Raft是一种进化的民主,村民可以选举,村民都选举个村长,让村长来提案,大家来投票就行,当村长收到大多数村民,对提案的投票时,就告诉村民,某个提案通过了,大家都执行提案。

如果村长不在了,村民还可以选举一个新的村长。

无论是Paxos还是Raft,当参与投票的村民达不到半数时,都无法对提案达成一致。

参考

联盟链中动态加入组织是很正常的一件事,但联盟链不会像公链那样,可以自由加入和退出,所以,加入是要费一般功夫的。

需要做以下几件事情:

  1. 生成新组织的证书和在要加入通道中的配置
  2. 拉取要加入通道的配置,根据新组织通道中的配置和通道配置,最终生成更新通道配置的交易,pb格式
  3. 根据通道配置更新策略,让组织节点对交易签名,然后发送更新配置交易到排序节点,并打包上链
  4. 新组织利用通道创世块加入通道
  5. 可选:新组织设置本组织在通道中的锚节点

生成org3的证书和在通道中的配置

生成新组织的证书,结果在当前目录下生成crypto-config

1
cryptogen generate --config=./org3-crypto.yaml

当前目录下有configtx.yaml,里面是新组织的配置,利用configtxgen生成更新配置的json文件,放到channel的配置文件。

1
2
export FABRIC_CFG_PATH=$PWD
configtxgen -printOrg Org3MSP > ../channel-artifacts/org3.json

拉取最新通道配置

连接cli,修改环境变量,设置Orederer的CA以及通道名称,后续操作会使用

1
2
3
docker exec -it cli bash
export ORDERER_CA=/opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/ordererOrganizations/example.com/orderers/orderer.example.com/msp/tlscacerts/tlsca.example.com-cert.pem
export CHANNEL_NAME=mychannel

拉取当前的配置块,它会把cahnnel的配置,保存到二进制的protobuf文件config_block.pb,保存在当前目录,这个命令默认会拉去最新的配置:

1
peer channel fetch config config_block.pb -o orderer.example.com:7050 -c $CHANNEL_NAME --tls --cafile $ORDERER_CA

peer channel fetch config可以确保获取最新的配置区块,config_block.pb实际是个区块,上述命令日志的最后一行,会显示出当前获取的是哪个区块:

1
2
3
4
5
2019-07-30 09:40:53.047 UTC [msp] GetDefaultSigningIdentity -> DEBU 044 Obtaining default signing identity
2019-07-30 09:40:53.047 UTC [msp] GetDefaultSigningIdentity -> DEBU 045 Obtaining default signing identity
2019-07-30 09:40:53.047 UTC [msp.identity] Sign -> DEBU 046 Sign: plaintext: 0AED060A1508051A0608A5A180EA0522...3849120C0A041A02080212041A020802
2019-07-30 09:40:53.047 UTC [msp.identity] Sign -> DEBU 047 Sign: digest: 2202E1BA573DD47D5F54FA5E022F1ABD78B1DCDBB9CA10C5843A675C031CDAD8
2019-07-30 09:40:53.049 UTC [cli.common] readBlock -> INFO 048 Received block: 2

在byfn中,已经做了几次更改:

  1. 区块0:使用应用通道创世块创建应用通道
  2. 区块1:更新org1的锚节点
  3. 区块2:更新org2的锚节点

所以,通过上诉命令,得到了区块2中保持的配置。

需要把protobuf文件转换为人可读的JSON配置,configtxlator把protobuf转换为JSON,jq是一个处理JSON的工具,这里是把配置相关的数据读出来,存到config.json中,因为配置块里不仅仅包含配置:

1
configtxlator proto_decode --input config_block.pb --type common.Block | jq .data.data[0].payload.data.config > config.json

配置文件内容700+行,此处省略,具体可看这利用工具解析fabric区块:应用通道创世块

利用通道配置和org3配置生成更新通道的配置

config.json中包含了各组织的信息,已经包含了org1和org2的,现在要把org3.json加入到config.json中:

1
jq -s '.[0] * {"channel_group":{"groups":{"Application":{"groups": {"Org3MSP":.[1]}}}}}' config.json ./channel-artifacts/org3.json > modified_config.json

上面的命令类似格式化输出,-s指定了格式,.[0],.[1]代表了第1个参数和第2个参数,效果就是增加了更org1、org2平级的org3的配置,然后保存到新的配置文件modified_config.json,可以使用diff对比。

1
2
3
4
5
6
7
8
9
diff config.json modified_config.json
> },
> "Org3MSP": {
> "groups": {},
> "mod_policy": "Admins",
...
> }
> },
> "version": "0"

接下来要做3件事:

  1. config.json -> config.pb
  2. modified_config.json -> modified_config.pb
  3. modified_config.pb - config.pb -> org3_update.pb

目的是利用pb类型的配置文件的差集,生成升级配置的配置文件org3_update.pb

1
2
3
configtxlator proto_encode --input config.json --type common.Config --output config.pb
configtxlator proto_encode --input modified_config.json --type common.Config --output modified_config.pb
configtxlator compute_update --channel_id $CHANNEL_NAME --original config.pb --updated modified_config.pb --output org3_update.pb

然而,org3_update.pb也不是能直接用来升级的,但是距离升级又进了一步,再在它外面封装一层,就可以用来升级了。但是,封装这一层,需要使用json来处理,所以需要把org3_update.pb转为json格式,封装完成,再转为pb格式。

1
2
3
configtxlator proto_decode --input org3_update.pb --type common.ConfigUpdate | jq . > org3_update.json
echo '{"payload":{"header":{"channel_header":{"channel_id":"mychannel", "type":2}},"data":{"config_update":'$(cat org3_update.json)'}}}' | jq . > org3_update_in_envelope.json
configtxlator proto_encode --input org3_update_in_envelope.json --type common.Envelope --output org3_update_in_envelope.pb

org3_update_in_envelope.pb就是用来升级的。

搞了这么一大圈,怎么不用modified_config.json和config.json直接来生成org3_update.json?

对更新配置进行签名

通道的默认修改策略是MAJORITY,即需要多数的org的Admin账号进行签名。

因为cli中的环境变量就是设置的org1 admin的,所以可以直接签名:

1
peer channel signconfigtx -f org3_update_in_envelope.pb

导出org2的环境变量设置,然后重新执行签名命令:

1
2
3
4
export CORE_PEER_LOCALMSPID="Org2MSP"
export CORE_PEER_TLS_ROOTCERT_FILE=/opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org2.example.com/peers/peer0.org2.example.com/tls/ca.crt
export CORE_PEER_MSPCONFIGPATH=/opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org2.example.com/users/Admin@org2.example.com/msp
export CORE_PEER_ADDRESS=peer0.org2.example.com:9051

签名的结果在哪?就在pb文件里。

发送更新通道配置的交易

1
peer channel update -f org3_update_in_envelope.pb -c $CHANNEL_NAME -o orderer.example.com:7050 --tls --cafile $ORDERER_CA

交易提交成功:

1
2019-07-31 06:17:58.376 UTC [channelCmd] update -> INFO 04d Successfully submitted channel update

配置选举以接收区块

新加入的组织节点只能使用genesis区块启动,创世块不包含他们已经加入到通道的配置,所以它们无法利用gossip验证它们从本组织其他peer发来的区块,直到他们接收到了它们加入通道的配置交易。所以新加入的节点必须,配置它们从哪接收区块的排序服务。

如果利用的静态leader模式,使用如下配置:

1
2
CORE_PEER_GOSSIP_USELEADERELECTION=false
CORE_PEER_GOSSIP_ORGLEADER=true

否则如果是动态leader,使用如下配置:

1
2
CORE_PEER_GOSSIP_USELEADERELECTION=true
CORE_PEER_GOSSIP_ORGLEADER=false

这样新组织的peer都宣称自己是leader,加速了获取区块,等他们接收到它们自己的配置交易,就会只有1个leader peer代表本组织。

启动新组织节点

启动容器

指定组织的compose文件,启动组织的容器。这会创建3个容器,peer1.org3, peer2.org3和org3cli,其中org3cli是为org3特制的cli,里面已经设置好了org3的环境变量。

1
docker-compose -f docker-compose-org3.yaml up -d

后续都是连接到org3cli进行操作。

设置orderer的信息。

1
export ORDERER_CA=/opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/ordererOrganizations/example.com/orderers/orderer.example.com/msp/tlscacerts/tlsca.example.com-cert.pem && export CHANNEL_NAME=mychannel

拉取mychannel的创世块

peer channel fetch 0指拉去0号区块,也就是mychannel的创世区块,保存到mychannel.block。

1
peer channel fetch 0 mychannel.block -o orderer.example.com:7050 -c $CHANNEL_NAME --tls --cafile $ORDERER_CA

加入通道

1
peer channel join -b mychannel.block

设置新组织锚节点

设置锚节点也需要更新通道配置,但流程与初始的锚节点不太一致,因为新组织的配置不在configtx.yanml中,设置新组织锚节点跟添加新组织的流程一样,具体见:
Updating the Channel Config to include an Org3 Anchor Peer (Optional)

参考资料

  1. 官方文档
  2. first network项目中的eyfn,运行脚本可以看到具体流程。