Hadoop

DataFrame 가공/처리 함수(feat. Spark)

집못가는프로그래머 2021. 9. 20. 03:06

Spark의 DataFrame에는 여러가지 함수들이 존재한다.

DataFrame에 들어있는 데이터를 처리/가공하기 쉽게 해주는 함수들이다.

이 함수들은 방대한 양의 데이터에서 필요한 정보만 필터링/추출 하는것이 매우 쉽다.

 

1. df.WithColumn()

- df.withColumn('col_name','new value') -> 특정 컬럼의 값을 new_value로 변경

2. df.select()

- df.select('col_name') -> 특정 컬럼 선택

3. df.filter()

- df.filter( ( df.col_name == 'Smith') & (...) ) -> 특정 행 선택 (조건을 여러개 붙힐때는 '&'를 사용한다)

4. df.drop()

- df.drop('col_name') -> 특정 컬럼 삭제

- df.drop('col1','col2','col3', ...) -> 여러개의 컬럼 삭제도 가능

 

예)

# employee.csv -> DataFrame 로드
# salary 30000 이하인 행 선택 / 급여를 10% 인상

ss = SparkSession.builder.getOrCreate()
path = 'hdfs://localhost:9000/user/data/csv/employee_with_header.csv'
df = ss.read.csv(path=path, header = True, inferSchema=True)
df.show()

#withColumn()함수만을 이용한 'salary'컬럼 10% 증가
#df2 = df.filter(df.salary <= 30000).withColumn('salary', df.salary * 1.1)
#df2.show()

#when() 함수
from pyspark.sql.functions import when  
df2 = df.withColumn('salary', when(df.salary <= 30000, 33000) \
                    .when(df.salary > 30000, 100) \
                    .otherwise(df.salary))
#salary컬럼 값이 30000이하이면 33000으로 바꾸고
# salary컴럼 값이 30000 초과라면 100으로 바꾼다.
# 모든 경우가 아니면 그냥 둔다
df2.show()

#drop()함수
df3 = df2.drop('name')  # name컬럼 삭제, 본 데이터프레임에서 삭제가 아닌 삭제된 새로운 DataFrame return
# ==
# df3 = df2.drop(df2.name)
# 여러개도 사용 가능
# df3 = df2.drop('name','salary')
# ==
# df3 = df2.drop(df2.name, df2.salary)
df3.show()

Output

 

 

4. UDF : User Defined Function

- Spark에서 지원하는 사용자 지정 함수

- Annotation(주석) 활용

- 가능한한 최대한 Built-In함수를 사용하고 최후의 보루로서 사용하는 것을 권장

 

예 )

#UDF : User Defined Function
#Spark에서 지원하는 사용자 지정함수
# Annotation 활용
# 최대한 Built-In 함수를 사용하고 정말 방법이 없을 때 사용하는 것을 권장

from pyspark.sql.functions import udf  #import udf
from pyspark.sql.types import *

@udf(StringType())              # Annotation을 사용하여 편리하게 UDF 선언, 함수의 return 타입은 String임
def sal_label(sal):             # sal_label()함수 위에 @udf을 적어줌으로서
    if sal<=3000 : return 'Low' # sal_label을 Spark DataFrame에서 사용 가능
    elif sal<=40000 : return 'Middle'
    else : return 'High'
    
# 사용 예시
df2 = df.withColumn('Sal Grade', sal_label(df.salary)) #'Sal Grade'라는 컬럼은 없으므로 새로 생성되고 값이 들어간다.
df2.show()
df2.write.csv('hdfs://localhost:9000/user/data/label/', mode='overwrite')

Output

 

 

  • Aggregation (agg)  : 통계함수 = 집합함수(그룹함수)

* 사용법 : df.agg( { 'col_name' : function } )  - Json/Dict 형태를 지닌다

1. avg : 평균
2. max : 최대
3. min : 최소
4. sum : 합계
5. count : 계산
var : variance(분산)
6. var_samp (sample)     - 통계학의 표본(sample) : 전체를 샘플링하여 일부를 추출
7. var_pop  (population) - 통계학의 모집단(population) : 구하고자 하는 데이터셋의 전체
(stddev : 표준편차)
8. stddev_samp - 표본의 표준편차
9. stddev_pop   - 모집단의 표준편차

 

예)

#급여의 최고액, 최저액, 평균액, 항목수 추출
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

res = df2.agg({'salary' : 'min'})
res.show()

df4 = df2.agg({'salary' : 'max'})
res.show()

df5 = df2.agg({'salary' : 'mean'})
res.show()

df6 = df2.agg({'salary' : 'count'})
res.show()

res = df.agg( max('salary'), min('salary'), mean('salary'))   # 한 컬럼에 대해 여러개의 통계가 필요할 때 한번에 작성하는 법
res.show()

Output

 

 

  • csv파일을 읽은 뒤 함수를 사용할 때 (e.g. 계산을 요하는 함수들) 문자열 컬럼에 대한 적용 불가한 오류가 발생한다.그럴때는 
    1. 커스텀 스키마 적용
    2. inferSchema = True 사용
    3. DF 생성 후에 withColumn 사용하여 자료형 변경
       -> e.g) df.withColumn('col1', df['col'].cast('double')) : 특정 컬럼의 Type을 Double로 선언

 

  • 공분산 : 두 변수의 연관성 여부
    df.cov('col', 'col2')  << 두 컬럼의 공분산 구하는 법
  • 상관계수 : 두 변수의 연관성의 크기 (-1 ~ +1)
    df.corr('col','col2')  << 두 컬럼의 상관계수 구하는 법

예)

from pyspark.sql.types import *
from pyspark.sql.functions import *

ss = SparkSession.builder.appName('diabetes').getOrCreate()

url = 'hdfs://localhost:9000/user/diabetes_info/diabetes.csv'
df = ss.read.csv(url, header=True, inferSchema = True)  #inferSchema를 이용한 컬럼타입 사전 자동 설정
df.printSchema()
print(df.cov('Pregnancies', 'BloodPressure'))  #두 컬럼 Pregnancies, BloodPressure의 공분산
print(df.corr('Pregnancies', 'BloodPressure')) # 두 컬럼 Prgnancies, BloodPressure의 상관계수


df2 = ss.read.csv(url,header =True)
df2.printSchema()

df2 = df2.withColumn('Pregnancies', df2.Pregnancies.cast('Double'))   #withColumn()과 cast()함수를 이용한 컬럼 형변환
df2 = df2.withColumn('BloodPressure', df2.Pregnancies.cast('Double')) 
df2.printSchema()

InferShcema를 이용한 데이터 Type 자동 설정
Prenancies, BloodPressure 컬럼의 공분산, 상관계수
기본 데이터 Type : 
StringwithColumn과 cast를 이용한 데이터 Type 설정

 

 

 

  • 데이터를 탐색하는 방법으로 <기술통계>가 있다.
    - 기술은 Describe의 의미를 가진다. 예) 특징을 기술하여라.
    df.describe()
    df.summary()  : 사분위 포함

예 )

ss = SparkSession.builder.appName('describe').getOrCreate()
url = 'hdfs://localhost:9000/user/data/csv/employee4.csv'

df = ss.read.csv(url, header = True, inferSchema=True)

df.describe().show()
df.summary().show()

기술통계 함수(describe, summary)를 이용한 데이터 셋의 컬럼별 통계/특징 추출

 

 

  • df.orderBy('col_name') : 해당 컬럼 오름차순 정렬
    - 여러개의 컬럼 정렬 : df.orderBy('col1','col2','col3'...)
    - 내림차순 : df.orderBy('col_name', ascending=False)
    - df.orderBy('col_name','col_name2', ascending=[True,False])  : col_name 오름차순, col_name2 내림차순, 맨 앞에 적힌 컬럼을 먼저 정렬,  값이 동일하면 뒤의 조건 순으로 정렬

예 )

#df3.orderBy('eid').show()
df3.orderBy(['department_id','eid'], ascending = [True,False]).show(df3.count())

department_id 기준 오름차순 정렬, 값이 똑같으면 eid기준 내림차순 정렬

 

 

  • 중복제거

- df.drop_duplicates() : 중복 제거
- df.drop_duplicates(['col']) : col컬럼에 중복되는 값들을 첫번째를 제외하고 모두 제거
- df.drop_duplicates(['col','col2']) : col컬럼과 col2의 컬럼에 중복되는 값 제거

 

예)

#부서 번호가 중복되지 않고 한번만 출력
df3.drop_duplicates(['department_id']).show()  # department_id 중복 값 제거

res = df3.drop_duplicates(['department_id']).orderBy(['department_id'])  # department_id 중복 값 제거 후 오름차순 정렬
print("총 부서 수 : ", res.count())
res.show()

department_id 컬럼의 중복값 제거 후 결과
department_id 컬럼의 데이터 종류 출력(count) 및  컬럼 데이터 오름차순 정렬

 

 

 

  • 무작위 추출 : df.sample()
    - 복원 추출 : df.sample(withReplacement = True) -> 임의의 값을 뽑고 그 다음값을 뽑을때 전에 뽑았던 값 포함하여 임의로 뽑는다
    - 비복원 추출 : df.sample(withReplacement = False) -> 복원추출의 반대개념으로 임의의 값을 뽑으면 뽑힌 값은 제외하고 다음값을 뽑는다.
    fraction = 0.5   : 데이터셋중에서 50%를 뽑는다.
    seed = 10   :   seed값을 10으로 하고 임의로 뽑는다. Random함수의 seed와 같은 개념

예)

#employee5.csv : 비복원 추출로 10개 급여평균, 모집단 급여평균 비교

# 비복원 추출 20%의 급여평균
sample = df.sample(withReplacement = False, fraction = 0.2, seed = 10)
sample.show()

#평균값 출력
sample.agg(avg('SALARY')).show()
#alias를 사용하면 예를 들어 SALARY에 대한 AVG를 출력할때 컬럼명을 지정할 수 있다.
sample.agg( avg('SALARY').alias('sample avg')).show()

# 모집단의 금여평균
df.agg({'SALARY' : 'mean'}).show()

데이터셋의 20%를 비복원추출로 뽑은 결과
위에서 뽑은 20%의  salary컬럼 평균
alias()를 이용한 컬럼명 변경
모집단의 salary 컬럼 평균

 

 

 

  • 특정 컬럼의 데이터 빈도수 구하는 함수 : df.freqItems(cols=['컬럼명', '컬럼명2','컬럼명3'...])
                                                         * Freaquancy(빈도수) Items(복수형)

예)

df.freqItems(cols = ['HIRE_DATE', 'DEPARTMENT_ID']).show()  # 빈도수 순위

HIRE_DATE, DEPARTMENT_ID 컬럼의 데이터 빈도수 출력

 

 

 

  • 그룹화 : df.groupby(['컬럼1','컬럼2', ...]) + 계산 함수
        -> 컬럼1, 컬럼2의 데이터값 별로 그룹화한다.
        -> 그룹화된 그룹마다 계산함수를 이용하여 결과를 도출한다. 예) mean, max, min, count, etc.

예)

df.groupby(['DEPARTMENT_ID']).mean().orderBy('DEPARTMENT_ID').show()   #부서별로 그룹화하여 그룹마다의 평균을 구하고 부서순으로 오름차순 정렬함

DEPARTMENT_ID 컬럼의 데이터 값들을 기준으로 그룹화하고, 나머지 컬럼들은 데이터의       평균값으로 계산한다. 그룹화된 DEPARTMENT_ID 컬럼을 기준으로 오름차순 정렬