@@ -190,7 +190,8 @@ func FlowDispatcher(flowCr interface{}) (*CommonFlow, error) {
190190 },
191191 }
192192 }
193- flow , err := types .NewFlow (matches , f .Name , f .Namespace )
193+ id := fmt .Sprintf ("clusterflow" + ":" + f .Namespace + ":" + f .Name )
194+ flow , err := types .NewFlow (matches , id , f .Name , f .Namespace )
194195 if err != nil {
195196 return nil , err
196197 }
@@ -223,7 +224,8 @@ func FlowDispatcher(flowCr interface{}) (*CommonFlow, error) {
223224 },
224225 }
225226 }
226- flow , err := types .NewFlow (matches , f .Name , f .Namespace )
227+ id := fmt .Sprintf ("flow" + ":" + f .Namespace + ":" + f .Name )
228+ flow , err := types .NewFlow (matches , id , f .Name , f .Namespace )
227229 commonFlow .Flow = flow
228230 if err != nil {
229231 return nil , err
@@ -240,15 +242,22 @@ func (l *LoggingResources) CreateFlowFromCustomResource(flowCr interface{}) (*ty
240242 }
241243 flow := commonFlow .Flow
242244 outputs := []types.Output {}
245+ var flowType string
246+ if commonFlow .Scope != "" {
247+ flowType = "flow"
248+ } else {
249+ flowType = "clusterflow"
250+ }
243251 var multierr error
244252FindOutputForAllRefs:
245253 for _ , outputRef := range commonFlow .OutputRefs {
246254 // only namespaced flows should use namespaced outputs
247255 if commonFlow .Scope != "" {
248256 for _ , output := range l .Outputs {
249257 // only an output from the same namespace can be used with a matching name
258+ // flow -> output (matching)
250259 if output .Namespace == commonFlow .Scope && outputRef == output .Name {
251- outputId := commonFlow . Scope + "_" + commonFlow .Name + "_" + output .Name
260+ outputId := fmt . Sprintf ( "%s:%s:%s:output:%s:%s" , flowType , commonFlow .Namespace , commonFlow . Name , output . Namespace , output .Name )
252261 plugin , err := plugins .CreateOutput (output .Spec , outputId , secret .NewSecretLoader (l .client , output .Namespace , fluentd .OutputSecretPath , l .Secrets ))
253262 if err != nil {
254263 multierr = errors .Combine (multierr , errors .WrapIff (err , "failed to create configured output %s" , outputRef ))
@@ -261,7 +270,9 @@ FindOutputForAllRefs:
261270 }
262271 for _ , clusterOutput := range l .ClusterOutputs {
263272 if outputRef == clusterOutput .Name {
264- outputId := commonFlow .Namespace + "_" + commonFlow .Name + "_" + clusterOutput .Name
273+ // flow, clusterflow -> clusterOutput
274+ // diff flow / clusterflow based on scope
275+ outputId := fmt .Sprintf ("%s:%s:%s:clusteroutput:%s:%s" , flowType , commonFlow .Namespace , commonFlow .Name , clusterOutput .Namespace , clusterOutput .Name )
265276 plugin , err := plugins .CreateOutput (clusterOutput .Spec .OutputSpec , outputId , secret .NewSecretLoader (l .client , clusterOutput .Namespace , fluentd .OutputSecretPath , l .Secrets ))
266277 if err != nil {
267278 multierr = errors .Combine (multierr , errors .WrapIff (err , "failed to create configured output %s" , outputRef ))
@@ -278,7 +289,7 @@ FindOutputForAllRefs:
278289 // Filter
279290 var filters []types.Filter
280291 for i , f := range commonFlow .Filters {
281- id := fmt .Sprintf ("%s_%s_%d" , commonFlow .Namespace , commonFlow .Name , i )
292+ id := fmt .Sprintf ("%s:%s:%s:%d" , flowType , commonFlow .Namespace , commonFlow .Name , i )
282293 filter , err := plugins .CreateFilter (f , id , secret .NewSecretLoader (l .client , commonFlow .Namespace , fluentd .OutputSecretPath , l .Secrets ))
283294 if err != nil {
284295 multierr = errors .Combine (multierr , errors .WrapIff (err , "failed to create filter with index %d for flow %s" , i , commonFlow .Name ))
0 commit comments