1 | // Copyright (c) 2006 Dustin Sallings <dustin@spy.net> |
2 | |
3 | package net.spy.memcached; |
4 | |
5 | import java.io.IOException; |
6 | import java.net.ConnectException; |
7 | import java.net.InetSocketAddress; |
8 | import java.net.SocketAddress; |
9 | import java.net.SocketException; |
10 | import java.nio.ByteBuffer; |
11 | import java.nio.channels.ClosedChannelException; |
12 | import java.nio.channels.SelectionKey; |
13 | import java.nio.channels.Selector; |
14 | import java.nio.channels.SocketChannel; |
15 | import java.util.ArrayList; |
16 | import java.util.Collection; |
17 | import java.util.HashSet; |
18 | import java.util.IdentityHashMap; |
19 | import java.util.Iterator; |
20 | import java.util.List; |
21 | import java.util.Map; |
22 | import java.util.NoSuchElementException; |
23 | import java.util.Set; |
24 | import java.util.SortedMap; |
25 | import java.util.TreeMap; |
26 | import java.util.concurrent.ConcurrentLinkedQueue; |
27 | import java.util.concurrent.CountDownLatch; |
28 | |
29 | import net.spy.memcached.compat.SpyObject; |
30 | import net.spy.memcached.ops.KeyedOperation; |
31 | import net.spy.memcached.ops.Operation; |
32 | import net.spy.memcached.ops.OperationState; |
33 | |
34 | /** |
35 | * Connection to a cluster of memcached servers. |
36 | */ |
37 | public final class MemcachedConnection extends SpyObject { |
38 | |
39 | // The number of empty selects we'll allow before assuming we may have |
40 | // missed one and should check the current selectors. This generally |
41 | // indicates a bug, but we'll check it nonetheless. |
42 | private static final int DOUBLE_CHECK_EMPTY = 256; |
43 | // The number of empty selects we'll allow before blowing up. It's too |
44 | // easy to write a bug that causes it to loop uncontrollably. This helps |
45 | // find those bugs and often works around them. |
46 | private static final int EXCESSIVE_EMPTY = 0x1000000; |
47 | |
48 | private volatile boolean shutDown=false; |
49 | // If true, optimization will collapse multiple sequential get ops |
50 | private final boolean shouldOptimize; |
51 | private Selector selector=null; |
52 | private final NodeLocator locator; |
53 | private final FailureMode failureMode; |
54 | // maximum amount of time to wait between reconnect attempts |
55 | private final long maxDelay; |
56 | private int emptySelects=0; |
57 | // AddedQueue is used to track the QueueAttachments for which operations |
58 | // have recently been queued. |
59 | private final ConcurrentLinkedQueue<MemcachedNode> addedQueue; |
60 | // reconnectQueue contains the attachments that need to be reconnected |
61 | // The key is the time at which they are eligible for reconnect |
62 | private final SortedMap<Long, MemcachedNode> reconnectQueue; |
63 | |
64 | private final Collection<ConnectionObserver> connObservers = |
65 | new ConcurrentLinkedQueue<ConnectionObserver>(); |
66 | private final OperationFactory opFact; |
67 | |
68 | /** |
69 | * Construct a memcached connection. |
70 | * |
71 | * @param bufSize the size of the buffer used for reading from the server |
72 | * @param f the factory that will provide an operation queue |
73 | * @param a the addresses of the servers to connect to |
74 | * |
75 | * @throws IOException if a connection attempt fails early |
76 | */ |
77 | public MemcachedConnection(int bufSize, ConnectionFactory f, |
78 | List<InetSocketAddress> a, Collection<ConnectionObserver> obs, |
79 | FailureMode fm, OperationFactory opfactory) |
80 | throws IOException { |
81 | connObservers.addAll(obs); |
82 | reconnectQueue=new TreeMap<Long, MemcachedNode>(); |
83 | addedQueue=new ConcurrentLinkedQueue<MemcachedNode>(); |
84 | failureMode = fm; |
85 | shouldOptimize = f.shouldOptimize(); |
86 | maxDelay = f.getMaxReconnectDelay(); |
87 | opFact = opfactory; |
88 | selector=Selector.open(); |
89 | List<MemcachedNode> connections=new ArrayList<MemcachedNode>(a.size()); |
90 | for(SocketAddress sa : a) { |
91 | SocketChannel ch=SocketChannel.open(); |
92 | ch.configureBlocking(false); |
93 | MemcachedNode qa=f.createMemcachedNode(sa, ch, bufSize); |
94 | int ops=0; |
95 | ch.socket().setTcpNoDelay(!f.useNagleAlgorithm()); |
96 | // Initially I had attempted to skirt this by queueing every |
97 | // connect, but it considerably slowed down start time. |
98 | try { |
99 | if(ch.connect(sa)) { |
100 | getLogger().info("Connected to %s immediately", qa); |
101 | connected(qa); |
102 | } else { |
103 | getLogger().info("Added %s to connect queue", qa); |
104 | ops=SelectionKey.OP_CONNECT; |
105 | } |
106 | qa.setSk(ch.register(selector, ops, qa)); |
107 | assert ch.isConnected() |
108 | || qa.getSk().interestOps() == SelectionKey.OP_CONNECT |
109 | : "Not connected, and not wanting to connect"; |
110 | } catch(SocketException e) { |
111 | queueReconnect(qa); |
112 | } |
113 | connections.add(qa); |
114 | } |
115 | locator=f.createLocator(connections); |
116 | } |
117 | |
118 | private boolean selectorsMakeSense() { |
119 | for(MemcachedNode qa : locator.getAll()) { |
120 | if(qa.getSk() != null && qa.getSk().isValid()) { |
121 | if(qa.getChannel().isConnected()) { |
122 | int sops=qa.getSk().interestOps(); |
123 | int expected=0; |
124 | if(qa.hasReadOp()) { |
125 | expected |= SelectionKey.OP_READ; |
126 | } |
127 | if(qa.hasWriteOp()) { |
128 | expected |= SelectionKey.OP_WRITE; |
129 | } |
130 | if(qa.getBytesRemainingToWrite() > 0) { |
131 | expected |= SelectionKey.OP_WRITE; |
132 | } |
133 | assert sops == expected : "Invalid ops: " |
134 | + qa + ", expected " + expected + ", got " + sops; |
135 | } else { |
136 | int sops=qa.getSk().interestOps(); |
137 | assert sops == SelectionKey.OP_CONNECT |
138 | : "Not connected, and not watching for connect: " |
139 | + sops; |
140 | } |
141 | } |
142 | } |
143 | getLogger().debug("Checked the selectors."); |
144 | return true; |
145 | } |
146 | |
147 | /** |
148 | * MemcachedClient calls this method to handle IO over the connections. |
149 | */ |
150 | public void handleIO() throws IOException { |
151 | if(shutDown) { |
152 | throw new IOException("No IO while shut down"); |
153 | } |
154 | |
155 | // Deal with all of the stuff that's been added, but may not be marked |
156 | // writable. |
157 | handleInputQueue(); |
158 | getLogger().debug("Done dealing with queue."); |
159 | |
160 | long delay=0; |
161 | if(!reconnectQueue.isEmpty()) { |
162 | long now=System.currentTimeMillis(); |
163 | long then=reconnectQueue.firstKey(); |
164 | delay=Math.max(then-now, 1); |
165 | } |
166 | getLogger().debug("Selecting with delay of %sms", delay); |
167 | assert selectorsMakeSense() : "Selectors don't make sense."; |
168 | int selected=selector.select(delay); |
169 | Set<SelectionKey> selectedKeys=selector.selectedKeys(); |
170 | |
171 | if(selectedKeys.isEmpty() && !shutDown) { |
172 | getLogger().debug("No selectors ready, interrupted: " |
173 | + Thread.interrupted()); |
174 | if(++emptySelects > DOUBLE_CHECK_EMPTY) { |
175 | for(SelectionKey sk : selector.keys()) { |
176 | getLogger().info("%s has %s, interested in %s", |
177 | sk, sk.readyOps(), sk.interestOps()); |
178 | if(sk.readyOps() != 0) { |
179 | getLogger().info("%s has a ready op, handling IO", sk); |
180 | handleIO(sk); |
181 | } else { |
182 | lostConnection((MemcachedNode)sk.attachment()); |
183 | } |
184 | } |
185 | assert emptySelects < EXCESSIVE_EMPTY |
186 | : "Too many empty selects"; |
187 | } |
188 | } else { |
189 | getLogger().debug("Selected %d, selected %d keys", |
190 | selected, selectedKeys.size()); |
191 | emptySelects=0; |
192 | for(SelectionKey sk : selectedKeys) { |
193 | handleIO(sk); |
194 | } // for each selector |
195 | selectedKeys.clear(); |
196 | } |
197 | |
198 | if(!shutDown && !reconnectQueue.isEmpty()) { |
199 | attemptReconnects(); |
200 | } |
201 | } |
202 | |
203 | // Handle any requests that have been made against the client. |
204 | private void handleInputQueue() { |
205 | if(!addedQueue.isEmpty()) { |
206 | getLogger().debug("Handling queue"); |
207 | // If there's stuff in the added queue. Try to process it. |
208 | Collection<MemcachedNode> toAdd=new HashSet<MemcachedNode>(); |
209 | // Transfer the queue into a hashset. There are very likely more |
210 | // additions than there are nodes. |
211 | Collection<MemcachedNode> todo=new HashSet<MemcachedNode>(); |
212 | try { |
213 | MemcachedNode qa=null; |
214 | while((qa=addedQueue.remove()) != null) { |
215 | todo.add(qa); |
216 | } |
217 | } catch(NoSuchElementException e) { |
218 | // Found everything |
219 | } |
220 | |
221 | // Now process the queue. |
222 | for(MemcachedNode qa : todo) { |
223 | boolean readyForIO=false; |
224 | if(qa.isActive()) { |
225 | if(qa.getCurrentWriteOp() != null) { |
226 | readyForIO=true; |
227 | getLogger().debug("Handling queued write %s", qa); |
228 | } |
229 | } else { |
230 | toAdd.add(qa); |
231 | } |
232 | qa.copyInputQueue(); |
233 | if(readyForIO) { |
234 | try { |
235 | if(qa.getWbuf().hasRemaining()) { |
236 | handleWrites(qa.getSk(), qa); |
237 | } |
238 | } catch(IOException e) { |
239 | getLogger().warn("Exception handling write", e); |
240 | lostConnection(qa); |
241 | } |
242 | } |
243 | qa.fixupOps(); |
244 | } |
245 | addedQueue.addAll(toAdd); |
246 | } |
247 | } |
248 | |
249 | /** |
250 | * Add a connection observer. |
251 | * |
252 | * @return whether the observer was successfully added |
253 | */ |
254 | public boolean addObserver(ConnectionObserver obs) { |
255 | return connObservers.add(obs); |
256 | } |
257 | |
258 | /** |
259 | * Remove a connection observer. |
260 | * |
261 | * @return true if the observer existed and now doesn't |
262 | */ |
263 | public boolean removeObserver(ConnectionObserver obs) { |
264 | return connObservers.remove(obs); |
265 | } |
266 | |
267 | private void connected(MemcachedNode qa) { |
268 | assert qa.getChannel().isConnected() : "Not connected."; |
269 | int rt = qa.getReconnectCount(); |
270 | qa.connected(); |
271 | for(ConnectionObserver observer : connObservers) { |
272 | observer.connectionEstablished(qa.getSocketAddress(), rt); |
273 | } |
274 | } |
275 | |
276 | private void lostConnection(MemcachedNode qa) { |
277 | queueReconnect(qa); |
278 | for(ConnectionObserver observer : connObservers) { |
279 | observer.connectionLost(qa.getSocketAddress()); |
280 | } |
281 | } |
282 | |
283 | // Handle IO for a specific selector. Any IOException will cause a |
284 | // reconnect |
285 | private void handleIO(SelectionKey sk) { |
286 | MemcachedNode qa=(MemcachedNode)sk.attachment(); |
287 | try { |
288 | getLogger().debug( |
289 | "Handling IO for: %s (r=%s, w=%s, c=%s, op=%s)", |
290 | sk, sk.isReadable(), sk.isWritable(), |
291 | sk.isConnectable(), sk.attachment()); |
292 | if(sk.isConnectable()) { |
293 | getLogger().info("Connection state changed for %s", sk); |
294 | final SocketChannel channel=qa.getChannel(); |
295 | if(channel.finishConnect()) { |
296 | connected(qa); |
297 | addedQueue.offer(qa); |
298 | if(qa.getWbuf().hasRemaining()) { |
299 | handleWrites(sk, qa); |
300 | } |
301 | } else { |
302 | assert !channel.isConnected() : "connected"; |
303 | } |
304 | } else { |
305 | if(sk.isReadable()) { |
306 | handleReads(sk, qa); |
307 | } |
308 | if(sk.isWritable()) { |
309 | handleWrites(sk, qa); |
310 | } |
311 | } |
312 | } catch(ClosedChannelException e) { |
313 | if(!shutDown) { |
314 | getLogger().info("Closed channel and not shutting down. " |
315 | + "Queueing reconnect on %s", qa, e); |
316 | lostConnection(qa); |
317 | } |
318 | } catch(ConnectException e) { |
319 | // Failures to establish a connection should attempt a reconnect |
320 | // without signaling the observers. |
321 | getLogger().info("Reconnecting due to failure to connect to %s", |
322 | qa, e); |
323 | queueReconnect(qa); |
324 | } catch(Exception e) { |
325 | // Various errors occur on Linux that wind up here. However, any |
326 | // particular error processing an item should simply cause us to |
327 | // reconnect to the server. |
328 | getLogger().info("Reconnecting due to exception on %s", qa, e); |
329 | lostConnection(qa); |
330 | } |
331 | qa.fixupOps(); |
332 | } |
333 | |
334 | private void handleWrites(SelectionKey sk, MemcachedNode qa) |
335 | throws IOException { |
336 | qa.fillWriteBuffer(shouldOptimize); |
337 | boolean canWriteMore=qa.getBytesRemainingToWrite() > 0; |
338 | while(canWriteMore) { |
339 | int wrote=qa.writeSome(); |
340 | qa.fillWriteBuffer(shouldOptimize); |
341 | canWriteMore = wrote > 0 && qa.getBytesRemainingToWrite() > 0; |
342 | } |
343 | } |
344 | |
345 | private void handleReads(SelectionKey sk, MemcachedNode qa) |
346 | throws IOException { |
347 | Operation currentOp = qa.getCurrentReadOp(); |
348 | ByteBuffer rbuf=qa.getRbuf(); |
349 | final SocketChannel channel = qa.getChannel(); |
350 | int read=channel.read(rbuf); |
351 | if (read < 0) { |
352 | // GRUMBLE. |
353 | throw new IOException("Disconnected"); |
354 | } |
355 | while(read > 0) { |
356 | getLogger().debug("Read %d bytes", read); |
357 | rbuf.flip(); |
358 | while(rbuf.remaining() > 0) { |
359 | if(currentOp == null) { |
360 | throw new IllegalStateException("No read operation."); |
361 | } |
362 | currentOp.readFromBuffer(rbuf); |
363 | if(currentOp.getState() == OperationState.COMPLETE) { |
364 | getLogger().debug( |
365 | "Completed read op: %s and giving the next %d bytes", |
366 | currentOp, rbuf.remaining()); |
367 | Operation op=qa.removeCurrentReadOp(); |
368 | assert op == currentOp |
369 | : "Expected to pop " + currentOp + " got " + op; |
370 | currentOp=qa.getCurrentReadOp(); |
371 | } |
372 | } |
373 | rbuf.clear(); |
374 | read=channel.read(rbuf); |
375 | } |
376 | } |
377 | |
378 | // Make a debug string out of the given buffer's values |
379 | static String dbgBuffer(ByteBuffer b, int size) { |
380 | StringBuilder sb=new StringBuilder(); |
381 | byte[] bytes=b.array(); |
382 | for(int i=0; i<size; i++) { |
383 | char ch=(char)bytes[i]; |
384 | if(Character.isWhitespace(ch) || Character.isLetterOrDigit(ch)) { |
385 | sb.append(ch); |
386 | } else { |
387 | sb.append("\\x"); |
388 | sb.append(Integer.toHexString(bytes[i] & 0xff)); |
389 | } |
390 | } |
391 | return sb.toString(); |
392 | } |
393 | |
394 | private void queueReconnect(MemcachedNode qa) { |
395 | if(!shutDown) { |
396 | getLogger().warn("Closing, and reopening %s, attempt %d.", |
397 | qa, qa.getReconnectCount()); |
398 | if(qa.getSk() != null) { |
399 | qa.getSk().cancel(); |
400 | assert !qa.getSk().isValid() : "Cancelled selection key is valid"; |
401 | } |
402 | qa.reconnecting(); |
403 | try { |
404 | if(qa.getChannel() != null && qa.getChannel().socket() != null) { |
405 | qa.getChannel().socket().close(); |
406 | } else { |
407 | getLogger().info("The channel or socket was null for %s", |
408 | qa); |
409 | } |
410 | } catch(IOException e) { |
411 | getLogger().warn("IOException trying to close a socket", e); |
412 | } |
413 | qa.setChannel(null); |
414 | |
415 | long delay = (long)Math.min(maxDelay, |
416 | Math.pow(2, qa.getReconnectCount())) * 1000; |
417 | long reconTime = System.currentTimeMillis() + delay; |
418 | |
419 | // Avoid potential condition where two connections are scheduled |
420 | // for reconnect at the exact same time. This is expected to be |
421 | // a rare situation. |
422 | while(reconnectQueue.containsKey(reconTime)) { |
423 | reconTime++; |
424 | } |
425 | |
426 | reconnectQueue.put(reconTime, qa); |
427 | |
428 | // Need to do a little queue management. |
429 | qa.setupResend(); |
430 | |
431 | if(failureMode == FailureMode.Redistribute) { |
432 | redistributeOperations(qa.destroyInputQueue()); |
433 | } else if(failureMode == FailureMode.Cancel) { |
434 | cancelOperations(qa.destroyInputQueue()); |
435 | } |
436 | } |
437 | } |
438 | |
439 | private void cancelOperations(Collection<Operation> ops) { |
440 | for(Operation op : ops) { |
441 | op.cancel(); |
442 | } |
443 | } |
444 | |
445 | private void redistributeOperations(Collection<Operation> ops) { |
446 | for(Operation op : ops) { |
447 | if(op instanceof KeyedOperation) { |
448 | KeyedOperation ko = (KeyedOperation)op; |
449 | int added = 0; |
450 | for(String k : ko.getKeys()) { |
451 | for(Operation newop : opFact.clone(ko)) { |
452 | addOperation(k, newop); |
453 | added++; |
454 | } |
455 | } |
456 | assert added > 0 |
457 | : "Didn't add any new operations when redistributing"; |
458 | } else { |
459 | // Cancel things that don't have definite targets. |
460 | op.cancel(); |
461 | } |
462 | } |
463 | } |
464 | |
465 | private void attemptReconnects() throws IOException { |
466 | final long now=System.currentTimeMillis(); |
467 | final Map<MemcachedNode, Boolean> seen= |
468 | new IdentityHashMap<MemcachedNode, Boolean>(); |
469 | final List<MemcachedNode> rereQueue=new ArrayList<MemcachedNode>(); |
470 | for(Iterator<MemcachedNode> i= |
471 | reconnectQueue.headMap(now).values().iterator(); i.hasNext();) { |
472 | final MemcachedNode qa=i.next(); |
473 | i.remove(); |
474 | try { |
475 | if(!seen.containsKey(qa)) { |
476 | seen.put(qa, Boolean.TRUE); |
477 | getLogger().info("Reconnecting %s", qa); |
478 | final SocketChannel ch=SocketChannel.open(); |
479 | ch.configureBlocking(false); |
480 | int ops=0; |
481 | if(ch.connect(qa.getSocketAddress())) { |
482 | getLogger().info("Immediately reconnected to %s", qa); |
483 | assert ch.isConnected(); |
484 | } else { |
485 | ops=SelectionKey.OP_CONNECT; |
486 | } |
487 | qa.registerChannel(ch, ch.register(selector, ops, qa)); |
488 | assert qa.getChannel() == ch : "Channel was lost."; |
489 | } else { |
490 | getLogger().debug( |
491 | "Skipping duplicate reconnect request for %s", qa); |
492 | } |
493 | } catch(SocketException e) { |
494 | getLogger().warn("Error on reconnect", e); |
495 | rereQueue.add(qa); |
496 | } |
497 | } |
498 | // Requeue any fast-failed connects. |
499 | for(MemcachedNode n : rereQueue) { |
500 | queueReconnect(n); |
501 | } |
502 | } |
503 | |
504 | /** |
505 | * Get the node locator used by this connection. |
506 | */ |
507 | NodeLocator getLocator() { |
508 | return locator; |
509 | } |
510 | |
511 | /** |
512 | * Add an operation to the given connection. |
513 | * |
514 | * @param key the key the operation is operating upon |
515 | * @param o the operation |
516 | */ |
517 | public void addOperation(final String key, final Operation o) { |
518 | MemcachedNode placeIn=null; |
519 | MemcachedNode primary = locator.getPrimary(key); |
520 | if(primary.isActive() || failureMode == FailureMode.Retry) { |
521 | placeIn=primary; |
522 | } else if(failureMode == FailureMode.Cancel) { |
523 | o.cancel(); |
524 | } else { |
525 | // Look for another node in sequence that is ready. |
526 | for(Iterator<MemcachedNode> i=locator.getSequence(key); |
527 | placeIn == null && i.hasNext(); ) { |
528 | MemcachedNode n=i.next(); |
529 | if(n.isActive()) { |
530 | placeIn=n; |
531 | } |
532 | } |
533 | // If we didn't find an active node, queue it in the primary node |
534 | // and wait for it to come back online. |
535 | if(placeIn == null) { |
536 | placeIn = primary; |
537 | } |
538 | } |
539 | |
540 | assert o.isCancelled() || placeIn != null |
541 | : "No node found for key " + key; |
542 | if(placeIn != null) { |
543 | addOperation(placeIn, o); |
544 | } else { |
545 | assert o.isCancelled() : "No not found for " |
546 | + key + " (and not immediately cancelled)"; |
547 | } |
548 | } |
549 | |
550 | public void addOperation(final MemcachedNode node, final Operation o) { |
551 | o.setHandlingNode(node); |
552 | o.initialize(); |
553 | node.addOp(o); |
554 | addedQueue.offer(node); |
555 | Selector s=selector.wakeup(); |
556 | assert s == selector : "Wakeup returned the wrong selector."; |
557 | getLogger().debug("Added %s to %s", o, node); |
558 | } |
559 | |
560 | public void addOperations(final Map<MemcachedNode, Operation> ops) { |
561 | |
562 | for(Map.Entry<MemcachedNode, Operation> me : ops.entrySet()) { |
563 | final MemcachedNode node=me.getKey(); |
564 | Operation o=me.getValue(); |
565 | o.setHandlingNode(node); |
566 | o.initialize(); |
567 | node.addOp(o); |
568 | addedQueue.offer(node); |
569 | } |
570 | Selector s=selector.wakeup(); |
571 | assert s == selector : "Wakeup returned the wrong selector."; |
572 | } |
573 | |
574 | /** |
575 | * Broadcast an operation to all nodes. |
576 | */ |
577 | public CountDownLatch broadcastOperation(final BroadcastOpFactory of) { |
578 | final CountDownLatch latch=new CountDownLatch(locator.getAll().size()); |
579 | for(MemcachedNode node : locator.getAll()) { |
580 | Operation op = of.newOp(node, latch); |
581 | op.initialize(); |
582 | node.addOp(op); |
583 | op.setHandlingNode(node); |
584 | addedQueue.offer(node); |
585 | } |
586 | Selector s=selector.wakeup(); |
587 | assert s == selector : "Wakeup returned the wrong selector."; |
588 | return latch; |
589 | } |
590 | |
591 | /** |
592 | * Shut down all of the connections. |
593 | */ |
594 | public void shutdown() throws IOException { |
595 | shutDown=true; |
596 | Selector s=selector.wakeup(); |
597 | assert s == selector : "Wakeup returned the wrong selector."; |
598 | for(MemcachedNode qa : locator.getAll()) { |
599 | if(qa.getChannel() != null) { |
600 | qa.getChannel().close(); |
601 | qa.setSk(null); |
602 | if(qa.getBytesRemainingToWrite() > 0) { |
603 | getLogger().warn( |
604 | "Shut down with %d bytes remaining to write", |
605 | qa.getBytesRemainingToWrite()); |
606 | } |
607 | getLogger().debug("Shut down channel %s", qa.getChannel()); |
608 | } |
609 | } |
610 | selector.close(); |
611 | getLogger().debug("Shut down selector %s", selector); |
612 | } |
613 | |
614 | @Override |
615 | public String toString() { |
616 | StringBuilder sb=new StringBuilder(); |
617 | sb.append("{MemcachedConnection to"); |
618 | for(MemcachedNode qa : locator.getAll()) { |
619 | sb.append(" "); |
620 | sb.append(qa.getSocketAddress()); |
621 | } |
622 | sb.append("}"); |
623 | return sb.toString(); |
624 | } |
625 | |
626 | } |