Discovering and processing Earth Observation Data with the EO-MQS¶
The EO-MQS service is hosted within the C-SCALE federated cloud infrastructure and provides a unified way of discovering Copernicus data available within the federation by making use of the SpatioTemporal Asset Catalog (STAC) specification. The purpose of this notebook is to provide a concise introduction on how to use open-source Python libraries to search for geospatial data exposed by the EO-MQS STAC API.
Prerequisites¶
In this example, we are going to make use of a popular STAC client for Python, the pystac-client. The library can be manually installed anywhere else via pip install pystac-client.
Alternatively, common Python libraries like the requests library which support working with HTTP APIs are of course also well suited.
To get started, we need to import the Client class to connect to the EO-MQS which exposes its STAC API under https://mqs.eodc.eu/stac/v1.
We need to import Python libraries and some useful functions as well.
pip install pystac-client==0.6.0
from pystac_client import Client
client = Client.open("https://mqs.eodc.eu/stac/v1")
client.title
CollectionClient¶
The client can be used to iterate through the Collections available in the EO-MQS Catalog.
The get_collections method fetches the collections from the /collections endpoint and returns an iterable. To load a particular collection for further use we call the get_collection(<collection_id>) method below.
for collection in client.get_collections():
    print(collection)
On static as well as dynamic catalogues we can also make use of the links attributes which lets us quickly explore, for instance, the number of available collections.
child_links = client.get_links('child')
print(f"The EO-MQS currently features {len(child_links)} collections.")
We can also check the details of particular collection.
collection = client.get_collection("CollGS_CZ|sentinel-2-l1c-2023")
#collection = client.get_collection("EODC|sentinel-2-l1c")
collection
We can use of some useful ways how to access collection metadata programmatically.
print(f"This collection contains data in the following temporal inteval: {collection.extent.temporal.to_dict()}")
STAC Items¶
Data providers that have realized their STAC implementation in terms of a dynamic STAC API offer users the opportunity to search their Catalogs using spatial and temporal constraints. The pystac_client enables this search via the class method search. This function returns an ItemSearch instance that can further be accessed to retrieve matched items.
Note that in its current implementation, the EO-MQS supports the core STAC search endpoint parameters as described in the STAC API - Item Search specification. Those are:
- limit
- bbox
- datetime
- intersects
- ids
- collections
Visualize Sentinel-2 data over EU¶
We can use the geojson file created using geojson.io.
from pystac_client import Client
import pandas as pd
import geopandas as gpd
from cscale_notebooks_functions import geojson_map_eu
import os
client = Client.open("https://mqs.eodc.eu/stac/v1")
# This variable helps to use notebook in different environments.
path = os.path.abspath("")
# Define the absolute path for general usgae
path = os.path.abspath("")
# Using geojson file for EU region
geojson_file = path + "/geojson/eu_wgs84.geojson"
aoi_eu_wgs84 = geojson_map_eu(geojson_file)
# Get bbox coordinates for EU region to accelerate serach
bbox = aoi_eu_wgs84.bounds.values.tolist()[0]
print(bbox)
Choosing the time interval and limit for the search.
time_period = "2023-01-01/.."
limit = 50
As before, we can use the collection client instance to iterate over the items contained in the collection. The server must provide the /collections/<collection_id>/items endpoint to support this feature automatically. This can be useful to manually filter items or extract information programmatically. The get_all_items() method again returns an iterator.
s2_collections = []
for collection in client.get_collections():
    if "l1c" in collection.id.lower():
        print(f"Append collection {collection.id} to list of Sentinel-2 L1C collections.")
        s2_collections.append(collection.id)
We can check time interval of each collection.
# using format of collection in order to define new varibale with proper format.
for i in range(0,len(s2_collections)):
    collections_item = client.get_collection(s2_collections[i])
    print(f"{collections_item.id} collection contains data in the following temporal inteval: {collections_item.extent.temporal.to_dict()}")
We can search for the collections that contain satellite images of EU region.
from IPython.display import HTML
empty_df =  pd.DataFrame(columns=['Collections'])
empty_df['geometry'] = gpd.GeoSeries([], dtype='geometry')
empty_gdf = gpd.GeoDataFrame(empty_df, geometry='geometry')
concatenated_gdf = empty_gdf
for collection in s2_collections:
    try:
        results = client.search(
            collections=[collection],
            datetime=time_period,
            bbox=bbox,
            limit=limit,
            method="GET"
        )
        gdf_temp = gpd.GeoDataFrame.from_features(results.item_collection())
        
        error_message = f"<font color='green'>Success:</font> Found items in collection id {collection}."
        display(HTML(error_message))
        #print(f"Successful search for items in collection id {collection} .")
        
        gdf_temp['Collections'] = collection
        gdf_new = gdf_temp[['geometry','Collections']]
        concatenated_gdf = pd.concat([concatenated_gdf, gdf_new], ignore_index=True)
        # Add a 'collection' column to the GeoDataFrame
        #gdfs_by_collection[collection] = gdf_temp
    except Exception as e:
        error_message = f"<font color='red'>Failed:</font> items not found in collection id {collection} or failed. Error: {e}"
        display(HTML(error_message))
        #print(f"Failed search for items in collection id {collection} or no items found. Error: {e}")
import plotly.express as px
geo_df = concatenated_gdf
fig = px.choropleth_mapbox(geo_df,
                   geojson=geo_df.geometry,
                   locations=geo_df.index,
                   color="Collections",
                   )
fig.update_layout(
    mapbox=dict(
        center=dict(lat=50, lon=10),  # Set the center to EU coordinates
        style='carto-positron',
        zoom=2.5, # Adjust the zoom level
    ),
    height=800,  # Set the height of the figure
    width=1200,  # Set the width of the figure
)
fig.update_traces(marker_opacity=0.3)
fig.show()
Item Search¶
Search for Sentinel-2 data intersecting a GeoJSON object¶
This first example makes use of the intersects and the collections parameters. Note that you cannot specify both bbox and intersects, this will result in an error.
import geopandas as gpd
import os
from pystac_client import Client
client = Client.open("https://mqs.eodc.eu/stac/v1")
path = os.path.abspath("")
time_period = "2023-01-01/2023-12-31"
limit = 100
For this example we can use region around the city of Utrecht.
import json
with open(path + '/geojson/utrecht_map.geojson') as f:
    geo = json.load(f)
geom = geo['features'][0]['geometry']
geom
results_VITO = client.search(collections = ["VITO|urn:eop:VITO:CGS_S2_L1C"],
                        intersects = geom,
                        datetime = time_period,
                        limit = limit,
                        method = "POST")
items_VITO = results_VITO.item_collection()
print(f"We found {len(items_VITO)} matching items.")
We can have a look on one item that has been found in the search.
import ipywidgets as widgets
df = gpd.GeoDataFrame.from_features(items_VITO)
p = widgets.Dropdown(
    options= df.datetime,
    description='Date:',
)
display(p)
We can download a preview image of the satellite image.
import requests
import rasterio
import rasterio.plot as plot
from rasterio.plot import show
url=items_VITO[df.datetime.index[df.datetime.tolist().index(p.value)]].assets['QUICKLOOK'].href
quick_look = requests.get(url, stream=True)
with open(path + '/quick.jp2', 'wb') as f:
    f.write(quick_look.content)
quick_tci = rasterio.open(path + '/quick.jp2', driver='JP2OpenJPEG');
show(quick_tci)
NOTE: You can always visualize STAC data (collections, items, etc.) in external tools like the STAC Browser, for instance do the following:
print(f"Look at this item in the STAC Browser:https://radiantearth.github.io/stac-browser/#/external/{items_VITO[df.datetime.index[df.datetime.tolist().index(p.value)]].get_self_href()}")
## If the item provides a preview image we can look at it in here using the following code
#from IPython.display import Image
#Image(url=url, width=500)
Search for Sentinel-2 data using bbox¶
The second example makes use of the bbox, datetime and the collections parameters. Learn about the correct formatting of these values on the STAC Spec GitHub page or by looking at the pystac-client docs.
This time we will convert geojson file into bbox coordinates.
import geopandas as gpd
import os
from cscale_notebooks_functions import read_gdf, get_cloud_cover_ts
from pystac_client import Client
client = Client.open("https://mqs.eodc.eu/stac/v1")
path = os.path.abspath("")
# Read the geojson file with region coordinates and get bbox coordinates to accelerate search
gejson_path = path + "/geojson/prague_map.geojson"
bbox = read_gdf(gejson_path)
bbox
# Use geopandas for geojson file read - sometimes it gets error...
#prague = geopandas.read_file(gejson_path)
#bbox = prague.bounds.values.tolist()[0]
time_period = "2022-01-01/2023-06-16"
limit = 1000
print('Search time period: ' + time_period)
We can put together all collections that contains desired region.
# This is repeated in case one skips the previous code
if 's2_collections' not in globals():
    s2_collections = []
    for collection in client.get_collections():
        if "l1c" in collection.id.lower():
            print(f"Append collection {collection.id} to list of Sentinel-2 L1C collections.")
            s2_collections.append(collection.id)
else:
    print(f"Selected collections already created in s2_collections variable.")
from IPython.display import HTML
items_s2 = []
items_s2dict = {}
for collection in s2_collections:
    results_s2 = client.search(collections=[collection], 
                            bbox=bbox, 
                            datetime=time_period, 
                            #limit=limit,
                            method="GET")
    try:
        items_s2.extend(results_s2.item_collection())
        items_s2dict[f"{collection}"] = results_s2.item_collection()
        error_message = f"<font color='green'>Success:</font> items found in collection id {collection}."
        display(HTML(error_message))
        #print(f"Search for items with collection id {collection} was succesfull.")
    except:
        error_message = f"<font color='red'>Failed:</font> items not found in collection id {collection}."
        display(HTML(error_message))
        
print(f"{len(items_s2)} matching items was found.")
We can plot some parameters that are stored within these collections, e.g. cloud coverage over the selected area.
gpd_cloud = {}
cloud_dict = {}
for i in items_s2dict:
    gpd_cloud[f"{i}"] = gpd.GeoDataFrame.from_features(items_s2dict[i])
    cloud_dict[f"{i}"] = get_cloud_cover_ts(gpd_cloud[i])
We can see the results just directly printed.
print(cloud_dict["CollGS_CZ|sentinel-2-l1c-2023"])
We can plot the search results which is more convenient.
import plotly.express as px
title = 'S2 Cloud Cover - Prague Region 2022/23'
for i, item in enumerate(gpd_cloud):
    fig = px.bar(gpd_cloud[item], x="datetime", y="eo:cloud_cover", title= title + ' [' + item + ']',
                labels={"datetime": "Date",
                     "eo:cloud_cover": "Cloud coverage [%]"})
    fig.show()
Cloudless search¶
Let's now focus on downloading particular region of interest with almost zero cloud coverage.
import pandas as pd
import geopandas as gpd
import os
from cscale_notebooks_functions import read_gdf
from pystac_client import Client
client = Client.open("https://mqs.eodc.eu/stac/v1")
path = os.path.abspath("")
#Define collection and search parameters.
time_period = "2022-01-01/2023-06-16"
limit = 500
# Get bbox coordinates to accelerate search
#prague = geopandas.read_file(path + "/geojson/prague_map.geojson")
#bbox = prague.bounds.values.tolist()[0]
gejson_path = path + "/geojson/prague_map.geojson"
bbox = read_gdf(gejson_path)
results_2023 = client.search(collections=["CollGS_CZ|sentinel-2-l1c-2023"], 
                            bbox=bbox, 
                            datetime=time_period,
                            limit=limit,
                            method="GET")
items_CollGS_CZ = results_2023.item_collection()
print(f"We found {len(items_CollGS_CZ)} matching items.")
Find the lowest cloud coverage in the collection of the desired region of interest.
# Load items with cloud cover less than 1%
items_cloud = []
for n, item in enumerate(items_CollGS_CZ):
    cloud_cover = item.properties.get("eo:cloud_cover")
    if cloud_cover < 1:
        print(f"Append item {item.id} with {cloud_cover:.2f}% cloud cover")
        items_cloud.append(item)
We can find dates of cloudless days.
import ipywidgets as widgets
item_gpd = gpd.GeoDataFrame.from_features(items_cloud)
p = widgets.Dropdown(
    options= item_gpd.datetime,
    description='Date:',
)
display(p)
We can choose exact day we would like to download and visualize this particular satellite tile in the map and check its geographical position as well as the shape.
import plotly.express as px
item_gpd = gpd.GeoDataFrame.from_features(items_cloud)
item_choose = item_gpd[item_gpd.datetime.index[item_gpd.datetime.tolist().index(p.value)]:item_gpd.datetime.index[item_gpd.datetime.tolist().index(p.value)]+1]
fig = px.choropleth_mapbox(item_choose,
                   geojson=item_choose.geometry,
                   locations=item_choose.index,
                   #color="geometry",
                   )
fig.update_layout(
    mapbox=dict(
        center=dict(lat=50, lon=15),  # Set the center to EU coordinates
        style='open-street-map',
        zoom=4, # Adjust the zoom level
    ),
    height=600,  # Set the height of the figure
    width=1000,  # Set the width of the figure
)
fig.update_traces(marker_opacity=0.5)
fig.show()
Saving image coordinates for later
if os.path.isfile('single_polygon_coords.csv') == False:
    single_polygon_coords = list(item_choose['geometry'].iloc[0].exterior.coords)
    df = pd.DataFrame(single_polygon_coords, columns=['X', 'Y'])
    df.to_csv('single_polygon_coords.csv', index=False)
    print('File saved')
else:
    print('File already exists')
Downloading images¶
We can check the metadata in the browser.
from IPython.display import HTML
import requests
import json
try:
    print(f"Look at this item in the STAC Browser: https://radiantearth.github.io/stac-browser/#/external/{items_cloud[item_gpd.datetime.index[item_gpd.datetime.tolist().index(p.value)]].get_self_href()}")
except NameError:
    error_message = f"<font color='red'>Error:</font> Image was not found - run the previous section first."
    display(HTML(error_message))
For downloading data, we need the URL and ACCESS TOKEN, which is ready in the notebook since logon. We can load it and check user info.
access_token = open('/var/run/secrets/egi.eu/access_token').read()
r = requests.get("https://aai.egi.eu/auth/realms/egi/protocol/openid-connect/userinfo", headers={"authorization": "Bearer {}".format(access_token)}, stream=True)
r
We can check attributes. For convenience and to maintain privacy we will filter for those relevant to C-SCALE.
print(json.dumps(json.loads(r.content), indent=2))
Now we can download all the data we need from the collection and save it.
TCI = requests.get(items_cloud[item_gpd.datetime.tolist().index(p.value)].assets["visual"].href, headers={"authorization": "Bearer {}".format(open('/var/run/secrets/egi.eu/access_token').read())}, stream=False)
# This creates folder where the image is stored. If folder exists, then nothing happens.
from pathlib import Path
img_folder = path + "/images/jp2/"
if os.path.isdir(img_folder) != True:
    Path(img_folder).mkdir(parents=True, exist_ok=True)
img_name = "TCI.jp2"
with open(img_folder + img_name, 'wb') as f:
    f.write(TCI.content)
For later analysis we might need some/all spectral bands that is provided together with RGB image.
# Preparing the list of desired bands to download
band_list = []
for i in range(2,13):
    if i < 10:
        band_list.append("B0" + str(i))
        if i == 8:
            band_list.append("B8A")
    else:
        band_list.append("B" + str(i))
band_list
# Saving the list of bands
for band in band_list:
    url = items_cloud[item_gpd.datetime.tolist().index(p.value)].assets[band].href
    img = requests.get(url, headers={"authorization": "Bearer {}".format(open('/var/run/secrets/egi.eu/access_token').read())}, stream=False)
    with open(img_folder + band +'.jp2', 'wb') as f:
        f.write(img.content)
print('All band images from the list saved in the folder: images')
We can see the satellite image tile with its original size and quality but it takes roughly 4 min!
%%time
import rasterio
from rasterio.plot import show
tci = rasterio.open(img_folder + img_name, driver='JP2OpenJPEG');
show(tci)