parallel file validation

threading and multiprocessing

  • thread: stream of instructions in a process
  • multithreading: multiple threads running within a process
  • multiprocessing: multiple processes instantiated by the operating system (an error in one process cannot bring down another process)

  • multithreading versus multiprocessing: An error in one thread can bring down all the threads of a process, while an error in a process cannot (easily) bring down another process.

Python Global Interpreter Lock

The Python Global Interpreter Lock (GIL) is a mutual exclusion lock held by a programming language interpreter thread in order to prevent concurrent execution of code that is not thread-safe. Programs written in programming languages with a GIL can use separate processes to achieve parallelism.

The Python interpreter is not fully thread-safe and so a GIL is implemented in order to support multithreaded Python programs. Without the GIL, even many simple operations are problematic (e.g. incrementing the reference count of some common object simultaneously in two threads could result in a single increment, rather than the intended two increments). As a rule in Python, only the thread that has acquired the GIL may operate on Python objects or call API functions. The Python interpreter attempts to switch threads regularly (at an interval set by sys.setcheckinterval()) in order to emulate concurrency.
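The check interval can be inspected and tuned from within a program; a minimal sketch using the Python 2 sys module:

%CODE{"python"}%
import sys

# In Python 2, the interpreter releases the GIL periodically (by
# default, every 100 bytecode instructions) so that other threads
# may run.
print sys.getcheckinterval() # prints 100 by default
sys.setcheckinterval(1000)   # switch threads less frequently
%ENDCODE%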

Python multiprocessing

The Python module "multiprocessing" is a process-based 'threading' interface. It

  • supports the spawning of multiple processes,
  • offers local and remote concurrency (subprocesses are used instead of threads in order to avoid the GIL),
  • provides good control of the multiple processors of a computer and
  • provides acceptable control of multiple processors on multiple computers, for example, via SSH.

The multiprocessing module has many built-in options for building a parallel program, but the three most basic are the Process, Queue and Lock classes.

The Process is an abstraction that sets up another Python process, provides it code to run and gives the parent program a way to control its execution. Processes are spawned by creating a Process object and then calling its start() method.

%CODE{"python"}% from multiprocessing import Process

def function1(text): print text

if name == '__main__': process1 = Process(target=function1, args=('hello world',)) process1.start() process1.join() %ENDCODE%

In this example, the Process class is imported, a function for the process to run is defined and then a Process object is instantiated with that function as its target. The process is started; it runs and returns its result. The parent then waits for the process to complete using the join() method (without this, the child process can become a zombie process).

multiprocessing communications

Objects may be exchanged between processes through two main types of communication:

  • queues
  • pipes

a taste of Python multiprocessing: queues

Queues are simple-to-use objects that are essentially a thread- and process-safe first in, first out (FIFO) data structure. They can store any picklable (serialisable) Python object and are straightforward to use for sharing data between processes.

%CODE{"python"}% from multiprocessing import Queue

queue1 = Queue()

queue1.put('text') queue1.put(['data1', 1, {'data2': 2}])

queue1.get() # returns 'text' queue1.get() # returns ['data1', 1, {'data2': 2}] %ENDCODE%
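The example above puts and gets within a single process; in practice, a Queue is passed to child processes. A minimal sketch of sharing a Queue between a parent and a child process (the function and variable names are illustrative):

%CODE{"python"}%
from multiprocessing import Process, Queue

def worker(queue):
    # The child process places its result on the shared queue.
    queue.put('result from worker')

if __name__ == '__main__':
    queue1 = Queue()
    process1 = Process(target=worker, args=(queue1,))
    process1.start()
    print queue1.get() # prints 'result from worker'
    process1.join()
%ENDCODE%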

pipes

The Pipe() function returns a pair of connection objects connected by a pipe allowing for duplex (two way) communications. The two connection objects returned by Pipe() represent the two 'ends' of the pipe. Each end has methods such as send() and recv().

%CODE{"python"}% from multiprocessing import Process, Pipe

def function1(connection1): connection1.send([1, 2, 3]) connection1.close()

if name == '__main__': parent_connection1, child_connection1 = Pipe() p = Process(target=function1, args=(child_connection1,)) p.start() print parent_connection1.recv() # prints "[1, 2, 3]" p.join() %ENDCODE%

locks

Locks provide a straightforward way for code to claim exclusive access, blocking other processes from executing the protected code until the work is complete and the lock is released.

%CODE{"python"}% from multiprocessing import Lock

lock1 = Lock()

lock1.acquire() print 'I AM THE ONE!' lock1.release() %ENDCODE%
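A lock has an effect only when it is shared between processes; a minimal sketch, assuming the lock is used to prevent the output of several child processes from interleaving:

%CODE{"python"}%
from multiprocessing import Process, Lock

def printer(lock, text):
    # Claim the lock so that only one process prints at a time.
    lock.acquire()
    try:
        print text
    finally:
        lock.release()

if __name__ == '__main__':
    lock1 = Lock()
    processes = [Process(target=printer, args=(lock1, 'message %d' % i))
                 for i in range(3)]
    for process1 in processes:
        process1.start()
    for process1 in processes:
        process1.join()
%ENDCODE%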

simple programs

Using Processes, Queues and Locks, simple programs may be implemented. A rough design for such a program has a work queue (data to be processed in some way) and a result queue (answers produced from the data) used for communications. Processes are created, as required for performing the work, and then started; they may be tracked using a list or a Pool. After all work has been dispatched, the processes are stopped by telling them to join. Each process stores the result of its work in the result queue and this queue can be read after the child processes have exited, as in the sketch below.
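A minimal sketch of this pattern, assuming a simple squaring function as the "work" (all names here are illustrative):

%CODE{"python"}%
from multiprocessing import Process, Queue

def worker(work_queue, result_queue):
    # Consume items from the work queue until a sentinel (None) is
    # received, placing each answer on the result queue.
    while True:
        data = work_queue.get()
        if data is None:
            break
        result_queue.put(data * data)

if __name__ == '__main__':
    work_queue = Queue()
    result_queue = Queue()
    number_of_processes = 4
    work_items = range(20)
    for item in work_items:
        work_queue.put(item)
    # One sentinel per worker signals that there is no more work.
    for _ in range(number_of_processes):
        work_queue.put(None)
    processes = [Process(target=worker, args=(work_queue, result_queue))
                 for _ in range(number_of_processes)]
    for process1 in processes:
        process1.start()
    # Drain the result queue before joining to avoid blocking on
    # full queue buffers, then wait for the children to exit.
    results = [result_queue.get() for _ in range(len(work_items))]
    for process1 in processes:
        process1.join()
    print results
%ENDCODE%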

multiprocessing in Job Transforms

future: implementation of multiprocessing in Job Transforms (travelling to CERN soon to work more directly on the project)

Job Transforms file validation

Job Transforms allows most of the complexity of standard Athena jobs to be hidden and to be updated by experts as necessary. As part of managing the processing of a job, Job Transforms offers the ability to test and validate input and output files. This is because errors in file access can be difficult to diagnose in the online software and, once a file has been written, it is useful to check that it can be read back off the storage system.

The transforms should offer the ability to perform both fast checks (e.g., event count checks) and deep checks (full ROOT scans) of files. (As the value of such checks must be balanced against the cost of doing them, in practice ATLAS makes a fast check on input files and a deeper check of output files.)

The legacy method of validating files produced by Job Transforms is to complete file validations one by one, serially. Parallelisation of processes such as file validations offers a means of reducing running time and is useful for the purposes of AthenaMP.

Amdahl's law

Amdahl's law asserts that the speedup of a program using multiple processors in parallel computing is limited by the sequential fraction of the program.
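Formally, if $P$ is the fraction of a program that can be parallelised and $N$ is the number of processors, the achievable speedup is

\begin{equation}
S(N) = \frac{1}{(1 - P) + \frac{P}{N}},
\end{equation}

so that, even as $N \to \infty$, the speedup cannot exceed $1/(1 - P)$. For example, if 90\% of a validation workload parallelises ($P = 0.9$), the speedup is bounded by 10 regardless of the number of processors used.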

parallel job processor

The parallel job processor is a central part of the service task. It is, as its name suggests, the system that manages the processing of jobs in parallel. Core to its functioning is the Python multiprocessing module, which supports the spawning of processes using an interface similar to that of the threading module; in effect, multiprocessing offers an approach to bypassing the Python Global Interpreter Lock by using subprocesses instead of threads.

A non-exhaustive visual illustration of the parallel job processor and associated concepts is given in the figures below.

\begin{figure}[H]
\begin{center}
\includegraphics[width=\textwidth]{figures/1.jpg}
\end{center}
\caption{parallel job processor}
\label{figure:PJP_step_0}
\end{figure}

Figure~\ref{figure:PJP_step_0} illustrates the basic structure of the parallel job processor. Of particular note are the objects

\begin{itemize}
\item function,
\item Job,
\item JobGroup,
\item ParallelJobProcessor and
\item pool.
\end{itemize}

There are a number of steps involved in the typical usage and operation of the parallel job processor.

\subsection{Job and JobGroup definitions}

\begin{figure}[H]
\begin{center}
\includegraphics[width=\textwidth]{figures/2.jpg}
\end{center}
\caption{parallel job processor: Job and JobGroup definitions}
\label{figure:PJP_step_1}
\end{figure}

A Job for the parallel job processor consists of

\begin{itemize}
\item a work function,
\item its arguments and
\item other information (such as its timeout specification).
\end{itemize}
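As an illustration, a job group resembling the unit test of figure~\ref{figure:PJP_job_specification} might be specified as follows. This is a hedged sketch: the constructor keyword names used here are assumptions based on the description above, and the authoritative signatures are those in trfUtils.py.

%CODE{"python"}%
# A hedged sketch of specifying jobs for the parallel job processor.
# The keyword argument names are illustrative assumptions; see
# trfUtils.py for the actual interface.
import time
from PyJobTransforms.trfUtils import Job, JobGroup, ParallelJobProcessor

def helloWorld(sleepTime=1):
    time.sleep(sleepTime)
    return 'hello world'

def multiply(multiplicand1=0, multiplicand2=0):
    return multiplicand1 * multiplicand2

if __name__ == '__main__':
    job1 = Job(workFunction=helloWorld,
               workFunctionKeywordArguments={'sleepTime': 1},
               workFunctionTimeout=10)
    job2 = Job(workFunction=multiply,
               workFunctionKeywordArguments={'multiplicand1': 2,
                                             'multiplicand2': 3},
               workFunctionTimeout=10)
    jobGroup1 = JobGroup(name='test', jobs=[job1, job2])
    # Submission and result retrieval are shown as a single call
    # here for brevity.
    parallelJobProcessor1 = ParallelJobProcessor()
    results = parallelJobProcessor1.submit(jobSubmission=jobGroup1)
%ENDCODE%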

\subsection{submission}

\begin{figure}[H]
\begin{center}
\includegraphics[width=\textwidth]{figures/3.jpg}
\end{center}
\caption{parallel job processor: submission}
\label{figure:PJP_step_2}
\end{figure}

A Job or a JobGroup is submitted to the parallel job processor. The parallel job processor submits all jobs to the process pool it created on initialisation, recording the submission time.

\subsection{checking for results, timeouts and failures}

\begin{figure}[H]
\begin{center}
\includegraphics[width=\textwidth]{figures/4.jpg}
\end{center}
\caption{parallel job processor: checking for results, timeouts and failures}
\label{figure:PJP_step_3}
\end{figure}

Submissions are monitored for timeouts and failures, with exceptions being raised when appropriate. Timeouts are measured at the JobGroup level, rather than at the Job level.
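Such timeout checks can be built on the standard multiprocessing pool interface; a minimal sketch of the underlying mechanism, not the Job Transforms code itself:

%CODE{"python"}%
from multiprocessing import Pool, TimeoutError
import time

def work(sleepTime):
    time.sleep(sleepTime)
    return sleepTime

if __name__ == '__main__':
    pool1 = Pool(processes=2)
    result1 = pool1.apply_async(work, (10,))
    try:
        # Block for at most 1 second awaiting the result.
        print result1.get(timeout=1)
    except TimeoutError:
        print 'job timed out'
    pool1.terminate()
    pool1.join()
%ENDCODE%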

\subsection{getting results}

\begin{figure}[H]
\begin{center}
\includegraphics[width=\textwidth]{figures/5.jpg}
\end{center}
\caption{parallel job processor: getting results}
\label{figure:PJP_step_4}
\end{figure}

Results of jobs returned by the process pool are propagated to the Jobs and to the JobGroup as they become available.

\subsection{return results}

\begin{figure}[H]
\begin{center}
\includegraphics[width=\textwidth]{figures/6.jpg}
\end{center}
\caption{parallel job processor: return results}
\label{figure:PJP_step_5}
\end{figure}

Results of all jobs are stored in order in the results list of the JobGroup. This list is returned by the parallel job processor.

\subsection{usage}

\begin{figure}[H]
\begin{center}
\includegraphics[width=\textwidth]{figures/7.jpg}
\end{center}
\caption{parallel job processor: job specification in Python}
\label{figure:PJP_job_specification}
\end{figure}

Figure~\ref{figure:PJP_job_specification} depicts an example job group for the parallel job processor. In this case, it is a unit test for the system. Here, a job group of two jobs is defined. One of the jobs specifies a hello world function with a sleep time argument and the other specifies a multiplication function with multiplicand arguments. There is a timeout specified for each job.

\newpage

\noindent Figure~\ref{figure:PJP_terminal_output_1} illustrates example (abbreviated) parallel job processor terminal output (for the parallel validation of typical physics data).

\begin{figure}[H]
\begin{center}
\includegraphics[width=\textwidth]{figures/8.jpg}
\end{center}
\caption{parallel job processor: example terminal output}
\label{figure:PJP_terminal_output_1}
\end{figure}

\section{challenges}

The Job Transforms infrastructure was written before there were standardised ways of reliably parallelising processes in Python, so, naturally, there were challenges in adapting Job Transforms to parallelisation. One problem encountered involved communications between processes breaking down in a confusing, seemingly inconsistent way. Pools use certain signals to receive termination commands etc.~and Job Transforms was using the same signals. The solution in this case was to render Job Transforms silent while the parallel job processor is running. Another problem encountered was that certain objects in Job Transforms cannot be ``pickled'' (serialised for inter-process communications). Serialisation of objects for the purposes of communications is provided by the \href{http://docs.python.org/2/library/pickle.html}{\textcolor{black!100}{Python pickle module}}. Certain objects important to file validations cannot be pickled; specifically, the legacy validation functions were not workable. The solution was to write new validation functions. While this extended the service task somewhat, it proved to be the most obviously reliable and validated approach -- and was successful.
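A minimal illustration of the constraint (using a lambda function rather than the specific Job Transforms objects involved):

%CODE{"python"}%
import pickle

def function1():
    return 1

# Module-level functions pickle without problems.
pickle.dumps(function1)

# Lambda functions (and, in Python 2, bound methods) cannot be
# pickled and so cannot be passed through multiprocessing queues
# or pools.
try:
    pickle.dumps(lambda: 1)
except pickle.PicklingError:
    print 'lambda functions cannot be pickled'
%ENDCODE%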

\section{service task status}

The main activities in the service task were as follows:

\begin{itemize}
\item familiarisation with Job Transforms basic usage and code
\item starting documentation for new Job Transforms
\item general support (lots of coding tasks to support new development)
\item systematic investigation of parallel processing methods in Python, principally methods associated with the Python multiprocessing module
\item coded a multiprocessing code library
\item coded the \href{https://svnweb.cern.ch/trac/atlasoff/browser/Tools/PyJobTransforms/trunk/python/trfUtils.py}{\textcolor{black!100}{parallel job processor}}
\item coded \href{https://svnweb.cern.ch/trac/atlasoff/browser/Tools/PyJobTransforms/trunk/test/test_trfUtilsParallelJobProcessor.py}{\textcolor{black!100}{unit tests for the parallel job processor}} (validation of the parallel job processor)
\item coded new validation functions and associated exception handling procedures
\item high-level controls on how much of the CPU resources the parallel job processor may request
\item validation of final unit tests (almost complete)
\item paper \emph{ATLAS Job Transforms: A Data Driven Workflow Engine}~\cite{Stewart_1}
\end{itemize}

\section{characterising impact}

Figures~\ref{figure:impact_1},~\ref{figure:impact_2} and~\ref{figure:impact_3} depict tentative graphs of processing POOL files. Figure~\ref{figure:impact_1} illustrates the processing time against the number of files validated in parallel while figure~\ref{figure:impact_2} illustrates the ratio of processing time to the processing time for one file.

\begin{figure}[H]
\begin{center}
\includegraphics[width=\imageWidthASpecification]{figures/2014-04-10_1.eps}
\end{center}
\caption{parallel job processor: processing time against the number of files validated in parallel, showing a large efficiency improvement as a result of parallelisation}
\label{figure:impact_1}
\end{figure}

\begin{figure}[H]
\begin{center}
\includegraphics[width=\imageWidthASpecification]{figures/2014-04-10_2.eps}
\end{center}
\caption{parallel job processor: ratio of processing time to the processing time for one file}
\label{figure:impact_2}
\end{figure}

\noindent Figure~\ref{figure:impact_3} illustrates the performance for the unusual task of parallel validation of up to 40 POOL files.

\begin{figure}[H]
\begin{center}
\includegraphics[width=\imageWidthASpecification]{figures/2014-04-11_1.eps}
\end{center}
\caption{parallel job processor: parallel validation of up to 40 POOL files}
\label{figure:impact_3}
\end{figure}
