608 lines
24 KiB
Python
608 lines
24 KiB
Python
import json
|
||
import re
|
||
import time
|
||
from enum import Enum
|
||
|
||
import log
|
||
from app.conf import ModuleConf
|
||
from app.helper import DbHelper, SubmoduleHelper
|
||
from app.message.message_center import MessageCenter
|
||
from app.plugins import EventManager
|
||
from app.utils import StringUtils, ExceptionUtils
|
||
from app.utils.commons import singleton
|
||
from app.utils.types import SearchType, MediaType, EventType
|
||
from config import Config
|
||
from web.backend.web_utils import WebUtils
|
||
|
||
|
||
@singleton
|
||
class Message(object):
|
||
dbhelper = None
|
||
messagecenter = None
|
||
eventmanager = None
|
||
_message_schemas = []
|
||
_active_clients = []
|
||
_active_interactive_clients = {}
|
||
_client_configs = {}
|
||
_domain = None
|
||
|
||
def __init__(self):
|
||
self._message_schemas = SubmoduleHelper.import_submodules(
|
||
'app.message.client',
|
||
filter_func=lambda _, obj: hasattr(obj, 'schema')
|
||
)
|
||
log.debug(f"【Message】加载消息服务:{self._message_schemas}")
|
||
self.init_config()
|
||
|
||
def init_config(self):
|
||
self.dbhelper = DbHelper()
|
||
self.messagecenter = MessageCenter()
|
||
self.eventmanager = EventManager()
|
||
|
||
self._domain = Config().get_domain()
|
||
# 停止旧服务
|
||
if self._active_clients:
|
||
for active_client in self._active_clients:
|
||
if active_client.get("search_type") in self.get_search_types():
|
||
client = active_client.get("client")
|
||
if client and hasattr(client, "stop_service"):
|
||
client.stop_service()
|
||
# 活跃的客户端
|
||
self._active_clients = []
|
||
# 活跃的交互客户端
|
||
self._active_interactive_clients = {}
|
||
# 全量客户端配置
|
||
self._client_configs = {}
|
||
for client_config in self.dbhelper.get_message_client() or []:
|
||
config = json.loads(client_config.CONFIG) if client_config.CONFIG else {}
|
||
config.update({
|
||
"interactive": client_config.INTERACTIVE
|
||
})
|
||
client_conf = {
|
||
"id": client_config.ID,
|
||
"name": client_config.NAME,
|
||
"type": client_config.TYPE,
|
||
"config": config,
|
||
"switchs": json.loads(client_config.SWITCHS) if client_config.SWITCHS else [],
|
||
"interactive": client_config.INTERACTIVE,
|
||
"enabled": client_config.ENABLED
|
||
}
|
||
self._client_configs[str(client_config.ID)] = client_conf
|
||
if not client_config.ENABLED or not config:
|
||
continue
|
||
client = {
|
||
"search_type": ModuleConf.MESSAGE_CONF.get('client').get(client_config.TYPE, {}).get('search_type'),
|
||
"client": self.__build_class(ctype=client_config.TYPE, conf=config)
|
||
}
|
||
client.update(client_conf)
|
||
self._active_clients.append(client)
|
||
if client.get("interactive"):
|
||
self._active_interactive_clients[client.get("search_type")] = client
|
||
|
||
def __build_class(self, ctype, conf):
|
||
for message_schema in self._message_schemas:
|
||
try:
|
||
if message_schema.match(ctype):
|
||
return message_schema(conf)
|
||
except Exception as e:
|
||
ExceptionUtils.exception_traceback(e)
|
||
return None
|
||
|
||
def get_status(self, ctype=None, config=None):
|
||
"""
|
||
测试消息设置状态
|
||
"""
|
||
if not config or not ctype:
|
||
return False
|
||
# 测试状态不启动监听服务
|
||
state, ret_msg = self.__build_class(ctype=ctype,
|
||
conf=config).send_msg(title="测试",
|
||
text="这是一条测试消息",
|
||
url="https://github.com/NAStool/nas-tools")
|
||
if not state:
|
||
log.error(f"【Message】{ctype} 发送测试消息失败:%s" % ret_msg)
|
||
return state
|
||
|
||
def __sendmsg(self, client, title, text="", image="", url="", user_id=""):
|
||
"""
|
||
通用消息发送
|
||
:param client: 消息端
|
||
:param title: 消息标题
|
||
:param text: 消息内容
|
||
:param image: 图片URL
|
||
:param url: 消息跳转地址
|
||
:param user_id: 用户ID,如有则只发给这个用户
|
||
:return: 发送状态、错误信息
|
||
"""
|
||
if not client or not client.get('client'):
|
||
return None
|
||
cname = client.get('name')
|
||
log.info(f"【Message】发送消息 {cname}:title={title}, text={text}")
|
||
if self._domain:
|
||
if url:
|
||
if not url.startswith("http"):
|
||
url = "%s?next=%s" % (self._domain, url)
|
||
else:
|
||
url = self._domain
|
||
else:
|
||
url = ""
|
||
state, ret_msg = client.get('client').send_msg(title=title,
|
||
text=text,
|
||
image=image,
|
||
url=url,
|
||
user_id=user_id)
|
||
if not state:
|
||
log.error(f"【Message】{cname} 消息发送失败:%s" % ret_msg)
|
||
return state
|
||
|
||
def send_channel_msg(self, channel, title, text="", image="", url="", user_id=""):
|
||
"""
|
||
按渠道发送消息,用于消息交互
|
||
:param channel: 消息渠道
|
||
:param title: 消息标题
|
||
:param text: 消息内容
|
||
:param image: 图片URL
|
||
:param url: 消息跳转地址
|
||
:param user_id: 用户ID,如有则只发给这个用户
|
||
:return: 发送状态、错误信息
|
||
"""
|
||
# 插入消息中心
|
||
self.messagecenter.insert_system_message(level="INFO", title=title, content=text)
|
||
# 发送消息
|
||
client = self._active_interactive_clients.get(channel)
|
||
if client:
|
||
state = self.__sendmsg(client=client,
|
||
title=title,
|
||
text=text,
|
||
image=image,
|
||
url=url,
|
||
user_id=user_id)
|
||
return state
|
||
return False
|
||
|
||
def __send_list_msg(self, client, medias, user_id, title):
|
||
"""
|
||
发送选择类消息
|
||
"""
|
||
if not client or not client.get('client'):
|
||
return None
|
||
cname = client.get('name')
|
||
log.info(f"【Message】发送消息 {cname}:title={title}")
|
||
state, ret_msg = client.get('client').send_list_msg(medias=medias,
|
||
user_id=user_id,
|
||
title=title,
|
||
url=self._domain)
|
||
if not state:
|
||
log.error(f"【Message】{cname} 发送消息失败:%s" % ret_msg)
|
||
return state
|
||
|
||
def send_channel_list_msg(self, channel, title, medias: list, user_id=""):
|
||
"""
|
||
发送列表选择消息,用于消息交互
|
||
:param channel: 消息渠道
|
||
:param title: 消息标题
|
||
:param medias: 媒体信息列表
|
||
:param user_id: 用户ID,如有则只发给这个用户
|
||
:return: 发送状态、错误信息
|
||
"""
|
||
client = self._active_interactive_clients.get(channel)
|
||
if client:
|
||
state = self.__send_list_msg(client=client,
|
||
title=title,
|
||
medias=medias,
|
||
user_id=user_id)
|
||
return state
|
||
return False
|
||
|
||
def send_download_message(self, in_from: SearchType, can_item):
|
||
"""
|
||
发送下载的消息
|
||
:param in_from: 下载来源
|
||
:param can_item: 下载的媒体信息
|
||
:return: 发送状态、错误信息
|
||
"""
|
||
msg_title = f"{can_item.get_title_ep_string()} 开始下载"
|
||
msg_text = f"{can_item.get_star_string()}"
|
||
msg_text = f"{msg_text}\n来自:{in_from.value}"
|
||
if can_item.user_name:
|
||
msg_text = f"{msg_text}\n用户:{can_item.user_name}"
|
||
if can_item.site:
|
||
if in_from == SearchType.USERRSS:
|
||
msg_text = f"{msg_text}\n任务:{can_item.site}"
|
||
else:
|
||
msg_text = f"{msg_text}\n站点:{can_item.site}"
|
||
if can_item.get_resource_type_string():
|
||
msg_text = f"{msg_text}\n质量:{can_item.get_resource_type_string()}"
|
||
if can_item.size:
|
||
if str(can_item.size).isdigit():
|
||
size = StringUtils.str_filesize(can_item.size)
|
||
else:
|
||
size = can_item.size
|
||
msg_text = f"{msg_text}\n大小:{size}"
|
||
if can_item.org_string:
|
||
msg_text = f"{msg_text}\n种子:{can_item.org_string}"
|
||
if can_item.seeders:
|
||
msg_text = f"{msg_text}\n做种数:{can_item.seeders}"
|
||
msg_text = f"{msg_text}\n促销:{can_item.get_volume_factor_string()}"
|
||
if can_item.hit_and_run:
|
||
msg_text = f"{msg_text}\nHit&Run:是"
|
||
if can_item.description:
|
||
html_re = re.compile(r'<[^>]+>', re.S)
|
||
description = html_re.sub('', can_item.description)
|
||
can_item.description = re.sub(r'<[^>]+>', '', description)
|
||
msg_text = f"{msg_text}\n描述:{can_item.description}"
|
||
# 插入消息中心
|
||
self.messagecenter.insert_system_message(level="INFO", title=msg_title, content=msg_text)
|
||
# 解发事件
|
||
self.eventmanager.send_event(EventType.DownloadAdd, can_item.to_dict())
|
||
# 发送消息
|
||
for client in self._active_clients:
|
||
if "download_start" in client.get("switchs"):
|
||
self.__sendmsg(
|
||
client=client,
|
||
title=msg_title,
|
||
text=msg_text,
|
||
image=can_item.get_message_image(),
|
||
url='downloading'
|
||
)
|
||
|
||
def send_transfer_movie_message(self, in_from: Enum, media_info, exist_filenum, category_flag):
|
||
"""
|
||
发送转移电影的消息
|
||
:param in_from: 转移来源
|
||
:param media_info: 转移的媒体信息
|
||
:param exist_filenum: 已存在的文件数
|
||
:param category_flag: 二级分类开关
|
||
:return: 发送状态、错误信息
|
||
"""
|
||
msg_title = f"{media_info.get_title_string()} 已入库"
|
||
if media_info.vote_average:
|
||
msg_str = f"{media_info.get_vote_string()},类型:电影"
|
||
else:
|
||
msg_str = "类型:电影"
|
||
if media_info.category:
|
||
if category_flag:
|
||
msg_str = f"{msg_str},类别:{media_info.category}"
|
||
if media_info.get_resource_type_string():
|
||
msg_str = f"{msg_str},质量:{media_info.get_resource_type_string()}"
|
||
msg_str = f"{msg_str},大小:{StringUtils.str_filesize(media_info.size)},来自:{in_from.value}"
|
||
if exist_filenum != 0:
|
||
msg_str = f"{msg_str},{exist_filenum}个文件已存在"
|
||
# 插入消息中心
|
||
self.messagecenter.insert_system_message(level="INFO", title=msg_title, content=msg_str)
|
||
# 解发事件
|
||
self.eventmanager.send_event(EventType.TransferFinished, media_info.to_dict())
|
||
# 发送消息
|
||
for client in self._active_clients:
|
||
if "transfer_finished" in client.get("switchs"):
|
||
self.__sendmsg(
|
||
client=client,
|
||
title=msg_title,
|
||
text=msg_str,
|
||
image=media_info.get_message_image(),
|
||
url='history'
|
||
)
|
||
|
||
def send_transfer_tv_message(self, message_medias: dict, in_from: Enum):
|
||
"""
|
||
发送转移电视剧/动漫的消息
|
||
"""
|
||
for item_info in message_medias.values():
|
||
if item_info.total_episodes == 1:
|
||
msg_title = f"{item_info.get_title_string()} {item_info.get_season_episode_string()} 已入库"
|
||
else:
|
||
msg_title = f"{item_info.get_title_string()} {item_info.get_season_string()} 共{item_info.total_episodes}集 已入库"
|
||
if item_info.vote_average:
|
||
msg_str = f"{item_info.get_vote_string()},类型:{item_info.type.value}"
|
||
else:
|
||
msg_str = f"类型:{item_info.type.value}"
|
||
if item_info.category:
|
||
msg_str = f"{msg_str},类别:{item_info.category}"
|
||
if item_info.total_episodes == 1:
|
||
msg_str = f"{msg_str},大小:{StringUtils.str_filesize(item_info.size)},来自:{in_from.value}"
|
||
else:
|
||
msg_str = f"{msg_str},总大小:{StringUtils.str_filesize(item_info.size)},来自:{in_from.value}"
|
||
# 插入消息中心
|
||
self.messagecenter.insert_system_message(level="INFO", title=msg_title, content=msg_str)
|
||
# 解发事件
|
||
self.eventmanager.send_event(EventType.TransferFinished, item_info.to_dict())
|
||
# 发送消息
|
||
for client in self._active_clients:
|
||
if "transfer_finished" in client.get("switchs"):
|
||
self.__sendmsg(
|
||
client=client,
|
||
title=msg_title,
|
||
text=msg_str,
|
||
image=item_info.get_message_image(),
|
||
url='history')
|
||
|
||
def send_download_fail_message(self, item, error_msg):
|
||
"""
|
||
发送下载失败的消息
|
||
"""
|
||
title = "添加下载任务失败:%s %s" % (item.get_title_string(), item.get_season_episode_string())
|
||
text = f"站点:{item.site}\n种子名称:{item.org_string}\n种子链接:{item.enclosure}\n错误信息:{error_msg}"
|
||
# 插入消息中心
|
||
self.messagecenter.insert_system_message(level="INFO", title=title, content=text)
|
||
# 解发事件
|
||
self.eventmanager.send_event(EventType.DownloadFail, item.to_dict())
|
||
# 发送消息
|
||
for client in self._active_clients:
|
||
if "download_fail" in client.get("switchs"):
|
||
self.__sendmsg(
|
||
client=client,
|
||
title=title,
|
||
text=text,
|
||
image=item.get_message_image()
|
||
)
|
||
|
||
def send_rss_success_message(self, in_from: Enum, media_info):
|
||
"""
|
||
发送订阅成功的消息
|
||
"""
|
||
if media_info.type == MediaType.MOVIE:
|
||
msg_title = f"{media_info.get_title_string()} 已添加订阅"
|
||
else:
|
||
msg_title = f"{media_info.get_title_string()} {media_info.get_season_string()} 已添加订阅"
|
||
msg_str = f"类型:{media_info.type.value}"
|
||
if media_info.vote_average:
|
||
msg_str = f"{msg_str},{media_info.get_vote_string()}"
|
||
msg_str = f"{msg_str},来自:{in_from.value}"
|
||
if media_info.user_name:
|
||
msg_str = f"{msg_str},用户:{media_info.user_name}"
|
||
# 插入消息中心
|
||
self.messagecenter.insert_system_message(level="INFO", title=msg_title, content=msg_str)
|
||
# 解发事件
|
||
self.eventmanager.send_event(EventType.SubscribeAdd, media_info.to_dict())
|
||
# 发送消息
|
||
for client in self._active_clients:
|
||
if "rss_added" in client.get("switchs"):
|
||
self.__sendmsg(
|
||
client=client,
|
||
title=msg_title,
|
||
text=msg_str,
|
||
image=media_info.get_message_image(),
|
||
url='movie_rss' if media_info.type == MediaType.MOVIE else 'tv_rss'
|
||
)
|
||
|
||
def send_rss_finished_message(self, media_info):
|
||
"""
|
||
发送订阅完成的消息,只针对电视剧
|
||
"""
|
||
if media_info.type == MediaType.MOVIE:
|
||
return
|
||
else:
|
||
if media_info.over_edition:
|
||
msg_title = f"{media_info.get_title_string()} {media_info.get_season_string()} 已完成洗版"
|
||
else:
|
||
msg_title = f"{media_info.get_title_string()} {media_info.get_season_string()} 已完成订阅"
|
||
msg_str = f"类型:{media_info.type.value}"
|
||
if media_info.vote_average:
|
||
msg_str = f"{msg_str},{media_info.get_vote_string()}"
|
||
# 插入消息中心
|
||
self.messagecenter.insert_system_message(level="INFO", title=msg_title, content=msg_str)
|
||
# 解发事件
|
||
self.eventmanager.send_event(EventType.SubscribeFinished, media_info.to_dict())
|
||
# 发送消息
|
||
for client in self._active_clients:
|
||
if "rss_finished" in client.get("switchs"):
|
||
self.__sendmsg(
|
||
client=client,
|
||
title=msg_title,
|
||
text=msg_str,
|
||
image=media_info.get_message_image(),
|
||
url='downloaded'
|
||
)
|
||
|
||
def send_site_signin_message(self, msgs: list):
|
||
"""
|
||
发送站点签到消息
|
||
"""
|
||
if not msgs:
|
||
return
|
||
title = "站点签到"
|
||
text = "\n".join(msgs)
|
||
# 插入消息中心
|
||
self.messagecenter.insert_system_message(level="INFO", title=title, content=text)
|
||
# 发送消息
|
||
for client in self._active_clients:
|
||
if "site_signin" in client.get("switchs"):
|
||
self.__sendmsg(
|
||
client=client,
|
||
title=title,
|
||
text=text
|
||
)
|
||
|
||
def send_site_message(self, title=None, text=None):
|
||
"""
|
||
发送站点消息
|
||
"""
|
||
if not title:
|
||
return
|
||
if not text:
|
||
text = ""
|
||
# 插入消息中心
|
||
self.messagecenter.insert_system_message(level="INFO", title=title, content=text)
|
||
# 发送消息
|
||
for client in self._active_clients:
|
||
if "site_message" in client.get("switchs"):
|
||
self.__sendmsg(
|
||
client=client,
|
||
title=title,
|
||
text=text
|
||
)
|
||
|
||
def send_transfer_fail_message(self, path, count, text):
|
||
"""
|
||
发送转移失败的消息
|
||
"""
|
||
if not path or not count:
|
||
return
|
||
title = f"【{count} 个文件入库失败】"
|
||
text = f"源路径:{path}\n原因:{text}"
|
||
# 插入消息中心
|
||
self.messagecenter.insert_system_message(level="INFO", title=title, content=text)
|
||
# 解发事件
|
||
self.eventmanager.send_event(EventType.TransferFail,
|
||
{"path": path, "count": count, "reason": text})
|
||
# 发送消息
|
||
for client in self._active_clients:
|
||
if "transfer_fail" in client.get("switchs"):
|
||
self.__sendmsg(
|
||
client=client,
|
||
title=title,
|
||
text=text,
|
||
url="unidentification"
|
||
)
|
||
|
||
def send_brushtask_remove_message(self, title, text):
|
||
"""
|
||
发送刷流删种的消息
|
||
"""
|
||
if not title or not text:
|
||
return
|
||
# 插入消息中心
|
||
self.messagecenter.insert_system_message(level="INFO", title=title, content=text)
|
||
# 发送消息
|
||
for client in self._active_clients:
|
||
if "brushtask_remove" in client.get("switchs"):
|
||
self.__sendmsg(
|
||
client=client,
|
||
title=title,
|
||
text=text,
|
||
url="brushtask"
|
||
)
|
||
|
||
def send_brushtask_added_message(self, title, text):
|
||
"""
|
||
发送刷流下种的消息
|
||
"""
|
||
if not title or not text:
|
||
return
|
||
# 插入消息中心
|
||
self.messagecenter.insert_system_message(level="INFO", title=title, content=text)
|
||
# 发送消息
|
||
for client in self._active_clients:
|
||
if "brushtask_added" in client.get("switchs"):
|
||
self.__sendmsg(
|
||
client=client,
|
||
title=title,
|
||
text=text,
|
||
url="brushtask"
|
||
)
|
||
|
||
def send_mediaserver_message(self, event_info: dict, channel, image_url):
|
||
"""
|
||
发送媒体服务器的消息
|
||
:param event_info: 事件信息
|
||
:param channel: 服务器类型:
|
||
:param image_url: 图片
|
||
"""
|
||
if not event_info or not channel:
|
||
return
|
||
# 拼装消息内容
|
||
_webhook_actions = {
|
||
"system.webhooktest": "测试",
|
||
"playback.start": "开始播放",
|
||
"playback.stop": "停止播放",
|
||
"user.authenticated": "登录成功",
|
||
"user.authenticationfailed": "登录失败",
|
||
"media.play": "开始播放",
|
||
"media.stop": "停止播放",
|
||
"PlaybackStart": "开始播放",
|
||
"PlaybackStop": "停止播放",
|
||
"item.rate": "标记了",
|
||
}
|
||
_webhook_images = {
|
||
"Emby": "https://emby.media/notificationicon.png",
|
||
"Plex": "https://www.plex.tv/wp-content/uploads/2022/04/new-logo-process-lines-gray.png",
|
||
"Jellyfin": "https://play-lh.googleusercontent.com/SCsUK3hCCRqkJbmLDctNYCfehLxsS4ggD1ZPHIFrrAN1Tn9yhjmGMPep2D9lMaaa9eQi"
|
||
}
|
||
|
||
if not _webhook_actions.get(event_info.get('event')):
|
||
return
|
||
|
||
# 消息标题
|
||
if event_info.get('item_type') == "TV":
|
||
message_title = f"{_webhook_actions.get(event_info.get('event'))}剧集 {event_info.get('item_name')}"
|
||
elif event_info.get('item_type') == "MOV":
|
||
message_title = f"{_webhook_actions.get(event_info.get('event'))}电影 {event_info.get('item_name')}"
|
||
else:
|
||
message_title = f"{_webhook_actions.get(event_info.get('event'))}"
|
||
|
||
# 消息内容
|
||
if {event_info.get('user_name')}:
|
||
message_texts = [f"用户:{event_info.get('user_name')}"]
|
||
if event_info.get('device_name'):
|
||
message_texts.append(f"设备:{event_info.get('client')} {event_info.get('device_name')}")
|
||
if event_info.get('ip'):
|
||
message_texts.append(f"位置:{event_info.get('ip')} {WebUtils.get_location(event_info.get('ip'))}")
|
||
if event_info.get('percentage'):
|
||
percentage = round(float(event_info.get('percentage')), 2)
|
||
message_texts.append(f"进度:{percentage}%")
|
||
if event_info.get('overview'):
|
||
message_texts.append(f"剧情:{event_info.get('overview')}")
|
||
message_texts.append(f"时间:{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))}")
|
||
|
||
# 消息图片
|
||
if not image_url:
|
||
image_url = _webhook_images.get(channel)
|
||
|
||
# 插入消息中心
|
||
message_content = "\n".join(message_texts)
|
||
self.messagecenter.insert_system_message(level="INFO", title=message_title, content=message_content)
|
||
|
||
# 发送消息
|
||
for client in self._active_clients:
|
||
if "mediaserver_message" in client.get("switchs"):
|
||
self.__sendmsg(
|
||
client=client,
|
||
title=message_title,
|
||
text=message_content,
|
||
image=image_url
|
||
)
|
||
|
||
def send_custom_message(self, title, text="", image=""):
|
||
"""
|
||
发送自定义消息
|
||
"""
|
||
if not title:
|
||
return
|
||
# 插入消息中心
|
||
self.messagecenter.insert_system_message(level="INFO", title=title, content=text)
|
||
# 发送消息
|
||
for client in self._active_clients:
|
||
if "custom_message" in client.get("switchs"):
|
||
self.__sendmsg(
|
||
client=client,
|
||
title=title,
|
||
text=text,
|
||
image=image
|
||
)
|
||
|
||
def get_message_client_info(self, cid=None):
|
||
"""
|
||
获取消息端信息
|
||
"""
|
||
if cid:
|
||
return self._client_configs.get(str(cid))
|
||
return self._client_configs
|
||
|
||
def get_interactive_client(self, client_type=None):
|
||
"""
|
||
查询当前可以交互的渠道
|
||
"""
|
||
if client_type:
|
||
return self._active_interactive_clients.get(client_type)
|
||
else:
|
||
return [client for client in self._active_interactive_clients.values()]
|
||
|
||
@staticmethod
|
||
def get_search_types():
|
||
"""
|
||
查询可交互的渠道
|
||
"""
|
||
return [info.get("search_type")
|
||
for info in ModuleConf.MESSAGE_CONF.get('client').values()
|
||
if info.get('search_type')]
|