-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstream.go
More file actions
182 lines (158 loc) · 5.42 KB
/
stream.go
File metadata and controls
182 lines (158 loc) · 5.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package tok
import (
"strings"
"sync"
"github.com/GrayCodeAI/tok/internal/core"
)
// StreamCompressor accumulates content segments and keeps a compressed
// rendering of them available, so readers can fetch a snapshot without
// waiting for a compression pass to finish.
//
// Compression is incremental: each background pass handles only the segments
// added since the previous pass (the delta) and appends its output to what has
// already been produced, so the full accumulated content is never
// re-compressed from scratch.
type StreamCompressor struct {
	// mu protects the mutable state: raw, compressed, lastCompressedIdx,
	// stats, dirty and compressing.
	mu sync.RWMutex

	// Accumulated input and the output derived from it so far.
	raw               []string // raw segments, in append order
	compressed        string   // merged compressed output, ready to read
	lastCompressedIdx int      // raw[:lastCompressedIdx] has already been compressed
	stats             Stats    // totals accumulated across all delta compressions

	// Configuration fixed at construction time.
	opts      []Option // options forwarded to Compress
	threshold int      // raw token estimate at which background compression starts

	// Background-work bookkeeping.
	dirty       bool           // new content has arrived since the last compression
	compressing bool           // a background compression is in flight
	done        chan struct{}  // closed by Close to signal shutdown
	wg          sync.WaitGroup // counts in-flight compression goroutines
}
// NewStreamCompressor returns a StreamCompressor that starts a background
// compression once the estimated token count of the accumulated content
// reaches threshold. A threshold of zero or less selects the default of 2000
// tokens. opts are passed to every compression run.
func NewStreamCompressor(threshold int, opts ...Option) *StreamCompressor {
	const defaultThreshold = 2000

	sc := &StreamCompressor{
		threshold: defaultThreshold,
		opts:      opts,
		done:      make(chan struct{}),
	}
	// Only a strictly positive caller-supplied value overrides the default.
	if threshold > 0 {
		sc.threshold = threshold
	}
	return sc
}
// Append records content as a new raw segment. Empty content is ignored.
//
// If no compression is currently running and the estimated token count of all
// accumulated content has reached the threshold, Append starts a background
// compression of the delta, i.e. the segments added since the last completed
// compression. The compressed delta is later merged into the existing
// compressed output by backgroundCompress.
//
// Once shutdown has been signalled via the done channel, Append still stores
// the segment but no longer starts background work, because
// backgroundCompress would discard it immediately.
func (sc *StreamCompressor) Append(content string) {
	if content == "" {
		return
	}

	sc.mu.Lock()
	sc.raw = append(sc.raw, content)
	sc.dirty = true

	// Non-blocking probe of the shutdown signal.
	closed := false
	select {
	case <-sc.done:
		closed = true
	default:
	}

	rawLen := len(sc.raw)
	// The cheap checks come first so the token estimate, which joins every
	// raw segment, is only computed when a compression could actually start.
	start := !closed &&
		!sc.compressing &&
		sc.lastCompressedIdx < rawLen &&
		sc.tokenCountLocked() >= sc.threshold

	var delta string
	if start {
		// Snapshot the uncompressed tail while the lock is held.
		delta = strings.Join(sc.raw[sc.lastCompressedIdx:], "\n")
		sc.compressing = true
		// Register the goroutine before releasing the lock, so the Add is
		// ordered with the state change instead of racing with a waiter.
		sc.wg.Add(1)
	}
	sc.mu.Unlock()

	if start {
		go sc.backgroundCompress(delta, rawLen)
	}
}
// backgroundCompress compresses delta with the configured options and merges
// the result into the accumulated compressed output and stats.
//
// delta is the joined text of the raw segments being compressed, and
// compressedUpTo is len(raw) at the moment that snapshot was taken; on success
// it becomes the new lastCompressedIdx. The caller must already have
// registered this goroutine with sc.wg.
func (sc *StreamCompressor) backgroundCompress(delta string, compressedUpTo int) {
	defer sc.wg.Done()

	// Bail out before doing any work if shutdown has already been signalled.
	select {
	case <-sc.done:
		sc.mu.Lock()
		sc.compressing = false
		sc.mu.Unlock()
		return
	default:
	}

	// Compress without holding the lock so Append and readers are not blocked.
	compressed, deltaStats := Compress(delta, sc.opts...)

	sc.mu.Lock()
	defer sc.mu.Unlock()

	// raw only shrinks through Reset, so fewer segments than were snapshotted
	// means this result belongs to content that has been discarded. Drop it
	// and leave all state, including compressing, to whoever owns it now.
	// NOTE(review): a Reset followed by enough Appends to regrow raw past
	// compressedUpTo is not detectable here; that would need a generation
	// counter on the struct.
	if compressedUpTo > len(sc.raw) {
		return
	}

	// Merge: append to the existing compressed output rather than replacing it.
	if sc.compressed == "" {
		sc.compressed = compressed
	} else {
		sc.compressed = sc.compressed + "\n" + compressed
	}

	// Accumulate stats across delta compressions and refresh the percentage.
	sc.stats.OriginalTokens += deltaStats.OriginalTokens
	sc.stats.FinalTokens += deltaStats.FinalTokens
	sc.stats.TokensSaved += deltaStats.TokensSaved
	if sc.stats.OriginalTokens > 0 {
		sc.stats.ReductionPercent = float64(sc.stats.TokensSaved) / float64(sc.stats.OriginalTokens) * 100
	}

	sc.lastCompressedIdx = compressedUpTo
	// Segments appended while this compression was running are still
	// uncompressed, so the compressor stays dirty in that case.
	sc.dirty = len(sc.raw) > compressedUpTo
	sc.compressing = false
}
// Snapshot returns the compressed output produced so far together with its
// accumulated stats, without waiting for any compression to run. While no
// compressed output exists, it returns the raw segments joined by newlines
// and a zero Stats value.
func (sc *StreamCompressor) Snapshot() (string, Stats) {
	sc.mu.RLock()
	defer sc.mu.RUnlock()

	if sc.compressed != "" {
		return sc.compressed, sc.stats
	}
	// Nothing compressed yet: fall back to the raw content.
	return sc.joinRawLocked(), Stats{}
}
// Raw returns every accumulated raw segment, joined by newlines.
func (sc *StreamCompressor) Raw() string {
	sc.mu.RLock()
	defer sc.mu.RUnlock()

	joined := sc.joinRawLocked()
	return joined
}
// TokenCount returns the estimated number of tokens in the accumulated raw
// content.
func (sc *StreamCompressor) TokenCount() int {
	sc.mu.RLock()
	defer sc.mu.RUnlock()

	estimate := sc.tokenCountLocked()
	return estimate
}
// Reset discards all accumulated raw content, compressed output and stats,
// returning the compressor to its initial state so it can be reused. The
// threshold, options and done channel are left untouched.
//
// NOTE(review): Reset neither waits for nor cancels a compression that is
// already running; callers that reset mid-stream may want to confirm how a
// late-finishing compression interacts with the cleared state.
func (sc *StreamCompressor) Reset() {
	sc.mu.Lock()
	defer sc.mu.Unlock()

	// Drop content and everything derived from it.
	sc.raw, sc.compressed = nil, ""
	sc.stats = Stats{}
	sc.lastCompressedIdx = 0

	// Clear the progress flags.
	sc.dirty, sc.compressing = false, false
}
// Close signals shutdown to the background compressor and blocks until any
// in-progress compression goroutine has returned.
//
// Close may be called more than once: the done channel is closed only on the
// first call, because closing an already-closed channel panics.
func (sc *StreamCompressor) Close() {
	// The check-then-close runs under mu so that concurrent Close calls cannot
	// both observe an open channel.
	sc.mu.Lock()
	select {
	case <-sc.done:
		// Already closed by an earlier call.
	default:
		close(sc.done)
	}
	sc.mu.Unlock()

	// Wait outside the lock: a finishing compression needs mu to publish its
	// result, so waiting while holding it would deadlock.
	sc.wg.Wait()
}
// joinRawLocked concatenates the raw segments, separated by newlines.
// The caller must hold sc.mu for reading or writing.
func (sc *StreamCompressor) joinRawLocked() string {
	const separator = "\n"
	return strings.Join(sc.raw, separator)
}
// tokenCountLocked returns core.EstimateTokens applied to the joined raw
// content. The caller must hold sc.mu for reading or writing.
func (sc *StreamCompressor) tokenCountLocked() int {
	joined := sc.joinRawLocked()
	return core.EstimateTokens(joined)
}