Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions check-code.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,4 @@ pylint --py3k --reports no ${py_files}
pep8 ${py_files} --max-line-length 120

# Run tests
# -> But not those that require the neocommon library.
py.test fetch test -m 'not with_neocommon'
py.test fetch test
17 changes: 3 additions & 14 deletions example-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,9 @@
# The work directory (for log and lock files):
directory: /data/fetch

# Notification settings (for errors):
notify:
email: ['jeremy.hooke@ga.gov.au']

# Message bus config, for announcing arrivals. Optional.
messaging:
host: rhe-neo-dev01.dev.lan
# virtual_host:
username: fetch
password: fetch

# Logging level per module (any Python module can be added here)
log:
fetch: DEBUG
neocommon: DEBUG
neocommon.files: INFO

# Download rules:
rules:
Expand Down Expand Up @@ -190,7 +177,9 @@ rules:
# Convert files to tiff (from netCDF)
process: !shell
command: '/usr/local/bin/gdal_translate -a_srs "+proj=latlong +datum=WGS84" {parent_dir}/{filename} {parent_dir}/{file_stem}.tif'
expect_file: '{parent_dir}/{file_stem}.tif'
upload_dir: '/data/water_vapour'
bucket: 'ard-processing-data'
prefix: 'ancillary/water_vapour'
#
#-------------------------------
# Examples using the ECMWF API |
Expand Down
117 changes: 17 additions & 100 deletions fetch/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,16 @@
import datetime
import errno
import logging
import multiprocessing
import os
import re
import smtplib
import socket
import subprocess
import tempfile
from email.mime.text import MIMEText
from email.header import Header

from pathlib import Path
from typing import Callable

from .util import rsync, Uri
from . import s3

_log = logging.getLogger(__name__)

Expand Down Expand Up @@ -468,76 +464,6 @@ def on_process_failure(self, process):
pass


class TaskFailureEmailer(TaskFailureListener):
    """
    Send failure information via email.

    Mail is handed to the SMTP server on ``localhost`` and addressed to
    every entry in ``addresses``.
    """

    def __init__(self, addresses):
        """
        :type addresses: list of str
        :param addresses: recipients of every failure mail
        """
        self.addresses = addresses

    def on_file_failure(self, process_name, file_uri, summary, body_text):
        """
        Send mail on a file failure.

        The mail body is the uri, the summary and the body text, in that order.

        :param process_name: used in the mail subject
        :param file_uri: written on the first line of the mail body
        :param summary: written below the uri
        :param body_text: written after the summary, separated by a blank line
        :return: None
        """
        self._send_mail(
            u'uri: {uri}\n{summary}\n\n{body}'.format(
                uri=file_uri,
                summary=summary,
                body=body_text
            ),
            process_name
        )

    def on_process_failure(self, process):
        """
        Send the contents of the process's log file by mail.

        Nothing is sent when the exit code is negative.

        :type process: ScheduledProcess
        """

        # A negative exit code means it was killed via a signal. Probably by the user.
        # Not worth emailing.
        if process.exitcode < 0:
            return

        with open(process.log_file, 'rt') as f:
            msg = f.read()

        self._send_mail(msg, process.name)

    def _send_mail(self, body_text, process_name):
        """
        Build a UTF-8 plain-text message and send it through the local SMTP server.

        The subject is '<process_name> failure on <hostname>' and the sender is
        'fetch-<pid>@<hostname>', where hostname is this machine's FQDN.

        :type body_text: str
        :type process_name: str
        """
        hostname = socket.getfqdn()

        # MIMEText and Header take text plus a charset and do the encoding
        # themselves, so the text is passed as-is rather than pre-encoded bytes.
        msg = MIMEText(body_text, 'plain', 'utf-8')
        msg['Subject'] = Header(
            u'{name} failure on {hostname}'.format(
                name=process_name,
                hostname=hostname
            ),
            'utf-8'
        )
        from_address = 'fetch-{pid}@{hostname}'.format(
            pid=multiprocessing.current_process().pid,
            hostname=hostname
        )
        msg['from'] = from_address
        msg['to'] = ", ".join(self.addresses)

        s = smtplib.SMTP('localhost')
        try:
            s.sendmail(
                from_address,
                self.addresses,
                msg.as_string()
            )
        finally:
            # Always end the SMTP session, even when sending fails, so the
            # connection is not leaked.
            s.quit()


class FileProcessor(SimpleObject):
"""
Any action that will process a file after retrieval. (base class)
Expand All @@ -560,20 +486,21 @@ class ShellFileProcessor(FileProcessor):
:type command: str
"""

def __init__(self, command=None, expect_file=None, input_files=None):
def __init__(self, command, upload_dir, bucket, prefix):
super(ShellFileProcessor, self).__init__()
self.command = command
self.expect_file = expect_file
self.input_files = input_files
self.upload_dir = upload_dir
self.bucket = bucket
self.prefix = prefix

def _apply_file_pattern(self, pattern, file_path, **keywords):
def _apply_file_pattern(self, pattern, file_path):
"""
Format the given pattern.
:type file_path: str

:rtype: str

>>> p = ShellFileProcessor()
>>> p = ShellFileProcessor('run command on file {filename}', '/data/upload', 's3-bucket', 's3-prefix')
>>> p._apply_file_pattern('{file_stem} extension {file_suffix}', '/tmp/something.txt')
'something extension .txt'
>>> p._apply_file_pattern('{filename} in {parent_dir}', '/tmp/something.txt')
Expand All @@ -582,8 +509,7 @@ def _apply_file_pattern(self, pattern, file_path, **keywords):
'/tmp'
>>> p._apply_file_pattern('{parent_dirs[1]}', '/tmp/something.txt')
'/'
>>> p._apply_file_pattern('{base}.hdf', '/tmp/something.hdf',**{'base':'/tmp/something'})
'/tmp/something.hdf'

"""
path = Path(file_path)
return pattern.format(
Expand All @@ -598,8 +524,7 @@ def _apply_file_pattern(self, pattern, file_path, **keywords):
parent_dirs=[str(p) for p in path.parents],

# A more flexible alternative to the above.
path=path,
**keywords
path=path
)

def process(self, file_path):
Expand All @@ -609,33 +534,25 @@ def process(self, file_path):
:raises: FileProcessError
"""
command = self.command
if self.input_files:
path_transform = RegexpOutputPathTransform(self.input_files[0])
if not all([os.path.isfile(path_transform.transform_output_path(f, file_path))
for f in self.input_files[1]]):
_log.info('Not all of the required_files are present.')
# This is used for reporting, so it is returning the file_path.
return file_path
else:
# format the path based on the group from
# transform output path
# command = path_transform.transform_output_path(command)
required_files_formating = path_transform.last_matched_groups
else:
required_files_formating = {}
command = self._apply_file_pattern(command, file_path, **required_files_formating)
command = self._apply_file_pattern(command, file_path)
_log.info('Running %r', command)

# Trigger command
returned = subprocess.call(command, shell=True)
if returned != 0:
raise FileProcessError('Return code %r from command %r' % (returned, command))
else:
_log.debug("Command completed successfully.")

# Check that output exists
expected_path = self._apply_file_pattern(self.expect_file, file_path, **required_files_formating)
expect_file = self.upload_dir + '/{file_stem}.h5'
expected_path = self._apply_file_pattern(expect_file, file_path)

if not os.path.exists(expected_path):
raise FileProcessError('Expected output not found {!r} for command {!r}'.format(expected_path, command))

s3.upload(expected_path, self.bucket, self.prefix)

_log.debug('File available %r', expected_path)

return expected_path
Loading