본문 바로가기

Lang

[Julia]DataFrame 이용한 간단한 데이터 조작 예

작업 요청이 하나 들어 왔는데 이전이라면 java 나 python 으로 할 일이지만, 연휴에 가볍게 Julia 자료본 후에 다시 접할 기회 못 잡았던지라 learn by doing 차원에서 julia 로 간단히 구현 도전해봤다.

DataFrames 을 이용하여 데이터 처리를 해봤고 대량 데이터 DB 저장을 위해 pandas 의 to_sql 과 같은 함수 찾아보았으나 결국 못 찾고 페북의 julia korea 채널에서 도움 받아 간단히 구현했다. 

본론으로 들어가서 가령 작업해야할 전체 데이터 중 shop_id : LA-12019, work_id : 103 인 건들만 추출한게 아래와 같다고 해보자(원래 요건을 설명하기 쉽게 변형한거라 컬럼명이나 구현 함수명 등이 쌩뚱 맞은건 양해해주시길).

shop_id work_id job_cnt job_s_time job_e_time
LA-12019 103 5 2019-01-01 10:20:36 2019-01-01 10:41:36
LA-12019 103 7 2019-01-01 10:41:36 2019-01-01 11:00:36
LA-12019 103 2 2019-01-01 11:00:36 2019-01-01 11:40:36
LA-12019 103 4 2019-01-01 11:40:36 2019-01-01 13:17:36
LA-12019 103 6 2019-01-01 13:17:36 2019-01-01 13:56:36
  • job_s_time 과 job_e_time 을 보면 2019-01-01 11시대에 수행된 작업 건수는 7건, 2건, 3건이므로 그 중 최대값은 7이 될 것이다.
  • 2019-01-01 12시대에 수행된 작업 건수는 job_s_time 과 job_e_time 에 직접 표시되지는 않았지만 네번째 줄의 2019-01-01 11:40:36 ~ 2019-01-01 13:17:36 가 이 시간대를 포함하고 있으므로 최대값은 4가 될 것이다.

작업할 내용을 요약하면 (위와 같은 식의) 전체 데이터를 job_s_time ~ job_e_time 이 포함하는 시간대별로 분해한 뒤 shop_id, work_id 기준으로 그룹핑해서 시간대별 job_cnt 의 최대값을 구하는거고(아래가 의도하는 결과) 그 결과값을 DB 에 저장하는거.

shop_id area_id evnt_dt job_max
LA-12019 103 2019-01-01 10:00 7
LA-12019 103 2019-01-01 11:00 7
LA-12019 103 2019-01-01 12:00 4
LA-12019 103 2019-01-01 13:00 6


소스 보여주기에 앞서 간단한 구현이긴 했지만 julia 실제 사용해 본 첫 느낌 살짝 적자면 ...

  • 레퍼런스가 아직 많지 않군.
  • DB 관련 모듈의 완성도(?)가 아직 흡족스러운 수준은 아니네.
  • 그럼에도 초보 수준에서도 꽤 깔끔하게 기능 구현 가능(보일러 플레이트한 코딩이 거의 없다. 물론 있다면 그건 현재 내 julia 실력 탓)

아래는 구현 소스. 위에 설명한 요건 참고해서 소스 읽어보면 combine 함수 외에는 julia 알지 못해도 딱히 설명할게 없을 정도다. 그만큼 julia 가 쉽다(?)라고 해도 될 듯.

using MySQL
using DataFrames
using Tables
using Dates

function get_connection()
    ...
    DBInterface.connect(MySQL.Connection, HOST, USER, PASSWD, port = PORT, db = DBNAME)
end


function to_db(conn, itr, name)
    rows = Tables.rows(itr)
    sch = Tables.schema(rows)
    cols = replace(string(sch.names), ':' => "")
    
    MySQL.transaction(conn) do
        params = chop(repeat("?,", length(sch.names)))
        stmt = DBInterface.prepare(conn, "INSERT INTO job_summary $cols VALUES ($params)")
        for (i, row) in enumerate(rows)
            DBInterface.execute(stmt, Tables.Row(row))
        end
    end
end


function get_jobs(conn)
    query = """
        SELECT shop_id, work_id, job_cnt,
        DATE_FORMAT(job_s_time, '%Y-%m-%d %H:00:00') AS evnt_dt,
        TIMESTAMPDIFF(HOUR, DATE_FORMAT(job_s_time, '%Y-%m-%d %H:00:00'), job_e_time) AS runtime
        FROM work_history
    """

    results = DBInterface.execute(conn, query)
    return DataFrame(results)
end


function decompose_by_hour(df)
    df_decompose = DataFrame("shop_id" => String[], "work_id" => Int[], "evnt_dt" => DateTime[], "job_cnt" => Int[])
    for x in eachrow(df)
        for i = 0:x.runtime
            push!(df_decompose, [x.shop_id, x.work_id, DateTime(x.evnt_dt, "yyyy-mm-dd HH:00:00") + Dates.Hour(i), x.job_cnt])
        end
    end
    return df1
end

get_max_job_count(df) = combine(groupby(df, [:shop_id, :work_id, :evnt_dt]), :job_cnt=>maximum=>:yt_cnt)


function main()
    conn = get_connection()
    df0 = get_jobs(conn)
    df1 = decompose_by_hour(df0)
    df2 = get_max_job_count(df1)
    to_db(conn, df2)
    DBInterface.close!(conn)
end

if !isdefined(Base, :active_repl)
    main()
end