개발환경

AggregateQueue 구현

toogari 2018. 6. 1. 11:17


AggregateQueue


import java.util.List;


/**

 * Aggregator 를 일부 구현한 클래스.

 * 적당한 크기와 적당한 시간에 모인 데이터를 aggregate 한다.

 * @param <T>

 */

public interface AggregateQueue<T> {

    /**

     * 큐에 입력한다.

     * @param o

     * @throws InterruptedException

     */

    public void put(T o) throws InterruptedException;


    /**

     * 큐에서 추출한다.

     * @return

     * @throws InterruptedException

     */

    public List<T> take() throws InterruptedException;


    public int size();

}




AggregateQueueImpl 


import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Aggregator의 기능을 일부 구현한 클래스.
 * size 크기가 채워지면 모인 데이터를 추출할 수 있다.
 * size 크기만큼 채워지지 않더라도 timeout 만큼 시간을 기다려 모인 데이터를 추출한다.
 */
public class AggregateQueueImpl implements AggregateQueue {
    private int size = 10;
    private long timeout = 5000;

    private List<Object> objects = new ArrayList<>();

    public AggregateQueueImpl(int size, long timeout) {
        this.size = size;
        this.timeout = timeout;
    }

    /**
     * 데이터를 쌓는다.
     * 다만 size보다 크면 블록된다.
     *
     * @param o
     * @throws InterruptedException
     */
    @Override
    public synchronized void put(Object o) throws InterruptedException {
        boolean logging = true;
        while (objects.size() >= size) {
            if (logging == true) {
                System.out.println("PUTTING....");
                logging=false;
            }
            wait();

        }
        objects.add(o);
        notifyAll();
    }

    /**
     * @return
     * @throws InterruptedException
     */
    @Override
    public synchronized List<Object> take() throws InterruptedException {
        long finish = System.currentTimeMillis() + timeout;
        while (objects.size() < size) {
            long interval = finish - System.currentTimeMillis();
            if (interval > 0) {
                wait();
            } else {
                break;
            }
        }

        if (objects.size() == 0) {
            notifyAll();
            return null;
        } else {
            List result = new ArrayList();
            result.addAll(objects);
            objects.clear();
            notifyAll();
            return result;
        }
    }

    @Override
    public synchronized int size() {
        return objects.size();
    }

    public static void main(String[] args) {
        final Random random = new Random(System.currentTimeMillis());
        final AggregateQueue aggregateQueue = new AggregateQueueImpl(1000, 2000);
        new Thread(new Runnable() {
            long index = 0;

            @Override
            public void run() {
                while (true) {
                    try {
                        aggregateQueue.put((++index) + "");
//                        System.out.println("PUT "+index);
                        Thread.sleep(1+random.nextInt(1));
                        if (index % 1000 == 0) {
                            System.out.println("PUT " + index);
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

        new Thread(new Runnable() {
            int count = 0;
            int index = 0;
            @Override
            public void run() {
                while (true) {
                    try {
                        List list = aggregateQueue.take();
                        int size = list.size();
                        count += size;
                        if (index++ % 10 == 0) {
                            System.out.println("TAKING ... "+count);
                        }

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("QUEUE " + aggregateQueue.size());
            }
        }, 1000, 1000, TimeUnit.MILLISECONDS);
    }


}