1 | package net.spy.memcached.transcoders; |
2 | |
3 | import java.util.concurrent.ArrayBlockingQueue; |
4 | import java.util.concurrent.Callable; |
5 | import java.util.concurrent.ExecutionException; |
6 | import java.util.concurrent.Future; |
7 | import java.util.concurrent.FutureTask; |
8 | import java.util.concurrent.ThreadPoolExecutor; |
9 | import java.util.concurrent.TimeUnit; |
10 | import java.util.concurrent.TimeoutException; |
11 | import java.util.concurrent.atomic.AtomicBoolean; |
12 | |
13 | import net.spy.memcached.CachedData; |
14 | import net.spy.memcached.compat.SpyObject; |
15 | |
16 | /** |
17 | * Asynchronous transcoder. |
18 | */ |
19 | public class TranscodeService extends SpyObject { |
20 | |
21 | private final ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10, 60L, |
22 | TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100), |
23 | new ThreadPoolExecutor.DiscardPolicy()); |
24 | |
25 | /** |
26 | * Perform a decode. |
27 | */ |
28 | public <T> Future<T> decode(final Transcoder<T> tc, |
29 | final CachedData cachedData) { |
30 | |
31 | assert !pool.isShutdown() : "Pool has already shut down."; |
32 | |
33 | TranscodeService.Task<T> task = new TranscodeService.Task<T>( |
34 | new Callable<T>() { |
35 | public T call() { |
36 | return tc.decode(cachedData); |
37 | } |
38 | }); |
39 | |
40 | if (tc.asyncDecode(cachedData)) { |
41 | this.pool.execute(task); |
42 | } |
43 | return task; |
44 | } |
45 | |
46 | /** |
47 | * Shut down the pool. |
48 | */ |
49 | public void shutdown() { |
50 | pool.shutdown(); |
51 | } |
52 | |
53 | /** |
54 | * Ask whether this service has been shut down. |
55 | */ |
56 | public boolean isShutdown() { |
57 | return pool.isShutdown(); |
58 | } |
59 | |
60 | private static class Task<T> extends FutureTask<T> { |
61 | private final AtomicBoolean isRunning = new AtomicBoolean(false); |
62 | |
63 | public Task(Callable<T> callable) { |
64 | super(callable); |
65 | } |
66 | |
67 | @Override |
68 | public T get() throws InterruptedException, ExecutionException { |
69 | this.run(); |
70 | return super.get(); |
71 | } |
72 | |
73 | @Override |
74 | public T get(long timeout, TimeUnit unit) throws InterruptedException, |
75 | ExecutionException, TimeoutException { |
76 | this.run(); |
77 | return super.get(timeout, unit); |
78 | } |
79 | |
80 | @Override |
81 | public void run() { |
82 | if (this.isRunning.compareAndSet(false, true)) { |
83 | super.run(); |
84 | } |
85 | } |
86 | } |
87 | |
88 | } |