Testing other systems/protocols

Locust only comes with built-in support for HTTP/HTTPS but it can be extended to test almost any system. This is normally done by wrapping the protocol library and triggering a request event after each call has completed, to let Locust know what happened.

Note

It is important that the protocol libraries you use can be monkey-patched by gevent.

Almost any libraries that are pure Python (using the Python socket module or some other standard library function like subprocess) should work fine out of the box - but if they do their I/O calls in C, gevent will be unable to patch it. This will block the whole Locust/Python process (in practice limiting you to running a single User per worker process).

Some C libraries allow for other workarounds. For example, if you want to use psycopg2 to performance test PostgreSQL, you can use psycogreen. If you are willing to get your hands dirty, you may also be able to patch a library yourself, but that is beyond the scope of this documentation.

XML-RPC

Lets assume we have an XML-RPC server that we want to load test.

import random
import time
from xmlrpc.server import SimpleXMLRPCServer


def get_time():
    time.sleep(random.random())
    return time.time()


def get_random_number(low, high):
    time.sleep(random.random())
    return random.randint(low, high)


server = SimpleXMLRPCServer(("localhost", 8877))
print("Listening on port 8877...")
server.register_function(get_time, "get_time")
server.register_function(get_random_number, "get_random_number")
server.serve_forever()

We can build a generic XML-RPC client, by wrapping xmlrpc.client.ServerProxy.

from locust import User, task

import time
from xmlrpc.client import Fault, ServerProxy


class XmlRpcClient(ServerProxy):
    """
    XmlRpcClient is a wrapper around the standard library's ServerProxy.
    It proxies any function calls and fires the *request* event when they finish,
    so that the calls get recorded in Locust.
    """

    def __init__(self, host, request_event):
        super().__init__(host)
        self._request_event = request_event

    def __getattr__(self, name):
        func = ServerProxy.__getattr__(self, name)

        def wrapper(*args, **kwargs):
            request_meta = {
                "request_type": "xmlrpc",
                "name": name,
                "start_time": time.time(),
                "response_length": 0,  # calculating this for an xmlrpc.client response would be too hard
                "response": None,
                "context": {},  # see HttpUser if you actually want to implement contexts
                "exception": None,
            }
            start_perf_counter = time.perf_counter()
            try:
                request_meta["response"] = func(*args, **kwargs)
            except Fault as e:
                request_meta["exception"] = e
            request_meta["response_time"] = (time.perf_counter() - start_perf_counter) * 1000
            self._request_event.fire(**request_meta)  # This is what makes the request actually get logged in Locust
            return request_meta["response"]

        return wrapper


class XmlRpcUser(User):
    """
    A minimal Locust user class that provides an XmlRpcClient to its subclasses
    """

    abstract = True  # dont instantiate this as an actual user when running Locust

    def __init__(self, environment):
        super().__init__(environment)
        self.client = XmlRpcClient(self.host, request_event=environment.events.request)


# The real user class that will be instantiated and run by Locust
# This is the only thing that is actually specific to the service that we are testing.
class MyUser(XmlRpcUser):
    host = "http://127.0.0.1:8877/"

    @task
    def get_time(self):
        self.client.get_time()

    @task
    def get_random_number(self):
        self.client.get_random_number(0, 100)

gRPC

Lets assume we have a gRPC server that we want to load test:

import logging
import time
from concurrent import futures

import grpc
import hello_pb2
import hello_pb2_grpc

logger = logging.getLogger(__name__)


class HelloServiceServicer(hello_pb2_grpc.HelloServiceServicer):
    def SayHello(self, request, context):
        name = request.name
        time.sleep(1)
        return hello_pb2.HelloResponse(message=f"Hello from Locust, {name}!")


def start_server():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    hello_pb2_grpc.add_HelloServiceServicer_to_server(HelloServiceServicer(), server)
    server.add_insecure_port("localhost:50051")
    server.start()
    logger.info("gRPC server started")
    server.wait_for_termination()


if __name__ == "__main__":
    start_server()

The generic GrpcUser base class sends events to Locust using an interceptor:

from locust import User
from locust.exception import LocustError

import time
from typing import Any, Callable

import grpc
import grpc.experimental.gevent as grpc_gevent
from grpc_interceptor import ClientInterceptor

# patch grpc so that it uses gevent instead of asyncio
grpc_gevent.init_gevent()


class LocustInterceptor(ClientInterceptor):
    def __init__(self, environment, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.env = environment

    def intercept(
        self,
        method: Callable,
        request_or_iterator: Any,
        call_details: grpc.ClientCallDetails,
    ):
        response = None
        exception = None
        start_perf_counter = time.perf_counter()
        response_length = 0
        try:
            response = method(request_or_iterator, call_details)
            response_length = response.result().ByteSize()
        except grpc.RpcError as e:
            exception = e

        self.env.events.request.fire(
            request_type="grpc",
            name=call_details.method,
            response_time=(time.perf_counter() - start_perf_counter) * 1000,
            response_length=response_length,
            response=response,
            context=None,
            exception=exception,
        )
        return response


class GrpcUser(User):
    abstract = True
    stub_class = None

    def __init__(self, environment):
        super().__init__(environment)
        for attr_value, attr_name in ((self.host, "host"), (self.stub_class, "stub_class")):
            if attr_value is None:
                raise LocustError(f"You must specify the {attr_name}.")

        self._channel = grpc.insecure_channel(self.host)
        interceptor = LocustInterceptor(environment=environment)
        self._channel = grpc.intercept_channel(self._channel, interceptor)

        self.stub = self.stub_class(self._channel)

And a locustfile using the above would look like this:

from locust import events, task

import gevent
import grpc_user
import hello_pb2
import hello_pb2_grpc
from hello_server import start_server


# Start the dummy server. This is not something you would do in a real test.
@events.init.add_listener
def run_grpc_server(environment, **_kwargs):
    gevent.spawn(start_server)


class HelloGrpcUser(grpc_user.GrpcUser):
    host = "localhost:50051"
    stub_class = hello_pb2_grpc.HelloServiceStub

    @task
    def sayHello(self):
        self.stub.SayHello(hello_pb2.HelloRequest(name="Test"))

requests-based libraries/SDKs

If you want to use a library that uses a requests.Session object under the hood you will most likely be able to skip all the above complexity.

Some libraries allow you to pass a Session explicitly, like for example the SOAP client provided by Zeep. In that case, just pass it your HttpUser’s client, and any requests made using the library will be logged in Locust.

Even if your library doesn’t expose that in its interface, you may be able to get it working by overwriting some internally used Session. Here’s an example of how to do that for the Archivist client.

import locust
from locust.user import task

from archivist.archivist import Archivist  # Example library under test


class ArchivistUser(locust.HttpUser):
    def on_start(self):
        AUTH_TOKEN = None

        with open("auth.text") as f:
            AUTH_TOKEN = f.read()

        # Start an instance of of the library-provided client
        self.arch: Archivist = Archivist(url=self.host, auth=AUTH_TOKEN)
        # overwrite the internal _session attribute with the locust session
        self.arch._session = self.client

    @task
    def Create_assets(self):
        """User creates assets as fast as possible"""

        while True:
            self.arch.assets.create(behaviours=["Builtin", "RecordEvidence", "Attachments"], attrs={"foo": "bar"})

REST

See FastHttpUser

Other examples

See locust-plugins it has users for WebSocket/SocketIO, Kafka, Selenium/WebDriver, Playwright and more.