DataFrame은 Pandas의 DataFrame, Spark의 DataFrame 두개가 존재한다.
-두개의 객체는 서로 다른 객체이다. 하나는 pandas의, 다른 하나는 spark의 객체이다.
이런 Pandas와 Spark의 DataFrame을 서로 호환해주는 Tool이 있다.
- 예를들어 Pandas의 데이터프레임에 주식 데이터를 저장하고 csv파일로 만든 뒤,
만든 csv파일을 spark의 데이터프레임으로 다시 불러와야 하는 작업이 있는데
만약 여기서 사용된 두 데이터프레임이 서로 호환이 된다면 중간에 파일을 저장하고 불러오는 번거로운 작업을 줄일 수 있을 것이다.
- Arrow 모듈
Arrow 모듈이 위에서 설명한 Pandas와 Spark의 두 데이터프레임을 호환해주는 모듈이다.
- 사용법
import numpy as np
sparkSession.conf.set("spark.sql.execution.arrow.enabled","true")
pdf = pd.DataFrame(np.random.rand(100,3))
sdf = sparkSession.createDataFrame(pdf)
sdf.show(5)
result_pdf = sdf.select("*").toPandas()
display(result_pdf.head(5))
1. sparkSession.conf.set("spark.sql.execution.arrow.enabled","true")
- arrow 모듈을 사용하기 위한 설정이다. arrow를 따로 import할 필요는 없다.
2.
sdf(Spark DataFrame) -> pdf(Pandas DataFrame) : toPandas()
스파크 데이터프레임에서 판다스 데이터프레임으로 변환할 때는 toPandas()를 사용한다.
pdf(Pandas DataFrame) -> sdf(Spark DataFrame) : sparkSession.createDataFrame(pdf)
판다스 데이터프레임에서 스파크 데이터프레임으로 변환할 때는 sparkSession.createDataFrame()을 사용한다.
예)
import yfinance as yf
from pyspark.sql import SparkSession
import pandas as pd
from hdfs import InsecureClient
#Microsoft 주식 데이터 2011-01-01 부터 조회 -> stock_df(판다스 데이터프레임)
stock_df = pd.DataFrame(yf.download('msft', start = '2011-01-01'))
#sparkSession 객체 생성
sparkSession = SparkSession.builder.appName('stock_test').getOrCreate()
#Arrow 모듈 사용
sparkSession.conf.set("spark.sql.execution.arrow.enabled","true")
#stock_df(판다스 데이터프레임) -> sdf(스파크 데이터프레임)
sdf = sparkSession.createDataFrame(stock_df)
# sdf를 csv파일로 저장
sdf.write.csv('hdfs://localhost:9000/user/wan',mode='overwrite')
위의 예시 코드처럼 Arrow를 사용하면 Pandas 데이터프레임을 spark 데이터프레임으로 변형하고
변형된 데이터를 hdfs 영역에 저장하는 과정이 매우 간단한다.
'Hadoop' 카테고리의 다른 글
RDD와 비정형데이터(feat. Spark) (0) | 2021.09.15 |
---|---|
subprocess 모듈 (feat. python) (0) | 2021.09.15 |
Pyspark 사용 예시(3) (0) | 2021.09.15 |
Pyspark 사용 예시(2) (0) | 2021.09.15 |
Pyspark 사용 예시(1) (0) | 2021.09.14 |
댓글