ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 각 파티션에서 TaskContext를 사용해 현재 partitionId가져오기
    spark 2025. 7. 11. 23:00
    import org.apache.spark.sql.SparkSession
    
    object PartitionIdTest {
    	def main(args: Array[String]): Unit = {
    		// TaskId는 Spark에서 작업을 실행할 때 각 작업에 대한 고유한 식별자를 제공합니다.
    		// 이 식별자는 Spark의 내부 로깅 및 모니터링에 사용됩니다.
    		// SparkContext의 getTaskId() 메서드를 사용하여 현재 작업의 TaskId를 가져올 수 있습니다.
    
    		implicit val spark: SparkSession = SparkSession.builder().appName("TaskIdTest").master("local[*]").getOrCreate()
    		import spark.implicits._
    
    		(1 to 10).toDF.repartition(10).foreachPartition { patition =>
    			// 각 파티션에서 TaskContext를 사용하여 현재 작업의 TaskId를 가져옵니다.
    
    			val partitionId = org.apache.spark.TaskContext.get().partitionId()
    			println(s"Current Partition ID in partition: $partitionId")
    		}
    
    		spark.stop()
    	}
    }

     

     

    실행 결과

    Current Partition ID in partition: 4
    Current Partition ID in partition: 6
    Current Partition ID in partition: 8
    Current Partition ID in partition: 9
    Current Partition ID in partition: 0
    Current Partition ID in partition: 1
    Current Partition ID in partition: 3
    Current Partition ID in partition: 2
    Current Partition ID in partition: 7
    Current Partition ID in partition: 5

     

     

    라이브러리 형태로 만들기

    import org.apache.spark.TaskContext
    import org.apache.spark.sql.SparkSession
    
    import java.time.format.DateTimeFormatter
    import java.time.{LocalDate, LocalDateTime, ZoneId}
    
    object SparkPartition {
    	implicit class SparkPartition(spark: SparkSession) {
    		def foreachExecutor(size: Int)(f: Int => Unit): Unit = {
    			import spark.implicits._
    
    			(0 until size).toDF.repartition(size).foreachPartition { partition =>
    				val id: Int = TaskContext.get().partitionId()
    
    				f(id)
    				partition.foreach { _ => Unit }
    			}
    		}
    	}
    
    	def main(args: Array[String]): Unit = {
    		implicit val spark: SparkSession = SparkSession.builder()
    			.appName("SparkPartition")
    			.master("local[*]")
    			.getOrCreate()
    
    		spark.foreachExecutor(2) { id =>
    			println(s"Partition ID: $id")
    
    			while(true) {
    				// Simulate some processing
    				val now: String = LocalDateTime.now(ZoneId.of("Asia/Seoul")).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
    				println(s"$now Processing in partition: $id")
    				Thread.sleep(60000L) // Sleep to simulate work
    			}
    		}
    
    		spark.stop()
    	}
    }

    댓글

Designed by Tistory.