데이터를 RDD객체로 만드는 방법은 SparkSession의 parallelize()함수를 이용한다.
rdd = sparkSession.sparkContext.parallelize (
['scala',
'java',
'hadoop',
'spark',
'akka',
'spark vs hadoop',
'pyspark',
'pyspark and spark'])
rdd # ParallelCollectionRDD
rdd2 = rdd.filter(lambda x: 'spark' in x) # PythonRDD[33]
rdd2.collect()
예를들어 위의 코드에서 여러 문자열이 담긴 리스트(List)를 sparkContext의 parallelize()함수에 넣으면
return된 객체는 RDD임을 알 수 있다(rdd 출력 : parallelCllectionRDD)
즉 데이터셋을 넘겨주면 RDD를 객체를 넘긴다.
*RDD를 가공/처리하는 함수에는 여러가지가 있다.
-filter() -> filter()함수안에 함수를 넣으면 함수에 맞는 조건의 값만 걸러내는 기능
-> rdd.filter(lambda x : 'spark' in x) : rdd데이터 안에 'spark'라는 문자열을 가진 데이터만 return한다
rdd3 = rdd.map(lambda x : (x, len(x)))
rdd3.collect()
-map()
-> map()함수안에 함수를 넣으면 rdd의 배열의 각 원소에 특정하게 가공을하고 그 값을 모아서 RDD객체로 반환한다
-> rdd.map(lambda x : (x,len(x))) : rdd의 각 원소에 (원소, 원소의 길이) 의 튜플형식을 List에 담아서 반환한다
from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.appName('filter-salary').getOrCreate()
lines = sparkSession.sparkContext.textFile('hdfs://localhost:9000/user/data/csv/employee.txt')
rdd = lines.map(lambda x : x.split(','))
rdd2 = rdd.filter(lambda x : int(x[2]) >= 40000)
rdd2.collect()

위의 코드는 map()과 filter()를 적용시킨 예이다.
1. map(lambda )함수를 이용해 읽어온 txt파일을 콤마(,) 단위로 분리하고 새로운 RDD객체를 return한다.
2. filter(lambda )함수를 이용해 콤마로 분리된 RDD의 값들 중 row별 2번째 값(0부터 시작)들 중에 40000이상인 데이터만 뽑아낸다. 한 행마다 5개의 컬럼데이터가 있고 그중에 2번째 값만 보고 조건에 충족하면 그 행을 return하는 것이다.
from operator import add
num = sparkSession.sparkContext.parallelize([1,2,3,4,5])
res = num.reduce(add) # = num.reduce(lambda x,y : x+y)
print(type(res))
res

- recude()
-> reduce() 함수를 이용하면 값들을 병렬합산하고 같은 데이터타입을 return한다
-> res의 type을 보면 int임을 알 수 있다.
-> reduce(add)의 결과값은 모든 원소를 더한 15이다
num = sparkSession.sparkContext.parallelize([1,2,3,4,5])
result = num.reduce(lambda x, y : x*y)
result

이것은 쉬운 응용인데, 사칙연산을 곱하기(*)로 바꿔주면 모든 값들의 곱셈값을 return한다.
a = sparkSession.sparkContext.parallelize( [ ('spark', 1), ('python', 7)])
b = sparkSession.sparkContext.parallelize( [ ('spark', 3), ('python', 4)])
result = a.join(b)
result.collect()
- join()
-> parallelize()에 넘겨주는 데이터셋이 마치 DataFrame의 컬럼명과 필드값으로 있을 때 두 RDD를 join하면 한개 데이터프레임의 한 컬럼에 값이 여러개가 있는것 처럼 합쳐진다.
'Hadoop' 카테고리의 다른 글
DataFrame과 Schema(스키마) (feat. Spark (0) | 2021.09.16 |
---|---|
RDD -> DataFrame(feat. Spark) (0) | 2021.09.16 |
RDD와 비정형데이터(feat. Spark) (0) | 2021.09.15 |
subprocess 모듈 (feat. python) (0) | 2021.09.15 |
Arrow 모듈과 DataFrame(feat. Pandas, Spark) (0) | 2021.09.15 |
댓글