diff --git a/extensions/kubeconfig/exec.go b/extensions/kubeconfig/exec.go index 6a6838ff..fc0763ce 100644 --- a/extensions/kubeconfig/exec.go +++ b/extensions/kubeconfig/exec.go @@ -2,6 +2,7 @@ package kubeconfig import ( "bytes" + "context" "fmt" "os" "strings" @@ -73,7 +74,7 @@ func KubectlExec(restConfig *restclient.Config, podName, namespace string, comma } logStreamer := &LogStreamer{} - err = exec.Stream(remotecommand.StreamOptions{ + err = exec.StreamWithContext(context.TODO(), remotecommand.StreamOptions{ Stdin: nil, Stdout: logStreamer, Stderr: os.Stderr, diff --git a/extensions/kubeconfig/podlogs.go b/extensions/kubeconfig/podlogs.go index 414787cb..a968788c 100644 --- a/extensions/kubeconfig/podlogs.go +++ b/extensions/kubeconfig/podlogs.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/rancher/shepherd/clients/rancher" + "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime/serializer" k8Scheme "k8s.io/client-go/kubernetes/scheme" @@ -73,6 +74,66 @@ func GetPodLogs(client *rancher.Client, clusterID string, podName string, namesp return logs, nil } +// GetPodLogsWithOpts fetches logs from a Kubernetes pod, with an optional PodLogOptions object that can be used to finetune which Logs get pulled. +// Buffer size (e.g., '64KB', '8MB', '1GB') influences log reading; an empty string causes no buffering. +func GetPodLogsWithOpts(client *rancher.Client, clusterID string, podName string, namespace string, bufferSizeStr string, opts *corev1.PodLogOptions) (string, error) { + var restConfig *restclient.Config + + kubeConfig, err := GetKubeconfig(client, clusterID) + if err != nil { + return "", err + } + + restConfig, err = (*kubeConfig).ClientConfig() + if err != nil { + return "", err + } + restConfig.ContentConfig.NegotiatedSerializer = serializer.NewCodecFactory(k8Scheme.Scheme) + restConfig.ContentConfig.GroupVersion = &podGroupVersion + restConfig.APIPath = apiPath + + restClient, err := restclient.RESTClientFor(restConfig) + if err != nil { + return "", err + } + + req := restClient.Get().Resource("pods").Name(podName).Namespace(namespace).SubResource("log") + req.VersionedParams( + opts, + k8Scheme.ParameterCodec, + ) + + stream, err := req.Stream(context.TODO()) + if err != nil { + return "", fmt.Errorf("error streaming pod logs for pod %s/%s: %v", namespace, podName, err) + } + + defer stream.Close() + + reader := bufio.NewScanner(stream) + + if bufferSizeStr != "" { + bufferSize, err := parseBufferSize(bufferSizeStr) + if err != nil { + return "", fmt.Errorf("error in parseBufferSize: %v", err) + } + + buf := make([]byte, bufferSize) + reader.Buffer(buf, bufferSize) + } + + var logs string + for reader.Scan() { + logs = logs + fmt.Sprintf("%s\n", reader.Text()) + logrus.Info(reader.Text()) + } + + if err := reader.Err(); err != nil { + return "", fmt.Errorf("error reading pod logs for pod %s/%s: %v", namespace, podName, err) + } + return logs, nil +} + // parseBufferSize is a helper function that parses a size string and returns // the equivalent size in bytes. The provided size string should end with a // suffix of 'KB', 'MB', or 'GB'. If no suffix is provided, the function will diff --git a/extensions/kubeconfig/pods.go b/extensions/kubeconfig/pods.go new file mode 100644 index 00000000..9fa836f4 --- /dev/null +++ b/extensions/kubeconfig/pods.go @@ -0,0 +1,59 @@ +package kubeconfig + +import ( + "context" + "errors" + + "github.com/rancher/shepherd/clients/rancher" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/client-go/kubernetes" + k8Scheme "k8s.io/client-go/kubernetes/scheme" +) + +// GetPods utilizes the upstream K8s corev1 client (from the rancher.Client) to get the given cluster's Pods based on any List options passed +func GetPods(client *rancher.Client, clusterID string, namespace string, listOptions *metav1.ListOptions) ([]corev1.Pod, error) { + + kubeConfig, err := GetKubeconfig(client, clusterID) + if err != nil { + return nil, err + } + + restConfig, err := (*kubeConfig).ClientConfig() + if err != nil { + return nil, err + } + restConfig.ContentConfig.NegotiatedSerializer = serializer.NewCodecFactory(k8Scheme.Scheme) + restConfig.ContentConfig.GroupVersion = &podGroupVersion + restConfig.APIPath = apiPath + + clientset, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return nil, err + } + + pods, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), *listOptions) + if err != nil { + return nil, err + } + return pods.Items, nil +} + +// GetPodNames calls GetPods and filters the list of Pods and extracts their names into a []string +func GetPodNames(client *rancher.Client, clusterID string, namespace string, listOptions *metav1.ListOptions) ([]string, error) { + pods, err := GetPods(client, clusterID, namespace, listOptions) + if err != nil { + return nil, err + } + + var names []string + for _, pod := range pods { + names = append(names, pod.Name) + } + if len(names) == 0 { + return nil, errors.New("GetPodNames: no pod names found") + } + + return names, nil +}