WilliamBreadenMaddenSandbox

Revision 5, 2015-02-18 - WilliamBreadenMadden

parallel file validation

threading and multiprocessing

In computing, a process is an instance of a computer program that is being executed; it contains the program code and the program's current activity. A thread is a stream of instructions within a process, and multithreading consists of multiple threads running concurrently within one process. Multiprocessing is the instantiation of multiple processes by the operating system. An error in one thread can bring down all the threads of its process, while an error in one process cannot (easily) bring down another process.

Python Global Interpreter Lock

The Python Global Interpreter Lock (GIL) is a mutual exclusion lock held by a programming language interpreter thread in order to avoid sharing code that is not thread-safe with other threads. Programs written in programming languages with a GIL can use separate processes to achieve parallelism.

The Python interpreter is not fully thread safe and, so, a GIL is implemented in order to support multithreaded Python programs. Without the GIL, even many simple operations are problematic (e.g. incrementing the reference count of some common object simultaneously in two threads could result in a single increment, rather than the intended two increments). As a rule in Python, only the thread that has acquired the GIL may operate on Python objects or call Python C API functions. The Python interpreter switches threads regularly (at an interval configured using sys.setcheckinterval()) in order to emulate concurrency.
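Even with the GIL, compound operations such as incrementing a shared counter are not atomic, so explicit synchronisation is still needed when threads share state. The sketch below (an illustrative example, not from the original page) uses a threading.Lock to guarantee that every increment from both threads is counted:

```python
import threading

counter = 0
counter_lock = threading.Lock()

def increment(number_of_increments):
    global counter
    for _ in range(number_of_increments):
        with counter_lock:  # only the holder of the lock may update the counter
            counter += 1

threads = [threading.Thread(target=increment, args=(100000,)) for _ in range(2)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(counter)  # 200000
```

Without the lock, the two threads could interleave their read-modify-write sequences and lose increments.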

Python multiprocessing

The Python module multiprocessing is a process-based 'threading' interface. It

  • supports the spawning of multiple processes,
  • offers local and remote concurrency (subprocesses are used instead of threads in order to avoid the GIL),
  • provides good control of multiple processors of a computer and
  • provides acceptable control of multiple processors on multiple computers, for example, via SSH.

The multiprocessing module has many built-in options for building a parallel program, but the three most basic are the Process, Queue and Lock classes.

The Process is an abstraction that sets up another Python process, provides it code to run and gives the parent program a way to control its execution. Processes are spawned using a Process object and then started using its start() method.

%CODE{"python"}%
from multiprocessing import Process

def function1(text):
    print text

if __name__ == '__main__':
    process1 = Process(target=function1, args=('hello world',))
    process1.start()
    process1.join()
%ENDCODE%

In this example, the Process class is imported, a function for the process to run is defined and then a Process object is instantiated with the function it is to run. The process is started; it runs and returns a result. The parent then waits for the process to complete using the method join (without this, the child process becomes a zombie process).

multiprocessing communications

Objects may be exchanged between processes through two main types of communication:

  • queues
  • pipes


Queues are simple-to-implement objects that are essentially a thread/process safe first in, first out data structure. They can store any pickleable (serialisable) Python object and are straightforward to use for sharing data between processes.

%CODE{"python"}%
from multiprocessing import Queue

queue1 = Queue()

queue1.put('text')
queue1.put(['data1', 1, {'data2': 2}])

queue1.get() # returns 'text'
queue1.get() # returns ['data1', 1, {'data2': 2}]
%ENDCODE%


The Pipe() function returns a pair of connection objects connected by a pipe allowing for duplex (two way) communications. The two connection objects returned by Pipe() represent the two 'ends' of the pipe. Each end has methods such as send() and recv().

%CODE{"python"}%
from multiprocessing import Process, Pipe

def function1(connection1):
    connection1.send([1, 2, 3])
    connection1.close()

if __name__ == '__main__':
    parent_connection1, child_connection1 = Pipe()
    p = Process(target=function1, args=(child_connection1,))
    p.start()
    print parent_connection1.recv() # prints "[1, 2, 3]"
    p.join()
%ENDCODE%


Locks are straightforward objects that allow code to claim exclusive access to a resource, blocking other processes from executing the protected code until the lock is released.

%CODE{"python"}%
from multiprocessing import Lock

lock1 = Lock()

lock1.acquire()
print 'I AM THE ONE!'
lock1.release()
%ENDCODE%

simple programs

Using Process, Queue and Lock instances, simple programs may be implemented. A rough idea for a program may have a work queue (data to be processed in some way) and result queue (answers produced from the data) used as communications. Processes may be created, as required for performing the work, and then started. The processes may be controlled using a list or a Pool. After all processes have been started, processes may be stopped by telling the processes to join. Each process may store the result of its work in the result queue and this queue could be printed after the child processes have exited.

Job Transforms file validation

Job Transforms allow most of the complexity of standard Athena jobs to be hidden and to be updated by experts as necessary. As part of managing the processing of a job, Job Transforms offer the ability to test and validate input and output files. This is because errors in file access can be difficult to diagnose in the online software and, once a file has been written, it is useful to check that the file can be read back off the storage system.

The transforms should offer the ability to perform both fast checks (e.g. event counts) and deep checks (full ROOT scans) of files. (As the value of such checks must be balanced against the cost of performing them, in practice ATLAS makes a fast check on input files and a deeper check of output files.)

The legacy method of validating files produced by Job Transforms is to complete file validations one by one, serially. Parallelisation of processes such as file validations offers a means of reducing running time and is useful for the purposes of AthenaMP.

Amdahl's law

Amdahl's law asserts that the speedup of a program using multiple processors in parallel computing is limited by the sequential fraction of the program.
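Quantitatively, if a fraction p of a program can be parallelised, the maximum speedup on n processors is 1 / ((1 - p) + p / n), and is bounded by 1 / (1 - p) no matter how many processors are added. A small illustration:

```python
def amdahl_speedup(parallel_fraction, number_of_processors):
    """Maximum speedup predicted by Amdahl's law."""
    return 1.0 / ((1.0 - parallel_fraction)
                  + parallel_fraction / number_of_processors)

# even with 95% of a program parallelised, 8 processors yield less than 6x speedup
print(round(amdahl_speedup(0.95, 8), 2))  # 5.93

# the speedup approaches the bound 1 / (1 - p) = 20 as processors are added
print(round(amdahl_speedup(0.95, 10**6), 2))
```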

parallel job processor

The parallel job processor is, as its name suggests, the system that manages the processing of jobs in parallel. Core to its functioning is the Python multiprocessing module. Multiprocessing is a Python module that supports the spawning of processes using an interface similar to the threading module. In effect, multiprocessing offers an approach to bypassing the Python Global Interpreter Lock by using subprocesses instead of threads.

The figure below gives a non-exhaustive, visual illustration of the basic structure of the parallel job processor and its associated concepts.

parallel job processor

There are a number of steps involved in the typical use and operation of the parallel job processor.

Job and JobGroup definitions

A Job for the parallel job processor consists of

  • a work function,
  • its arguments and
  • other information (such as its timeout specification).


A Job or a JobGroup is submitted to the parallel job processor. The parallel job processor submits all jobs to the process pool it created on initialisation, recording the submission time.

checking for results, timeouts and failures

Submissions are monitored for timeouts and failures, with exceptions being raised when appropriate. Timeouts are measured at the JobGroup level, rather than at the Job level.

getting results

Results of jobs returned by the process pool are propagated to the Jobs and to the JobGroup as they become available.

return results

Results of all jobs are stored in order in the results list of the JobGroup. This list is returned by the parallel job processor.


The code below depicts an example job group for the parallel job processor. In this case, it is a unit test for the system. Here, a job group of two jobs is defined. One of the jobs specifies a hello world function with a sleep time argument and the other specifies a multiplication function with multiplicand arguments. There is a timeout specified for each job.

%CODE{"python"}%
## @brief unit test for working functions
#  @detail This method is a unit test of the parallel job processor
#  testing the processing of two simple, working functions.
def test_working(self):
    msg.info("\n\n\n\nPARALLEL JOB PROCESSOR WORKING TEST")
    jobGroup1 = JobGroup(
        name = "working functions test",
        jobs = [
            Job(
                name = "hello world function",
                workFunction = helloWorld,
                workFunctionKeywordArguments = {
                    'sleepTime': 1,
                },
                workFunctionTimeout = 10
            ),
            Job(
                name = "multiplication function",
                workFunction = multiply,
                workFunctionKeywordArguments = {
                    'multiplicand1': 2,
                    'multiplicand2': 3
                },
                workFunctionTimeout = 10
            )
        ]
    )
    parallelJobProcessor1 = ParallelJobProcessor()
    parallelJobProcessor1.submit(jobSubmission = jobGroup1)
    results = parallelJobProcessor1.getResults()
    self.assertEquals(results, ['hello world', 6])
%ENDCODE%

The figure below illustrates example (abbreviated) parallel job processor terminal output (for the parallel validation of typical physics data).

parallel job processor logging

The video below illustrates some benefits of the use of the parallel job processor -- in this case, for the purpose of making a Monte Carlo estimation of pi.

in a pickle: the current difficulties of serial communications between processes

Serialisation is the process of converting an object to a bytestream. Serialisation of objects for the purposes of communications is provided by the Python pickle module. Certain objects important to file validations cannot be pickled. Specifically, the legacy validation functions were not workable. The solution was to make new validation functions.
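The limitation is easy to demonstrate (an illustrative example, not from the original page): a module-level function pickles cleanly because it is serialised by reference to its importable name, whereas a lambda (like bound methods and various other objects) cannot be pickled at all:

```python
import pickle

def module_level_function(x):
    return x + 1

# a module-level function is pickled by reference to its importable name
payload = pickle.dumps(module_level_function)

# a lambda has no importable name, so the pickle module refuses it
pickle_failed = False
try:
    pickle.dumps(lambda x: x + 1)
except Exception:
    pickle_failed = True
print(pickle_failed)  # True
```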

The serialisation limitations are likely to be addressed in the shorter term with more advanced serialisation methods, such as those offered by https://pypi.python.org/pypi/dill, and in the longer term with improvements in Python parallelisation and GIL handling. Dill provides the user with the same interface as pickle and also includes additional features. Its primary use is to send Python objects across a network as a bytestream. It is quite flexible and allows arbitrary user-defined classes and functions to be serialised. Dill is part of https://pypi.python.org/pypi/pathos, a Python framework for heterogeneous computing. Dill is in the early stages of development.

data validation

The standard validation procedure for output files is defined by the function performStandardFileValidation of the module PyJobTransforms.trfValidation. This function oversees the identification of a validation function appropriate for a given data type and the management of work for the parallel job processor.

In order to address the limitations of pickle, separate data validation functions, such as returnIntegrityOfPOOLFile, are provided by the module PyJobTransforms.trfFileValidationFunctions. The integrity functions defined in this package are associated with the classes of the various data types in the module PyJobTransforms.trfArgClasses. Specifically, the name of the validation function appropriate for a type of data is defined as a data member of the class for the data type. In the case of POOL files, the specification is as follows:

%CODE{"python"}%
class argPOOLFile(argAthenaFile):

    integrityFunction = "returnIntegrityOfPOOLFile"
%ENDCODE%
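Because the class stores only the *name* of its validation function, the picklable module-level function can be looked up at validation time. A simplified, self-contained sketch of this pattern (the module object and file name here are illustrative stand-ins, not the actual trf modules):

```python
import types

# stand-in for a module of module-level (and therefore picklable)
# validation functions; illustrative, not the actual PyJobTransforms module
trfFileValidationFunctions = types.ModuleType('trfFileValidationFunctions')

def returnIntegrityOfPOOLFile(fileName):
    # a real implementation would open the file and scan its contents
    return True

trfFileValidationFunctions.returnIntegrityOfPOOLFile = returnIntegrityOfPOOLFile

class argPOOLFile(object):
    # the class stores only the name of its validation function
    integrityFunction = "returnIntegrityOfPOOLFile"

# look the validation function up by name at validation time
validation_function = getattr(trfFileValidationFunctions,
                              argPOOLFile.integrityFunction)
print(validation_function("example.pool.root"))  # True
```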

unit tests

The unit tests for the parallel job processor are defined in test_trfUtilsParallelJobProcessor.py and the unit tests for validation of data using the parallel job processor are defined in test_trfUtilsParallelJobProcessorData.py. The unit tests for the parallel job processor include testing of work functions that are valid, that time out and that fail. The unit tests for the data validation using the parallel job processor include testing of AOD, corrupted AOD, ESD, histogram, TAG and bytestream data.

| *process* | *8 TeV dataset number* | *8 TeV generator* | *13 TeV process* | *mc14_13TeV dataset number* | *13 TeV generator* | *number of events* | *AMI link* |
| ttH 125 GeV, allhad | 169887 | PowHel Py8 | ttH, 0-150 GeV slice | 189529 | Pythia8 | 50000 | link |
| | | | ttH, 150-350 GeV slice | 189530 | Pythia8 | 50000 | |
| | | | ttH, >350 GeV slice | 189531 | Pythia8 | 50000 | |
| ttH 125 GeV, l+jets | 169888 | PowHel Py8 | ttH, 0-150 GeV slice | 189526 | Pythia8 | 50000 | |
| | | | ttH, 150-350 GeV slice | 189527 | Pythia8 | 50000 | |
| | | | ttH, >350 GeV slice | 189528 | Pythia8 | 50000 | |
| ttH 125 GeV, dilepton | 169889 | PowHel Py8 | ttH, 0-150 GeV slice | 189523 | Pythia8 | 50000 | |
| | | | ttH, 150-350 GeV slice | 189524 | Pythia8 | 50000 | |
| | | | ttH, >350 GeV slice | 189525 | Pythia8 | 50000 | |
| ttbar nonallhadronic | 117050 | Powheg+Pythia6 | ttbar_nonallhad | 110401 | Powheg+Pythia+Photos+Tauola | 400000 | |
| single top tch. top | 110090 | Powheg+Pythia6 | singletop_tchan_lept_top | 110070 | Powheg+Pythia+Photos+Tauola | 999000 | |
| single top tch. antitop | 110091 | Powheg+Pythia6 | singletop_tchan_lept_antitop | 110071 | Powheg+Pythia+Photos+Tauola | 1000000 | |
| single top schan | 110119 | Powheg+Pythia6 | - | - | - | - | |
| single top, Wt chan. inclusive | 110140 | Powheg+Pythia6 | Wtchan_incl_DR | 110305 | | 958500 | |
| single top, Wt chan. dilepton | 110141 | Powheg+Pythia6 | - | - | - | - | |
| ttbar+W | 119353 | Madgraph+Py6 | ttbarW | 119353 | MadGraph+Pythia+Photos+Tauola | 399500 | |
| ttbar+Z | 119355 | Madgraph+Py6 | ttbarZ | 119355 | MadGraph+Pythia+Photos+Tauola | 400000 | |
| ttbar+WW | 158813 | Madgraph+Py6 | ttbarWW | 119583 | MadGraph+Pythia+Photos+Tauola | 10000 | |
| ttbar+Wj Excl | 174830 | Madgraph+Py6 | ttbarWjExcl | 174830 | MadGraph+Pythia+Photos+Tauola | 399500 | |
| ttbar+Wjj Incl. | 174831 | Madgraph+Py6 | ttbarWjjIncl | 174831 | MadGraph+Pythia+Photos+Tauola | 395000 | |
| ttbar+Zj Excl. | 174832 | Madgraph+Py6 | ttbarZjExcl | 174832 | MadGraph+Pythia+Photos+Tauola | 399500 | |
| ttbar+Zjj Incl. | 174833 | Madgraph+Py6 | ttbarZjjIncl | 174833 | MadGraph+Pythia+Photos+Tauola | 399500 | |
| WZ to enu qq | 183735 | Sherpa | - | - | - | - | |
| WZ to munu qq | 183737 | Sherpa | - | - | - | - | |
| WZ to taunu qq | 183739 | Sherpa | - | - | - | - | |
| WW lnulnu np0 | 107100 | AlpgenJimmy | - | - | - | - | |
| WW lnulnu np1 | 107101 | AlpgenJimmy | - | - | - | - | |
| WW lnulnu np2 | 107102 | AlpgenJimmy | - | - | - | - | |
| WW lnulnu np3 | 107103 | AlpgenJimmy | - | - | - | - | |
| WZ incl ll np0 | 107104 | AlpgenJimmy | - | - | - | - | |
| WZ incl ll np1 | 107105 | AlpgenJimmy | - | - | - | - | |
| WZ incl ll np2 | 107106 | AlpgenJimmy | - | - | - | - | |
| WZ incl ll np3 | 107107 | AlpgenJimmy | - | - | - | - | |
| ZZ incl ll np0 | 107108 | AlpgenJimmy | - | - | - | - | |
| ZZ incl ll np1 | 107109 | AlpgenJimmy | - | - | - | - | |
| ZZ incl ll np2 | 107110 | AlpgenJimmy | - | - | - | - | |
| ZZ incl ll np3 | 107111 | AlpgenJimmy | - | - | - | - | |
| WW qq lnue np0 | 110829 | AlpgenJimmy | - | - | - | - | |
| WW qq lnue np1 | 110830 | AlpgenJimmy | - | - | - | - | |
| WW qq lnue np2 | 110831 | AlpgenJimmy | - | - | - | - | |
| WW qq lnue np3 | 110832 | AlpgenJimmy | - | - | - | - | |
| Wenu np0 | 147025 | AlpgenPythia | - | - | - | - | |
| Wenu np1 | 147026 | AlpgenPythia | - | - | - | - | |
| Wenu np2 | 147027 | AlpgenPythia | - | - | - | - | |
| Wenu np3 | 147028 | AlpgenPythia | - | - | - | - | |
| Wenu np4 | 147029 | AlpgenPythia | - | - | - | - | |
| Wenu np5 incl | 147030 | AlpgenPythia | - | - | - | - | |
| Wmunu np0 | 147033 | AlpgenPythia | - | - | - | - | |
| Wmunu np1 | 147034 | AlpgenPythia | - | - | - | - | |
| Wmunu np2 | 147035 | AlpgenPythia | - | - | - | - | |
| Wmunu np3 | 147036 | AlpgenPythia | - | - | - | - | |
| Wmunu np4 | 147037 | AlpgenPythia | - | - | - | - | |
| Wmunu np5 incl | 147038 | AlpgenPythia | - | - | - | - | |
| Wtaunu np0 | 147041 | AlpgenPythia | - | - | - | - | |
| Wtaunu np1 | 147042 | AlpgenPythia | - | - | - | - | |
| Wtaunu np2 | 147043 | AlpgenPythia | - | - | - | - | |
| Wtaunu np3 | 147044 | AlpgenPythia | - | - | - | - | |
| Wtaunu np4 | 147045 | AlpgenPythia | - | - | - | - | |
| Wtaunu np5 incl | 147046 | AlpgenPythia | - | - | - | - | |
| Wcc Np0 | 200156 | AlpgenPythia | - | - | - | - | |
| Wcc Np1 | 200157 | AlpgenPythia | - | - | - | - | |
| Wcc Np2 | 200158 | AlpgenPythia | - | - | - | - | |
| Wcc Np3 incl | 200159 | AlpgenPythia | - | - | - | - | |
| Wc Np0 | 200056 | AlpgenPythia | - | - | - | - | |
| Wc Np1 | 200057 | AlpgenPythia | - | - | - | - | |
| Wc Np2 | 200058 | AlpgenPythia | - | - | - | - | |
| Wc Np3 | 200059 | AlpgenPythia | - | - | - | - | |
| Wc Np4 incl | 200060 | AlpgenPythia | - | - | - | - | |
| Wbb Np0 | 200156 | AlpgenPythia | - | - | - | - | |
| Wbb Np1 | 200257 | AlpgenPythia | - | - | - | - | |
| Wbb Np2 | 200258 | AlpgenPythia | - | - | - | - | |
| Wbb Np3 incl | 200259 | AlpgenPythia | - | - | - | - | |