Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions internal/core/cache/counter_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,37 @@ type CounterKey struct {
TimeType byte
}

// counterSubKey is a secondary index key for per-object counter lookup.
// It is CounterKey minus the ObjHash field; the ObjHash serves as the
// outer map key of the secondary index, so only the remaining fields are
// needed to distinguish entries within one object.
type counterSubKey struct {
	Counter  string
	TimeType byte
}

// CounterCache stores the latest counter values per object.
//
// Invariant: byObj mirrors store — Put writes every entry to both maps,
// and mu guards both. byObj exists solely so GetByObjHash can fetch all
// counters of one object without scanning the whole primary store.
type CounterCache struct {
	mu    sync.RWMutex
	store map[CounterKey]value.Value
	byObj map[int32]map[counterSubKey]value.Value // secondary index for O(1) per-object lookup
}

// NewCounterCache returns an empty cache with both the primary store and
// the per-object secondary index initialized and ready for use.
func NewCounterCache() *CounterCache {
	c := new(CounterCache)
	c.store = make(map[CounterKey]value.Value)
	c.byObj = make(map[int32]map[counterSubKey]value.Value)
	return c
}

// Put records v as the latest value for key, updating the primary store
// and the per-object secondary index under one write lock so the two
// maps never disagree.
func (c *CounterCache) Put(key CounterKey, v value.Value) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.store[key] = v

	// Lazily create the per-object bucket on first write for this object.
	sub := c.byObj[key.ObjHash]
	if sub == nil {
		sub = make(map[counterSubKey]value.Value)
		c.byObj[key.ObjHash] = sub
	}
	sub[counterSubKey{Counter: key.Counter, TimeType: key.TimeType}] = v
}

func (c *CounterCache) Get(key CounterKey) (value.Value, bool) {
Expand All @@ -49,11 +64,13 @@ func (c *CounterCache) Get(key CounterKey) (value.Value, bool) {
// GetByObjHash returns a snapshot of the latest counter values for one
// object, keyed by counter name. It returns nil when no counters are
// cached for objHash; a non-nil result is a fresh map the caller may
// mutate freely.
//
// NOTE(review): entries that differ only in TimeType collapse onto the
// same counter-name key, keeping whichever value map iteration yields
// last — confirm this is the intended semantics.
func (c *CounterCache) GetByObjHash(objHash int32) map[string]value.Value {
	c.mu.RLock()
	defer c.mu.RUnlock()
	sub, ok := c.byObj[objHash]
	if !ok {
		return nil
	}
	result := make(map[string]value.Value, len(sub))
	for k, v := range sub {
		result[k.Counter] = v
	}
	return result
}
6 changes: 4 additions & 2 deletions internal/core/xlog_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,11 @@ func (xc *XLogCore) run() {
}

// Serialize and cache for real-time streaming
o := protocol.NewDataOutputX()
o := protocol.AcquireDataOutputX()
pack.WritePack(o, xp)
b := o.ToByteArray()
b := make([]byte, len(o.ToByteArray()))
copy(b, o.ToByteArray())
protocol.ReleaseDataOutputX(o)
xc.xlogCache.Put(xp.ObjHash, xp.Elapsed, xp.Error != 0, b)

// Aggregate by service group for real-time throughput display
Expand Down
22 changes: 13 additions & 9 deletions internal/db/io/flushctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ type IFlushable interface {

// flushCtl is the process-wide flush controller; it manages periodic
// flushing of registered IFlushable instances. Access it through
// GetFlushController rather than directly.
var flushCtl = &flushController{
	items: make(map[IFlushable]time.Time),
}

// flushController tracks registered flushables and, per entry, the time
// of its last flush so flushes can be rate-limited per item.
type flushController struct {
	mu      sync.Mutex
	items   map[IFlushable]time.Time // value = last flush time (zero until first flush)
	started bool                     // true once the background flush goroutine is running
}

Expand All @@ -30,7 +30,7 @@ func GetFlushController() *flushController {
func (fc *flushController) Register(f IFlushable) {
fc.mu.Lock()
defer fc.mu.Unlock()
fc.items[f] = struct{}{}
fc.items[f] = time.Time{}
if !fc.started {
fc.started = true
go fc.run()
Expand All @@ -48,16 +48,20 @@ func (fc *flushController) run() {
defer ticker.Stop()
for range ticker.C {
fc.mu.Lock()
now := time.Now()
items := make([]IFlushable, 0, len(fc.items))
for f := range fc.items {
items = append(items, f)
for f, lastFlush := range fc.items {
if f.IsDirty() && now.Sub(lastFlush) >= f.Interval() {
items = append(items, f)
}
}
fc.mu.Unlock()

for _, f := range items {
if f.IsDirty() {
f.Flush()
}
f.Flush()
fc.mu.Lock()
fc.items[f] = time.Now()
fc.mu.Unlock()
}
}
}
}
4 changes: 2 additions & 2 deletions internal/db/profile/profile_wr.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type ProfileWR struct {

func NewProfileWR(baseDir string, queueSize int) *ProfileWR {
if queueSize <= 0 {
queueSize = 1000
queueSize = 10000
}
return &ProfileWR{
baseDir: baseDir,
Expand Down Expand Up @@ -56,7 +56,7 @@ func (w *ProfileWR) Add(entry *ProfileEntry) {
select {
case w.queue <- entry:
default:
slog.Debug("ProfileWR: queue full, dropping")
slog.Warn("ProfileWR: queue full, dropping")
}
}

Expand Down
14 changes: 14 additions & 0 deletions internal/db/text/text_rd.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"sync"
)

const maxTextRDCacheSize = 100000

// cacheKey uniquely identifies a cached text entry.
type cacheKey struct {
Div string
Expand Down Expand Up @@ -71,6 +73,18 @@ func (r *TextRD) GetString(div string, hash int32) (string, error) {
return "", nil
}

// Evict ~10% if over capacity to prevent unbounded growth
if len(r.cache) >= maxTextRDCacheSize {
evictCount := maxTextRDCacheSize / 10
for k := range r.cache {
delete(r.cache, k)
evictCount--
if evictCount <= 0 {
break
}
}
}

// Cache the result
r.cache[key] = text
return text, nil
Expand Down
23 changes: 23 additions & 0 deletions internal/protocol/dataoutx.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/binary"
"io"
"math"
"sync"
)

const (
Expand All @@ -20,6 +21,28 @@ type DataOutputX struct {
scratch [8]byte // reusable scratch buffer — already on heap with the struct, avoids per-call allocation
}

// dataOutputXPool recycles DataOutputX instances so hot serialization
// paths avoid a fresh 256-byte buffer allocation per call; use it via
// AcquireDataOutputX / ReleaseDataOutputX.
var dataOutputXPool = sync.Pool{
	New: func() any {
		return &DataOutputX{buf: make([]byte, 0, 256)}
	},
}

// AcquireDataOutputX returns a DataOutputX from the pool, ready for reuse.
// The instance is reset before being handed out, so previously written
// bytes from an earlier user are never visible.
func AcquireDataOutputX() *DataOutputX {
	out := dataOutputXPool.Get().(*DataOutputX)
	out.Reset()
	return out
}

// ReleaseDataOutputX returns a DataOutputX to the pool.
//
// Do not use the DataOutputX after releasing; copy ToByteArray() before
// calling this. Writer-backed instances are not pooled (their lifetime is
// tied to their writer), and instances whose buffer grew unusually large
// are dropped so a single big serialization cannot pin that memory inside
// the pool indefinitely.
func ReleaseDataOutputX(o *DataOutputX) {
	// Cap on pooled buffer capacity; outliers above this are left for the
	// GC to reclaim rather than being retained by the pool.
	const maxPooledBufCap = 1 << 16 // 64 KiB

	if o == nil || o.writer != nil {
		return
	}
	if cap(o.buf) > maxPooledBufCap {
		return
	}
	dataOutputXPool.Put(o)
}

// NewDataOutputX allocates a standalone DataOutputX with a small initial
// buffer capacity of 256 bytes.
func NewDataOutputX() *DataOutputX {
	out := new(DataOutputX)
	out.buf = make([]byte, 0, 256)
	return out
}
Expand Down