Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from locust import HttpUser, TaskSet, between, task
class RegistredUser(HttpUser):
    """Simulated user that exercises the ``/random-list`` and ``/random`` endpoints."""

    # Pause 5-9 seconds between tasks. This replaces the deprecated
    # min_wait/max_wait attributes (5000/9000, expressed in milliseconds)
    # with the wait_time API.
    wait_time = between(5, 9)

    @task
    class RandomStresstest(TaskSet):
        """Task set mixing list reads and inserts at a 2:1 weight ratio."""

        @task(2)
        def list(self):
            """GET the ``/random-list`` endpoint."""
            self.client.get('/random-list')

        @task(1)
        def insert_random_value(self):
            """PUT a fixed ``lower``/``upper`` payload to ``/random``."""
            self.client.put('/random', {'lower': 0, 'upper': 10000})
from random import randrange
from locust import HttpUser, TaskSet, task
class RegistredUser(HttpUser):
min_wait = 5000
max_wait = 9000
@task
class CrudStresstest(TaskSet):
def __get_random_user(self):
    """Build a random test identity.

    Returns:
        A ``(userid, username, email)`` tuple of strings. ``userid`` is a
        random integer in ``[0, 10000)`` rendered as text; the username and
        e-mail address both embed that same id.
    """
    userid = str(randrange(10000))
    return userid, f'testuser_{userid}', f'some-email{userid}@yahoo.com'
@task(1)
def add_user(self):
user_data = self.__get_random_user()
user = {
from locust import HttpUser, task, events
from locust_plugins import run_single_user, constant_total_ips
from locust_plugins.listeners import TimescaleListener
class MyUser(HttpUser):
    """User that repeatedly fetches the site root."""

    host = "https://www.example.com"
    # Pacing is delegated to locust_plugins' constant_total_ips, called with 5.
    # NOTE(review): presumably a total iterations-per-second target shared by
    # all users - confirm against the locust_plugins documentation.
    wait_time = constant_total_ips(5)

    @task
    def my_task(self):
        """Issue a GET request for the root path."""
        root_path = "/"
        self.client.get(root_path)
@events.init.add_listener
def on_locust_init(environment, **_kwargs):
    """Create a TimescaleListener for this run when the init event fires.

    Args:
        environment: forwarded to the listener as ``env``.
        **_kwargs: any further event arguments; ignored.
    """
    listener_options = {
        "env": environment,
        "testplan": "constant_total_ips",
        "target_env": "myTestEnv",
    }
    TimescaleListener(**listener_options)
# When executed directly as a script (rather than imported), run MyUser
# through locust_plugins' run_single_user.
if __name__ == "__main__":
    run_single_user(MyUser)
from locust_plugins.listeners import TimescaleListener
from locust import HttpUser, task, events
class MyHttpUser(HttpUser):
    """User that posts a fixed payload to the getResults endpoint."""

    host = "http://example.com"

    @task
    def index(self):
        """POST the sample payload to ``/authentication/1.0/getResults``."""
        payload = {"username": "something"}
        self.client.post("/authentication/1.0/getResults", payload)
@events.init.add_listener
def on_locust_init(environment, **_kwargs):
    """Create a TimescaleListener for this run when the init event fires.

    Args:
        environment: forwarded to the listener as ``env``.
        **_kwargs: any further event arguments; ignored.
    """
    listener_options = {
        "env": environment,
        "testplan": "timescale_listener_ex",
        "target_env": "myTestEnv",
    }
    TimescaleListener(**listener_options)
# self.logout()
return
######################################################################
# write your tasks ###################################################
######################################################################
@task(60)
def index(self):
    """Request the root path via ``self.get`` (task weight 60)."""
    target = "/"
    self.get(target)
@task(40)
def health(self):
    """Request the ``/health`` path via ``self.get`` (task weight 40)."""
    target = "/health"
    self.get(target)
class WebsiteUser(HttpUser):
    """Locust user that runs the WebsiteTasks task set."""

    # default target host
    host = "http://www.example.com"

    # Wait between 1 and 2 seconds after each task.
    # If you want no wait time between tasks, use instead:
    # wait_time = constant(0)
    wait_time = between(1, 2)

    tasks = [WebsiteTasks]
class ExampleSequentialTaskSet(SequentialTaskSet):
    """Two-step sequence whose requests are bracketed by the "startup" transaction."""

    def on_start(self):
        """Create the TransactionManager used by the tasks of this set."""
        self.tm = TransactionManager()

    @task
    def home(self):
        """Open the "startup" transaction, then fetch the root page."""
        transaction = "startup"
        self.tm.start_transaction(transaction)
        self.client.get("/", name="01 /")

    @task
    def get_config_json(self):
        """Fetch ``/config.json``, then close the "startup" transaction."""
        self.client.get("/config.json", name="02 /config.json")
        transaction = "startup"
        self.tm.end_transaction(transaction)
class TranactionExample(HttpUser):
    """User that runs ExampleSequentialTaskSet against www.demoblaze.com."""

    tasks = [ExampleSequentialTaskSet]
    host = "https://www.demoblaze.com"
super().__init__(parent)
if WebdriverUser._first_instance:
WebdriverUser._first_instance = False
# kill old webdriver browser instances
subprocess.Popen(["killall", "chromedriver"])
subprocess.Popen(["pkill", "-f", " --test-type=webdriver"])
chrome_options = Options()
if headless:
chrome_options.add_argument("--headless")
self.client = webdriver.Remote(
command_executor="http://127.0.0.1:4444/wd/hub", desired_capabilities=chrome_options.to_capabilities()
)
class HttpUserWithResources(HttpUser):
"""
provides embedded resource management for HttpUser
"""
abstract = True
include_resources_by_default = True
default_resource_filter = ".*"
bundle_resource_stats = True
cache_resource_links = True
def __init__(self, *args):
super().__init__(*args)
EmbeddedResourceManager(
self,
self.include_resources_by_default,
from locust import HttpUser, TaskSet, task
import random
class DistroxApiUser(HttpUser):
    """Simulated client of the key/value API at ``/v1/kv``."""

    host = "http://localhost:8080/v1/kv"

    def wait_time(self):
        """Return the pause between tasks: 0.1 to 1.0 seconds, chosen uniformly.

        Replaces the deprecated ``min_wait = 100`` / ``max_wait = 1000``
        attributes, which were expressed in milliseconds.
        """
        return random.uniform(0.1, 1.0)

    @task(1)
    class DistroxServerTasks(TaskSet):
        """Helpers for writing entries to the key/value store.

        NOTE(review): no ``@task``-decorated method is visible in this
        task set - confirm that tasks are defined where this class is used.
        """

        # Class-level list, so it is shared by every instance of this task set.
        keys = []
        # Hex-escaped spelling of the text "hello world!".
        v = "\x68\x65\x6C\x6C\x6F\x20\x77\x6F\x72\x6C\x64\x21"
        entry_count = 300

        def put_entry(self, key):
            """PUT the fixed value under ``key`` and remember the key on success.

            Args:
                key: path segment appended to the host URL.
            """
            self.client.headers['Content-Type'] = "application/json"
            response = self.client.put(f"/{key}", data=self.v)
            # Only keys the server answered with 201 are recorded.
            if response.status_code == 201:
                self.keys.append(key)
import re
import subprocess
import time
import gevent
import websocket
from locust import HttpUser, User
from locust.contrib.fasthttp import FastHttpUser
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from locust_plugins.embedded_resource_manager import EmbeddedResourceManager
class SocketIOUser(HttpUser):
"""
A locust that includes a socket io websocket connection.
You could easily use this as a template for a pure WS taskset,
socket.io just happens to be my use case
"""
abstract = True
def __init__(self, parent):
    """Open the socket.io websocket and start the background receiver.

    Args:
        parent: passed through unchanged to the base class initializer.
    """
    super().__init__(parent)
    # Strip only a leading http:// or https:// scheme. The previous pattern
    # ("https*://") was unanchored and also matched "httpss://" and the like.
    ws_host = re.sub(r"^https?://", "", self.host)
    # The socket is always opened with the wss scheme, whatever scheme
    # self.host carried.
    self.ws = websocket.create_connection(f"wss://{ws_host}/socket.io/?EIO=3&transport=websocket")
    # Run self.receive in its own greenlet.
    gevent.spawn(self.receive)
def receive(self):
message_regex = re.compile(r"(\d*)(.*)")
from locust_plugins.csvreader import CSVReader
from locust import HttpUser, task
# Module-level reader over ssn.csv; MyUser.index takes one row from it per
# call via next().
ssn_reader = CSVReader("ssn.csv")
class MyUser(HttpUser):
    """User that queries the site with values drawn from ``ssn_reader``."""

    host = "http://example.com"

    @task
    def index(self):
        """Take the next row from ``ssn_reader`` and GET ``/`` with its first field as ``ssn``."""
        row = next(ssn_reader)
        ssn = row[0]
        self.client.get(f"/?ssn={ssn}")