Spark
-
broadcast join 과 bucket joinspark 2021. 7. 26. 00:11
broadcast join join 할 데이터가 작은게 있을 경우 spark 에서 자동으로 broadcast join 을 적용해 준다. broadcast join 을 적용하라고 broadcast() function 으로 Hint 를 줄 수도 있다. import org.apache.spark.sql.functions.broadcast import org.apache.spark.sql.{Column, DataFrame, SparkSession} object MySpark extends Serializable { def main(args: Array[String]): Unit = { val spark: SparkSession = ??? val smallDf: DataFrame = ??? val hugeDf: ..
-
spark 에서 schema 를 적용해 데이터 읽기spark 2021. 7. 18. 19:17
문제 hdfs://localhost:9000/csv.txt 에 아래와 같이 csv 형식의 데이터가 있을 때 name,age,birthDay A,1,2020-01-01 B,2,2019-02-02 C,3,2018-03-03 name 은 String, age 는 Integer, birthDay 는 Date 형식으로 읽으려면? 방법1 import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, SparkSession} object MySpark extends Serializable { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .master("l..
-
spark 에서 groupByKey 로 특정 column 기준 unique row 만 남기기spark 2021. 7. 16. 21:45
문제아래와 같이 Person 데이터가 있을 때nameagelocationa1Ab2Ba3C name 기준으로 unique 한 row 만 남긴다. name 이 a 인 레코드가 2개 있는데, 중복이 제거되어 둘 중 임의의 하나만 남으면 된다. 단순히 groupBy(name) 을 사용하면 age, location 에 대해 aggregation 을 해야되서 복잡해 진다. Codeimport org.apache.spark.sql.{Dataset, SparkSession}final case class Person(name: String, age: Int, location: String)object MySpark extends Serializable { def main(args: Array[String]): Unit..
-
spark 을 local, cluster mode 에서 실행하기 위한 JVM option, log4j.properties 설정spark 2021. 7. 10. 20:15
log4j.proerties log4j.appender.file.File=${spark.yarn.app.container.log.dir}/${logfile.name}.log 사용에 주목할 것 !! # Set everything to be logged to the console log4j.rootCategory=WARN, console # define console appender log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.out log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.lay..