대량 데이터 처리 관련해서 막 코딩한거 공유해봅니다.
대략 이런 내용입니다. 클라이언트 단에서 요청한 특정 시점 이후의 데이터를 DB 에서 뽑아 일정한 주기로 전송해주어야 함.
1. 데이터 건수가 상당히 큼. 이로 인한 DB/서버의 성능 저하/부하를 가능한 줄이는 방법이 필요.
2. 데이터 조회 시간 지연 등등의 문제가 전송 주기에 영향을 주어서는 안됨.여러가지 방법을 생각해보았으나 깔끔한 해법 못 찾아서 일단 막 코딩해봤습니다. 실행 환경은 jdk8, spring, JPA 등입니다 (ZeroMQ 사용한 전송 처리 부분은 위에 적지 않았습니다)
구현단은 그리 복잡하지는 않습니다.
1. DB 조회가 전송에 직접적으로 영향 주지 않도록 비동기 처리(Spring 의 @Async 이용).
2. DB 조회는 한 번의 쿼리로 하지 않고 일정한 단위로 쪼개서 순차적으로 조회(테스트 데이터가 마땅찮아서 조회 범위를 년 단위로 쪼개고 1년 씩 증가 시켜가며 조회하는 것으로 우선 구현)
3. List 타입인 DB 조회 결과를 조금이라도 메모리에서 줄여 주려고 Queue 타입으로 변환하여 전송 건은 지워지도록 처리(해놓고 보니 그리 경제적이진 않은 듯 싶긴 하지만)
4. 큐를 루프 돌며 500ms 주기로 전송하다가 큐에 3건 남았을 때 비동기로 다음 데이터 조회 처리.
5. DB 추가 조회는 테스트 목적으로 4번만 하게 했습니다. 실제 상황에서는 데이터 전송 중간 중간 데이터 추가 조회 필요한지 확인해서 동적으로 처리하도록 해야겠죠.
6. 조회 조건으로 넘어가는 시간 계산은 jdk 8 부터 지원되는 java.time.LocalDate 를 사용했습니다. 처음에는 이거 모르고 옛날 하던 식으로 Calendar 를 썼는데 이 놈이 쓰레드 세이프 하지 않은 탓인지 비동기 메서드 내에서 제대로 동작하지 않아서 이 녀석으로 바꿔 구현했습니다. joda 였나도 가끔 썼었는데 이젠 그럴 일 별로 없을 듯.
@Async
public void pubMessage(ZMQ.Socket publisher, Date fromDt, Queue<Employee> queue) throws InterruptedException {
Future<Queue<Employee>> completableFuture = basicService.onAsync1(fromDt, queue);
while(!completableFuture.isDone()) {
Thread.sleep(100);
}
int i = 0;
int MAX_NEXT = 4;
while (queue.peek() != null) {
try {
Thread.sleep(500);
publisher.send(objs2json(queue.poll()));
if(i != MAX_NEXT && queue.size() == 3) {
Date nextDt = CommonUtil.addTime(fromDt, ++i);
basicService.getEmployees(nextDt, queue);
}
if(i == MAX_NEXT && queue.size() == 0) {
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Async
public Future<Queue<Employee>> getEmployees(Date fromDt, Queue<Employee> queue) throws InterruptedException {
CompletableFuture<Queue<Employee>> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> {
Date toDt = CommonUtil.addTime(fromDt, 1);
employees = employeeRepository.findByBirthDateBetweenOrderByBirthDateAsc(fromDt, toDt);
employees.forEach(m -> {
queue.offer((Employee) m);
});
employees.clear();
completableFuture.complete(queue);
return null;
});
return completableFuture;
}
public static Date addTime(Date dt, int interval) {
LocalDate toDt = dt.toLocalDate().plusYears(interval);
return Date.valueOf(toDt);
}
// 실행은
Queue<Employee> queue = new LinkedList<>();
pubMessage(publisher, fromDt, queue);
'Slack 채널 정리' 카테고리의 다른 글
ZeroMQ pub/sub (0) | 2019.12.03 |
---|---|
JpaSystemException: No default constructor for entity (0) | 2019.12.03 |
특정 좌표가 어느 위치에 포함되어 있는지 찾기(ST_CONTAINS) (0) | 2019.12.03 |
ZeroMQ 잡담 (0) | 2019.12.02 |
vim macro (0) | 2019.12.02 |