Browse Source

添加 'prac.py'

tags/ceshi
ZQ1 9 months ago
parent
commit
ce5470b7d7
1 changed files with 114 additions and 0 deletions
  1. +114
    -0
      prac.py

+ 114
- 0
prac.py View File

@@ -0,0 +1,114 @@
import random
import sys, os, queue
import time

environment = 'pre'
sys.path.append('environment tag:{}'.format(environment))
real_path = os.path.split(os.path.realpath(__file__))[0]
sys.path.append('{}{}..'.format(real_path, os.sep))
from locust import HttpUser, between, task, TaskSet, events
from locust.runners import WorkerRunner, MasterRunner
from config.common_config import *
from biz import home_page, performance_biz
from lib.thread_logger import get_logger
from lib.edu_http_session import EduHttpSession
from lib.data_convert import string_to_base64
from api.educoder import practices as _api_practices
from api.educoder import mypractices as _api_mypractices

logger.to_html = False
stu_queue = queue.Queue()


@events.init.add_listener
def on_locust_init_testdata(environment, **_kwargs):
"""worker启动时, 注册消息类型(stu_accounts), 用于接收master分发的学生账号"""
if isinstance(environment.runner, WorkerRunner):
environment.runner.register_message("stu_accounts", recv_stu_accounts)


@events.test_start.add_listener
def on_test_start(environment, **_kwargs):
"""master开始测试时,根据worker数量将所有学生账号分发"""
if isinstance(environment.runner, WorkerRunner):
return
worker_count = environment.runner.worker_count
chunk_size = int(performance_biz.thread_num / worker_count)
# environment.runner.clients 是一个列表,里面放的是每个worker的ID
for i, worker in enumerate(environment.runner.clients):
# 根据数据拿到的下标,截取数据
data = performance_biz.student_accounts[i * chunk_size:i * chunk_size + chunk_size]
# 发送消息给test_users,并且指定worker
index_info = {"worker": worker, 'data': data}
# 发送详情信息
environment.runner.send_message("stu_accounts", index_info, worker)


def recv_stu_accounts(environment, msg, **kwargs):
"""接收master分发的学生账号"""
global stu_list
for stu in msg.data['data']:
stu_queue.put(stu)


class TaskTest(TaskSet):
def on_start(self):
self.user_name = stu_queue.get()
self.client.logger = get_logger('{}'.format(self.user_name))
self.client.logger.info('start test: {}'.format(self.user_name))
self.my_code = string_to_base64(data=performance_biz.answer_code)

def on_stop(self):
self.client.logger.info('stop test: {}'.format(self.user_name))
stu_queue.put(self.user_name)

@task
def test_practices(self):
# 登录
my_header = default_headers.copy()
my_header['client'] = self.client
home_page.login_educoder(username=self.user_name, password='12345678', headers=my_header)
# 打开刷题页面(获取 identifier)
res = _api_practices.start(practices_identifier=performance_biz.practices_identifier, headers=my_header)
mypractices_identifier = res.get('identifier')
assert res.get('status') == 0 and mypractices_identifier, f"开始刷题失败:{res}"
mypractices_info = _api_mypractices.get_mypractices(headers=my_header, mypractices_identifier=mypractices_identifier, hidePopLogin=True)
assert mypractices_info.get('practice'), f"在线刷题-获取题目信息失败:{mypractices_info}"
# 提交代码、开始评测
res = _api_mypractices.update_code(headers=my_header,
mypractices_identifier=mypractices_identifier,
language=performance_biz.language,
code=self.my_code)
assert res.get('status') == 0, f'{self.user_name} 提交代码失败:{res} ,{mypractices_identifier}, {time.time()}'
res = _api_mypractices.code_submit(headers=my_header, mypractices_identifier=mypractices_identifier)
assert res.get('status') == 0, f'{self.user_name} 开始评测失败:{res} ,{mypractices_identifier}, {time.time()}'
# 获取评测结果
t_start = time.time()
flag = False
result = {}
while time.time() - t_start <= performance_biz.limit_time and flag is False:
time.sleep(1)
result = _api_mypractices.result(headers=my_header, mypractices_identifier=mypractices_identifier)
assert result.get('status') in [0, 1], f'获取评测状态异常:{result}'
if result.get('status') == 0:
flag = True
result_data = result.get('data')
assert result_data, f'{self.user_name} 获取评测结果异常:{result} ,{mypractices_identifier}, {time.time()}'
passed = result_data.get('passed')
assert passed is True, f'{self.user_name} 评测未通过:{result} ,{mypractices_identifier}, {time.time()}'


class ExercisePerformance(HttpUser):
host = host
wait_time = between(performance_biz.task_wait, performance_biz.task_wait) # 任务间的时间间隔
tasks = [TaskTest]

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.client = EduHttpSession(
base_url=self.host,
request_event=self.environment.events.request,
user=self,
pool_manager=self.pool_manager,
rest_time=performance_biz.request_wait # 请求间的时间间隔
)

Loading…
Cancel
Save