spark
spark 에서 schema 를 적용해 데이터 읽기
wefree
2021. 7. 18. 19:17
문제
hdfs://localhost:9000/csv.txt 에 아래와 같이 csv 형식의 데이터가 있을 때
name,age,birthDay
A,1,2020-01-01
B,2,2019-02-02
C,3,2018-03-03
name 은 String, age 는 Integer, birthDay 는 Date 형식으로 읽으려면?
방법1
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SparkSession}
object MySpark extends Serializable {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local[*]")
.getOrCreate()
val schemaDef = StructType(
List(
StructField("name", StringType),
StructField("age", IntegerType),
StructField("birthDay", DateType)
)
)
val df: DataFrame = spark.read
.option("header", "true")
.option("mode", "FAILFAST")
.schema(schemaDef)
.option("dateFormat", "yyyy-MM-dd")
.csv("hdfs://localhost:9000/csv.txt")
df.printSchema()
spark.stop
}
}
// 출력 결과
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- birthDay: date (nullable = true)
방법2
import org.apache.spark.sql.{DataFrame, Encoders, SparkSession}
import java.sql.Date
final case class Person(name: String, age: Int, birthDay: Date)
object MySpark extends Serializable {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.master("local[*]")
.getOrCreate()
val schemaDef: StructType = Encoders.product[Person].schema
val df: DataFrame = spark.read
.option("header", "true")
.option("mode", "FAILFAST")
.schema(schemaDef)
.option("dateFormat", "yyyy-MM-dd")
.csv("hdfs://localhost:9000/csv.txt")
df.printSchema()
spark.stop
}
}
아래와 같이 DataFrame 인 df 를 DataSet[Person] 으로 변경할 수 있다.
val df: DataFrame = spark.read...
import spark.implicits._
val ds: Dataset[Person] = df.as[Person]
...
참고로 spark.read 에서 csv 관련 option 은 spark doc 에서 확인할 수 있다.