feat(api): 添加多个影视资源接口

新增云速影视、绝对影视、界影视和柯南影视四个资源接口,
支持搜索和筛选功能,丰富了影视内容来源。
This commit is contained in:
Wang.Luo 2025-09-20 03:27:01 +08:00
parent fdc95360b4
commit 71943e4ea6
4 changed files with 582 additions and 0 deletions

View File

@ -1348,6 +1348,42 @@
"quickSearch": 1,
"filterable": 1
},
{
"key": "云速影视",
"name": "云速|影视",
"type": 3,
"api": "./py/云速影视.py",
"searchable": 1,
"quickSearch": 1,
"filterable": 1
},
{
"key": "绝对影视",
"name": "绝对|影视",
"type": 3,
"api": "./py/绝对影视.py",
"searchable": 1,
"quickSearch": 1,
"filterable": 1
},
{
"key": "界影视",
"name": "界|影视",
"type": 3,
"api": "./py/界影视.py",
"searchable": 1,
"quickSearch": 1,
"filterable": 1
},
{
"key": "柯南影视",
"name": "柯南|影视",
"type": 3,
"api": "./PyramidStore/plugin/html/柯南影视.py",
"searchable": 1,
"quickSearch": 1,
"filterable": 1
},
{
"comment": "自定义接口结束",
"key": "柚子资源",

219
py/云速影视.py Normal file
View File

@ -0,0 +1,219 @@
import re
import sys
from Crypto.Hash import MD5
sys.path.append("..")
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from urllib.parse import quote, urlparse
from base64 import b64encode, b64decode
import json
import time
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
self.host = self.gethost()
self.did=self.getdid()
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def action(self, action):
pass
def destroy(self):
pass
def homeContent(self, filter):
data = self.getdata("/api.php/getappapi.index/initV119")
dy = {"class": "类型", "area": "地区", "lang": "语言", "year": "年份", "letter": "字母", "by": "排序",
"sort": "排序"}
filters = {}
classes = []
json_data = data["type_list"]
homedata = data["banner_list"][8:]
for item in json_data:
if item["type_name"] == "全部":
continue
has_non_empty_field = False
jsontype_extend = json.loads(item["type_extend"])
homedata.extend(item["recommend_list"])
jsontype_extend["sort"] = "最新,最热,最赞"
classes.append({"type_name": item["type_name"], "type_id": item["type_id"]})
for key in dy:
if key in jsontype_extend and jsontype_extend[key].strip() != "":
has_non_empty_field = True
break
if has_non_empty_field:
filters[str(item["type_id"])] = []
for dkey in jsontype_extend:
if dkey in dy and jsontype_extend[dkey].strip() != "":
values = jsontype_extend[dkey].split(",")
value_array = [{"n": value.strip(), "v": value.strip()} for value in values if
value.strip() != ""]
filters[str(item["type_id"])].append({"key": dkey, "name": dy[dkey], "value": value_array})
result = {}
result["class"] = classes
result["filters"] = filters
result["list"] = homedata[1:]
return result
def homeVideoContent(self):
pass
def categoryContent(self, tid, pg, filter, extend):
body = {"area": extend.get('area', '全部'), "year": extend.get('year', '全部'), "type_id": tid, "page": pg,
"sort": extend.get('sort', '最新'), "lang": extend.get('lang', '全部'),
"class": extend.get('class', '全部')}
result = {}
data = self.getdata("/api.php/getappapi.index/typeFilterVodList", body)
result["list"] = data["recommend_list"]
result["page"] = pg
result["pagecount"] = 9999
result["limit"] = 90
result["total"] = 999999
return result
def detailContent(self, ids):
body = f"vod_id={ids[0]}"
data = self.getdata("/api.php/getappapi.index/vodDetail", body)
vod = data["vod"]
play = []
names = []
for itt in data["vod_play_list"]:
a = []
names.append(itt["player_info"]["show"])
for it in itt['urls']:
it['user_agent'] = itt["player_info"].get("user_agent")
it["parse"] = itt["player_info"].get("parse")
a.append(f"{it['name']}${self.e64(json.dumps(it))}")
play.append("#".join(a))
vod["vod_play_from"] = "$$$".join(names)
vod["vod_play_url"] = "$$$".join(play)
result = {"list": [vod]}
return result
def searchContent(self, key, quick, pg="1"):
body = f"keywords={key}&type_id=0&page={pg}"
data = self.getdata("/api.php/getappapi.index/searchList", body)
result = {"list": data["search_list"], "page": pg}
return result
def playerContent(self, flag, id, vipFlags):
ids = json.loads(self.d64(id))
h = {"User-Agent": (ids['user_agent'] or "okhttp/3.14.9")}
try:
if re.search(r'url=', ids['parse_api_url']):
data = self.fetch(ids['parse_api_url'], headers=h, timeout=10).json()
url = data.get('url') or data['data'].get('url')
else:
body = f"parse_api={ids.get('parse') or ids['parse_api_url'].replace(ids['url'], '')}&url={quote(self.aes(ids['url'], True))}&token={ids.get('token')}"
b = self.getdata("/api.php/getappapi.index/vodParse", body)['json']
url = json.loads(b)['url']
if 'error' in url: raise ValueError(f"解析失败: {url}")
p = 0
except Exception as e:
print('错误信息:', e)
url, p = ids['url'], 1
if re.search(r'\.jpg|\.png|\.jpeg', url):
url = self.Mproxy(url)
result = {}
result["parse"] = p
result["url"] = url
result["header"] = h
return result
def localProxy(self, param):
return self.Mlocal(param)
def gethost(self):
headers = {
'User-Agent': 'okhttp/3.14.9'
}
response = self.fetch('https://jingyu-1312635929.cos.ap-nanjing.myqcloud.com/1.json',headers=headers).text
return response.strip()
def getdid(self):
did=self.getCache('did')
if not did:
t = str(int(time.time()))
did = self.md5(t)
self.setCache('did', did)
return did
def aes(self, text, b=None):
key = b"4d83b87c4c5ea111"
cipher = AES.new(key, AES.MODE_CBC, key)
if b:
ct_bytes = cipher.encrypt(pad(text.encode("utf-8"), AES.block_size))
ct = b64encode(ct_bytes).decode("utf-8")
return ct
else:
pt = unpad(cipher.decrypt(b64decode(text)), AES.block_size)
return pt.decode("utf-8")
def header(self):
t = str(int(time.time()))
header = {"Referer": self.host,
"User-Agent": "okhttp/3.14.9", "app-version-code": "300", "app-ui-mode": "light",
"app-api-verify-time": t, "app-user-device-id": self.did,
"app-api-verify-sign": self.aes(t, True),
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"}
return header
def getdata(self, path, data=None):
vdata = self.post(f"{self.host}{path}", headers=self.header(), data=data, timeout=10).json()['data']
data1 = self.aes(vdata)
return json.loads(data1)
def Mproxy(self, url):
return f"{self.getProxyUrl()}&url={self.e64(url)}&type=m3u8"
def Mlocal(self, param, header=None):
url = self.d64(param["url"])
ydata = self.fetch(url, headers=header, allow_redirects=False)
data = ydata.content.decode('utf-8')
if ydata.headers.get('Location'):
url = ydata.headers['Location']
data = self.fetch(url, headers=header).content.decode('utf-8')
parsed_url = urlparse(url)
durl = parsed_url.scheme + "://" + parsed_url.netloc
lines = data.strip().split('\n')
for index, string in enumerate(lines):
if '#EXT' not in string and 'http' not in string:
last_slash_index = string.rfind('/')
lpath = string[:last_slash_index + 1]
lines[index] = durl + ('' if lpath.startswith('/') else '/') + lpath
data = '\n'.join(lines)
return [200, "application/vnd.apple.mpegur", data]
def e64(self, text):
try:
text_bytes = text.encode('utf-8')
encoded_bytes = b64encode(text_bytes)
return encoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64编码错误: {str(e)}")
return ""
def d64(self, encoded_text):
try:
encoded_bytes = encoded_text.encode('utf-8')
decoded_bytes = b64decode(encoded_bytes)
return decoded_bytes.decode('utf-8')
except Exception as e:
print(f"Base64解码错误: {str(e)}")
return ""
def md5(self, text):
h = MD5.new()
h.update(text.encode('utf-8'))
return h.hexdigest()

180
py/界影视.py Normal file
View File

@ -0,0 +1,180 @@
# -*- coding: utf-8 -*-
# @Author : Doubebly
# @Time : 2025/1/21 23:07
import hashlib
import re
import sys
import time
import requests
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def getName(self):
return "JieYingShi"
def init(self, extend):
self.home_url = 'https://www.hkybqufgh.com'
self.error_url = 'https://json.doube.eu.org/error/4gtv/index.m3u8'
self.headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
}
def getDependence(self):
return []
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def homeContent(self, filter):
return {'class': [
{
'type_id': '1',
'type_name': '电影'
},
{
'type_id': '2',
'type_name': '电视剧'
},
{
'type_id': '4',
'type_name': '动漫'
},
{
'type_id': '3',
'type_name': '综艺'
}
]}
def homeVideoContent(self):
a = self.get_data(self.home_url)
return {'list': a, 'parse': 0, 'jx': 0}
def categoryContent(self, cid, page, filter, ext):
url = self.home_url + f'/vod/show/id/{cid}/page/{page}'
data = self.get_data(url)
return {'list': data, 'parse': 0, 'jx': 0}
def detailContent(self, did):
ids = did[0]
data = self.get_detail_data(ids)
return {"list": data, 'parse': 0, 'jx': 0}
def searchContent(self, key, quick, page='1'):
if int(page) > 1:
return {'list': [], 'parse': 0, 'jx': 0}
url = self.home_url + f'/vod/search/{key}'
data = self.get_data(url)
return {'list': data, 'parse': 0, 'jx': 0}
def playerContent(self, flag, pid, vipFlags):
url = self.get_play_data(pid)
return {"url": url, "header": self.headers, "parse": 1, "jx": 0}
def localProxy(self, params):
pass
def destroy(self):
return '正在Destroy'
def get_data(self, url):
data = []
try:
res = requests.get(url, headers=self.headers)
if res.status_code != 200:
return data
vod_id_s = re.findall(r'\\"vodId\\":(.*?),', res.text)
vod_name_s = re.findall(r'\\"vodName\\":\\"(.*?)\\"', res.text)
vod_pic_s = re.findall(r'\\"vodPic\\":\\"(.*?)\\"', res.text)
vod_remarks_s = re.findall(r'\\"vodRemarks\\":\\"(.*?)\\"', res.text)
for i in range(len(vod_id_s)):
data.append(
{
'vod_id': vod_id_s[i],
'vod_name': vod_name_s[i],
'vod_pic': vod_pic_s[i],
'vod_remarks': vod_remarks_s[i],
}
)
except requests.RequestException as e:
print(e)
return data
def get_detail_data(self, ids):
url = self.home_url + f'/api/mw-movie/anonymous/video/detail?id={ids}'
t = str(int(time.time() * 1000))
headers = self.get_headers(t, f'id={ids}&key=cb808529bae6b6be45ecfab29a4889bc&t={t}')
try:
res = requests.get(url, headers=headers)
if res.status_code != 200:
return []
i = res.json()['data']
urls = []
for ii in res.json()['data']['episodeList']:
name = ii['name']
url = ii['nid']
urls.append(f'{name}${ids}-{url}')
data = {
'type_name': i['vodClass'],
'vod_id': i['vodId'],
'vod_name': i['vodName'],
'vod_remarks': i['vodRemarks'],
'vod_year': i['vodYear'],
'vod_area': i['vodArea'],
'vod_actor': i['vodActor'],
'vod_director': i['vodDirector'],
'vod_content': i['vodContent'],
'vod_play_from': '默认',
'vod_play_url': '#'.join(urls),
}
return [data]
except requests.RequestException as e:
print(e)
return []
def get_play_data(self, play):
info = play.split('-')
_id = info[0]
_pid = info[1]
url = self.home_url + f'/api/mw-movie/anonymous/v2/video/episode/url?id={_id}&nid={_pid}'
t = str(int(time.time() * 1000))
headers = self.get_headers(t, f'id={_id}&nid={_pid}&key=cb808529bae6b6be45ecfab29a4889bc&t={t}')
try:
res = requests.get(url, headers=headers)
if res.status_code != 200:
return self.error_url
return res.json()['data']['list'][0]['url']
except requests.RequestException as e:
print(e)
return self.error_url
@staticmethod
def get_headers(t, e):
sign = hashlib.sha1(hashlib.md5(e.encode()).hexdigest().encode()).hexdigest()
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
'Accept': 'application/json, text/plain, */*',
'sign': sign,
'sec-ch-ua': '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
't': t,
'referer': 'https://www.hkybqufgh.com/',
}
return headers
if __name__ == '__main__':
pass

147
py/绝对影视.py Normal file
View File

@ -0,0 +1,147 @@
# -*- coding: utf-8 -*-
# by @嗷呜
import base64
import re
import sys
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from pyquery import PyQuery as pq
sys.path.append('..')
from base.spider import Spider
class Spider(Spider):
def init(self, extend=""):
pass
def getName(self):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def destroy(self):
pass
host = 'https://www.jdys.art'
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36',
'sec-ch-ua-platform': '"macOS"',
'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="134", "Google Chrome";v="134"',
'dnt': '1',
'sec-ch-ua-mobile': '?0',
'origin': host,
'sec-fetch-site': 'cross-site',
'sec-fetch-mode': 'cors',
'sec-fetch-dest': 'empty',
'referer': f'{host}/',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
'priority': 'u=1, i',
}
def homeContent(self, filter):
data = self.getpq(self.fetch(self.host, headers=self.headers).text)
result = {}
classes = []
for k in list(data('.navtop .navlist li').items())[:9]:
classes.append({
'type_name': k('a').text(),
'type_id': k('a').attr('href'),
})
result['class'] = classes
result['list'] = self.getlist(data('.mi_btcon .bt_img ul li'))
return result
def homeVideoContent(self):
pass
def categoryContent(self, tid, pg, filter, extend):
data = self.getpq(self.fetch(f"{tid}{'' if pg == '1' else f'page/{pg}/'}", headers=self.headers).text)
result = {}
result['list'] = self.getlist(data('.mi_cont .bt_img ul li'))
result['page'] = pg
result['pagecount'] = 9999
result['limit'] = 90
result['total'] = 999999
return result
def detailContent(self, ids):
data = self.getpq(self.fetch(ids[0], headers=self.headers).text)
data2 = data('.moviedteail_list li')
vod = {
'vod_name': data('.dytext h1').text(),
'type_name': data2.eq(0).text(),
'vod_year': data2.eq(2).text(),
'vod_area': data2.eq(1).text(),
'vod_remarks': data2.eq(4).text(),
'vod_actor': data2.eq(7).text(),
'vod_director': data2.eq(5).text(),
'vod_content': data('.yp_context').text().strip()
}
vdata = data('.paly_list_btn a')
play = []
for i in vdata.items():
a = i.text() + "$" + i.attr.href
play.append(a)
vod["vod_play_from"] = "在线播放"
vod["vod_play_url"] = "#".join(play)
result = {"list": [vod]}
return result
def searchContent(self, key, quick, pg="1"):
data = self.getpq(self.fetch(f"{self.host}/page/{pg}/?s={key}", headers=self.headers).text)
return {'list': self.getlist(data('.mi_cont .bt_img ul li')), 'page': pg}
def playerContent(self, flag, id, vipFlags):
data = self.getpq(self.fetch(id, headers=self.headers).text)
try:
sc = data('.videoplay script').eq(-1).text()
strd = re.findall(r'var\s+[^=]*=\s*"([^"]*)";', sc)
kdata = re.findall(r'parse\((.*?)\);', sc)
jm = self.aes(strd[0], kdata[0].replace('"', ''), kdata[1].replace('"', ''))
url = re.search(r'url: "(.*?)"', jm).group(1)
p = 0
except:
p = 1
url = id
result = {}
result["parse"] = p
result["url"] = url
result["header"] = self.headers
return result
def localProxy(self, param):
pass
def getpq(self, text):
try:
return pq(text)
except Exception as e:
print(f"{str(e)}")
return pq(text.encode('utf-8'))
def getlist(self, data):
videos = []
for i in data.items():
videos.append({
'vod_id': i('a').attr('href'),
'vod_name': i('a img').attr('alt'),
'vod_pic': i('a img').attr('src'),
'vod_remarks': i('.dycategory').text(),
'vod_year': i('.dyplayinfo').text() or i('.rating').text(),
})
return videos
def aes(self, word, key, iv):
key = key.encode('utf-8')
iv = iv.encode('utf-8')
encrypted_data = base64.b64decode(word)
cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted_data = cipher.decrypt(encrypted_data)
decrypted_data = unpad(decrypted_data, AES.block_size)
return decrypted_data.decode('utf-8')