EMMA Coverage Report (generated Tue Oct 27 11:32:50 PDT 2009)
[all classes][net.spy.memcached]

COVERAGE SUMMARY FOR SOURCE FILE [MemcachedClient.java]

nameclass, %method, %block, %line, %
MemcachedClient.java100% (19/19)98%  (129/132)89%  (2005/2258)88%  (347.5/396)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class MemcachedClient$3100% (1/1)100% (3/3)72%  (31/43)88%  (7/8)
receivedStatus (OperationStatus): void 100% (1/1)56%  (15/27)80%  (4/5)
MemcachedClient$3 (MemcachedClient, OperationFuture, CountDownLatch): void 100% (1/1)100% (12/12)100% (1/1)
complete (): void 100% (1/1)100% (4/4)100% (2/2)
     
class MemcachedClient$8$1100% (1/1)100% (4/4)73%  (32/44)88%  (7/8)
receivedStatus (OperationStatus): void 100% (1/1)25%  (4/16)67%  (2/3)
MemcachedClient$8$1 (MemcachedClient$8, SocketAddress, CountDownLatch): void 100% (1/1)100% (12/12)100% (1/1)
complete (): void 100% (1/1)100% (4/4)100% (2/2)
gotStat (String, String): void 100% (1/1)100% (12/12)100% (2/2)
     
class MemcachedClient$5100% (1/1)100% (5/5)78%  (68/87)88%  (8.8/10)
gotData (String, int, long, byte []): void 100% (1/1)65%  (31/48)73%  (2.9/4)
<static initializer> 100% (1/1)75%  (6/8)75%  (0.8/1)
MemcachedClient$5 (MemcachedClient, OperationFuture, String, Transcoder, Coun... 100% (1/1)100% (21/21)100% (2/2)
complete (): void 100% (1/1)100% (4/4)100% (2/2)
receivedStatus (OperationStatus): void 100% (1/1)100% (6/6)100% (2/2)
     
class MemcachedClient100% (1/1)96%  (72/75)87%  (1369/1568)85%  (249.5/294)
access$100 (MemcachedClient): Logger 0%   (0/1)0%   (0/3)0%   (0/1)
delete (String, int): Future 0%   (0/1)0%   (0/4)0%   (0/1)
getTranscoder (): Transcoder 0%   (0/1)0%   (0/3)0%   (0/1)
get (String, Transcoder): Object 100% (1/1)53%  (16/30)43%  (3/7)
getBulk (Collection, Transcoder): Map 100% (1/1)55%  (17/31)43%  (3/7)
gets (String, Transcoder): CASValue 100% (1/1)55%  (17/31)43%  (3/7)
cas (String, long, int, Object, Transcoder): CASResponse 100% (1/1)59%  (20/34)43%  (3/7)
shutdown (long, TimeUnit): boolean 100% (1/1)63%  (78/124)66%  (13.9/21)
mutateWithDefault (Mutator, String, int, long, int): long 100% (1/1)65%  (48/74)58%  (8.7/15)
waitForQueues (long, TimeUnit): boolean 100% (1/1)65%  (13/20)50%  (2/4)
logRunException (Exception): void 100% (1/1)67%  (10/15)75%  (3/4)
checkState (): void 100% (1/1)74%  (14/19)88%  (3.5/4)
<static initializer> 100% (1/1)75%  (6/8)75%  (0.8/1)
getVersions (): Map 100% (1/1)75%  (21/28)71%  (5/7)
getStats (String): Map 100% (1/1)76%  (22/29)71%  (5/7)
run (): void 100% (1/1)84%  (27/32)83%  (10.8/13)
mutate (Mutator, String, int, long, int): long 100% (1/1)90%  (61/68)80%  (8/10)
asyncGetBulk (Collection, Transcoder): Future 100% (1/1)91%  (157/173)97%  (35.9/37)
MemcachedClient (ConnectionFactory, List): void 100% (1/1)100% (100/100)100% (22/22)
MemcachedClient (InetSocketAddress []): void 100% (1/1)100% (8/8)100% (2/2)
MemcachedClient (List): void 100% (1/1)100% (7/7)100% (2/2)
access$000 (MemcachedClient): Logger 100% (1/1)100% (3/3)100% (1/1)
add (String, int, Object): Future 100% (1/1)100% (9/9)100% (1/1)
add (String, int, Object, Transcoder): Future 100% (1/1)100% (8/8)100% (1/1)
addObserver (ConnectionObserver): boolean 100% (1/1)100% (5/5)100% (1/1)
addOp (String, Operation): Operation 100% (1/1)100% (12/12)100% (4/4)
append (long, String, Object): Future 100% (1/1)100% (8/8)100% (1/1)
append (long, String, Object, Transcoder): Future 100% (1/1)100% (8/8)100% (1/1)
asyncCAS (String, long, Object): Future 100% (1/1)100% (8/8)100% (1/1)
asyncCAS (String, long, Object, Transcoder): Future 100% (1/1)100% (8/8)100% (1/1)
asyncCAS (String, long, int, Object, Transcoder): Future 100% (1/1)100% (44/44)100% (7/7)
asyncCat (ConcatenationType, long, String, Object, Transcoder): Future 100% (1/1)100% (41/41)100% (7/7)
asyncDecr (String, int): Future 100% (1/1)100% (8/8)100% (1/1)
asyncGet (String): Future 100% (1/1)100% (6/6)100% (1/1)
asyncGet (String, Transcoder): Future 100% (1/1)100% (35/35)100% (6/6)
asyncGetBulk (Collection): Future 100% (1/1)100% (6/6)100% (1/1)
asyncGetBulk (String []): Future 100% (1/1)100% (7/7)100% (1/1)
asyncGetBulk (Transcoder, String []): Future 100% (1/1)100% (6/6)100% (1/1)
asyncGets (String): Future 100% (1/1)100% (6/6)100% (1/1)
asyncGets (String, Transcoder): Future 100% (1/1)100% (35/35)100% (6/6)
asyncIncr (String, int): Future 100% (1/1)100% (8/8)100% (1/1)
asyncMutate (Mutator, String, int, long, int): Future 100% (1/1)100% (35/35)100% (5/5)
asyncStore (StoreType, String, int, Object): Future 100% (1/1)100% (9/9)100% (1/1)
asyncStore (StoreType, String, int, Object, Transcoder): Future 100% (1/1)100% (43/43)100% (7/7)
broadcastOp (BroadcastOpFactory): CountDownLatch 100% (1/1)100% (5/5)100% (1/1)
broadcastOp (BroadcastOpFactory, boolean): CountDownLatch 100% (1/1)100% (15/15)100% (3/3)
cas (String, long, Object): CASResponse 100% (1/1)100% (8/8)100% (1/1)
cas (String, long, Object, Transcoder): CASResponse 100% (1/1)100% (8/8)100% (1/1)
decr (String, int): long 100% (1/1)100% (8/8)100% (1/1)
decr (String, int, long): long 100% (1/1)100% (8/8)100% (1/1)
decr (String, int, long, int): long 100% (1/1)100% (8/8)100% (1/1)
delete (String): Future 100% (1/1)100% (33/33)100% (6/6)
flush (): Future 100% (1/1)100% (4/4)100% (1/1)
flush (int): Future 100% (1/1)100% (29/29)100% (4/4)
get (String): Object 100% (1/1)100% (6/6)100% (1/1)
getAvailableServers (): Collection 100% (1/1)100% (28/28)100% (5/5)
getBulk (Collection): Map 100% (1/1)100% (6/6)100% (1/1)
getBulk (String []): Map 100% (1/1)100% (7/7)100% (1/1)
getBulk (Transcoder, String []): Map 100% (1/1)100% (6/6)100% (1/1)
getNodeLocator (): NodeLocator 100% (1/1)100% (5/5)100% (1/1)
getStats (): Map 100% (1/1)100% (4/4)100% (1/1)
getUnavailableServers (): Collection 100% (1/1)100% (28/28)100% (5/5)
gets (String): CASValue 100% (1/1)100% (6/6)100% (1/1)
incr (String, int): long 100% (1/1)100% (8/8)100% (1/1)
incr (String, int, long): long 100% (1/1)100% (8/8)100% (1/1)
incr (String, int, long, int): long 100% (1/1)100% (8/8)100% (1/1)
prepend (long, String, Object): Future 100% (1/1)100% (8/8)100% (1/1)
prepend (long, String, Object, Transcoder): Future 100% (1/1)100% (8/8)100% (1/1)
removeObserver (ConnectionObserver): boolean 100% (1/1)100% (5/5)100% (1/1)
replace (String, int, Object): Future 100% (1/1)100% (9/9)100% (1/1)
replace (String, int, Object, Transcoder): Future 100% (1/1)100% (8/8)100% (1/1)
set (String, int, Object): Future 100% (1/1)100% (9/9)100% (1/1)
set (String, int, Object, Transcoder): Future 100% (1/1)100% (8/8)100% (1/1)
shutdown (): void 100% (1/1)100% (6/6)100% (2/2)
validateKey (String): void 100% (1/1)100% (62/62)100% (9/9)
     
class MemcachedClient$4100% (1/1)100% (5/5)90%  (61/68)95%  (8.5/9)
<static initializer> 100% (1/1)75%  (6/8)75%  (0.8/1)
gotData (String, int, byte []): void 100% (1/1)83%  (24/29)86%  (2.6/3)
MemcachedClient$4 (MemcachedClient, GetFuture, String, Transcoder, CountDownL... 100% (1/1)100% (21/21)100% (2/2)
complete (): void 100% (1/1)100% (4/4)100% (2/2)
receivedStatus (OperationStatus): void 100% (1/1)100% (6/6)100% (2/2)
     
class MemcachedClient$13100% (1/1)100% (4/4)96%  (90/94)98%  (13.7/14)
isDone (): boolean 100% (1/1)91%  (30/33)94%  (3.7/4)
cancel (boolean): boolean 100% (1/1)96%  (27/28)99%  (4.9/5)
MemcachedClient$13 (MemcachedClient, CountDownLatch, AtomicReference, long, C... 100% (1/1)100% (12/12)100% (1/1)
isCancelled (): boolean 100% (1/1)100% (21/21)100% (4/4)
     
class MemcachedClient$1100% (1/1)100% (3/3)100% (23/23)100% (5/5)
MemcachedClient$1 (MemcachedClient, OperationFuture, CountDownLatch): void 100% (1/1)100% (12/12)100% (1/1)
complete (): void 100% (1/1)100% (4/4)100% (2/2)
receivedStatus (OperationStatus): void 100% (1/1)100% (7/7)100% (2/2)
     
class MemcachedClient$10100% (1/1)100% (3/3)100% (30/30)100% (5/5)
MemcachedClient$10 (MemcachedClient, OperationFuture, CountDownLatch): void 100% (1/1)100% (12/12)100% (1/1)
complete (): void 100% (1/1)100% (4/4)100% (2/2)
receivedStatus (OperationStatus): void 100% (1/1)100% (14/14)100% (2/2)
     
class MemcachedClient$11100% (1/1)100% (3/3)100% (23/23)100% (5/5)
MemcachedClient$11 (MemcachedClient, OperationFuture, CountDownLatch): void 100% (1/1)100% (12/12)100% (1/1)
complete (): void 100% (1/1)100% (4/4)100% (2/2)
receivedStatus (OperationStatus): void 100% (1/1)100% (7/7)100% (2/2)
     
class MemcachedClient$12100% (1/1)100% (2/2)100% (34/34)100% (4/4)
MemcachedClient$12 (MemcachedClient, int, AtomicReference, ConcurrentLinkedQu... 100% (1/1)100% (15/15)100% (1/1)
newOp (MemcachedNode, CountDownLatch): Operation 100% (1/1)100% (19/19)100% (3/3)
     
class MemcachedClient$12$1100% (1/1)100% (3/3)100% (21/21)100% (5/5)
MemcachedClient$12$1 (MemcachedClient$12, CountDownLatch): void 100% (1/1)100% (9/9)100% (1/1)
complete (): void 100% (1/1)100% (4/4)100% (2/2)
receivedStatus (OperationStatus): void 100% (1/1)100% (8/8)100% (2/2)
     
class MemcachedClient$14100% (1/1)100% (2/2)100% (16/16)100% (2/2)
MemcachedClient$14 (MemcachedClient): void 100% (1/1)100% (6/6)100% (1/1)
newOp (MemcachedNode, CountDownLatch): Operation 100% (1/1)100% (10/10)100% (1/1)
     
class MemcachedClient$14$1100% (1/1)100% (3/3)100% (14/14)100% (4/4)
MemcachedClient$14$1 (MemcachedClient$14, CountDownLatch): void 100% (1/1)100% (9/9)100% (1/1)
complete (): void 100% (1/1)100% (4/4)100% (2/2)
receivedStatus (OperationStatus): void 100% (1/1)100% (1/1)100% (1/1)
     
class MemcachedClient$2100% (1/1)100% (3/3)100% (23/23)100% (5/5)
MemcachedClient$2 (MemcachedClient, OperationFuture, CountDownLatch): void 100% (1/1)100% (12/12)100% (1/1)
complete (): void 100% (1/1)100% (4/4)100% (2/2)
receivedStatus (OperationStatus): void 100% (1/1)100% (7/7)100% (2/2)
     
class MemcachedClient$6100% (1/1)100% (4/4)100% (54/54)100% (8/8)
MemcachedClient$6 (MemcachedClient, Map, Transcoder, CountDownLatch): void 100% (1/1)100% (15/15)100% (1/1)
complete (): void 100% (1/1)100% (4/4)100% (2/2)
gotData (String, int, byte []): void 100% (1/1)100% (20/20)100% (2/2)
receivedStatus (OperationStatus): void 100% (1/1)100% (15/15)100% (3/3)
     
class MemcachedClient$7100% (1/1)100% (2/2)100% (23/23)100% (3/3)
MemcachedClient$7 (MemcachedClient, Map): void 100% (1/1)100% (9/9)100% (1/1)
newOp (MemcachedNode, CountDownLatch): Operation 100% (1/1)100% (14/14)100% (2/2)
     
class MemcachedClient$7$1100% (1/1)100% (3/3)100% (26/26)100% (5/5)
MemcachedClient$7$1 (MemcachedClient$7, SocketAddress, CountDownLatch): void 100% (1/1)100% (12/12)100% (1/1)
complete (): void 100% (1/1)100% (4/4)100% (2/2)
receivedStatus (OperationStatus): void 100% (1/1)100% (10/10)100% (2/2)
     
class MemcachedClient$8100% (1/1)100% (2/2)100% (36/36)100% (4/4)
MemcachedClient$8 (MemcachedClient, Map, String): void 100% (1/1)100% (12/12)100% (1/1)
newOp (MemcachedNode, CountDownLatch): Operation 100% (1/1)100% (24/24)100% (3/3)
     
class MemcachedClient$9100% (1/1)100% (3/3)100% (31/31)100% (5/5)
MemcachedClient$9 (MemcachedClient, AtomicLong, CountDownLatch): void 100% (1/1)100% (12/12)100% (1/1)
complete (): void 100% (1/1)100% (4/4)100% (2/2)
receivedStatus (OperationStatus): void 100% (1/1)100% (15/15)100% (2/2)

1// Copyright (c) 2006  Dustin Sallings <dustin@spy.net>
2 
3package net.spy.memcached;
4 
5import java.io.IOException;
6import java.net.InetSocketAddress;
7import java.net.SocketAddress;
8import java.nio.channels.CancelledKeyException;
9import java.nio.channels.ClosedSelectorException;
10import java.util.ArrayList;
11import java.util.Arrays;
12import java.util.Collection;
13import java.util.HashMap;
14import java.util.Iterator;
15import java.util.List;
16import java.util.Map;
17import java.util.concurrent.ConcurrentHashMap;
18import java.util.concurrent.ConcurrentLinkedQueue;
19import java.util.concurrent.CountDownLatch;
20import java.util.concurrent.ExecutionException;
21import java.util.concurrent.Future;
22import java.util.concurrent.TimeUnit;
23import java.util.concurrent.TimeoutException;
24import java.util.concurrent.atomic.AtomicLong;
25import java.util.concurrent.atomic.AtomicReference;
26 
27import net.spy.memcached.compat.SpyThread;
28import net.spy.memcached.internal.BulkGetFuture;
29import net.spy.memcached.internal.GetFuture;
30import net.spy.memcached.internal.OperationFuture;
31import net.spy.memcached.ops.CASOperationStatus;
32import net.spy.memcached.ops.CancelledOperationStatus;
33import net.spy.memcached.ops.ConcatenationType;
34import net.spy.memcached.ops.DeleteOperation;
35import net.spy.memcached.ops.GetOperation;
36import net.spy.memcached.ops.GetsOperation;
37import net.spy.memcached.ops.Mutator;
38import net.spy.memcached.ops.Operation;
39import net.spy.memcached.ops.OperationCallback;
40import net.spy.memcached.ops.OperationState;
41import net.spy.memcached.ops.OperationStatus;
42import net.spy.memcached.ops.StatsOperation;
43import net.spy.memcached.ops.StoreType;
44import net.spy.memcached.transcoders.TranscodeService;
45import net.spy.memcached.transcoders.Transcoder;
46 
47/**
48 * Client to a memcached server.
49 *
50 * <h2>Basic usage</h2>
51 *
52 * <pre>
53 *        MemcachedClient c=new MemcachedClient(
54 *                new InetSocketAddress("hostname", portNum));
55 *
56 *        // Store a value (async) for one hour
57 *        c.set("someKey", 3600, someObject);
58 *        // Retrieve a value.
59 *        Object myObject=c.get("someKey");
60 *        </pre>
61 *
62 *        <h2>Advanced Usage</h2>
63 *
64 *        <p>
65 *         MemcachedClient may be processing a great deal of asynchronous messages or
66 *         possibly dealing with an unreachable memcached, which may delay processing.
67 *         If a memcached is disabled, for example, MemcachedConnection will continue
68 *         to attempt to reconnect and replay pending operations until it comes back
69 *         up.  To prevent this from causing your application to hang, you can use
70 *         one of the asynchronous mechanisms to time out a request and cancel the
71 *         operation to the server.
72 *        </p>
73 *
74 *        <pre>
75 *      // Get a memcached client connected to several servers
76 *      // over the binary protocol
77 *      MemcachedClient c = new MemcachedClient(new BinaryConnectionFactory(),
78 *              AddrUtil.getAddresses("server1:11211 server2:11211"));
79 *
80 *      // Try to get a value, for up to 5 seconds, and cancel if it
81 *      // doesn't return
82 *      Object myObj = null;
83 *      Future&lt;Object&gt; f = c.asyncGet("someKey");
84 *      try {
85 *          myObj = f.get(5, TimeUnit.SECONDS);
86 *      // throws expecting InterruptedException, ExecutionException
87 *      // or TimeoutException
88 *      } catch (Exception e) {  /*  /
89 *          // Since we don't need this, go ahead and cancel the operation.
90 *          // This is not strictly necessary, but it'll save some work on
91 *          // the server.  It is okay to cancel it if running.
92 *          f.cancel(true);
93 *          // Do other timeout related stuff
94 *      }
95 * </pre>
96 */
97public class MemcachedClient extends SpyThread implements MemcachedClientIF {
98 
99        private volatile boolean running=true;
100        private volatile boolean shuttingDown=false;
101 
102        private final long operationTimeout;
103 
104        private final MemcachedConnection conn;
105        final OperationFactory opFact;
106 
107        final Transcoder<Object> transcoder;
108 
109        final TranscodeService tcService;
110 
111        /**
112         * Get a memcache client operating on the specified memcached locations.
113         *
114         * @param ia the memcached locations
115         * @throws IOException if connections cannot be established
116         */
117        public MemcachedClient(InetSocketAddress... ia) throws IOException {
118                this(new DefaultConnectionFactory(), Arrays.asList(ia));
119        }
120 
121        /**
122         * Get a memcache client over the specified memcached locations.
123         *
124         * @param addrs the socket addrs
125         * @throws IOException if connections cannot be established
126         */
127        public MemcachedClient(List<InetSocketAddress> addrs)
128                throws IOException {
129                this(new DefaultConnectionFactory(), addrs);
130        }
131 
132        /**
133         * Get a memcache client over the specified memcached locations.
134         *
135         * @param cf the connection factory to configure connections for this client
136         * @param addrs the socket addresses
137         * @throws IOException if connections cannot be established
138         */
139        public MemcachedClient(ConnectionFactory cf, List<InetSocketAddress> addrs)
140                throws IOException {
141                if(cf == null) {
142                        throw new NullPointerException("Connection factory required");
143                }
144                if(addrs == null) {
145                        throw new NullPointerException("Server list required");
146                }
147                if(addrs.isEmpty()) {
148                        throw new IllegalArgumentException(
149                                "You must have at least one server to connect to");
150                }
151                if(cf.getOperationTimeout() <= 0) {
152                        throw new IllegalArgumentException(
153                                "Operation timeout must be positive.");
154                }
155                tcService = new TranscodeService();
156                transcoder=cf.getDefaultTranscoder();
157                opFact=cf.getOperationFactory();
158                assert opFact != null : "Connection factory failed to make op factory";
159                conn=cf.createConnection(addrs);
160                assert conn != null : "Connection factory failed to make a connection";
161                operationTimeout = cf.getOperationTimeout();
162                setName("Memcached IO over " + conn);
163                setDaemon(cf.isDaemon());
164                start();
165        }
166 
167        /**
168         * Get the addresses of available servers.
169         *
170         * <p>
171         * This is based on a snapshot in time so shouldn't be considered
172         * completely accurate, but is a useful for getting a feel for what's
173         * working and what's not working.
174         * </p>
175         */
176        public Collection<SocketAddress> getAvailableServers() {
177                Collection<SocketAddress> rv=new ArrayList<SocketAddress>();
178                for(MemcachedNode node : conn.getLocator().getAll()) {
179                        if(node.isActive()) {
180                                rv.add(node.getSocketAddress());
181                        }
182                }
183                return rv;
184        }
185 
186        /**
187         * Get the addresses of unavailable servers.
188         *
189         * <p>
190         * This is based on a snapshot in time so shouldn't be considered
191         * completely accurate, but is a useful for getting a feel for what's
192         * working and what's not working.
193         * </p>
194         */
195        public Collection<SocketAddress> getUnavailableServers() {
196                Collection<SocketAddress> rv=new ArrayList<SocketAddress>();
197                for(MemcachedNode node : conn.getLocator().getAll()) {
198                        if(!node.isActive()) {
199                                rv.add(node.getSocketAddress());
200                        }
201                }
202                return rv;
203        }
204 
205        /**
206         * Get a read-only wrapper around the node locator wrapping this instance.
207         */
208        public NodeLocator getNodeLocator() {
209                return conn.getLocator().getReadonlyCopy();
210        }
211 
212        /**
213         * Get the default transcoder that's in use.
214         */
215        public Transcoder<Object> getTranscoder() {
216                return transcoder;
217        }
218 
219        private void validateKey(String key) {
220                byte[] keyBytes=KeyUtil.getKeyBytes(key);
221                if(keyBytes.length > MAX_KEY_LENGTH) {
222                        throw new IllegalArgumentException("Key is too long (maxlen = "
223                                        + MAX_KEY_LENGTH + ")");
224                }
225                if(keyBytes.length == 0) {
226                        throw new IllegalArgumentException(
227                                "Key must contain at least one character.");
228                }
229                // Validate the key
230                for(byte b : keyBytes) {
231                        if(b == ' ' || b == '\n' || b == '\r' || b == 0) {
232                                throw new IllegalArgumentException(
233                                        "Key contains invalid characters:  ``" + key + "''");
234                        }
235                }
236        }
237 
238        private void checkState() {
239                if(shuttingDown) {
240                        throw new IllegalStateException("Shutting down");
241                }
242                assert isAlive() : "IO Thread is not running.";
243        }
244 
245        /**
246         * (internal use) Add a raw operation to a numbered connection.
247         * This method is exposed for testing.
248         *
249         * @param which server number
250         * @param op the operation to perform
251         * @return the Operation
252         */
253        Operation addOp(final String key, final Operation op) {
254                validateKey(key);
255                checkState();
256                conn.addOperation(key, op);
257                return op;
258        }
259 
260        CountDownLatch broadcastOp(final BroadcastOpFactory of) {
261                return broadcastOp(of, true);
262        }
263 
264        private CountDownLatch broadcastOp(BroadcastOpFactory of,
265                        boolean checkShuttingDown) {
266                if(checkShuttingDown && shuttingDown) {
267                        throw new IllegalStateException("Shutting down");
268                }
269                return conn.broadcastOperation(of);
270        }
271 
272        private <T> Future<Boolean> asyncStore(StoreType storeType, String key,
273                                                   int exp, T value, Transcoder<T> tc) {
274                CachedData co=tc.encode(value);
275                final CountDownLatch latch=new CountDownLatch(1);
276                final OperationFuture<Boolean> rv=new OperationFuture<Boolean>(latch,
277                                operationTimeout);
278                Operation op=opFact.store(storeType, key, co.getFlags(),
279                                exp, co.getData(), new OperationCallback() {
280                                        public void receivedStatus(OperationStatus val) {
281                                                rv.set(val.isSuccess());
282                                        }
283                                        public void complete() {
284                                                latch.countDown();
285                                        }});
286                rv.setOperation(op);
287                addOp(key, op);
288                return rv;
289        }
290 
291        private Future<Boolean> asyncStore(StoreType storeType,
292                        String key, int exp, Object value) {
293                return asyncStore(storeType, key, exp, value, transcoder);
294        }
295 
296        private <T> Future<Boolean> asyncCat(
297                        ConcatenationType catType, long cas, String key,
298                        T value, Transcoder<T> tc) {
299                CachedData co=tc.encode(value);
300                final CountDownLatch latch=new CountDownLatch(1);
301                final OperationFuture<Boolean> rv=new OperationFuture<Boolean>(latch,
302                                operationTimeout);
303                Operation op=opFact.cat(catType, cas, key, co.getData(),
304                                new OperationCallback() {
305                        public void receivedStatus(OperationStatus val) {
306                                rv.set(val.isSuccess());
307                        }
308                        public void complete() {
309                                latch.countDown();
310                        }});
311                rv.setOperation(op);
312                addOp(key, op);
313                return rv;
314        }
315 
316        /**
317         * Append to an existing value in the cache.
318         *
319         * @param cas cas identifier (ignored in the ascii protocol)
320         * @param key the key to whose value will be appended
321         * @param val the value to append
322         * @return a future indicating success
323         * @throws IllegalStateException in the rare circumstance where queue
324         *         is too full to accept any more requests
325         */
326        public Future<Boolean> append(long cas, String key, Object val) {
327                return append(cas, key, val, transcoder);
328        }
329 
330        /**
331         * Append to an existing value in the cache.
332         *
333         * @param cas cas identifier (ignored in the ascii protocol)
334         * @param key the key to whose value will be appended
335         * @param val the value to append
336         * @param tc the transcoder to serialize and unserialize the value
337         * @return a future indicating success
338         * @throws IllegalStateException in the rare circumstance where queue
339         *         is too full to accept any more requests
340         */
341        public <T> Future<Boolean> append(long cas, String key, T val,
342                        Transcoder<T> tc) {
343                return asyncCat(ConcatenationType.append, cas, key, val, tc);
344        }
345 
346        /**
347         * Prepend to an existing value in the cache.
348         *
349         * @param cas cas identifier (ignored in the ascii protocol)
350         * @param key the key to whose value will be prepended
351         * @param val the value to append
352         * @return a future indicating success
353         * @throws IllegalStateException in the rare circumstance where queue
354         *         is too full to accept any more requests
355         */
356        public Future<Boolean> prepend(long cas, String key, Object val) {
357                return prepend(cas, key, val, transcoder);
358        }
359 
360        /**
361         * Prepend to an existing value in the cache.
362         *
363         * @param cas cas identifier (ignored in the ascii protocol)
364         * @param key the key to whose value will be prepended
365         * @param val the value to append
366         * @param tc the transcoder to serialize and unserialize the value
367         * @return a future indicating success
368         * @throws IllegalStateException in the rare circumstance where queue
369         *         is too full to accept any more requests
370         */
371        public <T> Future<Boolean> prepend(long cas, String key, T val,
372                        Transcoder<T> tc) {
373                return asyncCat(ConcatenationType.prepend, cas, key, val, tc);
374        }
375 
376        /**
377     * Asynchronous CAS operation.
378     *
379     * @param key the key
380     * @param casId the CAS identifier (from a gets operation)
381     * @param value the new value
382     * @param tc the transcoder to serialize and unserialize the value
383     * @return a future that will indicate the status of the CAS
384     * @throws IllegalStateException in the rare circumstance where queue
385     *         is too full to accept any more requests
386     */
387    public <T> Future<CASResponse> asyncCAS(String key, long casId, T value,
388            Transcoder<T> tc) {
389        return asyncCAS(key, casId, 0, value, tc);
390        }
391 
392        /**
393         * Asynchronous CAS operation.
394         *
395         * @param key the key
396         * @param casId the CAS identifier (from a gets operation)
397         * @param exp the expiration of this object
398         * @param value the new value
399         * @param tc the transcoder to serialize and unserialize the value
400         * @return a future that will indicate the status of the CAS
401         * @throws IllegalStateException in the rare circumstance where queue
402         *         is too full to accept any more requests
403         */
404        public <T> Future<CASResponse> asyncCAS(String key, long casId, int exp, T value,
405                        Transcoder<T> tc) {
406                CachedData co=tc.encode(value);
407                final CountDownLatch latch=new CountDownLatch(1);
408                final OperationFuture<CASResponse> rv=new OperationFuture<CASResponse>(
409                                latch, operationTimeout);
410                Operation op=opFact.cas(StoreType.set, key, casId, co.getFlags(), exp,
411                                co.getData(), new OperationCallback() {
412                                        public void receivedStatus(OperationStatus val) {
413                                                if(val instanceof CASOperationStatus) {
414                                                        rv.set(((CASOperationStatus)val).getCASResponse());
415                                                } else if(val instanceof CancelledOperationStatus) {
416                                                        // Cancelled, ignore and let it float up
417                                                } else {
418                                                        throw new RuntimeException(
419                                                                "Unhandled state: " + val);
420                                                }
421                                        }
422                                        public void complete() {
423                                                latch.countDown();
424                                        }});
425                rv.setOperation(op);
426                addOp(key, op);
427                return rv;
428        }
429 
430        /**
431         * Asynchronous CAS operation using the default transcoder.
432         *
433         * @param key the key
434         * @param casId the CAS identifier (from a gets operation)
435         * @param value the new value
436         * @return a future that will indicate the status of the CAS
437         * @throws IllegalStateException in the rare circumstance where queue
438         *         is too full to accept any more requests
439         */
440        public Future<CASResponse> asyncCAS(String key, long casId, Object value) {
441                return asyncCAS(key, casId, value, transcoder);
442        }
443 
444        /**
445     * Perform a synchronous CAS operation.
446     *
447     * @param key the key
448     * @param casId the CAS identifier (from a gets operation)
449     * @param value the new value
450     * @param tc the transcoder to serialize and unserialize the value
451     * @return a CASResponse
452     * @throws OperationTimeoutException if global operation timeout is
453     *         exceeded
454     * @throws IllegalStateException in the rare circumstance where queue
455     *         is too full to accept any more requests
456     */
457    public <T> CASResponse cas(String key, long casId, T value,
458            Transcoder<T> tc) {
459        return cas(key, casId, 0, value, tc);
460    }
461 
462        /**
463         * Perform a synchronous CAS operation.
464         *
465         * @param key the key
466         * @param casId the CAS identifier (from a gets operation)
467         * @param exp the expiration of this object
468         * @param value the new value
469         * @param tc the transcoder to serialize and unserialize the value
470         * @return a CASResponse
471         * @throws OperationTimeoutException if global operation timeout is
472         *         exceeded
473         * @throws IllegalStateException in the rare circumstance where queue
474         *         is too full to accept any more requests
475         */
476        public <T> CASResponse cas(String key, long casId, int exp, T value,
477                        Transcoder<T> tc) {
478                try {
479                        return asyncCAS(key, casId, exp, value, tc).get(operationTimeout,
480                                        TimeUnit.MILLISECONDS);
481                } catch (InterruptedException e) {
482                        throw new RuntimeException("Interrupted waiting for value", e);
483                } catch (ExecutionException e) {
484                        throw new RuntimeException("Exception waiting for value", e);
485                } catch (TimeoutException e) {
486                        throw new OperationTimeoutException("Timeout waiting for value", e);
487                }
488        }
489 
490        /**
491         * Perform a synchronous CAS operation with the default transcoder.
492         *
493         * @param key the key
494         * @param casId the CAS identifier (from a gets operation)
495         * @param value the new value
496         * @return a CASResponse
497         * @throws OperationTimeoutException if the global operation timeout is
498         *                   exceeded
499         * @throws IllegalStateException in the rare circumstance where queue
500         *         is too full to accept any more requests
501         */
502        public CASResponse cas(String key, long casId, Object value) {
503                return cas(key, casId, value, transcoder);
504        }
505 
506        /**
507         * Add an object to the cache iff it does not exist already.
508         *
509         * <p>
510         * The <code>exp</code> value is passed along to memcached exactly as
511         * given, and will be processed per the memcached protocol specification:
512         * </p>
513         *
514         * <blockquote>
515         * <p>
516         * The actual value sent may either be
517         * Unix time (number of seconds since January 1, 1970, as a 32-bit
518         * value), or a number of seconds starting from current time. In the
519         * latter case, this number of seconds may not exceed 60*60*24*30 (number
520         * of seconds in 30 days); if the number sent by a client is larger than
521         * that, the server will consider it to be real Unix time value rather
522         * than an offset from current time.
523         * </p>
524         * </blockquote>
525         *
526         * @param key the key under which this object should be added.
527         * @param exp the expiration of this object
528         * @param o the object to store
529         * @param tc the transcoder to serialize and unserialize the value
530         * @return a future representing the processing of this operation
531         * @throws IllegalStateException in the rare circumstance where queue
532         *         is too full to accept any more requests
533         */
534        public <T> Future<Boolean> add(String key, int exp, T o, Transcoder<T> tc) {
535                return asyncStore(StoreType.add, key, exp, o, tc);
536        }
537 
538        /**
539         * Add an object to the cache (using the default transcoder)
540         * iff it does not exist already.
541         *
542         * <p>
543         * The <code>exp</code> value is passed along to memcached exactly as
544         * given, and will be processed per the memcached protocol specification:
545         * </p>
546         *
547         * <blockquote>
548         * <p>
549         * The actual value sent may either be
550         * Unix time (number of seconds since January 1, 1970, as a 32-bit
551         * value), or a number of seconds starting from current time. In the
552         * latter case, this number of seconds may not exceed 60*60*24*30 (number
553         * of seconds in 30 days); if the number sent by a client is larger than
554         * that, the server will consider it to be real Unix time value rather
555         * than an offset from current time.
556         * </p>
557         * </blockquote>
558         *
559         * @param key the key under which this object should be added.
560         * @param exp the expiration of this object
561         * @param o the object to store
562         * @return a future representing the processing of this operation
563         * @throws IllegalStateException in the rare circumstance where queue
564         *         is too full to accept any more requests
565         */
566        public Future<Boolean> add(String key, int exp, Object o) {
567                return asyncStore(StoreType.add, key, exp, o, transcoder);
568        }
569 
570        /**
571         * Set an object in the cache regardless of any existing value.
572         *
573         * <p>
574         * The <code>exp</code> value is passed along to memcached exactly as
575         * given, and will be processed per the memcached protocol specification:
576         * </p>
577         *
578         * <blockquote>
579         * <p>
580         * The actual value sent may either be
581         * Unix time (number of seconds since January 1, 1970, as a 32-bit
582         * value), or a number of seconds starting from current time. In the
583         * latter case, this number of seconds may not exceed 60*60*24*30 (number
584         * of seconds in 30 days); if the number sent by a client is larger than
585         * that, the server will consider it to be real Unix time value rather
586         * than an offset from current time.
587         * </p>
588         * </blockquote>
589         *
590         * @param key the key under which this object should be added.
591         * @param exp the expiration of this object
592         * @param o the object to store
593         * @param tc the transcoder to serialize and unserialize the value
594         * @return a future representing the processing of this operation
595         * @throws IllegalStateException in the rare circumstance where queue
596         *         is too full to accept any more requests
597         */
598        public <T> Future<Boolean> set(String key, int exp, T o, Transcoder<T> tc) {
599                return asyncStore(StoreType.set, key, exp, o, tc);
600        }
601 
602        /**
603         * Set an object in the cache (using the default transcoder)
604         * regardless of any existing value.
605         *
606         * <p>
607         * The <code>exp</code> value is passed along to memcached exactly as
608         * given, and will be processed per the memcached protocol specification:
609         * </p>
610         *
611         * <blockquote>
612         * <p>
613         * The actual value sent may either be
614         * Unix time (number of seconds since January 1, 1970, as a 32-bit
615         * value), or a number of seconds starting from current time. In the
616         * latter case, this number of seconds may not exceed 60*60*24*30 (number
617         * of seconds in 30 days); if the number sent by a client is larger than
618         * that, the server will consider it to be real Unix time value rather
619         * than an offset from current time.
620         * </p>
621         * </blockquote>
622         *
623         * @param key the key under which this object should be added.
624         * @param exp the expiration of this object
625         * @param o the object to store
626         * @return a future representing the processing of this operation
627         * @throws IllegalStateException in the rare circumstance where queue
628         *         is too full to accept any more requests
629         */
630        public Future<Boolean> set(String key, int exp, Object o) {
631                return asyncStore(StoreType.set, key, exp, o, transcoder);
632        }
633 
634        /**
635         * Replace an object with the given value iff there is already a value
636         * for the given key.
637         *
638         * <p>
639         * The <code>exp</code> value is passed along to memcached exactly as
640         * given, and will be processed per the memcached protocol specification:
641         * </p>
642         *
643         * <blockquote>
644         * <p>
645         * The actual value sent may either be
646         * Unix time (number of seconds since January 1, 1970, as a 32-bit
647         * value), or a number of seconds starting from current time. In the
648         * latter case, this number of seconds may not exceed 60*60*24*30 (number
649         * of seconds in 30 days); if the number sent by a client is larger than
650         * that, the server will consider it to be real Unix time value rather
651         * than an offset from current time.
652         * </p>
653         * </blockquote>
654         *
655         * @param key the key under which this object should be added.
656         * @param exp the expiration of this object
657         * @param o the object to store
658         * @param tc the transcoder to serialize and unserialize the value
659         * @return a future representing the processing of this operation
660         * @throws IllegalStateException in the rare circumstance where queue
661         *         is too full to accept any more requests
662         */
663        public <T> Future<Boolean> replace(String key, int exp, T o,
664                Transcoder<T> tc) {
665                return asyncStore(StoreType.replace, key, exp, o, tc);
666        }
667 
668        /**
669         * Replace an object with the given value (transcoded with the default
670         * transcoder) iff there is already a value for the given key.
671         *
672         * <p>
673         * The <code>exp</code> value is passed along to memcached exactly as
674         * given, and will be processed per the memcached protocol specification:
675         * </p>
676         *
677         * <blockquote>
678         * <p>
679         * The actual value sent may either be
680         * Unix time (number of seconds since January 1, 1970, as a 32-bit
681         * value), or a number of seconds starting from current time. In the
682         * latter case, this number of seconds may not exceed 60*60*24*30 (number
683         * of seconds in 30 days); if the number sent by a client is larger than
684         * that, the server will consider it to be real Unix time value rather
685         * than an offset from current time.
686         * </p>
687         * </blockquote>
688         *
689         * @param key the key under which this object should be added.
690         * @param exp the expiration of this object
691         * @param o the object to store
692         * @return a future representing the processing of this operation
693         * @throws IllegalStateException in the rare circumstance where queue
694         *         is too full to accept any more requests
695         */
696        public Future<Boolean> replace(String key, int exp, Object o) {
697                return asyncStore(StoreType.replace, key, exp, o, transcoder);
698        }
699 
700        /**
701         * Get the given key asynchronously.
702         *
703         * @param key the key to fetch
704         * @param tc the transcoder to serialize and unserialize value
705         * @return a future that will hold the return value of the fetch
706         * @throws IllegalStateException in the rare circumstance where queue
707         *         is too full to accept any more requests
708         */
709        public <T> Future<T> asyncGet(final String key, final Transcoder<T> tc) {
710 
711                final CountDownLatch latch=new CountDownLatch(1);
712                final GetFuture<T> rv=new GetFuture<T>(latch, operationTimeout);
713 
714                Operation op=opFact.get(key,
715                                new GetOperation.Callback() {
716                        private Future<T> val=null;
717                        public void receivedStatus(OperationStatus status) {
718                                rv.set(val);
719                        }
720                        public void gotData(String k, int flags, byte[] data) {
721                                assert key.equals(k) : "Wrong key returned";
722                                val=tcService.decode(tc,
723                                        new CachedData(flags, data, tc.getMaxSize()));
724                        }
725                        public void complete() {
726                                latch.countDown();
727                        }});
728                rv.setOperation(op);
729                addOp(key, op);
730                return rv;
731        }
732 
733        /**
734         * Get the given key asynchronously and decode with the default
735         * transcoder.
736         *
737         * @param key the key to fetch
738         * @return a future that will hold the return value of the fetch
739         * @throws IllegalStateException in the rare circumstance where queue
740         *         is too full to accept any more requests
741         */
742        public Future<Object> asyncGet(final String key) {
743                return asyncGet(key, transcoder);
744        }
745 
746        /**
747         * Gets (with CAS support) the given key asynchronously.
748         *
749         * @param key the key to fetch
750         * @param tc the transcoder to serialize and unserialize value
751         * @return a future that will hold the return value of the fetch
752         * @throws IllegalStateException in the rare circumstance where queue
753         *         is too full to accept any more requests
754         */
755        public <T> Future<CASValue<T>> asyncGets(final String key,
756                        final Transcoder<T> tc) {
757 
758                final CountDownLatch latch=new CountDownLatch(1);
759                final OperationFuture<CASValue<T>> rv=
760                        new OperationFuture<CASValue<T>>(latch, operationTimeout);
761 
762                Operation op=opFact.gets(key,
763                                new GetsOperation.Callback() {
764                        private CASValue<T> val=null;
765                        public void receivedStatus(OperationStatus status) {
766                                rv.set(val);
767                        }
768                        public void gotData(String k, int flags, long cas, byte[] data) {
769                                assert key.equals(k) : "Wrong key returned";
770                                assert cas > 0 : "CAS was less than zero:  " + cas;
771                                val=new CASValue<T>(cas, tc.decode(
772                                        new CachedData(flags, data, tc.getMaxSize())));
773                        }
774                        public void complete() {
775                                latch.countDown();
776                        }});
777                rv.setOperation(op);
778                addOp(key, op);
779                return rv;
780        }
781 
782        /**
783         * Gets (with CAS support) the given key asynchronously and decode using
784         * the default transcoder.
785         *
786         * @param key the key to fetch
787         * @return a future that will hold the return value of the fetch
788         * @throws IllegalStateException in the rare circumstance where queue
789         *         is too full to accept any more requests
790         */
791        public Future<CASValue<Object>> asyncGets(final String key) {
792                return asyncGets(key, transcoder);
793        }
794 
795        /**
796         * Gets (with CAS support) with a single key.
797         *
798         * @param key the key to get
799         * @param tc the transcoder to serialize and unserialize value
800         * @return the result from the cache and CAS id (null if there is none)
801         * @throws OperationTimeoutException if global operation timeout is
802         *                    exceeded
803         * @throws IllegalStateException in the rare circumstance where queue
804         *         is too full to accept any more requests
805         */
806        public <T> CASValue<T> gets(String key, Transcoder<T> tc) {
807                try {
808                        return asyncGets(key, tc).get(
809                                operationTimeout, TimeUnit.MILLISECONDS);
810                } catch (InterruptedException e) {
811                        throw new RuntimeException("Interrupted waiting for value", e);
812                } catch (ExecutionException e) {
813                        throw new RuntimeException("Exception waiting for value", e);
814                } catch (TimeoutException e) {
815                        throw new OperationTimeoutException("Timeout waiting for value", e);
816                }
817        }
818 
819        /**
820         * Gets (with CAS support) with a single key using the default transcoder.
821         *
822         * @param key the key to get
823         * @return the result from the cache and CAS id (null if there is none)
824         * @throws OperationTimeoutException if the global operation timeout is
825         *                   exceeded
826         * @throws IllegalStateException in the rare circumstance where queue
827         *         is too full to accept any more requests
828         */
829        public CASValue<Object> gets(String key) {
830                return gets(key, transcoder);
831        }
832 
833        /**
834         * Get with a single key.
835         *
836         * @param key the key to get
837         * @param tc the transcoder to serialize and unserialize value
838         * @return the result from the cache (null if there is none)
839         * @throws OperationTimeoutException if the global operation timeout is
840         *                   exceeded
841         * @throws IllegalStateException in the rare circumstance where queue
842         *         is too full to accept any more requests
843         */
844        public <T> T get(String key, Transcoder<T> tc) {
845                try {
846                        return asyncGet(key, tc).get(
847                                operationTimeout, TimeUnit.MILLISECONDS);
848                } catch (InterruptedException e) {
849                        throw new RuntimeException("Interrupted waiting for value", e);
850                } catch (ExecutionException e) {
851                        throw new RuntimeException("Exception waiting for value", e);
852                } catch (TimeoutException e) {
853                        throw new OperationTimeoutException("Timeout waiting for value", e);
854                }
855        }
856 
857        /**
858         * Get with a single key and decode using the default transcoder.
859         *
860         * @param key the key to get
861         * @return the result from the cache (null if there is none)
862         * @throws OperationTimeoutException if the global operation timeout is
863         *                   exceeded
864         * @throws IllegalStateException in the rare circumstance where queue
865         *         is too full to accept any more requests
866         */
867        public Object get(String key) {
868                return get(key, transcoder);
869        }
870 
871        /**
872         * Asynchronously get a bunch of objects from the cache.
873         *
874         * @param keys the keys to request
875         * @param tc the transcoder to serialize and unserialize value
876         * @return a Future result of that fetch
877         * @throws IllegalStateException in the rare circumstance where queue
878         *         is too full to accept any more requests
879         */
880        public <T> Future<Map<String, T>> asyncGetBulk(Collection<String> keys,
881                final Transcoder<T> tc) {
882                final Map<String, Future<T>> m=new ConcurrentHashMap<String, Future<T>>();
883                // Break the gets down into groups by key
884                final Map<MemcachedNode, Collection<String>> chunks
885                        =new HashMap<MemcachedNode, Collection<String>>();
886                final NodeLocator locator=conn.getLocator();
887                for(String key : keys) {
888                        validateKey(key);
889                        final MemcachedNode primaryNode=locator.getPrimary(key);
890                        MemcachedNode node=null;
891                        if(primaryNode.isActive()) {
892                                node=primaryNode;
893                        } else {
894                                for(Iterator<MemcachedNode> i=locator.getSequence(key);
895                                        node == null && i.hasNext();) {
896                                        MemcachedNode n=i.next();
897                                        if(n.isActive()) {
898                                                node=n;
899                                        }
900                                }
901                                if(node == null) {
902                                        node=primaryNode;
903                                }
904                        }
905                        assert node != null : "Didn't find a node for " + key;
906                        Collection<String> ks=chunks.get(node);
907                        if(ks == null) {
908                                ks=new ArrayList<String>();
909                                chunks.put(node, ks);
910                        }
911                        ks.add(key);
912                }
913 
914                final CountDownLatch latch=new CountDownLatch(chunks.size());
915                final Collection<Operation> ops=new ArrayList<Operation>();
916 
917                GetOperation.Callback cb=new GetOperation.Callback() {
918                                @SuppressWarnings("synthetic-access")
919                                public void receivedStatus(OperationStatus status) {
920                                        if(!status.isSuccess()) {
921                                                getLogger().warn("Unsuccessful get:  %s", status);
922                                        }
923                                }
924                                public void gotData(String k, int flags, byte[] data) {
925                                        m.put(k, tcService.decode(tc,
926                                                        new CachedData(flags, data, tc.getMaxSize())));
927                                }
928                                public void complete() {
929                                        latch.countDown();
930                                }
931                };
932 
933                // Now that we know how many servers it breaks down into, and the latch
934                // is all set up, convert all of these strings collections to operations
935                final Map<MemcachedNode, Operation> mops=
936                        new HashMap<MemcachedNode, Operation>();
937 
938                for(Map.Entry<MemcachedNode, Collection<String>> me
939                                : chunks.entrySet()) {
940                        Operation op=opFact.get(me.getValue(), cb);
941                        mops.put(me.getKey(), op);
942                        ops.add(op);
943                }
944                assert mops.size() == chunks.size();
945                checkState();
946                conn.addOperations(mops);
947                return new BulkGetFuture<T>(m, ops, latch);
948        }
949 
950        /**
951         * Asynchronously get a bunch of objects from the cache and decode them
952         * with the given transcoder.
953         *
954         * @param keys the keys to request
955         * @return a Future result of that fetch
956         * @throws IllegalStateException in the rare circumstance where queue
957         *         is too full to accept any more requests
958         */
959        public Future<Map<String, Object>> asyncGetBulk(Collection<String> keys) {
960                return asyncGetBulk(keys, transcoder);
961        }
962 
963        /**
964         * Varargs wrapper for asynchronous bulk gets.
965         *
966         * @param tc the transcoder to serialize and unserialize value
967         * @param keys one more more keys to get
968         * @return the future values of those keys
969         * @throws IllegalStateException in the rare circumstance where queue
970         *         is too full to accept any more requests
971         */
972        public <T> Future<Map<String, T>> asyncGetBulk(Transcoder<T> tc,
973                String... keys) {
974                return asyncGetBulk(Arrays.asList(keys), tc);
975        }
976 
977        /**
978         * Varargs wrapper for asynchronous bulk gets with the default transcoder.
979         *
980         * @param keys one more more keys to get
981         * @return the future values of those keys
982         * @throws IllegalStateException in the rare circumstance where queue
983         *         is too full to accept any more requests
984         */
985        public Future<Map<String, Object>> asyncGetBulk(String... keys) {
986                return asyncGetBulk(Arrays.asList(keys), transcoder);
987        }
988 
989        /**
990         * Get the values for multiple keys from the cache.
991         *
992         * @param keys the keys
993         * @param tc the transcoder to serialize and unserialize value
994         * @return a map of the values (for each value that exists)
995         * @throws OperationTimeoutException if the global operation timeout is
996         *                   exceeded
997         * @throws IllegalStateException in the rare circumstance where queue
998         *         is too full to accept any more requests
999         */
1000        public <T> Map<String, T> getBulk(Collection<String> keys,
1001                        Transcoder<T> tc) {
1002                try {
1003                        return asyncGetBulk(keys, tc).get(
1004                                operationTimeout, TimeUnit.MILLISECONDS);
1005                } catch (InterruptedException e) {
1006                        throw new RuntimeException("Interrupted getting bulk values", e);
1007                } catch (ExecutionException e) {
1008                        throw new RuntimeException("Failed getting bulk values", e);
1009                } catch (TimeoutException e) {
1010                        throw new OperationTimeoutException(
1011                                "Timeout waiting for bulkvalues", e);
1012                }
1013        }
1014 
1015        /**
1016         * Get the values for multiple keys from the cache.
1017         *
1018         * @param keys the keys
1019         * @return a map of the values (for each value that exists)
1020         * @throws OperationTimeoutException if the global operation timeout is
1021         *                   exceeded
1022         * @throws IllegalStateException in the rare circumstance where queue
1023         *         is too full to accept any more requests
1024         */
1025        public Map<String, Object> getBulk(Collection<String> keys) {
1026                return getBulk(keys, transcoder);
1027        }
1028 
1029        /**
1030         * Get the values for multiple keys from the cache.
1031         *
1032         * @param tc the transcoder to serialize and unserialize value
1033         * @param keys the keys
1034         * @return a map of the values (for each value that exists)
1035         * @throws OperationTimeoutException if the global operation timeout is
1036         *                   exceeded
1037         * @throws IllegalStateException in the rare circumstance where queue
1038         *         is too full to accept any more requests
1039         */
1040        public <T> Map<String, T> getBulk(Transcoder<T> tc, String... keys) {
1041                return getBulk(Arrays.asList(keys), tc);
1042        }
1043 
1044        /**
1045         * Get the values for multiple keys from the cache.
1046         *
1047         * @param keys the keys
1048         * @return a map of the values (for each value that exists)
1049         * @throws OperationTimeoutException if the global operation timeout is
1050         *                   exceeded
1051         * @throws IllegalStateException in the rare circumstance where queue
1052         *         is too full to accept any more requests
1053         */
1054        public Map<String, Object> getBulk(String... keys) {
1055                return getBulk(Arrays.asList(keys), transcoder);
1056        }
1057 
1058        /**
1059         * Get the versions of all of the connected memcacheds.
1060         * @throws IllegalStateException in the rare circumstance where queue
1061         *         is too full to accept any more requests
1062         */
1063        public Map<SocketAddress, String> getVersions() {
1064                final Map<SocketAddress, String>rv=
1065                        new ConcurrentHashMap<SocketAddress, String>();
1066 
1067                CountDownLatch blatch = broadcastOp(new BroadcastOpFactory(){
1068                        public Operation newOp(final MemcachedNode n,
1069                                        final CountDownLatch latch) {
1070                                final SocketAddress sa=n.getSocketAddress();
1071                                return opFact.version(
1072                                                new OperationCallback() {
1073                                                        public void receivedStatus(OperationStatus s) {
1074                                                                rv.put(sa, s.getMessage());
1075                                                        }
1076                                                        public void complete() {
1077                                                                latch.countDown();
1078                                                        }
1079                                                });
1080                        }});
1081                try {
1082                        blatch.await(operationTimeout, TimeUnit.MILLISECONDS);
1083                } catch (InterruptedException e) {
1084                        throw new RuntimeException("Interrupted waiting for versions", e);
1085                }
1086                return rv;
1087        }
1088 
1089        /**
1090         * Get all of the stats from all of the connections.
1091         * @throws IllegalStateException in the rare circumstance where queue
1092         *         is too full to accept any more requests
1093         */
1094        public Map<SocketAddress, Map<String, String>> getStats() {
1095                return getStats(null);
1096        }
1097 
1098        /**
1099         * Get a set of stats from all connections.
1100         *
1101         * @param arg which stats to get
1102         * @return a Map of the server SocketAddress to a map of String stat
1103         *                   keys to String stat values.
1104         * @throws IllegalStateException in the rare circumstance where queue
1105         *         is too full to accept any more requests
1106         */
1107        public Map<SocketAddress, Map<String, String>> getStats(final String arg) {
1108                final Map<SocketAddress, Map<String, String>> rv
1109                        =new HashMap<SocketAddress, Map<String, String>>();
1110 
1111                CountDownLatch blatch = broadcastOp(new BroadcastOpFactory(){
1112                        public Operation newOp(final MemcachedNode n,
1113                                final CountDownLatch latch) {
1114                                final SocketAddress sa=n.getSocketAddress();
1115                                rv.put(sa, new HashMap<String, String>());
1116                                return opFact.stats(arg,
1117                                                new StatsOperation.Callback() {
1118                                        public void gotStat(String name, String val) {
1119                                                rv.get(sa).put(name, val);
1120                                        }
1121                                        @SuppressWarnings("synthetic-access") // getLogger()
1122                                        public void receivedStatus(OperationStatus status) {
1123                                                if(!status.isSuccess()) {
1124                                                        getLogger().warn("Unsuccessful stat fetch:        %s",
1125                                                                        status);
1126                                                }
1127                                        }
1128                                        public void complete() {
1129                                                latch.countDown();
1130                                        }});
1131                        }});
1132                try {
1133                        blatch.await(operationTimeout, TimeUnit.MILLISECONDS);
1134                } catch (InterruptedException e) {
1135                        throw new RuntimeException("Interrupted waiting for stats", e);
1136                }
1137                return rv;
1138        }
1139 
1140        private long mutate(Mutator m, String key, int by, long def, int exp) {
1141                final AtomicLong rv=new AtomicLong();
1142                final CountDownLatch latch=new CountDownLatch(1);
1143                addOp(key, opFact.mutate(m, key, by, def, exp, new OperationCallback() {
1144                                        public void receivedStatus(OperationStatus s) {
1145                                                // XXX:  Potential abstraction leak.
1146                                                // The handling of incr/decr in the binary protocol
1147                                                // Allows us to avoid string processing.
1148                                                rv.set(new Long(s.isSuccess()?s.getMessage():"-1"));
1149                                        }
1150                                        public void complete() {
1151                                                latch.countDown();
1152                                        }}));
1153                try {
1154                        if (!latch.await(operationTimeout, TimeUnit.MILLISECONDS)) {
1155                                throw new OperationTimeoutException(
1156                                        "Mutate operation timed out, unable to modify counter ["
1157                                                + key + "]");
1158                        }
1159                } catch (InterruptedException e) {
1160                        throw new RuntimeException("Interrupted", e);
1161                }
1162                getLogger().debug("Mutation returned %s", rv);
1163                return rv.get();
1164        }
1165 
1166        /**
1167         * Increment the given key by the given amount.
1168         *
1169         * @param key the key
1170         * @param by the amount to increment
1171         * @return the new value (-1 if the key doesn't exist)
1172         * @throws OperationTimeoutException if the global operation timeout is
1173         *                   exceeded
1174         * @throws IllegalStateException in the rare circumstance where queue
1175         *         is too full to accept any more requests
1176         */
1177        public long incr(String key, int by) {
1178                return mutate(Mutator.incr, key, by, 0, -1);
1179        }
1180 
1181        /**
1182         * Decrement the given key by the given value.
1183         *
1184         * @param key the key
1185         * @param by the value
1186         * @return the new value (-1 if the key doesn't exist)
1187         * @throws OperationTimeoutException if the global operation timeout is
1188         *                   exceeded
1189         * @throws IllegalStateException in the rare circumstance where queue
1190         *         is too full to accept any more requests
1191         */
1192        public long decr(String key, int by) {
1193                return mutate(Mutator.decr, key, by, 0, -1);
1194        }
1195 
1196        /**
1197         * Increment the given counter, returning the new value.
1198         *
1199         * @param key the key
1200         * @param by the amount to increment
1201         * @param def the default value (if the counter does not exist)
1202         * @param exp the expiration of this object
1203         * @return the new value, or -1 if we were unable to increment or add
1204         * @throws OperationTimeoutException if the global operation timeout is
1205         *                   exceeded
1206         * @throws IllegalStateException in the rare circumstance where queue
1207         *         is too full to accept any more requests
1208         */
1209        public long incr(String key, int by, long def, int exp) {
1210                return mutateWithDefault(Mutator.incr, key, by, def, exp);
1211        }
1212 
1213        /**
1214         * Decrement the given counter, returning the new value.
1215         *
1216         * @param key the key
1217         * @param by the amount to decrement
1218         * @param def the default value (if the counter does not exist)
1219         * @param exp the expiration of this object
1220         * @return the new value, or -1 if we were unable to decrement or add
1221         * @throws OperationTimeoutException if the global operation timeout is
1222         *                   exceeded
1223         * @throws IllegalStateException in the rare circumstance where queue
1224         *         is too full to accept any more requests
1225         */
1226        public long decr(String key, int by, long def, int exp) {
1227                return mutateWithDefault(Mutator.decr, key, by, def, exp);
1228        }
1229 
1230 
1231        private long mutateWithDefault(Mutator t, String key,
1232                        int by, long def, int exp) {
1233                long rv=mutate(t, key, by, def, exp);
1234                // The ascii protocol doesn't support defaults, so I added them
1235                // manually here.
1236                if(rv == -1) {
1237                        Future<Boolean> f=asyncStore(StoreType.add,
1238                                        key, exp, String.valueOf(def));
1239                        try {
1240                                if(f.get(operationTimeout, TimeUnit.MILLISECONDS)) {
1241                                        rv=def;
1242                                } else {
1243                                        rv=mutate(t, key, by, 0, exp);
1244                                        assert rv != -1 : "Failed to mutate or init value";
1245                                }
1246                        } catch (InterruptedException e) {
1247                                throw new RuntimeException("Interrupted waiting for store", e);
1248                        } catch (ExecutionException e) {
1249                                throw new RuntimeException("Failed waiting for store", e);
1250                        } catch (TimeoutException e) {
1251                                throw new OperationTimeoutException(
1252                                        "Timeout waiting to mutate or init value", e);
1253                        }
1254                }
1255                return rv;
1256        }
1257 
1258        private Future<Long> asyncMutate(Mutator m, String key, int by, long def,
1259                        int exp) {
1260                final CountDownLatch latch = new CountDownLatch(1);
1261                final OperationFuture<Long> rv = new OperationFuture<Long>(
1262                                latch, operationTimeout);
1263                Operation op = addOp(key, opFact.mutate(m, key, by, def, exp,
1264                                new OperationCallback() {
1265                        public void receivedStatus(OperationStatus s) {
1266                                rv.set(new Long(s.isSuccess() ? s.getMessage() : "-1"));
1267                        }
1268                        public void complete() {
1269                                latch.countDown();
1270                        }
1271                }));
1272                rv.setOperation(op);
1273                return rv;
1274        }
1275 
1276        /**
1277         * Asychronous increment.
1278         *
1279         * @return a future with the incremented value, or -1 if the
1280         *                   increment failed.
1281         * @throws IllegalStateException in the rare circumstance where queue
1282         *         is too full to accept any more requests
1283         */
1284        public Future<Long> asyncIncr(String key, int by) {
1285                return asyncMutate(Mutator.incr, key, by, 0, -1);
1286        }
1287 
1288        /**
1289         * Asynchronous decrement.
1290         *
1291         * @return a future with the decremented value, or -1 if the
1292         *                   increment failed.
1293         * @throws IllegalStateException in the rare circumstance where queue
1294         *         is too full to accept any more requests
1295         */
1296        public Future<Long> asyncDecr(String key, int by) {
1297                return asyncMutate(Mutator.decr, key, by, 0, -1);
1298        }
1299 
1300        /**
1301         * Increment the given counter, returning the new value.
1302         *
1303         * @param key the key
1304         * @param by the amount to increment
1305         * @param def the default value (if the counter does not exist)
1306         * @return the new value, or -1 if we were unable to increment or add
1307         * @throws OperationTimeoutException if the global operation timeout is
1308         *                   exceeded
1309         * @throws IllegalStateException in the rare circumstance where queue
1310         *         is too full to accept any more requests
1311         */
1312        public long incr(String key, int by, long def) {
1313                return mutateWithDefault(Mutator.incr, key, by, def, 0);
1314        }
1315 
1316        /**
1317         * Decrement the given counter, returning the new value.
1318         *
1319         * @param key the key
1320         * @param by the amount to decrement
1321         * @param def the default value (if the counter does not exist)
1322         * @return the new value, or -1 if we were unable to decrement or add
1323         * @throws OperationTimeoutException if the global operation timeout is
1324         *                   exceeded
1325         * @throws IllegalStateException in the rare circumstance where queue
1326         *         is too full to accept any more requests
1327         */
1328        public long decr(String key, int by, long def) {
1329                return mutateWithDefault(Mutator.decr, key, by, def, 0);
1330        }
1331 
1332        /**
1333         * Delete the given key from the cache.
1334         *
1335         * <p>
1336         * The hold argument specifies the amount of time in seconds (or Unix time
1337         * until which) the client wishes the server to refuse "add" and "replace"
1338         * commands with this key. For this amount of item, the item is put into a
1339         * delete queue, which means that it won't possible to retrieve it by the
1340         * "get" command, but "add" and "replace" command with this key will also
1341         * fail (the "set" command will succeed, however). After the time passes,
1342         * the item is finally deleted from server memory.
1343         * </p>
1344         *
1345         * @param key the key to delete
1346         * @param hold how long the key should be unavailable to add commands
1347         *
1348         * @deprecated Hold values are no longer honored.
1349         */
1350        @Deprecated
1351        public Future<Boolean> delete(String key, int hold) {
1352                return delete(key);
1353        }
1354 
1355        /**
1356         * Delete the given key from the cache.
1357         *
1358         * @param key the key to delete
1359         * @throws IllegalStateException in the rare circumstance where queue
1360         *         is too full to accept any more requests
1361         */
1362        public Future<Boolean> delete(String key) {
1363                final CountDownLatch latch=new CountDownLatch(1);
1364                final OperationFuture<Boolean> rv=new OperationFuture<Boolean>(latch,
1365                        operationTimeout);
1366                DeleteOperation op=opFact.delete(key,
1367                                new OperationCallback() {
1368                                        public void receivedStatus(OperationStatus s) {
1369                                                rv.set(s.isSuccess());
1370                                        }
1371                                        public void complete() {
1372                                                latch.countDown();
1373                                        }});
1374                rv.setOperation(op);
1375                addOp(key, op);
1376                return rv;
1377        }
1378 
1379        /**
1380         * Flush all caches from all servers with a delay of application.
1381         * @throws IllegalStateException in the rare circumstance where queue
1382         *         is too full to accept any more requests
1383         */
1384        public Future<Boolean> flush(final int delay) {
1385                final AtomicReference<Boolean> flushResult=
1386                        new AtomicReference<Boolean>(null);
1387                final ConcurrentLinkedQueue<Operation> ops=
1388                        new ConcurrentLinkedQueue<Operation>();
1389                CountDownLatch blatch = broadcastOp(new BroadcastOpFactory(){
1390                        public Operation newOp(final MemcachedNode n,
1391                                        final CountDownLatch latch) {
1392                                Operation op=opFact.flush(delay, new OperationCallback(){
1393                                        public void receivedStatus(OperationStatus s) {
1394                                                flushResult.set(s.isSuccess());
1395                                        }
1396                                        public void complete() {
1397                                                latch.countDown();
1398                                        }});
1399                                ops.add(op);
1400                                return op;
1401                        }});
1402                return new OperationFuture<Boolean>(blatch, flushResult,
1403                                operationTimeout) {
1404                        @Override
1405                        public boolean cancel(boolean ign) {
1406                                boolean rv=false;
1407                                for(Operation op : ops) {
1408                                        op.cancel();
1409                                        rv |= op.getState() == OperationState.WRITING;
1410                                }
1411                                return rv;
1412                        }
1413                        @Override
1414                        public boolean isCancelled() {
1415                                boolean rv=false;
1416                                for(Operation op : ops) {
1417                                        rv |= op.isCancelled();
1418                                }
1419                                return rv;
1420                        }
1421                        @Override
1422                        public boolean isDone() {
1423                                boolean rv=true;
1424                                for(Operation op : ops) {
1425                                        rv &= op.getState() == OperationState.COMPLETE;
1426                                }
1427                                return rv || isCancelled();
1428                        }
1429                };
1430        }
1431 
1432        /**
1433         * Flush all caches from all servers immediately.
1434         * @throws IllegalStateException in the rare circumstance where queue
1435         *         is too full to accept any more requests
1436         */
1437        public Future<Boolean> flush() {
1438                return flush(-1);
1439        }
1440 
1441        private void logRunException(Exception e) {
1442                if(shuttingDown) {
1443                        // There are a couple types of errors that occur during the
1444                        // shutdown sequence that are considered OK.  Log at debug.
1445                        getLogger().debug("Exception occurred during shutdown", e);
1446                } else {
1447                        getLogger().warn("Problem handling memcached IO", e);
1448                }
1449        }
1450 
1451        /**
1452         * Infinitely loop processing IO.
1453         */
1454        @Override
1455        public void run() {
1456                while(running) {
1457                        try {
1458                                conn.handleIO();
1459                        } catch(IOException e) {
1460                                logRunException(e);
1461                        } catch(CancelledKeyException e) {
1462                                logRunException(e);
1463                        } catch(ClosedSelectorException e) {
1464                                logRunException(e);
1465                        } catch(IllegalStateException e) {
1466                                logRunException(e);
1467                        }
1468                }
1469                getLogger().info("Shut down memcached client");
1470        }
1471 
1472        /**
1473         * Shut down immediately.
1474         */
1475        public void shutdown() {
1476                shutdown(-1, TimeUnit.MILLISECONDS);
1477        }
1478 
1479        /**
1480         * Shut down this client gracefully.
1481         */
1482        public boolean shutdown(long timeout, TimeUnit unit) {
1483                // Guard against double shutdowns (bug 8).
1484                if(shuttingDown) {
1485                        getLogger().info("Suppressing duplicate attempt to shut down");
1486                        return false;
1487                }
1488                shuttingDown=true;
1489                String baseName=getName();
1490                setName(baseName + " - SHUTTING DOWN");
1491                boolean rv=false;
1492                try {
1493                        // Conditionally wait
1494                        if(timeout > 0) {
1495                                setName(baseName + " - SHUTTING DOWN (waiting)");
1496                                rv=waitForQueues(timeout, unit);
1497                        }
1498                } finally {
1499                        // But always begin the shutdown sequence
1500                        try {
1501                                setName(baseName + " - SHUTTING DOWN (telling client)");
1502                                running=false;
1503                                conn.shutdown();
1504                                setName(baseName + " - SHUTTING DOWN (informed client)");
1505                                tcService.shutdown();
1506                        } catch (IOException e) {
1507                                getLogger().warn("exception while shutting down", e);
1508                        }
1509                }
1510                return rv;
1511        }
1512 
1513        /**
1514         * Wait for the queues to die down.
1515         *
1516         * @throws IllegalStateException in the rare circumstance where queue
1517         *         is too full to accept any more requests
1518         */
1519        public boolean waitForQueues(long timeout, TimeUnit unit) {
1520                CountDownLatch blatch = broadcastOp(new BroadcastOpFactory(){
1521                        public Operation newOp(final MemcachedNode n,
1522                                        final CountDownLatch latch) {
1523                                return opFact.noop(
1524                                                new OperationCallback() {
1525                                                        public void complete() {
1526                                                                latch.countDown();
1527                                                        }
1528                                                        public void receivedStatus(OperationStatus s) {
1529                                                                // Nothing special when receiving status, only
1530                                                                // necessary to complete the interface
1531                                                        }
1532                                                });
1533                        }}, false);
1534                try {
1535                        // XXX:  Perhaps IllegalStateException should be caught here
1536                        // and the check retried.
1537                        return blatch.await(timeout, unit);
1538                } catch (InterruptedException e) {
1539                        throw new RuntimeException("Interrupted waiting for queues", e);
1540                }
1541        }
1542 
1543        /**
1544         * Add a connection observer.
1545         *
1546         * @return true if the observer was added.
1547         */
1548        public boolean addObserver(ConnectionObserver obs) {
1549                return conn.addObserver(obs);
1550        }
1551 
1552        /**
1553         * Remove a connection observer.
1554         *
1555         * @return true if the observer existed, but no longer does
1556         */
1557        public boolean removeObserver(ConnectionObserver obs) {
1558                return conn.removeObserver(obs);
1559        }
1560}

[all classes][net.spy.memcached]
EMMA 2.0.5312 (C) Vladimir Roubtsov