1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
   | private <T, R> RFuture<R> executeBatchedAsync(boolean readOnly, Codec codec, RedisCommand<T> command, SlotCallback<T, R> callback, String... keys) {          if (!connectionManager.isClusterMode()) {         if (readOnly) {             return readAsync((String) null, codec, command, keys);         }         return writeAsync((String) null, codec, command, keys);     }
      Map<MasterSlaveEntry, List<String>> range2key = new HashMap<>();     for (String key : keys) {         int slot = connectionManager.calcSlot(key);         MasterSlaveEntry entry = connectionManager.getEntry(slot);                  List<String> list = range2key.computeIfAbsent(entry, k -> new ArrayList<>());         list.add(key);     }
      RPromise<R> result = new RedissonPromise<>();     AtomicLong executed = new AtomicLong(keys.length);     AtomicReference<Throwable> failed = new AtomicReference<>();     BiConsumer<T, Throwable> listener = (res, ex) -> {         if (ex != null) {             failed.set(ex);         } else {             if (res != null) {                 callback.onSlotResult(res);             }         }
          if (executed.decrementAndGet() == 0) {             if (failed.get() != null) {                 result.tryFailure(failed.get());             } else {                 result.trySuccess(callback.onFinish());             }         }     };
      for (Entry<MasterSlaveEntry, List<String>> entry : range2key.entrySet()) {                  CommandBatchService executorService;         if (this instanceof CommandBatchService) {             executorService = (CommandBatchService) this;         } else {             executorService = new CommandBatchService(connectionManager);         }
          for (String key : entry.getValue()) {             RedisCommand<T> c = command;             RedisCommand<T> newCommand = callback.createCommand(key);             if (newCommand != null) {                 c = newCommand;             }                          if (readOnly) {                 RFuture<T> f = executorService.readAsync(entry.getKey(), codec, c, key);                 f.onComplete(listener);             } else {                 RFuture<T> f = executorService.writeAsync(entry.getKey(), codec, c, key);                 f.onComplete(listener);             }         }
          if (!(this instanceof CommandBatchService)) {             executorService.executeAsync();         }     }
      return result; }
 
 
  |