Use raw []byte buffers instead of the vector index and segment index encode/decode helpers

This commit is contained in:
lion 2022-07-16 12:12:07 +08:00
parent 53fcdcfdbc
commit e638421ae2
3 changed files with 42 additions and 125 deletions

View File

@ -5,7 +5,6 @@
package xdb
import (
"encoding/binary"
"fmt"
"strings"
)
@ -27,67 +26,3 @@ func IndexPolicyFromString(str string) (IndexPolicy, error) {
return VectorIndexPolicy, fmt.Errorf("invalid policy '%s'", str)
}
}
// SegmentIndexBlockSize is the encoded size, in bytes, of one segment index
// block: start ip (4) + end ip (4) + data length (2) + data pointer (4).
const SegmentIndexBlockSize = 14

// SegmentIndexBlock is the decoded form of a single segment index entry.
type SegmentIndexBlock struct {
	StartIP uint32 // start ip of the segment
	EndIP   uint32 // end ip of the segment
	DataLen uint16 // length of the data the entry points to
	DataPtr uint32 // pointer to the data of the entry
}

// SegmentIndexDecode decodes the first SegmentIndexBlockSize bytes of input,
// read as little-endian fields, into a SegmentIndexBlock.
//
// It returns an error when input holds fewer than SegmentIndexBlockSize bytes.
// Any bytes past the first SegmentIndexBlockSize are ignored.
func SegmentIndexDecode(input []byte) (*SegmentIndexBlock, error) {
	if len(input) < SegmentIndexBlockSize {
		return nil, fmt.Errorf("input is less than %d bytes", SegmentIndexBlockSize)
	}

	return &SegmentIndexBlock{
		StartIP: binary.LittleEndian.Uint32(input),
		EndIP:   binary.LittleEndian.Uint32(input[4:]),
		DataLen: binary.LittleEndian.Uint16(input[8:]),
		DataPtr: binary.LittleEndian.Uint32(input[10:]),
	}, nil
}

// Encode serializes the block into a newly allocated buffer of
// SegmentIndexBlockSize bytes using the same little-endian layout that
// SegmentIndexDecode reads.
func (s *SegmentIndexBlock) Encode() []byte {
	var buff = make([]byte, SegmentIndexBlockSize)
	binary.LittleEndian.PutUint32(buff, s.StartIP)
	binary.LittleEndian.PutUint32(buff[4:], s.EndIP)
	binary.LittleEndian.PutUint16(buff[8:], s.DataLen)
	binary.LittleEndian.PutUint32(buff[10:], s.DataPtr)
	return buff
}

// String returns a human-readable representation of the block with all
// fields printed as decimal numbers.
func (s *SegmentIndexBlock) String() string {
	return fmt.Sprintf("{sip: %d, eip: %d, len: %d, ptr: %d}", s.StartIP, s.EndIP, s.DataLen, s.DataPtr)
}
// ------------
// VectorIndexBlock is the decoded form of one vector index entry: a pair of
// pointers, encoded as two little-endian uint32 values (8 bytes in total).
type VectorIndexBlock struct {
	FirstPtr uint32 // first pointer of the entry
	LastPtr  uint32 // last pointer of the entry
}

// VectorIndexBlockDecode decodes the first 8 bytes of input into a
// VectorIndexBlock.
//
// It returns an error when input holds fewer than 8 bytes. Any bytes past
// the first 8 are ignored.
func VectorIndexBlockDecode(input []byte) (*VectorIndexBlock, error) {
	// 8 = two uint32 fields.
	if len(input) < 8 {
		return nil, fmt.Errorf("input should be not less than 8 bytes")
	}

	return &VectorIndexBlock{
		FirstPtr: binary.LittleEndian.Uint32(input),
		LastPtr:  binary.LittleEndian.Uint32(input[4:]),
	}, nil
}

// Encode serializes the block into a newly allocated 8-byte buffer using the
// same little-endian layout that VectorIndexBlockDecode reads.
func (v VectorIndexBlock) Encode() []byte {
	var buff = make([]byte, 8)
	binary.LittleEndian.PutUint32(buff, v.FirstPtr)
	binary.LittleEndian.PutUint32(buff[4:], v.LastPtr)
	return buff
}

// String returns a human-readable representation of the block with both
// pointers printed as decimal numbers.
func (v VectorIndexBlock) String() string {
	return fmt.Sprintf("{FirstPtr: %d, LastPtr: %d}", v.FirstPtr, v.LastPtr)
}

View File

@ -64,6 +64,7 @@ const HeaderInfoLength = 256
// VectorIndexRows and VectorIndexCols are the two dimensions of the vector
// index table; an entry is addressed by the two most significant bytes of an ip.
const VectorIndexRows = 256
const VectorIndexCols = 256
// VectorIndexSize is the size in bytes of one vector index entry
// (two little-endian uint32 pointers).
const VectorIndexSize = 8
// SegmentIndexSize is the size in bytes of one encoded segment index entry
// (4 + 4 + 2 + 4).
const SegmentIndexSize = 14
// VectorIndexLength is the total size in bytes of the whole vector index table.
const VectorIndexLength = VectorIndexRows * VectorIndexCols * VectorIndexSize
type Maker struct {
@ -73,7 +74,7 @@ type Maker struct {
indexPolicy IndexPolicy
segments []*Segment
regionPool map[string]uint32
vectorIndex [VectorIndexCols][VectorIndexRows]VectorIndexBlock
vectorIndex []byte
}
func NewMaker(policy IndexPolicy, srcFile string, dstFile string) (*Maker, error) {
@ -96,7 +97,7 @@ func NewMaker(policy IndexPolicy, srcFile string, dstFile string) (*Maker, error
indexPolicy: policy,
segments: []*Segment{},
regionPool: map[string]uint32{},
vectorIndex: [VectorIndexCols][VectorIndexRows]VectorIndexBlock{},
vectorIndex: make([]byte, VectorIndexLength),
}, nil
}
@ -208,12 +209,15 @@ func (m *Maker) Init() error {
// setVectorIndex refreshes the vector index entry of the specified ip.
// The entry is selected by the two most significant bytes of ip; ptr is the
// pointer recorded for it, and the end pointer is always set to ptr plus the
// size of one segment index entry.
// NOTE(review): this span appears to interleave the pre-change (viBlock) and
// post-change ([]byte) lines of a diff — confirm which variant is current.
func (m *Maker) setVectorIndex(ip uint32, ptr uint32) {
var viBlock = &m.vectorIndex[(ip>>24)&0xFF][(ip>>16)&0xFF]
if viBlock.FirstPtr == 0 {
viBlock.FirstPtr = ptr
viBlock.LastPtr = ptr + SegmentIndexBlockSize
// il0 / il1: first and second most significant bytes of the ip
var il0 = (ip >> 24) & 0xFF
var il1 = (ip >> 16) & 0xFF
// byte offset of the entry inside m.vectorIndex
var idx = il0*VectorIndexCols*VectorIndexSize + il1*VectorIndexSize
var sPtr = binary.LittleEndian.Uint32(m.vectorIndex[idx:])
// a zero start pointer is treated as "entry not set yet": write both pointers
if sPtr == 0 {
binary.LittleEndian.PutUint32(m.vectorIndex[idx:], ptr)
binary.LittleEndian.PutUint32(m.vectorIndex[idx+4:], ptr+SegmentIndexSize)
} else {
// entry already has a start pointer: only move the end pointer forward
viBlock.LastPtr = ptr + SegmentIndexBlockSize
binary.LittleEndian.PutUint32(m.vectorIndex[idx+4:], ptr+SegmentIndexSize)
}
}
@ -260,6 +264,7 @@ func (m *Maker) Start() error {
// 2, write the index block and cache the super index block
log.Printf("try to write the segment index block ... ")
var indexBuff = make([]byte, SegmentIndexSize)
var counter, startIndexPtr, endIndexPtr = 0, int64(-1), int64(-1)
for _, seg := range m.segments {
dataPtr, has := m.regionPool[seg.Region]
@ -267,6 +272,8 @@ func (m *Maker) Start() error {
return fmt.Errorf("missing ptr cache for region `%s`", seg.Region)
}
// @Note: data length should be the length in bytes.
// this works fine because len() of a Go string returns its byte count.
var dataLen = len(seg.Region)
if dataLen < 1 {
// @TODO: could this even be a case ?
@ -281,14 +288,12 @@ func (m *Maker) Start() error {
return fmt.Errorf("seek to segment index block: %w", err)
}
var sIndex = &SegmentIndexBlock{
StartIP: s.StartIP,
EndIP: s.EndIP,
DataLen: uint16(dataLen),
DataPtr: dataPtr,
}
_, err = m.dstHandle.Write(sIndex.Encode())
// encode the segment index
binary.LittleEndian.PutUint32(indexBuff, s.StartIP)
binary.LittleEndian.PutUint32(indexBuff[4:], s.EndIP)
binary.LittleEndian.PutUint16(indexBuff[8:], uint16(dataLen))
binary.LittleEndian.PutUint32(indexBuff[10:], dataPtr)
_, err = m.dstHandle.Write(indexBuff)
if err != nil {
return fmt.Errorf("write segment index for '%s': %w", s.String(), err)
}
@ -312,27 +317,21 @@ func (m *Maker) Start() error {
if err != nil {
return fmt.Errorf("seek vector index first ptr: %w", err)
}
for i, l := range m.vectorIndex {
for j, c := range l {
_, err = m.dstHandle.Write(c.Encode())
if err != nil {
return fmt.Errorf("write vector index [%d][%d]: %w", i, j, err)
}
}
_, err = m.dstHandle.Write(m.vectorIndex)
if err != nil {
return fmt.Errorf("write vector index: %w", err)
}
// synchronized the segment index info
log.Printf("try to write the segment index ptr ... ")
var buff = make([]byte, 8)
binary.LittleEndian.PutUint32(buff, uint32(startIndexPtr))
binary.LittleEndian.PutUint32(buff[4:], uint32(endIndexPtr))
binary.LittleEndian.PutUint32(indexBuff, uint32(startIndexPtr))
binary.LittleEndian.PutUint32(indexBuff[4:], uint32(endIndexPtr))
_, err = m.dstHandle.Seek(8, 0)
if err != nil {
return fmt.Errorf("seek segment index ptr: %w", err)
}
_, err = m.dstHandle.Write(buff)
_, err = m.dstHandle.Write(indexBuff[:8])
if err != nil {
return fmt.Errorf("write segment index ptr: %w", err)
}

View File

@ -25,7 +25,7 @@ type Searcher struct {
// use it only when this feature is enabled.
// Preloading the vector index reduces the number of IO operations
// and thus speeds up the search process
vectorIndex [][]*VectorIndexBlock
vectorIndex []byte
}
func NewSearcher(dbFile string) (*Searcher, error) {
@ -65,7 +65,7 @@ func (s *Searcher) LoadVectorIndex() error {
return fmt.Errorf("seek to vector index: %w", err)
}
var buff = make([]byte, VectorIndexRows*VectorIndexCols*VectorIndexSize)
var buff = make([]byte, VectorIndexLength)
rLen, err := s.handle.Read(buff)
if err != nil {
return err
@ -75,20 +75,7 @@ func (s *Searcher) LoadVectorIndex() error {
return fmt.Errorf("incomplete read: readed bytes should be %d", len(buff))
}
// decode the vector index blocks
var vectorIndex = make([][]*VectorIndexBlock, VectorIndexRows)
for r := 0; r < VectorIndexRows; r++ {
vectorIndex[r] = make([]*VectorIndexBlock, VectorIndexCols)
for c := 0; c < VectorIndexCols; c++ {
offset := r*VectorIndexCols*VectorIndexSize + c*VectorIndexSize
vectorIndex[r][c], err = VectorIndexBlockDecode(buff[offset:])
if err != nil {
return fmt.Errorf("decode vector index at [%d][%d]: %w", r, c, err)
}
}
}
s.vectorIndex = vectorIndex
s.vectorIndex = buff
return nil
}
@ -101,15 +88,17 @@ func (s *Searcher) ClearVectorIndex() {
func (s *Searcher) Search(ip uint32) (string, int, error) {
// locate the segment index block based on the vector index
var ioCount = 0
var vIndex *VectorIndexBlock
var il0 = (ip >> 24) & 0xFF
var il1 = (ip >> 16) & 0xFF
var idx = il0*VectorIndexCols*VectorIndexSize + il1*VectorIndexSize
var sPtr, ePtr = uint32(0), uint32(0)
if s.vectorIndex != nil {
vIndex = s.vectorIndex[(ip>>24)&0xFF][(ip>>16)&0xFF]
sPtr = binary.LittleEndian.Uint32(s.vectorIndex[idx:])
ePtr = binary.LittleEndian.Uint32(s.vectorIndex[idx+4:])
} else {
l0, l1 := (ip>>24)&0xFF, (ip>>16)&0xFF
offset := l0*VectorIndexCols*VectorIndexSize + l1*VectorIndexSize
pos, err := s.handle.Seek(int64(HeaderInfoLength+offset), 0)
pos, err := s.handle.Seek(int64(HeaderInfoLength+idx), 0)
if err != nil {
return "", ioCount, fmt.Errorf("seek to vector index[%d][%d]: %w", l0, l1, err)
return "", ioCount, fmt.Errorf("seek to vector index %d: %w", HeaderInfoLength+idx, err)
}
ioCount++
@ -123,21 +112,19 @@ func (s *Searcher) Search(ip uint32) (string, int, error) {
return "", ioCount, fmt.Errorf("incomplete read: readed bytes should be %d", len(buff))
}
vIndex, err = VectorIndexBlockDecode(buff)
if err != nil {
return "", ioCount, fmt.Errorf("invalid vector index block at %d: %w", pos, err)
}
sPtr = binary.LittleEndian.Uint32(buff)
ePtr = binary.LittleEndian.Uint32(buff[4:])
}
//log.Printf("vIndex=%s", vIndex)
// binary search the segment index to get the region
var dataLen, dataPtr = 0, uint32(0)
var buff = make([]byte, SegmentIndexBlockSize)
var l, h = 0, int((vIndex.LastPtr - vIndex.FirstPtr) / SegmentIndexBlockSize)
var buff = make([]byte, SegmentIndexSize)
var l, h = 0, int((ePtr - sPtr) / SegmentIndexSize)
for l <= h {
// log.Printf("l=%d, h=%d", l, h)
m := (l + h) >> 1
p := vIndex.FirstPtr + uint32(m*SegmentIndexBlockSize)
p := sPtr + uint32(m*SegmentIndexSize)
// log.Printf("m=%d, p=%d", m, p)
_, err := s.handle.Seek(int64(p), 0)
if err != nil {
@ -154,10 +141,6 @@ func (s *Searcher) Search(ip uint32) (string, int, error) {
return "", ioCount, fmt.Errorf("incomplete read: readed bytes should be %d", len(buff))
}
// segIndex, err := SegmentIndexDecode(buff)
// if err != nil {
// return "", fmt.Errorf("invalid segment index block at %d: %w", p, err)
// }
// decode the data step by step to reduce the unnecessary calculations
sip := binary.LittleEndian.Uint32(buff)
if ip < sip {