EMMA Coverage Report (generated Tue Oct 27 11:32:50 PDT 2009)
[all classes][net.spy.memcached.protocol]

COVERAGE SUMMARY FOR SOURCE FILE [TCPMemcachedNodeImpl.java]

name | class, % | method, % | block, % | line, %
TCPMemcachedNodeImpl.java | 100% (1/1) | 100% (32/32) | 84% (642/764) | 95% (138.2/146)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %
class TCPMemcachedNodeImpl | 100% (1/1) | 100% (32/32) | 84% (642/764) | 95% (138.2/146)
writeSome (): int | | 100% (1/1) | 62% (34/55) | 84% (5.1/6)
TCPMemcachedNodeImpl (SocketAddress, SocketChannel, int, BlockingQueue, Block... | | 100% (1/1) | 64% (66/103) | 84% (16.8/20)
addOp (Operation): void | | 100% (1/1) | 71% (10/14) | 90% (2.7/3)
setChannel (SocketChannel): void | | 100% (1/1) | 72% (13/18) | 88% (2.6/3)
fillWriteBuffer (boolean): void | | 100% (1/1) | 73% (130/177) | 92% (26.8/29)
<static initializer> | | 100% (1/1) | 75% (6/8) | 75% (0.8/1)
transitionWriteItem (): void | | 100% (1/1) | 78% (18/23) | 90% (3.6/4)
toString (): String | | 100% (1/1) | 99% (74/75) | 100% (7/7)
connected (): void | | 100% (1/1) | 100% (4/4) | 100% (2/2)
copyInputQueue (): void | | 100% (1/1) | 100% (18/18) | 100% (4/4)
destroyInputQueue (): Collection | | 100% (1/1) | 100% (11/11) | 100% (3/3)
fixupOps (): void | | 100% (1/1) | 100% (32/32) | 100% (8/8)
getBytesRemainingToWrite (): int | | 100% (1/1) | 100% (3/3) | 100% (1/1)
getChannel (): SocketChannel | | 100% (1/1) | 100% (3/3) | 100% (1/1)
getCurrentReadOp (): Operation | | 100% (1/1) | 100% (5/5) | 100% (1/1)
getCurrentWriteOp (): Operation | | 100% (1/1) | 100% (11/11) | 100% (1/1)
getRbuf (): ByteBuffer | | 100% (1/1) | 100% (3/3) | 100% (1/1)
getReconnectCount (): int | | 100% (1/1) | 100% (3/3) | 100% (1/1)
getSelectionOps (): int | | 100% (1/1) | 100% (28/28) | 100% (8/8)
getSk (): SelectionKey | | 100% (1/1) | 100% (3/3) | 100% (1/1)
getSocketAddress (): SocketAddress | | 100% (1/1) | 100% (3/3) | 100% (1/1)
getWbuf (): ByteBuffer | | 100% (1/1) | 100% (3/3) | 100% (1/1)
hasReadOp (): boolean | | 100% (1/1) | 100% (8/8) | 100% (1/1)
hasWriteOp (): boolean | | 100% (1/1) | 100% (11/11) | 100% (1/1)
isActive (): boolean | | 100% (1/1) | 100% (14/14) | 100% (1/1)
preparePending (): boolean | | 100% (1/1) | 100% (33/33) | 100% (7/7)
reconnecting (): void | | 100% (1/1) | 100% (7/7) | 100% (2/2)
registerChannel (SocketChannel, SelectionKey): void | | 100% (1/1) | 100% (7/7) | 100% (3/3)
removeCurrentReadOp (): Operation | | 100% (1/1) | 100% (5/5) | 100% (1/1)
removeCurrentWriteOp (): Operation | | 100% (1/1) | 100% (16/16) | 100% (5/5)
setSk (SelectionKey): void | | 100% (1/1) | 100% (4/4) | 100% (2/2)
setupResend (): void | | 100% (1/1) | 100% (56/56) | 100% (16/16)

1package net.spy.memcached.protocol;
2 
3import java.io.IOException;
4import java.net.SocketAddress;
5import java.nio.ByteBuffer;
6import java.nio.channels.SelectionKey;
7import java.nio.channels.SocketChannel;
8import java.util.ArrayList;
9import java.util.Collection;
10import java.util.concurrent.BlockingQueue;
11 
12import net.spy.memcached.MemcachedNode;
13import net.spy.memcached.compat.SpyObject;
14import net.spy.memcached.ops.Operation;
15import net.spy.memcached.ops.OperationState;
16 
17/**
18 * Represents a node with the memcached cluster, along with buffering and
19 * operation queues.
20 */
21public abstract class TCPMemcachedNodeImpl extends SpyObject
22        implements MemcachedNode {
23 
24        private final SocketAddress socketAddress;
25        private final ByteBuffer rbuf;
26        private final ByteBuffer wbuf;
27        protected final BlockingQueue<Operation> writeQ;
28        private final BlockingQueue<Operation> readQ;
29        private final BlockingQueue<Operation> inputQueue;
30        // This has been declared volatile so it can be used as an availability
31        // indicator.
32        private volatile int reconnectAttempt=1;
33        private SocketChannel channel;
34        private int toWrite=0;
35        protected Operation optimizedOp=null;
36        private volatile SelectionKey sk=null;
37 
38        public TCPMemcachedNodeImpl(SocketAddress sa, SocketChannel c,
39                        int bufSize, BlockingQueue<Operation> rq,
40                        BlockingQueue<Operation> wq, BlockingQueue<Operation> iq) {
41                super();
42                assert sa != null : "No SocketAddress";
43                assert c != null : "No SocketChannel";
44                assert bufSize > 0 : "Invalid buffer size: " + bufSize;
45                assert rq != null : "No operation read queue";
46                assert wq != null : "No operation write queue";
47                assert iq != null : "No input queue";
48                socketAddress=sa;
49                setChannel(c);
50                rbuf=ByteBuffer.allocate(bufSize);
51                wbuf=ByteBuffer.allocate(bufSize);
52                getWbuf().clear();
53                readQ=rq;
54                writeQ=wq;
55                inputQueue=iq;
56        }
57 
58        /* (non-Javadoc)
59         * @see net.spy.memcached.MemcachedNode#copyInputQueue()
60         */
61        public final void copyInputQueue() {
62                Collection<Operation> tmp=new ArrayList<Operation>();
63 
64                // don't drain more than we have space to place
65                inputQueue.drainTo(tmp, writeQ.remainingCapacity());
66 
67                writeQ.addAll(tmp);
68        }
69 
70        /* (non-Javadoc)
71         * @see net.spy.memcached.MemcachedNode#destroyInputQueue()
72         */
73        public Collection<Operation> destroyInputQueue() {
74                Collection<Operation> rv=new ArrayList<Operation>();
75                inputQueue.drainTo(rv);
76                return rv;
77        }
78 
79        /* (non-Javadoc)
80         * @see net.spy.memcached.MemcachedNode#setupResend()
81         */
82        public final void setupResend() {
83                // First, reset the current write op.
84                Operation op=getCurrentWriteOp();
85                if(op != null) {
86                        ByteBuffer buf=op.getBuffer();
87                        if(buf != null) {
88                                buf.reset();
89                        } else {
90                                getLogger().info("No buffer for current write op, removing");
91                                removeCurrentWriteOp();
92                        }
93                }
94                // Now cancel all the pending read operations.  Might be better to
95                // to requeue them.
96                while(hasReadOp()) {
97                        op=removeCurrentReadOp();
98                        if (op != getCurrentWriteOp()) {
99                                getLogger().warn("Discarding partially completed op: %s", op);
100                                op.cancel();
101                        }
102                }
103 
104                getWbuf().clear();
105                getRbuf().clear();
106                toWrite=0;
107        }
108 
109        // Prepare the pending operations.  Return true if there are any pending
110        // ops
111        private boolean preparePending() {
112                // Copy the input queue into the write queue.
113                copyInputQueue();
114 
115                // Now check the ops
116                Operation nextOp=getCurrentWriteOp();
117                while(nextOp != null && nextOp.isCancelled()) {
118                        getLogger().info("Removing cancelled operation: %s", nextOp);
119                        removeCurrentWriteOp();
120                        nextOp=getCurrentWriteOp();
121                }
122                return nextOp != null;
123        }
124 
125        /* (non-Javadoc)
126         * @see net.spy.memcached.MemcachedNode#fillWriteBuffer(boolean)
127         */
128        public final void fillWriteBuffer(boolean shouldOptimize) {
129                if(toWrite == 0 && readQ.remainingCapacity() > 0) {
130                        getWbuf().clear();
131                        Operation o=getCurrentWriteOp();
132                        while(o != null && toWrite < getWbuf().capacity()) {
133                                assert o.getState() == OperationState.WRITING;
134                                // This isn't the most optimal way to do this, but it hints
135                                // at a larger design problem that may need to be taken care
136                                // if in the bowels of the client.
137                                // In practice, readQ should be small, however.
138                                if(!readQ.contains(o)) {
139                                        readQ.add(o);
140                                }
141 
142                                ByteBuffer obuf=o.getBuffer();
143                                assert obuf != null : "Didn't get a write buffer from " + o;
144                                int bytesToCopy=Math.min(getWbuf().remaining(),
145                                                obuf.remaining());
146                                byte b[]=new byte[bytesToCopy];
147                                obuf.get(b);
148                                getWbuf().put(b);
149                                getLogger().debug("After copying stuff from %s: %s",
150                                                o, getWbuf());
151                                if(!o.getBuffer().hasRemaining()) {
152                                        o.writeComplete();
153                                        transitionWriteItem();
154 
155                                        preparePending();
156                                        if(shouldOptimize) {
157                                                optimize();
158                                        }
159 
160                                        o=getCurrentWriteOp();
161                                }
162                                toWrite += bytesToCopy;
163                        }
164                        getWbuf().flip();
165                        assert toWrite <= getWbuf().capacity()
166                                : "toWrite exceeded capacity: " + this;
167                        assert toWrite == getWbuf().remaining()
168                                : "Expected " + toWrite + " remaining, got "
169                                + getWbuf().remaining();
170                } else {
171                        getLogger().debug("Buffer is full, skipping");
172                }
173        }
174 
175        /* (non-Javadoc)
176         * @see net.spy.memcached.MemcachedNode#transitionWriteItem()
177         */
178        public final void transitionWriteItem() {
179                Operation op=removeCurrentWriteOp();
180                assert op != null : "There is no write item to transition";
181                getLogger().debug("Finished writing %s", op);
182        }
183 
184        /* (non-Javadoc)
185         * @see net.spy.memcached.MemcachedNode#optimize()
186         */
187        protected abstract void optimize();
188 
189        /* (non-Javadoc)
190         * @see net.spy.memcached.MemcachedNode#getCurrentReadOp()
191         */
192        public final Operation getCurrentReadOp() {
193                return readQ.peek();
194        }
195 
196        /* (non-Javadoc)
197         * @see net.spy.memcached.MemcachedNode#removeCurrentReadOp()
198         */
199        public final Operation removeCurrentReadOp() {
200                return readQ.remove();
201        }
202 
203        /* (non-Javadoc)
204         * @see net.spy.memcached.MemcachedNode#getCurrentWriteOp()
205         */
206        public final Operation getCurrentWriteOp() {
207                return optimizedOp == null ? writeQ.peek() : optimizedOp;
208        }
209 
210        /* (non-Javadoc)
211         * @see net.spy.memcached.MemcachedNode#removeCurrentWriteOp()
212         */
213        public final Operation removeCurrentWriteOp() {
214                Operation rv=optimizedOp;
215                if(rv == null) {
216                        rv=writeQ.remove();
217                } else {
218                        optimizedOp=null;
219                }
220                return rv;
221        }
222 
223        /* (non-Javadoc)
224         * @see net.spy.memcached.MemcachedNode#hasReadOp()
225         */
226        public final boolean hasReadOp() {
227                return !readQ.isEmpty();
228        }
229 
230        /* (non-Javadoc)
231         * @see net.spy.memcached.MemcachedNode#hasWriteOp()
232         */
233        public final boolean hasWriteOp() {
234                return !(optimizedOp == null && writeQ.isEmpty());
235        }
236 
237        /* (non-Javadoc)
238         * @see net.spy.memcached.MemcachedNode#addOp(net.spy.memcached.ops.Operation)
239         */
240        public final void addOp(Operation op) {
241                boolean added=inputQueue.add(op);
242                assert added; // documented to throw an IllegalStateException
243        }
244 
245        /* (non-Javadoc)
246         * @see net.spy.memcached.MemcachedNode#getSelectionOps()
247         */
248        public final int getSelectionOps() {
249                int rv=0;
250                if(getChannel().isConnected()) {
251                        if(hasReadOp()) {
252                                rv |= SelectionKey.OP_READ;
253                        }
254                        if(toWrite > 0 || hasWriteOp()) {
255                                rv |= SelectionKey.OP_WRITE;
256                        }
257                } else {
258                        rv = SelectionKey.OP_CONNECT;
259                }
260                return rv;
261        }
262 
263        /* (non-Javadoc)
264         * @see net.spy.memcached.MemcachedNode#getRbuf()
265         */
266        public final ByteBuffer getRbuf() {
267                return rbuf;
268        }
269 
270        /* (non-Javadoc)
271         * @see net.spy.memcached.MemcachedNode#getWbuf()
272         */
273        public final ByteBuffer getWbuf() {
274                return wbuf;
275        }
276 
277        /* (non-Javadoc)
278         * @see net.spy.memcached.MemcachedNode#getSocketAddress()
279         */
280        public final SocketAddress getSocketAddress() {
281                return socketAddress;
282        }
283 
284        /* (non-Javadoc)
285         * @see net.spy.memcached.MemcachedNode#isActive()
286         */
287        public final boolean isActive() {
288                return reconnectAttempt == 0
289                        && getChannel() != null && getChannel().isConnected();
290        }
291 
292        /* (non-Javadoc)
293         * @see net.spy.memcached.MemcachedNode#reconnecting()
294         */
295        public final void reconnecting() {
296                reconnectAttempt++;
297        }
298 
299        /* (non-Javadoc)
300         * @see net.spy.memcached.MemcachedNode#connected()
301         */
302        public final void connected() {
303                reconnectAttempt=0;
304        }
305 
306        /* (non-Javadoc)
307         * @see net.spy.memcached.MemcachedNode#getReconnectCount()
308         */
309        public final int getReconnectCount() {
310                return reconnectAttempt;
311        }
312 
313        /* (non-Javadoc)
314         * @see net.spy.memcached.MemcachedNode#toString()
315         */
316        @Override
317        public final String toString() {
318                int sops=0;
319                if(getSk()!= null && getSk().isValid()) {
320                        sops=getSk().interestOps();
321                }
322                int rsize=readQ.size() + (optimizedOp == null ? 0 : 1);
323                int wsize=writeQ.size();
324                int isize=inputQueue.size();
325                return "{QA sa=" + getSocketAddress() + ", #Rops=" + rsize
326                        + ", #Wops=" + wsize
327                        + ", #iq=" + isize
328                        + ", topRop=" + getCurrentReadOp()
329                        + ", topWop=" + getCurrentWriteOp()
330                        + ", toWrite=" + toWrite
331                        + ", interested=" + sops + "}";
332        }
333 
334        /* (non-Javadoc)
335         * @see net.spy.memcached.MemcachedNode#registerChannel(java.nio.channels.SocketChannel, java.nio.channels.SelectionKey)
336         */
337        public final void registerChannel(SocketChannel ch, SelectionKey skey) {
338                setChannel(ch);
339                setSk(skey);
340        }
341 
342        /* (non-Javadoc)
343         * @see net.spy.memcached.MemcachedNode#setChannel(java.nio.channels.SocketChannel)
344         */
345        public final void setChannel(SocketChannel to) {
346                assert channel == null || !channel.isOpen()
347                        : "Attempting to overwrite channel";
348                channel = to;
349        }
350 
351        /* (non-Javadoc)
352         * @see net.spy.memcached.MemcachedNode#getChannel()
353         */
354        public final SocketChannel getChannel() {
355                return channel;
356        }
357 
358        /* (non-Javadoc)
359         * @see net.spy.memcached.MemcachedNode#setSk(java.nio.channels.SelectionKey)
360         */
361        public final void setSk(SelectionKey to) {
362                sk = to;
363        }
364 
365        /* (non-Javadoc)
366         * @see net.spy.memcached.MemcachedNode#getSk()
367         */
368        public final SelectionKey getSk() {
369                return sk;
370        }
371 
372        /* (non-Javadoc)
373         * @see net.spy.memcached.MemcachedNode#getBytesRemainingInBuffer()
374         */
375        public final int getBytesRemainingToWrite() {
376                return toWrite;
377        }
378 
379        /* (non-Javadoc)
380         * @see net.spy.memcached.MemcachedNode#writeSome()
381         */
382        public final int writeSome() throws IOException {
383                int wrote=channel.write(wbuf);
384                assert wrote >= 0 : "Wrote negative bytes?";
385                toWrite -= wrote;
386                assert toWrite >= 0
387                        : "toWrite went negative after writing " + wrote
388                                + " bytes for " + this;
389                getLogger().debug("Wrote %d bytes", wrote);
390                return wrote;
391        }
392 
393 
394        public final void fixupOps() {
395                // As the selection key can be changed at any point due to node
396                // failure, we'll grab the current volatile value and configure it.
397                SelectionKey s = sk;
398                if(s != null && s.isValid()) {
399                        int iops=getSelectionOps();
400                        getLogger().debug("Setting interested opts to %d", iops);
401                        s.interestOps(iops);
402                } else {
403                        getLogger().debug("Selection key is not valid.");
404                }
405        }
406}

[all classes][net.spy.memcached.protocol]
EMMA 2.0.5312 (C) Vladimir Roubtsov