Slack channel roundup

ZeroMQ pub/sub

Java source for a Pub that reads a text file and sends its contents over and over, adapted from the sample code to test ZeroMQ pub/sub.

import org.zeromq.ZMQ;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class JsonServer {
    public static void main (String[] args) throws Exception {
        System.out.println("the server starting...");
        ZMQ.Context context = ZMQ.context(1);

        // PUB socket: listens on port 5559 on every interface
        ZMQ.Socket publisher = context.socket(ZMQ.PUB);
        publisher.bind("tcp://*:5559");

        try {
            // Read the whole file once; readAllBytes avoids the partial-read
            // risk of a single FileInputStream.read() call
            byte[] crunchifyValue = Files.readAllBytes(Paths.get("D:/temp/Pow.json"));
            String json = new String(crunchifyValue, StandardCharsets.UTF_8);

            // Keep publishing the same payload until the thread is interrupted
            while (!Thread.currentThread().isInterrupted()) {
                publisher.send(json, 0);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Release the socket and context even if reading the file fails
            publisher.close();
            context.term();
        }
    }
}

A ZeroMQ sub sample in Java that receives the messages sent by the pub above and saves them to a file. Receiving a 442-byte message 10,000 times (4.22 MB saved file) took about 560 ms.

import org.zeromq.ZMQ;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.time.Duration;
import java.time.Instant;

public class JsonClient {
    public static void main (String[] args) throws Exception {
        ZMQ.Context context = ZMQ.context(1);

        Instant start = Instant.now();
        ZMQ.Socket subscriber = context.socket(ZMQ.SUB);
        subscriber.connect("tcp://SERVER_IP:5559"); // replace SERVER_IP with the publisher's address

        System.out.println("receiving messages...");
        subscriber.subscribe(ZMQ.SUBSCRIPTION_ALL); // no filtering

        String fileName = "D:/test.log";
        BufferedWriter writer = new BufferedWriter(new FileWriter(fileName, true));

        for (int i = 0; i < 10000; i++) {
            String message = subscriber.recvStr(0);
            writer.append(message);
        }

        Instant finish = Instant.now();
        long timeElapsed = Duration.between(start, finish).toMillis();
        System.out.println(timeElapsed);
        writer.close();

        subscriber.close();
        context.term();
    }
}
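
To put the 560 ms figure in perspective, the numbers above can be turned into a rough throughput estimate. The sketch below is only arithmetic on the measured values (442 bytes, 10,000 messages, 560 ms); the class and method names are made up for illustration, and since the timer in the client also covers socket creation and connect, the result is an approximation rather than a pure receive rate.

```java
import java.util.Locale;

// Hypothetical helper, not part of the original test code.
final class ThroughputEstimate {

    // Messages received per second for a run of `messages` messages in `elapsedMillis` ms.
    static double messagesPerSecond(int messages, long elapsedMillis) {
        return messages * 1000.0 / elapsedMillis;
    }

    // Payload volume in MB/s (1 MB = 1,000,000 bytes here).
    static double megabytesPerSecond(int messages, int bytesPerMessage, long elapsedMillis) {
        double totalMegabytes = (long) messages * bytesPerMessage / 1_000_000.0;
        return totalMegabytes / (elapsedMillis / 1000.0);
    }

    public static void main(String[] args) {
        // Values taken from the Java sub test described above
        int messages = 10_000;
        int bytesPerMessage = 442;
        long elapsedMillis = 560;

        System.out.println(String.format(Locale.ROOT, "%.0f msg/s",
                messagesPerSecond(messages, elapsedMillis)));
        System.out.println(String.format(Locale.ROOT, "%.2f MB/s",
                megabytesPerSecond(messages, bytesPerMessage, elapsedMillis)));
    }
}
```

With the measured values this works out to roughly 17,900 messages per second, or a little under 8 MB/s of payload.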

A quick-and-dirty C# port of the Java sub above, using NetMQ. The same test came in at 550 ms, so there doesn't seem to be much of a speed difference from JeroMQ, the Java library.

using System;

using NetMQ;
using System.Diagnostics;
using NetMQ.Sockets;
using System.IO;

namespace Worker
{
    class Program
    {
        static void Main(string[] args)
        {
            SendJson();
        }

        private static void SendJson()
        {
            using (var subscriber = new SubscriberSocket())
            {

                var logFile = @"..\..\archive_json.log";
                System.IO.FileInfo fi = new System.IO.FileInfo(logFile);
                try
                {
                    fi.Delete();
                }
                catch (System.IO.IOException e)
                {
                    Console.WriteLine(e.Message);
                }

                Stopwatch sw = new Stopwatch();
                sw.Start();

                subscriber.Connect("tcp://SERVER_IP:5559"); // replace SERVER_IP with the publisher's address
                subscriber.Subscribe(""); // no filtering

                int updateNumber = 0;
                using (StreamWriter outputFile = new StreamWriter(logFile))
                {
                    for (; updateNumber < 10000; updateNumber++)
                    {
                        string message = subscriber.ReceiveFrameString();
                        outputFile.WriteLine(message);
                    }
                }

                sw.Stop();
                Console.WriteLine(sw.ElapsedMilliseconds.ToString() + "ms");
            }
        }
    }
}
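
For a sense of how close the two results are, the gap between 550 ms and 560 ms can be expressed as a percentage. This is a minimal sketch with a hypothetical helper name; it only compares the two single-run timings reported above, so it says nothing about run-to-run variance.

```java
import java.util.Locale;

// Hypothetical helper, not part of the original test code.
final class RelativeDifference {

    // Absolute gap between two timings, as a percentage of the slower one.
    static double percent(long firstMillis, long secondMillis) {
        long slower = Math.max(firstMillis, secondMillis);
        long faster = Math.min(firstMillis, secondMillis);
        return (slower - faster) * 100.0 / slower;
    }

    public static void main(String[] args) {
        long netMqMillis = 550;  // C# / NetMQ run
        long jeroMqMillis = 560; // Java / JeroMQ run
        System.out.println(String.format(Locale.ROOT, "%.1f%% apart",
                percent(netMqMillis, jeroMqMillis)));
    }
}
```

A gap of under 2% on a single run each is small enough that it could easily be noise.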