import org.apache.spark.sql.SparkSession
object PartitionIdTest {
def main(args: Array[String]): Unit = {
// TaskId는 Spark에서 작업을 실행할 때 각 작업에 대한 고유한 식별자를 제공합니다.
// 이 식별자는 Spark의 내부 로깅 및 모니터링에 사용됩니다.
// SparkContext의 getTaskId() 메서드를 사용하여 현재 작업의 TaskId를 가져올 수 있습니다.
implicit val spark: SparkSession = SparkSession.builder().appName("TaskIdTest").master("local[*]").getOrCreate()
import spark.implicits._
(1 to 10).toDF.repartition(10).foreachPartition { patition =>
// 각 파티션에서 TaskContext를 사용하여 현재 작업의 TaskId를 가져옵니다.
val partitionId = org.apache.spark.TaskContext.get().partitionId()
println(s"Current Partition ID in partition: $partitionId")
}
spark.stop()
}
}
실행 결과
Current Partition ID in partition: 4
Current Partition ID in partition: 6
Current Partition ID in partition: 8
Current Partition ID in partition: 9
Current Partition ID in partition: 0
Current Partition ID in partition: 1
Current Partition ID in partition: 3
Current Partition ID in partition: 2
Current Partition ID in partition: 7
Current Partition ID in partition: 5
라이브러리 형태로 만들기
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
import java.time.format.DateTimeFormatter
import java.time.{LocalDate, LocalDateTime, ZoneId}
object SparkPartition {
implicit class SparkPartition(spark: SparkSession) {
def foreachExecutor(size: Int)(f: Int => Unit): Unit = {
import spark.implicits._
(0 until size).toDF.repartition(size).foreachPartition { partition =>
val id: Int = TaskContext.get().partitionId()
f(id)
partition.foreach { _ => Unit }
}
}
}
def main(args: Array[String]): Unit = {
implicit val spark: SparkSession = SparkSession.builder()
.appName("SparkPartition")
.master("local[*]")
.getOrCreate()
spark.foreachExecutor(2) { id =>
println(s"Partition ID: $id")
while(true) {
// Simulate some processing
val now: String = LocalDateTime.now(ZoneId.of("Asia/Seoul")).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
println(s"$now Processing in partition: $id")
Thread.sleep(60000L) // Sleep to simulate work
}
}
spark.stop()
}
}