| 1 | package net.spy.memcached; |
| 2 | |
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.TreeMap;
| 10 | |
| 11 | import net.spy.memcached.compat.SpyObject; |
| 12 | import net.spy.memcached.util.DefaultKetamaNodeLocatorConfiguration; |
| 13 | import net.spy.memcached.util.KetamaNodeLocatorConfiguration; |
| 14 | |
| 15 | /** |
| 16 | * This is an implementation of the Ketama consistent hash strategy from |
| 17 | * last.fm. This implementation may not be compatible with libketama as |
| 18 | * hashing is considered separate from node location. |
| 19 | * |
| 20 | * Note that this implementation does not currently supported weighted nodes. |
| 21 | * |
| 22 | * @see <a href="http://www.last.fm/user/RJ/journal/2007/04/10/392555/">RJ's blog post</a> |
| 23 | */ |
| 24 | public final class KetamaNodeLocator extends SpyObject implements NodeLocator { |
| 25 | |
| 26 | |
| 27 | final SortedMap<Long, MemcachedNode> ketamaNodes; |
| 28 | final Collection<MemcachedNode> allNodes; |
| 29 | |
| 30 | final HashAlgorithm hashAlg; |
| 31 | final KetamaNodeLocatorConfiguration config; |
| 32 | |
| 33 | |
| 34 | public KetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg) { |
| 35 | this(nodes, alg, new DefaultKetamaNodeLocatorConfiguration()); |
| 36 | } |
| 37 | |
| 38 | public KetamaNodeLocator(List<MemcachedNode> nodes, HashAlgorithm alg, KetamaNodeLocatorConfiguration conf) { |
| 39 | super(); |
| 40 | allNodes = nodes; |
| 41 | hashAlg = alg; |
| 42 | ketamaNodes=new TreeMap<Long, MemcachedNode>(); |
| 43 | config= conf; |
| 44 | |
| 45 | int numReps= config.getNodeRepetitions(); |
| 46 | for(MemcachedNode node : nodes) { |
| 47 | // Ketama does some special work with md5 where it reuses chunks. |
| 48 | if(alg == HashAlgorithm.KETAMA_HASH) { |
| 49 | for(int i=0; i<numReps / 4; i++) { |
| 50 | byte[] digest=HashAlgorithm.computeMd5(config.getKeyForNode(node, i)); |
| 51 | for(int h=0;h<4;h++) { |
| 52 | Long k = ((long)(digest[3+h*4]&0xFF) << 24) |
| 53 | | ((long)(digest[2+h*4]&0xFF) << 16) |
| 54 | | ((long)(digest[1+h*4]&0xFF) << 8) |
| 55 | | (digest[h*4]&0xFF); |
| 56 | ketamaNodes.put(k, node); |
| 57 | } |
| 58 | |
| 59 | } |
| 60 | } else { |
| 61 | for(int i=0; i<numReps; i++) { |
| 62 | |
| 63 | ketamaNodes.put(hashAlg.hash(config.getKeyForNode(node, i)), node); |
| 64 | } |
| 65 | } |
| 66 | } |
| 67 | assert ketamaNodes.size() == numReps * nodes.size(); |
| 68 | } |
| 69 | |
| 70 | private KetamaNodeLocator(SortedMap<Long, MemcachedNode> smn, |
| 71 | Collection<MemcachedNode> an, HashAlgorithm alg, KetamaNodeLocatorConfiguration conf) { |
| 72 | super(); |
| 73 | ketamaNodes=smn; |
| 74 | allNodes=an; |
| 75 | hashAlg=alg; |
| 76 | config=conf; |
| 77 | } |
| 78 | |
| 79 | public Collection<MemcachedNode> getAll() { |
| 80 | return allNodes; |
| 81 | } |
| 82 | |
| 83 | public MemcachedNode getPrimary(final String k) { |
| 84 | MemcachedNode rv=getNodeForKey(hashAlg.hash(k)); |
| 85 | assert rv != null : "Found no node for key " + k; |
| 86 | return rv; |
| 87 | } |
| 88 | |
| 89 | long getMaxKey() { |
| 90 | return ketamaNodes.lastKey(); |
| 91 | } |
| 92 | |
| 93 | MemcachedNode getNodeForKey(long hash) { |
| 94 | final MemcachedNode rv; |
| 95 | if(!ketamaNodes.containsKey(hash)) { |
| 96 | // Java 1.6 adds a ceilingKey method, but I'm still stuck in 1.5 |
| 97 | // in a lot of places, so I'm doing this myself. |
| 98 | SortedMap<Long, MemcachedNode> tailMap=ketamaNodes.tailMap(hash); |
| 99 | if(tailMap.isEmpty()) { |
| 100 | hash=ketamaNodes.firstKey(); |
| 101 | } else { |
| 102 | hash=tailMap.firstKey(); |
| 103 | } |
| 104 | } |
| 105 | rv=ketamaNodes.get(hash); |
| 106 | return rv; |
| 107 | } |
| 108 | |
| 109 | public Iterator<MemcachedNode> getSequence(String k) { |
| 110 | return new KetamaIterator(k, allNodes.size()); |
| 111 | } |
| 112 | |
| 113 | public NodeLocator getReadonlyCopy() { |
| 114 | SortedMap<Long, MemcachedNode> smn=new TreeMap<Long, MemcachedNode>( |
| 115 | ketamaNodes); |
| 116 | Collection<MemcachedNode> an= |
| 117 | new ArrayList<MemcachedNode>(allNodes.size()); |
| 118 | |
| 119 | // Rewrite the values a copy of the map. |
| 120 | for(Map.Entry<Long, MemcachedNode> me : smn.entrySet()) { |
| 121 | me.setValue(new MemcachedNodeROImpl(me.getValue())); |
| 122 | } |
| 123 | // Copy the allNodes collection. |
| 124 | for(MemcachedNode n : allNodes) { |
| 125 | an.add(new MemcachedNodeROImpl(n)); |
| 126 | } |
| 127 | |
| 128 | return new KetamaNodeLocator(smn, an, hashAlg, config); |
| 129 | } |
| 130 | |
| 131 | class KetamaIterator implements Iterator<MemcachedNode> { |
| 132 | |
| 133 | final String key; |
| 134 | long hashVal; |
| 135 | int remainingTries; |
| 136 | int numTries=0; |
| 137 | |
| 138 | public KetamaIterator(final String k, final int t) { |
| 139 | super(); |
| 140 | hashVal=hashAlg.hash(k); |
| 141 | remainingTries=t; |
| 142 | key=k; |
| 143 | } |
| 144 | |
| 145 | private void nextHash() { |
| 146 | // this.calculateHash(Integer.toString(tries)+key).hashCode(); |
| 147 | long tmpKey=hashAlg.hash((numTries++) + key); |
| 148 | // This echos the implementation of Long.hashCode() |
| 149 | hashVal += (int)(tmpKey ^ (tmpKey >>> 32)); |
| 150 | hashVal &= 0xffffffffL; /* truncate to 32-bits */ |
| 151 | remainingTries--; |
| 152 | } |
| 153 | |
| 154 | public boolean hasNext() { |
| 155 | return remainingTries > 0; |
| 156 | } |
| 157 | |
| 158 | public MemcachedNode next() { |
| 159 | try { |
| 160 | return getNodeForKey(hashVal); |
| 161 | } finally { |
| 162 | nextHash(); |
| 163 | } |
| 164 | } |
| 165 | |
| 166 | public void remove() { |
| 167 | throw new UnsupportedOperationException("remove not supported"); |
| 168 | } |
| 169 | |
| 170 | } |
| 171 | } |