Port of Pyopener VSI plugin improvements in Fiona 1.10b2 (#3113)

* Port of Pyopener VSI plugin improvements in Fiona 1.10b2

* Don't use cpp flags for _vsiopener

* Update change log

* Update rasterio/_vsiopener.pyx

Co-authored-by: Alan D. Snow <alansnow21@gmail.com>

* Add threading and registration tests

---------

Co-authored-by: Alan D. Snow <alansnow21@gmail.com>
This commit is contained in:
Sean Gillies 2024-07-12 16:00:23 -06:00 committed by GitHub
parent f3018c78a4
commit 5d65401be8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 240 additions and 101 deletions

View File

@ -6,6 +6,12 @@ Next (TBD)
Bug fixes:
- The Pyopener registry and VSI plugin have been rewritten to avoid filename
conflicts and to be compatible with multithreading. Now, a new plugin handler
is registered for each instance of using an opener (#3113). Before GDAL 3.9.0
plugin handlers cannot not be removed and so it may be observed that the size
of the Pyopener registry grows during the execution of a program.
- A CSLConstList ctypedef has been added and is used where appropriate (#3113).
- Missing parentheses in the denominators of the max_pixels calculation in
calc() and merge() have been added (#3073, #3076). This constrains memory
use as originally intended.

View File

@ -20,7 +20,6 @@ import threading
from rasterio._err import CPLE_BaseError
from rasterio._err cimport exc_wrap_ogrerr, exc_wrap_int
from rasterio._filepath cimport install_filepath_plugin
from rasterio._vsiopener cimport install_pyopener_plugin
from rasterio._version import gdal_version
from libc.stdio cimport stderr
@ -378,7 +377,6 @@ cdef class GDALEnv(ConfigEnv):
GDALAllRegister()
OGRRegisterAll()
install_filepath_plugin(filepath_plugin)
install_pyopener_plugin(pyopener_plugin)
if 'GDAL_DATA' in os.environ:
log.debug("GDAL_DATA found in environment.")

View File

@ -1,4 +1 @@
include "gdal.pxi"
cdef int install_pyopener_plugin(VSIFilesystemPluginCallbacksStruct *callbacks_struct)
cdef void uninstall_pyopener_plugin(VSIFilesystemPluginCallbacksStruct *callbacks_struct)

View File

@ -1,18 +1,15 @@
# cython: language_level=3, boundscheck=False
# distutils: language = c++
"""Bridge between Python file openers and GDAL VSI.
Based on _filepath.pyx.
"""
include "gdal.pxi"
import contextlib
from contextvars import ContextVar
import logging
import os
from pathlib import Path
import stat
from uuid import uuid4
from libc.string cimport memcpy
@ -20,10 +17,7 @@ from rasterio.errors import OpenerRegistrationError
log = logging.getLogger(__name__)
# Prefix for all in-memory paths used by GDAL's VSI system
# Except for errors and log messages this shouldn't really be seen by the user
cdef str PREFIX = "/vsiriopener/"
cdef bytes PREFIX_BYTES = PREFIX.encode("utf-8")
cdef str VSI_NS_ROOT = "vsiriopener"
# This is global state for the Python filesystem plugin. It currently only
# contains path -> PyOpenerBase (or subclass) instances. This is used by
@ -36,38 +30,12 @@ _OPEN_FILE_EXIT_STACKS = ContextVar("open_file_exit_stacks")
_OPEN_FILE_EXIT_STACKS.set({})
cdef int install_pyopener_plugin(VSIFilesystemPluginCallbacksStruct *callbacks_struct):
"""Install handlers for python file openers if it isn't already installed."""
cdef char **registered_prefixes = VSIGetFileSystemsPrefixes()
cdef int prefix_index = CSLFindString(registered_prefixes, PREFIX_BYTES)
CSLDestroy(registered_prefixes)
if prefix_index < 0:
log.debug("Installing Python opener handler plugin...")
callbacks_struct = VSIAllocFilesystemPluginCallbacksStruct()
callbacks_struct.open = <VSIFilesystemPluginOpenCallback>pyopener_open
callbacks_struct.eof = <VSIFilesystemPluginEofCallback>pyopener_eof
callbacks_struct.tell = <VSIFilesystemPluginTellCallback>pyopener_tell
callbacks_struct.seek = <VSIFilesystemPluginSeekCallback>pyopener_seek
callbacks_struct.read = <VSIFilesystemPluginReadCallback>pyopener_read
callbacks_struct.write = <VSIFilesystemPluginWriteCallback>pyopener_write
callbacks_struct.flush = <VSIFilesystemPluginFlushCallback>pyopener_flush
callbacks_struct.close = <VSIFilesystemPluginCloseCallback>pyopener_close
callbacks_struct.read_dir = <VSIFilesystemPluginReadDirCallback>pyopener_read_dir
callbacks_struct.stat = <VSIFilesystemPluginStatCallback>pyopener_stat
callbacks_struct.pUserData = <void*>_OPENER_REGISTRY
retval = VSIInstallPluginHandler(PREFIX_BYTES, callbacks_struct)
VSIFreeFilesystemPluginCallbacksStruct(callbacks_struct)
return retval
else:
return 0
cdef void uninstall_pyopener_plugin(VSIFilesystemPluginCallbacksStruct *callbacks_struct):
if callbacks_struct is not NULL:
callbacks_struct.pUserData = NULL
VSIFreeFilesystemPluginCallbacksStruct(callbacks_struct)
callbacks_struct = NULL
# When an opener is registered for a path, this structure captures the
# path and unique registration instance. VSI stat, read_dir, and open
# calls have access to the struct instance.
cdef struct FSData:
char *path
char *uuid
cdef int pyopener_stat(
@ -77,14 +45,20 @@ cdef int pyopener_stat(
int nFlags
) with gil:
"""Provides POSIX stat data to GDAL from a Python filesystem."""
# Convert the given filename to a registry key.
# Reminder: openers are registered by URI scheme, authority, and
# *directory* path.
cdef FSData *fsdata = <FSData *>pUserData
path = fsdata.path.decode("utf-8")
uuid = fsdata.uuid.decode("utf-8")
key = (Path(path), uuid)
urlpath = pszFilename.decode("utf-8")
key = Path(urlpath).parent
registry = _OPENER_REGISTRY.get()
log.debug("Looking up opener in pyopener_stat: registry=%r, key=%r", registry, key)
log.debug(
"Looking up opener in pyopener_stat: urlpath=%r, registry=%r, key=%r",
urlpath,
registry,
key
)
try:
file_opener = registry[key]
except KeyError as err:
@ -94,15 +68,15 @@ cdef int pyopener_stat(
try:
if file_opener.isfile(urlpath):
fmode = 0o170000 | stat.S_IFREG
fmode = stat.S_IFREG
elif file_opener.isdir(urlpath):
fmode = 0o170000 | stat.S_IFDIR
fmode = stat.S_IFDIR
else:
# No such file or directory.
return -1
size = file_opener.size(urlpath)
mtime = file_opener.mtime(urlpath)
except (FileNotFoundError, KeyError):
except (FileNotFoundError, KeyError) as err:
# No such file or directory.
return -1
except Exception as err:
@ -116,17 +90,64 @@ cdef int pyopener_stat(
return 0
cdef int pyopener_unlink(
void *pUserData,
const char *pszFilename,
) with gil:
"""Unlink a file from a Python filesystem."""
cdef FSData *fsdata = <FSData *>pUserData
path = fsdata.path.decode("utf-8")
uuid = fsdata.uuid.decode("utf-8")
key = (Path(path), uuid)
urlpath = pszFilename.decode("utf-8")
registry = _OPENER_REGISTRY.get()
log.debug(
"Looking up opener in pyopener_unlink: urlpath=%r, registry=%r, key=%r",
urlpath,
registry,
key
)
try:
file_opener = registry[key]
except KeyError as err:
errmsg = f"Opener not found: {repr(err)}".encode("utf-8")
CPLError(CE_Failure, <CPLErrorNum>4, <const char *>"%s", <const char *>errmsg)
return -1
try:
file_opener.rm(urlpath)
return 0
except (FileNotFoundError, KeyError) as err:
# No such file or directory.
return -1
except Exception as err:
errmsg = f"Opener failed to determine file info: {err!r}".encode("utf-8")
CPLError(CE_Failure, <CPLErrorNum>4, <const char *>"%s", <const char *>errmsg)
return -1
cdef char ** pyopener_read_dir(
void *pUserData,
const char *pszDirname,
int nMaxFiles
) with gil:
"""Provides a directory listing to GDAL from a Python filesystem."""
cdef FSData *fsdata = <FSData *>pUserData
path = fsdata.path.decode("utf-8")
uuid = fsdata.uuid.decode("utf-8")
key = (Path(path), uuid)
urlpath = pszDirname.decode("utf-8")
key = Path(urlpath)
registry = _OPENER_REGISTRY.get()
log.debug("Looking up opener in pyopener_read_dir: registry=%r, key=%r", registry, key)
log.debug(
"Looking up opener in pyopener_read_dir: urlpath=%r, registry=%r, key=%r",
urlpath,
registry,
key
)
try:
file_opener = registry[key]
except KeyError as err:
@ -137,8 +158,7 @@ cdef char ** pyopener_read_dir(
try:
# GDAL wants relative file names.
contents = [Path(item).name for item in file_opener.ls(urlpath)]
log.debug("Looking for dir contents: urlpath=%r, contents=%r", urlpath, contents)
except (FileNotFoundError, KeyError):
except (FileNotFoundError, KeyError) as err:
# No such file or directory.
return NULL
except Exception as err:
@ -166,12 +186,24 @@ cdef void* pyopener_open(
GDAL may call this function multiple times per filename and each
result must be seperately seekable.
"""
cdef FSData *fsdata = <FSData *>pUserData
path = fsdata.path.decode("utf-8")
uuid = fsdata.uuid.decode("utf-8")
key = (Path(path), uuid)
urlpath = pszFilename.decode("utf-8")
mode = pszAccess.decode("utf-8")
key = Path(urlpath).parent
if not "b" in mode:
mode += "b"
registry = _OPENER_REGISTRY.get()
log.debug("Looking up opener in pyopener_open: registry=%r, key=%r", registry, key)
log.debug(
"Looking up opener in pyopener_open: urlpath=%r, registry=%r, key=%r",
urlpath,
registry,
key
)
try:
file_opener = registry[key]
except KeyError as err:
@ -202,7 +234,6 @@ cdef void* pyopener_open(
try:
file_obj = stack.enter_context(file_obj)
except (AttributeError, TypeError) as err:
log.error("File object is not a context manager: file_obj=%r", file_obj)
errmsg = f"Opener failed to open file with arguments ({repr(urlpath)}, {repr(mode)}): {repr(err)}".encode("utf-8")
CPLError(CE_Failure, <CPLErrorNum>4, <const char *>"%s", <const char *>errmsg)
return NULL
@ -210,10 +241,9 @@ cdef void* pyopener_open(
errmsg = "OpenFile didn't resolve".encode("utf-8")
return NULL
else:
exit_stacks = _OPEN_FILE_EXIT_STACKS.get()
exit_stacks = _OPEN_FILE_EXIT_STACKS.get({})
exit_stacks[file_obj] = stack
_OPEN_FILE_EXIT_STACKS.set(exit_stacks)
log.debug("Returning: file_obj=%r", file_obj)
return <void *>file_obj
@ -244,7 +274,7 @@ cdef size_t pyopener_read(void *pFile, void *pBuffer, size_t nSize, size_t nCoun
cdef int num_bytes = len(python_data)
# NOTE: We have to cast to char* first, otherwise Cython doesn't do the conversion properly
memcpy(pBuffer, <void*><unsigned char*>python_data, num_bytes)
return <size_t>(num_bytes // nSize)
return <size_t>(num_bytes / nSize)
cdef size_t pyopener_write(void *pFile, void *pBuffer, size_t nSize, size_t nCount) with gil:
@ -253,12 +283,16 @@ cdef size_t pyopener_write(void *pFile, void *pBuffer, size_t nSize, size_t nCou
cdef object file_obj = <object>pFile
buffer_len = nSize * nCount
cdef unsigned char [:] buff_view = <unsigned char[:buffer_len]>pBuffer
log.debug("Writing data: file_obj=%r, buff_view=%r, buffer_len=%r", file_obj, buff_view, buffer_len)
log.debug(
"Writing data: file_obj=%r, buff_view=%r, buffer_len=%r",
file_obj,
buff_view,
buffer_len)
try:
num_bytes = file_obj.write(buff_view)
num = file_obj.write(buff_view)
except TypeError:
num_bytes = file_obj.write(str(buff_view))
return <size_t>(num_bytes // nSize)
num = file_obj.write(str(buff_view))
return <size_t>(num // nSize)
cdef int pyopener_flush(void *pFile) with gil:
@ -283,32 +317,82 @@ cdef int pyopener_close(void *pFile) with gil:
@contextlib.contextmanager
def _opener_registration(urlpath, obj):
key = Path(urlpath).parent
cdef char **registered_prefixes = NULL
cdef int prefix_index = 0
cdef VSIFilesystemPluginCallbacksStruct *callbacks_struct = NULL
cdef FSData fsdata
cdef char *path_c = NULL
cdef char *uuid_c = NULL
# To resolve issue 1406 we add the opener or filesystem id to the
# registry key.
kpath = Path(urlpath).parent
kid = uuid4().hex
key = (kpath, kid)
path_b = kpath.as_posix().encode("utf-8")
path_c = path_b
uuid_b = kid.encode("utf-8")
uuid_c = uuid_b
fsdata = FSData(path_c, uuid_c)
namespace = f"{VSI_NS_ROOT}_{kid}"
cdef bytes prefix_bytes = f"/{namespace}/".encode("utf-8")
# Might raise.
opener = _create_opener(obj)
registry = _OPENER_REGISTRY.get()
registry = _OPENER_REGISTRY.get({})
if key in registry:
if registry[key] != opener:
raise OpenerRegistrationError(f"Opener already registered for urlpath.")
else:
try:
yield f"{PREFIX}{urlpath}"
yield f"/{namespace}/{urlpath}"
finally:
registry = _OPENER_REGISTRY.get()
_ = registry.pop(key, None)
_OPENER_REGISTRY.set(registry)
else:
# Install handler.
registered_prefixes = VSIGetFileSystemsPrefixes()
prefix_index = CSLFindString(<CSLConstList>registered_prefixes, prefix_bytes)
CSLDestroy(registered_prefixes)
if prefix_index < 0:
log.debug("Installing Python opener handler plugin: prefix_bytes=%r", prefix_bytes)
callbacks_struct = VSIAllocFilesystemPluginCallbacksStruct()
callbacks_struct.open = <VSIFilesystemPluginOpenCallback>pyopener_open
callbacks_struct.eof = <VSIFilesystemPluginEofCallback>pyopener_eof
callbacks_struct.tell = <VSIFilesystemPluginTellCallback>pyopener_tell
callbacks_struct.seek = <VSIFilesystemPluginSeekCallback>pyopener_seek
callbacks_struct.read = <VSIFilesystemPluginReadCallback>pyopener_read
callbacks_struct.write = <VSIFilesystemPluginWriteCallback>pyopener_write
callbacks_struct.flush = <VSIFilesystemPluginFlushCallback>pyopener_flush
callbacks_struct.close = <VSIFilesystemPluginCloseCallback>pyopener_close
callbacks_struct.read_dir = <VSIFilesystemPluginReadDirCallback>pyopener_read_dir
callbacks_struct.stat = <VSIFilesystemPluginStatCallback>pyopener_stat
callbacks_struct.unlink = <VSIFilesystemPluginUnlinkCallback>pyopener_unlink
callbacks_struct.pUserData = &fsdata
retval = VSIInstallPluginHandler(prefix_bytes, callbacks_struct)
VSIFreeFilesystemPluginCallbacksStruct(callbacks_struct)
registry[key] = opener
_OPENER_REGISTRY.set(registry)
try:
yield f"{PREFIX}{urlpath}"
yield f"/{namespace}/{urlpath}"
finally:
registry = _OPENER_REGISTRY.get()
_ = registry.pop(key, None)
_OPENER_REGISTRY.set(registry)
IF (CTE_GDAL_MAJOR_VERSION, CTE_GDAL_MINOR_VERSION) >= (3, 9):
retval = VSIRemovePluginHandler(prefix_bytes)
class _AbstractOpener:
"""Adapts a Python object to the opener interface."""
@ -385,6 +469,19 @@ class _AbstractOpener:
Modification timestamp in seconds.
"""
raise NotImplementedError
def rm(self, path):
"""Remove a resource.
Parameters
----------
path : str
The identifier/locator for a resource within a filesystem.
Returns
-------
None
"""
raise NotImplementedError
def size(self, path):
"""Get the size, in bytes, of a resource..
@ -431,14 +528,16 @@ class _FilesystemOpener(_AbstractOpener):
def isdir(self, path):
return self._obj.isdir(path)
def ls(self, path):
return self._obj.ls(path)
# return value of ls() varies between file and zip fsspec filesystems.
return [item if isinstance(item, str) else item["filename"] for item in self._obj.ls(path)]
def mtime(self, path):
try:
mtime = int(self._obj.modified(path).timestamp())
except NotImplementedError:
mtime = 0
log.debug("Modification time: mtime=%r", mtime)
return mtime
def rm(self, path):
return self._obj.rm(path)
def size(self, path):
return self._obj.size(path)
@ -451,6 +550,8 @@ class _AltFilesystemOpener(_FilesystemOpener):
return self._obj.is_dir(path)
def mtime(self, path):
return 0
def rm(self, path):
self._obj.remove_file(path)
def size(self, path):
return self._obj.file_size(path)

View File

@ -13,6 +13,10 @@ cdef extern from "cpl_conv.h" nogil:
const char *CPLFindFile(const char *pszClass, const char *pszBasename)
cdef extern from "cpl_port.h":
ctypedef char **CSLConstList
cdef extern from "cpl_error.h" nogil:
ctypedef enum CPLErr:
@ -142,8 +146,13 @@ cdef extern from "cpl_vsi.h" nogil:
size_t VSIFWriteL(void *buffer, size_t nSize, size_t nCount, VSILFILE *fp)
int VSIStatL(const char *pszFilename, VSIStatBufL *psStatBuf)
cdef extern from "ogr_srs_api.h" nogil:
IF (CTE_GDAL_MAJOR_VERSION, CTE_GDAL_MINOR_VERSION) >= (3, 9):
cdef extern from "cpl_vsi.h" nogil:
int VSIRemovePluginHandler(const char*)
cdef extern from "ogr_srs_api.h" nogil:
ctypedef int OGRErr
ctypedef void * OGRCoordinateTransformationH
ctypedef void * OGRSpatialReferenceH

View File

@ -190,26 +190,24 @@ try:
except Exception:
pass
# GDAL 2.3 and newer requires C++11
if (gdal_major_version, gdal_minor_version) >= (2, 3):
cpp11_flag = '-std=c++11'
cpp11_flag = '-std=c++11'
# 'extra_compile_args' may not be defined
eca = cpp_ext_options.get('extra_compile_args', [])
# 'extra_compile_args' may not be defined
eca = cpp_ext_options.get('extra_compile_args', [])
if platform.system() == 'Darwin':
if platform.system() == 'Darwin':
if cpp11_flag not in eca:
eca.append(cpp11_flag)
eca += [cpp11_flag, '-mmacosx-version-min=10.9', '-stdlib=libc++']
# TODO: Windows
elif cpp11_flag not in eca:
if cpp11_flag not in eca:
eca.append(cpp11_flag)
cpp_ext_options['extra_compile_args'] = eca
eca += [cpp11_flag, '-mmacosx-version-min=10.9', '-stdlib=libc++']
# TODO: Windows
elif cpp11_flag not in eca:
eca.append(cpp11_flag)
cpp_ext_options['extra_compile_args'] = eca
# Configure optional Cython coverage.
cythonize_options = {"language_level": sys.version_info[0]}
@ -238,7 +236,7 @@ if "clean" not in sys.argv:
Extension("rasterio._transform", ["rasterio/_transform.pyx"], **ext_options),
Extension("rasterio._filepath", ["rasterio/_filepath.pyx"], **cpp_ext_options),
Extension(
"rasterio._vsiopener", ["rasterio/_vsiopener.pyx"], **cpp_ext_options
"rasterio._vsiopener", ["rasterio/_vsiopener.pyx"], **ext_options
),
]
ext_modules = cythonize(

View File

@ -2,6 +2,9 @@
import io
import os
from pathlib import Path
from threading import Thread
import zipfile
import fsspec
@ -13,17 +16,6 @@ from rasterio.enums import MaskFlags
from rasterio.errors import OpenerRegistrationError
def test_registration_failure():
"""Exception is raised on attempt to register a second opener for a filename and mode."""
with pytest.raises(OpenerRegistrationError) as exc_info:
with rasterio.open(
"tests/data/RGB.byte.tif", opener=io.open
) as a, rasterio.open("tests/data/RGB.byte.tif", opener=fsspec.open) as b:
pass
assert exc_info.value.args[0] == "Opener already registered for urlpath."
def test_opener_failure():
"""Use int as an opener :)"""
with pytest.raises(OpenerRegistrationError) as exc_info:
@ -195,3 +187,41 @@ def test_opener_tiledb_vfs():
profile = src.profile
assert profile["driver"] == "GTiff"
assert profile["count"] == 3
def test_delete_on_overwrite(data):
"""Opener can delete dataset when overwriting."""
fs = fsspec.filesystem("file")
outputfile = os.path.join(str(data), "RGB.byte.tif")
with rasterio.open(outputfile, opener=fs) as dst:
profile = dst.profile
# No need to write any data, as open() will error if VSI unlinking
# isn't implemented.
with rasterio.open(outputfile, "w", opener=fs, **profile) as dst:
pass
@pytest.mark.parametrize("opener", [io.open, fsspec.filesystem("file")])
def test_opener_registration(opener):
"""Opener is correctly registered."""
from rasterio._vsiopener import _OPENER_REGISTRY, _opener_registration
with _opener_registration("tests/data/RGB.byte.tif", opener) as registered_vsi_path:
assert registered_vsi_path.startswith("/vsiriopener_")
key = (Path("tests/data"), registered_vsi_path.split("/")[1].split("_")[1])
val = _OPENER_REGISTRY.get()[key]
assert val.isfile
assert val._obj == opener
def test_threads_context():
"""Threads have opener registries."""
def target():
with rasterio.open("tests/data/RGB.byte.tif", opener=io.open) as dst:
assert dst.count == 3
thread = Thread(target=target)
thread.start()
thread.join()