1 | package net.spy.memcached.protocol; |
2 | |
3 | import java.io.IOException; |
4 | import java.net.SocketAddress; |
5 | import java.nio.ByteBuffer; |
6 | import java.nio.channels.SelectionKey; |
7 | import java.nio.channels.SocketChannel; |
8 | import java.util.ArrayList; |
9 | import java.util.Collection; |
10 | import java.util.concurrent.BlockingQueue; |
11 | |
12 | import net.spy.memcached.MemcachedNode; |
13 | import net.spy.memcached.compat.SpyObject; |
14 | import net.spy.memcached.ops.Operation; |
15 | import net.spy.memcached.ops.OperationState; |
16 | |
/**
 * Represents a node within the memcached cluster, along with its buffering
 * and operation queues.
 */
21 | public abstract class TCPMemcachedNodeImpl extends SpyObject |
22 | implements MemcachedNode { |
23 | |
24 | private final SocketAddress socketAddress; |
25 | private final ByteBuffer rbuf; |
26 | private final ByteBuffer wbuf; |
27 | protected final BlockingQueue<Operation> writeQ; |
28 | private final BlockingQueue<Operation> readQ; |
29 | private final BlockingQueue<Operation> inputQueue; |
30 | // This has been declared volatile so it can be used as an availability |
31 | // indicator. |
32 | private volatile int reconnectAttempt=1; |
33 | private SocketChannel channel; |
34 | private int toWrite=0; |
35 | protected Operation optimizedOp=null; |
36 | private volatile SelectionKey sk=null; |
37 | |
38 | public TCPMemcachedNodeImpl(SocketAddress sa, SocketChannel c, |
39 | int bufSize, BlockingQueue<Operation> rq, |
40 | BlockingQueue<Operation> wq, BlockingQueue<Operation> iq) { |
41 | super(); |
42 | assert sa != null : "No SocketAddress"; |
43 | assert c != null : "No SocketChannel"; |
44 | assert bufSize > 0 : "Invalid buffer size: " + bufSize; |
45 | assert rq != null : "No operation read queue"; |
46 | assert wq != null : "No operation write queue"; |
47 | assert iq != null : "No input queue"; |
48 | socketAddress=sa; |
49 | setChannel(c); |
50 | rbuf=ByteBuffer.allocate(bufSize); |
51 | wbuf=ByteBuffer.allocate(bufSize); |
52 | getWbuf().clear(); |
53 | readQ=rq; |
54 | writeQ=wq; |
55 | inputQueue=iq; |
56 | } |
57 | |
58 | /* (non-Javadoc) |
59 | * @see net.spy.memcached.MemcachedNode#copyInputQueue() |
60 | */ |
61 | public final void copyInputQueue() { |
62 | Collection<Operation> tmp=new ArrayList<Operation>(); |
63 | |
64 | // don't drain more than we have space to place |
65 | inputQueue.drainTo(tmp, writeQ.remainingCapacity()); |
66 | |
67 | writeQ.addAll(tmp); |
68 | } |
69 | |
70 | /* (non-Javadoc) |
71 | * @see net.spy.memcached.MemcachedNode#destroyInputQueue() |
72 | */ |
73 | public Collection<Operation> destroyInputQueue() { |
74 | Collection<Operation> rv=new ArrayList<Operation>(); |
75 | inputQueue.drainTo(rv); |
76 | return rv; |
77 | } |
78 | |
79 | /* (non-Javadoc) |
80 | * @see net.spy.memcached.MemcachedNode#setupResend() |
81 | */ |
82 | public final void setupResend() { |
83 | // First, reset the current write op. |
84 | Operation op=getCurrentWriteOp(); |
85 | if(op != null) { |
86 | ByteBuffer buf=op.getBuffer(); |
87 | if(buf != null) { |
88 | buf.reset(); |
89 | } else { |
90 | getLogger().info("No buffer for current write op, removing"); |
91 | removeCurrentWriteOp(); |
92 | } |
93 | } |
94 | // Now cancel all the pending read operations. Might be better to |
95 | // to requeue them. |
96 | while(hasReadOp()) { |
97 | op=removeCurrentReadOp(); |
98 | if (op != getCurrentWriteOp()) { |
99 | getLogger().warn("Discarding partially completed op: %s", op); |
100 | op.cancel(); |
101 | } |
102 | } |
103 | |
104 | getWbuf().clear(); |
105 | getRbuf().clear(); |
106 | toWrite=0; |
107 | } |
108 | |
109 | // Prepare the pending operations. Return true if there are any pending |
110 | // ops |
111 | private boolean preparePending() { |
112 | // Copy the input queue into the write queue. |
113 | copyInputQueue(); |
114 | |
115 | // Now check the ops |
116 | Operation nextOp=getCurrentWriteOp(); |
117 | while(nextOp != null && nextOp.isCancelled()) { |
118 | getLogger().info("Removing cancelled operation: %s", nextOp); |
119 | removeCurrentWriteOp(); |
120 | nextOp=getCurrentWriteOp(); |
121 | } |
122 | return nextOp != null; |
123 | } |
124 | |
125 | /* (non-Javadoc) |
126 | * @see net.spy.memcached.MemcachedNode#fillWriteBuffer(boolean) |
127 | */ |
128 | public final void fillWriteBuffer(boolean shouldOptimize) { |
129 | if(toWrite == 0 && readQ.remainingCapacity() > 0) { |
130 | getWbuf().clear(); |
131 | Operation o=getCurrentWriteOp(); |
132 | while(o != null && toWrite < getWbuf().capacity()) { |
133 | assert o.getState() == OperationState.WRITING; |
134 | // This isn't the most optimal way to do this, but it hints |
135 | // at a larger design problem that may need to be taken care |
136 | // if in the bowels of the client. |
137 | // In practice, readQ should be small, however. |
138 | if(!readQ.contains(o)) { |
139 | readQ.add(o); |
140 | } |
141 | |
142 | ByteBuffer obuf=o.getBuffer(); |
143 | assert obuf != null : "Didn't get a write buffer from " + o; |
144 | int bytesToCopy=Math.min(getWbuf().remaining(), |
145 | obuf.remaining()); |
146 | byte b[]=new byte[bytesToCopy]; |
147 | obuf.get(b); |
148 | getWbuf().put(b); |
149 | getLogger().debug("After copying stuff from %s: %s", |
150 | o, getWbuf()); |
151 | if(!o.getBuffer().hasRemaining()) { |
152 | o.writeComplete(); |
153 | transitionWriteItem(); |
154 | |
155 | preparePending(); |
156 | if(shouldOptimize) { |
157 | optimize(); |
158 | } |
159 | |
160 | o=getCurrentWriteOp(); |
161 | } |
162 | toWrite += bytesToCopy; |
163 | } |
164 | getWbuf().flip(); |
165 | assert toWrite <= getWbuf().capacity() |
166 | : "toWrite exceeded capacity: " + this; |
167 | assert toWrite == getWbuf().remaining() |
168 | : "Expected " + toWrite + " remaining, got " |
169 | + getWbuf().remaining(); |
170 | } else { |
171 | getLogger().debug("Buffer is full, skipping"); |
172 | } |
173 | } |
174 | |
175 | /* (non-Javadoc) |
176 | * @see net.spy.memcached.MemcachedNode#transitionWriteItem() |
177 | */ |
178 | public final void transitionWriteItem() { |
179 | Operation op=removeCurrentWriteOp(); |
180 | assert op != null : "There is no write item to transition"; |
181 | getLogger().debug("Finished writing %s", op); |
182 | } |
183 | |
184 | /* (non-Javadoc) |
185 | * @see net.spy.memcached.MemcachedNode#optimize() |
186 | */ |
187 | protected abstract void optimize(); |
188 | |
189 | /* (non-Javadoc) |
190 | * @see net.spy.memcached.MemcachedNode#getCurrentReadOp() |
191 | */ |
192 | public final Operation getCurrentReadOp() { |
193 | return readQ.peek(); |
194 | } |
195 | |
196 | /* (non-Javadoc) |
197 | * @see net.spy.memcached.MemcachedNode#removeCurrentReadOp() |
198 | */ |
199 | public final Operation removeCurrentReadOp() { |
200 | return readQ.remove(); |
201 | } |
202 | |
203 | /* (non-Javadoc) |
204 | * @see net.spy.memcached.MemcachedNode#getCurrentWriteOp() |
205 | */ |
206 | public final Operation getCurrentWriteOp() { |
207 | return optimizedOp == null ? writeQ.peek() : optimizedOp; |
208 | } |
209 | |
210 | /* (non-Javadoc) |
211 | * @see net.spy.memcached.MemcachedNode#removeCurrentWriteOp() |
212 | */ |
213 | public final Operation removeCurrentWriteOp() { |
214 | Operation rv=optimizedOp; |
215 | if(rv == null) { |
216 | rv=writeQ.remove(); |
217 | } else { |
218 | optimizedOp=null; |
219 | } |
220 | return rv; |
221 | } |
222 | |
223 | /* (non-Javadoc) |
224 | * @see net.spy.memcached.MemcachedNode#hasReadOp() |
225 | */ |
226 | public final boolean hasReadOp() { |
227 | return !readQ.isEmpty(); |
228 | } |
229 | |
230 | /* (non-Javadoc) |
231 | * @see net.spy.memcached.MemcachedNode#hasWriteOp() |
232 | */ |
233 | public final boolean hasWriteOp() { |
234 | return !(optimizedOp == null && writeQ.isEmpty()); |
235 | } |
236 | |
237 | /* (non-Javadoc) |
238 | * @see net.spy.memcached.MemcachedNode#addOp(net.spy.memcached.ops.Operation) |
239 | */ |
240 | public final void addOp(Operation op) { |
241 | boolean added=inputQueue.add(op); |
242 | assert added; // documented to throw an IllegalStateException |
243 | } |
244 | |
245 | /* (non-Javadoc) |
246 | * @see net.spy.memcached.MemcachedNode#getSelectionOps() |
247 | */ |
248 | public final int getSelectionOps() { |
249 | int rv=0; |
250 | if(getChannel().isConnected()) { |
251 | if(hasReadOp()) { |
252 | rv |= SelectionKey.OP_READ; |
253 | } |
254 | if(toWrite > 0 || hasWriteOp()) { |
255 | rv |= SelectionKey.OP_WRITE; |
256 | } |
257 | } else { |
258 | rv = SelectionKey.OP_CONNECT; |
259 | } |
260 | return rv; |
261 | } |
262 | |
263 | /* (non-Javadoc) |
264 | * @see net.spy.memcached.MemcachedNode#getRbuf() |
265 | */ |
266 | public final ByteBuffer getRbuf() { |
267 | return rbuf; |
268 | } |
269 | |
270 | /* (non-Javadoc) |
271 | * @see net.spy.memcached.MemcachedNode#getWbuf() |
272 | */ |
273 | public final ByteBuffer getWbuf() { |
274 | return wbuf; |
275 | } |
276 | |
277 | /* (non-Javadoc) |
278 | * @see net.spy.memcached.MemcachedNode#getSocketAddress() |
279 | */ |
280 | public final SocketAddress getSocketAddress() { |
281 | return socketAddress; |
282 | } |
283 | |
284 | /* (non-Javadoc) |
285 | * @see net.spy.memcached.MemcachedNode#isActive() |
286 | */ |
287 | public final boolean isActive() { |
288 | return reconnectAttempt == 0 |
289 | && getChannel() != null && getChannel().isConnected(); |
290 | } |
291 | |
292 | /* (non-Javadoc) |
293 | * @see net.spy.memcached.MemcachedNode#reconnecting() |
294 | */ |
295 | public final void reconnecting() { |
296 | reconnectAttempt++; |
297 | } |
298 | |
299 | /* (non-Javadoc) |
300 | * @see net.spy.memcached.MemcachedNode#connected() |
301 | */ |
302 | public final void connected() { |
303 | reconnectAttempt=0; |
304 | } |
305 | |
306 | /* (non-Javadoc) |
307 | * @see net.spy.memcached.MemcachedNode#getReconnectCount() |
308 | */ |
309 | public final int getReconnectCount() { |
310 | return reconnectAttempt; |
311 | } |
312 | |
313 | /* (non-Javadoc) |
314 | * @see net.spy.memcached.MemcachedNode#toString() |
315 | */ |
316 | @Override |
317 | public final String toString() { |
318 | int sops=0; |
319 | if(getSk()!= null && getSk().isValid()) { |
320 | sops=getSk().interestOps(); |
321 | } |
322 | int rsize=readQ.size() + (optimizedOp == null ? 0 : 1); |
323 | int wsize=writeQ.size(); |
324 | int isize=inputQueue.size(); |
325 | return "{QA sa=" + getSocketAddress() + ", #Rops=" + rsize |
326 | + ", #Wops=" + wsize |
327 | + ", #iq=" + isize |
328 | + ", topRop=" + getCurrentReadOp() |
329 | + ", topWop=" + getCurrentWriteOp() |
330 | + ", toWrite=" + toWrite |
331 | + ", interested=" + sops + "}"; |
332 | } |
333 | |
334 | /* (non-Javadoc) |
335 | * @see net.spy.memcached.MemcachedNode#registerChannel(java.nio.channels.SocketChannel, java.nio.channels.SelectionKey) |
336 | */ |
337 | public final void registerChannel(SocketChannel ch, SelectionKey skey) { |
338 | setChannel(ch); |
339 | setSk(skey); |
340 | } |
341 | |
342 | /* (non-Javadoc) |
343 | * @see net.spy.memcached.MemcachedNode#setChannel(java.nio.channels.SocketChannel) |
344 | */ |
345 | public final void setChannel(SocketChannel to) { |
346 | assert channel == null || !channel.isOpen() |
347 | : "Attempting to overwrite channel"; |
348 | channel = to; |
349 | } |
350 | |
351 | /* (non-Javadoc) |
352 | * @see net.spy.memcached.MemcachedNode#getChannel() |
353 | */ |
354 | public final SocketChannel getChannel() { |
355 | return channel; |
356 | } |
357 | |
358 | /* (non-Javadoc) |
359 | * @see net.spy.memcached.MemcachedNode#setSk(java.nio.channels.SelectionKey) |
360 | */ |
361 | public final void setSk(SelectionKey to) { |
362 | sk = to; |
363 | } |
364 | |
365 | /* (non-Javadoc) |
366 | * @see net.spy.memcached.MemcachedNode#getSk() |
367 | */ |
368 | public final SelectionKey getSk() { |
369 | return sk; |
370 | } |
371 | |
372 | /* (non-Javadoc) |
373 | * @see net.spy.memcached.MemcachedNode#getBytesRemainingInBuffer() |
374 | */ |
375 | public final int getBytesRemainingToWrite() { |
376 | return toWrite; |
377 | } |
378 | |
379 | /* (non-Javadoc) |
380 | * @see net.spy.memcached.MemcachedNode#writeSome() |
381 | */ |
382 | public final int writeSome() throws IOException { |
383 | int wrote=channel.write(wbuf); |
384 | assert wrote >= 0 : "Wrote negative bytes?"; |
385 | toWrite -= wrote; |
386 | assert toWrite >= 0 |
387 | : "toWrite went negative after writing " + wrote |
388 | + " bytes for " + this; |
389 | getLogger().debug("Wrote %d bytes", wrote); |
390 | return wrote; |
391 | } |
392 | |
393 | |
394 | public final void fixupOps() { |
395 | // As the selection key can be changed at any point due to node |
396 | // failure, we'll grab the current volatile value and configure it. |
397 | SelectionKey s = sk; |
398 | if(s != null && s.isValid()) { |
399 | int iops=getSelectionOps(); |
400 | getLogger().debug("Setting interested opts to %d", iops); |
401 | s.interestOps(iops); |
402 | } else { |
403 | getLogger().debug("Selection key is not valid."); |
404 | } |
405 | } |
406 | } |