Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ jobs:
fail-fast: false
matrix:
py:
- "3.14"
- "3.13"
- "3.12"
- "3.11"
- "3.10"
- "3.9"
- "3.8"
os:
- ubuntu-latest
steps:
Expand Down
90 changes: 87 additions & 3 deletions opencage/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@


class OpenCageBatchGeocoder():
"""Batch geocoder that processes CSV files using the OpenCage API.

""" Called from command_line.py
init() receives the parsed command line parameters
geocode() receive an input and output CSV reader/writer and loops over the data
Reads rows from a CSV input, geocodes each address using async workers,
and writes results to a CSV output.

Args:
options: Parsed command-line options from argparse.
"""

def __init__(self, options):
Expand All @@ -33,9 +36,16 @@ def __init__(self, options):
self.write_counter = 1

def __call__(self, *args, **kwargs):
"""Run the batch geocoder synchronously via asyncio.run."""
asyncio.run(self.geocode(*args, **kwargs))

async def geocode(self, csv_input, csv_output):
"""Process a CSV input, geocode each row, and write results.

Args:
csv_input: CSV reader for input rows.
csv_output: CSV writer for output rows.
"""
if not self.options.dry_run:
test = await self.test_request()
if test['error']:
Expand Down Expand Up @@ -81,6 +91,12 @@ async def geocode(self, csv_input, csv_output):
progress_bar.close()

async def test_request(self):
"""Send a test geocoding request to verify the API key.

Returns:
Dict with 'error' (None or exception) and 'free' (bool indicating
whether a free trial account is being used).
"""
try:
async with OpenCageGeocode(
self.options.api_key,
Expand All @@ -99,6 +115,15 @@ async def test_request(self):
return {'error': exc}

async def read_input(self, csv_input, queue):
"""Read all rows from CSV input and add them to the work queue.

Args:
csv_input: CSV reader for input rows.
queue: Async queue to populate with parsed input items.

Returns:
True if any warnings were encountered while reading, False otherwise.
"""
any_warnings = False
for index, row in enumerate(csv_input):
line_number = index + 1
Expand All @@ -119,6 +144,16 @@ async def read_input(self, csv_input, queue):
return any_warnings

async def read_one_line(self, row, row_id):
"""Parse a single CSV row into a work item for geocoding.

Args:
row: List of column values from the CSV reader.
row_id: 1-based line number of the row in the input.

Returns:
Dict with keys 'row_id', 'address', 'original_columns',
and 'warnings'.
"""
warnings = False

if self.options.input_columns:
Expand Down Expand Up @@ -159,6 +194,13 @@ async def read_one_line(self, row, row_id):
return {'row_id': row_id, 'address': ','.join(address), 'original_columns': row, 'warnings': warnings}

async def worker(self, csv_output, queue, progress):
"""Consume items from the queue and geocode each one.

Args:
csv_output: CSV writer for output rows.
queue: Async queue of work items to process.
progress: tqdm progress bar, or False if disabled.
"""
while True:
item = await queue.get()

Expand All @@ -173,6 +215,14 @@ async def worker(self, csv_output, queue, progress):
queue.task_done()

async def geocode_one_address(self, csv_output, row_id, address, original_columns):
"""Geocode a single address and write the result to the output.

Args:
csv_output: CSV writer for output rows.
row_id: 1-based line number of the row in the input.
address: Address string (or lat,lng for reverse geocoding).
original_columns: Original CSV row columns to preserve in output.
"""
def on_backoff(details):
if not self.options.quiet:
sys.stderr.write("Backing off {wait:0.1f} seconds afters {tries} tries "
Expand Down Expand Up @@ -242,6 +292,18 @@ async def write_one_geocoding_result(
geocoding_result,
raw_response,
original_columns):
"""Write a single geocoding result row to the CSV output.

Appends the requested output columns to the original CSV columns.
Rows are written in order unless the --unordered option is set.

Args:
csv_output: CSV writer for output rows.
row_id: 1-based line number of the row in the input.
geocoding_result: First result dict from the API, or None.
raw_response: Full API response dict.
original_columns: Original CSV row columns to preserve in output.
"""
row = original_columns

for column in self.options.add_columns:
Expand Down Expand Up @@ -280,10 +342,32 @@ async def write_one_geocoding_result(
self.write_counter = self.write_counter + 1

def log(self, message):
    """Print ``message`` plus a trailing newline to stderr.

    Output is suppressed entirely when ``self.options.quiet`` is truthy.

    Args:
        message: Text to emit.
    """
    if self.options.quiet:
        return
    sys.stderr.write(f"{message}\n")

def deep_get_result_value(self, data, keys, default=None):
"""Retrieve a nested value from a dict using a list of keys.

Args:
data: Dict to traverse.
keys: List of keys to follow in sequence.
default: Value to return if any key is missing.

Returns:
The nested value, or default if the path doesn't exist.

Example:
>>> data = {'status': {'code': 200, 'message': 'OK'}}
>>> self.deep_get_result_value(data, ['status', 'message'])
'OK'
>>> self.deep_get_result_value(data, ['missing', 'key'], '')
''
"""
for key in keys:
if isinstance(data, dict):
data = data.get(key, default)
Expand Down
75 changes: 69 additions & 6 deletions opencage/command_line.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import argparse
import sys
import os
import io
from pathlib import Path
import re
import csv

Expand All @@ -10,9 +10,12 @@


def main(args=sys.argv[1:]):
options = parse_args(args)
"""Entry point for the OpenCage CLI.

assert sys.version_info >= (3, 8), "Script requires Python 3.8 or newer"
Args:
args: Command-line arguments (defaults to sys.argv[1:]).
"""
options = parse_args(args)

geocoder = OpenCageBatchGeocoder(options)

Expand All @@ -25,11 +28,19 @@ def main(args=sys.argv[1:]):


def parse_args(args):
"""Parse and validate command-line arguments.

Args:
args: List of command-line argument strings.

Returns:
Parsed argparse.Namespace with all options set.
"""
if len(args) == 0:
print("To display help use 'opencage -h', 'opencage forward -h' or 'opencage reverse -h'", file=sys.stderr)
sys.exit(1)

parser = argparse.ArgumentParser(description=f'Opencage CLI {__version__}')
parser = argparse.ArgumentParser(description=f'OpenCage CLI {__version__}')
parser.add_argument('--version', action='version', version=f'%(prog)s {__version__}')

subparsers = parser.add_subparsers(dest='command')
Expand Down Expand Up @@ -61,9 +72,9 @@ def parse_args(args):

options = parser.parse_args(args)

if os.path.exists(options.output) and not options.dry_run:
if Path(options.output).exists() and not options.dry_run:
if options.overwrite:
os.remove(options.output)
Path(options.output).unlink()
else:
print(
f"Error: The output file '{options.output}' already exists. You can add --overwrite to your command.",
Expand All @@ -78,6 +89,14 @@ def parse_args(args):


def add_optional_arguments(parser):
"""Add optional arguments shared by forward and reverse subcommands.

Args:
parser: argparse subparser to add arguments to.

Returns:
The parser with arguments added.
"""
parser.add_argument(
"--headers",
action="store_true",
Expand Down Expand Up @@ -129,6 +148,21 @@ def add_optional_arguments(parser):


def api_key_type(apikey):
"""Validate an OpenCage API key format.

Expects a 32-character lowercase hex string, optionally prefixed
with ``oc_gc_`` (e.g. ``oc_gc_1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d``
or ``1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d``).

Args:
apikey: API key string to validate.

Returns:
The validated API key string.

Raises:
argparse.ArgumentTypeError: If the key doesn't match the expected format.
"""
pattern = re.compile(r"^(oc_gc_)?[0-9a-f]{32}$")

if not pattern.match(apikey):
Expand All @@ -138,6 +172,16 @@ def api_key_type(apikey):


def ranged_type(value_type, min_value, max_value):
"""Create an argparse type function that enforces a value range.

Args:
value_type: Type to convert the argument to (e.g. int, float).
min_value: Minimum allowed value (inclusive).
max_value: Maximum allowed value (inclusive).

Returns:
A type-checking function suitable for argparse's type parameter.
"""
def range_checker(arg: str):
try:
f = value_type(arg)
Expand All @@ -152,6 +196,14 @@ def range_checker(arg: str):


def comma_separated_type(value_type):
"""Create an argparse type function that parses comma-separated values.

Args:
value_type: Type to convert each element to (e.g. int, str).

Returns:
A type-checking function suitable for argparse's type parameter.
"""
def comma_separated(arg: str):
if not arg:
return []
Expand All @@ -162,6 +214,17 @@ def comma_separated(arg: str):


def comma_separated_dict_type(arg):
"""Parse a comma-separated list of key=value pairs into a dict.

Args:
arg: String like "key1=val1,key2=val2".

Returns:
Dict of parsed key-value pairs, or empty dict if arg is empty.

Raises:
argparse.ArgumentTypeError: If the string is not valid key=value format.
"""
if not arg:
return {}

Expand Down
Loading