An Analysis of How the Dubbo Service Provider Handles Requests

2024-01-09 15:54:21

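1 Overview of request handling

(1) After the consumer opens a TCP connection, the connected method of the provider's NettyServer is called;

(2) Because Dubbo's default thread dispatch model is all, AllChannelHandler wraps every message it receives (request events, response events, connect events, disconnect events, heartbeat events, and so on) into a ChannelEventRunnable task and submits it to the thread pool;

(3) The thread pool then runs the task. A connect event ends up in DubboProtocol's connected method, and a request message ends up in DubboProtocol's reply method, which executes the requested method.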

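2 Implementation details

2.1 NettyServer's connected method is called

After the consumer opens a TCP connection, the connected method of the provider's NettyServer is called. This connected method is the one defined in AbstractServer, the parent class of NettyServer.

The calls happen in this order: AbstractServer's connected() -> AbstractPeer's connected() -> ChannelHandler's connected(). The implementations are shown below.

(1) AbstractServer's connected()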

    public void connected(Channel ch) throws RemotingException {
        // If the server has entered the shutdown process, reject any new connection
        if (this.isClosing() || this.isClosed()) {
            logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");
            ch.close();
            return;
        }

        if (accepts > 0 && getChannelsSize() > accepts) {
            logger.error(INTERNAL_ERROR, "unknown error in remoting module", "", "Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
            ch.close();
            return;
        }
        super.connected(ch);
    }

(2) AbstractPeer's connected()

    private final ChannelHandler handler;

    public void connected(Channel ch) throws RemotingException {
        if (closed) {
            return;
        }
        handler.connected(ch);
    }
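
The guard-then-delegate pattern in these two methods can be looked at in isolation. The following is a minimal sketch, not Dubbo code: SketchHandler, SketchPeer, and SketchServer are hypothetical stand-ins, channels are plain strings, and the accept method that registers a channel before firing the event is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Dubbo's ChannelHandler (connected event only).
interface SketchHandler {
    void connected(String channel);
}

// Mirrors the AbstractPeer step: drop the event if closed, otherwise delegate.
class SketchPeer implements SketchHandler {
    private final SketchHandler handler;
    protected volatile boolean closed;

    SketchPeer(SketchHandler handler) {
        this.handler = handler;
    }

    @Override
    public void connected(String channel) {
        if (closed) {
            return;
        }
        handler.connected(channel);
    }
}

// Mirrors the AbstractServer step: reject when closed or over the limit.
class SketchServer extends SketchPeer {
    private final int accepts; // 0 means "no limit", as in the accepts > 0 check
    final List<String> channels = new ArrayList<>();
    final List<String> rejected = new ArrayList<>();

    SketchServer(SketchHandler handler, int accepts) {
        super(handler);
        this.accepts = accepts;
    }

    // Assumption of this sketch: the transport registers the channel first,
    // then fires the connected event.
    void accept(String channel) {
        channels.add(channel);
        connected(channel);
    }

    void close() {
        closed = true;
    }

    @Override
    public void connected(String channel) {
        boolean overLimit = accepts > 0 && channels.size() > accepts;
        if (closed || overLimit) {
            channels.remove(channel); // stands in for ch.close()
            rejected.add(channel);
            return;
        }
        super.connected(channel);
    }
}
```

With a limit of 1, the first accepted channel reaches the wrapped handler and the second one is rejected before any handler code runs, which is the point of placing the check in the server class rather than in the handler chain.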

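2.2 Messages are submitted to the thread pool

When ChannelHandler's connected() is called, the handler that receives the call is AllChannelHandler, because Dubbo's default thread dispatch model is all. AllChannelHandler (an implementation of ChannelHandler) wraps every message it receives into a ChannelEventRunnable task and submits it to the thread pool. The implementation is shown below.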

public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    @Override
    public void connected(Channel channel) throws RemotingException {
        ExecutorService executor = getSharedExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService executor = getSharedExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getSharedExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}
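
The dispatch step above can be reduced to a few lines. This is a minimal sketch under stated assumptions, not the Dubbo implementation: AllDispatchSketch, EventHandler, and the string channel are hypothetical names, and a boolean return value stands in for the feedback that the real received() sends to the consumer when the pool rejects a request.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class AllDispatchSketch {
    enum State { CONNECTED, DISCONNECTED, RECEIVED, CAUGHT }

    // Hypothetical handler interface; Dubbo's ChannelHandler has one method per event.
    interface EventHandler {
        void handle(State state, String channel, Object payload);
    }

    private final EventHandler handler;
    private final ExecutorService executor;

    AllDispatchSketch(EventHandler handler, ExecutorService executor) {
        this.handler = handler;
        this.executor = executor;
    }

    // Wraps the event in a task and hands it to the pool, so the calling
    // (I/O) thread returns without running any handler logic itself.
    // Returns false when the pool rejects the task.
    boolean dispatch(State state, String channel, Object payload) {
        try {
            executor.execute(() -> handler.handle(state, channel, payload));
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    // Demo helper: dispatches one event and reports which thread handled it.
    static String handlingThreadName() {
        AtomicReference<String> name = new AtomicReference<>();
        ExecutorService pool =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-worker"));
        AllDispatchSketch dispatcher = new AllDispatchSketch(
            (state, channel, payload) -> name.set(Thread.currentThread().getName()), pool);
        dispatcher.dispatch(State.CONNECTED, "c1", null);
        pool.shutdown(); // already queued tasks still run
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name.get();
    }
}
```

The handler runs on the pool thread rather than on the caller's thread, which is what keeps slow business logic from blocking the I/O threads under the all model.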

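2.3 Running the tasks in the thread pool

2.3.1 The run method of ChannelEventRunnable

When a task in the thread pool is executed, the run method of ChannelEventRunnable runs. Its implementation is shown below.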

    public void run() {
        InternalThreadLocalMap internalThreadLocalMap = InternalThreadLocalMap.getAndRemove();
        try {
            if (state == ChannelState.RECEIVED) {
                try {
                    handler.received(channel, message);
                } catch (Exception e) {
                    logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is " + message, e);
                }
            } else {
                switch (state) {
                    case CONNECTED:
                        try {
                            handler.connected(channel);
                        } catch (Exception e) {
                            logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                        }
                        break;
                    case DISCONNECTED:
                        try {
                            handler.disconnected(channel);
                        } catch (Exception e) {
                            logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                        }
                        break;
                    case SENT:
                        try {
                            handler.sent(channel, message);
                        } catch (Exception e) {
                            logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                                + ", message is " + message, e);
                        }
                        break;
                    case CAUGHT:
                        try {
                            handler.caught(channel, exception);
                        } catch (Exception e) {
                            logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                                + ", message is: " + message + ", exception is " + exception, e);
                        }
                        break;
                    default:
                        logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "unknown state: " + state + ", message is " + message);
                }
            }
        } finally {
            InternalThreadLocalMap.set(internalThreadLocalMap);
        }
    }
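
Stripped of logging and thread-local bookkeeping, run() is a switch on the event state that calls the matching handler method and never lets a handler exception escape the task. The following is a minimal sketch of that idea only; EventRunnableSketch, its Handler interface, and RecordingHandler are hypothetical names, and a list of warning strings stands in for the logger.

```java
import java.util.ArrayList;
import java.util.List;

class EventRunnableSketch implements Runnable {
    enum State { CONNECTED, DISCONNECTED, SENT, RECEIVED, CAUGHT }

    // Hypothetical handler; channels are plain strings in this sketch.
    interface Handler {
        void connected(String channel) throws Exception;
        void disconnected(String channel) throws Exception;
        void sent(String channel, Object message) throws Exception;
        void received(String channel, Object message) throws Exception;
        void caught(String channel, Throwable exception) throws Exception;
    }

    final List<String> warnings = new ArrayList<>(); // stands in for logger.warn
    private final Handler handler;
    private final String channel;
    private final State state;
    private final Object message;
    private final Throwable exception;

    EventRunnableSketch(Handler handler, String channel, State state,
                        Object message, Throwable exception) {
        this.handler = handler;
        this.channel = channel;
        this.state = state;
        this.message = message;
        this.exception = exception;
    }

    @Override
    public void run() {
        try {
            switch (state) {
                case CONNECTED:    handler.connected(channel); break;
                case DISCONNECTED: handler.disconnected(channel); break;
                case SENT:         handler.sent(channel, message); break;
                case RECEIVED:     handler.received(channel, message); break;
                case CAUGHT:       handler.caught(channel, exception); break;
                default:           warnings.add("unknown state: " + state);
            }
        } catch (Exception e) {
            // A failing handler is recorded, not rethrown to the pool thread.
            warnings.add("handle " + state + " failed on " + channel + ": " + e.getMessage());
        }
    }
}

// Records each call; optionally fails every call to exercise the catch path.
class RecordingHandler implements EventRunnableSketch.Handler {
    final List<String> log = new ArrayList<>();
    private final boolean fail;

    RecordingHandler(boolean fail) {
        this.fail = fail;
    }

    private void record(String entry) {
        if (fail) {
            throw new IllegalStateException("handler failed: " + entry);
        }
        log.add(entry);
    }

    public void connected(String ch) { record("connected " + ch); }
    public void disconnected(String ch) { record("disconnected " + ch); }
    public void sent(String ch, Object msg) { record("sent " + ch + " " + msg); }
    public void received(String ch, Object msg) { record("received " + ch + " " + msg); }
    public void caught(String ch, Throwable ex) { record("caught " + ch + " " + ex); }
}
```

The original method checks RECEIVED before entering the switch, presumably because it is by far the most frequent state; the sketch folds it into the switch for brevity, which does not change which handler method is called.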

2.3.2 Executing the connected method

Executing handler.connected(channel) calls HeaderExchangeHandler#connected, implemented as follows.

    public void connected(Channel channel) throws RemotingException {
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        handler.connected(exchangeChannel);
        channel.setAttribute(Constants.CHANNEL_SHUTDOWN_TIMEOUT_KEY,
            ConfigurationUtils.getServerShutdownTimeout(channel.getUrl().getOrDefaultApplicationModel()));
    }

Then, executing handler.connected(exchangeChannel) calls DubboProtocol#connected, implemented as follows.

public void connected(Channel channel) throws RemotingException {
    invoke(channel, ON_CONNECT_KEY);
}

private void invoke(Channel channel, String methodKey) {
    Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
    if (invocation != null) {
        try {
            if (Boolean.TRUE.toString().equals(invocation.getAttachment(STUB_EVENT_KEY))) {
                tryToGetStubService(channel, invocation);
            }
            received(channel, invocation);
        } catch (Throwable t) {
            logger.warn(PROTOCOL_FAILED_REFER_INVOKER, "", "", "Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
        }
    }
}

public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof Invocation) {
        reply((ExchangeChannel) channel, message);

    } else {
        super.received(channel, message);
    }
}
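
Note that invoke only does work when createInvocation returns a non-null Invocation. createInvocation is not shown above; the sketch below assumes it returns null when the channel URL carries no parameter under the given key, and assumes the key for connect events is the string onconnect. ConnectEventSketch and the map of URL parameters are hypothetical stand-ins, not Dubbo types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class ConnectEventSketch {
    // Assumed value of ON_CONNECT_KEY; treat it as illustrative.
    static final String ON_CONNECT_KEY = "onconnect";

    // Names of the event methods that would have been invoked.
    final List<String> invoked = new ArrayList<>();

    // Assumed behavior: no configured method under the key means no invocation.
    static String createInvocation(Map<String, String> urlParams, String methodKey) {
        String method = urlParams.get(methodKey);
        if (method == null || method.isEmpty()) {
            return null;
        }
        return method;
    }

    void connected(Map<String, String> urlParams) {
        String invocation = createInvocation(urlParams, ON_CONNECT_KEY);
        if (invocation != null) {
            // Stands in for received(channel, invocation), which calls reply(...).
            invoked.add(invocation);
        }
    }
}
```

Under these assumptions, a plain connection with no connect callback configured passes through connected without invoking any service method.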

A service request ultimately reaches DubboProtocol#reply, implemented as follows.

public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

    if (!(message instanceof Invocation)) {
        throw new RemotingException(channel, "Unsupported request: "
            + (message == null ? null : (message.getClass().getName() + ": " + message))
            + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }

    Invocation inv = (Invocation) message;

    // 1. Look up the Invoker for the invoked method
    Invoker<?> invoker = inv.getInvoker() == null ? getInvoker(channel, inv) : inv.getInvoker();
    // switch TCCL
    if (invoker.getUrl().getServiceModel() != null) {
        Thread.currentThread().setContextClassLoader(invoker.getUrl().getServiceModel().getClassLoader());
    }
    // need to consider backward-compatibility if it's a callback
    if (Boolean.TRUE.toString().equals(inv.getObjectAttachmentWithoutConvert(IS_CALLBACK_SERVICE_INVOKE))) {
        String methodsStr = invoker.getUrl().getParameters().get("methods");
        boolean hasMethod = false;
        if (methodsStr == null || !methodsStr.contains(",")) {
            hasMethod = inv.getMethodName().equals(methodsStr);
        } else {
            String[] methods = methodsStr.split(",");
            for (String method : methods) {
                if (inv.getMethodName().equals(method)) {
                    hasMethod = true;
                    break;
                }
            }
        }
        if (!hasMethod) {
            logger.warn(PROTOCOL_FAILED_REFER_INVOKER, "", "", new IllegalStateException("The methodName " + inv.getMethodName()
                + " not found in callback service interface ,invoke will be ignored."
                + " please update the api interface. url is:"
                + invoker.getUrl()) + " ,invocation is :" + inv);
            return null;
        }
    }

    // 2. Get the service context and set the remote (peer) address
    RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress());
    // 3. Run the invoker call chain
    Result result = invoker.invoke(inv);
    // 4. Return the result
    return result.thenApply(Function.identity());
}
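
The callback backward-compatibility check inside reply is easy to misread, so here it is extracted into a standalone helper. This is a sketch: CallbackMethodCheck and hasMethod are hypothetical names, but the logic follows the listing above, where methodsStr is the comma-separated methods parameter from the invoker URL.

```java
final class CallbackMethodCheck {
    private CallbackMethodCheck() {
    }

    // Returns true when methodName is one of the comma-separated entries
    // in methodsStr. A null methodsStr never matches.
    static boolean hasMethod(String methodsStr, String methodName) {
        if (methodsStr == null || !methodsStr.contains(",")) {
            // Zero or one entry: compare against the whole string.
            return methodName.equals(methodsStr);
        }
        for (String method : methodsStr.split(",")) {
            if (methodName.equals(method)) {
                return true;
            }
        }
        return false;
    }
}
```

The comparison is exact per entry, so a method name that is only a prefix of a listed method does not match; in that case reply logs a warning and returns null instead of invoking anything.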

Source: https://blog.csdn.net/J_bean/article/details/133938964