Recently I became obsessed with learning Rust. While there is plenty of documentation around, like the Rust Book, some good published books, tutorials, and Rustlings, I always struggle to learn something unless I am doing something practical, so I decided to go back to my comfort zone and write a Kubernetes controller using kube-rs.
This article intends to help people with some basic knowledge of Rust and Kubernetes controllers written in Go to build a simple controller and understand a bit more how it works. I won't cover here how to install Rust, or how to manage Cargo, or even details of dependencies like tokio. The goal is to build a simple controller :)
If you have any questions along the lines of "why is this line required", I highly recommend pasting the question into ChatGPT and asking for an explanation. It may make things clearer! An example: https://chatgpt.com/share/67c78315-4d30-8002-8aa1-731171fc533f
Getting a Kubernetes client
The first step in writing a controller is to bootstrap a Kubernetes client. We need to add kube-rs with the client feature, plus the Kubernetes OpenAPI types (k8s-openapi), to our program.
cargo add kube -F client
cargo add k8s-openapi -F latest
After that, we can create a program that connects to our Kubernetes cluster using the current Kubernetes context.
use kube::Client;

fn main() {
    let client = Client::try_default();
}
But if you try to run this program with cargo run, it is going to be a no-op.
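The program does nothing because Rust futures are lazy: calling an async function such as Client::try_default() only builds a future, and nothing runs until something polls it. Here is a minimal sketch of that behavior using only the standard library. The names fake_try_default, NoopWaker and lazy_future_demo are made up for this illustration and are not part of kube-rs, and polling by hand is only for demonstration, since in a real program a runtime does it for us.

```rust
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough to poll a future by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for `Client::try_default()`: it records that it ran.
async fn fake_try_default(ran: &AtomicBool) -> &'static str {
    ran.store(true, Ordering::SeqCst);
    "client"
}

/// Returns whether the async body had run (before polling, after polling).
fn lazy_future_demo() -> (bool, bool) {
    let ran = AtomicBool::new(false);

    // Calling an async fn only builds the future; the body has not run yet.
    let future = fake_try_default(&ran);
    let before = ran.load(Ordering::SeqCst);

    // Polling is what drives the future. A runtime normally does this for us.
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let result = future.as_mut().poll(&mut cx);
    assert!(matches!(result, Poll::Ready("client")));

    let after = ran.load(Ordering::SeqCst);
    (before, after)
}
```

Calling lazy_future_demo() returns (false, true): the body had not run right after the call, and it had run after the first poll.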
Because Kubernetes operations are network operations, kube-rs exposes all of them as async operations, and we need a runtime to drive them. The best-known and most widely used option is tokio. Adding tokio with the "macros" feature (to use macros like #[tokio::main]) and a runtime feature (rt-multi-thread) should be enough. Additionally, we need to return an error from this function, so we will use the anyhow crate, which allows us to return errors without worrying about the error type:
cargo add tokio -F macros,rt-multi-thread
cargo add anyhow
And then we update the program as:
use kube::Client;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let client = Client::try_default()
        .await
        .expect("error getting a Kubernetes client");
    Ok(())
}
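As a side note on "not worrying about the error type": the idea can be sketched with only the standard library, where Box<dyn Error> plays a role similar to the one anyhow::Error plays in the program above. ConnectError, connect and run below are hypothetical names for illustration only, and this is an analogy rather than how anyhow is implemented.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical error type, standing in for something like `kube::Error`.
#[derive(Debug)]
struct ConnectError;

impl fmt::Display for ConnectError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "could not reach the cluster")
    }
}

impl Error for ConnectError {}

/// Fails with our own error type when `reachable` is false.
fn connect(reachable: bool) -> Result<&'static str, ConnectError> {
    if reachable {
        Ok("client")
    } else {
        Err(ConnectError)
    }
}

/// Mixes two unrelated error types behind one boxed return type.
fn run(reachable: bool, port: &str) -> Result<u16, Box<dyn Error>> {
    let _client = connect(reachable)?; // ConnectError is converted by `?`
    let port: u16 = port.parse()?; // and so is std::num::ParseIntError
    Ok(port)
}
```

Note that the article's program still uses .expect(...) when creating the client, so a connection failure panics there instead of being returned through the Result.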
Great, our program is still a no-op, but it is now ready to use the Kubernetes client. Let's list some Services. We will need to bring the Kubernetes core types into scope, and then use the client to list them:
use k8s_openapi::api::core::v1::Service; // Add Service API to scope
use kube::{
    Client,
    api::{Api, ListParams},
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let client = Client::try_default()
        .await
        .expect("error getting a Kubernetes client");
    let service: Api<Service> = Api::all(client); // Define our service client with all namespaces on context
    let lp = ListParams::default(); // List anything. Here you can set filters like label filters
    for svc in service.list(&lp).await? {
        println!(
            "found service {}/{}",
            svc.metadata.namespace.unwrap(),
            svc.metadata.name.unwrap()
        );
    }
    Ok(()) // This is a function that returns a "Result", so we need to return something
}
Great, doing a cargo run should return all the Services of our cluster. To keep things cleaner, the body of main will be moved into a new "run" function:
use k8s_openapi::api::core::v1::Service;
use kube::{
    Client,
    api::{Api, ListParams},
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    run().await;
    Ok(())
}

pub async fn run() {
    let client = Client::try_default()
        .await
        .expect("error getting a Kubernetes client");
    let service: Api<Service> = Api::all(client);
    let lp = ListParams::default();
    for svc in service.list(&lp).await.expect("error listing services") {
        println!(
            "found service {}/{}",
            svc.metadata.namespace.unwrap(),
            svc.metadata.name.unwrap()
        );
    }
}
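Both listings above call unwrap() on metadata.namespace and metadata.name. These fields are Option<String>, so unwrap() panics if one of them is ever missing. Below is a small sketch of a non-panicking alternative. Meta and display_name are hypothetical names used only for this illustration (Meta is a trimmed-down stand-in, not the real metadata type), and the placeholder strings are an arbitrary choice.

```rust
/// Hypothetical, trimmed-down stand-in for the `metadata` field of a Service.
struct Meta {
    namespace: Option<String>,
    name: Option<String>,
}

/// Formats "namespace/name", substituting a placeholder for any missing field.
fn display_name(meta: &Meta) -> String {
    // as_deref() turns Option<String> into Option<&str> without cloning.
    let ns = meta.namespace.as_deref().unwrap_or("<no namespace>");
    let name = meta.name.as_deref().unwrap_or("<no name>");
    format!("{ns}/{name}")
}
```

For a one-off listing program unwrap() is fine in practice, but inside a long-running controller a panic is much more costly, which is why the reconciler later in this article converts the missing case into an error instead.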
Creating a "context"
Because of Rust's memory safety rules, borrow checking and variable lifetimes, we need to be sure that the client will exist for any new thread of our controller. The way to do this is to create a struct that holds a clone of our client and, eventually, any other resource we may need during reconciliation, such as a Prometheus instrumentation client.
We can add this context struct to our lib.rs to make it available to the whole crate we are writing, so we now have a program that creates a new context for every reconciler we define:
// lib.rs
use kube::Client;

// This macro makes our structure "clonable"
#[derive(Clone)]
pub struct Context {
    pub client: Client,
}

// main.rs
// we will hide main function and some imports
use my_rust_controller::Context; // my_rust_controller is my crate name. There may be a better way to do it, so feel free to comment :)

pub async fn run() {
    let client = Client::try_default()
        .await
        .expect("error getting a Kubernetes client");
    let ctx = Context {
        client: client.clone(),
    };
}
Again, this program is a no-op. What happens here is that every time run() is called, it creates a Kubernetes client and then a new context holding a clone of that client (remember, Rust lifetimes).
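To make the "shared context" idea more concrete, here is a minimal sketch using only the standard library, with threads standing in for the controller's concurrent work. FakeClient and share_context are hypothetical names for this illustration; FakeClient merely stands in for kube::Client. The point is the pattern: wrap the context in an Arc once, then hand each worker its own handle to the same data.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for `kube::Client`.
#[derive(Clone)]
struct FakeClient {
    cluster: String,
}

#[derive(Clone)]
struct Context {
    client: FakeClient,
}

/// Spawns `workers` threads that all read the same shared context.
fn share_context(workers: usize) -> Vec<String> {
    let ctx = Arc::new(Context {
        client: FakeClient {
            cluster: "kind".to_string(),
        },
    });
    let handles: Vec<_> = (0..workers)
        .map(|id| {
            // Arc::clone only bumps a reference count; the Context is not copied.
            let ctx = Arc::clone(&ctx);
            thread::spawn(move || format!("worker {id} uses {}", ctx.client.cluster))
        })
        .collect();
    // Join in spawn order so the result order is deterministic.
    handles
        .into_iter()
        .map(|h| h.join().expect("worker panicked"))
        .collect()
}
```

Each worker owns its Arc handle, so the compiler can prove the context outlives every thread without any explicit lifetime annotations.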
Creating the controller
With everything in place, we can now create a new module that will contain our controller logic. The idea of this controller is: "given a newly created Service, if its name is bad-service in namespace default, print an error".
Before anything else, we need to add the kube-rs controller runtime to our project. We will also add "thiserror" to make it easier to define errors, and "futures", which is a required dependency:
cargo add kube -F runtime
cargo add thiserror futures
Our main.rs and lib.rs will need some changes to call the reconciler and to hold an error enum:
// lib.rs
use kube::Client;
use thiserror::Error;

pub mod controller;

// This macro makes our structure "clonable"
#[derive(Clone)]
pub struct Context {
    pub client: Client,
}

#[derive(Error, Debug)]
pub enum Error {
    #[error("kube error: {0}")]
    KubeError(#[source] kube::Error),
    #[error("invalid configuration: `{0}`")]
    InvalidConfigError(String),
    #[error("Bad service to reconcile: `{0}`")]
    BadService(String),
    #[error("service listing error: {0}")]
    ServiceError(#[source] kube::Error),
}

// Result type should be used during reconciliation return
pub type Result<T, E = Error> = std::result::Result<T, E>;

// ---------------------------
// main.rs
use kube::Client;
use my_rust_controller::Context;
use my_rust_controller::controller;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    run().await;
    Ok(())
}

pub async fn run() {
    let client = Client::try_default()
        .await
        .expect("error getting a Kubernetes client");
    let ctx = Context {
        client: client.clone(),
    };
    if let Err(error) = controller::controller(ctx).await {
        println!("failed to start Service controller: {error:?}");
        std::process::exit(1);
    }
}
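For readers wondering what #[derive(Error)] saves us from writing, here is a hand-written sketch of roughly the same thing for two of the variants, using only the standard library. FakeKubeError is a hypothetical stand-in for kube::Error, and the code thiserror actually generates may differ in its details; treat this as an approximation of the idea rather than the macro's real output.

```rust
use std::error::Error as StdError;
use std::fmt;

/// Hypothetical stand-in for `kube::Error`.
#[derive(Debug)]
struct FakeKubeError;

impl fmt::Display for FakeKubeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "connection refused")
    }
}

impl StdError for FakeKubeError {}

#[derive(Debug)]
enum Error {
    KubeError(FakeKubeError),
    BadService(String),
}

// Roughly what `#[error("...")]` provides: a Display message per variant.
impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::KubeError(e) => write!(f, "kube error: {e}"),
            Error::BadService(s) => write!(f, "Bad service to reconcile: `{s}`"),
        }
    }
}

// Roughly what `#[source]` provides: the wrapped error exposed via source().
impl StdError for Error {
    fn source(&self) -> Option<&(dyn StdError + 'static)> {
        match self {
            Error::KubeError(e) => Some(e),
            Error::BadService(_) => None,
        }
    }
}
```

With four variants this boilerplate adds up quickly, which is the main reason to reach for thiserror in the library part of the project.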
Ok, now let's write our controller. The controller will mostly watch for "services" and print messages. In case the service name is "bad-service" and namespace is "default" it will return an error:
// controller.rs
use futures::StreamExt;
use k8s_openapi::api::core::v1::Service;
use kube::{
    api::{Api, ListParams},
    runtime::{Controller, controller::Action, watcher::Config},
};
use std::{sync::Arc, time::Duration};

use crate::*;

pub async fn controller(ctx: Context) -> Result<()> {
    let service = Api::<Service>::all(ctx.client.clone());
    // Fail fast if Services cannot be listed (unreachable cluster, missing permissions, ...)
    service
        .list(&ListParams::default().limit(1))
        .await
        .map_err(Error::ServiceError)?;
    // any_semantic() allows the initial list to be answered from the API server cache
    Controller::new(service, Config::default().any_semantic())
        .shutdown_on_signal() // stop gracefully on a termination signal
        .run(reconcile, error_policy, Arc::new(ctx))
        // Keep only the successful reconciliations from the result stream
        .filter_map(|x| async move { std::result::Result::ok(x) })
        // Drive the stream; the controller only makes progress while it is polled
        .for_each(|_| futures::future::ready(()))
        .await;
    Ok(())
}

pub async fn reconcile(service: Arc<Service>, _: Arc<Context>) -> Result<Action> {
    let name = service
        .metadata
        .name
        .clone()
        .ok_or(Error::InvalidConfigError("invalid name".to_string()))?;
    let ns = service
        .metadata
        .namespace
        .clone()
        .ok_or(Error::InvalidConfigError("invalid namespace".to_string()))?;
    println!("reconciling service {ns}/{name}");
    if name == "bad-service" && ns == "default" {
        return Err(Error::BadService("Service has a bad name".to_string()));
    }
    Ok(Action::requeue(Duration::from_secs(60)))
}

// error_policy is the function called when a reconciliation error happens
fn error_policy(_: Arc<Service>, error: &Error, _: Arc<Context>) -> Action {
    println!(
        "error happened during reconciliation of service svc {:?}",
        error
    );
    Action::requeue(Duration::from_secs(5 * 60)) // Requeue after 5 minutes
}
Running this program, we will get the following output:
$ kubectl create service clusterip bad-service --tcp=80
.....
$ cargo run
reconciling service kube-system/kube-dns
reconciling service default/nginx
reconciling service default/kubernetes
reconciling service default/bad-service
error happened during reconciliation of service svc BadService("Service has a bad name")
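The last line of that output comes from error_policy, which only runs when reconcile returns an error. The decision can be sketched in plain Rust as follows. Requeue and next_requeue are hypothetical names for this illustration, and the sketch deliberately ignores everything else the real Controller does (watching, scheduling, retries); it only mirrors the "reconcile, else error_policy" choice and the two requeue intervals used in the article.

```rust
use std::time::Duration;

/// Hypothetical, simplified stand-in for the runtime's `Action`.
#[derive(Debug, PartialEq)]
struct Requeue(Duration);

/// Same rule as the article's reconciler, applied to plain strings.
fn reconcile(ns: &str, name: &str) -> Result<Requeue, String> {
    if name == "bad-service" && ns == "default" {
        return Err("Service has a bad name".to_string());
    }
    Ok(Requeue(Duration::from_secs(60)))
}

/// Called only when reconcile fails; decides when to try again.
fn error_policy(_error: &str) -> Requeue {
    Requeue(Duration::from_secs(5 * 60))
}

/// One pass of the loop: run the reconciler, fall back to the error policy.
fn next_requeue(ns: &str, name: &str) -> Requeue {
    match reconcile(ns, name) {
        Ok(action) => action,
        Err(error) => error_policy(&error),
    }
}
```

In this sketch a healthy Service is revisited after 60 seconds, while default/bad-service is retried after 5 minutes, matching the durations in controller.rs.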
Conclusion (part 1)
Here we explored a very simple controller. It doesn't do anything useful other than printing messages. It doesn't even get the object to check labels, or add finalizers.
In the next parts, we will:
- Given a label, patch the service
- Add finalizers
- Add some better logging
References:
- kube-rs controller example: https://github.com/kube-rs/controller-rs/blob/main/src/lib.rs
- Kubernetes Blixt: https://github.com/kubernetes-sigs/blixt