背景 业务背景 高权限用户要去统计低权限用户的会话存档数据。企微接口调用过多,耗时过长
排查思路 查看链路日志。当用户的企微外部联系人过多时,会重复调用企微会话存档是否授权查询接口【接口一次查询上限为100,企微外部联系人最高上限2w】。
解决方案 企微耗时和调用次数问题无法解决,只能通过多线程和缓存的方式解决
多线程 1 2 3 4 5 6 7 8 9 public class ExecutorUtils { private static ThreadFactory CHAT_ARCHIVE_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("ChatArchiveThread-%d" ).build(); public static final ExecutorService CHAT_ARCHIVE_POOL = new ThreadPoolExecutor( 20 , 20 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024 ), CHAT_ARCHIVE_THREAD_FACTORY, new ThreadPoolExecutor.AbortPolicy()); public static final ForkJoinPool PARALLEL_STREAM_POOL = new ForkJoinPool(20 ); }
CompletableFuture多线程
1 2 3 4 5 6 7 8 9 10 11 12 13 private FriendNumDto friendNum (String corpId, String userId) { CompletableFuture<List<ChatArchiveGetAgreeStatusResDTO>> future7 = future6.thenCompose(result -> CompletableFuture.supplyAsync(() -> { return this .chatGetAgreeStatusBatch(corpId, externalUserIdList.stream().map(externalUserId -> { AgreeStatusItemDTO item = new AgreeStatusItemDTO(); item.setExternalUserId(externalUserId); item.setOpenUserId(userId); return item; }).collect(Collectors.toList())); }, ExecutorUtils.CHAT_ARCHIVE_POOL)); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Override public List<ChatArchiveGetAgreeStatusResDTO> chatGetAgreeStatusBatch (String corpId, List<AgreeStatusItemDTO> items) { List<List<AgreeStatusItemDTO>> batches = new ArrayList<>(); int batchSize = 100 ; for (int i = 0 ; i < items.size(); i += batchSize) { int end = Math.min(items.size(), i + batchSize); batches.add(items.subList(i, end)); } List<CompletableFuture<ChatArchiveGetAgreeStatusResDTO>> futures = new ArrayList<>(); for (List<AgreeStatusItemDTO> item : batches) { futures.add(CompletableFuture.supplyAsync(() -> this .chatGetAgreeStatus(corpId, item), ExecutorUtils.CHAT_ARCHIVE_POOL)); } List<ChatArchiveGetAgreeStatusResDTO> result = new ArrayList<>(); for (CompletableFuture<ChatArchiveGetAgreeStatusResDTO> future : futures) { try { ChatArchiveGetAgreeStatusResDTO chatArchiveGetAgreeStatusResDTO = future.get(); result.add(chatArchiveGetAgreeStatusResDTO); } catch (Exception e) { ChatArchiveGetAgreeStatusResDTO failover = new ChatArchiveGetAgreeStatusResDTO(); failover.setAgreeinfo(new ArrayList<>()); result.add(failover); } } return result; }
lambda stream多线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 CountDownLatch count = new CountDownLatch(items.size()); ExecutorUtils.PARALLEL_STREAM_POOL.submit(() -> items.parallelStream().forEach(item -> { try { } finally { count.countDown(); } })); try { count.await(); } catch (Exception e) { throw new BizException(ErrorEnum.SYSTEM_ERROR); }
缓存设计见之前的文章 统计数据跟产品沟通后,可以不实时计算,所以增加缓存。加速用户接口返回https://huangbangjing.cn/2024/06/06/%E5%9F%BA%E4%BA%8ESpringAop%E4%B8%8ESpEL%E8%A1%A8%E8%BE%BE%E5%BC%8F%E5%AE%9E%E7%8E%B0%E7%BC%93%E5%AD%98%E9%80%BB%E8%BE%91/
定时任务+mq定时计算 实际上经过以上操作,在数据量大的场景下,接口需要8s+【优化前分钟级】,对用户体验仍然不友好 思路: 定时任务筛选出需要计算的企微企业,以企业的维度分发计算任务【mq生产者】 消息队列消费者异步计算企微企业会话存档数据,更新缓存