CompletableFuture와 리액티브 프로그래밍 컨셉의 기초
Executor와 스레드 풀 자바에서는 ExecutorService를 사용해 스레드 풀을 구성할 수 있다. 구성한 스레드 풀에 작업을 제출함으로 써 작업을 수행시키는 방식이다.
장점
하드웨어 코어에 알맞은 풀을 구성해 병렬성을 만족 시킬 수 있다.
태스크 큐의 크기, 정책, 우선순위 등을 구성할 수 있다.
단점
k개의 스레드 풀을 구성하면 작업은 동시에 최대 k개만 수행된다.
스레드가 종료되지 않는다면 어플리케이션은 정상적으로 종료될 수 없다.
스레드간의 레이스 컨디션 문제를 야기할 수 있다.
Future 형식의 API Future<T>를 통해 비동기 API를 사용할 수 있다.
1 2 3 4 5 6 7 8 9 10 11 12 13 public class ExecutorServiceExample { public static void main (String[] args) throws ExecutionException, InterruptedException { int x = 1337 ; ExecutorService executorService = Executors.newFixedThreadPool(2 ); Future<Integer> y = executorService.submit(() -> fo(x)); Future<Integer> z = executorService.submit(() -> go(x)); System.out.println(y.get() + z.get()); executorService.shutdown(); } }
리액티브 형식 API 함수 파라미터에 콜백형식을 넣어 태스크가 완료되면 return문이 아닌 람다 내에서 이를 호출한다.
리액티브 형식은 결과가 아닌 이벤트에 반응하도록 설계함으로서 예제가 적절하지는 않다.
println이 모든 결과가 반영되기 전에 호출되는데, 적절한 lock을 통해 조정할 수 있다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class CallbackStyleExample { public static void main (String[] args) { int x = 1337 ; Result result = new Result (); f(x, (int y) -> { result.left = y; System.out.println("f : " + (result.left + result.right)); }); g(x, (int z) -> { result.right = z; System.out.println("g : " + (result.left + result.right)); }); } private static class Result { private int left; private int right; } private static void f (int x, IntConsumer dealWithResult) { dealWithResult.accept(Functions.f(x)); } private static void g (int x, IntConsumer dealWithResult) { dealWithResult.accept(Functions.g(x)); } }
적절하지 못한 Thread.sleep Thread.sleep을 통해 태스크를 지연시키는 방법은 자원을 점유하면서 block 상태를 유지한다. 이는 다른 태스크나 스레드가 cpu 자원을 점유하고 작업을 수행하는 것을 방해한다. 일정 시간 뒤에 다른 태스크를 수행해야 하는 경우에는 ScheduledExecutorService를 통해 태스크를 예약 수행 할 수 있다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class ScheduledExecutorServiceExample { public static void main (String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1 ); work1(); scheduledExecutorService.schedule(ScheduledExecutorServiceExample::work2, 10 , TimeUnit.SECONDS); scheduledExecutorService.shutdown(); } public static void work1 () { System.out.println("Hello from Work1!" ); } public static void work2 () { System.out.println("Hello from Work2!" ); } }
CompletableFuture와 콤비네이터를 사용한 동시성 태스크 a와 b가 있고 두 태스크의 결과를 합쳐 작업을 마무리하는 코드가 있다고 하자.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class CFComplete { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10 ); int x = 1337 ; CompletableFuture<Integer> a = new CompletableFuture <>(); executorService.submit(() -> a.complete(f(x))); int b = g(x); System.out.println(a.get() + b); executorService.shutdown(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class CFCombine { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(10 ); int x = 1337 ; CompletableFuture<Integer> a = new CompletableFuture <>(); CompletableFuture<Integer> b = new CompletableFuture <>(); CompletableFuture<Integer> c = a.thenCombine(b, (y, z)-> y + z); executorService.submit(() -> a.complete(f(x))); executorService.submit(() -> b.complete(g(x))); System.out.println(c.get()); executorService.shutdown(); } }
Pub-Sub 모델 Pub-Sub 모델의 개념은 다음과 같다.
구독자가 구독할 수 있는 발행자
구독자와 발행자의 연결(구독)
구독을 이용한 메세지 혹은 이벤트를 전송
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 public class Chapter15 { public static void main (String[] args) { Publisher<String> fox = new Publisher <>("FOX NEWS" ); Publisher<String> cnn = new Publisher <>("CNN NEWS" ); Subscriber<String> sub1 = new Subscriber <>(); Subscriber<String> sub2 = new Subscriber <>(); Subscriber<String> sub3 = new Subscriber <>(); fox.subscribe(sub1); fox.subscribe(sub2); cnn.subscribe(sub3); fox.addHeadLine("First Headline On Fox" ); fox.addHeadLine("Second Headline On Fox" ); fox.addHeadLine("Third Headline On Fox" ); cnn.addHeadLine("First Headline On CNN" ); sub1.getNews(3 ); sub2.getNews(2 ); sub3.getNews(1 ); } } public class Publisher <T> implements Flow .Publisher<T> { private List<T> headLines = new ArrayList <>(); private String news; public Publisher (String news) { this .news = news; } @Override public void subscribe (Flow.Subscriber<? super T> subscriber) { Subscription<T> subscription = new Subscription <>(this , (Subscriber<T>) subscriber); subscriber.onSubscribe(subscription); } public List<T> recentNews (int n) { return headLines.subList(Math.max(headLines.size() - n, 0 ), headLines.size()); } public void addHeadLine (T headLine) { headLines.add(headLine); } } public class Subscription <T> implements Flow .Subscription { private Publisher<T> publisher; private Subscriber<T> subscriber; public Subscription (Publisher<T> publisher, Subscriber<T> subscriber) { this .publisher = publisher; this .subscriber = subscriber; } @Override public void request (long n) { List<T> result = publisher.recentNews(Long.valueOf(n).intValue()); result.forEach(subscriber::onNext); subscriber.onComplete(); } @Override public void cancel () { } } public class Subscriber <T> implements Flow .Subscriber<T> { private Flow.Subscription subscription; @Override public void onSubscribe (Flow.Subscription subscription) { this .subscription = subscription; } @Override public void onNext (T item) { System.out.println(item.toString()); } @Override public void onError (Throwable throwable) { } @Override public void onComplete () { System.out.println("complete" ); } public void getNews (long n) { subscription.request(n); } }