/** * Process the Web request and (optionally) delegate to the next * {@code SoulPlugin} through the given {@link SoulPluginChain}. * * @param exchange the current server exchange * @param chain provides a way to delegate to the next plugin * @return {@code Mono<Void>} to indicate when request processing is complete */ @Override public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain){ String pluginName = named(); final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName); // 此处判断插件是否启用 if (pluginData != null && pluginData.getEnabled()) { final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName); if (CollectionUtils.isEmpty(selectors)) { return handleSelectorIsNull(pluginName, exchange, chain); } // 匹配选择器 final SelectorData selectorData = matchSelector(exchange, selectors); if (Objects.isNull(selectorData)) { return handleSelectorIsNull(pluginName, exchange, chain); } // 打印日志 selectorLog(selectorData, pluginName); final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId()); if (CollectionUtils.isEmpty(rules)) { return handleRuleIsNull(pluginName, exchange, chain); } // 匹配选择器规则 RuleData rule; if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) { //get last rule = rules.get(rules.size() - 1); } else { // 匹配规则 rule = matchRule(exchange, rules); } if (Objects.isNull(rule)) { return handleRuleIsNull(pluginName, exchange, chain); } // 日志 ruleLog(rule, pluginName); // 进入插件的处理方法 return doExecute(exchange, chain, selectorData, rule); } return chain.execute(exchange); }
// DubboResponsePlugin /** * Process the Web request and (optionally) delegate to the next * {@code WebFilter} through the given {@link SoulPluginChain}. * * @param exchange the current server exchange * @param chain provides a way to delegate to the next filter * @return {@code Mono<Void>} to indicate when request processing is complete */ @Override public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain){ return chain.execute(exchange).then(Mono.defer(() -> { // 从 exchange 中拿到结果 final Object result = exchange.getAttribute(Constants.DUBBO_RPC_RESULT); if (Objects.isNull(result)) { Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null); return WebFluxResultUtils.result(exchange, error); } Object success = SoulResultWrap.success(SoulResultEnum.SUCCESS.getCode(), SoulResultEnum.SUCCESS.getMsg(), JsonUtils.removeClass(result)); // 这个方法里使用了之前见过的 wirteWith() 方法返回响应给客户端 return WebFluxResultUtils.result(exchange, success); })); }