From cd0b0cb4b1ccebead77055e6618ce36cc873037f Mon Sep 17 00:00:00 2001 From: "muhammad.fahlevi" Date: Tue, 16 Dec 2025 16:01:36 +0700 Subject: [PATCH 1/2] feat(compass): update condition --- plugins/sinks/compass/sink.go | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/plugins/sinks/compass/sink.go b/plugins/sinks/compass/sink.go index 33f71806..a59e1df9 100644 --- a/plugins/sinks/compass/sink.go +++ b/plugins/sinks/compass/sink.go @@ -226,21 +226,27 @@ func (s *Sink) buildLineage(asset *v1beta2.Asset) (upstreams, downstreams []Line return nil, nil } - for _, upstream := range lineage.Upstreams { - upstreams = append(upstreams, LineageRecord{ - URN: upstream.Urn, - Type: upstream.Type, - Service: upstream.Service, - }) + if lineage.GetUpstreams() != nil { + for _, upstream := range lineage.GetUpstreams() { + upstreams = append(upstreams, LineageRecord{ + URN: upstream.Urn, + Type: upstream.Type, + Service: upstream.Service, + }) + } } - for _, downstream := range lineage.Downstreams { - downstreams = append(downstreams, LineageRecord{ - URN: downstream.Urn, - Type: downstream.Type, - Service: downstream.Service, - }) + + if lineage.GetDownstreams() != nil { + for _, downstream := range lineage.GetDownstreams() { + downstreams = append(downstreams, LineageRecord{ + URN: downstream.Urn, + Type: downstream.Type, + Service: downstream.Service, + }) + } } + s.logger.Info("build lineage request", "upstreams", upstreams, "downstreams", downstreams) return upstreams, downstreams } @@ -258,13 +264,13 @@ func (*Sink) buildOwners(asset *v1beta2.Asset) []Owner { } func (s *Sink) buildLabels(asset *v1beta2.Asset) (map[string]string, error) { - total := len(s.config.Labels) + len(asset.Labels) + total := len(s.config.Labels) + len(asset.GetLabels()) if total == 0 { return nil, nil } labels := make(map[string]string, total) - for k, v := range asset.Labels { + for k, v := range asset.GetLabels() { labels[k] = v } From 4552189f26806ae84516bec5ff7564dcaa923ce8 Mon Sep 17 00:00:00 2001 From: "muhammad.fahlevi" Date: Tue, 16 Dec 2025 16:05:30 +0700 Subject: [PATCH 2/2] feat(compass): add proper log for lineage --- plugins/sinks/compass/sink.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/sinks/compass/sink.go b/plugins/sinks/compass/sink.go index a59e1df9..4fcf3ed7 100644 --- a/plugins/sinks/compass/sink.go +++ b/plugins/sinks/compass/sink.go @@ -246,7 +246,7 @@ func (s *Sink) buildLineage(asset *v1beta2.Asset) (upstreams, downstreams []Line } } - s.logger.Info("build lineage request", "upstreams", upstreams, "downstreams", downstreams) + s.logger.Info(fmt.Sprintf("build lineage request for %s", asset.GetUrn()), "upstreams", upstreams, "downstreams", downstreams) return upstreams, downstreams }