Fork me on GitHub
image frame

Welcome to Jayj's Blog

Code Harder, Code Better

etcd clientv3 全api文档 之 KV

无论是新使用etcd还是需要更深一点了解etcd的人可能都会遇到我曾经遇到的问题:为什么文档这么少?为什么官方文档都不全?这个api怎么用,是什么?网上的教程简略到只有基础方法,各个网页还都是千篇一律的复制粘贴,让当初在使用etcd的我很痛苦。正好在我想要开源一个etcd的web panel的时候,我决定写出一篇文档,从源码下手,对每个api的用法讲解出来。不仅有助于我开源项目的理解,也可以帮助到对etcd不熟悉的小伙伴们。在学习etcd之前,有一个有意思的点可以帮助你理解etcd,就是将所有的key看成水平分布的key,而不是树形结构的key,如:

1
2
3
4
5
6
7
8
9
10
11
12
13
/a
/b
/d
/e
/c
/f
看做
/a
/a/b
/a/b/d
/a/b/e
/a/c
/a/c/f

这样在使用如withPrefix方法时可以更清楚的知道你到底查找、删除了哪一系列的key。

关于clientv3的讲解不涉及具体的etcd源码分析,具体的源码分析会在后面写出。

before started

本文依照的是etcd 版本 3.5.1,etcd的v2与v3区别很大,改变了很多原理,如auth的方式等等;而v3中 3.1 3.2 3.3 3.4 3.5也各不相同,但改动不大,在观看之前需要看清楚版本。具体各个版本的区别再后面会单独写一章。

get started

在etcd全api文档的第一篇,我决定先从最常用的KV下手,包括了存取删除和事务等,基本是新手开始使用etcd最需要知道的几个方法。后面的文章也会以文件为分割来讲述etcd的源码及api使用方法。

KV struct

1
2
3
4
type kv struct {
remote pb.KVClient
callOpts []grpc.CallOption
}

第一步,创建一个连接

首先,你要先创建一个连接,然后才能进行后续的操作

1
2
3
4
5
6
7
8
cli, err := clientv3.New(clientv3.Config{
Endpoints: "example.endpoint",
DialTimeout: 5 * time.Second
})
if err := nil {
return err
}
defer cli.Close()

注意点

  • 你应该在使用完连接后立即关闭连接,未关闭的连接会使goroutine泄露
  • 虽然使用defer非常方便,也防止你后面忘记关闭连接。但是我更喜欢在操作完方法之后手动调用cli.Close()而不是使用defer,因为每个人的业务场景不一样,方法并不一定会在短时间内结束,可能一次bug导致这个方法一直存活着,你的defer也就一直得不到调用。

NewKV

使用kv与直接使用cli连接调用的区别不大,kv内可以调用的方法与cli一样,具体为调用NewKV会将cli身上的callOpts带到kv上

1
2
3
4
5
6
7
8
func NewKV(c *Client) KV {
api := &kv{remote: RetryKVClient(c)}
if c != nil {
api.callOpts = c.callOpts
}

return api
}

Put

将一个key-value键值对推入etcd

put方法

1
func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)

put的调用(使用kv)

1
2
3
kv := clientv3.NewKV(cli)

resp, err := kv.Put(context.Background(), key, val, ...opts)

put的具体实现

1
2
3
4
func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
r, err := kv.Do(ctx, OpPut(key, val, opts...))
return r.put, toErr(ctx, err)
}

关于Do方法

Get

取回keys,默认会返回你传入key的value,如果传入

  • WithRange(end), get会返回[key, end)范围内的结果
  • WithFromKey(), get会返回比你传入key大或者相等的key
  • WithRev(rev) 且 rev > 0,get会返回传入key指定版本的value,如果指定的版本(rev)被压缩(compacted)了,会返回error: ErrCompacted。
  • WithLimit(limit),会返回指定数量内的keys
  • WithSort(), 会对返回的keys进行过滤

get方法

1
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)

get的调用(使用cli)

1
resp, err := cli.Put(context.Background(), key, ...opts)

get的具体实现

1
2
3
4
func (kv *kv) Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) {
r, err := kv.Do(ctx, OpGet(key, opts...))
return r.get, toErr(ctx, err)
}

关于Do方法

Delete

删除一个key,或者

  • 使用WithRange(end)来删除[key, range)范围的key
  • 使用WithPrefix()来删除以key开头的key

delete方法

1
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)

delete的调用方式

1
resp, err = cli.Delete(ctx, key, ...opts)

delete的具体实现

1
2
r, err := kv.Do(ctx, OpDelete(key, opts...))
return r.del, toErr(ctx, err)

关于Do方法

Do

Do这个方法为你可以将多种/次操作融合到一个方法内,并在之后(重复)调用。具体如何使用除了官方方法外,就要看具体的业务需求。如你可以将put->get->delete这一套流程融合到一个Do里。通过源码可以知道,官方的get、put等也是通过Do方法来实现,然后在Do内实现具体的get、put的etcd调用,区分于事务级别的调用。

Do方法

1
Do(ctx context.Context, op Op) (OpResponse, error)

Do的调用方式

1
2
3
4
5
6
7
8
9
10
ops := []clientv3.Op{
clientv3.OpPut("put-key", "123"),
clientv3.OpGet("put-key"),
clientv3.OpPut("put-key", "456")}

for _, op := range ops {
if _, err := cli.Do(context.TODO(), op); err != nil {
log.Fatal(err)
}
}

Do的具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
var err error
switch op.t { // 对传入的op遍历
case tRange:
var resp *pb.RangeResponse
resp, err = kv.remote.Range(ctx, op.toRangeRequest(), kv.callOpts...)
if err == nil {
return OpResponse{get: (*GetResponse)(resp)}, nil
}
case tPut:
var resp *pb.PutResponse
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
if err == nil {
return OpResponse{put: (*PutResponse)(resp)}, nil
}
case tDeleteRange:
var resp *pb.DeleteRangeResponse
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
resp, err = kv.remote.DeleteRange(ctx, r, kv.callOpts...)
if err == nil {
return OpResponse{del: (*DeleteResponse)(resp)}, nil
}
case tTxn:
var resp *pb.TxnResponse
resp, err = kv.remote.Txn(ctx, op.toTxnRequest(), kv.callOpts...)
if err == nil {
return OpResponse{txn: (*TxnResponse)(resp)}, nil
}
default:
panic("Unknown op")
}
return OpResponse{}, toErr(ctx, err)
}

Txn

txn用来创建一个事务,将后续操作以事务进行, 事务有四种子方法

  • If
  • Then
  • Else
  • Commit

Txn方法

1
Txn(ctx context.Context) Txn

txn的调用例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
kvc := clientv3.NewKV(cli)

_, err = kvc.Put(context.TODO(), "key", "xyz")
if err != nil {
log.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err = kvc.Txn(ctx).
// 事务的值是字典类型的比较
If(clientv3.Compare(clientv3.Value("key"), ">", "abc")).
// "xyz" > "abc" 时运行
Then(clientv3.OpPut("key", "XYZ")).
// else运行
Else(clientv3.OpPut("key", "ABC")).
Commit()

txn的源码

1
2
3
4
5
6
7
func (kv *kv) Txn(ctx context.Context) Txn {
return &txn{
kv: kv,
ctx: ctx,
callOpts: kv.callOpts,
}
}

Compact

compact,压缩key-value,为了防止历史记录越来越大。传入指定的rev,会压缩rev之后的key-value历史

compact方法

1
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)

compact的调用方式

1
2
3
4
5
6
7
8
9
resp, err := cli.Get(ctx, "foo")
cancel()
if err != nil {
log.Fatal(err)
}
compRev := resp.Header.Revision // 具体你要压缩哪个版本之前的key-val

ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
_, err = cli.Compact(ctx, compRev)

compact的具体实现

1
2
3
4
5
6
7
func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) {
resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest(), kv.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
}
return (*CompactResponse)(resp), err
}

end

这是关于clientv3 api讲解的第一章,如果你有什么建议、想法、编写错误的指出或者好的想法,欢迎通过email或issue的方式来告诉我。

使用docker部署一套相比ELK更轻量的日志系统LPG

Why this

使用过ELK日志系统的话,你可能知道Logstash强大,它可以实现数据传输、格式处理、格式化输出,和强大的插件功能,经常用于日志处理,但是同时logstash也较为消耗资源,占用了很多的CPU和内存,并且由于没有消息队列缓存,存在数据丢失的隐患。很多时候我们对日志的需求并没有那么高,并不需要盯着日志分析结果,只是简单的看几个参数,有没有报错就停止了。更加常见的场景是,我们的服务器并没有那么多余的空间留给日志系统。

所以这篇文章来介绍一套轻量的日志系统LPG(loki、promtail、grafana)

  • loki: 主服务器,负责存储日志和处理查询
  • promtail: 代理,负责收集日志发送给loki
  • grafana: 看板,用来查询和显示日志

loki: like Prometheus, but for logs(像普罗米修斯监控一样,只不过是监控日志),受到普罗米修斯的启发的水平拓展、高可用、多租户日志聚合系统。

和其他聚合日志系统相比,loki:

  • 不对日志进行全文索引。通过存储压缩的非结构化日志和仅索引元数据,loki更易于操作且运行成本更低。
  • 使用您已经在 Prometheus 中使用的相同标签对日志流进行索引和分组,使您能够使用您已经在 Prometheus 中使用的相同标签在指标和日志之间无缝切换。
  • 尤其适合存储Kubernetes Pod 日志。Pod 标签等元数据会被自动抓取和索引。
  • 在 Grafana 看板中有图形界面支持(需要 Grafana v6.0)

根据官方介绍,可以了解到loki的优点是不会对日志全文索引,仅索引“标签”;压缩了非结构化的日志让体积更小;接下来介绍如何使用docker部署LPG日志系统

Get Started

loki.yml

这个文件是配置loki相关的,在docker-compose文件中会映射入loki的image内,我会把可能你会涉及到的配置标注上注释,更具体的配置请参考loki 官方文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# 认证相关
auth_enabled: false

server:
# 监听的端口 因为promtail是主动推送日志给loki 所以需要监听端口号
http_listen_port: 3100

# 配置 ingester 以及 ingester 如何将自己注册到键值存储。
ingester:
lifecycler:
# ip地址
address: 127.0.0.1
ring:
kvstore:
# 存在内存中
store: inmemory
replication_factor: 1
final_sleep: 0s
# 指定时间内的块(chunk)未接收到新日志将会被清空
chunk_idle_period: 1h
# 所有的chunk都将在指定时间后被清空,默认1h
max_chunk_age: 1h
# 每个chunk大小为1.5M
chunk_target_size: 1048576
# 如果使用索引缓存,则这个时间必须大于索引读取缓存的TTL(默认索引读取缓存 TTL 为 5m)
chunk_retain_period: 30s
# 最大尝试次数 0为disabled
max_transfer_retries: 0

# 配置块索引模式及其存储位置
schema_config:
configs:
- from: 2019-11-17 # 创建索引存储区的第一天的日期。如果这是您唯一的 period_config,请使用过去的日期. 否则请使用您希望架构切换的日期。 YYYY-MM-DD 格式,例如:2018-04-15。
store: boltdb-shipper # 用于索引的储存
object_store: filesystem
schema: v11
index:
prefix: index_
period: 24h

# 配置索引和块的存储
storage_config:
boltdb_shipper:
active_index_directory: /loki/boltdb-shipper-active
cache_location: /loki/boltdb-shipper-cache
cache_ttl: 24h # 缓存保存时长,可以提升长查询的速度,但是会占据更多的空间
shared_store: filesystem
filesystem:
directory: /loki/chunks

compactor:
working_directory: /loki/boltdb-shipper-compactor
shared_store: filesystem

limits_config:
reject_old_samples: true
reject_old_samples_max_age: 168h

chunk_store_config:
max_look_back_period: 0s

table_manager:
retention_deletes_enabled: false
retention_period: 0s

ruler:
storage:
type: local
local:
directory: /loki/rules
rule_path: /loki/rules-temp
alertmanager_url: http://localhost:9093
ring:
kvstore:
store: inmemory
enable_api: true

promtail.yml

这个文件是配置promtail相关的,注释情况如上,更具体的配置请参考promtail 官方文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
server:
http_listen_port: 9080 # 监听端口
grpc_listen_port: 0 # grpc 监听端口

positions:
filename: /tmp/positions.yaml

clients:
- url: http://loki:3100/loki/api/v1/push # loki的通讯地址 与 loki.yml内的监听端口对应

scrape_configs:
- job_name: system
static_configs:
- targets:
- localhost
labels:
job: varlogs
__path__: /var/log/*log # 在此处配置你要监听的日志的地址所存储的地址

docker-compose.yml

最后是lpg的compose文件, 具体每一行做了什么我会将注释写在文件里,如果你完全看不懂docker-compose,可以先尝试学习docker。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
version: '3'

services:
# 日志的存储和解析
loki:
image: grafana/loki
container_name: lpg-promtail
volumes:
# 持久化loki配置 映射的地址可以根据自己的需求更改
- ../data/lpg/loki:/etcd/loki
# loki的配置文件
- ./loki.yml:/etc/loki-conf/loki.yml
command: -config.file=/etcd/loki-conf/loki.yml
ports:
- 3100:3100
networks:
- lpg

# 日志收集器
promtail:
image: grafana/promtail
container_name: lpg-promtail
volumes:
# 你需要将要监听的日志映射入容器内 使用编写软连接等方式将日志收集到一个文件夹内
# 通过promtail.yml对应的检索方式发现日志文件,修改左边的地址到你对应的日志存储地址
- ../data/lpg/logs:/var/log
# 持久化promtail
- ../data/lpg/promtail:/etc/promtail
# promtail的配置文件
- ./promtail.yml:/etc/promtail-conf/promtail.yml
command: -config.file=/etc/promtail-conf/promtail.yml
networks:
- lpg

# 日志看板 可视化
grafana:
image: grafana/grafana
container_name: lpg-grafana
ports:
- 3000:3000
networks:
- lpg

networks:
lpg:

文件结构

  • compose
    • data
      • lpg
        • logs 日志集合
        • loki loki持久化
        • promtail promtail持久化
    • lpg
      • loki.yml loki配置文件
      • docker-compose.yml compose文件
      • promtail.yml # promtail配置文件

这个结构只是一个参考,如果你会用docker,可以自己配置文件的位置和存储的地址等

启动compose文件

在编写完docker-compose文件之后,就可以启动了。需要注意的是,在启动promtail之前,promtail抓取日志的路径下应该至少存在一个符合抓取规则的日志,如果没有找到的话,promtail就会停止抓取。很多人一开始在grafana看板中找不到日志也是这个原因。

配置grafana

启动了compose文件之后,就可以进入grafana看板的地址查看效果了,启动的地址在3000(或者你自己设置的)端口,默认的账户名和密码都是admin。

在登录进入grafana之后,首先你应该配置数据来源(data source),如图1、2、3所示:

配置数据来源1
配置数据来源2
配置数据来源3

在配置完数据来源后,进入Explore查看当前的抓取情况,配置如图4、5、6所示:

配置查看页面4

根据图4的Add query会发现,一个grafana是可以对应多个loki的,对于有分布式部署日志收集需求的,就可以在这里设置多个数据来源来查看

配置查看页面5

点击展开图5的步骤2展开列表,选择你想通过哪种label来查看日志,我这里是通过文件名来筛选,然后在步骤4选择具体的文件。步骤5、6分别是查询的类型和最多限制多少行日志分页。 并且你还可以自己编写resulting selector(点击步骤8可以看到具体的语法详情)

配置查看页面6

在这里就基本大功告成了,你可以看到日志在时间内的产生速率,和各种过滤日志的方法,比如服务的日志存在大量重复的输出,就可以在Dedup(去重)处选择Signature,以特征过滤重复日志。根据图5中的步骤6来设置的每段多少行内容进行的分段等等。在这里还差一点,那就是我想看某一个时间段的日志,或者项目新版本刚上线,需要实时观看日志的产生可不可以呢?当然可以

live stream

在grafana看板右上角有按照你的需求查询和实时查询两个选项。可以通过这两个按钮查找所需时段的日志和实时产生的日志。在这里基本就是lpg的全部功能了。

end

到这里lpg部署的全部工作就完成了,如果你看到了这里,恭喜你学会了一套轻量、横向扩展的日志系统。如果在部署的过程中遇到了跟本文流程冲突的bug或者问题。可以在评论区留言或者通过邮箱联系我,我会尽量解答你的问题~

使用gin作网关, etcd作服务发现实现go-grpc微服务的保姆级教程

在网上有着数不清的帖子教学如何单独搭建grpc与etcd、如何使用go-grpc-gateway等等教程,如果你在读过这一大堆的文章教程之后,
仍然是没有搞懂网关如何接管RESTAPI的请求并转发、服务间如何通信、网关如何鉴权,以及这每一块积木我都知道是做什么的,到底如何
拼起来这样的疑惑,可以通过读完这篇文章,快速构建起整个微服务框架。

Before Started

在开始之前,你应该先有golang的基础,如果还不明白golang怎么用,就先不要看这里了。

如果你想直接看demo,或者后面有减少文章字数需要用到源码的地方,源码,点击获取

etcd

首先,如果你还未搭建起ETCD服务,可以借鉴这份docker-compose,或者百度谷歌上有各种教程。

etcd docker-compose file

protoc

1
2
3
git clone https://github.com/golang/protobuf.git && cd protobuf
go install ./proto
go install ./protoc-gen-go

Let’s Code

首先,你需要两个项目,分别是 gateway 和 ping, 分别是网关和其中的微服务,对于多个微服务之间的通信会在最后提到。

ping 项目结构

1
2
3
4
5
6
7
8
9
10
11
12
13
- discovery 服务注册
- instance.go
- register.go
- resolver.go
- proto proto文件
- pb
- ping.pb.go
- ping.proto
- service
- ping_service.go
- main.go main文件
- go.mod
- go.sum

discovery

discovery文件是用来向ETCD注册服务、keep-alive、获取服务信息等功能的一套示例,对还在学习如何搭建起一套微服务框架的你,最好在之后再去搞懂这个文件的作用,目前先把源码展示出来,将它按照结构复制进去即可,我会在之后的帖子详细讲述这些文件的内容都做了什么

discovery

在后面说到网关项目的时候,会附上一段如果你不想跑封装好的discovery而是使用原生包的源码,可以在gateway参考一下

proto

按照结构创建文件,并在user.proto内写入如下内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
syntax = "proto3";

package ping;

option go_package = "/proto/pb;pb";


service Ping {
rpc Pong (Req) returns (Resp) {}
}

message Req {
string name = 1;
}

message Resp {
string msg = 1;
}

在项目文件夹内命令行输入命令生成pb文件

1
protoc -I . --go_out=plugins=grpc:. proto/pb/*.proto

service

找到pingServer的interface, 并在自己的service文件中补齐interface内的方法

ping.pb.go

1
2
3
type PingServer interface {
Pong(context.Context, *Req) (*Resp, error)
}

写入service/ping_service.go, 并补齐缺失的import、user的model

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package service

import (
"context"
"ping/proto/pb"
)

type PingService struct {
}

func NewPingService() *PingService {
return &PingService{}
}

func (s *PingService) Pong(ctx context.Context, in *pb.Req) (*pb.Resp, error) {

msg := fmt.Sprintf("hello %s", in.Name)

return &pb.Resp{
Msg: msg,
}, nil
}

main.go

最后就是main文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package main

import (
"fmt"
"net"
"ping/discovery"
"ping/proto/pb"
"ping/service"

"github.com/sirupsen/logrus"
"google.golang.org/grpc"
)

const (
// 微服务名称
app = "ping"
// 微服务地址
grpcAddress = "127.0.0.1:10004"
)

func main() {
// etcd 地址
etcdAddress := []string{"127.0.0.1:2379"}

// 服务注册
etcdRegister := discovery.NewRegister(etcdAddress, logrus.New())
defer etcdRegister.Stop()

node := discovery.Server{
Name: app,
Addr: grpcAddress,
}

server := grpc.NewServer()
defer server.Stop()

// 绑定service
pb.RegisterPingServer(server, service.NewPingService())

lis, err := net.Listen("tcp", grpcAddress)
if err != nil {
panic(err)
}
defer lis.Close()

if _, err := etcdRegister.Register(node, 10); err != nil {
panic(fmt.Sprintf("start server failed, err: %v", err))
}

logrus.Info("server started listen on ", grpcAddress)
if err := server.Serve(lis); err != nil {
panic(err)
}
}

至此,如果你跟着完成了ping服务,可以跑起项目看一下,如果你有etcd的看板或者你使用了我的compose文件,可以看到/ping/127.0.0.1:10004下的服务

1
2
3
4
5
6
7
// 权重与版本没有声明,weight用于负载均衡
{
"name":"ping",
"addr":"127.0.0.1:10004",
"version":"",
"weight":0
}

gateway 项目结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
- discovery       服务注册 与ping服务内容一致
- middleware
- jwt.go jwt鉴权中间件
- proto proto文件 该文件是ping项目内的proto文件
- pb
- ping.pb.go
- ping.proto
- routers
- ping.go
- route.go
- utils
- res 一些gin相关返回值的封装 复制或者自己写就好
- jwt.go jwt鉴权
- main.go main文件
- go.mod
- go.sum

看到gateway的项目结构可能你会有两个小问题:

  • 为什么选择在网关处使用gin而不是grpc-gateway
    • 答:是因为gin更符合我以前做单体页面时候的习惯,你可以像以前一样使用中间件、路由,还有比这个更好的吗?
  • 为什么gateway里还有微服务的protobuf文件,我以后的项目也要这样吗?
    • 答:当然不是,我在这里只是偷了个懒,实际生产中你应该将所有的微服务模块上传git服务器,通过包的形式引入各个项目,在服务之间使用服务发现通讯也是如此。假如你有一个非常基础的模块user,模块内有user的model、方法,你更应该将user这个模块通过golang包的形式引入其他项目而不是同时维护多个model(不用想也知道维护多个model的后果是什么吧?)

discovery文件

在ping服务有讲,不再重复

proto文件

引用了ping服务的proto文件

routes

ping.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package routes

import (
"gateway/proto/pb"
"gateway/utils/res"

"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
)

func addPingRoute(rg *gin.RouterGroup) {
opts := []grpc.DialOption{
grpc.WithInsecure(),
// 在这里出现了负载均衡,在完成了整个项目之后你可以创建两个ping服务同时注册
// 同时通过网关请求多次试一试
// 你会发现 一般情况下,两个ping服务会平分请求,也就达到了负载均衡
// 具体均衡规则还可以通过配置weight来实现
grpc.WithBalancerName(roundrobin.Name),
}

// 通过etcd做服务发现 在这里你并不是通过请求ping服务的10004来连接而是通过etcd去寻找ping服务
pingConn, err := grpc.Dial("etcd:///ping", opts...)
if err != nil {
logrus.Errorf("try connect ping service failed")
}
// 不要调用pingConn.Close() 因为addRoute方法只是导入了路由与对应的方法
// 在读取完毕之后会关闭链接 读取完毕之后的请求会无法链接 因为链接已经关闭
// 如果你一定要严谨的关闭 就把它移出去到公共的地方注册链接 并创建方法在优雅关闭内结束

pingClient := pb.NewPingClient(pingConn)

// 在这里 分组 路由的方法等 如果用过单体gin的同学是不是很眼熟了
ping := rg.Group("/ping")
{
ping.GET("", func(ctx *gin.Context) {
name := ctx.Query("name")

req := &pb.Req{
Name: name,
}

resp, err := pingClient.Pong(ctx, req)
if err != nil {
logrus.Errorln("/v1/ping err, err: ", err)
res.InternalError(ctx)
return
}

// 这里是自己封装的方法
res.Ok(ctx, res.OK, resp.Msg)
})
}
}

route.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package routes

import (
"gateway/middleware"
"gateway/utils/res"
"net/http"

"github.com/gin-gonic/gin"
)

// LoadGin 初始化gin 注册路由
func LoadGin() *gin.Engine {

g := gin.Default()

g.Use(middleware.Cors())

getRoute(g)

g.NoRoute(func(ctx *gin.Context) {
res.Error(ctx, http.StatusNotFound, res.UrlNotFound)
})

return g
}

func getRoute(g *gin.Engine) {

v1 := g.Group("/v1")
addPingRoute(v1)

// ...
}

main.go

终于,gateway项目也要完成了(jwt鉴权在跑通gateway与ping之后)

main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
"gateway/discovery"
"gateway/routes"
"gateway/utils"
"net/http"
"time"

"github.com/sirupsen/logrus"
"google.golang.org/grpc/resolver"
)

func main() {

// etcd注册
r := discovery.NewResolver([]string{"localhost:2379"}, logrus.New())
defer r.Close()
resolver.Register(r)

// 初始化gin 加载路由
g := routes.LoadGin()

server := &http.Server{
Addr: ":8080",
Handler: g,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
MaxHeaderBytes: 1 << 20,
}

go func() {
// 优雅关闭http server 源码内有该方法,你也可以直接抹掉这一行
// 为什么要优雅关闭server?
// https://studygolang.com/articles/12600
utils.GracefullyShutdown(server)
}()

logrus.Info("gateway listen on :8080")

if err := server.ListenAndServe(); err != nil {
logrus.Fatal("gateway启动失败, err: ", err)
}
}

现在 跑起gateway与ping服务,用postman等请求工具请求一下localhost:8080/v1/ping?name=jay

或者直接命令行请求

1
curl -v "http:/127.0.0.1:8080/v1/ping?name=jay"

(我这里用的wsl2, 还配置了代理,需要请求windows的地址,所以我习惯用工具请求)

如果你得到的是

1
{"code":1001,"data":"hello jay","msg":"成功"}% 

恭喜你已经打通整个微服务,如果你已经可以熟练的使用gin或者知道如何使用jwt中间件,到这里你就完成了完整的流程。好像还有一点没提到,就是两个微服务之间如何相互请求,答案就在gateway的routes/ping.go文件里,通过服务发现去请求咯。

使用jwt中间件鉴权

如果你还不知道如何使用gin的中间件或者就是想看一下,接下来是如何使用jwt。做法是在网关处统一声明需要和不需要鉴权的路由,在通过http请求的时候,会通过gin来验证权限。而在微服务内部,使用服务发现的形式的请求就不需要鉴权服务了。

middleware/jwt.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package middleware

import (
"gateway/utils"
"gateway/utils/res"
"time"

"github.com/gin-gonic/gin"
)

// JWT token的生成是使用用户名、密码等一系列信息生成 在解密的时候可以拿到这些信息
func JWT() gin.HandlerFunc {
return func(ctx *gin.Context) {
var claims *utils.Claims
var err error

code := res.OK

// 对于我个人喜欢将鉴权的token放到header中的Authorization中
// 如果你有自己的想法,可以在这里获取你将token放的位置
token := ctx.GetHeader("Authorization")
if token == "" {
res.Unauthorized(ctx, res.TokenInvalid)

ctx.Abort()
return
}

// 验证
claims, err = utils.ParseToken(token)
if err != nil {

res.Unauthorized(ctx, res.TokenInvalid)

ctx.Abort()
return
}

// 判断是否过期
if time.Now().Unix() > claims.ExpiresAt {
code = res.TokenExpired
} else if err != nil {
code = res.TokenInvalid
}

if code == res.TokenExpired {
res.Ok__(ctx, res.TokenExpired)
} else if code != res.OK {

res.Unauthorized(ctx, code)

ctx.Abort()
return
}

// 我在这里的操作是将token对应的user放在了上下文内,
// 对于需要获取用户的方法,只通过这里就可以拿到userid
// 而不再需要通过传输参数的方式传入用户信息
ctx.Set("userid", claims.Userid)
ctx.Set("username", claims.Username)
ctx.Next()
}
}

gateway/routes/ping.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package routes

import (
"gateway/middleware"
"gateway/proto/pb"
"gateway/utils/res"

"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
)

func addPingRoute(rg *gin.RouterGroup) {
opts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithBalancerName(roundrobin.Name),
}

// 通过etcd做服务发现
pingConn, err := grpc.Dial("etcd:///ping", opts...)
if err != nil {
logrus.Errorf("try connect ping service failed")
}
// 不要调用pingConn.Close() 因为addRoute方法只是导入了路由与对应的方法
// 在读取完毕之后会关闭链接 在读取完毕之后的请求会无法链接 因为链接已经关闭
// 如果你一定要严谨的关闭 就把它移出去到公共的地方注册链接 并创建方法在优雅关闭内结束链接

pingClient := pb.NewPingClient(pingConn)

ping := rg.Group("/ping")
// 在这里使用中间件
ping.Use(middleware.JWT())
{
ping.GET("", func(ctx *gin.Context) {
name := ctx.Query("name")

req := &pb.Req{
Name: name,
}

resp, err := pingClient.Pong(ctx, req)
if err != nil {
logrus.Errorln("/v1/ping err, err: ", err)
res.InternalError(ctx)
return
}

res.Ok(ctx, res.OK, resp.Msg)
})
}
}

然后再去请求你就可以实现jwt的鉴权服务了,在需要的路由添加鉴权中间件即可。

如果有同学还不知道jwt的token是怎么生成的,就在 “utils/jwt.go” 中,注释写的很全。流程是用户登录之后,你将用户信息生成token返回给用户,之后用户每次请求都将token存在请求头中的Authorization中,就可以实现鉴权了,别忘了在token过期之后通知用户重新获取token~。

end

最后的最后,如果有疑问、觉得我哪里没有讲清楚、自己跑不起来或者报错等等等等,可以留言或者email我。我写how to的唯一原因,就是想干掉所有写的不清不楚、看了等于没看的how to。

  • Copyrights © 2019-2021 Jayj
  • 访问人数: | 浏览次数:

buy me a cup of coffee?

支付宝
微信