From 748dee60e21d07c62cdfdede515d67388d75682c Mon Sep 17 00:00:00 2001
From: hossein
Date: Sat, 17 Feb 2024 10:29:40 +0330
Subject: [PATCH] add supervisor

---
 pkg/supervisor/supervisor.go | 350 +++++++++++++++++++++++++++++++++++
 1 file changed, 350 insertions(+)
 create mode 100644 pkg/supervisor/supervisor.go

diff --git a/pkg/supervisor/supervisor.go b/pkg/supervisor/supervisor.go
new file mode 100644
index 0000000..d7b7cb2
--- /dev/null
+++ b/pkg/supervisor/supervisor.go
@@ -0,0 +1,350 @@
+package supervisor
+
+import (
+    "context"
+    "log/slog"
+    "os"
+    "os/signal"
+    "sync"
+    "time"
+
+    "git.gocasts.ir/ebhomengo/niki/logger"
+)
+
+// ProcessFunc is a long-running process that listens on finishSignal.
+// It notifies the supervisor through terminateChannel when it terminates.
+type ProcessFunc func(finishSignal context.Context, processName string, terminateChannel chan<- string) error
+
+var noopProcessFunc = func(finishSignal context.Context, processName string, terminateChannel chan<- string) error {
+    return nil
+}
+
+// Supervisor is responsible for managing long-running processes.
+// Supervisor is not safe for concurrent use and should be driven from the application's main goroutine.
+type Supervisor struct {
+    logger          *slog.Logger
+    lock            *sync.Mutex
+    processes       map[string]Process
+    shutdownSignal  chan os.Signal
+    ctx             context.Context
+    ctxCancel       context.CancelFunc
+    shutdownTimeout time.Duration
+    // terminateChannel is used by processes to notify the supervisor when they terminate
+    terminateChannel chan string
+}
+
+func New(shutdownTimeout time.Duration, l *slog.Logger) *Supervisor {
+    ctx, cancel := context.WithCancel(context.Background())
+
+    if l == nil {
+        l = logger.L()
+    }
+
+    if shutdownTimeout == 0 {
+        shutdownTimeout = DefaultGracefulShutdownTimeout
+    }
+
+    return &Supervisor{
+        lock:            &sync.Mutex{},
+        logger:          l.WithGroup(LogNSSupervisor),
+        processes:       make(map[string]Process),
+        shutdownSignal:  make(chan os.Signal, 1),
+        ctx:             ctx,
+        ctxCancel:       cancel,
+        shutdownTimeout: shutdownTimeout,
+        // TODO : how to set terminateChannel buffer?
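+        // NOTE: the buffer size of 10 below is an arbitrary choice; sizing the buffer
+        // to the number of registered processes (a possible follow-up, not required by
+        // this patch) would let every process report its termination without blocking
+        // during shutdown.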
+        terminateChannel: make(chan string, 10),
+    }
+}
+
+type Process struct {
+    name    string
+    handler ProcessFunc
+    options ProcessOption
+    state   ProcessState
+}
+
+type ProcessState struct {
+    // RecoveredNum counts the number of times the process has been recovered
+    RecoveredNum int
+}
+
+type ProcessOption struct {
+    Recover         bool
+    RecoverInterval time.Duration
+    RecoverCount    int
+    RetryCount      int
+    RetryInterval   time.Duration
+    IsFatal         bool
+}
+
+const (
+    ProcessRetryCount              = 3
+    ProcessRetryInterval           = 3 * time.Second
+    ProcessRecoverCount            = 10
+    ProcessRecoverInterval         = 2 * time.Second
+    DefaultGracefulShutdownTimeout = 5 * time.Second
+    LogNSSupervisor                = "supervisor"
+)
+
+var defaultOptions = ProcessOption{
+    Recover:         true,
+    RetryInterval:   ProcessRetryInterval,
+    RecoverInterval: ProcessRecoverInterval,
+    RecoverCount:    ProcessRecoverCount,
+    RetryCount:      ProcessRetryCount,
+    IsFatal:         true,
+}
+
+// Register registers a new process with the supervisor
+func (s *Supervisor) Register(name string, process ProcessFunc, options *ProcessOption) {
+    // TODO : don't allow any registration after Start is called using a mutex
+
+    s.warnIfNameAlreadyInUse(name)
+
+    // TODO : validate name
+    p := Process{
+        name:    name,
+        handler: process,
+        options: defaultOptions,
+        state:   ProcessState{RecoveredNum: 0},
+    }
+
+    if options != nil {
+        p.options = *options
+    }
+
+    s.lock.Lock()
+    s.processes[name] = p
+    s.lock.Unlock()
+}
+
+// Start spawns a new goroutine for each process.
+// Each spawned goroutine is responsible for handling panics and restarting its process.
+func (s *Supervisor) Start() {
+    // TODO : is it viable to use a goroutine pool such as Ants ?
+    for name := range s.processes {
+        go s.executeProcessWithRetryPolicy(name)
+    }
+}
+
+// executeProcessWithRetryPolicy runs the process, retries it on error,
+// and restarts it after a panic according to its recover options.
+func (s *Supervisor) executeProcessWithRetryPolicy(name string) {
+    defer func() {
+        if r := recover(); r != nil {
+            s.logger.Error("recovered from panic", slog.String("process_name", name), slog.Any("panic", r))
+
+            if s.isRecoverable(name) {
+                s.incRecover(name)
+                s.waitForRecover(name)
+                s.logger.Info("restarting the process", slog.String("process_name", name))
+
+                // spawn a new goroutine instead of recursing to avoid growing the stack when the recover count is large
+                go s.executeProcessWithRetryPolicy(name)
+                return
+            }
+
+            s.logger.Info("no more attempts to restart the process", slog.String("process_name", name))
+
+            // read IsFatal before removing the process; after removal the lookup would always fail
+            fatal := s.isFatal(name)
+            s.removeProcess(name)
+
+            if fatal {
+                s.logger.Error("can't recover fatal process, shutting down", slog.String("process_name", name))
+                s.shutdownSignal <- os.Interrupt
+            }
+        }
+    }()
+
+    for i := 1; i <= s.retryCount(name); i++ {
+        s.logger.Info("executing process", slog.String("process_name", name))
+        f := s.handler(name)
+        err := f(s.ctx, name, s.terminateChannel)
+        if err != nil {
+            s.logger.Error("failed to execute process", slog.String("process_name", name),
+                slog.Int("attempt", i), slog.String("error", err.Error()))
+
+            s.waitForRetry(name)
+            continue
+        }
+
+        // a long-running handler is not expected to return without an error;
+        // it should only return after receiving the shutdown signal
+        s.logger.Info("process terminated with no error", slog.String("process_name", name))
+
+        if s.isFatal(name) {
+            s.logger.Error("can't recover fatal process, shutting down", slog.String("process_name", name))
+            s.shutdownSignal <- os.Interrupt
+        }
+
+        return
+    }
+
+    s.logger.Info("no more attempts to execute the process", slog.String("process_name", name))
+    s.removeProcess(name)
+}
+
+// WaitOnShutdownSignal waits for the shutdown signal.
+// It should only be called from the application's main goroutine.
+func (s *Supervisor) WaitOnShutdownSignal() {
+    // TODO : is it necessary to add os.Interrupt to supervisor config?
+    signal.Notify(s.shutdownSignal, os.Interrupt)
+    <-s.shutdownSignal
+
+    s.gracefulShutdown()
+}
+
+// gracefulShutdown signals all processes to finish and waits for them to terminate
+// until the shutdown timeout expires.
+func (s *Supervisor) gracefulShutdown() {
+    s.logger.Info("shutting down all processes gracefully")
+
+    s.logger.Info("notifying all processes (goroutines) to finish their jobs", slog.Duration("shutdown_timeout", s.shutdownTimeout))
+    s.ctxCancel()
+
+    forceExitCtx, forceExitCancel := context.WithTimeout(context.Background(), s.shutdownTimeout)
+    defer forceExitCancel()
+
+    for {
+        select {
+        case name := <-s.terminateChannel:
+            s.logger.Info("process terminated gracefully", slog.String("process_name", name))
+            s.removeProcess(name)
+
+        case <-forceExitCtx.Done():
+            // read the remaining count under the lock; process goroutines may still be removing themselves
+            s.lock.Lock()
+            unfinished := len(s.processes)
+            s.lock.Unlock()
+
+            s.logger.Info("supervisor terminates its job", slog.Int("number_of_unfinished_processes", unfinished))
+            return
+        }
+    }
+}
+
+func (s *Supervisor) removeProcess(name string) {
+    s.lock.Lock()
+    delete(s.processes, name)
+    s.lock.Unlock()
+}
+
+func (s *Supervisor) isRecoverable(name string) bool {
+    s.lock.Lock()
+    defer s.lock.Unlock()
+
+    v, ok := s.processes[name]
+    if !ok {
+        s.logger.Warn("process doesn't exist", slog.String("process_name", name))
+        return false
+    }
+
+    if v.options.Recover && v.state.RecoveredNum < v.options.RecoverCount {
+        return true
+    }
+
+    return false
+}
+
+func (s *Supervisor) isFatal(name string) bool {
+    s.lock.Lock()
+    defer s.lock.Unlock()
+
+    v, ok := s.processes[name]
+    if !ok {
+        s.logger.Warn("process doesn't exist", slog.String("process_name", name))
+        return false
+    }
+
+    return v.options.IsFatal
+}
+
+func (s *Supervisor) incRecover(name string) {
+    s.lock.Lock()
+    defer s.lock.Unlock()
+
+    v, ok := s.processes[name]
+    if !ok {
+        s.logger.Warn("process doesn't exist", slog.String("process_name", name))
+        return
+    }
+
+    v.state.RecoveredNum++
+    s.processes[name] = v
+}
+
+func (s *Supervisor) retryCount(name string) int {
+    s.lock.Lock()
+    defer s.lock.Unlock()
+
+    v, ok := s.processes[name]
+    if !ok {
+        s.logger.Warn("process doesn't exist", slog.String("process_name", name))
+        return -1
+    }
+
+    return v.options.RetryCount
+}
+
+func (s *Supervisor) retryInterval(name string) time.Duration {
+    s.lock.Lock()
+    defer s.lock.Unlock()
+
+    v, ok := s.processes[name]
+    if !ok {
+        s.logger.Warn("process doesn't exist", slog.String("process_name", name))
+        return -1
+    }
+
+    return v.options.RetryInterval
+}
+
+func (s *Supervisor) waitForRecover(name string) {
+    s.lock.Lock()
+
+    v, ok := s.processes[name]
+    if !ok {
+        s.lock.Unlock()
+        s.logger.Warn("process doesn't exist", slog.String("process_name", name))
+        return
+    }
+
+    t := v.options.RecoverInterval
+
+    // release the lock before sleeping
+    s.lock.Unlock()
+
+    time.Sleep(t)
+}
+
+func (s *Supervisor) waitForRetry(name string) {
+    s.lock.Lock()
+
+    v, ok := s.processes[name]
+    if !ok {
+        s.lock.Unlock()
+        s.logger.Warn("process doesn't exist", slog.String("process_name", name))
+        return
+    }
+
+    t := v.options.RetryInterval
+
+    // release the lock before sleeping
+    s.lock.Unlock()
+
+    s.logger.Info("waiting to retry the process after the retry interval",
+        slog.String("process_name", name), slog.Duration("interval", t))
+
+    time.Sleep(t)
+}
+
+func (s *Supervisor) handler(name string) ProcessFunc {
+    s.lock.Lock()
+    defer s.lock.Unlock()
+
+    v, ok := s.processes[name]
+    if !ok {
+        s.logger.Warn("process doesn't exist", slog.String("process_name", name))
+        return noopProcessFunc
+    }
+
+    return v.handler
+}
+
+func (s *Supervisor) warnIfNameAlreadyInUse(name string) {
+    s.lock.Lock()
+    defer s.lock.Unlock()
+
+    if _, ok := s.processes[name]; ok {
+        s.logger.Warn("process name already in use", slog.String("process_name", name))
+    }
+}
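
For reviewers, a minimal usage sketch of the API this patch adds (not part of the diff). The import path git.gocasts.ir/ebhomengo/niki/pkg/supervisor and the "heartbeat" process are assumptions for illustration; New falls back to the default logger and default shutdown timeout for zero values, Register with a nil *ProcessOption uses defaultOptions, and a handler is expected to send its name on terminateChannel and return once finishSignal is canceled:

package main

import (
    "context"
    "time"

    "git.gocasts.ir/ebhomengo/niki/pkg/supervisor" // assumed module path
)

func main() {
    sv := supervisor.New(5*time.Second, nil)

    // hypothetical long-running process, used only to illustrate the ProcessFunc contract
    heartbeat := func(finishSignal context.Context, processName string, terminateChannel chan<- string) error {
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()

        for {
            select {
            case <-ticker.C:
                // periodic work goes here
            case <-finishSignal.Done():
                // report graceful termination back to the supervisor
                terminateChannel <- processName
                return nil
            }
        }
    }

    sv.Register("heartbeat", heartbeat, &supervisor.ProcessOption{
        Recover:         true,
        RecoverCount:    supervisor.ProcessRecoverCount,
        RecoverInterval: supervisor.ProcessRecoverInterval,
        RetryCount:      supervisor.ProcessRetryCount,
        RetryInterval:   supervisor.ProcessRetryInterval,
        IsFatal:         false,
    })

    sv.Start()
    sv.WaitOnShutdownSignal()
}

With IsFatal set to false, a process that cannot be recovered is only removed and logged; leaving it true (the default) makes the supervisor send os.Interrupt to itself and shut the whole application down.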