Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions op-conductor/conductor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ func (c *OpConductor) initConsensus(ctx context.Context) error {
TrailingLogs: c.cfg.RaftTrailingLogs,
HeartbeatTimeout: c.cfg.RaftHeartbeatTimeout,
LeaderLeaseTimeout: c.cfg.RaftLeaderLeaseTimeout,
Metrics: c.metrics,
}
cons, err := consensus.NewRaftConsensus(c.log, raftConsensusConfig)
if err != nil {
Expand Down Expand Up @@ -293,6 +294,13 @@ func (oc *OpConductor) initRPCServer(ctx context.Context) error {
Service: api,
})

// Binary SSZ commit endpoint. Sized to comfortably fit a 10MB SSZ block;
// raise alongside the JSON-RPC body limit if larger blocks are needed.
server.AddHandler(
conductorrpc.CommitUnsafePayloadPath,
conductorrpc.BinaryCommitHandler(oc.log, oc, 16*1024*1024, oc.metrics),
)

if oc.cfg.RPCEnableProxy {
execClient, err := dial.DialEthClientWithTimeout(ctx, 1*time.Minute, oc.log, oc.cfg.ExecutionRPC)
if err != nil {
Expand Down Expand Up @@ -637,6 +645,12 @@ func (oc *OpConductor) CommitUnsafePayload(_ context.Context, payload *eth.Execu
return oc.cons.CommitUnsafePayload(payload)
}

// CommitUnsafePayloadSSZ commits a pre-SSZ-encoded unsafe payload to the cluster FSM.
// Used by the binary HTTP endpoint to avoid the JSON-decode -> SSZ-marshal round trip.
func (oc *OpConductor) CommitUnsafePayloadSSZ(_ context.Context, ssz []byte) error {
return oc.cons.CommitUnsafePayloadSSZ(ssz)
}

// SequencerHealthy returns true if sequencer is healthy.
func (oc *OpConductor) SequencerHealthy(_ context.Context) bool {
return oc.healthy.Load()
Expand Down
14 changes: 14 additions & 0 deletions op-conductor/consensus/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@ import (
"github.com/ethereum-optimism/optimism/op-service/eth"
)

// ConsensusMetrics defines metrics for consensus commit operations.
// This is intentionally minimal so that the consensus layer does not
// depend on the full metrics.Metricer interface.
type ConsensusMetrics interface {
RecordCommitDuration(marshalSec, raftApplySec float64)
RecordCommitPayloadSize(payloadBytes float64)
RecordFSMApplyDuration(seconds float64)
RecordLogStoreDuration(seconds float64)
}

// ServerSuffrage determines whether a Server in a Configuration gets a vote.
type ServerSuffrage int

Expand Down Expand Up @@ -72,6 +82,10 @@ type Consensus interface {

// CommitUnsafePayload commits latest unsafe payload to the FSM in a strongly consistent fashion.
CommitUnsafePayload(payload *eth.ExecutionPayloadEnvelope) error
// CommitUnsafePayloadSSZ commits a pre-SSZ-encoded unsafe payload to the FSM,
// skipping the marshal step. The bytes are handed directly to raft.Apply and
// validated by the FSM on receive. Used by the binary HTTP endpoint.
CommitUnsafePayloadSSZ(ssz []byte) error
// LatestUnsafePayload returns the latest unsafe payload from FSM in a strongly consistent fashion.
LatestUnsafePayload() (*eth.ExecutionPayloadEnvelope, error)

Expand Down
45 changes: 45 additions & 0 deletions op-conductor/consensus/mocks/Consensus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

103 changes: 101 additions & 2 deletions op-conductor/consensus/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ var _ Consensus = (*RaftConsensus)(nil)
type RaftConsensus struct {
log log.Logger

metrics ConsensusMetrics

serverID raft.ServerID
r *raft.Raft

Expand Down Expand Up @@ -58,6 +60,10 @@ type RaftConsensusConfig struct {
TrailingLogs uint64
HeartbeatTimeout time.Duration
LeaderLeaseTimeout time.Duration

// Metrics collects sub-operation timing data for the commit path.
// If nil, no metrics are recorded.
Metrics ConsensusMetrics
}

// checkTCPPortOpen attempts to connect to the specified address and returns an error if the connection fails.
Expand Down Expand Up @@ -89,10 +95,11 @@ func NewRaftConsensus(log log.Logger, cfg *RaftConsensusConfig) (*RaftConsensus,

var err error
logStorePath := filepath.Join(baseDir, "raft-log.db")
logStore, err := boltdb.NewBoltStore(logStorePath)
boltLogStore, err := boltdb.NewBoltStore(logStorePath)
if err != nil {
return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %w`, logStorePath, err)
}
logStore := wrapInstrumentedLogStore(boltLogStore, cfg.Metrics)

stableStorePath := filepath.Join(baseDir, "raft-stable.db")
stableStore, err := boltdb.NewBoltStore(stableStorePath)
Expand Down Expand Up @@ -131,7 +138,7 @@ func NewRaftConsensus(log log.Logger, cfg *RaftConsensusConfig) (*RaftConsensus,
}
log.Info("Raft server network transport is up", "addr", transport.LocalAddr())

fsm := NewUnsafeHeadTracker(log)
fsm := NewUnsafeHeadTracker(log, cfg.Metrics)

r, err := raft.NewRaft(rc, fsm, logStore, stableStore, snapshotStore, transport)
if err != nil {
Expand Down Expand Up @@ -173,13 +180,47 @@ func NewRaftConsensus(log log.Logger, cfg *RaftConsensusConfig) (*RaftConsensus,

return &RaftConsensus{
log: log,
metrics: cfg.Metrics,
r: r,
serverID: raft.ServerID(cfg.ServerID),
unsafeTracker: fsm,
transport: transport,
}, err
}

func wrapInstrumentedLogStore(store raft.LogStore, metrics ConsensusMetrics) raft.LogStore {
if metrics == nil {
return store
}
return &instrumentedLogStore{
LogStore: store,
metrics: metrics,
}
}

type instrumentedLogStore struct {
raft.LogStore
metrics ConsensusMetrics
}

func (s *instrumentedLogStore) StoreLog(logEntry *raft.Log) error {
start := time.Now()
err := s.LogStore.StoreLog(logEntry)
if s.metrics != nil {
s.metrics.RecordLogStoreDuration(time.Since(start).Seconds())
}
return err
}

func (s *instrumentedLogStore) StoreLogs(logEntries []*raft.Log) error {
start := time.Now()
err := s.LogStore.StoreLogs(logEntries)
if s.metrics != nil {
s.metrics.RecordLogStoreDuration(time.Since(start).Seconds())
}
return err
}

// Addr returns the address to contact this raft consensus server.
// If no explicit address to advertise was configured,
// the local network address that the raft-consensus server is listening on will be used.
Expand Down Expand Up @@ -296,19 +337,77 @@ func (rc *RaftConsensus) CommitUnsafePayload(payload *eth.ExecutionPayloadEnvelo
rc.log.Debug("committing unsafe payload", "number", uint64(payload.ExecutionPayload.BlockNumber), "hash", payload.ExecutionPayload.BlockHash.Hex())

var buf bytes.Buffer
marshalStart := time.Now()
if _, err := payload.MarshalSSZ(&buf); err != nil {
return errors.Wrap(err, "failed to marshal payload envelope")
}
marshalDur := time.Since(marshalStart)

applyStart := time.Now()
f := rc.r.Apply(buf.Bytes(), defaultTimeout)
if err := f.Error(); err != nil {
return errors.Wrap(err, "failed to apply payload envelope")
}
if resp := f.Response(); resp != nil {
if err, ok := resp.(error); ok {
return errors.Wrap(err, "failed to apply payload envelope to FSM")
}
return fmt.Errorf("unexpected raft apply response: %T: %v", resp, resp)
}
applyDur := time.Since(applyStart)

if rc.metrics != nil {
rc.metrics.RecordCommitDuration(marshalDur.Seconds(), applyDur.Seconds())
rc.metrics.RecordCommitPayloadSize(float64(buf.Len()))
}
rc.log.Debug("unsafe payload committed", "number", uint64(payload.ExecutionPayload.BlockNumber), "hash", payload.ExecutionPayload.BlockHash.Hex())

return nil
}

// CommitUnsafePayloadSSZ implements Consensus. The raw SSZ bytes are validated
// before being passed to raft.Apply so that malformed payloads are rejected
// before they are replicated to the raft log on every peer. After raft.Apply
// returns, the FSM response is also checked because the FSM may reject payloads
// for reasons beyond SSZ validity (e.g. version mismatches).
func (rc *RaftConsensus) CommitUnsafePayloadSSZ(ssz []byte) error {
if len(ssz) == 0 {
return errors.New("empty payload")
}

// Pre-validate: attempt to decode the SSZ before writing to the raft log.
// This mirrors the implicit validation the JSON-RPC path gets from its
// json.Unmarshal → MarshalSSZ round-trip and prevents corrupt data from
// being replicated to every peer's log store.
env := new(eth.ExecutionPayloadEnvelope)
if err := env.UnmarshalSSZ(eth.BlockV4, uint32(len(ssz)), bytes.NewReader(ssz)); err != nil {
if err := env.UnmarshalSSZ(eth.BlockV3, uint32(len(ssz)), bytes.NewReader(ssz)); err != nil {
return fmt.Errorf("invalid ssz payload: %w", err)
}
}

applyStart := time.Now()
f := rc.r.Apply(ssz, defaultTimeout)
if err := f.Error(); err != nil {
return errors.Wrap(err, "failed to apply payload envelope")
Copy link
Copy Markdown

@niran niran May 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From Codex:

This only checks raft-level apply errors. The SSZ validation happens inside unsafeHeadTracker.Apply, but hashicorp/raft returns FSM Apply errors via ApplyFuture.Response(), not ApplyFuture.Error(). As written, malformed SSZ can be appended to the raft log, fail FSM decoding, and still return success from the binary endpoint. That is a behavior change from the JSON-RPC path, which decodes into an ExecutionPayloadEnvelope and successfully MarshalSSZs before raft.Apply.

We should either validate the SSZ before applying:

env := new(eth.ExecutionPayloadEnvelope)
if err := env.UnmarshalSSZ(eth.BlockV4, uint32(len(ssz)), bytes.NewReader(ssz)); err != nil {
    if err := env.UnmarshalSSZ(eth.BlockV3, uint32(len(ssz)), bytes.NewReader(ssz)); err != nil {
        return fmt.Errorf("invalid ssz payload: %w", err)
    }
}

Or check f.Response() for an error after f.Error() returns nil. For this FSM, success returns nil; decode/apply failures return the error object from unsafeHeadTracker.Apply.

f := rc.r.Apply(ssz, defaultTimeout)
if err := f.Error(); err != nil {
    return errors.Wrap(err, "failed to apply payload envelope")
}
if resp := f.Response(); resp != nil {
    if err, ok := resp.(error); ok {
        return errors.Wrap(err, "failed to apply payload envelope to FSM")
    }
    return errors.Errorf("unexpected raft apply response: %T: %v", resp, resp)
}

}
if resp := f.Response(); resp != nil {
if err, ok := resp.(error); ok {
return errors.Wrap(err, "failed to apply payload envelope to FSM")
}
return fmt.Errorf("unexpected raft apply response: %T: %v", resp, resp)
}
applyDur := time.Since(applyStart)

if rc.metrics != nil {
// Mirror the metrics emitted by the JSON-RPC CommitUnsafePayload path.
// Marshal duration is zero because the SSZ bytes arrive pre-encoded.
rc.metrics.RecordCommitDuration(0, applyDur.Seconds())
rc.metrics.RecordCommitPayloadSize(float64(len(ssz)))
}
return nil
}

// LatestUnsafePayload implements Consensus, it returns the latest unsafe payload from FSM in a strongly consistent fashion.
func (rc *RaftConsensus) LatestUnsafePayload() (*eth.ExecutionPayloadEnvelope, error) {
if err := rc.r.Barrier(defaultTimeout).Error(); err != nil {
Expand Down
14 changes: 12 additions & 2 deletions op-conductor/consensus/raft_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"sync"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/hashicorp/raft"
Expand All @@ -17,18 +18,27 @@ var _ raft.FSM = (*unsafeHeadTracker)(nil)
// unsafeHeadTracker implements raft.FSM for storing unsafe head payload into raft consensus layer.
type unsafeHeadTracker struct {
log log.Logger
metrics ConsensusMetrics
mtx sync.RWMutex
unsafeHead *eth.ExecutionPayloadEnvelope
}

func NewUnsafeHeadTracker(log log.Logger) *unsafeHeadTracker {
func NewUnsafeHeadTracker(log log.Logger, metrics ConsensusMetrics) *unsafeHeadTracker {
return &unsafeHeadTracker{
log: log,
log: log,
metrics: metrics,
}
}

// Apply implements raft.FSM, it applies the latest change (latest unsafe head payload) to FSM.
func (t *unsafeHeadTracker) Apply(l *raft.Log) interface{} {
start := time.Now()
defer func() {
if t.metrics != nil {
t.metrics.RecordFSMApplyDuration(time.Since(start).Seconds())
}
}()

if len(l.Data) == 0 {
return fmt.Errorf("log data is nil or empty")
}
Expand Down
Loading
Loading