-
Notifications
You must be signed in to change notification settings - Fork 411
Numeric type widening converters #1877
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Numeric type widening converters #1877
Conversation
…orresponding unit tests in Fluss Client
…nit tests for various widening scenarios in Fluss Flink
…ening support in Fluss Java Client and DataStream API
|
@polyzos Can you please review the pull request. Thanks |
| - `short` → `int`, `long`, `float`, `double` | ||
| - `int` → `long`, `float`, `double` | ||
| - `long` → `float`, `double` | ||
| - `float` → `double` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about Float.POSITIVE_INFINITY, Float.NEGATIVE_INFINITY, Float.NaN
same for double
also would be great to have test for such corner cases
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also not clear what about BigInteger/BigDecimal?
| Short.class, | ||
| Byte.class)); // all numerics can widen to double | ||
|
|
||
| // Non-numeric types remain unchanged |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you elaborate why
map.put(DataTypeRoot.DECIMAL, setOf(BigDecimal.class));is considered as non-numeric?
| if (v instanceof Float) { | ||
| return ((Float) v).doubleValue(); // float -> double widening | ||
| } | ||
| if (v instanceof Long) { | ||
| return ((Long) v).doubleValue(); // long -> double widening | ||
| } | ||
| if (v instanceof Integer) { | ||
| return ((Integer) v).doubleValue(); // int -> double widening | ||
| } | ||
| if (v instanceof Short) { | ||
| return ((Short) v).doubleValue(); // short -> double widening | ||
| } | ||
| if (v instanceof Byte) { | ||
| return ((Byte) v).doubleValue(); // byte -> double widening | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder this can be replaced with something like
if (v instanceof Number) {
return ((Number) v).doubleValue();
}similar for other places
| // ----------------------- Numeric Type Widening Tests ----------------------- | ||
|
|
||
| @Test | ||
| public void testByteToSmallIntWidening() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| public void testByteToSmallIntWidening() { | |
| void testByteToSmallIntWidening() { |
nit: here and in other places no need for public in junit5
| @Test | ||
| public void testIntToBigIntWidening() { | ||
| RowType table = RowType.builder().field("orderId", DataTypes.BIGINT()).build(); | ||
| RowType projection = table; | ||
|
|
||
| PojoToRowConverter<IntFieldPojo> converter = | ||
| PojoToRowConverter.of(IntFieldPojo.class, table, projection); | ||
|
|
||
| IntFieldPojo pojo = new IntFieldPojo(); | ||
| pojo.orderId = 123456; | ||
| GenericRow row = converter.toRow(pojo); | ||
|
|
||
| assertThat(row.getLong(0)).isEqualTo(123456L); | ||
| } | ||
|
|
||
| @Test | ||
| public void testShortToBigIntWidening() { | ||
| RowType table = RowType.builder().field("quantity", DataTypes.BIGINT()).build(); | ||
| RowType projection = table; | ||
|
|
||
| PojoToRowConverter<ShortFieldPojo> converter = | ||
| PojoToRowConverter.of(ShortFieldPojo.class, table, projection); | ||
|
|
||
| ShortFieldPojo pojo = new ShortFieldPojo(); | ||
| pojo.quantity = (short) 999; | ||
| GenericRow row = converter.toRow(pojo); | ||
|
|
||
| assertThat(row.getLong(0)).isEqualTo(999L); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all these and other tests in this class are pretty similar
can we parameterize them?
|
@hemanthsavasere can you remove the documentation? This functionality is meant for internal use. |
|
@snuyanzin thank you for reviewing this and your thoughtful feedback |
Purpose
Linked issue: close #1844
Perform automatic type widening between numeric types when converting POJOs to internal rows. Change from strict and exact type validation to widening conversion from lower to higher data types in POJO field's Java type and the schema's Fluss type.
Implementation Sample Scenarios :
POJO with int field + Schema with BIGINT → SUCCEEDS ✅
POJO with short field + Schema with BIGINT → SUCCEEDS ✅
POJO with int field + Schema with FLOAT → SUCCEEDS ✅
POJO with Long field + Schema with SMALLINT → FAILS (narrowing not supported) ❌
Widening Primitive Conversions :
byte (TINYINT) → short, int, long, float, double
short (SMALLINT) → int, long, float, double
int (INTEGER) → long, float, double
long (BIGINT) → float, double
float (FLOAT) → double
fluss-client
fluss-flink-common
Tests
Added comprehensive tests in PojoToRowConverterTest.java for all widening combinations in fluss-client
Added comprehensive tests in PojoToRowConverterTest.java covering all widening scenarios in fluss-flink-common
Documentation
Updated Java Client documentation (website/docs/apis/java-client.md) with POJO converter usage examples and numeric type widening explanation
Updated Flink DataStream documentation (website/docs/engine-flink/datastream.mdx) with POJO converter notes and widening support details