Async Jobs
AsyncJobs can be used to poll the Nucleus backend for the status and errors of a long-running job.
You can also check on your async jobs in the dashboard: https://dashboard.scale.com/nucleus/jobs.
Retrieving an AsyncJob
Many operations in Nucleus, such as Dataset.append, can be configured to return an AsyncJob by setting the parameter asynchronous=True.
import nucleus
client = nucleus.NucleusClient("YOUR_SCALE_API_KEY")
dataset = client.get_dataset("YOUR_DATASET_ID")
# When kicking off an asynchronous job, store the return value as a variable
job = dataset.append(items=YOUR_DATASET_ITEMS, asynchronous=True)
We can now use job to poll for status and errors via the Nucleus API.
If you already invoked a job but no longer have its associated AsyncJob object, you can retrieve it by job_id (guaranteed to be unique):
- List all jobs with NucleusClient.list_jobs()
- Set the show_completed and date_limit parameters as needed
- Filter through the list for the target job_id
A simpler method of reconstructing the AsyncJob by job_id is in active development and will be available soon!
import nucleus
client = nucleus.NucleusClient("YOUR_SCALE_API_KEY")
all_jobs = client.list_jobs(show_completed=True)
for job in all_jobs:
    if job.job_id == "YOUR_TARGET_JOB_ID":
        break
else:
    # for/else: no break means no job matched the target ID
    raise ValueError("Job YOUR_TARGET_JOB_ID not found")
We can now use the above job to poll for status and errors.
Check Status
You can poll AsyncJobs for their status at any time (status()).
>>> job.status()
{
    "job_id": "job_c19xcf9mkws46gah0000",
    "status": "Completed",
    "message": "Job completed successfully.",
    "job_progress": "1.00",
    "completed_steps": "3",
    "total_steps": "3",
}
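If you prefer to wait on a job's status manually, the call above can be wrapped in a simple polling loop. A minimal sketch (the poll_job_status helper and its parameters are our own, not part of the Nucleus SDK; it assumes status() returns a dict with a "status" key as shown above):

```python
import time

# Hypothetical helper, not part of the Nucleus SDK: repeatedly call a
# status-fetching function until the job reaches a terminal state.
def poll_job_status(get_status, interval_s=5.0, max_polls=720):
    for _ in range(max_polls):
        status = get_status()  # e.g. pass job.status as the callable
        if status["status"] in ("Completed", "Errored"):
            return status
        time.sleep(interval_s)
    raise TimeoutError("Job did not finish within the polling budget")
```

For a real job you would call it as poll_job_status(job.status), adjusting interval_s to taste.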
Check Errors
You can list up to the 10,000 most recent errors of an AsyncJob at any time (errors()). This is useful for debugging larger payloads containing multiple items, annotations, etc.
>>> job.errors()
[
'{"annotation":{"label":"car","type":"box","geometry":{"x":50,"y":60,"width":70,"height":80},"referenceId":"bad_ref_id","annotationId":"attempted_annot_upload","metadata":{}},"error":"Item with id bad_ref_id doesn't exist."}'
]
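Each entry in the returned list is a JSON-encoded string, so it can be parsed with the standard library to inspect individual failures. A minimal sketch (the summarize_errors helper is our own, and the field names match the example output above but may vary by job type):

```python
import json

# Hypothetical helper: group parsed job errors by the reference ID
# of the offending annotation, so repeated failures are easy to spot.
def summarize_errors(raw_errors):
    by_ref = {}
    for raw in raw_errors:
        parsed = json.loads(raw)
        ref_id = parsed.get("annotation", {}).get("referenceId", "<unknown>")
        by_ref.setdefault(ref_id, []).append(parsed["error"])
    return by_ref
```

With a live job you would call summarize_errors(job.errors()) and inspect the resulting dict.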
Sleep Until Completion
You can use an AsyncJob to block further processing until the job completes or errors (sleep_until_complete()). This is useful for creating dependencies, e.g. blocking a Dataset.annotate operation on a prior Dataset.append. Note we are actively developing more intelligent blocking logic to alleviate such issues.
import nucleus
from nucleus.job import JobError
client = nucleus.NucleusClient("YOUR_SCALE_API_KEY")
dataset = client.get_dataset("YOUR_DATASET_ID")
# ... construct some DatasetItems ...
# ... construct some Annotations ...
append_job = dataset.append(items=DATASETITEMS, asynchronous=True)
try:
append_job.sleep_until_complete(verbose_std_out=True)
except JobError as e:
print("Failed to upload items!")
print(f"All job errors:\n{append_job.errors()}")
raise e
annot_job = dataset.annotate(annotations=ANNOTATIONS, asynchronous=True)
try:
annot_job.sleep_until_complete(verbose_std_out=True)
except JobError as e:
print("Failed to upload annotations!")
print(f"All job errors:\n{annot_job.errors()}")
raise e