龙空技术网

go语言实现一个简单的简单网关

地鼠文档 1695

前言:

而今同学们对“nginxurlparse”大约比较关心,你们都想要剖析一些“nginxurlparse”的相关文章。那么小编也在网络上搜集了一些对于“nginxurlparse””的相关资讯,希望各位老铁们能喜欢,我们快快来学习一下吧!

网关=反向代理+负载均衡+各种策略,技术实现也有多种多样,有基于 nginx 使用 lua 的实现,比如 openresty、kong;也有基于 zuul 的通用网关;还有就是 golang 的网关,比如 tyk。

这篇文章主要是讲如何基于 golang 实现一个简单的网关。

转自:troy.wang/docs/golang/posts/golang-gateway/

整理:go语言钟文文档:

1. 预备1.1. 准备两个后端 web 服务

启动两个后端 web 服务(代码)

type RealServer struct { Addr string}func (r *RealServer) Run() { log.Println("start http server at " + r.Addr) mux := http.NewServeMux() mux.HandleFunc("/", r.EchoHandler) mux.HandleFunc("/base/error", r.ErrorHandler) mux.HandleFunc("/timeout", r.TimeoutHandler) server := &http.Server{  Addr:         r.Addr,  WriteTimeout: time.Second * 3,  Handler:      mux, } go func() {  log.Fatal(server.ListenAndServe()) }()}func (r *RealServer) EchoHandler(w http.ResponseWriter, req *http.Request) { upath := fmt.Sprintf("\n", r.Addr, req.URL.Path) realIP := fmt.Sprintf("RemoteAddr=%s,X-Forwarded-For=%v,X-Real-Ip=%v\n", req.RemoteAddr, req.Header.Get("X-Forwarded-For"), req.Header.Get("X-Real-Ip")) header := fmt.Sprintf("headers =%v\n", req.Header) io.WriteString(w, upath) io.WriteString(w, realIP) io.WriteString(w, header)}func (r *RealServer) ErrorHandler(w http.ResponseWriter, req *http.Request) { w.WriteHeader(500) io.WriteString(w, "error handler")}func (r *RealServer) TimeoutHandler(w http.ResponseWriter, req *http.Request) { time.Sleep(6 * time.Second) w.WriteHeader(200) io.WriteString(w, "timeout handler")}func main() { rs1 := &RealServer{Addr: "127.0.0.1:2003"} rs1.Run() rs2 := &RealServer{Addr: "127.0.0.1:2004"} rs2.Run() quit := make(chan os.Signal) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit}
1.2. 访问工具

这里使用命令行工具进行测试

curl -v 
2. 反向代理2.1. 单后端(target)反向代理

具体代码

package mainimport ( "log" "net/http" "net/http/httputil" "net/url")var ( addr = "127.0.0.1:2002")func main()  { rsUrl, _:=url.Parse(";) reversePorxy := httputil.NewSingleHostReverseProxy(rsUrl) log.Println("Starting Httpserver at " + addr) log.Fatal(http.ListenAndServe(addr, reversePorxy))}

直接使用基础库 httputil 提供的NewSingleHostReverseProxy即可,返回的reverseProxy对象实现了serveHttp方法,因此可以直接作为 handler。

2.2. 分析反向代理代码,并添加修改 response 内容

具体代码

package mainimport ( "bytes" "fmt" "io/ioutil" "log" "net/http" "net/http/httputil" "net/url" "strings")var ( addr = "127.0.0.1:2002")func main()  { rsUrl, _:=url.Parse(";) reversePorxy := NewSingleHostReverseProxy(rsUrl) log.Println("Starting httpserver at " + addr) log.Fatal(http.ListenAndServe(addr, reversePorxy))}func NewSingleHostReverseProxy(target *url.URL) *httputil.ReverseProxy { targetQuery := target.RawQuery director := func(req *http.Request) {  req.URL.Scheme = target.Scheme  req.URL.Host = target.Host  req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)  if targetQuery == "" || req.URL.RawQuery == "" {   req.URL.RawQuery = targetQuery + req.URL.RawQuery  } else {   req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery  }  if _, ok := req.Header["User-Agent"]; !ok {   // explicitly disable User-Agent so it's not set to default value   req.Header.Set("User-Agent", "")  }  // add when the reverseproxy is the first rp  req.Header.Set("X-Real-Ip", strings.Split(req.RemoteAddr, ":")[0]) } modifyFunc := func(res *http.Response) error {  if res.StatusCode != http.StatusOK {      oldPayLoad, err := ioutil.ReadAll(res.Body)      if err != nil {       return err      }      newPayLoad := []byte("hello " + string(oldPayLoad))      res.Body = ioutil.NopCloser(bytes.NewBuffer(newPayLoad))      res.ContentLength = int64(len(newPayLoad))      res.Header.Set("Content-Length",fmt.Sprint(len(newPayLoad)))  }  return nil } return &httputil.ReverseProxy{Director: director, ModifyResponse: modifyFunc}}func singleJoiningSlash(a, b string) string { aslash := strings.HasSuffix(a, "/") bslash := strings.HasPrefix(b, "/") switch { case aslash && bslash:  return a + b[1:] case !aslash && !bslash:  return a + "/" + b } return a + b}

director中定义回调函数,入参为*http.Request,决定如何构造向后端的请求,比如 host 是否向后传递,是否进行 url 重写,对于 header 的处理,后端 target 的选择等,都可以在这里完成。

director在这里具体做了:

根据后端 target,构造到后端请求的 url选择性传递必要的 header设置代理相关的 header,比如X-Forwarded-For、X-Real-IpX-Forwarded-For记录经过的所有代理,以proxyIp01, proxyIp02, proxyIp03的格式记录,由于是追加,可能被篡改,当然,如果第一代理以覆盖该头的方式进行记录,也是可信的X-Real-Ip用于记录客户端 IP,一般放在第一代理上,用于记录客户端的来源公网 IP,可信

modifyResponse中定义回调函数,入参为*http.Response,用于修改响应的信息,比如响应的 Body,响应的 Header 等信息。

最终依旧是返回一个ReverseProxy,然后将这个对象作为 handler 传入即可。

2.3. 支持多个后端服务器

参考 2.2 中的NewSingleHostReverseProxy,只需要实现一个类似的、支持多 targets 的方法即可,具体实现见后面。

3. 负载均衡

作为一个网关服务,在上面 2.3 的基础上,需要支持必要的负载均衡策略,比如:

随机轮询加权轮询一致性 hash3.1. 负载均衡算法3.1.1. 随机

随便 random 一个整数作为索引,然后取对应的地址即可,实现比较简单。

具体代码

type RandomN struct { rss []string}func (r *RandomN) Add(params ...string) error { if len(params) != 1 {  return fmt.Errorf("param length should be one") } r.rss = append(r.rss, params[0]) return nil}func (r *RandomN) Next() string { if len(r.rss) == 0 {  return "" } return r.rss[rand.Intn(len(r.rss))]}func (r *RandomN) Get(key string) (string, error) { return r.Next(), nil}
3.1.2. 轮询

使用curIndex进行累加计数,一旦超过 rss 数组的长度,则重置。

具体代码

type RR struct { curIndex int rss []string}func (r *RR) Add(params ...string) error { if len(params) != 1 {  return fmt.Errorf("param length should be one") } r.rss = append(r.rss, params[0]) return nil}func (r *RR) Next() string { if len(r.rss) == 0 {  return "" } if r.curIndex == len(r.rss) {  r.curIndex = 0 } node := r.rss[r.curIndex] r.curIndex++ return node}func (r *RR) Get(key string) (string, error) { return r.Next(), nil}
3.1.3. 加权轮询

轮询带权重,如果使用计数递减的方式,如果权重是5,1,1那么后端 rs 依次为a,a,a,a,a,b,c,a,a,a,a…,其中 a 后端会瞬间压力过大;参考 nginx 内部的加权轮询,或者应该称之为平滑加权轮询,思路是:

后端真实节点包含三个权重:

本身权重 weight —— 设置的权重有效权重 effectiveWeight —— 根据后端节点健康状态动态变化,当异常时,减一;当正常时,加一,最多到 weight 值当前权重 curWeight —— 初始值为 weight,计算时curWeight += effectiveWeight,如果curWeight最大,则被选中,然后curWeight -= total

操作步骤:

计算 curWeight选取最大 curWeight 的节点重新计算 curWeight

具体代码

type WeightedRR struct { rss []*WeightedNode}type WeightedNode struct { addr string weight int curWeight int effectiveWeight int}func (r *WeightedRR) Add(params ...string) error { if len(params) != 2 {  return fmt.Errorf("param length should be two") } addr := params[0] weight, err := strconv.ParseInt(params[1], 10, 64) if err != nil {  return err } node := &WeightedNode{  addr:            addr,  weight:          int(weight),  curWeight:       int(weight),  effectiveWeight: int(weight), } r.rss = append(r.rss, node) return nil}func (r *WeightedRR) Next() string { // 平滑加权轮询 --> 1 计算 total, 2 变更临时权重 3. 选择最大临时权重 4。 变更临时权重 total := 0 var best *WeightedNode for _, node := range r.rss {  n := node  total += n.effectiveWeight  n.curWeight += n.effectiveWeight  if best == nil || n.curWeight > best.curWeight {   best = n  } } if best == nil {  return "" } best.curWeight -= total return best.addr}func (r *WeightedRR) Get(key string) (string, error) { return r.Next(), nil}
3.1.4. 一致性 hash

一致性 hash 算法,主要是用于分布式 cache 热点/命中问题;这里用于基于某 key 的 hash 值,路由到固定后端,但是只能是基本满足流量绑定,一旦后端目标节点故障,会自动平移到环上最近的那么个节点。

实现:

首先存在一个环,环上的每个点都能被选择的 hash 函数映射到然后将后端真实节点+序号(副本数)映射到环上当请求进来的时候,使用某*特定组成的 key *代入 hash 函数计算得到一个位置如果 key 是由 url 组成,那就是** url hash**如果 key 是由 remoteIp 组成,那么就是** IP hash**使用二分查找,找到其在环上的下一个节点

具体代码

type Keys []uint32func (k Keys) Less(i,  j int) bool { return k[i] < k[j]}func (k Keys) Swap(i, j int)  { k[i], k[j] = k[j], k[i]}func (k Keys) Len() int { return len(k)}type ConsistentHash struct { mux sync.RWMutex hash func(data []byte) uint32 replicas int keys Keys hashMap map[uint32]string}func NewConsistentHash(replicas int, fn func(data []byte) uint32) *ConsistentHash { m := &ConsistentHash{  hash:     fn,  replicas: replicas,  hashMap:  make((map[uint32]string)), } if m.hash == nil {  m.hash = crc32.ChecksumIEEE } return m}func (c *ConsistentHash) Add(params ...string) error { if len(params) == 0 {  return errors.New("param len 1 at least") } addr := params[0] c.mux.Lock() defer c.mux.Unlock() for i := 0; i < c.replicas; i++ {  hash := c.hash([]byte(strconv.Itoa(i) + addr))  c.keys = append(c.keys, hash)  c.hashMap[hash] = addr } sort.Sort(c.keys) return nil}func (c *ConsistentHash) IsEmpty() bool { return len(c.keys) == 0}func (c *ConsistentHash) Get(key string) (string, error) { if c.IsEmpty() {  err := fmt.Errorf("nodes empty")  return "", err } hash := c.hash([]byte(key)) idx := sort.Search(len(c.keys), func(i int) bool {  return c.keys[i] >= hash }) if idx == len(c.keys) {  idx = 0 } return c.hashMap[c.keys[idx]], nil}
3.2. 通用接口/工厂模式
type LoadBalanceStrategy interface { Add(...string) error Get(string) (string, error)}

每一种不同的负载均衡算法,只需要实现添加以及获取的接口即可。

type LbType intconst ( LbRandom LbType = iota LbRoundRobin LbWeightRoundRobin LbConsistentHash)func LoadBanlanceFactory(lbType LbType) LoadBalanceStrategy { switch lbType { case LbRandom:  return &RandomN{} case LbConsistentHash:  return NewConsistentHash(10, nil) case LbRoundRobin:  return &RR{} case LbWeightRoundRobin:  return &WeightedRR{} default:  return &RR{} }}

然后使用工厂方法,根据传入的参数,决定使用哪种负载均衡策略。

3.3. 支持负载均衡算法的反向代理实现使用LoadBanlanceFactory工厂函数,传入负载均衡类型,获取负载均衡对象添加后端真实节点然后初始化NewMultiTargetsReverseProxy,在 director 回调函数中,根据负载均衡策略获取要请求的后端真实节点剩下的逻辑同2.2

具体代码

func NewMultiTargetsReverseProxy(lb lb_strategy.LoadBalanceStrategy) *httputil.ReverseProxy { director := func(req *http.Request) {  remoteIP := strings.Split(req.RemoteAddr, ":")[0]  nextAddr, err := lb.Get(remoteIP)  if err != nil {   log.Fatal("get next addr fail")  }  target, err := url.Parse(nextAddr)  if err != nil {   log.Fatal(err)  }  targetQuery := target.RawQuery  req.URL.Scheme = target.Scheme  req.URL.Host = target.Host  req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)  if targetQuery == "" || req.URL.RawQuery == "" {   req.URL.RawQuery = targetQuery + req.URL.RawQuery  } else {   req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery  }  if _, ok := req.Header["User-Agent"]; !ok {   req.Header.Set("User-Agent", "user-agent")  } } modifyFunc := func(resp *http.Response) error {  //请求以下命令:curl ';  if resp.StatusCode != 200 {   //获取内容   oldPayload, err := ioutil.ReadAll(resp.Body)   if err != nil {    return err   }   //追加内容   newPayload := []byte("StatusCode error:" + string(oldPayload))   resp.Body = ioutil.NopCloser(bytes.NewBuffer(newPayload))   resp.ContentLength = int64(len(newPayload))   resp.Header.Set("Content-Length", strconv.FormatInt(int64(len(newPayload)), 10))  }  return nil } errFunc := func(w http.ResponseWriter, r *http.Request, err error) {  //todo 如果是权重的负载则调整临时权重  http.Error(w, "ErrorHandler error:"+err.Error(), 500) } return &httputil.ReverseProxy{Director: director, Transport: transport, ModifyResponse: modifyFunc, ErrorHandler: errFunc}}func singleJoiningSlash(a, b string) string { aslash := strings.HasSuffix(a, "/") bslash := strings.HasPrefix(b, "/") switch { case aslash && bslash:  return a + b[1:] case !aslash && !bslash:  return a + "/" + b } return a + b}func main()  { rb := lb_strategy.LoadBanlanceFactory(lb_strategy.LbConsistentHash) rb.Add(";) rb.Add(";) rb.Add(";) proxy := NewMultiTargetsReverseProxy(rb) log.Println("Starting httpserver at " + addr) log.Fatal(http.ListenAndServe(addr, proxy))}
4. 中间件

作为网关,中间件必不可少,这类包括请求响应的模式,一般称作洋葱模式,每一层都是中间件,一层层进去,然后一层层出来。

中间件的实现一般有两种,一种是使用数组,然后配合 index 计数;一种是链式调用。

4.1. 基于数组的中间件实现NewSliceRouterHandler 获取SliceRouterHandler对象,该对象实现了Hanlder接口,可以作为handler传入 http 服务ServeHTTP方法中,调用newSliceRouterContext初始化1. SliceRouterContext,并且根据req中的 url,按照最长 url 前缀匹配的规则寻找groups中满足条件的SliceGroup丢给SliceRouterContextServeHTTP方法中,调用Next方法开始整个handlers数组的handler调用SliceRouterHandler包含coreFunc以及SliceRouter 对象SliceRouter包含SliceGroup列表SliceGroup对象包含path以及handlers使用Use方法来添加中间件,并且去重添加到SliceRouter中的groups中去使用Group方法初始化一个SliceGroup贯穿整条调用链的是SliceRouterContext对象,包含:SliceGroup指针ResponseWriterRequest指针Contextindex索引中间件中可以调用SliceRouterContext中的Next方法继续,也可以调用Abort方法进行终止Abort终止的方式就是设置索引 index 为abortIndex

具体代码

const abortIndex int8 = math.MaxInt8 / 2type HandlerFunc func(*SliceRouterContext)type SliceRouter struct {   groups []* SliceGroup}type SliceGroup struct {   *SliceRouter   path string   handlers []HandlerFunc}// slice router contexttype SliceRouterContext struct {   *SliceGroup   RespW http.ResponseWriter   Req *http.Request   Ctx context.Context   index int8}func newSliceRouterContext(rw http.ResponseWriter, req *http.Request, r *SliceRouter) *SliceRouterContext  {   newSliceGroup := &SliceGroup{}   matchUrlLen := 0   for _, group := range r.groups {      if strings.HasPrefix(req.RequestURI, group.path) {         pathLen := len(group.path)         if pathLen > matchUrlLen {            matchUrlLen = pathLen            *newSliceGroup = *group //浅拷贝数组指针         }      }   }   c := &SliceRouterContext{RespW: rw, Req: req, SliceGroup: newSliceGroup, Ctx: req.Context()}   c.Reset()   return c}// 获取上下文值func (ctx *SliceRouterContext) Get(key interface{}) interface{} {   return ctx.Ctx.Value(key)}// 设置上下文值func (ctx *SliceRouterContext) Set(key, val interface{}) {   ctx.Ctx = context.WithValue(ctx.Ctx, key, val)}//func (ctx *SliceRouterContext) Next()  {   ctx.index++   for ctx.index < int8(len(ctx.groups)) {      ctx.handlers[ctx.index](ctx)      ctx.index++   }}// 重置 handlers 数组计数func (ctx *SliceRouterContext) Reset()  {   ctx.index = -1}func (ctx *SliceRouterContext) Abort() {   ctx.index = abortIndex}// 是否跳过了回调func (ctx *SliceRouterContext) IsAborted() bool {   return ctx.index >= abortIndex}// sliceRouterHandlertype SliceRouterHandler struct {   coreFunc func(*SliceRouterContext) http.Handler   router   *SliceRouter}func NewSliceRouterHandler(coreFunc func(*SliceRouterContext) http.Handler, router *SliceRouter) *SliceRouterHandler {   return &SliceRouterHandler{      coreFunc: coreFunc,      router:   router,   }}func (w *SliceRouterHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {   c := newSliceRouterContext(rw, req, w.router)   if w.coreFunc != nil {      c.handlers = append(c.handlers, func(c *SliceRouterContext) {         w.coreFunc(c).ServeHTTP(rw, req)      })   }   c.Reset()   c.Next()}// 构造 routerfunc NewSliceRouter() *SliceRouter {   return &SliceRouter{}}// 创建 Groupfunc (g *SliceRouter) Group(path string) *SliceGroup {   return &SliceGroup{      SliceRouter: g,      path:        path,   }}// 构造回调方法func (g *SliceGroup) Use(middlewares ...HandlerFunc) *SliceGroup {   g.handlers = append(g.handlers, middlewares...)   existsFlag := false   for _, oldGroup := range g.SliceRouter.groups {      if oldGroup == g {         existsFlag = true      }   }   if !existsFlag {      g.SliceRouter.groups = append(g.SliceRouter.groups, g)   }   return g}``` tracelog 中间件 具体代码```gofunc TraceLogSliceMiddleware() func(c *SliceRouterContext) {   return func(c *SliceRouterContext) {      log.Println("trace_in")      c.Abort()      log.Println("trace_out")   }}``` 中间件使用 具体代码```govar addr = "127.0.0.1:2002"func main() {   reverseProxy := func(c *middleware.SliceRouterContext) http.Handler {      rs1 := ";      url1, err1 := url.Parse(rs1)      if err1 != nil {         log.Println(err1)      }      rs2 := ";      url2, err2 := url.Parse(rs2)      if err2 != nil {         log.Println(err2)      }      urls := []*url.URL{url1, url2}      return proxy.NewMultipleHostsReverseProxy(c, urls)   }   log.Println("Starting httpserver at " + addr)   sliceRouter := middleware.NewSliceRouter()   sliceRouter.Group("/base").Use(middleware.TraceLogSliceMiddleware(), func(c *middleware.SliceRouterContext) {      c.RespW.Write([]byte("test func"))   })   sliceRouter.Group("/").Use(middleware.TraceLogSliceMiddleware(), func(c *middleware.SliceRouterContext) {      fmt.Println("reverseProxy")      reverseProxy(c).ServeHTTP(c.RespW, c.Req)   })   routerHandler := middleware.NewSliceRouterHandler(nil, sliceRouter)   log.Fatal(http.ListenAndServe(addr, routerHandler))}

标签: #nginxurlparse