Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to combine a stream with itself dynamically. #693

Merged
merged 1 commit into from
Jul 2, 2016

Conversation

nathanielc
Copy link
Contributor

@nathanielc nathanielc commented Jun 30, 2016

Fixes #46

Here is an example that uses the new |combine() method.

// Points are stored in the DB at 1m intervals
var step = 1m

// Get the number of bytes received by port
var in = batch
    |query('''
    SELECT bytes FROM "telegraf"."default"."net_in_port"
    ''')
        .period(step)
        .every(step)
        .groupBy('port')
    // We expect one point per batch
    |last('bytes')
        .as('bytes')
    |derivative('bytes')
        .as('bytes')
        .unit(step)
        .nonNegative()
    |default()
        .tag('source', 'in')
    |groupBy()

// Get the number of bytes sent by port
var out = batch
    |query('''
    SELECT bytes FROM "telegraf"."default"."net_out_port"
    ''')
        .period(step)
        .every(step)
        .groupBy('port')
    // We expect one point per batch
    |last('bytes')
        .as('bytes')
    |derivative('bytes')
        .as('bytes')
        .unit(step)
        .nonNegative()
    |default()
        .tag('source', 'out')
    |groupBy()

// Merge the in and out streams via union
in
    |union(out)
        .rename('ports')
    // Combine all in<->out port pairs 
    |combine(lambda: "source" == 'in', lambda: "source" == 'out')
        .as('in', 'out')
        .tolerance(step)
    // Now group by the pair of ports
    |groupBy('in.port', 'out.port')
    |window()
        .period(1h)
        .every(1h)
    // Compute the Pearson R coefficient for each pair of ports for a 1h window
    @pearsonr()
        .fields('in.bytes', 'out.bytes')
    // If the correlation was strong do something with it...
    |where(lambda: abs("r") > 0.9)
    |log()
        .prefix('R')

For the above script say we have data being received on port 22 and data being sent on ports 80 and 443. (Note the port data is aggregate for an entire network not a single host). The TICKscript will combine the in vs out ports into pairs of (22, 80) and (22, 443). Then it windows the data an computes a correlation between them.

If we were to modify the |combine call to be less restrictive, like:

    |combine(lambda: TRUE, lambda: TRUE)

then all pairs would be generated, i.e. (22,80) , (22,443) and (80,443). But since we don't care to correlate outgoing ports with outgoing ports we restricted the possible combinations.

To demonstrate the use of such a task you could run:

kapacitor replay-live -task corr_in_out_traffic -past 1d -rec-time

The above command would then correlate all incoming traffic data with outgoing traffic by port over 1h windows.

  • Rebased/mergable
  • Tests pass
  • CHANGELOG.md updated

@nathanielc nathanielc force-pushed the nc-issue#46 branch 3 times, most recently from ad86bac to b36d119 Compare July 1, 2016 16:59
@nathanielc nathanielc merged commit 1803d58 into master Jul 2, 2016
@nathanielc nathanielc deleted the nc-issue#46 branch July 2, 2016 14:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Be able to join dynamically on Groups
1 participant