diff --git a/extensions/kubeconfig/exec.go b/extensions/kubeconfig/exec.go
index 6a6838ff..fc0763ce 100644
--- a/extensions/kubeconfig/exec.go
+++ b/extensions/kubeconfig/exec.go
@@ -2,6 +2,7 @@ package kubeconfig
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"os"
 	"strings"
@@ -73,7 +74,7 @@ func KubectlExec(restConfig *restclient.Config, podName, namespace string, comma
 	}
 
 	logStreamer := &LogStreamer{}
-	err = exec.Stream(remotecommand.StreamOptions{
+	err = exec.StreamWithContext(context.TODO(), remotecommand.StreamOptions{
 		Stdin:  nil,
 		Stdout: logStreamer,
 		Stderr: os.Stderr,
diff --git a/extensions/kubeconfig/podlogs.go b/extensions/kubeconfig/podlogs.go
index 414787cb..c2af32d9 100644
--- a/extensions/kubeconfig/podlogs.go
+++ b/extensions/kubeconfig/podlogs.go
@@ -73,6 +73,64 @@ func GetPodLogs(client *rancher.Client, clusterID string, podName string, namesp
 	return logs, nil
 }
 
+func GetPodLogsWithOpts(client *rancher.Client, clusterID string, podName string, namespace string, bufferSizeStr string, opts *corev1.PodLogOptions) (string, error) {
+	var restConfig *restclient.Config
+
+	kubeConfig, err := GetKubeconfig(client, clusterID)
+	if err != nil {
+		return "", err
+	}
+
+	restConfig, err = (*kubeConfig).ClientConfig()
+	if err != nil {
+		return "", err
+	}
+	restConfig.ContentConfig.NegotiatedSerializer = serializer.NewCodecFactory(k8Scheme.Scheme)
+	restConfig.ContentConfig.GroupVersion = &podGroupVersion
+	restConfig.APIPath = apiPath
+
+	restClient, err := restclient.RESTClientFor(restConfig)
+	if err != nil {
+		return "", err
+	}
+
+	req := restClient.Get().Resource("pods").Name(podName).Namespace(namespace).SubResource("log")
+	req.VersionedParams(
+		opts,
+		k8Scheme.ParameterCodec,
+	)
+
+	stream, err := req.Stream(context.TODO())
+	if err != nil {
+		return "", fmt.Errorf("error streaming pod logs for pod %s/%s: %v", namespace, podName, err)
+	}
+
+	defer stream.Close()
+
+	reader := bufio.NewScanner(stream)
+
+	if bufferSizeStr != "" {
+		bufferSize, err := parseBufferSize(bufferSizeStr)
+		if err != nil {
+			return "", fmt.Errorf("error in parseBufferSize: %v", err)
+		}
+
+		buf := make([]byte, bufferSize)
+		reader.Buffer(buf, bufferSize)
+	}
+
+	var logs string
+	for reader.Scan() {
+		logs = logs + fmt.Sprintf("%s\n", reader.Text())
+		fmt.Println(reader.Text())
+	}
+
+	if err := reader.Err(); err != nil {
+		return "", fmt.Errorf("error reading pod logs for pod %s/%s: %v", namespace, podName, err)
+	}
+	return logs, nil
+}
+
 // parseBufferSize is a helper function that parses a size string and returns
 // the equivalent size in bytes. The provided size string should end with a
 // suffix of 'KB', 'MB', or 'GB'. If no suffix is provided, the function will
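The new GetPodLogsWithOpts helper exposes the full corev1.PodLogOptions surface (container, tail lines, timestamps, and so on) plus an optional scanner buffer size. A minimal caller might look like the sketch below; the pod name, namespace, container, and buffer size are illustrative assumptions, not values taken from this change.

package example

// Hypothetical usage sketch for GetPodLogsWithOpts; not part of this diff.
import (
	"github.com/rancher/shepherd/clients/rancher"
	"github.com/rancher/shepherd/extensions/kubeconfig"
	corev1 "k8s.io/api/core/v1"
)

func examplePodLogs(client *rancher.Client, clusterID string) (string, error) {
	tailLines := int64(100)
	opts := &corev1.PodLogOptions{
		Container:  "rancher",  // assumed container name
		TailLines:  &tailLines, // only the last 100 lines
		Timestamps: true,
	}

	// "64KB" is handed to parseBufferSize and sizes the scanner buffer.
	return kubeconfig.GetPodLogsWithOpts(client, clusterID, "rancher-abc123", "cattle-system", "64KB", opts)
}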
diff --git a/extensions/kubeconfig/pods.go b/extensions/kubeconfig/pods.go
new file mode 100644
index 00000000..1eb03e9c
--- /dev/null
+++ b/extensions/kubeconfig/pods.go
@@ -0,0 +1,57 @@
+package kubeconfig
+
+import (
+	"context"
+	"errors"
+
+	"github.com/rancher/shepherd/clients/rancher"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/serializer"
+	"k8s.io/client-go/kubernetes"
+	k8Scheme "k8s.io/client-go/kubernetes/scheme"
+)
+
+func GetPods(client *rancher.Client, clusterID string, namespace string, listOptions *metav1.ListOptions) ([]corev1.Pod, error) {
+	kubeConfig, err := GetKubeconfig(client, clusterID)
+	if err != nil {
+		return nil, err
+	}
+
+	restConfig, err := (*kubeConfig).ClientConfig()
+	if err != nil {
+		return nil, err
+	}
+	restConfig.ContentConfig.NegotiatedSerializer = serializer.NewCodecFactory(k8Scheme.Scheme)
+	restConfig.ContentConfig.GroupVersion = &podGroupVersion
+	restConfig.APIPath = apiPath
+
+	clientset, err := kubernetes.NewForConfig(restConfig)
+	if err != nil {
+		return nil, err
+	}
+
+	pods, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), *listOptions)
+	if err != nil {
+		return nil, err
+	}
+	return pods.Items, nil
+}
+
+func GetPodNames(client *rancher.Client, clusterID string, namespace string, listOptions *metav1.ListOptions) ([]string, error) {
+	pods, err := GetPods(client, clusterID, namespace, listOptions)
+	if err != nil {
+		return nil, err
+	}
+
+	var names []string
+	for _, pod := range pods {
+		names = append(names, pod.Name)
+	}
+	if len(names) == 0 {
+		return nil, errors.New("GetPodNames: no pod names found")
+	}
+
+	return names, nil
+}
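GetPods returns full corev1.Pod objects filtered by a standard metav1.ListOptions, and GetPodNames reduces that to names, erroring out when nothing matches. A sketch of how the two might be combined; the namespace and label selector are assumptions for illustration.

package example

// Hypothetical usage sketch for GetPods/GetPodNames; not part of this diff.
import (
	"fmt"

	"github.com/rancher/shepherd/clients/rancher"
	"github.com/rancher/shepherd/extensions/kubeconfig"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func exampleListPods(client *rancher.Client, clusterID string) ([]string, error) {
	listOpts := &metav1.ListOptions{
		LabelSelector: "app=cattle-cluster-agent", // assumed label
	}

	// Full pod objects are available when status or spec checks are needed.
	pods, err := kubeconfig.GetPods(client, clusterID, "cattle-system", listOpts)
	if err != nil {
		return nil, err
	}
	for _, pod := range pods {
		fmt.Printf("%s is %s\n", pod.Name, pod.Status.Phase)
	}

	// GetPodNames returns an error if the selector matches nothing.
	return kubeconfig.GetPodNames(client, clusterID, "cattle-system", listOpts)
}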
diff --git a/extensions/provisioning/verify.go b/extensions/provisioning/verify.go
index 066b3e38..944f44f6 100644
--- a/extensions/provisioning/verify.go
+++ b/extensions/provisioning/verify.go
@@ -17,7 +17,6 @@ import (
 	"github.com/rancher/shepherd/extensions/clusters"
 	"github.com/rancher/shepherd/extensions/clusters/bundledclusters"
 	"github.com/rancher/shepherd/extensions/defaults"
-	"github.com/rancher/shepherd/extensions/etcdsnapshot"
 	kubeapinodes "github.com/rancher/shepherd/extensions/kubeapi/nodes"
 	"github.com/rancher/shepherd/extensions/kubeconfig"
 	nodestat "github.com/rancher/shepherd/extensions/nodes"
@@ -45,7 +44,6 @@ const (
 	hostnameLimit            = 63
 	machineNameAnnotation    = "cluster.x-k8s.io/machine"
 	machineSteveResourceType = "cluster.x-k8s.io.machine"
-	onDemandPrefix           = "on-demand-"
 )
 
 // VerifyRKE1Cluster validates that the RKE1 cluster and its resources are in a good state, matching a given config.
@@ -96,10 +94,67 @@ func VerifyRKE1Cluster(t *testing.T, client *rancher.Client, clustersConfig *clu
 		}
 	}
 
-	if !strings.Contains(clustersConfig.CloudProvider, "external") {
-		podErrors := pods.StatusPods(client, cluster.ID)
-		assert.Empty(t, podErrors)
+	podErrors := pods.StatusPods(client, cluster.ID)
+	assert.Empty(t, podErrors)
+}
+
+// VerifyRKE1ClusterWithTimeout validates that the RKE1 cluster and its resources are in a good state within the custom timeout, matching a given config.
+func VerifyRKE1ClusterWithTimeout(t *testing.T, client *rancher.Client, clustersConfig *clusters.ClusterConfig, cluster *management.Cluster, timeout *int64) {
+	client, err := client.ReLogin()
+	require.NoError(t, err)
+
+	adminClient, err := rancher.NewClient(client.RancherConfig.AdminToken, client.Session)
+	require.NoError(t, err)
+
+	var timeoutSeconds *int64
+	if timeout != nil {
+		timeoutSeconds = timeout
+	} else {
+		timeoutSeconds = &defaults.WatchTimeoutSeconds
 	}
+
+	watchInterface, err := adminClient.GetManagementWatchInterface(management.ClusterType, metav1.ListOptions{
+		FieldSelector:  "metadata.name=" + cluster.ID,
+		TimeoutSeconds: timeoutSeconds,
+	})
+	require.NoError(t, err)
+
+	checkFunc := clusters.IsHostedProvisioningClusterReady
+	err = wait.WatchWait(watchInterface, checkFunc)
+	require.NoError(t, err)
+
+	assert.Equal(t, clustersConfig.KubernetesVersion, cluster.RancherKubernetesEngineConfig.Version)
+
+	clusterToken, err := clusters.CheckServiceAccountTokenSecret(client, cluster.Name)
+	require.NoError(t, err)
+	assert.NotEmpty(t, clusterToken)
+
+	err = nodestat.AllManagementNodeReady(client, cluster.ID, defaults.ThirtyMinuteTimeout)
+	require.NoError(t, err)
+
+	if clustersConfig.PSACT == string(provisioninginput.RancherPrivileged) || clustersConfig.PSACT == string(provisioninginput.RancherRestricted) || clustersConfig.PSACT == string(provisioninginput.RancherBaseline) {
+		require.NotEmpty(t, cluster.DefaultPodSecurityAdmissionConfigurationTemplateName)
+
+		err := psadeploy.CreateNginxDeployment(client, cluster.ID, clustersConfig.PSACT)
+		require.NoError(t, err)
+	}
+	if clustersConfig.Registries != nil {
+		if clustersConfig.Registries.RKE1Registries != nil {
+			for _, registry := range clustersConfig.Registries.RKE1Registries {
+				havePrefix, err := registries.CheckAllClusterPodsForRegistryPrefix(client, cluster.ID, registry.URL)
+				require.NoError(t, err)
+				assert.True(t, havePrefix)
+			}
+		}
+	}
+	if clustersConfig.Networking != nil {
+		if clustersConfig.Networking.LocalClusterAuthEndpoint != nil {
+			VerifyACE(t, adminClient, cluster)
+		}
+	}
+
+	podErrors := pods.StatusPods(client, cluster.ID)
+	assert.Empty(t, podErrors)
 }
 
 // VerifyCluster validates that a non-rke1 cluster and its resources are in a good state, matching a given config.
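Each of the new *WithTimeout variants takes the timeout as a *int64 in seconds and falls back to defaults.WatchTimeoutSeconds when nil is passed. A minimal caller sketch follows; the 30-minute value and the import paths/aliases are assumptions, not part of this change.

package example

// Hypothetical caller of VerifyRKE1ClusterWithTimeout; not part of this diff.
import (
	"testing"

	"github.com/rancher/shepherd/clients/rancher"
	management "github.com/rancher/shepherd/clients/rancher/generated/management/v3"
	"github.com/rancher/shepherd/extensions/clusters"
	"github.com/rancher/shepherd/extensions/provisioning"
)

func exampleVerifyRKE1(t *testing.T, client *rancher.Client, cfg *clusters.ClusterConfig, cluster *management.Cluster) {
	timeoutSeconds := int64(1800) // 30 minutes; example value only
	provisioning.VerifyRKE1ClusterWithTimeout(t, client, cfg, cluster, &timeoutSeconds)

	// Passing nil keeps the default watch timeout.
	provisioning.VerifyRKE1ClusterWithTimeout(t, client, cfg, cluster, nil)
}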
@@ -173,6 +228,84 @@ func VerifyCluster(t *testing.T, client *rancher.Client, clustersConfig *cluster
 	}
 }
 
+// VerifyClusterWithTimeout validates that a non-rke1 cluster and its resources are in a good state within the custom timeout, matching a given config.
+func VerifyClusterWithTimeout(t *testing.T, client *rancher.Client, clustersConfig *clusters.ClusterConfig, cluster *steveV1.SteveAPIObject, timeout *int64) {
+	client, err := client.ReLogin()
+	require.NoError(t, err)
+
+	adminClient, err := rancher.NewClient(client.RancherConfig.AdminToken, client.Session)
+	require.NoError(t, err)
+
+	kubeProvisioningClient, err := adminClient.GetKubeAPIProvisioningClient()
+	require.NoError(t, err)
+
+	var timeoutSeconds *int64
+	if timeout != nil {
+		timeoutSeconds = timeout
+	} else {
+		timeoutSeconds = &defaults.WatchTimeoutSeconds
+	}
+
+	watchInterface, err := kubeProvisioningClient.Clusters(namespace).Watch(context.TODO(), metav1.ListOptions{
+		FieldSelector:  "metadata.name=" + cluster.Name,
+		TimeoutSeconds: timeoutSeconds,
+	})
+	require.NoError(t, err)
+
+	checkFunc := clusters.IsProvisioningClusterReady
+	err = wait.WatchWait(watchInterface, checkFunc)
+	require.NoError(t, err)
+
+	clusterToken, err := clusters.CheckServiceAccountTokenSecret(client, cluster.Name)
+	require.NoError(t, err)
+	assert.NotEmpty(t, clusterToken)
+
+	err = nodestat.AllMachineReady(client, cluster.ID, defaults.ThirtyMinuteTimeout)
+	require.NoError(t, err)
+
+	status := &provv1.ClusterStatus{}
+	err = steveV1.ConvertToK8sType(cluster.Status, status)
+	require.NoError(t, err)
+
+	clusterSpec := &provv1.ClusterSpec{}
+	err = steveV1.ConvertToK8sType(cluster.Spec, clusterSpec)
+	require.NoError(t, err)
+
+	configKubeVersion := clustersConfig.KubernetesVersion
+	require.Equal(t, configKubeVersion, clusterSpec.KubernetesVersion)
+
+	if clusterSpec.DefaultPodSecurityAdmissionConfigurationTemplateName == string(provisioninginput.RancherPrivileged) ||
+		clusterSpec.DefaultPodSecurityAdmissionConfigurationTemplateName == string(provisioninginput.RancherRestricted) ||
+		clusterSpec.DefaultPodSecurityAdmissionConfigurationTemplateName == string(provisioninginput.RancherBaseline) {
+
+		require.NotEmpty(t, clusterSpec.DefaultPodSecurityAdmissionConfigurationTemplateName)
+
+		err := psadeploy.CreateNginxDeployment(client, status.ClusterName, clusterSpec.DefaultPodSecurityAdmissionConfigurationTemplateName)
+		require.NoError(t, err)
+	}
+
+	if clusterSpec.RKEConfig.Registries != nil {
+		for registryName := range clusterSpec.RKEConfig.Registries.Configs {
+			havePrefix, err := registries.CheckAllClusterPodsForRegistryPrefix(client, status.ClusterName, registryName)
+			require.NoError(t, err)
+			assert.True(t, havePrefix)
+		}
+	}
+
+	if clusterSpec.LocalClusterAuthEndpoint.Enabled {
+		mgmtClusterObject, err := adminClient.Management.Cluster.ByID(status.ClusterName)
+		require.NoError(t, err)
+		VerifyACE(t, adminClient, mgmtClusterObject)
+	}
+
+	podErrors := pods.StatusPods(client, status.ClusterName)
+	assert.Empty(t, podErrors)
+
+	if clustersConfig.ClusterSSHTests != nil {
+		VerifySSHTests(t, client, cluster, clustersConfig.ClusterSSHTests, status.ClusterName)
+	}
+}
+
 // VerifyHostedCluster validates that the hosted cluster and its resources are in a good state, matching a given config.
 func VerifyHostedCluster(t *testing.T, client *rancher.Client, cluster *management.Cluster) {
 	client, err := client.ReLogin()
@@ -203,6 +336,43 @@ func VerifyHostedCluster(t *testing.T, client *rancher.Client, cluster *manageme
 	assert.Empty(t, podErrors)
 }
 
+// VerifyHostedClusterWithTimeout validates that the hosted cluster and its resources are in a good state within the custom timeout, matching a given config.
+func VerifyHostedClusterWithTimeout(t *testing.T, client *rancher.Client, cluster *management.Cluster, timeout *int64) {
+	client, err := client.ReLogin()
+	require.NoError(t, err)
+
+	adminClient, err := rancher.NewClient(client.RancherConfig.AdminToken, client.Session)
+	require.NoError(t, err)
+
+	var timeoutSeconds *int64
+	if timeout != nil {
+		timeoutSeconds = timeout
+	} else {
+		timeoutSeconds = &defaults.WatchTimeoutSeconds
+	}
+
+	watchInterface, err := adminClient.GetManagementWatchInterface(management.ClusterType, metav1.ListOptions{
+		FieldSelector:  "metadata.name=" + cluster.ID,
+		TimeoutSeconds: timeoutSeconds,
+	})
+	require.NoError(t, err)
+
+	checkFunc := clusters.IsHostedProvisioningClusterReady
+
+	err = wait.WatchWait(watchInterface, checkFunc)
+	require.NoError(t, err)
+
+	clusterToken, err := clusters.CheckServiceAccountTokenSecret(client, cluster.Name)
+	require.NoError(t, err)
+	assert.NotEmpty(t, clusterToken)
+
+	err = nodestat.AllManagementNodeReady(client, cluster.ID, defaults.ThirtyMinuteTimeout)
+	require.NoError(t, err)
+
+	podErrors := pods.StatusPods(client, cluster.ID)
+	assert.Empty(t, podErrors)
+}
+
 // VerifyDeleteRKE1Cluster validates that a rke1 cluster and its resources are deleted.
 func VerifyDeleteRKE1Cluster(t *testing.T, client *rancher.Client, clusterID string) {
 	cluster, err := client.Management.Cluster.ByID(clusterID)
@@ -232,6 +402,42 @@ func VerifyDeleteRKE1Cluster(t *testing.T, client *rancher.Client, clusterID str
 	require.NoError(t, err)
 }
 
+// VerifyDeleteRKE1ClusterWithTimeout validates that a rke1 cluster and its resources are deleted within the custom timeout.
+func VerifyDeleteRKE1ClusterWithTimeout(t *testing.T, client *rancher.Client, clusterID string, timeout *int64) {
+	cluster, err := client.Management.Cluster.ByID(clusterID)
+	require.NoError(t, err)
+
+	adminClient, err := rancher.NewClient(client.RancherConfig.AdminToken, client.Session)
+	require.NoError(t, err)
+
+	var timeoutSeconds *int64
+	if timeout != nil {
+		timeoutSeconds = timeout
+	} else {
+		timeoutSeconds = &defaults.WatchTimeoutSeconds
+	}
+
+	watchInterface, err := adminClient.GetManagementWatchInterface(management.ClusterType, metav1.ListOptions{
+		FieldSelector:  "metadata.name=" + clusterID,
+		TimeoutSeconds: timeoutSeconds,
+	})
+	require.NoError(t, err)
+
+	err = wait.WatchWait(watchInterface, func(event watch.Event) (ready bool, err error) {
+		if event.Type == watch.Error {
+			return false, fmt.Errorf("error: unable to delete cluster %s", cluster.Name)
+		} else if event.Type == watch.Deleted {
+			logrus.Infof("Cluster %s deleted!", cluster.Name)
+			return true, nil
+		}
+		return false, nil
+	})
+	require.NoError(t, err)
+
+	err = nodestat.AllNodeDeleted(client, clusterID)
+	require.NoError(t, err)
+}
+
 // VerifyDeleteRKE2K3SCluster validates that a non-rke1 cluster and its resources are deleted.
 func VerifyDeleteRKE2K3SCluster(t *testing.T, client *rancher.Client, clusterID string) {
 	cluster, err := client.Steve.SteveType("provisioning.cattle.io.cluster").ByID(clusterID)
@@ -268,6 +474,49 @@ func VerifyDeleteRKE2K3SCluster(t *testing.T, client *rancher.Client, clusterID
 	require.NoError(t, err)
 }
 
+// VerifyDeleteRKE2K3SClusterWithTimeout validates that a non-rke1 cluster and its resources are deleted within the custom timeout.
+func VerifyDeleteRKE2K3SClusterWithTimeout(t *testing.T, client *rancher.Client, clusterID string, timeout *int64) {
+	cluster, err := client.Steve.SteveType("provisioning.cattle.io.cluster").ByID(clusterID)
+	require.NoError(t, err)
+
+	adminClient, err := rancher.NewClient(client.RancherConfig.AdminToken, client.Session)
+	require.NoError(t, err)
+
+	provKubeClient, err := adminClient.GetKubeAPIProvisioningClient()
+	require.NoError(t, err)
+
+	var timeoutSeconds *int64
+	if timeout != nil {
+		timeoutSeconds = timeout
+	} else {
+		timeoutSeconds = &defaults.WatchTimeoutSeconds
+	}
+
+	watchInterface, err := provKubeClient.Clusters(namespace).Watch(context.TODO(), metav1.ListOptions{
+		FieldSelector:  "metadata.name=" + cluster.Name,
+		TimeoutSeconds: timeoutSeconds,
+	})
+	require.NoError(t, err)
+
+	err = wait.WatchWait(watchInterface, func(event watch.Event) (ready bool, err error) {
+		cluster := event.Object.(*provv1.Cluster)
+		if event.Type == watch.Error {
+			return false, fmt.Errorf("error: unable to delete cluster %s", cluster.ObjectMeta.Name)
+		} else if event.Type == watch.Deleted {
+			logrus.Infof("Cluster %s deleted!", cluster.ObjectMeta.Name)
+			return true, nil
+		} else if cluster == nil {
+			logrus.Infof("Cluster %s deleted!", clusterID)
+			return true, nil
+		}
+		return false, nil
+	})
+	require.NoError(t, err)
+
+	err = nodestat.AllNodeDeleted(client, clusterID)
+	require.NoError(t, err)
+}
+
 // CertRotationCompleteCheckFunc returns a watch check function that checks if the certificate rotation is complete
 func CertRotationCompleteCheckFunc(generation int64) wait.WatchCheckFunc {
 	return func(event watch.Event) (bool, error) {
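For completeness, a teardown sketch using the two new delete helpers with a shared custom timeout; the 15-minute value and cluster IDs are arbitrary examples.

package example

// Hypothetical teardown using the *WithTimeout delete helpers; not part of this diff.
import (
	"testing"

	"github.com/rancher/shepherd/clients/rancher"
	"github.com/rancher/shepherd/extensions/provisioning"
)

func exampleTeardown(t *testing.T, client *rancher.Client, rke1ClusterID, rke2ClusterID string) {
	timeoutSeconds := int64(900) // 15 minutes; example value only

	provisioning.VerifyDeleteRKE1ClusterWithTimeout(t, client, rke1ClusterID, &timeoutSeconds)
	provisioning.VerifyDeleteRKE2K3SClusterWithTimeout(t, client, rke2ClusterID, &timeoutSeconds)
}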
@@ -395,50 +644,27 @@ func VerifyUpgrade(t *testing.T, updatedCluster *bundledclusters.BundledCluster,
 }
 
 // VerifySnapshots waits for a cluster's snapshots to be ready and validates that the correct number of snapshots have been taken
-func VerifySnapshots(client *rancher.Client, localclusterID string, clusterName string, expectedSnapshotLength int, isRKE1 bool) (string, error) {
+func VerifySnapshots(client *rancher.Client, localclusterID string, clusterName string, expectedSnapshotLength int) (string, error) {
 	client, err := client.ReLogin()
 	if err != nil {
 		return "", err
 	}
 
 	var snapshotToBeRestored string
-	var snapshotList []string
-	s3Prefix := onDemandPrefix + clusterName
-
-	err = kwait.Poll(5*time.Second, defaults.FiveMinuteTimeout, func() (done bool, err error) {
-		if isRKE1 {
-			snapshotList, err = etcdsnapshot.GetRKE1Snapshots(client, clusterName)
-			if err != nil {
-				return false, err
-			}
-		} else {
-			snapshotList, err = etcdsnapshot.GetRKE2K3SSnapshots(client, localclusterID, clusterName)
-			if err != nil {
-				return false, err
-			}
-		}
+	err = kwait.Poll(5*time.Second, 5*time.Minute, func() (done bool, err error) {
+		snapshotList, err := GetSnapshots(client, localclusterID, clusterName)
+		if err != nil {
+			return false, err
+		}
 
 		if len(snapshotList) == 0 {
 			return false, fmt.Errorf("no snapshots found")
 		}
 
-		// Indexed from 0 for S3 checks to ensure that the local backup location does not have the s3Prefix.
-		// Needed to ensure that the correct S3 snapshot is restored.
-		if strings.Contains(snapshotList[0], s3Prefix) {
-			snapshotToBeRestored = snapshotList[len(snapshotList)-1]
-			return true, nil
-		}
-
 		if len(snapshotList) == expectedSnapshotLength {
 			snapshotToBeRestored = snapshotList[0]
 			return true, nil
 		}
 
-		if len(snapshotList) > expectedSnapshotLength && isRKE1 {
-			snapshotToBeRestored = snapshotList[0]
-			return true, nil
-		}
-
-		if len(snapshotList) > expectedSnapshotLength && !isRKE1 {
+		if len(snapshotList) > expectedSnapshotLength {
 			return false, fmt.Errorf("more snapshots than expected")
 		}
 
@@ -447,6 +673,26 @@ func VerifySnapshots(client *rancher.Client, localclusterID string, clusterName
 	return snapshotToBeRestored, err
 }
 
+// GetSnapshots is a helper function to get the snapshots for a cluster
+func GetSnapshots(client *rancher.Client, localclusterID string, clusterName string) ([]string, error) {
+	steveclient, err := client.Steve.ProxyDownstream(localclusterID)
+	if err != nil {
+		return nil, err
+	}
+	snapshotSteveObjList, err := steveclient.SteveType("rke.cattle.io.etcdsnapshot").List(nil)
+	if err != nil {
+		return nil, err
+	}
+	snapshots := []string{}
+	for _, snapshot := range snapshotSteveObjList.Data {
+		if strings.Contains(snapshot.ObjectMeta.Name, clusterName) {
+			snapshots = append(snapshots, snapshot.Name)
+		}
+	}
+	return snapshots, nil
+}
+
 // VerifySSHTests validates the ssh tests listed in the config on each node of the cluster
 func VerifySSHTests(t *testing.T, client *rancher.Client, clusterObject *steveV1.SteveAPIObject, sshTests []provisioninginput.SSHTestCase, clusterID string) {
 	client, err := client.ReLogin()
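VerifySnapshots no longer takes an isRKE1 flag; callers for every distro now go through GetSnapshots, which lists rke.cattle.io.etcdsnapshot objects via the downstream Steve proxy. A sketch of a caller under that assumption; the expected-count arithmetic is illustrative only.

package example

// Hypothetical caller of the reworked snapshot helpers; not part of this diff.
import (
	"github.com/rancher/shepherd/clients/rancher"
	"github.com/rancher/shepherd/extensions/provisioning"
)

func exampleSnapshots(client *rancher.Client, localClusterID, clusterName string) (string, error) {
	// Count the snapshots that already exist for this cluster.
	existing, err := provisioning.GetSnapshots(client, localClusterID, clusterName)
	if err != nil {
		return "", err
	}

	// After triggering one more snapshot, wait for it and pick the one to restore.
	return provisioning.VerifySnapshots(client, localClusterID, clusterName, len(existing)+1)
}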