-
Raw SQL & SQLAlchemy 예제 (python/응용, 2022. 7. 4. 22:31)
Raw SQL
pip install pymysql
Single Connection
# Single-connection example: open one PyMySQL connection, run a
# parameterized SELECT, print every row, and always close the connection.
import pymysql

conn = pymysql.connect(
    host='127.0.0.1',
    port=3306,
    user='admin',
    password='admin',  # `passwd` is a deprecated alias of `password`
    database='test',   # `db` is a deprecated alias of `database`
    charset='utf8mb4',
    cursorclass=pymysql.cursors.DictCursor,  # rows come back as dicts
    autocommit=False,
)
try:
    # if the connection was lost, then it reconnects
    conn.ping(reconnect=True)

    with conn.cursor() as cursor:
        # The driver binds the %s placeholder; never format values into SQL.
        cursor.execute("""select name, age from person where age < %s""", (10,))
        rows = cursor.fetchall()

    for row in rows:
        print(row['name'], row['age'])
finally:
    # Close even when the query raises, so the connection is not leaked.
    conn.close()
Pool Connection
DBUtils를 이용해 connection pool을 만들어 사용해 보자. mysql-connector에도 pool 기능이 있지만, connection이 끊겼을 때 자동으로 재연결해 주는 기능이 보이지 않아 DBUtils가 더 나아 보인다. DBUtils에서는 ping=1 옵션(pool에서 connection을 꺼낼 때마다 상태를 확인)으로 끊긴 connection에 대응할 수 있다. 아래 예제의 get_cursor()처럼 유틸리티 함수를 만들어 활용하는 것도 좋다.
pip install pymysql
pip install dbutils
# Pooled-connection example: build a DBUtils connection pool on top of
# PyMySQL and expose a small helper that hands out a cursor from the pool.
import contextlib
from typing import Iterator

import pymysql
from dbutils.pooled_db import PooledDB
# from dbutils.persistent_db import PersistentDB

# DB connection settings (forwarded to pymysql.connect by the pool)
DB_CONFIG = {
    "host": "127.0.0.1",
    "port": 3306,
    "database": "test",
    "user": "admin",
    "password": "admin123",
    "charset": 'utf8mb4'
}

# pool: PersistentDB = PersistentDB(
pool: PooledDB = PooledDB(
    creator=pymysql,
    **DB_CONFIG,
    maxconnections=5,  # upper limit on connections handed out by the pool
    autocommit=True,
    cursorclass=pymysql.cursors.DictCursor,
    blocking=True,  # wait for a free connection instead of raising at the limit
    ping=1  # default 1: check the connection whenever it is fetched from the pool
)


@contextlib.contextmanager
def get_cursor(pool) -> Iterator[pymysql.cursors.Cursor]:
    """Yield a cursor on a connection borrowed from ``pool``.

    The cursor is closed and the connection is handed back to the pool when
    the ``with`` block exits, whether it exits normally or by exception.

    Args:
        pool: A pool object exposing ``connection()`` (e.g. ``PooledDB``).

    Yields:
        A cursor of the class configured on the pool.
    """
    with pool.connection() as conn:
        with conn.cursor() as cursor:
            yield cursor


with get_cursor(pool) as cursor:
    cursor.execute("select name, age from person where age < %s", (10,))
    rows = cursor.fetchall()
    print(rows)
SQLAlchemy
SELECT
# SQLAlchemy Core SELECT example: run a textual query with a bound
# parameter and read the columns both by attribute and by name.
from typing import Any, Sequence

from sqlalchemy import Engine, Row, TextClause, create_engine, text

engine: Engine = create_engine(
    url="mysql+pymysql://admin:admin@localhost:3306/test?charset=utf8mb4",
    pool_size=3,
)
stmt: TextClause = text("select name, age from person where age > :age")

with engine.connect() as conn:
    # fetchall() materializes the rows, so `result` is a plain sequence.
    result: Sequence[Row[Any]] = conn.execute(stmt, {'age': 0}).fetchall()
    for row in result:
        # Attribute access works like a named tuple.
        print(row.name, row.age)
        # SQLAlchemy 2.x rows are tuple-like and no longer accept string
        # keys; dict-style access goes through the `_mapping` view.
        print(row._mapping['name'], row._mapping['age'])
INSERT
# SQLAlchemy Core INSERT example: insert several rows in one executemany
# call inside a transaction and print the affected row count.
from sqlalchemy import Engine, TextClause, create_engine, text

engine: Engine = create_engine(
    url="mysql+pymysql://admin:admin@localhost:3306/test?charset=utf8mb4",
    pool_size=3,
)
# Name the target columns so the statement does not depend on the table's
# physical column order.
stmt: TextClause = text("insert into person (name, age) values (:name, :age)")

# engine.begin() commits when the block succeeds and rolls back if it
# raises, so a failed batch is never left half-applied.
with engine.begin() as conn:
    # A list of parameter dicts executes the statement once per dict.
    cnt = conn.execute(
        stmt,
        [{'name': 'C', 'age': 3}, {'name': 'D', 'age': 4}],
    ).rowcount

print(cnt)
# SQLAlchemy ORM example: declare a mapped class, create its table, then
# walk through INSERT / UPDATE / DELETE, printing the table after each step.
import sqlalchemy.ext.declarative
import sqlalchemy.orm
from sqlalchemy.orm import Query

# engine = sqlalchemy.create_engine('sqlite:///:memory:', echo=True)
# NOTE(review): the URL embeds credentials; load them from configuration
# outside of a throwaway example.
engine = sqlalchemy.create_engine("mysql+pymysql://root:Password@my.db.host:3306/ndiff?charset=utf8mb4")

# declarative_base() lives in sqlalchemy.orm; the sqlalchemy.ext.declarative
# location is deprecated.
Base = sqlalchemy.orm.declarative_base()


class Person(Base):
    """ORM mapping for the ``persons`` table."""

    __tablename__ = 'persons'

    # Surrogate primary key assigned by the database.
    id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True, autoincrement=True)
    # Display name, limited to 14 characters.
    name = sqlalchemy.Column(sqlalchemy.String(14))


def _print_persons(session) -> None:
    """Print the id and name of every row currently in ``persons``."""
    person_query: Query = session.query(Person)
    for person in person_query.all():
        print(person.id, person.name)


# Create the table if it does not exist yet.
Base.metadata.create_all(engine)
Session = sqlalchemy.orm.sessionmaker(bind=engine)

# The context manager closes the session (and releases its connection)
# even if one of the steps below raises.
with Session() as session:
    # INSERT
    person = Person(name='Mike')
    session.add(person)
    session.commit()
    _print_persons(session)

    # UPDATE
    p4 = session.query(Person).filter_by(name='Mike').first()
    if p4 is not None:  # first() returns None when nothing matches
        # p4 is already tracked by the session, so no session.add() is needed.
        p4.name = 'Michel'
        session.commit()
    _print_persons(session)

    # DELETE
    p5 = session.query(Person).filter_by(name='Michel').first()
    if p5 is not None:
        session.delete(p5)
        session.commit()
    _print_persons(session)
'python > 응용' 카테고리의 다른 글
AsyncIO (0) 2022.10.29 ProcessPoolExecutor (0) 2022.10.29 requests (0) 2022.05.31 json 읽기 / 쓰기 (0) 2022.05.31 pytest - fixture (0) 2022.05.23