본문 바로가기
Hadoop

RDD -> DataFrame(feat. Spark)

by 집못가는프로그래머 2021. 9. 16.

RDD에서 DataFrame으로 변환할 때에는 RDD에 포함된 모든 행을 Row오브젝트로 변환하는 과정이 요구된다.

*각 컬럼의 스키마가 추론할 수 있는 질서있는 데이터셋이라면 RDD.toDF()를 통해 데이터프레임으로 바로 변환 가능하다.

from pyspark.sql import Row

row = Row(num=11, name='smith')
print(row.num, row.name)

Output

*Pyspark모듈의 Row함수에 컬럼명과 값을 입력해주면 DataFrame 형식처럼 컬럼명과 필드값이 저장된 Row객체가 생성된다.

 

예)

# employee의 정보를 가지고 데이터프레임을 만들어 본다
from pyspark.sql import SparkSession, Row
ss = SparkSession.builder.appName('test').getOrCreate()

lines = ss.sparkContext.textFile('/home/hdoop/py-scripts/employee.txt') #rdd 객체
parts = lines.map(lambda x : x.split(',')) #rdd 객체, 가공된 현상태로 자료형 추론이 가능하다

#각 컬럼의 스키마를 추론할 수 있는 데이터라면 RDD.toDF()를 사용하여 데이터프레임으로 변환가능하다
#그래도 map()을 활용해서 변환하면
rdd_row = parts.map(lambda x : Row(id = int(x[0]), name = str(x[1]), salary = int(x[2]), job = str(x[3])))

df = sparkSession.createDataFrame(rdd_row) #rdd객체를 DataFrame으로 변환
df.show()

#Row 생성시 컬럼명을 지정하지 않고 데이터프레임 생성 후에 커럼명을 지정하는 예
rdd_row = parts.map(lambda x: Row( int(x[0]), str(x[1]), str(x[2]), str(x[3])))
df = rdd_row.toDF().toDF('id','name','salary','job')
df.show()

*위의 예제는 employee.txt파일의 비정형 데이터를 RDD객체로 받고 가공/처리하여 DataFrame으로 변환하는 예시이다.

- sparkContext.textFile()은 RDD 객체를 return한다. 

- map(lamda )를 이용해 RDD의 행마다 콤마(,)를 기준으로 split()한다

- 위에서 이용한 데이터셋은 .toDF()를 이용해 바로 DataFrame으로 변환이 가능하지만 Row를 사용한 예시이다.

   -> id컬럼에는 행의 0번쨰, name컬럼에는 행의 1번째, salary컬럼에는 행의 2번째, job컬럼에는 행의 3번째 값

- 그렇게 Row값을 가지고 생성된 rdd객체는 sparkSession.createDataFrame(rdd)을 통해 DataFrame으로 변환한다.

* 위처럼 Row생성시에 컬럼명을 부여하지 않고, DataFrame으로 변환 후 .toDF(컬럼명 )을 통해 컬럼명 설정이 가능하다.

 

 

#salary <= 30000 이하인 경우만 DataFrame을 출력해보자
df.createOrReplaceTempView('emp')  #'emp'라는 view를 생성, 내용은 df(DataFrame)
found = sparkSession.sql('select * from emp where salary <= 30000') #sql문법을 사용하여 view를 조회
print(type(found))  # type : <class 'pyspark.sql.dataframe.DataFrame'>
found.show()

RDD -> DataFrame으로 변환된 데이터는 sql문을 통해 검색/처리 등이 가능하다.

위 예시는 변환된 DataFrame 값들 중 salary컬럼이 30000이상인 행을 출력하도록 하는 sql문 활용 예시이다.

'Hadoop' 카테고리의 다른 글

Spark로 Json파일 읽고 쓰기  (0) 2021.09.16
DataFrame과 Schema(스키마) (feat. Spark  (0) 2021.09.16
RDD 가공/처리 함수  (0) 2021.09.16
RDD와 비정형데이터(feat. Spark)  (0) 2021.09.15
subprocess 모듈 (feat. python)  (0) 2021.09.15

댓글