Commit d252ea2b authored by Adrian Sutton's avatar Adrian Sutton

op-node: Add logic for reading and persisting sequencer state

parent 72c6fb0a
package node
import (
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"sync"
)
type RunningState int
const (
Unset RunningState = iota
Started
Stopped
)
type persistedState struct {
SequencerStarted *bool `json:"sequencerStarted,omitempty"`
}
type ConfigPersistence interface {
SequencerStarted() error
SequencerStopped() error
SequencerState() (RunningState, error)
}
var _ ConfigPersistence = (*ActiveConfigPersistence)(nil)
var _ ConfigPersistence = DisabledConfigPersistence{}
type ActiveConfigPersistence struct {
lock sync.Mutex
file string
}
func NewConfigPersistence(file string) (*ActiveConfigPersistence, error) {
return &ActiveConfigPersistence{file: file}, nil
}
func (p *ActiveConfigPersistence) SequencerStarted() error {
return p.persist(true)
}
func (p *ActiveConfigPersistence) SequencerStopped() error {
return p.persist(false)
}
// persist writes the new config state to the file as safely as possible.
// It uses sync to ensure the data is actually persisted to disk and initially writes to a temp file
// before renaming it into place. On UNIX systems this rename is typically atomic, ensuring the
// actual file isn't corrupted if IO errors occur during writing.
func (p *ActiveConfigPersistence) persist(sequencerStarted bool) error {
p.lock.Lock()
defer p.lock.Unlock()
data, err := json.Marshal(persistedState{SequencerStarted: &sequencerStarted})
if err != nil {
return fmt.Errorf("marshall new config: %w", err)
}
dir := filepath.Dir(p.file)
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("create config dir (%v): %w", p.file, err)
}
// Write the new content to a temp file first, then rename into place
// Avoids corrupting the content if the disk is full or there are IO errors
tmpFile := p.file + ".tmp"
file, err := os.OpenFile(tmpFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return fmt.Errorf("open file (%v) for writing: %w", tmpFile, err)
}
defer file.Close()
_, err = file.Write(data)
if err != nil {
return fmt.Errorf("write new config to temp file (%v): %w", tmpFile, err)
}
if err := file.Sync(); err != nil {
return fmt.Errorf("sync new config temp file (%v): %w", tmpFile, err)
}
// Rename to replace the previous file
if err := os.Rename(tmpFile, p.file); err != nil {
return fmt.Errorf("rename temp config file to final destination: %w", err)
}
return nil
}
func (p *ActiveConfigPersistence) SequencerState() (RunningState, error) {
config, err := p.read()
if err != nil {
return Unset, err
}
if config.SequencerStarted == nil {
return Unset, nil
} else if *config.SequencerStarted {
return Started, nil
} else {
return Stopped, nil
}
}
func (p *ActiveConfigPersistence) read() (persistedState, error) {
p.lock.Lock()
defer p.lock.Unlock()
data, err := os.ReadFile(p.file)
if errors.Is(err, os.ErrNotExist) {
return persistedState{}, nil
} else if err != nil {
return persistedState{}, fmt.Errorf("read config file (%v): %w", p.file, err)
}
var config persistedState
if err = json.Unmarshal(data, &config); err != nil {
return persistedState{}, fmt.Errorf("invalid config file (%v): %w", p.file, err)
}
return config, nil
}
// DisabledConfigPersistence provides an implementation of config persistence
// that does not persist anything and reports unset for all values
type DisabledConfigPersistence struct {
}
func (d DisabledConfigPersistence) SequencerState() (RunningState, error) {
return Unset, nil
}
func (d DisabledConfigPersistence) SequencerStarted() error {
return nil
}
func (d DisabledConfigPersistence) SequencerStopped() error {
return nil
}
package node
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestActive(t *testing.T) {
create := func() *ActiveConfigPersistence {
dir := t.TempDir()
config, err := NewConfigPersistence(dir + "/state")
require.NoError(t, err)
return config
}
t.Run("SequencerStateUnsetWhenFileDoesNotExist", func(t *testing.T) {
config := create()
state, err := config.SequencerState()
require.NoError(t, err)
require.Equal(t, Unset, state)
})
t.Run("PersistSequencerStarted", func(t *testing.T) {
config1 := create()
require.NoError(t, config1.SequencerStarted())
state, err := config1.SequencerState()
require.NoError(t, err)
require.Equal(t, Started, state)
config2, err := NewConfigPersistence(config1.file)
require.NoError(t, err)
state, err = config2.SequencerState()
require.NoError(t, err)
require.Equal(t, Started, state)
})
t.Run("PersistSequencerStopped", func(t *testing.T) {
config1 := create()
require.NoError(t, config1.SequencerStopped())
state, err := config1.SequencerState()
require.NoError(t, err)
require.Equal(t, Stopped, state)
config2, err := NewConfigPersistence(config1.file)
require.NoError(t, err)
state, err = config2.SequencerState()
require.NoError(t, err)
require.Equal(t, Stopped, state)
})
t.Run("PersistMultipleChanges", func(t *testing.T) {
config := create()
require.NoError(t, config.SequencerStarted())
state, err := config.SequencerState()
require.NoError(t, err)
require.Equal(t, Started, state)
require.NoError(t, config.SequencerStopped())
state, err = config.SequencerState()
require.NoError(t, err)
require.Equal(t, Stopped, state)
})
t.Run("CreateParentDirs", func(t *testing.T) {
dir := t.TempDir()
config, err := NewConfigPersistence(dir + "/some/dir/state")
require.NoError(t, err)
// Should be unset before file exists
state, err := config.SequencerState()
require.NoError(t, err)
require.Equal(t, Unset, state)
require.NoFileExists(t, config.file)
// Should create directories when updating
require.NoError(t, config.SequencerStarted())
require.FileExists(t, config.file)
state, err = config.SequencerState()
require.NoError(t, err)
require.Equal(t, Started, state)
})
}
func TestDisabledConfigPersistence_AlwaysUnset(t *testing.T) {
config := DisabledConfigPersistence{}
state, err := config.SequencerState()
require.NoError(t, err)
require.Equal(t, Unset, state)
require.NoError(t, config.SequencerStarted())
state, err = config.SequencerState()
require.NoError(t, err)
require.Equal(t, Unset, state)
require.NoError(t, config.SequencerStopped())
state, err = config.SequencerState()
require.NoError(t, err)
require.Equal(t, Unset, state)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment