
zhaozhenhang
V1
2022/09/24阅读:18主题:默认主题
从零实现一个TSDB(三)
实现刷盘操作
之前我们实现了将chan中的rows封装成segment然后存储进入内存List结构,现在来学习下如何进行异步刷盘操作。
方法 writeCodeSegment()
-
判断是否是冷数据 -
创建一个dirname,然后将当前segment刷入磁盘 -
刷盘后打开该写入磁盘文件的fd句柄,通过mmap的方式打开 -
构造出一个diskSegment存储进入内存有序List,方便查询
tsdb.go
func (db *TSDB) writeColdSegment() (Segment, error) {
db.mutex.Lock()
defer db.mutex.Unlock()
if db.segments.head.Frozen() {
head := db.segments.head
go func() {
db.wait.Add(1)
defer db.wait.Done()
db.segments.Add(head)
startTime := time.Now()
dirname := makeDirName(head.MinTs(), head.MaxTs())
if err := writeToDisk(head.(*memtable)); err != nil {
logrus.Errorf("faild to flush data to disk, %v", err)
return
}
filename := path.Join(dirname, "data")
mmapFile, err := OpenMMapFile(filename)
if err != nil {
logrus.Errorf("failed to make a mmap file %s, %v", filename, err)
return
}
// 将diskSegment添加进入tree,方便查询
err = db.segments.Replace(head, newDiskSegment(mmapFile, dirname, head.MinTs(), head.MaxTs()))
if err != nil {
logrus.Errorf("add diskSegment into in list error: %v", err)
return
}
logrus.Infof("write file %s take: %v", filename, time.Since(startTime))
}()
db.segments.head = newMemtable()
}
return db.segments.head, nil
}
核心方法 writeToDisk()
可以看到,核心方法就是刷盘的writeToDisk()
步骤:
-
将segment编码成为两个部分,第一部分就是我们要存储的data数据,第二部分就是meta数据,也就是开头我们看到的meta.json数据,可以告知我们该segment文件中存储的时间区间,row条数等等 -
进行写文件,分别将data和meta.json的数据写入data文件和meta文件
func writeToDisk(segment *memtable) error {
dataBytes, descBytes, err := segment.Marshal()
if err != nil {
return fmt.Errorf("faild to marshal segment: %s", err.Error())
}
writeFile := func(file string, data []byte) error {
if isFileExist(file) {
return fmt.Errorf("%s file is already exist", file)
}
fd, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY, os.ModePerm)
if err != nil {
return nil
}
defer fd.Close()
_, err = fd.Write(data)
return err
}
dirname := makeDirName(segment.MinTs(), segment.MaxTs())
mkdir(dirname)
if err = writeFile(path.Join(dirname, "data"), dataBytes); err != nil {
return err
}
if err = writeFile(path.Join(dirname, "meta"), descBytes); err != nil {
return err
}
return nil
}
其核心就是Marshal()
方法,将segment生成对应的数据按照指定的格式存储在磁盘上面。
Marshal方法
将数据分成两大块:
-
data:生成data文件的数据
data数据大概分成这么几块儿:
-
series数据 ① -
meta数据 ②
-
-
desc:生成meta.json的数据(这儿叫desc因为该数据更像是该segment文件的描述信息,而非一个完整的元信息)
① 生成步骤大概这样
遍历当前segment,之前我们在进行将row插入memtable时可以看到,memtable中的segment其实是一个sync.Map,key就是sid(row.ID()),value就是对应的series结构 将key(sid)从outdated中获取,outdated其实在我们进行将row插入memtable时,outdated就是一个map,key就是sid,value就是对应内存的有序list节点。 然后判断,如果list存在,就将其遍历出所有的Point数据,然后将Point数据排序后合并成一个[]Point,并将其添加进入 tsStore
中,其实就是添加到go-tsz的series中去最后将该数据压缩,然后写入dataBuf,这样我们的data数据就写入成功了
② 生成步骤大概这样
当data数据根据sid维度写入写入dataBuf后,我们都会构造一个 metaSeries
信息,将其更新进入meta.series
中 然后我们遍历indexMap
结构,该结构也是在将row写入memtable时,保存的一个索引,key是:labelName,value是:sidList,将其遍历拼接进入一个[]labelIndex中临时存储。最后将其更新到meta.series
中去然后讲meta压缩,同时生成该segment的描述,然后将meta的压缩数据存储进入dataBuf中去 最后将data长度和meta长度存储进入dataBuf,这样,dataBuf、descBuf就生成了
memtable.go
func (m *memtable) Marshal() ([]byte, []byte, error) {
sidList := make(map[string]uint32)
startOffset := 0
size := 0
dataBuf := make([]byte, 0)
dataBuf = append(dataBuf, make([]byte, uint64Size>>1)...)
meta := Metadata{
MinTimestamp: m.minTimestamp,
MaxTimestamp: m.maxTimestamp,
}
m.segment.Range(func(key, value any) bool {
seriesID := key.(string)
sidList[seriesID] = uint32(size)
size++
series := value.(*memSeries)
meta.SeriesIDRelatedLabels = append(meta.SeriesIDRelatedLabels, series.labels)
m.outdatedMutex.RLock()
listValue, ok := m.outdated[seriesID]
m.outdatedMutex.RUnlock()
var dataBytes []byte
if ok {
dataBytes = DoCompress(series.MergeOutdatedList(listValue).Bytes())
} else {
dataBytes = DoCompress(series.Bytes())
}
dataBuf = append(dataBuf, dataBytes...)
endOffset := startOffset + len(dataBytes)
meta.Series = append(meta.Series, metaSeries{
Sid: key.(string),
StartOffset: uint64(startOffset),
EndOffset: uint64(endOffset),
})
startOffset = endOffset
return true
})
labelIndex := make([]seriesWithLabel, 0)
m.indexMap.Range(func(key string, value *memtableSidList) {
list := make([]uint32, 0)
for _, sid := range value.List() {
list = append(list, sidList[sid])
}
sort.Slice(list, func(i, j int) bool {
return list[i] < list[j]
})
labelIndex = append(labelIndex, seriesWithLabel{
Name: key,
Sids: list,
})
})
meta.Labels = labelIndex
metaBytes, err := MarshalMeta(meta)
if err != nil {
return nil, nil, err
}
metaLen := len(metaBytes)
desc := &Desc{
SeriesCount: m.seriesCount,
DataPointsCount: m.dataPointsCount,
MaxTimestamp: m.maxTimestamp,
MinTimestamp: m.minTimestamp,
}
descBytes, err := json.MarshalIndent(desc, "", "\t")
dataLen := len(dataBuf) - (uint64Size >> 1)
dataBuf = append(dataBuf, metaBytes...)
newEncodingBuf := newEncodingBuf()
newEncodingBuf.MarshalUint64(uint64(dataLen))
dataLenBytes := newEncodingBuf.Bytes()
copy(dataBuf[:uint64Size], dataLenBytes[:uint64Size])
newEncodingBuf.Reset()
newEncodingBuf.MarshalUint64(uint64(metaLen))
metaLenBytes := newEncodingBuf.Bytes()
copy(dataBuf[uint64Size:(uint64Size*2)], metaLenBytes[:uint64Size])
return dataBuf, descBytes, nil
}
然后将其对应的数据写入文件 memtable.go
func writeToDisk(segment *memtable) error {
dataBytes, descBytes, err := segment.Marshal()
if err != nil {
return fmt.Errorf("faild to marshal segment: %s", err.Error())
}
writeFile := func(file string, data []byte) error {
if isFileExist(file) {
return fmt.Errorf("%s file is already exist", file)
}
fd, err := os.OpenFile(file, os.O_CREATE|os.O_WRONLY, os.ModePerm)
if err != nil {
return nil
}
defer fd.Close()
_, err = fd.Write(data)
return err
}
dirname := makeDirName(segment.MinTs(), segment.MaxTs())
mkdir(dirname)
if err = writeFile(path.Join(dirname, "data"), dataBytes); err != nil {
return err
}
if err = writeFile(path.Join(dirname, "meta"), descBytes); err != nil {
return err
}
return nil
}
本篇就到这儿,下篇就实现以下 基于meta的解压缩,将文件数据解析出来成为metadata
数据~
github:https://github.com/azhsmesos/tsdb
作者介绍

zhaozhenhang
V1
快手