Add --split-queries option to export-pg-from-queries #105

Merged: 8 commits, Dec 21, 2023
Changes from 6 commits
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,8 @@

### New Features and Improvements:

- Add `--split-queries` option to `export-pg-from-queries`. When set, each query is split into multiple queries according to the configured concurrency by injecting a `range()` step at the beginning of the query.

## Neptune Export v1.1.3 (Release Date: November 30, 2023):

### New Features and Improvements:
48 changes: 46 additions & 2 deletions docs/export-pg-from-queries.md
@@ -5,6 +5,8 @@
SYNOPSIS
neptune-export.sh export-pg-from-queries
[ --alb-endpoint <applicationLoadBalancerEndpoint> ]
[ --approx-edge-count <approxEdgeCount> ]
[ --approx-node-count <approxNodeCount> ]
[ {-b | --batch-size} <batchSize> ] [ --clone-cluster ]
[ --clone-cluster-correlation-id <cloneCorrelationId> ]
[ --clone-cluster-instance-type <cloneClusterInstanceType> ]
@@ -15,15 +17,17 @@
[ {-e | --endpoint} <endpoint>... ] [ --export-id <exportId> ]
[ {-f | --queries-file} <queriesFile> ] [ --format <format> ]
[ --include-type-definitions ] [ --janus ]
- [ --lb-port <loadBalancerPort> ] [ --log-level <log level> ]
+ [ --lb-port <loadBalancerPort> ] [ --limit <limit> ]
+ [ --log-level <log level> ]
[ --max-content-length <maxContentLength> ] [ --merge-files ]
[ --nlb-endpoint <networkLoadBalancerEndpoint> ]
[ {-o | --output} <output> ] [ {-p | --port} <port> ]
[ --partition-directories <partitionDirectories> ]
[ --per-label-directories ] [ --profile <profiles>... ]
[ {-q | --queries | --query | --gremlin} <queries>... ]
[ {-r | --range | --range-size} <rangeSize> ]
[ {--region | --stream-region} <region> ]
- [ --serializer <serializer> ]
+ [ --serializer <serializer> ] [ --skip <skip> ] [ --split-queries ]
[ --stream-large-record-strategy <largeStreamRecordHandlingStrategy> ]
[ --stream-name <streamName> ] [ --stream-role-arn <streamRoleArn> ]
[ --stream-role-external-id <streamRoleExternalId> ]
@@ -42,8 +46,20 @@

This option is part of the group 'load-balancer' from which only
one option may be specified


--approx-edge-count <approxEdgeCount>
Approximate number of edges in the graph.

This option may occur a maximum of 1 times


--approx-node-count <approxNodeCount>
Approximate number of nodes in the graph.

This option may occur a maximum of 1 times


-b <batchSize>, --batch-size <batchSize>
Batch size (optional, default 64). Reduce this number if your
queries trigger CorruptedFrameExceptions.
@@ -248,6 +264,12 @@
This option's value represents a port and must fall in one of the
following port ranges: 1-1023, 1024-49151


--limit <limit>
Maximum number of items to export (optional).

This option may occur a maximum of 1 times


--log-level <log level>
Log level (optional, default 'error').
@@ -329,6 +351,13 @@
Gremlin queries (format: name="semi-colon-separated list of
queries" OR "semi-colon-separated list of queries").


-r <rangeSize>, --range <rangeSize>, --range-size <rangeSize>
Number of items to fetch per request (optional).

This option may occur a maximum of 1 times


--region <region>, --stream-region <region>
AWS Region in which your Amazon Kinesis Data Stream is located.

@@ -350,6 +379,21 @@

This option may occur a maximum of 1 times


--skip <skip>
Number of items to skip (optional).

This option may occur a maximum of 1 times


--split-queries
Uses `range()` steps to split provided queries into
`--concurrency` queries to run concurrently. `range()` steps
will be injected at the beginning of the queries. May lead to
altered results for certain queries.

This option may occur a maximum of 1 times
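
The caveat that splitting may alter results follows from where the `range()` step is injected: it runs before every other step in the query. The sketch below imitates this with plain Java collections rather than Gremlin. The class, the sample labels, and the helper names are hypothetical, and list slicing stands in for `range()`.

```java
import java.util.Arrays;
import java.util.List;

final class AlteredResultsSketch {

    // Hypothetical stand-in for g.V(): vertex labels in iteration order.
    static final List<String> VERTICES = Arrays.asList("person", "person", "city", "person");

    // Analogue of g.V().hasLabel('person').count()
    static long countPeople(List<String> vertices) {
        return vertices.stream().filter("person"::equals).count();
    }

    // Analogue of g.V().range(start, end).hasLabel('person').count():
    // the slice is taken first, then the filter and the count run on the slice only.
    static long countPeopleInRange(List<String> vertices, int start, int end) {
        return vertices.subList(start, end).stream().filter("person"::equals).count();
    }

    public static void main(String[] args) {
        System.out.println(countPeople(VERTICES));              // prints 3
        System.out.println(countPeopleInRange(VERTICES, 0, 2)); // prints 2
        System.out.println(countPeopleInRange(VERTICES, 2, 4)); // prints 1
    }
}
```

The unsplit analogue yields a single count of 3, while the two ranges yield 2 and 1. An aggregating query therefore returns one partial result per split query instead of one overall result.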


--stream-large-record-strategy <largeStreamRecordHandlingStrategy>
Strategy for dealing with records to be sent to Amazon Kinesis that
7 changes: 7 additions & 0 deletions pom.xml
@@ -277,6 +277,13 @@
<version>6.12</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -18,6 +18,7 @@
import com.amazonaws.services.neptune.io.DirectoryStructure;
import com.amazonaws.services.neptune.propertygraph.ExportStats;
import com.amazonaws.services.neptune.propertygraph.GremlinFilters;
import com.amazonaws.services.neptune.propertygraph.LazyQueriesRangeFactoryProvider;
import com.amazonaws.services.neptune.propertygraph.NamedQueries;
import com.amazonaws.services.neptune.propertygraph.NamedQueriesCollection;
import com.amazonaws.services.neptune.propertygraph.NeptuneGremlinClient;
@@ -68,6 +69,9 @@ public class ExportPropertyGraphFromGremlinQueries extends NeptuneExportCommand
@Inject
private PropertyGraphScopeModule scope = new PropertyGraphScopeModule();

@Inject
private PropertyGraphRangeModule range = new PropertyGraphRangeModule();

@Option(name = {"-q", "--queries", "--query", "--gremlin"}, description = "Gremlin queries (format: name=\"semi-colon-separated list of queries\" OR \"semi-colon-separated list of queries\").",
arity = 1, typeConverterProvider = NameQueriesTypeConverter.class)
private List<NamedQueries> queries = new ArrayList<>();
@@ -93,6 +97,11 @@ public class ExportPropertyGraphFromGremlinQueries extends NeptuneExportCommand
@Once
private boolean structuredOutput = false;

@Option(name = {"--split-queries"}, description = "Uses `range()` steps to split provided queries into `--concurrency` queries to run concurrently. " +
"`range()` steps will be injected at the beginning of the queries. May lead to altered results for certain queries.")
@Once
private boolean splitQueries = false;

@Override
public void run() {

@@ -133,6 +142,16 @@ public void run() {
try (NeptuneGremlinClient client = NeptuneGremlinClient.create(cluster, serialization.config());
NeptuneGremlinClient.QueryClient queryClient = client.queryClient()) {

if (splitQueries) {
namedQueries.splitQueries(new LazyQueriesRangeFactoryProvider(
range.config(),
concurrency.config(),
client,
exportSpecifications,
featureToggles()
));
}

QueryJob queryJob = new QueryJob(
namedQueries.flatten(),
queryClient,
@@ -161,7 +161,9 @@ public long approxCount(LabelsFilter labelsFilter, RangeConfig rangeConfig, Grem

Long count = t.next();

- stats.setEdgeCount(count);
+ if (stats != null) {
+ stats.setEdgeCount(count);
+ }
return count;
});
}
@@ -0,0 +1,85 @@
/*
Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License").
You may not use this file except in compliance with the License.
A copy of the License is located at
http://www.apache.org/licenses/LICENSE-2.0
or in the "license" file accompanying this file. This file is distributed
on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the specific language governing
permissions and limitations under the License.
*/

package com.amazonaws.services.neptune.propertygraph;

import com.amazonaws.services.neptune.cluster.ConcurrencyConfig;
import com.amazonaws.services.neptune.export.FeatureToggles;
import com.amazonaws.services.neptune.propertygraph.schema.ExportSpecification;
import com.amazonaws.services.neptune.propertygraph.schema.GraphElementType;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;

import java.util.Collection;
import java.util.Optional;

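/**
* Lazily creates and caches the RangeFactory instances used to split node and edge queries. Each factory is
* created on first request, so no factory is built for an element type that no query targets.
*/
public class LazyQueriesRangeFactoryProvider {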

private final RangeConfig rangeConfig;
private final ConcurrencyConfig concurrencyConfig;
private final NeptuneGremlinClient client;
private final LabelsFilter nodeLabelsFilter;
private final LabelsFilter edgeLabelsFilter;
private final FeatureToggles featureToggles;

private RangeFactory nodesRangeFactory;
private RangeFactory edgesRangeFactory;

public LazyQueriesRangeFactoryProvider(RangeConfig rangeConfig,
ConcurrencyConfig concurrencyConfig,
NeptuneGremlinClient client,
Collection<ExportSpecification> exportSpecifications,
FeatureToggles featureToggles) {
this.rangeConfig = rangeConfig;
this.concurrencyConfig = concurrencyConfig;
this.client = client;
this.featureToggles = featureToggles;

Optional<ExportSpecification> nodeSpecification = exportSpecifications.stream()
.filter(e -> e.getGraphElementType().equals(GraphElementType.nodes)).findFirst();
nodeLabelsFilter = nodeSpecification.isPresent() ?
nodeSpecification.get().getLabelsFilter() :
new AllLabels(NodeLabelStrategy.nodeLabelsOnly);

Optional<ExportSpecification> edgeSpecification = exportSpecifications.stream()
.filter(e -> e.getGraphElementType().equals(GraphElementType.edges)).findFirst();
edgeLabelsFilter = edgeSpecification.isPresent() ?
edgeSpecification.get().getLabelsFilter() :
new AllLabels(EdgeLabelStrategy.edgeLabelsOnly);
}

public RangeFactory getNodesRangeFactory() {
if(nodesRangeFactory == null) {
try (GraphTraversalSource g = client.newTraversalSource()) {
nodesRangeFactory = RangeFactory.create(
new NodesClient(g, false, null, featureToggles),
nodeLabelsFilter, GremlinFilters.EMPTY, rangeConfig, concurrencyConfig);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
return nodesRangeFactory;
}

public RangeFactory getEdgesRangeFactory() {
if(edgesRangeFactory == null) {
try (GraphTraversalSource g = client.newTraversalSource()) {
edgesRangeFactory = RangeFactory.create(
new EdgesClient(g, false, null, featureToggles),
edgeLabelsFilter, GremlinFilters.EMPTY, rangeConfig, concurrencyConfig);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
return edgesRangeFactory;
}

}
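
The provider above defers creating each `RangeFactory` until a query of the matching type asks for it, and then reuses the instance. A minimal, generic sketch of that create-on-first-use pattern follows. `LazySketch` and the string it returns are hypothetical placeholders, and the `synchronized` accessor is an addition of this sketch that the class in the diff does not have.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class LazySketch<T> {

    private final Supplier<T> supplier;
    private T value;

    LazySketch(Supplier<T> supplier) {
        this.supplier = supplier;
    }

    /** Computes the value on first access and returns the cached instance afterwards. */
    synchronized T get() {
        if (value == null) {
            value = supplier.get();
        }
        return value;
    }

    public static void main(String[] args) {
        AtomicInteger creations = new AtomicInteger();
        LazySketch<String> nodesFactory = new LazySketch<>(() -> {
            creations.incrementAndGet(); // stands in for the costly factory creation
            return "nodes-range-factory";
        });
        nodesFactory.get();
        nodesFactory.get();
        System.out.println(creations.get()); // prints 1
    }
}
```

If no query ever calls `get()`, the supplier never runs, which mirrors how a query set containing only `g.V()` queries never triggers creation of the edges factory.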
@@ -45,7 +45,7 @@ public static NamedQueries fromJson(JsonNode json) {

public NamedQueries(String name, Collection<String> queries) {
this.name = name;
- this.queries = queries;
+ this.queries = Collections.synchronizedCollection(queries);
}

public String name() {
@@ -71,4 +71,39 @@ public ArrayNode toJson() {

return json;
}

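/**
* Splits each query that starts with g.V() or g.E() into several range-restricted queries by injecting a
* range() step directly after the initial step. The ranges are supplied by the matching RangeFactory.
*/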
public void split(LazyQueriesRangeFactoryProvider rangeFactoryProvider) {
Collection<String> splitQueries = Collections.synchronizedCollection(new ArrayList<>());
queries.forEach(q -> {
RangeFactory rangeFactory = null;
if (q.startsWith("g.V()")) {
rangeFactory = rangeFactoryProvider.getNodesRangeFactory();
} else if (q.startsWith("g.E()")) {
rangeFactory = rangeFactoryProvider.getEdgesRangeFactory();
}

if (rangeFactory != null) {
while (!rangeFactory.isExhausted()) {
Range range = rangeFactory.nextRange();
if (range.isAll()) {
// Keep the query unaltered if the range covers everything. This works around an issue where
// range(0,-1) does not produce any results in some versions of Neptune.
splitQueries.add(q);
break;
}
if (q.startsWith("g.V()")) {
splitQueries.add(q.replaceFirst("g.V\\(\\)", "g.V()."+range.toString()));
} else if (q.startsWith("g.E()")) {
splitQueries.add(q.replaceFirst("g.E\\(\\)", "g.E()."+range.toString()));
}
}
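rangeFactory.reset();
} else {
// Keep queries that do not start with g.V() or g.E() unaltered. Without this branch they would be
// dropped when the collection is cleared and refilled below.
splitQueries.add(q);
}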
});
queries.clear();
queries.addAll(splitQueries);
}
}
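
To make the rewriting in `split` concrete, the sketch below reproduces the string manipulation in isolation. It is a simplified illustration, not the project's implementation: the class `SplitQuerySketch`, its `split` helper, and the even division of an approximate count into ranges are assumptions, as is the `range(start, end)` text format assumed for `Range.toString()`. In the diff, the ranges come from `RangeFactory`.

```java
import java.util.ArrayList;
import java.util.List;

final class SplitQuerySketch {

    /**
     * Hypothetical helper: rewrites a query starting with g.V() or g.E() into
     * `concurrency` queries, each restricted by an injected range() step.
     */
    static List<String> split(String query, long approxCount, int concurrency) {
        List<String> result = new ArrayList<>();
        String prefix = query.startsWith("g.V()") ? "g.V()"
                : query.startsWith("g.E()") ? "g.E()"
                : null;
        // Unsupported prefix or nothing to split: keep the query unaltered.
        if (prefix == null || concurrency <= 1 || approxCount <= 0) {
            result.add(query);
            return result;
        }
        // Ceiling division so the ranges cover the whole approximate count.
        long rangeSize = (approxCount + concurrency - 1) / concurrency;
        String rest = query.substring(prefix.length());
        for (int i = 0; i < concurrency; i++) {
            long start = i * rangeSize;
            // Assumption of this sketch: the last range is open-ended (-1) so that
            // items beyond the approximate count are still included.
            long end = (i == concurrency - 1) ? -1 : start + rangeSize;
            result.add(prefix + ".range(" + start + ", " + end + ")" + rest);
        }
        return result;
    }

    public static void main(String[] args) {
        split("g.V().hasLabel('person').valueMap()", 10, 2).forEach(System.out::println);
    }
}
```

With an approximate count of 10 and a concurrency of 2, the example query becomes one query starting with `g.V().range(0, 5)` and one starting with `g.V().range(5, -1)`, each followed by the original `hasLabel('person').valueMap()` steps.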
@@ -70,4 +70,12 @@ public JsonNode toJson(Object o) {
return json;
}

/**
* Splits the queries of every NamedQueries entry into smaller, range-restricted queries.
*/
public void splitQueries(LazyQueriesRangeFactoryProvider rangeFactoryProvider) {
namedQueries.forEach(q -> q.split(rangeFactoryProvider));
}

}
@@ -147,7 +147,9 @@ public long approxCount(LabelsFilter labelsFilter, RangeConfig rangeConfig, Grem
logger.info(GremlinQueryDebugger.queryAsString(t));

Long count = t.next();
- stats.setNodeCount(count);
+ if (stats != null) {
+ stats.setNodeCount(count);
+ }
return count;
});
}
@@ -62,6 +62,7 @@ public static RangeFactory create(GraphClient<?> graphClient,
private final long rangeUpperBound;
private final AtomicLong currentEnd;
private final long numberOfItemsToExport;
private final long skip;

private RangeFactory(long rangeSize,
long limit,
@@ -71,6 +72,7 @@ private RangeFactory(long rangeSize,
this.rangeSize = rangeSize;
this.exportAll = limit == Long.MAX_VALUE;
this.concurrency = concurrency;
this.skip = skip;
if (exportAll){
this.rangeUpperBound = estimatedNumberOfItemsInGraph;
this.numberOfItemsToExport = estimatedNumberOfItemsInGraph - skip;
@@ -112,4 +114,8 @@ public boolean isExhausted() {
public int concurrency() {
return concurrency;
}

public void reset() {
currentEnd.set(skip);
}
}
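
The new `reset()` method matters because one `RangeFactory` is reused for every query of the same type: once its ranges are exhausted, the next query would receive none unless the position is rewound to `skip`. The sketch below shows that behaviour with a simplified cursor. `RangeCursorSketch`, its fields, and `countRanges` are hypothetical, and the bounds logic is far simpler than that of the real `RangeFactory`.

```java
import java.util.concurrent.atomic.AtomicLong;

final class RangeCursorSketch {

    private final long rangeSize;
    private final long skip;
    private final long upperBound;
    private final AtomicLong currentEnd;

    RangeCursorSketch(long rangeSize, long skip, long upperBound) {
        this.rangeSize = rangeSize;
        this.skip = skip;
        this.upperBound = upperBound;
        this.currentEnd = new AtomicLong(skip);
    }

    /** Returns the next {start, end} pair, with end capped at the upper bound. */
    long[] nextRange() {
        long end = currentEnd.addAndGet(rangeSize);
        return new long[] {end - rangeSize, Math.min(end, upperBound)};
    }

    boolean isExhausted() {
        return currentEnd.get() >= upperBound;
    }

    /** Rewinds the cursor to the skip offset so the same ranges can be handed out again. */
    void reset() {
        currentEnd.set(skip);
    }

    /** Drains the cursor and reports how many ranges it handed out. */
    static int countRanges(RangeCursorSketch cursor) {
        int n = 0;
        while (!cursor.isExhausted()) {
            cursor.nextRange();
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        RangeCursorSketch cursor = new RangeCursorSketch(5, 0, 10);
        System.out.println(countRanges(cursor)); // prints 2
        System.out.println(countRanges(cursor)); // prints 0, the cursor is still exhausted
        cursor.reset();
        System.out.println(countRanges(cursor)); // prints 2
    }
}
```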