Resilience4j @Bulkhead Pitfalls, Part 2
I changed the @Bulkhead annotation to add type = Type.THREADPOOL, and found that the program could no longer enter the annotated method:
    @CircuitBreaker(name = "licenseService", fallbackMethod = "buildFallbackLicenseList")
    @Bulkhead(name = "bulkheadLicenseService", fallbackMethod = "buildFallbackLicenseList", type = Type.THREADPOOL)
    @Retry(name = "retryLicenseService", fallbackMethod = "buildFallbackLicenseList")
    @RateLimiter(name = "licenseService", fallbackMethod = "buildFallbackLicenseList")
    public List<License> getLicensesByOrganization(String organizationId) {
        LOGGER.info("find all licenses belong to {}", organizationId);
        LicenseExample example = new LicenseExample();
        example.createCriteria().andOrganizationIdEqualTo(organizationId);
        return licenseMapper.selectByExample(example);
    }
Debugging showed that the class io.github.resilience4j.spring6.bulkhead.configure.BulkheadAspect checks the type attribute of @Bulkhead:
    @Around(value = "matchAnnotatedClassOrMethod(bulkheadAnnotation)", argNames = "proceedingJoinPoint, bulkheadAnnotation")
    public Object bulkheadAroundAdvice(ProceedingJoinPoint proceedingJoinPoint,
                                       @Nullable Bulkhead bulkheadAnnotation) throws Throwable {
        Method method = ((MethodSignature) proceedingJoinPoint.getSignature()).getMethod();
        String methodName = method.getDeclaringClass().getName() + "#" + method.getName();
        if (bulkheadAnnotation == null) {
            bulkheadAnnotation = getBulkheadAnnotation(proceedingJoinPoint);
        }
        if (bulkheadAnnotation == null) { //because annotations wasn't found
            return proceedingJoinPoint.proceed();
        }
        Class<?> returnType = method.getReturnType();
        String backend = spelResolver.resolve(method, proceedingJoinPoint.getArgs(), bulkheadAnnotation.name());
        if (bulkheadAnnotation.type() == Bulkhead.Type.THREADPOOL) {
            final CheckedSupplier<Object> bulkheadExecution =
                () -> proceedInThreadPoolBulkhead(proceedingJoinPoint, methodName, returnType, backend);
            return fallbackExecutor.execute(proceedingJoinPoint, method, bulkheadAnnotation.fallbackMethod(), bulkheadExecution);
        } else {
            io.github.resilience4j.bulkhead.Bulkhead bulkhead = getOrCreateBulkhead(methodName, backend);
            final CheckedSupplier<Object> bulkheadExecution =
                () -> proceed(proceedingJoinPoint, methodName, bulkhead, returnType);
            return fallbackExecutor.execute(proceedingJoinPoint, method, bulkheadAnnotation.fallbackMethod(), bulkheadExecution);
        }
    }
bulkheadAroundAdvice uses an if-else to handle each type differently. When type = Type.THREADPOOL, it calls proceedInThreadPoolBulkhead:
    private Object proceedInThreadPoolBulkhead(ProceedingJoinPoint proceedingJoinPoint,
                                               String methodName, Class<?> returnType, String backend) throws Throwable {
        if (logger.isDebugEnabled()) {
            logger.debug("ThreadPool bulkhead invocation for method {} in backend {}", methodName, backend);
        }
        ThreadPoolBulkhead threadPoolBulkhead = threadPoolBulkheadRegistry.bulkhead(backend);
        if (CompletionStage.class.isAssignableFrom(returnType)) {
            // threadPoolBulkhead.executeSupplier throws a BulkheadFullException, if the Bulkhead is full.
            // The RuntimeException is converted into an exceptionally completed future
            try {
                return threadPoolBulkhead.executeCallable(() -> {
                    try {
                        return ((CompletionStage<?>) proceedingJoinPoint.proceed()).toCompletableFuture().get();
                    } catch (ExecutionException e) {
                        throw new CompletionException(e.getCause());
                    } catch (InterruptedException | CancellationException e) {
                        throw e;
                    } catch (Throwable e) {
                        throw new CompletionException(e);
                    }
                });
            } catch (BulkheadFullException ex) {
                CompletableFuture<?> future = new CompletableFuture<>();
                future.completeExceptionally(ex);
                return future;
            }
        } else {
            throw new IllegalStateException("ThreadPool bulkhead is only applicable for completable futures ");
        }
    }
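proceedInThreadPoolBulkhead checks the method's return type. If it is CompletionStage or one of its subtypes (for example CompletableFuture), the call is handled; otherwise the method throws IllegalStateException.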
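The return-type check can be reproduced outside of Spring with plain JDK reflection. The following is a minimal sketch, not the library's code: ReturnTypeCheckDemo, SampleService, and supports are hypothetical names introduced only for illustration, and only the isAssignableFrom expression is taken from the aspect shown above.

```java
import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class ReturnTypeCheckDemo {

    // Hypothetical stand-in for a service with one synchronous and one asynchronous method.
    public static class SampleService {
        public List<String> syncLicenses(String organizationId) {
            return List.of();
        }

        public CompletableFuture<List<String>> asyncLicenses(String organizationId) {
            return CompletableFuture.completedFuture(List.of());
        }
    }

    // Same condition the aspect applies to the declared return type.
    public static boolean supports(Class<?> returnType) {
        return CompletionStage.class.isAssignableFrom(returnType);
    }

    public static void main(String[] args) throws NoSuchMethodException {
        Method sync = SampleService.class.getMethod("syncLicenses", String.class);
        Method async = SampleService.class.getMethod("asyncLicenses", String.class);

        System.out.println(supports(sync.getReturnType()));  // false: List is not a CompletionStage
        System.out.println(supports(async.getReturnType())); // true: CompletableFuture implements CompletionStage
    }
}
```

Note that the check is made against the declared return type of the method, so it fails before the method body ever runs.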
Because our method's return type is List<License>, execution falls into the else branch and throws IllegalStateException.
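Given that check, one way to satisfy it would be to declare a CompletionStage-compatible return type. The post itself does not state this fix, so treat the following as a sketch under assumptions: AsyncLicenseSketch and findLicenses are hypothetical stand-ins (String replaces License, and the mapper query is faked), and the Resilience4j annotations are left out so that the example compiles with the JDK alone.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AsyncLicenseSketch {

    // Hypothetical stand-in for the blocking licenseMapper.selectByExample(...) query.
    static List<String> findLicenses(String organizationId) {
        return List.of("license-of-" + organizationId);
    }

    // In the real service this method would carry @Bulkhead(type = Type.THREADPOOL).
    // The declared return type is now a CompletableFuture, so the aspect's
    // CompletionStage.class.isAssignableFrom(returnType) check would pass.
    public static CompletableFuture<List<String>> getLicensesByOrganization(String organizationId) {
        return CompletableFuture.completedFuture(findLicenses(organizationId));
    }

    public static void main(String[] args) {
        List<String> licenses = getLicensesByOrganization("org-1").join();
        System.out.println(licenses); // prints [license-of-org-1]
    }
}
```

With this shape, callers and any fallback method would presumably also need to work with CompletableFuture<List<License>> rather than List<License>.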