tvbox/py/adult/javxbb.py

214 lines
9.5 KiB
Python
Raw Permalink Normal View History

# -*- coding: utf-8 -*-
#author 🍑
import json
import re
import os
import sys
import requests
from requests.exceptions import RequestException
try:
from pyquery import PyQuery as pq
except Exception:
pq = None
from base.spider import Spider
class Spider(Spider):
name = 'Javbobo'
host = 'https://javbobo.com'
def init(self, extend=""):
try:
self.extend = json.loads(extend) if extend else {}
except Exception:
self.extend = {}
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:142.0) Gecko/20100101 Firefox/142.0',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Referer': f'{self.host}/',
'Origin': self.host,
'Connection': 'keep-alive',
}
self.session = requests.Session()
self.session.headers.update(self.headers)
def getName(self):
return self.name
def isVideoFormat(self, url):
return any(ext in (url or '') for ext in ['.m3u8', '.mp4', '.ts'])
def manualVideoCheck(self):
return False
def destroy(self):
pass
def homeContent(self, filter):
result = {}
try:
cateManual = [
{'type_name': '日本有碼', 'type_id': '47'},
{'type_name': '日本無碼', 'type_id': '48'},
{'type_name': '國產AV', 'type_id': '49'},
{'type_name': '網紅主播', 'type_id': '50'},
]
result['class'] = cateManual
result['filters'] = {}
except Exception:
pass
return result
def homeVideoContent(self):
return self.categoryContent('', '1', False, {})
def categoryContent(self, tid, pg, filter, extend):
pg = str(pg)
result = {'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 999999, 'list': []}
try:
url = self.host
if tid:
if str(tid).startswith('http'):
url = str(tid)
if pg != '1': url = f"{url}{'&' if '?' in url else '?'}page={pg}"
elif str(tid).startswith('/'):
url = f"{self.host}{tid}"
if pg != '1': url = f"{url}{'&' if '?' in url else '?'}page={pg}"
else:
url = f"{self.host}/vod/index.html?type_id={tid}"
if pg != '1': url = f"{self.host}/vod/index.html?page={pg}&type_id={tid}"
resp = self.session.get(url, timeout=30)
resp.raise_for_status()
if pq is None: raise RuntimeError('PyQuery 未安装,无法解析列表页面')
doc = pq(resp.text)
def _parse_list(doc):
vlist = []
seen = set()
for a in doc('a[href*="/vod/player.html"]').items():
href = a.attr('href') or ''
if not href: continue
full = href if href.startswith('http') else f"{self.host}{href}"
m = re.search(r'[?&]id=(\d+)', full)
if not m: continue
vid = m.group(1)
if vid in seen: continue
seen.add(vid)
img_el = a('img')
title = img_el.attr('alt') or a.attr('title') or (a.text() or '').strip()
if not title:
li = a.parents('li').eq(0)
title = li.find('h1,h2,h3').text().strip() if li else ''
if not title: title = f"视频{vid}"
img = img_el.attr('src') or img_el.attr('data-src') or ''
if img and not img.startswith('http'): img = f"{self.host}{img}"
vlist.append({
'vod_id': full, 'vod_name': title, 'vod_pic': img, 'vod_remarks': '',
'style': {'ratio': 1.33, 'type': 'rect'}
})
if len(vlist) >= 90: break
return vlist
result['list'] = _parse_list(doc)
page_numbers = []
for a in doc('a[href*="/vod/index.html?page="]').items():
t = (a.text() or '').strip()
if t.isdigit(): page_numbers.append(int(t))
if page_numbers: result['pagecount'] = max(page_numbers)
except Exception:
result['list'] = []
return result
def detailContent(self, ids):
try:
url = ids[0] if isinstance(ids, list) else str(ids)
if not url: return {'list': []}
if not url.startswith('http'): url = f"{self.host}/vod/player.html?id={url}"
resp = self.session.get(url, timeout=30)
resp.raise_for_status()
html = resp.text
if pq is None: raise RuntimeError('PyQuery 未安装,无法解析详情页面')
doc = pq(html)
title = doc('meta[property="og:title"]').attr('content') or doc('h1').text().strip() or 'Javbobo 视频'
vod_pic = doc('meta[property="og:image"]').attr('content') or ''
if not vod_pic:
img_el = doc('img').eq(0)
vod_pic = img_el.attr('src') or img_el.attr('data-src') or ''
if vod_pic and not vod_pic.startswith('http'): vod_pic = f"{self.host}{vod_pic}"
line_id = None
m = re.search(r"lineId\s*=\s*Number\('?(\d+)'?\)", html)
if m: line_id = m.group(1)
if not line_id:
m = re.search(r"var\s+Iyplayer\s*=\s*\{[^}]*id:(\d+)", html)
if m: line_id = m.group(1)
play_id = line_id or url
vod = {
'vod_name': title, 'vod_pic': vod_pic, 'vod_content': '',
'vod_play_from': 'Javbobo', 'vod_play_url': f'正片${play_id}'
}
return {'list': [vod]}
except Exception:
return {'list': []}
def searchContent(self, key, quick, pg="1"):
try:
params = {'wd': key}
url = f"{self.host}/index.html"
resp = self.session.get(url, params=params, timeout=30)
resp.raise_for_status()
if pq is None: raise RuntimeError('PyQuery 未安装,无法解析搜索页面')
doc = pq(resp.text)
vlist = []
seen = set()
for a in doc('a[href*="/vod/player.html"]').items():
href = a.attr('href') or ''
if not href: continue
full = href if href.startswith('http') else f"{self.host}{href}"
m = re.search(r'[?&]id=(\d+)', full)
if not m: continue
vid = m.group(1)
if vid in seen: continue
seen.add(vid)
img_el = a('img')
title = img_el.attr('alt') or a.attr('title') or (a.text() or '').strip()
img = img_el.attr('src') or img_el.attr('data-src') or ''
if img and not img.startswith('http'): img = f"{self.host}{img}"
vlist.append({
'vod_id': full, 'vod_name': title or f'视频{vid}', 'vod_pic': img,
'vod_remarks': '', 'style': {'ratio': 1.33, 'type': 'rect'}
})
if len(vlist) >= 60: break
return {'list': vlist, 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 999999}
except Exception:
return {'list': []}
def playerContent(self, flag, id, vipFlags):
try:
line_id = None
sid = str(id or '')
if re.fullmatch(r'\d+', sid):
line_id = sid
elif sid.startswith('http'):
if self.isVideoFormat(sid):
headers = {'User-Agent': self.headers['User-Agent'], 'Referer': f'{self.host}/'}
return {'parse': 0, 'url': sid, 'header': headers}
html = self.session.get(sid, timeout=30).text
m = re.search(r"lineId\s*=\s*Number\('?(\d+)'?\)", html)
if m: line_id = m.group(1)
if not line_id:
m = re.search(r"var\s+Iyplayer\s*=\s*\{[^}]*id:(\d+)", html)
if m: line_id = m.group(1)
else:
if sid.startswith('/'): page_url = f"{self.host}{sid}"
else: page_url = f"{self.host}/vod/player.html?id={sid}"
html = self.session.get(page_url, timeout=30).text
m = re.search(r"lineId\s*=\s*Number\('?(\d+)'?\)", html)
if m: line_id = m.group(1)
if not line_id:
m = re.search(r"var\s+Iyplayer\s*=\s*\{[^}]*id:(\d+)", html)
if m: line_id = m.group(1)
if not line_id: raise ValueError('未能获取到播放线路ID(lineId)')
api = f"{self.host}/openapi/playline/{line_id}"
r = self.session.get(api, timeout=30)
txt = r.text.strip()
j = None
try: j = r.json()
except Exception: j = None
if isinstance(j, str):
try: j = json.loads(j)
except Exception: j = None
if not isinstance(j, dict):
try: j = json.loads(txt)
except Exception: j = {}
m3u8_url = ''
if isinstance(j, dict): m3u8_url = j.get('info', {}).get('file') or j.get('file') or ''
headers = {'User-Agent': self.headers['User-Agent'], 'Referer': f'{self.host}/'}
return {'parse': 0, 'url': m3u8_url, 'header': headers}
except Exception:
return {'parse': 0, 'url': '', 'header': {}}