龙空技术网

Go 连接池相关总结:HTTP、RPC、Redis 和数据库等

Go语言中文网 3913

前言:

而今看官们对“数据库连接有哪些协议”大概比较关怀,小伙伴们都想要学习一些“数据库连接有哪些协议”的相关文章。那么小编在网上网罗了一些关于“数据库连接有哪些协议””的相关知识,希望你们能喜欢,小伙伴们一起来了解一下吧!

本文作者 Xargin,个人博客:。

http 标准库服务端请求处理

package mainimport ( "io" "log" "net/http")func sayhello(wr http.ResponseWriter, r *http.Request) { wr.Header()["Content-Type"] = []string{"application/json"} io.WriteString(wr, "hello")}func main() { http.HandleFunc("/", sayhello) http.ListenAndServe(":9090", nil)}

1-1

每一个请求启动一个 goroutine,读取完毕之后,调用用户传入的 handler(没有的话就用默认的),在同一连接进行 response 响应。整体上是个 request/response loop 模型。

客户端连接池

type Transport struct { idleMu       sync.Mutex closeIdle    bool                                // user has requested to close all idle conns idleConn     map[connectMethodKey][]*persistConn // most recently used at end idleConnWait map[connectMethodKey]wantConnQueue  // waiting getConns idleLRU      connLRU connsPerHostMu   sync.Mutex connsPerHost     map[connectMethodKey]int connsPerHostWait map[connectMethodKey]wantConnQueue // waiting getConns    // MaxIdleConns controls the maximum number of idle (keep-alive) // connections across all hosts. Zero means no limit. MaxIdleConns int // MaxIdleConnsPerHost, if non-zero, controls the maximum idle // (keep-alive) connections to keep per-host. If zero, // DefaultMaxIdleConnsPerHost is used. MaxIdleConnsPerHost int // MaxConnsPerHost optionally limits the total number of // connections per host, including connections in the dialing, // active, and idle states. On limit violation, dials will block. // // Zero means no limit. MaxConnsPerHost int // IdleConnTimeout is the maximum amount of time an idle // (keep-alive) connection will remain idle before closing // itself. // Zero means no limit. IdleConnTimeout time.Duration}

transport 和 client 是一一对应,每个 tranport 内有自己的 connpool, idleConn 的结构是:map[connectMethodKey][]*persistConn,这个 map 的 key 是个数据结构:

// connectMethodKey is the map key version of connectMethod, with a// stringified proxy URL (or the empty string) instead of a pointer to// a URL.type connectMethodKey struct { proxy, scheme, addr string onlyH1              bool}

proxy 地址 + 协议 + 地址,以及是否只支持 http1,构成该 map 的 key,proxy 地址是完整的 proxy 地址,比如 export HTTP_PROXY=localhost:1081,则该地址为用户提供的字符串。scheme 一般是 http:// 或 https:// 之类的字符串,addr 包含完整的域名(或 IP)和端口。

getConn:

2

在 http2 中,同一个连接可以被重复使用,所以 http2 的逻辑里,该连接被返回后仍然保持在连接池里。是否可以重复使用由 pconn.alt 来决定。

tryPutIdleConn

3

如果有正在等待连接的 goroutine,那么就把这条连接 deliver 给相应的 goroutine,这会触发相应的 ready 操作,使阻塞中的 goroutine 被唤醒继续处理请求。

否则将连接放回到 Transport 的 idleConn 和 idleLRU 中。

readloop 和 writeloop

func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) { go pconn.readLoop() go pconn.writeLoop() return pconn, nil}

所以每个 conn 都会有相应的 readloop 和 writeloop,因此每个连接至少有两个 goroutine。

用户协程在使用 http.Client 发送请求时,一路到 http.Transport.roundTrip -> http.persistConn.roundTrip:

 pc.writech <- writeRequest{req, writeErrCh, continueCh} resc := make(chan responseAndError) pc.reqch <- requestAndChan{  req:        req.Request,  ch:         resc,  addedGzip:  requestedGzip,  continueCh: continueCh,  callerGone: gone, }

在该函数中,将 request 和接收请求的 ch 传入到 reqch,把 writeRequest 写入到 writech。

writeloop 从 writech 中收到了写请求,会把内容写入到 conn 上,这个请求也就发给 server 端了readloop 收到 requestAndChan 结果,上面 writeloop 相当于已经把请求数据发送到 server 端,readloop 这时候可以从 conn 上读出 server 发回的 response 数据,所以 readloop 主要做的就是 ReadResponse,然后把 response 的内容写入到 requestAndChan.ch 中。主协程只要监听 requestAndChan.ch 来接收相应的 response 即可(用 select 同时监听 err、连接关闭等 chan)。

这里 http 标准库的做法要参考一下,把接收数据和相应的错误处理代码可以都集中在一起:

 for {  testHookWaitResLoop()  select {  case err := <-writeErrCh: // 往 server 端写数据异常   if debugRoundTrip {    req.logf("writeErrCh resv: %T/%#v", err, err)   }   if err != nil {    pc.close(fmt.Errorf("write error: %v", err))    return nil, pc.mapRoundTripError(req, startBytesWritten, err)   }   if d := pc.t.ResponseHeaderTimeout; d > 0 {    if debugRoundTrip {     req.logf("starting timer for %v", d)    }    timer := time.NewTimer(d)    defer timer.Stop() // prevent leaks    respHeaderTimer = timer.C   }  case <-pc.closech: // 连接关闭异常   if debugRoundTrip {    req.logf("closech recv: %T %#v", pc.closed, pc.closed)   }   return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)  case <-respHeaderTimer: // 读请求头超时   if debugRoundTrip {    req.logf("timeout waiting for response headers.")   }   pc.close(errTimeout)   return nil, errTimeout  case re := <-resc: // 正常地从 response 的 channel 里读到了响应数据   if (re.res == nil) == (re.err == nil) {    panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))   }   if debugRoundTrip {    req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)   }   if re.err != nil {    return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)   }   return re.res, nil  case <-cancelChan: // 用户侧通过 context 取消了流程   pc.t.CancelRequest(req.Request)   cancelChan = nil  case <-ctxDoneChan: // 这个应该意思差不多   pc.t.cancelRequest(req.Request, req.Context().Err())   cancelChan = nil   ctxDoneChan = nil  } }
http2

4

http2 协议通过 frame 中的 stream id 对请求和响应进行关联。

http2 可以不等待上一个请求响应后再发下一个请求,因此同一个连接上可以实现 multiplexing。标准库中对于 http2 连接的处理复用了 http1 的连接池逻辑,只不过从连接池中取连接时,并没有真的从连接池里把这个连接拿走。获取到的连接依然保留在 connpool 中。

除此之外,h2 的 connpool 和 h1 的没什么区别。

从 idleConn 数组中获取 idle 连接时:

func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {   if delivered {    if pconn.alt != nil {     // HTTP/2: multiple clients can share pconn.     // Leave it in the list.    } else {     // HTTP/1: only one client can use pconn.     // Remove it from the list.     t.idleLRU.remove(pconn)     list = list[:len(list)-1]    }   }

把使用完的连接放回连接池时:

 // HTTP/2 (pconn.alt != nil) connections do not come out of the idle list, // because multiple goroutines can use them simultaneously. // If this is an HTTP/2 connection being “returned,” we're done. if pconn.alt != nil && t.idleLRU.m[pconn] != nil {  return nil } if pconn.alt == nil {   // HTTP/1.   // Loop over the waiting list until we find a w that isn't done already, and hand it pconn.   for q.len() > 0 {    w := q.popFront()    if w.tryDeliver(pconn, nil) {     done = true     break    }   }  } else {   // HTTP/2.   // Can hand the same pconn to everyone in the waiting list,   // and we still won't be done: we want to put it in the idle   // list unconditionally, for any future clients too.   for q.len() > 0 {    w := q.popFront()    w.tryDeliver(pconn, nil)   }  }
如果 LRU 列表非空,说明当前没有等待的 goroutine,而在获取 http2 连接时,并没有把连接从连接池中真地拿走,所以直接返回就行了。如果 LRU 列表为空,这条可能是新建的连接,需要把 waitqueue 弹到空,并把当前这条连接放进连接池。fasthttp服务端请求处理

5

fasthttp 的 server 端使用 worker pool 来进行 goroutine 复用,不会频繁创建新的 g。

workerPool.workerFunc 就是每个 worker 的主循环:

func (wp *workerPool) workerFunc(ch *workerChan) { var c net.Conn for c = range ch.ch {  if c == nil {   break  }  wp.WorkerFunc(c) } wp.lock.Lock() wp.workersCount-- wp.lock.Unlock()}

每次 serve 新的 conn 时:

从 workerpool 中获取一个 worker,没有就新建,启动 workerFunc 主循环,监听 worker channel。把当前 serve 的新连接发送到 worker channelworkerFunc 获取到新 conn,即开始请求处理流程。执行 fasthttp.Server.serveConn客户端连接池

type HostClient struct { // Maximum number of connections which may be established to all hosts // listed in Addr. // // You can change this value while the HostClient is being used // using HostClient.SetMaxConns(value) // // DefaultMaxConnsPerHost is used if not set. MaxConns int // Keep-alive connections are closed after this duration. // // By default connection duration is unlimited. MaxConnDuration time.Duration // Idle keep-alive connections are closed after this duration. // // By default idle connections are closed // after DefaultMaxIdleConnDuration. MaxIdleConnDuration time.Duration // Maximum number of attempts for idempotent calls // // DefaultMaxIdemponentCallAttempts is used if not set. MaxIdemponentCallAttempts int        // Maximum duration for waiting for a free connection. // // By default will not waiting, return ErrNoFreeConns immediately MaxConnWaitTimeout time.Duration clientName  atomic.Value lastUseTime uint32 connsLock  sync.Mutex connsCount int conns      []*clientConn connsWait  *wantConnQueue}
acquireConn

6

流程比较简单,如果当前 client.conns 数组 > 0,说明有空闲连接,直接取最后一个元素就好,这个元素一般是最近放进去的连接。

releaseConn

func (c *HostClient) releaseConn(cc *clientConn) { cc.lastUseTime = time.Now() if c.MaxConnWaitTimeout <= 0 {  c.connsLock.Lock()  c.conns = append(c.conns, cc)  c.connsLock.Unlock()  return } // try to deliver an idle connection to a *wantConn c.connsLock.Lock() defer c.connsLock.Unlock() delivered := false if q := c.connsWait; q != nil && q.len() > 0 {  for q.len() > 0 {   w := q.popFront()   if w.waiting() {    delivered = w.tryDeliver(cc, nil)    break   }  } } if !delivered {  c.conns = append(c.conns, cc) }

releaseConn 会先尽量尝试把当前的连接给正在等待连接的请求(wantConn),弹出等待队列(connsWait)的第一个元素。并把连接转交给该请求。如果该请求的状态已经不是 waiting 了,则继续弹出,直到找到了合适的来接盘,或者等待队列弹空。

如果没有顺利地把连接交出去,把当前连接入空闲连接数组(c.conns)。

需要注意 fasthttp 里的 conns 是连接池,clientConnPool 是 clientConn 对象的对象池。

与标准库中的 client 不同的是,fasthttp 中没有 read write loop,所以每个请求是在当前协程中完成的:

把 request 的 header 和 body 写入到 conn从 conn 中读取 response释放连接、缓存各种过程中生成的 struct 对象gRPC服务端

gRPC 底层基于 http2,所以交互基于 http2 stream,服务端整体流程与 http2 没什么区别。

客户端

在 gRPC 中,客户端没有使用连接池,直接使用了 http2 连接:

Invoke-> invoke -> newClientStream -> newAttemptLocked -> getTransport -> blockingpiker.pick ->  getReadyTransport ->  addrConn.connect -> go ac.resetTransport()

然后一路走到创建 http2Client。

(dlv) bt0  0x00000000013e2539 in google.golang.org/grpc/internal/transport.newHTTP2Client   at /Users/xargin/go/src/google.golang.org/grpc/internal/transport/http2_client.go:1671  0x000000000145a5ca in google.golang.org/grpc/internal/transport.NewClientTransport   at /Users/xargin/go/src/google.golang.org/grpc/internal/transport/transport.go:5752  0x000000000145a5ca in google.golang.org/grpc.(*addrConn).createTransport   at /Users/xargin/go/src/google.golang.org/grpc/clientconn.go:12753  0x0000000001459e25 in google.golang.org/grpc.(*addrConn).tryAllAddrs   at /Users/xargin/go/src/google.golang.org/grpc/clientconn.go:12054  0x00000000014593b7 in google.golang.org/grpc.(*addrConn).resetTransport   at /Users/xargin/go/src/google.golang.org/grpc/clientconn.go:11205  0x000000000105b811 in runtime.goexit   at /usr/local/go/src/runtime/asm_amd64.s:1357
thrift

thrift 官方没有连接池,client 中生成的 seqid 只是用来和服务端返回的 rseqid 进行匹配。

func (p *TStandardClient) Recv(iprot TProtocol, seqId int32, method string, result TStruct) error { rMethod, rTypeId, rSeqId, err := iprot.ReadMessageBegin() if err != nil {  return err } if method != rMethod {  return NewTApplicationException(WRONG_METHOD_NAME, fmt.Sprintf("%s: wrong method name", method)) } else if seqId != rSeqId {  return NewTApplicationException(BAD_SEQUENCE_ID, fmt.Sprintf("%s: out of order sequence response", method)) } else if rTypeId == EXCEPTION {  var exception tApplicationException  if err := exception.Read(iprot); err != nil {   return err  }  if err := iprot.ReadMessageEnd(); err != nil {   return err  }  return &exception } else if rTypeId != REPLY {  return NewTApplicationException(INVALID_MESSAGE_TYPE_EXCEPTION, fmt.Sprintf("%s: invalid message type", method)) } if err := result.Read(iprot); err != nil {  return err } return iprot.ReadMessageEnd()}

thrift 的每个 client 对象中包裹了一个 transport:

 ... useTransport, err := transportFactory.GetTransport(transport) client := NewEchoClientFactory(useTransport, protocolFactory) if err := transport.Open(); err != nil {  fmt.Fprintln(os.Stderr, "Error opening socket to 127.0.0.1:9898", " ", err)  os.Exit(1) } defer transport.Close() req := &EchoReq{Msg: "You are welcome."} res, err := client.Echo(context.TODO(), req) ...type EchoClient struct { c thrift.TClient}func NewEchoClientFactory(t thrift.TTransport, f thrift.TProtocolFactory) *EchoClient { return &EchoClient{  c: thrift.NewTStandardClient(f.GetProtocol(t), f.GetProtocol(t)), }}

这个包裹的 transport 就是一条单独的 tcp 连接,没有连接池。

redigo

redigo 是个 client 库,没有服务端:

type Pool struct { // Dial is an application supplied function for creating and configuring a // connection. // // The connection returned from Dial must not be in a special state // (subscribed to pubsub channel, transaction started, ...). Dial func() (Conn, error) // DialContext is an application supplied function for creating and configuring a // connection with the given context. // // The connection returned from Dial must not be in a special state // (subscribed to pubsub channel, transaction started, ...). DialContext func(ctx context.Context) (Conn, error) // TestOnBorrow is an optional application supplied function for checking // the health of an idle connection before the connection is used again by // the application. Argument t is the time that the connection was returned // to the pool. If the function returns an error, then the connection is // closed. TestOnBorrow func(c Conn, t time.Time) error // Maximum number of idle connections in the pool. MaxIdle int // Maximum number of connections allocated by the pool at a given time. // When zero, there is no limit on the number of connections in the pool. MaxActive int // Close connections after remaining idle for this duration. If the value // is zero, then idle connections are not closed. Applications should set // the timeout to a value less than the server's timeout. IdleTimeout time.Duration // If Wait is true and the pool is at the MaxActive limit, then Get() waits // for a connection to be returned to the pool before returning. Wait bool // Close connections older than this duration. If the value is zero, then // the pool does not close connections based on age. MaxConnLifetime time.Duration chInitialized uint32 // set to 1 when field ch is initialized mu           sync.Mutex    // mu protects the following fields closed       bool          // set to true when the pool is closed. active       int           // the number of open connections in the pool ch           chan struct{} // limits open connections when p.Wait is true idle         idleList      // idle connections waitCount    int64         // total number of connections waited for. waitDuration time.Duration // total time waited for new connections.}
客户端:

redigo 的客户端需要显式声明并初始化内部的 pool:

func newPool(addr string) *redis.Pool {    return &redis.Pool{        MaxIdle: 3,        IdleTimeout: 240 * time.Second,        // Dial or DialContext must be set. When both are set, DialContext takes precedence over Dial.        Dial: func () (redis.Conn, error) { return redis.Dial("tcp", addr) },    }}

初始化时可以提供 TestOnBorrow 的行为:

pool := &redis.Pool{  // Other pool configuration not shown in this example.  TestOnBorrow: func(c redis.Conn, t time.Time) error {    if time.Since(t) < time.Minute {      return nil    }    _, err := c.Do("PING")    return err  },}

使用时也需要用户显式地 defer Close:

func serveHome(w http.ResponseWriter, r *http.Request) {    conn := pool.Get()    defer conn.Close()    ...}
pool.Get

7

用户需要设置 pool.Wait 是否等待,如果 Waittrue,则在没有连接可用时,会阻塞等待。如果 Waitfalse,且连接已到达阈值 pool.MaxActive,则直接返回错误 ErrPoolExhausted。

activeConn.Close

func (ac *activeConn) Close() error { pc := ac.pc if pc == nil {  return nil } ac.pc = nil if ac.state&connectionMultiState != 0 {  pc.c.Send("DISCARD")  ac.state &^= (connectionMultiState | connectionWatchState) } else if ac.state&connectionWatchState != 0 {  pc.c.Send("UNWATCH")  ac.state &^= connectionWatchState } if ac.state&connectionSubscribeState != 0 {  pc.c.Send("UNSUBSCRIBE")  pc.c.Send("PUNSUBSCRIBE")  // To detect the end of the message stream, ask the server to echo  // a sentinel value and read until we see that value.  sentinelOnce.Do(initSentinel)  pc.c.Send("ECHO", sentinel)  pc.c.Flush()  for {   p, err := pc.c.Receive()   if err != nil {    break   }   if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) {    ac.state &^= connectionSubscribeState    break   }  } } pc.c.Do("") ac.p.put(pc, ac.state != 0 || pc.c.Err() != nil) return nil}

close 时会把这个 activeConn 放回连接池。

go-redis/redis

这个 redis 库屏蔽了连接池逻辑,用户侧基本不用关心连接,初始化时,传入连接池相关配置:

 rdb := redis.NewClient(&redis.Options{  Addr:     "localhost:6379", // use default Addr  Password: "",               // no password set  DB:       0,                // use default DB })func NewClient(opt *Options) *Client { opt.init() c := Client{  baseClient: newBaseClient(opt, newConnPool(opt)),  ctx:        context.Background(), } c.cmdable = c.Process return &c}func newConnPool(opt *Options) *pool.ConnPool { return pool.NewConnPool(&pool.Options{  Dialer: func(ctx context.Context) (net.Conn, error) {   return opt.Dialer(ctx, opt.Network, opt.Addr)  },  PoolSize:           opt.PoolSize,  MinIdleConns:       opt.MinIdleConns,  MaxConnAge:         opt.MaxConnAge,  PoolTimeout:        opt.PoolTimeout,  IdleTimeout:        opt.IdleTimeout,  IdleCheckFrequency: opt.IdleCheckFrequency, })}func (c *baseClient) _process(ctx context.Context, cmd Cmder) error { var lastErr error for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {  if attempt > 0 {   if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {    return err   }  }  retryTimeout := true  lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {   err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {    return writeCmd(wr, cmd)   })   if err != nil {    return err   }   err = cn.WithReader(ctx, c.cmdTimeout(cmd), cmd.readReply)   if err != nil {    retryTimeout = cmd.readTimeout() == nil    return err   }   return nil  })  if lastErr == nil || !isRetryableError(lastErr, retryTimeout) {   return lastErr  } } return lastErr}func (c *baseClient) withConn( ctx context.Context, fn func(context.Context, *pool.Conn) error,) error { cn, err := c.getConn(ctx) if err != nil {  return err } defer func() {  c.releaseConn(cn, err) }() err = fn(ctx, cn) return err

连接池维护的逻辑和其它库差不多。与其它库不同的是,该库会保证 idle 的 conns 维持在 MinIdleConn 配置数量之上,不足的话,会在后台补充:

func (p *ConnPool) checkMinIdleConns() { if p.opt.MinIdleConns == 0 {  return } for p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns {  p.poolSize++  p.idleConnsLen++  go func() {   err := p.addIdleConn()   if err != nil {    p.connsMu.Lock()    p.poolSize--    p.idleConnsLen--    p.connsMu.Unlock()   }  }() }}
database/sql

这里的连接池与 RPC 系列的稍有区别,取的是 freeConns 的第一个,并且有一个可能效率比较低的 copy 过程:

 // Prefer a free connection, if possible. numFree := len(db.freeConn) if strategy == cachedOrNewConn && numFree > 0 {  conn := db.freeConn[0]  copy(db.freeConn, db.freeConn[1:])  db.freeConn = db.freeConn[:numFree-1]  conn.inUse = true  db.mu.Unlock()  if conn.expired(lifetime) {   conn.Close()   return nil, driver.ErrBadConn  }  // Lock around reading lastErr to ensure the session resetter finished.  conn.Lock()  err := conn.lastErr  conn.Unlock()  if err == driver.ErrBadConn {   conn.Close()   return nil, driver.ErrBadConn  }  return conn, nil 

其它的没啥特殊的。

标签: #数据库连接有哪些协议