Commit ddba1b6

[FLINK-7251] [types] Remove the flink-java8 module and improve lambda type extraction
This commit removes the flink-java8 module and merges some of its tests into flink-core/flink-runtime. It ensures that explicit type information can be passed in the DataStream API as a fallback. Since the Tycho compiler approach was very hacky and no longer seems to work, this commit also removes all references to it in the docs and quickstarts. This closes apache#6120.
1 parent 95eadfe commit ddba1b6

File tree

52 files changed

+782
-2392
lines changed

docs/dev/java8.md

Lines changed: 0 additions & 198 deletions
This file was deleted.

docs/dev/java_lambdas.md

Lines changed: 138 additions & 0 deletions
@@ -0,0 +1,138 @@
---
title: "Java Lambda Expressions"
nav-parent_id: api-concepts
nav-pos: 20
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

Java 8 introduced several new language features designed for faster and clearer coding. The most important of these,
lambda expressions, opened the door to functional programming: they allow functions to be implemented and
passed in a straightforward way without having to declare additional (anonymous) classes.

<span class="label label-danger">Attention</span> Flink supports the usage of lambda expressions for all operators of the Java API. However, whenever a lambda expression uses Java generics, you need to declare type information *explicitly*.

This document shows how to use lambda expressions and describes current limitations. For a general introduction to the
Flink API, please refer to the [Programming Guide]({{ site.baseurl }}/dev/api_concepts.html).

### Examples and Limitations

The following example illustrates how to implement a simple, inline `map()` function that squares its input using a lambda expression.
The types of the input `i` and of the output of the `map()` function need not be declared because they are inferred by the Java compiler.

{% highlight java %}
env.fromElements(1, 2, 3)
    // returns the squared i
    .map(i -> i*i)
    .print();
{% endhighlight %}

Flink can automatically extract the result type information from the implementation of the method signature `OUT map(IN value)` because `OUT` resolves to the non-generic type `Integer`.

Unfortunately, because of type erasure, functions such as `flatMap()` with a signature `void flatMap(IN value, Collector<OUT> out)` are compiled into `void flatMap(IN value, Collector out)` by the Java compiler. This makes it impossible for Flink to infer the type information for the output type automatically.

Flink will most likely throw an exception similar to the following:

{% highlight plain %}
org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.
    In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
    An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.
    Otherwise the type has to be specified explicitly using type information.
{% endhighlight %}

In this case, the type information needs to be *specified explicitly*; otherwise, the output will be treated as type `Object`, which leads to inefficient serialization.

{% highlight java %}
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.util.Collector;

DataSet<Integer> input = env.fromElements(1, 2, 3);

// collector type must be declared
input.flatMap((Integer number, Collector<String> out) -> {
    StringBuilder builder = new StringBuilder();
    for (int i = 0; i < number; i++) {
        builder.append("a");
        out.collect(builder.toString());
    }
})
// provide type information explicitly
.returns(Types.STRING)
// prints "a", "a", "aa", "a", "aa", "aaa"
.print();
{% endhighlight %}
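
The loss of generic information for lambdas can be observed with plain Java reflection, independent of Flink. The following is a minimal sketch under stated assumptions: `LambdaErasureDemo` and its helper methods are hypothetical names, `java.util.function.Function` stands in for a Flink function interface, and the check on the implemented interface is a simplified illustration rather than Flink's actual extraction logic.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

// Hypothetical demo class; not part of Flink.
class LambdaErasureDemo {

    /** Returns the first implemented interface of the runtime class, as seen by reflection. */
    static Type firstGenericInterface(Object function) {
        return function.getClass().getGenericInterfaces()[0];
    }

    /** True if reflection still knows the type arguments of the implemented interface. */
    static boolean keepsTypeArguments(Object function) {
        return firstGenericInterface(function) instanceof ParameterizedType;
    }

    public static void main(String[] args) {
        // The lambda's runtime class implements the raw interface only.
        Function<Integer, String> lambda = i -> "a" + i;

        // The anonymous class records Function<Integer, String> in its class file.
        Function<Integer, String> anonymous = new Function<Integer, String>() {
            @Override
            public String apply(Integer i) {
                return "a" + i;
            }
        };

        System.out.println("lambda keeps type arguments:    " + keepsTypeArguments(lambda));
        System.out.println("anonymous keeps type arguments: " + keepsTypeArguments(anonymous));
    }
}
```

In this sketch the anonymous class exposes a `ParameterizedType`, while the lambda exposes only the raw interface, which is why an explicit type hint or a class-based function is needed for generic types.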

Similar problems occur when using a `map()` function with a generic return type. A method signature `Tuple2<Integer, Integer> map(Integer value)` is erased to `Tuple2 map(Integer value)` in the example below.

{% highlight java %}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

env.fromElements(1, 2, 3)
    .map(i -> Tuple2.of(i, i))    // no information about fields of Tuple2
    .print();
{% endhighlight %}

In general, those problems can be solved in multiple ways:

{% highlight java %}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;

// use the explicit ".returns(...)"
env.fromElements(1, 2, 3)
    .map(i -> Tuple2.of(i, i))
    .returns(Types.TUPLE(Types.INT, Types.INT))
    .print();

// use a class instead
env.fromElements(1, 2, 3)
    .map(new MyTuple2Mapper())
    .print();

// MapFunction is an interface, so it is implemented; its second type argument is the output type
public static class MyTuple2Mapper implements MapFunction<Integer, Tuple2<Integer, Integer>> {
    @Override
    public Tuple2<Integer, Integer> map(Integer i) {
        return Tuple2.of(i, i);
    }
}

// use an anonymous class instead
env.fromElements(1, 2, 3)
    .map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
        @Override
        public Tuple2<Integer, Integer> map(Integer i) {
            return Tuple2.of(i, i);
        }
    })
    .print();

// or in this example use a tuple subclass instead
env.fromElements(1, 2, 3)
    .map(i -> new DoubleTuple(i, i))
    .print();

public static class DoubleTuple extends Tuple2<Integer, Integer> {
    public DoubleTuple(int f0, int f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}
{% endhighlight %}
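
The class-based variants work because ordinary methods keep their generic signature in the class file, even though the erased signature is what the JVM executes. The following is a minimal sketch using only the Java standard library: `PairMapperSignatureDemo` and `PairMapper` are hypothetical names, and `Map.Entry` stands in for `Tuple2`, so this illustrates the principle rather than Flink's own type extraction.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Hypothetical demo class; Map.Entry plays the role of Tuple2.
class PairMapperSignatureDemo {

    /** Stand-in for a named MapFunction implementation. */
    static class PairMapper {
        public Map.Entry<Integer, Integer> map(Integer i) {
            return new SimpleEntry<>(i, i);
        }
    }

    static Method mapMethod() {
        try {
            return PairMapper.class.getMethod("map", Integer.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Erased return type: the raw Map.Entry interface. */
    static Class<?> erasedReturnType() {
        return mapMethod().getReturnType();
    }

    /** Generic return type: a ParameterizedType that still carries both Integer arguments. */
    static Type genericReturnType() {
        return mapMethod().getGenericReturnType();
    }

    public static void main(String[] args) {
        System.out.println("erased:  " + erasedReturnType());
        System.out.println("generic: " + genericReturnType());
    }
}
```

The erased view corresponds to the `Tuple2 map(Integer value)` signature described above, while the generic view is the additional metadata that a named or anonymous class provides and a lambda does not.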

flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java

Lines changed: 23 additions & 0 deletions
@@ -158,21 +158,25 @@ public static LambdaExecutable checkAndExtractLambda(Function function) throws T
     /**
      * Extracts type from given index from lambda. It supports nested types.
      *
+     * @param baseClass SAM function that the lambda implements
      * @param exec lambda function to extract the type from
      * @param lambdaTypeArgumentIndices position of type to extract in type hierarchy
      * @param paramLen count of total parameters of the lambda (including closure parameters)
      * @param baseParametersLen count of lambda interface parameters (without closure parameters)
      * @return extracted type
      */
     public static Type extractTypeFromLambda(
+        Class<?> baseClass,
         LambdaExecutable exec,
         int[] lambdaTypeArgumentIndices,
         int paramLen,
         int baseParametersLen) {
         Type output = exec.getParameterTypes()[paramLen - baseParametersLen + lambdaTypeArgumentIndices[0]];
         for (int i = 1; i < lambdaTypeArgumentIndices.length; i++) {
+            validateLambdaType(baseClass, output);
             output = extractTypeArgument(output, lambdaTypeArgumentIndices[i]);
         }
+        validateLambdaType(baseClass, output);
         return output;
     }

@@ -328,4 +332,23 @@ public static Class<?> getRawClass(Type t) {
         }
         return Object.class;
     }
+
+    /**
+     * Checks whether the given type has the generic parameters declared in the class definition.
+     *
+     * @param t type to be validated
+     */
+    public static void validateLambdaType(Class<?> baseClass, Type t) {
+        if (!(t instanceof Class)) {
+            return;
+        }
+        final Class<?> clazz = (Class<?>) t;
+
+        if (clazz.getTypeParameters().length > 0) {
+            throw new InvalidTypesException("The generic type parameters of '" + clazz.getSimpleName() + "' are missing. "
+                + "In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. "
+                + "An easy workaround is to use an (anonymous) class instead that implements the '" + baseClass.getName() + "' interface. "
+                + "Otherwise the type has to be specified explicitly using type information.");
+        }
+    }
 }
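
The core of `validateLambdaType` is a check for raw classes that declare type parameters. The following stand-alone sketch mirrors only that condition so it can be tried without Flink on the classpath. `LambdaTypeCheckSketch`, `isMissingTypeArguments`, and `parameterizedSample` are hypothetical names, and the sketch returns a boolean where the method in the diff throws `InvalidTypesException`.

```java
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical stand-alone sketch; not the Flink implementation.
class LambdaTypeCheckSketch {

    // Field used only to obtain a ParameterizedType (List<String>) via reflection.
    static List<String> parameterizedSample;

    /** True if t is a raw class that declares type parameters, e.g. List without type arguments. */
    static boolean isMissingTypeArguments(Type t) {
        if (!(t instanceof Class)) {
            // ParameterizedType and other Type kinds are not raw classes, so they pass.
            return false;
        }
        return ((Class<?>) t).getTypeParameters().length > 0;
    }

    /** Returns the generic type List<String> of the sample field. */
    static Type parameterizedListType() {
        try {
            return LambdaTypeCheckSketch.class.getDeclaredField("parameterizedSample").getGenericType();
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("raw List:     " + isMissingTypeArguments(List.class));
        System.out.println("String:       " + isMissingTypeArguments(String.class));
        System.out.println("List<String>: " + isMissingTypeArguments(parameterizedListType()));
    }
}
```

In this sketch only the raw `List` is flagged, which corresponds to the raw `Collector` or `Tuple2` that a lambda leaves behind after erasure.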
