Commit b932228
broker: multiple members for gzip compression
Each active gzip writer introduces a small but significant amount of memory overhead, on the order of several hundred KB. When many journals are being actively written, this overhead adds up to substantial memory usage. This change adds a threshold so that incremental compression occurs only once there is at least 1 MB of data to compress, and introduces a new gzip writing mechanism that can close and create new gzip members, concatenated into the same output file. The spool logic uses this mechanism to create a new gzip member for every batch of incremental compression, eliminating the need to hold a gzip writer in memory for the entire lifetime of the fragment file.
1 parent 0f34a43 commit b932228
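For context, a minimal standalone sketch (standard library only; not code from this commit) of the RFC 1952 property the change relies on: gzip members written and closed independently, then concatenated into one buffer, decode as a single continuous stream.

package main

import (
    "bytes"
    "compress/gzip"
    "fmt"
    "io"
)

func main() {
    var buf bytes.Buffer

    // Write two complete gzip members back-to-back into the same buffer.
    for _, s := range []string{"hello, ", "world"} {
        var gz = gzip.NewWriter(&buf)
        gz.Write([]byte(s))
        gz.Close() // Terminates this member with its trailer.
    }

    // A single gzip.Reader transparently decodes the concatenated members.
    var r, _ = gzip.NewReader(&buf)
    var b, _ = io.ReadAll(r)
    fmt.Println(string(b)) // Prints "hello, world".
}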

File tree: 3 files changed, +150 −4 lines changed

broker/codecs/codecs.go

Lines changed: 28 additions & 1 deletion

@@ -41,7 +41,7 @@ func NewCodecWriter(w io.Writer, codec pb.CompressionCodec) (Compressor, error)
     case pb.CompressionCodec_NONE:
         return nopWriteCloser{w}, nil
     case pb.CompressionCodec_GZIP, pb.CompressionCodec_GZIP_OFFLOAD_DECOMPRESSION:
-        return gzip.NewWriter(w), nil
+        return &GzipBatcher{w: w}, nil
     case pb.CompressionCodec_SNAPPY:
         return snappy.NewBufferedWriter(w), nil
     case pb.CompressionCodec_ZSTANDARD:
@@ -51,6 +51,33 @@ func NewCodecWriter(w io.Writer, codec pb.CompressionCodec) (Compressor, error)
     }
 }
 
+// GzipBatcher allows for batching multiple writes into a single gzip member,
+// concatenated per RFC 1952. Members are terminated by calling Close, with a
+// new gzip writer initialized on the next Write.
+type GzipBatcher struct {
+    w  io.Writer
+    gz *gzip.Writer
+}
+
+func (gzb *GzipBatcher) Write(p []byte) (n int, err error) {
+    if gzb.gz == nil {
+        gzb.gz = gzip.NewWriter(gzb.w)
+    }
+
+    return gzb.gz.Write(p)
+}
+
+func (gzb *GzipBatcher) Close() error {
+    if gzb.gz == nil {
+        return nil
+    } else if err := gzb.gz.Close(); err != nil {
+        return err
+    }
+    gzb.gz = nil
+
+    return nil
+}
+
 type nopWriteCloser struct{ io.Writer }
 
 func (nopWriteCloser) Close() error { return nil }
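A usage sketch of the batcher above, assuming the import paths go.gazette.dev/core/broker/codecs and go.gazette.dev/core/broker/protocol: each Close terminates the current gzip member, and the next Write lazily begins a new one, so a single Compressor can emit many members into the same output. Close on an idle batcher is a no-op, so it is safe to call repeatedly.

package main

import (
    "bytes"
    "compress/gzip"
    "fmt"
    "io"

    "go.gazette.dev/core/broker/codecs"
    pb "go.gazette.dev/core/broker/protocol"
)

func main() {
    var buf bytes.Buffer

    // For GZIP codecs, NewCodecWriter now returns a *GzipBatcher.
    var w, err = codecs.NewCodecWriter(&buf, pb.CompressionCodec_GZIP)
    if err != nil {
        panic(err)
    }

    w.Write([]byte("batch one "))
    w.Close() // Ends the first gzip member.

    w.Write([]byte("batch two")) // Lazily starts a second member.
    w.Close()

    // Both members decode as one continuous stream.
    var r, _ = gzip.NewReader(&buf)
    var b, _ = io.ReadAll(r)
    fmt.Println(string(b)) // Prints "batch one batch two".
}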

broker/fragment/spool.go

Lines changed: 25 additions & 3 deletions

@@ -14,6 +14,11 @@ import (
     pb "go.gazette.dev/core/broker/protocol"
 )
 
+// Minimum size of accumulated uncompressed data before performing incremental
+// compression. Useful for GZIP, which creates a new member per compression
+// invocation.
+var compressionBatchSize = 1024 * 1024
+
 // Spool is a Fragment which is in the process of being created, backed by a
 // local *os.File. As commits occur and the file extent is updated, the Spool
 // Fragment is also updated to reflect the new committed extent. At all
@@ -31,6 +36,8 @@ type Spool struct {
     // Length of compressed content written to |compressedFile|. Set only after
     // the compressor is finalized.
     compressedLength int64
+    // Offset through which content has been compressed.
+    compressedTo int64
     // Compressor of |compressedFile|.
     compressor codecs.Compressor
 
@@ -119,7 +126,7 @@ func (s *Spool) applyCommit(r *pb.ReplicateRequest, primary bool) pb.ReplicateRe
     if r.Proposal.End > s.Fragment.End+s.delta ||
         (r.Proposal.End == s.Fragment.End && r.Proposal.ContentLength() == 0) {
 
-        if s.compressor != nil {
+        if s.compressor != nil || primary && s.CompressionCodec != pb.CompressionCodec_NONE {
             s.finishCompression()
         }
         if s.ContentLength() != 0 {
@@ -166,7 +173,7 @@ func (s *Spool) applyCommit(r *pb.ReplicateRequest, primary bool) pb.ReplicateRe
     spoolCommitsTotal.Inc()
     spoolCommitBytesTotal.Add(float64(s.delta))
 
-    if primary && s.CompressionCodec != pb.CompressionCodec_NONE {
+    if primary && s.CompressionCodec != pb.CompressionCodec_NONE && int(r.Proposal.End-s.compressedTo) >= compressionBatchSize {
         s.compressThrough(r.Proposal.End)
     }
     s.Fragment.Fragment = *r.Proposal
@@ -233,6 +240,7 @@ func (s *Spool) compressThrough(end int64) {
     if s.CompressionCodec == pb.CompressionCodec_NONE {
         panic("expected CompressionCodec != NONE")
     }
+
     var err error
 
     var buf = bufferPool.Get().([]byte)
@@ -241,9 +249,15 @@ func (s *Spool) compressThrough(end int64) {
     // Garden path: we've already compressed all content of the current Fragment,
     // and now incrementally compress through |end|.
     if s.compressor != nil {
-        var offset, delta = s.Fragment.ContentLength(), end - s.Fragment.End
+        var offset, delta = s.compressedTo, end - s.compressedTo
 
         if _, err = io.CopyBuffer(s.compressor, io.NewSectionReader(s.File, offset, delta), buf); err == nil {
+            if s.CompressionCodec == pb.CompressionCodec_GZIP || s.CompressionCodec == pb.CompressionCodec_GZIP_OFFLOAD_DECOMPRESSION {
+                err = s.compressor.Close()
+            }
+        }
+        if err == nil {
+            s.compressedTo = end
             return // Done.
         }
         err = fmt.Errorf("while incrementally compressing: %s", err)
@@ -281,7 +295,15 @@ func (s *Spool) compressThrough(end int64) {
             s.compressor = nil
             continue
         }
+        if s.CompressionCodec == pb.CompressionCodec_GZIP || s.CompressionCodec == pb.CompressionCodec_GZIP_OFFLOAD_DECOMPRESSION {
+            if err = s.compressor.Close(); err != nil {
+                err = fmt.Errorf("flushing gzip batch compressor: %s", err)
+                s.compressor = nil
+                continue
+            }
+        }
 
+        s.compressedTo = end
         break // Success.
     }
 }
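To summarize the spool changes, a minimal sketch (illustrative names, not the Spool code itself) of the incremental pattern: content accumulates in a backing file, and only once at least a batch worth of uncompressed bytes is pending does a compression pass copy that section through a fresh gzip writer and close the member, so no gzip writer is held between batches.

package sketch

import (
    "compress/gzip"
    "io"
    "os"
)

const batchSize = 1 << 20 // 1 MB, mirroring compressionBatchSize.

// incremental tracks how far a spooled file has been compressed.
type incremental struct {
    file         *os.File  // uncompressed spool content
    out          io.Writer // destination for compressed members
    compressedTo int64     // offset through which content is compressed
}

// maybeCompress emits [compressedTo, end) as one self-contained gzip member,
// but only once the pending span reaches batchSize. No gzip writer survives
// the call, so idle journals carry no compressor overhead.
func (inc *incremental) maybeCompress(end int64) error {
    if end-inc.compressedTo < batchSize {
        return nil // Too little pending data; defer compression.
    }
    var gz = gzip.NewWriter(inc.out)
    var section = io.NewSectionReader(inc.file, inc.compressedTo, end-inc.compressedTo)
    if _, err := io.Copy(gz, section); err != nil {
        return err
    }
    if err := gz.Close(); err != nil { // Terminate this member.
        return err
    }
    inc.compressedTo = end
    return nil
}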

broker/fragment/spool_test.go

Lines changed: 97 additions & 0 deletions

@@ -1,6 +1,7 @@
 package fragment
 
 import (
+    "bytes"
     "errors"
     "io"
     "testing"
@@ -117,6 +118,102 @@ func TestCompressionNotPrimary(t *testing.T) {
         contentString(t, obv.completes[0], pb.CompressionCodec_GZIP))
 }
 
+func TestGzipBatcherMultipleMembers(t *testing.T) {
+    var origBatchSize = compressionBatchSize
+    compressionBatchSize = 10
+    defer func() { compressionBatchSize = origBatchSize }()
+
+    var obv testSpoolObserver
+    var spool = NewSpool("a/journal", &obv)
+
+    var resp, err = spool.Apply(&pb.ReplicateRequest{
+        Proposal: &pb.Fragment{
+            Journal:          "a/journal",
+            Begin:            0,
+            End:              0,
+            CompressionCodec: pb.CompressionCodec_GZIP,
+        },
+        Registers: &regEmpty,
+    }, true)
+    require.NoError(t, err)
+    require.Equal(t, pb.Status_OK, resp.Status)
+
+    // Commit some data. The compression batch size has been artificially
+    // lowered so the first commit is compressed, the second commit is buffered,
+    // the third commit triggers a second member, and the fourth commit triggers
+    // a third member.
+    for _, req := range []pb.ReplicateRequest{
+        {Content: []byte("first write ")},
+        {Content: []byte("second ")},
+        {Content: []byte("third ")},
+        {Content: []byte("fourth write ")},
+    } {
+        var resp, err = spool.Apply(&req, true)
+        require.NoError(t, err)
+        require.Equal(t, pb.ReplicateResponse{Status: pb.Status_OK}, resp)
+
+        var proposal = spool.Next()
+        resp, err = spool.Apply(&pb.ReplicateRequest{Proposal: &proposal}, true)
+        require.NoError(t, err)
+        require.Equal(t, pb.ReplicateResponse{Status: pb.Status_OK}, resp)
+    }
+
+    // Complete the spool.
+    resp, err = spool.Apply(&pb.ReplicateRequest{Proposal: &pb.Fragment{
+        Journal:          "a/journal",
+        Begin:            12 + 7 + 6 + 13,
+        End:              12 + 7 + 6 + 13,
+        CompressionCodec: pb.CompressionCodec_GZIP,
+    },
+        Registers: &regEmpty,
+    }, true)
+    require.NoError(t, err)
+    require.Equal(t, pb.ReplicateResponse{Status: pb.Status_OK}, resp)
+
+    require.Len(t, obv.commits, 4)
+    require.Len(t, obv.completes, 1)
+    require.NotNil(t, obv.completes[0].compressedFile)
+    require.NotEqual(t, int64(0), obv.completes[0].compressedLength)
+
+    var expected = "first write second third fourth write "
+    var actual = contentString(t, obv.completes[0], pb.CompressionCodec_GZIP)
+    require.Equal(t, expected, actual)
+
+    // Decompress and verify each member.
+    var parts []string
+    var compressedData = make([]byte, obv.completes[0].compressedLength)
+    _, err = obv.completes[0].compressedFile.ReadAt(compressedData, 0)
+    require.NoError(t, err)
+    var gzipHeader = []byte{0x1f, 0x8b}
+    for start := bytes.Index(compressedData, gzipHeader); start != -1; {
+        var next = bytes.Index(compressedData[start+len(gzipHeader):], gzipHeader)
+        var end int
+        if next == -1 {
+            end = len(compressedData)
+        } else {
+            end = start + len(gzipHeader) + next
+        }
+
+        var memberData = compressedData[start:end]
+        var reader, readerErr = codecs.NewCodecReader(bytes.NewReader(memberData), pb.CompressionCodec_GZIP)
+        require.NoError(t, readerErr)
+        var decompressed, readErr = io.ReadAll(reader)
+        require.NoError(t, readErr)
+        require.NoError(t, reader.Close())
+
+        parts = append(parts, string(decompressed))
+        if next == -1 {
+            break
+        }
+        start = end
+    }
+
+    require.Len(t, parts, 3)
+    require.Equal(t, "first write ", parts[0])
+    require.Equal(t, "second third ", parts[1])
+    require.Equal(t, "fourth write ", parts[2])
+}
+
 func TestRejectRollBeforeCurrentEnd(t *testing.T) {
     var obv testSpoolObserver
     var spool = NewSpool("a/journal", &obv)
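The test above locates member boundaries by scanning for the 0x1f 0x8b magic bytes, which in principle could also appear within a member's compressed payload. A sketch of a stricter way to iterate concatenated members, using the standard library's multistream controls: gzip.Reader decodes concatenated members as one stream by default, Multistream(false) confines it to the current member, and Reset positions it at the next member's header.

package sketch

import (
    "bytes"
    "compress/gzip"
    "io"
)

// gzipMembers decodes each concatenated gzip member of data separately.
func gzipMembers(data []byte) ([]string, error) {
    var r = bytes.NewReader(data)
    var zr, err = gzip.NewReader(r)
    if err != nil {
        return nil, err
    }
    var parts []string
    for {
        zr.Multistream(false) // Stop at the end of the current member.
        var b, err = io.ReadAll(zr)
        if err != nil {
            return nil, err
        }
        parts = append(parts, string(b))
        // Reset advances to the next member's header, or returns io.EOF
        // once all members are consumed.
        if err = zr.Reset(r); err == io.EOF {
            return parts, nil
        } else if err != nil {
            return nil, err
        }
    }
}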
