区块链教程Fabric1.0源代码分析Peer peer node start命令实现
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了区块链教程Fabric1.0源代码分析Peer peer node start命令实现,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含17292字,纯文字阅读大概需要25分钟。
内容图文
区块链教程Fabric1.0源代码分析Peer peer node start命令实现,2018年下半年,区块链行业正逐渐褪去发展之初的浮躁、回归理性,表面上看相关人才需求与身价似乎正在回落。但事实上,正是初期泡沫的渐退,让人们更多的关注点放在了区块链真正的技术之上。
Fabric 1.0源代码笔记 之 Peer #peer node start命令实现
1、peer node加载子命令start和status
peer node加载子命令start和status,代码如下:
func Cmd() *cobra.Command {
????nodeCmd.AddCommand(startCmd()) //加载子命令start
????nodeCmd.AddCommand(statusCmd()) //加载子命令status
????return nodeCmd
}
var nodeCmd = &cobra.Command{
????Use: nodeFuncName,
????Short: fmt.Sprint(shortDes),
????Long: fmt.Sprint(longDes),
}
//代码在peer/node/node.go
startCmd()代码如下:
其中serve(args)为peer node start的实现代码,比较复杂,本文将重点讲解。
另statusCmd()代码与startCmd()相近,暂略。
func startCmd() *cobra.Command {
????flags := nodeStartCmd.Flags()
????flags.BoolVarP(&chaincodeDevMode, "peer-chaincodedev", "", false, "Whether peer in chaincode development mode")
????flags.BoolVarP(&peerDefaultChain, "peer-defaultchain", "", false, "Whether to start peer with chain testchainid")
????flags.StringVarP(&orderingEndpoint, "orderer", "o", "orderer:7050", "Ordering service endpoint") //orderer
????return nodeStartCmd
}
var nodeStartCmd = &cobra.Command{
????Use: "start",
????Short: "Starts the node.",
????Long: `Starts a node that interacts with the network.`,
????RunE: func(cmd *cobra.Command, args []string) error {
????????return serve(args) //serve(args)为peer node start的实现代码
????},
}
//代码在peer/node/start.go
注:如下内容均为serve(args)的代码,即peer node start命令执行流程。
2、初始化Ledger(账本)
初始化账本,即如下一条代码:
ledgermgmt.Initialize()
//代码在peer/node/start.go
ledgermgmt.Initialize()代码展开如下:
func initialize() {
????openedLedgers = make(map[string]ledger.PeerLedger) //openedLedgers为全局变量,存储目前使用的账本列表
????provider, err := kvledger.NewProvider() //创建账本Provider实例
????ledgerProvider = provider
}
//代码在core/ledger/ledgermgmt/ledger_mgmt.go
kvledger.NewProvider()代码如下:
func NewProvider() (ledger.PeerLedgerProvider, error) {
????idStore := openIDStore(ledgerconfig.GetLedgerProviderPath()) //打开idStore
????//创建并初始化blkstorage
????attrsToIndex := []blkstorage.IndexableAttr{
????????blkstorage.IndexableAttrBlockHash,
????????blkstorage.IndexableAttrBlockNum,
????????blkstorage.IndexableAttrTxID,
????????blkstorage.IndexableAttrBlockNumTranNum,
????????blkstorage.IndexableAttrBlockTxID,
????????blkstorage.IndexableAttrTxValidationCode,
????}
????indexConfig := &blkstorage.IndexConfig{AttrsToIndex: attrsToIndex}
????blockStoreProvider := fsblkstorage.NewProvider(
????????fsblkstorage.NewConf(ledgerconfig.GetBlockStorePath(), ledgerconfig.GetMaxBlockfileSize()),
????????indexConfig)
????//创建并初始化statedb
????var vdbProvider statedb.VersionedDBProvider
????if !ledgerconfig.IsCouchDBEnabled() {
????????vdbProvider = stateleveldb.NewVersionedDBProvider()
????} else {
????????vdbProvider, err = statecouchdb.NewVersionedDBProvider()
????}
????//创建并初始化historydb
????var historydbProvider historydb.HistoryDBProvider
????historydbProvider = historyleveldb.NewHistoryDBProvider()
????//构造Provider
????provider := &Provider{idStore, blockStoreProvider, vdbProvider, historydbProvider}
????provider.recoverUnderConstructionLedger()
????return provider, nil
}
//代码在core/ledger/kvledger/kv_ledger_provider.go
Ledger更详细内容,参考:Fabric 1.0源代码笔记 之 Ledger(账本)
3、配置及创建PeerServer、创建EventHubServer(事件中心服务器)
//初始化全局变量localAddress和peerEndpoint
err := peer.CacheConfiguration()
peerEndpoint, err := peer.GetPeerEndpoint() //获取peerEndpoint
listenAddr := viper.GetString("peer.listenAddress") //PeerServer监听地址
secureConfig, err := peer.GetSecureConfig() //获取PeerServer安全配置,是否启用TLS、公钥、私钥、根证书
//以监听地址和安全配置,创建Peer GRPC Server
peerServer, err := peer.CreatePeerServer(listenAddr, secureConfig)
//创建EventHubServer(事件中心服务器)
ehubGrpcServer, err := createEventHubServer(secureConfig)
//代码在peer/node/start.go
func createEventHubServer(secureConfig comm.SecureServerConfig) (comm.GRPCServer, error)代码如下:
var lis net.Listener
var err error
lis, err = net.Listen("tcp", viper.GetString("peer.events.address")) //创建Listen
grpcServer, err := comm.NewGRPCServerFromListener(lis, secureConfig) //从Listen创建GRPCServer
ehServer := producer.NewEventsServer(
????uint(viper.GetInt("peer.events.buffersize")), //最大进行缓冲的消息数
????viper.GetDuration("peer.events.timeout")) //缓冲已满的情况下,往缓冲中发送消息的超时
pb.RegisterEventsServer(grpcServer.Server(), ehServer) //EventHubServer注册至grpcServer
return grpcServer, nil
//代码在peer/node/start.go
pb.RegisterEventsServer(grpcServer.Server(), ehServer)代码如下:
func RegisterEventsServer(s *grpc.Server, srv EventsServer) {
????s.RegisterService(&_Events_serviceDesc, srv)
}
//代码在protos/peer/events.pb.go
events(事件服务)更详细内容,参考:Fabric 1.0源代码笔记 之 events(事件服务)
4、创建并启动Chaincode Server,并注册系统链码
代码如下:
ccprovider.EnableCCInfoCache() //ccInfoCacheEnabled = true
//创建Chaincode Server
//如果peer.chaincodeListenAddress,没有定义或者定义和peerListenAddress相同,均直接使用peerServer
//否则另行创建NewGRPCServer用于Chaincode Server
ccSrv, ccEpFunc := createChaincodeServer(peerServer, listenAddr)
//Chaincode service注册到grpcServer,并注册系统链码
registerChaincodeSupport(ccSrv.Server(), ccEpFunc)
go ccSrv.Start() //启动grpcServer
//代码在peer/node/start.go
ccSrv, ccEpFunc := createChaincodeServer(peerServer, listenAddr)代码如下:创建Chaincode Server。
func createChaincodeServer(peerServer comm.GRPCServer, peerListenAddress string) (comm.GRPCServer, ccEndpointFunc) {
????//peer.chaincodeListenAddress,链码容器连接时的监听地址
????cclistenAddress := viper.GetString("peer.chaincodeListenAddress")
????var srv comm.GRPCServer
????var ccEpFunc ccEndpointFunc
????
????//如果peer.chaincodeListenAddress,没有定义或者定义和peerListenAddress相同,均直接使用peerServer
????//否则另行创建NewGRPCServer用于Chaincode Server
????if cclistenAddress == "" {
????????ccEpFunc = peer.GetPeerEndpoint
????????srv = peerServer
????} else if cclistenAddress == peerListenAddress {
????????ccEpFunc = peer.GetPeerEndpoint
????????srv = peerServer
????} else {
????????config, err := peer.GetSecureConfig()
????????srv, err = comm.NewGRPCServer(cclistenAddress, config)
????????ccEpFunc = getChaincodeAddressEndpoint
????}
????return srv, ccEpFunc
}
//代码在peer/node/start.go
registerChaincodeSupport(ccSrv.Server(), ccEpFunc)代码如下:Chaincode service注册到grpcServer。
func registerChaincodeSupport(grpcServer *grpc.Server, ccEpFunc ccEndpointFunc) {
????userRunsCC := chaincode.IsDevMode() //是否开发模式
????ccStartupTimeout := viper.GetDuration("chaincode.startuptimeout") //启动链码容器的超时
????if ccStartupTimeout < time.Duration(5)*time.Second { //至少5秒
????????ccStartupTimeout = time.Duration(5) * time.Second
????} else {
????}
????//构造ChaincodeSupport
????ccSrv := chaincode.NewChaincodeSupport(ccEpFunc, userRunsCC, ccStartupTimeout)
????scc.RegisterSysCCs() //注册系统链码
????pb.RegisterChaincodeSupportServer(grpcServer, ccSrv) //service注册到grpcServer
}
//代码在peer/node/start.go
ccSrv := chaincode.NewChaincodeSupport(ccEpFunc, userRunsCC, ccStartupTimeout)代码如下:构造ChaincodeSupport。
var theChaincodeSupport *ChaincodeSupport
func NewChaincodeSupport(getCCEndpoint func() (*pb.PeerEndpoint, error), userrunsCC bool, ccstartuptimeout time.Duration) *ChaincodeSupport {
????//即/var/hyperledger/production/chaincodes
????ccprovider.SetChaincodesPath(config.GetPath("peer.fileSystemPath") + string(filepath.Separator) + "chaincodes")
????pnid := viper.GetString("peer.networkId") //网络ID
????pid := viper.GetString("peer.id") //节点ID
????//构造ChaincodeSupport
????theChaincodeSupport = &ChaincodeSupport{runningChaincodes: &runningChaincodes{chaincodeMap: make(map[string]*chaincodeRTEnv), launchStarted: make(map[string]bool)}, peerNetworkID:
pnid, peerID: pid}
????ccEndpoint, err := getCCEndpoint() //此处传入ccEpFunc
????if err != nil {
????????theChaincodeSupport.peerAddress = viper.GetString("chaincode.peerAddress")
????} else {
????????theChaincodeSupport.peerAddress = ccEndpoint.Address
????}
????if theChaincodeSupport.peerAddress == "" {
????????theChaincodeSupport.peerAddress = peerAddressDefault
????}
????theChaincodeSupport.userRunsCC = userrunsCC //是否开发模式
????theChaincodeSupport.ccStartupTimeout = ccstartuptimeout //启动链码容器的超时
????theChaincodeSupport.peerTLS = viper.GetBool("peer.tls.enabled") //是否启用TLS
????if theChaincodeSupport.peerTLS {
????????theChaincodeSupport.peerTLSCertFile = config.GetPath("peer.tls.cert.file")
????????theChaincodeSupport.peerTLSKeyFile = config.GetPath("peer.tls.key.file")
????????theChaincodeSupport.peerTLSSvrHostOrd = viper.GetString("peer.tls.serverhostoverride")
????}
????kadef := 0
????// Peer和链码之间的心跳超时,小于或等于0意味着关闭
????if ka := viper.GetString("chaincode.keepalive"); ka == "" {
????????theChaincodeSupport.keepalive = time.Duration(kadef) * time.Second //0
????} else {
????????t, terr := strconv.Atoi(ka)
????????if terr != nil {
????????????t = kadef //0
????????} else if t <= 0 {
????????????t = kadef //0
????????}
????????theChaincodeSupport.keepalive = time.Duration(t) * time.Second //非0
????}
????execto := time.Duration(30) * time.Second
????//invoke和initialize命令执行超时
????if eto := viper.GetDuration("chaincode.executetimeout"); eto <= time.Duration(1)*time.Second {
????????//小于1秒时,默认30秒
????} else {
????????execto = eto
????}
????theChaincodeSupport.executetimeout = execto
????viper.SetEnvPrefix("CORE")
????viper.AutomaticEnv()
????replacer := strings.NewReplacer(".", "_")
????viper.SetEnvKeyReplacer(replacer)
????theChaincodeSupport.chaincodeLogLevel = getLogLevelFromViper("level")
????theChaincodeSupport.shimLogLevel = getLogLevelFromViper("shim")
????theChaincodeSupport.logFormat = viper.GetString("chaincode.logging.format")
????return theChaincodeSupport
}
//代码在core/chaincode/chaincode_support.go
scc.RegisterSysCCs()代码如下:注册系统链码。
func RegisterSysCCs() {
????//cscc、lscc、escc、vscc、qscc
????for _, sysCC := range systemChaincodes {
????????RegisterSysCC(sysCC)
????}
}
代码在core/scc/importsysccs.go
5、注册Admin server和Endorser server
代码如下:
//s.RegisterService(&_Admin_serviceDesc, srv)
//var _Admin_serviceDesc = grpc.ServiceDesc{...}
//core.NewAdminServer()构造ServerAdmin结构体,ServerAdmin结构体实现type AdminServer interface接口
pb.RegisterAdminServer(peerServer.Server(), core.NewAdminServer())
//构造结构体Endorser,Endorser结构体实现type EndorserServer interface接口
serverEndorser := endorser.NewEndorserServer()
//s.RegisterService(&_Endorser_serviceDesc, srv)
//var _Endorser_serviceDesc = grpc.ServiceDesc{...}
pb.RegisterEndorserServer(peerServer.Server(), serverEndorser)
//代码在peer/node/start.go
附type AdminServer interface接口定义:
type AdminServer interface {
????GetStatus(context.Context, *google_protobuf.Empty) (*ServerStatus, error)
????StartServer(context.Context, *google_protobuf.Empty) (*ServerStatus, error)
????GetModuleLogLevel(context.Context, *LogLevelRequest) (*LogLevelResponse, error)
????SetModuleLogLevel(context.Context, *LogLevelRequest) (*LogLevelResponse, error)
????RevertLogLevels(context.Context, *google_protobuf.Empty) (*google_protobuf.Empty, error)
}
//代码在protos/peer/admin.pb.go
附type EndorserServer interface接口定义:
type EndorserServer interface {
????ProcessProposal(context.Context, *SignedProposal) (*ProposalResponse, error)
}
//代码在protos/peer/peer.pb.go
6、初始化Gossip服务
bootstrap := viper.GetStringSlice("peer.gossip.bootstrap") //启动节点后gossip连接的初始节点
serializedIdentity, err := mgmt.GetLocalSigningIdentityOrPanic().Serialize() //获取签名身份
messageCryptoService := peergossip.NewMCS( //构造mspMessageCryptoService(消息加密服务)
????peer.NewChannelPolicyManagerGetter(), //构造type channelPolicyManagerGetter struct{}
????localmsp.NewSigner(), //构造type mspSigner struct {}
????mgmt.NewDeserializersManager()) //构造type mspDeserializersManager struct{}
secAdv := peergossip.NewSecurityAdvisor(mgmt.NewDeserializersManager()) //构造mspSecurityAdvisor(安全顾问)
secureDialOpts := func() []grpc.DialOption {
????var dialOpts []grpc.DialOption
????dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(comm.MaxRecvMsgSize()), //MaxRecvMsgSize
????????grpc.MaxCallSendMsgSize(comm.MaxSendMsgSize()))) //MaxSendMsgSize
????dialOpts = append(dialOpts, comm.ClientKeepaliveOptions()...) //ClientKeepaliveOptions
????????
????if comm.TLSEnabled() {
????????tlsCert := peerServer.ServerCertificate()
????????dialOpts = append(dialOpts, grpc.WithTransportCredentials(comm.GetCASupport().GetPeerCredentials(tlsCert)))
????} else {
????????dialOpts = append(dialOpts, grpc.WithInsecure())
????}
????return dialOpts
}
err = service.InitGossipService(serializedIdentity, peerEndpoint.Address, peerServer.Server(),
????messageCryptoService, secAdv, secureDialOpts, bootstrap...) //构造gossipServiceImpl
defer service.GetGossipService().Stop()
//代码在peer/node/start.go
Gossip更详细内容参考:Fabric 1.0源代码笔记 之 gossip(流言算法)
7、初始化、部署并执行系统链码(scc)
代码如下:
initSysCCs() //初始化系统链码,调用scc.DeploySysCCs("")
peer.Initialize(func(cid string) { //初始化所有链
????scc.DeploySysCCs(cid) //按chain id部署并运行系统链码
})
//代码在peer/node/start.go
func Initialize(init func(string))代码如下:
func Initialize(init func(string)) {
????chainInitializer = init
????var cb *common.Block
????var ledger ledger.PeerLedger
????ledgermgmt.Initialize()
????ledgerIds, err := ledgermgmt.GetLedgerIDs()
????for _, cid := range ledgerIds {
????????ledger, err = ledgermgmt.OpenLedger(cid)
????????cb, err = getCurrConfigBlockFromLedger(ledger) //获取最新的配置块
????????err = createChain(cid, ledger, cb)
????????InitChain(cid) //即调用chainInitializer(cid),即scc.DeploySysCCs(cid)
????}
}
//代码在core/peer/peer.go
createChain(cid, ledger, cb)代码如下:
func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block) error {
????envelopeConfig, err := utils.ExtractEnvelope(cb, 0) //获取配置交易
????configtxInitializer := configtx.NewInitializer() //构造initializer
????gossipEventer := service.GetGossipService().NewConfigEventer() //获取gossipServiceInstance,并构造configEventer
????gossipCallbackWrapper := func(cm configtxapi.Manager) {
????????ac, ok := configtxInitializer.ApplicationConfig()
????????gossipEventer.ProcessConfigUpdate(&chainSupport{
????????????Manager: cm,
????????????Application: ac,
????????})
????????//验证可疑节点身份,并关闭无效链接
????????service.GetGossipService().SuspectPeers(func(identity api.PeerIdentityType) bool {
????????????return true
????????})
????}
????trustedRootsCallbackWrapper := func(cm configtxapi.Manager) {
????????updateTrustedRoots(cm)
????}
????configtxManager, err := configtx.NewManagerImpl(
????????envelopeConfig,
????????configtxInitializer,
????????[]func(cm configtxapi.Manager){gossipCallbackWrapper, trustedRootsCallbackWrapper},
????)
????mspmgmt.XXXSetMSPManager(cid, configtxManager.MSPManager())
????ac, ok := configtxInitializer.ApplicationConfig()
????cs := &chainSupport{
????????Manager: configtxManager,
????????Application: ac, // TODO, refactor as this is accessible through Manager
????????ledger: ledger,
????}
????c := committer.NewLedgerCommitterReactive(ledger, txvalidator.NewTxValidator(cs), func(block *common.Block) error {
????????chainID, err := utils.GetChainIDFromBlock(block)
????????if err != nil {
????????????return err
????????}
????????return SetCurrConfigBlock(block, chainID)
????})
????ordererAddresses := configtxManager.ChannelConfig().OrdererAddresses()
????service.GetGossipService().InitializeChannel(cs.ChainID(), c, ordererAddresses)
????chains.Lock()
????defer chains.Unlock()
????chains.list[cid] = &chain{
????????cs: cs,
????????cb: cb,
????????committer: c,
????}
????return nil
}
//代码在core/peer/peer.go
scc更详细内容参考:Fabric 1.0源代码笔记 之 scc(系统链码)
8、启动peerServer和ehubGrpcServer,并监控系统信号,以及启动Go自带的profiling支持进行调试
代码如下:
serve := make(chan error)
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
????sig := <-sigs //接收系统信号
????serve <- nil
}
go func() {
????var grpcErr error
????grpcErr = peerServer.Start() //启动peerServer
????serve <- grpcErr
}
err := writePid(config.GetPath("peer.fileSystemPath")+"/peer.pid", os.Getpid()) //写入pid
go ehubGrpcServer.Start() //启动ehubGrpcServer
if viper.GetBool("peer.profile.enabled") {
????go func() { //启动Go自带的profiling支持进行调试
????????profileListenAddress := viper.GetString("peer.profile.listenAddress")
????????profileErr := http.ListenAndServe(profileListenAddress, nil)
????}
}
return <-serve //等待serve
//代码在peer/node/start.go
9、按配置文件重新更新模块日志级别
overrideLogModules := []string{"msp", "gossip", "ledger", "cauthdsl", "policies", "grpc"}
for _, module := range overrideLogModules {
????err = common.SetLogLevelFromViper(module)
}
flogging.SetPeerStartupModulesMap()
//代码在peer/node/start.go
内容总结
以上是互联网集市为您收集整理的区块链教程Fabric1.0源代码分析Peer peer node start命令实现全部内容,希望文章能够帮你解决区块链教程Fabric1.0源代码分析Peer peer node start命令实现所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。