rasterio/tests/test_rio_merge.py

309 lines
9.3 KiB
Python

import sys
import os
import logging
import click
import numpy
from click.testing import CliRunner
from pytest import fixture
import rasterio
from rasterio.rio.merge import merge
from rasterio.transform import Affine
# Configure the root logger to emit DEBUG-and-above records on stderr.
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
# Fixture to create test datasets within a temporary directory.
@fixture(scope='function')
def test_data_dir_1(tmpdir):
    """Create two coincident 10x10 uint8 GeoTIFFs with nodata set to 1.

    ``a.tif`` holds 255 in rows/columns 0-5 and ``b.tif`` holds 254 in
    rows/columns 4-7; every other cell is 1, which is also the declared
    nodata value.

    Returns the ``tmpdir`` that contains both files.
    """
    kwargs = {
        "crs": {'init': 'epsg:4326'},
        "transform": (-114, 0.2, 0, 46, 0, -0.2),
        "count": 1,
        "dtype": rasterio.uint8,
        "driver": "GTiff",
        "width": 10,
        "height": 10,
        "nodata": 1,
    }

    with rasterio.drivers():

        with rasterio.open(str(tmpdir.join('a.tif')), 'w', **kwargs) as dst:
            data = numpy.ones((10, 10), dtype=rasterio.uint8)
            data[0:6, 0:6] = 255
            # write() with a band index is used instead of the legacy
            # write_band() helper, matching the other fixtures in this file.
            dst.write(data, indexes=1)

        with rasterio.open(str(tmpdir.join('b.tif')), 'w', **kwargs) as dst:
            data = numpy.ones((10, 10), dtype=rasterio.uint8)
            data[4:8, 4:8] = 254
            dst.write(data, indexes=1)

    return tmpdir
@fixture(scope='function')
def test_data_dir_2(tmpdir):
    """Create two coincident 10x10 uint8 GeoTIFFs without a nodata value.

    ``a.tif`` holds 255 in rows/columns 0-5 and ``b.tif`` holds 254 in
    rows/columns 4-7; every other cell is 0.

    Returns the ``tmpdir`` that contains both files.
    """
    kwargs = {
        "crs": {'init': 'epsg:4326'},
        "transform": (-114, 0.2, 0, 46, 0, -0.1),
        "count": 1,
        "dtype": rasterio.uint8,
        "driver": "GTiff",
        "width": 10,
        "height": 10,
        # these files have undefined nodata.
    }

    with rasterio.drivers():

        with rasterio.open(str(tmpdir.join('a.tif')), 'w', **kwargs) as dst:
            data = numpy.zeros((10, 10), dtype=rasterio.uint8)
            data[0:6, 0:6] = 255
            # write() with a band index is used instead of the legacy
            # write_band() helper, matching the other fixtures in this file.
            dst.write(data, indexes=1)

        with rasterio.open(str(tmpdir.join('b.tif')), 'w', **kwargs) as dst:
            data = numpy.zeros((10, 10), dtype=rasterio.uint8)
            data[4:8, 4:8] = 254
            dst.write(data, indexes=1)

    return tmpdir
def test_merge_with_nodata(test_data_dir_1):
    """Merge the two nodata-aware rasters and compare against the mosaic.

    The expected array is filled with 1, has 255 in rows/columns 0-5 and
    254 in rows/columns 4-7; where the two squares overlap, 254 is expected.
    """
    outputname = str(test_data_dir_1.join('merged.tif'))
    inputs = sorted(str(x) for x in test_data_dir_1.listdir())

    runner = CliRunner()
    result = runner.invoke(merge, inputs + [outputname])
    assert result.exit_code == 0
    assert os.path.exists(outputname)

    with rasterio.open(outputname) as out:
        assert out.count == 1
        # read() with a band index is used instead of the legacy read_band()
        # helper, matching the other tests in this file.
        data = out.read(1, masked=False)
        expected = numpy.ones((10, 10), dtype=rasterio.uint8)
        expected[0:6, 0:6] = 255
        expected[4:8, 4:8] = 254
        assert numpy.all(data == expected)
def test_merge_warn(test_data_dir_1):
    """Run merge with ``-nd -1`` and check that the nodata advice is printed."""
    merged_path = str(test_data_dir_1.join('merged.tif'))
    sources = sorted(str(entry) for entry in test_data_dir_1.listdir())
    cli_args = sources + [merged_path] + ['-nd', '-1']

    outcome = CliRunner().invoke(merge, cli_args)

    assert outcome.exit_code == 0
    assert "using the --nodata option for better results" in outcome.output
def test_merge_without_nodata(test_data_dir_2):
    """Merge the two rasters that lack nodata and compare against the mosaic.

    The expected array is filled with 0, has 255 in rows/columns 0-5 and
    254 in rows/columns 4-7; where the two squares overlap, 254 is expected.
    """
    outputname = str(test_data_dir_2.join('merged.tif'))
    inputs = sorted(str(x) for x in test_data_dir_2.listdir())

    runner = CliRunner()
    result = runner.invoke(merge, inputs + [outputname])
    assert result.exit_code == 0
    assert os.path.exists(outputname)

    with rasterio.open(outputname) as out:
        assert out.count == 1
        # read() with a band index is used instead of the legacy read_band()
        # helper, matching the other tests in this file.
        data = out.read(1, masked=False)
        expected = numpy.zeros((10, 10), dtype=rasterio.uint8)
        expected[0:6, 0:6] = 255
        expected[4:8, 4:8] = 254
        assert numpy.all(data == expected)
def test_merge_output_exists(tmpdir):
    """Run merge twice onto the same output path.

    The first run must exit with code 0. After the second run only the
    presence of the output file and its band count (3) are checked.
    """
    merged_path = str(tmpdir.join('merged.tif'))
    cli_args = ['tests/data/RGB.byte.tif', merged_path]
    cli = CliRunner()

    first_run = cli.invoke(merge, cli_args)
    assert first_run.exit_code == 0

    # Second run targets the file that the first run just created.
    cli.invoke(merge, cli_args)
    assert os.path.exists(merged_path)

    with rasterio.open(merged_path) as merged:
        assert merged.count == 3
def test_merge_output_exists_without_nodata(test_data_dir_2):
    """Invoke merge with ``a.tif`` followed by the already existing ``b.tif``.

    Both files are created by the fixture and neither declares a nodata
    value; the command is expected to exit with code 0.
    """
    first_path = str(test_data_dir_2.join('a.tif'))
    second_path = str(test_data_dir_2.join('b.tif'))

    outcome = CliRunner().invoke(merge, [first_path, second_path])

    assert outcome.exit_code == 0
def test_merge_err():
    """Invoking merge with only ``tests`` as argument must exit with code 1."""
    cli_args = ['tests']
    outcome = CliRunner().invoke(merge, cli_args)
    assert outcome.exit_code == 1
def test_format_jpeg(tmpdir):
    """Merge the RGB test image with ``--format JPEG`` and expect an output file."""
    jpeg_path = str(tmpdir.join('stacked.jpg'))
    cli_args = ['tests/data/RGB.byte.tif', jpeg_path, '--format', 'JPEG']

    outcome = CliRunner().invoke(merge, cli_args)

    assert outcome.exit_code == 0
    assert os.path.exists(jpeg_path)
# Non-coincident datasets test fixture.
# Two overlapping GeoTIFFs, one to the NW and one to the SE.
@fixture(scope='function')
def test_data_dir_overlapping(tmpdir):
    """Create two partially overlapping 10x10 uint8 GeoTIFFs with nodata 0.

    ``nw.tif`` is filled with 1 and uses the transform origin (-114, 46);
    ``se.tif`` is filled with 2 and uses the origin (-113, 45). Both use a
    0.2 cell size.

    Returns the ``tmpdir`` that contains both files.
    """
    kwargs = {
        "crs": {'init': 'epsg:4326'},
        "transform": (-114, 0.2, 0, 46, 0, -0.2),
        "count": 1,
        "dtype": rasterio.uint8,
        "driver": "GTiff",
        "width": 10,
        "height": 10,
        "nodata": 0,
    }

    with rasterio.drivers():

        with rasterio.open(str(tmpdir.join('nw.tif')), 'w', **kwargs) as dst:
            data = numpy.ones((10, 10), dtype=rasterio.uint8)
            # write() with a band index is used instead of the legacy
            # write_band() helper, matching the other fixtures in this file.
            dst.write(data, indexes=1)

        # Same creation options, shifted origin for the second dataset.
        kwargs['transform'] = (-113, 0.2, 0, 45, 0, -0.2)
        with rasterio.open(str(tmpdir.join('se.tif')), 'w', **kwargs) as dst:
            data = numpy.ones((10, 10), dtype=rasterio.uint8) * 2
            dst.write(data, indexes=1)

    return tmpdir
def test_merge_overlapping(test_data_dir_overlapping):
    """Merge the NW and SE rasters and check extent, shape and contents.

    The mosaic is expected to be 15x15 with bounds (-114, 43, -111, 46),
    holding 1 in rows/columns 0-9, 2 from row/column 5 onwards (including
    the overlap) and 0 elsewhere.
    """
    outputname = str(test_data_dir_overlapping.join('merged.tif'))
    inputs = sorted(str(x) for x in test_data_dir_overlapping.listdir())

    runner = CliRunner()
    result = runner.invoke(merge, inputs + [outputname])
    assert result.exit_code == 0
    assert os.path.exists(outputname)

    with rasterio.open(outputname) as out:
        assert out.count == 1
        assert out.shape == (15, 15)
        assert out.bounds == (-114, 43, -111, 46)
        # read() with a band index is used instead of the legacy read_band()
        # helper, matching the other tests in this file.
        data = out.read(1, masked=False)
        expected = numpy.zeros((15, 15), dtype=rasterio.uint8)
        expected[0:10, 0:10] = 1
        expected[5:, 5:] = 2
        assert numpy.all(data == expected)
# Fixture to create float64 test datasets within a temporary directory.
@fixture(scope='function')
def test_data_dir_float(tmpdir):
kwargs = {
"crs": {'init': 'epsg:4326'},
"transform": (-114, 0.2, 0, 46, 0, -0.2),
"count": 1,
"dtype": rasterio.float64,
"driver": "GTiff",
"width": 10,
"height": 10,
"nodata": 0
}
with rasterio.drivers():
with rasterio.open(str(tmpdir.join('one.tif')), 'w', **kwargs) as dst:
data = numpy.zeros((10, 10), dtype=rasterio.float64)
data[0:6, 0:6] = 255
dst.write_band(1, data)
with rasterio.open(str(tmpdir.join('two.tif')), 'w', **kwargs) as dst:
data = numpy.zeros((10, 10), dtype=rasterio.float64)
data[4:8, 4:8] = 254
dst.write_band(1, data)
return tmpdir
def test_merge_float(test_data_dir_float):
    """Merge the float64 rasters with ``-nd -1.5`` and compare the mosaic.

    The expected array is filled with -1.5, has 255 in rows/columns 0-5 and
    254 in rows/columns 4-7; where the two squares overlap, 254 is expected.
    """
    outputname = str(test_data_dir_float.join('merged.tif'))
    inputs = sorted(str(x) for x in test_data_dir_float.listdir())

    runner = CliRunner()
    result = runner.invoke(merge, inputs + [outputname] + ['-nd', '-1.5'])
    assert result.exit_code == 0
    assert os.path.exists(outputname)

    with rasterio.open(outputname) as out:
        assert out.count == 1
        # read() with a band index is used instead of the legacy read_band()
        # helper, matching the other tests in this file.
        data = out.read(1, masked=False)
        expected = numpy.ones((10, 10), dtype=rasterio.float64) * -1.5
        expected[0:6, 0:6] = 255
        expected[4:8, 4:8] = 254
        assert numpy.all(data == expected)
# Test below comes from issue #288. There was an off-by-one error in
# pasting image data into the canvas array.
@fixture(scope='function')
def tiffs(tmpdir):
    """Write four single-pixel uint8 GeoTIFFs into ``tmpdir``.

    Each file gets its own affine transform and a constant pixel value
    (40, 60, 90 and 120 respectively).

    Returns the ``tmpdir`` that contains the files.
    """
    unit_pixel = numpy.ones((1, 1, 1), 'uint8')

    # Creation options shared by every file; only the transform varies.
    shared_options = {
        'count': '1',
        'driver': 'GTiff',
        'dtype': 'uint8',
        'height': 1,
        'width': 1,
    }

    # (file name, affine transform, pixel value)
    specs = (
        ('a-sw.tif', Affine(1, 0, 1, 0, -1, 1), 40),
        ('b-ct.tif', Affine(1, 0, 2, 0, -1, 2), 60),
        ('c-ne.tif', Affine(2, 0, 3, 0, -2, 4), 90),
        ('d-ne.tif', Affine(2, 0, 2, 0, -2, 4), 120),
    )

    for filename, transform, value in specs:
        options = dict(shared_options, transform=transform)
        target = str(tmpdir.join(filename))
        with rasterio.open(target, 'w', **options) as r:
            r.write(unit_pixel * value)

    return tmpdir
def test_merge_tiny(tiffs):
    """Merge the four single-pixel rasters and check where each value lands."""
    merged_path = str(tiffs.join('merged.tif'))
    sources = sorted(str(entry) for entry in tiffs.listdir())

    outcome = CliRunner().invoke(merge, sources + [merged_path])
    assert outcome.exit_code == 0

    # Output should be
    #
    # [[  0 120 120  90]
    #  [  0 120 120  90]
    #  [  0  60   0   0]
    #  [ 40   0   0   0]]

    with rasterio.open(merged_path) as src:
        first_band = src.read()[0]
        assert (first_band[0:2, 1:3] == 120).all()
        assert (first_band[0:2, 3] == 90).all()
        assert first_band[2][1] == 60
        assert first_band[3][0] == 40