diff --git a/README.md b/README.md
index 3db4ee8..e7d1400 100644
--- a/README.md
+++ b/README.md
@@ -3,52 +3,58 @@
Concurrency Patterns and features found in Java, through multithreaded programming.
## Features:
-* Threads and Runnables
-* Locks
- * Intrinsic
+* [Threads and Runnables](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/threads/UsingThreads.java)
+* [Locks](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/features/locks)
+ * [Intrinsic](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/locks/UsingIntrinsicLocks.java)
* Explicit
- * Reentrant
- * ReadWrite
-* Synchronizers
- * Latches
- * Semaphores
- * Barriers
-* Synchronized Collections
-* Concurrent Collections
- * CopyOnWriteArrayList
- * ConcurrentHashMap
- * Blocking Queue
-* Executors
- * Fixed Thread Pool
- * Cached Thread Pool
- * Single Thread Pool
- * Scheduled Thread Pool
-* Atomics
-* Futures
- * FutureTask
- * CompletableFuture
-* Java Memory Model
+ * [Reentrant](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/locks/UsingExplicitReentrantLocks.java)
+ * [ReadWrite](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/locks/UsingExplicitReadWriteLocks.java)
+* [Synchronizers](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/features/synchronizers)
+ * [Latches](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/synchronizers/UsingLatches.java)
+ * [Semaphores](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/synchronizers/UsingSemaphores.java)
+ * [Barriers](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/synchronizers/UsingBarriers.java)
+* [Synchronized Collections](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/collections/UsingSynchronizedCollections.java)
+* [Concurrent Collections](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java)
+ * [CopyOnWriteArrayList](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java#L89)
+ * [ConcurrentHashMap](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java#L40)
+ * [Blocking Queue](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java#L141)
+* [Executors](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java)
+ * [Fixed Thread Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java#L94)
+ * [Cached Thread Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java#L65)
+ * [Single Thread Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java#L51)
+ * [Scheduled Thread Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java#L122)
+ * [Single Thread Scheduled Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java#L139)
+ * [Work-Stealing Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java#L156)
+* [Atomics](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/atomics/UsingAtomics.java)
+* [Futures](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/features/futures)
+ * [FutureTask](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/futures/UsingFutureTasks.java)
+ * [CompletableFuture](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/futures/UsingCompletableFuture.java)
+* [Fork/Join Framework](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/forkjoin/UsingForkJoinFramework.java)
+* [Parallel Streams](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/parallel_stream/UsingParallelStreams.java)
+* [Java Memory Model](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/java_memory_model/WhatIsJavaMemoryModel.java)
## Patterns
-* Protect Shared State
-* Lock Split
-* Protecting Composed Actions
-* Fixed Lock Ordering
-* Thread Local Confinement
-* Immutable Object
-* Safe Instantiation
-* Safe Publication
-* Interruption
-* Resource Pool
-* Condition Queues (wait-notify / await-signal)
-* Background Task Executor
-* Task Cancel
-* Producer-Consumer
-* Task Convergence
-* Non-Blocking with Atomics
-* Controlled Concurrent Initialization
+* [Protect Shared State](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/thread_safe/shared_state)
+* [Atomic Compound Actions](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/thread_safe/compound_actions)
+* [Lock Split](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/thread_safe/lock_split)
+* [Fixed Lock Ordering](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/fixed_lock_ordering)
+* [Thread Local Confinement](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/thread_safe/thread_confinement)
+* [Immutable Object](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/thread_safe/immutable_object)
+* [Safe Lazy Initialization](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/thread_safe/initialization/SafeInitializationHolder.java)
+* [Safe Publishing](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/thread_safe/publishing/SafePublishing.java)
+* [Resource Pool](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/resource_pool)
+* [Condition Queues](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/condition_queues)
+ * [wait-notify](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/condition_queues/WaitNotifyQueue.java)
+ * [await-signal](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/condition_queues/ExplicitConditionQueue.java)
+* [Background Task Executor](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/task_execution/BackgroundTaskExecutor.java)
+* [Task Cancel](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/task_cancel)
+* [Producer-Consumer](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/producer_consumer/ProducerConsumer.java)
+* [Task Convergence](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/task_convergence/TaskConvergence.java)
+* [Non-Blocking with Atomics](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/non_blocking)
+* [Controlled Concurrent Initialization](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/controlled_initialization/ControlledInitialization.java)
+* [Parallel Divide and Conquer](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/divideconquer)
## About
Patterns and Algorithms inspired by the Java Concurrency in Practice book.
-
\ No newline at end of file
+
diff --git a/pom.xml b/pom.xml
index 0052648..8f27a3d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -14,8 +14,8 @@
3.7.0
- 1.8
- 1.8
+ 11
+ 11
diff --git a/src/main/java/br/com/leonardoz/features/atomics/UsingAtomics.java b/src/main/java/br/com/leonardoz/features/atomics/UsingAtomics.java
index e62cbae..97ff747 100644
--- a/src/main/java/br/com/leonardoz/features/atomics/UsingAtomics.java
+++ b/src/main/java/br/com/leonardoz/features/atomics/UsingAtomics.java
@@ -1,6 +1,5 @@
package br.com.leonardoz.features.atomics;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -47,13 +46,13 @@ public int get() {
}
public static void main(String[] args) throws InterruptedException {
- AtomicCounter counter = new AtomicCounter();
- ExecutorService ctp = Executors.newCachedThreadPool();
+ var counter = new AtomicCounter();
+ var cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10_000; i++) {
- ctp.execute(() -> counter.increment());
+ cachedThreadPool.execute(() -> counter.increment());
}
- ctp.shutdown();
- ctp.awaitTermination(4000, TimeUnit.SECONDS);
+ cachedThreadPool.shutdown();
+ cachedThreadPool.awaitTermination(4000, TimeUnit.SECONDS);
System.out.println("Result shound be 10000: Actual result is: " + counter.get());
}
}
diff --git a/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java b/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java
index cfab0f1..5073cac 100644
--- a/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java
+++ b/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java
@@ -2,10 +2,8 @@
import java.util.Random;
import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -38,10 +36,10 @@ public class UsingConcurrentCollections {
* - Just a few Writers can modify it.
*/
public static void usingConcurrentHashMap() {
- ExecutorService executor = Executors.newCachedThreadPool();
System.out.println("=== ConcurrentHashMap ===");
- Random random = new Random();
- ConcurrentHashMap valuesPerUuid = new ConcurrentHashMap<>();
+ var executor = Executors.newCachedThreadPool();
+ var random = new Random();
+ var valuesPerUuid = new ConcurrentHashMap();
// atomic operations
valuesPerUuid.putIfAbsent(UUID.randomUUID(), random.nextInt(100));
@@ -87,28 +85,28 @@ public static void usingConcurrentHashMap() {
*
*/
public static void usingCopyOnWriteArrayList() {
- ExecutorService executor = Executors.newCachedThreadPool();
System.out.println("=== CopyOnWriteArrayList ===");
- Random random = new Random();
+ var executor = Executors.newCachedThreadPool();
+ var random = new Random();
// No ConcurrentModificationException
- CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
+ var copyOnWriteArrayList = new CopyOnWriteArrayList();
for (int i = 0; i < 100; i++) {
if (i % 8 == 0) {
// write
executor.execute(() -> {
- Integer value = random.nextInt(10);
+ var value = random.nextInt(10);
System.err.println("Added " + value);
copyOnWriteArrayList.add(value);
});
} else {
// read
executor.execute(() -> {
- StringBuilder sb = new StringBuilder();
- for (Integer vv : copyOnWriteArrayList) {
- sb.append(vv + " ");
+ var builder = new StringBuilder();
+ for (var value : copyOnWriteArrayList) {
+ builder.append(value + " ");
}
- System.out.println("Reading " + sb.toString());
+ System.out.println("Reading " + builder.toString());
});
}
}
@@ -142,7 +140,7 @@ public static void usingBlockingQueue() {
System.out.println("=== BlockingQueue ===");
// Bounded UUID queue
- BlockingQueue uuidQueue = new LinkedBlockingQueue<>(10);
+ var uuidQueue = new LinkedBlockingQueue(10);
System.out.println("Queue will execute for 10s");
@@ -150,18 +148,20 @@ public static void usingBlockingQueue() {
Runnable runConsumer = () -> {
while (!Thread.currentThread().isInterrupted()) {
try {
- UUID uuid = uuidQueue.take();
+ var uuid = uuidQueue.take();
System.out.println("Consumed: " + uuid + " by " + Thread.currentThread().getName());
} catch (InterruptedException e) {
// interrupted pattern
+ // InterruptedException makes isInterrupted returns false
+ Thread.currentThread().interrupt();
System.err.println("Consumer Finished");
}
}
};
- Thread consumer1 = new Thread(runConsumer);
+ var consumer1 = new Thread(runConsumer);
consumer1.start();
- Thread consumer2 = new Thread(runConsumer);
+ var consumer2 = new Thread(runConsumer);
consumer2.start();
// Producer Thread
@@ -181,12 +181,12 @@ public static void usingBlockingQueue() {
}
};
- // Multiple producers - Examples using simple threads this time.
- Thread producer1 = new Thread(runProducer);
+ // Multiple producers - Examples using simple threads this time.
+ var producer1 = new Thread(runProducer);
producer1.start();
- Thread producer2 = new Thread(runProducer);
+ var producer2 = new Thread(runProducer);
producer2.start();
- Thread producer3 = new Thread(runProducer);
+ var producer3 = new Thread(runProducer);
producer3.start();
try {
diff --git a/src/main/java/br/com/leonardoz/features/collections/UsingSynchronizedCollections.java b/src/main/java/br/com/leonardoz/features/collections/UsingSynchronizedCollections.java
index 0b89058..fb24ce6 100644
--- a/src/main/java/br/com/leonardoz/features/collections/UsingSynchronizedCollections.java
+++ b/src/main/java/br/com/leonardoz/features/collections/UsingSynchronizedCollections.java
@@ -1,7 +1,6 @@
package br.com.leonardoz.features.collections;
import java.util.Vector;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -30,7 +29,7 @@ public class UsingSynchronizedCollections {
*/
public static void insertIfAbsent(Vector list, Long value) {
synchronized (list) {
- boolean contains = list.contains(value);
+ var contains = list.contains(value);
if (!contains) {
list.add(value);
System.out.println("Value added: " + value);
@@ -43,7 +42,7 @@ public static void insertIfAbsent(Vector list, Long value) {
* results
*/
public static void insertIfAbsentUnsafe(Vector list, Long value) {
- boolean contains = list.contains(value);
+ var contains = list.contains(value);
if (!contains) {
list.add(value);
System.out.println("Value added: " + value);
@@ -51,13 +50,13 @@ public static void insertIfAbsentUnsafe(Vector list, Long value) {
}
public static void main(String[] args) throws InterruptedException {
- ExecutorService executor = Executors.newCachedThreadPool();
+ var executor = Executors.newCachedThreadPool();
// Synchronized - Vector
- Vector vec = new Vector<>();
+ var vector = new Vector();
Runnable insertIfAbsent = () -> {
long millis = System.currentTimeMillis() / 1000;
- insertIfAbsent(vec, millis);
+ insertIfAbsent(vector, millis);
};
for (int i = 0; i < 10001; i++) {
executor.execute(insertIfAbsent);
diff --git a/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java b/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java
index 4294de4..93c4bff 100644
--- a/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java
+++ b/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java
@@ -1,13 +1,11 @@
package br.com.leonardoz.features.executors;
import java.util.LinkedList;
-import java.util.List;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
@@ -15,7 +13,7 @@
*
* Executors help us to decouple task submission from execution.
*
- * We have 4 types of executors:
+ * We have 6 types of executors:
*
* - Single Thread Executor: Uses a single worker to process tasks.
*
@@ -27,6 +25,13 @@
*
* - Scheduled Thread Pool: Bounded thread limit, used for delayed tasks.
*
+ * - Single-Thread Scheduled Pool: Similar to the scheduled thread pool, but
+ * single-threaded, with only one active task at the time.
+ *
+ * - Work-Stealing Thread Pool: Based on Fork/Join Framework, applies the
+ * work-stealing algorithm for balancing tasks, with available processors as a
+ * paralellism level.
+ *
* And 2 types of tasks:
*
* - execute: Executes without giving feedback. Fire-and-forget.
@@ -45,7 +50,7 @@ public class UsingExecutors {
public static void usingSingleThreadExecutor() {
System.out.println("=== SingleThreadExecutor ===");
- ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
+ var singleThreadExecutor = Executors.newSingleThreadExecutor();
singleThreadExecutor.execute(() -> System.out.println("Print this."));
singleThreadExecutor.execute(() -> System.out.println("and this one to."));
singleThreadExecutor.shutdown();
@@ -59,15 +64,15 @@ public static void usingSingleThreadExecutor() {
public static void usingCachedThreadPool() {
System.out.println("=== CachedThreadPool ===");
- ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
- List> uuids = new LinkedList<>();
+ var cachedThreadPool = Executors.newCachedThreadPool();
+ var uuids = new LinkedList>();
for (int i = 0; i < 10; i++) {
- Future submitted = cachedThreadPool.submit(() -> {
- UUID randomUUID = UUID.randomUUID();
+ var submittedUUID = cachedThreadPool.submit(() -> {
+ var randomUUID = UUID.randomUUID();
System.out.println("UUID " + randomUUID + " from " + Thread.currentThread().getName());
return randomUUID;
});
- uuids.add(submitted);
+ uuids.add(submittedUUID);
}
cachedThreadPool.execute(() -> uuids.forEach((f) -> {
try {
@@ -88,11 +93,11 @@ public static void usingCachedThreadPool() {
public static void usingFixedThreadPool() {
System.out.println("=== FixedThreadPool ===");
- ExecutorService fixedPool = Executors.newFixedThreadPool(4);
- List> uuids = new LinkedList<>();
+ var fixedPool = Executors.newFixedThreadPool(4);
+ var uuids = new LinkedList>();
for (int i = 0; i < 20; i++) {
- Future submitted = fixedPool.submit(() -> {
- UUID randomUUID = UUID.randomUUID();
+ var submitted = fixedPool.submit(() -> {
+ var randomUUID = UUID.randomUUID();
System.out.println("UUID " + randomUUID + " from " + Thread.currentThread().getName());
return randomUUID;
});
@@ -116,11 +121,11 @@ public static void usingFixedThreadPool() {
public static void usingScheduledThreadPool() {
System.out.println("=== ScheduledThreadPool ===");
- ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4);
- scheduledThreadPool.scheduleAtFixedRate(
- () -> System.out.println("Print every 2s"), 0, 2, TimeUnit.SECONDS);
- scheduledThreadPool.scheduleWithFixedDelay(
- () -> System.out.println("Print every 2s delay"), 0, 2, TimeUnit.SECONDS);
+ var scheduledThreadPool = Executors.newScheduledThreadPool(4);
+ scheduledThreadPool.scheduleAtFixedRate(() -> System.out.println("1) Print every 2s"), 0, 2, TimeUnit.SECONDS);
+ scheduledThreadPool.scheduleAtFixedRate(() -> System.out.println("2) Print every 2s"), 0, 2, TimeUnit.SECONDS);
+ scheduledThreadPool.scheduleWithFixedDelay(() -> System.out.println("3) Print every 2s delay"), 0, 2,
+ TimeUnit.SECONDS);
try {
scheduledThreadPool.awaitTermination(6, TimeUnit.SECONDS);
@@ -128,12 +133,66 @@ public static void usingScheduledThreadPool() {
} catch (InterruptedException e) {
e.printStackTrace();
}
+ System.out.println("\n\n");
+ }
+
+ public static void usingSingleTreadScheduledExecutor() {
+ System.out.println("=== SingleThreadScheduledThreadPool ===");
+ var singleThreadScheduler = Executors.newSingleThreadScheduledExecutor();
+ singleThreadScheduler.scheduleAtFixedRate(() -> System.out.println("1) Print every 2s"), 0, 2, TimeUnit.SECONDS);
+ singleThreadScheduler.scheduleWithFixedDelay(() -> System.out.println("2) Print every 2s delay"), 0, 2,
+ TimeUnit.SECONDS);
+
+ try {
+ singleThreadScheduler.awaitTermination(6, TimeUnit.SECONDS);
+ singleThreadScheduler.shutdown();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ System.out.println("\n\n");
+
+ }
+
+ public static void usingWorkStealingThreadPool() {
+ System.out.println("=== WorkStealingThreadPool ===");
+ var workStealingPool = Executors.newWorkStealingPool();
+
+ workStealingPool.execute(() -> System.out.println("Prints normally"));
+
+ Callable generatesUUID = UUID::randomUUID;
+ var severalUUIDsTasks = new LinkedList>();
+ for (int i = 0; i < 20; i++) {
+ severalUUIDsTasks.add(generatesUUID);
+ }
+
+ try {
+ var futureUUIDs = workStealingPool.invokeAll(severalUUIDsTasks);
+ for (var future : futureUUIDs) {
+ if (future.isDone()) {
+ var uuid = future.get();
+ System.out.println("New UUID :" + uuid);
+ }
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ }
+ try {
+ workStealingPool.awaitTermination(6, TimeUnit.SECONDS);
+ workStealingPool.shutdown();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ System.out.println("\n\n");
}
+
+
public static void main(String[] args) {
usingSingleThreadExecutor();
usingCachedThreadPool();
usingFixedThreadPool();
usingScheduledThreadPool();
+ usingSingleTreadScheduledExecutor();
+ usingWorkStealingThreadPool();
}
}
diff --git a/src/main/java/br/com/leonardoz/features/forkjoin/UsingForkJoinFramework.java b/src/main/java/br/com/leonardoz/features/forkjoin/UsingForkJoinFramework.java
new file mode 100644
index 0000000..f61eea7
--- /dev/null
+++ b/src/main/java/br/com/leonardoz/features/forkjoin/UsingForkJoinFramework.java
@@ -0,0 +1,253 @@
+package br.com.leonardoz.features.forkjoin;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.math.BigInteger;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveAction;
+import java.util.concurrent.RecursiveTask;
+
+/**
+ *
+ * The Fork/Join Framework
+ *
+ * Parallelism is the execution of multiple tasks simultaneously. Fork Join
+ * Framework helps with the execution of those tasks in parallel.
+ *
+ * Why?
+ *
+ * The Fork/Join approach speeds up the execution of a task that can be split
+ * into subtasks/small tasks, be executing them in parallel and combining those
+ * results.
+ *
+ * Limitations
+ *
+ * One requirement for using the Fork/Join Framework is that all of the Subtasks
+ * must be "completable and independent" of each other to be truly parallel, so
+ * not every problem can be solved using this method. In general, the ForkJoin Framework
+ * is to be used by CPU-intensive computations, not IO bound computations, due to the
+ * long wait periods that could happen.
+ *
+ * How it works
+ *
+ * It uses a divide and conquer approach, dividing a major task into minor
+ * subtasks recursively (Fork), until the division limit is reached and the
+ * tasks can be solved, to be later combined (Join).
+ *
+ * For the execution of those tasks in parallel, the framework will use a thread
+ * pool, which has, be default, the same size of the number of processors
+ * available for the JVM.
+ *
+ * A thread from the pool has it's own double ended queue, for the matter of
+ * storing all the tasks that are being executed/to be executed. The double
+ * ended queue nature enables inserts or deletes to both the head and last
+ * position of the queue.
+ *
+ * The work-stealing algorithm is the greatest functionality for the speed up
+ * aspect of the ForkJoin Framework. The algorithm balances the workload between
+ * threads, allowing the threads that doesn't have any task at the moment to
+ * "steal" from last position of a thread's queue that can't process his own
+ * last task at the moment. In theory, there will be more task being processed.
+ *
+ * Framework architecture overview
+ *
+ * - ForkJoinPool: Base class for the pools, used to balance tasks that can be
+ * "work-stealed".
+ *
+ * - ForkJoinTask: Represents a task to be executed in a ForkJoinPool.
+ *
+ * - RecursiveTask: Specialization of ForkJoinTask, holds a result.
+ *
+ * - RecursiveAction: Specialization of ForkJoinTask, just process something
+ * without yielding a result.
+ *
+ *
+ * Workflow
+ *
+ * The idea is that you can split bigger tasks into smaller ones, until that the work
+ * is small enough to be completed.
+ *
+ * the following algorithm describes how to use the ForkJoinFramework correctly.
+ *
+ * if (my task is small enough)
+ *
+ * complete my task
+ *
+ * else
+ * split my task into two small tasks
+ *
+ * execute both tasks and wait for the results
+ *
+ *
+ * Then do your work based on the result
+ *
+ *
+ */
+public class UsingForkJoinFramework {
+
+ /**
+ * Common Pool
+ *
+ * Default instance of a fork join pool in a Java app, used by
+ * CompletableFuture, and parallel streams. All threads used by the common pool
+ * can be reused, released and reinstated after some time. This approach reduces
+ * the resource consumption. It doesn't need to be closed/shutdown.
+ *
+ */
+ public ForkJoinPool getCommonPool() {
+ return ForkJoinPool.commonPool();
+ }
+
+ /**
+ * Customize ForkJoinPool
+ *
+ * Parallelism: Parallelism level, default is Runtime#availableProcessors
+ *
+ * ForkJoinWorkerThreadFactory: Factory used for creating threads for the pool.
+ *
+ * UncaughtExceptionHandler: handles worker threads that terminates due some
+ * "unrecoverable" problem.
+ *
+ * True-value AsyncMode: FIFO scheduling mode, used by tasks that are never
+ * joined, like event-oriented asynchronous tasks.
+ *
+ */
+ public ForkJoinPool customForkJoinPool(int parallelism,
+ ForkJoinPool.ForkJoinWorkerThreadFactory factory,
+ UncaughtExceptionHandler handler,
+ boolean asyncMode) {
+ return new ForkJoinPool(parallelism, factory, handler, asyncMode);
+ }
+
+ /**
+ *
+ * Tasks
+ *
+ * ForkJoinTask is the base type of a task. It represents a "lightweight
+ * thread", with the ForkJoinPool being it's scheduler.
+ *
+ * RecursiveTask: Task that returns a value, result of a computation.
+ *
+ * RecursiveAction: Task that doesn't returns a value.
+ *
+ * Both can be used to implement the workflow algorithm described in the
+ * Workflow section, with he aid of Fork and Join.
+ *
+ */
+
+ /**
+ * RecursiveTask
+ *
+ * Represents a result of a computation.
+ *
+ * In the example bellow, it follows the algorithm, partitioning the numbers
+ * list in half, using fork and join to control the task flow.
+ *
+ */
+ static class RecSumTask extends RecursiveTask {
+
+ private static final long serialVersionUID = 1L;
+ public static final int DIVIDE_AT = 500;
+
+ private List numbers;
+
+ public RecSumTask(List numbers) {
+ this.numbers = numbers;
+ }
+
+ @Override
+ protected BigInteger compute() {
+ var subTasks = new LinkedList();
+ if (numbers.size() < DIVIDE_AT) {
+ // directly
+ var subSum = BigInteger.ZERO;
+ for (Integer number : numbers) {
+ subSum = subSum.add(BigInteger.valueOf(number));
+ }
+ return subSum;
+ } else {
+ // Divide to conquer
+ var size = numbers.size();
+ var numbersLeft = numbers.subList(0, size / 2);
+ var numbersRight = numbers.subList(size / 2, size);
+
+ var recSumLeft = new RecSumTask(numbersLeft);
+ var recSumRight = new RecSumTask(numbersRight);
+
+ subTasks.add(recSumRight);
+ subTasks.add(recSumLeft);
+
+ // Fork Child Tasks
+ recSumLeft.fork();
+ recSumRight.fork();
+ }
+
+ var sum = BigInteger.ZERO;
+ for (var recSum : subTasks) {
+ // Join Child Tasks
+ sum = sum.add(recSum.join());
+ }
+ return sum;
+ }
+ }
+
+ public static void main(String[] args) {
+ // prepares dataset for the example
+ var numbers = new LinkedList();
+ for (int i = 0; i < 500_000; i++) {
+ numbers.add(i);
+ }
+
+ // Usage
+ var commonPool = ForkJoinPool.commonPool();
+ var task = new RecSumTask(numbers);
+ BigInteger result = commonPool.invoke(task);
+ System.out.println("Result is: " + result);
+ System.out.println("\n\n");
+ }
+
+ /**
+ * RecursiveTask
+ *
+ * Represents a result of a computation, resembles RecursiveTask, but without
+ * the return value.
+ *
+ */
+ static class ARecursiveAction extends RecursiveAction {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected void compute() {
+ // same pattern goes here
+ }
+
+ }
+
+ /**
+ * It's possible to extract informations about the pool's current state.
+ *
+ * Active thread count: Number of threads that are stealing or executing tasks.
+ *
+ * Pool size: Number of worker threads that are started but not terminated yet.
+ *
+ * Parallelism level: Equivalent to the number of available processors.
+ *
+ * Queue submitted tasks: Number of submitted tasks, but not executing. Steal
+ * count:
+ *
+ * Number of stealed tasks from a thread to another, useful for monitoring.
+ *
+ */
+ public static void debugPool(ForkJoinPool commonPool) {
+ System.out.println("Debuggin ForJoinPool");
+ System.out.println("Active Thread Count: " + commonPool.getActiveThreadCount());
+ System.out.println("Pool Size: " + commonPool.getPoolSize());
+ System.out.println("Parallelism level: " + commonPool.getParallelism());
+ System.out.println("Queue submitted tasks: " + commonPool.getQueuedSubmissionCount());
+ System.out.println("Steal count: " + commonPool.getStealCount());
+ System.out.println("\n");
+ }
+
+}
diff --git a/src/main/java/br/com/leonardoz/features/futures/UsingCompletableFuture.java b/src/main/java/br/com/leonardoz/features/futures/UsingCompletableFuture.java
index 6841327..bbffd0f 100644
--- a/src/main/java/br/com/leonardoz/features/futures/UsingCompletableFuture.java
+++ b/src/main/java/br/com/leonardoz/features/futures/UsingCompletableFuture.java
@@ -3,7 +3,6 @@
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@@ -11,7 +10,7 @@
/**
*
* CompletableFuture is a Future that may be manually completed. It combines a
- * Future interface with the CompletionState interface, supporting dependent
+ * Future interface with the CompletionStage interface, supporting dependent
* actions that trigger upon its completion, similarly to a callback.
*
* Important: Specify an Executor for async methods when available. All async
@@ -30,6 +29,7 @@
* xxxAsync(..., Executor executor); // Executed in the specified Executor, good
* for Java EE.
*
+ *
* = supply x run =
*
* supplyAsync(Supplier supplier); // will complete asynchronously by calling
@@ -37,6 +37,7 @@
*
* runAsync(Runnable runnable); // will complete after the runnable executions;
*
+ *
* = thenApply x thenAccept x thenRun
*
* thenApply: transforms a value to another type;
@@ -140,8 +141,8 @@
public class UsingCompletableFuture {
public static void main(String[] args) throws InterruptedException, ExecutionException {
- Random random = new Random();
- ExecutorService executor = Executors.newCachedThreadPool();
+ var random = new Random();
+ var executor = Executors.newCachedThreadPool();
// Creating
CompletableFuture randomNum = CompletableFuture.supplyAsync(() -> random.nextInt(140), executor);
@@ -165,8 +166,8 @@ public static void main(String[] args) throws InterruptedException, ExecutionExc
Integer value = mappedAndCombined.get();
System.out.println("Sum " + value);
- // Indefined time task
- Supplier ind = () -> {
+ // Undefined time task
+ Supplier randomDouble = () -> {
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
@@ -176,14 +177,14 @@ public static void main(String[] args) throws InterruptedException, ExecutionExc
};
// Run after executed
- CompletableFuture f1 = CompletableFuture.supplyAsync(ind);
- CompletableFuture f2 = CompletableFuture.supplyAsync(ind);
- CompletableFuture f3 = CompletableFuture.supplyAsync(ind);
- CompletableFuture f4 = CompletableFuture.supplyAsync(ind);
+ CompletableFuture f1 = CompletableFuture.supplyAsync(randomDouble);
+ CompletableFuture f2 = CompletableFuture.supplyAsync(randomDouble);
+ CompletableFuture f3 = CompletableFuture.supplyAsync(randomDouble);
+ CompletableFuture f4 = CompletableFuture.supplyAsync(randomDouble);
CompletableFuture.anyOf(f1, f2, f3, f4).thenRun(() -> System.out.println("Completed"));
// Fastest result will be delivered
- // Indefined time task - static value
+ // Undefined time task - static value
Supplier getVal = () -> {
try {
Thread.sleep(random.nextInt(1000));
diff --git a/src/main/java/br/com/leonardoz/features/java_memory_model/UsingReactiveStreams.java b/src/main/java/br/com/leonardoz/features/java_memory_model/UsingReactiveStreams.java
new file mode 100644
index 0000000..2f8bef2
--- /dev/null
+++ b/src/main/java/br/com/leonardoz/features/java_memory_model/UsingReactiveStreams.java
@@ -0,0 +1,5 @@
+package br.com.leonardoz.features.java_memory_model;
+
+public class UsingReactiveStreams {
+
+}
diff --git a/src/main/java/br/com/leonardoz/features/java_memory_model/WhatIsJavaMemoryModel.java b/src/main/java/br/com/leonardoz/features/java_memory_model/WhatIsJavaMemoryModel.java
index 08f09ed..d3e309d 100644
--- a/src/main/java/br/com/leonardoz/features/java_memory_model/WhatIsJavaMemoryModel.java
+++ b/src/main/java/br/com/leonardoz/features/java_memory_model/WhatIsJavaMemoryModel.java
@@ -13,8 +13,8 @@
* == Reordering ==
*
* When data is shared across threads, there's not so much guarantees on the
- * execution order by the threads, and because of the imprevisibility, it's
- * import to identify shared data and to use a proper synchronization mechanism
+ * execution order by the threads, and because of the unpredictability, it's
+ * important to identify shared data and to use a proper synchronization mechanism
* to ensure order and to keep the visibility guarantees provided by the JMM.
*
* == Happens-Before ==
diff --git a/src/main/java/br/com/leonardoz/features/locks/UsingExplicitReadWriteLocks.java b/src/main/java/br/com/leonardoz/features/locks/UsingExplicitReadWriteLocks.java
index 7fa3fe8..d531c65 100644
--- a/src/main/java/br/com/leonardoz/features/locks/UsingExplicitReadWriteLocks.java
+++ b/src/main/java/br/com/leonardoz/features/locks/UsingExplicitReadWriteLocks.java
@@ -2,7 +2,6 @@
import java.util.Random;
import java.util.UUID;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@@ -65,8 +64,8 @@ public void writeContent(String newContentToAppend) {
}
public static void main(String[] args) {
- ExecutorService executor = Executors.newCachedThreadPool();
- UsingExplicitReadWriteLocks uerwl = new UsingExplicitReadWriteLocks();
+ var executor = Executors.newCachedThreadPool();
+ var self = new UsingExplicitReadWriteLocks();
// Readers
for (int i = 0; i < 100; i++) {
executor.submit(() -> {
@@ -76,13 +75,13 @@ public static void main(String[] args) {
} catch (InterruptedException e) {
e.printStackTrace();
}
- System.out.println(uerwl.showContent());
+ System.out.println(self.showContent());
});
}
// Writers - only if no writer is available
for (int i = 0; i < 5; i++) {
- executor.execute(() -> uerwl.writeContent(UUID.randomUUID().toString()));
+ executor.execute(() -> self.writeContent(UUID.randomUUID().toString()));
}
executor.shutdown();
}
diff --git a/src/main/java/br/com/leonardoz/features/locks/UsingExplicitReentrantLocks.java b/src/main/java/br/com/leonardoz/features/locks/UsingExplicitReentrantLocks.java
index a0006a6..9f9974c 100644
--- a/src/main/java/br/com/leonardoz/features/locks/UsingExplicitReentrantLocks.java
+++ b/src/main/java/br/com/leonardoz/features/locks/UsingExplicitReentrantLocks.java
@@ -1,6 +1,5 @@
package br.com.leonardoz.features.locks;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@@ -60,16 +59,16 @@ public void lockMyHearthWithTiming() throws InterruptedException {
}
public static void main(String[] args) {
- ExecutorService executor = Executors.newCachedThreadPool();
- UsingExplicitReentrantLocks uel = new UsingExplicitReentrantLocks();
+ var executor = Executors.newCachedThreadPool();
+ var self = new UsingExplicitReentrantLocks();
for (int i = 0; i < 10; i++) {
- executor.execute(() -> uel.lockMyHearth());
+ executor.execute(() -> self.lockMyHearth());
}
for (int i = 0; i < 40; i++) {
executor.execute(() -> {
try {
- uel.lockMyHearthWithTiming();
+ self.lockMyHearthWithTiming();
} catch (InterruptedException e) {
e.printStackTrace();
}
diff --git a/src/main/java/br/com/leonardoz/features/locks/UsingIntrinsicLocks.java b/src/main/java/br/com/leonardoz/features/locks/UsingIntrinsicLocks.java
index 89e29ac..a9cc800 100644
--- a/src/main/java/br/com/leonardoz/features/locks/UsingIntrinsicLocks.java
+++ b/src/main/java/br/com/leonardoz/features/locks/UsingIntrinsicLocks.java
@@ -1,6 +1,5 @@
package br.com.leonardoz.features.locks;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
@@ -69,18 +68,18 @@ public synchronized void reentrancy() {
}
public static void main(String[] args) throws InterruptedException {
- ExecutorService executor = Executors.newCachedThreadPool();
- UsingIntrinsicLocks uil = new UsingIntrinsicLocks();
+ var executor = Executors.newCachedThreadPool();
+ var self = new UsingIntrinsicLocks();
for (int i = 0; i < 100; i++) {
- executor.execute(() -> uil.mySynchronizedMethod());
+ executor.execute(() -> self.mySynchronizedMethod());
}
Thread.sleep(1000);
for (int i = 0; i < 10; i++) {
- executor.execute(() -> uil.mySynchronizedBlock());
+ executor.execute(() -> self.mySynchronizedBlock());
}
Thread.sleep(1000);
for (int i = 0; i < 10; i++) {
- executor.execute(() -> uil.reentrancy());
+ executor.execute(() -> self.reentrancy());
}
executor.shutdown();
}
diff --git a/src/main/java/br/com/leonardoz/features/parallel_stream/UsingParallelStreams.java b/src/main/java/br/com/leonardoz/features/parallel_stream/UsingParallelStreams.java
new file mode 100644
index 0000000..eaf2b46
--- /dev/null
+++ b/src/main/java/br/com/leonardoz/features/parallel_stream/UsingParallelStreams.java
@@ -0,0 +1,99 @@
+package br.com.leonardoz.features.parallel_stream;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ *
+ * Warning
+ *
+ * Parallel Streams uses the ForkJoin Pool, so be aware of using it in Java
+ * EE/Jakarta EE environments!
+ *
+ * The Stream API allows, with methods like parallel() or parallelStream(), the
+ * execution of streams's operations in parallel.
+ *
+ * It enables each element to be processed in parallel, having a thread for each
+ * one of them, depending on the number of cores available. Like the Fork/Join
+ * Framework, it has an overhead, so the speed of execution can get much better
+ * in some cases, getting worse in some others.
+ *
+ * Streams are composed of a Source, several intermediate operations and a
+ * terminal operation. Streams are executed only when a terminal operation is
+ * executed, so they're lazy too. The intermediate operations respect the order
+ * that you used, they're sequential. The work of each intermediate operation is
+ * parallel.
+ *
+ * CPU intensive tasks benefits from this feature.
+ *
+ *
+ */
+public class UsingParallelStreams {
+
+ public static void main(String[] args) {
+ // Creating Parallel Streams from existing collection
+ new ArrayList<>().parallelStream();
+
+ // Making Stream Parallel
+ IntStream.rangeClosed(0, 30_000) // source
+ .parallel().mapToObj(BigInteger::valueOf).map(UsingParallelStreams::isPrime) // Intermediate operations
+ .collect(Collectors.toList()); // Terminal Operations
+
+ // Each operation run in parallel, out of order
+ IntStream.rangeClosed(0, 20) // source
+ .parallel().mapToObj(Integer::toString) // Intermediate operation
+ .forEach(System.out::print); // Terminal operation
+
+ System.out.println("\n");
+
+ // Runs sequentially, in order.
+ IntStream.rangeClosed(0, 20)
+ .mapToObj(Integer::toString)
+ .forEach(System.out::print);
+
+ System.out.println("\n");
+
+ dummyPerformanceCheck();
+ }
+
+ private static void dummyPerformanceCheck() {
+
+ // Sequential Stream
+ var start1 = System.currentTimeMillis();
+ IntStream.rangeClosed(0, 50_000)
+ .mapToObj(BigInteger::valueOf)
+ .map(UsingParallelStreams::isPrime)
+ .collect(Collectors.toList());
+ var end1 = System.currentTimeMillis();
+ var time1 = (end1 - start1) / 1000;
+ System.out.println("Sequential: " + time1);
+
+ // Parallel Stream
+ var start2 = System.currentTimeMillis();
+ IntStream.rangeClosed(0, 50_000)
+ .parallel()
+ .mapToObj(BigInteger::valueOf)
+ .map(UsingParallelStreams::isPrime)
+ .collect(Collectors.toList());
+ var end2 = System.currentTimeMillis();
+ var time2 = (end2 - start2) / 1000;
+ System.out.println("Parallel: " + time2);
+ }
+
+ // thanks to linski on
+ // https://stackoverflow.com/questions/15862271/java-compute-intensive-task
+ public static boolean isPrime(BigInteger n) {
+ var counter = BigInteger.ONE.add(BigInteger.ONE);
+ var isPrime = true;
+ while (counter.compareTo(n) == -1) {
+ if (n.remainder(counter).compareTo(BigInteger.ZERO) == 0) {
+ isPrime = false;
+ break;
+ }
+ counter = counter.add(BigInteger.ONE);
+ }
+ return isPrime;
+ }
+}
diff --git a/src/main/java/br/com/leonardoz/features/synchronizers/UsingBarriers.java b/src/main/java/br/com/leonardoz/features/synchronizers/UsingBarriers.java
index c5776d9..0b4610d 100644
--- a/src/main/java/br/com/leonardoz/features/synchronizers/UsingBarriers.java
+++ b/src/main/java/br/com/leonardoz/features/synchronizers/UsingBarriers.java
@@ -3,7 +3,6 @@
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
@@ -20,10 +19,10 @@ public class UsingBarriers {
public static void main(String[] args) {
- ExecutorService executor = Executors.newCachedThreadPool();
Runnable barrierAction = () -> System.out.println("Well done, guys!");
- CyclicBarrier barrier = new CyclicBarrier(10, barrierAction);
+ var executor = Executors.newCachedThreadPool();
+ var barrier = new CyclicBarrier(10, barrierAction);
Runnable task = () -> {
try {
diff --git a/src/main/java/br/com/leonardoz/features/synchronizers/UsingLatches.java b/src/main/java/br/com/leonardoz/features/synchronizers/UsingLatches.java
index 90904d4..1da6b2b 100644
--- a/src/main/java/br/com/leonardoz/features/synchronizers/UsingLatches.java
+++ b/src/main/java/br/com/leonardoz/features/synchronizers/UsingLatches.java
@@ -1,7 +1,6 @@
package br.com.leonardoz.features.synchronizers;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -19,8 +18,8 @@
public class UsingLatches {
public static void main(String[] args) {
- ExecutorService executor = Executors.newCachedThreadPool();
- CountDownLatch latch = new CountDownLatch(3);
+ var executor = Executors.newCachedThreadPool();
+ var latch = new CountDownLatch(3);
Runnable r = () -> {
try {
Thread.sleep(1000);
diff --git a/src/main/java/br/com/leonardoz/features/synchronizers/UsingSemaphores.java b/src/main/java/br/com/leonardoz/features/synchronizers/UsingSemaphores.java
index cfaf9b5..8fbffff 100644
--- a/src/main/java/br/com/leonardoz/features/synchronizers/UsingSemaphores.java
+++ b/src/main/java/br/com/leonardoz/features/synchronizers/UsingSemaphores.java
@@ -1,6 +1,5 @@
package br.com.leonardoz.features.synchronizers;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -20,8 +19,9 @@
*/
public class UsingSemaphores {
public static void main(String[] args) {
- ExecutorService executor = Executors.newCachedThreadPool();
- Semaphore semaphore = new Semaphore(3);
+
+ var executor = Executors.newCachedThreadPool();
+ var semaphore = new Semaphore(3);
Runnable r = () -> {
try {
diff --git a/src/main/java/br/com/leonardoz/features/threads/UsingThreads.java b/src/main/java/br/com/leonardoz/features/threads/UsingThreads.java
index 8d62445..129c1e2 100644
--- a/src/main/java/br/com/leonardoz/features/threads/UsingThreads.java
+++ b/src/main/java/br/com/leonardoz/features/threads/UsingThreads.java
@@ -14,12 +14,12 @@ public class UsingThreads {
public static void main(String[] args) throws InterruptedException {
// Creating
- Thread created = new Thread();
+ var created = new Thread();
created.start();
// .run() runs on main thread
// Assigning a task for running on a thread - we pass a Runnable instance
- Thread threadWithTask = new Thread(() -> System.out.println("Inside thread" + Thread.currentThread().getName()));
+ var threadWithTask = new Thread(() -> System.out.println("Inside thread" + Thread.currentThread().getName()));
threadWithTask.start();
// Interrupting a thread
@@ -28,7 +28,7 @@ public static void main(String[] args) throws InterruptedException {
System.out.println("Im not interrupted " + Thread.currentThread().getName());
}
};
- Thread interruptable = new Thread(interrupatblyTask);
+ var interruptable = new Thread(interrupatblyTask);
interruptable.start();
Thread.sleep(3000);
interruptable.interrupt();
diff --git a/src/main/java/br/com/leonardoz/patterns/condition_queues/ExplicitConditionQueue.java b/src/main/java/br/com/leonardoz/patterns/condition_queues/ExplicitConditionQueue.java
index 42220c6..3a9d205 100644
--- a/src/main/java/br/com/leonardoz/patterns/condition_queues/ExplicitConditionQueue.java
+++ b/src/main/java/br/com/leonardoz/patterns/condition_queues/ExplicitConditionQueue.java
@@ -22,7 +22,7 @@
*/
public class ExplicitConditionQueue {
- private static final int LIMIT = 8;
+ private static final int LIMIT = 5;
private int messageCount = 0;
private Lock lock = new ReentrantLock();
private Condition limitReachedCondition = lock.newCondition();
@@ -58,13 +58,13 @@ public void printMessages(String message) throws InterruptedException {
}
public static void main(String[] args) {
- ExplicitConditionQueue eqc = new ExplicitConditionQueue();
+ var queue = new ExplicitConditionQueue();
// Will run indefinitely
new Thread(() -> {
while (true) {
- String uuidMessage = UUID.randomUUID().toString();
+ var uuidMessage = UUID.randomUUID().toString();
try {
- eqc.printMessages(uuidMessage);
+ queue.printMessages(uuidMessage);
} catch (InterruptedException e) {
e.printStackTrace();
}
@@ -73,7 +73,7 @@ public static void main(String[] args) {
new Thread(() -> {
while (true) {
try {
- eqc.stopMessages();
+ queue.stopMessages();
} catch (InterruptedException e) {
e.printStackTrace();
}
diff --git a/src/main/java/br/com/leonardoz/patterns/condition_queues/WaitNotifyQueue.java b/src/main/java/br/com/leonardoz/patterns/condition_queues/WaitNotifyQueue.java
index fbf1c37..afa138f 100644
--- a/src/main/java/br/com/leonardoz/patterns/condition_queues/WaitNotifyQueue.java
+++ b/src/main/java/br/com/leonardoz/patterns/condition_queues/WaitNotifyQueue.java
@@ -39,16 +39,16 @@ public synchronized void stopsMessaging() {
public synchronized void message() throws InterruptedException {
while (!continueToNotify)
wait();
- String message = messages.take();
+ var message = messages.take();
System.out.println(message);
}
public static void main(String[] args) {
- List messages = new LinkedList<>();
+ var messages = new LinkedList();
for (int i = 0; i < 130; i++) {
messages.add(UUID.randomUUID().toString());
}
- WaitNotifyQueue waitNotifyQueue = new WaitNotifyQueue(messages);
+ var waitNotifyQueue = new WaitNotifyQueue(messages);
new Thread(() -> {
try {
while (true) {
@@ -56,13 +56,14 @@ public static void main(String[] args) {
Thread.sleep(300);
}
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
e.printStackTrace();
}
}).start();
- Random r = new Random();
+ var random = new Random();
new Thread(() -> {
while (true) {
- int val = r.nextInt(100);
+ int val = random.nextInt(100);
System.out.println(val);
if (val == 99) {
break;
@@ -70,6 +71,7 @@ public static void main(String[] args) {
try {
Thread.sleep(400);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
e.printStackTrace();
}
}
@@ -77,6 +79,7 @@ public static void main(String[] args) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
e.printStackTrace();
}
}).start();
diff --git a/src/main/java/br/com/leonardoz/patterns/controlled_initialization/ControlledInitialization.java b/src/main/java/br/com/leonardoz/patterns/controlled_initialization/ControlledInitialization.java
index 10a8d89..5576ac1 100644
--- a/src/main/java/br/com/leonardoz/patterns/controlled_initialization/ControlledInitialization.java
+++ b/src/main/java/br/com/leonardoz/patterns/controlled_initialization/ControlledInitialization.java
@@ -1,7 +1,6 @@
package br.com.leonardoz.patterns.controlled_initialization;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
@@ -81,7 +80,7 @@ public ControlledInitialization() {
}
private void doTask() {
- System.out.println("=== Resources initialized ===");
+ System.out.println("=== Resources Initialized ===");
System.out.println("Resource 1 instance " + resource1);
System.out.println("Resource 2 instance " + resource2);
System.out.println("Resource 3 instance " + resource3);
@@ -90,7 +89,7 @@ private void doTask() {
private void initialize() {
System.out.println("=== Initializing Resources ===");
- ExecutorService executor = Executors.newFixedThreadPool(3);
+ var executor = Executors.newFixedThreadPool(3);
executor.execute(initResource1);
executor.execute(initResource2);
executor.execute(initResource3);
diff --git a/src/main/java/br/com/leonardoz/patterns/divideconquer/ParallelDivideAndConquer.java b/src/main/java/br/com/leonardoz/patterns/divideconquer/ParallelDivideAndConquer.java
new file mode 100644
index 0000000..4cef549
--- /dev/null
+++ b/src/main/java/br/com/leonardoz/patterns/divideconquer/ParallelDivideAndConquer.java
@@ -0,0 +1,53 @@
+package br.com.leonardoz.patterns.divideconquer;
+
+import java.util.List;
+import java.util.concurrent.RecursiveTask;
+
+/*
+ * Pattern: Parallel Divide and Conquer
+ *
+ * Motivations: Some kind of tasks, algorithms or flows can be divided
+ * into smaller pieces, which are independently resolvable, making them fall
+ * into the category of divide and conquer computations. Those properties
+ * allows the execution to be parallel, possible increasing the speed if those
+ * problems were difficult enough to compensate the cost involved in parallelization.
+ *
+ * Intent: Make the execution of some computation that uses the divide and conquer
+ * approach parallel, in order to increase performance if the problem difficulty is
+ * enough to overcome the overhead of using threads and coordination. It uses the ForkJoin Framework
+ * to model those problems.
+ *
+ * Applicability: Divide and conquer computations.
+ *
+ */
+public class ParallelDivideAndConquer {
+
+ private final static int THRESHOLD = 10; // Choosing a number to split the computation
+
+
+ public static class Task extends RecursiveTask {
+
+ private static final long serialVersionUID = 1L;
+ private List somethingToDivideAndConquer;
+
+ public Task(List somethingToDivideAndConquer) {
+ this.somethingToDivideAndConquer = somethingToDivideAndConquer;
+
+ }
+
+ @Override
+ protected Integer compute() {
+ var size = somethingToDivideAndConquer.size();
+ if (size < THRESHOLD) {
+ // solves directly
+ return 1;
+ } else {
+ // creates tasks, fork and join
+ return 2;
+ }
+ }
+
+ }
+
+
+}
diff --git a/src/main/java/br/com/leonardoz/patterns/divideconquer/ParallelSum.java b/src/main/java/br/com/leonardoz/patterns/divideconquer/ParallelSum.java
new file mode 100644
index 0000000..8fff70e
--- /dev/null
+++ b/src/main/java/br/com/leonardoz/patterns/divideconquer/ParallelSum.java
@@ -0,0 +1,106 @@
+package br.com.leonardoz.patterns.divideconquer;
+
+import java.math.BigInteger;
+import java.util.List;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+/*
+ * Pattern: Parallel Divide and Conquer
+ *
+ * Example: Parallel Sum
+ *
+ * This is a simple example for educational purpose only, run on your
+ * machine and check which one is better!
+ */
+public class ParallelSum extends RecursiveTask {
+
+ private static final long serialVersionUID = 1L;
+ private final static int THRESHOLD = 10_000; // Choosing a number to split the computation
+
+ private List nums;
+
+ public ParallelSum(List nums) {
+ this.nums = nums;
+ }
+
+ @Override
+ protected BigInteger compute() {
+ var size = nums.size();
+ if (size < THRESHOLD) {
+ return sequentialSum(nums);
+ } else {
+ var x = new ParallelSum(nums.subList(0, size / 2));
+ var y = new ParallelSum(nums.subList(size / 2, size));
+ x.fork();
+ y.fork();
+ var xResult = x.join();
+ var yResult = y.join();
+ return yResult.add(xResult);
+ }
+ }
+
+ /*
+ * Just showing how to use the pattern and some really dummy benchmark. Don't
+ * take it seriously.
+ */
+ public static void main(String[] args) throws InterruptedException {
+ var nums = LongStream.range(0, 10_000_000L)
+ .mapToObj(BigInteger::valueOf)
+ .collect(Collectors.toList());
+
+// Run one then comment and run another
+ Runnable parallel = () -> {
+ var commonPool = ForkJoinPool.commonPool();
+ var result = commonPool.invoke(new ParallelSum(nums));
+
+ System.out.println("Parallel Result is: " + result);
+ };
+
+ Runnable sequential = () -> {
+ var acc = sequentialSum(nums);
+
+ System.out.println("Sequential Result is: " + acc);
+ };
+
+ sequential.run();
+ parallel.run();
+
+ Thread.sleep(2000);
+
+ System.out.println("#### After some JIT \n\n");
+
+ dummyBenchmark(sequential);
+ dummyBenchmark(parallel);
+
+ Thread.sleep(2000);
+
+ System.out.println("#### After more JIT \n\n");
+
+ dummyBenchmark(sequential);
+ dummyBenchmark(parallel);
+ }
+
+ private static BigInteger sequentialSum(List nums) {
+ var acc = BigInteger.ZERO;
+ for (var value : nums) {
+ acc = acc.add(value);
+ }
+ return acc;
+ }
+
+ static void getHot(Runnable runnable) {
+ runnable.run();
+ }
+
+ static void dummyBenchmark(Runnable runnable) {
+ var before = System.currentTimeMillis();
+ runnable.run();
+ var after = System.currentTimeMillis();
+ System.out.println("Executed in: " + (after - before));
+ System.out.println("######\n");
+ }
+
+}
diff --git a/src/main/java/br/com/leonardoz/patterns/fixed_lock_ordering/CoinTransfer.java b/src/main/java/br/com/leonardoz/patterns/fixed_lock_ordering/CoinTransfer.java
index 68291c2..862dd89 100644
--- a/src/main/java/br/com/leonardoz/patterns/fixed_lock_ordering/CoinTransfer.java
+++ b/src/main/java/br/com/leonardoz/patterns/fixed_lock_ordering/CoinTransfer.java
@@ -48,8 +48,8 @@ public void withdrawCoins(BigInteger amount) {
}
public void transferBetweenPlayers(Player playerFrom, Player playerTo, BigInteger amount) {
- int from = playerFrom.getId();
- int to = playerTo.getId();
+ var from = playerFrom.getId();
+ var to = playerTo.getId();
if (from < to) {
synchronized (playerFrom) {
synchronized (playerTo) {
diff --git a/src/main/java/br/com/leonardoz/patterns/fixed_lock_ordering/FixedLockOrdering.java b/src/main/java/br/com/leonardoz/patterns/fixed_lock_ordering/FixedLockOrdering.java
index c56a955..5ac14d6 100644
--- a/src/main/java/br/com/leonardoz/patterns/fixed_lock_ordering/FixedLockOrdering.java
+++ b/src/main/java/br/com/leonardoz/patterns/fixed_lock_ordering/FixedLockOrdering.java
@@ -38,8 +38,8 @@ public void setAnotherValue(String anotherValue) {
}
public void doSomeOperation(LockableObject obj1, LockableObject obj2) {
- int obj1Id = obj1.getId();
- int obj2Id = obj2.getId();
+ var obj1Id = obj1.getId();
+ var obj2Id = obj2.getId();
if (obj1Id < obj2Id) {
synchronized (obj1) {
synchronized (obj2) {
diff --git a/src/main/java/br/com/leonardoz/patterns/producer_consumer/ProducerConsumer.java b/src/main/java/br/com/leonardoz/patterns/producer_consumer/ProducerConsumer.java
index 65f884c..972bc7b 100644
--- a/src/main/java/br/com/leonardoz/patterns/producer_consumer/ProducerConsumer.java
+++ b/src/main/java/br/com/leonardoz/patterns/producer_consumer/ProducerConsumer.java
@@ -3,7 +3,6 @@
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -28,7 +27,7 @@ public class ProducerConsumer {
private Callable consumer = () -> {
while (true) {
- String dataUnit = data.poll(5, TimeUnit.SECONDS);
+ var dataUnit = data.poll(5, TimeUnit.SECONDS);
if (dataUnit == null)
break;
System.out.println("Consumed " + dataUnit + " from " + Thread.currentThread().getName());
@@ -38,14 +37,14 @@ public class ProducerConsumer {
private Callable producer = () -> {
for (int i = 0; i < 90_000; i++) {
- String dataUnit = UUID.randomUUID().toString();
+ var dataUnit = UUID.randomUUID().toString();
data.put(dataUnit);
}
return null;
};
public void run(long forHowLong, TimeUnit unit) throws InterruptedException {
- ExecutorService pool = Executors.newCachedThreadPool();
+ var pool = Executors.newCachedThreadPool();
pool.submit(producer);
pool.submit(consumer);
pool.submit(consumer);
@@ -54,7 +53,7 @@ public void run(long forHowLong, TimeUnit unit) throws InterruptedException {
}
public static void main(String[] args) {
- ProducerConsumer producerConsumer = new ProducerConsumer();
+ var producerConsumer = new ProducerConsumer();
try {
producerConsumer.run(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
diff --git a/src/main/java/br/com/leonardoz/patterns/resource_pool/ResourcePoolUsage.java b/src/main/java/br/com/leonardoz/patterns/resource_pool/ResourcePoolUsage.java
index cc12cee..bcd12a0 100644
--- a/src/main/java/br/com/leonardoz/patterns/resource_pool/ResourcePoolUsage.java
+++ b/src/main/java/br/com/leonardoz/patterns/resource_pool/ResourcePoolUsage.java
@@ -2,7 +2,6 @@
import java.util.Arrays;
import java.util.Random;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
@@ -12,16 +11,16 @@
*/
public class ResourcePoolUsage {
public static void main(String[] args) {
- ExecutorService executor = Executors.newCachedThreadPool();
- ResourcePool pool = new ResourcePool<>(15,
+ var executor = Executors.newCachedThreadPool();
+ var pool = new ResourcePool(15,
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 10, 11, 12, 13, 14));
- Random r = new Random();
+ var random = new Random();
for (int i = 0; i < 30; i++) {
executor.execute(() -> {
try {
- Integer value = pool.get(60);
+ var value = pool.get(60);
System.out.println("Value taken: " + value);
- Thread.sleep(r.nextInt(5000));
+ Thread.sleep(random.nextInt(5000));
pool.release(value);
System.out.println("Value released " + value);
} catch (InterruptedException e) {
diff --git a/src/main/java/br/com/leonardoz/patterns/task_cancel/BackgroundTimePrintTask.java b/src/main/java/br/com/leonardoz/patterns/task_cancel/BackgroundTimePrintTask.java
index 738703a..f9b7903 100644
--- a/src/main/java/br/com/leonardoz/patterns/task_cancel/BackgroundTimePrintTask.java
+++ b/src/main/java/br/com/leonardoz/patterns/task_cancel/BackgroundTimePrintTask.java
@@ -14,7 +14,7 @@ public class BackgroundTimePrintTask {
private Thread thread;
private Runnable task = () -> {
while (!Thread.currentThread().isInterrupted()) {
- Date date = new Date(System.currentTimeMillis());
+ var date = new Date(System.currentTimeMillis());
System.out.println(new SimpleDateFormat().format(date));
try {
Thread.sleep(1000);
@@ -37,13 +37,13 @@ public void cancel() {
}
public static void main(String[] args) {
- BackgroundTimePrintTask bttt = new BackgroundTimePrintTask();
- bttt.run();
+ var self = new BackgroundTimePrintTask();
+ self.run();
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
- bttt.cancel();
+ self.cancel();
}
}
diff --git a/src/main/java/br/com/leonardoz/patterns/task_cancel/ThreadTaskCancel.java b/src/main/java/br/com/leonardoz/patterns/task_cancel/ThreadTaskCancel.java
index 60be10c..25e53e2 100644
--- a/src/main/java/br/com/leonardoz/patterns/task_cancel/ThreadTaskCancel.java
+++ b/src/main/java/br/com/leonardoz/patterns/task_cancel/ThreadTaskCancel.java
@@ -19,7 +19,7 @@ public class ThreadTaskCancel {
private Thread thread;
private Runnable task = () -> {
while (!Thread.currentThread().isInterrupted()) {
- // keep going
+ // keep going - be aware of using this Pattern with the Interrupted exception! It won't work.
}
};
diff --git a/src/main/java/br/com/leonardoz/patterns/task_convergence/TaskConvergence.java b/src/main/java/br/com/leonardoz/patterns/task_convergence/TaskConvergence.java
index 534ab0c..0b56494 100644
--- a/src/main/java/br/com/leonardoz/patterns/task_convergence/TaskConvergence.java
+++ b/src/main/java/br/com/leonardoz/patterns/task_convergence/TaskConvergence.java
@@ -8,6 +8,7 @@
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
/**
* Pattern: Task Convergence
@@ -32,10 +33,10 @@ public class TaskConvergence {
private ExecutorService executor;
private Runnable run = () -> {
- Random random = new Random();
- List results = new LinkedList<>();
+ var random = new Random();
+ var results = new LinkedList();
for (int i = 0; i < ITERS; i++) {
- Long next = (long) random.nextInt(BOUND);
+ var next = (long) random.nextInt(BOUND);
results.add(next);
}
try {
@@ -60,13 +61,19 @@ public class TaskConvergence {
public TaskConvergence() {
barrier = new CyclicBarrier(CORES, onComplete);
synchronizedLinkedList = Collections.synchronizedList(new LinkedList<>());
- executor = Executors.newFixedThreadPool(4);
+ executor = Executors.newFixedThreadPool(CORES);
}
public void run() {
for (int i = 0; i < CORES; i++) {
executor.execute(run);
}
+ try {
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ executor.shutdown();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
private void persist(List randomNumbers) {
diff --git a/src/main/java/br/com/leonardoz/patterns/task_execution/BackgroundTaskExecutor.java b/src/main/java/br/com/leonardoz/patterns/task_execution/BackgroundTaskExecutor.java
index ea90d29..599df79 100644
--- a/src/main/java/br/com/leonardoz/patterns/task_execution/BackgroundTaskExecutor.java
+++ b/src/main/java/br/com/leonardoz/patterns/task_execution/BackgroundTaskExecutor.java
@@ -43,22 +43,26 @@ public BackgroundTaskExecutor(int threadsForTasks) {
}
public Future execute(Callable task) {
- Future submited = executor.submit(task);
+ var submited = executor.submit(task);
return submited;
}
public List> execute(List> tasks) {
- List> futureTasks = tasks.stream().map(executor::submit).collect(Collectors.toList());
+ var futureTasks = tasks.stream()
+ .map(executor::submit)
+ .collect(Collectors.toList());
return futureTasks;
}
public boolean cancel(Future task) {
- boolean canceled = task.cancel(true);
+ var canceled = task.cancel(true);
return canceled;
}
public boolean cancel(List> task) {
- boolean hasAFalse = task.stream().map(f -> f.cancel(true)).anyMatch(b -> b.equals(false));
+ var hasAFalse = task.stream()
+ .map(f -> f.cancel(true))
+ .anyMatch(b -> b.equals(false));
return !hasAFalse;
}
@@ -71,7 +75,9 @@ public List> completeTask(List> tasks, OnInterruption<
return Optional.empty();
}
};
- List> results = tasks.stream().map(fn).collect(Collectors.toList());
+ var results = tasks.stream()
+ .map(fn)
+ .collect(Collectors.toList());
return results;
}
@@ -94,7 +100,7 @@ public void shutdownTasks(long timeout, TimeUnit timeUnit, OnShutdownError onShu
}
public List shutdownNowTasks(long timeout, TimeUnit timeUnit, OnShutdownError onShutdownError) {
- List remainingTasks = executor.shutdownNow();
+ var remainingTasks = executor.shutdownNow();
try {
executor.awaitTermination(timeout, timeUnit);
} catch (InterruptedException e) {
diff --git a/src/main/java/br/com/leonardoz/patterns/thread_safe/shared_state/VisitCounter.java b/src/main/java/br/com/leonardoz/patterns/thread_safe/shared_state/VisitCounter.java
index 2fd4048..006744b 100644
--- a/src/main/java/br/com/leonardoz/patterns/thread_safe/shared_state/VisitCounter.java
+++ b/src/main/java/br/com/leonardoz/patterns/thread_safe/shared_state/VisitCounter.java
@@ -1,6 +1,5 @@
package br.com.leonardoz.patterns.thread_safe.shared_state;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import br.com.leonardoz.patterns.GuardedBy;
@@ -30,8 +29,8 @@ public synchronized void decrease() {
}
public static void main(String[] args) {
- VisitCounter counter = new VisitCounter();
- ExecutorService threadPool = Executors.newCachedThreadPool();
+ var counter = new VisitCounter();
+ var threadPool = Executors.newCachedThreadPool();
for (int i = 1; i <= 50; i++) {
System.out.println("value " + counter.actualValue() + " i " + i);
threadPool.execute(() -> counter.increase());