service.go (7316B)
1 package service 2 3 import ( 4 "bytes" 5 "errors" 6 "fmt" 7 "os" 8 "os/exec" 9 "path" 10 "strconv" 11 "strings" 12 "sync" 13 "time" 14 15 "git.server.ky/slackcoder/mirror/internal" 16 "git.server.ky/slackcoder/mirror/internal/github" 17 ) 18 19 // These commands are required by the service to operate. 20 var requiredCommands = []string{ 21 "git", 22 "rsync", 23 } 24 25 // Service data associated with each mirror. 26 type mirrorRecord struct { 27 *Mirror 28 LastRun time.Time 29 NextRun time.Time 30 } 31 32 func (s *mirrorRecord) String() string { 33 return fmt.Sprintf("%s: %s -> %s at %s", s.Method, s.From, s.To, s.NextRun.Format(time.Kitchen)) 34 } 35 36 type Service struct { 37 cfg *Config 38 39 mirrorsLock sync.Mutex 40 mirrors []*mirrorRecord 41 42 stopCh chan struct{} 43 } 44 45 func NewService(arg *Config) (*Service, error) { 46 // Apply defaults 47 var cfg Config 48 49 cfg.Append(&DefaultConfig) 50 cfg.Append(arg) 51 52 for _, cmd := range requiredCommands { 53 if err := internal.RequireCommand(cmd); err != nil { 54 return nil, err 55 } 56 } 57 58 srv := &Service{ 59 cfg: &cfg, 60 mirrors: nil, 61 stopCh: make(chan struct{}), 62 } 63 64 for _, m := range cfg.Mirrors { 65 err := srv.AddMirror(m) 66 if err != nil { 67 return nil, err 68 } 69 } 70 71 return srv, nil 72 } 73 74 func (s *Service) AddMirror(arg *Mirror) error { 75 defer s.mirrorsLock.Unlock() 76 s.mirrorsLock.Lock() 77 78 for _, m := range s.mirrors { 79 if m.Equal(arg) { 80 return errors.New("must be unique") 81 } 82 } 83 84 record := &mirrorRecord{Mirror: arg} 85 scheduleNextRun(s.cfg, record) 86 87 s.mirrors = append(s.mirrors, record) 88 89 return nil 90 } 91 92 func (s *Service) scheduled(yield func(*mirrorRecord) bool) { 93 defer s.mirrorsLock.Unlock() 94 s.mirrorsLock.Lock() 95 96 now := time.Now() 97 for _, m := range s.mirrors { 98 if m.NextRun.IsZero() || m.NextRun.After(now) { 99 continue 100 } 101 102 if !yield(m) { 103 break 104 } 105 } 106 } 107 108 func runBash(runDir string, commands string) error { 109 cmd := exec.Command("bash", "-c", commands) 110 cmd.Dir = runDir 111 112 var stdErr bytes.Buffer 113 cmd.Stderr = &stdErr 114 115 err := cmd.Run() 116 if err != nil { 117 return errors.New(stdErr.String()) 118 } 119 120 return nil 121 } 122 123 func (s *Service) Mirror(arg *Mirror) error { 124 if arg.From.Path == "" || arg.To.Path == "" || arg.Method == "" { 125 return fmt.Errorf("badly formatted mirror '%s'", arg.String()) 126 } 127 128 if arg.To.Scheme != "" { 129 return fmt.Errorf("unsupported destination URL scheme '%s'", arg.To.Scheme) 130 } 131 132 cfgStagingMethod := s.cfg.StagingMethod 133 if arg.StagingMethod != "" { 134 cfgStagingMethod = arg.StagingMethod 135 } 136 137 cfgStagingPath := s.cfg.StagingPath 138 if arg.StagingPath != "" { 139 cfgStagingPath = arg.StagingPath 140 } 141 142 var downloadPath string 143 var err error 144 145 switch cfgStagingMethod { 146 case StagingMethodNone: 147 downloadPath = arg.To.Path 148 case StagingMethodTemporary: 149 tempPath, err := os.MkdirTemp("", "mirror.") 150 if err != nil { 151 return err 152 } 153 defer os.RemoveAll(tempPath) 154 155 downloadPath = tempPath 156 case StagingMethodPersistent: 157 downloadPath = path.Join(cfgStagingPath, arg.To.URL.Path) 158 default: 159 return fmt.Errorf("unknown staging method '%s'", cfgStagingMethod) 160 } 161 162 err = os.MkdirAll(downloadPath, 0755) 163 if err != nil { 164 return fmt.Errorf("creating directory '%s' for downloaded contents: %w", downloadPath, err) 165 } 166 err = os.MkdirAll(arg.To.URL.Path, 0755) 167 if err != nil { 168 return fmt.Errorf("creating directory '%s' for downloaded contents: %w", downloadPath, err) 169 } 170 171 err = Rsync(internal.MustURL(downloadPath), arg.To) 172 if err != nil { 173 return fmt.Errorf("copying current mirror data for staging to '%s': %w", downloadPath, err) 174 } 175 176 switch arg.Method { 177 case "git": 178 err = MirrorGit(internal.MustURL(downloadPath), arg.From, arg.Description) 179 case "github-assets": 180 client := github.NewClient() 181 err = client.MirrorAssets(internal.MustURL(downloadPath), arg.From) 182 case "rsync": 183 err = Rsync(internal.MustURL(downloadPath), arg.From) 184 default: 185 err = fmt.Errorf("unknown method '%s'", arg.Method) 186 } 187 188 if err != nil { 189 return fmt.Errorf("could not clone from '%s': %w", arg.From, err) 190 } 191 192 if arg.Verify != "" { 193 err = runBash(downloadPath, arg.Verify) 194 if err != nil { 195 return fmt.Errorf("verification failed: %w", err) 196 } 197 } 198 199 if downloadPath != arg.To.String() { 200 err = Rsync(arg.To, internal.MustURL(downloadPath)) 201 if err != nil { 202 return fmt.Errorf("committing staged mirror data: %w", err) 203 } 204 } 205 206 return nil 207 } 208 209 func (s *Service) RemoveMirror(arg *Mirror) error { 210 defer s.mirrorsLock.Unlock() 211 s.mirrorsLock.Lock() 212 213 index := -1 214 215 for i, m := range s.mirrors { 216 if m.Equal(arg) { 217 index = i 218 break 219 } 220 } 221 222 if index == -1 { 223 return errors.New("not found") 224 } 225 226 s.mirrors[index] = s.mirrors[len(s.mirrors)-1] 227 s.mirrors = s.mirrors[:len(s.mirrors)-1] 228 229 return nil 230 } 231 232 func scheduleNextRun(cfg *Config, m *mirrorRecord) { 233 if !m.NextRun.IsZero() { 234 return 235 } 236 237 // Default to it not existing, leaving error handling for others. 238 toPathExists := false 239 _, err := os.Stat(m.To.Path) 240 if err == nil { 241 toPathExists = true 242 } 243 244 minInterval := *cfg.MinInterval 245 if m.MinInterval != nil { 246 minInterval = *m.MinInterval 247 } 248 maxInterval := *cfg.MaxInterval 249 if m.MaxInterval != nil { 250 maxInterval = *m.MaxInterval 251 } 252 253 if minInterval.Duration > 0 || maxInterval.Duration > 0 || !toPathExists { 254 lastRun := m.LastRun 255 if lastRun.IsZero() { 256 lastRun = time.Now() 257 } 258 259 m.NextRun = lastRun.Add(RandomDuration(minInterval, maxInterval).Duration) 260 } 261 } 262 263 // Reload reloads the service with the given configuration. 264 func (s *Service) Reload(arg *Config) { 265 s.mirrorsLock.Lock() 266 defer s.mirrorsLock.Unlock() 267 268 var cfg Config 269 270 cfg.Append(&DefaultConfig) 271 cfg.Append(arg) 272 273 s.cfg = &cfg 274 275 mirrorsByDest := make(map[string]*mirrorRecord) 276 for _, m := range s.mirrors { 277 mirrorsByDest[m.To.String()] = m 278 } 279 280 mirrors := make([]*mirrorRecord, 0) 281 282 for _, m := range cfg.Mirrors { 283 record := &mirrorRecord{Mirror: m} 284 285 oldM, ok := mirrorsByDest[m.To.String()] 286 if ok { 287 delete(mirrorsByDest, m.To.String()) 288 289 record.LastRun = oldM.LastRun 290 record.NextRun = oldM.NextRun 291 292 // The run times may be out of sync with the new 293 // configuration. 294 lastRun := oldM.LastRun 295 if lastRun.IsZero() { 296 lastRun = time.Now() 297 } 298 299 minInterval := *cfg.MinInterval 300 if m.MinInterval != nil { 301 minInterval = *m.MinInterval 302 } 303 maxInterval := *cfg.MaxInterval 304 if m.MaxInterval != nil { 305 maxInterval = *m.MaxInterval 306 } 307 308 waitTime := oldM.NextRun.Sub(lastRun) 309 if waitTime < minInterval.Duration || waitTime > maxInterval.Duration { 310 record.NextRun = time.Time{} 311 } 312 313 } 314 315 scheduleNextRun(s.cfg, record) 316 317 mirrors = append( 318 mirrors, 319 record, 320 ) 321 } 322 323 // Swap out the old with the new. 324 s.mirrors = mirrors 325 } 326 327 func (s *Service) Log(str string) { 328 escaped := []rune(strconv.Quote(strings.TrimSpace(str))) 329 escaped = escaped[1 : len(escaped)-1] 330 331 fmt.Fprintf(os.Stderr, "%s: %s\n", time.Now().Format(time.RFC822Z), string(escaped)) 332 } 333 334 func (s *Service) Run() error { 335 wakeup := time.NewTicker(time.Second) 336 337 mainLoop: 338 for { 339 select { 340 case <-wakeup.C: 341 case <-s.stopCh: 342 break mainLoop 343 } 344 345 s.scheduled(func(m *mirrorRecord) bool { 346 err := s.Mirror(m.Mirror) 347 if err != nil { 348 s.Log(err.Error()) 349 } 350 351 m.LastRun = time.Now() 352 m.NextRun = time.Time{} 353 354 scheduleNextRun(s.cfg, m) 355 356 return true 357 }) 358 } 359 360 return nil 361 } 362 363 func (s *Service) Stop() { 364 select { 365 case <-s.stopCh: 366 default: 367 close(s.stopCh) 368 } 369 }