1 | // Copyright (c) 2006 Dustin Sallings <dustin@spy.net> |
2 | |
3 | package net.spy.memcached.compat; |
4 | |
5 | import java.util.ArrayList; |
6 | import java.util.Collection; |
7 | import java.util.IdentityHashMap; |
8 | import java.util.concurrent.Callable; |
9 | import java.util.concurrent.CountDownLatch; |
10 | import java.util.concurrent.CyclicBarrier; |
11 | |
12 | /** |
13 | * Thread that invokes a callable multiple times concurrently. |
14 | */ |
15 | public class SyncThread<T> extends SpyThread { |
16 | |
17 | private final Callable<T> callable; |
18 | private final CyclicBarrier barrier; |
19 | private final CountDownLatch latch; |
20 | private Throwable throwable=null; |
21 | private T rv=null; |
22 | |
23 | /** |
24 | * Get a SyncThread that will call the given callable when the given |
25 | * barrier allows it past. |
26 | * |
27 | * @param b the barrier |
28 | * @param c the callable |
29 | */ |
30 | public SyncThread(CyclicBarrier b, Callable<T> c) { |
31 | super("SyncThread"); |
32 | setDaemon(true); |
33 | callable=c; |
34 | barrier=b; |
35 | latch=new CountDownLatch(1); |
36 | start(); |
37 | } |
38 | |
39 | /** |
40 | * Wait for the barrier, invoke the callable and capture the result or an |
41 | * exception. |
42 | */ |
43 | @Override |
44 | public void run() { |
45 | try { |
46 | barrier.await(); |
47 | rv=callable.call(); |
48 | } catch(Throwable t) { |
49 | throwable=t; |
50 | } |
51 | latch.countDown(); |
52 | } |
53 | |
54 | /** |
55 | * Get the result from the invocation. |
56 | * |
57 | * @return the result |
58 | * @throws Throwable if an error occurred when evaluating the callable |
59 | */ |
60 | public T getResult() throws Throwable { |
61 | latch.await(); |
62 | if(throwable != null) { |
63 | throw throwable; |
64 | } |
65 | return rv; |
66 | } |
67 | |
68 | /** |
69 | * Get a collection of SyncThreads that all began as close to the |
70 | * same time as possible and have all completed. |
71 | * @param <T> the result type of the SyncThread |
72 | * @param num the number of concurrent threads to execute |
73 | * @param callable the thing to call |
74 | * @return the completed SyncThreads |
75 | * @throws InterruptedException if we're interrupted during join |
76 | */ |
77 | public static <T> Collection<SyncThread<T>> getCompletedThreads( |
78 | int num, Callable<T> callable) throws InterruptedException { |
79 | Collection<SyncThread<T>> rv=new ArrayList<SyncThread<T>>(num); |
80 | |
81 | CyclicBarrier barrier=new CyclicBarrier(num); |
82 | for(int i=0; i<num; i++) { |
83 | rv.add(new SyncThread<T>(barrier, callable)); |
84 | } |
85 | |
86 | for(SyncThread<T> t : rv) { |
87 | t.join(); |
88 | } |
89 | |
90 | return rv; |
91 | } |
92 | |
93 | /** |
94 | * Get the distinct result count for the given callable at the given |
95 | * concurrency. |
96 | * |
97 | * @param <T> the type of the callable |
98 | * @param num the concurrency |
99 | * @param callable the callable to invoke |
100 | * @return the number of distinct (by identity) results found |
101 | * @throws Throwable if an exception occurred in one of the invocations |
102 | */ |
103 | public static <T> int getDistinctResultCount(int num, Callable<T> callable) |
104 | throws Throwable { |
105 | IdentityHashMap<T, Object> found=new IdentityHashMap<T, Object>(); |
106 | Collection<SyncThread<T>> threads=getCompletedThreads(num, callable); |
107 | for(SyncThread<T> s : threads) { |
108 | found.put(s.getResult(), new Object()); |
109 | } |
110 | return found.size(); |
111 | } |
112 | } |