Async Jobs

AsyncJobs can be used to poll the Nucleus backend for the status and errors of a long-running job.

You can also check on your async jobs in the dashboard: https://dashboard.scale.com/nucleus/jobs.

Retrieving an AsyncJob

Many operations in Nucleus, such as Dataset.append, can be configured to return an AsyncJob by setting the parameter asynchronous=True.

import nucleus

client = nucleus.NucleusClient("YOUR_SCALE_API_KEY")
dataset = client.get_dataset("YOUR_DATASET_ID")

# When kicking off an asynchronous job, store the return value as a variable
job = dataset.append(items=YOUR_DATASET_ITEMS, asynchronous=True)

We can now use job to poll for status and errors via the Nucleus API.
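For example, continuing from the snippet above, the returned handle exposes the polling helpers covered in the sections below (a minimal sketch):

# `job` is the AsyncJob returned by the asynchronous append above
print(job.status())  # current status payload
print(job.errors())  # most recent errors, if any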

If you already invoked a job but no longer have its associated AsyncJob object, you can retrieve it by job_id (guaranteed to be unique):

  1. List all jobs with NucleusClient.list_jobs()
    • Set the show_completed and date_limit parameters as needed
  2. Filter through the list for the target job_id

A simpler method of reconstructing an AsyncJob by job_id is in active development and will be available soon!

import nucleus

client = nucleus.NucleusClient("YOUR_SCALE_API_KEY")

all_jobs = client.list_jobs(show_completed=True)

# Find the target job; `job` is None if no job matches the ID
job = next(
  (j for j in all_jobs if j.job_id == "YOUR_TARGET_JOB_ID"), None
)

We can now use the above job to poll for status and errors.

Check Status

You can poll AsyncJobs for their status at any time (status()).

>>> job.status()
{
    "job_id": "job_c19xcf9mkws46gah0000",
    "status": "Completed",
    "message": "Job completed successfully.",
    "job_progress": "0.33",
    "completed_steps": "1",
    "total_steps:": "3",
}
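
If you need custom polling behavior rather than blocking (see Sleep Until Completion below), you can build a loop on top of status(). A minimal sketch, assuming the response fields shown above; the exact terminal status strings used here are an assumption:

import time

# `job` is an AsyncJob obtained as shown above
while True:
  current = job.status()
  print(f"{current['completed_steps']}/{current['total_steps']} steps complete")
  # NOTE: assumed terminal statuses, based on the example payload above
  if current["status"] in ("Completed", "Errored"):
    break
  time.sleep(30)  # poll every 30 seconds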

Check Errors

You can list up to the 10,000 most recent errors of an AsyncJob at any time (errors()). This is useful for debugging large payloads containing many items, annotations, etc.

>>> job.errors()
[
    '{"annotation":{"label":"car","type":"box","geometry":{"x":50,"y":60,"width":70,"height":80},"referenceId":"bad_ref_id","annotationId":"attempted_annot_upload","metadata":{}},"error":"Item with id bad_ref_id doesn't exist."}'
]
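
Because each entry is a JSON-encoded string (as in the example above), you can parse the list to aggregate failures. A minimal sketch, assuming each entry carries an "error" message field as shown:

import json

# `job` is an AsyncJob obtained as shown above
for raw_error in job.errors():
  parsed = json.loads(raw_error)
  # Each entry pairs the rejected payload with a human-readable message
  print(parsed.get("error"))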

Sleep Until Completion

You can use an AsyncJob to block further processing until the job completes or errors (sleep_until_complete()). This is useful for creating dependencies, e.g. blocking a Dataset.annotate operation on a Dataset.append operation. Note we are actively developing more intelligent blocking logic to make this manual sequencing unnecessary.

import nucleus
from nucleus.job import JobError

client = nucleus.NucleusClient("YOUR_SCALE_API_KEY")
dataset = client.get_dataset("YOUR_DATASET_ID")

# ... construct some DatasetItems ...

# ... construct some Annotations ...

append_job = dataset.append(items=DATASETITEMS, asynchronous=True)
try:
  append_job.sleep_until_complete(verbose_std_out=True)
except JobError as e:
  print("Failed to upload items!")
  print(f"All job errors:\n{append_job.errors()}")
  raise e

annot_job = dataset.annotate(annotations=ANNOTATIONS, asynchronous=True)
try:
  annot_job.sleep_until_complete(verbose_std_out=True)
except JobError as e:
  print("Failed to upload annotations!")
  print(f"All job errors:\n{annot_job.errors()}")
  raise e