spark

spark 에서 schema 를 적용해 데이터 읽기

wefree 2021. 7. 18. 19:17

문제

hdfs://localhost:9000/csv.txt 에 아래와 같이 csv 형식의 데이터가 있을 때

name,age,birthDay
A,1,2020-01-01
B,2,2019-02-02
C,3,2018-03-03

name 은 String, age 는 Integer, birthDay 는 Date 형식으로 읽으려면?

 

방법1

import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SparkSession}

object MySpark extends Serializable {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .getOrCreate()

    val schemaDef = StructType(
      List(
        StructField("name", StringType),
        StructField("age", IntegerType),
        StructField("birthDay", DateType)
      )
    )

    val df: DataFrame = spark.read
      .option("header", "true")
      .option("mode", "FAILFAST")
      .schema(schemaDef)
      .option("dateFormat", "yyyy-MM-dd")
      .csv("hdfs://localhost:9000/csv.txt")

    df.printSchema()

    spark.stop
  }
}


// 출력 결과
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- birthDay: date (nullable = true)

 

방법2

import org.apache.spark.sql.{DataFrame, Encoders, SparkSession}
import java.sql.Date

final case class Person(name: String, age: Int, birthDay: Date)

object MySpark extends Serializable {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .getOrCreate()

    val schemaDef: StructType = Encoders.product[Person].schema

    val df: DataFrame = spark.read
      .option("header", "true")
      .option("mode", "FAILFAST")
      .schema(schemaDef)
      .option("dateFormat", "yyyy-MM-dd")
      .csv("hdfs://localhost:9000/csv.txt")

    df.printSchema()

    spark.stop
  }
}

아래와 같이 DataFrame 인 df 를 DataSet[Person] 으로 변경할 수 있다.

val df: DataFrame = spark.read...

import spark.implicits._
val ds: Dataset[Person] = df.as[Person]

...

 

참고로 spark.read 에서 csv 관련 option 은 spark doc 에서 확인할 수 있다.