3434import org .apache .iceberg .PartitionField ;
3535import org .apache .iceberg .RowDelta ;
3636import org .apache .iceberg .Schema ;
37+ import org .apache .iceberg .data .InternalRecordWrapper ;
3738import org .apache .iceberg .data .Record ;
3839import org .apache .iceberg .io .WriteResult ;
3940import org .apache .iceberg .types .Type ;
@@ -65,6 +66,8 @@ public class KeyedTableDataView extends AbstractTableDataView {
6566
6667 private final RandomRecordGenerator generator ;
6768
69+ private final InternalRecordWrapper wrapper ;
70+
6871 public KeyedTableDataView (
6972 MixedTable mixedTable ,
7073 Schema primary ,
@@ -77,6 +80,7 @@ public KeyedTableDataView(
7780 org .apache .amoro .shade .guava32 .com .google .common .base .Preconditions .checkArgument (
7881 primary .columns ().size () == 1
7982 && primary .columns ().get (0 ).type ().typeId () == Type .TypeID .INTEGER );
83+ this .wrapper = new InternalRecordWrapper (schema .asStruct ());
8084 this .schemaSize = schema .columns ().size ();
8185
8286 this .primaryUpperBound = primaryUpperBound ;
@@ -87,7 +91,7 @@ public KeyedTableDataView(
8791 this .view = StructLikeMap .create (primary .asStruct ());
8892 List <Record > records = new DataReader (mixedTable ).allData ();
8993 for (Record record : records ) {
90- view .put (record , record );
94+ view .put (wrapper . copyFor ( record ) , record );
9195 }
9296
9397 Map <Integer , Map <Integer , Object >> primaryRelationWithPartition = new HashMap <>();
@@ -132,7 +136,7 @@ public WriteResult append(int count) throws IOException {
132136 List <RecordWithAction > records = new ArrayList <>();
133137 for (int i = 0 ; i < primaryUpperBound ; i ++) {
134138 Record record = generator .randomRecord (i );
135- if (!view .containsKey (record )) {
139+ if (!view .containsKey (wrapper . copyFor ( record ) )) {
136140 records .add (new RecordWithAction (record , ChangeAction .INSERT ));
137141 }
138142 if (records .size () == count ) {
@@ -156,13 +160,14 @@ public WriteResult cdc(int count) throws IOException {
156160 List <Record > scatter = randomRecord (count );
157161 List <RecordWithAction > cdc = new ArrayList <>();
158162 for (Record record : scatter ) {
159- if (view .containsKey (record )) {
163+ if (view .containsKey (wrapper . copyFor ( record ) )) {
160164 if (random .nextBoolean ()) {
161165 // delete
162- cdc .add (new RecordWithAction (view .get (record ), ChangeAction .DELETE ));
166+ cdc .add (new RecordWithAction (view .get (wrapper . copyFor ( record ) ), ChangeAction .DELETE ));
163167 } else {
164168 // update
165- cdc .add (new RecordWithAction (view .get (record ), ChangeAction .UPDATE_BEFORE ));
169+ cdc .add (
170+ new RecordWithAction (view .get (wrapper .copyFor (record )), ChangeAction .UPDATE_BEFORE ));
166171 cdc .add (new RecordWithAction (record , ChangeAction .UPDATE_AFTER ));
167172 }
168173 } else {
@@ -210,7 +215,7 @@ public MatchResult match(List<Record> records) {
210215 List <Record > inViewButDuplicate = new ArrayList <>();
211216 StructLikeSet intersection = StructLikeSet .create (schema .asStruct ());
212217 for (Record record : records ) {
213- Record viewRecord = view .get (record );
218+ Record viewRecord = view .get (wrapper . copyFor ( record ) );
214219 if (viewRecord == null ) {
215220 notInView .add (record );
216221 }
@@ -304,12 +309,12 @@ private void writeView(List<RecordWithAction> records) {
304309 changeLog .add (record );
305310 ChangeAction action = record .getAction ();
306311 if (action == ChangeAction .DELETE || action == ChangeAction .UPDATE_BEFORE ) {
307- view .remove (record );
312+ view .remove (wrapper . copyFor ( record ) );
308313 } else {
309- if (view .containsKey (record )) {
314+ if (view .containsKey (wrapper . copyFor ( record ) )) {
310315 throw new RuntimeException ("You write duplicate pk" );
311316 }
312- view .put (record , record );
317+ view .put (wrapper . copyFor ( record ) , record );
313318 }
314319 }
315320 }
0 commit comments