aboutsummaryrefslogtreecommitdiffsponsor
path: root/internal/service
diff options
context:
space:
mode:
Diffstat (limited to 'internal/service')
-rw-r--r--internal/service/config.go59
-rw-r--r--internal/service/service.go136
-rw-r--r--internal/service/time.go59
3 files changed, 189 insertions, 65 deletions
diff --git a/internal/service/config.go b/internal/service/config.go
index bfe271e..0f34997 100644
--- a/internal/service/config.go
+++ b/internal/service/config.go
@@ -3,7 +3,6 @@ package service
import (
"fmt"
"os"
- "path"
"path/filepath"
"time"
@@ -11,28 +10,6 @@ import (
"github.com/BurntSushi/toml"
)
-type Duration struct {
- time.Duration
-}
-
-func DurationRef(v time.Duration) *Duration {
- return &Duration{v}
-}
-
-func (s Duration) MarshalText() ([]byte, error) {
- return []byte(s.Duration.String()), nil
-}
-
-func (s *Duration) UnmarshalText(text []byte) error {
- v, err := time.ParseDuration(string(text))
- if err != nil {
- return err
- }
- *s = Duration{v}
-
- return nil
-}
-
// Global parameters
type GlobalConfig struct {
MaxInterval *Duration `toml:"max-interval"`
@@ -56,7 +33,7 @@ var DefaultConfig = Config{
}
// Read the given configuration file.
-func ReadConfig(fp string) (*Config, error) {
+func readConfigFile(fp string) (*Config, error) {
var config Config
f, err := os.Open(fp)
@@ -74,11 +51,10 @@ func ReadConfig(fp string) (*Config, error) {
}
// Read all configuration in the given directory.
-func ReadConfigDir(fp string) (*Config, error) {
+func readConfigDir(fp string) (*Config, error) {
var cfg Config
- confDPath := path.Join(path.Join(fp, "conf.d"))
- confDDir, err := os.ReadDir(confDPath)
+ confDDir, err := os.ReadDir(fp)
if os.IsNotExist(err) {
// No directory is an empty one.
return &Config{}, nil
@@ -91,7 +67,7 @@ func ReadConfigDir(fp string) (*Config, error) {
continue
}
- entryCfg, err := ReadConfig(filepath.Join(confDPath, entry.Name()))
+ entryCfg, err := readConfigFile(filepath.Join(fp, entry.Name()))
if err != nil {
return nil, err
}
@@ -102,6 +78,33 @@ func ReadConfigDir(fp string) (*Config, error) {
return &cfg, nil
}
+// Read all configuration in the given directory.
+func ReadConfig(fps ...string) (*Config, error) {
+ var cfg Config
+
+ for _, fp := range fps {
+ stat, err := os.Stat(fp)
+ if err != nil {
+ return nil, err
+ }
+
+ var c *Config
+
+ if stat.IsDir() {
+ c, err = readConfigDir(fp)
+ if err != nil {
+ return nil, err
+ }
+ } else {
+ c, err = readConfigFile(fp)
+ }
+
+ cfg.Append(c)
+ }
+
+ return &cfg, nil
+}
+
// Apply the given configuration parameters.
func (c *Config) Append(src *Config) {
if src.MaxInterval != nil {
diff --git a/internal/service/service.go b/internal/service/service.go
index 3038a03..c5235f2 100644
--- a/internal/service/service.go
+++ b/internal/service/service.go
@@ -3,7 +3,6 @@ package service
import (
"errors"
"fmt"
- "math/rand"
"os"
"strconv"
"strings"
@@ -20,19 +19,32 @@ var requiredCommands = []string{
"rsync",
}
+// Service data associated with each mirror.
+type mirrorRecord struct {
+ *Mirror
+ LastRun time.Time
+ NextRun time.Time
+}
+
+func (s *mirrorRecord) String() string {
+ return fmt.Sprintf("%s: %s -> %s at %s", s.Method, s.From, s.To, s.NextRun.Format(time.Kitchen))
+}
+
type Service struct {
cfg *Config
mirrorsLock sync.Mutex
- mirrors []*Mirror
- schedule []time.Time
+ mirrors []*mirrorRecord
stopCh chan struct{}
}
-func NewService(cfg *Config) (*Service, error) {
+func NewService(arg *Config) (*Service, error) {
// Apply defaults
+ var cfg Config
+
cfg.Append(&DefaultConfig)
+ cfg.Append(arg)
for _, cmd := range requiredCommands {
if err := internal.RequireCommand(cmd); err != nil {
@@ -41,7 +53,7 @@ func NewService(cfg *Config) (*Service, error) {
}
srv := &Service{
- cfg: cfg,
+ cfg: &cfg,
mirrors: nil,
stopCh: make(chan struct{}),
}
@@ -56,18 +68,6 @@ func NewService(cfg *Config) (*Service, error) {
return srv, nil
}
-func (s *Service) scheduleNextRun(arg *Mirror) {
- // Be polite.
- period := s.cfg.MaxInterval.Duration - s.cfg.MinInterval.Duration
- at := time.Now().Add(s.cfg.MinInterval.Duration + time.Duration(rand.Intn(int(period))))
-
- for i, m := range s.mirrors {
- if m.Equal(arg) {
- s.schedule[i] = at
- }
- }
-}
-
func (s *Service) AddMirror(arg *Mirror) error {
defer s.mirrorsLock.Unlock()
s.mirrorsLock.Lock()
@@ -78,20 +78,22 @@ func (s *Service) AddMirror(arg *Mirror) error {
}
}
- s.schedule = append(s.schedule, time.Time{})
- s.mirrors = append(s.mirrors, arg)
- s.scheduleNextRun(arg)
+ record := &mirrorRecord{
+ Mirror: arg,
+ NextRun: time.Now().Add(RandomDuration(*s.cfg.MinInterval, *s.cfg.MaxInterval).Duration),
+ }
+ s.mirrors = append(s.mirrors, record)
return nil
}
-func (s *Service) scheduled(yield func(*Mirror) bool) {
+func (s *Service) scheduled(yield func(*mirrorRecord) bool) {
defer s.mirrorsLock.Unlock()
s.mirrorsLock.Lock()
now := time.Now()
- for i, m := range s.mirrors {
- if s.schedule[i].After(now) {
+ for _, m := range s.mirrors {
+ if m.NextRun.After(now) {
continue
}
@@ -131,24 +133,83 @@ func (s *Service) RemoveMirror(arg *Mirror) error {
defer s.mirrorsLock.Unlock()
s.mirrorsLock.Lock()
+ index := -1
+
for i, m := range s.mirrors {
if m.Equal(arg) {
- s.mirrors[i] = s.mirrors[len(s.mirrors)-1]
- s.mirrors = s.mirrors[:len(s.mirrors)-1]
-
- s.schedule[i] = s.schedule[len(s.schedule)-1]
- s.schedule = s.schedule[:len(s.schedule)-1]
-
- return nil
+ index = i
+ break
}
}
- return errors.New("not found")
+ if index == -1 {
+ return errors.New("not found")
+ }
+
+ s.mirrors[index] = s.mirrors[len(s.mirrors)-1]
+ s.mirrors = s.mirrors[:len(s.mirrors)-1]
+
+ return nil
}
-// Take arguments
-func (s *Service) Reload(cfg *Config) error {
- return errors.New("not implemented")
+// Reload reloads the service with the given configuration.
+func (s *Service) Reload(arg *Config) {
+ s.mirrorsLock.Lock()
+ defer s.mirrorsLock.Unlock()
+
+ var cfg Config
+
+ cfg.Append(&DefaultConfig)
+ cfg.Append(arg)
+
+ s.cfg = &cfg
+
+ mirrorsByDest := make(map[string]*mirrorRecord)
+ for _, m := range s.mirrors {
+ mirrorsByDest[m.To.String()] = m
+ }
+
+ mirrors := make([]*mirrorRecord, 0)
+
+ for _, m := range cfg.Mirrors {
+ oldM, ok := mirrorsByDest[m.To.String()]
+ if !ok {
+ mirrors = append(
+ mirrors,
+ &mirrorRecord{
+ Mirror: m,
+ LastRun: time.Time{},
+ NextRun: time.Now().Add(RandomDuration(*s.cfg.MinInterval, *s.cfg.MaxInterval).Duration),
+ },
+ )
+
+ continue
+ }
+
+ delete(mirrorsByDest, m.To.String())
+
+ lastRun := oldM.LastRun
+ if lastRun.IsZero() {
+ lastRun = time.Now()
+ }
+
+ untilNextRun := Duration{oldM.NextRun.Sub(lastRun)}
+ untilNextRun = cfg.MinInterval.Max(&untilNextRun)
+ untilNextRun = cfg.MaxInterval.Min(&untilNextRun)
+
+ // Records match up.
+ record := mirrorRecord{
+ Mirror: m,
+ LastRun: oldM.LastRun,
+ NextRun: time.Now().Add(untilNextRun.Duration),
+ }
+ mirrors = append(
+ mirrors,
+ &record,
+ )
+ }
+
+ s.mirrors = mirrors
}
func (s *Service) Log(str string) {
@@ -169,13 +230,14 @@ mainLoop:
break mainLoop
}
- s.scheduled(func(m *Mirror) bool {
- err := s.Mirror(m)
+ s.scheduled(func(m *mirrorRecord) bool {
+ err := s.Mirror(m.Mirror)
if err != nil {
s.Log(err.Error())
}
- s.scheduleNextRun(m)
+ m.LastRun = time.Now()
+ m.NextRun = time.Now().Add(RandomDuration(*s.cfg.MinInterval, *s.cfg.MaxInterval).Duration)
return true
})
diff --git a/internal/service/time.go b/internal/service/time.go
new file mode 100644
index 0000000..b275b82
--- /dev/null
+++ b/internal/service/time.go
@@ -0,0 +1,59 @@
+package service
+
+import (
+ "math/rand"
+ "time"
+)
+
+type Duration struct {
+ time.Duration
+}
+
+func DurationRef(v time.Duration) *Duration {
+ return &Duration{v}
+}
+
+func (s Duration) MarshalText() ([]byte, error) {
+ return []byte(s.Duration.String()), nil
+}
+
+func (s *Duration) Max(args ...*Duration) Duration {
+ m := s
+
+ for _, o := range args {
+ if m.Duration < o.Duration {
+ v := o
+ m = v
+ }
+ }
+
+ return *m
+}
+
+func (s *Duration) Min(args ...*Duration) Duration {
+ m := s
+
+ for _, o := range args {
+ if m.Duration > o.Duration {
+ v := o
+ m = v
+ }
+ }
+
+ return *m
+}
+
+func (s *Duration) UnmarshalText(text []byte) error {
+ v, err := time.ParseDuration(string(text))
+ if err != nil {
+ return err
+ }
+ *s = Duration{v}
+
+ return nil
+}
+
+func RandomDuration(from Duration, until Duration) Duration {
+ period := until.Duration - from.Duration
+ return Duration{from.Duration + time.Duration(rand.Intn(int(period)))}
+}