ZeroMQ pub/sub 테스트를 위해 샘플 참고해 구현한 텍스트 파일 읽어서 계속 전송하는 Pub 자바 소스.
import org.zeromq.ZMQ;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
public class JsonServer {
public static void main (String[] args) throws Exception {
System.out.println("the server starting...");
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket publisher = context.socket(ZMQ.PUB);
publisher.bind("tcp://*:5559");
File crunchifyFile = new File("D:/temp/Pow.json");
FileInputStream fileInputStream;
String json = null;
try {
fileInputStream = new FileInputStream(crunchifyFile);
byte[] crunchifyValue = new byte[(int) crunchifyFile.length()];
fileInputStream.read(crunchifyValue);
fileInputStream.close();
json = new String(crunchifyValue, "UTF-8");
while (!Thread.currentThread ().isInterrupted ()) {
publisher.send(json, 0);
}
} catch (IOException e) {
e.printStackTrace();
}
publisher.close ();
context.term ();
}
}
앞의 pub 에서 전송한 메시지를 수신해서 파일로 저장하는 ZeroMQ sub 자바 샘플. 442byte, 10,000번 수신(저장 파일 4.22MB) 에 560ms 정도 걸림.
import org.zeromq.ZMQ;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.time.Duration;
import java.time.Instant;
public class JsonClient {
public static void main (String[] args) throws Exception {
ZMQ.Context context = ZMQ.context(1);
Instant start = Instant.now();
ZMQ.Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect("tcp://서버IP:5559");
System.out.println("receiving messages...");
subscriber.subscribe(ZMQ.SUBSCRIPTION_ALL); // no filtering
String fileName = "D:/test.log";
BufferedWriter writer = new BufferedWriter(new FileWriter(fileName, true));
for (int i = 0; i < 10000; i++) {
String message = subscriber.recvStr(0);
writer.append(message);
}
Instant finish = Instant.now();
long timeElapsed = Duration.between(start, finish).toMillis();
System.out.println(timeElapsed);
writer.close();
subscriber.close();
context.term();
}
}
앞의 Java sub 소스를 C# 으로 대충 구현한 소스. NetMQ 이용. 동일한 테스트에서 550ms 나옴. 자바 라이브러리 jeroMQ 랑 속도차 별로 없어 보임.
using System;
using NetMQ;
using System.Diagnostics;
using NetMQ.Sockets;
using System.IO;
namespace Worker
{
class Program
{
static void Main(string[] args)
{
SendJson();
}
private static void SendJson()
{
using (var subscriber = new SubscriberSocket())
{
var logFile = @"..\..\archive_json.log";
System.IO.FileInfo fi = new System.IO.FileInfo(logFile);
try
{
fi.Delete();
}
catch (System.IO.IOException e)
{
Console.WriteLine(e.Message);
}
Stopwatch sw = new Stopwatch();
sw.Start();
subscriber.Connect("tcp://서버IP:5559");
subscriber.Subscribe(""); // no filtering
int updateNumber = 0;
using (StreamWriter outputFile = new StreamWriter(logFile))
{
for (; updateNumber < 10000; updateNumber++)
{
string message = subscriber.ReceiveFrameString();
outputFile.WriteLine(message);
}
}
sw.Stop();
Console.WriteLine(sw.ElapsedMilliseconds.ToString() + "ms");
}
}
}
}
'Slack 채널 정리' 카테고리의 다른 글
정규식 ([\s\S]?) (0) | 2019.12.03 |
---|---|
intelliJ 변수 할당 핫키 (0) | 2019.12.03 |
JpaSystemException: No default constructor for entity (0) | 2019.12.03 |
대량 데이터 조회 비동기 처리 (0) | 2019.12.03 |
특정 좌표가 어느 위치에 포함되어 있는지 찾기(ST_CONTAINS) (0) | 2019.12.03 |