Add kapacitor loopback #1360
Conversation
Branch updated from 13b4030 to 487b21d.
So I got a bit carried away and started playing with recursion in the loopback: I made a TICKscript that iteratively computes the square root using mutually recursive tasks and the data flowing between them.
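For reference, the standard iterative square-root scheme this presumably uses is the Babylonian/Newton update x_{n+1} = (x_n + S / x_n) / 2, where S is the value whose square root is being computed; each pass through the loopback refines the estimate.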
LGTM! I'm excited about the types of things this opens up.
@@ -628,7 +640,9 @@ func (tm *TaskMaster) forkPoint(p models.Point) {
}

func (tm *TaskMaster) WritePoints(database, retentionPolicy string, consistencyLevel imodels.ConsistencyLevel, points []imodels.Point) error {
	if tm.closed {
Why didn't this need a read lock previously?
It did, but wasn't caught by the race detector somehow.
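For context, Go's race detector is dynamic: it only reports a race it actually observes at runtime, so an unguarded read can go unreported until a test happens to overlap it with the conflicting write. A minimal stand-alone sketch of the pattern (illustrative only, not Kapacitor code):

```go
// Run with:  go run -race main.go
// The detector reports the race only when the read and write actually overlap.
package main

import "time"

type master struct {
	closed bool
}

func (m *master) writePoints() bool {
	return !m.closed // unsynchronized read of closed
}

func (m *master) close() {
	m.closed = true // unsynchronized write of closed
}

func main() {
	m := &master{}
	go m.close()
	_ = m.writePoints() // flagged by -race only if it overlaps with close()
	time.Sleep(10 * time.Millisecond)
}
```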
-	if tm.closed {
+	tm.writesMu.RLock()
+	defer tm.writesMu.RUnlock()
+	if tm.writesClosed {
Why was writesClosed introduced?
To safely close the writePointsIn edge we need to ensure that no more writes are in flight. That means checking an RWMutex and performing a multi-stage close.
The close process looks something like this:
- Stop all writes.
- Close the writePointsIn edge.
- Wait for all forks to finish.
- Delete the forks.
- Stop tasks.
- Mark the task master as closed.
This should have been the process all along, but previously, with only the HTTP method of writing points, it didn't matter. Now, with the new loopback way of writing points, this explicit close process is needed to prevent panics from writing to the closed edge.
In other words, the HTTPD service was always closed before the TaskMaster, so the fact that the Close operation wasn't safe didn't matter: writes could not come in during the close operation. The new loopback method for writes changed that.
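A rough Go sketch of that ordering, using stand-in names (writesMu, writesClosed, writePointsIn, and forks mirror the discussion above; everything else is illustrative, not the actual TaskMaster implementation):

```go
package main

import (
	"errors"
	"sync"
)

var ErrClosed = errors.New("task master closed")

// taskMaster is a stand-in reduced to the pieces discussed above:
// a guarded write path and a multi-stage Close.
type taskMaster struct {
	writesMu     sync.RWMutex
	writesClosed bool

	writePointsIn chan string    // stand-in for the writePointsIn edge
	forks         sync.WaitGroup // consumers draining the edge
}

// WritePoints holds the read lock for the duration of the write, so Close
// cannot flip writesClosed or close the edge while a write is in flight.
func (tm *taskMaster) WritePoints(p string) error {
	tm.writesMu.RLock()
	defer tm.writesMu.RUnlock()
	if tm.writesClosed {
		return ErrClosed
	}
	tm.writePointsIn <- p
	return nil
}

// Close follows the ordering from the list above.
func (tm *taskMaster) Close() error {
	// 1. Stop all writes: once the write lock is held, no WritePoints call
	//    is still in flight and none can start before writesClosed is set.
	tm.writesMu.Lock()
	tm.writesClosed = true
	tm.writesMu.Unlock()

	// 2. Close the writePointsIn edge; safe because no writer can reach it now.
	close(tm.writePointsIn)

	// 3. Wait for all forks to finish draining the edge.
	tm.forks.Wait()

	// 4./5./6. Delete forks, stop tasks, mark the task master as closed
	//    (elided in this sketch).
	return nil
}

func main() {
	tm := &taskMaster{writePointsIn: make(chan string)}

	// A single "fork" draining the edge until it is closed.
	tm.forks.Add(1)
	go func() {
		defer tm.forks.Done()
		for range tm.writePointsIn {
			// process point
		}
	}()

	_ = tm.WritePoints("cpu,host=a value=1")
	_ = tm.Close()
	if err := tm.WritePoints("cpu,host=a value=2"); err != nil {
		// err == ErrClosed: writes after Close are rejected instead of panicking.
	}
}
```

The key point is that Close takes the write lock before closing the edge, so by the time the edge is closed no WritePoints call can still be holding the read lock or sending on it.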
That makes sense
Adds the ability to natively emit data back into Kapacitor from a task
Does this break the order guarantee? No, no more than you could already have done using InfluxDBOut.
We can always gate loopback writes through a reorder step if it becomes hard to manage in practice.
Does this allow the creation of infinite loops? Yes, but not within the same task. To create an infinite loop you would need at least two tasks, each with a loopback pointing at the other. A warning has been added to the docs, but this should be rare.