feat(PyramidStore): initialize the project and add base configuration files

Add .gitignore to ignore sub-repositories' .git directories
Add LICENSE file (GNU General Public License v3.0)
Add README.md with debugging examples, a disclaimer, and configuration notes
Add base/localProxy.py base proxy configuration file
Add version-controlled image files (binary diff)
commit 3572e29279 (2025-10-23 02:14:43 +08:00)
356 changed files with 120993 additions and 0 deletions

py/adult/4K数毛.py (new file, 349 lines)

@@ -0,0 +1,349 @@
import sys
sys.path.append('..')
import re
import requests
from bs4 import BeautifulSoup
from base.spider import Spider
xurl = "https://www.fullhd.xxx/zh/"
headerx = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.87 Safari/537.36'
}
pm = ''
class Spider(Spider):
global xurl
global headerx
def getName(self):
return "首页"
def init(self, extend):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def extract_middle_text(self, text, start_str, end_str, pl, start_index1: str = '', end_index2: str = ''):
if pl == 3:
plx = []
while True:
start_index = text.find(start_str)
if start_index == -1:
break
end_index = text.find(end_str, start_index + len(start_str))
if end_index == -1:
break
middle_text = text[start_index + len(start_str):end_index]
plx.append(middle_text)
text = text.replace(start_str + middle_text + end_str, '')
if len(plx) > 0:
purl = ''
for i in range(len(plx)):
matches = re.findall(start_index1, plx[i])
output = ""
for match in matches:
match3 = re.search(r'(?:^|[^0-9])(\d+)(?:[^0-9]|$)', match[1])
if match3:
number = match3.group(1)
else:
number = 0
if 'http' not in match[0]:
output += f"#{'📽️' + match[1]}${number}{xurl}{match[0]}"
else:
output += f"#{'📽️' + match[1]}${number}{match[0]}"
output = output[1:]
purl = purl + output + "$$$"
purl = purl[:-3]
return purl
else:
return ""
else:
start_index = text.find(start_str)
if start_index == -1:
return ""
end_index = text.find(end_str, start_index + len(start_str))
if end_index == -1:
return ""
if pl == 0:
middle_text = text[start_index + len(start_str):end_index]
return middle_text.replace("\\", "")
if pl == 1:
middle_text = text[start_index + len(start_str):end_index]
matches = re.findall(start_index1, middle_text)
if matches:
jg = ' '.join(matches)
return jg
if pl == 2:
middle_text = text[start_index + len(start_str):end_index]
matches = re.findall(start_index1, middle_text)
if matches:
new_list = [f'{item}' for item in matches]
jg = '$$$'.join(new_list)
return jg
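    # extract_middle_text modes: pl=0 returns the raw text between start_str and
    # end_str; pl=1 joins regex matches (start_index1) with spaces; pl=2 joins
    # them with '$$$'; pl=3 repeatedly extracts blocks and assembles a play list
    # ('#' between entries, '$$$' between blocks). end_index2 is accepted but unused.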
def homeContent(self, filter):
        result = {"class": [
{"type_id": "latest-updates", "type_name": "最新视频🌠"},
{"type_id": "top-rated", "type_name": "最佳视频🌠"},
{"type_id": "most-popular", "type_name": "热门影片🌠"},
{"type_id": "networks/brazzers-com", "type_name": "Brazzers🌠"},
{"type_id": "networks/tushy-com", "type_name": "Tushy🌠"},
{"type_id": "networks/naughtyamerica-com", "type_name": "Naughtyamerica🌠"},
{"type_id": "sites/sexmex", "type_name": "Sexmex🌠"},
{"type_id": "sites/passion-hd", "type_name": "Passion-HD🌠"},
{"type_id": "categories/animation", "type_name": "Animation🌠"},
{"type_id": "categories/18-years-old", "type_name": "Teen🌠"},
{"type_id": "categories/pawg", "type_name": "Pawg🌠"},
{"type_id": "categories/thong", "type_name": "Thong🌠"},
{"type_id": "categories/stockings", "type_name": "Stockings🌠"},
{"type_id": "categories/jav-uncensored", "type_name": "JAV🌠"},
{"type_id": "categories/pantyhose", "type_name": "Pantyhose🌠"}
],
}
return result
def homeVideoContent(self):
videos = []
try:
detail = requests.get(url=xurl, headers=headerx)
detail.encoding = "utf-8"
res = detail.text
doc = BeautifulSoup(res, "lxml")
            # Parse the "watched right now" section on the homepage (the element
            # id is fixed, so it is queried once rather than per section name).
            section = doc.find('div', id="list_videos_videos_watched_right_now_items")
            if section:
                vods = section.find_all('div', class_="item")
                for vod in vods:
                    names = vod.find_all('a')
                    name = names[0]['title'] if names and 'title' in names[0].attrs else ""
ids = vod.find_all('a')
id = ids[0]['href'] if ids else ""
pics = vod.find('img', class_="lazyload")
pic = pics['data-src'] if pics and 'data-src' in pics.attrs else ""
if pic and 'http' not in pic:
pic = xurl + pic
remarks = vod.find('span', class_="duration")
remark = remarks.text.strip() if remarks else ""
video = {
"vod_id": id,
"vod_name": name,
"vod_pic": pic,
"vod_remarks": remark
}
videos.append(video)
result = {'list': videos}
return result
except Exception as e:
print(f"Error in homeVideoContent: {str(e)}")
return {'list': []}
def categoryContent(self, cid, pg, filter, ext):
result = {}
videos = []
try:
            if pg and int(pg) > 1:
                url = f'{xurl}{cid}/{pg}/'
            else:
                url = f'{xurl}{cid}/'
detail = requests.get(url=url, headers=headerx)
detail.encoding = "utf-8"
res = detail.text
doc = BeautifulSoup(res, "lxml")
section = doc.find('div', class_="list-videos")
if section:
vods = section.find_all('div', class_="item")
for vod in vods:
names = vod.find_all('a')
name = names[0]['title'] if names and 'title' in names[0].attrs else ""
ids = vod.find_all('a')
id = ids[0]['href'] if ids else ""
pics = vod.find('img', class_="lazyload")
pic = pics['data-src'] if pics and 'data-src' in pics.attrs else ""
if pic and 'http' not in pic:
pic = xurl + pic
remarks = vod.find('span', class_="duration")
remark = remarks.text.strip() if remarks else ""
video = {
"vod_id": id,
"vod_name": name,
"vod_pic": pic,
"vod_remarks": remark
}
videos.append(video)
except Exception as e:
print(f"Error in categoryContent: {str(e)}")
result = {
'list': videos,
'page': pg,
'pagecount': 9999,
'limit': 90,
'total': 999999
}
return result
def detailContent(self, ids):
global pm
did = ids[0]
result = {}
videos = []
playurl = ''
if 'http' not in did:
did = xurl + did
res1 = requests.get(url=did, headers=headerx)
res1.encoding = "utf-8"
res = res1.text
content = '👉' + self.extract_middle_text(res,'<h1>','</h1>', 0)
yanuan = self.extract_middle_text(res, '<span>Pornstars:</span>','</div>',1, 'href=".*?">(.*?)</a>')
bofang = did
videos.append({
"vod_id": did,
"vod_actor": yanuan,
"vod_director": '',
"vod_content": content,
"vod_play_from": '💗FullHD💗',
"vod_play_url": bofang
})
result['list'] = videos
return result
def playerContent(self, flag, id, vipFlags):
        parts = id.split("http")
        result = {}
        if len(parts) > 1:
            page_url = 'http' + parts[1]
            res = requests.get(url=page_url, headers=headerx)
            res = res.text
            url2 = self.extract_middle_text(res, '<video', '</video>', 0).replace('\\', '')
            soup = BeautifulSoup(url2, 'html.parser')
            first_source = soup.find('source')
            src_value = first_source.get('src')
            # Follow up to two 302 redirects manually to reach the final media URL.
            redirect_url = src_value
            response = requests.head(redirect_url, allow_redirects=False)
            if response.status_code == 302:
                redirect_url = response.headers['Location']
                response = requests.head(redirect_url, allow_redirects=False)
                if response.status_code == 302:
                    redirect_url = response.headers['Location']
            result["parse"] = 0
            result["playUrl"] = ''
            result["url"] = redirect_url
            result["header"] = headerx
        return result
def searchContentPage(self, key, quick, page):
result = {}
videos = []
if not page:
page = '1'
        if page == '1':
            url = f'{xurl}search/{key}/'
        else:
            url = f'{xurl}search/{key}/{str(page)}/'
try:
detail = requests.get(url=url, headers=headerx)
detail.encoding = "utf-8"
res = detail.text
doc = BeautifulSoup(res, "lxml")
section = doc.find('div', class_="list-videos")
if section:
vods = section.find_all('div', class_="item")
for vod in vods:
names = vod.find_all('a')
name = names[0]['title'] if names and 'title' in names[0].attrs else ""
ids = vod.find_all('a')
id = ids[0]['href'] if ids else ""
pics = vod.find('img', class_="lazyload")
pic = pics['data-src'] if pics and 'data-src' in pics.attrs else ""
if pic and 'http' not in pic:
pic = xurl + pic
remarks = vod.find('span', class_="duration")
remark = remarks.text.strip() if remarks else ""
video = {
"vod_id": id,
"vod_name": name,
"vod_pic": pic,
"vod_remarks": remark
}
videos.append(video)
except Exception as e:
print(f"Error in searchContentPage: {str(e)}")
result = {
'list': videos,
'page': page,
'pagecount': 9999,
'limit': 90,
'total': 999999
}
return result
def searchContent(self, key, quick):
return self.searchContentPage(key, quick, '1')
def localProxy(self, params):
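        # Dispatch to the m3u8/media/ts proxy helpers, which are assumed to be
        # provided by the base Spider class (they are not defined in this file).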
if params['type'] == "m3u8":
return self.proxyM3u8(params)
elif params['type'] == "media":
return self.proxyMedia(params)
elif params['type'] == "ts":
return self.proxyTs(params)
return None
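A minimal sketch of the manual redirect handling used in playerContent above: keep issuing HEAD requests while the server answers 302 and return the last Location. The resolve_302 helper and the two-hop limit are illustrative, not part of the spider.

```python
import requests

def resolve_302(url, headers, max_hops=2):
    # Follow up to max_hops manual 302 redirects and return the final URL.
    for _ in range(max_hops):
        resp = requests.head(url, headers=headers, allow_redirects=False)
        if resp.status_code != 302:
            break
        url = resp.headers['Location']
    return url
```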

py/adult/911.py (new file, 435 lines)

@@ -0,0 +1,435 @@
# -*- coding: utf-8 -*-
import json
import random
import re
import sys
import threading
import time
import requests
from base64 import b64decode, b64encode
from urllib.parse import urlparse, urljoin
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from bs4 import BeautifulSoup
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def init(self, extend="{}"):
config = json.loads(extend)
self.domin = config.get('site', "https://911blw.com")
self.proxies = config.get('proxy', {}) or {}
self.plp = config.get('plp', '')
self.backup_urls = ["https://hlj.fun", "https://911bl16.com"]
self.headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36',
'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="134", "Google Chrome";v="134"',
'Accept-Language': 'zh-CN,zh;q=0.9'
}
        # Pick the lowest-latency host
self.host = self.host_late([self.domin] + self.backup_urls)
self.headers.update({'Origin': self.host, 'Referer': f"{self.host}/"})
        # Cache the site's current host info
self.getcnh()
def getName(self):
return "911爆料网"
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
def homeContent(self, filter):
result = {}
classes = []
        # Category list, matching the site's actual sections
categories = [
{"type_id": "/category/jrgb/", "type_name": "最新爆料"},
{"type_id": "/category/rmgb/", "type_name": "精选大瓜"},
{"type_id": "/category/blqw/", "type_name": "猎奇吃瓜"},
{"type_id": "/category/rlph/", "type_name": "TOP5大瓜"},
{"type_id": "/category/ssdbl/", "type_name": "社会热点"},
{"type_id": "/category/hjsq/", "type_name": "海角社区"},
{"type_id": "/category/mrds/", "type_name": "每日大赛"},
{"type_id": "/category/xyss/", "type_name": "校园吃瓜"},
{"type_id": "/category/mxhl/", "type_name": "明星吃瓜"},
{"type_id": "/category/whbl/", "type_name": "网红爆料"},
{"type_id": "/category/bgzq/", "type_name": "反差爆料"},
{"type_id": "/category/fljq/", "type_name": "网黄福利"},
{"type_id": "/category/crfys/", "type_name": "午夜剧场"},
{"type_id": "/category/thjx/", "type_name": "探花经典"},
{"type_id": "/category/dmhv/", "type_name": "禁漫天堂"},
{"type_id": "/category/slec/", "type_name": "吃瓜精选"},
{"type_id": "/category/zksr/", "type_name": "重口调教"},
{"type_id": "/category/crlz/", "type_name": "精选连载"}
]
result['class'] = categories
        # Homepage featured content
html = self.fetch_page(f"{self.host}/")
if html:
soup = BeautifulSoup(html, 'html.parser')
articles = soup.select('article, .post-item, .article-item')
result['list'] = self.getlist(articles)
else:
result['list'] = []
return result
def homeVideoContent(self):
        # Homepage featured videos
html = self.fetch_page(f"{self.host}/category/jrgb/1/")
videos = self.extract_content(html, f"{self.host}/category/jrgb/1/")
return {'list': videos}
def categoryContent(self, tid, pg, filter, extend):
if '@folder' in tid:
            # Folder-type content
id = tid.replace('@folder', '')
videos = self.getfod(id)
else:
            # Regular category content
url = f"{self.host}{tid}{pg}/" if pg != "1" else f"{self.host}{tid}"
html = self.fetch_page(url)
if html:
soup = BeautifulSoup(html, 'html.parser')
articles = soup.select('article, .post-item, .article-item, ul.row li')
videos = self.getlist(articles, tid)
else:
videos = []
result = {}
result['list'] = videos
result['page'] = pg
result['pagecount'] = 1 if '@folder' in tid else 99999
result['limit'] = 90
result['total'] = 999999
return result
def detailContent(self, ids):
url = ids[0] if ids[0].startswith("http") else f"{self.host}{ids[0]}"
html = self.fetch_page(url)
if not html:
return {'list': []}
soup = BeautifulSoup(html, 'html.parser')
vod = {'vod_play_from': '911爆料网'}
try:
            # Extract tag info
clist = []
tags = soup.select('.tags .keywords a, .tagcloud a, a[rel="tag"]')
for tag in tags:
title = tag.get_text(strip=True)
href = tag.get('href', '')
if href and title:
clist.append('[a=cr:' + json.dumps({'id': href, 'name': title}) + '/]' + title + '[/a]')
vod['vod_content'] = '点击展开↓↓↓\n'+' '.join(clist) if clist else soup.select_one('.post-content, .entry-content').get_text(strip=True)[:200] + '...'
except:
title_elem = soup.select_one('h1, .post-title, .entry-title')
vod['vod_content'] = title_elem.get_text(strip=True) if title_elem else "无简介"
try:
            # Extract the play list (dplayer-based, similar to the 51吸瓜 site)
            plist = []
            # Method 1: dplayer data-config blocks
dplayers = soup.select('.dplayer, [data-config]')
for c, player in enumerate(dplayers, start=1):
config_str = player.get('data-config', '{}')
try:
config = json.loads(config_str)
if 'video' in config and 'url' in config['video']:
plist.append(f"视频{c}${config['video']['url']}")
except:
pass
            # Method 2: <video> tags
if not plist:
video_tags = soup.select('video source, video[src]')
for c, video in enumerate(video_tags, start=1):
src = video.get('src') or ''
if src:
plist.append(f"视频{c}${src}")
            # Method 3: iframes
if not plist:
iframes = soup.select('iframe[src]')
for c, iframe in enumerate(iframes, start=1):
src = iframe.get('src', '')
if src and ('player' in src or 'video' in src):
plist.append(f"视频{c}${src}")
            # Method 4: extract from inline scripts
if not plist:
scripts = soup.find_all('script')
for script in scripts:
if script.string:
                        # Look for m3u8/mp4/flv/ts/mkv video links
video_matches = re.findall(r'(https?://[^\s"\']*\.(?:m3u8|mp4|flv|ts|mkv)[^\s"\']*)', script.string)
for c, match in enumerate(video_matches, start=1):
plist.append(f"视频{c}${match}")
vod['vod_play_url'] = '#'.join(plist) if plist else f"请检查页面,可能没有视频${url}"
except Exception as e:
print(f"详情页解析错误: {e}")
vod['vod_play_url'] = f"解析错误${url}"
return {'list': [vod]}
def searchContent(self, key, quick, pg="1"):
url = f"{self.host}/search/{key}/{pg}/"
html = self.fetch_page(url)
if html:
soup = BeautifulSoup(html, 'html.parser')
articles = soup.select('article, .post-item, .article-item, ul.row li')
videos = self.getlist(articles)
else:
videos = []
return {'list': videos, 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 999999}
def playerContent(self, flag, id, vipFlags):
        # parse=0 for direct-play video formats, parse=1 otherwise
        p = 0 if re.search(r'\.(m3u8|mp4|flv|ts|mkv|mov|avi|webm)', id) else 1
return {'parse': p, 'url': f"{self.plp}{id}", 'header': self.headers}
def localProxy(self, param):
try:
url = self.d64(param['url'])
match = re.search(r"loadBannerDirect\('([^']*)'", url)
if match:
url = match.group(1)
res = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=10)
            # Decide by file type whether AES decryption is needed
            if url.endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp')):
                # Plain images are returned as-is
return [200, res.headers.get('Content-Type'), res.content]
else:
                # Encrypted content: AES-decrypt before returning
return [200, res.headers.get('Content-Type'), self.aesimg(res.content)]
except Exception as e:
print(f"图片代理错误: {str(e)}")
return [500, 'text/html', '']
def e64(self, text):
try:
text_bytes = text.encode('utf-8')
encoded_bytes = b64encode(text_bytes)
return encoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64编码错误: {str(e)}")
return ""
def d64(self, encoded_text):
try:
encoded_bytes = encoded_text.encode('utf-8')
decoded_bytes = b64decode(encoded_bytes)
return decoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64解码错误: {str(e)}")
return ""
def aesimg(self, word):
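        # AES-128-CBC with a fixed key/IV pair; the site serves obfuscated
        # images, so localProxy decrypts them before returning the bytes.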
key = b'f5d965df75336270'
iv = b'97b60394abc2fbe1'
cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted = unpad(cipher.decrypt(word), AES.block_size)
return decrypted
def fetch_page(self, url, use_backup=False):
original_url = url
if use_backup:
for backup in self.backup_urls:
test_url = url.replace(self.domin, backup)
try:
time.sleep(1)
res = requests.get(test_url, headers=self.headers, proxies=self.proxies, timeout=10)
res.raise_for_status()
res.encoding = "utf-8"
text = res.text
if len(text) > 1000:
print(f"[DEBUG] 使用备用 {backup}: {test_url}")
return text
except:
continue
try:
time.sleep(1)
res = requests.get(original_url, headers=self.headers, proxies=self.proxies, timeout=10)
res.raise_for_status()
res.encoding = "utf-8"
text = res.text
if len(text) < 1000:
print(f"[DEBUG] 内容过短,尝试备用域名")
return self.fetch_page(original_url, use_backup=True)
return text
except Exception as e:
print(f"[ERROR] 请求失败 {original_url}: {e}")
return None
def getcnh(self):
try:
html = self.fetch_page(f"{self.host}/about.html")
if html:
soup = BeautifulSoup(html, 'html.parser')
link = soup.select_one('a[href]')
if link:
url = link.get('href')
parsed_url = urlparse(url)
host = parsed_url.scheme + "://" + parsed_url.netloc
self.setCache('host_911blw', host)
except Exception as e:
print(f"获取主机信息错误: {str(e)}")
def host_late(self, url_list):
if not url_list:
return self.domin
results = {}
threads = []
def test_host(url):
try:
start_time = time.time()
response = requests.head(url, headers=self.headers, proxies=self.proxies, timeout=1.0, allow_redirects=False)
delay = (time.time() - start_time) * 1000
results[url] = delay
except Exception as e:
results[url] = float('inf')
for url in url_list:
t = threading.Thread(target=test_host, args=(url,))
threads.append(t)
t.start()
for t in threads:
t.join()
return min(results.items(), key=lambda x: x[1])[0]
def getfod(self, id):
url = f"{self.host}{id}"
html = self.fetch_page(url)
if not html:
return []
soup = BeautifulSoup(html, 'html.parser')
videos = []
        # Locate the folder content block
content = soup.select_one('.post-content, .entry-content')
if content:
            # Remove unwanted elements
for elem in content.select('.txt-apps, .line, blockquote, .tags, .content-tabs'):
elem.decompose()
            # Extract headings and links
headings = content.select('h2, h3, h4')
paragraphs = content.select('p')
for i, heading in enumerate(headings):
title = heading.get_text(strip=True)
if i < len(paragraphs):
link = paragraphs[i].select_one('a')
if link:
videos.append({
'vod_id': link.get('href', ''),
'vod_name': link.get_text(strip=True),
'vod_pic': f"{self.getProxyUrl()}&url={self.e64(link.get('data-img', ''))}",
'vod_remarks': title
})
return videos
def getlist(self, articles, tid=''):
videos = []
is_folder = '/mrdg' in tid
for article in articles:
try:
                # Title
title_elem = article.select_one('h2, h3, .headline, .title, a[title]')
name = title_elem.get_text(strip=True) if title_elem else ""
                # Link
link_elem = article.select_one('a')
href = link_elem.get('href', '') if link_elem else ""
                # Date / remarks
date_elem = article.select_one('time, .date, .published')
remarks = date_elem.get_text(strip=True) if date_elem else ""
                # Cover image (same scheme as the 吸瓜 site)
pic = None
script_elem = article.select_one('script')
if script_elem and script_elem.string:
base64_match = re.search(r'base64,[\'"]?([A-Za-z0-9+/=]+)[\'"]?', script_elem.string)
if base64_match:
encoded_url = base64_match.group(1)
pic = f"{self.getProxyUrl()}&url={self.e64(encoded_url)}"
if not pic:
img_elem = article.select_one('img[data-xkrkllgl]')
if img_elem and img_elem.get('data-xkrkllgl'):
encoded_url = img_elem.get('data-xkrkllgl')
pic = f"{self.getProxyUrl()}&url={self.e64(encoded_url)}"
if not pic:
img_elem = article.select_one('img')
if img_elem:
for attr in ["data-lazy-src", "data-original", "data-src", "src"]:
pic = img_elem.get(attr)
if pic:
pic = urljoin(self.host, pic)
break
if name and href:
videos.append({
'vod_id': f"{href}{'@folder' if is_folder else ''}",
'vod_name': name.replace('\n', ' '),
'vod_pic': pic,
'vod_remarks': remarks,
'vod_tag': 'folder' if is_folder else '',
'style': {"type": "rect", "ratio": 1.33}
})
except Exception as e:
print(f"列表项解析错误: {e}")
continue
return videos
if __name__ == "__main__":
spider = Spider()
spider.init('{"site": "https://911blw.com"}')
    # Test homepage
result = spider.homeContent({})
print(f"首页分类: {len(result['class'])}")
print(f"首页内容: {len(result['list'])}")
    # Test category
result = spider.categoryContent("/category/jrgb/", "1", False, {})
print(f"分类内容: {len(result['list'])}")
    # Test search
result = spider.searchContent("测试", False, "1")
print(f"搜索结果: {len(result['list'])}")

py/adult/91RB.py (new file, 84 lines)

@@ -0,0 +1,84 @@
#author Kyle
import re, sys, time, urllib.parse
sys.path.append('..')
from base.spider import Spider as BaseSpider
class Spider(BaseSpider):
def __init__(self):
super().__init__(); self.base = 'https://www.91rb.com'; self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0 Safari/537.36', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', 'Referer': self.base + '/'}
def getName(self): return '91热爆'
def init(self, extend=""): self.extend = extend or ''; return {'class': 'movie'}
def isVideoFormat(self, url): return bool(re.search(r'\.(m3u8|mp4)(\?|$)', url))
def manualVideoCheck(self): return False
def destroy(self): pass
def homeContent(self, filter): return {'class': [{'type_name': '最新上传', 'type_id': 'latest-updates'}, {'type_name': '热门视频', 'type_id': 'most-popular'}, {'type_name': '收藏最多', 'type_id': 'most-favourited'}, {'type_name': '日本AV', 'type_id': 'tags/av2/'}, {'type_name': 'jav', 'type_id': 'tags/jav/'}, {'type_name': '韩国', 'type_id': 'tags/20c3f16d021b069ce3af1da50b15bd83/'}]}
def homeVideoContent(self):
try: return self._listPage(self._buildListUrl('latest-updates', '1'))
except Exception as e: self.log(f'homeVideoContent error: {e}'); return {'list': []}
def categoryContent(self, tid, pg, filter, extend):
try: return self._listPage(self._buildListUrl(tid, pg), page=pg)
except Exception as e: self.log(f'categoryContent error: {e}'); return {'list': [], 'page': pg, 'pagecount': 1, 'limit': 48, 'total': 0}
def detailContent(self, ids):
vid = self._ensure_id(ids[0]); detail_url = f"{self.base}/videos/{vid}/"; name = f'视频 {vid}'; pic = ''
try:
r = self.fetch(detail_url, headers=self.headers, timeout=10, allow_redirects=True)
if r and hasattr(r, 'text'):
doc = self.html(r.text)
if doc:
name = ''.join(doc.xpath('//h1//text()')).strip() or name
ogs = doc.xpath('//meta[@property="og:image"]/@content'); tws = doc.xpath('//meta[@name="twitter:image"]/@content')
pic = ogs[0].strip() if ogs else (tws[0].strip() if tws else '')
if pic: pic = self._abs_url(pic)
except Exception as e: self.log(f'detailContent fetch error: {e}')
if not pic: pic = self._cover_fallback(vid)
vod = {'vod_id': str(vid), 'vod_name': name, 'vod_pic': pic, 'type_name': '', 'vod_year': '', 'vod_remarks': '', 'vod_content': '', 'vod_play_from': '91RB', 'vod_play_url': f'正片${vid}'}
return {'list': [vod]}
def searchContent(self, key, quick, pg="1"):
key_enc = urllib.parse.quote(key); url = f"{self.base}/search/{key_enc}/"
if pg != '1': url = url.rstrip('/') + f'/{pg}/'
try: return self._listPage(url, page=pg)
except Exception as e: self.log(f'searchContent error: {e}'); return {'list': [], 'page': pg, 'pagecount': 1, 'total': 0}
def playerContent(self, flag, id, vipFlags):
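        # Video assets are sharded in blocks of 1000: id 12345 lives under .../12000/12345/.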
vid = self._ensure_id(id); group = int(vid) - (int(vid) % 1000)
m3u8 = f"https://91rbnet.gslb-al.com/hls/contents/videos/{group}/{vid}/{vid}.mp4/index.m3u8"
try:
r = self.fetch(m3u8, headers=self.headers, timeout=5, allow_redirects=True, verify=True, stream=True)
if r.status_code >= 400: self.log(f'm3u8 head status={r.status_code}, fallback to direct anyway')
except Exception as e: self.log(f'playerContent HEAD error: {e}')
return {'parse': 0, 'playUrl': '', 'url': m3u8, 'header': self.headers}
def localProxy(self, param): return None
def _buildListUrl(self, tid, pg):
path = tid.strip('/') or 'latest-updates'; page_suffix = f"/{pg}/" if str(pg) != '1' else '/'
if path.startswith('categories') or path in ['latest-updates', 'most-popular', 'most-favourited']: return f"{self.base}/{path}{page_suffix}"
return f"{self.base}/{path}{page_suffix}"
def _abs_url(self, url):
if not url: return url
u = url.strip()
return 'https:' + u if u.startswith('//') else (self.base + u if u.startswith('/') else u)
def _parse_srcset_first(self, srcset):
if not srcset: return ''
return srcset.split(',')[0].strip().split(' ')[0]
def _cover_fallback(self, vid):
try: iv = int(vid); group = iv - (iv % 1000); return f'https://rimg.iomycdn.com/videos_screenshots/{group}/{iv}/preview.jpg'
except Exception: return ''
def _listPage(self, url, page='1'):
doc = self.html(self.fetch(url, headers=self.headers, timeout=10).text)
if doc is None: return {'list': [], 'page': page, 'pagecount': 1, 'total': 0}
nodes, videos, seen = doc.xpath('//main//a[contains(@href, "/videos/")]'), [], set()
for a in nodes:
href = a.get('href') or ''; m = re.search(r'/videos/(\d+)/', href)
if not m or '/login' in href: continue
vid = m.group(1);
if vid in seen: continue
seen.add(vid); title = ''; img = a.xpath('.//img')
if img:
im = img[0]; title = (im.get('alt') or '').strip()
pic = (im.get('src') or im.get('data-src') or im.get('data-original') or '').strip()
if not pic: pic = self._parse_srcset_first(im.get('data-srcset') or im.get('srcset') or '')
pic = self._abs_url(pic)
else: title = (a.text or '').strip(); pic = ''
title = title or f'视频 {vid}'
if not pic or pic.startswith('data:'): pic = self._cover_fallback(vid)
videos.append({'vod_id': vid, 'vod_name': title, 'vod_pic': pic, 'vod_remarks': ''})
return {'list': videos, 'page': str(page), 'pagecount': 9999, 'limit': 48, 'total': 0}
def _ensure_id(self, s):
m = re.search(r'(\d+)', str(s)); return m.group(1) if m else str(s)

py/adult/cam4.py (new file, 118 lines)

@@ -0,0 +1,118 @@
# -*- coding: utf-8 -*-
import json
import time
from base.spider import Spider
class Spider(Spider):
def getName(self):
return "Cam4直播"
def init(self, extend=""):
self.base = "https://zh.cam4.com"
self.headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
}
return self
def homeContent(self, filter):
classes = [
{"type_id": "all", "type_name": "全部"},
{"type_id": "female", "type_name": "女性"},
{"type_id": "male", "type_name": "男性"},
{"type_id": "couples", "type_name": "情侣"},
{"type_id": "shemale", "type_name": "变性"},
]
return {"class": classes}
def categoryContent(self, tid, pg, filter, extend):
if not pg:
pg = 1
params = f"?directoryJson=true&online=true&url=true&page={pg}"
if tid == "female":
params += "&gender=female"
elif tid == "male":
params += "&gender=male"
elif tid == "couples":
params += "&broadcastType=male_female_group"
elif tid == "shemale":
params += "&gender=shemale"
url = f"{self.base}/directoryCams{params}"
rsp = self.fetch(url, headers=self.headers)
data = rsp.text
try:
jRoot = json.loads(data)
except:
return {"list": []}
videos = []
for u in jRoot.get("users", []):
title = f"{u.get('username')} ({u.get('countryCode', '')})"
if "age" in u:
title += f" - {u['age']}"
if "resolution" in u:
res = u["resolution"].split(":")[-1]
title += f" [HD:{res}]"
video = {
"vod_id": u.get("hlsPreviewUrl"),
"vod_name": title,
"vod_pic": u.get("snapshotImageLink", ""),
"vod_remarks": u.get("statusMessage", ""),
}
videos.append(video)
result = {
"list": videos,
"page": int(pg),
"pagecount": 9999,
"limit": 90,
"total": len(videos)
}
return result
def detailContent(self, ids):
id = ids[0]
vod = {
"vod_id": id,
"vod_name": "Cam4直播",
"vod_pic": "",
"vod_play_from": "Cam4",
"vod_play_url": f"直播源${id}",
}
return {"list": [vod]}
def playerContent(self, flag, id, vipFlags):
play_url = id
return {
"parse": 0,
"playUrl": "",
"url": play_url,
"header": self.headers
}
def searchContent(self, key, quick, pg="1"):
url = f"{self.base}/directoryCams?directoryJson=true&online=true&url=true&showTag={key}&page={pg}"
rsp = self.fetch(url, headers=self.headers)
data = rsp.text
try:
jRoot = json.loads(data)
except:
return {"list": []}
videos = []
for u in jRoot.get("users", []):
title = f"{u.get('username')} ({u.get('countryCode', '')})"
video = {
"vod_id": u.get("hlsPreviewUrl"),
"vod_name": title,
"vod_pic": u.get("snapshotImageLink", ""),
"vod_remarks": u.get("statusMessage", ""),
}
videos.append(video)
return {"list": videos}
def isVideoFormat(self, url):
return ".m3u8" in url
def manualVideoCheck(self):
return True

py/adult/javxbb.py (new file, 214 lines)

@@ -0,0 +1,214 @@
# -*- coding: utf-8 -*-
#author 🍑
import json
import re
import os
import sys
import requests
from requests.exceptions import RequestException
try:
from pyquery import PyQuery as pq
except Exception:
pq = None
from base.spider import Spider
class Spider(Spider):
name = 'Javbobo'
host = 'https://javbobo.com'
def init(self, extend=""):
try:
self.extend = json.loads(extend) if extend else {}
except Exception:
self.extend = {}
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:142.0) Gecko/20100101 Firefox/142.0',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Referer': f'{self.host}/',
'Origin': self.host,
'Connection': 'keep-alive',
}
self.session = requests.Session()
self.session.headers.update(self.headers)
def getName(self):
return self.name
def isVideoFormat(self, url):
return any(ext in (url or '') for ext in ['.m3u8', '.mp4', '.ts'])
def manualVideoCheck(self):
return False
def destroy(self):
pass
def homeContent(self, filter):
result = {}
try:
cateManual = [
{'type_name': '日本有碼', 'type_id': '47'},
{'type_name': '日本無碼', 'type_id': '48'},
{'type_name': '國產AV', 'type_id': '49'},
{'type_name': '網紅主播', 'type_id': '50'},
]
result['class'] = cateManual
result['filters'] = {}
except Exception:
pass
return result
def homeVideoContent(self):
return self.categoryContent('', '1', False, {})
def categoryContent(self, tid, pg, filter, extend):
pg = str(pg)
result = {'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 999999, 'list': []}
try:
url = self.host
if tid:
if str(tid).startswith('http'):
url = str(tid)
if pg != '1': url = f"{url}{'&' if '?' in url else '?'}page={pg}"
elif str(tid).startswith('/'):
url = f"{self.host}{tid}"
if pg != '1': url = f"{url}{'&' if '?' in url else '?'}page={pg}"
else:
url = f"{self.host}/vod/index.html?type_id={tid}"
if pg != '1': url = f"{self.host}/vod/index.html?page={pg}&type_id={tid}"
resp = self.session.get(url, timeout=30)
resp.raise_for_status()
if pq is None: raise RuntimeError('PyQuery 未安装,无法解析列表页面')
doc = pq(resp.text)
def _parse_list(doc):
vlist = []
seen = set()
for a in doc('a[href*="/vod/player.html"]').items():
href = a.attr('href') or ''
if not href: continue
full = href if href.startswith('http') else f"{self.host}{href}"
m = re.search(r'[?&]id=(\d+)', full)
if not m: continue
vid = m.group(1)
if vid in seen: continue
seen.add(vid)
img_el = a('img')
title = img_el.attr('alt') or a.attr('title') or (a.text() or '').strip()
if not title:
li = a.parents('li').eq(0)
title = li.find('h1,h2,h3').text().strip() if li else ''
if not title: title = f"视频{vid}"
img = img_el.attr('src') or img_el.attr('data-src') or ''
if img and not img.startswith('http'): img = f"{self.host}{img}"
vlist.append({
'vod_id': full, 'vod_name': title, 'vod_pic': img, 'vod_remarks': '',
'style': {'ratio': 1.33, 'type': 'rect'}
})
if len(vlist) >= 90: break
return vlist
result['list'] = _parse_list(doc)
page_numbers = []
for a in doc('a[href*="/vod/index.html?page="]').items():
t = (a.text() or '').strip()
if t.isdigit(): page_numbers.append(int(t))
if page_numbers: result['pagecount'] = max(page_numbers)
except Exception:
result['list'] = []
return result
def detailContent(self, ids):
try:
url = ids[0] if isinstance(ids, list) else str(ids)
if not url: return {'list': []}
if not url.startswith('http'): url = f"{self.host}/vod/player.html?id={url}"
resp = self.session.get(url, timeout=30)
resp.raise_for_status()
html = resp.text
if pq is None: raise RuntimeError('PyQuery 未安装,无法解析详情页面')
doc = pq(html)
title = doc('meta[property="og:title"]').attr('content') or doc('h1').text().strip() or 'Javbobo 视频'
vod_pic = doc('meta[property="og:image"]').attr('content') or ''
if not vod_pic:
img_el = doc('img').eq(0)
vod_pic = img_el.attr('src') or img_el.attr('data-src') or ''
if vod_pic and not vod_pic.startswith('http'): vod_pic = f"{self.host}{vod_pic}"
line_id = None
m = re.search(r"lineId\s*=\s*Number\('?(\d+)'?\)", html)
if m: line_id = m.group(1)
if not line_id:
m = re.search(r"var\s+Iyplayer\s*=\s*\{[^}]*id:(\d+)", html)
if m: line_id = m.group(1)
play_id = line_id or url
vod = {
'vod_name': title, 'vod_pic': vod_pic, 'vod_content': '',
'vod_play_from': 'Javbobo', 'vod_play_url': f'正片${play_id}'
}
return {'list': [vod]}
except Exception:
return {'list': []}
def searchContent(self, key, quick, pg="1"):
try:
params = {'wd': key}
url = f"{self.host}/index.html"
resp = self.session.get(url, params=params, timeout=30)
resp.raise_for_status()
if pq is None: raise RuntimeError('PyQuery 未安装,无法解析搜索页面')
doc = pq(resp.text)
vlist = []
seen = set()
for a in doc('a[href*="/vod/player.html"]').items():
href = a.attr('href') or ''
if not href: continue
full = href if href.startswith('http') else f"{self.host}{href}"
m = re.search(r'[?&]id=(\d+)', full)
if not m: continue
vid = m.group(1)
if vid in seen: continue
seen.add(vid)
img_el = a('img')
title = img_el.attr('alt') or a.attr('title') or (a.text() or '').strip()
img = img_el.attr('src') or img_el.attr('data-src') or ''
if img and not img.startswith('http'): img = f"{self.host}{img}"
vlist.append({
'vod_id': full, 'vod_name': title or f'视频{vid}', 'vod_pic': img,
'vod_remarks': '', 'style': {'ratio': 1.33, 'type': 'rect'}
})
if len(vlist) >= 60: break
return {'list': vlist, 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 999999}
except Exception:
return {'list': []}
def playerContent(self, flag, id, vipFlags):
try:
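            # id may be a numeric lineId, a full URL, or a site path; normalize
            # all three to a lineId, then query the /openapi/playline endpoint.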
line_id = None
sid = str(id or '')
if re.fullmatch(r'\d+', sid):
line_id = sid
elif sid.startswith('http'):
if self.isVideoFormat(sid):
headers = {'User-Agent': self.headers['User-Agent'], 'Referer': f'{self.host}/'}
return {'parse': 0, 'url': sid, 'header': headers}
html = self.session.get(sid, timeout=30).text
m = re.search(r"lineId\s*=\s*Number\('?(\d+)'?\)", html)
if m: line_id = m.group(1)
if not line_id:
m = re.search(r"var\s+Iyplayer\s*=\s*\{[^}]*id:(\d+)", html)
if m: line_id = m.group(1)
else:
if sid.startswith('/'): page_url = f"{self.host}{sid}"
else: page_url = f"{self.host}/vod/player.html?id={sid}"
html = self.session.get(page_url, timeout=30).text
m = re.search(r"lineId\s*=\s*Number\('?(\d+)'?\)", html)
if m: line_id = m.group(1)
if not line_id:
m = re.search(r"var\s+Iyplayer\s*=\s*\{[^}]*id:(\d+)", html)
if m: line_id = m.group(1)
if not line_id: raise ValueError('未能获取到播放线路ID(lineId)')
api = f"{self.host}/openapi/playline/{line_id}"
r = self.session.get(api, timeout=30)
txt = r.text.strip()
j = None
try: j = r.json()
except Exception: j = None
if isinstance(j, str):
try: j = json.loads(j)
except Exception: j = None
if not isinstance(j, dict):
try: j = json.loads(txt)
except Exception: j = {}
m3u8_url = ''
if isinstance(j, dict): m3u8_url = j.get('info', {}).get('file') or j.get('file') or ''
headers = {'User-Agent': self.headers['User-Agent'], 'Referer': f'{self.host}/'}
return {'parse': 0, 'url': m3u8_url, 'header': headers}
except Exception:
return {'parse': 0, 'url': '', 'header': {}}

py/adult/javxx.py (new file, 329 lines)

@@ -0,0 +1,329 @@
# -*- coding: utf-8 -*-
# by @嗷呜
import gzip
import html
import json
import re
import sys
import base64
from base64 import b64decode
from urllib.parse import unquote, urlparse
import requests
from pyquery import PyQuery as pq
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def init(self, extend='{}'):
config = json.loads(extend)
self.proxies = config.get('proxy', {})
self.plp = config.get('plp', '')
pass
def getName(self):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
host = 'https://javxx.com'
contr = 'cn'
conh = f'{host}/{contr}'
headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
'referer': f'{conh}/',
'sec-ch-ua': '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36',
}
gcate = 'H4sIAAAAAAAAA6tWejan4dm0DUpWCkp5qeVKOkrPm9e+nL4CxM/ILwHygfIv9k8E8YtSk1PzwELTFzxf0AgSKs0DChXnF6WmwIWfbW55OWcTqqRuTmpiNljN8427n3asBsmmp+YVpRaDtO2Z8nTiDJBQYnIJUKgYLPq0Y9uTvXOeTm0DSeQCdReBRJ9vBmqfDhIqTi3KhGhf0P587T6QUElierFSLQCk4MAf0gAAAA=='
flts = 'H4sIAAAAAAAAA23QwYrCMBAG4FeRnH0CX0WKBDJiMRpoY0WkIOtFXLQU1IoEFFHWw4qHPazgii/TRPctNKK1Ro/zz8cM/PkmKkMD5TLIZQ5HWVTFFUiNHqY1PeebyNOxAxSwCwWCOWitMxmEcttW0VKJKfKzN4kJAfLk1O9OdmemKzF+B8f2+j9aPVacEdwoeDbU3TuJd93LgdPXx1F8PmAdoEwNqTaBDFemrLAqL72hSnReqcuvDkgCRUsGkfqenw59AxaxxxybP9uRuFjkW5reai7alIOTKjoJzKoxpUnDvWG8bcnlj/obyHCcKi95JxeTeN9LEcu3zoYr9GndAQAA'
actft = 'H4sIAAAAAAAAA22UTUsbURSG/0qYtQMxZvIhIvidxI/oVpEy6GiCmpFkEhEpVBcqikYprV2kG6GkhYK2XRbxzziT+C88c2/OnLnnunznec47zJ3LWTsydpxDYzRhVJzqdsUzhoyavecoD1r2bjN8snZktEIwPJI0h0fSoRqL/vW33p9/xsehyLLgcZ4sETUrDcNp6pJRt2A4TV0yapYFwxZ1yahbMGxRl4yalYHhDHXJqFswnKEuGTUrC8NZ6pJRt2A4S10yalYOhnPUJaNuwXCOumTUrDwM56lLRrTWQ29wNzaa+7GLIRO/FRPYM9F7+hV8f6D3TCKZ5GQKyRQn00imOZlBMsPJLJJZTuaQzHFSQFLgpIikyEkJSYmTeSTznCwgWeBkEckiJ0tIljgpIylzsoxkmZMVJCucrCJZRRL/9/a2E/v3MvF/H14cLBlLpJL+32OqTyXNVHTJRFCxZaaiYREUDMuFVo0IKrZM2jEiKBjWCS0XEVRsmbRVRFAwLBBaJyIoGHZCPpoeT2TkZ8fPruHW4xt1EPnpCTyo8buf/ZsreseG26x5CPvd09f72+DL4+tZmxTP3bQPP7SqzkEDxZf/F8Hdj373pNe5JPHAcXZ2mRk8tP3bn9zcc2te5R016JzrasMTnrMZiZ1Pfvsu+H3ff75m4pbdcutVT3W/dsAND279DSxD8pmOBgAA'
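    # gcate, flts and actft are gzip-compressed, Base64-encoded JSON blobs that
    # hold the category and filter definitions; ungzip() below decodes them.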
def homeContent(self, filter):
data = self.getpq(requests.get(f"{self.conh}", headers=self.headers, proxies=self.proxies).text)
result = {}
cate = self.ungzip(self.gcate)
classes = []
filters = {}
for k, j in cate.items():
classes.append({
'type_name': k,
'type_id': j
})
if j == 'actresses':
fts = self.ungzip(self.actft)
else:
fts = self.ungzip(self.flts)
filters[j] = fts
result['class'] = classes
result['filters'] = filters
result['list'] = self.getvl(data('.vid-items .item'))
return result
def homeVideoContent(self):
pass
def categoryContent(self, tid, pg, filter, extend):
videos = []
if tid in ['genres', 'makers', 'series', 'tags']:
gggg = tid if tid == 'series' else tid[:-1]
pagecount = 1
data = self.getpq(requests.get(f"{self.conh}/{tid}", headers=self.headers, proxies=self.proxies).text)
for i in data(f'.term-items.{gggg} .item').items():
videos.append({
'vod_id': i('a').attr('href'),
'vod_name': i('h2').text(),
'vod_remarks': i('.meta').text(),
'vod_tag': 'folder',
'style': {"type": "rect", "ratio": 2}
})
elif tid == 'actresses':
params = {
'height': extend.get('height'),
"cup": extend.get('cup'),
"sort": extend.get('sort'),
'age': extend.get('age'),
"page": pg
}
c_params = {k: v for k, v in params.items() if v}
data = self.getpq(
requests.get(f"{self.conh}/{tid}", headers=self.headers, params=c_params, proxies=self.proxies).text)
pagecount = self.getpgc(data('ul.pagination li').eq(-1))
for i in data('.chanel-items .item').items():
i = i('.main')
videos.append({
'vod_id': i('.info a').attr('href'),
'vod_name': i('.info h2').text(),
'vod_pic': i('.avatar img').attr('src'),
'vod_year': i('.meta div div').eq(-1).text(),
'vod_remarks': i('.meta div div').eq(0).text(),
'vod_tag': 'folder',
'style': {"type": "oval", "ratio": 0.75}
})
else:
tid = tid.split('_click')[0].replace(f"/{self.contr}/", "")
params = {
"filter": extend.get('filter'),
"sort": extend.get('sort'),
"page": pg
}
c_params = {k: v for k, v in params.items() if v}
data = self.getpq(
requests.get(f"{self.conh}/{tid}", params=c_params, headers=self.headers, proxies=self.proxies).text)
videos = self.getvl(data('.vid-items .item'))
pagecount = self.getpgc(data('ul.pagination li').eq(-1))
result = {}
result['list'] = videos
result['page'] = pg
result['pagecount'] = pagecount
result['limit'] = 90
result['total'] = 999999
return result
def detailContent(self, ids):
data = self.getpq(requests.get(f"{self.host}{ids[0]}", headers=self.headers, proxies=self.proxies).text)
dv = data('#video-details')
pnpn = {
'老僧酿酒、名妓读经': f"{data('#video-info h1').text()}${data('#video-files div').attr('data-url')}",
'书生玩剑': '#'.join(
[f"{i('.info .title span').eq(-1).text()}$_gggb_{i('.info .title').attr('href')}" for i in
data('.main .vid-items .item').items()]),
'将军作文': '#'.join([f"{i('.info .title span').eq(-1).text()}$_gggb_{i('.info .title').attr('href')}" for i in
data('.vid-items.side .item').items()])
}
n, p = [], []
for k, v in pnpn.items():
if v:
n.append(k)
p.append(v)
vod = {
'vod_content': dv('.content').text(),
'vod_play_from': '$$$'.join(n),
'vod_play_url': '$$$'.join(p)
}
a, b, c, d = [], [], [], []
for i in dv('.meta div').items():
if re.search(r'发布日期', i('label').text()):
vod['vod_year'] = i('span').text()
elif re.search(r'演员', i('label').text()):
a.extend(['[a=cr:' + json.dumps(
{'id': f"{j.attr('href')}_click", 'name': j.text()}) + '/]' + j.text() + '[/a]' for j in
i('a').items()])
elif re.search(r'制作商|系列', i('label').text()):
b.extend(['[a=cr:' + json.dumps(
{'id': f"{j.attr('href')}_click", 'name': j.text()}) + '/]' + j.text() + '[/a]' for j in
i('a').items()])
elif re.search(r'标签', i('label').text()):
c.extend(['[a=cr:' + json.dumps(
{'id': f"{j.attr('href')}_click", 'name': j.text()}) + '/]' + j.text() + '[/a]' for j in
i('a').items()])
elif re.search(r'类别', i('label').text()):
d.extend(['[a=cr:' + json.dumps(
{'id': f"{j.attr('href')}_click", 'name': j.text()}) + '/]' + j.text() + '[/a]' for j in
i('a').items()])
vod.update({'vod_actor': ' '.join(a), 'vod_director': ' '.join(b), 'vod_remarks': ' '.join(c),
'vod_content': ' '.join(d) + '\n' + vod['vod_content']})
return {'list': [vod]}
def searchContent(self, key, quick, pg="1"):
params = {'keyword': key, 'page': pg}
data = self.getpq(
requests.get(f"{self.conh}/search", headers=self.headers, params=params, proxies=self.proxies).text)
return {'list': self.getvl(data('.vid-items .item')), 'page': pg}
def playerContent(self, flag, id, vipFlags):
        # Handle the jump marker and fetch the initial encrypted address
if id.startswith('_gggb_'):
data = self.getpq(
requests.get(f"{self.host}{id.replace('_gggb_', '')}", headers=self.headers).text)
id = data('#video-files div').attr('data-url')
        # Decrypt the initial URL
url = self.de_url(id)
parsed_url = urlparse(url)
durl = parsed_url.scheme + "://" + parsed_url.netloc
video_id = parsed_url.path.split('/')[-1]
        # Build the encrypted token
tkid = self.encrypt_video_id(video_id)
data_url = f"{durl}/stream?token={tkid}"
        # Request the video stream data
response = requests.get(data_url, timeout=10)
data = response.json()
        # Decrypt the media payload
media = data["result"]["media"]
decrypted_media = self.decrypt_media(media)
decrypted_data = json.loads(decrypted_media)
playeurl = decrypted_data["stream"]
        # Build request headers and return the result
headers = {
'user-agent': self.headers['user-agent'],
'origin': durl,
'referer': f"{durl}/"
}
return {'parse': 0, 'url': playeurl, 'header': headers}
def encrypt_video_id(self, video_id, key=None):
"""使用指定密钥对视频ID进行XOR加密并Base64编码"""
if key is None:
key = "kBxSj373GhC18iOc" # 默认密钥
# XOR加密
key_bytes = key.encode('utf-8')
encrypted_bytes = []
for i, char in enumerate(video_id):
key_byte = key_bytes[i % len(key_bytes)]
encrypted_byte = ord(char) ^ key_byte
encrypted_bytes.append(encrypted_byte)
        # Base64 encode
encrypted_base64 = base64.b64encode(bytes(encrypted_bytes)).decode('utf-8')
return encrypted_base64
def decrypt_media(self, encrypted_media, key="kBxSj373GhC18iOc"):
"""使用指定密钥解密媒体数据"""
# Base64解码
encrypted_bytes = base64.b64decode(encrypted_media)
        # XOR decryption
key_bytes = key.encode('utf-8')
decrypted_chars = []
for i, byte in enumerate(encrypted_bytes):
key_byte = key_bytes[i % len(key_bytes)]
decrypted_char = byte ^ key_byte
decrypted_chars.append(chr(decrypted_char))
        # Join into a string and URL-decode
decrypted_text = ''.join(decrypted_chars)
url_decoded_text = unquote(decrypted_text)
return url_decoded_text
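    # encrypt_video_id and decrypt_media are XOR inverses under the same default
    # key: decrypt_media(encrypt_video_id(s)) == s for ASCII s with no '%' escapes.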
def localProxy(self, param):
pass
def liveContent(self, url):
pass
def getvl(self, data):
videos = []
for i in data.items():
img = i('.img')
imgurl = img('.image img').attr('src')
if imgurl:
imgurl = imgurl.replace("/s360/", "/s1080/")
videos.append({
'vod_id': img('a').attr('href'),
'vod_name': i('.info .title').text(),
'vod_pic': imgurl,
'vod_year': i('.info .meta div').eq(-1).text(),
'vod_remarks': i('.duration').text(),
'style': {"type": "rect", "ratio": 1.33}
})
return videos
def de_url(self, encoded_str):
decoded = b64decode(encoded_str).decode()
key = "G9zhUyphqPWZGWzZ" # 更新为第一个密钥
result = []
for i, char in enumerate(decoded):
key_char = key[i % len(key)]
decrypted_char = chr(ord(char) ^ ord(key_char))
result.append(decrypted_char)
return unquote(''.join(result))
def getpgc(self, data):
try:
if data:
if data('a'):
return int(data('a').attr('href').split('page=')[-1])
else:
return int(data.text())
else:
raise Exception("获取页数失败")
except:
return 1
def p_qjs(self, js_code):
try:
from com.whl.quickjs.wrapper import QuickJSContext
ctx = QuickJSContext.create()
jctx = ctx.evaluate(js_code)
code = jctx.strip().split('const posterUrl', 1)[0].split('{', 1)[-1]
result = ctx.evaluate(f"{code}\nJSON.stringify(media)")
ctx.destroy()
return json.loads(result)
except Exception as e:
self.log(f"执行失败: {e}")
return []
def ungzip(self, data):
result = gzip.decompress(b64decode(data)).decode()
return json.loads(result)
def getpq(self, data):
try:
return pq(data)
except Exception as e:
print(f"{str(e)}")
return pq(data.encode('utf-8'))
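A self-contained sketch of the XOR-plus-Base64 scheme implemented by encrypt_video_id/decrypt_media above, assuming ASCII inputs without percent-escapes; the helper names are illustrative:

```python
import base64
from urllib.parse import unquote

KEY = "kBxSj373GhC18iOc"  # default key used by the spider above

def xor_b64_encode(text: str, key: str = KEY) -> str:
    kb = key.encode('utf-8')
    return base64.b64encode(bytes(ord(c) ^ kb[i % len(kb)] for i, c in enumerate(text))).decode()

def xor_b64_decode(token: str, key: str = KEY) -> str:
    kb = key.encode('utf-8')
    raw = base64.b64decode(token)
    return unquote(''.join(chr(b ^ kb[i % len(kb)]) for i, b in enumerate(raw)))

# XOR is self-inverse, so the round trip recovers the original ID.
assert xor_b64_decode(xor_b64_encode("abc123")) == "abc123"
```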

py/adult/香蕉.py (new file, 669 lines)

@@ -0,0 +1,669 @@
#!/usr/bin/python
# coding=utf-8
import sys
sys.path.append('..')
from base.spider import Spider
import json
import time
import urllib.parse
import re
import requests
from lxml import etree
from urllib.parse import urljoin
class Spider(Spider):
def getName(self):
return "苹果视频"
def init(self, extend=""):
self.host = "https://618041.xyz"
self.api_host = "https://h5.xxoo168.org"
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Referer': self.host
}
        # Special-section category IDs that all need special handling
self.special_categories = ['13', '14', '33', '53', '32', '52', '9']
self.log(f"苹果视频爬虫初始化完成,主站: {self.host}")
def html(self, content):
"""将HTML内容转换为可查询的对象"""
try:
return etree.HTML(content)
except:
self.log("HTML解析失败")
return None
def regStr(self, pattern, string, index=1):
"""正则表达式提取字符串"""
try:
match = re.search(pattern, string, re.IGNORECASE)
if match and len(match.groups()) >= index:
return match.group(index)
except:
pass
return ""
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def homeContent(self, filter):
"""获取首页内容和分类"""
result = {}
# 只保留指定的分类
classes = [
{'type_id': '618041.xyz_1', 'type_name': '全部视频'},
{'type_id': '618041.xyz_13', 'type_name': '香蕉精品'},
{'type_id': '618041.xyz_22', 'type_name': '制服诱惑'},
{'type_id': '618041.xyz_6', 'type_name': '国产视频'},
{'type_id': '618041.xyz_8', 'type_name': '清纯少女'},
{'type_id': '618041.xyz_9', 'type_name': '辣妹大奶'},
{'type_id': '618041.xyz_10', 'type_name': '女同专属'},
{'type_id': '618041.xyz_11', 'type_name': '素人出演'},
{'type_id': '618041.xyz_12', 'type_name': '角色扮演'},
{'type_id': '618041.xyz_20', 'type_name': '人妻熟女'},
{'type_id': '618041.xyz_23', 'type_name': '日韩剧情'},
{'type_id': '618041.xyz_21', 'type_name': '经典伦理'},
{'type_id': '618041.xyz_7', 'type_name': '成人动漫'},
{'type_id': '618041.xyz_14', 'type_name': '精品二区'},
{'type_id': '618041.xyz_53', 'type_name': '动漫中字'},
{'type_id': '618041.xyz_52', 'type_name': '日本无码'},
{'type_id': '618041.xyz_33', 'type_name': '中文字幕'},
{'type_id': '618041.xyz_32', 'type_name': '国产自拍'}
]
result['class'] = classes
try:
rsp = self.fetch(self.host, headers=self.headers)
doc = self.html(rsp.text)
videos = self._get_videos(doc, limit=20)
result['list'] = videos
except Exception as e:
self.log(f"首页获取出错: {str(e)}")
result['list'] = []
return result
def homeVideoContent(self):
"""分类定义 - 兼容性方法"""
return {
'class': [
{'type_id': '618041.xyz_1', 'type_name': '全部视频'},
{'type_id': '618041.xyz_13', 'type_name': '香蕉精品'},
{'type_id': '618041.xyz_22', 'type_name': '制服诱惑'},
{'type_id': '618041.xyz_6', 'type_name': '国产视频'},
{'type_id': '618041.xyz_8', 'type_name': '清纯少女'},
{'type_id': '618041.xyz_9', 'type_name': '辣妹大奶'},
{'type_id': '618041.xyz_10', 'type_name': '女同专属'},
{'type_id': '618041.xyz_11', 'type_name': '素人出演'},
{'type_id': '618041.xyz_12', 'type_name': '角色扮演'},
{'type_id': '618041.xyz_20', 'type_name': '人妻熟女'},
{'type_id': '618041.xyz_23', 'type_name': '日韩剧情'},
{'type_id': '618041.xyz_21', 'type_name': '经典伦理'},
{'type_id': '618041.xyz_7', 'type_name': '成人动漫'},
{'type_id': '618041.xyz_14', 'type_name': '精品二区'},
{'type_id': '618041.xyz_53', 'type_name': '动漫中字'},
{'type_id': '618041.xyz_52', 'type_name': '日本无码'},
{'type_id': '618041.xyz_33', 'type_name': '中文字幕'},
{'type_id': '618041.xyz_32', 'type_name': '国产自拍'}
]
}
def categoryContent(self, tid, pg, filter, extend):
"""分类内容 - 修改为使用固定页数设置"""
try:
domain, type_id = tid.split('_')
url = f"https://{domain}/index.php/vod/type/id/{type_id}.html"
if pg and pg != '1':
url = url.replace('.html', f'/page/{pg}.html')
self.log(f"访问分类URL: {url}")
rsp = self.fetch(url, headers=self.headers)
doc = self.html(rsp.text)
            # Pass type_id through to _get_videos
            videos = self._get_videos(doc, category_id=type_id, limit=20)
            # Use a fixed page count instead of parsing it from the page
            pagecount = 999
total = 19980
return {
'list': videos,
'page': int(pg),
'pagecount': pagecount,
'limit': 20,
'total': total
}
except Exception as e:
self.log(f"分类内容获取出错: {str(e)}")
return {'list': []}
def searchContent(self, key, quick, pg="1"):
"""搜索功能 - 完全修复版"""
try:
# 构造搜索URL
search_url = f"{self.host}/index.php/vod/type/id/1/wd/{urllib.parse.quote(key)}/page/{pg}.html"
self.log(f"搜索URL: {search_url}")
            # Send the request
rsp = self.fetch(search_url, headers=self.headers)
if not rsp or rsp.status_code != 200:
self.log("搜索请求失败")
return {'list': []}
            # Parse the HTML
doc = self.html(rsp.text)
if not doc:
self.log("搜索页面解析失败")
return {'list': []}
            # Extract search results
            videos = self._get_videos(doc, limit=20)
            # Pagination defaults
            pagecount = 5  # default
            total = 100  # default
            # Try to pull real pagination info from the page
page_elements = doc.xpath('//div[@class="mypage"]/a')
if page_elements and len(page_elements) > 0:
try:
                    # Find the last-page link
                    last_page = None
                    for elem in page_elements:
                        href = elem.xpath('./@href')[0]
                        if (elem.text and '尾页' in elem.text) or 'page/' in href:
last_page = href
break
if last_page:
                        # Pull the page number from the last-page URL
page_match = re.search(r'/page/(\d+)\.html', last_page)
if page_match:
pagecount = int(page_match.group(1))
                            total = pagecount * 20  # rough total estimate
except:
pass
return {
'list': videos,
'page': int(pg),
'pagecount': pagecount,
'limit': 20,
'total': total
}
except Exception as e:
self.log(f"搜索出错: {str(e)}")
return {'list': []}
def detailContent(self, ids):
"""详情页面 - 特别处理特殊分区的链接"""
try:
vid = ids[0]
            # Check for a special-section link
            if vid.startswith('special_'):
                # Special-section ID format: special_{category_id}_{video_id}_{encoded_url}
parts = vid.split('_')
if len(parts) >= 4:
category_id = parts[1]
video_id = parts[2]
encoded_url = '_'.join(parts[3:])
play_url = urllib.parse.unquote(encoded_url)
self.log(f"特殊分区视频,直接使用链接: {play_url}")
                    # Extract the video URL from the play link
parsed_url = urllib.parse.urlparse(play_url)
query_params = urllib.parse.parse_qs(parsed_url.query)
video_url = query_params.get('v', [''])[0]
pic_url = query_params.get('b', [''])[0]
title_encrypted = query_params.get('m', [''])[0]
                    # Decode the title
title = self._decrypt_title(title_encrypted)
return {
'list': [{
'vod_id': vid,
'vod_name': title,
'vod_pic': pic_url,
'vod_remarks': '',
'vod_year': '',
'vod_play_from': '直接播放',
'vod_play_url': f"第1集${play_url}"
}]
}
            # Regular handling
if '_' in vid and len(vid.split('_')) > 2:
domain, category_id, video_id = vid.split('_')
else:
domain, video_id = vid.split('_')
detail_url = f"https://{domain}/index.php/vod/detail/id/{video_id}.html"
self.log(f"访问详情URL: {detail_url}")
rsp = self.fetch(detail_url, headers=self.headers)
doc = self.html(rsp.text)
video_info = self._get_detail(doc, rsp.text, vid)
return {'list': [video_info]} if video_info else {'list': []}
except Exception as e:
self.log(f"详情获取出错: {str(e)}")
return {'list': []}
def playerContent(self, flag, id, vipFlags):
"""播放链接 - 特别处理特殊分区的链接"""
try:
self.log(f"获取播放链接: flag={flag}, id={id}")
            # Check for a special-section link
            if id.startswith('special_'):
                # Special-section ID format: special_{category_id}_{video_id}_{encoded_url}
parts = id.split('_')
if len(parts) >= 4:
category_id = parts[1]
video_id = parts[2]
encoded_url = '_'.join(parts[3:])
play_url = urllib.parse.unquote(encoded_url)
self.log(f"特殊分区视频,直接使用链接: {play_url}")
                    # Extract the video URL from the play link
parsed_url = urllib.parse.urlparse(play_url)
query_params = urllib.parse.parse_qs(parsed_url.query)
video_url = query_params.get('v', [''])[0]
if video_url:
                        # Make sure the URL is absolute
if video_url.startswith('//'):
video_url = 'https:' + video_url
elif not video_url.startswith('http'):
video_url = urljoin(self.host, video_url)
self.log(f"从特殊链接中提取到视频地址: {video_url}")
return {'parse': 0, 'playUrl': '', 'url': video_url}
            # If the incoming ID is already a full URL, parse it directly
if id.startswith('http'):
self.log("ID 是一个完整URL直接解析参数")
parsed_url = urllib.parse.urlparse(id)
query_params = urllib.parse.parse_qs(parsed_url.query)
                # Try to read the video parameter
video_url = query_params.get('v', [''])[0]
if not video_url:
                    # Try other likely parameter names
for key in query_params:
if key in ['url', 'src', 'file']:
video_url = query_params[key][0]
break
if video_url:
                    # Undo any URL encoding
                    video_url = urllib.parse.unquote(video_url)
                    # Make sure the URL is absolute
                    if video_url.startswith('//'):
                        video_url = 'https:' + video_url
                    elif not video_url.startswith('http'):
                        # Fall back to joining with the base host
video_url = urljoin(self.host, video_url)
self.log(f"从 URL 参数中提取到视频地址: {video_url}")
return {'parse': 0, 'playUrl': '', 'url': video_url}
else:
self.log("URL 中没有找到视频参数,尝试从页面提取")
                    # Fetch the page and extract the video link
rsp = self.fetch(id, headers=self.headers)
if rsp and rsp.status_code == 200:
video_url = self._extract_direct_video_url(rsp.text)
if video_url:
self.log(f"从页面提取到视频地址: {video_url}")
return {'parse': 0, 'playUrl': '', 'url': video_url}
self.log("无法从页面提取视频链接返回原始URL")
return {'parse': 1, 'playUrl': '', 'url': id}
            # Extract the video ID and category ID from the new id format
if id.count('_') >= 2:
parts = id.split('_')
video_id = parts[-1]
category_id = parts[1]
else:
video_id = id.split('_')[-1]
category_id = ''
self.log(f"视频ID: {video_id}, 分类ID: {category_id}")
            # For special categories, parse the play page directly
            if category_id in self.special_categories:
                self.log("特殊分类,尝试从详情页提取直接播放链接")
                # Build the play-page URL
                play_page_url = f"{self.host}/index.php/vod/play/id/{video_id}.html"
                # Fetch the play page
rsp = self.fetch(play_page_url, headers=self.headers)
if rsp and rsp.status_code == 200:
                    # Extract the video link from the page
video_url = self._extract_direct_video_url(rsp.text)
if video_url:
self.log(f"从播放页面提取到视频地址: {video_url}")
return {'parse': 0, 'playUrl': '', 'url': video_url}
                # If extraction fails, fall back to the API
self.log("从播放页面提取失败尝试API方式")
return self._get_video_by_api(id, video_id)
else:
                # Other categories go through the API
self.log("使用API方式获取视频地址")
return self._get_video_by_api(id, video_id)
except Exception as e:
self.log(f"播放链接获取出错: {str(e)}")
if '_' in id:
domain, play_id = id.split('_')
play_url = f"https://{domain}/html/kkyd.html?m={play_id}"
else:
play_url = f"{self.host}/html/kkyd.html?m={id}"
return {'parse': 1, 'playUrl': '', 'url': play_url}
def _get_video_by_api(self, id, video_id):
"""通过API获取视频地址"""
try:
api_url = f"{self.api_host}/api/v2/vod/reqplay/{video_id}"
self.log(f"请求API获取视频地址: {api_url}")
api_headers = self.headers.copy()
api_headers.update({
'Referer': f"{self.host}/",
'Origin': self.host,
'X-Requested-With': 'XMLHttpRequest'
})
api_response = self.fetch(api_url, headers=api_headers)
if api_response and api_response.status_code == 200:
data = api_response.json()
self.log(f"API响应: {data}")
if data.get('retcode') == 3:
video_url = data.get('data', {}).get('httpurl_preview', '')
else:
video_url = data.get('data', {}).get('httpurl', '')
if video_url:
video_url = video_url.replace('?300', '')
self.log(f"从API获取到视频地址: {video_url}")
return {'parse': 0, 'playUrl': '', 'url': video_url}
else:
self.log("API响应中没有找到视频地址")
else:
self.log(f"API请求失败状态码: {api_response.status_code if api_response else '无响应'}")
if '_' in id:
domain, play_id = id.split('_')
play_url = f"https://{domain}/html/kkyd.html?m={play_id}"
else:
play_url = f"{self.host}/html/kkyd.html?m={id}"
self.log(f"API请求失败回退到播放页面: {play_url}")
return {'parse': 1, 'playUrl': '', 'url': play_url}
except Exception as e:
self.log(f"API方式获取视频出错: {str(e)}")
if '_' in id:
domain, play_id = id.split('_')
play_url = f"https://{domain}/html/kkyd.html?m={play_id}"
else:
play_url = f"{self.host}/html/kkyd.html?m={id}"
return {'parse': 1, 'playUrl': '', 'url': play_url}
def _extract_direct_video_url(self, html_content):
"""从HTML内容中提取直接播放链接 (优化版)"""
try:
# 首先尝试提取明显的视频链接
patterns = [
r'v=([^&]+\.(?:m3u8|mp4))',
r'"url"\s*:\s*["\']([^"\']+\.(?:mp4|m3u8))["\']',
r'src\s*=\s*["\']([^"\']+\.(?:mp4|m3u8))["\']',
r'http[^\s<>"\'?]+\.(?:mp4|m3u8)'
]
for pattern in patterns:
matches = re.findall(pattern, html_content, re.IGNORECASE)
for match in matches:
if isinstance(match, tuple):
match = match[0]
extracted_url = match.replace('\\', '')
extracted_url = urllib.parse.unquote(extracted_url)
if extracted_url.startswith('//'):
extracted_url = 'https:' + extracted_url
elif extracted_url.startswith('http'):
return extracted_url
return None
except Exception as e:
self.log(f"提取直接播放URL出错: {str(e)}")
return None
def _get_videos(self, doc, category_id=None, limit=None):
"""获取影片列表 - 根据实际网站结构"""
try:
videos = []
elements = doc.xpath('//a[@class="vodbox"]')
self.log(f"找到 {len(elements)} 个vodbox元素")
for elem in elements:
video = self._extract_video(elem, category_id)
if video:
videos.append(video)
return videos[:limit] if limit and videos else videos
except Exception as e:
self.log(f"获取影片列表出错: {str(e)}")
return []
def _extract_video(self, element, category_id=None):
"""提取影片信息 - 特别处理特殊分区的链接"""
try:
link = element.xpath('./@href')[0]
if link.startswith('/'):
link = self.host + link
            # Check for a special-section link
            is_special_link = 'ar-kk.html' in link or 'ar.html' in link
            # For special sections, use the link itself as the ID
            if is_special_link and category_id in self.special_categories:
                # Parse the link's query parameters
parsed_url = urllib.parse.urlparse(link)
query_params = urllib.parse.parse_qs(parsed_url.query)
                # Get the video ID (from the v parameter)
video_url = query_params.get('v', [''])[0]
if video_url:
                    # Extract the ID from the video URL
video_id_match = re.search(r'/([a-f0-9-]+)/video\.m3u8', video_url)
if video_id_match:
video_id = video_id_match.group(1)
else:
                        # No match; fall back to a hash
video_id = str(hash(link) % 1000000)
else:
video_id = str(hash(link) % 1000000)
                # Keep the full link inside vod_id for special sections
final_vod_id = f"special_{category_id}_{video_id}_{urllib.parse.quote(link)}"
else:
                # Regular handling
vod_id = self.regStr(r'm=(\d+)', link)
if not vod_id:
vod_id = str(hash(link) % 1000000)
final_vod_id = f"618041.xyz_{vod_id}"
if category_id:
final_vod_id = f"618041.xyz_{category_id}_{vod_id}"
            # Extract the title
title_elem = element.xpath('.//p[@class="km-script"]/text()')
if not title_elem:
title_elem = element.xpath('.//p[contains(@class, "script")]/text()')
if not title_elem:
title_elem = element.xpath('.//p/text()')
if not title_elem:
title_elem = element.xpath('.//h3/text()')
if not title_elem:
title_elem = element.xpath('.//h4/text()')
if not title_elem:
self.log(f"未找到标题元素,跳过该视频")
return None
title_encrypted = title_elem[0].strip()
title = self._decrypt_title(title_encrypted)
            # Extract the cover image
pic_elem = element.xpath('.//img/@data-original')
if not pic_elem:
pic_elem = element.xpath('.//img/@src')
pic = pic_elem[0] if pic_elem else ''
if pic:
if pic.startswith('//'):
pic = 'https:' + pic
elif pic.startswith('/'):
pic = self.host + pic
return {
'vod_id': final_vod_id,
'vod_name': title,
'vod_pic': pic,
'vod_remarks': '',
'vod_year': ''
}
except Exception as e:
self.log(f"提取影片信息出错: {str(e)}")
return None
def _decrypt_title(self, encrypted_text):
"""解密标题 - 使用网站的解密算法"""
try:
decrypted_chars = []
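            # XOR each code point with 0x80 (flip the high bit) to recover the title.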
for char in encrypted_text:
code_point = ord(char)
decrypted_code = code_point ^ 128
decrypted_char = chr(decrypted_code)
decrypted_chars.append(decrypted_char)
decrypted_text = ''.join(decrypted_chars)
return decrypted_text
except Exception as e:
self.log(f"标题解密失败: {str(e)}")
return encrypted_text
def _get_detail(self, doc, html_content, vid):
"""获取详情信息 (优化版) - 修复播放源提取问题"""
try:
title = self._get_text(doc, ['//h1/text()', '//title/text()'])
pic = self._get_text(doc, ['//div[contains(@class,"dyimg")]//img/@src', '//img[contains(@class,"poster")]/@src'])
if pic and pic.startswith('/'):
pic = self.host + pic
desc = self._get_text(doc, ['//div[contains(@class,"yp_context")]/text()', '//div[contains(@class,"introduction")]//text()'])
actor = self._get_text(doc, ['//span[contains(text(),"主演")]/following-sibling::*/text()'])
director = self._get_text(doc, ['//span[contains(text(),"导演")]/following-sibling::*/text()'])
play_from = []
play_urls = []
            # Use flexible regexes to find player links
player_link_patterns = [
re.compile(r'href="(.*?ar\.html.*?)"'),
re.compile(r'href="(.*?kkyd\.html.*?)"'),
re.compile(r'href="(.*?ar-kk\.html.*?)"')
]
player_links = []
for pattern in player_link_patterns:
matches = pattern.findall(html_content)
player_links.extend(matches)
if player_links:
episodes = []
for link in player_links:
full_url = urljoin(self.host, link)
episodes.append(f"第1集${full_url}")
if episodes:
play_from.append("默认播放源")
play_urls.append('#'.join(episodes))
if not play_from:
self.log("未找到播放源元素,无法定位播放源列表")
return {
'vod_id': vid,
'vod_name': title,
'vod_pic': pic,
'type_name': '',
'vod_year': '',
'vod_area': '',
'vod_remarks': '',
'vod_actor': actor,
'vod_director': director,
'vod_content': desc,
'vod_play_from': '默认播放源',
'vod_play_url': f"第1集${vid}"
}
return {
'vod_id': vid,
'vod_name': title,
'vod_pic': pic,
'type_name': '',
'vod_year': '',
'vod_area': '',
'vod_remarks': '',
'vod_actor': actor,
'vod_director': director,
'vod_content': desc,
'vod_play_from': '$$$'.join(play_from),
'vod_play_url': '$$$'.join(play_urls)
}
except Exception as e:
self.log(f"获取详情出错: {str(e)}")
return None
def _get_text(self, doc, selectors):
"""通用文本提取"""
for selector in selectors:
try:
texts = doc.xpath(selector)
for text in texts:
if text and text.strip():
return text.strip()
except:
continue
return ''
def log(self, message):
"""日志输出"""
print(f"[苹果视频] {message}")
def fetch(self, url, headers=None, method='GET', data=None, timeout=10):
"""网络请求"""
try:
if headers is None:
headers = self.headers
if method == 'GET':
response = requests.get(url, headers=headers, timeout=timeout, verify=False)
else:
response = requests.post(url, headers=headers, data=data, timeout=timeout, verify=False)
return response
except Exception as e:
self.log(f"网络请求失败: {url}, 错误: {str(e)}")
return None
# Register the spider
if __name__ == '__main__':
from base.spider import Spider as BaseSpider
BaseSpider.register(Spider())