forked from data-prep-kit/data-prep-kit
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtransform_utils.py
More file actions
307 lines (273 loc) · 11.2 KB
/
transform_utils.py
File metadata and controls
307 lines (273 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
# SPDX-License-Identifier: Apache-2.0
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import hashlib
import io
import os
import string
import sys
from typing import Any
import mmh3
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.json as pj
import zipfile
from data_processing.utils import get_dpk_logger
# Logger shared by the TransformUtils helpers in this module.
logger = get_dpk_logger()
# Seed passed to mmh3 in TransformUtils.str_to_int so that hashes are reproducible between runs.
RANDOM_SEED: int = 42
# NOTE(review): not referenced anywhere in this module; presumably read by callers elsewhere - confirm its meaning.
LOCAL_TO_DISK: int = 2
class TransformUtils:
    """
    Class implementing support methods for filter implementation.

    All methods are static helpers for the following:
    - object sizing
    - string normalization and hashing
    - file-name handling
    - conversions between byte arrays (parquet / ndjson / zipped ndjson) and pyarrow tables
    - path cleanup
    """

    # Translation table that deletes ASCII punctuation; built once instead of on every
    # normalize_string call.
    _PUNCTUATION_TABLE = str.maketrans("", "", string.punctuation)
    # Block size (in bytes, 10 MiB) used when retrying an ndjson read that failed with
    # pyarrow's default block size.
    _NDJSON_RETRY_BLOCK_SIZE = 10 * 1024 * 1024
    # Archive member extensions that convert_zip_to_arrow knows how to convert.
    _NDJSON_EXTENSIONS = (".ndjson", ".jsonl")

    @staticmethod
    def deep_get_size(ob) -> int:
        """
        Getting the complete size of the Python object. Based on
        https://www.askpython.com/python/built-in-methods/variables-memory-size-in-python
        Recurses into list, tuple and set elements and into dict keys and values; any
        other object contributes only its shallow sys.getsizeof size.
        Note: there is no cycle detection, so a self-referencing container recurses until
        RecursionError, and an object reachable several times is counted each time.
        :param ob: object
        :return: object size in bytes
        """
        size = sys.getsizeof(ob)
        if isinstance(ob, (list, tuple, set)):
            size += sum(TransformUtils.deep_get_size(element) for element in ob)
        elif isinstance(ob, dict):
            size += sum(TransformUtils.deep_get_size(k) + TransformUtils.deep_get_size(v) for k, v in ob.items())
        return size

    @staticmethod
    def normalize_string(doc: str) -> str:
        """
        Normalize string: remove spaces and newlines, lower-case the result, then delete
        ASCII punctuation characters.
        :param doc: string to normalize
        :return: normalized string
        """
        # Order matters: lower() is context-sensitive for some characters (e.g. Greek final
        # sigma), so whitespace is removed before lower-casing, exactly as before.
        return doc.replace(" ", "").replace("\n", "").lower().translate(TransformUtils._PUNCTUATION_TABLE)

    @staticmethod
    def str_to_hash(val: str) -> str:
        """
        Compute the SHA-256 hash of a string (UTF-8 encoded).
        :param val: string
        :return: hex digest of the hash
        """
        return hashlib.sha256(val.encode("utf-8")).hexdigest()

    @staticmethod
    def str_to_int(s: str) -> int:
        """
        Convert string to int using mmh3 hashing. Ensures predictable result by using the
        fixed module-level RANDOM_SEED.
        :param s: string
        :return: unsigned 32-bit MurmurHash3 value
        """
        return mmh3.hash(s, seed=RANDOM_SEED, signed=False)

    @staticmethod
    def decode_content(content_bytes: bytes, encoding: str = "utf-8") -> str:
        """
        Decode the given bytes content using the specified encoding.
        :param content_bytes: The bytes content to decode
        :param encoding: The encoding to use while decoding the content. Default is 'utf-8'
        :return: str: The decoded content as a string if successful,
                 otherwise empty string if an error occurs during decoding.
        """
        try:
            return content_bytes.decode(encoding)
        except Exception:
            # Deliberate best-effort: invalid bytes, an unknown encoding or a non-bytes
            # argument all map to "".
            return ""

    @staticmethod
    def get_file_extension(file_path) -> tuple[str, str]:
        """
        Split the given file path into its root and extension.
        :param file_path : The path of the file.
        :return: tuple (root, ext) as returned by os.path.splitext; ext includes the dot ('.')
                 if present, otherwise it is an empty string.
        """
        return os.path.splitext(file_path)

    @staticmethod
    def get_file_basename(file_path) -> str:
        """
        Get the file's base name from the given file path.
        :param file_path : The path of the file.
        :return: str: file base name.
        """
        return os.path.basename(file_path)

    @staticmethod
    def validate_columns(table: pa.Table, required: list[str]) -> None:
        """
        Check if required columns exist in the table
        :param table: table
        :param required: list of required columns
        :return: None
        :raises Exception: if at least one required column is missing
        """
        columns = table.schema.names
        if not all(r in columns for r in required):
            raise Exception(
                f"Not all required columns are present in the table - " f"required {required}, present {columns}"
            )

    @staticmethod
    def convert_ndjson_to_arrow(data: bytes) -> pa.Table:
        """
        Convert ndjson byte array to table.
        Reads first with pyarrow's default options; on failure, retries once with a larger
        block size (_NDJSON_RETRY_BLOCK_SIZE).
        :param data: byte array
        :return: table or None if the conversion failed
        """
        try:
            return pj.read_json(io.BytesIO(data))
        except Exception as e:
            logger.warning(f"Could not convert bytes from ndjson to pyarrow- Retrying with bigger block size: {type(e)}-{e}")
        try:
            block_size = TransformUtils._NDJSON_RETRY_BLOCK_SIZE
            logger.debug(f"Retrying with block size {block_size:,} ")
            read_options = pj.ReadOptions(block_size=block_size)
            return pj.read_json(io.BytesIO(data), read_options=read_options)
        except Exception as e:
            logger.error(f"Could not convert bytes from ndjson to pyarrow: {type(e)}-{e}")
            return None

    @staticmethod
    def convert_zip_to_arrow(data: bytes) -> pa.Table:
        """
        Convert zip file byte array to table. Currently only supports ndjson zipped files
        (members ending in .ndjson or .jsonl); other members are skipped without being read.
        The tables of all supported members are concatenated in archive order.
        :param data: byte array
        :return: table, or None if the archive has no supported member, a member could not
                 be read/converted, or the member tables could not be concatenated
        """
        tables = []
        with zipfile.ZipFile(io.BytesIO(data)) as opened_zip:
            for archive_filename in opened_zip.namelist():
                extension = TransformUtils.get_file_extension(archive_filename)[1]
                logger.debug(f"Processing archive {archive_filename} with extention {extension}")
                if extension not in TransformUtils._NDJSON_EXTENSIONS:
                    # Not convertible - avoid decompressing it at all
                    continue
                with opened_zip.open(archive_filename) as file:
                    try:
                        # Read the content of the file
                        content_bytes = file.read()
                        converted = TransformUtils.convert_ndjson_to_arrow(content_bytes)
                    except Exception as e:
                        logger.error(f"Failed to read/convert {archive_filename}: {e}")
                        return None
                if converted is None:
                    # Fail the whole archive rather than silently returning a partial table
                    # (convert_ndjson_to_arrow has already logged the underlying reason).
                    logger.error(f"Failed to read/convert {archive_filename}: ndjson conversion failed")
                    return None
                tables.append(converted)
        if not tables:
            return None
        if len(tables) == 1:
            return tables[0]
        try:
            # A single concatenation instead of re-concatenating the growing table per member
            return pa.concat_tables(tables)
        except Exception as e:
            logger.error(f"Failed to concatenate tables read from zip archive: {e}")
            return None

    @staticmethod
    def convert_binary_to_arrow(data: bytes, schema: pa.Schema = None) -> pa.Table:
        """
        Convert parquet byte array to table
        :param data: byte array
        :param schema: optional Arrow table schema used for reading table, default None
        :return: table or None if the conversion failed
        """
        try:
            return pq.read_table(pa.BufferReader(data), schema=schema)
        except Exception as e:
            logger.warning(f"Could not convert bytes to pyarrow: {e}")
        # We have seen this exception before when using pyarrow, but polars does not throw it.
        # "Nested data conversions not implemented for chunked array outputs"
        # See issue 816 https://github.com/data-prep-kit/data-prep-kit/issues/816.
        # NOTE(review): the polars fallback does not apply `schema` - confirm that is acceptable.
        logger.info("Attempting read of pyarrow Table using polars")
        try:
            import polars

            return polars.read_parquet(io.BytesIO(data)).to_arrow()
        except Exception as e:
            logger.error(f"Could not convert bytes to pyarrow using polars: {e}. Skipping.")
            return None

    @staticmethod
    def convert_arrow_to_binary(table: pa.Table) -> bytes:
        """
        Convert Arrow table to a ZSTD-compressed parquet byte array
        :param table: Arrow table
        :return: byte array or None if conversion fails
        """
        try:
            writer = pa.BufferOutputStream()
            # Update default snappy compression to ZSTD.
            # See https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html
            pq.write_table(table=table, where=writer, compression="ZSTD")
            return bytes(writer.getvalue())
        except Exception as e:
            logger.error(f"Failed to convert arrow table to byte array, exception {e}. Skipping it")
            return None

    @staticmethod
    def add_column(table: pa.Table, name: str, content: list[Any]) -> pa.Table:
        """
        Add column to the table, replacing any existing column with the same name
        :param table: original table
        :param name: column name
        :param content: content of the column
        :return: updated table, containing new column (appended as the last column)
        """
        # check if column already exist and drop it
        if name in table.schema.names:
            table = table.drop(columns=[name])
        # `[content]` is passed as a list of chunks, i.e. a single chunk holding all values
        return table.append_column(field_=name, column=[content])

    @staticmethod
    def verify_no_duplicate_columns(table: pa.Table, file: str) -> bool:
        """
        Verify that resulting table does not have duplicate columns
        :param table: table
        :param file: file where we are saving the table (used only in the log message)
        :return: True, if there are no duplicates, False otherwise
        """
        columns_list = table.schema.names
        if len(set(columns_list)) != len(columns_list):
            logger.warning(f"Resulting table for file {file} contains duplicate columns {columns_list}. Skipping")
            return False
        return True

    @staticmethod
    def clean_path(path: str) -> str:
        """
        Clean path parameters:
        Removes white spaces from the input/output paths
        Removes schema prefix (s3://, http:// https://), if exists; for http/https the host
        is removed as well
        Adds the "/" character at the end, if it doesn't exist
        Removes URL encoding
        :param path: path to clean up
        :return: clean path ("" for blank input)
        """
        path = path.strip()
        if path == "":
            return path
        from urllib.parse import unquote, urlparse, urlunparse

        parsed_url = urlparse(path)
        if parsed_url.scheme in ["http", "https"]:
            # Remove host and the leading "/" of the path
            parsed_url = parsed_url._replace(netloc="", path=parsed_url.path[1:])
        # Remove the schema
        parsed_url = parsed_url._replace(scheme="")
        # Reconstruct the URL without the schema
        url_without_schema = urlunparse(parsed_url)
        # urlunparse re-emits "//" in front of a remaining netloc (e.g. an s3 bucket); drop it
        if url_without_schema.startswith("//"):
            url_without_schema = url_without_schema[2:]
        return_path = unquote(url_without_schema)
        # endswith instead of [-1]: inputs that reduce to "" (e.g. "s3://") used to raise IndexError
        if not return_path.endswith("/"):
            return_path += "/"
        return return_path