
Network Multiple Computers/Processors for Parallel Computing

An Alternative MPI with Object Oriented Design and Programming Style

support@udaparts.com
Updated on 05/16/2009


1.    Introduction

        Scientific computation places high demands on CPU power and is often time-consuming. It commonly requires many CPU cores, and even many networked computers, to solve a large and complicated problem in parallel and reduce wall-clock time. Two languages, FORTRAN and C/C++, are often employed for developing parallel computations; Java is sometimes used as well. Over the past decades, a number of technologies have been developed for this purpose, including OpenMP, MPI (Message Passing Interface)/OpenMPI, grid computing, and Charm++. An Internet search on these keywords will turn up many detailed and concise introductions.

        SocketPro was designed from the very beginning with request batching, non-blocking communication, and parallel computation in mind. It can easily be used not only for common commercial applications but also for scientific and engineering computing. This article comes with a simple sample demonstrating the basic steps of parallel computing with SocketPro in C++, C# and VB.NET.

        Sample code for C++, C# and VB.NET is located in the directory C:\Program Files\UDAParts\SocketPro\samples\MyMPI after SocketPro is installed on your machine. In addition to this sample, we also recommend tutorial four and the HTTP load-balance sample for further study.

2.    Numerical integration for π

        Consider the problem of computing the value of π with the numerical integration below:

        π = ∫₀¹ 4/(1 + x²) dx

        We can solve the integral numerically with rectangle (midpoint) integration. The basic idea is to fill the area under the curve with a series of narrow rectangles. As the width of the rectangles approaches zero, the sum of their areas approaches the true value of π. For an accurate result, we must divide the area into as many small rectangles as possible. We are going to divide the integration range into a set of sections and send them to different computing machines (real servers). One central machine (the client) will collect the partial integrals of these sections and sum them to obtain π.
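        Before moving to the distributed version, the whole computation can be sketched serially in plain C++ (independent of SocketPro; the function names mirror the sample's Compute request but are illustrative only):

```cpp
#include <cassert>
#include <cmath>

// Midpoint-rule integration of 4/(1 + x*x) over one section.
// dStart: left edge of the section; dStep: rectangle width;
// nNum: number of rectangles in the section.
double Compute(double dStart, double dStep, int nNum) {
    double dX = dStart + dStep / 2; // midpoint of the first rectangle
    double sum = 0.0;
    for (int n = 0; n < nNum; ++n) {
        sum += dStep * 4.0 / (1 + dX * dX);
        dX += dStep;
    }
    return sum;
}

// Serial stand-in for the client: split [0, 1] into nDivision sections
// of nNum rectangles each, then reduce the partial integrals into pi.
double ComputePi(int nDivision, int nNum) {
    double dStep = 1.0 / nNum / nDivision; // same formula as in the sample
    double dPi = 0.0;
    for (int n = 0; n < nDivision; ++n)
        dPi += Compute((double)n / nDivision, dStep, nNum);
    return dPi;
}
```

        In the distributed sample, each call to Compute becomes one job executed on a real server, and the summation loop becomes the client-side reduction.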

3.    Universal interface definition file

        To simplify development, SocketPro provides a tool, uidparser.exe, to quickly create client and server skeleton code from a given universal interface definition file. For details, see tutorial one inside the SocketPro package. The definition file for this sample is pi.uid, which contains the following simple code:

        [
                ServiceID = 1212
        ]
        CPPi
        {
                $double Compute(in double dStart, in double dStep, in int nNum);
        }

        The parameter dStart is the starting point of a portion of the integration. The input parameter dStep is the width of one tiny integration rectangle. The parameter nNum is the number of rectangles assigned to a machine. The request returns a portion of the integral as a double. Since this is a slow request, we put the character $ in front of it; the tool uidparser will then generate code that runs the computation in a worker thread automatically. You can find uidparser.exe in the directory ..\bin and create skeleton C++ client and server code by executing the following command at a command prompt:

        uidparser -FE:\uskt\PPi\pi.uid -L1

        Alternatively, you can generate C# and VB.NET skeleton code with the options -L0 and -L2, respectively.

4.    Implementation on real servers

        The code on the real servers is extremely simple. We only need to implement one function, Compute, as shown in the code snippet below:

        protected void Compute(double dStart, double dStep, int nNum, out double ComputeRtn)
        {
                int n100 = nNum / 100; //check for cancellation about 100 times per request
                double dX = dStart + dStep / 2; //midpoint of the first rectangle
                double dd = dStep * 4.0;
                ComputeRtn = 0.0;
                for (int n = 0; n < nNum; n++)
                {
                        ComputeRtn += dd / (1 + dX * dX);
                        dX += dStep; //advance to the midpoint of the next rectangle
                        if (n100 > 0 && ((n + 1) % n100) == 0)
                        {
                                if (IsCanceled || IsClosing())
                                        break;
                        }
                }
        }

        The code also periodically checks whether the client has sent a Cancel request or the connection is about to be closed, so that the for loop can break quickly instead of wasting CPU time.
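        The same cooperative-cancellation pattern can be sketched outside SocketPro with a shared atomic flag; here g_canceled is a hypothetical stand-in for the sample's IsCanceled/IsClosing checks:

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for SocketPro's IsCanceled/IsClosing checks:
// a worker polls this flag periodically rather than on every iteration.
std::atomic<bool> g_canceled(false);

// Returns the number of iterations actually performed before the loop
// noticed the cancellation flag.
int RunCancelable(int nNum) {
    int done = 0;
    int n100 = nNum / 100; // poll roughly 100 times per request
    for (int n = 0; n < nNum; ++n) {
        ++done; // stand-in for one rectangle of real work
        if (n100 > 0 && ((n + 1) % n100) == 0) {
            if (g_canceled.load(std::memory_order_relaxed))
                break; // quick exit, like the Compute loop above
        }
    }
    return done;
}
```

        Polling every n100 iterations keeps the overhead of the check negligible while still bounding how long a canceled request can keep running.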

5.    Parallel computing with load balancing and disaster recovery on the client side

        SocketPro supports parallel computation on either the client or the server side. First, we are going to show parallel computing on the client side. The picture below shows the scenario.

        [Figure: a client-side load balancer connected to four real servers A, B, C and D]

        The picture shows a load balancer on the client side connected with four real servers A, B, C and D. The real servers compute portions of π and send the results to the central load balancer.

        When you execute the sample load balancer on the client, you will see it running as in the picture below.

        [Screenshot: the sample load balancer running]

        When you shut down a real server application, or power off a real server for testing, you will see that SocketPro is able to recover the computation from the disaster. Now let us look at the code on the central load balancer.

        First of all, we derive a class from the class CSocketPoolEx, as shown below:

        class CPiParallel : CSocketPoolEx<CPPi>
        {
                private const int m_nDivision = 100;

                public double GetPi()
                {
                        lock (m_cs)
                        {
                                return m_dPi;
                        }
                }

                public int Progress
                {
                        get
                        {
                                lock (m_cs)
                                {
                                        return m_nDivision - JobManager.CountOfJobs;
                                }
                        }
                }

                protected override bool OnFailover(CPPi Handler, IJobContext JobContext)
                {
                        string str = "JobFail, JobId = " + JobContext.JobId;
                        str += ", Progress = " + Progress;
                        System.Diagnostics.Trace.WriteLine(str);

                        //this is called within a worker thread
                        m_dlg.BeginInvoke(m_dlg.UpdateProgress);
                        return true;
                }

                protected override void OnJobDone(CPPi Handler, IJobContext JobContext)
                {
                        string str = "JobDone, JobId = " + JobContext.JobId;
                        str += ", Progress = " + Progress;
                        System.Diagnostics.Trace.WriteLine(str);

                        m_dlg.BeginInvoke(m_dlg.UpdateProgress);
                }

                protected override void OnReturnedResultProcessed(CPPi Handler, IJobContext JobContext, short sRequestId)
                {
                        if (sRequestId == piConst.idComputeCPPi)
                        {
                                lock (m_cs)
                                {
                                        //called within a worker thread, similar to the MPI functions MPI_Reduce and MPI_Gather
                                        m_dPi += Handler.m_ComputeRtn;
                                }
                        }
                }

                public void PrepareAndExecuteJobs()
                {
                        int n;
                        double dStart;
                        int nNum = 10000000;
                        double dStep = 1.0 / nNum / m_nDivision;

                        //initialize member
                        m_dPi = 0.0;

                        //get an async handler
                        CPPi pi = (CPPi)JobManager.LockIdentity();
                        if (pi == null)
                                return;

                        //each job contains one task only, similar to the MPI functions MPI_Bcast and MPI_Scatter
                        for (n = 0; n < m_nDivision; n++)
                        {
                                dStart = (double)n / m_nDivision;
                                pi.ComputeAsyn(dStart, dStep, nNum);
                        }
                        JobManager.UnlockIdentity(pi);
                }

                public bool BuildConnections()
                {
                        int n;
                        const int Count = 5;
                        CConnectionContext[] pConnectionContext = new CConnectionContext[Count];
                        for (n = 0; n < Count; n++)
                                pConnectionContext[n] = new CConnectionContext();

                        //set connection contexts
                        pConnectionContext[0].m_strHost = "127.0.0.1";
                        pConnectionContext[1].m_strHost = "localhost";
                        pConnectionContext[2].m_strHost = "127.0.0.1";
                        pConnectionContext[3].m_strHost = "localhost";
                        pConnectionContext[4].m_strHost = "127.0.0.1";
                        for (n = 0; n < Count; n++)
                        {
                                pConnectionContext[n].m_nPort = 20901;
                                pConnectionContext[n].m_strPassword = "SocketPro";
                                pConnectionContext[n].m_strUID = "PassOne";
                                pConnectionContext[n].m_EncrytionMethod = tagEncryptionMethod.NoEncryption;
                                pConnectionContext[n].m_bZip = false;
                        }

                        //start socket pool with 2*3 USocket objects
                        return StartSocketPool(pConnectionContext, 2, 3);
                }

                private object m_cs = new object();
                public PPi.main m_dlg;

                //the following member is protected by the monitor m_cs
                private double m_dPi = 0.0;
        }

        SocketPro supports object-oriented design and programming with one client and many real servers. This is considerably different from MPI; however, SocketPro does support parallel computing in its own way through load balancing. SocketPro also exposes many callbacks, which you can use to show computation progress, as this sample does.

        The following major OpenMPI functions have counterparts in, or can easily be implemented with, SocketPro:

  • MPI_ABORT

  • MPI_BARRIER

  • MPI_BCAST

  • MPI_CANCEL

  • MPI_GATHER

  • MPI_RECV

  • MPI_REDUCE

  • MPI_SCATTER

  • MPI_SEND

  • MPI_WAIT

  • MPI_WAITALL

        Note that SocketPro does not have the MPI rank concept, because SocketPro was not written from the MPI specification at all.

6.    Key advantages of the SocketPro approach over MPI

        There are many advantages of SocketPro over MPI in parallel computing. Here is a list of the major ones.

  • SocketPro balances the load across real servers with different processing powers. As of its current specification, MPI does not support load balancing. When there are many real servers, they may have different processing powers; for example, some may have two CPU cores, while others may have two processors, each with more than eight cores. SocketPro automatically balances the load so that more powerful machines process more jobs or tasks.

  • SocketPro supports automatic disaster recovery without any coding on your part. It is common for a real server to go down unexpectedly because of a power failure, an unplugged cable or a server application crash. SocketPro is able to handle these disasters and recover from them automatically. If your real servers are located at different companies connected over the Internet, you must take such disasters into account. At the time of this writing, MPI is not able to handle them.

  • SocketPro development is simpler, with commercial-grade debugging tools. At this time, debugging tools for MPI are poor in comparison with commercial debuggers such as Microsoft Visual Studio. With SocketPro, you can take full advantage of these tools, some of which are free; for example, you can use Visual Studio Express (free) to develop parallel computing applications with SocketPro.

  • SocketPro favors OOD/OOP client/server development with a divide-and-conquer approach. MPI code is usually difficult for commercial application developers to understand. In contrast, SocketPro favors an object-oriented programming style: you can easily divide a large and complicated task into small, simple tasks and conquer them one by one. This approach is simpler and easier to implement; doing the same with MPI is relatively difficult.

  • SocketPro makes it easy to cache data on real servers in different ways and to integrate with other applications. When distributing tasks from one machine to others, you may want to cache a portion of the data in memory, in files or in a database on the real servers. This is very easy to do with SocketPro.

7.    Creating tasks and jobs manually and automatically

        SocketPro supports creating tasks and jobs either manually or automatically. The following code snippet shows how to manually create jobs, each containing one task.

        //manually divide a large task into m_nDivision sub-tasks
        int nTaskId;
        bool bSuc;
        short sRequestId = piConst.idComputeCPPi;
        CUQueue UQueue = new CUQueue();
        for (n = 0; n < m_nDivision; n++)
        {
                dStart = (double)n / m_nDivision;

                UQueue.Push(dStart);
                UQueue.Push(dStep);
                UQueue.Push(nNum);

                IJobContext jc = JobManager.CreateJob(this);
                nTaskId = jc.AddTask(sRequestId, UQueue.GetBuffer(), UQueue.GetSize());
                bSuc = JobManager.EnqueueJob(jc);
                Process();
                UQueue.SetSize(0);
        }

        The code is straightforward. At the beginning, we create an instance of CUQueue, which is used to serialize the request input parameters dStart, dStep and nNum. We then use JobManager to create a job context (jc), add a task by calling the job context's AddTask method, and queue the job by calling the JobManager method EnqueueJob. Finally, we call the method Process to distribute jobs whenever a real server is idle. In this example, there are 100 jobs in total.
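        For illustration only, the role CUQueue plays above, appending input parameters to a raw byte buffer in call order, can be sketched in plain C++ as follows (ByteQueue is a hypothetical stand-in, not SocketPro's actual CUQueue):

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// A minimal stand-in for CUQueue: parameters are appended to a raw
// byte buffer in call order and read back in the same order.
class ByteQueue {
    std::vector<uint8_t> m_buf;
    size_t m_pos = 0; // read cursor
public:
    template <typename T> void Push(const T& v) {
        const uint8_t* p = reinterpret_cast<const uint8_t*>(&v);
        m_buf.insert(m_buf.end(), p, p + sizeof(T));
    }
    template <typename T> T Pop() {
        T v;
        std::memcpy(&v, m_buf.data() + m_pos, sizeof(T));
        m_pos += sizeof(T);
        return v;
    }
    const uint8_t* GetBuffer() const { return m_buf.data(); }
    size_t GetSize() const { return m_buf.size(); }
    void SetSize(size_t n) { m_buf.resize(n); m_pos = 0; } // SetSize(0) resets
};
```

        The receiving side must pop the parameters in exactly the same order they were pushed, which is why the generated skeleton code handles serialization for you in the automatic approach below.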

        The above code is somewhat tedious and redundant, because the class CPiParallel already has the asynchronous handler CPPi with serialization logic, derived from the base class CRequestAsynHandlerBase. We can reuse that existing serialization logic to create tasks and jobs automatically. Here is the code snippet:

        //get an async handler
        CPPi pi = (CPPi)JobManager.LockIdentity();

        //each job contains one task only
        for (n = 0; n < m_nDivision; n++)
        {
                dStart = (double)n / m_nDivision;
                pi.ComputeAsyn(dStart, dStep, nNum);
        }
        JobManager.UnlockIdentity(pi);

        The above code starts by locking an existing handler. Afterwards, it uses that asynchronous handler to create tasks and jobs automatically. At the very end, it releases the previously locked handler. In this way, the code is simplified.

        The above snippet shows how to create jobs containing one task each. If you want to create a job with multiple tasks, you need to wrap all of its tasks between calls to the methods StartJob and EndJob. Here is a code snippet for demonstration:

        //a job containing two tasks
        for (n = 0; n < m_nDivision; n++)
        {
                pi.GetAttchedClientSocket().StartJob();
                dStart = (double)n / m_nDivision;
                pi.ComputeAsyn(dStart, dStep, nNum);
                n += 1;
                dStart = (double)n / m_nDivision;
                pi.ComputeAsyn(dStart, dStep, nNum);
                pi.GetAttchedClientSocket().EndJob();
        }

8.    The SocketPro load-balancing round-robin algorithm

        A load balancer may have one or more algorithms for distributing jobs and tasks onto different real servers. SocketPro's load balancer uses a round-robin algorithm to distribute jobs onto real servers, as the picture below shows.

        [Figure: a round-robin ring of identities, each holding a queue of jobs]

        The SocketPro round-robin algorithm depends on identities. The above load balancer has five identities: 0, 1, 2, 3 and 4. With SocketPro load balancing, identities are references to arbitrary objects in .NET and pointers in C++. Job sharing and stealing are done through these identities. Whenever a job is completed, SocketPro selects the next job to process from one of these identities. As the picture shows, an identity is a job queue containing many jobs, and each job may have one or many tasks. When an identity has no more jobs in its queue, it is automatically removed from the round-robin.

        Whenever you create a job by calling the method JobManager.CreateJob, you actually associate an identity with the job. Once the job is enqueued, the job manager checks whether that identity is already in the round-robin. If not, the job manager inserts the identity, with one job queued, into its round-robin; if the identity already exists, the job manager simply appends the job to that identity's queue.
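        For illustration, the identity round-robin described above can be sketched in plain C++ (a simplified stand-in, not SocketPro's actual implementation):

```cpp
#include <cassert>
#include <deque>
#include <list>
#include <utility>

// Each identity owns a FIFO queue of job ids; the scheduler cycles
// through identities, taking one job per turn, and an identity leaves
// the ring once its queue is empty.
class RoundRobin {
    std::list<std::pair<int, std::deque<int>>> m_ring; // (identity, job queue)
    std::list<std::pair<int, std::deque<int>>>::iterator m_next = m_ring.begin();
public:
    void EnqueueJob(int identity, int jobId) {
        for (auto& e : m_ring) {
            if (e.first == identity) { e.second.push_back(jobId); return; }
        }
        m_ring.push_back({identity, {jobId}}); // new identity joins the ring
        if (m_ring.size() == 1) m_next = m_ring.begin();
    }
    // Returns the next job id to dispatch, or -1 when every queue is empty.
    int NextJob() {
        if (m_ring.empty()) return -1;
        if (m_next == m_ring.end()) m_next = m_ring.begin(); // wrap around
        int jobId = m_next->second.front();
        m_next->second.pop_front();
        if (m_next->second.empty())
            m_next = m_ring.erase(m_next); // empty identity drops out
        else
            ++m_next;
        return jobId;
    }
};
```

        Cycling one job per identity per turn is what keeps any single identity from monopolizing the real servers while its queue drains.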

9.    Pitfalls with load balancing

        Parallel computing is harder than sequential computing; by its nature, it has more pitfalls, and SocketPro is no exception. Therefore, you should pay close attention to the following two pitfalls.

  • The first is that the last queued job is not necessarily completed last: the job completion order can differ from the job queue order. This sample contains trace code that shows job completion order by job identification number. This is understandable, because real servers may have different processing powers; even when all real servers have the same CPU power, they may be allocated different processing time slices.

  • The second is that a job originates from one asynchronous handler, but the returned results of its tasks may be processed through another asynchronous handler. That is to say, requests and their returned results may be handled by different handlers. Here is the reason: normally, a request is sent directly by an asynchronous handler through its attached client socket. With SocketPro load balancing, however, a request is initially placed into a memory queue by an asynchronous handler through JobManager. Later, when an idle real server is found, the request in the memory queue is sent through whichever client socket, and its attached handler, is connected to that idle server. Note that this client socket and handler may differ from those that originated the request.
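        Because results can arrive in any order and on any worker thread, the client-side reduction must be order-independent; summing partial results under a lock, as the sample's OnReturnedResultProcessed callback does, has this property. A minimal C++ sketch of the idea:

```cpp
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

// Partial results may arrive in any order, on any worker thread.
// Accumulating them under a mutex (as the sample's
// OnReturnedResultProcessed callback does with lock/m_cs) makes the
// final sum independent of completion order.
class Reducer {
    std::mutex m_cs;
    double m_dPi = 0.0;
public:
    void OnResult(double partial) {
        std::lock_guard<std::mutex> guard(m_cs);
        m_dPi += partial;
    }
    double Get() {
        std::lock_guard<std::mutex> guard(m_cs);
        return m_dPi;
    }
};

// Simulate out-of-order completion by delivering each partial result
// from its own thread.
double SumConcurrently(const std::vector<double>& parts) {
    Reducer r;
    std::vector<std::thread> workers;
    for (double p : parts)
        workers.emplace_back([&r, p] { r.OnResult(p); });
    for (auto& t : workers)
        t.join();
    return r.Get();
}
```

        Whatever order the threads run in, the accumulated sum is the same, which is exactly why the sample does not need to care which handler delivers which result.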

10.    SocketPro load balancing as a service on the server side

        The sections above detail how to use SocketPro load balancing on the client side. Alternatively, you can run SocketPro load balancing as a service on the server side, as the picture below shows.

        [Figure: internal and external clients connected to a central server-side load balancer, which distributes jobs to real servers]

        In this scenario, all internal and external clients access a central load balancer, which automatically distributes jobs to one of many real servers for actual processing. Each of these real servers may be a business-logic tier accessing back-end databases, files or other data stores.

        The sample client runs with the GUI interface shown below.

        [Screenshot: the sample client GUI]

11.    CClientPeer with an implementation of the required interface IPeerJobContext

        It is very easy to use SocketPro load balancing as a service on the server side. The first step is to derive a class from the base class CClientPeer. As in tutorial one, we need to implement the two required virtual functions OnFastRequestArrive and OnSlowRequestArrive. Because we don't process any requests at the central load balancer, these can simply be empty functions.

        Furthermore, the derived class must implement the interface IPeerJobContext. The interface has a number of functions that notify you when a job is created, a task is added or a job is enqueued from a peer socket object instance, as shown in the code snippet below.

        public class CPiPeer : CClientPeer, IPeerJobContext
        {
                protected override void OnFastRequestArrive(short sRequestID, int nLen)
                {
                        //intercept data inside m_UQueue, and modify it here if necessary
                }

                protected override int OnSlowRequestArrive(short sRequestID, int nLen)
                {
                        //intercept data inside m_UQueue, and modify it here if necessary
                        return 0;
                }

                private IJobManager m_JobManager;

                #region IPeerJobContext Members
                IJobManager IPeerJobContext.JobManager
                {
                        get
                        {
                                return m_JobManager;
                        }
                        set
                        {
                                m_JobManager = value;
                        }
                }

                void IPeerJobContext.OnAddingTask(IJobContext JobContext, short sRequestId)
                {
                }

                void IPeerJobContext.OnEnqueuingJob(IJobContext JobContext, short sRequestId)
                {
                }

                void IPeerJobContext.OnJobEnqueued(IJobContext JobContext, short sRequestId)
                {
                }

                void IPeerJobContext.OnJobJustCreated(IJobContext JobContext, short sRequestId)
                {
                }

                void IPeerJobContext.OnPeerDataSent(IJobContext JobContext, short sRequestId)
                {
                }

                bool IPeerJobContext.OnSendingPeerData(IJobContext JobContext, short sRequestId, CUQueue UQueue)
                {
                        //you can modify the data inside UQueue here if necessary
                        return true; //true: send the result data in UQueue on to the client peer; false: do not
                }

                void IPeerJobContext.OnTaskJustAdded(IJobContext JobContext, int nTaskId, short sRequestId)
                {
                }

                void IPeerJobContext.OnWaitable(IJobContext JobContext, int nTaskId, short sRequestId)
                {
                        //you can call JobContext.Wait() here as a barrier for results or something else.
                        //Pay close attention to main thread versus worker thread:
                        //in general, call JobContext.Wait() only within a worker thread, never the main thread.
                }
                #endregion
        }

        If you don't want to process requests at the central load balancer, the code is extremely simple. In this example, we have no particular requirement to pre-process requests at the central load balancer; however, you do need to pay attention to the comments. For a sample that processes special statuses, see tutorial four. The interface is useful if you want to extend the central load balancer with new features such as on-the-fly modification of requests and returned results, caching, and calling other services.

12.    Batching requests into one job on the client side

        If a job contains two or more tasks or requests in one job context from a client, you must call the methods StartJob and EndJob at the beginning and end, respectively. Otherwise, each task or request will be treated as a separate job, and the requests may be distributed to different real servers, which is not what you want. To make sure all requests in one batch are sent to a single real server, send them between calls to StartJob and EndJob, as shown in section 7 above.

13.    Further readings