diff options
Diffstat (limited to 'internal/service')
-rw-r--r-- | internal/service/config.go | 59 | ||||
-rw-r--r-- | internal/service/service.go | 136 | ||||
-rw-r--r-- | internal/service/time.go | 59 |
3 files changed, 189 insertions, 65 deletions
diff --git a/internal/service/config.go b/internal/service/config.go index bfe271e..0f34997 100644 --- a/internal/service/config.go +++ b/internal/service/config.go @@ -3,7 +3,6 @@ package service import ( "fmt" "os" - "path" "path/filepath" "time" @@ -11,28 +10,6 @@ import ( "github.com/BurntSushi/toml" ) -type Duration struct { - time.Duration -} - -func DurationRef(v time.Duration) *Duration { - return &Duration{v} -} - -func (s Duration) MarshalText() ([]byte, error) { - return []byte(s.Duration.String()), nil -} - -func (s *Duration) UnmarshalText(text []byte) error { - v, err := time.ParseDuration(string(text)) - if err != nil { - return err - } - *s = Duration{v} - - return nil -} - // Global parameters type GlobalConfig struct { MaxInterval *Duration `toml:"max-interval"` @@ -56,7 +33,7 @@ var DefaultConfig = Config{ } // Read the given configuration file. -func ReadConfig(fp string) (*Config, error) { +func readConfigFile(fp string) (*Config, error) { var config Config f, err := os.Open(fp) @@ -74,11 +51,10 @@ func ReadConfig(fp string) (*Config, error) { } // Read all configuration in the given directory. -func ReadConfigDir(fp string) (*Config, error) { +func readConfigDir(fp string) (*Config, error) { var cfg Config - confDPath := path.Join(path.Join(fp, "conf.d")) - confDDir, err := os.ReadDir(confDPath) + confDDir, err := os.ReadDir(fp) if os.IsNotExist(err) { // No directory is an empty one. return &Config{}, nil @@ -91,7 +67,7 @@ func ReadConfigDir(fp string) (*Config, error) { continue } - entryCfg, err := ReadConfig(filepath.Join(confDPath, entry.Name())) + entryCfg, err := readConfigFile(filepath.Join(fp, entry.Name())) if err != nil { return nil, err } @@ -102,6 +78,33 @@ func ReadConfigDir(fp string) (*Config, error) { return &cfg, nil } +// Read all configuration in the given directory. +func ReadConfig(fps ...string) (*Config, error) { + var cfg Config + + for _, fp := range fps { + stat, err := os.Stat(fp) + if err != nil { + return nil, err + } + + var c *Config + + if stat.IsDir() { + c, err = readConfigDir(fp) + if err != nil { + return nil, err + } + } else { + c, err = readConfigFile(fp) + } + + cfg.Append(c) + } + + return &cfg, nil +} + // Apply the given configuration parameters. func (c *Config) Append(src *Config) { if src.MaxInterval != nil { diff --git a/internal/service/service.go b/internal/service/service.go index 3038a03..c5235f2 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -3,7 +3,6 @@ package service import ( "errors" "fmt" - "math/rand" "os" "strconv" "strings" @@ -20,19 +19,32 @@ var requiredCommands = []string{ "rsync", } +// Service data associated with each mirror. +type mirrorRecord struct { + *Mirror + LastRun time.Time + NextRun time.Time +} + +func (s *mirrorRecord) String() string { + return fmt.Sprintf("%s: %s -> %s at %s", s.Method, s.From, s.To, s.NextRun.Format(time.Kitchen)) +} + type Service struct { cfg *Config mirrorsLock sync.Mutex - mirrors []*Mirror - schedule []time.Time + mirrors []*mirrorRecord stopCh chan struct{} } -func NewService(cfg *Config) (*Service, error) { +func NewService(arg *Config) (*Service, error) { // Apply defaults + var cfg Config + cfg.Append(&DefaultConfig) + cfg.Append(arg) for _, cmd := range requiredCommands { if err := internal.RequireCommand(cmd); err != nil { @@ -41,7 +53,7 @@ func NewService(cfg *Config) (*Service, error) { } srv := &Service{ - cfg: cfg, + cfg: &cfg, mirrors: nil, stopCh: make(chan struct{}), } @@ -56,18 +68,6 @@ func NewService(cfg *Config) (*Service, error) { return srv, nil } -func (s *Service) scheduleNextRun(arg *Mirror) { - // Be polite. - period := s.cfg.MaxInterval.Duration - s.cfg.MinInterval.Duration - at := time.Now().Add(s.cfg.MinInterval.Duration + time.Duration(rand.Intn(int(period)))) - - for i, m := range s.mirrors { - if m.Equal(arg) { - s.schedule[i] = at - } - } -} - func (s *Service) AddMirror(arg *Mirror) error { defer s.mirrorsLock.Unlock() s.mirrorsLock.Lock() @@ -78,20 +78,22 @@ func (s *Service) AddMirror(arg *Mirror) error { } } - s.schedule = append(s.schedule, time.Time{}) - s.mirrors = append(s.mirrors, arg) - s.scheduleNextRun(arg) + record := &mirrorRecord{ + Mirror: arg, + NextRun: time.Now().Add(RandomDuration(*s.cfg.MinInterval, *s.cfg.MaxInterval).Duration), + } + s.mirrors = append(s.mirrors, record) return nil } -func (s *Service) scheduled(yield func(*Mirror) bool) { +func (s *Service) scheduled(yield func(*mirrorRecord) bool) { defer s.mirrorsLock.Unlock() s.mirrorsLock.Lock() now := time.Now() - for i, m := range s.mirrors { - if s.schedule[i].After(now) { + for _, m := range s.mirrors { + if m.NextRun.After(now) { continue } @@ -131,24 +133,83 @@ func (s *Service) RemoveMirror(arg *Mirror) error { defer s.mirrorsLock.Unlock() s.mirrorsLock.Lock() + index := -1 + for i, m := range s.mirrors { if m.Equal(arg) { - s.mirrors[i] = s.mirrors[len(s.mirrors)-1] - s.mirrors = s.mirrors[:len(s.mirrors)-1] - - s.schedule[i] = s.schedule[len(s.schedule)-1] - s.schedule = s.schedule[:len(s.schedule)-1] - - return nil + index = i + break } } - return errors.New("not found") + if index == -1 { + return errors.New("not found") + } + + s.mirrors[index] = s.mirrors[len(s.mirrors)-1] + s.mirrors = s.mirrors[:len(s.mirrors)-1] + + return nil } -// Take arguments -func (s *Service) Reload(cfg *Config) error { - return errors.New("not implemented") +// Reload reloads the service with the given configuration. +func (s *Service) Reload(arg *Config) { + s.mirrorsLock.Lock() + defer s.mirrorsLock.Unlock() + + var cfg Config + + cfg.Append(&DefaultConfig) + cfg.Append(arg) + + s.cfg = &cfg + + mirrorsByDest := make(map[string]*mirrorRecord) + for _, m := range s.mirrors { + mirrorsByDest[m.To.String()] = m + } + + mirrors := make([]*mirrorRecord, 0) + + for _, m := range cfg.Mirrors { + oldM, ok := mirrorsByDest[m.To.String()] + if !ok { + mirrors = append( + mirrors, + &mirrorRecord{ + Mirror: m, + LastRun: time.Time{}, + NextRun: time.Now().Add(RandomDuration(*s.cfg.MinInterval, *s.cfg.MaxInterval).Duration), + }, + ) + + continue + } + + delete(mirrorsByDest, m.To.String()) + + lastRun := oldM.LastRun + if lastRun.IsZero() { + lastRun = time.Now() + } + + untilNextRun := Duration{oldM.NextRun.Sub(lastRun)} + untilNextRun = cfg.MinInterval.Max(&untilNextRun) + untilNextRun = cfg.MaxInterval.Min(&untilNextRun) + + // Records match up. + record := mirrorRecord{ + Mirror: m, + LastRun: oldM.LastRun, + NextRun: time.Now().Add(untilNextRun.Duration), + } + mirrors = append( + mirrors, + &record, + ) + } + + s.mirrors = mirrors } func (s *Service) Log(str string) { @@ -169,13 +230,14 @@ mainLoop: break mainLoop } - s.scheduled(func(m *Mirror) bool { - err := s.Mirror(m) + s.scheduled(func(m *mirrorRecord) bool { + err := s.Mirror(m.Mirror) if err != nil { s.Log(err.Error()) } - s.scheduleNextRun(m) + m.LastRun = time.Now() + m.NextRun = time.Now().Add(RandomDuration(*s.cfg.MinInterval, *s.cfg.MaxInterval).Duration) return true }) diff --git a/internal/service/time.go b/internal/service/time.go new file mode 100644 index 0000000..b275b82 --- /dev/null +++ b/internal/service/time.go @@ -0,0 +1,59 @@ +package service + +import ( + "math/rand" + "time" +) + +type Duration struct { + time.Duration +} + +func DurationRef(v time.Duration) *Duration { + return &Duration{v} +} + +func (s Duration) MarshalText() ([]byte, error) { + return []byte(s.Duration.String()), nil +} + +func (s *Duration) Max(args ...*Duration) Duration { + m := s + + for _, o := range args { + if m.Duration < o.Duration { + v := o + m = v + } + } + + return *m +} + +func (s *Duration) Min(args ...*Duration) Duration { + m := s + + for _, o := range args { + if m.Duration > o.Duration { + v := o + m = v + } + } + + return *m +} + +func (s *Duration) UnmarshalText(text []byte) error { + v, err := time.ParseDuration(string(text)) + if err != nil { + return err + } + *s = Duration{v} + + return nil +} + +func RandomDuration(from Duration, until Duration) Duration { + period := until.Duration - from.Duration + return Duration{from.Duration + time.Duration(rand.Intn(int(period)))} +} |