parallel file validation

threading and multiprocessing

In computing, a process is an instance of a computer program that is being executed. It contains the program code and its current activity. A thread is a stream of instructions in a process and multithreading consists of multiple threads running within a process concurrently. Multiprocessing is the instantiation of multiple processes by the operating system. An error in one thread can bring down all the threads of a process while an error in a process can not (easily) bring down another process.

Python Global Interpreter Lock

The Python Global Interpreter Lock (GIL) is a mutual exclusion lock held by a programming language interpreter thread in order to avoid sharing code that is not thread-safe with other threads. Programs written in programming languages with a GIL can use separate processes to achieve parallelism.

The Python interpreter is not fully thread safe and, so, a GIL is implemented in order to support multithreaded Python programs. Without the GIL, even many simple operations are problematic (e.g. incrementing the reference count of some common object simultaneously in two threads could result in a single increment, rather than the intended two increments). As a rule in Python, only the thread that has acquired the GIL may operate on Python objects or call API functions. The Python interpreter switches threads regularly (at an interval tunable using sys.setcheckinterval()) in order to emulate concurrency.
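As a rough illustration of the practical consequence (a minimal sketch, not part of the transforms code): a CPU-bound function run in two threads gains little because the threads contend for the GIL, while the same function run in two processes can use two cores.

%CODE{"python"}%
import time
from threading import Thread
from multiprocessing import Process

def count(n):
    # a purely CPU-bound function
    while n > 0:
        n -= 1

if __name__ == '__main__':
    # two threads, contending for the one GIL
    startTime = time.time()
    threads = [Thread(target=count, args=(10000000,)) for i in range(2)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print 'threads:   {0:.2f} s'.format(time.time() - startTime)

    # two processes, each with its own interpreter and GIL
    startTime = time.time()
    processes = [Process(target=count, args=(10000000,)) for i in range(2)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    print 'processes: {0:.2f} s'.format(time.time() - startTime)
%ENDCODE%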

Python multiprocessing

The Python module "multiprocessing" is a process-based 'threading' interface. It

  • supports spawning multiple processes,
  • offers local and remote concurrency (subprocesses are used instead of threads in order to avoid the GIL),
  • offers good control of multiple processors of a computer and
  • offers acceptable control of multiple processors on multiple computers, for example, via SSH.

The multiprocessing module has many built-in options for building a parallel program, but the three most basic are the Process, Queue and Lock classes.

The Process class is an abstraction that sets up another Python process, provides it with code to run and gives the parent program a way to control its execution. A process is spawned by creating a Process object and then calling its start() method.

%CODE{"python"}% from multiprocessing import Process

def function1(text): print text

if name == '__main__': process1 = Process(target=function1, args=('hello world',)) process1.start() process1.join() %ENDCODE%

In this example, the Process class is imported, a function for the process to run is defined and then a Process object is instantiated with the function it is destined to run. The process is started; it runs and then returns a result. The parent then waits for the process to complete using the join() method (without this, the child process becomes a zombie process).

multiprocessing communications

Objects may be exchanged between processes through two main types of communication:

  • queues
  • pipes

a taste of Python multiprocessing: queues

Queues are simple-to-use objects that are essentially a thread-safe and process-safe first in, first out data structure. They can store any pickleable (serialisable) Python object and are straightforward to use for sharing data between processes.

%CODE{"python"}% from multiprocessing import Queue

queue1 = Queue()

queue1.put('text') queue1.put(['data1', 1, {'data2': 2}])

queue1.get() # returns 'text' queue1.get() # returns ['data1', 1, {'data2': 2}] %ENDCODE%

pipes

The Pipe() function returns a pair of connection objects connected by a pipe allowing for duplex (two way) communications. The two connection objects returned by Pipe() represent the two 'ends' of the pipe. Each end has methods such as send() and recv().

%CODE{"python"}% from multiprocessing import Process, Pipe

def function1(connection1): connection1.send([1, 2, 3]) connection1.close()

if name == '__main__': parent_connection1, child_connection1 = Pipe() p = Process(target=function1, args=(child_connection1,)) p.start() print parent_connection1.recv() # prints "[1, 2, 3]" p.join() %ENDCODE%

locks

A Lock allows a process to claim exclusive access to a section of code, blocking other processes from executing that code until the work is complete and the lock is released.

%CODE{"python"}% from multiprocessing import Lock

lock1 = Lock()

lock1.acquire() print 'I AM THE ONE!' lock1.release() %ENDCODE%
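As a usage note (a small sketch, not from the transforms code), the lock also supports the context manager form, which releases the lock even if the protected code raises an exception:

%CODE{"python"}%
from multiprocessing import Lock

lock1 = Lock()

# the lock is acquired on entry to the block and released on exit
with lock1:
    print 'I AM THE ONE!'
%ENDCODE%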

simple programs

Using Processes, Queues and Locks, simple programs may be implemented. A rough idea for a program may have a work queue (data to be processed in some way) and a result queue (answers produced from the data) used as communications. Processes may be created, as required for performing the work, and then started; they may be controlled using a list or a Pool. After all processes have been started, they may be stopped by telling them to join. Each process may store the result of its work in the result queue and this queue may be read after the child processes have exited; a sketch of this pattern is shown below.
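The following is a minimal sketch of this rough idea (the names worker, workQueue and resultQueue are illustrative, not part of the transforms code):

%CODE{"python"}%
from multiprocessing import Process, Queue

def worker(workQueue, resultQueue):
    # process items from the work queue until the sentinel value None is seen
    for item in iter(workQueue.get, None):
        resultQueue.put(item * item)

if __name__ == '__main__':
    workQueue   = Queue()
    resultQueue = Queue()
    numberOfItems = 10
    for number in range(numberOfItems):
        workQueue.put(number)
    numberOfProcesses = 4
    # one sentinel per worker tells it to stop
    for i in range(numberOfProcesses):
        workQueue.put(None)
    processes = [Process(target=worker, args=(workQueue, resultQueue))
                 for i in range(numberOfProcesses)]
    for process in processes:
        process.start()
    # collect one result per work item (results arrive in completion order),
    # then wait for the workers to exit
    results = [resultQueue.get() for i in range(numberOfItems)]
    for process in processes:
        process.join()
    print results
%ENDCODE%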

multiprocessing in Job Transforms

future: implementation of multiprocessing in Job Transforms (travelling to CERN soon to work more directly on the project)

Job Transforms file validation

Job Transforms allow most of the complexity of standard Athena jobs to be hidden and to be updated by experts as necessary. As part of managing the processing of a job, Job Transforms offer the ability to test and validate input and output files. This is because errors in file access can be difficult to diagnose in the online software and, once a file has been written, it is useful to check that it can be read back off the storage system.

The transforms should offer the ability to do both fast checks (e.g., event count checks) and deep checks (full ROOT scans) of files. (As the value of such checks must be balanced against the cost of doing them, in practice ATLAS makes a fast check on input files and a deeper check of output files.)

The legacy method of validating files produced by Job Transforms is to complete file validations one by one, serially. Parallelisation of processes such as file validations offers a means of reducing running time and is useful for the purposes of AthenaMP.

Amdahl's law

Amdahl's law asserts that the speedup of a program using multiple processors in parallel computing is limited by the sequential fraction of the program.
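Stated in its usual form: if a fraction $P$ of a program can be parallelised and $N$ processors are used, the overall speedup is bounded by

\[ S(N) = \frac{1}{(1 - P) + \frac{P}{N}} \]

so, for example, a program that is 90% parallelisable cannot be sped up by more than a factor of 10, however many processors are used.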

parallel job processor

The parallel job processor is a central part of the service task. It is, as its name suggests, the system that manages the processing of jobs in parallel. Core to its functioning is the Python multiprocessing module, which supports the spawning of processes using an interface similar to that of the threading module; in effect, multiprocessing offers an approach to bypassing the Python Global Interpreter Lock by using subprocesses instead of threads.

The figure below gives a non-exhaustive illustration of the basic structure of the parallel job processor and associated concepts. Of particular note are the Job, JobGroup and ParallelJobProcessor objects described below.

parallel job processor

There are a number of steps involved in the typical usage and operation of the parallel job processor.

Job and JobGroup definitions

A Job for the parallel job processor consists of

  • a work function,
  • its arguments and
  • other information (such as its timeout specification).

submission

A Job or a JobGroup is submitted to the parallel job processor. The parallel job processor submits all jobs to the process pool it created on initialisation, recording the submission time.

checking for results, timeouts and failures

Submissions are monitored for timeouts and failures, with exceptions being raised when appropriate. Timeouts are measured at the JobGroup level, rather than at the Job level.

getting results

Results of jobs returned by the process pool are propagated to the Jobs and to the JobGroup as they become available.

return results

Results of all jobs are stored in order in the results list of the JobGroup. This list is returned by the parallel job processor.
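As a minimal sketch of this submit / monitor / collect pattern (using a bare multiprocessing.Pool, not the actual ParallelJobProcessor code; the names workFunction and groupTimeout are illustrative):

%CODE{"python"}%
import time
from multiprocessing import Pool

def workFunction(x):
    time.sleep(1)
    return x * x

if __name__ == '__main__':
    pool = Pool(processes=4)             # process pool created on initialisation
    submissionTime = time.time()         # record the submission time
    asyncResults = [pool.apply_async(workFunction, (x,)) for x in range(4)]
    groupTimeout = 10                    # timeout measured at the group level
    results = []
    for asyncResult in asyncResults:
        remainingTime = groupTimeout - (time.time() - submissionTime)
        # raises multiprocessing.TimeoutError if the group timeout is exceeded
        results.append(asyncResult.get(timeout=max(0, remainingTime)))
    pool.close()
    pool.join()
    print results                        # results in submission order
%ENDCODE%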

usage

The code below depicts an example job group for the parallel job processor. In this case, it is a unit test for the system. Here, a job group of two jobs is defined. One of the jobs specifies a hello world function with a sleep time argument and the other specifies a multiplication function with multiplicand arguments. There is a timeout specified for each job.

%CODE{"python"}% ## @brief unit test for working functions # @detail This method is a unit test of the parallel job processor # testing the processing of two simple, working functions. def test_working(self): msg.info("\n\n\n\nPARALLEL JOB PROCESSOR WORKING TEST") jobGroup1 = JobGroup( name = "working functions test", jobs = [ Job( name = "hello world function", workFunction = helloWorld, workFunctionKeywordArguments = {

'sleepTime'
1, }, workFunctionTimeout = 10 ), Job( name = "multiplication function", workFunction = multiply, workFunctionKeywordArguments = {
'multiplicand1'
2,
'multiplicand2'
3 }, workFunctionTimeout = 10 ) ] ) parallelJobProcessor1 = ParallelJobProcessor() parallelJobProcessor1.submit(jobSubmission = jobGroup1) results = parallelJobProcessor1.getResults() self.assertEquals(results, ['hello world', 6])
%ENDCODE%

The figure below illustrates example (abbreviated) parallel job processor terminal output (for the parallel validation of typical physics data).

parallel job processor logging

The video below illustrates some benefits of the use of the parallel job processor -- in this case, for the purpose of making a Monte Carlo estimation of pi.

in a pickle: the current difficulties of serialised communications between processes

Serialisation is the process of converting an object to a bytestream.

Serialisation of objects for the purposes of communications is provided by the Python pickle module. Certain objects important to file validations cannot be pickled. Specifically, the legacy validation functions were not workable. The solution was to make new validation functions. While this extended the service task somewhat, it proved to be the most obviously reliable and validated approach -- and was successful.
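As an illustration of the kind of limitation involved (a minimal sketch with a hypothetical Validator class, not the actual transforms code): in Python 2, objects such as lambdas and bound instance methods cannot be pickled.

%CODE{"python"}%
import pickle

class Validator(object):                 # hypothetical class for illustration
    def validate(self, fileName):
        return True

try:
    pickle.dumps(lambda x: x + 1)
except (pickle.PicklingError, TypeError) as error:
    print 'cannot pickle lambda: {0}'.format(error)

try:
    pickle.dumps(Validator().validate)
except (pickle.PicklingError, TypeError) as error:
    print 'cannot pickle bound method: {0}'.format(error)
%ENDCODE%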

The serialisation limitations are likely to be addressed in the shorter term with more advanced serialisation methods such as those offered by dill (https://pypi.python.org/pypi/dill) and in the longer term with improvements in Python parallelisation and GIL handling.

Dill provides the user with the same interface as pickle and also includes additional features. Its primary use is to send Python objects across a network as a bytestream. It is quite flexible and allows arbitrary user-defined classes and functions to be serialised. Dill is part of pathos, a Python framework for heterogeneous computing. Dill is in the early stages of development.
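A minimal sketch of the interface (assuming dill is installed; this is not transforms code):

%CODE{"python"}%
import dill

# dill can serialise objects, such as lambdas, that pickle cannot
serialised = dill.dumps(lambda x: x + 1)
function1 = dill.loads(serialised)
print function1(2) # prints 3
%ENDCODE%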

In order to address the limitations of pickle, separate data validation functions, such as returnIntegrityOfPOOLFile, are provided by the module PyJobTransforms.trfFileValidationFunctions. The integrity functions defined in this package are associated with the arg classes for the various data types in the module PyJobTransforms.trfArgClasses. Specifically, the name of the validation function appropriate for a type of data is defined as a data member of the class for that data type. In the case of POOL files, the specification is as follows:

%CODE{"python"}% class argPOOLFile(argAthenaFile):

integrityFunction = "returnIntegrityOfPOOLFile" %ENDCODE%

\section{characterising impact}

Figures~\ref{figure:impact_1}, \ref{figure:impact_2} and \ref{figure:impact_3} depict tentative graphs of the processing of POOL files. Figure~\ref{figure:impact_1} illustrates the processing time against the number of files validated in parallel while figure~\ref{figure:impact_2} illustrates the ratio of processing time to the processing time for one file.

\begin{figure}[H] \begin{center} \includegraphics[width=\imageWidthASpecification]{figures/2014-04-10_1.eps} \end{center} \caption{parallel job processor: large efficiency improvement as a result of parallelisation} \label{figure:impact_1} \end{figure}

\begin{figure}[H] \begin{center} \includegraphics[width=\imageWidthASpecification]{figures/2014-04-10_2.eps} \end{center} \caption{parallel job processor: large efficiency improvement as a result of parallelisation} \label{figure:impact_2} \end{figure}

\noindent Figure~\ref{figure:impact_3} illustrates the performance for the unusual task of parallel validation of up to 40 POOL files.

\begin{figure}[H] \begin{center} \includegraphics[width=\imageWidthASpecification]{figures/2014-04-11_1.eps} \end{center} \caption{parallel job processor: large efficiency improvement as a result of parallelisation} \label{figure:impact_3} \end{figure}
