tvbox/py/adult/香蕉.py
Wang.Luo fdc95360b4 ```
feat(adult): 更新51吸瓜插件并优化站点选择逻辑

- 修改插件作者标识为 🌈 Love
- 重构 homeContent、categoryContent 等方法,增强容错与分类获取逻辑
- 引入动态 URL 测试机制,自动选择可用站点
- 改进视频播放地址解析逻辑,支持多集识别和去重
- 增加请求日志输出,便于调试
- 修复部分页面结构兼容问题,提升稳定性
- 更新 adult.json 配置,调整插件路径并新增“4K数毛”源
```
2025-09-20 03:18:34 +08:00

669 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding=utf-8
#!/usr/bin/python
import sys
sys.path.append('..')
from base.spider import Spider
import json
import time
import urllib.parse
import re
import requests
from lxml import etree
from urllib.parse import urljoin
class Spider(Spider):
def getName(self):
return "苹果视频"
def init(self, extend=""):
self.host = "https://618041.xyz"
self.api_host = "https://h5.xxoo168.org"
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Referer': self.host
}
# 定义特殊分区ID列表包含所有需要特殊处理的分类
self.special_categories = ['13', '14', '33', '53', '32', '52', '9']
self.log(f"苹果视频爬虫初始化完成,主站: {self.host}")
def html(self, content):
"""将HTML内容转换为可查询的对象"""
try:
return etree.HTML(content)
except:
self.log("HTML解析失败")
return None
def regStr(self, pattern, string, index=1):
"""正则表达式提取字符串"""
try:
match = re.search(pattern, string, re.IGNORECASE)
if match and len(match.groups()) >= index:
return match.group(index)
except:
pass
return ""
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def homeContent(self, filter):
"""获取首页内容和分类"""
result = {}
# 只保留指定的分类
classes = [
{'type_id': '618041.xyz_1', 'type_name': '全部视频'},
{'type_id': '618041.xyz_13', 'type_name': '香蕉精品'},
{'type_id': '618041.xyz_22', 'type_name': '制服诱惑'},
{'type_id': '618041.xyz_6', 'type_name': '国产视频'},
{'type_id': '618041.xyz_8', 'type_name': '清纯少女'},
{'type_id': '618041.xyz_9', 'type_name': '辣妹大奶'},
{'type_id': '618041.xyz_10', 'type_name': '女同专属'},
{'type_id': '618041.xyz_11', 'type_name': '素人出演'},
{'type_id': '618041.xyz_12', 'type_name': '角色扮演'},
{'type_id': '618041.xyz_20', 'type_name': '人妻熟女'},
{'type_id': '618041.xyz_23', 'type_name': '日韩剧情'},
{'type_id': '618041.xyz_21', 'type_name': '经典伦理'},
{'type_id': '618041.xyz_7', 'type_name': '成人动漫'},
{'type_id': '618041.xyz_14', 'type_name': '精品二区'},
{'type_id': '618041.xyz_53', 'type_name': '动漫中字'},
{'type_id': '618041.xyz_52', 'type_name': '日本无码'},
{'type_id': '618041.xyz_33', 'type_name': '中文字幕'},
{'type_id': '618041.xyz_32', 'type_name': '国产自拍'}
]
result['class'] = classes
try:
rsp = self.fetch(self.host, headers=self.headers)
doc = self.html(rsp.text)
videos = self._get_videos(doc, limit=20)
result['list'] = videos
except Exception as e:
self.log(f"首页获取出错: {str(e)}")
result['list'] = []
return result
def homeVideoContent(self):
"""分类定义 - 兼容性方法"""
return {
'class': [
{'type_id': '618041.xyz_1', 'type_name': '全部视频'},
{'type_id': '618041.xyz_13', 'type_name': '香蕉精品'},
{'type_id': '618041.xyz_22', 'type_name': '制服诱惑'},
{'type_id': '618041.xyz_6', 'type_name': '国产视频'},
{'type_id': '618041.xyz_8', 'type_name': '清纯少女'},
{'type_id': '618041.xyz_9', 'type_name': '辣妹大奶'},
{'type_id': '618041.xyz_10', 'type_name': '女同专属'},
{'type_id': '618041.xyz_11', 'type_name': '素人出演'},
{'type_id': '618041.xyz_12', 'type_name': '角色扮演'},
{'type_id': '618041.xyz_20', 'type_name': '人妻熟女'},
{'type_id': '618041.xyz_23', 'type_name': '日韩剧情'},
{'type_id': '618041.xyz_21', 'type_name': '经典伦理'},
{'type_id': '618041.xyz_7', 'type_name': '成人动漫'},
{'type_id': '618041.xyz_14', 'type_name': '精品二区'},
{'type_id': '618041.xyz_53', 'type_name': '动漫中字'},
{'type_id': '618041.xyz_52', 'type_name': '日本无码'},
{'type_id': '618041.xyz_33', 'type_name': '中文字幕'},
{'type_id': '618041.xyz_32', 'type_name': '国产自拍'}
]
}
def categoryContent(self, tid, pg, filter, extend):
"""分类内容 - 修改为使用固定页数设置"""
try:
domain, type_id = tid.split('_')
url = f"https://{domain}/index.php/vod/type/id/{type_id}.html"
if pg and pg != '1':
url = url.replace('.html', f'/page/{pg}.html')
self.log(f"访问分类URL: {url}")
rsp = self.fetch(url, headers=self.headers)
doc = self.html(rsp.text)
# 在这里将 type_id 传递给 _get_videos 方法
videos = self._get_videos(doc, category_id=type_id, limit=20)
# 使用固定页数设置,而不是尝试从页面解析
pagecount = 999
total = 19980
return {
'list': videos,
'page': int(pg),
'pagecount': pagecount,
'limit': 20,
'total': total
}
except Exception as e:
self.log(f"分类内容获取出错: {str(e)}")
return {'list': []}
def searchContent(self, key, quick, pg="1"):
"""搜索功能 - 完全修复版"""
try:
# 构造搜索URL
search_url = f"{self.host}/index.php/vod/type/id/1/wd/{urllib.parse.quote(key)}/page/{pg}.html"
self.log(f"搜索URL: {search_url}")
# 发送请求
rsp = self.fetch(search_url, headers=self.headers)
if not rsp or rsp.status_code != 200:
self.log("搜索请求失败")
return {'list': []}
# 解析HTML
doc = self.html(rsp.text)
if not doc:
self.log("搜索页面解析失败")
return {'list': []}
# 提取搜索结果
videos = self._get_videos(doc, limit=20)
# 尝试从页面提取分页信息
pagecount = 5 # 默认值
total = 100 # 默认值
# 尝试从分页元素中提取真实的分页信息
page_elements = doc.xpath('//div[@class="mypage"]/a')
if page_elements and len(page_elements) > 0:
try:
# 查找尾页链接
last_page = None
for elem in page_elements:
href = elem.xpath('./@href')[0]
if '尾页' in elem.text or 'page/' in href:
last_page = href
break
if last_page:
# 从尾页URL中提取页码
page_match = re.search(r'/page/(\d+)\.html', last_page)
if page_match:
pagecount = int(page_match.group(1))
total = pagecount * 20 # 估算总数
except:
pass
return {
'list': videos,
'page': int(pg),
'pagecount': pagecount,
'limit': 20,
'total': total
}
except Exception as e:
self.log(f"搜索出错: {str(e)}")
return {'list': []}
def detailContent(self, ids):
"""详情页面 - 特别处理特殊分区的链接"""
try:
vid = ids[0]
# 检查是否是特殊分区的链接
if vid.startswith('special_'):
# 解析特殊分区ID格式: special_{category_id}_{video_id}_{encoded_url}
parts = vid.split('_')
if len(parts) >= 4:
category_id = parts[1]
video_id = parts[2]
encoded_url = '_'.join(parts[3:])
play_url = urllib.parse.unquote(encoded_url)
self.log(f"特殊分区视频,直接使用链接: {play_url}")
# 从播放链接中提取视频URL
parsed_url = urllib.parse.urlparse(play_url)
query_params = urllib.parse.parse_qs(parsed_url.query)
video_url = query_params.get('v', [''])[0]
pic_url = query_params.get('b', [''])[0]
title_encrypted = query_params.get('m', [''])[0]
# 解码标题
title = self._decrypt_title(title_encrypted)
return {
'list': [{
'vod_id': vid,
'vod_name': title,
'vod_pic': pic_url,
'vod_remarks': '',
'vod_year': '',
'vod_play_from': '直接播放',
'vod_play_url': f"第1集${play_url}"
}]
}
# 常规处理
if '_' in vid and len(vid.split('_')) > 2:
domain, category_id, video_id = vid.split('_')
else:
domain, video_id = vid.split('_')
detail_url = f"https://{domain}/index.php/vod/detail/id/{video_id}.html"
self.log(f"访问详情URL: {detail_url}")
rsp = self.fetch(detail_url, headers=self.headers)
doc = self.html(rsp.text)
video_info = self._get_detail(doc, rsp.text, vid)
return {'list': [video_info]} if video_info else {'list': []}
except Exception as e:
self.log(f"详情获取出错: {str(e)}")
return {'list': []}
def playerContent(self, flag, id, vipFlags):
"""播放链接 - 特别处理特殊分区的链接"""
try:
self.log(f"获取播放链接: flag={flag}, id={id}")
# 检查是否是特殊分区的链接
if id.startswith('special_'):
# 解析特殊分区ID格式: special_{category_id}_{video_id}_{encoded_url}
parts = id.split('_')
if len(parts) >= 4:
category_id = parts[1]
video_id = parts[2]
encoded_url = '_'.join(parts[3:])
play_url = urllib.parse.unquote(encoded_url)
self.log(f"特殊分区视频,直接使用链接: {play_url}")
# 从播放链接中提取视频URL
parsed_url = urllib.parse.urlparse(play_url)
query_params = urllib.parse.parse_qs(parsed_url.query)
video_url = query_params.get('v', [''])[0]
if video_url:
# 确保URL是完整的
if video_url.startswith('//'):
video_url = 'https:' + video_url
elif not video_url.startswith('http'):
video_url = urljoin(self.host, video_url)
self.log(f"从特殊链接中提取到视频地址: {video_url}")
return {'parse': 0, 'playUrl': '', 'url': video_url}
# 检查传入的ID是否为完整URL如果是则直接解析
if id.startswith('http'):
self.log("ID 是一个完整URL直接解析参数")
parsed_url = urllib.parse.urlparse(id)
query_params = urllib.parse.parse_qs(parsed_url.query)
# 尝试获取视频参数
video_url = query_params.get('v', [''])[0]
if not video_url:
# 尝试其他可能的参数名
for key in query_params:
if key in ['url', 'src', 'file']:
video_url = query_params[key][0]
break
if video_url:
# 解码可能的URL编码
video_url = urllib.parse.unquote(video_url)
# 确保URL是完整的
if video_url.startswith('//'):
video_url = 'https:' + video_url
elif not video_url.startswith('http'):
# 尝试添加基本域名
video_url = urljoin(self.host, video_url)
self.log(f"从 URL 参数中提取到视频地址: {video_url}")
return {'parse': 0, 'playUrl': '', 'url': video_url}
else:
self.log("URL 中没有找到视频参数,尝试从页面提取")
# 请求页面并提取视频链接
rsp = self.fetch(id, headers=self.headers)
if rsp and rsp.status_code == 200:
video_url = self._extract_direct_video_url(rsp.text)
if video_url:
self.log(f"从页面提取到视频地址: {video_url}")
return {'parse': 0, 'playUrl': '', 'url': video_url}
self.log("无法从页面提取视频链接返回原始URL")
return {'parse': 1, 'playUrl': '', 'url': id}
# 从新的 id 格式中提取视频ID和分类ID
if id.count('_') >= 2:
parts = id.split('_')
video_id = parts[-1]
category_id = parts[1]
else:
video_id = id.split('_')[-1]
category_id = ''
self.log(f"视频ID: {video_id}, 分类ID: {category_id}")
# 对于特殊分类,使用直接解析播放页面的方式
if category_id in self.special_categories:
self.log("特殊分类,尝试从详情页提取直接播放链接")
# 构造播放页面URL
play_page_url = f"{self.host}/index.php/vod/play/id/{video_id}.html"
# 请求播放页面
rsp = self.fetch(play_page_url, headers=self.headers)
if rsp and rsp.status_code == 200:
# 从页面提取视频链接
video_url = self._extract_direct_video_url(rsp.text)
if video_url:
self.log(f"从播放页面提取到视频地址: {video_url}")
return {'parse': 0, 'playUrl': '', 'url': video_url}
# 如果提取失败回退到API方式
self.log("从播放页面提取失败尝试API方式")
return self._get_video_by_api(id, video_id)
else:
# 其他分类使用API方式
self.log("使用API方式获取视频地址")
return self._get_video_by_api(id, video_id)
except Exception as e:
self.log(f"播放链接获取出错: {str(e)}")
if '_' in id:
domain, play_id = id.split('_')
play_url = f"https://{domain}/html/kkyd.html?m={play_id}"
else:
play_url = f"{self.host}/html/kkyd.html?m={id}"
return {'parse': 1, 'playUrl': '', 'url': play_url}
def _get_video_by_api(self, id, video_id):
"""通过API获取视频地址"""
try:
api_url = f"{self.api_host}/api/v2/vod/reqplay/{video_id}"
self.log(f"请求API获取视频地址: {api_url}")
api_headers = self.headers.copy()
api_headers.update({
'Referer': f"{self.host}/",
'Origin': self.host,
'X-Requested-With': 'XMLHttpRequest'
})
api_response = self.fetch(api_url, headers=api_headers)
if api_response and api_response.status_code == 200:
data = api_response.json()
self.log(f"API响应: {data}")
if data.get('retcode') == 3:
video_url = data.get('data', {}).get('httpurl_preview', '')
else:
video_url = data.get('data', {}).get('httpurl', '')
if video_url:
video_url = video_url.replace('?300', '')
self.log(f"从API获取到视频地址: {video_url}")
return {'parse': 0, 'playUrl': '', 'url': video_url}
else:
self.log("API响应中没有找到视频地址")
else:
self.log(f"API请求失败状态码: {api_response.status_code if api_response else '无响应'}")
if '_' in id:
domain, play_id = id.split('_')
play_url = f"https://{domain}/html/kkyd.html?m={play_id}"
else:
play_url = f"{self.host}/html/kkyd.html?m={id}"
self.log(f"API请求失败回退到播放页面: {play_url}")
return {'parse': 1, 'playUrl': '', 'url': play_url}
except Exception as e:
self.log(f"API方式获取视频出错: {str(e)}")
if '_' in id:
domain, play_id = id.split('_')
play_url = f"https://{domain}/html/kkyd.html?m={play_id}"
else:
play_url = f"{self.host}/html/kkyd.html?m={id}"
return {'parse': 1, 'playUrl': '', 'url': play_url}
def _extract_direct_video_url(self, html_content):
"""从HTML内容中提取直接播放链接 (优化版)"""
try:
# 首先尝试提取明显的视频链接
patterns = [
r'v=([^&]+\.(?:m3u8|mp4))',
r'"url"\s*:\s*["\']([^"\']+\.(?:mp4|m3u8))["\']',
r'src\s*=\s*["\']([^"\']+\.(?:mp4|m3u8))["\']',
r'http[^\s<>"\'?]+\.(?:mp4|m3u8)'
]
for pattern in patterns:
matches = re.findall(pattern, html_content, re.IGNORECASE)
for match in matches:
if isinstance(match, tuple):
match = match[0]
extracted_url = match.replace('\\', '')
extracted_url = urllib.parse.unquote(extracted_url)
if extracted_url.startswith('//'):
extracted_url = 'https:' + extracted_url
elif extracted_url.startswith('http'):
return extracted_url
return None
except Exception as e:
self.log(f"提取直接播放URL出错: {str(e)}")
return None
def _get_videos(self, doc, category_id=None, limit=None):
"""获取影片列表 - 根据实际网站结构"""
try:
videos = []
elements = doc.xpath('//a[@class="vodbox"]')
self.log(f"找到 {len(elements)} 个vodbox元素")
for elem in elements:
video = self._extract_video(elem, category_id)
if video:
videos.append(video)
return videos[:limit] if limit and videos else videos
except Exception as e:
self.log(f"获取影片列表出错: {str(e)}")
return []
def _extract_video(self, element, category_id=None):
"""提取影片信息 - 特别处理特殊分区的链接"""
try:
link = element.xpath('./@href')[0]
if link.startswith('/'):
link = self.host + link
# 检查是否是特殊分区的链接
is_special_link = 'ar-kk.html' in link or 'ar.html' in link
# 对于特殊分区直接使用链接本身作为ID
if is_special_link and category_id in self.special_categories:
# 提取链接中的参数
parsed_url = urllib.parse.urlparse(link)
query_params = urllib.parse.parse_qs(parsed_url.query)
# 获取视频ID从v参数中提取
video_url = query_params.get('v', [''])[0]
if video_url:
# 从视频URL中提取ID
video_id_match = re.search(r'/([a-f0-9-]+)/video\.m3u8', video_url)
if video_id_match:
video_id = video_id_match.group(1)
else:
# 如果没有匹配到,使用哈希值
video_id = str(hash(link) % 1000000)
else:
video_id = str(hash(link) % 1000000)
# 对于特殊分区保留完整的链接作为vod_id的一部分
final_vod_id = f"special_{category_id}_{video_id}_{urllib.parse.quote(link)}"
else:
# 常规处理
vod_id = self.regStr(r'm=(\d+)', link)
if not vod_id:
vod_id = str(hash(link) % 1000000)
final_vod_id = f"618041.xyz_{vod_id}"
if category_id:
final_vod_id = f"618041.xyz_{category_id}_{vod_id}"
# 提取标题
title_elem = element.xpath('.//p[@class="km-script"]/text()')
if not title_elem:
title_elem = element.xpath('.//p[contains(@class, "script")]/text()')
if not title_elem:
title_elem = element.xpath('.//p/text()')
if not title_elem:
title_elem = element.xpath('.//h3/text()')
if not title_elem:
title_elem = element.xpath('.//h4/text()')
if not title_elem:
self.log(f"未找到标题元素,跳过该视频")
return None
title_encrypted = title_elem[0].strip()
title = self._decrypt_title(title_encrypted)
# 提取图片
pic_elem = element.xpath('.//img/@data-original')
if not pic_elem:
pic_elem = element.xpath('.//img/@src')
pic = pic_elem[0] if pic_elem else ''
if pic:
if pic.startswith('//'):
pic = 'https:' + pic
elif pic.startswith('/'):
pic = self.host + pic
return {
'vod_id': final_vod_id,
'vod_name': title,
'vod_pic': pic,
'vod_remarks': '',
'vod_year': ''
}
except Exception as e:
self.log(f"提取影片信息出错: {str(e)}")
return None
def _decrypt_title(self, encrypted_text):
"""解密标题 - 使用网站的解密算法"""
try:
decrypted_chars = []
for char in encrypted_text:
code_point = ord(char)
decrypted_code = code_point ^ 128
decrypted_char = chr(decrypted_code)
decrypted_chars.append(decrypted_char)
decrypted_text = ''.join(decrypted_chars)
return decrypted_text
except Exception as e:
self.log(f"标题解密失败: {str(e)}")
return encrypted_text
def _get_detail(self, doc, html_content, vid):
"""获取详情信息 (优化版) - 修复播放源提取问题"""
try:
title = self._get_text(doc, ['//h1/text()', '//title/text()'])
pic = self._get_text(doc, ['//div[contains(@class,"dyimg")]//img/@src', '//img[contains(@class,"poster")]/@src'])
if pic and pic.startswith('/'):
pic = self.host + pic
desc = self._get_text(doc, ['//div[contains(@class,"yp_context")]/text()', '//div[contains(@class,"introduction")]//text()'])
actor = self._get_text(doc, ['//span[contains(text(),"主演")]/following-sibling::*/text()'])
director = self._get_text(doc, ['//span[contains(text(),"导演")]/following-sibling::*/text()'])
play_from = []
play_urls = []
# 使用更灵活的正则匹配来查找播放链接
player_link_patterns = [
re.compile(r'href="(.*?ar\.html.*?)"'),
re.compile(r'href="(.*?kkyd\.html.*?)"'),
re.compile(r'href="(.*?ar-kk\.html.*?)"')
]
player_links = []
for pattern in player_link_patterns:
matches = pattern.findall(html_content)
player_links.extend(matches)
if player_links:
episodes = []
for link in player_links:
full_url = urljoin(self.host, link)
episodes.append(f"第1集${full_url}")
if episodes:
play_from.append("默认播放源")
play_urls.append('#'.join(episodes))
if not play_from:
self.log("未找到播放源元素,无法定位播放源列表")
return {
'vod_id': vid,
'vod_name': title,
'vod_pic': pic,
'type_name': '',
'vod_year': '',
'vod_area': '',
'vod_remarks': '',
'vod_actor': actor,
'vod_director': director,
'vod_content': desc,
'vod_play_from': '默认播放源',
'vod_play_url': f"第1集${vid}"
}
return {
'vod_id': vid,
'vod_name': title,
'vod_pic': pic,
'type_name': '',
'vod_year': '',
'vod_area': '',
'vod_remarks': '',
'vod_actor': actor,
'vod_director': director,
'vod_content': desc,
'vod_play_from': '$$$'.join(play_from),
'vod_play_url': '$$$'.join(play_urls)
}
except Exception as e:
self.log(f"获取详情出错: {str(e)}")
return None
def _get_text(self, doc, selectors):
"""通用文本提取"""
for selector in selectors:
try:
texts = doc.xpath(selector)
for text in texts:
if text and text.strip():
return text.strip()
except:
continue
return ''
def log(self, message):
"""日志输出"""
print(f"[苹果视频] {message}")
def fetch(self, url, headers=None, method='GET', data=None, timeout=10):
"""网络请求"""
try:
if headers is None:
headers = self.headers
if method == 'GET':
response = requests.get(url, headers=headers, timeout=timeout, verify=False)
else:
response = requests.post(url, headers=headers, data=data, timeout=timeout, verify=False)
return response
except Exception as e:
self.log(f"网络请求失败: {url}, 错误: {str(e)}")
return None
# 注册爬虫
if __name__ == '__main__':
from base.spider import Spider as BaseSpider
BaseSpider.register(Spider())