Jump to content
新域网络技术论坛

Python发起自定义高并发压力测试


Jamers
 Share

Recommended Posts

apache中有ab压力测试工具,但是无法自定义参数,并且无法分析最终结果,特写此文,共勉之。

我们先在服务端创建一个有10%概率成功的PHP脚本,由于高并发使用默认的随机数算法可能有些问题,所以采用了自定义的随机数种子算法。这里接收id用以区分请求顺序号。

$rate = 10;
$id = isset($_REQUEST['id']) ? $_REQUEST['id'] : '0';

$seed = sRandSeed();
mt_srand($seed);
if (mt_rand(1, 100) <= $rate) {
    $code = 0;
    $msg = 'success';
} else {
    $code = 1;
    $msg = 'busy';
}

header('Content-type: application/json');
echo json_encode(['code' => $code, 'msg' => $msg, 'id' => $id, 'seed' => $seed,]);

function sRandSeed()
{
    $hash = md5(session_create_id());
    $result = 0x003F;
    foreach (str_split($hash, 8) as $v) {
        $result ^= hexdec($v);
    }
    return $result & 0x7FFFFFFF;
}

 

由于windows最大同时打开文件只能是512个,我这里限制最大并发数为500了,如果非windows可以适当增加此值,否则会报错,无法执行下去。由于高并发的时候,有可能http服务会报错,如果返回的值不是标准json串,设置此次请求为失效(code = -1),并统计各code的数量。

# -*- Coding: UTF-8 -*-
import time
import sys
import getopt
import asyncio
from aiohttp import ClientSession
import json
import codecs
sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach())

url = "http://192.168.0.200/cli/asynctest.php"
tasks = []
result = {}


async def run(cid=0, obj=None):
    global cols
    if obj is None:
        obj = asyncio.Semaphore(cols)
    async with obj:
        async with ClientSession() as session:
            async with session.post(url, data={"id": cid}) as response:
                try:
                    text = await response.text()
                    j = json.loads(text)
                except json.JSONDecodeError:
                    if -1 not in result:
                        result[-1] = 0
                    result[-1] += 1
                else:
                    if j["code"] not in result:
                        result[j["code"]] = 0
                    result[j["code"]] += 1
                    # print(j)

if __name__ == "__main__":
    nums = 10
    cols = 500
    if len(sys.argv) > 1:
        opts, args = getopt.getopt(sys.argv[1:], "hn:c:")
        for op, value in opts:
            if op == "-h":
                print("python " + sys.argv[0] + " -n 100 -c 500")
                sys.exit()
            elif op == "-n":
                nums = int(value)
            elif op == '-c':
                cols = int(value)
    loop = asyncio.get_event_loop()
    start_time = time.time()
    semaphore = asyncio.Semaphore(cols)
    tasks = [run(cid, semaphore) for cid in range(nums)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    end_time = time.time()
    print(result)
    print("Loop Nums:", str(nums))
    print("Start Time:", str(start_time))
    print("End Time:", str(end_time))
    print("Running Time:", str(end_time - start_time))

 

Link to comment
Share on other sites

我在FREEBSD上测试过了,用参数 -n 10000 -c 10000 没问题,所以并发压力测试的时候尽量不要用windows,另外考虑带宽问题,尽量在内网走,基本不受带宽限制。

 

如果像高并发秒杀的时候,可以模拟各种情况了。甚至于分析最终秒杀成功的用户是否属于随机,保证算法的公平性。

Link to comment
Share on other sites

继续优化python代码,添加请求模式(POST|GET),添加链接配置,直接通过参数可控制具体请求的链接。

# -*- Coding: UTF-8 -*-
"""
自定义参数高并发压力测试工具
@author Jamers
@since  2019.05.08
"""

import time
import sys
import getopt
import asyncio
from aiohttp import ClientSession
import json
import re
import codecs
sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach())

url = "http://192.168.0.200/cli/asynctest.php"
tasks = []
result = {}


def usage():
    print("usage: python " + sys.argv[0] + " -n 100 -c 500 -m post|get http://www.baidu.com")
    sys.exit()


async def run(cid=0, obj=None):
    global cols
    if obj is None:
        obj = asyncio.Semaphore(cols)
    async with obj:
        async with ClientSession() as session:
            if method == 'get':
                resp = session.get(url, data={"id": cid})
            else:
                resp = session.post(url, data={"id": cid})
            async with resp as response:
                try:
                    text = await response.text()
                    j = json.loads(text)
                except json.JSONDecodeError:
                    if -1 not in result:
                        result[-1] = 0
                    result[-1] += 1
                else:
                    if j["code"] not in result:
                        result[j["code"]] = 0
                    result[j["code"]] += 1
                    # print(j)

if __name__ == "__main__":
    nums = 10
    cols = 500
    method = 'get'
    if len(sys.argv) > 1:
        opts, args = getopt.getopt(sys.argv[1:], "hn:c:m:")
        if len(args) > 0:
            reg = re.compile(r"^https?://[^/]{3,}\S*$", re.I)
            url = args.pop(0)
            if not reg.match(url):
                print("URL must be Illegal")
                usage()

        for op, value in opts:
            if op == "-h":
                usage()
            elif op == "-n":
                nums = int(value)
            elif op == '-c':
                cols = int(value)
            elif op == '-m':
                tmp = value.lower()
                if tmp in ['post', 'get', ]:
                    method = tmp
                else:
                    print("method only set to post or get")
                    usage()
    loop = asyncio.get_event_loop()
    start_time = time.time()
    semaphore = asyncio.Semaphore(cols)
    tasks = [run(cid, semaphore) for cid in range(nums)]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    end_time = time.time()
    print(result)
    print("Loop Nums:", str(nums))
    print("Start Time:", str(start_time))
    print("End Time:", str(end_time))
    print("Running Time:", str(end_time - start_time))

 

Link to comment
Share on other sites

Please sign in to comment

You will be able to leave a comment after signing in



Sign In Now
 Share

×
×
  • Create New...