zhaozhenhang

V1

2022/09/24阅读:18主题:默认主题

从零实现一个TSDB(三)

实现刷盘操作

之前我们实现了将chan中的rows封装成segment然后存储进入内存List结构,现在来学习下如何进行异步刷盘操作。

方法 writeCodeSegment()

  1. 判断是否是冷数据
  2. 创建一个dirname,然后将当前segment刷入磁盘
  3. 刷盘后打开该写入磁盘文件的fd句柄,通过mmap的方式打开
  4. 构造出一个diskSegment存储进入内存有序List,方便查询

tsdb.go

func (db *TSDB) writeColdSegment() (Segment, error) {
 db.mutex.Lock()
 defer db.mutex.Unlock()
 if db.segments.head.Frozen() {
  head := db.segments.head
  go func() {
   db.wait.Add(1)
   defer db.wait.Done()
   db.segments.Add(head)
   startTime := time.Now()
   dirname := makeDirName(head.MinTs(), head.MaxTs())
   if err := writeToDisk(head.(*memtable)); err != nil {
    logrus.Errorf("faild to flush data to disk, %v", err)
    return
   }
   filename := path.Join(dirname, "data")
   mmapFile, err := OpenMMapFile(filename)
   if err != nil {
    logrus.Errorf("failed to make a mmap file %s, %v", filename, err)
    return
   }
   // 将diskSegment添加进入tree,方便查询
   err = db.segments.Replace(head, newDiskSegment(mmapFile, dirname, head.MinTs(), head.MaxTs()))
   if err != nil {
    logrus.Errorf("add diskSegment into in list error: %v", err)
    return
   }
   logrus.Infof("write file %s take: %v", filename, time.Since(startTime))
  }()
  db.segments.head = newMemtable()
 }
 return db.segments.head, nil
}

核心方法 writeToDisk()

可以看到,核心方法就是刷盘的writeToDisk()

步骤:

  1. 将segment编码成为两个部分,第一部分就是我们要存储的data数据,第二部分就是meta数据,也就是开头我们看到的meta.json数据,可以告知我们该segment文件中存储的时间区间,row条数等等
  2. 进行写文件,分别将data和meta.json的数据写入data文件和meta文件
func writeToDisk(segment *memtable) error {
 dataBytes, descBytes, err := segment.Marshal()
 if err != nil {
  return fmt.Errorf("faild to marshal segment: %s", err.Error())
 }
 writeFile := func(file string, data []byte) error {
  if isFileExist(file) {
   return fmt.Errorf("%s file is already exist", file)
  }
  fd, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY, os.ModePerm)
  if err != nil {
   return nil
  }
  defer fd.Close()
  _, err = fd.Write(data)
  return err
 }

 dirname := makeDirName(segment.MinTs(), segment.MaxTs())
 mkdir(dirname)

 if err = writeFile(path.Join(dirname, "data"), dataBytes); err != nil {
  return err
 }

 if err = writeFile(path.Join(dirname, "meta"), descBytes); err != nil {
  return err
 }
 return nil
}

其核心就是Marshal()方法,将segment生成对应的数据按照指定的格式存储在磁盘上面。

Marshal方法

将数据分成两大块:

  • data:生成data文件的数据

    data数据大概分成这么几块儿:

    • series数据 ①
    • meta数据 ②
  • desc:生成meta.json的数据(这儿叫desc因为该数据更像是该segment文件的描述信息,而非一个完整的元信息)

① 生成步骤大概这样

  • 遍历当前segment,之前我们在进行将row插入memtable时可以看到,memtable中的segment其实是一个sync.Map,key就是sid(row.ID()),value就是对应的series结构
  • 将key(sid)从outdated中获取,outdated其实在我们进行将row插入memtable时,outdated就是一个map,key就是sid,value就是对应内存的有序list节点。
  • 然后判断,如果list存在,就将其遍历出所有的Point数据,然后将Point数据排序后合并成一个[]Point,并将其添加进入tsStore中,其实就是添加到go-tsz的series中去
  • 最后将该数据压缩,然后写入dataBuf,这样我们的data数据就写入成功了

② 生成步骤大概这样

  • 当data数据根据sid维度写入写入dataBuf后,我们都会构造一个metaSeries信息,将其更新进入meta.series中 然后我们遍历indexMap结构,该结构也是在将row写入memtable时,保存的一个索引,key是:labelName,value是:sidList,将其遍历拼接进入一个[]labelIndex中临时存储。最后将其更新到meta.series中去
  • 然后讲meta压缩,同时生成该segment的描述,然后将meta的压缩数据存储进入dataBuf中去
  • 最后将data长度和meta长度存储进入dataBuf,这样,dataBuf、descBuf就生成了

memtable.go

func (m *memtable) Marshal() ([]byte, []byte, error) {
 sidList := make(map[string]uint32)
 startOffset := 0
 size := 0
 dataBuf := make([]byte, 0)
 dataBuf = append(dataBuf, make([]byte, uint64Size>>1)...)
 meta := Metadata{
  MinTimestamp: m.minTimestamp,
  MaxTimestamp: m.maxTimestamp,
 }

 m.segment.Range(func(key, value any) bool {
  seriesID := key.(string)
  sidList[seriesID] = uint32(size)
  size++
  series := value.(*memSeries)
  meta.SeriesIDRelatedLabels = append(meta.SeriesIDRelatedLabels, series.labels)
  m.outdatedMutex.RLock()
  listValue, ok := m.outdated[seriesID]
  m.outdatedMutex.RUnlock()

  var dataBytes []byte
  if ok {
   dataBytes = DoCompress(series.MergeOutdatedList(listValue).Bytes())
  } else {
   dataBytes = DoCompress(series.Bytes())
  }

  dataBuf = append(dataBuf, dataBytes...)
  endOffset := startOffset + len(dataBytes)
  meta.Series = append(meta.Series, metaSeries{
   Sid:         key.(string),
   StartOffset: uint64(startOffset),
   EndOffset:   uint64(endOffset),
  })
  startOffset = endOffset
  return true
 })
 labelIndex := make([]seriesWithLabel, 0)
 m.indexMap.Range(func(key string, value *memtableSidList) {
  list := make([]uint32, 0)
  for _, sid := range value.List() {
   list = append(list, sidList[sid])
  }
  sort.Slice(list, func(i, j int) bool {
   return list[i] < list[j]
  })
  labelIndex = append(labelIndex, seriesWithLabel{
   Name: key,
   Sids: list,
  })
 })
 meta.Labels = labelIndex
 metaBytes, err := MarshalMeta(meta)
 if err != nil {
  return nil, nil, err
 }
 metaLen := len(metaBytes)
 desc := &Desc{
  SeriesCount:     m.seriesCount,
  DataPointsCount: m.dataPointsCount,
  MaxTimestamp:    m.maxTimestamp,
  MinTimestamp:    m.minTimestamp,
 }

 descBytes, err := json.MarshalIndent(desc, """\t")
 dataLen := len(dataBuf) - (uint64Size >> 1)
 dataBuf = append(dataBuf, metaBytes...)
 newEncodingBuf := newEncodingBuf()

 newEncodingBuf.MarshalUint64(uint64(dataLen))
 dataLenBytes := newEncodingBuf.Bytes()
 copy(dataBuf[:uint64Size], dataLenBytes[:uint64Size])
 newEncodingBuf.Reset()

 newEncodingBuf.MarshalUint64(uint64(metaLen))
 metaLenBytes := newEncodingBuf.Bytes()
 copy(dataBuf[uint64Size:(uint64Size*2)], metaLenBytes[:uint64Size])
 return dataBuf, descBytes, nil
}

然后将其对应的数据写入文件 memtable.go

func writeToDisk(segment *memtable) error {
 dataBytes, descBytes, err := segment.Marshal()
 if err != nil {
  return fmt.Errorf("faild to marshal segment: %s", err.Error())
 }
 writeFile := func(file string, data []byte) error {
  if isFileExist(file) {
   return fmt.Errorf("%s file is already exist", file)
  }
  fd, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY, os.ModePerm)
  if err != nil {
   return nil
  }
  defer fd.Close()
  _, err = fd.Write(data)
  return err
 }

 dirname := makeDirName(segment.MinTs(), segment.MaxTs())
 mkdir(dirname)

 if err = writeFile(path.Join(dirname, "data"), dataBytes); err != nil {
  return err
 }

 if err = writeFile(path.Join(dirname, "meta"), descBytes); err != nil {
  return err
 }
 return nil
}

本篇就到这儿,下篇就实现以下 基于meta的解压缩,将文件数据解析出来成为metadata数据~

github:https://github.com/azhsmesos/tsdb

分类:

后端

标签:

后端

作者介绍

zhaozhenhang
V1

快手