Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def gettmts(tvid, vid):
    """Fetch the ``/tmts`` resource for a video and return the parsed JSON.

    The request goes to ``https://cache.m.iqiyi.com`` and carries three query
    parameters: a fixed ``src`` identifier, the current time in milliseconds
    (``t``) and a signature (``sc``).

    Args:
        tvid: First path component of the request URL.
        vid: Second path component; also part of the signed string, so it
            must be a ``str`` (it is concatenated with other strings).

    Returns:
        Whatever ``json.loads`` produces from the response body.
    """
    timestamp_ms = int(time.time() * 1000)
    secret = 'd5fb4bd9d50c4be6948c97edd7254b0e'
    # Signature input is timestamp + secret + vid, in that order.
    # NOTE(review): md5() is a project helper; presumably returns a hex
    # digest string -- confirm against its definition.
    signature = md5(str(timestamp_ms) + secret + vid)
    query = urlencode({
        'src': '76f90cbd92f94a2e925d83e8ccd22cb7',
        'sc': signature,
        't': timestamp_ms
    })
    path = '/tmts/{}/{}/?{}'.format(tvid, vid, query)
    endpoint = '{}{}'.format('https://cache.m.iqiyi.com', path)
    return json.loads(get_content(endpoint))
'ppt': 0,
'locale': 'zh_cn',
'prio': '{"ff":"f4v","code":2}',
'k_err_retries': 0,
'up': '',
'qd_v': 2,
'tm': tm,
'qdy': 'a',
'qds': 0,
'ut': 0, # 600 bid isn't available
# relative to encode
#'k_ft1': ,
#'k_ft4': ,
#'k_ft5': ,
}
src = '/dash?{}'.format(urlencode(params))
vf = cmd5x(src)
req_url = '{}{}&vf={}'.format(host, src, vf)
html = get_content(req_url)
return json.loads(html)
def get_live_info(q=0):
params = {
'player': 1,
'cid': self.vid,
'platform': 'html5',
'quality': q,
'otype': 'json'
}
data = json.loads(get_content(api_url + urlencode(params)))
assert data['code'] == 0, data['message']
data = data['data']
urls = [random.choice(data['durl'])['url']]
qlt = data['current_quality']
aqlts = [int(x) for x in data['accept_quality']]
size = float('inf')
ext = 'flv'
prf = self.supported_stream_profile[4 - qlt]
st = self.profile_2_type[prf]
if urls and st not in info.streams:
info.stream_types.append(st)
info.streams[st] = {
'container': ext,
'video_profile': prf,
def getvps(tvid, vid):
    """Fetch the ``/vps`` resource for a video and return the parsed JSON.

    Builds a query string from the video identifiers, the current time in
    milliseconds and a value from ``get_macid()``, then appends a ``vf``
    parameter computed by ``md5x`` over the path-plus-query string before
    requesting it from ``http://cache.video.qiyi.com``.

    Args:
        tvid: Sent as the ``tvid`` parameter and used as the prefix of
            ``qypid``.
        vid: Sent as the ``vid`` parameter.

    Returns:
        Whatever ``json.loads`` produces from the response body.
    """
    now_ms = int(time.time() * 1000)
    # Key order matters: the encoded query string is the input to md5x().
    query = urlencode({
        'tvid': tvid,
        'vid': vid,
        'v': 0,
        'qypid': '{}_12'.format(tvid),
        'src': '01012001010000000000',
        't': now_ms,
        'k_tag': 1,
        'k_uid': get_macid(),
        'rs': 1,
    })
    path = '/vps?{}'.format(query)
    # NOTE(review): md5x() is a project helper; presumably the server checks
    # vf against the same path string -- confirm.
    signature = md5x(path)
    endpoint = '{}{}&vf={}'.format('http://cache.video.qiyi.com', path, signature)
    return json.loads(get_content(endpoint))
size = sum(map(int, data['clipsBytes']))
urls = []
assert len(data['clipsURL']) == len(data['clipsBytes']) == len(data['su'])
for new, ck, in zip(data['su'], data['ck']):
params = {
'ch': data['ch'],
'num': data['num'],
'new': new,
'key': ck,
'uid': uid,
'prod': 'h5n',
'pt': 1,
'pg': 2,
}
if urlparse(new).netloc == '':
cdnurl = 'https://{}/ip?{}'.format(host, urlencode(params))
url = json.loads(get_content(cdnurl))['servers'][0]['url']
else:
url = new
urls.append(url)
video.streams[stream_id] = {
'container': 'mp4',
'video_profile': stream_profile,
'src': urls,
'size' : size
}
video.stream_types.append(stream_id)
def get_live_info(rate=0):
params['rate'] = rate
data = urlencode(params)
if not isinstance(data, bytes):
data = data.encode()
html_content = get_content('https://www.douyu.com/lapi/live/getH5Play/{}'.format(self.vid), data=data)
self.logger.debug(html_content)
live_data = json.loads(html_content)
if live_data['error']:
return live_data['msg']
live_data = live_data["data"]
real_url = '{}/{}'.format(live_data['rtmp_url'], live_data['rtmp_live'])
rate_2_profile = dict((rate['rate'], rate['name']) for rate in live_data['multirates'])
video_profile = rate_2_profile[live_data['rate']]
stream = self.profile_2_id[video_profile]
if stream in info.streams:
return
host = 'https://live.video.iqiyi.com'
params = {
'lp': vid,
'src': '01010031010000000000',
'uid': '',
'rateVers': 'PC_QIYI_3',
'k_uid': get_macid(24),
'qdx': 'n',
'qdv': 3,
'qd_v': 1,
'dfp': get_random_str(66),
'v': 1,
'k_err_retries': 0,
'tm': int(tm + 1),
}
src = '/live?{}'.format(urlencode(params))
vf = cmd5x(src)
req_url = '{}{}&vf={}'.format(host, src, vf)
st = int(tm * 1000)
et = int((tm + 1296000) * 1000)
c_dfp = '__dfp={}@{}@{}'.format(params['dfp'], et, st)
add_header('Cookie', c_dfp)
html = get_content(req_url)
return json.loads(html)
self.vid = match1(html, 'data-vid="(\d+)')
title = match1(html, '<h1 class="video-title">(.+?)</h1>')
info.artist = artist = match1(html, '<div class="video-author">[\s\S]+?<h3>(.+?)</h3>')
info.title = u'{} - {}'.format(title, artist)
t1 = int(time.time() * 1000)
t2 = t1 + random.randrange(5, 10)
rnd = str(random.random()).replace('.', '')
params = {
'callback': 'jQuery1124{}_{}'.format(rnd, t1),
'r': 'vhuyaplay/video',
'vid': self.vid,
'format': 'mp4,m3u8',
'_': t2
}
api_url = 'https://v-api-player-ssl.huya.com/?' + urlencode(params)
data = get_content(api_url)[len(params['callback']) + 1:-1]
self.logger.debug('data:\n%s', data)
data = json.loads(data)
assert data['code'] == 1, data['message']
data = data['result']['items']
for stream_date in data:
ext = stream_date['format']
quality = min(int(q) for q in (stream_date['height'], stream_date['width']))
stream = self.quality_2_id[quality]
if stream not in info.stream_types:
info.stream_types.append(stream)
elif ext == 'm3u8':
# prefer mp4
continue
def prepare(self):
info = VideoInfo(self.name)
self.vid = match1(self.url, 'videoCenterId=(\w+)')
if self.url and not self.vid:
content = get_content(self.url)
self.vid = match1(content, 'guid = "([^"]+)', '"videoCenterId","([^"]+)')
assert self.vid, 'cant find vid'
params = {
'pid': self.vid,
'tsp': int(time.time()),
'vn': 2054,
'pcv': 152438790
}
data = get_content('http://vdn.apps.cntv.cn/api/getHttpVideoInfo.do?' + urlencode(params))
self.logger.debug('data> ' + data)
data = json.loads(data)
video_data = data['video']
info.title = u'{} - {}'.format(data['title'], data['play_channel'])
for chapters, stream_type, profile in self.supported_chapters:
stream_data = video_data.get(chapters)
if stream_data:
urls = []
for v in stream_data:
urls.append(v['url'])
info.stream_types.append(stream_type)
info.streams[stream_type] = {
'container': 'mp4',
'video_profile': profile,