In this demo we have two case providers, one for Activiti and one for WPS. I used Thread.sleep to mimic a time-consuming operation such as I/O: the Activiti provider sleeps for 4 seconds and the WPS provider for 5. We want a composite case provider that combines the cases from both. If we implement this naively, as in SequentialCaseProvider, the total time is the sum of the two fetches (9 seconds). With the concurrent approach in ConcurrentCaseProvider, the total drops to roughly the time of the slowest provider (5 seconds).
package com.pelssers.futuretask;

public class Case {

    private String message;

    public Case(final String message) {
        this.message = message;
    }

    public String getMessage() {
        return this.message;
    }

    @Override
    public String toString() {
        return "Case(" + message + ")";
    }
}
package com.pelssers.futuretask;

import java.util.List;

public interface CaseProvider {

    List<Case> getCases() throws Exception;
}
package com.pelssers.futuretask;

import java.util.ArrayList;
import java.util.List;

public class ActivitiCaseProvider implements CaseProvider {

    private static final List<Case> ACTIVITI_CASES;

    static {
        ACTIVITI_CASES = new ArrayList<Case>();
        ACTIVITI_CASES.add(new Case("ACTIVITI CASE 1"));
        ACTIVITI_CASES.add(new Case("ACTIVITI CASE 2"));
        ACTIVITI_CASES.add(new Case("ACTIVITI CASE 3"));
    }

    public List<Case> getCases() throws Exception {
        // mimic an expensive operation by sleeping for 4 seconds
        Thread.sleep(4000L);
        return ACTIVITI_CASES;
    }
}
package com.pelssers.futuretask;

import java.util.ArrayList;
import java.util.List;

public class WPSCaseProvider implements CaseProvider {

    private static final List<Case> WPS_CASES;

    static {
        WPS_CASES = new ArrayList<Case>();
        WPS_CASES.add(new Case("WPS CASE 1"));
        WPS_CASES.add(new Case("WPS CASE 2"));
        WPS_CASES.add(new Case("WPS CASE 3"));
    }

    public List<Case> getCases() throws Exception {
        // mimic an expensive operation by sleeping for 5 seconds
        Thread.sleep(5000L);
        return WPS_CASES;
    }
}
package com.pelssers.futuretask;

import java.util.List;

public interface CompositeCaseProvider extends CaseProvider {

    void setCaseProviders(final List<CaseProvider> caseProviders);

    List<CaseProvider> getCaseProviders();
}
package com.pelssers.futuretask;

import java.util.List;

public abstract class AbstractCompositeCaseProvider implements CompositeCaseProvider {

    private List<CaseProvider> caseProviders;

    public void setCaseProviders(final List<CaseProvider> caseProviders) {
        this.caseProviders = caseProviders;
    }

    public List<CaseProvider> getCaseProviders() {
        return caseProviders;
    }
}
package com.pelssers.futuretask;

import java.util.ArrayList;
import java.util.List;

public class SequentialCaseProvider extends AbstractCompositeCaseProvider {

    public List<Case> getCases() throws Exception {
        final List<Case> cases = new ArrayList<Case>();
        // each provider is queried in turn, so the waiting times add up
        for (final CaseProvider caseProvider : getCaseProviders()) {
            cases.addAll(caseProvider.getCases());
        }
        return cases;
    }
}
package com.pelssers.futuretask;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public final class ConcurrencyHelper {

    public static <T> List<T> getResults(final Iterable<Callable<T>> callables, final int numberOfThreads,
            final int sleepInMillis) throws InterruptedException, ExecutionException {
        final List<T> results = new ArrayList<T>();
        final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
        try {
            final List<FutureTask<T>> tasks = mapToFutureTasks(callables);
            for (final FutureTask<T> task : tasks) {
                executor.execute(task);
            }
            // poll until every task has completed; an interrupt propagates to the caller
            // instead of being silently swallowed
            while (!isDone(tasks)) {
                Thread.sleep(sleepInMillis);
            }
            // collect the results in the same order as the callables
            for (final FutureTask<T> task : tasks) {
                results.add(task.get());
            }
            return results;
        } finally {
            // release the pool threads even when a task fails or the wait is interrupted
            executor.shutdown();
        }
    }

    private static <T> List<FutureTask<T>> mapToFutureTasks(final Iterable<Callable<T>> callables) {
        final List<FutureTask<T>> futureTasks = new ArrayList<FutureTask<T>>();
        for (final Callable<T> callable : callables) {
            futureTasks.add(new FutureTask<T>(callable));
        }
        return futureTasks;
    }

    private static <T> boolean isDone(final Iterable<FutureTask<T>> tasks) {
        for (final FutureTask<T> task : tasks) {
            if (!task.isDone()) {
                return false;
            }
        }
        return true;
    }
}
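As an aside, the polling loop in ConcurrencyHelper can probably be avoided. The sketch below is not part of the original demo: it is a hypothetical variant (the class name InvokeAllSketch is made up) that relies on ExecutorService.invokeAll, which blocks until all submitted tasks have completed and returns the futures in the order of the task list. It assumes Java 7 or later for the diamond operator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical alternative to ConcurrencyHelper; the name is an assumption for illustration.
final class InvokeAllSketch {

    private InvokeAllSketch() {
    }

    static <T> List<T> getResults(final List<Callable<T>> callables, final int numberOfThreads)
            throws InterruptedException, ExecutionException {
        final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
        try {
            // invokeAll waits for all tasks, so no sleep-and-check loop is needed
            final List<Future<T>> futures = executor.invokeAll(callables);
            final List<T> results = new ArrayList<>();
            for (final Future<T> future : futures) {
                // get() returns immediately here, or rethrows a task failure as ExecutionException
                results.add(future.get());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }
}
```

ConcurrentCaseProvider could then call InvokeAllSketch.getResults(getCallables(), numberOfThreads) and drop the sleepInMillis parameter, at the cost of requiring a List rather than any Iterable.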
package com.pelssers.futuretask;

import java.util.List;
import java.util.concurrent.Callable;

public class CallableCaseProviderAdapter implements Callable<List<Case>> {

    private final CaseProvider caseProvider;

    public CallableCaseProviderAdapter(final CaseProvider caseProvider) {
        this.caseProvider = caseProvider;
    }

    // adapts CaseProvider.getCases() to the Callable contract so it can run on an executor
    public List<Case> call() throws Exception {
        return caseProvider.getCases();
    }
}
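On Java 8 or later, an adapter class like this one may not be needed, because a method reference can serve as the Callable directly. The following is only a sketch under that assumption: MethodReferenceSketch and its nested Provider interface are hypothetical stand-ins for the demo's types, simplified to return strings so the block compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical illustration; not part of the original demo.
final class MethodReferenceSketch {

    // Stand-in for CaseProvider, using String instead of Case to stay self-contained.
    interface Provider {
        List<String> getCases() throws Exception;
    }

    private MethodReferenceSketch() {
    }

    static List<Callable<List<String>>> toCallables(final List<Provider> providers) {
        final List<Callable<List<String>>> callables = new ArrayList<>();
        for (final Provider provider : providers) {
            // getCases() matches Callable.call(): no arguments, returns a value, may throw Exception
            callables.add(provider::getCases);
        }
        return callables;
    }
}
```

The explicit adapter remains the right choice when the code has to compile on a pre-Java 8 toolchain, which the style of the original code suggests.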
package com.pelssers.futuretask;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class ConcurrentCaseProvider extends AbstractCompositeCaseProvider {

    public List<Case> getCases() throws Exception {
        final List<Case> result = new ArrayList<Case>();
        final int numberOfThreads = 2;
        final int sleepInMillis = 300;
        // run all providers in parallel and wait until each one has delivered its cases
        final List<List<Case>> casesList = ConcurrencyHelper.getResults(
                getCallables(), numberOfThreads, sleepInMillis);
        for (final List<Case> cases : casesList) {
            result.addAll(cases);
        }
        return result;
    }

    public List<Callable<List<Case>>> getCallables() {
        final List<Callable<List<Case>>> callables = new ArrayList<Callable<List<Case>>>();
        for (final CaseProvider caseProvider : getCaseProviders()) {
            callables.add(new CallableCaseProviderAdapter(caseProvider));
        }
        return callables;
    }
}
package com.pelssers.futuretask;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * The output from running main():
 *
 * Fetching cases using com.pelssers.futuretask.SequentialCaseProvider
 * [Case(ACTIVITI CASE 1), Case(ACTIVITI CASE 2), Case(ACTIVITI CASE 3), Case(WPS CASE 1), Case(WPS CASE 2), Case(WPS CASE 3)]
 * Fetching cases took 9 seconds
 * Fetching cases using com.pelssers.futuretask.ConcurrentCaseProvider
 * [Case(ACTIVITI CASE 1), Case(ACTIVITI CASE 2), Case(ACTIVITI CASE 3), Case(WPS CASE 1), Case(WPS CASE 2), Case(WPS CASE 3)]
 * Fetching cases took 5 seconds
 */
public class CaseProviderProgram {

    public List<CaseProvider> getCaseProviders() {
        final List<CaseProvider> caseProviders = new ArrayList<CaseProvider>();
        caseProviders.add(new ActivitiCaseProvider());
        caseProviders.add(new WPSCaseProvider());
        return caseProviders;
    }

    public CaseProvider getSequentialCaseProvider() {
        final CompositeCaseProvider compositeCaseProvider = new SequentialCaseProvider();
        compositeCaseProvider.setCaseProviders(getCaseProviders());
        return compositeCaseProvider;
    }

    public CaseProvider getConcurrentCaseProvider() {
        final CompositeCaseProvider compositeCaseProvider = new ConcurrentCaseProvider();
        compositeCaseProvider.setCaseProviders(getCaseProviders());
        return compositeCaseProvider;
    }

    public static void main(final String[] args) throws Exception {
        final CaseProviderProgram program = new CaseProviderProgram();
        timeGetCases(program.getSequentialCaseProvider());
        timeGetCases(program.getConcurrentCaseProvider());
    }

    public static void timeGetCases(final CaseProvider caseProvider) throws Exception {
        System.out.println("Fetching cases using " + caseProvider.getClass().getName());
        // now time how long it takes to fetch all cases using the provided caseProvider
        final Date start = new Date();
        final List<Case> cases = caseProvider.getCases();
        System.out.println(cases);
        final Date end = new Date();
        final long duration = (end.getTime() - start.getTime()) / 1000;
        System.out.println("Fetching cases took " + duration + " seconds");
    }
}
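The timing in timeGetCases uses wall-clock Date values and integer division to whole seconds, which is fine for a demo with multi-second sleeps. For finer measurements, a sketch along the following lines might be preferable. TimingSketch and timeMillis are hypothetical names, not part of the original program; the sketch assumes Java 8 or later and uses System.nanoTime, which is intended for measuring elapsed time.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

// Hypothetical timing helper; the names are assumptions for illustration.
final class TimingSketch {

    private TimingSketch() {
    }

    // Runs the given work once and returns the elapsed time in milliseconds.
    static <T> long timeMillis(final Callable<T> work) throws Exception {
        final long start = System.nanoTime();
        work.call();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }
}
```

With the demo's types this could be invoked as TimingSketch.timeMillis(caseProvider::getCases), keeping the printing of the cases outside the measured region.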