From a7097bd419171b73645f825c75a71efb6fffa064 Mon Sep 17 00:00:00 2001 From: Marcin Fabrykowski Date: Wed, 11 Jun 2025 21:42:58 +0200 Subject: [PATCH] add support for memdb Signed-off-by: Marcin Fabrykowski --- dist/examples/config.toml | 4 + internal/cli/common.go | 7 +- internal/cli/ex-get-slots.go | 2 +- internal/config/settings.go | 4 + internal/config/toml.go | 19 +++ internal/lock/etcdlock.go | 213 +++++++++++++++++++++++++++++++ internal/lock/lock.go | 216 +++----------------------------- internal/lock/memdblock.go | 76 +++++++++++ internal/server/airlock.go | 2 +- internal/server/pre_reboot.go | 2 +- internal/server/steady_state.go | 2 +- 11 files changed, 340 insertions(+), 207 deletions(-) create mode 100644 internal/lock/etcdlock.go create mode 100644 internal/lock/memdblock.go diff --git a/dist/examples/config.toml b/dist/examples/config.toml index 952362d..66823ed 100644 --- a/dist/examples/config.toml +++ b/dist/examples/config.toml @@ -15,6 +15,10 @@ tls = false [etcd3] endpoints = [ "http://127.0.0.1:2379" ] +# MemoryDB configuration +[memdb] +enabled = false + # Lock configuration, base reboot group [lock] default_group_name = "default" diff --git a/internal/cli/common.go b/internal/cli/common.go index 242d897..77b11b1 100644 --- a/internal/cli/common.go +++ b/internal/cli/common.go @@ -82,8 +82,11 @@ func verbosityLevel(verbCount int) logrus.Level { // validateSettings sanity-checks all settings func validateSettings(cfg config.Settings) error { - if len(cfg.EtcdEndpoints) == 0 { - return errors.New("no etcd3 endpoints configured") + if len(cfg.EtcdEndpoints) == 0 && !cfg.MemDBEnabled { + return errors.New("no etcd3 endpoints configured and MemDB is not enabled") + } + if len(cfg.EtcdEndpoints) > 0 && cfg.MemDBEnabled { + return errors.New("both etcd3 endpoints and MemDB are configured, choose one") } if len(cfg.LockGroups) == 0 { return errors.New("no lock-groups configured") diff --git a/internal/cli/ex-get-slots.go 
b/internal/cli/ex-get-slots.go index 705bd4d..2f6d929 100644 --- a/internal/cli/ex-get-slots.go +++ b/internal/cli/ex-get-slots.go @@ -33,7 +33,7 @@ func runGetSlots(cmd *cobra.Command, cmdArgs []string) error { ctx, cancel := context.WithTimeout(context.Background(), runSettings.EtcdTxnTimeout) defer cancel() - manager, err := lock.NewManager(ctx, runSettings.EtcdEndpoints, runSettings.ClientCertPubPath, runSettings.ClientCertKeyPath, runSettings.EtcdTxnTimeout, group, maxSlots) + manager, err := lock.NewManager(ctx, runSettings, group, maxSlots) if err != nil { return err } diff --git a/internal/config/settings.go b/internal/config/settings.go index 08b4fef..c5f9233 100644 --- a/internal/config/settings.go +++ b/internal/config/settings.go @@ -21,6 +21,8 @@ type Settings struct { ClientCertKeyPath string EtcdTxnTimeout time.Duration + MemDBEnabled bool + LockGroups map[string]uint64 } @@ -55,6 +57,8 @@ func defaultSettings() Settings { EtcdEndpoints: []string{}, EtcdTxnTimeout: time.Duration(3) * time.Second, + MemDBEnabled: false, + LockGroups: make(map[string]uint64), } } diff --git a/internal/config/toml.go b/internal/config/toml.go index 5c5434c..8487574 100644 --- a/internal/config/toml.go +++ b/internal/config/toml.go @@ -11,6 +11,7 @@ type tomlConfig struct { Service *serviceSection `toml:"service"` Status *statusSection `toml:"status"` Etcd3 *etcd3Section `toml:"etcd3"` + MemDB *MemDBSection `toml:"memdb"` Lock *lockSection `toml:"lock"` } @@ -37,6 +38,11 @@ type etcd3Section struct { ClientCertKeyPath string `toml:"client_cert_key_path"` } +// MemDBSection holds the optional `memdb` fragment +type MemDBSection struct { + Enabled *bool `toml:"enabled"` +} + // lockSection holds the optional `lock` fragment type lockSection struct { DefaultGroupName *string `toml:"default_group_name"` @@ -77,6 +83,9 @@ func mergeToml(settings *Settings, cfg tomlConfig) { if cfg.Etcd3 != nil { mergeEtcd(settings, *cfg.Etcd3) } + if cfg.MemDB != nil { + mergeMemDB(settings, 
*cfg.MemDB) + } if cfg.Lock != nil { mergeLock(settings, *cfg.Lock) } @@ -136,6 +145,16 @@ func mergeEtcd(settings *Settings, cfg etcd3Section) { } } +func mergeMemDB(settings *Settings, cfg MemDBSection) { + if settings == nil { + return + } + + if cfg.Enabled != nil { + settings.MemDBEnabled = *cfg.Enabled + } +} + func mergeLock(settings *Settings, cfg lockSection) { if settings == nil { return diff --git a/internal/lock/etcdlock.go b/internal/lock/etcdlock.go new file mode 100644 index 0000000..5e55960 --- /dev/null +++ b/internal/lock/etcdlock.go @@ -0,0 +1,213 @@ +package lock + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/url" + "time" + + transport "go.etcd.io/etcd/client/pkg/v3/transport" + clientv3 "go.etcd.io/etcd/client/v3" +) + +const ( + keyTemplate = "com.coreos.airlock/groups/%s/v1/semaphore" +) + +var ( + // ErrNilEtcdManager is returned on nil manager + ErrNilEtcdManager = errors.New("nil EtcdManager") +) + +// EtcdManager takes care of locking for clients +type EtcdManager struct { + client *clientv3.Client + keyPath string +} + +// NewEtcdManager returns a new lock manager, ensuring the underlying semaphore is initialized. 
+func NewEtcdManager(ctx context.Context, etcdURLs []string, certPubPath string, certKeyPath string, txnTimeoutMs time.Duration, group string, slots uint64) (*EtcdManager, error) {
+	tlsInfo := transport.TLSInfo{
+		CertFile: certPubPath,
+		KeyFile:  certKeyPath,
+	}
+	tlsConfig, err := tlsInfo.ClientConfig()
+	if err != nil {
+		return nil, err
+	}
+
+	// Despite its name, txnTimeoutMs is already a time.Duration, so it is
+	// used as-is: scaling it by time.Millisecond again would multiply the
+	// dial timeout by 1e6.
+	client, err := clientv3.New(clientv3.Config{
+		Endpoints:   etcdURLs,
+		DialTimeout: txnTimeoutMs,
+		TLS:         tlsConfig,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	manager := EtcdManager{
+		client:  client,
+		keyPath: fmt.Sprintf(keyTemplate, url.QueryEscape(group)),
+	}
+	if err := manager.ensureInit(ctx, slots); err != nil {
+		// The manager is not handed out on failure, so nobody else could
+		// close the client; the init error is the one worth reporting.
+		_ = client.Close()
+		return nil, err
+	}
+
+	return &manager, nil
+}
+
+// RecursiveLock adds this lock `id` as a holder of the semaphore
+//
+// It will return an error if the manager is nil, if there is a problem
+// getting or setting the semaphore, or if the maximum number of holders has
+// been reached.
+func (m *EtcdManager) RecursiveLock(ctx context.Context, id string) (*Semaphore, error) {
+	if m == nil {
+		return nil, ErrNilEtcdManager
+	}
+
+	sem, version, err := m.get(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	held, err := sem.RecursiveLock(id)
+	if err != nil {
+		return nil, err
+	}
+	if held {
+		// The semaphore reports the lock as held: the write to etcd is skipped.
+		return sem, nil
+	}
+
+	// The write is conditional on the version read above.
+	if err := m.set(ctx, sem, version); err != nil {
+		return nil, err
+	}
+
+	return sem, nil
+}
+
+// UnlockIfHeld removes this lock `id` as a holder of the semaphore
+//
+// It returns an error if there is a problem getting or setting the semaphore.
+func (m *EtcdManager) UnlockIfHeld(ctx context.Context, id string) (*Semaphore, error) {
+	sem, version, err := m.get(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	if err := sem.UnlockIfHeld(id); err != nil {
+		return nil, err
+	}
+
+	// The write is conditional on the version read above.
+	if err := m.set(ctx, sem, version); err != nil {
+		return nil, err
+	}
+
+	return sem, nil
+}
+
+// FetchSemaphore fetches the current semaphore from etcd.
+func (m *EtcdManager) FetchSemaphore(ctx context.Context) (*Semaphore, error) {
+	sem, _, err := m.get(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	return sem, nil
+}
+
+// Close closes the underlying etcd client. It is a no-op on a nil manager
+// and on a manager without a client.
+func (m *EtcdManager) Close() {
+	if m == nil || m.client == nil {
+		return
+	}
+
+	// Close has no way to report a failure, so the error is dropped on purpose.
+	_ = m.client.Close()
+}
+
+// ensureInit initializes the semaphore in etcd, if it does not exist yet.
+func (m *EtcdManager) ensureInit(ctx context.Context, slots uint64) error {
+	if m == nil {
+		return ErrNilEtcdManager
+	}
+
+	semValue, err := NewSemaphore(slots).String()
+	if err != nil {
+		return err
+	}
+
+	// Only the error is inspected: whether the Put actually ran is not checked.
+	_, err = m.client.Txn(ctx).If(
+		// version=0 means that the key does not exist.
+		clientv3.Compare(clientv3.Version(m.keyPath), "=", 0),
+	).Then(
+		clientv3.OpPut(m.keyPath, semValue),
+	).Commit()
+	return err
+}
+
+// get returns the current semaphore value and version, or an error
+func (m *EtcdManager) get(ctx context.Context) (*Semaphore, int64, error) {
+	if m == nil {
+		return nil, 0, ErrNilEtcdManager
+	}
+
+	resp, err := m.client.Get(ctx, m.keyPath)
+	if err != nil {
+		return nil, 0, err
+	}
+	// Kvs is checked as well, so that indexing it below cannot panic.
+	if resp.Count != 1 || len(resp.Kvs) == 0 {
+		return nil, 0, fmt.Errorf("unexpected number of results: %d", resp.Count)
+	}
+
+	kv := resp.Kvs[0]
+	if kv.Version == 0 {
+		return nil, 0, errors.New("key at version 0")
+	}
+	if len(kv.Value) == 0 {
+		return nil, 0, errors.New("empty semaphore value")
+	}
+
+	sem := &Semaphore{}
+	if err := json.Unmarshal(kv.Value, sem); err != nil {
+		return nil, 0, err
+	}
+
+	return sem, kv.Version, nil
+}
+
+// set updates the semaphore in etcd, if `version` matches the one previously observed
+func (m *EtcdManager) set(ctx context.Context, sem *Semaphore, version int64) error {
+	if m == nil {
+		return ErrNilEtcdManager
+	}
+	if sem == nil {
+		return ErrNilSemaphore
+	}
+
+	data, err := json.Marshal(sem)
+	if err != nil {
+		return err
+	}
+
+	// Conditionally Put if version in etcd is still the same we observed.
+	// If the condition is not met, the transaction will return as "not succeeding".
+	resp, err := m.client.Txn(ctx).If(
+		clientv3.Compare(clientv3.Version(m.keyPath), "=", version),
+	).Then(
+		clientv3.OpPut(m.keyPath, string(data)),
+	).Commit()
+	if err != nil {
+		return err
+	}
+	if !resp.Succeeded {
+		return errors.New("conflict on semaphore detected, aborting")
+	}
+
+	return nil
+}
diff --git a/internal/lock/lock.go b/internal/lock/lock.go
index 277d06a..f8ae706 100644
--- a/internal/lock/lock.go
+++ b/internal/lock/lock.go
@@ -2,212 +2,26 @@ package lock
 
 import (
 	"context"
-	"encoding/json"
-	"errors"
-	"fmt"
-	"net/url"
-	"time"
-
-	transport "go.etcd.io/etcd/client/pkg/v3/transport"
-	clientv3 "go.etcd.io/etcd/client/v3"
-)
-
-const (
-	keyTemplate = "com.coreos.airlock/groups/%s/v1/semaphore"
-)
-
-var (
-	// ErrNilManager is returned on nil manager
-	ErrNilManager = errors.New("nil Manager")
+	"github.com/coreos/airlock/internal/config"
 )
 
-// Manager takes care of locking for clients
-type Manager struct {
-	client  *clientv3.Client
-	keyPath string
-}
-
-// NewManager returns a new lock manager, ensuring the underlying semaphore is initialized.
-func NewManager(ctx context.Context, etcdURLs []string, certPubPath string, certKeyPath string, txnTimeoutMs time.Duration, group string, slots uint64) (*Manager, error) { - tlsInfo := transport.TLSInfo{ - CertFile: certPubPath, - KeyFile: certKeyPath, - } - - tlsConfig, err := tlsInfo.ClientConfig() - if err != nil { - return nil, err - } - - client, err := clientv3.New(clientv3.Config{ - Endpoints: etcdURLs, - DialTimeout: time.Duration(txnTimeoutMs) * time.Millisecond, - TLS: tlsConfig, - }) - if err != nil { - return nil, err - } - - keyPath := fmt.Sprintf(keyTemplate, url.QueryEscape(group)) - manager := Manager{client, keyPath} - - if err := manager.ensureInit(ctx, slots); err != nil { - return nil, err - } - - return &manager, nil -} - -// RecursiveLock adds this lock `id` as a holder of the semaphore -// -// It will return an error if there is a problem getting or setting the -// semaphore, or if the maximum number of holders has been reached. -func (m *Manager) RecursiveLock(ctx context.Context, id string) (*Semaphore, error) { - sem, version, err := m.get(ctx) - if err != nil { - return nil, err - } - - held, err := sem.RecursiveLock(id) - if err != nil { - return nil, err - } - if held { - return sem, nil - } - - if err := m.set(ctx, sem, version); err != nil { - return nil, err - } - - return sem, nil -} - -// UnlockIfHeld removes this lock `id` as a holder of the semaphore -// -// It returns an error if there is a problem getting or setting the semaphore. 
-func (m *Manager) UnlockIfHeld(ctx context.Context, id string) (*Semaphore, error) {
-	sem, version, err := m.get(ctx)
-	if err != nil {
-		return nil, err
-	}
-
-	if err := sem.UnlockIfHeld(id); err != nil {
-		return nil, err
-	}
-
-	if err := m.set(ctx, sem, version); err != nil {
-		return nil, err
-	}
-
-	return sem, nil
+// Manager is the locking backend used by clients. It is implemented by
+// EtcdManager and MemDBManager.
+type Manager interface {
+	// RecursiveLock adds `key` as a holder of the semaphore.
+	RecursiveLock(ctx context.Context, key string) (*Semaphore, error)
+	// UnlockIfHeld removes `id` from the holders of the semaphore.
+	UnlockIfHeld(ctx context.Context, id string) (*Semaphore, error)
+	// FetchSemaphore returns the current semaphore.
+	FetchSemaphore(ctx context.Context) (*Semaphore, error)
+	// Close releases whatever the manager holds on to.
+	Close()
 }
 
-// FetchSemaphore fetches current semaphore version
-func (m *Manager) FetchSemaphore(ctx context.Context) (*Semaphore, error) {
-	semaphore, _, err := m.get(ctx)
-	if err != nil {
-		return nil, err
-	}
-
-	return semaphore, nil
-}
-
-// Close reaps all running goroutines
-func (m *Manager) Close() {
-	if m == nil {
-		return
-	}
-
-	m.client.Close()
-}
-
-// ensureInit initialize the semaphore in etcd, if it does not exist yet
-func (m *Manager) ensureInit(ctx context.Context, slots uint64) error {
-	if m == nil {
-		return ErrNilManager
-	}
-
-	sem := NewSemaphore(slots)
-	semValue, err := sem.String()
-	if err != nil {
-		return err
+// NewManager returns the lock manager for the backend selected by the
+// settings `a`: the in-memory one when MemDB is enabled, otherwise the etcd
+// one when endpoints are configured. It returns ErrNoBackend when neither is
+// the case.
+func NewManager(ctx context.Context, a *config.Settings, group string, maxSlots uint64) (Manager, error) {
+	// MemDB takes precedence when both backends are set, and no etcd client
+	// is created in that case.
+	if a.MemDBEnabled {
+		manager, err := NewMemDBManager(ctx, group, maxSlots)
+		if err != nil {
+			// A literal nil is returned: a nil *MemDBManager stored in the
+			// Manager interface would compare as non-nil.
+			return nil, err
+		}
+		return manager, nil
 	}
-
-	_, err = m.client.Txn(ctx).If(
-		// version=0 means that the key does not exist.
-		clientv3.Compare(clientv3.Version(m.keyPath), "=", 0),
-	).Then(
-		clientv3.OpPut(m.keyPath, semValue),
-	).Commit()
-
-	if err != nil {
-		return err
-	}
-	return nil
-}
-
-// get returns the current semaphore value and version, or an error
-func (m *Manager) get(ctx context.Context) (*Semaphore, int64, error) {
-	resp, err := m.client.Get(ctx, m.keyPath)
-	if err != nil {
-		return nil, 0, err
-	}
-	if resp.Count != 1 {
-		return nil, 0, fmt.Errorf("unexpected number of results: %d", resp.Count)
-	}
-
-	var data []byte
-	var version int64
-	for _, kv := range resp.Kvs {
-		data = kv.Value
-		version = kv.Version
-		break
+	if len(a.EtcdEndpoints) > 0 {
+		manager, err := NewEtcdManager(ctx, a.EtcdEndpoints, a.ClientCertPubPath, a.ClientCertKeyPath, a.EtcdTxnTimeout, group, maxSlots)
+		if err != nil {
+			return nil, err
+		}
+		return manager, nil
 	}
-	if version == 0 {
-		return nil, 0, errors.New("key at version 0")
-	}
-	if len(data) == 0 {
-		return nil, 0, errors.New("empty semaphore value")
-	}
-
-	sem := &Semaphore{}
-	err = json.Unmarshal(data, sem)
-	if err != nil {
-		return nil, 0, err
-	}
-
-	return sem, version, nil
-}
-
-// set updates the semaphore in etcd, if `version` matches the one previously observed
-func (m *Manager) set(ctx context.Context, sem *Semaphore, version int64) error {
-	if m == nil {
-		return ErrNilManager
-	}
-	if sem == nil {
-		return ErrNilSemaphore
-	}
-
-	data, err := json.Marshal(sem)
-	if err != nil {
-		return err
-	}
-
-	// Conditionally Put if version in etcd is still the same we observed.
-	// If the condition is not met, the transaction will return as "not succeeding".
-	resp, err := m.client.Txn(ctx).If(
-		clientv3.Compare(clientv3.Version(m.keyPath), "=", version),
-	).Then(
-		clientv3.OpPut(m.keyPath, string(data)),
-	).Commit()
-
-	if err != nil {
-		return err
-	}
-	if !resp.Succeeded {
-		return errors.New("conflict on semaphore detected, aborting")
-	}
-
-	return nil
+	return nil, ErrNoBackend
 }
diff --git a/internal/lock/memdblock.go b/internal/lock/memdblock.go
new file mode 100644
index 0000000..fcaa830
--- /dev/null
+++ b/internal/lock/memdblock.go
@@ -0,0 +1,118 @@
+package lock
+
+import (
+	"context"
+	"errors"
+	"sync"
+)
+
+// MemDBManager is a Manager whose semaphore lives in the memory of the
+// running process: it is neither persisted nor shared with other processes.
+//
+// NOTE(review): its methods return the shared *Semaphore itself, so callers
+// read it after memoryMu has been released - confirm that this is acceptable.
+type MemDBManager struct {
+	lock *Semaphore
+}
+
+// Compile-time check that MemDBManager implements Manager.
+var _ Manager = (*MemDBManager)(nil)
+
+var (
+	// ErrEmptyGroupName is returned when the group name is empty.
+	ErrEmptyGroupName = errors.New("empty group name")
+
+	// ErrZeroSlots is returned when a semaphore without slots is requested.
+	ErrZeroSlots = errors.New("zero slots specified")
+
+	// ErrNilMemDBManager is returned on nil manager or nil semaphore.
+	ErrNilMemDBManager = errors.New("nil MemDBManager")
+
+	// ErrNoBackend is returned by NewManager when no backend is configured.
+	ErrNoBackend = errors.New("no lock backend configured")
+
+	// memoryMu guards memoryDB and the semaphores stored in it.
+	memoryMu sync.Mutex
+
+	// memoryDB holds the semaphore of each group, keyed by group name.
+	memoryDB = make(map[string]*Semaphore)
+)
+
+// NewMemDBManager returns a manager for the in-memory semaphore of `group`.
+//
+// The semaphore is created with `maxSlots` slots the first time a group is
+// requested. Later calls for the same group reuse that semaphore, and their
+// `maxSlots` is only validated, not applied.
+func NewMemDBManager(ctx context.Context, group string, maxSlots uint64) (*MemDBManager, error) {
+	if group == "" {
+		return nil, ErrEmptyGroupName
+	}
+	if maxSlots == 0 {
+		return nil, ErrZeroSlots
+	}
+
+	memoryMu.Lock()
+	defer memoryMu.Unlock()
+
+	sem := memoryDB[group]
+	if sem == nil {
+		sem = NewSemaphore(maxSlots)
+		memoryDB[group] = sem
+	}
+
+	return &MemDBManager{lock: sem}, nil
+}
+
+// RecursiveLock adds `key` as a holder of the semaphore.
+//
+// It returns an error if the manager has no semaphore, or if the semaphore
+// rejects the lock.
+func (m *MemDBManager) RecursiveLock(ctx context.Context, key string) (*Semaphore, error) {
+	if m == nil || m.lock == nil {
+		return nil, ErrNilMemDBManager
+	}
+
+	memoryMu.Lock()
+	defer memoryMu.Unlock()
+
+	// The semaphore is updated in place and there is nothing to write back,
+	// so whether the lock was already held makes no difference here.
+	if _, err := m.lock.RecursiveLock(key); err != nil {
+		return nil, err
+	}
+
+	return m.lock, nil
+}
+
+// UnlockIfHeld removes `id` from the holders of the semaphore.
+//
+// It returns an error if the manager has no semaphore, or if the semaphore
+// rejects the unlock.
+func (m *MemDBManager) UnlockIfHeld(ctx context.Context, id string) (*Semaphore, error) {
+	if m == nil || m.lock == nil {
+		return nil, ErrNilMemDBManager
+	}
+
+	memoryMu.Lock()
+	defer memoryMu.Unlock()
+
+	if err := m.lock.UnlockIfHeld(id); err != nil {
+		return nil, err
+	}
+
+	return m.lock, nil
+}
+
+// FetchSemaphore returns the semaphore of this manager.
+func (m *MemDBManager) FetchSemaphore(ctx context.Context) (*Semaphore, error) {
+	if m == nil || m.lock == nil {
+		return nil, errors.New("no semaphore found")
+	}
+
+	return m.lock, nil
+}
+
+// Close is a no-op: the semaphore stays in memoryDB, so that later managers
+// for the same group find it again.
+func (m *MemDBManager) Close() {
+}
diff --git a/internal/server/airlock.go b/internal/server/airlock.go
index 0c87de8..126b124 100644
--- a/internal/server/airlock.go
+++ b/internal/server/airlock.go
@@ -100,7 +100,7 @@ func (a *Airlock) checkConsistency(ctx context.Context, group string, maxSlots u
 	defer cancel()
 
 	// TODO(lucab): re-arrange so that the manager can be re-used.
-	manager, err := lock.NewManager(innerCtx, a.EtcdEndpoints, a.ClientCertPubPath, a.ClientCertKeyPath, a.EtcdTxnTimeout, group, maxSlots)
+	manager, err := lock.NewManager(innerCtx, &a.Settings, group, maxSlots)
 	if err != nil {
 		logrus.WithFields(logrus.Fields{
 			"reason": err.Error(),
diff --git a/internal/server/pre_reboot.go b/internal/server/pre_reboot.go
index b5ed983..776127f 100644
--- a/internal/server/pre_reboot.go
+++ b/internal/server/pre_reboot.go
@@ -70,7 +70,7 @@ func (a *Airlock) preRebootHandler(req *http.Request) *herrors.HTTPError {
 	ctx, cancel := context.WithTimeout(context.Background(), a.EtcdTxnTimeout)
 	defer cancel()
 
-	lockManager, err := lock.NewManager(ctx, a.EtcdEndpoints, a.ClientCertPubPath, a.ClientCertKeyPath, a.EtcdTxnTimeout, nodeIdentity.Group, slots)
+	lockManager, err := lock.NewManager(ctx, &a.Settings, nodeIdentity.Group, slots)
 	if err != nil {
 		msg := fmt.Sprintf("failed to initialize semaphore manager: %s", err.Error())
 		logrus.Errorln(msg)
diff --git a/internal/server/steady_state.go b/internal/server/steady_state.go
index b9a1f89..4be7de4 100644
--- a/internal/server/steady_state.go
+++ b/internal/server/steady_state.go
@@ -70,7 +70,7 @@ func (a *Airlock) steadyStateHandler(req *http.Request) *herrors.HTTPError {
 	ctx, cancel := context.WithTimeout(context.Background(), a.EtcdTxnTimeout)
 	defer cancel()
 
-	lockManager, err := lock.NewManager(ctx, a.EtcdEndpoints, a.ClientCertPubPath, a.ClientCertKeyPath, a.EtcdTxnTimeout, nodeIdentity.Group, slots)
+	lockManager, err := lock.NewManager(ctx, &a.Settings, nodeIdentity.Group, slots)
 	if err != nil {
 		msg := fmt.Sprintf("failed to initialize semaphore manager: %s", err.Error())
 		logrus.Errorln(msg)