A Look at PowerJob's RemoteEngine
Preface
This article takes a look at PowerJob's RemoteEngine.
RemoteEngine
tech/powerjob/remote/framework/engine/RemoteEngine.java
public interface RemoteEngine {
    EngineOutput start(EngineConfig engineConfig);
    void close() throws IOException;
}
RemoteEngine defines two methods, start and close; start takes an EngineConfig and returns an EngineOutput.
EngineConfig
tech/powerjob/remote/framework/engine/EngineConfig.java
@Data
@Accessors(chain = true)
public class EngineConfig implements Serializable {
    /**
     * Server type
     */
    private ServerType serverType;
    /**
     * Type of the engine to start
     */
    private String type;
    /**
     * Local address to bind
     */
    private Address bindAddress;
    /**
     * Actor instances; the caller instantiates them so that it can inject whatever beans it needs
     */
    private List<Object> actorList;
}
EngineConfig defines four properties: serverType (SERVER or WORKER), type, bindAddress, and actorList.
EngineOutput
tech/powerjob/remote/framework/engine/EngineOutput.java
@Getter
@Setter
public class EngineOutput {
    private Transporter transporter;
}
EngineOutput holds a single property, transporter.
Transporter
tech/powerjob/remote/framework/transporter/Transporter.java
public interface Transporter {
    /**
     * Protocol
     * @return return protocol
     */
    Protocol getProtocol();
    /**
     *send message
     * @param url url
     * @param request request
     */
    void tell(URL url, PowerSerializable request);
    /**
     * ask by request
     * @param url url
     * @param request request
     * @param clz response type
     * @return CompletionStage
     * @throws RemotingException remote exception
     */
    <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException;
}
The Transporter interface defines three methods: getProtocol (the Protocol implementations are AkkaProtocol and HttpProtocol), tell, and ask.
PowerJobRemoteEngine
tech/powerjob/remote/framework/engine/impl/PowerJobRemoteEngine.java
@Slf4j
public class PowerJobRemoteEngine implements RemoteEngine {
    private CSInitializer csInitializer;
    @Override
    public EngineOutput start(EngineConfig engineConfig) {
        final String engineType = engineConfig.getType();
        EngineOutput engineOutput = new EngineOutput();
        log.info("[PowerJobRemoteEngine] [{}] start remote engine with config: {}", engineType, engineConfig);
        List<ActorInfo> actorInfos = ActorFactory.load(engineConfig.getActorList());
        csInitializer = CSInitializerFactory.build(engineType);
        String type = csInitializer.type();
        Stopwatch sw = Stopwatch.createStarted();
        log.info("[PowerJobRemoteEngine] [{}] try to startup CSInitializer[type={}]", engineType, type);
        csInitializer.init(new CSInitializerConfig()
                .setBindAddress(engineConfig.getBindAddress())
                .setServerType(engineConfig.getServerType())
        );
        // build the transporter
        Transporter transporter = csInitializer.buildTransporter();
        engineOutput.setTransporter(transporter);
        log.info("[PowerJobRemoteEngine] [{}] start to bind Handler", engineType);
        actorInfos.forEach(actor -> actor.getHandlerInfos().forEach(handlerInfo -> log.info("[PowerJobRemoteEngine] [{}] PATH={}, handler={}", engineType, handlerInfo.getLocation().toPath(), handlerInfo.getMethod())));
        // bind handlers
        csInitializer.bindHandlers(actorInfos);
        log.info("[PowerJobRemoteEngine] [{}] startup successfully, cost: {}", engineType, sw);
        return engineOutput;
    }
    @Override
    public void close() throws IOException {
        csInitializer.close();
    }
}
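PowerJobRemoteEngine holds a csInitializer field. Its start method first loads actorInfos via ActorFactory.load(engineConfig.getActorList()), then builds the csInitializer via CSInitializerFactory.build(engineType) and calls its init method. Next it builds the transporter via csInitializer.buildTransporter(), and finally binds the actorInfos to the csInitializer via bindHandlers. Its close method closes the csInitializer.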
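To make the call order in start easier to see, here is a minimal sketch that replaces the real CSInitializer with a stub that only records which methods were invoked. SketchInitializer, RecordingInitializer, SketchEngine, and the bind address are hypothetical names introduced for illustration; they are not part of PowerJob.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for CSInitializer; it only mirrors the method names.
interface SketchInitializer {
    void init(String bindAddress);
    String buildTransporter();
    void bindHandlers(List<Object> actors);
    void close();
}

// Records the order of calls instead of starting any network framework.
class RecordingInitializer implements SketchInitializer {
    final List<String> calls = new ArrayList<>();
    @Override public void init(String bindAddress) { calls.add("init"); }
    @Override public String buildTransporter() { calls.add("buildTransporter"); return "transporter"; }
    @Override public void bindHandlers(List<Object> actors) { calls.add("bindHandlers"); }
    @Override public void close() { calls.add("close"); }
}

class SketchEngine {
    private final SketchInitializer initializer;

    SketchEngine(SketchInitializer initializer) {
        this.initializer = initializer;
    }

    // Same order as PowerJobRemoteEngine.start: init, then buildTransporter, then bindHandlers.
    String start(String bindAddress, List<Object> actors) {
        initializer.init(bindAddress);
        String transporter = initializer.buildTransporter();
        initializer.bindHandlers(actors);
        return transporter;
    }

    void close() {
        initializer.close();
    }
}

class EngineLifecycleDemo {
    public static void main(String[] args) {
        RecordingInitializer recorder = new RecordingInitializer();
        SketchEngine engine = new SketchEngine(recorder);
        engine.start("127.0.0.1:27777", new ArrayList<>());
        engine.close();
        System.out.println(recorder.calls); // [init, buildTransporter, bindHandlers, close]
    }
}
```

The transporter is built before the handlers are bound, so the caller gets an outbound channel even though inbound handlers are registered last.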
ActorFactory.load
tech/powerjob/remote/framework/engine/impl/ActorFactory.java
@Slf4j
class ActorFactory {
    static List<ActorInfo> load(List<Object> actorList) {
        List<ActorInfo> actorInfos = Lists.newArrayList();
        actorList.forEach(actor -> {
            final Class<?> clz = actor.getClass();
            try {
                final Actor anno = clz.getAnnotation(Actor.class);
                ActorInfo actorInfo = new ActorInfo().setActor(actor).setAnno(anno);
                actorInfo.setHandlerInfos(loadHandlerInfos4Actor(actorInfo));
                actorInfos.add(actorInfo);
            } catch (Throwable t) {
                log.error("[ActorFactory] process Actor[{}] failed!", clz);
                ExceptionUtils.rethrow(t);
            }
        });
        return actorInfos;
    }
    //......
}    
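ActorFactory.load iterates over actorList, reads the @Actor annotation on each actor's class, then collects the @Handler annotation information from its methods (via loadHandlerInfos4Actor) and sets it on the actorInfo.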
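The body of loadHandlerInfos4Actor is elided in the listing, so the following is only a sketch of how such annotation scanning can be done with plain reflection. The annotations SketchActor and SketchHandler, the DemoActor class, and the "rootPath/handlerPath" key format are all assumptions made for this example and should not be read as PowerJob's actual implementation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical annotations standing in for @Actor and @Handler.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SketchActor { String path(); }

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SketchHandler { String path(); }

@SketchActor(path = "demo")
class DemoActor {
    @SketchHandler(path = "ping")
    public String ping(String msg) { return "pong:" + msg; }

    // Not annotated, so the scan skips it.
    public String helper() { return "ignored"; }
}

class ActorScanSketch {
    // Maps "rootPath/handlerPath" (an assumed format) to each annotated method.
    static Map<String, Method> scan(Object actor) {
        Class<?> clz = actor.getClass();
        SketchActor actorAnno = clz.getAnnotation(SketchActor.class);
        if (actorAnno == null) {
            throw new IllegalArgumentException(clz + " is not annotated with @SketchActor");
        }
        Map<String, Method> handlers = new TreeMap<>();
        for (Method m : clz.getDeclaredMethods()) {
            SketchHandler h = m.getAnnotation(SketchHandler.class);
            if (h != null) {
                handlers.put(actorAnno.path() + "/" + h.path(), m);
            }
        }
        return handlers;
    }

    // Invokes a handler and converts reflection failures into an unchecked exception.
    static Object call(Object actor, Method method, Object... args) {
        try {
            return method.invoke(actor, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DemoActor actor = new DemoActor();
        Map<String, Method> handlers = scan(actor);
        System.out.println(handlers.keySet());                            // [demo/ping]
        System.out.println(call(actor, handlers.get("demo/ping"), "hi")); // pong:hi
    }
}
```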
CSInitializerFactory
tech/powerjob/remote/framework/engine/impl/CSInitializerFactory.java
@Slf4j
class CSInitializerFactory {
    static CSInitializer build(String targetType) {
        Reflections reflections = new Reflections(OmsConstant.PACKAGE);
        Set<Class<? extends CSInitializer>> cSInitializerClzSet = reflections.getSubTypesOf(CSInitializer.class);
        log.info("[CSInitializerFactory] scan subTypeOf CSInitializer: {}", cSInitializerClzSet);
        for (Class<? extends CSInitializer> clz : cSInitializerClzSet) {
            try {
                CSInitializer csInitializer = clz.getDeclaredConstructor().newInstance();
                String type = csInitializer.type();
                log.info("[CSInitializerFactory] new instance for CSInitializer[{}] successfully, type={}, object: {}", clz, type, csInitializer);
                if (targetType.equalsIgnoreCase(type)) {
                    return csInitializer;
                }
            } catch (Exception e) {
                log.error("[CSInitializerFactory] new instance for CSInitializer[{}] failed, maybe you should provide a non-parameter constructor", clz);
                ExceptionUtils.rethrow(e);
            }
        }
        throw new PowerJobException(String.format("can't load CSInitializer[%s], ensure your package name start with 'tech.powerjob' and import the dependencies!", targetType));
    }
}
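CSInitializerFactory's build method uses org.reflections.Reflections to scan the tech.powerjob package for subtypes of CSInitializer, instantiates each one via reflection, and returns the first instance whose type() matches targetType (ignoring case). If none matches, it throws a PowerJobException.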
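The selection logic itself is simple: instantiate a candidate, compare its type() with the requested one ignoring case, and fail loudly if nothing matches. The sketch below shows that loop in isolation. Note the swapped-in technique: candidates are registered explicitly rather than discovered by classpath scanning, and TypedInitializer, AkkaLike, and HttpLike are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, reduced version of CSInitializer that only exposes type().
interface TypedInitializer {
    String type();
}

class AkkaLike implements TypedInitializer {
    @Override public String type() { return "AKKA"; }
}

class HttpLike implements TypedInitializer {
    @Override public String type() { return "HTTP"; }
}

class InitializerFactorySketch {
    // Assumption: an explicit candidate list replaces the Reflections-based scan of the original.
    static final List<Supplier<TypedInitializer>> CANDIDATES = new ArrayList<>();
    static {
        CANDIDATES.add(AkkaLike::new);
        CANDIDATES.add(HttpLike::new);
    }

    // Instantiates candidates one by one and returns the first whose type matches, ignoring case.
    static TypedInitializer build(String targetType) {
        for (Supplier<TypedInitializer> supplier : CANDIDATES) {
            TypedInitializer candidate = supplier.get();
            if (targetType.equalsIgnoreCase(candidate.type())) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("can't load initializer of type " + targetType);
    }

    public static void main(String[] args) {
        System.out.println(build("akka").getClass().getSimpleName()); // AkkaLike
        System.out.println(build("Http").type());                     // HTTP
    }
}
```

As in the original, a candidate has to be instantiated before its type can be checked, which is why every implementation needs a no-argument constructor.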
CSInitializer
tech/powerjob/remote/framework/cs/CSInitializer.java
public interface CSInitializer {
    /**
     * Type name, e.g. akka, netty4, httpJson
     * @return name
     */
    String type();
    /**
     * initialize the framework
     * @param config config
     */
    void init(CSInitializerConfig config);
    /**
     * build a Transporter by based network framework
     * @return Transporter
     */
    Transporter buildTransporter();
    /**
     * bind Actor, publish handler's service
     * @param actorInfos actor infos
     */
    void bindHandlers(List<ActorInfo> actorInfos);
    void close() throws IOException;
}
The CSInitializer interface defines the type, init, buildTransporter, bindHandlers, and close methods. It has two implementations: AkkaCSInitializer and HttpVertxCSInitializer.
CSInitializerConfig
tech/powerjob/remote/framework/cs/CSInitializerConfig.java
@Getter
@Setter
@Accessors(chain = true)
public class CSInitializerConfig implements Serializable {
    private Address bindAddress;
    private ServerType serverType;
}
CSInitializerConfig defines two properties: bindAddress and serverType.
AkkaCSInitializer
tech/powerjob/remote/akka/AkkaCSInitializer.java
@Slf4j
public class AkkaCSInitializer implements CSInitializer {
    private ActorSystem actorSystem;
    private CSInitializerConfig config;
    @Override
    public String type() {
        return tech.powerjob.common.enums.Protocol.AKKA.name();
    }
    @Override
    public void init(CSInitializerConfig config) {
        this.config = config;
        Address bindAddress = config.getBindAddress();
        log.info("[PowerJob-AKKA] bindAddress: {}", bindAddress);
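        // Initialize the ActorSystem (on macOS, probing for an occupied port with new ServerSocket does not work, perhaps because AKKA is written in Scala? Nothing to be done... we can only rely on retrying on exception)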
        Map<String, Object> overrideConfig = Maps.newHashMap();
        overrideConfig.put("akka.remote.artery.canonical.hostname", bindAddress.getHost());
        overrideConfig.put("akka.remote.artery.canonical.port", bindAddress.getPort());
        Config akkaBasicConfig = ConfigFactory.load(AkkaConstant.AKKA_CONFIG);
        Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
        log.info("[PowerJob-AKKA] try to start AKKA System.");
        // bind the current actorSystemName at startup
        String actorSystemName = AkkaConstant.fetchActorSystemName(config.getServerType());
        this.actorSystem = ActorSystem.create(actorSystemName, akkaFinalConfig);
        // handle abnormal situations raised within the system
        ActorRef troubleshootingActor = actorSystem.actorOf(Props.create(AkkaTroubleshootingActor.class), "troubleshooting");
        actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class);
        log.info("[PowerJob-AKKA] initialize actorSystem[{}] successfully!", actorSystem.name());
    }
    @Override
    public Transporter buildTransporter() {
        return new AkkaTransporter(actorSystem);
    }
    @Override
    public void bindHandlers(List<ActorInfo> actorInfos) {
        int cores = Runtime.getRuntime().availableProcessors();
        actorInfos.forEach(actorInfo -> {
            String rootPath = actorInfo.getAnno().path();
            AkkaMappingService.ActorConfig actorConfig = AkkaMappingService.parseActorName(rootPath);
            log.info("[PowerJob-AKKA] start to process actor[path={},config={}]", rootPath, JsonUtils.toJSONString(actorConfig));
            actorSystem.actorOf(AkkaProxyActor.props(actorInfo)
                    .withDispatcher("akka.".concat(actorConfig.getDispatcherName()))
                    .withRouter(new RoundRobinPool(cores)), actorConfig.getActorName());
        });
    }
    @Override
    public void close() throws IOException {
        actorSystem.terminate();
    }
}
AkkaCSInitializer's type method returns AKKA. Its init method first loads the base Akka configuration via ConfigFactory.load(AkkaConstant.AKKA_CONFIG), overrides the hostname and port, and then creates the actorSystem via ActorSystem.create(actorSystemName, akkaFinalConfig); it also creates an AkkaTroubleshootingActor and subscribes it to DeadLetter messages. buildTransporter returns an AkkaTransporter. bindHandlers creates one actor per ActorInfo, and close calls actorSystem.terminate().
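The "override, then fall back to the base config" step in init can be pictured with plain maps. This is only an analogy: the original uses Typesafe Config (ConfigFactory.parseMap(...).withFallback(...)), which works on hierarchical configuration, whereas the sketch below merges flat maps. The withFallback helper and all values here are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class ConfigOverlaySketch {
    // Keys present in overrides win; every other key falls back to the base config.
    static Map<String, Object> withFallback(Map<String, Object> overrides, Map<String, Object> base) {
        Map<String, Object> merged = new HashMap<>(base);
        merged.putAll(overrides);
        return merged;
    }

    public static void main(String[] args) {
        // Hypothetical base values, standing in for the packaged Akka config file.
        Map<String, Object> base = new HashMap<>();
        base.put("akka.remote.artery.canonical.hostname", "127.0.0.1");
        base.put("akka.remote.artery.canonical.port", 25520);
        base.put("akka.loglevel", "INFO");

        // Overrides taken from the bind address, as in AkkaCSInitializer.init.
        Map<String, Object> overrides = new HashMap<>();
        overrides.put("akka.remote.artery.canonical.hostname", "192.168.1.10");
        overrides.put("akka.remote.artery.canonical.port", 27777);

        Map<String, Object> merged = withFallback(overrides, base);
        System.out.println(merged.get("akka.remote.artery.canonical.port")); // 27777
        System.out.println(merged.get("akka.loglevel"));                     // INFO
    }
}
```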
AkkaTransporter
tech/powerjob/remote/akka/AkkaTransporter.java
public class AkkaTransporter implements Transporter {
    private final ActorSystem actorSystem;
    /**
     * akka://<actor system>@<hostname>:<port>/<actor path>
     */
    private static final String AKKA_NODE_PATH = "akka://%s@%s/user/%s";
    public AkkaTransporter(ActorSystem actorSystem) {
        this.actorSystem = actorSystem;
    }
    @Override
    public Protocol getProtocol() {
        return new AkkaProtocol();
    }
    @Override
    public void tell(URL url, PowerSerializable request) {
        ActorSelection actorSelection = fetchActorSelection(url);
        actorSelection.tell(request, null);
    }
    @Override
    @SuppressWarnings("unchecked")
    public <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException {
        ActorSelection actorSelection = fetchActorSelection(url);
        return (CompletionStage<T>) Patterns.ask(actorSelection, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
    }
    private ActorSelection fetchActorSelection(URL url) {
        HandlerLocation location = url.getLocation();
        String targetActorSystemName = AkkaConstant.fetchActorSystemName(url.getServerType());
        String targetActorName = AkkaMappingService.parseActorName(location.getRootPath()).getActorName();
        CommonUtils.requireNonNull(targetActorName, "can't find actor by URL: " + location);
        String address = url.getAddress().toFullAddress();
        return actorSystem.actorSelection(String.format(AKKA_NODE_PATH, targetActorSystemName, address, targetActorName));
    }
}
AkkaTransporter's protocol is AkkaProtocol. Its tell method resolves an ActorSelection from the url and sends the request through actorSelection.tell; its ask method uses Patterns.ask(actorSelection, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS)).
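fetchActorSelection boils down to filling in the AKKA_NODE_PATH template with the target actor system name, the host:port address, and the actor name. A small sketch of just that formatting step follows; the system name, address, and actor name are made-up values, since the real ones come from AkkaConstant, the URL, and AkkaMappingService.

```java
class AkkaPathSketch {
    // Same template string as AKKA_NODE_PATH in AkkaTransporter.
    static final String AKKA_NODE_PATH = "akka://%s@%s/user/%s";

    // Builds the remote actor path from its three parts.
    static String actorPath(String actorSystemName, String hostAndPort, String actorName) {
        return String.format(AKKA_NODE_PATH, actorSystemName, hostAndPort, actorName);
    }

    public static void main(String[] args) {
        // Hypothetical values for illustration only.
        System.out.println(actorPath("demo-system", "192.168.1.10:10086", "demo_actor"));
        // akka://demo-system@192.168.1.10:10086/user/demo_actor
    }
}
```

The /user/ segment is where Akka places actors created with actorSystem.actorOf, which matches how bindHandlers creates them.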
HttpVertxCSInitializer
tech/powerjob/remote/http/HttpVertxCSInitializer.java
@Slf4j
public class HttpVertxCSInitializer implements CSInitializer {
    private Vertx vertx;
    private HttpServer httpServer;
    private HttpClient httpClient;
    private CSInitializerConfig config;
    @Override
    public String type() {
        return tech.powerjob.common.enums.Protocol.HTTP.name();
    }
    @Override
    public void init(CSInitializerConfig config) {
        this.config = config;
        vertx = VertxInitializer.buildVertx();
        httpServer = VertxInitializer.buildHttpServer(vertx);
        httpClient = VertxInitializer.buildHttpClient(vertx);
    }
    @Override
    public Transporter buildTransporter() {
        return new VertxTransporter(httpClient);
    }
    @Override
    @SneakyThrows
    public void bindHandlers(List<ActorInfo> actorInfos) {
        Router router = Router.router(vertx);
        // handle request and response bodies
        router.route().handler(BodyHandler.create());
        actorInfos.forEach(actorInfo -> {
            Optional.ofNullable(actorInfo.getHandlerInfos()).orElse(Collections.emptyList()).forEach(handlerInfo -> {
                String handlerHttpPath = handlerInfo.getLocation().toPath();
                ProcessType processType = handlerInfo.getAnno().processType();
                Handler<RoutingContext> routingContextHandler = buildRequestHandler(actorInfo, handlerInfo);
                Route route = router.post(handlerHttpPath);
                if (processType == ProcessType.BLOCKING) {
                    route.blockingHandler(routingContextHandler, false);
                } else {
                    route.handler(routingContextHandler);
                }
            });
        });
        // start the vertx http server
        final int port = config.getBindAddress().getPort();
        final String host = config.getBindAddress().getHost();
        httpServer.requestHandler(router)
                .exceptionHandler(e -> log.error("[PowerJob] unknown exception in Actor communication!", e))
                .listen(port, host)
                .toCompletionStage()
                .toCompletableFuture()
                .get(1, TimeUnit.MINUTES);
        log.info("[PowerJobRemoteEngine] startup vertx HttpServer successfully!");
    }
    private Handler<RoutingContext> buildRequestHandler(ActorInfo actorInfo, HandlerInfo handlerInfo) {
        Method method = handlerInfo.getMethod();
        Optional<Class<?>> powerSerializeClz = RemoteUtils.findPowerSerialize(method.getParameterTypes());
        // internal framework, strict mode: fail immediately if binding fails
        if (!powerSerializeClz.isPresent()) {
            throw new PowerJobException("can't find any 'PowerSerialize' object in handler args: " + handlerInfo.getLocation());
        }
        return ctx -> {
            final RequestBody body = ctx.body();
            final Object convertResult = body.asPojo(powerSerializeClz.get());
            try {
                Object response = method.invoke(actorInfo.getActor(), convertResult);
                if (response != null) {
                    if (response instanceof String) {
                        ctx.end((String) response);
                    } else {
                        ctx.json(JsonObject.mapFrom(response));
                    }
                    return;
                }
                ctx.end();
            } catch (Throwable t) {
                // note: this runs while the framework is in actual operation, so log output uses the standard PowerJob format
                log.error("[PowerJob] invoke Handler[{}] failed!", handlerInfo.getLocation(), t);
                ctx.fail(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), t);
            }
        };
    }
    @Override
    public void close() throws IOException {
        httpClient.close();
        httpServer.close();
        vertx.close();
    }
}
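HttpVertxCSInitializer's type is HTTP. Its init method builds vertx via VertxInitializer.buildVertx(), the httpServer via VertxInitializer.buildHttpServer(vertx), and the httpClient via VertxInitializer.buildHttpClient(vertx). buildTransporter returns a VertxTransporter. bindHandlers registers a Vert.x POST route and handler for each handler of each actorInfo, then starts the HTTP server on the bound host and port. close closes httpClient, httpServer, and vertx in that order.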
VertxTransporter
tech/powerjob/remote/http/vertx/VertxTransporter.java
public class VertxTransporter implements Transporter {
    private final HttpClient httpClient;
    private static final Protocol PROTOCOL = new HttpProtocol();
    public VertxTransporter(HttpClient httpClient) {
        this.httpClient = httpClient;
    }
    @Override
    public Protocol getProtocol() {
        return PROTOCOL;
    }
    @Override
    public void tell(URL url, PowerSerializable request) {
        post(url, request, null);
    }
    @Override
    public <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException {
        return post(url, request, clz);
    }
    @SuppressWarnings("unchecked")
    private <T> CompletionStage<T> post(URL url, PowerSerializable request, Class<T> clz) {
        final String host = url.getAddress().getHost();
        final int port = url.getAddress().getPort();
        final String path = url.getLocation().toPath();
        RequestOptions requestOptions = new RequestOptions()
                .setMethod(HttpMethod.POST)
                .setHost(host)
                .setPort(port)
                .setURI(path);
        // obtain an HTTP connection to the remote server
        Future<HttpClientRequest> httpClientRequestFuture = httpClient.request(requestOptions);
        // transform -> send the request and get the response
        Future<HttpClientResponse> responseFuture = httpClientRequestFuture.compose(httpClientRequest ->
            httpClientRequest
                .putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
                .send(JsonObject.mapFrom(request).toBuffer())
        );
        return responseFuture.compose(httpClientResponse -> {
            // throw exception
            final int statusCode = httpClientResponse.statusCode();
            if (statusCode != HttpResponseStatus.OK.code()) {
                // this exception is propagated and thrown when CompletableFuture.get() is called
                throw new RemotingException(String.format("request [host:%s,port:%s,url:%s] failed, status: %d, msg: %s",
                       host, port, path, statusCode, httpClientResponse.statusMessage()
                        ));
            }
            return httpClientResponse.body().compose(x -> {
                if (clz == null) {
                    return Future.succeededFuture(null);
                }
                if (clz.equals(String.class)) {
                    return Future.succeededFuture((T) x.toString());
                }
                return Future.succeededFuture(x.toJsonObject().mapTo(clz));
            });
        }).toCompletionStage();
    }
}
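VertxTransporter's protocol is HttpProtocol. Its tell method calls post without a response type (clz is null); its ask method calls the same post method but passes the expected response type.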
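The way tell and ask share one post method, with the response decoded according to clz, can be sketched without any HTTP at all. In the example below, fakeHttpPost is a hypothetical stand-in that echoes the request body back, and the decoder parameter is an assumption that replaces the JSON mapping (x.toJsonObject().mapTo(clz)) of the original.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

class PostSketch {
    // Hypothetical stand-in for the HTTP round trip: echoes the request body as the response body.
    static CompletionStage<String> fakeHttpPost(String path, String body) {
        return CompletableFuture.completedFuture(body);
    }

    // clz == null: no result wanted; String: return the raw body; otherwise apply the decoder.
    static <T> CompletionStage<T> post(String path, String body, Class<T> clz, Function<String, T> decoder) {
        return fakeHttpPost(path, body).thenApply(raw -> {
            if (clz == null) {
                return null;
            }
            if (clz.equals(String.class)) {
                return clz.cast(raw);
            }
            return decoder.apply(raw);
        });
    }

    // Fire-and-forget: same code path, but no response type.
    static void tell(String path, String body) {
        post(path, body, null, null);
    }

    // Request-response: same code path, with the expected response type.
    static <T> CompletionStage<T> ask(String path, String body, Class<T> clz, Function<String, T> decoder) {
        return post(path, body, clz, decoder);
    }

    public static void main(String[] args) {
        tell("/demo/report", "ignored");
        Integer number = ask("/demo/count", "42", Integer.class, Integer::valueOf)
                .toCompletableFuture().join();
        String text = ask("/demo/echo", "hello", String.class, null)
                .toCompletableFuture().join();
        System.out.println(number); // 42
        System.out.println(text);   // hello
    }
}
```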
Summary
PowerJob's RemoteEngine defines two methods, start and close; start takes an EngineConfig and returns an EngineOutput. PowerJobRemoteEngine holds a csInitializer: its start method first loads actorInfos via ActorFactory.load(engineConfig.getActorList()), then builds the csInitializer via CSInitializerFactory.build(engineType) and calls its init method, next builds the transporter via csInitializer.buildTransporter(), and finally binds the actorInfos to the csInitializer. Its close method closes the csInitializer.