Anddd we're back!
Welcome back again to my adventure series on the NATS JetStream Key/Value Store!
We're finally going to write some code and use the NATS Jetstream KV Store! At last!
Where did we leave off now?
We had just finished using Datastar to hit an endpoint on our server. Now let's use it in conjunction with NATS!
Yay! We're working with NATS again.
Yes we are, let's put it to work and check out some of it's magic in some actual code.
Our application is pretty much just NATS Jetstream doing it's thing and some UI, so let's set up NATS.
In our Router.go, we'll set up the server. Let's change our SetupRoutes()
function.
We're going to add embedded NATS into our application. For this, we're going to user a library that wraps some of the work to set NATS up. It's really simple though, we can take a quick peek into what that code is doing.
It's quite simple to do. Grab this code out and then let's talk about it.
Router.go
func SetupRoutes(ctx context.Context, logger *slog.Logger, router chi.Router) (cleanup func() error, err error) {
natsPort := 1234
log.Printf("Starting on Nats server %d", natsPort)
ns, err := embeddednats.New(ctx, embeddednats.WithNATSServerOptions(&server.Options{
JetStream: true,
Port: natsPort,
}))
if err != nil {
return nil, fmt.Errorf("error creating embedded nats server: %w", err)
}
ns.WaitForServer()
cleanup = func() error {
return errors.Join(
ns.Close(),
)
}
sessionStore := sessions.NewCookieStore([]byte("session-secret"))
sessionStore.MaxAge(int(24 * time.Hour / time.Second))
nc, err := ns.Client()
if err != nil {
err = fmt.Errorf("error creating nats client: %w", err)
return nil, err
}
js, err := jetstream.New(nc)
if err != nil {
err = fmt.Errorf("error creating nats client: %w", err)
return nil, err
}
createKeyValueBuckets := func(ctx context.Context, js jetstream.JetStream) error {
createBucket := func(bucket, desc string) error {
_, err := js.CreateOrUpdateKeyValue(ctx, jetstream.KeyValueConfig{
Bucket: bucket,
Description: desc,
Compression: true,
TTL: time.Hour,
MaxBytes: 16 * 1024 * 1024,
})
if err != nil {
return fmt.Errorf("error creating bucket %q: %w", bucket, err)
}
return nil
}
if err := createBucket("gameLobbies", "Datastar Tic Tac Toe Game"); err != nil {
return err
}
if err := createBucket("gameBoards", "Datastar Tic Tac Toe Game"); err != nil {
return err
}
if err := createBucket("users", "Datastar Tic Tac Toe Game"); err != nil {
return err
}
return nil
}
if err := createKeyValueBuckets(ctx, js); err != nil {
return cleanup, err
}
if err := errors.Join(
setupIndexRoute(router, sessionStore, js),
setupDashboardRoute(router),
setupGameRoute(router),
); err != nil {
return cleanup, fmt.Errorf("error setting up routes: %w", err)
}
return cleanup, nil
}
This package just defines an embedded NATS server that you can launch directly within your Go application. The New
function accepts various options for storing data (including where to place JetStream files) and whether to clear existing data before starting.
Internally, it configures and starts a new server.Server
from the official NATS library, then watches for a canceled context to gracefully shut everything down. A helper function WaitForServer
uses exponential backoff to confirm the NATS server is fully ready for connections, while Client
returns a nats.Conn
pointing at the server’s client URL so other parts of your application can publish or subscribe.
Not too difficult eh? If you're curious, click into the embeddednats
package and check the code out yourself. It's rather simple.
After we set up the server with JetStream support a cleanup function is defined so that shutting down the NATS server is straightforward when you’re finished using it.
Next, the code sets up a session store with a 24-hour cookie lifespan and creates a JetStream client for handling NATS-based streaming features. It then calls a helper to create any necessary Key/Value buckets.
These Buckets are for our Users, Lobbies and Games. We pass the Jetstream instance into our routes so we can leverage it.
So we have NATS set up, nice! Now we need to use it in our routes. Let's start with our Index route.
Index.go
This is where we left off with our Index route.
func setupIndexRoute(router chi.Router) error {
router.Get("/", func(w http.ResponseWriter, r *http.Request) {
pages.Index().Render(r.Context(), w)
})
router.Route("/api/index", func(indexRouter chi.Router) {
indexRouter.Get("/", func(w http.ResponseWriter, r *http.Request) {
sse := datastar.NewSSE(w, r)
sse.ExecuteScript("alert('Hello from the server!')")
})
})
return nil
}
We need to do some work, we don't want to just send an alert when we hit that endpoint, we want to log in! We also need to keep track of when someone is logged in. How can we do this?
Let's change it to look like this.
func setupIndexRoute(router chi.Router, store sessions.Store, js jetstream.JetStream) error {
router.Get("/", func(w http.ResponseWriter, r *http.Request) {
sessionID, err := getSessionId(store, r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if sessionID != "" {
http.Redirect(w, r, "/dashboard", http.StatusSeeOther)
return
}
pages.Index().Render(r.Context(), w)
})
return nil
}
So what's going on here? We now do some things immediately when we hit our Index page rather than just displaying the page.
First, we get the sessionId for this user; if the session exists (session isn't an empty string)-it means the user has already logged it. We shoot them off to the dashboard!
Otherwise, we show the Index page. Ok cool, we need to add this getSessionId
function. Let's add it to a utilities file because we will need it later!
For this we will use a cookie store with Gorilla sessions. This library just makes it easier to use cookies to keep track of user sessions.
Let's work on this.
$ touch routes/utils.go
Then copy this...
package routes
import (
"fmt"
"net/http"
"github.com/gorilla/sessions"
)
func getSessionId(store sessions.Store, r *http.Request) (string, error) {
sess, err := store.Get(r, "connections")
if err != nil {
return "", fmt.Errorf("failed to get session: %w", err)
}
id, ok := sess.Values["id"].(string)
if !ok || id == "" {
return "", nil
}
return id, nil
}
So now we need to be able to create a session. There isn't a ton of use of getting something if it never exists. We're going to be using Datastar again to create a Login component. First, let's add some handlers. We will need a handler to get the login component and a handler that the login component will use to log in.
Aiming towards Loggin' in...
Let's get the Login component first. We want to be able to grab it immediately upon landing on the Index page. There's many ways to skin a cat (sorry cat lovers, I am also a friend of cats but... it's just a figure of speech!) and we're going to do it in a way that shows off Datastar.
Now remember, when we render our Index page, we have a Datastar data-attribute that will hit our /api/index
endpoint on load. Let's add this endpoint and the necessities to show our Login component.
We will need to create a struct to hold input when a user types their name. Let's create a utility file to hold these component structs.
$ touch web/components/utils.go
Then add this...
utils.go
package components
type InlineValidationUser struct {
Name string `json:"name"`
}
routes/Index.go
package routes
import (
"net/http"
"github.com/go-chi/chi/v5"
"github.com/gorilla/sessions"
"github.com/nats-io/nats.go/jetstream"
"github.com/rphumulock/ds_nats_ttt/web/components"
"github.com/rphumulock/ds_nats_ttt/web/pages"
datastar "github.com/starfederation/datastar/sdk/go"
)
func setupIndexRoute(router chi.Router, store sessions.Store, js jetstream.JetStream) error {
userValidation := func(u *components.InlineValidationUser) bool {
return len(u.Name) >= 2
}
loadInlineUser := func(r *http.Request) (*components.InlineValidationUser, error) {
inlineUser := &components.InlineValidationUser{}
if err := datastar.ReadSignals(r, inlineUser); err != nil {
return nil, err
}
return inlineUser, nil
}
handleGetLoginComponent := func(w http.ResponseWriter, r *http.Request) {
inlineUser, err := loadInlineUser(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
sse := datastar.NewSSE(w, r)
isNameValid := userValidation(inlineUser)
sse.MergeFragmentTempl(
components.InlineValidationUserComponent(inlineUser, isNameValid),
)
}
router.Get("/", func(w http.ResponseWriter, r *http.Request) {
sessionID, err := getSessionId(store, r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if sessionID != "" {
http.Redirect(w, r, "/dashboard", http.StatusSeeOther)
return
}
pages.Index().Render(r.Context(), w)
})
router.Route("/api/index", func(indexRouter chi.Router) {
indexRouter.Get("/", handleGetLoginComponent)
})
return nil
}
Let's break this down. First we hit the Index page. We immediately hit our GET /api/index
endpoint. This will call our handleGetLoginComponent
function. In that function, we call loadInlineUser
.
On requests from Datastar, we will send whatever signals are created in it's store. Since we haven't created any it will just continue on with an empty string. So our InlineUserValidation struct will just have an empty name.
Back in handleGetLoginComponent
, we wrap the resonse to be a Datastar SSE response and then we validate on the User struct by calling userValidation
. As you can see, our name is empty, so it will fail the validation. We then pass this into a component we will create so that Datastar can send it back to the front!
Because we put an id="login"
on our div, Datastar will default to targetting the div that called the endpoint. You can also specify another element if you wished by supplying it's selector. How convenient!
Login Component
Let's make the Login component now. Since this is on our Index page, let's make the component for Index.templ
. If we needed other components, we could add them to this file as well.
$ touch web/components/Index.templ
Then add this...
package components
import (
"fmt"
datastar "github.com/starfederation/datastar/sdk/go"
)
templ inlineValidationFieldComponent(label, field string, isValid bool, isNotValidErrorLabelFmt string, labelArgs ...any) {
<div class="form-control">
<label class="label">
<span class="label-text">{ label }</span>
</label>
<input
class={ "input input-bordered text-accent rounded-none", templ.KV("input-error", !isValid) }
data-bind={ field }
data-on-keydown__debounce.500ms={ datastar.GetSSE("/api/index") }
/>
if !isValid {
<label class="text-sm font-bold text-error">{ fmt.Sprintf( isNotValidErrorLabelFmt, labelArgs...) }</label>
}
</div>
}
templ InlineValidationUserComponent(u *InlineValidationUser, isNameValid bool) {
<div id="login" data-signals__ifmissing={ templ.JSONString(u) }>
<h1 class="text-4xl font-bold text-accent tracking-wide text-center">
Ready to Play?
</h1>
<div class="flex flex-col gap-4">
@inlineValidationFieldComponent("Enter Your Name:", "name", isNameValid, "Name must be at least 2 characters.")
<button
disabled?={ !isNameValid }
class="btn btn-secondary w-full"
data-on-click={ datastar.PostSSE("api/index/login") }
>
Login
</button>
</div>
</div>
}
InlineValidationUserComponent
is our main wrapper. It takes the InlineValidationUser
struct that we read in and creates a signal for it.
<div id="login" data-signals__ifmissing={ templ.JSONString(u) }>
This is so Datastar has something to work with. Check out how this works here.
If the name is valid, you will be able to click the button to hit our login endpoint (we haven't made that yet!). The button uses a data-on-click
data-attribute and will do a POST
to our endpoint. Datastar signals will be sent to the endpoint with our request.
inlineValidationFieldComponent
is a helper component that constructs the actual form portion.
<input
class={ "input input-bordered text-accent rounded-none", templ.KV("input-error", !isValid) }
data-bind={ field }
/>
data-on-keydown__debounce.500ms={ datastar.GetSSE("/api/index") }
if !isValid {
<label class="text-sm font-bold text-error">{ fmt.Sprintf(isNotValidErrorLabelFmt, labelArgs...) }</label>
}
The data-bind
attribute links the input value to our Datastar signal, which we've designated as "Name." This establishes a binding between the provided Name
field and the Datastar signals. Additionally, we attach an event listener that triggers when a user types into the input field—debounced to prevent excessive updates with every keystroke. Validation is handled through a boolean parameter passed to the component, ensuring that if the input is invalid, the UI reflects it accordingly.
In the end Datastar will send down our component as an HTML fragment that gets merged into the DOM.
sse.MergeFragmentTempl(components.InlineValidationUserComponent(inlineUser, isNameValid),
Go ahead and check the page out now localhost:8080. You should see a new Login component!
Now all we have to do is write our actual login endpoint that is hit when we click our Login button.
Finally logging in...
We're so close!! This is a part that involves NATS. We're going to add a little bit more to our setupIndexRoute()
function.
Here is the finished product.
func setupIndexRoute(router chi.Router, store sessions.Store, js jetstream.JetStream) error {
ctx := context.Background()
usersKV, err := js.KeyValue(ctx, "users")
if err != nil {
return fmt.Errorf("failed to get 'users' KV bucket: %w", err)
}
saveUser := func(ctx context.Context, user *components.User) error {
data, err := json.Marshal(user)
if err != nil {
return fmt.Errorf("failed to marshal user: %w", err)
}
if _, err := usersKV.Put(ctx, user.SessionId, data); err != nil {
return fmt.Errorf("failed to store user in KV: %w", err)
}
return nil
}
userSession := func(w http.ResponseWriter, r *http.Request, inlineUser *components.InlineValidationUser) (*components.User, error) {
sessCtx := r.Context()
sessionID, err := createSessionId(store, r, w)
if err != nil {
return nil, fmt.Errorf("failed to get session id: %w", err)
}
user := &components.User{
SessionId: sessionID,
Name: inlineUser.Name,
}
if err := saveUser(sessCtx, user); err != nil {
return nil, fmt.Errorf("failed to save user: %w", err)
}
return user, nil
}
userValidation := func(u *components.InlineValidationUser) bool {
return len(u.Name) >= 2
}
loadInlineUser := func(r *http.Request) (*components.InlineValidationUser, error) {
inlineUser := &components.InlineValidationUser{}
if err := datastar.ReadSignals(r, inlineUser); err != nil {
return nil, err
}
return inlineUser, nil
}
handleGetLoginComponent := func(w http.ResponseWriter, r *http.Request) {
inlineUser, err := loadInlineUser(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
sse := datastar.NewSSE(w, r)
isNameValid := userValidation(inlineUser)
sse.MergeFragmentTempl(
components.InlineValidationUserComponent(inlineUser, isNameValid),
)
}
handlePostLogin := func(w http.ResponseWriter, r *http.Request) {
inlineUser, err := loadInlineUser(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if _, err := userSession(w, r, inlineUser); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
sse := datastar.NewSSE(w, r)
sse.Redirect("/dashboard")
}
router.Get("/", func(w http.ResponseWriter, r *http.Request) {
sessionID, err := getSessionId(store, r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if sessionID != "" {
http.Redirect(w, r, "/dashboard", http.StatusSeeOther)
return
}
pages.Index().Render(r.Context(), w)
})
router.Route("/api/index", func(indexRouter chi.Router) {
indexRouter.Get("/", handleGetLoginComponent)
indexRouter.Post("/login", handlePostLogin)
})
return nil
}
Let's go over it. When we hit our /login
endpoint, we call handlePostLogin()
. This function will read in the input for the user name similar to what we were doing before (it's calling the same loadInlineUser()
function. Then it will call our userSession()
function.
Here is where we will actually create a user session. We need to add another function to do this. In our routes/utils.go
file, add this...
func createSessionId(store sessions.Store, r *http.Request, w http.ResponseWriter) (string, error) {
sess, err := store.Get(r, "connections")
if err != nil {
return "", fmt.Errorf("failed to get session: %w", err)
}
id := toolbelt.NextEncodedID()
sess.Values["id"] = id
if err := sess.Save(r, w); err != nil {
return "", fmt.Errorf("failed to save session: %w", err)
}
return id, nil
}
Using Gorilla sessions, we will create a new session that is essentially just a random id for the user. Then we instantiate a User struct with this. Let's add that in now. Add this to our web/components.utils.go
file.
type User struct {
Name string `json:"name"`
SessionId string `json:"session_id"`
}
Once we create the User, we pass it to saveUser()
.
userSession := func(w http.ResponseWriter, r *http.Request, inlineUser *components.InlineValidationUser) (*components.User, error) {
sessCtx := r.Context()
sessionID, err := createSessionId(store, r, w)
if err != nil {
return nil, fmt.Errorf("failed to get session id: %w", err)
}
user := &components.User{
SessionId: sessionID,
Name: inlineUser.Name,
}
if err := saveUser(sessCtx, user); err != nil {
return nil, fmt.Errorf("failed to save user: %w", err)
}
return user, nil
}
And here is saveUser()
. We're using the Jetstream KV Store!! Woo! We're just marshalling the User struct and saving it to the store!
saveUser := func(ctx context.Context, user *components.User) error {
data, err := json.Marshal(user)
if err != nil {
return fmt.Errorf("failed to marshal user: %w", err)
}
if _, err := usersKV.Put(ctx, user.SessionId, data); err != nil {
return fmt.Errorf("failed to store user in KV: %w", err)
}
return nil
}
Heck yeah. When that's all done Datastar finishes up by sending us off to the Dashboard, because we're logged in!
sse := datastar.NewSSE(w, r)
sse.Redirect("/dashboard")
Nice! Go ahead and check it out!
That was a bit of work but in the end isn't much code when you look at it. It also does validation! We won't need to have front-end and back-end validation that needs to sync like traditional SPA applications (which can be a pain on occasion if you've dealt with it).
Onto the Dashboard in Part 10!
Top comments (0)