package supervisor import ( "context" "log/slog" "os" "os/signal" "sync" "time" "git.gocasts.ir/ebhomengo/niki/logger" ) // ProcessFunc is a long-running process which listens on finishSignal // It notifies the supervisor by terminate channel when it terminates. type ProcessFunc func(finishSignal context.Context, processName string, terminateChannel chan<- string) error var noopProcessFunc = func(finishSignal context.Context, processName string, terminateChannel chan<- string) error { return nil } // Supervisor is responsible to manage long-running processes // Supervisor is not for concurrent use and should be used as the main goroutine of application. type Supervisor struct { logger *slog.Logger lock *sync.Mutex processes map[string]Process shutdownSignal chan os.Signal ctx context.Context ctxCancel context.CancelFunc shutdownTimeout time.Duration // terminateChannel should be used to notify supervisor when a process terminates terminateChannel chan string } func New(shutdownTimeout time.Duration, l *slog.Logger) *Supervisor { ctx, cancel := context.WithCancel(context.Background()) if l == nil { l = logger.L() } if shutdownTimeout == 0 { shutdownTimeout = DefaultGracefulShutdownTimeout } return &Supervisor{ lock: &sync.Mutex{}, logger: l.WithGroup(LogNSSupervisor), processes: make(map[string]Process), shutdownSignal: make(chan os.Signal, 1), ctx: ctx, ctxCancel: cancel, shutdownTimeout: shutdownTimeout, // TODO : how to set terminateChannel buffer? //nolint terminateChannel: make(chan string, 10), } } type Process struct { name string handler ProcessFunc options ProcessOption state ProcessState } type ProcessState struct { // RecoveredNum count number of time the process recovered RecoveredNum int } type ProcessOption struct { Recover bool RecoverInterval time.Duration RecoverCount int RetryCount int RetryInterval time.Duration IsFatal bool } const ( ProcessRetryCount = 3 ProcessRetryInterval = 3 * time.Second ProcessRecoverCount = 10 ProcessRecoverInterval = 2 * time.Second DefaultGracefulShutdownTimeout = 5 * time.Second LogNSSupervisor = "supervisor" ) var defaultOptions = ProcessOption{ Recover: true, RetryInterval: ProcessRetryInterval, RecoverInterval: ProcessRecoverInterval, RecoverCount: ProcessRecoverCount, RetryCount: ProcessRetryCount, IsFatal: true, } // Register registers a new process to supervisor. func (s *Supervisor) Register(name string, process ProcessFunc, options *ProcessOption) { // TODO : don't allow any registration after Start is called using a mutex s.warnIfNameAlreadyInUse(name) // TODO : validate name p := Process{ name: name, handler: process, options: defaultOptions, state: ProcessState{RecoveredNum: 0}, } if options != nil { p.options = *options } s.lock.Lock() s.processes[name] = p s.lock.Unlock() } // Start spawns a new goroutine for each process // Spawned goroutine is responsible to handle the panics and restart the process. func (s *Supervisor) Start() { // TODO : is it viable to use a goroutine pool such as Ants ? for name := range s.processes { go s.executeProcessWithRetryPolicy(name) } } func (s *Supervisor) executeProcessWithRetryPolicy(name string) { defer func() { if r := recover(); r != nil { s.logger.Error("recover from panic", slog.String("process_name", name), slog.Any("panic", r)) if s.isRecoverable(name) { s.incRecover(name) s.waitFoRecover(name) s.logger.Info("restart the process", slog.String("process_name", name)) // spawn new goroutine to avoid heap/stack memory leak when the recover count is big go s.executeProcessWithRetryPolicy(name) return } s.logger.Info("don't try any more to restart the process", slog.String("process_name", name)) s.removeProcess(name) if s.isFatal(name) { s.logger.Error("can't recover important process. exit..", slog.String("process_name", name)) s.shutdownSignal <- os.Interrupt } } }() for i := 1; i <= s.retryCount(name); i++ { s.logger.Info("execute process", slog.String("process_name", name)) f := s.handler(name) err := f(s.ctx, name, s.terminateChannel) if err != nil { s.logger.Error("failed to execute process", slog.String("process_name", name), slog.Int("attempt", i), slog.String("error", err.Error())) s.waitFoRetry(name) continue } // don't expect handler return if it hasn't any error because it's long-running process // it should return when receives shutdown signal s.logger.Info("process terminates with no error", slog.String("process_name", name)) if s.isFatal(name) { s.logger.Error("can't recover important process. exit..", slog.String("process_name", name)) s.shutdownSignal <- os.Interrupt } return } s.logger.Info("don't try any more to execute process", slog.String("process_name", name)) s.removeProcess(name) } // WaitOnShutdownSignal wait to receive shutdown signal. // WaitOnShutdownSignal should not be called in other goroutines except main goroutine of application. func (s *Supervisor) WaitOnShutdownSignal() { // TODO : is it necessary to add os.Interrupt to supervisor config? signal.Notify(s.shutdownSignal, os.Interrupt) <-s.shutdownSignal s.gracefulShutdown() } func (s *Supervisor) gracefulShutdown() { s.logger.Info("shutdown all processes gracefully") s.logger.Info("notify all processes (goroutines) to finish their jobs", slog.Duration("shutdown_timeout", s.shutdownTimeout)) s.ctxCancel() forceExitCtx, forceExitCancel := context.WithTimeout(context.Background(), s.shutdownTimeout) defer forceExitCancel() for { select { case name := <-s.terminateChannel: s.logger.Info("process terminates gracefully", slog.String("process_name", name)) s.removeProcess(name) case <-forceExitCtx.Done(): s.logger.Info("supervisor terminates its job.", slog.Int("number_of_unfinished_processes", len(s.processes))) return } } } func (s *Supervisor) removeProcess(name string) { s.lock.Lock() delete(s.processes, name) s.lock.Unlock() } func (s *Supervisor) isRecoverable(name string) bool { s.lock.Lock() defer s.lock.Unlock() v, ok := s.processes[name] if !ok { s.logger.Warn("process doesn't exist", slog.String("process_name", name)) return false } if v.options.Recover && v.state.RecoveredNum < v.options.RecoverCount { return true } return false } func (s *Supervisor) isFatal(name string) bool { s.lock.Lock() defer s.lock.Unlock() v, ok := s.processes[name] if !ok { s.logger.Warn("process doesn't exist", slog.String("process_name", name)) return false } return v.options.IsFatal } func (s *Supervisor) incRecover(name string) { s.lock.Lock() defer s.lock.Unlock() v, ok := s.processes[name] if !ok { s.logger.Warn("process doesn't exist", slog.String("process_name", name)) return } v.state.RecoveredNum++ s.processes[name] = v } func (s *Supervisor) retryCount(name string) int { s.lock.Lock() defer s.lock.Unlock() v, ok := s.processes[name] if !ok { s.logger.Warn("process doesn't exist", slog.String("process_name", name)) return -1 } return v.options.RetryCount } // nolint func (s *Supervisor) retryInterval(name string) time.Duration { s.lock.Lock() defer s.lock.Unlock() v, ok := s.processes[name] if !ok { s.logger.Warn("process doesn't exist", slog.String("process_name", name)) return -1 } return v.options.RetryInterval } func (s *Supervisor) waitFoRecover(name string) { s.lock.Lock() v, ok := s.processes[name] if !ok { s.logger.Warn("process doesn't exist", slog.String("process_name", name)) return } t := v.options.RecoverInterval // free lock before sleep s.lock.Unlock() time.Sleep(t) } func (s *Supervisor) waitFoRetry(name string) { s.lock.Lock() v, ok := s.processes[name] if !ok { s.logger.Warn("process doesn't exist", slog.String("process_name", name)) return } t := v.options.RetryInterval // free lock before sleep s.lock.Unlock() s.logger.Info("wait to retry execute process after sleep interval", slog.String("process_name", name), slog.Duration("interval", t)) time.Sleep(t) } func (s *Supervisor) handler(name string) ProcessFunc { s.lock.Lock() defer s.lock.Unlock() v, ok := s.processes[name] if !ok { s.logger.Warn("process doesn't exist", slog.String("process_name", name)) return noopProcessFunc } return v.handler } func (s *Supervisor) warnIfNameAlreadyInUse(name string) { s.lock.Lock() defer s.lock.Unlock() if _, ok := s.processes[name]; ok { s.logger.Warn("process name already in use", slog.String("process_name", name)) } }