1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| private <T, R> RFuture<R> executeBatchedAsync(boolean readOnly, Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, String... keys) { if (!connectionManager.isClusterMode()) { if (readOnly) { return readAsync((String) null, codec, command, keys); } return writeAsync((String) null, codec, command, keys); }
Map<MasterSlaveEntry, List<String>> range2key = new HashMap<>(); for (String key : keys) { int slot = connectionManager.calcSlot(key); MasterSlaveEntry entry = connectionManager.getEntry(slot); List<String> list = range2key.computeIfAbsent(entry, k -> new ArrayList<>()); list.add(key); }
RPromise<R> result = new RedissonPromise<>(); AtomicLong executed = new AtomicLong(keys.length); AtomicReference<Throwable> failed = new AtomicReference<>(); BiConsumer<T, Throwable> listener = (res, ex) -> { if (ex != null) { failed.set(ex); } else { if (res != null) { callback.onSlotResult(res); } }
if (executed.decrementAndGet() == 0) { if (failed.get() != null) { result.tryFailure(failed.get()); } else { result.trySuccess(callback.onFinish()); } } };
for (Entry<MasterSlaveEntry, List<String>> entry : range2key.entrySet()) { CommandBatchService executorService; if (this instanceof CommandBatchService) { executorService = (CommandBatchService) this; } else { executorService = new CommandBatchService(connectionManager); }
for (String key : entry.getValue()) { RedisCommand<T> c = command; RedisCommand<T> newCommand = callback.createCommand(key); if (newCommand != null) { c = newCommand; } if (readOnly) { RFuture<T> f = executorService.readAsync(entry.getKey(), codec, c, key); f.onComplete(listener); } else { RFuture<T> f = executorService.writeAsync(entry.getKey(), codec, c, key); f.onComplete(listener); } }
if (!(this instanceof CommandBatchService)) { executorService.executeAsync(); } }
return result; }
|