Optimize region filtering to achieve a higher merge rate for consecutive segments

This commit is contained in:
lion 2025-09-25 11:17:09 +08:00
parent cb3329390a
commit 4ff67f168c
9 changed files with 35 additions and 29 deletions

View File

@ -442,7 +442,7 @@ func testBench() {
var count, errCount, tStart = 0, 0, time.Now()
slog.Info("Bench start", "xdbPath", dbFile, "srcPath", srcFile)
var iErr = xdb.IterateSegments(handle, nil, func(seg *xdb.Segment) error {
var iErr = xdb.IterateSegments(handle, nil, nil, func(seg *xdb.Segment) error {
var l = fmt.Sprintf("%d|%d|%s", seg.StartIP, seg.EndIP, seg.Region)
slog.Debug("try to bench", "segment", l)
// mip := xdb.IPMiddle(seg.StartIP, seg.EndIP)

View File

@ -59,7 +59,7 @@ func (e *Editor) loadSegments() error {
var iErr = IterateSegments(e.srcHandle, func(l string) {
// do nothing here
}, func(seg *Segment) error {
}, nil, func(seg *Segment) error {
// version check
if len(seg.StartIP) != e.verison.Bytes {
return fmt.Errorf("invalid ip segment(%s expected)", e.verison.Name)
@ -240,7 +240,7 @@ func (e *Editor) PutFile(src string) (int, int, error) {
var oldRows, newRows = 0, 0
iErr := IterateSegments(handle, func(l string) {
// do nothing here
}, func(seg *Segment) error {
}, nil, func(seg *Segment) error {
o, n, err := e.PutSegment(seg)
if err == nil {
oldRows += o

View File

@ -162,6 +162,9 @@ func (m *Maker) loadSegments() error {
var iErr = IterateSegments(m.srcHandle, func(l string) {
slog.Debug("loaded", "segment", l)
}, func(region string) (string, error) {
// apply the field filter
return RegionFiltering(region, m.fields)
}, func(seg *Segment) error {
// ip version check
if len(seg.StartIP) != m.version.Bytes {
@ -173,14 +176,6 @@ func (m *Maker) loadSegments() error {
return err
}
// apply the field filter
region, err := RegionFiltering(seg.Region, m.fields)
if err != nil {
return err
}
// slog.Info("filtered", "region", region)
seg.Region = region
m.segments = append(m.segments, seg)
last = seg
return nil

View File

@ -52,20 +52,15 @@ func (p *Processor) loadSegments() error {
var iErr = IterateSegments(p.srcHandle, func(l string) {
slog.Debug("loaded", "segment", l)
}, func(region string) (string, error) {
return RegionFiltering(region, p.fields)
}, func(seg *Segment) error {
// check the continuity of the data segment
// if err := seg.AfterCheck(last); err != nil {
// return err
// }
// apply the field filter
region, err := RegionFiltering(seg.Region, p.fields)
if err != nil {
return err
}
// slog.Info("filtered", "source", seg.Region, "filtered", region)
seg.Region = region
p.segments = append(p.segments, seg)
return nil
})

View File

@ -101,7 +101,7 @@ func IPMiddle(sip, eip []byte) []byte {
return result
}
func IterateSegments(handle *os.File, before func(l string), cb func(seg *Segment) error) error {
func IterateSegments(handle *os.File, before func(l string), filter func(region string) (string, error), done func(seg *Segment) error) error {
var last *Segment = nil
var scanner = bufio.NewScanner(handle)
scanner.Split(bufio.ScanLines)
@ -147,10 +147,19 @@ func IterateSegments(handle *os.File, before func(l string), cb func(seg *Segmen
// return fmt.Errorf("empty region info in segment line `%s`", l)
// }
// check and do the region filter
var region = ps[2]
if filter != nil {
region, err = filter(ps[2])
if err != nil {
return fmt.Errorf("failed to filter region `%s`: %s", ps[2], err)
}
}
var seg = &Segment{
StartIP: sip,
EndIP: eip,
Region: ps[2],
Region: region,
}
// check and automatically merge the consecutive segments, which means:
@ -166,7 +175,7 @@ func IterateSegments(handle *os.File, before func(l string), cb func(seg *Segmen
}
}
if err = cb(last); err != nil {
if err = done(last); err != nil {
return err
}
@ -176,7 +185,7 @@ func IterateSegments(handle *os.File, before func(l string), cb func(seg *Segmen
// process the last segment
if last != nil {
return cb(last)
return done(last)
}
return nil

View File

@ -163,7 +163,7 @@ func TestIterateSegments(t *testing.T) {
_ = IterateSegments(handle, func(l string) {
// fmt.Printf("load segment: `%s`\n", l)
}, func(seg *Segment) error {
}, nil, func(seg *Segment) error {
fmt.Printf("get segment: `%s`\n", seg)
return nil
})

View File

@ -164,6 +164,11 @@ public class Maker {
log.debugf("load segment: `%s`", line);
}
@Override
public String filter(String region) {
return Util.regionFiltering(region, fields);
}
@Override
public void handle(Segment seg) throws Exception {
// ip version check
@ -176,18 +181,14 @@ public class Maker {
+ Util.ipToString(last.endIP)+")+1 != seg.sip("+ Util.ipToString(seg.startIP) + ", "+ seg.region +")");
}
// apply the field filtering
final String region = Util.regionFiltering(seg.region, fields);
// allow empty region
// if (region.length() < 1) {
// throw new Exception("empty region info for segment `"+seg+"`");
// }
segments.add(new Segment(seg.startIP, seg.endIP, region));
segments.add(seg);
last = seg;
}
});
log.infof("all segments loaded, length: %d, elapsed: %d ms", segments.size(), System.currentTimeMillis() - tStart);

View File

@ -122,6 +122,7 @@ public class Segment {
// static interface to handle the iterate callbacks
public static interface IterateAction {
public void before(final String line);
public String filter(final String region);
public void handle(final Segment seg) throws Exception;
}
@ -175,7 +176,7 @@ public class Segment {
// throw new Exception("empty region info in segment line `"+ps[2]+"`");
// }
final Segment seg = new Segment(sip, eip, ps[2]);
final Segment seg = new Segment(sip, eip, action.filter(ps[2]));
// check and set the last segment
if (last == null) {
last = seg;

View File

@ -54,6 +54,11 @@ public class SegmentTest {
// log.debugf("load segment: `%s`", line);
}
@Override
public String filter(String region) {
return region;
}
@Override
public void handle(Segment seg) throws Exception {
log.infof("handle segment: `%s`", seg.toString());