Skip to content

Commit 1bcfd50

Browse files
authored
Merge pull request #1215 from SpiNNakerManchester/sanity_check_jobs
Sanity check jobs
2 parents f6a47f9 + a46d2f9 commit 1bcfd50

File tree

15 files changed

+520
-121
lines changed

15 files changed

+520
-121
lines changed

SpiNNaker-allocserv/src/main/java/uk/ac/manchester/spinnaker/alloc/allocator/AllocatorTask.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,7 @@ private final class AllocSQL extends PowerSQL {
453453
getRectangles = conn.query(findRectangle);
454454
getRectangleAt = conn.query(findRectangleAt);
455455
countConnectedBoards = conn.query(countConnected);
456-
findSpecificBoard = conn.query(findLocation);
456+
findSpecificBoard = conn.query(FIND_LOCATION);
457457
getConnectedBoardIDs = conn.query(getConnectedBoards);
458458
allocBoard = conn.update(ALLOCATE_BOARDS_BOARD);
459459
allocJob = conn.update(ALLOCATE_BOARDS_JOB);

SpiNNaker-allocserv/src/main/java/uk/ac/manchester/spinnaker/alloc/allocator/Spalloc.java

+77-34
Original file line numberDiff line numberDiff line change
@@ -416,30 +416,32 @@ private static JobDescription jobDescription(int id, Row job,
416416
}
417417

418418
@Override
419-
public Optional<Job> createJobInGroup(String owner, String groupName,
419+
public Job createJobInGroup(String owner, String groupName,
420420
CreateDescriptor descriptor, String machineName, List<String> tags,
421-
Duration keepaliveInterval, byte[] req) {
421+
Duration keepaliveInterval, byte[] req)
422+
throws IllegalArgumentException {
422423
return execute(conn -> {
423424
int user = getUser(conn, owner).orElseThrow(
424425
() -> new RuntimeException("no such user: " + owner));
425426
int group = selectGroup(conn, owner, groupName);
426427
if (!quotaManager.mayCreateJob(group)) {
427428
// No quota left
428-
return Optional.empty();
429+
throw new IllegalArgumentException(
430+
"quota exceeded in group " + group);
429431
}
430432

431-
var m = selectMachine(conn, machineName, tags);
433+
var m = selectMachine(conn, descriptor, machineName, tags);
432434
if (!m.isPresent()) {
433-
// Cannot find machine!
434-
return Optional.empty();
435+
throw new IllegalArgumentException(
436+
"no machine available which matches allocation "
437+
+ "request");
435438
}
436439
var machine = m.orElseThrow();
437440

438441
var id = insertJob(conn, machine, user, group, keepaliveInterval,
439442
req);
440443
if (!id.isPresent()) {
441-
// Insert failed
442-
return Optional.empty();
444+
throw new RuntimeException("failed to create job");
443445
}
444446
int jobId = id.orElseThrow();
445447

@@ -512,12 +514,13 @@ public Integer board(CreateBoard b) {
512514
machine.name, owner, numBoards);
513515

514516
allocator.scheduleAllocateNow();
515-
return getJob(jobId, conn).map(ji -> (Job) ji);
517+
return getJob(jobId, conn).map(ji -> (Job) ji).orElseThrow(
518+
() -> new RuntimeException("Error creating job!"));
516519
});
517520
}
518521

519522
@Override
520-
public Optional<Job> createJob(String owner, CreateDescriptor descriptor,
523+
public Job createJob(String owner, CreateDescriptor descriptor,
521524
String machineName, List<String> tags, Duration keepaliveInterval,
522525
byte[] originalRequest) {
523526
return execute(conn -> createJobInGroup(
@@ -526,7 +529,7 @@ owner, getOnlyGroup(conn, owner), descriptor, machineName,
526529
}
527530

528531
@Override
529-
public Optional<Job> createJobInCollabSession(String owner,
532+
public Job createJobInCollabSession(String owner,
530533
String nmpiCollab, CreateDescriptor descriptor,
531534
String machineName, List<String> tags, Duration keepaliveInterval,
532535
byte[] originalRequest) {
@@ -537,39 +540,30 @@ public Optional<Job> createJobInCollabSession(String owner,
537540
var job = execute(conn -> createJobInGroup(
538541
owner, nmpiCollab, descriptor, machineName,
539542
tags, keepaliveInterval, originalRequest));
540-
// On failure to get job, just return; shouldn't happen as quota checked
541-
// earlier, but just in case!
542-
if (job.isEmpty()) {
543-
return job;
544-
}
545543

546-
quotaManager.associateNMPISession(job.get().getId(), session.getId(),
544+
quotaManager.associateNMPISession(job.getId(), session.getId(),
547545
quotaUnits);
548546

549547
// Return the job created
550548
return job;
551549
}
552550

553551
@Override
554-
public Optional<Job> createJobForNMPIJob(String owner, int nmpiJobId,
552+
public Job createJobForNMPIJob(String owner, int nmpiJobId,
555553
CreateDescriptor descriptor, String machineName, List<String> tags,
556554
Duration keepaliveInterval, byte[] originalRequest) {
557555
var collab = quotaManager.mayUseNMPIJob(owner, nmpiJobId);
558556
if (collab.isEmpty()) {
559-
return Optional.empty();
557+
throw new IllegalArgumentException("User cannot create session in "
558+
+ "NMPI job" + nmpiJobId);
560559
}
561560
var quotaDetails = collab.get();
562561

563562
var job = execute(conn -> createJobInGroup(
564563
owner, quotaDetails.collabId, descriptor, machineName,
565564
tags, keepaliveInterval, originalRequest));
566-
// On failure to get job, just return; shouldn't happen as quota checked
567-
// earlier, but just in case!
568-
if (job.isEmpty()) {
569-
return job;
570-
}
571565

572-
quotaManager.associateNMPIJob(job.get().getId(), nmpiJobId,
566+
quotaManager.associateNMPIJob(job.getId(), nmpiJobId,
573567
quotaDetails.quotaUnits);
574568

575569
// Return the job created
@@ -689,25 +683,74 @@ private static Optional<Integer> insertJob(Connection conn, MachineImpl m,
689683
}
690684

691685
private Optional<MachineImpl> selectMachine(Connection conn,
692-
String machineName, List<String> tags) {
686+
CreateDescriptor descriptor, String machineName,
687+
List<String> tags) {
693688
if (nonNull(machineName)) {
694-
return getMachine(machineName, false, conn);
695-
} else if (!tags.isEmpty()) {
689+
var m = getMachine(machineName, false, conn);
690+
if (m.isPresent() && isAllocPossible(conn, descriptor, m.get())) {
691+
return m;
692+
}
693+
return Optional.empty();
694+
}
695+
696+
if (!tags.isEmpty()) {
696697
for (var m : getMachines(conn, false).values()) {
697698
var mi = (MachineImpl) m;
698-
if (mi.tags.containsAll(tags)) {
699-
/*
700-
* Originally, spalloc checked if allocation was possible;
701-
* we just assume that it is because there really isn't ever
702-
* going to be that many different machines on one service.
703-
*/
699+
if (mi.tags.containsAll(tags)
700+
&& isAllocPossible(conn, descriptor, mi)) {
704701
return Optional.of(mi);
705702
}
706703
}
707704
}
708705
return Optional.empty();
709706
}
710707

708+
private boolean isAllocPossible(final Connection conn,
709+
final CreateDescriptor descriptor,
710+
final MachineImpl m) {
711+
return descriptor.visit(new CreateVisitor<Boolean>() {
712+
@Override
713+
public Boolean numBoards(CreateNumBoards nb) {
714+
try (var getNBoards = conn.query(COUNT_FUNCTIONING_BOARDS)) {
715+
var numBoards = getNBoards.call1(integer("c"), m.id)
716+
.orElseThrow();
717+
return numBoards >= nb.numBoards;
718+
}
719+
}
720+
721+
@Override
722+
public Boolean dimensions(CreateDimensions d) {
723+
try (var checkPossible = conn.query(checkRectangle)) {
724+
return checkPossible.call1((r) -> true, d.width, d.height,
725+
m.id, d.maxDead).isPresent();
726+
}
727+
}
728+
729+
@Override
730+
public Boolean dimensionsAt(CreateDimensionsAt da) {
731+
try (var checkPossible = conn.query(checkRectangleAt)) {
732+
int board = locateBoard(conn, m.name, da, true);
733+
return checkPossible.call1((r) -> true, board,
734+
da.width, da.height, m.id, da.maxDead).isPresent();
735+
} catch (IllegalArgumentException e) {
736+
// This means the board doesn't exist on the given machine
737+
return false;
738+
}
739+
}
740+
741+
@Override
742+
public Boolean board(CreateBoard b) {
743+
try (var check = conn.query(CHECK_LOCATION)) {
744+
int board = locateBoard(conn, m.name, b, false);
745+
return check.call1((r) -> true, m.id, board).isPresent();
746+
} catch (IllegalArgumentException e) {
747+
// This means the board doesn't exist on the given machine
748+
return false;
749+
}
750+
}
751+
});
752+
}
753+
711754
@Override
712755
public void purgeDownCache() {
713756
synchronized (this) {

SpiNNaker-allocserv/src/main/java/uk/ac/manchester/spinnaker/alloc/allocator/SpallocAPI.java

+16-12
Original file line numberDiff line numberDiff line change
@@ -192,12 +192,13 @@ Optional<MachineDescription> getMachineInfo(@NotNull String machine,
192192
* @param originalRequest
193193
* The serialized original request, which will be stored in the
194194
* database for later retrieval.
195-
* @return Handle to the job, or {@code empty} if the job couldn't be made.
195+
* @return The job created.
196+
* @throws IllegalArgumentException if the job could not be created.
196197
*/
197-
Optional<Job> createJob(@NotNull String owner,
198+
Job createJob(@NotNull String owner,
198199
@Valid CreateDescriptor descriptor, String machineName,
199200
List<String> tags, Duration keepaliveInterval,
200-
byte[] originalRequest);
201+
byte[] originalRequest) throws IllegalArgumentException;
201202

202203
/**
203204
* Create a job for a user in a specific local group.
@@ -224,12 +225,13 @@ Optional<Job> createJob(@NotNull String owner,
224225
* @param originalRequest
225226
* The serialized original request, which will be stored in the
226227
* database for later retrieval.
227-
* @return Handle to the job, or {@code empty} if the job couldn't be made.
228+
* @return The job created.
229+
* @throws IllegalArgumentException if the job could not be created.
228230
*/
229-
Optional<Job> createJobInGroup(@NotNull String owner, @NotNull String group,
231+
Job createJobInGroup(@NotNull String owner, @NotNull String group,
230232
@Valid CreateDescriptor descriptor, String machineName,
231233
List<String> tags, Duration keepaliveInterval,
232-
byte[] originalRequest);
234+
byte[] originalRequest) throws IllegalArgumentException;
233235

234236
/**
235237
* Create a job for interactive use in an NMPI Collab Session.
@@ -256,13 +258,14 @@ Optional<Job> createJobInGroup(@NotNull String owner, @NotNull String group,
256258
* @param originalRequest
257259
* The serialized original request, which will be stored in the
258260
* database for later retrieval.
259-
* @return Handle to the job, or {@code empty} if the job couldn't be made.
261+
* @return The job created.
262+
* @throws IllegalArgumentException if the job could not be created.
260263
*/
261-
Optional<Job> createJobInCollabSession(@NotNull String owner,
264+
Job createJobInCollabSession(@NotNull String owner,
262265
@NotNull String nmpiCollab,
263266
@Valid CreateDescriptor descriptor, String machineName,
264267
List<String> tags, Duration keepaliveInterval,
265-
byte[] originalRequest);
268+
byte[] originalRequest) throws IllegalArgumentException;
266269

267270
/**
268271
* Create a job for interactive use in an NMPI Collab Session.
@@ -289,13 +292,14 @@ Optional<Job> createJobInCollabSession(@NotNull String owner,
289292
* @param originalRequest
290293
* The serialized original request, which will be stored in the
291294
* database for later retrieval.
292-
* @return Handle to the job, or {@code empty} if the job couldn't be made.
295+
* @return The job created.
296+
* @throws IllegalArgumentException if the job could not be created.
293297
*/
294298
@PreAuthorize(IS_NMPI_EXEC)
295-
Optional<Job> createJobForNMPIJob(@NotNull String owner, int nmpiJobId,
299+
Job createJobForNMPIJob(@NotNull String owner, int nmpiJobId,
296300
@Valid CreateDescriptor descriptor, String machineName,
297301
List<String> tags, Duration keepaliveInterval,
298-
byte[] originalRequest);
302+
byte[] originalRequest) throws IllegalArgumentException;
299303

300304
/** Purge the cache of what boards are down. */
301305
void purgeDownCache();

SpiNNaker-allocserv/src/main/java/uk/ac/manchester/spinnaker/alloc/compat/V1CompatTask.java

+7-8
Original file line numberDiff line numberDiff line change
@@ -444,17 +444,16 @@ private Object callOperation(Command cmd) throws Exception {
444444
byte[] serialCmd = getJsonMapper().writeValueAsBytes(cmd);
445445
switch (args.size()) {
446446
case 0:
447-
return createJobNumBoards(1, kwargs, serialCmd).orElse(null);
447+
return createJobNumBoards(1, kwargs, serialCmd);
448448
case 1:
449-
return createJobNumBoards(parseDec(args, 0), kwargs, serialCmd)
450-
.orElse(null);
449+
return createJobNumBoards(parseDec(args, 0), kwargs, serialCmd);
451450
case 2:
452451
return createJobRectangle(parseDec(args, 0), parseDec(args, 1),
453-
kwargs, serialCmd).orElse(null);
452+
kwargs, serialCmd);
454453
case TRIAD_COORD_COUNT:
455454
return createJobSpecificBoard(new TriadCoords(parseDec(args, 0),
456455
parseDec(args, 1), parseDec(args, 2)), kwargs,
457-
serialCmd).orElse(null);
456+
serialCmd);
458457
default:
459458
throw new Oops(
460459
"unsupported number of arguments: " + args.size());
@@ -563,7 +562,7 @@ private Object callOperation(Command cmd) throws Exception {
563562
* @throws TaskException
564563
* If anything goes wrong.
565564
*/
566-
protected abstract Optional<Integer> createJobNumBoards(
565+
protected abstract int createJobNumBoards(
567566
@Positive int numBoards,
568567
Map<@NotBlank String, @NotNull Object> kwargs, byte[] cmd)
569568
throws TaskException;
@@ -583,7 +582,7 @@ protected abstract Optional<Integer> createJobNumBoards(
583582
* @throws TaskException
584583
* If anything goes wrong.
585584
*/
586-
protected abstract Optional<Integer> createJobRectangle(
585+
protected abstract int createJobRectangle(
587586
@ValidTriadWidth int width, @ValidTriadHeight int height,
588587
Map<@NotBlank String, @NotNull Object> kwargs, byte[] cmd)
589588
throws TaskException;
@@ -601,7 +600,7 @@ protected abstract Optional<Integer> createJobRectangle(
601600
* @throws TaskException
602601
* If anything goes wrong.
603602
*/
604-
protected abstract Optional<Integer> createJobSpecificBoard(
603+
protected abstract int createJobSpecificBoard(
605604
@Valid TriadCoords coords,
606605
Map<@NotBlank String, @NotNull Object> kwargs, byte[] cmd)
607606
throws TaskException;

SpiNNaker-allocserv/src/main/java/uk/ac/manchester/spinnaker/alloc/compat/V1TaskImpl.java

+8-12
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import java.util.List;
4545
import java.util.Map;
4646
import java.util.Objects;
47-
import java.util.Optional;
4847
import java.util.concurrent.Future;
4948

5049
import javax.annotation.PostConstruct;
@@ -260,22 +259,22 @@ private static List<String> tags(Object src, boolean mayForceDefault) {
260259
}
261260

262261
@Override
263-
protected final Optional<Integer> createJobNumBoards(int numBoards,
262+
protected final int createJobNumBoards(int numBoards,
264263
Map<String, Object> kwargs, byte[] cmd) throws TaskException {
265264
var maxDead = parseDec(kwargs.get("max_dead_boards"));
266265
return createJob(new CreateNumBoards(numBoards, maxDead), kwargs, cmd);
267266
}
268267

269268
@Override
270-
protected final Optional<Integer> createJobRectangle(int width, int height,
269+
protected final int createJobRectangle(int width, int height,
271270
Map<String, Object> kwargs, byte[] cmd) throws TaskException {
272271
var maxDead = parseDec(kwargs.get("max_dead_boards"));
273272
return createJob(new CreateDimensions(width, height, maxDead), kwargs,
274273
cmd);
275274
}
276275

277276
@Override
278-
protected final Optional<Integer> createJobSpecificBoard(TriadCoords coords,
277+
protected final int createJobSpecificBoard(TriadCoords coords,
279278
Map<String, Object> kwargs, byte[] cmd) throws TaskException {
280279
return createJob(triad(coords.x, coords.y, coords.z), kwargs, cmd);
281280
}
@@ -294,21 +293,18 @@ private static String getOwner(Map<String, Object> kwargs)
294293
return owner;
295294
}
296295

297-
private Optional<Integer> createJob(SpallocAPI.CreateDescriptor create,
296+
private Integer createJob(SpallocAPI.CreateDescriptor create,
298297
Map<String, Object> kwargs, byte[] cmd) throws TaskException {
299298
var owner = getOwner(kwargs);
300299
var keepalive = parseKeepalive((Number) kwargs.get("keepalive"));
301300
var machineName = (String) kwargs.get("machine");
302301
var ts = tags(kwargs.get("tags"), isNull(machineName));
303-
var result = permit.authorize(() -> spalloc.createJobInGroup(
302+
var job = permit.authorize(() -> spalloc.createJobInGroup(
304303
permit.name, groupName, create, machineName, ts, keepalive,
305304
cmd));
306-
result.ifPresent(
307-
j -> log.info(
308-
"made compatibility-mode job {} "
309-
+ "on behalf of claimed user {}",
310-
j.getId(), owner));
311-
return result.map(Job::getId);
305+
log.info("made compatibility-mode job {} on behalf of claimed user {}",
306+
job.getId(), owner);
307+
return job.getId();
312308
}
313309

314310
@Override

0 commit comments

Comments
 (0)