Testing other systems using custom clients

Locust was built with HTTP as its main target. However, it can easily be extended to load test any request/response based system, by writing a custom client that triggers request_success and request_failure events.

Sample XML-RPC Locust client

Here is an example of a Locust class, XmlRpcLocust, which provides an XML-RPC client, XmlRpcClient, and tracks all requests made:

import time
import xmlrpclib

from locust import Locust, TaskSet, events, task, between


class XmlRpcClient(xmlrpclib.ServerProxy):
    """
    Simple, sample XML RPC client implementation that wraps xmlrpclib.ServerProxy and 
    fires locust events on request_success and request_failure, so that all requests 
    gets tracked in locust's statistics.
    """
    def __getattr__(self, name):
        func = xmlrpclib.ServerProxy.__getattr__(self, name)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
            except xmlrpclib.Fault as e:
                total_time = int((time.time() - start_time) * 1000)
                events.request_failure.fire(request_type="xmlrpc", name=name, response_time=total_time, exception=e)
            else:
                total_time = int((time.time() - start_time) * 1000)
                events.request_success.fire(request_type="xmlrpc", name=name, response_time=total_time, response_length=0)
                # In this example, I've hardcoded response_length=0. If we would want the response length to be 
                # reported correctly in the statistics, we would probably need to hook in at a lower level
        
        return wrapper


class XmlRpcLocust(Locust):
    """
    This is the abstract Locust class which should be subclassed. It provides an XML-RPC client
    that can be used to make XML-RPC requests that will be tracked in Locust's statistics.
    """
    def __init__(self, *args, **kwargs):
        super(XmlRpcLocust, self).__init__(*args, **kwargs)
        self.client = XmlRpcClient(self.host)


class ApiUser(XmlRpcLocust):
    
    host = "http://127.0.0.1:8877/"
    wait_time = between(0.1, 1)
    
    class task_set(TaskSet):
        @task(10)
        def get_time(self):
            self.client.get_time()
        
        @task(5)
        def get_random_number(self):
            self.client.get_random_number(0, 100)

If you’ve written Locust tests before, you’ll recognize the class called ApiUser which is a normal Locust class that has a TaskSet class with tasks in its task_set attribute. However, the ApiUser inherits from XmlRpcLocust that you can see right above ApiUser. The XmlRpcLocust class provides an instance of XmlRpcClient under the client attribute. The XmlRpcClient is a wrapper around the standard library’s xmlrpclib.ServerProxy. It basically just proxies the function calls, but with the important addition of firing locust.events.request_success and locust.events.request_failure events, which will make all calls reported in Locust’s statistics.

Here’s an implementation of an XML-RPC server that would work as a server for the code above:

import random
import time
from SimpleXMLRPCServer import SimpleXMLRPCServer


def get_time():
    time.sleep(random.random())
    return time.time()

def get_random_number(low, high):
    time.sleep(random.random())
    return random.randint(low, high)

server = SimpleXMLRPCServer(("localhost", 8877))
print("Listening on port 8877...")
server.register_function(get_time, "get_time")
server.register_function(get_random_number, "get_random_number")
server.serve_forever()