공모전을 준비하면서 4GB가 넘는 건축물대장 파일을 전처리할 필요가 있었는데,
데이터베이스를 연결하지않고 파이썬 내에서 간단하게 불러올 수 있는
dask라는 라이브러리가 있어서 유용하게 사용했던 적이 있다.
dask 라이브러리
- 병렬 처리를 지원하는 대규모 데이터프레임 라이브러리
- 큰 규모의 데이터를 효율적으로 처리하고 분석할 수 있도록 도와줌
- Pandas와 비슷한 API를 제공하여 기존에 Pandas를 사용했던 사람들도 비교적 쉽게 적용 가능
코드별 설명
- Raw data : https://open.eais.go.kr/opnsvc/opnSvcInqireView.do 에서 다운로드 가능
- 상황 : 2023년 5월 기준 전국 건축물대장 층별개요 데이터에서 서울특별시 & 필요 칼럼만 추출
1. 라이브러리 및 필요 함수 정의
import pandas as pd
import glob
from tqdm import tqdm
import dask.dataframe as dd
def make_sido_code(row):
return str(row["시군구_코드"])[:2]
Raw 데이터는 전국 단위이며, 시군구_코드로 지역마다 구분되어있다.
서울특별시의 데이터만을 가져오기 위해 지역을 구분하기 위한 함수를 정의해준다.
2. 각 칼럼명 정의 후 데이터 로드
데이터 다운로드받을 때 데이터 구조도 함께 다운로드 받을텐데,
데이터별 칼럼명을 정의해주기 위해 엑셀에서 컬럼 한글명을 긁어왔다.
column_names = ['관리_건축물대장_PK', '대지_위치', '도로명_대지_위치', '건물_명', '시군구_코드', '법정동_코드', '대지_구분_코드',
'번', '지', '특수지_명', '블록', '로트', '새주소_도로_코드', '새주소_법정동_코드', '새주소_지상지하_코드',
'새주소_본_번', '새주소_부_번', '동_명', '층_구분_코드', '층_구분_코드_명', '층_번호', '층_번호_명',
'구조_코드', '구조_코드_명', '기타_구조', '주_용도_코드', '주_용도_코드_명', '기타_용도', '면적(㎡)',
'주_부속_구분_코드', '주_부속_구분_코드_명', '면적_제외_여부', '생성_일자']
floor = dd.read_csv("/content/drive/MyDrive/반지하/mart_djy_04.txt", sep = "\|", engine='python', dtype=str, \
on_bad_lines='skip', keep_default_na=False, encoding = "cp949", header=None, names=column_names)
print(f"건축물대장 표제부의 분할된 개수는 {floor.npartitions}개입니다.")
CSV 파일의 내용이 Dask DataFrame 형식으로 저장된다.
Dask의 npartitions
- npartitions 속성은 데이터프레임이 분할된 개수
- Dask는 큰 데이터를 처리할 때 데이터를 여러 조각으로 나누어 처리하는데, 이때 각 조각을 파티션(Partition)이라고 함
3. 전처리 및 pandas dataframe 형태로 변환
필요한 칼럼들만 가져오고, 서울특별시만 가져오기 위해 앞에서 정의한 함수를 통해 서울시 코드인 11로 데이터를 처리해준다.
meta=object 옵션
- Dask가 apply 메소드를 실행한 후에 반환되는 데이터프레임의 열에 대한 데이터 타입을 지정해주는 것
- 여기서는 "시도코드" 열의 데이터 타입을 object로 지정
seoul_floor = floor[["관리_건축물대장_PK",'시군구_코드', '법정동_코드','대지_위치','번', '지', "도로명_대지_위치", \
'주_용도_코드_명', '기타_용도','층_구분_코드_명', '층_번호', '층_번호_명', '면적(㎡)']]
seoul_floor["시도코드"] = floor.apply(make_sido_code, axis=1, meta=object)
seoul_floor = seoul_floor[seoul_floor["시도코드"] == "11"]
# preprocessing
seoul_floor["관리_건축물대장_PK"] = seoul_floor["관리_건축물대장_PK"].map(lambda x: x.strip())
seoul_floor = seoul_floor.drop("시도코드", axis=1)
# convert dask to pandas
seoul_floor = seoul_floor.compute()
# save csv
seoul_floor.to_csv("./seoul_층별개요.csv", encoding="utf-8", index=False)
compute() 함수 : dask dataframe → pandas dataframe
- compute()를 통해 dask 데이터프레임을 pandas의 데이터프레임으로 변환할 수 있다.
마지막으로, 변환한 데이터를 csv로 저장해준다.
전체 코드
- 4.66 GB의 크기라서 전체 코드 실행하면 시간이 생각보다 조금 걸린다.
import pandas as pd
import glob
from tqdm import tqdm
import dask.dataframe as dd
def make_sido_code(row):
return str(row["시군구_코드"])[:2]
%%time
column_names = ['관리_건축물대장_PK', '대지_위치', '도로명_대지_위치', '건물_명', '시군구_코드', '법정동_코드', '대지_구분_코드',
'번', '지', '특수지_명', '블록', '로트', '새주소_도로_코드', '새주소_법정동_코드', '새주소_지상지하_코드',
'새주소_본_번', '새주소_부_번', '동_명', '층_구분_코드', '층_구분_코드_명', '층_번호', '층_번호_명',
'구조_코드', '구조_코드_명', '기타_구조', '주_용도_코드', '주_용도_코드_명', '기타_용도', '면적(㎡)',
'주_부속_구분_코드', '주_부속_구분_코드_명', '면적_제외_여부', '생성_일자']
floor = dd.read_csv("/content/drive/MyDrive/반지하/mart_djy_04.txt", sep = "\|", engine='python', dtype=str, \
on_bad_lines='skip', keep_default_na=False, encoding = "cp949", header=None, names=column_names)
print(f"건축물대장 표제부의 분할된 개수는 {floor.npartitions}개입니다.")
seoul_floor = floor[["관리_건축물대장_PK",'시군구_코드', '법정동_코드','대지_위치','번', '지', "도로명_대지_위치", \
'주_용도_코드_명', '기타_용도','층_구분_코드_명', '층_번호', '층_번호_명', '면적(㎡)']]
seoul_floor["시도코드"] = floor.apply(make_sido_code, axis=1, meta=object)
seoul_floor = seoul_floor[seoul_floor["시도코드"] == "11"]
# preprocessing
seoul_floor["관리_건축물대장_PK"] = seoul_floor["관리_건축물대장_PK"].map(lambda x: x.strip())
seoul_floor = seoul_floor.drop("시도코드", axis=1)
# convert dask to pandas
seoul_floor = seoul_floor.compute()
# save csv
seoul_floor.to_csv("./seoul_층별개요.csv", encoding="utf-8", index=False)
실행하면 아래와 같은 결과가 뜬다. 4.66GB 데이터 기준 13분정도 걸렸다.
Reference
728x90