|
| 1 | +import csv |
| 2 | +import json |
| 3 | +import sys |
| 4 | +from typing import Tuple |
| 5 | + |
| 6 | +import typer |
| 7 | +from rich import print |
| 8 | + |
| 9 | +from taskbadger import StatusEnum, create_task, get_task, update_task |
| 10 | +from taskbadger.cli.utils import OutputFormat, configure_api, err_console, get_actions, get_metadata |
| 11 | + |
| 12 | + |
| 13 | +def get( |
| 14 | + ctx: typer.Context, |
| 15 | + task_id: str = typer.Argument(..., show_default=False, help="The ID of the task."), |
| 16 | + output_format: OutputFormat = typer.Option(OutputFormat.pretty, "--format", "-f", help="Output format"), |
| 17 | +): |
| 18 | + """Get a task.""" |
| 19 | + configure_api(ctx) |
| 20 | + task = get_task(task_id) |
| 21 | + if output_format == OutputFormat.pretty: |
| 22 | + print(f"Task ID: {task.id}") |
| 23 | + print(f"Created: {task.created.isoformat()}") |
| 24 | + print(f"Name: {task.name}") |
| 25 | + print(f"Status: {task.status}") |
| 26 | + print(f"Percent: {task.value_percent}%") |
| 27 | + elif output_format == OutputFormat.json: |
| 28 | + print(json.dumps(task.to_dict(), indent=2)) |
| 29 | + elif output_format == OutputFormat.csv: |
| 30 | + writer = csv.writer(sys.stdout) |
| 31 | + writer.writerow("Task ID,Created,Name,Status,Percent".split(",")) |
| 32 | + writer.writerow([task.id, task.created.isoformat(), task.name, task.status, str(task.value_percent)]) |
| 33 | + |
| 34 | + |
| 35 | +def create( |
| 36 | + ctx: typer.Context, |
| 37 | + name: str = typer.Argument(..., show_default=False, help="The task name."), |
| 38 | + monitor_id: str = typer.Option(None, help="Associate this task with a monitor."), |
| 39 | + action_def: Tuple[str, str, str] = typer.Option( |
| 40 | + (None, None, None), |
| 41 | + "--action", |
| 42 | + "-a", |
| 43 | + metavar="<trigger integration config>", |
| 44 | + show_default=False, |
| 45 | + help="Action definition e.g. 'success,error email to:me@email.com'", |
| 46 | + ), |
| 47 | + status: StatusEnum = typer.Option(StatusEnum.PROCESSING, help="The initial status of the task."), |
| 48 | + value_max: int = typer.Option(100, help="The maximum value for the task."), |
| 49 | + metadata: list[str] = typer.Option( |
| 50 | + None, |
| 51 | + show_default=False, |
| 52 | + help="Metadata 'key=value' pair to associate with the task. Can be specified multiple times.", |
| 53 | + ), |
| 54 | + metadata_json: str = typer.Option( |
| 55 | + None, show_default=False, help="Metadata to associate with the task. Must be valid JSON." |
| 56 | + ), |
| 57 | + quiet: bool = typer.Option(False, "--quiet", "-q", help="Minimal output. Only the Task ID."), |
| 58 | +): |
| 59 | + """Create a task.""" |
| 60 | + configure_api(ctx) |
| 61 | + actions = get_actions(action_def) |
| 62 | + metadata = get_metadata(metadata, metadata_json) |
| 63 | + |
| 64 | + try: |
| 65 | + task = create_task( |
| 66 | + name, |
| 67 | + status=status, |
| 68 | + value_max=value_max, |
| 69 | + data=metadata, |
| 70 | + actions=actions, |
| 71 | + monitor_id=monitor_id, |
| 72 | + ) |
| 73 | + except Exception as e: |
| 74 | + err_console.print(f"Error creating task: {e}") |
| 75 | + else: |
| 76 | + if quiet: |
| 77 | + print(task.id) |
| 78 | + else: |
| 79 | + print(f"Task created: {task.public_url}") |
| 80 | + |
| 81 | + |
| 82 | +def update( |
| 83 | + ctx: typer.Context, |
| 84 | + task_id: str = typer.Argument(..., show_default=False, help="The ID of the task to update."), |
| 85 | + name: str = typer.Option(None, show_default=False, help="Update the name of the task."), |
| 86 | + action_def: Tuple[str, str, str] = typer.Option( |
| 87 | + (None, None, None), |
| 88 | + "--action", |
| 89 | + "-a", |
| 90 | + metavar="<trigger integration config>", |
| 91 | + show_default=False, |
| 92 | + help="Action definition e.g. 'success,error email to:me@email.com'", |
| 93 | + ), |
| 94 | + status: StatusEnum = typer.Option(StatusEnum.PROCESSING, help="The status of the task."), |
| 95 | + value: int = typer.Option(None, show_default=False, help="The current task value (progress)."), |
| 96 | + value_max: int = typer.Option(None, show_default=False, help="The maximum value for the task."), |
| 97 | + metadata: list[str] = typer.Option( |
| 98 | + None, |
| 99 | + show_default=False, |
| 100 | + help="Metadata 'key=value' pair to associate with the task. Can be specified multiple times.", |
| 101 | + ), |
| 102 | + metadata_json: str = typer.Option( |
| 103 | + None, show_default=False, help="Metadata to associate with the task. Must be valid JSON." |
| 104 | + ), |
| 105 | + quiet: bool = typer.Option(False, "--quiet", "-q", help="No output."), |
| 106 | +): |
| 107 | + """Update a task.""" |
| 108 | + configure_api(ctx) |
| 109 | + actions = get_actions(action_def) |
| 110 | + metadata = get_metadata(metadata, metadata_json) |
| 111 | + |
| 112 | + try: |
| 113 | + task = update_task( |
| 114 | + task_id, |
| 115 | + name=name, |
| 116 | + status=status, |
| 117 | + value=value, |
| 118 | + value_max=value_max, |
| 119 | + data=metadata, |
| 120 | + actions=actions, |
| 121 | + ) |
| 122 | + except Exception as e: |
| 123 | + err_console.print(f"Error creating task: {e}") |
| 124 | + else: |
| 125 | + if not quiet: |
| 126 | + print(f"Task updated: {task.public_url}") |
0 commit comments