# tvbox/PyramidStore/plugin/adult/黑料弹幕版.py
# -*- coding: utf-8 -*-
import json, re, sys, base64, html, requests, threading, time, random, colorsys
from Crypto.Cipher import AES
from pyquery import PyQuery as pq
from urllib.parse import quote, unquote
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
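    # Spider for heiliao.com ("黑料不打烊"): listing and search pages are scraped
    # with PyQuery, cover images arrive AES-CBC encrypted and are decrypted on
    # the fly via localProxy, and article comments are turned into a danmaku track.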
SELECTORS = ['.video-item', '.video-list .item', '.list-item', '.post-item']
    def init(self, extend='{}'):
        """Initialise from the extend config (optional requests proxy support)."""
        try:
            config = json.loads(extend) if extend else {}
        except Exception:
            config = {}
        self.proxies = config.get('proxy', {})  # e.g. {"http": "http://127.0.0.1:7890", "https": "http://127.0.0.1:7890"}
        self.plp = config.get('plp', '')
def getName(self):
return "黑料不打烊"
def homeContent(self, filter):
cateManual = {
"最新黑料": "hlcg", "今日热瓜": "jrrs", "每日TOP10": "mrrb", "反差女友": "fczq",
"校园黑料": "xycg", "网红黑料": "whhl", "明星丑闻": "mxcw", "原创社区": "ycsq",
"推特社区": "ttsq", "社会新闻": "shxw", "官场爆料": "gchl", "影视短剧": "ysdj",
"全球奇闻": "qqqw", "黑料课堂": "hlkt", "每日大赛": "mrds", "激情小说": "jqxs",
"桃图杂志": "ttzz", "深夜综艺": "syzy", "独家爆料": "djbl"
}
return {'class': [{'type_name': k, 'type_id': v} for k, v in cateManual.items()]}
def homeVideoContent(self):
return {}
def categoryContent(self, tid, pg, filter, extend):
url = f'https://heiliao.com/{tid}/' if int(pg) == 1 else f'https://heiliao.com/{tid}/page/{pg}/'
videos = self.get_list(url)
return {'list': videos, 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 999999}
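    # Covers on the hosts matched by _should_decrypt() are served as AES-128-CBC
    # ciphertext; the fixed key/IV below presumably mirror the site's player JS.
    # The decrypted bytes keep their trailing padding, which image decoders
    # generally tolerate.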
def fetch_and_decrypt_image(self, url):
try:
if url.startswith('//'):
url = 'https:' + url
elif url.startswith('/'):
url = 'https://heiliao.com' + url
r = requests.get(
url,
headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.7049.96 Safari/537.36',
'Referer': 'https://heiliao.com/'
},
timeout=15,
verify=False,
                proxies=self.proxies  # route through the configured proxy
)
if r.status_code != 200:
return b''
return AES.new(b'f5d965df75336270', AES.MODE_CBC, b'97b60394abc2fbe1').decrypt(r.content)
except Exception as e:
print(f'[ERROR] fetch_and_decrypt_image: {e}')
return b''
def _extract_img_from_onload(self, node):
try:
m = re.search(r"load(?:Share)?Img\s*\([^,]+,\s*['\"]([^'\"]+)['\"]", (node.attr('onload') or ''))
return m.group(1) if m else ''
except:
return ''
def _should_decrypt(self, url: str) -> bool:
u = (url or '').lower()
return any(x in u for x in ['pic.gylhaa.cn', 'new.slfpld.cn', '/upload_01/', '/upload/'])
def _abs(self, u: str) -> str:
if not u:
return ''
if u.startswith('//'):
return 'https:' + u
if u.startswith('/'):
return 'https://heiliao.com' + u
return u
def e64(self, s: str) -> str:
try:
return base64.b64encode((s or '').encode()).decode()
except:
return ''
def d64(self, s: str) -> str:
try:
return base64.b64decode((s or '').encode()).decode()
except:
return ''
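    # Build a cover URL: anything that needs decryption is wrapped in a
    # getProxyUrl() link with type=hlimg, so the player fetches it through
    # localProxy, which decrypts before serving.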
def _img(self, img_node):
u = '' if img_node is None else (img_node.attr('src') or img_node.attr('data-src') or '')
enc = '' if img_node is None else self._extract_img_from_onload(img_node)
t = enc or u
return f"{self.getProxyUrl()}&url={self.e64(t)}&type=hlimg" if t and (enc or self._should_decrypt(t)) else self._abs(t)
def _parse_items(self, root):
vids = []
for sel in self.SELECTORS:
for it in root(sel).items():
title = it.find('.title, h3, h4, .video-title').text()
if not title:
continue
link = it.find('a').attr('href')
if not link:
continue
vids.append({
'vod_id': self._abs(link),
'vod_name': title,
'vod_pic': self._img(it.find('img')),
'vod_remarks': it.find('.date, .time, .remarks, .duration').text() or ''
})
if vids:
break
return vids
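    # Play URLs are tagged "<article_id>_dm_<real_url>". playerContent splits on
    # "_dm_" and, for direct streams, rewrites the URL into a localProxy m3u8
    # link, which also triggers danmaku generation for that article.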
def detailContent(self, array):
tid = array[0]
url = tid if tid.startswith('http') else f'https://heiliao.com{tid}'
rsp = self.fetch(url)
if not rsp:
return {'list': []}
rsp.encoding = 'utf-8'
html_text = rsp.text
try:
root_text = pq(html_text)
except:
root_text = None
try:
root_content = pq(rsp.content)
except:
root_content = None
title = (root_text('title').text() if root_text else '') or ''
if ' - 黑料网' in title:
title = title.replace(' - 黑料网', '')
pic = ''
if root_text:
og = root_text('meta[property="og:image"]').attr('content')
if og and (og.endswith('.png') or og.endswith('.jpg') or og.endswith('.jpeg')):
pic = og
else:
pic = self._img(root_text('.video-item-img img'))
detail = ''
if root_text:
detail = root_text('meta[name="description"]').attr('content') or ''
if not detail:
detail = root_text('.content').text()[:200]
        play_from, play_url = [], []
        article_id = self._extract_article_id(tid)  # used to tag URLs for danmaku
if root_content:
for i, p in enumerate(root_content('.dplayer').items()):
c = p.attr('config')
if not c:
continue
                try:
                    # the dplayer config attribute is HTML-escaped JSON
                    u = (json.loads(html.unescape(c)).get('video', {}) or {}).get('url', '')
                except:
                    m = re.search(r'"url"\s*:\s*"([^"]+)"', c)
                    u = m.group(1) if m else ''
                if u:
                    u = self._abs(u.replace('\\/', '/'))
                    play_from.append(f'视频{i + 1}')
                    play_url.append(f"{article_id}_dm_{u}" if article_id else u)
if not play_url:
for pat in [
r'https://hls\.[^"\']+\.m3u8[^"\']*',
r'https://[^"\']+\.m3u8\?auth_key=[^"\']+',
r'//hls\.[^"\']+\.m3u8[^"\']*'
]:
                for u in re.findall(pat, html_text):
                    u = self._abs(u)
                    play_from.append(f'视频{len(play_from) + 1}')
                    play_url.append(f"{article_id}_dm_{u}" if article_id else u)
if len(play_url) >= 3:
break
if play_url:
break
if not play_url:
js_patterns = [
r'video[\s\S]{0,500}?url[\s"\'`:=]+([^"\'`\s]+)',
r'videoUrl[\s"\'`:=]+([^"\'`\s]+)',
r'src[\s"\'`:=]+([^"\'`\s]+\.m3u8[^"\'`\s]*)'
]
for pattern in js_patterns:
js_urls = re.findall(pattern, html_text)
                for js_url in js_urls:
                    if '.m3u8' in js_url:
                        js_url = self._abs(js_url)
                        if not js_url.startswith('http'):
                            js_url = 'https://' + js_url
                        play_from.append(f'视频{len(play_from) + 1}')
                        play_url.append(f"{article_id}_dm_{js_url}" if article_id else js_url)
                    if len(play_url) >= 3:
                        break
if play_url:
break
        if not play_url:
            # last-ditch fallback: a known sample stream so the page still plays
            example_url = "https://hls.obmoti.cn/videos5/b9699667fbbffcd464f8874395b91c81/b9699667fbbffcd464f8874395b91c81.m3u8"
            play_from.append('示例视频')
            play_url.append(f"{article_id}_dm_{example_url}" if article_id else example_url)
return {
'list': [{
'vod_id': tid,
'vod_name': title,
'vod_pic': pic,
'vod_content': detail,
'vod_play_from': '$$$'.join(play_from),
'vod_play_url': '$$$'.join(play_url)
}]
}
def searchContent(self, key, quick, pg="1"):
        rsp = self.fetch(f'https://heiliao.com/index/search?word={quote(key)}')
if not rsp:
return {'list': []}
return {'list': self._parse_items(pq(rsp.text))}
def playerContent(self, flag, id, vipFlags):
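        # Tagged ids ("<article_id>_dm_<url>"): direct media URLs (m3u8/mp4/...)
        # play with parse=0 through the localProxy m3u8 wrapper; anything else
        # is handed to the webview sniffer with parse=1.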
if '_dm_' in id:
aid, pid = id.split('_dm_', 1)
p = 0 if re.search(r'\.(m3u8|mp4|flv|ts|mkv|mov|avi|webm)', pid) else 1
if not p:
pid = f"{self.getProxyUrl()}&pdid={quote(id)}&type=m3u8"
return {
'parse': p,
'url': pid,
'header': {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/135.0.7049.96 Safari/537.36",
"Referer": "https://heiliao.com/"
}
}
else:
return {
"parse": 0,
"playUrl": "",
"url": id,
"header": {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.7049.96 Safari/537.36",
"Referer": "https://heiliao.com/"
}
}
def get_list(self, url):
rsp = self.fetch(url)
return [] if not rsp else self._parse_items(pq(rsp.text))
def fetch(self, url, params=None, cookies=None, headers=None, timeout=5, verify=True,
stream=False, allow_redirects=True):
h = headers or {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.7049.96 Safari/537.36",
"Referer": "https://heiliao.com/"
}
try:
return requests.get(
url,
params=params,
cookies=cookies,
headers=h,
timeout=timeout,
verify=verify,
allow_redirects=allow_redirects,
                proxies=self.proxies  # proxy applies to every fetch
)
except Exception as e:
print(f"[ERROR] fetch: {e}")
return None
    # --------------------------- localProxy & danmaku --------------------------- #
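    # localProxy dispatches on the 'type' query parameter:
    #   hlimg -> decrypt an AES-encrypted cover image
    #   m3u8  -> proxy a playlist, summing #EXTINF durations for the danmaku track
    #   hlxdm -> build danmaku XML from the article's comments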
def localProxy(self, param):
try:
xtype = param.get('type', '')
if xtype == 'hlimg':
url = self.d64(param.get('url'))
if url.startswith('//'):
url = 'https:' + url
elif url.startswith('/'):
url = 'https://heiliao.com' + url
r = requests.get(
url,
headers={"User-Agent": "Mozilla/5.0", "Referer": "https://heiliao.com/"},
timeout=15,
verify=False,
proxies=self.proxies
)
if r.status_code != 200:
return [404, 'text/plain', '']
b = AES.new(b'f5d965df75336270', AES.MODE_CBC, b'97b60394abc2fbe1').decrypt(r.content)
ct = 'image/jpeg'
if b.startswith(b'\x89PNG'):
ct = 'image/png'
elif b.startswith(b'GIF8'):
ct = 'image/gif'
return [200, ct, b]
elif xtype == 'm3u8':
path, url = unquote(param['pdid']).split('_dm_', 1)
data = requests.get(
url,
headers={"User-Agent": "Mozilla/5.0", "Referer": "https://heiliao.com/"},
timeout=10,
proxies=self.proxies
).text
                lines = data.strip().split('\n')
                times = 0.0
                for i in lines:
                    if i.startswith('#EXTINF:'):
                        # '#EXTINF:<duration>,[<title>]' -> keep only the duration field
                        times += float(i.split(':', 1)[1].split(',')[0])
thread = threading.Thread(target=self.some_background_task, args=(path, int(times)))
thread.start()
                print('[INFO] video duration parsed:', times)
return [200, 'text/plain', data]
elif xtype == 'hlxdm':
article_id = param.get('path', '')
times = int(param.get('times', 0))
comments = self._fetch_heiliao_comments(article_id)
return self._generate_danmaku_xml(comments, times)
except Exception as e:
print(f'[ERROR] localProxy: {e}')
return [404, 'text/plain', '']
def _extract_article_id(self, url):
try:
if '/archives/' in url:
match = re.search(r'/archives/(\d+)/?', url)
return match.group(1) if match else None
return None
except:
return None
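    # Comments come from the site's paginated JSON endpoint
    # /comments/1/<article_id>/<page>.json; top-level comments plus one level of
    # replies are collected, capped below at 50 short entries.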
def _fetch_heiliao_comments(self, article_id, max_pages=3):
comments = []
try:
for page in range(1, max_pages + 1):
url = f"https://heiliao.com/comments/1/{article_id}/{page}.json"
resp = requests.get(url, headers={"User-Agent": "Mozilla/5.0"}, timeout=10, proxies=self.proxies)
if resp.status_code == 200:
data = resp.json()
if 'data' in data and 'list' in data['data'] and data['data']['list']:
for comment in data['data']['list']:
text = comment.get('content', '').strip()
if text and len(text) <= 100:
comments.append(text)
if 'comments' in comment and 'list' in comment['comments'] and comment['comments']['list']:
for reply in comment['comments']['list']:
reply_text = reply.get('content', '').strip()
if reply_text and len(reply_text) <= 100:
comments.append(reply_text)
if not data['data'].get('next', False):
break
else:
break
else:
break
except Exception as e:
print(f'[ERROR] _fetch_heiliao_comments: {e}')
return comments[:50]
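    # Emits Bilibili-style danmaku XML; each <d> element carries
    # p="<time>,<mode>,<fontsize>,<color>,0" (mode 1 = scrolling, 5 = pinned top).
    # Comments are spread evenly across the duration with random jitter.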
def _generate_danmaku_xml(self, comments, video_duration):
try:
total_comments = len(comments)
            tip = f'共有{total_comments}条弹幕来袭!!!'
            danmu_xml = '<?xml version="1.0" encoding="UTF-8"?>\n<i>\n'
            danmu_xml += '\t<chatserver>chat.heiliao.com</chatserver>\n\t<chatid>88888888</chatid>\n'
            danmu_xml += '\t<mission>0</mission>\n\t<maxlimit>99999</maxlimit>\n\t<state>0</state>\n'
            danmu_xml += '\t<real_name>0</real_name>\n\t<source>heiliao</source>\n'
            danmu_xml += f'\t<d p="0,5,25,16711680,0">{tip}</d>\n'
for i, comment in enumerate(comments):
base_time = (i / total_comments) * video_duration if total_comments > 0 else 0
dm_time = round(max(0, min(base_time + random.uniform(-3, 3), video_duration)), 1)
dm_color = self._get_danmaku_color()
dm_text = re.sub(r'[<>&\u0000\b]', '', comment)
danmu_xml += f'\t<d p="{dm_time},1,25,{dm_color},0">{dm_text}</d>\n'
danmu_xml += '</i>'
return [200, "text/xml", danmu_xml]
except Exception as e:
print(f'[ERROR] _generate_danmaku_xml: {e}')
return [500, 'text/html', '']
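    # 10% of danmaku get a random vivid colour via HSV; the rest stay white
    # (16777215 == 0xFFFFFF).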
def _get_danmaku_color(self):
if random.random() < 0.1:
h = random.random()
s = random.uniform(0.7, 1.0)
v = random.uniform(0.8, 1.0)
r, g, b = colorsys.hsv_to_rgb(h, s, v)
r = int(r * 255)
g = int(g * 255)
b = int(b * 255)
return str((r << 16) + (g << 8) + b)
else:
return '16777215'
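    # Pokes the player's local action endpoint (TVBox-style apps listen on
    # 127.0.0.1:9978) so the freshly built danmaku track is picked up.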
def some_background_task(self, article_id, video_duration):
try:
time.sleep(1)
danmaku_url = f"{self.getProxyUrl()}&path={quote(article_id)}&times={video_duration}&type=hlxdm"
self.fetch(f"http://127.0.0.1:9978/action?do=refresh&type=danmaku&path={quote(danmaku_url)}")
            print(f'[INFO] danmaku refreshed: {article_id}')
except Exception as e:
print(f'[ERROR] some_background_task: {e}')
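# Minimal local smoke test, assuming this file is run from the plugin directory
# so that base.spider resolves, and that heiliao.com is reachable. 'hlcg' is the
# "最新黑料" category id from homeContent above; getProxyUrl() comes from the
# base Spider, so cover-proxy links are only meaningful inside the player app.
if __name__ == '__main__':
    sp = Spider()
    sp.init('{}')
    res = sp.categoryContent('hlcg', '1', False, {})
    print(json.dumps(res, ensure_ascii=False)[:300])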