Skip to content

Commit

Permalink
feat: adds per-pod/container colorization. Some cleanup of output and…
Browse files Browse the repository at this point in the history
… error handling.
  • Loading branch information
atombender committed Aug 16, 2022
1 parent 7379be7 commit 838ad90
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 41 deletions.
29 changes: 29 additions & 0 deletions colorization.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package main

import (
"hash/fnv"

"github.com/fatih/color"
)

type colorConfig struct {
labels *color.Color
metadata *color.Color
}

var colorConfigs = []colorConfig{
{color.New(color.FgHiBlue), color.New(color.FgBlue)},
{color.New(color.FgHiCyan), color.New(color.FgCyan)},
{color.New(color.FgHiGreen), color.New(color.FgGreen)},
{color.New(color.FgHiMagenta), color.New(color.FgMagenta)},
{color.New(color.FgHiRed), color.New(color.FgRed)},
{color.New(color.FgHiYellow), color.New(color.FgYellow)},
}

func getColorConfig(parts ...string) colorConfig {
hash := fnv.New32()
for _, a := range parts {
_, _ = hash.Write([]byte(a))
}
return colorConfigs[hash.Sum32()%uint32(len(colorConfigs))]
}
9 changes: 7 additions & 2 deletions controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewController(
}
}

func (ctl *Controller) Run() {
func (ctl *Controller) Run(ctx context.Context) error {
podListWatcher := cache.NewListWatchFromClient(
ctl.client.CoreV1().RESTClient(), "pods", ctl.Namespace, fields.Everything())

Expand Down Expand Up @@ -106,7 +106,12 @@ func (ctl *Controller) Run() {

stopCh := make(chan struct{}, 1)
go informer.Run(stopCh)
<-stopCh
select {
case <-stopCh:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

func (ctl *Controller) onInitialAdd(pod *v1.Pod) {
Expand Down
147 changes: 108 additions & 39 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package main

import (
"bytes"
"context"
"errors"
"fmt"
"os"
"regexp"
"sync"
"text/template"
"time"

"github.com/fatih/color"
"github.com/spf13/pflag"
Expand All @@ -32,6 +36,7 @@ func main() {
showVersion bool
includePatterns []*regexp.Regexp
excludePatternStrings []string
noColor bool
)

flags := pflag.NewFlagSet("ktail", pflag.ExitOnError)
Expand All @@ -57,12 +62,15 @@ func main() {
flags.BoolVarP(&sinceStart, "since-start", "s", false,
"Start reading log from the beginning of the container's lifetime.")
flags.BoolVarP(&showVersion, "version", "", false, "Show version.")
flags.BoolVarP(&noColor, "no-color", "", false, "Disable color.")

if err := flags.Parse(os.Args[1:]); err != nil {
fail(err.Error())
os.Exit(1)
}

color.NoColor = noColor

if showVersion {
fmt.Printf("ktail %s\n", version)
os.Exit(0)
Expand All @@ -85,21 +93,6 @@ func main() {
includePatterns = append(includePatterns, r)
}

if tmplString == "" {
if raw {
tmplString = `{{.Message}}`
} else {
tmplString = "{{.Pod.Name}}:{{.Container.Name}} {{.Message}}"
if allNamespaces {
tmplString = "{{.Pod.Namespace}}/" + tmplString
}
}
if timestamps {
tmplString = "{{.Timestamp}} " + tmplString
}
}
tmplString += "\n"

if kubeconfigPath == "" {
if os.Getenv("KUBECONFIG") != "" {
kubeconfigPath = os.Getenv("KUBECONFIG")
Expand All @@ -120,11 +113,6 @@ func main() {
inclusionMatcher := buildMatcher(includePatterns, labelSelector, true)
exclusionMatcher := buildMatcher(excludePatterns, nil, false)

tmpl, err := template.New("line").Parse(tmplString)
if err != nil {
fail("Invalid template: %s", err)
}

clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{
ExplicitPath: kubeconfigPath,
Expand Down Expand Up @@ -158,8 +146,14 @@ func main() {
}
}

yellow := color.New(color.FgYellow)
red := color.New(color.FgRed)
var tmpl *template.Template
if tmplString != "" {
var err error
tmpl, err = template.New("line").Parse(tmplString)
if err != nil {
fail("invalid template: %s", err)
}
}

formatPod := func(pod *v1.Pod) string {
if allNamespaces {
Expand All @@ -172,6 +166,59 @@ func main() {
return fmt.Sprintf("%s:%s", formatPod(pod), container.Name)
}

var printEvent func(*LogEvent) error

if tmpl != nil {
printEvent = func(event *LogEvent) error {
type templateEvent struct {
Pod *v1.Pod
Container *v1.Container
Timestamp string
Message string
}

var buf bytes.Buffer
if err := tmpl.Execute(&buf, &templateEvent{
Pod: event.Pod,
Container: event.Container,
Message: event.Message,
Timestamp: formatTimestamp(event.Timestamp),
}); err != nil {
return err
}

_, err := fmt.Fprintln(os.Stdout, string(buf.Bytes()))
return err
}
} else {
printEvent = func(event *LogEvent) error {
col := getColorConfig(event.Pod.Name, event.Container.Name)

var line string
if !raw {
if timestamps {
line = col.metadata.Sprint(formatTimestamp(event.Timestamp))
}
line += " "
if allNamespaces {
line += col.labels.Sprint(fmt.Sprintf("%s/%s:%s",
event.Pod.Namespace, event.Pod.Name, event.Container.Name))
} else {
line += col.labels.Sprint(fmt.Sprintf("%s:%s", event.Pod.Name, event.Container.Name))
}
line += " "
}

line += event.Message

_, err := fmt.Fprintln(os.Stdout, line)
return err
}
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var stdoutMutex sync.Mutex
controller := NewController(clientset, ControllerOptions{
Namespace: namespace,
Expand All @@ -183,19 +230,17 @@ func main() {
OnEvent: func(event LogEvent) {
stdoutMutex.Lock()
defer stdoutMutex.Unlock()
_ = tmpl.Execute(os.Stdout, event)
if err := printEvent(&event); err != nil {
printError(fmt.Sprintf("Could not write event: %s", err))
cancel()
}
},
OnEnter: func(
pod *v1.Pod,
container *v1.Container,
initialAddPhase bool) bool {
OnEnter: func(pod *v1.Pod, container *v1.Container, initialAddPhase bool) bool {
if !quiet {
if initialAddPhase {
_, _ = yellow.Fprintf(os.Stderr,
"==> Detected running container [%s]\n", formatPodAndContainer(pod, container))
printInfo("Attached to container [%s]", formatPodAndContainer(pod, container))
} else {
_, _ = yellow.Fprintf(os.Stderr,
"==> New container [%s]\n", formatPodAndContainer(pod, container))
printInfo("New container [%s]", formatPodAndContainer(pod, container))
}
}
return true
Expand All @@ -215,21 +260,45 @@ func main() {
break
}
}
_, _ = yellow.Fprintf(os.Stderr,
"==> Container left (%s) [%s]\n", status,
formatPodAndContainer(pod, container))
printInfo(fmt.Sprintf("Container left (%s) [%s]\n", status,
formatPodAndContainer(pod, container)))
}
},
OnError: func(pod *v1.Pod, container *v1.Container, err error) {
_, _ = red.Fprintf(os.Stderr,
"==> Warning: Error while tailing container [%s]: %s\n",
formatPodAndContainer(pod, container), err)
printError(fmt.Sprintf("Error while tailing container [%s]: %s",
formatPodAndContainer(pod, container), err))
},
})
controller.Run()

if err := controller.Run(ctx); err != nil && !errors.Is(err, context.Canceled) {
printError(err.Error())
}
}

func printInfo(format string, args ...interface{}) {
message := fmt.Sprintf(format, args...)
_, _ = fmt.Fprintf(os.Stderr, colorInfo(fmt.Sprintf("==> %s\n", message)))
}

func printError(format string, args ...interface{}) {
message := fmt.Sprintf(format, args...)
_, _ = fmt.Fprintf(os.Stderr, colorError(fmt.Sprintf("==> %s\n", message)))
}

func formatTimestamp(t *time.Time) string {
s := t.Local().Format("2006-01-02T15:04:05.999")
for len(s) < 23 {
s += "0"
}
return s
}

func fail(format string, args ...interface{}) {
fmt.Fprintf(os.Stderr, format, args...)
_, _ = fmt.Fprintf(os.Stderr, format, args...)
os.Exit(1)
}

var (
colorInfo = color.New(color.FgYellow).SprintFunc()
colorError = color.New(color.FgRed).SprintFunc()
)

0 comments on commit 838ad90

Please sign in to comment.