[OPIK-218] Make column type a list (#416)
* [OPIK-218] Make column type a list

* Update clickhouse version

* Sync clickhouse versions

* Reuse assertions

* Sync docker version with test containers
thiagohora authored Oct 18, 2024
1 parent 6c1c78c commit fcf24dc
Showing 21 changed files with 166 additions and 77 deletions.
8 changes: 7 additions & 1 deletion apps/opik-backend/pom.xml
@@ -26,7 +26,7 @@
<liquibase-clickhouse.version>0.7.2</liquibase-clickhouse.version>
<clickhouse-java.version>0.6.5</clickhouse-java.version>
<org.mapstruct.version>1.6.2</org.mapstruct.version>
-<testcontainers.version>1.20.0</testcontainers.version>
+<testcontainers.version>1.20.2</testcontainers.version>
<uuid.java.generator.version>5.1.0</uuid.java.generator.version>
<wiremock.version>3.9.1</wiremock.version>
<redisson.version>3.34.1</redisson.version>
@@ -254,6 +254,12 @@
<artifactId>mysql</artifactId>
<scope>test</scope>
</dependency>
+<dependency>
+<groupId>com.clickhouse</groupId>
+<artifactId>clickhouse-jdbc</artifactId>
+<version>0.7.0</version>
+<scope>test</scope>
+</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>clickhouse</artifactId>
@@ -54,7 +54,7 @@ public record DatasetItemPage(
@JsonView({DatasetItem.View.Public.class}) long total,
@JsonView({DatasetItem.View.Public.class}) Set<Column> columns) implements Page<DatasetItem>{

-public record Column(String name, String type) {
+public record Column(String name, Set<String> types) {
}
}
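A quick illustration of what this API change means for consumers (a minimal sketch, not code from this commit; the record mirrors the diff above and the sample values are invented): under the old single-type shape, a key observed as Object in one dataset item and Null in another produced two Column entries with the same name, whereas it now yields one entry carrying both types.

```java
import java.util.Set;

public class ColumnTypesSketch {

    // Mirrors the new record shape above: one column name, many JSON types.
    record Column(String name, Set<String> types) {
    }

    public static void main(String[] args) {
        // Previously the second component was a single String type, so a
        // column with mixed types produced duplicate entries per name.
        Column column = new Column("expected_output", Set.of("Object", "Null"));
        System.out.println(column); // Column[name=expected_output, types=[...]] (set order varies)
    }
}
```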

@@ -1,14 +1,10 @@
package com.comet.opik.api.validate;

import com.comet.opik.api.DatasetItem;
-import com.fasterxml.jackson.databind.JsonNode;
import jakarta.validation.ConstraintValidator;
import jakarta.validation.ConstraintValidatorContext;
-import org.apache.commons.collections4.MapUtils;

-import java.util.Map;
-import java.util.Optional;

public class DatasetItemInputValidator implements ConstraintValidator<DatasetItemInputValidation, DatasetItem> {

@Override
@@ -168,7 +168,20 @@ INSERT INTO dataset_items (
private static final String SELECT_DATASET_ITEMS_COUNT = """
SELECT
count(id) AS count,
-arrayDistinct(arrayFlatten(groupArray(arrayMap(key -> (key, JSONType(data[key])), mapKeys(data))))) AS columns
+arrayFold(
+(acc, x) -> mapFromArrays(
+arrayMap(key -> key, arrayDistinct(arrayConcat(mapKeys(acc), mapKeys(x)))),
+arrayMap(key -> arrayDistinct(arrayConcat(acc[key], x[key])), arrayDistinct(arrayConcat(mapKeys(acc), mapKeys(x))))
+),
+arrayDistinct(
+arrayFlatten(
+groupArray(
+arrayMap(key -> map(key, [toString(JSONType(data[key]))]), mapKeys(data))
+)
+)
+),
+CAST(map(), 'Map(String, Array(String))')
+) AS columns
FROM (
SELECT
id,
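The arrayFold above replaces the old distinct-tuple aggregation: each row contributes a map of key -> [JSON type], and the fold merges those maps by unioning the type arrays per key, yielding a single Map(String, Array(String)). A plain-Java sketch of the same merge, for intuition only (the row data is invented):

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class ColumnFoldSketch {

    public static void main(String[] args) {
        // Each map stands for one row: data key -> JSON type observed in that row.
        List<Map<String, String>> rows = List.of(
                Map.of("input", "Object", "score", "Double"),
                Map.of("input", "String", "score", "Double"));

        // Fold the per-row maps into one map, unioning types per key -- the
        // same shape the query returns as Map(String, Array(String)).
        Map<String, Set<String>> columns = new TreeMap<>();
        for (Map<String, String> row : rows) {
            row.forEach((key, type) ->
                    columns.computeIfAbsent(key, k -> new LinkedHashSet<>()).add(type));
        }

        System.out.println(columns); // {input=[Object, String], score=[Double]}
    }
}
```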
@@ -188,7 +201,20 @@ INSERT INTO dataset_items (
private static final String SELECT_DATASET_ITEMS_WITH_EXPERIMENT_ITEMS_COUNT = """
SELECT
COUNT(DISTINCT di.id) AS count,
-arrayDistinct(arrayFlatten(groupArray(arrayMap(key -> (key, JSONType(di.data[key])), mapKeys(di.data))))) AS columns
+arrayFold(
+(acc, x) -> mapFromArrays(
+arrayMap(key -> key, arrayDistinct(arrayConcat(mapKeys(acc), mapKeys(x)))),
+arrayMap(key -> arrayDistinct(arrayConcat(acc[key], x[key])), arrayDistinct(arrayConcat(mapKeys(acc), mapKeys(x))))
+),
+arrayDistinct(
+arrayFlatten(
+groupArray(
+arrayMap(key -> map(key, [toString(JSONType(di.data[key]))]), mapKeys(di.data))
+)
+)
+),
+CAST(map(), 'Map(String, Array(String))')
+) AS columns
FROM (
SELECT
id,
@@ -770,9 +796,10 @@ public Mono<DatasetItemPage> getItems(@NonNull UUID datasetId, int page, int siz
private Publisher<Map.Entry<Long, Set<Column>>> mapCount(Result result) {
return result.map((row, rowMetadata) -> Map.entry(
row.get(0, Long.class),
-((List<List<String>>) row.get(1, List.class))
+((Map<String, String[]>) row.get(1, Map.class))
.entrySet()
.stream()
-.map(columnArray -> new Column(columnArray.getFirst(), columnArray.get(1)))
+.map(columnArray -> new Column(columnArray.getKey(), Set.of(columnArray.getValue())))
.collect(Collectors.toSet())));
}
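On the mapping side, the aggregated column now arrives as a map instead of a list of tuples; the cast above reads it as Map<String, String[]>, and Set.of(...) spreads each array into the Column's type set. A standalone sketch of that conversion (the raw map is a stand-in for what row.get(1, Map.class) yields; the keys and types are illustrative):

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class MapCountSketch {

    record Column(String name, Set<String> types) {
    }

    public static void main(String[] args) {
        // Stand-in for the ClickHouse Map(String, Array(String)) result,
        // which the cast above reads back as Map<String, String[]>.
        Map<String, String[]> raw = Map.of(
                "input", new String[]{"Object", "String"},
                "metadata", new String[]{"Object"});

        // Same transformation as mapCount: each entry becomes one Column,
        // with Set.of(...) spreading the array's elements into the type set.
        Set<Column> columns = raw.entrySet()
                .stream()
                .map(entry -> new Column(entry.getKey(), Set.of(entry.getValue())))
                .collect(Collectors.toSet());

        columns.forEach(System.out::println);
    }
}
```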

@@ -1,7 +1,7 @@
package com.comet.opik.api.resources.utils;

import com.comet.opik.infrastructure.DatabaseAnalyticsFactory;
-import org.testcontainers.containers.ClickHouseContainer;
+import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.utility.DockerImageName;

import java.util.Map;
@@ -16,9 +16,7 @@ public static ClickHouseContainer newClickHouseContainer() {
}

public static ClickHouseContainer newClickHouseContainer(boolean reusable) {
-// TODO: Use non-deprecated ClickHouseContainer: https://github.com/comet-ml/opik/issues/58
-return new ClickHouseContainer(
-DockerImageName.parse("clickhouse/clickhouse-server:24.3.8.13-alpine"))
+return new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:24.3.6.48-alpine"))
.withReuse(reusable);
}

@@ -25,7 +25,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.RegisterExtension;
-import org.testcontainers.containers.ClickHouseContainer;
+import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.core.publisher.Mono;
@@ -37,6 +37,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.NullNode;
+import com.fasterxml.jackson.databind.node.TextNode;
import com.fasterxml.uuid.Generators;
import com.fasterxml.uuid.impl.TimeBasedEpochGenerator;
import com.redis.testcontainers.RedisContainer;
@@ -62,7 +63,7 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
-import org.testcontainers.containers.ClickHouseContainer;
+import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.core.publisher.Mono;
@@ -104,7 +105,9 @@
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static com.github.tomakehurst.wiremock.client.WireMock.unauthorized;
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+import static java.util.stream.Collectors.flatMapping;
import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;
import static java.util.stream.Collectors.toUnmodifiableSet;
@@ -2854,8 +2857,7 @@ private void getItemAndAssert(DatasetItem expectedDatasetItem, String workspaceN
assertThat(actualEntity.lastUpdatedAt()).isInThePast();
}

-private static DatasetItem mergeInputMap(DatasetItem expectedDatasetItem,
-Map<String, JsonNode> data) {
+private DatasetItem mergeInputMap(DatasetItem expectedDatasetItem, Map<String, JsonNode> data) {

Map<String, JsonNode> newMap = new HashMap<>();

@@ -3021,13 +3023,7 @@ void getDatasetItemsByDatasetId() {

var actualEntity = actualResponse.readEntity(DatasetItemPage.class);

-assertThat(actualEntity.size()).isEqualTo(items.size());
-assertThat(actualEntity.content()).hasSize(items.size());
-assertThat(actualEntity.page()).isEqualTo(1);
-assertThat(actualEntity.total()).isEqualTo(items.size());
-assertThat(actualEntity.columns()).isEqualTo(columns);
-
-assertPage(items.reversed(), actualEntity.content());
+assertDatasetItemPage(actualEntity, items.reversed(), columns, 1);
}
}

@@ -3065,16 +3061,11 @@ void getDatasetItemsByDatasetId__whenDefiningPageSize__thenReturnPageWithLimitRe
.get()) {

assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(200);

var actualEntity = actualResponse.readEntity(DatasetItemPage.class);

-assertThat(actualEntity.size()).isEqualTo(1);
-assertThat(actualEntity.content()).hasSize(1);
-assertThat(actualEntity.page()).isEqualTo(1);
-assertThat(actualEntity.total()).isEqualTo(items.size());
-assertThat(actualEntity.columns()).isEqualTo(columns);
+List<DatasetItem> expectedContent = List.of(items.reversed().getFirst());

-assertPage(List.of(items.reversed().getFirst()), actualEntity.content());
+assertDatasetItemPage(actualEntity, expectedContent, 5, columns, 1);
}
}

@@ -3125,19 +3116,87 @@ void getDatasetItemsByDatasetId__whenItemsWereUpdated__thenReturnCorrectItemsCou

var actualEntity = actualResponse.readEntity(DatasetItemPage.class);

-assertThat(actualEntity.size()).isEqualTo(updatedItems.size());
-assertThat(actualEntity.content()).hasSize(updatedItems.size());
-assertThat(actualEntity.page()).isEqualTo(1);
-assertThat(actualEntity.total()).isEqualTo(updatedItems.size());
-assertThat(actualEntity.columns()).isEqualTo(columns);
+assertDatasetItemPage(actualEntity, updatedItems.reversed(), columns, 1);
}
}

+@Test
+@DisplayName("when items have data with same keys and different types, then return columns types and count")
+void getDatasetItemsByDatasetId__whenItemsHaveDataWithSameKeysAndDifferentTypes__thenReturnColumnsTypesAndCount() {
+
+UUID datasetId = createAndAssert(factory.manufacturePojo(Dataset.class).toBuilder()
+.id(null)
+.build());
+
+var item = factory.manufacturePojo(DatasetItem.class);
+
+var item2 = item.toBuilder()
+.id(factory.manufacturePojo(UUID.class))
+.data(item.data()
+.keySet()
+.stream()
+.map(key -> Map.entry(key, NullNode.getInstance()))
+.collect(toMap(Map.Entry::getKey, Map.Entry::getValue)))
+.build();
+
+var item3 = item.toBuilder()
+.id(factory.manufacturePojo(UUID.class))
+.data(item.data()
+.keySet()
+.stream()
+.map(key -> Map.entry(key, TextNode.valueOf(RandomStringUtils.randomAlphanumeric(10))))
+.collect(toMap(Map.Entry::getKey, Map.Entry::getValue)))
+.build();
+
+var batch = factory.manufacturePojo(DatasetItemBatch.class).toBuilder()
+.items(List.of(item, item2, item3))
+.datasetId(datasetId)
+.build();
+
+List<Map<String, JsonNode>> data = batch.items()
+.stream()
+.map(DatasetItem::data)
+.toList();
+
-assertPage(updatedItems.reversed(), actualEntity.content());
+Set<Column> columns = addDeprecatedFields(data);
+
+putAndAssert(batch, TEST_WORKSPACE, API_KEY);
+
+try (var actualResponse = client.target(BASE_RESOURCE_URI.formatted(baseURI))
+.path(datasetId.toString())
+.path("items")
+.request()
+.header(HttpHeaders.AUTHORIZATION, API_KEY)
+.header(WORKSPACE_HEADER, TEST_WORKSPACE)
+.get()) {
+
+assertThat(actualResponse.getStatusInfo().getStatusCode()).isEqualTo(200);
+
+var actualEntity = actualResponse.readEntity(DatasetItemPage.class);
+
+assertDatasetItemPage(actualEntity, batch.items().reversed(), columns, 1);
+}
+
+}
}

+private void assertDatasetItemPage(DatasetItemPage actualPage, List<DatasetItem> expected, Set<Column> columns,
+int page) {
+assertDatasetItemPage(actualPage, expected, expected.size(), columns, page);
+}
+
-private static void assertPage(List<DatasetItem> expectedItems, List<DatasetItem> actualItems) {
+private void assertDatasetItemPage(DatasetItemPage actualPage, List<DatasetItem> expected, int total,
+Set<Column> columns, int page) {
+assertThat(actualPage.size()).isEqualTo(expected.size());
+assertThat(actualPage.content()).hasSize(expected.size());
+assertThat(actualPage.page()).isEqualTo(page);
+assertThat(actualPage.total()).isEqualTo(total);
+assertThat(actualPage.columns()).isEqualTo(columns);
+
+assertPage(expected, actualPage.content());
+}
+
+private void assertPage(List<DatasetItem> expectedItems, List<DatasetItem> actualItems) {

List<String> ignoredFields = new ArrayList<>(Arrays.asList(IGNORED_FIELDS_DATA_ITEM));
ignoredFields.add("data");
@@ -3449,13 +3508,9 @@ void find__whenFilteringBySupportedFields__thenReturnMatchingRows(Filter filter)

var actualPage = actualResponse.readEntity(DatasetItemPage.class);

-assertThat(actualPage.size()).isEqualTo(1);
-assertThat(actualPage.total()).isEqualTo(1);
-assertThat(actualPage.page()).isEqualTo(1);
-assertThat(actualPage.content()).hasSize(1);
-assertThat(actualPage.columns()).isEqualTo(columns);
+assertDatasetItemPage(actualPage, List.of(items.getFirst()), columns, 1);

-assertDatasetItemPage(actualPage, items, experimentItems);
+assertDatasetItemExperiments(actualPage, items, experimentItems);
}
}

@@ -3716,16 +3771,16 @@ void setUp() {

Mono.from(clickhouseConnectionFactory.create())
.flatMap(connection -> Mono.from(connection.createStatement("""
-INSERT INTO %s.%s (
-id,
-input,
-expected_output,
-metadata,
-source,
-dataset_id,
-workspace_id
-) VALUES (:id, :input, :expected_output, :metadata, :source, :dataset_id, :workspace_id)
-""".formatted(DATABASE_NAME, "dataset_items"))
+    INSERT INTO %s.%s (
+    id,
+    input,
+    expected_output,
+    metadata,
+    source,
+    dataset_id,
+    workspace_id
+    ) VALUES (:id, :input, :expected_output, :metadata, :source, :dataset_id, :workspace_id)
+    """.formatted(DATABASE_NAME, "dataset_items"))
.bind("id", datasetItem.id())
.bind("input", datasetItem.input().toString())
.bind("expected_output", datasetItem.expectedOutput().toString())
@@ -3756,24 +3811,31 @@ void findById__whenDatasetItemNotMigrated__thenReturnDatasetItemWithData() {
}
}

-private static Set<Column> addDeprecatedFields(List<Map<String, JsonNode>> data) {
+private Set<Column> addDeprecatedFields(List<Map<String, JsonNode>> data) {

HashSet<Column> columns = data
.stream()
.map(Map::entrySet)
.flatMap(Collection::stream)
-.map(entry -> new Column(entry.getKey(),
-StringUtils.capitalize(entry.getValue().getNodeType().name().toLowerCase())))
+.map(entry -> new Column(
+entry.getKey(),
+Set.of(StringUtils.capitalize(entry.getValue().getNodeType().name().toLowerCase()))))
.collect(Collectors.toCollection(HashSet::new));

columns.add(new Column("input", "Object"));
columns.add(new Column("expected_output", "Object"));
columns.add(new Column("metadata", "Object"));
columns.add(new Column("input", Set.of("Object")));
columns.add(new Column("expected_output", Set.of("Object")));
columns.add(new Column("metadata", Set.of("Object")));

+Map<String, Set<String>> results = columns.stream()
+.collect(groupingBy(Column::name, mapping(Column::types, flatMapping(Set::stream, toSet()))));
+
-return columns;
+return results.entrySet()
+.stream()
+.map(entry -> new Column(entry.getKey(), entry.getValue()))
+.collect(Collectors.toSet());
}

-private void assertDatasetItemPage(DatasetItemPage actualPage, List<DatasetItem> items,
+private void assertDatasetItemExperiments(DatasetItemPage actualPage, List<DatasetItem> items,
List<ExperimentItem> experimentItems) {

assertPage(List.of(items.getFirst()), List.of(actualPage.content().getFirst()));
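One subtlety in the reworked addDeprecatedFields (in the hunk above): the hard-coded input/expected_output/metadata columns can collide with keys already present in data, so the final collector chain merges duplicate names into one Column holding the union of their type sets. A self-contained sketch of that chain, using the same collectors as the helper (the sample data is invented):

```java
import static java.util.stream.Collectors.flatMapping;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toSet;

import java.util.List;
import java.util.Map;
import java.util.Set;

public class MergeColumnsSketch {

    record Column(String name, Set<String> types) {
    }

    public static void main(String[] args) {
        // Two entries for the same column name, as produced before merging.
        List<Column> columns = List.of(
                new Column("input", Set.of("Object")),
                new Column("input", Set.of("String")));

        // Same collector chain as the helper: group by name, then flatten
        // every entry's type set into one merged set per name.
        Map<String, Set<String>> merged = columns.stream()
                .collect(groupingBy(Column::name,
                        mapping(Column::types, flatMapping(Set::stream, toSet()))));

        System.out.println(merged); // {input=[Object, String]} (set order varies)
    }
}
```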
@@ -55,7 +55,7 @@
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.NullAndEmptySource;
import org.junit.jupiter.params.provider.ValueSource;
-import org.testcontainers.containers.ClickHouseContainer;
+import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
import ru.vyarus.dropwizard.guice.test.ClientSupport;
@@ -33,7 +33,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
-import org.testcontainers.containers.ClickHouseContainer;
+import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
import ru.vyarus.dropwizard.guice.test.ClientSupport;