package service import ( "errors" "fmt" "os" "strconv" "strings" "sync" "time" "git.server.ky/slackcoder/mirror/internal" "git.server.ky/slackcoder/mirror/internal/github" ) // These commands are required by the service to operate. var requiredCommands = []string{ "git", "rsync", } // Service data associated with each mirror. type mirrorRecord struct { *Mirror LastRun time.Time NextRun time.Time } func (s *mirrorRecord) String() string { return fmt.Sprintf("%s: %s -> %s at %s", s.Method, s.From, s.To, s.NextRun.Format(time.Kitchen)) } type Service struct { cfg *Config mirrorsLock sync.Mutex mirrors []*mirrorRecord stopCh chan struct{} } func NewService(arg *Config) (*Service, error) { // Apply defaults var cfg Config cfg.Append(&DefaultConfig) cfg.Append(arg) for _, cmd := range requiredCommands { if err := internal.RequireCommand(cmd); err != nil { return nil, err } } srv := &Service{ cfg: &cfg, mirrors: nil, stopCh: make(chan struct{}), } for _, m := range cfg.Mirrors { err := srv.AddMirror(m) if err != nil { return nil, err } } return srv, nil } func (s *Service) AddMirror(arg *Mirror) error { defer s.mirrorsLock.Unlock() s.mirrorsLock.Lock() for _, m := range s.mirrors { if m.Equal(arg) { return errors.New("must be unique") } } record := &mirrorRecord{ Mirror: arg, NextRun: time.Now().Add(RandomDuration(*s.cfg.MinInterval, *s.cfg.MaxInterval).Duration), } s.mirrors = append(s.mirrors, record) return nil } func (s *Service) scheduled(yield func(*mirrorRecord) bool) { defer s.mirrorsLock.Unlock() s.mirrorsLock.Lock() now := time.Now() for _, m := range s.mirrors { if m.NextRun.After(now) { continue } if !yield(m) { break } } } func (s *Service) Mirror(arg *Mirror) error { if arg.From.Path == "" || arg.To.Path == "" || arg.Method == "" { return fmt.Errorf("badly formatted mirror '%s'", arg.String()) } var err error switch arg.Method { case "git": err = MirrorGit(arg.To, arg.From, arg.Description) case "github-assets": client := github.NewClient() err = client.MirrorAssets(arg.To, arg.From) case "rsync": err = Rsync(arg.To, arg.From) default: err = fmt.Errorf("unknown method '%s'", arg.Method) } if err != nil { return fmt.Errorf("could not clone from '%s': %w", arg.From, err) } return nil } func (s *Service) RemoveMirror(arg *Mirror) error { defer s.mirrorsLock.Unlock() s.mirrorsLock.Lock() index := -1 for i, m := range s.mirrors { if m.Equal(arg) { index = i break } } if index == -1 { return errors.New("not found") } s.mirrors[index] = s.mirrors[len(s.mirrors)-1] s.mirrors = s.mirrors[:len(s.mirrors)-1] return nil } // Reload reloads the service with the given configuration. func (s *Service) Reload(arg *Config) { s.mirrorsLock.Lock() defer s.mirrorsLock.Unlock() var cfg Config cfg.Append(&DefaultConfig) cfg.Append(arg) s.cfg = &cfg mirrorsByDest := make(map[string]*mirrorRecord) for _, m := range s.mirrors { mirrorsByDest[m.To.String()] = m } mirrors := make([]*mirrorRecord, 0) for _, m := range cfg.Mirrors { oldM, ok := mirrorsByDest[m.To.String()] if !ok { mirrors = append( mirrors, &mirrorRecord{ Mirror: m, LastRun: time.Time{}, NextRun: time.Now().Add(RandomDuration(*s.cfg.MinInterval, *s.cfg.MaxInterval).Duration), }, ) continue } delete(mirrorsByDest, m.To.String()) lastRun := oldM.LastRun if lastRun.IsZero() { lastRun = time.Now() } untilNextRun := Duration{oldM.NextRun.Sub(lastRun)} untilNextRun = cfg.MinInterval.Max(&untilNextRun) untilNextRun = cfg.MaxInterval.Min(&untilNextRun) // Records match up. record := mirrorRecord{ Mirror: m, LastRun: oldM.LastRun, NextRun: time.Now().Add(untilNextRun.Duration), } mirrors = append( mirrors, &record, ) } s.mirrors = mirrors } func (s *Service) Log(str string) { escaped := []rune(strconv.Quote(strings.TrimSpace(str))) escaped = escaped[1 : len(escaped)-1] fmt.Fprintf(os.Stderr, "%s: %s\n", time.Now().Format(time.RFC822Z), string(escaped)) } func (s *Service) Run() error { wakeup := time.NewTicker(time.Second) mainLoop: for { select { case <-wakeup.C: case <-s.stopCh: break mainLoop } s.scheduled(func(m *mirrorRecord) bool { err := s.Mirror(m.Mirror) if err != nil { s.Log(err.Error()) } m.LastRun = time.Now() m.NextRun = time.Now().Add(RandomDuration(*s.cfg.MinInterval, *s.cfg.MaxInterval).Duration) return true }) } return nil } func (s *Service) Stop() { select { case <-s.stopCh: default: close(s.stopCh) } }