From ca3a097505c42c2aec16e208c571c1b496c4b14b Mon Sep 17 00:00:00 2001 From: Slack Coder Date: Wed, 23 Oct 2024 06:07:32 -0500 Subject: Support reload and refactor config - Simplify config parameters. - Support hot config reloading. --- README.md | 4 ++ cmd/mirror/main.go | 46 ++++++++------- internal/service/config.go | 59 ++++++++++--------- internal/service/service.go | 136 ++++++++++++++++++++++++++++++++------------ internal/service/time.go | 59 +++++++++++++++++++ 5 files changed, 219 insertions(+), 85 deletions(-) create mode 100644 internal/service/time.go diff --git a/README.md b/README.md index d6d0e85..307829b 100644 --- a/README.md +++ b/README.md @@ -90,3 +90,7 @@ method = "github-assets" from = "https://github.com/ytdl-org/youtube-dl" to = "/mirror/youtube-dl" ``` + +Configuration may also be split across files in a directory. By default +loads configuration from /etc/mirror/mirror.toml and the /etc/mirror/conf.d +directory. diff --git a/cmd/mirror/main.go b/cmd/mirror/main.go index 1da9b15..4c551f0 100644 --- a/cmd/mirror/main.go +++ b/cmd/mirror/main.go @@ -6,6 +6,7 @@ import ( "log" "os" "os/signal" + "strings" "syscall" "git.server.ky/slackcoder/mirror/internal/service" @@ -23,8 +24,7 @@ type Flags struct { func ParseFlags() *Flags { var flags Flags - flag.StringVar(&flags.Config, "config", "/etc/mirror/mirror.toml", "configuration file which takes precedence") - flag.StringVar(&flags.ConfigDir, "config-dir", "/etc/mirror/conf.d", "configuration directory") + flag.StringVar(&flags.Config, "config", "/etc/mirror/conf.d,/etc/mirror/mirror.toml", "comma separated configuration files or directories") flag.BoolVar(&flags.Version, "version", false, "print version") flag.Parse() @@ -38,11 +38,27 @@ func exitOnError(err error) { } } -func handleSignal(c <-chan os.Signal, srv *service.Service) { +func splitCommaString(str string) []string { + strs := strings.Split(str, ",") + + for i := range strs { + strs[i] = strings.TrimSpace(strs[i]) + } + + return strs +} + +func handleSignal(c <-chan os.Signal, srv *service.Service, flags *Flags) { for v := range c { if v == syscall.SIGHUP { - var cfg service.Config - _ = srv.Reload(&cfg) + configs := splitCommaString(flags.Config) + cfg, err := service.ReadConfig(configs...) + if err != nil { + srv.Log(err.Error()) + continue + } + + srv.Reload(cfg) } } } @@ -56,26 +72,16 @@ func main() { os.Exit(0) } - signalCh := make(chan os.Signal, 1) - signal.Notify(signalCh, syscall.SIGHUP) - - var cfg service.Config - - c, err := service.ReadConfigDir(flags.ConfigDir) + configs := splitCommaString(flags.Config) + cfg, err := service.ReadConfig(configs...) exitOnError(err) - cfg.Append(c) - - c, err = service.ReadConfig(flags.Config) + srv, err := service.NewService(cfg) exitOnError(err) - cfg.Append(c) - - srv, err := service.NewService(&cfg) - exitOnError(err) - - signalCh = make(chan os.Signal, 1) + signalCh := make(chan os.Signal, 1) signal.Notify(signalCh, syscall.SIGHUP) + go handleSignal(signalCh, srv, flags) err = srv.Run() exitOnError(err) diff --git a/internal/service/config.go b/internal/service/config.go index bfe271e..0f34997 100644 --- a/internal/service/config.go +++ b/internal/service/config.go @@ -3,7 +3,6 @@ package service import ( "fmt" "os" - "path" "path/filepath" "time" @@ -11,28 +10,6 @@ import ( "github.com/BurntSushi/toml" ) -type Duration struct { - time.Duration -} - -func DurationRef(v time.Duration) *Duration { - return &Duration{v} -} - -func (s Duration) MarshalText() ([]byte, error) { - return []byte(s.Duration.String()), nil -} - -func (s *Duration) UnmarshalText(text []byte) error { - v, err := time.ParseDuration(string(text)) - if err != nil { - return err - } - *s = Duration{v} - - return nil -} - // Global parameters type GlobalConfig struct { MaxInterval *Duration `toml:"max-interval"` @@ -56,7 +33,7 @@ var DefaultConfig = Config{ } // Read the given configuration file. -func ReadConfig(fp string) (*Config, error) { +func readConfigFile(fp string) (*Config, error) { var config Config f, err := os.Open(fp) @@ -74,11 +51,10 @@ func ReadConfig(fp string) (*Config, error) { } // Read all configuration in the given directory. -func ReadConfigDir(fp string) (*Config, error) { +func readConfigDir(fp string) (*Config, error) { var cfg Config - confDPath := path.Join(path.Join(fp, "conf.d")) - confDDir, err := os.ReadDir(confDPath) + confDDir, err := os.ReadDir(fp) if os.IsNotExist(err) { // No directory is an empty one. return &Config{}, nil @@ -91,7 +67,7 @@ func ReadConfigDir(fp string) (*Config, error) { continue } - entryCfg, err := ReadConfig(filepath.Join(confDPath, entry.Name())) + entryCfg, err := readConfigFile(filepath.Join(fp, entry.Name())) if err != nil { return nil, err } @@ -102,6 +78,33 @@ func ReadConfigDir(fp string) (*Config, error) { return &cfg, nil } +// Read all configuration in the given directory. +func ReadConfig(fps ...string) (*Config, error) { + var cfg Config + + for _, fp := range fps { + stat, err := os.Stat(fp) + if err != nil { + return nil, err + } + + var c *Config + + if stat.IsDir() { + c, err = readConfigDir(fp) + if err != nil { + return nil, err + } + } else { + c, err = readConfigFile(fp) + } + + cfg.Append(c) + } + + return &cfg, nil +} + // Apply the given configuration parameters. func (c *Config) Append(src *Config) { if src.MaxInterval != nil { diff --git a/internal/service/service.go b/internal/service/service.go index 3038a03..c5235f2 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -3,7 +3,6 @@ package service import ( "errors" "fmt" - "math/rand" "os" "strconv" "strings" @@ -20,19 +19,32 @@ var requiredCommands = []string{ "rsync", } +// Service data associated with each mirror. +type mirrorRecord struct { + *Mirror + LastRun time.Time + NextRun time.Time +} + +func (s *mirrorRecord) String() string { + return fmt.Sprintf("%s: %s -> %s at %s", s.Method, s.From, s.To, s.NextRun.Format(time.Kitchen)) +} + type Service struct { cfg *Config mirrorsLock sync.Mutex - mirrors []*Mirror - schedule []time.Time + mirrors []*mirrorRecord stopCh chan struct{} } -func NewService(cfg *Config) (*Service, error) { +func NewService(arg *Config) (*Service, error) { // Apply defaults + var cfg Config + cfg.Append(&DefaultConfig) + cfg.Append(arg) for _, cmd := range requiredCommands { if err := internal.RequireCommand(cmd); err != nil { @@ -41,7 +53,7 @@ func NewService(cfg *Config) (*Service, error) { } srv := &Service{ - cfg: cfg, + cfg: &cfg, mirrors: nil, stopCh: make(chan struct{}), } @@ -56,18 +68,6 @@ func NewService(cfg *Config) (*Service, error) { return srv, nil } -func (s *Service) scheduleNextRun(arg *Mirror) { - // Be polite. - period := s.cfg.MaxInterval.Duration - s.cfg.MinInterval.Duration - at := time.Now().Add(s.cfg.MinInterval.Duration + time.Duration(rand.Intn(int(period)))) - - for i, m := range s.mirrors { - if m.Equal(arg) { - s.schedule[i] = at - } - } -} - func (s *Service) AddMirror(arg *Mirror) error { defer s.mirrorsLock.Unlock() s.mirrorsLock.Lock() @@ -78,20 +78,22 @@ func (s *Service) AddMirror(arg *Mirror) error { } } - s.schedule = append(s.schedule, time.Time{}) - s.mirrors = append(s.mirrors, arg) - s.scheduleNextRun(arg) + record := &mirrorRecord{ + Mirror: arg, + NextRun: time.Now().Add(RandomDuration(*s.cfg.MinInterval, *s.cfg.MaxInterval).Duration), + } + s.mirrors = append(s.mirrors, record) return nil } -func (s *Service) scheduled(yield func(*Mirror) bool) { +func (s *Service) scheduled(yield func(*mirrorRecord) bool) { defer s.mirrorsLock.Unlock() s.mirrorsLock.Lock() now := time.Now() - for i, m := range s.mirrors { - if s.schedule[i].After(now) { + for _, m := range s.mirrors { + if m.NextRun.After(now) { continue } @@ -131,24 +133,83 @@ func (s *Service) RemoveMirror(arg *Mirror) error { defer s.mirrorsLock.Unlock() s.mirrorsLock.Lock() + index := -1 + for i, m := range s.mirrors { if m.Equal(arg) { - s.mirrors[i] = s.mirrors[len(s.mirrors)-1] - s.mirrors = s.mirrors[:len(s.mirrors)-1] - - s.schedule[i] = s.schedule[len(s.schedule)-1] - s.schedule = s.schedule[:len(s.schedule)-1] - - return nil + index = i + break } } - return errors.New("not found") + if index == -1 { + return errors.New("not found") + } + + s.mirrors[index] = s.mirrors[len(s.mirrors)-1] + s.mirrors = s.mirrors[:len(s.mirrors)-1] + + return nil } -// Take arguments -func (s *Service) Reload(cfg *Config) error { - return errors.New("not implemented") +// Reload reloads the service with the given configuration. +func (s *Service) Reload(arg *Config) { + s.mirrorsLock.Lock() + defer s.mirrorsLock.Unlock() + + var cfg Config + + cfg.Append(&DefaultConfig) + cfg.Append(arg) + + s.cfg = &cfg + + mirrorsByDest := make(map[string]*mirrorRecord) + for _, m := range s.mirrors { + mirrorsByDest[m.To.String()] = m + } + + mirrors := make([]*mirrorRecord, 0) + + for _, m := range cfg.Mirrors { + oldM, ok := mirrorsByDest[m.To.String()] + if !ok { + mirrors = append( + mirrors, + &mirrorRecord{ + Mirror: m, + LastRun: time.Time{}, + NextRun: time.Now().Add(RandomDuration(*s.cfg.MinInterval, *s.cfg.MaxInterval).Duration), + }, + ) + + continue + } + + delete(mirrorsByDest, m.To.String()) + + lastRun := oldM.LastRun + if lastRun.IsZero() { + lastRun = time.Now() + } + + untilNextRun := Duration{oldM.NextRun.Sub(lastRun)} + untilNextRun = cfg.MinInterval.Max(&untilNextRun) + untilNextRun = cfg.MaxInterval.Min(&untilNextRun) + + // Records match up. + record := mirrorRecord{ + Mirror: m, + LastRun: oldM.LastRun, + NextRun: time.Now().Add(untilNextRun.Duration), + } + mirrors = append( + mirrors, + &record, + ) + } + + s.mirrors = mirrors } func (s *Service) Log(str string) { @@ -169,13 +230,14 @@ mainLoop: break mainLoop } - s.scheduled(func(m *Mirror) bool { - err := s.Mirror(m) + s.scheduled(func(m *mirrorRecord) bool { + err := s.Mirror(m.Mirror) if err != nil { s.Log(err.Error()) } - s.scheduleNextRun(m) + m.LastRun = time.Now() + m.NextRun = time.Now().Add(RandomDuration(*s.cfg.MinInterval, *s.cfg.MaxInterval).Duration) return true }) diff --git a/internal/service/time.go b/internal/service/time.go new file mode 100644 index 0000000..b275b82 --- /dev/null +++ b/internal/service/time.go @@ -0,0 +1,59 @@ +package service + +import ( + "math/rand" + "time" +) + +type Duration struct { + time.Duration +} + +func DurationRef(v time.Duration) *Duration { + return &Duration{v} +} + +func (s Duration) MarshalText() ([]byte, error) { + return []byte(s.Duration.String()), nil +} + +func (s *Duration) Max(args ...*Duration) Duration { + m := s + + for _, o := range args { + if m.Duration < o.Duration { + v := o + m = v + } + } + + return *m +} + +func (s *Duration) Min(args ...*Duration) Duration { + m := s + + for _, o := range args { + if m.Duration > o.Duration { + v := o + m = v + } + } + + return *m +} + +func (s *Duration) UnmarshalText(text []byte) error { + v, err := time.ParseDuration(string(text)) + if err != nil { + return err + } + *s = Duration{v} + + return nil +} + +func RandomDuration(from Duration, until Duration) Duration { + period := until.Duration - from.Duration + return Duration{from.Duration + time.Duration(rand.Intn(int(period)))} +} -- cgit v1.2.3