[Modern Java] CompletableFuture와 리액티브 프로그래밍 컨셉의 기초

CompletableFuture와 리액티브 프로그래밍 컨셉의 기초

Executor와 스레드 풀

자바에서는 ExecutorService를 사용해 스레드 풀을 구성할 수 있다. 구성한 스레드 풀에 작업을 제출함으로 써 작업을 수행시키는 방식이다.

  • 장점
    • 하드웨어 코어에 알맞은 풀을 구성해 병렬성을 만족 시킬 수 있다.
    • 태스크 큐의 크기, 정책, 우선순위 등을 구성할 수 있다.
  • 단점
    • k개의 스레드 풀을 구성하면 작업은 동시에 최대 k개만 수행된다.
    • 스레드가 종료되지 않는다면 어플리케이션은 정상적으로 종료될 수 없다.
    • 스레드간의 레이스 컨디션 문제를 야기할 수 있다.

Future 형식의 API

Future<T>를 통해 비동기 API를 사용할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ExecutorServiceExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
int x = 1337;

ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<Integer> y = executorService.submit(() -> fo(x)); //제출 즉시 task는 평가됨
Future<Integer> z = executorService.submit(() -> go(x));
System.out.println(y.get() + z.get());

executorService.shutdown();
}

}

리액티브 형식 API

함수 파라미터에 콜백형식을 넣어 태스크가 완료되면 return문이 아닌 람다 내에서 이를 호출한다.

  • 리액티브 형식은 결과가 아닌 이벤트에 반응하도록 설계함으로서 예제가 적절하지는 않다.
  • println이 모든 결과가 반영되기 전에 호출되는데, 적절한 lock을 통해 조정할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class CallbackStyleExample {
public static void main(String[] args) {

int x = 1337;
Result result = new Result();

f(x, (int y) -> {
result.left = y;
System.out.println("f : " + (result.left + result.right));
});

g(x, (int z) -> {
result.right = z;
System.out.println("g : " + (result.left + result.right));
});
}

private static class Result {
private int left;
private int right;
}

private static void f(int x, IntConsumer dealWithResult) {
dealWithResult.accept(Functions.f(x));
}

private static void g(int x, IntConsumer dealWithResult) {
dealWithResult.accept(Functions.g(x));
}

}

적절하지 못한 Thread.sleep

Thread.sleep을 통해 태스크를 지연시키는 방법은 자원을 점유하면서 block 상태를 유지한다. 이는 다른 태스크나 스레드가 cpu 자원을 점유하고 작업을 수행하는 것을 방해한다. 일정 시간 뒤에 다른 태스크를 수행해야 하는 경우에는 ScheduledExecutorService를 통해 태스크를 예약 수행 할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ScheduledExecutorServiceExample {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

work1();
scheduledExecutorService.schedule(ScheduledExecutorServiceExample::work2, 10, TimeUnit.SECONDS); //10초 후에 work2 태스크를 제출하도록 예약

scheduledExecutorService.shutdown();
}

public static void work1() {
System.out.println("Hello from Work1!");
}

public static void work2() {
System.out.println("Hello from Work2!");
}

}

CompletableFuture와 콤비네이터를 사용한 동시성

태스크 a와 b가 있고 두 태스크의 결과를 합쳐 작업을 마무리하는 코드가 있다고 하자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class CFComplete {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
int x = 1337;

CompletableFuture<Integer> a = new CompletableFuture<>();
executorService.submit(() -> a.complete(f(x)));
int b = g(x);

//f(x)나 g(x)가 끝나지 않는다면 자원을 낭비하며 대기해야 함
System.out.println(a.get() + b);

executorService.shutdown();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class CFCombine {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
int x = 1337;

CompletableFuture<Integer> a = new CompletableFuture<>();
CompletableFuture<Integer> b = new CompletableFuture<>();
//a와 b가 정해지진 않았지만 a와 b의 결과를 합친 c라는 작업을 만듬
//c 작업은 스레드 풀에 존재하는 스레드에서 수행됨
CompletableFuture<Integer> c = a.thenCombine(b, (y, z)-> y + z);
executorService.submit(() -> a.complete(f(x)));
executorService.submit(() -> b.complete(g(x)));

System.out.println(c.get());
executorService.shutdown();
}
}

Pub-Sub 모델

Pub-Sub 모델의 개념은 다음과 같다.

  • 구독자가 구독할 수 있는 발행자
  • 구독자와 발행자의 연결(구독)
  • 구독을 이용한 메세지 혹은 이벤트를 전송

Flow의 흐름 구조

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
public class Chapter15 {
public static void main(String[] args) {
Publisher<String> fox = new Publisher<>("FOX NEWS");
Publisher<String> cnn = new Publisher<>("CNN NEWS");

Subscriber<String> sub1 = new Subscriber<>();
Subscriber<String> sub2 = new Subscriber<>();
Subscriber<String> sub3 = new Subscriber<>();

fox.subscribe(sub1);
fox.subscribe(sub2);
cnn.subscribe(sub3);

fox.addHeadLine("First Headline On Fox");
fox.addHeadLine("Second Headline On Fox");
fox.addHeadLine("Third Headline On Fox");
cnn.addHeadLine("First Headline On CNN");

sub1.getNews(3);
sub2.getNews(2);
sub3.getNews(1);
}
}

public class Publisher<T> implements Flow.Publisher<T> {
private List<T> headLines = new ArrayList<>();
private String news;

public Publisher(String news) {
this.news = news;
}

@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
Subscription<T> subscription = new Subscription<>(this, (Subscriber<T>) subscriber);
subscriber.onSubscribe(subscription);
}

public List<T> recentNews(int n) {
return headLines.subList(Math.max(headLines.size() - n, 0), headLines.size());
}

public void addHeadLine(T headLine) {
headLines.add(headLine);
}
}

public class Subscription<T> implements Flow.Subscription {
private Publisher<T> publisher;
private Subscriber<T> subscriber;

public Subscription(Publisher<T> publisher, Subscriber<T> subscriber) {
this.publisher = publisher;
this.subscriber = subscriber;
}

@Override
public void request(long n) {
List<T> result = publisher.recentNews(Long.valueOf(n).intValue());
result.forEach(subscriber::onNext);
subscriber.onComplete();
}

@Override
public void cancel() {

}
}

public class Subscriber<T> implements Flow.Subscriber<T> {
private Flow.Subscription subscription;

@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
}

@Override
public void onNext(T item) {
System.out.println(item.toString());
}

@Override
public void onError(Throwable throwable) {

}

@Override
public void onComplete() {
System.out.println("complete");
}

public void getNews(long n) {
subscription.request(n);
}
}
Author: Song Hayoung
Link: https://songhayoung.github.io/2021/05/19/Languages/Modern%20Java/chapter15/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.