1 | package�net.spy.memcached.protocol.binary; |
2 | � |
3 | import�java.io.IOException; |
4 | import�java.nio.ByteBuffer; |
5 | import�java.util.ArrayList; |
6 | import�java.util.HashMap; |
7 | import�java.util.Iterator; |
8 | import�java.util.List; |
9 | import�java.util.Map; |
10 | � |
11 | import�net.spy.memcached.KeyUtil; |
12 | import�net.spy.memcached.ops.CASOperation; |
13 | import�net.spy.memcached.ops.Operation; |
14 | import�net.spy.memcached.ops.OperationCallback; |
15 | import�net.spy.memcached.ops.OperationState; |
16 | import�net.spy.memcached.ops.OperationStatus; |
17 | import�net.spy.memcached.ops.StoreType; |
18 | � |
19 | public�class�OptimizedSetImpl�extends�OperationImpl�implements�Operation�{ |
20 | � |
21 | ��������private�static�final�OperationCallback�NOOP_CALLBACK�=�new�NoopCallback(); |
22 | � |
23 | ��������private�final�int�terminalOpaque=generateOpaque(); |
24 | ��������private�final�Map<Integer,�OperationCallback>�callbacks�= |
25 | ����������������new�HashMap<Integer,�OperationCallback>(); |
26 | ��������private�final�List<CASOperation>�ops�=�new�ArrayList<CASOperation>(); |
27 | � |
28 | ��������//�If�nothing�else,�this�will�be�a�NOOP. |
29 | ��������private�int�byteCount�=�MIN_RECV_PACKET; |
30 | � |
31 | ��������/** |
32 | ���������*�Construct�an�optimized�get�starting�with�the�given�get�operation. |
33 | ���������*/ |
34 | ��������public�OptimizedSetImpl(CASOperation�firstStore)�{ |
35 | ����������������super(-1,�-1,�NOOP_CALLBACK); |
36 | ����������������addOperation(firstStore); |
37 | ��������} |
38 | � |
39 | ��������public�void�addOperation(CASOperation�op)�{ |
40 | ����������������ops.add(op); |
41 | � |
42 | ����������������//�Count�the�bytes�required�by�this�operation. |
43 | ����������������Iterator<String>�is�=�op.getKeys().iterator(); |
44 | ����������������String�k�=�is.next(); |
45 | ����������������int�keylen�=�KeyUtil.getKeyBytes(k).length; |
46 | � |
47 | ����������������byteCount�+=�MIN_RECV_PACKET�+�StoreOperationImpl.EXTRA_LEN |
48 | ������������������������+�keylen�+�op.getBytes().length; |
49 | ��������} |
50 | � |
51 | ��������public�int�size()�{ |
52 | ����������������return�ops.size(); |
53 | ��������} |
54 | � |
55 | ��������public�int�bytes()�{ |
56 | ����������������return�byteCount; |
57 | ��������} |
58 | � |
59 | ��������@Override |
60 | ��������public�void�initialize()�{ |
61 | ����������������//�Now�create�a�buffer. |
62 | ����������������ByteBuffer�bb=ByteBuffer.allocate(byteCount); |
63 | ����������������for(CASOperation�so�:�ops)�{ |
64 | ������������������������Iterator<String>�is�=�so.getKeys().iterator(); |
65 | ������������������������String�k�=�is.next(); |
66 | ������������������������byte[]�keyBytes�=�KeyUtil.getKeyBytes(k); |
67 | ������������������������assert�!is.hasNext(); |
68 | � |
69 | ������������������������int�myOpaque�=�generateOpaque(); |
70 | ������������������������callbacks.put(myOpaque,�so.getCallback()); |
71 | ������������������������byte[]�data�=�so.getBytes(); |
72 | � |
73 | ������������������������//�Custom�header |
74 | ������������������������bb.put(REQ_MAGIC); |
75 | ������������������������bb.put((byte)cmdMap(so.getStoreType())); |
76 | ������������������������bb.putShort((short)keyBytes.length); |
77 | ������������������������bb.put((byte)StoreOperationImpl.EXTRA_LEN);�//�extralen |
78 | ������������������������bb.put((byte)0);�//�data�type |
79 | ������������������������bb.putShort((short)0);�//�reserved |
80 | ������������������������bb.putInt(keyBytes.length�+�data.length�+ |
81 | ������������������������������������������������StoreOperationImpl.EXTRA_LEN); |
82 | ������������������������bb.putInt(myOpaque); |
83 | ������������������������bb.putLong(so.getCasValue());�//�cas |
84 | ������������������������//�Extras |
85 | ������������������������bb.putInt(so.getFlags()); |
86 | ������������������������bb.putInt(so.getExpiration()); |
87 | ������������������������//�the�actual�key |
88 | ������������������������bb.put(keyBytes); |
89 | ������������������������//�And�the�value |
90 | ������������������������bb.put(data); |
91 | ����������������} |
92 | ����������������//�Add�the�noop |
93 | ����������������bb.put(REQ_MAGIC); |
94 | ����������������bb.put((byte)NoopOperationImpl.CMD); |
95 | ����������������bb.putShort((short)0); |
96 | ����������������bb.put((byte)0);�//�extralen |
97 | ����������������bb.put((byte)0);�//�data�type |
98 | ����������������bb.putShort((short)0);�//�reserved |
99 | ����������������bb.putInt(0); |
100 | ����������������bb.putInt(terminalOpaque); |
101 | ����������������bb.putLong(0);�//�cas |
102 | � |
103 | ����������������bb.flip(); |
104 | ����������������setBuffer(bb); |
105 | ��������} |
106 | � |
107 | ��������private�static�int�cmdMap(StoreType�t)�{ |
108 | ����������������int�rv=-1; |
109 | ����������������switch(t)�{ |
110 | ������������������������case�set:�rv=StoreOperationImpl.SETQ;�break; |
111 | ������������������������case�add:�rv=StoreOperationImpl.ADDQ;�break; |
112 | ������������������������case�replace:�rv=StoreOperationImpl.REPLACEQ;�break; |
113 | ����������������} |
114 | ����������������//�Check�fall-through. |
115 | ����������������assert�rv�!=�-1�:�"Unhandled�store�type:��"�+�t; |
116 | ����������������return�rv; |
117 | ��������} |
118 | � |
119 | ��������@Override |
120 | ��������protected�void�finishedPayload(byte[]�pl)�throws�IOException�{ |
121 | ����������������if(responseOpaque�==�terminalOpaque)�{ |
122 | ������������������������for(OperationCallback�cb�:�callbacks.values())�{ |
123 | ��������������������������������cb.receivedStatus(STATUS_OK); |
124 | ��������������������������������cb.complete(); |
125 | ������������������������} |
126 | ������������������������transitionState(OperationState.COMPLETE); |
127 | ����������������}�else�{ |
128 | ������������������������OperationCallback�cb�=�callbacks.remove(responseOpaque); |
129 | ������������������������assert�cb�!=�null�:�"No�callback�for�"�+�responseOpaque; |
130 | ������������������������assert�errorCode�!=�0�:�"Got�no�error�on�a�quiet�mutation."; |
131 | ������������������������OperationStatus�status=getStatusForErrorCode(errorCode,�pl); |
132 | ������������������������assert�status�!=�null�:�"Got�no�status�for�a�quiet�mutation�error"; |
133 | ������������������������cb.receivedStatus(status); |
134 | ������������������������cb.complete(); |
135 | ����������������} |
136 | ����������������resetInput(); |
137 | ��������} |
138 | � |
139 | ��������@Override |
140 | ��������protected�OperationStatus�getStatusForErrorCode(int�errCode,�byte[]�errPl)�{ |
141 | ����������������OperationStatus�rv=null; |
142 | ����������������switch(errCode)�{ |
143 | ������������������������case�ERR_EXISTS: |
144 | ��������������������������������rv=EXISTS_STATUS; |
145 | ��������������������������������break; |
146 | ������������������������case�ERR_NOT_FOUND: |
147 | ��������������������������������rv=NOT_FOUND_STATUS; |
148 | ��������������������������������break; |
149 | ����������������} |
150 | ����������������return�rv; |
151 | ��������} |
152 | � |
153 | ��������@Override |
154 | ��������protected�boolean�opaqueIsValid()�{ |
155 | ����������������return�responseOpaque�==�terminalOpaque |
156 | ������������������������||�callbacks.containsKey(responseOpaque); |
157 | ��������} |
158 | � |
159 | ��������static�class�NoopCallback�implements�OperationCallback�{ |
160 | � |
161 | ����������������public�void�complete()�{ |
162 | ������������������������//�noop |
163 | ����������������} |
164 | � |
165 | ����������������public�void�receivedStatus(OperationStatus�status)�{ |
166 | ������������������������//�noop |
167 | ����������������} |
168 | � |
169 | ��������} |
170 | � |
171 | } |