上一篇:dubbo 3.0服务调用者调用第三方接口源码主流程分析
前言
Dubbo路由机制的主要目的是:在服务调用时,根据路由规则从已知的所有服务提供者中筛选出符合条件的服务提供者。
Condition条件路由规则
/**
 * State router driven by a textual condition rule of the form
 * {@code <when part> => <then part>}.
 *
 * <p>The part before {@code =>} is parsed into {@link #whenCondition} and is evaluated
 * against the URL passed to {@code doRoute}; the part after it is parsed into
 * {@link #thenCondition} and is evaluated against each invoker's URL. Invokers whose URL
 * does not satisfy the then-part are removed from the routing result.
 *
 * @param <T> service type of the routed invokers
 */
public class ConditionStateRouter<T> extends AbstractStateRouter<T> {
    public static final String NAME = "condition";

    private static final Logger logger = LoggerFactory.getLogger(ConditionStateRouter.class);

    /** Tokenizer: group 1 is a (possibly empty) run of separators {@code & ! = ,}, group 2 the following token. */
    protected static final Pattern ROUTE_PATTERN = Pattern.compile("([&!=,]*)\\s*([^&!=,\\s]+)");

    /** Extracts the numeric index out of a key such as {@code arguments[0]}. */
    protected static Pattern ARGUMENTS_PATTERN = Pattern.compile("arguments\\[([0-9]+)\\]");

    /** Parsed rule taken from the left-hand side of "=>" (the consumer-side conditions). */
    protected Map<String, MatchPair> whenCondition;

    /** Parsed rule taken from the right-hand side of "=>" (the provider-side conditions); {@code null} rejects every invoker. */
    protected Map<String, MatchPair> thenCondition;

    private boolean enabled;

    /**
     * Creates a router from an explicit rule string.
     *
     * @param url     router URL, handed to the super class
     * @param rule    condition rule text; only parsed when {@code enabled} is true
     * @param force   value passed to {@code setForce}
     * @param enabled whether this router takes part in routing
     */
    public ConditionStateRouter(URL url, String rule, boolean force, boolean enabled) {
        super(url);
        this.setForce(force);
        this.enabled = enabled;
        if (enabled) {
            this.init(rule);
        }
    }

    /**
     * Creates a router whose force flag, enabled flag and rule are all read from URL parameters
     * ({@code FORCE_KEY}, default false; {@code ENABLED_KEY}, default true; {@code RULE_KEY}).
     *
     * @param url router URL carrying the parameters
     */
    public ConditionStateRouter(URL url) {
        super(url);
        this.setUrl(url);
        this.setForce(url.getParameter(FORCE_KEY, false));
        this.enabled = url.getParameter(ENABLED_KEY, true);
        if (enabled) {
            init(url.getParameterAndDecoded(RULE_KEY));
        }
    }

    /**
     * Parses {@code rule} and stores the result in {@link #whenCondition} and {@link #thenCondition}.
     *
     * @param rule rule text, e.g. {@code host = 10.0.0.1 => host = 10.0.0.2}
     * @throws IllegalArgumentException if {@code rule} is null or blank
     * @throws IllegalStateException    if the rule cannot be parsed (wraps the {@link ParseException})
     */
    public void init(String rule) {
        try {
            if (rule == null || rule.trim().length() == 0) {
                throw new IllegalArgumentException("Illegal route rule!");
            }
            // Strip the "consumer." / "provider." prefixes so that only the bare keys remain.
            rule = rule.replace("consumer.", "").replace("provider.", "");
            // "=>" is the divider: the text before it is the when-part, the text after it the then-part.
            // Without a divider the whole rule is treated as the then-part.
            int i = rule.indexOf("=>");
            String whenRule = i < 0 ? null : rule.substring(0, i).trim();
            String thenRule = i < 0 ? rule.trim() : rule.substring(i + 2).trim();
            // A blank or literal "true" when-part yields an empty map (matches everything, see matchWhen).
            Map<String, MatchPair> when = StringUtils.isBlank(whenRule) || "true".equals(whenRule)
                ? new HashMap<String, MatchPair>() : parseRule(whenRule);
            // A blank or literal "false" then-part yields null (doRoute then returns an empty list).
            Map<String, MatchPair> then = StringUtils.isBlank(thenRule) || "false".equals(thenRule)
                ? null : parseRule(thenRule);
            // NOTE: It should be determined on the business level whether the `When condition` can be empty or not.
            this.whenCondition = when;
            this.thenCondition = then;
        } catch (ParseException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /**
     * Parses one side of a rule into a map from key to {@link MatchPair}.
     *
     * <p>The rule is tokenized with {@link #ROUTE_PATTERN}; the separator in front of each token
     * decides its role: none starts a new key, {@code &} starts (or re-selects) a key, {@code =}
     * adds an allowed value, {@code !=} adds a rejected value and {@code ,} adds a further value
     * to whichever set was used last.
     *
     * @param rule one side of the condition rule
     * @return the parsed conditions; empty when {@code rule} is blank
     * @throws ParseException if a separator appears where it is not allowed
     */
    private static Map<String, MatchPair> parseRule(String rule)
        throws ParseException {
        Map<String, MatchPair> condition = new HashMap<>();
        if (StringUtils.isBlank(rule)) {
            return condition;
        }
        // Key-Value pair currently being filled, stores both match and mismatch conditions.
        MatchPair pair = null;
        // Value set (matches or mismatches) that the most recent '=' / '!=' selected.
        Set<String> values = null;
        final Matcher matcher = ROUTE_PATTERN.matcher(rule);
        while (matcher.find()) { // Try to match one by one
            String separator = matcher.group(1);
            String content = matcher.group(2);
            if (StringUtils.isEmpty(separator)) {
                // Start part of the condition expression: always begins a fresh pair for this key.
                pair = new MatchPair();
                condition.put(content, pair);
            } else if ("&".equals(separator)) {
                // Another key of the expression: reuse the pair if the key was seen before.
                pair = condition.computeIfAbsent(content, key -> new MatchPair());
            } else if ("=".equals(separator)) {
                // An allowed value; it needs a key in front of it.
                if (pair == null) {
                    throw illegalRule(rule, separator, content, matcher.start());
                }
                values = pair.matches;
                values.add(content);
            } else if ("!=".equals(separator)) {
                // A rejected value; it needs a key in front of it.
                if (pair == null) {
                    throw illegalRule(rule, separator, content, matcher.start());
                }
                values = pair.mismatches;
                values.add(content);
            } else if (",".equals(separator)) { // Should be separated by ','
                // A further value for the current set; only valid after at least one value.
                if (values == null || values.isEmpty()) {
                    throw illegalRule(rule, separator, content, matcher.start());
                }
                values.add(content);
            } else {
                throw illegalRule(rule, separator, content, matcher.start());
            }
        }
        return condition;
    }

    /** Builds the ParseException reported for a separator found at an illegal position. */
    private static ParseException illegalRule(String rule, String separator, String content, int index) {
        return new ParseException("Illegal route rule \""
            + rule + "\", The error char '" + separator
            + "' at index " + index + " before \""
            + content + "\".", index);
    }

    /**
     * Applies the condition rule to {@code invokers}.
     *
     * <p>The input is returned unchanged when the router is disabled, the input is empty, the
     * when-part does not match, an error occurs, or the filtered result is empty and the router
     * is not forced. An empty list is returned when the then-part is {@code null}, or when the
     * filtered result is empty and the router is forced.
     *
     * @param invokers           candidates from the previous router
     * @param url                URL the when-part is evaluated against
     * @param invocation         current invocation
     * @param needToPrintMessage whether a reason should be written to {@code messageHolder}
     * @param nodeHolder         not used by this implementation
     * @param messageHolder      receives the reason text when requested
     * @return the routed invokers
     */
    @Override
    protected BitList<Invoker<T>> doRoute(BitList<Invoker<T>> invokers, URL url, Invocation invocation,
                                          boolean needToPrintMessage, Holder<RouterSnapshotNode<T>> nodeHolder,
                                          Holder<String> messageHolder) throws RpcException {
        if (!enabled) {
            if (needToPrintMessage) {
                messageHolder.set("Directly return. Reason: ConditionRouter disabled.");
            }
            return invokers;
        }

        if (CollectionUtils.isEmpty(invokers)) {
            if (needToPrintMessage) {
                messageHolder.set("Directly return. Reason: Invokers from previous router is empty.");
            }
            return invokers;
        }
        try {
            // The when-part does not match: the rule is not aimed at this caller, so nothing is filtered.
            if (!matchWhen(url, invocation)) {
                if (needToPrintMessage) {
                    messageHolder.set("Directly return. Reason: WhenCondition not match.");
                }
                return invokers;
            }
            // A null then-part rejects every invoker.
            if (thenCondition == null) {
                logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey());
                if (needToPrintMessage) {
                    messageHolder.set("Empty return. Reason: ThenCondition is empty.");
                }
                return BitList.emptyList();
            }
            // Work on a copy and drop every invoker whose own URL fails the then-part;
            // the result may end up empty.
            BitList<Invoker<T>> result = invokers.clone();
            result.removeIf(invoker -> !matchThen(invoker.getUrl(), url));

            if (!result.isEmpty()) {
                if (needToPrintMessage) {
                    messageHolder.set("Match return.");
                }
                return result;
            } else if (this.isForce()) {
                // Forced routing: the empty result is handed back as is. Without force the method
                // falls through and returns all invokers, i.e. the rule has no effect.
                logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(RULE_KEY));
                if (needToPrintMessage) {
                    messageHolder.set("Empty return. Reason: Empty result from condition and condition is force.");
                }
                return result;
            }
        } catch (Throwable t) {
            logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t);
        }
        if (needToPrintMessage) {
            messageHolder.set("Directly return. Reason: Error occurred ( or result is empty ).");
        }
        return invokers;
    }

    /**
     * @return the {@code RUNTIME_KEY} parameter of the router URL, false when absent
     */
    @Override
    public boolean isRuntime() {
        // We always return true for previously defined Router, that is, old Router doesn't support cache anymore.
        // return true;
        return this.getUrl().getParameter(RUNTIME_KEY, false);
    }

    /** Evaluates the when-part; an empty when-part matches everything. */
    boolean matchWhen(URL url, Invocation invocation) {
        return CollectionUtils.isEmptyMap(whenCondition) || matchCondition(whenCondition, url, null, invocation);
    }

    /** Evaluates the then-part against a provider URL; an empty then-part matches nothing. */
    private boolean matchThen(URL url, URL param) {
        return CollectionUtils.isNotEmptyMap(thenCondition) && matchCondition(thenCondition, url, param, null);
    }

    /**
     * Checks every entry of {@code condition} against {@code url}; all entries have to pass.
     *
     * @param condition  parsed conditions to evaluate
     * @param url        URL whose values are tested
     * @param param      forwarded to {@code MatchPair.isMatch}; may be null
     * @param invocation current invocation, used for method and argument keys; may be null
     * @return true if at least one entry was evaluated and none of them failed
     */
    private boolean matchCondition(Map<String, MatchPair> condition, URL url, URL param, Invocation invocation) {
        Map<String, String> sample = url.toMap();
        boolean result = false;
        for (Map.Entry<String, MatchPair> matchPair : condition.entrySet()) {
            String key = matchPair.getKey();

            // Keys starting with the arguments prefix are matched against the invocation arguments.
            if (key.startsWith(Constants.ARGUMENTS)) {
                if (!matchArguments(matchPair, invocation)) {
                    return false;
                } else {
                    result = true;
                    continue;
                }
            }

            String sampleValue;
            if (invocation != null && (METHOD_KEY.equals(key) || METHODS_KEY.equals(key))) {
                // Method-level rule: get the real invoked method name from the invocation.
                sampleValue = invocation.getMethodName();
            } else if (ADDRESS_KEY.equals(key)) {
                sampleValue = url.getAddress();
            } else if (HOST_KEY.equals(key)) {
                sampleValue = url.getHost();
            } else {
                // Any other key is looked up in the URL's map form.
                sampleValue = sample.get(key);
            }
            if (sampleValue != null) {
                if (!matchPair.getValue().isMatch(sampleValue, param)) {
                    return false;
                } else {
                    result = true;
                }
            } else {
                // No value on the URL: fails if the rule lists allowed values, passes otherwise.
                if (!matchPair.getValue().matches.isEmpty()) {
                    return false;
                } else {
                    result = true;
                }
            }
        }
        return result;
    }

    /**
     * analysis the arguments in the rule.
     * Examples would be like this:
     * "arguments[0]=1", whenCondition is that the first argument is equal to '1'.
     * "arguments[1]=a", whenCondition is that the second argument is equal to 'a'.
     *
     * @param matchPair  rule entry whose key carries the argument index
     * @param invocation invocation providing the actual arguments
     * @return true if the addressed argument, converted with {@code String.valueOf}, satisfies
     *         the entry; false if it does not, if the index is out of range, or if evaluation fails
     */
    public boolean matchArguments(Map.Entry<String, MatchPair> matchPair, Invocation invocation) {
        try {
            // split the rule; only the part before the first '.' carries the index expression
            String key = matchPair.getKey();
            String[] expressArray = key.split("\\.");
            String argumentExpress = expressArray[0];
            final Matcher matcher = ARGUMENTS_PATTERN.matcher(argumentExpress);
            if (!matcher.find()) {
                return false;
            }

            //extract the argument index
            int index = Integer.parseInt(matcher.group(1));
            Object[] arguments = invocation.getArguments();
            // Valid indexes are 0 .. length - 1; index == length must be rejected here rather
            // than surfacing as an ArrayIndexOutOfBoundsException below.
            if (index < 0 || index >= arguments.length) {
                return false;
            }

            //extract the argument value
            Object object = arguments[index];

            if (matchPair.getValue().isMatch(String.valueOf(object), null)) {
                return true;
            }
        } catch (Exception e) {
            logger.warn("Arguments match failed, matchPair[" + matchPair + "] invocation[" + invocation + "]", e);
        }

        return false;
    }

    /**
     * Holds the allowed and the rejected value patterns of one rule key and provides the
     * actual matching logic.
     */
    protected static final class MatchPair {
        /** Allowed patterns (filled by '='); a Set, so duplicates collapse. */
        final Set<String> matches = new HashSet<String>();
        /** Rejected patterns (filled by '!='). */
        final Set<String> mismatches = new HashSet<String>();

        /**
         * Tests {@code value} against the stored patterns.
         *
         * @param value value to test
         * @param param forwarded to {@code UrlUtils.isMatchGlobPattern}
         * @return false if both sets are empty or any rejected pattern matches; otherwise true
         *         if there are no allowed patterns or at least one allowed pattern matches
         */
        private boolean isMatch(String value, URL param) {
            // Nothing configured at all: no match.
            if (matches.isEmpty() && mismatches.isEmpty()) {
                return false;
            }
            // Rejected patterns win: when both mismatches and matches contain the same value,
            // mismatches are evaluated first.
            for (String mismatch : mismatches) {
                if (UrlUtils.isMatchGlobPattern(mismatch, value, param)) {
                    return false;
                }
            }
            // Only rejected patterns were given and none of them matched: pass.
            if (matches.isEmpty()) {
                return true;
            }
            // Otherwise one allowed pattern is enough.
            for (String match : matches) {
                if (UrlUtils.isMatchGlobPattern(match, value, param)) {
                    return true;
                }
            }
            return false;
        }
    }
}
Script脚本路由规则(JavaScript)
/**
 * State router that delegates the routing decision to a user supplied script.
 *
 * <p>The script text is read from the {@code RULE_KEY} URL parameter and compiled once in the
 * constructor. On every routing call it is evaluated with the bindings {@code invokers},
 * {@code invocation} and {@code context}; the invokers it returns are retained.
 *
 * @param <T> service type of the routed invokers
 */
public class ScriptStateRouter<T> extends AbstractStateRouter<T> {

    public static final String NAME = "SCRIPT_ROUTER";
    private static final int SCRIPT_ROUTER_DEFAULT_PRIORITY = 0;
    private static final Logger logger = LoggerFactory.getLogger(ScriptStateRouter.class);

    /** Script engines cached by engine type name. */
    private static final Map<String, ScriptEngine> ENGINES = new ConcurrentHashMap<>();

    private final ScriptEngine engine;

    private final String rule;

    /** Compiled form of {@link #rule}; stays null when compilation failed. */
    private CompiledScript function;

    /** Restricted context under which the script is evaluated. */
    private final AccessControlContext accessControlContext;

    {
        //Just give permission of reflect to access member.
        Permissions perms = new Permissions();
        perms.add(new RuntimePermission("accessDeclaredMembers"));
        // Cast to Certificate[] required because of ambiguity:
        ProtectionDomain domain = new ProtectionDomain(new CodeSource(null, (Certificate[]) null), perms);
        accessControlContext = new AccessControlContext(new ProtectionDomain[]{domain});
    }

    /**
     * Resolves the script engine and the rule from {@code url} and compiles the rule.
     * A {@link ScriptException} during compilation is logged and leaves the router without a
     * compiled function, in which case {@code doRoute} returns its input unchanged.
     *
     * @param url router URL carrying the engine type and the rule
     * @throws IllegalStateException if the rule is empty or the engine type is not available
     */
    public ScriptStateRouter(URL url) {
        super(url);
        this.setUrl(url);

        // Look up the engine for the configured type (cached in ENGINES).
        engine = getEngine(url);
        // Read the script text from the rule parameter.
        rule = getRule(url);
        try {
            // Compile the rule text once up front.
            // NOTE(review): assumes the engine implements Compilable; otherwise this cast throws.
            Compilable compilable = (Compilable) engine;
            function = compilable.compile(rule);
        } catch (ScriptException e) {
            logger.error("route error, rule has been ignored. rule: " + rule +
                ", url: " + RpcContext.getServiceContext().getUrl(), e);
        }
    }

    /**
     * get rule from url parameters.
     *
     * @throws IllegalStateException if the rule parameter is empty
     */
    private String getRule(URL url) {
        String vRule = url.getParameterAndDecoded(RULE_KEY);
        if (StringUtils.isEmpty(vRule)) {
            throw new IllegalStateException("route rule can not be empty.");
        }
        return vRule;
    }

    /**
     * create ScriptEngine instance by type from url parameters, then cache it
     *
     * @throws IllegalStateException if no engine is registered under the requested type
     */
    private ScriptEngine getEngine(URL url) {
        String type = url.getParameter(TYPE_KEY, DEFAULT_SCRIPT_TYPE_KEY);

        return ENGINES.computeIfAbsent(type, engineType -> {
            ScriptEngine scriptEngine = new ScriptEngineManager().getEngineByName(engineType);
            if (scriptEngine == null) {
                throw new IllegalStateException("unsupported route engine type: " + engineType);
            }
            return scriptEngine;
        });
    }

    /**
     * Evaluates the compiled script and keeps only the invokers it returns.
     *
     * <p>If the engine or the compiled function is missing the input is returned unchanged.
     * If evaluation throws a {@link ScriptException} the error is logged and all invokers are
     * kept.
     *
     * @param invokers           candidates from the previous router
     * @param url                not used by this implementation
     * @param invocation         current invocation, exposed to the script
     * @param needToPrintMessage whether a reason should be written to {@code messageHolder}
     * @param nodeHolder         not used by this implementation
     * @param messageHolder      receives the reason text when requested
     * @return the routed invokers
     */
    @Override
    protected BitList<Invoker<T>> doRoute(BitList<Invoker<T>> invokers, URL url, Invocation invocation, boolean needToPrintMessage, Holder<RouterSnapshotNode<T>> nodeHolder, Holder<String> messageHolder) throws RpcException {
        if (engine == null || function == null) {
            if (needToPrintMessage) {
                messageHolder.set("Directly Return. Reason: engine or function is null");
            }
            return invokers;
        }
        Bindings bindings = createBindings(invokers, invocation);
        return getRoutedInvokers(invokers, AccessController.doPrivileged((PrivilegedAction<Object>) () -> {
            try {
                // Run the script; the bindings are the data made available to it.
                return function.eval(bindings);
            } catch (ScriptException e) {
                // Same context accessor as the constructor uses for its error log.
                logger.error("route error, rule has been ignored. rule: " + rule + ", method:" +
                    invocation.getMethodName() + ", url: " + RpcContext.getServiceContext().getUrl(), e);
                return invokers;
            }
        }, accessControlContext));
    }

    /**
     * get routed invokers from result of script rule evaluation
     *
     * @param invokers original candidates; a clone of them is filtered
     * @param obj      script result: an {@code Invoker[]}, an {@code Object[]} of invokers, or
     *                 otherwise something that is cast to a {@code List} of invokers
     * @return clone of {@code invokers} reduced to the elements contained in {@code obj}
     */
    @SuppressWarnings("unchecked")
    protected BitList<Invoker<T>> getRoutedInvokers(BitList<Invoker<T>> invokers, Object obj) {
        BitList<Invoker<T>> result = invokers.clone();
        // Convert the script result to a collection and intersect.
        if (obj instanceof Invoker[]) {
            result.retainAll(Arrays.asList((Invoker<T>[]) obj));
        } else if (obj instanceof Object[]) {
            result.retainAll(Arrays.stream((Object[]) obj).map(item -> (Invoker<T>) item).collect(Collectors.toList()));
        } else {
            // NOTE(review): unchecked cast; a script returning anything other than a List
            // (or null) is not handled here - confirm whether that should be tolerated.
            result.retainAll((List<Invoker<T>>) obj);
        }
        return result;
    }

    /**
     * create bindings for script engine
     *
     * <p>Binds three names: {@code invokers} (a fresh copy of the candidate list),
     * {@code invocation} and {@code context}.
     */
    private Bindings createBindings(List<Invoker<T>> invokers, Invocation invocation) {
        Bindings bindings = engine.createBindings();
        // create a new List of invokers
        bindings.put("invokers", new ArrayList<>(invokers));
        bindings.put("invocation", invocation);
        bindings.put("context", RpcContext.getClientAttachment());
        return bindings;
    }

    /**
     * @return the {@code RUNTIME_KEY} parameter of the router URL, false when absent
     */
    @Override
    public boolean isRuntime() {
        return this.getUrl().getParameter(RUNTIME_KEY, false);
    }

    /**
     * @return the {@code FORCE_KEY} parameter of the router URL, false when absent
     */
    @Override
    public boolean isForce() {
        return this.getUrl().getParameter(FORCE_KEY, false);
    }

}
本文暂时没有评论,来添加一个吧(●'◡'●)