Testing other systems/protocols
Locust only comes with built-in support for HTTP/HTTPS but it can be extended to test almost any system. This is normally done by wrapping the protocol library and triggering a request event after each call has completed, to let Locust know what happened.
Note
It is important that the protocol libraries you use can be monkey-patched by gevent.
Almost any library that is pure Python (using the Python socket module or some other standard library function like subprocess) should work fine out of the box - but if a library does its I/O calls from compiled C code, gevent will be unable to patch it. This will block the whole Locust/Python process (in practice limiting you to running a single User per worker process).
Some C libraries allow for other workarounds. For example, if you want to use psycopg2 to performance test PostgreSQL, you can use psycogreen. If you are willing to get your hands dirty, you may be able to patch a library yourself, but that is beyond the scope of this documentation.
XML-RPC
Let's assume we have an XML-RPC server that we want to load test.
import random
import time
from xmlrpc.server import SimpleXMLRPCServer
def get_time():
    """Return the current Unix timestamp after a random delay of less than one second."""
    delay = random.random()
    time.sleep(delay)
    return time.time()
def get_random_number(low, high):
    """Return a random integer N with low <= N <= high, after a random delay of less than one second."""
    delay = random.random()
    time.sleep(delay)
    return random.randint(low, high)
# Create the server on localhost:8877, expose both functions under their own
# names, then handle requests until the process is stopped.
server = SimpleXMLRPCServer(("localhost", 8877))
print("Listening on port 8877...")
for exposed_name, handler in (("get_time", get_time), ("get_random_number", get_random_number)):
    server.register_function(handler, exposed_name)
server.serve_forever()
We can build a generic XML-RPC client, by wrapping xmlrpc.client.ServerProxy.
from locust import User, task
import time
from xmlrpc.client import Fault, ServerProxy
class XmlRpcClient(ServerProxy):
    """
    Thin ServerProxy subclass that reports every remote call to Locust.

    Looking up an attribute yields a callable that invokes the remote method
    of that name, times the call and fires the *request* event with the outcome.
    """

    def __init__(self, host, request_event):
        super().__init__(host)
        self._request_event = request_event

    def __getattr__(self, name):
        remote_method = ServerProxy.__getattr__(self, name)

        def wrapper(*args, **kwargs):
            started_at = time.time()
            result = None
            error = None
            timer_start = time.perf_counter()
            try:
                result = remote_method(*args, **kwargs)
            except Fault as fault:
                # A Fault is reported through the event instead of being raised
                error = fault
            elapsed_ms = (time.perf_counter() - timer_start) * 1000
            # Firing the event is what makes the request show up in Locust
            self._request_event.fire(
                request_type="xmlrpc",
                name=name,
                start_time=started_at,
                response_length=0,  # calculating this for an xmlrpc.client response would be too hard
                response=result,
                context={},  # see HttpUser if you actually want to implement contexts
                exception=error,
                response_time=elapsed_ms,
            )
            return result

        return wrapper
class XmlRpcUser(User):
    """
    Abstract base user that equips its subclasses with an XmlRpcClient,
    available as ``self.client`` and built from the user's ``host``.
    """

    # Marked abstract so that Locust does not run this base class as a user
    abstract = True

    def __init__(self, environment):
        super().__init__(environment)
        request_event = environment.events.request
        self.client = XmlRpcClient(self.host, request_event=request_event)
# The concrete user class that Locust instantiates and runs.
# Everything that is specific to the service under test lives here.
class MyUser(XmlRpcUser):
    host = "http://127.0.0.1:8877/"

    @task
    def get_time(self):
        """Call the remote ``get_time`` function."""
        self.client.get_time()

    @task
    def get_random_number(self):
        """Call the remote ``get_random_number`` function with the bounds 0 and 100."""
        lower, upper = 0, 100
        self.client.get_random_number(lower, upper)
gRPC
Let's assume we have a gRPC server that we want to load test:
import logging
import time
from concurrent import futures
import grpc
import hello_pb2
import hello_pb2_grpc
logger = logging.getLogger(__name__)


class HelloServiceServicer(hello_pb2_grpc.HelloServiceServicer):
    """Dummy HelloService implementation that answers each request after a one-second pause."""

    def SayHello(self, request, context):
        """Sleep for one second, then return a greeting addressed to ``request.name``."""
        caller = request.name
        time.sleep(1)
        greeting = f"Hello from Locust, {caller}!"
        return hello_pb2.HelloResponse(message=greeting)
def start_server():
    """Start the example gRPC server on localhost:50051 and block until it terminates."""
    worker_pool = futures.ThreadPoolExecutor(max_workers=100)
    server = grpc.server(worker_pool)
    servicer = HelloServiceServicer()
    hello_pb2_grpc.add_HelloServiceServicer_to_server(servicer, server)
    server.add_insecure_port("localhost:50051")
    server.start()
    logger.info("gRPC server started")
    server.wait_for_termination()


if __name__ == "__main__":
    start_server()
The generic GrpcUser base class sends events to Locust using an interceptor:
from locust import User
from locust.exception import LocustError
import time
from collections.abc import Callable
from typing import Any
import grpc
import grpc.experimental.gevent as grpc_gevent
from grpc_interceptor import ClientInterceptor
# patch grpc so that it uses gevent instead of asyncio
grpc_gevent.init_gevent()


class LocustInterceptor(ClientInterceptor):
    """Client interceptor that reports every intercepted gRPC call to Locust's request event."""

    def __init__(self, environment, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.env = environment

    def intercept(
        self,
        method: Callable,
        request_or_iterator: Any,
        call_details: grpc.ClientCallDetails,
    ):
        """
        Invoke the intercepted call, time it and fire a request event.

        A grpc.RpcError raised while making the call or reading its result is
        passed to the event as ``exception`` instead of being propagated.
        Returns whatever ``method`` returned, or None if it raised.
        """
        reply = None
        error = None
        payload_size = 0
        started = time.perf_counter()
        try:
            reply = method(request_or_iterator, call_details)
            payload_size = reply.result().ByteSize()
        except grpc.RpcError as rpc_error:
            error = rpc_error
        elapsed_ms = (time.perf_counter() - started) * 1000
        self.env.events.request.fire(
            request_type="grpc",
            name=call_details.method,
            response_time=elapsed_ms,
            response_length=payload_size,
            response=reply,
            context=None,
            exception=error,
        )
        return reply
class GrpcUser(User):
    """
    Abstract base user that opens an insecure gRPC channel to ``host``, wraps
    it with a LocustInterceptor and exposes a ``stub_class`` instance as
    ``self.stub``.
    """

    abstract = True
    stub_class = None

    def __init__(self, environment):
        super().__init__(environment)
        # Both settings are required; complain about the first one that is missing.
        required = {"host": self.host, "stub_class": self.stub_class}
        for attr_name, attr_value in required.items():
            if attr_value is None:
                raise LocustError(f"You must specify the {attr_name}.")

        self._channel = grpc.insecure_channel(self.host)
        self._channel = grpc.intercept_channel(self._channel, LocustInterceptor(environment=environment))
        self.stub = self.stub_class(self._channel)
And a locustfile using the above would look like this:
from locust import events, task
import gevent
import grpc_user
import hello_pb2
import hello_pb2_grpc
from hello_server import start_server
# Launch the dummy server alongside the test. A real test would not do this.
@events.init.add_listener
def run_grpc_server(environment, **_kwargs):
    """Spawn the example gRPC server in a greenlet; registered as an ``init`` event listener."""
    gevent.spawn(start_server)
class HelloGrpcUser(grpc_user.GrpcUser):
    """User that calls ``SayHello`` through a HelloServiceStub."""

    host = "localhost:50051"
    stub_class = hello_pb2_grpc.HelloServiceStub

    @task
    def sayHello(self):
        """Send one HelloRequest carrying the name "Test"."""
        greeting_request = hello_pb2.HelloRequest(name="Test")
        self.stub.SayHello(greeting_request)
requests-based libraries/SDKs
If you want to use a library that uses a requests.Session object under the hood you will most likely be able to skip all the above complexity.
Some libraries allow you to pass a Session explicitly, like for example the SOAP client provided by Zeep. In that case, just pass it your HttpUser’s client, and any requests made using the library will be logged in Locust.
Even if your library doesn’t expose that in its interface, you may be able to get it working by overwriting some internally used Session. Here’s an example of how to do that for the Archivist client.
import locust
from locust.user import task
from archivist.archivist import Archivist # Example library under test
class ArchivistUser(locust.HttpUser):
    """Locust user that drives the Archivist client through Locust's own HTTP session."""

    def on_start(self):
        """Read the auth token from ``auth.text`` and build an Archivist client that uses ``self.client``."""
        # Token files commonly end with a newline; strip surrounding whitespace
        # so that it does not become part of the credential handed to the client.
        with open("auth.text") as f:
            auth_token = f.read().strip()
        # Start an instance of the library-provided client
        self.arch: Archivist = Archivist(url=self.host, auth=auth_token)
        # overwrite the internal _session attribute with the locust session
        self.arch._session = self.client

    @task
    def Create_assets(self):
        """User creates assets as fast as possible"""
        # This loop never exits, so the task itself never returns.
        while True:
            self.arch.assets.create(behaviours=["Builtin", "RecordEvidence", "Attachments"], attrs={"foo": "bar"})
REST
See FastHttpUser
SocketIO
See SocketIOUser
Note
SocketIO support is experimental and may change without notice.
pytest
Locust allows you to use pytest syntax to define Locust Users using pytest fixtures (currently HttpSession and FastHttpSession). It has multiple benefits:
Simpler syntax than regular Locustfiles
Run or debug easily from any editor that supports pytest
Reliably reuse functional test cases for load testing
from locust.clients import HttpSession # this import is just for type hints
import time
# pytest/locust will discover any functions prefixed with "test_" as test cases.
# session and fastsession are pytest fixtures provided by Locust's pytest plugin.
def test_stuff(session):
    """Example test case showing plain requests, a default base URL and catch_response."""
    resp = session.get("https://www.locust.io/")
    # Bad HTTP status codes in the response dont automatically raise an exception,
    # so if that is what you want, you need to call:
    resp.raise_for_status()
    # In Locust, request-related exceptions are caught (and the test case restarted),
    # in pytest any exceptions fail the test case

    # Just like with Locust, you can set a base URL using --host/-H when using pytest.
    # Or you can set a default:
    if not session.base_url:
        session.base_url = "https://www.locust.io"

    # catch_response works just like in regular locustfiles
    with session.get("/", catch_response=True) as resp:
        if not resp.text or "Locust" not in resp.text:
            resp.failure("important text was missing in response")

    # raise_for_status also respects calls to resp.failure()/.success()
    # so this will raise an exception and fail the test case if "Locust" was missing
    resp.raise_for_status()

    # you can call helper functions as needed
    helper_function(session)

    # unlike regular Locust Users, there's no wait_time, so use time.sleep instead
    time.sleep(0.1)
# No "test_" prefix, so pytest/locust will not pick this up as a test case.
def helper_function(session: HttpSession):
    """Request the path "/" using the given session."""
    session.get("/")
Example usage:
$ locust -f test_pytest.py
$ pytest test_pytest.py
# if you have issues with gevent patching (RecursionError: maximum recursion depth exceeded)
$ python -m gevent.monkey -m pytest test_pytest.py
Limitations:
Each test case becomes a Locust User under the hood. We don’t (yet) support weighting users.
Locust will only look for pytest-style tests if there are no regular User classes defined.
Adding other pytest fixtures or pytest plugins may cause issues (do let us know though)
For a more complex example, see https://github.com/locustio/locust/blob/master/locust/test/test_pytest_locustfile.py
Note
pytest support is experimental and may change without a new major release version.
OpenAI
Performance/load testing AI services is a little different. While you could call the OpenAI API using HttpUser or FastHttpUser, it is often convenient to use the SDK.
# You need to install the openai package and set OPENAI_API_KEY env var to run this
# OpenAIUser tracks the number of output tokens in the response_length field,
# because it is more useful than the actual payload size. This field is available to event handlers.
from locust import run_single_user, task
from locust.contrib.oai import OpenAIUser
class MyUser(OpenAIUser):
    """Example user that sends the same prompt to two different models."""

    @task
    def t(self):
        """Query gpt-4o, then gpt-4o-mini with the request renamed to "mini"."""
        instructions = "You are a coding assistant that speaks like it were a Monty Python skit."
        question = "How do I check if a Python object is an instance of a class?"

        self.client.responses.create(
            model="gpt-4o",
            instructions=instructions,
            input=question,
        )
        # Keep the return value of create() if you want to inspect it,
        # e.g. print(response.output_text)
        with self.client.rename_request("mini"):  # here's how to rename requests
            self.client.responses.create(
                model="gpt-4o-mini",
                instructions=instructions,
                input=question,
            )


if __name__ == "__main__":
    run_single_user(MyUser)
Note
OpenAIUser is experimental and may change without notice.
MQTT
Locust uses paho-mqtt to provide MQTT connection capabilities.
from locust import task
from locust.contrib.mqtt import MqttUser
from locust.user.wait_time import between
import time
class MyUser(MqttUser):
    """Example MQTT user that publishes a greeting to the "hello/locust" topic."""

    host = "localhost"
    port = 1883

    # We could uncomment below to use the WebSockets transport
    # transport = "websockets"
    # ws_path = "/mqtt/custom/path"

    # Throttle publishing a little: with these bounds each user sends
    # roughly 10-100 messages per second.
    wait_time = between(0.01, 0.1)

    # Uncomment below if you need to set MQTTv5
    # protocol = paho.mqtt.client.MQTTv5

    # Sleep for a while to allow the client time to connect.
    # This is probably not the most "correct" way to do this: a better method
    # might be to add a gevent.event.Event to the MqttClient's on_connect
    # callback and wait for that (with a timeout) here.
    # However, this works well enough for the sake of an example.
    def on_start(self):
        connect_grace_seconds = 5
        time.sleep(connect_grace_seconds)

    @task
    def say_hello(self):
        """Publish one greeting message."""
        topic = "hello/locust"
        payload = b"hello world"
        self.client.publish(topic, payload)
Alternatively, if you need more control over the Mqtt client you can use a custom implementation.
from locust import task
from locust.contrib.mqtt import MqttClient, MqttUser
from locust.user.wait_time import between
import time
# Subclass MqttClient to plug in your own behaviour.
class MyMqttClient(MqttClient):
    """MqttClient variant whose event names leave out the topic."""

    # Event naming is one of the things a custom client can override.
    def _generate_event_name(self, event_type: str, qos: int, topic: str):
        """Return ``mqtt:<event_type>:<qos>``; ``topic`` is accepted but not used."""
        event_name = f"mqtt:{event_type}:{qos}"
        return event_name
class MyUser(MqttUser):
    """Example MQTT user that publishes through the custom MyMqttClient."""

    host = "localhost"
    port = 1883

    # We could uncomment below to use the WebSockets transport
    # transport = "websockets"
    # ws_path = "/mqtt/custom/path"

    # Throttle publishing a little: with these bounds each user sends
    # roughly 10-100 messages per second.
    wait_time = between(0.01, 0.1)

    # Use the custom MqttClient implementation instead of the default one
    client_cls = MyMqttClient

    # Sleep for a while to allow the client time to connect.
    # This is probably not the most "correct" way to do this: a better method
    # might be to add a gevent.event.Event to the MqttClient's on_connect
    # callback and wait for that (with a timeout) here.
    # However, this works well enough for the sake of an example.
    def on_start(self):
        connect_grace_seconds = 5
        time.sleep(connect_grace_seconds)

    @task
    def say_hello(self):
        """Publish one greeting message."""
        topic = "hello/locust"
        payload = b"hello world locust custom client"
        self.client.publish(topic, payload)
Note
MqttUser is experimental and may change without notice.
Other examples
See locust-plugins, which has users for Kafka, Selenium/WebDriver, Playwright and more.