작업 요청이 하나 들어 왔는데 이전이라면 java 나 python 으로 할 일이지만, 연휴에 가볍게 Julia 자료본 후에 다시 접할 기회 못 잡았던지라 learn by doing 차원에서 julia 로 간단히 구현 도전해봤다.
DataFrames 을 이용하여 데이터 처리를 해봤고 대량 데이터 DB 저장을 위해 pandas 의 to_sql 과 같은 함수 찾아보았으나 결국 못 찾고 페북의 julia korea 채널에서 도움 받아 간단히 구현했다.
본론으로 들어가서 가령 작업해야할 전체 데이터 중 shop_id : LA-12019, work_id : 103 인 건들만 추출한게 아래와 같다고 해보자(원래 요건을 설명하기 쉽게 변형한거라 컬럼명이나 구현 함수명 등이 쌩뚱 맞은건 양해해주시길).
shop_id | work_id | job_cnt | job_s_time | job_e_time |
LA-12019 | 103 | 5 | 2019-01-01 10:20:36 | 2019-01-01 10:41:36 |
LA-12019 | 103 | 7 | 2019-01-01 10:41:36 | 2019-01-01 11:00:36 |
LA-12019 | 103 | 2 | 2019-01-01 11:00:36 | 2019-01-01 11:40:36 |
LA-12019 | 103 | 4 | 2019-01-01 11:40:36 | 2019-01-01 13:17:36 |
LA-12019 | 103 | 6 | 2019-01-01 13:17:36 | 2019-01-01 13:56:36 |
- job_s_time 과 job_e_time 을 보면 2019-01-01 11시대에 수행된 작업 건수는 7건, 2건, 3건이므로 그 중 최대값은 7이 될 것이다.
- 2019-01-01 12시대에 수행된 작업 건수는 job_s_time 과 job_e_time 에 직접 표시되지는 않았지만 네번째 줄의 2019-01-01 11:40:36 ~ 2019-01-01 13:17:36 가 이 시간대를 포함하고 있으므로 최대값은 4가 될 것이다.
작업할 내용을 요약하면 (위와 같은 식의) 전체 데이터를 job_s_time ~ job_e_time 이 포함하는 시간대별로 분해한 뒤 shop_id, work_id 기준으로 그룹핑해서 시간대별 job_cnt 의 최대값을 구하는거고(아래가 의도하는 결과) 그 결과값을 DB 에 저장하는거.
shop_id | area_id | evnt_dt | job_max |
LA-12019 | 103 | 2019-01-01 10:00 | 7 |
LA-12019 | 103 | 2019-01-01 11:00 | 7 |
LA-12019 | 103 | 2019-01-01 12:00 | 4 |
LA-12019 | 103 | 2019-01-01 13:00 | 6 |
소스 보여주기에 앞서 간단한 구현이긴 했지만 julia 실제 사용해 본 첫 느낌 살짝 적자면 ...
- 레퍼런스가 아직 많지 않군.
- DB 관련 모듈의 완성도(?)가 아직 흡족스러운 수준은 아니네.
- 그럼에도 초보 수준에서도 꽤 깔끔하게 기능 구현 가능(보일러 플레이트한 코딩이 거의 없다. 물론 있다면 그건 현재 내 julia 실력 탓)
아래는 구현 소스. 위에 설명한 요건 참고해서 소스 읽어보면 combine 함수 외에는 julia 알지 못해도 딱히 설명할게 없을 정도다. 그만큼 julia 가 쉽다(?)라고 해도 될 듯.
using MySQL
using DataFrames
using Tables
using Dates
function get_connection()
...
DBInterface.connect(MySQL.Connection, HOST, USER, PASSWD, port = PORT, db = DBNAME)
end
function to_db(conn, itr, name)
rows = Tables.rows(itr)
sch = Tables.schema(rows)
cols = replace(string(sch.names), ':' => "")
MySQL.transaction(conn) do
params = chop(repeat("?,", length(sch.names)))
stmt = DBInterface.prepare(conn, "INSERT INTO job_summary $cols VALUES ($params)")
for (i, row) in enumerate(rows)
DBInterface.execute(stmt, Tables.Row(row))
end
end
end
function get_jobs(conn)
query = """
SELECT shop_id, work_id, job_cnt,
DATE_FORMAT(job_s_time, '%Y-%m-%d %H:00:00') AS evnt_dt,
TIMESTAMPDIFF(HOUR, DATE_FORMAT(job_s_time, '%Y-%m-%d %H:00:00'), job_e_time) AS runtime
FROM work_history
"""
results = DBInterface.execute(conn, query)
return DataFrame(results)
end
function decompose_by_hour(df)
df_decompose = DataFrame("shop_id" => String[], "work_id" => Int[], "evnt_dt" => DateTime[], "job_cnt" => Int[])
for x in eachrow(df)
for i = 0:x.runtime
push!(df_decompose, [x.shop_id, x.work_id, DateTime(x.evnt_dt, "yyyy-mm-dd HH:00:00") + Dates.Hour(i), x.job_cnt])
end
end
return df1
end
get_max_job_count(df) = combine(groupby(df, [:shop_id, :work_id, :evnt_dt]), :job_cnt=>maximum=>:yt_cnt)
function main()
conn = get_connection()
df0 = get_jobs(conn)
df1 = decompose_by_hour(df0)
df2 = get_max_job_count(df1)
to_db(conn, df2)
DBInterface.close!(conn)
end
if !isdefined(Base, :active_repl)
main()
end
'Lang' 카테고리의 다른 글
[kotlin]애기 걸음, reactive 구구단 (0) | 2022.04.04 |
---|---|
[Java]Spring Cloud Stream 에서 Kafka 메시지 헤더 이용하는 방법 외 (0) | 2021.11.19 |
[Julia]첫 공부는 이런 식으로 어떨런지 (0) | 2021.09.27 |
[julia]Decimal to Binary conversion (0) | 2021.09.23 |
r2dbc, DatabaseClient 첫 테스트 (0) | 2021.09.17 |