Testing other systems using custom clients
Locust was built with HTTP as its main target. However, it can easily be extended to load test any request/response-based system by writing a custom client that triggers request_success and request_failure events.
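Whatever the protocol, such a client does the same bookkeeping around each call: time it, then report either success or failure. The following is a minimal, framework-free sketch of that pattern. The timed_call helper and its on_success and on_failure callbacks are hypothetical names used for illustration; in a Locust client they would presumably be the fire methods of the request_success and request_failure events.

```python
import time


def timed_call(on_success, on_failure, request_type, name, func, *args, **kwargs):
    """Run func, measure its duration in milliseconds, and report the outcome.

    on_success and on_failure are stand-ins for the fire() methods of
    Locust's request_success and request_failure events (an assumption
    made so that this sketch runs without Locust installed).
    """
    start_time = time.time()
    try:
        result = func(*args, **kwargs)
    except Exception as e:
        total_time = int((time.time() - start_time) * 1000)
        on_failure(request_type=request_type, name=name,
                   response_time=total_time, exception=e)
        return None
    total_time = int((time.time() - start_time) * 1000)
    on_success(request_type=request_type, name=name,
               response_time=total_time, response_length=0)
    return result


# Collect the reported events in plain lists instead of Locust's statistics.
successes, failures = [], []


def record_success(**kw):
    successes.append(kw)


def record_failure(**kw):
    failures.append(kw)


def broken():
    raise ValueError("boom")


value = timed_call(record_success, record_failure, "demo", "add",
                   lambda a, b: a + b, 2, 3)
timed_call(record_success, record_failure, "demo", "broken", broken)
```

Note that this sketch catches every Exception, which is broader than the XML-RPC example on this page, where only xmlrpclib.Fault is reported as a failure. Which exceptions count as a failed request is a design choice that depends on the system under test.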
Sample XML-RPC Locust client
Here is an example of a Locust class, XmlRpcLocust, which provides an XML-RPC client, XmlRpcClient, and tracks all requests made:
    import time
    import xmlrpclib

    from locust import Locust, events, task, TaskSet


    class XmlRpcClient(xmlrpclib.ServerProxy):
        """
        Simple, sample XML RPC client implementation that wraps xmlrpclib.ServerProxy and
        fires locust events on request_success and request_failure, so that all requests
        get tracked in locust's statistics.
        """
        def __getattr__(self, name):
            func = xmlrpclib.ServerProxy.__getattr__(self, name)

            def wrapper(*args, **kwargs):
                start_time = time.time()
                try:
                    result = func(*args, **kwargs)
                except xmlrpclib.Fault as e:
                    # Response time in milliseconds
                    total_time = int((time.time() - start_time) * 1000)
                    events.request_failure.fire(request_type="xmlrpc", name=name,
                                                response_time=total_time, exception=e)
                else:
                    total_time = int((time.time() - start_time) * 1000)
                    # response_length is hardcoded to 0 in this example. To report the real
                    # response length in the statistics, we would probably need to hook in
                    # at a lower level.
                    events.request_success.fire(request_type="xmlrpc", name=name,
                                                response_time=total_time, response_length=0)
                    # Hand the server's return value back to the calling task
                    return result

            return wrapper


    class XmlRpcLocust(Locust):
        """
        This is the abstract Locust class which should be subclassed. It provides an XML-RPC
        client that can be used to make XML-RPC requests that will be tracked in Locust's
        statistics.
        """
        def __init__(self, *args, **kwargs):
            super(XmlRpcLocust, self).__init__(*args, **kwargs)
            self.client = XmlRpcClient(self.host)


    class ApiUser(XmlRpcLocust):
        host = "http://127.0.0.1:8877/"
        min_wait = 100
        max_wait = 1000

        class task_set(TaskSet):
            @task(10)
            def get_time(self):
                self.client.get_time()

            @task(5)
            def get_random_number(self):
                self.client.get_random_number(0, 100)
If you’ve written Locust tests before, you’ll recognize ApiUser as a normal Locust class that has a TaskSet class with tasks in its task_set attribute. However, ApiUser inherits from XmlRpcLocust, which is defined right above it. The XmlRpcLocust class provides an instance of XmlRpcClient under the client attribute. XmlRpcClient is a wrapper around the standard library’s xmlrpclib.ServerProxy. It simply proxies the function calls, with the important addition of firing the locust.events.request_success and locust.events.request_failure events, so that every call is reported in Locust’s statistics.
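To make "reported in Locust’s statistics" more concrete, it can help to look at the receiving end of those events. The sketch below uses a small stand-in class, FakeEventHook, which assumes the += registration and keyword-argument fire() style of the event hooks used on this page. It is not Locust’s implementation, and stats, on_success and on_failure are hypothetical names.

```python
class FakeEventHook(object):
    """Hypothetical stand-in for a Locust event hook (not Locust's own code)."""

    def __init__(self):
        self._handlers = []

    def __iadd__(self, handler):
        # Allows `hook += handler` registration.
        self._handlers.append(handler)
        return self

    def fire(self, **kwargs):
        # Every registered handler receives the same keyword arguments.
        for handler in self._handlers:
            handler(**kwargs)


request_success = FakeEventHook()
request_failure = FakeEventHook()

# Per-call counters keyed by (request_type, name).
stats = {}


def on_success(request_type, name, response_time, response_length, **kwargs):
    entry = stats.setdefault((request_type, name),
                             {"ok": 0, "failed": 0, "total_ms": 0})
    entry["ok"] += 1
    entry["total_ms"] += response_time


def on_failure(request_type, name, response_time, exception, **kwargs):
    entry = stats.setdefault((request_type, name),
                             {"ok": 0, "failed": 0, "total_ms": 0})
    entry["failed"] += 1
    entry["total_ms"] += response_time


request_success += on_success
request_failure += on_failure

# The same kind of calls XmlRpcClient makes after a proxied method returns or faults.
request_success.fire(request_type="xmlrpc", name="get_time",
                     response_time=120, response_length=0)
request_success.fire(request_type="xmlrpc", name="get_time",
                     response_time=80, response_length=0)
request_failure.fire(request_type="xmlrpc", name="get_time",
                     response_time=40, exception=RuntimeError("fault"))
```

After these three calls, stats holds a single entry for ("xmlrpc", "get_time") with two successes, one failure and 240 ms of accumulated response time. The request_type and name arguments are what group calls into rows, which is why the client passes the proxied method name as name.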
Here’s an implementation of an XML-RPC server that the code above can be run against:
    import time
    import random
    from SimpleXMLRPCServer import SimpleXMLRPCServer
    import xmlrpclib


    def get_time():
        time.sleep(random.random())
        return time.time()


    def get_random_number(low, high):
        time.sleep(random.random())
        return random.randint(low, high)


    server = SimpleXMLRPCServer(("localhost", 8877))
    print "Listening on port 8877..."
    server.register_function(get_time, "get_time")
    server.register_function(get_random_number, "get_random_number")
    server.serve_forever()
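Before pointing Locust at a server like this, it may be worth checking by hand that the two methods respond. The following sketch is one way to do that without Locust. It makes several assumptions that are not part of the example above: it starts its own copy of the server in a background thread, binds to an OS-assigned port instead of 8877, omits the random sleeps, and falls back between the Python 3 and Python 2 module names.

```python
import random
import threading
import time

try:  # Python 3 module names
    from xmlrpc.client import Fault, ServerProxy
    from xmlrpc.server import SimpleXMLRPCServer
except ImportError:  # Python 2 module names, as used elsewhere on this page
    from xmlrpclib import Fault, ServerProxy
    from SimpleXMLRPCServer import SimpleXMLRPCServer


def get_time():
    return time.time()


def get_random_number(low, high):
    return random.randint(low, high)


# Port 0 asks the OS for any free port, so the sketch does not collide with
# a server that is already listening on 8877.
server = SimpleXMLRPCServer(("127.0.0.1", 0), logRequests=False)
server.register_function(get_time, "get_time")
server.register_function(get_random_number, "get_random_number")
port = server.server_address[1]

thread = threading.Thread(target=server.serve_forever)
thread.daemon = True
thread.start()

client = ServerProxy("http://127.0.0.1:%d/" % port)
server_time = client.get_time()
number = client.get_random_number(0, 100)

# Calling a method that was never registered comes back as a Fault, which is
# the exception type that XmlRpcClient reports through request_failure.
try:
    client.no_such_method()
    fault = None
except Fault as e:
    fault = e

server.shutdown()
server.server_close()
```

If both calls return values and the unregistered method raises a Fault, the server covers the success path and the failure path that the sample client tracks.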