14 changes: 14 additions & 0 deletions ocn_027a_rw0_nitrogen_plumes/README.md
@@ -0,0 +1,14 @@
## Wastewater Plumes in Coastal Areas Dataset Pre-processing
This file describes the data pre-processing that was done to the [Global Inputs and Impacts from Human Sewage in Coastal Ecosystems](https://knb.ecoinformatics.org/view/doi:10.5063/F76B09) dataset for [display on Resource Watch](https://resourcewatch.org/data/explore/11804f04-d9c7-47b9-8d27-27ce6ed6c042).

The data provider supplied this dataset to the Resource Watch data team as a series of GeoTIFF files.

To display these data on Resource Watch, each GeoTIFF was reprojected to WGS84 (EPSG:4326) for web display and uploaded to Google Earth Engine, as sketched below.
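
A minimal sketch of the reprojection step, assuming GDAL's `gdalwarp` is available on the command line and EPSG:4326 is the target projection; the input and output file names are illustrative, and the full workflow is in the processing script linked below:

```python
import shlex
import subprocess

def reproject_to_wgs84(src_tif, dst_tif):
    # warp a source GeoTIFF into EPSG:4326 (WGS84) for web display
    cmd = 'gdalwarp -t_srs EPSG:4326 {} {}'.format(src_tif, dst_tif)
    subprocess.call(shlex.split(cmd))

# example: reproject the total nitrogen plume raster
reproject_to_wgs84('global_effluent_2015_tot_N.tif',
                   'ocn_027a_rw0_nitrogen_plumes_total.tif')
```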

Please see the [Python script](https://github.com/resource-watch/data-pre-processing/blob/master/ocn_027a_rw0_nitrogen_plumes/ocn_027a_rw0_nitrogen_plumes_processing.py) for more details on this processing.

You can view the processed Wastewater Plumes in Coastal Areas dataset [on Resource Watch](https://resourcewatch.org/data/explore/11804f04-d9c7-47b9-8d27-27ce6ed6c042).

You can also download the original dataset [from the source website](https://knb.ecoinformatics.org/view/urn%3Auuid%3Ac7bdc77e-6c7d-46b6-8bfc-a66491119d07).

###### Note: This dataset processing was done by [Claire Hemmerly](https://github.com/clairehemmerly), and QC'd by [Chris Rowe](https://www.wri.org/profile/chris-rowe).
187 changes: 187 additions & 0 deletions ocn_027a_rw0_nitrogen_plumes/ocn_027a_rw0_nitrogen_plumes_processing.py
@@ -0,0 +1,187 @@
import os
import sys
utils_path = os.path.join(os.path.abspath(os.getenv('PROCESSING_DIR')),'utils')
if utils_path not in sys.path:
sys.path.append(utils_path)
import util_files
import util_cloud
import zipfile
from zipfile import ZipFile
import ee
from google.cloud import storage
import logging
import urllib.request
from collections import OrderedDict
import shlex
import subprocess

# Set up logging
# Get the top-level logger object
logger = logging.getLogger()
for handler in logger.handlers: logger.removeHandler(handler)
logger.setLevel(logging.INFO)
# make it print to the console.
console = logging.StreamHandler()
logger.addHandler(console)
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# name of asset on GEE where you want to upload data
# this should be an asset name that is not currently in use
dataset_name = 'ocn_027a_rw0_nitrogen_plumes'

# create a new sub-directory within your specified dir called 'data'
# within this directory, create files to store raw and processed data
data_dir = util_files.prep_dirs(dataset_name)

# create a dictionary to store information about the dataset
data_dict = OrderedDict({
    'url': 'https://knb.ecoinformatics.org/knb/d1/mn/v2/object/urn%3Auuid%3Aefef18ef-416e-4d4d-9190-f17485c02c15',
    'unzipped folder': 'Global_N_Coastal_Plumes_tifs',
    'tifs': ['global_effluent_2015_open_N.tif', 'global_effluent_2015_septic_N.tif', 'global_effluent_2015_treated_N.tif', 'global_effluent_2015_tot_N.tif'],
    'raw_data_file': [],
    'processed_data_file': [],
    'sds': [
        'classification',
    ],
    'pyramiding_policy': 'MEAN',
    'band_ids': ['b1']
})

'''
Download data and save to your data directory - this may take a few minutes
'''
logger.info('Downloading raw data')

#download the data from the source
raw_data_file = os.path.join(data_dir, 'Global_N_Coastal_Plumes_tifs.zip')
urllib.request.urlretrieve(data_dict['url'], raw_data_file)

# unzip source data
raw_data_file_unzipped = raw_data_file.split('.')[0]
zip_ref = ZipFile(raw_data_file, 'r')
zip_ref.extractall(raw_data_file_unzipped)
zip_ref.close()

# set name of raw data files
for tif in data_dict['tifs']:
    data_dict['raw_data_file'].append(os.path.join(data_dir, data_dict['unzipped folder'], tif))


'''
Process data
'''
# Project and compress each tif
plume_type = ['open', 'septic', 'treated', 'total']
for i in range(len(data_dict['tifs'])):
    # set a new file name to represent processed data
    data_dict['processed_data_file'].append(os.path.join(data_dir, dataset_name + '_' + plume_type[i] + '.tif'))

    logger.info('Processing data for ' + data_dict['processed_data_file'][i])

    raw_data_path = os.path.join(os.getenv('PROCESSING_DIR'), dataset_name, data_dict['raw_data_file'][i])
    logger.info(raw_data_path)

    # project the data into WGS84 (EPSG 4326) and compress it using gdalwarp on the command line
    cmd = 'gdalwarp -t_srs EPSG:4326 -co COMPRESS=LZW {} {}'
    # split the command for the command line and run it
    posix_cmd = shlex.split(cmd.format(raw_data_path, data_dict['processed_data_file'][i]), posix=True)
    logger.info(posix_cmd)
    completed_process = subprocess.call(posix_cmd)

'''
Upload processed data to Google Earth Engine
'''

# set up Google Cloud Storage project and bucket objects
gcsClient = storage.Client(os.environ.get("CLOUDSDK_CORE_PROJECT"))
gcsBucket = gcsClient.bucket(os.environ.get("GEE_STAGING_BUCKET"))

# initialize the ee module for uploading to Google Earth Engine
auth = ee.ServiceAccountCredentials(os.getenv('GEE_SERVICE_ACCOUNT'), os.getenv('GOOGLE_APPLICATION_CREDENTIALS'))
ee.Initialize(auth)

# set pyramiding policy for GEE upload
pyramiding_policy = data_dict['pyramiding_policy']

# Create an image collection where we will put the processed data files in GEE
image_collection = f'projects/resource-watch-gee/{dataset_name}'
ee.data.createAsset({'type': 'ImageCollection'}, image_collection)

# set image collection's privacy to public
acl = {"all_users_can_read": True}
ee.data.setAssetAcl(image_collection, acl)
print('Privacy set to public.')

# list the bands in each image
band_ids = data_dict['band_ids']

task_id = []

# Upload processed data files to GEE

# if upload is timing out, uncomment the following lines
# storage.blob._DEFAULT_CHUNKSIZE = 10 * 1024* 1024 # 10 MB
# storage.blob._MAX_MULTIPART_SIZE = 10 * 1024* 1024 # 10 MB

# loop through the processed data files to upload to Google Cloud Storage and Google Earth Engine

for i in range(len(data_dict['tifs'])):
    logger.info('Uploading ' + data_dict['processed_data_file'][i] + ' to Google Cloud Storage.')
    # upload files to Google Cloud Storage
    gcs_uri = util_cloud.gcs_upload(data_dict['processed_data_file'][i], dataset_name, gcs_bucket=gcsBucket)

    logger.info('Uploading ' + data_dict['processed_data_file'][i] + ' to Google Earth Engine.')
    # generate an asset name for the current file by using the filename (minus the file type extension)
    file_name = data_dict['processed_data_file'][i].split('.')[0].split('/')[1]
    asset_name = f'projects/resource-watch-gee/{dataset_name}/{file_name}'

    # create the band manifest for this asset
    mf_bands = [{'id': band_id, 'tileset_band_index': band_ids.index(band_id), 'tileset_id': file_name, 'pyramidingPolicy': pyramiding_policy} for band_id in band_ids]

    # create complete manifest for asset upload
    manifest = util_cloud.gee_manifest_complete(asset_name, gcs_uri[0], mf_bands)

    # upload the file from Google Cloud Storage to Google Earth Engine
    task = util_cloud.gee_ingest(manifest)
    print(asset_name + ' uploaded to GEE')
    task_id.append(task)

    # remove files from Google Cloud Storage
    util_cloud.gcs_remove(gcs_uri[0], gcs_bucket=gcsBucket)
    logger.info('Files deleted from Google Cloud Storage.')

'''
Upload original data and processed data to Amazon S3 storage
'''
# initialize AWS variables
aws_bucket = 'wri-projects'
s3_prefix = 'resourcewatch/raster/'

logger.info('Uploading original data to S3.')
# Copy the raw data into a zipped file to upload to S3
raw_data_dir = os.path.join(data_dir, dataset_name + '.zip')
with ZipFile(raw_data_dir, 'w') as zip:
    raw_data_files = data_dict['raw_data_file']
    for raw_data_file in raw_data_files:
        zip.write(raw_data_file, os.path.basename(raw_data_file), compress_type=zipfile.ZIP_DEFLATED)

# Upload raw data file to S3
uploaded = util_cloud.aws_upload(raw_data_dir, aws_bucket, s3_prefix + os.path.basename(raw_data_dir))

logger.info('Uploading processed data to S3.')
# Copy the processed data into a zipped file to upload to S3
processed_data_dir = os.path.join(data_dir, dataset_name + '_edit.zip')
with ZipFile(processed_data_dir, 'w') as zip:
    processed_data_files = data_dict['processed_data_file']
    for processed_data_file in processed_data_files:
        zip.write(processed_data_file, os.path.basename(processed_data_file), compress_type=zipfile.ZIP_DEFLATED)

# Upload processed data file to S3
uploaded = util_cloud.aws_upload(processed_data_dir, aws_bucket, s3_prefix + os.path.basename(processed_data_dir))
19 changes: 19 additions & 0 deletions ocn_027b_rw0_wastewater_pourpoints/README.md
@@ -0,0 +1,19 @@
## Wastewater Inputs to Coastal Areas Dataset Pre-processing
This file describes the data pre-processing that was done to the [Global Inputs and Impacts from Human Sewage in Coastal Ecosystems](https://journals.plos.org/plosone/article?id=10.1371/journal.pone.0258898) dataset for [display on Resource Watch](https://resourcewatch.org/data/explore/11804f04-d9c7-47b9-8d27-27ce6ed6c042).

The source provided this dataset as a shapefile containing point data.

Below, we describe the steps used to reformat the shapefile before uploading it to Carto; a minimal sketch of these steps follows the list:

1. Read in the table as a geopandas data frame.
2. Convert the data type of the columns to integer (geometry column excluded).
3. Transform the projection from ESRI 54009 to EPSG 4326.
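
A minimal sketch of the three steps above, assuming GeoPandas is installed and using its built-in `to_crs` reprojection (the processing script transforms the point coordinates explicitly with pyproj instead); the shapefile name comes from the unzipped source data:

```python
import geopandas as gpd

# step 1: read the source shapefile into a GeoDataFrame
gdf = gpd.read_file('effluent_N_pourpoints_all.shp')

# step 2: cast the numeric columns to integer (geometry excluded)
for col in gdf.select_dtypes(include='number').columns:
    gdf[col] = gdf[col].fillna(0).astype('int')

# step 3: declare the source CRS (Mollweide, ESRI 54009) and reproject to WGS84 (EPSG 4326)
gdf = gdf.set_crs('ESRI:54009', allow_override=True).to_crs('EPSG:4326')
```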


Please see the [Python script](https://github.com/resource-watch/data-pre-processing/blob/master/ocn_027b_rw0_wastewater_pourpoints/ocn_027b_rw0_wastewater_pourpoints_processing.py) for more details on this processing.

You can view the processed Wastewater Inputs to Coastal Areas dataset [on Resource Watch](https://resourcewatch.org/data/explore/5bf349ec-3b14-4021-a7d4-fc4b8104bd74).

You can also download the original dataset [directly through Resource Watch](https://wri-public-data.s3.amazonaws.com/resourcewatch/ocn_027b_rw0_wastewater_pourpoints.zip), or [from the source website](https://knb.ecoinformatics.org/view/urn%3Auuid%3Ac7bdc77e-6c7d-46b6-8bfc-a66491119d07).

###### Note: This dataset processing was done by Claire Hemmerly, and QC'd by [Chris Rowe](https://www.wri.org/profile/chris-rowe).
139 changes: 139 additions & 0 deletions ocn_027b_rw0_wastewater_pourpoints/ocn_027b_rw0_wastewater_pourpoints_processing.py
@@ -0,0 +1,139 @@
import geopandas as gpd
import os
import glob
import pyproj
from shapely.geometry import Point
import urllib.request
import sys

utils_path = os.path.join(os.path.abspath(os.getenv('PROCESSING_DIR')),'utils')
if utils_path not in sys.path:
sys.path.append(utils_path)

from cartoframes.auth import set_default_credentials
from cartoframes import to_carto, update_privacy_table
import util_files
import util_cloud
from zipfile import ZipFile
import logging


# Set up logging
# Get the top-level logger object
logger = logging.getLogger()
for handler in logger.handlers:
    logger.removeHandler(handler)
logger.setLevel(logging.INFO)
# make it print to the console.
console = logging.StreamHandler()
logger.addHandler(console)
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# name of table on Carto where you want to upload data
# this should be a table name that is not currently in use
dataset_name = 'ocn_027b_rw0_wastewater_pourpoints'

logger.info('Executing script for dataset: ' + dataset_name)
# create a new sub-directory within your specified dir called 'data'
# within this directory, create files to store raw and processed data
data_dir = util_files.prep_dirs(dataset_name)

'''
Download data and save to your data directory
'''
logger.info('Downloading raw data')
# insert the url used to download the data from the source website
url = 'https://knb.ecoinformatics.org/knb/d1/mn/v2/object/urn%3Auuid%3Aaf8d0bd6-dc0c-4149-a3cd-93b5aed71f7c'

# download the data from the source
raw_data_file = os.path.join(data_dir, 'N_PourPoint_And_Watershed.zip')
urllib.request.urlretrieve(url, raw_data_file)

# unzip source data
raw_data_file_unzipped = raw_data_file.split('.')[0]
zip_ref = ZipFile(raw_data_file, 'r')
zip_ref.extractall(raw_data_file_unzipped)
zip_ref.close()


'''
Process data
'''

# load in the pour point shapefile
shapefile = os.path.join(raw_data_file_unzipped, 'effluent_N_pourpoints_all.shp')
gdf = gpd.read_file(shapefile)

# create a path to save the processed shapefile later
processed_data_file = os.path.join(data_dir, dataset_name+'_edit.shp')

# convert the data type of columns to integer
for col in gdf.columns[1:9]:
    gdf[col] = gdf[col].fillna(0).astype('int')
Member: Are you sure we want na values to be zero? Are there any existing zero values?

@clairehemmerly (Collaborator Author), Jul 13, 2022: Only NA values are in the columns that show % nitrogen input, which we're not using, and the same rows have 0 for the nitrogen input in g/yr, so I think updating the NAs to 0 should be fine. I had trouble creating the processed shapefile when the values were floats not integers; I got an error basically saying the numbers were too big.


# convert geometry from esri 54009 to epsg 4326 for display on carto
transformer = pyproj.Transformer.from_crs('esri:54009', 'epsg:4326')
lon, lat = transformer.transform(gdf['geometry'].x, gdf['geometry'].y)
gdf['geometry'] = [Point(xy) for xy in zip(lat, lon)]
gdf['geometry'] = gdf['geometry'].set_crs(epsg=4326)

# create an index column to use as cartodb_id
gdf['cartodb_id'] = gdf.index

# rename columns to match names in carto
gdf.columns = [x.lower().replace(' ', '_') for x in gdf.columns]

# reorder the columns
gdf = gdf[['cartodb_id'] + list(gdf)[:-1]]

# save processed dataset to shapefile
gdf.to_file(processed_data_file, driver='ESRI Shapefile')


'''
Upload processed data to Carto
'''

logger.info('Uploading processed data to Carto.')

# authenticate carto account
CARTO_USER = os.getenv('CARTO_WRI_RW_USER')
CARTO_KEY = os.getenv('CARTO_WRI_RW_KEY')
set_default_credentials(username=CARTO_USER, base_url="https://{user}.carto.com/".format(user=CARTO_USER),api_key=CARTO_KEY)

# upload data frame to Carto
to_carto(gdf, dataset_name + '_edit', if_exists='replace')

# set privacy to 'link' so table is accessible but not published
update_privacy_table(dataset_name + '_edit', 'link')


'''
Upload original data and processed data to Amazon S3 storage
'''
# initialize AWS variables
aws_bucket = 'wri-public-data'
s3_prefix = 'resourcewatch/'

logger.info('Uploading original data to S3.')

# Copy the raw data into a zipped file to upload to S3
raw_data_dir = os.path.join(data_dir, dataset_name+'.zip')
with ZipFile(raw_data_dir, 'w') as zip:
    zip.write(raw_data_file, os.path.basename(raw_data_file))

# Upload raw data file to S3
uploaded = util_cloud.aws_upload(raw_data_dir, aws_bucket, s3_prefix+os.path.basename(raw_data_dir))

# Copy the processed data into a zipped file to upload to S3
processed_data_dir = os.path.join(data_dir, dataset_name+'_edit.zip')
# find all the necessary components of the shapefiles
processed_data_files = glob.glob(os.path.join(data_dir, dataset_name + '_edit.*'))
with ZipFile(processed_data_dir, 'w') as zip:
    for file in processed_data_files:
        zip.write(file, os.path.basename(file))

# Upload processed data file to S3
uploaded = util_cloud.aws_upload(processed_data_dir, aws_bucket, s3_prefix+os.path.basename(processed_data_dir))


20 changes: 20 additions & 0 deletions ocn_027c_rw0_wastewater_watersheds/README.md
@@ -0,0 +1,20 @@
## Watersheds that Transport Wastewater to Coastal Ocean Dataset Pre-processing
This file describes the data pre-processing that was done to the [Global Inputs and Impacts from Human Sewage in Coastal Ecosystems](https://journals.plos.org/plosone/article?id=10.1371/journal.pone.0258898) dataset for [display on Resource Watch](https://resourcewatch.org/data/explore/784732cc-8e7e-4dac-be51-d4506ff2ee04).

The source provided this dataset as a shapefile containing polygon data.

Below, we describe the steps used to reformat the shapefile before uploading it to Carto; a sketch of the Carto upload follows the list:

1. Read in the table as a geopandas data frame.
2. Convert the data type of the columns to integer (geometry column excluded).
3. Transform the projection from ESRI 54009 to EPSG 4326.
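
After these steps, the reformatted data frame is pushed to Carto. A minimal sketch of that upload, assuming `cartoframes` is installed and the `CARTO_WRI_RW_USER`/`CARTO_WRI_RW_KEY` environment variables are set as in the other processing scripts in this repository; the `_edit` file and table names are illustrative:

```python
import os
import geopandas as gpd
from cartoframes.auth import set_default_credentials
from cartoframes import to_carto, update_privacy_table

# authenticate against the Resource Watch Carto account
CARTO_USER = os.getenv('CARTO_WRI_RW_USER')
set_default_credentials(username=CARTO_USER,
                        base_url='https://{}.carto.com/'.format(CARTO_USER),
                        api_key=os.getenv('CARTO_WRI_RW_KEY'))

# upload the reformatted shapefile and keep the table accessible via link only
gdf = gpd.read_file('ocn_027c_rw0_wastewater_watersheds_edit.shp')
to_carto(gdf, 'ocn_027c_rw0_wastewater_watersheds_edit', if_exists='replace')
update_privacy_table('ocn_027c_rw0_wastewater_watersheds_edit', 'link')
```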


Please see the [Python script](https://github.com/resource-watch/data-pre-processing/blob/master/ocn_027c_rw0_wastewater_watersheds/ocn_027c_rw0_wastewater_watersheds_processing.py) for more details on this processing.

You can view the processed Watersheds that Transport Wastewater to Coastal Ocean dataset [on Resource Watch](https://resourcewatch.org/data/explore/5bf349ec-3b14-4021-a7d4-fc4b8104bd74).

You can also download the original dataset [directly through Resource Watch](https://wri-public-data.s3.amazonaws.com/resourcewatch/ocn_027c_rw0_wastewater_watersheds.zip), or [from the source website](https://knb.ecoinformatics.org/view/urn%3Auuid%3Ac7bdc77e-6c7d-46b6-8bfc-a66491119d07).

###### Note: This dataset processing was done by Claire Hemmerly, and QC'd by [Chris Rowe](https://www.wri.org/profile/chris-rowe).