@@ -44,6 +44,8 @@ public class SparkYarnTasklet implements InitializingBean, Tasklet, StepExecutio
44
44
45
45
private String sparkAssemblyJar ;
46
46
47
+ private String extraClassPath ;
48
+
47
49
private Configuration hadoopConfiguration ;
48
50
49
51
private String appName ;
@@ -58,13 +60,23 @@ public class SparkYarnTasklet implements InitializingBean, Tasklet, StepExecutio
58
60
59
61
private String executorMemory ;
60
62
63
+ private String executorCores ;
64
+
65
+ private String driverMemory ;
66
+
67
+ private String driverCores ;
68
+
61
69
private int numExecutors ;
62
70
63
71
private String [] arguments ;
64
72
65
73
public RepeatStatus execute (StepContribution contribution , ChunkContext chunkContext ) throws Exception {
66
74
SparkConf sparkConf = new SparkConf ();
67
75
sparkConf .set ("spark.yarn.jar" , sparkAssemblyJar );
76
+ if (StringUtils .hasText (extraClassPath )) {
77
+ sparkConf .set ("spark.driver.extraClassPath" , extraClassPath );
78
+ sparkConf .set ("spark.executor.extraClassPath" , extraClassPath );
79
+ }
68
80
List <String > submitArgs = new ArrayList <String >();
69
81
if (StringUtils .hasText (appName )) {
70
82
submitArgs .add ("--name" );
@@ -82,10 +94,22 @@ public RepeatStatus execute(StepContribution contribution, ChunkContext chunkCon
82
94
submitArgs .add ("--archives" );
83
95
submitArgs .add (resourceArchives );
84
96
}
85
- submitArgs .add ("--executor-memory" );
86
- submitArgs .add (executorMemory );
87
97
submitArgs .add ("--num-executors" );
88
98
submitArgs .add ("" + numExecutors );
99
+ submitArgs .add ("--executor-memory" );
100
+ submitArgs .add (executorMemory );
101
+ if (!StringUtils .hasText (executorCores )) {
102
+ submitArgs .add ("--executor-cores" );
103
+ submitArgs .add (executorCores );
104
+ }
105
+ if (!StringUtils .hasText (driverMemory )) {
106
+ submitArgs .add ("--driver-memory" );
107
+ submitArgs .add (driverMemory );
108
+ }
109
+ if (!StringUtils .hasText (driverCores )) {
110
+ submitArgs .add ("--driver-cores" );
111
+ submitArgs .add (driverCores );
112
+ }
89
113
for (String arg : arguments ) {
90
114
submitArgs .add ("--arg" );
91
115
submitArgs .add (arg );
@@ -137,6 +161,10 @@ public void setSparkAssemblyJar(String sparkAssemblyJar) {
137
161
this .sparkAssemblyJar = sparkAssemblyJar ;
138
162
}
139
163
164
+ public void setExtraClassPath (String extraClassPath ) {
165
+ this .extraClassPath = extraClassPath ;
166
+ }
167
+
140
168
public void setHadoopConfiguration (Configuration configuration ) {
141
169
this .hadoopConfiguration = configuration ;
142
170
}
@@ -165,6 +193,18 @@ public void setExecutorMemory(String executorMemory) {
165
193
this .executorMemory = executorMemory ;
166
194
}
167
195
196
+ public void setExecutorCores (String executorCores ) {
197
+ this .executorCores = executorCores ;
198
+ }
199
+
200
+ public void setDriverMemory (String driverMemory ) {
201
+ this .driverMemory = driverMemory ;
202
+ }
203
+
204
+ public void setDriverCores (String driverCores ) {
205
+ this .driverCores = driverCores ;
206
+ }
207
+
168
208
public void setNumExecutors (int numExecutors ) {
169
209
this .numExecutors = numExecutors ;
170
210
}
0 commit comments