From fd14cf0cfa899553221f4efd1ee16840ab475bf9 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Thu, 14 May 2026 14:18:58 -0700 Subject: [PATCH 1/3] reuse grpc buffer in querier's store-gate way stream Signed-off-by: Essam Eldaly --- integration/grpc_store_gateway_bench_test.go | 144 +++++++++++++ pkg/querier/blocks_store_queryable.go | 26 +++ .../series_response_free_tracking_test.go | 193 ++++++++++++++++++ pkg/querier/series_response_release_test.go | 131 ++++++++++++ .../thanos/pkg/store/storepb/buf_ref.go | 32 +++ .../thanos/pkg/store/storepb/rpc.pb.go | 50 ++++- .../thanos/pkg/store/storepb/rpc.proto | 4 + 7 files changed, 579 insertions(+), 1 deletion(-) create mode 100644 integration/grpc_store_gateway_bench_test.go create mode 100644 pkg/querier/series_response_free_tracking_test.go create mode 100644 pkg/querier/series_response_release_test.go create mode 100644 vendor/github.com/thanos-io/thanos/pkg/store/storepb/buf_ref.go diff --git a/integration/grpc_store_gateway_bench_test.go b/integration/grpc_store_gateway_bench_test.go new file mode 100644 index 00000000000..af28f2afed5 --- /dev/null +++ b/integration/grpc_store_gateway_bench_test.go @@ -0,0 +1,144 @@ +//go:build requires_docker + +package integration + +import ( + "context" + "fmt" + "io" + "net" + "testing" + + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/store/storepb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + // Import cortexpb to register the cortexCodec (buffer pooling). + _ "github.com/cortexproject/cortex/pkg/cortexpb" +) + +// mockStoreGatewayServer implements storepb.StoreServer and streams +// pre-built SeriesResponse messages for benchmarking. +type mockStoreGatewayServer struct { + storepb.UnimplementedStoreServer + responses []*storepb.SeriesResponse +} + +func (m *mockStoreGatewayServer) Series(_ *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { + for _, resp := range m.responses { + if err := srv.Send(resp); err != nil { + return err + } + } + return nil +} + +// BenchmarkGrpcStoreGatewayCalls benchmarks the full gRPC path for store gateway +// Series streaming with the cortexCodec and compression enabled. +// This is the store-gateway equivalent of BenchmarkGrpcCalls (which tests the ingester path). +// +// With SeriesResponse implementing ReleasableMessage, calling Free() after each Recv() +// returns the unmarshal buffer to the pool, reducing per-message allocations by ~32KB. +func BenchmarkGrpcStoreGatewayCalls(b *testing.B) { + // Build realistic SeriesResponse messages (large enough to trigger buffer pooling). + responses := make([]*storepb.SeriesResponse, 10) + for i := range responses { + responses[i] = createStoreGatewayBenchResponse(i) + } + + mock := &mockStoreGatewayServer{responses: responses} + + // Start gRPC server. + listener, err := net.Listen("tcp", "localhost:0") + require.NoError(b, err) + + gRPCServer := grpc.NewServer() + storepb.RegisterStoreServer(gRPCServer, mock) + + go func() { + if err := gRPCServer.Serve(listener); err != nil && err != grpc.ErrServerStopped { + b.Error(err) + } + }() + defer gRPCServer.Stop() + + // Connect client with compression (zstd via cortexCodec default call options). + conn, err := grpc.NewClient( + listener.Addr().String(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(b, err) + defer conn.Close() + + client := storepb.NewStoreClient(conn) + + // freeable checks if the response supports Free() (i.e., has MessageWithBufRef embedded). + // This allows the benchmark to compile and run on both old builds (without Free) + // and new builds (with Free), so you can compare results via benchstat. + type freeable interface { + Free() + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + stream, err := client.Series(context.Background(), &storepb.SeriesRequest{}) + require.NoError(b, err) + + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + require.NoError(b, err) + if f, ok := interface{}(resp).(freeable); ok { + f.Free() + } + } + } +} + +// createStoreGatewayBenchResponse creates a realistic SeriesResponse with chunk data +// large enough to exceed the buffer pooling threshold (~1KB). +func createStoreGatewayBenchResponse(n int) *storepb.SeriesResponse { + lbls := labels.FromStrings( + "__name__", fmt.Sprintf("http_requests_total_%d", n), + "cluster", "us-east-1", + "namespace", "production", + "pod", fmt.Sprintf("web-server-deployment-7f8b9c6d4f-abc%02d", n), + "container", "nginx", + "instance", fmt.Sprintf("10.0.%d.%d:8080", n, n+1), + "job", "kubernetes-pods", + ) + + // Create chunk data (~4KB per chunk, simulating real store gateway responses). + chunkData := make([]byte, 4096) + for i := range chunkData { + chunkData[i] = byte((i + n) % 256) + } + + numChunks := 5 + n + chunks := make([]storepb.AggrChunk, numChunks) + for i := 0; i < numChunks; i++ { + chunks[i] = storepb.AggrChunk{ + MinTime: int64(i * 7200000), + MaxTime: int64((i + 1) * 7200000), + Raw: &storepb.Chunk{ + Type: storepb.Chunk_XOR, + Data: chunkData, + }, + } + } + + return &storepb.SeriesResponse{ + Result: &storepb.SeriesResponse_Series{ + Series: &storepb.Series{ + Labels: labelpb.ZLabelsFromPromLabels(lbls), + Chunks: chunks, + }, + }, + } +} diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index f9deca3b9bf..a89cd7e63cc 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -29,6 +29,7 @@ import ( "github.com/thanos-io/thanos/pkg/pool" thanosquery "github.com/thanos-io/thanos/pkg/query" "github.com/thanos-io/thanos/pkg/store/hintspb" + "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/strutil" "go.uber.org/atomic" @@ -67,6 +68,10 @@ var ( errNoStoreGatewayAddress = errors.New("no store-gateway address configured") errMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from store-gateways for %s (limit: %d)" defaultAggrs = []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM} + + // Compile-time check: SeriesResponse must satisfy cortexpb.ReleasableMessage + // so that cortexCodec registers unmarshal buffers for explicit lifecycle management. + _ cortexpb.ReleasableMessage = &storepb.SeriesResponse{} ) // BlocksStoreSet is the interface used to get the clients to query series on a set of blocks. @@ -675,6 +680,9 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( myQueriedBlocks := []ulid.ULID(nil) processSeries := func(s *storepb.Series) error { + // Detach series data from the gRPC unmarshal buffer so that + // resp.Free() can safely return the buffer to the pool. + detachSeriesFromBuffer(s) mySeries = append(mySeries, s) // Add series fingerprint to query limiter; will return error if we are over the limit @@ -746,6 +754,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( // Response may either contain series, batch, warning or hints. if s := resp.GetSeries(); s != nil { if err := processSeries(s); err != nil { + resp.Free() return err } } @@ -753,6 +762,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( if b := resp.GetBatch(); b != nil { for _, s := range b.Series { if err := processSeries(s); err != nil { + resp.Free() return err } } @@ -765,11 +775,13 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( if h := resp.GetHints(); h != nil { hints := hintspb.SeriesResponseHints{} if err := types.UnmarshalAny(h, &hints); err != nil { + resp.Free() return errors.Wrapf(err, "failed to unmarshal series hints from %s", c.RemoteAddress()) } ids, err := convertBlockHintsToULIDs(hints.QueriedBlocks) if err != nil { + resp.Free() return errors.Wrapf(err, "failed to parse queried block IDs from received hints") } @@ -778,6 +790,8 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( seriesQueryStats.Merge(hints.QueryStats) } } + + resp.Free() } numSeries := len(mySeries) @@ -1189,6 +1203,18 @@ func convertBlockHintsToULIDs(hints []hintspb.Block) ([]ulid.ULID, error) { return res, nil } +// detachSeriesFromBuffer re-allocates label strings and chunk data byte slices +// so that the series no longer references the gRPC unmarshal buffer. This allows +// resp.Free() to safely return the buffer to the pool without causing use-after-free. +func detachSeriesFromBuffer(s *storepb.Series) { + labelpb.ReAllocZLabelsStrings(&s.Labels, true) + for i := range s.Chunks { + if s.Chunks[i].Raw != nil && len(s.Chunks[i].Raw.Data) > 0 { + s.Chunks[i].Raw.Data = append([]byte(nil), s.Chunks[i].Raw.Data...) + } + } +} + // countChunkBytes returns the size of the chunks making up the provided series in bytes func countChunkBytes(series ...*storepb.Series) (count int) { for _, s := range series { diff --git a/pkg/querier/series_response_free_tracking_test.go b/pkg/querier/series_response_free_tracking_test.go new file mode 100644 index 00000000000..c5ce4e4866a --- /dev/null +++ b/pkg/querier/series_response_free_tracking_test.go @@ -0,0 +1,193 @@ +package querier + +import ( + "context" + "sync/atomic" + "testing" + + "github.com/go-kit/log" + "github.com/oklog/ulid/v2" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/weaveworks/common/user" + "google.golang.org/grpc/mem" + + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + "github.com/cortexproject/cortex/pkg/util/limiter" +) + +type freeTracker struct { + freed atomic.Bool +} + +type singleFreePool struct { + tracker *freeTracker +} + +func (p *singleFreePool) Get(length int) *[]byte { + b := make([]byte, length) + return &b +} + +func (p *singleFreePool) Put(_ *[]byte) { + p.tracker.freed.Store(true) +} + +func registerTrackingBuffer(resp *storepb.SeriesResponse, tracker *freeTracker) { + data := make([]byte, 2048) + resp.RegisterBuffer(mem.NewBuffer(&data, &singleFreePool{tracker: tracker})) +} + +func TestFreeCalledOnAllResponses(t *testing.T) { + t.Parallel() + + const ( + metricName = "test_metric" + minT = int64(10) + maxT = int64(20) + ) + + block1 := ulid.MustNew(1, nil) + block2 := ulid.MustNew(2, nil) + + t.Run("success path", func(t *testing.T) { + t.Parallel() + + resp1 := mockSeriesResponse( + labels.FromStrings(labels.MetricName, metricName, "series", "1"), + []cortexpb.Sample{{Value: 1, TimestampMs: minT}}, nil, nil, + ) + resp2 := mockSeriesResponse( + labels.FromStrings(labels.MetricName, metricName, "series", "2"), + []cortexpb.Sample{{Value: 2, TimestampMs: minT}}, nil, nil, + ) + resp3 := mockSeriesResponse( + labels.FromStrings(labels.MetricName, metricName, "series", "3"), + []cortexpb.Sample{{Value: 3, TimestampMs: minT}}, nil, nil, + ) + hintsResp := mockHintsResponse(block1, block2) + + tracker1 := &freeTracker{} + tracker2 := &freeTracker{} + tracker3 := &freeTracker{} + trackerHints := &freeTracker{} + registerTrackingBuffer(resp1, tracker1) + registerTrackingBuffer(resp2, tracker2) + registerTrackingBuffer(resp3, tracker3) + registerTrackingBuffer(hintsResp, trackerHints) + + ctx := user.InjectOrgID(context.Background(), "user-1") + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, 0, 0)) + + stores := &blocksStoreSetMock{mockedResponses: []any{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedSeriesResponses: []*storepb.SeriesResponse{resp1, resp2, resp3, hintsResp}, + }: {block1, block2}, + }, + }} + + finder := &blocksFinderMock{} + finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT, mock.Anything). + Return(bucketindex.Blocks{ + &bucketindex.Block{ID: block1}, + &bucketindex.Block{ID: block2}, + }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) + + q := &blocksStoreQuerier{ + minT: minT, + maxT: maxT, + finder: finder, + stores: stores, + consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil), + logger: log.NewNopLogger(), + metrics: newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry()), + limits: &blocksStoreLimitsMock{}, + + storeGatewayConsistencyCheckMaxAttempts: 3, + } + + matchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName), + } + + set := q.Select(ctx, true, &storage.SelectHints{Start: minT, End: maxT}, matchers...) + require.NoError(t, set.Err()) + for set.Next() { + _ = set.At() + } + + assert.True(t, tracker1.freed.Load(), "resp1 should be freed") + assert.True(t, tracker2.freed.Load(), "resp2 should be freed") + assert.True(t, tracker3.freed.Load(), "resp3 should be freed") + assert.True(t, trackerHints.freed.Load(), "hints resp should be freed") + }) + + t.Run("error path - series limit exceeded", func(t *testing.T) { + t.Parallel() + + resp1 := mockSeriesResponse( + labels.FromStrings(labels.MetricName, metricName, "series", "1"), + []cortexpb.Sample{{Value: 1, TimestampMs: minT}}, nil, nil, + ) + resp2 := mockSeriesResponse( + labels.FromStrings(labels.MetricName, metricName, "series", "2"), + []cortexpb.Sample{{Value: 2, TimestampMs: minT}}, nil, nil, + ) + + tracker1 := &freeTracker{} + tracker2 := &freeTracker{} + registerTrackingBuffer(resp1, tracker1) + registerTrackingBuffer(resp2, tracker2) + + // Limit to 1 series so the second triggers an error. + ctx := user.InjectOrgID(context.Background(), "user-1") + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(1, 0, 0, 0)) + + stores := &blocksStoreSetMock{mockedResponses: []any{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedSeriesResponses: []*storepb.SeriesResponse{resp1, resp2}, + }: {block1, block2}, + }, + }} + + finder := &blocksFinderMock{} + finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT, mock.Anything). + Return(bucketindex.Blocks{ + &bucketindex.Block{ID: block1}, + &bucketindex.Block{ID: block2}, + }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) + + q := &blocksStoreQuerier{ + minT: minT, + maxT: maxT, + finder: finder, + stores: stores, + consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil), + logger: log.NewNopLogger(), + metrics: newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry()), + limits: &blocksStoreLimitsMock{}, + + storeGatewayConsistencyCheckMaxAttempts: 3, + } + + matchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName), + } + + set := q.Select(ctx, true, &storage.SelectHints{Start: minT, End: maxT}, matchers...) + require.Error(t, set.Err()) + + assert.True(t, tracker1.freed.Load(), "resp1 should be freed") + assert.True(t, tracker2.freed.Load(), "resp2 should be freed (caused the error)") + }) +} diff --git a/pkg/querier/series_response_release_test.go b/pkg/querier/series_response_release_test.go new file mode 100644 index 00000000000..afb484a1a31 --- /dev/null +++ b/pkg/querier/series_response_release_test.go @@ -0,0 +1,131 @@ +package querier + +import ( + "testing" + "unsafe" + + "github.com/stretchr/testify/assert" + "github.com/thanos-io/thanos/pkg/store/storepb" + "google.golang.org/grpc/mem" + + "github.com/cortexproject/cortex/pkg/cortexpb" +) + +func TestSeriesResponseImplementsReleasableMessage(t *testing.T) { + var resp storepb.SeriesResponse + var _ cortexpb.ReleasableMessage = &resp + + assert.NotPanics(t, func() { resp.Free() }) +} + +func TestSeriesResponseFreeIdempotence(t *testing.T) { + t.Run("zero value", func(t *testing.T) { + var resp storepb.SeriesResponse + assert.NotPanics(t, func() { resp.Free() }) + assert.NotPanics(t, func() { resp.Free() }) + }) + + t.Run("with registered buffer", func(t *testing.T) { + var resp storepb.SeriesResponse + buf := mem.SliceBuffer(make([]byte, 64)) + resp.RegisterBuffer(buf) + + assert.NotPanics(t, func() { resp.Free() }) + assert.NotPanics(t, func() { resp.Free() }) + }) + + t.Run("with pooled buffer", func(t *testing.T) { + var resp storepb.SeriesResponse + b := make([]byte, 128) + resp.RegisterBuffer(mem.NewBuffer(&b, mem.NopBufferPool{})) + + assert.NotPanics(t, func() { resp.Free() }) + assert.NotPanics(t, func() { resp.Free() }) + }) +} + +func TestDetachSeriesFromBuffer_NoUseAfterFree(t *testing.T) { + t.Run("labels survive buffer overwrite", func(t *testing.T) { + // ZLabel strings are unsafe casts into the unmarshal buffer. + bufData := []byte("__name__\x00http_requests_total\x00cluster\x00us-east-1\x00") + series := &storepb.Series{ + Labels: []storepb.Label{ + {Name: unsafeString(bufData[0:8]), Value: unsafeString(bufData[9:28])}, + {Name: unsafeString(bufData[29:36]), Value: unsafeString(bufData[37:46])}, + }, + } + + detachSeriesFromBuffer(series) + + // Overwrite original buffer (simulates pool reuse). + for i := range bufData { + bufData[i] = 0xFF + } + + assert.Equal(t, "__name__", series.Labels[0].Name) + assert.Equal(t, "http_requests_total", series.Labels[0].Value) + assert.Equal(t, "cluster", series.Labels[1].Name) + assert.Equal(t, "us-east-1", series.Labels[1].Value) + }) + + t.Run("chunk data survives buffer overwrite", func(t *testing.T) { + chunkBuf := make([]byte, 4096) + for i := range chunkBuf { + chunkBuf[i] = byte(i % 256) + } + expected := make([]byte, len(chunkBuf)) + copy(expected, chunkBuf) + + series := &storepb.Series{ + Chunks: []storepb.AggrChunk{ + {MinTime: 1000, MaxTime: 2000, Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chunkBuf}}, + }, + } + + detachSeriesFromBuffer(series) + + for i := range chunkBuf { + chunkBuf[i] = 0xFF + } + + assert.Equal(t, expected, series.Chunks[0].Raw.Data) + }) + + t.Run("end-to-end with Free and buffer overwrite", func(t *testing.T) { + chunkData := []byte{0x01, 0x02, 0x03, 0x04, 0x05} + series := &storepb.Series{ + Labels: []storepb.Label{{Name: "job", Value: "prometheus"}}, + Chunks: []storepb.AggrChunk{ + {Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chunkData}}, + }, + } + resp := &storepb.SeriesResponse{ + Result: &storepb.SeriesResponse_Series{Series: series}, + } + + poolBuf := make([]byte, 32768) + resp.RegisterBuffer(mem.NewBuffer(&poolBuf, mem.NopBufferPool{})) + + s := resp.GetSeries() + detachSeriesFromBuffer(s) + resp.Free() + + // Overwrite both the pool buffer and original chunk slice. + for i := range poolBuf { + poolBuf[i] = 0xDE + } + for i := range chunkData { + chunkData[i] = 0xAB + } + + assert.Equal(t, "job", s.Labels[0].Name) + assert.Equal(t, "prometheus", s.Labels[0].Value) + assert.Equal(t, []byte{0x01, 0x02, 0x03, 0x04, 0x05}, s.Chunks[0].Raw.Data) + }) +} + +// unsafeString creates a string sharing memory with the byte slice, +// simulating protobuf's zero-copy string unmarshal. +func unsafeString(b []byte) string { + return *(*string)(unsafe.Pointer(&b)) +} diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/buf_ref.go b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/buf_ref.go new file mode 100644 index 00000000000..926099ae326 --- /dev/null +++ b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/buf_ref.go @@ -0,0 +1,32 @@ +package storepb + +import ( + "google.golang.org/grpc/mem" +) + +// MessageWithBufRef holds a reference to gRPC unmarshal buffers for explicit lifecycle management. +// It satisfies cortexpb.ReleasableMessage via structural typing. +type MessageWithBufRef struct { + bs mem.BufferSlice +} + +func (m *MessageWithBufRef) RegisterBuffer(buffer mem.Buffer) { + m.bs = append(m.bs, buffer) +} + +// Free releases all registered buffers. Idempotent and safe on zero-value. +func (m *MessageWithBufRef) Free() { + m.bs.Free() + m.bs = m.bs[:0] +} + +// Proto serialization no-ops (MessageWithBufRef has no wire representation). + +func (m *MessageWithBufRef) Size() int { return 0 } +func (m *MessageWithBufRef) Marshal() ([]byte, error) { return nil, nil } +func (m *MessageWithBufRef) MarshalTo(dAtA []byte) (int, error) { return 0, nil } +func (m *MessageWithBufRef) MarshalToSizedBuffer(dAtA []byte) (int, error) { + return 0, nil +} +func (m *MessageWithBufRef) Unmarshal(dAtA []byte) error { return nil } +func (m MessageWithBufRef) Equal(that MessageWithBufRef) bool { return true } diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.pb.go b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.pb.go index 0406eb990c4..d871f238617 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.pb.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.pb.go @@ -440,7 +440,8 @@ type SeriesResponse struct { // *SeriesResponse_Warning // *SeriesResponse_Hints // *SeriesResponse_Batch - Result isSeriesResponse_Result `protobuf_oneof:"result"` + Result isSeriesResponse_Result `protobuf_oneof:"result"` + MessageWithBufRef `protobuf:"bytes,1001,opt,name=Ref,proto3,embedded=Ref" json:"Ref"` } func (m *SeriesResponse) Reset() { *m = SeriesResponse{} } @@ -1602,6 +1603,18 @@ func (m *SeriesResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + { + size := m.MessageWithBufRef.Size() + i -= size + if _, err := m.MessageWithBufRef.MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x3e + i-- + dAtA[i] = 0xca if m.Result != nil { { size := m.Result.Size() @@ -2201,6 +2214,8 @@ func (m *SeriesResponse) Size() (n int) { if m.Result != nil { n += m.Result.Size() } + l = m.MessageWithBufRef.Size() + n += 2 + l + sovRpc(uint64(l)) return n } @@ -3848,6 +3863,39 @@ func (m *SeriesResponse) Unmarshal(dAtA []byte) error { } m.Result = &SeriesResponse_Batch{v} iNdEx = postIndex + case 1001: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MessageWithBufRef", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.MessageWithBufRef.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.proto b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.proto index afc9a6ec382..20624e258b5 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.proto +++ b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.proto @@ -8,6 +8,7 @@ import "store/storepb/types.proto"; import "gogoproto/gogo.proto"; import "store/storepb/prompb/types.proto"; import "google/protobuf/any.proto"; +import "github.com/cortexproject/cortex/pkg/cortexpb/cortex.proto"; option go_package = "storepb"; @@ -208,6 +209,9 @@ message SeriesResponse { /// batch is an array of series so more than 1 series can be included in the response SeriesBatch batch = 4; } + + // Buffer reference for explicit memory management via cortexCodec. + cortexpb.MessageWithBufRef Ref = 1001 [(gogoproto.embed) = true, (gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/cortexpb.MessageWithBufRef", (gogoproto.nullable) = false]; } message LabelNamesRequest { From 532b96f82719ebc77d06eb2d98023a6740ee4c45 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Thu, 14 May 2026 14:32:44 -0700 Subject: [PATCH 2/3] lint Signed-off-by: Essam Eldaly --- pkg/querier/series_response_free_tracking_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/querier/series_response_free_tracking_test.go b/pkg/querier/series_response_free_tracking_test.go index c5ce4e4866a..ea35f6a9fdc 100644 --- a/pkg/querier/series_response_free_tracking_test.go +++ b/pkg/querier/series_response_free_tracking_test.go @@ -2,7 +2,6 @@ package querier import ( "context" - "sync/atomic" "testing" "github.com/go-kit/log" @@ -15,6 +14,7 @@ import ( "github.com/stretchr/testify/require" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/weaveworks/common/user" + "go.uber.org/atomic" "google.golang.org/grpc/mem" "github.com/cortexproject/cortex/pkg/cortexpb" From 102cf1f74e3b8e49d84b4b2cf921e368a96adc40 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Tue, 19 May 2026 15:23:55 -0700 Subject: [PATCH 3/3] Scope down pr to just detach Signed-off-by: Essam Eldaly --- integration/grpc_store_gateway_bench_test.go | 144 ------------- pkg/querier/blocks_store_queryable.go | 16 +- .../series_response_free_tracking_test.go | 193 ------------------ pkg/querier/series_response_release_test.go | 131 ------------ .../thanos/pkg/store/storepb/buf_ref.go | 32 --- .../thanos/pkg/store/storepb/rpc.pb.go | 50 +---- .../thanos/pkg/store/storepb/rpc.proto | 4 - 7 files changed, 3 insertions(+), 567 deletions(-) delete mode 100644 integration/grpc_store_gateway_bench_test.go delete mode 100644 pkg/querier/series_response_free_tracking_test.go delete mode 100644 pkg/querier/series_response_release_test.go delete mode 100644 vendor/github.com/thanos-io/thanos/pkg/store/storepb/buf_ref.go diff --git a/integration/grpc_store_gateway_bench_test.go b/integration/grpc_store_gateway_bench_test.go deleted file mode 100644 index af28f2afed5..00000000000 --- a/integration/grpc_store_gateway_bench_test.go +++ /dev/null @@ -1,144 +0,0 @@ -//go:build requires_docker - -package integration - -import ( - "context" - "fmt" - "io" - "net" - "testing" - - "github.com/prometheus/prometheus/model/labels" - "github.com/stretchr/testify/require" - "github.com/thanos-io/thanos/pkg/store/labelpb" - "github.com/thanos-io/thanos/pkg/store/storepb" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - - // Import cortexpb to register the cortexCodec (buffer pooling). - _ "github.com/cortexproject/cortex/pkg/cortexpb" -) - -// mockStoreGatewayServer implements storepb.StoreServer and streams -// pre-built SeriesResponse messages for benchmarking. -type mockStoreGatewayServer struct { - storepb.UnimplementedStoreServer - responses []*storepb.SeriesResponse -} - -func (m *mockStoreGatewayServer) Series(_ *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { - for _, resp := range m.responses { - if err := srv.Send(resp); err != nil { - return err - } - } - return nil -} - -// BenchmarkGrpcStoreGatewayCalls benchmarks the full gRPC path for store gateway -// Series streaming with the cortexCodec and compression enabled. -// This is the store-gateway equivalent of BenchmarkGrpcCalls (which tests the ingester path). -// -// With SeriesResponse implementing ReleasableMessage, calling Free() after each Recv() -// returns the unmarshal buffer to the pool, reducing per-message allocations by ~32KB. -func BenchmarkGrpcStoreGatewayCalls(b *testing.B) { - // Build realistic SeriesResponse messages (large enough to trigger buffer pooling). - responses := make([]*storepb.SeriesResponse, 10) - for i := range responses { - responses[i] = createStoreGatewayBenchResponse(i) - } - - mock := &mockStoreGatewayServer{responses: responses} - - // Start gRPC server. - listener, err := net.Listen("tcp", "localhost:0") - require.NoError(b, err) - - gRPCServer := grpc.NewServer() - storepb.RegisterStoreServer(gRPCServer, mock) - - go func() { - if err := gRPCServer.Serve(listener); err != nil && err != grpc.ErrServerStopped { - b.Error(err) - } - }() - defer gRPCServer.Stop() - - // Connect client with compression (zstd via cortexCodec default call options). - conn, err := grpc.NewClient( - listener.Addr().String(), - grpc.WithTransportCredentials(insecure.NewCredentials()), - ) - require.NoError(b, err) - defer conn.Close() - - client := storepb.NewStoreClient(conn) - - // freeable checks if the response supports Free() (i.e., has MessageWithBufRef embedded). - // This allows the benchmark to compile and run on both old builds (without Free) - // and new builds (with Free), so you can compare results via benchstat. - type freeable interface { - Free() - } - - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - stream, err := client.Series(context.Background(), &storepb.SeriesRequest{}) - require.NoError(b, err) - - for { - resp, err := stream.Recv() - if err == io.EOF { - break - } - require.NoError(b, err) - if f, ok := interface{}(resp).(freeable); ok { - f.Free() - } - } - } -} - -// createStoreGatewayBenchResponse creates a realistic SeriesResponse with chunk data -// large enough to exceed the buffer pooling threshold (~1KB). -func createStoreGatewayBenchResponse(n int) *storepb.SeriesResponse { - lbls := labels.FromStrings( - "__name__", fmt.Sprintf("http_requests_total_%d", n), - "cluster", "us-east-1", - "namespace", "production", - "pod", fmt.Sprintf("web-server-deployment-7f8b9c6d4f-abc%02d", n), - "container", "nginx", - "instance", fmt.Sprintf("10.0.%d.%d:8080", n, n+1), - "job", "kubernetes-pods", - ) - - // Create chunk data (~4KB per chunk, simulating real store gateway responses). - chunkData := make([]byte, 4096) - for i := range chunkData { - chunkData[i] = byte((i + n) % 256) - } - - numChunks := 5 + n - chunks := make([]storepb.AggrChunk, numChunks) - for i := 0; i < numChunks; i++ { - chunks[i] = storepb.AggrChunk{ - MinTime: int64(i * 7200000), - MaxTime: int64((i + 1) * 7200000), - Raw: &storepb.Chunk{ - Type: storepb.Chunk_XOR, - Data: chunkData, - }, - } - } - - return &storepb.SeriesResponse{ - Result: &storepb.SeriesResponse_Series{ - Series: &storepb.Series{ - Labels: labelpb.ZLabelsFromPromLabels(lbls), - Chunks: chunks, - }, - }, - } -} diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index a89cd7e63cc..6902fcde5eb 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -68,10 +68,6 @@ var ( errNoStoreGatewayAddress = errors.New("no store-gateway address configured") errMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from store-gateways for %s (limit: %d)" defaultAggrs = []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM} - - // Compile-time check: SeriesResponse must satisfy cortexpb.ReleasableMessage - // so that cortexCodec registers unmarshal buffers for explicit lifecycle management. - _ cortexpb.ReleasableMessage = &storepb.SeriesResponse{} ) // BlocksStoreSet is the interface used to get the clients to query series on a set of blocks. @@ -680,8 +676,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( myQueriedBlocks := []ulid.ULID(nil) processSeries := func(s *storepb.Series) error { - // Detach series data from the gRPC unmarshal buffer so that - // resp.Free() can safely return the buffer to the pool. + // Detach series data from the gRPC unmarshal buffer so that it can be freed. detachSeriesFromBuffer(s) mySeries = append(mySeries, s) @@ -754,7 +749,6 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( // Response may either contain series, batch, warning or hints. if s := resp.GetSeries(); s != nil { if err := processSeries(s); err != nil { - resp.Free() return err } } @@ -762,7 +756,6 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( if b := resp.GetBatch(); b != nil { for _, s := range b.Series { if err := processSeries(s); err != nil { - resp.Free() return err } } @@ -775,13 +768,11 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( if h := resp.GetHints(); h != nil { hints := hintspb.SeriesResponseHints{} if err := types.UnmarshalAny(h, &hints); err != nil { - resp.Free() return errors.Wrapf(err, "failed to unmarshal series hints from %s", c.RemoteAddress()) } ids, err := convertBlockHintsToULIDs(hints.QueriedBlocks) if err != nil { - resp.Free() return errors.Wrapf(err, "failed to parse queried block IDs from received hints") } @@ -790,8 +781,6 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( seriesQueryStats.Merge(hints.QueryStats) } } - - resp.Free() } numSeries := len(mySeries) @@ -1204,8 +1193,7 @@ func convertBlockHintsToULIDs(hints []hintspb.Block) ([]ulid.ULID, error) { } // detachSeriesFromBuffer re-allocates label strings and chunk data byte slices -// so that the series no longer references the gRPC unmarshal buffer. This allows -// resp.Free() to safely return the buffer to the pool without causing use-after-free. +// so that the series no longer references the gRPC unmarshal buffer. func detachSeriesFromBuffer(s *storepb.Series) { labelpb.ReAllocZLabelsStrings(&s.Labels, true) for i := range s.Chunks { diff --git a/pkg/querier/series_response_free_tracking_test.go b/pkg/querier/series_response_free_tracking_test.go deleted file mode 100644 index ea35f6a9fdc..00000000000 --- a/pkg/querier/series_response_free_tracking_test.go +++ /dev/null @@ -1,193 +0,0 @@ -package querier - -import ( - "context" - "testing" - - "github.com/go-kit/log" - "github.com/oklog/ulid/v2" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/storage" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/weaveworks/common/user" - "go.uber.org/atomic" - "google.golang.org/grpc/mem" - - "github.com/cortexproject/cortex/pkg/cortexpb" - "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" - "github.com/cortexproject/cortex/pkg/util/limiter" -) - -type freeTracker struct { - freed atomic.Bool -} - -type singleFreePool struct { - tracker *freeTracker -} - -func (p *singleFreePool) Get(length int) *[]byte { - b := make([]byte, length) - return &b -} - -func (p *singleFreePool) Put(_ *[]byte) { - p.tracker.freed.Store(true) -} - -func registerTrackingBuffer(resp *storepb.SeriesResponse, tracker *freeTracker) { - data := make([]byte, 2048) - resp.RegisterBuffer(mem.NewBuffer(&data, &singleFreePool{tracker: tracker})) -} - -func TestFreeCalledOnAllResponses(t *testing.T) { - t.Parallel() - - const ( - metricName = "test_metric" - minT = int64(10) - maxT = int64(20) - ) - - block1 := ulid.MustNew(1, nil) - block2 := ulid.MustNew(2, nil) - - t.Run("success path", func(t *testing.T) { - t.Parallel() - - resp1 := mockSeriesResponse( - labels.FromStrings(labels.MetricName, metricName, "series", "1"), - []cortexpb.Sample{{Value: 1, TimestampMs: minT}}, nil, nil, - ) - resp2 := mockSeriesResponse( - labels.FromStrings(labels.MetricName, metricName, "series", "2"), - []cortexpb.Sample{{Value: 2, TimestampMs: minT}}, nil, nil, - ) - resp3 := mockSeriesResponse( - labels.FromStrings(labels.MetricName, metricName, "series", "3"), - []cortexpb.Sample{{Value: 3, TimestampMs: minT}}, nil, nil, - ) - hintsResp := mockHintsResponse(block1, block2) - - tracker1 := &freeTracker{} - tracker2 := &freeTracker{} - tracker3 := &freeTracker{} - trackerHints := &freeTracker{} - registerTrackingBuffer(resp1, tracker1) - registerTrackingBuffer(resp2, tracker2) - registerTrackingBuffer(resp3, tracker3) - registerTrackingBuffer(hintsResp, trackerHints) - - ctx := user.InjectOrgID(context.Background(), "user-1") - ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, 0, 0)) - - stores := &blocksStoreSetMock{mockedResponses: []any{ - map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{ - remoteAddr: "1.1.1.1", - mockedSeriesResponses: []*storepb.SeriesResponse{resp1, resp2, resp3, hintsResp}, - }: {block1, block2}, - }, - }} - - finder := &blocksFinderMock{} - finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT, mock.Anything). - Return(bucketindex.Blocks{ - &bucketindex.Block{ID: block1}, - &bucketindex.Block{ID: block2}, - }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) - - q := &blocksStoreQuerier{ - minT: minT, - maxT: maxT, - finder: finder, - stores: stores, - consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil), - logger: log.NewNopLogger(), - metrics: newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry()), - limits: &blocksStoreLimitsMock{}, - - storeGatewayConsistencyCheckMaxAttempts: 3, - } - - matchers := []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName), - } - - set := q.Select(ctx, true, &storage.SelectHints{Start: minT, End: maxT}, matchers...) - require.NoError(t, set.Err()) - for set.Next() { - _ = set.At() - } - - assert.True(t, tracker1.freed.Load(), "resp1 should be freed") - assert.True(t, tracker2.freed.Load(), "resp2 should be freed") - assert.True(t, tracker3.freed.Load(), "resp3 should be freed") - assert.True(t, trackerHints.freed.Load(), "hints resp should be freed") - }) - - t.Run("error path - series limit exceeded", func(t *testing.T) { - t.Parallel() - - resp1 := mockSeriesResponse( - labels.FromStrings(labels.MetricName, metricName, "series", "1"), - []cortexpb.Sample{{Value: 1, TimestampMs: minT}}, nil, nil, - ) - resp2 := mockSeriesResponse( - labels.FromStrings(labels.MetricName, metricName, "series", "2"), - []cortexpb.Sample{{Value: 2, TimestampMs: minT}}, nil, nil, - ) - - tracker1 := &freeTracker{} - tracker2 := &freeTracker{} - registerTrackingBuffer(resp1, tracker1) - registerTrackingBuffer(resp2, tracker2) - - // Limit to 1 series so the second triggers an error. - ctx := user.InjectOrgID(context.Background(), "user-1") - ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(1, 0, 0, 0)) - - stores := &blocksStoreSetMock{mockedResponses: []any{ - map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{ - remoteAddr: "1.1.1.1", - mockedSeriesResponses: []*storepb.SeriesResponse{resp1, resp2}, - }: {block1, block2}, - }, - }} - - finder := &blocksFinderMock{} - finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT, mock.Anything). - Return(bucketindex.Blocks{ - &bucketindex.Block{ID: block1}, - &bucketindex.Block{ID: block2}, - }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) - - q := &blocksStoreQuerier{ - minT: minT, - maxT: maxT, - finder: finder, - stores: stores, - consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil), - logger: log.NewNopLogger(), - metrics: newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry()), - limits: &blocksStoreLimitsMock{}, - - storeGatewayConsistencyCheckMaxAttempts: 3, - } - - matchers := []*labels.Matcher{ - labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName), - } - - set := q.Select(ctx, true, &storage.SelectHints{Start: minT, End: maxT}, matchers...) - require.Error(t, set.Err()) - - assert.True(t, tracker1.freed.Load(), "resp1 should be freed") - assert.True(t, tracker2.freed.Load(), "resp2 should be freed (caused the error)") - }) -} diff --git a/pkg/querier/series_response_release_test.go b/pkg/querier/series_response_release_test.go deleted file mode 100644 index afb484a1a31..00000000000 --- a/pkg/querier/series_response_release_test.go +++ /dev/null @@ -1,131 +0,0 @@ -package querier - -import ( - "testing" - "unsafe" - - "github.com/stretchr/testify/assert" - "github.com/thanos-io/thanos/pkg/store/storepb" - "google.golang.org/grpc/mem" - - "github.com/cortexproject/cortex/pkg/cortexpb" -) - -func TestSeriesResponseImplementsReleasableMessage(t *testing.T) { - var resp storepb.SeriesResponse - var _ cortexpb.ReleasableMessage = &resp - - assert.NotPanics(t, func() { resp.Free() }) -} - -func TestSeriesResponseFreeIdempotence(t *testing.T) { - t.Run("zero value", func(t *testing.T) { - var resp storepb.SeriesResponse - assert.NotPanics(t, func() { resp.Free() }) - assert.NotPanics(t, func() { resp.Free() }) - }) - - t.Run("with registered buffer", func(t *testing.T) { - var resp storepb.SeriesResponse - buf := mem.SliceBuffer(make([]byte, 64)) - resp.RegisterBuffer(buf) - - assert.NotPanics(t, func() { resp.Free() }) - assert.NotPanics(t, func() { resp.Free() }) - }) - - t.Run("with pooled buffer", func(t *testing.T) { - var resp storepb.SeriesResponse - b := make([]byte, 128) - resp.RegisterBuffer(mem.NewBuffer(&b, mem.NopBufferPool{})) - - assert.NotPanics(t, func() { resp.Free() }) - assert.NotPanics(t, func() { resp.Free() }) - }) -} - -func TestDetachSeriesFromBuffer_NoUseAfterFree(t *testing.T) { - t.Run("labels survive buffer overwrite", func(t *testing.T) { - // ZLabel strings are unsafe casts into the unmarshal buffer. - bufData := []byte("__name__\x00http_requests_total\x00cluster\x00us-east-1\x00") - series := &storepb.Series{ - Labels: []storepb.Label{ - {Name: unsafeString(bufData[0:8]), Value: unsafeString(bufData[9:28])}, - {Name: unsafeString(bufData[29:36]), Value: unsafeString(bufData[37:46])}, - }, - } - - detachSeriesFromBuffer(series) - - // Overwrite original buffer (simulates pool reuse). - for i := range bufData { - bufData[i] = 0xFF - } - - assert.Equal(t, "__name__", series.Labels[0].Name) - assert.Equal(t, "http_requests_total", series.Labels[0].Value) - assert.Equal(t, "cluster", series.Labels[1].Name) - assert.Equal(t, "us-east-1", series.Labels[1].Value) - }) - - t.Run("chunk data survives buffer overwrite", func(t *testing.T) { - chunkBuf := make([]byte, 4096) - for i := range chunkBuf { - chunkBuf[i] = byte(i % 256) - } - expected := make([]byte, len(chunkBuf)) - copy(expected, chunkBuf) - - series := &storepb.Series{ - Chunks: []storepb.AggrChunk{ - {MinTime: 1000, MaxTime: 2000, Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chunkBuf}}, - }, - } - - detachSeriesFromBuffer(series) - - for i := range chunkBuf { - chunkBuf[i] = 0xFF - } - - assert.Equal(t, expected, series.Chunks[0].Raw.Data) - }) - - t.Run("end-to-end with Free and buffer overwrite", func(t *testing.T) { - chunkData := []byte{0x01, 0x02, 0x03, 0x04, 0x05} - series := &storepb.Series{ - Labels: []storepb.Label{{Name: "job", Value: "prometheus"}}, - Chunks: []storepb.AggrChunk{ - {Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chunkData}}, - }, - } - resp := &storepb.SeriesResponse{ - Result: &storepb.SeriesResponse_Series{Series: series}, - } - - poolBuf := make([]byte, 32768) - resp.RegisterBuffer(mem.NewBuffer(&poolBuf, mem.NopBufferPool{})) - - s := resp.GetSeries() - detachSeriesFromBuffer(s) - resp.Free() - - // Overwrite both the pool buffer and original chunk slice. - for i := range poolBuf { - poolBuf[i] = 0xDE - } - for i := range chunkData { - chunkData[i] = 0xAB - } - - assert.Equal(t, "job", s.Labels[0].Name) - assert.Equal(t, "prometheus", s.Labels[0].Value) - assert.Equal(t, []byte{0x01, 0x02, 0x03, 0x04, 0x05}, s.Chunks[0].Raw.Data) - }) -} - -// unsafeString creates a string sharing memory with the byte slice, -// simulating protobuf's zero-copy string unmarshal. -func unsafeString(b []byte) string { - return *(*string)(unsafe.Pointer(&b)) -} diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/buf_ref.go b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/buf_ref.go deleted file mode 100644 index 926099ae326..00000000000 --- a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/buf_ref.go +++ /dev/null @@ -1,32 +0,0 @@ -package storepb - -import ( - "google.golang.org/grpc/mem" -) - -// MessageWithBufRef holds a reference to gRPC unmarshal buffers for explicit lifecycle management. -// It satisfies cortexpb.ReleasableMessage via structural typing. -type MessageWithBufRef struct { - bs mem.BufferSlice -} - -func (m *MessageWithBufRef) RegisterBuffer(buffer mem.Buffer) { - m.bs = append(m.bs, buffer) -} - -// Free releases all registered buffers. Idempotent and safe on zero-value. -func (m *MessageWithBufRef) Free() { - m.bs.Free() - m.bs = m.bs[:0] -} - -// Proto serialization no-ops (MessageWithBufRef has no wire representation). - -func (m *MessageWithBufRef) Size() int { return 0 } -func (m *MessageWithBufRef) Marshal() ([]byte, error) { return nil, nil } -func (m *MessageWithBufRef) MarshalTo(dAtA []byte) (int, error) { return 0, nil } -func (m *MessageWithBufRef) MarshalToSizedBuffer(dAtA []byte) (int, error) { - return 0, nil -} -func (m *MessageWithBufRef) Unmarshal(dAtA []byte) error { return nil } -func (m MessageWithBufRef) Equal(that MessageWithBufRef) bool { return true } diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.pb.go b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.pb.go index d871f238617..0406eb990c4 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.pb.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.pb.go @@ -440,8 +440,7 @@ type SeriesResponse struct { // *SeriesResponse_Warning // *SeriesResponse_Hints // *SeriesResponse_Batch - Result isSeriesResponse_Result `protobuf_oneof:"result"` - MessageWithBufRef `protobuf:"bytes,1001,opt,name=Ref,proto3,embedded=Ref" json:"Ref"` + Result isSeriesResponse_Result `protobuf_oneof:"result"` } func (m *SeriesResponse) Reset() { *m = SeriesResponse{} } @@ -1603,18 +1602,6 @@ func (m *SeriesResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l - { - size := m.MessageWithBufRef.Size() - i -= size - if _, err := m.MessageWithBufRef.MarshalTo(dAtA[i:]); err != nil { - return 0, err - } - i = encodeVarintRpc(dAtA, i, uint64(size)) - } - i-- - dAtA[i] = 0x3e - i-- - dAtA[i] = 0xca if m.Result != nil { { size := m.Result.Size() @@ -2214,8 +2201,6 @@ func (m *SeriesResponse) Size() (n int) { if m.Result != nil { n += m.Result.Size() } - l = m.MessageWithBufRef.Size() - n += 2 + l + sovRpc(uint64(l)) return n } @@ -3863,39 +3848,6 @@ func (m *SeriesResponse) Unmarshal(dAtA []byte) error { } m.Result = &SeriesResponse_Batch{v} iNdEx = postIndex - case 1001: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field MessageWithBufRef", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowRpc - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthRpc - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthRpc - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - if err := m.MessageWithBufRef.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.proto b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.proto index 20624e258b5..afc9a6ec382 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.proto +++ b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.proto @@ -8,7 +8,6 @@ import "store/storepb/types.proto"; import "gogoproto/gogo.proto"; import "store/storepb/prompb/types.proto"; import "google/protobuf/any.proto"; -import "github.com/cortexproject/cortex/pkg/cortexpb/cortex.proto"; option go_package = "storepb"; @@ -209,9 +208,6 @@ message SeriesResponse { /// batch is an array of series so more than 1 series can be included in the response SeriesBatch batch = 4; } - - // Buffer reference for explicit memory management via cortexCodec. - cortexpb.MessageWithBufRef Ref = 1001 [(gogoproto.embed) = true, (gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/cortexpb.MessageWithBufRef", (gogoproto.nullable) = false]; } message LabelNamesRequest {