批量删除Redis集群的数据
大黄 Lv4

问题:Redis集群存在多节点问题,不同的数据存放在不同的节点上,所以我们无法通过一次操作同时处理分布在多个节点(hash槽)里的数据。
下面分享几个常见客户端的解决方案。它们的思路大同小异,都是逐个操作每个键。

Redisson

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
 * Runs a multi-key command, splitting it into per-key commands when the connection manager is
 * in cluster mode.
 *
 * <p>Outside cluster mode the command is sent once with all keys. In cluster mode the keys are
 * grouped by the {@code MasterSlaveEntry} that owns each key's slot, and one command per key is
 * queued on a {@code CommandBatchService} for that entry. Each non-null per-key result is handed
 * to {@code callback.onSlotResult}; once every key has been answered the returned future is
 * completed with {@code callback.onFinish()}, or failed with the most recently recorded error.
 *
 * @param readOnly whether the per-key commands are issued through the read path or the write path
 * @param codec codec passed through to the underlying read/write calls
 * @param command default command to run; used for a key when {@code callback.createCommand(key)}
 *        returns {@code null}
 * @param callback supplies optional per-key commands, collects per-key results and produces the
 *        final value
 * @param keys keys to operate on
 * @return future completed with the aggregated result
 */
private <T, R> RFuture<R> executeBatchedAsync(boolean readOnly, Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, String... keys) {
    // Not in cluster mode: a single multi-key command is enough.
    if (!connectionManager.isClusterMode()) {
        if (readOnly) {
            return readAsync((String) null, codec, command, keys);
        }
        return writeAsync((String) null, codec, command, keys);
    }

    RPromise<R> result = new RedissonPromise<>();

    // With no keys no listener would ever fire and the countdown below would never reach zero,
    // leaving the promise pending forever. Complete it right away with the callback's final value.
    if (keys.length == 0) {
        result.trySuccess(callback.onFinish());
        return result;
    }

    // Group the keys by the cluster entry that owns their slot, so each entry is handled in turn.
    Map<MasterSlaveEntry, List<String>> range2key = new HashMap<>();
    for (String key : keys) {
        int slot = connectionManager.calcSlot(key);
        MasterSlaveEntry entry = connectionManager.getEntry(slot);
        List<String> list = range2key.computeIfAbsent(entry, k -> new ArrayList<>());
        list.add(key);
    }

    // One listener invocation is expected per key, hence the counter starts at keys.length.
    AtomicLong executed = new AtomicLong(keys.length);
    AtomicReference<Throwable> failed = new AtomicReference<>();
    BiConsumer<T, Throwable> listener = (res, ex) -> {
        if (ex != null) {
            // A later failure overwrites an earlier one; only the last is reported.
            failed.set(ex);
        } else {
            if (res != null) {
                callback.onSlotResult(res);
            }
        }

        // The last completed command settles the overall promise.
        if (executed.decrementAndGet() == 0) {
            if (failed.get() != null) {
                result.tryFailure(failed.get());
            } else {
                result.trySuccess(callback.onFinish());
            }
        }
    };

    for (Entry<MasterSlaveEntry, List<String>> entry : range2key.entrySet()) {
        // executes in batch due to CROSSLOT error
        CommandBatchService executorService;
        if (this instanceof CommandBatchService) {
            executorService = (CommandBatchService) this;
        } else {
            executorService = new CommandBatchService(connectionManager);
        }

        for (String key : entry.getValue()) {
            // Prefer a key-specific command from the callback; otherwise use the default one.
            RedisCommand<T> c = command;
            RedisCommand<T> newCommand = callback.createCommand(key);
            if (newCommand != null) {
                c = newCommand;
            }
            // Issue the command for this single key against the entry that owns it.
            if (readOnly) {
                RFuture<T> f = executorService.readAsync(entry.getKey(), codec, c, key);
                f.onComplete(listener);
            } else {
                RFuture<T> f = executorService.writeAsync(entry.getKey(), codec, c, key);
                f.onComplete(listener);
            }
        }

        // When this object is itself the batch, it is not executed here; only a batch created
        // above is executed by this method.
        if (!(this instanceof CommandBatchService)) {
            executorService.executeAsync();
        }
    }

    return result;
}


SpringBoot原生Lettuce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Deletes the given keys through the Lettuce connection.
 *
 * <p>In pipelined or queueing (transaction) mode the asynchronous command is registered and
 * {@code null} is returned; otherwise the synchronous result of the delete is returned.
 *
 * @param keys keys to delete; must not be {@code null} nor contain {@code null} elements
 * @return the value returned by the synchronous delete, or {@code null} when pipelined/queued
 */
@Override
public Long del(byte[]... keys) {

    Assert.notNull(keys, "Keys must not be null!");
    Assert.noNullElements(keys, "Keys must not contain null elements!");

    try {
        Long removed = null;
        if (isPipelined()) {
            // Pipelined: register the async command; the outcome is collected later.
            pipeline(connection.newLettuceResult(getAsyncConnection().del(keys)));
        } else if (isQueueing()) {
            // Inside a transaction: register the async command with the transaction.
            transaction(connection.newLettuceResult(getAsyncConnection().del(keys)));
        } else {
            // Plain mode: run synchronously and hand back the reply.
            removed = getConnection().del(keys);
        }
        return removed;
    } catch (Exception ex) {
        throw convertLettuceAccessException(ex);
    }
}
1
2
3
4
5
6
7
/**
 * Delete one or more keys with pipelining. Cross-slot keys will result in multiple calls to the particular cluster nodes.
 *
 * @param keys the keys to delete.
 * @return RedisFuture&lt;Long&gt; integer-reply The number of keys that were removed.
 */
RedisFuture<Long> del(K... keys);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Deletes the given keys, issuing one delete per slot partition when the keys span more than one
 * partition.
 *
 * @param keys the keys to delete
 * @return future produced by the superclass delete when there are fewer than two partitions,
 *         otherwise the aggregation of the per-partition futures
 */
@Override
public RedisFuture<Long> del(Iterable<K> keys) {

    // Partition the keys by hash slot to find out how many partitions are involved.
    Map<Integer, List<K>> keysBySlot = SlotHash.partition(codec, keys);

    // Zero or one partition: a single delete call is sufficient.
    if (keysBySlot.size() <= 1) {
        return super.del(keys);
    }

    // Several partitions: delete each partition's keys separately and remember the futures.
    Map<Integer, RedisFuture<Long>> pending = new HashMap<>();
    for (Map.Entry<Integer, List<K>> slotAndKeys : keysBySlot.entrySet()) {
        Integer slot = slotAndKeys.getKey();
        List<K> slotKeys = slotAndKeys.getValue();
        pending.put(slot, super.del(slotKeys));
    }

    return MultiNodeExecution.aggregateAsync(pending);
}

Jedis

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Deletes the given keys on a Redis cluster via Jedis.
 *
 * <p>If all keys map to the same slot, a single multi-key delete is sent. Otherwise a delete is
 * executed per key through the cluster command executor and the per-key replies are summed, so
 * the returned value is the number of keys actually removed rather than the number of commands
 * that were executed.
 *
 * @param keys keys to delete; must not be {@code null} nor contain {@code null} elements
 * @return the number of keys that were removed
 */
@Override
public Long del(byte[]... keys) {

    Assert.notNull(keys, "Keys must not be null!");
    Assert.noNullElements(keys, "Keys must not contain null elements!");

    // All keys live in the same slot: one multi-key command is possible.
    if (ClusterSlotHashUtil.isSameSlotForAllKeys(keys)) {
        try {
            return connection.getCluster().del(keys);
        } catch (Exception ex) {
            throw convertJedisAccessException(ex);
        }
    }

    // Keys span several slots: delete them one by one.
    List<Long> replies = connection.getClusterCommandExecutor()
            .executeMultiKeyCommand((JedisMultiKeyClusterCommandCallback<Long>) (client, key) -> client.del(key),
                    Arrays.asList(keys))
            .resultsAsList();

    // Each reply is the count removed by one single-key delete; add them up instead of counting
    // replies, which would report missing keys as deleted.
    long removed = 0L;
    for (Long reply : replies) {
        if (reply != null) {
            removed += reply;
        }
    }
    return removed;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Tells whether every given key maps to the same slot.
 *
 * @param keys keys to inspect; must not be {@code null}
 * @return {@code true} if there are fewer than two keys or all keys yield the same value from
 *         {@code calculateSlot}; {@code false} as soon as one key differs from the first
 */
public static boolean isSameSlotForAllKeys(byte[]... keys) {

    Assert.notNull(keys, "Keys must not be null!");

    // Zero or one key trivially share a slot.
    if (keys.length < 2) {
        return true;
    }

    // Use the first key's slot as the reference and compare the remaining keys against it.
    final int referenceSlot = calculateSlot(keys[0]);
    int index = 1;
    while (index < keys.length) {
        if (calculateSlot(keys[index]) != referenceSlot) {
            return false;
        }
        index++;
    }
    return true;
}
  • Post title:批量删除Redis集群的数据
  • Post author:大黄
  • Create time:2023-10-26 14:52:03
  • Post link:https://huangbangjing.cn/2023/10/26/批量删除Redis集群的数据/
  • Copyright Notice:All articles in this blog are licensed under BY-NC-SA unless stating additionally.