Trino:分区表上的SQL提交 & 查询流程浅析

2024-01-03 05:56:45

SQL提交&注册

通过/v1/statement/queued API向coordinator提交新的Query,会首先将此query放入QueryManager的缓存池中,然后返回给客户端下一次应该访问的地址。
客户端提交SQL成功后,会立即调用queued/{queryId}/{slug}/{token} REST API,轮询SQL的执行状态。

public class QueuedStatementResource {
    @ResourceSecurity(AUTHENTICATED_USER)
    @POST
    @Produces(APPLICATION_JSON)
    public Response postStatement(
            String statement,
            @Context HttpServletRequest servletRequest,
            @Context HttpHeaders httpHeaders,
            @Context UriInfo uriInfo)
    {
        if (isNullOrEmpty(statement)) {
            throw badRequest(BAD_REQUEST, "SQL statement is empty");
        }
        // 注册新的query这里仅仅是创建Query实例,并添加到QueryManager的缓存池中
        Query query = registerQuery(statement, servletRequest, httpHeaders);

        return createQueryResultsResponse(query.getQueryResults(query.getLastToken(), uriInfo));
    }

    private Query registerQuery(String statement, HttpServletRequest servletRequest, HttpHeaders httpHeaders)
    {
        Optional<String> remoteAddress = Optional.ofNullable(servletRequest.getRemoteAddr());
        Optional<Identity> identity = Optional.ofNullable((Identity) servletRequest.getAttribute(AUTHENTICATED_IDENTITY));
        MultivaluedMap<String, String> headers = httpHeaders.getRequestHeaders();

        SessionContext sessionContext = sessionContextFactory.createSessionContext(headers, alternateHeaderName, remoteAddress, identity);
        // 创建一个SQL实例,维护当前SQL生命周期内的各种信息
        Query query = new Query(statement, sessionContext, dispatchManager, queryInfoUrlFactory);
        // 将Query实例注册到QueryManager
        queryManager.registerQuery(query);

        // let authentication filter know that identity lifecycle has been handed off
        servletRequest.setAttribute(AUTHENTICATED_IDENTITY, null);

        return query;
    }
}

Query类

维护SQL运行时状态,可以通过此类获取SQL运行期间的状态信息;同时也负责与Client交互,提供对SQL任务的管理能力。

    private static final class Query
    {
        private final String query;
        private final SessionContext sessionContext;
        private final DispatchManager dispatchManager;
        private final QueryId queryId;
        private final Optional<URI> queryInfoUrl;
        private final Slug slug = Slug.createNew();
        private final AtomicLong lastToken = new AtomicLong();

        private final long initTime = System.nanoTime();
        private final AtomicReference<Boolean> submissionGate = new AtomicReference<>();
        private final SettableFuture<Void> creationFuture = SettableFuture.create();

        public Query(String query, SessionContext sessionContext, DispatchManager dispatchManager, QueryInfoUrlFactory queryInfoUrlFactory)
        {
            this.query = requireNonNull(query, "query is null");
            this.sessionContext = requireNonNull(sessionContext, "sessionContext is null");
            this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null");
            this.queryId = dispatchManager.createQueryId();
            requireNonNull(queryInfoUrlFactory, "queryInfoUrlFactory is null");
            this.queryInfoUrl = queryInfoUrlFactory.getQueryInfoUrl(queryId);
        }

        public boolean isCreated()
        {
            return creationFuture.isDone();
        }
        
        private ListenableFuture<Void> waitForDispatched()
        {
            // 只能调用`queued/{queryId}/{slug}/{token}` REST API,获取SQL任务的状态时,才会调用此方法,触发当前SQL任务的提交
            submitIfNeeded();
            if (!creationFuture.isDone()) {
                return nonCancellationPropagating(creationFuture);
            }
            // otherwise, wait for the query to finish
            return dispatchManager.waitForDispatched(queryId);
        }

        private void submitIfNeeded()
        {
            if (submissionGate.compareAndSet(null, true)) {
                // 尝试向dispatcherManager提交一个SQL任务
                creationFuture.setFuture(dispatchManager.createQuery(queryId, slug, sessionContext, query));
            }
        }
        
        public QueryResults getQueryResults(long token, UriInfo uriInfo)
        {
           // 客户端获取结果
        }

        public void cancel()
        {
            creationFuture.addListener(() -> dispatchManager.cancelQuery(queryId), directExecutor());
        }

        public void destroy()
        {
            sessionContext.getIdentity().destroy();
        }
}

QueuedStatementResource.QueryManager类

负责维护所有活着的Query实例,为REST API提供快速获取Query功能;同时也负责检查客户端提交超时逻辑,详见tryAbandonSubmissionWithTimeout(clientTimeout)的检查条件。
对于Server来说,只有触发了Query::waitForDispatched()方法,才将任务的状态设置为submitted。那如果客户端提交一个SQL执行后失联,肯定不会再调用REST API获取SQL的执行状态了,因此就不可能触发这个方法,这个期间段就被算作提交时间。

    @ThreadSafe
    private static class QueryManager
    {
        private final ConcurrentMap<QueryId, Query> queries = new ConcurrentHashMap<>();
        private final ScheduledExecutorService scheduledExecutorService = newSingleThreadScheduledExecutor(daemonThreadsNamed("drain-state-query-manager"));

        private final Duration querySubmissionTimeout;

        public QueryManager(Duration querySubmissionTimeout)
        {
            this.querySubmissionTimeout = requireNonNull(querySubmissionTimeout, "querySubmissionTimeout is null");
        }

        public void initialize(DispatchManager dispatchManager)
        {
            scheduledExecutorService.scheduleWithFixedDelay(() -> syncWith(dispatchManager), 200, 200, MILLISECONDS);
        }

        private void syncWith(DispatchManager dispatchManager)
        {
            queries.forEach((queryId, query) -> {
                if (shouldBePurged(dispatchManager, query)) {
                    removeQuery(queryId);
                }
            });
        }

        private boolean shouldBePurged(DispatchManager dispatchManager, Query query)
        {
            if (query.isSubmissionAbandoned()) {
                // Query submission was explicitly abandoned
                return true;
            }
            if (query.tryAbandonSubmissionWithTimeout(querySubmissionTimeout)) {
                // Query took too long to be submitted by the client
                return true;
            }
            if (query.isCreated() && !dispatchManager.isQueryRegistered(query.getQueryId())) {
                // Query was created in the DispatchManager, and DispatchManager has already purged the query
                return true;
            }
            return false;
        }

        private void removeQuery(QueryId queryId)
        {
            Optional.ofNullable(queries.remove(queryId))
                    .ifPresent(QueryManager::destroyQuietly);
        }

        public void registerQuery(Query query)
        {
            Query existingQuery = queries.putIfAbsent(query.getQueryId(), query);
            checkState(existingQuery == null, "Query already registered");
        }

        @Nullable
        public Query getQuery(QueryId queryId)
        {
            return queries.get(queryId);
        }
    }

Query实例的调度

只有当前客户端尝试获取SQL的执行状态时,才会触发SQL任务的提交,提交到。

public class DispatchManager {
    public ListenableFuture<Void> createQuery(QueryId queryId, Slug slug, SessionContext sessionContext, String query)
    {
        requireNonNull(queryId, "queryId is null");
        requireNonNull(sessionContext, "sessionContext is null");
        requireNonNull(query, "query is null");
        checkArgument(!query.isEmpty(), "query must not be empty string");
        checkArgument(queryTracker.tryGetQuery(queryId).isEmpty(), "query %s already exists", queryId);

        // It is important to return a future implementation which ignores cancellation request.
        // Using NonCancellationPropagatingFuture is not enough; it does not propagate cancel to wrapped future
        // but it would still return true on call to isCancelled() after cancel() is called on it.
        DispatchQueryCreationFuture queryCreationFuture = new DispatchQueryCreationFuture();
        // 异步创建
        dispatchExecutor.execute(() -> {
            try {
                createQueryInternal(queryId, slug, sessionContext, query, resourceGroupManager);
            }
            finally {
                queryCreationFuture.set(null);
            }
        });
        return queryCreationFuture;
    }
    private <C> void createQueryInternal(QueryId queryId, Slug slug, SessionContext sessionContext, String query, ResourceGroupManager<C> resourceGroupManager)
    {
        Session session = null;
        PreparedQuery preparedQuery = null;
        try {
            if (query.length() > maxQueryLength) {
                int queryLength = query.length();
                query = query.substring(0, maxQueryLength);
                throw new TrinoException(QUERY_TEXT_TOO_LARGE, format("Query text length (%s) exceeds the maximum length (%s)", queryLength, maxQueryLength));
            }

            // decode session
            session = sessionSupplier.createSession(queryId, sessionContext);

            // check query execute permissions
            accessControl.checkCanExecuteQuery(sessionContext.getIdentity());

            // prepare query
            // 对用户SQL进行Parsing,产生AST实例
            preparedQuery = queryPreparer.prepareQuery(session, query);

            // select resource group
            Optional<String> queryType = getQueryType(preparedQuery.getStatement()).map(Enum::name);
            // 如果没有配置ResourceGroup的分配策略,则默认会将当前SQL分析到全局队列中,所有的SQL共享集群
            SelectionContext<C> selectionContext = resourceGroupManager.selectGroup(new SelectionCriteria(
                    sessionContext.getIdentity().getPrincipal().isPresent(),
                    sessionContext.getIdentity().getUser(),
                    sessionContext.getIdentity().getGroups(),
                    sessionContext.getSource(),
                    sessionContext.getClientTags(),
                    sessionContext.getResourceEstimates(),
                    queryType));

            // apply system default session properties (does not override user set properties)
            session = sessionPropertyDefaults.newSessionWithDefaultProperties(session, queryType, selectionContext.getResourceGroupId());

            // mark existing transaction as active
            transactionManager.activateTransaction(session, isTransactionControlStatement(preparedQuery.getStatement()), accessControl);
            // 将query和preparedQuery封装成一个DispatchQuery实例,实际上是一个LocalDispatchQuery类的实例,这个过程是异步的。
            // 它提供了如下的方法,帮助上层获取任务的调度状态。
            //     ListenableFuture<Void> getDispatchedFuture();
            //     DispatchInfo getDispatchInfo();
            //
            // Trino中SQL执行的每一个阶段基本上都是异步的,为了能够在异步情况下正确管理Query的生命周期,都需要在相应的阶段创建一个
            // 对应的实例,例如这里的DispatchQuery。
            DispatchQuery dispatchQuery = dispatchQueryFactory.createDispatchQuery(
                    session,
                    query,
                    preparedQuery,
                    slug,
                    selectionContext.getResourceGroupId());
            // DispatchQuery一旦创建成功,就会将这个对象添加到QueryTracker对象中的,由它管理SQL的执行生命周期
            boolean queryAdded = queryCreated(dispatchQuery);
            if (queryAdded && !dispatchQuery.isDone()) {
                // 如果SQL成功被添加进了QueryTracker,但是dispatchQuery还没有完成创建,则先将它放进提交到resource group中,等待被调度
                try {
                    resourceGroupManager.submit(dispatchQuery, selectionContext, dispatchExecutor);
                }
                catch (Throwable e) {
                    // dispatch query has already been registered, so just fail it directly
                    dispatchQuery.fail(e);
                }
            }
        }
        catch (Throwable throwable) {
            // creation must never fail, so register a failed query in this case
            if (session == null) {
                session = Session.builder(sessionPropertyManager)
                        .setQueryId(queryId)
                        .setIdentity(sessionContext.getIdentity())
                        .setSource(sessionContext.getSource().orElse(null))
                        .build();
            }
            // 如果发生了任务异常,会创建一个FailedDispatchQuery的实例,记录失败的种信息。
            Optional<String> preparedSql = Optional.ofNullable(preparedQuery).flatMap(PreparedQuery::getPrepareSql);
            DispatchQuery failedDispatchQuery = failedDispatchQueryFactory.createFailedDispatchQuery(session, query, preparedSql, Optional.empty(), throwable);
            queryCreated(failedDispatchQuery);
        }
    }
}

创建LocalDispatchQuery实例

public class LocalDispatchQueryFactory
        implements DispatchQueryFactory
{
    @Override
    public DispatchQuery createDispatchQuery(
            Session session,
            String query,
            PreparedQuery preparedQuery,
            Slug slug,
            ResourceGroupId resourceGroup)
    {
        WarningCollector warningCollector = warningCollectorFactory.create();
        // 为新提交的Query实例,创建一个新的状态机
        QueryStateMachine stateMachine = QueryStateMachine.begin(
                query,
                preparedQuery.getPrepareSql(),
                session,
                locationFactory.createQueryLocation(session.getQueryId()),
                resourceGroup,
                isTransactionControlStatement(preparedQuery.getStatement()),
                transactionManager,
                accessControl,
                executor,
                metadata,
                warningCollector,
                getQueryType(preparedQuery.getStatement()));

        // It is important that `queryCreatedEvent` is called here. Moving it past the `executor.submit` below
        // can result in delivering query-created event after query analysis has already started.
        // That can result in misbehaviour of plugins called during analysis phase (e.g. access control auditing)
        // which depend on the contract that event was already delivered.
        //
        // Note that for immediate and in-order delivery of query events we depend on synchronous nature of
        // QueryMonitor and EventListenerManager.
        queryMonitor.queryCreatedEvent(stateMachine.getBasicQueryInfo(Optional.empty()));
        // 异步的方式,创建QueryExecution实例,实际上是SqlQueryExecution的实例
        ListenableFuture<QueryExecution> queryExecutionFuture = executor.submit(() -> {
            QueryExecutionFactory<?> queryExecutionFactory = executionFactories.get(preparedQuery.getStatement().getClass());
            if (queryExecutionFactory == null) {
                throw new TrinoException(NOT_SUPPORTED, "Unsupported statement type: " + preparedQuery.getStatement().getClass().getSimpleName());
            }

            try {
                // 创建
                return queryExecutionFactory.createQueryExecution(preparedQuery, stateMachine, slug, warningCollector);
            }
            catch (Throwable e) {
                if (e instanceof Error) {
                    if (e instanceof StackOverflowError) {
                        log.error(e, "Unhandled StackOverFlowError; should be handled earlier; to investigate full stacktrace you may need to enable -XX:MaxJavaStackTraceDepth=0 JVM flag");
                    }
                    else {
                        log.error(e, "Unhandled Error");
                    }
                    // wrapping as RuntimeException to guard us from problem that code downstream which investigates queryExecutionFuture may not necessarily handle
                    // Error subclass of Throwable well.
                    RuntimeException wrappedError = new RuntimeException(e);
                    stateMachine.transitionToFailed(wrappedError);
                    throw wrappedError;
                }
                stateMachine.transitionToFailed(e);
                throw e;
            }
        });
        // 返回LocalDispatchQuery的实例,可以看到这个实例,会有接收queryExecutionFuture变量,意味着只有当queryExecutionFuture.isDone()时,
        // 才标识着此实例创建完成。
        return new LocalDispatchQuery(
                stateMachine,
                queryExecutionFuture,
                queryMonitor,
                clusterSizeMonitor,
                executor,
                // queryManager是一个SqlQueryManager的实例对象,它内部维护着QueryTracker的引用,因此可以在更上层管理SQL任务的生命周期
                queryManager::createQuery);
    }
}

创建SqlQueryExecution实例

通过SqlQueryExecutionFactory.createQueryExecution()创建对象。

@ThreadSafe
public class SqlQueryExecution
        implements QueryExecution
{
    private SqlQueryExecution(
            PreparedQuery preparedQuery,
            QueryStateMachine stateMachine,
            Slug slug,
            PlannerContext plannerContext,
            AnalyzerFactory analyzerFactory,
            SplitSourceFactory splitSourceFactory,
            NodePartitioningManager nodePartitioningManager,
            NodeScheduler nodeScheduler,
            List<PlanOptimizer> planOptimizers,
            PlanFragmenter planFragmenter,
            RemoteTaskFactory remoteTaskFactory,
            int scheduleSplitBatchSize,
            ExecutorService queryExecutor,
            ScheduledExecutorService schedulerExecutor,
            FailureDetector failureDetector,
            NodeTaskMap nodeTaskMap,
            ExecutionPolicy executionPolicy,
            SplitSchedulerStats schedulerStats,
            StatsCalculator statsCalculator,
            CostCalculator costCalculator,
            DynamicFilterService dynamicFilterService,
            WarningCollector warningCollector,
            TableExecuteContextManager tableExecuteContextManager,
            TypeAnalyzer typeAnalyzer,
            TaskManager coordinatorTaskManager)
    {
        try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
            this.slug = requireNonNull(slug, "slug is null");
            this.plannerContext = requireNonNull(plannerContext, "plannerContext is null");
            this.splitSourceFactory = requireNonNull(splitSourceFactory, "splitSourceFactory is null");
            this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null");
            this.nodeScheduler = requireNonNull(nodeScheduler, "nodeScheduler is null");
            this.planOptimizers = requireNonNull(planOptimizers, "planOptimizers is null");
            this.planFragmenter = requireNonNull(planFragmenter, "planFragmenter is null");
            this.queryExecutor = requireNonNull(queryExecutor, "queryExecutor is null");
            this.schedulerExecutor = requireNonNull(schedulerExecutor, "schedulerExecutor is null");
            this.failureDetector = requireNonNull(failureDetector, "failureDetector is null");
            this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
            this.executionPolicy = requireNonNull(executionPolicy, "executionPolicy is null");
            this.schedulerStats = requireNonNull(schedulerStats, "schedulerStats is null");
            this.statsCalculator = requireNonNull(statsCalculator, "statsCalculator is null");
            this.costCalculator = requireNonNull(costCalculator, "costCalculator is null");
            this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null");
            this.tableExecuteContextManager = requireNonNull(tableExecuteContextManager, "tableExecuteContextManager is null");

            checkArgument(scheduleSplitBatchSize > 0, "scheduleSplitBatchSize must be greater than 0");
            this.scheduleSplitBatchSize = scheduleSplitBatchSize;
            // 保存状态机的引用
            this.stateMachine = requireNonNull(stateMachine, "stateMachine is null");

            // analyze query
            // preparedQuery保存了SQL文本Parsing后的Statement(AST),因此这里基于此对象,对AST进行解析
            this.analysis = analyze(preparedQuery, stateMachine, warningCollector, analyzerFactory);
            // 向状态机注册Listener,一旦状态机的状态被设置为完成状态,就注销dynamicFilterService服务,这个服务的任务会在其它文章中详解。
            stateMachine.addStateChangeListener(state -> {
                if (!state.isDone()) {
                    return;
                }
                unregisterDynamicFilteringQuery(
                        dynamicFilterService.getDynamicFilteringStats(stateMachine.getQueryId(), stateMachine.getSession()));

                tableExecuteContextManager.unregisterTableExecuteContextForQuery(stateMachine.getQueryId());
            });

            // when the query finishes cache the final query info, and clear the reference to the output stage
            AtomicReference<SqlQueryScheduler> queryScheduler = this.queryScheduler;
            stateMachine.addStateChangeListener(state -> {
                if (!state.isDone()) {
                    return;
                }

                // query is now done, so abort any work that is still running
                // 失败是完成状态的一种
                SqlQueryScheduler scheduler = queryScheduler.get();
                if (scheduler != null) {
                    scheduler.abort();
                }
            });

            this.remoteTaskFactory = new MemoryTrackingRemoteTaskFactory(requireNonNull(remoteTaskFactory, "remoteTaskFactory is null"), stateMachine);
            this.typeAnalyzer = requireNonNull(typeAnalyzer, "typeAnalyzer is null");
            this.coordinatorTaskManager = requireNonNull(coordinatorTaskManager, "coordinatorTaskManager is null");
        }
    }
}

LocalDispatchQuery的创建及执行

实际上就是QueryExecution实例的执行,进入这个过程,实际上还需要经过ResourceGroup的筛选,筛选细节不是这里的重点,
因此略过,只需要知道ResourceGroup最终会调用LocalDispatchQuery::startWaitingForResources方法。

资源检测

public class LocalDispatchQuery
        implements DispatchQuery
{
    @Override
    public void startWaitingForResources()
    {
        // 将状态机的状态设置为WAITING_RESOURCES
        if (stateMachine.transitionToWaitingForResources()) {
            waitForMinimumWorkers();
        }
    }

    private void waitForMinimumWorkers()
    {
        // 只有当有足够的Workers结点时,才会开始执行queryExecution实例,但由于我们没有修改默认的参数
        // 因此这里的限制条件是,一旦有1个Worker可用,就会触发startExecution(queryExecution)的调用。
        // wait for query execution to finish construction
        addSuccessCallback(queryExecutionFuture, queryExecution -> {
            Session session = stateMachine.getSession();
            int executionMinCount = 1; // always wait for 1 node to be up
            if (queryExecution.shouldWaitForMinWorkers()) {
                executionMinCount = getRequiredWorkers(session);
            }
            ListenableFuture<Void> minimumWorkerFuture = clusterSizeMonitor.waitForMinimumWorkers(executionMinCount, getRequiredWorkersMaxWait(session));
            // when worker requirement is met, start the execution
            addSuccessCallback(minimumWorkerFuture, () -> startExecution(queryExecution));
            addExceptionCallback(minimumWorkerFuture, throwable -> queryExecutor.execute(() -> stateMachine.transitionToFailed(throwable)));

            // cancel minimumWorkerFuture if query fails for some reason or is cancelled by user
            stateMachine.addStateChangeListener(state -> {
                if (state.isDone()) {
                    minimumWorkerFuture.cancel(true);
                }
            });
        });
    }
    
    private void startExecution(QueryExecution queryExecution)
    {
        queryExecutor.execute(() -> {
            // 将状态机的状态设置为DISPATCHING
            if (stateMachine.transitionToDispatching()) {
                try {
                    // 提交给querySubmitter,就是在前面提到的queryManager::createQuery方法,最终会路由到SqlQueryExecution::start方法
                    querySubmitter.accept(queryExecution);
                    if (notificationSentOrGuaranteed.compareAndSet(false, true)) {
                        queryExecution.addFinalQueryInfoListener(queryMonitor::queryCompletedEvent);
                    }
                }
                catch (Throwable t) {
                    // this should never happen but be safe
                    stateMachine.transitionToFailed(t);
                    log.error(t, "query submitter threw exception");
                    throw t;
                }
                finally {
                    submitted.set(null);
                }
            }
        });
    }
}

SqlQueryExecution::start()

@ThreadSafe
public class SqlQueryExecution
        implements QueryExecution
{
    @Override
    public void start()
    {
        try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
            try {
                // 将状态机的状态设置为PlANNING
                if (!stateMachine.transitionToPlanning()) {
                    // query already started or finished
                    return;
                }
                // 启动监听线程,一旦在发现状态机的状态处理失败状态,则强制中止PLANNING
                AtomicReference<Thread> planningThread = new AtomicReference<>(currentThread());
                stateMachine.getStateChange(PLANNING).addListener(() -> {
                    if (stateMachine.getQueryState() == FAILED) {
                        synchronized (this) {
                            Thread thread = planningThread.get();
                            if (thread != null) {
                                thread.interrupt();
                            }
                        }
                    }
                }, directExecutor());

                try {
                    // 优化逻辑计划树,并切分为PlanFragments,以便能够调度Plan片段执行
                    PlanRoot plan = planQuery();
                    // DynamicFilterService needs plan for query to be registered.
                    // Query should be registered before dynamic filter suppliers are requested in distribution planning.
                    // 注册动态裁剪服务
                    registerDynamicFilteringQuery(plan);
                    // 调度plan执行,内部会创建SqlQueryScheduler实例,负责调度PlanFragments的分发和状态管理,这个过程是异步的
                    planDistribution(plan);
                }
                finally {
                    synchronized (this) {
                        planningThread.set(null);
                        // Clear the interrupted flag in case there was a race condition where
                        // the planning thread was interrupted right after planning completes above
                        Thread.interrupted();
                    }
                }

                tableExecuteContextManager.registerTableExecuteContextForQuery(getQueryId());
                // 将状态机的状态设置为STARTING
                if (!stateMachine.transitionToStarting()) {
                    // query already started or finished
                    return;
                }

                // if query is not finished, start the scheduler, otherwise cancel it
                SqlQueryScheduler scheduler = queryScheduler.get();

                if (!stateMachine.isDone()) {
                    // 调用SqlQueryScheduler::start()方法,开始调度执行
                    scheduler.start();
                }
            }
            catch (Throwable e) {
                fail(e);
                throwIfInstanceOf(e, Error.class);
            }
        }
    }
}

SqlQueryExecution::planDistribution

@ThreadSafe
public class SqlQueryExecution
        implements QueryExecution
{
    private void planDistribution(PlanRoot plan)
    {
        // if query was canceled, skip creating scheduler
        if (stateMachine.isDone()) {
            return;
        }

        // record output field
        PlanFragment rootFragment = plan.getRoot().getFragment();
        stateMachine.setColumns(
                ((OutputNode) rootFragment.getRoot()).getColumnNames(),
                rootFragment.getTypes());

        // build the stage execution objects (this doesn't schedule execution)
        SqlQueryScheduler scheduler = new SqlQueryScheduler(
                stateMachine,
                plan.getRoot(),
                nodePartitioningManager,
                nodeScheduler,
                remoteTaskFactory,
                plan.isSummarizeTaskInfos(),
                scheduleSplitBatchSize,
                queryExecutor,
                schedulerExecutor,
                failureDetector,
                nodeTaskMap,
                executionPolicy,
                schedulerStats,
                dynamicFilterService,
                tableExecuteContextManager,
                plannerContext.getMetadata(),
                splitSourceFactory,
                coordinatorTaskManager);

        queryScheduler.set(scheduler);

        // if query was canceled during scheduler creation, abort the scheduler
        // directly since the callback may have already fired
        if (stateMachine.isDone()) {
            scheduler.abort();
            queryScheduler.set(null);
        }
    }
}
生成StageManager实例,同时为每一个PlanFragment生成SqlStage实例

SqlStage负责维护跟踪所有归属它的任务的生命周期管理,以及状态维护

public class SqlQueryScheduler
{
    private static class StageManager
    {
        private static StageManager create(
                QueryStateMachine queryStateMachine,
                Session session,
                Metadata metadata,
                RemoteTaskFactory taskFactory,
                NodeTaskMap nodeTaskMap,
                ExecutorService executor,
                SplitSchedulerStats schedulerStats,
                SubPlan planTree,
                boolean summarizeTaskInfo)
        {
            ImmutableMap.Builder<StageId, SqlStage> stages = ImmutableMap.builder();
            ImmutableList.Builder<SqlStage> coordinatorStagesInTopologicalOrder = ImmutableList.builder();
            ImmutableList.Builder<SqlStage> distributedStagesInTopologicalOrder = ImmutableList.builder();
            StageId rootStageId = null;
            ImmutableMap.Builder<StageId, Set<StageId>> children = ImmutableMap.builder();
            ImmutableMap.Builder<StageId, StageId> parents = ImmutableMap.builder();
            // 从Root Plan自顶向下、广度优先遍历,获取所有的SubPlans
            for (SubPlan planNode : Traverser.forTree(SubPlan::getChildren).breadthFirst(planTree)) {
                PlanFragment fragment = planNode.getFragment();
                // 一个SubPlan或是PlanFragment就是一个Stage(同Spark中的概念相近),StageId的取值为{queryId}-{fragmentId}
                SqlStage stage = createSqlStage(
                        getStageId(session.getQueryId(), fragment.getId()),
                        fragment,
                        extractTableInfo(session, metadata, fragment),
                        taskFactory,
                        session,
                        summarizeTaskInfo,
                        nodeTaskMap,
                        executor,
                        schedulerStats);
                StageId stageId = stage.getStageId();
                stages.put(stageId, stage);
                // 以拓扑序,维护所有的Stages
                if (fragment.getPartitioning().isCoordinatorOnly()) {
                    coordinatorStagesInTopologicalOrder.add(stage);
                }
                else {
                    distributedStagesInTopologicalOrder.add(stage);
                }
                // 由于外层遍历是自顶向下的,因此每一个Stage就是最上游的Stage,即root stage
                if (rootStageId == null) {
                    rootStageId = stageId;
                }
                // 维护Stages之间的依赖关系
                Set<StageId> childStageIds = planNode.getChildren().stream()
                        .map(childStage -> getStageId(session.getQueryId(), childStage.getFragment().getId()))
                        .collect(toImmutableSet());
                children.put(stageId, childStageIds);
                childStageIds.forEach(child -> parents.put(child, stageId));
            }
          
            StageManager stageManager = new StageManager(
                    queryStateMachine,
                    stages.build(),
                    coordinatorStagesInTopologicalOrder.build(),
                    distributedStagesInTopologicalOrder.build(),
                    rootStageId,
                    children.build(),
                    parents.build());
            stageManager.initialize();
            return stageManager;
        }
    }
}

SqlQueryScheduler::start()

public class SqlQueryScheduler
{
    public synchronized void start()
    {
        if (started) {
            return;
        }
        started = true;

        if (queryStateMachine.isDone()) {
            return;
        }

        // when query is done or any time a stage completes, attempt to transition query to "final query info ready"
        queryStateMachine.addStateChangeListener(state -> {
            if (!state.isDone()) {
                return;
            }

            DistributedStagesScheduler distributedStagesScheduler;
            // synchronize to wait on distributed scheduler creation if it is currently in process
            synchronized (this) {
                distributedStagesScheduler = this.distributedStagesScheduler.get();
            }

            if (state == QueryState.FINISHED) {
                // 如果状态机的状态被设置为FINISHED,就取消所有正在调度的Stages
                coordinatorStagesScheduler.cancel();
                if (distributedStagesScheduler != null) {
                    distributedStagesScheduler.cancel();
                }
                // 通过StageManager完成
                stageManager.finish();
            }
            else if (state == QueryState.FAILED) {
                coordinatorStagesScheduler.abort();
                if (distributedStagesScheduler != null) {
                    distributedStagesScheduler.abort();
                }
                stageManager.abort();
            }

            queryStateMachine.updateQueryInfo(Optional.ofNullable(getStageInfo()));
        });
        // 调度Stages执行
        coordinatorStagesScheduler.schedule();

        Optional<DistributedStagesScheduler> distributedStagesScheduler = createDistributedStagesScheduler(currentAttempt.get());
        distributedStagesScheduler.ifPresent(scheduler -> distributedStagesSchedulingTask = executor.submit(scheduler::schedule, null));
    }
}

DistributedStagesScheduler::schedule()

DistributedStagesScheduler的创建

负责调度所有的SqlStages执行。

        public static PipelinedDistributedStagesScheduler create(
                QueryStateMachine queryStateMachine, // Query级别的状态机
                SplitSchedulerStats schedulerStats, // 记录Splits的调度信息
                NodeScheduler nodeScheduler,        // 负责为Split分配合适的Worker Node
                NodePartitioningManager nodePartitioningManager, // 提供获取对数据页Page进行Partitioning相关信息
                StageManager stageManager,          // 管理所有的Stages
                CoordinatorStagesScheduler coordinatorStagesScheduler, // 负责调度所有的Stages到Coordinator结点
                ExecutionPolicy executionPolicy,    // 执行策略器,AllAtOnceExecutionPolicy和PhasedExecutionPolicy
                FailureDetector failureDetector,
                ScheduledExecutorService executor,  // Stages调度时的线程池
                SplitSourceFactory splitSourceFactory, // 创建Source Splits的工厂类
                int splitBatchSize,                    // 一次调度的最大Splits数量
                DynamicFilterService dynamicFilterService,
                TableExecuteContextManager tableExecuteContextManager,
                RetryPolicy retryPolicy,
                int attempt)
        {
            // 由于DistributedStagesScheduler是负责Stages的调度器,这有别与QueryStateMachine的状态,因此这里要创建一个独立的状态机
            // 负责维护PipelinedDistributedStagesScheduler的状态
            DistributedStagesSchedulerStateMachine stateMachine = new DistributedStagesSchedulerStateMachine(queryStateMachine.getQueryId(), executor);
            // 使用Map以PartitioningHandle缓存所有的NodePartitionMap实例,由于PlanFragment对应的PartitioningHandle实例相同
            // 因此可以避免干次生成NodePartitionMap实例
            Map<PartitioningHandle, NodePartitionMap> partitioningCacheMap = new HashMap<>();
            // 根据具体的Connector提供的PartitioningHandle,生成NodePartitonMap:
            //     NodePartitonMap记录了WorkerNode -> PartitionId的映射关系,它的生成可以由Connector提供,
            //     例如IcebergPartitioningHandle,也可以使用系统默认的实现SystemPartitioningHandle。
            // 如何生成NodePartitionMap实例,见后面的子章节。
            Function<PartitioningHandle, NodePartitionMap> partitioningCache = partitioningHandle ->
                    partitioningCacheMap.computeIfAbsent(partitioningHandle, handle -> nodePartitioningManager.getNodePartitioningMap(queryStateMachine.getSession(), handle));
            // 为每一个PlanFragment实例,创建Bucket -> PartitionId的映射
            // Butcket即桶,类似Hive中的Bucket概念,是对一个数据Partition中的数据的进一步细化,因此一个Partition会包含多个buckets
            Map<PlanFragmentId, Optional<int[]>> bucketToPartitionMap = createBucketToPartitionMap(
                    coordinatorStagesScheduler.getBucketToPartitionForStagesConsumedByCoordinator(),
                    stageManager,
                    partitioningCache);
            // 为每一个PlanFragment创建OutputBufferManager实例,用于创建和维护这个Fragment的输出缓存区
            // OutputBufferManager分根据PartitioningHandle的不同类型,创建不一样的OutputBuffers,一共有如下三种:
            // BufferType type = 
            //   if partitioningHandle.equals(FIXED_BROADCAST_DISTRIBUTION) then BROADCAST;
            //   else if (partitioningHandle.equals(FIXED_ARBITRARY_DISTRIBUTION) then ARBITRARY;
            //   else PARTITIONED;
            Map<PlanFragmentId, OutputBufferManager> outputBufferManagers = createOutputBufferManagers(
                    coordinatorStagesScheduler.getOutputBuffersForStagesConsumedByCoordinator(),
                    stageManager,
                    bucketToPartitionMap);

            TaskLifecycleListener coordinatorTaskLifecycleListener = coordinatorStagesScheduler.getTaskLifecycleListener();
            if (retryPolicy != RetryPolicy.NONE) {
                // when retries are enabled only close exchange clients on coordinator when the query is finished
                TaskLifecycleListenerBridge taskLifecycleListenerBridge = new TaskLifecycleListenerBridge(coordinatorTaskLifecycleListener);
                coordinatorTaskLifecycleListener = taskLifecycleListenerBridge;
                stateMachine.addStateChangeListener(state -> {
                    if (state == DistributedStagesSchedulerState.FINISHED) {
                        taskLifecycleListenerBridge.notifyNoMoreSourceTasks();
                    }
                });
            }
			// 为所有的要调度的Stages创建对应的PipelinedStageExecution实例,每一个PipelinedStageExecution实例则负责各自的Stage的生命周期管理
            Map<StageId, PipelinedStageExecution> stageExecutions = new HashMap<>();
            for (SqlStage stage : stageManager.getDistributedStagesInTopologicalOrder()) {
                Optional<SqlStage> parentStage = stageManager.getParent(stage.getStageId());
                // TaskLifecycleListener提供了为Stage创建任务的接口
                TaskLifecycleListener taskLifecycleListener;
                if (parentStage.isEmpty() || parentStage.get().getFragment().getPartitioning().isCoordinatorOnly()) {
                    // output will be consumed by coordinator
                    // parentStage是Root或是PlanFragment的分区策略是仅位于Coordiantor时,设置这个Stage的生命周期为Coordiator
                    taskLifecycleListener = coordinatorTaskLifecycleListener;
                }
                else {
                    // 非Root Stage时,则获取已经绑定了的实例
                    StageId parentStageId = parentStage.get().getStageId();
                    PipelinedStageExecution parentStageExecution = requireNonNull(stageExecutions.get(parentStageId), () -> "execution is null for stage: " + parentStageId);
                    taskLifecycleListener = parentStageExecution.getTaskLifecycleListener();
                }

                PlanFragment fragment = stage.getFragment();
                // 创建PipelinedStageExecution,负责执行调度&执行当前Stage,会为每一个Partition创建RemoteTask实例,并调度到相应的Worker Node执行
                PipelinedStageExecution stageExecution = createPipelinedStageExecution(
                        stageManager.get(fragment.getId()),
                        outputBufferManagers,
                        taskLifecycleListener,
                        failureDetector,
                        executor,
                        bucketToPartitionMap.get(fragment.getId()),
                        attempt);
                stageExecutions.put(stage.getStageId(), stageExecution);
            }

            ImmutableMap.Builder<StageId, StageScheduler> stageSchedulers = ImmutableMap.builder();
            for (PipelinedStageExecution stageExecution : stageExecutions.values()) {
                List<PipelinedStageExecution> children = stageManager.getChildren(stageExecution.getStageId()).stream()
                        .map(stage -> requireNonNull(stageExecutions.get(stage.getStageId()), () -> "stage execution not found for stage: " + stage))
                        .collect(toImmutableList());
                // 每一个StageExecution实例,创建对应的StageScheduler实例,负责当前Stage的调度执行,Trino实现了几个不同实现类:
                //  FixedSourcePartitionedScheduler
                //  FixedCountScheduler
                //  () -> SourcePartitionedScheduler
                StageScheduler scheduler = createStageScheduler(
                        queryStateMachine,
                        stageExecution,
                        splitSourceFactory,
                        children,
                        partitioningCache,
                        nodeScheduler,
                        nodePartitioningManager,
                        splitBatchSize,
                        dynamicFilterService,
                        executor,
                        tableExecuteContextManager);
                stageSchedulers.put(stageExecution.getStageId(), scheduler);
            }
            // 创建PipelinedDistributedStagesScheduler实例,负责所有的Stages的调度执行
            PipelinedDistributedStagesScheduler distributedStagesScheduler = new PipelinedDistributedStagesScheduler(
                    stateMachine,
                    queryStateMachine,
                    schedulerStats,
                    stageManager,
                    executionPolicy.createExecutionSchedule(stageExecutions.values()),
                    stageSchedulers.build(),
                    ImmutableMap.copyOf(stageExecutions),
                    dynamicFilterService);
            distributedStagesScheduler.initialize();
            return distributedStagesScheduler;
        }
        
        /**
         * 为每一个PlanFragment计算bucketToPartition的映射关系。
         */
        private static Map<PlanFragmentId, Optional<int[]>> createBucketToPartitionMap(
                Map<PlanFragmentId, Optional<int[]>> bucketToPartitionForStagesConsumedByCoordinator,
                StageManager stageManager,
                Function<PartitioningHandle, NodePartitionMap> partitioningCache)
        {
            ImmutableMap.Builder<PlanFragmentId, Optional<int[]>> result = ImmutableMap.builder();
            // 忽略,只有在Coordinator上调度时,才会有值
            result.putAll(bucketToPartitionForStagesConsumedByCoordinator);
            for (SqlStage stage : stageManager.getDistributedStagesInTopologicalOrder()) {
                PlanFragment fragment = stage.getFragment();
                // 
                Optional<int[]> bucketToPartition = getBucketToPartition(fragment.getPartitioning(), partitioningCache, fragment.getRoot(), fragment.getRemoteSourceNodes());
                for (SqlStage childStage : stageManager.getChildren(stage.getStageId())) {
                    result.put(childStage.getFragment().getId(), bucketToPartition);
                }
            }
            return result.build();
        }
        
        private static Optional<int[]> getBucketToPartition(
                PartitioningHandle partitioningHandle,
                Function<PartitioningHandle, NodePartitionMap> partitioningCache,
                PlanNode fragmentRoot,
                List<RemoteSourceNode> remoteSourceNodes)
        {
            if (partitioningHandle.equals(SOURCE_DISTRIBUTION) || partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
                // SOURCE_DISTRIBUTION表示一个TableScan算子,而SCALED_WRITER_DISTRIBUTION表示Table Write算子
                // 因此这种类型的PlanFragment只会有一个分桶
                return Optional.of(new int[1]);
            }
            else if (searchFrom(fragmentRoot).where(node -> node instanceof TableScanNode).findFirst().isPresent()) {
                if (remoteSourceNodes.stream().allMatch(node -> node.getExchangeType() == REPLICATE)) {
                    return Optional.empty();
                }
                else {
                    // remote source requires nodePartitionMap
                    // remote source类型的算子,需要从上游的PlanFragment读取分区的数据,因此bucket到partition的映射关系,需要
                    // 根据绑定的partitioningHandle得到,partitioningCache在之前已经被初始化过了
                    NodePartitionMap nodePartitionMap = partitioningCache.apply(partitioningHandle);
                    return Optional.of(nodePartitionMap.getBucketToPartition());
                }
            }
            else {
                // 其它类型,例如ARBITRARY_DISTRIBUTION、FIXED_HASH_DISTRIBUTION等,计算过程同remote source相似
                NodePartitionMap nodePartitionMap = partitioningCache.apply(partitioningHandle);
                List<InternalNode> partitionToNode = nodePartitionMap.getPartitionToNode();
                // todo this should asynchronously wait a standard timeout period before failing
                checkCondition(!partitionToNode.isEmpty(), NO_NODES_AVAILABLE, "No worker nodes available");
                return Optional.of(nodePartitionMap.getBucketToPartition());
            }
        }
}
创建NodePartitioningMap实例,维护bucket -> Partition -> Node的映射关系

此实例保存了两个重要的数据结构:
partitionToNode:Data Partition -> Worker Node的映射集合
bucketToPartition:Data Bucket -> Data Partition的映射集合
根据上面两个Map变量,可以做到根据分区键,计算每一个数据行的Bucket ID,就可以知道这一行数据归于哪个Partition,
进而知道应该分布到哪个Worker Node上

    public NodePartitionMap getNodePartitioningMap(Session session, PartitioningHandle partitioningHandle)
    {
        requireNonNull(session, "session is null");
        requireNonNull(partitioningHandle, "partitioningHandle is null");
        if (partitioningHandle.getConnectorHandle() instanceof SystemPartitioningHandle) {
            // 返回系统默认的对象
            return ((SystemPartitioningHandle) partitioningHandle.getConnectorHandle()).getNodePartitionMap(session, nodeScheduler);
        }
        // 获取Connector自己实现的Bucket -> Node的映射集合,Connector可以实现接口,定义buckets的数量,以及构建bucket到worker node映射
        // 由于我们讨论的Iceberg Connector,因此会createArbitraryBucketToNode(...)方法得到实例
        ConnectorBucketNodeMap connectorBucketNodeMap = getConnectorBucketNodeMap(session, partitioningHandle);
        // safety check for crazy partitioning
        checkArgument(connectorBucketNodeMap.getBucketCount() < 1_000_000, "Too many buckets in partitioning: %s", connectorBucketNodeMap.getBucketCount());

        List<InternalNode> bucketToNode;
        if (connectorBucketNodeMap.hasFixedMapping()) {
            bucketToNode = getFixedMapping(connectorBucketNodeMap);
        }
        else {
            CatalogName catalogName = partitioningHandle.getConnectorId()
                    .orElseThrow(() -> new IllegalArgumentException("No connector ID for partitioning handle: " + partitioningHandle));
            // Create a bucket to node mapping. Consecutive buckets are assigned
            // to shuffled nodes (e.g "1 -> node2, 2 -> node1, 3 -> node2, 4 -> node1, ...").
            // 这里必然有这样的不等式:buckets的数量 >= 可用的Workers的数量
            // Iceberg Connector仅仅定义了buckets数量,没有定义bucket到node映射关系,并且buckets的数量=活跃worker数量
            bucketToNode = createArbitraryBucketToNode(
                    nodeScheduler.createNodeSelector(session, Optional.of(catalogName)).allNodes(),
                    connectorBucketNodeMap.getBucketCount());
        }
        // 前面创建了bucket到worker的映射关系,下面就要构建Bucket与Partition的关系
        // 创建一个数组,大小为Buckets的数量,同时bucketToPartition[i]存放的是对应的PartitionId
        int[] bucketToPartition = new int[connectorBucketNodeMap.getBucketCount()];
        // BiMap,保证keys和values都各自不重复,也就意味着一个Worker Node唯一对应一个Partition
        BiMap<InternalNode, Integer> nodeToPartition = HashBiMap.create();
        int nextPartitionId = 0; // 初始值
        for (int bucket = 0; bucket < bucketToNode.size(); bucket++) {
            InternalNode node = bucketToNode.get(bucket);
            // bucketToNode中可能会存在重复的Value,即多个Bucket映射到多个Worker Node
            Integer partitionId = nodeToPartition.get(node);
            if (partitionId == null) {
                // 如果partitionId不存在,即找到了一个新的Worker,那么就递增partitionId
                // 不难看出在Trino内部,一个WorkerNode就是一个Partition
                partitionId = nextPartitionId++;
                nodeToPartition.put(node, partitionId);
            }
            // 记录bucketId到PartitionId的映射
            bucketToPartition[bucket] = partitionId;
        }
        // 收集所有的WorkerNode
        List<InternalNode> partitionToNode = IntStream.range(0, nodeToPartition.size())
                .mapToObj(partitionId -> nodeToPartition.inverse().get(partitionId))
                .collect(toImmutableList());
        // 返回实例
        return new NodePartitionMap(partitionToNode, bucketToPartition, getSplitToBucket(session, partitioningHandle));
    }
创建StageScheduler实例,负责一个Stage的调度执行
    private static class PipelinedDistributedStagesScheduler
            implements DistributedStagesScheduler
    {
        private static StageScheduler createStageScheduler(
                QueryStateMachine queryStateMachine,
                PipelinedStageExecution stageExecution,
                SplitSourceFactory splitSourceFactory,
                List<PipelinedStageExecution> childStageExecutions,
                Function<PartitioningHandle, NodePartitionMap> partitioningCache,
                NodeScheduler nodeScheduler,
                NodePartitioningManager nodePartitioningManager,
                int splitBatchSize,
                DynamicFilterService dynamicFilterService,
                ScheduledExecutorService executor,
                TableExecuteContextManager tableExecuteContextManager)
        {
            Session session = queryStateMachine.getSession();
            PlanFragment fragment = stageExecution.getFragment();
            PartitioningHandle partitioningHandle = fragment.getPartitioning();
            // 尝试为当前的Fragment,为每一个TableScanNode创建一个SplitSource实例,用于对数据源的数据进行切分,生成一系列的数据片段。
            // 对于Iceberg Connector来说,就是对DataFile进行切分,返回一批IcebergSplit。
            // SplitSource的提供的接口的最终调用会代理到IcebergSplitSource。
            // 在创建的过程中还会涉及到SplitManager的对象,不过不在这里解析了。
            Map<PlanNodeId, SplitSource> splitSources = splitSourceFactory.createSplitSources(session, fragment);
            if (!splitSources.isEmpty()) {
                queryStateMachine.addStateChangeListener(new StateChangeListener<>()
                {
                    private final AtomicReference<Collection<SplitSource>> splitSourcesReference = new AtomicReference<>(splitSources.values());

                    @Override
                    public void stateChanged(QueryState newState)
                    {
                        if (newState.isDone()) {
                            // ensure split sources are closed and release memory
                            Collection<SplitSource> sources = splitSourcesReference.getAndSet(null);
                            if (sources != null) {
                                closeSplitSources(sources);
                            }
                        }
                    }
                });
            }
            if (partitioningHandle.equals(SOURCE_DISTRIBUTION)) {
                // 如果当前PlanFragment的分区类型是SOURCE_DISTRIBUTION,说明这个Fragment是上游的SubPlan,负责从数据源加载数据
                // nodes are selected dynamically based on the constraints of the splits and the system load
                Entry<PlanNodeId, SplitSource> entry = getOnlyElement(splitSources.entrySet());
                PlanNodeId planNodeId = entry.getKey();
                SplitSource splitSource = entry.getValue();
                Optional<CatalogName> catalogName = Optional.of(splitSource.getCatalogName())
                        .filter(catalog -> !isInternalSystemConnector(catalog));
                NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, catalogName);
                // placementPolicy负责根据nodelSelector的实现,为Split分配合适的WorkerNode,
                SplitPlacementPolicy placementPolicy = new DynamicSplitPlacementPolicy(nodeSelector, stageExecution::getAllTasks);

                checkArgument(!fragment.getStageExecutionDescriptor().isStageGroupedExecution());
                // 返回一个封装了SourcePartitionedScheduler实例的对象
                return newSourcePartitionedSchedulerAsStageScheduler(
                        stageExecution,
                        planNodeId,
                        splitSource,
                        placementPolicy,
                        splitBatchSize,
                        dynamicFilterService,
                        tableExecuteContextManager,
                        () -> childStageExecutions.stream().anyMatch(PipelinedStageExecution::isAnyTaskBlocked));
            }
            else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
                // ...
                return scheduler;
            }
            else {
                // 如果不是包含TableScan的PlanFragment,比如是一个JOIN类型的Fragment,它存在如下三种情况
                //    left is Source, right is RemoteSource
                //    left is RemoteSource, right is RemoteSource
                //    left is Source, right is Source
                if (!splitSources.isEmpty()) {
                    // contains local source
                    List<PlanNodeId> schedulingOrder = fragment.getPartitionedSources();
                    Optional<CatalogName> catalogName = partitioningHandle.getConnectorId();
                    checkArgument(catalogName.isPresent(), "No connector ID for partitioning handle: %s", partitioningHandle);
                    List<ConnectorPartitionHandle> connectorPartitionHandles;
                    boolean groupedExecutionForStage = fragment.getStageExecutionDescriptor().isStageGroupedExecution();
                    // 如果一个Stage被标记为Grouped,这个Stage必须是被Partitioning了,因此可以等价地认为
                    // 一个Group就是一个Bucket,因此这个Group中的Splits都对应同一个Partition,又对应同一个Worker Node
                    if (groupedExecutionForStage) {
                        connectorPartitionHandles = nodePartitioningManager.listPartitionHandles(session, partitioningHandle);
                        checkState(!ImmutableList.of(NOT_PARTITIONED).equals(connectorPartitionHandles));
                    }
                    else {
                        // 如果不是分组
                        connectorPartitionHandles = ImmutableList.of(NOT_PARTITIONED);
                    }

                    BucketNodeMap bucketNodeMap;
                    List<InternalNode> stageNodeList;
                    if (fragment.getRemoteSourceNodes().stream().allMatch(node -> node.getExchangeType() == REPLICATE)) {
                        // no remote source
                        boolean dynamicLifespanSchedule = fragment.getStageExecutionDescriptor().isDynamicLifespanSchedule();
                        bucketNodeMap = nodePartitioningManager.getBucketNodeMap(session, partitioningHandle, dynamicLifespanSchedule);

                        // verify execution is consistent with planner's decision on dynamic lifespan schedule
                        verify(bucketNodeMap.isDynamic() == dynamicLifespanSchedule);
                        // 如果Fragment仅包含本地的TableScanNode,那么所有可用的Worker结点,都是当前Stage可以被调度执行的结点
                        // 因此
                        stageNodeList = new ArrayList<>(nodeScheduler.createNodeSelector(session, catalogName).allNodes());
                        Collections.shuffle(stageNodeList);
                    }
                    else {
                        // cannot use dynamic lifespan schedule
                        verify(!fragment.getStageExecutionDescriptor().isDynamicLifespanSchedule());

                        // remote source requires nodePartitionMap
                        NodePartitionMap nodePartitionMap = partitioningCache.apply(partitioningHandle);
                        if (groupedExecutionForStage) {
                            // 如果是Grouped Stage,则需要M个不同的ConnectorPartitionHandle实例,用来计算BucketID,
                            // 同时M == Buckets的数量,才能保存每一个BucketId都对应不同的分区。
                            checkState(connectorPartitionHandles.size() == nodePartitionMap.getBucketToPartition().length);
                        }
                        stageNodeList = nodePartitionMap.getPartitionToNode();
                        bucketNodeMap = nodePartitionMap.asBucketNodeMap();
                    }
                    // 在这种情况下,Buckets的数量是固定的,因此数据源的分区数量也是固定的,因此创建FixedSourcePartitionedScheduler实例
                    return new FixedSourcePartitionedScheduler(
                            stageExecution,
                            splitSources,
                            fragment.getStageExecutionDescriptor(),
                            schedulingOrder,
                            stageNodeList,
                            bucketNodeMap,
                            splitBatchSize,
                            getConcurrentLifespansPerNode(session),
                            nodeScheduler.createNodeSelector(session, catalogName),
                            connectorPartitionHandles,
                            dynamicFilterService,
                            tableExecuteContextManager);
                }
                else {
                    // all sources are remote
                    // 如果都是RemoteSources Plan Node,即要读取的数据来自上游的OutputBufers,
                    // 因此这里Partitions的数量,取决于上游,为当前的Stage创建分区任务的数量也是确定的
                    // 例如当Buckets数量 == Partitions数量 == Node数量时,同一个Stage的不同分区上的任务,会发送到不同的WorkerNode;
                    // 但如果Buckets数量多于node数量,一个Stage的多个分区可能会同时运行在一个Node上面
                    NodePartitionMap nodePartitionMap = partitioningCache.apply(partitioningHandle);
                    List<InternalNode> partitionToNode = nodePartitionMap.getPartitionToNode();
                    // todo this should asynchronously wait a standard timeout period before failing
                    checkCondition(!partitionToNode.isEmpty(), NO_NODES_AVAILABLE, "No worker nodes available");
                    return new FixedCountScheduler(stageExecution, partitionToNode);
                }
            }
        }
    }

DistributedStagesScheduler的调度

    private static class PipelinedDistributedStagesScheduler
            implements DistributedStagesScheduler
    {
        @Override
        public void schedule()
        {
            // 调度开始
            checkState(started.compareAndSet(false, true), "already started");

            try (SetThreadName ignored = new SetThreadName("Query-%s", queryStateMachine.getQueryId())) {
                while (!executionSchedule.isFinished()) {
                    List<ListenableFuture<Void>> blockedStages = new ArrayList<>();
                    // 获取要调度的Stages,默认配置下,会调度所有的Stages运行,而不考虑Stages之间的依赖
                    for (PipelinedStageExecution stageExecution : executionSchedule.getStagesToSchedule()) {
                        // 由StageExecution实例代理调度绑定的Stage执行
                        stageExecution.beginScheduling();

                        // perform some scheduling work,异步
                        ScheduleResult result = stageSchedulers.get(stageExecution.getStageId())
                                .schedule();

                        // modify parent and children based on the results of the scheduling
                        if (result.isFinished()) {
                            // 如果Stage完成了,那么就设置完成状态
                            stageExecution.schedulingComplete();
                        }
                        else if (!result.getBlocked().isDone()) {
                            // 如果Stage的状态为BLOCKED,可能是由于前置Stage还没有数据输出
                            blockedStages.add(result.getBlocked());
                        }
                        schedulerStats.getSplitsScheduledPerIteration().add(result.getSplitsScheduled());
                        if (result.getBlockedReason().isPresent()) {
                            switch (result.getBlockedReason().get()) {
                                case WRITER_SCALING:
                                    // no-op
                                    break;
                                case WAITING_FOR_SOURCE:
                                    schedulerStats.getWaitingForSource().update(1);
                                    break;
                                case SPLIT_QUEUES_FULL:
                                    schedulerStats.getSplitQueuesFull().update(1);
                                    break;
                                case MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE:
                                case NO_ACTIVE_DRIVER_GROUP:
                                    break;
                                default:
                                    throw new UnsupportedOperationException("Unknown blocked reason: " + result.getBlockedReason().get());
                            }
                        }
                    }

                    // wait for a state change and then schedule again,如果还有被BLOCKED的Stage,则需要进行超时检测
                    if (!blockedStages.isEmpty()) {
                        try (TimeStat.BlockTimer timer = schedulerStats.getSleepTime().time()) {
                            tryGetFutureValue(whenAnyComplete(blockedStages), 1, SECONDS);
                        }
                        for (ListenableFuture<Void> blockedStage : blockedStages) {
                            blockedStage.cancel(true);
                        }
                    }
                }

                for (PipelinedStageExecution stageExecution : stageExecutions.values()) {
                    PipelinedStageExecution.State state = stageExecution.getState();
                    if (state != SCHEDULED && state != RUNNING && state != FLUSHING && !state.isDone()) {
                        throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Scheduling is complete, but stage %s is in state %s", stageExecution.getStageId(), state));
                    }
                }
            }
            catch (Throwable t) {
                fail(t, Optional.empty());
            }
            finally {
                RuntimeException closeError = new RuntimeException();
                for (StageScheduler scheduler : stageSchedulers.values()) {
                    try {
                        scheduler.close();
                    }
                    catch (Throwable t) {
                        fail(t, Optional.empty());
                        // Self-suppression not permitted
                        if (closeError != t) {
                            closeError.addSuppressed(t);
                        }
                    }
                }
            }
        }
    }

FixedSourcePartitionedScheduler::schedule()

GROUP_WIDE,Split的Life Cycle为Task Group级别,只会由中继Stage/PlanFragment的产生,其对应的Split用于读取上游已经被Partitioned的数据(因此可以简单地认为一个Group,就是一个Data Partition)。当有多个SourceScheduler调度Splits时,同一个Source上
的调度策略是按GroupId顺序高度,且同一个时刻只能调度一个Group的Splits执行;而其它SourceScheduler可以并行地调度不同的Group。

TASK_WIDE,Split的Life Cycle为Task级别,可以先简单地认为就是Table Scan Stage执行时的Split的LifeCycle。每一个SourceScheduler之间互相不影响,只看当前SqlTask的剩余资源来决定是否要调度新的Splits。

public class FixedSourcePartitionedScheduler
        implements StageScheduler
{
    @Override
    public ScheduleResult schedule()
    {
        // schedule a task on every node in the distribution
        List<RemoteTask> newTasks = ImmutableList.of();
        if (scheduledTasks.isEmpty()) { // 如果这个Stage还没有调度过任务,就为所有的分区创建RemoteTask任务
            ImmutableList.Builder<RemoteTask> newTasksBuilder = ImmutableList.builder();
            for (InternalNode node : nodes) { // 遍历当前Stage所有可用的Worker Nodes
                // 一个Node,就对应唯一一个分区
                Optional<RemoteTask> task = stageExecution.scheduleTask(node, partitionIdAllocator.getNextId(), ImmutableMultimap.of(), ImmutableMultimap.of());
                if (task.isPresent()) {
                    scheduledTasks.put(node, task.get());
                    newTasksBuilder.add(task.get());
                }
            }
            newTasks = newTasksBuilder.build();
        }

        boolean allBlocked = true;
        List<ListenableFuture<Void>> blocked = new ArrayList<>();
        BlockedReason blockedReason = BlockedReason.NO_ACTIVE_DRIVER_GROUP;

        if (groupedLifespanScheduler.isPresent()) {
            // Start new driver groups on the first scheduler if necessary,
            // i.e. when previous ones have finished execution (not finished scheduling).
            //
            // Invoke schedule method to get a new SettableFuture every time.
            // Reusing previously returned SettableFuture could lead to the ListenableFuture retaining too many listeners.
            blocked.add(groupedLifespanScheduler.get().schedule(sourceSchedulers.get(0)));
        }

        int splitsScheduled = 0;
        // SourceSchedulers保存了每一个Source的调度器实例,即SourcePartitionedScheduler实例,它们负责调度各自的Splits
        Iterator<SourceScheduler> schedulerIterator = sourceSchedulers.iterator();
        List<Lifespan> driverGroupsToStart = ImmutableList.of();
        boolean shouldInvokeNoMoreDriverGroups = false;
        while (schedulerIterator.hasNext()) {
            SourceScheduler sourceScheduler = schedulerIterator.next();
            // 如果是分组调度,意味着底层调度Splits的策略是按分组来的,只有当一个SourceScheduler某个分组的Splits调度完成了,
            // 下一个SourceScheduler才能调度相应分组的Splits,而其它的分组被BLOCKED,真正第一个SourceScheduler又完成有其它分组上的调度
            for (Lifespan lifespan : driverGroupsToStart) {
                sourceScheduler.startLifespan(lifespan, partitionHandleFor(lifespan));
            }
            if (shouldInvokeNoMoreDriverGroups) {
                sourceScheduler.noMoreLifespans();
            }
            // 调用FixedSourcePartitionedScheduler::schedule()方法
            ScheduleResult schedule = sourceScheduler.schedule();
            // 累加当前Stage总共调度的Splits数量
            splitsScheduled += schedule.getSplitsScheduled();
            if (schedule.getBlockedReason().isPresent()) {
                blocked.add(schedule.getBlocked());
                blockedReason = blockedReason.combineWith(schedule.getBlockedReason().get());
            }
            else {
                verify(schedule.getBlocked().isDone(), "blockedReason not provided when scheduler is blocked");
                allBlocked = false;
            }
            // 如果SourceScheduler instanceOf AsGroupedSourceScheduler,那么drainCompletedLifespans()方法总是会返回对应的LifeSpan对象
            driverGroupsToStart = sourceScheduler.drainCompletedLifespans();

            if (schedule.isFinished()) {
                stageExecution.schedulingComplete(sourceScheduler.getPlanNodeId());
                schedulerIterator.remove();
                sourceScheduler.close();
                shouldInvokeNoMoreDriverGroups = true;
            }
            else {
                shouldInvokeNoMoreDriverGroups = false;
            }
        }

        if (allBlocked) {
            // 如果所有的SourcePartitionedScheduler被BLOCKED了,那么就返回Blocked信息
            return new ScheduleResult(sourceSchedulers.isEmpty(), newTasks, whenAnyComplete(blocked), blockedReason, splitsScheduled);
        }
        else {
            // 有正在运行的SourcePartitionedScheduler,就返回已经调度的Splits信息
            return new ScheduleResult(sourceSchedulers.isEmpty(), newTasks, splitsScheduled);
        }
    }
}
SourcePartitionedScheduler::schedule()

最底层的Splits调度器,负责调度、执行SOURCE类型的Stage。

public class SourcePartitionedScheduler
        implements SourceScheduler
{
    @Override
    public synchronized ScheduleResult schedule()
    {
        dropListenersFromWhenFinishedOrNewLifespansAdded();

        int overallSplitAssignmentCount = 0;
        ImmutableSet.Builder<RemoteTask> overallNewTasks = ImmutableSet.builder();
        List<ListenableFuture<?>> overallBlockedFutures = new ArrayList<>();
        boolean anyBlockedOnPlacements = false;
        boolean anyBlockedOnNextSplitBatch = false;
        boolean anyNotBlocked = false;
        // 遍历每一个ScheduleGroup实例,一个ScheduleGroup对应了
        // 
        for (Entry<Lifespan, ScheduleGroup> entry : scheduleGroups.entrySet()) {
            Lifespan lifespan = entry.getKey();
            ScheduleGroup scheduleGroup = entry.getValue();
            Set<Split> pendingSplits = scheduleGroup.pendingSplits;

            if (scheduleGroup.state == ScheduleGroupState.NO_MORE_SPLITS || scheduleGroup.state == ScheduleGroupState.DONE) {
                verify(scheduleGroup.nextSplitBatchFuture == null);
            }
            else if (pendingSplits.isEmpty()) {
                // try to get the next batch
                if (scheduleGroup.nextSplitBatchFuture == null) {
                    // 实际上是通过IcebergConnectorSplitSource获取下一批要调度处理的Splits,
                    // 注意通过splitBatchSize - pendingSplits.size()限制了最大被调度的Splits数量
                    scheduleGroup.nextSplitBatchFuture = splitSource.getNextBatch(scheduleGroup.partitionHandle, lifespan, splitBatchSize - pendingSplits.size());

                    long start = System.nanoTime();
                    addSuccessCallback(scheduleGroup.nextSplitBatchFuture, () -> stageExecution.recordGetSplitTime(start));
                }

                if (scheduleGroup.nextSplitBatchFuture.isDone()) {
                    // 如果nextSplitBatchFuture完成,意味着拿到了Splits实例,因此就可以立即调度了
                    SplitBatch nextSplits = getFutureValue(scheduleGroup.nextSplitBatchFuture);
                    scheduleGroup.nextSplitBatchFuture = null;
                    // 将所有的Splits添加到等待队列中
                    pendingSplits.addAll(nextSplits.getSplits());
                    if (nextSplits.isLastBatch()) {
                        // 如果是最后一批要调度的Splits,则追加一个EmptySplit的实例,以便通知Worker Node上的SqlTask任务停止运行
                        if (scheduleGroup.state == ScheduleGroupState.INITIALIZED && pendingSplits.isEmpty()) {
                            // Add an empty split in case no splits have been produced for the source.
                            // For source operators, they never take input, but they may produce output.
                            // This is well handled by the execution engine.
                            // However, there are certain non-source operators that may produce output without any input,
                            // for example, 1) an AggregationOperator, 2) a HashAggregationOperator where one of the grouping sets is ().
                            // Scheduling an empty split kicks off necessary driver instantiation to make this work.
                            pendingSplits.add(new Split(
                                    splitSource.getCatalogName(),
                                    new EmptySplit(splitSource.getCatalogName()),
                                    lifespan));
                        }
                        // 通知当前的SourceScheduler,不需要再调度了
                        scheduleGroup.state = ScheduleGroupState.NO_MORE_SPLITS;
                    }
                }
                else {
                    overallBlockedFutures.add(scheduleGroup.nextSplitBatchFuture);
                    anyBlockedOnNextSplitBatch = true;
                    continue;
                }
            }

            Multimap<InternalNode, Split> splitAssignment = ImmutableMultimap.of();
            if (!pendingSplits.isEmpty()) {
                if (!scheduleGroup.placementFuture.isDone()) {
                    anyBlockedOnPlacements = true;
                    continue;
                }

                if (scheduleGroup.state == ScheduleGroupState.INITIALIZED) {
                    scheduleGroup.state = ScheduleGroupState.SPLITS_ADDED;
                }
                if (state == State.INITIALIZED) {
                    state = State.SPLITS_ADDED;
                }

                // calculate placements for splits,为每一个Split计算应该被分发到哪个Worker Node
                SplitPlacementResult splitPlacementResult = splitPlacementPolicy.computeAssignments(pendingSplits);
                splitAssignment = splitPlacementResult.getAssignments();

                // remove splits with successful placements
                splitAssignment.values().forEach(pendingSplits::remove); // AbstractSet.removeAll performs terribly here.
                overallSplitAssignmentCount += splitAssignment.size();

                // if not completed placed, mark scheduleGroup as blocked on placement
                if (!pendingSplits.isEmpty()) {
                    scheduleGroup.placementFuture = splitPlacementResult.getBlocked();
                    overallBlockedFutures.add(scheduleGroup.placementFuture);
                    anyBlockedOnPlacements = true;
                }
            }

            // if no new splits will be assigned, update state and attach completion event
            Multimap<InternalNode, Lifespan> noMoreSplitsNotification = ImmutableMultimap.of();
            if (pendingSplits.isEmpty() && scheduleGroup.state == ScheduleGroupState.NO_MORE_SPLITS) {
                scheduleGroup.state = ScheduleGroupState.DONE;
                if (!lifespan.isTaskWide()) {
                    InternalNode node = ((BucketedSplitPlacementPolicy) splitPlacementPolicy).getNodeForBucket(lifespan.getId());
                    noMoreSplitsNotification = ImmutableMultimap.of(node, lifespan);
                }
            }

            // assign the splits with successful placements
            overallNewTasks.addAll(assignSplits(splitAssignment, noMoreSplitsNotification));

            // Assert that "placement future is not done" implies "pendingSplits is not empty".
            // The other way around is not true. One obvious reason is (un)lucky timing, where the placement is unblocked between `computeAssignments` and this line.
            // However, there are other reasons that could lead to this.
            // Note that `computeAssignments` is quite broken:
            // 1. It always returns a completed future when there are no tasks, regardless of whether all nodes are blocked.
            // 2. The returned future will only be completed when a node with an assigned task becomes unblocked. Other nodes don't trigger future completion.
            // As a result, to avoid busy loops caused by 1, we check pendingSplits.isEmpty() instead of placementFuture.isDone() here.
            if (scheduleGroup.nextSplitBatchFuture == null && scheduleGroup.pendingSplits.isEmpty() && scheduleGroup.state != ScheduleGroupState.DONE) {
                anyNotBlocked = true;
            }
        }

        // * `splitSource.isFinished` invocation may fail after `splitSource.close` has been invoked.
        //   If state is NO_MORE_SPLITS/FINISHED, splitSource.isFinished has previously returned true, and splitSource is closed now.
        // * Even if `splitSource.isFinished()` return true, it is not necessarily safe to tear down the split source.
        //   * If anyBlockedOnNextSplitBatch is true, it means we have not checked out the recently completed nextSplitBatch futures,
        //     which may contain recently published splits. We must not ignore those.
        //   * If any scheduleGroup is still in DISCOVERING_SPLITS state, it means it hasn't realized that there will be no more splits.
        //     Next time it invokes getNextBatch, it will realize that. However, the invocation will fail we tear down splitSource now.
        if ((state == State.NO_MORE_SPLITS || state == State.FINISHED) || (noMoreScheduleGroups && scheduleGroups.isEmpty() && splitSource.isFinished())) {
            switch (state) {
                case INITIALIZED:
                    // We have not scheduled a single split so far.
                    // But this shouldn't be possible. See usage of EmptySplit in this method.
                    throw new IllegalStateException("At least 1 split should have been scheduled for this plan node");
                case SPLITS_ADDED:
                    state = State.NO_MORE_SPLITS;

                    Optional<List<Object>> tableExecuteSplitsInfo = splitSource.getTableExecuteSplitsInfo();

                    // Here we assume that we can get non-empty tableExecuteSplitsInfo only for queries which facilitate single split source.
                    // TODO support grouped execution
                    tableExecuteSplitsInfo.ifPresent(info -> {
                        TableExecuteContext tableExecuteContext = tableExecuteContextManager.getTableExecuteContextForQuery(stageExecution.getStageId().getQueryId());
                        tableExecuteContext.setSplitsInfo(info);
                    });

                    splitSource.close();
                    // fall through
                case NO_MORE_SPLITS:
                    state = State.FINISHED;
                    whenFinishedOrNewLifespanAdded.set(null);
                    // fall through
                case FINISHED:
                    splitSource.getMetrics().ifPresent(stageExecution::updateConnectorMetrics);
                    return new ScheduleResult(
                            true,
                            overallNewTasks.build(),
                            overallSplitAssignmentCount);
            }
            throw new IllegalStateException("Unknown state");
        }

        if (anyNotBlocked) {
            return new ScheduleResult(false, overallNewTasks.build(), overallSplitAssignmentCount);
        }

        if (anyBlockedOnNextSplitBatch
                && scheduledTasks.isEmpty()
                && dynamicFilterService.isCollectingTaskNeeded(stageExecution.getStageId().getQueryId(), stageExecution.getFragment())) {
            // schedule a task for collecting dynamic filters in case probe split generator is waiting for them
            createTaskOnRandomNode().ifPresent(overallNewTasks::add);
        }

        boolean anySourceTaskBlocked = this.anySourceTaskBlocked.getAsBoolean();
        if (anySourceTaskBlocked) {
            // Dynamic filters might not be collected due to build side source tasks being blocked on full buffer.
            // In such case probe split generation that is waiting for dynamic filters should be unblocked to prevent deadlock.
            dynamicFilterService.unblockStageDynamicFilters(stageExecution.getStageId().getQueryId(), stageExecution.getAttemptId(), stageExecution.getFragment());
        }

        if (groupedExecution) {
            overallNewTasks.addAll(finalizeTaskCreationIfNecessary());
        }
        else if (anyBlockedOnPlacements && anySourceTaskBlocked) {
            // In a broadcast join, output buffers of the tasks in build source stage have to
            // hold onto all data produced before probe side task scheduling finishes,
            // even if the data is acknowledged by all known consumers. This is because
            // new consumers may be added until the probe side task scheduling finishes.
            //
            // As a result, the following line is necessary to prevent deadlock
            // due to neither build nor probe can make any progress.
            // The build side blocks due to a full output buffer.
            // In the meantime the probe side split cannot be consumed since
            // builder side hash table construction has not finished.
            overallNewTasks.addAll(finalizeTaskCreationIfNecessary());
        }

        ScheduleResult.BlockedReason blockedReason;
        if (anyBlockedOnNextSplitBatch) {
            blockedReason = anyBlockedOnPlacements ? MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE : WAITING_FOR_SOURCE;
        }
        else {
            blockedReason = anyBlockedOnPlacements ? SPLIT_QUEUES_FULL : NO_ACTIVE_DRIVER_GROUP;
        }

        overallBlockedFutures.add(whenFinishedOrNewLifespanAdded);
        return new ScheduleResult(
                false,
                overallNewTasks.build(),
                nonCancellationPropagating(asVoid(whenAnyComplete(overallBlockedFutures))),
                blockedReason,
                overallSplitAssignmentCount);
    }
}

SqlTask的创建

SqlTask,运行在Worker Node上,每一个SqlTask对应 一个Stage中的一个分区,它负责处理这个分区上的所有Splits。
客户端通过/v1/task/{taskId},请求对应的Worker Node创建相应的任务实例。

    @ResourceSecurity(INTERNAL_ONLY)
    @POST
    @Path("{taskId}")
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public void createOrUpdateTask(
            @PathParam("taskId") TaskId taskId,
            TaskUpdateRequest taskUpdateRequest,
            @Context UriInfo uriInfo,
            @Suspended AsyncResponse asyncResponse)
    {
        requireNonNull(taskUpdateRequest, "taskUpdateRequest is null");

        Session session = taskUpdateRequest.getSession().toSession(sessionPropertyManager, taskUpdateRequest.getExtraCredentials());

        if (injectFailure(session.getTraceToken(), taskId, RequestType.CREATE_OR_UPDATE_TASK, asyncResponse)) {
            return;
        }
        // 创建任务
        TaskInfo taskInfo = taskManager.updateTask(session,
                taskId,
                taskUpdateRequest.getFragment(),
                taskUpdateRequest.getSources(),
                taskUpdateRequest.getOutputIds(),
                taskUpdateRequest.getDynamicFilterDomains());

        if (shouldSummarize(uriInfo)) {
            taskInfo = taskInfo.summarize();
        }

        asyncResponse.resume(Response.ok().entity(taskInfo).build());
    }

SqlTaskManager::updateTask

public class SqlTaskManager
        implements TaskManager, Closeable
{
    private final LoadingCache<TaskId, SqlTask> tasks = CacheBuilder.newBuilder().build(CacheLoader.from(
                taskId -> createSqlTask(
                        taskId,
                        locationFactory.createLocalTaskLocation(taskId),
                        nodeInfo.getNodeId(),
                        queryContexts.getUnchecked(taskId.getQueryId()),
                        sqlTaskExecutionFactory,
                        taskNotificationExecutor,
                        sqlTask -> finishedTaskStats.merge(sqlTask.getIoStats()),
                        maxBufferSize,
                        maxBroadcastBufferSize,
                        failedTasks)));
                        
    @Override
    public TaskInfo updateTask(
            Session session,
            TaskId taskId,
            Optional<PlanFragment> fragment,
            List<TaskSource> sources,
            OutputBuffers outputBuffers,
            Map<DynamicFilterId, Domain> dynamicFilterDomains)
    {
        try {
            return versionEmbedder.embedVersion(() -> doUpdateTask(session, taskId, fragment, sources, outputBuffers, dynamicFilterDomains)).call();
        }
        catch (Exception e) {
            throwIfUnchecked(e);
            // impossible, doUpdateTask does not throw checked exceptions
            throw new RuntimeException(e);
        }
    }

    private TaskInfo doUpdateTask(
            Session session,
            TaskId taskId,
            Optional<PlanFragment> fragment,
            List<TaskSource> sources,
            OutputBuffers outputBuffers,
            Map<DynamicFilterId, Domain> dynamicFilterDomains)
    {
        requireNonNull(session, "session is null");
        requireNonNull(taskId, "taskId is null");
        requireNonNull(fragment, "fragment is null");
        requireNonNull(sources, "sources is null");
        requireNonNull(outputBuffers, "outputBuffers is null");

        SqlTask sqlTask = tasks.getUnchecked(taskId); // 创建一个新的SqlTask实例
        QueryContext queryContext = sqlTask.getQueryContext();
        if (!queryContext.isMemoryLimitsInitialized()) {
            // 如果限制了当前Query运行时的内存,则需要更新相关的属性
            long sessionQueryMaxMemoryPerNode = getQueryMaxMemoryPerNode(session).toBytes();
            long sessionQueryTotalMaxMemoryPerNode = getQueryMaxTotalMemoryPerNode(session).toBytes();
            // Session properties are only allowed to decrease memory limits, not increase them
            queryContext.initializeMemoryLimits(
                    resourceOvercommit(session),
                    min(sessionQueryMaxMemoryPerNode, queryMaxMemoryPerNode),
                    min(sessionQueryTotalMaxMemoryPerNode, queryMaxTotalMemoryPerNode));
        }
        // 更新SqlTask的心跳信息,实际上就是系统当前的时间
        // 每一个SqlTask的心跳信息,都会在查找或更新时,被更新,以保证能够根据上一次的心跳时间,判断它是不是失联了
        sqlTask.recordHeartbeat();
        // 更新SqlTask实例运行时的参数
        return sqlTask.updateTask(session, fragment, sources, outputBuffers, dynamicFilterDomains);
    }

SqlTask的更新

创建SqlTask实例时,或是Coordinator调了新的Splits时,会执行更新过程。

public class SqlTask
{
    /**
     * 此方法的所有参数,都来自客户端发送的TaskUpdateRequest的对象,因此在生成Worker端的执行任务时,此Fragment上的输入(Split)、
     * 输出(outputBuffers)都已经确定了。
     * session: 保存了Sql执行时的客户端侧会话信息
     * fragment: 当前SqlTask要执行的逻辑计划片段
     * sources: 当前SqlTask要读取的数据源的Split描述信息,这些Split要么读取remote source,要么读取table。
     * outputBuffers: SqlTask的输出缓存区队列,在Coordinator侧创建PipelinedStageExecution实例时就已经被确定了,一共有如下三种类型:
     *     BroadcastOutputBufferManager:广播输出数据,只有一个Partition,因此只有一个buffer
     *     ScaledOutputBufferManager:动态扩展输出Buffer的数量,因此buffers数量是不固定的,被用于写出数据的任务
     *     PartitionedOutputBufferManager:按分区数量创建相同数量的outputBuffer,因此每一个Buffer都对应一个分区ID,供下游Stage
     *                                     消费。在当前的执行流程分析场景下,用到的是此类的实例。
     */
    public TaskInfo updateTask(
            Session session,
            Optional<PlanFragment> fragment,
            List<TaskSource> sources,
            OutputBuffers outputBuffers,
            Map<DynamicFilterId, Domain> dynamicFilterDomains)
    {
        try {
            // trace token must be set first to make sure failure injection for getTaskResults requests works as expected
            session.getTraceToken().ifPresent(traceToken::set);

            // The LazyOutput buffer does not support write methods, so the actual
            // output buffer must be established before drivers are created (e.g.
            // a VALUES query).
            outputBuffer.setOutputBuffers(outputBuffers);

            // assure the task execution is only created once
            SqlTaskExecution taskExecution;
            synchronized (this) {
                // is task already complete?
                TaskHolder taskHolder = taskHolderReference.get();
                if (taskHolder.isFinished()) {
                    return taskHolder.getFinalTaskInfo();
                }
                taskExecution = taskHolder.getTaskExecution();
                if (taskExecution == null) {
                    checkState(fragment.isPresent(), "fragment must be present");
                    // 创建SqlTaskExecution实例,负责在当前的Worker结点分析和执行fragment    
                    taskExecution = sqlTaskExecutionFactory.create(
                            session,
                            queryContext,
                            taskStateMachine,
                            outputBuffer,
                            fragment.get(),
                            this::notifyStatusChanged);
                    taskHolderReference.compareAndSet(taskHolder, new TaskHolder(taskExecution));
                    needsPlan.set(false);
                }
            }

            if (taskExecution != null) {
                // 一旦发现taskExecution实例,就将要处理的数据源Splits添加到等待队列中
                taskExecution.addSources(sources);
                // 同时更新dynamicFilter产生的(可以在处理Split时用于过滤数据的值集合)。
                taskExecution.getTaskContext().addDynamicFilter(dynamicFilterDomains);
            }
        }
        catch (Error e) {
            failed(e);
            throw e;
        }
        catch (RuntimeException e) {
            failed(e);
        }

        return getTaskInfo();
    }
}
SqlTaskExecution的创建

创建此实例时,会在创建过程中,生成真正可执行的物理执行计划实例LocalExecutionPlan。

public class SqlTaskExecutionFactory
{
    public SqlTaskExecution create(
            Session session,
            QueryContext queryContext,
            TaskStateMachine taskStateMachine,
            OutputBuffer outputBuffer,
            PlanFragment fragment,
            Runnable notifyStatusChanged)
    {
        // 创建TaskContext实例,维护了当前SqlTask运行时的各种信息,例如各种metrics
        TaskContext taskContext = queryContext.addTaskContext(
                taskStateMachine,
                session,
                notifyStatusChanged,
                perOperatorCpuTimerEnabled,
                cpuTimerEnabled);

        LocalExecutionPlan localExecutionPlan;
        try (SetThreadName ignored = new SetThreadName("Task-%s", taskStateMachine.getTaskId())) {
            try {
                // planner是一个LocalExecutionPlanner类型的实例,用于将逻辑计划PlanFragment转换成本地可执行的
                // 物理执行计划LocalExecutionPlan
                localExecutionPlan = planner.plan(
                        taskContext,
                        fragment.getRoot(),
                        TypeProvider.copyOf(fragment.getSymbols()),
                        fragment.getPartitioningScheme(),
                        fragment.getStageExecutionDescriptor(),
                        fragment.getPartitionedSources(),
                        outputBuffer);
            }
            catch (Throwable e) {
                // planning failed
                taskStateMachine.failed(e);
                throwIfUnchecked(e);
                throw new RuntimeException(e);
            }
        }
        return createSqlTaskExecution(
                taskStateMachine,
                taskContext,
                outputBuffer,
                localExecutionPlan,
                taskExecutor,
                taskNotificationExecutor,
                splitMonitor);
    }
}
LocalExecutionPlanner::plan

将PlanFragment转换成本地可执行的物理计划LocalExecutionPlan。

public class LocalExecutionPlanner
{
    public LocalExecutionPlan plan(
            TaskContext taskContext,
            PlanNode plan,
            TypeProvider types,
            PartitioningScheme partitioningScheme,
            StageExecutionDescriptor stageExecutionDescriptor,
            List<PlanNodeId> partitionedSourceOrder,
            OutputBuffer outputBuffer)
    {
        // 得到当前Fragment的输出布局(layout)
        List<Symbol> outputLayout = partitioningScheme.getOutputLayout();
        if (partitioningScheme.getPartitioning().getHandle().equals(FIXED_BROADCAST_DISTRIBUTION) ||
                partitioningScheme.getPartitioning().getHandle().equals(FIXED_ARBITRARY_DISTRIBUTION) ||
                partitioningScheme.getPartitioning().getHandle().equals(SCALED_WRITER_DISTRIBUTION) ||
                partitioningScheme.getPartitioning().getHandle().equals(SINGLE_DISTRIBUTION) ||
                partitioningScheme.getPartitioning().getHandle().equals(COORDINATOR_DISTRIBUTION)) {
            // 由于数据是基于Partition的,因此跳过
            return plan(taskContext, stageExecutionDescriptor, plan, outputLayout, types, partitionedSourceOrder, new TaskOutputFactory(outputBuffer));
        }

        // We can convert the symbols directly into channels, because the root must be a sink and therefore the layout is fixed
        List<Integer> partitionChannels;
        List<Optional<NullableValue>> partitionConstants;
        List<Type> partitionChannelTypes;
        if (partitioningScheme.getHashColumn().isPresent()) {
            partitionChannels = ImmutableList.of(outputLayout.indexOf(partitioningScheme.getHashColumn().get()));
            partitionConstants = ImmutableList.of(Optional.empty());
            partitionChannelTypes = ImmutableList.of(BIGINT);
        }
        else {
            // 收集分区列的下标。对于常量分区值,则赋值-1
            partitionChannels = partitioningScheme.getPartitioning().getArguments().stream()
                    .map(argument -> {
                        if (argument.isConstant()) {
                            return -1;
                        }
                        return outputLayout.indexOf(argument.getColumn());
                    })
                    .collect(toImmutableList());
            // 收集分区常量值
            partitionConstants = partitioningScheme.getPartitioning().getArguments().stream()
                    .map(argument -> {
                        if (argument.isConstant()) {
                            return Optional.of(argument.getConstant());
                        }
                        return Optional.<NullableValue>empty();
                    })
                    .collect(toImmutableList());
            // 收集分区字段的类型
            partitionChannelTypes = partitioningScheme.getPartitioning().getArguments().stream()
                    .map(argument -> {
                        if (argument.isConstant()) {
                            return argument.getConstant().getType();
                        }
                        return types.get(argument.getColumn());
                    })
                    .collect(toImmutableList());
        }
        // 得到计算分区ID的函数,一般地,执行Read SQL时,它是一个BucketPartitionFunction的实例
        // PartitionFunction提供了计算分区ID的方法,getPartition(Page page, int position),即给定一个数据页中的某一行数据,
        // 而计算PartitionId的算法,一共内置如下几类:
        //    SINGLE: Single partition can only have one bucket
        //    HASH: HashBucketFunction,根据分区字段,计算得到HASH值,后面再将HASH值对partitions数量取余得到某行数据对应的分区ID
        //    ROUND_ROBIN: 净某一行数据以顺序遍历地方式,分区分区ID
        PartitionFunction partitionFunction = nodePartitioningManager.getPartitionFunction(taskContext.getSession(), partitioningScheme, partitionChannelTypes);
        OptionalInt nullChannel = OptionalInt.empty();
        Set<Symbol> partitioningColumns = partitioningScheme.getPartitioning().getColumns();

        // partitioningColumns expected to have one column in the normal case, and zero columns when partitioning on a constant
        // 对于常量分区,则不需要额外的列;对于指定了分区字段的情况,则需要一个额外的分区列,来保存每一行的分区值(例如对于HASH分区算法,存储
        // 的是HASH值)。
        checkArgument(!partitioningScheme.isReplicateNullsAndAny() || partitioningColumns.size() <= 1);
        if (partitioningScheme.isReplicateNullsAndAny() && partitioningColumns.size() == 1) {
            nullChannel = OptionalInt.of(outputLayout.indexOf(getOnlyElement(partitioningColumns)));
        }
        
        return plan(
                taskContext,
                stageExecutionDescriptor,
                plan,
                outputLayout,
                types,
                partitionedSourceOrder,
                // 创建一个PartitionedOutputFactory类型的实例,它提供了创建PartitionedOutputOperator实例的方法。
                // 而PartitionedOutputOperator是此PlanFragment上一系列执行算子的最后一个,负责对Data Page按PartitioningColumns
                // 计算PartitionID,并扔进OutputBuffer中。
                operatorFactories.partitionedOutput(
                        taskContext,
                        partitionFunction,
                        partitionChannels,
                        partitionConstants,
                        partitioningScheme.isReplicateNullsAndAny(),
                        nullChannel,
                        outputBuffer,
                        maxPagePartitioningBufferSize));
    }

    public LocalExecutionPlan plan(
            TaskContext taskContext,
            StageExecutionDescriptor stageExecutionDescriptor,
            PlanNode plan, // PlanFragment的根结点,对应于此逻辑子计划的最上层Node
            List<Symbol> outputLayout, // 此PlanFragment的结果的布局信息,实际上就是要输出的列符号
            TypeProvider types,        // 用于描述 每一个Symbol的类型
            List<PlanNodeId> partitionedSourceOrder, // 保存了所有要输出的Source源的PlanNodeId,例如JOIN,有左、右两个Source
            OutputFactory outputOperatorFactory)
    {
        Session session = taskContext.getSession();
        // 保存了本地物理执行计划运行时的上下文信息
        LocalExecutionPlanContext context = new LocalExecutionPlanContext(taskContext, types);
        // 从PlanFragment的根逻辑计划结点plan开始访问,构建物理执行计划树,实际上就是一组作用于Split之上的Operators(Driver)
        PhysicalOperation physicalOperation = plan.accept(new Visitor(session, stageExecutionDescriptor), context);
        // 对齐逻辑计划的outputLayout和物理执行计划的输出layoutput
        Function<Page, Page> pagePreprocessor = enforceLoadedLayoutProcessor(outputLayout, physicalOperation.getLayout());
        // 收集逻辑逻辑计划的输出字段的类型
        List<Type> outputTypes = outputLayout.stream()
                .map(types::get)
                .collect(toImmutableList());
        // 创建一个新的Driver。需要将OutputOperator与physicalOperaton串联到一个物理执行流水线中,分配一个新的PipelineId。
        // 其中physicalOperaton作为流水线的Source Operator,而OutputOperator作为流水线中的Output Operator。
        context.addDriverFactory(
                context.isInputDriver(),
                true, // 标识新的Driver的类型为,Output
                new PhysicalOperation(
                        outputOperatorFactory.createOutputOperator(
                                context.getNextOperatorId(),
                                plan.getId(),
                                outputTypes,
                                pagePreprocessor,
                                new PagesSerdeFactory(plannerContext.getBlockEncodingSerde(), isExchangeCompressionEnabled(session))),
                        physicalOperation),
                context.getDriverInstanceCount());

        // notify operator factories that planning has completed
        context.getDriverFactories().stream()
                .map(DriverFactory::getOperatorFactories)
                .flatMap(List::stream)
                .filter(LocalPlannerAware.class::isInstance)
                .map(LocalPlannerAware.class::cast)
                .forEach(LocalPlannerAware::localPlannerComplete);

        return new LocalExecutionPlan(context.getDriverFactories(), partitionedSourceOrder, stageExecutionDescriptor);
    }
}
SqlTaskExecution的构建
    private SqlTaskExecution(
            TaskStateMachine taskStateMachine,
            TaskContext taskContext,
            OutputBuffer outputBuffer,
            LocalExecutionPlan localExecutionPlan,
            TaskExecutor taskExecutor,
            SplitMonitor splitMonitor,
            Executor notificationExecutor)
    {
        this.taskStateMachine = requireNonNull(taskStateMachine, "taskStateMachine is null");
        this.taskId = taskStateMachine.getTaskId();
        this.taskContext = requireNonNull(taskContext, "taskContext is null");
        this.outputBuffer = requireNonNull(outputBuffer, "outputBuffer is null");
        this.taskExecutor = requireNonNull(taskExecutor, "taskExecutor is null");
        this.notificationExecutor = requireNonNull(notificationExecutor, "notificationExecutor is null");
        this.splitMonitor = requireNonNull(splitMonitor, "splitMonitor is null");

        try (SetThreadName ignored = new SetThreadName("Task-%s", taskId)) {
            // index driver factories
            // 从执行计划,得到Source结点ID
            Set<PlanNodeId> partitionedSources = ImmutableSet.copyOf(localExecutionPlan.getPartitionedSourceOrder());
            // 保存所有生成周期为Split级别的DriverSplitRunnerFactory实例
            ImmutableMap.Builder<PlanNodeId, DriverSplitRunnerFactory> driverRunnerFactoriesWithSplitLifeCycle = ImmutableMap.builder();
            // 保存所有生命周期为Task级别的DriverSplitRunnerFactory的实例
            ImmutableList.Builder<DriverSplitRunnerFactory> driverRunnerFactoriesWithTaskLifeCycle = ImmutableList.builder();
            // 保存所有生命周期为Group级别的DriverSplitRunnerFactory实例
            ImmutableList.Builder<DriverSplitRunnerFactory> driverRunnerFactoriesWithDriverGroupLifeCycle = ImmutableList.builder();
            for (DriverFactory driverFactory : localExecutionPlan.getDriverFactories()) {
                // 获取当前Driver的最上游的PlanNodeId
                Optional<PlanNodeId> sourceId = driverFactory.getSourceId();
                if (sourceId.isPresent() && partitionedSources.contains(sourceId.get())) {
                    // 如果这个Driver有输入,同时是一个分区类型的Source Node,那么这个Driver的生命周期就是与Split绑定的,即
                    // 绑定的Split被处理完,那么这个Driver就没用了。
                    driverRunnerFactoriesWithSplitLifeCycle.put(sourceId.get(), new DriverSplitRunnerFactory(driverFactory, true));
                }
                else {
                    // 如果这个Driver是一个下游的Driver实例,
                    switch (driverFactory.getPipelineExecutionStrategy()) {
                        case GROUPED_EXECUTION:
                            // 如果是GROUP LifeSpan,那么就需要每一个Drvier创建一个Runner Factory,添加到相应的等待队列中
                            driverRunnerFactoriesWithDriverGroupLifeCycle.add(new DriverSplitRunnerFactory(driverFactory, false));
                            break;
                        case UNGROUPED_EXECUTION:
                            // 如果是GROUP LifeSpan,那么就需要每一个Drvier创建一个Runner
                            driverRunnerFactoriesWithTaskLifeCycle.add(new DriverSplitRunnerFactory(driverFactory, false));
                            break;
                        default:
                            throw new UnsupportedOperationException();
                    }
                }
            }
            this.driverRunnerFactoriesWithSplitLifeCycle = driverRunnerFactoriesWithSplitLifeCycle.build();
            this.driverRunnerFactoriesWithDriverGroupLifeCycle = driverRunnerFactoriesWithDriverGroupLifeCycle.build();
            this.driverRunnerFactoriesWithTaskLifeCycle = driverRunnerFactoriesWithTaskLifeCycle.build();

            this.pendingSplitsByPlanNode = this.driverRunnerFactoriesWithSplitLifeCycle.keySet().stream()
                    .collect(toImmutableMap(identity(), ignore -> new PendingSplitsForPlanNode()));
            this.status = new Status(
                    taskContext,
                    localExecutionPlan.getDriverFactories().stream()
                            .collect(toImmutableMap(DriverFactory::getPipelineId, DriverFactory::getPipelineExecutionStrategy)));
            this.schedulingLifespanManager = new SchedulingLifespanManager(localExecutionPlan.getPartitionedSourceOrder(), localExecutionPlan.getStageExecutionDescriptor(), this.status);

            checkArgument(this.driverRunnerFactoriesWithSplitLifeCycle.keySet().equals(partitionedSources),
                    "Fragment is partitioned, but not all partitioned drivers were found");

            // Pre-register Lifespans for ungrouped partitioned drivers in case they end up get no splits.
            for (Entry<PlanNodeId, DriverSplitRunnerFactory> entry : this.driverRunnerFactoriesWithSplitLifeCycle.entrySet()) {
                PlanNodeId planNodeId = entry.getKey();
                DriverSplitRunnerFactory driverSplitRunnerFactory = entry.getValue();
                if (driverSplitRunnerFactory.getPipelineExecutionStrategy() == UNGROUPED_EXECUTION) {
                    this.schedulingLifespanManager.addLifespanIfAbsent(Lifespan.taskWide());
                    this.pendingSplitsByPlanNode.get(planNodeId).getLifespan(Lifespan.taskWide());
                }
            }

            // don't register the task if it is already completed (most likely failed during planning above)
            if (!taskStateMachine.getState().isDone()) {
                taskHandle = createTaskHandle(taskStateMachine, taskContext, outputBuffer, localExecutionPlan, taskExecutor);
            }
            else {
                taskHandle = null;
            }
            // 追加一个Listener,当前Outputbuffer处于FINISHED状态时,检查当前的SqkTaskExecution是否完成了。
            outputBuffer.addStateChangeListener(new CheckTaskCompletionOnBufferFinish(SqlTaskExecution.this));
        }
    }
SqlTaskExecution::addSources

addSources()方法,用于将客户端(Coordinator)发送的新Splits,按类型添加到相应调度队列中,并尝试调度之。

public class SqlTaskExecution
{
    public void addSources(List<TaskSource> sources)
    {
        requireNonNull(sources, "sources is null");
        checkState(!Thread.holdsLock(this), "Cannot add sources while holding a lock on the %s", getClass().getSimpleName());

        try (SetThreadName ignored = new SetThreadName("Task-%s", taskId)) {
            // update our record of sources and schedule drivers for new partitioned splits
            // 返回的updatedUnpartitionedSources集合,包含了所有非分区类型的、未处理完成的Splits
            // 而分区类型的Splits则通过SqlTaskExecution::schedulePartitionedSource(..)方法被调度
            Map<PlanNodeId, TaskSource> updatedUnpartitionedSources = updateSources(sources);

            // 调度所有的非分区类型的Splits
            // tell existing drivers about the new splits; it is safe to update drivers
            // multiple times and out of order because sources contain full record of
            // the unpartitioned splits
            for (WeakReference<Driver> driverReference : drivers) {
                Driver driver = driverReference.get();
                // the driver can be GCed due to a failure or a limit
                if (driver == null) {
                    // remove the weak reference from the list to avoid a memory leak
                    // NOTE: this is a concurrent safe operation on a CopyOnWriteArrayList
                    drivers.remove(driverReference);
                    continue;
                }
                Optional<PlanNodeId> sourceId = driver.getSourceId();
                if (sourceId.isEmpty()) {
                    continue;
                }
                TaskSource sourceUpdate = updatedUnpartitionedSources.get(sourceId.get());
                if (sourceUpdate == null) {
                    continue;
                }
                driver.updateSource(sourceUpdate);
            }

            // we may have transitioned to no more splits, so check for completion
            checkTaskCompletion();
        }
    }
}

SqlTask的调度&执行

SqlTaskExecution::schedulePartitionedSource

SqlTask每收到新的Splits,就调用schedulePartitionedSource(TaskSource)方法调度Splits。

    private synchronized void schedulePartitionedSource(TaskSource sourceUpdate)
    {
        mergeIntoPendingSplits(sourceUpdate.getPlanNodeId(), sourceUpdate.getSplits(), sourceUpdate.getNoMoreSplitsForLifespan(), sourceUpdate.isNoMoreSplits());

        while (true) {
            // SchedulingLifespanManager tracks how far each Lifespan has been scheduled. Here is an example.
            // Let's say there are 4 source pipelines/nodes: A, B, C, and D, in scheduling order.
            // And we're processing 3 concurrent lifespans at a time. In this case, we could have
            //
            // * Lifespan 10:  A   B  [C]  D; i.e. Pipeline A and B has finished scheduling (but not necessarily finished running).
            // * Lifespan 20: [A]  B   C   D
            // * Lifespan 30:  A  [B]  C   D
            //
            // To recap, SchedulingLifespanManager records the next scheduling source node for each lifespan.
            // schedulingLifespanManager维护了两种类型的LifeSpan:
            //   Task Wide:Split的运行时辐射范围对应于split/task lifecycle,与Group的Source Pipeline是互斥的,
            //              同一时刻只能有一个task wide的pipeline和一个group wide的pipeline并行调度执行。
            //   Task Group Wide:Split的运行辐射范围对应于Driver Group lifecycle,如果有多个Source Pipeline,那么对于
            //                    相同的Group(就是一个Partition),同一时刻只能有一个在调度&执行的Source Pipeline;对于
            //                    不同的Group,可以并行调度。
            //
            // 获取还需要调度执行的LifeSpan,调度属于这个范围内的Splits执行。
            Iterator<SchedulingLifespan> activeLifespans = schedulingLifespanManager.getActiveLifespans();

            boolean madeProgress = false;

            while (activeLifespans.hasNext()) {
                SchedulingLifespan schedulingLifespan = activeLifespans.next();
                Lifespan lifespan = schedulingLifespan.getLifespan();

                // Continue using the example from above. Let's say the sourceUpdate adds some new splits for source node B.
                //
                // For lifespan 30, it could start new drivers and assign a pending split to each.
                // Pending splits could include both pre-existing pending splits, and the new ones from sourceUpdate.
                // If there is enough driver slots to deplete pending splits, one of the below would happen.
                // * If it is marked that all splits for node B in lifespan 30 has been received, SchedulingLifespanManager
                //   will be updated so that lifespan 30 now processes source node C. It will immediately start processing them.
                // * Otherwise, processing of lifespan 30 will be shelved for now.
                //
                // It is possible that the following loop would be a no-op for a particular lifespan.
                // It is also possible that a single lifespan can proceed through multiple source nodes in one run.
                //
                // When different drivers in the task has different pipelineExecutionStrategy, it adds additional complexity.
                // For example, when driver B is ungrouped and driver A, C, D is grouped, you could have something like this:
                //     TaskWide   :     [B]
                //     Lifespan 10:  A  [ ]  C   D
                //     Lifespan 20: [A]      C   D
                //     Lifespan 30:  A  [ ]  C   D
                // In this example, Lifespan 30 cannot start executing drivers in pipeline C because pipeline B
                // hasn't finished scheduling yet (albeit in a different lifespan).
                // Similarly, it wouldn't make sense for TaskWide to start executing drivers in pipeline B until at least
                // one lifespan has finished scheduling pipeline A.
                // This is why getSchedulingPlanNode returns an Optional.
                while (true) {
                    Optional<PlanNodeId> optionalSchedulingPlanNode = schedulingLifespan.getSchedulingPlanNode();
                    if (optionalSchedulingPlanNode.isEmpty()) {
                        break;
                    }
                    PlanNodeId schedulingPlanNode = optionalSchedulingPlanNode.get();
                    // driverRunnerFactoriesWithSplitLifeCycle存储的PlanNode实际上就是Source Node类型,因此这些Split
                    // 对应了数据源的数据,因此需要被Repartitioning,以便能够被Trino对数据进行分桶(重分区),满足在之前章节讲到的
                    // PartitionId -> WorkNode的数据分发策略。
                    DriverSplitRunnerFactory partitionedDriverRunnerFactory = driverRunnerFactoriesWithSplitLifeCycle.get(schedulingPlanNode);

                    PendingSplits pendingSplits = pendingSplitsByPlanNode.get(schedulingPlanNode).getLifespan(lifespan);

                    // Enqueue driver runners with driver group lifecycle for this driver life cycle, if not already enqueued.
                    if (!lifespan.isTaskWide() && !schedulingLifespan.getAndSetDriversForDriverGroupLifeCycleScheduled()) {
                        // 如果当前要调度的LifeSpan的类型为Grouped,同时还没有被调度,就走这里,为当前SqlTask的所有的Pipeline
                        // 创建DriverRunner。
                        // 此时,这个SqlTask是某个一个中继Stage/PlanFragment的某个分区的任务实例,因此属于这个LifSpan的Split的
                        // 信息已经确定了,而且已经被分区过了,因此内部会调用enqueueDriverSplitRunner(true, runners);方法,
                        // 直接唤起每一个DriverRunner
                        scheduleDriversForDriverGroupLifeCycle(lifespan);
                    }

                    // Enqueue driver runners with split lifecycle for this plan node and driver life cycle combination.
                    // 如果lifespan是TASK WIDE,那么这些Split是叶子Split,即TableScan Splits,因此不能直接唤起它们的Runner,
                    // 需要根据当前Worker Node的负载进行调度
                    ImmutableList.Builder<DriverSplitRunner> runners = ImmutableList.builder();
                    for (ScheduledSplit scheduledSplit : pendingSplits.removeAllSplits()) {
                        // create a new driver for the split
                        runners.add(partitionedDriverRunnerFactory.createDriverRunner(scheduledSplit, lifespan));
                    }
                    enqueueDriverSplitRunner(false, runners.build());

                    // If all driver runners have been enqueued for this plan node and driver life cycle combination,
                    // move on to the next plan node.
                    if (pendingSplits.getState() != NO_MORE_SPLITS) {
                        break;
                    }
                    // 到这里,不会再有新的Split了,那么就进当前的SqlTaskExecution实例进行清理
                    partitionedDriverRunnerFactory.noMoreDriverRunner(ImmutableList.of(lifespan));
                    pendingSplits.markAsCleanedUp();

                    schedulingLifespan.nextPlanNode();
                    madeProgress = true;
                    if (schedulingLifespan.isDone()) {
                        break;
                    }
                }
            }

            if (!madeProgress) {
                break;
            }
        }

        if (sourceUpdate.isNoMoreSplits()) {
            // 通知SchedulingLifespanManager,当前TaskSource对应的PlanNode的工作已经完成。
            schedulingLifespanManager.noMoreSplits(sourceUpdate.getPlanNodeId());
        }
    }

SqlTaskExecution::scheduleDriversForTaskLifeCycle

SqlTask接收到创建请求时,会尝试创建SqlTaskExecution,作为此SqlTask的执行实体,并在完成实例创建后,调用
scheduleDriversForTaskLifeCycle()方法开始调度。

    // scheduleDriversForTaskLifeCycle and scheduleDriversForDriverGroupLifeCycle are similar.
    // They are invoked under different circumstances, and schedules a disjoint set of drivers, as suggested by their names.
    // They also have a few differences, making it more convenient to keep the two methods separate.
    private void scheduleDriversForTaskLifeCycle()
    {
        // This method is called at the beginning of the task.
        // It schedules drivers for all the pipelines that have task life cycle.
        List<DriverSplitRunner> runners = new ArrayList<>();
        for (DriverSplitRunnerFactory driverRunnerFactory : driverRunnerFactoriesWithTaskLifeCycle) {
            for (int i = 0; i < driverRunnerFactory.getDriverInstances().orElse(1); i++) {
                runners.add(driverRunnerFactory.createDriverRunner(null, Lifespan.taskWide()));
            }
        }
        // driverRunnerFactoriesWithTaskLifeCycle存储的是中继续Stage/PlanFragment对应的某个分区的DriverSplitRunners,因此可以立即执行
        enqueueDriverSplitRunner(true, runners);
        for (DriverSplitRunnerFactory driverRunnerFactory : driverRunnerFactoriesWithTaskLifeCycle) {
            driverRunnerFactory.noMoreDriverRunner(ImmutableList.of(Lifespan.taskWide()));
            verify(driverRunnerFactory.isNoMoreDriverRunner());
        }
    }

SqlTaskExecution::scheduleDriversForDriverGroupLifeCycle

    private void scheduleDriversForDriverGroupLifeCycle(Lifespan lifespan)
    {
        // This method is called when a split that belongs to a previously unseen driver group is scheduled.
        // It schedules drivers for all the pipelines that have driver group life cycle.
        if (lifespan.isTaskWide()) {
            checkArgument(driverRunnerFactoriesWithDriverGroupLifeCycle.isEmpty(), "Instantiating pipeline of driver group lifecycle at task level is not allowed");
            return;
        }

        List<DriverSplitRunner> runners = new ArrayList<>();
        for (DriverSplitRunnerFactory driverSplitRunnerFactory : driverRunnerFactoriesWithDriverGroupLifeCycle) {
            for (int i = 0; i < driverSplitRunnerFactory.getDriverInstances().orElse(1); i++) {
                runners.add(driverSplitRunnerFactory.createDriverRunner(null, lifespan));
            }
        }
        // 与scheduleDriversForTaskLifeCycle方法类似,这里的DriverSplitRunners是属于某个中继Stage/PlanFragment,可以立即执行
        enqueueDriverSplitRunner(true, runners);
        for (DriverSplitRunnerFactory driverRunnerFactory : driverRunnerFactoriesWithDriverGroupLifeCycle) {
            driverRunnerFactory.noMoreDriverRunner(ImmutableList.of(lifespan));
        }
    }

DriverSplitRunner的执行

DriverSplitRunner,负责应用一组物理Operators到一个Split上,表示一段完整的数据处理过程,且数据处理的最小单元是Page

Pipeline,就是一组可独立运行的Operators的描述

Driver是对Pipeline的执行实例,因此一个Pipeline可以由多个Drivers并行执行

时间片

文章来源:https://blog.csdn.net/u014445499/article/details/135206884
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。