spark

각 파티션에서 TaskContext를 사용해 현재 partitionId가져오기

wefree 2025. 7. 11. 23:00
import org.apache.spark.sql.SparkSession

object PartitionIdTest {
	def main(args: Array[String]): Unit = {
		// TaskId는 Spark에서 작업을 실행할 때 각 작업에 대한 고유한 식별자를 제공합니다.
		// 이 식별자는 Spark의 내부 로깅 및 모니터링에 사용됩니다.
		// SparkContext의 getTaskId() 메서드를 사용하여 현재 작업의 TaskId를 가져올 수 있습니다.

		implicit val spark: SparkSession = SparkSession.builder().appName("TaskIdTest").master("local[*]").getOrCreate()
		import spark.implicits._

		(1 to 10).toDF.repartition(10).foreachPartition { patition =>
			// 각 파티션에서 TaskContext를 사용하여 현재 작업의 TaskId를 가져옵니다.

			val partitionId = org.apache.spark.TaskContext.get().partitionId()
			println(s"Current Partition ID in partition: $partitionId")
		}

		spark.stop()
	}
}

 

 

실행 결과

Current Partition ID in partition: 4
Current Partition ID in partition: 6
Current Partition ID in partition: 8
Current Partition ID in partition: 9
Current Partition ID in partition: 0
Current Partition ID in partition: 1
Current Partition ID in partition: 3
Current Partition ID in partition: 2
Current Partition ID in partition: 7
Current Partition ID in partition: 5

 

 

라이브러리 형태로 만들기

import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession

import java.time.format.DateTimeFormatter
import java.time.{LocalDate, LocalDateTime, ZoneId}

object SparkPartition {
	implicit class SparkPartition(spark: SparkSession) {
		def foreachExecutor(size: Int)(f: Int => Unit): Unit = {
			import spark.implicits._

			(0 until size).toDF.repartition(size).foreachPartition { partition =>
				val id: Int = TaskContext.get().partitionId()

				f(id)
				partition.foreach { _ => Unit }
			}
		}
	}

	def main(args: Array[String]): Unit = {
		implicit val spark: SparkSession = SparkSession.builder()
			.appName("SparkPartition")
			.master("local[*]")
			.getOrCreate()

		spark.foreachExecutor(2) { id =>
			println(s"Partition ID: $id")

			while(true) {
				// Simulate some processing
				val now: String = LocalDateTime.now(ZoneId.of("Asia/Seoul")).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
				println(s"$now Processing in partition: $id")
				Thread.sleep(60000L) // Sleep to simulate work
			}
		}

		spark.stop()
	}
}