이기종 두 DB 간 테이블 동기화하는 프로그램을 간단하게 작성해보았습니다.
스케쥴러를 이용해서 정기적으로 postgresql 의 테이블 데이터를 읽어서 MariaDB 에 넣어줍니다.
작업 처리 결과를 소스 테이블의 sync_flag 컬럼에 일괄 업데이트합니다 (성공하면 'Y', 실패하면 'E')
pip install mysqlclient
pip install psycopg2
pip install sqlalchemy
pip install pandas
pip install apscheduler
중요한 몇 개 소스만 간추려 옮겨봅니다.
DB 연결 구현.
class Connection:
def __init__(self):
self.host = Config.DB['server']
self.user = Config.DB['user']
...
self.mdb = self.__md_connect()
self.pdb = self.__pg_connect()
def __pg_connect(self):
url = 'postgresql://{}:{}@{}:{}/{}'.format(self.user, self.passwd, self.host, self.p_port, self.db_p_name)
engine = create_engine(url, client_encoding='utf8')
return engine
def __md_connect(self):
url = 'mysql+mysqldb://{}:{}@{}:{}/{}'.format(self.user, self.passwd, self.host, self.m_port, self.db_m_name)
engine = create_engine(url)
return engine
테이블 동기화 처리.
class TableSyncer:
def __init__(self, db_conn, tbl_name, tbl_keys):
self._db_conn = db_conn
self._tbl_name = tbl_name
self._tbl_keys = tbl_keys
self._tbl_vals = ''
@property
def db_conn(self):
return self._db_conn
@property
def tbl_name(self):
return self._tbl_name
@property
def tbl_keys(self):
return self._tbl_keys
@property
def tbl_vals(self):
return self._tbl_vals
@tbl_vals.setter
def tbl_vals(self, in_tbl_vals):
self._tbl_vals = in_tbl_vals
@contextmanager
def get_session(self, db_engine):
session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=db_engine))
try:
yield session
except Exception as e:
session.rollback()
self.upd_err(session)
raise
else:
session.commit()
finally:
session.close()
def upd_err(self, session):
s = session()
su_qry = Query.SQL['upd_sync_flag'].format(self.tbl_name, 'E', ','.join(self.tbl_keys))
s.execute(su_qry, {'p1': tuple(self.tbl_vals)})
session.commit()
def sync_tbl(self):
start_time = time.time()
pg_engine = self.db_conn.pdb
with self.get_session(pg_engine) as session:
ss_qry = Query.SQL['ss_' + self.tbl_name]
df = pd.read_sql_query(ss_qry, pg_engine)
start_time1 = time.time()
if len(df) > 0:
md_engine = self.db_conn.mdb
rows = list(df[self.tbl_keys].itertuples(index=False))
self.tbl_vals = rows
df.to_sql(self.tbl_name, con=md_engine, if_exists='append', chunksize=2000, index=False, method='multi')
su_qry = Query.SQL['upd_sync_flag'].format(self.tbl_name, 'Y', ','.join(self.tbl_keys))
s = session()
s.execute(su_qry, {'p1': tuple(rows)})
SQL문 꾸러미.
class Query(object):
SQL = {
'upd_sync_flag': """
UPDATE {} SET sync_flag = '{}'
WHERE ({}) in :p1
""",
'ss_conditions': """
SELECT time, location, temperature, humidity
FROM conditions
WHERE sync_flag IS NULL
""",
...
}
스케쥴러 실행.
def qry_condition(x):
return {
'conditions': ['conditions', ['time', 'location']],
'mach_mvt': ['mach_mvt_hst', ['evnt_dt', 'mach_id']],
...,
}[x]
def exec_sync(db_conn, key):
params = qry_condition(key)
db_util = TableSyncer(db_conn, params[0], params[1])
db_util.sync_tbl()
if __name__ == '__main__':
db_conn = Connection()
scheduler = BackgroundScheduler()
scheduler.add_job(exec_sync, 'interval', seconds=10, args=[db_conn, 'conditions'])
scheduler.add_job(exec_sync, 'interval', seconds=10, args=[db_conn, 'mach_mvt'])
...
scheduler.start()
try:
while True:
time.sleep(2)
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
체계적으로 파이썬 학습한게 아니라 모듈 구성, 클래스 사용 등이 제대로인지 사실 잘 모르겠습니다.
그거말고도 개선할 점이 많습니다.
- sqlalchemy의 ORM 특성 활용 못하고 그냥 Query 문 직접 이용.
- DB 연결 처리, 트랜젝션 처리.
- config 구성.
- 결정적으로 벌크 인써트 중 한 건이라도 데이터 오류있으면 전체 롤백.
- exception 시 읽어온 dataframe 을 한 건씩 insert 처리하는 식 등의 retry 로직 추가 필요.
실전에 활용해보면서 언어 익히는 중이라 개선 작업하면서 좀 더 파이써닉하게 나아질거라 혼자 생각해봅니다.
'Lang' 카테고리의 다른 글
Armeria, gRPC, JPA 간단한 샘플 (2) | 2020.08.19 |
---|---|
[python]cx_oracle 설정, 사용 예 (0) | 2020.07.28 |
[Java]여러 DB 환경에서 native query 쓸 때 orm.xml 문제 (0) | 2020.07.15 |
[Python] 야구 게임 - 숫자 맞추기 (0) | 2020.07.11 |
[Python]방정식 문제를 코딩으로 풀어보기 (0) | 2020.05.19 |