From 558eabaf6f7cfae67bd0b484fe381ad765a4ee1f Mon Sep 17 00:00:00 2001
From: anjagruenheid <87397397+anjagruenheid@users.noreply.github.com>
Date: Mon, 2 Sep 2024 10:34:28 +0200
Subject: [PATCH 1/3] Initial draft for timestamping DLO generation and
 compaction.

---
 .../openhouse/jobs/spark/DataCompactionSparkApp.java     | 9 +++++++++
 .../generator/OpenHouseDataLayoutStrategyGenerator.java  | 1 +
 .../datalayout/strategy/DataLayoutStrategy.java          | 1 +
 3 files changed, 11 insertions(+)

diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/DataCompactionSparkApp.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/DataCompactionSparkApp.java
index 25fb4128..04c32586 100644
--- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/DataCompactionSparkApp.java
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/DataCompactionSparkApp.java
@@ -24,6 +24,7 @@
 @Slf4j
 public class DataCompactionSparkApp extends BaseTableSparkApp {
   private final DataCompactionConfig config;
+  public static final String DATA_LAYOUT_COMPACTION_PROPERTY_KEY = "write.data-layout.compaction";
 
   protected DataCompactionSparkApp(
       String jobId, StateManager stateManager, String fqtn, DataCompactionConfig config) {
@@ -58,6 +59,14 @@ protected void runInner(Operations ops) {
           fileGroupRewriteResult.rewrittenDataFilesCount(),
           fileGroupRewriteResult.rewrittenBytesCount());
     }
+    // Add compaction timestamp to table properties.
+    ops.spark()
+        .sql(
+            String.format(
+                "ALTER TABLE %s SET TBLPROPERTIES ('%s' = '%s')",
+                ops.getTable(fqtn),
+                DATA_LAYOUT_COMPACTION_PROPERTY_KEY,
+                System.currentTimeMillis()));
     METER
         .counterBuilder(AppConstants.ADDED_DATA_FILE_COUNT)
         .build()
diff --git a/libs/datalayout/src/main/java/com/linkedin/openhouse/datalayout/generator/OpenHouseDataLayoutStrategyGenerator.java b/libs/datalayout/src/main/java/com/linkedin/openhouse/datalayout/generator/OpenHouseDataLayoutStrategyGenerator.java
index c7a4bae2..ba1eeca1 100644
--- a/libs/datalayout/src/main/java/com/linkedin/openhouse/datalayout/generator/OpenHouseDataLayoutStrategyGenerator.java
+++ b/libs/datalayout/src/main/java/com/linkedin/openhouse/datalayout/generator/OpenHouseDataLayoutStrategyGenerator.java
@@ -112,6 +112,7 @@ private DataLayoutStrategy generateCompactionStrategy() {
     double reducedFileCountPerComputeGbHr = reducedFileCount / computeGbHr;
     return DataLayoutStrategy.builder()
         .config(configBuilder.build())
+        .computationTimestamp(System.currentTimeMillis())
         .cost(computeGbHr)
         .gain(reducedFileCount)
         .score(reducedFileCountPerComputeGbHr)
diff --git a/libs/datalayout/src/main/java/com/linkedin/openhouse/datalayout/strategy/DataLayoutStrategy.java b/libs/datalayout/src/main/java/com/linkedin/openhouse/datalayout/strategy/DataLayoutStrategy.java
index 93f31079..1c86acb8 100644
--- a/libs/datalayout/src/main/java/com/linkedin/openhouse/datalayout/strategy/DataLayoutStrategy.java
+++ b/libs/datalayout/src/main/java/com/linkedin/openhouse/datalayout/strategy/DataLayoutStrategy.java
@@ -11,6 +11,7 @@
 @Data
 @Builder
 public class DataLayoutStrategy {
+  private final long computationTimestamp;
   private final double score;
   private final double entropy;
   private final double cost;

From 8136a0a982b855b085efeb3156610cc989a49218 Mon Sep 17 00:00:00 2001
From: anjagruenheid <87397397+anjagruenheid@users.noreply.github.com>
Date: Mon, 2 Sep 2024 10:41:22 +0200
Subject: [PATCH 2/3] Merge main branch.

---
 .../storage/configs/StorageProperties.java    | 11 +++
 .../storage/selector/BaseStorageSelector.java | 10 +++
 .../storage/selector/StorageSelector.java     | 31 ++++++++
 .../selector/StorageSelectorConfig.java       | 58 +++++++++++++++
 .../selector/impl/DefaultStorageSelector.java | 30 ++++++++
 .../selector/StorageSelectorConfigTest.java   | 73 +++++++++++++++++++
 .../OpenHouseDataLayoutStrategyGenerator.java | 42 +++++++----
 .../storage/StoragePropertiesConfigTest.java  | 16 +++-
 .../resources/cluster-test-properties.yaml    |  5 ++
 9 files changed, 260 insertions(+), 16 deletions(-)
 create mode 100644 cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/selector/BaseStorageSelector.java
 create mode 100644 cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/selector/StorageSelector.java
 create mode 100644 cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/selector/StorageSelectorConfig.java
 create mode 100644 cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/selector/impl/DefaultStorageSelector.java
 create mode 100644 cluster/storage/src/test/java/com/linkedin/openhouse/cluster/storage/selector/StorageSelectorConfigTest.java

diff --git a/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/configs/StorageProperties.java b/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/configs/StorageProperties.java
index b363ee0d..493f3938 100644
--- a/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/configs/StorageProperties.java
+++ b/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/configs/StorageProperties.java
@@ -30,6 +30,7 @@
 public class StorageProperties {
   private String defaultType;
   private Map<String, StorageTypeProperties> types;
+  private StorageSelectorProperties storageSelector;
 
   @Getter
   @Setter
@@ -41,4 +42,14 @@ public static class StorageTypeProperties {
     private String endpoint;
     @Builder.Default private Map<String, String> parameters = new HashMap<>();
   }
+
+  @Getter
+  @Setter
+  @AllArgsConstructor
+  @NoArgsConstructor
+  @Builder(toBuilder = true)
+  public static class StorageSelectorProperties {
+    private String name;
+    @Builder.Default private Map<String, String> parameters = new HashMap<>();
+  }
 }
diff --git a/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/selector/BaseStorageSelector.java b/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/selector/BaseStorageSelector.java
new file mode 100644
index 00000000..407a6024
--- /dev/null
+++ b/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/selector/BaseStorageSelector.java
@@ -0,0 +1,10 @@
+package com.linkedin.openhouse.cluster.storage.selector;
+
+/** An abstract class for all Storage selectors */
+public abstract class BaseStorageSelector implements StorageSelector {
+
+  @Override
+  public String getName() {
+    return this.getClass().getSimpleName();
+  }
+}
diff --git a/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/selector/StorageSelector.java b/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/selector/StorageSelector.java
new file mode 100644
index 00000000..fb754311
--- /dev/null
+++ b/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/selector/StorageSelector.java
@@ -0,0 +1,31 @@
+package com.linkedin.openhouse.cluster.storage.selector;
+
+import com.linkedin.openhouse.cluster.storage.Storage;
+import com.linkedin.openhouse.cluster.storage.selector.impl.DefaultStorageSelector;
+
+/**
+ * The StorageSelector interface provides a way to select storage per table
+ *
+ * <p>Implementations of this interface can choose to return different storages for different db
+ * and tables. For example, {@link DefaultStorageSelector} returns the cluster-level storage for
+ * all tables.
+ */
+public interface StorageSelector {
+  /**
+   * Select the storage for the given db and table. This call should be idempotent: the same db
+   * and table must always resolve to the same storage.
+   *
+   * @param db
+   * @param table
+   * @return {@link Storage}
+   */
+  Storage selectStorage(String db, String table);
+
+  /**
+   * Get the Simple class name of the implementing class. This name will be used to configure the
+   * Storage Selector from cluster.yaml
+   *
+   * @return Simple name of implementing class
+   */
+  String getName();
+}
diff --git a/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/selector/StorageSelectorConfig.java b/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/selector/StorageSelectorConfig.java
new file mode 100644
index 00000000..ef12cc77
--- /dev/null
+++ b/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/selector/StorageSelectorConfig.java
@@ -0,0 +1,58 @@
+package com.linkedin.openhouse.cluster.storage.selector;
+
+import com.linkedin.openhouse.cluster.storage.configs.StorageProperties;
+import com.linkedin.openhouse.cluster.storage.selector.impl.DefaultStorageSelector;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
+
+/**
+ * Configures the StorageSelector bean for storage-selector configured in {@link StorageProperties}
+ * The return value of the bean is the {@link
+ * com.linkedin.openhouse.cluster.storage.selector.StorageSelector} implementation that matches the
+ * name in storage-selector, or {@link DefaultStorageSelector} if no selector is configured.
+ */
+@Slf4j
+@Configuration
+public class StorageSelectorConfig {
+
+  @Autowired StorageProperties storageProperties;
+
+  @Autowired List<StorageSelector> storageSelectors;
+
+  @Autowired DefaultStorageSelector defaultStorageSelector;
+
+  /**
+   * Checks the name of storage-selector from {@link StorageProperties} against all implementations
+   * of {@link com.linkedin.openhouse.cluster.storage.selector.StorageSelector} and returns the
+   * implementation that matches the name. Returns {@link DefaultStorageSelector} if not configured.
+   *
+   * @return
+   */
+  @Bean
+  @Primary
+  StorageSelector provideStorageSelector() {
+
+    // If the storage selector or its name is not configured, return DefaultStorageSelector.
+    if (storageProperties.getStorageSelector() == null
+        || storageProperties.getStorageSelector().getName() == null) {
+      log.info(
+          "storage selector or its name is not configured. Defaulting to {}",
+          DefaultStorageSelector.class.getSimpleName());
+      return defaultStorageSelector;
+    }
+
+    String selectorName = storageProperties.getStorageSelector().getName();
+    for (StorageSelector selector : storageSelectors) {
+      if (selectorName.equals(selector.getName())) {
+        log.info("Found Storage Selector {}", selectorName);
+        return selector;
+      }
+    }
+
+    throw new IllegalArgumentException("Could not find Storage selector with name=" + selectorName);
+  }
+}
diff --git a/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/selector/impl/DefaultStorageSelector.java b/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/selector/impl/DefaultStorageSelector.java
new file mode 100644
index 00000000..44403596
--- /dev/null
+++ b/cluster/storage/src/main/java/com/linkedin/openhouse/cluster/storage/selector/impl/DefaultStorageSelector.java
@@ -0,0 +1,30 @@
+package com.linkedin.openhouse.cluster.storage.selector.impl;
+
+import com.linkedin.openhouse.cluster.storage.Storage;
+import com.linkedin.openhouse.cluster.storage.StorageManager;
+import com.linkedin.openhouse.cluster.storage.selector.BaseStorageSelector;
+import com.linkedin.openhouse.cluster.storage.selector.StorageSelector;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * An implementation of {@link StorageSelector} that returns storage that's marked as default-type
+ * for the cluster in yaml configuration for all tables
+ */
+@Component
+public class DefaultStorageSelector extends BaseStorageSelector {
+
+  @Autowired private StorageManager storageManager;
+
+  /**
+   * Get default-type storage for all tables
+   *
+   * @param db
+   * @param table
+   * @return {@link Storage}
+   */
+  @Override
+  public Storage selectStorage(String db, String table) {
+    return storageManager.getDefaultStorage();
+  }
+}
diff --git a/cluster/storage/src/test/java/com/linkedin/openhouse/cluster/storage/selector/StorageSelectorConfigTest.java b/cluster/storage/src/test/java/com/linkedin/openhouse/cluster/storage/selector/StorageSelectorConfigTest.java
new file mode 100644
index 00000000..455bf5c6
--- /dev/null
+++ b/cluster/storage/src/test/java/com/linkedin/openhouse/cluster/storage/selector/StorageSelectorConfigTest.java
@@ -0,0 +1,73 @@
+package com.linkedin.openhouse.cluster.storage.selector;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.*;
+
+import com.linkedin.openhouse.cluster.storage.configs.StorageProperties;
+import com.linkedin.openhouse.cluster.storage.selector.impl.DefaultStorageSelector;
+import java.util.Arrays;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+
+public class StorageSelectorConfigTest {
+
+  @Mock private StorageProperties storageProperties;
+
+  @Mock private StorageProperties.StorageSelectorProperties storageSelectorProperties;
+
+  @Mock private StorageSelector selector1;
+
+  @Mock private StorageSelector selector2;
+
+  @Mock private DefaultStorageSelector defaultStorageSelector;
+
+  @InjectMocks private StorageSelectorConfig storageSelectorConfig;
+
+  @BeforeEach
+  public void setUp() {
+    MockitoAnnotations.openMocks(this);
+  }
+
+  @Test
+  public void testProvideStorageSelector_Found() {
+    when(storageSelectorProperties.getName()).thenReturn("selector1");
+    when(storageSelectorProperties.getParameters())
+        .thenReturn(ImmutableMap.of("key1", "value1", "key2", "value2"));
+    when(storageProperties.getStorageSelector()).thenReturn(storageSelectorProperties);
+    when(selector1.getName()).thenReturn("selector1");
+    when(selector2.getName()).thenReturn("selector2");
+    storageSelectorConfig.storageSelectors = Arrays.asList(selector1, selector2);
+
+    assertEquals(selector1, storageSelectorConfig.provideStorageSelector());
+  }
+
+  @Test
+  public void testProvideStorageSelectorNotFound() {
+    when(storageSelectorProperties.getName()).thenReturn("selector3");
+    when(storageProperties.getStorageSelector()).thenReturn(storageSelectorProperties);
+    when(selector1.getName()).thenReturn("selector1");
+    when(selector2.getName()).thenReturn("selector2");
+    storageSelectorConfig.storageSelectors = Arrays.asList(selector1, selector2);
+
+    assertThrows(
+        IllegalArgumentException.class, () -> storageSelectorConfig.provideStorageSelector());
+  }
+
+  @Test
+  public void testProvideStorageSelectorMissingConfig() {
+    when(selector1.getName()).thenReturn("selector1");
+    when(selector2.getName()).thenReturn("selector2");
+    when(defaultStorageSelector.getName()).thenReturn(DefaultStorageSelector.class.getSimpleName());
+    storageSelectorConfig.defaultStorageSelector = defaultStorageSelector;
+    storageSelectorConfig.storageSelectors = Arrays.asList(selector1, selector2);
+
+    assertEquals(
+        DefaultStorageSelector.class.getSimpleName(),
+        storageSelectorConfig.provideStorageSelector().getName());
+  }
+}
diff --git a/libs/datalayout/src/main/java/com/linkedin/openhouse/datalayout/generator/OpenHouseDataLayoutStrategyGenerator.java b/libs/datalayout/src/main/java/com/linkedin/openhouse/datalayout/generator/OpenHouseDataLayoutStrategyGenerator.java
index ba1eeca1..09a23922 100644
--- a/libs/datalayout/src/main/java/com/linkedin/openhouse/datalayout/generator/OpenHouseDataLayoutStrategyGenerator.java
+++ b/libs/datalayout/src/main/java/com/linkedin/openhouse/datalayout/generator/OpenHouseDataLayoutStrategyGenerator.java
@@ -8,6 +8,7 @@
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import lombok.Builder;
 import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
@@ -43,7 +44,9 @@ public class OpenHouseDataLayoutStrategyGenerator implements DataLayoutStrategyG
    */
   @Override
   public List<DataLayoutStrategy> generate() {
-    return Collections.singletonList(generateCompactionStrategy());
+    return generateCompactionStrategy()
+        .map(Collections::singletonList)
+        .orElse(Collections.emptyList());
   }
 
   /**
@@ -57,17 +60,25 @@ public List<DataLayoutStrategy> generate() {
    *       compute, the higher the score, the better the strategy
    * </ul>
    */
-  private DataLayoutStrategy generateCompactionStrategy() {
+  private Optional<DataLayoutStrategy> generateCompactionStrategy() {
     // Retrieve file sizes of all data files.
     Dataset<Long> fileSizes =
         tableFileStats.get().map((MapFunction<FileStat, Long>) FileStat::getSize, Encoders.LONG());
 
+    Dataset<Long> filteredSizes =
+        fileSizes.filter(
+            (FilterFunction<Long>)
+                size ->
+                    size < TARGET_BYTES_SIZE * DataCompactionConfig.MIN_BYTE_SIZE_RATIO_DEFAULT);
+    // Check whether we have anything to map/reduce on for cost computation, this is only the case
+    // if we have small files that need to be compacted.
+    if (filteredSizes.count() == 0) {
+      return Optional.empty();
+    }
+
+    // Traits computation (cost, gain, and entropy).
     Tuple2<Long, Integer> fileStats =
-        fileSizes
-            .filter(
-                (FilterFunction<Long>)
-                    size ->
-                        size < TARGET_BYTES_SIZE * DataCompactionConfig.MIN_BYTE_SIZE_RATIO_DEFAULT)
+        filteredSizes
             .map(
                 (MapFunction<Long, Tuple2<Long, Integer>>) size -> new Tuple2<>(size, 1),
                 Encoders.tuple(Encoders.LONG(), Encoders.INT()))
@@ -110,14 +121,15 @@ private DataLayoutStrategy generateCompactionStrategy() {
     double computeGbHr = estimateComputeGbHr(rewriteFileBytes);
     // computeGbHr >= COMPUTE_STARTUP_COST_GB_HR
     double reducedFileCountPerComputeGbHr = reducedFileCount / computeGbHr;
-    return DataLayoutStrategy.builder()
-        .config(configBuilder.build())
-        .computationTimestamp(System.currentTimeMillis())
-        .cost(computeGbHr)
-        .gain(reducedFileCount)
-        .score(reducedFileCountPerComputeGbHr)
-        .entropy(computeEntropy(fileSizes))
-        .build();
+    return Optional.of(
+        DataLayoutStrategy.builder()
+            .config(configBuilder.build())
+            .computationTimestamp(System.currentTimeMillis())
+            .cost(computeGbHr)
+            .gain(reducedFileCount)
+            .score(reducedFileCountPerComputeGbHr)
+            .entropy(computeEntropy(fileSizes))
+            .build());
   }
 
   private long estimateReducedFileCount(long rewriteFileBytes, int rewriteFileCount) {
diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/storage/StoragePropertiesConfigTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/storage/StoragePropertiesConfigTest.java
index c8956100..a7f90ece 100644
--- a/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/storage/StoragePropertiesConfigTest.java
+++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/mock/storage/StoragePropertiesConfigTest.java
@@ -2,6 +2,7 @@
 
 import com.linkedin.openhouse.cluster.storage.StorageManager;
 import com.linkedin.openhouse.cluster.storage.configs.StorageProperties;
+import com.linkedin.openhouse.cluster.storage.selector.impl.DefaultStorageSelector;
 import com.linkedin.openhouse.tables.mock.properties.CustomClusterPropertiesInitializer;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
@@ -17,7 +18,6 @@ public class StoragePropertiesConfigTest {
   @Autowired private StorageProperties storageProperties;
 
   @MockBean private StorageManager storageManager;
-
   private static final String DEFAULT_TYPE = "hdfs";
 
   private static final String DEFAULT_ENDPOINT = "hdfs://localhost:9000";
@@ -55,6 +55,20 @@ public void testUnsetPropertiesAreNull() {
     Assertions.assertNull(storageProperties.getTypes().get(NON_EXISTING_TYPE));
   }
 
+  @Test
+  public void testStorageSelector() {
+    Assertions.assertNotNull(storageProperties.getStorageSelector());
+    Assertions.assertEquals(
+        storageProperties.getStorageSelector().getName(),
+        DefaultStorageSelector.class.getSimpleName());
+    Assertions.assertNotNull(storageProperties.getStorageSelector().getParameters());
+    Assertions.assertEquals(storageProperties.getStorageSelector().getParameters().size(), 2);
+    Assertions.assertEquals(
+        storageProperties.getStorageSelector().getParameters().get("prop1"), "value1");
+    Assertions.assertEquals(
+        storageProperties.getStorageSelector().getParameters().get("prop2"), "value2");
+  }
+
   @AfterAll
   static void unsetSysProp() {
     System.clearProperty("OPENHOUSE_CLUSTER_CONFIG_PATH");
diff --git a/services/tables/src/test/resources/cluster-test-properties.yaml b/services/tables/src/test/resources/cluster-test-properties.yaml
index d4a6e571..3ff0dce6 100644
--- a/services/tables/src/test/resources/cluster-test-properties.yaml
+++ b/services/tables/src/test/resources/cluster-test-properties.yaml
@@ -16,6 +16,11 @@ cluster:
         parameters:
           key2: value2
           token: xyz
+    storage-selector:
+      name: "DefaultStorageSelector"
+      parameters:
+        prop1: "value1"
+        prop2: "value2"
   housetables:
     base-uri: "http://localhost:8080"
   tables:

From 7dcc3ba863f96ed641d13bc8123b8fbad6b2295e Mon Sep 17 00:00:00 2001
From: anjagruenheid <87397397+anjagruenheid@users.noreply.github.com>
Date: Mon, 2 Sep 2024 11:52:41 +0200
Subject: [PATCH 3/3] Formalize compaction event for table properties.

---
 .../jobs/spark/DataCompactionSparkApp.java      | 17 ++++++++++++++++-
 .../openhouse/jobs/util/CompactionEvent.java    | 14 ++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)
 create mode 100644 apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/CompactionEvent.java

diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/DataCompactionSparkApp.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/DataCompactionSparkApp.java
index 04c32586..1ae51fb1 100644
--- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/DataCompactionSparkApp.java
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/DataCompactionSparkApp.java
@@ -1,15 +1,21 @@
 package com.linkedin.openhouse.jobs.spark;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
 import com.linkedin.openhouse.datalayout.config.DataCompactionConfig;
 import com.linkedin.openhouse.jobs.spark.state.StateManager;
 import com.linkedin.openhouse.jobs.util.AppConstants;
+import com.linkedin.openhouse.jobs.util.CompactionEvent;
 import io.opentelemetry.api.common.AttributeKey;
 import io.opentelemetry.api.common.Attributes;
+import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
+import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.iceberg.actions.RewriteDataFiles;
 
@@ -60,13 +66,22 @@ protected void runInner(Operations ops) {
           fileGroupRewriteResult.rewrittenBytesCount());
     }
     // Add compaction timestamp to table properties.
+    CompactionEvent event =
+        CompactionEvent.builder()
+            .compactionTimestamp(System.currentTimeMillis())
+            .addedDataFilesCount(result.addedDataFilesCount())
+            .rewrittenDataFilesCount(result.rewrittenDataFilesCount())
+            .rewrittenBytesCount(result.rewrittenBytesCount())
+            .build();
+    Gson gson = new GsonBuilder().create();
+    Type type = new TypeToken<CompactionEvent>() {}.getType();
     ops.spark()
         .sql(
             String.format(
                 "ALTER TABLE %s SET TBLPROPERTIES ('%s' = '%s')",
                 ops.getTable(fqtn),
                 DATA_LAYOUT_COMPACTION_PROPERTY_KEY,
-                System.currentTimeMillis()));
+                StringEscapeUtils.escapeJava(gson.toJson(event, type))));
     METER
         .counterBuilder(AppConstants.ADDED_DATA_FILE_COUNT)
         .build()
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/CompactionEvent.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/CompactionEvent.java
new file mode 100644
index 00000000..27b1446c
--- /dev/null
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/CompactionEvent.java
@@ -0,0 +1,14 @@
+package com.linkedin.openhouse.jobs.util;
+
+import lombok.Builder;
+import lombok.Data;
+
+/** Compaction event to be logged as part of the table properties. */
+@Data
+@Builder
+public class CompactionEvent {
+  private final long compactionTimestamp;
+  private final long addedDataFilesCount;
+  private final long rewrittenDataFilesCount;
+  private final long rewrittenBytesCount;
+}