aboutsummaryrefslogtreecommitdiffsponsor
diff options
context:
space:
mode:
authorSlack Coder <slackcoder@server.ky>2024-10-23 06:07:32 -0500
committerSlack Coder <slackcoder@server.ky>2024-10-23 06:14:49 -0500
commitca3a097505c42c2aec16e208c571c1b496c4b14b (patch)
treeaa1020524d6166ad2e527c15864f42e0ce15301b
parentc8ba7263e7923fc76eee7dbe129a8bfbc9501bb7 (diff)
downloadmirror-ca3a097505c42c2aec16e208c571c1b496c4b14b.tar.xz
Support reload and refactor config
- Simplify config parameters. - Support hot config reloading.
-rw-r--r--README.md4
-rw-r--r--cmd/mirror/main.go46
-rw-r--r--internal/service/config.go59
-rw-r--r--internal/service/service.go136
-rw-r--r--internal/service/time.go59
5 files changed, 219 insertions, 85 deletions
diff --git a/README.md b/README.md
index d6d0e85..307829b 100644
--- a/README.md
+++ b/README.md
@@ -90,3 +90,7 @@ method = "github-assets"
from = "https://github.com/ytdl-org/youtube-dl"
to = "/mirror/youtube-dl"
```
+
+Configuration may also be split across files in a directory. By default
+loads configuration from /etc/mirror/mirror.toml and the /etc/mirror/conf.d
+directory.
diff --git a/cmd/mirror/main.go b/cmd/mirror/main.go
index 1da9b15..4c551f0 100644
--- a/cmd/mirror/main.go
+++ b/cmd/mirror/main.go
@@ -6,6 +6,7 @@ import (
"log"
"os"
"os/signal"
+ "strings"
"syscall"
"git.server.ky/slackcoder/mirror/internal/service"
@@ -23,8 +24,7 @@ type Flags struct {
func ParseFlags() *Flags {
var flags Flags
- flag.StringVar(&flags.Config, "config", "/etc/mirror/mirror.toml", "configuration file which takes precedence")
- flag.StringVar(&flags.ConfigDir, "config-dir", "/etc/mirror/conf.d", "configuration directory")
+ flag.StringVar(&flags.Config, "config", "/etc/mirror/conf.d,/etc/mirror/mirror.toml", "comma separated configuration files or directories")
flag.BoolVar(&flags.Version, "version", false, "print version")
flag.Parse()
@@ -38,11 +38,27 @@ func exitOnError(err error) {
}
}
-func handleSignal(c <-chan os.Signal, srv *service.Service) {
+func splitCommaString(str string) []string {
+ strs := strings.Split(str, ",")
+
+ for i := range strs {
+ strs[i] = strings.TrimSpace(strs[i])
+ }
+
+ return strs
+}
+
+func handleSignal(c <-chan os.Signal, srv *service.Service, flags *Flags) {
for v := range c {
if v == syscall.SIGHUP {
- var cfg service.Config
- _ = srv.Reload(&cfg)
+ configs := splitCommaString(flags.Config)
+ cfg, err := service.ReadConfig(configs...)
+ if err != nil {
+ srv.Log(err.Error())
+ continue
+ }
+
+ srv.Reload(cfg)
}
}
}
@@ -56,26 +72,16 @@ func main() {
os.Exit(0)
}
- signalCh := make(chan os.Signal, 1)
- signal.Notify(signalCh, syscall.SIGHUP)
-
- var cfg service.Config
-
- c, err := service.ReadConfigDir(flags.ConfigDir)
+ configs := splitCommaString(flags.Config)
+ cfg, err := service.ReadConfig(configs...)
exitOnError(err)
- cfg.Append(c)
-
- c, err = service.ReadConfig(flags.Config)
+ srv, err := service.NewService(cfg)
exitOnError(err)
- cfg.Append(c)
-
- srv, err := service.NewService(&cfg)
- exitOnError(err)
-
- signalCh = make(chan os.Signal, 1)
+ signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, syscall.SIGHUP)
+ go handleSignal(signalCh, srv, flags)
err = srv.Run()
exitOnError(err)
diff --git a/internal/service/config.go b/internal/service/config.go
index bfe271e..0f34997 100644
--- a/internal/service/config.go
+++ b/internal/service/config.go
@@ -3,7 +3,6 @@ package service
import (
"fmt"
"os"
- "path"
"path/filepath"
"time"
@@ -11,28 +10,6 @@ import (
"github.com/BurntSushi/toml"
)
-type Duration struct {
- time.Duration
-}
-
-func DurationRef(v time.Duration) *Duration {
- return &Duration{v}
-}
-
-func (s Duration) MarshalText() ([]byte, error) {
- return []byte(s.Duration.String()), nil
-}
-
-func (s *Duration) UnmarshalText(text []byte) error {
- v, err := time.ParseDuration(string(text))
- if err != nil {
- return err
- }
- *s = Duration{v}
-
- return nil
-}
-
// Global parameters
type GlobalConfig struct {
MaxInterval *Duration `toml:"max-interval"`
@@ -56,7 +33,7 @@ var DefaultConfig = Config{
}
// Read the given configuration file.
-func ReadConfig(fp string) (*Config, error) {
+func readConfigFile(fp string) (*Config, error) {
var config Config
f, err := os.Open(fp)
@@ -74,11 +51,10 @@ func ReadConfig(fp string) (*Config, error) {
}
// Read all configuration in the given directory.
-func ReadConfigDir(fp string) (*Config, error) {
+func readConfigDir(fp string) (*Config, error) {
var cfg Config
- confDPath := path.Join(path.Join(fp, "conf.d"))
- confDDir, err := os.ReadDir(confDPath)
+ confDDir, err := os.ReadDir(fp)
if os.IsNotExist(err) {
// No directory is an empty one.
return &Config{}, nil
@@ -91,7 +67,7 @@ func ReadConfigDir(fp string) (*Config, error) {
continue
}
- entryCfg, err := ReadConfig(filepath.Join(confDPath, entry.Name()))
+ entryCfg, err := readConfigFile(filepath.Join(fp, entry.Name()))
if err != nil {
return nil, err
}
@@ -102,6 +78,33 @@ func ReadConfigDir(fp string) (*Config, error) {
return &cfg, nil
}
+// Read all configuration in the given directory.
+func ReadConfig(fps ...string) (*Config, error) {
+ var cfg Config
+
+ for _, fp := range fps {
+ stat, err := os.Stat(fp)
+ if err != nil {
+ return nil, err
+ }
+
+ var c *Config
+
+ if stat.IsDir() {
+ c, err = readConfigDir(fp)
+ if err != nil {
+ return nil, err
+ }
+ } else {
+ c, err = readConfigFile(fp)
+ }
+
+ cfg.Append(c)
+ }
+
+ return &cfg, nil
+}
+
// Apply the given configuration parameters.
func (c *Config) Append(src *Config) {
if src.MaxInterval != nil {
diff --git a/internal/service/service.go b/internal/service/service.go
index 3038a03..c5235f2 100644
--- a/internal/service/service.go
+++ b/internal/service/service.go
@@ -3,7 +3,6 @@ package service
import (
"errors"
"fmt"
- "math/rand"
"os"
"strconv"
"strings"
@@ -20,19 +19,32 @@ var requiredCommands = []string{
"rsync",
}
+// Service data associated with each mirror.
+type mirrorRecord struct {
+ *Mirror
+ LastRun time.Time
+ NextRun time.Time
+}
+
+func (s *mirrorRecord) String() string {
+ return fmt.Sprintf("%s: %s -> %s at %s", s.Method, s.From, s.To, s.NextRun.Format(time.Kitchen))
+}
+
type Service struct {
cfg *Config
mirrorsLock sync.Mutex
- mirrors []*Mirror
- schedule []time.Time
+ mirrors []*mirrorRecord
stopCh chan struct{}
}
-func NewService(cfg *Config) (*Service, error) {
+func NewService(arg *Config) (*Service, error) {
// Apply defaults
+ var cfg Config
+
cfg.Append(&DefaultConfig)
+ cfg.Append(arg)
for _, cmd := range requiredCommands {
if err := internal.RequireCommand(cmd); err != nil {
@@ -41,7 +53,7 @@ func NewService(cfg *Config) (*Service, error) {
}
srv := &Service{
- cfg: cfg,
+ cfg: &cfg,
mirrors: nil,
stopCh: make(chan struct{}),
}
@@ -56,18 +68,6 @@ func NewService(cfg *Config) (*Service, error) {
return srv, nil
}
-func (s *Service) scheduleNextRun(arg *Mirror) {
- // Be polite.
- period := s.cfg.MaxInterval.Duration - s.cfg.MinInterval.Duration
- at := time.Now().Add(s.cfg.MinInterval.Duration + time.Duration(rand.Intn(int(period))))
-
- for i, m := range s.mirrors {
- if m.Equal(arg) {
- s.schedule[i] = at
- }
- }
-}
-
func (s *Service) AddMirror(arg *Mirror) error {
defer s.mirrorsLock.Unlock()
s.mirrorsLock.Lock()
@@ -78,20 +78,22 @@ func (s *Service) AddMirror(arg *Mirror) error {
}
}
- s.schedule = append(s.schedule, time.Time{})
- s.mirrors = append(s.mirrors, arg)
- s.scheduleNextRun(arg)
+ record := &mirrorRecord{
+ Mirror: arg,
+ NextRun: time.Now().Add(RandomDuration(*s.cfg.MinInterval, *s.cfg.MaxInterval).Duration),
+ }
+ s.mirrors = append(s.mirrors, record)
return nil
}
-func (s *Service) scheduled(yield func(*Mirror) bool) {
+func (s *Service) scheduled(yield func(*mirrorRecord) bool) {
defer s.mirrorsLock.Unlock()
s.mirrorsLock.Lock()
now := time.Now()
- for i, m := range s.mirrors {
- if s.schedule[i].After(now) {
+ for _, m := range s.mirrors {
+ if m.NextRun.After(now) {
continue
}
@@ -131,24 +133,83 @@ func (s *Service) RemoveMirror(arg *Mirror) error {
defer s.mirrorsLock.Unlock()
s.mirrorsLock.Lock()
+ index := -1
+
for i, m := range s.mirrors {
if m.Equal(arg) {
- s.mirrors[i] = s.mirrors[len(s.mirrors)-1]
- s.mirrors = s.mirrors[:len(s.mirrors)-1]
-
- s.schedule[i] = s.schedule[len(s.schedule)-1]
- s.schedule = s.schedule[:len(s.schedule)-1]
-
- return nil
+ index = i
+ break
}
}
- return errors.New("not found")
+ if index == -1 {
+ return errors.New("not found")
+ }
+
+ s.mirrors[index] = s.mirrors[len(s.mirrors)-1]
+ s.mirrors = s.mirrors[:len(s.mirrors)-1]
+
+ return nil
}
-// Take arguments
-func (s *Service) Reload(cfg *Config) error {
- return errors.New("not implemented")
+// Reload reloads the service with the given configuration.
+func (s *Service) Reload(arg *Config) {
+ s.mirrorsLock.Lock()
+ defer s.mirrorsLock.Unlock()
+
+ var cfg Config
+
+ cfg.Append(&DefaultConfig)
+ cfg.Append(arg)
+
+ s.cfg = &cfg
+
+ mirrorsByDest := make(map[string]*mirrorRecord)
+ for _, m := range s.mirrors {
+ mirrorsByDest[m.To.String()] = m
+ }
+
+ mirrors := make([]*mirrorRecord, 0)
+
+ for _, m := range cfg.Mirrors {
+ oldM, ok := mirrorsByDest[m.To.String()]
+ if !ok {
+ mirrors = append(
+ mirrors,
+ &mirrorRecord{
+ Mirror: m,
+ LastRun: time.Time{},
+ NextRun: time.Now().Add(RandomDuration(*s.cfg.MinInterval, *s.cfg.MaxInterval).Duration),
+ },
+ )
+
+ continue
+ }
+
+ delete(mirrorsByDest, m.To.String())
+
+ lastRun := oldM.LastRun
+ if lastRun.IsZero() {
+ lastRun = time.Now()
+ }
+
+ untilNextRun := Duration{oldM.NextRun.Sub(lastRun)}
+ untilNextRun = cfg.MinInterval.Max(&untilNextRun)
+ untilNextRun = cfg.MaxInterval.Min(&untilNextRun)
+
+ // Records match up.
+ record := mirrorRecord{
+ Mirror: m,
+ LastRun: oldM.LastRun,
+ NextRun: time.Now().Add(untilNextRun.Duration),
+ }
+ mirrors = append(
+ mirrors,
+ &record,
+ )
+ }
+
+ s.mirrors = mirrors
}
func (s *Service) Log(str string) {
@@ -169,13 +230,14 @@ mainLoop:
break mainLoop
}
- s.scheduled(func(m *Mirror) bool {
- err := s.Mirror(m)
+ s.scheduled(func(m *mirrorRecord) bool {
+ err := s.Mirror(m.Mirror)
if err != nil {
s.Log(err.Error())
}
- s.scheduleNextRun(m)
+ m.LastRun = time.Now()
+ m.NextRun = time.Now().Add(RandomDuration(*s.cfg.MinInterval, *s.cfg.MaxInterval).Duration)
return true
})
diff --git a/internal/service/time.go b/internal/service/time.go
new file mode 100644
index 0000000..b275b82
--- /dev/null
+++ b/internal/service/time.go
@@ -0,0 +1,59 @@
+package service
+
+import (
+ "math/rand"
+ "time"
+)
+
+type Duration struct {
+ time.Duration
+}
+
+func DurationRef(v time.Duration) *Duration {
+ return &Duration{v}
+}
+
+func (s Duration) MarshalText() ([]byte, error) {
+ return []byte(s.Duration.String()), nil
+}
+
+func (s *Duration) Max(args ...*Duration) Duration {
+ m := s
+
+ for _, o := range args {
+ if m.Duration < o.Duration {
+ v := o
+ m = v
+ }
+ }
+
+ return *m
+}
+
+func (s *Duration) Min(args ...*Duration) Duration {
+ m := s
+
+ for _, o := range args {
+ if m.Duration > o.Duration {
+ v := o
+ m = v
+ }
+ }
+
+ return *m
+}
+
+func (s *Duration) UnmarshalText(text []byte) error {
+ v, err := time.ParseDuration(string(text))
+ if err != nil {
+ return err
+ }
+ *s = Duration{v}
+
+ return nil
+}
+
+func RandomDuration(from Duration, until Duration) Duration {
+ period := until.Duration - from.Duration
+ return Duration{from.Duration + time.Duration(rand.Intn(int(period)))}
+}