Create a new folder and run pipelines init inside of it.
It will create a file named pipeline.py with the following content:
from pipelines import tasks, Pipeline
from pipelines.tasks import sql_create_table_as, load_file_to_db, save_table_to_file
NAME = 'test_project'
SCHEMA = 'public'
YEAR_SUFFIX = '2023'
pipeline = Pipeline(
name=NAME,
schema=SCHEMA,
version=VERSION,
tasks=[
load_file_to_db(
input='original/original.csv',
output='original',
),
sql_create_table_as(
table='norm',
query='''
select *, domain_of_url(url)
from {original};
'''
),
tasks.CopyToFile(
input='norm',
output='norm',
),
# clean up:
sql('drop table {original}'),
sql('drop table {norm}'),
]
)The idea of this pipeline is to load the existing file with URLs and normalize them — extract domain name for each url, and finally save the result back to CSV-file.
> pipelines tasks
Tasks:
1: load_file_to_db [original]: original/original.csv -> original
2: ctas [norm]: norm
3: copy_to_file [norm]: norm -> norm.csv.gzLet's create file original.csv somewhere with the following content:
id,name,url
1,hello,http://hello.com/home
2,world,https://world.org/Formatted:
| id | name | url |
|---|---|---|
| 1 | hello | http://hello.com/home |
| 2 | world | https://world.org/ |
Now you can add it to your data sources using following command:
> pipelines add original.csv
data/original/original.csv> pipeline list
Error: No pipeline found in the current directory!
> cd test_project
> pipeline list
...
> pipeline run
...Now, when we have all the dependencies in place, we can run the pipeline using pipelines run.
> pipelines run
Running task 1: load_file_to_db [original]: original/original.csv -> original
0 original/original.csv original
Loading file original/original.csv
Dropping table 'public.test_project_2023_original'
drop true
'COPY 2'
Task took 0.385 seconds
Running task 2: ctas [norm]: norm
drop table if exists public.test_project_2023_norm;
create table public.test_project_2023_norm as
select *, domain_of_url(url)
from public.test_project_2023_original;
SELECT 2
Task took 0.812 seconds
--------------------------------------------------------------------------------
Running task 3: copy_to_file [norm]: norm -> norm.csv.gz
Writing data to file: norm.csv.gz
COPY 2
Task took 0.027 secondsYou can see the result of you work in data/norm.csv.gz.
id,name,url,domain_of_url
1,hello,http://hello.com/home,hello.com
2,world,https://world.org/,world.orgFormatted:
| id | name | url | domain_of_url |
|---|---|---|---|
| 1 | hello | http://hello.com/home | hello.com |
| 2 | world | https://world.org/ | world.org |