1 | package�net.spy.memcached.protocol.binary; |
2 | � |
3 | import�java.io.IOException; |
4 | import�java.nio.ByteBuffer; |
5 | import�java.nio.ByteOrder; |
6 | import�java.util.concurrent.atomic.AtomicInteger; |
7 | � |
8 | import�net.spy.memcached.CASResponse; |
9 | import�net.spy.memcached.KeyUtil; |
10 | import�net.spy.memcached.ops.CASOperationStatus; |
11 | import�net.spy.memcached.ops.OperationCallback; |
12 | import�net.spy.memcached.ops.OperationErrorType; |
13 | import�net.spy.memcached.ops.OperationState; |
14 | import�net.spy.memcached.ops.OperationStatus; |
15 | import�net.spy.memcached.protocol.BaseOperationImpl; |
16 | � |
17 | /** |
18 | �*�Base�class�for�binary�operations. |
19 | �*/ |
20 | abstract�class�OperationImpl�extends�BaseOperationImpl�{ |
21 | � |
22 | ��������protected�static�final�byte�REQ_MAGIC�=�(byte)0x80; |
23 | ��������protected�static�final�byte�RES_MAGIC�=�(byte)0x81; |
24 | ��������protected�static�final�int�MIN_RECV_PACKET=24; |
25 | � |
26 | ��������/** |
27 | ���������*�Error�code�for�items�that�were�not�found. |
28 | ���������*/ |
29 | ��������protected�static�final�int�ERR_NOT_FOUND�=�1; |
30 | ��������protected�static�final�int�ERR_EXISTS�=�2; |
31 | ��������protected�static�final�int�ERR_EINVAL�=�4; |
32 | ��������protected�static�final�int�ERR_NOT_STORED�=�5; |
33 | � |
34 | ��������protected�static�final�OperationStatus�NOT_FOUND_STATUS�= |
35 | ����������������new�CASOperationStatus(false,�"Not�Found",�CASResponse.NOT_FOUND); |
36 | ��������protected�static�final�OperationStatus�EXISTS_STATUS�= |
37 | ����������������new�CASOperationStatus(false,�"Object�exists",�CASResponse.EXISTS); |
38 | ��������protected�static�final�OperationStatus�NOT_STORED_STATUS�= |
39 | ����������������new�CASOperationStatus(false,�"Not�Stored",�CASResponse.NOT_FOUND); |
40 | � |
41 | ��������protected�static�final�byte[]�EMPTY_BYTES�=�new�byte[0]; |
42 | � |
43 | ��������protected�static�final�OperationStatus�STATUS_OK�= |
44 | ����������������new�CASOperationStatus(true,�"OK",�CASResponse.OK); |
45 | � |
46 | ��������private�static�final�AtomicInteger�seqNumber=new�AtomicInteger(0); |
47 | � |
48 | ��������//�request�header�fields |
49 | ��������private�final�int�cmd; |
50 | ��������protected�final�int�opaque; |
51 | � |
52 | ��������private�final�byte[]�header=new�byte[MIN_RECV_PACKET]; |
53 | ��������private�int�headerOffset=0; |
54 | ��������private�byte[]�payload=null; |
55 | � |
56 | ��������//�Response�header�fields |
57 | ��������protected�int�keyLen; |
58 | ��������protected�int�responseCmd; |
59 | ��������protected�int�errorCode; |
60 | ��������protected�int�responseOpaque; |
61 | ��������protected�long�responseCas; |
62 | � |
63 | ��������private�int�payloadOffset=0; |
64 | � |
65 | ��������/** |
66 | ���������*�Construct�with�opaque. |
67 | ���������* |
68 | ���������*�@param�o�the�opaque�value. |
69 | ���������*�@param�cb |
70 | ���������*/ |
71 | ��������protected�OperationImpl(int�c,�int�o,�OperationCallback�cb)�{ |
72 | ����������������super(); |
73 | ����������������cmd=c; |
74 | ����������������opaque=o; |
75 | ����������������setCallback(cb); |
76 | ��������} |
77 | � |
78 | ��������protected�void�resetInput()�{ |
79 | ����������������payload=null; |
80 | ����������������payloadOffset=0; |
81 | ����������������headerOffset=0; |
82 | ��������} |
83 | � |
84 | ��������//�Base�response�packet�format: |
85 | ��������//����0������1�������2��3����4���������5���������6��7����8�9�10�11 |
86 | ��������//��������#�magic,�opcode,�keylen,�extralen,�datatype,�status,�bodylen, |
87 | ��������//����12,3,4,5��16 |
88 | ��������//����opaque,�cas |
89 | ��������//��������RES_PKT_FMT=">BBHBBHIIQ" |
90 | � |
91 | ��������@Override |
92 | ��������public�void�readFromBuffer(ByteBuffer�b)�throws�IOException�{ |
93 | ����������������//�First�process�headers�if�we�haven't�completed�them�yet |
94 | ����������������if(headerOffset�<�MIN_RECV_PACKET)�{ |
95 | ������������������������int�toRead=MIN_RECV_PACKET�-�headerOffset; |
96 | ������������������������int�available=b.remaining(); |
97 | ������������������������toRead=Math.min(toRead,�available); |
98 | ������������������������getLogger().debug("Reading�%d�header�bytes",�toRead); |
99 | ������������������������b.get(header,�headerOffset,�toRead); |
100 | ������������������������headerOffset+=toRead; |
101 | � |
102 | ������������������������//�We've�completed�reading�the�header.��Prepare�body�read. |
103 | ������������������������if(headerOffset�==�MIN_RECV_PACKET)�{ |
104 | ��������������������������������int�magic=header[0]; |
105 | ��������������������������������assert�magic�==�RES_MAGIC�:�"Invalid�magic:��"�+�magic; |
106 | ��������������������������������responseCmd=header[1]; |
107 | ��������������������������������assert�cmd�==�-1�||�responseCmd�==�cmd |
108 | ����������������������������������������:�"Unexpected�response�command�value"; |
109 | ��������������������������������keyLen=decodeShort(header,�2); |
110 | ��������������������������������//�TODO:��Examine�extralen�and�datatype |
111 | ��������������������������������errorCode=decodeShort(header,�6); |
112 | ��������������������������������int�bytesToRead=decodeInt(header,�8); |
113 | ��������������������������������payload=new�byte[bytesToRead]; |
114 | ��������������������������������responseOpaque=decodeInt(header,�12); |
115 | ��������������������������������responseCas=decodeLong(header,�16); |
116 | ��������������������������������assert�opaqueIsValid()�:�"Opaque�is�not�valid"; |
117 | ������������������������} |
118 | ����������������} |
119 | � |
120 | ����������������//�Now�process�the�payload�if�we�can. |
121 | ����������������if(headerOffset�>=�MIN_RECV_PACKET�&&�payload�==�null)�{ |
122 | ������������������������finishedPayload(EMPTY_BYTES); |
123 | ����������������}�else�if(payload�!=�null)�{ |
124 | ������������������������int�toRead=payload.length�-�payloadOffset; |
125 | ������������������������int�available=b.remaining(); |
126 | ������������������������toRead=Math.min(toRead,�available); |
127 | ������������������������getLogger().debug("Reading�%d�payload�bytes",�toRead); |
128 | ������������������������b.get(payload,�payloadOffset,�toRead); |
129 | ������������������������payloadOffset+=toRead; |
130 | � |
131 | ������������������������//�Have�we�read�it�all? |
132 | ������������������������if(payloadOffset�==�payload.length)�{ |
133 | ��������������������������������finishedPayload(payload); |
134 | ������������������������} |
135 | ����������������}�else�{ |
136 | ������������������������//�Haven't�read�enough�to�make�up�a�payload.��Must�read�more. |
137 | ������������������������getLogger().debug("Only�read�%d�of�the�%d�needed�to�fill�a�header", |
138 | ��������������������������������headerOffset,�MIN_RECV_PACKET); |
139 | ����������������} |
140 | � |
141 | ��������} |
142 | � |
143 | ��������protected�void�finishedPayload(byte[]�pl)�throws�IOException�{ |
144 | ����������������if(errorCode�!=�0)�{ |
145 | ������������������������OperationStatus�status=getStatusForErrorCode(errorCode,�pl); |
146 | ������������������������if(status�==�null)�{ |
147 | ��������������������������������handleError(OperationErrorType.SERVER,�new�String(pl)); |
148 | ������������������������}�else�{ |
149 | ��������������������������������getCallback().receivedStatus(status); |
150 | ��������������������������������transitionState(OperationState.COMPLETE); |
151 | ������������������������} |
152 | ����������������}�else�{ |
153 | ������������������������decodePayload(pl); |
154 | ������������������������transitionState(OperationState.COMPLETE); |
155 | ����������������} |
156 | ��������} |
157 | � |
158 | ��������/** |
159 | ���������*�Get�the�OperationStatus�object�for�the�given�error�code. |
160 | ���������* |
161 | ���������*�@param�errCode�the�error�code |
162 | ���������*�@return�the�status�to�return,�or�null�if�this�is�an�exceptional�case |
163 | ���������*/ |
164 | ��������protected�OperationStatus�getStatusForErrorCode(int�errCode,�byte[]�errPl)�{ |
165 | ����������������return�null; |
166 | ��������} |
167 | � |
168 | ��������/** |
169 | ���������*�Decode�the�given�payload�for�this�command. |
170 | ���������* |
171 | ���������*�@param�pl�the�payload. |
172 | ���������*/ |
173 | ��������protected�void�decodePayload(byte[]�pl)�{ |
174 | ����������������assert�pl.length�==�0�:�"Payload�has�bytes,�but�decode�isn't�overridden"; |
175 | ����������������getCallback().receivedStatus(STATUS_OK); |
176 | ��������} |
177 | � |
178 | ��������/** |
179 | ���������*�Validate�an�opaque�value�from�the�header. |
180 | ���������*�This�may�be�overridden�from�a�subclass�where�the�opaque�isn't�expected |
181 | ���������*�to�always�be�the�same�as�the�request�opaque. |
182 | ���������*/ |
183 | ��������protected�boolean�opaqueIsValid()�{ |
184 | ����������������if(responseOpaque�!=�opaque)�{ |
185 | ������������������������getLogger().warn("Expected�opaque:��%d,�got�opaque:��%d\n", |
186 | ����������������������������������������responseOpaque,�opaque); |
187 | ����������������} |
188 | ����������������return�responseOpaque�==�opaque; |
189 | ��������} |
190 | � |
191 | ��������static�int�decodeShort(byte[]�data,�int�i)�{ |
192 | ����������������return�(data[i]�&�0xff)�<<�8 |
193 | ������������������������|�(data[i+1]�&�0xff); |
194 | ��������} |
195 | � |
196 | ��������static�int�decodeInt(byte[]�data,�int�i)�{ |
197 | ����������������return�(data[i]��&�0xff)�<<�24 |
198 | ������������������������|�(data[i+1]�&�0xff)�<<�16 |
199 | ������������������������|�(data[i+2]�&�0xff)�<<�8 |
200 | ������������������������|�(data[i+3]�&�0xff); |
201 | ��������} |
202 | � |
203 | ��������static�long�decodeUnsignedInt(byte[]�data,�int�i)�{ |
204 | ����������������return�((long)(data[i]��&�0xff)�<<�24) |
205 | ������������������������|�((data[i+1]�&�0xff)�<<�16) |
206 | ������������������������|�((data[i+2]�&�0xff)�<<�8) |
207 | ������������������������|�(data[i+3]�&�0xff); |
208 | ��������} |
209 | � |
210 | ��������static�long�decodeLong(byte[]�data,�int�i)�{ |
211 | ����������������return(data[i��]�&�0xff)�<<�56 |
212 | ������������������������|�(data[i+1]�&�0xff)�<<�48 |
213 | ������������������������|�(data[i+2]�&�0xff)�<<�40 |
214 | ������������������������|�(data[i+3]�&�0xff)�<<�32 |
215 | ������������������������|�(data[i+4]�&�0xff)�<<�24 |
216 | ������������������������|�(data[i+5]�&�0xff)�<<�16 |
217 | ������������������������|�(data[i+6]�&�0xff)�<<�8 |
218 | ������������������������|�(data[i+7]�&�0xff); |
219 | ��������} |
220 | � |
221 | ��������/** |
222 | ���������*�Prepare�a�send�buffer. |
223 | ���������* |
224 | ���������*�@param�key�the�key�(for�keyed�ops) |
225 | ���������*�@param�cas�the�cas�value |
226 | ���������*�@param�val�the�data�payload |
227 | ���������*�@param�extraHeaders�any�additional�headers�that�need�to�be�sent |
228 | ���������*/ |
229 | ��������protected�void�prepareBuffer(String�key,�long�cas,�byte[]�val, |
230 | ������������������������Object...�extraHeaders)�{ |
231 | ����������������int�extraLen=0; |
232 | ����������������for(Object�o�:�extraHeaders)�{ |
233 | ������������������������if(o�instanceof�Integer)�{ |
234 | ��������������������������������extraLen�+=�4; |
235 | ������������������������}�else�if(o�instanceof�byte[])�{ |
236 | ��������������������������������extraLen�+=�((byte[])o).length; |
237 | ������������������������}�else�if(o�instanceof�Long)�{ |
238 | ��������������������������������extraLen�+=�8; |
239 | ������������������������}�else�{ |
240 | ��������������������������������assert�false�:�"Unhandled�extra�header�type:��"�+�o.getClass(); |
241 | ������������������������} |
242 | ����������������} |
243 | ����������������final�byte[]�keyBytes=KeyUtil.getKeyBytes(key); |
244 | ����������������int�bufSize=MIN_RECV_PACKET�+�keyBytes.length�+�val.length; |
245 | � |
246 | ����������������//��������#�magic,�opcode,�keylen,�extralen,�datatype,�[reserved], |
247 | ����������������//����bodylen,�opaque,�cas |
248 | ����������������//��������REQ_PKT_FMT=">BBHBBxxIIQ" |
249 | � |
250 | ����������������//�set�up�the�initial�header�stuff |
251 | ����������������ByteBuffer�bb=ByteBuffer.allocate(bufSize�+�extraLen); |
252 | ����������������assert�bb.order()�==�ByteOrder.BIG_ENDIAN; |
253 | ����������������bb.put(REQ_MAGIC); |
254 | ����������������bb.put((byte)cmd); |
255 | ����������������bb.putShort((short)keyBytes.length); |
256 | ����������������bb.put((byte)extraLen); |
257 | ����������������bb.put((byte)0);�//�data�type |
258 | ����������������bb.putShort((short)0);�//�reserved |
259 | ����������������bb.putInt(keyBytes.length�+�val.length�+�extraLen); |
260 | ����������������bb.putInt(opaque); |
261 | ����������������bb.putLong(cas); |
262 | � |
263 | ����������������//�Add�the�extra�headers. |
264 | ����������������for(Object�o�:�extraHeaders)�{ |
265 | ������������������������if(o�instanceof�Integer)�{ |
266 | ��������������������������������bb.putInt((Integer)o); |
267 | ������������������������}�else�if(o�instanceof�byte[])�{ |
268 | ��������������������������������bb.put((byte[])o); |
269 | ������������������������}�else�if(o�instanceof�Long)�{ |
270 | ��������������������������������bb.putLong((Long)o); |
271 | ������������������������}�else�{ |
272 | ��������������������������������assert�false�:�"Unhandled�extra�header�type:��"�+�o.getClass(); |
273 | ������������������������} |
274 | ����������������} |
275 | � |
276 | ����������������//�Add�the�normal�stuff |
277 | ����������������bb.put(keyBytes); |
278 | ����������������bb.put(val); |
279 | � |
280 | ����������������bb.flip(); |
281 | ����������������setBuffer(bb); |
282 | ��������} |
283 | � |
284 | ��������/** |
285 | ���������*�Generate�an�opaque�ID. |
286 | ���������*/ |
287 | ��������static�int�generateOpaque()�{ |
288 | ����������������int�rv�=�seqNumber.incrementAndGet(); |
289 | ����������������while(rv�<�0)�{ |
290 | ������������������������seqNumber.compareAndSet(rv,�0); |
291 | ������������������������rv=seqNumber.incrementAndGet(); |
292 | ����������������} |
293 | ����������������return�rv; |
294 | ��������} |
295 | } |