Skip to content

Commit cfa40bd

Browse files
committedAug 5, 2020
i2i transformations exercises
1 parent 2b57396 commit cfa40bd

File tree

1 file changed

+71
-0
lines changed

1 file changed

+71
-0
lines changed
 

‎src/main/scala/part5rddtransformations/I2ITransformations.scala

+71
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,77 @@ object I2ITransformations {
7171
result.foreach(println)
7272
}
7373

74+
/**
75+
* Exercises
76+
*/
77+
78+
def printTopMetricsEx1() = {
79+
/*
80+
Better than the "dummy" approach
81+
- not sorting the entire RDD
82+
83+
Bad (worse than the optimal)
84+
- sorting the entire partition
85+
- forcing the iterator in memory - this can OOM your executors
86+
*/
87+
val topMetrics = readMetrics()
88+
.mapPartitions(_.toList.sortBy(_._2).take(LIMIT).toIterator)
89+
.repartition(1)
90+
.mapPartitions(_.toList.sortBy(_._2).take(LIMIT).toIterator)
91+
.take(LIMIT)
92+
93+
topMetrics.foreach(println)
94+
}
95+
96+
/*
97+
Better than ex1
98+
- extracting top 10 values per partition instead of sorting the entire partition
99+
100+
Bad because
101+
- forcing toList can OOM your executors
102+
- iterating over the list twice
103+
- if the list is immutable, time spent allocating objects (and GC)
104+
*/
105+
def printTopMetricsEx2() = {
106+
val topMetrics = readMetrics()
107+
.mapPartitions { records =>
108+
109+
implicit val ordering: Ordering[(String, Double)] = Ordering.by[(String, Double), Double](_._2)
110+
val limitedCollection = new mutable.TreeSet[(String, Double)]()
111+
112+
records.toList.foreach { record =>
113+
limitedCollection.add(record)
114+
if (limitedCollection.size > LIMIT) {
115+
limitedCollection.remove(limitedCollection.last)
116+
}
117+
}
118+
119+
// I've traversed the iterator
120+
121+
limitedCollection.toIterator
122+
}
123+
.repartition(1)
124+
.mapPartitions { records =>
125+
126+
implicit val ordering: Ordering[(String, Double)] = Ordering.by[(String, Double), Double](_._2)
127+
val limitedCollection = new mutable.TreeSet[(String, Double)]()
128+
129+
records.toList.foreach { record =>
130+
limitedCollection.add(record)
131+
if (limitedCollection.size > LIMIT) {
132+
limitedCollection.remove(limitedCollection.last)
133+
}
134+
}
135+
136+
// I've traversed the iterator
137+
138+
limitedCollection.toIterator
139+
}
140+
.take(LIMIT)
141+
142+
topMetrics.foreach(println)
143+
}
144+
74145
def main(args: Array[String]): Unit = {
75146
printTopMetrics()
76147
printTopMetricsI2I()

0 commit comments

Comments
 (0)
Please sign in to comment.