Redis Cluster 批量操作实现

前言

之前发过一篇RedisCluster构建批量操作探讨,这篇文章针对其中第2种方案,做一次实现。在使用redis cluster 过程中,经常会需要用到批量操作,本次探讨因此产生。

实现

原理:将key分批计算所在node,然后在单个node上执行pipeline,即可完成批量操作

key 槽计算

因为redis cluster 设计原理,在处理key时,会将 CRC16(key) mod 16384 散列的分布在集群的node 上,因此,我们需要计算该key所在slot.(利用jedis中自带的算法)

1
int slot = JedisClusterCRC16.getSlot(key);

node及slot获取

同样基于jedis(2.8.1),核心用法如下:

1
2
Jedis jedisNode = new Jedis(anyHostAndPort.getHost(), anyHostAndPort.getPort());
List<Object> list = jedisNode.clusterSlots();

该方法获取到整个集群中slot在node节点的分布。例如:0,5000,node.
通过以上方法整合,即可实现批量操作。

整合案例

以下是我自己写的一个set类型读取的批量操作的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123

private static Map<String, JedisPool> nodeMap = jedis.getClusterNodes();

private static TreeMap<Long, String> slotHostMap = getSlotHostMap(redisConf[0]);
/**
* 将key按slort分批整理
* @param keys
* @return
*/
private static Map<JedisPool, List<String>> getPoolKeyMap(List<String> keys) {
Map<JedisPool, List<String>> poolKeysMap = new LinkedHashMap<JedisPool, List<String>>();
try {
for (String key : keys) {

int slot = JedisClusterCRC16.getSlot(key);

//获取到对应的Jedis对象,此处+1解决临界问题
Map.Entry<Long, String> entry = slotHostMap.lowerEntry(Long.valueOf(slot+1));

JedisPool jedisPool = nodeMap.get(entry.getValue());

if (poolKeysMap.containsKey(jedisPool)) {
poolKeysMap.get(jedisPool).add(key);
} else {
List<String> subKeyList = new ArrayList<String>();
subKeyList.add(key);
poolKeysMap.put(jedisPool, subKeyList);
}
}
} catch (Exception e) {
e.getMessage();
}
return poolKeysMap;
}

/**
* slort对应node
* @param anyHostAndPortStr
* @return
*/
private static TreeMap<Long, String> getSlotHostMap(String anyHostAndPortStr) {
TreeMap<Long, String> tree = new TreeMap<Long, String>();
String parts[] = anyHostAndPortStr.split(":");
HostAndPort anyHostAndPort = new HostAndPort(parts[0], Integer.parseInt(parts[1]));
try {
Jedis jedisNode = new Jedis(anyHostAndPort.getHost(), anyHostAndPort.getPort());
List<Object> list = jedisNode.clusterSlots();
for (Object object : list) {
List<Object> list1 = (List<Object>) object;
List<Object> master = (List<Object>) list1.get(2);
String hostAndPort = new String((byte[]) master.get(0)) + ":" + master.get(1);
tree.put((Long) list1.get(0), hostAndPort);
tree.put((Long) list1.get(1), hostAndPort);
}
jedisNode.close();
} catch (Exception e) {

}
return tree;
}

/**
* 批量获取set类型数据
* @param keys
* @return
*/
public static Map<String, Set<String>> batchGetSetData(List<String> keys) {
if (keys == null || keys.isEmpty()) {
return null;
}
Map<JedisPool, List<String>> poolKeysMap = getPoolKeyMap(keys);
Map<String, Set<String>> resultMap = new HashMap<String, Set<String>>();
for (Map.Entry<JedisPool, List<String>> entry : poolKeysMap.entrySet()) {
JedisPool jedisPool = entry.getKey();
List<String> subkeys = entry.getValue();
if (subkeys == null || subkeys.isEmpty()) {
continue;
}
//申请jedis对象
Jedis jedis = null;
Pipeline pipeline = null;
List<Object> subResultList = null;
try {
jedis = jedisPool.getResource();
pipeline = jedis.pipelined();

for (String key : subkeys) {
pipeline.smembers(key);
}

subResultList = pipeline.syncAndReturnAll();
} catch (JedisConnectionException e) {
e.getMessage();
} catch (Exception e) {
e.getMessage();
} finally {
if (pipeline != null)
try {
pipeline.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//释放jedis对象
if (jedis != null) {
jedis.close();
}
}
if (subResultList == null || subResultList.isEmpty()) {
continue;
}
if (subResultList.size() == subkeys.size()) {
for (int i = 0; i < subkeys.size(); i++) {
String key = subkeys.get(i);
Object result = subResultList.get(i);
resultMap.put(key, (Set<String>) result);
}
} else {
System.out.println("redis cluster pipeline error!");
}
}
return resultMap;
}

参考

[1] cachecloud client中实现