1 | package net.spy.memcached.protocol.binary; |
2 | |
3 | import java.io.IOException; |
4 | import java.nio.ByteBuffer; |
5 | import java.nio.ByteOrder; |
6 | import java.util.concurrent.atomic.AtomicInteger; |
7 | |
8 | import net.spy.memcached.CASResponse; |
9 | import net.spy.memcached.KeyUtil; |
10 | import net.spy.memcached.ops.CASOperationStatus; |
11 | import net.spy.memcached.ops.OperationCallback; |
12 | import net.spy.memcached.ops.OperationErrorType; |
13 | import net.spy.memcached.ops.OperationState; |
14 | import net.spy.memcached.ops.OperationStatus; |
15 | import net.spy.memcached.protocol.BaseOperationImpl; |
16 | |
17 | /** |
18 | * Base class for binary operations. |
19 | */ |
20 | abstract class OperationImpl extends BaseOperationImpl { |
21 | |
22 | protected static final byte REQ_MAGIC = (byte)0x80; |
23 | protected static final byte RES_MAGIC = (byte)0x81; |
24 | protected static final int MIN_RECV_PACKET=24; |
25 | |
26 | /** |
27 | * Error code for items that were not found. |
28 | */ |
29 | protected static final int ERR_NOT_FOUND = 1; |
30 | protected static final int ERR_EXISTS = 2; |
31 | protected static final int ERR_EINVAL = 4; |
32 | protected static final int ERR_NOT_STORED = 5; |
33 | |
34 | protected static final OperationStatus NOT_FOUND_STATUS = |
35 | new CASOperationStatus(false, "Not Found", CASResponse.NOT_FOUND); |
36 | protected static final OperationStatus EXISTS_STATUS = |
37 | new CASOperationStatus(false, "Object exists", CASResponse.EXISTS); |
38 | protected static final OperationStatus NOT_STORED_STATUS = |
39 | new CASOperationStatus(false, "Not Stored", CASResponse.NOT_FOUND); |
40 | |
41 | protected static final byte[] EMPTY_BYTES = new byte[0]; |
42 | |
43 | protected static final OperationStatus STATUS_OK = |
44 | new CASOperationStatus(true, "OK", CASResponse.OK); |
45 | |
46 | private static final AtomicInteger seqNumber=new AtomicInteger(0); |
47 | |
48 | // request header fields |
49 | private final int cmd; |
50 | protected final int opaque; |
51 | |
52 | private final byte[] header=new byte[MIN_RECV_PACKET]; |
53 | private int headerOffset=0; |
54 | private byte[] payload=null; |
55 | |
56 | // Response header fields |
57 | protected int keyLen; |
58 | protected int responseCmd; |
59 | protected int errorCode; |
60 | protected int responseOpaque; |
61 | protected long responseCas; |
62 | |
63 | private int payloadOffset=0; |
64 | |
65 | /** |
66 | * Construct with opaque. |
67 | * |
68 | * @param o the opaque value. |
69 | * @param cb |
70 | */ |
71 | protected OperationImpl(int c, int o, OperationCallback cb) { |
72 | super(); |
73 | cmd=c; |
74 | opaque=o; |
75 | setCallback(cb); |
76 | } |
77 | |
78 | protected void resetInput() { |
79 | payload=null; |
80 | payloadOffset=0; |
81 | headerOffset=0; |
82 | } |
83 | |
84 | // Base response packet format: |
85 | // 0 1 2 3 4 5 6 7 8 9 10 11 |
86 | // # magic, opcode, keylen, extralen, datatype, status, bodylen, |
87 | // 12,3,4,5 16 |
88 | // opaque, cas |
89 | // RES_PKT_FMT=">BBHBBHIIQ" |
90 | |
91 | @Override |
92 | public void readFromBuffer(ByteBuffer b) throws IOException { |
93 | // First process headers if we haven't completed them yet |
94 | if(headerOffset < MIN_RECV_PACKET) { |
95 | int toRead=MIN_RECV_PACKET - headerOffset; |
96 | int available=b.remaining(); |
97 | toRead=Math.min(toRead, available); |
98 | getLogger().debug("Reading %d header bytes", toRead); |
99 | b.get(header, headerOffset, toRead); |
100 | headerOffset+=toRead; |
101 | |
102 | // We've completed reading the header. Prepare body read. |
103 | if(headerOffset == MIN_RECV_PACKET) { |
104 | int magic=header[0]; |
105 | assert magic == RES_MAGIC : "Invalid magic: " + magic; |
106 | responseCmd=header[1]; |
107 | assert cmd == -1 || responseCmd == cmd |
108 | : "Unexpected response command value"; |
109 | keyLen=decodeShort(header, 2); |
110 | // TODO: Examine extralen and datatype |
111 | errorCode=decodeShort(header, 6); |
112 | int bytesToRead=decodeInt(header, 8); |
113 | payload=new byte[bytesToRead]; |
114 | responseOpaque=decodeInt(header, 12); |
115 | responseCas=decodeLong(header, 16); |
116 | assert opaqueIsValid() : "Opaque is not valid"; |
117 | } |
118 | } |
119 | |
120 | // Now process the payload if we can. |
121 | if(headerOffset >= MIN_RECV_PACKET && payload == null) { |
122 | finishedPayload(EMPTY_BYTES); |
123 | } else if(payload != null) { |
124 | int toRead=payload.length - payloadOffset; |
125 | int available=b.remaining(); |
126 | toRead=Math.min(toRead, available); |
127 | getLogger().debug("Reading %d payload bytes", toRead); |
128 | b.get(payload, payloadOffset, toRead); |
129 | payloadOffset+=toRead; |
130 | |
131 | // Have we read it all? |
132 | if(payloadOffset == payload.length) { |
133 | finishedPayload(payload); |
134 | } |
135 | } else { |
136 | // Haven't read enough to make up a payload. Must read more. |
137 | getLogger().debug("Only read %d of the %d needed to fill a header", |
138 | headerOffset, MIN_RECV_PACKET); |
139 | } |
140 | |
141 | } |
142 | |
143 | protected void finishedPayload(byte[] pl) throws IOException { |
144 | if(errorCode != 0) { |
145 | OperationStatus status=getStatusForErrorCode(errorCode, pl); |
146 | if(status == null) { |
147 | handleError(OperationErrorType.SERVER, new String(pl)); |
148 | } else { |
149 | getCallback().receivedStatus(status); |
150 | transitionState(OperationState.COMPLETE); |
151 | } |
152 | } else { |
153 | decodePayload(pl); |
154 | transitionState(OperationState.COMPLETE); |
155 | } |
156 | } |
157 | |
158 | /** |
159 | * Get the OperationStatus object for the given error code. |
160 | * |
161 | * @param errCode the error code |
162 | * @return the status to return, or null if this is an exceptional case |
163 | */ |
164 | protected OperationStatus getStatusForErrorCode(int errCode, byte[] errPl) { |
165 | return null; |
166 | } |
167 | |
168 | /** |
169 | * Decode the given payload for this command. |
170 | * |
171 | * @param pl the payload. |
172 | */ |
173 | protected void decodePayload(byte[] pl) { |
174 | assert pl.length == 0 : "Payload has bytes, but decode isn't overridden"; |
175 | getCallback().receivedStatus(STATUS_OK); |
176 | } |
177 | |
178 | /** |
179 | * Validate an opaque value from the header. |
180 | * This may be overridden from a subclass where the opaque isn't expected |
181 | * to always be the same as the request opaque. |
182 | */ |
183 | protected boolean opaqueIsValid() { |
184 | if(responseOpaque != opaque) { |
185 | getLogger().warn("Expected opaque: %d, got opaque: %d\n", |
186 | responseOpaque, opaque); |
187 | } |
188 | return responseOpaque == opaque; |
189 | } |
190 | |
191 | static int decodeShort(byte[] data, int i) { |
192 | return (data[i] & 0xff) << 8 |
193 | | (data[i+1] & 0xff); |
194 | } |
195 | |
196 | static int decodeInt(byte[] data, int i) { |
197 | return (data[i] & 0xff) << 24 |
198 | | (data[i+1] & 0xff) << 16 |
199 | | (data[i+2] & 0xff) << 8 |
200 | | (data[i+3] & 0xff); |
201 | } |
202 | |
203 | static long decodeUnsignedInt(byte[] data, int i) { |
204 | return ((long)(data[i] & 0xff) << 24) |
205 | | ((data[i+1] & 0xff) << 16) |
206 | | ((data[i+2] & 0xff) << 8) |
207 | | (data[i+3] & 0xff); |
208 | } |
209 | |
210 | static long decodeLong(byte[] data, int i) { |
211 | return(data[i ] & 0xff) << 56 |
212 | | (data[i+1] & 0xff) << 48 |
213 | | (data[i+2] & 0xff) << 40 |
214 | | (data[i+3] & 0xff) << 32 |
215 | | (data[i+4] & 0xff) << 24 |
216 | | (data[i+5] & 0xff) << 16 |
217 | | (data[i+6] & 0xff) << 8 |
218 | | (data[i+7] & 0xff); |
219 | } |
220 | |
221 | /** |
222 | * Prepare a send buffer. |
223 | * |
224 | * @param key the key (for keyed ops) |
225 | * @param cas the cas value |
226 | * @param val the data payload |
227 | * @param extraHeaders any additional headers that need to be sent |
228 | */ |
229 | protected void prepareBuffer(String key, long cas, byte[] val, |
230 | Object... extraHeaders) { |
231 | int extraLen=0; |
232 | for(Object o : extraHeaders) { |
233 | if(o instanceof Integer) { |
234 | extraLen += 4; |
235 | } else if(o instanceof byte[]) { |
236 | extraLen += ((byte[])o).length; |
237 | } else if(o instanceof Long) { |
238 | extraLen += 8; |
239 | } else { |
240 | assert false : "Unhandled extra header type: " + o.getClass(); |
241 | } |
242 | } |
243 | final byte[] keyBytes=KeyUtil.getKeyBytes(key); |
244 | int bufSize=MIN_RECV_PACKET + keyBytes.length + val.length; |
245 | |
246 | // # magic, opcode, keylen, extralen, datatype, [reserved], |
247 | // bodylen, opaque, cas |
248 | // REQ_PKT_FMT=">BBHBBxxIIQ" |
249 | |
250 | // set up the initial header stuff |
251 | ByteBuffer bb=ByteBuffer.allocate(bufSize + extraLen); |
252 | assert bb.order() == ByteOrder.BIG_ENDIAN; |
253 | bb.put(REQ_MAGIC); |
254 | bb.put((byte)cmd); |
255 | bb.putShort((short)keyBytes.length); |
256 | bb.put((byte)extraLen); |
257 | bb.put((byte)0); // data type |
258 | bb.putShort((short)0); // reserved |
259 | bb.putInt(keyBytes.length + val.length + extraLen); |
260 | bb.putInt(opaque); |
261 | bb.putLong(cas); |
262 | |
263 | // Add the extra headers. |
264 | for(Object o : extraHeaders) { |
265 | if(o instanceof Integer) { |
266 | bb.putInt((Integer)o); |
267 | } else if(o instanceof byte[]) { |
268 | bb.put((byte[])o); |
269 | } else if(o instanceof Long) { |
270 | bb.putLong((Long)o); |
271 | } else { |
272 | assert false : "Unhandled extra header type: " + o.getClass(); |
273 | } |
274 | } |
275 | |
276 | // Add the normal stuff |
277 | bb.put(keyBytes); |
278 | bb.put(val); |
279 | |
280 | bb.flip(); |
281 | setBuffer(bb); |
282 | } |
283 | |
284 | /** |
285 | * Generate an opaque ID. |
286 | */ |
287 | static int generateOpaque() { |
288 | int rv = seqNumber.incrementAndGet(); |
289 | while(rv < 0) { |
290 | seqNumber.compareAndSet(rv, 0); |
291 | rv=seqNumber.incrementAndGet(); |
292 | } |
293 | return rv; |
294 | } |
295 | } |