EMMA Coverage Report (generated Tue Oct 27 11:32:50 PDT 2009)
[all classes][net.spy.memcached.protocol.binary]

COVERAGE SUMMARY FOR SOURCE FILE [OperationImpl.java]

nameclass, %method, %block, %line, %
OperationImpl.java100% (1/1)93%  (13/14)86%  (660/765)91%  (104.3/114)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class OperationImpl100% (1/1)93%  (13/14)86%  (660/765)91%  (104.3/114)
getStatusForErrorCode (int, byte []): OperationStatus 0%   (0/1)0%   (0/2)0%   (0/1)
opaqueIsValid (): boolean 100% (1/1)41%  (13/32)63%  (1.9/3)
generateOpaque (): int 100% (1/1)44%  (7/16)60%  (3/5)
decodePayload (byte []): void 100% (1/1)67%  (10/15)83%  (2.5/3)
finishedPayload (byte []): void 100% (1/1)76%  (26/34)90%  (9/10)
prepareBuffer (String, long, byte [], Object []): void 100% (1/1)83%  (170/204)94%  (32.8/35)
readFromBuffer (ByteBuffer): void 100% (1/1)88%  (188/214)94%  (31.1/33)
<static initializer> 100% (1/1)95%  (42/44)99%  (7/7)
OperationImpl (int, int, OperationCallback): void 100% (1/1)100% (25/25)100% (9/9)
decodeInt (byte [], int): int 100% (1/1)100% (36/36)100% (1/1)
decodeLong (byte [], int): long 100% (1/1)100% (77/77)100% (1/1)
decodeShort (byte [], int): int 100% (1/1)100% (16/16)100% (1/1)
decodeUnsignedInt (byte [], int): long 100% (1/1)100% (40/40)100% (1/1)
resetInput (): void 100% (1/1)100% (10/10)100% (4/4)

1package net.spy.memcached.protocol.binary;
2 
3import java.io.IOException;
4import java.nio.ByteBuffer;
5import java.nio.ByteOrder;
6import java.util.concurrent.atomic.AtomicInteger;
7 
8import net.spy.memcached.CASResponse;
9import net.spy.memcached.KeyUtil;
10import net.spy.memcached.ops.CASOperationStatus;
11import net.spy.memcached.ops.OperationCallback;
12import net.spy.memcached.ops.OperationErrorType;
13import net.spy.memcached.ops.OperationState;
14import net.spy.memcached.ops.OperationStatus;
15import net.spy.memcached.protocol.BaseOperationImpl;
16 
17/**
18 * Base class for binary operations.
19 */
20abstract class OperationImpl extends BaseOperationImpl {
21 
22        protected static final byte REQ_MAGIC = (byte)0x80;
23        protected static final byte RES_MAGIC = (byte)0x81;
24        protected static final int MIN_RECV_PACKET=24;
25 
26        /**
27         * Error code for items that were not found.
28         */
29        protected static final int ERR_NOT_FOUND = 1;
30        protected static final int ERR_EXISTS = 2;
31        protected static final int ERR_EINVAL = 4;
32        protected static final int ERR_NOT_STORED = 5;
33 
34        protected static final OperationStatus NOT_FOUND_STATUS =
35                new CASOperationStatus(false, "Not Found", CASResponse.NOT_FOUND);
36        protected static final OperationStatus EXISTS_STATUS =
37                new CASOperationStatus(false, "Object exists", CASResponse.EXISTS);
38        protected static final OperationStatus NOT_STORED_STATUS =
39                new CASOperationStatus(false, "Not Stored", CASResponse.NOT_FOUND);
40 
41        protected static final byte[] EMPTY_BYTES = new byte[0];
42 
43        protected static final OperationStatus STATUS_OK =
44                new CASOperationStatus(true, "OK", CASResponse.OK);
45 
46        private static final AtomicInteger seqNumber=new AtomicInteger(0);
47 
48        // request header fields
49        private final int cmd;
50        protected final int opaque;
51 
52        private final byte[] header=new byte[MIN_RECV_PACKET];
53        private int headerOffset=0;
54        private byte[] payload=null;
55 
56        // Response header fields
57        protected int keyLen;
58        protected int responseCmd;
59        protected int errorCode;
60        protected int responseOpaque;
61        protected long responseCas;
62 
63        private int payloadOffset=0;
64 
65        /**
66         * Construct with opaque.
67         *
68         * @param o the opaque value.
69         * @param cb
70         */
71        protected OperationImpl(int c, int o, OperationCallback cb) {
72                super();
73                cmd=c;
74                opaque=o;
75                setCallback(cb);
76        }
77 
78        protected void resetInput() {
79                payload=null;
80                payloadOffset=0;
81                headerOffset=0;
82        }
83 
84        // Base response packet format:
85        //    0      1       2  3    4         5         6  7    8 9 10 11
86        //        # magic, opcode, keylen, extralen, datatype, status, bodylen,
87        //    12,3,4,5  16
88        //    opaque, cas
89        //        RES_PKT_FMT=">BBHBBHIIQ"
90 
91        @Override
92        public void readFromBuffer(ByteBuffer b) throws IOException {
93                // First process headers if we haven't completed them yet
94                if(headerOffset < MIN_RECV_PACKET) {
95                        int toRead=MIN_RECV_PACKET - headerOffset;
96                        int available=b.remaining();
97                        toRead=Math.min(toRead, available);
98                        getLogger().debug("Reading %d header bytes", toRead);
99                        b.get(header, headerOffset, toRead);
100                        headerOffset+=toRead;
101 
102                        // We've completed reading the header.  Prepare body read.
103                        if(headerOffset == MIN_RECV_PACKET) {
104                                int magic=header[0];
105                                assert magic == RES_MAGIC : "Invalid magic:  " + magic;
106                                responseCmd=header[1];
107                                assert cmd == -1 || responseCmd == cmd
108                                        : "Unexpected response command value";
109                                keyLen=decodeShort(header, 2);
110                                // TODO:  Examine extralen and datatype
111                                errorCode=decodeShort(header, 6);
112                                int bytesToRead=decodeInt(header, 8);
113                                payload=new byte[bytesToRead];
114                                responseOpaque=decodeInt(header, 12);
115                                responseCas=decodeLong(header, 16);
116                                assert opaqueIsValid() : "Opaque is not valid";
117                        }
118                }
119 
120                // Now process the payload if we can.
121                if(headerOffset >= MIN_RECV_PACKET && payload == null) {
122                        finishedPayload(EMPTY_BYTES);
123                } else if(payload != null) {
124                        int toRead=payload.length - payloadOffset;
125                        int available=b.remaining();
126                        toRead=Math.min(toRead, available);
127                        getLogger().debug("Reading %d payload bytes", toRead);
128                        b.get(payload, payloadOffset, toRead);
129                        payloadOffset+=toRead;
130 
131                        // Have we read it all?
132                        if(payloadOffset == payload.length) {
133                                finishedPayload(payload);
134                        }
135                } else {
136                        // Haven't read enough to make up a payload.  Must read more.
137                        getLogger().debug("Only read %d of the %d needed to fill a header",
138                                headerOffset, MIN_RECV_PACKET);
139                }
140 
141        }
142 
143        protected void finishedPayload(byte[] pl) throws IOException {
144                if(errorCode != 0) {
145                        OperationStatus status=getStatusForErrorCode(errorCode, pl);
146                        if(status == null) {
147                                handleError(OperationErrorType.SERVER, new String(pl));
148                        } else {
149                                getCallback().receivedStatus(status);
150                                transitionState(OperationState.COMPLETE);
151                        }
152                } else {
153                        decodePayload(pl);
154                        transitionState(OperationState.COMPLETE);
155                }
156        }
157 
158        /**
159         * Get the OperationStatus object for the given error code.
160         *
161         * @param errCode the error code
162         * @return the status to return, or null if this is an exceptional case
163         */
164        protected OperationStatus getStatusForErrorCode(int errCode, byte[] errPl) {
165                return null;
166        }
167 
168        /**
169         * Decode the given payload for this command.
170         *
171         * @param pl the payload.
172         */
173        protected void decodePayload(byte[] pl) {
174                assert pl.length == 0 : "Payload has bytes, but decode isn't overridden";
175                getCallback().receivedStatus(STATUS_OK);
176        }
177 
178        /**
179         * Validate an opaque value from the header.
180         * This may be overridden from a subclass where the opaque isn't expected
181         * to always be the same as the request opaque.
182         */
183        protected boolean opaqueIsValid() {
184                if(responseOpaque != opaque) {
185                        getLogger().warn("Expected opaque:  %d, got opaque:  %d\n",
186                                        responseOpaque, opaque);
187                }
188                return responseOpaque == opaque;
189        }
190 
191        static int decodeShort(byte[] data, int i) {
192                return (data[i] & 0xff) << 8
193                        | (data[i+1] & 0xff);
194        }
195 
196        static int decodeInt(byte[] data, int i) {
197                return (data[i]  & 0xff) << 24
198                        | (data[i+1] & 0xff) << 16
199                        | (data[i+2] & 0xff) << 8
200                        | (data[i+3] & 0xff);
201        }
202 
203        static long decodeUnsignedInt(byte[] data, int i) {
204                return ((long)(data[i]  & 0xff) << 24)
205                        | ((data[i+1] & 0xff) << 16)
206                        | ((data[i+2] & 0xff) << 8)
207                        | (data[i+3] & 0xff);
208        }
209 
210        static long decodeLong(byte[] data, int i) {
211                return(data[i  ] & 0xff) << 56
212                        | (data[i+1] & 0xff) << 48
213                        | (data[i+2] & 0xff) << 40
214                        | (data[i+3] & 0xff) << 32
215                        | (data[i+4] & 0xff) << 24
216                        | (data[i+5] & 0xff) << 16
217                        | (data[i+6] & 0xff) << 8
218                        | (data[i+7] & 0xff);
219        }
220 
221        /**
222         * Prepare a send buffer.
223         *
224         * @param key the key (for keyed ops)
225         * @param cas the cas value
226         * @param val the data payload
227         * @param extraHeaders any additional headers that need to be sent
228         */
229        protected void prepareBuffer(String key, long cas, byte[] val,
230                        Object... extraHeaders) {
231                int extraLen=0;
232                for(Object o : extraHeaders) {
233                        if(o instanceof Integer) {
234                                extraLen += 4;
235                        } else if(o instanceof byte[]) {
236                                extraLen += ((byte[])o).length;
237                        } else if(o instanceof Long) {
238                                extraLen += 8;
239                        } else {
240                                assert false : "Unhandled extra header type:  " + o.getClass();
241                        }
242                }
243                final byte[] keyBytes=KeyUtil.getKeyBytes(key);
244                int bufSize=MIN_RECV_PACKET + keyBytes.length + val.length;
245 
246                //        # magic, opcode, keylen, extralen, datatype, [reserved],
247                //    bodylen, opaque, cas
248                //        REQ_PKT_FMT=">BBHBBxxIIQ"
249 
250                // set up the initial header stuff
251                ByteBuffer bb=ByteBuffer.allocate(bufSize + extraLen);
252                assert bb.order() == ByteOrder.BIG_ENDIAN;
253                bb.put(REQ_MAGIC);
254                bb.put((byte)cmd);
255                bb.putShort((short)keyBytes.length);
256                bb.put((byte)extraLen);
257                bb.put((byte)0); // data type
258                bb.putShort((short)0); // reserved
259                bb.putInt(keyBytes.length + val.length + extraLen);
260                bb.putInt(opaque);
261                bb.putLong(cas);
262 
263                // Add the extra headers.
264                for(Object o : extraHeaders) {
265                        if(o instanceof Integer) {
266                                bb.putInt((Integer)o);
267                        } else if(o instanceof byte[]) {
268                                bb.put((byte[])o);
269                        } else if(o instanceof Long) {
270                                bb.putLong((Long)o);
271                        } else {
272                                assert false : "Unhandled extra header type:  " + o.getClass();
273                        }
274                }
275 
276                // Add the normal stuff
277                bb.put(keyBytes);
278                bb.put(val);
279 
280                bb.flip();
281                setBuffer(bb);
282        }
283 
284        /**
285         * Generate an opaque ID.
286         */
287        static int generateOpaque() {
288                int rv = seqNumber.incrementAndGet();
289                while(rv < 0) {
290                        seqNumber.compareAndSet(rv, 0);
291                        rv=seqNumber.incrementAndGet();
292                }
293                return rv;
294        }
295}

[all classes][net.spy.memcached.protocol.binary]
EMMA 2.0.5312 (C) Vladimir Roubtsov