MQTT

MQTT support is only available in FrontLine.

Prerequisites

Gatling FrontLine MQTT DSL is not imported by default.

One has to manually add the following imports:

import io.gatling.frontline.mqtt.Predef._

MQTT Protocol

Use the mqtt object in order to create a MQTT protocol.

  • mqttVersion_3_1: enable protocol version 3.1 (default: false)
  • mqttVersion_3_1_1: enable protocol version 3.1.1 (default: true)
  • broker("hostname", port): broker address (default: localhost:1883)
  • useTls(boolean): if TLS should be enabled (default: false)
  • clientId("id"): clientIdentifier sent in the connect payload (of not set, Gatling will generate a random one)
  • cleanSession(boolean): if session should be cleaned during connect (default: true)
  • credentials("${userName}", "${password}"): optional credentials for connecting
  • keepAlive(30): connections keep alive timeout
  • qosAtMostOnce: use at-most-once QoS (default: true)
  • qosAtLeastOnce: use at-least-once QoS (default: false)
  • qosExactlyOnce: use exactly-once QoS (default: false)
  • retain(true): enable retain (default: false)
  • lastWill(LastWill("${willTopic}", StringBody("${willMessage}")).qosAtLeastOnce.retain(true)): send last will, possibly with specific QoS and retain
  • reconnectAttemptsMax(1): max number of reconnects after connection crash (default: 3)
  • reconnectDelay(1L): reconnect delay after connection crash in millis (default: 100)
  • reconnectBackoffMultiplier(1.5F): reconnect delay exponential backoff (default: 1.5)
  • resendDelay(1000): resend delay after send failure in millis (default: 5000)
  • resendBackoffMultiplier(2.0F): resend delay exponential backoff (default: 1.0)
  • timeoutCheckInterval(1 second): interval for timeout checker (default: 1 second)
  • correlateBy(check): check for pairing messages sent and messages received

MQTT Request API

Use the mqtt("requestName") method in order to create a MQTT request.

Connect

Use the connect method to connect to the MQTT broker:

mqtt("Connecting").connect

Subscribe

Use the subscribe method to subscribe to an MQTT topic:

mqtt("Subscribing")
  .subscribe("${myTopic}")
  .qosAtMostOnce // override default QoS

Publish

Use the publish method to publish a message. You can use the same Body API as for HTTP request bodies:

mqtt("Publishing")
  .publish("${myTopic}")
  .message(StringBody("${myTextPayload}"))

MQTT Checks

You can define blocking checks with await and non-blocking checks with expect. Those can be set right after subscribing, or after publishing:

// subscribe and expect to receive a message within 100ms, without blocking flow
mqtt("Subscribing").subscribe("${myTopic2}")
  .expect(100 milliseconds)
// publish and wait (block) until it receives a message withing 100ms
mqtt("Publishing").publish("${myTopic}").message(StringBody("${myPayload}"))
  .wait(100 milliseconds)

You can optionally define in which topic the expected message will be received:

.wait(100 milliseconds, "repub/${myTopic}")

You can optionally define check criteria to be applied on the matching received message:

mqtt("Publishing")
  .publish("${myTopic}").message(StringBody("${myPayload}"))
  .wait(100 milliseconds).check(jsonPath("$.error").notExists)

You can use waitForMessages and block for all pending non-blocking checks:

exec(waitForMessages.timeout(100 milliseconds))

Example

import scala.concurrent.duration._
import io.gatling.core.Predef._
import io.gatling.frontline.mqtt.Predef._

class MqttSample {

  private val mqttConf = mqtt
    .broker("localhost", 1883)
    .correlateBy(jsonPath("$.correlationId"))

  private val scn = scenario("MQTT Test")
    .feed(csv("topics-and-payloads.csv"))
    .exec(mqtt("Connecting").connect)
    .exec(mqtt("Subscribing").subscribe("${myTopic}"))
    .exec(mqtt("Publishing").publish("${myTopic}").message(StringBody("${myTextPayload}"))
      .expect(100 milliseconds).check(jsonPath("$.error").notExists))

  setUp(scn.inject(rampUsersPerSec(10) to 1000 during (2 minutes)))
    .protocols(mqttConf)
}