66 changes: 66 additions & 0 deletions wintappy/sleeper/README.md
@@ -0,0 +1,66 @@
# Sleeper for Wintap Testing

## Install/setup Sleeper - based on Dave's email
* Built Ubuntu EC2 with nix/docker

* Run bootstrap (required only 1x for CDK setup in aws account)
  * Note: we did this on the first attempt, but this should be the right place for it with the EC2 setup
* Deployed with: `./scripts/test/deployAll/buildDeployTest.sh $ID $VPC $SUBNETS`
* Modified schema.template and added table with:
`./scripts/deploy/addTable.sh $ID process`
* Uploaded all the process files to the existing system-test bucket, in a new path: wintap/process
* Sent an ingest request with python (ingest-request.py)

## Install from the README (using a docker instance)
* Start local docker container, used to run sleeper commands, with `./sleeper-install.sh develop`
* For example, to deploy EC2 and VPC
* Run bootstrap (required only 1x for CDK setup in aws account)
* Run `environment deploy TestEnvironment` to deploy EC2 and VPC in aws
* This EC2 is then used to deploy sleeper itself
* `environment connect` will open a shell on the remote EC2 instance via the local docker container


## Test Ingest with Process data
* Confirmed that parquet files need to be defined with every column as REQUIRED
* Replace existing nulls with something datatype appropriate
* Timestamps aren't supported, so convert to LONG/BIGINT
* Upload the files to be processed into the existing ingest bucket, using a new path
* Simplest for now to create a new dir for each batch to process.
* Successful test with all 60 rolling/process files. ~900MB in, 300MB out!
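
The two pre-processing rules above (no nulls, no timestamp types) can be sketched on plain Python rows. This is only an illustration of the logic, not the code used for the test: `DEFAULTS`, `prepare_row` and `to_epoch_millis` are hypothetical names, and milliseconds since the epoch is an assumed unit for the LONG value.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Hypothetical per-column defaults used in place of nulls.
DEFAULTS = {"args": "", "process_stop_seconds": -1}


def to_epoch_millis(value):
    """Convert a datetime to whole milliseconds since the Unix epoch.

    Naive datetimes are assumed to be UTC.
    """
    if value.tzinfo is None:
        value = value.replace(tzinfo=timezone.utc)
    # Integer division of timedeltas avoids float rounding.
    return (value - EPOCH) // timedelta(milliseconds=1)


def prepare_row(row, defaults):
    """Return a copy of row with nulls replaced and datetimes converted to ints."""
    out = {}
    for name, value in row.items():
        if value is None:
            if name not in defaults:
                raise ValueError(f"no default configured for null column {name!r}")
            value = defaults[name]
        if isinstance(value, datetime):
            value = to_epoch_millis(value)
        out[name] = value
    return out
```

Raising on a null column without a configured default is a deliberate choice here: it surfaces columns that still need a decision instead of silently inventing a value.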

## Operating Notes

### Add a new table
* Create a schema
* Pre-process data to eliminate nulls/convert datatypes
* (table of Parquet -> Sleeper datatypes)
* Ref: https://github.com/gchq/sleeper/blob/d470ffeef5929b59e27c8f367012afebed38593c/docs/03-schema.md
* Set nulls to some value (create-sleeper-no-nulls.sh)
* Add "required" flag to parquet files (add_required.py)
* Upload into ingest bucket
* Use a new folder for each batch? Maybe use the wintap strategy of uploadPK sets?
* Ingest Data (ingest-request.py)
* Monitor progress by watching either the dashboard or the CloudWatch Log Groups
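
One possible way to get a new folder for each batch is to derive the path from a timestamp. The sketch below is an assumption, not an agreed convention: `batch_path` is a hypothetical helper, and the `wintap/<data type>/batch-<UTC timestamp>/` layout is only one option. The result has the same `bucket/path/` shape that ingest-request.py passes to the client.

```python
from datetime import datetime, timezone


def batch_path(bucket, data_type, when):
    """Build a per-batch 'bucket/path/' string for an ingest request.

    `when` should be a timezone-aware datetime; it is normalised to UTC
    so that batch folders sort chronologically.
    """
    stamp = when.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    return f"{bucket}/wintap/{data_type}/batch-{stamp}/"
```

For example, `batch_path("my-ingest-bucket", "process", datetime.now(timezone.utc))` gives a fresh folder for each run, where `my-ingest-bucket` is a placeholder bucket name.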

### Modify a table: just don't for now...

## Things to try
* Try the ingest batcher
* Ingest network, both raw and rolling

### Useful AWS Console Locations
* CloudWatch
* Dashboard - customize for our tables
* Log Group - (ID)-IngestTasks

# Questions
* When does a build actually need to be run? Will deploy pull jars as appropriate from… the docker image (?)
* Only when source is changed?
* From the EC2 instance in the nix-shell, there is no "sleeper" command available. Is that correct?
* Yes, although the CLI has a separate build: https://github.com/gchq/sleeper/blob/develop/docs/11-dev-guide.md#sleeper-cli
* What would be the use case for having multiple 'environments'?
* How do I run the admin client? Are there docs on it somewhere?
* Do you use the CLI or do you build from source? There's a script to start it in the repository at
`scripts/utility/adminClient.sh`
* You can run that if you've built from source, or you can run it through the CLI with
`sleeper deployment utility/adminClient.sh`
53 changes: 53 additions & 0 deletions wintappy/sleeper/add_required.py
@@ -0,0 +1,53 @@
import argparse
import os

import pyarrow as pa
import pyarrow.parquet as pq


def add_required_to_all(source_file, dest_dir):
    """Rewrite one parquet file with every column marked non-nullable (REQUIRED)."""
    src = pq.read_table(source_file)

    # Rebuild the schema with nullable=False on every field
    schema = src.schema
    for i, col in enumerate(schema):
        schema = schema.set(i, pa.field(col.name, col.type, nullable=False))

    dest_file = os.path.join(
        dest_dir, os.path.basename(source_file) + "-required.parquet"
    )
    # The context manager closes the writer even if the write fails
    with pq.ParquetWriter(dest_file, schema=schema) as writer:
        writer.write_table(src.cast(schema))


def main():
    parser = argparse.ArgumentParser(
        prog="add_required.py",
        description="Add required flag to every column of each parquet file",
    )
    parser.add_argument(
        "-s",
        "--source",
        required=True,
        help="Path of parquet files or a single file",
    )
    parser.add_argument(
        "-d",
        "--destination",
        required=True,
        help="Path for new parquet files",
    )
    args = parser.parse_args()

    if os.path.isdir(args.source):
        # Find all parquet files in the given path
        for path, _, files in os.walk(args.source):
            for name in files:
                if name.endswith(".parquet"):
                    print(f"Processing: {path} {name}")
                    add_required_to_all(os.path.join(path, name), args.destination)
    else:
        add_required_to_all(args.source, args.destination)


if __name__ == "__main__":
    main()
32 changes: 32 additions & 0 deletions wintappy/sleeper/create-sleeper-no-nulls.sh
@@ -0,0 +1,32 @@
#!/bin/bash
#
# Create a copy of the process table for sleeper. Convert nulls to something. Use a subset of fields.
# Run from inside the rolling/process dir with:
# find . -name \*parquet -exec ~/create_sleeper_no_nulls.sh {} \;

source="
select
  pid_hash,
  os_family,
  hostname,
  os_pid,
  process_name,
  parent_pid_hash,
  parent_os_pid,
  process_path,
  filename,
  file_id,
  cast(process_started_seconds as bigint) process_started_seconds,
  process_started,
  first_seen,
  last_seen,
  ifnull(args,'') args,
  ifnull(process_stop_seconds, -1) process_stop_seconds,
  ifnull(process_term, 'epoch'::TIMESTAMPTZ) process_term
from parquet_scan('$1') where process_name is not null
"

target=../../sleeper/$(basename "$1")-no_nulls.parquet
copy_cmd="COPY ($source) TO '$target' (FORMAT 'parquet');"

~/apps/duckdb -s "$copy_cmd"
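
The same `COPY` statement could also be generated from a column list and a table of null defaults, which may help when more tables are added. This is a sketch under assumptions: `build_copy_sql` and `sql_literal` are hypothetical helpers, and only the `ifnull`, `parquet_scan` and `COPY ... (FORMAT 'parquet')` constructs are taken from the script above.

```python
def sql_literal(value):
    """Render a Python value as a SQL literal (strings are single-quoted and escaped)."""
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)


def build_copy_sql(source_path, target_path, columns, defaults):
    """Build a DuckDB COPY statement that replaces nulls with per-column defaults.

    columns:  column names to select, in output order
    defaults: mapping of column name -> value to use when the column is null
    """
    select_items = []
    for col in columns:
        if col in defaults:
            select_items.append(f"ifnull({col}, {sql_literal(defaults[col])}) as {col}")
        else:
            select_items.append(col)
    select = ",\n  ".join(select_items)
    query = f"select\n  {select}\nfrom parquet_scan({sql_literal(source_path)})"
    return f"COPY ({query}) TO {sql_literal(target_path)} (FORMAT 'parquet');"
```

The returned string can be passed to the DuckDB CLI with `-s`, as the script does. Column names are inserted as-is, so they should come from a trusted list.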
17 changes: 17 additions & 0 deletions wintappy/sleeper/ingest-request.py
@@ -0,0 +1,17 @@
from sleeper.sleeper import SleeperClient

# Assumes AWS keys are defined as either default in ~/.aws/credentials or set in the environment
# Bucket can be a folder or a single file
# Monitor progress in CloudWatch Log Group: [ID]-IngestTasks

# TODO
# Add args to the script
# Expand to handle upload of local files, then submit a request

table_name = "process"
bucket = "sleeper-wintapsleeper-system-test-ingest/wintap-acme/process2/"

# Create Sleeper instance with base name of install
my_sleeper = SleeperClient("wintapsleeper")
# for file in files:
my_sleeper.ingest_parquet_files_from_s3(table_name, [bucket])
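
For the "Add args to the script" TODO, a minimal sketch with `argparse` might look like the following. The option names and defaults are assumptions; the `SleeperClient` constructor and `ingest_parquet_files_from_s3` call are used exactly as in the script above.

```python
import argparse


def parse_args(argv=None):
    """Parse command-line options for an ingest request."""
    parser = argparse.ArgumentParser(
        prog="ingest-request.py",
        description="Submit a Sleeper ingest request for parquet files in S3",
    )
    parser.add_argument(
        "-i", "--instance", default="wintapsleeper",
        help="Base name of the Sleeper install",
    )
    parser.add_argument(
        "-t", "--table", default="process",
        help="Sleeper table to ingest into",
    )
    parser.add_argument(
        "paths", nargs="+",
        help="One or more bucket/path entries (a folder or a single file)",
    )
    return parser.parse_args(argv)


def main(argv=None):
    args = parse_args(argv)
    # Imported here so argument parsing can be exercised without the sleeper package.
    from sleeper.sleeper import SleeperClient

    client = SleeperClient(args.instance)
    client.ingest_parquet_files_from_s3(args.table, args.paths)
```

Calling `main()` with no arguments reads them from the command line, for example `python ingest-request.py -t process sleeper-wintapsleeper-system-test-ingest/wintap-acme/process2/`.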
52 changes: 52 additions & 0 deletions wintappy/sleeper/process-sleeper.json
@@ -0,0 +1,52 @@
{
  "rowKeyFields": [
    {
      "name": "hostname",
      "type": "StringType"
    },
    {
      "name": "pid_hash",
      "type": "StringType"
    }
  ],
  "sortKeyFields": [
    {
      "name": "process_started_seconds",
      "type": "LongType"
    }
  ],
  "valueFields": [
    {
      "name": "os_family",
      "type": "StringType"
    },
    {
      "name": "process_name",
      "type": "StringType"
    },
    {
      "name": "os_pid",
      "type": "IntType"
    },
    {
      "name": "parent_pid_hash",
      "type": "StringType"
    },
    {
      "name": "parent_os_pid",
      "type": "IntType"
    },
    {
      "name": "process_path",
      "type": "StringType"
    },
    {
      "name": "filename",
      "type": "StringType"
    },
    {
      "name": "file_id",
      "type": "StringType"
    }
  ]
}
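
The DuckDB script selects some columns (for example `args`) that this schema does not list, so it may be worth comparing the two before ingesting. The sketch below only reports the difference; whether Sleeper tolerates extra or missing columns is not established here, and `schema_field_names` and `compare_columns` are hypothetical helpers.

```python
import json


def schema_field_names(schema):
    """Return every field name in a Sleeper schema dict, keys first, then values."""
    names = []
    for section in ("rowKeyFields", "sortKeyFields", "valueFields"):
        names.extend(field["name"] for field in schema.get(section, []))
    return names


def compare_columns(schema, columns):
    """Compare schema fields with data columns.

    Returns (missing, extra): fields the schema expects but the data lacks,
    and data columns the schema does not mention.
    """
    expected = set(schema_field_names(schema))
    actual = set(columns)
    return sorted(expected - actual), sorted(actual - expected)


# Small inline schema for illustration; in practice load process-sleeper.json
# with json.load().
example_schema = json.loads("""
{
  "rowKeyFields": [{"name": "hostname", "type": "StringType"}],
  "sortKeyFields": [{"name": "process_started_seconds", "type": "LongType"}],
  "valueFields": [{"name": "os_pid", "type": "IntType"}]
}
""")
missing, extra = compare_columns(example_schema, ["hostname", "os_pid", "args"])
```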