Use fixture to create and delete test workspaces

This commit is contained in:
Cécile Vuilleumier 2025-10-29 11:15:14 +01:00 committed by Gabriel Roldan
parent b3f10e669b
commit e1ee3537ad
13 changed files with 203 additions and 229 deletions

View File

View File

@ -41,20 +41,19 @@ def db_session(engine):
@pytest.fixture(scope="module")
def geoserver():
geoserver = GeoServerCloud(GEOSERVER_URL)
geoserver.recreate_workspace(WORKSPACE, set_default_workspace=True)
geoserver.create_pg_datastore(
workspace_name=WORKSPACE,
datastore_name=DATASTORE,
pg_host=PGHOST,
pg_port=PGPORT,
pg_db=PGDATABASE,
pg_user=PGUSER,
pg_password=PGPASSWORD,
pg_schema=PGSCHEMA,
set_default_datastore=True,
)
geoserver.publish_workspace(WORKSPACE)
yield geoserver
geoserver.delete_workspace(WORKSPACE)
def geoserver_factory(request):
"""
Factory fixture to create a GeoServerCloud instance with a dedicated workspace.
Cleanup (workspace deletion) is handled automatically.
"""
def _create(workspace_name):
geoserver = GeoServerCloud(url=GEOSERVER_URL)
geoserver.create_workspace(workspace_name, set_default_workspace=True)
geoserver.publish_workspace(workspace_name)
# Register cleanup for this workspace
request.addfinalizer(lambda: geoserver.delete_workspace(workspace_name))
return geoserver
return _create

View File

View File

@ -1,9 +1,5 @@
import json
import pytest
from conftest import GEOSERVER_URL
from geoservercloud import GeoServerCloud
WORKSPACE = "test_cascade"
WMS_STORE = "test_cascaded_wms_store"
WMS_URL = "https://wms.geo.admin.ch/?SERVICE=WMS&VERSION=1.3.0&REQUEST=GetCapabilities"
@ -13,16 +9,8 @@ WMTS_URL = "https://wmts.geo.admin.ch/EPSG/4326/1.0.0/WMTSCapabilities.xml"
WMTS_LAYER = "ch.swisstopo.pixelkarte-grau"
@pytest.fixture(scope="module")
def geoserver():
geoserver = GeoServerCloud(url=GEOSERVER_URL)
geoserver.create_workspace(WORKSPACE, set_default_workspace=True)
geoserver.publish_workspace(WORKSPACE)
yield geoserver
geoserver.delete_workspace(WORKSPACE)
def test_cascaded_wms(geoserver):
def test_cascaded_wms(geoserver_factory):
geoserver = geoserver_factory(WORKSPACE)
format = "image/jpeg"
# Create WMS store
@ -75,7 +63,8 @@ def test_cascaded_wms(geoserver):
assert status == 200
def test_cascaded_wmts(geoserver):
def test_cascaded_wmts(geoserver_factory):
geoserver = geoserver_factory(WORKSPACE)
format = "image/jpeg"
# Create WMTS store

View File

@ -1,29 +1,19 @@
import pytest
from geoservercloud import GeoServerCloud
from conftest import GEOSERVER_URL
WORKSPACE = "cog"
@pytest.fixture(scope="function")
def geoserver():
geoserver = GeoServerCloud(url=GEOSERVER_URL)
geoserver.create_workspace(WORKSPACE, set_default_workspace=True)
yield geoserver
geoserver.delete_workspace(WORKSPACE)
def test_create_cog_coverage(geoserver):
def test_create_cog_coverage(geoserver_factory):
"""Test creating a COG coverage store and coverage"""
workspace = "cog"
store_name = "land_shallow_topo_21600_NW_cog"
coverage_name = "land_shallow_topo_NW"
geoserver = geoserver_factory(workspace)
# Create COG coverage store
store_xml = f"""<coverageStore>
<name>{store_name}</name>
<type>GeoTIFF</type>
<enabled>true</enabled>
<workspace><name>{WORKSPACE}</name></workspace>
<workspace><name>{workspace}</name></workspace>
<url>cog://https://test-data-cog-public.s3.amazonaws.com/public/land_shallow_topo_21600_NW_cog.tif</url>
<metadata>
<entry key="CogSettings.Key">
@ -38,7 +28,7 @@ def test_create_cog_coverage(geoserver):
endpoints = geoserver.rest_service.rest_endpoints
response = rest_client.post(
endpoints.coveragestores(WORKSPACE),
endpoints.coveragestores(workspace),
data=store_xml,
headers={"Content-Type": "application/xml"},
)
@ -51,20 +41,20 @@ def test_create_cog_coverage(geoserver):
</coverage>"""
response = rest_client.post(
endpoints.coverages(WORKSPACE, store_name),
endpoints.coverages(workspace, store_name),
data=coverage_xml,
headers={"Content-Type": "application/xml"},
)
assert response.status_code == 201
# Verify the coverage was created - try listing coverages first
list_response = rest_client.get(endpoints.coverages(WORKSPACE, store_name))
list_response = rest_client.get(endpoints.coverages(workspace, store_name))
assert (
list_response.status_code == 200
), f"Failed to get coverages: {list_response.status_code} - {list_response.text}"
# Check specific coverage
response = rest_client.get(endpoints.coverage(WORKSPACE, store_name, coverage_name))
response = rest_client.get(endpoints.coverage(workspace, store_name, coverage_name))
assert response.status_code == 200
# Verify coverage properties
@ -75,7 +65,7 @@ def test_create_cog_coverage(geoserver):
# Test WMS GetMap request
wms_response = rest_client.get(
f"/wms?SERVICE=WMS&VERSION=1.1.0&REQUEST=GetMap&LAYERS={WORKSPACE}:{coverage_name}&STYLES=&BBOX=-180,-90,180,90&WIDTH=256&HEIGHT=256&FORMAT=image/jpeg&SRS=EPSG:4326"
f"/wms?SERVICE=WMS&VERSION=1.1.0&REQUEST=GetMap&LAYERS={workspace}:{coverage_name}&STYLES=&BBOX=-180,-90,180,90&WIDTH=256&HEIGHT=256&FORMAT=image/jpeg&SRS=EPSG:4326"
)
assert wms_response.status_code == 200
assert wms_response.headers.get("content-type").startswith("image/jpeg")

View File

@ -1,7 +1,4 @@
from geoservercloud import GeoServerCloud
import pytest
from conftest import (
GEOSERVER_URL,
from tests.conftest import (
PGHOST,
PGPORT,
PGDATABASE,
@ -13,18 +10,12 @@ from conftest import (
WORKSPACE = "test_pg_datastore"
@pytest.fixture(scope="function")
def geoserver():
geoserver = GeoServerCloud(url=GEOSERVER_URL)
geoserver.create_workspace(WORKSPACE, set_default_workspace=True)
yield geoserver
geoserver.delete_workspace(WORKSPACE)
def test_create_get_and_delete_datastore(geoserver):
def test_create_get_and_delete_datastore(geoserver_factory):
workspace = "test_pg_datastore"
datastore = "test_pg_datastore"
geoserver = geoserver_factory(workspace)
content, code = geoserver.create_pg_datastore(
workspace_name=WORKSPACE,
workspace_name=workspace,
datastore_name=datastore,
pg_host=PGHOST,
pg_port=PGPORT,
@ -36,6 +27,6 @@ def test_create_get_and_delete_datastore(geoserver):
)
assert content == datastore
assert code == 201
content, code = geoserver.get_pg_datastore(WORKSPACE, datastore)
content, code = geoserver.get_pg_datastore(workspace, datastore)
assert content.get("name") == datastore
assert code == 200

View File

@ -6,8 +6,8 @@ WMTS_LAYER = "ch.swisstopo.swissimage"
@pytest.fixture(scope="module")
def geoserver_with_gwc_layers(geoserver):
geoserver.create_workspace(WORKSPACE)
def geoserver_with_gwc_layers(geoserver_factory):
geoserver = geoserver_factory(WORKSPACE)
geoserver.create_wmts_store(
WORKSPACE,
WMTS_STORE,
@ -17,7 +17,6 @@ def geoserver_with_gwc_layers(geoserver):
geoserver.publish_gwc_layer(WORKSPACE, WMTS_LAYER)
yield geoserver
geoserver.delete_gwc_layer(WORKSPACE, WMTS_LAYER)
geoserver.delete_workspace(WORKSPACE)
def test_tile_cache(geoserver_with_gwc_layers):

View File

@ -1,11 +1,13 @@
#!/bin/env python
import pytest
from conftest import RESOURCE_DIR, WORKSPACE
from lib.utils import compare_images, write_actual_image
from tests.conftest import RESOURCE_DIR
from tests.lib.utils import compare_images, write_actual_image
from requests.exceptions import JSONDecodeError
from sqlalchemy.sql import text
WORKSPACE = "test_i18n_workspace"
def international_title(default=True, de=True, fr=True, it=True, rm=True):
title = {}
@ -38,6 +40,23 @@ def assert_legend(geoserver, style, language, expected_label):
assert False
@pytest.fixture(scope="module")
def geoserver(geoserver_factory):
geoserver = geoserver_factory(WORKSPACE)
geoserver.create_pg_datastore(
workspace_name=WORKSPACE,
datastore_name="i18n_datastore",
pg_host="geodatabase",
pg_port=5432,
pg_db="acceptance",
pg_user="geoserver",
pg_password="geoserver",
pg_schema="test1",
set_default_datastore=True,
)
yield geoserver
@pytest.fixture(scope="module")
def geoserver_with_i18n_layers(geoserver):
@ -68,7 +87,6 @@ def geoserver_default_locale_it(geoserver_with_i18n_layers):
@pytest.fixture(scope="module")
def geoserver_i18n_legend_layer(geoserver):
geoserver.create_workspace(WORKSPACE, set_default_workspace=True)
geoserver.create_feature_type("i18n_legend", epsg=2056)
geoserver.create_style_from_file(
"localized_with_default",

View File

@ -10,25 +10,16 @@ Tests various workflows for creating ImageMosaic stores and layers:
All tests use sample data from a shared mount volume at /opt/geoserver_data
that is accessible to both the test environment and GeoServer containers.
"""
import os
import tempfile
import zipfile
from pathlib import Path
import pytest
from geoservercloud import GeoServerCloud
from conftest import GEOSERVER_URL
def test_create_imagemosaic_local_files():
def test_create_imagemosaic_local_files(geoserver_factory):
"""Test creating an ImageMosaic using local sample data files via direct directory approach"""
geoserver = GeoServerCloud(GEOSERVER_URL)
workspace = "local_sampledata"
store_name = "ne_pyramid_store"
# Delete and recreate workspace
geoserver.delete_workspace(workspace)
_, status = geoserver.create_workspace(workspace)
assert status == 201
geoserver = geoserver_factory(workspace)
# Use direct directory approach (like web UI) instead of individual file URLs
directory_path = "/opt/geoserver_data/sampledata/ne/pyramid/"
@ -53,7 +44,8 @@ def test_create_imagemosaic_local_files():
# Extract the auto-discovered coverage name
response_text = response.text
import re
coverage_match = re.search(r'<coverageName>([^<]+)</coverageName>', response_text)
coverage_match = re.search(r"<coverageName>([^<]+)</coverageName>", response_text)
assert coverage_match, f"No coverage found in response: {response_text}"
coverage_name = coverage_match.group(1)
@ -81,12 +73,16 @@ def test_create_imagemosaic_local_files():
response = geoserver.rest_service.rest_client.post(
f"/rest/workspaces/{workspace}/coveragestores/{store_name}/coverages",
data=coverage_xml,
headers={"Content-Type": "text/xml"}
headers={"Content-Type": "text/xml"},
)
assert response.status_code == 201, f"Failed to create coverage: {response.text}"
assert (
response.status_code == 201
), f"Failed to create coverage: {response.text}"
# Verify the coverage was created
response = geoserver.rest_service.rest_client.get(f"/rest/workspaces/{workspace}/coveragestores/{store_name}/coverages/{coverage_name}.json")
response = geoserver.rest_service.rest_client.get(
f"/rest/workspaces/{workspace}/coveragestores/{store_name}/coverages/{coverage_name}.json"
)
assert response.status_code == 200
coverage_data = response.json()["coverage"]
@ -101,21 +97,20 @@ def test_create_imagemosaic_local_files():
assert wms_response.status_code == 200, f"WMS GetMap failed: {wms_response.text}"
assert wms_response.headers.get("content-type").startswith("image/png")
# Cleanup
geoserver.delete_workspace(workspace)
# Delete coverage store
response = geoserver.rest_service.rest_client.delete(
f"/rest/workspaces/{workspace}/coveragestores/{store_name}?recurse=true"
)
assert (
response.status_code == 200
), f"Failed to delete coverage store: {response.text}"
def test_create_imagemosaic_manual_granules():
"""Test creating an ImageMosaic by manually adding individual granules"""
geoserver = GeoServerCloud(GEOSERVER_URL)
def test_create_imagemosaic_manual_granules(geoserver_factory):
workspace = "manual_granules"
store_name = "manual_granules_store"
coverage_name = "manual_granules_coverage"
# Delete and recreate workspace
geoserver.delete_workspace(workspace)
_, status = geoserver.create_workspace(workspace)
assert status == 201
geoserver = geoserver_factory(workspace)
# Create temporary directory for mosaic configuration
with tempfile.TemporaryDirectory() as tmp_dir:
@ -152,27 +147,29 @@ preparedStatements=false
# Create ZIP file with both configuration files
zip_file = tmp_path / "manual-granules-config.zip"
with zipfile.ZipFile(zip_file, 'w') as zf:
with zipfile.ZipFile(zip_file, "w") as zf:
zf.write(indexer_file, "indexer.properties")
zf.write(datastore_file, "datastore.properties")
# Create empty ImageMosaic store
with open(zip_file, 'rb') as f:
with open(zip_file, "rb") as f:
zip_data = f.read()
response = geoserver.rest_service.rest_client.put(
f"/rest/workspaces/{workspace}/coveragestores/{store_name}/file.imagemosaic?configure=none",
data=zip_data,
headers={"Content-Type": "application/zip"}
headers={"Content-Type": "application/zip"},
)
assert response.status_code == 201, f"Failed to create ImageMosaic store: {response.text}"
assert (
response.status_code == 201
), f"Failed to create ImageMosaic store: {response.text}"
# Manually add individual granules from the sample data
granule_paths = [
"/opt/geoserver_data/sampledata/ne/pyramid/NE1_LR_LC_SR_W_DR_1_1.tif",
"/opt/geoserver_data/sampledata/ne/pyramid/NE1_LR_LC_SR_W_DR_1_2.tif",
"/opt/geoserver_data/sampledata/ne/pyramid/NE1_LR_LC_SR_W_DR_2_1.tif",
"/opt/geoserver_data/sampledata/ne/pyramid/NE1_LR_LC_SR_W_DR_2_2.tif"
"/opt/geoserver_data/sampledata/ne/pyramid/NE1_LR_LC_SR_W_DR_2_2.tif",
]
for granule_path in granule_paths:
@ -180,9 +177,12 @@ preparedStatements=false
response = geoserver.rest_service.rest_client.post(
f"/rest/workspaces/{workspace}/coveragestores/{store_name}/external.imagemosaic",
data=granule_path,
headers={"Content-Type": "text/plain"}
headers={"Content-Type": "text/plain"},
)
assert response.status_code in [201, 202], f"Failed to add granule {granule_path}: {response.text}"
assert response.status_code in [
201,
202,
], f"Failed to add granule {granule_path}: {response.text}"
# Initialize the store (list available coverages)
response = geoserver.rest_service.rest_client.get(
@ -192,8 +192,9 @@ preparedStatements=false
# Verify coverage name is available
response_text = response.text
assert f"<coverageName>{coverage_name}</coverageName>" in response_text, \
f"Coverage name '{coverage_name}' not found in response: {response_text}"
assert (
f"<coverageName>{coverage_name}</coverageName>" in response_text
), f"Coverage name '{coverage_name}' not found in response: {response_text}"
# Create layer/coverage
coverage_xml = f"""<coverage>
@ -206,7 +207,7 @@ preparedStatements=false
response = geoserver.rest_service.rest_client.post(
f"/rest/workspaces/{workspace}/coveragestores/{store_name}/coverages",
data=coverage_xml,
headers={"Content-Type": "text/xml"}
headers={"Content-Type": "text/xml"},
)
assert response.status_code == 201, f"Failed to create coverage: {response.text}"
@ -214,7 +215,9 @@ preparedStatements=false
response = geoserver.rest_service.rest_client.get(
f"/rest/workspaces/{workspace}/coveragestores/{store_name}/coverages/{coverage_name}.json"
)
assert response.status_code == 200, f"Failed to get coverage details: {response.text}"
assert (
response.status_code == 200
), f"Failed to get coverage details: {response.text}"
coverage_data = response.json()["coverage"]
assert coverage_data["name"] == coverage_name
@ -228,26 +231,16 @@ preparedStatements=false
assert wms_response.status_code == 200, f"WMS GetMap failed: {wms_response.text}"
assert wms_response.headers.get("content-type").startswith("image/png")
# Cleanup
geoserver.delete_workspace(workspace)
def test_create_imagemosaic_empty_store_with_directory_harvest():
def test_create_imagemosaic_empty_store_with_directory_harvest(geoserver_factory):
"""
Test creating an empty ImageMosaic store first, then harvesting granules from a directory.
This tests the workflow: create store -> harvest directory -> create layer.
"""
geoserver = GeoServerCloud(GEOSERVER_URL)
workspace = "directory_harvest"
store_name = "directory_harvest_store"
coverage_name = "directory_harvest_coverage"
# Clean up any existing workspace
geoserver.delete_workspace(workspace)
# Step 1: Create workspace
content, status = geoserver.create_workspace(workspace)
assert status == 201, f"Failed to create workspace: {content}"
geoserver = geoserver_factory(workspace)
# Step 2: Create ImageMosaic store with configuration
with tempfile.TemporaryDirectory() as tmp_dir:
@ -284,20 +277,22 @@ preparedStatements=false
# Create ZIP file with both configuration files
zip_file = tmp_path / "mosaic-config.zip"
with zipfile.ZipFile(zip_file, 'w') as zf:
with zipfile.ZipFile(zip_file, "w") as zf:
zf.write(indexer_file, "indexer.properties")
zf.write(datastore_file, "datastore.properties")
# Upload ZIP to create empty ImageMosaic store
with open(zip_file, 'rb') as f:
with open(zip_file, "rb") as f:
zip_data = f.read()
response = geoserver.rest_service.rest_client.put(
f"/rest/workspaces/{workspace}/coveragestores/{store_name}/file.imagemosaic?configure=none",
data=zip_data,
headers={"Content-Type": "application/zip"}
headers={"Content-Type": "application/zip"},
)
assert response.status_code == 201, f"Failed to create ImageMosaic store: {response.text}"
assert (
response.status_code == 201
), f"Failed to create ImageMosaic store: {response.text}"
# Step 3: Harvest granules from directory
harvest_path = "/opt/geoserver_data/sampledata/ne/pyramid/"
@ -305,9 +300,12 @@ preparedStatements=false
response = geoserver.rest_service.rest_client.post(
f"/rest/workspaces/{workspace}/coveragestores/{store_name}/external.imagemosaic",
data=harvest_path,
headers={"Content-Type": "text/plain"}
headers={"Content-Type": "text/plain"},
)
assert response.status_code in [201, 202], f"Failed to harvest directory {harvest_path}: {response.text}"
assert response.status_code in [
201,
202,
], f"Failed to harvest directory {harvest_path}: {response.text}"
# Step 4: List available coverages
response = geoserver.rest_service.rest_client.get(
@ -317,8 +315,9 @@ preparedStatements=false
# Verify coverage name is available
response_text = response.text
assert f"<coverageName>{coverage_name}</coverageName>" in response_text, \
f"Coverage name '{coverage_name}' not found in response: {response_text}"
assert (
f"<coverageName>{coverage_name}</coverageName>" in response_text
), f"Coverage name '{coverage_name}' not found in response: {response_text}"
# Step 5: Create layer/coverage
coverage_xml = f"""<coverage>
@ -331,7 +330,7 @@ preparedStatements=false
response = geoserver.rest_service.rest_client.post(
f"/rest/workspaces/{workspace}/coveragestores/{store_name}/coverages",
data=coverage_xml,
headers={"Content-Type": "text/xml"}
headers={"Content-Type": "text/xml"},
)
assert response.status_code == 201, f"Layer creation failed: {response.text}"
@ -339,7 +338,9 @@ preparedStatements=false
response = geoserver.rest_service.rest_client.get(
f"/rest/workspaces/{workspace}/coveragestores/{store_name}/coverages/{coverage_name}.json"
)
assert response.status_code == 200, f"Failed to get coverage details: {response.text}"
assert (
response.status_code == 200
), f"Failed to get coverage details: {response.text}"
coverage_data = response.json()["coverage"]
assert coverage_data["name"] == coverage_name
@ -354,26 +355,16 @@ preparedStatements=false
assert wms_response.status_code == 200, f"WMS GetMap failed: {wms_response.text}"
assert wms_response.headers.get("content-type").startswith("image/png")
# Cleanup
geoserver.delete_workspace(workspace)
def test_create_imagemosaic_empty_store_with_single_file_harvest():
def test_create_imagemosaic_empty_store_with_single_file_harvest(geoserver_factory):
"""
Test creating an empty ImageMosaic store first, then harvesting a single file.
This tests the workflow: create store -> harvest single file -> create layer.
"""
geoserver = GeoServerCloud(GEOSERVER_URL)
workspace = "single_file_harvest"
store_name = "single_file_harvest_store"
coverage_name = "single_file_harvest_coverage"
# Clean up any existing workspace
geoserver.delete_workspace(workspace)
# Step 1: Create workspace
content, status = geoserver.create_workspace(workspace)
assert status == 201, f"Failed to create workspace: {content}"
geoserver = geoserver_factory(workspace)
# Step 2: Create ImageMosaic store
with tempfile.TemporaryDirectory() as tmp_dir:
@ -410,20 +401,22 @@ preparedStatements=false
# Create ZIP file with both files
zip_file = tmp_path / "mosaic-single-config.zip"
with zipfile.ZipFile(zip_file, 'w') as zf:
with zipfile.ZipFile(zip_file, "w") as zf:
zf.write(indexer_file, "indexer.properties")
zf.write(datastore_file, "datastore.properties")
# Upload ZIP to create ImageMosaic store
with open(zip_file, 'rb') as f:
with open(zip_file, "rb") as f:
zip_data = f.read()
response = geoserver.rest_service.rest_client.put(
f"/rest/workspaces/{workspace}/coveragestores/{store_name}/file.imagemosaic?configure=none",
data=zip_data,
headers={"Content-Type": "application/zip"}
headers={"Content-Type": "application/zip"},
)
assert response.status_code == 201, f"Failed to create ImageMosaic store: {response.text}"
assert (
response.status_code == 201
), f"Failed to create ImageMosaic store: {response.text}"
# Step 3: Harvest single file
single_file_path = "/opt/geoserver_data/sampledata/ne/NE1_LR_LC_SR_W_DR.tif"
@ -431,9 +424,12 @@ preparedStatements=false
response = geoserver.rest_service.rest_client.post(
f"/rest/workspaces/{workspace}/coveragestores/{store_name}/external.imagemosaic",
data=single_file_path,
headers={"Content-Type": "text/plain"}
headers={"Content-Type": "text/plain"},
)
assert response.status_code in [201, 202], f"Failed to harvest file {single_file_path}: {response.text}"
assert response.status_code in [
201,
202,
], f"Failed to harvest file {single_file_path}: {response.text}"
# Step 4: List and create layer
response = geoserver.rest_service.rest_client.get(
@ -452,7 +448,7 @@ preparedStatements=false
response = geoserver.rest_service.rest_client.post(
f"/rest/workspaces/{workspace}/coveragestores/{store_name}/coverages",
data=coverage_xml,
headers={"Content-Type": "text/xml"}
headers={"Content-Type": "text/xml"},
)
assert response.status_code == 201, f"Layer creation failed: {response.text}"
@ -464,25 +460,15 @@ preparedStatements=false
assert wms_response.status_code == 200, f"WMS GetMap failed: {wms_response.text}"
assert wms_response.headers.get("content-type").startswith("image/png")
# Cleanup
geoserver.delete_workspace(workspace)
def test_create_imagemosaic_via_xml_store_creation():
def test_create_imagemosaic_via_xml_store_creation(geoserver_factory):
"""
Test creating an ImageMosaic store via XML store creation (not file upload).
This tests direct store creation pointing to a directory.
"""
geoserver = GeoServerCloud(GEOSERVER_URL)
workspace = "xml_store_creation"
store_name = "xml_store_creation_store"
# Clean up any existing workspace
geoserver.delete_workspace(workspace)
# Step 1: Create workspace
content, code = geoserver.create_workspace(workspace)
assert code == 201, f"Failed to create workspace: {content}"
geoserver = geoserver_factory(workspace)
# Step 2: Create ImageMosaic store via XML store creation
store_xml = f"""<coverageStore>
@ -498,20 +484,25 @@ def test_create_imagemosaic_via_xml_store_creation():
response = geoserver.rest_service.rest_client.post(
f"/rest/workspaces/{workspace}/coveragestores",
data=store_xml,
headers={"Content-Type": "text/xml"}
headers={"Content-Type": "text/xml"},
)
assert response.status_code == 201, f"Store creation via XML failed: {response.text}"
assert (
response.status_code == 201
), f"Store creation via XML failed: {response.text}"
# Step 3: List available coverages
response = geoserver.rest_service.rest_client.get(
f"/rest/workspaces/{workspace}/coveragestores/{store_name}/coverages.xml?list=all"
)
assert response.status_code == 200, f"Failed to list coverages: {response.text}"
assert "coverageName" in response.text, f"No coverage found in response: {response.text}"
assert (
"coverageName" in response.text
), f"No coverage found in response: {response.text}"
# Extract coverage name
import re
coverage_match = re.search(r'<coverageName>([^<]+)</coverageName>', response.text)
coverage_match = re.search(r"<coverageName>([^<]+)</coverageName>", response.text)
assert coverage_match, f"Could not extract coverage name from: {response.text}"
coverage_name = coverage_match.group(1)
@ -526,7 +517,7 @@ def test_create_imagemosaic_via_xml_store_creation():
response = geoserver.rest_service.rest_client.post(
f"/rest/workspaces/{workspace}/coveragestores/{store_name}/coverages",
data=coverage_xml,
headers={"Content-Type": "text/xml"}
headers={"Content-Type": "text/xml"},
)
assert response.status_code == 201, f"Layer creation failed: {response.text}"
@ -537,6 +528,3 @@ def test_create_imagemosaic_via_xml_store_creation():
)
assert wms_response.status_code == 200, f"WMS GetMap failed: {wms_response.text}"
assert wms_response.headers.get("content-type").startswith("image/png")
# Cleanup
geoserver.delete_workspace(workspace)

View File

@ -1,18 +1,18 @@
import os
import tempfile
import zipfile
from pathlib import Path
import pytest
from geoservercloud import GeoServerCloud
from conftest import GEOSERVER_URL
def _create_imagemosaic(geoserver, workspace, coverage, granules, indexer_content, title="ImageMosaic Coverage"):
def _create_imagemosaic(
geoserver,
workspace,
coverage,
granules,
indexer_content,
title="ImageMosaic Coverage",
):
"""Helper function to create an ImageMosaic with COG granules"""
# Delete and recreate workspace
geoserver.delete_workspace(workspace)
_, status = geoserver.create_workspace(workspace)
assert status == 201
# Create temporary directory for mosaic files
with tempfile.TemporaryDirectory() as tmp_dir:
@ -42,10 +42,10 @@ preparedStatements=false
# Create zip file
zip_file = tmp_path / f"{coverage}.zip"
with zipfile.ZipFile(zip_file, 'w') as zf:
with zipfile.ZipFile(zip_file, "w") as zf:
zf.write(indexer_file, "indexer.properties")
zf.write(datastore_file, "datastore.properties")
# Create timeregex.properties if needed for time-based PropertyCollector
if "timeregex" in indexer_content:
# Regex pattern to extract date from MODIS filename format: 2018.01.01
@ -55,13 +55,13 @@ preparedStatements=false
zf.write(timeregex_file, "timeregex.properties")
# Create empty imagemosaic
with open(zip_file, 'rb') as f:
with open(zip_file, "rb") as f:
zip_data = f.read()
response = geoserver.rest_service.rest_client.put(
f"/rest/workspaces/{workspace}/coveragestores/{coverage}/file.imagemosaic?configure=none",
data=zip_data,
headers={"Content-Type": "application/zip"}
headers={"Content-Type": "application/zip"},
)
assert response.status_code == 201
@ -70,7 +70,7 @@ preparedStatements=false
response = geoserver.rest_service.rest_client.post(
f"/rest/workspaces/{workspace}/coveragestores/{coverage}/remote.imagemosaic",
data=uri,
headers={"Content-Type": "text/plain"}
headers={"Content-Type": "text/plain"},
)
# Accept both 202 (Accepted) and 201 (Created) as valid responses
assert response.status_code in [201, 202]
@ -96,12 +96,14 @@ preparedStatements=false
response = geoserver.rest_service.rest_client.post(
f"/rest/workspaces/{workspace}/coveragestores/{coverage}/coverages",
data=coverage_xml,
headers={"Content-Type": "text/xml"}
headers={"Content-Type": "text/xml"},
)
assert response.status_code == 201
# Verify the coverage was created
response = geoserver.rest_service.rest_client.get(f"/rest/workspaces/{workspace}/coveragestores/{coverage}/coverages/{coverage}.json")
response = geoserver.rest_service.rest_client.get(
f"/rest/workspaces/{workspace}/coveragestores/{coverage}/coverages/{coverage}.json"
)
assert response.status_code == 200
# Verify coverage properties
@ -121,11 +123,11 @@ preparedStatements=false
return coverage_data
def test_create_imagemosaic_landshallow_topo():
def test_create_imagemosaic_landshallow_topo(geoserver_factory):
"""Test creating an ImageMosaic coverage store with multiple COG granules"""
geoserver = GeoServerCloud(GEOSERVER_URL)
workspace = "s3cog_public"
coverage = "land_shallow_topo_http"
geoserver = geoserver_factory(workspace)
# HTTP granules
granules = [
@ -142,18 +144,22 @@ Schema=*the_geom:Polygon,location:String
CanBeEmpty=true
Name={coverage}"""
_create_imagemosaic(geoserver, workspace, coverage, granules, indexer_content, "Land Shallow Topo HTTP")
# Cleanup
geoserver.delete_workspace(workspace)
_create_imagemosaic(
geoserver,
workspace,
coverage,
granules,
indexer_content,
"Land Shallow Topo HTTP",
)
@pytest.mark.skip(reason="Takes too long - enable for full testing")
def test_create_imagemosaic_modis():
def test_create_imagemosaic_modis(geoserver_factory):
"""Test creating a MODIS ImageMosaic coverage with time dimension (reproduces official tutorial)"""
geoserver = GeoServerCloud(GEOSERVER_URL)
workspace = "modis_cog"
coverage = "modisvi"
geoserver = geoserver_factory(workspace)
# MODIS COG datasets from NASA EarthData
modis_granules = [
@ -169,7 +175,14 @@ Schema=*the_geom:Polygon,location:String,time:java.util.Date
CanBeEmpty=true
Name={coverage}"""
coverage_data = _create_imagemosaic(geoserver, workspace, coverage, modis_granules, indexer_content, "MODIS Vegetation Index")
coverage_data = _create_imagemosaic(
geoserver,
workspace,
coverage,
modis_granules,
indexer_content,
"MODIS Vegetation Index",
)
# Additional test for time-based query (since MODIS has time dimension)
time_wms_response = geoserver.rest_service.rest_client.get(
@ -177,8 +190,3 @@ Name={coverage}"""
)
assert time_wms_response.status_code == 200
assert time_wms_response.headers.get("content-type").startswith("image/png")
# Cleanup
#geoserver.delete_workspace(workspace)

View File

@ -1,16 +1,15 @@
from conftest import (
from tests.conftest import (
PGDATABASE,
PGHOST,
PGPASSWORD,
PGPORT,
PGSCHEMA,
PGUSER,
RESOURCE_DIR,
)
def test_wfs(geoserver):
def test_wfs(geoserver_factory):
workspace = datastore = feature_type = "test_wfs"
geoserver = geoserver_factory(workspace)
attributes = {
"geom": {
"type": "Point",
@ -29,13 +28,11 @@ def test_wfs(geoserver):
"required": False,
},
}
_, code = geoserver.create_workspace(workspace, set_default_workspace=True)
assert code == 201
_, code = geoserver.create_pg_datastore(
workspace_name=workspace,
datastore_name=datastore,
pg_host=PGHOST,
pg_port=PGPORT,
pg_host="geodatabase",
pg_port=5432,
pg_db=PGDATABASE,
pg_user=PGUSER,
pg_password=PGPASSWORD,
@ -74,6 +71,3 @@ def test_wfs(geoserver):
"type": "name",
"properties": {"name": "urn:ogc:def:crs:EPSG::2056"},
}
_, code = geoserver.delete_workspace(workspace)
assert code == 200

View File

@ -1,26 +1,24 @@
import json
from conftest import (
from tests.conftest import (
PGDATABASE,
PGHOST,
PGPASSWORD,
PGPORT,
PGSCHEMA,
PGUSER,
RESOURCE_DIR,
)
from lib.utils import compare_images, write_actual_image
from tests.lib.utils import compare_images, write_actual_image
from sqlalchemy.sql import text
def test_create_and_feature_type_and_get_map(db_session, geoserver):
def test_create_and_feature_type_and_get_map(db_session, geoserver_factory):
workspace = datastore = feature_type = "test_create_feature_type"
geoserver.create_workspace(workspace, set_default_workspace=True)
geoserver = geoserver_factory(workspace)
geoserver.create_pg_datastore(
workspace_name=workspace,
datastore_name=datastore,
pg_host=PGHOST,
pg_port=PGPORT,
pg_host="geodatabase",
pg_port=5432,
pg_db=PGDATABASE,
pg_user=PGUSER,
pg_password=PGPASSWORD,
@ -54,11 +52,10 @@ def test_create_and_feature_type_and_get_map(db_session, geoserver):
write_actual_image(response, file_root)
compare_images(RESOURCE_DIR, file_root)
geoserver.delete_workspace(workspace)
def test_get_feature_info(db_session, geoserver):
def test_get_feature_info(db_session, geoserver_factory):
workspace = datastore = feature_type = "test_get_feature_info"
geoserver = geoserver_factory(workspace)
attributes = {
"geom": {
"type": "Point",
@ -69,13 +66,11 @@ def test_get_feature_info(db_session, geoserver):
"required": False,
},
}
_, status = geoserver.create_workspace(workspace, set_default_workspace=True)
assert status == 201
_, status = geoserver.create_pg_datastore(
workspace_name=workspace,
datastore_name=datastore,
pg_host=PGHOST,
pg_port=PGPORT,
pg_host="geodatabase",
pg_port=5432,
pg_db=PGDATABASE,
pg_user=PGUSER,
pg_password=PGPASSWORD,
@ -123,6 +118,3 @@ def test_get_feature_info(db_session, geoserver):
"type": "name",
"properties": {"name": "urn:ogc:def:crs:EPSG::2056"},
}
_, status = geoserver.delete_workspace(workspace)
assert status == 200

View File

@ -1,4 +1,9 @@
def test_create_get_and_delete_workspace(geoserver):
from tests.conftest import GEOSERVER_URL
from geoservercloud import GeoServerCloud
def test_create_get_and_delete_workspace():
geoserver = GeoServerCloud(GEOSERVER_URL)
workspace = "test_create_workspace"
content, status = geoserver.create_workspace(workspace)
assert content == workspace
@ -12,7 +17,8 @@ def test_create_get_and_delete_workspace(geoserver):
assert status == 200
def test_update_workspace(geoserver):
def test_update_workspace():
geoserver = GeoServerCloud(GEOSERVER_URL)
workspace = "update_workspace"
content, status = geoserver.create_workspace(workspace, isolated=True)
content, status = geoserver.get_workspace(workspace)