# nas-tools/app/sites/sites.py

import json
import random
import time
from datetime import datetime
from functools import lru_cache
from lxml import etree
from app.conf import SiteConf
from app.helper import ChromeHelper, SiteHelper, DbHelper
from app.message import Message
from app.utils import RequestUtils, StringUtils, ExceptionUtils
from app.utils.commons import singleton
from config import Config


@singleton
class Sites:
    message = None
    dbhelper = None
    _sites = []
    _siteByIds = {}
    _siteByUrls = {}
    _site_favicons = {}
    _rss_sites = []
    _brush_sites = []
    _statistic_sites = []
    _signin_sites = []
    _MAX_CONCURRENCY = 10

    def __init__(self):
        self.init_config()

    def init_config(self):
        self.dbhelper = DbHelper()
        self.message = Message()
        # Raw site list
        self._sites = []
        # Sites indexed by ID
        self._siteByIds = {}
        # Sites indexed by URL domain
        self._siteByUrls = {}
        # Sites with RSS subscription enabled
        self._rss_sites = []
        # Sites with brushing enabled
        self._brush_sites = []
        # Sites with statistics enabled
        self._statistic_sites = []
        # Sites with sign-in enabled
        self._signin_sites = []
        # Site favicons
        self.init_favicons()
        # Site data
        self._sites = self.dbhelper.get_config_site()
        for site in self._sites:
            # Site attributes
            site_note = self.__get_site_note_items(site.NOTE)
            # Site uses: Q = sign-in, D = RSS subscription, S = brushing, T = statistics
            site_rssurl = site.RSSURL
            site_signurl = site.SIGNURL
            site_cookie = site.COOKIE
            site_uses = site.INCLUDE or ''
            uses = []
            if site_uses:
                signin_enable = bool("Q" in site_uses and site_signurl and site_cookie)
                rss_enable = bool("D" in site_uses and site_rssurl)
                brush_enable = bool("S" in site_uses and site_rssurl and site_cookie)
                statistic_enable = bool("T" in site_uses and (site_rssurl or site_signurl) and site_cookie)
                if signin_enable:
                    uses.append("Q")
                if rss_enable:
                    uses.append("D")
                if brush_enable:
                    uses.append("S")
                if statistic_enable:
                    uses.append("T")
            else:
                signin_enable = False
                rss_enable = False
                brush_enable = False
                statistic_enable = False
            site_info = {
                "id": site.ID,
                "name": site.NAME,
                "pri": site.PRI or 0,
                "rssurl": site_rssurl,
                "signurl": site_signurl,
                "cookie": site_cookie,
                "rule": site_note.get("rule"),
                "download_setting": site_note.get("download_setting"),
                "signin_enable": signin_enable,
                "rss_enable": rss_enable,
                "brush_enable": brush_enable,
                "statistic_enable": statistic_enable,
                "uses": uses,
                "ua": site_note.get("ua"),
                "parse": site_note.get("parse") == "Y",
                "unread_msg_notify": site_note.get("message") == "Y",
                "chrome": site_note.get("chrome") == "Y",
                "proxy": site_note.get("proxy") == "Y",
                "subtitle": site_note.get("subtitle") == "Y",
                "strict_url": StringUtils.get_base_url(site_signurl or site_rssurl)
            }
            # Index by ID
            self._siteByIds[site.ID] = site_info
            # Index by domain
            site_strict_url = StringUtils.get_url_domain(site.SIGNURL or site.RSSURL)
            if site_strict_url:
                self._siteByUrls[site_strict_url] = site_info
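
    # A hypothetical example of the flag derivation above: a site row with
    # INCLUDE == "QDT", SIGNURL and COOKIE set but RSSURL empty would end up
    # with uses == ["Q", "T"], since "D" additionally requires RSSURL.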
    def init_favicons(self):
        """
        Load site favicons into memory
        """
        self._site_favicons = {site.SITE: site.FAVICON for site in self.dbhelper.get_site_favicons()}

    def get_sites(self,
                  siteid=None,
                  siteurl=None,
                  rss=False,
                  brush=False,
                  signin=False,
                  statistic=False):
        """
        Get site configuration: returns a single dict when queried by ID or URL,
        otherwise a list filtered by the enabled-feature flags
        """
        if siteid:
            return self._siteByIds.get(int(siteid)) or {}
        if siteurl:
            return self._siteByUrls.get(StringUtils.get_url_domain(siteurl)) or {}
        ret_sites = []
        for site in self._siteByIds.values():
            if rss and not site.get('rss_enable'):
                continue
            if brush and not site.get('brush_enable'):
                continue
            if signin and not site.get('signin_enable'):
                continue
            if statistic and not site.get('statistic_enable'):
                continue
            ret_sites.append(site)
        return ret_sites
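
    # A minimal usage sketch (the site data is hypothetical; Sites() returns
    # the shared singleton instance):
    #   sites = Sites()
    #   rss_sites = sites.get_sites(rss=True)   # only sites with RSS enabled
    #   one_site = sites.get_sites(siteid=1)    # {} if the ID is unknown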

    def get_site_dict(self,
                      rss=False,
                      brush=False,
                      signin=False,
                      statistic=False):
        """
        Get a list of site ID/name dicts
        """
        return [
            {
                "id": site.get("id"),
                "name": site.get("name")
            } for site in self.get_sites(
                rss=rss,
                brush=brush,
                signin=signin,
                statistic=statistic
            )
        ]

    def get_site_names(self,
                       rss=False,
                       brush=False,
                       signin=False,
                       statistic=False):
        """
        Get site names
        """
        return [
            site.get("name") for site in self.get_sites(
                rss=rss,
                brush=brush,
                signin=signin,
                statistic=statistic
            )
        ]

    def get_site_favicon(self, site_name=None):
        """
        Get a site's favicon, or all favicons when no name is given
        """
        if site_name:
            return self._site_favicons.get(site_name)
        return self._site_favicons

    def get_site_download_setting(self, site_name=None):
        """
        Get a site's download setting by site name
        """
        if site_name:
            for site in self._siteByIds.values():
                if site.get("name") == site_name:
                    return site.get("download_setting")
        return None

    def test_connection(self, site_id):
        """
        Test connectivity to a site
        :param site_id: site ID
        :return: connected or not, error message, elapsed time in milliseconds
        """
        site_info = self.get_sites(siteid=site_id)
        if not site_info:
            return False, "Site does not exist", 0
        site_cookie = site_info.get("cookie")
        if not site_cookie:
            return False, "Site cookie not configured", 0
        ua = site_info.get("ua")
        site_url = StringUtils.get_base_url(site_info.get("signurl") or site_info.get("rssurl"))
        if not site_url:
            return False, "Site URL not configured", 0
        chrome = ChromeHelper()
        if site_info.get("chrome") and chrome.get_status():
            # Start timing
            start_time = datetime.now()
            if not chrome.visit(url=site_url, ua=ua, cookie=site_cookie):
                return False, "Chrome emulated visit failed", 0
            # Wait for the Cloudflare challenge to pass
            cloudflare = chrome.pass_cloudflare()
            # Use total_seconds() so elapsed times over one second are not
            # truncated (timedelta.microseconds only holds the sub-second part)
            seconds = int((datetime.now() - start_time).total_seconds() * 1000)
            if not cloudflare:
                return False, "Failed to pass site redirect", seconds
            # Check the login state from the rendered page
            html_text = chrome.get_html()
            if not html_text:
                return False, "Failed to fetch site page source", 0
            if SiteHelper.is_logged_in(html_text):
                return True, "Connection successful", seconds
            return False, "Cookie expired", seconds
        else:
            # Start timing
            start_time = datetime.now()
            res = RequestUtils(cookies=site_cookie,
                               headers=ua,
                               proxies=Config().get_proxies() if site_info.get("proxy") else None
                               ).get_res(url=site_url)
            seconds = int((datetime.now() - start_time).total_seconds() * 1000)
            if res and res.status_code == 200:
                if not SiteHelper.is_logged_in(res.text):
                    return False, "Cookie expired", seconds
                return True, "Connection successful", seconds
            elif res is not None:
                return False, f"Connection failed, status code: {res.status_code}", seconds
            return False, "Unable to open the site", seconds

    def get_site_attr(self, url):
        """
        Merge the attributes of private and public sites for a URL
        """
        site_info = self.get_sites(siteurl=url)
        public_site = self.get_public_sites(url=url)
        if public_site:
            site_info.update(public_site)
        return site_info

    def parse_site_download_url(self, page_url, xpath):
        """
        Parse the download link from a site's detail page
        :param page_url: detail page URL
        :param xpath: the XPath expression, optionally followed by cookie, UA
                      and referer, separated by "|"
        """
        if not page_url or not xpath:
            return ""
        cookie, ua, referer, page_source = None, None, None, None
        xpaths = xpath.split("|")
        xpath = xpaths[0]
        if len(xpaths) > 1:
            cookie = xpaths[1]
        if len(xpaths) > 2:
            ua = xpaths[2]
        if len(xpaths) > 3:
            referer = xpaths[3]
        try:
            site_info = self.get_public_sites(url=page_url)
            if not site_info.get("referer"):
                referer = None
            req = RequestUtils(
                headers=ua,
                cookies=cookie,
                referer=referer,
                proxies=Config().get_proxies() if site_info.get("proxy") else None
            ).get_res(url=page_url)
            if req and req.status_code == 200:
                if req.text:
                    page_source = req.text
            # Parse with XPath
            if page_source:
                html = etree.HTML(page_source)
                urls = html.xpath(xpath)
                if urls:
                    return str(urls[0])
        except Exception as err:
            ExceptionUtils.exception_traceback(err)
        return None
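
    # The compound xpath parameter packs up to four "|"-separated fields; a
    # hypothetical call (all values are illustrative):
    #   Sites().parse_site_download_url(
    #       page_url="https://example.com/detail/123",
    #       xpath='//a[contains(@href, "download")]/@href|uid=1;pass=abc|Mozilla/5.0|https://example.com')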

    @staticmethod
    @lru_cache(maxsize=128)
    def __get_site_page_html(url, cookie, ua, render=False, proxy=False):
        chrome = ChromeHelper(headless=True)
        if render and chrome.get_status():
            # Rendering enabled: load the page in Chrome
            if chrome.visit(url=url, cookie=cookie, ua=ua):
                # Wait for the page to finish loading
                time.sleep(10)
                return chrome.get_html()
        else:
            res = RequestUtils(
                cookies=cookie,
                headers=ua,
                proxies=Config().get_proxies() if proxy else None
            ).get_res(url=url)
            if res and res.status_code == 200:
                res.encoding = res.apparent_encoding
                return res.text
        return ""

    @staticmethod
    def get_grapsite_conf(url):
        """
        Find the RSS_SITE_GRAP_CONF entry matching the given URL
        """
        for k, v in SiteConf.RSS_SITE_GRAP_CONF.items():
            if StringUtils.url_equal(k, url):
                return v
        return {}

    def check_torrent_attr(self, torrent_url, cookie, ua=None, proxy=False):
        """
        Check whether a torrent is free and its current seeder count
        :param torrent_url: torrent detail page URL
        :param cookie: site cookie
        :param ua: site user agent
        :param proxy: whether to use a proxy
        :return: torrent attributes, including FREE, 2XFREE, HR and PEER_COUNT
        """
        ret_attr = {
            "free": False,
            "2xfree": False,
            "hr": False,
            "peer_count": 0
        }
        if not torrent_url:
            return ret_attr
        xpath_strs = self.get_grapsite_conf(torrent_url)
        if not xpath_strs:
            return ret_attr
        html_text = self.__get_site_page_html(url=torrent_url,
                                              cookie=cookie,
                                              ua=ua,
                                              render=xpath_strs.get('RENDER'),
                                              proxy=proxy)
        if not html_text:
            return ret_attr
        try:
            html = etree.HTML(html_text)
            # Detect 2XFREE (the "or []" guards protect against missing
            # config keys, which would otherwise raise on iteration)
            for xpath_str in xpath_strs.get("2XFREE") or []:
                if html.xpath(xpath_str):
                    ret_attr["free"] = True
                    ret_attr["2xfree"] = True
            # Detect FREE
            for xpath_str in xpath_strs.get("FREE") or []:
                if html.xpath(xpath_str):
                    ret_attr["free"] = True
            # Detect HR (hit-and-run)
            for xpath_str in xpath_strs.get("HR") or []:
                if html.xpath(xpath_str):
                    ret_attr["hr"] = True
            # Detect PEER_COUNT (current number of seeders)
            for xpath_str in xpath_strs.get("PEER_COUNT") or []:
                peer_count_dom = html.xpath(xpath_str)
                if peer_count_dom:
                    peer_count_str = ''.join(peer_count_dom[0].itertext())
                    # Keep only the digits, e.g. "123 seeders" -> 123
                    peer_count_digit_str = ''.join(m for m in peer_count_str if m.isdigit())
                    ret_attr["peer_count"] = int(peer_count_digit_str) if peer_count_digit_str else 0
        except Exception as err:
            ExceptionUtils.exception_traceback(err)
        # Sleep a random interval before returning to avoid hammering the site
        time.sleep(round(random.uniform(1, 5), 1))
        return ret_attr
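
    # A hedged usage sketch (the URL and cookie are hypothetical):
    #   attr = Sites().check_torrent_attr(
    #       torrent_url="https://example.com/details.php?id=123",
    #       cookie="uid=1;pass=abc")
    #   if attr.get("free"):
    #       ...  # download carries no ratio cost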

    @staticmethod
    def is_public_site(url):
        """
        Check whether the URL belongs to a public BT site
        """
        _, netloc = StringUtils.get_url_netloc(url)
        return netloc in SiteConf.PUBLIC_TORRENT_SITES

    @staticmethod
    def get_public_sites(url=None):
        """
        Look up a public BT site by URL, or list all public BT sites
        """
        if url:
            _, netloc = StringUtils.get_url_netloc(url)
            return SiteConf.PUBLIC_TORRENT_SITES.get(netloc) or {}
        return SiteConf.PUBLIC_TORRENT_SITES.items()
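
    # Note the return-type asymmetry above: with a URL the method returns a
    # single config dict ({} when unknown); without one it returns the
    # (name, config) items view of all public sites.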

    @staticmethod
    def __get_site_note_items(note):
        """
        Extract site attributes from the NOTE field
        """
        infos = {}
        if note:
            try:
                infos = json.loads(note)
            except Exception as err:
                # Guard against malformed JSON so one bad row cannot abort
                # init_config()
                ExceptionUtils.exception_traceback(err)
        return infos
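
    # The NOTE column is expected to hold a JSON object whose keys are read in
    # init_config(); a hypothetical example value:
    #   {"rule": "1", "download_setting": "1", "ua": "Mozilla/5.0",
    #    "parse": "Y", "message": "Y", "chrome": "N", "proxy": "N", "subtitle": "Y"}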