sync.go (2025B)
1 package sendoverhttp 2 3 import ( 4 "sync" 5 ) 6 7 type Waitable interface { 8 Wait() 9 Stop() 10 } 11 12 type Waiter struct { 13 stopCh chan struct{} 14 } 15 16 func NewWaiter() *Waiter { 17 return &Waiter{ 18 stopCh: make(chan struct{}), 19 } 20 } 21 22 func (s *Waiter) Wait() { 23 <-s.stopCh 24 } 25 26 func (s *Waiter) Stop() { 27 select { 28 case s.stopCh <- struct{}{}: 29 default: 30 } 31 } 32 33 type Runnable interface { 34 // Start the run, returning any error which occurs from start to 35 // finish. 36 Start() error 37 // Stop the run and wait until it has ended. It can be called multiple 38 // times, ending immediately. 39 Stop() 40 } 41 42 type NoOpRunner struct { 43 waiter *Waiter 44 } 45 46 // Start the run, returning any error which occurs from start to 47 // finish. 48 func (s *NoOpRunner) Start() error { 49 s.waiter.Wait() 50 return nil 51 } 52 53 // Stop the run and wait until it has ended. 54 func (s *NoOpRunner) Stop() { 55 s.waiter.Stop() 56 return 57 } 58 59 type Runner struct { 60 OnStart func() error 61 OnStop func() 62 } 63 64 func NewRunner() *Runner { 65 return &Runner{} 66 } 67 68 func (s *Runner) Start() error { 69 return s.OnStart() 70 } 71 72 func (s *Runner) Stop() { 73 s.OnStop() 74 } 75 76 type MultiRunner struct { 77 runs []Runnable 78 runsMut sync.Mutex 79 err error 80 errMut sync.Mutex 81 } 82 83 func NewMultiRunner() *MultiRunner { 84 return &MultiRunner{} 85 } 86 87 // Error is the first error returned by any run. 88 func (s *MultiRunner) Error() error { 89 s.errMut.Lock() 90 defer s.errMut.Unlock() 91 return s.err 92 } 93 94 func (s *MultiRunner) addRun(r Runnable) { 95 s.runsMut.Lock() 96 defer s.runsMut.Unlock() 97 s.runs = append(s.runs, r) 98 } 99 100 // Run and wait for completion. If it ends with an error, stop all 101 // other runners. 102 func (s *MultiRunner) Run(r Runnable) { 103 s.addRun(r) 104 105 err := r.Start() 106 107 s.errMut.Lock() 108 defer s.errMut.Unlock() 109 if err != nil && s.err != nil { 110 s.err = err 111 //s.Stop() 112 } 113 } 114 115 // Stop and wait for all runners to complete. 116 func (s *MultiRunner) Stop() { 117 // TODO: Allow multiple calls to stop 118 var wg sync.WaitGroup 119 for _, r := range s.runs { 120 wg.Add(1) 121 go func(r Runnable) { 122 defer wg.Done() 123 r.Stop() 124 }(r) 125 } 126 wg.Wait() 127 }