mirror of
https://github.com/Unitech/pm2.git
synced 2025-12-08 20:35:53 +00:00
(aggregator) refactor matching algo + fixs
This commit is contained in:
parent
0a93463eff
commit
6b05672811
@ -149,18 +149,22 @@ var TransactionAggregator = module.exports = function (pushInteractor) {
|
||||
})
|
||||
process.meta.trace_count++;
|
||||
|
||||
// Update global stat object
|
||||
self.matchPath(path, process.routes, function (matched) {
|
||||
if (!matched) {
|
||||
process.routes[path] = [];
|
||||
log('Path %s isnt aggregated yet, creating new entry', path)
|
||||
self.mergeTrace(process.routes[path], new_trace);
|
||||
}
|
||||
else {
|
||||
log('Path %s already aggregated under %s, merging', path, matched)
|
||||
self.mergeTrace(process.routes['/' + matched], new_trace);
|
||||
}
|
||||
})
|
||||
|
||||
// remove the last slash if exist
|
||||
if (path[0] === '/' && path !== '/')
|
||||
path = path.substr(1, path.length - 1)
|
||||
// Find
|
||||
var matched = self.matchPath(path, process.routes);
|
||||
if (!matched) {
|
||||
process.routes[path] = [];
|
||||
log('Path %s isnt aggregated yet, creating new entry', path)
|
||||
self.mergeTrace(process.routes[path], new_trace);
|
||||
}
|
||||
else {
|
||||
log('Path %s already aggregated under %s', path, matched)
|
||||
self.mergeTrace(process.routes[matched], new_trace);
|
||||
}
|
||||
|
||||
return self.processes;
|
||||
}
|
||||
|
||||
@ -171,13 +175,8 @@ var TransactionAggregator = module.exports = function (pushInteractor) {
|
||||
* @param {Object} trace
|
||||
*/
|
||||
this.mergeTrace = function (aggregated, trace) {
|
||||
if (!trace.spans || !trace.spans[0])
|
||||
return log('trace.spans or trace.spans[0] is null');
|
||||
|
||||
// @vmarchaud If aggregated is null it mean that .matchPath is buggy?
|
||||
if (!aggregated)
|
||||
return log('aggregated is null?');
|
||||
|
||||
if (!aggregated || !trace)
|
||||
return ;
|
||||
if (!aggregated.variances)
|
||||
aggregated.variances = [];
|
||||
|
||||
@ -187,22 +186,32 @@ var TransactionAggregator = module.exports = function (pushInteractor) {
|
||||
min: 100000,
|
||||
max: 0
|
||||
}
|
||||
|
||||
trace.spans = trace.spans.filter(function (span) {
|
||||
|
||||
// remove spans with startTime == endTime
|
||||
trace.spans = trace.spans.filter(function(span) {
|
||||
return span.endTime !== span.startTime;
|
||||
})
|
||||
// if the trace doesn't any spans stop aggregation here
|
||||
if (trace.spans.length == 0)
|
||||
return ;
|
||||
|
||||
// compute duration of child spans
|
||||
trace.spans.forEach(function(span) {
|
||||
span.min = span.max = span.mean = Math.round(new Date(span.endTime) - new Date(span.startTime));
|
||||
})
|
||||
|
||||
// Calculate/Update mean, min, max, count
|
||||
aggregated.meta.mean = aggregated.meta.count > 0 ?
|
||||
(trace.spans[0].mean + (aggregated.meta.mean * aggregated.meta.count)) / (aggregated.meta.count + 1) : trace.spans[0].mean;
|
||||
// Calculate/Update mean
|
||||
if (aggregated.meta.count > 0)
|
||||
aggregated.meta.mean = (trace.spans[0].mean + (aggregated.meta.mean * aggregated.meta.count)) / (aggregated.meta.count + 1)
|
||||
else
|
||||
aggregated.meta.mean = trace.spans[0].mean;
|
||||
|
||||
// update min/max
|
||||
aggregated.meta.min = aggregated.meta.min > trace.spans[0].mean ? trace.spans[0].mean : aggregated.meta.min;
|
||||
aggregated.meta.max = aggregated.meta.max < trace.spans[0].mean ? trace.spans[0].mean : aggregated.meta.max;
|
||||
aggregated.meta.count++;
|
||||
// round mean value
|
||||
aggregated.meta.mean = Math.round(aggregated.meta.mean * 100) / 100;
|
||||
|
||||
var merge = function (variance) {
|
||||
// no variance found so its a new one
|
||||
@ -220,6 +229,7 @@ var TransactionAggregator = module.exports = function (pushInteractor) {
|
||||
variance.min = variance.min > trace.spans[0].mean ? trace.spans[0].mean : variance.min;
|
||||
variance.max = variance.max < trace.spans[0].mean ? trace.spans[0].mean : variance.max;
|
||||
variance.mean = (trace.spans[0].mean + (variance.mean * variance.count)) / (variance.count + 1);
|
||||
variance.mean = Math.round(variance.mean * 100) / 100;
|
||||
|
||||
// update duration of spans to be mean
|
||||
self.updateSpanDuration(variance.spans, trace.spans, variance.count, true);
|
||||
@ -238,7 +248,7 @@ var TransactionAggregator = module.exports = function (pushInteractor) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Parkour simultaneously both spans to update value of the first one using value of the second one
|
||||
* Parkour simultaneously both spans list to update value of the first one using value of the second one
|
||||
* The first should be variance already aggregated for which we want to merge the second one
|
||||
* The second one is a new trace, so we need to re-compute mean/min/max time for each spans
|
||||
*/
|
||||
@ -269,52 +279,53 @@ var TransactionAggregator = module.exports = function (pushInteractor) {
|
||||
/**
|
||||
* Will return the route if we found an already matched route
|
||||
*/
|
||||
this.matchPath = function (path, routes, cb) {
|
||||
var self = this;
|
||||
|
||||
this.matchPath = function (path, routes) {
|
||||
// empty route is / without the fist slash
|
||||
if (path === '/')
|
||||
return routes[path] ? cb(path) : cb(null);
|
||||
return routes[path] ? path : null;
|
||||
|
||||
// remove the last slash if exist
|
||||
if (path[path.length - 1] === '/')
|
||||
path = path.substr(0, path.length - 1)
|
||||
|
||||
// split to get array of segment
|
||||
path = path.split('/').filter(function (item) {
|
||||
return !item ? null : item;
|
||||
});
|
||||
path = path.split('/');
|
||||
|
||||
// if the path has only one segment, we just need to compare the key
|
||||
if (path.length === 1)
|
||||
return routes[path[0]] ? cb(routes[path[0]]) : cb(null);
|
||||
return routes[path[0]] ? routes[path[0]] : null;
|
||||
|
||||
// check in routes already stored for match
|
||||
async.forEachOfLimit(routes, 10, function (data, route, next) {
|
||||
var segments = route.split('/').filter(function (item) {
|
||||
return !item ? null : item;
|
||||
});
|
||||
if (segments.length !== path.length) return next(null);
|
||||
var keys = Object.keys(routes);
|
||||
for (var i = 0, len = keys.length; i < len; i++) {
|
||||
var route = keys[i], segments = route.split('/');
|
||||
|
||||
for (var i = path.length - 1; i >= 0; i--) {
|
||||
if (segments.length !== path.length) continue;
|
||||
|
||||
for (var j = path.length - 1; j >= 0; j--) {
|
||||
// different segment, try to find if new route or not
|
||||
if (path[i] !== segments[i]) {
|
||||
// case if the aggregator already have matched that path into a route and we got an identifier
|
||||
if (self.isIdentifier(path[i]) && segments[i] === '*' && path[i - 1] === segments[i - 1])
|
||||
return next(segments.join('/'));
|
||||
if (path[j] !== segments[j]) {
|
||||
// if the aggregator already have matched that segment with a wildcard and the next segment is the same
|
||||
if (self.isIdentifier(path[j]) && segments[j] === '*' && path[j - 1] === segments[j - 1])
|
||||
return segments.join('/');
|
||||
// case a var in url match, so we continue because they must be other var in url
|
||||
else if (path[i - 1] !== undefined && path[i - 1] === segments[i - 1] && self.isIdentifier(path[i]) && self.isIdentifier(segments[i])) {
|
||||
segments[i] = '*';
|
||||
else if (path[j - 1] !== undefined && path[j - 1] === segments[j - 1] && self.isIdentifier(path[j]) && self.isIdentifier(segments[j])) {
|
||||
segments[j] = '*';
|
||||
// update routes in cache
|
||||
routes[segments.join('/')] = routes[route];
|
||||
delete routes[route];
|
||||
route = segments.join('/');
|
||||
delete routes[keys[i]]
|
||||
return segments.join('/');
|
||||
}
|
||||
else
|
||||
return next();
|
||||
break ;
|
||||
}
|
||||
|
||||
// if finish to iterate over segment of path, we must be on the same route
|
||||
if (j == 0)
|
||||
return segments.join('/')
|
||||
}
|
||||
// if finish to iterate over segment of path, we must be on the same route
|
||||
return next(segments.join('/'))
|
||||
}, cb)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -362,7 +373,7 @@ var TransactionAggregator = module.exports = function (pushInteractor) {
|
||||
// @vmarchaud why do we iterate over keys -
|
||||
// Why not just doing this only on the .cmd field?
|
||||
Object.keys(span.labels).forEach(function(key) {
|
||||
if (typeof(span.labels[key]) === 'string')
|
||||
if (typeof(span.labels[key]) === 'string' && key !== 'stacktrace')
|
||||
span.labels[key] = span.labels[key].replace(REGEX_JSON_CLEANUP, '\": \"?\"');
|
||||
});
|
||||
});
|
||||
@ -386,7 +397,7 @@ var TransactionAggregator = module.exports = function (pushInteractor) {
|
||||
routes: [],
|
||||
meta: fclone({
|
||||
trace_count : process.meta.trace_count,
|
||||
mean_latency : process.meta.mean_latency,
|
||||
mean_latency : Math.round(process.meta.mean_latency * 100) / 100,
|
||||
http_meter : Math.round(process.meta.http_meter.rate(1000) * 100) / 100,
|
||||
db_meter : Math.round(process.meta.db_meter.rate(1000) * 100) / 100
|
||||
})
|
||||
@ -422,7 +433,7 @@ var TransactionAggregator = module.exports = function (pushInteractor) {
|
||||
tmp.meter = Math.round(variance.meter.rate(1000) * 100) / 100;
|
||||
// delete stackrace from spans
|
||||
tmp.spans.forEach(function (span) {
|
||||
delete span.labels.stacktrace;
|
||||
// delete span.labels.stacktrace;
|
||||
})
|
||||
// push serialized into normalized data
|
||||
routeCopy.variances.push(tmp);
|
||||
|
||||
@ -64,42 +64,34 @@ describe('Transactions Aggregator', function() {
|
||||
|
||||
describe('.matchPath - aggregate', function() {
|
||||
var routes = {
|
||||
'/bucket/6465577': { spans: true },
|
||||
'/admin/bucket/8787': { spans: true }
|
||||
'bucket/6465577': { spans: true }
|
||||
};
|
||||
|
||||
it('should match first route', function(done) {
|
||||
aggregator.matchPath('/bucket/67754', routes, function (match) {
|
||||
should.exist(match);
|
||||
match.should.be.a.String();
|
||||
match.should.equal('bucket/*');
|
||||
should.exist(routes['bucket/*'])
|
||||
done()
|
||||
})
|
||||
it('should match first route', function() {
|
||||
var match = aggregator.matchPath('bucket/67754', routes);
|
||||
should.exist(match);
|
||||
match.should.be.a.String();
|
||||
match.should.equal('bucket/*');
|
||||
should.exist(routes['bucket/*'])
|
||||
});
|
||||
|
||||
it('should NOT match any route', function(done) {
|
||||
aggregator.matchPath('/toto/67754', routes, function (match) {
|
||||
should.not.exist(match);
|
||||
done()
|
||||
})
|
||||
it('should NOT match any route', function() {
|
||||
should.not.exist(aggregator.matchPath('toto/67754', routes));
|
||||
});
|
||||
|
||||
it('should match aggregated route with *', function(done) {
|
||||
aggregator.matchPath('/bucket/87998', routes, function (match) {
|
||||
should.exist(match);
|
||||
match.should.be.a.String();
|
||||
match.should.equal('bucket/*');
|
||||
should.exist(routes['bucket/*'])
|
||||
done()
|
||||
})
|
||||
it('should match aggregated route with *', function() {
|
||||
var match = aggregator.matchPath('bucket/87998', routes);
|
||||
should.exist(match);
|
||||
match.should.be.a.String();
|
||||
match.should.equal('bucket/*');
|
||||
should.exist(routes['bucket/*'])
|
||||
});
|
||||
});
|
||||
|
||||
describe('merging trace together', function() {
|
||||
var trace = TraceFactory.generateTrace('/yoloswag/swag', 2);
|
||||
var trace = TraceFactory.generateTrace('yoloswag/swag', 2);
|
||||
var ROUTES = {
|
||||
'/yoloswag/swag': {}
|
||||
'yoloswag/swag': {}
|
||||
};
|
||||
|
||||
it('should not fail', function() {
|
||||
@ -107,29 +99,29 @@ describe('Transactions Aggregator', function() {
|
||||
});
|
||||
|
||||
it('should add a trace', function() {
|
||||
aggregator.mergeTrace(ROUTES['/yoloswag/swag'], trace)
|
||||
ROUTES['/yoloswag/swag'].meta.count.should.be.equal(1);
|
||||
ROUTES['/yoloswag/swag'].variances.length.should.be.equal(1);
|
||||
ROUTES['/yoloswag/swag'].variances[0].spans.length.should.be.equal(3);
|
||||
aggregator.mergeTrace(ROUTES['yoloswag/swag'], trace)
|
||||
ROUTES['yoloswag/swag'].meta.count.should.be.equal(1);
|
||||
ROUTES['yoloswag/swag'].variances.length.should.be.equal(1);
|
||||
ROUTES['yoloswag/swag'].variances[0].spans.length.should.be.equal(3);
|
||||
});
|
||||
|
||||
it('should merge with the first variance', function() {
|
||||
aggregator.mergeTrace(ROUTES['/yoloswag/swag'], trace);
|
||||
ROUTES['/yoloswag/swag'].variances.length.should.be.equal(1);
|
||||
ROUTES['/yoloswag/swag'].variances[0].count.should.be.equal(2);
|
||||
aggregator.mergeTrace(ROUTES['yoloswag/swag'], trace);
|
||||
ROUTES['yoloswag/swag'].variances.length.should.be.equal(1);
|
||||
ROUTES['yoloswag/swag'].variances[0].count.should.be.equal(2);
|
||||
});
|
||||
|
||||
it('should merge as a new variance with the same route', function () {
|
||||
var trace2 = TraceFactory.generateTrace('/yoloswag/swag', 3)
|
||||
var trace2 = TraceFactory.generateTrace('yoloswag/swag', 3)
|
||||
trace2.spans.forEach(function (span) {
|
||||
span.min = span.max = span.mean = Math.round(new Date(span.endTime) - new Date(span.startTime));
|
||||
})
|
||||
aggregator.mergeTrace(ROUTES['/yoloswag/swag'], trace2);
|
||||
ROUTES['/yoloswag/swag'].meta.count.should.be.equal(3);
|
||||
ROUTES['/yoloswag/swag'].variances.length.should.be.equal(2);
|
||||
ROUTES['/yoloswag/swag'].variances[0].count.should.be.equal(2);
|
||||
ROUTES['/yoloswag/swag'].variances[1].count.should.be.equal(1);
|
||||
ROUTES['/yoloswag/swag'].variances[1].spans.length.should.be.equal(4);
|
||||
aggregator.mergeTrace(ROUTES['yoloswag/swag'], trace2);
|
||||
ROUTES['yoloswag/swag'].meta.count.should.be.equal(3);
|
||||
ROUTES['yoloswag/swag'].variances.length.should.be.equal(2);
|
||||
ROUTES['yoloswag/swag'].variances[0].count.should.be.equal(2);
|
||||
ROUTES['yoloswag/swag'].variances[1].count.should.be.equal(1);
|
||||
ROUTES['yoloswag/swag'].variances[1].spans.length.should.be.equal(4);
|
||||
});
|
||||
});
|
||||
|
||||
@ -141,15 +133,15 @@ describe('Transactions Aggregator', function() {
|
||||
|
||||
it('should aggregate', function() {
|
||||
// Simulate some data
|
||||
var packet = TraceFactory.generatePacket('/yoloswag/swag', 'appname');
|
||||
var packet = TraceFactory.generatePacket('yoloswag/swag', 'appname');
|
||||
aggregator.aggregate(packet);
|
||||
packet = TraceFactory.generatePacket('/yoloswag/swag', 'appname');
|
||||
packet = TraceFactory.generatePacket('yoloswag/swag', 'appname');
|
||||
aggregator.aggregate(packet);
|
||||
packet = TraceFactory.generatePacket('/yoloswag/swag', 'appname');
|
||||
packet = TraceFactory.generatePacket('yoloswag/swag', 'appname');
|
||||
aggregator.aggregate(packet);
|
||||
packet = TraceFactory.generatePacket('/sisi/aight', 'appname');
|
||||
packet = TraceFactory.generatePacket('sisi/aight', 'appname');
|
||||
aggregator.aggregate(packet);
|
||||
packet = TraceFactory.generatePacket('/sisi/aight', 'APP2');
|
||||
packet = TraceFactory.generatePacket('sisi/aight', 'APP2');
|
||||
var agg = aggregator.aggregate(packet);
|
||||
should(agg).not.be.undefined();
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user