Concurrency models
One of the most exciting aspects of the Java programming language is its robustness. When it comes to parallel execution, Java supports multiple models, so the approach we take is up to us. There is usually more than one way of doing things, and each possible solution has both advantages and trade-offs. Our goal is to create Java applications that run efficiently and are scalable and maintainable. To that end, we will use the thread-based concurrency approach (detailed later in this chapter), which we selected for its straightforward nature.
Concurrency, in the context of computer science, is the ability of a program to make progress on more than one task during the same period of time. In Java, this is typically achieved through multithreading (think of multitasking), and on multi-core hardware the threads can execute simultaneously. This programming paradigm includes the ability to access Java objects and other resources from multiple threads. Let’s look at three specific models (thread-based, message passing, and reactive) and then compare them to see which model is best suited to a given scenario.
Thread-based model
The thread-based model, also referred to as the shared memory model, is the most widely used Java concurrency model. The defining idea is that all threads in a process share the same memory and communicate with each other through that memory. The Java language has deep support for threads, with the Thread class and the Callable and Runnable interfaces.
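To make the three types named above concrete, here is a minimal sketch. The class name ThreadBasics and the helper sumOnWorker are hypothetical names chosen for illustration. It runs a Runnable on a Thread directly and submits a Callable to an ExecutorService to get a result back through a Future:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadBasics {

    // Hypothetical helper: sums 1..n on a worker thread and returns the result.
    static int sumOnWorker(int n) {
        // A Callable is like a Runnable, but it returns a value.
        Callable<Integer> task = () -> {
            int sum = 0;
            for (int i = 1; i <= n; i++) {
                sum += i;
            }
            return sum;
        };

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = executor.submit(task);
            return future.get(); // blocks until the worker has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Task failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // A Runnable returns nothing; here it runs on a thread we create ourselves.
        Runnable greeting = () ->
                System.out.println("Running in " + Thread.currentThread().getName());
        Thread worker = new Thread(greeting, "worker-1");
        worker.start();
        worker.join();

        System.out.println("Sum: " + sumOnWorker(100));
    }
}
```

A Runnable suits fire-and-forget work, while a Callable is the more natural fit when the calling thread needs the computed value.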
Let’s look at a simple implementation example. We will implement the increment method and mark it with the synchronized keyword. This tells Java to allow only one thread at a time to execute the method on a given MyCounter instance:
public class MyCounter {
    private int count = 0;

    public synchronized void increment() {
        count++;
    }

    public int getCount() {
        return count;
    }
This next section of code contains our main() method. In this method, we create two threads; both will increment our counter:
    public static void main(String[] args) throws InterruptedException {
        MyCounter counter = new MyCounter();

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                counter.increment();
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                counter.increment();
            }
        });
The next two lines of code start the threads:
        t1.start();
        t2.start();
In the next two lines of code, we wait for both threads to finish:
        t1.join();
        t2.join();
Lastly, we output the final results:
        System.out.println("Final counter value: " + counter.getCount());
    }
}
The straightforward nature of the thread-based model is a significant advantage, and this approach is typical for smaller applications. The model has potential disadvantages, however: deadlocks and race conditions can be introduced when multiple threads access shared, mutable data.
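The synchronized keyword is not the only way to protect a shared counter. As a minimal sketch, assuming a single numeric value is all that needs protecting, the JDK's AtomicInteger performs the increment as one atomic step without an explicit lock. The class name AtomicCounterSketch and the method countWithTwoThreads are hypothetical names used only for this illustration:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicCounterSketch {

    // Hypothetical helper: two threads each increment a shared counter.
    static int countWithTwoThreads(int incrementsPerThread) {
        AtomicInteger count = new AtomicInteger();

        Runnable work = () -> {
            for (int i = 0; i < incrementsPerThread; i++) {
                count.incrementAndGet(); // atomic read-modify-write, no lock needed
            }
        };

        Thread t1 = new Thread(work);
        Thread t2 = new Thread(work);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        return count.get();
    }

    public static void main(String[] args) {
        System.out.println("Final counter value: " + countWithTwoThreads(1000));
    }
}
```

With a plain int and no synchronization, the same loop could lose updates, because count++ is a separate read, add, and write. Atomic classes cover single-variable cases like this one; invariants that span several fields generally still need a lock.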
Deadlocks and race conditions
Deadlocks occur when two or more threads each wait for another to release a resource it needs, so none of them can proceed. Race conditions occur when the correctness of a result depends on the relative timing or ordering of thread execution.
Both deadlocks and race conditions should be avoided as much as possible in our applications.
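One common way to avoid the deadlock described above is to make every thread acquire locks in the same order. The following is a minimal sketch under stated assumptions: the Account class, its id field, and the transfer scenario are hypothetical and are not part of the chapter's example, and each account is assumed to have a unique id.

```java
public class LockOrderingSketch {

    // Hypothetical account type; ids are assumed to be unique.
    static class Account {
        final int id;
        private int balance;

        Account(int id, int balance) {
            this.id = id;
            this.balance = balance;
        }

        synchronized int getBalance() {
            return balance;
        }
    }

    // Always lock the account with the smaller id first, regardless of the
    // transfer direction, so two opposing transfers cannot wait on each other.
    static void transfer(Account from, Account to, int amount) {
        Account first = from.id < to.id ? from : to;
        Account second = (first == from) ? to : from;
        synchronized (first) {
            synchronized (second) {
                from.balance -= amount;
                to.balance += amount;
            }
        }
    }

    // Runs transfers in both directions at once and returns the final balances.
    static int[] runOpposingTransfers() {
        Account a = new Account(1, 1000);
        Account b = new Account(2, 1000);

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                transfer(a, b, 1);
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                transfer(b, a, 1);
            }
        });

        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        return new int[] {a.getBalance(), b.getBalance()};
    }

    public static void main(String[] args) {
        int[] balances = runOpposingTransfers();
        System.out.println("a=" + balances[0] + ", b=" + balances[1]);
    }
}
```

If transfer instead locked from and then to, the two threads could each hold one lock while waiting for the other, which is exactly the deadlock scenario. The fixed ordering removes that possibility at the cost of requiring a comparable key on every lockable object.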
The message passing model
The message passing model is an interesting one in that it avoids shared state. Instead of reading and writing common data, threads communicate by sending messages to one another.
Shared states
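A shared state exists when more than one thread in an application can access the same data.
Because threads do not share mutable data, the message passing model greatly reduces the risk of race conditions and lock-related deadlocks, although threads that wait on each other's messages can still stall. A benefit of this model is that it promotes scalability.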
Let’s look at how we can implement the message passing model. Our example includes a simple sender and receiver scenario. We start with our import statements and then create a Message class:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class MessagePassingExample {

    static class Message {
        private final String content;

        public Message(String content) {
            this.content = content;
        }

        public String getContent() {
            return content;
        }
    }
Next, we will have our Sender class implement the Runnable interface:
    static class Sender implements Runnable {
        private final BlockingQueue<Message> queue;

        public Sender(BlockingQueue<Message> q) {
            this.queue = q;
        }

        @Override
        public void run() {
            // Sending messages
            String[] messages = {"First message", "Second message", "Third message", "Done"};
            for (String m : messages) {
                try {
                    Thread.sleep(1000); // Simulating work
                    queue.put(new Message(m));
                    System.out.println("Sent: " + m);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop sending
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
Next, we will have our Receiver class implement the Runnable interface:
    static class Receiver implements Runnable {
        private final BlockingQueue<Message> queue;

        public Receiver(BlockingQueue<Message> q) {
            this.queue = q;
        }

        @Override
        public void run() {
            try {
                Message msg;
                // Receiving messages
                while (!((msg = queue.take()).getContent().equals("Done"))) {
                    System.out.println("Received: " + msg.getContent());
                    Thread.sleep(400); // Simulating work
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
The last step is to create our main() method:
    public static void main(String[] args) {
        BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10);

        Thread senderThread = new Thread(new Sender(queue));
        Thread receiverThread = new Thread(new Receiver(queue));

        senderThread.start();
        receiverThread.start();
    }
}
Our example implemented Sender and Receiver as Runnable classes. They communicated using a BlockingQueue: Sender adds messages to the queue, and Receiver takes and processes them. Sender puts a final Done message on the queue so that Receiver knows when it can stop processing. The message passing model is often used in distributed systems because it supports highly scalable designs.
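Signaling completion with the text Done means that a genuine message with that content would also stop the receiver. One possible variation, sketched here under stated assumptions, uses a dedicated sentinel object that the receiver recognizes by identity. The names SentinelSketch, STOP, and sendAndReceive are hypothetical, and an unbounded LinkedBlockingQueue is swapped in for simplicity:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SentinelSketch {

    // Hypothetical message type for this sketch.
    static final class Message {
        final String content;

        Message(String content) {
            this.content = content;
        }
    }

    // Dedicated end-of-stream marker, recognized by identity rather than by text.
    static final Message STOP = new Message("");

    // Sends every payload through a queue and returns what the receiver collected.
    static List<String> sendAndReceive(List<String> payloads) {
        BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
        List<String> received = new ArrayList<>();

        Thread sender = new Thread(() -> {
            try {
                for (String payload : payloads) {
                    queue.put(new Message(payload));
                }
                queue.put(STOP); // tell the receiver that nothing more is coming
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread receiver = new Thread(() -> {
            try {
                Message msg;
                while ((msg = queue.take()) != STOP) {
                    received.add(msg.content); // only the receiver thread writes here
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        sender.start();
        receiver.start();
        try {
            sender.join();
            receiver.join(); // after join, the list is safe to read from this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(sendAndReceive(List.of("First message", "Done", "Third message")));
    }
}
```

Here a payload that happens to read Done is delivered like any other message, and only the STOP object ends the loop.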
The Reactive model
The Reactive model is newer than the previous two models we covered. Its focus is on non-blocking, event-driven programming. This model is commonly used in large-scale systems that process extensive input/output operations, especially when high scalability is needed. There are external libraries that we can use to implement this model, including Project Reactor and RxJava.
Let’s look at a simple implementation example using Project Reactor. We start by adding the Project Reactor dependency to our project. Here is how that looks using Maven as the build tool:
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.4.0</version>
</dependency>
The following example demonstrates how to create a reactive stream to process a sequence of events:
import reactor.core.publisher.Flux;

public class ReactiveExample {
    public static void main(String[] args) {
        Flux<String> messageFlux = Flux.just("Hello", "Reactive", "World", "with", "Java")
                .map(String::toUpperCase)
                .filter(s -> s.length() > 4);

        messageFlux.subscribe(System.out::println);
    }
}
The Reactive model offers efficient resource use, avoids blocking operations, and takes a distinctive approach to asynchronous programming. However, it can be more difficult to implement than the other models we covered.
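For readers who want to experiment without adding a dependency, the same publish and subscribe idea can be sketched with the JDK's own java.util.concurrent.Flow API (available since Java 9). This is not how Project Reactor is implemented; it is a minimal stand-in that repeats the uppercase-and-filter steps from the Reactor example by hand. The names FlowSketch, CollectingSubscriber, and process are hypothetical:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class FlowSketch {

    // Hypothetical subscriber: upper-cases each item and keeps those longer
    // than four characters.
    static class CollectingSubscriber implements Flow.Subscriber<String> {
        private final List<String> received = new CopyOnWriteArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for one item at a time (backpressure)
        }

        @Override
        public void onNext(String item) {
            String upper = item.toUpperCase();
            if (upper.length() > 4) {
                received.add(upper);
            }
            subscription.request(1); // ready for the next item
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes the words asynchronously and returns what the subscriber kept.
    static List<String> process(List<String> words) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            words.forEach(publisher::submit);
        } // closing the publisher signals onComplete once pending items are delivered

        try {
            subscriber.done.await(); // wait for the asynchronous delivery to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        // prints [HELLO, REACTIVE, WORLD]
        System.out.println(process(List.of("Hello", "Reactive", "World", "with", "Java")));
    }
}
```

The explicit request(1) calls show the demand-driven flow that libraries such as Project Reactor manage on the subscriber's behalf, which is part of why those libraries are usually the more practical choice.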
Comparative analysis
Each of the three concurrency models offers different benefits, and understanding their characteristics and differences can help you make an informed decision regarding which model to adopt.