Skip to content

Commit 7de1fb2

Browse files
authored
feat: Add AutoCloseable shortcut on mapWithResource (#1053)
* feat: Add AutoCloseable shortcut on mapWithResource * Enhance test to check resource is closed after stream is completed * Enhance comment * Update doc * Address comment * Add resume, restart, stop strategy test * Address comment * Fix doc * Fix typo
1 parent 7dd5d73 commit 7de1fb2

File tree

9 files changed

+420
-2
lines changed

9 files changed

+420
-2
lines changed

docs/src/main/paradox/stream/operators/Source-or-Flow/mapWithResource.md

+5-1
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,15 @@ Map elements with the help of a resource that can be opened, transform each elem
66

77
## Signature
88

9-
@apidoc[Flow.mapWithResource](Flow) { scala="#mapWithResource%5BS%2C%20T%5D%28create%3A%20%28%29%20%3D%3E%20S%29%28f%3A%20%28S%2C%20Out%29%20%3D%3E%20T%2C%20close%3A%20S%20%3D%3E%20Option%5BT%5D%29%3A%20Repr%5BT%5D" java="#mapWithResource(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function)" }
9+
@apidoc[Flow.mapWithResource](Flow) { scala="#mapWithResource[S,T](create:()=%3ES)(f:(S,Out)=%3ET,close:S=%3EOption[T]):Repr[T]" java="#mapWithResource(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function)" }
1010
1. `create`: Open or Create the resource.
1111
2. `f`: Transform each element inputs with the help of resource.
1212
3. `close`: Close the resource, invoked on end of stream or if the stream fails, optionally outputting a last element.
1313

14+
@apidoc[Flow.mapWithResource](Flow) { scala="#mapWithResource[S%3C:AutoCloseable,T](create:()=%3ES,f:(S,Out)=%3ET):Repr[T]" java="#mapWithResource(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2)" }
15+
1. `create`: Open or Create the autocloseable resource.
16+
2. `f`: Transform each element inputs with the help of resource.
17+
1418
## Description
1519

1620
Transform each stream element with the help of a resource.

stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java

+16
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737

3838
import java.util.*;
3939
import java.util.concurrent.atomic.AtomicBoolean;
40+
import java.util.concurrent.atomic.AtomicInteger;
4041
import java.util.function.Supplier;
4142
import java.util.concurrent.CompletableFuture;
4243
import java.util.concurrent.CompletionStage;
@@ -237,6 +238,21 @@ public void mustBeAbleToUseMapWithResource() {
237238
Assert.assertFalse(gate.get());
238239
}
239240

241+
@Test
242+
public void mustBeAbleToUseMapWithAutoCloseableResource() {
243+
final TestKit probe = new TestKit(system);
244+
final AtomicInteger closed = new AtomicInteger();
245+
Source.from(Arrays.asList("1", "2", "3"))
246+
.via(
247+
Flow.of(String.class)
248+
.mapWithResource(
249+
() -> (AutoCloseable) closed::incrementAndGet, (resource, elem) -> elem))
250+
.runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), system);
251+
252+
probe.expectMsgAllOf("1", "2", "3");
253+
Assert.assertEquals(closed.get(), 1);
254+
}
255+
240256
@Test
241257
public void mustBeAbleToUseFoldWhile() throws Exception {
242258
final int result =

stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java

+13
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.*;
4848
import java.util.concurrent.*;
4949
import java.util.concurrent.atomic.AtomicBoolean;
50+
import java.util.concurrent.atomic.AtomicInteger;
5051
import java.util.stream.Collectors;
5152
import java.util.stream.IntStream;
5253
import java.util.stream.Stream;
@@ -815,6 +816,18 @@ public void mustBeAbleToUseMapWithResource() {
815816
Assert.assertFalse(gate.get());
816817
}
817818

819+
@Test
820+
public void mustBeAbleToUseMapWithAutoCloseableResource() {
821+
final TestKit probe = new TestKit(system);
822+
final AtomicInteger closed = new AtomicInteger();
823+
Source.from(Arrays.asList("1", "2", "3"))
824+
.mapWithResource(() -> (AutoCloseable) closed::incrementAndGet, (resource, elem) -> elem)
825+
.runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), system);
826+
827+
probe.expectMsgAllOf("1", "2", "3");
828+
Assert.assertEquals(closed.get(), 1);
829+
}
830+
818831
@Test
819832
public void mustBeAbleToUseFoldWhile() throws Exception {
820833
final int result =

stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowMapWithResourceSpec.scala

+193-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import org.apache.pekko
3535
import pekko.Done
3636
import pekko.stream.{ AbruptTerminationException, ActorAttributes, ActorMaterializer, SystemMaterializer }
3737
import pekko.stream.ActorAttributes.supervisionStrategy
38-
import pekko.stream.Supervision.{ restartingDecider, resumingDecider }
38+
import pekko.stream.Supervision.{ restartingDecider, resumingDecider, stoppingDecider }
3939
import pekko.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
4040
import pekko.stream.impl.StreamSupervisor.Children
4141
import pekko.stream.testkit.{ StreamSpec, TestSubscriber }
@@ -410,6 +410,198 @@ class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) {
410410
Await.result(promise.future, 3.seconds) shouldBe Done
411411
}
412412

413+
"will close the autocloseable resource when upstream complete" in {
414+
val closedCounter = new AtomicInteger(0)
415+
val create = () =>
416+
new AutoCloseable {
417+
override def close(): Unit = closedCounter.incrementAndGet()
418+
}
419+
val (pub, sub) = TestSource
420+
.probe[Int]
421+
.mapWithResource(create, (_: AutoCloseable, count) => count)
422+
.toMat(TestSink.probe)(Keep.both)
423+
.run()
424+
sub.expectSubscription().request(2)
425+
closedCounter.get shouldBe 0
426+
pub.sendNext(1)
427+
sub.expectNext(1)
428+
closedCounter.get shouldBe 0
429+
pub.sendComplete()
430+
sub.expectComplete()
431+
closedCounter.get shouldBe 1
432+
}
433+
434+
"will close the autocloseable resource when upstream fail" in {
435+
val closedCounter = new AtomicInteger(0)
436+
val create = () =>
437+
new AutoCloseable {
438+
override def close(): Unit = closedCounter.incrementAndGet()
439+
}
440+
val (pub, sub) = TestSource
441+
.probe[Int]
442+
.mapWithResource(create, (_: AutoCloseable, count) => count)
443+
.toMat(TestSink.probe)(Keep.both)
444+
.run()
445+
sub.expectSubscription().request(2)
446+
closedCounter.get shouldBe 0
447+
pub.sendNext(1)
448+
sub.expectNext(1)
449+
closedCounter.get shouldBe 0
450+
pub.sendError(ex)
451+
sub.expectError(ex)
452+
closedCounter.get shouldBe 1
453+
}
454+
455+
"will close the autocloseable resource when downstream cancel" in {
456+
val closedCounter = new AtomicInteger(0)
457+
val create = () =>
458+
new AutoCloseable {
459+
override def close(): Unit = closedCounter.incrementAndGet()
460+
}
461+
val (pub, sub) = TestSource
462+
.probe[Int]
463+
.mapWithResource(create, (_: AutoCloseable, count) => count)
464+
.toMat(TestSink.probe)(Keep.both)
465+
.run()
466+
val subscription = sub.expectSubscription()
467+
subscription.request(2)
468+
closedCounter.get shouldBe 0
469+
pub.sendNext(1)
470+
sub.expectNext(1)
471+
closedCounter.get shouldBe 0
472+
subscription.cancel()
473+
pub.expectCancellation()
474+
closedCounter.get shouldBe 1
475+
}
476+
477+
"will close the autocloseable resource when downstream fail" in {
478+
val closedCounter = new AtomicInteger(0)
479+
val create = () =>
480+
new AutoCloseable {
481+
override def close(): Unit = closedCounter.incrementAndGet()
482+
}
483+
val (pub, sub) = TestSource
484+
.probe[Int]
485+
.mapWithResource(create, (_: AutoCloseable, count) => count)
486+
.toMat(TestSink.probe)(Keep.both)
487+
.run()
488+
sub.request(2)
489+
closedCounter.get shouldBe 0
490+
pub.sendNext(1)
491+
sub.expectNext(1)
492+
closedCounter.get shouldBe 0
493+
sub.cancel(ex)
494+
pub.expectCancellationWithCause(ex)
495+
closedCounter.get shouldBe 1
496+
}
497+
498+
"will close the autocloseable resource on abrupt materializer termination" in {
499+
val closedCounter = new AtomicInteger(0)
500+
@nowarn("msg=deprecated")
501+
val mat = ActorMaterializer()
502+
val promise = Promise[Done]()
503+
val create = () =>
504+
new AutoCloseable {
505+
override def close(): Unit = {
506+
closedCounter.incrementAndGet()
507+
promise.complete(Success(Done))
508+
}
509+
}
510+
val matVal = Source
511+
.single(1)
512+
.mapWithResource(create, (_: AutoCloseable, count) => count)
513+
.runWith(Sink.never)(mat)
514+
closedCounter.get shouldBe 0
515+
mat.shutdown()
516+
matVal.failed.futureValue shouldBe an[AbruptTerminationException]
517+
Await.result(promise.future, 3.seconds) shouldBe Done
518+
closedCounter.get shouldBe 1
519+
}
520+
521+
"continue with autoCloseable when Strategy is Resume and exception happened" in {
522+
val closedCounter = new AtomicInteger(0)
523+
val create = () =>
524+
new AutoCloseable {
525+
override def close(): Unit = closedCounter.incrementAndGet()
526+
}
527+
val p = Source
528+
.fromIterator(() => (0 to 50).iterator)
529+
.mapWithResource(create,
530+
(_: AutoCloseable, elem) => {
531+
if (elem == 10) throw TE("") else elem
532+
})
533+
.withAttributes(supervisionStrategy(resumingDecider))
534+
.runWith(Sink.asPublisher(false))
535+
val c = TestSubscriber.manualProbe[Int]()
536+
537+
p.subscribe(c)
538+
val sub = c.expectSubscription()
539+
540+
(0 to 48).foreach(i => {
541+
sub.request(1)
542+
c.expectNext() should ===(if (i < 10) i else i + 1)
543+
})
544+
sub.request(1)
545+
c.expectNext(50)
546+
c.expectComplete()
547+
closedCounter.get shouldBe 1
548+
}
549+
550+
"close and open stream with autocloseable again when Strategy is Restart" in {
551+
val closedCounter = new AtomicInteger(0)
552+
val create = () =>
553+
new AutoCloseable {
554+
override def close(): Unit = closedCounter.incrementAndGet()
555+
}
556+
val p = Source
557+
.fromIterator(() => (0 to 50).iterator)
558+
.mapWithResource(create,
559+
(_: AutoCloseable, elem) => {
560+
if (elem == 10 || elem == 20) throw TE("") else elem
561+
})
562+
.withAttributes(supervisionStrategy(restartingDecider))
563+
.runWith(Sink.asPublisher(false))
564+
val c = TestSubscriber.manualProbe[Int]()
565+
566+
p.subscribe(c)
567+
val sub = c.expectSubscription()
568+
569+
(0 to 30).filter(i => i != 10 && i != 20).foreach(i => {
570+
sub.request(1)
571+
c.expectNext() shouldBe i
572+
closedCounter.get should ===(if (i < 10) 0 else if (i < 20) 1 else 2)
573+
})
574+
sub.cancel()
575+
}
576+
577+
"stop stream with autoCloseable when Strategy is Stop and exception happened" in {
578+
val closedCounter = new AtomicInteger(0)
579+
val create = () =>
580+
new AutoCloseable {
581+
override def close(): Unit = closedCounter.incrementAndGet()
582+
}
583+
val p = Source
584+
.fromIterator(() => (0 to 50).iterator)
585+
.mapWithResource(create,
586+
(_: AutoCloseable, elem) => {
587+
if (elem == 10) throw TE("") else elem
588+
})
589+
.withAttributes(supervisionStrategy(stoppingDecider))
590+
.runWith(Sink.asPublisher(false))
591+
val c = TestSubscriber.manualProbe[Int]()
592+
593+
p.subscribe(c)
594+
val sub = c.expectSubscription()
595+
596+
(0 to 9).foreach(i => {
597+
sub.request(1)
598+
c.expectNext() shouldBe i
599+
})
600+
sub.request(1)
601+
c.expectError()
602+
closedCounter.get shouldBe 1
603+
}
604+
413605
}
414606
override def afterTermination(): Unit = {
415607
fs.close()

stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala

+39
Original file line numberDiff line numberDiff line change
@@ -828,6 +828,45 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
828828
(resource, out) => f(resource, out),
829829
resource => close.apply(resource).toScala))
830830

831+
/**
832+
* Transform each stream element with the help of an [[AutoCloseable]] resource and close it when the stream finishes or fails.
833+
*
834+
* The resource creation function is invoked once when the stream is materialized and the returned resource is passed to
835+
* the mapping function for mapping the first element. The mapping function returns a mapped element to emit
836+
* downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification.
837+
*
838+
* The [[AutoCloseable]] resource is closed only once when the upstream or downstream finishes or fails.
839+
*
840+
* Early completion can be done with combination of the [[takeWhile]] operator.
841+
*
842+
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
843+
*
844+
* You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or
845+
* set it for a given Source by using [[ActorAttributes]].
846+
*
847+
* '''Emits when''' the mapping function returns an element and downstream is ready to consume it
848+
*
849+
* '''Backpressures when''' downstream backpressures
850+
*
851+
* '''Completes when''' upstream completes
852+
*
853+
* '''Cancels when''' downstream cancels
854+
*
855+
* @tparam R the type of the resource
856+
* @tparam T the type of the output elements
857+
* @param create function that creates the resource
858+
* @param f function that transforms the upstream element and the resource to output element
859+
* @since 1.1.0
860+
*/
861+
def mapWithResource[R <: AutoCloseable, T](
862+
create: function.Creator[R],
863+
f: function.Function2[R, Out, T]): javadsl.Flow[In, T, Mat] =
864+
mapWithResource(create, f,
865+
(resource: AutoCloseable) => {
866+
resource.close()
867+
Optional.empty()
868+
})
869+
831870
/**
832871
* Transform each input element into an `Iterable` of output elements that is
833872
* then flattened into the output stream. The transformation is meant to be stateful,

stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala

+39
Original file line numberDiff line numberDiff line change
@@ -2541,6 +2541,45 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
25412541
(resource, out) => f(resource, out),
25422542
resource => close.apply(resource).toScala))
25432543

2544+
/**
2545+
* Transform each stream element with the help of an [[AutoCloseable]] resource and close it when the stream finishes or fails.
2546+
*
2547+
* The resource creation function is invoked once when the stream is materialized and the returned resource is passed to
2548+
* the mapping function for mapping the first element. The mapping function returns a mapped element to emit
2549+
* downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification.
2550+
*
2551+
* The [[AutoCloseable]] resource is closed only once when the upstream or downstream finishes or fails.
2552+
*
2553+
* Early completion can be done with combination of the [[takeWhile]] operator.
2554+
*
2555+
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
2556+
*
2557+
* You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or
2558+
* set it for a given Source by using [[ActorAttributes]].
2559+
*
2560+
* '''Emits when''' the mapping function returns an element and downstream is ready to consume it
2561+
*
2562+
* '''Backpressures when''' downstream backpressures
2563+
*
2564+
* '''Completes when''' upstream completes
2565+
*
2566+
* '''Cancels when''' downstream cancels
2567+
*
2568+
* @tparam R the type of the resource
2569+
* @tparam T the type of the output elements
2570+
* @param create function that creates the resource
2571+
* @param f function that transforms the upstream element and the resource to output element
2572+
* @since 1.1.0
2573+
*/
2574+
def mapWithResource[R <: AutoCloseable, T](
2575+
create: function.Creator[R],
2576+
f: function.Function2[R, Out, T]): javadsl.Source[T, Mat] =
2577+
mapWithResource(create, f,
2578+
(resource: AutoCloseable) => {
2579+
resource.close()
2580+
Optional.empty()
2581+
})
2582+
25442583
/**
25452584
* Transform each input element into an `Iterable` of output elements that is
25462585
* then flattened into the output stream. The transformation is meant to be stateful,

0 commit comments

Comments
 (0)