Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/ifw_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
print(json.dumps(pgpub_archive.to_dict(), indent=2))
download_path = "./download-example"
file_path = client.download_archive(
printed_metadata=pgpub_archive, destination_path=download_path, overwrite=True
printed_metadata=pgpub_archive, destination=download_path, overwrite=True
)
print(f"-Downloaded document to: {file_path}")

Expand All @@ -76,6 +76,6 @@
print(json.dumps(grant_archive.to_dict(), indent=2))
download_path = "./download-example"
file_path = client.download_archive(
printed_metadata=grant_archive, destination_path=download_path, overwrite=True
printed_metadata=grant_archive, destination=download_path, overwrite=True
)
print(f"-Downloaded document to: {file_path}")
11 changes: 6 additions & 5 deletions examples/patent_data_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,12 @@
document_to_download.document_formats
and document_to_download.document_identifier
):
print("\nAttempting to download first document...")
print("\nAttempting to download first PDF document...")
print(json.dumps(document_to_download.to_dict(), indent=2))
downloaded_path = client.download_document(
document_format=document_to_download.document_formats[0],
destination_path=DEST_PATH,
document=document_to_download,
format="PDF",
destination=DEST_PATH,
overwrite=True,
)
print(f"Downloaded document to: {downloaded_path}")
Expand All @@ -183,7 +184,7 @@
print("\nDownloading grant XML...")
grant_path = client.download_publication(
printed_metadata=grant_metadata,
destination_path=DEST_PATH,
destination=DEST_PATH,
overwrite=True,
)
print(f"Downloaded grant XML to: {grant_path}")
Expand All @@ -196,7 +197,7 @@
pgpub_path = client.download_publication(
printed_metadata=pgpub_metadata,
file_name="my_pgpub.xml",
destination_path=DEST_PATH,
destination=DEST_PATH,
overwrite=True,
)
print(f"Downloaded pgpub XML to: {pgpub_path}")
Expand Down
8 changes: 5 additions & 3 deletions examples/petition_decisions_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@
# Download as JSON (returns response object)
print("\nDownloading decisions as JSON...")
response = client.download_decisions(
format="json", decision_date_from_q="2023-01-01", limit=5
format="json", decision_date_from_q="2023-01-01", limit=5, overwrite=True
)
if isinstance(response, PetitionDecisionDownloadResponse):
print(
Expand All @@ -178,7 +178,8 @@
format="csv",
decision_date_from_q="2023-01-01",
limit=10,
destination_path=DEST_PATH,
destination=DEST_PATH,
overwrite=True,
)
print(f"Downloaded CSV to: {csv_path}")

Expand Down Expand Up @@ -245,7 +246,8 @@

print("\nDownloading document...")
file_path = client.download_petition_document(
download_option, destination_path=DEST_PATH
download_option=download_option,
destination=DEST_PATH,
)
print(f"Downloaded to: {file_path}")

Expand Down
2 changes: 2 additions & 0 deletions src/pyUSPTO/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from pyUSPTO.clients.ptab_trials import PTABTrialsClient
from pyUSPTO.config import USPTOConfig
from pyUSPTO.exceptions import (
FormatNotAvailableError,
USPTOApiAuthError,
USPTOApiError,
USPTOApiNotFoundError,
Expand Down Expand Up @@ -62,6 +63,7 @@
"USPTOApiAuthError",
"USPTOApiRateLimitError",
"USPTOApiNotFoundError",
"FormatNotAvailableError",
"USPTOConfig",
"HTTPConfig",
# Warning classes
Expand Down
226 changes: 168 additions & 58 deletions src/pyUSPTO/clients/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,11 @@ def _create_session(self) -> requests.Session:
retry_strategy = Retry(
total=self.http_config.max_retries,
backoff_factor=self.http_config.backoff_factor,
status_forcelist=self.http_config.retry_status_codes,
status_forcelist=(
self.http_config.retry_status_codes
if self.http_config.max_retries > 0
else []
),
allowed_methods=ALLOWED_METHODS,
)

Expand Down Expand Up @@ -172,6 +176,7 @@ def __enter__(self) -> "BaseUSPTOClient[T]":
with PatentDataClient(api_key="...") as client:
response = client.search_applications(...)
"""
USPTOConfig._active_clients += 1
return self

def __exit__(
Expand All @@ -187,6 +192,9 @@ def __exit__(
exc_val: Exception value if an exception occurred
exc_tb: Exception traceback if an exception occurred
"""
USPTOConfig._active_clients -= 1
if USPTOConfig._active_clients == 0:
USPTOConfig._shared_session = None
self.close()

def _make_request(
Expand Down Expand Up @@ -499,102 +507,204 @@ def _get_extension_from_mime_type(mime_type: str | None) -> str | None:
"application/xml": ".xml",
"text/xml": ".xml",
"application/zip": ".zip",
"application/x-tar": ".tar",
"application/gzip": ".tar.gz",
"application/octet-stream": "",
}

return mime_to_ext.get(mime_type)

def _save_response_to_file(
self, response: requests.Response, file_path: str, overwrite: bool = False
self,
response: requests.Response,
destination: str | None = None,
file_name: str | None = None,
overwrite: bool = False,
) -> str:
"""Save a streaming response to a file on disk.

If file_path is a directory, attempts to extract filename from
Content-Disposition header and save in that directory.

If file_path has no extension and Content-Disposition doesn't provide
a filename, attempts to determine extension from Content-Type header.
"""Save streaming response to file.

Args:
response: Streaming response object from requests
file_path: Local path where file should be saved. Can be a file path
or a directory (in which case filename from Content-Disposition is used).
overwrite: Whether to overwrite existing files. Default False
response: Streaming HTTP response
destination: Directory to save to (default: current directory)
file_name: Override filename (default: from Content-Disposition)
overwrite: Overwrite existing file

Returns:
str: Path to the saved file
Path to saved file

Raises:
FileExistsError: If file exists and overwrite=False
ValueError: If file_path is a directory but no filename can be determined
FileExistsError: If file exists and overwrite is False
"""
path = Path(file_path)
filename: str | None = None

# If path is a directory, try to extract filename from Content-Disposition
if path.is_dir():
if file_name:
filename = file_name
else:
content_disp = response.headers.get("Content-Disposition")
filename = self._extract_filename_from_content_disposition(content_disp)
if not filename:
raise ValueError(
f"file_path is a directory ({file_path}) but Content-Disposition "
"header does not contain a filename. Please provide a full file path."
)
path = path / filename
# If path has no extension, try to determine it from Content-Type
elif not path.suffix:
# Only attempt if Content-Disposition doesn't already provide filename
content_disp = response.headers.get("Content-Disposition")
if not self._extract_filename_from_content_disposition(content_disp):
# Try to get extension from Content-Type header
content_type = response.headers.get("Content-Type")
extension = self._get_extension_from_mime_type(content_type)
if extension:
path = path.with_suffix(extension)

# Check for existing file
if path.exists() and not overwrite:
raise FileExistsError(
f"File already exists: {path}. Set overwrite=True to replace."
)
# Try to extract filename from URL
from urllib.parse import unquote, urlparse

url_path = urlparse(response.url).path
url_filename = unquote(url_path.split("/")[-1]) if url_path else None

if url_filename and "." in url_filename:
filename = url_filename
elif url_filename:
filename = url_filename
content_type = response.headers.get("Content-Type")
ext = self._get_extension_from_mime_type(content_type)
if ext:
filename += ext
else:
filename = "download"

if destination:
dest_path = Path(destination)
dest_path.mkdir(parents=True, exist_ok=True)
final_path = dest_path / filename
else:
final_path = Path.cwd() / filename

if final_path.exists() and not overwrite:
raise FileExistsError(f"File exists: {final_path}. Use overwrite=True")

# Save to disk with streaming
with open(file=str(path), mode="wb") as f:
with open(final_path, "wb") as f:
for chunk in response.iter_content(
chunk_size=self.http_config.download_chunk_size
):
if chunk: # Filter out keep-alive chunks
if chunk:
f.write(chunk)
return str(path)

def _download_file(self, url: str, file_path: str, overwrite: bool = False) -> str:
"""Download a file directly to disk.
return str(final_path)

def _extract_archive(
self,
archive_path: Path,
extract_to: Path | None = None,
remove_archive: bool = False,
) -> str:
"""Extract TAR or ZIP archive.

Args:
archive_path: Path to archive file
extract_to: Directory to extract to (default: archive_path.stem)
remove_archive: Delete archive after extraction

Returns:
Path to extracted content (single file: path to file, multiple files: directory path)

Raises:
ValueError: If file is not a valid TAR or ZIP archive
"""
import tarfile
import zipfile

if extract_to is None:
extract_to = archive_path.parent / archive_path.stem

extract_to.mkdir(parents=True, exist_ok=True)

extracted_items = []
if tarfile.is_tarfile(archive_path):
with tarfile.open(archive_path, "r:*") as tar:
tar.extractall(path=extract_to)
extracted_items = [m.name for m in tar.getmembers() if m.isfile()]
elif zipfile.is_zipfile(archive_path):
with zipfile.ZipFile(archive_path, "r") as zip_ref:
zip_ref.extractall(extract_to)
extracted_items = [n for n in zip_ref.namelist() if not n.endswith("/")]
else:
raise ValueError(f"Not a valid TAR/ZIP archive: {archive_path}")

if remove_archive:
archive_path.unlink()

if len(extracted_items) == 1:
return str(extract_to / extracted_items[0])
else:
return str(extract_to)

def _download_and_extract(
self,
url: str,
destination: str | None = None,
file_name: str | None = None,
overwrite: bool = False,
) -> str:
"""Download file and auto-extract if it's an archive.

Args:
url: URL to download from
file_path: Local path where file should be saved
overwrite: Whether to overwrite existing files. Default False
url: URL to download
destination: Directory to save/extract to
file_name: Override filename
overwrite: Overwrite existing files

Returns:
str: Path to the downloaded file
Path to extracted content (file or directory)

Raises:
HTTPError: If download request fails
FileExistsError: If file exists and overwrite=False
TypeError: If response is not a valid Response object
FileExistsError: If file exists and overwrite is False
ValueError: If downloaded file is not a valid archive when extraction attempted
"""
import tarfile
import zipfile

downloaded_path = self._download_file(
url=url, destination=destination, file_name=file_name, overwrite=overwrite
)

path_obj = Path(downloaded_path)
is_archive = (
path_obj.suffix.lower() in [".tar", ".tgz", ".gz", ".zip"]
or tarfile.is_tarfile(path_obj)
or zipfile.is_zipfile(path_obj)
)

if is_archive:
return self._extract_archive(path_obj, remove_archive=True)
else:
return downloaded_path

def _download_file(
self,
url: str,
destination: str | None = None,
file_name: str | None = None,
overwrite: bool = False,
) -> str:
"""Download file to disk (NO extraction).

Args:
url: URL to download
destination: Directory to save to
file_name: Override filename
overwrite: Overwrite existing files

Returns:
Path to downloaded file

Raises:
TypeError: If response is not a valid Response object
FileExistsError: If file exists and overwrite is False
"""
# Always stream for file downloads (internal implementation detail)
response = self._make_request(
method="GET",
endpoint="", # Not used when custom_url is provided
endpoint="",
stream=True,
custom_url=url,
)

if not isinstance(response, requests.Response):
raise TypeError(
f"Expected requests.Response for streaming download, got {type(response)}"
)
raise TypeError(f"Expected Response, got {type(response)}")

return self._save_response_to_file(
response=response, file_path=file_path, overwrite=overwrite
response=response,
destination=destination,
file_name=file_name,
overwrite=overwrite,
)

@property
Expand Down
Loading
Loading