aboutsummaryrefslogtreecommitdiffsponsor
path: root/internal/service/service.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/service/service.go')
-rw-r--r--internal/service/service.go136
1 files changed, 99 insertions, 37 deletions
diff --git a/internal/service/service.go b/internal/service/service.go
index 3038a03..c5235f2 100644
--- a/internal/service/service.go
+++ b/internal/service/service.go
@@ -3,7 +3,6 @@ package service
import (
"errors"
"fmt"
- "math/rand"
"os"
"strconv"
"strings"
@@ -20,19 +19,32 @@ var requiredCommands = []string{
"rsync",
}
+// Service data associated with each mirror.
+type mirrorRecord struct {
+ *Mirror
+ LastRun time.Time
+ NextRun time.Time
+}
+
+func (s *mirrorRecord) String() string {
+ return fmt.Sprintf("%s: %s -> %s at %s", s.Method, s.From, s.To, s.NextRun.Format(time.Kitchen))
+}
+
type Service struct {
cfg *Config
mirrorsLock sync.Mutex
- mirrors []*Mirror
- schedule []time.Time
+ mirrors []*mirrorRecord
stopCh chan struct{}
}
-func NewService(cfg *Config) (*Service, error) {
+func NewService(arg *Config) (*Service, error) {
// Apply defaults
+ var cfg Config
+
cfg.Append(&DefaultConfig)
+ cfg.Append(arg)
for _, cmd := range requiredCommands {
if err := internal.RequireCommand(cmd); err != nil {
@@ -41,7 +53,7 @@ func NewService(cfg *Config) (*Service, error) {
}
srv := &Service{
- cfg: cfg,
+ cfg: &cfg,
mirrors: nil,
stopCh: make(chan struct{}),
}
@@ -56,18 +68,6 @@ func NewService(cfg *Config) (*Service, error) {
return srv, nil
}
-func (s *Service) scheduleNextRun(arg *Mirror) {
- // Be polite.
- period := s.cfg.MaxInterval.Duration - s.cfg.MinInterval.Duration
- at := time.Now().Add(s.cfg.MinInterval.Duration + time.Duration(rand.Intn(int(period))))
-
- for i, m := range s.mirrors {
- if m.Equal(arg) {
- s.schedule[i] = at
- }
- }
-}
-
func (s *Service) AddMirror(arg *Mirror) error {
defer s.mirrorsLock.Unlock()
s.mirrorsLock.Lock()
@@ -78,20 +78,22 @@ func (s *Service) AddMirror(arg *Mirror) error {
}
}
- s.schedule = append(s.schedule, time.Time{})
- s.mirrors = append(s.mirrors, arg)
- s.scheduleNextRun(arg)
+ record := &mirrorRecord{
+ Mirror: arg,
+ NextRun: time.Now().Add(RandomDuration(*s.cfg.MinInterval, *s.cfg.MaxInterval).Duration),
+ }
+ s.mirrors = append(s.mirrors, record)
return nil
}
-func (s *Service) scheduled(yield func(*Mirror) bool) {
+func (s *Service) scheduled(yield func(*mirrorRecord) bool) {
defer s.mirrorsLock.Unlock()
s.mirrorsLock.Lock()
now := time.Now()
- for i, m := range s.mirrors {
- if s.schedule[i].After(now) {
+ for _, m := range s.mirrors {
+ if m.NextRun.After(now) {
continue
}
@@ -131,24 +133,83 @@ func (s *Service) RemoveMirror(arg *Mirror) error {
defer s.mirrorsLock.Unlock()
s.mirrorsLock.Lock()
+ index := -1
+
for i, m := range s.mirrors {
if m.Equal(arg) {
- s.mirrors[i] = s.mirrors[len(s.mirrors)-1]
- s.mirrors = s.mirrors[:len(s.mirrors)-1]
-
- s.schedule[i] = s.schedule[len(s.schedule)-1]
- s.schedule = s.schedule[:len(s.schedule)-1]
-
- return nil
+ index = i
+ break
}
}
- return errors.New("not found")
+ if index == -1 {
+ return errors.New("not found")
+ }
+
+ s.mirrors[index] = s.mirrors[len(s.mirrors)-1]
+ s.mirrors = s.mirrors[:len(s.mirrors)-1]
+
+ return nil
}
-// Take arguments
-func (s *Service) Reload(cfg *Config) error {
- return errors.New("not implemented")
+// Reload reloads the service with the given configuration.
+func (s *Service) Reload(arg *Config) {
+ s.mirrorsLock.Lock()
+ defer s.mirrorsLock.Unlock()
+
+ var cfg Config
+
+ cfg.Append(&DefaultConfig)
+ cfg.Append(arg)
+
+ s.cfg = &cfg
+
+ mirrorsByDest := make(map[string]*mirrorRecord)
+ for _, m := range s.mirrors {
+ mirrorsByDest[m.To.String()] = m
+ }
+
+ mirrors := make([]*mirrorRecord, 0)
+
+ for _, m := range cfg.Mirrors {
+ oldM, ok := mirrorsByDest[m.To.String()]
+ if !ok {
+ mirrors = append(
+ mirrors,
+ &mirrorRecord{
+ Mirror: m,
+ LastRun: time.Time{},
+ NextRun: time.Now().Add(RandomDuration(*s.cfg.MinInterval, *s.cfg.MaxInterval).Duration),
+ },
+ )
+
+ continue
+ }
+
+ delete(mirrorsByDest, m.To.String())
+
+ lastRun := oldM.LastRun
+ if lastRun.IsZero() {
+ lastRun = time.Now()
+ }
+
+ untilNextRun := Duration{oldM.NextRun.Sub(lastRun)}
+ untilNextRun = cfg.MinInterval.Max(&untilNextRun)
+ untilNextRun = cfg.MaxInterval.Min(&untilNextRun)
+
+ // Records match up.
+ record := mirrorRecord{
+ Mirror: m,
+ LastRun: oldM.LastRun,
+ NextRun: time.Now().Add(untilNextRun.Duration),
+ }
+ mirrors = append(
+ mirrors,
+ &record,
+ )
+ }
+
+ s.mirrors = mirrors
}
func (s *Service) Log(str string) {
@@ -169,13 +230,14 @@ mainLoop:
break mainLoop
}
- s.scheduled(func(m *Mirror) bool {
- err := s.Mirror(m)
+ s.scheduled(func(m *mirrorRecord) bool {
+ err := s.Mirror(m.Mirror)
if err != nil {
s.Log(err.Error())
}
- s.scheduleNextRun(m)
+ m.LastRun = time.Now()
+ m.NextRun = time.Now().Add(RandomDuration(*s.cfg.MinInterval, *s.cfg.MaxInterval).Duration)
return true
})