// Mined broadcast loop
func (pm *ProtocolManager) minedBroadcastLoop() {
    // automatically stops when the subscription is unsubscribed
    for obj := range pm.minedBlockSub.Chan() {
        switch ev := obj.Data.(type) {
        case core.NewMinedBlockEvent:
            pm.BroadcastBlock(ev.Block, true)  // First propagate block to peers
            pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest
        }
    }
}
// BroadcastBlock will either propagate a block to a subset of its peers, or
// will only announce its availability (depending on what's requested).
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
    hash := block.Hash()
    peers := pm.peers.PeersWithoutBlock(hash)
    // If propagation is requested, send to a subset of the peers
    if propagate {
        // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
        var td *big.Int
        if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
            td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
        } else {
            log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
            return
        }
        // Send the full block to a subset of our peers
        transfer := peers[:int(math.Sqrt(float64(len(peers))))]
        for _, peer := range transfer {
            peer.AsyncSendNewBlock(block, td)
        }
        log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
        return
    }
    // Otherwise, if the block is indeed in our own chain, announce its hash to all peers
    if pm.blockchain.HasBlock(hash, block.NumberU64()) {
        for _, peer := range peers {
            peer.AsyncSendNewBlockHash(block)
        }
        log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
    }
}
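The √len(peers) fan-out above is the bandwidth trade-off at the heart of propagation: only a square-root-sized subset of peers receives the full block, everyone else just learns the hash. A minimal, self-contained sketch of that selection, with a hypothetical Peer type standing in for eth's *peer:

package main

import (
    "fmt"
    "math"
)

type Peer struct{ ID string }

// propagationSubset returns the first ⌊√n⌋ peers, mirroring
// peers[:int(math.Sqrt(float64(len(peers))))] in BroadcastBlock.
func propagationSubset(peers []Peer) []Peer {
    return peers[:int(math.Sqrt(float64(len(peers))))]
}

func main() {
    peers := make([]Peer, 25)
    for i := range peers {
        peers[i] = Peer{ID: fmt.Sprintf("peer-%d", i)}
    }
    full := propagationSubset(peers)
    fmt.Printf("full block to %d peers, hash only to %d\n", len(full), len(peers)-len(full))
    // full block to 5 peers, hash only to 20
}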
// broadcast is a write loop that multiplexes block propagations, announcements
// and transaction broadcasts into the remote peer. The goal is to have an async
// writer that does not lock up node internals.
func (p *peer) broadcast() {
    for {
        select {
        // Broadcast transactions
        case txs := <-p.queuedTxs:
            if err := p.SendTransactions(txs); err != nil {
                return
            }
            p.Log().Trace("Broadcast transactions", "count", len(txs))

        // Broadcast a complete new block
        case prop := <-p.queuedProps:
            if err := p.SendNewBlock(prop.block, prop.td); err != nil {
                return
            }
            p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td)
case msg.Code == NewBlockMsg:
    // Retrieve and decode the propagated block, then record receipt metadata
    var request newBlockData
    if err := msg.Decode(&request); err != nil {
        return errResp(ErrDecode, "%v: %v", msg, err)
    }
    request.Block.ReceivedAt = msg.ReceivedAt
    request.Block.ReceivedFrom = p
    // Mark the peer as owning the block and schedule it for import
    p.MarkBlock(request.Block.Hash())
    // Why enqueue when we already have the complete block? Because it goes
    // into the fetcher's priority queue, from which the fetcher picks
    // whichever block the chain needs at its current height.
    pm.fetcher.Enqueue(p.id, request.Block)
    // Assuming the block is importable by the peer, but possibly not yet done so,
    // calculate the head hash and TD that the peer truly must have —
    // that is, the head and difficulty up to the parent block.
    var (
        trueHead = request.Block.ParentHash()
        trueTD   = new(big.Int).Sub(request.TD, request.Block.Difficulty())
    )
    // Update the peer's total difficulty if better than the previous: if the
    // announced TD exceeds both the peer's previous TD and our local one,
    // synchronise with this peer.
    // Question: only the block's hash is used here — why not consume the block
    // directly, and why send the full block at all if it isn't used, wasting
    // bandwidth?
    // Answer: the block *was* added to the priority queue. When the fetcher's
    // loop sees that the next height it needs is already queued, it skips
    // re-requesting that block from the peer, validates the queued one, and
    // hands it to the blockchain via insertChain.
    if _, td := p.Head(); trueTD.Cmp(td) > 0 {
        p.SetHead(trueHead, trueTD)
        // Schedule a sync if above ours. Note, this will not fire a sync for a gap of
        // a single block (as the true TD is below the propagated block), however this
        // scenario should easily be covered by the fetcher.
        currentBlock := pm.blockchain.CurrentBlock()
        if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
            go pm.synchronise(p)
        }
    }
    //------------------------ handleMsg ends here
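The trueTD arithmetic above deserves a pause: the announcement carries the total difficulty *including* the new block, so subtracting the block's own difficulty gives the TD the peer must have at the block's parent. A small math/big sketch with made-up numbers:

package main

import (
    "fmt"
    "math/big"
)

func main() {
    // Hypothetical numbers: the announcement says the new block's total
    // difficulty is 1000 and the block's own difficulty is 100, so the
    // peer's head *before* this block (its parent) must sit at TD 900.
    announcedTD := big.NewInt(1000)
    blockDifficulty := big.NewInt(100)
    trueTD := new(big.Int).Sub(announcedTD, blockDifficulty) // 900

    localTD := big.NewInt(850) // our own chain head's TD
    if trueTD.Cmp(localTD) > 0 {
        fmt.Println("peer is ahead even without the new block: schedule a sync")
    }
}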
// Enqueue tries to fill gaps in the fetcher's future import queue.
// It sends to the inject channel: the caller runs in handleMsg's goroutine,
// and the fetcher's own goroutine processes whatever arrives on the channel.
func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
    op := &inject{
        origin: peer,
        block:  block,
    }
    select {
    case f.inject <- op:
        return nil
    case <-f.quit:
        return errTerminated
    }
}
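Enqueue shows a select pattern used throughout the fetcher: a blocking channel send paired with a quit channel, so callers can never deadlock against a loop that has already terminated. A self-contained sketch of the same pattern (worker, task and the errTerminated value are stand-in names):

package main

import (
    "errors"
    "fmt"
)

var errTerminated = errors.New("terminated")

type task struct{ id int }

type worker struct {
    inject chan *task
    quit   chan struct{}
}

// enqueue blocks until the loop goroutine accepts the task, but can never
// hang forever: once quit is closed, the second case unblocks the select.
func (w *worker) enqueue(t *task) error {
    select {
    case w.inject <- t:
        return nil
    case <-w.quit:
        return errTerminated
    }
}

func main() {
    w := &worker{inject: make(chan *task), quit: make(chan struct{})}
    go func() { <-w.inject }() // the loop goroutine, playing fetcher.loop

    fmt.Println(w.enqueue(&task{id: 1})) // <nil>: the loop accepted it

    stopped := &worker{inject: make(chan *task), quit: make(chan struct{})}
    close(stopped.quit) // a worker that has shut down, with no loop running
    fmt.Println(stopped.enqueue(&task{id: 2})) // terminated: no deadlock
}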
//------------------------ fetcher.loop's inject handling starts here
case op := <-f.inject:
    // A direct block insertion was requested, try and fill any pending gaps
    propBroadcastInMeter.Mark(1)
    f.enqueue(op.origin, op.block)
//------------------------ the enqueue function
// enqueue schedules a new future import operation, if the block to be imported
// has not yet been seen.
// Three fields cooperate here: queue holds the blocks to insert, ordered by
// height; queues counts the queued blocks per peer, as DoS protection; and
// queued records which blocks are already in the queue, so a duplicate insert
// is detected cheaply instead of wasting work.
func (f *Fetcher) enqueue(peer string, block *types.Block) {
    hash := block.Hash()
    // Ensure the peer isn't DOSing us
    count := f.queues[peer] + 1
    if count > blockLimit {
        log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
        propBroadcastDOSMeter.Mark(1)
        f.forgetHash(hash)
        return
    }
    // Discard any past or too distant blocks
    if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
        log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
        propBroadcastDropMeter.Mark(1)
        f.forgetHash(hash)
        return
    }
    // Schedule the block for future importing; plenty still happens between
    // entering the priority queue and actually joining the chain
    if _, ok := f.queued[hash]; !ok {
        op := &inject{
            origin: peer,
            block:  block,
        }
        f.queues[peer] = count
        f.queued[hash] = op
        f.queue.Push(op, -float32(block.NumberU64()))
        if f.queueChangeHook != nil {
            f.queueChangeHook(op.block.Hash(), true)
        }
        log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size())
    }
}
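The distance check defines the only window of heights worth queuing: anything older than the deepest usable uncle can never help, and anything too far ahead cannot be verified yet. A sketch of that window, assuming this fetcher version's constants (maxUncleDist = 7, maxQueueDist = 32):

package main

import "fmt"

const (
    maxUncleDist = 7  // maximum backward distance from the chain head
    maxQueueDist = 32 // maximum forward distance worth queuing
)

// withinWindow reports whether a propagated block is close enough to the
// local head to be worth keeping.
func withinWindow(blockNumber, chainHeight uint64) bool {
    dist := int64(blockNumber) - int64(chainHeight)
    return dist >= -maxUncleDist && dist <= maxQueueDist
}

func main() {
    fmt.Println(withinWindow(101, 100)) // true: the very next block
    fmt.Println(withinWindow(92, 100))  // false: 8 behind, older than any usable uncle
    fmt.Println(withinWindow(133, 100)) // false: 33 ahead, unverifiable for now
}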
// Loop is the main fetcher loop, checking and processing various notification
// events.
func (f *Fetcher) loop() {
    // Iterate the block fetching until a quit is requested
    fetchTimer := time.NewTimer(0)
    completeTimer := time.NewTimer(0)
    for {
        // Clean up any expired block fetches
        for hash, announce := range f.fetching {
            if time.Since(announce.time) > fetchTimeout {
                f.forgetHash(hash)
            }
        }
        // Import any queued blocks that could potentially fit
        height := f.chainHeight()
        for !f.queue.Empty() {
            op := f.queue.PopItem().(*inject)
            hash := op.block.Hash()
            if f.queueChangeHook != nil {
                f.queueChangeHook(hash, false)
            }
            // If too high up the chain or phase, continue later: this is not
            // the next block the chain needs, so push it back into the
            // priority queue and stop popping
            number := op.block.NumberU64()
            if number > height+1 {
                f.queue.Push(op, -float32(number))
                if f.queueChangeHook != nil {
                    f.queueChangeHook(hash, true)
                }
                break
            }
            // Otherwise if fresh and still unknown, try and import: the height
            // is exactly what we want and the chain doesn't have the block yet
            if number+maxUncleDist < height || f.getBlock(hash) != nil {
                f.forgetBlock(hash)
                continue
            }
            // Insert the block into the chain
            f.insert(op.origin, op.block)
        }
        // ... (rest omitted)
    }
}
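The pop loop is thus a gate in front of insertChain: each block either imports now, goes back to wait, or is dropped. Condensing those three outcomes into one function (the names are illustrative, not geth's):

package main

import "fmt"

const maxUncleDist = 7

type action string

// decide mirrors the checks in the pop loop. known means the local chain
// already has the block.
func decide(number, height uint64, known bool) action {
    switch {
    case number > height+1:
        return "requeue" // too far ahead of the head, wait in the queue
    case number+maxUncleDist < height || known:
        return "discard" // too old to matter, or a duplicate
    default:
        return "import" // exactly what the chain needs next
    }
}

func main() {
    fmt.Println(decide(101, 100, false)) // import
    fmt.Println(decide(105, 100, false)) // requeue
    fmt.Println(decide(90, 100, false))  // discard
}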
// Run the import on a new thread
log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
go func() {
    defer func() { f.done <- hash }()
    // If the parent's unknown, abort insertion
    parent := f.getBlock(block.ParentHash())
    if parent == nil {
        log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
        return
    }
    // Quickly validate the header and propagate the block if it passes
    switch err := f.verifyHeader(block.Header()); err {
    case nil:
        // All ok, quickly propagate to our peers
        propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
        go f.broadcastBlock(block, true)
    case consensus.ErrFutureBlock:
        // Weird future block, don't fail, but neither propagate
    default:
        // Something went very wrong, drop the peer
        log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
        f.dropPeer(peer)
        return
    }
    // Run the actual import and log any issues; the callback here is in fact
    // blockchain.InsertChain
    if _, err := f.insertChain(types.Blocks{block}); err != nil {
        log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
        return
    }
    // If import succeeded, broadcast the block
    propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
    go f.broadcastBlock(block, false)
    // Invoke the testing hook if needed
    if f.importedHook != nil {
        f.importedHook(block)
    }
}()
}
// handleMsg(), continued
case msg.Code == NewBlockHashesMsg:
    var announces newBlockHashesData
    if err := msg.Decode(&announces); err != nil {
        return errResp(ErrDecode, "%v: %v", msg, err)
    }
    // Mark the hashes as present at the remote node
    for _, block := range announces {
        p.MarkBlock(block.Hash)
    }
    // Schedule all the unknown hashes for retrieval: pick out the hashes our
    // local chain doesn't have and hand them to the fetcher to download
    unknown := make(newBlockHashesData, 0, len(announces))
    for _, block := range announces {
        if !pm.blockchain.HasBlock(block.Hash, block.Number) {
            unknown = append(unknown, block)
        }
    }
    for _, block := range unknown {
        pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
    }
// Notify announces the fetcher of the potential availability of a new block in
// the network: no block body yet, only the hash, the height and some metadata.
func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
    headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
    block := &announce{
        hash:        hash,
        number:      number,
        time:        time,
        origin:      peer,
        fetchHeader: headerFetcher,
        fetchBodies: bodyFetcher,
    }
    select {
    case f.notify <- block:
        return nil
    case <-f.quit:
        return errTerminated
    }
}
// Fetcher is responsible for accumulating block announcements from various peers
// and scheduling them for retrieval.
type Fetcher struct {
    // Various event channels
    notify chan *announce // receives block-hash announcements
    inject chan *inject   // receives complete blocks
    // Announce states
    announces  map[string]int              // Per peer announce counts to prevent memory exhaustion (how many header notifications each peer has sent us)
    announced  map[common.Hash][]*announce // Announced blocks, scheduled for fetching
    fetching   map[common.Hash]*announce   // Announced blocks, currently fetching (header requests in flight)
    fetched    map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval
    completing map[common.Hash]*announce   // Blocks with headers, currently body-completing
    // Block cache
    // queue is a priority queue keyed by block height; queues counts the
    // queued blocks per peer; queued marks which blocks are waiting to be
    // inserted into the chain — actual inserts pop from queue, while queued
    // exists for fast membership checks.
    queue  *prque.Prque            // Queue containing the import operations (block number sorted)
    queues map[string]int          // Per peer block counts to prevent memory exhaustion
    queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)
    // Callbacks
    getBlock       blockRetrievalFn   // Retrieves a block from the local chain
    verifyHeader   headerVerifierFn   // Checks if a block's headers have a valid proof of work (header validation, PoW included)
    broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
    chainHeight    chainHeightFn      // Retrieves the current chain's height
    insertChain    chainInsertFn      // Injects a batch of blocks into the chain
    dropPeer       peerDropFn         // Drops a peer for misbehaving
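Pushing into queue with priority -blockNumber (as enqueue does above) is what makes the queue pop the *lowest* height first: prque is a max-priority queue, so negating the height inverts the order. The same ordering reproduced with the standard library's container/heap, as a sketch:

package main

import (
    "container/heap"
    "fmt"
)

type item struct {
    number   uint64
    priority float32
}

type pq []*item

func (q pq) Len() int            { return len(q) }
func (q pq) Less(i, j int) bool  { return q[i].priority > q[j].priority } // highest priority pops first
func (q pq) Swap(i, j int)       { q[i], q[j] = q[j], q[i] }
func (q *pq) Push(x interface{}) { *q = append(*q, x.(*item)) }
func (q *pq) Pop() interface{} {
    old := *q
    it := old[len(old)-1]
    *q = old[:len(old)-1]
    return it
}

func main() {
    q := &pq{}
    for _, n := range []uint64{103, 101, 102} {
        heap.Push(q, &item{number: n, priority: -float32(n)}) // negate, as f.queue.Push does
    }
    for q.Len() > 0 {
        fmt.Println(heap.Pop(q).(*item).number) // 101, 102, 103: lowest height first
    }
}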
case <-fetchTimer.C:
    // At least one block's timer ran out; there are announcements to process
    request := make(map[string][]common.Hash)
    for hash, announces := range f.announced {
        if time.Since(announces[0].time) > arriveTimeout-gatherSlack {
            // Pick a random peer to retrieve from, reset all others: many
            // peers may have announced this block's hash, so choose one
            announce := announces[rand.Intn(len(announces))]
            f.forgetHash(hash)
            // If the block still didn't arrive, queue for fetching
            if f.getBlock(hash) == nil {
                request[announce.origin] = append(request[announce.origin], hash)
                f.fetching[hash] = announce
            }
        }
    }
    // Send out all block header requests: one goroutine per peer, each
    // issuing every request destined for that peer
    for peer, hashes := range request {
        log.Trace("Fetching scheduled headers", "peer", peer, "list", hashes)
        // Create a closure of the fetch and schedule it on a new thread
        fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
        go func() {
            if f.fetchingHook != nil {
                f.fetchingHook(hashes)
            }
            for _, hash := range hashes {
                headerFetchMeter.Mark(1)
                fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
            }
        }()
    }
    // Schedule the next fetch if blocks are still pending
    f.rescheduleFetch(fetchTimer)
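The dispatch shape here is: build a peer → hashes map, then start one goroutine per peer that issues its requests serially, because the protocol has no batch header retrieval by hash. A stripped-down sketch with the wire call simulated by a print:

package main

import (
    "fmt"
    "sync"
)

func main() {
    // Hypothetical schedule: which hashes to request from which peer.
    request := map[string][]string{
        "peer-a": {"0xaa", "0xbb"},
        "peer-b": {"0xcc"},
    }
    var wg sync.WaitGroup
    for peer, hashes := range request {
        wg.Add(1)
        go func(peer string, hashes []string) {
            defer wg.Done()
            for _, hash := range hashes {
                // stand-in for fetchHeader(hash): one wire request per header
                fmt.Printf("GetBlockHeaders(%s) -> %s\n", hash, peer)
            }
        }(peer, hashes)
    }
    wg.Wait()
}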
// RequestOneHeader is a wrapper around the header query functions to fetch a
// single header. It is used solely by the fetcher.
func (p *peer) RequestOneHeader(hash common.Hash) error {
    p.Log().Debug("Fetching single header", "hash", hash)
    return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})
}
// handleMsg()
// Block header query, collect the requested headers and reply
case msg.Code == GetBlockHeadersMsg:
    // Decode the complex header query
    var query getBlockHeadersData
    if err := msg.Decode(&query); err != nil {
        return errResp(ErrDecode, "%v: %v", msg, err)
    }
    hashMode := query.Origin.Hash != (common.Hash{})
    // Gather headers until the fetch or network limits are reached: the header
    // is known locally, fewer than the queried amount collected, the response
    // still under 2MB, and below the maximum fetchable count
    var (
        bytes   common.StorageSize
        headers []*types.Header
        unknown bool
    )
    for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
        // Retrieve the next header satisfying the query
        var origin *types.Header
        if hashMode {
            // the mode the fetcher uses
            origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
        } else {
            origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
        }
        if origin == nil {
            break
        }
        number := origin.Number.Uint64()
        headers = append(headers, origin)
        bytes += estHeaderRlpSize
        // Advance to the next header of the query; the advance strategy
        // depends on the query mode
        switch {
        case query.Origin.Hash != (common.Hash{}) && query.Reverse:
            // ...
        }
    }
    return p.SendBlockHeaders(headers)
// handleMsg()
case msg.Code == BlockHeadersMsg:
    // A batch of headers arrived to one of our previous requests
    var headers []*types.Header
    if err := msg.Decode(&headers); err != nil {
        return errResp(ErrDecode, "msg %v: %v", msg, err)
    }
    // If no headers were received, but we're expecting a DAO fork check, maybe it's that
    if len(headers) == 0 && p.forkDrop != nil {
        // Possibly an empty reply to the fork header checks, sanity check the TDs
        verifyDAO := true
        // If we already have a DAO header, we can check the peer's TD against it. If
        // the peer's ahead of this, it too must have a reply to the DAO check
        if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
            if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
                verifyDAO = false
            }
        }
        // If we're seemingly on the same chain, disable the drop timer
        if verifyDAO {
            p.Log().Debug("Seems to be on the same side of the DAO fork")
            p.forkDrop.Stop()
            p.forkDrop = nil
            return nil
        }
    }
    // Filter out any explicitly requested headers, deliver the rest to the downloader:
    // strip the fetcher-requested headers first, the remainder goes to the downloader
    filter := len(headers) == 1
    if filter {
        // If it's a potential DAO fork check, validate against the rules
        if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
            // Disable the fork drop timer
            p.forkDrop.Stop()
            p.forkDrop = nil
            // Validate the header and either drop the peer or continue
            if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
                p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
                return err
            }
            p.Log().Debug("Verified to be on the same side of the DAO fork")
            return nil
        }
        // Irrelevant of the fork checks, send the header to the fetcher just in case
        headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
    }
    // The headers that survive the filter go to the downloader
    if len(headers) > 0 || !filter {
        err := pm.downloader.DeliverHeaders(p.id, headers)
        if err != nil {
            log.Debug("Failed to deliver headers", "err", err)
        }
    }
// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
// returning those that should be handled differently.
func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
    log.Trace("Filtering headers", "peer", peer, "headers", len(headers))
    // Send the filter channel to the fetcher; the task will travel over it
    filter := make(chan *headerFilterTask)
    select {
    // Hand the task channel to the fetcher's loop
    case f.headerFilter <- filter:
    case <-f.quit:
        return nil
    }
    // Request the filtering of the header list: build the filter task and
    // send it over the task channel
    select {
    case filter <- &headerFilterTask{peer: peer, headers: headers, time: time}:
    case <-f.quit:
        return nil
    }
    // Retrieve the headers remaining after filtering, read back from the
    // same task channel
    select {
    case task := <-filter:
        return task.headers
    case <-f.quit:
        return nil
    }
}
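This is a channel-of-channels handshake: the caller first passes a private reply channel into the loop, then sends the task over it, then reads the filtered result back from the very same channel. A runnable miniature of the round trip (the types are simplified stand-ins):

package main

import "fmt"

type headerFilterTask struct{ headers []string }

func main() {
    headerFilter := make(chan chan *headerFilterTask)

    // The loop side, playing fetcher.loop(): receive a private reply
    // channel, read the task from it, filter, and send the rest back.
    go func() {
        filter := <-headerFilter
        task := <-filter
        task.headers = task.headers[1:] // pretend the first header was fetcher-requested
        filter <- task
    }()

    // The caller side, playing FilterHeaders(): three synchronous steps
    // over the same unbuffered channel.
    filter := make(chan *headerFilterTask)
    headerFilter <- filter                                           // 1. hand over the reply channel
    filter <- &headerFilterTask{headers: []string{"h1", "h2", "h3"}} // 2. send the task
    fmt.Println((<-filter).headers)                                  // 3. read back: [h2 h3]
}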
// fetcher.loop()
case filter := <-f.headerFilter:
    // Headers arrived from a remote peer. Extract those that were explicitly
    // requested by the fetcher, and return everything else so it's delivered
    // to other parts of the system.
    // Receive the filter task from the task channel first
    var task *headerFilterTask
    select {
    case task = <-filter:
    case <-f.quit:
        return
    }
    headerFilterInMeter.Mark(int64(len(task.headers)))
    // Split the batch of headers into unknown ones (to return to the caller),
    // known incomplete ones (requiring body retrievals) and completed blocks.
    // unknown holds headers the fetcher never asked for; complete holds blocks
    // without transactions or uncles, whose header alone suffices; incomplete
    // holds blocks that still need their transactions and uncles fetched.
    unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
    // Iterate over all the received headers
    for _, header := range task.headers {
        hash := header.Hash()
        // Filter fetcher-requested headers from other synchronisation algorithms:
        // the hash must be in fetching, announced by exactly this peer, and not
        // yet in fetched, completing or queued
        if announce := f.fetching[hash]; announce != nil && announce.origin == task.peer && f.fetched[hash] == nil && f.completing[hash] == nil && f.queued[hash] == nil {
            // If the delivered header does not match the promised number, drop the
            // announcer: a height mismatch means the peer is misbehaving
            if header.Number.Uint64() != announce.number {
                log.Trace("Invalid block number fetched", "peer", announce.origin, "hash", header.Hash(), "announced", announce.number, "provided", header.Number)
                f.dropPeer(announce.origin)
                f.forgetHash(hash)
                continue
            }
            // Only keep if not imported by other means, i.e. the local chain
            // doesn't have this block yet
            if f.getBlock(hash) == nil {
                announce.header = header
                announce.time = task.time
                // If the block is empty (header only), short circuit into the final import queue
                if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
                    log.Trace("Block empty, skipping body retrieval", "peer", announce.origin, "number", header.Number, "hash", header.Hash())
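Both comparisons reduce to fixed, well-known constants: the root of an empty trie and the hash of an RLP-encoded empty list, so the test is effectively against two magic values. A sketch of the same short-circuit, assuming the hex values of geth's EmptyRootHash and EmptyUncleHash:

package main

import "fmt"

// Assumed from geth's core/types: the root of an empty trie and the hash
// of an RLP-encoded empty list.
const (
    emptyTxRoot    = "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"
    emptyUncleHash = "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347"
)

// bodyNeeded reports whether a header still requires a body fetch; when
// both fields equal the empty constants, the header alone is the block.
func bodyNeeded(txRoot, uncleHash string) bool {
    return txRoot != emptyTxRoot || uncleHash != emptyUncleHash
}

func main() {
    fmt.Println(bodyNeeded(emptyTxRoot, emptyUncleHash)) // false: empty block, import directly
    fmt.Println(bodyNeeded("0x1234", emptyUncleHash))    // true: has transactions, fetch the body
}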
// fetcher.loop()
case <-completeTimer.C:
    // At least one header's timer ran out, retrieve everything
    request := make(map[string][]common.Hash)
    // Iterate over every announce whose body is still pending
    for hash, announces := range f.fetched {
        // Pick a random peer to retrieve from, reset all others:
        // many peers may have announced this block already
        announce := announces[rand.Intn(len(announces))]
        f.forgetHash(hash)
        // If the block still didn't arrive, queue for completion:
        // record it in completing and create the request
        if f.getBlock(hash) == nil {
            request[announce.origin] = append(request[announce.origin], hash)
            f.completing[hash] = announce
        }
    }
    // Send out all block body requests, again one goroutine per peer
    for peer, hashes := range request {
        log.Trace("Fetching scheduled bodies", "peer", peer, "list", hashes)
        // Create a closure of the fetch and schedule it on a new thread
        if f.completingHook != nil {
            f.completingHook(hashes)
        }
        bodyFetchMeter.Mark(int64(len(hashes)))
        go f.completing[hashes[0]].fetchBodies(hashes)
    }
    // Schedule the next fetch if blocks are still pending
    f.rescheduleComplete(completeTimer)
// handleMsg()
case msg.Code == BlockBodiesMsg:
    // A batch of block bodies arrived to one of our previous requests
    var request blockBodiesData
    if err := msg.Decode(&request); err != nil {
        return errResp(ErrDecode, "msg %v: %v", msg, err)
    }
    // Deliver them all to the downloader for queuing
    transactions := make([][]*types.Transaction, len(request))
    uncles := make([][]*types.Header, len(request))
    for i, body := range request {
        transactions[i] = body.Transactions
        uncles[i] = body.Uncles
    }
    // Filter out any explicitly requested bodies, deliver the rest to the downloader:
    // let the fetcher strip the bodies it requested, the remainder goes to the downloader
    filter := len(transactions) > 0 || len(uncles) > 0
    if filter {
        transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())
    }
// FilterBodies extracts all the block bodies that were explicitly requested by
// the fetcher, returning those that should be handled differently.
// The flow mirrors the header filtering above.
func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
    log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))
    // Send the filter channel to the fetcher
    filter := make(chan *bodyFilterTask)
    select {
    case f.bodyFilter <- filter:
    case <-f.quit:
        return nil, nil
    }
    // Request the filtering of the body list
    select {
    case filter <- &bodyFilterTask{peer: peer, transactions: transactions, uncles: uncles, time: time}:
    case <-f.quit:
        return nil, nil
    }
    // Retrieve the bodies remaining after filtering
    select {
    case task := <-filter:
        return task.transactions, task.uncles
    case <-f.quit:
        return nil, nil
    }
}
case filter := <-f.bodyFilter:
    // Block bodies arrived, extract any explicitly requested blocks, return the rest
    var task *bodyFilterTask
    select {
    case task = <-filter:
    case <-f.quit:
        return
    }
    bodyFilterInMeter.Mark(int64(len(task.transactions)))
    blocks := []*types.Block{}
    // task holds each received body's transaction list and uncle list; walk
    // both, hash them, and check whether this fetcher asked for the body
    for i := 0; i < len(task.transactions) && i < len(task.uncles); i++ {
        // Match up a body to any possible completion request
        matched := false
        // Walk every pending completion request: a body carries no block hash,
        // so the only way to tell which block the txs and uncles belong to is to
        // try them against every request. The set is usually small, so the scan
        // costs little.
        for hash, announce := range f.completing {
            if f.queued[hash] == nil {
                // Compare the body's tx hash and uncle hash with the header we
                // requested; a match means the fetcher asked for this body
                txnHash := types.DeriveSha(types.Transactions(task.transactions[i]))
                uncleHash := types.CalcUncleHash(task.uncles[i])
                if txnHash == announce.header.TxHash && uncleHash == announce.header.UncleHash && announce.origin == task.peer {
                    // Mark the body matched, reassemble if still unknown
                    matched = true
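Because a block body travels without any block identifier, matching works purely by recomputing the content hashes and scanning the pending requests. A self-contained sketch of that matching, with sha256 standing in for DeriveSha and CalcUncleHash:

package main

import (
    "crypto/sha256"
    "fmt"
)

// pendingHeader mimics an entry of f.completing: the header's expected
// content hashes plus the peer the request went to.
type pendingHeader struct {
    txHash    [32]byte
    uncleHash [32]byte
    origin    string
}

// match recomputes the body's hashes and scans every pending request,
// exactly because the body itself names no block.
func match(pending map[string]pendingHeader, txs, uncles []byte, from string) (string, bool) {
    txHash, uncleHash := sha256.Sum256(txs), sha256.Sum256(uncles)
    for blockHash, p := range pending {
        if p.txHash == txHash && p.uncleHash == uncleHash && p.origin == from {
            return blockHash, true
        }
    }
    return "", false
}

func main() {
    txs, uncles := []byte("txs-of-block-100"), []byte("uncles-of-block-100")
    pending := map[string]pendingHeader{
        "0xabc": {sha256.Sum256(txs), sha256.Sum256(uncles), "peer-a"},
    }
    fmt.Println(match(pending, txs, uncles, "peer-a")) // 0xabc true
}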