Skip to content

Commit 1f57fec

Browse files
committed
gRPC workers spawn bug-fix; added exception handling
Signed-off-by: Aditya Kumar <[email protected]>
1 parent 5c9a340 commit 1f57fec

File tree

24 files changed

+239
-132
lines changed

24 files changed

+239
-132
lines changed
13 Bytes
Binary file not shown.
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#Generated by Maven
2-
#Sun Mar 28 18:29:56 IST 2021
2+
#Mon Mar 29 16:50:42 IST 2021
33
version=1.0-SNAPSHOT
44
groupId=io.github
55
artifactId=grpc-server
Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
D:\Work\UW\app\grpc-server\src\main\java\io\github\stubs\matrix\computeGrpc.java
2-
D:\Work\UW\app\grpc-server\src\main\java\io\github\engine\MultiplyBlock.java
3-
D:\Work\UW\app\grpc-server\src\main\java\io\github\stubs\matrix\Matrix.java
4-
D:\Work\UW\app\grpc-server\src\main\java\io\github\engine\AddBlock.java
5-
D:\Work\UW\app\grpc-server\src\main\java\io\github\GrpcServer.java
6-
D:\Work\UW\app\grpc-server\src\main\java\io\github\helper\MatrixLoader.java
7-
D:\Work\UW\app\grpc-server\src\main\java\io\github\services\ComputeService.java
1+
D:\Work\UW\distributed-computation-with-grpc-and-rest\grpc-server\src\main\java\io\github\helper\MatrixLoader.java
2+
D:\Work\UW\distributed-computation-with-grpc-and-rest\grpc-server\src\main\java\io\github\stubs\matrix\computeGrpc.java
3+
D:\Work\UW\distributed-computation-with-grpc-and-rest\grpc-server\src\main\java\io\github\engine\AddBlock.java
4+
D:\Work\UW\distributed-computation-with-grpc-and-rest\grpc-server\src\main\java\io\github\stubs\matrix\Matrix.java
5+
D:\Work\UW\distributed-computation-with-grpc-and-rest\grpc-server\src\main\java\io\github\engine\MultiplyBlock.java
6+
D:\Work\UW\distributed-computation-with-grpc-and-rest\grpc-server\src\main\java\io\github\GrpcServer.java
7+
D:\Work\UW\distributed-computation-with-grpc-and-rest\grpc-server\src\main\java\io\github\services\ComputeService.java

rest-server/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@
2020
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
2121
</properties>
2222
<dependencies>
23+
<dependency>
24+
<groupId>org.junit.jupiter</groupId>
25+
<artifactId>junit-jupiter</artifactId>
26+
<version>5.6.2</version>
27+
<scope>test</scope>
28+
</dependency>
2329
<dependency>
2430
<groupId>org.springframework.boot</groupId>
2531
<artifactId>spring-boot-starter-web</artifactId>
@@ -32,6 +38,12 @@
3238
<groupId>org.springframework.boot</groupId>
3339
<artifactId>spring-boot-starter-test</artifactId>
3440
<scope>test</scope>
41+
<exclusions>
42+
<exclusion>
43+
<groupId>junit</groupId>
44+
<artifactId>junit</artifactId>
45+
</exclusion>
46+
</exclusions>
3547
</dependency>
3648
<dependency>
3749
<groupId>com.google.protobuf</groupId>

rest-server/src/main/java/io/github/restserver/controllers/FileUploadController.java

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,40 +6,42 @@
66
import io.github.restserver.models.Status;
77
import io.github.restserver.models.UploadFileResponse;
88
import io.github.restserver.services.ComputeService;
9-
import org.springframework.beans.factory.annotation.Autowired;
109
import org.springframework.http.HttpStatus;
1110
import org.springframework.http.ResponseEntity;
1211
import org.springframework.web.bind.annotation.PostMapping;
1312
import org.springframework.web.bind.annotation.RequestParam;
1413
import org.springframework.web.bind.annotation.RestController;
1514
import org.springframework.web.multipart.MultipartFile;
1615

17-
import java.io.IOException;
1816
import java.util.ArrayList;
1917

2018
@RestController
2119
public class FileUploadController {
22-
@Autowired
23-
private FileUploadHelper fileUploadHelper;
20+
private final FileUploadHelper fileUploadHelper;
2421

25-
@Autowired
26-
private MatrixLoaderHelper matrixLoaderHelper;
22+
private final MatrixLoaderHelper matrixLoaderHelper;
2723

28-
@Autowired
29-
private PathProvider pathProvider;
24+
private final PathProvider pathProvider;
3025

31-
@Autowired
32-
private ComputeService computeService;
26+
private final ComputeService computeService;
3327

34-
@Autowired
35-
private ThreadedComputeController threadedComputeController;
28+
private final ThreadedComputeController threadedComputeController;
3629

37-
@Autowired
38-
private GrpcServerScalingController grpcServerScalingController;
30+
private final GrpcServerScalingController grpcServerScalingController;
31+
32+
private boolean uploadHit;
33+
34+
public FileUploadController(FileUploadHelper fileUploadHelper, MatrixLoaderHelper matrixLoaderHelper, PathProvider pathProvider, ComputeService computeService, ThreadedComputeController threadedComputeController, GrpcServerScalingController grpcServerScalingController) {
35+
this.fileUploadHelper = fileUploadHelper;
36+
this.matrixLoaderHelper = matrixLoaderHelper;
37+
this.pathProvider = pathProvider;
38+
this.computeService = computeService;
39+
this.threadedComputeController = threadedComputeController;
40+
this.grpcServerScalingController = grpcServerScalingController;
41+
}
3942

4043
@PostMapping("/upload")
4144
public ResponseEntity<?> uploadFile(@RequestParam("file1") MultipartFile file1, @RequestParam("file2") MultipartFile file2) {
42-
4345
try {
4446
if (file1.isEmpty() || file2.isEmpty()) {
4547
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body("Files are empty");
@@ -86,12 +88,7 @@ public ResponseEntity<?> uploadFile(@RequestParam("file1") MultipartFile file1,
8688

8789
@PostMapping("/kill-kids")
8890
public ResponseEntity<?> killChildren() {
89-
try {
90-
grpcServerScalingController.grpcServerScaleDown();
91-
return ResponseEntity.ok(grpcServerScalingController.getPortList().size() + " gRPC workers were killed");
92-
} catch (IOException e) {
93-
e.printStackTrace();
94-
}
95-
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("workers couldn't be killed.\n Please restart the server.");
91+
grpcServerScalingController.grpcServerScaleDown();
92+
return ResponseEntity.ok(grpcServerScalingController.getPortList().size() + " gRPC workers were killed");
9693
}
9794
}

rest-server/src/main/java/io/github/restserver/controllers/GrpcClientController.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ public GrpcClientController() {
7171
public void setResponse(GrpcResponse response) {
7272
this.response = response;
7373
}
74-
75-
public GrpcResponse callMultiplyRowByColumnUsingBlockingStub(int port, ArrayList<Long> row, ArrayList<Long> column, int rowId, int colId) throws InterruptedException {
74+
/**
75+
public GrpcResponse callMultiplyRowByColumnUsingBlockingStub(int port, ArrayList<Long> row, ArrayList<Long> column, int rowId, int colId) {
7676
7777
ManagedChannel channel = ManagedChannelBuilder
7878
.forAddress("localhost", port)
@@ -111,11 +111,15 @@ public void onCompleted() {
111111
}
112112
};
113113
matrixStub.multiplyRowByColumn(request, responseObserver);
114-
finishLatch.await();
114+
try {
115+
finishLatch.await();
116+
} catch (InterruptedException e) {
117+
System.out.println("Exception encountered in closing time latch");
118+
}
115119
return response;
116120
}
117-
118-
public GrpcResponse callMultiplyRowByColumnUsingAsyncStub(int port, ArrayList<Long> row, ArrayList<Long> column, int rowId, int colId) throws InterruptedException {
121+
*/
122+
public GrpcResponse callMultiplyRowByColumnUsingAsyncStub(int port, ArrayList<Long> row, ArrayList<Long> column, int rowId, int colId) {
119123
ManagedChannel channel = ManagedChannelBuilder
120124
.forAddress("localhost", port)
121125
.usePlaintext()
Lines changed: 54 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
package io.github.restserver.controllers;
22

33
import io.github.restserver.helper.PathProvider;
4-
import org.springframework.beans.factory.annotation.Autowired;
54
import org.springframework.context.annotation.PropertySource;
65
import org.springframework.core.env.Environment;
76
import org.springframework.stereotype.Component;
87

98
import java.io.File;
9+
import java.io.FileWriter;
1010
import java.io.IOException;
1111
import java.net.ServerSocket;
1212
import java.util.ArrayList;
@@ -15,51 +15,35 @@
1515
@PropertySource("classpath:application.properties")
1616
public class GrpcServerScalingController {
1717

18-
@Autowired
19-
private Environment env;
18+
private final Environment env;
2019

21-
@Autowired
22-
private PathProvider pathProvider;
20+
private final PathProvider pathProvider;
2321

24-
private ArrayList<Integer> portList;
22+
private final ArrayList<Integer> portList;
2523

26-
public GrpcServerScalingController() {
24+
public GrpcServerScalingController(Environment env, PathProvider pathProvider) {
2725
this.portList = new ArrayList<>();
26+
this.env = env;
27+
this.pathProvider = pathProvider;
2828
}
2929

3030
public ArrayList<Integer> getPortList() {
3131
return portList;
3232
}
3333

34-
/**
35-
* Returns a free port number on localhost.
36-
* <p>
37-
* Heavily inspired from org.eclipse.jdt.launching.SocketUtil (to avoid a dependency to JDT just because of this).
38-
* Slightly improved with close() missing in JDT. And throws exception instead of returning -1.
39-
*
40-
* @return a free port number on localhost
41-
* @throws IllegalStateException if unable to find a free port
42-
*/
4334
private int findFreePort() {
44-
ServerSocket socket = null;
45-
try {
46-
socket = new ServerSocket(0);
35+
try (ServerSocket socket = new ServerSocket(0)) {
4736
socket.setReuseAddress(true);
4837
int port = socket.getLocalPort();
4938
try {
5039
socket.close();
5140
} catch (IOException e) {
5241
// Ignore IOException on close()
42+
System.out.println("Problem in closing socket connection");
5343
}
5444
return port;
5545
} catch (IOException e) {
56-
} finally {
57-
if (socket != null) {
58-
try {
59-
socket.close();
60-
} catch (IOException e) {
61-
}
62-
}
46+
System.out.println("\"Could not find a free TCP/IP port to start embedded Jetty HTTP Server on\"");
6347
}
6448
throw new IllegalStateException("Could not find a free TCP/IP port to start embedded Jetty HTTP Server on");
6549
}
@@ -71,12 +55,13 @@ private void getFreePortList(int requirements) {
7155
}
7256
}
7357

74-
public void grpcServerScaleUp(boolean isFirstInstance) throws IOException {
58+
public void grpcServerScaleUp(boolean isFirstInstance) {
7559
String envThreshold = env.getProperty("server_threshold");
7660
if (isFirstInstance) {
7761
if (envThreshold != null) {
7862
int requirements = Integer.parseInt(envThreshold);
7963
getFreePortList(requirements);
64+
storePortsLocally(this.portList);
8065
}
8166
int firstInstancePort = portList.get(0);
8267
System.out.println("=================================");
@@ -91,29 +76,58 @@ public void grpcServerScaleUp(boolean isFirstInstance) throws IOException {
9176
}
9277
}
9378

94-
public void spawnGrpcServer(int port) throws IOException {
79+
private void storePortsLocally(ArrayList<Integer> portList) {
80+
String storageDirectory = pathProvider.provideStoragePath();
81+
String pathForPortsFile = storageDirectory + File.separator + "port.txt";
82+
File localPortFile;
83+
FileWriter localPortFileWriter;
84+
try {
85+
localPortFile = new File(pathForPortsFile);
86+
if (!localPortFile.exists()) {
87+
localPortFile.createNewFile();
88+
}
89+
localPortFileWriter = new FileWriter(pathForPortsFile);
90+
StringBuilder p = new StringBuilder();
91+
for (int port : portList) p.append(port).append(" ");
92+
localPortFileWriter.write(p.toString());
93+
localPortFileWriter.flush();
94+
localPortFileWriter.close();
95+
} catch (IOException e) {
96+
e.printStackTrace();
97+
}
98+
}
99+
100+
public void spawnGrpcServer(int port) {
95101
String spawnScriptPath = pathProvider.provideScriptPath() + File.separator + "spawn.py";
96102
String cmd = "python " + spawnScriptPath + " " + port;
97103
System.out.println(cmd);
98104
Runtime run = Runtime.getRuntime();
99-
Process pr = run.exec(cmd);
105+
try {
106+
Process pr = run.exec(cmd);
107+
} catch (IOException e) {
108+
System.out.println("Exception encountered while running python script for spawning gRPC workers");
109+
}
100110
System.out.println("=================================");
101111

102112
}
103113

104-
public void grpcServerScaleDown() throws IOException {
105-
for (int port : this.portList) {
106-
killGrpcServer(port);
107-
System.out.println("Worker gRPC on " + port + " was killed.");
108-
}
114+
public void grpcServerScaleDown() {
115+
killGrpcServer(this.portList);
109116
}
110117

111-
public void killGrpcServer(int port) throws IOException {
118+
public void killGrpcServer(ArrayList<Integer> ports) {
112119
String spawnScriptPath = pathProvider.provideScriptPath() + File.separator + "kill.py";
113-
String cmd = "python " + spawnScriptPath + " " + port;
114-
System.out.println(cmd);
115-
Runtime run = Runtime.getRuntime();
116-
Process pr = run.exec(cmd);
117-
System.out.println("=================================");
120+
for (int port : ports) {
121+
String cmd = "python " + spawnScriptPath + " " + port;
122+
System.out.println(cmd);
123+
Runtime run = Runtime.getRuntime();
124+
try {
125+
Process pr = run.exec(cmd);
126+
} catch (IOException e) {
127+
System.out.println("Exception encountered while running python script for killing gRPC workers");
128+
}
129+
System.out.println("Worker gRPC on " + port + " was killed.");
130+
System.out.println("=================================");
131+
}
118132
}
119133
}

0 commit comments

Comments
 (0)