背景 业务背景 高权限用户要去统计低权限用户的会话存档数据。企微接口调用过多,耗时过长
排查思路 查看链路日志。当用户的企微外部联系人过多时,会重复调用企微会话存档是否授权查询接口【接口一次查询上限为100,企微外部联系人最高上限2w】。
解决方案 企微耗时和调用次数问题无法解决,只能通过多线程和缓存的方式解决
多线程 1 2 3 4 5 6 7 8 9 public  class  ExecutorUtils   {    private  static  ThreadFactory CHAT_ARCHIVE_THREAD_FACTORY = new  ThreadFactoryBuilder().setNameFormat("ChatArchiveThread-%d" ).build();     public  static  final  ExecutorService CHAT_ARCHIVE_POOL = new  ThreadPoolExecutor(             20 , 20 , 0L , TimeUnit.MILLISECONDS,             new  LinkedBlockingQueue<>(1024 ), CHAT_ARCHIVE_THREAD_FACTORY, new  ThreadPoolExecutor.AbortPolicy());     public  static  final  ForkJoinPool PARALLEL_STREAM_POOL = new  ForkJoinPool(20 ); } 
 
CompletableFuture多线程
1 2 3 4 5 6 7 8 9 10 11 12 13 private  FriendNumDto friendNum (String corpId, String userId)   {              CompletableFuture<List<ChatArchiveGetAgreeStatusResDTO>> future7 = future6.thenCompose(result -> CompletableFuture.supplyAsync(() -> {         return  this .chatGetAgreeStatusBatch(corpId, externalUserIdList.stream().map(externalUserId -> {             AgreeStatusItemDTO item = new  AgreeStatusItemDTO();             item.setExternalUserId(externalUserId);             item.setOpenUserId(userId);             return  item;         }).collect(Collectors.toList()));     }, ExecutorUtils.CHAT_ARCHIVE_POOL));      } 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Override public  List<ChatArchiveGetAgreeStatusResDTO> chatGetAgreeStatusBatch (String corpId, List<AgreeStatusItemDTO> items)   {    List<List<AgreeStatusItemDTO>> batches = new  ArrayList<>();          int  batchSize = 100 ;     for  (int  i = 0 ; i < items.size(); i += batchSize) {         int  end = Math.min(items.size(), i + batchSize);         batches.add(items.subList(i, end));     }     List<CompletableFuture<ChatArchiveGetAgreeStatusResDTO>> futures = new  ArrayList<>();     for  (List<AgreeStatusItemDTO> item : batches) {                  futures.add(CompletableFuture.supplyAsync(() -> this .chatGetAgreeStatus(corpId, item), ExecutorUtils.CHAT_ARCHIVE_POOL));     }     List<ChatArchiveGetAgreeStatusResDTO> result = new  ArrayList<>();     for  (CompletableFuture<ChatArchiveGetAgreeStatusResDTO> future : futures) {         try  {             ChatArchiveGetAgreeStatusResDTO chatArchiveGetAgreeStatusResDTO = future.get();             result.add(chatArchiveGetAgreeStatusResDTO);         } catch  (Exception e) {             ChatArchiveGetAgreeStatusResDTO failover = new  ChatArchiveGetAgreeStatusResDTO();             failover.setAgreeinfo(new  ArrayList<>());             result.add(failover);         }     }     return  result; } 
 
lambda stream多线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 CountDownLatch count = new  CountDownLatch(items.size()); ExecutorUtils.PARALLEL_STREAM_POOL.submit(() -> items.parallelStream().forEach(item -> {     try  {              } finally  {         count.countDown();     } })); try  {    count.await(); } catch  (Exception e) {     throw  new  BizException(ErrorEnum.SYSTEM_ERROR); } 
 
缓存设计见之前的文章 统计数据跟产品沟通后,可以不实时计算,所以增加缓存。加速用户接口返回https://huangbangjing.cn/2024/06/06/%E5%9F%BA%E4%BA%8ESpringAop%E4%B8%8ESpEL%E8%A1%A8%E8%BE%BE%E5%BC%8F%E5%AE%9E%E7%8E%B0%E7%BC%93%E5%AD%98%E9%80%BB%E8%BE%91/ 
定时任务+mq定时计算 实际上经过以上操作,在数据量大的场景下,接口需要8s+【优化前分钟级】,对用户体验仍然不友好 思路: 定时任务筛选出需要计算的企微企业,以企业的维度分发计算任务【mq生产者】 消息队列消费者异步计算企微企业会话存档数据,更新缓存