Skip to content

Nats

Introduction

The Testcontainers module for NATS.

Adding this module to your project dependencies

Run the following command to add the NATS module to your Go dependencies:

go get github.com/testcontainers/testcontainers-go/examples/nats

Usage example

package nats

import (
    "context"
    "fmt"

    "github.com/testcontainers/testcontainers-go"
    "github.com/testcontainers/testcontainers-go/wait"
)

// natsContainer represents the nats container type used in the module.
// It embeds testcontainers.Container, so the embedded container's methods
// (such as Terminate) can be called directly on a natsContainer.
type natsContainer struct {
    testcontainers.Container
    // URI is the client connection string; runContainer sets it to
    // "nats://<host>:<port>" using the host port mapped to 4222/tcp.
    URI string
}

// runContainer starts a nats container and returns a handle carrying its
// client URI.
//
// The container is requested with the "nats:2.9" image, the "-DV" and "-js"
// command flags, and the 4222, 6222 and 8222 TCP ports exposed. It is treated
// as ready once the server logs that it is listening for client connections
// on 0.0.0.0:4222. Every customizer in opts is applied to the request, in
// order, before the container is created, so options may override any of
// these defaults.
//
// On success the returned natsContainer wraps the started container and its
// URI field holds "nats://<host>:<port>" for the host port mapped to 4222/tcp.
// If the URI cannot be resolved after the container has been created, the
// container is terminated before the error is returned: the caller receives
// no handle in that case and could not clean it up itself.
func runContainer(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (*natsContainer, error) {
    req := testcontainers.ContainerRequest{
        Image:        "nats:2.9",
        ExposedPorts: []string{"4222/tcp", "6222/tcp", "8222/tcp"},
        Cmd:          []string{"-DV", "-js"},
        WaitingFor:   wait.ForLog("Listening for client connections on 0.0.0.0:4222"),
    }

    genericContainerReq := testcontainers.GenericContainerRequest{
        ContainerRequest: req,
        Started:          true,
    }

    for _, opt := range opts {
        opt.Customize(&genericContainerReq)
    }

    container, err := testcontainers.GenericContainer(ctx, genericContainerReq)
    if err != nil {
        return nil, err
    }

    uri, err := natsURI(ctx, container)
    if err != nil {
        // The container exists but is unusable without a URI; stop it here so
        // it does not outlive this call, and report a cleanup failure too.
        if termErr := container.Terminate(ctx); termErr != nil {
            return nil, fmt.Errorf("%w (terminating the container also failed: %v)", err, termErr)
        }
        return nil, err
    }

    return &natsContainer{Container: container, URI: uri}, nil
}

// natsURI builds the "nats://<host>:<port>" client URI for the given
// container from its host and the host port mapped to 4222/tcp.
func natsURI(ctx context.Context, container testcontainers.Container) (string, error) {
    mappedPort, err := container.MappedPort(ctx, "4222/tcp")
    if err != nil {
        return "", fmt.Errorf("resolving mapped port for 4222/tcp: %w", err)
    }

    hostIP, err := container.Host(ctx)
    if err != nil {
        return "", fmt.Errorf("resolving container host: %w", err)
    }

    return fmt.Sprintf("nats://%s:%s", hostIP, mappedPort.Port()), nil
}
package nats

import (
    "context"
    "testing"
    "time"

    "github.com/nats-io/nats.go"
)

// TestNats starts a nats container through runContainer, connects to it using
// the container's URI, and checks a JetStream round trip: it adds a "hello"
// stream, subscribes to the "hello" subject, publishes one message and
// expects to receive the same payload back. The container is terminated when
// the test finishes.
func TestNats(t *testing.T) {
    ctx := context.Background()

    container, err := runContainer(ctx)
    if err != nil {
        t.Fatal(err)
    }

    // Clean up the container after the test is complete
    t.Cleanup(func() {
        if err := container.Terminate(ctx); err != nil {
            t.Fatalf("failed to terminate container: %s", err)
        }
    })

    // perform assertions
    nc, err := nats.Connect(container.URI)
    if err != nil {
        t.Fatalf("failed to connect to nats: %s", err)
    }
    defer nc.Close()

    js, err := nc.JetStream()
    if err != nil {
        t.Fatalf("failed to create jetstream context: %s", err)
    }

    // add stream to nats
    if _, err = js.AddStream(&nats.StreamConfig{
        Name:     "hello",
        Subjects: []string{"hello"},
    }); err != nil {
        t.Fatalf("failed to add stream: %s", err)
    }

    // add subscriber to nats
    sub, err := js.SubscribeSync("hello", nats.Durable("worker"))
    if err != nil {
        t.Fatalf("failed to subscribe to hello: %s", err)
    }

    // publish a message to nats
    if _, err = js.Publish("hello", []byte("hello")); err != nil {
        t.Fatalf("failed to publish hello: %s", err)
    }

    // Wait for the message to be received. The background context never
    // expires, so bound the wait: a message that never arrives should fail
    // the test promptly instead of blocking until the go test timeout.
    recvCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
    defer cancel()

    msg, err := sub.NextMsgWithContext(recvCtx)
    if err != nil {
        t.Fatalf("failed to get message: %s", err)
    }

    if string(msg.Data) != "hello" {
        t.Fatalf("expected message to be 'hello', got '%s'", msg.Data)
    }
}