summaryrefslogtreecommitdiff
path: root/examples/streams-stress-test.py
blob: 33cc20ceae3342976b4db7225e6b7e1122abdf0b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#!/usr/bin/env python3
# vim: set sts=2 sw=2 et tw=0 :
#

import json
import time
import random
import atexit
import datetime
import subprocess
import http.client
from itertools import zip_longest

# host:port of the proxy; used both for the token API connection and inside
# the souphttpclientsink URLs.
PROXY_HOST = "localhost:8000"
# Shared head of every gst-launch command line. It ends in a tee named "s";
# each generated sink branch starts with "s." to attach to it.
GST_LAUNCH_BASE = "gst-launch-1.0 -e videotestsrc is-live=TRUE ! videoscale ! \
  video/x-raw, height=120, width=160 ! timeoverlay font-desc=80px ! tee name=t \
  t. ! queue ! videoconvert ! vp8enc ! webmmux ! tee name=s \
    "
# NOTE(review): GST_LAUNCH_STREAMER and rtp_caps are not referenced anywhere
# else in the visible part of this file.
GST_LAUNCH_STREAMER = ""

rtp_caps = "application/x-rtp, media=(string)video"
# Inclusive range of local UDP ports; one token is generated per port.
start_port = 5004
end_port = 5104
# Maps a generated session id to its "localhost:<port>" client string.
token_table = {}
# gst-launch child processes started by the script.
processes = []
# Running total of HTTP streams launched so far.
stream_count = 0

# One table entry per port. The session id is the current wall-clock time
# (HHMMSS.microseconds) followed by a dash and a random integer in [0, 100].
for port in range(start_port, end_port + 1):
  timestamp = datetime.datetime.now().strftime("%H%M%S.%f")
  someid = timestamp + "-" + str(random.randint(0, 100))
  clients = "localhost:" + str(port)
  token_table[someid] = clients

def cleanup():
  """Stop every spawned streamer and revoke every registered token.

  Takes no arguments and returns nothing; it operates on the module-level
  `processes`, `token_table` and `conn`. The script registers it with atexit.
  """
  for p in processes:
    # subprocess.Popen has no term() method; terminate() is the real call.
    p.terminate()
  # Only the session ids are needed to revoke, so iterate the keys.
  for sid in token_table:
    # Read the body so the shared connection is free for the next request.
    revoke_token(conn, sid).read()

def grouper(n, iterable, fillvalue=None):
  """Yield consecutive n-tuples drawn from `iterable`.

  The last tuple is padded with `fillvalue` when the input runs out early,
  e.g. grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx
  """
  # The same iterator object is handed to zip_longest n times, so each output
  # tuple consumes n successive items from it.
  source = iter(iterable)
  return zip_longest(*((source,) * n), fillvalue=fillvalue)

def add_token(conn, someid, clients):
  """POST an add-token request for session `someid` and return the response.

  `clients` is placed in the udp-clients query parameter. The response is
  returned unread; the caller is expected to consume it.
  """
  query = "sessionid={}&type=http,rtp-udp&udp-clients={}".format(someid, clients)
  conn.request("POST", "/add-token?" + query)
  response = conn.getresponse()
  return response

def list_tokens(conn):
  """Send GET /list-tokens over `conn` and return the (unread) response."""
  method, path = "GET", "/list-tokens"
  conn.request(method, path)
  response = conn.getresponse()
  return response

def revoke_token(conn, someid):
  """Send a DELETE revoke-token request for session `someid`.

  Returns the response object from `conn` without reading it.
  """
  path = "/revoke-token?sessionid={}".format(someid)
  conn.request("DELETE", path)
  response = conn.getresponse()
  return response

def list_sessionids(conn):
  """Return the 'sessionid' of every entry in the proxy's token listing.

  Sends the list-tokens request over `conn` and parses the response body as
  JSON. Assumes the body is a UTF-8 JSON array of objects that each carry a
  'sessionid' key, as the indexing below requires.
  """
  # json.loads() needs the body text; the response object itself is not
  # parseable and made the original call raise TypeError.
  body = list_tokens(conn).read()
  entries = json.loads(body.decode("utf-8"))
  return [entry['sessionid'] for entry in entries]

def list_udp_clients(conn):
  """Return the 'udp-clients' value of every entry in the token listing.

  Sends the list-tokens request over `conn` and parses the response body as
  JSON. Assumes the body is a UTF-8 JSON array of objects that each carry a
  'udp-clients' key, as the indexing below requires.
  """
  # json.loads() needs the body text; the response object itself is not
  # parseable and made the original call raise TypeError.
  body = list_tokens(conn).read()
  entries = json.loads(body.decode("utf-8"))
  return [entry['udp-clients'] for entry in entries]

def generate_souphttpclientsink(sid, clients):
  """Build the gst-launch fragment for one souphttpclientsink branch.

  The fragment hangs off the tee named "s" and points the sink at
  http://PROXY_HOST/<sid> with `clients` as the udp-clients query parameter.
  The returned string ends with a space so fragments can be concatenated.
  """
  location = "http://{}/{}?type=http,rtp-udp&udp-clients={}".format(
      PROXY_HOST, sid, clients)
  return "s. ! queue ! souphttpclientsink location=\"" + location + "\" "

# Single connection to the proxy, shared by all token API calls (cleanup()
# reads this module-level name as well).
conn = http.client.HTTPConnection(PROXY_HOST)
# Run cleanup() when the interpreter exits.
atexit.register(cleanup)

# Register every generated token with the proxy, reading each response so the
# connection is ready for the next request, then request the listing once.
for sid, clients in token_table.items():
  add_token(conn, sid, clients).read()
list_tokens(conn).read()

# Start one gst-launch process per batch of up to 10 tokens.
for group in grouper(10, token_table.items(), fillvalue=(None, None)):
  # The final batch is padded with (None, None) pairs; leave those out.
  sinks = [generate_souphttpclientsink(sid, clients)
           for sid, clients in group
           if sid and clients]
  stream_count += len(sinks)
  streamer = GST_LAUNCH_BASE + " ".join(sinks)
  print(streamer)
  processes.append(subprocess.Popen(streamer, shell=True))
  print("Currently streaming {} HTTP streams".format(stream_count))
  # Pause between launches.
  time.sleep(2)

# Block until every streamer process has exited.
for p in processes:
  p.wait()