Source code for locust.contrib.fasthttp

from __future__ import annotations

from locust.env import Environment
from locust.exception import CatchResponseError, LocustError, ResponseError
from locust.user import User
from locust.util.deprecation import DeprecatedFastHttpLocustClass as FastHttpLocust

import json
import json as unshadowed_json  # some methods take a named parameter called json
import re
import socket
import time
import traceback
from base64 import b64encode
from contextlib import contextmanager
from http.cookiejar import CookieJar
from json.decoder import JSONDecodeError
from ssl import SSLError
from typing import TYPE_CHECKING, cast
from urllib.parse import urlparse, urlunparse

import gevent
from charset_normalizer import detect
from gevent.timeout import Timeout
from geventhttpclient._parser import HTTPParseError
from geventhttpclient.client import HTTPClientPool
from geventhttpclient.header import Headers
from geventhttpclient.response import HTTPConnectionClosed, HTTPSocketPoolResponse
from geventhttpclient.useragent import CompatRequest, CompatResponse, ConnectionError, UserAgent

# borrow requests's content-type header parsing
from requests.utils import get_encoding_from_headers

if TYPE_CHECKING:
    import sys
    from collections.abc import Callable, Generator
    from typing import Any, TypedDict

    if sys.version_info >= (3, 11):
        from typing import Unpack
    else:
        from typing_extensions import Unpack

    class PostKwargs(TypedDict, total=False):
        name: str | None
        catch_response: bool
        stream: bool
        headers: dict | None
        auth: tuple[str | bytes, str | bytes] | None
        allow_redirects: bool
        context: dict

    class PutKwargs(PostKwargs, total=False):
        json: Any

    class PatchKwargs(PostKwargs, total=False):
        json: Any

    class RESTKwargs(PostKwargs, total=False):
        data: str | dict | None
        json: Any


# Monkey patch geventhttpclient.useragent.CompatRequest so that Cookiejar works with Python >= 3.3
# More info: https://github.com/requests/requests/pull/871
CompatRequest.unverifiable = False

# Workaround for AttributeError: 'CompatRequest' object has no attribute 'type' in Cookiejar
# https://github.com/locustio/locust/issues/1138
# Might allow secure cookies over non-secure connections but that is a minor concern in a load testing tool
CompatRequest.type = "https"

# Regexp for checking if an absolute URL was specified
absolute_http_url_regexp = re.compile(r"^https?://", re.I)

# List of exceptions that can be raised by geventhttpclient when sending an HTTP request,
# and that should result in a Locust failure
FAILURE_EXCEPTIONS = (
    ConnectionError,
    ConnectionRefusedError,
    ConnectionResetError,
    socket.error,
    SSLError,
    Timeout,
    HTTPConnectionClosed,
)


def _construct_basic_auth_str(username, password):
    """Construct Authorization header value to be used in HTTP Basic Auth"""
    if isinstance(username, str):
        username = username.encode("latin1")
    if isinstance(password, str):
        password = password.encode("latin1")
    return "Basic " + b64encode(b":".join((username, password))).strip().decode("ascii")


def insecure_ssl_context_factory():
    context = gevent.ssl.create_default_context()
    context.check_hostname = False
    context.verify_mode = gevent.ssl.CERT_NONE
    return context


[docs] class FastHttpSession: auth_header = None def __init__( self, environment: Environment, base_url: str, user: User | None, insecure=True, client_pool: HTTPClientPool | None = None, ssl_context_factory: Callable | None = None, **kwargs, ) -> None: self.environment = environment self.base_url = base_url self.cookiejar = CookieJar() self.user = user if not ssl_context_factory: if insecure: ssl_context_factory = insecure_ssl_context_factory else: ssl_context_factory = gevent.ssl.create_default_context self.client = LocustUserAgent( cookiejar=self.cookiejar, ssl_context_factory=ssl_context_factory, insecure=insecure, client_pool=client_pool, **kwargs, ) # Check for basic authentication parsed_url = urlparse(self.base_url) if parsed_url.username and parsed_url.password: netloc = parsed_url.hostname or "" if parsed_url.port: netloc += ":%d" % parsed_url.port # remove username and password from the base_url self.base_url = urlunparse( (parsed_url.scheme, netloc, parsed_url.path, parsed_url.params, parsed_url.query, parsed_url.fragment) ) # store authentication header (we construct this by using _basic_auth_str() function from requests.auth) self.auth_header = _construct_basic_auth_str(parsed_url.username, parsed_url.password) def _build_url(self, path: str) -> str: """prepend url with hostname unless it's already an absolute URL""" if absolute_http_url_regexp.match(path): return path else: return f"{self.base_url}{path}" def _send_request_safe_mode(self, method: str, url: str, **kwargs): """ Send an HTTP request, and catch any exception that might occur due to either connection problems, or invalid HTTP status codes """ try: return self.client.urlopen(url, method=method, **kwargs) except FAILURE_EXCEPTIONS as e: if hasattr(e, "response"): r = e.response else: req = self.client._make_request( url, method=method, headers=kwargs.get("headers"), payload=kwargs.get("payload"), params=kwargs.get("params"), ) r = ErrorResponse(url=url, request=req) r.error = e return r
[docs] def request( self, method: str, url: str, name: str | None = None, data: str | dict | None = None, catch_response: bool = False, stream: bool = False, headers: dict | None = None, auth: tuple[str | bytes, str | bytes] | None = None, json: Any = None, allow_redirects: bool = True, context: dict = {}, **kwargs, ) -> ResponseContextManager | FastResponse: """ Send and HTTP request Returns :py:class:`locust.contrib.fasthttp.FastResponse` object. :param method: method for the new :class:`Request` object. :param url: path that will be concatenated with the base host URL that has been specified. Can also be a full URL, in which case the full URL will be requested, and the base host is ignored. :param name: (optional) An argument that can be specified to use as label in Locust's statistics instead of the URL path. This can be used to group different URL's that are requested into a single entry in Locust's statistics. :param catch_response: (optional) Boolean argument that, if set, can be used to make a request return a context manager to work as argument to a with statement. This will allow the request to be marked as a fail based on the content of the response, even if the response code is ok (2xx). The opposite also works, one can use catch_response to catch a request and then mark it as successful even if the response code was not (i.e. 500 or 404). :param data: (optional) String/bytes to send in the body of the request. :param json: (optional) Json to send in the body of the request. Automatically sets Content-Type and Accept headers to "application/json". Only used if data is not set. :param headers: (optional) Dictionary of HTTP Headers to send with the request. :param auth: (optional) Auth (username, password) tuple to enable Basic HTTP Auth. :param stream: (optional) If set to true the response body will not be consumed immediately and can instead be consumed by accessing the stream attribute on the Response object. Another side effect of setting stream to True is that the time for downloading the response content will not be accounted for in the request time that is reported by Locust. :param allow_redirects: (optional) Set to True by default. """ # prepend url with hostname unless it's already an absolute URL built_url = self._build_url(url) start_time = time.time() # seconds since epoch if self.user: context = {**self.user.context(), **context} headers = headers or {} if auth: headers["Authorization"] = _construct_basic_auth_str(auth[0], auth[1]) elif self.auth_header: headers["Authorization"] = self.auth_header if "Accept-Encoding" not in headers and "accept-encoding" not in headers: headers["Accept-Encoding"] = "gzip, deflate" if not data and json is not None: data = unshadowed_json.dumps(json) if "Content-Type" not in headers and "content-type" not in headers: headers["Content-Type"] = "application/json" if "Accept" not in headers and "accept" not in headers: headers["Accept"] = "application/json" if not allow_redirects: old_redirect_response_codes = self.client.redirect_resonse_codes self.client.redirect_resonse_codes = [] start_perf_counter = time.perf_counter() # send request, and catch any exceptions response = self._send_request_safe_mode(method, built_url, payload=data, headers=headers, **kwargs) request_meta = { "request_type": method, "name": name or url, "context": context, "response": response, "exception": None, "start_time": start_time, "url": built_url, # this is a small deviation from HttpSession, which gets the final (possibly redirected) URL } if not allow_redirects: self.client.redirect_resonse_codes = old_redirect_response_codes request_meta["response_length"] = 0 # default value, if length cannot be determined # get the length of the content, but if the argument stream is set to True, we take # the size from the content-length header, in order to not trigger fetching of the body if stream: if response.headers and "response_length" in response.headers: request_meta["response_length"] = int(response.headers["response_length"]) else: try: request_meta["response_length"] = len(response.content) if response.content else 0 except HTTPParseError as e: request_meta["response_time"] = (time.perf_counter() - start_perf_counter) * 1000 request_meta["exception"] = e self.environment.events.request.fire(**request_meta) return response # Record the consumed time # Note: This is intentionally placed after we record the content_size above, since # we'll then trigger fetching of the body (unless stream=True) request_meta["response_time"] = (time.perf_counter() - start_perf_counter) * 1000 if catch_response: return ResponseContextManager(response, environment=self.environment, request_meta=request_meta) else: try: response.raise_for_status() except FAILURE_EXCEPTIONS as e: request_meta["exception"] = e self.environment.events.request.fire(**request_meta) return response
[docs] def delete(self, url: str, **kwargs: Unpack[RESTKwargs]) -> ResponseContextManager | FastResponse: """Sends a DELETE request""" return self.request("DELETE", url, **kwargs)
[docs] def get(self, url: str, **kwargs: Unpack[RESTKwargs]) -> ResponseContextManager | FastResponse: """Sends a GET request""" return self.request("GET", url, **kwargs)
[docs] def head(self, url: str, **kwargs: Unpack[RESTKwargs]) -> ResponseContextManager | FastResponse: """Sends a HEAD request""" return self.request("HEAD", url, **kwargs)
[docs] def options(self, url: str, **kwargs: Unpack[RESTKwargs]) -> ResponseContextManager | FastResponse: """Sends a OPTIONS request""" return self.request("OPTIONS", url, **kwargs)
[docs] def patch( self, url: str, data: str | dict | None = None, **kwargs: Unpack[PatchKwargs] ) -> ResponseContextManager | FastResponse: """Sends a PATCH request""" return self.request("PATCH", url, data=data, **kwargs)
[docs] def post( self, url: str, data: str | dict | None = None, json: Any = None, **kwargs: Unpack[PostKwargs] ) -> ResponseContextManager | FastResponse: """Sends a POST request""" return self.request("POST", url, data=data, json=json, **kwargs)
[docs] def put( self, url: str, data: str | dict | None = None, **kwargs: Unpack[PutKwargs] ) -> ResponseContextManager | FastResponse: """Sends a PUT request""" return self.request("PUT", url, data=data, **kwargs)
[docs] class FastHttpUser(User): """ FastHttpUser provides the same API as HttpUser, but uses geventhttpclient instead of python-requests as its underlying client. It uses considerably less CPU on the load generator, and should work as a simple drop-in-replacement in most cases. """ # Below are various UserAgent settings. Change these in your subclass to alter FastHttpUser's behaviour. # It needs to be done before FastHttpUser is instantiated, changing them later will have no effect network_timeout: float = 60.0 """Parameter passed to FastHttpSession""" connection_timeout: float = 60.0 """Parameter passed to FastHttpSession""" max_redirects: int = 30 """Parameter passed to FastHttpSession.""" max_retries: int = 0 """Parameter passed to FastHttpSession.""" insecure: bool = True """Parameter passed to FastHttpSession. Default True, meaning no SSL verification.""" default_headers: dict | None = None """Parameter passed to FastHttpSession. Adds the listed headers to every request.""" concurrency: int = 10 """Parameter passed to FastHttpSession. Describes number of concurrent requests allowed by the FastHttpSession. Default 10. Note that setting this value has no effect when custom client_pool was given, and you need to spawn a your own gevent pool to use it (as Users only have one greenlet). See test_fasthttp.py / test_client_pool_concurrency for an example.""" proxy_host: str | None = None """Parameter passed to FastHttpSession""" proxy_port: int | None = None """Parameter passed to FastHttpSession""" client_pool: HTTPClientPool | None = None """HTTP client pool to use. If not given, a new pool is created per single user.""" ssl_context_factory: Callable | None = None """A callable that return a SSLContext for overriding the default context created by the FastHttpSession.""" abstract = True """Dont register this as a User class that can be run by itself""" _callstack_regex = re.compile(r' File "(\/.[^"]*)", line (\d*),(.*)') def __init__(self, environment) -> None: super().__init__(environment) if self.host is None: raise LocustError( "You must specify the base host. Either in the host attribute in the User class, or on the command line using the --host option." ) self.client: FastHttpSession = FastHttpSession( self.environment, base_url=self.host, network_timeout=self.network_timeout, connection_timeout=self.connection_timeout, max_redirects=self.max_redirects, max_retries=self.max_retries, insecure=self.insecure, concurrency=self.concurrency, user=self, client_pool=self.client_pool, ssl_context_factory=self.ssl_context_factory, headers=self.default_headers, proxy_host=self.proxy_host, proxy_port=self.proxy_port, ) """ Instance of HttpSession that is created upon instantiation of User. The client support cookies, and therefore keeps the session between HTTP requests. """
[docs] @contextmanager def rest(self, method, url, headers: dict | None = None, **kwargs) -> Generator[RestResponseContextManager]: """ A wrapper for self.client.request that: * Parses the JSON response to a dict called ``js`` in the response object. Marks the request as failed if the response was not valid JSON. * Defaults ``Content-Type`` and ``Accept`` headers to ``application/json`` * Sets ``catch_response=True`` (so always use a :ref:`with-block <catch-response>`) * Catches any unhandled exceptions thrown inside your with-block, marking the sample as failed (instead of exiting the task immediately without even firing the request event) """ headers = headers or {} if not ("Content-Type" in headers or "content-type" in headers): headers["Content-Type"] = "application/json" if not ("Accept" in headers or "accept" in headers): headers["Accept"] = "application/json" with self.client.request(method, url, catch_response=True, headers=headers, **kwargs) as r: resp = cast(RestResponseContextManager, r) resp.js = None # type: ignore if resp.content is None: resp.failure(str(resp.error)) elif resp.text: try: resp.js = resp.json() except JSONDecodeError as e: resp.failure( f"Could not parse response as JSON. {resp.text[:250]}, response code {resp.status_code}, error {e}" ) try: yield resp except AssertionError as e: if e.args: if e.args[0].endswith(","): short_resp = resp.text[:200] if resp.text else resp.text resp.failure(f"{e.args[0][:-1]}, response was {short_resp}") else: resp.failure(e.args[0]) else: resp.failure("Assertion failed") except Exception as e: error_lines = [] for l in traceback.format_exc().split("\n"): if m := self._callstack_regex.match(l): filename = re.sub(r"/(home|Users/\w*)/", "~/", m.group(1)) error_lines.append(filename + ":" + m.group(2) + m.group(3)) short_resp = resp.text[:200] if resp.text else resp.text resp.failure(f"{e.__class__.__name__}: {e} at {', '.join(error_lines)}. Response was {short_resp}")
[docs] @contextmanager def rest_(self, method, url, name=None, **kwargs) -> Generator[RestResponseContextManager]: """ Some REST api:s use a timestamp as part of their query string (mainly to break through caches). This is a convenience method for that, appending a _=<timestamp> parameter automatically """ separator = "&" if "?" in url else "?" if name is None: name = url + separator + "_=..." with self.rest(method, f"{url}{separator}_={int(time.time()*1000)}", name=name, **kwargs) as resp: yield resp
class FastRequest(CompatRequest): payload: str | None = None @property def body(self) -> str | None: return self.payload
[docs] class FastResponse(CompatResponse): headers: Headers | None = None """Dict like object containing the response headers""" _response: HTTPSocketPoolResponse | None = None encoding: str | float | None = None """In some cases setting the encoding explicitly is needed. If so, do it before calling .text""" request: FastRequest | None = None def __init__( self, ghc_response: HTTPSocketPoolResponse, request: FastRequest | None = None, sent_request: str | None = None, ): super().__init__(ghc_response, request, sent_request) self.request = request @property def text(self) -> str | None: """ Returns the text content of the response as a decoded string """ if self.content is None: return None if self.encoding is None: if self.headers is None: # No information, try to detect self.encoding = detect(self.content)["encoding"] else: self.encoding = get_encoding_from_headers(self.headers) # No information, try to detect if not self.encoding: self.encoding = detect(self.content)["encoding"] if self.encoding is None: return None return str(self.content, str(self.encoding), errors="replace") @property def url(self) -> str | None: """ Get "response" URL, which is the same as the request URL. This is a small deviation from HttpSession, which gets the final (possibly redirected) URL. """ if self.request is not None: return self.request.url return None
[docs] def json(self) -> dict: """ Parses the response as json and returns a dict """ return json.loads(self.text) # type: ignore
def raise_for_status(self): """Raise any connection errors that occurred during the request""" if hasattr(self, "error") and self.error: raise self.error @property def status_code(self) -> int: """ We override status_code in order to return None if no valid response was returned. E.g. in the case of connection errors """ return self._response.get_code() if self._response is not None else 0 @property def ok(self): """Returns True if :attr:`status_code` is less than 400, False if not.""" return self.status_code < 400 def _content(self): if self.headers is None: return None return super()._content() def success(self): raise LocustError( "If you want to change the state of the request, you must pass catch_response=True. See http://docs.locust.io/en/stable/writing-a-locustfile.html#validating-responses" ) def failure(self): raise LocustError( "If you want to change the state of the request, you must pass catch_response=True. See http://docs.locust.io/en/stable/writing-a-locustfile.html#validating-responses" )
class ErrorResponse: """ This is used as a dummy response object when geventhttpclient raises an error that doesn't have a real Response object attached. E.g. a socket error or similar """ headers: Headers | None = None content = None status_code = 0 error: Exception | None = None text: str | None = None request: CompatRequest def __init__(self, url: str, request: CompatRequest): self.url = url self.request = request def raise_for_status(self): raise self.error class LocustUserAgent(UserAgent): response_type = FastResponse request_type = FastRequest valid_response_codes = frozenset([200, 201, 202, 203, 204, 205, 206, 207, 208, 226, 301, 302, 303, 304, 307]) def __init__(self, client_pool: HTTPClientPool | None = None, **kwargs): super().__init__(**kwargs) if client_pool is not None: self.clientpool = client_pool def _urlopen(self, request): """Override _urlopen() in order to make it use the response_type attribute""" client = self.clientpool.get_client(request.url_split) resp = client.request( request.method, request.url_split.request_uri, body=request.payload, headers=request.headers ) return self.response_type(resp, request=request, sent_request=resp._sent_request) class ResponseContextManager(FastResponse): """ A Response class that also acts as a context manager that provides the ability to manually control if an HTTP request should be marked as successful or a failure in Locust's statistics This class is a subclass of :py:class:`FastResponse <locust.contrib.fasthttp.FastResponse>` with two additional methods: :py:meth:`success <locust.contrib.fasthttp.ResponseContextManager.success>` and :py:meth:`failure <locust.contrib.fasthttp.ResponseContextManager.failure>`. """ _manual_result = None _entered = False def __init__(self, response, environment, request_meta): # copy data from response to this object self.__dict__ = response.__dict__ try: self._cached_content = response._cached_content except AttributeError: pass # store reference to locust Environment self._environment = environment self.request_meta = request_meta def __enter__(self): self._entered = True return self def __exit__(self, exc, value, traceback): # if the user has already manually marked this response as failure or success # we can ignore the default behaviour of letting the response code determine the outcome if self._manual_result is not None: if self._manual_result is True: self._report_request() elif isinstance(self._manual_result, Exception): self.request_meta["exception"] = self._manual_result self._report_request() return exc is None if exc: if isinstance(value, ResponseError): self.request_meta["exception"] = value self._report_request() else: return False else: try: self.raise_for_status() except FAILURE_EXCEPTIONS as e: self.request_meta["exception"] = e self._report_request() return True def _report_request(self): self._environment.events.request.fire(**self.request_meta) def success(self): """ Report the response as successful Example:: with self.client.get("/does/not/exist", catch_response=True) as response: if response.status_code == 404: response.success() """ if not self._entered: raise LocustError( "Tried to set status on a request that has not yet been made. Make sure you use a with-block, like this:\n\nwith self.client.request(..., catch_response=True) as response:\n response.success()" ) self._manual_result = True def failure(self, exc): """ Report the response as a failure. if exc is anything other than a python exception (like a string) it will be wrapped inside a CatchResponseError. Example:: with self.client.get("/", catch_response=True) as response: if response.content == "": response.failure("No data") """ if not self._entered: raise LocustError( "Tried to set status on a request that has not yet been made. Make sure you use a with-block, like this:\n\nwith self.client.request(..., catch_response=True) as response:\n response.failure(...)" ) if not isinstance(exc, Exception): exc = CatchResponseError(exc) self._manual_result = exc class RestResponseContextManager(ResponseContextManager): js: dict # This is technically an Optional, but I dont want to force everyone to check it error: Exception # This one too headers: Headers # .. and this one