diff --git a/op-conductor/conductor/service.go b/op-conductor/conductor/service.go index b9c8273e9e9..9f8054bc84d 100644 --- a/op-conductor/conductor/service.go +++ b/op-conductor/conductor/service.go @@ -190,6 +190,7 @@ func (c *OpConductor) initConsensus(ctx context.Context) error { TrailingLogs: c.cfg.RaftTrailingLogs, HeartbeatTimeout: c.cfg.RaftHeartbeatTimeout, LeaderLeaseTimeout: c.cfg.RaftLeaderLeaseTimeout, + Metrics: c.metrics, } cons, err := consensus.NewRaftConsensus(c.log, raftConsensusConfig) if err != nil { @@ -293,6 +294,13 @@ func (oc *OpConductor) initRPCServer(ctx context.Context) error { Service: api, }) + // Binary SSZ commit endpoint. Sized to comfortably fit a 10MB SSZ block; + // raise alongside the JSON-RPC body limit if larger blocks are needed. + server.AddHandler( + conductorrpc.CommitUnsafePayloadPath, + conductorrpc.BinaryCommitHandler(oc.log, oc, 16*1024*1024, oc.metrics), + ) + if oc.cfg.RPCEnableProxy { execClient, err := dial.DialEthClientWithTimeout(ctx, 1*time.Minute, oc.log, oc.cfg.ExecutionRPC) if err != nil { @@ -637,6 +645,12 @@ func (oc *OpConductor) CommitUnsafePayload(_ context.Context, payload *eth.Execu return oc.cons.CommitUnsafePayload(payload) } +// CommitUnsafePayloadSSZ commits a pre-SSZ-encoded unsafe payload to the cluster FSM. +// Used by the binary HTTP endpoint to avoid the JSON-decode -> SSZ-marshal round trip. +func (oc *OpConductor) CommitUnsafePayloadSSZ(_ context.Context, ssz []byte) error { + return oc.cons.CommitUnsafePayloadSSZ(ssz) +} + // SequencerHealthy returns true if sequencer is healthy. func (oc *OpConductor) SequencerHealthy(_ context.Context) bool { return oc.healthy.Load() diff --git a/op-conductor/consensus/iface.go b/op-conductor/consensus/iface.go index 8c53a7e64ac..99c0a9c6291 100644 --- a/op-conductor/consensus/iface.go +++ b/op-conductor/consensus/iface.go @@ -4,6 +4,16 @@ import ( "github.com/ethereum-optimism/optimism/op-service/eth" ) +// ConsensusMetrics defines metrics for consensus commit operations. +// This is intentionally minimal so that the consensus layer does not +// depend on the full metrics.Metricer interface. +type ConsensusMetrics interface { + RecordCommitDuration(marshalSec, raftApplySec float64) + RecordCommitPayloadSize(payloadBytes float64) + RecordFSMApplyDuration(seconds float64) + RecordLogStoreDuration(seconds float64) +} + // ServerSuffrage determines whether a Server in a Configuration gets a vote. type ServerSuffrage int @@ -72,6 +82,10 @@ type Consensus interface { // CommitUnsafePayload commits latest unsafe payload to the FSM in a strongly consistent fashion. CommitUnsafePayload(payload *eth.ExecutionPayloadEnvelope) error + // CommitUnsafePayloadSSZ commits a pre-SSZ-encoded unsafe payload to the FSM, + // skipping the marshal step. The bytes are handed directly to raft.Apply and + // validated by the FSM on receive. Used by the binary HTTP endpoint. + CommitUnsafePayloadSSZ(ssz []byte) error // LatestUnsafePayload returns the latest unsafe payload from FSM in a strongly consistent fashion. LatestUnsafePayload() (*eth.ExecutionPayloadEnvelope, error) diff --git a/op-conductor/consensus/mocks/Consensus.go b/op-conductor/consensus/mocks/Consensus.go index 1b874266bf1..d8a8ed048d6 100644 --- a/op-conductor/consensus/mocks/Consensus.go +++ b/op-conductor/consensus/mocks/Consensus.go @@ -275,6 +275,51 @@ func (_c *Consensus_CommitUnsafePayload_Call) RunAndReturn(run func(payload *eth return _c } +// CommitUnsafePayloadSSZ provides a mock function for the type Consensus +func (_mock *Consensus) CommitUnsafePayloadSSZ(ssz []byte) error { + ret := _mock.Called(ssz) + + if len(ret) == 0 { + panic("no return value specified for CommitUnsafePayloadSSZ") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func([]byte) error); ok { + r0 = returnFunc(ssz) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// Consensus_CommitUnsafePayloadSSZ_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CommitUnsafePayloadSSZ' +type Consensus_CommitUnsafePayloadSSZ_Call struct { + *mock.Call +} + +// CommitUnsafePayloadSSZ is a helper method to define mock.On call +// - ssz +func (_e *Consensus_Expecter) CommitUnsafePayloadSSZ(ssz interface{}) *Consensus_CommitUnsafePayloadSSZ_Call { + return &Consensus_CommitUnsafePayloadSSZ_Call{Call: _e.mock.On("CommitUnsafePayloadSSZ", ssz)} +} + +func (_c *Consensus_CommitUnsafePayloadSSZ_Call) Run(run func(ssz []byte)) *Consensus_CommitUnsafePayloadSSZ_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].([]byte)) + }) + return _c +} + +func (_c *Consensus_CommitUnsafePayloadSSZ_Call) Return(err error) *Consensus_CommitUnsafePayloadSSZ_Call { + _c.Call.Return(err) + return _c +} + +func (_c *Consensus_CommitUnsafePayloadSSZ_Call) RunAndReturn(run func(ssz []byte) error) *Consensus_CommitUnsafePayloadSSZ_Call { + _c.Call.Return(run) + return _c +} + // DemoteVoter provides a mock function for the type Consensus func (_mock *Consensus) DemoteVoter(id string, version uint64) error { ret := _mock.Called(id, version) diff --git a/op-conductor/consensus/raft.go b/op-conductor/consensus/raft.go index 9c186df259b..3eb76ead0a4 100644 --- a/op-conductor/consensus/raft.go +++ b/op-conductor/consensus/raft.go @@ -24,6 +24,8 @@ var _ Consensus = (*RaftConsensus)(nil) type RaftConsensus struct { log log.Logger + metrics ConsensusMetrics + serverID raft.ServerID r *raft.Raft @@ -58,6 +60,10 @@ type RaftConsensusConfig struct { TrailingLogs uint64 HeartbeatTimeout time.Duration LeaderLeaseTimeout time.Duration + + // Metrics collects sub-operation timing data for the commit path. + // If nil, no metrics are recorded. + Metrics ConsensusMetrics } // checkTCPPortOpen attempts to connect to the specified address and returns an error if the connection fails. @@ -89,10 +95,11 @@ func NewRaftConsensus(log log.Logger, cfg *RaftConsensusConfig) (*RaftConsensus, var err error logStorePath := filepath.Join(baseDir, "raft-log.db") - logStore, err := boltdb.NewBoltStore(logStorePath) + boltLogStore, err := boltdb.NewBoltStore(logStorePath) if err != nil { return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %w`, logStorePath, err) } + logStore := wrapInstrumentedLogStore(boltLogStore, cfg.Metrics) stableStorePath := filepath.Join(baseDir, "raft-stable.db") stableStore, err := boltdb.NewBoltStore(stableStorePath) @@ -131,7 +138,7 @@ func NewRaftConsensus(log log.Logger, cfg *RaftConsensusConfig) (*RaftConsensus, } log.Info("Raft server network transport is up", "addr", transport.LocalAddr()) - fsm := NewUnsafeHeadTracker(log) + fsm := NewUnsafeHeadTracker(log, cfg.Metrics) r, err := raft.NewRaft(rc, fsm, logStore, stableStore, snapshotStore, transport) if err != nil { @@ -173,6 +180,7 @@ func NewRaftConsensus(log log.Logger, cfg *RaftConsensusConfig) (*RaftConsensus, return &RaftConsensus{ log: log, + metrics: cfg.Metrics, r: r, serverID: raft.ServerID(cfg.ServerID), unsafeTracker: fsm, @@ -180,6 +188,39 @@ func NewRaftConsensus(log log.Logger, cfg *RaftConsensusConfig) (*RaftConsensus, }, err } +func wrapInstrumentedLogStore(store raft.LogStore, metrics ConsensusMetrics) raft.LogStore { + if metrics == nil { + return store + } + return &instrumentedLogStore{ + LogStore: store, + metrics: metrics, + } +} + +type instrumentedLogStore struct { + raft.LogStore + metrics ConsensusMetrics +} + +func (s *instrumentedLogStore) StoreLog(logEntry *raft.Log) error { + start := time.Now() + err := s.LogStore.StoreLog(logEntry) + if s.metrics != nil { + s.metrics.RecordLogStoreDuration(time.Since(start).Seconds()) + } + return err +} + +func (s *instrumentedLogStore) StoreLogs(logEntries []*raft.Log) error { + start := time.Now() + err := s.LogStore.StoreLogs(logEntries) + if s.metrics != nil { + s.metrics.RecordLogStoreDuration(time.Since(start).Seconds()) + } + return err +} + // Addr returns the address to contact this raft consensus server. // If no explicit address to advertise was configured, // the local network address that the raft-consensus server is listening on will be used. @@ -296,19 +337,77 @@ func (rc *RaftConsensus) CommitUnsafePayload(payload *eth.ExecutionPayloadEnvelo rc.log.Debug("committing unsafe payload", "number", uint64(payload.ExecutionPayload.BlockNumber), "hash", payload.ExecutionPayload.BlockHash.Hex()) var buf bytes.Buffer + marshalStart := time.Now() if _, err := payload.MarshalSSZ(&buf); err != nil { return errors.Wrap(err, "failed to marshal payload envelope") } + marshalDur := time.Since(marshalStart) + applyStart := time.Now() f := rc.r.Apply(buf.Bytes(), defaultTimeout) if err := f.Error(); err != nil { return errors.Wrap(err, "failed to apply payload envelope") } + if resp := f.Response(); resp != nil { + if err, ok := resp.(error); ok { + return errors.Wrap(err, "failed to apply payload envelope to FSM") + } + return fmt.Errorf("unexpected raft apply response: %T: %v", resp, resp) + } + applyDur := time.Since(applyStart) + + if rc.metrics != nil { + rc.metrics.RecordCommitDuration(marshalDur.Seconds(), applyDur.Seconds()) + rc.metrics.RecordCommitPayloadSize(float64(buf.Len())) + } rc.log.Debug("unsafe payload committed", "number", uint64(payload.ExecutionPayload.BlockNumber), "hash", payload.ExecutionPayload.BlockHash.Hex()) return nil } +// CommitUnsafePayloadSSZ implements Consensus. The raw SSZ bytes are validated +// before being passed to raft.Apply so that malformed payloads are rejected +// before they are replicated to the raft log on every peer. After raft.Apply +// returns, the FSM response is also checked because the FSM may reject payloads +// for reasons beyond SSZ validity (e.g. version mismatches). +func (rc *RaftConsensus) CommitUnsafePayloadSSZ(ssz []byte) error { + if len(ssz) == 0 { + return errors.New("empty payload") + } + + // Pre-validate: attempt to decode the SSZ before writing to the raft log. + // This mirrors the implicit validation the JSON-RPC path gets from its + // json.Unmarshal → MarshalSSZ round-trip and prevents corrupt data from + // being replicated to every peer's log store. + env := new(eth.ExecutionPayloadEnvelope) + if err := env.UnmarshalSSZ(eth.BlockV4, uint32(len(ssz)), bytes.NewReader(ssz)); err != nil { + if err := env.UnmarshalSSZ(eth.BlockV3, uint32(len(ssz)), bytes.NewReader(ssz)); err != nil { + return fmt.Errorf("invalid ssz payload: %w", err) + } + } + + applyStart := time.Now() + f := rc.r.Apply(ssz, defaultTimeout) + if err := f.Error(); err != nil { + return errors.Wrap(err, "failed to apply payload envelope") + } + if resp := f.Response(); resp != nil { + if err, ok := resp.(error); ok { + return errors.Wrap(err, "failed to apply payload envelope to FSM") + } + return fmt.Errorf("unexpected raft apply response: %T: %v", resp, resp) + } + applyDur := time.Since(applyStart) + + if rc.metrics != nil { + // Mirror the metrics emitted by the JSON-RPC CommitUnsafePayload path. + // Marshal duration is zero because the SSZ bytes arrive pre-encoded. + rc.metrics.RecordCommitDuration(0, applyDur.Seconds()) + rc.metrics.RecordCommitPayloadSize(float64(len(ssz))) + } + return nil +} + // LatestUnsafePayload implements Consensus, it returns the latest unsafe payload from FSM in a strongly consistent fashion. func (rc *RaftConsensus) LatestUnsafePayload() (*eth.ExecutionPayloadEnvelope, error) { if err := rc.r.Barrier(defaultTimeout).Error(); err != nil { diff --git a/op-conductor/consensus/raft_fsm.go b/op-conductor/consensus/raft_fsm.go index 809e7f5dc51..bcd056cb5d5 100644 --- a/op-conductor/consensus/raft_fsm.go +++ b/op-conductor/consensus/raft_fsm.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "sync" + "time" "github.com/ethereum/go-ethereum/log" "github.com/hashicorp/raft" @@ -17,18 +18,27 @@ var _ raft.FSM = (*unsafeHeadTracker)(nil) // unsafeHeadTracker implements raft.FSM for storing unsafe head payload into raft consensus layer. type unsafeHeadTracker struct { log log.Logger + metrics ConsensusMetrics mtx sync.RWMutex unsafeHead *eth.ExecutionPayloadEnvelope } -func NewUnsafeHeadTracker(log log.Logger) *unsafeHeadTracker { +func NewUnsafeHeadTracker(log log.Logger, metrics ConsensusMetrics) *unsafeHeadTracker { return &unsafeHeadTracker{ - log: log, + log: log, + metrics: metrics, } } // Apply implements raft.FSM, it applies the latest change (latest unsafe head payload) to FSM. func (t *unsafeHeadTracker) Apply(l *raft.Log) interface{} { + start := time.Now() + defer func() { + if t.metrics != nil { + t.metrics.RecordFSMApplyDuration(time.Since(start).Seconds()) + } + }() + if len(l.Data) == 0 { return fmt.Errorf("log data is nil or empty") } diff --git a/op-conductor/metrics/metrics.go b/op-conductor/metrics/metrics.go index 05e95e54f3c..b3003bba822 100644 --- a/op-conductor/metrics/metrics.go +++ b/op-conductor/metrics/metrics.go @@ -3,6 +3,7 @@ package metrics import ( "strconv" + "github.com/ethereum-optimism/optimism/op-conductor/consensus" "github.com/ethereum-optimism/optimism/op-service/httputil" opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" "github.com/prometheus/client_golang/prometheus" @@ -21,11 +22,17 @@ type Metricer interface { RecordLoopExecutionTime(duration float64) RecordRollupBoostConnectionAttempts(success bool, source string) RecordWebSocketClientCount(count int) + // RecordBinaryCommitDuration records end-to-end handler duration for + // POST /commit-unsafe-payload requests. The equivalent metric for the + // JSON-RPC path is rpc_server_request_duration_seconds{method=conductor_commitUnsafePayload}. + RecordBinaryCommitDuration(seconds float64, success bool) opmetrics.RPCMetricer + consensus.ConsensusMetrics } // Metrics implementation must implement RegistryMetricer to allow the metrics server to work. var _ opmetrics.RegistryMetricer = (*Metrics)(nil) +var _ consensus.ConsensusMetrics = (*Metrics)(nil) type Metrics struct { ns string @@ -46,6 +53,14 @@ type Metrics struct { loopExecutionTime prometheus.Histogram webSocketClients prometheus.Gauge + + binaryCommitRequestDuration *prometheus.HistogramVec + + commitMarshalDuration prometheus.Histogram + commitRaftApplyDuration prometheus.Histogram + commitPayloadSize prometheus.Histogram + fsmApplyDuration prometheus.Histogram + logStoreDuration prometheus.Histogram } func (m *Metrics) Registry() *prometheus.Registry { @@ -122,6 +137,44 @@ func NewMetrics() *Metrics { Name: "websocket_clients_connected", Help: "Number of WebSocket clients currently connected to the hub", }), + binaryCommitRequestDuration: factory.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: Namespace, + Name: "binary_commit_request_duration_seconds", + Help: "End-to-end handler duration for POST /commit-unsafe-payload requests. " + + "Directly comparable to rpc_server_request_duration_seconds{method=conductor_commitunsafepayload} " + + "on the JSON-RPC path.", + Buckets: []float64{.001, .0025, .005, .01, .025, .05, .1, .25, .5}, + }, []string{"success"}), + commitMarshalDuration: factory.NewHistogram(prometheus.HistogramOpts{ + Namespace: Namespace, + Name: "commit_marshal_duration_seconds", + Help: "Time (in seconds) to SSZ-marshal the payload in CommitUnsafePayload", + Buckets: []float64{.0001, .00025, .0005, .001, .0025, .005, .01, .025}, + }), + commitRaftApplyDuration: factory.NewHistogram(prometheus.HistogramOpts{ + Namespace: Namespace, + Name: "commit_raft_apply_duration_seconds", + Help: "Time (in seconds) for raft Apply (replication, storage, and FSM apply) in CommitUnsafePayload", + Buckets: []float64{.001, .0025, .005, .01, .025, .05, .1, .25, .5}, + }), + commitPayloadSize: factory.NewHistogram(prometheus.HistogramOpts{ + Namespace: Namespace, + Name: "commit_payload_size_bytes", + Help: "SSZ-encoded payload size in bytes", + Buckets: []float64{1000, 10000, 50000, 100000, 500000, 1000000, 1500000, 2000000, 2500000}, + }), + fsmApplyDuration: factory.NewHistogram(prometheus.HistogramOpts{ + Namespace: Namespace, + Name: "fsm_apply_duration_seconds", + Help: "Time (in seconds) spent in FSM Apply (SSZ decode plus unsafe-head update)", + Buckets: []float64{.0001, .00025, .0005, .001, .0025, .005, .01, .025}, + }), + logStoreDuration: factory.NewHistogram(prometheus.HistogramOpts{ + Namespace: Namespace, + Name: "raft_log_store_duration_seconds", + Help: "Time (in seconds) spent writing raft log entries to the underlying log store", + Buckets: []float64{.0001, .00025, .0005, .001, .0025, .005, .01, .025, .05}, + }), } } @@ -183,3 +236,24 @@ func (m *Metrics) RecordRollupBoostConnectionAttempts(success bool, source strin func (m *Metrics) RecordWebSocketClientCount(count int) { m.webSocketClients.Set(float64(count)) } + +func (m *Metrics) RecordBinaryCommitDuration(seconds float64, success bool) { + m.binaryCommitRequestDuration.WithLabelValues(strconv.FormatBool(success)).Observe(seconds) +} + +func (m *Metrics) RecordCommitDuration(marshalSec, raftApplySec float64) { + m.commitMarshalDuration.Observe(marshalSec) + m.commitRaftApplyDuration.Observe(raftApplySec) +} + +func (m *Metrics) RecordCommitPayloadSize(payloadBytes float64) { + m.commitPayloadSize.Observe(payloadBytes) +} + +func (m *Metrics) RecordFSMApplyDuration(seconds float64) { + m.fsmApplyDuration.Observe(seconds) +} + +func (m *Metrics) RecordLogStoreDuration(seconds float64) { + m.logStoreDuration.Observe(seconds) +} diff --git a/op-conductor/metrics/noop.go b/op-conductor/metrics/noop.go index 832d7258875..e560461f4dc 100644 --- a/op-conductor/metrics/noop.go +++ b/op-conductor/metrics/noop.go @@ -18,3 +18,8 @@ func (*NoopMetricsImpl) RecordHealthCheck(success bool, err error) func (*NoopMetricsImpl) RecordLoopExecutionTime(duration float64) {} func (*NoopMetricsImpl) RecordRollupBoostConnectionAttempts(success bool, source string) {} func (*NoopMetricsImpl) RecordWebSocketClientCount(count int) {} +func (*NoopMetricsImpl) RecordBinaryCommitDuration(seconds float64, success bool) {} +func (*NoopMetricsImpl) RecordCommitDuration(marshalSec, raftApplySec float64) {} +func (*NoopMetricsImpl) RecordCommitPayloadSize(payloadBytes float64) {} +func (*NoopMetricsImpl) RecordFSMApplyDuration(seconds float64) {} +func (*NoopMetricsImpl) RecordLogStoreDuration(seconds float64) {} diff --git a/op-conductor/rpc/binary.go b/op-conductor/rpc/binary.go new file mode 100644 index 00000000000..2db6fcbd04f --- /dev/null +++ b/op-conductor/rpc/binary.go @@ -0,0 +1,112 @@ +package rpc + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + "time" + + "github.com/ethereum/go-ethereum/log" +) + +// CommitUnsafePayloadPath is the HTTP route for the SSZ binary commit endpoint. +// External clients (op-node, base's Rust CL replacement, etc.) POST a raw +// SSZ-encoded ExecutionPayloadEnvelope here. The body is handed verbatim to +// raft.Apply; the FSM validates by attempting UnmarshalSSZ on receive. +// +// Wire format: +// - method: POST +// - path: /commit-unsafe-payload +// - content-type: application/octet-stream +// - body: SSZ-encoded ExecutionPayloadEnvelope (no length prefix, +// body length implies SSZ scope; current FSM tries V4 then +// V3, matching the JSON-RPC path). +// - response: 200 on success, 4xx for client errors, 5xx for raft +// errors. Body is empty on 200, plain-text error message +// otherwise. +const CommitUnsafePayloadPath = "/commit-unsafe-payload" + +// SSZContentType is the content type clients should send for the binary endpoint. +const SSZContentType = "application/octet-stream" + +// commitSSZBackend is the subset of the conductor backend the binary endpoint needs. +type commitSSZBackend interface { + CommitUnsafePayloadSSZ(ctx context.Context, ssz []byte) error +} + +// BinaryCommitRecorder records latency for the binary commit endpoint. +// Implement this with a Prometheus histogram to get a metric comparable to +// op_conductor_rpc_server_request_duration_seconds on the JSON-RPC path. +type BinaryCommitRecorder interface { + RecordBinaryCommitDuration(seconds float64, success bool) +} + +// BinaryCommitHandler returns an http.Handler that accepts SSZ-encoded payloads +// and forwards them to the conductor's raft layer. maxBodyBytes caps the +// request body to prevent DoS; 0 means no cap (not recommended). +// recorder may be nil (metrics disabled). +func BinaryCommitHandler(lgr log.Logger, backend commitSSZBackend, maxBodyBytes int64, recorder BinaryCommitRecorder) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + start := time.Now() + if r.Method != http.MethodPost { + w.Header().Set("Allow", "POST") + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + if ct := r.Header.Get("Content-Type"); ct != "" && ct != SSZContentType { + http.Error(w, fmt.Sprintf("unsupported content-type %q, want %s", ct, SSZContentType), http.StatusUnsupportedMediaType) + return + } + + body := r.Body + if maxBodyBytes > 0 { + // Reject upfront if Content-Length declares an over-limit body. + if r.ContentLength > maxBodyBytes { + http.Error(w, fmt.Sprintf("payload too large: %d > %d", r.ContentLength, maxBodyBytes), http.StatusRequestEntityTooLarge) + return + } + body = http.MaxBytesReader(w, r.Body, maxBodyBytes) + } + + // When Content-Length is set, pre-allocate the exact buffer and use + // ReadFull. Avoids io.ReadAll's grow-and-copy. ~10% faster end-to-end + // for multi-MB bodies; pure win when the client sends Content-Length + // (every standard HTTP client does). + var ssz []byte + var err error + if r.ContentLength > 0 { + ssz = make([]byte, r.ContentLength) + _, err = io.ReadFull(body, ssz) + } else { + ssz, err = io.ReadAll(body) + } + if err != nil { + var maxErr *http.MaxBytesError + if errors.As(err, &maxErr) { + http.Error(w, fmt.Sprintf("payload too large: > %d bytes", maxErr.Limit), http.StatusRequestEntityTooLarge) + return + } + http.Error(w, fmt.Sprintf("read body: %v", err), http.StatusBadRequest) + return + } + if len(ssz) == 0 { + http.Error(w, "empty payload", http.StatusBadRequest) + return + } + + if err := backend.CommitUnsafePayloadSSZ(r.Context(), ssz); err != nil { + lgr.Warn("failed to commit unsafe payload (binary)", "err", err, "size", len(ssz)) + if recorder != nil { + recorder.RecordBinaryCommitDuration(time.Since(start).Seconds(), false) + } + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if recorder != nil { + recorder.RecordBinaryCommitDuration(time.Since(start).Seconds(), true) + } + w.WriteHeader(http.StatusOK) + }) +} diff --git a/op-conductor/rpc/client.go b/op-conductor/rpc/client.go index 0a88cc6538e..7c48c20ff6a 100644 --- a/op-conductor/rpc/client.go +++ b/op-conductor/rpc/client.go @@ -1,7 +1,12 @@ package rpc import ( + "bytes" "context" + "fmt" + "io" + "net/http" + "strings" "github.com/ethereum/go-ethereum/rpc" @@ -137,3 +142,58 @@ func (c *APIClient) ClusterMembership(ctx context.Context) (*consensus.ClusterMe err := c.c.CallContext(ctx, &clusterMembership, prefixRPC("clusterMembership")) return &clusterMembership, err } + +// BinaryCommitClient is a thin HTTP client for the SSZ binary commit endpoint. +// It is intentionally separate from APIClient (which speaks JSON-RPC) so that +// callers can choose the binary path without sharing transport or codec. +type BinaryCommitClient struct { + httpClient *http.Client + endpoint string +} + +// NewBinaryCommitClient constructs a client targeting baseURL (e.g. +// "http://conductor:8547"). httpClient may be nil; the default is used. +func NewBinaryCommitClient(baseURL string, httpClient *http.Client) *BinaryCommitClient { + if httpClient == nil { + httpClient = http.DefaultClient + } + return &BinaryCommitClient{ + httpClient: httpClient, + endpoint: strings.TrimRight(baseURL, "/") + CommitUnsafePayloadPath, + } +} + +// CommitUnsafePayload SSZ-encodes payload and POSTs it to the conductor's +// binary endpoint. Returns nil on 200, otherwise an error including the +// server's response body. +func (c *BinaryCommitClient) CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error { + var buf bytes.Buffer + if _, err := payload.MarshalSSZ(&buf); err != nil { + return fmt.Errorf("marshal ssz: %w", err) + } + return c.CommitUnsafePayloadSSZ(ctx, buf.Bytes()) +} + +// CommitUnsafePayloadSSZ sends already-SSZ-encoded bytes. Useful when the +// caller already has the SSZ form (e.g. constructed directly by the EL client). +func (c *BinaryCommitClient) CommitUnsafePayloadSSZ(ctx context.Context, ssz []byte) error { + req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.endpoint, bytes.NewReader(ssz)) + if err != nil { + return err + } + req.Header.Set("Content-Type", SSZContentType) + req.ContentLength = int64(len(ssz)) + + resp, err := c.httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + _, _ = io.Copy(io.Discard, resp.Body) + return nil + } + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("commit failed: %s: %s", resp.Status, strings.TrimSpace(string(body))) +} diff --git a/op-node/config/config.go b/op-node/config/config.go index 4b56347c9e4..8762199ce3b 100644 --- a/op-node/config/config.go +++ b/op-node/config/config.go @@ -81,9 +81,10 @@ type Config struct { Cancel context.CancelCauseFunc // Conductor is used to determine this node is the leader sequencer. - ConductorEnabled bool - ConductorRpc ConductorRPCFunc - ConductorRpcTimeout time.Duration + ConductorEnabled bool + ConductorRpc ConductorRPCFunc + ConductorRpcTimeout time.Duration + ConductorBinaryCommit bool // AltDA config AltDA altda.CLIConfig diff --git a/op-node/flags/flags.go b/op-node/flags/flags.go index 9cf587f92e9..12e97f774f9 100644 --- a/op-node/flags/flags.go +++ b/op-node/flags/flags.go @@ -408,6 +408,15 @@ var ( Value: time.Second * 1, Category: SequencerCategory, } + ConductorBinaryCommitFlag = &cli.BoolFlag{ + Name: "conductor.binary-commit", + Usage: "Use the conductor's SSZ-binary commit-unsafe-payload endpoint instead of " + + "JSON-RPC. Avoids JSON-encoding the payload (~10x faster on the leader RPC " + + "handler). Requires conductor with binary endpoint support.", + EnvVars: prefixEnvVars("CONDUCTOR_BINARY_COMMIT"), + Value: false, + Category: SequencerCategory, + } /* Interop flags, experimental. */ InteropRPCAddr = &cli.StringFlag{ Name: "interop.rpc.addr", @@ -506,6 +515,7 @@ var optionalFlags = []cli.Flag{ ConductorEnabledFlag, ConductorRpcFlag, ConductorRpcTimeoutFlag, + ConductorBinaryCommitFlag, SafeDBPath, L1ChainConfig, L2EngineKind, diff --git a/op-node/node/conductor.go b/op-node/node/conductor.go index b1f0e9c8c03..7189889bec5 100644 --- a/op-node/node/conductor.go +++ b/op-node/node/conductor.go @@ -3,6 +3,7 @@ package node import ( "context" "fmt" + "net/http" "sync/atomic" "time" @@ -25,7 +26,8 @@ type ConductorClient struct { metrics *metrics.Metrics log log.Logger - apiClient locks.RWValue[*conductorRpc.APIClient] + apiClient locks.RWValue[*conductorRpc.APIClient] + binaryClient locks.RWValue[*conductorRpc.BinaryCommitClient] // overrideLeader is used to override the leader check for disaster recovery purposes. // During disaster situations where the cluster is unhealthy (no leader, only 1 or less nodes up), @@ -63,6 +65,16 @@ func (c *ConductorClient) initialize(ctx context.Context) error { return fmt.Errorf("failed to dial conductor RPC: %w", err) } c.apiClient.Value = conductorRpc.NewAPIClient(conductorRpcClient) + + if c.cfg.ConductorBinaryCommit { + c.binaryClient.Lock() + defer c.binaryClient.Unlock() + // Reuse the conductor RPC endpoint URL — the binary commit handler is + // served on the same HTTP server at conductorRpc.CommitUnsafePayloadPath. + httpClient := &http.Client{Timeout: c.cfg.ConductorRpcTimeout} + c.binaryClient.Value = conductorRpc.NewBinaryCommitClient(endpoint, httpClient) + c.log.Info("Conductor binary commit endpoint enabled", "endpoint", endpoint) + } return nil } @@ -94,6 +106,8 @@ func (c *ConductorClient) Leader(ctx context.Context) (bool, error) { } // CommitUnsafePayload commits an unsafe payload to the conductor log. +// Uses the SSZ-binary endpoint when --conductor.binary-commit is set; +// otherwise the existing JSON-RPC method. func (c *ConductorClient) CommitUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope) error { if c.overrideLeader.Load() { return nil @@ -105,10 +119,13 @@ func (c *ConductorClient) CommitUnsafePayload(ctx context.Context, payload *eth. ctx, cancel := context.WithTimeout(ctx, c.cfg.ConductorRpcTimeout) defer cancel() - err := retry.Do0(ctx, 2, retry.Fixed(50*time.Millisecond), func() error { + commit := func() error { + if bc := c.binaryClient.Get(); bc != nil { + return bc.CommitUnsafePayload(ctx, payload) + } return c.apiClient.Get().CommitUnsafePayload(ctx, payload) - }) - return err + } + return retry.Do0(ctx, 2, retry.Fixed(50*time.Millisecond), commit) } // OverrideLeader implements conductor.SequencerConductor. diff --git a/op-node/service.go b/op-node/service.go index 42bea8478c6..2eebacd0524 100644 --- a/op-node/service.go +++ b/op-node/service.go @@ -124,7 +124,8 @@ func NewConfig(ctx cliiface.Context, log log.Logger) (*config.Config, error) { ConductorRpc: func(context.Context) (string, error) { return conductorRPCEndpoint, nil }, - ConductorRpcTimeout: ctx.Duration(flags.ConductorRpcTimeoutFlag.Name), + ConductorRpcTimeout: ctx.Duration(flags.ConductorRpcTimeoutFlag.Name), + ConductorBinaryCommit: ctx.Bool(flags.ConductorBinaryCommitFlag.Name), AltDA: altda.ReadCLIConfig(ctx),