Testing other systems using custom clients¶
Locust was built with HTTP as its main use case, but it can be extended to load test almost any system. You do this by writing a custom client that triggers the request event.
Note
Any protocol libraries that you use must be gevent-friendly (use the Python socket module or some other standard library function like subprocess), or your calls are likely to block the whole Locust/Python process.
Some C libraries cannot be monkey patched by gevent, but allow for other workarounds. For example, if you want to use psycopg2 to performance test PostgreSQL, you can use psycogreen.
Example: writing an XML-RPC User/client¶
Let's assume we have an XML-RPC server that we want to load test:
import random
import time
from xmlrpc.server import SimpleXMLRPCServer
def get_time() -> float:
    """Return the current Unix timestamp after a random delay.

    The function sleeps for a random duration in the range [0, 1) seconds
    before answering, so the time each call takes varies.

    Returns:
        The value of ``time.time()`` read after the delay.
    """
    time.sleep(random.random())
    return time.time()
def get_random_number(low, high):
    """Return a random integer between *low* and *high* after a random delay.

    The function sleeps for a random duration in the range [0, 1) seconds
    before answering, so the time each call takes varies.

    Args:
        low: Lower bound of the result (inclusive).
        high: Upper bound of the result (inclusive).

    Returns:
        An integer ``n`` with ``low <= n <= high`` (``random.randint`` semantics).
    """
    time.sleep(random.random())
    return random.randint(low, high)
# Bind the XML-RPC server to localhost:8877.
server = SimpleXMLRPCServer(addr=("localhost", 8877))
print("Listening on port 8877...")

# Expose both functions to XML-RPC clients under their own names.
server.register_function(function=get_time, name="get_time")
server.register_function(function=get_random_number, name="get_random_number")

# Handle requests until the process is terminated; this call does not return.
server.serve_forever()
We can build a generic XML-RPC client by wrapping xmlrpc.client.ServerProxy:
import time
from xmlrpc.client import ServerProxy, Fault
from locust import User, task
class XmlRpcClient(ServerProxy):
    """
    XmlRpcClient is a wrapper around the standard library's ServerProxy.
    It proxies any function calls and fires the *request* event when they finish,
    so that the calls get recorded in Locust.

    Only ``xmlrpc.client.Fault`` is caught and reported through the event. Any
    other exception raised by a call propagates to the caller, and no event is
    fired for that call.
    """

    def __init__(self, host, request_event):
        """Create the proxy.

        Args:
            host: URL of the XML-RPC server, passed straight to ``ServerProxy``.
            request_event: Object with a ``fire(**kwargs)`` method; it is called
                once per finished remote call with the request metadata.
        """
        super().__init__(host)
        self._request_event = request_event

    def __getattr__(self, name):
        """Return a callable that invokes remote method *name* and reports the call.

        Python only calls ``__getattr__`` for attributes that normal lookup does
        not find, so ``self._request_event`` (set in ``__init__``) is not routed
        through here.
        """
        func = ServerProxy.__getattr__(self, name)

        def wrapper(*args, **kwargs):
            start_time = time.perf_counter()
            request_meta = {
                "request_type": "xmlrpc",
                "name": name,
                "response_length": 0,  # calculating this for an xmlrpc.client response would be too hard
                "response": None,
                "context": {},  # see HttpUser if you actually want to implement contexts
                "exception": None,
            }
            try:
                request_meta["response"] = func(*args, **kwargs)
            except Fault as e:
                # Record the fault instead of raising; the wrapper then returns None.
                request_meta["exception"] = e
            # perf_counter() is in seconds; the event is given milliseconds.
            request_meta["response_time"] = (time.perf_counter() - start_time) * 1000
            self._request_event.fire(**request_meta)  # This is what makes the request actually get logged in Locust
            return request_meta["response"]

        return wrapper
class XmlRpcUser(User):
    """
    A minimal Locust user class that provides an XmlRpcClient to its subclasses
    """

    abstract = True  # don't instantiate this as an actual user when running Locust

    def __init__(self, environment):
        """Set up ``self.client`` for the host configured on the user class.

        Args:
            environment: The Locust environment; its ``events.request`` hook is
                handed to the client so that every call is reported through it.
        """
        super().__init__(environment)
        self.client = XmlRpcClient(self.host, request_event=environment.events.request)
# The real user class that will be instantiated and run by Locust
# This is the only thing that is actually specific to the service that we are testing.
class MyUser(XmlRpcUser):
    """User that calls the two functions exposed by the example XML-RPC server."""

    host = "http://127.0.0.1:8877/"

    @task
    def get_time(self):
        """Call the remote ``get_time`` function; the result is not used."""
        self.client.get_time()

    @task
    def get_random_number(self):
        """Call the remote ``get_random_number`` function with the range 0 to 100."""
        self.client.get_random_number(0, 100)
For more examples, see locust-plugins
Example: writing a gRPC User/client¶
Similarly to the XML-RPC example, we can also load test a gRPC server.
import hello_pb2_grpc
import hello_pb2
import grpc
from concurrent import futures
import logging
import time
# Module-level logger; start_server() uses it to report that the server has started.
logger = logging.getLogger(__name__)
class HelloServiceServicer(hello_pb2_grpc.HelloServiceServicer):
    """Server-side implementation of the example Hello service."""

    def SayHello(self, request, context):
        """Answer a SayHello call with a greeting for ``request.name``.

        Sleeps for one second before replying, so every call takes at least
        that long. The *context* argument is not used.

        Returns:
            A ``hello_pb2.HelloResponse`` whose ``message`` contains the name.
        """
        name = request.name
        time.sleep(1)
        return hello_pb2.HelloResponse(message=f"Hello from Locust, {name}!")
def start_server():
    """Start the example gRPC server on localhost:50051 and block until it terminates.

    The server handles calls on a thread pool of up to 10 workers and listens
    on an insecure (non-TLS) port.
    """
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    hello_pb2_grpc.add_HelloServiceServicer_to_server(HelloServiceServicer(), server)
    server.add_insecure_port("localhost:50051")
    server.start()
    logger.info("gRPC server started")
    # Blocks the caller until the server terminates.
    server.wait_for_termination()
In this case, the gRPC stub methods can also be wrapped so that we can record the request stats.
import grpc
import hello_pb2_grpc
import hello_pb2
from locust import events, User, task
from locust.exception import LocustError
from locust.user.task import LOCUST_STATE_STOPPING
from hello_server import start_server
import gevent
import time
# patch grpc so that it uses gevent instead of asyncio
import grpc.experimental.gevent as grpc_gevent
grpc_gevent.init_gevent()
@events.init.add_listener
def run_grpc_server(environment, **_kwargs):
    """Launch the example gRPC server in a background greenlet.

    Registered as a listener on the ``init`` event. The arguments passed by
    the event are accepted but not used.
    """
    gevent.spawn(start_server)
class GrpcClient:
    """Wrapper around a gRPC stub that reports every call through ``events.request``.

    Attribute access is forwarded to the wrapped stub; the returned callable
    times the call and fires the request event when it finishes.

    The response length is taken from ``len(response.message)``, so responses
    are expected to have a ``message`` attribute (as ``HelloResponse`` in this
    example does). Only ``grpc.RpcError`` is caught and recorded; any other
    exception propagates and no event is fired for that call.
    """

    def __init__(self, stub):
        """Store the stub whose methods will be wrapped."""
        self._stub_class = stub.__class__
        self._stub = stub

    def __getattr__(self, name):
        """Return a callable that invokes stub method *name* and reports the call."""
        func = self._stub_class.__getattribute__(self._stub, name)

        def wrapper(*args, **kwargs):
            start_time = time.perf_counter()
            request_meta = {
                "request_type": "grpc",
                "name": name,
                "response_length": 0,
                "exception": None,
                "context": None,
                "response": None,
            }
            try:
                request_meta["response"] = func(*args, **kwargs)
                request_meta["response_length"] = len(request_meta["response"].message)
            except grpc.RpcError as e:
                # Record the error instead of raising; the wrapper then returns None.
                request_meta["exception"] = e
            # perf_counter() is in seconds; the event is given milliseconds.
            request_meta["response_time"] = (time.perf_counter() - start_time) * 1000
            events.request.fire(**request_meta)
            return request_meta["response"]

        return wrapper
class GrpcUser(User):
    """Abstract Locust user that opens an insecure gRPC channel and provides a GrpcClient.

    Subclasses must define both ``host`` and ``stub_class``.
    """

    abstract = True
    stub_class = None  # subclasses set this to the stub class to instantiate on the channel

    def __init__(self, environment):
        """Open the channel to ``self.host`` and wrap a stub in ``self.client``.

        Raises:
            LocustError: If ``host`` or ``stub_class`` is ``None``.
        """
        super().__init__(environment)
        for attr_value, attr_name in ((self.host, "host"), (self.stub_class, "stub_class")):
            if attr_value is None:
                raise LocustError(f"You must specify the {attr_name}.")
        self._channel = grpc.insecure_channel(self.host)
        self._channel_closed = False
        stub = self.stub_class(self._channel)
        self.client = GrpcClient(stub)

    def stop(self, force=False):
        """Close the gRPC channel, then stop the user.

        ``_channel_closed`` is set first so tasks that check it stop issuing
        calls. The *force* argument is accepted but not used: the parent
        ``stop`` is always called with ``force=True``.
        """
        self._channel_closed = True
        # NOTE(review): presumably this gives in-flight calls time to finish
        # before the channel is closed; confirm.
        time.sleep(1)
        self._channel.close()
        super().stop(force=True)
class HelloGrpcUser(GrpcUser):
    """Concrete user that calls ``SayHello`` on the example gRPC server."""

    host = "localhost:50051"
    stub_class = hello_pb2_grpc.HelloServiceStub

    @task
    def sayHello(self):
        """Send one ``SayHello`` request unless the channel is closed, then wait a second."""
        # Skip the call once stop() has flagged the channel as closed.
        if not self._channel_closed:
            self.client.SayHello(hello_pb2.HelloRequest(name="Test"))
        time.sleep(1)
Note: In order to make the grpcio Python library gevent-compatible, the following code needs to be executed before creating the gRPC channel:
```python
import grpc.experimental.gevent as grpc_gevent
grpc_gevent.init_gevent()
```
Note: It is important to close the gRPC channel before stopping the User greenlet; otherwise Locust may not be able to stop executing. This is due to an issue in grpcio (see grpc#15880).