mirror of
https://github.com/rasterio/rasterio.git
synced 2025-12-08 17:36:12 +00:00
234 lines
8.2 KiB
Python
234 lines
8.2 KiB
Python
"""FilePath tests. MemoryFile requires GDAL 2.0+.
|
|
Tests in this file will ONLY run for GDAL >= 3.x"""
|
|
|
|
# TODO: delete at version 2.0. FilePath is deprecated in version 1.4.
|
|
|
|
from io import BytesIO
|
|
import logging
|
|
import os.path
|
|
|
|
import pytest
|
|
|
|
import rasterio
|
|
from rasterio.enums import MaskFlags
|
|
from rasterio.shutil import copyfiles
|
|
from rasterio.windows import Window
|
|
|
|
try:
|
|
from rasterio.io import FilePath
|
|
except ImportError:
|
|
pytest.skip("FilePath is not available for GDAL <3.0", allow_module_level=True)
|
|
|
|
|
|
@pytest.fixture(scope='function')
|
|
def rgb_lzw_file_object(path_rgb_lzw_byte_tif):
|
|
"""Get the open file of our RGB.bytes.tif file."""
|
|
return open(path_rgb_lzw_byte_tif, 'rb')
|
|
|
|
|
|
@pytest.fixture(scope='function')
|
|
def rgb_file_object(path_rgb_byte_tif):
|
|
"""Get RGB.bytes.tif file opened in 'rb' mode"""
|
|
return open(path_rgb_byte_tif, 'rb')
|
|
|
|
|
|
def test_initial_empty():
|
|
with pytest.raises(TypeError):
|
|
FilePath()
|
|
|
|
|
|
def test_initial_not_file_str():
|
|
"""Creating from not file-like fails."""
|
|
with pytest.raises(TypeError):
|
|
FilePath("lolwut")
|
|
|
|
|
|
def test_initial_not_file_bytes():
|
|
"""Creating from not file-like fails."""
|
|
with pytest.raises(TypeError):
|
|
FilePath(b'lolwut')
|
|
|
|
|
|
def test_initial_bytes(rgb_file_object):
|
|
"""FilePath contents can initialized from bytes and opened."""
|
|
with FilePath(rgb_file_object) as vsifile:
|
|
with vsifile.open() as src:
|
|
assert src.driver == 'GTiff'
|
|
assert src.count == 3
|
|
assert src.dtypes == ("uint8", "uint8", "uint8")
|
|
assert src.read().shape == (3, 718, 791)
|
|
|
|
|
|
def test_initial_bytes_boundless(rgb_file_object):
|
|
"""FilePath contents can initialized from bytes and opened."""
|
|
with FilePath(rgb_file_object) as vsifile:
|
|
with vsifile.open() as src:
|
|
assert src.driver == "GTiff"
|
|
assert src.count == 3
|
|
assert src.dtypes == ("uint8", "uint8", "uint8")
|
|
assert src.read(window=Window(0, 0, 800, 800), boundless=True).shape == (
|
|
3,
|
|
800,
|
|
800,
|
|
)
|
|
|
|
|
|
def test_filepath_vrt(rgb_file_object):
|
|
"""A FilePath can be wrapped by a VRT."""
|
|
from rasterio.vrt import _boundless_vrt_doc
|
|
|
|
with FilePath(rgb_file_object) as vsifile, vsifile.open() as dst:
|
|
vrt_doc = _boundless_vrt_doc(dst)
|
|
with rasterio.open(vrt_doc) as src:
|
|
assert src.driver == "VRT"
|
|
assert src.count == 3
|
|
assert src.dtypes == ('uint8', 'uint8', 'uint8')
|
|
assert src.read().shape == (3, 718, 791)
|
|
|
|
|
|
def test_initial_lzw_bytes(rgb_lzw_file_object):
|
|
"""FilePath contents can initialized from bytes and opened."""
|
|
with FilePath(rgb_lzw_file_object) as vsifile:
|
|
with vsifile.open() as src:
|
|
assert src.driver == 'GTiff'
|
|
assert src.count == 3
|
|
assert src.dtypes == ('uint8', 'uint8', 'uint8')
|
|
assert src.read().shape == (3, 718, 791)
|
|
|
|
|
|
def test_initial_file_object(rgb_file_object):
|
|
"""FilePath contents can initialized from bytes and opened."""
|
|
with FilePath(rgb_file_object) as vsifile:
|
|
with vsifile.open() as src:
|
|
assert src.driver == 'GTiff'
|
|
assert src.count == 3
|
|
assert src.dtypes == ('uint8', 'uint8', 'uint8')
|
|
assert src.read().shape == (3, 718, 791)
|
|
|
|
|
|
def test_closed(rgb_file_object):
|
|
"""A closed FilePath can not be opened."""
|
|
with FilePath(rgb_file_object) as vsifile:
|
|
pass
|
|
with pytest.raises(IOError):
|
|
vsifile.open()
|
|
|
|
|
|
def test_file_object_read(rgb_file_object):
|
|
"""An example of reading from a file object"""
|
|
with rasterio.open(rgb_file_object) as src:
|
|
assert src.driver == 'GTiff'
|
|
assert src.count == 3
|
|
assert src.dtypes == ('uint8', 'uint8', 'uint8')
|
|
assert src.read().shape == (3, 718, 791)
|
|
|
|
|
|
def test_file_object_read_variant(rgb_file_object):
|
|
"""An example of reading from a FilePath object"""
|
|
with rasterio.open(FilePath(rgb_file_object)) as src:
|
|
assert src.driver == 'GTiff'
|
|
assert src.count == 3
|
|
assert src.dtypes == ('uint8', 'uint8', 'uint8')
|
|
assert src.read().shape == (3, 718, 791)
|
|
|
|
|
|
def test_file_object_read_variant2(rgb_file_object):
|
|
"""An example of reading from a BytesIO object version of a file's contents."""
|
|
with rasterio.open(BytesIO(rgb_file_object.read())) as src:
|
|
assert src.driver == 'GTiff'
|
|
assert src.count == 3
|
|
assert src.dtypes == ('uint8', 'uint8', 'uint8')
|
|
assert src.read().shape == (3, 718, 791)
|
|
|
|
|
|
def test_vrt_vsifile(data_dir, path_white_gemini_iv_vrt):
|
|
"""Successfully read an in-memory VRT"""
|
|
with open(path_white_gemini_iv_vrt) as vrtfile:
|
|
source = vrtfile.read()
|
|
source = source.replace(
|
|
'<SourceFilename relativeToVRT="1">389225main_sw_1965_1024.jpg</SourceFilename>',
|
|
f'<SourceFilename relativeToVRT="0">{data_dir}/389225main_sw_1965_1024.jpg</SourceFilename>',
|
|
)
|
|
source = BytesIO(source.encode("utf-8"))
|
|
|
|
with FilePath(source) as vsifile:
|
|
with vsifile.open() as src:
|
|
assert src.driver == 'VRT'
|
|
assert src.count == 3
|
|
assert src.dtypes == ('uint8', 'uint8', 'uint8')
|
|
assert src.read().shape == (3, 768, 1024)
|
|
|
|
|
|
@pytest.mark.xfail(reason="Copying is not supported by FilePath")
|
|
def test_vsifile_copyfiles(path_rgb_msk_byte_tif):
|
|
"""Multiple files can be copied to a FilePath using copyfiles"""
|
|
with rasterio.open(path_rgb_msk_byte_tif) as src:
|
|
src_basename = os.path.basename(src.name)
|
|
with FilePath(dirname="foo", filename=src_basename) as vsifile:
|
|
copyfiles(src.name, vsifile.name)
|
|
with vsifile.open() as rgb2:
|
|
assert sorted(rgb2.files) == sorted(
|
|
[f"/vsimem/foo/{src_basename}", f"/vsimem/foo/{src_basename}.msk"]
|
|
)
|
|
|
|
|
|
@pytest.mark.xfail(reason="FilePath does not implement '.files' property properly.")
|
|
def test_multi_vsifile(path_rgb_msk_byte_tif):
|
|
"""Multiple files can be copied to a FilePath using copyfiles"""
|
|
with open(path_rgb_msk_byte_tif, "rb") as tif_fp, open(
|
|
path_rgb_msk_byte_tif + ".msk", "rb"
|
|
) as msk_fp:
|
|
with FilePath(
|
|
tif_fp, dirname="bar", filename="foo.tif"
|
|
) as tifvsifile, FilePath(msk_fp, dirname="bar", filename="foo.tif.msk"):
|
|
with tifvsifile.open() as src:
|
|
assert sorted(os.path.basename(fn) for fn in src.files) == sorted(['foo.tif', 'foo.tif.msk'])
|
|
assert src.mask_flag_enums == ([MaskFlags.per_dataset],) * 3
|
|
|
|
|
|
def _open_geotiff(file_path):
|
|
with open(file_path, 'rb') as file_obj:
|
|
with rasterio.open(file_obj) as dataset:
|
|
dataset.read()
|
|
|
|
|
|
def test_concurrent(path_rgb_byte_tif, path_rgb_lzw_byte_tif, path_cogeo_tif, path_alpha_tif):
|
|
"""Test multiple threads opening multiple files at the same time."""
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
tifs = [path_rgb_byte_tif, path_rgb_lzw_byte_tif, path_cogeo_tif, path_alpha_tif] * 4
|
|
with ThreadPoolExecutor(max_workers=8) as exe:
|
|
list(exe.map(_open_geotiff, tifs, timeout=5))
|
|
|
|
|
|
def test_python_file_reuse():
|
|
"""Test that we can reuse a Python file, see gh-2550."""
|
|
ascii_raster_string = """ncols 5
|
|
nrows 5
|
|
xllcorner 440720.000000000000
|
|
yllcorner 3750120.000000000000
|
|
cellsize 60.000000000000
|
|
nodata_value -99999
|
|
107 123 132 115 132
|
|
115 132 107 123 148
|
|
115 132 140 132 123
|
|
148 132 123 123 115
|
|
132 156 132 140 132
|
|
"""
|
|
ascii_raster_io = BytesIO(ascii_raster_string.encode("utf-8"))
|
|
|
|
with rasterio.open(ascii_raster_io) as rds:
|
|
_ = rds.bounds
|
|
|
|
with rasterio.open(ascii_raster_io) as rds:
|
|
_ = rds.bounds
|
|
|
|
|
|
def test_quieter_vsi_plugin_notifications(caplog, path_rgb_byte_tif):
|
|
"""Expect no warning or error level log messages about .aux or .hdr files."""
|
|
with caplog.at_level(logging.WARNING):
|
|
with open(path_rgb_byte_tif, "rb") as f, FilePath(f) as vsi_file:
|
|
with vsi_file.open() as src:
|
|
_ = src.profile
|
|
|
|
assert "not found in virtual filesystem" not in caplog.text
|