Python file opener VSI plugin: read-multi-range support (#3117)

* add multi-range-read for VSIPlugin

* Update tests/test_pyopener.py

* Add and use abstract base classes in the opener implementation

* Silence flake8

---------

Co-authored-by: Sean Gillies <sean.gillies@gmail.com>
This commit is contained in:
Vincent Sarago 2024-07-31 00:30:52 +02:00 committed by GitHub
parent 9e643c3f56
commit 6e68bbb601
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 155 additions and 40 deletions

View File

@ -3,8 +3,11 @@
Based on _filepath.pyx.
"""
from abc import ABC, abstractmethod
from collections.abc import Callable
import contextlib
from contextvars import ContextVar
from functools import singledispatch
import logging
import os
from pathlib import Path
@ -277,6 +280,25 @@ cdef size_t pyopener_read(void *pFile, void *pBuffer, size_t nSize, size_t nCoun
return <size_t>(num_bytes / nSize)
cdef int pyopener_read_multi_range(void *pFile, int nRanges, void **ppData, vsi_l_offset *panOffsets, size_t *panSizes) except -1 with gil:
cdef object file_obj = <object>pFile
if not hasattr(file_obj, "get_byte_ranges"):
errmsg = "MultiRangeRead not implemented for Opener".encode("utf-8")
CPLError(CE_Failure, <CPLErrorNum>1, <const char *>"%s", <const char *>errmsg)
return -1
# NOTE: Convert panOffsets and panSizes to Python lists
cdef list offsets = [int(panOffsets[i]) for i in range(nRanges)]
cdef list sizes = [int(panSizes[i]) for i in range(nRanges)]
# NOTE: Call the Python method with the converted arguments
cdef list python_data = file_obj.get_byte_ranges(offsets, sizes)
for i in range(nRanges):
memcpy(ppData[i], <void*><char*>python_data[i], len(python_data[i]))
return 0
cdef size_t pyopener_write(void *pFile, void *pBuffer, size_t nSize, size_t nCount) with gil:
if pBuffer == NULL:
return -1
@ -351,8 +373,17 @@ def _opener_registration(urlpath, obj):
namespace = f"{VSI_NS_ROOT}_{kid}"
cdef bytes prefix_bytes = f"/{namespace}/".encode("utf-8")
# Might raise.
opener = _create_opener(obj)
opener = to_pyopener(obj)
# Before returning we do a quick check that the opener will
# plausibly function.
try:
_ = opener.size("test")
except (AttributeError, TypeError, ValueError) as err:
raise OpenerRegistrationError(f"Opener is invalid.") from err
except Exception:
# We expect the path to not resolve.
pass
registry = _OPENER_REGISTRY.get({})
@ -388,7 +419,12 @@ def _opener_registration(urlpath, obj):
callbacks_struct.read_dir = <VSIFilesystemPluginReadDirCallback>pyopener_read_dir
callbacks_struct.stat = <VSIFilesystemPluginStatCallback>pyopener_stat
callbacks_struct.unlink = <VSIFilesystemPluginUnlinkCallback>pyopener_unlink
if isinstance(opener, MultiByteRangeResourceContainer):
callbacks_struct.read_multi_range = <VSIFilesystemPluginReadMultiRangeCallback>pyopener_read_multi_range
callbacks_struct.pUserData = &fsdata
retval = VSIInstallPluginHandler(prefix_bytes, callbacks_struct)
VSIFreeFilesystemPluginCallbacksStruct(callbacks_struct)
@ -406,9 +442,10 @@ def _opener_registration(urlpath, obj):
retval = VSIRemovePluginHandler(prefix_bytes)
class _AbstractOpener:
"""Adapts a Python object to the opener interface."""
def open(self, path, mode="r", **kwds):
class FileContainer(ABC):
"""An object that can report on and open Python files."""
@abstractmethod
def open(self, path: str, mode: str = "r", **kwds):
"""Get a Python file object for a resource.
Parameters
@ -426,8 +463,10 @@ class _AbstractOpener:
A Python 'file' object with methods read/write, seek, tell,
etc.
"""
raise NotImplementedError
def isfile(self, path):
pass
@abstractmethod
def isfile(self, path: str) -> bool:
"""Test if the resource is a 'file', a sequence of bytes.
Parameters
@ -439,8 +478,10 @@ class _AbstractOpener:
-------
bool
"""
raise NotImplementedError
def isdir(self, path):
pass
@abstractmethod
def isdir(self, path: str) -> bool:
"""Test if the resource is a 'directory', a container.
Parameters
@ -452,8 +493,10 @@ class _AbstractOpener:
-------
bool
"""
raise NotImplementedError
def ls(self, path):
pass
@abstractmethod
def ls(self, path: str) -> list[str]:
"""Get a 'directory' listing.
Parameters
@ -466,8 +509,10 @@ class _AbstractOpener:
list of str
List of 'path' paths relative to the directory.
"""
raise NotImplementedError
def mtime(self, path):
pass
@abstractmethod
def mtime(self, path: str) -> int:
"""Get the mtime of a resource..
Parameters
@ -480,8 +525,10 @@ class _AbstractOpener:
int
Modification timestamp in seconds.
"""
raise NotImplementedError
def rm(self, path):
pass
@abstractmethod
def rm(self, path: str) -> None:
"""Remove a resource.
Parameters
@ -493,8 +540,10 @@ class _AbstractOpener:
-------
None
"""
raise NotImplementedError
def size(self, path):
pass
@abstractmethod
def size(self, path: str) -> int:
"""Get the size, in bytes, of a resource..
Parameters
@ -506,10 +555,26 @@ class _AbstractOpener:
-------
int
"""
raise NotImplementedError
pass
class _FileOpener(_AbstractOpener):
class MultiByteRangeResource(ABC):
"""An object that provides VSIFilesystemPluginReadMultiRangeCallback."""
@abstractmethod
def get_byte_ranges(self, offsets: list[int], sizes: list[int]) -> list[bytes]:
"""Get a sequence of bytes specified by a sequence of ranges."""
pass
class MultiByteRangeResourceContainer(FileContainer):
"""An object that can open a MultiByteRangeResource."""
@abstractmethod
def open(self, path: str, **kwds) -> MultiByteRangeResource:
"""Open the resource at the given path."""
pass
class _FileContainer(FileContainer):
"""Adapts a Python file object to the opener interface."""
def __init__(self, obj):
self._obj = obj
@ -523,13 +588,15 @@ class _FileOpener(_AbstractOpener):
return []
def mtime(self, path):
return 0
def rm(self, path):
pass
def size(self, path):
with self._obj(path) as f:
f.seek(0, os.SEEK_END)
return f.tell()
class _FilesystemOpener(_AbstractOpener):
class _FilesystemContainer(FileContainer):
"""Adapts an fsspec filesystem object to the opener interface."""
def __init__(self, obj):
self._obj = obj
@ -554,7 +621,7 @@ class _FilesystemOpener(_AbstractOpener):
return self._obj.size(path)
class _AltFilesystemOpener(_FilesystemOpener):
class _AltFilesystemContainer(_FilesystemContainer):
"""Adapts a tiledb virtual filesystem object to the opener interface."""
def isfile(self, path):
return self._obj.is_file(path)
@ -568,25 +635,20 @@ class _AltFilesystemOpener(_FilesystemOpener):
return self._obj.file_size(path)
def _create_opener(obj):
"""Adapt Python file and fsspec objects to the opener interface."""
if isinstance(obj, _AbstractOpener):
opener = obj
elif callable(obj):
opener = _FileOpener(obj)
elif hasattr(obj, "file_size"):
opener = _AltFilesystemOpener(obj)
@singledispatch
def to_pyopener(obj):
"""Adapt an object to the Pyopener interface."""
if hasattr(obj, "file_size"):
return _AltFilesystemContainer(obj)
else:
opener = _FilesystemOpener(obj)
return _FilesystemContainer(obj)
# Before returning we do a quick check that the opener will
# plausibly function.
try:
_ = opener.size("test")
except (AttributeError, TypeError, ValueError) as err:
raise OpenerRegistrationError(f"Opener is invalid.") from err
except Exception:
# We expect the path to not resolve.
pass
return opener
@to_pyopener.register(FileContainer)
def _(obj):
return obj
@to_pyopener.register(Callable)
def _(obj):
return _FileContainer(obj)

3
rasterio/abc.py Normal file
View File

@ -0,0 +1,3 @@
"""Abstract base classes."""
from rasterio._vsiopener import FileContainer, MultiByteRangeResourceContainer # noqa: F401

View File

@ -2,6 +2,7 @@
import io
import os
import warnings
from pathlib import Path
from threading import Thread
import zipfile
@ -272,3 +273,52 @@ def test_opener_fsspec_fs_tiff_threads_2():
assert profile["driver"] == "GTiff"
assert profile["count"] == 3
assert src.read().shape == (3, 718, 791)
def test_opener_multi_range_read():
"""Test with Opener with multi-range-read method."""
from rasterio.abc import MultiByteRangeResourceContainer
class CustomResource(io.FileIO):
"""Custom FileIO FS with `read_multi_range` method."""
def get_byte_ranges(
self,
offsets,
sizes,
):
warnings.warn("Using MultiRange Reads", UserWarning, stacklevel=2)
return [
self._read_range(offset, size) for (offset, size) in zip(offsets, sizes)
]
def _read_range(self, offset, size):
_ = self.seek(offset)
return self.read(size)
class CustomResourceContainer:
def open(self, path, mode="r", **kwds):
return CustomResource(path, mode=mode, **kwds)
def isfile(self, path):
return True
def isdir(self, path):
return False
def ls(self, path):
return []
def mtime(self, path):
return 0
def size(self, path):
with CustomResource(path) as f:
return f.size()
MultiByteRangeResourceContainer.register(CustomResourceContainer)
with rasterio.open(
"tests/data/RGB.byte.tif", opener=CustomResourceContainer()
) as src:
profile = src.profile
assert profile["driver"] == "GTiff"
assert profile["count"] == 3
# Should emit a multi-range read
with pytest.warns(UserWarning, match="Using MultiRange Reads"):
_ = src.read()