SSE (Server Sent Event)

Learn the possible SSE operations with Gatling: connect, close.

SSE support is an extension to the HTTP DSL, whose entry point is the sse(requestName: Expression[String]) method.

sseName

If you want to deal with several SSE streams per virtual users, you have to give them a name and pass this name on each SSE operation: For example:

     
sse("Sse").sseName("myCustomName");
sse("Sse").sseName("myCustomName")
sse("Sse").sseName("myCustomName")

Of course, this step is not required if you deal with one single SSE stream per virtual user.

Connecting

The first thing is to connect the stream:

Gatling supports GET and POST requests:

     
exec(sse("Connect").get("/stocks/prices"));
exec(sse("Connect").post("/stocks/prices").body(StringBody("{\"foo\": \"bar\"}")));
exec(sse("Connect").get("/stocks/prices"))
exec(sse("Connect").post("/stocks/prices").body(StringBody("{\"foo\": \"bar\"}")))
exec(sse("Connect").get("/stocks/prices"))
exec(sse("Connect").post("/stocks/prices").body(StringBody("""{"foo": "bar"}""")))

close

Once you’re done with a SSE stream, you can close it.

     
exec(sse("Close").close());
exec(sse("Close").close())
exec(sse("Close").close)

Checks

You deal with incoming messages with checks.

Beware of not missing messages that would be received prior to setting the check.

Gatling currently only supports blocking checks that will wait until receiving expected message or timing out.

Set a Check

You can set a check right after connecting:

     
exec(sse("Connect").get("/stocks/prices")
  .await(5).on(sseCheck));
exec(sse("Connect").get("/stocks/prices")
  .await(5).on(sseCheck))
exec(sse("Connect").get("/stocks/prices")
  .await(5)(sseCheck))

Or you can set a check from main flow:

     
exec(sse("SetCheck").setCheck()
  .await(30).on(sseCheck));
exec(sse("SetCheck").setCheck()
  .await(30).on(sseCheck))
exec(sse("SetCheck").setCheck
  .await(30)(sseCheck))

You can set multiple checks sequentially. Each one will expect one single frame.

You can configure multiple checks in a single sequence:

     
// expecting 2 messages
// 1st message will be validated against sseCheck1
// 2nd message will be validated against sseCheck2
// whole sequence must complete withing 30 seconds
exec(sse("SetCheck").setCheck()
  .await(30).on(sseCheck1, sseCheck2));
// expecting 2 messages
// 1st message will be validated against sseCheck1
// 2nd message will be validated against sseCheck2
// whole sequence must complete withing 30 seconds
exec(sse("SetCheck").setCheck()
  .await(30).on(sseCheck, sseCheck))
// expecting 2 messages
// 1st message will be validated against sseCheck1
// 2nd message will be validated against sseCheck2
// whole sequence must complete withing 30 seconds
exec(sse("SetCheck").setCheck
  .await(30)(sseCheck1, sseCheck2))

You can also configure multiple check sequences with different timeouts:

     
// expecting 2 messages
// 1st message will be validated against sseCheck1
// 2nd message will be validated against sseCheck2
// both sequences must complete withing 15 seconds
// 2nd sequence will start after 1st one completes
exec(sse("SetCheck").setCheck()
  .await(15).on(sseCheck1)
  .await(15).on(sseCheck2));
// expecting 2 messages
// 1st message will be validated against sseCheck1
// 2nd message will be validated against sseCheck2
// both sequences must complete withing 15 seconds
// 2nd sequence will start after 1st one completes
exec(sse("SetCheck").setCheck()
  .await(15).on(sseCheck)
  .await(15).on(sseCheck))
// expecting 2 messages
// 1st message will be validated against sseCheck1
// 2nd message will be validated against sseCheck2
// both sequences must complete withing 15 seconds
// 2nd sequence will start after 1st one completes
exec(sse("SetCheck").setCheck
  .await(15)(sseCheck1)
  .await(15)(sseCheck2))

Create a check

You can create checks for server events with checkMessage. You can use almost all the same check criteria as for HTTP requests.

     
SseMessageCheck sseCheck = sse.checkMessage("checkName")
  .check(regex("event: snapshot(.*)"));
val sseCheck = sse.checkMessage("checkName")
  .check(regex("event: snapshot(.*)"))
val sseCheck = sse.checkMessage("checkName")
  .check(regex("event: snapshot(.*)"))

You can have multiple criteria for a given message:

     
sse.checkMessage("checkName")
  .check(
    regex("event: event1(.*)"),
    regex("event: event2(.*)")
  );
sse.checkMessage("checkName")
  .check(
    regex("event: event1(.*)"),
    regex("event: event2(.*)")
  )
sse.checkMessage("checkName")
  .check(
    regex("event: event1(.*)"),
    regex("event: event2(.*)")
  )

Matching messages

You can define matching criteria to filter messages you want to check. Matching criterion is a standard check, except it doesn’t take saveAs. Non-matching messages will be ignored.

     
exec(sse("SetCheck").setCheck()
  .await(1).on(
    sse.checkMessage("checkName")
      .matching(substring("event"))
      .check(regex("event: snapshot(.*)"))
  ));
exec(sse("SetCheck").setCheck()
  .await(1).on(
    sse.checkMessage("checkName")
      .matching(substring("event"))
      .check(regex("event: snapshot(.*)"))
  ))
exec(sse("SetCheck").setCheck
  .await(1)(
    sse.checkMessage("checkName")
      .matching(substring("event"))
      .check(regex("event: snapshot(.*)"))
  ))

Processing unmatched messages

You can use processUnmatchedMessages to process inbound messages that haven’t been matched with a check and have been buffered. By default, unmatched inbound messages are not buffered, you must enable this feature by setting the size of the buffer on the protocol with .sseUnmatchedInboundMessageQueueSize(maxSize). The buffer is reset when:

  • sending an outbound message
  • calling processUnmatchedMessages so we don’t present the same message twice

You can then pass your processing logic as a function. The list of messages passed to this function is sorted in timestamp ascending (meaning older messages first). It contains instances of types io.gatling.http.action.sse.SseInboundMessage.

     
exec(
  // store the unmatched messages in the Session
  ws.processUnmatchedMessages((messages, session) -> session.set("messages", messages))
);
exec(
  // collect the last message and store it in the Session
  sse.processUnmatchedMessages(
          (messages, session) ->
                  !messages.isEmpty()
                          ? session.set("lastMessage", messages.get(messages.size() - 1).message())
                          : session)
);
exec(
  // store the unmatched messages in the Session
  ws.processUnmatchedMessages { messages, session -> session.set("messages", messages) }
)
exec(
  // collect the last message and store it in the Session
  sse.processUnmatchedMessages { messages, session ->
    if (messages.isNotEmpty()) session.set("lastMessage", messages.last().message())
    else session
  }
)
exec(
  // store the unmatched messages in the Session
  sse.processUnmatchedMessages((messages, session) => session.set("messages", messages))
)
exec(
  // collect the last message and store it in the Session
  sse.processUnmatchedMessages { (messages, session) =>
    messages
      .lastOption
      .fold(session)(m => session.set("lastMessage", m.message))
  }
)

Configuration

SSE support introduces new HttpProtocol parameters:

     
http
  // enable unmatched SSE inbound messages buffering,
  // with a max buffer size of 5
  .sseUnmatchedInboundMessageBufferSize(5);
http
  // enable unmatched SSE inbound messages buffering,
  // with a max buffer size of 5
  .sseUnmatchedInboundMessageBufferSize(5)
http
  // enable unmatched SSE inbound messages buffering,
  // with a max buffer size of 5
  .sseUnmatchedInboundMessageBufferSize(5)

Debugging

You can inspect streams if you add the following logger to your logback configuration:

<logger name="io.gatling.http.action.sse.fsm" level="DEBUG" />

Example

Here’s an example that runs against a stock market sample:

     
ScenarioBuilder scn = scenario("ServerSentEvents")
  .exec(
    sse("Stocks").get("/stocks/prices")
      .await(10).on(
        sse.checkMessage("checkName").check(regex("event: snapshot(.*)"))
      ),
    pause(15),
    sse("Close").close()
  );
val scn = scenario("ServerSentEvents")
  .exec(
    sse("Stocks").get("/stocks/prices")
      .await(10).on(
        sse.checkMessage("checkName").check(regex("event: snapshot(.*)"))
      ),
    pause(15),
    sse("Close").close()
  )
val scn = scenario("ServerSentEvents")
  .exec(
    sse("Stocks").get("/stocks/prices")
      .await(10)(
        sse.checkMessage("checkName").check(regex("event: snapshot(.*)"))
      ),
    pause(15),
    sse("Close").close
  )

Edit this page on GitHub