Commit: Lint and clean up

zalegrala committed Sep 11, 2024
1 parent ff4925a commit 513e33b
Showing 5 changed files with 12 additions and 144 deletions.
132 changes: 0 additions & 132 deletions tempodb/backend/block_meta.go
@@ -83,77 +83,11 @@ func (s DedicatedColumnScope) ToTempopb() (tempopb.DedicatedColumn_Scope, error)
}
}

type compactedBlockMeta struct {
CompactedTime time.Time `json:"compactedTime"`
BlockMeta
}

// func (b *CompactedBlockMeta) ToBackendV1Proto() (*CompactedBlockMeta, error) {
// bm, err := b.BlockMeta.ToBackendV1Proto()
// if err != nil {
// return nil, err
// }
//
// return &CompactedBlockMeta{
// BlockMeta: *bm,
// CompactedTime: b.CompactedTime,
// }, nil
// }
//
// func (b *CompactedBlockMeta) FromBackendV1Proto(pb *CompactedBlockMeta) error {
// err := b.BlockMeta.FromBackendV1Proto(&pb.BlockMeta)
// if err != nil {
// return err
// }
//
// b.CompactedTime = pb.CompactedTime
//
// return nil
// }

const (
DefaultReplicationFactor = 0 // Replication factor for blocks from the ingester. This is the default value to indicate RF3.
MetricsGeneratorReplicationFactor = 1
)
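The zero-value convention above deserves a note: a meta with ReplicationFactor left at 0 denotes an ingester block at RF3. A minimal sketch of how a consumer might normalize it; effectiveReplicationFactor is a hypothetical helper, not part of the package:

// Hypothetical helper illustrating the convention encoded above: a zero
// ReplicationFactor in the meta is shorthand for the ingester default, RF3.
func effectiveReplicationFactor(rf uint8) int {
	if rf == DefaultReplicationFactor {
		return 3 // unset means RF3
	}
	return int(rf)
}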

// The BlockMeta data that is stored for each individual block.
type blockMeta struct {
// A Version that indicates the block format. This includes specifics of how the indexes and data are stored.
Version string `json:"format"`
// BlockID is a unique identifier of the block.
BlockID google_uuid.UUID `json:"blockID"`
// A TenantID that defines the tenant to which this block belongs.
TenantID string `json:"tenantID"`
// StartTime roughly matches when the first obj was written to this block. It is used to determine block
// age for different purposes (caching, etc)
StartTime time.Time `json:"startTime"`
// EndTime roughly matches the time the last obj was written to this block. It is currently mostly meaningless.
EndTime time.Time `json:"endTime"`
// TotalObjects counts the number of objects in this block.
TotalObjects int `json:"totalObjects"`
// The Size in bytes of the block.
Size uint64 `json:"size"`
// CompactionLevel defines the number of times this block has been compacted.
CompactionLevel uint8 `json:"compactionLevel"`
// Encoding and compression format (used only in v2)
Encoding Encoding `json:"encoding"`
// IndexPageSize holds the size of each index page in bytes (used only in v2)
IndexPageSize uint32 `json:"indexPageSize"`
// TotalRecords holds the total Records stored in the index file (used only in v2)
TotalRecords uint32 `json:"totalRecords"`
// DataEncoding is tracked by tempodb and indicates the way the bytes are encoded.
DataEncoding string `json:"dataEncoding"`
// BloomShardCount represents the number of bloom filter shards.
BloomShardCount uint16 `json:"bloomShards"`
// FooterSize contains the size of the footer in bytes (used by parquet)
FooterSize uint32 `json:"footerSize"`
// DedicatedColumns configuration for attributes (used by vParquet3)
DedicatedColumns DedicatedColumns `json:"dedicatedColumns,omitempty"`
// ReplicationFactor is the number of times the data written in this block has been replicated.
// It's left unset if replication factor is 3. Default is 0 (RF3).
ReplicationFactor uint8 `json:"replicationFactor,omitempty"`
}
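For reference, the JSON tags above determine the serialized shape of the meta. A minimal self-contained sketch of that mapping; trimmedMeta is a hypothetical stand-in for a few of the fields, not the real type:

package main

import (
	"encoding/json"
	"fmt"
)

type trimmedMeta struct {
	Version           string `json:"format"`
	TenantID          string `json:"tenantID"`
	TotalObjects      int    `json:"totalObjects"`
	ReplicationFactor uint8  `json:"replicationFactor,omitempty"`
}

func main() {
	out, _ := json.Marshal(trimmedMeta{Version: "vParquet3", TenantID: "single-tenant", TotalObjects: 3})
	fmt.Println(string(out))
	// {"format":"vParquet3","tenantID":"single-tenant","totalObjects":3}
	// replicationFactor is omitted at its zero value, i.e. the RF3 default.
}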

// DedicatedColumn contains the configuration for a single attribute with the given name that should
// be stored in a dedicated column instead of the generic attribute column.
type DedicatedColumn struct {
@@ -257,72 +191,6 @@ func (b *BlockMeta) DedicatedColumnsHash() uint64 {
return b.DedicatedColumns.Hash()
}

// func (b *BlockMeta) ToBackendV1Proto() (*BlockMeta, error) {
// blockID, err := b.BlockID.MarshalText()
// if err != nil {
// return nil, err
// }
//
// m := &BlockMeta{
// Version: b.Version,
// BlockId: blockID,
// TenantId: b.TenantID,
// StartTime: b.StartTime,
// EndTime: b.EndTime,
// TotalObjects: int32(b.TotalObjects),
// Size_: b.Size,
// CompactionLevel: uint32(b.CompactionLevel),
// Encoding: uint32(b.Encoding),
// IndexPageSize: b.IndexPageSize,
// TotalRecords: b.TotalRecords,
// DataEncoding: b.DataEncoding,
// BloomShardCount: uint32(b.BloomShardCount),
// FooterSize: b.FooterSize,
// ReplicationFactor: uint32(b.ReplicationFactor),
// }
//
// dc, err := b.DedicatedColumns.ToTempopb()
// if err != nil {
// return nil, err
// }
// m.DedicatedColumns = dc
//
// return m, nil
// }

// func (b *BlockMeta) FromBackendV1Proto(pb *BlockMeta) error {
// blockID, err := uuid.ParseBytes(pb.BlockId)
// if err != nil {
// return err
// }
//
// b.Version = pb.Version
// b.BlockID = blockID
// b.TenantID = pb.TenantId
// b.StartTime = pb.StartTime
// b.EndTime = pb.EndTime
// b.TotalObjects = int(pb.TotalObjects)
// b.Size = pb.Size_
// b.CompactionLevel = uint8(pb.CompactionLevel)
// b.Encoding = Encoding(pb.Encoding)
// b.IndexPageSize = pb.IndexPageSize
// b.TotalRecords = pb.TotalRecords
// b.DataEncoding = pb.DataEncoding
// b.BloomShardCount = uint16(pb.BloomShardCount)
// b.FooterSize = pb.FooterSize
// b.ReplicationFactor = uint8(pb.ReplicationFactor)
// dcs, err := DedicatedColumnsFromTempopb(pb.DedicatedColumns)
// if err != nil {
// return err
// }
//
// if len(dcs) > 0 {
// b.DedicatedColumns = dcs
// }
//
// return nil
// }

func DedicatedColumnsFromTempopb(tempopbCols []*tempopb.DedicatedColumn) (DedicatedColumns, error) {
cols := make(DedicatedColumns, 0, len(tempopbCols))

5 changes: 3 additions & 2 deletions tempodb/backend/test/benchmark_test.go
@@ -46,12 +46,13 @@ func BenchmarkIndexLoad(b *testing.B) {
require.NoError(b, err)

w := backend.NewWriter(rw)
- w.WriteTenantIndex(ctx, tenant, blockMeta, nil)
+ err = w.WriteTenantIndex(ctx, tenant, blockMeta, nil)
+ require.NoError(b, err)

r := backend.NewReader(rr)
b.ResetTimer()

for i := 0; i < b.N; i++ {
- r.TenantIndex(ctx, tenant)
+ _, _ = r.TenantIndex(ctx, tenant)
}
}
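The two fixes above are the standard way to quiet errcheck in benchmarks: setup errors fail fast via require.NoError, while per-iteration results that are deliberately unused are discarded with explicit blank identifiers. A generic sketch of the pattern, where buildIndex and Lookup are hypothetical:

func BenchmarkLookup(b *testing.B) {
	idx, err := buildIndex() // hypothetical setup; a failure here would invalidate the benchmark
	require.NoError(b, err)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_, _ = idx.Lookup("key") // result intentionally unused; blank assignment satisfies the linter
	}
}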
1 change: 0 additions & 1 deletion tempodb/compaction_block_selector_test.go
@@ -5,7 +5,6 @@ import (
"testing"
"time"

- google_uuid "github.com/google/uuid"
"github.com/grafana/tempo/pkg/uuid"
"github.com/grafana/tempo/tempodb/backend"
"github.com/stretchr/testify/assert"
8 changes: 4 additions & 4 deletions tempodb/encoding/vparquet2/compactor.go
@@ -152,12 +152,12 @@ func (c *Compactor) Compact(ctx context.Context, l log.Logger, r backend.Reader,
newMeta := &backend.BlockMeta{
BlockID: uuid.New(),
TenantID: inputs[0].TenantID,
- CompactionLevel: uint32(nextCompactionLevel),
- TotalObjects: int32(recordsPerBlock), // Just an estimate
+ CompactionLevel: nextCompactionLevel,
+ TotalObjects: recordsPerBlock, // Just an estimate
}

currentBlock = newStreamingBlock(ctx, &c.opts.BlockConfig, newMeta, r, w, tempo_io.NewBufferedWriter)
- currentBlock.meta.CompactionLevel = uint32(nextCompactionLevel)
+ currentBlock.meta.CompactionLevel = nextCompactionLevel
newCompactedBlocks = append(newCompactedBlocks, currentBlock.meta)
}

@@ -190,7 +190,7 @@ func (c *Compactor) Compact(ctx context.Context, l log.Logger, r backend.Reader,
pool.Put(lowestObject)

// ship block to backend if done
- if currentBlock.meta.TotalObjects >= int32(recordsPerBlock) {
+ if currentBlock.meta.TotalObjects >= recordsPerBlock {
currentBlockPtrCopy := currentBlock
currentBlockPtrCopy.meta.StartTime = minBlockStart
currentBlockPtrCopy.meta.EndTime = maxBlockEnd
10 changes: 5 additions & 5 deletions tempodb/encoding/vparquet3/compactor.go
@@ -43,7 +43,7 @@ func (c *Compactor) Compact(ctx context.Context, l log.Logger, r backend.Reader,
totalRecords += blockMeta.TotalObjects

if blockMeta.CompactionLevel > compactionLevel {
- compactionLevel = uint32(blockMeta.CompactionLevel)
+ compactionLevel = blockMeta.CompactionLevel
}

if blockMeta.StartTime.Before(minBlockStart) || minBlockStart.IsZero() {
@@ -158,14 +158,14 @@ func (c *Compactor) Compact(ctx context.Context, l log.Logger, r backend.Reader,
newMeta := &backend.BlockMeta{
BlockID: uuid.New(),
TenantID: inputs[0].TenantID,
- CompactionLevel: uint32(nextCompactionLevel),
- TotalObjects: int32(recordsPerBlock), // Just an estimate
+ CompactionLevel: nextCompactionLevel,
+ TotalObjects: recordsPerBlock, // Just an estimate
ReplicationFactor: inputs[0].ReplicationFactor,
DedicatedColumns: inputs[0].DedicatedColumns,
}

currentBlock = newStreamingBlock(ctx, &c.opts.BlockConfig, newMeta, r, w, tempo_io.NewBufferedWriter)
- currentBlock.meta.CompactionLevel = uint32(nextCompactionLevel)
+ currentBlock.meta.CompactionLevel = nextCompactionLevel
newCompactedBlocks = append(newCompactedBlocks, currentBlock.meta)
}

@@ -198,7 +198,7 @@ func (c *Compactor) Compact(ctx context.Context, l log.Logger, r backend.Reader,
pool.Put(lowestObject)

// ship block to backend if done
- if currentBlock.meta.TotalObjects >= int32(recordsPerBlock) {
+ if currentBlock.meta.TotalObjects >= recordsPerBlock {
currentBlockPtrCopy := currentBlock
currentBlockPtrCopy.meta.StartTime = minBlockStart
currentBlockPtrCopy.meta.EndTime = maxBlockEnd
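Both compactor diffs are the same lint fix: once BlockMeta's CompactionLevel and TotalObjects share types with the local counters, the uint32(...) and int32(...) conversions become redundant and the linter flags them. A self-contained sketch of the principle; the field types here are illustrative, not the actual BlockMeta definition:

package main

import "fmt"

type meta struct {
	CompactionLevel uint32
	TotalObjects    int32
}

func main() {
	var nextCompactionLevel uint32 = 2 // same type as the field
	var recordsPerBlock int32 = 100000 // same type as the field

	m := meta{
		CompactionLevel: nextCompactionLevel, // no uint32(...) cast needed
		TotalObjects:    recordsPerBlock,     // no int32(...) cast needed
	}
	fmt.Println(m.TotalObjects >= recordsPerBlock) // comparison is also cast-free
}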
