Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature node resource monitoring and spill trigger #379

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ default void resumeQuery()
// no-op
}

default void spillQueryRevocableMemory()
{
// no-op
}

interface QueryExecutionFactory<T extends QueryExecution>
{
T createQueryExecution(PreparedQuery preparedQuery, QueryStateMachine stateMachine, String slug, WarningCollector warningCollector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,6 @@ default QuerySnapshotManager getQuerySnapshotManager(QueryId queryId)
void suspendQuery(QueryId queryId);

void resumeQuery(QueryId queryId);

void spillQueryRevocableMemory(QueryId queryId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public interface RemoteTask

void resume();

void spillRevocableMemory();

void abort();

int getPartitionedSplitCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.prestosql.Session;
import io.prestosql.SystemSessionProperties;
import io.prestosql.cost.CostCalculator;
import io.prestosql.cost.PlanCostEstimate;
import io.prestosql.cost.StatsCalculator;
import io.prestosql.cube.CubeManager;
import io.prestosql.dynamicfilter.DynamicFilterService;
Expand All @@ -43,6 +44,8 @@
import io.prestosql.operator.ForScheduler;
import io.prestosql.query.CachedSqlQueryExecution;
import io.prestosql.query.CachedSqlQueryExecutionPlan;
import io.prestosql.resourcemanager.QueryResourceManager;
import io.prestosql.resourcemanager.QueryResourceManagerService;
import io.prestosql.security.AccessControl;
import io.prestosql.server.BasicQueryInfo;
import io.prestosql.snapshot.MarkerAnnouncer;
Expand All @@ -57,6 +60,7 @@
import io.prestosql.spi.connector.StandardWarningCode;
import io.prestosql.spi.metadata.TableHandle;
import io.prestosql.spi.plan.PlanNode;
import io.prestosql.spi.plan.PlanNodeId;
import io.prestosql.spi.plan.PlanNodeIdAllocator;
import io.prestosql.spi.plan.ProjectNode;
import io.prestosql.spi.plan.Symbol;
Expand Down Expand Up @@ -111,6 +115,7 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand Down Expand Up @@ -173,6 +178,7 @@ public class SqlQueryExecution
private final QueryRecoveryManager queryRecoveryManager;
private final WarningCollector warningCollector;
private final AtomicBoolean suspendedWithRecoveryManager = new AtomicBoolean();
private final QueryResourceManager queryResourceManager;

public SqlQueryExecution(
PreparedQuery preparedQuery,
Expand Down Expand Up @@ -203,7 +209,8 @@ public SqlQueryExecution(
DynamicFilterService dynamicFilterService,
HeuristicIndexerManager heuristicIndexerManager,
StateStoreProvider stateStoreProvider,
RecoveryUtils recoveryUtils)
RecoveryUtils recoveryUtils,
QueryResourceManagerService queryResourceManager)
{
try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
this.slug = requireNonNull(slug, "slug is null");
Expand Down Expand Up @@ -235,6 +242,7 @@ public SqlQueryExecution(

this.stateMachine = requireNonNull(stateMachine, "stateMachine is null");
this.stateStoreProvider = requireNonNull(stateStoreProvider, "stateStoreProvider is null");
this.queryResourceManager = requireNonNull(queryResourceManager, "queryResourceManager is null").createQueryResourceManager(stateMachine.getQueryId(), stateMachine.getSession());

// clear dynamic filter tasks and data created for this query
stateMachine.addStateChangeListener(state -> {
Expand Down Expand Up @@ -473,6 +481,38 @@ private void handleCrossRegionDynamicFilter(PlanRoot plan)
log.debug("queryId=%s, add columnToSymbolMapping into hazelcast success.", queryId + QUERY_COLUMN_NAME_TO_SYMBOL_MAPPING);
}

private void setResourceLimitsFromEstimates(PlanNodeId rootId)
{
/* Todo(Nitin): Re-divide the available resources amongst queries again when new query comes in.. */
PlanCostEstimate estimate = queryPlan.get().getStatsAndCosts().getCosts().get(rootId);
if (estimate != null && estimate != PlanCostEstimate.zero() && estimate != PlanCostEstimate.unknown()) {
double cpuCost = estimate.getCpuCost();
cpuCost = Double.isInfinite(cpuCost) || Double.isNaN(cpuCost) ? 0.0 : cpuCost;

double memCost = estimate.getMaxMemory();
memCost = Double.isInfinite(memCost) || Double.isNaN(memCost) ? 0.0 : memCost;

double ioCost = estimate.getNetworkCost();
ioCost = Double.isInfinite(ioCost) || Double.isNaN(ioCost) ? 0.0 : ioCost;

queryResourceManager.setResourceLimit(new DataSize(memCost, BYTE),
new Duration(cpuCost, TimeUnit.MILLISECONDS),
new DataSize(ioCost, BYTE));
}
}

private void updateQueryResourceStats()
{
SqlQueryScheduler scheduler = queryScheduler.get();
if (scheduler != null) {
Duration totalCpu = scheduler.getTotalCpuTime();
DataSize totalMem = DataSize.succinctBytes(scheduler.getTotalMemoryReservation());
DataSize totalIo = scheduler.getBasicStageStats().getInternalNetworkInputDataSize();

queryResourceManager.updateStats(totalCpu, totalMem, totalIo);
}
}

@Override
public void start()
{
Expand All @@ -486,6 +526,7 @@ public void start()

// analyze query
PlanRoot plan = analyzeQuery();
setResourceLimitsFromEstimates(plan.getRoot().getFragment().getRoot().getId());

try {
handleCrossRegionDynamicFilter(plan);
Expand Down Expand Up @@ -583,7 +624,7 @@ private SqlQueryScheduler createResumeScheduler(OptionalLong snapshotId, PlanRoo
queryRecoveryManager,
// Require same number of tasks to be scheduled, but do not require it if starting from beginning
snapshotId.isPresent() ? queryScheduler.get().getStageTaskCounts() : null,
true);
true, queryResourceManager);
if (snapshotId.isPresent() && snapshotId.getAsLong() != 0) {
// Restore going to happen first, mark the restore state for all stages
scheduler.setResuming(snapshotId.getAsLong());
Expand Down Expand Up @@ -850,7 +891,8 @@ private void planDistribution(PlanRoot plan)
snapshotManager,
queryRecoveryManager,
null,
false);
false,
queryResourceManager);

queryScheduler.set(scheduler);

Expand Down Expand Up @@ -922,6 +964,17 @@ public void resumeQuery()
}
}

@Override
public void spillQueryRevocableMemory()
{
try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
SqlQueryScheduler scheduler = queryScheduler.get();
if (scheduler != null) {
scheduler.spillRevocableMem();
}
}
}

@Override
public void cancelStage(StageId stageId)
{
Expand Down Expand Up @@ -1085,6 +1138,8 @@ public static class SqlQueryExecutionFactory
private final StateStoreProvider stateStoreProvider;
private final RecoveryUtils recoveryUtils;

private final QueryResourceManagerService queryResourceManagerService;

@Inject
SqlQueryExecutionFactory(QueryManagerConfig config,
HetuConfig hetuConfig,
Expand All @@ -1111,7 +1166,8 @@ public static class SqlQueryExecutionFactory
DynamicFilterService dynamicFilterService,
HeuristicIndexerManager heuristicIndexerManager,
StateStoreProvider stateStoreProvider,
RecoveryUtils recoveryUtils)
RecoveryUtils recoveryUtils,
QueryResourceManagerService queryResourceManagerService)
{
requireNonNull(config, "config is null");
this.schedulerStats = requireNonNull(schedulerStats, "schedulerStats is null");
Expand Down Expand Up @@ -1139,6 +1195,7 @@ public static class SqlQueryExecutionFactory
this.heuristicIndexerManager = requireNonNull(heuristicIndexerManager, "heuristicIndexerManager is null");
this.stateStoreProvider = requireNonNull(stateStoreProvider, "stateStoreProvider is null");
this.recoveryUtils = requireNonNull(recoveryUtils, "recoveryUtils is null");
this.queryResourceManagerService = requireNonNull(queryResourceManagerService, "queryResourceManagerService is null");
this.loadConfigToService(hetuConfig);
if (hetuConfig.isExecutionPlanCacheEnabled()) {
this.cache = Optional.of(CacheBuilder.newBuilder()
Expand Down Expand Up @@ -1199,7 +1256,8 @@ public QueryExecution createQueryExecution(
this.cache,
heuristicIndexerManager,
stateStoreProvider,
recoveryUtils);
recoveryUtils,
queryResourceManagerService);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,16 @@ public void resumeQuery(QueryId queryId)
.ifPresent(QueryExecution::resumeQuery);
}

@Override
public void spillQueryRevocableMemory(QueryId queryId)
{
requireNonNull(queryId, "queryId is null");
log.debug("Spill any revocable memory in query %s", queryId);

queryTracker.tryGetQuery(queryId)
.ifPresent(QueryExecution::spillQueryRevocableMemory);
}

@Override
@Managed
@Flatten
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,11 @@ public synchronized void resume()
getAllTasks().forEach(RemoteTask::resume);
}

public synchronized void spillRevocableMem()
{
getAllTasks().forEach(RemoteTask::spillRevocableMemory);
}

public void OnSnapshotXCompleted(boolean capture, long snapshotId)
{
log.debug("OnSnapshotXCompleted() is called!, capture: %b, snapshotId: %d", capture, snapshotId);
Expand Down
101 changes: 69 additions & 32 deletions presto-main/src/main/java/io/prestosql/execution/SqlTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

import java.net.URI;
import java.util.List;
Expand Down Expand Up @@ -461,38 +462,9 @@ public TaskInfo suspend(TaskState targetState)
return taskHolder.getFinalTaskInfo();
}

if (taskStateMachine.getState() == RUNNING) {
taskExecution = taskHolder.getTaskExecution();
if (taskExecution != null) {
AtomicLong bytesRevokedAtomic = new AtomicLong();
queryContext.accept(new VoidTraversingQueryContextVisitor<AtomicLong>()
{
@Override
public Void visitQueryContext(QueryContext queryContext, AtomicLong remainingBytesToRevoke)
{
if (remainingBytesToRevoke.get() < 0) {
// exit immediately if no work needs to be done
return null;
}
return super.visitQueryContext(queryContext, remainingBytesToRevoke);
}

@Override
public Void visitOperatorContext(OperatorContext operatorContext, AtomicLong remainingBytesToRevoke)
{
if (remainingBytesToRevoke.get() > 0) {
long revokedBytes = operatorContext.requestMemoryRevoking();
if (revokedBytes > 0) {
remainingBytesToRevoke.addAndGet(revokedBytes);
log.debug("revoked bytes current: %s, total: %s", revokedBytes, remainingBytesToRevoke.get());
}
}
return null;
}
}, bytesRevokedAtomic);

taskExecution.suspendTask();
}
taskExecution = taskHolder.getTaskExecution();
if (spillRevocableMem(taskExecution) > 0) {
taskExecution.suspendTask();
}
}
}
Expand All @@ -507,6 +479,45 @@ public Void visitOperatorContext(OperatorContext operatorContext, AtomicLong rem
return getTaskInfo();
}

@GuardedBy("this")
private long spillRevocableMem(SqlTaskExecution taskExecution)
{
if (taskStateMachine.getState() == RUNNING) {
if (taskExecution != null) {
AtomicLong bytesRevokedAtomic = new AtomicLong(0);
queryContext.accept(new VoidTraversingQueryContextVisitor<AtomicLong>()
{
@Override
public Void visitQueryContext(QueryContext queryContext, AtomicLong remainingBytesToRevoke)
{
if (remainingBytesToRevoke.get() < 0) {
// exit immediately if no work needs to be done
return null;
}
return super.visitQueryContext(queryContext, remainingBytesToRevoke);
}

@Override
public Void visitOperatorContext(OperatorContext operatorContext, AtomicLong remainingBytesToRevoke)
{
if (remainingBytesToRevoke.get() >= 0) {
long revokedBytes = operatorContext.requestMemoryRevoking();
if (revokedBytes > 0) {
remainingBytesToRevoke.addAndGet(revokedBytes);
log.debug("revoked bytes current: %s, total: %s", revokedBytes, remainingBytesToRevoke.get());
}
}
return null;
}
}, bytesRevokedAtomic);

return bytesRevokedAtomic.get();
}
}

return 0;
}

public TaskInfo resume(TaskState targetState)
{
try {
Expand Down Expand Up @@ -534,6 +545,32 @@ public TaskInfo resume(TaskState targetState)
return getTaskInfo();
}

public TaskInfo spill(TaskState targetState)
{
try {
SqlTaskExecution taskExecution;
synchronized (this) {
// is task already complete?
TaskHolder taskHolder = taskHolderReference.get();
if (taskHolder.isFinished()) {
return taskHolder.getFinalTaskInfo();
}

taskExecution = taskHolder.getTaskExecution();
spillRevocableMem(taskExecution);
}
}
catch (Error e) {
failed(e);
throw e;
}
catch (RuntimeException e) {
failed(e);
}

return getTaskInfo();
}

public ListenableFuture<BufferResult> getTaskResults(OutputBufferId bufferId, long startingSequenceId, DataSize maxSize)
{
requireNonNull(bufferId, "bufferId is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,24 @@ public TaskInfo resumeTask(TaskId taskId, TaskState targetState, String expected
return result;
}

@Override
public TaskInfo spillTask(TaskId taskId, TaskState targetState, String expectedTaskInstanceId)
{
requireNonNull(taskId, "taskId is null");
requireNonNull(targetState, "targetState is null");

SqlTask sqlTask = getTaskOrCreate(expectedTaskInstanceId, taskId);
if (sqlTask == null) {
return null;
}

TaskState oldState = sqlTask.getTaskStatus().getState();
TaskInfo result = sqlTask.spill(targetState);

log.debug("Resuming task %s (instanceId %s). Old state: %s; new state: %s", taskId, expectedTaskInstanceId, oldState, targetState);
return result;
}

@Override
public TaskInfo cancelTask(TaskId taskId, TaskState targetState, String expectedTaskInstanceId)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ public interface TaskManager

TaskInfo resumeTask(TaskId taskId, TaskState targetState, String expectedTaskInstanceId);

TaskInfo spillTask(TaskId taskId, TaskState targetState, String expectedTaskInstanceId);

/**
* Cancels a task. If the task does not already exist, is is created and then
* canceled. Snapshot: If a worker dies and then brought back, we don't want
Expand Down
Loading