From 2a8d67bb6e2c3d0b5693644992c2446063c4bc8f Mon Sep 17 00:00:00 2001
From: englefly <zhouminghong@selectdb.com>
Date: Fri, 27 Dec 2024 18:09:41 +0800
Subject: [PATCH 1/2] push encode col

---
 .../org/apache/doris/mtmv/MTMVPlanUtil.java   |   4 +-
 .../doris/nereids/jobs/executor/Rewriter.java |  19 +-
 .../nereids/properties/LogicalProperties.java |   8 +-
 .../apache/doris/nereids/rules/RuleType.java  |   4 +
 .../rules/analysis/CompressedMaterialize.java |  52 ++
 .../rules/rewrite/DecoupleEncodeDecode.java   |  77 ++
 .../rules/rewrite/PullUpJoinFromUnionAll.java |  15 +-
 .../PullUpProjectBetweenTopNAndAgg.java       | 106 +++
 .../rules/rewrite/PushDownEncodeSlot.java     | 660 ++++++++++++++++++
 .../rewrite/PushDownFilterThroughProject.java |   4 +-
 .../rules/rewrite/SimplifyEncodeDecode.java   |   4 +-
 .../trees/expressions/WindowExpression.java   |   4 +-
 .../functions/scalar/EncodeAsBigInt.java      |   4 +-
 .../functions/scalar/EncodeAsInt.java         |   4 +-
 .../functions/scalar/EncodeAsLargeInt.java    |   4 +-
 .../functions/scalar/EncodeAsSmallInt.java    |   4 +-
 ...odeStrToInteger.java => EncodeString.java} |  11 +-
 .../plans/physical/PhysicalHashAggregate.java |   9 +
 .../doris/nereids/util/ExpressionUtils.java   |   4 +-
 .../apache/doris/nereids/util/PlanUtils.java  |   4 +-
 .../org/apache/doris/qe/SessionVariable.java  |   2 +-
 .../compress_materialize.out                  |  18 +
 .../compress_materialize/pushdown_encode.out  |  36 +
 .../compress_materialize.groovy               |   2 +-
 .../pushdown_encode.groovy                    | 293 ++++++++
 25 files changed, 1317 insertions(+), 35 deletions(-)
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DecoupleEncodeDecode.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PullUpProjectBetweenTopNAndAgg.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownEncodeSlot.java
 rename fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/{EncodeStrToInteger.java => EncodeString.java} (71%)
 create mode 100644 regression-test/data/nereids_p0/compress_materialize/pushdown_encode.out
 create mode 100644 regression-test/suites/nereids_p0/compress_materialize/pushdown_encode.groovy

diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
index 3264d6627ead5d..585d9d24830217 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
@@ -62,7 +62,9 @@ public static ConnectContext createMTMVContext(MTMV mtmv) {
         ctx.getSessionVariable().allowModifyMaterializedViewData = true;
         // Disable add default limit rule to avoid refresh data wrong
         ctx.getSessionVariable().setDisableNereidsRules(
-                String.join(",", ImmutableSet.of(RuleType.ADD_DEFAULT_LIMIT.name())));
+                String.join(",", ImmutableSet.of(
+                        "COMPRESSED_MATERIALIZE_AGG", "COMPRESSED_MATERIALIZE_SORT",
+                        RuleType.ADD_DEFAULT_LIMIT.name())));
         Optional<String> workloadGroup = mtmv.getWorkloadGroup();
         if (workloadGroup.isPresent()) {
             ctx.getSessionVariable().setWorkloadGroup(workloadGroup.get());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
index 5b276258263f37..a78d9ab232a58a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
@@ -55,6 +55,7 @@
 import org.apache.doris.nereids.rules.rewrite.CountDistinctRewrite;
 import org.apache.doris.nereids.rules.rewrite.CountLiteralRewrite;
 import org.apache.doris.nereids.rules.rewrite.CreatePartitionTopNFromWindow;
+import org.apache.doris.nereids.rules.rewrite.DecoupleEncodeDecode;
 import org.apache.doris.nereids.rules.rewrite.DeferMaterializeTopNResult;
 import org.apache.doris.nereids.rules.rewrite.EliminateAggCaseWhen;
 import org.apache.doris.nereids.rules.rewrite.EliminateAggregate;
@@ -106,6 +107,7 @@
 import org.apache.doris.nereids.rules.rewrite.PruneOlapScanTablet;
 import org.apache.doris.nereids.rules.rewrite.PullUpCteAnchor;
 import org.apache.doris.nereids.rules.rewrite.PullUpJoinFromUnionAll;
+import org.apache.doris.nereids.rules.rewrite.PullUpProjectBetweenTopNAndAgg;
 import org.apache.doris.nereids.rules.rewrite.PullUpProjectUnderApply;
 import org.apache.doris.nereids.rules.rewrite.PullUpProjectUnderLimit;
 import org.apache.doris.nereids.rules.rewrite.PullUpProjectUnderTopN;
@@ -115,6 +117,7 @@
 import org.apache.doris.nereids.rules.rewrite.PushDownAggThroughJoinOneSide;
 import org.apache.doris.nereids.rules.rewrite.PushDownAggWithDistinctThroughJoinOneSide;
 import org.apache.doris.nereids.rules.rewrite.PushDownDistinctThroughJoin;
+import org.apache.doris.nereids.rules.rewrite.PushDownEncodeSlot;
 import org.apache.doris.nereids.rules.rewrite.PushDownFilterThroughProject;
 import org.apache.doris.nereids.rules.rewrite.PushDownLimit;
 import org.apache.doris.nereids.rules.rewrite.PushDownLimitDistinctThroughJoin;
@@ -253,6 +256,13 @@ public class Rewriter extends AbstractBatchJobExecutor {
                         new CountLiteralRewrite(),
                         new NormalizeSort()
                 ),
+
+                topDown(// must behind NormalizeAggregate/NormalizeSort
+                        new MergeProjects(),
+                        new PushDownEncodeSlot(),
+                        new DecoupleEncodeDecode()
+                ),
+
                 topic("Window analysis",
                         topDown(
                                 new ExtractAndNormalizeWindowExpression(),
@@ -372,9 +382,11 @@ public class Rewriter extends AbstractBatchJobExecutor {
                         //       generate one PhysicalLimit if current distribution is gather or two
                         //       PhysicalLimits with gather exchange
                         topDown(new LimitSortToTopN()),
-                        topDown(new SimplifyEncodeDecode()),
-                        topDown(new LimitAggToTopNAgg()),
                         topDown(new MergeTopNs()),
+                        topDown(new SimplifyEncodeDecode(),
+                                new MergeProjects()
+                        ),
+                        topDown(new LimitAggToTopNAgg()),
                         topDown(new SplitLimit()),
                         topDown(
                                 new PushDownLimit(),
@@ -466,6 +478,9 @@ public class Rewriter extends AbstractBatchJobExecutor {
                         custom(RuleType.ADD_PROJECT_FOR_JOIN, AddProjectForJoin::new),
                         topDown(new MergeProjects())
                 ),
+                topic("Adjust topN project",
+                        topDown(new MergeProjects(),
+                                new PullUpProjectBetweenTopNAndAgg())),
                 // this rule batch must keep at the end of rewrite to do some plan check
                 topic("Final rewrite and check",
                         custom(RuleType.CHECK_DATA_TYPES, CheckDataTypes::new),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/LogicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/LogicalProperties.java
index 2cb472ce975f31..3ac5a38c914367 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/LogicalProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/LogicalProperties.java
@@ -44,11 +44,6 @@ public class LogicalProperties {
     protected final Supplier<DataTrait> dataTraitSupplier;
     private Integer hashCode = null;
 
-    public LogicalProperties(Supplier<List<Slot>> outputSupplier,
-            Supplier<DataTrait> dataTraitSupplier) {
-        this(outputSupplier, dataTraitSupplier, ImmutableList::of);
-    }
-
     /**
      * constructor of LogicalProperties.
      *
@@ -56,8 +51,7 @@ public LogicalProperties(Supplier<List<Slot>> outputSupplier,
      *                       throw exception for which children have UnboundRelation
      */
     public LogicalProperties(Supplier<List<Slot>> outputSupplier,
-            Supplier<DataTrait> dataTraitSupplier,
-            Supplier<List<Slot>> nonUserVisibleOutputSupplier) {
+            Supplier<DataTrait> dataTraitSupplier) {
         this.outputSupplier = Suppliers.memoize(
                 Objects.requireNonNull(outputSupplier, "outputSupplier can not be null")
         );
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index 86d0495b851bd2..06d631260766ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -114,6 +114,10 @@ public enum RuleType {
     // rewrite rules
     COMPRESSED_MATERIALIZE_AGG(RuleTypeClass.REWRITE),
     COMPRESSED_MATERIALIZE_SORT(RuleTypeClass.REWRITE),
+    COMPRESSED_MATERIALIZE_REPEAT(RuleTypeClass.REWRITE),
+    PUSH_DOWN_ENCODE_SLOT(RuleTypeClass.REWRITE),
+    ADJUST_TOPN_PROJECT(RuleTypeClass.REWRITE),
+    DECOUPLE_DECODE_ENCODE_SLOT(RuleTypeClass.REWRITE),
     SIMPLIFY_ENCODE_DECODE(RuleTypeClass.REWRITE),
     NORMALIZE_AGGREGATE(RuleTypeClass.REWRITE),
     NORMALIZE_SORT(RuleTypeClass.REWRITE),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CompressedMaterialize.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CompressedMaterialize.java
index 814269a63984a0..3779b95fca7b3f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CompressedMaterialize.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CompressedMaterialize.java
@@ -31,6 +31,7 @@
 import org.apache.doris.nereids.trees.expressions.functions.scalar.EncodeAsSmallInt;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
 import org.apache.doris.nereids.types.DataType;
 import org.apache.doris.nereids.types.coercion.CharacterType;
@@ -101,6 +102,9 @@ private LogicalSort<Plan> compressMaterializeSort(LogicalSort<Plan> sort) {
     }
 
     private Optional<Expression> getEncodeExpression(Expression expression) {
+        if (expression.isConstant()) {
+            return Optional.empty();
+        }
         DataType type = expression.getDataType();
         Expression encodeExpr = null;
         if (type instanceof CharacterType) {
@@ -169,4 +173,52 @@ private LogicalAggregate<Plan> compressedMaterializeAggregate(LogicalAggregate<P
         }
         return aggregate;
     }
+
+    private Map<Expression, Expression> getEncodeGroupingSets(LogicalRepeat<Plan> repeat) {
+        Map<Expression, Expression> encode = Maps.newHashMap();
+        // the first grouping set contains all group by keys
+        for (Expression gb : repeat.getGroupingSets().get(0)) {
+            Optional<Expression> encodeExpr = getEncodeExpression(gb);
+            encodeExpr.ifPresent(expression -> encode.put(gb, expression));
+        }
+        return encode;
+    }
+
+    private LogicalRepeat<Plan> compressMaterializeRepeat(LogicalRepeat<Plan> repeat) {
+        Map<Expression, Expression> encode = getEncodeGroupingSets(repeat);
+        if (encode.isEmpty()) {
+            return repeat;
+        }
+        List<List<Expression>> newGroupingSets = Lists.newArrayList();
+        for (int i = 0; i < repeat.getGroupingSets().size(); i++) {
+            List<Expression> grouping = Lists.newArrayList();
+            for (int j = 0; j < repeat.getGroupingSets().get(i).size(); j++) {
+                Expression groupingExpr = repeat.getGroupingSets().get(i).get(j);
+                grouping.add(encode.getOrDefault(groupingExpr, groupingExpr));
+            }
+            newGroupingSets.add(grouping);
+        }
+        List<NamedExpression> newOutputs = Lists.newArrayList();
+        Map<Expression, Expression> decodeMap = new HashMap<>();
+        for (Expression gp : encode.keySet()) {
+            decodeMap.put(gp, new DecodeAsVarchar(encode.get(gp)));
+        }
+        for (NamedExpression out : repeat.getOutputExpressions()) {
+            Expression replaced = ExpressionUtils.replace(out, decodeMap);
+            if (out != replaced) {
+                if (out instanceof SlotReference) {
+                    newOutputs.add(new Alias(out.getExprId(), replaced, out.getName()));
+                } else if (out instanceof Alias) {
+                    newOutputs.add(((Alias) out).withChildren(replaced.children()));
+                } else {
+                    // should not reach here
+                    Preconditions.checkArgument(false, "output abnormal: " + repeat);
+                }
+            } else {
+                newOutputs.add(out);
+            }
+        }
+        repeat = repeat.withGroupSetsAndOutput(newGroupingSets, newOutputs);
+        return repeat;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DecoupleEncodeDecode.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DecoupleEncodeDecode.java
new file mode 100644
index 00000000000000..6c4f1aa3808691
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DecoupleEncodeDecode.java
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.DecodeAsVarchar;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.EncodeString;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * in project:
+ * decode_as_varchar(encode_as_xxx(v)) => v
+ */
+public class DecoupleEncodeDecode extends OneRewriteRuleFactory {
+    @Override
+    public Rule build() {
+        return logicalProject()
+                .when(topN -> ConnectContext.get() != null
+                        && ConnectContext.get().getSessionVariable().enableCompressMaterialize)
+                .then(this::rewrite)
+                .toRule(RuleType.DECOUPLE_DECODE_ENCODE_SLOT);
+    }
+
+    private LogicalProject<?> rewrite(LogicalProject<?> project) {
+        List<NamedExpression> newProjections = Lists.newArrayList();
+        boolean hasNewProjections = false;
+        for (NamedExpression e : project.getProjects()) {
+            boolean changed = false;
+            if (e instanceof Alias) {
+                Alias alias = (Alias) e;
+                Expression body = alias.child();
+                if (body instanceof DecodeAsVarchar && body.child(0) instanceof EncodeString) {
+                    Expression encodeBody = body.child(0).child(0);
+                    newProjections.add((NamedExpression) alias.withChildren(encodeBody));
+                    changed = true;
+                } else if (body instanceof EncodeString && body.child(0) instanceof DecodeAsVarchar) {
+                    Expression decodeBody = body.child(0).child(0);
+                    newProjections.add((NamedExpression) alias.withChildren(decodeBody));
+                    changed = true;
+                }
+            }
+            if (!changed) {
+                newProjections.add(e);
+                hasNewProjections = true;
+            }
+        }
+        if (hasNewProjections) {
+            project = project.withProjects(newProjections);
+        }
+        return project;
+    }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PullUpJoinFromUnionAll.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PullUpJoinFromUnionAll.java
index b3df9b92c56a84..8c21b12a6d8fa1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PullUpJoinFromUnionAll.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PullUpJoinFromUnionAll.java
@@ -586,10 +586,17 @@ boolean comparePlan(Plan plan1, Plan plan2) {
                     isEqual = false;
                 }
                 for (int i = 0; isEqual && i < plan2.getOutput().size(); i++) {
-                    NamedExpression expr = ((LogicalProject<?>) plan1).getProjects().get(i);
-                    NamedExpression replacedExpr = (NamedExpression)
-                            expr.rewriteUp(e -> plan1ToPlan2.getOrDefault(e, e));
-                    if (!replacedExpr.equals(((LogicalProject<?>) plan2).getProjects().get(i))) {
+                    Expression expr1 = ((LogicalProject<?>) plan1).getProjects().get(i);
+                    Expression expr2 = ((LogicalProject<?>) plan2).getProjects().get(i);
+                    if (expr1 instanceof Alias) {
+                        if (!(expr2 instanceof Alias)) {
+                            return false;
+                        }
+                        expr1 = expr1.child(0);
+                        expr2 = expr2.child(0);
+                    }
+                    Expression replacedExpr = expr1.rewriteUp(e -> plan1ToPlan2.getOrDefault(e, e));
+                    if (!replacedExpr.equals(expr2)) {
                         isEqual = false;
                     }
                 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PullUpProjectBetweenTopNAndAgg.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PullUpProjectBetweenTopNAndAgg.java
new file mode 100644
index 00000000000000..f80c40cff7a98f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PullUpProjectBetweenTopNAndAgg.java
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
+import org.apache.doris.qe.ConnectContext;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ *
+ * try to reduce shuffle cost of topN operator, used to optimize plan after applying Compress_materialize
+ *
+ * topn(orderKey=[a])
+ *   --> project(a+1 as x, a+2 as y, a)
+ *      --> any(output(a))
+ * =>
+ * project(a+1 as x, a+2 as y, a)
+ *  --> topn(orderKey=[a])
+ *    --> any(output(a))
+ *
+ */
+public class PullUpProjectBetweenTopNAndAgg extends OneRewriteRuleFactory {
+    public static final Logger LOG = LogManager.getLogger(PullUpProjectBetweenTopNAndAgg.class);
+
+    @Override
+    public Rule build() {
+        return logicalTopN(logicalProject(logicalAggregate()))
+                .when(topN -> ConnectContext.get() != null
+                        && ConnectContext.get().getSessionVariable().enableCompressMaterialize)
+                .then(topN -> adjust(topN)).toRule(RuleType.ADJUST_TOPN_PROJECT);
+    }
+
+    private Plan adjust(LogicalTopN<? extends Plan> topN) {
+        LogicalProject<Plan> project = (LogicalProject<Plan>) topN.child();
+        Set<Slot> projectInputSlots = project.getInputSlots();
+        Map<SlotReference, SlotReference> keyAsKey = new HashMap<>();
+        for (NamedExpression proj : project.getProjects()) {
+            if (proj instanceof Alias && ((Alias) proj).child(0) instanceof SlotReference) {
+                keyAsKey.put((SlotReference) ((Alias) proj).toSlot(), (SlotReference) ((Alias) proj).child());
+            }
+        }
+        boolean match = true;
+        List<OrderKey> newOrderKeys = new ArrayList<>();
+        for (OrderKey orderKey : topN.getOrderKeys()) {
+            Expression orderExpr = orderKey.getExpr();
+            if (orderExpr instanceof SlotReference) {
+                if (projectInputSlots.contains(orderExpr)) {
+                    newOrderKeys.add(orderKey);
+                } else if (keyAsKey.containsKey(orderExpr)) {
+                    newOrderKeys.add(orderKey.withExpression(keyAsKey.get(orderExpr)));
+                } else {
+                    match = false;
+                    break;
+                }
+            } else {
+                match = false;
+                break;
+            }
+        }
+        if (match) {
+            if (project.getProjects().size() >= project.getInputSlots().size()) {
+                LOG.info("$$$$ before: project.getProjects() = " + project.getProjects());
+                LOG.info("$$$$ before: project.getInputSlots() = " + project.getInputSlots());
+                LOG.info("$$$$ before: " + topN.treeString());
+                topN = topN.withChildren(project.children()).withOrderKeys(newOrderKeys);
+                project = (LogicalProject<Plan>) project.withChildren(topN);
+                LOG.info("$$$$ after:" + project.treeString());
+                return project;
+            }
+        }
+        return topN;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownEncodeSlot.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownEncodeSlot.java
new file mode 100644
index 00000000000000..fdc4b007cda13c
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownEncodeSlot.java
@@ -0,0 +1,660 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.DecodeAsVarchar;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.EncodeString;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalLeaf;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
+import org.apache.doris.nereids.trees.plans.logical.LogicalSetOperation;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.types.coercion.CharacterType;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * push down encode_as_int(slot) down
+ * example:
+ *   group by x
+ *     -->project(encode_as_int(A) as x)
+ *      -->Any(A)
+ *       -->project(A)
+ *          --> scan
+ *   =>
+ *   group by x
+ *     -->project(x)
+ *      -->Any(x)
+ *       --> project(encode_as_int(A) as x)
+ *          -->scan
+ * Note:
+ * do not push down encode if encode.child() is not slot,
+ * example
+ * group by encode_as_int(A + B)
+ *    --> any(A, B)
+ */
+public class PushDownEncodeSlot extends OneRewriteRuleFactory {
+    @Override
+    public Rule build() {
+        return logicalProject()
+                .when(topN -> ConnectContext.get() != null
+                        && ConnectContext.get().getSessionVariable().enableCompressMaterialize)
+                .whenNot(project -> project.child() instanceof LogicalRepeat)
+                .whenNot(project -> (project.child() instanceof LogicalLeaf))
+                .then(this::pushDownEncodeSlot)
+                .toRule(RuleType.PUSH_DOWN_ENCODE_SLOT);
+    }
+
+    private List<Alias> collectEncodeAliases(LogicalProject<? extends Plan> project) {
+        List<Alias> encodeAliases = new ArrayList<>();
+        Set<Slot> computingSlots = new HashSet<>();
+        for (NamedExpression e : project.getProjects()) {
+            if (e instanceof Alias) {
+                Expression aliasBody = e.child(0);
+                if (!(aliasBody instanceof SlotReference) && !(aliasBody instanceof EncodeString)) {
+                    computingSlots.addAll(e.getInputSlots());
+                }
+            }
+        }
+        for (NamedExpression e : project.getProjects()) {
+            if (e instanceof Alias && e.child(0) instanceof EncodeString
+                    && e.child(0).child(0) instanceof SlotReference
+                    && !computingSlots.contains(e.child(0).child(0))) {
+                encodeAliases.add((Alias) e);
+            }
+        }
+        return encodeAliases;
+    }
+
+    /**
+     * case 1
+     * project(encode(A) as B)
+     *    --> any(A)
+     * =>
+     * project(B)
+     *     -->any(A): push "encode(A) as B"
+     *
+     * case 2
+     * project(A, encode(A) as B)
+     *      -->any(A)
+     * =>
+     * project(decode(B) as A, B)
+     *      -->any(A): push "encode(A) as B"
+     *
+     * case 3
+     * project(A as C, encode(A) as B)
+     *      -->any(A)
+     * =>
+     * project(decode(B) as C, B)
+     *      -->any(A): push "encode(A) as B"
+     */
+    private LogicalProject<? extends Plan> rewriteRootProject(LogicalProject<? extends Plan> project,
+            List<Alias> pushedEncodeAlias) {
+        if (pushedEncodeAlias.isEmpty()) {
+            return project;
+        }
+        Map<Expression, Alias> encodeBodyToEncodeAlias = new HashMap<>();
+        for (Alias alias : pushedEncodeAlias) {
+            Expression encodeBody = alias.child().child(0);
+            encodeBodyToEncodeAlias.put(encodeBody, alias);
+        }
+        List<NamedExpression> projections = Lists.newArrayListWithCapacity(project.getProjects().size());
+        for (NamedExpression e : project.getProjects()) {
+            if (pushedEncodeAlias.contains(e)) {
+                // case 1
+                projections.add(e.toSlot());
+            } else if (encodeBodyToEncodeAlias.containsKey(e)) {
+                // case 2
+                ExprId id = e.getExprId();
+                DecodeAsVarchar decode = new DecodeAsVarchar(encodeBodyToEncodeAlias.get(e).toSlot());
+                Alias alias = new Alias(id, decode, decode.toSql());
+                projections.add(alias);
+            } else if (e instanceof Alias && encodeBodyToEncodeAlias.containsKey(e.child(0))) {
+                // case 3
+                Alias alias = (Alias) e;
+                DecodeAsVarchar decode = new DecodeAsVarchar(encodeBodyToEncodeAlias.get(e.child(0)).toSlot());
+                Alias newAlias = (Alias) alias.withChildren(decode);
+                projections.add(newAlias);
+            } else {
+                projections.add(e);
+            }
+        }
+        return project.withProjects(projections);
+
+    }
+
+    private LogicalProject<? extends Plan> pushDownEncodeSlot(LogicalProject<? extends Plan> project) {
+        List<Alias> encodeAliases = collectEncodeAliases(project);
+        if (encodeAliases.isEmpty()) {
+            return project;
+        }
+
+        PushDownContext ctx = new PushDownContext(project, encodeAliases);
+        ctx.prepare();
+        if (ctx.notPushed.size() == encodeAliases.size()) {
+            return project;
+        }
+        Plan child = project.child();
+        PushDownContext childContext = new PushDownContext(child, ctx.toBePushedToChild.get(child));
+        Plan newChild = child.accept(EncodeSlotPushDownVisitor.INSTANCE, childContext);
+        List<Alias> pushed = ctx.toBePused;
+        if (child != newChild) {
+            if (newChild instanceof LogicalProject) {
+                pushed.removeAll(childContext.notPushed);
+                newChild = ((LogicalProject<?>) newChild).child();
+            }
+            project = (LogicalProject<? extends Plan>) project.withChildren(newChild);
+            project = rewriteRootProject(project, pushed);
+        }
+        return project;
+    }
+
+    /**
+     * push down encode slot context
+     */
+    public static class PushDownContext {
+        public Plan plan;
+
+        public List<Alias> encodeAliases;
+        // encode_as_int(slot1) as slot2
+        // replaceMap:
+        // slot1 -> slot2
+        Map<Expression, SlotReference> replaceMap = new HashMap<>();
+        // child plan -> aliases in encodeAliases which can be pushed down to child plan
+        Map<Plan, List<Alias>> toBePushedToChild = new HashMap<>();
+        List<Alias> toBePused = new ArrayList<>();
+        // the aliases that cannot be pushed down to any child plan
+        // for example:
+        // encode(A+B) as x, where plan is a join, and A, B comes from join's left and right child respectively
+        List<Alias> notPushed = new ArrayList<>();
+
+        public PushDownContext(Plan plan, List<Alias> encodeAliases) {
+            this.plan = plan;
+            this.encodeAliases = encodeAliases;
+        }
+
+        // init replaceMap/toBePushed/notPushed
+        private void prepare() {
+            List<Set<Slot>> childrenPassThroughSlots =
+                    plan.children().stream().map(n -> getPassThroughSlots(n)).collect(Collectors.toList());
+            for (int i = 0; i < plan.children().size(); i++) {
+                Plan child = plan.children().get(i);
+                if (child instanceof LogicalJoin) {
+                    LogicalJoin<?, ?> join = (LogicalJoin<?, ?>) child;
+                    BiMap<SlotReference, SlotReference> compareSlots = EncodeSlotPushDownVisitor
+                            .getEncodeCandidateSlotsAndShouldNotPushSlotsFromJoinCondition(join).first;
+                    childrenPassThroughSlots.get(i).addAll(compareSlots.keySet());
+                }
+            }
+            for (Alias alias : encodeAliases) {
+                EncodeString encode = (EncodeString) alias.child();
+                Expression strExpr = encode.child();
+                boolean pushed = false;
+                Preconditions.checkArgument(strExpr instanceof SlotReference,
+                        "expect encode_as_xxx(slot), but " + alias);
+
+                for (int i = 0; i < childrenPassThroughSlots.size(); i++) {
+                    if (childrenPassThroughSlots.get(i).contains(strExpr)) {
+                        toBePushedToChild.putIfAbsent(plan.child(i), new ArrayList<>());
+                        toBePushedToChild.get(plan.child(i)).add(alias);
+                        toBePused.add(alias);
+                        replaceMap.put(alias.child().child(0), (SlotReference) alias.toSlot());
+                        pushed = true;
+                        break;
+                    }
+                }
+                if (!pushed) {
+                    notPushed.add(alias);
+                }
+            }
+        }
+
+        /**
+         * expandEncodeAliasForJoin
+         */
+        public void expandEncodeAliasForJoin(BiMap<SlotReference, SlotReference> equalSlots) {
+            List<Alias> expanded = new ArrayList<>();
+            for (Alias alias : encodeAliases) {
+                if (alias.child().child(0) instanceof SlotReference) {
+                    SlotReference slot = (SlotReference) alias.child().child(0);
+                    SlotReference otherHand = equalSlots.get(slot);
+                    if (otherHand != null) {
+                        EncodeString encodeOtherHand = (EncodeString) alias.child().withChildren(otherHand);
+                        Alias encodeOtherHandAlias = new Alias(encodeOtherHand, encodeOtherHand.toSql());
+                        if (!encodeAliases.contains(encodeOtherHandAlias)) {
+                            expanded.add(encodeOtherHandAlias);
+                        }
+                    }
+                }
+            }
+            encodeAliases.addAll(expanded);
+        }
+
+        // the child of alias is a slot reference. for example: slotA as B
+        //
+        private boolean isSlotAlias(Expression expr) {
+            return expr instanceof Alias && expr.child(0) instanceof SlotReference;
+        }
+
+        private Set<Slot> getPassThroughSlots(Plan plan) {
+            Set<Slot> outputSlots = Sets.newHashSet(plan.getOutputSet());
+            Set<Slot> keySlots = Sets.newHashSet();
+            for (Expression e : plan.getExpressions()) {
+                if (!(e instanceof SlotReference) && !isSlotAlias(e)) {
+                    keySlots.addAll(e.getInputSlots());
+                }
+            }
+            outputSlots.removeAll(keySlots);
+            return outputSlots;
+        }
+    }
+
+    /**
+     * push down encode slot
+     */
+    public static class EncodeSlotPushDownVisitor extends PlanVisitor<Plan, PushDownContext> {
+        public static EncodeSlotPushDownVisitor INSTANCE = new EncodeSlotPushDownVisitor();
+
+        /**
+         * visitChildren
+         */
+        public Plan visitChildren(Plan plan, PushDownContext ctx) {
+            ImmutableList.Builder<Plan> newChildren = ImmutableList.builderWithExpectedSize(plan.arity());
+            boolean hasNewChildren = false;
+            for (Plan child : plan.children()) {
+                Plan newChild;
+                if (ctx.toBePushedToChild.containsKey(child)) {
+                    newChild = child.accept(this, new PushDownContext(child, ctx.toBePushedToChild.get(child)));
+                    if (!hasNewChildren && newChild != child) {
+                        hasNewChildren = true;
+                    }
+                } else {
+                    newChild = child;
+                }
+                newChildren.add(newChild);
+            }
+            if (hasNewChildren) {
+                plan = plan.withChildren(newChildren.build());
+            }
+            return plan;
+        }
+
+        private Plan projectNotPushedAlias(Plan plan, List<Alias> notPushedAlias) {
+            if (!notPushedAlias.isEmpty()) {
+                // project encode expressions if they are not pushed down
+                // project(encode)
+                //   +--> plan
+                List<NamedExpression> projections =
+                        notPushedAlias.stream().map(e -> (NamedExpression) e).collect(Collectors.toList());
+                projections.addAll(plan.getOutput());
+                plan = new LogicalProject<>(projections, plan);
+            }
+            return plan;
+        }
+
+        @Override
+        public Plan visit(Plan plan, PushDownContext ctx) {
+            ctx.prepare();
+            plan = visitChildren(plan, ctx);
+            plan = projectNotPushedAlias(plan, ctx.notPushed);
+            return plan;
+        }
+
+        @Override
+        public Plan visitLogicalRepeat(LogicalRepeat repeat, PushDownContext ctx) {
+            Plan plan = projectNotPushedAlias(repeat, ctx.encodeAliases);
+            return plan;
+        }
+
+        private Optional<Alias> findEncodeAliasByEncodeSlot(SlotReference slot, List<Alias> aliases) {
+            for (Alias alias : aliases) {
+                if (alias.child().child(0).equals(slot)) {
+                    return Optional.of(alias);
+                }
+            }
+            return Optional.empty();
+        }
+
+        @Override
+        public LogicalProject<? extends Plan> visitLogicalProject(
+                LogicalProject<? extends Plan> project, PushDownContext ctx) {
+            /*
+             * case 1
+             * push down "encode(v1) as v2
+             * project(v1, ...)
+             *    +--->any(v1)
+             * =>
+             * project(v2, ...)
+             *    +--->any(v1)
+             * and push down "encode(v1) as v2" to any(v1)
+             *
+             * case 2
+             * push down "encode(v1) as v2
+             * project(k as v1, ...)
+             *    +--->any(k)
+             * =>
+             * project(v2, ...)
+             *    +--->any(k)
+             * and push down "encode(k) as v2" to any(v1)
+             *
+             * case 3
+             * push down "encode(v44) as v307"
+             * project(decode(v305) as v44)
+             *     +-->agg(v305, groupBy[v305])
+             *         +--->project(encode(v44) as v305)
+             * =>
+             * project(v305 as v307)
+             *     +-->agg
+             *
+             * case 4
+             * push down "encode(v1) as v2
+             * project(a + b as v1, ...)
+             *    +--->any(a, b)
+             * =>
+             * project(encode(a+b) as v2, ...)
+             *    +-->any(a, b)
+
+             *
+             */
+            List<NamedExpression> projections = Lists.newArrayListWithCapacity(project.getProjects().size());
+            List<Alias> toBePushed = Lists.newArrayList();
+            List<Alias> notPushed = Lists.newArrayList(ctx.encodeAliases);
+
+            for (NamedExpression e : project.getProjects()) {
+                boolean changed = false;
+
+                if (e instanceof SlotReference) {
+                    Optional<Alias> encodeAliasOpt = findEncodeAliasByEncodeSlot((SlotReference) e, ctx.encodeAliases);
+                    if (encodeAliasOpt.isPresent()) {
+                        // case 1
+                        projections.add(encodeAliasOpt.get().toSlot());
+                        toBePushed.add(encodeAliasOpt.get());
+                        notPushed.remove(encodeAliasOpt.get());
+                        changed = true;
+                    }
+                } else {
+                    // e is Alias
+                    Expression aliasExpr = e.child(0);
+                    if (aliasExpr instanceof SlotReference) {
+                        //case 2
+                        Optional<Alias> encodeAliasOpt = findEncodeAliasByEncodeSlot((SlotReference) e.toSlot(),
+                                ctx.encodeAliases);
+                        if (encodeAliasOpt.isPresent()) {
+                            projections.add(encodeAliasOpt.get().toSlot());
+                            Alias encodeAlias = encodeAliasOpt.get();
+                            EncodeString encode = (EncodeString) encodeAlias.child();
+                            SlotReference baseSlot = (SlotReference) aliasExpr;
+                            Alias encodeAliasForChild = (Alias) encodeAlias.withChildren(encode.withChildren(baseSlot));
+                            toBePushed.add(encodeAliasForChild);
+                            notPushed.remove(encodeAlias);
+                            changed = true;
+                        }
+                    } else {
+                        Optional<Alias> encodeAliasOpt = findEncodeAliasByEncodeSlot((SlotReference) e.toSlot(),
+                                ctx.encodeAliases);
+                        if (encodeAliasOpt.isPresent()) {
+                            Alias encodeAlias = encodeAliasOpt.get();
+                            if (aliasExpr instanceof DecodeAsVarchar) {
+                                // case 3
+                                // push down "encode(v44) as v307"
+                                // project(decode(v305) as v44)
+                                //      +-->agg(v305, groupBy[v305])
+                                //         +--->project(encode(v44) as v305)
+                                Expression decodeBody = aliasExpr.child(0);
+                                Alias aliasForProject = (Alias) encodeAlias.withChildren(decodeBody);
+                                projections.add(aliasForProject);
+                                notPushed.remove(encodeAlias);
+                                changed = true;
+                            } else {
+                                // case 4
+                                EncodeString encode = (EncodeString) encodeAlias.child();
+                                Alias encodeAliasForProject = (Alias) encodeAlias
+                                        .withChildren(encode.withChildren(aliasExpr));
+                                projections.add(encodeAliasForProject);
+                                notPushed.remove(encodeAlias);
+                                changed = true;
+                            }
+                        }
+                    }
+                }
+                if (!changed) {
+                    projections.add(e);
+                }
+            }
+            projections.addAll(notPushed);
+
+            project = project.withProjects(projections);
+            if (!toBePushed.isEmpty()) {
+                PushDownContext childContext = new PushDownContext(project.child(), toBePushed);
+                Plan newChild = project.child().accept(this, childContext);
+                if (project.child() != newChild) {
+                    project = (LogicalProject<? extends Plan>) project.withChildren(newChild);
+                }
+            }
+            return project;
+        }
+
+        private static boolean canBothSidesEncode(ComparisonPredicate compare) {
+            return compare.left().getDataType() instanceof CharacterType
+                    && ((CharacterType) compare.left().getDataType()).getLen() < 15
+                    && ((CharacterType) compare.right().getDataType()).getLen() < 15
+                    && compare.left() instanceof SlotReference && compare.right() instanceof SlotReference;
+        }
+
+        /**
+         * getEncodeCandidateSlotsFromJoinCondition
+         *
+         */
+        public static Pair<BiMap<SlotReference, SlotReference>, Set<Slot>>
+                getEncodeCandidateSlotsAndShouldNotPushSlotsFromJoinCondition(LogicalJoin<?, ?> join) {
+            // T1 join T2 on v1=v2  => v1/v2 can be encoded
+            // T1 join T2 on v1=v2 and fun(v1) => v1/v2 can not be encoded
+            BiMap<SlotReference, SlotReference> compareSlots = HashBiMap.create();
+            List<Expression> conditions = new ArrayList<>();
+            conditions.addAll(join.getHashJoinConjuncts());
+            conditions.addAll(join.getOtherJoinConjuncts());
+            Set<Slot> shouldNotPushSlots = Sets.newHashSet();
+            for (Expression e : conditions) {
+                boolean canPush = false;
+                if (e instanceof ComparisonPredicate) {
+                    ComparisonPredicate compare = (ComparisonPredicate) e;
+                    if (canBothSidesEncode(compare)) {
+                        compareSlots.put((SlotReference) compare.left(), (SlotReference) compare.right());
+                        canPush = true;
+                    }
+                }
+                if (!canPush) {
+                    shouldNotPushSlots.addAll(e.getInputSlots());
+                }
+            }
+            for (Slot notPushSlot : shouldNotPushSlots) {
+                if (compareSlots.isEmpty()) {
+                    break;
+                }
+                compareSlots.remove((SlotReference) notPushSlot);
+                compareSlots.inverse().remove((SlotReference) notPushSlot);
+            }
+            return Pair.of(compareSlots, shouldNotPushSlots);
+        }
+
+        @Override
+        public Plan visitLogicalJoin(LogicalJoin<? extends Plan, ? extends Plan> join, PushDownContext ctx) {
+            List<Alias> pushLeft = new ArrayList<>();
+            List<Alias> pushRight = new ArrayList<>();
+            Pair<BiMap<SlotReference, SlotReference>, Set<Slot>> pair =
+                    getEncodeCandidateSlotsAndShouldNotPushSlotsFromJoinCondition(join);
+            BiMap<SlotReference, SlotReference> encodeCandidateSlots = pair.first;
+            Set<Slot> shouldNotPushSlots = pair.second;
+            Set<Slot> leftOutputSlots = join.left().getOutputSet();
+            Map<SlotReference, SlotReference> replaceMap = new HashMap<>();
+            List<Alias> notPushed = new ArrayList<>();
+            for (Alias encodeAlias : ctx.encodeAliases) {
+                SlotReference encodeSlot = (SlotReference) encodeAlias.child().child(0);
+                if (encodeCandidateSlots.containsKey(encodeSlot)) {
+                    SlotReference otherHand = encodeCandidateSlots.get(encodeSlot);
+                    Alias otherHandAlias = new Alias(encodeAlias.child().withChildren(otherHand));
+                    if (leftOutputSlots.contains(encodeSlot)) {
+                        pushLeft.add(encodeAlias);
+                        pushRight.add(otherHandAlias);
+                    } else {
+                        pushRight.add(encodeAlias);
+                        pushLeft.add(otherHandAlias);
+                    }
+                    replaceMap.put(encodeSlot, (SlotReference) encodeAlias.toSlot());
+                    replaceMap.put(otherHand, (SlotReference) otherHandAlias.toSlot());
+                } else if (!shouldNotPushSlots.contains(encodeSlot)) {
+                    if (leftOutputSlots.contains(encodeSlot)) {
+                        pushLeft.add(encodeAlias);
+                    } else {
+                        pushRight.add(encodeAlias);
+                    }
+                } else {
+                    notPushed.add(encodeAlias);
+                }
+                replaceMap.put(encodeSlot, (SlotReference) encodeAlias.toSlot());
+            }
+            List<Expression> newConjuncts = Lists.newArrayListWithCapacity(join.getOtherJoinConjuncts().size());
+            boolean changed = false;
+            for (Expression e : join.getOtherJoinConjuncts()) {
+                if (e instanceof ComparisonPredicate) {
+                    ComparisonPredicate compare = (ComparisonPredicate) e;
+                    if (canBothSidesEncode(compare)) {
+                        SlotReference newLeft = replaceMap.get(compare.left());
+                        SlotReference newRight = replaceMap.get(compare.right());
+                        if (newLeft != null && newRight != null) {
+                            compare = (ComparisonPredicate) compare.withChildren(newLeft, newRight);
+                            changed = true;
+                        }
+                        Preconditions.checkArgument((newLeft == null) == (newRight == null),
+                                "PushDownEncodeSlot replaceMap is not valid, " + compare);
+                    }
+                    newConjuncts.add(compare);
+                } else {
+                    newConjuncts.add(e);
+                }
+            }
+            if (changed) {
+                join = join.withJoinConjuncts(join.getHashJoinConjuncts(), newConjuncts, join.getJoinReorderContext());
+            }
+            Plan newLeft;
+            if (pushLeft.isEmpty()) {
+                newLeft = join.left();
+            } else {
+                newLeft = join.left().accept(this, new PushDownContext(join.left(), pushLeft));
+            }
+            Plan newRight;
+            if (pushRight.isEmpty()) {
+                newRight = join.right();
+            } else {
+                newRight = join.right().accept(this, new PushDownContext(join.right(), pushRight));
+            }
+            Plan result = join.withChildren(newLeft, newRight);
+            if (!notPushed.isEmpty()) {
+                List<NamedExpression> projections = new ArrayList<>();
+                projections.addAll(notPushed);
+                projections.addAll(join.getOutput());
+                result = new LogicalProject<Plan>(projections, join);
+            }
+            return result;
+        }
+
+        @Override
+        public Plan visitLogicalSetOperation(LogicalSetOperation op, PushDownContext ctx) {
+            // push down "encode(v) as x" through
+            // union(output[v], regular([v1],[v2]))
+            //    -->child1(v1)
+            //    -->child2(v2)
+            // rewrite union to: union(output[x], regular([x1], [x2]))
+            // and then push "encode(v1) as x1" to child(v1)
+            //          push "encode(v2) as x2" to child(v2)
+
+            List<NamedExpression> newOutput = Lists.newArrayListWithCapacity(op.getOutput().size());
+            List<List<SlotReference>> newRegularOutputs = Lists.newArrayListWithCapacity(op.getOutput().size());
+            for (int cid = 0; cid < op.children().size(); cid++) {
+                newRegularOutputs.add(Lists.newArrayList(op.getRegularChildOutput(cid)));
+            }
+
+            for (int oid = 0; oid < op.getOutput().size(); oid++) {
+                NamedExpression e = op.getOutput().get(oid);
+                boolean changed = false;
+                for (Alias alias : ctx.encodeAliases) {
+                    if (alias.child().child(0).equals(e)) {
+                        newOutput.add(alias.toSlot());
+                        changed = true;
+                        EncodeString encode = (EncodeString) alias.child();
+                        ctx.toBePused.add(alias);
+                        for (int cid = 0; cid < op.children().size(); cid++) {
+                            Plan child = op.child(cid);
+                            ctx.toBePushedToChild.putIfAbsent(child, new ArrayList<>());
+                            Alias aliasForChild = new Alias(
+                                    encode.withChildren(op.getRegularChildrenOutputs().get(cid).get(oid)));
+                            ctx.toBePushedToChild.get(child).add(aliasForChild);
+                            newRegularOutputs.get(cid).set(oid, (SlotReference) aliasForChild.toSlot());
+                        }
+                        break;
+                    }
+                }
+                if (!changed) {
+                    newOutput.add(e);
+                }
+            }
+            op = op.withNewOutputs(newOutput);
+
+            //rewrite children
+            List<Plan> newChildren = Lists.newArrayListWithCapacity(op.children().size());
+            for (Plan child : op.children()) {
+                if (!ctx.toBePushedToChild.get(child).isEmpty()) {
+                    PushDownContext childCtx = new PushDownContext(child, ctx.toBePushedToChild.get(child));
+                    Plan newChild = child.accept(this, childCtx);
+                    newChildren.add(newChild);
+                } else {
+                    newChildren.add(child);
+                }
+            }
+            op = op.withChildrenAndTheirOutputs(newChildren, newRegularOutputs);
+            return op;
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughProject.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughProject.java
index 38b687ba8387a0..3963c1b651ba28 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughProject.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughProject.java
@@ -23,7 +23,7 @@
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.DecodeAsVarchar;
-import org.apache.doris.nereids.trees.expressions.functions.scalar.EncodeStrToInteger;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.EncodeString;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
 import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
@@ -149,7 +149,7 @@ private static Set<Expression> eliminateDecodeAndEncode(Set<Expression> expressi
     }
 
     private static Expression eliminateDecodeAndEncode(Expression expression) {
-        if (expression instanceof DecodeAsVarchar && expression.child(0) instanceof EncodeStrToInteger) {
+        if (expression instanceof DecodeAsVarchar && expression.child(0) instanceof EncodeString) {
             return expression.child(0).child(0);
         }
         boolean hasNewChild = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SimplifyEncodeDecode.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SimplifyEncodeDecode.java
index b6513fc7580707..fd15fae0d07a53 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SimplifyEncodeDecode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SimplifyEncodeDecode.java
@@ -23,7 +23,7 @@
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.expressions.functions.scalar.DecodeAsVarchar;
-import org.apache.doris.nereids.trees.expressions.functions.scalar.EncodeStrToInteger;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.EncodeString;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -48,7 +48,7 @@ public List<Rule> buildRules() {
                                     boolean changed = false;
                                     for (NamedExpression namedExpression : project.getProjects()) {
                                         if (namedExpression instanceof Alias
-                                                && namedExpression.child(0) instanceof EncodeStrToInteger
+                                                && namedExpression.child(0) instanceof EncodeString
                                                 && namedExpression.child(0).child(0)
                                                 instanceof DecodeAsVarchar) {
                                             Alias alias = (Alias) namedExpression;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/WindowExpression.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/WindowExpression.java
index 7f26298c700626..00e7f986ad5826 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/WindowExpression.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/WindowExpression.java
@@ -199,7 +199,7 @@ public String computeToSql() {
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
-        sb.append(function).append(" WindowSpec(");
+        sb.append("WindowExpression(").append(function).append(" spec(");
         if (!partitionKeys.isEmpty()) {
             sb.append("PARTITION BY ").append(partitionKeys.stream()
                     .map(Expression::toString)
@@ -211,7 +211,7 @@ public String toString() {
                     .collect(Collectors.joining(", ", "", " ")));
         }
         windowFrame.ifPresent(wf -> sb.append(wf.toSql()));
-        return sb.toString().trim() + ")";
+        return sb.toString().trim() + "))";
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EncodeAsBigInt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EncodeAsBigInt.java
index 7d798ecf3e8cab..d3f267efa36d33 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EncodeAsBigInt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EncodeAsBigInt.java
@@ -33,8 +33,8 @@
 /**
  * ScalarFunction 'EncodeAsBigInt'.
  */
-public class EncodeAsBigInt extends ScalarFunction
-        implements ExplicitlyCastableSignature, PropagateNullable, EncodeStrToInteger {
+public class EncodeAsBigInt extends EncodeString
+        implements ExplicitlyCastableSignature, PropagateNullable {
 
     public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
             FunctionSignature.ret(BigIntType.INSTANCE).args(VarcharType.SYSTEM_DEFAULT)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EncodeAsInt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EncodeAsInt.java
index 5c6382d6ea144d..661f05fe3be82d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EncodeAsInt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EncodeAsInt.java
@@ -33,8 +33,8 @@
 /**
  * ScalarFunction 'EncodeAsInt'.
  */
-public class EncodeAsInt extends ScalarFunction
-        implements ExplicitlyCastableSignature, PropagateNullable, EncodeStrToInteger {
+public class EncodeAsInt extends EncodeString
+        implements ExplicitlyCastableSignature, PropagateNullable {
 
     public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
             FunctionSignature.ret(IntegerType.INSTANCE).args(VarcharType.SYSTEM_DEFAULT)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EncodeAsLargeInt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EncodeAsLargeInt.java
index bb30a9a8e8aef5..ee8d723d2b5143 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EncodeAsLargeInt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EncodeAsLargeInt.java
@@ -33,8 +33,8 @@
 /**
  * ScalarFunction 'EncodeAsLargeInt'.
  */
-public class EncodeAsLargeInt extends ScalarFunction
-        implements ExplicitlyCastableSignature, PropagateNullable, EncodeStrToInteger {
+public class EncodeAsLargeInt extends EncodeString
+        implements ExplicitlyCastableSignature, PropagateNullable {
 
     public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
             FunctionSignature.ret(LargeIntType.INSTANCE).args(VarcharType.SYSTEM_DEFAULT)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EncodeAsSmallInt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EncodeAsSmallInt.java
index 355a740197c33e..d0e6a1fa23b121 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EncodeAsSmallInt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EncodeAsSmallInt.java
@@ -33,8 +33,8 @@
 /**
  * ScalarFunction 'CompressAsSmallInt'.
  */
-public class EncodeAsSmallInt extends ScalarFunction
-        implements ExplicitlyCastableSignature, PropagateNullable, EncodeStrToInteger {
+public class EncodeAsSmallInt extends EncodeString
+        implements ExplicitlyCastableSignature, PropagateNullable {
 
     public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
             FunctionSignature.ret(SmallIntType.INSTANCE).args(VarcharType.SYSTEM_DEFAULT)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EncodeStrToInteger.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EncodeString.java
similarity index 71%
rename from fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EncodeStrToInteger.java
rename to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EncodeString.java
index 87a9c43687d6a3..778d76c3462508 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EncodeStrToInteger.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/EncodeString.java
@@ -17,8 +17,17 @@
 
 package org.apache.doris.nereids.trees.expressions.functions.scalar;
 
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
+
 /**
  * Encode_as_XXXInt
  */
-public interface EncodeStrToInteger {
+public abstract class EncodeString extends ScalarFunction implements UnaryExpression {
+    /**
+     * constructor with 1 argument.
+     */
+    public EncodeString(String name, Expression arg0) {
+        super(name, arg0);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java
index 11baf2137ab5d8..94e86506b58751 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java
@@ -329,6 +329,15 @@ public TopnPushInfo(List<OrderKey> orderkeys, long limit) {
             this.orderkeys = ImmutableList.copyOf(orderkeys);
             this.limit = limit;
         }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder("[");
+            builder.append("orderkeys=").append(orderkeys);
+            builder.append(", limit=").append(limit);
+            builder.append("]");
+            return builder.toString();
+        }
     }
 
     public TopnPushInfo getTopnPushInfo() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java
index 25637d1b816656..813a3edb7d2c31 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java
@@ -420,7 +420,7 @@ public static Optional<Slot> extractSlotOrCastOnSlot(Expression expr) {
     /**
      * Generate replaceMap Slot -> Expression from NamedExpression[Expression as name]
      */
-    public static Map<Slot, Expression> generateReplaceMap(List<NamedExpression> namedExpressions) {
+    public static Map<Slot, Expression> generateReplaceMap(List<? extends NamedExpression> namedExpressions) {
         Map<Slot, Expression> replaceMap = Maps.newLinkedHashMapWithExpectedSize(namedExpressions.size());
         for (NamedExpression namedExpression : namedExpressions) {
             if (namedExpression instanceof Alias) {
@@ -484,7 +484,7 @@ public static Set<Expression> replace(Set<Expression> exprs,
     /**
      * Replace expression node in the expression tree by `replaceMap` in top-down manner.
      */
-    public static List<NamedExpression> replaceNamedExpressions(List<NamedExpression> namedExpressions,
+    public static List<NamedExpression> replaceNamedExpressions(List<? extends NamedExpression> namedExpressions,
             Map<? extends Expression, ? extends Expression> replaceMap) {
         Builder<NamedExpression> replaceExprs = ImmutableList.builderWithExpectedSize(namedExpressions.size());
         for (NamedExpression namedExpression : namedExpressions) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java
index 0076c232340d29..28f7cda427ed7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/PlanUtils.java
@@ -121,8 +121,8 @@ public static List<NamedExpression> adjustNullableForRepeat(
     /**
      * merge childProjects with parentProjects
      */
-    public static List<NamedExpression> mergeProjections(List<NamedExpression> childProjects,
-            List<NamedExpression> parentProjects) {
+    public static List<NamedExpression> mergeProjections(List<? extends NamedExpression> childProjects,
+            List<? extends NamedExpression> parentProjects) {
         Map<Slot, Expression> replaceMap = ExpressionUtils.generateReplaceMap(childProjects);
         return ExpressionUtils.replaceNamedExpressions(parentProjects, replaceMap);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index d62c00534b6d94..2961d51caa6888 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -2231,7 +2231,7 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) {
     public boolean enableSortSpill = false;
 
     @VariableMgr.VarAttr(
-            name = "ENABLE_COMPRESS_MATERIALIZE",
+            name = "enable_compress_materialize",
             description = {"控制是否启用compress materialize。",
                     "enable compress-materialize. "},
             needForward = true, fuzzy = false,
diff --git a/regression-test/data/nereids_p0/compress_materialize/compress_materialize.out b/regression-test/data/nereids_p0/compress_materialize/compress_materialize.out
index eee04795628144..8922b2155aec2b 100644
--- a/regression-test/data/nereids_p0/compress_materialize/compress_materialize.out
+++ b/regression-test/data/nereids_p0/compress_materialize/compress_materialize.out
@@ -53,3 +53,21 @@ a	1
 中	8
 bb	3
 
+-- !explain_sort_agg --
+cost = 19.563333333333333
+PhysicalResultSink[294] ( outputExprs=[v1#1] )
++--PhysicalProject[289]@5 ( stats=1, projects=[v1#1] )
+   +--PhysicalQuickSort[284]@4 ( stats=1, orderKeys=[encode_as_bigint(v1)#4 asc null first], phase=MERGE_SORT )
+      +--PhysicalDistribute[279]@7 ( stats=1, distributionSpec=DistributionSpecGather )
+         +--PhysicalQuickSort[274]@7 ( stats=1, orderKeys=[encode_as_bigint(v1)#4 asc null first], phase=LOCAL_SORT )
+            +--PhysicalProject[269]@3 ( stats=1, projects=[decode_as_varchar(encode_as_bigint(v1)#3) AS `v1`#1, encode_as_bigint(decode_as_varchar(encode_as_bigint(v1)#3)) AS `encode_as_bigint(decode_as_varchar(encode_as_bigint(v1)))`#4], multi_proj=l0([encode_as_bigint(v1)#3, decode_as_varchar(encode_as_bigint(v1)#3) AS `decode_as_varchar(encode_as_bigint(v1))`#5])l1([decode_as_varchar(encode_as_bigint(v1))#5 AS `v1`#1, encode_as_bigint(decode_as_varchar(encode_as_bigint(v1))#5) AS `encode_as_bigint(decode_as_varchar(encode_as_bigint(v1)))`#4]) )
+               +--PhysicalHashAggregate[264]@2 ( stats=1, aggPhase=GLOBAL, aggMode=BUFFER_TO_RESULT, maybeUseStreaming=false, groupByExpr=[encode_as_bigint(v1)#3], outputExpr=[encode_as_bigint(v1)#3], partitionExpr=Optional[[encode_as_bigint(v1)#3]], topnFilter=false, topnPushDown=false )
+                  +--PhysicalDistribute[259]@8 ( stats=1, distributionSpec=DistributionSpecHash ( orderedShuffledColumns=[3], shuffleType=EXECUTION_BUCKETED, tableId=-1, selectedIndexId=-1, partitionIds=[], equivalenceExprIds=[[3]], exprIdToEquivalenceSet={3=0} ) )
+                     +--PhysicalHashAggregate[254]@8 ( stats=1, aggPhase=LOCAL, aggMode=INPUT_TO_BUFFER, maybeUseStreaming=true, groupByExpr=[encode_as_bigint(v1)#3], outputExpr=[encode_as_bigint(v1)#3], partitionExpr=Optional[[encode_as_bigint(v1)#3]], topnFilter=false, topnPushDown=false )
+                        +--PhysicalProject[249]@1 ( stats=1, projects=[encode_as_bigint(v1#1) AS `encode_as_bigint(v1)`#3] )
+                           +--PhysicalOlapScan[t1]@0 ( stats=1 )
+
+-- !exec_sort_agg --
+a
+b
+
diff --git a/regression-test/data/nereids_p0/compress_materialize/pushdown_encode.out b/regression-test/data/nereids_p0/compress_materialize/pushdown_encode.out
new file mode 100644
index 00000000000000..da63c94a5abc17
--- /dev/null
+++ b/regression-test/data/nereids_p0/compress_materialize/pushdown_encode.out
@@ -0,0 +1,36 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !exec_sort_agg --
+a
+b
+
+-- !exec_sort_filter --
+a
+b
+
+-- !exec_agg_join --
+b
+
+-- !agg_join_2 --
+b	2
+
+-- !nlj --
+a	9
+b	7
+
+-- !union --
+1
+2
+2
+3
+4
+
+-- !intersect --
+2
+
+-- !except --
+1
+
+-- !agg_sort --
+a
+b
+
diff --git a/regression-test/suites/nereids_p0/compress_materialize/compress_materialize.groovy b/regression-test/suites/nereids_p0/compress_materialize/compress_materialize.groovy
index 8489de2aa2a1fd..656869c6ea99d0 100644
--- a/regression-test/suites/nereids_p0/compress_materialize/compress_materialize.groovy
+++ b/regression-test/suites/nereids_p0/compress_materialize/compress_materialize.groovy
@@ -17,6 +17,7 @@
 
 suite("compress_materialize") {
     sql """
+    set enable_compress_materialize=true;
     drop table if exists compress;
     CREATE TABLE `compress` (
     `k` varchar(5) NOT NULL,
@@ -193,6 +194,5 @@ suite("compress_materialize") {
     qt_sort "select * from compressSort order by k desc, v";
     qt_sort "select * from compressSort order by k desc nulls last";
     qt_sort "select * from compressSort order by k desc nulls last, v limit 3";
-
 }
 
diff --git a/regression-test/suites/nereids_p0/compress_materialize/pushdown_encode.groovy b/regression-test/suites/nereids_p0/compress_materialize/pushdown_encode.groovy
new file mode 100644
index 00000000000000..50239c9f8c13c6
--- /dev/null
+++ b/regression-test/suites/nereids_p0/compress_materialize/pushdown_encode.groovy
@@ -0,0 +1,293 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("pushdown_encode") {
+// push down encode slot
+    sql """
+    set enable_compress_materialize=true;
+    drop table if exists t1;
+    CREATE TABLE t1 (
+    `k1` int NOT NULL,
+    `v1` char(5) NOT NULL
+    ) ENGINE=OLAP
+    DUPLICATE KEY(`k1`)
+    DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+    PROPERTIES (
+    "replication_allocation" = "tag.location.default: 1"
+    );
+
+    insert into t1 values (1, "a"), (2, "b");
+
+    drop table if exists t2;
+    CREATE TABLE t2 (
+    `k2` int NOT NULL,
+    `v2` char(5) NOT NULL
+    ) ENGINE=OLAP
+    DUPLICATE KEY(`k2`)
+    DISTRIBUTED BY HASH(`k2`) BUCKETS 3
+    PROPERTIES (
+    "replication_allocation" = "tag.location.default: 1"
+    );
+
+    insert into t2 values (3, "c"), (4, "d"), (2, "b");
+
+    set disable_join_reorder=true;
+    """
+
+    explain{
+        sql """
+            physical plan 
+            select v1
+            from (select sum(k1) as k, v1 from t1 group by v1) t
+            order by v1;
+        """
+        contains("orderKeys=[encode_as_bigint(v1)#4 asc null first]")
+        contains("projects=[decode_as_varchar(encode_as_bigint(v1)#3) AS `v1`#1, encode_as_bigint(v1)#3 AS `encode_as_bigint(v1)`#4]")
+        contains("groupByExpr=[encode_as_bigint(v1)#3]")
+        contains("projects=[encode_as_bigint(v1#1) AS `encode_as_bigint(v1)`#3]")
+    } 
+
+    order_qt_exec_sort_agg """
+        select v1
+        from (select sum(k1) as k, v1 from t1 group by v1) t
+        order by v1;
+        """
+
+    explain{
+        sql """
+            physical plan
+            select v1
+            from t1
+            where k1 > 0
+            order by v1;
+            """
+        contains("orderKeys=[encode_as_bigint(v1)#2 asc null first]")
+        contains("projects=[decode_as_varchar(encode_as_bigint(v1#1)) AS `decode_as_varchar(encode_as_bigint(v1))`#1, encode_as_bigint(v1#1) AS `encode_as_bigint(v1)`#2]")
+    }
+    
+    order_qt_exec_sort_filter """
+    select v1
+    from t1
+    where k1 > 0
+    order by v1;
+    """
+
+    explain{
+        sql """
+        physical plan 
+        select v1
+        from t1 join t2 on v1=v2
+        group by v1;
+        """
+        contains("groupByExpr=[encode_as_bigint(v1)#4]")
+        contains("projects=[encode_as_bigint(v1#1) AS `encode_as_bigint(v1)`#4]")
+    }
+
+    order_qt_exec_agg_join"""
+    select v1
+    from t1 join t2 on v1=v2
+    group by v1;
+    """
+
+    explain {
+        sql"""
+            physical plan
+            select v1
+            from t1 join t2 on v1=v2
+            group by v1;
+            """
+        contains("groupByExpr=[encode_as_bigint(v1)#4]")
+        contains("projects=[encode_as_bigint(v1)#4]")
+        contains("hashCondition=[(encode_as_bigint(v1)#4 = encode_as_bigint(v2)#5)]")
+        contains("projects=[encode_as_bigint(v2#3) AS `encode_as_bigint(v2)`#5]")
+        contains("projects=[encode_as_bigint(v1#1) AS `encode_as_bigint(v1)`#4]")
+    }
+    
+    explain {
+        // because of "length(v1)>0", encode not pushed down through join
+        sql """
+            physical plan
+            select v1
+            from t1 left join t2 on v1=v2 and length(v1)>0
+            group by v1;
+            """
+        contains("hashCondition=[(v1#1 = v2#3)], otherCondition=[(length(v1#1) > 0)]")
+    }
+
+    order_qt_agg_join_2 """
+        select v1, sum(k2)
+        from t1 join t2 on v1=v2
+        group by v1;"""
+
+    explain {
+        sql """physical plan
+            select v1, sum(k2)
+            from t1 join t2 on v1=v2
+            group by v1;"""
+        contains("projects=[decode_as_varchar(encode_as_bigint(v1)#5) AS `v1`#1, sum(k2)#4]")
+        contains("groupByExpr=[encode_as_bigint(v1)#5]")
+        contains("projects=[encode_as_bigint(v1)#5, k2#2]")
+        contains("hashCondition=[(encode_as_bigint(v1)#5 = encode_as_bigint(v2)#6)]")
+        contains("projects=[encode_as_bigint(v2#3) AS `encode_as_bigint(v2)`#6, k2#2]")
+        contains("projects=[encode_as_bigint(v1#1) AS `encode_as_bigint(v1)`#5]")
+    }
+
+    explain {
+        sql """
+            physical plan
+            select v1, sum(k2)
+            from t1 right outer join t2 on v1 < v2
+            group by v1;
+            """
+        contains("groupByExpr=[encode_as_bigint(v1)#5]")
+        contains("projects=[encode_as_bigint(v1)#5, k2#2]")
+        contains("otherCondition=[(encode_as_bigint(v1)#5 < encode_as_bigint(v2)#6)]")
+        contains("projects=[encode_as_bigint(v1#1) AS `encode_as_bigint(v1)`#5]")
+        contains("projects=[encode_as_bigint(v2#3) AS `encode_as_bigint(v2)`#6, k2#2]")
+    }
+
+   
+    explain {
+        sql """
+            physical plan
+            select v1, sum(k2)
+            from 
+               (select t1.k1, t1.v1 from t1 join t2 on v1 < concat(v2,'a')) t3
+               join t2 on t3.k1=t2.k2
+            group by v1;
+            """
+        contains("otherCondition=[(v1#1 < concat(v2, 'a')#8)]")
+        contains("projects=[k1#0, encode_as_bigint(v1#1) AS `encode_as_bigint(v1)`#7]")
+
+        // +--PhysicalHashJoin[730]@7 ( stats=3, type=INNER_JOIN, hashCondition=[(k1#0 = k2#4)], otherCondition=[], markCondition=[], runtimeFilters=[RF0[k2#4->[k1#0](ndv/size = 3/4) , RF1[k2#4->[k1#0](ndv/size = 3/4) ] )
+        //               |--PhysicalProject[714]@4 ( stats=3, projects=[k1#0, encode_as_bigint(v1#1) AS `encode_as_bigint(v1)`#7] )                                                                                  
+        //               |  +--PhysicalNestedLoopJoin[709]@3 ( stats=3, type=INNER_JOIN, hashCondition=[], otherCondition=[(v1#1 < concat(v2, 'a')#8)], markCondition=[] )                                           
+        //               |     |--PhysicalProject[698]@2 ( stats=3, projects=[concat(v2#3, 'a') AS `concat(v2, 'a')`#8] )                                                                                            
+        //               |     |  +--PhysicalOlapScan[t2]@1 ( stats=3 )                                                                                                                                            
+        //               |     +--PhysicalDistribute[704]@0 ( stats=2, distributionSpec=DistributionSpecReplicated )                                                                                               
+        //               |        +--PhysicalOlapScan[t1]@0 ( stats=2, RFs= RF0 RF1 )                                                                                                                        
+        //               +--PhysicalDistribute[725]@6 ( stats=3, distributionSpec=DistributionSpecReplicated )                                                                                               
+        //                  +--PhysicalProject[720]@6 ( stats=3, projects=[k2#4] )                                                                                                                           
+        //                     +--PhysicalOlapScan[t2]@5 ( stats=3 )      
+    } 
+
+
+    order_qt_nlj """
+            select v1, sum(k2)
+            from t1 right outer join t2 on v1 < v2 and v2>"abc"
+            group by v1;
+            """
+
+    // not push through join, because v2>"abc"
+    sql """            
+            select v1, sum(k2)
+            from t1 right outer join t2 on v1 = v2 and (k1!=k2 or v2>"abc")
+            group by v1;
+            """
+    explain {
+        sql """  
+            shape plan          
+            select v1, sum(k2)
+            from t1 right outer join t2 on v1 = v2 and (k1!=k2 or v2>"abc")
+            group by v1;
+            """
+        contains "hashCondition=((t1.v1 = t2.v2))"
+    }
+
+    explain {
+        sql """
+             physical plan
+            select  k
+            from (
+                (select k1 as k, v1 as v from t1)
+                union all 
+                (select k2 as k, v2 as v from t2)
+                ) T
+            order by v;
+           """
+        contains("orderKeys=[encode_as_bigint(v)#10 asc null first]")
+        contains("outputs=[k#8, encode_as_bigint(v)#10], regularChildrenOutputs=[[k#4, encode_as_bigint(v)#11], [k#6, encode_as_bigint(v)#12]]")
+        contains("projects=[k1#0 AS `k`#4, encode_as_bigint(v1#1) AS `encode_as_bigint(v)`#11]")
+        contains("projects=[k2#2 AS `k`#6, encode_as_bigint(v2#3) AS `encode_as_bigint(v)`#12]")
+    }
+
+    order_qt_union """
+            select  k
+            from (
+                (select k1 as k, v1 as v from t1)
+                union all 
+                (select k2 as k, v2 as v from t2)
+                ) T
+            order by v;
+            """
+    order_qt_intersect """
+        select  k
+        from (
+            (select k1 as k, v1 as v from t1)
+            intersect 
+            (select k2 as k, v2 as v from t2)
+            ) T
+        order by v;
+        """
+
+    order_qt_except """
+        select  k
+        from (
+            (select k1 as k, v1 as v from t1)
+            except 
+            (select k2 as k, v2 as v from t2)
+            ) T
+        order by v;
+        """
+
+    order_qt_agg_sort """ 
+        select v1
+        from (select v1 from t1 where k1 > 0 order by v1 limit 10) t group by v1
+        """
+    
+    explain{
+        sql """ 
+        physical plan
+        select v1
+        from (select v1 from t1 where k1 > 0 order by v1 limit 10) t group by v1
+        """
+        contains("projects=[decode_as_varchar(encode_as_bigint(v1#1)) AS `decode_as_varchar(encode_as_bigint(v1))`#1, encode_as_bigint(v1#1) AS `encode_as_bigint(v1)`#3]")
+    }
+
+    // if encodeBody is used in windowExpression, do not push encode down
+
+    sql """
+        SELECT v1, k1, k2
+            , sum(k2) OVER (PARTITION BY v1 ORDER BY k1) AS w_sum
+        FROM t1
+            JOIN t2 ON k1 = k2 - 2
+        ORDER BY k1, v1, w_sum;
+    """
+
+    explain {
+        sql """
+        physical plan
+        SELECT v1, k1, k2
+            , sum(k2) OVER (PARTITION BY v1 ORDER BY k1) AS w_sum
+        FROM t1
+            JOIN t2 ON k1 = k2 - 2
+        ORDER BY k1, v1, w_sum;
+        """
+        contains("orderKeys=[k1#0 asc null first, encode_as_bigint(v1)#5 asc null first, w_sum#4 asc null first]")
+    }
+    
+}
\ No newline at end of file

From 33dca76f57498484fc6d54f90917acd1f8bc95e6 Mon Sep 17 00:00:00 2001
From: minghong <zhouminghong@selectdb.com>
Date: Sat, 28 Dec 2024 17:44:50 +0800
Subject: [PATCH 2/2] upate pull_up_join_from_union result

---
 .../pull_up_join_from_union/pull_up_join_from_union.out   | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/regression-test/data/nereids_rules_p0/pull_up_join_from_union/pull_up_join_from_union.out b/regression-test/data/nereids_rules_p0/pull_up_join_from_union/pull_up_join_from_union.out
index 10820e9ee48414..ecef42943dcedf 100644
--- a/regression-test/data/nereids_rules_p0/pull_up_join_from_union/pull_up_join_from_union.out
+++ b/regression-test/data/nereids_rules_p0/pull_up_join_from_union/pull_up_join_from_union.out
@@ -80,13 +80,11 @@ PhysicalResultSink
 
 -- !complex_join_condition1 --
 PhysicalResultSink
---PhysicalUnion
-----hashJoin[INNER_JOIN] hashCondition=((expr_cast(id as BIGINT) = expr_(cast(id as BIGINT) - 1))) otherCondition=()
-------PhysicalOlapScan[table_a]
+--hashJoin[INNER_JOIN] hashCondition=((expr_(cast(id as BIGINT) - 1) = expr_cast(id as BIGINT))) otherCondition=()
+----PhysicalUnion
 ------PhysicalOlapScan[table_b]
-----hashJoin[INNER_JOIN] hashCondition=((expr_cast(id as BIGINT) = expr_(cast(id as BIGINT) - 1))) otherCondition=()
-------PhysicalOlapScan[table_a]
 ------PhysicalOlapScan[table_c]
+----PhysicalOlapScan[table_a]
 
 -- !complex_join_condition2 --
 PhysicalResultSink