Azure Synapse은 대규모 DW를 구축하거나 ETL 설계에서 필요한 통합 플랫폼이다. 특히 Spark환경에서 데이터를 효율적으로 불러오고, 쓸 때 자주사용하는 패키지가 mssparkutils 이다. 이번 글에서는 notebookutils와 mssparkutils를 중점적으로 이용한 외부 스토리지 접근 및 Credential 인증, 그리고 엑셀을 불러오는 방법을 알아본다.
mssparkutils란?mssparkutils는 Azure Synapse Spark 환경 전용 유틸리티 모듈로, 다음과 같은 기능을 제공합니다.
이는 Databricks의 dbutils와 유사한 역할을 수행하며, Synapse 환경에서 데이터 접근을 자동화할 때 매우 유용합니다.
Synapse에서 외부 스토리지(예: ADLS Gen2, Blob Storage)에 접근할 때는 반드시 보안 인증(Credential Authentication) 절차가 필요합니다.
가장 일반적인 방법은 Linked Service를 생성하여 인증을 위임하는 것입니다.
💡 계정 키 Managed Identity를 권장합니다.
Azure AD 기반 인증이 가능하며, Key를 직접 관리할 필요가 없습니다.
이제 Notebook 코드에서 생성한 Linked Service를 이용하여 Credential을 불러올 수 있습니다.
from notebookutils import mssparkutils
# 파일을 읽기 위한 Credential 인증
linked_service = 'develop_adls'
blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service)
위 코드는 Synapse Workspace에 등록된 develop_adls의 연결 정보를 불러옵니다.
이 Credential은 이후 mssparkutils.fs 모듈을 통해 ADLS 파일 접근 시 사용됩니다.
파일을 다루는 대표적인 기능 중 하나는 폴더 내 파일 리스트 조회입니다.
Synapse에서는 다음 코드로 간단히 구현할 수 있습니다.
from notebookutils import mssparkutils
import pandas as pd
ADLS_FILE_PATH= "abfss://data@datalakeaccount.dfs.core.windows.net/input/"
# 폴더 내 파일 목록 불러오기
file_list = mssparkutils.fs.ls(ADLS_FILE_PATH)
# 엑셀 읽기
for f in file_list:
if f.name.endswith('.xlsx'):
pf = pd.read_excel(f.path, dtypes=str) # Pandas로 불러오기
# nan문자열 null 변경
df = pf.replace("nan", "")
출력 결과는 file_list객체 리스트로 반환되며, 파일명, 경로, 크기 등의 정보를 포함합니다.
이는 ETL 파이프라인에서 동적 파일 처리(예: 매일 생성되는 CSV/Parquet 파일) 에 자주 활용됩니다.
파일 목록을 가져왔다면 Spark를 이용해 데이터를 읽을 수 있습니다.
# 첫 번째 파일 읽기
first_file = file_list[0].path
df = spark.read \\
.option("header", True) \\
.csv(first_file)
display(df.limit(5))
또는 Parquet 파일의 경우:
df_parquet = spark.read.parquet(first_file)
df_parquet.printSchema()
✅ CSV/Parquet 외에도 JSON, Avro, Delta 등 다양한 포맷을 지원합니다.
mssparkutils 활용 팁| 기능 | 코드 예시 | 설명 |
|---|---|---|
| 변수 전달 | mssparkutils.notebook.run("child", 60, {"param":"value"}) | 다른 노트북 실행 및 파라미터 전달 |
| Notebook 경로 | mssparkutils.env.getNotebookPath() | 현재 Notebook의 경로 확인 |
| Spark Job 취소 | mssparkutils.runtime.cancelJob(job_id) | 장시간 실행되는 Job 강제 중단 |
| 파일 복사 | mssparkutils.fs.cp(source, target, recurse=True) | 디렉토리 단위 복사 지원 |
| 파일 삭제 | mssparkutils.fs.rm(path, recurse=True) | 재귀적 삭제 |
이러한 유틸리티들은 Synapse Notebook을 자동화된 ETL 또는 데이터 준비 단계에서 효율적으로 사용할 수 있습니다.
아래는 mssparkutils를 활용한 간단한 ETL 흐름 예시입니다.
SOURCE_PATH = "abfss://raw@datalake.dfs.core.windows.net/tmp_sales/"
TARGET_PATH = "abfss://silver@datalake.dfs.core.windows.net/stg_sales/"
files = mssparkutils.fs.ls(SOURCE_PATH)
for f in files:
df = spark.read.csv(f.path, header=True)
df_clean = df.dropna()
df_clean.write.mode("overwrite").parquet(TARGET_PATH + f.name.replace(".csv", ".parquet"))
이 스크립트는:
raw 영역의 CSV 파일을 읽고silver 영역에 Parquet 포맷으로 저장합니다.Azure Synapse에서의 데이터 파이프라인 개발은 단순한 Spark 코드를 넘어,
보안, 인증, 파일 시스템 자동화까지 모두 고려해야 합니다.
mssparkutils는 이러한 과정을 통합적으로 처리할 수 있는 강력한 도구이며,
다음과 같은 장점이 있습니다:
맥북에서 PySpark 로컬 실행환경 설치를 해보기 위한 방법을 정리하였다. 데이터 엔지니어링을 공부하면 Spark는 꼭 다뤄야…