Skip to content

Commit

Permalink
Remove LazyBlock
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Mar 8, 2025
1 parent 53a4f18 commit e5a046b
Show file tree
Hide file tree
Showing 74 changed files with 148 additions and 1,533 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.trino.spi.block.Fixed12BlockEncoding;
import io.trino.spi.block.Int128ArrayBlockEncoding;
import io.trino.spi.block.IntArrayBlockEncoding;
import io.trino.spi.block.LazyBlockEncoding;
import io.trino.spi.block.LongArrayBlockEncoding;
import io.trino.spi.block.MapBlockEncoding;
import io.trino.spi.block.RowBlockEncoding;
Expand Down Expand Up @@ -57,7 +56,6 @@ public BlockEncodingManager()
addBlockEncoding(new MapBlockEncoding());
addBlockEncoding(new RowBlockEncoding());
addBlockEncoding(new RunLengthBlockEncoding());
addBlockEncoding(new LazyBlockEncoding());
}

public BlockEncoding getBlockEncodingByName(String encodingName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public Page transformPage(Page inputPage)
int positionCount = inputPage.getPositionCount();
checkArgument(positionCount > 0, "positionCount should be > 0, but is %s", positionCount);

Block mergeRow = inputPage.getBlock(mergeRowChannel).getLoadedBlock();
Block mergeRow = inputPage.getBlock(mergeRowChannel);
List<Block> fields = getRowFieldsFromBlock(mergeRow);
List<Block> builder = new ArrayList<>(dataColumnChannels.size() + 4);
for (int channel : dataColumnChannels) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@
package io.trino.operator;

import com.google.common.base.Throwables;
import com.google.common.base.VerifyException;
import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.block.DictionaryBlock;
import io.trino.spi.block.LazyBlock;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.block.ValueBlock;
import io.trino.spi.predicate.Domain;
Expand Down Expand Up @@ -162,7 +160,6 @@ public boolean isCollecting()

public void add(Block block)
{
block = block.getLoadedBlock();
if (collectDistinctValues) {
switch (block) {
case ValueBlock valueBlock -> {
Expand All @@ -177,7 +174,6 @@ public void add(Block block)
add(dictionary, dictionaryBlock.getId(i));
}
}
case LazyBlock _ -> throw new VerifyException("Did not expect LazyBlock after loading " + block.getClass().getSimpleName());
}

// if the distinct size is too large, fall back to min max, and drop the distinct values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ public LoadCursor add(Page page, int startingPosition)
// Initiate additional actions on close
checkState(currentCursor != null);
pageAccounting.unlockPage();
pageAccounting.loadPageLoadIfNeeded();
// Account for page size after lazy loading (which can change the page size)
pageBytes += pageAccounting.sizeOf();
currentCursor = null;
Expand Down Expand Up @@ -237,7 +236,6 @@ private final class PageAccounting

private final int pageId;
private Page page;
private boolean isPageLoaded;
private long[] rowIds;
// Start off locked to give the caller time to declare which rows to reference
private boolean lockedPage = true;
Expand Down Expand Up @@ -315,14 +313,6 @@ public boolean isCompactionEligible()
return !lockedPage && activePositions * COMPACTION_MIN_FILL_MULTIPLIER < page.getPositionCount();
}

public void loadPageLoadIfNeeded()
{
if (!isPageLoaded && activePositions > 0) {
page = page.getLoadedPage();
isPageLoaded = true;
}
}

public void compact()
{
checkState(!lockedPage, "Should not attempt compaction when page is locked");
Expand All @@ -331,8 +321,6 @@ public void compact()
return;
}

loadPageLoadIfNeeded();

int newIndex = 0;
int[] positionsToKeep = new int[activePositions];
long[] newRowIds = new long[activePositions];
Expand All @@ -354,9 +342,7 @@ public void compact()

public long sizeOf()
{
// Getting the size of a page forces a lazy page to be loaded, so only provide the size after an explicit decision to load
long loadedPageSize = isPageLoaded ? page.getSizeInBytes() : 0;
return PAGE_ACCOUNTING_INSTANCE_SIZE + loadedPageSize + SizeOf.sizeOf(rowIds);
return PAGE_ACCOUNTING_INSTANCE_SIZE + page.getSizeInBytes() + SizeOf.sizeOf(rowIds);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,7 @@ private void processInput(Page page)
startPosition = nextGroupStart;
}
else {
// late materialization requires that page being locally stored is materialized before the next one is fetched
currentGroup = page.getRegion(page.getPositionCount() - 1, 1).getLoadedPage();
currentGroup = page.getRegion(page.getPositionCount() - 1, 1);
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,6 @@ public Page getOutput()
Page page = null;
if (sourcePage != null) {
page = sourcePage.getPage();
// assure the page is in memory before handing to another operator
page = page.getLoadedPage();

// update operator stats
long endCompletedBytes = source.getCompletedBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public WorkProcessorSourceOperatorAdapter(OperatorContext operatorContext, WorkP
operatorContext.getDriverContext().getYieldSignal(),
WorkProcessor.create(splitBuffer));
this.pages = sourceOperator.getOutputPages()
.map(Page::getLoadedPage)
.withProcessStateMonitor(state -> updateOperatorStats())
.finishWhen(() -> operatorFinishing);
operatorContext.setInfoSupplier(() -> sourceOperator.getOperatorInfo().orElse(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.trino.spi.block.Block;
import io.trino.spi.block.ByteArrayBlock;
import io.trino.spi.block.DictionaryBlock;
import io.trino.spi.block.LazyBlock;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.block.ValueBlock;

Expand All @@ -50,14 +49,12 @@
import static io.airlift.bytecode.expression.BytecodeExpressions.constantFalse;
import static io.airlift.bytecode.expression.BytecodeExpressions.constantInt;
import static io.airlift.bytecode.expression.BytecodeExpressions.constantNull;
import static io.airlift.bytecode.expression.BytecodeExpressions.constantString;
import static io.airlift.bytecode.expression.BytecodeExpressions.equal;
import static io.airlift.bytecode.expression.BytecodeExpressions.invokeStatic;
import static io.airlift.bytecode.expression.BytecodeExpressions.isNotNull;
import static io.airlift.bytecode.expression.BytecodeExpressions.isNull;
import static io.airlift.bytecode.expression.BytecodeExpressions.lessThan;
import static io.airlift.bytecode.expression.BytecodeExpressions.newArray;
import static io.airlift.bytecode.expression.BytecodeExpressions.newInstance;
import static io.airlift.bytecode.expression.BytecodeExpressions.not;
import static io.airlift.bytecode.expression.BytecodeExpressions.notEqual;
import static io.airlift.bytecode.expression.BytecodeExpressions.or;
Expand Down Expand Up @@ -104,11 +101,6 @@ public static Constructor<? extends AggregationMaskBuilder> generateAggregationM
.ifTrue(invokeStatic(AggregationMask.class, "createSelectNone", AggregationMask.class, positionCount).ret()));

Variable maskBlock = scope.declareVariable("maskBlock", body, maskBlockParameter.invoke("orElse", Object.class, constantNull(Object.class)).cast(Block.class));
body.append(new IfStatement()
.condition(maskBlock.instanceOf(LazyBlock.class))
.ifTrue(new BytecodeBlock()
.append(newInstance(IllegalArgumentException.class, constantString("mask block must not be a LazyBlock")))
.throwObject()));
Variable hasMaskBlock = scope.declareVariable("hasMaskBlock", body, isNotNull(maskBlock));
Variable maskBlockMayHaveNull = scope.declareVariable(
"maskBlockMayHaveNull",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,13 @@ public void processPage(int groupCount, int[] groupIds, Page page)
Page arguments = page.getColumns(inputChannels);
Optional<Block> maskBlock = Optional.empty();
if (maskChannel.isPresent()) {
maskBlock = Optional.of(page.getBlock(maskChannel.getAsInt()).getLoadedBlock());
maskBlock = Optional.of(page.getBlock(maskChannel.getAsInt()));
}
AggregationMask mask = maskBuilder.buildAggregationMask(arguments, maskBlock);

if (mask.isSelectNone()) {
return;
}
// Unwrap any LazyBlock values before evaluating the accumulator
arguments = arguments.getLoadedPage();
long start = System.nanoTime();
accumulator.addInput(groupIds, arguments, mask);
metrics.recordAccumulatorUpdateTimeSince(start);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,10 @@ public void close() {}
public Work<?> processPage(Page page)
{
if (groupedAggregators.isEmpty()) {
return new MeasuredGroupByHashWork<>(groupByHash.addPage(page.getLoadedPage(groupByChannels)), aggregationMetrics);
return new MeasuredGroupByHashWork<>(groupByHash.addPage(page.getColumns(groupByChannels)), aggregationMetrics);
}
return new TransformWork<>(
new MeasuredGroupByHashWork<>(groupByHash.getGroupIds(page.getLoadedPage(groupByChannels)), aggregationMetrics),
new MeasuredGroupByHashWork<>(groupByHash.getGroupIds(page.getColumns(groupByChannels)), aggregationMetrics),
groupByIdBlock -> {
int groupCount = groupByHash.getGroupCount();
for (GroupedAggregator groupedAggregator : groupedAggregators) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
public class PageChannelSelector
implements Function<Page, Page>
{
// No channels need to be remapped, only ensure that all page blocks are loaded
private static final Function<Page, Page> GET_LOADED_PAGE = Page::getLoadedPage;

private final int[] channels;

public PageChannelSelector(int... channels)
Expand All @@ -37,12 +34,6 @@ public PageChannelSelector(int... channels)
@Override
public Page apply(Page page)
{
// Ensure the channels that are emitted are fully loaded and in the correct order
return page.getLoadedPage(channels);
}

public static Function<Page, Page> identitySelection()
{
return GET_LOADED_PAGE;
return page.getColumns(channels);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public JoinProbeFactory(int[] probeOutputChannels, List<Integer> probeJoinChanne

public JoinProbe createJoinProbe(Page page)
{
Page probePage = page.getLoadedPage(probeJoinChannels);
return new JoinProbe(probeOutputChannels, page, probePage, probeHashChannel >= 0 ? page.getBlock(probeHashChannel).getLoadedBlock() : null);
Page probePage = page.getColumns(probeJoinChannels);
return new JoinProbe(probeOutputChannels, page, probePage, probeHashChannel >= 0 ? page.getBlock(probeHashChannel) : null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public Page build(JoinProbe probe)
if (!isSequentialProbeIndices || outputPositions == 0) {
int[] probeIndices = probeIndexBuilder.toIntArray();
for (int i = 0; i < probeOutputChannels.length; i++) {
blocks[i] = unwrapLoadedBlock(probePage.getBlock(probeOutputChannels[i]).getPositions(probeIndices, 0, outputPositions));
blocks[i] = probePage.getBlock(probeOutputChannels[i]).getPositions(probeIndices, 0, outputPositions);
}
}
else {
Expand All @@ -127,7 +127,7 @@ public Page build(JoinProbe probe)
// only a subregion of the block should be output
block = block.getRegion(startRegion, outputPositions);
}
blocks[i] = unwrapLoadedBlock(block);
blocks[i] = block;
}
}

Expand All @@ -148,13 +148,6 @@ public String toString()
.toString();
}

private static Block unwrapLoadedBlock(Block filteredProbeBlock)
{
// Lazy blocks (e.g. used in filter condition) could be loaded during filter evaluation.
// Unwrap them to reduce overhead of further processing.
return filteredProbeBlock.isLoaded() ? filteredProbeBlock.getLoadedBlock() : filteredProbeBlock;
}

private void appendProbeIndex(JoinProbe probe)
{
int position = probe.getPosition();
Expand Down Expand Up @@ -206,7 +199,6 @@ private long getEstimatedProbeRowSize(JoinProbe probe)
for (int index : probe.getOutputChannels()) {
Block block = probe.getPage().getBlock(index);
// Estimate the size of the probe row
// TODO: improve estimation for unloaded blocks by making it similar as in PageProcessor
estimatedProbeRowSize += block.getSizeInBytes() / block.getPositionCount();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ public JoinProbeFactory(List<Integer> probeOutputChannels, List<Integer> probeJo

public JoinProbe createJoinProbe(Page page, LookupSource lookupSource)
{
Page probePage = page.getLoadedPage(probeJoinChannels);
return new JoinProbe(probeOutputChannels, page, probePage, lookupSource, probeHashChannel >= 0 ? page.getBlock(probeHashChannel).getLoadedBlock() : null, hasFilter);
Page probePage = page.getColumns(probeJoinChannels);
return new JoinProbe(probeOutputChannels, page, probePage, lookupSource, probeHashChannel >= 0 ? page.getBlock(probeHashChannel) : null, hasFilter);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public Page build(JoinProbe probe)
if (!isSequentialProbeIndices || outputPositions == 0) {
int[] probeIndices = probeIndexBuilder.toIntArray();
for (int i = 0; i < probeOutputChannels.length; i++) {
blocks[i] = unwrapLoadedBlock(probePage.getBlock(probeOutputChannels[i]).getPositions(probeIndices, 0, outputPositions));
blocks[i] = probePage.getBlock(probeOutputChannels[i]).getPositions(probeIndices, 0, outputPositions);
}
}
else {
Expand All @@ -147,7 +147,7 @@ public Page build(JoinProbe probe)
// only a subregion of the block should be output
block = block.getRegion(startRegion, outputPositions);
}
blocks[i] = unwrapLoadedBlock(block);
blocks[i] = block;
}
}

Expand Down Expand Up @@ -194,13 +194,6 @@ public String toString()
.toString();
}

private static Block unwrapLoadedBlock(Block filteredProbeBlock)
{
// Lazy blocks (e.g. used in filter condition) could be loaded during filter evaluation.
// Unwrap them to reduce overhead of further processing.
return filteredProbeBlock.isLoaded() ? filteredProbeBlock.getLoadedBlock() : filteredProbeBlock;
}

private void appendProbeIndex(JoinProbe probe)
{
int position = probe.getPosition();
Expand Down Expand Up @@ -252,7 +245,6 @@ private long getEstimatedProbeRowSize(JoinProbe probe)
for (int index : probe.getOutputChannels()) {
Block block = probe.getPage().getBlock(index);
// Estimate the size of the probe row
// TODO: improve estimation for unloaded blocks by making it similar as in PageProcessor
estimatedProbeRowSize += block.getSizeInBytes() / block.getPositionCount();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import io.trino.spi.block.Block;
import io.trino.spi.block.DictionaryBlock;
import io.trino.spi.block.LazyBlock;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.block.ValueBlock;
import io.trino.type.BlockTypeOperators.BlockPositionIsIdentical;
Expand Down Expand Up @@ -103,7 +102,6 @@ else if (state == State.DICTIONARY && this.dictionary == dictionary) {
transitionToDirect();
delegate.append(positions, valueBlock);
}
case LazyBlock ignore -> throw new IllegalArgumentException("Unsupported block type: " + source.getClass().getSimpleName());
}
}

Expand Down Expand Up @@ -149,7 +147,6 @@ public void append(int position, Block source)
case RunLengthEncodedBlock runLengthEncodedBlock -> delegate.append(0, runLengthEncodedBlock.getValue());
case DictionaryBlock dictionaryBlock -> delegate.append(dictionaryBlock.getId(position), dictionaryBlock.getDictionary());
case ValueBlock valueBlock -> delegate.append(position, valueBlock);
case LazyBlock ignore -> throw new IllegalArgumentException("Unsupported block type: " + source.getClass().getSimpleName());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public InputChannels getInputChannels()
@Override
public SelectedPositions filter(ConnectorSession session, SourcePage page)
{
Block block = page.getBlock(0).getLoadedBlock();
Block block = page.getBlock(0);

if (block instanceof RunLengthEncodedBlock runLengthEncodedBlock) {
Block value = runLengthEncodedBlock.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,6 @@ public Block getResult()

private void setupDictionaryBlockProjection()
{
block = block.getLoadedBlock();

Optional<Block> dictionary = Optional.empty();
if (block instanceof RunLengthEncodedBlock) {
dictionary = Optional.of(((RunLengthEncodedBlock) block).getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private InputChannelsSourcePage(SourcePage sourcePage, int[] channels, @Nullable
if (eagerlyLoad != null) {
for (int channel = 0; channel < eagerlyLoad.length; channel++) {
if (eagerlyLoad[channel]) {
this.blocks[channel] = sourcePage.getBlock(channels[channel]).getLoadedBlock();
this.blocks[channel] = sourcePage.getBlock(channels[channel]);
}
}
}
Expand Down
Loading

0 comments on commit e5a046b

Please sign in to comment.