ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Raw SQL & SQLAlchemy 예제
    python/응용 2022. 7. 4. 22:31

    Raw SQL

    pip install pymysql

     

    Single Connection

    import pymysql
    
    # Open one dedicated connection; DictCursor makes each row a dict.
    conn = pymysql.connect(
        host='127.0.0.1',
        port=3306,
        user='admin',
        password='admin',  # canonical name; 'passwd' is a MySQLdb-compat alias
        database='test',   # canonical name; 'db' is a MySQLdb-compat alias
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor,
        autocommit=False
    )

    try:
        # if the connection was lost, then it reconnects
        conn.ping(reconnect=True)
        with conn.cursor() as cursor:
            cursor.execute("""select name, age from person where age < %s""", (10,))
            rows = cursor.fetchall()
    finally:
        # Close the connection even when the ping or the query raises.
        conn.close()

    # rows were fully fetched above, so they stay usable after close().
    for row in rows:
        print(row['name'], row['age'])

     

    Pool Connection

    DBUtils 를 이용해 pool 을 만들어 사용해 보자. mysql-connector 도 있지만 connection 이 끊겼을 때 자동으로 재연결해 주는 기능이 보이지 않아서 DBUtils 가 더 좋아 보인다. DBUtils 에서는 ping 옵션으로 connection 상태를 언제 확인할지 정할 수 있으며, 기본값인 ping=1 은 pool 에서 connection 을 꺼낼 때마다 확인한다. 아래 예제의 get_cursor() 처럼 util function 을 만들어 활용하는 것도 좋아 보인다.

    pip install pymysql
    pip install dbutils

     

    import contextlib
    from typing import Iterator

    import pymysql
    from dbutils.pooled_db import PooledDB
    # from dbutils.persistent_db import PersistentDB
    
    # DB connection settings, forwarded to the driver through the pool.
    DB_CONFIG = dict(
        host="127.0.0.1",
        port=3306,
        database="test",
        user="admin",
        password="admin123",
        charset='utf8mb4',
    )

    # pool: PersistentDB = PersistentDB(
    pool: PooledDB = PooledDB(
        creator=pymysql,
        maxconnections=5,  # cap on the number of connections
        blocking=True,  # determines behavior when exceeding the maximum
        ping=1,  # default 1
        autocommit=True,
        cursorclass=pymysql.cursors.DictCursor,
        **DB_CONFIG,
    )
    
    
    @contextlib.contextmanager
    def get_cursor(pool) -> "Iterator[pymysql.cursors.Cursor]":
        """Yield a cursor on a connection obtained from ``pool``.

        The connection and the cursor are both entered as context managers,
        so when the caller's ``with`` block ends (normally or through an
        exception) the cursor is exited first and the connection second.

        Args:
            pool: Object whose ``connection()`` returns a context manager
                producing a connection that offers ``cursor()``.

        Yields:
            The cursor created on that connection.
        """
        # The function is a generator wrapped by @contextmanager, so its own
        # return type is an iterator of cursors, not a cursor. The annotation
        # is a string so it is not evaluated when the function is defined.
        with pool.connection() as conn, conn.cursor() as cursor:
            yield cursor
    
    
    # Parameterised query: the driver substitutes %s with the bound value.
    query = "select name, age from person where age < %s"
    params = (10,)

    with get_cursor(pool) as cursor:
        cursor.execute(query, params)
        rows = cursor.fetchall()

    print(rows)

     


    SQLAlchemy

    SELECT

    from typing import Sequence
    
    from sqlalchemy import create_engine, text, Engine, TextClause, Row
    from sqlalchemy.engine.result import _TP
    
    engine: Engine = create_engine(url="mysql+pymysql://admin:admin@localhost:3306/test?charset=utf8mb4", pool_size=3)
    stmt: TextClause = text("select name, age from person where age > :age")

    # fetchall() materialises the rows, so they can still be read after the
    # with-block has released the connection.
    with engine.connect() as conn:
        result: Sequence[Row] = conn.execute(stmt, {'age': 0}).fetchall()

    for row in result:
        # Attribute access by column name.
        print(row.name, row.age)
        # A 2.0 Row is tuple-like, so row['name'] raises; string keys go
        # through the ._mapping view instead.
        print(row._mapping['name'], row._mapping['age'])

     

    INSERT

    from sqlalchemy import create_engine, text, Engine, TextClause
    
    engine: Engine = create_engine(
        url="mysql+pymysql://admin:admin@localhost:3306/test?charset=utf8mb4",
        pool_size=3,
    )
    stmt: TextClause = text("insert into person values (:name, :age)")

    # One parameter dict per row; passing a list runs the statement per entry.
    new_people = [
        {'name': 'C', 'age': 3},
        {'name': 'D', 'age': 4},
    ]

    with engine.connect() as conn:
        insert_result = conn.execute(stmt, new_people)
        cnt = insert_result.rowcount
        # Nothing is persisted until the transaction is committed.
        conn.commit()

    print(cnt)

     

    import sqlalchemy.ext.declarative
    from sqlalchemy.orm import Query
    import sqlalchemy.orm
    
    # Alternative for local experiments:
    # engine = sqlalchemy.create_engine('sqlite:///:memory:', echo=True)
    engine = sqlalchemy.create_engine("mysql+pymysql://root:Password@my.db.host:3306/ndiff?charset=utf8mb4")

    # declarative_base has lived in sqlalchemy.orm since SQLAlchemy 1.4; the
    # sqlalchemy.ext.declarative location is deprecated.
    Base = sqlalchemy.orm.declarative_base()
    
    
    class Person(Base):
        """ORM model mapped to the ``persons`` table."""

        __tablename__ = 'persons'

        # Integer primary key with autoincrement enabled.
        id = sqlalchemy.Column(
            sqlalchemy.Integer,
            autoincrement=True,
            primary_key=True,
        )
        # Name stored as a string column of length 14.
        name = sqlalchemy.Column(sqlalchemy.String(length=14))
    
    
    Base.metadata.create_all(engine)


    def _print_persons(session):
        """Print the id and name of every Person row returned by ``session``."""
        for row in session.query(Person).all():
            print(row.id, row.name)


    Session = sqlalchemy.orm.sessionmaker(bind=engine)
    session = Session()

    try:
        # INSERT
        person = Person(name='Mike')
        session.add(person)
        session.commit()

        _print_persons(session)

        # UPDATE
        p4 = session.query(Person).filter_by(name='Mike').first()
        # first() returns None when no row matches; skip the update then.
        if p4 is not None:
            # p4 was loaded through this session, so no add() is needed
            # before committing the change.
            p4.name = 'Michel'
            session.commit()

        _print_persons(session)

        # DELETE
        p5 = session.query(Person).filter_by(name='Michel').first()
        # Guard again: delete() must not be handed None.
        if p5 is not None:
            session.delete(p5)
            session.commit()

        _print_persons(session)
    finally:
        # Close the session even when one of the steps raises.
        session.close()

    'python > 응용' 카테고리의 다른 글

    AsyncIO  (0) 2022.10.29
    ProcessPoolExecutor  (0) 2022.10.29
    requests  (0) 2022.05.31
    json 읽기 / 쓰기  (0) 2022.05.31
    pytest - fixture  (0) 2022.05.23

    댓글

Designed by Tistory.