aboutsummaryrefslogtreecommitdiff
path: root/sync.go
blob: b799554f00245b684a57a77b64530851a70d490c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package sendoverhttp

import (
	"sync"
)

type Waitable interface {
	Wait()
	Stop()
}

type Waiter struct {
	stopCh chan struct{}
}

func NewWaiter() *Waiter {
	return &Waiter{
		stopCh: make(chan struct{}),
	}
}

func (s *Waiter) Wait() {
	<-s.stopCh
}

func (s *Waiter) Stop() {
	select {
	case s.stopCh <- struct{}{}:
	default:
	}
}

type Runnable interface {
	// Start the run, returning any error which occurs from start to
	// finish.
	Start() error
	// Stop the run and wait until it has ended.  It can be called multiple
	// times, ending immediately.
	Stop()
}

type NoOpRunner struct {
	waiter *Waiter
}

// Start the run, returning any error which occurs from start to
// finish.
func (s *NoOpRunner) Start() error {
	s.waiter.Wait()
	return nil
}

// Stop the run and wait until it has ended.
func (s *NoOpRunner) Stop() {
	s.waiter.Stop()
	return
}

type Runner struct {
	OnStart func() error
	OnStop  func()
}

func NewRunner() *Runner {
	return &Runner{}
}

func (s *Runner) Start() error {
	return s.OnStart()
}

func (s *Runner) Stop() {
	s.OnStop()
}

type MultiRunner struct {
	runs    []Runnable
	runsMut sync.Mutex
	err     error
	errMut  sync.Mutex
}

func NewMultiRunner() *MultiRunner {
	return &MultiRunner{}
}

// Error is the first error returned by any run.
func (s *MultiRunner) Error() error {
	s.errMut.Lock()
	defer s.errMut.Unlock()
	return s.err
}

func (s *MultiRunner) addRun(r Runnable) {
	s.runsMut.Lock()
	defer s.runsMut.Unlock()
	s.runs = append(s.runs, r)
}

// Run and wait for completion.  If it ends with an error, stop all
// other runners.
func (s *MultiRunner) Run(r Runnable) {
	s.addRun(r)

	err := r.Start()

	s.errMut.Lock()
	defer s.errMut.Unlock()
	if err != nil && s.err != nil {
		s.err = err
		//s.Stop()
	}
}

// Stop and wait for all runners to complete.
func (s *MultiRunner) Stop() {
	// TODO: Allow multiple calls to stop
	var wg sync.WaitGroup
	for _, r := range s.runs {
		wg.Add(1)
		go func(r Runnable) {
			defer wg.Done()
			r.Stop()
		}(r)
	}
	wg.Wait()
}