1 | package net.spy.memcached.internal; |
2 | |
3 | import java.util.concurrent.CountDownLatch; |
4 | import java.util.concurrent.ExecutionException; |
5 | import java.util.concurrent.Future; |
6 | import java.util.concurrent.TimeUnit; |
7 | import java.util.concurrent.TimeoutException; |
8 | import java.util.concurrent.atomic.AtomicReference; |
9 | |
10 | import net.spy.memcached.ops.Operation; |
11 | import net.spy.memcached.ops.OperationState; |
12 | |
13 | /** |
14 | * Managed future for operations. |
15 | * |
16 | * Not intended for general use. |
17 | * |
18 | * @param <T> Type of object returned from this future. |
19 | */ |
20 | public class OperationFuture<T> implements Future<T> { |
21 | |
22 | private final CountDownLatch latch; |
23 | private final AtomicReference<T> objRef; |
24 | private final long timeout; |
25 | private Operation op; |
26 | |
27 | public OperationFuture(CountDownLatch l, long opTimeout) { |
28 | this(l, new AtomicReference<T>(null), opTimeout); |
29 | } |
30 | |
31 | public OperationFuture(CountDownLatch l, AtomicReference<T> oref, |
32 | long opTimeout) { |
33 | super(); |
34 | latch=l; |
35 | objRef=oref; |
36 | timeout = opTimeout; |
37 | } |
38 | |
39 | public boolean cancel(boolean ign) { |
40 | assert op != null : "No operation"; |
41 | op.cancel(); |
42 | // This isn't exactly correct, but it's close enough. If we're in |
43 | // a writing state, we *probably* haven't started. |
44 | return op.getState() == OperationState.WRITING; |
45 | } |
46 | |
47 | public T get() throws InterruptedException, ExecutionException { |
48 | try { |
49 | return get(timeout, TimeUnit.MILLISECONDS); |
50 | } catch (TimeoutException e) { |
51 | throw new RuntimeException( |
52 | "Timed out waiting for operation", e); |
53 | } |
54 | } |
55 | |
56 | public T get(long duration, TimeUnit units) |
57 | throws InterruptedException, TimeoutException, ExecutionException { |
58 | if(!latch.await(duration, units)) { |
59 | throw new CheckedOperationTimeoutException( |
60 | "Timed out waiting for operation", op); |
61 | } |
62 | if(op != null && op.hasErrored()) { |
63 | throw new ExecutionException(op.getException()); |
64 | } |
65 | if(isCancelled()) { |
66 | throw new ExecutionException(new RuntimeException("Cancelled")); |
67 | } |
68 | |
69 | return objRef.get(); |
70 | } |
71 | |
72 | public void set(T o) { |
73 | objRef.set(o); |
74 | } |
75 | |
76 | public void setOperation(Operation to) { |
77 | op=to; |
78 | } |
79 | |
80 | public boolean isCancelled() { |
81 | assert op != null : "No operation"; |
82 | return op.isCancelled(); |
83 | } |
84 | |
85 | public boolean isDone() { |
86 | assert op != null : "No operation"; |
87 | return latch.getCount() == 0 || |
88 | op.isCancelled() || op.getState() == OperationState.COMPLETE; |
89 | } |
90 | |
91 | } |