Stay Informed with Kubernetes Informers

By Robert Ross on 5/1/2019

FireHydrant has a changelog feature with a Kubernetes integration. Building this integration was challenging: clear guidance on creating an event-oriented system with the Kubernetes client-go project was harder to find than I had hoped.

To get started, I explored using the simple watcher interface that is baked into the Go package. One of the problems with this interface, however, is that you end up rolling your own event-type routing and indexing. This is where the Informer types in the cache package come in. So let's build a simple tool that informs us of Kubernetes resource updates!

The Cache Package

The client-go package comes with a subpackage that makes receiving events easy: k8s.io/client-go/tools/cache. It lets us register functions that are called when certain events come in, and it also keeps an in-memory copy of every object it watches, known as a Store.

While the cache package provides the tools we need, initializing and using it directly is cumbersome when all you want is simple updates from the Kubernetes API. For reasons I don't fully understand, almost every blog post I found uses this package as is.

However, there’s another package that ties the concepts from the cache package together: k8s.io/client-go/informers. It comes with a factory for all Kubernetes resources whose methods nearly mirror the kubernetes.Interface type. Let's get to some code:

package main

import (
	"os"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	kubeconfig := os.Getenv("KUBECONFIG")
	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		panic(err.Error())
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}
	// clientset is used in the next step to build the informer factory;
	// the blank assignment keeps this snippet compiling on its own.
	_ = clientset
}

In this example, we’re setting up a Kubernetes client using the kubeconfig file our KUBECONFIG environment variable points to. This gives us the kubernetes.Interface implementation that an informers factory requires.
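As an aside, if this program were to run inside the cluster itself, a common variant is to fall back to the in-cluster service-account configuration. This is a sketch of my own, not part of the original example; it assumes the k8s.io/client-go/rest package:

config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
if err != nil {
	// Hypothetical fallback: use the Pod's service-account credentials
	// when no kubeconfig is available (requires k8s.io/client-go/rest).
	config, err = rest.InClusterConfig()
	if err != nil {
		panic(err.Error())
	}
}

From here, let's initialize an informer factory: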

factory := informers.NewSharedInformerFactory(clientset, 0)

This call returns a type called SharedInformerFactory (documented in client-go's godoc). The first argument is our client, which connects to and interacts with the Kubernetes API. The second argument is how often this informer should perform a resync: it will re-list all resources and rehydrate the informer's store. Resyncing is useful because it gives a stronger guarantee that your informer's store holds a complete picture of the resources it is watching; there are situations where events can be missed entirely, and resyncing every so often recovers from that. Setting it to 0 disables resync.
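For instance, here's a sketch (not part of the final program) of a factory that resyncs every ten minutes, plus one scoped to a single namespace via the factory options available in newer client-go versions:

// Hedged sketch: resync every 10 minutes (needs the time package).
factory := informers.NewSharedInformerFactory(clientset, 10*time.Minute)

// Or, with newer client-go versions, watch only one namespace:
factory = informers.NewSharedInformerFactoryWithOptions(
	clientset, 0, informers.WithNamespace("kube-system"))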

This is great because now we can very easily start receiving events for resources we care about. Using the appropriate API Group method, let’s start receiving Add events for Pods.

factory := informers.NewSharedInformerFactory(clientset, 0)
informer := factory.Core().V1().Pods().Informer()
stopper := make(chan struct{})
defer close(stopper)
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		// "k8s.io/apimachinery/pkg/apis/meta/v1" provides an Object
		// interface that allows us to get metadata easily
		mObj := obj.(v1.Object)
		log.Printf("New Pod Added to Store: %s", mObj.GetName())
	},
})
informer.Run(stopper)

Let’s break this code down. A few things are still missing before this is "production" ready, but as written it already lists every Pod in our cluster and prints any new ones created, across all namespaces.

informer := factory.Core().V1().Pods().Informer()
stopper := make(chan struct{})
defer close(stopper)

The first line initializes an informer for Pods in the core/v1 API group. The next two lines create a channel that we pass to the informer when we tell it to run, and defer closing it; closing this channel stops the informer’s run loop, which is watching and listing the Pod resource.
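Nothing in this walkthrough closes the channel explicitly, but a variant worth sketching (my addition, not part of the original example) is to close it on SIGINT/SIGTERM so the informer shuts down cleanly. Note this replaces the deferred close above, to avoid closing the channel twice:

// Hypothetical shutdown handling, using os/signal and syscall.
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
	<-sigs
	close(stopper) // unblocks informer.Run
}()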

informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		// "k8s.io/apimachinery/pkg/apis/meta/v1" provides an Object
		// interface that allows us to get metadata easily
		mObj := obj.(v1.Object)
		log.Printf("New Pod Added to Store: %s", mObj.GetName())
	},
})

In this block, we’re adding handler functions (just the AddFunc one for now). This function is called every time an object is added to our informer's underlying Store. The parameter in our case will always be a Pod, but it arrives as an interface{}, so we need to convert it.

Most of the time your AddFunc will just push events onto a work queue (more on this later), so you're usually not concerned with the actual type.
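As a preview, here's a minimal sketch of that pattern using k8s.io/client-go/util/workqueue; the follow-up post builds this out properly:

// Sketch: enqueue a small "namespace/name" key instead of acting on
// the object inside the handler.
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
			queue.Add(key)
		}
	},
})

In this post, though, I just want to log the name of the Pod that was added to our store.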

I like using the k8s.io/apimachinery/pkg/apis/meta/v1 package for this because it provides an Object interface that makes retrieving metadata from any Kubernetes object easy. Methods such as GetName() and GetNamespace() now work for everything this function receives.
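The same ResourceEventHandlerFuncs struct also accepts UpdateFunc and DeleteFunc, so those metadata accessors work across the whole lifecycle. A quick sketch:

informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	UpdateFunc: func(oldObj, newObj interface{}) {
		o := newObj.(v1.Object)
		log.Printf("Pod Updated: %s/%s", o.GetNamespace(), o.GetName())
	},
	DeleteFunc: func(obj interface{}) {
		// Deletes can deliver a cache.DeletedFinalStateUnknown tombstone
		// instead of the object, so guard the type assertion.
		if o, ok := obj.(v1.Object); ok {
			log.Printf("Pod Deleted: %s/%s", o.GetNamespace(), o.GetName())
		}
	},
})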

informer.Run(stopper)

Lastly, we tell our informer to run. This starts the run loop, which blocks until our stopper channel is closed. Since Run never returns on its own, the deferred close(stopper) only fires when the process exits.
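If you grow beyond a single informer, the factory can manage the run loops for you. A sketch of that alternative, using the factory's Start method and the cache package's sync helper:

factory.Start(stopper)
if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
	log.Fatal("informer cache failed to sync")
}
// Handlers now fire as events arrive; block until the channel closes.
<-stopper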

The entirety of the file looks like this now:

// main.go
package main

import (
	"log"
	"os"

	"k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	kubeconfig := os.Getenv("KUBECONFIG")
	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		panic(err.Error())
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}
	factory := informers.NewSharedInformerFactory(clientset, 0)
	informer := factory.Core().V1().Pods().Informer()
	stopper := make(chan struct{})
	defer close(stopper)
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			// "k8s.io/apimachinery/pkg/apis/meta/v1" provides an Object
			// interface that allows us to get metadata easily
			mObj := obj.(v1.Object)
			log.Printf("New Pod Added to Store: %s", mObj.GetName())
		},
	})
	informer.Run(stopper)
}

So far our project connects to a Kubernetes cluster, sets up an informer for Pods in all namespaces, and starts the informer run loop. The program prints every Pod added to the cluster, including the initial batch of existing Pods as the Store first syncs.

$ go run main.go
2018/08/13 19:23:19 New Pod Added to Store: firehydrant-k8s-changelog-7846b88785-68tqs
2018/08/13 19:23:19 New Pod Added to Store: kube-dns-86f4d74b45-2fxw2
2018/08/13 19:23:19 New Pod Added to Store: kube-scheduler-minikube
2018/08/13 19:23:19 New Pod Added to Store: storage-provisioner
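
Because the informer's Store is a synced local cache, you can also read objects back without touching the API server via the factory's Lister. A sketch (assumes the k8s.io/apimachinery/pkg/labels import):

podLister := factory.Core().V1().Pods().Lister()
pods, err := podLister.Pods("kube-system").List(labels.Everything())
if err == nil {
	log.Printf("kube-system has %d Pods in the local store", len(pods))
}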

What’s Next?

In the next blog post we’ll start hardening this implementation (this is not production-ready) by adding a work queue and better control flow. I hope this intro was helpful! Stay tuned for the follow-up.

Final result is here: https://github.com/firehydrant-io/blog-stay-informed.
