-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwriter.py
More file actions
77 lines (64 loc) · 3.07 KB
/
writer.py
File metadata and controls
77 lines (64 loc) · 3.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import os
def create_query_constraint(datasets: list, dir_file: str) -> str:
    """
    Build the Cypher uniqueness-constraint script and save it to disk.

    One ``CREATE CONSTRAINT ... ASSERT <alias>.table_name IS UNIQUE`` statement
    is generated per dataset. The node alias used inside each statement is the
    lower-cased first character of the dataset name. The resulting script is
    written to ``<dir_file>/query_constraints.cypher`` (overwriting any
    existing file) and also returned.

    :param datasets: List of datasets/schema names to create constraints for.
    :param dir_file: String of the directory to store the Cypher script in.
    :return: String containing all generated constraint statements.
    """
    # Aliases are computed up front so an invalid name fails before any output.
    initials = [dataset[0].lower() for dataset in datasets]
    script = "".join(
        f"CREATE CONSTRAINT table_name_Constraint{label} ON ({initial}:{label})\n"
        f"ASSERT {initial}.table_name IS UNIQUE;\n"
        for label, initial in zip(datasets, initials)
    )

    target = f"{dir_file}/query_constraints.cypher"
    with open(target, "w") as handle:
        handle.write(script)
    return script
def create_query_node_import(datasets: list, dir_file: str) -> str:
    """
    Build the Cypher node-import script and save it to disk.

    For each dataset a ``LOAD CSV`` statement is generated that reads
    ``file:///<dataset lower-cased>_tables.csv`` and creates one node labelled
    with the dataset name per CSV row, carrying ``table_name``,
    ``table_dataset`` and an ``import_datetime`` stamp. The resulting script is
    written to ``<dir_file>/query_nodes.cypher`` (overwriting any existing
    file) and also returned.

    :param datasets: List of datasets/schema names to create nodes from.
    :param dir_file: String of the directory to store the Cypher script in.
    :return: String containing all generated node-import statements.
    """
    script = "".join(
        f'USING PERIODIC COMMIT 500 LOAD CSV WITH HEADERS FROM "file:///{label.lower()}_tables.csv" AS csvLine\n'  # noqa: E501
        f"CREATE (:{label} {{table_name: toString(csvLine.table_name), table_dataset: toString(csvLine.table_dataset), import_datetime: datetime()}});\n"  # noqa: E501
        for label in datasets
    )

    target = f"{dir_file}/query_nodes.cypher"
    with open(target, "w") as handle:
        handle.write(script)
    return script
def create_query_relationship(datasets: list, dir_file: str) -> str:
    """
    Write query to create relationships between nodes and export as .cypher file.

    Every ordered pair of datasets (a dataset paired with itself included) is
    considered. A relationship query is generated for a pair only when a file
    named ``<dataset>_<sub_dataset>_dependency.csv`` (both names lower-cased)
    is present in ``dir_file``. Each query merges the two endpoint nodes from
    the CSV rows and creates a ``HAS_TABLE_DEPENDENCY`` relationship between
    them. The script is written to ``<dir_file>/query_relationships.cypher``
    (overwriting any existing file).

    :param datasets: List of datasets/schema to create relationships between.
    :param dir_file: String of the directory that is scanned for dependency
        CSV files and in which the Cypher query is stored.
    :return: String of the queries in the script exported (empty string when
        no dependency file matches).
    """
    aliases = [txt.lower() for txt in datasets]

    # Scan the directory once instead of once per dataset pair; a set also
    # makes each membership test constant-time.
    available_files = set(os.listdir(path=dir_file))

    queries = []
    for name, alias in zip(datasets, aliases):
        for sub_name, sub_alias in zip(datasets, aliases):
            file_name = f"{alias}_{sub_alias}_dependency.csv"
            if file_name not in available_files:
                # No dependency export for this pair, so nothing to link.
                continue
            queries.append(
                f'USING PERIODIC COMMIT 500 LOAD CSV WITH HEADERS FROM "file:///{file_name}" AS csvLine\n'  # noqa: E501
                f"MERGE (a:{name} {{table_name: toString(csvLine.table_name), table_dataset: toString(csvLine.table_dataset)}})\n"  # noqa: E501
                f"MERGE (b:{sub_name} {{table_name: toString(csvLine.dependency_name), table_dataset: toString(csvLine.dependency_dataset)}})\n"  # noqa: E501
                f"CREATE (a)-[:HAS_TABLE_DEPENDENCY {{import_datetime: datetime()}}]->(b);\n"
            )

    cypher = "".join(queries)
    with open(file=f"{dir_file}/query_relationships.cypher", mode="w") as f:
        f.write(cypher)
    return cypher