Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ services:
START_WEBHOOK_CLEANUP_WORKER: "yes"
START_INTEGRATION_CLEANUP_WORKER: "yes"
START_CANVAS_CLEANUP_WORKER: "yes"
START_NODE_CLEANUP_WORKER: "yes"
WEB_BASE_PATH: ""
SENTRY_DSN: ""
SENTRY_ENVIRONMENT: ${SENTRY_ENVIRONMENT:-development}
Expand Down
41 changes: 41 additions & 0 deletions pkg/models/canvas_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,3 +405,44 @@ func FindNodeQueueItem(workflowID uuid.UUID, queueItemID uuid.UUID) (*CanvasNode

return &queueItem, nil
}

// ListDeletedCanvasNodes returns soft-deleted canvas nodes whose parent canvas
// is NOT soft-deleted. These are nodes removed during workflow updates that
// need their associated resources cleaned up.
func ListDeletedCanvasNodes() ([]CanvasNode, error) {
	var nodes []CanvasNode

	// Unscoped() disables GORM's default soft-delete filter so rows with
	// deleted_at set are visible. Select only workflow_nodes columns: both
	// joined tables carry id/created_at/deleted_at columns, and an
	// unqualified SELECT * over the join can scan the wrong table's values
	// into the model.
	err := database.Conn().
		Unscoped().
		Select("workflow_nodes.*").
		Joins("JOIN workflows ON workflow_nodes.workflow_id = workflows.id").
		Where("workflow_nodes.deleted_at IS NOT NULL").
		Where("workflows.deleted_at IS NULL").
		Find(&nodes).
		Error
	if err != nil {
		return nil, err
	}

	return nodes, nil
}

// LockDeletedCanvasNode locks a soft-deleted canvas node for processing
// using SELECT FOR UPDATE SKIP LOCKED. A row already locked by another
// worker is skipped, surfacing as gorm.ErrRecordNotFound to the caller.
func LockDeletedCanvasNode(tx *gorm.DB, workflowID uuid.UUID, nodeID string) (*CanvasNode, error) {
	node := CanvasNode{}

	// Unscoped so the soft-deleted row is visible; SKIP LOCKED so
	// concurrent workers never block on each other.
	query := tx.Unscoped().
		Clauses(clause.Locking{Strength: "UPDATE", Options: "SKIP LOCKED"}).
		Where("workflow_id = ? AND node_id = ? AND deleted_at IS NOT NULL", workflowID, nodeID)

	if err := query.First(&node).Error; err != nil {
		return nil, err
	}

	return &node, nil
}
7 changes: 7 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ func startWorkers(encryptor crypto.Encryptor, registry *registry.Registry, oidcP
w := workers.NewCanvasCleanupWorker()
go w.Start(context.Background())
}

if os.Getenv("START_NODE_CLEANUP_WORKER") == "yes" {
log.Println("Starting Canvas Node Cleanup Worker")

w := workers.NewCanvasNodeCleanupWorker()
go w.Start(context.Background())
}
}

func startEmailConsumers(rabbitMQURL string, encryptor crypto.Encryptor, baseURL string, authService authorization.Authorization) {
Expand Down
37 changes: 37 additions & 0 deletions pkg/telemetry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ var (
workflowCleanupWorkerTickHistogram metric.Float64Histogram
workflowCleanupWorkerCanvasesCountHistogram metric.Int64Histogram

nodeCleanupWorkerTickHistogram metric.Float64Histogram
nodeCleanupWorkerNodesCountHistogram metric.Int64Histogram

dbLocksCountHistogram metric.Int64Histogram
dbLongQueriesCountHistogram metric.Int64Histogram
)
Expand Down Expand Up @@ -141,6 +144,24 @@ func InitMetrics(ctx context.Context) error {
return err
}

nodeCleanupWorkerTickHistogram, err = meter.Float64Histogram(
"node_cleanup_worker.tick.duration.seconds",
metric.WithDescription("Duration of each CanvasNodeCleanupWorker tick"),
metric.WithUnit("s"),
)
if err != nil {
return err
}

nodeCleanupWorkerNodesCountHistogram, err = meter.Int64Histogram(
"node_cleanup_worker.tick.nodes.deleted",
metric.WithDescription("Number of deleted canvas nodes processed each tick"),
metric.WithUnit("1"),
)
if err != nil {
return err
}

dbLocksCountHistogram, err = meter.Int64Histogram(
"db.locks.count",
metric.WithDescription("Number of database locks"),
Expand Down Expand Up @@ -260,6 +281,22 @@ func RecordWorkflowCleanupWorkerCanvasesCount(ctx context.Context, count int) {
workflowCleanupWorkerCanvasesCountHistogram.Record(ctx, int64(count))
}

// RecordNodeCleanupWorkerTickDuration records how long one
// CanvasNodeCleanupWorker tick took. No-op until metrics are initialized.
func RecordNodeCleanupWorkerTickDuration(ctx context.Context, d time.Duration) {
	if metricsReady.Load() {
		nodeCleanupWorkerTickHistogram.Record(ctx, d.Seconds())
	}
}

// RecordNodeCleanupWorkerNodesCount records how many deleted canvas nodes a
// CanvasNodeCleanupWorker tick picked up. No-op until metrics are initialized.
func RecordNodeCleanupWorkerNodesCount(ctx context.Context, count int) {
	if metricsReady.Load() {
		nodeCleanupWorkerNodesCountHistogram.Record(ctx, int64(count))
	}
}

func RecordDBLocksCount(ctx context.Context, count int64) {
if !metricsReady.Load() {
return
Expand Down
54 changes: 1 addition & 53 deletions pkg/workers/canvas_cleanup_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"time"

"github.com/google/uuid"
"golang.org/x/sync/semaphore"
"gorm.io/gorm"

Expand Down Expand Up @@ -101,7 +100,7 @@ func (w *CanvasCleanupWorker) processCanvas(tx *gorm.DB, canvas models.Canvas) e
break
}

resourcesDeleted, allResourcesDeleted, err := w.deleteNodeResourcesBatched(tx, canvas.ID, node.NodeID, w.maxResourcesPerTick-totalResourcesDeleted)
resourcesDeleted, allResourcesDeleted, err := deleteNodeResourcesBatched(tx, canvas.ID, node.NodeID, w.maxResourcesPerTick-totalResourcesDeleted)
if err != nil {
return fmt.Errorf("failed to delete resources for node %s: %w", node.NodeID, err)
}
Expand Down Expand Up @@ -145,54 +144,3 @@ func (w *CanvasCleanupWorker) processCanvas(tx *gorm.DB, canvas models.Canvas) e
w.logger.Infof("Successfully cleaned up canvas %s (deleted %d resources total)", canvas.ID, totalResourcesDeleted)
return nil
}

// deleteNodeResourcesBatched hard-deletes the resources attached to one canvas
// node (requests, execution KVs, executions, queue items, events), capped at
// maxResources rows for this call so a single node cannot monopolize a tick.
// It returns how many rows were deleted and whether every resource is gone.
func (w *CanvasCleanupWorker) deleteNodeResourcesBatched(tx *gorm.DB, workflowID uuid.UUID, nodeID string, maxResources int) (resourcesDeleted int, allResourcesDeleted bool, err error) {
	// Tables are processed in dependency order; tableName is only used in
	// error messages.
	resourceTypes := []struct {
		model     interface{}
		tableName string
	}{
		{&models.CanvasNodeRequest{}, "canvas_node_requests"},
		{&models.CanvasNodeExecutionKV{}, "canvas_node_execution_kvs"},
		{&models.CanvasNodeExecution{}, "canvas_node_executions"},
		{&models.CanvasNodeQueueItem{}, "canvas_node_queue_items"},
		{&models.CanvasEvent{}, "canvas_events"},
	}

	totalDeleted := 0
	allDeleted := true

	for _, resourceType := range resourceTypes {
		// Budget exhausted before reaching this table: more work remains.
		if totalDeleted >= maxResources {
			allDeleted = false
			break
		}

		remaining := maxResources - totalDeleted

		// Delete in batches with LIMIT. Unscoped makes this a hard DELETE
		// rather than a soft delete.
		result := tx.Unscoped().Where("workflow_id = ? AND node_id = ?", workflowID, nodeID).Limit(remaining).Delete(resourceType.model)
		if result.Error != nil {
			return totalDeleted, false, fmt.Errorf("failed to delete %s: %w", resourceType.tableName, result.Error)
		}

		deleted := int(result.RowsAffected)
		totalDeleted += deleted

		// Fewer rows deleted than the limit means this table is now empty;
		// move on to the next resource type.
		if deleted != remaining {
			continue
		}

		// The delete hit the LIMIT exactly, so the table may or may not have
		// more rows — count to find out.
		var count int64

		if err := tx.Unscoped().Model(resourceType.model).Where("workflow_id = ? AND node_id = ?", workflowID, nodeID).Count(&count).Error; err != nil {
			return totalDeleted, false, fmt.Errorf("failed to count remaining %s: %w", resourceType.tableName, err)
		}

		if count > 0 {
			allDeleted = false
			break
		}
	}

	return totalDeleted, allDeleted, nil
}
104 changes: 104 additions & 0 deletions pkg/workers/canvas_node_cleanup_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package workers

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
	"gorm.io/gorm"

	log "github.com/sirupsen/logrus"
	"github.com/superplanehq/superplane/pkg/database"
	"github.com/superplanehq/superplane/pkg/models"
	"github.com/superplanehq/superplane/pkg/telemetry"
)

// CanvasNodeCleanupWorker periodically removes the resources left behind by
// soft-deleted canvas nodes, then hard-deletes the node rows themselves.
type CanvasNodeCleanupWorker struct {
	// semaphore bounds how many nodes are processed concurrently per tick.
	semaphore *semaphore.Weighted
	// logger is pre-tagged with the worker name for structured log output.
	logger *log.Entry
	// maxResourcesPerTick caps resource-row deletions per node per tick so
	// one node cannot monopolize the database.
	maxResourcesPerTick int
}

// NewCanvasNodeCleanupWorker builds a worker with its default limits: up to
// 25 nodes processed in parallel and a 500-row resource deletion budget per
// node per tick.
func NewCanvasNodeCleanupWorker() *CanvasNodeCleanupWorker {
	const (
		maxConcurrentNodes  = 25
		resourceBudget      = 500
	)

	return &CanvasNodeCleanupWorker{
		semaphore:           semaphore.NewWeighted(maxConcurrentNodes),
		logger:              log.WithFields(log.Fields{"worker": "CanvasNodeCleanupWorker"}),
		maxResourcesPerTick: resourceBudget,
	}
}

// Start runs the cleanup loop until ctx is cancelled. Every 30 seconds it
// lists soft-deleted canvas nodes and dispatches them to goroutines bounded
// by the worker's semaphore.
func (w *CanvasNodeCleanupWorker) Start(ctx context.Context) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			tickStart := time.Now()

			nodes, err := models.ListDeletedCanvasNodes()
			if err != nil {
				w.logger.Errorf("Error finding deleted nodes: %v", err)
				continue
			}

			telemetry.RecordNodeCleanupWorkerNodesCount(ctx, len(nodes))

			for _, node := range nodes {
				// Acquire with the worker's ctx so shutdown interrupts the
				// wait; context.Background() here would block forever once
				// all permits are held during a shutdown.
				if err := w.semaphore.Acquire(ctx, 1); err != nil {
					// Acquire only fails when ctx is done — stop dispatching
					// instead of spinning through the remaining nodes.
					w.logger.Errorf("Error acquiring semaphore: %v", err)
					break
				}

				go func(node models.CanvasNode) {
					defer w.semaphore.Release(1)

					if err := w.LockAndProcessNode(node); err != nil {
						w.logger.Errorf("Error processing node %s from canvas %s: %v", node.NodeID, node.WorkflowID, err)
					}
				}(node)
			}

			telemetry.RecordNodeCleanupWorkerTickDuration(ctx, time.Since(tickStart))
		}
	}
}

// LockAndProcessNode opens a transaction, takes a row lock on the deleted
// node (FOR UPDATE SKIP LOCKED, so concurrent workers skip each other), and
// cleans up its resources. It returns nil when another worker already holds
// the lock or the node row is gone, and the underlying error for genuine
// database failures — the original code swallowed those as "already being
// processed".
func (w *CanvasNodeCleanupWorker) LockAndProcessNode(node models.CanvasNode) error {
	return database.Conn().Transaction(func(tx *gorm.DB) error {
		lockedNode, err := models.LockDeletedCanvasNode(tx, node.WorkflowID, node.NodeID)
		if err != nil {
			// SKIP LOCKED surfaces a locked (or already-removed) row as
			// ErrRecordNotFound; that just means another worker got here first.
			if errors.Is(err, gorm.ErrRecordNotFound) {
				w.logger.Infof("Node %s from canvas %s already being processed - skipping", node.NodeID, node.WorkflowID)
				return nil
			}
			return fmt.Errorf("locking node %s from canvas %s: %w", node.NodeID, node.WorkflowID, err)
		}

		w.logger.Infof("Processing deleted node %s from canvas %s", lockedNode.NodeID, lockedNode.WorkflowID)
		return w.processNode(tx, *lockedNode)
	})
}

// processNode deletes the resources belonging to a soft-deleted node and,
// once everything is gone, hard-deletes the node row itself. Resource
// deletion is budgeted per tick, so a node with many resources is cleaned
// up incrementally across ticks.
func (w *CanvasNodeCleanupWorker) processNode(tx *gorm.DB, node models.CanvasNode) error {
	// Guard: only nodes that are actually soft-deleted get cleaned up.
	if !node.DeletedAt.Valid {
		w.logger.Infof("Skipping non-deleted node %s from canvas %s", node.NodeID, node.WorkflowID)
		return nil
	}

	deleted, done, err := deleteNodeResourcesBatched(tx, node.WorkflowID, node.NodeID, w.maxResourcesPerTick)
	if err != nil {
		return fmt.Errorf("failed to delete resources for node %s: %w", node.NodeID, err)
	}

	// Budget exhausted — leave the node row in place; a later tick finishes.
	if !done {
		w.logger.Infof("Partially cleaned node %s from canvas %s (deleted %d resources, more remain)", node.NodeID, node.WorkflowID, deleted)
		return nil
	}

	// All resources are gone: hard-delete the node row (Unscoped bypasses
	// the soft-delete so the row is physically removed).
	err = tx.Unscoped().
		Where("workflow_id = ? AND node_id = ?", node.WorkflowID, node.NodeID).
		Delete(&models.CanvasNode{}).
		Error
	if err != nil {
		return fmt.Errorf("failed to delete canvas node %s: %w", node.NodeID, err)
	}

	w.logger.Infof("Successfully cleaned up node %s from canvas %s (deleted %d resources)", node.NodeID, node.WorkflowID, deleted)
	return nil
}
Loading