Skip to content

Commit db96c66

Browse files
wip
1 parent 83db408 commit db96c66

File tree

18 files changed

+834
-407
lines changed

18 files changed

+834
-407
lines changed

client-spark/src/test/java/io/whitefox/api/client/ApiUtils.java

Lines changed: 83 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import com.fasterxml.jackson.databind.ObjectMapper;
55
import io.whitefox.api.utils.ApiClient;
66
import io.whitefox.api.utils.ApiException;
7-
87
import java.io.File;
98
import java.io.FileInputStream;
109
import java.io.IOException;
@@ -14,99 +13,99 @@
1413
import java.util.function.Supplier;
1514

1615
public class ApiUtils {
17-
/**
18-
* Returns the result of the call of the first argument (f) unless it throws an ApiException with HTTP status code 409,
19-
* in that case, returns the result of the call of the second argument (defaultValue).
20-
* If defaultValue is not dynamic, you can use also {@link ApiUtils#recoverConflict recoverConflinct}.
21-
*/
22-
public static <T> T recoverConflictLazy(Supplier<T> f, Supplier<T> defaultValue) {
23-
try {
24-
return f.get();
25-
} catch (ApiException e) {
26-
if (e.getCode() == 409) {
27-
return defaultValue.get();
28-
} else {
29-
throw e;
30-
}
31-
}
16+
/**
17+
* Returns the result of the call of the first argument (f) unless it throws an ApiException with HTTP status code 409,
18+
* in that case, returns the result of the call of the second argument (defaultValue).
19+
* If defaultValue is not dynamic, you can use also {@link ApiUtils#recoverConflict recoverConflinct}.
20+
*/
21+
public static <T> T recoverConflictLazy(Supplier<T> f, Supplier<T> defaultValue) {
22+
try {
23+
return f.get();
24+
} catch (ApiException e) {
25+
if (e.getCode() == 409) {
26+
return defaultValue.get();
27+
} else {
28+
throw e;
29+
}
3230
}
31+
}
3332

34-
/**
35-
* Returns the result of the call of the first argument (f) unless it throws an ApiException with HTTP status code 409,
36-
* in that case, returns the second argument (defaultValue).
37-
* If defaultValue is dynamic, you can use also {@link ApiUtils#recoverConflictLazy recoverConflictLazy}.
38-
*/
39-
public static <T> T recoverConflict(Supplier<T> f, T defaultValue) {
40-
return recoverConflictLazy(f, new Supplier<T>() {
41-
@Override
42-
public T get() {
43-
return defaultValue;
44-
}
45-
});
46-
}
33+
/**
34+
* Returns the result of the call of the first argument (f) unless it throws an ApiException with HTTP status code 409,
35+
* in that case, returns the second argument (defaultValue).
36+
* If defaultValue is dynamic, you can use also {@link ApiUtils#recoverConflictLazy recoverConflictLazy}.
37+
*/
38+
public static <T> T recoverConflict(Supplier<T> f, T defaultValue) {
39+
return recoverConflictLazy(f, new Supplier<T>() {
40+
@Override
41+
public T get() {
42+
return defaultValue;
43+
}
44+
});
45+
}
4746

48-
/**
49-
* Calls the first argument (f), if the call throws an ApiException with HTTP status code 409 will swallow the exception.
50-
*/
51-
public static <T> void ignoreConflict(Supplier<T> f) {
52-
recoverConflict(f, null);
53-
}
47+
/**
48+
* Calls the first argument (f), if the call throws an ApiException with HTTP status code 409 will swallow the exception.
49+
*/
50+
public static <T> void ignoreConflict(Supplier<T> f) {
51+
recoverConflict(f, null);
52+
}
5453

55-
private static final ObjectMapper objectMapper = new ObjectMapper();
56-
public static final String ENDPOINT_FIELD_NAME = "endpoint";
57-
public static final String BEARER_TOKEN_FIELD_NAME = "bearerToken";
54+
private static final ObjectMapper objectMapper = new ObjectMapper();
55+
public static final String ENDPOINT_FIELD_NAME = "endpoint";
56+
public static final String BEARER_TOKEN_FIELD_NAME = "bearerToken";
5857

59-
/**
60-
* Reads a resource named as the parameter, parses it following
61-
* <a href="https://github.com/delta-io/delta-sharing/blob/main/PROTOCOL.md#profile-file-format">delta sharing specification</a>
62-
* and configures an {@link ApiClient ApiClient} accordingly.
63-
*/
64-
public static ApiClient configureApiClientFromResource(String resourceName) {
65-
try (InputStream is = ApiUtils.class.getClassLoader().getResourceAsStream(resourceName)) {
66-
return configureClientInternal(objectMapper.reader().readTree(is));
67-
} catch (IOException e) {
68-
throw new RuntimeException(String.format("Cannot read %s", resourceName), e);
69-
} catch (NullPointerException e) {
70-
throw new RuntimeException(String.format("Cannot find resource %s", resourceName), e);
71-
}
58+
/**
59+
* Reads a resource named as the parameter, parses it following
60+
* <a href="https://github.com/delta-io/delta-sharing/blob/main/PROTOCOL.md#profile-file-format">delta sharing specification</a>
61+
* and configures an {@link ApiClient ApiClient} accordingly.
62+
*/
63+
public static ApiClient configureApiClientFromResource(String resourceName) {
64+
try (InputStream is = ApiUtils.class.getClassLoader().getResourceAsStream(resourceName)) {
65+
return configureClientInternal(objectMapper.reader().readTree(is));
66+
} catch (IOException e) {
67+
throw new RuntimeException(String.format("Cannot read %s", resourceName), e);
68+
} catch (NullPointerException e) {
69+
throw new RuntimeException(String.format("Cannot find resource %s", resourceName), e);
7270
}
71+
}
7372

74-
/**
75-
* Reads a local file named as the parameter, parses it following
76-
* <a href="https://github.com/delta-io/delta-sharing/blob/main/PROTOCOL.md#profile-file-format">delta sharing specification</a>
77-
* and configures an {@link ApiClient ApiClient} accordingly.
78-
*/
79-
public static ApiClient configureClientFromFile(File file) {
80-
try (InputStream is = new FileInputStream(file)) {
81-
return configureClientInternal(objectMapper.reader().readTree(is));
82-
} catch (IOException e) {
83-
throw new RuntimeException(String.format("Cannot read %s", file), e);
84-
}
73+
/**
74+
* Reads a local file named as the parameter, parses it following
75+
* <a href="https://github.com/delta-io/delta-sharing/blob/main/PROTOCOL.md#profile-file-format">delta sharing specification</a>
76+
* and configures an {@link ApiClient ApiClient} accordingly.
77+
*/
78+
public static ApiClient configureClientFromFile(File file) {
79+
try (InputStream is = new FileInputStream(file)) {
80+
return configureClientInternal(objectMapper.reader().readTree(is));
81+
} catch (IOException e) {
82+
throw new RuntimeException(String.format("Cannot read %s", file), e);
8583
}
84+
}
8685

87-
private static ApiClient configureClientInternal(JsonNode conf) {
88-
var endpointText = getRequiredField(conf, ENDPOINT_FIELD_NAME).asText();
89-
var token = getRequiredField(conf, BEARER_TOKEN_FIELD_NAME).asText();
90-
try {
91-
var endpoint = new URI(endpointText);
92-
var apiClient = new ApiClient();
93-
apiClient.setHost(endpoint.getHost());
94-
apiClient.setPort(endpoint.getPort());
95-
apiClient.setScheme(endpoint.getScheme());
96-
apiClient.setRequestInterceptor(
97-
builder -> builder.header("Authorization", String.format("Bearer %s", token)));
98-
return apiClient;
99-
} catch (URISyntaxException u) {
100-
throw new RuntimeException(String.format("Invalid endpoint syntax %s", endpointText), u);
101-
}
86+
private static ApiClient configureClientInternal(JsonNode conf) {
87+
var endpointText = getRequiredField(conf, ENDPOINT_FIELD_NAME).asText();
88+
var token = getRequiredField(conf, BEARER_TOKEN_FIELD_NAME).asText();
89+
try {
90+
var endpoint = new URI(endpointText);
91+
var apiClient = new ApiClient();
92+
apiClient.setHost(endpoint.getHost());
93+
apiClient.setPort(endpoint.getPort());
94+
apiClient.setScheme(endpoint.getScheme());
95+
apiClient.setRequestInterceptor(
96+
builder -> builder.header("Authorization", String.format("Bearer %s", token)));
97+
return apiClient;
98+
} catch (URISyntaxException u) {
99+
throw new RuntimeException(String.format("Invalid endpoint syntax %s", endpointText), u);
102100
}
101+
}
103102

104-
private static JsonNode getRequiredField(JsonNode node, String fieldName) {
105-
if (node.has(fieldName)) {
106-
return node.get(fieldName);
107-
} else {
108-
throw new RuntimeException(
109-
String.format("Cannot find required field %s in %s", fieldName, node));
110-
}
103+
private static JsonNode getRequiredField(JsonNode node, String fieldName) {
104+
if (node.has(fieldName)) {
105+
return node.get(fieldName);
106+
} else {
107+
throw new RuntimeException(
108+
String.format("Cannot find required field %s in %s", fieldName, node));
111109
}
110+
}
112111
}

protocol/delta-sharing-protocol-api.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -787,7 +787,7 @@ components:
787787

788788
# This is not used for the spec but comes handy for autogeneration
789789
TableQueryResponseObject:
790-
anyOf:
790+
oneOf:
791791
- $ref: '#/components/schemas/ParquetTableQueryResponseObject'
792792
- $ref: '#/components/schemas/DeltaTableQueryResponseObject'
793793
ParquetTableQueryResponseObject:

server/app/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ val serverGeneratorProperties = mapOf(
4949
"dateLibrary" to "java8",
5050
"disallowAdditionalPropertiesIfNotPresent" to "false",
5151
"generateBuilders" to "false",
52+
"legacyDiscriminatorBehavior" to "false",
5253
"generatePom" to "false",
5354
"interfaceOnly" to "true",
5455
"library" to "quarkus",

server/app/src/main/java/io/whitefox/api/deltasharing/DeltaMappers.java

Lines changed: 39 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.whitefox.core.*;
66
import io.whitefox.core.Schema;
77
import io.whitefox.core.Share;
8+
import io.whitefox.core.services.DeltaSharingCapabilities;
89
import java.util.*;
910
import java.util.stream.Collectors;
1011

@@ -60,44 +61,44 @@ public static TableQueryResponseObject readTableResult2api(ReadTableResult readT
6061

6162
private static ParquetMetadataObject metadata2Api(Metadata metadata) {
6263
return new ParquetMetadataObject()
63-
.metaData(new ParquetMetadataObjectMetaData()
64-
.numFiles(metadata.numFiles().orElse(null))
65-
.version(metadata.version().orElse(null))
66-
.size(metadata.size().orElse(null))
67-
.id(metadata.id())
68-
.name(metadata.name().orElse(null))
69-
.description(metadata.description().orElse(null))
70-
.format(new ParquetFormatObject().provider(metadata.format().provider()))
71-
.schemaString(metadata.tableSchema().structType().toJson())
72-
.partitionColumns(metadata.partitionColumns())
73-
._configuration(metadata.configuration()));
64+
.metaData(new ParquetMetadataObjectMetaData()
65+
.numFiles(metadata.numFiles().orElse(null))
66+
.version(metadata.version())
67+
.size(metadata.size().orElse(null))
68+
.id(metadata.id())
69+
.name(metadata.name().orElse(null))
70+
.description(metadata.description().orElse(null))
71+
.format(new ParquetFormatObject().provider(metadata.format().provider()))
72+
.schemaString(metadata.tableSchema().structType().toJson())
73+
.partitionColumns(metadata.partitionColumns())
74+
._configuration(metadata.configuration()));
7475
}
7576

7677
private static DeltaProtocolObject protocol2Api(Protocol protocol) {
7778
return new DeltaProtocolObject()
78-
.protocol(new DeltaProtocolObjectProtocol()
79-
.deltaProtocol(new DeltaProtocolObjectProtocolDeltaProtocol()
80-
.minReaderVersion(protocol.minReaderVersion().orElse(1))
81-
.minWriterVersion(protocol.minWriterVersion().orElse(1))));
79+
.protocol(new DeltaProtocolObjectProtocol()
80+
.deltaProtocol(new DeltaProtocolObjectProtocolDeltaProtocol()
81+
.minReaderVersion(protocol.minReaderVersion().orElse(1))
82+
.minWriterVersion(protocol.minWriterVersion().orElse(1))));
8283
}
8384

8485
private static DeltaFileObject file2Api(TableFile f) {
8586
return new DeltaFileObject()
86-
.id(f.id())
87-
.version(f.version().orElse(null))
88-
.deletionVectorFileId(null) // TODO
89-
.timestamp(f.timestamp().orElse(null))
90-
.expirationTimestamp(f.expirationTimestamp())
91-
.deltaSingleAction(new DeltaSingleAction()
92-
._file(new DeltaAddFileAction()
93-
.id(f.id())
94-
.url(f.url())
95-
.partitionValues(f.partitionValues())
96-
.size(f.size())
97-
.stats(f.stats().orElse(null))
98-
.version(f.version().orElse(null))
99-
.timestamp(f.timestamp().orElse(null))
100-
.expirationTimestamp(f.expirationTimestamp())));
87+
.id(f.id())
88+
.version(f.version().orElse(null))
89+
.deletionVectorFileId(null) // TODO
90+
.timestamp(f.timestamp().orElse(null))
91+
.expirationTimestamp(f.expirationTimestamp())
92+
.deltaSingleAction(new DeltaSingleAction()
93+
._file(new DeltaAddFileAction()
94+
.id(f.id())
95+
.url(f.url())
96+
.partitionValues(f.partitionValues())
97+
.size(f.size())
98+
.stats(f.stats().orElse(null))
99+
.version(f.version().orElse(null))
100+
.timestamp(f.timestamp().orElse(null))
101+
.expirationTimestamp(f.expirationTimestamp())));
101102
}
102103

103104
public static TableReferenceAndReadRequest api2TableReferenceAndReadRequest(
@@ -113,28 +114,16 @@ public static io.whitefox.api.deltasharing.model.v1.generated.Table table2api(
113114
.schema(sharedTable.schema());
114115
}
115116

116-
/**
117-
* NOTE: this is an undocumented feature of the reference impl of delta-sharing, it's not part of the
118-
* protocol
119-
* ----
120-
* Return the [[io.whitefox.api.server.DeltaHeaders.DELTA_SHARE_CAPABILITIES_HEADER]] header
121-
* that will be set in the response w/r/t the one received in the request.
122-
* If the request did not contain any, we will return an empty one.
123-
*/
124-
public static Map<String, String> toHeaderCapabilitiesMap(String headerCapabilities) {
125-
if (headerCapabilities == null) {
126-
return Map.of();
127-
}
128-
return Arrays.stream(headerCapabilities.toLowerCase().split(";"))
129-
.map(h -> h.split("="))
130-
.filter(h -> h.length == 2)
131-
.map(splits -> Map.entry(splits[0], splits[1]))
132-
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
133-
}
134-
135117
public static TableMetadataResponseObject toTableResponseMetadata(Metadata m) {
136118
return new TableMetadataResponseObject()
137-
.protocol(new ParquetProtocolObject().protocol(new ParquetProtocolObjectProtocol().minReaderVersion(1)))
119+
.protocol(new ParquetProtocolObject()
120+
.protocol(new ParquetProtocolObjectProtocol().minReaderVersion(1)))
138121
.metadata(metadata2Api(m));
139122
}
123+
124+
public static String toCapabilitiesHeader(DeltaSharingCapabilities deltaSharingCapabilities) {
125+
return deltaSharingCapabilities.values().entrySet().stream()
126+
.map(entry -> entry.getKey() + "=" + String.join(",", entry.getValue()))
127+
.collect(Collectors.joining(";"));
128+
}
140129
}

0 commit comments

Comments
 (0)