diff --git a/plugins/sinks/compass/sink.go b/plugins/sinks/compass/sink.go index 33f71806..4fcf3ed7 100644 --- a/plugins/sinks/compass/sink.go +++ b/plugins/sinks/compass/sink.go @@ -226,21 +226,27 @@ func (s *Sink) buildLineage(asset *v1beta2.Asset) (upstreams, downstreams []Line return nil, nil } - for _, upstream := range lineage.Upstreams { - upstreams = append(upstreams, LineageRecord{ - URN: upstream.Urn, - Type: upstream.Type, - Service: upstream.Service, - }) + if lineage.GetUpstreams() != nil { + for _, upstream := range lineage.GetUpstreams() { + upstreams = append(upstreams, LineageRecord{ + URN: upstream.Urn, + Type: upstream.Type, + Service: upstream.Service, + }) + } } - for _, downstream := range lineage.Downstreams { - downstreams = append(downstreams, LineageRecord{ - URN: downstream.Urn, - Type: downstream.Type, - Service: downstream.Service, - }) + + if lineage.GetDownstreams() != nil { + for _, downstream := range lineage.GetDownstreams() { + downstreams = append(downstreams, LineageRecord{ + URN: downstream.Urn, + Type: downstream.Type, + Service: downstream.Service, + }) + } } + s.logger.Info(fmt.Sprintf("build lineage request for %s", asset.GetUrn()), "upstreams", upstreams, "downstreams", downstreams) return upstreams, downstreams } @@ -258,13 +264,13 @@ func (*Sink) buildOwners(asset *v1beta2.Asset) []Owner { } func (s *Sink) buildLabels(asset *v1beta2.Asset) (map[string]string, error) { - total := len(s.config.Labels) + len(asset.Labels) + total := len(s.config.Labels) + len(asset.GetLabels()) if total == 0 { return nil, nil } labels := make(map[string]string, total) - for k, v := range asset.Labels { + for k, v := range asset.GetLabels() { labels[k] = v }