1 | package net.spy.memcached.protocol.binary; |
2 | |
3 | import java.net.SocketAddress; |
4 | import java.nio.channels.SocketChannel; |
5 | import java.util.concurrent.BlockingQueue; |
6 | |
7 | import net.spy.memcached.ops.CASOperation; |
8 | import net.spy.memcached.ops.GetOperation; |
9 | import net.spy.memcached.ops.Operation; |
10 | import net.spy.memcached.ops.OperationState; |
11 | import net.spy.memcached.ops.StoreOperation; |
12 | import net.spy.memcached.protocol.ProxyCallback; |
13 | import net.spy.memcached.protocol.TCPMemcachedNodeImpl; |
14 | |
15 | /** |
16 | * Implementation of MemcachedNode for speakers of the binary protocol. |
17 | */ |
18 | public class BinaryMemcachedNodeImpl extends TCPMemcachedNodeImpl { |
19 | |
20 | private final int MAX_SET_OPTIMIZATION_COUNT = 65535; |
21 | private final int MAX_SET_OPTIMIZATION_BYTES = 2 * 1024 * 1024; |
22 | |
23 | public BinaryMemcachedNodeImpl(SocketAddress sa, SocketChannel c, |
24 | int bufSize, BlockingQueue<Operation> rq, |
25 | BlockingQueue<Operation> wq, BlockingQueue<Operation> iq) { |
26 | super(sa, c, bufSize, rq, wq, iq); |
27 | } |
28 | |
29 | @Override |
30 | protected void optimize() { |
31 | Operation firstOp = writeQ.peek(); |
32 | if(firstOp instanceof GetOperation) { |
33 | optimizeGets(); |
34 | } else if(firstOp instanceof CASOperation) { |
35 | optimizeSets(); |
36 | } |
37 | } |
38 | |
39 | private void optimizeGets() { |
40 | // make sure there are at least two get operations in a row before |
41 | // attempting to optimize them. |
42 | optimizedOp=writeQ.remove(); |
43 | if(writeQ.peek() instanceof GetOperation) { |
44 | OptimizedGetImpl og=new OptimizedGetImpl( |
45 | (GetOperation)optimizedOp); |
46 | optimizedOp=og; |
47 | |
48 | while(writeQ.peek() instanceof GetOperation) { |
49 | GetOperation o=(GetOperation) writeQ.remove(); |
50 | if(!o.isCancelled()) { |
51 | og.addOperation(o); |
52 | } |
53 | } |
54 | |
55 | // Initialize the new mega get |
56 | optimizedOp.initialize(); |
57 | assert optimizedOp.getState() == OperationState.WRITING; |
58 | ProxyCallback pcb=(ProxyCallback) og.getCallback(); |
59 | getLogger().debug("Set up %s with %s keys and %s callbacks", |
60 | this, pcb.numKeys(), pcb.numCallbacks()); |
61 | } |
62 | } |
63 | |
64 | private void optimizeSets() { |
65 | // make sure there are at least two get operations in a row before |
66 | // attempting to optimize them. |
67 | optimizedOp=writeQ.remove(); |
68 | if(writeQ.peek() instanceof CASOperation) { |
69 | OptimizedSetImpl og=new OptimizedSetImpl( |
70 | (CASOperation)optimizedOp); |
71 | optimizedOp=og; |
72 | |
73 | while(writeQ.peek() instanceof StoreOperation |
74 | && og.size() < MAX_SET_OPTIMIZATION_COUNT |
75 | && og.bytes() < MAX_SET_OPTIMIZATION_BYTES) { |
76 | CASOperation o=(CASOperation) writeQ.remove(); |
77 | if(!o.isCancelled()) { |
78 | og.addOperation(o); |
79 | } |
80 | } |
81 | |
82 | // Initialize the new mega set |
83 | optimizedOp.initialize(); |
84 | assert optimizedOp.getState() == OperationState.WRITING; |
85 | } |
86 | } |
87 | } |