1 | package net.spy.memcached.protocol.binary; |
2 | |
3 | import java.io.IOException; |
4 | import java.nio.ByteBuffer; |
5 | import java.util.ArrayList; |
6 | import java.util.HashMap; |
7 | import java.util.Iterator; |
8 | import java.util.List; |
9 | import java.util.Map; |
10 | |
11 | import net.spy.memcached.KeyUtil; |
12 | import net.spy.memcached.ops.CASOperation; |
13 | import net.spy.memcached.ops.Operation; |
14 | import net.spy.memcached.ops.OperationCallback; |
15 | import net.spy.memcached.ops.OperationState; |
16 | import net.spy.memcached.ops.OperationStatus; |
17 | import net.spy.memcached.ops.StoreType; |
18 | |
19 | public class OptimizedSetImpl extends OperationImpl implements Operation { |
20 | |
21 | private static final OperationCallback NOOP_CALLBACK = new NoopCallback(); |
22 | |
23 | private final int terminalOpaque=generateOpaque(); |
24 | private final Map<Integer, OperationCallback> callbacks = |
25 | new HashMap<Integer, OperationCallback>(); |
26 | private final List<CASOperation> ops = new ArrayList<CASOperation>(); |
27 | |
28 | // If nothing else, this will be a NOOP. |
29 | private int byteCount = MIN_RECV_PACKET; |
30 | |
31 | /** |
32 | * Construct an optimized get starting with the given get operation. |
33 | */ |
34 | public OptimizedSetImpl(CASOperation firstStore) { |
35 | super(-1, -1, NOOP_CALLBACK); |
36 | addOperation(firstStore); |
37 | } |
38 | |
39 | public void addOperation(CASOperation op) { |
40 | ops.add(op); |
41 | |
42 | // Count the bytes required by this operation. |
43 | Iterator<String> is = op.getKeys().iterator(); |
44 | String k = is.next(); |
45 | int keylen = KeyUtil.getKeyBytes(k).length; |
46 | |
47 | byteCount += MIN_RECV_PACKET + StoreOperationImpl.EXTRA_LEN |
48 | + keylen + op.getBytes().length; |
49 | } |
50 | |
51 | public int size() { |
52 | return ops.size(); |
53 | } |
54 | |
55 | public int bytes() { |
56 | return byteCount; |
57 | } |
58 | |
59 | @Override |
60 | public void initialize() { |
61 | // Now create a buffer. |
62 | ByteBuffer bb=ByteBuffer.allocate(byteCount); |
63 | for(CASOperation so : ops) { |
64 | Iterator<String> is = so.getKeys().iterator(); |
65 | String k = is.next(); |
66 | byte[] keyBytes = KeyUtil.getKeyBytes(k); |
67 | assert !is.hasNext(); |
68 | |
69 | int myOpaque = generateOpaque(); |
70 | callbacks.put(myOpaque, so.getCallback()); |
71 | byte[] data = so.getBytes(); |
72 | |
73 | // Custom header |
74 | bb.put(REQ_MAGIC); |
75 | bb.put((byte)cmdMap(so.getStoreType())); |
76 | bb.putShort((short)keyBytes.length); |
77 | bb.put((byte)StoreOperationImpl.EXTRA_LEN); // extralen |
78 | bb.put((byte)0); // data type |
79 | bb.putShort((short)0); // reserved |
80 | bb.putInt(keyBytes.length + data.length + |
81 | StoreOperationImpl.EXTRA_LEN); |
82 | bb.putInt(myOpaque); |
83 | bb.putLong(so.getCasValue()); // cas |
84 | // Extras |
85 | bb.putInt(so.getFlags()); |
86 | bb.putInt(so.getExpiration()); |
87 | // the actual key |
88 | bb.put(keyBytes); |
89 | // And the value |
90 | bb.put(data); |
91 | } |
92 | // Add the noop |
93 | bb.put(REQ_MAGIC); |
94 | bb.put((byte)NoopOperationImpl.CMD); |
95 | bb.putShort((short)0); |
96 | bb.put((byte)0); // extralen |
97 | bb.put((byte)0); // data type |
98 | bb.putShort((short)0); // reserved |
99 | bb.putInt(0); |
100 | bb.putInt(terminalOpaque); |
101 | bb.putLong(0); // cas |
102 | |
103 | bb.flip(); |
104 | setBuffer(bb); |
105 | } |
106 | |
107 | private static int cmdMap(StoreType t) { |
108 | int rv=-1; |
109 | switch(t) { |
110 | case set: rv=StoreOperationImpl.SETQ; break; |
111 | case add: rv=StoreOperationImpl.ADDQ; break; |
112 | case replace: rv=StoreOperationImpl.REPLACEQ; break; |
113 | } |
114 | // Check fall-through. |
115 | assert rv != -1 : "Unhandled store type: " + t; |
116 | return rv; |
117 | } |
118 | |
119 | @Override |
120 | protected void finishedPayload(byte[] pl) throws IOException { |
121 | if(responseOpaque == terminalOpaque) { |
122 | for(OperationCallback cb : callbacks.values()) { |
123 | cb.receivedStatus(STATUS_OK); |
124 | cb.complete(); |
125 | } |
126 | transitionState(OperationState.COMPLETE); |
127 | } else { |
128 | OperationCallback cb = callbacks.remove(responseOpaque); |
129 | assert cb != null : "No callback for " + responseOpaque; |
130 | assert errorCode != 0 : "Got no error on a quiet mutation."; |
131 | OperationStatus status=getStatusForErrorCode(errorCode, pl); |
132 | assert status != null : "Got no status for a quiet mutation error"; |
133 | cb.receivedStatus(status); |
134 | cb.complete(); |
135 | } |
136 | resetInput(); |
137 | } |
138 | |
139 | @Override |
140 | protected OperationStatus getStatusForErrorCode(int errCode, byte[] errPl) { |
141 | OperationStatus rv=null; |
142 | switch(errCode) { |
143 | case ERR_EXISTS: |
144 | rv=EXISTS_STATUS; |
145 | break; |
146 | case ERR_NOT_FOUND: |
147 | rv=NOT_FOUND_STATUS; |
148 | break; |
149 | } |
150 | return rv; |
151 | } |
152 | |
153 | @Override |
154 | protected boolean opaqueIsValid() { |
155 | return responseOpaque == terminalOpaque |
156 | || callbacks.containsKey(responseOpaque); |
157 | } |
158 | |
159 | static class NoopCallback implements OperationCallback { |
160 | |
161 | public void complete() { |
162 | // noop |
163 | } |
164 | |
165 | public void receivedStatus(OperationStatus status) { |
166 | // noop |
167 | } |
168 | |
169 | } |
170 | |
171 | } |