earthengine-api/python/cli/commands.py
2016-06-02 15:13:44 -07:00

818 lines
25 KiB
Python

#!/usr/bin/env python
"""Commands supported by the Earth Engine command line interface.
Each command is implemented by extending the Command class. Each class
defines the supported positional and optional arguments, as well as
the actions to be taken when the command is executed.
"""
import argparse
import calendar
from collections import Counter
import datetime
import json
import os
import re
import sys
import urlparse
import ee
import authenticate
import utils
# Constants used in ACLs. ALL_USERS is the pseudo-user name accepted on the
# command line by "acl ch"; the other three are keys of the ACL dict that is
# fetched with ee.data.getAssetAcl and written back with ee.data.setAssetAcl.
ALL_USERS = 'AllUsers'
ALL_USERS_CAN_READ = 'all_users_can_read'
READERS = 'readers'
WRITERS = 'writers'
# Constants used in setting metadata properties. The TYPE_* values are the
# accepted "(type)" names in "--property (type)name=value"; SYSTEM_TIME_* are
# the property names populated by --time_start and --time_end.
TYPE_DATE = 'date'
TYPE_NUMBER = 'number'
TYPE_STRING = 'string'
SYSTEM_TIME_START = 'system:time_start'
SYSTEM_TIME_END = 'system:time_end'
# A regex that parses properties of the form "[(type)]name=value". The
# second, third, and fourth groups are the type, name, and value,
# respectively; the first group is the optional "(type)" including its
# parentheses. The name is everything up to the first "=".
PROPERTY_RE = re.compile(r'(\(([^\)]*)\))?([^=]+)=(.*)')
# Translate internal task type identifiers to user-friendly strings that
# are consistent with the language in the API and docs.
TASK_TYPES = {
    'EXPORT_FEATURES': 'Export.table',
    'EXPORT_IMAGE': 'Export.image',
    'EXPORT_TILES': 'Export.map',
    'EXPORT_VIDEO': 'Export.video',
    'INGEST': 'Upload',
}
def _add_wait_arg(parser):
parser.add_argument(
'--wait', '-w', nargs='?', default=-1, type=int, const=sys.maxint,
help=('Wait for the task to finish,'
' or timeout after the specified number of seconds.'
' Without this flag, the command just starts an export'
' task in the background, and returns immediately.'))
def _upload(args, config, request):
  """Starts an ingestion task for `request`, optionally waiting for it.

  Args:
    args: Parsed command-line arguments; `args.wait` is the number of
      seconds to wait, or a negative value to return without waiting.
    config: The CLI configuration, used to initialize Earth Engine.
    request: The ingestion request handed to ee.data.startIngestion.

  Raises:
    ee.EEException: If a non-negative wait shorter than 10 seconds is given.
  """
  config.ee_init()
  wait_seconds = args.wait
  if 0 <= wait_seconds < 10:
    raise ee.EEException('Wait time should be at least 10 seconds.')
  new_task_id = ee.data.newTaskId()[0]
  ee.data.startIngestion(new_task_id, request)
  print('Started upload task with ID: %s' % new_task_id)
  if wait_seconds < 0:
    # No --wait flag: leave the task running in the background.
    return
  print('Waiting for the upload task to complete...')
  utils.wait_for_task(new_task_id, wait_seconds)
# Argument types
def _comma_separated_numbers(string):
"""Parses an input consisting of comma-separated numbers."""
error_msg = 'Argument should be a comma-separated list of numbers: {}'
values = string.split(',')
if not values:
raise argparse.ArgumentTypeError(error_msg.format(string))
numbervalues = []
for value in values:
try:
numbervalues.append(int(value))
except ValueError:
try:
numbervalues.append(float(value))
except ValueError:
raise argparse.ArgumentTypeError(error_msg.format(string))
return numbervalues
def _comma_separated_pyramiding_policies(string):
"""Parses an input consisting of comma-separated pyramiding policies."""
error_msg = ('Argument should be a comma-separated list of: '
'{{"mean", "sample", "min", "max", "mode"}}: {}')
values = string.split(',')
if not values:
raise argparse.ArgumentTypeError(error_msg.format(string))
redvalues = []
for value in values:
if value.lower() not in {'mean', 'sample', 'min', 'max', 'mode'}:
raise argparse.ArgumentTypeError(error_msg.format(string))
redvalues.append(value.lower())
return redvalues
def _decode_number(string):
"""Decodes a number from a command line argument."""
try:
return float(string)
except ValueError:
raise argparse.ArgumentTypeError(
'Invalid value for property of type "number": "%s".' % string)
def _timestamp_ms_for_datetime(datetime_obj):
"""Returns time since the epoch in ms for the given UTC datetime object."""
return (
int(calendar.timegm(datetime_obj.timetuple()) * 1000) +
datetime_obj.microsecond / 1000)
def _decode_date(string):
"""Decodes a date from a command line argument, as msec since the epoch."""
try:
return int(string)
except ValueError:
date_formats = ['%Y-%m-%d',
'%Y-%m-%dT%H:%M:%S',
'%Y-%m-%dT%H:%M:%S.%f']
for date_format in date_formats:
try:
dt = datetime.datetime.strptime(string, date_format)
return _timestamp_ms_for_datetime(dt)
except ValueError:
continue
raise argparse.ArgumentTypeError(
'Invalid value for property of type "date": "%s".' % string)
def _decode_property(string):
  """Decodes a "[(type)]name=value" command-line argument into a pair.

  Args:
    string: The raw argument. The optional type may be "string", "number",
      or "date". When it is omitted, the value is decoded as a number if
      possible and kept as a string otherwise.

  Returns:
    A (name, value) tuple.

  Raises:
    argparse.ArgumentTypeError: If the argument is malformed, names an
      unrecognized type, or has a value that does not fit its type.
  """
  m = PROPERTY_RE.match(string)
  if not m:
    # %-format the message here; passing `string` as a second constructor
    # argument (as before) left the "%s" placeholder unfilled.
    raise argparse.ArgumentTypeError(
        'Invalid property: "%s". Must have the form "name=value" or '
        '"(type)name=value".' % string)
  _, type_str, name, value_str = m.groups()
  if type_str is None:
    # Guess numeric types automatically.
    try:
      value = _decode_number(value_str)
    except argparse.ArgumentTypeError:
      value = value_str
  elif type_str == TYPE_DATE:
    value = _decode_date(value_str)
  elif type_str == TYPE_NUMBER:
    value = _decode_number(value_str)
  elif type_str == TYPE_STRING:
    value = value_str
  else:
    # Only exact type names are accepted above, so don't advertise prefixes.
    raise argparse.ArgumentTypeError(
        'Unrecognized property type name: "%s". Expected one of "string", '
        '"number", or "date".' % type_str)
  return (name, value)
def _add_property_flags(parser):
  """Registers --property, --time_start and --time_end on a parser.

  Args:
    parser: The argparse parser to add the metadata-property flags to.
  """
  parser.add_argument(
      '--property', '-p',
      action='append',
      type=_decode_property,
      help='A property to set, in the form [(type)]name=value. If no type '
      'is specified the type will be "number" if the value is numeric and '
      '"string" otherwise. May be provided multiple times.')
  time_flags = (
      ('--time_start', '-ts',
       'Sets the start time property to a number or date.'),
      ('--time_end', '-te',
       'Sets the end time property to a number or date.'),
  )
  for long_flag, short_flag, help_text in time_flags:
    parser.add_argument(long_flag, short_flag, help=help_text,
                        type=_decode_date)
def _decode_property_flags(args):
"""Decodes metadata properties from args as a list of (name,value) pairs."""
property_list = list(args.property or [])
if args.time_start:
property_list.append((SYSTEM_TIME_START, args.time_start))
if args.time_end:
property_list.append((SYSTEM_TIME_END, args.time_end))
names = [name for name, _ in property_list]
duplicates = [name for name, count in Counter(names).items() if count > 1]
if duplicates:
raise ee.EEException('Duplicate property name(s): %s.' % duplicates)
return dict(property_list)
def _check_valid_files(filenames):
"""Returns true if the given filenames are valid upload file URIs."""
for filename in filenames:
if not filename.startswith('gs://'):
raise ee.EEException('Invalid Cloud Storage URL: ' + filename)
def _pretty_print_json(json_obj):
"""Pretty-prints a JSON object to stdandard output."""
print json.dumps(json_obj, sort_keys=True, indent=2, separators=(',', ': '))
class Dispatcher(object):
  """Dispatches to a set of commands implemented as command classes."""

  # Subclasses provide `name` (used to derive the argparse dest) and
  # COMMANDS (the command classes to register as subcommands).

  def __init__(self, parser):
    """Registers one subparser per class listed in self.COMMANDS."""
    self.command_dict = {}
    self.dest = '%s_cmd' % self.name
    subparsers = parser.add_subparsers(title='Commands', dest=self.dest)
    for command_cls in self.COMMANDS:
      doc = command_cls.__doc__
      # The docstring's first line is the short help; the whole docstring
      # is the subcommand's description.
      subparser = subparsers.add_parser(
          command_cls.name, description=doc, help=doc.splitlines()[0])
      self.command_dict[command_cls.name] = command_cls(subparser)

  def run(self, args, config):
    """Runs the subcommand that was selected on the command line."""
    selected = vars(args)[self.dest]
    self.command_dict[selected].run(args, config)
class AuthenticateCommand(object):
  """Prompts the user to authorize access to Earth Engine via OAuth2."""

  name = 'authenticate'

  def __init__(self, unused_parser):
    # This command takes no command-line arguments.
    pass

  def run(self, unused_args, unused_config):
    """Delegates to the authenticate module's OAuth2 flow."""
    authenticate.Authenticate.authenticate()
class AclChCommand(object):
  """Changes the access control list for an asset.
  Each change specifies the email address of a user or group and,
  for additions, one of R or W corresponding to the read or write
  permissions to be granted, as in "user@domain.com:R". Use the
  special name "AllUsers" to change whether all users can read the
  asset.
  """

  name = 'ch'

  def __init__(self, parser):
    """Registers the -u (grant), -d (revoke) and asset_id arguments."""
    parser.add_argument('-u', action='append', metavar='permission',
                        help='Add or modify a user\'s permission.')
    parser.add_argument('-d', action='append', metavar='user',
                        help='Remove all permissions for a user.')
    parser.add_argument('asset_id', help='ID of the asset.')

  def run(self, args, config):
    """Applies the requested permission changes to the asset's ACL."""
    config.ee_init()
    permissions = self._parse_permissions(args)
    acl = ee.data.getAssetAcl(args.asset_id)
    self._apply_permissions(acl, permissions)
    # The original permissions will contain an 'owners' stanza, but EE
    # does not currently allow setting the owner ACL so we have to
    # remove it even though it has not changed. pop() (rather than del)
    # also tolerates an ACL without the stanza instead of raising KeyError.
    acl.pop('owners', None)
    ee.data.setAssetAcl(args.asset_id, json.dumps(acl))

  def _parse_permissions(self, args):
    """Decodes and sanity-checks the permissions in the arguments.

    Args:
      args: Parsed arguments. `args.u` is a list of "user:R" / "user:W"
        grants and `args.d` a list of users to remove; either may be None.

    Returns:
      A dict mapping each user to 'R', 'W', or 'D' (remove).

    Raises:
      ee.EEException: On a malformed grant, a user named more than once,
        or an attempt to grant write access to AllUsers.
    """
    permissions = {}
    if args.u:
      for grant in args.u:
        parts = grant.split(':')
        if len(parts) != 2 or parts[1] not in ['R', 'W']:
          raise ee.EEException('Invalid permission "%s".' % grant)
        user, role = parts
        if user in permissions:
          raise ee.EEException('Multiple permission settings for "%s".' % user)
        if user == ALL_USERS and role == 'W':
          raise ee.EEException('Cannot grant write permissions to AllUsers.')
        permissions[user] = role
    if args.d:
      for user in args.d:
        if user in permissions:
          raise ee.EEException('Multiple permission settings for "%s".' % user)
        permissions[user] = 'D'
    return permissions

  def _apply_permissions(self, acl, permissions):
    """Applies the given permission edits to the given acl in place.

    Args:
      acl: The ACL dict, with READERS and WRITERS lists and the
        ALL_USERS_CAN_READ flag.
      permissions: A dict from user to 'R', 'W', or 'D', as returned by
        _parse_permissions.
    """
    # items() works on both Python 2 and 3; iteritems() is Python 2 only.
    for user, role in permissions.items():
      if user == ALL_USERS:
        # AllUsers is a boolean flag, not a list entry; 'W' was rejected
        # during parsing, so role here is 'R' (grant) or 'D' (revoke).
        acl[ALL_USERS_CAN_READ] = (role == 'R')
      elif role == 'R':
        if user not in acl[READERS]:
          acl[READERS].append(user)
        if user in acl[WRITERS]:
          acl[WRITERS].remove(user)
      elif role == 'W':
        if user in acl[READERS]:
          acl[READERS].remove(user)
        if user not in acl[WRITERS]:
          acl[WRITERS].append(user)
      elif role == 'D':
        if user in acl[READERS]:
          acl[READERS].remove(user)
        if user in acl[WRITERS]:
          acl[WRITERS].remove(user)
class AclGetCommand(object):
  """Prints the access control list for an asset."""

  name = 'get'

  def __init__(self, parser):
    """Registers the asset_id positional argument."""
    parser.add_argument('asset_id', help='ID of the asset.')

  def run(self, args, config):
    """Fetches the asset's ACL and pretty-prints it as JSON."""
    config.ee_init()
    _pretty_print_json(ee.data.getAssetAcl(args.asset_id))
class AclSetCommand(object):
  """Sets the access control list for an asset.
  The ACL may be the name of a canned ACL, or it may be the path to a
  file containing the output from "acl get". The recognized canned ACL
  names are "private", indicating that no users other than the owner
  have access, and "public", indicating that all users have read
  access. It is currently not possible to modify the owner ACL using
  this tool.
  """

  name = 'set'

  # Canned ACLs selectable by name instead of a file path.
  CANNED_ACLS = {
      'private': {
          READERS: [],
          WRITERS: [],
          ALL_USERS_CAN_READ: False,
      },
      'public': {
          READERS: [],
          WRITERS: [],
          ALL_USERS_CAN_READ: True,
      },
  }

  def __init__(self, parser):
    """Registers the file_or_acl_name and asset_id positional arguments."""
    parser.add_argument('file_or_acl_name',
                        help='File path or canned ACL name.')
    parser.add_argument('asset_id', help='ID of the asset.')

  def run(self, args, config):
    """Sets asset ACL to a canned ACL or one provided in a JSON file."""
    config.ee_init()
    if args.file_or_acl_name in self.CANNED_ACLS:
      acl = self.CANNED_ACLS[args.file_or_acl_name]
    else:
      # Use a context manager so the file handle is closed promptly
      # instead of being leaked until garbage collection.
      with open(args.file_or_acl_name) as acl_file:
        acl = json.load(acl_file)
      # In the expected usage the ACL file will have come from a previous
      # invocation of 'acl get', which means it will include an 'owners'
      # stanza, but EE does not currently allow setting the owner ACL,
      # so we have to remove it. (Canned ACLs never contain one.)
      if 'owners' in acl:
        print('Warning: Not updating the owner ACL.')
        del acl['owners']
    ee.data.setAssetAcl(args.asset_id, json.dumps(acl))
class AclCommand(Dispatcher):
  """Prints or updates the access control list of the specified asset."""

  name = 'acl'

  # Registered as the subcommands "acl ch", "acl get", and "acl set".
  COMMANDS = [AclChCommand, AclGetCommand, AclSetCommand]
class AssetInfoCommand(object):
  """Prints metadata and other information about an Earth Engine asset."""

  name = 'info'

  def __init__(self, parser):
    """Registers the asset_id positional argument."""
    parser.add_argument('asset_id', help='ID of the asset to print.')

  def run(self, args, config):
    """Prints the asset's info as JSON, or fails when it is unavailable."""
    config.ee_init()
    info = ee.data.getInfo(args.asset_id)
    if not info:
      raise ee.EEException(
          'Asset does not exist or is not accessible: %s' % args.asset_id)
    _pretty_print_json(info)
class AssetSetCommand(object):
  """Sets metadata properties of an Earth Engine asset.
  Properties may be of type "string", "number", or "date". Dates must
  be specified in the form YYYY-MM-DD[Thh:mm:ss[.ff]] in UTC and are
  stored as numbers representing the number of milliseconds since the
  Unix epoch (00:00:00 UTC on 1 January 1970).
  """

  name = 'set'

  def __init__(self, parser):
    """Registers asset_id plus the shared metadata-property flags."""
    parser.add_argument('asset_id', help='ID of the asset to update.')
    _add_property_flags(parser)

  def run(self, args, config):
    """Decodes the requested properties and writes them to the asset."""
    # Decode first so malformed flags fail before Earth Engine is touched.
    properties = _decode_property_flags(args)
    config.ee_init()
    if properties:
      ee.data.setAssetProperties(args.asset_id, properties)
    else:
      raise ee.EEException('No properties specified.')
class AssetCommand(Dispatcher):
  """Prints or updates metadata associated with an Earth Engine asset."""

  name = 'asset'

  # Registered as the subcommands "asset info" and "asset set".
  COMMANDS = [AssetInfoCommand, AssetSetCommand]
class CopyCommand(object):
  """Creates a new Earth Engine asset as a copy of another asset."""

  name = 'cp'

  def __init__(self, parser):
    """Registers the source and destination positional arguments."""
    positionals = (
        ('source', 'Full path of the source asset.'),
        ('destination', 'Full path of the destination asset.'),
    )
    for arg_name, arg_help in positionals:
      parser.add_argument(arg_name, help=arg_help)

  def run(self, args, config):
    """Copies args.source to args.destination."""
    config.ee_init()
    ee.data.copyAsset(args.source, args.destination)
class CreateCommandBase(object):
  """Base class for implementing Create subcommands."""

  def __init__(self, parser, fragment, asset_type):
    """Registers the shared create arguments.

    Args:
      parser: The argparse subparser for the concrete command.
      fragment: Noun phrase used in help text, e.g. "a folder".
      asset_type: The asset type passed to ee.data.create_assets.
    """
    self.asset_type = asset_type
    parser.add_argument(
        'asset_id', nargs='+',
        help='Full path of %s to create.' % fragment)
    parser.add_argument(
        '--parents', '-p', action='store_true',
        help='Make parent folders as needed.')

  def run(self, args, config):
    """Creates every requested asset of this command's asset type."""
    config.ee_init()
    ee.data.create_assets(args.asset_id, self.asset_type, args.parents)
class CreateCollectionCommand(CreateCommandBase):
  """Creates one or more image collections."""

  name = 'collection'

  def __init__(self, parser):
    super(CreateCollectionCommand, self).__init__(
        parser,
        fragment='an image collection',
        asset_type=ee.data.ASSET_TYPE_IMAGE_COLL)
class CreateFolderCommand(CreateCommandBase):
  """Creates one or more folders."""

  name = 'folder'

  def __init__(self, parser):
    super(CreateFolderCommand, self).__init__(
        parser,
        fragment='a folder',
        asset_type=ee.data.ASSET_TYPE_FOLDER)
class CreateCommand(Dispatcher):
  """Creates assets and folders."""

  name = 'create'

  # Registered as the subcommands "create collection" and "create folder".
  COMMANDS = [CreateCollectionCommand, CreateFolderCommand]
class ListCommand(object):
  """Prints the contents of a folder or collection."""

  name = 'ls'

  def __init__(self, parser):
    """Registers the asset_id, -l and --max_items arguments."""
    parser.add_argument(
        'asset_id', nargs='*',
        help='A folder or image collection to be inspected.')
    parser.add_argument(
        '-l', action='store_true',
        help='Print output in long format.')
    parser.add_argument(
        '--max_items', '-m', default=-1, type=int,
        help='Maximum number of items to list for each collection.')

  def run(self, args, config):
    """Lists the asset roots, or the children of each requested asset."""
    config.ee_init()
    assets = args.asset_id
    if not assets:
      self._print_assets(ee.data.getAssetRoots(), '', args.l)
      return
    for position, asset in enumerate(assets):
      if position:
        # Separate consecutive listings with a blank line.
        print('')
      self._list_asset_content(asset, args.max_items, len(assets), args.l)

  def _print_assets(self, assets, indent, long_format):
    """Prints one line per asset dict (with 'type' and 'id' keys)."""
    if not assets:
      return
    type_column_width = max(len(asset['type']) for asset in assets) + 4
    format_str = '%s{:%ds}{:s}' % (indent, type_column_width)
    for asset in assets:
      if not long_format:
        print(asset['id'])
        continue
      # Example output:
      # [Image]            user/test/my_img
      # [ImageCollection]  user/test/my_coll
      print(format_str.format('[' + asset['type'] + ']', asset['id']))

  def _list_asset_content(self, asset, max_items, total_assets, long_format):
    """Prints the children of `asset`; errors are printed, not raised."""
    request = {'id': asset}
    if max_items >= 0:
      request['num'] = max_items
    try:
      children = ee.data.getList(request)
    except ee.EEException as e:
      print(e)
      return
    indent = ''
    if total_assets > 1:
      # Several assets were requested: label and indent each listing.
      print('%s:' % asset)
      indent = ' '
    self._print_assets(children, indent, long_format)
class MoveCommand(object):
  """Moves or renames an Earth Engine asset."""

  name = 'mv'

  def __init__(self, parser):
    """Registers the source and destination positional arguments."""
    positionals = (
        ('source', 'Full path of the source asset.'),
        ('destination', 'Full path of the destination asset.'),
    )
    for arg_name, arg_help in positionals:
      parser.add_argument(arg_name, help=arg_help)

  def run(self, args, config):
    """Renames args.source to args.destination."""
    config.ee_init()
    ee.data.renameAsset(args.source, args.destination)
class RmCommand(object):
  """Deletes the specified assets."""

  name = 'rm'

  def __init__(self, parser):
    """Registers asset_id plus the --recursive/--dry_run/--verbose flags."""
    parser.add_argument(
        'asset_id', nargs='+', help='Full path of an asset to delete.')
    parser.add_argument(
        '--recursive', '-r', action='store_true',
        help='Recursively delete child assets.')
    parser.add_argument(
        '--dry_run', action='store_true',
        help=('Perform a dry run of the delete operation. Does not '
              'delete any assets.'))
    parser.add_argument(
        '--verbose', '-v', action='store_true',
        help='Print the progress of the operation to the console.')

  def run(self, args, config):
    """Deletes each asset named on the command line."""
    config.ee_init()
    for asset in args.asset_id:
      self._delete_asset(asset, args.recursive, args.verbose, args.dry_run)

  def _delete_asset(self, asset_id, recursive, verbose, dry_run):
    """Attempts to delete the specified asset or asset collection.

    When `recursive` is set and the asset is a folder or image collection,
    its children are deleted first (depth-first). Missing assets and
    delete failures are reported on stdout rather than raised.
    """
    info = ee.data.getInfo(asset_id)
    if info is None:
      print('Asset does not exist or is not accessible: %s' % asset_id)
      return
    container_types = (ee.data.ASSET_TYPE_FOLDER,
                       ee.data.ASSET_TYPE_IMAGE_COLL)
    if recursive and info['type'] in container_types:
      for child in ee.data.getList({'id': asset_id}):
        self._delete_asset(child['id'], True, verbose, dry_run)
    if dry_run:
      print('[dry-run] Deleting asset: %s' % asset_id)
      return
    if verbose:
      print('Deleting asset: %s' % asset_id)
    try:
      ee.data.deleteAsset(asset_id)
    except ee.EEException as e:
      print('Failed to delete %s. %s' % (asset_id, e))
class TaskCancelCommand(object):
  """Cancels a running task."""

  name = 'cancel'

  def __init__(self, parser):
    """Registers the task_id positional arguments."""
    parser.add_argument('task_id', nargs='*', help='ID of a task to cancel.')

  def run(self, args, config):
    """Cancels each READY/RUNNING task and reports the state of the rest.

    Raises:
      ee.EEException: When a task ID is unknown; processing stops there.
    """
    config.ee_init()
    for status in ee.data.getTaskStatus(args.task_id):
      state, task_id = status['state'], status['id']
      if state == 'UNKNOWN':
        raise ee.EEException('Unknown task id "%s"' % task_id)
      if state in ('READY', 'RUNNING'):
        print('Canceling task "%s"' % task_id)
        ee.data.cancelTask(task_id)
      else:
        print('Task "%s" already in state "%s".' % (task_id, state))
class TaskInfoCommand(object):
  """Prints information about a task."""

  name = 'info'

  def __init__(self, parser):
    """Registers the task_id positional arguments."""
    parser.add_argument('task_id', nargs='*', help='ID of a task to get.')

  def run(self, args, config):
    """Prints a status summary for each requested task."""
    config.ee_init()
    statuses = ee.data.getTaskStatus(args.task_id)
    for index, status in enumerate(statuses):
      if index > 0:
        print('')
      print('%s:' % status['id'])
      print(' State: %s' % status['state'])
      if status['state'] == 'UNKNOWN':
        # Nothing more is known about an unrecognized task ID.
        continue
      print(' Type: %s' % TASK_TYPES.get(status.get('task_type'), 'Unknown'))
      print(' Description: %s' % status.get('description'))
      print(' Created: %s' % self._format_time(status['creation_timestamp_ms']))
      optional_times = (('start_timestamp_ms', ' Started: %s'),
                        ('update_timestamp_ms', ' Updated: %s'))
      for key, template in optional_times:
        if key in status:
          print(template % self._format_time(status[key]))
      if 'error_message' in status:
        print(' Error: %s' % status['error_message'])

  def _format_time(self, millis):
    """Converts milliseconds since the epoch to a local-time datetime."""
    seconds = millis / 1000
    return datetime.datetime.fromtimestamp(seconds)
class TaskListCommand(object):
  """Lists the tasks submitted recently."""

  name = 'list'

  def __init__(self, unused_parser):
    pass

  def run(self, unused_args, config):
    """Prints one row per task: ID, type, description, state, and error."""
    config.ee_init()
    tasks = ee.data.getTaskList()
    # Truncate each description once and reuse it for both the column
    # width and the printed rows.
    descs = [utils.truncate(task.get('description', ''), 40) for task in tasks]
    # max() of an empty sequence raises ValueError, so fall back to a zero
    # width when there are no tasks.
    desc_length = max([len(desc) for desc in descs] or [0])
    format_str = '{:25s} {:13s} {:%ds} {:10s} {:s}' % (desc_length + 1)
    for task, truncated_desc in zip(tasks, descs):
      # Use .get like "task info" does, so a missing type shows 'Unknown'.
      task_type = TASK_TYPES.get(task.get('task_type'), 'Unknown')
      print(format_str.format(
          task['id'], task_type, truncated_desc,
          task['state'], task.get('error_message', '---')))
class TaskCommand(Dispatcher):
  """Prints information about or manages long-running tasks."""

  name = 'task'

  # Registered as the subcommands "task cancel", "task info", "task list".
  COMMANDS = [TaskCancelCommand, TaskInfoCommand, TaskListCommand]
# TODO(user): in both upload tasks, check if the parent namespace
# exists and is writeable first.
class UploadImageCommand(object):
  """Uploads an image from Cloud Storage to Earth Engine.
  See docs for "asset set" for additional details on how to specify asset
  metadata properties.
  """

  name = 'image'

  def __init__(self, parser):
    """Registers the upload flags and the positional source files."""
    _add_wait_arg(parser)
    parser.add_argument(
        'src_files',
        nargs='+',
        help=('Cloud Storage URL(s) of the file(s) to upload. '
              "Must have the prefix 'gs://'."))
    parser.add_argument(
        '--asset_id',
        help='Destination asset ID for the uploaded file.')
    parser.add_argument(
        '--last_band_alpha',
        action='store_true',
        help=('Use the last band as a masking channel for all bands. '
              'Mutually exclusive with nodata_value.'))
    parser.add_argument(
        '--nodata_value',
        type=_comma_separated_numbers,
        help=('Value for missing data. '
              'Mutually exclusive with last_band_alpha.'))
    parser.add_argument(
        '--pyramiding_policy',
        type=_comma_separated_pyramiding_policies,
        help='The pyramid reduction policy to use')
    _add_property_flags(parser)
    # TODO(user): add --bands arg

  def run(self, args, config):
    """Starts the upload task, and waits for completion if requested."""
    _check_valid_files(args.src_files)
    if args.last_band_alpha and args.nodata_value:
      raise ValueError(
          'last_band_alpha and nodata_value are mutually exclusive.')
    request = {
        'id': args.asset_id,
        'properties': _decode_property_flags(args),
    }
    tileset = {'sources': [{'primaryPath': path} for path in args.src_files]}
    if args.last_band_alpha:
      # Map the file's last band (index -1) to the mask for every band.
      tileset['bandMappings'] = [{'fileBandIndex': -1, 'maskForAllBands': True}]
    request['tilesets'] = [tileset]
    if args.pyramiding_policy:
      self._add_pyramiding_policy(request, args.pyramiding_policy)
    if args.nodata_value:
      self._add_nodata_values(
          request, args.nodata_value, args.pyramiding_policy)
    _upload(args, config, request)

  @staticmethod
  def _add_pyramiding_policy(request, policies):
    """Records pyramiding policies: one for the image, or one per band."""
    if len(policies) == 1:
      request['pyramidingPolicy'] = policies[0].upper()
      return
    request['bands'] = [
        {'id': index, 'pyramidingPolicy': policy.upper()}
        for index, policy in enumerate(policies)]

  @staticmethod
  def _add_nodata_values(request, nodata_values, policies):
    """Records missing-data values: one for the image, or one per band.

    Per-band values are merged into band entries already created for
    per-band pyramiding policies, whose count must then match.

    Raises:
      ValueError: If the per-band counts of the two options differ.
    """
    if len(nodata_values) == 1:
      request['missingData'] = {'value': nodata_values[0]}
      return
    if 'bands' in request:
      if len(request['bands']) != len(nodata_values):
        raise ValueError('Inconsistent number of bands: {} and {}'
                         .format(policies, nodata_values))
    else:
      request['bands'] = []
    bands = request['bands']
    for index, nodata in enumerate(nodata_values):
      missing = {'value': nodata}
      if index < len(bands):
        bands[index]['missingData'] = missing
      else:
        bands.append({'id': index, 'missingData': missing})
class UploadCommand(Dispatcher):
  """Uploads assets to Earth Engine."""

  name = 'upload'

  # Registered as the subcommand "upload image".
  COMMANDS = [UploadImageCommand]