1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package node
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"sync"
)
type RunningState int
const (
StateUnset RunningState = iota
StateStarted
StateStopped
)
type persistedState struct {
SequencerStarted *bool `json:"sequencerStarted,omitempty"`
}
type ConfigPersistence interface {
SequencerStarted() error
SequencerStopped() error
SequencerState() (RunningState, error)
}
var _ ConfigPersistence = (*ActiveConfigPersistence)(nil)
var _ ConfigPersistence = DisabledConfigPersistence{}
type ActiveConfigPersistence struct {
lock sync.Mutex
file string
}
func NewConfigPersistence(file string) *ActiveConfigPersistence {
return &ActiveConfigPersistence{file: file}
}
func (p *ActiveConfigPersistence) SequencerStarted() error {
return p.persist(true)
}
func (p *ActiveConfigPersistence) SequencerStopped() error {
return p.persist(false)
}
// persist writes the new config state to the file as safely as possible.
// It uses sync to ensure the data is actually persisted to disk and initially writes to a temp file
// before renaming it into place. On UNIX systems this rename is typically atomic, ensuring the
// actual file isn't corrupted if IO errors occur during writing.
func (p *ActiveConfigPersistence) persist(sequencerStarted bool) error {
p.lock.Lock()
defer p.lock.Unlock()
data, err := json.Marshal(persistedState{SequencerStarted: &sequencerStarted})
if err != nil {
return fmt.Errorf("marshall new config: %w", err)
}
dir := filepath.Dir(p.file)
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("create config dir (%v): %w", p.file, err)
}
// Write the new content to a temp file first, then rename into place
// Avoids corrupting the content if the disk is full or there are IO errors
tmpFile := p.file + ".tmp"
file, err := os.OpenFile(tmpFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return fmt.Errorf("open file (%v) for writing: %w", tmpFile, err)
}
defer file.Close() // Ensure file is closed even if write or sync fails
if _, err = file.Write(data); err != nil {
return fmt.Errorf("write new config to temp file (%v): %w", tmpFile, err)
}
if err := file.Sync(); err != nil {
return fmt.Errorf("sync new config temp file (%v): %w", tmpFile, err)
}
if err := file.Close(); err != nil {
return fmt.Errorf("close new config temp file (%v): %w", tmpFile, err)
}
// Rename to replace the previous file
if err := os.Rename(tmpFile, p.file); err != nil {
return fmt.Errorf("rename temp config file to final destination: %w", err)
}
return nil
}
func (p *ActiveConfigPersistence) SequencerState() (RunningState, error) {
config, err := p.read()
if err != nil {
return StateUnset, err
}
if config.SequencerStarted == nil {
return StateUnset, nil
} else if *config.SequencerStarted {
return StateStarted, nil
} else {
return StateStopped, nil
}
}
func (p *ActiveConfigPersistence) read() (persistedState, error) {
p.lock.Lock()
defer p.lock.Unlock()
data, err := os.ReadFile(p.file)
if errors.Is(err, os.ErrNotExist) {
// persistedState.SequencerStarted == nil: SequencerState() will return StateUnset if no state is found
return persistedState{}, nil
} else if err != nil {
return persistedState{}, fmt.Errorf("read config file (%v): %w", p.file, err)
}
var config persistedState
dec := json.NewDecoder(bytes.NewReader(data))
dec.DisallowUnknownFields()
if err = dec.Decode(&config); err != nil {
return persistedState{}, fmt.Errorf("invalid config file (%v): %w", p.file, err)
}
if config.SequencerStarted == nil {
return persistedState{}, fmt.Errorf("missing sequencerStarted value in config file (%v)", p.file)
}
return config, nil
}
// DisabledConfigPersistence provides an implementation of config persistence
// that does not persist anything and reports unset for all values
type DisabledConfigPersistence struct {
}
func (d DisabledConfigPersistence) SequencerState() (RunningState, error) {
return StateUnset, nil
}
func (d DisabledConfigPersistence) SequencerStarted() error {
return nil
}
func (d DisabledConfigPersistence) SequencerStopped() error {
return nil
}