
zhaozhenhang
V1
2022/09/26阅读:15主题:默认主题
从零实现一个TSDB(五)
上篇我们实现了将整个冷数据进行刷盘的过程,其实刷盘时将meta数据进行编解码还没实现
注意:这儿说的meta并不是meta文件的数据,而是
data中的索引信息
meta编码 metadata.go
的Marshal()
方法
分为这么几步
-
将label对应的seriesIDList进行保存,同时保存其长度 -
保存该series的seriesID和对应的startTimestamp和endTimestamp -
保存每个series中的labelList(有序存储) -
保存minTs和maxTs -
最后保存一个 signature
,用于检查文件是否损坏
func (b *binaryMetaserializer) Marshal(meta Metadata) ([]byte, error) {
nowEncodingBuf := newEncodingBuf()
labelOrdered := make(map[string]int)
for index, labelToSids := range meta.Labels {
labelOrdered[labelToSids.Name] = index
nowEncodingBuf.MarshalUint16(uint16(len(labelToSids.Name)))
nowEncodingBuf.MarshalString(labelToSids.Name)
nowEncodingBuf.MarshalUint32(uint32(len(labelToSids.Sids)))
nowEncodingBuf.MarshalUint32(labelToSids.Sids...)
}
nowEncodingBuf.MarshalUint16(endBlock)
for index, series := range meta.Series {
nowEncodingBuf.MarshalUint16(uint16(len(series.Sid)))
nowEncodingBuf.MarshalString(series.Sid)
nowEncodingBuf.MarshalUint64(series.StartOffset, series.EndOffset)
labelList := meta.SeriesIDRelatedLabels[index]
nowEncodingBuf.MarshalUint32(uint32(labelList.Len()))
labelIndex := make([]uint32, 0, labelList.Len())
for _, labelName := range labelList {
labelIndex = append(labelIndex, uint32(labelOrdered[labelName.MarshalName()]))
}
sort.Slice(labelIndex, func(i, j int) bool {
return labelIndex[i] < labelIndex[j]
})
nowEncodingBuf.MarshalUint32(labelIndex...)
}
nowEncodingBuf.MarshalUint16(endBlock)
nowEncodingBuf.MarshalUint64(uint64(meta.MinTimestamp))
nowEncodingBuf.MarshalUint64(uint64(meta.MaxTimestamp))
nowEncodingBuf.MarshalString(signature)
return DoCompress(nowEncodingBuf.Bytes()), nil
}
接下来看看解码器UnmarshalMeta
大致过程其实就是对Marshal方法的一个复原
-
通过我们保存的signature来校验文件是否完整 -
通过额外的offset来进行消费位点的转移,实现解码出对应指针的数据并将其构造成为meta结构体
metadata.go
func (b *binaryMetaserializer) Unmarshal(data []byte, meta *Metadata) error {
data, err := DoDecompress(data)
if err != nil {
return fmt.Errorf("faild to decompress, err: %v", err)
}
if len(data) < len(signature) {
return fmt.Errorf("the data block is incomplete, data len: %d", len(data))
}
nowDecodingBuf := newDecodingBuf()
// 首先判断数据是否完整
if strings.EqualFold(nowDecodingBuf.UnmarshalString(data[len(data)-len(signature):]), signature) {
return fmt.Errorf("the data block is incomplete, data: %s", nowDecodingBuf.UnmarshalString(data[len(data)-len(signature):]))
}
offset := 0
labels := make([]seriesWithLabel, 0)
for {
var labelName string
labelLen := nowDecodingBuf.UnmarshalUint16(data[offset : offset+uint16Size])
offset += uint16Size
if labelLen == endBlock {
break
}
labelName = nowDecodingBuf.UnmarshalString(data[offset : offset+int(labelLen)])
offset += int(labelLen)
sidCount := nowDecodingBuf.UnmarshalUint32(data[offset : offset+uint32Size])
offset += uint32Size
sidList := make([]uint32, sidCount)
for i := 0; i < int(sidCount); i++ {
sidList[i] = nowDecodingBuf.UnmarshalUint32(data[offset : offset+uint32Size])
offset += uint32Size
}
labels = append(labels, seriesWithLabel{
Name: labelName,
Sids: sidList,
})
}
meta.Labels = labels
seriesList := make([]metaSeries, 0)
for {
series := metaSeries{}
sidLen := nowDecodingBuf.UnmarshalUint16(data[offset : offset+uint16Size])
offset += uint16Size
if sidLen == endBlock {
break
}
series.Sid = nowDecodingBuf.UnmarshalString(data[offset : offset+int(sidLen)])
offset += int(sidLen)
series.StartOffset = nowDecodingBuf.UnmarshalUint64(data[offset : offset+uint64Size])
offset += uint64Size
series.EndOffset = nowDecodingBuf.UnmarshalUint64(data[offset : offset+uint64Size])
offset += uint64Size
labelCount := nowDecodingBuf.UnmarshalUint32(data[offset : offset+uint32Size])
offset += uint32Size
labelList := make([]uint32, labelCount)
for i := 0; i < int(labelCount); i++ {
labelList[i] = nowDecodingBuf.UnmarshalUint32(data[offset : offset+uint32Size])
offset += uint32Size
}
series.Labels = labelList
seriesList = append(seriesList, series)
}
meta.Series = seriesList
meta.MinTimestamp = int64(nowDecodingBuf.UnmarshalUint64(data[offset : offset+uint64Size]))
offset += uint64Size
meta.MaxTimestamp = int64(nowDecodingBuf.UnmarshalUint64(data[offset : offset+uint64Size]))
offset += uint64Size
return nowDecodingBuf.err
}
以上就是存储的整个过程,接下来写查询,我们写第一个接口 QueryLabelValues()
根据labelName查询所有的labelValue
tsdb.go
-
从内存有序list上面迭代获取所有的节点 -
将node转化成为segment,其实就是meta数据 -
通过labelVs获取所有的value值
func (db *TSDB) QueryLabelValues(label string, start, end int64) []string {
temp := make(map[string]struct{})
for _, segment := range db.segments.Get(start, end) {
segment := segment.Load()
values := segment.QueryLabelValuse(label)
for i := 0; i < len(values); i++ {
temp[values[i]] = struct{}{}
}
}
ret := make([]string, 0, len(temp))
for key := range temp {
ret = append(ret, key)
}
sort.Strings(ret)
return ret
}
disk.go
的Load()
方法
-
获取当前diskSegment的句柄fd,创建DReader对象 -
通过DReader去读取当前文件数据,封装成meta返回
func (ds *diskSegment) Load() Segment {
if ds.load {
return ds
}
start := time.Now()
reader := bytes.NewReader(ds.dataFd.Bytes())
dreader := &DReader{
reader: reader,
}
dataLen, metaLen, err := dreader.Read()
if err != nil {
logrus.Errorf("faild to read %s, err: %v", ds.dataFilename, err)
return ds
}
metaBytes := make([]byte, metaLen)
_, err = reader.ReadAt(metaBytes, uint64Size>>1+int64(dataLen))
if err != nil {
logrus.Errorf("faild to read %s, metaData error: %v", ds.dataFilename, err)
return ds
}
var meta Metadata
if err = UnmarshaMeta(metaBytes, &meta); err != nil {
logrus.Errorf("faild to unmarshal meta, error: %v", err)
return ds
}
for _, label := range meta.Labels {
key, value := UnmarshalLabelName(label.Name)
if !strings.EqualFold(key, "") && strings.EqualFold(value, "") {
ds.labelVs.Set(key, value)
}
}
ds.indexMap = newDiskIndexMap(meta.Labels)
ds.series = meta.Series
ds.load = true
logrus.Infof("load disk segment %s, time: %v", ds.dataFilename, time.Since(start))
return ds
}
func (dr *DReader) Read() (int64, int64, error) {
// 读取data长度
diskDataLen := make([]byte, uint64Size)
_, err := dr.reader.ReadAt(diskDataLen, 0)
if err != nil {
return 0, 0, err
}
nowDecodingBuf := newDecodingBuf()
// nowDecodingBuf.UnmarshalUint64(diskDataLen)
dataLen := nowDecodingBuf.UnmarshalUint64(diskDataLen)
// 读取 meta长度
diskDataLen = make([]byte, uint64Size)
_, err = dr.reader.ReadAt(diskDataLen, uint64Size)
if err != nil {
return 0, 0, err
}
nowDecodingBuf = newDecodingBuf()
metaLen := nowDecodingBuf.UnmarshalUint64(diskDataLen)
return int64(dataLen), int64(metaLen), nil
}
然后直接从segment的labelVs中通过label获取values,然后排序后返回
测试结果OK
=== RUN TestInsertRow
time="2022-09-26T21:16:23+08:00" level=info msg="data: [vm_node_azh0 vm_node_azh1 vm_node_azh2]\n"
--- PASS: TestInsertRow (1.06s)
github: https://github.com/azhsmesos/tsdb
作者介绍

zhaozhenhang
V1
快手