325 lines
10 KiB
Python
325 lines
10 KiB
Python
import datetime
|
|
import os
|
|
import platform
|
|
import shutil
|
|
import subprocess
|
|
|
|
from app.utils.path_utils import PathUtils
|
|
from app.utils.exception_utils import ExceptionUtils
|
|
from app.utils.types import OsType
|
|
from config import WEBDRIVER_PATH
|
|
|
|
|
|
class SystemUtils:
|
|
|
|
@staticmethod
|
|
def __get_hidden_shell():
|
|
if os.name == "nt":
|
|
st = subprocess.STARTUPINFO()
|
|
st.dwFlags = subprocess.STARTF_USESHOWWINDOW
|
|
st.wShowWindow = subprocess.SW_HIDE
|
|
return st
|
|
else:
|
|
return None
|
|
|
|
@staticmethod
|
|
def get_used_of_partition(path):
|
|
"""
|
|
获取系统存储空间占用信息
|
|
"""
|
|
if not path:
|
|
return 0, 0
|
|
if not os.path.exists(path):
|
|
return 0, 0
|
|
try:
|
|
total_b, used_b, free_b = shutil.disk_usage(path)
|
|
return used_b, total_b
|
|
except Exception as e:
|
|
ExceptionUtils.exception_traceback(e)
|
|
return 0, 0
|
|
|
|
@staticmethod
|
|
def get_system():
|
|
"""
|
|
获取操作系统类型
|
|
"""
|
|
if SystemUtils.is_windows():
|
|
return OsType.WINDOWS
|
|
elif SystemUtils.is_synology():
|
|
return OsType.SYNOLOGY
|
|
elif SystemUtils.is_docker():
|
|
return OsType.DOCKER
|
|
elif SystemUtils.is_macos():
|
|
return OsType.MACOS
|
|
else:
|
|
return OsType.LINUX
|
|
|
|
@staticmethod
|
|
def get_free_space_gb(folder):
|
|
"""
|
|
计算目录剩余空间大小
|
|
"""
|
|
total_b, used_b, free_b = shutil.disk_usage(folder)
|
|
return free_b / 1024 / 1024 / 1024
|
|
|
|
@staticmethod
|
|
def get_local_time(utc_time_str):
|
|
"""
|
|
通过UTC的时间字符串获取时间
|
|
"""
|
|
try:
|
|
utc_date = datetime.datetime.strptime(utc_time_str.replace('0000', ''), '%Y-%m-%dT%H:%M:%S.%fZ')
|
|
local_date = utc_date + datetime.timedelta(hours=8)
|
|
local_date_str = datetime.datetime.strftime(local_date, '%Y-%m-%d %H:%M:%S')
|
|
except Exception as e:
|
|
ExceptionUtils.exception_traceback(e)
|
|
return utc_time_str
|
|
return local_date_str
|
|
|
|
@staticmethod
|
|
def check_process(pname):
|
|
"""
|
|
检查进程序是否存在
|
|
"""
|
|
if not pname:
|
|
return False
|
|
text = subprocess.Popen('ps -ef | grep -v grep | grep %s' % pname, shell=True).communicate()
|
|
return True if text else False
|
|
|
|
@staticmethod
|
|
def execute(cmd):
|
|
"""
|
|
执行命令,获得返回结果
|
|
"""
|
|
try:
|
|
with os.popen(cmd) as p:
|
|
return p.readline().strip()
|
|
except Exception as err:
|
|
print(str(err))
|
|
return ""
|
|
|
|
@staticmethod
|
|
def is_docker():
|
|
return os.path.exists('/.dockerenv')
|
|
|
|
@staticmethod
|
|
def is_synology():
|
|
if SystemUtils.is_windows():
|
|
return False
|
|
return True if "synology" in SystemUtils.execute('uname -a') else False
|
|
|
|
@staticmethod
|
|
def is_windows():
|
|
return True if os.name == "nt" else False
|
|
|
|
@staticmethod
|
|
def is_macos():
|
|
return True if platform.system() == 'Darwin' else False
|
|
|
|
@staticmethod
|
|
def is_lite_version():
|
|
return True if SystemUtils.is_docker() \
|
|
and os.environ.get("NASTOOL_VERSION") == "lite" else False
|
|
|
|
@staticmethod
|
|
def get_webdriver_path():
|
|
if SystemUtils.is_lite_version():
|
|
return None
|
|
else:
|
|
return WEBDRIVER_PATH.get(SystemUtils.get_system().value)
|
|
|
|
@staticmethod
|
|
def copy(src, dest):
|
|
"""
|
|
复制
|
|
"""
|
|
try:
|
|
shutil.copy2(os.path.normpath(src), os.path.normpath(dest))
|
|
return 0, ""
|
|
except Exception as err:
|
|
ExceptionUtils.exception_traceback(err)
|
|
return -1, str(err)
|
|
|
|
@staticmethod
|
|
def move(src, dest):
|
|
"""
|
|
移动
|
|
"""
|
|
try:
|
|
tmp_file = os.path.normpath(os.path.join(os.path.dirname(src),
|
|
os.path.basename(dest)))
|
|
shutil.move(os.path.normpath(src), tmp_file)
|
|
shutil.move(tmp_file, os.path.normpath(dest))
|
|
return 0, ""
|
|
except Exception as err:
|
|
ExceptionUtils.exception_traceback(err)
|
|
return -1, str(err)
|
|
|
|
@staticmethod
|
|
def link(src, dest):
|
|
"""
|
|
硬链接
|
|
"""
|
|
try:
|
|
if platform.release().find("-z4-") >= 0:
|
|
# 兼容极空间Z4
|
|
tmp = os.path.normpath(os.path.join(PathUtils.get_parent_paths(dest, 2),
|
|
os.path.basename(dest)))
|
|
os.link(os.path.normpath(src), tmp)
|
|
shutil.move(tmp, os.path.normpath(dest))
|
|
else:
|
|
os.link(os.path.normpath(src), os.path.normpath(dest))
|
|
return 0, ""
|
|
except Exception as err:
|
|
ExceptionUtils.exception_traceback(err)
|
|
return -1, str(err)
|
|
|
|
@staticmethod
|
|
def softlink(src, dest):
|
|
"""
|
|
软链接
|
|
"""
|
|
try:
|
|
os.symlink(os.path.normpath(src), os.path.normpath(dest))
|
|
return 0, ""
|
|
except Exception as err:
|
|
ExceptionUtils.exception_traceback(err)
|
|
return -1, str(err)
|
|
|
|
@staticmethod
|
|
def rclone_move(src, dest):
|
|
"""
|
|
Rclone移动
|
|
"""
|
|
try:
|
|
src = os.path.normpath(src)
|
|
dest = dest.replace("\\", "/")
|
|
retcode = subprocess.run(['rclone', 'moveto',
|
|
src,
|
|
f'NASTOOL:{dest}'],
|
|
startupinfo=SystemUtils.__get_hidden_shell()).returncode
|
|
return retcode, ""
|
|
except Exception as err:
|
|
ExceptionUtils.exception_traceback(err)
|
|
return -1, str(err)
|
|
|
|
@staticmethod
|
|
def rclone_copy(src, dest):
|
|
"""
|
|
Rclone复制
|
|
"""
|
|
try:
|
|
src = os.path.normpath(src)
|
|
dest = dest.replace("\\", "/")
|
|
retcode = subprocess.run(['rclone', 'copyto',
|
|
src,
|
|
f'NASTOOL:{dest}'],
|
|
startupinfo=SystemUtils.__get_hidden_shell()).returncode
|
|
return retcode, ""
|
|
except Exception as err:
|
|
ExceptionUtils.exception_traceback(err)
|
|
return -1, str(err)
|
|
|
|
@staticmethod
|
|
def minio_move(src, dest):
|
|
"""
|
|
Minio移动
|
|
"""
|
|
try:
|
|
src = os.path.normpath(src)
|
|
dest = dest.replace("\\", "/")
|
|
if dest.startswith("/"):
|
|
dest = dest[1:]
|
|
retcode = subprocess.run(['mc', 'mv',
|
|
'--recursive',
|
|
src,
|
|
f'NASTOOL/{dest}'],
|
|
startupinfo=SystemUtils.__get_hidden_shell()).returncode
|
|
return retcode, ""
|
|
except Exception as err:
|
|
ExceptionUtils.exception_traceback(err)
|
|
return -1, str(err)
|
|
|
|
@staticmethod
|
|
def minio_copy(src, dest):
|
|
"""
|
|
Minio复制
|
|
"""
|
|
try:
|
|
src = os.path.normpath(src)
|
|
dest = dest.replace("\\", "/")
|
|
if dest.startswith("/"):
|
|
dest = dest[1:]
|
|
retcode = subprocess.run(['mc', 'cp',
|
|
'--recursive',
|
|
src,
|
|
f'NASTOOL/{dest}'],
|
|
startupinfo=SystemUtils.__get_hidden_shell()).returncode
|
|
return retcode, ""
|
|
except Exception as err:
|
|
ExceptionUtils.exception_traceback(err)
|
|
return -1, str(err)
|
|
|
|
@staticmethod
|
|
def get_windows_drives():
|
|
"""
|
|
获取Windows所有盘符
|
|
"""
|
|
vols = []
|
|
for i in range(65, 91):
|
|
vol = chr(i) + ':'
|
|
if os.path.isdir(vol):
|
|
vols.append(vol)
|
|
return vols
|
|
|
|
def find_hardlinks(self, file, fdir=None):
|
|
"""
|
|
查找文件的所有硬链接
|
|
"""
|
|
ret_files = []
|
|
if os.name == "nt":
|
|
ret = subprocess.run(
|
|
['fsutil', 'hardlink', 'list', file],
|
|
startupinfo=self.__get_hidden_shell(),
|
|
stdout=subprocess.PIPE
|
|
)
|
|
if ret.returncode != 0:
|
|
return []
|
|
if ret.stdout:
|
|
drive = os.path.splitdrive(file)[0]
|
|
link_files = ret.stdout.decode('GBK').replace('\\', '/').split('\r\n')
|
|
for link_file in link_files:
|
|
if link_file \
|
|
and "$RECYCLE.BIN" not in link_file \
|
|
and os.path.normpath(file) != os.path.normpath(f'{drive}{link_file}'):
|
|
link_file = f'{drive.upper()}{link_file}'
|
|
file_name = os.path.basename(link_file)
|
|
file_path = os.path.dirname(link_file)
|
|
ret_files.append({
|
|
"file": link_file,
|
|
"filename": file_name,
|
|
"filepath": file_path
|
|
})
|
|
else:
|
|
inode = os.stat(file).st_ino
|
|
if not fdir:
|
|
fdir = os.path.dirname(file)
|
|
stdout = subprocess.run(
|
|
['find', fdir, '-inum', str(inode)],
|
|
stdout=subprocess.PIPE
|
|
).stdout
|
|
if stdout:
|
|
link_files = stdout.decode('utf-8').split('\n')
|
|
for link_file in link_files:
|
|
if link_file \
|
|
and os.path.normpath(file) != os.path.normpath(link_file):
|
|
file_name = os.path.basename(link_file)
|
|
file_path = os.path.dirname(link_file)
|
|
ret_files.append({
|
|
"file": link_file,
|
|
"filename": file_name,
|
|
"filepath": file_path
|
|
})
|
|
|
|
return ret_files
|