분류 전체보기
-
DuckDB 에서 PostgreSQL 연결하기pandas & duckdb 2025. 10. 31. 22:20
import duckdb# (1) DuckDB 연결 — 메모리 또는 파일 DBcon = duckdb.connect() # 메모리 기반# 또는# con = duckdb.connect("mydb.duckdb") # 파일 기반# (2) PostgreSQL extension 설치 / 로드 (필요한 경우)con.execute("INSTALL postgres;")con.execute("LOAD postgres;")# (3) PostgreSQL DB를 ATTACH# 예: 로컬의 PostgreSQL 인스턴스, 기본 스키마 publiccon.execute("""ATTACH 'dbname=postgres user=postgres host=127.0.0.1 port=5432 password=mypassword' AS ..
-
DuckDB 에서 pyspark api 사용하기pandas & duckdb 2025. 10. 31. 22:18
https://duckdb.org/docs/stable/clients/python/spark_apifrom duckdb.experimental.spark.sql import SparkSession as sessionfrom duckdb.experimental.spark.sql.functions import lit, colimport pandas as pdspark = session.builder.getOrCreate()pandas_df = pd.DataFrame({ 'age': [10, 20, 30]})df = spark.createDataFrame(pandas_df)res = df.select(col('age'))res.toPandas().head()
-
각 파티션에서 TaskContext를 사용해 현재 partitionId가져오기spark 2025. 7. 11. 23:00
import org.apache.spark.sql.SparkSessionobject PartitionIdTest { def main(args: Array[String]): Unit = { // TaskId는 Spark에서 작업을 실행할 때 각 작업에 대한 고유한 식별자를 제공합니다. // 이 식별자는 Spark의 내부 로깅 및 모니터링에 사용됩니다. // SparkContext의 getTaskId() 메서드를 사용하여 현재 작업의 TaskId를 가져올 수 있습니다. implicit val spark: SparkSession = SparkSession.builder().appName("TaskIdTest").master("local[*]").getOrCreate() import spark.impl..
-
주식 각 종목의 가장 최근 결과 가져오기postgresql 2025. 6. 21. 16:38
SELECT DISTINCT ON (i.code) i.code, i.date, i.last_actual_price, i.final_predicted_price, i.price_change, i.price_change_percent, i.test_r2, i.volatility, i.prediction_days, i.last_actual_date, i.train_r2, i.test_mse, i.test_rmse, i.test_mae, c.name as company_name, c.rank, i.model_inf..
-
Dataset 으로 정의된 case class 의 fields 가져오기spark 2025. 6. 10. 21:06
case class PwlLog( log_type: String, device_type: String, event_type: String)val fields: Array[String] = classOf[PwlLog].getDeclaredFields.map(_.getName)spark.read .... .withColumn("log_type", ...) .select(fields.map(col): _*) .as[PwlLog]
-
개발 환경 세팅 & Job 제출flink 2025. 5. 5. 18:19
docker-compose.yml 로 flink cluster 시작version: '2'services: jobmanager: image: flink:1.18.1-scala_2.12-java17 ports: - "8081:8081" command: jobmanager environment: - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager volumes: - /mnt/c:/opt/flink/c taskmanager: image: flink:1.18.1-scala_2.12-java17 depends_on: - jobmanager com..
-
TypeTag, ClassTagscala/basic 2025. 4. 22. 21:54
TypeTagimport scala.reflect.runtime.universe._def printType[T: TypeTag](value: T): Unit = { println(s"Type: ${typeOf[T]}")}printType(42) // 출력: Type: IntprintType("Hello") // 출력: Type: String ClassTagimport scala.reflect.ClassTagdef createArray[T: ClassTag](size: Int): Array[T] = { new Array[T](size)}val intArray = createArray[Int](5) // Int 배열 생성val strArray = createArray[Strin..
-
Semaphore 를 사용해 최대 maxConcurrent 개의 Future만 실행되도록 제한scala/basic 2025. 3. 13. 10:28
내가 작성package com.naver.search.webimport java.util.concurrent.Semaphoreimport scala.concurrent.ExecutionContext.Implicits.globalimport scala.concurrent.{Await, Future}object SemaphoreFutureTest { def semFuture[T](f: => T, n: Int): Future[T] = { val sem = new Semaphore(n) Future { try { sem.acquire() f } finally { sem.release() } } } def f(x: Int): Int = { Thread.sleep(1000) pr..