feat(adult): update the 51吸瓜 plugin and improve site-selection logic

- Change the plugin author tag to 🌈 Love
- Refactor homeContent, categoryContent, and related methods for better fault tolerance and category discovery
- Introduce a dynamic URL probe that automatically selects a reachable site
- Improve play-URL parsing, with multi-episode detection and name de-duplication
- Add request logging to ease debugging
- Fix compatibility issues with some page structures, improving stability
- Update the adult.json config: adjust plugin paths and add the "4K数毛" (FullHD) source
Wang.Luo 2025-09-20 03:18:34 +08:00
parent 914908adc8
commit fdc95360b4
4 changed files with 1260 additions and 181 deletions
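Illustration (editor's sketch, not part of the commit): the dynamic-URL probe described above, reduced to a standalone function. The candidate hosts are the ones hard-coded in the new get_working_host below; the real method additionally requires the landing page to contain `#index article a` entries before accepting a host. `pick_host` is a hypothetical name.

```python
import requests

CANDIDATES = [
    'https://artist.vgwtswi.xyz',
    'https://ability.vgwtswi.xyz',
    'https://am.vgwtswi.xyz',
]

def pick_host(candidates=CANDIDATES, headers=None, timeout=10):
    """Probe candidates in order; return the first that answers 200, else the first one."""
    for url in candidates:
        try:
            if requests.get(url, headers=headers, timeout=timeout).status_code == 200:
                return url
        except requests.RequestException:
            continue
    return candidates[0]  # fall back instead of crashing, as the plugin does
```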


@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
# by @嗷呜
# 🌈 Love
import json
import random
import re
@@ -24,99 +24,233 @@ class Spider(Spider):
except:self.proxies = {}
self.headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36',
'Pragma': 'no-cache',
'Cache-Control': 'no-cache',
'sec-ch-ua-platform': '"macOS"',
'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="134", "Google Chrome";v="134"',
'DNT': '1',
'sec-ch-ua-mobile': '?0',
'Origin': '',
'Sec-Fetch-Site': 'cross-site',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Dest': 'empty',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Connection': 'keep-alive',
}
self.host=self.host_late(self.gethosts())
# Use working dynamic URLs directly
self.host = self.get_working_host()
self.headers.update({'Origin': self.host, 'Referer': f"{self.host}/"})
self.getcnh()
self.log(f"使用站点: {self.host}")
print(f"使用站点: {self.host}")
pass
def getName(self):
pass
return "🌈 51吸瓜"
def isVideoFormat(self, url):
pass
# Treat direct media formats as playable without parsing
return any(ext in (url or '') for ext in ['.m3u8', '.mp4', '.ts'])
def manualVideoCheck(self):
pass
return False
def destroy(self):
pass
def homeContent(self, filter):
data=self.getpq(requests.get(self.host, headers=self.headers,proxies=self.proxies).text)
result = {}
classes = []
for k in data('.category-list ul li').items():
classes.append({
'type_name': k('a').text(),
'type_id': k('a').attr('href')
})
result['class'] = classes
result['list'] = self.getlist(data('#index article a'))
return result
try:
response = requests.get(self.host, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200:
return {'class': [], 'list': []}
data = self.getpq(response.text)
result = {}
classes = []
# Try to get categories from different possible locations
category_selectors = [
'.category-list ul li',
'.nav-menu li',
'.menu li',
'nav ul li'
]
for selector in category_selectors:
for k in data(selector).items():
link = k('a')
href = (link.attr('href') or '').strip()
name = (link.text() or '').strip()
# Skip placeholder or invalid entries
if not href or href == '#' or not name:
continue
classes.append({
'type_name': name,
'type_id': href
})
if classes:
break
# If no categories found, create some default ones
if not classes:
classes = [
{'type_name': '首页', 'type_id': '/'},
{'type_name': '最新', 'type_id': '/latest/'},
{'type_name': '热门', 'type_id': '/hot/'}
]
result['class'] = classes
result['list'] = self.getlist(data('#index article a'))
return result
except Exception as e:
print(f"homeContent error: {e}")
return {'class': [], 'list': []}
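# --- Editor's sketch (illustrative, not part of this commit): the selector
# cascade above, factored into a reusable helper; `first_match` is a
# hypothetical name, PyQuery as used by this spider. ---
def first_match(doc, selectors):
    # Return the nodes of the first selector that matches anything.
    for sel in selectors:
        items = list(doc(sel).items())
        if items:
            return items
    return []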
def homeVideoContent(self):
pass
try:
response = requests.get(self.host, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200:
return {'list': []}
data = self.getpq(response.text)
return {'list': self.getlist(data('#index article a, #archive article a'))}
except Exception as e:
print(f"homeVideoContent error: {e}")
return {'list': []}
def categoryContent(self, tid, pg, filter, extend):
if '@folder' in tid:
id = tid.replace('@folder', '')
videos = self.getfod(id)
else:
data = self.getpq(requests.get(f"{self.host}{tid}{pg}", headers=self.headers, proxies=self.proxies).text)
videos = self.getlist(data('#archive article a'), tid)
result = {}
result['list'] = videos
result['page'] = pg
result['pagecount'] = 1 if '@folder' in tid else 99999
result['limit'] = 90
result['total'] = 999999
return result
try:
if '@folder' in tid:
id = tid.replace('@folder', '')
videos = self.getfod(id)
else:
# Build URL properly
if tid.startswith('/'):
if pg and pg != '1':
url = f"{self.host}{tid}page/{pg}/"
else:
url = f"{self.host}{tid}"
else:
url = f"{self.host}/{tid}"
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200:
return {'list': [], 'page': pg, 'pagecount': 1, 'limit': 90, 'total': 0}
data = self.getpq(response.text)
videos = self.getlist(data('#archive article a, #index article a'), tid)
result = {}
result['list'] = videos
result['page'] = pg
result['pagecount'] = 1 if '@folder' in tid else 99999
result['limit'] = 90
result['total'] = 999999
return result
except Exception as e:
print(f"categoryContent error: {e}")
return {'list': [], 'page': pg, 'pagecount': 1, 'limit': 90, 'total': 0}
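# --- Editor's note (illustrative, not part of this commit): URL shapes the
# pagination branch above produces; host value hypothetical. ---
# tid='/hot/', pg='1' -> https://example.host/hot/
# tid='/hot/', pg='2' -> https://example.host/hot/page/2/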
def detailContent(self, ids):
url=f"{self.host}{ids[0]}"
data=self.getpq(requests.get(url, headers=self.headers,proxies=self.proxies).text)
vod = {'vod_play_from': '51吸瓜'}
try:
clist = []
if data('.tags .keywords a'):
for k in data('.tags .keywords a').items():
title = k.text()
href = k.attr('href')
clist.append('[a=cr:' + json.dumps({'id': href, 'name': title}) + '/]' + title + '[/a]')
vod['vod_content'] = ' '.join(clist)
except:
vod['vod_content'] = data('.post-title').text()
try:
plist=[]
if data('.dplayer'):
for c, k in enumerate(data('.dplayer').items(), start=1):
config = json.loads(k.attr('data-config'))
plist.append(f"视频{c}${config['video']['url']}")
vod['vod_play_url']='#'.join(plist)
except:
vod['vod_play_url']=f"请停止活塞运动,可能没有视频${url}"
return {'list':[vod]}
url = f"{self.host}{ids[0]}" if not ids[0].startswith('http') else ids[0]
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200:
return {'list': [{'vod_play_from': '51吸瓜', 'vod_play_url': f'页面加载失败${url}'}]}
data = self.getpq(response.text)
vod = {'vod_play_from': '51吸瓜'}
# Get content/description
try:
clist = []
if data('.tags .keywords a'):
for k in data('.tags .keywords a').items():
title = k.text()
href = k.attr('href')
if title and href:
clist.append('[a=cr:' + json.dumps({'id': href, 'name': title}) + '/]' + title + '[/a]')
vod['vod_content'] = ' '.join(clist) if clist else data('.post-title').text()
except:
vod['vod_content'] = data('.post-title').text() or '51吸瓜视频'
# Get video URLs (build episode list when multiple players exist)
try:
plist = []
used_names = set()
if data('.dplayer'):
for c, k in enumerate(data('.dplayer').items(), start=1):
config_attr = k.attr('data-config')
if config_attr:
try:
config = json.loads(config_attr)
video_url = config.get('video', {}).get('url', '')
# Determine a readable episode name from nearby headings if present
ep_name = ''
try:
parent = k.parents().eq(0)
# search up to a few ancestors for a heading text
for _ in range(3):
if not parent: break
heading = parent.find('h2, h3, h4').eq(0).text() or ''
heading = heading.strip()
if heading:
ep_name = heading
break
parent = parent.parents().eq(0)
except Exception:
ep_name = ''
base_name = ep_name if ep_name else f"视频{c}"
name = base_name
count = 2
# Ensure the name is unique
while name in used_names:
name = f"{base_name} {count}"
count += 1
used_names.add(name)
if video_url:
self.log(f"解析到视频: {name} -> {video_url}")
print(f"解析到视频: {name} -> {video_url}")
plist.append(f"{name}${video_url}")
except:
continue
if plist:
self.log(f"拼装播放列表,共{len(plist)}")
print(f"拼装播放列表,共{len(plist)}")
vod['vod_play_url'] = '#'.join(plist)
else:
vod['vod_play_url'] = f"未找到视频源${url}"
except Exception as e:
vod['vod_play_url'] = f"视频解析失败${url}"
return {'list': [vod]}
except Exception as e:
print(f"detailContent error: {e}")
return {'list': [{'vod_play_from': '51吸瓜', 'vod_play_url': f'详情页加载失败${ids[0] if ids else ""}'}]}
def searchContent(self, key, quick, pg="1"):
data=self.getpq(requests.get(f"{self.host}/search/{key}/{pg}", headers=self.headers,proxies=self.proxies).text)
return {'list':self.getlist(data('#archive article a')),'page':pg}
try:
url = f"{self.host}/search/{key}/{pg}" if pg != "1" else f"{self.host}/search/{key}/"
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200:
return {'list': [], 'page': pg}
data = self.getpq(response.text)
videos = self.getlist(data('#archive article a, #index article a'))
return {'list': videos, 'page': pg}
except Exception as e:
print(f"searchContent error: {e}")
return {'list': [], 'page': pg}
def playerContent(self, flag, id, vipFlags):
p=1
if '.m3u8' in id:p,id=0,self.proxy(id)
return {'parse': p, 'url': id, 'header': self.headers}
url = id
p = 1
if self.isVideoFormat(url):
# m3u8/mp4 direct play; when using proxy setting, wrap to proxy for m3u8
if '.m3u8' in url:
url = self.proxy(url)
p = 0
self.log(f"播放请求: parse={p}, url={url}")
print(f"播放请求: parse={p}, url={url}")
return {'parse': p, 'url': url, 'header': self.headers}
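# --- Editor's sketch (illustrative, not part of this commit): the dict built
# above is the player contract these spiders share; `make_player_result` is a
# hypothetical name. parse=0 hands the URL straight to the player, parse=1
# asks the host app to sniff the page. ---
def make_player_result(url, headers, direct):
    # parse=0: direct play; parse=1: webview sniffing
    return {'parse': 0 if direct else 1, 'url': url, 'header': headers}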
def localProxy(self, param):
if param.get('type') == 'img':
@@ -180,118 +314,35 @@ class Spider(Spider):
print(f"Base64解码错误: {str(e)}")
return ""
def gethosts(self):
url = 'https://51cg.fun'
curl = self.getCache('host_51cn')
if curl:
def get_working_host(self):
"""Get working host from known dynamic URLs"""
# Known working URLs from the dynamic gateway
dynamic_urls = [
'https://artist.vgwtswi.xyz',
'https://ability.vgwtswi.xyz',
'https://am.vgwtswi.xyz'
]
# Test each URL to find a working one
for url in dynamic_urls:
try:
data = self.getpq(requests.get(curl, headers=self.headers, proxies=self.proxies).text)('a').attr('href')
if data:
parsed_url = urlparse(data)
url = parsed_url.scheme + "://" + parsed_url.netloc
except:
pass
try:
html = self.getpq(requests.get(url, headers=self.headers, proxies=self.proxies).text)
html_pattern = r"Base64\.decode\('([^']+)'\)"
html_match = re.search(html_pattern, html('script').eq(-1).text(), re.DOTALL)
if not html_match: raise Exception("未找到html")
html = self.getpq(b64decode(html_match.group(1)).decode())('script').eq(-4).text()
return self.hstr(html)
except Exception as e:
self.log(f"获取: {str(e)}")
return ""
def getcnh(self):
data=self.getpq(requests.get(f"{self.host}/ybml.html", headers=self.headers,proxies=self.proxies).text)
url=data('.post-content[itemprop="articleBody"] blockquote p').eq(0)('a').attr('href')
parsed_url = urlparse(url)
host = parsed_url.scheme + "://" + parsed_url.netloc
self.setCache('host_51cn',host)
def hstr(self, html):
pattern = r"(backupLine\s*=\s*\[\])\s+(words\s*=)"
replacement = r"\1, \2"
html = re.sub(pattern, replacement, html)
data = f"""
var Vx = {{
range: function(start, end) {{
const result = [];
for (let i = start; i < end; i++) {{
result.push(i);
}}
return result;
}},
map: function(array, callback) {{
const result = [];
for (let i = 0; i < array.length; i++) {{
result.push(callback(array[i], i, array));
}}
return result;
}}
}};
Array.prototype.random = function() {{
return this[Math.floor(Math.random() * this.length)];
}};
var location = {{
protocol: "https:"
}};
function executeAndGetResults() {{
var allLines = lineAry.concat(backupLine);
var resultStr = JSON.stringify(allLines);
return resultStr;
}};
{html}
executeAndGetResults();
"""
return self.p_qjs(data)
def p_qjs(self, js_code):
try:
from com.whl.quickjs.wrapper import QuickJSContext
ctx = QuickJSContext.create()
result_json = ctx.evaluate(js_code)
ctx.destroy()
return json.loads(result_json)
except Exception as e:
self.log(f"执行失败: {e}")
return []
def host_late(self, url_list):
if isinstance(url_list, str):
urls = [u.strip() for u in url_list.split(',')]
else:
urls = url_list
if len(urls) <= 1:
return urls[0] if urls else ''
results = {}
threads = []
def test_host(url):
try:
start_time = time.time()
response = requests.head(url,headers=self.headers,proxies=self.proxies,timeout=1.0, allow_redirects=False)
delay = (time.time() - start_time) * 1000
results[url] = delay
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=10)
if response.status_code == 200:
# Verify it has the expected content structure
data = self.getpq(response.text)
articles = data('#index article a')
if len(articles) > 0:
self.log(f"选用可用站点: {url}")
print(f"选用可用站点: {url}")
return url
except Exception as e:
results[url] = float('inf')
continue
# Fallback to first URL if none work (better than crashing)
self.log(f"未检测到可用站点,回退: {dynamic_urls[0]}")
print(f"未检测到可用站点,回退: {dynamic_urls[0]}")
return dynamic_urls[0]
for url in urls:
t = threading.Thread(target=test_host, args=(url,))
threads.append(t)
t.start()
for t in threads:
t.join()
return min(results.items(), key=lambda x: x[1])[0]
def getlist(self, data, tid=''):
videos = []
@@ -299,13 +350,14 @@ class Spider(Spider):
for k in data.items():
a = k.attr('href')
b = k('h2').text()
c = k('span[itemprop="datePublished"]').text()
if a and b and c:
# Some pages might not include datePublished; use a fallback
c = k('span[itemprop="datePublished"]').text() or k('.post-meta, .entry-meta, time').text()
if a and b:
videos.append({
'vod_id': f"{a}{'@folder' if l else ''}",
'vod_name': b.replace('\n', ' '),
'vod_pic': self.getimg(k('script').text()),
'vod_remarks': c,
'vod_remarks': c or '',
'vod_tag': 'folder' if l else '',
'style': {"type": "rect", "ratio": 1.33}
})

adult.json

@@ -1203,7 +1203,7 @@
"key": "香蕉APP",
"name": "香蕉APP",
"type": 3,
"api": "./PyramidStore/plugin/adult/香蕉APP.py",
"api": "./py/adult/香蕉.py",
"searchable": 1,
"quickSearch": 1,
"filterable": 1
@@ -1253,6 +1253,15 @@
"quickSearch": 1,
"filterable": 1
},
{
"key": "4K数毛",
"name": "FullHD",
"type": 3,
"api": "./py/adult/4K数毛.py",
"searchable": 1,
"quickSearch": 1,
"filterable": 1
},
{
"key": "好色TV",
"name": "好色TV",

py/adult/4K数毛.py (new file, 349 lines)

@@ -0,0 +1,349 @@
import requests
from bs4 import BeautifulSoup
import re
from base.spider import Spider
import sys
import json
import base64
import urllib.parse
from Crypto.Cipher import ARC4
from Crypto.Util.Padding import unpad
import binascii
sys.path.append('..')
xurl = "https://www.fullhd.xxx/zh/"
headerx = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.87 Safari/537.36'
}
pm = ''
class Spider(Spider):
global xurl
global headerx
def getName(self):
return "首页"
def init(self, extend):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def extract_middle_text(self, text, start_str, end_str, pl, start_index1: str = '', end_index2: str = ''):
if pl == 3:
plx = []
while True:
start_index = text.find(start_str)
if start_index == -1:
break
end_index = text.find(end_str, start_index + len(start_str))
if end_index == -1:
break
middle_text = text[start_index + len(start_str):end_index]
plx.append(middle_text)
text = text.replace(start_str + middle_text + end_str, '')
if len(plx) > 0:
purl = ''
for i in range(len(plx)):
matches = re.findall(start_index1, plx[i])
output = ""
for match in matches:
match3 = re.search(r'(?:^|[^0-9])(\d+)(?:[^0-9]|$)', match[1])
if match3:
number = match3.group(1)
else:
number = 0
if 'http' not in match[0]:
output += f"#{'📽️' + match[1]}${number}{xurl}{match[0]}"
else:
output += f"#{'📽️' + match[1]}${number}{match[0]}"
output = output[1:]
purl = purl + output + "$$$"
purl = purl[:-3]
return purl
else:
return ""
else:
start_index = text.find(start_str)
if start_index == -1:
return ""
end_index = text.find(end_str, start_index + len(start_str))
if end_index == -1:
return ""
if pl == 0:
middle_text = text[start_index + len(start_str):end_index]
return middle_text.replace("\\", "")
if pl == 1:
middle_text = text[start_index + len(start_str):end_index]
matches = re.findall(start_index1, middle_text)
if matches:
jg = ' '.join(matches)
return jg
if pl == 2:
middle_text = text[start_index + len(start_str):end_index]
matches = re.findall(start_index1, middle_text)
if matches:
new_list = [f'{item}' for item in matches]
jg = '$$$'.join(new_list)
return jg
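# --- Editor's note (illustrative, not part of this commit): how the pl modes
# of extract_middle_text behave on toy input, with `spider` an instance. ---
# pl=0 slices plainly between the two markers:
#   spider.extract_middle_text('<h1>Hi</h1>', '<h1>', '</h1>', 0)  -> 'Hi'
# pl=2 applies the regex inside the slice and joins hits with '$$$':
#   spider.extract_middle_text('<d>a.jpg b.jpg</d>', '<d>', '</d>', 2, r'(\w+\.jpg)')
#   -> 'a.jpg$$$b.jpg'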
def homeContent(self, filter):
result = {}
result = {"class": [
{"type_id": "latest-updates", "type_name": "最新视频🌠"},
{"type_id": "top-rated", "type_name": "最佳视频🌠"},
{"type_id": "most-popular", "type_name": "热门影片🌠"},
{"type_id": "networks/brazzers-com", "type_name": "Brazzers🌠"},
{"type_id": "networks/tushy-com", "type_name": "Tushy🌠"},
{"type_id": "networks/naughtyamerica-com", "type_name": "Naughtyamerica🌠"},
{"type_id": "sites/sexmex", "type_name": "Sexmex🌠"},
{"type_id": "sites/passion-hd", "type_name": "Passion-HD🌠"},
{"type_id": "categories/animation", "type_name": "Animation🌠"},
{"type_id": "categories/18-years-old", "type_name": "Teen🌠"},
{"type_id": "categories/pawg", "type_name": "Pawg🌠"},
{"type_id": "categories/thong", "type_name": "Thong🌠"},
{"type_id": "categories/stockings", "type_name": "Stockings🌠"},
{"type_id": "categories/jav-uncensored", "type_name": "JAV🌠"},
{"type_id": "categories/pantyhose", "type_name": "Pantyhose🌠"}
],
}
return result
def homeVideoContent(self):
videos = []
try:
detail = requests.get(url=xurl, headers=headerx)
detail.encoding = "utf-8"
res = detail.text
doc = BeautifulSoup(res, "lxml")
# The homepage exposes one scrapeable block ("videos watched right now").
# Query it once: the old loop over section names re-scanned the same fixed
# element id three times and appended duplicate items.
section = doc.find('div', id="list_videos_videos_watched_right_now_items")
if section:
vods = section.find_all('div', class_="item")
for vod in vods:
names = vod.find_all('a')
name = names[0]['title'] if names and 'title' in names[0].attrs else "最新视频"
id = names[0]['href'] if names else ""
pics = vod.find('img', class_="lazyload")
pic = pics['data-src'] if pics and 'data-src' in pics.attrs else ""
if pic and 'http' not in pic:
pic = xurl + pic
remarks = vod.find('span', class_="duration")
remark = remarks.text.strip() if remarks else ""
video = {
"vod_id": id,
"vod_name": name,
"vod_pic": pic,
"vod_remarks": remark
}
videos.append(video)
result = {'list': videos}
return result
except Exception as e:
print(f"Error in homeVideoContent: {str(e)}")
return {'list': []}
def categoryContent(self, cid, pg, filter, ext):
result = {}
videos = []
try:
# xurl already ends with '/', so don't add another one
if pg and int(pg) > 1:
url = f'{xurl}{cid}/{pg}/'
else:
url = f'{xurl}{cid}/'
detail = requests.get(url=url, headers=headerx)
detail.encoding = "utf-8"
res = detail.text
doc = BeautifulSoup(res, "lxml")
section = doc.find('div', class_="list-videos")
if section:
vods = section.find_all('div', class_="item")
for vod in vods:
names = vod.find_all('a')
name = names[0]['title'] if names and 'title' in names[0].attrs else ""
ids = vod.find_all('a')
id = ids[0]['href'] if ids else ""
pics = vod.find('img', class_="lazyload")
pic = pics['data-src'] if pics and 'data-src' in pics.attrs else ""
if pic and 'http' not in pic:
pic = xurl + pic
remarks = vod.find('span', class_="duration")
remark = remarks.text.strip() if remarks else ""
video = {
"vod_id": id,
"vod_name": name,
"vod_pic": pic,
"vod_remarks": remark
}
videos.append(video)
except Exception as e:
print(f"Error in categoryContent: {str(e)}")
result = {
'list': videos,
'page': pg,
'pagecount': 9999,
'limit': 90,
'total': 999999
}
return result
def detailContent(self, ids):
global pm
did = ids[0]
result = {}
videos = []
playurl = ''
if 'http' not in did:
did = xurl + did
res1 = requests.get(url=did, headers=headerx)
res1.encoding = "utf-8"
res = res1.text
content = '👉' + self.extract_middle_text(res,'<h1>','</h1>', 0)
yanuan = self.extract_middle_text(res, '<span>Pornstars:</span>','</div>',1, 'href=".*?">(.*?)</a>')
bofang = did
videos.append({
"vod_id": did,
"vod_actor": yanuan,
"vod_director": '',
"vod_content": content,
"vod_play_from": '💗FullHD💗',
"vod_play_url": bofang
})
result['list'] = videos
return result
def playerContent(self, flag, id, vipFlags):
parts = id.split("http")
xiutan = 0
if xiutan == 0:
if len(parts) > 1:
before_https, after_https = parts[0], 'http' + parts[1]
res = requests.get(url=after_https, headers=headerx)
res = res.text
url2 = self.extract_middle_text(res, '<video', '</video>', 0).replace('\\', '')
soup = BeautifulSoup(url2, 'html.parser')
first_source = soup.find('source')
src_value = first_source.get('src')
redirect_url = src_value  # default when no redirect happens, so it is always bound
response = requests.head(src_value, allow_redirects=False)
if response.status_code == 302:
redirect_url = response.headers['Location']
response = requests.head(redirect_url, allow_redirects=False)
if response.status_code == 302:
redirect_url = response.headers['Location']
result = {}
result["parse"] = xiutan
result["playUrl"] = ''
result["url"] = redirect_url
result["header"] = headerx
return result
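# --- Editor's sketch (illustrative, not part of this commit): the manual
# two-hop 302 chase above, generalized with a hop limit; `resolve_redirects`
# is a hypothetical helper. ---
def resolve_redirects(url, headers=headerx, max_hops=5):
    # Follow HEAD redirects until a non-redirect answer or the hop limit.
    for _ in range(max_hops):
        r = requests.head(url, headers=headers, allow_redirects=False)
        loc = r.headers.get('Location')
        if r.status_code in (301, 302, 303, 307, 308) and loc:
            url = loc
        else:
            break
    return url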
def searchContentPage(self, key, quick, page):
result = {}
videos = []
if not page:
page = '1'
# xurl already ends with '/'
if page == '1':
url = f'{xurl}search/{key}/'
else:
url = f'{xurl}search/{key}/{str(page)}/'
try:
detail = requests.get(url=url, headers=headerx)
detail.encoding = "utf-8"
res = detail.text
doc = BeautifulSoup(res, "lxml")
section = doc.find('div', class_="list-videos")
if section:
vods = section.find_all('div', class_="item")
for vod in vods:
names = vod.find_all('a')
name = names[0]['title'] if names and 'title' in names[0].attrs else ""
ids = vod.find_all('a')
id = ids[0]['href'] if ids else ""
pics = vod.find('img', class_="lazyload")
pic = pics['data-src'] if pics and 'data-src' in pics.attrs else ""
if pic and 'http' not in pic:
pic = xurl + pic
remarks = vod.find('span', class_="duration")
remark = remarks.text.strip() if remarks else ""
video = {
"vod_id": id,
"vod_name": name,
"vod_pic": pic,
"vod_remarks": remark
}
videos.append(video)
except Exception as e:
print(f"Error in searchContentPage: {str(e)}")
result = {
'list': videos,
'page': page,
'pagecount': 9999,
'limit': 90,
'total': 999999
}
return result
def searchContent(self, key, quick):
return self.searchContentPage(key, quick, '1')
def localProxy(self, params):
if params['type'] == "m3u8":
return self.proxyM3u8(params)
elif params['type'] == "media":
return self.proxyMedia(params)
elif params['type'] == "ts":
return self.proxyTs(params)
return None

py/adult/香蕉.py (new file, 669 lines)

@@ -0,0 +1,669 @@
# coding=utf-8
#!/usr/bin/python
import sys
sys.path.append('..')
from base.spider import Spider
import json
import time
import urllib.parse
import re
import requests
from lxml import etree
from urllib.parse import urljoin
class Spider(Spider):
def getName(self):
return "苹果视频"
def init(self, extend=""):
self.host = "https://618041.xyz"
self.api_host = "https://h5.xxoo168.org"
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Referer': self.host
}
# List of special-category IDs that all need special handling
self.special_categories = ['13', '14', '33', '53', '32', '52', '9']
self.log(f"苹果视频爬虫初始化完成,主站: {self.host}")
def html(self, content):
"""将HTML内容转换为可查询的对象"""
try:
return etree.HTML(content)
except:
self.log("HTML解析失败")
return None
def regStr(self, pattern, string, index=1):
"""正则表达式提取字符串"""
try:
match = re.search(pattern, string, re.IGNORECASE)
if match and len(match.groups()) >= index:
return match.group(index)
except:
pass
return ""
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def homeContent(self, filter):
"""获取首页内容和分类"""
result = {}
# Keep only this fixed set of categories
classes = [
{'type_id': '618041.xyz_1', 'type_name': '全部视频'},
{'type_id': '618041.xyz_13', 'type_name': '香蕉精品'},
{'type_id': '618041.xyz_22', 'type_name': '制服诱惑'},
{'type_id': '618041.xyz_6', 'type_name': '国产视频'},
{'type_id': '618041.xyz_8', 'type_name': '清纯少女'},
{'type_id': '618041.xyz_9', 'type_name': '辣妹大奶'},
{'type_id': '618041.xyz_10', 'type_name': '女同专属'},
{'type_id': '618041.xyz_11', 'type_name': '素人出演'},
{'type_id': '618041.xyz_12', 'type_name': '角色扮演'},
{'type_id': '618041.xyz_20', 'type_name': '人妻熟女'},
{'type_id': '618041.xyz_23', 'type_name': '日韩剧情'},
{'type_id': '618041.xyz_21', 'type_name': '经典伦理'},
{'type_id': '618041.xyz_7', 'type_name': '成人动漫'},
{'type_id': '618041.xyz_14', 'type_name': '精品二区'},
{'type_id': '618041.xyz_53', 'type_name': '动漫中字'},
{'type_id': '618041.xyz_52', 'type_name': '日本无码'},
{'type_id': '618041.xyz_33', 'type_name': '中文字幕'},
{'type_id': '618041.xyz_32', 'type_name': '国产自拍'}
]
result['class'] = classes
try:
rsp = self.fetch(self.host, headers=self.headers)
doc = self.html(rsp.text)
videos = self._get_videos(doc, limit=20)
result['list'] = videos
except Exception as e:
self.log(f"首页获取出错: {str(e)}")
result['list'] = []
return result
def homeVideoContent(self):
"""分类定义 - 兼容性方法"""
return {
'class': [
{'type_id': '618041.xyz_1', 'type_name': '全部视频'},
{'type_id': '618041.xyz_13', 'type_name': '香蕉精品'},
{'type_id': '618041.xyz_22', 'type_name': '制服诱惑'},
{'type_id': '618041.xyz_6', 'type_name': '国产视频'},
{'type_id': '618041.xyz_8', 'type_name': '清纯少女'},
{'type_id': '618041.xyz_9', 'type_name': '辣妹大奶'},
{'type_id': '618041.xyz_10', 'type_name': '女同专属'},
{'type_id': '618041.xyz_11', 'type_name': '素人出演'},
{'type_id': '618041.xyz_12', 'type_name': '角色扮演'},
{'type_id': '618041.xyz_20', 'type_name': '人妻熟女'},
{'type_id': '618041.xyz_23', 'type_name': '日韩剧情'},
{'type_id': '618041.xyz_21', 'type_name': '经典伦理'},
{'type_id': '618041.xyz_7', 'type_name': '成人动漫'},
{'type_id': '618041.xyz_14', 'type_name': '精品二区'},
{'type_id': '618041.xyz_53', 'type_name': '动漫中字'},
{'type_id': '618041.xyz_52', 'type_name': '日本无码'},
{'type_id': '618041.xyz_33', 'type_name': '中文字幕'},
{'type_id': '618041.xyz_32', 'type_name': '国产自拍'}
]
}
def categoryContent(self, tid, pg, filter, extend):
"""分类内容 - 修改为使用固定页数设置"""
try:
domain, type_id = tid.split('_')
url = f"https://{domain}/index.php/vod/type/id/{type_id}.html"
if pg and pg != '1':
url = url.replace('.html', f'/page/{pg}.html')
self.log(f"访问分类URL: {url}")
rsp = self.fetch(url, headers=self.headers)
doc = self.html(rsp.text)
# Pass type_id through to the _get_videos method here
videos = self._get_videos(doc, category_id=type_id, limit=20)
# Use fixed paging values instead of trying to parse them from the page
pagecount = 999
total = 19980
return {
'list': videos,
'page': int(pg),
'pagecount': pagecount,
'limit': 20,
'total': total
}
except Exception as e:
self.log(f"分类内容获取出错: {str(e)}")
return {'list': []}
def searchContent(self, key, quick, pg="1"):
"""搜索功能 - 完全修复版"""
try:
# Build the search URL
search_url = f"{self.host}/index.php/vod/type/id/1/wd/{urllib.parse.quote(key)}/page/{pg}.html"
self.log(f"搜索URL: {search_url}")
# Send the request
rsp = self.fetch(search_url, headers=self.headers)
if not rsp or rsp.status_code != 200:
self.log("搜索请求失败")
return {'list': []}
# Parse the HTML
doc = self.html(rsp.text)
if not doc:
self.log("搜索页面解析失败")
return {'list': []}
# Extract the search results
videos = self._get_videos(doc, limit=20)
# Try to read paging info from the page
pagecount = 5 # default
total = 100 # default
# Try to extract the real page count from the pagination elements
page_elements = doc.xpath('//div[@class="mypage"]/a')
if page_elements and len(page_elements) > 0:
try:
# Look for the "last page" link
last_page = None
for elem in page_elements:
href = elem.xpath('./@href')[0]
if (elem.text and '尾页' in elem.text) or 'page/' in href:
last_page = href
break
if last_page:
# Extract the page number from the last-page URL
page_match = re.search(r'/page/(\d+)\.html', last_page)
if page_match:
pagecount = int(page_match.group(1))
total = pagecount * 20 # estimated total
except:
pass
return {
'list': videos,
'page': int(pg),
'pagecount': pagecount,
'limit': 20,
'total': total
}
except Exception as e:
self.log(f"搜索出错: {str(e)}")
return {'list': []}
def detailContent(self, ids):
"""详情页面 - 特别处理特殊分区的链接"""
try:
vid = ids[0]
# Check whether this is a special-category link
if vid.startswith('special_'):
# Parse the special id format: special_{category_id}_{video_id}_{encoded_url}
parts = vid.split('_')
if len(parts) >= 4:
category_id = parts[1]
video_id = parts[2]
encoded_url = '_'.join(parts[3:])
play_url = urllib.parse.unquote(encoded_url)
self.log(f"特殊分区视频,直接使用链接: {play_url}")
# Extract the video URL from the play link
parsed_url = urllib.parse.urlparse(play_url)
query_params = urllib.parse.parse_qs(parsed_url.query)
video_url = query_params.get('v', [''])[0]
pic_url = query_params.get('b', [''])[0]
title_encrypted = query_params.get('m', [''])[0]
# Decode the title
title = self._decrypt_title(title_encrypted)
return {
'list': [{
'vod_id': vid,
'vod_name': title,
'vod_pic': pic_url,
'vod_remarks': '',
'vod_year': '',
'vod_play_from': '直接播放',
'vod_play_url': f"第1集${play_url}"
}]
}
# Regular handling
if '_' in vid and len(vid.split('_')) > 2:
domain, category_id, video_id = vid.split('_')
else:
domain, video_id = vid.split('_')
detail_url = f"https://{domain}/index.php/vod/detail/id/{video_id}.html"
self.log(f"访问详情URL: {detail_url}")
rsp = self.fetch(detail_url, headers=self.headers)
doc = self.html(rsp.text)
video_info = self._get_detail(doc, rsp.text, vid)
return {'list': [video_info]} if video_info else {'list': []}
except Exception as e:
self.log(f"详情获取出错: {str(e)}")
return {'list': []}
def playerContent(self, flag, id, vipFlags):
"""播放链接 - 特别处理特殊分区的链接"""
try:
self.log(f"获取播放链接: flag={flag}, id={id}")
# Check whether this is a special-category link
if id.startswith('special_'):
# Parse the special id format: special_{category_id}_{video_id}_{encoded_url}
parts = id.split('_')
if len(parts) >= 4:
category_id = parts[1]
video_id = parts[2]
encoded_url = '_'.join(parts[3:])
play_url = urllib.parse.unquote(encoded_url)
self.log(f"特殊分区视频,直接使用链接: {play_url}")
# Extract the video URL from the play link
parsed_url = urllib.parse.urlparse(play_url)
query_params = urllib.parse.parse_qs(parsed_url.query)
video_url = query_params.get('v', [''])[0]
if video_url:
# Make sure the URL is absolute
if video_url.startswith('//'):
video_url = 'https:' + video_url
elif not video_url.startswith('http'):
video_url = urljoin(self.host, video_url)
self.log(f"从特殊链接中提取到视频地址: {video_url}")
return {'parse': 0, 'playUrl': '', 'url': video_url}
# If the incoming ID is already a full URL, parse it directly
if id.startswith('http'):
self.log("ID 是一个完整URL直接解析参数")
parsed_url = urllib.parse.urlparse(id)
query_params = urllib.parse.parse_qs(parsed_url.query)
# Try to read the video parameter
video_url = query_params.get('v', [''])[0]
if not video_url:
# Try other likely parameter names
for key in query_params:
if key in ['url', 'src', 'file']:
video_url = query_params[key][0]
break
if video_url:
# Undo any URL encoding
video_url = urllib.parse.unquote(video_url)
# Make sure the URL is absolute
if video_url.startswith('//'):
video_url = 'https:' + video_url
elif not video_url.startswith('http'):
# Fall back to joining with the base host
video_url = urljoin(self.host, video_url)
self.log(f"从 URL 参数中提取到视频地址: {video_url}")
return {'parse': 0, 'playUrl': '', 'url': video_url}
else:
self.log("URL 中没有找到视频参数,尝试从页面提取")
# Fetch the page and extract the video link
rsp = self.fetch(id, headers=self.headers)
if rsp and rsp.status_code == 200:
video_url = self._extract_direct_video_url(rsp.text)
if video_url:
self.log(f"从页面提取到视频地址: {video_url}")
return {'parse': 0, 'playUrl': '', 'url': video_url}
self.log("无法从页面提取视频链接返回原始URL")
return {'parse': 1, 'playUrl': '', 'url': id}
# Extract the video ID and category ID from the new id format
if id.count('_') >= 2:
parts = id.split('_')
video_id = parts[-1]
category_id = parts[1]
else:
video_id = id.split('_')[-1]
category_id = ''
self.log(f"视频ID: {video_id}, 分类ID: {category_id}")
# For special categories, parse the play page directly
if category_id in self.special_categories:
self.log("特殊分类,尝试从详情页提取直接播放链接")
# Build the play-page URL
play_page_url = f"{self.host}/index.php/vod/play/id/{video_id}.html"
# Fetch the play page
rsp = self.fetch(play_page_url, headers=self.headers)
if rsp and rsp.status_code == 200:
# Extract the video link from the page
video_url = self._extract_direct_video_url(rsp.text)
if video_url:
self.log(f"从播放页面提取到视频地址: {video_url}")
return {'parse': 0, 'playUrl': '', 'url': video_url}
# If extraction fails, fall back to the API route
self.log("从播放页面提取失败尝试API方式")
return self._get_video_by_api(id, video_id)
else:
# Other categories go through the API
self.log("使用API方式获取视频地址")
return self._get_video_by_api(id, video_id)
except Exception as e:
self.log(f"播放链接获取出错: {str(e)}")
if '_' in id:
domain, play_id = id.split('_')
play_url = f"https://{domain}/html/kkyd.html?m={play_id}"
else:
play_url = f"{self.host}/html/kkyd.html?m={id}"
return {'parse': 1, 'playUrl': '', 'url': play_url}
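# --- Editor's note (illustrative, not part of this commit): round-trip of the
# special_{category_id}_{video_id}_{encoded_url} ids parsed above; values are
# hypothetical. ---
# link = 'https://618041.xyz/html/ar-kk.html?v=//cdn.example.com/abc/video.m3u8'
# vod_id = f"special_13_abc_{urllib.parse.quote(link)}"
# parts = vod_id.split('_')  # -> category_id = '13', video_id = 'abc'
# urllib.parse.unquote('_'.join(parts[3:])) == link  # True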
def _get_video_by_api(self, id, video_id):
"""通过API获取视频地址"""
try:
api_url = f"{self.api_host}/api/v2/vod/reqplay/{video_id}"
self.log(f"请求API获取视频地址: {api_url}")
api_headers = self.headers.copy()
api_headers.update({
'Referer': f"{self.host}/",
'Origin': self.host,
'X-Requested-With': 'XMLHttpRequest'
})
api_response = self.fetch(api_url, headers=api_headers)
if api_response and api_response.status_code == 200:
data = api_response.json()
self.log(f"API响应: {data}")
if data.get('retcode') == 3:
video_url = data.get('data', {}).get('httpurl_preview', '')
else:
video_url = data.get('data', {}).get('httpurl', '')
if video_url:
video_url = video_url.replace('?300', '')
self.log(f"从API获取到视频地址: {video_url}")
return {'parse': 0, 'playUrl': '', 'url': video_url}
else:
self.log("API响应中没有找到视频地址")
else:
self.log(f"API请求失败状态码: {api_response.status_code if api_response else '无响应'}")
if '_' in id:
domain, play_id = id.split('_')
play_url = f"https://{domain}/html/kkyd.html?m={play_id}"
else:
play_url = f"{self.host}/html/kkyd.html?m={id}"
self.log(f"API请求失败回退到播放页面: {play_url}")
return {'parse': 1, 'playUrl': '', 'url': play_url}
except Exception as e:
self.log(f"API方式获取视频出错: {str(e)}")
if '_' in id:
domain, play_id = id.split('_')
play_url = f"https://{domain}/html/kkyd.html?m={play_id}"
else:
play_url = f"{self.host}/html/kkyd.html?m={id}"
return {'parse': 1, 'playUrl': '', 'url': play_url}
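# --- Editor's note (inferred from the parsing above, not confirmed API docs):
# the reqplay endpoint appears to answer with JSON shaped like
#   {"retcode": 0, "data": {"httpurl": "...m3u8"}}          full playback
#   {"retcode": 3, "data": {"httpurl_preview": "...m3u8"}}  preview only
# and the code strips a trailing '?300' from the returned URL. ---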
def _extract_direct_video_url(self, html_content):
"""从HTML内容中提取直接播放链接 (优化版)"""
try:
# First try the obvious video-link patterns
patterns = [
r'v=([^&]+\.(?:m3u8|mp4))',
r'"url"\s*:\s*["\']([^"\']+\.(?:mp4|m3u8))["\']',
r'src\s*=\s*["\']([^"\']+\.(?:mp4|m3u8))["\']',
r'http[^\s<>"\'?]+\.(?:mp4|m3u8)'
]
for pattern in patterns:
matches = re.findall(pattern, html_content, re.IGNORECASE)
for match in matches:
if isinstance(match, tuple):
match = match[0]
extracted_url = match.replace('\\', '')
extracted_url = urllib.parse.unquote(extracted_url)
if extracted_url.startswith('//'):
extracted_url = 'https:' + extracted_url
elif extracted_url.startswith('http'):
return extracted_url
return None
except Exception as e:
self.log(f"提取直接播放URL出错: {str(e)}")
return None
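# --- Editor's note (illustrative, not part of this commit): the second
# pattern above catches JSON-style player configs; page content hypothetical. ---
# _extract_direct_video_url('<script>var p={"url":"https://cdn.example.com/i.m3u8"};</script>')
#   -> 'https://cdn.example.com/i.m3u8'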
def _get_videos(self, doc, category_id=None, limit=None):
"""获取影片列表 - 根据实际网站结构"""
try:
videos = []
elements = doc.xpath('//a[@class="vodbox"]')
self.log(f"找到 {len(elements)} 个vodbox元素")
for elem in elements:
video = self._extract_video(elem, category_id)
if video:
videos.append(video)
return videos[:limit] if limit and videos else videos
except Exception as e:
self.log(f"获取影片列表出错: {str(e)}")
return []
def _extract_video(self, element, category_id=None):
"""提取影片信息 - 特别处理特殊分区的链接"""
try:
link = element.xpath('./@href')[0]
if link.startswith('/'):
link = self.host + link
# Check whether this is a special-category link
is_special_link = 'ar-kk.html' in link or 'ar.html' in link
# For special categories, use the link itself as the ID
if is_special_link and category_id in self.special_categories:
# Parse the parameters out of the link
parsed_url = urllib.parse.urlparse(link)
query_params = urllib.parse.parse_qs(parsed_url.query)
# Get the video ID, extracted from the v parameter
video_url = query_params.get('v', [''])[0]
if video_url:
# Extract the ID from the video URL
video_id_match = re.search(r'/([a-f0-9-]+)/video\.m3u8', video_url)
if video_id_match:
video_id = video_id_match.group(1)
else:
# If nothing matched, fall back to a hash
video_id = str(hash(link) % 1000000)
else:
video_id = str(hash(link) % 1000000)
# For special categories, keep the full link inside the vod_id
final_vod_id = f"special_{category_id}_{video_id}_{urllib.parse.quote(link)}"
else:
# Regular handling
vod_id = self.regStr(r'm=(\d+)', link)
if not vod_id:
vod_id = str(hash(link) % 1000000)
final_vod_id = f"618041.xyz_{vod_id}"
if category_id:
final_vod_id = f"618041.xyz_{category_id}_{vod_id}"
# Extract the title
title_elem = element.xpath('.//p[@class="km-script"]/text()')
if not title_elem:
title_elem = element.xpath('.//p[contains(@class, "script")]/text()')
if not title_elem:
title_elem = element.xpath('.//p/text()')
if not title_elem:
title_elem = element.xpath('.//h3/text()')
if not title_elem:
title_elem = element.xpath('.//h4/text()')
if not title_elem:
self.log(f"未找到标题元素,跳过该视频")
return None
title_encrypted = title_elem[0].strip()
title = self._decrypt_title(title_encrypted)
# Extract the cover image
pic_elem = element.xpath('.//img/@data-original')
if not pic_elem:
pic_elem = element.xpath('.//img/@src')
pic = pic_elem[0] if pic_elem else ''
if pic:
if pic.startswith('//'):
pic = 'https:' + pic
elif pic.startswith('/'):
pic = self.host + pic
return {
'vod_id': final_vod_id,
'vod_name': title,
'vod_pic': pic,
'vod_remarks': '',
'vod_year': ''
}
except Exception as e:
self.log(f"提取影片信息出错: {str(e)}")
return None
def _decrypt_title(self, encrypted_text):
"""解密标题 - 使用网站的解密算法"""
try:
decrypted_chars = []
for char in encrypted_text:
code_point = ord(char)
decrypted_code = code_point ^ 128
decrypted_char = chr(decrypted_code)
decrypted_chars.append(decrypted_char)
decrypted_text = ''.join(decrypted_chars)
return decrypted_text
except Exception as e:
self.log(f"标题解密失败: {str(e)}")
return encrypted_text
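# --- Editor's check (illustrative, not part of this commit): XOR with 128 is
# an involution, so the same loop both encrypts and decrypts. ---
def _xor128(text):
    return ''.join(chr(ord(ch) ^ 128) for ch in text)

assert _xor128(_xor128('香蕉精品')) == '香蕉精品'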
def _get_detail(self, doc, html_content, vid):
"""获取详情信息 (优化版) - 修复播放源提取问题"""
try:
title = self._get_text(doc, ['//h1/text()', '//title/text()'])
pic = self._get_text(doc, ['//div[contains(@class,"dyimg")]//img/@src', '//img[contains(@class,"poster")]/@src'])
if pic and pic.startswith('/'):
pic = self.host + pic
desc = self._get_text(doc, ['//div[contains(@class,"yp_context")]/text()', '//div[contains(@class,"introduction")]//text()'])
actor = self._get_text(doc, ['//span[contains(text(),"主演")]/following-sibling::*/text()'])
director = self._get_text(doc, ['//span[contains(text(),"导演")]/following-sibling::*/text()'])
play_from = []
play_urls = []
# Use more flexible regex matching to find player links
player_link_patterns = [
re.compile(r'href="(.*?ar\.html.*?)"'),
re.compile(r'href="(.*?kkyd\.html.*?)"'),
re.compile(r'href="(.*?ar-kk\.html.*?)"')
]
player_links = []
for pattern in player_link_patterns:
matches = pattern.findall(html_content)
player_links.extend(matches)
if player_links:
episodes = []
for link in player_links:
full_url = urljoin(self.host, link)
episodes.append(f"第1集${full_url}")
if episodes:
play_from.append("默认播放源")
play_urls.append('#'.join(episodes))
if not play_from:
self.log("未找到播放源元素,无法定位播放源列表")
return {
'vod_id': vid,
'vod_name': title,
'vod_pic': pic,
'type_name': '',
'vod_year': '',
'vod_area': '',
'vod_remarks': '',
'vod_actor': actor,
'vod_director': director,
'vod_content': desc,
'vod_play_from': '默认播放源',
'vod_play_url': f"第1集${vid}"
}
return {
'vod_id': vid,
'vod_name': title,
'vod_pic': pic,
'type_name': '',
'vod_year': '',
'vod_area': '',
'vod_remarks': '',
'vod_actor': actor,
'vod_director': director,
'vod_content': desc,
'vod_play_from': '$$$'.join(play_from),
'vod_play_url': '$$$'.join(play_urls)
}
except Exception as e:
self.log(f"获取详情出错: {str(e)}")
return None
def _get_text(self, doc, selectors):
"""通用文本提取"""
for selector in selectors:
try:
texts = doc.xpath(selector)
for text in texts:
if text and text.strip():
return text.strip()
except:
continue
return ''
def log(self, message):
"""日志输出"""
print(f"[苹果视频] {message}")
def fetch(self, url, headers=None, method='GET', data=None, timeout=10):
"""网络请求"""
try:
if headers is None:
headers = self.headers
if method == 'GET':
response = requests.get(url, headers=headers, timeout=timeout, verify=False)
else:
response = requests.post(url, headers=headers, data=data, timeout=timeout, verify=False)
return response
except Exception as e:
self.log(f"网络请求失败: {url}, 错误: {str(e)}")
return None
# Register the spider
if __name__ == '__main__':
from base.spider import Spider as BaseSpider
BaseSpider.register(Spider())