Skip to content

Commit 4e1a16c

Browse files
committed
CompletableFuture added and minor fixes
1 parent 195321b commit 4e1a16c

File tree

4 files changed

+302
-84
lines changed

4 files changed

+302
-84
lines changed

README.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ Concurrency Patterns and features found in Java, through multithreaded programmi
1111
* ReadWrite
1212
* Synchronizers
1313
* Latches
14-
* Future Tasks
1514
* Semaphores
1615
* Barriers
1716
* Synchronized Collections
@@ -25,6 +24,9 @@ Concurrency Patterns and features found in Java, through multithreaded programmi
2524
* Single Thread Pool
2625
* Scheduled Thread Pool
2726
* Atomics
27+
* Futures
28+
* FutureTask
29+
* CompletableFuture
2830
* Java Memory Model
2931

3032
## Patterns
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
package br.com.leonardoz.features.futures;
2+
3+
import java.util.Random;
4+
import java.util.concurrent.CompletableFuture;
5+
import java.util.concurrent.ExecutionException;
6+
import java.util.concurrent.ExecutorService;
7+
import java.util.concurrent.Executors;
8+
import java.util.concurrent.TimeUnit;
9+
import java.util.function.Supplier;
10+
11+
/**
12+
*
13+
* CompletableFuture is a Future that may be manually completed. It combines a
14+
* Future interface with the CompletionState interface, supporting dependent
15+
* actions that trigger upon its completion, similarly to a callback.
16+
*
17+
* Important: Specify an Executor for async methods when available. All async
18+
* methods without an explicit Executor argument are performed using the
19+
* ForkJoinPool.commonPool();
20+
*
21+
* Important 2: Mostly of the CompletableFuture methods returns a new
22+
* CompletableFuture.
23+
*
24+
* == Quick terminology guide ==
25+
*
26+
* = Async =
27+
*
28+
* xxxAsync(...); // Async method executed in the ForkJoinPool.commonPool();
29+
*
30+
* xxxAsync(..., Executor executor); // Executed in the specified Executor, good
31+
* for Java EE.
32+
*
33+
* = supply x run =
34+
*
35+
* supplyAsync(Supplier<U> supplier); // will complete asynchronously by calling
36+
* supplier.
37+
*
38+
* runAsync(Runnable runnable); // will complete after the runnable executions;
39+
*
40+
* = thenApply x thenAccept x thenRun
41+
*
42+
* thenApply: transforms a value to another type;
43+
*
44+
* thenAccept: accepts a consumer to the result value;
45+
*
46+
* thenRun: accepts a Runnable to be executed after the result is ready;
47+
*
48+
*
49+
* == Quick API guide ==
50+
*
51+
* = Creating =
52+
*
53+
* new CompletableFuture<>();
54+
*
55+
* CompletableFuture.supplyAsync(Supplier<U>supplier);
56+
*
57+
* CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor);
58+
*
59+
* CompletableFuture.runAsync(Runnable runnable);
60+
*
61+
* CompletableFuture.runAsync(Runnable runnable, Executor executor);
62+
*
63+
*
64+
* = Mapping values =
65+
*
66+
* completableFuture.thenApply(Function<? super T,? extends U> fn);
67+
*
68+
* completableFuture.thenApplyAsync(Function<? super T,? extends U> fn);
69+
*
70+
* completableFuture.thenApplyAsync(Function<? super T,? extends U> fn, Executor
71+
* executor);
72+
*
73+
*
74+
* = Callback on completion =
75+
*
76+
* completableFuture.thenAccept(Consumer<? super T> block);
77+
*
78+
* completableFuture.thenRun(Runnable action);
79+
*
80+
*
81+
* = Error handling =
82+
*
83+
* completableFuture.exceptionally(ex -> ex.getMessage());
84+
*
85+
* completableFuture.handle((value, ex) -> {if value != null... else {}})
86+
*
87+
*
88+
* = Pipeline =
89+
*
90+
* Chain one future dependent on the other
91+
*
92+
* completableFuture.thenCompose(Function<? super T,CompletableFuture<U>> fn);
93+
* // flatMap
94+
*
95+
*
96+
* = Mapping values from Two Futures =
97+
*
98+
* completableFuture.thenCombine(CompletableFuture<? extends U> other,
99+
* BiFunction<? super T,? super U,? extends V> fn) ex.:
100+
*
101+
*
102+
* = Waiting for first CompletableFuture to complete =
103+
*
104+
* Two services, one fast and the other slow. Fastest always wins.
105+
*
106+
* completableFuture.acceptEither(CompletableFuture<? extends T> other,
107+
* Consumer<? super T> block);
108+
*
109+
*
110+
* = Transforming first completed =
111+
*
112+
* completableFuture.applyToEither(CompletableFuture<? extends T> other,
113+
* Function<? super T,U> fn)
114+
*
115+
*
116+
* = Combining multiple CompletableFuture together =
117+
*
118+
* CompletableFuture.allOf(CompletableFuture<?>... cfs)
119+
*
120+
* CompletableFuture.anyOf(CompletableFuture<?>... cfs)
121+
*
122+
*
123+
* = Get-Complete value =
124+
*
125+
* CompletableFuture.get() // block
126+
*
127+
* CompletableFuture.complete() // complete future's lifecycle
128+
*
129+
* CompletableFuture.obtrudeValue() // ignores complete
130+
*
131+
* CompletableFuture.join() // same as get
132+
*
133+
* CompletableFuture.getNow(valueIfAbsent) // immediatly return
134+
*
135+
* CompletableFuture.completeExceptionally() // completes throwing a exception
136+
*
137+
* CompletableFuture.completeExceptionally(ex) // completes with a exception
138+
*
139+
*/
140+
public class UsingCompletableFuture {
141+
142+
public static void main(String[] args) throws InterruptedException, ExecutionException {
143+
Random random = new Random();
144+
ExecutorService executor = Executors.newCachedThreadPool();
145+
// Creating
146+
CompletableFuture<Integer> randomNum = CompletableFuture.supplyAsync(() -> random.nextInt(140), executor);
147+
148+
// Mapping
149+
String strNum = randomNum.thenApplyAsync(n -> Integer.toString(n), executor).get();
150+
System.out.println("Executed " + strNum);
151+
152+
// Combining
153+
CompletableFuture<Integer> anotherNum = CompletableFuture.supplyAsync(() -> random.nextInt(140), executor);
154+
155+
// accept both and do something
156+
randomNum.thenAcceptBoth(anotherNum, (num1, num2) -> {
157+
System.out.println("Num1 is: " + num1);
158+
System.out.println("Num2 is: " + num2);
159+
});
160+
161+
// combine both into a new type/value
162+
CompletableFuture<Integer> mappedAndCombined = randomNum.thenCombine(anotherNum, (num1, num2) -> num1 + num2);
163+
164+
// retrieving value
165+
Integer value = mappedAndCombined.get();
166+
System.out.println("Sum " + value);
167+
168+
// Indefined time task
169+
Supplier<Double> ind = () -> {
170+
try {
171+
Thread.sleep(random.nextInt(1000));
172+
} catch (InterruptedException e) {
173+
e.printStackTrace();
174+
}
175+
return random.nextDouble();
176+
};
177+
178+
// Run after executed
179+
CompletableFuture<Double> f1 = CompletableFuture.supplyAsync(ind);
180+
CompletableFuture<Double> f2 = CompletableFuture.supplyAsync(ind);
181+
CompletableFuture<Double> f3 = CompletableFuture.supplyAsync(ind);
182+
CompletableFuture<Double> f4 = CompletableFuture.supplyAsync(ind);
183+
CompletableFuture.anyOf(f1, f2, f3, f4).thenRun(() -> System.out.println("Completed"));
184+
185+
// Fastest result will be delivered
186+
// Indefined time task - static value
187+
Supplier<String> getVal = () -> {
188+
try {
189+
Thread.sleep(random.nextInt(1000));
190+
} catch (InterruptedException e) {
191+
e.printStackTrace();
192+
}
193+
return "First";
194+
};
195+
Supplier<String> getVal2 = () -> {
196+
try {
197+
Thread.sleep(random.nextInt(1000));
198+
} catch (InterruptedException e) {
199+
e.printStackTrace();
200+
}
201+
return "Second";
202+
};
203+
CompletableFuture
204+
.supplyAsync(getVal)
205+
.acceptEitherAsync(CompletableFuture.supplyAsync(getVal2, executor), (firstToBeReady) -> System.out.println(firstToBeReady), executor);
206+
executor.shutdown();
207+
executor.awaitTermination(3000, TimeUnit.SECONDS);
208+
}
209+
210+
}

src/main/java/br/com/leonardoz/features/synchronizers/UsingFutureTasks.java src/main/java/br/com/leonardoz/features/futures/UsingFutureTasks.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package br.com.leonardoz.features.synchronizers;
1+
package br.com.leonardoz.features.futures;
22

33
import java.util.Random;
44
import java.util.concurrent.Callable;
@@ -10,6 +10,9 @@
1010
import java.util.concurrent.TimeoutException;
1111

1212
/**
13+
* FutureTask<V> represents an asynchronous computation. It has methods to check
14+
* if the computation is completed and to cancel it if needed.
15+
*
1316
* Used for computing long running tasks/IO tasks.
1417
*
1518
* Act as a latch and has three states: waiting for run, running or completed.

0 commit comments

Comments
 (0)