본문 바로가기
개발/HADOOP_SPARK_ETC

SPARK aggregate() 함수 설명 및 진행 과정

by 로그인시러 2017. 2. 16.

책에서 봤을 때 내부 과정이 이해가 안 되었음.

 

그래서 찾아봤더니 이런 심오한 ....

 

일단 이 함수를 이해하는데 기억해야 할 점은 2가지...

 

1) rdd 데이터 타입과 action 결과 타입이 다를 경우 사용한다.

 

2) 뻔한 얘기지만, spark 는 분산처리 프레임웍이다. 각 파티션이 분산되어있어, 연산을 할 때는 파티션 단위의 연산과 각 연산된 결과를 합치는 과정을 거치게 된다.

 

So let us now have that look at the signature: Page on apache.org


def aggregate[U](zeroValue: U)(seqOp: (U, T) U, combOp: (U, U) U)(implicit arg0: ClassTag[U]): U

 

위와 같다.

 

seqOp: (U, T)  U

 

기억해야 할 것 1)을 보자.

rdd 의 데이터 타입 T를 연산 결과 데이터 타입 U로 변경 및 연산을 수행하는 함수다.

 

이 함수는 전역적인 작업이 아니라 각 노드 로컬 파티션에서 수행된다.

 

zeroValue

 

이것은 각 파티션에서 누적해야 될 값의 시작 값이다.

밑에 예제에서 정확히 확인할 수 있다.

 

combOp: (U, U)  U)

 

각 파티션에서 seqOp 의 작업이 끝났으면, 합치는 함수이다.

 

실제 수행과정의 예를 들면,

배열 [1,2,3,4] 의 평균을 내기 위해서 각 원소를 합하고 원소의 갯수를 세는 예제이다.

 

>>> sc.parallelize([1,2,3,4]).aggregate(
... (0,0),
... (lambda acc, value : (acc[0] + value, acc[1] + 1)),   ----- (1)
... (lambda acc1, acc2 : (acc1[0] + acc2[0], acc1[1] + acc2[1]))) ----- (2)
(10, 4)

 

(1) 번 함수의 acc 는 초기값으로 0이 셋팅된다.

그리고, 여기에 지속적으로  + value 된다. 

 

물론, 각 파티션 별로 위 함수는 진행된다.

 

각 파티션의 결과는 combine 되어야 하는데, 그것은 (2) 를 통해서 수행된다.

 

그래서, (10, 4) 가 나온다.

 

10/4 하면 평균이 나온다 ... 

 

또 한 가지 예는 문자의 갯수를 카운트하는 것이다.

 

이건, stackoverflow 의 답변을 그냥 복붙 하겠다.

 

 

 

1) The first operation will transform strings into size (int) and accumulate the values for size.

val stringSizeCummulator: (Int, String) => Int  = (total, string) => total + string.lenght`

2) provide the ZERO for the addition operation (0)

val ZERO = 0

3) an operation to add two integers together:

val add: (Int, Int) => Int = _ + _

Putting it all together:

rdd.aggregate(ZERO, stringSizeCummulator, add)

So, why is the ZERO needed? When the cummulator function is applied to the first element of a partition, there's no running total. ZERO is used here.

Eg. My RDD is: - Partition 1: ["Jump", "over"] - Partition 2: ["the", "wall"]

This will result:

P1:

  1. stringSizeCummulator(ZERO, "Jump") = 4
  2. stringSizeCummulator(4, "over") = 8

P2:

  1. stringSizeCummulator(ZERO, "the") = 3
  2. stringSizeCummulator(3, "wall") = 7

Reduce: add(P1, P2) = 15

위 설명을 실제로 테스트하면

 

>>> sc.parallelize(['jump', 'over','the', 'wall']).aggregate(
... (0),
... (lambda acc, value : (acc + len(value))),
... (lambda acc1, acc2 : (acc1 + acc2)))
15

 

그리고, 참고했던 설명도 복붙...

Great question! Aggregate and aggregateByKey can be a bit more complex than reduce and reduceByKey.

Basically, the idea with aggregate is to provide an extremely general way of combining your data in some way. In Spark, your data is stored in different partitions. With the aggregate method, you specify a zero value, a function that determines how data within a partition will be combined, and a function that determines how the output from each of the partitions will be combined.

 

The reduce method is simpler because it doesn’t distinguish between whether the data is in the same partition or not, it is simply about combining each pair of data items into one item.

 

출처 및 참조)

 http://stackoverflow.com/questions/28240706/explain-the-aggregate-functionality-in-spark

https://www.quora.com/How-does-the-aggregate-function-works-in-Apache-spark-using-Python

 

'개발 > HADOOP_SPARK_ETC' 카테고리의 다른 글

spark codeing 시 유의사항  (0) 2017.02.22
SPARK 의 헷갈림 reduce(), fold()  (0) 2017.02.16
SPARK reduce() 개념도  (0) 2017.02.16
RDD 기본 함수 예제  (0) 2017.02.15
SPARK - RDD [펌]  (0) 2017.02.14

댓글