Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ services:
START_WEBHOOK_CLEANUP_WORKER: "yes"
START_INTEGRATION_CLEANUP_WORKER: "yes"
START_CANVAS_CLEANUP_WORKER: "yes"
START_NODE_CLEANUP_WORKER: "yes"
WEB_BASE_PATH: ""
SENTRY_DSN: ""
SENTRY_ENVIRONMENT: ${SENTRY_ENVIRONMENT:-development}
Expand Down
41 changes: 41 additions & 0 deletions pkg/models/canvas_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,3 +405,44 @@ func FindNodeQueueItem(workflowID uuid.UUID, queueItemID uuid.UUID) (*CanvasNode

return &queueItem, nil
}

// ListDeletedCanvasNodes returns soft-deleted canvas nodes whose parent canvas
// is NOT soft-deleted. These are nodes removed during workflow updates that
// need their associated resources cleaned up.
func ListDeletedCanvasNodes() ([]CanvasNode, error) {
	var nodes []CanvasNode

	// Unscoped so GORM does not hide soft-deleted rows; we explicitly want
	// deleted node rows that still belong to a live workflow.
	query := database.Conn().
		Unscoped().
		Joins("JOIN workflows ON workflow_nodes.workflow_id = workflows.id").
		Where("workflow_nodes.deleted_at IS NOT NULL").
		Where("workflows.deleted_at IS NULL")

	if err := query.Find(&nodes).Error; err != nil {
		return nil, err
	}

	return nodes, nil
}

// LockDeletedCanvasNode locks a soft-deleted canvas node for processing
// using SELECT FOR UPDATE SKIP LOCKED.
func LockDeletedCanvasNode(tx *gorm.DB, workflowID uuid.UUID, nodeID string) (*CanvasNode, error) {
	node := CanvasNode{}

	// SKIP LOCKED lets concurrent workers pass over rows another transaction
	// already holds instead of blocking on them.
	query := tx.
		Unscoped().
		Clauses(clause.Locking{Strength: "UPDATE", Options: "SKIP LOCKED"}).
		Where("workflow_id = ?", workflowID).
		Where("node_id = ?", nodeID).
		Where("deleted_at IS NOT NULL")

	if err := query.First(&node).Error; err != nil {
		return nil, err
	}

	return &node, nil
}
7 changes: 7 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ func startWorkers(encryptor crypto.Encryptor, registry *registry.Registry, oidcP
w := workers.NewCanvasCleanupWorker()
go w.Start(context.Background())
}

if os.Getenv("START_NODE_CLEANUP_WORKER") == "yes" {
log.Println("Starting Canvas Node Cleanup Worker")

w := workers.NewCanvasNodeCleanupWorker()
go w.Start(context.Background())
}
}

func startEmailConsumers(rabbitMQURL string, encryptor crypto.Encryptor, baseURL string, authService authorization.Authorization) {
Expand Down
37 changes: 37 additions & 0 deletions pkg/telemetry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ var (
workflowCleanupWorkerTickHistogram metric.Float64Histogram
workflowCleanupWorkerCanvasesCountHistogram metric.Int64Histogram

nodeCleanupWorkerTickHistogram metric.Float64Histogram
nodeCleanupWorkerNodesCountHistogram metric.Int64Histogram

dbLocksCountHistogram metric.Int64Histogram
dbLongQueriesCountHistogram metric.Int64Histogram
)
Expand Down Expand Up @@ -141,6 +144,24 @@ func InitMetrics(ctx context.Context) error {
return err
}

nodeCleanupWorkerTickHistogram, err = meter.Float64Histogram(
"node_cleanup_worker.tick.duration.seconds",
metric.WithDescription("Duration of each CanvasNodeCleanupWorker tick"),
metric.WithUnit("s"),
)
if err != nil {
return err
}

nodeCleanupWorkerNodesCountHistogram, err = meter.Int64Histogram(
"node_cleanup_worker.tick.nodes.deleted",
metric.WithDescription("Number of deleted canvas nodes processed each tick"),
metric.WithUnit("1"),
)
if err != nil {
return err
}

dbLocksCountHistogram, err = meter.Int64Histogram(
"db.locks.count",
metric.WithDescription("Number of database locks"),
Expand Down Expand Up @@ -260,6 +281,22 @@ func RecordWorkflowCleanupWorkerCanvasesCount(ctx context.Context, count int) {
workflowCleanupWorkerCanvasesCountHistogram.Record(ctx, int64(count))
}

// RecordNodeCleanupWorkerTickDuration records how long one
// CanvasNodeCleanupWorker tick took, in seconds. No-op until metrics
// have been initialized.
func RecordNodeCleanupWorkerTickDuration(ctx context.Context, d time.Duration) {
	if metricsReady.Load() {
		nodeCleanupWorkerTickHistogram.Record(ctx, d.Seconds())
	}
}

// RecordNodeCleanupWorkerNodesCount records how many deleted canvas nodes
// a single tick picked up for processing. No-op until metrics have been
// initialized.
func RecordNodeCleanupWorkerNodesCount(ctx context.Context, count int) {
	if metricsReady.Load() {
		nodeCleanupWorkerNodesCountHistogram.Record(ctx, int64(count))
	}
}

func RecordDBLocksCount(ctx context.Context, count int64) {
if !metricsReady.Load() {
return
Expand Down
156 changes: 156 additions & 0 deletions pkg/workers/canvas_node_cleanup_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package workers

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/google/uuid"
	"golang.org/x/sync/semaphore"
	"gorm.io/gorm"

	log "github.com/sirupsen/logrus"
	"github.com/superplanehq/superplane/pkg/database"
	"github.com/superplanehq/superplane/pkg/models"
	"github.com/superplanehq/superplane/pkg/telemetry"
)

// CanvasNodeCleanupWorker permanently removes soft-deleted canvas nodes and
// their associated resources (requests, execution KVs, executions, queue
// items, events) for workflows that are themselves still live.
type CanvasNodeCleanupWorker struct {
	// semaphore bounds how many nodes are processed concurrently.
	semaphore *semaphore.Weighted
	logger *log.Entry
	// maxResourcesPerTick caps how many resource rows may be deleted for a
	// single node within one tick's transaction.
	maxResourcesPerTick int
}

// NewCanvasNodeCleanupWorker builds a worker with its default limits:
// at most 25 nodes processed concurrently, and at most 500 resource rows
// deleted per node per tick.
func NewCanvasNodeCleanupWorker() *CanvasNodeCleanupWorker {
	const (
		maxConcurrentNodes = 25
		resourceBudget     = 500
	)

	return &CanvasNodeCleanupWorker{
		semaphore:           semaphore.NewWeighted(maxConcurrentNodes),
		logger:              log.WithFields(log.Fields{"worker": "CanvasNodeCleanupWorker"}),
		maxResourcesPerTick: resourceBudget,
	}
}

// Start runs the cleanup loop until ctx is cancelled. Every 30 seconds it
// lists soft-deleted canvas nodes (whose parent workflow is still live) and
// fans each one out to a goroutine, bounded by the worker's semaphore.
func (w *CanvasNodeCleanupWorker) Start(ctx context.Context) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			w.tick(ctx)
		}
	}
}

// tick performs one cleanup pass: list deleted nodes, dispatch processing,
// and record tick metrics.
func (w *CanvasNodeCleanupWorker) tick(ctx context.Context) {
	tickStart := time.Now()

	nodes, err := models.ListDeletedCanvasNodes()
	if err != nil {
		w.logger.Errorf("Error finding deleted nodes: %v", err)
		return
	}

	telemetry.RecordNodeCleanupWorkerNodesCount(ctx, len(nodes))

	for _, node := range nodes {
		// Acquire with the worker's ctx (not context.Background()) so that a
		// shutdown cannot block indefinitely waiting for a semaphore slot.
		if err := w.semaphore.Acquire(ctx, 1); err != nil {
			if ctx.Err() != nil {
				return // worker is shutting down
			}
			w.logger.Errorf("Error acquiring semaphore: %v", err)
			continue
		}

		go func(node models.CanvasNode) {
			defer w.semaphore.Release(1)

			if err := w.LockAndProcessNode(node); err != nil {
				w.logger.Errorf("Error processing node %s from canvas %s: %v", node.NodeID, node.WorkflowID, err)
			}
		}(node)
	}

	// NOTE: this measures dispatch time for the tick, not completion of the
	// spawned goroutines.
	telemetry.RecordNodeCleanupWorkerTickDuration(ctx, time.Since(tickStart))
}

// LockAndProcessNode opens a transaction, attempts to lock the node row with
// SELECT FOR UPDATE SKIP LOCKED, and processes it if the lock is obtained.
// A missing row (already locked by a concurrent worker, or already cleaned
// up) is not an error; any other database failure is surfaced to the caller.
func (w *CanvasNodeCleanupWorker) LockAndProcessNode(node models.CanvasNode) error {
	return database.Conn().Transaction(func(tx *gorm.DB) error {
		lockedNode, err := models.LockDeletedCanvasNode(tx, node.WorkflowID, node.NodeID)
		if err != nil {
			// SKIP LOCKED makes First() return ErrRecordNotFound when another
			// worker holds the row — expected contention, not a failure.
			if errors.Is(err, gorm.ErrRecordNotFound) {
				w.logger.Infof("Node %s from canvas %s already being processed - skipping", node.NodeID, node.WorkflowID)
				return nil
			}

			// Anything else (connection loss, bad SQL, ...) must not be
			// silently swallowed as "already being processed".
			return fmt.Errorf("locking node %s from canvas %s: %w", node.NodeID, node.WorkflowID, err)
		}

		w.logger.Infof("Processing deleted node %s from canvas %s", lockedNode.NodeID, lockedNode.WorkflowID)
		return w.processNode(tx, *lockedNode)
	})
}

// processNode deletes a node's associated resources (up to the per-tick
// budget) and, once all resources are gone, hard-deletes the node row itself.
// Partially-cleaned nodes are left soft-deleted for a later tick to finish.
func (w *CanvasNodeCleanupWorker) processNode(tx *gorm.DB, node models.CanvasNode) error {
	// Defensive guard: only nodes that are actually soft-deleted are eligible.
	if !node.DeletedAt.Valid {
		w.logger.Infof("Skipping non-deleted node %s from canvas %s", node.NodeID, node.WorkflowID)
		return nil
	}

	deleted, done, err := w.deleteNodeResourcesBatched(tx, node.WorkflowID, node.NodeID, w.maxResourcesPerTick)
	if err != nil {
		return fmt.Errorf("failed to delete resources for node %s: %w", node.NodeID, err)
	}

	// Budget exhausted before every resource table was drained: keep the
	// soft-deleted node row so a later tick resumes where we stopped.
	if !done {
		w.logger.Infof("Partially cleaned node %s from canvas %s (deleted %d resources, more remain)", node.NodeID, node.WorkflowID, deleted)
		return nil
	}

	// All resources are gone — hard-delete the node row itself.
	err = tx.Unscoped().
		Where("workflow_id = ? AND node_id = ?", node.WorkflowID, node.NodeID).
		Delete(&models.CanvasNode{}).
		Error
	if err != nil {
		return fmt.Errorf("failed to delete canvas node %s: %w", node.NodeID, err)
	}

	w.logger.Infof("Successfully cleaned up node %s from canvas %s (deleted %d resources)", node.NodeID, node.WorkflowID, deleted)
	return nil
}

// deleteNodeResourcesBatched hard-deletes a node's dependent rows across the
// resource tables listed below, removing at most maxResources rows in total.
// It returns how many rows were deleted and whether every resource table is
// now fully drained for this node. When allResourcesDeleted is false, the
// caller is expected to keep the node and retry on a later tick.
func (w *CanvasNodeCleanupWorker) deleteNodeResourcesBatched(tx *gorm.DB, workflowID uuid.UUID, nodeID string, maxResources int) (resourcesDeleted int, allResourcesDeleted bool, err error) {
	// Tables are ordered children-first (KVs before executions, etc.) —
	// presumably to respect FK-style dependencies; verify against the schema.
	resourceTypes := []struct {
		model any
		tableName string
	}{
		{&models.CanvasNodeRequest{}, "canvas_node_requests"},
		{&models.CanvasNodeExecutionKV{}, "canvas_node_execution_kvs"},
		{&models.CanvasNodeExecution{}, "canvas_node_executions"},
		{&models.CanvasNodeQueueItem{}, "canvas_node_queue_items"},
		{&models.CanvasEvent{}, "canvas_events"},
	}

	totalDeleted := 0
	allDeleted := true

	for _, resourceType := range resourceTypes {
		// Budget already spent: report not-done so a later tick continues.
		// NOTE(review): if the budget was hit exactly and the remaining
		// tables happen to be empty, this conservatively reports false and
		// defers the final node deletion by one tick — harmless, but worth
		// knowing.
		if totalDeleted >= maxResources {
			allDeleted = false
			break
		}

		remaining := maxResources - totalDeleted

		// Delete in batches with LIMIT, bounded by the remaining budget.
		result := tx.Unscoped().Where("workflow_id = ? AND node_id = ?", workflowID, nodeID).Limit(remaining).Delete(resourceType.model)
		if result.Error != nil {
			return totalDeleted, false, fmt.Errorf("failed to delete %s: %w", resourceType.tableName, result.Error)
		}

		deleted := int(result.RowsAffected)
		totalDeleted += deleted

		// Fewer rows deleted than the limit means this table is drained;
		// move on to the next resource type.
		if deleted != remaining {
			continue
		}

		// We deleted exactly `remaining` rows, so this table may still hold
		// more — count what is left to decide whether we are done with it.
		var count int64

		if err := tx.Unscoped().Model(resourceType.model).Where("workflow_id = ? AND node_id = ?", workflowID, nodeID).Count(&count).Error; err != nil {
			return totalDeleted, false, fmt.Errorf("failed to count remaining %s: %w", resourceType.tableName, err)
		}

		if count > 0 {
			allDeleted = false
			break
		}
	}

	return totalDeleted, allDeleted, nil
}
Loading