본문 바로가기

Lang

[Python]두 DB 간 테이블 동기화 코딩

이기종 두 DB 간 테이블 동기화하는 프로그램을 간단하게 작성해보았습니다. 

스케쥴러를 이용해서 정기적으로 postgresql 의 테이블 데이터를 읽어서 MariaDB 에 넣어줍니다. 
작업 처리 결과를 소스 테이블의 sync_flag 컬럼에 일괄 업데이트합니다 (성공하면 'Y', 실패하면 'E')

pip install mysqlclient
pip install psycopg2
pip install sqlalchemy
pip install pandas
pip install apscheduler

중요한 몇 개 소스만 간추려 옮겨봅니다.

DB 연결 구현.

class Connection:
    def __init__(self):
        self.host = Config.DB['server']
        self.user = Config.DB['user']
        ...
        self.mdb = self.__md_connect()
        self.pdb = self.__pg_connect()

    def __pg_connect(self):
        url = 'postgresql://{}:{}@{}:{}/{}'.format(self.user, self.passwd, self.host, self.p_port, self.db_p_name)
        engine = create_engine(url, client_encoding='utf8')
        return engine

    def __md_connect(self):
        url = 'mysql+mysqldb://{}:{}@{}:{}/{}'.format(self.user, self.passwd, self.host, self.m_port, self.db_m_name)
        engine = create_engine(url)
        return engine

테이블 동기화 처리.

class TableSyncer:
    def __init__(self, db_conn, tbl_name, tbl_keys):
        self._db_conn = db_conn
        self._tbl_name = tbl_name
        self._tbl_keys = tbl_keys
        self._tbl_vals = ''

    @property
    def db_conn(self):
        return self._db_conn

    @property
    def tbl_name(self):
        return self._tbl_name

    @property
    def tbl_keys(self):
        return self._tbl_keys

    @property
    def tbl_vals(self):
        return self._tbl_vals

    @tbl_vals.setter
    def tbl_vals(self, in_tbl_vals):
        self._tbl_vals = in_tbl_vals

    @contextmanager
    def get_session(self, db_engine):
        session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=db_engine))

        try:
            yield session
        except Exception as e:
            session.rollback()
            self.upd_err(session)
            raise
        else:
            session.commit()
        finally:
            session.close()

    def upd_err(self, session):
        s = session()
        su_qry = Query.SQL['upd_sync_flag'].format(self.tbl_name, 'E', ','.join(self.tbl_keys))
        s.execute(su_qry, {'p1': tuple(self.tbl_vals)})
        session.commit()

    def sync_tbl(self):
        start_time = time.time()
        pg_engine = self.db_conn.pdb

        with self.get_session(pg_engine) as session:
            ss_qry = Query.SQL['ss_' + self.tbl_name]
            df = pd.read_sql_query(ss_qry, pg_engine)

            start_time1 = time.time()
            if len(df) > 0:
                md_engine = self.db_conn.mdb
                rows = list(df[self.tbl_keys].itertuples(index=False))
                self.tbl_vals = rows

                df.to_sql(self.tbl_name, con=md_engine, if_exists='append', chunksize=2000, index=False, method='multi')

                su_qry = Query.SQL['upd_sync_flag'].format(self.tbl_name, 'Y', ','.join(self.tbl_keys))
                s = session()
                s.execute(su_qry, {'p1': tuple(rows)})

SQL문 꾸러미.

class Query(object):
    SQL = {
        'upd_sync_flag': """
            UPDATE {} SET sync_flag = '{}' 
            WHERE ({}) in :p1
        """,

        'ss_conditions': """
            SELECT time, location, temperature, humidity
            FROM conditions
            WHERE sync_flag IS NULL
            """,
     ...
    }

스케쥴러 실행.

def qry_condition(x):
    return {
        'conditions': ['conditions', ['time', 'location']],
        'mach_mvt': ['mach_mvt_hst', ['evnt_dt', 'mach_id']],
        ...,
    }[x]


def exec_sync(db_conn, key):
    params = qry_condition(key)
    db_util = TableSyncer(db_conn, params[0], params[1])
    db_util.sync_tbl()


if __name__ == '__main__':
    db_conn = Connection()
    scheduler = BackgroundScheduler()

    scheduler.add_job(exec_sync, 'interval', seconds=10, args=[db_conn, 'conditions'])
    scheduler.add_job(exec_sync, 'interval', seconds=10, args=[db_conn, 'mach_mvt'])
    ...
    scheduler.start()

    try:
        while True:
            time.sleep(2)
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()

체계적으로 파이썬 학습한게 아니라 모듈 구성, 클래스 사용 등이 제대로인지 사실 잘 모르겠습니다.

그거말고도 개선할 점이 많습니다.

  • sqlalchemy의 ORM 특성 활용 못하고 그냥 Query 문 직접 이용.
  • DB 연결 처리, 트랜젝션 처리.
  • config 구성.
  • 결정적으로 벌크 인써트 중 한 건이라도 데이터 오류있으면 전체 롤백.
    • exception 시 읽어온 dataframe 을 한 건씩 insert 처리하는 식 등의 retry 로직 추가 필요.

실전에 활용해보면서 언어 익히는 중이라 개선 작업하면서 좀 더 파이써닉하게 나아질거라 혼자 생각해봅니다.