본문 바로가기
Hadoop

Pyspark 사용 예시(3)

by 집못가는프로그래머 2021. 9. 15.
from pyspark.sql import SparkSession

sparkSession = SparkSession.builder.appName("pyspark-hdfs2").getOrCreate() 

#read.load()는 다양한 옵션을 설정할 수 있다
df2 = sparkSession.read.load('/home/hdoop/employee.csv', 
                             format = 'csv', sep=',', inferSchema='true', header='true')

df2.show()

df2 = df2.withColumn('salary', df2.salary+5000)

df2.show()

df2 = df2.withColumn('salary', df2.salary * 1.05)

df2.show()

Output

1. sparkSession.read.load('/home/hdoop/employee.csv',format = 'csv', sep=',', inferSchema='true', header='true')

read.load는 csv를 읽어오는 형식뿐만 아니라 더 많은 형태의 데이터를 읽어올 수 있는 기능이다.

위의 코드에서는 foramt = 'csv'를 통해 'csv' 파일임을 지정하고

sep = ','를 통해 구분자를 지정하고

inderSchema = 'true'를 통해 테이블의 구조를 자동으로 처리하도록 하고

header = 'true'를 통해 컬럼명을 지정해준다

 

 

2. df2 = df2.withColumn('salary', df2.salary+5000)

withColumn을 이용해 df2 데이터프레임에서 특정 컬럼을 이용해 무언가를 하겠다는 의미이다.

즉, df2의 'salary'라는 컬럼 내용에 5000을 더하고 처리된 내용에 대한 새로운 객체(DataFrame)을 return한다

그 return된 객체를 다시 df2가 받는 원리이다.

df2 = df2.withColumn('salary', df2.salary * 1.05) 도 마찬가지 원리이다.

salary라는 컬럼 내용에 1.05를 곱해주고 곱해진 값에 대한 객체를 df2가 다시 받는 원리이다.

 


df2.write.csv(path='local_data')
print('변경된 내용을 파일에 저장함')

Output

위의 코드를 실행하면 Jupyter Notebook의 Default 경로(나는 py-scipts)에 'local_data'를 만들고 그 안에 csv파일을 만든다. 만약 저 코드를 2번 실행하면 위와같은 에러가 발생한다. 이미 존재하는 파일이기 때문이다.


df2.write.csv(path='local_data', mode='append')
print('변경된 내용을 파일에 저장함')

df2.write.csv(path='local_data', mode='overwrite')
print('변경된 내용을 파일에 저장함')

위의 두 코드의 차이점은 mode='append' 와 mode='overwrite' 이다

전에 mode없이 실행을 하면 경로에 파일이 존재한다는 내용의 에러가 발생했는데

mode를 적어줌으로서 해결이 가능하다

mode = 'append'는 이미 존재하는 파일에 누적해서 내용을 집어넣겠다는 뜻이다.

mode = 'overwrite'는 이미 존재하는 파일에 대한 내용을 없애고 새로운 내용으로 덮어씌운다는 뜻이다.

파이썬에서 파일 입출력할 때, mode = 'a'와 mode = 'w'의 차이라고 생각하면 쉽다.

 


df2.write.format('csv').option('header', True).mode('overwrite').option('sep',',').save('local_data')
df2.show()

print('file saved')

Output

위 코드는 옵션을 지정해주는 또다른 방법이다.

판다스 데이터프레임의 컬럼을 조회할때 df['column_name'] 방식 뿐만 아니라 df.column_name도 가능한 것처럼 (.)으로 구분하여 옵션을 지정해 줄 수 있다.

이처럼 hdfs 경로에도 저장이 가능하다.

'Hadoop' 카테고리의 다른 글

subprocess 모듈 (feat. python)  (0) 2021.09.15
Arrow 모듈과 DataFrame(feat. Pandas, Spark)  (0) 2021.09.15
Pyspark 사용 예시(2)  (0) 2021.09.15
Pyspark 사용 예시(1)  (0) 2021.09.14
Pyspark 설치  (0) 2021.09.14

댓글