> 文章列表 > 【go语言grpc之client端源码分析二】

【go语言grpc之client端源码分析二】

【go语言grpc之client端源码分析二】

go语言grpc之server端源码分析二

  • DialContext
    • parseTargetAndFindResolver
      • getResolver
    • newCCResolverWrapper
      • ccResolverWrapper.UpdateState
        • cc.maybeApplyDefaultServiceConfig
        • ccBalancerWrapper.updateClientConnState

上一篇文章分析了ClientConn的主要结构体成员,然后接下来看一下对应的实现也就是DialContext方法

DialContext

// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// https://github.com/grpc/grpc/blob/master/doc/naming.md.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {cc := &ClientConn{target:            target,csMgr:             &connectivityStateManager{},conns:             make(map[*addrConn]struct{}),dopts:             defaultDialOptions(),blockingpicker:    newPickerWrapper(),czData:            new(channelzData),firstResolveEvent: grpcsync.NewEvent(),}// 初始化ctxcc.ctx, cc.cancel = context.WithCancel(context.Background())// 加载额外的配置for _, opt := range opts {opt.apply(&cc.dopts)}// 将额外的配置串起来chainUnaryClientInterceptors(cc)chainStreamClientInterceptors(cc)defer func() {if err != nil {cc.Close()}}() // 删掉一些用不到的ssl的配置// 设置cc.dopts.copts.UserAgent 为 "grpc-go/1.45.0"if cc.dopts.copts.UserAgent != "" {cc.dopts.copts.UserAgent += " " + grpcUA} else {cc.dopts.copts.UserAgent = grpcUA}// 获取使用的resolverBuilderresolverBuilder, err := cc.parseTargetAndFindResolver()if err != nil {return nil, err}// 获取authority 这里是 localtion:8002cc.authority, err = determineAuthority(cc.parsedTarget.Endpoint, cc.target, cc.dopts)if err != nil {return nil, err}// 初始化balancercc.balancerBuildOpts = balancer.BuildOptions{DialCreds:        credsClone,CredsBundle:      cc.dopts.copts.CredsBundle,Dialer:           cc.dopts.copts.Dialer,Authority:        cc.authority,CustomUserAgent:  cc.dopts.copts.UserAgent,ChannelzParentID: cc.channelzID,Target:           cc.parsedTarget,}// 对resolverBuilder增加覆盖初始化rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)if err != nil {return nil, fmt.Errorf("failed to build resolver: %v", err)}cc.mu.Lock()cc.resolverWrapper = rWrappercc.mu.Unlock()return cc, nil
}

上面的是在删除了很多不用的代码精简后的结果,然后看一下这个主要是下面的两个方法。

  • parseTargetAndFindResolver
  • newCCResolverWrapper

parseTargetAndFindResolver

首先看一下代码的实现

func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {channelz.Infof(logger, cc.channelzID, "original dial target is: %q", cc.target)// 接下targetvar rb resolver.BuilderparsedTarget, err := parseTarget(cc.target)if err != nil {channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err)} else {// 根据scheme获取Resolver 如果存在那么就直接返回rb = cc.getResolver(parsedTarget.Scheme)if rb != nil {cc.parsedTarget = parsedTargetreturn rb, nil}}// 如果没有对应的resolver那么就使用 passthrough 对应的resolver//获取默认的scheme 这里就是passthroughdefScheme := resolver.GetDefaultScheme()channelz.Infof(logger, cc.channelzID, "fallback to scheme %q", defScheme)// 这里的canonicalTarget就是passthrough:///localhost:8002canonicalTarget := defScheme + ":///" + cc.target// 根据canonicalTarget 去解析目标parsedTarget, err = parseTarget(canonicalTarget)if err != nil {return nil, err}// 回去的就是passthrough对应的Resolverrb = cc.getResolver(parsedTarget.Scheme)if rb == nil {return nil, fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.Scheme)}// 添加到parsedTarget中去cc.parsedTarget = parsedTargetreturn rb, nil
}

然后看一下parseTarget这个,其实就是如果我们传入的scheme://host:port.然后解析到Target结构体,我们在上一篇文章也说了,也就是

type Target struct {// Deprecated: use URL.Scheme instead.Scheme string// Deprecated: use URL.Host instead.Authority string// Deprecated: use URL.Path or URL.Opaque instead. The latter is set when// the former is empty.Endpoint string// URL contains the parsed dial target with an optional default scheme added// to it if the original dial target contained no scheme or contained an// unregistered scheme. Any query params specified in the original dial// target can be accessed from here.URL url.URL
}

getResolver

然后就是根据getResolver获取对应的Builder。先看一下对应的方法

func (cc *ClientConn) getResolver(scheme string) resolver.Builder {for _, rb := range cc.dopts.resolvers {if scheme == rb.Scheme() {return rb}}return resolver.Get(scheme)
}

然后Build的实现是

// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {// Build creates a new resolver for the given target.//// gRPC dial calls Build synchronously, and fails if the returned error is// not nil.Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)// Scheme returns the scheme supported by this resolver.// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.Scheme() string
}

而这里的ClientConn是一个interface,实现是

// ClientConn contains the callbacks for resolver to notify any updates
// to the gRPC ClientConn.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type ClientConn interface {// UpdateState updates the state of the ClientConn appropriately.UpdateState(State) error// ReportError notifies the ClientConn that the Resolver encountered an// error.  The ClientConn will notify the load balancer and begin calling// ResolveNow on the Resolver with exponential backoff.ReportError(error)// NewAddress is called by resolver to notify ClientConn a new list// of resolved addresses.// The address list should be the complete list of resolved addresses.//// Deprecated: Use UpdateState instead.NewAddress(addresses []Address)// NewServiceConfig is called by resolver to notify ClientConn a new// service config. The service config should be provided as a json string.//// Deprecated: Use UpdateState instead.NewServiceConfig(serviceConfig string)// ParseServiceConfig parses the provided service config and returns an// object that provides the parsed config.ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult
}

然后 Resolver的实现前面提到过,这里是


// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {// ResolveNow will be called by gRPC to try to resolve the target name// again. It's just a hint, resolver can ignore this if it's not necessary.//// It could be called multiple times concurrently.ResolveNow(ResolveNowOptions)// Close closes the resolver.Close()
}

然后 这里

func Register(b Builder) {m[b.Scheme()] = b
}
var (// m is a map from scheme to resolver builder.m = make(map[string]Builder)// defaultScheme is the default scheme to use.defaultScheme = "passthrough"
)

可以看到这里默认的Scheme 就是passthrough。然后看一下在哪里将passthrough的resolver进行注册。

const scheme = "passthrough"type passthroughBuilder struct{}func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {r := &passthroughResolver{target: target,cc:     cc,}r.start()return r, nil
}func (*passthroughBuilder) Scheme() string {return scheme
}type passthroughResolver struct {target resolver.Targetcc     resolver.ClientConn
}func (r *passthroughResolver) start() {r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})
}func (*passthroughResolver) ResolveNow(o resolver.ResolveNowOptions) {}func (*passthroughResolver) Close() {}func init() {resolver.Register(&passthroughBuilder{})
}

可以看出来这里是利用了init方法注册了,然后返回的是passthroughBuilder这个方法.

所以这里parseTargetAndFindResolver方法也就说完了,返回的就是passthroughBuilder结构体,这个里面的build方法我们放到后面再说。

newCCResolverWrapper

然后这个方法有两个参数,第一个是cc也就是在DialContext方法刚开始就初始的ClientConn,然后就是resolverBuilder,也就是passthroughBuilder。然后看一下这个方法的实现。

// newCCResolverWrapper uses the resolver.Builder to build a Resolver and
// returns a ccResolverWrapper object which wraps the newly built resolver.
func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {// 初始ccResolverWrapperccr := &ccResolverWrapper{cc:   cc,done: grpcsync.NewEvent(),}var credsClone credentials.TransportCredentialsif creds := cc.dopts.copts.TransportCredentials; creds != nil {credsClone = creds.Clone()}rbo := resolver.BuildOptions{DisableServiceConfig: cc.dopts.disableServiceConfig,DialCreds:            credsClone,CredsBundle:          cc.dopts.copts.CredsBundle,Dialer:               cc.dopts.copts.Dialer,}var err error// We need to hold the lock here while we assign to the ccr.resolver field// to guard against a data race caused by the following code path,// rb.Build-->ccr.ReportError-->ccr.poll-->ccr.resolveNow, would end up// accessing ccr.resolver which is being assigned here.ccr.resolverMu.Lock()defer ccr.resolverMu.Unlock()// 调用传入的resolver.Builder的build方法ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)if err != nil {return nil, err}return ccr, nil
}

然后这个方法的逻辑就是初始化ccResolverWrapper,然后就是调用resolver.Builder的build获取resolver,放入到ccResolverWrapper,然后在把ccResolverWrapper返回。
因为这里的rb其实就是passthroughResolver,然后看一下这个的build方法。
根据上面的源码passthroughResolver的Build其实就是初始化passthroughResolver,然后调用传入的ccr的UpdateState方法。参数就是resolver.State。


func (r *passthroughResolver) start() {r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})
}

因为这里cc就是ccr也就是ccResolverWrapper.所以看一下ccResolverWrapper的UpdateState方法。

ccResolverWrapper.UpdateState

先看一下源码的实现

func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {ccr.incomingMu.Lock()defer ccr.incomingMu.Unlock()ccr.curState = sif err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {return balancer.ErrBadResolverState}return nil
}

这里的ccr.cc就是ClientConn,也就是一开始在DialContext中初始化,然后看一下ClientConn的updateResolverState方法。

func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {defer cc.firstResolveEvent.Fire()cc.mu.Lock()// Check if the ClientConn is already closed. Some fields (e.g.// balancerWrapper) are set to nil when closing the ClientConn, and could// cause nil pointer panic if we don't have this check.if cc.conns == nil {cc.mu.Unlock()return nil}// 删除err不为nil的逻辑var ret errorif cc.dopts.disableServiceConfig {channelz.Infof(logger, cc.channelzID, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig)cc.maybeApplyDefaultServiceConfig(s.Addresses)} else if s.ServiceConfig == nil {cc.maybeApplyDefaultServiceConfig(s.Addresses)// TODO: do we need to apply a failing LB policy if there is no// default, per the error handling design?}    cc.blockingpicker.updatePicker(base.NewErrPicker(err))cc.csMgr.updateState(connectivity.TransientFailure)cc.mu.Unlock()return ret}}}var balCfg serviceconfig.LoadBalancingConfigif cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {balCfg = cc.sc.lbConfig.cfg}cbn := cc.curBalancerNamebw := cc.balancerWrappercc.mu.Unlock()if cbn != grpclbName {// Filter any grpclb addresses since we don't have the grpclb balancer.for i := 0; i < len(s.Addresses); {if s.Addresses[i].Type == resolver.GRPCLB {copy(s.Addresses[i:], s.Addresses[i+1:])s.Addresses = s.Addresses[:len(s.Addresses)-1]continue}i++}}uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})if ret == nil {ret = uccsErr // prefer ErrBadResolver state since any other error is// currently meaningless to the caller.}return ret
}

上面的代码很多其实也就是两个方法。
第一个是当 s.ServiceConfig == nil 的时候调用cc.maybeApplyDefaultServiceConfig(s.Addresses).
第一个也就是
bw := cc.balancerWrapper
uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})

然后接下来看一下这两个方法

cc.maybeApplyDefaultServiceConfig

这里的address就是在初始化的时候传入的 localhost:8002.
然后看一下这个方法的实现

func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {if sc == nil {// should never reach here.return}cc.sc = sc// 生成balancerBuilderif cc.dopts.balancerBuilder == nil {// Only look at balancer types and switch balancer if balancer dial// option is not set.var newBalancerName stringif cc.sc != nil && cc.sc.lbConfig != nil {newBalancerName = cc.sc.lbConfig.name} else {var isGRPCLB boolfor _, a := range addrs {if a.Type == resolver.GRPCLB {isGRPCLB = truebreak}}if isGRPCLB {newBalancerName = grpclbName} else if cc.sc != nil && cc.sc.LB != nil {newBalancerName = *cc.sc.LB} else {newBalancerName = PickFirstBalancerName}}// 生成PickFirstBalancerName的balancecc.switchBalancer(newBalancerName)} else if cc.balancerWrapper == nil {// Balancer dial option was set, and this is the first time handling// resolved addresses. Build a balancer with dopts.balancerBuilder.cc.curBalancerName = cc.dopts.balancerBuilder.Name()cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)}
}

因为cc.dopts.balancerBuilder 这里为nil,同时newBalancerName为else中的逻辑,也就是PickFirstBalancerName也就是pick_first。
所以这个方法的逻辑也就是cc.switchBalancer(“pick_first”)。

// Caller must hold cc.mu.
func (cc *ClientConn) switchBalancer(name string) {if strings.EqualFold(cc.curBalancerName, name) {return}channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)if cc.dopts.balancerBuilder != nil {channelz.Info(logger, cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")return}if cc.balancerWrapper != nil {// Don't hold cc.mu while closing the balancers. The balancers may call// methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex// would cause a deadlock in that case.cc.mu.Unlock()cc.balancerWrapper.close()cc.mu.Lock()}builder := balancer.Get(name)if builder == nil {channelz.Warningf(logger, cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)channelz.Infof(logger, cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)builder = newPickfirstBuilder()} else {channelz.Infof(logger, cc.channelzID, "Channel switches to new LB policy %q", name)}cc.curBalancerName = builder.Name()cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
}

看一下这里的pick_first的实现

// PickFirstBalancerName is the name of the pick_first balancer.
const PickFirstBalancerName = "pick_first"func newPickfirstBuilder() balancer.Builder {return &pickfirstBuilder{}
}type pickfirstBuilder struct{}func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {return &pickfirstBalancer{cc: cc}
}

然后看一下newCCBalancerWrapper这个方法

func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper {ccb := &ccBalancerWrapper{cc:       cc,updateCh: buffer.NewUnbounded(),closed:   grpcsync.NewEvent(),done:     grpcsync.NewEvent(),subConns: make(map[*acBalancerWrapper]struct{}),}go ccb.watcher()ccb.balancer = b.Build(ccb, bopts)_, ccb.hasExitIdle = ccb.balancer.(balancer.ExitIdler)return ccb
}

这里是初始化ccBalancerWrapper这个结构体,然后调用build实例化balancer。
然后看一下Build这个方法,也就是pickfirstBuilder。这里的build其实就是返回了pickfirstBalancer这个结构体,看一下实现

type pickfirstBalancer struct {state connectivity.Statecc    balancer.ClientConnsc    balancer.SubConn
}

然后看一下ccb.watcher方法,也就是

// watcher balancer functions sequentially, so the balancer can be implemented
// lock-free.
func (ccb *ccBalancerWrapper) watcher() {for {select {case t := <-ccb.updateCh.Get():ccb.updateCh.Load()if ccb.closed.HasFired() {break}switch u := t.(type) {case *scStateUpdate:ccb.balancerMu.Lock()ccb.balancer.UpdateSubConnState(u.sc, balancer.SubConnState{ConnectivityState: u.state, ConnectionError: u.err})ccb.balancerMu.Unlock()case *acBalancerWrapper:ccb.mu.Lock()if ccb.subConns != nil {delete(ccb.subConns, u)ccb.cc.removeAddrConn(u.getAddrConn(), errConnDrain)}ccb.mu.Unlock()case exitIdle:if ccb.cc.GetState() == connectivity.Idle {if ei, ok := ccb.balancer.(balancer.ExitIdler); ok {// We already checked that the balancer implements// ExitIdle before pushing the event to updateCh, but// check conditionally again as defensive programming.ccb.balancerMu.Lock()ei.ExitIdle()ccb.balancerMu.Unlock()}}default:logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", t, t)}case <-ccb.closed.Done():}if ccb.closed.HasFired() {ccb.balancerMu.Lock()ccb.balancer.Close()ccb.balancerMu.Unlock()ccb.mu.Lock()scs := ccb.subConnsccb.subConns = nilccb.mu.Unlock()ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil})ccb.done.Fire()// Fire done before removing the addr conns.  We can safely unblock// ccb.close and allow the removeAddrConns to happen// asynchronously.for acbw := range scs {ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)}return}}
}

这里主要是scStateUpdate这个case,这里可以看出来当状态有更新的时候,会调用对应balance的UpdateSubConnState方法,在这里的实现是pickfirst是

func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {if logger.V(2) {logger.Infof("pickfirstBalancer: UpdateSubConnState: %p, %v", sc, s)}if b.sc != sc {if logger.V(2) {logger.Infof("pickfirstBalancer: ignored state change because sc is not recognized")}return}b.state = s.ConnectivityStateif s.ConnectivityState == connectivity.Shutdown {b.sc = nilreturn}switch s.ConnectivityState {case connectivity.Ready:b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{result: balancer.PickResult{SubConn: sc}}})case connectivity.Connecting:b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{err: balancer.ErrNoSubConnAvailable}})case connectivity.Idle:b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &idlePicker{sc: sc}})case connectivity.TransientFailure:b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState,Picker:            &picker{err: s.ConnectionError},})}
}

注意这里的cc是ccBalancerWrapper,所以也就是调用ccBalancerWrapper的UpdateState方法,也就是

func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {ccb.mu.Lock()defer ccb.mu.Unlock()if ccb.subConns == nil {return}// Update picker before updating state.  Even though the ordering here does// not matter, it can lead to multiple calls of Pick in the common start-up// case where we wait for ready and then perform an RPC.  If the picker is// updated later, we could call the "connecting" picker when the state is// updated, and then call the "ready" picker after the picker gets updated.ccb.cc.blockingpicker.updatePicker(s.Picker)ccb.cc.csMgr.updateState(s.ConnectivityState)
}

然后blockingpicker的updatePicker是

// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
func (pw *pickerWrapper) updatePicker(p balancer.Picker) {pw.mu.Lock()if pw.done {pw.mu.Unlock()return}pw.picker = p// pw.blockingCh should never be nil.close(pw.blockingCh)pw.blockingCh = make(chan struct{})pw.mu.Unlock()
}

其实就是更新pickerWrapper,并且通知通过close通知picker有更新。
然后调用updateState也就是csMgr

// updateState updates the connectivity.State of ClientConn.
// If there's a change it notifies goroutines waiting on state change to
// happen.
func (csm *connectivityStateManager) updateState(state connectivity.State) {csm.mu.Lock()defer csm.mu.Unlock()if csm.state == connectivity.Shutdown {return}if csm.state == state {return}csm.state = statechannelz.Infof(logger, csm.channelzID, "Channel Connectivity change to %v", state)if csm.notifyChan != nil {// There are other goroutines waiting on this channel.close(csm.notifyChan)csm.notifyChan = nil}
}

这里的就是更新ClientConn中的connectivityStateManager的状态。

ccBalancerWrapper.updateClientConnState

上面的cc.applyServiceConfigAndBalancer说完了,然后就是updateClientConnState方法,

func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {ccb.balancerMu.Lock()defer ccb.balancerMu.Unlock()return ccb.balancer.UpdateClientConnState(*ccs)
}

然后就是调用balancer的UpdateClientConnState。注意这里的balancer还是pickfirst。看一下实现

func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) error {if len(cs.ResolverState.Addresses) == 0 {b.ResolverError(errors.New("produced zero addresses"))return balancer.ErrBadResolverState}if b.sc == nil {var err errorb.sc, err = b.cc.NewSubConn(cs.ResolverState.Addresses, balancer.NewSubConnOptions{})if err != nil {if logger.V(2) {logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)}b.state = connectivity.TransientFailureb.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure,Picker: &picker{err: fmt.Errorf("error creating connection: %v", err)},})return balancer.ErrBadResolverState}b.state = connectivity.Idleb.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: &picker{result: balancer.PickResult{SubConn: b.sc}}})b.sc.Connect()} else {b.cc.UpdateAddresses(b.sc, cs.ResolverState.Addresses)b.sc.Connect()}return nil
}

这里的sc就是subConn。然后就是调用cc的NewSubConn,也就是ccBalancerWrapper的NewSubConn。然后看一下实现

func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {if len(addrs) <= 0 {return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list")}ccb.mu.Lock()defer ccb.mu.Unlock()if ccb.subConns == nil {return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed")}ac, err := ccb.cc.newAddrConn(addrs, opts)if err != nil {return nil, err}acbw := &acBalancerWrapper{ac: ac}acbw.ac.mu.Lock()ac.acbw = acbwacbw.ac.mu.Unlock()ccb.subConns[acbw] = struct{}{}return acbw, nil
}

然后就是ac的实现,也就是

// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
//
// Caller needs to make sure len(addrs) > 0.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {ac := &addrConn{state:        connectivity.Idle,cc:           cc,addrs:        addrs,scopts:       opts,dopts:        cc.dopts,czData:       new(channelzData),resetBackoff: make(chan struct{}),}ac.ctx, ac.cancel = context.WithCancel(cc.ctx)// Track ac in cc. This needs to be done before any getTransport(...) is called.cc.mu.Lock()if cc.conns == nil {cc.mu.Unlock()return nil, ErrClientConnClosing}if channelz.IsOn() {ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{Desc:     "Subchannel Created",Severity: channelz.CtInfo,Parent: &channelz.TraceEventDesc{Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),Severity: channelz.CtInfo,},})}cc.conns[ac] = struct{}{}cc.mu.Unlock()return ac, nil
}

所以就是返回了acBalancerWrapper这个结构体,然后看一下ccBalancerWrapper的UpdateState方法,这个上面说过,主要是更新
ccb.cc.blockingpicker.updatePicker(s.Picker)
ccb.cc.csMgr.updateState(s.ConnectivityState)
这两个方法。
然后就是connect‘,其实就是根据地址去真正的连接后端的地址。

然后看一下acBalancerWrapper的connect方法,也就是

func (acbw *acBalancerWrapper) Connect() {acbw.mu.Lock()defer acbw.mu.Unlock()go acbw.ac.connect()
}

然后看一下addrConn的connect实现,

// connect starts creating a transport.
// It does nothing if the ac is not IDLE.
// TODO(bar) Move this to the addrConn section.
func (ac *addrConn) connect() error {ac.mu.Lock()if ac.state == connectivity.Shutdown {ac.mu.Unlock()return errConnClosing}if ac.state != connectivity.Idle {ac.mu.Unlock()return nil}// Update connectivity state within the lock to prevent subsequent or// concurrent calls from resetting the transport more than once.// 更新ac的状态ac.updateConnectivityState(connectivity.Connecting, nil)ac.mu.Unlock()// 更新地址ac.resetTransport()return nil
}

然后就是 ac.tryAllAddrs方法,然后就是在调用ac.createTransport。最后调用transport.NewClientTransport方法。看一下实现

// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {return newHTTP2Client(connectCtx, ctx, addr, opts, onPrefaceReceipt, onGoAway, onClose)
}

到这里就是http2的逻辑了,然后看一下newHTTP2Client的实现

// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {scheme := "http"ctx, cancel := context.WithCancel(ctx)defer func() {if err != nil {cancel()}}()// gRPC, resolver, balancer etc. can specify arbitrary data in the// Attributes field of resolver.Address, which is shoved into connectCtx// and passed to the dialer and credential handshaker. This makes it possible for// address specific arbitrary data to reach custom dialers and credential handshakers.connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)if err != nil {if opts.FailOnNonTempDialError {return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)}return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)}// Any further errors will close the underlying connectiondefer func(conn net.Conn) {if err != nil {conn.Close()}}(conn)kp := opts.KeepaliveParams// Validate keepalive parameters.if kp.Time == 0 {kp.Time = defaultClientKeepaliveTime}if kp.Timeout == 0 {kp.Timeout = defaultClientKeepaliveTimeout}keepaliveEnabled := falseif kp.Time != infinity {if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)}keepaliveEnabled = true}var (isSecure boolauthInfo credentials.AuthInfo)transportCreds := opts.TransportCredentialsperRPCCreds := opts.PerRPCCredentialsif b := opts.CredsBundle; b != nil {if t := b.TransportCredentials(); t != nil {transportCreds = t}if t := b.PerRPCCredentials(); t != nil {perRPCCreds = append(perRPCCreds, t)}}if transportCreds != nil {rawConn := conn// Pull the deadline from the connectCtx, which will be used for// timeouts in the authentication protocol handshake. Can ignore the// boolean as the deadline will return the zero value, which will make// the conn not timeout on I/O operations.deadline, _ := connectCtx.Deadline()rawConn.SetDeadline(deadline)conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, rawConn)rawConn.SetDeadline(time.Time{})if err != nil {return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)}for _, cd := range perRPCCreds {if cd.RequireTransportSecurity() {if ci, ok := authInfo.(interface {GetCommonAuthInfo() credentials.CommonAuthInfo}); ok {secLevel := ci.GetCommonAuthInfo().SecurityLevelif secLevel != credentials.InvalidSecurityLevel && secLevel < credentials.PrivacyAndIntegrity {return nil, connectionErrorf(true, nil, "transport: cannot send secure credentials on an insecure connection")}}}}isSecure = trueif transportCreds.Info().SecurityProtocol == "tls" {scheme = "https"}}dynamicWindow := trueicwz := int32(initialWindowSize)if opts.InitialConnWindowSize >= defaultWindowSize {icwz = opts.InitialConnWindowSizedynamicWindow = false}writeBufSize := opts.WriteBufferSizereadBufSize := opts.ReadBufferSizemaxHeaderListSize := defaultClientMaxHeaderListSizeif opts.MaxHeaderListSize != nil {maxHeaderListSize = *opts.MaxHeaderListSize}t := &http2Client{ctx:                   ctx,ctxDone:               ctx.Done(), // Cache Done chan.cancel:                cancel,userAgent:             opts.UserAgent,conn:                  conn,remoteAddr:            conn.RemoteAddr(),localAddr:             conn.LocalAddr(),authInfo:              authInfo,readerDone:            make(chan struct{}),writerDone:            make(chan struct{}),goAway:                make(chan struct{}),framer:                newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),fc:                    &trInFlow{limit: uint32(icwz)},scheme:                scheme,activeStreams:         make(map[uint32]*Stream),isSecure:              isSecure,perRPCCreds:           perRPCCreds,kp:                    kp,statsHandler:          opts.StatsHandler,initialWindowSize:     initialWindowSize,onPrefaceReceipt:      onPrefaceReceipt,nextID:                1,maxConcurrentStreams:  defaultMaxStreamsClient,streamQuota:           defaultMaxStreamsClient,streamsQuotaAvailable: make(chan struct{}, 1),czData:                new(channelzData),onGoAway:              onGoAway,onClose:               onClose,keepaliveEnabled:      keepaliveEnabled,bufferPool:            newBufferPool(),}if md, ok := addr.Metadata.(*metadata.MD); ok {t.md = *md} else if md := imetadata.Get(addr); md != nil {t.md = md}t.controlBuf = newControlBuffer(t.ctxDone)if opts.InitialWindowSize >= defaultWindowSize {t.initialWindowSize = opts.InitialWindowSizedynamicWindow = false}if dynamicWindow {t.bdpEst = &bdpEstimator{bdp:               initialWindowSize,updateFlowControl: t.updateFlowControl,}}if t.statsHandler != nil {t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{RemoteAddr: t.remoteAddr,LocalAddr:  t.localAddr,})connBegin := &stats.ConnBegin{Client: true,}t.statsHandler.HandleConn(t.ctx, connBegin)}if channelz.IsOn() {t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))}if t.keepaliveEnabled {t.kpDormancyCond = sync.NewCond(&t.mu)go t.keepalive()}// Start the reader goroutine for incoming message. Each transport has// a dedicated goroutine which reads HTTP2 frame from network. Then it// dispatches the frame to the corresponding stream entity.go t.reader()// Send connection preface to server.n, err := t.conn.Write(clientPreface)if err != nil {err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)t.Close(err)return nil, err}if n != len(clientPreface) {err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))t.Close(err)return nil, err}var ss []http2.Settingif t.initialWindowSize != defaultWindowSize {ss = append(ss, http2.Setting{ID:  http2.SettingInitialWindowSize,Val: uint32(t.initialWindowSize),})}if opts.MaxHeaderListSize != nil {ss = append(ss, http2.Setting{ID:  http2.SettingMaxHeaderListSize,Val: *opts.MaxHeaderListSize,})}err = t.framer.fr.WriteSettings(ss...)if err != nil {err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)t.Close(err)return nil, err}// Adjust the connection flow control window if needed.if delta := uint32(icwz - defaultWindowSize); delta > 0 {if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)t.Close(err)return nil, err}}t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)if err := t.framer.writer.Flush(); err != nil {return nil, err}go func() {t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)err := t.loopy.run()if err != nil {if logger.V(logLevel) {logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)}}// Do not close the transport.  Let reader goroutine handle it since// there might be data in the buffers.t.conn.Close()t.controlBuf.finish()close(t.writerDone)}()return t, nil
}

可以看出来这里的http2的实现和之前是大同小异的,这里就不多描述了。
接下来化了一个流程图来进行帮助记忆。
【go语言grpc之client端源码分析二】

手机评测