endless 如何实现不停机重启 Go 程序? (3)

这个方法其实和 net/http 库是比较像的,首先获取端口监听,然后调用 Serve 处理请求发送过来的数据,大家可以打开文章《 一文说透 Go 语言 HTTP 标准库 https://www.luozhiyun.com/archives/561 》对比一下和 endless 的异同。

但是还是有几点不一样的,endless 为了做到平滑重启需要用到信号监听处理,并且在 getListener 的时候也不一样,如果是子进程需要继承到父进程的 listen fd,这样才能做到不关闭监听的端口。

handleSignals 信号处理

graceful_restart4

信号处理主要是信号的一个监听,然后根据不同的信号循环处理。

func (srv *endlessServer) handleSignals() { var sig os.Signal // 注册信号监听 signal.Notify( srv.sigChan, hookableSignals..., ) // 获取pid pid := syscall.Getpid() for { sig = <-srv.sigChan // 在处理信号之前触发hook srv.signalHooks(PRE_SIGNAL, sig) switch sig { // 接收到平滑重启信号 case syscall.SIGHUP: log.Println(pid, "Received SIGHUP. forking.") err := srv.fork() if err != nil { log.Println("Fork err:", err) } // 停机信号 case syscall.SIGINT: log.Println(pid, "Received SIGINT.") srv.shutdown() // 停机信号 case syscall.SIGTERM: log.Println(pid, "Received SIGTERM.") srv.shutdown() ... // 在处理信号之后触发hook srv.signalHooks(POST_SIGNAL, sig) } }

这一部分的代码十分简洁,当我们用kill -1 $pid 的时候这里 srv.sigChan 就会接收到相应的信号,并进入到 case syscall.SIGHUP 这块逻辑代码中。

需要注意的是,在上面的 ListenAndServe 方法中子进程会像父进程发送 syscall.SIGTERM 信号也会在这里被处理,执行的是 shutdown 停机逻辑。

在进入到 case syscall.SIGHUP 这块逻辑代码之后会调用 fork 函数,下面我们再来看看 fork 逻辑:

func (srv *endlessServer) fork() (err error) { runningServerReg.Lock() defer runningServerReg.Unlock() // 校验是否已经fork过 if runningServersForked { return errors.New("Another process already forked. Ignoring this one.") } runningServersForked = true var files = make([]*os.File, len(runningServers)) var orderArgs = make([]string, len(runningServers)) // 因为有多 server 的情况,所以获取所有 listen fd for _, srvPtr := range runningServers { switch srvPtr.EndlessListener.(type) { case *endlessListener: files[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.EndlessListener.(*endlessListener).File() default: files[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.tlsInnerListener.File() } orderArgs[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.Server.Addr } // 环境变量 env := append( os.Environ(), // 启动endless 的时候,会根据这个参数来判断是否是子进程 "ENDLESS_CONTINUE=1", ) if len(runningServers) > 1 { env = append(env, fmt.Sprintf(`ENDLESS_SOCKET_ORDER=%s`, strings.Join(orderArgs, ","))) } // 程序运行路径 path := os.Args[0] var args []string // 参数 if len(os.Args) > 1 { args = os.Args[1:] } cmd := exec.Command(path, args...) // 标准输出 cmd.Stdout = os.Stdout // 错误 cmd.Stderr = os.Stderr cmd.ExtraFiles = files cmd.Env = env err = cmd.Start() if err != nil { log.Fatalf("Restart: Failed to launch, error: %v", err) } return }

fork 这块代码首先会根据 server 来获取不同的 listen fd 然后封装到 files 列表中,然后在调用 cmd 的时候将文件描述符传入到 ExtraFiles 参数中,这样子进程就可以无缝托管到父进程监听的端口。

需要注意的是,env 参数列表中有一个 ENDLESS_CONTINUE 参数,这个参数会在 endless 启动的时候做校验:

func NewServer(addr string, handler http.Handler) (srv *endlessServer) { runningServerReg.Lock() defer runningServerReg.Unlock() socketOrder = os.Getenv("ENDLESS_SOCKET_ORDER") isChild = os.Getenv("ENDLESS_CONTINUE") != "" ... }

下面我们再看看 接收到 SIGTERM 信号后,shutdown 会怎么做:

func (srv *endlessServer) shutdown() { if srv.getState() != STATE_RUNNING { return } srv.setState(STATE_SHUTTING_DOWN) // 默认 DefaultHammerTime 为 60秒 if DefaultHammerTime >= 0 { go srv.hammerTime(DefaultHammerTime) } // 关闭存活的连接 srv.SetKeepAlivesEnabled(false) err := srv.EndlessListener.Close() if err != nil { log.Println(syscall.Getpid(), "Listener.Close() error:", err) } else { log.Println(syscall.Getpid(), srv.EndlessListener.Addr(), "Listener closed.") } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyspwy.html