mirror

Mirror free and open-source projects you like with minimal effort
git clone git://git.server.ky/slackcoder/mirror
Log | Files | Refs | README

service.go (7316B)


      1 package service
      2 
      3 import (
      4 	"bytes"
      5 	"errors"
      6 	"fmt"
      7 	"os"
      8 	"os/exec"
      9 	"path"
     10 	"strconv"
     11 	"strings"
     12 	"sync"
     13 	"time"
     14 
     15 	"git.server.ky/slackcoder/mirror/internal"
     16 	"git.server.ky/slackcoder/mirror/internal/github"
     17 )
     18 
     19 // These commands are required by the service to operate.
     20 var requiredCommands = []string{
     21 	"git",
     22 	"rsync",
     23 }
     24 
     25 // Service data associated with each mirror.
     26 type mirrorRecord struct {
     27 	*Mirror
     28 	LastRun time.Time
     29 	NextRun time.Time
     30 }
     31 
     32 func (s *mirrorRecord) String() string {
     33 	return fmt.Sprintf("%s: %s -> %s at %s", s.Method, s.From, s.To, s.NextRun.Format(time.Kitchen))
     34 }
     35 
     36 type Service struct {
     37 	cfg *Config
     38 
     39 	mirrorsLock sync.Mutex
     40 	mirrors     []*mirrorRecord
     41 
     42 	stopCh chan struct{}
     43 }
     44 
     45 func NewService(arg *Config) (*Service, error) {
     46 	// Apply  defaults
     47 	var cfg Config
     48 
     49 	cfg.Append(&DefaultConfig)
     50 	cfg.Append(arg)
     51 
     52 	for _, cmd := range requiredCommands {
     53 		if err := internal.RequireCommand(cmd); err != nil {
     54 			return nil, err
     55 		}
     56 	}
     57 
     58 	srv := &Service{
     59 		cfg:     &cfg,
     60 		mirrors: nil,
     61 		stopCh:  make(chan struct{}),
     62 	}
     63 
     64 	for _, m := range cfg.Mirrors {
     65 		err := srv.AddMirror(m)
     66 		if err != nil {
     67 			return nil, err
     68 		}
     69 	}
     70 
     71 	return srv, nil
     72 }
     73 
     74 func (s *Service) AddMirror(arg *Mirror) error {
     75 	defer s.mirrorsLock.Unlock()
     76 	s.mirrorsLock.Lock()
     77 
     78 	for _, m := range s.mirrors {
     79 		if m.Equal(arg) {
     80 			return errors.New("must be unique")
     81 		}
     82 	}
     83 
     84 	record := &mirrorRecord{Mirror: arg}
     85 	scheduleNextRun(s.cfg, record)
     86 
     87 	s.mirrors = append(s.mirrors, record)
     88 
     89 	return nil
     90 }
     91 
     92 func (s *Service) scheduled(yield func(*mirrorRecord) bool) {
     93 	defer s.mirrorsLock.Unlock()
     94 	s.mirrorsLock.Lock()
     95 
     96 	now := time.Now()
     97 	for _, m := range s.mirrors {
     98 		if m.NextRun.IsZero() || m.NextRun.After(now) {
     99 			continue
    100 		}
    101 
    102 		if !yield(m) {
    103 			break
    104 		}
    105 	}
    106 }
    107 
    108 func runBash(runDir string, commands string) error {
    109 	cmd := exec.Command("bash", "-c", commands)
    110 	cmd.Dir = runDir
    111 
    112 	var stdErr bytes.Buffer
    113 	cmd.Stderr = &stdErr
    114 
    115 	err := cmd.Run()
    116 	if err != nil {
    117 		return errors.New(stdErr.String())
    118 	}
    119 
    120 	return nil
    121 }
    122 
    123 func (s *Service) Mirror(arg *Mirror) error {
    124 	if arg.From.Path == "" || arg.To.Path == "" || arg.Method == "" {
    125 		return fmt.Errorf("badly formatted mirror '%s'", arg.String())
    126 	}
    127 
    128 	if arg.To.Scheme != "" {
    129 		return fmt.Errorf("unsupported destination URL scheme '%s'", arg.To.Scheme)
    130 	}
    131 
    132 	cfgStagingMethod := s.cfg.StagingMethod
    133 	if arg.StagingMethod != "" {
    134 		cfgStagingMethod = arg.StagingMethod
    135 	}
    136 
    137 	cfgStagingPath := s.cfg.StagingPath
    138 	if arg.StagingPath != "" {
    139 		cfgStagingPath = arg.StagingPath
    140 	}
    141 
    142 	var downloadPath string
    143 	var err error
    144 
    145 	switch cfgStagingMethod {
    146 	case StagingMethodNone:
    147 		downloadPath = arg.To.Path
    148 	case StagingMethodTemporary:
    149 		tempPath, err := os.MkdirTemp("", "mirror.")
    150 		if err != nil {
    151 			return err
    152 		}
    153 		defer os.RemoveAll(tempPath)
    154 
    155 		downloadPath = tempPath
    156 	case StagingMethodPersistent:
    157 		downloadPath = path.Join(cfgStagingPath, arg.To.URL.Path)
    158 	default:
    159 		return fmt.Errorf("unknown staging method '%s'", cfgStagingMethod)
    160 	}
    161 
    162 	err = os.MkdirAll(downloadPath, 0755)
    163 	if err != nil {
    164 		return fmt.Errorf("creating directory '%s' for downloaded contents: %w", downloadPath, err)
    165 	}
    166 	err = os.MkdirAll(arg.To.URL.Path, 0755)
    167 	if err != nil {
    168 		return fmt.Errorf("creating directory '%s' for downloaded contents: %w", downloadPath, err)
    169 	}
    170 
    171 	err = Rsync(internal.MustURL(downloadPath), arg.To)
    172 	if err != nil {
    173 		return fmt.Errorf("copying current mirror data for staging to '%s': %w", downloadPath, err)
    174 	}
    175 
    176 	switch arg.Method {
    177 	case "git":
    178 		err = MirrorGit(internal.MustURL(downloadPath), arg.From, arg.Description)
    179 	case "github-assets":
    180 		client := github.NewClient()
    181 		err = client.MirrorAssets(internal.MustURL(downloadPath), arg.From)
    182 	case "rsync":
    183 		err = Rsync(internal.MustURL(downloadPath), arg.From)
    184 	default:
    185 		err = fmt.Errorf("unknown method '%s'", arg.Method)
    186 	}
    187 
    188 	if err != nil {
    189 		return fmt.Errorf("could not clone from '%s': %w", arg.From, err)
    190 	}
    191 
    192 	if arg.Verify != "" {
    193 		err = runBash(downloadPath, arg.Verify)
    194 		if err != nil {
    195 			return fmt.Errorf("verification failed: %w", err)
    196 		}
    197 	}
    198 
    199 	if downloadPath != arg.To.String() {
    200 		err = Rsync(arg.To, internal.MustURL(downloadPath))
    201 		if err != nil {
    202 			return fmt.Errorf("committing staged mirror data: %w", err)
    203 		}
    204 	}
    205 
    206 	return nil
    207 }
    208 
    209 func (s *Service) RemoveMirror(arg *Mirror) error {
    210 	defer s.mirrorsLock.Unlock()
    211 	s.mirrorsLock.Lock()
    212 
    213 	index := -1
    214 
    215 	for i, m := range s.mirrors {
    216 		if m.Equal(arg) {
    217 			index = i
    218 			break
    219 		}
    220 	}
    221 
    222 	if index == -1 {
    223 		return errors.New("not found")
    224 	}
    225 
    226 	s.mirrors[index] = s.mirrors[len(s.mirrors)-1]
    227 	s.mirrors = s.mirrors[:len(s.mirrors)-1]
    228 
    229 	return nil
    230 }
    231 
    232 func scheduleNextRun(cfg *Config, m *mirrorRecord) {
    233 	if !m.NextRun.IsZero() {
    234 		return
    235 	}
    236 
    237 	// Default to it not existing, leaving error handling for others.
    238 	toPathExists := false
    239 	_, err := os.Stat(m.To.Path)
    240 	if err == nil {
    241 		toPathExists = true
    242 	}
    243 
    244 	minInterval := *cfg.MinInterval
    245 	if m.MinInterval != nil {
    246 		minInterval = *m.MinInterval
    247 	}
    248 	maxInterval := *cfg.MaxInterval
    249 	if m.MaxInterval != nil {
    250 		maxInterval = *m.MaxInterval
    251 	}
    252 
    253 	if minInterval.Duration > 0 || maxInterval.Duration > 0 || !toPathExists {
    254 		lastRun := m.LastRun
    255 		if lastRun.IsZero() {
    256 			lastRun = time.Now()
    257 		}
    258 
    259 		m.NextRun = lastRun.Add(RandomDuration(minInterval, maxInterval).Duration)
    260 	}
    261 }
    262 
    263 // Reload reloads the service with the given configuration.
    264 func (s *Service) Reload(arg *Config) {
    265 	s.mirrorsLock.Lock()
    266 	defer s.mirrorsLock.Unlock()
    267 
    268 	var cfg Config
    269 
    270 	cfg.Append(&DefaultConfig)
    271 	cfg.Append(arg)
    272 
    273 	s.cfg = &cfg
    274 
    275 	mirrorsByDest := make(map[string]*mirrorRecord)
    276 	for _, m := range s.mirrors {
    277 		mirrorsByDest[m.To.String()] = m
    278 	}
    279 
    280 	mirrors := make([]*mirrorRecord, 0)
    281 
    282 	for _, m := range cfg.Mirrors {
    283 		record := &mirrorRecord{Mirror: m}
    284 
    285 		oldM, ok := mirrorsByDest[m.To.String()]
    286 		if ok {
    287 			delete(mirrorsByDest, m.To.String())
    288 
    289 			record.LastRun = oldM.LastRun
    290 			record.NextRun = oldM.NextRun
    291 
    292 			// The run times may be out of sync with the new
    293 			// configuration.
    294 			lastRun := oldM.LastRun
    295 			if lastRun.IsZero() {
    296 				lastRun = time.Now()
    297 			}
    298 
    299 			minInterval := *cfg.MinInterval
    300 			if m.MinInterval != nil {
    301 				minInterval = *m.MinInterval
    302 			}
    303 			maxInterval := *cfg.MaxInterval
    304 			if m.MaxInterval != nil {
    305 				maxInterval = *m.MaxInterval
    306 			}
    307 
    308 			waitTime := oldM.NextRun.Sub(lastRun)
    309 			if waitTime < minInterval.Duration || waitTime > maxInterval.Duration {
    310 				record.NextRun = time.Time{}
    311 			}
    312 
    313 		}
    314 
    315 		scheduleNextRun(s.cfg, record)
    316 
    317 		mirrors = append(
    318 			mirrors,
    319 			record,
    320 		)
    321 	}
    322 
    323 	// Swap out the old with the new.
    324 	s.mirrors = mirrors
    325 }
    326 
    327 func (s *Service) Log(str string) {
    328 	escaped := []rune(strconv.Quote(strings.TrimSpace(str)))
    329 	escaped = escaped[1 : len(escaped)-1]
    330 
    331 	fmt.Fprintf(os.Stderr, "%s: %s\n", time.Now().Format(time.RFC822Z), string(escaped))
    332 }
    333 
    334 func (s *Service) Run() error {
    335 	wakeup := time.NewTicker(time.Second)
    336 
    337 mainLoop:
    338 	for {
    339 		select {
    340 		case <-wakeup.C:
    341 		case <-s.stopCh:
    342 			break mainLoop
    343 		}
    344 
    345 		s.scheduled(func(m *mirrorRecord) bool {
    346 			err := s.Mirror(m.Mirror)
    347 			if err != nil {
    348 				s.Log(err.Error())
    349 			}
    350 
    351 			m.LastRun = time.Now()
    352 			m.NextRun = time.Time{}
    353 
    354 			scheduleNextRun(s.cfg, m)
    355 
    356 			return true
    357 		})
    358 	}
    359 
    360 	return nil
    361 }
    362 
    363 func (s *Service) Stop() {
    364 	select {
    365 	case <-s.stopCh:
    366 	default:
    367 		close(s.stopCh)
    368 	}
    369 }