Skip to content

Commit

Permalink
[Part 9]: Introducing Sink::close (#159)
Browse files Browse the repository at this point in the history
* [Part 9]: Introducing Sink::close

An explicit close is required because in the new model (Titus), we want the ability to stop tasks gracefully. This means shutting down all the resources that the job is currently consuming. In the old world (Apache Mesos), this was not required because we were forcefully shutting down processes which are both not clean as well as not feasible in the new world.

Co-authored-by: Sundaram Ananthanarayanan <[email protected]>
  • Loading branch information
sundargates and sundargates authored Apr 4, 2022
1 parent 30f6e87 commit 3a3b5dd
Show file tree
Hide file tree
Showing 13 changed files with 224 additions and 374 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

Expand All @@ -42,6 +43,7 @@ public class KafkaSink<T> implements SelfDocumentingSink<T> {
private final Func1<T, byte[]> encoder;
private final Registry registry;
private final AtomicReference<KafkaProducer<byte[], byte[]>> kafkaProducerAtomicRef = new AtomicReference<>(null);
private Subscription subscription;

KafkaSink(Registry registry, Func1<T, byte[]> encoder) {
this.encoder = encoder;
Expand All @@ -61,7 +63,7 @@ public void call(Context context, PortRequest ignore, Observable<T> dataO) {
Parameters parameters = context.getParameters();
String topic = (String)parameters.get(KafkaSinkJobParameters.TOPIC);

dataO.map(encoder::call)
subscription = dataO.map(encoder::call)
.flatMap((dataBytes) ->
Observable.from(kafkaProducer.send(new ProducerRecord<>(topic, dataBytes)))
.subscribeOn(Schedulers.io()))
Expand Down Expand Up @@ -91,4 +93,9 @@ public Metadata metadata() {
.description(description.toString())
.build();
}

@Override
public void close() {
subscription.unsubscribe();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,17 @@
import io.mantisrx.runtime.sink.ServerSentEventsSink;
import io.mantisrx.runtime.sink.Sink;
import io.mantisrx.runtime.sink.predicate.Predicate;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import rx.Observable;
import rx.Subscription;
import rx.functions.Func2;


public class SourceSink implements Sink<String> {

private final String clientId;

private Func2<Map<String, List<String>>, Context, Void> preProcessor = new NoOpProcessor();
private Func2<Map<String, List<String>>, Context, Void> postProcessor = new NoOpProcessor();
private final ServerSentEventsSink<String> sink;
private Subscription subscription;

static class NoOpProcessor implements Func2<Map<String, List<String>>, Context, Void> {

Expand All @@ -46,25 +45,30 @@ public Void call(Map<String, List<String>> t1, Context t2) {

public SourceSink(Func2<Map<String, List<String>>, Context, Void> preProcessor,
Func2<Map<String, List<String>>, Context, Void> postProcessor, String mantisClientId) {
this.postProcessor = postProcessor;
this.preProcessor = preProcessor;
this.clientId = mantisClientId;
this.sink = new ServerSentEventsSink.Builder<String>()
.withEncoder(data -> data)
.withPredicate(new Predicate<>("description", new EventFilter(mantisClientId)))
.withRequestPreprocessor(preProcessor)
.withRequestPostprocessor(postProcessor)
.build();
}

@Override
public void call(Context context, PortRequest portRequest,
Observable<String> observable) {
observable = observable.filter(t1 -> !t1.isEmpty());

ServerSentEventsSink<String> sink = new ServerSentEventsSink.Builder<String>()
.withEncoder(data -> data)
.withPredicate(new Predicate<>("description", new EventFilter(clientId)))
.withRequestPreprocessor(preProcessor)
.withRequestPostprocessor(postProcessor)
.build();

observable.subscribe();
subscription = observable.subscribe();

sink.call(context, portRequest, observable);
}

@Override
public void close() throws IOException {
try {
sink.close();
} finally {
subscription.unsubscribe();
}
}
}
Loading

0 comments on commit 3a3b5dd

Please sign in to comment.