聚合国内IT技术精华文章,分享IT技术精华,帮助IT从业人士成长

实战etcd的服务发现

2020-11-17 23:07 浏览: 169 次 我要评论(0 条) 字号:

在云原生的时代,服务发现已经是必不可少的功能,我借着最近迁移 gRPC 服务的机会尝试了一下如何用 etcd 实现服务发现,期间遇到诸多问题,本文逐一记之。

虽然 gRPC 并没有内置 etcd 的服务发现功能,但是它提供了相关接口让我们扩展:

// Builder creates a resolver that will be used to watch name resolution updates.
//
// This is gRPC's extension point for custom name resolution: gRPC picks the
// registered Builder whose Scheme matches the scheme of the dial target
// (e.g. "etcd" for "etcd:///foo") and calls its Build method.
type Builder interface {
	// Build creates a new resolver for the given target.
	//
	// gRPC dial calls Build synchronously, and fails if the returned error is
	// not nil.
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	// Scheme returns the scheme supported by this resolver.
	// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
	Scheme() string
}

// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
//
// An implementation is returned from Builder.Build and reports its results
// to gRPC through the ClientConn that was passed to Build.
type Resolver interface {
	// ResolveNow will be called by gRPC to try to resolve the target name
	// again. It's just a hint, resolver can ignore this if it's not necessary.
	//
	// It could be called multiple times concurrently.
	ResolveNow(ResolveNowOptions)
	// Close closes the resolver.
	//
	// Implementations that run background work (for example a watch
	// goroutine) should stop it here.
	Close()
}

在实际动手之前,我们还需要了解一下「gRPC Name Resolution」,它定义了 gRPC 的 URI 格式,举个例子:「dns://1.1.1.1/huoding.com」,其中:

  • Scheme:dns
  • Authority:1.1.1.1
  • Endpoint:huoding.com

表示通过 dns 服务器 1.1.1.1 查询 huoding.com 有哪些节点。

既然我们要支持 etcd,那么我们首先要想好 etcd 对应的 URI 应该是什么样的,Authority 填什么好呢?按 dns 的例子的意思,填 etcd 服务器的地址似乎就可以,不过实际情况中,一般会有多台 etcd 服务器,还牵扯到用户名密码,与其全写到 Authority 里,还不如直接从配置文件里获取来得实在,所以建议 Authority 留空。假设我们要通过 etcd 查询一个名为 foo 的服务对应的节点,那么 URI 可以定义为:「etcd:///foo」。

了解了基础知识之后,在编码前让我们在头脑里过一遍 gRPC 的服务流程:

  • 服务端启动,在 etcd 里通过租约注册键为「/foo/<ip>:<port>」并且值为「<ip>:<port>」的数据,同时定期发送心跳包(续租)。节点正常退出时会主动撤销租约、注销相关数据;即使进程异常退出,租约过期后这些数据也会被 etcd 自动删除。
  • 客户端启动,gRPC 把 etcd:///foo 解析出 Scheme、Authority、Endpoint,并根据 Scheme 找到对应 Builder,调用其 Build 方法,返回对应的 Resolver。Resolver 在 etcd 中查询前缀为「/foo/」的数据,得到的就是目前可用的节点,之后再通过 watch 持续跟踪节点的上线与下线。
  • 最后,负载均衡会挑选出一个节点来提供服务。
etcd

etcd

下面可以粘代码了,我主要是参考 gRPC 内置的 dns_resolver.go 来实现的。

先是 builder.go,实现了 Builder 接口:

package etcd

import (
	"fmt"

	"go.etcd.io/etcd/clientv3"
	"google.golang.org/grpc/resolver"
)

// Builder is a gRPC resolver builder for the "etcd" scheme: it resolves
// targets of the form etcd:///<service> through the etcd client in Client.
type Builder struct{ Client *clientv3.Client }

// Build is called synchronously by gRPC when dialing an "etcd:///<service>"
// target. It creates a Resolver for the etcd key prefix "/<service>/" and
// starts its watcher goroutine, which reads the current backends from etcd
// and keeps gRPC updated as they change.
//
// It returns an error (failing the dial) when the builder has no etcd
// client or the target carries no service name.
func (b *Builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	// Fail the dial up front: a nil client would otherwise panic inside the
	// watcher goroutine and crash the process, and an empty service name
	// would silently watch the prefix "//", which never matches a backend.
	if b.Client == nil {
		return nil, fmt.Errorf("etcd resolver: Builder.Client is nil")
	}
	if target.Endpoint == "" {
		return nil, fmt.Errorf("etcd resolver: target has no service name, want etcd:///<service>")
	}

	r := &Resolver{
		Client: b.Client,
		cc:     cc,
		prefix: fmt.Sprintf("/%s/", target.Endpoint),
	}

	go r.watcher()
	r.ResolveNow(resolver.ResolveNowOptions{})
	return r, nil
}

// Scheme reports the URI scheme handled by this builder, which is how gRPC
// routes "etcd:///..." targets to it.
func (*Builder) Scheme() string {
	const scheme = "etcd"
	return scheme
}

再是 resolver.go,实现了 Resolver 接口:

package etcd

import (
	"context"
	"sync"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/mvcc/mvccpb"
	"google.golang.org/grpc/resolver"
)

// Resolver implements gRPC's resolver.Resolver on top of etcd. Every key
// under prefix represents one backend and its value is the address handed
// to gRPC. The watcher method keeps the address set in sync with etcd until
// Close is called.
type Resolver struct {
	sync.RWMutex
	Client    *clientv3.Client
	cc        resolver.ClientConn
	prefix    string
	addresses map[string]resolver.Address // keyed by etcd key; guarded by the embedded RWMutex

	// ctx and cancel bound the watcher's lifetime. gRPC's resolver API offers
	// no context to use instead, so they are created lazily by lifecycle; this
	// keeps a Resolver built as a plain struct literal usable.
	ctx    context.Context
	cancel context.CancelFunc
}

// ResolveNow is a no-op: the watcher pushes every change reported by etcd
// to gRPC as it happens, so there is nothing extra to re-resolve on demand.
func (r *Resolver) ResolveNow(resolver.ResolveNowOptions) {}

// Close stops the watcher by cancelling its context, which also cancels the
// underlying etcd Get/Watch. It is safe to call more than once and before
// the watcher has started.
func (r *Resolver) Close() {
	_, cancel := r.lifecycle()
	cancel()
}

// lifecycle returns the resolver's context and cancel function, creating
// them on first use.
func (r *Resolver) lifecycle() (context.Context, context.CancelFunc) {
	r.Lock()
	defer r.Unlock()
	if r.ctx == nil {
		r.ctx, r.cancel = context.WithCancel(context.Background())
	}
	return r.ctx, r.cancel
}

// watcher publishes the current addresses under prefix and then applies
// every PUT/DELETE event from an etcd watch, pushing the new address list to
// gRPC after each batch, until the resolver is closed.
func (r *Resolver) watcher() {
	ctx, _ := r.lifecycle()

	r.Lock()
	r.addresses = make(map[string]resolver.Address)
	r.Unlock()

	watchOpts := []clientv3.OpOption{clientv3.WithPrefix()}

	// Initial snapshot. If it fails, fall back to watching from the current
	// revision and let later events populate the address set.
	if response, err := r.Client.Get(ctx, r.prefix, clientv3.WithPrefix()); err == nil {
		for _, kv := range response.Kvs {
			r.setAddress(string(kv.Key), string(kv.Value))
		}
		r.cc.UpdateState(resolver.State{Addresses: r.getAddresses()})

		// Resume right after the snapshot's revision so that changes made
		// between the Get and the Watch are not lost.
		watchOpts = append(watchOpts, clientv3.WithRev(response.Header.Revision+1))
	}

	// The channel is closed when ctx is cancelled (Close) or etcd cancels the
	// watch. TODO(review): on a compaction error updates stop; consider
	// re-running the snapshot and watch in that case.
	for response := range r.Client.Watch(ctx, r.prefix, watchOpts...) {
		if len(response.Events) == 0 {
			continue // progress notify, error or cancel: nothing changed
		}
		for _, event := range response.Events {
			switch event.Type {
			case mvccpb.PUT:
				r.setAddress(string(event.Kv.Key), string(event.Kv.Value))
			case mvccpb.DELETE:
				r.delAddress(string(event.Kv.Key))
			}
		}
		if ctx.Err() != nil {
			return // closed meanwhile: don't push state to a closed ClientConn
		}
		r.cc.UpdateState(resolver.State{Addresses: r.getAddresses()})
	}
}

// setAddress records (or replaces) the backend stored under key.
func (r *Resolver) setAddress(key, address string) {
	r.Lock()
	defer r.Unlock()
	r.addresses[key] = resolver.Address{Addr: address}
}

// delAddress forgets the backend stored under key.
func (r *Resolver) delAddress(key string) {
	r.Lock()
	defer r.Unlock()
	delete(r.addresses, key)
}

// getAddresses returns a snapshot of the known backends in unspecified order.
func (r *Resolver) getAddresses() []resolver.Address {
	r.RLock()
	defer r.RUnlock()

	// Length 0, capacity n: a non-zero length here would prepend n empty
	// Address{} entries ahead of the appended ones.
	addresses := make([]resolver.Address, 0, len(r.addresses))
	for _, address := range r.addresses {
		addresses = append(addresses, address)
	}
	return addresses
}

接着是服务端代码:

// main starts the gRPC Foo server, registers it in etcd as service "foo",
// and on SIGINT/SIGTERM deregisters it and shuts the server down gracefully.
func main() {
	host := viper.GetString("server.host")
	port := viper.GetString("server.port")
	listener, err := net.Listen("tcp", net.JoinHostPort(host, port))
	if err != nil {
		log.Fatalln(err)
	}

	server := grpc.NewServer()
	reflection.Register(server)
	pb.RegisterFooServer(server, &foo.Server{})

	// Named unregister (not close) so the builtin close is not shadowed.
	unregister, err := register("foo", 1)
	if err != nil {
		log.Fatalln(err)
	}

	go func() {
		// Serve returns nil after GracefulStop, so a normal shutdown does
		// not reach log.Fatalln.
		if err := server.Serve(listener); err != nil {
			log.Fatalln(err)
		}
	}()

	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit

	// Deregister first so clients stop picking this node, then let
	// in-flight RPCs finish before the process exits.
	unregister()
	server.GracefulStop()
}

// register publishes this server in etcd under the key
// "/<service>/<ip>:<port>" with the value "<ip>:<port>". The key is attached
// to a lease of ttl seconds, and the lease is kept alive in the background.
//
// The returned function revokes the lease, which deletes the key
// immediately, and then releases the etcd client. If the process dies
// without calling it, the key disappears once the lease expires.
func register(service string, ttl int64) (func(), error) {
	port := viper.GetString("server.port")

	// Resolve the local address before touching etcd, so nothing has to be
	// cleaned up if it cannot be determined.
	ip, err := localIP()
	if err != nil {
		return nil, err
	}

	client, err := etcdClient()
	if err != nil {
		return nil, err
	}
	// Close the client on every failure path below; on success, ownership
	// passes to the returned unregister function.
	registered := false
	defer func() {
		if !registered {
			_ = client.Close()
		}
	}()

	ctx := context.Background()
	lease, err := client.Grant(ctx, ttl)
	if err != nil {
		return nil, fmt.Errorf("grant etcd lease: %w", err)
	}

	key := fmt.Sprintf("/%s/%s:%s", service, ip, port)
	value := fmt.Sprintf("%s:%s", ip, port) // the dialable address clients receive
	if _, err := client.Put(ctx, key, value, clientv3.WithLease(lease.ID)); err != nil {
		return nil, fmt.Errorf("put %q: %w", key, err)
	}

	keepAliveCtx, stopKeepAlive := context.WithCancel(ctx)
	keepAlive, err := client.KeepAlive(keepAliveCtx, lease.ID)
	if err != nil {
		stopKeepAlive()
		return nil, fmt.Errorf("keep etcd lease alive: %w", err)
	}

	// Drain keep-alive responses so the channel never fills up. Ranging,
	// instead of receiving in an endless loop, ends the goroutine once the
	// channel is closed rather than busy-spinning on the closed channel.
	go func() {
		for range keepAlive {
		}
	}()

	registered = true
	unregister := func() {
		// Best effort: if the revoke fails, the lease still expires after ttl.
		_, _ = client.Revoke(context.Background(), lease.ID)
		stopKeepAlive()
		_ = client.Close()
	}
	return unregister, nil
}

// localIP returns the first non-loopback IPv4 address among the host's
// interface addresses, formatted as a string. It fails when listing the
// interfaces fails or when no such address exists.
func localIP() (string, error) {
	ifaceAddrs, err := net.InterfaceAddrs()
	if err != nil {
		return "", err
	}

	for _, a := range ifaceAddrs {
		ipNet, isIPNet := a.(*net.IPNet)
		if !isIPNet || ipNet.IP.IsLoopback() {
			continue
		}
		if ipNet.IP.To4() == nil {
			continue // IPv6 address; keep looking for IPv4
		}
		return ipNet.IP.String(), nil
	}

	return "", errors.New("unable to determine local ip")
}

最后是客户端代码:

// main dials the "foo" service through the etcd resolver, using the
// round_robin load-balancing policy across the discovered backends.
func main() {
	client, err := etcdClient()
	if err != nil {
		log.Fatalln(err)
	}
	// Deferred first so it runs last: the gRPC connection, whose resolver
	// uses this client, is closed before the etcd client itself.
	defer client.Close()

	resolver.Register(&etcd.Builder{Client: client})

	cc, err := grpc.DialContext(
		context.Background(),
		"etcd:///foo",
		grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)),
		grpc.WithInsecure(),
	)
	if err != nil {
		log.Fatalln(err)
	}
	defer cc.Close()

	// pb.NewFooClient(cc)
}

友情提示:截至本文写作时(2020 年 11 月),最新版本的 gRPC 与 etcd 的 clientv3 不兼容,需要在 go.mod 里用 replace 指定版本:

// Pin protobuf and grpc: per this article (November 2020), the then-latest
// releases did not build together with go.etcd.io/etcd/clientv3.
// Revisit these pins when upgrading etcd.
replace (
	github.com/golang/protobuf => github.com/golang/protobuf v1.3.2
	google.golang.org/grpc => google.golang.org/grpc v1.26.0
)

收工。



网友评论已有0条评论, 我也要评论

发表评论

*

* (保密)

Ctrl+Enter 快捷回复