Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions block/internal/cache/generic_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ func (c *Cache) setSeenBatch(hashes []string, height uint64) {
}
}

func (c *Cache) getHashByHeight(height uint64) (string, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
h, ok := c.hashByHeight[height]
return h, ok
}

func (c *Cache) getDAIncluded(hash string) (uint64, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
Expand Down
23 changes: 23 additions & 0 deletions block/internal/cache/generic_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,3 +391,26 @@ func TestCache_DeleteAllForHeight_CleansHashAndDA(t *testing.T) {
_, ok = c.getDAIncludedByHeight(2)
assert.True(t, ok)
}

func TestCache_getHashByHeight(t *testing.T) {
c := NewCache(nil, "")

h, ok := c.getHashByHeight(42)
assert.False(t, ok)
assert.Empty(t, h)

c.setSeen("abc", 42)
h, ok = c.getHashByHeight(42)
assert.True(t, ok)
assert.Equal(t, "abc", h)

// setDAIncluded also maintains hashByHeight.
c.setDAIncluded("def", 7, 100)
h, ok = c.getHashByHeight(100)
assert.True(t, ok)
assert.Equal(t, "def", h)

c.deleteAllForHeight(42)
_, ok = c.getHashByHeight(42)
assert.False(t, ok)
}
101 changes: 80 additions & 21 deletions block/internal/cache/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,17 @@ type CacheManager interface {
// Header operations
IsHeaderSeen(hash string) bool
SetHeaderSeen(hash string, blockHeight uint64)
GetHeaderHashByHeight(blockHeight uint64) (string, bool)
GetHeaderDAIncludedByHash(hash string) (uint64, bool)
GetHeaderDAIncludedByHeight(blockHeight uint64) (uint64, bool)
SetHeaderDAIncluded(hash string, daHeight uint64, blockHeight uint64)
RemoveHeaderDAIncluded(hash string)

// Pending signed header operations (in-flight, pre-persistence)
SetPendingSignedHeader(h *types.SignedHeader, source string)
GetPendingSignedHeader(blockHeight uint64) (*types.SignedHeader, string, bool)
RemovePendingSignedHeader(blockHeight uint64)

// Data operations
IsDataSeen(hash string) bool
SetDataSeen(hash string, blockHeight uint64)
Expand Down Expand Up @@ -92,17 +98,24 @@ type Manager interface {
var _ Manager = (*implementation)(nil)

type implementation struct {
headerCache *Cache
dataCache *Cache
txCache *Cache
txTimestamps *sync.Map // map[string]time.Time
pendingEvents map[uint64]*common.DAHeightEvent
pendingMu sync.Mutex
pendingHeaders *PendingHeaders
pendingData *PendingData
store store.Store
config config.Config
logger zerolog.Logger
headerCache *Cache
dataCache *Cache
txCache *Cache
txTimestamps *sync.Map // map[string]time.Time
pendingEvents map[uint64]*common.DAHeightEvent
pendingMu sync.Mutex
pendingHeaders *PendingHeaders
pendingData *PendingData
pendingSignedHeaders map[uint64]pendingSignedHeader
pendingSignedHeadersMu sync.RWMutex
store store.Store
config config.Config
logger zerolog.Logger
}

type pendingSignedHeader struct {
header *types.SignedHeader
source string
}

// NewManager creates a new Manager, restoring or clearing persisted state as configured.
Expand All @@ -122,16 +135,17 @@ func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manag
}

impl := &implementation{
headerCache: headerCache,
dataCache: dataCache,
txCache: txCache,
txTimestamps: new(sync.Map),
pendingEvents: make(map[uint64]*common.DAHeightEvent),
pendingHeaders: pendingHeaders,
pendingData: pendingData,
store: st,
config: cfg,
logger: logger,
headerCache: headerCache,
dataCache: dataCache,
txCache: txCache,
txTimestamps: new(sync.Map),
pendingEvents: make(map[uint64]*common.DAHeightEvent),
pendingHeaders: pendingHeaders,
pendingData: pendingData,
pendingSignedHeaders: make(map[uint64]pendingSignedHeader),
store: st,
config: cfg,
logger: logger,
}

if cfg.ClearCache {
Expand All @@ -157,6 +171,11 @@ func (m *implementation) SetHeaderSeen(hash string, blockHeight uint64) {
m.headerCache.setSeen(hash, blockHeight)
}

// GetHeaderHashByHeight returns the first-seen header hash at the given height.
func (m *implementation) GetHeaderHashByHeight(blockHeight uint64) (string, bool) {
return m.headerCache.getHashByHeight(blockHeight)
}

func (m *implementation) GetHeaderDAIncludedByHash(hash string) (uint64, bool) {
return m.headerCache.getDAIncluded(hash)
}
Expand All @@ -173,6 +192,42 @@ func (m *implementation) RemoveHeaderDAIncluded(hash string) {
m.headerCache.removeDAIncluded(hash)
}

// SetPendingSignedHeader records the first SignedHeader seen at this height.
// First-write-wins: later writes at the same height are ignored so the
// double-sign detector can match alternates against the original observation.
func (m *implementation) SetPendingSignedHeader(h *types.SignedHeader, source string) {
if h == nil {
return
}
height := h.Height()
m.pendingSignedHeadersMu.Lock()
defer m.pendingSignedHeadersMu.Unlock()
if _, exists := m.pendingSignedHeaders[height]; exists {
return
}
m.pendingSignedHeaders[height] = pendingSignedHeader{header: h, source: source}
}

// GetPendingSignedHeader returns the first-seen SignedHeader and the source
// ("da" or "p2p") it was observed from.
func (m *implementation) GetPendingSignedHeader(blockHeight uint64) (*types.SignedHeader, string, bool) {
m.pendingSignedHeadersMu.RLock()
defer m.pendingSignedHeadersMu.RUnlock()
entry, ok := m.pendingSignedHeaders[blockHeight]
if !ok {
return nil, "", false
}
return entry.header, entry.source, true
}

// RemovePendingSignedHeader evicts the entry once the height is persisted, so
// the store becomes the authoritative source for double-sign comparison.
func (m *implementation) RemovePendingSignedHeader(blockHeight uint64) {
m.pendingSignedHeadersMu.Lock()
delete(m.pendingSignedHeaders, blockHeight)
m.pendingSignedHeadersMu.Unlock()
}

// DaHeight returns the highest DA height seen across header and data caches.
func (m *implementation) DaHeight() uint64 {
return max(m.headerCache.daHeight(), m.dataCache.daHeight())
Expand Down Expand Up @@ -263,6 +318,7 @@ func (m *implementation) DeleteHeight(blockHeight uint64) {
m.pendingMu.Lock()
delete(m.pendingEvents, blockHeight)
m.pendingMu.Unlock()
m.RemovePendingSignedHeader(blockHeight)

// Note: txCache is intentionally NOT deleted here because:
// 1. Transactions are tracked by hash, not by block height (they use height 0)
Expand Down Expand Up @@ -408,6 +464,9 @@ func (m *implementation) ClearFromStore() error {
m.dataCache = NewCache(m.store, DataDAIncludedPrefix)
m.txCache = NewCache(nil, "")
m.pendingEvents = make(map[uint64]*common.DAHeightEvent)
m.pendingSignedHeadersMu.Lock()
m.pendingSignedHeaders = make(map[uint64]pendingSignedHeader)
m.pendingSignedHeadersMu.Unlock()

// Initialize DA height from store metadata to ensure DaHeight() is never 0.
m.initDAHeightFromStore(ctx)
Expand Down
Loading