-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathUSPTOProcessZipFile.py
More file actions
240 lines (207 loc) · 10.7 KB
/
USPTOProcessZipFile.py
File metadata and controls
240 lines (207 loc) · 10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# USPTOProcessZipFile.py
# USPTO Bulk Data Parser - Processes ZIP Files
# Description: Imported to Process Modules. Extracts XML file contents from a downloaded ZIP file.
# Author: Joseph Lee
# Email: joseph@ripplesoftware.ca
# Website: www.ripplesoftware.ca
# Github: www.github.com/rippledj/uspto
# Import Python Modules
import time
import os
import sys
import traceback
import subprocess
import shutil
import zipfile
import urllib.request, urllib.parse, urllib.error
import codecs
# Import USPTO Parser Functions
import USPTOLogger
# Extract a zip file and return the contents of the XML file as an array of lines
def extract_xml_file_from_zip(args_array):
    """Return the lines of the XML/SGML file stored inside a downloaded zip.

    Args:
        args_array: dict read for the keys 'temp_zip_file_name' (path of the
            zip to open) and 'sandbox'. When 'sandbox' equals True the
            member is also extracted to
            args_array['temp_directory'] + "unzip/" + args_array['file_name']
            and the zip is kept; when it equals False the zip is deleted
            after a successful read.

    Returns:
        list: the result of ``readlines()`` on the archive member.
        None: reading failed and ``delete_zip_file`` ran without raising.
        False: reading failed and removing the zip raised as well.
    """
    logger = USPTOLogger.logging.getLogger("USPTO_Database_Construction")
    zip_path = args_array['temp_zip_file_name']

    try:
        # Context managers release the archive and member handles even when
        # reading fails part-way through.
        with zipfile.ZipFile(zip_path, 'r') as zip_file:
            # Find the xml file among the archive members; when several
            # match, the last one wins.
            xml_file_name = None
            for filename in zip_file.namelist():
                if '.xml' in filename or '.sgml' in filename:
                    xml_file_name = filename
            # A zip without an XML/SGML member is handled like any other
            # unreadable zip by the except branch below.
            if xml_file_name is None:
                raise ValueError('no .xml or .sgml file found in ' + zip_path)
            # Print stdout message that xml file was found
            print('[xml file found. Filename: {0}]'.format(xml_file_name))
            logger.info('xml file found. Filename: ' + xml_file_name)
            # Open the file to read lines out of
            with zip_file.open(xml_file_name, 'r') as xml_file:
                # If sandbox mode then also extract the xml file to disk
                if args_array['sandbox'] == True:
                    zip_file.extract(xml_file_name, args_array['temp_directory'] + "unzip/" + args_array['file_name'])
                # Extract the contents from the file
                xml_file_contents = xml_file.readlines()
        # If not sandbox mode, then delete the .zip file
        if args_array['sandbox'] == False and os.path.exists(zip_path):
            print('[Purging .zip file ' + zip_path + '...]')
            logger.info('Purging .zip file ' + zip_path + '...')
            os.remove(zip_path)
        print('[xml file contents extracted ' + xml_file_name + '...]')
        logger.info('xml file contents extracted ' + xml_file_name + '...')
        # Return the file contents as array
        return xml_file_contents
    # The zip file has failed using python's ZipFile.  Exception (not a bare
    # except) so that Ctrl-C / SystemExit do not trigger deletion of the zip.
    except Exception:
        print('[X] Zip file ' + zip_path + ' failed to unzip with Python module...')
        logger.warning('[X] Zip file ' + zip_path + ' failed to unzip with Python module...')
        traceback.print_exc()
        try:
            print('[Removing corrupted zip file ' + zip_path)
            logger.warning('Removing corrupted file ' + zip_path)
            # Remove the corrupted zip file
            delete_zip_file(zip_path)
            # Return None to signal failed status
            return None
        except Exception:
            print('[X] Failed to remove zip file ' + zip_path)
            logger.warning('[X] Failed to remove zip file ' + zip_path)
            traceback.print_exc()
            # Return False to signify that zip file could not be deleted
            return False
        # TODO(review): the original carried an unfinished note here about
        # removing the zip file in a final cleanup step - confirm intent.
# Extract the CSV file from a zip file and return the path of the extracted file
def extract_csv_file_from_zip(args_array):
    """Extract the CSV member of a downloaded zip and return its path on disk.

    Args:
        args_array: dict read for the keys 'temp_zip_file_name' (path of the
            zip to open) and 'temp_directory' (the member is extracted to
            args_array['temp_directory'] + "unzip/").

    Returns:
        str: extraction directory concatenated with the member name.
        None: extraction failed and ``delete_zip_file`` ran without raising.
        False: extraction failed and removing the zip raised as well.

    The zip file is not deleted on success.
    """
    logger = USPTOLogger.logging.getLogger("USPTO_Database_Construction")
    zip_path = args_array['temp_zip_file_name']

    try:
        # The context manager closes the archive even if extraction fails.
        with zipfile.ZipFile(zip_path, 'r') as zip_file:
            # Find the csv file among the archive members; when several
            # match, the last one wins.
            csv_file_name = None
            for filename in zip_file.namelist():
                if '.csv' in filename:
                    csv_file_name = filename
            # A zip without a CSV member is handled like any other
            # unreadable zip by the except branch below.
            if csv_file_name is None:
                raise ValueError('no .csv file found in ' + zip_path)
            # Print stdout message that csv file was found
            print('[csv file found. Filename: {0}]'.format(csv_file_name))
            logger.info('csv file found. Filename: ' + csv_file_name)
            # Extract the csv file
            extracted_csv_filepath = args_array['temp_directory'] + "unzip/"
            zip_file.extract(csv_file_name, extracted_csv_filepath)
        # Return the path of the extracted file
        return extracted_csv_filepath + csv_file_name
    # The zip file has failed using python's ZipFile.  Exception (not a bare
    # except) so that Ctrl-C / SystemExit do not trigger deletion of the zip.
    except Exception:
        print('[zip file failed to unzip with Python module: ' + zip_path)
        logger.warning('[zip file failed to unzip with Python module: ' + zip_path)
        traceback.print_exc()
        try:
            print('[Removing corrupted zip file: ' + zip_path)
            logger.warning('Removing corrupted file: ' + zip_path)
            # Remove the corrupted zip file
            delete_zip_file(zip_path)
            # Return None to signal failed status
            return None
        except Exception:
            print('[Failed to remove zip file: ' + zip_path)
            logger.warning('Failed to remove zip file: ' + zip_path)
            traceback.print_exc()
            # Return False to signify that zip file could not be deleted
            return False
# Extract a zip file and return the contents of the .dat/.txt data file
def extract_dat_file_from_zip(args_array, indexed=False):
    """Extract the .dat/.txt member of a downloaded zip and return its contents.

    Args:
        args_array: dict read for the keys 'temp_zip_file_name' (path of the
            zip), 'temp_directory' and 'file_name' (the member is extracted
            to temp_directory + "unzip/" + file_name), 'sandbox' (when
            falsy the extracted file is deleted; when it equals False the
            zip is deleted too) and 'url_link' (only used in the
            "not found" log message).
        indexed: when true, return the lines as a list instead of an open
            file object.

    Returns:
        list: lines decoded as iso-8859-1, when ``indexed`` is true.
        file object: opened with ``codecs.open`` when ``indexed`` is false;
            the caller is responsible for closing it.
        None: extraction failed and ``delete_zip_file`` ran without raising.
        False: extraction failed and removing the zip raised as well.
    """
    logger = USPTOLogger.logging.getLogger("USPTO_Database_Construction")
    zip_path = args_array['temp_zip_file_name']

    try:
        # The context manager closes the archive even if extraction fails.
        with zipfile.ZipFile(zip_path, 'r') as zip_file:
            # Find the data file among the archive members; when several
            # match, the last one wins.
            data_file_name = ""
            for name in zip_file.namelist():
                if '.dat' in name or '.txt' in name:
                    data_file_name = name
            # If .dat file not found, log it and fail explicitly rather than
            # relying on extract("") raising further down.
            if data_file_name == "":
                logger.error('APS .dat file not found. Filename: ' + args_array['url_link'])
                raise ValueError('no .dat or .txt file found in ' + zip_path)
            # Print and log that the .dat file was found
            print('[APS .dat data file found. Filename: {0}]'.format(data_file_name))
            logger.info('APS .dat file found. Filename: ' + data_file_name)
            # Make sure the per-file unzip directory exists (creates missing
            # parent directories and tolerates an existing one)
            unzip_directory = args_array['temp_directory'] + "unzip/" + args_array['file_name']
            os.makedirs(unzip_directory, exist_ok=True)
            # Extract the .dat file from the zip file
            zip_file.extract(data_file_name, unzip_directory)
        # Path of the extracted .dat file
        temp_data_file_path = unzip_directory + "/" + data_file_name
        if indexed:
            # An indexable list was requested: read every line, then close
            # the handle before the file is deleted.
            with codecs.open(temp_data_file_path, 'r', 'iso-8859-1') as data_file:
                data_file_contents = list(data_file)
        else:
            # NOTE(review): the open handle is returned to the caller and the
            # file may be removed below while still open; that presumably
            # relies on POSIX unlink semantics - confirm for other platforms.
            data_file_contents = codecs.open(temp_data_file_path, 'r', 'iso-8859-1')
        # Delete the extracted data file
        if not args_array['sandbox']:
            os.remove(temp_data_file_path)
        # If not sandbox mode, then delete the .zip file
        if args_array['sandbox'] == False and os.path.exists(zip_path):
            print('[Purging .zip file ' + zip_path + '...]')
            logger.info('Purging .zip file ' + zip_path + '...')
            os.remove(zip_path)
        print('[APS .dat data file contents extracted ' + data_file_name + '...]')
        logger.info('APS .dat data file contents extracted ' + data_file_name + '...')
        # Return the file contents
        return data_file_contents
    # Since zip file could not unzip, remove it.  Exception (not a bare
    # except) so that Ctrl-C / SystemExit do not trigger deletion of the zip.
    except Exception:
        traceback.print_exc()
        _log_exception_details(logger)
        # Remove the zip file and return error code
        try:
            print('[Removing corrupted zip file ' + zip_path + ']')
            logger.warning('Removing corrupted file ' + zip_path)
            # Remove the corrupted zip file
            delete_zip_file(zip_path)
            # Return None to signal failed status
            return None
        except Exception:
            traceback.print_exc()
            _log_exception_details(logger)
            print('[Failed to remove zip file ' + zip_path + ' ]')
            logger.error('Failed to remove zip file ' + zip_path)
            # Return False to signify that zip file could not be deleted
            return False


def _log_exception_details(logger):
    """Log type, file name, line and traceback of the exception being handled.

    Must be called from inside an ``except`` block.
    """
    exc_type, exc_obj, exc_tb = sys.exc_info()
    fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
    logger.error("Exception: " + str(exc_type) + " in Filename: " + str(fname) + " on Line: " + str(exc_tb.tb_lineno) + " Traceback: " + traceback.format_exc())
# Deletes a zip file
def delete_zip_file(filename):
    """Delete ``filename`` from disk if its name contains ".zip".

    Args:
        filename: path of the file to remove.

    Names that do not contain ".zip" are left alone and the skip is logged.
    Any error raised by ``os.remove`` (for example a missing file)
    propagates to the caller.
    """
    logger = USPTOLogger.logging.getLogger("USPTO_Database_Construction")
    # Safety guard: never remove anything that is not named like a zip file,
    # but leave a trace so the skipped removal is not silent.
    if ".zip" not in filename:
        logger.warning("File " + filename + " is not a .zip file and was not removed...")
        return
    # Remove the file
    os.remove(filename)
    print("[.Zip file " + filename + " has been removed...]")
    logger.warning(".Zip file " + filename + " has been removed...")