diff --git a/docs/components/Semaphore.mdx b/docs/components/Semaphore.mdx index 0f054c97a3..9763a7c5a6 100644 --- a/docs/components/Semaphore.mdx +++ b/docs/components/Semaphore.mdx @@ -15,6 +15,7 @@ import { CardGrid, LinkCard } from "@astrojs/starlight/components"; ## Actions + @@ -127,6 +128,81 @@ This trigger automatically sets up a Semaphore webhook when configured. The webh } ``` + + +## Get Pipeline + +The Get Pipeline component fetches a Semaphore pipeline by ID and returns its current state, result, workflow ID, and metadata. + +### Use Cases + +- **Pipeline status checks**: After Run Workflow starts a pipeline, fetch its status to decide when to proceed or notify +- **Result verification**: Look up the result of a specific pipeline to get full details or confirm state +- **Conditional deployments**: Verify a pipeline passed before triggering a dependent action (e.g. deploy only if build pipeline passed) + +### Configuration + +- **Pipeline ID**: The Semaphore pipeline ID (e.g. from Run Workflow output or On Pipeline Done event data). Accepts expressions. + +### Output + +Returns the pipeline object including: +- **name**: Pipeline name +- **ppl_id**: Pipeline ID +- **wf_id**: Workflow ID +- **state**: Pipeline state (e.g. done, running) +- **result**: Pipeline result (e.g. passed, failed, stopped) +- **result_reason**: Reason for the result +- **created_at**: When the pipeline was created +- **done_at**: When the pipeline finished +- **running_at**: When the pipeline started running +- **yaml_file_name**: Pipeline YAML file name +- **working_directory**: Pipeline working directory + +### Example Output + +```json +{ + "data": { + "created_at": { + "nanos": 0, + "seconds": 1737559967 + }, + "done_at": { + "nanos": 0, + "seconds": 1737559975 + }, + "error_description": "", + "name": "Initial Pipeline", + "pending_at": { + "nanos": 0, + "seconds": 1737559968 + }, + "ppl_id": "00000000-0000-0000-0000-000000000000", + "queuing_at": { + "nanos": 0, + "seconds": 1737559968 + }, + "result": "passed", + "result_reason": "test", + "running_at": { + "nanos": 0, + "seconds": 1737559968 + }, + "state": "done", + "stopping_at": { + "nanos": 0, + "seconds": 0 + }, + "wf_id": "00000000-0000-0000-0000-000000000000", + "working_directory": ".semaphore", + "yaml_file_name": "semaphore.yml" + }, + "timestamp": "2026-01-22T15:32:56.061430218Z", + "type": "semaphore.pipeline" +} +``` + ## Run Workflow diff --git a/pkg/integrations/semaphore/client.go b/pkg/integrations/semaphore/client.go index e36765ec76..80e2337f3d 100644 --- a/pkg/integrations/semaphore/client.go +++ b/pkg/integrations/semaphore/client.go @@ -153,6 +153,27 @@ func (c *Client) GetPipeline(id string) (*Pipeline, error) { return pipelineResponse.Pipeline, nil } +func (c *Client) GetPipelineRaw(id string) (map[string]any, error) { + URL := fmt.Sprintf("%s/api/v1alpha/pipelines/%s", c.OrgURL, id) + responseBody, err := c.execRequest(http.MethodGet, URL, nil) + if err != nil { + return nil, err + } + + var response map[string]any + err = json.Unmarshal(responseBody, &response) + if err != nil { + return nil, fmt.Errorf("error unmarshaling response: %v", err) + } + + pipeline, ok := response["pipeline"].(map[string]any) + if !ok { + return nil, fmt.Errorf("pipeline data missing from response") + } + + return pipeline, nil +} + func (c *Client) ListPipelines(projectID string) ([]any, error) { URL := fmt.Sprintf("%s/api/v1alpha/pipelines?project_id=%s", c.OrgURL, projectID) response, err := c.execRequest(http.MethodGet, URL, nil) diff --git a/pkg/integrations/semaphore/example.go b/pkg/integrations/semaphore/example.go index 46e5df5619..d10e217fdd 100644 --- a/pkg/integrations/semaphore/example.go +++ b/pkg/integrations/semaphore/example.go @@ -13,12 +13,18 @@ var exampleOutputRunWorkflowBytes []byte //go:embed example_data_on_pipeline_done.json var exampleDataOnPipelineDoneBytes []byte +//go:embed example_output_get_pipeline.json +var exampleOutputGetPipelineBytes []byte + var exampleOutputOnce sync.Once var exampleOutput map[string]any var exampleDataOnce sync.Once var exampleData map[string]any +var exampleOutputGetPipelineOnce sync.Once +var exampleOutputGetPipeline map[string]any + func (c *RunWorkflow) ExampleOutput() map[string]any { return utils.UnmarshalEmbeddedJSON(&exampleOutputOnce, exampleOutputRunWorkflowBytes, &exampleOutput) } @@ -26,3 +32,7 @@ func (c *RunWorkflow) ExampleOutput() map[string]any { func (t *OnPipelineDone) ExampleData() map[string]any { return utils.UnmarshalEmbeddedJSON(&exampleDataOnce, exampleDataOnPipelineDoneBytes, &exampleData) } + +func (c *GetPipeline) ExampleOutput() map[string]any { + return utils.UnmarshalEmbeddedJSON(&exampleOutputGetPipelineOnce, exampleOutputGetPipelineBytes, &exampleOutputGetPipeline) +} diff --git a/pkg/integrations/semaphore/example_output_get_pipeline.json b/pkg/integrations/semaphore/example_output_get_pipeline.json new file mode 100644 index 0000000000..2630adc64c --- /dev/null +++ b/pkg/integrations/semaphore/example_output_get_pipeline.json @@ -0,0 +1,39 @@ +{ + "data": { + "created_at": { + "nanos": 0, + "seconds": 1737559967 + }, + "done_at": { + "nanos": 0, + "seconds": 1737559975 + }, + "error_description": "", + "name": "Initial Pipeline", + "pending_at": { + "nanos": 0, + "seconds": 1737559968 + }, + "ppl_id": "00000000-0000-0000-0000-000000000000", + "queuing_at": { + "nanos": 0, + "seconds": 1737559968 + }, + "result": "passed", + "result_reason": "test", + "running_at": { + "nanos": 0, + "seconds": 1737559968 + }, + "state": "done", + "stopping_at": { + "nanos": 0, + "seconds": 0 + }, + "wf_id": "00000000-0000-0000-0000-000000000000", + "working_directory": ".semaphore", + "yaml_file_name": "semaphore.yml" + }, + "timestamp": "2026-01-22T15:32:56.061430218Z", + "type": "semaphore.pipeline" +} diff --git a/pkg/integrations/semaphore/get_pipeline.go b/pkg/integrations/semaphore/get_pipeline.go new file mode 100644 index 0000000000..8482d57133 --- /dev/null +++ b/pkg/integrations/semaphore/get_pipeline.go @@ -0,0 +1,147 @@ +package semaphore + +import ( + "errors" + "fmt" + "net/http" + + "github.com/google/uuid" + "github.com/mitchellh/mapstructure" + "github.com/superplanehq/superplane/pkg/configuration" + "github.com/superplanehq/superplane/pkg/core" +) + +const GetPipelinePayloadType = "semaphore.pipeline" + +type GetPipeline struct{} + +type GetPipelineSpec struct { + PipelineID string `json:"pipelineId" mapstructure:"pipelineId"` +} + +func (c *GetPipeline) Name() string { + return "semaphore.getPipeline" +} + +func (c *GetPipeline) Label() string { + return "Get Pipeline" +} + +func (c *GetPipeline) Description() string { + return "Fetch a Semaphore pipeline by ID" +} + +func (c *GetPipeline) Documentation() string { + return `The Get Pipeline component fetches a Semaphore pipeline by ID and returns its current state, result, workflow ID, and metadata. + +## Use Cases + +- **Pipeline status checks**: After Run Workflow starts a pipeline, fetch its status to decide when to proceed or notify +- **Result verification**: Look up the result of a specific pipeline to get full details or confirm state +- **Conditional deployments**: Verify a pipeline passed before triggering a dependent action (e.g. deploy only if build pipeline passed) + +## Configuration + +- **Pipeline ID**: The Semaphore pipeline ID (e.g. from Run Workflow output or On Pipeline Done event data). Accepts expressions. + +## Output + +Returns the pipeline object including: +- **name**: Pipeline name +- **ppl_id**: Pipeline ID +- **wf_id**: Workflow ID +- **state**: Pipeline state (e.g. done, running) +- **result**: Pipeline result (e.g. passed, failed, stopped) +- **result_reason**: Reason for the result +- **created_at**: When the pipeline was created +- **done_at**: When the pipeline finished +- **running_at**: When the pipeline started running +- **yaml_file_name**: Pipeline YAML file name +- **working_directory**: Pipeline working directory` +} + +func (c *GetPipeline) Icon() string { + return "workflow" +} + +func (c *GetPipeline) Color() string { + return "gray" +} + +func (c *GetPipeline) OutputChannels(configuration any) []core.OutputChannel { + return []core.OutputChannel{core.DefaultOutputChannel} +} + +func (c *GetPipeline) Configuration() []configuration.Field { + return []configuration.Field{ + { + Name: "pipelineId", + Label: "Pipeline ID", + Type: configuration.FieldTypeString, + Required: true, + Description: "The Semaphore pipeline ID to fetch", + }, + } +} + +func (c *GetPipeline) Setup(ctx core.SetupContext) error { + spec := GetPipelineSpec{} + err := mapstructure.Decode(ctx.Configuration, &spec) + if err != nil { + return fmt.Errorf("failed to decode configuration: %w", err) + } + + if spec.PipelineID == "" { + return errors.New("pipelineId is required") + } + + return nil +} + +func (c *GetPipeline) Execute(ctx core.ExecutionContext) error { + spec := GetPipelineSpec{} + err := mapstructure.Decode(ctx.Configuration, &spec) + if err != nil { + return fmt.Errorf("error decoding configuration: %v", err) + } + + client, err := NewClient(ctx.HTTP, ctx.Integration) + if err != nil { + return fmt.Errorf("error creating client: %v", err) + } + + pipeline, err := client.GetPipelineRaw(spec.PipelineID) + if err != nil { + return fmt.Errorf("failed to get pipeline: %v", err) + } + + return ctx.ExecutionState.Emit( + core.DefaultOutputChannel.Name, + GetPipelinePayloadType, + []any{pipeline}, + ) +} + +func (c *GetPipeline) Cancel(ctx core.ExecutionContext) error { + return nil +} + +func (c *GetPipeline) ProcessQueueItem(ctx core.ProcessQueueContext) (*uuid.UUID, error) { + return ctx.DefaultProcessing() +} + +func (c *GetPipeline) Actions() []core.Action { + return []core.Action{} +} + +func (c *GetPipeline) HandleAction(ctx core.ActionContext) error { + return nil +} + +func (c *GetPipeline) HandleWebhook(ctx core.WebhookRequestContext) (int, error) { + return http.StatusOK, nil +} + +func (c *GetPipeline) Cleanup(ctx core.SetupContext) error { + return nil +} diff --git a/pkg/integrations/semaphore/get_pipeline_test.go b/pkg/integrations/semaphore/get_pipeline_test.go new file mode 100644 index 0000000000..51fd1ade52 --- /dev/null +++ b/pkg/integrations/semaphore/get_pipeline_test.go @@ -0,0 +1,145 @@ +package semaphore + +import ( + "io" + "net/http" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/superplanehq/superplane/pkg/core" + "github.com/superplanehq/superplane/test/support/contexts" +) + +func Test__GetPipeline__Setup(t *testing.T) { + component := &GetPipeline{} + + t.Run("pipelineId is required", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Integration: &contexts.IntegrationContext{}, + Metadata: &contexts.MetadataContext{}, + Configuration: GetPipelineSpec{PipelineID: ""}, + }) + + require.ErrorContains(t, err, "pipelineId is required") + }) + + t.Run("valid configuration", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Integration: &contexts.IntegrationContext{}, + Metadata: &contexts.MetadataContext{}, + Configuration: GetPipelineSpec{PipelineID: "pipeline-123"}, + }) + + require.NoError(t, err) + }) + + t.Run("invalid configuration -> decode error", func(t *testing.T) { + err := component.Setup(core.SetupContext{ + Integration: &contexts.IntegrationContext{}, + Metadata: &contexts.MetadataContext{}, + Configuration: "invalid-config", + }) + + require.ErrorContains(t, err, "failed to decode configuration") + }) +} + +func Test__GetPipeline__Execute(t *testing.T) { + component := &GetPipeline{} + + t.Run("successful pipeline fetch", func(t *testing.T) { + responseBody := `{ + "pipeline": { + "ppl_id": "pipeline-123", + "name": "Build Pipeline", + "wf_id": "workflow-456", + "state": "done", + "result": "passed", + "result_reason": "test", + "yaml_file_name": "semaphore.yml", + "working_directory": ".semaphore" + } + }` + + httpContext := &contexts.HTTPContext{ + Responses: []*http.Response{ + { + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader(responseBody)), + }, + }, + } + + integrationCtx := &contexts.IntegrationContext{ + Configuration: map[string]any{ + "organizationUrl": "https://example.semaphoreci.com", + "apiToken": "token-123", + }, + } + + executionState := &contexts.ExecutionStateContext{ + KVs: map[string]string{}, + } + + err := component.Execute(core.ExecutionContext{ + HTTP: httpContext, + Integration: integrationCtx, + Configuration: GetPipelineSpec{PipelineID: "pipeline-123"}, + ExecutionState: executionState, + }) + + require.NoError(t, err) + assert.True(t, executionState.Finished) + assert.True(t, executionState.Passed) + assert.Equal(t, core.DefaultOutputChannel.Name, executionState.Channel) + assert.Equal(t, GetPipelinePayloadType, executionState.Type) + require.Len(t, executionState.Payloads, 1) + + // The mock wraps payloads as {type, timestamp, data} + wrapped := executionState.Payloads[0].(map[string]any) + pipelineData := wrapped["data"].(map[string]any) + assert.Equal(t, "pipeline-123", pipelineData["ppl_id"]) + assert.Equal(t, "Build Pipeline", pipelineData["name"]) + assert.Equal(t, "workflow-456", pipelineData["wf_id"]) + assert.Equal(t, "done", pipelineData["state"]) + assert.Equal(t, "passed", pipelineData["result"]) + + require.Len(t, httpContext.Requests, 1) + assert.Equal(t, "https://example.semaphoreci.com/api/v1alpha/pipelines/pipeline-123", httpContext.Requests[0].URL.String()) + }) + + t.Run("API error", func(t *testing.T) { + httpContext := &contexts.HTTPContext{ + Responses: []*http.Response{ + { + StatusCode: http.StatusNotFound, + Body: io.NopCloser(strings.NewReader("not found")), + }, + }, + } + + integrationCtx := &contexts.IntegrationContext{ + Configuration: map[string]any{ + "organizationUrl": "https://example.semaphoreci.com", + "apiToken": "token-123", + }, + } + + executionState := &contexts.ExecutionStateContext{ + KVs: map[string]string{}, + } + + err := component.Execute(core.ExecutionContext{ + HTTP: httpContext, + Integration: integrationCtx, + Configuration: GetPipelineSpec{PipelineID: "invalid-id"}, + ExecutionState: executionState, + }) + + require.Error(t, err) + assert.ErrorContains(t, err, "failed to get pipeline") + assert.False(t, executionState.Finished) + }) +} diff --git a/pkg/integrations/semaphore/semaphore.go b/pkg/integrations/semaphore/semaphore.go index e08bb78dd2..b473326c32 100644 --- a/pkg/integrations/semaphore/semaphore.go +++ b/pkg/integrations/semaphore/semaphore.go @@ -115,6 +115,7 @@ func (s *Semaphore) HandleAction(ctx core.IntegrationActionContext) error { func (s *Semaphore) Components() []core.Component { return []core.Component{ &RunWorkflow{}, + &GetPipeline{}, } } diff --git a/web_src/src/pages/workflowv2/mappers/semaphore/get_pipeline.ts b/web_src/src/pages/workflowv2/mappers/semaphore/get_pipeline.ts new file mode 100644 index 0000000000..dcb4991421 --- /dev/null +++ b/web_src/src/pages/workflowv2/mappers/semaphore/get_pipeline.ts @@ -0,0 +1,149 @@ +import { ComponentBaseProps, EventSection } from "@/ui/componentBase"; +import { getBackgroundColorClass } from "@/utils/colors"; +import { getState, getStateMap, getTriggerRenderer } from ".."; +import { + ComponentBaseContext, + ComponentBaseMapper, + ExecutionDetailsContext, + ExecutionInfo, + NodeInfo, + OutputPayload, + SubtitleContext, +} from "../types"; +import { MetadataItem } from "@/ui/metadataList"; +import { stringOrDash } from "../utils"; +import SemaphoreLogo from "@/assets/semaphore-logo-sign-black.svg"; +import { formatTimeAgo } from "@/utils/date"; + +interface PipelineData { + name?: string; + ppl_id?: string; + wf_id?: string; + state?: string; + result?: string; + result_reason?: string; + created_at?: { seconds?: number; nanos?: number } | string; + done_at?: { seconds?: number; nanos?: number } | string; + running_at?: { seconds?: number; nanos?: number } | string; + pending_at?: { seconds?: number; nanos?: number } | string; + queuing_at?: { seconds?: number; nanos?: number } | string; + stopping_at?: { seconds?: number; nanos?: number } | string; + yaml_file_name?: string; + working_directory?: string; + error_description?: string; +} + +function formatTimestamp(value?: { seconds?: number; nanos?: number } | string): string | undefined { + if (!value) return undefined; + + if (typeof value === "string") { + const date = new Date(value); + if (Number.isNaN(date.getTime())) return undefined; + return date.toLocaleString(); + } + + if (typeof value === "object" && value.seconds) { + const date = new Date(value.seconds * 1000); + if (Number.isNaN(date.getTime())) return undefined; + return date.toLocaleString(); + } + + return undefined; +} + +export const getPipelineMapper: ComponentBaseMapper = { + props(context: ComponentBaseContext): ComponentBaseProps { + const lastExecution = context.lastExecutions.length > 0 ? context.lastExecutions[0] : null; + const componentName = context.componentDefinition.name || "unknown"; + + return { + iconSrc: SemaphoreLogo, + collapsedBackground: getBackgroundColorClass(context.componentDefinition.color), + collapsed: context.node.isCollapsed, + title: + context.node.name || + context.componentDefinition.label || + context.componentDefinition.name || + "Unnamed component", + eventSections: lastExecution ? getPipelineEventSections(context.nodes, lastExecution, componentName) : undefined, + metadata: getPipelineMetadata(context.node), + includeEmptyState: !lastExecution, + eventStateMap: getStateMap(componentName), + }; + }, + + getExecutionDetails(context: ExecutionDetailsContext): Record { + const details: Record = {}; + const outputs = context.execution.outputs as { default?: OutputPayload[] } | undefined; + + if (!outputs?.default || outputs.default.length === 0) { + return details; + } + + const pipeline = outputs.default[0].data as PipelineData; + if (!pipeline || typeof pipeline !== "object") { + return details; + } + + details["Pipeline Name"] = stringOrDash(pipeline.name); + details["Pipeline ID"] = stringOrDash(pipeline.ppl_id); + details["Workflow ID"] = stringOrDash(pipeline.wf_id); + details["State"] = stringOrDash(pipeline.state); + details["Result"] = stringOrDash(pipeline.result); + + const doneAt = formatTimestamp(pipeline.done_at); + if (doneAt) { + details["Done At"] = doneAt; + } + + const createdAt = formatTimestamp(pipeline.created_at); + if (createdAt) { + details["Created At"] = createdAt; + } + + if (pipeline.yaml_file_name) { + const pipelineFile = pipeline.working_directory + ? `${pipeline.working_directory}/${pipeline.yaml_file_name}`.replace("//", "/") + : pipeline.yaml_file_name; + details["Pipeline File"] = pipelineFile; + } + + if (pipeline.error_description) { + details["Error"] = pipeline.error_description; + } + + return details; + }, + + subtitle(context: SubtitleContext): string { + if (!context.execution.createdAt) return ""; + return formatTimeAgo(new Date(context.execution.createdAt)); + }, +}; + +function getPipelineMetadata(node: NodeInfo): MetadataItem[] { + const metadata: MetadataItem[] = []; + const configuration = node.configuration as { pipelineId?: string }; + + if (configuration?.pipelineId) { + metadata.push({ icon: "git-branch", label: "Pipeline: " + configuration.pipelineId }); + } + + return metadata; +} + +function getPipelineEventSections(nodes: NodeInfo[], execution: ExecutionInfo, componentName: string): EventSection[] { + const rootTriggerNode = nodes.find((n) => n.id === execution.rootEvent?.nodeId); + const rootTriggerRenderer = getTriggerRenderer(rootTriggerNode?.componentName!); + const { title } = rootTriggerRenderer.getTitleAndSubtitle({ event: execution.rootEvent }); + + return [ + { + receivedAt: new Date(execution.createdAt!), + eventTitle: title, + eventSubtitle: formatTimeAgo(new Date(execution.createdAt!)), + eventState: getState(componentName)(execution), + eventId: execution.rootEvent!.id!, + }, + ]; +} diff --git a/web_src/src/pages/workflowv2/mappers/semaphore/index.ts b/web_src/src/pages/workflowv2/mappers/semaphore/index.ts index 883ee973b0..cb1a0d8fbc 100644 --- a/web_src/src/pages/workflowv2/mappers/semaphore/index.ts +++ b/web_src/src/pages/workflowv2/mappers/semaphore/index.ts @@ -1,9 +1,12 @@ import { ComponentBaseMapper, EventStateRegistry, TriggerRenderer } from "../types"; import { onPipelineDoneTriggerRenderer } from "./on_pipeline_done"; import { RUN_WORKFLOW_STATE_REGISTRY, runWorkflowMapper } from "./run_workflow"; +import { getPipelineMapper } from "./get_pipeline"; +import { buildActionStateRegistry } from "../utils"; export const componentMappers: Record = { runWorkflow: runWorkflowMapper, + getPipeline: getPipelineMapper, }; export const triggerRenderers: Record = { @@ -12,4 +15,5 @@ export const triggerRenderers: Record = { export const eventStateRegistry: Record = { runWorkflow: RUN_WORKFLOW_STATE_REGISTRY, + getPipeline: buildActionStateRegistry("retrieved"), };