diff options
Diffstat (limited to 'sync.go')
-rw-r--r-- | sync.go | 127 |
1 files changed, 127 insertions, 0 deletions
@@ -0,0 +1,127 @@ +package sendoverhttp + +import ( + "sync" +) + +type Waitable interface { + Wait() + Stop() +} + +type Waiter struct { + stopCh chan struct{} +} + +func NewWaiter() *Waiter { + return &Waiter{ + stopCh: make(chan struct{}), + } +} + +func (s *Waiter) Wait() { + <-s.stopCh +} + +func (s *Waiter) Stop() { + select { + case s.stopCh <- struct{}{}: + default: + } +} + +type Runnable interface { + // Start the run, returning any error which occurs from start to + // finish. + Start() error + // Stop the run and wait until it has ended. It can be called multiple + // times, ending immediately. + Stop() +} + +type NoOpRunner struct { + waiter *Waiter +} + +// Start the run, returning any error which occurs from start to +// finish. +func (s *NoOpRunner) Start() error { + s.waiter.Wait() + return nil +} + +// Stop the run and wait until it has ended. +func (s *NoOpRunner) Stop() { + s.waiter.Stop() + return +} + +type Runner struct { + OnStart func() error + OnStop func() +} + +func NewRunner() *Runner { + return &Runner{} +} + +func (s *Runner) Start() error { + return s.OnStart() +} + +func (s *Runner) Stop() { + s.OnStop() +} + +type MultiRunner struct { + runs []Runnable + runsMut sync.Mutex + err error + errMut sync.Mutex +} + +func NewMultiRunner() *MultiRunner { + return &MultiRunner{} +} + +// Error is the first error returned by any run. +func (s *MultiRunner) Error() error { + s.errMut.Lock() + defer s.errMut.Unlock() + return s.err +} + +func (s *MultiRunner) addRun(r Runnable) { + s.runsMut.Lock() + defer s.runsMut.Unlock() + s.runs = append(s.runs, r) +} + +// Run and wait for completion. If it ends with an error, stop all +// other runners. +func (s *MultiRunner) Run(r Runnable) { + s.addRun(r) + + err := r.Start() + + s.errMut.Lock() + defer s.errMut.Unlock() + if err != nil && s.err != nil { + s.err = err + //s.Stop() + } +} + +// Stop and wait for all runners to complete. +func (s *MultiRunner) Stop() { + // TODO: Allow multiple calls to stop + var wg sync.WaitGroup + for _, r := range s.runs { + wg.Add(1) + go func(r Runnable) { + defer wg.Done() + r.Stop() + }(r) + } + wg.Wait() +} |