#!/usr/bin/env python3 """Test for the ee.batch module.""" import json from typing import Any, Optional import unittest from unittest import mock import unittest import ee from ee import _state from ee import apitestcase from ee import batch from ee import data RUNNING_OPERATION = { 'metadata': { 'createTime': '1970-01-01T00:00:00.00Z', 'startTime': '1970-01-01T00:00:01.00Z', 'updateTime': '1970-01-01T00:01:00.00Z', 'description': 'FirstTestTask', 'state': 'RUNNING', 'type': 'EXPORT_IMAGE', 'destinationUris': ['https://test.com'], 'attempt': 42, 'priority': 100, }, 'done': False, 'name': 'projects/test-project/operations/TEST1', } SUCCEEDED_OPERATION = { 'metadata': { 'createTime': '1970-01-01T00:00:00.00Z', 'startTime': '1970-01-01T00:00:01.00Z', 'updateTime': '1970-01-01T00:01:00.00Z', 'description': 'Ingest image: "an/image"', 'state': 'SUCCEEDED', 'type': 'EXPORT_IMAGE', 'destinationUris': ['https://test.com'], 'attempt': 42, 'priority': 100, }, 'done': False, 'name': 'projects/test-project/operations/TEST2', } UNKNOWN_OPERATION = { 'metadata': { 'state': 'UNKNOWN', }, 'done': True, 'name': 'projects/test-project/operations/TEST2', } class TaskTest(unittest.TestCase): def setUp(self): super().setUp() mock.patch.object( _state, 'get_state', return_value=_state.EEState( initialized=True, cloud_api_user_project='test-project' ), ).start() def tearDown(self): super().tearDown() mock.patch.stopall() def test_start_without_config(self): task = batch.Task('an id', 'a task type', 'a state') self.assertIsNone(task.config) with self.assertRaisesRegex(ee.EEException, 'Task config'): task.start() def test_start_unknown_task_type(self): task_type = 'bad task type' task = batch.Task('an id', task_type, 'a state', {'some': 'value'}) with self.assertRaisesRegex( ee.EEException, f'Unknown Task type "{task_type}"' ): task.start() def test_status_with_id(self): name = 'projects/test-project/operations/test_1' task = batch.Task('an id', 'a task type', 'a state', name=name) with mock.patch.object( data, 'getOperation', return_value=RUNNING_OPERATION ) as m: self.assertEqual('RUNNING', task.status()['state']) self.assertEqual( m.call_args.args[0], 'projects/test-project/operations/test_1' ) def test_status_with_name(self): task = batch.Task( None, 'a task type', 'a state', name='projects/test-project/operations/test_1', ) with mock.patch.object( data, 'getOperation', return_value=RUNNING_OPERATION ) as m: self.assertEqual('RUNNING', task.status()['state']) self.assertEqual( m.call_args.args[0], 'projects/test-project/operations/test_1' ) def test_status_with_id_state_unknown(self): name = 'projects/test-project/operations/an id' task = batch.Task('an id', 'a task type', 'a state', name=name) with mock.patch.object( data, 'getOperation', return_value=UNKNOWN_OPERATION ) as m: self.assertEqual('UNSUBMITTED', task.status()['state']) self.assertEqual( m.call_args.args[0], 'projects/test-project/operations/an id' ) def test_status_without_id_or_name(self): task = batch.Task(None, 'a task type', 'a state') self.assertEqual('UNSUBMITTED', task.status()['state']) def test_active(self): name = 'projects/test-project/operations/an id' task = batch.Task('an id', 'a task type', 'a state', name=name) with mock.patch.object( data, 'getOperation', return_value=RUNNING_OPERATION ): self.assertTrue(task.active()) def test_not_active(self): task = batch.Task('an id', 'a task type', 'a state') with mock.patch.object( data, 'getOperation', return_value=SUCCEEDED_OPERATION ): self.assertFalse(task.active()) def test_repr_without_config(self): task = batch.Task('an id', 'a task type', 'a state') self.assertEqual('', task.__repr__()) def test_repr_with_config(self): an_id = None task_type = 'a task type' state = 'a state' description = 'a description' task = batch.Task( an_id, task_type, state, config={'description': description} ) self.assertEqual( f'', task.__repr__() ) def test_repr_with_id_and_config(self): an_id = 'an id' task_type = 'a task type' state = 'a state' description = 'a description' task = batch.Task( an_id, task_type, state, config={'description': description} ) self.assertEqual( f'', task.__repr__() ) class ExportTest(unittest.TestCase): def test_export_cannot_init(self): with self.assertRaises(AssertionError): batch.Export() def test_export_image_cannot_init(self): with self.assertRaises(AssertionError): batch.Export.image.__init__('something') def test_export_map_cannot_init(self): with self.assertRaises(AssertionError): batch.Export.map.__init__('something') def test_export_table_cannot_init(self): with self.assertRaises(AssertionError): batch.Export.table.__init__('something') def test_export_video_cannot_init(self): with self.assertRaises(AssertionError): batch.Export.video.__init__('something') class BatchTestCase(apitestcase.ApiTestCase): """A test case for batch functionality.""" start_call_params: Optional[Any] update_call_params: Optional[Any] def setUp(self): super().setUp() self.start_call_params = None self.update_call_params = None def test_task_start_cloud_api(self): """Verifies that Task.start() calls the server appropriately.""" mock_cloud_api_resource = mock.MagicMock() mock_cloud_api_resource.projects().table().export().execute.return_value = { 'name': 'projects/earthengine-legacy/operations/foo', 'metadata': {}, } with apitestcase.UsingCloudApi(cloud_api_resource=mock_cloud_api_resource): task = ee.batch.Export.table(ee.FeatureCollection('foo'), 'bar') task.start() export_args = mock_cloud_api_resource.projects().table().export.call_args self.assertEqual(task.id, 'foo') self.assertTrue(export_args[1]['body']['requestId']) self.assertEqual(export_args[1]['body']['description'], 'bar') def test_task_cancel_cloud_api(self): mock_cloud_api_resource = mock.MagicMock() mock_cloud_api_resource.projects().operations().list().execute.return_value = { 'operations': [{ 'name': 'projects/earthengine-legacy/operations/TEST1', 'metadata': {}, }] } mock_cloud_api_resource.projects().operations().list_next.return_value = ( None ) with apitestcase.UsingCloudApi(cloud_api_resource=mock_cloud_api_resource): task = ee.batch.Task.list()[0] task.cancel() cancel_args = ( mock_cloud_api_resource.projects().operations().cancel.call_args ) self.assertEqual( cancel_args[1]['name'], 'projects/earthengine-legacy/operations/TEST1' ) def test_export_image_trivial_region_cloud_api(self): """Verifies the task created by Export.image() with a trivial region.""" with apitestcase.UsingCloudApi(): region = [0, 0, 1, 0, 1, 1] task = ee.batch.Export.image.toAsset( ee.Image(42), assetId='users/foo/bar', region=region, scale=1000 ) expected_expression = ee.Image(42).clipToBoundsAndScale( geometry=ee.Geometry.LineString(region), scale=1000 ) self.assertIsNone(task.id) self.assertIsNone(task.name) self.assertEqual('EXPORT_IMAGE', task.task_type) self.assertEqual('UNSUBMITTED', task.state) self.assertEqual( json.loads(task.config['expression'].serialize()), json.loads(expected_expression.serialize()), ) task.config.pop('expression') self.assertEqual( { 'assetExportOptions': { 'earthEngineDestination': { 'name': ( 'projects/earthengine-legacy/assets/users/foo/bar' ), } }, 'description': 'myExportImageTask', }, task.config, ) def test_export_image_cloud_api(self): """Verifies the task created by Export.image().""" with apitestcase.UsingCloudApi(): region = ee.Geometry.Rectangle(1, 2, 3, 4) config = dict( region=region['coordinates'], maxPixels=10**10, crs='foo', crs_transform='[9,8,7,6,5,4]', tiffCloudOptimized=True, shardSize=512, fileDimensions=1024, formatOptions={'noData': 1}, ) task = ee.batch.Export.image(ee.Image(1), 'TestDescription', config) expected_expression = ( ee.Image(1) .reproject('foo', crsTransform=[9.0, 8.0, 7.0, 6.0, 5.0, 4.0]) .clip(region) ) self.assertIsNone(task.id) self.assertIsNone(task.name) self.assertEqual('EXPORT_IMAGE', task.task_type) self.assertEqual('UNSUBMITTED', task.state) self.assertEqual( { 'expression': expected_expression, 'description': 'TestDescription', 'fileExportOptions': { 'fileFormat': 'GEO_TIFF', 'driveDestination': {'filenamePrefix': 'TestDescription'}, 'geoTiffOptions': { 'cloudOptimized': True, 'tileDimensions': {'width': 1024, 'height': 1024}, 'tileSize': {'value': 512}, 'noData': {'floatValue': 1}, }, }, 'maxPixels': {'value': '10000000000'}, }, task.config, ) def test_export_image_cloud_api_invalid_skip_empty_tiles(self): """Verifies errors are thrown when incorrectly specifying skipEmptyTiles.""" with apitestcase.UsingCloudApi(): with self.assertRaisesRegex( ValueError, 'skipEmptyTiles is only supported for GeoTIFF' ): ee.batch.Export.image.toDrive( ee.Image(1), 'TestDescription', fileFormat='TFRecord', skipEmptyTiles=True, ) def test_export_image_with_tf_record_cloud_api(self): """Verifies the task created by Export.image().""" with apitestcase.UsingCloudApi(): region = ee.Geometry.Rectangle(1, 2, 3, 4) config = dict( region=region['coordinates'], maxPixels=10**10, crs='foo', crs_transform='[9,8,7,6,5,4]', fileFormat='TFRecord', formatOptions={ 'patchDimensions': [256, 256], 'kernelSize': [32, 32], 'compressed': True, 'maxFileSize': 1e9, 'defaultValue': -999, 'tensorDepths': {'b1': 1, 'b2': 2}, 'sequenceData': True, 'collapseBands': True, 'maskedThreshold': 0.5, }, ) task = ee.batch.Export.image(ee.Image(1), 'TestDescription', config) expected_expression = ( ee.Image(1) .reproject('foo', crsTransform=[9.0, 8.0, 7.0, 6.0, 5.0, 4.0]) .clip(region) ) self.assertIsNone(task.id) self.assertIsNone(task.name) self.assertEqual('EXPORT_IMAGE', task.task_type) self.assertEqual('UNSUBMITTED', task.state) self.assertEqual( { 'expression': expected_expression, 'description': 'TestDescription', 'fileExportOptions': { 'fileFormat': 'TF_RECORD_IMAGE', 'driveDestination': {'filenamePrefix': 'TestDescription'}, 'tfRecordOptions': { 'tileDimensions': {'width': 256, 'height': 256}, 'marginDimensions': { 'width': 32, 'height': 32, }, 'compress': True, 'maxSizeBytes': {'value': '1000000000'}, 'defaultValue': -999, 'tensorDepths': {'b1': 1, 'b2': 2}, 'sequenceData': True, 'collapseBands': True, 'maxMaskedRatio': {'value': 0.5}, }, }, 'maxPixels': {'value': '10000000000'}, }, task.config, ) def test_export_image_to_asset_cloud_api(self): """Verifies the Asset export task created by Export.image.toAsset().""" with apitestcase.UsingCloudApi(): config = dict( image=ee.Image(1), assetId='users/foo/bar', pyramidingPolicy={'B1': 'min'}, ) expected_expression = ee.Image(1) # Test keyed parameters. task_keyed = ee.batch.Export.image.toAsset( image=config['image'], assetId=config['assetId'], pyramidingPolicy=config['pyramidingPolicy'], ) self.assertIsNone(task_keyed.id) self.assertIsNone(task_keyed.name) self.assertEqual('EXPORT_IMAGE', task_keyed.task_type) self.assertEqual('UNSUBMITTED', task_keyed.state) self.assertEqual( { 'expression': expected_expression, 'description': 'myExportImageTask', 'assetExportOptions': { 'earthEngineDestination': { 'name': ( 'projects/earthengine-legacy/assets/users/foo/bar' ), }, 'pyramidingPolicyOverrides': {'B1': 'MIN'}, }, }, task_keyed.config, ) task_ordered = ee.batch.Export.image.toAsset( config['image'], 'TestDescription', config['assetId'], maxPixels=1000, maxWorkers=100, shardSize=4, ) self.assertEqual('EXPORT_IMAGE', task_ordered.task_type) self.assertEqual('UNSUBMITTED', task_ordered.state) self.assertEqual( { 'expression': expected_expression, 'description': 'TestDescription', 'assetExportOptions': { 'earthEngineDestination': { 'name': ( 'projects/earthengine-legacy/assets/users/foo/bar' ), }, 'tileSize': {'value': 4}, }, 'maxPixels': {'value': '1000'}, 'maxWorkers': {'value': 100}, }, task_ordered.config, ) def test_export_image_to_asset_cloud_api_with_tile_size(self): """Verifies the Asset export task created by Export.image.toAsset().""" with apitestcase.UsingCloudApi(): config = dict( image=ee.Image(1), assetId='users/foo/bar', pyramidingPolicy={'B1': 'min'}, ) expected_expression = ee.Image(1) task_ordered = ee.batch.Export.image.toAsset( config['image'], 'TestDescription', config['assetId'], maxPixels=1000, maxWorkers=100, tileSize=4, ) self.assertEqual('EXPORT_IMAGE', task_ordered.task_type) self.assertEqual('UNSUBMITTED', task_ordered.state) self.assertEqual( { 'expression': expected_expression, 'description': 'TestDescription', 'assetExportOptions': { 'earthEngineDestination': { 'name': ( 'projects/earthengine-legacy/assets/users/foo/bar' ), }, 'tileSize': {'value': 4}, }, 'maxPixels': {'value': '1000'}, 'maxWorkers': {'value': 100}, }, task_ordered.config, ) def test_export_image_to_cloud_storage_cloud_api(self): """Verifies the Cloud Storage export task created by Export.image().""" with apitestcase.UsingCloudApi(): region = ee.Geometry.Rectangle(1, 2, 3, 4) config = dict( region=region['coordinates'], maxPixels=10**10, outputBucket='test-bucket', ) task = ee.batch.Export.image.toCloudStorage( ee.Image(1), 'TestDescription', config['outputBucket'], None, None, config['region'], None, None, None, config['maxPixels'], None, [512, 2048], True, ) expected_expression = ee.Image(1).clip(region) self.assertIsNone(task.id) self.assertIsNone(task.name) self.assertEqual('EXPORT_IMAGE', task.task_type) self.assertEqual('UNSUBMITTED', task.state) self.assertEqual( { 'expression': expected_expression, 'description': 'TestDescription', 'fileExportOptions': { 'fileFormat': 'GEO_TIFF', 'cloudStorageDestination': { 'bucket': 'test-bucket', 'filenamePrefix': 'TestDescription', }, 'geoTiffOptions': { 'tileDimensions': {'width': 512, 'height': 2048}, 'skipEmptyFiles': True, }, }, 'maxPixels': {'value': '10000000000'}, }, task.config, ) config = dict( region=region['coordinates'], maxPixels=10**10, outputBucket='test-bucket', priority=999, ) task_with_priority = ee.batch.Export.image.toCloudStorage( ee.Image(1), 'TestDescription', config['outputBucket'], None, None, config['region'], None, None, None, config['maxPixels'], None, [512, 2048], True, None, None, config['priority'], ) self.assertEqual( { 'expression': expected_expression, 'description': 'TestDescription', 'fileExportOptions': { 'fileFormat': 'GEO_TIFF', 'cloudStorageDestination': { 'bucket': 'test-bucket', 'filenamePrefix': 'TestDescription', }, 'geoTiffOptions': { 'tileDimensions': {'width': 512, 'height': 2048}, 'skipEmptyFiles': True, }, }, 'maxPixels': {'value': '10000000000'}, 'priority': {'value': 999}, }, task_with_priority.config, ) def test_export_image_to_google_drive_cloud_api(self): """Verifies the Drive destined task created by Export.image.toDrive().""" with apitestcase.UsingCloudApi(): region = ee.Geometry.Rectangle(1, 2, 3, 4) drive_task_by_keys = ee.batch.Export.image.toDrive( image=ee.Image(1), region=region['coordinates'], folder='foo', maxPixels=10**10, crs='foo', crsTransform='[9,8,7,6,5,4]', shardSize=512, ) expected_expression = ( ee.Image(1) .reproject('foo', crsTransform=[9.0, 8.0, 7.0, 6.0, 5.0, 4.0]) .clip(region) ) self.assertIsNone(drive_task_by_keys.id) self.assertIsNone(drive_task_by_keys.name) self.assertEqual('EXPORT_IMAGE', drive_task_by_keys.task_type) self.assertEqual('UNSUBMITTED', drive_task_by_keys.state) self.assertEqual( { 'expression': expected_expression, 'description': 'myExportImageTask', 'fileExportOptions': { 'fileFormat': 'GEO_TIFF', 'driveDestination': { 'folder': 'foo', 'filenamePrefix': 'myExportImageTask', }, 'geoTiffOptions': {'tileSize': {'value': 512}}, }, 'maxPixels': {'value': '10000000000'}, }, drive_task_by_keys.config, ) drive_task_with_old_keys = ee.batch.Export.image.toDrive( image=ee.Image(1), region=region['coordinates'], driveFolder='foo', driveFileNamePrefix='fooExport', maxPixels=10**10, crs='foo', crs_transform='[9,8,7,6,5,4]', ) self.assertIsNone(drive_task_with_old_keys.id) self.assertIsNone(drive_task_by_keys.name) self.assertEqual('EXPORT_IMAGE', drive_task_with_old_keys.task_type) self.assertEqual('UNSUBMITTED', drive_task_with_old_keys.state) self.assertEqual( { 'expression': expected_expression, 'description': 'myExportImageTask', 'fileExportOptions': { 'fileFormat': 'GEO_TIFF', 'driveDestination': { 'folder': 'foo', 'filenamePrefix': 'fooExport', }, }, 'maxPixels': {'value': '10000000000'}, }, drive_task_with_old_keys.config, ) with self.assertRaisesRegex( ee.EEException, 'Unknown configuration options.*' ): ee.batch.Export.image.toDrive(image=ee.Image(1), framesPerSecond=30) drive_task_with_priority = ee.batch.Export.image.toDrive( image=ee.Image(1), region=region['coordinates'], folder='foo', maxPixels=10**10, crs='foo', crsTransform='[9,8,7,6,5,4]', shardSize=512, priority=999, ) expected_expression = ( ee.Image(1) .reproject('foo', crsTransform=[9.0, 8.0, 7.0, 6.0, 5.0, 4.0]) .clip(region) ) self.assertEqual( { 'expression': expected_expression, 'description': 'myExportImageTask', 'fileExportOptions': { 'fileFormat': 'GEO_TIFF', 'driveDestination': { 'folder': 'foo', 'filenamePrefix': 'myExportImageTask', }, 'geoTiffOptions': {'tileSize': {'value': 512}}, }, 'maxPixels': {'value': '10000000000'}, 'priority': {'value': 999}, }, drive_task_with_priority.config, ) def test_export_map_to_cloud_storage_cloud_api(self): """Verifies the task created by Export.map.toCloudStorage().""" with apitestcase.UsingCloudApi(): config = dict( image=ee.Image(1), bucket='test-bucket', maxZoom=7, path='foo/gcs/path', maxWorkers=100, ) # Test keyed parameters. task_keyed = ee.batch.Export.map.toCloudStorage( image=config['image'], bucket=config['bucket'], maxZoom=config['maxZoom'], path=config['path'], maxWorkers=config['maxWorkers'], bucketCorsUris=['*'], ) expected_expression = ee.Image(1) self.assertIsNone(task_keyed.id) self.assertIsNone(task_keyed.name) self.assertEqual('EXPORT_TILES', task_keyed.task_type) self.assertEqual('UNSUBMITTED', task_keyed.state) self.assertEqual( { 'expression': expected_expression, 'description': 'myExportMapTask', 'tileOptions': { 'endZoom': config['maxZoom'], }, 'tileExportOptions': { 'fileFormat': 'AUTO_JPEG_PNG', 'cloudStorageDestination': { 'bucket': config['bucket'], 'filenamePrefix': config['path'], 'permissions': 'PUBLIC', 'bucketCorsUris': ['*'], }, }, 'maxWorkers': {'value': 100}, }, task_keyed.config, ) with self.assertRaisesRegex( ee.EEException, 'Unknown configuration options.*' ): config_with_bogus_option = config.copy() config_with_bogus_option['framesPerSecond'] = 30 ee.batch.Export.map.toCloudStorage(**config_with_bogus_option) # Test ordered parameters. task_ordered = ee.batch.Export.map.toCloudStorage( config['image'], 'TestDescription', config['bucket'], 'jpeg', None, False, None, 30, None, None, None, 'aFakeKey', maxWorkers=100, ) self.assertIsNone(task_ordered.id) self.assertIsNone(task_ordered.name) self.assertEqual('EXPORT_TILES', task_ordered.task_type) self.assertEqual('UNSUBMITTED', task_ordered.state) self.assertEqual( { 'expression': expected_expression, 'description': 'TestDescription', 'tileOptions': { 'scale': 30, 'mapsApiKey': 'aFakeKey', }, 'tileExportOptions': { 'fileFormat': 'JPEG', 'cloudStorageDestination': { 'bucket': config['bucket'], 'filenamePrefix': 'TestDescription', }, }, 'maxWorkers': {'value': 100}, }, task_ordered.config, ) config = dict( image=ee.Image(1), bucket='test-bucket', maxZoom=7, path='foo/gcs/path', maxWorkers=100, priority=999, ) task_with_priority = ee.batch.Export.map.toCloudStorage( image=config['image'], bucket=config['bucket'], maxZoom=config['maxZoom'], path=config['path'], maxWorkers=config['maxWorkers'], bucketCorsUris=['*'], priority=config['priority'], ) expected_expression = ee.Image(1) self.assertEqual( { 'expression': expected_expression, 'description': 'myExportMapTask', 'tileOptions': { 'endZoom': config['maxZoom'], }, 'tileExportOptions': { 'fileFormat': 'AUTO_JPEG_PNG', 'cloudStorageDestination': { 'bucket': config['bucket'], 'filenamePrefix': config['path'], 'permissions': 'PUBLIC', 'bucketCorsUris': ['*'], }, }, 'maxWorkers': {'value': 100}, 'priority': {'value': 999}, }, task_with_priority.config, ) def test_export_map_to_cloud_storage_cloud_api_with_v1_parameters(self): """Verifies Export.map.toCloudStorage() tasks with v1 parameters.""" with apitestcase.UsingCloudApi(): config = dict( image=ee.Image(1), bucket='test-bucket', minZoom=1, maxZoom=7, startZoom=2, # Takes precedence over minZoom. endZoom=8, # Takes precedence over maxZoom. path='foo/gcs/path', skipEmptyTiles=True, skipEmpty=False, # Takes precedence over skipEmpty. maxWorkers=100, ) # Test keyed parameters. task_keyed = ee.batch.Export.map.toCloudStorage( image=config['image'], bucket=config['bucket'], minZoom=config['minZoom'], maxZoom=config['maxZoom'], startZoom=config['startZoom'], endZoom=config['endZoom'], path=config['path'], maxWorkers=config['maxWorkers'], skipEmptyTiles=config['skipEmptyTiles'], skipEmpty=config['skipEmpty'], ) expected_expression = ee.Image(1) self.assertIsNone(task_keyed.id) self.assertIsNone(task_keyed.name) self.assertEqual('EXPORT_TILES', task_keyed.task_type) self.assertEqual('UNSUBMITTED', task_keyed.state) self.assertEqual( { 'expression': expected_expression, 'description': 'myExportMapTask', 'tileOptions': { 'startZoom': config['startZoom'], 'endZoom': config['endZoom'], 'skipEmpty': config['skipEmpty'], }, 'tileExportOptions': { 'fileFormat': 'AUTO_JPEG_PNG', 'cloudStorageDestination': { 'bucket': config['bucket'], 'filenamePrefix': config['path'], 'permissions': 'PUBLIC', }, }, 'maxWorkers': {'value': 100}, }, task_keyed.config, ) def test_export_map_to_cloud_storage_cloud_api_with_v1alpha_parameters(self): """Verifies Export.map.toCloudStorage() tasks with v1alpha parameters.""" with apitestcase.UsingCloudApi(): task_keyed = ee.batch.Export.map.toCloudStorage( image=ee.Image(0), bucket='test-bucket', minZoom=2, maxWorkers=1, maxZoom=3, skipEmptyTiles=True, ) self.assertEqual( { 'expression': ee.Image(0), 'description': 'myExportMapTask', 'tileOptions': {'startZoom': 2, 'endZoom': 3, 'skipEmpty': True}, 'tileExportOptions': { 'fileFormat': 'AUTO_JPEG_PNG', 'cloudStorageDestination': { 'bucket': 'test-bucket', 'filenamePrefix': 'myExportMapTask', 'permissions': 'PUBLIC', }, }, 'maxWorkers': {'value': 1}, }, task_keyed.config, ) def test_export_table_cloud_api(self): """Verifies the task created by Export.table().""" with apitestcase.UsingCloudApi(): task = ee.batch.Export.table( ee.FeatureCollection('drive test FC'), config={'maxWorkers': 100} ) self.assertIsNone(task.id) self.assertIsNone(task.name) self.assertEqual('EXPORT_FEATURES', task.task_type) self.assertEqual('UNSUBMITTED', task.state) self.assertEqual( { 'expression': ee.FeatureCollection('drive test FC'), 'description': 'myExportTableTask', 'fileExportOptions': { 'fileFormat': 'CSV', 'driveDestination': { 'filenamePrefix': 'myExportTableTask', }, }, 'maxWorkers': {'value': 100}, }, task.config, ) def test_export_table_cloud_api_bogus_parameter(self): """Verifies that bogus parameters are rejected.""" with apitestcase.UsingCloudApi(): with self.assertRaisesRegex( ee.EEException, 'Unknown configuration options.*' ): ee.batch.Export.table.toDrive( ee.FeatureCollection('drive test FC'), framesPerSecond=30 ) def test_export_table_selectors_cloud_api(self): """Verifies that table export accepts a list or tuple of selectors.""" with apitestcase.UsingCloudApi(): task = ee.batch.Export.table.toCloudStorage( collection=ee.FeatureCollection('foo'), selectors=['ab', 'bb', 'c'], outputBucket='foo', ) self.assertEqual(['ab', 'bb', 'c'], task.config['selectors']) task = ee.batch.Export.table.toCloudStorage( collection=ee.FeatureCollection('foo'), selectors=('x', 'y'), outputBucket='foo', ) self.assertEqual(['x', 'y'], task.config['selectors']) # Single string should work too. task = ee.batch.Export.table.toCloudStorage( collection=ee.FeatureCollection('foo'), selectors='ab,cd,ef', outputBucket='foo', ) self.assertEqual(['ab', 'cd', 'ef'], task.config['selectors']) def test_export_table_to_cloud_storage_cloud_api(self): """Verifies the Cloud Storage task created by Export.table().""" with apitestcase.UsingCloudApi(): task = ee.batch.Export.table.toCloudStorage( collection=ee.FeatureCollection('foo'), outputBucket='test-bucket', maxVertices=1e6, ) self.assertIsNone(task.id) self.assertIsNone(task.name) self.assertEqual('EXPORT_FEATURES', task.task_type) self.assertEqual('UNSUBMITTED', task.state) self.assertEqual( { 'expression': ee.FeatureCollection('foo'), 'description': 'myExportTableTask', 'fileExportOptions': { 'fileFormat': 'CSV', 'cloudStorageDestination': { 'bucket': 'test-bucket', 'filenamePrefix': 'myExportTableTask', }, }, 'maxVertices': {'value': int(1e6)}, }, task.config, ) task_with_priority = ee.batch.Export.table.toCloudStorage( collection=ee.FeatureCollection('foo'), outputBucket='test-bucket', maxVertices=1e6, priority=999, ) self.assertEqual( { 'expression': ee.FeatureCollection('foo'), 'description': 'myExportTableTask', 'fileExportOptions': { 'fileFormat': 'CSV', 'cloudStorageDestination': { 'bucket': 'test-bucket', 'filenamePrefix': 'myExportTableTask', }, }, 'maxVertices': {'value': int(1e6)}, 'priority': {'value': 999}, }, task_with_priority.config, ) def test_export_table_to_google_drive_cloud_api(self): """Verifies the Drive destined task created by Export.table.toDrive().""" with apitestcase.UsingCloudApi(): test_collection = ee.FeatureCollection('foo') test_description = 'TestDescription' test_file_name_prefix = 'fooDriveFileNamePrefix' test_format = 'KML' expected_config = { 'expression': test_collection, 'description': test_description, 'fileExportOptions': { 'fileFormat': test_format, 'driveDestination': { 'filenamePrefix': test_file_name_prefix, }, }, 'maxVertices': {'value': 0}, } # Ordered parameters task_ordered = ee.batch.Export.table.toDrive( test_collection, test_description, None, test_file_name_prefix, test_format, maxVertices=0, ) self.assertIsNone(task_ordered.id) self.assertIsNone(task_ordered.name) self.assertEqual('EXPORT_FEATURES', task_ordered.task_type) self.assertEqual('UNSUBMITTED', task_ordered.state) self.assertEqual(expected_config, task_ordered.config) # Updating expectations to test keyed parameters expected_config['description'] = 'myExportTableTask' expected_config['fileExportOptions']['fileFormat'] = 'CSV' expected_config['fileExportOptions']['driveDestination'][ 'folder' ] = 'fooFolder' # Test that deprecated parameters (driveFolder and driveFileNamePrefix) # still work. task_old_keys = ee.batch.Export.table.toDrive( collection=test_collection, driveFolder='fooFolder', driveFileNamePrefix='fooDriveFileNamePrefix', maxVertices=0, ) self.assertEqual('EXPORT_FEATURES', task_old_keys.task_type) self.assertEqual('UNSUBMITTED', task_old_keys.state) self.assertEqual(expected_config, task_old_keys.config) # Test that new parameters work task_new_keys = ee.batch.Export.table.toDrive( collection=test_collection, folder='fooFolder', fileNamePrefix='fooDriveFileNamePrefix', maxVertices=0, ) self.assertEqual('EXPORT_FEATURES', task_new_keys.task_type) self.assertEqual('UNSUBMITTED', task_new_keys.state) self.assertEqual(expected_config, task_new_keys.config) expected_config_with_priority = { 'expression': test_collection, 'description': test_description, 'fileExportOptions': { 'fileFormat': test_format, 'driveDestination': { 'filenamePrefix': test_file_name_prefix, }, }, 'maxVertices': {'value': 0}, 'priority': {'value': 999}, } task_with_priority = ee.batch.Export.table.toDrive( test_collection, test_description, None, test_file_name_prefix, test_format, maxVertices=0, priority=999, ) self.assertEqual(expected_config_with_priority, task_with_priority.config) def test_export_table_to_asset_cloud_api(self): """Verifies the export task created by Export.table.toAsset().""" with apitestcase.UsingCloudApi(): task = ee.batch.Export.table.toAsset( collection=ee.FeatureCollection('foo'), description='foo', assetId='users/foo/bar', ) self.assertIsNone(task.id) self.assertIsNone(task.name) self.assertEqual('EXPORT_FEATURES', task.task_type) self.assertEqual('UNSUBMITTED', task.state) self.assertEqual( { 'expression': ee.FeatureCollection('foo'), 'description': 'foo', 'assetExportOptions': { 'earthEngineDestination': { 'name': ( 'projects/earthengine-legacy/assets/users/foo/bar' ), } }, }, task.config, ) task_with_priority = ee.batch.Export.table.toAsset( collection=ee.FeatureCollection('foo'), description='foo', assetId='users/foo/bar', priority=999, ) self.assertEqual( { 'expression': ee.FeatureCollection('foo'), 'description': 'foo', 'assetExportOptions': { 'earthEngineDestination': { 'name': ( 'projects/earthengine-legacy/assets/users/foo/bar' ), } }, 'priority': {'value': 999}, }, task_with_priority.config, ) def test_export_table_with_file_format_cloud_api(self): """Verifies the task created by Export.table() given a file format.""" with apitestcase.UsingCloudApi(): task = ee.batch.Export.table.toCloudStorage( collection=ee.FeatureCollection('foo'), outputBucket='test-bucket', fileFormat='tfRecord', ) self.assertIsNone(task.id) self.assertIsNone(task.name) self.assertEqual('EXPORT_FEATURES', task.task_type) self.assertEqual('UNSUBMITTED', task.state) self.assertEqual( { 'expression': ee.FeatureCollection('foo'), 'description': 'myExportTableTask', 'fileExportOptions': { 'fileFormat': 'TF_RECORD_TABLE', 'cloudStorageDestination': { 'bucket': 'test-bucket', 'filenamePrefix': 'myExportTableTask', }, }, }, task.config, ) def test_export_table_to_feature_view_cloud_api(self): """Verifies the export task created by Export.table.toFeatureView().""" with apitestcase.UsingCloudApi(): task = ee.batch.Export.table.toFeatureView( collection=ee.FeatureCollection('foo'), description='foo', assetId='users/foo/bar', ingestionTimeParameters={ 'maxFeaturesPerTile': 10, 'zOrderRanking': [], }, ) self.assertIsNone(task.id) self.assertIsNone(task.name) self.assertEqual('EXPORT_FEATURES', task.task_type) self.assertEqual('UNSUBMITTED', task.state) self.assertEqual( { 'expression': ee.FeatureCollection('foo'), 'description': 'foo', 'featureViewExportOptions': { 'featureViewDestination': { 'name': ( 'projects/earthengine-legacy/assets/users/foo/bar' ), }, 'ingestionTimeParameters': { 'thinningOptions': {'maxFeaturesPerTile': 10}, }, }, }, task.config, ) task_with_priority = ee.batch.Export.table.toFeatureView( collection=ee.FeatureCollection('foo'), description='foo', assetId='users/foo/bar', ingestionTimeParameters={ 'maxFeaturesPerTile': 10, 'zOrderRanking': [], }, priority=999, ) self.assertEqual( { 'expression': ee.FeatureCollection('foo'), 'description': 'foo', 'featureViewExportOptions': { 'featureViewDestination': { 'name': ( 'projects/earthengine-legacy/assets/users/foo/bar' ), }, 'ingestionTimeParameters': { 'thinningOptions': {'maxFeaturesPerTile': 10}, }, }, 'priority': {'value': 999}, }, task_with_priority.config, ) def test_export_table_to_feature_view_empty_params_cloud_api(self): """Verifies the export task created by Export.table.toFeatureView().""" with apitestcase.UsingCloudApi(): task = ee.batch.Export.table.toFeatureView( collection=ee.FeatureCollection('foo'), description='foo', assetId='users/foo/bar', ) with self.subTest(name='TaskIdAndName'): self.assertIsNone(task.id) self.assertIsNone(task.name) with self.subTest(name='TypeIsExportFeatures'): self.assertEqual('EXPORT_FEATURES', task.task_type) with self.subTest(name='StateIsUnsubmitted'): self.assertEqual('UNSUBMITTED', task.state) with self.subTest(name='ConfigContents'): self.assertEqual( { 'expression': ee.FeatureCollection('foo'), 'description': 'foo', 'featureViewExportOptions': { 'featureViewDestination': { 'name': ( 'projects/earthengine-legacy/assets/users/foo/bar' ), }, 'ingestionTimeParameters': {}, }, }, task.config, ) def test_export_table_to_feature_view_all_ingestion_params(self): """Verifies the task ingestion params created by toFeatureView().""" task = ee.batch.Export.table.toFeatureView( collection=ee.FeatureCollection('foo'), description='foo', assetId='users/foo/bar', ingestionTimeParameters={ 'maxFeaturesPerTile': 10, 'thinningStrategy': 'GLOBALLY_CONSISTENT', 'thinningRanking': 'my-attribute ASC, other-attr DESC', 'zOrderRanking': ['.minZoomLevel DESC', '.geometryType ASC'], }, ) expected_ingestion_params = { 'rankingOptions': { 'thinningRankingRule': { 'rankByOneThingRule': [ { 'direction': 'ASCENDING', 'rankByAttributeRule': { 'attributeName': 'my-attribute' }, }, { 'direction': 'DESCENDING', 'rankByAttributeRule': {'attributeName': 'other-attr'}, }, ] }, 'zOrderRankingRule': { 'rankByOneThingRule': [ {'direction': 'DESCENDING', 'rankByMinZoomLevelRule': {}}, {'direction': 'ASCENDING', 'rankByGeometryTypeRule': {}}, ] }, }, 'thinningOptions': { 'maxFeaturesPerTile': 10, 'thinningStrategy': 'GLOBALLY_CONSISTENT', }, } self.assertEqual( expected_ingestion_params, task.config['featureViewExportOptions']['ingestionTimeParameters'], ) def test_export_table_to_feature_view_bad_rank_by_one_thing_rule(self): """Verifies a bad RankByOneThingRule throws an exception.""" with self.assertRaisesRegex( ee.EEException, 'Ranking rule format is invalid.*' ): ee.batch.Export.table.toFeatureView( collection=ee.FeatureCollection('foo'), assetId='users/foo/bar', ingestionTimeParameters={'thinningRanking': 'my-attribute BAD_DIR'}, ) def test_export_table_to_feature_view_bad_ranking_rule(self): """Verifies a bad RankingRule throws an exception.""" with self.assertRaisesRegex( ee.EEException, 'Unable to build ranking rule from rules.*' ): ee.batch.Export.table.toFeatureView( collection=ee.FeatureCollection('foo'), assetId='users/foo/bar', ingestionTimeParameters={'thinningRanking': {'key': 'val'}}, ) def test_export_table_to_feature_view_bad_ingestion_time_params(self): """Verifies a bad set of ingestion time params throws an exception.""" with self.assertRaisesRegex( ee.EEException, ( r'The following keys are unrecognized in the ' r'ingestion parameters: \[\'badThinningKey\'\]' ), ): ee.batch.Export.table.toFeatureView( collection=ee.FeatureCollection('foo'), assetId='users/foo/bar', ingestionTimeParameters={'badThinningKey': {'key': 'val'}}, ) def test_export_table_to_big_query_required_params(self): """Verifies the export task created by Export.table.toBigQuery().""" with apitestcase.UsingCloudApi(): task = ee.batch.Export.table.toBigQuery( collection=ee.FeatureCollection('foo'), table='project.dataset.table', description='foo', ) with self.subTest(name='TaskIdAndName'): self.assertIsNone(task.id) self.assertIsNone(task.name) with self.subTest(name='TypeIsExportFeatures'): self.assertEqual('EXPORT_FEATURES', task.task_type) with self.subTest(name='StateIsUnsubmitted'): self.assertEqual('UNSUBMITTED', task.state) with self.subTest(name='ConfigContents'): self.assertEqual( { 'expression': ee.FeatureCollection('foo'), 'description': 'foo', 'bigqueryExportOptions': { 'bigqueryDestination': { 'table': 'project.dataset.table', 'overwrite': False, 'append': False, } }, }, task.config, ) with self.subTest(name='PriorityIsSet'): task_with_priority = ee.batch.Export.table.toBigQuery( collection=ee.FeatureCollection('foo'), table='project.dataset.table', description='foo', priority=999, ) self.assertEqual( { 'expression': ee.FeatureCollection('foo'), 'description': 'foo', 'bigqueryExportOptions': { 'bigqueryDestination': { 'table': 'project.dataset.table', 'overwrite': False, 'append': False, } }, 'priority': {'value': 999}, }, task_with_priority.config, ) def test_export_table_to_big_query_all_params(self): """Verifies the export task created by Export.table.toBigQuery().""" with apitestcase.UsingCloudApi(): task = ee.batch.Export.table.toBigQuery( collection=ee.FeatureCollection('foo'), description='foo', table='project.dataset.table', overwrite=True, append=True, selectors=['.geo'], maxVertices=500, ) with self.subTest(name='TaskIdAndName'): self.assertIsNone(task.id) self.assertIsNone(task.name) with self.subTest(name='TypeIsExportFeatures'): self.assertEqual('EXPORT_FEATURES', task.task_type) with self.subTest(name='StateIsUnsubmitted'): self.assertEqual('UNSUBMITTED', task.state) with self.subTest(name='ConfigContents'): self.assertEqual( { 'expression': ee.FeatureCollection('foo'), 'description': 'foo', 'maxVertices': {'value': 500}, 'selectors': ['.geo'], 'bigqueryExportOptions': { 'bigqueryDestination': { 'table': 'project.dataset.table', 'overwrite': True, 'append': True, } }, }, task.config, ) def test_export_table_to_big_query_bad_table_name(self): """Verifies a bad table name throws an exception.""" with apitestcase.UsingCloudApi(): with self.assertRaisesRegex( ee.EEException, 'The BigQuery table reference must be a string of the form.*', ): ee.batch.Export.table.toBigQuery( collection=ee.FeatureCollection('foo'), table=['array.instead.of.string'], description='foo', ) with self.assertRaisesRegex( ee.EEException, 'The BigQuery table reference must be a string of the form.*', ): ee.batch.Export.table.toBigQuery( collection=ee.FeatureCollection('foo'), table='not.the-correct-format', description='foo', ) def test_export_video_cloud_api(self): """Verifies the task created by Export.video().""" with apitestcase.UsingCloudApi(): region = ee.Geometry.Rectangle(1, 2, 3, 4) config = dict( region=region['coordinates'], dimensions=16, framesPerSecond=30, maxFrames=10000, maxPixels=10000000, maxWorkers=100, ) collection = ee.ImageCollection([ee.Image(1), ee.Image(2)]) task = ee.batch.Export.video(collection, 'TestVideoName', config) self.assertIsNone(task.id) self.assertIsNone(task.name) self.assertEqual('EXPORT_VIDEO', task.task_type) self.assertEqual('UNSUBMITTED', task.state) # Defaults the destination to Drive. def expected_preparation_function(img): img = img.setDefaultProjection( crs='SR-ORG:6627', crsTransform=[1, 0, 0, 0, -1, 0] ) img = img.clipToBoundsAndScale(geometry=region, maxDimension=16) return img expected_collection = collection.map(expected_preparation_function) # Using map() breaks equality comparison on collections, so compare the # serialised forms instead. self.assertEqual( expected_collection.serialize(for_cloud_api=True), task.config.pop('expression').serialize(for_cloud_api=True), ) self.assertEqual( { 'description': 'TestVideoName', 'videoOptions': { 'framesPerSecond': 30, 'maxFrames': 10000, 'maxPixelsPerFrame': {'value': '10000000'}, }, 'fileExportOptions': { 'fileFormat': 'MP4', 'driveDestination': { 'filenamePrefix': 'TestVideoName', }, }, 'maxWorkers': {'value': 100}, }, task.config, ) config['outputBucket'] = 'test-bucket' gcs_task = ee.batch.Export.video(collection, 'TestVideoName', config) self.assertEqual('EXPORT_VIDEO', gcs_task.task_type) self.assertEqual('UNSUBMITTED', gcs_task.state) self.assertEqual( expected_collection.serialize(for_cloud_api=True), gcs_task.config.pop('expression').serialize(for_cloud_api=True), ) self.assertEqual( { 'description': 'TestVideoName', 'videoOptions': { 'framesPerSecond': 30, 'maxFrames': 10000, 'maxPixelsPerFrame': {'value': '10000000'}, }, 'fileExportOptions': { 'fileFormat': 'MP4', 'cloudStorageDestination': { 'bucket': 'test-bucket', 'filenamePrefix': 'TestVideoName', }, }, 'maxWorkers': {'value': 100}, }, gcs_task.config, ) with self.assertRaisesRegex( ee.EEException, 'Unknown configuration options.*' ): config_with_bogus_option = config.copy() config_with_bogus_option['flamesPerSleestak'] = 30 ee.batch.Export.video( collection, 'TestVideoName', config_with_bogus_option ) def test_export_video_to_cloud_storage_cloud_api(self): """Verifies the task created by Export.video.toCloudStorage().""" with apitestcase.UsingCloudApi(): region = ee.Geometry.Rectangle(1, 2, 3, 4) collection = ee.ImageCollection([ee.Image(1), ee.Image(2)]) def expected_preparation_function(img): img = img.reproject( crs='foo', crsTransform=[9.0, 8.0, 7.0, 6.0, 5.0, 4.0] ) img = img.clipToBoundsAndScale(geometry=region, maxDimension=16) return img expected_collection = collection.map(expected_preparation_function) expected_config = { 'description': 'TestVideoName', 'fileExportOptions': { 'fileFormat': 'MP4', 'cloudStorageDestination': { 'bucket': 'test-bucket', 'filenamePrefix': 'TestVideoName', }, }, } # Test keyed parameters. task_keyed = ee.batch.Export.video.toCloudStorage( collection=collection, description='TestVideoName', bucket='test-bucket', dimensions=16, region=region['coordinates'], crsTransform='[9,8,7,6,5,4]', crs='foo', ) self.assertIsNone(task_keyed.id) self.assertIsNone(task_keyed.name) self.assertEqual('EXPORT_VIDEO', task_keyed.task_type) self.assertEqual('UNSUBMITTED', task_keyed.state) self.assertEqual( expected_collection.serialize(for_cloud_api=True), task_keyed.config.pop('expression').serialize(for_cloud_api=True), ) self.assertEqual(expected_config, task_keyed.config) # Test ordered parameters. task_ordered = ee.batch.Export.video.toCloudStorage( collection, 'TestVideoName', 'test-bucket', None, None, 16, region['coordinates'], None, 'foo', '[9,8,7,6,5,4]', ) self.assertEqual('EXPORT_VIDEO', task_ordered.task_type) self.assertEqual('UNSUBMITTED', task_ordered.state) self.assertEqual( expected_collection.serialize(for_cloud_api=True), task_ordered.config.pop('expression').serialize(for_cloud_api=True), ) self.assertEqual(expected_config, task_ordered.config) expected_config_with_priority = { 'description': 'TestVideoName', 'fileExportOptions': { 'fileFormat': 'MP4', 'cloudStorageDestination': { 'bucket': 'test-bucket', 'filenamePrefix': 'TestVideoName', }, }, 'priority': {'value': 999}, } task_with_priority = ee.batch.Export.video.toCloudStorage( collection=collection, description='TestVideoName', bucket='test-bucket', dimensions=16, region=region['coordinates'], crsTransform='[9,8,7,6,5,4]', crs='foo', priority=999, ) self.assertEqual( expected_collection.serialize(for_cloud_api=True), task_with_priority.config.pop('expression').serialize( for_cloud_api=True ), ) self.assertEqual(expected_config_with_priority, task_with_priority.config) def test_export_video_to_drive_cloud_api(self): """Verifies the task created by Export.video.toDrive().""" with apitestcase.UsingCloudApi(): region = ee.Geometry.Rectangle(1, 2, 3, 4) collection = ee.ImageCollection([ee.Image(1), ee.Image(2)]) def expected_preparation_function(img): img = img.reproject( crs='SR-ORG:6627', crsTransform=[9.0, 8.0, 7.0, 6.0, 5.0, 4.0] ) img = img.clipToBoundsAndScale(geometry=region, maxDimension=16) return img expected_collection = collection.map(expected_preparation_function) expected_config = { 'description': 'TestVideoName', 'fileExportOptions': { 'fileFormat': 'MP4', 'driveDestination': { 'folder': 'test-folder', 'filenamePrefix': 'TestVideoName', }, }, } # Test keyed parameters. task_keyed = ee.batch.Export.video.toDrive( collection=collection, description='TestVideoName', folder='test-folder', dimensions=16, crsTransform='[9,8,7,6,5,4]', region=region['coordinates'], ) self.assertIsNone(task_keyed.id) self.assertIsNone(task_keyed.name) self.assertEqual('EXPORT_VIDEO', task_keyed.task_type) self.assertEqual('UNSUBMITTED', task_keyed.state) self.assertEqual( expected_collection.serialize(for_cloud_api=True), task_keyed.config.pop('expression').serialize(for_cloud_api=True), ) self.assertEqual(expected_config, task_keyed.config) # Test ordered parameters. task_ordered = ee.batch.Export.video.toDrive( collection, 'TestVideoName', 'test-folder', None, None, 16, region['coordinates'], None, 'SR-ORG:6627', '[9,8,7,6,5,4]', ) self.assertEqual('EXPORT_VIDEO', task_ordered.task_type) self.assertEqual('UNSUBMITTED', task_ordered.state) self.assertEqual( expected_collection.serialize(for_cloud_api=True), task_ordered.config.pop('expression').serialize(for_cloud_api=True), ) self.assertEqual(expected_config, task_ordered.config) expected_config_with_priority = { 'description': 'TestVideoName', 'fileExportOptions': { 'fileFormat': 'MP4', 'driveDestination': { 'folder': 'test-folder', 'filenamePrefix': 'TestVideoName', }, }, 'priority': {'value': 999}, } task_with_priority = ee.batch.Export.video.toDrive( collection=collection, description='TestVideoName', folder='test-folder', dimensions=16, crsTransform='[9,8,7,6,5,4]', region=region['coordinates'], priority=999, ) self.assertEqual( expected_collection.serialize(for_cloud_api=True), task_with_priority.config.pop('expression').serialize( for_cloud_api=True ), ) self.assertEqual(expected_config_with_priority, task_with_priority.config) def test_export_workload_tag(self): """Verifies that the workload tag state is captured before start.""" mock_cloud_api_resource = mock.MagicMock() mock_cloud_api_resource.projects().table().export().execute.return_value = { 'name': 'projects/earthengine-legacy/operations/foo', 'metadata': {}, } with apitestcase.UsingCloudApi(cloud_api_resource=mock_cloud_api_resource): ee.data.setWorkloadTag('test-export') task = ee.batch.Export.table(ee.FeatureCollection('foo'), 'bar') ee.data.setWorkloadTag('not-test-export') self.assertEqual('test-export', task.workload_tag) task.start() self.assertEqual('test-export', task.config['workloadTag']) export_args = mock_cloud_api_resource.projects().table().export.call_args self.assertEqual(export_args[1]['body']['workloadTag'], 'test-export') ee.data.resetWorkloadTag(True) with apitestcase.UsingCloudApi(cloud_api_resource=mock_cloud_api_resource): ee.data.setWorkloadTag('test-export') task = ee.batch.Export.table(ee.FeatureCollection('foo'), 'bar') self.assertEqual('test-export', task.workload_tag) task.config['workloadTag'] = 'not-test-export' task.start() # Overridden in config. self.assertEqual('not-test-export', task.config['workloadTag']) export_args = mock_cloud_api_resource.projects().table().export.call_args self.assertEqual(export_args[1]['body']['workloadTag'], 'not-test-export') ee.data.resetWorkloadTag(True) with apitestcase.UsingCloudApi(cloud_api_resource=mock_cloud_api_resource): task = ee.batch.Export.table(ee.FeatureCollection('foo'), 'bar') ee.data.setWorkloadTag('not-test-export') self.assertEqual('', task.workload_tag) task.start() # Not captured on start(). self.assertEqual('', task.config['workloadTag']) export_args = mock_cloud_api_resource.projects().table().export.call_args self.assertNotIn('workloadTag', export_args[1]['body']) ee.data.resetWorkloadTag(True) if __name__ == '__main__': unittest.main()