forked from ebhomengo/niki
1
0
Fork 0
niki/pkg/supervisor/supervisor.go

364 lines
8.7 KiB
Go

package supervisor
import (
"context"
"log/slog"
"os"
"os/signal"
"sync"
"time"
"git.gocasts.ir/ebhomengo/niki/logger"
)
// ProcessFunc is a long-running process which listens on finishSignal
// It notifies the supervisor by terminate channel when it terminates.
//
// The supervisor calls it with its own context as finishSignal, the name the
// process was registered under as processName, and the supervisor's terminate
// channel as terminateChannel. A non-nil error makes the supervisor retry the
// process according to its ProcessOption.
type ProcessFunc func(finishSignal context.Context, processName string, terminateChannel chan<- string) error
// noopProcessFunc is a handler that ignores all of its arguments and returns
// nil immediately. It is returned in place of a real handler when a process
// name cannot be found.
var noopProcessFunc = func(_ context.Context, _ string, _ chan<- string) error {
	return nil
}
// Supervisor is responsible to manage long-running processes: it runs every
// registered process in its own goroutine, retries or recovers it according to
// its ProcessOption and coordinates a graceful shutdown.
// Supervisor is not for concurrent use and should be used as the main goroutine of application.
type Supervisor struct {
// logger receives every supervisor log message.
logger *slog.Logger
// lock guards access to processes.
lock *sync.Mutex
// processes holds the registered processes keyed by process name.
processes map[string]Process
// shutdownSignal delivers the signal that makes WaitOnShutdownSignal start the graceful shutdown.
shutdownSignal chan os.Signal
// ctx is passed to every handler as its finish signal; ctxCancel cancels it when shutdown begins.
ctx context.Context
ctxCancel context.CancelFunc
// shutdownTimeout bounds how long the graceful shutdown waits for termination notifications.
shutdownTimeout time.Duration
// terminateChannel should be used to notify supervisor when a process terminates
terminateChannel chan string
}
// New builds a Supervisor that has no registered processes yet.
//
// shutdownTimeout is the time the graceful shutdown waits for processes to
// report termination; a zero value selects DefaultGracefulShutdownTimeout.
// l is the logger to use; when nil, the project default returned by
// logger.L() is used. Log records are placed in the LogNSSupervisor group.
func New(shutdownTimeout time.Duration, l *slog.Logger) *Supervisor {
	if l == nil {
		l = logger.L()
	}

	if shutdownTimeout == 0 {
		shutdownTimeout = DefaultGracefulShutdownTimeout
	}

	// The context is handed to every process as its finish signal; cancel is
	// kept so the shutdown can notify all of them at once.
	ctx, cancel := context.WithCancel(context.Background())

	sup := new(Supervisor)
	sup.lock = &sync.Mutex{}
	sup.logger = l.WithGroup(LogNSSupervisor)
	sup.processes = make(map[string]Process)
	sup.shutdownSignal = make(chan os.Signal, 1)
	sup.ctx = ctx
	sup.ctxCancel = cancel
	sup.shutdownTimeout = shutdownTimeout
	// TODO : how to set terminateChannel buffer?
	//nolint
	sup.terminateChannel = make(chan string, 10)

	return sup
}
// Process is the supervisor's record of one registered process.
type Process struct {
// name is the key the process was registered under.
name string
// handler is the function the supervisor runs for this process.
handler ProcessFunc
// options is the retry/recover policy applied to the process.
options ProcessOption
// state holds the counters the supervisor updates while the process runs.
state ProcessState
}
// ProcessState is the mutable runtime state the supervisor keeps per process.
type ProcessState struct {
// RecoveredNum counts the number of times the process has been recovered after a panic.
RecoveredNum int
}
// ProcessOption configures how the supervisor reacts when a process returns an
// error or panics.
type ProcessOption struct {
// Recover enables restarting the process after a panic.
Recover bool
// RecoverInterval is the pause before a panicked process is restarted.
RecoverInterval time.Duration
// RecoverCount is the maximum number of restarts after a panic.
RecoverCount int
// RetryCount is the maximum number of times the handler is executed when it keeps returning an error.
RetryCount int
// RetryInterval is the pause between two execution attempts.
RetryInterval time.Duration
// IsFatal makes the supervisor request its own shutdown when the process stops.
IsFatal bool
}
// Default values used by the supervisor.
const (
// ProcessRetryCount is the RetryCount used by the default process options.
ProcessRetryCount = 3
// ProcessRetryInterval is the RetryInterval used by the default process options.
ProcessRetryInterval = 3 * time.Second
// ProcessRecoverCount is the RecoverCount used by the default process options.
ProcessRecoverCount = 10
// ProcessRecoverInterval is the RecoverInterval used by the default process options.
ProcessRecoverInterval = 2 * time.Second
// DefaultGracefulShutdownTimeout is the shutdown timeout New applies when it is given zero.
DefaultGracefulShutdownTimeout = 5 * time.Second
// LogNSSupervisor is the log group name under which the supervisor logs.
LogNSSupervisor = "supervisor"
)
// defaultOptions is the policy Register applies when the caller passes nil
// options: recover from panics, retry on error, and treat the process as fatal.
var defaultOptions = ProcessOption{
	// Behaviour when the handler returns an error.
	RetryCount:    ProcessRetryCount,
	RetryInterval: ProcessRetryInterval,

	// Behaviour when the handler panics.
	Recover:         true,
	RecoverCount:    ProcessRecoverCount,
	RecoverInterval: ProcessRecoverInterval,

	IsFatal: true,
}
// Register adds a process to the supervisor under the given name.
//
// process is the handler that will be run for it. options is its retry and
// recover policy; nil selects the package defaults. Registering a name that
// is already taken logs a warning and replaces the previous entry.
func (s *Supervisor) Register(name string, process ProcessFunc, options *ProcessOption) {
	// TODO : don't allow any registration after Start is called using a mutex
	s.warnIfNameAlreadyInUse(name)

	// TODO : validate name
	opts := defaultOptions
	if options != nil {
		opts = *options
	}

	entry := Process{
		name:    name,
		handler: process,
		options: opts,
		state:   ProcessState{},
	}

	s.lock.Lock()
	defer s.lock.Unlock()

	s.processes[name] = entry
}
// Start spawns a new goroutine for each registered process.
// Spawned goroutine is responsible to handle the panics and restart the process.
//
// The process names are copied while holding the lock before any goroutine is
// started: a spawned goroutine may remove or update its entry in the process
// map right away, and ranging over the map at the same time would be a data
// race.
func (s *Supervisor) Start() {
	// TODO : is it viable to use a goroutine pool such as Ants ?
	s.lock.Lock()
	names := make([]string, 0, len(s.processes))
	for name := range s.processes {
		names = append(names, name)
	}
	s.lock.Unlock()

	for _, name := range names {
		go s.executeProcessWithRetryPolicy(name)
	}
}
// executeProcessWithRetryPolicy runs the handler registered under name in the
// calling goroutine and applies the process's retry and recover options.
//
// Retry: the handler is executed up to RetryCount times. When it returns an
// error the failure is logged, the goroutine waits RetryInterval and tries
// again. When it returns nil the loop stops and, if the process is fatal, a
// supervisor shutdown is requested. When every attempt failed the process is
// removed from the supervisor.
//
// Recover: a panic raised by the handler is caught by the deferred function.
// While the process is still recoverable its recover counter is incremented,
// the goroutine waits RecoverInterval and a new goroutine runs this method
// again. Otherwise the process is removed and, if it is fatal, a supervisor
// shutdown is requested.
func (s *Supervisor) executeProcessWithRetryPolicy(name string) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}

		s.logger.Error("recover from panic", slog.String("process_name", name), slog.Any("panic", r))

		if s.isRecoverable(name) {
			s.incRecover(name)
			s.waitFoRecover(name)

			s.logger.Info("restart the process", slog.String("process_name", name))

			// spawn new goroutine to avoid heap/stack memory leak when the recover count is big
			go s.executeProcessWithRetryPolicy(name)

			return
		}

		s.logger.Info("don't try any more to restart the process", slog.String("process_name", name))

		// Read the fatal flag before removing the process: once the entry is
		// gone the lookup can only report "not fatal".
		fatal := s.isFatal(name)
		s.removeProcess(name)

		if fatal {
			s.logger.Error("can't recover important process. exit..", slog.String("process_name", name))
			s.requestShutdown()
		}
	}()

	for i := 1; i <= s.retryCount(name); i++ {
		s.logger.Info("execute process", slog.String("process_name", name))

		f := s.handler(name)
		if err := f(s.ctx, name, s.terminateChannel); err != nil {
			s.logger.Error("failed to execute process", slog.String("process_name", name),
				slog.Int("attempt", i), slog.String("error", err.Error()))
			s.waitFoRetry(name)

			continue
		}

		// don't expect handler return if it hasn't any error because it's long-running process
		// it should return when receives shutdown signal
		s.logger.Info("process terminates with no error", slog.String("process_name", name))

		if s.isFatal(name) {
			s.logger.Error("can't recover important process. exit..", slog.String("process_name", name))
			s.requestShutdown()
		}

		return
	}

	s.logger.Info("don't try any more to execute process", slog.String("process_name", name))
	s.removeProcess(name)
}

// requestShutdown asks the supervisor to begin its graceful shutdown by
// placing os.Interrupt on shutdownSignal. The send never blocks: when the
// channel cannot accept the value a shutdown request is already pending, so
// the extra one is dropped instead of parking the calling goroutine forever.
func (s *Supervisor) requestShutdown() {
	select {
	case s.shutdownSignal <- os.Interrupt:
	default:
	}
}
// WaitOnShutdownSignal blocks until a shutdown signal arrives and then runs
// the graceful shutdown before returning.
// WaitOnShutdownSignal should not be called in other goroutines except main goroutine of application.
func (s *Supervisor) WaitOnShutdownSignal() {
// TODO : is it necessary to add os.Interrupt to supervisor config?
// Relay os.Interrupt to shutdownSignal; process goroutines also send on this channel.
signal.Notify(s.shutdownSignal, os.Interrupt)
// Block until the first shutdown request is received.
<-s.shutdownSignal
s.gracefulShutdown()
}
// gracefulShutdown cancels the supervisor context so every process is told to
// finish, then collects termination notifications from terminateChannel until
// shutdownTimeout has elapsed. Each notified process is removed from the
// supervisor; the number of processes still registered when the timeout fires
// is logged before returning.
func (s *Supervisor) gracefulShutdown() {
	s.logger.Info("shutdown all processes gracefully")
	s.logger.Info("notify all processes (goroutines) to finish their jobs", slog.Duration("shutdown_timeout", s.shutdownTimeout))

	s.ctxCancel()

	forceExitCtx, forceExitCancel := context.WithTimeout(context.Background(), s.shutdownTimeout)
	defer forceExitCancel()

	for {
		select {
		case name := <-s.terminateChannel:
			s.logger.Info("process terminates gracefully", slog.String("process_name", name))
			s.removeProcess(name)
		case <-forceExitCtx.Done():
			// Process goroutines may still be removing themselves from the
			// map, so take the lock before reading its size.
			s.lock.Lock()
			unfinished := len(s.processes)
			s.lock.Unlock()

			s.logger.Info("supervisor terminates its job.", slog.Int("number_of_unfinished_processes", unfinished))

			return
		}
	}
}
// removeProcess deletes the process registered under name, holding the lock
// for the duration of the map update. Removing an unknown name is a no-op.
func (s *Supervisor) removeProcess(name string) {
	s.lock.Lock()
	defer s.lock.Unlock()

	delete(s.processes, name)
}
// isRecoverable reports whether the process may be restarted after a panic:
// recovery must be enabled in its options and the number of recoveries so far
// must be below RecoverCount. An unknown name logs a warning and yields false.
func (s *Supervisor) isRecoverable(name string) bool {
	s.lock.Lock()
	defer s.lock.Unlock()

	proc, found := s.processes[name]
	if !found {
		s.logger.Warn("process doesn't exist", slog.String("process_name", name))

		return false
	}

	return proc.options.Recover && proc.state.RecoveredNum < proc.options.RecoverCount
}
// isFatal returns the IsFatal option of the process registered under name.
// An unknown name logs a warning and yields false.
func (s *Supervisor) isFatal(name string) bool {
	s.lock.Lock()
	defer s.lock.Unlock()

	if proc, found := s.processes[name]; found {
		return proc.options.IsFatal
	}

	s.logger.Warn("process doesn't exist", slog.String("process_name", name))

	return false
}
// incRecover adds one to the recover counter of the process registered under
// name. An unknown name logs a warning and changes nothing.
func (s *Supervisor) incRecover(name string) {
	s.lock.Lock()
	defer s.lock.Unlock()

	if proc, found := s.processes[name]; found {
		// Map values are copies, so the updated entry is written back.
		proc.state.RecoveredNum++
		s.processes[name] = proc

		return
	}

	s.logger.Warn("process doesn't exist", slog.String("process_name", name))
}
// retryCount returns the RetryCount option of the process registered under
// name. An unknown name logs a warning and yields -1.
func (s *Supervisor) retryCount(name string) int {
	s.lock.Lock()
	defer s.lock.Unlock()

	if proc, found := s.processes[name]; found {
		return proc.options.RetryCount
	}

	s.logger.Warn("process doesn't exist", slog.String("process_name", name))

	return -1
}
// retryInterval returns the RetryInterval option of the process registered
// under name. An unknown name logs a warning and yields -1.
//
//nolint
func (s *Supervisor) retryInterval(name string) time.Duration {
	s.lock.Lock()
	defer s.lock.Unlock()

	if proc, found := s.processes[name]; found {
		return proc.options.RetryInterval
	}

	s.logger.Warn("process doesn't exist", slog.String("process_name", name))

	return -1
}
// waitFoRecover sleeps for the RecoverInterval of the process registered under
// name. An unknown name logs a warning and returns without sleeping.
//
// The lock is held only while the process entry is read and is released on
// every path, so neither the sleep nor the missing-process return can leave
// the supervisor locked.
func (s *Supervisor) waitFoRecover(name string) {
	s.lock.Lock()
	v, ok := s.processes[name]
	s.lock.Unlock()

	if !ok {
		s.logger.Warn("process doesn't exist", slog.String("process_name", name))

		return
	}

	time.Sleep(v.options.RecoverInterval)
}
// waitFoRetry logs and then sleeps for the RetryInterval of the process
// registered under name. An unknown name logs a warning and returns without
// sleeping.
//
// The lock is held only while the process entry is read and is released on
// every path, so neither the sleep nor the missing-process return can leave
// the supervisor locked.
func (s *Supervisor) waitFoRetry(name string) {
	s.lock.Lock()
	v, ok := s.processes[name]
	s.lock.Unlock()

	if !ok {
		s.logger.Warn("process doesn't exist", slog.String("process_name", name))

		return
	}

	t := v.options.RetryInterval
	s.logger.Info("wait to retry execute process after sleep interval",
		slog.String("process_name", name), slog.Duration("interval", t))
	time.Sleep(t)
}
// handler returns the ProcessFunc registered under name. An unknown name logs
// a warning and yields noopProcessFunc so the caller always gets a callable
// function.
func (s *Supervisor) handler(name string) ProcessFunc {
	s.lock.Lock()
	defer s.lock.Unlock()

	if proc, found := s.processes[name]; found {
		return proc.handler
	}

	s.logger.Warn("process doesn't exist", slog.String("process_name", name))

	return noopProcessFunc
}
// warnIfNameAlreadyInUse logs a warning when a process is already registered
// under name; otherwise it does nothing.
func (s *Supervisor) warnIfNameAlreadyInUse(name string) {
	s.lock.Lock()
	defer s.lock.Unlock()

	_, taken := s.processes[name]
	if !taken {
		return
	}

	s.logger.Warn("process name already in use", slog.String("process_name", name))
}