CompletableFuture와 리액티브 프로그래밍 컨셉의 기초
Executor와 스레드 풀 자바에서는 ExecutorService
를 사용해 스레드 풀을 구성할 수 있다. 구성한 스레드 풀에 작업을 제출함으로 써 작업을 수행시키는 방식이다.
장점
하드웨어 코어에 알맞은 풀을 구성해 병렬성을 만족 시킬 수 있다.
태스크 큐의 크기, 정책, 우선순위 등을 구성할 수 있다.
단점
k개의 스레드 풀을 구성하면 작업은 동시에 최대 k개만 수행된다.
스레드가 종료되지 않는다면 어플리케이션은 정상적으로 종료될 수 없다.
스레드간의 레이스 컨디션 문제를 야기할 수 있다.
Future 형식의 API Future<T>
를 통해 비동기 API를 사용할 수 있다.
1 2 3 4 5 6 7 8 9 10 11 12 13 public class ExecutorServiceExample { public static void main (String[] args) throws ExecutionException, InterruptedException { int x = 1337 ; ExecutorService executorService = Executors.newFixedThreadPool(2 ); Future<Integer> y = executorService.submit(() -> fo(x)); Future<Integer> z = executorService.submit(() -> go(x)); System.out.println(y.get() + z.get()); executorService.shutdown(); } }
리액티브 형식 API 함수 파라미터에 콜백형식을 넣어 태스크가 완료되면 return문이 아닌 람다 내에서 이를 호출한다.
리액티브 형식은 결과가 아닌 이벤트에 반응하도록 설계함으로서 예제가 적절하지는 않다.
println
이 모든 결과가 반영되기 전에 호출되는데, 적절한 lock
을 통해 조정할 수 있다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class CallbackStyleExample { public static void main (String[] args) { int x = 1337 ; Result result = new Result (); f(x, (int y) -> { result.left = y; System.out.println("f : " + (result.left + result.right)); }); g(x, (int z) -> { result.right = z; System.out.println("g : " + (result.left + result.right)); }); } private static class Result { private int left; private int right; } private static void f (int x, IntConsumer dealWithResult) { dealWithResult.accept(Functions.f(x)); } private static void g (int x, IntConsumer dealWithResult) { dealWithResult.accept(Functions.g(x)); } }
적절하지 못한 Thread.sleep Thread.sleep
을 통해 태스크를 지연시키는 방법은 자원을 점유하면서 block
상태를 유지한다. 이는 다른 태스크나 스레드가 cpu 자원을 점유하고 작업을 수행하는 것을 방해한다. 일정 시간 뒤에 다른 태스크를 수행해야 하는 경우에는 ScheduledExecutorService
를 통해 태스크를 예약 수행 할 수 있다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class ScheduledExecutorServiceExample { public static void main (String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1 ); work1(); scheduledExecutorService.schedule(ScheduledExecutorServiceExample::work2, 10 , TimeUnit.SECONDS); scheduledExecutorService.shutdown(); } public static void work1 () { System.out.println("Hello from Work1!" ); } public static void work2 () { System.out.println("Hello from Work2!" ); } }
CompletableFuture와 콤비네이터를 사용한 동시성 태스크 a와 b가 있고 두 태스크의 결과를 합쳐 작업을 마무리하는 코드가 있다고 하자.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class CFComplete { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10 ); int x = 1337 ; CompletableFuture<Integer> a = new CompletableFuture <>(); executorService.submit(() -> a.complete(f(x))); int b = g(x); System.out.println(a.get() + b); executorService.shutdown(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class CFCombine { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10 ); int x = 1337 ; CompletableFuture<Integer> a = new CompletableFuture <>(); CompletableFuture<Integer> b = new CompletableFuture <>(); CompletableFuture<Integer> c = a.thenCombine(b, (y, z)-> y + z); executorService.submit(() -> a.complete(f(x))); executorService.submit(() -> b.complete(g(x))); System.out.println(c.get()); executorService.shutdown(); } }
Pub-Sub 모델 Pub-Sub 모델의 개념은 다음과 같다.
구독자가 구독할 수 있는 발행자
구독자와 발행자의 연결(구독)
구독을 이용한 메세지 혹은 이벤트를 전송
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 public class Chapter15 { public static void main (String[] args) { Publisher<String> fox = new Publisher <>("FOX NEWS" ); Publisher<String> cnn = new Publisher <>("CNN NEWS" ); Subscriber<String> sub1 = new Subscriber <>(); Subscriber<String> sub2 = new Subscriber <>(); Subscriber<String> sub3 = new Subscriber <>(); fox.subscribe(sub1); fox.subscribe(sub2); cnn.subscribe(sub3); fox.addHeadLine("First Headline On Fox" ); fox.addHeadLine("Second Headline On Fox" ); fox.addHeadLine("Third Headline On Fox" ); cnn.addHeadLine("First Headline On CNN" ); sub1.getNews(3 ); sub2.getNews(2 ); sub3.getNews(1 ); } } public class Publisher <T> implements Flow .Publisher<T> { private List<T> headLines = new ArrayList <>(); private String news; public Publisher (String news) { this .news = news; } @Override public void subscribe (Flow.Subscriber<? super T> subscriber) { Subscription<T> subscription = new Subscription <>(this , (Subscriber<T>) subscriber); subscriber.onSubscribe(subscription); } public List<T> recentNews (int n) { return headLines.subList(Math.max(headLines.size() - n, 0 ), headLines.size()); } public void addHeadLine (T headLine) { headLines.add(headLine); } } public class Subscription <T> implements Flow .Subscription { private Publisher<T> publisher; private Subscriber<T> subscriber; public Subscription (Publisher<T> publisher, Subscriber<T> subscriber) { this .publisher = publisher; this .subscriber = subscriber; } @Override public void request (long n) { List<T> result = publisher.recentNews(Long.valueOf(n).intValue()); result.forEach(subscriber::onNext); subscriber.onComplete(); } @Override public void cancel () { } } public class Subscriber <T> implements Flow .Subscriber<T> { private Flow.Subscription subscription; @Override public void onSubscribe (Flow.Subscription subscription) { this .subscription = subscription; } @Override public void onNext (T item) { System.out.println(item.toString()); } @Override public void onError (Throwable throwable) { } @Override public void onComplete () { System.out.println("complete" ); } public void getNews (long n) { subscription.request(n); } }