EMMA Coverage Report (generated Tue Oct 27 11:32:50 PDT 2009)
[all classes][net.spy.memcached]

COVERAGE SUMMARY FOR SOURCE FILE [MemcachedConnection.java]

nameclass, %method, %block, %line, %
MemcachedConnection.java100% (1/1)100% (24/24)77%  (1287/1682)84%  (278.7/330)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class MemcachedConnection100% (1/1)100% (24/24)77%  (1287/1682)84%  (278.7/330)
redistributeOperations (Collection): void 100% (1/1)11%  (7/63)12%  (1.5/12)
cancelOperations (Collection): void 100% (1/1)50%  (7/14)49%  (1.5/3)
lostConnection (MemcachedNode): void 100% (1/1)55%  (11/20)63%  (2.5/4)
attemptReconnects (): void 100% (1/1)62%  (86/138)73%  (21.2/29)
handleIO (): void 100% (1/1)64%  (131/204)77%  (23.8/31)
selectorsMakeSense (): boolean 100% (1/1)69%  (72/104)92%  (16.6/18)
addOperation (String, Operation): void 100% (1/1)72%  (68/94)93%  (17.6/19)
<static initializer> 100% (1/1)75%  (6/8)75%  (0.8/1)
handleReads (SelectionKey, MemcachedNode): void 100% (1/1)75%  (80/106)89%  (19.6/22)
queueReconnect (MemcachedNode): void 100% (1/1)79%  (99/125)82%  (19.6/24)
handleIO (SelectionKey): void 100% (1/1)83%  (134/161)90%  (26/29)
MemcachedConnection (int, ConnectionFactory, List, Collection, FailureMode, O... 100% (1/1)85%  (132/156)87%  (29.7/34)
connected (MemcachedNode): void 100% (1/1)85%  (29/34)92%  (5.5/6)
addOperation (MemcachedNode, Operation): void 100% (1/1)88%  (38/43)98%  (7.8/8)
addOperations (Map): void 100% (1/1)90%  (44/49)97%  (10.7/11)
handleInputQueue (): void 100% (1/1)90%  (89/99)86%  (24/28)
broadcastOperation (BroadcastOpFactory): CountDownLatch 100% (1/1)91%  (51/56)97%  (10.7/11)
shutdown (): void 100% (1/1)94%  (76/81)98%  (12.7/13)
addObserver (ConnectionObserver): boolean 100% (1/1)100% (5/5)100% (1/1)
dbgBuffer (ByteBuffer, int): String 100% (1/1)100% (46/46)100% (9/9)
getLocator (): NodeLocator 100% (1/1)100% (3/3)100% (1/1)
handleWrites (SelectionKey, MemcachedNode): void 100% (1/1)100% (31/31)100% (8/8)
removeObserver (ConnectionObserver): boolean 100% (1/1)100% (5/5)100% (1/1)
toString (): String 100% (1/1)100% (37/37)100% (7/7)

1// Copyright (c) 2006  Dustin Sallings <dustin@spy.net>
2 
3package net.spy.memcached;
4 
5import java.io.IOException;
6import java.net.ConnectException;
7import java.net.InetSocketAddress;
8import java.net.SocketAddress;
9import java.net.SocketException;
10import java.nio.ByteBuffer;
11import java.nio.channels.ClosedChannelException;
12import java.nio.channels.SelectionKey;
13import java.nio.channels.Selector;
14import java.nio.channels.SocketChannel;
15import java.util.ArrayList;
16import java.util.Collection;
17import java.util.HashSet;
18import java.util.IdentityHashMap;
19import java.util.Iterator;
20import java.util.List;
21import java.util.Map;
22import java.util.NoSuchElementException;
23import java.util.Set;
24import java.util.SortedMap;
25import java.util.TreeMap;
26import java.util.concurrent.ConcurrentLinkedQueue;
27import java.util.concurrent.CountDownLatch;
28 
29import net.spy.memcached.compat.SpyObject;
30import net.spy.memcached.ops.KeyedOperation;
31import net.spy.memcached.ops.Operation;
32import net.spy.memcached.ops.OperationState;
33 
34/**
35 * Connection to a cluster of memcached servers.
36 */
37public final class MemcachedConnection extends SpyObject {
38 
39        // The number of empty selects we'll allow before assuming we may have
40        // missed one and should check the current selectors.  This generally
41        // indicates a bug, but we'll check it nonetheless.
42        private static final int DOUBLE_CHECK_EMPTY = 256;
43        // The number of empty selects we'll allow before blowing up.  It's too
44        // easy to write a bug that causes it to loop uncontrollably.  This helps
45        // find those bugs and often works around them.
46        private static final int EXCESSIVE_EMPTY = 0x1000000;
47 
48        private volatile boolean shutDown=false;
49        // If true, optimization will collapse multiple sequential get ops
50        private final boolean shouldOptimize;
51        private Selector selector=null;
52        private final NodeLocator locator;
53        private final FailureMode failureMode;
54        // maximum amount of time to wait between reconnect attempts
55        private final long maxDelay;
56        private int emptySelects=0;
57        // AddedQueue is used to track the QueueAttachments for which operations
58        // have recently been queued.
59        private final ConcurrentLinkedQueue<MemcachedNode> addedQueue;
60        // reconnectQueue contains the attachments that need to be reconnected
61        // The key is the time at which they are eligible for reconnect
62        private final SortedMap<Long, MemcachedNode> reconnectQueue;
63 
64        private final Collection<ConnectionObserver> connObservers =
65                new ConcurrentLinkedQueue<ConnectionObserver>();
66        private final OperationFactory opFact;
67 
68        /**
69         * Construct a memcached connection.
70         *
71         * @param bufSize the size of the buffer used for reading from the server
72         * @param f the factory that will provide an operation queue
73         * @param a the addresses of the servers to connect to
74         *
75         * @throws IOException if a connection attempt fails early
76         */
77        public MemcachedConnection(int bufSize, ConnectionFactory f,
78                        List<InetSocketAddress> a, Collection<ConnectionObserver> obs,
79                        FailureMode fm, OperationFactory opfactory)
80                throws IOException {
81                connObservers.addAll(obs);
82                reconnectQueue=new TreeMap<Long, MemcachedNode>();
83                addedQueue=new ConcurrentLinkedQueue<MemcachedNode>();
84                failureMode = fm;
85                shouldOptimize = f.shouldOptimize();
86                maxDelay = f.getMaxReconnectDelay();
87                opFact = opfactory;
88                selector=Selector.open();
89                List<MemcachedNode> connections=new ArrayList<MemcachedNode>(a.size());
90                for(SocketAddress sa : a) {
91                        SocketChannel ch=SocketChannel.open();
92                        ch.configureBlocking(false);
93                        MemcachedNode qa=f.createMemcachedNode(sa, ch, bufSize);
94                        int ops=0;
95                        ch.socket().setTcpNoDelay(!f.useNagleAlgorithm());
96                        // Initially I had attempted to skirt this by queueing every
97                        // connect, but it considerably slowed down start time.
98                        try {
99                                if(ch.connect(sa)) {
100                                        getLogger().info("Connected to %s immediately", qa);
101                                        connected(qa);
102                                } else {
103                                        getLogger().info("Added %s to connect queue", qa);
104                                        ops=SelectionKey.OP_CONNECT;
105                                }
106                                qa.setSk(ch.register(selector, ops, qa));
107                                assert ch.isConnected()
108                                        || qa.getSk().interestOps() == SelectionKey.OP_CONNECT
109                                        : "Not connected, and not wanting to connect";
110                        } catch(SocketException e) {
111                                queueReconnect(qa);
112                        }
113                        connections.add(qa);
114                }
115                locator=f.createLocator(connections);
116        }
117 
118        private boolean selectorsMakeSense() {
119                for(MemcachedNode qa : locator.getAll()) {
120                        if(qa.getSk() != null && qa.getSk().isValid()) {
121                                if(qa.getChannel().isConnected()) {
122                                        int sops=qa.getSk().interestOps();
123                                        int expected=0;
124                                        if(qa.hasReadOp()) {
125                                                expected |= SelectionKey.OP_READ;
126                                        }
127                                        if(qa.hasWriteOp()) {
128                                                expected |= SelectionKey.OP_WRITE;
129                                        }
130                                        if(qa.getBytesRemainingToWrite() > 0) {
131                                                expected |= SelectionKey.OP_WRITE;
132                                        }
133                                        assert sops == expected : "Invalid ops:  "
134                                                + qa + ", expected " + expected + ", got " + sops;
135                                } else {
136                                        int sops=qa.getSk().interestOps();
137                                        assert sops == SelectionKey.OP_CONNECT
138                                        : "Not connected, and not watching for connect: "
139                                                + sops;
140                                }
141                        }
142                }
143                getLogger().debug("Checked the selectors.");
144                return true;
145        }
146 
147        /**
148         * MemcachedClient calls this method to handle IO over the connections.
149         */
150        public void handleIO() throws IOException {
151                if(shutDown) {
152                        throw new IOException("No IO while shut down");
153                }
154 
155                // Deal with all of the stuff that's been added, but may not be marked
156                // writable.
157                handleInputQueue();
158                getLogger().debug("Done dealing with queue.");
159 
160                long delay=0;
161                if(!reconnectQueue.isEmpty()) {
162                        long now=System.currentTimeMillis();
163                        long then=reconnectQueue.firstKey();
164                        delay=Math.max(then-now, 1);
165                }
166                getLogger().debug("Selecting with delay of %sms", delay);
167                assert selectorsMakeSense() : "Selectors don't make sense.";
168                int selected=selector.select(delay);
169                Set<SelectionKey> selectedKeys=selector.selectedKeys();
170 
171                if(selectedKeys.isEmpty() && !shutDown) {
172                        getLogger().debug("No selectors ready, interrupted: "
173                                        + Thread.interrupted());
174                        if(++emptySelects > DOUBLE_CHECK_EMPTY) {
175                                for(SelectionKey sk : selector.keys()) {
176                                        getLogger().info("%s has %s, interested in %s",
177                                                        sk, sk.readyOps(), sk.interestOps());
178                                        if(sk.readyOps() != 0) {
179                                                getLogger().info("%s has a ready op, handling IO", sk);
180                                                handleIO(sk);
181                                        } else {
182                                                lostConnection((MemcachedNode)sk.attachment());
183                                        }
184                                }
185                                assert emptySelects < EXCESSIVE_EMPTY
186                                        : "Too many empty selects";
187                        }
188                } else {
189                        getLogger().debug("Selected %d, selected %d keys",
190                                        selected, selectedKeys.size());
191                        emptySelects=0;
192                        for(SelectionKey sk : selectedKeys) {
193                                handleIO(sk);
194                        } // for each selector
195                        selectedKeys.clear();
196                }
197 
198                if(!shutDown && !reconnectQueue.isEmpty()) {
199                        attemptReconnects();
200                }
201        }
202 
203        // Handle any requests that have been made against the client.
204        private void handleInputQueue() {
205                if(!addedQueue.isEmpty()) {
206                        getLogger().debug("Handling queue");
207                        // If there's stuff in the added queue.  Try to process it.
208                        Collection<MemcachedNode> toAdd=new HashSet<MemcachedNode>();
209                        // Transfer the queue into a hashset.  There are very likely more
210                        // additions than there are nodes.
211                        Collection<MemcachedNode> todo=new HashSet<MemcachedNode>();
212                        try {
213                                MemcachedNode qa=null;
214                                while((qa=addedQueue.remove()) != null) {
215                                        todo.add(qa);
216                                }
217                        } catch(NoSuchElementException e) {
218                                // Found everything
219                        }
220 
221                        // Now process the queue.
222                        for(MemcachedNode qa : todo) {
223                                boolean readyForIO=false;
224                                if(qa.isActive()) {
225                                        if(qa.getCurrentWriteOp() != null) {
226                                                readyForIO=true;
227                                                getLogger().debug("Handling queued write %s", qa);
228                                        }
229                                } else {
230                                        toAdd.add(qa);
231                                }
232                                qa.copyInputQueue();
233                                if(readyForIO) {
234                                        try {
235                                                if(qa.getWbuf().hasRemaining()) {
236                                                        handleWrites(qa.getSk(), qa);
237                                                }
238                                        } catch(IOException e) {
239                                                getLogger().warn("Exception handling write", e);
240                                                lostConnection(qa);
241                                        }
242                                }
243                                qa.fixupOps();
244                        }
245                        addedQueue.addAll(toAdd);
246                }
247        }
248 
249        /**
250         * Add a connection observer.
251         *
252         * @return whether the observer was successfully added
253         */
254        public boolean addObserver(ConnectionObserver obs) {
255                return connObservers.add(obs);
256        }
257 
258        /**
259         * Remove a connection observer.
260         *
261         * @return true if the observer existed and now doesn't
262         */
263        public boolean removeObserver(ConnectionObserver obs) {
264                return connObservers.remove(obs);
265        }
266 
267        private void connected(MemcachedNode qa) {
268                assert qa.getChannel().isConnected() : "Not connected.";
269                int rt = qa.getReconnectCount();
270                qa.connected();
271                for(ConnectionObserver observer : connObservers) {
272                        observer.connectionEstablished(qa.getSocketAddress(), rt);
273                }
274        }
275 
276        private void lostConnection(MemcachedNode qa) {
277                queueReconnect(qa);
278                for(ConnectionObserver observer : connObservers) {
279                        observer.connectionLost(qa.getSocketAddress());
280                }
281        }
282 
283        // Handle IO for a specific selector.  Any IOException will cause a
284        // reconnect
285        private void handleIO(SelectionKey sk) {
286                MemcachedNode qa=(MemcachedNode)sk.attachment();
287                try {
288                        getLogger().debug(
289                                        "Handling IO for:  %s (r=%s, w=%s, c=%s, op=%s)",
290                                        sk, sk.isReadable(), sk.isWritable(),
291                                        sk.isConnectable(), sk.attachment());
292                        if(sk.isConnectable()) {
293                                getLogger().info("Connection state changed for %s", sk);
294                                final SocketChannel channel=qa.getChannel();
295                                if(channel.finishConnect()) {
296                                        connected(qa);
297                                        addedQueue.offer(qa);
298                                        if(qa.getWbuf().hasRemaining()) {
299                                                handleWrites(sk, qa);
300                                        }
301                                } else {
302                                        assert !channel.isConnected() : "connected";
303                                }
304                        } else {
305                                if(sk.isReadable()) {
306                                        handleReads(sk, qa);
307                                }
308                                if(sk.isWritable()) {
309                                        handleWrites(sk, qa);
310                                }
311                        }
312                } catch(ClosedChannelException e) {
313                        if(!shutDown) {
314                                getLogger().info("Closed channel and not shutting down.  "
315                                        + "Queueing reconnect on %s", qa, e);
316                                lostConnection(qa);
317                        }
318                } catch(ConnectException e) {
319                        // Failures to establish a connection should attempt a reconnect
320                        // without signaling the observers.
321                        getLogger().info("Reconnecting due to failure to connect to %s",
322                                        qa, e);
323                        queueReconnect(qa);
324                } catch(Exception e) {
325                        // Various errors occur on Linux that wind up here.  However, any
326                        // particular error processing an item should simply cause us to
327                        // reconnect to the server.
328                        getLogger().info("Reconnecting due to exception on %s", qa, e);
329                        lostConnection(qa);
330                }
331                qa.fixupOps();
332        }
333 
334        private void handleWrites(SelectionKey sk, MemcachedNode qa)
335                throws IOException {
336                qa.fillWriteBuffer(shouldOptimize);
337                boolean canWriteMore=qa.getBytesRemainingToWrite() > 0;
338                while(canWriteMore) {
339                        int wrote=qa.writeSome();
340                        qa.fillWriteBuffer(shouldOptimize);
341                        canWriteMore = wrote > 0 && qa.getBytesRemainingToWrite() > 0;
342                }
343        }
344 
345        private void handleReads(SelectionKey sk, MemcachedNode qa)
346                throws IOException {
347                Operation currentOp = qa.getCurrentReadOp();
348                ByteBuffer rbuf=qa.getRbuf();
349                final SocketChannel channel = qa.getChannel();
350                int read=channel.read(rbuf);
351                if (read < 0) {
352                    // GRUMBLE.
353                    throw new IOException("Disconnected");
354                }
355                while(read > 0) {
356                        getLogger().debug("Read %d bytes", read);
357                        rbuf.flip();
358                        while(rbuf.remaining() > 0) {
359                                if(currentOp == null) {
360                                        throw new IllegalStateException("No read operation.");
361                                }
362                                currentOp.readFromBuffer(rbuf);
363                                if(currentOp.getState() == OperationState.COMPLETE) {
364                                        getLogger().debug(
365                                                        "Completed read op: %s and giving the next %d bytes",
366                                                        currentOp, rbuf.remaining());
367                                        Operation op=qa.removeCurrentReadOp();
368                                        assert op == currentOp
369                                        : "Expected to pop " + currentOp + " got " + op;
370                                        currentOp=qa.getCurrentReadOp();
371                                }
372                        }
373                        rbuf.clear();
374                        read=channel.read(rbuf);
375                }
376        }
377 
378        // Make a debug string out of the given buffer's values
379        static String dbgBuffer(ByteBuffer b, int size) {
380                StringBuilder sb=new StringBuilder();
381                byte[] bytes=b.array();
382                for(int i=0; i<size; i++) {
383                        char ch=(char)bytes[i];
384                        if(Character.isWhitespace(ch) || Character.isLetterOrDigit(ch)) {
385                                sb.append(ch);
386                        } else {
387                                sb.append("\\x");
388                                sb.append(Integer.toHexString(bytes[i] & 0xff));
389                        }
390                }
391                return sb.toString();
392        }
393 
394        private void queueReconnect(MemcachedNode qa) {
395                if(!shutDown) {
396                        getLogger().warn("Closing, and reopening %s, attempt %d.",
397                                        qa, qa.getReconnectCount());
398                        if(qa.getSk() != null) {
399                                qa.getSk().cancel();
400                                assert !qa.getSk().isValid() : "Cancelled selection key is valid";
401                        }
402                        qa.reconnecting();
403                        try {
404                                if(qa.getChannel() != null && qa.getChannel().socket() != null) {
405                                        qa.getChannel().socket().close();
406                                } else {
407                                        getLogger().info("The channel or socket was null for %s",
408                                                qa);
409                                }
410                        } catch(IOException e) {
411                                getLogger().warn("IOException trying to close a socket", e);
412                        }
413                        qa.setChannel(null);
414 
415                        long delay = (long)Math.min(maxDelay,
416                                        Math.pow(2, qa.getReconnectCount())) * 1000;
417                        long reconTime = System.currentTimeMillis() + delay;
418 
419                        // Avoid potential condition where two connections are scheduled
420                        // for reconnect at the exact same time.  This is expected to be
421                        // a rare situation.
422                        while(reconnectQueue.containsKey(reconTime)) {
423                                reconTime++;
424                        }
425 
426                        reconnectQueue.put(reconTime, qa);
427 
428                        // Need to do a little queue management.
429                        qa.setupResend();
430 
431                        if(failureMode == FailureMode.Redistribute) {
432                                redistributeOperations(qa.destroyInputQueue());
433                        } else if(failureMode == FailureMode.Cancel) {
434                                cancelOperations(qa.destroyInputQueue());
435                        }
436                }
437        }
438 
439        private void cancelOperations(Collection<Operation> ops) {
440                for(Operation op : ops) {
441                        op.cancel();
442                }
443        }
444 
445        private void redistributeOperations(Collection<Operation> ops) {
446                for(Operation op : ops) {
447                        if(op instanceof KeyedOperation) {
448                                KeyedOperation ko = (KeyedOperation)op;
449                                int added = 0;
450                                for(String k : ko.getKeys()) {
451                                        for(Operation newop : opFact.clone(ko)) {
452                                                addOperation(k, newop);
453                                                added++;
454                                        }
455                                }
456                                assert added > 0
457                                        : "Didn't add any new operations when redistributing";
458                        } else {
459                                // Cancel things that don't have definite targets.
460                                op.cancel();
461                        }
462                }
463        }
464 
465        private void attemptReconnects() throws IOException {
466                final long now=System.currentTimeMillis();
467                final Map<MemcachedNode, Boolean> seen=
468                        new IdentityHashMap<MemcachedNode, Boolean>();
469                final List<MemcachedNode> rereQueue=new ArrayList<MemcachedNode>();
470                for(Iterator<MemcachedNode> i=
471                                reconnectQueue.headMap(now).values().iterator(); i.hasNext();) {
472                        final MemcachedNode qa=i.next();
473                        i.remove();
474                        try {
475                                if(!seen.containsKey(qa)) {
476                                        seen.put(qa, Boolean.TRUE);
477                                        getLogger().info("Reconnecting %s", qa);
478                                        final SocketChannel ch=SocketChannel.open();
479                                        ch.configureBlocking(false);
480                                        int ops=0;
481                                        if(ch.connect(qa.getSocketAddress())) {
482                                                getLogger().info("Immediately reconnected to %s", qa);
483                                                assert ch.isConnected();
484                                        } else {
485                                                ops=SelectionKey.OP_CONNECT;
486                                        }
487                                        qa.registerChannel(ch, ch.register(selector, ops, qa));
488                                        assert qa.getChannel() == ch : "Channel was lost.";
489                                } else {
490                                        getLogger().debug(
491                                                "Skipping duplicate reconnect request for %s", qa);
492                                }
493                        } catch(SocketException e) {
494                                getLogger().warn("Error on reconnect", e);
495                                rereQueue.add(qa);
496                        }
497                }
498                // Requeue any fast-failed connects.
499                for(MemcachedNode n : rereQueue) {
500                        queueReconnect(n);
501                }
502        }
503 
504        /**
505         * Get the node locator used by this connection.
506         */
507        NodeLocator getLocator() {
508                return locator;
509        }
510 
511        /**
512         * Add an operation to the given connection.
513         *
514         * @param key the key the operation is operating upon
515         * @param o the operation
516         */
517        public void addOperation(final String key, final Operation o) {
518                MemcachedNode placeIn=null;
519                MemcachedNode primary = locator.getPrimary(key);
520                if(primary.isActive() || failureMode == FailureMode.Retry) {
521                        placeIn=primary;
522                } else if(failureMode == FailureMode.Cancel) {
523                        o.cancel();
524                } else {
525                        // Look for another node in sequence that is ready.
526                        for(Iterator<MemcachedNode> i=locator.getSequence(key);
527                                placeIn == null && i.hasNext(); ) {
528                                MemcachedNode n=i.next();
529                                if(n.isActive()) {
530                                        placeIn=n;
531                                }
532                        }
533                        // If we didn't find an active node, queue it in the primary node
534                        // and wait for it to come back online.
535                        if(placeIn == null) {
536                                placeIn = primary;
537                        }
538                }
539 
540                assert o.isCancelled() || placeIn != null
541                        : "No node found for key " + key;
542                if(placeIn != null) {
543                        addOperation(placeIn, o);
544                } else {
545                        assert o.isCancelled() : "No not found for "
546                                + key + " (and not immediately cancelled)";
547                }
548        }
549 
550        public void addOperation(final MemcachedNode node, final Operation o) {
551                o.setHandlingNode(node);
552                o.initialize();
553                node.addOp(o);
554                addedQueue.offer(node);
555                Selector s=selector.wakeup();
556                assert s == selector : "Wakeup returned the wrong selector.";
557                getLogger().debug("Added %s to %s", o, node);
558        }
559 
560        public void addOperations(final Map<MemcachedNode, Operation> ops) {
561 
562                for(Map.Entry<MemcachedNode, Operation> me : ops.entrySet()) {
563                        final MemcachedNode node=me.getKey();
564                        Operation o=me.getValue();
565                        o.setHandlingNode(node);
566                        o.initialize();
567                        node.addOp(o);
568                        addedQueue.offer(node);
569                }
570                Selector s=selector.wakeup();
571                assert s == selector : "Wakeup returned the wrong selector.";
572        }
573 
574        /**
575         * Broadcast an operation to all nodes.
576         */
577        public CountDownLatch broadcastOperation(final BroadcastOpFactory of) {
578                final CountDownLatch latch=new CountDownLatch(locator.getAll().size());
579                for(MemcachedNode node : locator.getAll()) {
580                        Operation op = of.newOp(node, latch);
581                        op.initialize();
582                        node.addOp(op);
583                        op.setHandlingNode(node);
584                        addedQueue.offer(node);
585                }
586                Selector s=selector.wakeup();
587                assert s == selector : "Wakeup returned the wrong selector.";
588                return latch;
589        }
590 
591        /**
592         * Shut down all of the connections.
593         */
594        public void shutdown() throws IOException {
595                shutDown=true;
596                Selector s=selector.wakeup();
597                assert s == selector : "Wakeup returned the wrong selector.";
598                for(MemcachedNode qa : locator.getAll()) {
599                        if(qa.getChannel() != null) {
600                                qa.getChannel().close();
601                                qa.setSk(null);
602                                if(qa.getBytesRemainingToWrite() > 0) {
603                                        getLogger().warn(
604                                                "Shut down with %d bytes remaining to write",
605                                                        qa.getBytesRemainingToWrite());
606                                }
607                                getLogger().debug("Shut down channel %s", qa.getChannel());
608                        }
609                }
610                selector.close();
611                getLogger().debug("Shut down selector %s", selector);
612        }
613 
614        @Override
615        public String toString() {
616                StringBuilder sb=new StringBuilder();
617                sb.append("{MemcachedConnection to");
618                for(MemcachedNode qa : locator.getAll()) {
619                        sb.append(" ");
620                        sb.append(qa.getSocketAddress());
621                }
622                sb.append("}");
623                return sb.toString();
624        }
625 
626}

[all classes][net.spy.memcached]
EMMA 2.0.5312 (C) Vladimir Roubtsov