diff options
author | Slack Coder <slackcoder@server.ky> | 2024-10-23 06:07:32 -0500 |
---|---|---|
committer | Slack Coder <slackcoder@server.ky> | 2024-10-23 06:14:49 -0500 |
commit | ca3a097505c42c2aec16e208c571c1b496c4b14b (patch) | |
tree | aa1020524d6166ad2e527c15864f42e0ce15301b /internal/service/service.go | |
parent | c8ba7263e7923fc76eee7dbe129a8bfbc9501bb7 (diff) | |
download | mirror-ca3a097505c42c2aec16e208c571c1b496c4b14b.tar.xz |
Support reload and refactor config
- Simplify config parameters.
- Support hot config reloading.
Diffstat (limited to 'internal/service/service.go')
-rw-r--r-- | internal/service/service.go | 136 |
1 files changed, 99 insertions, 37 deletions
diff --git a/internal/service/service.go b/internal/service/service.go index 3038a03..c5235f2 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -3,7 +3,6 @@ package service import ( "errors" "fmt" - "math/rand" "os" "strconv" "strings" @@ -20,19 +19,32 @@ var requiredCommands = []string{ "rsync", } +// Service data associated with each mirror. +type mirrorRecord struct { + *Mirror + LastRun time.Time + NextRun time.Time +} + +func (s *mirrorRecord) String() string { + return fmt.Sprintf("%s: %s -> %s at %s", s.Method, s.From, s.To, s.NextRun.Format(time.Kitchen)) +} + type Service struct { cfg *Config mirrorsLock sync.Mutex - mirrors []*Mirror - schedule []time.Time + mirrors []*mirrorRecord stopCh chan struct{} } -func NewService(cfg *Config) (*Service, error) { +func NewService(arg *Config) (*Service, error) { // Apply defaults + var cfg Config + cfg.Append(&DefaultConfig) + cfg.Append(arg) for _, cmd := range requiredCommands { if err := internal.RequireCommand(cmd); err != nil { @@ -41,7 +53,7 @@ func NewService(cfg *Config) (*Service, error) { } srv := &Service{ - cfg: cfg, + cfg: &cfg, mirrors: nil, stopCh: make(chan struct{}), } @@ -56,18 +68,6 @@ func NewService(cfg *Config) (*Service, error) { return srv, nil } -func (s *Service) scheduleNextRun(arg *Mirror) { - // Be polite. - period := s.cfg.MaxInterval.Duration - s.cfg.MinInterval.Duration - at := time.Now().Add(s.cfg.MinInterval.Duration + time.Duration(rand.Intn(int(period)))) - - for i, m := range s.mirrors { - if m.Equal(arg) { - s.schedule[i] = at - } - } -} - func (s *Service) AddMirror(arg *Mirror) error { defer s.mirrorsLock.Unlock() s.mirrorsLock.Lock() @@ -78,20 +78,22 @@ func (s *Service) AddMirror(arg *Mirror) error { } } - s.schedule = append(s.schedule, time.Time{}) - s.mirrors = append(s.mirrors, arg) - s.scheduleNextRun(arg) + record := &mirrorRecord{ + Mirror: arg, + NextRun: time.Now().Add(RandomDuration(*s.cfg.MinInterval, *s.cfg.MaxInterval).Duration), + } + s.mirrors = append(s.mirrors, record) return nil } -func (s *Service) scheduled(yield func(*Mirror) bool) { +func (s *Service) scheduled(yield func(*mirrorRecord) bool) { defer s.mirrorsLock.Unlock() s.mirrorsLock.Lock() now := time.Now() - for i, m := range s.mirrors { - if s.schedule[i].After(now) { + for _, m := range s.mirrors { + if m.NextRun.After(now) { continue } @@ -131,24 +133,83 @@ func (s *Service) RemoveMirror(arg *Mirror) error { defer s.mirrorsLock.Unlock() s.mirrorsLock.Lock() + index := -1 + for i, m := range s.mirrors { if m.Equal(arg) { - s.mirrors[i] = s.mirrors[len(s.mirrors)-1] - s.mirrors = s.mirrors[:len(s.mirrors)-1] - - s.schedule[i] = s.schedule[len(s.schedule)-1] - s.schedule = s.schedule[:len(s.schedule)-1] - - return nil + index = i + break } } - return errors.New("not found") + if index == -1 { + return errors.New("not found") + } + + s.mirrors[index] = s.mirrors[len(s.mirrors)-1] + s.mirrors = s.mirrors[:len(s.mirrors)-1] + + return nil } -// Take arguments -func (s *Service) Reload(cfg *Config) error { - return errors.New("not implemented") +// Reload reloads the service with the given configuration. +func (s *Service) Reload(arg *Config) { + s.mirrorsLock.Lock() + defer s.mirrorsLock.Unlock() + + var cfg Config + + cfg.Append(&DefaultConfig) + cfg.Append(arg) + + s.cfg = &cfg + + mirrorsByDest := make(map[string]*mirrorRecord) + for _, m := range s.mirrors { + mirrorsByDest[m.To.String()] = m + } + + mirrors := make([]*mirrorRecord, 0) + + for _, m := range cfg.Mirrors { + oldM, ok := mirrorsByDest[m.To.String()] + if !ok { + mirrors = append( + mirrors, + &mirrorRecord{ + Mirror: m, + LastRun: time.Time{}, + NextRun: time.Now().Add(RandomDuration(*s.cfg.MinInterval, *s.cfg.MaxInterval).Duration), + }, + ) + + continue + } + + delete(mirrorsByDest, m.To.String()) + + lastRun := oldM.LastRun + if lastRun.IsZero() { + lastRun = time.Now() + } + + untilNextRun := Duration{oldM.NextRun.Sub(lastRun)} + untilNextRun = cfg.MinInterval.Max(&untilNextRun) + untilNextRun = cfg.MaxInterval.Min(&untilNextRun) + + // Records match up. + record := mirrorRecord{ + Mirror: m, + LastRun: oldM.LastRun, + NextRun: time.Now().Add(untilNextRun.Duration), + } + mirrors = append( + mirrors, + &record, + ) + } + + s.mirrors = mirrors } func (s *Service) Log(str string) { @@ -169,13 +230,14 @@ mainLoop: break mainLoop } - s.scheduled(func(m *Mirror) bool { - err := s.Mirror(m) + s.scheduled(func(m *mirrorRecord) bool { + err := s.Mirror(m.Mirror) if err != nil { s.Log(err.Error()) } - s.scheduleNextRun(m) + m.LastRun = time.Now() + m.NextRun = time.Now().Add(RandomDuration(*s.cfg.MinInterval, *s.cfg.MaxInterval).Duration) return true }) |