Using pypet with SAGA Python
This example shows how to use pypet in combination with SAGA Python. It shows how to establish an ssh connection to a given server (start_saga.py) and then
- Upload all necessary scripts
- Start several batches of trajectories
- Merge all trajectories into a single one
Only a few modifications are necessary to switch from just using ssh to
actually submitting jobs on a cluster (like a Sun Grid Engine with `qsub`);
see the SAGA Python documentation.
To run the example, you only need to add your server address, user name, password, and
working directory (on the server) to the start_saga.py file and then
execute `python start_saga.py`. The files the_task.py and merge_trajs.py
are used on the server side and you don’t need to touch them at all, but they need to
be in the same folder as your start_saga.py file.
Download: start_saga.py
Download: the_task.py
Download: merge_trajs.py
Start Up Script
"""Example how to use pypet with SAGA python
The example is based on `ssh` but using a cluster is almost analogous.
For examples of how to submit jobs to a cluster with SAGA Python, check
the documentation: http://saga-python.readthedocs.org/en/latest/
We run `the_task` in batches and create several trajectories,
later on we simply merge the batches into a single trajectory.
"""
import sys
import saga
from saga.filesystem import OVERWRITE
import os
import traceback
# Connection settings for the remote server. Every value can be overridden
# through an environment variable so that real credentials do not have to be
# written into (and possibly committed with) this file. The literals below are
# only placeholders used when the variable is not set.
ADDRESS = os.environ.get('SAGA_ADDRESS', '12345.fake.street')  # Address of your server
USER = os.environ.get('SAGA_USER', 'user')  # Username
# That's amazing I got the same combination on my luggage!
# NOTE: prefer exporting SAGA_PASSWORD over editing a real password in here.
PASSWORD = os.environ.get('SAGA_PASSWORD', '12345')
WORKING_DIR = os.environ.get('SAGA_WORKING_DIR', '/myhome/')  # Your working directory
def upload_file(filename, session):
    """Copy ``filename`` from the local working directory to the server.

    The file is taken from ``os.getcwd()`` and copied into ``WORKING_DIR``
    on ``ADDRESS`` via an ``sftp://`` URL.

    :param filename: Name of a file located in the current local directory.
    :param session: SAGA session carrying the login credentials.
    """
    print('Uploading file %s' % filename)
    local_path = os.path.join(os.getcwd(), filename)
    remote_dir = 'sftp://' + ADDRESS + WORKING_DIR
    handle = saga.filesystem.File(local_path, session=session, flags=OVERWRITE)
    handle.copy(remote_dir)
    print('Transfer of `%s` to `%s` successful' % (filename, remote_dir))
def download_file(filename, session):
    """Copy ``filename`` from the server's working directory to the local one.

    :param filename: Name of a file inside ``WORKING_DIR`` on ``ADDRESS``.
    :param session: SAGA session carrying the login credentials.
    """
    print('Downloading file %s' % filename)
    # Build the remote URL with an explicit '/': ``os.path.join`` uses the
    # *local* path separator and could insert a backslash into the URL on
    # Windows when WORKING_DIR has no trailing slash.
    remote_dir = 'sftp://' + ADDRESS + WORKING_DIR
    if not remote_dir.endswith('/'):
        remote_dir += '/'
    infilesource = remote_dir + filename
    infiletarget = os.path.join(os.getcwd(), filename)
    incoming = saga.filesystem.File(infilesource, session=session, flags=OVERWRITE)
    incoming.copy(infiletarget)
    print('Transfer of `%s` to `%s` successful' % (filename, infiletarget))
def create_session():
    """Build a SAGA session that authenticates with ``USER`` / ``PASSWORD``.

    :return: A new ``saga.Session`` holding a single "UserPass" context.
    """
    context = saga.Context("UserPass")
    context.user_id, context.user_pass = USER, PASSWORD
    new_session = saga.Session()
    new_session.add_context(context)
    return new_session
def merge_trajectories(session):
    """Run ``merge_trajs.py`` on the server and block until it finishes.

    The job is started over ``ssh://ADDRESS`` in ``WORKING_DIR``; its stdout
    and stderr are redirected to ``mysagajob_merge.stdout`` and
    ``mysagajob_merge.stderr`` in that directory.

    :param session: SAGA session carrying the login credentials.
    """
    jd = saga.job.Description()
    jd.executable = 'python'
    jd.arguments = ['merge_trajs.py']
    jd.output = "mysagajob_merge.stdout"
    jd.error = "mysagajob_merge.stderr"
    jd.working_directory = WORKING_DIR
    js = saga.job.Service('ssh://' + ADDRESS, session=session)
    myjob = js.create_job(jd)
    # Single-argument ``print(...)`` calls print the same text on Python 2 and
    # Python 3, whereas the ``print`` statement is a syntax error on Python 3.
    print("\n...starting job...\n")
    # Now we can start our job.
    myjob.run()
    print("Job ID : %s" % (myjob.id))
    print("Job State : %s" % (myjob.state))
    print("\n...waiting for job...\n")
    # wait for the job to either finish or fail
    myjob.wait()
    print("Job State : %s" % (myjob.state))
    print("Exitcode : %s" % (myjob.exit_code))
def start_jobs(session, n_batches=3):
    """Launch ``the_task.py`` once per batch on the server and wait for all runs.

    Every batch is submitted before any job is waited on, so the batches can
    run side by side; afterwards the function blocks until each job has
    finished or failed. Batch ``i`` writes its stdout/stderr to
    ``mysagajob.stdout<i>`` / ``mysagajob.stderr<i>`` in ``WORKING_DIR``.

    :param session: SAGA session carrying the login credentials.
    :param n_batches: Number of batches to start (batch ids ``0 .. n_batches-1``).
    """
    js = saga.job.Service('ssh://' + ADDRESS, session=session)
    jobs = []
    for batch in range(n_batches):
        print('Starting batch %d' % batch)
        jd = saga.job.Description()
        jd.executable = 'python'
        # Script and option must be separate argv entries; a single
        # 'the_task.py --batch=N' string presumably only worked because the
        # ssh adaptor re-splits the command line through a shell.
        jd.arguments = ['the_task.py', '--batch=' + str(batch)]
        jd.output = "mysagajob.stdout" + str(batch)
        jd.error = "mysagajob.stderr" + str(batch)
        jd.working_directory = WORKING_DIR
        myjob = js.create_job(jd)
        print("Job ID : %s" % (myjob.id))
        print("Job State : %s" % (myjob.state))
        print("\n...starting job...\n")
        myjob.run()
        jobs.append(myjob)
    for myjob in jobs:
        print("Job ID : %s" % (myjob.id))
        print("Job State : %s" % (myjob.state))
        print("\n...waiting for job...\n")
        # wait for the job to either finish or fail
        myjob.wait()
        print("Job State : %s" % (myjob.state))
        print("Exitcode : %s" % (myjob.exit_code))
def main():
    """Upload the scripts, run all batches on the server, then merge them.

    :return: 0 on success, -1 if any SAGA operation raised a
        ``saga.SagaException`` (the traceback is printed in that case).
    """
    try:
        session = create_session()
        upload_file('the_task.py', session)
        upload_file('merge_trajs.py', session)
        # download_file('saga_0.hdf5', session) # currently buggy, wait for SAGA python update
        # To see the resulting file manually download it from the server!
        start_jobs(session)
        merge_trajectories(session)
    except saga.SagaException as ex:
        # Catch all saga exceptions
        print("An exception occurred: (%s) %s " % (ex.type, (str(ex))))
        # Trace back the exception. That can be helpful for debugging.
        traceback.print_exc()
        return -1
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
The Experiment File
# Module author metadata.
__author__ = 'Robert Meyer'
import numpy as np
import inspect
import getopt
import sys
from pypet import Environment, Parameter, ArrayParameter, Trajectory
def euler_scheme(traj, diff_func):
    """Simulation function for Euler integration.

    Starting from ``traj.initial_conditions`` the state is advanced
    ``traj.steps - 1`` times with step size ``traj.dt``. The full
    ``(steps, dimension)`` time series is stored in ``traj`` as the result
    ``euler_evolution``.

    :param traj:
        Container for parameters and results
    :param diff_func:
        The differential equation we want to integrate. It is called as
        ``diff_func(previous_state, **func_params)`` where ``func_params`` are
        the entries of the ``func_params`` group of ``traj``.
    """
    steps = traj.steps
    initial_conditions = traj.initial_conditions
    dimension = len(initial_conditions)
    # The step size is loop-invariant: read it once instead of repeating the
    # `traj` attribute lookup in each of the `steps - 1` iterations.
    dt = traj.dt

    # This array will collect the results
    result_array = np.zeros((steps, dimension))
    # Get the function parameters stored into `traj` as a dictionary
    # with the (short) names as keys :
    func_params_dict = traj.func_params.f_to_dict(short_names=True, fast_access=True)
    # Take initial conditions as first result
    result_array[0] = initial_conditions

    # Now we compute the Euler Scheme steps-1 times
    for idx in range(1, steps):
        # Note the **func_params_dict unzips the dictionary, it's the reverse of
        # **kwargs in function definitions!
        result_array[idx] = diff_func(result_array[idx - 1], **func_params_dict) * dt + \
            result_array[idx - 1]

    # Finally we want to keep the results
    traj.f_add_result('euler_evolution', data=result_array, comment='Our time series data!')
class FunctionParameter(Parameter):
    """A pypet `Parameter` that keeps the source code of callables.

    `f_set` is overridden so that a callable argument is replaced by the
    text returned from ``inspect.getsource`` before being handed on to
    ``Parameter.f_set``; non-callable data is forwarded unchanged.
    """

    def f_set(self, data):
        """Forward ``data`` to ``Parameter.f_set``, swapping callables for their source."""
        value = inspect.getsource(data) if callable(data) else data
        return super(FunctionParameter, self).f_set(value)
def add_parameters(traj):
    """Adds all necessary parameters to the `traj` container.

    Adds ``steps``, ``dt``, the array parameter ``initial_conditions`` and the
    Lorenz constants ``sigma``, ``beta`` and ``rho`` in the group
    ``func_params``, then annotates that group.
    """
    traj.f_add_parameter('steps', 10000, comment='Number of time steps to simulate')
    traj.f_add_parameter('dt', 0.01, comment='Step size')

    # Here we want to add the initial conditions as an array parameter. We will simulate
    # a 3-D differential equation, the Lorenz attractor.
    # The stored comment previously claimed the run starts from the origin, which
    # contradicts the actual default value below.
    traj.f_add_parameter(ArrayParameter, 'initial_conditions', np.array([0.1, 0.2, 0.3]),
                         comment='Our initial conditions, as default we will start from'
                                 ' (0.1, 0.2, 0.3)!')

    # We will group all parameters of the Lorenz differential equation into the group 'func_params'
    traj.f_add_parameter('func_params.sigma', 10.0)
    traj.f_add_parameter('func_params.beta', 8.0 / 3.0)
    traj.f_add_parameter('func_params.rho', 28.0)

    # For the fun of it we will annotate the group
    traj.func_params.v_annotations.info = 'This group contains as default the original values chosen ' \
                                          'by Edward Lorenz in 1963. Check it out on wikipedia ' \
                                          '(https://en.wikipedia.org/wiki/Lorenz_attractor)!'
def diff_lorenz(value_array, sigma, beta, rho):
    """Evaluate the right-hand side of the Lorenz system.

    :param value_array: Indexable whose first three entries are the x, y and z values.
    :param sigma: Constant attractor parameter
    :param beta: Constant attractor parameter
    :param rho: Constant attractor parameter
    :return: Float array of length 3 with (dx/dt, dy/dt, dz/dt) at `value_array`
    """
    x, y, z = value_array[0], value_array[1], value_array[2]
    return np.array([sigma * (y - x),
                     x * (rho - z) - y,
                     x * y - beta * z], dtype=float)
def get_batch(argv=None):
    """Parse the batch id from the command line arguments.

    :param argv:
        Arguments to parse, without the program name. Defaults to
        ``sys.argv[1:]`` so existing callers are unaffected.
    :return: The integer given via ``--batch=<id>`` (the last one wins if
        repeated), or 0 if the option is absent.
    :raises getopt.GetoptError: If an unknown option is passed.
    :raises ValueError: If the batch id is not an integer.
    """
    if argv is None:
        argv = sys.argv[1:]
    optlist, _ = getopt.getopt(argv, '', ['batch='])
    batch = 0
    for opt, value in optlist:
        if opt == '--batch':
            batch = int(value)
    # Function-call form works on both Python 2 and Python 3.
    print('Found batch %d' % batch)
    return batch
def explore_batch(traj, batch):
    """Explore a block of `sigma` values selected by `batch`.

    For an integer ``batch`` the explored values are the floats
    ``10*batch, 10*batch + 1, ..., 10*batch + 9``. For example, batch 0 covers
    0.0 to 9.0 and batch 1 covers 10.0 to 19.0, so consecutive batches do not
    overlap.
    """
    lower = 10.0 * batch
    upper = 10.0 * (batch + 1)
    sigmas = np.arange(lower, upper, 1.0).tolist()
    traj.f_explore({'sigma': sigmas})
# And here goes our main function
def main():
    """Run one batch of the Euler-integration experiment.

    The batch id is read from the ``--batch`` command line option. The batch
    gets its own trajectory ``Example_22_Euler_Integration_<batch>`` and is
    stored in the HDF5 file ``saga_<batch>.hdf5``.
    """
    batch = get_batch()

    filename = 'saga_%s.hdf5' % batch
    env = Environment(trajectory='Example_22_Euler_Integration_%s' % batch,
                      filename=filename,
                      file_title='Example_22_Euler_Integration',
                      comment='Go for Euler!',
                      overwrite_file=True,
                      multiproc=True,  # Yes we can use multiprocessing within each batch!
                      ncores=4)

    traj = env.trajectory

    # 1st a) phase parameter addition
    add_parameters(traj)

    # 1st b) phase preparation
    # We will add the differential equation (well, its source code only) as a derived parameter
    traj.f_add_derived_parameter(FunctionParameter, 'diff_eq', diff_lorenz,
                                 comment='Source code of our equation!')

    # explore the trajectory
    explore_batch(traj, batch)

    # 2nd phase let's run the experiment
    # We pass `euler_scheme` as our top-level simulation function and
    # the Lorenz equation 'diff_lorenz' as an additional argument
    env.run(euler_scheme, diff_lorenz)
# Script entry point: run a single batch when executed directly.
if __name__ == '__main__':
    main()
Script to Merge Trajectories
# Module author metadata.
__author__ = 'Robert Meyer'
import os
from pypet import merge_all_in_folder
from the_task import FunctionParameter
def main():
    """Merge every trajectory file in the current working directory into one.

    ``FunctionParameter`` is passed as a dynamic import so that the custom
    parameter type stored by ``the_task.py`` can be loaded while merging.
    ``delete_other_files=True`` keeps only the merged trajectory, and
    ``backup=False`` skips making a backup.
    """
    print('Merging all files')
    merge_all_in_folder(os.getcwd(),
                        delete_other_files=True,  # We will only keep one trajectory
                        dynamic_imports=FunctionParameter,
                        backup=False)
    print('Done')
# Script entry point: merge the batch trajectories when executed directly.
if __name__ == '__main__':
    main()