Using pypet with SAGA-Python

This example shows how to use pypet in combination with SAGA Python. It shows how to establish an ssh connection to a given server (start_saga.py) and then

  1. Upload all necessary scripts
  2. Start several batches of trajectories
  3. Merge all trajectories into a single one

Only a few modifications are necessary to switch from just using ssh to actually submitting jobs on a cluster (like a Sun Grid Engine with qsub); see the SAGA Python documentation.

To run the example, you only need to add your server address, user name, password, and working directory (on the server) to the start_saga.py file and then execute python start_saga.py. the_task.py and merge_trajs.py are used on the server side and you don’t need to touch these at all, but they need to be in the same folder as your start_saga.py file.

Download: start_saga.py

Download: the_task.py

Download: merge_trajs.py

Start Up Script

"""Example how to use pypet with SAGA python

The example is based on `ssh` but using a cluster is almost analogous.
For examples on how to submit jobs to a cluster with SAGA python check
the documentation: http://saga-python.readthedocs.org/en/latest/

We run `the_task` in batches and create several trajectories,
later on we simply merge the batches into a single trajectory.

"""

import sys
import saga
from saga.filesystem import OVERWRITE
import os
import traceback


# Connection settings (placeholders, replace them with your own values).
ADDRESS = '12345.fake.street'  # Address of your server, appended to the `ssh://` and `sftp://` schemes
USER = 'user'  # Username stored in the SAGA `UserPass` context
PASSWORD = '12345'  # Password stored in the SAGA `UserPass` context
WORKING_DIR = '/myhome/'  # Working directory on the server; concatenated directly onto ADDRESS for sftp URLs


def upload_file(filename, session):
    """Copies a file from the current local directory to the server.

    :param filename:

        Name of the file, looked up in the current working directory

    :param session:

        SAGA session that is handed to the file handle

    """
    print('Uploading file %s' % filename)

    local_path = os.path.join(os.getcwd(), filename)
    # The target is the working directory on the server, addressed via sftp
    remote_url = ''.join(['sftp://', ADDRESS, WORKING_DIR])

    handle = saga.filesystem.File(local_path, session=session, flags=OVERWRITE)
    handle.copy(remote_url)

    print('Transfer of `%s` to `%s` successful' % (filename, remote_url))


def download_file(filename, session):
    """Copies a file from the working directory on the server to the current local directory.

    :param filename:

        Name of the file inside `WORKING_DIR` on the server; the local copy
        gets the same name and is placed in the current working directory

    :param session:

        SAGA session that is handed to the file handle

    """
    # URL paths always use forward slashes, so the remote location is joined with
    # `posixpath` rather than with the separator of the local operating system.
    import posixpath

    print('Downloading file %s' % filename)
    infilesource = posixpath.join('sftp://' + ADDRESS + WORKING_DIR, filename)
    # The local target, in contrast, is a real path on this machine
    infiletarget = os.path.join(os.getcwd(), filename)
    incoming = saga.filesystem.File(infilesource, session=session, flags=OVERWRITE)
    incoming.copy(infiletarget)
    print('Transfer of `%s` to `%s` successful' % (filename, infiletarget))


def create_session():
    """Creates a new SAGA session with user name / password credentials.

    :return: A `saga.Session` holding a `UserPass` context filled from `USER` and `PASSWORD`

    """
    credentials = saga.Context("UserPass")
    credentials.user_id = USER
    credentials.user_pass = PASSWORD

    new_session = saga.Session()
    new_session.add_context(credentials)
    return new_session


def merge_trajectories(session):
    """Runs `merge_trajs.py` in the working directory on the server and waits for it.

    :param session:

        SAGA session used to create the job service

    """
    description = saga.job.Description()
    description.executable = 'python'
    description.arguments = ['merge_trajs.py']
    # stdout and stderr of the job are redirected into these files
    description.output = 'mysagajob_merge.stdout'
    description.error = 'mysagajob_merge.stderr'
    description.working_directory = WORKING_DIR

    service = saga.job.Service('ssh://' + ADDRESS, session=session)
    merge_job = service.create_job(description)

    print('\n...starting job...\n')
    merge_job.run()
    print('Job ID    : %s' % merge_job.id)
    print('Job State : %s' % merge_job.state)

    print('\n...waiting for job...\n')
    # Returns once the job has either finished or failed
    merge_job.wait()

    print('Job State : %s' % merge_job.state)
    print('Exitcode  : %s' % merge_job.exit_code)


def start_jobs(session, n_batches=3):
    """Starts all jobs and runs `the_task.py` in batches.

    All jobs are started first; only afterwards the function waits for each
    of them in the order they were started.

    :param session:

        SAGA session used to create the job service

    :param n_batches:

        Number of batches to start. The batch ids passed to `the_task.py`
        via `--batch` run from 0 to `n_batches - 1`. Defaults to 3.

    """

    js = saga.job.Service('ssh://' + ADDRESS, session=session)

    jobs = []

    for batch in range(n_batches):
        print('Starting batch %d' % batch)

        jd = saga.job.Description()

        jd.executable = 'python'
        # One list entry per command line argument
        jd.arguments = ['the_task.py', '--batch=' + str(batch)]
        # Every batch gets its own stdout / stderr file
        jd.output = "mysagajob.stdout" + str(batch)
        jd.error = "mysagajob.stderr" + str(batch)
        jd.working_directory = WORKING_DIR

        myjob = js.create_job(jd)

        print("Job ID    : %s" % (myjob.id))
        print("Job State : %s" % (myjob.state))

        print("\n...starting job...\n")

        myjob.run()
        jobs.append(myjob)

    for myjob in jobs:
        print("Job ID    : %s" % (myjob.id))
        print("Job State : %s" % (myjob.state))

        print("\n...waiting for job...\n")
        # wait for the job to either finish or fail
        myjob.wait()

        print("Job State : %s" % (myjob.state))
        print("Exitcode  : %s" % (myjob.exit_code))


def main():
    """Uploads the scripts, runs all batches on the server and merges the results.

    :return: 0 on success, -1 if a SAGA exception was raised

    """
    try:
        session = create_session()
        for script in ('the_task.py', 'merge_trajs.py'):
            upload_file(script, session)
        # Fetching the result via `download_file('saga_0.hdf5', session)` is
        # currently buggy, wait for SAGA python update.
        # To see the resulting file manually download it from the server!

        start_jobs(session)
        merge_trajectories(session)
    except saga.SagaException as ex:
        # All saga exceptions end up here
        print("An exception occured: (%s) %s " % (ex.type, (str(ex))))
        # The traceback can be helpful for debugging
        traceback.print_exc()
        return -1
    else:
        return 0


if __name__ == "__main__":
    # Hand the return value of `main` to the shell as exit status
    exit_status = main()
    sys.exit(exit_status)

The Experiment File

__author__ = 'Robert Meyer'

import numpy as np
import inspect
import getopt
import sys

from pypet import Environment, Parameter, ArrayParameter, Trajectory


def euler_scheme(traj, diff_func):
    """Simulation function for Euler integration.

    Reads `steps`, `dt`, `initial_conditions` and the group `func_params`
    from `traj` and stores the computed time series as the result
    `euler_evolution`.

    :param traj:

        Container for parameters and results

    :param diff_func:

        The differential equation we want to integrate. It is called with the
        current state as first argument and all parameters found in
        `traj.func_params` as keyword arguments.

    """

    steps = traj.steps
    # The step size does not change during the run, so it is looked up only once
    # and not in every iteration of the integration loop.
    dt = traj.dt
    initial_conditions = traj.initial_conditions
    dimension = len(initial_conditions)

    # This array will collect the results, one row per time step
    result_array = np.zeros((steps, dimension))
    # Get the function parameters stored into `traj` as a dictionary
    # with the (short) names as keys:
    func_params_dict = traj.func_params.f_to_dict(short_names=True, fast_access=True)
    # Take initial conditions as first result
    result_array[0] = initial_conditions

    # Now we compute the Euler Scheme steps-1 times
    for idx in range(1, steps):
        previous = result_array[idx - 1]
        # Note the **func_params_dict unzips the dictionary, it's the reverse of
        # **kwargs in function definitions!
        result_array[idx] = diff_func(previous, **func_params_dict) * dt + previous

    # Finally we want to keep the results
    traj.f_add_result('euler_evolution', data=result_array, comment='Our time series data!')


class FunctionParameter(Parameter):
    """Parameter that keeps the source code of a callable instead of the callable itself."""

    def f_set(self, data):
        """Sets the data, replacing a callable by its source code.

        :param data: Value to store; anything that is not callable is passed on unchanged

        :return: Whatever `Parameter.f_set` returns

        """
        # `inspect.getsource` gives us the source text of the function
        value = inspect.getsource(data) if callable(data) else data
        return super(FunctionParameter, self).f_set(value)


def add_parameters(traj):
    """Adds all necessary parameters to the `traj` container

    :param traj: Container the parameters are added to

    """
    # Settings of the integration itself
    traj.f_add_parameter('steps', 10000, comment='Number of time steps to simulate')
    traj.f_add_parameter('dt', 0.01, comment='Step size')

    # The initial conditions go in as an array parameter. We will simulate
    # a 3-D differential equation, the Lorenz attractor.
    # NOTE(review): the stored comment speaks of the origin, yet the default
    # value below is [0.1, 0.2, 0.3].
    start_point = np.array([0.1, 0.2, 0.3])
    traj.f_add_parameter(ArrayParameter, 'initial_conditions', start_point,
                         comment='Our initial conditions, as default we will start from origin!')

    # All parameters of the Lorenz differential equation live in the group 'func_params'
    lorenz_defaults = (('sigma', 10.0), ('beta', 8.0 / 3.0), ('rho', 28.0))
    for name, value in lorenz_defaults:
        traj.f_add_parameter('func_params.' + name, value)

    # For the fun of it the group gets an annotation, too
    traj.func_params.v_annotations.info = (
        'This group contains as default the original values chosen '
        'by Edward Lorenz in 1963. Check it out on wikipedia '
        '(https://en.wikipedia.org/wiki/Lorenz_attractor)!'
    )


def diff_lorenz(value_array, sigma, beta, rho):
    """The Lorenz attractor differential equation

    :param value_array: 3d array containing the x, y, and z component values.
    :param sigma: Constant attractor parameter
    :param beta: Constant attractor parameter
    :param rho: Constant attractor parameter

    :return: 3d array of the Lorenz system evaluated at `value_array`

    """
    x, y, z = value_array[0], value_array[1], value_array[2]

    dx = sigma * (y - x)
    dy = x * (rho - z) - y
    dz = x * y - beta * z

    # The result is always a float array, no matter what number types came in
    return np.array([dx, dy, dz], dtype=float)


def get_batch():
    """Function that parses the batch id from the command line arguments

    Looks for the long option `--batch=<id>` in `sys.argv`.

    :return: The batch id as integer, 0 if the option was not given

    """
    parsed_options, _ = getopt.getopt(sys.argv[1:], '', longopts=['batch='])

    batch = 0
    for option, value in parsed_options:
        if option != '--batch':
            continue
        # If the option is given several times the last one wins
        batch = int(value)
        print('Found batch %d' % batch)

    return batch


def explore_batch(traj, batch):
    """Chooses exploration according to `batch`

    :param traj: Container whose parameter `sigma` is explored
    :param batch: Batch id, selects the range of `sigma` values

    """
    lower = 10.0 * batch
    upper = 10.0 * (batch + 1)
    # batch = 0 yields sigma in [0.0, 1.0, 2.0, ..., 9.0],
    # batch = 1 yields sigma in [10.0, 11.0, 12.0, ..., 19.0]
    # and so on
    sigma_values = np.arange(lower, upper, 1.0).tolist()
    traj.f_explore({'sigma': sigma_values})


# And here goes our main function
def main():
    """Runs the Euler integration of the Lorenz system for a single batch.

    The batch id is read from the command line (see `get_batch`). It
    determines the name of the trajectory, the name of the HDF5 file and
    the range of `sigma` values that is explored.

    """
    batch = get_batch()

    filename = 'saga_%s.hdf5' % str(batch)
    env = Environment(trajectory='Example_22_Euler_Integration_%s' % str(batch),
                      filename=filename,
                      file_title='Example_22_Euler_Integration',
                      comment='Go for Euler!',
                      overwrite_file=True,
                      multiproc=True,  # Yes we can use multiprocessing within each batch!
                      ncores=4)

    traj = env.trajectory

    # 1st a) phase parameter addition
    add_parameters(traj)

    # 1st b) phase preparation
    # We will add the differential equation (well, its source code only) as a derived parameter
    traj.f_add_derived_parameter(FunctionParameter, 'diff_eq', diff_lorenz,
                                 comment='Source code of our equation!')

    # explore the trajectory
    explore_batch(traj, batch)

    # 2nd phase let's run the experiment
    # We pass `euler_scheme` as our top-level simulation function and
    # the Lorenz equation 'diff_lorenz' as an additional argument
    env.run(euler_scheme, diff_lorenz)


if __name__ == '__main__':
    main()

Script to merge Trajectories

__author__ = 'Robert Meyer'

import os

from pypet import merge_all_in_folder
from the_task import FunctionParameter


def main():
    """Merges all trajectories found in the current working directory"""
    working_dir = os.getcwd()
    print('Merging all files')

    merge_options = dict(
        delete_other_files=True,  # Only a single trajectory is kept
        dynamic_imports=FunctionParameter,  # Custom parameter class defined in `the_task`
        backup=False,
    )
    merge_all_in_folder(working_dir, **merge_options)

    print('Done')


if __name__ == '__main__':
    main()