Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 38 additions & 6 deletions operator/internal/manifests/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"path"

"github.com/ViaQ/logerr/v2/kverrors"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
Expand All @@ -14,6 +15,7 @@ import (
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

lokiv1 "github.com/grafana/loki/operator/api/loki/v1"
"github.com/grafana/loki/operator/internal/manifests/internal/config"
"github.com/grafana/loki/operator/internal/manifests/storage"
)
Expand Down Expand Up @@ -62,11 +64,16 @@ func BuildIngester(opts Options) ([]client.Object, error) {
return nil, err
}

pdb, err := newIngesterPodDisruptionBudget(opts)
if err != nil {
return nil, err
}

return []client.Object{
statefulSet,
NewIngesterGRPCService(opts),
NewIngesterHTTPService(opts),
newIngesterPodDisruptionBudget(opts),
pdb,
}, nil
}

Expand Down Expand Up @@ -288,13 +295,16 @@ func configureIngesterGRPCServicePKI(sts *appsv1.StatefulSet, opts Options) erro

// newIngesterPodDisruptionBudget returns a PodDisruptionBudget for the LokiStack
// Ingester pods.
func newIngesterPodDisruptionBudget(opts Options) *policyv1.PodDisruptionBudget {
func newIngesterPodDisruptionBudget(opts Options) (*policyv1.PodDisruptionBudget, error) {
l := ComponentLabels(LabelIngesterComponent, opts.Name)
// Default to 1 if not defined in ResourceRequirementsTable for a given size
mu := intstr.FromInt(1)
if opts.ResourceRequirements.Ingester.PDBMinAvailable > 0 {
mu = intstr.FromInt(opts.ResourceRequirements.Ingester.PDBMinAvailable)
mu, err := getPDBMinAvailable(opts)
if err != nil {
return nil, err
}
/* mu := intstr.FromInt(1)
if opts.ResourceRequirements.Ingester.PDBMinAvailable > 0 {
mu = pdbMinAvailable
} */
return &policyv1.PodDisruptionBudget{
TypeMeta: metav1.TypeMeta{
Kind: "PodDisruptionBudget",
Expand All @@ -311,5 +321,27 @@ func newIngesterPodDisruptionBudget(opts Options) *policyv1.PodDisruptionBudget
},
MinAvailable: &mu,
},
}, nil
}

func getPDBMinAvailable(opts Options) (intstr.IntOrString, error) {
/*
size | ReplicationFactor | PDBMinAvailablePods | Ingester Replicas
1x.demo | 1 | 1 | 1
1x.pico | 2 | 2 | 3
1x.extra-small| 2 | 1 | 2
1x.small | 2 | 1 | 2
1x.medium | 2 | 2 | 3 */

switch opts.Stack.Size {
case lokiv1.SizeOneXDemo:
return intstr.FromInt(1), nil
case lokiv1.SizeOneXExtraSmall, lokiv1.SizeOneXSmall:
return intstr.FromInt32(opts.Stack.Replication.Factor - 1), nil
}
if opts.Stack.Template.Ingester.Replicas <= opts.Stack.Replication.Factor {
err := kverrors.New("failed to set PodDisruptionBudget. Replication factor should be less than ingester replicas")
return intstr.FromInt32(0), err
}
return intstr.FromInt32(opts.Stack.Replication.Factor), nil
}
68 changes: 62 additions & 6 deletions operator/internal/manifests/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

v1 "github.com/grafana/loki/operator/api/config/v1"
lokiv1 "github.com/grafana/loki/operator/api/loki/v1"
"github.com/grafana/loki/operator/internal/manifests/internal"
"github.com/grafana/loki/operator/internal/manifests/storage"
)

Expand Down Expand Up @@ -108,16 +107,19 @@ func TestNewIngesterStatefulSet_SelectorMatchesLabels(t *testing.T) {
func TestBuildIngester_PodDisruptionBudget(t *testing.T) {
for _, tc := range []struct {
Name string
Size lokiv1.LokiStackSizeType
PDBMinAvailable int
ExpectedMinAvailable int
}{
{
Name: "Small stack",
Size: lokiv1.SizeOneXSmall,
PDBMinAvailable: 1,
ExpectedMinAvailable: 1,
},
{
Name: "Medium stack",
Size: lokiv1.SizeOneXMedium,
PDBMinAvailable: 2,
ExpectedMinAvailable: 2,
},
Expand All @@ -127,12 +129,8 @@ func TestBuildIngester_PodDisruptionBudget(t *testing.T) {
Name: "abcd",
Namespace: "efgh",
Gates: v1.FeatureGates{},
ResourceRequirements: internal.ComponentResources{
Ingester: internal.ResourceRequirements{
PDBMinAvailable: tc.PDBMinAvailable,
},
},
Stack: lokiv1.LokiStackSpec{
Size: tc.Size,
Template: &lokiv1.LokiTemplateSpec{
Ingester: &lokiv1.LokiComponentSpec{
Replicas: rand.Int31(),
Expand All @@ -141,6 +139,9 @@ func TestBuildIngester_PodDisruptionBudget(t *testing.T) {
Tenants: &lokiv1.TenantsSpec{
Mode: lokiv1.OpenshiftLogging,
},
Replication: &lokiv1.ReplicationSpec{
Factor: int32(2),
},
},
}
objs, err := BuildIngester(opts)
Expand All @@ -158,6 +159,61 @@ func TestBuildIngester_PodDisruptionBudget(t *testing.T) {
}
}

func TestBuildIngester_PodDisruptionBudgetWithReplicationFactor(t *testing.T) {
ingesterReplicas := 3
for _, tc := range []struct {
Name string
CustomReplicationFactor int32
PDBMinAvailable int
ExpectedMinAvailable int
}{
{
Name: "ingester replicas <= replication factor",
CustomReplicationFactor: 4,
PDBMinAvailable: 2,
ExpectedMinAvailable: 0,
},
{
Name: "ingester replicas > replication factor",
CustomReplicationFactor: 2,
PDBMinAvailable: 1,
ExpectedMinAvailable: 2,
},
} {
t.Run(tc.Name, func(t *testing.T) {
opts := Options{
Name: "abcd",
Namespace: "efgh",
Gates: v1.FeatureGates{},
Stack: lokiv1.LokiStackSpec{
Template: &lokiv1.LokiTemplateSpec{
Ingester: &lokiv1.LokiComponentSpec{
Replicas: int32(ingesterReplicas),
},
},
Tenants: &lokiv1.TenantsSpec{
Mode: lokiv1.OpenshiftLogging,
},
Replication: &lokiv1.ReplicationSpec{
Factor: tc.CustomReplicationFactor,
},
},
}
objs, err := BuildIngester(opts)

if err != nil {
require.Error(t, err)
} else {
require.NoError(t, err)
pdb := objs[3].(*policyv1.PodDisruptionBudget)
require.NotNil(t, pdb)
require.NotNil(t, pdb.Spec.MinAvailable.IntVal)
require.Equal(t, int32(tc.ExpectedMinAvailable), pdb.Spec.MinAvailable.IntVal)
}
})
}
}

func TestNewIngesterStatefulSet_TopologySpreadConstraints(t *testing.T) {
obj, _ := BuildIngester(Options{
Name: "abcd",
Expand Down
18 changes: 6 additions & 12 deletions operator/internal/manifests/internal/sizes.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,16 @@ func (c ComponentResources) DeepCopy() ComponentResources {

// ResourceRequirements sets CPU, Memory, and PVC requirements for a component
type ResourceRequirements struct {
Limits corev1.ResourceList
Requests corev1.ResourceList
PVCSize resource.Quantity
PDBMinAvailable int
Limits corev1.ResourceList
Requests corev1.ResourceList
PVCSize resource.Quantity
}

func (r *ResourceRequirements) DeepCopy() *ResourceRequirements {
return &ResourceRequirements{
Limits: r.Limits.DeepCopy(),
Requests: r.Requests.DeepCopy(),
PVCSize: r.PVCSize.DeepCopy(),
PDBMinAvailable: r.PDBMinAvailable,
Limits: r.Limits.DeepCopy(),
Requests: r.Requests.DeepCopy(),
PVCSize: r.PVCSize.DeepCopy(),
}
}

Expand Down Expand Up @@ -91,7 +89,6 @@ var resourceRequirementsTable = map[lokiv1.LokiStackSizeType]ComponentResources{
corev1.ResourceCPU: resource.MustParse("500m"),
corev1.ResourceMemory: resource.MustParse("3Gi"),
},
PDBMinAvailable: 2,
},
Distributor: corev1.ResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{
Expand Down Expand Up @@ -149,7 +146,6 @@ var resourceRequirementsTable = map[lokiv1.LokiStackSizeType]ComponentResources{
corev1.ResourceCPU: resource.MustParse("2"),
corev1.ResourceMemory: resource.MustParse("8Gi"),
},
PDBMinAvailable: 1,
},
Distributor: corev1.ResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{
Expand Down Expand Up @@ -207,7 +203,6 @@ var resourceRequirementsTable = map[lokiv1.LokiStackSizeType]ComponentResources{
corev1.ResourceCPU: resource.MustParse("4"),
corev1.ResourceMemory: resource.MustParse("20Gi"),
},
PDBMinAvailable: 1,
},
Distributor: corev1.ResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{
Expand Down Expand Up @@ -265,7 +260,6 @@ var resourceRequirementsTable = map[lokiv1.LokiStackSizeType]ComponentResources{
corev1.ResourceCPU: resource.MustParse("6"),
corev1.ResourceMemory: resource.MustParse("30Gi"),
},
PDBMinAvailable: 2,
},
Distributor: corev1.ResourceRequirements{
Requests: map[corev1.ResourceName]resource.Quantity{
Expand Down