Async Jobs
AsyncJobs can be used to poll the Nucleus backend for the status and errors of a long-running job.
You can also check on your async jobs in the dashboard: https://dashboard.scale.com/nucleus/jobs.
Retrieving an AsyncJob
Many operations in Nucleus, such as Dataset.append, can be configured to return an AsyncJob by setting the parameter asynchronous=True.
import nucleus
client = nucleus.NucleusClient("YOUR_SCALE_API_KEY")
dataset = client.get_dataset("YOUR_DATASET_ID")
# When kicking off an asynchronous job, store the return value as a variable
job = dataset.append(items=YOUR_DATASET_ITEMS, asynchronous=True)
We can now use job to poll for status and errors via the Nucleus API.
If you already invoked a job but no longer have its associated AsyncJob object, you can retrieve it by job_id (guaranteed to be unique):
- List all jobs with NucleusClient.list_jobs()
  - Set the show_completed and date_limit parameters as needed
- Filter through the list for the target job_id
A simpler method of reconstructing the AsyncJob by job_id is in active development and will be available soon!
import nucleus
client = nucleus.NucleusClient("YOUR_SCALE_API_KEY")
all_jobs = client.list_jobs(show_completed=True)
for job in all_jobs:
    if job.job_id == "YOUR_TARGET_JOB_ID":
        break
We can now use the above job to poll for status and errors.
Check Status
You can poll AsyncJobs for their status at any time (status()).
>>> job.status()
{
"job_id": "job_c19xcf9mkws46gah0000",
"status": "Completed",
"message": "Job completed successfully.",
"job_progress": "1.00",
"completed_steps": "3",
"total_steps": "3",
}
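Note that the numeric fields in this payload arrive as strings. Below is a minimal sketch of acting on a status payload programmatically, assuming the shape shown above; the helper names (is_done, progress_pct) and the exact terminal state strings are illustrative assumptions, not part of the Nucleus client.

```python
# A status payload in the shape shown above (values are illustrative).
status = {
    "job_id": "job_c19xcf9mkws46gah0000",
    "status": "Completed",
    "message": "Job completed successfully.",
    "job_progress": "1.00",
    "completed_steps": "3",
    "total_steps": "3",
}

def is_done(payload: dict) -> bool:
    # Treat anything other than a terminal state as still in flight.
    # The terminal state names used here are an assumption.
    return payload["status"] in ("Completed", "Errored")

def progress_pct(payload: dict) -> float:
    # Numeric fields are returned as strings, so convert before arithmetic.
    return float(payload["job_progress"]) * 100

print(is_done(status))       # True
print(progress_pct(status))  # 100.0
```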
Check Errors
You can list the 10,000 most recent errors of an AsyncJob at any time (errors()). This is useful for debugging larger payloads containing multiple items, annotations, etc.
>>> job.errors()
[
'{"annotation":{"label":"car","type":"box","geometry":{"x":50,"y":60,"width":70,"height":80},"referenceId":"bad_ref_id","annotationId":"attempted_annot_upload","metadata":{}},"error":"Item with id bad_ref_id doesn't exist."}'
]
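Each entry returned by errors() is a JSON-encoded string rather than a dict, so decode it before inspecting individual fields. A small illustrative sketch, assuming the error shape shown above; the grouping logic and the by_ref name are ours, not part of the client.

```python
import json

# Sample errors() output in the shape shown above (one JSON string per failure).
raw_errors = [
    '{"annotation": {"label": "car", "referenceId": "bad_ref_id"},'
    ' "error": "Item with id bad_ref_id doesn\'t exist."}'
]

# Decode each JSON string, then group messages by the offending reference ID
# so repeated failures on the same item are easy to spot.
by_ref = {}
for raw in raw_errors:
    err = json.loads(raw)
    ref = err.get("annotation", {}).get("referenceId", "<unknown>")
    by_ref.setdefault(ref, []).append(err["error"])

print(by_ref)  # {'bad_ref_id': ["Item with id bad_ref_id doesn't exist."]}
```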
Sleep Until Completion
You can use an AsyncJob to block further processing until the job completes or errors (sleep_until_complete()). This is useful to create dependencies, e.g. blocking a Dataset.annotate operation on a Dataset.append operation. Note we are actively developing more intelligent blocking logic to alleviate such issues.
import nucleus
from nucleus.job import JobError
client = nucleus.NucleusClient("YOUR_SCALE_API_KEY")
dataset = client.get_dataset("YOUR_DATASET_ID")
# ... construct some DatasetItems ...
# ... construct some Annotations ...
append_job = dataset.append(items=DATASETITEMS, asynchronous=True)
try:
    append_job.sleep_until_complete(verbose_std_out=True)
except JobError as e:
    print("Failed to upload items!")
    print(f"All job errors:\n{append_job.errors()}")
    raise e
annot_job = dataset.annotate(annotations=ANNOTATIONS, asynchronous=True)
try:
    annot_job.sleep_until_complete(verbose_std_out=True)
except JobError as e:
    print("Failed to upload annotations!")
    print(f"All job errors:\n{annot_job.errors()}")
    raise e