龙空技术网

endless 如何实现不停机重启 Go 程序?

柔情寂寞如烟 594

前言:

此时朋友们对“php平滑重启”大约比较讲究,咱们都想要分析一些“php平滑重启”的相关资讯。那么小编同时在网络上搜集了一些关于“php平滑重启””的相关文章,希望大家能喜欢,小伙伴们一起来了解一下吧!

前几篇文章讲解了如何实现一个高效的 HTTP 服务,这次我们来看一下如何实现一个永不不停机的 Go 程序。

前提#

事情是这样的,在一天风和日丽的周末,我正在看 TiDB 源码的时候,有一位胖友找到我说,Go 是不是每次修改都需要重启才行?由于我才疏学浅不知道有不停机重启这个东西,所以回答是的。然后他说,那完全没有 PHP 好用啊,PHP 修改逻辑完之后直接替换一个文件就可以实现发布,不需要重启。我当时只能和他说可以多 Pod 部署,金丝雀发布等等也可以做到整个服务不停机发布。但是他最后还是带着得以意笑容离去。

当时看着他离去的身影我就发誓,我要研究一下 Go 语言的不停机重启,证明不是 Go 不行,而是我不行 [DOGE] [DOGE] [DOGE],所以就有了这么一篇文章。

那么对于一个不停机重启 Go 程序我们需要解决以下两个问题:

进程重启不需要关闭监听的端口;既有请求应当完全处理或者超时;

后面我们会看一下 endless 是如何做到这两点的。

基本概念#

下面先简单介绍一下两个知识点,以便后面的开展

信号处理#

Go 信号通知通过在 Channel 上发送 os.Signal 值来工作。如我们如果使用 Ctrl+C,那么会触发 SIGINT 信号,操作系统会中断该进程的正常流程,并进入相应的信号处理函数执行操作,完成后再回到中断的地方继续执行。

Copyfunc main() {    sigs := make(chan os.Signal, 1)    done := make(chan bool, 1)    // 监听信号    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)     go func() {        // 接收到信号返回        sig := <-sigs        fmt.Println()        fmt.Println(sig)        done <- true    }()     fmt.Println("awaiting signal")    // 等待信号的接收    <-done    fmt.Println("exiting")}

通过上述简单的几行代码,我们就可以监听 SIGINT 和 SIGTERM 信号。当 Go 接收到操作系统发送过来的信号,那么会将信号值放入到 sigs 管道中进行处理。

Fork 子进程#

在Go语言中 exec 包为我们很好的封装好了 Fork 调用,并且使用它可以使用 ExtraFiles 很好的继承父进程已打开的文件。

Copyfile := netListener.File() // this returns a Dup()path := "/path/to/executable"args := []string{    "-graceful"}// 产生 Cmd 实例cmd := exec.Command(path, args...)// 标准输出cmd.Stdout = os.Stdout// 标准错误输出cmd.Stderr = os.Stderrcmd.ExtraFiles = []*os.File{file}// 启动命令err := cmd.Start()if err != nil {    log.Fatalf("gracefulRestart: Failed to launch, error: %v", err)}

通过调用 exec 包的 Command 命令传入 path(将要执行的命令路径)、args (命令的参数)即可返回 Cmd 实例,通过 ExtraFiles 字段指定额外被新进程继承的已打开文件,最后调用 Start 方法创建子进程。

这里的 netListener.File会通过系统调用 dup 复制一份 file descriptor 文件描述符。

Copyfunc Dup(oldfd int) (fd int, err error) {	r0, _, e1 := Syscall(SYS_DUP, uintptr(oldfd), 0, 0)	fd = int(r0)	if e1 != 0 {		err = errnoErr(e1)	}	return}

我们可以看到 dup 的命令介绍:

Copydup and dup2 create a copy of the file descriptor oldfd.After successful return of dup or dup2, the old and new descriptors maybe used interchangeably. They share locks, file position pointers andflags; for example, if the file position is modified by using lseek onone of the descriptors, the position is also changed for the other.The two descriptors do not share the close-on-exec flag, however.

通过上面的描述可以知道,返回的新文件描述符和参数 oldfd 指向同一个文件,共享所有的索性、读写指针、各项权限或标志位等。但是不共享关闭标志位,也就是说 oldfd 已经关闭了,也不影响写入新的数据到 newfd 中。

上图显示了fork一个子进程,子进程复制父进程的文件描述符表。

endless 不停机重启示例#

我这里稍微写一下 endless 的使用示例给没有用过 endless 的同学看看,熟悉 endless 使用的同学可以跳过。

Copyimport (	"log"	"net/http"	"os"	"sync"	"time"	"github.com/fvbock/endless"	"github.com/gorilla/mux")func handler(w http.ResponseWriter, r *http.Request) {	duration, err := time.ParseDuration(r.FormValue("duration"))	if err != nil {		http.Error(w, err.Error(), 400)		return	}	time.Sleep(duration)	w.Write([]byte("Hello World"))}func main() {	mux1 := mux.NewRouter()	mux1.HandleFunc("/sleep", handler)	w := sync.WaitGroup{}	w.Add(1)	go func() {		err := endless.ListenAndServe("127.0.0.1:5003", mux1)		if err != nil {			log.Println(err)		}		log.Println("Server on 5003 stopped")		w.Done()	}()	w.Wait()	log.Println("All servers stopped. Exiting.")	os.Exit(0)}

下面验证一下 endless 创建的不停机服务:

Copy# 第一次构建项目go build main.go# 运行项目,这时就可以做内容修改了./endless &# 请求项目,60s后返回curl "; &# 再次构建项目,这里是新内容go build main.go# 重启,17171为pidkill -1 17171# 新API请求curl "; 

运行完上面的命令我们可以看到,对于第一个请求返回的是:Hello world,在发送第二个请求之前,我将 handler 里面的返回值改成了:Hello world2222,然后进行构建重启。

由于我设置了 60s 才返回第一个请求,第二个请求设置的是 1s 返回,所以这里会先返回第二个请求的值,然后再返回第一个请求的值。

整个时间线如下所示:

并且在等待第一个请求返回期间,可以看到同时有两个进程在跑:

Copy$ ps -ef |grep mainroot      84636  80539  0 22:25 pts/2    00:00:00 ./mainroot      85423  84636  0 22:26 pts/2    00:00:00 ./main

在第一个请求响应之后,我们再看进程可以发现父进程已经关掉了,实现了父子进程无缝切换:

Copy$ ps -ef |grep mainroot      85423      1  0 22:26 pts/2    00:00:00 ./main
实现原理#

在实现上,我这里用的是 endless 的实现方案,所以下面原理和代码都通过它的代码进行讲解。

我们要做的不停机重启,实现原理如上图所示:

监听 SIGHUP 信号;收到信号时 fork 子进程(使用相同的启动命令),将服务监听的 socket 文件描述符传递给子进程;子进程监听父进程的 socket,这个时候父进程和子进程都可以接收请求;子进程启动成功之后发送 SIGTERM 信号给父进程,父进程停止接收新的连接,等待旧连接处理完成(或超时);父进程退出,升级完成;代码实现#

我们从上面的示例可以看出,endless 的入口是 ListenAndServe 函数:

Copy func ListenAndServe(addr string, handler http.Handler) error {    // 初始化 server	server := NewServer(addr, handler)    // 监听以及处理请求	return server.ListenAndServe()}

这个方法分为两部分,先是初始化 server,然后再监听以及处理请求。

初始化 Server#

我们首先看一下一个 endless 服务的 Server 结构体是怎样:

Copytype endlessServer struct {	// 用于继承 http.Server 结构	http.Server	// 监听客户端请求的 Listener	EndlessListener  net.Listener  	// 用于记录还有多少客户端请求没有完成	wg               sync.WaitGroup	// 用于接收信号的管道	sigChan          chan os.Signal	// 用于重启时标志本进程是否是为一个新进程	isChild          bool	// 当前进程的状态	state            uint8     ...}

这个 endlessServer 除了继承 http.Server 所有字段以外,因为还需要监听信号以及判断是不是一个新的进程,所以添加了几个状态位的字段:

wg:标记还有多少客户端请求没有完成;sigChan:用于接收信号的管道;isChild:用于重启时标志本进程是否是为一个新进程;state:当前进程的状态。

下面我们看看如何初始化 endlessServer :

Copyfunc NewServer(addr string, handler http.Handler) (srv *endlessServer) {	runningServerReg.Lock()	defer runningServerReg.Unlock() 	    socketOrder = os.Getenv("ENDLESS_SOCKET_ORDER")	// 根据环境变量判断是不是子进程	isChild = os.Getenv("ENDLESS_CONTINUE") != ""     // 由于支持多 server,所以这里需要设置一下 server 的顺序    if len(socketOrder) > 0 {		for i, addr := range strings.Split(socketOrder, ",") {			socketPtrOffsetMap[addr] = uint(i)		}	} else {		socketPtrOffsetMap[addr] = uint(len(runningServersOrder))	}	srv = &endlessServer{		wg:      sync.WaitGroup{},		sigChan: make(chan os.Signal),		isChild: isChild,		...		state: STATE_INIT,		lock:  &sync.RWMutex{},	}	srv.Server.Addr = addr	srv.Server.ReadTimeout = DefaultReadTimeOut	srv.Server.WriteTimeout = DefaultWriteTimeOut	srv.Server.MaxHeaderBytes = DefaultMaxHeaderBytes	srv.Server.Handler = handler 	runningServers[addr] = srv	...	return}

这里初始化都是我们在 net/http 里面看到的一些常见的参数,包括 ReadTimeout 读取超时时间、WriteTimeout 写入超时时间、Handler 请求处理器等,不熟悉的可以看一下这篇:《 一文说透 Go 语言 HTTP 标准库 》。

需要注意的是,这里是通过 ENDLESS_CONTINUE 环境变量来判断是否是个子进程,这个环境变量会在 fork 子进程的时候写入。因为 endless 是支持多 server 的,所以需要用 ENDLESS_SOCKET_ORDER变量来判断一下 server 的顺序。

ListenAndServe#

Copyfunc (srv *endlessServer) ListenAndServe() (err error) {	addr := srv.Addr	if addr == "" {		addr = ":http"	}	// 异步处理信号量	go srv.handleSignals()	// 获取端口监听	l, err := srv.getListener(addr)	if err != nil {		log.Println(err)		return	}	// 将监听转为 endlessListener	srv.EndlessListener = newEndlessListener(l, srv)	// 如果是子进程,那么发送 SIGTERM 信号给父进程	if srv.isChild {		syscall.Kill(syscall.Getppid(), syscall.SIGTERM)	}	srv.BeforeBegin(srv.Addr)	// 响应Listener监听,执行对应请求逻辑	return srv.Serve()}

这个方法其实和 net/http 库是比较像的,首先获取端口监听,然后调用 Serve 处理请求发送过来的数据,大家可以打开文章《 一文说透 Go 语言 HTTP 标准库 》对比一下和 endless 的异同。

但是还是有几点不一样的,endless 为了做到平滑重启需要用到信号监听处理,并且在 getListener 的时候也不一样,如果是子进程需要继承到父进程的 listen fd,这样才能做到不关闭监听的端口。

handleSignals 信号处理#

信号处理主要是信号的一个监听,然后根据不同的信号循环处理。

Copyfunc (srv *endlessServer) handleSignals() {	var sig os.Signal	// 注册信号监听	signal.Notify(		srv.sigChan,		hookableSignals...,	)	// 获取pid	pid := syscall.Getpid()	for {		sig = <-srv.sigChan		// 在处理信号之前触发hook		srv.signalHooks(PRE_SIGNAL, sig)		switch sig {		// 接收到平滑重启信号		case syscall.SIGHUP:			log.Println(pid, "Received SIGHUP. forking.")			err := srv.fork()			if err != nil {				log.Println("Fork err:", err)			} 		// 停机信号		case syscall.SIGINT:			log.Println(pid, "Received SIGINT.")			srv.shutdown()		// 停机信号		case syscall.SIGTERM:			log.Println(pid, "Received SIGTERM.")			srv.shutdown()		...		// 在处理信号之后触发hook		srv.signalHooks(POST_SIGNAL, sig)	}}

这一部分的代码十分简洁,当我们用kill -1 $pid 的时候这里 srv.sigChan 就会接收到相应的信号,并进入到 case syscall.SIGHUP 这块逻辑代码中。

需要注意的是,在上面的 ListenAndServe 方法中子进程会像父进程发送 syscall.SIGTERM 信号也会在这里被处理,执行的是 shutdown 停机逻辑。

在进入到 case syscall.SIGHUP 这块逻辑代码之后会调用 fork 函数,下面我们再来看看 fork 逻辑:

Copyfunc (srv *endlessServer) fork() (err error) {	runningServerReg.Lock()	defer runningServerReg.Unlock()	// 校验是否已经fork过	if runningServersForked {		return errors.New("Another process already forked. Ignoring this one.")	} 	runningServersForked = true	var files = make([]*os.File, len(runningServers))	var orderArgs = make([]string, len(runningServers))	// 因为有多 server 的情况,所以获取所有 listen fd	for _, srvPtr := range runningServers { 		switch srvPtr.EndlessListener.(type) {		case *endlessListener: 			files[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.EndlessListener.(*endlessListener).File()		default: 			files[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.tlsInnerListener.File()		}		orderArgs[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.Server.Addr	}	// 环境变量	env := append(		os.Environ(),    // 启动endless 的时候,会根据这个参数来判断是否是子进程		"ENDLESS_CONTINUE=1",	)	if len(runningServers) > 1 {		env = append(env, fmt.Sprintf(`ENDLESS_SOCKET_ORDER=%s`, strings.Join(orderArgs, ",")))	}	// 程序运行路径	path := os.Args[0]	var args []string	// 参数	if len(os.Args) > 1 {		args = os.Args[1:]	}	cmd := exec.Command(path, args...)	// 标准输出	cmd.Stdout = os.Stdout	// 错误	cmd.Stderr = os.Stderr	cmd.ExtraFiles = files	cmd.Env = env  	err = cmd.Start()	if err != nil {		log.Fatalf("Restart: Failed to launch, error: %v", err)	} 	return}

fork 这块代码首先会根据 server 来获取不同的 listen fd 然后封装到 files 列表中,然后在调用 cmd 的时候将文件描述符传入到 ExtraFiles 参数中,这样子进程就可以无缝托管到父进程监听的端口。

需要注意的是,env 参数列表中有一个 ENDLESS_CONTINUE 参数,这个参数会在 endless 启动的时候做校验:

Copyfunc NewServer(addr string, handler http.Handler) (srv *endlessServer) {	runningServerReg.Lock()	defer runningServerReg.Unlock()	socketOrder = os.Getenv("ENDLESS_SOCKET_ORDER")	isChild = os.Getenv("ENDLESS_CONTINUE") != ""  ...}

下面我们再看看 接收到 SIGTERM 信号后,shutdown 会怎么做:

Copyfunc (srv *endlessServer) shutdown() {	if srv.getState() != STATE_RUNNING {		return	}	srv.setState(STATE_SHUTTING_DOWN)    // 默认 DefaultHammerTime 为 60秒	if DefaultHammerTime >= 0 {		go srv.hammerTime(DefaultHammerTime)	}	// 关闭存活的连接	srv.SetKeepAlivesEnabled(false)	err := srv.EndlessListener.Close()	if err != nil {		log.Println(syscall.Getpid(), "Listener.Close() error:", err)	} else {		log.Println(syscall.Getpid(), srv.EndlessListener.Addr(), "Listener closed.")	}}

shutdown 这里会先将连接关闭,因为这个时候子进程已经启动了,所以不再处理请求,需要把端口的监听关了。这里还会异步调用 srv.hammerTime 方法等待60秒把父进程的请求处理完毕才关闭父进程。

getListener 获取端口监听#

Copyfunc (srv *endlessServer) getListener(laddr string) (l net.Listener, err error) {	// 如果是子进程	if srv.isChild {		var ptrOffset uint = 0		runningServerReg.RLock()		defer runningServerReg.RUnlock()		// 这里还是处理多个 server 的情况		if len(socketPtrOffsetMap) > 0 {			// 根据server 的顺序来获取 listen fd 的序号			ptrOffset = socketPtrOffsetMap[laddr] 		}		// fd 0,1,2是预留给 标准输入、输出和错误的,所以从3开始		f := os.NewFile(uintptr(3+ptrOffset), "")		l, err = net.FileListener(f)		if err != nil {			err = fmt.Errorf("net.FileListener error: %v", err)			return		}	} else {		// 父进程 直接返回 listener		l, err = net.Listen("tcp", laddr)		if err != nil {			err = fmt.Errorf("net.Listen error: %v", err)			return		}	}	return}

这里如果是父进程没什么好说的,直接创建一个端口监听并返回就好了。

但是对于子进程来说是有一些绕,首先说一下 os.NewFile 的参数为什么要从3开始。因为子进程在继承父进程的 fd 的时候0,1,2是预留给 标准输入、输出和错误的,所以父进程给的第一个fd在子进程里顺序排就是从3开始了,又因为 fork 的时候cmd.ExtraFiles 参数传入的是一个 files,如果有多个 server 那么会依次从3开始递增。

如下图,前三个 fd 是预留给 标准输入、输出和错误的,fd 3 是根据传入 ExtraFiles 的数组列表依次递增的。

其实这里我们也可以用开头的例子做一下试验:

Copy# 第一次构建项目go build main.go# 运行项目,这时就可以做内容修改了./endless &# 这个时候我们看看父进程打开的文件lsof  -P -p 17116COMMAND   PID USER   FD      TYPE  DEVICE SIZE/OFF     NODE NAME...main    18942 root    0u      CHR   136,2      0t0        5 /dev/pts/2main    18942 root    1u      CHR   136,2      0t0        5 /dev/pts/2main    18942 root    2u      CHR   136,2      0t0        5 /dev/pts/2main    18942 root    3u     IPv4 2223979      0t0      TCP localhost:5003 (LISTEN)# 请求项目,60s后返回curl "; & # 重启,17116为父进程pidkill -1 17116# 然后我们看一下 main 程序的进程应该有两个ps -ef |grep ./mainroot      17116  80539  0 04:19 pts/2    00:00:00 ./mainroot      18110  17116  0 04:21 pts/2    00:00:00 ./main# 可以看到子进程pid 为18110,我们看看该进程打开的文件lsof  -P -p 18110COMMAND   PID USER   FD      TYPE  DEVICE SIZE/OFF     NODE NAME...main    19073 root    0r      CHR     1,3      0t0     1028 /dev/nullmain    19073 root    1u      CHR   136,2      0t0        5 /dev/pts/2main    19073 root    2u      CHR   136,2      0t0        5 /dev/pts/2main    19073 root    3u     IPv4 2223979      0t0      TCP localhost:5003 (LISTEN)main    19073 root    4u     IPv4 2223979      0t0      TCP localhost:5003 (LISTEN)# 新API请求curl "; 
总结#

通过上面的介绍,我们通过 endless 学习了在 Go 服务中如何做到不停机也可以重启服务,相信这个功能在很多场景下都会用到,没用到的同学也可以尝试在自己的系统上玩一下。

热重启总的来说它允许服务重启期间,不中断已经建立的连接,老服务进程不再接受新连接请求,新连接请求将在新服务进程中受理。对于原服务进程中已经建立的连接,也可以将其设为读关闭,等待平滑处理完连接上的请求及连接空闲后再行退出。

通过这种方式,可以保证已建立的连接不中断,新的服务进程也可以正常接受连接请求。

标签: #php平滑重启