|
- import random
- import sys, os, queue
- import time
-
- environment = 'pre'
- sys.path.append('environment tag:{}'.format(environment))
- real_path = os.path.split(os.path.realpath(__file__))[0]
- sys.path.append('{}{}..'.format(real_path, os.sep))
- from locust import HttpUser, between, task, TaskSet, events
- from locust.runners import WorkerRunner, MasterRunner
- from config.common_config import *
- from biz import home_page, performance_biz
- from lib.thread_logger import get_logger
- from lib.edu_http_session import EduHttpSession
- from lib.data_convert import string_to_base64
- from api.educoder import practices as _api_practices
- from api.educoder import mypractices as _api_mypractices
-
- logger.to_html = False
- stu_queue = queue.Queue()
-
-
- @events.init.add_listener
- def on_locust_init_testdata(environment, **_kwargs):
- """worker启动时, 注册消息类型(stu_accounts), 用于接收master分发的学生账号"""
- if isinstance(environment.runner, WorkerRunner):
- environment.runner.register_message("stu_accounts", recv_stu_accounts)
-
-
- @events.test_start.add_listener
- def on_test_start(environment, **_kwargs):
- """master开始测试时,根据worker数量将所有学生账号分发"""
- if isinstance(environment.runner, WorkerRunner):
- return
- worker_count = environment.runner.worker_count
- chunk_size = int(performance_biz.thread_num / worker_count)
- # environment.runner.clients 是一个列表,里面放的是每个worker的ID
- for i, worker in enumerate(environment.runner.clients):
- # 根据数据拿到的下标,截取数据
- data = performance_biz.student_accounts[i * chunk_size:i * chunk_size + chunk_size]
- # 发送消息给test_users,并且指定worker
- index_info = {"worker": worker, 'data': data}
- # 发送详情信息
- environment.runner.send_message("stu_accounts", index_info, worker)
-
-
- def recv_stu_accounts(environment, msg, **kwargs):
- """接收master分发的学生账号"""
- global stu_list
- for stu in msg.data['data']:
- stu_queue.put(stu)
-
-
- class TaskTest(TaskSet):
- def on_start(self):
- self.user_name = stu_queue.get()
- self.client.logger = get_logger('{}'.format(self.user_name))
- self.client.logger.info('start test: {}'.format(self.user_name))
- self.my_code = string_to_base64(data=performance_biz.answer_code)
-
- def on_stop(self):
- self.client.logger.info('stop test: {}'.format(self.user_name))
- stu_queue.put(self.user_name)
-
- @task
- def test_practices(self):
- # 登录
- my_header = default_headers.copy()
- my_header['client'] = self.client
- home_page.login_educoder(username=self.user_name, password='12345678', headers=my_header)
- # 打开刷题页面(获取 identifier)
- res = _api_practices.start(practices_identifier=performance_biz.practices_identifier, headers=my_header)
- mypractices_identifier = res.get('identifier')
- assert res.get('status') == 0 and mypractices_identifier, f"开始刷题失败:{res}"
- mypractices_info = _api_mypractices.get_mypractices(headers=my_header, mypractices_identifier=mypractices_identifier, hidePopLogin=True)
- assert mypractices_info.get('practice'), f"在线刷题-获取题目信息失败:{mypractices_info}"
- # 提交代码、开始评测
- res = _api_mypractices.update_code(headers=my_header,
- mypractices_identifier=mypractices_identifier,
- language=performance_biz.language,
- code=self.my_code)
- assert res.get('status') == 0, f'{self.user_name} 提交代码失败:{res} ,{mypractices_identifier}, {time.time()}'
- res = _api_mypractices.code_submit(headers=my_header, mypractices_identifier=mypractices_identifier)
- assert res.get('status') == 0, f'{self.user_name} 开始评测失败:{res} ,{mypractices_identifier}, {time.time()}'
- # 获取评测结果
- t_start = time.time()
- flag = False
- result = {}
- while time.time() - t_start <= performance_biz.limit_time and flag is False:
- time.sleep(1)
- result = _api_mypractices.result(headers=my_header, mypractices_identifier=mypractices_identifier)
- assert result.get('status') in [0, 1], f'获取评测状态异常:{result}'
- if result.get('status') == 0:
- flag = True
- result_data = result.get('data')
- assert result_data, f'{self.user_name} 获取评测结果异常:{result} ,{mypractices_identifier}, {time.time()}'
- passed = result_data.get('passed')
- assert passed is True, f'{self.user_name} 评测未通过:{result} ,{mypractices_identifier}, {time.time()}'
-
-
- class ExercisePerformance(HttpUser):
- host = host
- wait_time = between(performance_biz.task_wait, performance_biz.task_wait) # 任务间的时间间隔
- tasks = [TaskTest]
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.client = EduHttpSession(
- base_url=self.host,
- request_event=self.environment.events.request,
- user=self,
- pool_manager=self.pool_manager,
- rest_time=performance_biz.request_wait # 请求间的时间间隔
- )
|