-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlambda.py
More file actions
77 lines (60 loc) · 1.85 KB
/
lambda.py
File metadata and controls
77 lines (60 loc) · 1.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import ROOT
import base64
import boto3
import cloudpickle as pickle
import os
import logging
import array
import ctypes
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(level=logging.DEBUG)
bucket = os.getenv('bucket')
krb5ccname = os.getenv('KRB5CCNAME', '/tmp/certs')
def lambda_handler(event, context):
logging.info(f'event {event}')
ranges = pickle.loads(base64.b64decode(event['ranges']))
cert_file = base64.b64decode(event['cert'])
logging.info(ranges)
write_cert(cert_file)
from collections import namedtuple
ReplicateRange = namedtuple("ReplicateRange", ["start", "length", "source", "destination", "part_number", "id"])
ranges = [ReplicateRange(*x) for x in ranges]
try:
return run(ranges)
except Exception as e:
return {
'exc': str(e)
}
def write_cert(cert_file: bytes):
with open(f'{krb5ccname}', "wb") as handle:
handle.write(cert_file)
def run(ranges) -> dict:
for r in ranges:
process_range(rep_range=r)
return {
'statusCode': 200
}
def process_range(rep_range):
print(f'Sending {rep_range}')
part = download_bytes(rep_range)
print(f'Got part {part[:20]}')
send_to_s3(part, rep_range)
print('Sent!')
def download_bytes(rep_range):
buff = array.array('b', b'\x00' * rep_range.length)
buffptr = ctypes.c_char_p(buff.buffer_info()[0])
f = ROOT.TFile.Open(rep_range.source)
f.ReadBuffer(buffptr, rep_range.start, rep_range.length)
f.Close()
return buff.tobytes()
def send_to_s3(part, rep_range):
s3 = boto3.client('s3')
s3.upload_part(
Body=part,
Bucket=bucket,
Key=rep_range.destination,
PartNumber=rep_range.part_number,
UploadId=rep_range.id
)
def wait_on_futures(futures):
_ = [future.result() for future in futures]