ee_utils module
EEUtils
¶
This class provides utility functions to handle Earth Engine API information and make authenticated requests.
Source code in aces/ee_utils.py
class EEUtils:
"""
EEUtils: Earth Engine Utility Class
This class provides utility functions to handle Earth Engine API information and make authenticated requests.
"""
@staticmethod
def get_credentials_by_service_account_key(key):
    """
    Helper function to retrieve credentials using a service account key.

    Parameters:
        key (str): The path to the service account key JSON file.

    Returns:
        ee.ServiceAccountCredentials: Credentials built from the
        "client_email" entry of the key file and the key file path.

    Raises:
        OSError: If the key file cannot be opened.
        ValueError: If the key file does not contain valid JSON.
        KeyError: If the key file has no "client_email" entry.
    """
    import json
    # Read the key through a context manager so the file handle is closed
    # deterministically instead of being left to the garbage collector.
    with open(key, encoding="utf-8") as key_file:
        service_account = json.load(key_file)
    credentials = ee.ServiceAccountCredentials(service_account["client_email"], key)
    return credentials
@staticmethod
def initialize_session(use_highvolume : bool = False, key : Union[str, None] = None, project: str = None):
    """
    Start an Earth Engine session.

    The call to ``ee.Initialize`` is assembled from the three options:
    service account credentials are passed positionally when ``key`` is
    given, the high-volume endpoint URL is passed when ``use_highvolume``
    is true, and the project ID is passed when ``project`` is truthy.
    Using a project is recommended.

    Parameters:
        use_highvolume (bool): Whether to use the high-volume Earth Engine API.
        key (str or None): The path to the service account key JSON file.
            If None, the default credentials will be used.
        project (str): The Google Cloud project ID to use for the session.
    """
    positional = []
    if key is not None:
        positional.append(EEUtils.get_credentials_by_service_account_key(key))

    options = {}
    if use_highvolume:
        options["opt_url"] = "https://earthengine-highvolume.googleapis.com"
    if project:
        options["project"] = project

    ee.Initialize(*positional, **options)
@staticmethod
def calculate_avg_min_max_statistics(image: ee.Image, geometry: ee.FeatureCollection, scale: int = 30) -> ee.Dictionary:
    """
    Compute the mean, minimum and maximum of an image over a region.

    Parameters:
        image (ee.Image): The image to calculate statistics on.
        geometry (ee.FeatureCollection): The region to calculate statistics over.
        scale (int, optional): The scale, in meters, of the projection to
            compute statistics in. Default is 30.

    Returns:
        ee.Dictionary: The result of ``reduceRegion`` using the combined
        mean/min/max reducer.
    """
    # All three reducers share the same inputs, so each band is reduced
    # three ways in a single pass.
    mean_and_min = ee.Reducer.mean().combine(reducer2=ee.Reducer.min(), sharedInputs=True)
    mean_min_max = mean_and_min.combine(reducer2=ee.Reducer.max(), sharedInputs=True)
    return image.reduceRegion(
        reducer=mean_min_max,
        geometry=geometry,
        scale=scale,
        maxPixels=1E13,
    )
@staticmethod
def export_collection_data(collection: ee.FeatureCollection, export_type: Union[list, str]="cloud", start_training=True, **params) -> None:
    """
    Export a feature collection to one or more destinations.

    Parameters:
        collection (ee.FeatureCollection): The collection to export.
        export_type (str or list): One of "cloud", "asset" or "drive", or a
            list of them. Default is "cloud".
        start_training (bool): Forwarded to the destination-specific exporter.
        **params: Forwarded unchanged to the destination-specific exporter.

    Raises:
        NotImplementedError: If any requested export type is not supported.
            The check runs before any export is dispatched, so an invalid
            request does not leave some exports already started.
    """
    exporters = {
        "cloud": EEUtils._export_collection_to_cloud_storage,
        "asset": EEUtils._export_collection_to_asset,
        "drive": EEUtils._export_collection_to_drive,
    }
    supported = list(exporters)

    if isinstance(export_type, str):
        export_type = [export_type]

    unsupported = [_type for _type in export_type if _type not in supported]
    if unsupported:
        # List the supported names; the requested name is reported separately.
        raise NotImplementedError(
            f"Unsupported export type(s): {', '.join(map(str, unsupported))}. "
            f"Currently supported export types are: {', '.join(supported)}"
        )

    for _type in export_type:
        exporters[_type](collection, start_training, **params)
@staticmethod
def _export_collection_to_drive(collection, start_training, **kwargs) -> None:
    """
    Create a table export task that writes the collection to Google Drive.

    Parameters:
        collection: The feature collection to export.
        start_training: When truthy, the created task is started.
        **kwargs: Optional settings read by this function:
            description (default "myExportTableTask"),
            folder (default "myFolder"),
            file_prefix (default "myExportTableTask"),
            file_format (default "CSV"),
            selectors (default: property names of the first feature).
    """
    print("Exporting training data to Google Drive..")
    # Only query the server for property names when the caller gave no
    # selectors; getInfo() is a blocking request and was previously issued
    # even when its result was discarded.
    if "selectors" in kwargs:
        selectors = kwargs["selectors"]
    else:
        selectors = collection.first().propertyNames().getInfo()
    training_task = ee.batch.Export.table.toDrive(
        collection=collection,
        description=kwargs.get("description", "myExportTableTask"),
        folder=kwargs.get("folder", "myFolder"),
        fileNamePrefix=kwargs.get("file_prefix", "myExportTableTask"),
        fileFormat=kwargs.get("file_format", "CSV"),
        selectors=selectors,
    )
    if start_training:
        training_task.start()
@staticmethod
def _export_collection_to_asset(collection, start_training, **kwargs) -> None:
    """
    Create a table export task that writes the collection to an Earth Engine asset.

    Parameters:
        collection: The feature collection to export.
        start_training: When truthy, the created task is started.
        **kwargs: Optional settings read by this function:
            asset_id (default "myAssetId"),
            description (default "myExportTableTask"),
            selectors (default: property names of the first feature).
    """
    asset_id = kwargs.get("asset_id", "myAssetId")
    print(f"Exporting training data to {asset_id}..")
    # Only query the server for property names when the caller gave no
    # selectors; getInfo() is a blocking request and was previously issued
    # even when its result was discarded.
    if "selectors" in kwargs:
        selectors = kwargs["selectors"]
    else:
        selectors = collection.first().propertyNames().getInfo()
    training_task = ee.batch.Export.table.toAsset(
        collection=collection,
        description=kwargs.get("description", "myExportTableTask"),
        assetId=asset_id,
        selectors=selectors,
    )
    if start_training:
        training_task.start()
@staticmethod
def _export_collection_to_cloud_storage(collection, start_training, **kwargs) -> None:
    """
    Create a table export task that writes the collection to Cloud Storage.

    Parameters:
        collection: The feature collection to export.
        start_training: When truthy, the created task is started.
        **kwargs: Optional settings read by this function:
            description (default "myExportTableTask"),
            bucket (default "myBucket"),
            file_prefix (defaults to the description when missing or None),
            file_format (default "TFRecord"),
            selectors (default: property names of the first feature).
    """
    description = kwargs.get("description", "myExportTableTask")
    bucket = kwargs.get("bucket", "myBucket")
    file_prefix = kwargs.get("file_prefix")
    if file_prefix is None:
        file_prefix = description
    print(f"Exporting training data to gs://{bucket}/{file_prefix}..")
    # Only query the server for property names when the caller gave no
    # selectors; getInfo() is a blocking request and was previously issued
    # even when its result was discarded.
    if "selectors" in kwargs:
        selectors = kwargs["selectors"]
    else:
        selectors = collection.first().propertyNames().getInfo()
    training_task = ee.batch.Export.table.toCloudStorage(
        collection=collection,
        description=description,
        fileNamePrefix=file_prefix,
        bucket=bucket,
        fileFormat=kwargs.get("file_format", "TFRecord"),
        selectors=selectors,
    )
    if start_training:
        training_task.start()
@staticmethod
def beam_export_collection_to_cloud_storage(collection_index, start_training, **kwargs) -> None:
    """
    Export one indexed feature collection to Cloud Storage.

    The imports and the session initialization happen inside the function
    body, so each call sets up its own high-volume Earth Engine session.

    Parameters:
        collection_index: A two-item sequence. Item 0 is wrapped in
            ``ee.FeatureCollection``; item 1 is the index appended to the
            file prefix and the task description.
        start_training: When truthy, the created task is started.
        **kwargs: Optional settings read by this function:
            description (default "myExportTableTask"),
            bucket (default "myBucket"),
            file_prefix (defaults to the description when missing or None),
            file_format (default "TFRecord"),
            selectors (default: property names of the first feature).
    """
    from aces.ee_utils import EEUtils
    import ee
    EEUtils.initialize_session(use_highvolume=True)

    collection = ee.FeatureCollection(collection_index[0])
    index = collection_index[1]

    description = kwargs.get("description", "myExportTableTask")
    bucket = kwargs.get("bucket", "myBucket")
    file_prefix = kwargs.get("file_prefix")
    if file_prefix is None:
        file_prefix = description
    file_prefix = f"{file_prefix}_{index}"
    print(f"Exporting training data to gs://{bucket}/{file_prefix}..")

    # Only query the server for property names when the caller gave no
    # selectors; getInfo() is a blocking request and was previously issued
    # even when its result was discarded.
    if "selectors" in kwargs:
        selectors = kwargs["selectors"]
    else:
        selectors = collection.first().propertyNames().getInfo()

    training_task = ee.batch.Export.table.toCloudStorage(
        collection=collection,
        description=f"{description}__index_{index}",
        fileNamePrefix=file_prefix,
        bucket=bucket,
        fileFormat=kwargs.get("file_format", "TFRecord"),
        selectors=selectors,
    )
    if start_training:
        training_task.start()
@staticmethod
def export_image(image: ee.Image, export_type: str="asset", start_training=True, **params) -> None:
    """
    Export an image to one or more destinations.

    Parameters:
        image (ee.Image): The image to export.
        export_type (str): "cloud" or "asset". A list of these names is
            also accepted. Default is "asset".
        start_training (bool): Forwarded to the destination-specific exporter.
        **params: Forwarded unchanged to the destination-specific exporter.

    Raises:
        NotImplementedError: If any requested export type is not supported.
            The check runs before any export is dispatched, so an invalid
            request does not leave some exports already started.
    """
    exporters = {
        "cloud": EEUtils._export_image_to_cloud_storage,
        "asset": EEUtils._export_image_to_asset,
    }
    supported = list(exporters)

    if isinstance(export_type, str):
        export_type = [export_type]

    unsupported = [_type for _type in export_type if _type not in supported]
    if unsupported:
        # List the supported names; the requested name is reported separately.
        raise NotImplementedError(
            f"Unsupported export type(s): {', '.join(map(str, unsupported))}. "
            f"Currently supported export types are: {', '.join(supported)}"
        )

    for _type in export_type:
        exporters[_type](image, start_training, **params)
@staticmethod
def _export_image_to_cloud_storage(image, start_training, **kwargs) -> None:
    """
    Create an image export task that writes the image to Cloud Storage.

    Parameters:
        image: The image to export.
        start_training: When truthy, the created task is started.
        **kwargs: Optional snake_case settings read by this function:
            description (default "myExportImageTask"),
            bucket (default "myBucket"),
            file_name_prefix (defaults to the description when missing or None),
            file_format (default "GeoTIFF"),
            format_options (default None),
            region (ee.FeatureCollection or ee.Geometry, default None),
            scale (default 1000),
            max_pixels (default 1e10).
            Any other key triggers a warning and is not passed on.

    Raises:
        ValueError: If region is given and is neither an
            ee.FeatureCollection nor an ee.Geometry.
    """
    description = kwargs.get("description", "myExportImageTask")
    bucket = kwargs.get("bucket", "myBucket")
    file_name_prefix = kwargs.get("file_name_prefix")
    if file_name_prefix is None:
        file_name_prefix = description

    region = kwargs.get("region", None)
    if region is not None:
        if isinstance(region, ee.FeatureCollection):
            # The export takes a geometry, so collapse the collection to one.
            region = region.geometry()
        elif not isinstance(region, ee.Geometry):
            raise ValueError(f"region must be an ee.FeatureCollection or ee.Geometry object. Found {type(region)}")

    print(f"Exporting image to gs://{bucket}/{file_name_prefix}..")

    params = {
        "image": image,
        "description": description,
        "fileNamePrefix": file_name_prefix,
        "bucket": bucket,
        "fileFormat": kwargs.get("file_format", "GeoTIFF"),
        "formatOptions": kwargs.get("format_options", None),
        "region": region,
        "scale": kwargs.get("scale", 1000),
        "maxPixels": kwargs.get("max_pixels", 1e10),
    }

    # Callers pass snake_case names, so compare their keys against the
    # snake_case form of the export parameters. "image" is positional here
    # and is never expected in kwargs.
    keys = Utils.convert_camel_to_snake(list(params.keys()))
    keys.remove("image")
    for key in kwargs:
        if key not in keys:
            warnings.warn(
                f"Parameter {key} is not a recognized export option and will be ignored. "
                "Double check your parameter name (camelCase vs snake_case)"
            )

    # Drop unset options so the export call falls back to its own defaults.
    not_none_params = {k: v for k, v in params.items() if v is not None}
    training_task = ee.batch.Export.image.toCloudStorage(**not_none_params)
    if start_training:
        training_task.start()
@staticmethod
def _export_image_to_asset(image, start_training, **kwargs) -> None:
    """
    Create an image export task that writes the image to an Earth Engine asset.

    Parameters:
        image: The image to export.
        start_training: When truthy, the created task is started.
        **kwargs: Optional settings read by this function:
            asset_id (default ""),
            description (default "myExportImageTask"),
            region (default None),
            scale (default 30),
            max_pixels (default 1E13).
    """
    asset_id = kwargs.get("asset_id", "")
    print(f"Exporting image to {asset_id}..")

    task_options = {
        "image": image,
        "description": kwargs.get("description", "myExportImageTask"),
        "assetId": asset_id,
        "region": kwargs.get("region", None),
        "scale": kwargs.get("scale", 30),
        "maxPixels": kwargs.get("max_pixels", 1E13),
    }
    training_task = ee.batch.Export.image.toAsset(**task_options)

    if start_training:
        training_task.start()
@staticmethod
def country_bbox(country_name, max_error=100):
    """Return the bounding box geometry of a country.

    The country is looked up by its "country_na" property in the
    "USDOS/LSIB_SIMPLE/2017" feature collection.

    args:
        country_name (str): US-recognized country name
        max_error (float,optional): The maximum amount of error tolerated when
            performing any necessary reprojection. default = 100

    returns:
        ee.Geometry: geometry of country bounding box
    """
    boundaries = ee.FeatureCollection("USDOS/LSIB_SIMPLE/2017")
    by_name = ee.Filter.eq("country_na", country_name)
    country_geometry = boundaries.filter(by_name).geometry(max_error)
    return country_geometry.bounds(max_error)
@staticmethod
def get_image_collection_statistics(image_collection: ee.ImageCollection) -> ee.Image:
    """
    Reduce an image collection to a single image of summary statistics.

    The combined reducer computes mean, min, max, standard deviation and the
    25th/50th/75th percentiles (output names "Q1", "Q2", "Q3"), all over
    shared inputs.

    Parameters:
        image_collection (ee.ImageCollection): The collection to reduce.

    Returns:
        ee.Image: The reduced image, cast to float.
    """
    additional = (
        ee.Reducer.min(),
        ee.Reducer.max(),
        ee.Reducer.stdDev(),
        ee.Reducer.percentile([25, 50, 75], ["Q1", "Q2", "Q3"]),
    )
    combined = ee.Reducer.mean()
    for extra in additional:
        combined = combined.combine(reducer2=extra, sharedInputs=True)
    return image_collection.reduce(reducer=combined).float()
@staticmethod
def calculate_planet_indices(image: ee.Image) -> ee.Image:
    """
    Compute spectral indices from an image with bands "N", "R", "G" and "B".

    Parameters:
        image (ee.Image): Image providing the "N", "R", "G" and "B" bands.

    Returns:
        ee.Image: Float image with bands NDVI, EVI, NDWI, SAVI, MSAVI2,
        MTVI2, VARI and TGI, in that order.
    """
    # Select each input band once and reuse it across the expressions.
    nir = image.select("N")
    red = image.select("R")
    green = image.select("G")
    blue = image.select("B")

    ndvi = image.normalizedDifference(["N", "R"]).rename("NDVI")
    ndwi = image.normalizedDifference(["G", "N"]).rename("NDWI")

    evi = image.expression(
        "2.5 * ((NIR - RED) / (NIR + 6 * RED - 7.5 * BLUE + 1))",
        {"NIR": nir, "RED": red, "BLUE": blue},
    ).rename("EVI")

    savi = image.expression(
        "((NIR - RED) / (NIR + RED + 0.5))*(1.5)",
        {"NIR": nir, "RED": red},
    ).rename("SAVI")

    # This expression names the red band "R", unlike the others.
    msavi2 = image.expression(
        "(( (2*NIR + 1) - sqrt( ((2*NIR + 1) * (2*NIR + 1)) - 8 * (NIR - R) ) )) / 2",
        {"NIR": nir, "R": red},
    ).rename("MSAVI2")

    mtvi2 = image.expression(
        "( 1.5*(1.2*(NIR - GREEN) - 2.5*(RED - GREEN)) ) / ( sqrt( ((2*NIR + 1) * (2*NIR + 1)) - (6*NIR - 5*sqrt(RED)) - 0.5 ) )",
        {"NIR": nir, "RED": red, "GREEN": green},
    ).rename("MTVI2")

    vari = image.expression(
        "(GREEN - RED) / (GREEN + RED - BLUE)",
        {"GREEN": green, "RED": red, "BLUE": blue},
    ).rename("VARI")

    tgi = image.expression(
        "( (120*(RED - BLUE)) - (190*(RED - GREEN)) ) / 2",
        {"GREEN": green, "RED": red, "BLUE": blue},
    ).rename("TGI")

    derived = [evi, ndwi, savi, msavi2, mtvi2, vari, tgi]
    return ndvi.addBands(derived).float()
@staticmethod
def calculate_evi(image: ee.Image) -> ee.Image:
    """
    Compute the EVI band from an image with "nir", "red" and "blue" bands.

    Parameters:
        image (ee.Image): Image providing the "nir", "red" and "blue" bands.

    Returns:
        ee.Image: Single-band float image named "EVI".
    """
    formula = "2.5 * ((NIR - RED) / (NIR + 6 * RED - 7.5 * BLUE + 1))"
    bands = {
        "NIR": image.select("nir"),
        "RED": image.select("red"),
        "BLUE": image.select("blue"),
    }
    return image.expression(formula, bands).rename("EVI").float()
@staticmethod
def calculate_s1_indices(image: ee.Image) -> ee.Image:
    """
    Derive ratio bands from the "VV" and "VH" bands of an image.

    Parameters:
        image (ee.Image): Image providing the "VV" and "VH" bands.

    Returns:
        ee.Image: Float image with bands "vv", "vh", "s1_ratio" (vv / vh)
        and "s1_ndratio" ((vv - vh) / (vv + vh)).
    """
    vv = image.select("VV").rename("vv")
    vh = image.select("VH").rename("vh")

    simple_ratio = vv.divide(vh).rename("s1_ratio")
    difference = vv.subtract(vh)
    total = vv.add(vh)
    normalized_ratio = difference.divide(total).rename("s1_ndratio")

    stacked = vv.addBands([vh, simple_ratio, normalized_ratio])
    return stacked.float()
@staticmethod
def generate_stratified_samples(image: ee.Image, region: ee.Geometry, numPoints: int = 500, classBand: str = None, scale: int=30, **kwargs) -> ee.FeatureCollection:
    """
    Draw a stratified sample from an image and attach point geometries.

    A pixel longitude/latitude band pair is added before sampling, and each
    sampled feature then gets a point geometry built from its "longitude"
    and "latitude" properties.

    Parameters:
        image (ee.Image): The image to sample.
        region (ee.Geometry): The region to sample within.
        numPoints (int): Passed as ``numPoints`` to ``stratifiedSample``. Default is 500.
        classBand (str): The class band name; "label" is used when falsy.
        scale (int): Passed as ``scale`` to ``stratifiedSample``. Default is 30.
        **kwargs: Optional settings read by this function:
            seed (default ``Config.SEED``),
            class_values (default None),
            class_points (default None), e.g. [2000, 600, 600, 600].

    Returns:
        ee.FeatureCollection: The sampled features with point geometries.
    """
    def _attach_point(feature):
        location = ee.Geometry.Point([feature.get("longitude"), feature.get("latitude")])
        return feature.setGeometry(location)

    image_with_lonlat = image.addBands(ee.Image.pixelLonLat())
    sampled = image_with_lonlat.stratifiedSample(
        numPoints=numPoints,
        classBand=classBand or "label",
        scale=scale,
        region=region,
        seed=kwargs.get("seed", Config.SEED),
        classValues=kwargs.get("class_values", None),
        classPoints=kwargs.get("class_points", None),
    )
    return sampled.map(_attach_point)
@staticmethod
def sample_image_by_collection(image: ee.Image, collection: ee.FeatureCollection, **kwargs: dict) -> ee.FeatureCollection:
    """
    Sample an image at the features of a collection.

    Parameters:
        image (ee.Image): The image to sample.
        collection (ee.FeatureCollection): The features to sample at.
        **kwargs: Optional settings read by this function:
            properties (default: property names of the first feature),
            scale (default None),
            geometries (default False),
            tile_scale (default 1).

    Returns:
        ee.FeatureCollection: The result of ``sampleRegions``.
    """
    # Only query the server for property names when the caller gave no
    # properties; getInfo() is a blocking request and was previously issued
    # even when its result was discarded.
    if "properties" in kwargs:
        properties = kwargs["properties"]
    else:
        properties = collection.first().propertyNames().getInfo()
    samples = image.sampleRegions(
        collection=collection,
        properties=properties,
        scale=kwargs.get("scale", None),
        geometries=kwargs.get("geometries", False),
        tileScale=kwargs.get("tile_scale", 1),
    )
    return samples
@staticmethod
def sample_image(image: ee.Image, region: ee.FeatureCollection, **kwargs: dict) -> ee.FeatureCollection:
    """
    Sample an image within a region.

    Parameters:
        image (ee.Image): The image to sample.
        region (ee.FeatureCollection): The region to sample within.
        **kwargs: Optional settings read by this function:
            SCALE or scale (the upper-case key wins when truthy),
            SEED or seed (the upper-case key wins when truthy),
            geometries (default False).

    Returns:
        ee.FeatureCollection: The result of ``image.sample``.
    """
    sample_scale = kwargs.get("SCALE") or kwargs.get("scale")
    sample_seed = kwargs.get("SEED") or kwargs.get("seed")
    keep_geometries = kwargs.get("geometries", False)
    return image.sample(
        region=region,
        scale=sample_scale,
        seed=sample_seed,
        geometries=keep_geometries,
    )
@staticmethod
def beam_yield_sample_points_with_index(index, sample_locations: ee.List, use_service_account: bool = False) -> List:
    """
    Fetch the coordinates of one sample location, together with its index.

    The imports and the session initialization happen inside the function
    body, so each call sets up its own high-volume Earth Engine session.

    Parameters:
        index: Position of the feature within ``sample_locations``.
        sample_locations (ee.List): List whose items are wrapped in ``ee.Feature``.
        use_service_account (bool): When true, ``Config.EE_SERVICE_CREDENTIALS``
            is used as the service account key.

    Returns:
        The "coordinates" entry of the feature geometry, and ``index``.
    """
    from aces.ee_utils import EEUtils
    from aces.config import Config
    import ee

    service_key = Config.EE_SERVICE_CREDENTIALS if use_service_account else None
    EEUtils.initialize_session(use_highvolume=True, key=service_key)

    last_index = sample_locations.size().getInfo() - 1
    print(f"Yielding Index: {index} of {last_index}")

    feature = ee.Feature(sample_locations.get(index))
    geometry = feature.geometry().getInfo()
    return geometry["coordinates"], index
@staticmethod
def beam_yield_sample_points(index, sample_locations: ee.List, use_service_account: bool = False) -> List:
    """
    Fetch the coordinates of one sample location.

    The imports and the session initialization happen inside the function
    body, so each call sets up its own high-volume Earth Engine session.

    Parameters:
        index: Position of the feature within ``sample_locations``.
        sample_locations (ee.List): List whose items are wrapped in ``ee.Feature``.
        use_service_account (bool): When true, ``Config.EE_SERVICE_CREDENTIALS``
            is used as the service account key.

    Returns:
        The "coordinates" entry of the feature geometry.
    """
    from aces.ee_utils import EEUtils
    from aces.config import Config
    import ee

    service_key = Config.EE_SERVICE_CREDENTIALS if use_service_account else None
    EEUtils.initialize_session(use_highvolume=True, key=service_key)

    last_index = sample_locations.size().getInfo() - 1
    print(f"Yielding Index: {index} of {last_index}")

    feature = ee.Feature(sample_locations.get(index))
    geometry = feature.geometry().getInfo()
    return geometry["coordinates"]
@staticmethod
def beam_sample_neighbourhood(coords_index, image, config: Union[Config, str] = "config.env", use_service_account: bool = False):
    """
    Sample a square pixel neighbourhood of an image around one point.

    The imports and the session initialization happen inside the function
    body, so each call sets up its own high-volume Earth Engine session.

    Parameters:
        coords_index: A sequence whose item 0 is the point coordinates and
            item 1 is the index returned alongside the sample.
        image: The image to sample; converted with ``neighborhoodToArray``.
        config (Config or str): A Config instance, or a string passed to
            ``Config(...)``. Default is "config.env".
        use_service_account (bool): When true, ``config.EE_SERVICE_CREDENTIALS``
            is used as the service account key.

    Returns:
        The sampled feature collection, and the index.

    Raises:
        ValueError: If config is neither a Config nor a str.
    """
    from aces.ee_utils import EEUtils
    from aces.config import Config
    import ee

    if isinstance(config, str):
        config = Config(config)
    elif not isinstance(config, Config):
        raise ValueError("config must be of type Config or str")

    service_key = config.EE_SERVICE_CREDENTIALS if use_service_account else None
    EEUtils.initialize_session(use_highvolume=True, key=service_key)

    coords, index = coords_index[0], coords_index[1]

    # Square kernel of ones with side config.PATCH_SHAPE_SINGLE.
    side = config.PATCH_SHAPE_SINGLE
    row_of_ones = ee.List.repeat(1, side)
    weights = ee.List.repeat(row_of_ones, side)
    image_kernel = ee.Kernel.fixed(side, side, weights)

    neighborhood = image.neighborhoodToArray(image_kernel)

    training_data = neighborhood.sample(
        region=ee.Geometry.Point(coords),
        scale=config.SCALE,
        tileScale=16,
        geometries=False,
    )
    return training_data, index
@staticmethod
def beam_get_training_patches(coords: List[float], image: ee.Image, bands: List[str] = [],
                              scale: int = 5, patch_size: int = 128, use_service_account: bool = False) -> np.ndarray:
    """
    Get a training patch centered on the coordinates.

    Initializes a high-volume Earth Engine session, builds a square region
    around the point and downloads the requested bands as a NumPy array via
    a download URL.

    Parameters:
        coords (List[float]): Coordinates passed to ``ee.Geometry.Point``.
        image (ee.Image): The image to read pixels from.
        bands (List[str]): Band names requested in the download. Default is
            an empty list; it is only read here, never mutated.
        scale (int): Multiplied by ``patch_size / 2`` to get the buffer
            distance around the point (presumably meters - TODO confirm).
            Default is 5.
        patch_size (int): Width and height of the patch in pixels. Default is 128.
        use_service_account (bool): When true, ``Config.EE_SERVICE_CREDENTIALS``
            is used as the service account key.

    Returns:
        np.ndarray: The array loaded from the downloaded NPY payload.
    """
    from aces.ee_utils import EEUtils
    from aces.config import Config
    import ee
    EEUtils.initialize_session(use_highvolume=True, key=Config.EE_SERVICE_CREDENTIALS if use_service_account else None)
    from google.api_core import exceptions, retry
    import requests
    import numpy as np
    from typing import List
    import io
    # NOTE(review): a `timeout=10*60` form of this decorator was left
    # commented out in the original; `deadline=` is the active argument.
    # @retry.Retry(timeout=10*60) # seconds
    @retry.Retry(deadline=10*60) # seconds
    def get_patch(image: ee.Image, region: ee.Geometry, bands: List[str], patch_size: int) -> np.ndarray:
        """Get the patch of pixels in the geometry as a Numpy array."""
        # Create the URL to download the band values of the patch of pixels.
        url = image.getDownloadURL({
            "region": region,
            "dimensions": [patch_size, patch_size],
            "format": "NPY",
            "bands": bands,
        })
        # Download the pixel data. If we get "429: Too Many Requests" errors,
        # it's safe to retry the request.
        response = requests.get(url)
        if response.status_code == 429:
            # The retry.Retry library only works with `google.api_core` exceptions.
            raise exceptions.TooManyRequests(response.text)
        if response.status_code == 503:
            # Same translation for "503: Service Unavailable".
            raise exceptions.ServiceUnavailable(response.text)
        # Still raise any other exceptions to make sure we got valid data.
        response.raise_for_status()
        # Load the NumPy file data and return it as a NumPy array.
        # NOTE(review): allow_pickle=True lets np.load unpickle objects from
        # downloaded bytes - confirm it is required for this payload.
        return np.load(io.BytesIO(response.content), allow_pickle=True)
    # NOTE(review): compute_pixel is defined but never called in this
    # function; only get_patch is used for the returned value.
    # @retry.Retry(timeout=10*60) # seconds
    @retry.Retry(deadline=10*60) # seconds
    def compute_pixel(image: ee.Image, region: ee.Geometry, bands: List[str], patch_size: int, scale_x: float, scale_y: float) -> np.ndarray:
        """Get the patch of pixels as a Numpy array via ``ee.data.computePixels``.

        The ``region`` argument is not used in the body; the grid is placed
        with ``coords`` from the enclosing function.
        """
        # Make a request object.
        request = {
            "expression": image,
            "fileFormat": "NPY",
            "bandIds": bands,
            "grid": {
                "dimensions": {
                    "width": patch_size,
                    "height": patch_size
                },
                "affineTransform": {
                    "scaleX": scale_x,
                    "shearX": 0,
                    "translateX": coords[0],
                    "shearY": 0,
                    "scaleY": scale_y,
                    "translateY": coords[1]
                },
                "crsCode": "EPSG:4326",
            },
        }
        response = ee.data.computePixels(request)
        # Load the NumPy file data and return it as a NumPy array.
        # NOTE(review): assumes the computePixels result exposes `.content` - TODO confirm.
        return np.load(io.BytesIO(response.content), allow_pickle=True)
    point = ee.Geometry.Point(coords)
    # Square region: buffer the point by half the patch extent, then take its bounds.
    region = point.buffer(scale * patch_size / 2, 1).bounds(1)
    return get_patch(image, region, bands, patch_size)
beam_get_training_patches(coords, image, bands=[], scale=5, patch_size=128, use_service_account=False)
staticmethod
¶
Get a training patch centered on the coordinates.
Source code in aces/ee_utils.py
@staticmethod
def beam_get_training_patches(coords: List[float], image: ee.Image, bands: List[str] = [],
                              scale: int = 5, patch_size: int = 128, use_service_account: bool = False) -> np.ndarray:
    """
    Get a training patch centered on the coordinates.

    Initializes a high-volume Earth Engine session, builds a square region
    around the point and downloads the requested bands as a NumPy array via
    a download URL.

    Parameters:
        coords (List[float]): Coordinates passed to ``ee.Geometry.Point``.
        image (ee.Image): The image to read pixels from.
        bands (List[str]): Band names requested in the download. Default is
            an empty list; it is only read here, never mutated.
        scale (int): Multiplied by ``patch_size / 2`` to get the buffer
            distance around the point (presumably meters - TODO confirm).
            Default is 5.
        patch_size (int): Width and height of the patch in pixels. Default is 128.
        use_service_account (bool): When true, ``Config.EE_SERVICE_CREDENTIALS``
            is used as the service account key.

    Returns:
        np.ndarray: The array loaded from the downloaded NPY payload.
    """
    from aces.ee_utils import EEUtils
    from aces.config import Config
    import ee
    EEUtils.initialize_session(use_highvolume=True, key=Config.EE_SERVICE_CREDENTIALS if use_service_account else None)
    from google.api_core import exceptions, retry
    import requests
    import numpy as np
    from typing import List
    import io
    # NOTE(review): a `timeout=10*60` form of this decorator was left
    # commented out in the original; `deadline=` is the active argument.
    # @retry.Retry(timeout=10*60) # seconds
    @retry.Retry(deadline=10*60) # seconds
    def get_patch(image: ee.Image, region: ee.Geometry, bands: List[str], patch_size: int) -> np.ndarray:
        """Get the patch of pixels in the geometry as a Numpy array."""
        # Create the URL to download the band values of the patch of pixels.
        url = image.getDownloadURL({
            "region": region,
            "dimensions": [patch_size, patch_size],
            "format": "NPY",
            "bands": bands,
        })
        # Download the pixel data. If we get "429: Too Many Requests" errors,
        # it's safe to retry the request.
        response = requests.get(url)
        if response.status_code == 429:
            # The retry.Retry library only works with `google.api_core` exceptions.
            raise exceptions.TooManyRequests(response.text)
        if response.status_code == 503:
            # Same translation for "503: Service Unavailable".
            raise exceptions.ServiceUnavailable(response.text)
        # Still raise any other exceptions to make sure we got valid data.
        response.raise_for_status()
        # Load the NumPy file data and return it as a NumPy array.
        # NOTE(review): allow_pickle=True lets np.load unpickle objects from
        # downloaded bytes - confirm it is required for this payload.
        return np.load(io.BytesIO(response.content), allow_pickle=True)
    # NOTE(review): compute_pixel is defined but never called in this
    # function; only get_patch is used for the returned value.
    # @retry.Retry(timeout=10*60) # seconds
    @retry.Retry(deadline=10*60) # seconds
    def compute_pixel(image: ee.Image, region: ee.Geometry, bands: List[str], patch_size: int, scale_x: float, scale_y: float) -> np.ndarray:
        """Get the patch of pixels as a Numpy array via ``ee.data.computePixels``.

        The ``region`` argument is not used in the body; the grid is placed
        with ``coords`` from the enclosing function.
        """
        # Make a request object.
        request = {
            "expression": image,
            "fileFormat": "NPY",
            "bandIds": bands,
            "grid": {
                "dimensions": {
                    "width": patch_size,
                    "height": patch_size
                },
                "affineTransform": {
                    "scaleX": scale_x,
                    "shearX": 0,
                    "translateX": coords[0],
                    "shearY": 0,
                    "scaleY": scale_y,
                    "translateY": coords[1]
                },
                "crsCode": "EPSG:4326",
            },
        }
        response = ee.data.computePixels(request)
        # Load the NumPy file data and return it as a NumPy array.
        # NOTE(review): assumes the computePixels result exposes `.content` - TODO confirm.
        return np.load(io.BytesIO(response.content), allow_pickle=True)
    point = ee.Geometry.Point(coords)
    # Square region: buffer the point by half the patch extent, then take its bounds.
    region = point.buffer(scale * patch_size / 2, 1).bounds(1)
    return get_patch(image, region, bands, patch_size)
calculate_avg_min_max_statistics(image, geometry, scale=30)
staticmethod
¶
Calculate the mean, min and max of an image over a specific region.
image (ee.Image): The image to calculate statistics on. geometry (ee.FeatureCollection): The region to calculate statistics over. scale (int, optional): The scale, in meters, of the projection to compute statistics in. Default is 30.
ee.Dictionary: A dictionary containing the mean, min and max of the image.
Source code in aces/ee_utils.py
@staticmethod
def calculate_avg_min_max_statistics(image: ee.Image, geometry: ee.FeatureCollection, scale: int = 30) -> ee.Dictionary:
    """
    Compute the mean, minimum and maximum of an image over a region.

    Parameters:
        image (ee.Image): The image to calculate statistics on.
        geometry (ee.FeatureCollection): The region to calculate statistics over.
        scale (int, optional): The scale, in meters, of the projection to
            compute statistics in. Default is 30.

    Returns:
        ee.Dictionary: The result of ``reduceRegion`` using the combined
        mean/min/max reducer.
    """
    # All three reducers share the same inputs, so each band is reduced
    # three ways in a single pass.
    mean_and_min = ee.Reducer.mean().combine(reducer2=ee.Reducer.min(), sharedInputs=True)
    mean_min_max = mean_and_min.combine(reducer2=ee.Reducer.max(), sharedInputs=True)
    return image.reduceRegion(
        reducer=mean_min_max,
        geometry=geometry,
        scale=scale,
        maxPixels=1E13,
    )
country_bbox(country_name, max_error=100)
staticmethod
¶
Function to get a bounding box geometry of a country
Parameters:
Name | Type | Description | Default |
---|---|---|---|
country_name |
str |
US-recognized country name |
required |
max_error |
float,optional |
The maximum amount of error tolerated when performing any necessary reprojection. default = 100 |
100 |
Returns:
Type | Description |
---|---|
ee.Geometry |
geometry of country bounding box |
Source code in aces/ee_utils.py
@staticmethod
def country_bbox(country_name, max_error=100):
    """Return the bounding box geometry of a country.

    The country is looked up by its "country_na" property in the
    "USDOS/LSIB_SIMPLE/2017" feature collection.

    args:
        country_name (str): US-recognized country name
        max_error (float,optional): The maximum amount of error tolerated when
            performing any necessary reprojection. default = 100

    returns:
        ee.Geometry: geometry of country bounding box
    """
    boundaries = ee.FeatureCollection("USDOS/LSIB_SIMPLE/2017")
    by_name = ee.Filter.eq("country_na", country_name)
    country_geometry = boundaries.filter(by_name).geometry(max_error)
    return country_geometry.bounds(max_error)
get_credentials_by_service_account_key(key)
staticmethod
¶
Helper function to retrieve credentials using a service account key.
key (str): The path to the service account key JSON file.
ee.ServiceAccountCredentials: The authenticated credentials.
Source code in aces/ee_utils.py
@staticmethod
def get_credentials_by_service_account_key(key):
    """
    Helper function to retrieve credentials using a service account key.

    Parameters:
        key (str): The path to the service account key JSON file.

    Returns:
        ee.ServiceAccountCredentials: Credentials built from the
        "client_email" entry of the key file and the key file path.

    Raises:
        OSError: If the key file cannot be opened.
        ValueError: If the key file does not contain valid JSON.
        KeyError: If the key file has no "client_email" entry.
    """
    import json
    # Read the key through a context manager so the file handle is closed
    # deterministically instead of being left to the garbage collector.
    with open(key, encoding="utf-8") as key_file:
        service_account = json.load(key_file)
    credentials = ee.ServiceAccountCredentials(service_account["client_email"], key)
    return credentials
initialize_session(use_highvolume=False, key=None, project=None)
staticmethod
¶
Initialize the Earth Engine session. If use_highvolume is True, the high-volume Earth Engine API will be used. If a project is provided, the session will be initialized with the project ID. Recommended to use project. If a key is provided, the service account key will be used.
use_highvolume (bool): Whether to use the high-volume Earth Engine API. key (str or None): The path to the service account key JSON file. If None, the default credentials will be used. project (str): The Google Cloud project ID to use for the session.
Source code in aces/ee_utils.py
@staticmethod
def initialize_session(use_highvolume : bool = False, key : Union[str, None] = None, project: str = None):
    """
    Start an Earth Engine session.

    The call to ``ee.Initialize`` is assembled from the three options:
    service account credentials are passed positionally when ``key`` is
    given, the high-volume endpoint URL is passed when ``use_highvolume``
    is true, and the project ID is passed when ``project`` is truthy.
    Using a project is recommended.

    Parameters:
        use_highvolume (bool): Whether to use the high-volume Earth Engine API.
        key (str or None): The path to the service account key JSON file.
            If None, the default credentials will be used.
        project (str): The Google Cloud project ID to use for the session.
    """
    positional = []
    if key is not None:
        positional.append(EEUtils.get_credentials_by_service_account_key(key))

    options = {}
    if use_highvolume:
        options["opt_url"] = "https://earthengine-highvolume.googleapis.com"
    if project:
        options["project"] = project

    ee.Initialize(*positional, **options)