Apache Pulsar¶
Since testcontainers-go v0.19.0
Introduction¶
The Testcontainers module for Apache Pulsar.
Testcontainers can be used to automatically create Apache Pulsar containers without external services.
The module is based on the official Apache Pulsar Docker image, so reading the official guide first is recommended.
Adding this module to your project dependencies¶
Please run the following command to add the Apache Pulsar module to your Go dependencies:
go get github.com/testcontainers/testcontainers-go/modules/pulsar
Usage example¶
Create a Pulsar container to use in your tests:
c, err := testcontainerspulsar.RunContainer(
	ctx,
	tt.opts...,
)
require.Nil(t, err)
defer func() {
	err := c.Terminate(ctx)
	require.Nil(t, err)
}()
Here, tt.opts holds the options used to configure the container. See the Container Options section for more details.
Module Reference¶
The Pulsar module exposes one entrypoint function to create the container, and this function receives two parameters:
func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (*Container, error)
context.Context, the Go context.
testcontainers.ContainerCustomizer, a variadic argument for passing options.
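The variadic options argument follows the general shape of Go's functional options pattern. The sketch below illustrates that idea using only the standard library. The names request, customizer, withImage, withEnv, and build are hypothetical stand-ins invented for this example; they are not the module's actual types, and the real ContainerCustomizer may be defined differently.

```go
package main

import "fmt"

// request is a hypothetical stand-in for a container request.
type request struct {
	Image string
	Env   map[string]string
}

// customizer is a hypothetical stand-in for an option that mutates a request.
type customizer func(*request)

// withImage returns an option that overrides the image.
func withImage(image string) customizer {
	return func(r *request) { r.Image = image }
}

// withEnv returns an option that sets one environment variable.
func withEnv(key, value string) customizer {
	return func(r *request) { r.Env[key] = value }
}

// build starts from default values and applies the options in order,
// so later options win over earlier ones.
func build(opts ...customizer) request {
	r := request{Image: "default-image", Env: map[string]string{}}
	for _, opt := range opts {
		opt(&r)
	}
	return r
}

func main() {
	r := build(
		withImage("docker.io/apachepulsar/pulsar:2.10.2"),
		withEnv("KEY", "value"),
	)
	fmt.Println(r.Image, r.Env["KEY"])
}
```

Because the options are applied in order, callers can pass none (keeping the defaults) or any combination without the function signature changing.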
Container Options¶
When starting the Pulsar container, you can pass options in a variadic way to configure it.
Image¶
If you need to set a different Pulsar Docker image, you can use testcontainers.WithImage with a valid Docker image for Pulsar, e.g. testcontainers.WithImage("docker.io/apachepulsar/pulsar:2.10.2").
testcontainers.WithImage("docker.io/apachepulsar/pulsar:2.10.2"),
Wait Strategies¶
If you need to set a different wait strategy for Pulsar, you can use testcontainers.WithWaitStrategy with a valid wait strategy for Pulsar.
Info
The default deadline for the wait strategy is 60 seconds.
You can also set a wait strategy together with a custom deadline using testcontainers.WithWaitStrategyAndDeadline.
Docker type modifiers¶
If you need an advanced configuration for Pulsar, you can leverage the following Docker type modifiers:
testcontainers.WithConfigModifier
testcontainers.WithHostConfigModifier
testcontainers.WithEndpointSettingsModifier
Please read the Create containers: Advanced Settings documentation for more information.
testcontainers.WithConfigModifier(func(config *container.Config) {
	config.Env = append(config.Env, "PULSAR_MEM= -Xms512m -Xmx512m -XX:MaxDirectMemorySize=512m")
}),
testcontainers.WithHostConfigModifier(func(hostConfig *container.HostConfig) {
	hostConfig.Resources = container.Resources{
		Memory: 1024 * 1024 * 1024,
	}
}),
testcontainers.WithEndpointSettingsModifier(func(settings map[string]*network.EndpointSettings) {
	settings[nwName] = &network.EndpointSettings{
		Aliases: []string{"pulsar"},
	}
}),
Pulsar Configuration¶
If you need to set Pulsar configuration variables, you can use the WithPulsarEnv option to set Pulsar environment variables: the PULSAR_PREFIX_ prefix will be added automatically for you.
For example, if you want to enable brokerDeduplicationEnabled:
testcontainerspulsar.WithPulsarEnv("brokerDeduplicationEnabled", "true"),
This results in the PULSAR_PREFIX_brokerDeduplicationEnabled=true environment variable being set in the container request.
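The prefixing described above can be pictured with a small stdlib-only sketch. The helper prefixedEnv is a hypothetical name used only for illustration; it is not the module's implementation, merely a way to show how a key and value map to the resulting environment entry.

```go
package main

import "fmt"

// pulsarPrefix is the prefix named in the text above.
const pulsarPrefix = "PULSAR_PREFIX_"

// prefixedEnv is a hypothetical helper that builds the KEY=value entry
// for a Pulsar configuration variable, with the prefix prepended.
func prefixedEnv(key, value string) string {
	return fmt.Sprintf("%s%s=%s", pulsarPrefix, key, value)
}

func main() {
	fmt.Println(prefixedEnv("brokerDeduplicationEnabled", "true"))
	// prints "PULSAR_PREFIX_brokerDeduplicationEnabled=true"
}
```

The practical consequence is that callers pass the bare configuration key and should not add the prefix themselves.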
Pulsar IO¶
If you need to test the Pulsar IO framework, you can enable the Pulsar Functions Worker with the WithFunctionsWorker option:
testcontainerspulsar.WithFunctionsWorker(),
Pulsar Transactions¶
If you need to test Pulsar Transactions you can enable the transactions feature:
testcontainerspulsar.WithTransactions(),
Log consumers¶
If you need to collect the logs from the Pulsar container, you can add your own LogConsumer with the WithLogConsumers function, which accepts a variadic argument of LogConsumers.
if len(c.LogConsumers) > 0 {
	c.WithLogConsumers(ctx, tt.logConsumers...)
	defer c.StopLogProducer()
}
An example of a LogConsumer could be the following:
// testLogConsumer is a testcontainers.LogConsumer that prints the log to stdout
type testLogConsumer struct{}

// Accept prints the log to stdout
func (lc *testLogConsumer) Accept(l testcontainers.Log) {
	fmt.Print(string(l.Content))
}
Warning
You will need to explicitly stop the producer in your tests.
If you want to know more about LogConsumers, please check the Following Container Logs documentation.
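Printing to stdout is handy for debugging, but a test often wants to assert on the log content instead. The sketch below shows a consumer that collects lines in memory. To keep it runnable with only the standard library, it declares a local Log type as a stand-in with a Content byte slice, as used by the example above; the LogType field and the names collectingConsumer and Contains are assumptions made for this example.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// Log is a local stand-in for the log type passed to a consumer.
// Only Content is taken from the example above; LogType is an assumption.
type Log struct {
	LogType string
	Content []byte
}

// collectingConsumer stores each log entry so a test can inspect it later.
type collectingConsumer struct {
	mu    sync.Mutex
	lines []string
}

// Accept appends the log content; the mutex guards against concurrent calls.
func (c *collectingConsumer) Accept(l Log) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.lines = append(c.lines, string(l.Content))
}

// Contains reports whether any collected entry contains substr.
func (c *collectingConsumer) Contains(substr string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, line := range c.lines {
		if strings.Contains(line, substr) {
			return true
		}
	}
	return false
}

func main() {
	c := &collectingConsumer{}
	c.Accept(Log{LogType: "STDOUT", Content: []byte("broker started\n")})
	fmt.Println(c.Contains("started"))
}
```

In a real test, the Accept method would take the library's log type instead of the local stand-in.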
Container methods¶
Once you have a Pulsar container, you can retrieve the broker URL and the admin URL:
Admin URL¶
serviceURL, err := c.HTTPServiceURL(ctx)
require.Nil(t, err)
Broker URL¶
brokerURL, err := c.BrokerURL(ctx)
require.Nil(t, err)
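When a client or tool needs the host and port separately, the returned URLs can be split with the standard library. The sketch below assumes the URLs have the form scheme://host:port; the sample value pulsar://localhost:6650 and the helper hostPort are illustrative only, and the real value comes from c.BrokerURL(ctx) or c.HTTPServiceURL(ctx).

```go
package main

import (
	"fmt"
	"net/url"
)

// hostPort is a hypothetical helper that splits a service URL into its
// scheme, host name, and port.
func hostPort(raw string) (scheme, host, port string, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", "", err
	}
	return u.Scheme, u.Hostname(), u.Port(), nil
}

func main() {
	// Illustrative value only; a real container exposes a mapped port.
	scheme, host, port, err := hostPort("pulsar://localhost:6650")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(scheme, host, port)
}
```

Parsing the URL rather than hardcoding a port keeps tests working when the container's port mapping changes between runs.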