lancium.api.Job
Job
This is an object that represents a computational job.
This class interacts with Lancium Jobs. It allows you to create, update, run, suspend, terminate, and delete jobs, and to interact with the working directory of a running job.
Args:
key (string): auth key
id (int, should not be passed): Job ID
name (string): Job name
notes (string): Job description
status (string): Job status
qos (string): Job quality of service (high, medium, low)
command_line (string): command line argument for Job
image (string): image to use for Job
resources (string): Job resources (CPUs, GPUs, scratch, etc.)
max_run_time (int): Job's max run time in seconds (up to a month)
expected_run_time (int): expected run time of Job
input_files (JobInput): input files for Job
output_files (File): output files available after Job completion
callback_url (string): URL to send Job updates to
mpi (boolean): flag for whether the Job runs MPI
mpi_version (string): which flavor of MPI to use
tasks (int): total number of MPI tasks
tasks_per_node (int): number of MPI tasks per node
Helper Method -- Handles status checking for server response objects. Checks for the correct HTTP response code and throws the appropriate exception if the response code does not match or an invalid server response is received.
ARGS:
res (HttpResponse): HTTP response from the server to the request
data (json): JSON payload for the API call
args (boolean): whether we are argument checking
code (int): expected HTTP response code
key (string, optional): expected dictionary key to check
RETURNS: Nothing, unless the response has status code 422, in which case it returns a list with the server response and a boolean stating that it is a directory.
Helper Method -- Handles argument checking.
ARGS:
name (string): job name
source_type (string): job input source_type
source (string): job input source
RETURNS: Nothing
Retrieve all Jobs.
GET /jobs
ARGS:
**kwargs (dictionary): can contain an auth key to perform this method using a different account, e.g. {'auth': ANOTHER_API_KEY}
Returns: list: list of Jobs
from lancium.api.Job import Job
all = Job().all()
print(all)
### OUTPUT BELOW
[<lancium.api.Job.Job object at 0x7f5861683220>,..., <lancium.api.Job.Job object at 0x7f5861681570>]
### Recommend flattening each Job to a dictionary of job attributes
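As the note above suggests, each Job object can be flattened into a plain dictionary of its attributes. A minimal sketch (assuming, as the update() output elsewhere on this page indicates, that a Job stores its public attributes in `__dict__`):

```python
def flatten(job):
    """Copy a Job's attributes into a plain dict.

    Works for any object that keeps its attributes in __dict__.
    """
    return dict(vars(job))

if __name__ == "__main__":
    # Requires the lancium SDK and a valid API key.
    from lancium.api.Job import Job
    for summary in (flatten(j) for j in Job().all()):
        print(summary.get('id'), summary.get('status'))
```

Flattening up front also makes the job list easy to serialize (e.g. `json.dumps`) or load into a dataframe.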
Create a new Job prior to submission.
POST /jobs
Args:
name (string): job name
notes (string): job description
account (string): string for internal account billing
qos (string): quality of service
command_line (string): command line argument
image (string): base image for the container to run the job on
resources (dict): dictionary containing the fields
  core_count (int)
  gpu (string)
  vram (int)
  gpu_count (int)
  memory (int)
  scratch (int)
max_run_time (int): max run time for the job (in seconds); limit: 30 days
expected_run_time (int): expected run time of the job (in seconds)
input_files (list of JobInput): input files for the Job, each wrapped in a JobInput object
output_files (tuple): expected output file(s) from the job. Format: ('output_file1.txt{:name_to_save_as_in_storage.txt}', ...); the destination in persistent storage is optional.
callback_url (string): webhook URL to receive updates when the job status changes
environment (tuple of strings): environment variables to set for the job. Format: ('var1=def1', 'var2=def2', ...)
**kwargs (dictionary): can contain an auth key to perform this method using a different account, e.g. {'auth': ANOTHER_API_KEY}
Returns: Job: Job object
from lancium.api.Job import Job
params = {'name': 'test', 'command_line': 'ls'}
job = Job().create(**params)
print(job.name)
print(job.command_line)
### OUTPUT BELOW
'test'
'ls'
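A sketch of a create() call that also pins resources and an output file, using the fields documented above. All values here are illustrative, and the memory/scratch units are assumptions rather than values verified against the API:

```python
def build_create_params(name, command_line, image):
    """Assemble a params dict for Job.create() with a resources block.

    Field names follow the argument list above; the values are illustrative.
    """
    return {
        'name': name,
        'command_line': command_line,
        'image': image,
        'qos': 'high',
        'max_run_time': 3600,  # seconds; capped at 30 days
        'resources': {'core_count': 4, 'memory': 8, 'scratch': 10},
        # destination name after the colon is optional per the format above
        'output_files': ('results.txt:saved_results.txt',),
    }

if __name__ == "__main__":
    # Requires the lancium SDK and a valid API key.
    from lancium.api.Job import Job
    job = Job().create(**build_create_params('resource-test', 'ls', 'lancium/ubuntu'))
    print(job.id, job.resources)
```

Building the dict in a helper keeps repeated submissions consistent and makes the resource fields easy to vary per job.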
Get a Job object by ID.
GET /jobs/
Args:
id (int): job id
**kwargs (dictionary): can contain an auth key to perform this method using a different account, e.g. {'auth': ANOTHER_API_KEY}
RETURNS: Job: a Job object
from lancium.api.Job import Job
params = {'name': 'test', 'command_line': 'ls'}
job = Job().create(**params)
id = job.id
job2 = Job().get(id)
print(id)
print(job2.id)
### OUTPUT BELOW
'58317'
'58317'
Update an existing job if it is not running, errored, or finished (i.e., the job must still be in a "created" state).
PUT /jobs/
Args:
name (string): job name
notes (string): job description
account (string): string for internal account billing
qos (string): quality of service
command_line (string): command line argument
image (string): base image for the container to run the job on
resources (dict): dictionary containing the fields
  core_count (int)
  gpu (string)
  vram (int)
  gpu_count (int)
  memory (int)
  scratch (int)
max_run_time (int): max run time for the job (in seconds); limit: 30 days
expected_run_time (int): expected run time of the job (in seconds)
input_files (list of JobInput): input files for the Job, each wrapped in a JobInput object
output_files (tuple): expected output file(s) from the job. Format: ('output_file1.txt{:name_to_save_as_in_storage.txt}', ...); the destination in persistent storage is optional.
callback_url (string): webhook URL to receive updates when the job status changes
environment (tuple of strings): environment variables to set for the job. Format: ('var1=def1', 'var2=def2', ...)
**kwargs (dictionary): can contain an auth key to perform this method using a different account, e.g. {'auth': ANOTHER_API_KEY}
RETURNS: None: None
from lancium.api.Job import Job
job = Job().get(58317)
to_update = {'core_count': 48}
job.update(**to_update)
print(job.__dict__)
### OUTPUT BELOW
{'id': 58317, 'name': 'test', 'notes': None, 'account': None, 'status': 'created', 'qos': 'high', 'command_line': 'ls', 'image': None, 'resources': {'core_count': 48, 'gpu_count': None, 'memory': 96, 'gpu': None, 'scratch': None}, 'max_run_time': 259200, 'expected_run_time': None, 'input_files': [], 'output_files': [], 'callback_url': None, 'mpi': None, 'mpi_version': None, 'tasks': None, 'tasks_per_node': None, 'created_at': '2022-07-01T14:00:07.137Z', 'updated_at': '2022-07-01T14:18:30.036Z', 'submitted_at': None, 'completed_at': None}
Refresh attributes of a Job object. Overwrites all existing attributes.
GET /jobs/
ARGS: None (None)
RETURNS: None: None
from lancium.api.Job import Job
kwargs = {'name': 'this is my name', 'image': 'lancium/ubuntu', 'command_line': 'ls', 'resources': {'core_count': 6, 'memory': 12}}
job = Job.create(**kwargs)
print(job.name)
job.name = 'this is NOT my name'
print(job.name)
job.refresh()
print(job.name)
###OUTPUT BELOW
this is my name
this is NOT my name
this is my name
Submit a job for execution.
POST /jobs/
ARGS: None (None)
RETURNS: None: None
from lancium.api.Job import Job
job = Job().get(58317)
print(job.status)
to_update = {'image': 'lancium/ubuntu'}
job.update(**to_update)
job.submit()
job = Job().get(58317)
print(job.status)
### OUTPUT BELOW
'created'
'submitted'
Get an output file from a job. If download path is provided, the file is saved at the path specified.
GET /jobs/
Args:
file_path (string): file path from the job
download_path (string, optional): location (directory) to save the file
Returns: res (requests.Response): response object from server
from lancium.api.Job import Job
import os
job = Job().get(58317)
res = job.output_get(file_path='stdout.txt', download_path='.')
os.system('cat stdout.txt')
### OUTPUT BELOW
'JOBNAME'
'qsub7683928698168045722.sh'
'rusage-running.json'
'stderr.txt'
'stdout.txt'
Terminate a running job.
POST /jobs/
Args: None (None)
Returns: None: None
from lancium.api.Job import Job
params = {'name': 'test', 'command_line': 'sleep 60', 'image': 'lancium/ubuntu'}
job = Job().create(**params)
print(job.id)
job.submit()
job.terminate()
### OUTPUT BELOW
99872
Delete a job.
DELETE /jobs/
Args:
id (int): Job ID
Returns: None: None
# DELETE A JOB, THEN CONFIRM IT WAS DELETED
from lancium.api.Job import Job
from lancium.errors.common import *
kwargs = {'name': 'This is a test job', 'image': 'lancium/ubuntu', 'command_line': 'ls', 'notes': 'this is a note'}
job = Job.create(**kwargs)
job_id = job.id
print(job_id)
Job.delete(job_id)
try:
Job.get(job_id)
except ResourceNotFoundError:
print('Oh no, the resource was not found.')
###OUTPUT BELOW
94442
Oh no, the resource was not found.
Delete the current job and clear the object's attributes.
DELETE /jobs/
ARGS: None (None)
RETURNS: None: None
from lancium.api.Job import Job
kwargs = {'name': 'This is a test job', 'image': 'lancium/ubuntu', 'command_line': 'ls', 'notes': 'this is a note'}
job = Job.create(**kwargs)
print(job.name)
print(job.notes)
job.destroy()
print(job.name)
print(job.notes)
###OUTPUT BELOW
This is a test job
this is a note
None
None
Add input data for a job.
POST /jobs/
Args:
name (string): filename in the job working directory
source_type (string): one of ('file', 'data', 'url')
source (string): source location of the input data
cache (bool, optional): if True, the file is copied to the node during execution as READ ONLY
jwd_path (string, optional): path to the job working directory area
force (boolean, optional): force add_data if an object exists at the specified jwd_path
Returns: JobInput: Job input object
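A minimal add_data() sketch that pairs a local file with source_type='file' and a URL with source_type='url'. The `input_spec` helper is hypothetical, and the upload() callback signature follows the longer examples further down this page:

```python
import os

def input_spec(source):
    """Build add_data() keyword arguments for a local file or a URL.

    Hypothetical helper; the source_type values follow the list above.
    """
    if source.startswith(('http://', 'https://')):
        # For URLs, use the last path segment as the filename in the jwd.
        return {'name': source.rsplit('/', 1)[-1], 'source_type': 'url', 'source': source}
    return {'name': os.path.basename(source),
            'source_type': 'file',
            'source': os.path.abspath(source)}

if __name__ == "__main__":
    # Requires the lancium SDK, a valid API key, and an existing job.
    from lancium.api.Job import Job
    job = Job().get(58317)  # job id from the earlier examples
    job_input = job.add_data(**input_spec('bashscript.sh'), cache=False)
    # No-op progress callback; signature per the examples below.
    job_input.upload(os.path.abspath('bashscript.sh'),
                     lambda current_chunk, total_chunks: None)
```

For 'file' sources, remember that add_data() only registers the input; the bytes are sent by the subsequent upload() call.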
List the files and directories at the specified folder_path.
HEAD /jobs/
ARGS:
folder_path (string): path to a file/folder in the working directory of a running job
RETURNS: Response object: Response Object from the Server
import os
from lancium.api.Job import Job
from lancium.errors.common import *
from time import sleep
import click
kwargs = {'name': 'This is a test job', 'image': 'lancium/ubuntu', 'command_line': 'bash bashscript.sh', 'input_files': str(os.path.abspath('bashscript.sh'))}
job = Job.create(**kwargs)
file_ = os.path.abspath('bashscript.sh')
filename = os.path.basename(file_)
job_input = job.add_data(
name=filename,
source_type="file",
source=file_,
cache=False)
progress = click.progressbar(
length=100,
show_percent=True,
label="Uploading input data...",
fill_char=u'█',
empty_char=' ',
bar_template="%(label)s |%(bar)s| %(info)s"
)
with progress:
def progress_callback(
current_chunk,
total_chunks):
progress.pos = int(
100.0 * current_chunk / total_chunks)
progress.update(0)
upload_result = job_input.upload(file_, progress_callback)
job_input.chunks_received = upload_result["chunks_received"]
job_input.upload_complete = upload_result["upload_complete"]
job.submit()
while job.status != 'running':
job.refresh()
sleep(3)
sleep(30)
res = job.get_jwd()
print(res.__dict__)
###OUTPUT BELOW
Uploading input data... | | 0%
{'_content': b'{"contents":[{"name":"bashscript.sh","is_directory":false,"size":"74","last_modified":"2022-10-05T21:05:46.305+00:00","created":"2022-10-05T21:03:44.000+00:00"},{"name":"stdout.txt","is_directory":false,"size":"362","last_modified":"2022-10-05T21:05:46.352+00:00","created":"2022-10-05T21:05:45.000+00:00"},{"name":"stderr.txt","is_directory":false,"size":"527","last_modified":"2022-10-05T21:05:46.396+00:00","created":"2022-10-05T21:03:48.000+00:00"},{"name":"JOBNAME","is_directory":false,"size":"21","last_modified":"2022-10-05T21:05:46.443+00:00","created":"2022-10-05T21:03:45.000+00:00"},{"name":"rusage-running.json","is_directory":false,"size":"254","last_modified":"2022-10-05T21:05:46.486+00:00","created":"2022-10-05T21:04:46.000+00:00"},{"name":".singularityEnv","is_directory":false,"size":"169","last_modified":"2022-10-05T21:05:46.529+00:00","created":"2022-10-05T21:03:46.000+00:00"},{"name":".bes-info","is_directory":false,"size":"54","last_modified":"2022-10-05T21:05:46.573+00:00","created":"2022-10-05T21:03:43.000+00:00"},{"name":"hosts.txt","is_directory":false,"size":"12","last_modified":"2022-10-05T21:05:46.615+00:00","created":"2022-10-05T21:03:46.000+00:00"},{"name":".genesisII-bes-state","is_directory":true,"size":null,"last_modified":null,"created":null},{"name":"qsub658428181427086480.sh","is_directory":false,"size":"1505","last_modified":"2022-10-05T21:05:46.659+00:00","created":"2022-10-05T21:03:46.000+00:00"}]}', '_content_consumed': True, '_next': None, 'status_code': 200, 'headers': {'Server': 'nginx/1.17.0', 'Date': 'Wed, 05 Oct 2022 21:05:46 GMT', 'Content-Type': 'application/json; charset=utf-8', 'Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', 'X-Frame-Options': 'SAMEORIGIN', 'X-XSS-Protection': '1; mode=block', 'X-Content-Type-Options': 'nosniff', 'X-Download-Options': 'noopen', 'X-Permitted-Cross-Domain-Policies': 'none', 'Referrer-Policy': 'strict-origin-when-cross-origin', 'X-Current-API-Version': '1.5.0', 
'X-Deprecated-API-Used': 'false', 'X-Object-Type': 'directory', 'Vary': 'Accept,Accept-Encoding', 'ETag': 'W/"598de4181d6688a1e257b3fdc4ec537f"', 'Cache-Control': 'max-age=0, private, must-revalidate', 'X-Request-Id': '69e6ea5f-3d04-47ab-9570-4e1a35be2ad4', 'X-Runtime': '0.735164', 'Content-Encoding': 'gzip'}, 'raw': <urllib3.response.HTTPResponse object at 0x7fed2659a4a0>, 'url': 'https://portal.lancium.com/api/v1/jobs/94521/working_directory/', 'encoding': 'utf-8', 'history': [], 'reason': 'OK', 'cookies': <RequestsCookieJar[]>, 'elapsed': datetime.timedelta(microseconds=786806), 'request': <PreparedRequest [GET]>, 'connection': <requests.adapters.HTTPAdapter object at 0x7fed2697da50>}
### BASHSCRIPT.SH
#!/bin/bash
i=0
while [[ $i -lt 600 ]]
do
echo "$i"
sleep 1
((i++))
done
Download a file from the job working directory.
GET /jobs/
ARGS:
file_path (string): path to file in the job working directory
download_path (string): path in the local file system to download to
byte_range_start (int, nullable): start of the byte range the user would like to view or download
byte_range_end (int, nullable): end of the byte range the user would like to view or download
callback (function): function to handle the byte chunks of the file; signature: function(status (str), progress (percentage), chunk_of_bytes)
RETURNS: String: range of bytes that have been downloaded. If the status code is 200, returns the "X-File-Size" header.
import os
from lancium.api.Job import Job
from time import sleep
import click
kwargs = {'name': 'This is a test job', 'image': 'lancium/ubuntu', 'command_line': 'bash bashscript.sh', 'input_files': str(os.path.abspath('bashscript.sh'))}
job = Job.create(**kwargs)
file_ = os.path.abspath('bashscript.sh')
filename = os.path.basename(file_)
job_input = job.add_data(
name=filename,
source_type="file",
source=file_,
cache=False)
progress = click.progressbar(
length=100,
show_percent=True,
label="Uploading input data...",
fill_char=u'█',
empty_char=' ',
bar_template="%(label)s |%(bar)s| %(info)s"
)
def progress_callback(
current_chunk,
total_chunks):
progress.pos = int(
50.0 * current_chunk / total_chunks)
progress.update(50.0 * current_chunk / total_chunks)
upload_result = job_input.upload(file_, progress_callback)
job_input.chunks_received = upload_result["chunks_received"]
job_input.upload_complete = upload_result["upload_complete"]
job.submit()
while job.status != 'running':
job.refresh()
sleep(3)
sleep(30)
res = job.download_from_jwd(file_path='stdout.txt', download_path=str(os.getcwd()))
print(os.system('ls stdout.txt'))
###OUTPUT BELOW
Uploading input data... |████████████████████████████████████| 100%
stdout.txt
0
### Bashscript.sh below
#!/bin/bash
i=0
while [[ $i -lt 600 ]]
do
echo "$i"
sleep 1
((i++))
done
Add input data to the job working directory (jwd).
POST /jobs/
Args:
source_type (string): source type of the data to be uploaded ('input_file', 'input_data', 'data_recursive', 'input_url')
source (string): a file path in the local file system ('input_file'), a path in the persistent storage area ('input_data', 'data_recursive'), or a URL ('input_url')
jwd_path (string): path in the job working directory to upload to
force (bool): if there is already a file at the jwd_path, upload anyway when set to True
callback (func, optional): called after each chunk is successfully uploaded; accepts arguments in the format (file_size, file_start, total_chunks, current_chunk)
Returns: Response Object: Response Object from the Server
import os
from lancium.api.Job import Job
from lancium.errors.common import *
from time import sleep
import click
kwargs = {'name': 'This is a test job', 'image': 'lancium/ubuntu', 'command_line': 'bash bashscript.sh', 'input_files': str(os.path.abspath('bashscript.sh'))}
job = Job.create(**kwargs)
file_ = os.path.abspath('bashscript.sh')
filename = os.path.basename(file_)
job_input = job.add_data(
name=filename,
source_type="file",
source=file_,
cache=False)
progress = click.progressbar(
length=100,
show_percent=True,
label="Uploading input data...",
fill_char=u'█',
empty_char=' ',
bar_template="%(label)s |%(bar)s| %(info)s"
)
with progress:
def progress_callback(
current_chunk,
total_chunks):
progress.pos = int(
100.0 * current_chunk / total_chunks)
progress.update(0)
upload_result = job_input.upload(file_, progress_callback)
job_input.chunks_received = upload_result["chunks_received"]
job_input.upload_complete = upload_result["upload_complete"]
job.submit()
while job.status != 'running':
job.refresh()
sleep(3)
sleep(30)
def callback(current_chunk, total_chunks):
pass
res = job.upload_to_jwd(source_type='file', source=str(os.path.abspath('asd.py')), jwd_path='asd.py', force=True, callback=callback)
print(os.system('ls asd.py'))
###OUTPUT BELOW
Uploading input data... | | 0%
asd.py
0