Skip to content

Commit 48adc90

Browse files
Feature/368 scalable scheduler (#370)
* Add ComputeInstance repository * Rename ComputeInstance -> SchedulerInstance * Rename ping -> heartbeat * Add scheduler instance id to workflow * Implement workflow balancing * Fix suddenly failing test * Add tests * Add integration test * Add logging * Only process sensors of assigned workflows * Fix Thread sleep in the wrong block * Only select dags from assigned workflows * Fix test * Add more logging * Fixes * Only consider active instances for rebalancing * Fix instancesSteady check * Delete deactivated instances * Tidy up PR * Rename drop -> release * Add FK to workflow table, don't change scheduler instance id on updating workflow * Guard assignWorkflows against concurrent execution * Merge branch 'develop' into feature/368-scalable-scheduler * Use liquibase * Modify changeset version 0.3.3 to 0.3.4
1 parent f977c5d commit 48adc90

37 files changed

+1727
-85
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,13 @@ appUniqueId=
8181
# Core properties.
8282
# How many threads to use for each part of the "scheduler".
8383
# Heart beat interval in milliseconds.
84+
# Lag threshold: how far an instance's heartbeat may lag before another instance deactivates it.
8485
scheduler.thread.pool.size=10
8586
scheduler.sensors.thread.pool.size=20
8687
scheduler.executors.thread.pool.size=30
8788
scheduler.jobs.parallel.number=100
8889
scheduler.heart.beat=5000
90+
scheduler.lag.threshold=20000
8991
```
9092
```
9193
#Kafka sensor properties. Not all are required. Adjust according to your use case.

src/main/resources/application.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ scheduler.sensors.thread.pool.size=20
4848
scheduler.executors.thread.pool.size=30
4949
scheduler.jobs.parallel.number=100
5050
scheduler.heart.beat=5000
51+
scheduler.lag.threshold=20000
5152

5253
#Kafka sensor properties.
5354
kafkaSource.group.id=hyper_drive_${appUniqueId}

src/main/resources/db_scripts/db_script_latest.sql

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ create table "workflow" (
1919
"project" VARCHAR NOT NULL,
2020
"created" TIMESTAMP NOT NULL,
2121
"updated" TIMESTAMP,
22+
"scheduler_instance_id" BIGINT,
2223
"id" BIGSERIAL NOT NULL PRIMARY KEY
2324
);
2425

@@ -98,6 +99,12 @@ create table "job_template" (
9899
"form_config" VARCHAR NOT NULL DEFAULT 'unknown'
99100
);
100101

102+
-- Scheduler instances: one row per instance, holding its status and the time of its last heartbeat.
create table "scheduler_instance" (
103+
"id" BIGSERIAL NOT NULL PRIMARY KEY,
104+
-- Status stored as text; presumably the name of a SchedulerInstanceStatus ('Active' / 'Deactivated') - confirm against the table mapping.
"status" VARCHAR NOT NULL,
105+
"last_heartbeat" TIMESTAMP NOT NULL
106+
);
107+
101108
alter table "job_instance"
102109
add constraint "job_instance_dag_instance_fk"
103110
foreign key("dag_instance_id")
@@ -146,6 +153,12 @@ alter table "dag_instance"
146153
references "workflow"("id")
147154
on update NO ACTION on delete NO ACTION;
148155

156+
-- Links workflow.scheduler_instance_id to scheduler_instance.id.
-- NO ACTION on update and delete: changes to the referenced row are never cascaded to workflow rows.
alter table "workflow"
157+
add constraint "workflow_scheduler_instance_fk"
158+
foreign key("scheduler_instance_id")
159+
references "scheduler_instance"("id")
160+
on update NO ACTION on delete NO ACTION;
161+
149162
create view "dag_run_view" AS
150163
select
151164
dag_instance.id as "id",
@@ -174,3 +187,5 @@ values ('Generic Shell Job', 'Shell', 'Shell', '{}', '{}', '{}');
174187

175188
CREATE INDEX job_instance_dag_instance_idx ON job_instance (dag_instance_id);
176189
CREATE INDEX dag_instance_workflow_id_idx ON dag_instance (workflow_id);
190+
-- Index on the workflow.scheduler_instance_id column (the foreign key to scheduler_instance).
CREATE INDEX workflow_scheduler_inst_id_idx ON workflow (scheduler_instance_id);
191+

src/main/resources/db_scripts/liquibase/db.changelog.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,7 @@ databaseChangeLog:
4444
- include:
4545
relativeToChangelogFile: true
4646
file: v0.3.3.add-indexes.yml
47+
- include:
48+
relativeToChangelogFile: true
49+
file: v0.3.4.add-scheduler-instances.yml
50+
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
-- Migration v0.3.4: adds the scheduler_instance table and links workflows to it.

-- Scheduler instances: one row per instance, holding its status and the time of its last heartbeat.
create table "scheduler_instance" (
17+
"id" BIGSERIAL NOT NULL PRIMARY KEY,
18+
"status" VARCHAR NOT NULL,
19+
"last_heartbeat" TIMESTAMP NOT NULL
20+
);
21+
22+
-- Nullable column: existing workflow rows get NULL, i.e. no scheduler instance is referenced yet.
alter table "workflow" add column "scheduler_instance_id" BIGINT;
23+
24+
-- Foreign key from workflow to scheduler_instance; NO ACTION means nothing is cascaded on update or delete.
alter table "workflow"
25+
add constraint "workflow_scheduler_instance_fk"
26+
foreign key("scheduler_instance_id")
27+
references "scheduler_instance"("id")
28+
on update NO ACTION on delete NO ACTION;
29+
30+
-- Index on the new foreign-key column.
CREATE INDEX workflow_scheduler_inst_id_idx ON workflow (scheduler_instance_id);
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#
2+
# Copyright 2018 ABSA Group Limited
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
#
15+
16+
# Liquibase changelog with a single changeSet that runs the v0.3.4 SQL script.
databaseChangeLog:
17+
- changeSet:
18+
id: v0.3.4.add-scheduler-instances
19+
logicalFilePath: v0.3.4.add-scheduler-instances
20+
21+
context: default
22+
changes:
23+
- sqlFile:
24+
# The path below is resolved relative to this changelog file.
relativeToChangelogFile: true
25+
path: v0.3.4.add-scheduler-instances.sql
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package za.co.absa.hyperdrive.trigger.models
17+
18+
import java.time.LocalDateTime
19+
20+
import za.co.absa.hyperdrive.trigger.models.enums.SchedulerInstanceStatuses.SchedulerInstanceStatus
21+
22+
/**
 * Model of a scheduler instance.
 *
 * The fields appear to mirror the columns of the "scheduler_instance" table
 * (id, status, last_heartbeat) - confirm against the table mapping.
 *
 * @param id            numeric identifier; defaults to 0 (presumably replaced by the
 *                      database-generated key on insert - TODO confirm)
 * @param status        status of the instance, a SchedulerInstanceStatus value
 * @param lastHeartbeat date and time of the instance's last heartbeat
 */
case class SchedulerInstance(
23+
id: Long = 0,
24+
status: SchedulerInstanceStatus,
25+
lastHeartbeat: LocalDateTime
26+
)

src/main/scala/za/co/absa/hyperdrive/trigger/models/Workflow.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ case class Workflow(
2323
project: String,
2424
created: LocalDateTime = LocalDateTime.now(),
2525
updated: Option[LocalDateTime],
26+
schedulerInstanceId: Option[Long] = None,
2627
id: Long = 0
2728
)
2829

@@ -36,6 +37,7 @@ case class WorkflowJoined(
3637
project: String,
3738
created: LocalDateTime = LocalDateTime.now(),
3839
updated: Option[LocalDateTime],
40+
schedulerInstanceId: Option[Long] = None,
3941
sensor: Sensor,
4042
dagDefinitionJoined: DagDefinitionJoined,
4143
id: Long = 0
@@ -47,6 +49,7 @@ case class WorkflowJoined(
4749
project = this.project,
4850
created = this.created,
4951
updated = this.updated,
52+
schedulerInstanceId = this.schedulerInstanceId,
5053
id = this.id
5154
)
5255
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package za.co.absa.hyperdrive.trigger.models.enums
17+
18+
/**
 * Enumeration of the statuses a scheduler instance can have, modelled as a
 * sealed class hierarchy with one case object per status.
 */
object SchedulerInstanceStatuses {
19+
20+
// Base type of every status; `name` is the textual form and is also what toString returns.
sealed abstract class SchedulerInstanceStatus(val name: String) {
21+
override def toString: String = name
22+
}
23+
24+
case object Active extends SchedulerInstanceStatus("Active")
25+
case object Deactivated extends SchedulerInstanceStatus("Deactivated")
26+
27+
// All known statuses. JdbcTypeMapper.instanceStatusMapper resolves stored names against this set,
// so a newly added status must be listed here or reading it from the database will throw.
val statuses: Set[SchedulerInstanceStatus] = Set(Active, Deactivated)
28+
29+
}

src/main/scala/za/co/absa/hyperdrive/trigger/models/tables/JdbcTypeMapper.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ package za.co.absa.hyperdrive.trigger.models.tables
1717

1818
import java.io.StringWriter
1919

20-
import za.co.absa.hyperdrive.trigger.models.enums.{DBOperation, DagInstanceStatuses, JobStatuses, JobTypes, SensorTypes}
20+
import za.co.absa.hyperdrive.trigger.models.enums.{DBOperation, DagInstanceStatuses, SchedulerInstanceStatuses, JobStatuses, JobTypes, SensorTypes}
2121
import za.co.absa.hyperdrive.trigger.models.enums.SensorTypes.SensorType
2222
import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses.JobStatus
2323
import za.co.absa.hyperdrive.trigger.models.enums.JobTypes.JobType
@@ -27,6 +27,7 @@ import za.co.absa.hyperdrive.trigger.ObjectMapperSingleton
2727
import za.co.absa.hyperdrive.trigger.models.WorkflowJoined
2828
import za.co.absa.hyperdrive.trigger.models.enums.DBOperation.DBOperation
2929
import za.co.absa.hyperdrive.trigger.models.enums.DagInstanceStatuses.DagInstanceStatus
30+
import za.co.absa.hyperdrive.trigger.models.enums.SchedulerInstanceStatuses.SchedulerInstanceStatus
3031

3132
import scala.collection.immutable.SortedMap
3233
import scala.util.Try
@@ -84,6 +85,14 @@ trait JdbcTypeMapper {
8485
)
8586
)
8687

88+
/**
 * Column type mapping between SchedulerInstanceStatus and String.
 *
 * Writing: the status is stored as its `name`.
 * Reading: the stored name is looked up in SchedulerInstanceStatuses.statuses;
 * an Exception is thrown when no status with that name exists.
 */
implicit lazy val instanceStatusMapper: JdbcType[SchedulerInstanceStatus] =
89+
MappedColumnType.base[SchedulerInstanceStatus, String](
90+
status => status.name,
91+
statusName => SchedulerInstanceStatuses.statuses.find(_.name == statusName).getOrElse(
92+
// Unknown name in the database: fail loudly instead of guessing a status.
throw new Exception(s"Couldn't find SchedulerInstanceStatus: $statusName")
93+
)
94+
)
95+
8796
implicit lazy val payloadMapper: JdbcType[JsValue] =
8897
MappedColumnType.base[JsValue, String](
8998
payload => payload.toString(),

0 commit comments

Comments
 (0)