APACHE SPARK EXAMPLES OF TRANSFORMATIONS
MAP(FUNC)
What does it do? Pass each element of the RDD through the supplied function; i.e. func
scala> val rows = babyNames.map(line => line.split(","))
rows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[360] at map at <console>:14
What did this example do? It iterates over every line in the babyNames RDD (originally created from the baby_names.csv file) and splits each line on commas, producing a new RDD of Arrays of Strings. Each array holds the comma-separated values from one line of the source CSV.
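To keep going with map, another pass can pluck a single column out of each of those arrays. This is just a quick sketch; it assumes the column layout used later in this post (the name sitting in column 1 of baby_names.csv).

// assumes the layout used later in this post: the name is in column 1
val firstNames = rows.map(row => row(1))   // RDD[String] containing only the name column
firstNames.take(5).foreach(println)        // peek at a few values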
FLATMAP(FUNC)
“Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).”
Compare flatMap to map in the following
scala> sc.parallelize(List(1,2,3)).flatMap(x=>List(x,x,x)).collect
res200: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)

scala> sc.parallelize(List(1,2,3)).map(x=>List(x,x,x)).collect
res201: Array[List[Int]] = Array(List(1, 1, 1), List(2, 2, 2), List(3, 3, 3))
flatMap is helpful with nested datasets. It may be beneficial to think of the RDD source as hierarchical JSON (which may have been converted to case classes or nested collections). This is unlike CSV, which has no hierarchical structure.
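To make that nested-data point concrete, here is a small sketch using a made-up Order case class (the class and its fields are purely hypothetical); flatMap un-nests the collection-valued field into one element per item.

// hypothetical nested structure: one order holds many line items
case class Order(id: Int, items: List[String])

val orders = sc.parallelize(List(
  Order(1, List("apple", "pear")),
  Order(2, List("banana"))
))

// flatMap un-nests the items field, producing one element per line item
val allItems = orders.flatMap(order => order.items)   // RDD[String]
allItems.collect                                      // Array(apple, pear, banana)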
By the way, these examples may blur the line between Scala and Spark for you. These examples highlight Scala and not necessarily Spark. In a sense, the only Spark-specific portion of this code example is the use of parallelize from a SparkContext. When calling parallelize, the elements of the collection are copied to form a distributed dataset that can be operated on in parallel. Being able to operate in parallel is a Spark feature.

Adding collect to both the flatMap and map results was shown for clarity. We can focus on the Spark aspect (re: the RDD return type) of the example if we don't use collect, as seen in the following:
scala> sc.parallelize(List(1,2,3)).flatMap(x=>List(x,x,x))
res202: org.apache.spark.rdd.RDD[Int] = FlatMappedRDD[373] at flatMap at <console>:13

scala> sc.parallelize(List(1,2,3)).map(x=>List(x,x,x))
res203: org.apache.spark.rdd.RDD[List[Int]] = MappedRDD[375] at map at <console>:13
Formal API sans implicit: flatMap[U](f: (T) ⇒ TraversableOnce[U]): RDD[U]
FILTER(FUNC)
Filter creates a new RDD by passing in the supplied func used to filter the results. For those people with a relational database background, or coming from a SQL perspective, it may be helpful to think of filter as the where clause in a SQL statement.
Spark filter examples
val file = sc.textFile("catalina.out")
val errors = file.filter(line => line.contains("ERROR"))
Formal API: filter(f: (T) ⇒ Boolean): RDD[T]
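Filters also chain naturally, much like stacking conditions in a where clause. The following sketch assumes the rows RDD from the map example above and the column layout used later in this post (county in column 2, with a header row containing the word "Count").

// assumes rows = babyNames.map(line => line.split(",")) from the map example
val dataRows = rows.filter(row => !row.contains("Count"))     // drop the CSV header row
val kingsCounty = dataRows.filter(row => row(2) == "KINGS")   // keep only KINGS county rows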
MAPPARTITIONS(FUNC)
Consider mapPartitions a tool for performance optimization if you have the horsepower. It won’t do much for you when running examples on your local machine compared to running across a cluster. It’s the same as map, but works with Spark RDD partitions. Remember, the first D in RDD is “Distributed” – Resilient Distributed Datasets. Or, put another way, you could say it is distributed over partitions.
// from laptop
scala> val parallel = sc.parallelize(1 to 9, 3)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[450] at parallelize at <console>:12

scala> parallel.mapPartitions( x => List(x.next).iterator).collect
res383: Array[Int] = Array(1, 4, 7)

// compare to the same, but with default parallelize
scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[452] at parallelize at <console>:12

scala> parallel.mapPartitions( x => List(x.next).iterator).collect
res384: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8)
API: mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
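The usual reason to reach for mapPartitions is doing per-partition work once instead of per element. Here is a hedged sketch where a cheap placeholder stands in for any expensive setup (a parser, a database connection, and so on):

val parallel = sc.parallelize(1 to 9, 3)

val formatted = parallel.mapPartitions { nums =>
  val prefix = "value-"         // imagine this setup being expensive; it runs once per partition
  nums.map(n => prefix + n)     // and is reused for every element in that partition
}

formatted.collect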
MAPPARTITIONSWITHINDEX(FUNC)
Similar to mapPartitions, but it also provides the function with an Int value indicating the index of the partition.
scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[455] at parallelize at <console>:12

scala> parallel.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => it.toList.map(x => index + ", "+x).iterator).collect
res389: Array[String] = Array(0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 7, 9)
When learning these APIs on an individual laptop or desktop, it might be helpful to show differences in capabilities and outputs. For example, if we change the above example to use a parallelize’d list with 3 slices, our output changes significantly:
scala> val parallel = sc.parallelize(1 to 9, 3)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[457] at parallelize at <console>:12

scala> parallel.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => it.toList.map(x => index + ", "+x).iterator).collect
res390: Array[String] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2, 8, 2, 9)
Formal API signature (implicits stripped) and definition from Spark Scala API docs:
mapPartitionsWithIndex[U](f: (Int, Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
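One practical use of the partition index is dropping a file header, which only ever lives in the first partition. This is just a sketch of that idea; the later examples in this post filter the header out by matching on the word "Count" instead.

// drop the first line of partition 0 only; every other partition passes through untouched
val noHeader = babyNames.mapPartitionsWithIndex { (index, lines) =>
  if (index == 0) lines.drop(1) else lines
}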
SAMPLE(WITHREPLACEMENT, FRACTION, SEED)
Return a random sample subset RDD of the input RDD
scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[470] at parallelize at <console>:12

scala> parallel.sample(true,.2).count
res403: Long = 3

scala> parallel.sample(true,.2).count
res404: Long = 2

scala> parallel.sample(true,.1)
res405: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[473] at sample at <console>:15
Formal API: sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
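The seed argument in that signature is what makes a sample repeatable: pass the same fraction and seed and the same elements come back. A small sketch:

val parallel = sc.parallelize(1 to 9)

// same fraction and same seed => the same sample on every run
val firstDraw  = parallel.sample(false, 0.5, 42L).collect
val secondDraw = parallel.sample(false, 0.5, 42L).collect
// firstDraw and secondDraw hold identical elements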
THE NEXT THREE (AKA: HAMMER TIME)
Stop. Hammer Time. The next three functions (union, intersection and distinct) really play well off of each other when described together. Can’t Touch This.
UNION(A DIFFERENT RDD)
Simple. Return the union of two RDDs
scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[477] at parallelize at <console>:12

scala> val par2 = sc.parallelize(5 to 15)
par2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[478] at parallelize at <console>:12

scala> parallel.union(par2).collect
res408: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)
INTERSECTION(A DIFFERENT RDD)
Simple. Similar to union, but returns the intersection of two RDDs.
scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[477] at parallelize at <console>:12

scala> val par2 = sc.parallelize(5 to 15)
par2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[478] at parallelize at <console>:12

scala> parallel.intersection(par2).collect
res409: Array[Int] = Array(8, 9, 5, 6, 7)
Formal API: intersection(other: RDD[T]): RDD[T]
DISTINCT([NUMTASKS])
Another simple one. Return a new RDD with distinct elements within a source RDD
scala> val parallel = sc.parallelize(1 to 9)
parallel: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[477] at parallelize at <console>:12

scala> val par2 = sc.parallelize(5 to 15)
par2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[478] at parallelize at <console>:12

scala> parallel.union(par2).distinct.collect
res412: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)
Formal API: distinct(): RDD[T]
THE KEYS
The group of transformation functions (groupByKey, reduceByKey, aggregateByKey, sortByKey, join) all act on (key, value) pair RDDs. So, this section will be known as “The Keys”. Cool name, huh? Well, not really, but it sounded much better than “The Keys and the Values”, which, for some unexplained reason, triggers memories of “The Young and the Restless”.
The following key functions are available through org.apache.spark.rdd.PairRDDFunctions, which are operations available only on RDDs of key-value pairs. “These operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]) through implicit conversions when you import org.apache.spark.SparkContext._.”
For the following, we’re going to use the baby_names.csv file introduced in the previous post, What is Apache Spark?
All the following examples presume the baby_names.csv file has been loaded and split such as:
scala> val babyNames = sc.textFile("baby_names.csv")
babyNames: org.apache.spark.rdd.RDD[String] = baby_names.csv MappedRDD[495] at textFile at <console>:12

scala> val rows = babyNames.map(line => line.split(","))
rows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[496] at map at <console>:14
GROUPBYKEY([NUMTASKS])
“When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. ”
The following groups all names to counties in which they appear over the years.
scala> val namesToCounties = rows.map(name => (name(1),name(2)))
namesToCounties: org.apache.spark.rdd.RDD[(String, String)] = MappedRDD[513] at map at <console>:16

scala> namesToCounties.groupByKey.collect
res429: Array[(String, Iterable[String])] = Array((BRADEN,CompactBuffer(SUFFOLK, SARATOGA, SUFFOLK, ERIE, SUFFOLK, SUFFOLK, ERIE)), (MATTEO,CompactBuffer(NEW YORK, SUFFOLK, NASSAU, KINGS, WESTCHESTER, WESTCHESTER, KINGS, SUFFOLK, NASSAU, QUEENS, QUEENS, NEW YORK, NASSAU, QUEENS, KINGS, SUFFOLK, WESTCHESTER, WESTCHESTER, SUFFOLK, KINGS, NASSAU, QUEENS, SUFFOLK, NASSAU, WESTCHESTER)), (HAZEL,CompactBuffer(ERIE, MONROE, KINGS, NEW YORK, KINGS, MONROE, NASSAU, SUFFOLK, QUEENS, KINGS, SUFFOLK, NEW YORK, KINGS, SUFFOLK)), (SKYE,CompactBuffer(NASSAU, KINGS, MONROE, BRONX, KINGS, KINGS, NASSAU)), (JOSUE,CompactBuffer(SUFFOLK, NASSAU, WESTCHESTER, BRONX, KINGS, QUEENS, SUFFOLK, QUEENS, NASSAU, WESTCHESTER, BRONX, BRONX, QUEENS, SUFFOLK, KINGS, WESTCHESTER, QUEENS, NASSAU, SUFFOLK, BRONX, KINGS, ...
The above example was created from the baby_names.csv file, which was introduced in the previous post What is Apache Spark?
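If all you want is a number rather than the full buffer of counties, mapValues can post-process each group; for example, the count of distinct counties each name appears in. A quick sketch building on namesToCounties above:

// number of distinct counties each name shows up in
val countiesPerName = namesToCounties
  .groupByKey
  .mapValues(counties => counties.toSet.size)

countiesPerName.take(5)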
REDUCEBYKEY(FUNC, [NUMTASKS])
Operates on (K,V) pairs of course, but the func must be of type (V,V) => V
Let’s sum the yearly name counts over the years in the CSV. Notice we need to filter out the header row.
scala> val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))
filteredRows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[546] at map at <console>:14

scala> filteredRows.map(n => (n(1),n(4).toInt)).reduceByKey((v1,v2) => v1 + v2).collect
res452: Array[(String, Int)] = Array((BRADEN,39), (MATTEO,279), (HAZEL,133), (SKYE,63), (JOSUE,404), (RORY,12), (NAHLA,16), (ASIA,6), (MEGAN,581), (HINDY,254), (ELVIN,26), (AMARA,10), (CHARLOTTE,1737), (BELLA,672), (DANTE,246), (PAUL,712), (EPHRAIM,26), (ANGIE,295), (ANNABELLA,38), (DIAMOND,16), (ALFONSO,6), (MELISSA,560), (AYANNA,11), (ANIYAH,365), (DINAH,5), (MARLEY,32), (OLIVIA,6467), (MALLORY,15), (EZEQUIEL,13), (ELAINE,116), (ESMERALDA,71), (SKYLA,172), (EDEN,199), (MEGHAN,128), (AHRON,29), (KINLEY,5), (RUSSELL,5), (TROY,88), (MORDECHAI,521), (JALIYAH,10), (AUDREY,690), (VALERIE,584), (JAYSON,285), (SKYLER,26), (DASHIELL,24), (SHAINDEL,17), (AURORA,86), (ANGELY,5), (ANDERSON,369), (SHMUEL,315), (MARCO,370), (AUSTIN,1345), (MITCHELL,12), (SELINA,187), (FATIMA,421), (CESAR,292), (CAR...
Formal API: reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]
The above example was created from the baby_names.csv file, which was introduced in the previous post What is Apache Spark?
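Because reduceByKey only needs a (V,V) => V function, the same pattern works for other aggregations too; for instance, the largest single-year count recorded for each name. A small sketch reusing filteredRows:

// largest single-year count observed for each name
val maxYearlyCount = filteredRows
  .map(n => (n(1), n(4).toInt))
  .reduceByKey((v1, v2) => math.max(v1, v2))

maxYearlyCount.take(5)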
AGGREGATEBYKEY(ZEROVALUE)(SEQOP, COMBOP, [NUMTASKS])
Ok, I admit, this one drives me a bit nuts. Why wouldn’t we just use reduceByKey? I don’t feel smart enough to know when to use aggregateByKey over reduceByKey. For example, the same results may be produced:
scala> val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))
filteredRows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[546] at map at <console>:14

scala> filteredRows.map(n => (n(1),n(4).toInt)).reduceByKey((v1,v2) => v1 + v2).collect
res452: Array[(String, Int)] = Array((BRADEN,39), (MATTEO,279), (HAZEL,133), (SKYE,63), (JOSUE,404), (RORY,12), (NAHLA,16), (ASIA,6), (MEGAN,581), (HINDY,254), (ELVIN,26), (AMARA,10), (CHARLOTTE,1737), (BELLA,672), (DANTE,246), (PAUL,712), (EPHRAIM,26), (ANGIE,295), (ANNABELLA,38), (DIAMOND,16), (ALFONSO,6), (MELISSA,560), (AYANNA,11), (ANIYAH,365), (DINAH,5), (MARLEY,32), (OLIVIA,6467), (MALLORY,15), (EZEQUIEL,13), (ELAINE,116), (ESMERALDA,71), (SKYLA,172), (EDEN,199), (MEGHAN,128), (AHRON,29), (KINLEY,5), (RUSSELL,5), (TROY,88), (MORDECHAI,521), (JALIYAH,10), (AUDREY,690), (VALERIE,584), (JAYSON,285), (SKYLER,26), (DASHIELL,24), (SHAINDEL,17), (AURORA,86), (ANGELY,5), (ANDERSON,369), (SHMUEL,315), (MARCO,370), (AUSTIN,1345), (MITCHELL,12), (SELINA,187), (FATIMA,421), (CESAR,292), (CAR...

scala> filteredRows.map ( n => (n(1), n(4))).aggregateByKey(0)((k,v) => v.toInt+k, (v,k) => k+v).sortBy(_._2).collect
And again, the above example was created from the baby_names.csv file, which was introduced in the previous post What is Apache Spark?
There’s a gist of aggregateByKey as well.
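For what it’s worth, the place aggregateByKey tends to earn its keep is when the aggregated result has a different type than the values, since it takes a zero value plus separate per-partition and merge functions. A hedged sketch that collects the set of counties per name (assuming the county sits in column 2):

// zero value: an empty Set[String]; seqOp folds a county in; combOp merges partial sets
val countiesByName = filteredRows
  .map(n => (n(1), n(2)))
  .aggregateByKey(Set.empty[String])(
    (set, county) => set + county,   // runs within a partition
    (set1, set2) => set1 ++ set2     // merges results across partitions
  )

countiesByName.take(5)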
SORTBYKEY([ASCENDING], [NUMTASKS])
This simply sorts the (K,V) pairs by K. Try it out. See the examples above for where babyNames originates.
scala> val filteredRows = babyNames.filter(line => !line.contains("Count")).map(line => line.split(","))
filteredRows: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[546] at map at <console>:14

scala> filteredRows.map ( n => (n(1), n(4))).sortByKey().foreach (println _)

scala> filteredRows.map ( n => (n(1), n(4))).sortByKey(false).foreach (println _) // opposite order
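sortByKey only orders by the key; to order by something else, such as the summed counts from the reduceByKey example, the more general sortBy transformation (already used in the aggregateByKey example) does the job. A quick sketch:

// order names by their total count, largest first
val totals = filteredRows.map(n => (n(1), n(4).toInt)).reduceByKey(_ + _)
val byCountDesc = totals.sortBy(_._2, ascending = false)

byCountDesc.take(5)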
JOIN(OTHERDATASET, [NUMTASKS])
If you have relational database experience, this will be easy. It’s the joining of two datasets. Other joins are available as well, such as leftOuterJoin and rightOuterJoin.
scala> val names1 = sc.parallelize(List("abe", "abby", "apple")).map(a => (a, 1))
names1: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[1441] at map at <console>:14

scala> val names2 = sc.parallelize(List("apple", "beatty", "beatrice")).map(a => (a, 1))
names2: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[1443] at map at <console>:14

scala> names1.join(names2).collect
res735: Array[(String, (Int, Int))] = Array((apple,(1,1)))

scala> names1.leftOuterJoin(names2).collect
res736: Array[(String, (Int, Option[Int]))] = Array((abby,(1,None)), (apple,(1,Some(1))), (abe,(1,None)))

scala> names1.rightOuterJoin(names2).collect
res737: Array[(String, (Option[Int], Int))] = Array((apple,(Some(1),1)), (beatty,(None,1)), (beatrice,(None,1)))
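The same idea carries over to the baby names data; for instance, pairing each name’s total count with the counties it appears in, both keyed by name. A sketch assuming the filteredRows RDD and column layout from the earlier examples:

// both RDDs are keyed by name, so they can be joined directly
val totals   = filteredRows.map(n => (n(1), n(4).toInt)).reduceByKey(_ + _)
val counties = filteredRows.map(n => (n(1), n(2))).distinct

val totalsByCounty = totals.join(counties)   // RDD[(String, (Int, String))]
totalsByCounty.take(5)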