Skip to content

Swap http for sdk client#16

Merged
xiazcy merged 11 commits into
Bit-Quill:neo4j-to-neptune-updatefrom
xShinnRyuu:swap-http-for-SDK-client
Sep 4, 2025
Merged

Swap http for sdk client#16
xiazcy merged 11 commits into
Bit-Quill:neo4j-to-neptune-updatefrom
xShinnRyuu:swap-http-for-SDK-client

Conversation

@xShinnRyuu

Copy link
Copy Markdown

Issue #, if available:

Description of changes:
Switched Bulk Loading request to use AWS SDK NeptunedataClient instead of http requests.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

dependabot Bot and others added 8 commits July 22, 2025 10:30
Bumps [aws-cdk-lib](https://github.com/aws/aws-cdk/tree/HEAD/packages/aws-cdk-lib) from 2.185.0 to 2.189.1.
- [Release notes](https://github.com/aws/aws-cdk/releases)
- [Changelog](https://github.com/aws/aws-cdk/blob/v2.189.1/CHANGELOG.v2.alpha.md)
- [Commits](https://github.com/aws/aws-cdk/commits/v2.189.1/packages/aws-cdk-lib)

---
updated-dependencies:
- dependency-name: aws-cdk-lib
  dependency-version: 2.189.1
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Bumps [brace-expansion](https://github.com/juliangruber/brace-expansion) from 1.1.11 to 1.1.12.
- [Release notes](https://github.com/juliangruber/brace-expansion/releases)
- [Commits](juliangruber/brace-expansion@1.1.11...v1.1.12)

---
updated-dependencies:
- dependency-name: brace-expansion
  dependency-version: 1.1.12
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* Added a stream writer which will write streamed data from neoj4 database to temp csv file, which then gets converted into neptune accepted formats
…label with YAML config (Bit-Quill#6)

* implement an optional `--conversion-config` that takes in a YAML file with label mapping and record skipping configurations, which will update node/edge labels accordingly and skip elements specified by IDs/labels.
* Added in an automated way to process gremlin formatted CSV with Neptune bulk loader
* Added in business logic to validate if bulk load options are present before proceeding, added BulkLoadConfig parameter option, removed redundant flag parameters

* Refactored how the parameters for bulk loading were passed in, updated parameter validations to be more localized and robust, added in unit testing for the BulkLoadConfig class

* Implemented id transformation for both vertex and edge during conversion

* Updated relevant documentations for the ID transformation logic

* Added in usage of S3TransferManager to support file uploads larger than 5GB to S3. Also added support for compressed files in the .gz format.
Added in logic to compress Neo4j exported files during the convert & bulk load process. This logic will due the compress into gzip as part of the s3 upload. Updated relevant unit tests.
@xShinnRyuu xShinnRyuu changed the base branch from master to neo4j-to-neptune-update August 30, 2025 06:58
…d of http requests

* implement an optional `--conversion-config` that takes in a YAML file with label mapping and record skipping configurations, which will update node/edge labels accordingly and skip elements specified by IDs/labels.

* Added in business logic to validate if bulk load options are present before proceeding, added BulkLoadConfig parameter option, removed redundant flag parameters

* Refactored how the parameters for bulk loading were passed in, updated parameter validations to be more localized and robust, added in unit testing for the BulkLoadConfig class

* Implemented id transformation for both vertex and edge during conversion

* Updated relevant documentations for the ID transformation logic

* Added in usage of S3TransferManager to support file uploads larger than 5GB to S3. Also added support for compressed files in the .gz format.
@xShinnRyuu xShinnRyuu force-pushed the swap-http-for-SDK-client branch from 5a35ab8 to b8efe5e Compare September 2, 2025 19:47
private final Boolean monitor;
private final HttpClient httpClient;
private final ObjectMapper objectMapper;
private final boolean monitor;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: just a thought, would it be cleaner to just to have a reference to the BulkLoadConfig instead of copying many values as fields here? For example:

private final Region region;
private final BulkLoadConfig config;

public NeptuneBulkLoader(BulkLoadConfig bulkLoadConfig) {
    this.region = extractRegionFromEndpoint(bulkLoadConfig.getNeptuneEndpoint());
    this.config = bulkLoadConfig;

And then usage of the values would just need to call the getters from the BulkLoadConfig instead (sanitizing the value is necessary).

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely cleaner. I will update it

this.neptuneDataClient = NeptunedataClient.builder()
.region(region)
.credentialsProvider(DefaultCredentialsProvider.create())
.endpointOverride(URI.create("https://" + neptuneEndpoint + ":" + NEPTUNE_PORT))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed the NEPTUNE_PORT is hardcoded to 8182 which is the default. It is possible for the port to be modified as neptune config OR the user using this tool could be using ssh tunnel with a different port.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for bringing that up. I have added it as a configurable parameter in the config and as a cli command

Comment on lines 285 to 286
.whenComplete((result, throwable) -> closeStreams(streamExecutor, pipedOut, pipedIn));
} catch (Exception e) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should be able to handle the cleanup with a finally block.

Suggested change
.whenComplete((result, throwable) -> closeStreams(streamExecutor, pipedOut, pipedIn));
} catch (Exception e) {
} finally {

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored how the method does the error catching. It now just throws it up to the caller method and has all of the logging and exception checking done in there.

Removed the catch block and added a finally to close off the thread & streams


.thenApply(ignored -> handleUploadSuccess(localFile, upload))
.exceptionally(throwable -> handleUploadFailure(localFilePath, throwable))
.whenComplete((result, throwable) -> closeStreams(streamExecutor, pipedOut, pipedIn));

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, did try with resources not work? Something like:

ExecutorService streamExecutor = Executors.newSingleThreadExecutor();
        try (PipedOutputStream pipedOut = new PipedOutputStream();
             PipedInputStream pipedIn = new PipedInputStream(pipedOut)) {
            CompletableFuture<Void> compressionFuture = startCompressionTask(localFile, pipedOut);
            UploadRequest uploadRequest = createUploadRequest(s3Key, pipedIn, streamExecutor);

            System.err.println("Initiating Transfer Manager upload...");
            Upload upload = transferManager.upload(uploadRequest);

            return CompletableFuture.allOf(upload.completionFuture(), compressionFuture)
                    .thenApply(ignored -> handleUploadSuccess(localFile, upload))
                    .exceptionally(throwable -> handleUploadFailure(localFilePath, throwable));
        } finally {
            streamExecutor.shutdown();
        }

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExecutorService does not have autocloseable implemented in Java 17 (only after Java 19). Since i had to pull this thread creation out, it was doing some funny interactions with the pipes being declared in the autocloseable try block. The pipes would either close before every thing was finish or just would not connect.

This led me to moving both of the input and output pipes out of the try block to fix the sync issue.

@xShinnRyuu xShinnRyuu force-pushed the swap-http-for-SDK-client branch from 56da8f9 to 4edc7b1 Compare September 3, 2025 22:00
@xShinnRyuu xShinnRyuu force-pushed the swap-http-for-SDK-client branch from 8b3ba03 to ecbaf94 Compare September 4, 2025 20:26
@xiazcy xiazcy merged commit 297ab61 into Bit-Quill:neo4j-to-neptune-update Sep 4, 2025
xiazcy added a commit that referenced this pull request Sep 4, 2025
* Bump aws-cdk-lib in /neptune-gremlin-js/cdk-test-app (awslabs#363)

Bumps [aws-cdk-lib](https://github.com/aws/aws-cdk/tree/HEAD/packages/aws-cdk-lib) from 2.185.0 to 2.189.1.
- [Release notes](https://github.com/aws/aws-cdk/releases)
- [Changelog](https://github.com/aws/aws-cdk/blob/v2.189.1/CHANGELOG.v2.alpha.md)
- [Commits](https://github.com/aws/aws-cdk/commits/v2.189.1/packages/aws-cdk-lib)

---
updated-dependencies:
- dependency-name: aws-cdk-lib
  dependency-version: 2.189.1
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Bump brace-expansion in /neptune-gremlin-js/cdk-test-app (awslabs#369)

Bumps [brace-expansion](https://github.com/juliangruber/brace-expansion) from 1.1.11 to 1.1.12.
- [Release notes](https://github.com/juliangruber/brace-expansion/releases)
- [Commits](juliangruber/brace-expansion@1.1.11...v1.1.12)

---
updated-dependencies:
- dependency-name: brace-expansion
  dependency-version: 1.1.12
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Added a stream writer to stream csv data from Neo4j (#4)

* Added a stream writer which will write streamed data from neoj4 database to temp csv file, which then gets converted into neptune accepted formats

* Added the ability to update node/edge labels and skip elements by ID/label with YAML config (#6)

* implement an optional `--conversion-config` that takes in a YAML file with label mapping and record skipping configurations, which will update node/edge labels accordingly and skip elements specified by IDs/labels.

* Add automated gremlin formatted csv bulk loader to Neptune (#7)

* Added in an automated way to process gremlin formatted CSV with Neptune bulk loader

* Merge error fixes and removing redundant imports (#8)

* Neo4j to neptune update (#9)

* Added in business logic to validate if bulk load options are present before proceeding, added BulkLoadConfig parameter option, removed redundant flag parameters

* Refactored how the parameters for bulk loading were passed in, updated parameter validations to be more localized and robust, added in unit testing for the BulkLoadConfig class

* Implemented id transformation for both vertex and edge during conversion

* Updated relevant documentations for the ID transformation logic

* Added in usage of S3TransferManager to support file uploads larger than 5GB to S3. Also added support for compressed files in the .gz format.

* Add gzip compression (#11)

Added in logic to compress Neo4j exported files during the convert & bulk load process. This logic will due the compress into gzip as part of the s3 upload. Updated relevant unit tests.

* Switched Bulk Loading request to use AWS SDK NeptunedataClient instead of http requests

* implement an optional `--conversion-config` that takes in a YAML file with label mapping and record skipping configurations, which will update node/edge labels accordingly and skip elements specified by IDs/labels.

* Added in business logic to validate if bulk load options are present before proceeding, added BulkLoadConfig parameter option, removed redundant flag parameters

* Refactored how the parameters for bulk loading were passed in, updated parameter validations to be more localized and robust, added in unit testing for the BulkLoadConfig class

* Implemented id transformation for both vertex and edge during conversion

* Updated relevant documentations for the ID transformation logic

* Added in usage of S3TransferManager to support file uploads larger than 5GB to S3. Also added support for compressed files in the .gz format.

* Addressed review comments, minor updates overall to improve static analysis issues

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Yang Xia <55853655+xiazcy@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants