Skip to content

Commit

Permalink
scheduler initial
Browse files Browse the repository at this point in the history
  • Loading branch information
dougsillars committed May 10, 2022
1 parent 9bb3dcd commit f5f3425
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 158 deletions.
237 changes: 99 additions & 138 deletions docs/how-tos/sdks/conductor-go/main/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,121 +7,97 @@ To find out more about Conductor visit: [https://github.com/Netflix/conductor](h
## Quick Start

1. [Setup conductor-go package](#Setup-conductor-go-package)
1. [Write worker as a function](#Write-worker-as-a-function)
1. [Run workers](#Run-workers)
1. [Configuration](#Configuration)
2. [Run workers](#Run-workers)
3. [Configuration](#Configuration)

### Setup-conductor-go-package
### Setup conductor go package

Create a folder to build your package:
```shell
$ mkdir conductor-go/
$ cd conductor-go/
mkdir conductor-go/
cd conductor-go/
```

Create a `go.mod` file inside this folder, with this content:
```
Create a go.mod file for dependencies
```go
module conductor_test

go 1.18

require (
github.com/conductor-sdk/conductor-go v1.0.8
github.com/conductor-sdk/conductor-go v1.1.1
)
```

Now you may be able to create your workers and main function.

### Write worker as a function
You can download [this code](examples/task_execute_function/task_execute_function.go) into the repository folder with:
```shell
$ wget "https://github.com/conductor-sdk/conductor-go/blob/main/examples/task_execute_function/task_execute_function.go"
```

### Run workers
You can download [this code](examples/main/main.go) into the repository folder with:
```shell
$ wget "https://github.com/conductor-sdk/conductor-go/blob/main/examples/main/main.go"
```
Now, create simple worker implentation
```go
package main

import (
"github.com/conductor-sdk/conductor-go/pkg/http_model"
"github.com/conductor-sdk/conductor-go/pkg/model"
"github.com/conductor-sdk/conductor-go/pkg/model/enum/task_result_status"
"github.com/conductor-sdk/conductor-go/pkg/settings"
"github.com/conductor-sdk/conductor-go/pkg/worker"
log "github.com/sirupsen/logrus"
"os"
)

### Running Conductor server locally in 2-minute
More details on how to run Conductor see https://netflix.github.io/conductor/server/
func init() {
log.SetFormatter(&log.JSONFormatter{})
log.SetOutput(os.Stdout)
log.SetLevel(log.DebugLevel)
}

func Worker(t *http_model.Task) (taskResult *http_model.TaskResult, err error) {
taskResult = model.GetTaskResultFromTask(t)
taskResult.OutputData = map[string]interface{}{
"task": "task_1",
"key3": 3,
"key4": false,
}
taskResult.Status = task_result_status.COMPLETED
err = nil
return taskResult, err
}

func main() {
taskRunner := worker.NewTaskRunner(
settings.NewAuthenticationSettings(
__KEY__,
__SECRET__,
),
settings.NewHttpSettings(
"https://play.orkes.io",
),
)

taskRunner.StartWorker(
"go_task_example",
Worker,
2,
10,
)

taskRunner.WaitWorkers()
}

Use the script below to download and start the server locally. The server runs in memory and no data saved upon exit.
```shell
export CONDUCTOR_VER=3.5.2
export REPO_URL=https://repo1.maven.org/maven2/com/netflix/conductor/conductor-server
curl $REPO_URL/$CONDUCTOR_VER/conductor-server-$CONDUCTOR_VER-boot.jar \
--output conductor-server-$CONDUCTOR_VER-boot.jar; java -jar conductor-server-$CONDUCTOR_VER-boot.jar
```
### Execute workers
```shell
go ./main.go
```

### Create your first workflow
Now, let's create a new workflow and see your task worker code in execution!

Create a new Task Metadata for the worker you just created

Install dependencies. This will download all the required dependencies
```shell
curl -X 'POST' \
'http://localhost:8080/api/metadata/taskdefs' \
-H 'accept: */*' \
-H 'Content-Type: application/json' \
-d '[{
"name": "go_task_example",
"description": "Go task example",
"retryCount": 3,
"retryLogic": "FIXED",
"retryDelaySeconds": 10,
"timeoutSeconds": 300,
"timeoutPolicy": "TIME_OUT_WF",
"responseTimeoutSeconds": 180,
"ownerEmail": "[email protected]"
}]'
go get
```
**Note:**
Replace `KEY` and `SECRET` by obtaining a new key and secret from Orkes Playground as described [Generating Access Keys for Programmatic Access](https://orkes.io/content/docs/getting-started/concepts/access-control#access-keys)

Create a workflow that uses the task
```shell
curl -X 'POST' \
'http://localhost:8080/api/metadata/workflow' \
-H 'accept: */*' \
-H 'Content-Type: application/json' \
-d '{
"name": "workflow_with_go_task_example",
"description": "Workflow with Go Task example",
"version": 1,
"tasks": [
{
"name": "go_task_example",
"taskReferenceName": "go_task_example_ref_1",
"inputParameters": {},
"type": "SIMPLE"
}
],
"inputParameters": [],
"outputParameters": {
"workerOutput": "${go_task_example_ref_1.output}"
},
"schemaVersion": 2,
"restartable": true,
"ownerEmail": "[email protected]",
"timeoutPolicy": "ALERT_ONLY",
"timeoutSeconds": 0
}'
```

Start a new workflow execution
### Run workers
Start the workers by running `go run`
```shell
curl -X 'POST' \
'http://localhost:8080/api/workflow/workflow_with_go_task_example?priority=0' \
-H 'accept: text/plain' \
-H 'Content-Type: application/json' \
-d '{}'
go run main.go
```


## Configuration

### Authentication settings (optional)
Expand All @@ -131,64 +107,49 @@ Use if your conductor server requires authentication

```go
authenticationSettings := settings.NewAuthenticationSettings(
"keyId",
"keySecret",
),
```

### External Storage Settings (optional)
Use if you would like to upload large payload at an external storage
You may define max payload size and threshold for uploading, also with a function capable of returning the path where it is stored.

```go
externalStorageSettings := settings.NewExternalStorageSettings(
4, // taskOutputPayloadThresholdKB
10, // taskOutputMaxPayloadThresholdKB
external_storage_handler.UploadAndGetPath, // External Storage Handler function
),
```

### HTTP Settings (optional)

* baseUrl: Conductor server address. e.g. http://localhost:8000 if running locally

```go
httpSettings := settings.NewHttpSettings(
"https://play.orkes.io/api",
externalStorageSettings,
"keyId",
"keySecret",
)
```

### Metrics Settings
Conductor uses [Prometheus](https://prometheus.io/) to collect metrics.

* apiEndpoint : Address to serve metrics (e.g. `/metrics`)
* port : Port to serve metrics (e.g. `2112`)

With this configuration, you can access metrics via `http://localhost:2112/metrics` after exposing them with:

```go
metricsSettings := settings.NewMetricsSettings(
"/metrics",
2112,
)

go metrics.ProvideMetrics(metricsSettings)
```

### Worker Settings

You can create a new worker by calling `workerOrkestrator.StartWorker` with:
* taskType : Task definition name (e.g `"go_task_example"`)
* executeFunction : Task Execution Function (e.g. `example.TaskExecuteFunctionExample1` from `example` folder)
* parallelGoRoutinesAmount : Amount of Go routines to be executed in parallel for new worker (e.g. `1`, single thread)
* pollingInterval : Amount of ms to wait between polling for task
* threadCount : Amount of Go routines to be executed in parallel for new worker (e.g. `1`, single thread)
* pollIntervalInMillis : Amount of ms to wait between polling for task

```go
workerOrkestrator.StartWorker(
taskRunner.StartWorker(
"go_task_example", // task definition name
task_execute_function.Example1, // task execution function
1, // parallel go routines amount
5000, // 5000ms
Worker, // task execution function
1, // thread count
1000, // polling interval in milli-seconds
)
```
### Start a workflow using APIs
```go

apiClient := conductor_http_client.NewAPIClient(
settings.NewAuthenticationSettings(
KEY,
SECRET,
),
settings.NewHttpSettings(
"https://play.orkes.io",
),
)

workflowClient := *&conductor_http_client.WorkflowResourceApiService{
APIClient: apiClient,
}
workflowId, _, _ := workflowClient.StartWorkflow(
context.Background(),
map[string]interface{}{},
"PopulationMinMax",
&conductor_http_client.WorkflowResourceApiStartWorkflowOpts{},
)
log.Info("Workflow Id is ", workflowId)

```
2 changes: 1 addition & 1 deletion docs/how-tos/sdks/java-sdk/testing_framework.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ testRunner = new WorkflowTestRunner(8096, "3.5.2");
//Scan the packages for task workers
testRunner.init("com.netflix.conductor.testing.workflows");

//Get the executor instance used for loading workflwos
//Get the executor instance used for loading workflows
executor = testRunner.getWorkflowExecutor();
```

Expand Down
6 changes: 3 additions & 3 deletions docs/how-tos/sdks/java-sdk/worker_sdk.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ Output:
```

## Managing Task Workers
Annotated Workers are managed by [WorkflowExecutor](src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java)
Annotated Workers are managed by [WorkflowExecutor](https://github.com/netflix/conductor/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java)

### Start Workers
```java
WorkflowExecutor executor = new WorkflowExecutor("http://server/api/");
//List of packages (comma separated) to scan for annonated workers.
//List of packages (comma separated) to scan for annotated workers.
// Please note,the worker method MUST be public and the class in which they are defined
//MUST have a no-args constructor
executor.initWorkers("com.company.package1,com.company.package2");
Expand All @@ -95,7 +95,7 @@ executor.shutdown();
Workers implemented with the annotations are regular Java methods can be united tested with any testing framework.

#### Mock workers for workflow testing
Create a mock worker in a different pacakge (e.g. test) and scan for these packages when loading up the workers for integration testing.
Create a mock worker in a different package (e.g. test) and scan for these packages when loading up the workers for integration testing.

See [Unit Testing Framework](testing_framework.md) for more details on testing.

Expand Down
30 changes: 15 additions & 15 deletions docs/how-tos/sdks/java-sdk/workflow_sdk.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ Workflow SDK provides fluent API to create workflows with strongly typed interfa

## APIs
### ConductorWorkflow
[ConductorWorkflow](src/main/java/com/netflix/conductor/sdk/workflow/def/ConductorWorkflow.java) is the SDK representation of a Conductor workflow.
[ConductorWorkflow](https://github.com/Netflix/conductor/blob/main/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/ConductorWorkflow.java) is the SDK representation of a Conductor workflow.

#### Create a `ConductorWorkflow` instance
```java
Expand All @@ -18,7 +18,7 @@ ConductorWorkflow<GetInsuranceQuote> conductorWorkflow = new WorkflowBuilder<Get
.build();
```
### Working with Simple Worker Tasks
Use [SimpleTask](src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/SimpleTask.java) to add simple task to a workflow.
Use [SimpleTask](https://github.com/Netflix/conductor/blob/main/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/SimpleTask.java) to add simple task to a workflow.

Example:
```java
Expand All @@ -43,16 +43,16 @@ builder.add(
### Working with operators
Each of the operator -

[ForkJoin](src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/ForkJoin.java),
[Wait](src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/Wait.java),
[Switch](src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/Switch.java),
[DynamicFork](src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/DynamicFork.java),
[DoWhile](src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/DoWhile.java),
[Join](src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/Join.java),
[Dynamic](src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/Dynamic.java),
[Terminate](src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/Terminate.java),
[SubWorkflow](src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/SubWorkflow.java),
[SetVariable](src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/SetVariable.java),
[ForkJoin](https://github.com/Netflix/conductor/blob/main/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/ForkJoin.java),
[Wait](https://github.com/Netflix/conductor/blob/main/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/Wait.java),
[Switch](https://github.com/Netflix/conductor/blob/main/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/Switch.java),
[DynamicFork](https://github.com/Netflix/conductor/blob/main/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/DynamicFork.java),
[DoWhile](https://github.com/Netflix/conductor/blob/main/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/DoWhile.java),
[Join](https://github.com/Netflix/conductor/blob/main/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/Join.java),
[Dynamic](https://github.com/Netflix/conductor/blob/main/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/Dynamic.java),
[Terminate](https://github.com/Netflix/conductor/blob/main/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/Terminate.java),
[SubWorkflow](https://github.com/Netflix/conductor/blob/main/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/SubWorkflow.java),
[SetVariable](https://github.com/Netflix/conductor/blob/main/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/def/tasks/SetVariable.java),

have their own class that can be added to the workflow builder.

Expand All @@ -63,7 +63,7 @@ have their own class that can be added to the workflow builder.
//Reasons why this method will return false
//1. Network connectivity issue
//2. Workflow already exists with the specified name and version
//3. There are missing task definions
//3. There are missing task definitions
boolean registered = workflow.registerWorkflow();
```
#### Overwrite existing workflow definition
Expand Down Expand Up @@ -102,14 +102,14 @@ String workflowId = workflowRun.getWorkflowId();
//Get the status of workflow execution
WorkflowStatus status = workflowRun.getStatus();
```
See [Workflow](../common/src/main/java/com/netflix/conductor/common/run/Workflow.java) for more details on Workflow object.
See [Workflow](https://github.com/Netflix/conductor/blob/main/common/src/main/java/com/netflix/conductor/common/run/Workflow.java) for more details on Workflow object.

#### Start a dynamic workflow execution
Dynamic workflows are executed by specifying the workflow definition along with the execution and does not require registering the workflow on the server before executing.

##### Use cases for dynamic workflows
1. Each workflow run has a unique workflow definition
2. Workflows are defined based on the user data and cannnot be modeled ahead of time statically
2. Workflows are defined based on the user data and cannot be modeled ahead of time statically

```java
//1. Use WorkflowBuilder to create ConductorWorkflow
Expand Down
Loading

0 comments on commit f5f3425

Please sign in to comment.