Skip to content

Commit

Permalink
fix review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
siliconbrain committed Jun 18, 2021
1 parent 772847c commit 097e060
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3090,8 +3090,10 @@ spec:
type: string
scaling:
properties:
drainWatch:
drain:
properties:
enabled:
type: boolean
image:
properties:
imagePullSecrets:
Expand Down Expand Up @@ -10877,8 +10879,10 @@ spec:
type: string
scaling:
properties:
drainWatch:
drain:
properties:
enabled:
type: boolean
image:
properties:
imagePullSecrets:
Expand Down
8 changes: 6 additions & 2 deletions config/crd/bases/logging.banzaicloud.io_loggings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3090,8 +3090,10 @@ spec:
type: string
scaling:
properties:
drainWatch:
drain:
properties:
enabled:
type: boolean
image:
properties:
imagePullSecrets:
Expand Down Expand Up @@ -10877,8 +10879,10 @@ spec:
type: string
scaling:
properties:
drainWatch:
drain:
properties:
enabled:
type: boolean
image:
properties:
imagePullSecrets:
Expand Down
2 changes: 1 addition & 1 deletion drain-watch-image/drain-watch.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ do

if [ "$(find $BUFFER_PATH -iname '*.buffer' -or -iname '*.buffer.meta' | wc -l)" = 0 ]
then
echo '['$(date)']' 'no buffers left, terminating workers:' "$(curl http://$RPC_ADDRESS/api/processes.killWorkers 2>/dev/null)"
echo '['$(date)']' 'no buffers left, terminating workers:' "$(curl --silent --show-error http://$RPC_ADDRESS/api/processes.killWorkers)"
exit 0
fi

Expand Down
1 change: 1 addition & 0 deletions pkg/resources/fluentd/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ package fluentd
const (
ComponentFluentd = "fluentd"
ComponentConfigCheck = "fluentd-configcheck"
ComponentDrainer = "fluentd-drainer"
)
34 changes: 19 additions & 15 deletions pkg/resources/fluentd/drainjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,36 @@ package fluentd
import (
"strings"

"github.com/banzaicloud/logging-operator/pkg/sdk/api/v1beta1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func (r *Reconciler) drainJobFor(pvc corev1.PersistentVolumeClaim) (*batchv1.Job, error) {
func (r *Reconciler) drainerJobFor(pvc corev1.PersistentVolumeClaim) (*batchv1.Job, error) {
bufVolName := r.Logging.QualifiedName(r.Logging.Spec.FluentdSpec.BufferStorageVolume.PersistentVolumeClaim.PersistentVolumeSource.ClaimName)

var initContainers []corev1.Container
if c := r.volumeMountHackContainer(); c != nil {
initContainers = append(initContainers, *c)
}

fluentdContainer := r.fluentContainer() // TODO: don't redirect container logs
fluentdContainer := fluentContainer(withoutFluentOutLogrotate(r.Logging.Spec.FluentdSpec))
fluentdContainer.VolumeMounts = append(fluentdContainer.VolumeMounts, corev1.VolumeMount{
Name: bufVolName,
MountPath: bufferPath,
})
containers := []corev1.Container{
*fluentdContainer,
r.drainWatchContainer(bufVolName),
fluentdContainer,
drainWatchContainer(&r.Logging.Spec.FluentdSpec.Scaling.Drain, bufVolName),
}
if c := r.bufferMetricsSidecarContainer(); c != nil {
containers = append(containers, *c)
}

spec := batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
ObjectMeta: r.generatePodMeta(),
ObjectMeta: metav1.ObjectMeta{
Labels: r.getFluentdLabels(ComponentDrainer),
},
Spec: corev1.PodSpec{
Volumes: r.generateVolume(),
ServiceAccountName: r.getServiceAccount(),
InitContainers: initContainers,
ImagePullSecrets: r.Logging.Spec.FluentdSpec.Image.ImagePullSecrets,
Containers: containers,
NodeSelector: r.Logging.Spec.FluentdSpec.NodeSelector,
Expand All @@ -75,21 +73,21 @@ func (r *Reconciler) drainJobFor(pvc corev1.PersistentVolumeClaim) (*batchv1.Job
},
})
return &batchv1.Job{
ObjectMeta: r.FluentdObjectMeta(StatefulSetName+pvc.Name[strings.LastIndex(pvc.Name, "-"):]+"-drain", ComponentFluentd),
ObjectMeta: r.FluentdObjectMeta(StatefulSetName+pvc.Name[strings.LastIndex(pvc.Name, "-"):]+"-drainer", ComponentDrainer),
Spec: spec,
}, nil
}

func (r *Reconciler) drainWatchContainer(bufferVolumeName string) corev1.Container {
func drainWatchContainer(cfg *v1beta1.FluentdDrainConfig, bufferVolumeName string) corev1.Container {
return corev1.Container{
Env: []corev1.EnvVar{
{
Name: "BUFFER_PATH",
Value: bufferPath,
},
},
Image: r.Logging.Spec.FluentdSpec.Scaling.DrainWatch.Image.RepositoryWithTag(),
ImagePullPolicy: corev1.PullPolicy(r.Logging.Spec.FluentdSpec.Scaling.DrainWatch.Image.PullPolicy),
Image: cfg.Image.RepositoryWithTag(),
ImagePullPolicy: corev1.PullPolicy(cfg.Image.PullPolicy),
Name: "drain-watch",
VolumeMounts: []corev1.VolumeMount{
{
Expand All @@ -100,3 +98,9 @@ func (r *Reconciler) drainWatchContainer(bufferVolumeName string) corev1.Contain
},
}
}

func withoutFluentOutLogrotate(spec *v1beta1.FluentdSpec) *v1beta1.FluentdSpec {
res := spec.DeepCopy()
res.FluentOutLogrotate = nil
return res
}
158 changes: 87 additions & 71 deletions pkg/resources/fluentd/fluentd.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,93 +241,109 @@ func (r *Reconciler) Reconcile() (*reconcile.Result, error) {
}
}

if !r.Logging.Spec.FluentdSpec.DisablePvc {
nsOpt := client.InNamespace(r.Logging.Spec.ControlNamespace)
labelSet := r.getFluentdLabels(ComponentFluentd)

var pvcList corev1.PersistentVolumeClaimList
if err := r.Client.List(ctx, &pvcList, nsOpt,
client.MatchingLabelsSelector{
Selector: labels.SelectorFromSet(labelSet).Add(drainableRequirement),
}); err != nil {
return nil, errors.WrapIf(err, "listing PVC resources")
}
if err := r.reconcileDrain(ctx); err != nil {
return nil, err
}

var stsPods corev1.PodList
if err := r.Client.List(ctx, &stsPods, nsOpt, client.MatchingLabels(labelSet)); err != nil {
return nil, errors.WrapIf(err, "listing StatefulSet pods")
}
return nil, nil
}

bufVolName := r.Logging.QualifiedName(r.Logging.Spec.FluentdSpec.BufferStorageVolume.PersistentVolumeClaim.PersistentVolumeSource.ClaimName)
func (r *Reconciler) reconcileDrain(ctx context.Context) error {
if r.Logging.Spec.FluentdSpec.DisablePvc || !r.Logging.Spec.FluentdSpec.Scaling.Drain.Enabled {
return nil
}

livePVCs := make(map[string]bool)
for _, pod := range stsPods.Items {
if bufVol := findVolumeByName(pod.Spec.Volumes, bufVolName); bufVol != nil {
livePVCs[bufVol.PersistentVolumeClaim.ClaimName] = true
}
nsOpt := client.InNamespace(r.Logging.Spec.ControlNamespace)
labelSet := r.getFluentdLabels(ComponentFluentd)

var pvcList corev1.PersistentVolumeClaimList
if err := r.Client.List(ctx, &pvcList, nsOpt,
client.MatchingLabelsSelector{
Selector: labels.SelectorFromSet(labelSet).Add(drainableRequirement),
}); err != nil {
return errors.WrapIf(err, "listing PVC resources")
}

var stsPods corev1.PodList
if err := r.Client.List(ctx, &stsPods, nsOpt, client.MatchingLabels(labelSet)); err != nil {
return errors.WrapIf(err, "listing StatefulSet pods")
}

bufVolName := r.Logging.QualifiedName(r.Logging.Spec.FluentdSpec.BufferStorageVolume.PersistentVolumeClaim.PersistentVolumeSource.ClaimName)

pvcsInUse := make(map[string]bool)
for _, pod := range stsPods.Items {
if bufVol := findVolumeByName(pod.Spec.Volumes, bufVolName); bufVol != nil {
pvcsInUse[bufVol.PersistentVolumeClaim.ClaimName] = true
}
}

var jobList batchv1.JobList
if err := r.Client.List(ctx, &jobList, nsOpt, client.MatchingLabels(labelSet)); err != nil {
return nil, errors.WrapIf(err, "listing buffer drain jobs")
var jobList batchv1.JobList
if err := r.Client.List(ctx, &jobList, nsOpt, client.MatchingLabels(labelSet)); err != nil {
return errors.WrapIf(err, "listing buffer drainer jobs")
}

jobOfPVC := make(map[string]batchv1.Job)
for _, job := range jobList.Items {
if bufVol := findVolumeByName(job.Spec.Template.Spec.Volumes, bufVolName); bufVol != nil {
jobOfPVC[bufVol.PersistentVolumeClaim.ClaimName] = job
}
}

var errs error
for _, pvc := range pvcList.Items {
pvcLog := r.Log.WithValues("pvc", pvc.Name)

jobOfPVC := make(map[string]batchv1.Job)
for _, job := range jobList.Items {
if bufVol := findVolumeByName(job.Spec.Template.Spec.Volumes, bufVolName); bufVol != nil {
jobOfPVC[bufVol.PersistentVolumeClaim.ClaimName] = job
drained := markedAsDrained(pvc)
inUse := pvcsInUse[pvc.Name]
if drained && inUse {
pvcLog.Info("removing drained label from PVC as it has a matching statefulset pod")

patch := client.MergeFrom(pvc.DeepCopy())
delete(pvc.Labels, drainStatusLabelKey)
if err := client.IgnoreNotFound(r.Client.Patch(ctx, pvc.DeepCopy(), patch)); err != nil {
errs = errors.Append(errs, errors.WrapIf(err, "removing drained label from pvc"))
}
continue
}

var errs error
for _, pvc := range pvcList.Items {
pvcLog := r.Log.WithValues("pvc", pvc.Name)
drained := markedAsDrained(pvc)
live := livePVCs[pvc.Name]
if drained && live {
pvcLog.Info("removing drained label from PVC as it has a matching statefulset pod")
patch := client.MergeFrom(pvc.DeepCopy())
delete(pvc.Labels, drainStatusLabelKey)
if err := client.IgnoreNotFound(r.Client.Patch(ctx, &pvc, patch)); err != nil {
errs = errors.Append(errs, errors.WrapIf(err, "removing drained label from pvc"))
}
continue
job, hasJob := jobOfPVC[pvc.Name]
if hasJob && jobSuccessfullyCompleted(job) {
pvcLog.Info("drainer job for PVC has completed, adding drained label and deleting job")

patch := client.MergeFrom(pvc.DeepCopy())
pvc.Labels[drainStatusLabelKey] = drainStatusLabelValue
if err := client.IgnoreNotFound(r.Client.Patch(ctx, pvc.DeepCopy(), patch)); err != nil {
errs = errors.Append(errs, errors.WrapIf(err, "marking pvc as drained"))
}
job, hasJob := jobOfPVC[pvc.Name]
if hasJob && jobSuccessfullyCompleted(job) {
pvcLog.Info("drainer job for PVC has completed, adding drained label and deleting job")
patch := client.MergeFrom(pvc.DeepCopy())
pvc.Labels[drainStatusLabelKey] = drainStatusLabelValue
if err := client.IgnoreNotFound(r.Client.Patch(ctx, &pvc, patch)); err != nil {
errs = errors.Append(errs, errors.WrapIf(err, "marking pvc as drained"))
}
if err := client.IgnoreNotFound(r.Client.Delete(ctx, &job, client.PropagationPolicy(v1.DeletePropagationBackground))); err != nil {
errs = errors.Append(errs, errors.WrapIf(err, "deleting completed drain job"))
}
continue
} else if hasJob && !jobSuccessfullyCompleted(job) {
if job.Status.Failed > 0 {
errs = errors.Append(errs, errors.NewWithDetails("draining PVC failed", "pvc", pvc.Name, "attempts", job.Status.Failed))
} else {
pvcLog.Info("drainer job for PVC has not yet been completed")
}

if err := client.IgnoreNotFound(r.Client.Delete(ctx, &job, client.PropagationPolicy(v1.DeletePropagationBackground))); err != nil {
errs = errors.Append(errs, errors.WrapIf(err, "deleting completed drainer job"))
}
if !drained && !live && !hasJob {
pvcLog.Info("creating drainer job for PVC", "pvc", pvc.Name)
if job, err := r.drainJobFor(pvc); err != nil {
errs = errors.Append(errs, errors.WrapIf(err, "assembling drain job"))
} else if err := r.Client.Create(ctx, job); err != nil {
errs = errors.Append(errs, errors.WrapIf(err, "creating drain job"))
}
continue
continue
}

if hasJob && !jobSuccessfullyCompleted(job) {
if job.Status.Failed > 0 {
errs = errors.Append(errs, errors.NewWithDetails("draining PVC failed", "pvc", pvc.Name, "attempts", job.Status.Failed))
} else {
pvcLog.Info("drainer job for PVC has not yet been completed")
}
continue
}
if errs != nil {
return nil, errs

if !drained && !inUse && !hasJob {
pvcLog.Info("creating drainer job for PVC")

if job, err := r.drainerJobFor(pvc); err != nil {
errs = errors.Append(errs, errors.WrapIf(err, "assembling drainer job"))
} else if err := r.Client.Create(ctx, job); err != nil {
errs = errors.Append(errs, errors.WrapIf(err, "creating drainer job"))
}
continue
}
}

return nil, nil
return errs
}

func RegisterWatches(builder *builder.Builder) *builder.Builder {
Expand Down
Loading

0 comments on commit 097e060

Please sign in to comment.