blob: b799554f00245b684a57a77b64530851a70d490c (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
package sendoverhttp
import (
"sync"
)
// Waitable groups the two operations of a stoppable wait: Wait and Stop.
// *Waiter in this file provides both methods.
type Waitable interface {
	// Stop is the counterpart of Wait; it takes no arguments and returns
	// nothing.
	Stop()
	// Wait takes no arguments and returns nothing.
	Wait()
}
// Waiter is a one-shot stop latch: Wait blocks until Stop has been called
// at least once, and keeps returning immediately afterwards.
type Waiter struct {
	// stopCh is closed by Stop; a closed channel releases every receiver.
	stopCh chan struct{}
	// stopOnce guarantees stopCh is closed only once, so Stop is idempotent.
	stopOnce sync.Once
}

// NewWaiter returns a Waiter that has not been stopped yet.
func NewWaiter() *Waiter {
	return &Waiter{
		stopCh: make(chan struct{}),
	}
}

// Wait blocks the calling goroutine until Stop has been called. If Stop
// already happened, Wait returns immediately. Any number of goroutines may
// wait concurrently; all of them are released by a single Stop.
func (s *Waiter) Wait() {
	<-s.stopCh
}

// Stop releases all current and future Wait calls. It never blocks and may
// be called any number of times, from any goroutine.
//
// The stop is latched by closing the channel rather than sent as a single
// non-blocking message: a non-blocking send is dropped when no goroutine
// happens to be inside Wait yet, which would leave a later Wait blocked
// forever.
func (s *Waiter) Stop() {
	s.stopOnce.Do(func() {
		// A zero-value Waiter has a nil channel; closing nil would panic, so
		// Stop stays a harmless no-op in that case.
		if s.stopCh != nil {
			close(s.stopCh)
		}
	})
}
// Runnable is a unit of work with an explicit start and stop.
type Runnable interface {
	// Stop ends the run and waits for it to finish. Calling it again
	// after the run has ended is allowed and returns immediately.
	Stop()
	// Start begins the run and reports any error that occurred between
	// its start and its end.
	Start() error
}
// NoOpRunner is a run that performs no work of its own: its Start method
// only blocks on waiter, and its Stop method only calls waiter.Stop.
//
// NOTE(review): no constructor is visible in this part of the file, so
// callers presumably populate waiter themselves (e.g. with NewWaiter) —
// confirm; both methods dereference it without a nil check.
type NoOpRunner struct {
waiter *Waiter
}
// Start blocks in the runner's waiter until that Wait call returns, then
// reports success. It never returns a non-nil error.
func (s *NoOpRunner) Start() error {
	w := s.waiter
	w.Wait()
	return nil
}
// Stop forwards to the waiter's Stop, which is the signal that Start is
// blocked on. It does not itself wait for Start to return.
func (s *NoOpRunner) Stop() {
	w := s.waiter
	w.Stop()
}
// Runner builds a run out of two caller-supplied callbacks: its Start
// method invokes OnStart and its Stop method invokes OnStop.
type Runner struct {
// OnStart is called by Start; its return value becomes Start's result.
OnStart func() error
// OnStop is called by Stop.
OnStop func()
}
// NewRunner allocates a Runner with both callbacks unset; the caller is
// expected to assign OnStart and OnStop before using it.
func NewRunner() *Runner {
	return new(Runner)
}
// Start invokes the OnStart callback and returns its result.
//
// A Runner whose OnStart has not been assigned (for example one fresh from
// NewRunner) is treated as having nothing to run: Start returns nil instead
// of panicking on a nil function call.
func (s *Runner) Start() error {
	if s.OnStart == nil {
		return nil
	}
	return s.OnStart()
}
// Stop invokes the OnStop callback.
//
// A Runner whose OnStop has not been assigned (for example one fresh from
// NewRunner) has nothing to stop, so Stop is a no-op rather than a panic on
// a nil function call.
func (s *Runner) Stop() {
	if s.OnStop == nil {
		return
	}
	s.OnStop()
}
// MultiRunner keeps track of a set of runs and of an error produced by
// them.
type MultiRunner struct {
// runs holds every Runnable registered through addRun.
runs []Runnable
// runsMut is held by addRun while it appends to runs.
runsMut sync.Mutex
// err is the error slot written by Run and read by Error.
err error
// errMut guards err.
errMut sync.Mutex
}
// NewMultiRunner allocates an empty MultiRunner: no registered runs and no
// recorded error.
func NewMultiRunner() *MultiRunner {
	return new(MultiRunner)
}
// Error returns the error currently stored on the MultiRunner, or nil if
// none has been stored. The value is read while holding errMut.
func (s *MultiRunner) Error() error {
	s.errMut.Lock()
	stored := s.err
	s.errMut.Unlock()
	return stored
}
// addRun registers r by appending it to the runs slice while holding
// runsMut.
func (s *MultiRunner) addRun(r Runnable) {
	s.runsMut.Lock()
	s.runs = append(s.runs, r)
	s.runsMut.Unlock()
}
// Run registers r, starts it and blocks until r.Start returns. If Start
// returns an error and no error has been recorded yet, that error is stored
// and becomes the value reported by Error; later errors are discarded.
//
// Stopping the other runners when a run fails is still disabled (see the
// commented-out call below).
func (s *MultiRunner) Run(r Runnable) {
	s.addRun(r)

	err := r.Start()
	if err == nil {
		return
	}

	s.errMut.Lock()
	defer s.errMut.Unlock()
	// Keep only the first error. The slot must be empty (== nil) for the
	// assignment to happen; testing for != nil here would mean no error is
	// ever recorded.
	if s.err == nil {
		s.err = err
		//s.Stop()
	}
}
// Stop calls Stop on every registered run concurrently and returns once
// all of those calls have returned.
func (s *MultiRunner) Stop() {
	// TODO: Allow multiple calls to stop

	// Copy the list while holding runsMut so a concurrent addRun cannot
	// race with the iteration. The lock is released before any run is
	// stopped, because a run's Stop may block.
	s.runsMut.Lock()
	pending := append([]Runnable(nil), s.runs...)
	s.runsMut.Unlock()

	var wg sync.WaitGroup
	wg.Add(len(pending))
	for _, r := range pending {
		go func(r Runnable) {
			defer wg.Done()
			r.Stop()
		}(r)
	}
	wg.Wait()
}
|