1515package agent
1616
1717import (
18+ "bytes"
19+ "encoding/json"
1820 "errors"
21+ "fmt"
22+ "io/ioutil"
1923 "sync"
2024 "time"
2125
2226 "github.com/codefresh-io/go/venona/pkg/codefresh"
2327 "github.com/codefresh-io/go/venona/pkg/logger"
2428 "github.com/codefresh-io/go/venona/pkg/runtime"
2529 "github.com/codefresh-io/go/venona/pkg/task"
30+ retryablehttp "github.com/hashicorp/go-retryablehttp"
31+ "github.com/stretchr/objx"
2632)
2733
34+ // internal errors
2835var (
29- errAlreadyRunning = errors .New ("Agent already running" )
30- errAlreadyStopped = errors .New ("Agent already stopped" )
31- errOptionsRequired = errors .New ("Options are required" )
32- errIDRequired = errors .New ("ID options is required" )
33- errRuntimesRequired = errors .New ("Runtimes options is required" )
34- errLoggerRequired = errors .New ("Logger options is required" )
36+ errAlreadyRunning = errors .New ("Agent already running" )
37+ errAlreadyStopped = errors .New ("Agent already stopped" )
38+ errOptionsRequired = errors .New ("Options are required" )
39+ errIDRequired = errors .New ("ID options is required" )
40+ errRuntimesRequired = errors .New ("Runtimes options is required" )
41+ errLoggerRequired = errors .New ("Logger options is required" )
42+ errFailedToParseAgentTask = errors .New ("Failed to parse agent task spec" )
43+ errUknownAgentTaskType = errors .New ("Agent task has unknown type" )
44+ errAgentTaskMalformedParams = errors .New ("failed to marshal agent task params" )
45+ errProxyTaskWithoutURL = errors .New (`url not provided for task of type "proxy"` )
46+ errProxyTaskWithoutToken = errors .New (`token not provided for task of type "proxy"` )
3547)
3648
3749const (
3850 defaultTaskPullingInterval = time .Second * 3
3951 defaultStatusReportingInterval = time .Second * 10
52+ defaultProxyRequestTimeout = time .Second * 10
53+ defaultProxyRequestRetries = 3
4054)
4155
4256type (
@@ -77,6 +91,14 @@ type (
7791 }
7892)
7993
94+ var (
95+ httpClient = retryablehttp .NewClient ()
96+
97+ agentTaskExecutors = map [string ]func (t * task.AgentTask , log logger.Logger ) error {
98+ "proxy" : proxyRequest ,
99+ }
100+ )
101+
80102// New creates a new Agent instance
81103func New (opt * Options ) (* Agent , error ) {
82104 if err := checkOptions (opt ); err != nil {
@@ -210,17 +232,33 @@ func pullTasks(client codefresh.Codefresh, logger logger.Logger) []task.Task {
210232func startTasks (tasks []task.Task , runtimes map [string ]runtime.Runtime , logger logger.Logger ) {
211233 creationTasks := []task.Task {}
212234 deletionTasks := []task.Task {}
235+ agentTasks := []task.Task {}
236+
237+ // divide tasks by types
213238 for _ , t := range tasks {
214- logger .Debug ("Received task" , "type" , t .Type , "workflow" , t .Metadata .Workflow , "runtime" , t .Metadata .ReName )
215- if t .Type == task .TypeCreatePod || t .Type == task .TypeCreatePVC {
239+ logger .Debug ("Received task" , "type" , t .Type , "tid" , t .Metadata .Workflow , "runtime" , t .Metadata .ReName )
240+ switch t .Type {
241+ case task .TypeCreatePod , task .TypeCreatePVC :
216242 creationTasks = append (creationTasks , t )
243+ case task .TypeDeletePod , task .TypeDeletePVC :
244+ deletionTasks = append (deletionTasks , t )
245+ case task .TypeAgentTask :
246+ agentTasks = append (agentTasks , t )
247+ default :
248+ logger .Error ("unrecognized task type" , "type" , t .Type , "tid" , t .Metadata .Workflow , "runtime" , t .Metadata .ReName )
217249 }
250+ }
218251
219- if t .Type == task .TypeDeletePod || t .Type == task .TypeDeletePVC {
220- deletionTasks = append (deletionTasks , t )
252+ // process agent tasks
253+ for i := range agentTasks {
254+ t := agentTasks [i ]
255+ logger .Info ("executing agent task" , "tid" , t .Metadata .Workflow )
256+ if err := executeAgentTask (& t , logger ); err != nil {
257+ logger .Error (err .Error ())
221258 }
222259 }
223260
261+ // process creation tasks
224262 for _ , tasks := range groupTasks (creationTasks ) {
225263 reName := tasks [0 ].Metadata .ReName
226264 runtime , ok := runtimes [reName ]
@@ -233,6 +271,8 @@ func startTasks(tasks []task.Task, runtimes map[string]runtime.Runtime, logger l
233271 logger .Error (err .Error ())
234272 }
235273 }
274+
275+ // process deletion tasks
236276 for _ , tasks := range groupTasks (deletionTasks ) {
237277 reName := tasks [0 ].Metadata .ReName
238278 runtime , ok := runtimes [reName ]
@@ -249,6 +289,71 @@ func startTasks(tasks []task.Task, runtimes map[string]runtime.Runtime, logger l
249289 }
250290}
251291
292+ func executeAgentTask (t * task.Task , log logger.Logger ) error {
293+ specJSON , err := json .Marshal (t .Spec )
294+ if err != nil {
295+ return errFailedToParseAgentTask
296+ }
297+
298+ spec := task.AgentTask {}
299+ if err = json .Unmarshal (specJSON , & spec ); err != nil {
300+ return errFailedToParseAgentTask
301+ }
302+
303+ e , ok := agentTaskExecutors [spec .Type ]
304+ if ! ok {
305+ return errUknownAgentTaskType
306+ }
307+
308+ return e (& spec , log )
309+ }
310+
311+ func proxyRequest (t * task.AgentTask , log logger.Logger ) error {
312+ spec := objx .Map (t .Params )
313+ vars := objx .Map (spec .Get ("runtimeContext.context.variables" ).MSI ())
314+ token := spec .Get ("runtimeContext.context.eventReporting.token" ).Str ()
315+ if token == "" {
316+ return errProxyTaskWithoutToken
317+ }
318+
319+ url := vars .Get ("proxyUrl" ).Str ()
320+ if url == "" {
321+ return errProxyTaskWithoutURL
322+ }
323+
324+ method := vars .Get ("method" ).Str ("POST" )
325+
326+ json , err := json .Marshal (t .Params )
327+ if err != nil {
328+ return errAgentTaskMalformedParams
329+ }
330+ if json == nil {
331+ json = []byte {}
332+ }
333+
334+ req , err := retryablehttp .NewRequest (method , url , bytes .NewReader (json ))
335+ if err != nil {
336+ return err
337+ }
338+
339+ req .Header .Add ("x-req-type" , "workflow-request" )
340+ req .Header .Add ("x-access-token" , token )
341+ req .Header .Add ("Content-Type" , "application/json" )
342+ req .Header .Add ("Content-Length" , fmt .Sprintf ("%v" , len (json )))
343+
344+ log .Info ("executing proxy task" , "url" , url , "method" , method )
345+
346+ resp , err := httpClient .Do (req )
347+ if err != nil {
348+ return err
349+ }
350+ body , _ := ioutil .ReadAll (resp .Body )
351+
352+ log .Info ("finished proxy task" , "url" , url , "method" , method , "status" , resp .Status , "body" , string (body ))
353+
354+ return nil
355+ }
356+
252357func groupTasks (tasks []task.Task ) map [string ][]task.Task {
253358 candidates := map [string ][]task.Task {}
254359 for _ , task := range tasks {
@@ -282,3 +387,8 @@ func checkOptions(opt *Options) error {
282387
283388 return nil
284389}
390+
391+ func init () {
392+ httpClient .RetryMax = defaultProxyRequestRetries
393+ httpClient .HTTPClient .Timeout = defaultProxyRequestTimeout
394+ }
0 commit comments