[DO NOT MERGE]debug e2e #1455

Closed
wants to merge 5 commits
2 changes: 0 additions & 2 deletions agent/Makefile
@@ -48,8 +48,6 @@ clean-all: clean-vendor clean
.PHONY: lint ##runs code analysis tools
lint:
go vet ./cmd/... ./pkg/...
golint ./cmd/... ./pkg/...
golangci-lint run ./cmd/... ./pkg/...

.PHONY: help ##show this help message
help:
16 changes: 16 additions & 0 deletions agent/pkg/spec/dispatcher.go
@@ -59,42 +59,58 @@ func (d *genericDispatcher) Start(ctx context.Context) error {
}

func (d *genericDispatcher) dispatch(ctx context.Context) {
d.log.Debugf("in dispatch")
for {
select {
case <-ctx.Done():
return
case evt := <-d.consumer.EventChan():
d.log.Debugf("in event chan")
// if destination is explicitly specified and does not match, drop bundle
clusterNameVal, err := evt.Context.GetExtension(constants.CloudEventExtensionKeyClusterName)
if err != nil {
d.log.Infow("event dropped due to cluster name retrieval error", "error", err)
continue
}
d.log.Debugf("in event chan")

clusterName, ok := clusterNameVal.(string)
if !ok {
d.log.Infow("event dropped due to invalid cluster name", "clusterName", clusterNameVal)
continue
}
d.log.Debugf("in event chan:%v", clusterName)

if clusterName != transport.Broadcast && clusterName != d.agentConfig.LeafHubName {
// d.log.Infow("event dropped due to cluster name mismatch", "clusterName", clusterName)
continue
}
d.log.Debugf("in event chan")

syncer, found := d.syncers[evt.Type()]
if !found {
d.log.Debugw("dispatching to the default generic syncer", "eventType", evt.Type())
syncer = d.syncers[constants.GenericSpecMsgKey]
}
d.log.Debugf("in event chan")

if syncer == nil || evt == nil {
d.log.Warnw("nil syncer or event: incompatible event will be resolved after upgrade.",
"syncer", syncer, "event", evt)
continue
}
d.log.Debugf("in event chan")

if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
d.log.Debugf("in event chan")

if err := syncer.Sync(ctx, evt.Data()); err != nil {
d.log.Debugf("in event chan:%v", err)
return err
}
return nil
}); err != nil {
d.log.Debugf("in event chan:%v", err)
d.log.Errorw("sync failed", "type", evt.Type(), "error", err)
}
}
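Note: the dispatch loop above filters events by the cluster-name extension, picks a syncer by event type (falling back to the generic syncer), and retries each sync with retry.RetryOnConflict. A minimal sketch of a syncer that would fit this loop follows; the Sync(ctx, payload) error shape is inferred from the calls in this hunk and in clusterlabel_syncer.go, and the type name and package placement are hypothetical.

package syncers

import (
	"context"
	"encoding/json"
)

// noopSyncer is a hypothetical syncer used only to illustrate the contract.
type noopSyncer struct{}

// Sync decodes the CloudEvent payload and reports success; a non-nil error
// would cause the dispatcher above to retry via retry.RetryOnConflict.
func (s *noopSyncer) Sync(ctx context.Context, payload []byte) error {
	var obj map[string]interface{}
	if err := json.Unmarshal(payload, &obj); err != nil {
		return err
	}
	// apply the decoded spec here; this sketch intentionally does nothing
	return nil
}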
7 changes: 5 additions & 2 deletions agent/pkg/spec/spec.go
@@ -18,13 +18,14 @@ func AddToManager(context context.Context, mgr ctrl.Manager, transportClient tra
agentConfig *configs.AgentConfig,
) error {
log := logger.DefaultZapLogger()

if transportClient.GetConsumer() == nil {
log.Info("the consumer is not initialized for the spec controllers")
return nil
return fmt.Errorf("the consumer is not initialized")
}
if transportClient.GetProducer() == nil {
log.Info("the producer is not initialized for the spec controllers")
return nil
return fmt.Errorf("the producer is not initialized")
}

// add worker pool to manager
@@ -39,10 +40,12 @@ func AddToManager(context context.Context, mgr ctrl.Manager, transportClient tra
return fmt.Errorf("failed to add bundle dispatcher to runtime manager: %w", err)
}

log.Infof("agentConfig.EnableGlobalResource:%v", agentConfig.EnableGlobalResource)
// register syncer to the dispatcher
if agentConfig.EnableGlobalResource {
dispatcher.RegisterSyncer(constants.GenericSpecMsgKey,
syncers.NewGenericSyncer(workers, agentConfig))
log.Debugf("regist ManagedClustersLabelsMsgKey")
dispatcher.RegisterSyncer(constants.ManagedClustersLabelsMsgKey,
syncers.NewManagedClusterLabelSyncer(workers))
}
11 changes: 10 additions & 1 deletion agent/pkg/spec/syncers/clusterlabel_syncer.go
@@ -20,6 +20,8 @@ import (
"github.com/stolostron/multicluster-global-hub/pkg/utils"
)

var log = logger.DefaultZapLogger()

const (
// periodicApplyInterval = 5 * time.Second
hohFieldManager = "mgh-agent"
@@ -51,14 +53,17 @@ func (syncer *managedClusterLabelsBundleSyncer) Sync(ctx context.Context, payloa
if err := json.Unmarshal(payload, bundle); err != nil {
return err
}
syncer.log.Debugf("start sync bundle: %v", bundle)
syncer.setLatestBundle(bundle) // uses latestBundle
syncer.log.Debugf("handle bundle: %v", bundle)
syncer.handleBundle()

return nil
}

func (syncer *managedClusterLabelsBundleSyncer) setLatestBundle(newBundle *specbundle.ManagedClusterLabelsSpecBundle) {
syncer.latestBundleLock.Lock()
syncer.log.Debugf("lock sync bundle")
defer syncer.latestBundleLock.Unlock()

syncer.latestBundle = newBundle
@@ -67,8 +72,10 @@ func (syncer *managedClusterLabelsBundleSyncer) setLatestBundle(newBundle *specb
func (syncer *managedClusterLabelsBundleSyncer) handleBundle() {
syncer.latestBundleLock.Lock()
defer syncer.latestBundleLock.Unlock()

log.Debugf("handle managedClusterLabelsBundleSyncer bundle")
for _, managedClusterLabelsSpec := range syncer.latestBundle.Objects {
log.Debugf("managedClusterLabelsSpec:%v", *managedClusterLabelsSpec)

lastProcessedTimestampPtr := syncer.getManagedClusterLastProcessedTimestamp(managedClusterLabelsSpec.ClusterName)
if managedClusterLabelsSpec.UpdateTimestamp.After(*lastProcessedTimestampPtr) { // handle (success) once
syncer.bundleProcessingWaitingGroup.Add(1)
@@ -115,11 +122,13 @@ func (s *managedClusterLabelsBundleSyncer) updateManagedClusterAsync(
for key, value := range labelsSpec.Labels {
managedCluster.Labels[key] = value
}
log.Debugf("managedCluster.Labels: %v", managedCluster.Labels)

// delete labels by key
for _, labelKey := range labelsSpec.DeletedLabelKeys {
delete(managedCluster.Labels, labelKey)
}
log.Debugf("managedCluster.Labels: %v", managedCluster.Labels)

if err := s.updateManagedFieldEntry(managedCluster, labelsSpec); err != nil {
s.log.Error(err, "failed to update managed cluster", "name", labelsSpec.ClusterName)
12 changes: 11 additions & 1 deletion agent/pkg/status/generic/generic_handler.go
@@ -6,8 +6,11 @@ import (

"github.com/stolostron/multicluster-global-hub/agent/pkg/status/interfaces"
genericpayload "github.com/stolostron/multicluster-global-hub/pkg/bundle/generic"
"github.com/stolostron/multicluster-global-hub/pkg/logger"
)

var log = logger.ZapLogger("generic-handler")

type genericHandler struct {
eventData *genericpayload.GenericObjectBundle
// isSpec is to let the handler only update the event when spec is changed.
@@ -32,23 +35,28 @@ func NewGenericHandler(eventData *genericpayload.GenericObjectBundle, opts ...Ha
}

func (h *genericHandler) Get() interface{} {
log.Debugf("get obj:%v", *h.eventData)
return h.eventData
}

func (h *genericHandler) Update(obj client.Object) bool {
log.Debugf("update obj:%v", obj)

if h.shouldUpdate != nil {
if updated := h.shouldUpdate(obj); !updated {
log.Debugf("h.shouldUpdate false")
return false
}
}

index := getObjectIndexByUID(obj.GetUID(), (*h.eventData))
if index == -1 { // object not found, need to add it to the bundle
(*h.eventData) = append((*h.eventData), obj)
return true
}

old := (*h.eventData)[index]
log.Debugf("obj: %v", old)

if h.isSpec && old.GetGeneration() == obj.GetGeneration() {
return false
}
@@ -121,6 +129,8 @@ func WithSpec(onlySpec bool) HandlerOption {
}

func WithShouldUpdate(shouldUpdate func(client.Object) bool) HandlerOption {
log.Debugf("g.shouldUpdate:%v, shouldUpdate:%v", shouldUpdate, shouldUpdate)

return func(g *genericHandler) {
g.shouldUpdate = shouldUpdate
}
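Note: a brief usage sketch of the option pattern visible in this file follows. Only NewGenericHandler, WithSpec, and WithShouldUpdate from the hunks above are assumed to exist; the helper function, package placement, predicate, and label key are made up for illustration.

package generic

import (
	genericpayload "github.com/stolostron/multicluster-global-hub/pkg/bundle/generic"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// buildExampleHandler shows how the handler options compose; the function itself is hypothetical.
func buildExampleHandler() {
	bundle := genericpayload.GenericObjectBundle{}
	handler := NewGenericHandler(&bundle,
		WithSpec(true), // skip updates where the generation is unchanged (status-only changes)
		WithShouldUpdate(func(obj client.Object) bool {
			// hypothetical predicate: only track objects labeled app=example
			return obj.GetLabels()["app"] == "example"
		}),
	)
	_ = handler // hand the handler to whatever emitter consumes it (omitted in this sketch)
}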
59 changes: 59 additions & 0 deletions install.sh
@@ -0,0 +1,59 @@
#!/usr/bin/env bash


# This script is for installing OLM from a GitHub release

set -e

default_base_url=https://github.com/operator-framework/operator-lifecycle-manager/releases/download

if [[ ${#@} -lt 1 || ${#@} -gt 2 ]]; then
echo "Usage: $0 version [base_url]"
echo "* version: the github release version"
echo "* base_url: the github base URL (Default: $default_base_url)"
exit 1
fi

if kubectl get deployment olm-operator -n openshift-operator-lifecycle-manager > /dev/null 2>&1; then
echo "OLM is already installed in a different configuration. This is common if you are not running a vanilla Kubernetes cluster. Exiting..."
exit 1
fi

release="$1"
base_url="${2:-${default_base_url}}"
url="${base_url}/${release}"
namespace=olm

if kubectl get deployment olm-operator -n ${namespace} > /dev/null 2>&1; then
echo "OLM is already installed in ${namespace} namespace. Exiting..."
exit 1
fi

kubectl create -f "${url}/crds.yaml"
kubectl wait --for=condition=Established -f "${url}/crds.yaml"
kubectl create -f "${url}/olm.yaml"

# wait for deployments to be ready
kubectl rollout status -w deployment/olm-operator --namespace="${namespace}"
kubectl rollout status -w deployment/catalog-operator --namespace="${namespace}"

retries=30
until [[ $retries == 0 ]]; do
new_csv_phase=$(kubectl get csv -n "${namespace}" packageserver -o jsonpath='{.status.phase}' 2>/dev/null || echo "Waiting for CSV to appear")
if [[ $new_csv_phase != "$csv_phase" ]]; then
csv_phase=$new_csv_phase
echo "Package server phase: $csv_phase"
fi
if [[ "$new_csv_phase" == "Succeeded" ]]; then
break
fi
sleep 10
retries=$((retries - 1))
done

if [ $retries == 0 ]; then
echo "CSV \"packageserver\" failed to reach phase succeeded"
exit 1
fi

kubectl rollout status -w deployment/packageserver --namespace="${namespace}"
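
For reference, two invocations that match the script's own usage line (version [base_url]); the release tag and mirror URL below are placeholders, not values taken from this PR:

# install a given OLM release from the default GitHub download URL
./install.sh v0.28.0

# or fetch crds.yaml/olm.yaml from a mirror of the release assets
./install.sh v0.28.0 https://mirror.example.com/operator-lifecycle-manager/releases/download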