If you're interested in functional programming, you might also want to check out my second blog, which I'm actively working on!

Wednesday, April 2, 2014

Using Java's FutureTask to execute Callable tasks concurrently

In this demo we have two case providers, one for Activiti and one for WPS. I used Thread.sleep to mimic a time-consuming computation (e.g. IO). Now we want a composite case provider that combines the cases from both case providers. If we naively implement this in SequentialCaseProvider, the total time is the sum of the times the two case providers need to fetch their cases (4 + 5 = 9 seconds). By using a concurrent approach in ConcurrentCaseProvider, we can reduce the total to roughly the time the slowest CaseProvider needs to fetch its cases (5 seconds).

package com.pelssers.futuretask;
/**
 * A case, represented by a single message that is fixed at construction time.
 *
 * <p>The message field is {@code final}, so a constructed instance can be handed
 * to other threads without additional synchronization.
 */
public class Case {

    /** Text describing this case; may be {@code null} if the creator passed {@code null}. */
    private final String message;

    /**
     * Creates a case carrying the given message.
     *
     * @param message the text describing the case (stored as-is, not validated)
     */
    public Case(final String message) {
        this.message = message;
    }

    /**
     * Returns the message this case was created with.
     *
     * @return the message passed to the constructor
     */
    public String getMessage() {
        return this.message;
    }

    /**
     * Returns the message wrapped as {@code Case(<message>)}.
     *
     * @return a printable representation of this case
     */
    @Override
    public String toString() {
        return "Case(" + message + ")";
    }
}
view raw gistfile1.java hosted with ❤ by GitHub
package com.pelssers.futuretask;
import java.util.List;
/**
 * A source of {@link Case} objects.
 */
public interface CaseProvider {
/**
 * Fetches the cases supplied by this provider.
 *
 * @return the list of cases
 * @throws Exception if the cases cannot be fetched
 */
List<Case> getCases() throws Exception;
}
view raw gistfile1.java hosted with ❤ by GitHub
package com.pelssers.futuretask;
import java.util.ArrayList;
import java.util.List;
/**
 * Demo {@link CaseProvider} that serves a fixed set of three Activiti cases
 * after a simulated delay.
 */
public class ActivitiCaseProvider implements CaseProvider {

    /** How long {@link #getCases()} sleeps to mimic an expensive operation (e.g. IO). */
    private static final long SIMULATED_DELAY_MILLIS = 4000L;

    /** Backing fixture shared by all instances; never handed out directly. */
    private static final List<Case> ACTIVITI_CASES;

    static {
        ACTIVITI_CASES = new ArrayList<Case>();
        ACTIVITI_CASES.add(new Case("ACTIVITI CASE 1"));
        ACTIVITI_CASES.add(new Case("ACTIVITI CASE 2"));
        ACTIVITI_CASES.add(new Case("ACTIVITI CASE 3"));
    }

    /**
     * Sleeps for 4 seconds and then returns the Activiti cases.
     *
     * @return a fresh list containing the three Activiti cases; modifying it
     *         does not affect later calls or other callers
     * @throws Exception an {@link InterruptedException} if the calling thread
     *         is interrupted while sleeping
     */
    public List<Case> getCases() throws Exception {
        // mimic an expensive operation by sleeping for 4 seconds
        Thread.sleep(SIMULATED_DELAY_MILLIS);
        // Copy so callers cannot mutate the shared static fixture.
        return new ArrayList<Case>(ACTIVITI_CASES);
    }
}
view raw gistfile1.java hosted with ❤ by GitHub
package com.pelssers.futuretask;
import java.util.ArrayList;
import java.util.List;
/**
 * Demo {@link CaseProvider} that serves a fixed set of three WPS cases
 * after a simulated delay.
 */
public class WPSCaseProvider implements CaseProvider {

    /** How long {@link #getCases()} sleeps to mimic an expensive operation (e.g. IO). */
    private static final long SIMULATED_DELAY_MILLIS = 5000L;

    /** Backing fixture shared by all instances; never handed out directly. */
    private static final List<Case> WPS_CASES;

    static {
        WPS_CASES = new ArrayList<Case>();
        WPS_CASES.add(new Case("WPS CASE 1"));
        WPS_CASES.add(new Case("WPS CASE 2"));
        WPS_CASES.add(new Case("WPS CASE 3"));
    }

    /**
     * Sleeps for 5 seconds and then returns the WPS cases.
     *
     * @return a fresh list containing the three WPS cases; modifying it
     *         does not affect later calls or other callers
     * @throws Exception an {@link InterruptedException} if the calling thread
     *         is interrupted while sleeping
     */
    public List<Case> getCases() throws Exception {
        // mimic an expensive operation by sleeping for 5 seconds
        Thread.sleep(SIMULATED_DELAY_MILLIS);
        // Copy so callers cannot mutate the shared static fixture.
        return new ArrayList<Case>(WPS_CASES);
    }
}
view raw gistfile1.java hosted with ❤ by GitHub
package com.pelssers.futuretask;
import java.util.List;
/**
 * A {@link CaseProvider} that is configured with a list of other case providers.
 */
public interface CompositeCaseProvider extends CaseProvider {
/**
 * Sets the case providers this composite is configured with.
 *
 * @param caseProviders the case providers to use
 */
void setCaseProviders(final List<CaseProvider> caseProviders);
/**
 * Returns the case providers this composite is configured with.
 *
 * @return the configured case providers
 */
List<CaseProvider> getCaseProviders();
}
view raw gistfile1.java hosted with ❤ by GitHub
package com.pelssers.futuretask;
import java.util.List;
/**
 * Base class for {@link CompositeCaseProvider} implementations; it stores the
 * configured case providers and leaves {@code getCases()} to subclasses.
 *
 * <p>{@link #getCaseProviders()} never returns {@code null}: until providers are
 * set (or when {@code null} is set) it yields an empty, unmodifiable list, so
 * subclasses can iterate over it unconditionally.
 */
public abstract class AbstractCompositeCaseProvider implements CompositeCaseProvider {

    /** The configured providers; never {@code null}. */
    private List<CaseProvider> caseProviders = java.util.Collections.<CaseProvider>emptyList();

    /**
     * Sets the case providers to compose.
     *
     * @param caseProviders the providers to use; {@code null} is treated as
     *        "no providers". A non-null list is stored by reference, not copied.
     */
    public void setCaseProviders(final List<CaseProvider> caseProviders) {
        this.caseProviders = (caseProviders != null)
                ? caseProviders
                : java.util.Collections.<CaseProvider>emptyList();
    }

    /**
     * Returns the configured case providers.
     *
     * @return the list passed to {@link #setCaseProviders(List)}, or an empty
     *         unmodifiable list if none (or {@code null}) was set
     */
    public List<CaseProvider> getCaseProviders() {
        return caseProviders;
    }
}
view raw gistfile1.java hosted with ❤ by GitHub
package com.pelssers.futuretask;
import java.util.ArrayList;
import java.util.List;
/**
 * Composite case provider that queries its configured providers one after the
 * other on the calling thread and concatenates what they return.
 */
public class SequentialCaseProvider extends AbstractCompositeCaseProvider {

    /**
     * Calls {@code getCases()} on every configured provider, in list order, and
     * collects all returned cases into one list.
     *
     * @return a new list holding the cases of all providers, in provider order
     * @throws Exception whatever a provider's {@code getCases()} throws; remaining
     *         providers are then not queried
     */
    public List<Case> getCases() throws Exception {
        final List<Case> collected = new ArrayList<Case>();
        for (final CaseProvider delegate : getCaseProviders()) {
            appendCasesOf(delegate, collected);
        }
        return collected;
    }

    /** Fetches the cases of one provider and appends them to {@code target}. */
    private static void appendCasesOf(final CaseProvider delegate, final List<Case> target)
            throws Exception {
        final List<Case> fetched = delegate.getCases();
        target.addAll(fetched);
    }
}
view raw gistfile1.java hosted with ❤ by GitHub
package com.pelssers.futuretask;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
/**
 * Static helper that runs a batch of {@link Callable}s concurrently on a
 * temporary fixed-size thread pool and collects their results.
 */
public final class ConcurrencyHelper {

    private ConcurrencyHelper() {
        // static utility class; not meant to be instantiated
    }

    /**
     * Runs all callables concurrently and returns their results in iteration order.
     *
     * <p>Retained for backward compatibility; equivalent to
     * {@link #getResults(Iterable, int)}.
     *
     * @param <T> the result type of the callables
     * @param callables the tasks to run; must not be {@code null}
     * @param numberOfThreads size of the thread pool; must be positive
     * @param sleepInMillis ignored. Earlier versions polled for completion at
     *        this interval; the implementation now blocks on each task instead.
     * @return the results, in the iteration order of {@code callables}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if one of the callables threw; the cause is attached
     * @throws IllegalArgumentException if {@code numberOfThreads <= 0}
     */
    public static <T> List<T> getResults(final Iterable<? extends Callable<T>> callables,
            final int numberOfThreads, final int sleepInMillis)
            throws InterruptedException, ExecutionException {
        return getResults(callables, numberOfThreads);
    }

    /**
     * Runs all callables concurrently and returns their results in iteration order.
     *
     * <p>A new pool of {@code numberOfThreads} threads is created for the call and
     * is always shut down before this method returns or throws. If the call ends
     * early (a task failed or the caller was interrupted), tasks that are still
     * queued or running are cancelled via interruption.
     *
     * @param <T> the result type of the callables
     * @param callables the tasks to run; must not be {@code null}
     * @param numberOfThreads size of the thread pool; must be positive
     * @return the results, in the iteration order of {@code callables}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if one of the callables threw; the cause is attached
     * @throws IllegalArgumentException if {@code numberOfThreads <= 0}
     */
    public static <T> List<T> getResults(final Iterable<? extends Callable<T>> callables,
            final int numberOfThreads) throws InterruptedException, ExecutionException {
        // Wrap first so a bad 'callables' argument fails before any thread is created.
        final List<FutureTask<T>> tasks = mapToFutureTasks(callables);
        final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
        try {
            for (final FutureTask<T> task : tasks) {
                executor.execute(task);
            }
            final List<T> results = new ArrayList<T>(tasks.size());
            for (final FutureTask<T> task : tasks) {
                // get() blocks until the task is done, so no polling loop is needed.
                results.add(task.get());
            }
            return results;
        } finally {
            // On success every task is done and this just releases the threads;
            // on failure it also interrupts work whose result nobody will read.
            executor.shutdownNow();
        }
    }

    /** Wraps every callable in a {@link FutureTask}, preserving iteration order. */
    private static <T> List<FutureTask<T>> mapToFutureTasks(
            final Iterable<? extends Callable<T>> callables) {
        final List<FutureTask<T>> futureTasks = new ArrayList<FutureTask<T>>();
        for (final Callable<T> callable : callables) {
            futureTasks.add(new FutureTask<T>(callable));
        }
        return futureTasks;
    }
}
view raw gistfile1.java hosted with ❤ by GitHub
package com.pelssers.futuretask;
import java.util.List;
import java.util.concurrent.Callable;
/**
 * Adapts a {@link CaseProvider} to {@link Callable} so that fetching its cases
 * can be submitted as a task.
 */
public class CallableCaseProviderAdapter implements Callable<List<Case>> {

    /** The provider whose {@code getCases()} is invoked by {@link #call()}; never {@code null}. */
    private final CaseProvider caseProvider;

    /**
     * Creates an adapter around the given provider.
     *
     * @param caseProvider the provider to delegate to
     * @throws NullPointerException if {@code caseProvider} is {@code null}; failing
     *         here keeps the error at the construction site rather than inside
     *         whatever later invokes {@link #call()}
     */
    public CallableCaseProviderAdapter(final CaseProvider caseProvider) {
        if (caseProvider == null) {
            throw new NullPointerException("caseProvider must not be null");
        }
        this.caseProvider = caseProvider;
    }

    /**
     * Delegates to the wrapped provider's {@code getCases()}.
     *
     * @return the cases returned by the wrapped provider
     * @throws Exception whatever the wrapped provider throws
     */
    @Override
    public List<Case> call() throws Exception {
        return caseProvider.getCases();
    }
}
view raw gistfile1.java hosted with ❤ by GitHub
package com.pelssers.futuretask;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
/**
 * Composite case provider that queries all configured providers concurrently
 * through {@code ConcurrencyHelper} and concatenates their results.
 *
 * <p>One thread is requested per provider so that no provider has to wait for
 * another to finish. This suits a small number of providers; it is not meant
 * for large provider lists.
 */
public class ConcurrentCaseProvider extends AbstractCompositeCaseProvider {

    /** Value passed as the {@code sleepInMillis} argument of {@code ConcurrencyHelper.getResults}. */
    private static final int SLEEP_IN_MILLIS = 300;

    /**
     * Fetches the cases of all configured providers concurrently.
     *
     * @return a new list with the cases of all providers, in provider order;
     *         empty if no providers are configured
     * @throws Exception if waiting for the results is interrupted or a provider
     *         fails (as reported by {@code ConcurrencyHelper.getResults})
     */
    public List<Case> getCases() throws Exception {
        final List<Case> result = new ArrayList<Case>();
        final List<Callable<List<Case>>> callables = getCallables();
        if (callables.isEmpty()) {
            // Nothing to fetch: avoid creating a thread pool (a size of 0 would be rejected).
            return result;
        }
        // One thread per provider instead of a fixed 2, so all providers really run in parallel.
        final int numberOfThreads = callables.size();
        final List<List<Case>> casesList = ConcurrencyHelper.getResults(
                callables, numberOfThreads, SLEEP_IN_MILLIS);
        for (final List<Case> cases : casesList) {
            result.addAll(cases);
        }
        return result;
    }

    /**
     * Wraps every configured provider in a {@link CallableCaseProviderAdapter}.
     *
     * @return one callable per configured provider, in provider order; empty if
     *         no providers are configured
     */
    public List<Callable<List<Case>>> getCallables() {
        final List<Callable<List<Case>>> callables = new ArrayList<Callable<List<Case>>>();
        final List<CaseProvider> providers = getCaseProviders();
        if (providers != null) {
            for (final CaseProvider caseProvider : providers) {
                callables.add(new CallableCaseProviderAdapter(caseProvider));
            }
        }
        return callables;
    }
}
view raw gistfile1.java hosted with ❤ by GitHub
package com.pelssers.futuretask;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
 * Demo entry point that fetches the same cases first sequentially and then
 * concurrently, printing how long each approach took.
 *
 * The output from running main():
 *
 * Fetching cases using com.pelssers.futuretask.SequentialCaseProvider
 * [Case(ACTIVITI CASE 1), Case(ACTIVITI CASE 2), Case(ACTIVITI CASE 3), Case(WPS CASE 1), Case(WPS CASE 2), Case(WPS CASE 3)]
 * Fetching cases took 9 seconds
 * Fetching cases using com.pelssers.futuretask.ConcurrentCaseProvider
 * [Case(ACTIVITI CASE 1), Case(ACTIVITI CASE 2), Case(ACTIVITI CASE 3), Case(WPS CASE 1), Case(WPS CASE 2), Case(WPS CASE 3)]
 * Fetching cases took 5 seconds
 */
public class CaseProviderProgram {

    /** Number of nanoseconds in one second, for converting {@link System#nanoTime()} deltas. */
    private static final long NANOS_PER_SECOND = 1000000000L;

    /**
     * Creates the two demo providers.
     *
     * @return a new list holding an {@code ActivitiCaseProvider} followed by a
     *         {@code WPSCaseProvider}
     */
    public List<CaseProvider> getCaseProviders() {
        final List<CaseProvider> caseProviders = new ArrayList<CaseProvider>();
        caseProviders.add(new ActivitiCaseProvider());
        caseProviders.add(new WPSCaseProvider());
        return caseProviders;
    }

    /**
     * Creates a {@code SequentialCaseProvider} configured with the demo providers.
     *
     * @return the configured composite provider
     */
    public CaseProvider getSequentialCaseProvider() {
        final CompositeCaseProvider compositeCaseProvider = new SequentialCaseProvider();
        compositeCaseProvider.setCaseProviders(getCaseProviders());
        return compositeCaseProvider;
    }

    /**
     * Creates a {@code ConcurrentCaseProvider} configured with the demo providers.
     *
     * @return the configured composite provider
     */
    public CaseProvider getConcurrentCaseProvider() {
        final CompositeCaseProvider compositeCaseProvider = new ConcurrentCaseProvider();
        compositeCaseProvider.setCaseProviders(getCaseProviders());
        return compositeCaseProvider;
    }

    /**
     * Times the sequential provider, then the concurrent provider.
     *
     * @param args ignored
     * @throws Exception if fetching the cases fails
     */
    public static void main(final String[] args) throws Exception {
        final CaseProviderProgram program = new CaseProviderProgram();
        timeGetCases(program.getSequentialCaseProvider());
        timeGetCases(program.getConcurrentCaseProvider());
    }

    /**
     * Fetches all cases from the given provider and prints the provider's class
     * name, the cases, and the elapsed time in whole seconds (rounded down).
     *
     * @param caseProvider the provider to time
     * @throws Exception whatever {@code caseProvider.getCases()} throws
     */
    public static void timeGetCases(final CaseProvider caseProvider) throws Exception {
        System.out.println("Fetching cases using " + caseProvider.getClass().getName());
        // nanoTime() is monotonic, unlike wall-clock Date, so clock adjustments cannot skew the result.
        final long startNanos = System.nanoTime();
        final List<Case> cases = caseProvider.getCases();
        // Stop the clock before printing so only the fetch itself is measured.
        final long elapsedNanos = System.nanoTime() - startNanos;
        System.out.println(cases);
        final long duration = elapsedNanos / NANOS_PER_SECOND;
        System.out.println("Fetching cases took " + duration + " seconds");
    }
}
view raw gistfile1.java hosted with ❤ by GitHub

No comments:

Post a Comment