본문 바로가기
Hadoop

RDD 가공/처리 함수

by 집못가는프로그래머 2021. 9. 16.

데이터를 RDD객체로 만드는 방법은 SparkSession의 parallelize()함수를 이용한다.

rdd = sparkSession.sparkContext.parallelize (
['scala',
'java',
'hadoop',
'spark',
'akka',
'spark vs hadoop',
'pyspark',
'pyspark and spark'])

rdd # ParallelCollectionRDD
rdd2 = rdd.filter(lambda x: 'spark' in x) # PythonRDD[33]
rdd2.collect()

예를들어 위의 코드에서 여러 문자열이 담긴 리스트(List)를 sparkContext의 parallelize()함수에 넣으면

return된 객체는 RDD임을 알 수 있다(rdd 출력 : parallelCllectionRDD)

즉 데이터셋을 넘겨주면 RDD를 객체를 넘긴다.


*RDD를 가공/처리하는 함수에는 여러가지가 있다. 

-filter()   -> filter()함수안에 함수를 넣으면 함수에 맞는 조건의 값만 걸러내는 기능

            -> rdd.filter(lambda x : 'spark' in x)  :  rdd데이터 안에 'spark'라는 문자열을 가진 데이터만 return한다

rdd3 = rdd.map(lambda x : (x, len(x)))
rdd3.collect()

-map() 

  -> map()함수안에 함수를 넣으면 rdd의 배열의 각 원소에 특정하게 가공을하고 그 값을 모아서 RDD객체로 반환한다

  -> rdd.map(lambda x : (x,len(x)))  :  rdd의 각 원소에  (원소, 원소의 길이) 의 튜플형식을 List에 담아서 반환한다

 

from pyspark.sql import SparkSession

sparkSession = SparkSession.builder.appName('filter-salary').getOrCreate()
lines = sparkSession.sparkContext.textFile('hdfs://localhost:9000/user/data/csv/employee.txt')

rdd = lines.map(lambda x : x.split(','))
rdd2 = rdd.filter(lambda x : int(x[2]) >= 40000)
rdd2.collect()

위의 코드는 map()과 filter()를 적용시킨 예이다.

1. map(lambda )함수를 이용해 읽어온 txt파일을 콤마(,) 단위로 분리하고 새로운 RDD객체를 return한다.

2. filter(lambda )함수를 이용해 콤마로 분리된 RDD의 값들 중 row별 2번째 값(0부터 시작)들 중에 40000이상인 데이터만 뽑아낸다. 한 행마다 5개의 컬럼데이터가 있고 그중에 2번째 값만 보고 조건에 충족하면 그 행을 return하는 것이다.

 

 

from operator import add

num = sparkSession.sparkContext.parallelize([1,2,3,4,5])
res = num.reduce(add)  # = num.reduce(lambda x,y : x+y)
print(type(res))
res

- recude()

  -> reduce() 함수를 이용하면 값들을 병렬합산하고 같은 데이터타입을 return한다

  -> res의 type을 보면 int임을 알 수 있다.

  -> reduce(add)의 결과값은 모든 원소를 더한 15이다

num = sparkSession.sparkContext.parallelize([1,2,3,4,5])
result = num.reduce(lambda x, y : x*y)
result

이것은 쉬운 응용인데, 사칙연산을 곱하기(*)로 바꿔주면 모든 값들의 곱셈값을 return한다.

 

a = sparkSession.sparkContext.parallelize( [ ('spark', 1), ('python', 7)])
b = sparkSession.sparkContext.parallelize( [ ('spark', 3), ('python', 4)])
result = a.join(b)
result.collect()

- join()

  -> parallelize()에 넘겨주는 데이터셋이 마치 DataFrame의 컬럼명과 필드값으로 있을 때 두 RDD를 join하면 한개  데이터프레임의 한 컬럼에 값이 여러개가 있는것 처럼 합쳐진다.

 

 

 

댓글