diff --git a/api.json b/api.json index 0f2709f..7437093 100644 --- a/api.json +++ b/api.json @@ -929,6 +929,17 @@ "quickSearch": 0, "filterable": 0 }, + { + "key": "河马短剧", + "name": "河马|短剧", + "type": 3, + "api": "./py/河马短剧.py", + "searchable": 1, + "changeable": 1, + "quickSearch": 1, + "filterable": 1, + "playerType": 2 + }, { "key": "偷乐短剧", "name": "偷乐|短剧", @@ -957,15 +968,6 @@ "quickSearch": 1, "filterable": 1 }, - { - "key": "海马影视APP", - "name": "海马|APP", - "type": 3, - "api": "./PyramidStore/plugin/app/海马影视APP.py", - "searchable": 1, - "quickSearch": 1, - "filterable": 1 - }, { "key": "国外剧APP", "name": "国外剧|APP", diff --git a/py/河马短剧.py b/py/河马短剧.py index eeee8ba..0085507 100644 --- a/py/河马短剧.py +++ b/py/河马短剧.py @@ -4,6 +4,7 @@ import re import json import traceback import sys +from urllib.parse import quote sys.path.append('../../') try: @@ -17,7 +18,6 @@ except ImportError: class Spider(Spider): def __init__(self): self.siteUrl = "https://www.kuaikaw.cn" - self.nextData = None # 缓存NEXT_DATA数据 self.cateManual = { "甜宠": "462", "古装仙侠": "1102", @@ -30,552 +30,351 @@ class Spider(Spider): "总裁": "1147", "职场商战": "943" } + self.headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0", + "Referer": self.siteUrl, + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8", + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8" + } def getName(self): - # 返回爬虫名称 return "河马短剧" def init(self, extend=""): return - def fetch(self, url, headers=None): + def fetch(self, url, headers=None, retry=2): """统一的网络请求接口""" if headers is None: - headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0", - "Referer": self.siteUrl, - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8", - "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8" - } + headers = self.headers - try: - response = requests.get(url, headers=headers, timeout=10, allow_redirects=True) - response.raise_for_status() - return response - except Exception as e: - print(f"请求异常: {url}, 错误: {str(e)}") - return None + for i in range(retry + 1): + try: + response = requests.get(url, headers=headers, timeout=10, allow_redirects=True) + response.raise_for_status() + return response + except Exception as e: + if i == retry: + print(f"请求异常: {url}, 错误: {str(e)}") + return None + continue def isVideoFormat(self, url): - # 检查是否为视频格式 video_formats = ['.mp4', '.mkv', '.avi', '.wmv', '.m3u8', '.flv', '.rmvb'] - for format in video_formats: - if format in url.lower(): - return True - return False + return any(format in url.lower() for format in video_formats) def manualVideoCheck(self): - # 不需要手动检查 return False def homeContent(self, filter): - """获取首页分类及筛选""" result = {} - # 分类列表,使用已初始化的cateManual - classes = [] - for k in self.cateManual: - classes.append({ - 'type_name': k, - 'type_id': self.cateManual[k] - }) + classes = [{'type_name': k, 'type_id': v} for k, v in self.cateManual.items()] result['class'] = classes - # 获取首页推荐视频 + try: result['list'] = self.homeVideoContent()['list'] except: result['list'] = [] - return result def homeVideoContent(self): - """获取首页推荐视频内容""" videos = [] try: response = self.fetch(self.siteUrl) + if not response: + return {'list': []} + html_content = response.text - # 提取NEXT_DATA JSON数据 next_data_pattern = r'' next_data_match = re.search(next_data_pattern, html_content, re.DOTALL) - if next_data_match: - next_data_json = json.loads(next_data_match.group(1)) - page_props = next_data_json.get("props", {}).get("pageProps", {}) - # 获取轮播图数据 - 这些通常是推荐内容 - if "bannerList" in page_props and isinstance(page_props["bannerList"], list): - banner_list = page_props["bannerList"] - for banner in banner_list: - book_id = banner.get("bookId", "") - book_name = banner.get("bookName", "") - cover_url = banner.get("coverWap", banner.get("wapUrl", "")) - # 获取状态和章节数 - status = banner.get("statusDesc", "") - total_chapters = banner.get("totalChapterNum", "") - if book_id and book_name: - videos.append({ - "vod_id": f"/drama/{book_id}", - "vod_name": book_name, - "vod_pic": cover_url, - "vod_remarks": f"{status} {total_chapters}集" if total_chapters else status - }) + if not next_data_match: + return {'list': []} - # SEO分类下的推荐 - if "seoColumnVos" in page_props and isinstance(page_props["seoColumnVos"], list): - for column in page_props["seoColumnVos"]: - book_infos = column.get("bookInfos", []) - for book in book_infos: - book_id = book.get("bookId", "") - book_name = book.get("bookName", "") - cover_url = book.get("coverWap", "") - status = book.get("statusDesc", "") - total_chapters = book.get("totalChapterNum", "") - - if book_id and book_name: - videos.append({ - "vod_id": f"/drama/{book_id}", - "vod_name": book_name, - "vod_pic": cover_url, - "vod_remarks": f"{status} {total_chapters}集" if total_chapters else status - }) - - # # 去重 - # seen = set() - # unique_videos = [] - # for video in videos: - # if video["vod_id"] not in seen: - # seen.add(video["vod_id"]) - # unique_videos.append(video) - # videos = unique_videos + next_data_json = json.loads(next_data_match.group(1)) + page_props = next_data_json.get("props", {}).get("pageProps", {}) + + # 处理轮播图数据 + if "bannerList" in page_props: + for banner in page_props["bannerList"]: + if banner.get("bookId"): + videos.append({ + "vod_id": f"/drama/{banner['bookId']}", + "vod_name": banner.get("bookName", ""), + "vod_pic": banner.get("coverWap", ""), + "vod_remarks": f"{banner.get('statusDesc', '')} {banner.get('totalChapterNum', '')}集".strip() + }) + + # 处理SEO分类推荐 + if "seoColumnVos" in page_props: + for column in page_props["seoColumnVos"]: + for book in column.get("bookInfos", []): + if book.get("bookId"): + videos.append({ + "vod_id": f"/drama/{book['bookId']}", + "vod_name": book.get("bookName", ""), + "vod_pic": book.get("coverWap", ""), + "vod_remarks": f"{book.get('statusDesc', '')} {book.get('totalChapterNum', '')}集".strip() + }) + + # 去重处理 + seen = set() + unique_videos = [] + for video in videos: + key = (video["vod_id"], video["vod_name"]) + if key not in seen: + seen.add(key) + unique_videos.append(video) except Exception as e: print(f"获取首页推荐内容出错: {e}") + unique_videos = [] - result = { - "list": videos - } - return result + return {'list': unique_videos} def categoryContent(self, tid, pg, filter, extend): - """获取分类内容""" - result = {} - videos = [] + result = {'list': [], 'page': pg, 'pagecount': 1, 'limit': 20, 'total': 0} url = f"{self.siteUrl}/browse/{tid}/{pg}" + response = self.fetch(url) + if not response: + return result + html_content = response.text - # 提取NEXT_DATA JSON数据 - next_data_pattern = r'' - next_data_match = re.search(next_data_pattern, html_content, re.DOTALL) - if next_data_match: + next_data_match = re.search(r'', html_content, re.DOTALL) + if not next_data_match: + return result + + try: next_data_json = json.loads(next_data_match.group(1)) page_props = next_data_json.get("props", {}).get("pageProps", {}) - # 获取总页数和当前页 + current_page = page_props.get("page", 1) total_pages = page_props.get("pages", 1) - # 获取书籍列表 book_list = page_props.get("bookList", []) - # 转换为通用格式 + + videos = [] for book in book_list: - book_id = book.get("bookId", "") - book_name = book.get("bookName", "") - cover_url = book.get("coverWap", "") - status_desc = book.get("statusDesc", "") - total_chapters = book.get("totalChapterNum", "") - if book_id and book_name: + if book.get("bookId"): videos.append({ - "vod_id": f"/drama/{book_id}", - "vod_name": book_name, - "vod_pic": cover_url, - "vod_remarks": f"{status_desc} {total_chapters}集" if total_chapters else status_desc + "vod_id": f"/drama/{book['bookId']}", + "vod_name": book.get("bookName", ""), + "vod_pic": book.get("coverWap", ""), + "vod_remarks": f"{book.get('statusDesc', '')} {book.get('totalChapterNum', '')}集".strip() }) - # 构建返回结果 - result = { - "list": videos, - "page": int(current_page), - "pagecount": total_pages, - "limit": len(videos), - "total": total_pages * len(videos) if videos else 0 - } + + result.update({ + 'list': videos, + 'page': int(current_page), + 'pagecount': total_pages, + 'limit': len(videos), + 'total': len(videos) * total_pages if videos else 0 + }) + + except Exception as e: + print(f"分类内容获取出错: {e}") + return result - def switch(self, key, pg): - # 搜索功能 - search_results = [] - # 获取第一页结果,并检查总页数 - url = f"{self.siteUrl}/search?searchValue={key}&page={pg}" - response = self.fetch(url) - html_content = response.text - # 提取NEXT_DATA JSON数据 - next_data_pattern = r'' - next_data_match = re.search(next_data_pattern, html_content, re.DOTALL) - if next_data_match: - next_data_json = json.loads(next_data_match.group(1)) - page_props = next_data_json.get("props", {}).get("pageProps", {}) - # 获取总页数 - total_pages = page_props.get("pages", 1) - # 处理所有页的数据 - all_book_list = [] - # 添加第一页的书籍列表 - book_list = page_props.get("bookList", []) - all_book_list.extend(book_list) - # 如果有多页,获取其他页的数据 - if total_pages > 1 : # quick模式只获取第一页 - for page in range(2, total_pages + 1): - next_page_url = f"{self.siteUrl}/search?searchValue={key}&page={page}" - next_page_response = self.fetch(next_page_url) - next_page_html = next_page_response.text - next_page_match = re.search(next_data_pattern, next_page_html, re.DOTALL) - if next_page_match: - next_page_json = json.loads(next_page_match.group(1)) - next_page_props = next_page_json.get("props", {}).get("pageProps", {}) - next_page_books = next_page_props.get("bookList", []) - all_book_list.extend(next_page_books) - # 转换为统一的搜索结果格式 - for book in all_book_list: - book_id = book.get("bookId", "") - book_name = book.get("bookName", "") - cover_url = book.get("coverWap", "") - total_chapters = book.get("totalChapterNum", "0") - status_desc = book.get("statusDesc", "") - # 构建视频项 - vod = { - "vod_id": f"/drama/{book_id}", - "vod_name": book_name, - "vod_pic": cover_url, - "vod_remarks": f"{status_desc} {total_chapters}集" - } - search_results.append(vod) - result = { - "list": search_results, - "page": pg - } - return result - def searchContent(self, key, quick, pg=1): - result = self.switch(key, pg=pg) - result['page'] = pg - return result + return self.searchContentPage(key, quick, pg) def searchContentPage(self, key, quick, pg=1): - return self.searchContent(key, quick, pg) + result = {'list': [], 'page': pg, 'pagecount': 1, 'limit': 20, 'total': 0} + search_url = f"{self.siteUrl}/search?searchValue={quote(key)}&page={pg}" + + response = self.fetch(search_url) + if not response: + return result + + html_content = response.text + next_data_match = re.search(r'', html_content, re.DOTALL) + if not next_data_match: + return result + + try: + next_data_json = json.loads(next_data_match.group(1)) + page_props = next_data_json.get("props", {}).get("pageProps", {}) + + total_pages = page_props.get("pages", 1) + book_list = page_props.get("bookList", []) + + videos = [] + for book in book_list: + if book.get("bookId"): + videos.append({ + "vod_id": f"/drama/{book['bookId']}", + "vod_name": book.get("bookName", ""), + "vod_pic": book.get("coverWap", ""), + "vod_remarks": f"{book.get('statusDesc', '')} {book.get('totalChapterNum', '')}集".strip() + }) + + result.update({ + 'list': videos, + 'pagecount': total_pages, + 'total': len(videos) * total_pages if videos else 0 + }) + + except Exception as e: + print(f"搜索内容出错: {e}") + + return result def detailContent(self, ids): - # 获取剧集信息 + result = {'list': []} + if not ids: + return result + vod_id = ids[0] - episode_id = None - chapter_id = None - if not vod_id.startswith('/drama/'): - if vod_id.startswith('/episode/'): - episode_info = vod_id.replace('/episode/', '').split('/') - if len(episode_info) >= 2: - episode_id = episode_info[0] - chapter_id = episode_info[1] - vod_id = f'/drama/{episode_id}' - else: - vod_id = '/drama/' + vod_id + vod_id = f'/drama/{vod_id}' - drama_url = self.siteUrl + vod_id - print(f"请求URL: {drama_url}") - - headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0", - "Referer": self.siteUrl, - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8", - "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8" - } - - rsp = self.fetch(drama_url, headers=headers) - if not rsp or rsp.status_code != 200: - print(f"请求失败,状态码: {getattr(rsp, 'status_code', 'N/A')}") - return {} - - html = rsp.text + drama_url = f"{self.siteUrl}{vod_id}" + response = self.fetch(drama_url) + if not response: + return result + + html = response.text next_data_match = re.search(r'', html, re.DOTALL) - if not next_data_match: - print("未找到NEXT_DATA内容") - return {} - + return result + try: next_data = json.loads(next_data_match.group(1)) page_props = next_data.get("props", {}).get("pageProps", {}) - print(f"找到页面属性,包含 {len(page_props.keys())} 个键") - book_info = page_props.get("bookInfoVo", {}) chapter_list = page_props.get("chapterList", []) - title = book_info.get("title", "") - sub_title = f"{book_info.get('totalChapterNum', '')}集" + if not book_info.get("bookId"): + return result - categories = [] - for category in book_info.get("categoryList", []): - categories.append(category.get("name", "")) - - vod_content = book_info.get("introduction", "") + # 基本信息 + categories = [c.get("name", "") for c in book_info.get("categoryList", [])] + performers = [p.get("name", "") for p in book_info.get("performerList", [])] vod = { "vod_id": vod_id, - "vod_name": title, + "vod_name": book_info.get("title", ""), "vod_pic": book_info.get("coverWap", ""), "type_name": ",".join(categories), "vod_year": "", "vod_area": book_info.get("countryName", ""), - "vod_remarks": sub_title, - "vod_actor": ", ".join([p.get("name", "") for p in book_info.get("performerList", [])]), + "vod_remarks": f"{book_info.get('statusDesc', '')} {book_info.get('totalChapterNum', '')}集".strip(), + "vod_actor": ", ".join(performers), "vod_director": "", - "vod_content": vod_content + "vod_content": book_info.get("introduction", "") } - # 处理播放列表 - play_url_list = [] - episodes = [] - - if chapter_list: - print(f"找到 {len(chapter_list)} 个章节") - - # 先检查是否有可以直接使用的MP4链接作为模板 - mp4_template = None - first_mp4_chapter_id = None - - # 先搜索第一个章节的MP4链接 - # 为提高成功率,尝试直接请求第一个章节的播放页 - if chapter_list and len(chapter_list) > 0: - first_chapter = chapter_list[0] - first_chapter_id = first_chapter.get("chapterId", "") - drama_id_clean = vod_id.replace('/drama/', '') - - if first_chapter_id and drama_id_clean: - first_episode_url = f"{self.siteUrl}/episode/{drama_id_clean}/{first_chapter_id}" - print(f"请求第一集播放页: {first_episode_url}") - - first_rsp = self.fetch(first_episode_url, headers=headers) - if first_rsp and first_rsp.status_code == 200: - first_html = first_rsp.text - # 直接从HTML提取MP4链接 - mp4_pattern = r'(https?://[^"\']+\.mp4)' - mp4_matches = re.findall(mp4_pattern, first_html) - if mp4_matches: - mp4_template = mp4_matches[0] - first_mp4_chapter_id = first_chapter_id - print(f"找到MP4链接模板: {mp4_template}") - print(f"模板对应的章节ID: {first_mp4_chapter_id}") - - # 如果未找到模板,再检查章节对象中是否有MP4链接 - if not mp4_template: - for chapter in chapter_list[:5]: # 只检查前5个章节以提高效率 - if "chapterVideoVo" in chapter and chapter["chapterVideoVo"]: - chapter_video = chapter["chapterVideoVo"] - mp4_url = chapter_video.get("mp4", "") or chapter_video.get("mp4720p", "") or chapter_video.get("vodMp4Url", "") - if mp4_url and ".mp4" in mp4_url: - mp4_template = mp4_url - first_mp4_chapter_id = chapter.get("chapterId", "") - print(f"从chapterVideoVo找到MP4链接模板: {mp4_template}") - print(f"模板对应的章节ID: {first_mp4_chapter_id}") - break - - # 遍历所有章节处理播放信息 - for chapter in chapter_list: - chapter_id = chapter.get("chapterId", "") - chapter_name = chapter.get("chapterName", "") - - # 1. 如果章节自身有MP4链接,直接使用 - if "chapterVideoVo" in chapter and chapter["chapterVideoVo"]: - chapter_video = chapter["chapterVideoVo"] - mp4_url = chapter_video.get("mp4", "") or chapter_video.get("mp4720p", "") or chapter_video.get("vodMp4Url", "") - if mp4_url and ".mp4" in mp4_url: - episodes.append(f"{chapter_name}${mp4_url}") - continue - - # 2. 如果有MP4模板,尝试替换章节ID构建MP4链接 - if mp4_template and first_mp4_chapter_id and chapter_id: - # 替换模板中的章节ID部分 - if first_mp4_chapter_id in mp4_template: - new_mp4_url = mp4_template.replace(first_mp4_chapter_id, chapter_id) - episodes.append(f"{chapter_name}${new_mp4_url}") - continue - - # 3. 如果上述方法都不可行,回退到使用chapter_id构建中间URL - if chapter_id and chapter_name: - url = f"{vod_id}${chapter_id}${chapter_name}" - episodes.append(f"{chapter_name}${url}") - - if not episodes and vod_id: - # 尝试构造默认的集数 - total_chapters = int(book_info.get("totalChapterNum", "0")) - if total_chapters > 0: - print(f"尝试构造 {total_chapters} 个默认集数") - - # 如果知道章节ID的模式,可以构造 - if chapter_id and episode_id: - for i in range(1, total_chapters + 1): - chapter_name = f"第{i}集" - url = f"{vod_id}${chapter_id}${chapter_name}" - episodes.append(f"{chapter_name}${url}") - else: - # 使用普通的构造方式 - for i in range(1, total_chapters + 1): - chapter_name = f"第{i}集" - url = f"{vod_id}${chapter_name}" - episodes.append(f"{chapter_name}${url}") - - if episodes: - play_url_list.append("#".join(episodes)) + # 处理剧集 + play_urls = self.processEpisodes(vod_id, chapter_list) + if play_urls: vod['vod_play_from'] = '河马剧场' - vod['vod_play_url'] = '$$$'.join(play_url_list) + vod['vod_play_url'] = '$$$'.join(play_urls) + + result['list'] = [vod] - result = { - 'list': [vod] - } - return result except Exception as e: - print(f"解析详情页失败: {str(e)}") - print(traceback.format_exc()) - return {} + print(f"详情页解析出错: {e}") + traceback.print_exc() + + return result + + def processEpisodes(self, vod_id, chapter_list): + play_urls = [] + episodes = [] + + for chapter in chapter_list: + chapter_id = chapter.get("chapterId", "") + chapter_name = chapter.get("chapterName", "") + + if not chapter_id or not chapter_name: + continue + + # 尝试获取直接视频链接 + video_url = self.getDirectVideoUrl(chapter) + if video_url: + episodes.append(f"{chapter_name}${video_url}") + continue + + # 回退方案 + episodes.append(f"{chapter_name}${vod_id}${chapter_id}${chapter_name}") + + if episodes: + play_urls.append("#".join(episodes)) + + return play_urls + + def getDirectVideoUrl(self, chapter): + if "chapterVideoVo" not in chapter or not chapter["chapterVideoVo"]: + return None + + video_info = chapter["chapterVideoVo"] + for key in ["mp4", "mp4720p", "vodMp4Url"]: + if key in video_info and video_info[key] and ".mp4" in video_info[key].lower(): + return video_info[key] + return None def playerContent(self, flag, id, vipFlags): - result = {} - print(f"调用playerContent: flag={flag}, id={id}") - - headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0", - "Referer": self.siteUrl, - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8", - "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8" + result = { + "parse": 0, + "url": id, + "header": json.dumps(self.headers) } - # 解析id参数 + # 如果已经是视频链接直接返回 + if 'http' in id and ('.mp4' in id or '.m3u8' in id): + return result + + # 解析参数 parts = id.split('$') - drama_id = None - chapter_id = None - - if len(parts) >= 2: - drama_id = parts[0] - chapter_id = parts[1] - chapter_name = parts[2] if len(parts) > 2 else "第一集" - print(f"解析参数: drama_id={drama_id}, chapter_id={chapter_id}") - else: - # 处理旧数据格式 - print(f"使用原始URL格式: {id}") - result["parse"] = 0 - result["url"] = id - result["header"] = json.dumps(headers) + if len(parts) < 2: return result + + drama_id = parts[0].replace('/drama/', '') + chapter_id = parts[1] - # 直接检查chapter_id是否包含http(可能已经是视频链接) - if 'http' in chapter_id and '.mp4' in chapter_id: - print(f"已经是MP4链接: {chapter_id}") - result["parse"] = 0 - result["url"] = chapter_id - result["header"] = json.dumps(headers) - return result - - # 构建episode页面URL - drama_id_clean = drama_id.replace('/drama/', '') - episode_url = f"{self.siteUrl}/episode/{drama_id_clean}/{chapter_id}" - print(f"请求episode页面: {episode_url}") - - try: - rsp = self.fetch(episode_url, headers=headers) - if not rsp or rsp.status_code != 200: - print(f"请求失败,状态码: {getattr(rsp, 'status_code', 'N/A')}") - result["parse"] = 0 - result["url"] = id - result["header"] = json.dumps(headers) - return result + # 尝试获取视频链接 + video_url = self.getEpisodeVideoUrl(drama_id, chapter_id) + if video_url: + result["url"] = video_url - html = rsp.text - print(f"获取页面大小: {len(html)} 字节") - - # 尝试从NEXT_DATA提取视频链接 - mp4_url = None - - # 方法1: 从NEXT_DATA提取 - next_data_match = re.search(r'', html, re.DOTALL) - if next_data_match: - try: - print("找到NEXT_DATA") - next_data = json.loads(next_data_match.group(1)) - page_props = next_data.get("props", {}).get("pageProps", {}) - - # 从chapterList中查找当前章节 - chapter_list = page_props.get("chapterList", []) - print(f"找到章节列表,长度: {len(chapter_list)}") - - for chapter in chapter_list: - if chapter.get("chapterId") == chapter_id: - print(f"找到匹配的章节: {chapter.get('chapterName')}") - chapter_video = chapter.get("chapterVideoVo", {}) - mp4_url = chapter_video.get("mp4", "") or chapter_video.get("mp4720p", "") or chapter_video.get("vodMp4Url", "") - if mp4_url: - print(f"从chapterList找到MP4链接: {mp4_url}") - break - - # 如果未找到,尝试从当前章节获取 - if not mp4_url: - current_chapter = page_props.get("chapterInfo", {}) - if current_chapter: - print("找到当前章节信息") - chapter_video = current_chapter.get("chapterVideoVo", {}) - mp4_url = chapter_video.get("mp4", "") or chapter_video.get("mp4720p", "") or chapter_video.get("vodMp4Url", "") - if mp4_url: - print(f"从chapterInfo找到MP4链接: {mp4_url}") - except Exception as e: - print(f"解析NEXT_DATA失败: {str(e)}") - print(traceback.format_exc()) - - # 方法2: 直接从HTML中提取MP4链接 - if not mp4_url: - mp4_pattern = r'(https?://[^"\']+\.mp4)' - mp4_matches = re.findall(mp4_pattern, html) - if mp4_matches: - # 查找含有chapter_id的链接 - matched_mp4 = False - for url in mp4_matches: - if chapter_id in url: - mp4_url = url - matched_mp4 = True - print(f"从HTML直接提取章节MP4链接: {mp4_url}") - break - - # 如果没找到包含chapter_id的链接,使用第一个 - if not matched_mp4 and mp4_matches: - mp4_url = mp4_matches[0] - print(f"从HTML直接提取MP4链接: {mp4_url}") - - if mp4_url and ".mp4" in mp4_url: - print(f"最终找到的MP4链接: {mp4_url}") - result["parse"] = 0 - result["url"] = mp4_url - result["header"] = json.dumps(headers) - return result - else: - print(f"未找到有效的MP4链接,尝试再次解析页面内容") - # 再尝试一次从HTML中广泛搜索所有可能的MP4链接 - all_mp4_pattern = r'(https?://[^"\']+\.mp4)' - all_mp4_matches = re.findall(all_mp4_pattern, html) - if all_mp4_matches: - mp4_url = all_mp4_matches[0] - print(f"从HTML广泛搜索找到MP4链接: {mp4_url}") - result["parse"] = 0 - result["url"] = mp4_url - result["header"] = json.dumps(headers) - return result - - print(f"未找到视频链接,返回原episode URL: {episode_url}") - result["parse"] = 0 - result["url"] = episode_url - result["header"] = json.dumps(headers) - return result - except Exception as e: - print(f"请求或解析失败: {str(e)}") - print(traceback.format_exc()) - result["parse"] = 0 - result["url"] = id - result["header"] = json.dumps(headers) - return result + return result + def getEpisodeVideoUrl(self, drama_id, chapter_id): + episode_url = f"{self.siteUrl}/episode/{drama_id}/{chapter_id}" + response = self.fetch(episode_url) + if not response: + return None + + html = response.text + + # 方法1: 从NEXT_DATA提取 + next_data_match = re.search(r'', html, re.DOTALL) + if next_data_match: + try: + next_data = json.loads(next_data_match.group(1)) + page_props = next_data.get("props", {}).get("pageProps", {}) + chapter_info = page_props.get("chapterInfo", {}) + + if chapter_info and "chapterVideoVo" in chapter_info: + video_info = chapter_info["chapterVideoVo"] + for key in ["mp4", "mp4720p", "vodMp4Url"]: + if key in video_info and video_info[key] and ".mp4" in video_info[key].lower(): + return video_info[key] + except: + pass + + # 方法2: 直接从HTML提取 + mp4_matches = re.findall(r'(https?://[^"\']+\.mp4)', html) + if mp4_matches: + for url in mp4_matches: + if chapter_id in url or drama_id in url: + return url + return mp4_matches[0] + + return None + def localProxy(self, param): - # 本地代理处理,此处简单返回传入的参数 return [200, "video/MP2T", {}, param] def destroy(self): - # 资源回收 - pass \ No newline at end of file + pass \ No newline at end of file diff --git a/spider.jar b/spider.jar index 9b24e76..685e1fb 100644 Binary files a/spider.jar and b/spider.jar differ