Testing non-HTTP systems

Locust only comes with built-in support for HTTP/HTTPS, but it can be extended to load test almost any system. You do this by writing a custom client that fires Locust's request event for every call it makes.

Note

It is important that any protocol libraries you use can be monkey-patched by gevent (if they use the Python socket module or some other standard library function like subprocess, you will be fine). Otherwise your calls will block the whole Locust/Python process (in practice limiting you to running a single User per worker process).

Some C libraries cannot be monkey patched by gevent, but allow for other workarounds. For example, if you want to use psycopg2 to performance test PostgreSQL, you can use psycogreen.

Example: writing an XML-RPC User/client

Let's assume we have an XML-RPC server that we want to load test:

import random
import time
from xmlrpc.server import SimpleXMLRPCServer


def get_time():
    """Return the current Unix timestamp after sleeping for a random fraction of a second."""
    delay = random.random()  # simulated processing latency, in seconds
    time.sleep(delay)
    now = time.time()
    return now


def get_random_number(low, high):
    """Return a random integer in ``[low, high]`` after sleeping for a random fraction of a second."""
    delay = random.random()  # simulated processing latency, in seconds
    time.sleep(delay)
    picked = random.randint(low, high)
    return picked


# Bind the XML-RPC server to localhost:8877, expose both handlers under their
# own names, then block forever serving requests.
server = SimpleXMLRPCServer(("localhost", 8877))
print("Listening on port 8877...")
for handler, exported_name in ((get_time, "get_time"), (get_random_number, "get_random_number")):
    server.register_function(handler, exported_name)
server.serve_forever()

We can build a generic XML-RPC client by wrapping xmlrpc.client.ServerProxy:

import time
from xmlrpc.client import ServerProxy, Fault

from locust import User, task


class _RecordedMethod:
    """
    Callable stand-in for one remote XML-RPC method that reports every call to Locust.

    Attribute access returns another _RecordedMethod for the dotted name, so
    namespaced methods such as ``client.system.listMethods()`` keep working
    exactly as they do on a plain ServerProxy.
    """

    def __init__(self, client, method, name):
        self._client = client  # the XmlRpcClient that owns the request event
        self._method = method  # the underlying xmlrpc.client method object
        self._name = name  # full (possibly dotted) remote method name

    def __getattr__(self, name):
        # Private/dunder lookups are never remote methods; failing fast also avoids
        # infinite recursion if an instance is probed before __init__ has run.
        if name.startswith("_"):
            raise AttributeError(name)
        return _RecordedMethod(self._client, getattr(self._method, name), f"{self._name}.{name}")

    def __call__(self, *args, **kwargs):
        """
        Invoke the remote method, fire the *request* event and return the response.

        A Fault raised by the call is recorded in the event's ``exception`` field
        and ``None`` is returned; any other exception propagates to the caller
        without an event being fired.
        """
        start_time = time.perf_counter()
        request_meta = {
            "request_type": "xmlrpc",
            "name": self._name,
            "response_length": 0,  # calculating this for an xmlrpc.client response would be too hard
            "response": None,
            "context": {},  # see HttpUser if you actually want to implement contexts
            "exception": None,
        }
        try:
            request_meta["response"] = self._method(*args, **kwargs)
        except Fault as e:
            request_meta["exception"] = e
        request_meta["response_time"] = (time.perf_counter() - start_time) * 1000
        # This is what makes the request actually get logged in Locust
        self._client._request_event.fire(**request_meta)
        return request_meta["response"]


class XmlRpcClient(ServerProxy):
    """
    XmlRpcClient is a wrapper around the standard library's ServerProxy.
    It proxies any function calls and fires the *request* event when they finish,
    so that the calls get recorded in Locust.

    :param host: URL of the XML-RPC server, passed on to ServerProxy.
    :param request_event: object whose ``fire(**request_meta)`` is called once per remote call.
    """

    def __init__(self, host, request_event):
        super().__init__(host)
        self._request_event = request_event

    def __getattr__(self, name):
        # Only reached for names that are not real attributes, i.e. remote methods.
        return _RecordedMethod(self, ServerProxy.__getattr__(self, name), name)


class XmlRpcUser(User):
    """
    Bare-bones Locust user base class that gives its subclasses an
    XmlRpcClient as ``self.client``.
    """

    # Abstract: Locust must not instantiate this class as an actual user.
    abstract = True

    def __init__(self, environment):
        super().__init__(environment)
        request_event = environment.events.request
        self.client = XmlRpcClient(self.host, request_event=request_event)


# The concrete user class that Locust instantiates and runs.
# Everything specific to the service under test lives here.
class MyUser(XmlRpcUser):
    """Calls the ``get_time`` and ``get_random_number`` XML-RPC methods."""

    host = "http://127.0.0.1:8877/"

    @task
    def get_time(self):
        """Call the remote ``get_time`` method."""
        rpc = self.client
        rpc.get_time()

    @task
    def get_random_number(self):
        """Call the remote ``get_random_number`` method with the bounds 0 and 100."""
        low, high = 0, 100
        self.client.get_random_number(low, high)

Example: writing a gRPC User/client

If you have understood the XML-RPC example, you can easily build a gRPC User.

The only significant difference is that you need to make gRPC gevent-compatible, by executing this code before opening the channel:

import grpc.experimental.gevent as grpc_gevent

# Make gRPC gevent-compatible; this must run before any channel is opened.
grpc_gevent.init_gevent()

Dummy server to test:

import hello_pb2_grpc
import hello_pb2
import grpc
from concurrent import futures
import logging
import time

# Module-level logger named after this module.
logger = logging.getLogger(__name__)


class HelloServiceServicer(hello_pb2_grpc.HelloServiceServicer):
    """Dummy servicer whose SayHello replies with a greeting after a one-second pause."""

    def SayHello(self, request, context):
        """Sleep for one second, then return a HelloResponse greeting ``request.name``."""
        caller = request.name
        time.sleep(1)
        greeting = f"Hello from Locust, {caller}!"
        return hello_pb2.HelloResponse(message=greeting)


def start_server():
    """Start the dummy gRPC server on localhost:50051 and block until it terminates."""
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    grpc_server = grpc.server(worker_pool)
    servicer = HelloServiceServicer()
    hello_pb2_grpc.add_HelloServiceServicer_to_server(servicer, grpc_server)
    grpc_server.add_insecure_port("localhost:50051")
    grpc_server.start()
    logger.info("gRPC server started")
    grpc_server.wait_for_termination()

gRPC client, base User and example usage:

# make sure you use grpc version 1.39.0 or later,
# because of https://github.com/grpc/grpc/issues/15880 that affected earlier versions
import grpc
import hello_pb2_grpc
import hello_pb2
from locust import events, User, task
from locust.exception import LocustError
from locust.user.task import LOCUST_STATE_STOPPING
from hello_server import start_server
import gevent
import time

# patch grpc so that it is gevent-compatible; this must run before any channel is opened
import grpc.experimental.gevent as grpc_gevent

grpc_gevent.init_gevent()


@events.init.add_listener
def run_grpc_server(environment, **_kwargs):
    """Spawn the dummy gRPC server in a greenlet; registered as a listener on the ``init`` event."""
    # Start the dummy server. This is not something you would do in a real test.
    gevent.spawn(start_server)


class GrpcClient:
    """
    Wrapper around a gRPC stub that fires Locust's *request* event for every call.

    Attribute access is forwarded to the wrapped stub; the returned callable
    times the RPC, records the outcome and returns the response (``None`` if
    the call raised ``grpc.RpcError``).

    :param stub: an instantiated gRPC stub whose methods should be recorded.
    """

    def __init__(self, stub):
        self._stub_class = stub.__class__
        self._stub = stub

    def __getattr__(self, name):
        # Only reached for names that are not attributes of GrpcClient itself.
        func = getattr(self._stub, name)

        def wrapper(*args, **kwargs):
            start_time = time.perf_counter()
            request_meta = {
                "request_type": "grpc",
                "name": name,
                "response_length": 0,
                "exception": None,
                "context": {},  # a dict, matching what XmlRpcClient passes to the request event
                "response": None,
            }
            try:
                response = func(*args, **kwargs)
                request_meta["response"] = response
                # Do not assume a ``message`` field: it only exists on HelloResponse.
                # Protobuf messages report their serialized size via ByteSize();
                # fall back to the old ``len(message)`` for objects without it.
                byte_size = getattr(response, "ByteSize", None)
                if callable(byte_size):
                    request_meta["response_length"] = byte_size()
                elif hasattr(response, "message"):
                    request_meta["response_length"] = len(response.message)
            except grpc.RpcError as e:
                request_meta["exception"] = e
            request_meta["response_time"] = (time.perf_counter() - start_time) * 1000
            events.request.fire(**request_meta)
            return request_meta["response"]

        return wrapper


class GrpcUser(User):
    """
    Abstract base user that opens an insecure gRPC channel to ``host`` and
    exposes a GrpcClient wrapping ``stub_class`` as ``self.client``.

    Subclasses must set both ``host`` and ``stub_class``.

    :raises LocustError: if ``host`` or ``stub_class`` is ``None``.
    """

    abstract = True

    # The generated stub class to instantiate on the channel; set by subclasses.
    stub_class = None

    def __init__(self, environment):
        super().__init__(environment)
        for attr_value, attr_name in ((self.host, "host"), (self.stub_class, "stub_class")):
            if attr_value is None:
                raise LocustError(f"You must specify the {attr_name}.")
        self._channel = grpc.insecure_channel(self.host)
        self._channel_closed = False
        stub = self.stub_class(self._channel)
        self.client = GrpcClient(stub)

    def on_stop(self):
        """Close the channel when the user stops, so that stopped users do not leak connections."""
        if not self._channel_closed:
            # Set the flag first so tasks that check it stop issuing calls.
            self._channel_closed = True
            self._channel.close()
        super().on_stop()


class HelloGrpcUser(GrpcUser):
    """Example user that repeatedly calls SayHello on the dummy server."""

    host = "localhost:50051"
    stub_class = hello_pb2_grpc.HelloServiceStub

    @task
    def sayHello(self):
        """Send one HelloRequest unless the channel is flagged as closed, then sleep one second."""
        channel_open = not self._channel_closed
        if channel_open:
            hello_request = hello_pb2.HelloRequest(name="Test")
            self.client.SayHello(hello_request)
        time.sleep(1)

For more examples of user types, see locust-plugins (it has users for WebSocket/SocketIO, Kafka, Selenium/WebDriver and more)