Add kapacitor loopback #1360

Merged
merged 1 commit into master from feature/kapacitor-loopback on May 8, 2017

Conversation

nathanielc (Contributor) commented May 4, 2017

Adds the ability to natively emit data back into Kapacitor from a task.

Does this break the order guarantee? No, no more than you could already do with InfluxDBOut. We can always gate loopback writes through a reorder step if that turns out to be hard to manage in practice.

Does this allow the creation of infinite loops? Yes, but not within the same task. To create an infinite loop you would need at least two tasks, each with a loopback pointing at the other. A warning has been added to the docs, but this should be rare.

  • Rebased/mergeable
  • Tests pass
  • CHANGELOG.md updated
  • Add tests for batch/stream

@nathanielc nathanielc requested a review from desa May 4, 2017 21:47
@nathanielc nathanielc force-pushed the feature/kapacitor-loopback branch from 13b4030 to 487b21d on May 4, 2017 21:51
desa (Contributor) commented May 5, 2017

So I got a bit carried away and started playing with recursion in the loopback.

I made a TICKscript that iteratively computes a square root (Newton's method):

var tolerance = 0.0001

var sq = stream
    |from()
        .measurement('m')
    |eval(lambda: 0.5 * ("x" + ("s" / "x")))
        .as('xn')
        .keep()

sq
    |where(lambda: abs("x" - "xn") < tolerance)
    |log()

sq
    |where(lambda: abs("x" - "xn") > tolerance)
    |eval(lambda: "xn")
        .as('x')
        .keep('s', 'x')
    |kapacitorLoopback()
        .database('mydb2')
        .retentionPolicy('autogen')

together with its mutually recursive counterpart (the same script, looping back into the other database):

var tolerance = 0.0001

var sq = stream
    |from()
        .measurement('m')
    |eval(lambda: 0.5 * ("x" + ("s" / "x")))
        .as('xn')
        .keep()

sq
    |where(lambda: abs("x" - "xn") < tolerance)
    |log()

sq
    |where(lambda: abs("x" - "xn") > tolerance)
    |eval(lambda: "xn")
        .as('x')
        .keep('s', 'x')
    |kapacitorLoopback()
        .database('mydb')
        .retentionPolicy('autogen')

and the data

m s=5,x=1

where s is the value whose square root you want to compute and x is the initial guess.

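For reference, here is a quick Go sketch (not part of the PR, just the arithmetic) of what the two tasks compute between them; each pass through the loop corresponds to one point written back through kapacitorLoopback:

package main

import (
	"fmt"
	"math"
)

func main() {
	const tolerance = 0.0001
	s, x := 5.0, 1.0 // matches the point: m s=5,x=1

	for {
		xn := 0.5 * (x + s/x) // the |eval() expression
		if math.Abs(x-xn) < tolerance {
			fmt.Printf("sqrt(%.0f) ≈ %.5f\n", s, xn) // the |log() branch fires
			return
		}
		// otherwise the point is written back with x = xn and the cycle repeats
		x = xn
	}
}
// Output: sqrt(5) ≈ 2.23607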

desa (Contributor) left a comment

LGTM! I'm excited about the types of things this opens up.

@@ -628,7 +640,9 @@ func (tm *TaskMaster) forkPoint(p models.Point) {
 }

 func (tm *TaskMaster) WritePoints(database, retentionPolicy string, consistencyLevel imodels.ConsistencyLevel, points []imodels.Point) error {
-	if tm.closed {
desa (Contributor)

Why didn't this need a read lock previously?

nathanielc (Contributor, Author)

It did, but somehow it wasn't caught by the race detector.

-	if tm.closed {
+	tm.writesMu.RLock()
+	defer tm.writesMu.RUnlock()
+	if tm.writesClosed {
desa (Contributor)

Why was writesClosed introduced?

nathanielc (Contributor, Author)

To safely close the writePointsIn edge, we needed to ensure that no more writes were in flight. That meant checking an RWMutex and performing a multistage close.

The close process looks something like this:

  1. Stop all writes
  2. Close writePointsIn edge.
  3. Wait for all forks to finish
  4. Delete forks
  5. Stop tasks
  6. Mark task master as closed.

This should have been the process all along, but previously, when the HTTP API was the only way to write points, it didn't matter. With the new loopback write path, this explicit close process is needed to prevent panics from writing to a closed edge.

In other words, the HTTPD service was always closed before the TaskMaster, so the fact that the close operation wasn't safe didn't matter: writes could not come in during the close. The new loopback write path changes that.

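Roughly, the write/close interaction looks like this (a minimal sketch, not the actual TaskMaster code; writesMu, writesClosed, and writePointsIn are the names from the diff above, everything else is simplified):

package taskmaster

import (
	"errors"
	"sync"
)

type point struct{ db, rp string }

// taskMaster is a stripped-down stand-in for the real TaskMaster.
type taskMaster struct {
	writesMu      sync.RWMutex
	writesClosed  bool
	writePointsIn chan point // the edge that feeds written points to tasks
}

// WritePoints is called by both the HTTP write path and the loopback node.
func (tm *taskMaster) WritePoints(db, rp string, p point) error {
	tm.writesMu.RLock()
	defer tm.writesMu.RUnlock()
	if tm.writesClosed {
		return errors.New("task master closed")
	}
	// Safe to send: Close cannot close the edge until it has set writesClosed
	// under the write lock, and taking the write lock waits out this RLock.
	tm.writePointsIn <- p
	return nil
}

// Close performs steps 1 and 2 of the multistage close; steps 3-6
// (wait for forks, delete forks, stop tasks, mark closed) would follow.
func (tm *taskMaster) Close() {
	// 1. Stop all writes.
	tm.writesMu.Lock()
	tm.writesClosed = true
	tm.writesMu.Unlock()
	// 2. Close the writePointsIn edge. Any writer still holding the RLock has
	//    already been waited out, and new writers see writesClosed == true.
	close(tm.writePointsIn)
	// 3-6. ...
}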

desa (Contributor)

That makes sense.

@nathanielc nathanielc changed the title from "WIP Add kapacitor loopback" to "Add kapacitor loopback" on May 8, 2017
@nathanielc nathanielc merged commit 487b21d into master May 8, 2017
nathanielc added a commit that referenced this pull request May 8, 2017
@nathanielc nathanielc deleted the feature/kapacitor-loopback branch May 8, 2017 19:04