Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add AutoCloseable shortcut on mapWithResource #1053

Merged
merged 9 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ Map elements with the help of a resource that can be opened, transform each elem
2. `f`: Transform each element inputs with the help of resource.
3. `close`: Close the resource, invoked on end of stream or if the stream fails, optionally outputting a last element.

@apidoc[Flow.mapWithResource](Flow) { scala="#mapWithResource%5BS%20%3C%3A%20AutoCloseable%2C%20T%5D%28create%3A%20%28%29%20%3D%3E%20S%2C%20f%3A%20%28S%2C%20Out%29%20%3D%3E%20T%29%3A%20Repr%5BT%5D" java="#mapWithResource(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2)" }
1. `create`: Open or Create the autocloseable resource.
2. `f`: Transform each element inputs with the help of resource.

## Description

Transform each stream element with the help of a resource.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -237,6 +238,21 @@ public void mustBeAbleToUseMapWithResource() {
Assert.assertFalse(gate.get());
}

@Test
public void mustBeAbleToUseMapWithAutoCloseableResource() {
final TestKit probe = new TestKit(system);
final AtomicInteger closed = new AtomicInteger();
Source.from(Arrays.asList("1", "2", "3"))
.via(
Flow.of(String.class)
.mapWithResource(
() -> (AutoCloseable) closed::incrementAndGet, (resource, elem) -> elem))
.runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), system);

probe.expectMsgAllOf("1", "2", "3");
Assert.assertEquals(closed.get(), 1);
}

@Test
public void mustBeAbleToUseFoldWhile() throws Exception {
final int result =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand Down Expand Up @@ -815,6 +816,18 @@ public void mustBeAbleToUseMapWithResource() {
Assert.assertFalse(gate.get());
}

@Test
public void mustBeAbleToUseMapWithAutoCloseableResource() {
final TestKit probe = new TestKit(system);
final AtomicInteger closed = new AtomicInteger();
Source.from(Arrays.asList("1", "2", "3"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want add something like Flux.elements for java dsl...

.mapWithResource(() -> (AutoCloseable) closed::incrementAndGet, (resource, elem) -> elem)
.runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), system);

probe.expectMsgAllOf("1", "2", "3");
Assert.assertEquals(closed.get(), 1);
}

@Test
public void mustBeAbleToUseFoldWhile() throws Exception {
final int result =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.pekko
import pekko.Done
import pekko.stream.{ AbruptTerminationException, ActorAttributes, ActorMaterializer, SystemMaterializer }
import pekko.stream.ActorAttributes.supervisionStrategy
import pekko.stream.Supervision.{ restartingDecider, resumingDecider }
import pekko.stream.Supervision.{ restartingDecider, resumingDecider, stoppingDecider }
import pekko.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
import pekko.stream.impl.StreamSupervisor.Children
import pekko.stream.testkit.{ StreamSpec, TestSubscriber }
Expand Down Expand Up @@ -410,6 +410,226 @@ class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) {
Await.result(promise.future, 3.seconds) shouldBe Done
}

"will close the autocloseable resource when upstream complete" in {
val closedCounter = new AtomicInteger(0)
val create = () =>
new AutoCloseable {
override def close(): Unit = closedCounter.incrementAndGet()
}
val (pub, sub) = TestSource
.probe[Int]
.mapWithResource(create, (_: AutoCloseable, count) => count)
.toMat(TestSink.probe)(Keep.both)
.run()
sub.expectSubscription().request(2)
closedCounter.get shouldBe 0
pub.sendNext(1)
sub.expectNext(1)
closedCounter.get shouldBe 0
pub.sendComplete()
sub.expectComplete()
closedCounter.get shouldBe 1
}

"will close the autocloseable resource when upstream fail" in {
val closedCounter = new AtomicInteger(0)
val create = () =>
new AutoCloseable {
override def close(): Unit = closedCounter.incrementAndGet()
}
val (pub, sub) = TestSource
.probe[Int]
.mapWithResource(create, (_: AutoCloseable, count) => count)
.toMat(TestSink.probe)(Keep.both)
.run()
sub.expectSubscription().request(2)
closedCounter.get shouldBe 0
pub.sendNext(1)
sub.expectNext(1)
closedCounter.get shouldBe 0
pub.sendError(ex)
sub.expectError(ex)
closedCounter.get shouldBe 1
}

"will close the autocloseable resource when downstream cancel" in {
val closedCounter = new AtomicInteger(0)
val create = () =>
new AutoCloseable {
override def close(): Unit = closedCounter.incrementAndGet()
}
val (pub, sub) = TestSource
.probe[Int]
.mapWithResource(create, (_: AutoCloseable, count) => count)
.toMat(TestSink.probe)(Keep.both)
.run()
val subscription = sub.expectSubscription()
subscription.request(2)
closedCounter.get shouldBe 0
pub.sendNext(1)
sub.expectNext(1)
closedCounter.get shouldBe 0
subscription.cancel()
pub.expectCancellation()
closedCounter.get shouldBe 1
}

"will close the autocloseable resource when downstream fail" in {
val closedCounter = new AtomicInteger(0)
val create = () =>
new AutoCloseable {
override def close(): Unit = closedCounter.incrementAndGet()
}
val (pub, sub) = TestSource
.probe[Int]
.mapWithResource(create, (_: AutoCloseable, count) => count)
.toMat(TestSink.probe)(Keep.both)
.run()
sub.request(2)
closedCounter.get shouldBe 0
pub.sendNext(1)
sub.expectNext(1)
closedCounter.get shouldBe 0
sub.cancel(ex)
pub.expectCancellationWithCause(ex)
closedCounter.get shouldBe 1
}

"will close the autocloseable resource on abrupt materializer termination" in {
val closedCounter = new AtomicInteger(0)
@nowarn("msg=deprecated")
val mat = ActorMaterializer()
val promise = Promise[Done]()
val create = () =>
new AutoCloseable {
override def close(): Unit = {
closedCounter.incrementAndGet()
promise.complete(Success(Done))
}
}
val matVal = Source
.single(1)
.mapWithResource(create, (_: AutoCloseable, count) => count)
.runWith(Sink.never)(mat)
closedCounter.get shouldBe 0
mat.shutdown()
matVal.failed.futureValue shouldBe an[AbruptTerminationException]
Await.result(promise.future, 3.seconds) shouldBe Done
closedCounter.get shouldBe 1
}

"continue with autoCloseable when Strategy is Resume and exception happened" in {
val closedCounter = new AtomicInteger(0)
val create = () =>
new AutoCloseable {
override def close(): Unit = closedCounter.incrementAndGet()
}
val p = Source
.fromIterator(() => (0 to 50).iterator)
.mapWithResource(create,
(_: AutoCloseable, elem) => {
if (elem == 10) throw TE("") else elem
})
.withAttributes(supervisionStrategy(resumingDecider))
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[Int]()

p.subscribe(c)
val sub = c.expectSubscription()

(0 to 48).foreach(i => {
sub.request(1)
c.expectNext() should ===(if (i < 10) i else i + 1)
})
sub.request(1)
c.expectNext(50)
c.expectComplete()
closedCounter.get shouldBe 1
}

"continue with autoCloseable when Strategy is Resume and exception happened on map" in {
val closedCounter = new AtomicInteger(0)
val create = () =>
new AutoCloseable {
override def close(): Unit = closedCounter.incrementAndGet()
}
val p = Source
.fromIterator(() => (0 to 50).iterator)
.mapWithResource(create, (_: AutoCloseable, elem) => elem)
.map(elem => {
if (elem == 10) throw TE("") else elem
})
.withAttributes(supervisionStrategy(resumingDecider))
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[Int]()

p.subscribe(c)
val sub = c.expectSubscription()

(0 to 48).foreach(i => {
sub.request(1)
c.expectNext() should ===(if (i < 10) i else i + 1)
})
sub.request(1)
c.expectNext(50)
c.expectComplete()
closedCounter.get shouldBe 1
}

"close and open stream with autocloseable again when Strategy is Restart" in {
val closedCounter = new AtomicInteger(0)
val create = () =>
new AutoCloseable {
override def close(): Unit = closedCounter.incrementAndGet()
}
val p = Source
.fromIterator(() => (0 to 50).iterator)
.mapWithResource(create,
(_: AutoCloseable, elem) => {
if (elem == 10) throw TE("") else elem
})
.withAttributes(supervisionStrategy(restartingDecider))
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[Int]()

p.subscribe(c)
val sub = c.expectSubscription()

(0 to 18).foreach(i => {
sub.request(1)
c.expectNext() should ===(if (i < 10) i else i + 1)
})
sub.cancel()
}

"stop stream with autoCloseable when Strategy is Stop and exception happened" in {
val closedCounter = new AtomicInteger(0)
val create = () =>
new AutoCloseable {
override def close(): Unit = closedCounter.incrementAndGet()
}
val p = Source
.fromIterator(() => (0 to 50).iterator)
.mapWithResource(create,
(_: AutoCloseable, elem) => {
if (elem == 10) throw TE("") else elem
})
.withAttributes(supervisionStrategy(stoppingDecider))
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[Int]()

p.subscribe(c)
val sub = c.expectSubscription()

(0 to 9).foreach(i => {
sub.request(1)
c.expectNext() shouldBe i
})
sub.request(1)
c.expectError()
closedCounter.get shouldBe 1
}

}
override def afterTermination(): Unit = {
fs.close()
Expand Down
39 changes: 39 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,45 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
(resource, out) => f(resource, out),
resource => close.apply(resource).toScala))

/**
* Transform each stream element with the help of a [[AutoCloseable]] resource and close it when the stream finishes or fails.
*
* The resource creation function is invoked once when the stream is materialized and the returned resource is passed to
* the mapping function for mapping the first element. The mapping function returns a mapped element to emit
* downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification.
*
* The [[AutoCloseable]] resource is closed only once when the upstream or downstream finishes or fails.
*
* Early completion can be done with combination of the [[takeWhile]] operator.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* '''Emits when''' the mapping function returns an element and downstream is ready to consume it
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @tparam R the type of the resource
* @tparam T the type of the output elements
* @param create function that creates the resource
* @param f function that transforms the upstream element and the resource to output element
* @since 1.1.0
*/
def mapWithResource[R <: AutoCloseable, T](
create: function.Creator[R],
f: function.Function2[R, Out, T]): javadsl.Flow[In, T, Mat] =
mapWithResource(create, f,
(resource: AutoCloseable) => {
resource.close()
Optional.empty()
})

/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,
Expand Down
39 changes: 39 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2541,6 +2541,45 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
(resource, out) => f(resource, out),
resource => close.apply(resource).toScala))

/**
* Transform each stream element with the help of a [[AutoCloseable]] resource and close it when the stream finishes or fails.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

an

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ updated. oh thanks for catching!

*
* The resource creation function is invoked once when the stream is materialized and the returned resource is passed to
* the mapping function for mapping the first element. The mapping function returns a mapped element to emit
* downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification.
*
* The [[AutoCloseable]] resource is closed only once when the upstream or downstream finishes or fails.
*
* Early completion can be done with combination of the [[takeWhile]] operator.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* '''Emits when''' the mapping function returns an element and downstream is ready to consume it
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @tparam R the type of the resource
* @tparam T the type of the output elements
* @param create function that creates the resource
* @param f function that transforms the upstream element and the resource to output element
* @since 1.1.0
*/
def mapWithResource[R <: AutoCloseable, T](
create: function.Creator[R],
f: function.Function2[R, Out, T]): javadsl.Source[T, Mat] =
mapWithResource(create, f,
(resource: AutoCloseable) => {
resource.close()
Optional.empty()
})

/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,
Expand Down
Loading
Loading