SkelGIS  3.0
/home/helene/Documents/These/SkelGIS/SkelGIS_Library/SkelGIS_V3/skelgis/data_structures/DMatrix/dmatrix_impl.hpp
Go to the documentation of this file.
00001 /*! \file dmatrix_impl.hpp
00002  *  \brief Definitions of the distributed matrix implementation object. This class is not used by the user. Four template specializations are available as for DMatrix.
00003  */
00004 #ifndef DMATRIX_IMPL_H
00005 #define DMATRIX_IMPL_H
00006 
00007 #include "dmatrix.hpp"
00008 #include "../../util/utility.hpp"
00009 #include "../../util/mpi_.hpp"
00010 #include "../../util/communications.hpp"
00011 
00012 #include "../../iterators/DMatrix/iterator.hpp"
00013 #include "../../iterators/DMatrix/iterator_cont.hpp"
00014 #include "../../iterators/DMatrix/iterator_rev.hpp"
00015 #include "../../iterators/DMatrix/iterator_phb.hpp"
00016 #include "../../iterators/DMatrix/iterator_step.hpp"
00017 #include "../../iterators/DMatrix/iterator_line.hpp"
00018 
00019 #include<math.h>
00020 
00021 namespace skelgis{
00022 
00023   //================================================================================
00024   //!  base template Distributed Matrix Class
00025   /*!
00026     This template class defines the base distributed matrix. Every DMatrix_impl class inherit from this one.
00027     \tparam T is the type of data in the matrix
00028   */
00029   //-------------------------------------------------------------------------------
00030   template<class T> struct DMatrix_base
00031   //-------------------------------------------------------------------------------
00032   {
00033   public:
00034     //! data of the distributed matrix
00035     /*!
00036       In this table are stored the data of the distributed matrix.
00037     */
00038     T * data;
00039     //! Column and row positions for the current process in the general data distribution
00040     /*!
00041       The data distribution is made along height and/or width, each process is associated to a pair (col,row) in this data distribution.
00042       If the distribution is line=true, the col=0 for all processes.
00043     */
00044     int col,row;
00045     //! Total number of columns and rows in the general data distribution
00046     int cols,rows;
00047     //! Header defining the global matrix before parallel distribution
00048     HEADER head;
00049     //! Header defining the local matrix of the current process after data distribution
00050     HEADER loc_head;
00051 
00052     bool local; /*!< if the DMatrix is local to one process */
00053 
00054     protected:
00055     //! default constructor of the distributed matrix interface
00056     //-------------------------------------------------------------------------------
00057     DMatrix_base(){}
00058     //-------------------------------------------------------------------------------
00059     //! destructor of the distributed matrix
00060     //-------------------------------------------------------------------------------
00061     ~DMatrix_base(){}
00062     //-------------------------------------------------------------------------------
00063     //! To get the height and the width of the last column and the last row
00064     /*!
00065       Because the data division along height and width are not always divisible by the number of processes, variables to remind the size of the last row and of the last column has to be defined.
00066     */
00067     int remainderh,remainderw;
00068     //-------------------------------------------------------------------------------
00069   };
00070 
00071   //================================================================================
00072   //!  DMatrix_impl class
00073   /*!
00074     template of the distributed matrix implementation of SkelGIS. This is the first specialization of the DMatrix_impl.
00075     \tparam T is the type of data to store in the DMatrix_impl
00076     \tparam R is the overlap distance needed by the calculation, in other words, the size of the physical border needed
00077     \tparam line is the parallel distribution wanted, the default value is true, then the parallel distribution will be divided along height but not along width
00078   */
00079   //-------------------------------------------------------------------------------
00080   template<class T,int R,bool line=true> struct DMatrix_impl : public DMatrix_base<T>
00081   //-------------------------------------------------------------------------------
00082   {
00083   public:
00084 
00085     HEADER border_head; /*!< Header defining the current local matrix with overlap R */
00086 
00087   protected:
00088     int mpi_ranks[2]; /*!< table of size 2 for the neighbor processes to make MPI exchanges */
00089     unsigned int beginPhBLeft,beginPhBRight,beginPhBUp,beginPhBDown; /*!< begining indexes for the physical borders iterators */
00090     unsigned int endPhBLeft,endPhBRight,endPhBUp,endPhBDown; /*!< ending indexes for the physical borders iterators */
00091     //-------------------------------------------------------------------------------
00092     //! Creation of the ditributed matrix
00093     /*!
00094       Data distribution and construction of the parallel data structure.
00095     */
00096     //-------------------------------------------------------------------------------
00097     void create()
00098     //-------------------------------------------------------------------------------
00099     {
00100       if(this->local==false)
00101         {
00102           //division of the matrix in cols and rows
00103           this->cols = 1;
00104           this->rows = Mpi_::mpi_nb;
00105 
00106           this->row=Mpi_::mpi_rank;
00107           this->col=0;
00108       
00109           this->loc_head.width=this->head.width;
00110       
00111           this-> remainderh = 0;
00112           //if the height of the matrix can be divided by the number of rows
00113           if(this->head.height%this->rows==0)
00114             this->loc_head.height=floor(this->head.height/this->rows);
00115           //else we add one to the height of the submatrix and the last row will have less elements
00116           else
00117             {
00118               this->remainderh = this->head.height - floor(this->head.height/this->rows)*this->rows;
00119 
00120               if(this->row<this->remainderh)
00121                 this->loc_head.height=floor(this->head.height/this->rows)+1;
00122               else
00123                 this->loc_head.height=floor(this->head.height/this->rows);
00124             }
00125 
00126           this->loc_head.x=this->head.x;
00127           this->loc_head.y=this->head.y-this->row*this->loc_head.height*this->head.spacing;
00128           this->loc_head.spacing=this->head.spacing;
00129           this->loc_head.nodata=this->head.nodata;
00130         }
00131       else
00132         {
00133           this->loc_head.x = this->head.x;
00134           this->loc_head.y = this->head.y;
00135           this->loc_head.width = this->head.width;
00136           this->loc_head.height = this->head.height;
00137           this->loc_head.spacing = this->head.spacing;
00138           this->loc_head.nodata = this->head.nodata;
00139 
00140           this->col=0;this->row=0;
00141           this->cols=1;this->rows=1;
00142           this-> remainderh = 0;
00143         }
00144 
00145       this->border_head.x = this->loc_head.x - R*this->loc_head.spacing;
00146       this->border_head.y = this->loc_head.y - R*this->loc_head.spacing;
00147       this->border_head.width = this->loc_head.width + R*2;
00148       this->border_head.height = this->loc_head.height + R*2;
00149       
00150       this->data = new T[this->border_head.width*this->border_head.height];
00151     }
00152     //-------------------------------------------------------------------------------
00153     //! Values initialization of the elements of the DMatrix_impl
00154     /*!
00155       \param value is the value to set to the elements
00156     */
00157     //-------------------------------------------------------------------------------
00158     void initValues(T value)
00159     //-------------------------------------------------------------------------------
00160     {
00161       for(unsigned int i=0;i<this->border_head.width*this->border_head.height;i++)
00162         {
00163           this->data[i]=value;
00164         }
00165     }
00166     //-------------------------------------------------------------------------------
00167     //! Calculation of the physical border limits
00168     //-------------------------------------------------------------------------------
00169     void phBLimits()
00170     //-------------------------------------------------------------------------------
00171     {
00172       beginPhBLeft = R*this->border_head.width;
00173       beginPhBRight = (R+1)*this->border_head.width-R;
00174       endPhBLeft = this->border_head.width*(this->border_head.height-R-1)+R;
00175       endPhBRight = (R+1+this->loc_head.height-1)*this->border_head.width;
00176 
00177       if(Mpi_::mpi_rank==0)
00178         {
00179           beginPhBUp = R;
00180           endPhBUp = R*this->border_head.width-R;
00181         }
00182       else
00183         {
00184           beginPhBUp = 0;
00185           endPhBUp = 0;
00186         }
00187       if(Mpi_::mpi_rank==Mpi_::mpi_nb-1)
00188         {
00189           beginPhBDown = this->border_head.width*(this->loc_head.height+R)+R;
00190           endPhBDown = this->border_head.width*this->border_head.height-R;
00191         }
00192       else
00193         {
00194           beginPhBDown = 0;
00195           endPhBDown = 0;
00196         }
00197     }
00198     //-------------------------------------------------------------------------------
00199     //! Read the zone of the binary file concerned for the current MPI process
00200     /*!
00201       \param binFile is binary file to read
00202     */
00203     //-------------------------------------------------------------------------------
00204     void read(const char * binFile)
00205     //-------------------------------------------------------------------------------
00206     {
00207       //read the zone of the file for my own block of data
00208       std::ifstream f(binFile, std::ios::binary | std::ios::in);
00209 
00210       //move to the begining of the bloc of data concerned
00211       unsigned int mvline;
00212       if(Mpi_::mpi_rank<this->remainderh)
00213         mvline = sizeof(HEADER) + (this->row*this->loc_head.height*this->head.width)*sizeof(T);
00214       else
00215         mvline = sizeof(HEADER) + ((this->remainderh*(this->loc_head.height+1)+(this->row-this->remainderh)*this->loc_head.height)*this->head.width)*sizeof(T);
00216 
00217       f.seekg(mvline,std::ios::beg);
00218 
00219       //read by lines
00220       for(unsigned int j=0;j<this->loc_head.height;j++)
00221         {
00222           for(unsigned int k=0;k<this->loc_head.width;k++)
00223             {
00224               T val;
00225               unsigned int rank = R*this->border_head.width + j*this->border_head.width + k + R;
00226               f.read(reinterpret_cast<char*>(&val),sizeof(T));
00227               this->data[rank]=val;
00228             }
00229         }
00230     }
00231     //-------------------------------------------------------------------------------
00232     //! Get ranks
00233     /*!
00234       Get the ranks of the 2 neighbors for MPI exchanges
00235       \return 
00236       if the value is -55555 then there is no neighbor in this direction
00237       if the value is >0 it represent the MPI rank concerned by the exchange
00238     */
00239     //-------------------------------------------------------------------------------
00240     void getNeighborRanks()
00241     //-------------------------------------------------------------------------------
00242     {
00243       for(int i=0;i<2;i++)
00244         mpi_ranks[i] = -55555;
00245       //up
00246       if((this->row-1)>=0)
00247         mpi_ranks[0] = Mpi_::mpi_rank-1;
00248       //down
00249       if((this->row+1)<this->rows)
00250         mpi_ranks[1] = Mpi_::mpi_rank+1;
00251     }
00252     //-------------------------------------------------------------------------------
00253     //! Get the up border to send to neighbor processors
00254     /*!
00255       \return table of values to send
00256     */
00257     //-------------------------------------------------------------------------------
00258     T * getUpBorderToSend()
00259     //-------------------------------------------------------------------------------
00260     {
00261       T * result = new T[this->loc_head.width*R];
00262       for(int i=0;i<R;i++)
00263         {
00264           for(unsigned int j=0;j<this->loc_head.width;j++)
00265             {
00266               unsigned int r1 = i*this->loc_head.width+j;
00267               unsigned int r2 = (R*border_head.width + R) + i*border_head.width+j;
00268               result[r1] = this->data[r2];
00269             }
00270         }
00271       return result;
00272     }
00273     //-------------------------------------------------------------------------------
00274     //! Set the up border ghost cells with neighbor values from processors
00275     /*!
00276       \param values is the table values to fill ghost cells with
00277     */
00278     //-------------------------------------------------------------------------------
00279     void setUpBorder(T * values)
00280     //-------------------------------------------------------------------------------
00281     {
00282       for(int i=0;i<R;i++)
00283         {
00284           for(unsigned int j=0;j<this->loc_head.width;j++)
00285             this->data[(R) + i*border_head.width+j] = values[i*this->loc_head.width+j];
00286         }
00287     }
00288     //-------------------------------------------------------------------------------
00289     //! Get the down border to send to neighbor processors
00290     /*!
00291       \return table of values to send
00292     */
00293     //-------------------------------------------------------------------------------
00294     T * getDownBorderToSend()
00295     //-------------------------------------------------------------------------------
00296     {
00297       T * result = new T[this->loc_head.width*R];
00298       for(int i=0;i<R;i++)
00299         {
00300           for(unsigned int j=0;j<this->loc_head.width;j++)
00301             {
00302               unsigned int r1 = i*this->loc_head.width+j;
00303               unsigned int r2 = (border_head.height*border_head.width - 2*R*border_head.width + R) + i*border_head.width + j;
00304               result[r1] = this->data[r2];
00305             }
00306         }
00307       return result;
00308     }
00309     //-------------------------------------------------------------------------------
00310     //! Set the down border ghost cells with neighbor values from processors
00311     /*!
00312       \param values is the table values to fill ghost cells with
00313     */
00314     //-------------------------------------------------------------------------------
00315     void setDownBorder(T * values)
00316     //-------------------------------------------------------------------------------
00317     {
00318       for(int i=0;i<R;i++)
00319         {
00320           for(unsigned int j=0;j<this->loc_head.width;j++)
00321             this->data[(border_head.width*border_head.height - border_head.width*R + R) + i*border_head.width +j] = values[i*this->loc_head.width+j];
00322         }
00323     }
00324     //-------------------------------------------------------------------------------
00325     //! Get all neighbor right values for the element to a precise rank
00326     /*!
00327       \param rank is the rank at which the neighbor values are wanted
00328       \return table of neighbor values
00329     */
00330     //-------------------------------------------------------------------------------
00331     T * allRight(unsigned int rank)
00332     //-------------------------------------------------------------------------------
00333     {
00334       T * res = new T[R];
00335       unsigned int r = rank+1;
00336       for(int i=0;i<R;i++)
00337         {
00338           res[i] = this->data[r];
00339           r++;
00340         }
00341       return res;
00342     }
00343     //-------------------------------------------------------------------------------
00344     //! Get all neighbor left values for the element to a precise rank
00345     /*!
00346       \param rank is the rank at which the neighbor values are wanted
00347       \return table of neighbor values
00348     */
00349     //-------------------------------------------------------------------------------
00350     T * allLeft(unsigned int rank)
00351     //-------------------------------------------------------------------------------
00352     {
00353       T * res = new T[R];
00354       unsigned int r = rank-1;
00355       for(int i=0;i<R;i++)
00356         {
00357           res[i] = this->data[r];
00358           r--;
00359         }
00360       return res;
00361     }
00362     //-------------------------------------------------------------------------------
00363     //! Get all neighbor up values for the element to a precise rank
00364     /*!
00365       \param rank is the rank at which the neighbor values are wanted
00366       \return table of neighbor values
00367     */
00368     //-------------------------------------------------------------------------------
00369     T * allUp(unsigned int rank,unsigned int width)
00370     //-------------------------------------------------------------------------------
00371     {
00372       T * res = new T[R];
00373       unsigned int r = rank - width;
00374       for(int i=0;i<R;i++)
00375         {
00376           res[i] = this->data[r];
00377           r = r - width;
00378         }
00379       return res;
00380     }
00381     //-------------------------------------------------------------------------------
00382     //! Get all neighbor down values for the element to a precise rank
00383     /*!
00384       \param rank is the rank at which the neighbor values are wanted
00385       \return table of neighbor values
00386     */
00387     //-------------------------------------------------------------------------------
00388     T * allDown(unsigned int rank,unsigned int width)
00389     //-------------------------------------------------------------------------------
00390     {
00391       T * res = new T[R];
00392       unsigned int r = rank + width;
00393       for(int i=0;i<R;i++)
00394         {
00395           res[i] = this->data[r];
00396           r = r + width;
00397         }
00398       return res;
00399     }
00400     //-------------------------------------------------------------------------------
00401     //! Get all neighbor right down values for the element to a precise rank
00402     /*!
00403       \param rank is the rank at which the neighbor values are wanted
00404       \return table of neighbor values
00405     */
00406     //-------------------------------------------------------------------------------
00407     T * allRightDown(unsigned int rank,unsigned int width)
00408     //-------------------------------------------------------------------------------
00409     {
00410       T * res = new T[R];
00411       unsigned int r = rank + width + 1;
00412       for(int i=0;i<R;i++)
00413         {
00414           res[i] = this->data[r];
00415           r = r + width + 1;
00416         }
00417       return res;
00418     }
00419     //-------------------------------------------------------------------------------
00420     //! Get all neighbor left down values for the element to a precise rank
00421     /*!
00422       \param rank is the rank at which the neighbor values are wanted
00423       \return table of neighbor values
00424     */
00425     //-------------------------------------------------------------------------------
00426     T * allLeftDown(unsigned int rank,unsigned int width)
00427     //-------------------------------------------------------------------------------
00428     {
00429       T * res = new T[R];
00430       unsigned int r = rank + width -1 ;
00431       for(int i=0;i<R;i++)
00432         {
00433           res[i] = this->data[r];
00434           r = r + width -1 ;
00435         }
00436       return res;
00437     }
00438     //-------------------------------------------------------------------------------
00439     //! Get all neighbor right up values for the element to a precise rank
00440     /*!
00441       \param rank is the rank at which the neighbor values are wanted
00442       \return table of neighbor values
00443     */
00444     //-------------------------------------------------------------------------------
00445     T * allRightUp(unsigned int rank,unsigned int width)
00446     //-------------------------------------------------------------------------------
00447     {
00448       T * res = new T[R];
00449       unsigned int r = rank - width + 1;
00450       for(int i=0;i<R;i++)
00451         {
00452           res[i] = this->data[r];
00453           r = r - width + 1;
00454         }
00455       return res;
00456     }
00457     //-------------------------------------------------------------------------------
00458     //! Get all neighbor left up values for the element to a precise rank
00459     /*!
00460       \param rank is the rank at which the neighbor values are wanted
00461       \return table of neighbor values
00462     */
00463     //-------------------------------------------------------------------------------
00464     T * allLeftUp(unsigned int rank,unsigned int width)
00465     //-------------------------------------------------------------------------------
00466     {
00467       T * res = new T[R];
00468       unsigned int r = rank - width - 1;
00469       for(int i=0;i<R;i++)
00470         {
00471           res[i] = this->data[r];
00472           r = r - width - 1;
00473         }
00474       return res;
00475     }
00476     //-------------------------------------------------------------------------------
00477     //! Get the nearest neighbor right value for the element to a precise rank
00478     /*!
00479       \param rank is the rank at which the neighbor value is wanted
00480       \return the neighbor value
00481     */
00482     //-------------------------------------------------------------------------------
00483     T right(unsigned int rank)
00484     //-------------------------------------------------------------------------------
00485     {
00486       unsigned int r = rank + 1;
00487       return this->data[r];
00488     }
00489     //-------------------------------------------------------------------------------
00490     //! Get the nearest neighbor left value for the element to a precise rank
00491     /*!
00492       \param rank is the rank at which the neighbor value is wanted
00493       \return the neighbor value
00494     */
00495     //-------------------------------------------------------------------------------
00496     T left(unsigned int rank)
00497     //-------------------------------------------------------------------------------
00498     {
00499       unsigned int r = rank - 1;
00500       return this->data[r];
00501     }
00502     //-------------------------------------------------------------------------------
00503     //! Get the nearest neighbor up value for the element to a precise rank
00504     /*!
00505       \param rank is the rank at which the neighbor value is wanted
00506       \return the neighbor value
00507     */
00508     //-------------------------------------------------------------------------------
00509     T up(unsigned int rank,unsigned int width)
00510     //-------------------------------------------------------------------------------
00511     {
00512       unsigned int r = rank - width;
00513       return this->data[r];
00514     }
00515     //-------------------------------------------------------------------------------
00516     //! Get the nearest neighbor down value for the element to a precise rank
00517     /*!
00518       \param rank is the rank at which the neighbor value is wanted
00519       \return the neighbor value
00520     */
00521     //-------------------------------------------------------------------------------
00522     T down(unsigned int rank,unsigned int width)
00523     //-------------------------------------------------------------------------------
00524     {
00525       unsigned int r = rank + width;
00526       return this->data[r];
00527     }
00528     //-------------------------------------------------------------------------------
00529     //! Get the nearest neighbor right down value for the element to a precise rank
00530     /*!
00531       \param rank is the rank at which the neighbor value is wanted
00532       \return the neighbor value
00533     */
00534     //-------------------------------------------------------------------------------
00535     T rightDown(unsigned int rank,unsigned int width)
00536     //-------------------------------------------------------------------------------
00537     {
00538       unsigned int r = rank + width +1;
00539       return this->data[r];
00540     }
00541     //-------------------------------------------------------------------------------
00542     //! Get the nearest neighbor left down value for the element to a precise rank
00543     /*!
00544       \param rank is the rank at which the neighbor value is wanted
00545       \return the neighbor value
00546     */
00547     //-------------------------------------------------------------------------------
00548     T leftDown(unsigned int rank,unsigned int width)
00549     //-------------------------------------------------------------------------------
00550     {
00551       unsigned int r = rank + width -1;
00552       return this->data[r];
00553     }
00554     //-------------------------------------------------------------------------------
00555     //! Get the nearest neighbor right up value for the element to a precise rank
00556     /*!
00557       \param rank is the rank at which the neighbor value is wanted
00558       \return the neighbor value
00559     */
00560     //-------------------------------------------------------------------------------
00561     T rightUp(unsigned int rank,unsigned int width)
00562     //-------------------------------------------------------------------------------
00563     {
00564       unsigned int r = rank - width +1;
00565       return this->data[r];
00566     }
00567     //-------------------------------------------------------------------------------
00568     //! Get the nearest neighbor left up value for the element to a precise rank
00569     /*!
00570       \param rank is the rank at which the neighbor value is wanted
00571       \return the neighbor value
00572     */
00573     //-------------------------------------------------------------------------------
00574     T leftUp(unsigned int rank,unsigned int width)
00575     //-------------------------------------------------------------------------------
00576     {
00577       unsigned int r = rank - width -1;
00578       return this->data[r];
00579     }
00580     //-------------------------------------------------------------------------------
00581     //! Get all values along X axe for the element to a precise rank
00582     /*!
00583       Equivalent to allLeft and allRight with a single call.
00584       \param rank is the rank at which the neighbor value is wanted
00585       \return a table of all neighbor values along X
00586     */
00587     //-------------------------------------------------------------------------------
00588     T * allX(unsigned int rank)
00589     //-------------------------------------------------------------------------------
00590     {
00591       T * res = new T[R*2];
00592       unsigned int b = rank - R;
00593       for(int i=0;i<R;i++)
00594         {
00595           res[i] = this->data[b];
00596           b++;
00597         }
00598       b++;
00599       for(int i=R;i<2*R;i++)
00600         {
00601           res[i] = this->data[b];
00602           b++;
00603         }
00604       return res;
00605     }
00606     //-------------------------------------------------------------------------------
00607     //! Get all values along X axe for the element to a precise rank
00608     /*!
00609       Equivalent to allLeft and allRight with a single call.
00610       \param rank is the rank at which the neighbor value is wanted
00611       \param result table of all neighbor values along X
00612     */
00613     //-------------------------------------------------------------------------------
00614     void allX(unsigned int rank, T * t)
00615     //-------------------------------------------------------------------------------
00616     {
00617       unsigned int b = rank - R;
00618       for(int i=0;i<R;i++)
00619         {
00620           t[i] = this->data[b];
00621           b++;
00622         }
00623       b++;
00624       for(int i=R;i<2*R;i++)
00625         {
00626           t[i] = this->data[b];
00627           b++;
00628         }
00629     }
00630     //-------------------------------------------------------------------------------
00631     //! Get all values along Y axe for the element to a precise rank
00632     /*!
00633       Equivalent to allDown and allUp with a single call.
00634       \param rank is the rank at which the neighbor value is wanted
00635       \return a table of all neighbor values along X
00636     */
00637     //-------------------------------------------------------------------------------
00638     T * allY(unsigned int rank,unsigned int width)
00639     //-------------------------------------------------------------------------------
00640     {
00641       T * res = new T[R*2];
00642       unsigned int b = rank - R*width;
00643 
00644       for(int i=0;i<R;i++)
00645         {
00646           res[i] = this->data[b];
00647           b = b + width;
00648         }
00649       b = b + width;
00650       for(int i=R;i<2*R;i++)
00651         {
00652           res[i] = this->data[b];
00653           b = b + width;
00654         }
00655       return res;
00656     }
00657     //-------------------------------------------------------------------------------
00658     //! Get all values along Y axe for the element to a precise rank
00659     /*!
00660       Equivalent to allDown and allUp with a single call.
00661       \param rank is the rank at which the neighbor value is wanted
00662       \param result table of all neighbor values along X
00663     */
00664     //-------------------------------------------------------------------------------
00665     void allY(unsigned int rank,unsigned int width, T * t)
00666     //-------------------------------------------------------------------------------
00667     {
00668       unsigned int b = rank - R*width;
00669 
00670       for(int i=0;i<R;i++)
00671         {
00672           t[i] = this->data[b];
00673           b = b + width;
00674         }
00675       b = b + width;
00676       for(int i=R;i<2*R;i++)
00677         {
00678           t[i] = this->data[b];
00679           b = b + width;
00680         }
00681     }
00682     //-------------------------------------------------------------------------------
00683     //! Get all 8 directions neighbor values for the element to a precise rank
00684     /*!
00685       Equivalent to leftUp, up, rightUp, right, rightDown, down, leftDown and left with a single call.
00686       \param rank is the rank at which the neighbor value is wanted
00687       \return a table of all 8 directions neighbor values
00688     */
00689     //-------------------------------------------------------------------------------
00690     T * eight(unsigned int rank,unsigned int width)
00691     //-------------------------------------------------------------------------------
00692     {
00693       T * res = new T[8];
00694       res[0] = leftUp(rank,width);
00695       res[1] = up(rank,width);
00696       res[2] = rightUp(rank,width);
00697       res[3] = right(rank);
00698       res[4] = rightDown(rank,width);
00699       res[5] = down(rank,width);
00700       res[6] = leftDown(rank,width);
00701       res[7] = left(rank);
00702       return res;
00703     }
00704     //-------------------------------------------------------------------------------
00705     //! Get all 4 directions neighbor values for the element to a precise rank
00706     /*!
00707       Equivalent to up, right, down and left with a single call.
00708       \param rank is the rank at which the neighbor value is wanted
00709       \return a table of all 8 directions neighbor values
00710     */
00711     //-------------------------------------------------------------------------------
00712     T * four(unsigned int rank,unsigned int width)
00713     //-------------------------------------------------------------------------------
00714     {
00715       T * res = new T[8];
00716       res[0] = up(rank,width);
00717       res[1] = right(rank);
00718       res[2] = down(rank,width);
00719       res[3] = left(rank);
00720       return res;
00721     }
00722     //-------------------------------------------------------------------------------
00723     //! Get the nearest right neighbor value inside the domain to a precise rank
00724     /*!
00725       This work for every size of R and will always give the nearest right inside-domain value.
00726       \param rank is the rank at which the neighbor value is wanted
00727       \return the neighbor value
00728     */
00729     //-------------------------------------------------------------------------------
00730     T inright(unsigned int rank)
00731     //-------------------------------------------------------------------------------
00732     {
00733       unsigned int r = rank + R;
00734       return this->data[r];
00735     }
00736     //-------------------------------------------------------------------------------
00737     //! Get the nearest left neighbor value inside the domain to a precise rank
00738     /*!
00739       This work for every size of R and will always give the nearest left inside-domain value.
00740       \param rank is the rank at which the neighbor value is wanted
00741       \return the neighbor value
00742     */
00743     //-------------------------------------------------------------------------------
00744     T inleft(unsigned int rank)
00745     //-------------------------------------------------------------------------------
00746     {
00747       unsigned int r = rank - R;
00748       return this->data[r];
00749     }
00750     //-------------------------------------------------------------------------------
00751     //! Get the nearest up neighbor value inside the domain to a precise rank
00752     /*!
00753       This work for every size of R and will always give the nearest up inside-domain value.
00754       \param rank is the rank at which the neighbor value is wanted
00755       \return the neighbor value
00756     */
00757     //-------------------------------------------------------------------------------
00758     T inup(unsigned int rank,unsigned int width)
00759     //-------------------------------------------------------------------------------
00760     {
00761       unsigned int r = rank - R*width;
00762       return this->data[r];
00763     }
00764     //-------------------------------------------------------------------------------
00765     //! Get the nearest down neighbor value inside the domain to a precise rank
00766     /*!
00767       This work for every size of R and will always give the nearest down inside-domain value.
00768       \param rank is the rank at which the neighbor value is wanted
00769       \return the neighbor value
00770     */
00771     //-------------------------------------------------------------------------------
00772     T indown(unsigned int rank,unsigned int width)
00773     //-------------------------------------------------------------------------------
00774     {
00775       unsigned int r = rank + R*width;
00776       return this->data[r];
00777     }
00778     //-------------------------------------------------------------------------------
00779     //! Get the indexes associated to a precise rank
00780     /*!
00781       \param rank is the rank at which the indexes values are wanted
00782       \param col is the column return value
00783       \param li is the line return value
00784     */
00785     //-------------------------------------------------------------------------------
00786     void indexesIt(unsigned int rank,int &col,int &li)
00787     //-------------------------------------------------------------------------------
00788     {
00789       unsigned int relative_rank = rank - R*border_head.width - (floor(rank/border_head.width)-R)*R*2 - R;
00790       li = floor(relative_rank/this->loc_head.width);
00791       col = relative_rank%this->loc_head.width;
00792     }
00793     //-------------------------------------------------------------------------------
00794 
00795   public:
00796     //! default constructor of the distributed matrix interface
00797     //-------------------------------------------------------------------------------
00798     DMatrix_impl() : DMatrix_base<T>(){}
00799     //-------------------------------------------------------------------------------
00800     //! constructor of the distributed matrix
00801     /*!
00802       This constructor will simply allocate the good space.
00803       \param h is the header to use to build the matrix
00804       \param value is the default value to put in the matrix
00805       \param loc is a boolean value to indicate if the DMatrix_impl has to stay local to one MPI process (used in ApplyReduction).
00806     */
00807     //-------------------------------------------------------------------------------
00808     DMatrix_impl(HEADER h,const T value, bool loc=false) : DMatrix_base<T>()
00809     //-------------------------------------------------------------------------------
00810     {
00811       this->head=h;
00812       this->local=loc;
00813       create();
00814       initValues(value);
00815       getNeighborRanks();
00816       phBLimits();
00817     }
00818     //-------------------------------------------------------------------------------
00819     //! constructor of the distributed matrix
00820     /*!
00821       The constructor evaluate how to divide the matrix in submatrice and do the work.
00822       \param binFile is the path of the binary file you want to work on
00823       \param loc is a boolean value to indicate if the DMatrix_impl has to stay local to one MPI process (used in ApplyReduction).
00824     */
00825     //-------------------------------------------------------------------------------
00826     DMatrix_impl(const char * binFile, bool loc=false) : DMatrix_base<T>()
00827     //-------------------------------------------------------------------------------
00828     {
00829       this->head = Header::read(binFile);
00830       this->local=loc;
00831       create();
00832       initValues(0);
00833       read(binFile);
00834       getNeighborRanks();
00835       phBLimits();
00836     }
00837     //-------------------------------------------------------------------------------
00838     //! destructor of the distributed matrix
00839     //-------------------------------------------------------------------------------
00840     ~DMatrix_impl()
00841     //-------------------------------------------------------------------------------
00842     {
00843       if(NULL!=this->data)
00844         {
00845           delete [] this->data;
00846           this->data = NULL;
00847         }
00848     }
00849     //-------------------------------------------------------------------------------
00850     //! Set value in the global middle
00851     /*!
00852       \param val is the value to affect
00853     */
00854     //-------------------------------------------------------------------------------
00855     void setGlobalMiddleValue(T val)
00856     //-------------------------------------------------------------------------------
00857     {
00858       if(this->row!=this->rows-1 || this->rows==1)
00859         {
00860           unsigned int middle_h = floor(this->head.height/2)-1;
00861           if(this->loc_head.height*this->row<=middle_h && this->loc_head.height*(this->row+1)>middle_h)
00862             {
00863               unsigned int j_loc = floor(this->head.width/2)-1;
00864               unsigned int i_loc = middle_h-this->row*this->loc_head.height;
00865               unsigned int rank = R*border_head.width +  i_loc*border_head.width + j_loc + R;
00866               this->data[rank] = val;
00867             }
00868         }
00869     }
00870     //-------------------------------------------------------------------------------
00871     //! to set all the values of the physical border of the matrix
00872     /*!
00873       \param val is the value to set
00874     */
00875     //-------------------------------------------------------------------------------
00876     inline void setPhysicalBorder(T val)
00877     {
00878       iterator_phb_left<T,R> left =begin_phb_left();
00879       iterator_phb_left<T,R> leftEnd = end_phb_left();
00880       for(left;left<leftEnd;++left)
00881         this->data[left._rank] = val;
00882       iterator_phb_right<T,R> right =begin_phb_right();
00883       iterator_phb_right<T,R> rightEnd = end_phb_right();
00884       for(right;right<rightEnd;++right)
00885         this->data[right._rank] = val;
00886       iterator_phb_up<T,R> up =begin_phb_up();
00887       iterator_phb_up<T,R> upEnd = end_phb_up();
00888       for(up;up<upEnd;++up)
00889         this->data[up._rank] = val;
00890       iterator_phb_down<T,R> down =begin_phb_down();
00891       iterator_phb_down<T,R> downEnd = end_phb_down();
00892       for(down;down<downEnd;++down)
00893         this->data[down._rank] = val;
00894     }
00895     //-------------------------------------------------------------------------------
00896     //! to set all the values on the right of physical border of the matrix
00897     /*!
00898       \param val is the value to set
00899     */
00900     //-------------------------------------------------------------------------------
00901     inline void setRightPhysicalBorder(T val)
00902     {
00903       iterator_phb_right<T,R> it =begin_phb_right();
00904       iterator_phb_right<T,R> itEnd = end_phb_right();
00905       for(it;it<itEnd;++it)
00906         this->data[it._rank] = val;
00907     }
00908     //-------------------------------------------------------------------------------
00909     //! to set all the values on the left of physical border of the matrix
00910     /*!
00911       \param val is the value to set
00912     */
00913     //-------------------------------------------------------------------------------
00914     inline void setLeftPhysicalBorder(T val)
00915     {
00916       iterator_phb_left<T,R> it =begin_phb_left();
00917       iterator_phb_left<T,R> itEnd = end_phb_left();
00918       for(it;it<itEnd;++it)
00919         this->data[it._rank] = val;
00920     }
00921     //-------------------------------------------------------------------------------
00922     //! to set all the values on the up of physical border of the matrix
00923     /*!
00924       \param val is the value to set
00925     */
00926     //-------------------------------------------------------------------------------
00927     inline void setUpPhysicalBorder(T val)
00928     {
00929       iterator_phb_up<T,R> it =begin_phb_up();
00930       iterator_phb_up<T,R> itEnd = end_phb_up();
00931       for(it;it<itEnd;++it)
00932         this->data[it._rank] = val;
00933     }
00934     //-------------------------------------------------------------------------------
00935     //! to set all the values on the down of physical border of the matrix
00936     /*!
00937       \param val is the value to set
00938     */
00939     //-------------------------------------------------------------------------------
00940     void setDownPhysicalBorder(T val)
00941     {
00942       iterator_phb_down<T,R> it =begin_phb_down();
00943       iterator_phb_down<T,R> itEnd = end_phb_down();
00944       for(it;it<itEnd;++it)
00945         this->data[it._rank] = val;
00946     }
00947     //-------------------------------------------------------------------------------
00948     //! Get the beginning iterator of the DMatrix (on the first element of the DMatrix)
00949     /*!
00950       This method exists for each type of iterator in SkelGIS.
00951       \return the iterator of the first element
00952     */
00953     //-------------------------------------------------------------------------------
00954     inline iterator<T,R> begin(){return iterator<T,R>(R*this->border_head.width+R,this->border_head.width);}
00955     inline iterator_cont<T,R> begin_cont(){return iterator_cont<T,R>(R*this->border_head.width+R,this->border_head.width);}
00956     inline iterator_rev<T,R> begin_rev(){return iterator_rev<T,R>((this->border_head.width*this->border_head.height)-1-R*this->border_head.width-R,this->border_head.width);}
00957     inline iterator_step<T,R> begin_step(int step,int nb){return iterator_step<T,R>(R*this->border_head.width+R,this->border_head.width,step,nb);}
00958     inline iterator_phb_left<T,R> begin_phb_left(){return iterator_phb_left<T,R>(beginPhBLeft,this->border_head.width);}
00959     inline iterator_phb_right<T,R> begin_phb_right(){return iterator_phb_right<T,R>(beginPhBRight,this->border_head.width);}
00960     inline iterator_phb_up<T,R> begin_phb_up(){return iterator_phb_up<T,R>(beginPhBUp,this->border_head.width);}
00961     inline iterator_phb_down<T,R> begin_phb_down(){return iterator_phb_down<T,R>(beginPhBDown,this->border_head.width);}
00962     inline iterator_line<T,R> begin_line(){return iterator_line<T,R>(R*this->border_head.width+R,this->border_head.width);}
00963     //-------------------------------------------------------------------------------
00964     //! Get the ending iterator of the DMatrix (on the last element of the DMatrix)
00965     /*!
00966       This method exists for each type of iterator in SkelGIS.
00967       \return the iterator of the last element
00968     */
00969     //-------------------------------------------------------------------------------
00970     inline iterator<T,R> end(){return iterator<T,R>((this->border_head.width*this->border_head.height)-1-R*this->border_head.width-R,this->border_head.width);}
00971     inline iterator_cont<T,R> end_cont(){return iterator_cont<T,R>((this->border_head.width*this->border_head.height)-1-R*this->border_head.width-R,this->border_head.width);}
00972     inline iterator_rev<T,R> end_rev(){return iterator_rev<T,R>(R*this->border_head.width+R,this->border_head.width);}
00973     inline iterator_step<T,R> end_step(){return iterator_step<T,R>((this->border_head.width*this->border_head.height)-1-R*this->border_head.width-R,this->border_head.width);}
00974     inline iterator_phb_left<T,R> end_phb_left(){return iterator_phb_left<T,R>(endPhBLeft,this->border_head.width);}
00975     inline iterator_phb_right<T,R> end_phb_right(){return iterator_phb_right<T,R>(endPhBRight,this->border_head.width);}
00976     inline iterator_phb_up<T,R> end_phb_up(){return iterator_phb_up<T,R>(endPhBUp,this->border_head.width);}
00977     inline iterator_phb_down<T,R> end_phb_down(){return iterator_phb_down<T,R>(endPhBDown,this->border_head.width);}
00978     inline iterator_line<T,R> end_line(){return iterator_line<T,R>((this->border_head.width*this->border_head.height)-(R+1)*this->border_head.width+R,this->border_head.width);}
00979     //-------------------------------------------------------------------------------
00980     //! get the iterator on the matrix at position (x,y)
00981     /*!
00982       \param x is row index
00983       \pram y is column index
00984       \return returns the iterator at position (x,y)
00985     */
00986     iterator<T,R> getIterator(int col,int li)
00987     //-------------------------------------------------------------------------------
00988     {
00989       if(NULL!=this->data)
00990         {
00991           unsigned int rank = R*border_head.width + R + li*border_head.width+col;
00992           return iterator<T,R>(rank,this->border_head.width);
00993         }
00994     }
00995     //-------------------------------------------------------------------------------
00996     //! get the iterator contiguous on the matrix at position (x,y)
00997     /*!
00998       \param x is row index
00999       \pram y is column index
01000       \return returns the iterator at position (x,y)
01001     */
01002     iterator_cont<T,R> getIterator_cont(int col,int li)
01003     //-------------------------------------------------------------------------------
01004     {
01005       if(NULL!=this->data)
01006         {
01007           unsigned int rank = R*border_head.width + R + li*border_head.width+col;
01008           return iterator_cont<T,R>(rank,this->border_head.width);
01009         }
01010     }
01011     //-------------------------------------------------------------------------------
01012     //! get the iterator reverse on the matrix at position (x,y)
01013     /*!
01014       \param x is row index
01015       \pram y is column index
01016       \return returns the iterator at position (x,y)
01017     */
01018     iterator_rev<T,R> getIterator_rev(int col,int li)
01019     //-------------------------------------------------------------------------------
01020     {
01021       if(NULL!=this->data)
01022         {
01023           unsigned int rank = R*border_head.width + R + li*border_head.width+col;
01024           return iterator_rev<T,R>(rank,this->border_head.width);
01025         }
01026     }
01027     //-------------------------------------------------------------------------------
01028     //! get the indexes of the iterator on the matrix
01029     /*!
01030       \param it is the iterator from which the indexes will be taken
01031       \param x is the iterator position x returned
01032       \param y is the iterator position y returned
01033     */
01034     void getIndexes(iterator<T,R> it,int &col,int &li){indexesIt(it._rank,col,li);}
01035     void getIndexes(iterator_cont<T,R> it,int &col,int &li){indexesIt(it._rank,col,li);}
01036     void getIndexes(iterator_rev<T,R> it,int &col,int &li){indexesIt(it._rank,col,li);}
01037     //-------------------------------------------------------------------------------
01038     //! Print the matrix by bloc
01039     //-------------------------------------------------------------------------------
01040     void print()
01041     //-------------------------------------------------------------------------------
01042     {
01043       std::stringstream st;
01044       for(unsigned int i=0;i<this->loc_head.height;i++)
01045         {
01046           for(unsigned int j=0;j<this->loc_head.width;j++)
01047             {
01048               st<<this->data[R*this->border_head.width + R + i*this->border_head.width+j]<<" ";
01049             }
01050           st<<"\n";
01051         }
01052       Mpi_::print(st.str());
01053     }
01054     //-------------------------------------------------------------------------------
01055     //! Print the matrix by bloc with borders
01056     //-------------------------------------------------------------------------------
01057     void printAll()
01058     //-------------------------------------------------------------------------------
01059     {
01060       std::stringstream st;
01061       for(unsigned int i=0;i<this->border_head.height;i++)
01062         {
01063           for(unsigned int j=0;j<this->border_head.width;j++)
01064             {
01065               st<<this->data[i*this->border_head.width+j]<<" ";
01066             }
01067           st<<"\n";
01068         }
01069       Mpi_::print(st.str());
01070     }
01071     //-------------------------------------------------------------------------------
01072     //! Write the zone file concerned for the current MPI process
01073     /*!
01074       write the MPI prcess zone, and spread the data in the subMatrices OpenMP
01075       only one submatrice if OpenMP is disabled
01076       \param binFile is binary file to read
01077     */
01078     //-------------------------------------------------------------------------------
01079     void write(char * binFile)
01080     //-------------------------------------------------------------------------------
01081     {
01082       MPI_File fh;
01083       MPI_Status status;
01084       //open the output file
01085       MPI_File_open(MPI_COMM_WORLD, binFile,
01086                 MPI_MODE_CREATE|MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);
01087    
01088       // process 0 writes the HEADER of the file
01089       if (Mpi_::mpi_rank==0)
01090         MPI_File_write_at(fh, 0,reinterpret_cast<char*>(&this->head) ,sizeof(HEADER) , MPI_CHAR, &status);
01091       //the begining of the bloc of write in
01092       unsigned int mvline;
01093       if (Mpi_::mpi_rank<this->remainderh)
01094         mvline=sizeof(HEADER)+this->row*this->loc_head.height*this->loc_head.width*sizeof(T);
01095       else
01096         mvline=sizeof(HEADER)+(this->row*this->loc_head.height+this->remainderh)*this->loc_head.width*sizeof(T);
01097 
01098       // pointer in the data of the dmatrix
01099       T *data_pointer=this->data+R*(this->border_head.width+1);
01100 
01101       //write by lines
01102       for(unsigned int j=0;j<this->loc_head.height;j++)
01103         {
01104           MPI_File_write_at(fh, mvline,reinterpret_cast<char*>(data_pointer) ,this->loc_head.width*sizeof(T) , MPI_CHAR, &status);
01105           data_pointer+=this->border_head.width;
01106           mvline+=this->loc_head.width*sizeof(T);
01107         }
01108       MPI_File_close(&fh);
01109     }
01110     //-------------------------------------------------------------------------------
01111     //! get borders
01112     /*!
01113       Get the borders from other processors
01114       MPI exchanges
01115     */
01116     //-------------------------------------------------------------------------------
01117     void getBorders()
01118     //-------------------------------------------------------------------------------
01119     {     
01120       if(this->row!=0)
01121         {
01122           T * toSend = getUpBorderToSend();
01123           T * toGet = new T[this->loc_head.width*R];
01124           Communications<T>::Exchanges(toSend,toGet,this->loc_head.width*R,mpi_ranks[0]);
01125           setUpBorder(toGet);
01126           delete [] toSend;
01127           delete [] toGet;
01128 
01129         }
01130       if(this->row!=(this->rows-1))
01131         {
01132           T * toSend = getDownBorderToSend();
01133           T * toGet = new T[this->loc_head.width*R];
01134           Communications<T>::Exchanges(toSend,toGet,this->loc_head.width*R,mpi_ranks[1]);
01135           setDownBorder(toGet);
01136           delete [] toSend;
01137           delete [] toGet;
01138         }
01139     }
01140     //-------------------------------------------------------------------------------
01141     //! Get all neighbor right values for the element at iterator it
01142     /*!
01143       The returned table will contain R elements. 
01144       These elements are the R values on the right of the current it.
01145       This method exists for each type of iterator with right values in SkelGIS.
01146       \param it is the iterator position
01147       \return a table of right values
01148     */
01149     //-------------------------------------------------------------------------------
01150     inline T * getAllRight(iterator<T,R> it){return allRight(it._rank);}
01151     inline T * getAllRight(iterator_cont<T,R> it){return allRight(it._rank);}
01152     inline T * getAllRight(iterator_step<T,R> it){return allRight(it._rank);}
01153     inline T * getAllRight(iterator_rev<T,R> it){return allRight(it._rank);}
01154     inline T * getAllRight(iterator_phb_left<T,R> it){return allRight(it._rank);}
01155     //-------------------------------------------------------------------------------
01156     //! Get all neighbor left values for the element at iterator it
01157     /*!
01158       The returned table will contain R elements. 
01159       These elements are the R values on the left of the current it.
01160       This method exists for each type of iterator with left values in SkelGIS.
01161       \param it is the iterator position
01162       \return a table of left values
01163     */
01164     //-------------------------------------------------------------------------------
01165     inline T * getAllLeft(iterator<T,R> it){return allLeft(it._rank);}
01166     inline T * getAllLeft(iterator_cont<T,R> it){return allLeft(it._rank);}
01167     inline T * getAllLeft(iterator_step<T,R> it){return allLeft(it._rank);}
01168     inline T * getAllLeft(iterator_rev<T,R> it){return allLeft(it._rank);}
01169     inline T * getAllLeft(iterator_phb_right<T,R> it){return allLeft(it._rank);}
01170     //-------------------------------------------------------------------------------
01171     //! Get all neighbor up values for the element at iterator it
01172     /*!
01173       The returned table will contain R elements. 
01174       These elements are the R values on the up of the current it.
01175       This method exists for each type of iterator with up values in SkelGIS.
01176       \param it is the iterator position
01177       \return a table of up values
01178     */
01179     //-------------------------------------------------------------------------------
01180     inline T * getAllUp(iterator<T,R> it){return allUp(it._rank,it._width);}
01181     inline T * getAllUp(iterator_cont<T,R> it){return allUp(it._rank,it._width);}
01182     inline T * getAllUp(iterator_step<T,R> it){return allUp(it._rank,it._width);}
01183     inline T * getAllUp(iterator_rev<T,R> it){return allUp(it._rank,it._width);}
01184     inline T * getAllUp(iterator_phb_down<T,R> it){return allUp(it._rank,it._width);}
01185     //-------------------------------------------------------------------------------
01186     //! Get all neighbor down values for the element at iterator it
01187     /*!
01188       The returned table will contain R elements. 
01189       These elements are the R values on the down of the current it.
01190       This method exists for each type of iterator with down values in SkelGIS.
01191       \param it is the iterator position
01192       \return a table of down values
01193     */
01194     //-------------------------------------------------------------------------------
01195     inline T * getAllDown(iterator<T,R> it){return allDown(it._rank,it._width);}
01196     inline T * getAllDown(iterator_cont<T,R> it){return allDown(it._rank,it._width);}
01197     inline T * getAllDown(iterator_step<T,R> it){return allDown(it._rank,it._width);}
01198     inline T * getAllDown(iterator_rev<T,R> it){return allDown(it._rank,it._width);}
01199     inline T * getAllDown(iterator_phb_up<T,R> it){return allDown(it._rank,it._width);}
01200     //-------------------------------------------------------------------------------
01201     //! Get all neighbor right down values for the element at iterator it
01202     /*!
01203       The returned table will contain R elements. 
01204       These elements are the R values on the right down of the current it.
01205       This method exists for each type of iterator with right down values in SkelGIS.
01206       \param it is the iterator position
01207       \return a table of right down values
01208     */
01209     //-------------------------------------------------------------------------------
01210     inline T * getAllRightDown(iterator<T,R> it){return allRightDown(it._rank,it._width);}
01211     inline T * getAllRightDown(iterator_cont<T,R> it){return allRightDown(it._rank,it._width);}
01212     inline T * getAllRightDown(iterator_step<T,R> it){return allRightDown(it._rank,it._width);}
01213     inline T * getAllRightDown(iterator_rev<T,R> it){return allRightDown(it._rank,it._width);}
01214     //-------------------------------------------------------------------------------
01215     //! Get all neighbor left down values for the element at iterator it
01216     /*!
01217       The returned table will contain R elements. 
01218       These elements are the R values on the left down of the current it.
01219       This method exists for each type of iterator with left down values in SkelGIS.
01220       \param it is the iterator position
01221       \return a table of left down values
01222     */
01223     //-------------------------------------------------------------------------------
01224     inline T * getAllLeftDown(iterator<T,R> it){return allLeftDown(it._rank,it._width);}
01225     inline T * getAllLeftDown(iterator_cont<T,R> it){return allLeftDown(it._rank,it._width);}
01226     inline T * getAllLeftDown(iterator_step<T,R> it){return allLeftDown(it._rank,it._width);}
01227     inline T * getAllLeftDown(iterator_rev<T,R> it){return allLeftDown(it._rank,it._width);}
01228     //-------------------------------------------------------------------------------
01229     //! Get all neighbor right up values for the element at iterator it
01230     /*!
01231       The returned table will contain R elements. 
01232       These elements are the R values on the right up of the current it.
01233       This method exists for each type of iterator with right up values in SkelGIS.
01234       \param it is the iterator position
01235       \return a table of right up values
01236     */
01237     //-------------------------------------------------------------------------------
01238     inline T * getAllRightUp(iterator<T,R> it){return allRightUp(it._rank,it._width);}
01239     inline T * getAllRightUp(iterator_cont<T,R> it){return allRightUp(it._rank,it._width);}
01240     inline T * getAllRightUp(iterator_step<T,R> it){return allRightUp(it._rank,it._width);}
01241     inline T * getAllRightUp(iterator_rev<T,R> it){return allRightUp(it._rank,it._width);}
01242     //-------------------------------------------------------------------------------
01243     //! Get all neighbor left up values for the element at iterator it
01244     /*!
01245       The returned table will contain R elements. 
01246       These elements are the R values on the left up of the current it.
01247       This method exists for each type of iterator with left up values in SkelGIS.
01248       \param it is the iterator position
01249       \return a table of left up values
01250     */
01251     //-------------------------------------------------------------------------------
01252     inline T * getAllLeftUp(iterator<T,R> it){return allLeftUp(it._rank,it._width);}
01253     inline T * getAllLeftUp(iterator_cont<T,R> it){return allLeftUp(it._rank,it._width);}
01254     inline T * getAllLeftUp(iterator_step<T,R> it){return allLeftUp(it._rank,it._width);}
01255     inline T * getAllLeftUp(iterator_rev<T,R> it){return allLeftUp(it._rank,it._width);}
01256     //-------------------------------------------------------------------------------
01257     //! Get the nearest right neighbor value for the element at iterator it
01258     /*!
01259       This method exists for each type of iterator with right value in SkelGIS.
01260       \param it is the iterator position
01261       \return the right neighbor element value
01262     */
01263     //-------------------------------------------------------------------------------
01264     inline T getRight(iterator<T,R> it){return right(it._rank);}
01265     inline T getRight(iterator_cont<T,R> it){return right(it._rank);}
01266     inline T getRight(iterator_step<T,R> it){return right(it._rank);}
01267     inline T getRight(iterator_rev<T,R> it){return right(it._rank);}
01268     inline T getRight(iterator_phb_left<T,R> it){return right(it._rank);}
01269     //-------------------------------------------------------------------------------
01270     //! Get the nearest left neighbor value for the element at iterator it
01271     /*!
01272       This method exists for each type of iterator with left value in SkelGIS.
01273       \param it is the iterator position
01274       \return the left neighbor element value
01275     */
01276     //-------------------------------------------------------------------------------
01277     inline T getLeft(iterator<T,R> it){return left(it._rank);}
01278     inline T getLeft(iterator_cont<T,R> it){return left(it._rank);}
01279     inline T getLeft(iterator_step<T,R> it){return left(it._rank);}
01280     inline T getLeft(iterator_rev<T,R> it){return left(it._rank);}
01281     inline T getLeft(iterator_phb_right<T,R> it){return left(it._rank);}
01282     //-------------------------------------------------------------------------------
01283     //! Get the nearest up neighbor value for the element at iterator it
01284     /*!
01285       This method exists for each type of iterator with up value in SkelGIS.
01286       \param it is the iterator position
01287       \return the up neighbor element value
01288     */
01289     //-------------------------------------------------------------------------------
01290     inline T getUp(iterator<T,R> it){return up(it._rank,it._width);}
01291     inline T getUp(iterator_cont<T,R> it){return up(it._rank,it._width);}
01292     inline T getUp(iterator_step<T,R> it){return up(it._rank,it._width);}
01293     inline T getUp(iterator_rev<T,R> it){return up(it._rank,it._width);}
01294     inline T getUp(iterator_phb_down<T,R> it){return up(it._rank,it._width);}
01295     //-------------------------------------------------------------------------------
01296     //! Get the nearest down neighbor value for the element at iterator it
01297     /*!
01298       This method exists for each type of iterator with down value in SkelGIS.
01299       \param it is the iterator position
01300       \return the down neighbor element value
01301     */
01302     //-------------------------------------------------------------------------------
01303     inline T getDown(iterator<T,R> it){return down(it._rank,it._width);}
01304     inline T getDown(iterator_cont<T,R> it){return down(it._rank,it._width);}
01305     inline T getDown(iterator_step<T,R> it){return down(it._rank,it._width);}
01306     inline T getDown(iterator_rev<T,R> it){return down(it._rank,it._width);}
01307     inline T getDown(iterator_phb_up<T,R> it){return down(it._rank,it._width);}
01308     //-------------------------------------------------------------------------------
01309     //! Get the nearest right down neighbor value for the element at iterator it
01310     /*!
01311       This method exists for each type of iterator with right down value in SkelGIS.
01312       \param it is the iterator position
01313       \return the right down neighbor element value
01314     */
01315     //-------------------------------------------------------------------------------
01316     inline T getRightDown(iterator<T,R> it){return rightDown(it._rank,it._width);}
01317     inline T getRightDown(iterator_cont<T,R> it){return rightDown(it._rank,it._width);}
01318     inline T getRightDown(iterator_step<T,R> it){return rightDown(it._rank,it._width);}
01319     inline T getRightDown(iterator_rev<T,R> it){return rightDown(it._rank,it._width);}
01320     //-------------------------------------------------------------------------------
01321     //! Get the nearest left down neighbor value for the element at iterator it
01322     /*!
01323       This method exists for each type of iterator with left down value in SkelGIS.
01324       \param it is the iterator position
01325       \return the left down neighbor element value
01326     */
01327     //-------------------------------------------------------------------------------
01328     inline T getLeftDown(iterator<T,R> it){return leftDown(it._rank,it._width);}
01329     inline T getLeftDown(iterator_cont<T,R> it){return leftDown(it._rank,it._width);}
01330     inline T getLeftDown(iterator_step<T,R> it){return leftDown(it._rank,it._width);}
01331     inline T getLeftDown(iterator_rev<T,R> it){return leftDown(it._rank,it._width);}
01332     //-------------------------------------------------------------------------------
01333     //! Get the nearest right up neighbor value for the element at iterator it
01334     /*!
01335       This method exists for each type of iterator with right up value in SkelGIS.
01336       \param it is the iterator position
01337       \return the right up neighbor element value
01338     */
01339     //-------------------------------------------------------------------------------
01340     inline T getRightUp(iterator<T,R> it){return rightUp(it._rank,it._width);}
01341     inline T getRightUp(iterator_cont<T,R> it){return rightUp(it._rank,it._width);}
01342     inline T getRightUp(iterator_step<T,R> it){return rightUp(it._rank,it._width);}
01343     inline T getRightUp(iterator_rev<T,R> it){return rightUp(it._rank,it._width);}
01344     //-------------------------------------------------------------------------------
01345     //! Get the nearest left up neighbor value for the element at iterator it
01346     /*!
01347       This method exists for each type of iterator with left up value in SkelGIS.
01348       \param it is the iterator position
01349       \return the left up neighbor element value
01350     */
01351     //-------------------------------------------------------------------------------
01352     inline T getLeftUp(iterator<T,R> it){return leftUp(it._rank,it._width);}
01353     inline T getLeftUp(iterator_cont<T,R> it){return leftUp(it._rank,it._width);}
01354     inline T getLeftUp(iterator_step<T,R> it){return leftUp(it._rank,it._width);}
01355     inline T getLeftUp(iterator_rev<T,R> it){return leftUp(it._rank,it._width);}
01356     //-------------------------------------------------------------------------------
01357     //! Get all values along X axe for the element at iterator it
01358     /*!
01359       Equivalent to getAllLeft and getAllRight with a single call.
01360       This method exists for each type of iterator with left and right values in SkelGIS.
01361       \param it is the iterator position
01362       \return a table of all neighbor values along X
01363     */
01364     //-------------------------------------------------------------------------------
01365     inline T * getAllX(iterator<T,R> it){return allX(it._rank);}
01366     inline T * getAllX(iterator_cont<T,R> it){return allX(it._rank);}
01367     inline T * getAllX(iterator_step<T,R> it){return allX(it._rank);}
01368     inline T * getAllX(iterator_rev<T,R> it){return allX(it._rank);}
01369     //-------------------------------------------------------------------------------
01370     //! Get all values along X axe for the element at iterator it
01371     /*!
01372       Equivalent to getAllLeft and getAllRight with a single call.
01373       This method exists for each type of iterator with left and right values in SkelGIS.
01374       \param it is the iterator position
01375       \param result table of all neighbor values along X
01376     */
01377     //-------------------------------------------------------------------------------
01378     inline void getAllX(iterator<T,R> it, T * t){return allX(it,t);}
01379     inline void getAllX(iterator_cont<T,R> it, T * t){return allX(it,t);}
01380     inline void getAllX(iterator_rev<T,R> it, T * t){return allX(it,t);}
01381     inline void getAllX(iterator_step<T,R> it, T * t){return allX(it,t);}
01382     //-------------------------------------------------------------------------------
01383     //! Get all values along Y axe for the element at iterator it
01384     /*!
01385       Equivalent to getAllUp and getAllDown with a single call.
01386       This method exists for each type of iterator with left and right values in SkelGIS.
01387       \param it is the iterator position
01388       \return a table of all neighbor values along Y
01389     */
01390     //-------------------------------------------------------------------------------
01391     inline T * getAllY(iterator<T,R> it){return allY(it._rank,it._width);}
01392     inline T * getAllY(iterator_cont<T,R> it){return allY(it._rank,it._width);}
01393     inline T * getAllY(iterator_step<T,R> it){return allY(it._rank,it._width);}
01394     inline T * getAllY(iterator_rev<T,R> it){return allY(it._rank,it._width);}
01395     //-------------------------------------------------------------------------------
01396     //! Get all values along Y axe for the element at iterator it
01397     /*!
01398       Equivalent to getAllUp and getAllDown with a single call.
01399       This method exists for each type of iterator with left and right values in SkelGIS.
01400       \param it is the iterator position
01401       \param result table of all neighbor values along Y
01402     */
01403     //-------------------------------------------------------------------------------
01404     inline void getAllY(iterator<T,R> it, T * t){return allY(it,t);}
01405     inline void getAllY(iterator_cont<T,R> it, T * t){return allY(it,t);}
01406     inline void getAllY(iterator_rev<T,R> it, T * t){return allY(it,t);}
01407     inline void getAllY(iterator_step<T,R> it, T * t){return allY(it,t);}
01408     //-------------------------------------------------------------------------------
01409     //! Get all 8 directions neighbor values for the element at iterator it
01410     /*!
01411       Equivalent to getLeftUp, getUp, getRightUp, getRight, getRightDown, getDown, getLeftDown and getLeft with a single call.
01412       \param it is the iterator position
01413       \return a table of all 8 directions neighbor values
01414     */
01415     //-------------------------------------------------------------------------------
01416     inline T * get8(iterator<T,R> it){return eight(it._rank,it._width);}
01417     inline T * get8(iterator_cont<T,R> it){return eight(it._rank,it._width);}
01418     inline T * get8(iterator_step<T,R> it){return eight(it._rank,it._width);}
01419     inline T * get8(iterator_rev<T,R> it){return eight(it._rank,it._width);}
01420     //-------------------------------------------------------------------------------
01421     //! Get all 4 directions neighbor values for the element at iterator it
01422     /*!
01423       Equivalent to getUp, getRight, getDown and getLeft with a single call.
01424       \param it is the iterator position
01425       \return a table of all 4 directions neighbor values
01426     */
01427     //-------------------------------------------------------------------------------
01428     inline T * get4(iterator<T,R> it){return four(it._rank,it._width);}
01429     inline T * get4(iterator_cont<T,R> it){return four(it._rank,it._width);}
01430      inline T * get4(iterator_step<T,R> it){return four(it._rank,it._width);}
01431     inline T * get4(iterator_rev<T,R> it){return four(it._rank,it._width);}
01432     //-------------------------------------------------------------------------------
01433     //! Get the nearest right neighbor value inside the domain, for physical left border iterators
01434     /*!
01435       This work for every size of R and will always give the nearest right inside-domain value.
01436       \param it is the physical border iterator position
01437       \return the neighbor value
01438     */
01439     //-------------------------------------------------------------------------------
01440     inline T getInRight(iterator_phb_left<T,R> it){return inright(it._rank);}
01441     //-------------------------------------------------------------------------------
01442     //! Get the nearest left neighbor value inside the domain, for physical right border iterators
01443     /*!
01444       This work for every size of R and will always give the nearest left inside-domain value.
01445       \param it is the physical border iterator position
01446       \return the neighbor value
01447     */
01448     //-------------------------------------------------------------------------------
01449     inline T getInLeft(iterator_phb_right<T,R> it){return inleft(it._rank);}
01450     //-------------------------------------------------------------------------------
01451     //! Get the nearest up neighbor value inside the domain, for physical down border iterators
01452     /*!
01453       This work for every size of R and will always give the nearest up inside-domain value.
01454       \param it is the physical border iterator position
01455       \return the neighbor value
01456     */
01457     //-------------------------------------------------------------------------------
01458     inline T getInUp(iterator_phb_down<T,R> it){return inup(it._rank,it._width);}
01459     //-------------------------------------------------------------------------------
01460     //! Get the nearest down neighbor value inside the domain, for physical up border iterators
01461     /*!
01462       This work for every size of R and will always give the nearest down inside-domain value.
01463       \param it is the physical border iterator position
01464       \return the neighbor value
01465     */
01466     //-------------------------------------------------------------------------------
01467     inline T getInDown(iterator_phb_up<T,R> it){return indown(it._rank,it._width);}
01468     //-------------------------------------------------------------------------------
01469     
01470   };
01471   
01472   //================================================================================
01473   //!  DMatrix_impl class
01474   /*!
01475     template of the distributed matrix implementation of SkelGIS. This is the second specialization of the DMatrix_impl.
01476     \tparam T is the type of data to store in the DMatrix_impl
01477     R overlap template parameter is not asked and specialized to 0.
01478     The default value of line is used (true).
01479   */
01480   //-------------------------------------------------------------------------------
01481   template<class T> struct DMatrix_impl<T,0,true> : DMatrix_base<T>
01482   //-------------------------------------------------------------------------------
01483   {
01484   protected:
01485 
01486     //-------------------------------------------------------------------------------
01487     //! Creation of the ditributed matrix
01488     /*!
01489       Data distribution and construction of the parallel data structure.
01490     */
01491     //-------------------------------------------------------------------------------
01492     void create()
01493     //-------------------------------------------------------------------------------
01494     {
01495       if(this->local==false)
01496         {
01497           //division of the matrix in cols and rows
01498           this->cols = 1;
01499           this->rows = Mpi_::mpi_nb;
01500 
01501           this->row=Mpi_::mpi_rank;
01502           this->col=0;
01503 
01504           this->loc_head.width=this->head.width;
01505 
01506           this-> remainderh = 0;
01507           //if the height of the matrix can be divided by the number of rows
01508           if(this->head.height%this->rows==0)
01509             this->loc_head.height=floor(this->head.height/this->rows);
01510           //else we add one to the height of the submatrix and the last row will have less elements
01511           else
01512             {
01513               this->remainderh = this->head.height - floor(this->head.height/this->rows)*this->rows;
01514 
01515               if(this->row<this->remainderh)
01516                 this->loc_head.height=floor(this->head.height/this->rows)+1;
01517               else
01518                 this->loc_head.height=floor(this->head.height/this->rows);
01519             }
01520 
01521           this->loc_head.x=this->head.x;
01522           this->loc_head.y=this->head.y-this->row*this->loc_head.height*this->head.spacing;
01523           this->loc_head.spacing=this->head.spacing;
01524           this->loc_head.nodata=this->head.nodata;
01525         }
01526       else
01527         {
01528           this->loc_head.x = this->head.x;
01529           this->loc_head.y = this->head.y;
01530           this->loc_head.width = this->head.width;
01531           this->loc_head.height = this->head.height;
01532           this->loc_head.spacing = this->head.spacing;
01533           this->loc_head.nodata = this->head.nodata;
01534 
01535           this->col=0;this->row=0;
01536           this->cols=1;this->rows=1;
01537           this-> remainderh = 0;
01538         }
01539       
01540       this->data = new T[this->loc_head.width*this->loc_head.height];
01541     }
01542     //-------------------------------------------------------------------------------
01543     //! Values initialization of the elements of the DMatrix_impl
01544     /*!
01545       \param value is the value to set to the elements
01546     */
01547     //-------------------------------------------------------------------------------
01548     void initValues(T value)
01549     //-------------------------------------------------------------------------------
01550     {
01551       for(unsigned int i=0;i<this->loc_head.width*this->loc_head.height;i++)
01552         {
01553           this->data[i]=value;
01554         }
01555     }
01556     //-------------------------------------------------------------------------------
01557     //! Read the zone of the binary file concerned for the current MPI process
01558     /*!
01559       \param binFile is binary file to read
01560     */
01561     //-------------------------------------------------------------------------------
01562     void read(const char * binFile)
01563     //-------------------------------------------------------------------------------
01564     {
01565       //read the zone of the file for my own block of data
01566       std::ifstream f(binFile, std::ios::binary | std::ios::in);
01567 
01568       //move to the begining of the bloc of data concerned
01569       unsigned int mvline;
01570       if(Mpi_::mpi_rank<this->remainderh)
01571         mvline = sizeof(HEADER) + (this->row*this->loc_head.height*this->head.width)*sizeof(T);
01572       else
01573         mvline = sizeof(HEADER) + ((this->remainderh*(this->loc_head.height+1)+(this->row-this->remainderh)*this->loc_head.height)*this->head.width)*sizeof(T);
01574 
01575       f.seekg(mvline,std::ios::beg);
01576 
01577       //read by lines
01578       for(unsigned int j=0;j<this->loc_head.height;j++)
01579         {
01580           for(unsigned int k=0;k<this->loc_head.width;k++)
01581             {
01582               T val;
01583               unsigned int rank = j*this->loc_head.width + k;
01584               f.read(reinterpret_cast<char*>(&val),sizeof(T));
01585               this->data[rank]=val;
01586             }
01587         }
01588     }
01589     //-------------------------------------------------------------------------------
01590     //! Get the indexes associated to a precise rank
01591     /*!
01592       \param rank is the rank at which the indexes values are wanted
01593       \param col is the column return value
01594       \param li is the line return value
01595     */
01596     //-------------------------------------------------------------------------------
01597     void indexesIt(unsigned int rank,int &col,int &li)
01598     //-------------------------------------------------------------------------------
01599     {
01600       li = floor(rank/this->loc_head.width);
01601       col = rank%this->loc_head.width;
01602     }
01603     //-------------------------------------------------------------------------------
01604 
01605   public:
01606     //! default constructor of the distributed matrix interface
01607     //-------------------------------------------------------------------------------
01608     DMatrix_impl() : DMatrix_base<T>(){}
01609     //-------------------------------------------------------------------------------
01610     //! constructor of the distributed matrix
01611     /*!
01612       This constructor will simply allocate the good space.
01613       \param h is the header to use to build your matrix
01614       \param value is the default value to put in the matrix
01615       \param loc is a boolean value to indicate if the DMatrix_impl has to stay local to one MPI process (used in ApplyReduction).
01616     */
01617     //-------------------------------------------------------------------------------
01618     DMatrix_impl(HEADER h,const T value, bool loc=false) : DMatrix_base<T>()
01619     //-------------------------------------------------------------------------------
01620     {
01621       this->head=h;
01622       this->local=loc;
01623       create();
01624       initValues(value);
01625     }
01626     //-------------------------------------------------------------------------------
01627     //! constructor of the distributed matrix
01628     /*!
01629       The constructor evaluate how to divide the matrix in submatrice and do the work.
01630       \param binFile is the path of the binary file you want to work on
01631       \param loc is a boolean value to indicate if the DMatrix_impl has to stay local to one MPI process (used in ApplyReduction).
01632     */
01633     //-------------------------------------------------------------------------------
01634     DMatrix_impl(const char * binFile, bool loc=false) : DMatrix_base<T>()
01635     //-------------------------------------------------------------------------------
01636     {
01637       this->head = Header::read(binFile);
01638       this->local=loc;
01639       create();
01640       initValues(0);
01641       read(binFile);
01642     }
01643     //-------------------------------------------------------------------------------
01644     //! destructor of the distributed matrix
01645     //-------------------------------------------------------------------------------
01646     ~DMatrix_impl()
01647     //-------------------------------------------------------------------------------
01648     {
01649       if(NULL!=this->data)
01650         delete [] this->data;
01651     }
01652     //-------------------------------------------------------------------------------
01653     //! Get the beginning iterator of the DMatrix (on the first element of the DMatrix)
01654     /*!
01655       This method exists for each type of iterator in SkelGIS.
01656       \return the iterator of the first element
01657     */
01658     //-------------------------------------------------------------------------------
01659     inline iterator<T,0> begin(){return iterator<T,0>(0);}
01660     inline iterator_cont<T,0> begin_cont(){return iterator_cont<T,0>(0);}
01661     inline iterator_step<T,0> begin_step(int step,int nb){return iterator_step<T,0>(0,step,nb);}
01662     inline iterator_rev<T,0> begin_rev(){return iterator_rev<T,0>(this->loc_head.width*this->loc_head.height-1);}
01663     inline iterator_line<T,0> begin_line(){return iterator_line<T,0>(0,this->loc_head.width);}
01664     //-------------------------------------------------------------------------------
01665     //! Get the ending iterator of the DMatrix (on the last element of the DMatrix)
01666     /*!
01667       This method exists for each type of iterator in SkelGIS.
01668       \return the iterator of the last element
01669     */
01670     //-------------------------------------------------------------------------------
01671     inline iterator<T,0> end(){return iterator<T,0>(this->loc_head.width*this->loc_head.height-1);}
01672     inline iterator_cont<T,0> end_cont(){return iterator_cont<T,0>(this->loc_head.width*this->loc_head.height-1);}
01673     inline iterator_step<T,0> end_step(){return iterator_step<T,0>(this->loc_head.width*this->loc_head.height-1);}
01674     inline iterator_rev<T,0> end_rev(){return iterator_rev<T,0>(0);}
01675     inline iterator_line<T,0> end_line(){return iterator_line<T,0>(this->loc_head.width*this->loc_head.height-this->loc_head.width,this->loc_head.width);}
01676     //-------------------------------------------------------------------------------
01677     //! get the iterator on the matrix at position (x,y)
01678     /*!
01679       \param x is row index
01680       \pram y is column index
01681       \return returns the iterator at position (x,y)
01682     */
01683     iterator<T,0> getIterator(int col,int li)
01684     //-------------------------------------------------------------------------------
01685     {
01686       if(NULL!=this->data)
01687         {
01688           unsigned int rank =li*this->loc_head.width+col;
01689           return iterator<T,0>(rank,this->loc_head.width);
01690         }
01691     }
01692     //-------------------------------------------------------------------------------
01693     //! get the iterator contiguous on the matrix at position (x,y)
01694     /*!
01695       \param x is row index
01696       \pram y is column index
01697       \return returns the iterator at position (x,y)
01698     */
01699     iterator_cont<T,0> getIterator_cont(int col,int li)
01700     //-------------------------------------------------------------------------------
01701     {
01702       if(NULL!=this->data)
01703         {
01704           unsigned int rank = li*this->loc_head.width+col;
01705           return iterator_cont<T,0>(rank,this->loc_head.width);
01706         }
01707     }
01708     //-------------------------------------------------------------------------------
01709     //! get the iterator reverse on the matrix at position (x,y)
01710     /*!
01711       \param x is row index
01712       \pram y is column index
01713       \return returns the iterator at position (x,y)
01714     */
01715     iterator_rev<T,0> getIterator_rev(int col,int li)
01716     //-------------------------------------------------------------------------------
01717     {
01718       if(NULL!=this->data)
01719         {
01720           unsigned int rank = li*this->loc_head.width+col;
01721           return iterator_rev<T,0>(rank,this->loc_head.width);
01722         }
01723     }
01724     //-------------------------------------------------------------------------------
01725     //! get the indexes of the iterator on the matrix
01726     /*!
01727       \param it is the iterator from which the indexes will be taken
01728       \param x is the iterator position x returned
01729       \param y is the iterator position y returned
01730     */
01731     void getIndexes(iterator<T,0> it,int &col,int &li){indexesIt(it._rank,col,li);}
01732     void getIndexes(iterator_cont<T,0> it,int &col,int &li){indexesIt(it._rank,col,li);}
01733     void getIndexes(iterator_rev<T,0> it,int &col,int &li){indexesIt(it._rank,col,li);}
01734     //-------------------------------------------------------------------------------
01735     //! Print the matrix by bloc
01736     //-------------------------------------------------------------------------------
01737     void print()
01738     //-------------------------------------------------------------------------------
01739     {
01740       std::stringstream st;
01741       for(unsigned int i=0;i<this->loc_head.height;i++)
01742         {
01743           for(unsigned int j=0;j<this->loc_head.width;j++)
01744             {
01745               st<<this->data[i*this->loc_head.width+j]<<" ";
01746             }
01747           st<<"\n";
01748         }
01749       Mpi_::print(st.str());
01750     }
01751     //-------------------------------------------------------------------------------
01752     //! Write the zone file concerned for the current MPI process
01753     /*!
01754       write the MPI prcess zone, and spread the data in the subMatrices OpenMP
01755       only one submatrice if OpenMP is disabled
01756       \param binFile is binary file to write in
01757     */
01758     //-------------------------------------------------------------------------------
01759     void write(char * binFile)
01760     //-------------------------------------------------------------------------------
01761     {
01762       MPI_File fh;
01763       MPI_Status status;
01764       //open the output file
01765       MPI_File_open(MPI_COMM_WORLD, binFile,
01766                 MPI_MODE_CREATE|MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);
01767    
01768       // process 0 writes the HEADER of the file
01769       if (Mpi_::mpi_rank==0)
01770         MPI_File_write_at(fh, 0,reinterpret_cast<char*>(&this->head) ,sizeof(HEADER) , MPI_CHAR, &status);
01771       //the begining of the bloc of write in
01772       unsigned int mvline;
01773       if (Mpi_::mpi_rank<this->remainderh)
01774         mvline=sizeof(HEADER)+this->row*this->loc_head.height*this->loc_head.width*sizeof(T);
01775       else
01776         mvline=sizeof(HEADER)+(this->row*this->loc_head.height+this->remainderh)*this->loc_head.width*sizeof(T);
01777 
01778       // pointer in the data of the dmatrix
01779       T *data_pointer=this->data;
01780 
01781       //write by lines
01782       for(unsigned int j=0;j<this->loc_head.height;j++)
01783         {
01784           MPI_File_write_at(fh, mvline,reinterpret_cast<char*>(data_pointer) ,this->loc_head.width*sizeof(T) , MPI_CHAR, &status);
01785           data_pointer+=this->loc_head.width;
01786           mvline+=this->loc_head.width*sizeof(T);
01787         }
01788       MPI_File_close(&fh);
01789     }
01790     //-------------------------------------------------------------------------------
01791   };
01792 
01793   //================================================================================
01794   //!  DMatrix_impl class
01795   /*!
01796     template of the distributed matrix implementation of SkelGIS. This is the first specialization of the DMatrix_impl.
01797     \tparam T is the type of data to store in the DMatrix_impl
01798     \tparam R is the overlap distance needed by the calculation, in other words, the size of the physical border needed
01799     The line template parameter is specialized with the false value. In this case the parallel data distribution will be done along height and width (by block).
01800   */
01801   //-------------------------------------------------------------------------------
01802   template<class T,int R> struct DMatrix_impl<T,R,false> : public DMatrix_base<T>
01803   //-------------------------------------------------------------------------------
01804   {
01805   public:
01806 
01807     HEADER border_head; /*!< Header defining the current local matrix with overlap R */
01808 
01809   protected:
01810     int mpi_ranks[8]; /*!< table of size 8 for the neighbor processes to make MPI exchanges */
01811     unsigned int beginPhBLeft,beginPhBRight,beginPhBUp,beginPhBDown; /*!< begining indexes for the physical borders iterators */
01812     unsigned int endPhBLeft,endPhBRight,endPhBUp,endPhBDown; /*!< ending indexes for the physical borders iterators */
01813     //-------------------------------------------------------------------------------
01814     //! Creation of the ditributed matrix
01815     /*!
01816       Data distribution and construction of the parallel data structure.
01817     */
01818     //-------------------------------------------------------------------------------
01819     void create()
01820     //-------------------------------------------------------------------------------
01821     {
01822       if(this->local==false)
01823         {
01824           //division of the matrix in cols and rows
01825           this->cols = floor(sqrt(Mpi_::mpi_nb));
01826           int nb = 1;
01827           while(Mpi_::mpi_nb%this->cols!=0)
01828             {
01829               this->cols = floor(sqrt(Mpi_::mpi_nb))-nb;
01830               nb++;
01831             }
01832           this->rows = Mpi_::mpi_nb/this->cols;
01833 
01834           this->row=floor(Mpi_::mpi_rank/this->cols);
01835           this->col=Mpi_::mpi_rank%this->cols;
01836 
01837           this-> remainderh = 0;
01838           //if the height of the matrix can be divided by the number of rows
01839           if(this->head.height%this->rows==0)
01840             this->loc_head.height=floor(this->head.height/this->rows);
01841           //else we add one to the height of the submatrix and the last row will have less elements
01842           else
01843             {
01844               this->remainderh = this->head.height - floor(this->head.height/this->rows)*this->rows;
01845 
01846               if(this->row<this->remainderh)
01847                 this->loc_head.height=floor(this->head.height/this->rows)+1;
01848               else
01849                 this->loc_head.height=floor(this->head.height/this->rows);
01850             }
01851 
01852           this-> remainderw = 0;
01853           //if the height of the matrix can be divided by the number of rows
01854           if(this->head.width%this->cols==0)
01855             this->loc_head.width=floor(this->head.width/this->cols);
01856           //else we add one to the height of the submatrix and the last row will have less elements
01857           else
01858             {
01859               this->remainderw = this->head.width - floor(this->head.width/this->cols)*this->cols;
01860 
01861               if(this->col<this->remainderw)
01862                 this->loc_head.width=floor(this->head.width/this->cols)+1;
01863               else
01864                 this->loc_head.width=floor(this->head.width/this->cols);
01865             }
01866         
01867           this->loc_head.x=this->head.x+this->col*this->loc_head.width*this->head.spacing;
01868           this->loc_head.y=this->head.y-this->row*this->loc_head.height*this->head.spacing;
01869           this->loc_head.spacing=this->head.spacing;
01870           this->loc_head.nodata=this->head.nodata;
01871         }
01872       else
01873         {
01874           this->loc_head.x = this->head.x;
01875           this->loc_head.y = this->head.y;
01876           this->loc_head.width = this->head.width;
01877           this->loc_head.height = this->head.height;
01878           this->loc_head.spacing = this->head.spacing;
01879           this->loc_head.nodata = this->head.nodata;
01880 
01881           this->col=0;this->row=0;
01882           this->cols=1;this->rows=1;
01883           this-> remainderw = 0;
01884           this-> remainderh = 0;
01885         }
01886 
01887       this->border_head.x = this->loc_head.x - R*this->loc_head.spacing;
01888       this->border_head.y = this->loc_head.y - R*this->loc_head.spacing;
01889       this->border_head.width = this->loc_head.width + R*2;
01890       this->border_head.height = this->loc_head.height + R*2;
01891       this->data = new T[this->border_head.width*this->border_head.height];
01892     }
01893     //-------------------------------------------------------------------------------
01894     //! Values initialization of the elements of the DMatrix_impl
01895     /*!
01896       \param value is the value to set to the elements
01897     */
01898     //-------------------------------------------------------------------------------
01899     void initValues(T value)
01900     //-------------------------------------------------------------------------------
01901     {
01902       for(unsigned int i=0;i<this->border_head.width*this->border_head.height;i++)
01903         {
01904           this->data[i]=value;
01905         }
01906     }
01907     //-------------------------------------------------------------------------------
01908     //! Calculation of the physical border limits
01909     //-------------------------------------------------------------------------------
01910     void phBLimits()
01911     //-------------------------------------------------------------------------------
01912     {
01913       if(this->col==0)
01914         {
01915           beginPhBLeft = R*this->border_head.width;
01916           endPhBLeft = this->border_head.width*(this->border_head.height-R-1)+R;
01917         }
01918       else
01919         {
01920           beginPhBLeft = 0;
01921           endPhBLeft = 0;
01922         }
01923 
01924       if(this->col==this->cols-1)
01925         {
01926           beginPhBRight = (R+1)*this->border_head.width-R;
01927           endPhBRight = (R+1+this->loc_head.height-1)*this->border_head.width;
01928         }
01929       else
01930         {
01931           beginPhBRight = 0;
01932           endPhBRight = 0;
01933         }
01934 
01935       if(this->row==0)
01936         {
01937           beginPhBUp = R;
01938           endPhBUp = R*this->border_head.width-R;
01939         }
01940       else
01941         {
01942           beginPhBUp = 0;
01943           endPhBUp = 0;
01944         }
01945 
01946       if(this->row==this->rows-1)
01947         {
01948           beginPhBDown = this->border_head.width*(this->loc_head.height+R)+R;
01949           endPhBDown = this->border_head.width*this->border_head.height-R;
01950         }
01951       else
01952         {
01953           beginPhBDown = 0;
01954           endPhBDown = 0;
01955         }
01956     }
01957     //-------------------------------------------------------------------------------
01958     //! Read the zone of the binary file concerned for the current MPI process
01959     /*!
01960       \param binFile is binary file to read
01961     */
01962     //-------------------------------------------------------------------------------
01963     void read(const char * binFile)
01964     //-------------------------------------------------------------------------------
01965     {
01966       //read the zone of the file for my own block of data
01967       std::ifstream f(binFile, std::ios::binary | std::ios::in);
01968 
01969       //move to the begining of the bloc of data concerned
01970       unsigned int mvline;
01971       if(this->row<this->remainderh && this->col<this->remainderw)
01972         mvline = sizeof(HEADER) + (this->col*this->loc_head.width
01973                                    +this->row*this->loc_head.height*this->head.width)*sizeof(T);
01974       else if(this->row<this->remainderh)
01975         mvline = sizeof(HEADER) + (this->remainderw*(this->loc_head.width+1)+(this->col-this->remainderw)*this->loc_head.width
01976                                    +(this->row*this->loc_head.height)
01977                                    *this->head.width)*sizeof(T);
01978       else if(this->col<this->remainderw)
01979         mvline = sizeof(HEADER) + (this->col*this->loc_head.width
01980                                    +(this->remainderh*(this->loc_head.height+1)+(this->row-this->remainderh)*this->loc_head.height)
01981                                    *this->head.width)*sizeof(T);
01982       else
01983         mvline = sizeof(HEADER) + (this->remainderw*(this->loc_head.width+1)+(this->col-this->remainderw)*this->loc_head.width
01984                                    +(this->remainderh*(this->loc_head.height+1)+(this->row-this->remainderh)*this->loc_head.height)
01985                                    *this->head.width)*sizeof(T);
01986 
01987       f.seekg(mvline,std::ios::beg);
01988 
01989       //read by lines
01990       for(unsigned int j=0;j<this->loc_head.height;j++)
01991         {
01992           if(j>0)
01993             {
01994               //move to the next line of the bloc
01995               unsigned int mv = this->head.width - this->loc_head.width;
01996               f.seekg(mv*sizeof(T),std::ios::cur);
01997             }
01998 
01999           for(unsigned int k=0;k<this->loc_head.width;k++)
02000             {
02001               T val;
02002               unsigned int rank = R*this->border_head.width + j*this->border_head.width + k + R;
02003               f.read(reinterpret_cast<char*>(&val),sizeof(T));
02004               this->data[rank]=val;
02005             }
02006         }
02007     }
02008     //-------------------------------------------------------------------------------
02009     //! Get rank from a column number and a row number
02010     /*!
02011       \param column is the column number
02012       \param columns is the total number of columns
02013       \param row is the row number
02014       \return the rank
02015     */
02016     //-------------------------------------------------------------------------------
02017     unsigned int getRank(int c, int cs, int r)
02018     //-------------------------------------------------------------------------------
02019     {
02020       unsigned int res = r*cs+c;
02021       return res;
02022     }
02023     //-------------------------------------------------------------------------------
02024     //! Get ranks
02025     /*!
02026       Get the ranks of the 8 neighbors for MPI exchanges
02027       \return 
02028       if the value is -55555 then there is no neighbor in this direction
02029       if the value is >0 it represent the MPI rank concerned by the exchange
02030     */
02031     //-------------------------------------------------------------------------------
02032     void getNeighborRanks()
02033     //-------------------------------------------------------------------------------
02034     {
02035       for(int i=0;i<8;i++)
02036         mpi_ranks[i] = -55555;
02037       //up
02038       if((this->row-1)>=0)
02039         mpi_ranks[1] = getRank(this->col,this->cols,(this->row-1));
02040       //down
02041       if((this->row+1)<this->rows)
02042         mpi_ranks[5] = getRank(this->col,this->cols,(this->row+1));
02043       //right
02044       if((this->col+1)<this->cols)
02045         mpi_ranks[3] = getRank((this->col+1),this->cols,this->row);
02046       //left
02047       if((this->col-1)>=0)
02048         mpi_ranks[7] = getRank((this->col-1),this->cols,this->row);
02049       //up left
02050       if((this->col-1)>=0 && (this->row-1)>=0)
02051         mpi_ranks[0] = getRank((this->col-1),this->cols,(this->row-1));
02052       //up right
02053       if((this->col+1)<this->cols && (this->row-1)>=0)
02054         mpi_ranks[2] = getRank((this->col+1),this->cols,(this->row-1));
02055       //down left
02056       if((this->col-1)>=0 && (this->row+1)<this->rows)
02057         mpi_ranks[6] = getRank((this->col-1),this->cols,(this->row+1));
02058       //down right
02059       if((this->col+1)<this->cols && (this->row+1)<this->rows)
02060         mpi_ranks[4] = getRank((this->col+1),this->cols,(this->row+1));
02061     }
02062     //-------------------------------------------------------------------------------
02063     //! Get the right border to send to neighbor processors
02064     /*!
02065       \return list of values
02066     */
02067     //-------------------------------------------------------------------------------
02068     T * getRightBorderToSend()
02069     //-------------------------------------------------------------------------------
02070     {
02071       T * result = new T[this->loc_head.height*R];
02072       for(unsigned int i=0;i<this->loc_head.height;i++)
02073         {
02074           for(int j=0;j<R;j++)
02075             {
02076               unsigned int r1 = i*R+j;
02077               unsigned int r2 = ((R+1)*border_head.width - R*2) + i*border_head.width +j;
02078               result[r1] = this->data[r2];
02079             }
02080         }
02081       return result;
02082     }
02083     //-------------------------------------------------------------------------------
02084     //! Set the right border ghost cells with neighbor values from processors
02085     /*!
02086       \param values is the table values to fill ghost cells with
02087     */
02088     //-------------------------------------------------------------------------------
02089     void setRightBorder(T * values)
02090     //-------------------------------------------------------------------------------
02091     {
02092       for(unsigned int i=0;i<this->loc_head.height;i++)
02093         {
02094           for(int j=0;j<R;j++)
02095             this->data[(border_head.width*R + border_head.width - R) + i*border_head.width + j] = values[i*R+j];
02096         }
02097     }
02098     //-------------------------------------------------------------------------------
02099     //! Get the left border to send to neighbor processors
02100     /*!
02101       \return list of values
02102     */
02103     //-------------------------------------------------------------------------------
02104     T * getLeftBorderToSend()
02105     //-------------------------------------------------------------------------------
02106     {
02107       T * result = new T[this->loc_head.height*R];
02108       for(unsigned int i=0;i<this->loc_head.height;i++)
02109         {
02110           for(int j=0;j<R;j++)
02111             {
02112               unsigned int r1 = i*R+j;
02113               unsigned int r2 = (R*border_head.width + R) + i*border_head.width + j;
02114               result[r1] = this->data[r2];
02115             }
02116         }
02117       return result;
02118     }
02119     //-------------------------------------------------------------------------------
02120     //! Set the left border ghost cells with neighbor values from processors
02121     /*!
02122       \param values is the table values to fill ghost cells with
02123     */
02124     //-------------------------------------------------------------------------------
02125     void setLeftBorder(T * values)
02126     //-------------------------------------------------------------------------------
02127     {
02128       for(unsigned int i=0;i<this->loc_head.height;i++)
02129         {
02130           for(int j=0;j<R;j++)
02131             this->data[(border_head.width*R) + i*border_head.width + j] = values[i*R+j];
02132         }
02133     }
02134     //-------------------------------------------------------------------------------
02135     //! Get the up border to send to neighbor processors
02136     /*!
02137       \return list of values
02138     */
02139     //-------------------------------------------------------------------------------
02140     T * getUpBorderToSend()
02141     //-------------------------------------------------------------------------------
02142     {
02143       T * result = new T[this->loc_head.width*R];
02144       for(int i=0;i<R;i++)
02145         {
02146           for(unsigned int j=0;j<this->loc_head.width;j++)
02147             {
02148               unsigned int r1 = i*this->loc_head.width+j;
02149               unsigned int r2 = (R*border_head.width + R) + i*border_head.width+j;
02150               result[r1] = this->data[r2];
02151             }
02152         }
02153       return result;
02154     }
02155     //-------------------------------------------------------------------------------
02156     //! Set the up border ghost cells with neighbor values from processors
02157     /*!
02158       \param values is the table values to fill ghost cells with
02159     */
02160     //-------------------------------------------------------------------------------
02161     void setUpBorder(T * values)
02162     //-------------------------------------------------------------------------------
02163     {
02164       for(int i=0;i<R;i++)
02165         {
02166           for(unsigned int j=0;j<this->loc_head.width;j++)
02167             this->data[(R) + i*border_head.width+j] = values[i*this->loc_head.width+j];
02168         }
02169     }
02170     //-------------------------------------------------------------------------------
02171     //! Get the down border to send to neighbor processors
02172     /*!
02173       \return list of values
02174     */
02175     //-------------------------------------------------------------------------------
02176     T * getDownBorderToSend()
02177     //-------------------------------------------------------------------------------
02178     {
02179       T * result = new T[this->loc_head.width*R];
02180       for(int i=0;i<R;i++)
02181         {
02182           for(unsigned int j=0;j<this->loc_head.width;j++)
02183             {
02184               unsigned int r1 = i*this->loc_head.width+j;
02185               unsigned int r2 = (border_head.height*border_head.width - 2*R*border_head.width + R) + i*border_head.width + j;
02186               result[r1] = this->data[r2];
02187             }
02188         }
02189       return result;
02190     }
02191     //-------------------------------------------------------------------------------
02192     //! Set the down border ghost cells with neighbor values from processors
02193     /*!
02194       \param values is the table values to fill ghost cells with
02195     */
02196     //-------------------------------------------------------------------------------
02197     void setDownBorder(T * values)
02198     //-------------------------------------------------------------------------------
02199     {
02200       for(int i=0;i<R;i++)
02201         {
02202           for(unsigned int j=0;j<this->loc_head.width;j++)
02203             this->data[(border_head.width*border_head.height - border_head.width*R + R) + i*border_head.width +j] = values[i*this->loc_head.width+j];
02204         }
02205     }
02206     //-------------------------------------------------------------------------------
02207     //! Get the down right border to send to neighbor processors
02208     /*!
02209       \return list of values
02210     */
02211     //-------------------------------------------------------------------------------
02212     T * getDownRightBorderToSend()
02213     //-------------------------------------------------------------------------------
02214     {
02215       T * result = new T[R*R];
02216       for(int i=0;i<R;i++)
02217         {
02218           for(int j=0;j<R;j++)
02219             {
02220               unsigned int r1 = i*R+j;
02221               unsigned int r2 = (border_head.height*border_head.width - (2*R-1)*border_head.width - R*2) + i*border_head.width +j;
02222               result[r1] = this->data[r2];
02223             }
02224         }
02225       return result;
02226     }
02227     //-------------------------------------------------------------------------------
02228     //! Set the down right border ghost cells with neighbor values from processors
02229     /*!
02230       \param values is the table values to fill ghost cells with
02231     */
02232     //-------------------------------------------------------------------------------
02233     void setDownRightBorder(T * values)
02234     //-------------------------------------------------------------------------------
02235     {
02236       for(int i=0;i<R;i++)
02237         {
02238           for(int j=0;j<R;j++)
02239             this->data[(border_head.width*border_head.height - (R-1)*border_head.width - R) + i*border_head.width + j] = values[i*R+j];
02240         }
02241     }
02242     //-------------------------------------------------------------------------------
02243     //! Get the down left border to send to neighbor processors
02244     /*!
02245       \return list of values
02246     */
02247     //-------------------------------------------------------------------------------
02248     T * getDownLeftBorderToSend()
02249     //-------------------------------------------------------------------------------
02250     {
02251       T * result = new T[R*R];
02252       for(int i=0;i<R;i++)
02253         {
02254           for(int j=0;j<R;j++)
02255             {
02256               unsigned int r1 = i*R+j;
02257               unsigned int r2 = (border_head.height*border_head.width - 2*R*border_head.width + R) + i*border_head.width + j;
02258               result[r1] = this->data[r2];
02259             }
02260         }
02261       return result;
02262     }
02263     //-------------------------------------------------------------------------------
02264     //! Set the down left border ghost cells with neighbor values from processors
02265     /*!
02266       \param values is the table values to fill ghost cells with
02267     */
02268     //-------------------------------------------------------------------------------
02269     void setDownLeftBorder(T * values)
02270     //-------------------------------------------------------------------------------
02271     {
02272       for(int i=0;i<R;i++)
02273         {
02274           for(int j=0;j<R;j++)
02275             this->data[(border_head.width*border_head.height - border_head.width*R) + i*border_head.width +j] = values[i*R+j];
02276         }
02277     }
02278     //-------------------------------------------------------------------------------
02279     //! Get the up left border to send to neighbor processors
02280     /*!
02281       \return list of values
02282     */
02283     //-------------------------------------------------------------------------------
02284     T * getUpLeftBorderToSend()
02285     //-------------------------------------------------------------------------------
02286     {
02287       T * result = new T[R*R];
02288       for(int i=0;i<R;i++)
02289         {
02290           for(int j=0;j<R;j++)
02291             {
02292               unsigned int r1 = i*R+j;
02293               unsigned int r2 = (border_head.width*R + R) + i*border_head.width + j;
02294               result[r1] = this->data[r2];
02295             }
02296         }
02297       return result;
02298     }
02299     //-------------------------------------------------------------------------------
02300     //! Set the up left border ghost cells with neighbor values from processors
02301     /*!
02302       \param values is the table values to fill ghost cells with
02303     */
02304     //-------------------------------------------------------------------------------
02305     void setUpLeftBorder(T * values)
02306     //-------------------------------------------------------------------------------
02307     {
02308       for(int i=0;i<R;i++)
02309         {
02310           for(int j=0;j<R;j++)
02311             this->data[i*border_head.width+j] = values[i*R+j];
02312         }
02313     }
02314     //-------------------------------------------------------------------------------
02315     //! Get the up right border to send to neighbor processors
02316     /*!
02317       \return list of values
02318     */
02319     //-------------------------------------------------------------------------------
02320     T * getUpRightBorderToSend()
02321     //-------------------------------------------------------------------------------
02322     {
02323       T * result = new T[R*R];
02324       for(int i=0;i<R;i++)
02325         {
02326           for(int j=0;j<R;j++)
02327             {
02328               unsigned int r1 = i*R+j;
02329               unsigned int r2 = ((R+1)*border_head.width - R*2) + i*border_head.width +j;
02330               result[r1] = this->data[r2];
02331             }
02332         }
02333       return result;
02334     }
02335     //-------------------------------------------------------------------------------
02336     //! Set the up right border ghost cells with neighbor values from processors
02337     /*!
02338       \param values is the table values to fill ghost cells with
02339     */
02340     //-------------------------------------------------------------------------------
02341     void setUpRightBorder(T * values)
02342     //-------------------------------------------------------------------------------
02343     {
02344       for(int i=0;i<R;i++)
02345         {
02346           for(int j=0;j<R;j++)
02347             this->data[(border_head.width-R) + i*border_head.width+j] = values[i*R+j];
02348         }
02349     }
02350     //-------------------------------------------------------------------------------
02351     //! Get all neighbor right values for the element to a precise rank
02352     /*!
02353       \param rank is the rank at which the neighbor values are wanted
02354       \return table of neighbor values
02355     */
02356     //-------------------------------------------------------------------------------
02357     T * allRight(unsigned int rank)
02358     //-------------------------------------------------------------------------------
02359     {
02360       T * res = new T[R];
02361       unsigned int r = rank+1;
02362       for(int i=0;i<R;i++)
02363         {
02364           res[i] = this->data[r];
02365           r++;
02366         }
02367       return res;
02368     }
02369     //-------------------------------------------------------------------------------
02370     //! Get all neighbor left values for the element to a precise rank
02371     /*!
02372       \param rank is the rank at which the neighbor values are wanted
02373       \return table of neighbor values
02374     */
02375     //-------------------------------------------------------------------------------
02376     T * allLeft(unsigned int rank)
02377     //-------------------------------------------------------------------------------
02378     {
02379       T * res = new T[R];
02380       unsigned int r = rank-1;
02381       for(int i=0;i<R;i++)
02382         {
02383           res[i] = this->data[r];
02384           r--;
02385         }
02386       return res;
02387     }
02388     //-------------------------------------------------------------------------------
02389     //! Get all neighbor up values for the element to a precise rank
02390     /*!
02391       \param rank is the rank at which the neighbor values are wanted
02392       \return table of neighbor values
02393     */
02394     //-------------------------------------------------------------------------------
02395     T * allUp(unsigned int rank,unsigned int width)
02396     //-------------------------------------------------------------------------------
02397     {
02398       T * res = new T[R];
02399       unsigned int r = rank - width;
02400       for(int i=0;i<R;i++)
02401         {
02402           res[i] = this->data[r];
02403           r = r - width;
02404         }
02405       return res;
02406     }
02407     //-------------------------------------------------------------------------------
02408     //! Get all neighbor down values for the element to a precise rank
02409     /*!
02410       \param rank is the rank at which the neighbor values are wanted
02411       \return table of neighbor values
02412     */
02413     //-------------------------------------------------------------------------------
02414     T * allDown(unsigned int rank,unsigned int width)
02415     //-------------------------------------------------------------------------------
02416     {
02417       T * res = new T[R];
02418       unsigned int r = rank + width;
02419       for(int i=0;i<R;i++)
02420         {
02421           res[i] = this->data[r];
02422           r = r + width;
02423         }
02424       return res;
02425     }
02426     //-------------------------------------------------------------------------------
02427     //! Get all neighbor right down values for the element to a precise rank
02428     /*!
02429       \param rank is the rank at which the neighbor values are wanted
02430       \return table of neighbor values
02431     */
02432     //-------------------------------------------------------------------------------
02433     T * allRightDown(unsigned int rank,unsigned int width)
02434     //-------------------------------------------------------------------------------
02435     {
02436       T * res = new T[R];
02437       unsigned int r = rank + width + 1;
02438       for(int i=0;i<R;i++)
02439         {
02440           res[i] = this->data[r];
02441           r = r + width + 1;
02442         }
02443       return res;
02444     }
02445     //-------------------------------------------------------------------------------
02446     //! Get all neighbor left down values for the element to a precise rank
02447     /*!
02448       \param rank is the rank at which the neighbor values are wanted
02449       \return table of neighbor values
02450     */
02451     //-------------------------------------------------------------------------------
02452     T * allLeftDown(unsigned int rank,unsigned int width)
02453     //-------------------------------------------------------------------------------
02454     {
02455       T * res = new T[R];
02456       unsigned int r = rank + width -1 ;
02457       for(int i=0;i<R;i++)
02458         {
02459           res[i] = this->data[r];
02460           r = r + width -1 ;
02461         }
02462       return res;
02463     }
02464     //-------------------------------------------------------------------------------
02465     //! Get all neighbor right up values for the element to a precise rank
02466     /*!
02467       \param rank is the rank at which the neighbor values are wanted
02468       \return table of neighbor values
02469     */
02470     //-------------------------------------------------------------------------------
02471     T * allRightUp(unsigned int rank,unsigned int width)
02472     //-------------------------------------------------------------------------------
02473     {
02474       T * res = new T[R];
02475       unsigned int r = rank - width + 1;
02476       for(int i=0;i<R;i++)
02477         {
02478           res[i] = this->data[r];
02479           r = r - width + 1;
02480         }
02481       return res;
02482     }
02483     //-------------------------------------------------------------------------------
02484     //! Get all neighbor left up values for the element to a precise rank
02485     /*!
02486       \param rank is the rank at which the neighbor values are wanted
02487       \return table of neighbor values
02488     */
02489     //-------------------------------------------------------------------------------
02490     T * allLeftUp(unsigned int rank,unsigned int width)
02491     //-------------------------------------------------------------------------------
02492     {
02493       T * res = new T[R];
02494       unsigned int r = rank - width - 1;
02495       for(int i=0;i<R;i++)
02496         {
02497           res[i] = this->data[r];
02498           r = r - width - 1;
02499         }
02500       return res;
02501     }
02502     //-------------------------------------------------------------------------------
02503     //! Get the nearest neighbor right value for the element to a precise rank
02504     /*!
02505       \param rank is the rank at which the neighbor value is wanted
02506       \return the neighbor value
02507     */
02508     //-------------------------------------------------------------------------------
02509     T right(unsigned int rank)
02510     //-------------------------------------------------------------------------------
02511     {
02512       unsigned int r = rank + 1;
02513       return this->data[r];
02514     }
02515     //-------------------------------------------------------------------------------
02516     //! Get the nearest neighbor left value for the element to a precise rank
02517     /*!
02518       \param rank is the rank at which the neighbor value is wanted
02519       \return the neighbor value
02520     */
02521     //-------------------------------------------------------------------------------
02522     T left(unsigned int rank)
02523     //-------------------------------------------------------------------------------
02524     {
02525       unsigned int r = rank - 1;
02526       return this->data[r];
02527     }
02528     //-------------------------------------------------------------------------------
02529     //! Get the nearest neighbor up value for the element to a precise rank
02530     /*!
02531       \param rank is the rank at which the neighbor value is wanted
02532       \return the neighbor value
02533     */
02534     //-------------------------------------------------------------------------------
02535     T up(unsigned int rank,unsigned int width)
02536     //-------------------------------------------------------------------------------
02537     {
02538       unsigned int r = rank - width;
02539       return this->data[r];
02540     }//-------------------------------------------------------------------------------
02541     //! Get the nearest neighbor down value for the element to a precise rank
02542     /*!
02543       \param rank is the rank at which the neighbor value is wanted
02544       \return the neighbor value
02545     */
02546     //-------------------------------------------------------------------------------
02547     T down(unsigned int rank,unsigned int width)
02548     //-------------------------------------------------------------------------------
02549     {
02550       unsigned int r = rank + width;
02551       return this->data[r];
02552     }
02553     //-------------------------------------------------------------------------------
02554     //! Get the nearest neighbor right down value for the element to a precise rank
02555     /*!
02556       \param rank is the rank at which the neighbor value is wanted
02557       \return the neighbor value
02558     */
02559     //-------------------------------------------------------------------------------
02560     T rightDown(unsigned int rank,unsigned int width)
02561     //-------------------------------------------------------------------------------
02562     {
02563       unsigned int r = rank + width +1;
02564       return this->data[r];
02565     }
02566     //-------------------------------------------------------------------------------
02567     //! Get the nearest neighbor left down value for the element to a precise rank
02568     /*!
02569       \param rank is the rank at which the neighbor value is wanted
02570       \return the neighbor value
02571     */
02572     //-------------------------------------------------------------------------------
02573     T leftDown(unsigned int rank,unsigned int width)
02574     //-------------------------------------------------------------------------------
02575     {
02576       unsigned int r = rank + width -1;
02577       return this->data[r];
02578     }
02579     //-------------------------------------------------------------------------------
02580     //! Get the nearest neighbor right up value for the element to a precise rank
02581     /*!
02582       \param rank is the rank at which the neighbor value is wanted
02583       \return the neighbor value
02584     */
02585     //-------------------------------------------------------------------------------
02586     T rightUp(unsigned int rank,unsigned int width)
02587     //-------------------------------------------------------------------------------
02588     {
02589       unsigned int r = rank - width +1;
02590       return this->data[r];
02591     }
02592     //-------------------------------------------------------------------------------
02593     //! Get the nearest neighbor left up value for the element to a precise rank
02594     /*!
02595       \param rank is the rank at which the neighbor value is wanted
02596       \return the neighbor value
02597     */
02598     //-------------------------------------------------------------------------------
02599     T leftUp(unsigned int rank,unsigned int width)
02600     //-------------------------------------------------------------------------------
02601     {
02602       unsigned int r = rank - width -1;
02603       return this->data[r];
02604     }
02605     //-------------------------------------------------------------------------------
02606     //! Get all values along X axe for the element to a precise rank
02607     /*!
02608       Equivalent to allLeft and allRight with a single call.
02609       \param rank is the rank at which the neighbor value is wanted
02610       \return a table of all neighbor values along X
02611     */
02612     //-------------------------------------------------------------------------------
02613     T * allX(unsigned int rank)
02614     //-------------------------------------------------------------------------------
02615     {
02616       T * res = new T[R*2];
02617       unsigned int b = rank - R;
02618       for(int i=0;i<R;i++)
02619         {
02620           res[i] = this->data[b];
02621           b++;
02622         }
02623       b++;
02624       for(int i=R;i<2*R;i++)
02625         {
02626           res[i] = this->data[b];
02627           b++;
02628         }
02629       return res;
02630     }
02631     //-------------------------------------------------------------------------------
02632     //! Get all values along X axe for the element to a precise rank
02633     /*!
02634       Equivalent to allLeft and allRight with a single call.
02635       \param rank is the rank at which the neighbor value is wanted
02636       \param result table of all neighbor values along X
02637     */
02638     //-------------------------------------------------------------------------------
02639     void allX(unsigned int rank, T * t)
02640     //-------------------------------------------------------------------------------
02641     {
02642       unsigned int b = rank - R;
02643       for(int i=0;i<R;i++)
02644         {
02645           t[i] = this->data[b];
02646           b++;
02647         }
02648       b++;
02649       for(int i=R;i<2*R;i++)
02650         {
02651           t[i] = this->data[b];
02652           b++;
02653         }
02654     }
02655     //-------------------------------------------------------------------------------
02656     //! Get all values along Y axe for the element to a precise rank
02657     /*!
02658       Equivalent to allDown and allUp with a single call.
02659       \param rank is the rank at which the neighbor value is wanted
02660       \return a table of all neighbor values along X
02661     */
02662     //-------------------------------------------------------------------------------
02663     T * allY(unsigned int rank,unsigned int width)
02664     //-------------------------------------------------------------------------------
02665     {
02666       T * res = new T[R*2];
02667       unsigned int b = rank - R*width;
02668 
02669       for(int i=0;i<R;i++)
02670         {
02671           res[i] = this->data[b];
02672           b = b + width;
02673         }
02674       b = b + width;
02675       for(int i=R;i<2*R;i++)
02676         {
02677           res[i] = this->data[b];
02678           b = b + width;
02679         }
02680       return res;
02681     }
02682     //-------------------------------------------------------------------------------
02683     //! Get all values along Y axe for the element to a precise rank
02684     /*!
02685       Equivalent to allDown and allUp with a single call.
02686       \param rank is the rank at which the neighbor value is wanted
02687       \param result table of all neighbor values along X
02688     */
02689     //-------------------------------------------------------------------------------
02690     void allY(unsigned int rank,unsigned int width, T * t)
02691     //-------------------------------------------------------------------------------
02692     {
02693       unsigned int b = rank - R*width;
02694 
02695       for(int i=0;i<R;i++)
02696         {
02697           t[i] = this->data[b];
02698           b = b + width;
02699         }
02700       b = b + width;
02701       for(int i=R;i<2*R;i++)
02702         {
02703           t[i] = this->data[b];
02704           b = b + width;
02705         }
02706     }
02707     //-------------------------------------------------------------------------------
02708     //! Get all 8 directions neighbor values for the element to a precise rank
02709     /*!
02710       Equivalent to leftUp, up, rightUp, right, rightDown, down, leftDown and left with a single call.
02711       \param rank is the rank at which the neighbor value is wanted
02712       \return a table of all 8 directions neighbor values
02713     */
02714     //-------------------------------------------------------------------------------
02715     T * eight(unsigned int rank,unsigned int width)
02716     //-------------------------------------------------------------------------------
02717     {
02718       T * res = new T[8];
02719       res[0] = leftUp(rank,width);
02720       res[1] = up(rank,width);
02721       res[2] = rightUp(rank,width);
02722       res[3] = right(rank);
02723       res[4] = rightDown(rank,width);
02724       res[5] = down(rank,width);
02725       res[6] = leftDown(rank,width);
02726       res[7] = left(rank);
02727       return res;
02728     }
02729     //-------------------------------------------------------------------------------
02730     //! Get all 4 directions neighbor values for the element to a precise rank
02731     /*!
02732       Equivalent to up, right, down and left with a single call.
02733       \param rank is the rank at which the neighbor value is wanted
02734       \return a table of all 8 directions neighbor values
02735     */
02736     //-------------------------------------------------------------------------------
02737     T * four(unsigned int rank,unsigned int width)
02738     //-------------------------------------------------------------------------------
02739     {
02740       T * res = new T[8];
02741       res[0] = up(rank,width);
02742       res[1] = right(rank);
02743       res[2] = down(rank,width);
02744       res[3] = left(rank);
02745       return res;
02746     }
02747     //-------------------------------------------------------------------------------
02748     //! Get the nearest right neighbor value inside the domain to a precise rank
02749     /*!
02750       This work for every size of R and will always give the nearest right inside-domain value.
02751       \param rank is the rank at which the neighbor value is wanted
02752       \return the neighbor value
02753     */
02754     //-------------------------------------------------------------------------------
02755     T inright(unsigned int rank)
02756     //-------------------------------------------------------------------------------
02757     {
02758       unsigned int r = rank + R;
02759       return this->data[r];
02760     }
02761     //-------------------------------------------------------------------------------
02762     //! Get the nearest left neighbor value inside the domain to a precise rank
02763     /*!
02764       This work for every size of R and will always give the nearest left inside-domain value.
02765       \param rank is the rank at which the neighbor value is wanted
02766       \return the neighbor value
02767     */
02768     //-------------------------------------------------------------------------------
02769     T inleft(unsigned int rank)
02770     //-------------------------------------------------------------------------------
02771     {
02772       unsigned int r = rank - R;
02773       return this->data[r];
02774     }
02775     //-------------------------------------------------------------------------------
02776     //! Get the nearest up neighbor value inside the domain to a precise rank
02777     /*!
02778       This work for every size of R and will always give the nearest up inside-domain value.
02779       \param rank is the rank at which the neighbor value is wanted
02780       \return the neighbor value
02781     */
02782     //-------------------------------------------------------------------------------
02783     T inup(unsigned int rank,unsigned int width)
02784     //-------------------------------------------------------------------------------
02785     {
02786       unsigned int r = rank - R*width;
02787       return this->data[r];
02788     }
02789     //-------------------------------------------------------------------------------
02790     //! Get the nearest down neighbor value inside the domain to a precise rank
02791     /*!
02792       This work for every size of R and will always give the nearest down inside-domain value.
02793       \param rank is the rank at which the neighbor value is wanted
02794       \return the neighbor value
02795     */
02796     //-------------------------------------------------------------------------------
02797     T indown(unsigned int rank,unsigned int width)
02798     //-------------------------------------------------------------------------------
02799     {
02800       unsigned int r = rank + R*width;
02801       return this->data[r];
02802     }
02803     //-------------------------------------------------------------------------------
02804     //! Get the indexes associated to a precise rank
02805     /*!
02806       \param rank is the rank at which the indexes values are wanted
02807       \param col is the column return value
02808       \param li is the line return value
02809     */
02810     //-------------------------------------------------------------------------------
02811     void indexesIt(unsigned int rank,int &col,int &li)
02812     //-------------------------------------------------------------------------------
02813     {
02814       unsigned int relative_rank = rank - R*border_head.width - (floor(rank/border_head.width)-R)*R*2 -R;
02815       li = floor(relative_rank/this->loc_head.width);
02816       col = relative_rank%this->loc_head.width;
02817     }
02818     //-------------------------------------------------------------------------------
02819 
02820   public:
02821     //! default constructor of the distributed matrix interface
02822     //-------------------------------------------------------------------------------
02823     DMatrix_impl() : DMatrix_base<T>(){}
02824     //-------------------------------------------------------------------------------
02825     //! constructor of the distributed matrix
02826     /*!
02827       This constructor will simply allocate the good space.
02828       \param h is the header to use to build your matrix
02829       \param value is the default value to put in the matrix
02830       \param loc is a boolean value to indicate if the DMatrix_impl has to stay local to one MPI process (used in ApplyReduction).
02831     */
02832     //-------------------------------------------------------------------------------
02833     DMatrix_impl(HEADER h,const T value, bool loc=false) : DMatrix_base<T>()
02834     //-------------------------------------------------------------------------------
02835     {
02836       this->head=h;
02837       this->local=loc;
02838       create();
02839       initValues(value);
02840       getNeighborRanks();
02841       phBLimits();
02842     }
02843     //-------------------------------------------------------------------------------
02844     //! constructor of the distributed matrix
02845     /*!
02846       The constructor evaluate how to divide the matrix in submatrice and do the work.
02847       \param binFile is the path of the binary file you want to work on
02848       \param loc is a boolean value to indicate if the DMatrix_impl has to stay local to one MPI process (used in ApplyReduction).
02849     */
02850     //-------------------------------------------------------------------------------
02851     DMatrix_impl(const char * binFile, bool loc=false) : DMatrix_base<T>()
02852     //-------------------------------------------------------------------------------
02853     {
02854       this->head = Header::read(binFile);
02855       this->local=loc;
02856       create();
02857       initValues(0);
02858       read(binFile);
02859       getNeighborRanks();
02860       phBLimits();
02861     }
02862     //-------------------------------------------------------------------------------
02863     //! destructor of the distributed matrix
02864     //-------------------------------------------------------------------------------
02865     ~DMatrix_impl()
02866     //-------------------------------------------------------------------------------
02867     {
02868       if(NULL!=this->data)
02869         delete [] this->data;
02870     }
02871     //-------------------------------------------------------------------------------
02872     //! Set value in the global middle
02873     /*!
02874       \param val is the value to affect
02875     */
02876     //-------------------------------------------------------------------------------
02877     void setGlobalMiddleValue(T val)
02878     //-------------------------------------------------------------------------------
02879     {
02880       if((this->col!=this->cols-1 && this->row!=this->rows-1) ||
02881          (this->cols==1 && this->row!=this->rows-1) ||
02882          (this->col!=this->cols-1 && this->rows==1))
02883         {
02884           unsigned int i = floor(this->head.height/2)-1;
02885           unsigned int j = floor(this->head.width/2)-1;
02886           if(this->col*this->loc_head.width<=j && 
02887              (this->col+1)*this->loc_head.width>j && 
02888              this->row*this->loc_head.height<=i && 
02889              (this->row+1)*this->loc_head.height>i)
02890             {
02891               unsigned int j_loc = j-this->col*this->loc_head.width;
02892               unsigned int i_loc = i-this->row*this->loc_head.height;
02893               unsigned int rank = R*border_head.width +  i_loc*border_head.width + j_loc + R;
02894               this->data[rank] = val;
02895             }
02896         }
02897     }
02898     //-------------------------------------------------------------------------------
02899     //! to set all the values of the physical border of the matrix
02900     /*!
02901       \param val is the value to set
02902     */
02903     //-------------------------------------------------------------------------------
02904     inline void setPhysicalBorder(T val)
02905     {
02906       iterator_phb_left<T,R> left =begin_phb_left();
02907       iterator_phb_left<T,R> leftEnd = end_phb_left();
02908       for(left;left<leftEnd;++left)
02909         this->data[left._rank] = val;
02910       iterator_phb_right<T,R> right =begin_phb_right();
02911       iterator_phb_right<T,R> rightEnd = end_phb_right();
02912       for(right;right<rightEnd;++right)
02913         this->data[right._rank] = val;
02914       iterator_phb_up<T,R> up =begin_phb_up();
02915       iterator_phb_up<T,R> upEnd = end_phb_up();
02916       for(up;up<upEnd;++up)
02917         this->data[up._rank] = val;
02918       iterator_phb_down<T,R> down =begin_phb_down();
02919       iterator_phb_down<T,R> downEnd = end_phb_down();
02920       for(down;down<downEnd;++down)
02921         this->data[down._rank] = val;
02922     }
02923     //-------------------------------------------------------------------------------
02924     //! to set all the values on the right of physical border of the matrix
02925     /*!
02926       \param val is the value to set
02927     */
02928     //-------------------------------------------------------------------------------
02929     inline void setRightPhysicalBorder(T val)
02930     {
02931       iterator_phb_right<T,R> it =begin_phb_right();
02932       iterator_phb_right<T,R> itEnd = end_phb_right();
02933       for(it;it<itEnd;++it)
02934         this->data[it._rank] = val;
02935     }
02936     //-------------------------------------------------------------------------------
02937     //! to set all the values on the left of physical border of the matrix
02938     /*!
02939       \param val is the value to set
02940     */
02941     //-------------------------------------------------------------------------------
02942     inline void setLeftPhysicalBorder(T val)
02943     {
02944       iterator_phb_left<T,R> it =begin_phb_left();
02945       iterator_phb_left<T,R> itEnd = end_phb_left();
02946       for(it;it<itEnd;++it)
02947         this->data[it._rank] = val;
02948     }
02949     //-------------------------------------------------------------------------------
02950     //! to set all the values on the up of physical border of the matrix
02951     /*!
02952       \param val is the value to set
02953     */
02954     //-------------------------------------------------------------------------------
02955     inline void setUpPhysicalBorder(T val)
02956     {
02957       iterator_phb_up<T,R> it =begin_phb_up();
02958       iterator_phb_up<T,R> itEnd = end_phb_up();
02959       for(it;it<itEnd;++it)
02960         this->data[it._rank] = val;
02961     }
02962     //-------------------------------------------------------------------------------
02963     //! to set all the values on the down of physical border of the matrix
02964     /*!
02965       \param val is the value to set
02966     */
02967     //-------------------------------------------------------------------------------
02968     void setDownPhysicalBorder(T val)
02969     {
02970       iterator_phb_down<T,R> it =begin_phb_down();
02971       iterator_phb_down<T,R> itEnd = end_phb_down();
02972       for(it;it<itEnd;++it)
02973         this->data[it._rank] = val;
02974     }
02975     //-------------------------------------------------------------------------------
02976     //! Get the beginning iterator of the DMatrix (on the first element of the DMatrix)
02977     /*!
02978       This method exists for each type of iterator in SkelGIS.
02979       \return the iterator of the first element
02980     */
02981     //-------------------------------------------------------------------------------
02982     inline iterator<T,R> begin(){return iterator<T,R>(R*this->border_head.width+R,this->border_head.width);}
02983     inline iterator_cont<T,R> begin_cont(){return iterator_cont<T,R>(R*this->border_head.width+R,this->border_head.width);}
02984     inline iterator_rev<T,R> begin_rev(){return iterator_rev<T,R>((this->border_head.width*this->border_head.height)-1-R*this->border_head.width-R,this->border_head.width);}
02985     inline iterator_step<T,R> begin_step(int step,int nb){return iterator_step<T,R>(R*this->border_head.width+R,this->border_head.width,step,nb);}
02986     inline iterator_phb_left<T,R> begin_phb_left(){return iterator_phb_left<T,R>(beginPhBLeft,this->border_head.width);}
02987     inline iterator_phb_right<T,R> begin_phb_right(){return iterator_phb_right<T,R>(beginPhBRight,this->border_head.width);}
02988     inline iterator_phb_up<T,R> begin_phb_up(){return iterator_phb_up<T,R>(beginPhBUp,this->border_head.width);}
02989     inline iterator_phb_down<T,R> begin_phb_down(){return iterator_phb_down<T,R>(beginPhBDown,this->border_head.width);}
02990     inline iterator_line<T,R> begin_line(){return iterator_line<T,R>(R*this->border_head.width+R,this->border_head.width);}
02991     //-------------------------------------------------------------------------------
02992     //! Get the ending iterator of the DMatrix (on the last element of the DMatrix)
02993     /*!
02994       This method exists for each type of iterator in SkelGIS.
02995       \return the iterator of the last element
02996     */
02997     //-------------------------------------------------------------------------------
02998     inline iterator<T,R> end(){return iterator<T,R>((this->border_head.width*this->border_head.height)-1-R*this->border_head.width-R,this->border_head.width);}
02999     inline iterator_cont<T,R> end_cont(){return iterator_cont<T,R>((this->border_head.width*this->border_head.height)-1-R*this->border_head.width-R,this->border_head.width);}
03000     inline iterator_rev<T,R> end_rev(){return iterator_rev<T,R>(R*this->border_head.width+R-1,this->border_head.width);}
03001     inline iterator_step<T,R> end_step(){return iterator_step<T,R>((this->border_head.width*this->border_head.height)-1-R*this->border_head.width-R,this->border_head.width);}
03002     inline iterator_phb_left<T,R> end_phb_left(){return iterator_phb_left<T,R>(endPhBLeft,this->border_head.width);}
03003     inline iterator_phb_right<T,R> end_phb_right(){return iterator_phb_right<T,R>(endPhBRight,this->border_head.width);}
03004     inline iterator_phb_up<T,R> end_phb_up(){return iterator_phb_up<T,R>(endPhBUp,this->border_head.width);}
03005     inline iterator_phb_down<T,R> end_phb_down(){return iterator_phb_down<T,R>(endPhBDown,this->border_head.width);}
03006     inline iterator_line<T,R> end_line(){return iterator_line<T,R>((this->border_head.width*this->border_head.height)-(R+1)*this->border_head.width+R,this->border_head.width);}
03007     //-------------------------------------------------------------------------------
03008     //! get the iterator on the matrix at position (x,y)
03009     /*!
03010       \param x is row index
03011       \pram y is column index
03012       \return returns the iterator at position (x,y)
03013     */
03014     iterator<T,R> getIterator(int col,int li)
03015     //-------------------------------------------------------------------------------
03016     {
03017       if(NULL!=this->data)
03018         {
03019           unsigned int rank = R*border_head.width + R + li*border_head.width+col;
03020           return iterator<T,R>(rank,this->border_head.width);
03021         }
03022     }
03023     //-------------------------------------------------------------------------------
03024     //! get the iterator contiguous on the matrix at position (x,y)
03025     /*!
03026       \param x is row index
03027       \pram y is column index
03028       \return returns the iterator at position (x,y)
03029     */
03030     iterator_cont<T,R> getIterator_cont(int col,int li)
03031     //-------------------------------------------------------------------------------
03032     {
03033       if(NULL!=this->data)
03034         {
03035           unsigned int rank = R*border_head.width + R + li*border_head.width+col;
03036           return iterator_cont<T,R>(rank,this->border_head.width);
03037         }
03038     }
03039     //-------------------------------------------------------------------------------
03040     //! get the iterator reverse on the matrix at position (x,y)
03041     /*!
03042       \param x is row index
03043       \pram y is column index
03044       \return returns the iterator at position (x,y)
03045     */
03046     iterator_rev<T,R> getIterator_rev(int col,int li)
03047     //-------------------------------------------------------------------------------
03048     {
03049       if(NULL!=this->data)
03050         {
03051           unsigned int rank = R*border_head.width + R + li*border_head.width+col;
03052           return iterator_rev<T,R>(rank,this->border_head.width);
03053         }
03054     }
03055     //-------------------------------------------------------------------------------
03056     //! get the indexes of the iterator on the matrix
03057     /*!
03058       \param it is the iterator from which the indexes will be taken
03059       \param x is the iterator position x returned
03060       \param y is the iterator position y returned
03061     */
03062     void getIndexes(iterator<T,R> it,int &col,int &li){indexesIt(it._rank,col,li);}
03063     void getIndexes(iterator_cont<T,R> it,int &col,int &li){indexesIt(it._rank,col,li);}
03064     void getIndexes(iterator_rev<T,R> it,int &col,int &li){indexesIt(it._rank,col,li);}
03065     //-------------------------------------------------------------------------------
03066     //! Print the matrix by bloc
03067     //-------------------------------------------------------------------------------
03068     void print()
03069     //-------------------------------------------------------------------------------
03070     {
03071       std::stringstream st;
03072       for(unsigned int i=0;i<this->loc_head.height;i++)
03073         {
03074           for(unsigned int j=0;j<this->loc_head.width;j++)
03075             {
03076               st<<this->data[R*this->border_head.width + R + i*this->border_head.width+j]<<" ";
03077             }
03078           st<<"\n";
03079         }
03080       Mpi_::print(st.str());
03081     }
03082     //-------------------------------------------------------------------------------
03083     //! Print all the matrix by bloc
03084     //-------------------------------------------------------------------------------
03085     void printAll()
03086     //-------------------------------------------------------------------------------
03087     {
03088       std::stringstream st;
03089       for(unsigned int i=0;i<this->border_head.height;i++)
03090         {
03091           for(unsigned int j=0;j<this->border_head.width;j++)
03092             {
03093               st<<this->data[i*this->border_head.width+j]<<" ";
03094             }
03095           st<<"\n";
03096         }
03097       Mpi_::print(st.str());
03098     }
03099     //-------------------------------------------------------------------------------
03100     //! Write the zone file concerned for the current MPI process
03101     /*!
03102       write the MPI prcess zone, and spread the data in the subMatrices OpenMP
03103       only one submatrice if OpenMP is disabled
03104       \param binFile is binary file to write in
03105     */
03106     //-------------------------------------------------------------------------------
03107     void write(char * binFile)
03108     //-------------------------------------------------------------------------------
03109     {
03110       MPI_File fh;
03111       MPI_Status status;
03112       //open the output file
03113       MPI_File_open(MPI_COMM_WORLD, binFile,
03114                 MPI_MODE_CREATE|MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);
03115    
03116       // process 0 writes the HEADER of the file
03117       if (Mpi_::mpi_rank==0)
03118         MPI_File_write_at(fh, 0,reinterpret_cast<char*>(&this->head) ,sizeof(HEADER) , MPI_CHAR, &status);
03119       //the begining of the bloc of write in
03120       unsigned int mvline;
03121       // number of bytes between 2 blocks
03122       unsigned int step;
03123       
03124       // the starting point of the row where the DMatrix is saved
03125       if (this->row<this->remainderh)
03126         mvline=sizeof(HEADER)+sizeof(T)*this->head.width*this->loc_head.height*this->row;
03127       else
03128         mvline=sizeof(HEADER)+sizeof(T)*this->head.width*(this->loc_head.height*this->row+this->remainderh);
03129       
03130       // now we must add the number of bytes corresponding to the column of the processor
03131       if (this->col<this->remainderw)
03132         mvline+=sizeof(T)*this->loc_head.width*this->col;
03133       else
03134         mvline+=sizeof(T)*(this->loc_head.width*this->col+this->remainderw);
03135 
03136       // pointer in the data of the dmatrix
03137       T *data_pointer=this->data+R*(this->border_head.width+1);
03138 
03139       //write by lines
03140       for(unsigned int j=0;j<this->loc_head.height;j++)
03141         {
03142           std::stringstream st1;
03143           st1<<"mvline :"<<mvline<<" loc head width "<<this->loc_head.width<<" loc head height "
03144              <<this->loc_head.height << " head width "<<this->head.width<<" head height "<<this->head.height
03145              <<" row "<<this->row<<" col "<<this->col<<"\n";
03146           Mpi_::print(st1.str());
03147           MPI_File_write_at(fh, mvline,reinterpret_cast<char*>(data_pointer) ,this->loc_head.width*sizeof(T) , MPI_CHAR, &status);
03148           data_pointer+=this->border_head.width;
03149           mvline+=this->head.width*sizeof(T);
03150         }
03151       MPI_File_close(&fh);
03152     }
03153     //-------------------------------------------------------------------------------
03154     //! get borders
03155     /*!
03156       Get the borders from other processors
03157       MPI exchanges
03158     */
03159     //-------------------------------------------------------------------------------
03160     void getBorders()
03161     //-------------------------------------------------------------------------------
03162     {     
03163       if(this->col!=0 && this->row!=0)
03164         {
03165           T * toSend = getUpLeftBorderToSend();
03166           T * toGet = new T[R*R];
03167           Communications<T>::Exchanges(toSend,toGet,R*R,mpi_ranks[0]);
03168           setUpLeftBorder(toGet);
03169           delete [] toSend;
03170           delete [] toGet;
03171         }
03172       if(this->col!=(this->cols-1) && this->row!=(this->rows-1))
03173         {
03174           T * toSend = getDownRightBorderToSend();
03175           T * toGet = new T[R*R];
03176           Communications<T>::Exchanges(toSend,toGet,R*R,mpi_ranks[4]);
03177           setDownRightBorder(toGet);
03178           delete [] toSend;
03179           delete [] toGet;
03180         }
03181       if(this->row!=0)
03182         {
03183           T * toSend = getUpBorderToSend();
03184           T * toGet = new T[this->loc_head.width*R];
03185           Communications<T>::Exchanges(toSend,toGet,this->loc_head.width*R,mpi_ranks[1]);
03186           setUpBorder(toGet);
03187           delete [] toSend;
03188           delete [] toGet;
03189         }
03190       if(this->row!=(this->rows-1))
03191         {     
03192           T * toSend = getDownBorderToSend();
03193           T * toGet = new T[this->loc_head.width*R];
03194           Communications<T>::Exchanges(toSend,toGet,this->loc_head.width*R,mpi_ranks[5]);
03195           setDownBorder(toGet);
03196           delete [] toSend;
03197           delete [] toGet;
03198         }
03199       if(this->col!=(this->cols-1) && this->row!=0)
03200         {
03201           T * toSend = getUpRightBorderToSend();
03202           T * toGet = new T[R*R];
03203           Communications<T>::Exchanges(toSend,toGet,R*R,mpi_ranks[2]);
03204           setUpRightBorder(toGet);
03205           delete [] toSend;
03206           delete [] toGet;
03207         }
03208       if(this->row!=(this->rows-1) && this->col!=0)
03209         {
03210           T * toSend = getDownLeftBorderToSend();
03211           T * toGet = new T[R*R];
03212           Communications<T>::Exchanges(toSend,toGet,R*R,mpi_ranks[6]);
03213           setDownLeftBorder(toGet);
03214           delete [] toSend;
03215           delete [] toGet;
03216         }
03217           
03218       if(this->col!=(this->cols-1))
03219         {
03220           T * toSend = getRightBorderToSend();
03221           T * toGet = new T[this->loc_head.height*R];
03222           Communications<T>::Exchanges(toSend,toGet,this->loc_head.height*R,mpi_ranks[3]);
03223           setRightBorder(toGet);
03224           delete [] toSend;
03225           delete [] toGet;
03226         }
03227       if(this->col!=0)
03228         {
03229           T * toSend = getLeftBorderToSend();
03230           T * toGet = new T[this->loc_head.height*R];
03231           Communications<T>::Exchanges(toSend,toGet,this->loc_head.height*R,mpi_ranks[7]);
03232           setLeftBorder(toGet);
03233           delete [] toSend;
03234           delete [] toGet;
03235         }
03236     }
03237     //-------------------------------------------------------------------------------
03238     //! Get all neighbor right values for the element at iterator it
03239     /*!
03240       The returned table will contain R elements. 
03241       These elements are the R values on the right of the current it.
03242       This method exists for each type of iterator with right values in SkelGIS.
03243       \param it is the iterator position
03244       \return a table of right values
03245     */
03246     //-------------------------------------------------------------------------------
03247     inline T * getAllRight(iterator<T,R> it){return allRight(it._rank);}
03248     inline T * getAllRight(iterator_cont<T,R> it){return allRight(it._rank);}
03249     inline T * getAllRight(iterator_step<T,R> it){return allRight(it._rank);}
03250     inline T * getAllRight(iterator_rev<T,R> it){return allRight(it._rank);}
03251     inline T * getAllRight(iterator_phb_left<T,R> it){return allRight(it._rank);}
03252     //-------------------------------------------------------------------------------
03253     //! Get all neighbor left values for the element at iterator it
03254     /*!
03255       The returned table will contain R elements. 
03256       These elements are the R values on the left of the current it.
03257       This method exists for each type of iterator with left values in SkelGIS.
03258       \param it is the iterator position
03259       \return a table of left values
03260     */
03261     //-------------------------------------------------------------------------------
03262     inline T * getAllLeft(iterator<T,R> it){return allLeft(it._rank);}
03263     inline T * getAllLeft(iterator_cont<T,R> it){return allLeft(it._rank);}
03264     inline T * getAllLeft(iterator_step<T,R> it){return allLeft(it._rank);}
03265     inline T * getAllLeft(iterator_rev<T,R> it){return allLeft(it._rank);}
03266     inline T * getAllLeft(iterator_phb_right<T,R> it){return allLeft(it._rank);}
03267     //-------------------------------------------------------------------------------
03268     //! Get all neighbor up values for the element at iterator it
03269     /*!
03270       The returned table will contain R elements. 
03271       These elements are the R values on the up of the current it.
03272       This method exists for each type of iterator with up values in SkelGIS.
03273       \param it is the iterator position
03274       \return a table of up values
03275     */
03276     //-------------------------------------------------------------------------------
03277     inline T * getAllUp(iterator<T,R> it){return allUp(it._rank,it._width);}
03278     inline T * getAllUp(iterator_cont<T,R> it){return allUp(it._rank,it._width);}
03279     inline T * getAllUp(iterator_step<T,R> it){return allUp(it._rank,it._width);}
03280     inline T * getAllUp(iterator_rev<T,R> it){return allUp(it._rank,it._width);}
03281     inline T * getAllUp(iterator_phb_down<T,R> it){return allUp(it._rank,it._width);}
03282     //-------------------------------------------------------------------------------
03283     //! Get all neighbor down values for the element at iterator it
03284     /*!
03285       The returned table will contain R elements. 
03286       These elements are the R values on the down of the current it.
03287       This method exists for each type of iterator with down values in SkelGIS.
03288       \param it is the iterator position
03289       \return a table of down values
03290     */
03291     //-------------------------------------------------------------------------------
03292     inline T * getAllDown(iterator<T,R> it){return allDown(it._rank,it._width);}
03293     inline T * getAllDown(iterator_cont<T,R> it){return allDown(it._rank,it._width);}
03294     inline T * getAllDown(iterator_step<T,R> it){return allDown(it._rank,it._width);}
03295     inline T * getAllDown(iterator_rev<T,R> it){return allDown(it._rank,it._width);}
03296     inline T * getAllDown(iterator_phb_up<T,R> it){return allDown(it._rank,it._width);}
03297     //-------------------------------------------------------------------------------
03298     //! Get all neighbor right down values for the element at iterator it
03299     /*!
03300       The returned table will contain R elements. 
03301       These elements are the R values on the right down of the current it.
03302       This method exists for each type of iterator with right down values in SkelGIS.
03303       \param it is the iterator position
03304       \return a table of right down values
03305     */
03306     //-------------------------------------------------------------------------------
03307     inline T * getAllRightDown(iterator<T,R> it){return allRightDown(it._rank,it._width);}
03308     inline T * getAllRightDown(iterator_cont<T,R> it){return allRightDown(it._rank,it._width);}
03309     inline T * getAllRightDown(iterator_step<T,R> it){return allRightDown(it._rank,it._width);}
03310     inline T * getAllRightDown(iterator_rev<T,R> it){return allRightDown(it._rank,it._width);}
03311     //-------------------------------------------------------------------------------
03312     //! Get all neighbor left down values for the element at iterator it
03313     /*!
03314       The returned table will contain R elements. 
03315       These elements are the R values on the left down of the current it.
03316       This method exists for each type of iterator with left down values in SkelGIS.
03317       \param it is the iterator position
03318       \return a table of left down values
03319     */
03320     //-------------------------------------------------------------------------------
03321     inline T * getAllLeftDown(iterator<T,R> it){return allLeftDown(it._rank,it._width);}
03322     inline T * getAllLeftDown(iterator_cont<T,R> it){return allLeftDown(it._rank,it._width);}
03323     inline T * getAllLeftDown(iterator_step<T,R> it){return allLeftDown(it._rank,it._width);}
03324     inline T * getAllLeftDown(iterator_rev<T,R> it){return allLeftDown(it._rank,it._width);}
03325     //-------------------------------------------------------------------------------
03326     //! Get all neighbor right up values for the element at iterator it
03327     /*!
03328       The returned table will contain R elements. 
03329       These elements are the R values on the right up of the current it.
03330       This method exists for each type of iterator with right up values in SkelGIS.
03331       \param it is the iterator position
03332       \return a table of right up values
03333     */
03334     //-------------------------------------------------------------------------------
03335     inline T * getAllRightUp(iterator<T,R> it){return allRightUp(it._rank,it._width);}
03336     inline T * getAllRightUp(iterator_cont<T,R> it){return allRightUp(it._rank,it._width);}
03337     inline T * getAllRightUp(iterator_step<T,R> it){return allRightUp(it._rank,it._width);}
03338     inline T * getAllRightUp(iterator_rev<T,R> it){return allRightUp(it._rank,it._width);}
03339     //-------------------------------------------------------------------------------
03340     //! Get all neighbor left up values for the element at iterator it
03341     /*!
03342       The returned table will contain R elements. 
03343       These elements are the R values on the left up of the current it.
03344       This method exists for each type of iterator with left up values in SkelGIS.
03345       \param it is the iterator position
03346       \return a table of left up values
03347     */
03348     //-------------------------------------------------------------------------------
03349     inline T * getAllLeftUp(iterator<T,R> it){return allLeftUp(it._rank,it._width);}
03350     inline T * getAllLeftUp(iterator_cont<T,R> it){return allLeftUp(it._rank,it._width);}
03351     inline T * getAllLeftUp(iterator_step<T,R> it){return allLeftUp(it._rank,it._width);}
03352     inline T * getAllLeftUp(iterator_rev<T,R> it){return allLeftUp(it._rank,it._width);}
03353     //-------------------------------------------------------------------------------
03354     //! Get the nearest right neighbor value for the element at iterator it
03355     /*!
03356       This method exists for each type of iterator with right value in SkelGIS.
03357       \param it is the iterator position
03358       \return the right neighbor element value
03359     */
03360     //-------------------------------------------------------------------------------
03361     inline T getRight(iterator<T,R> it){return right(it._rank);}
03362     inline T getRight(iterator_cont<T,R> it){return right(it._rank);}
03363     inline T getRight(iterator_step<T,R> it){return right(it._rank);}
03364     inline T getRight(iterator_rev<T,R> it){return right(it._rank);}
03365     inline T getRight(iterator_phb_left<T,R> it){return right(it._rank);}
03366     //-------------------------------------------------------------------------------
03367     //! Get the nearest left neighbor value for the element at iterator it
03368     /*!
03369       This method exists for each type of iterator with left value in SkelGIS.
03370       \param it is the iterator position
03371       \return the left neighbor element value
03372     */
03373     //-------------------------------------------------------------------------------
03374     inline T getLeft(iterator<T,R> it){return left(it._rank);}
03375     inline T getLeft(iterator_cont<T,R> it){return left(it._rank);}
03376     inline T getLeft(iterator_step<T,R> it){return left(it._rank);}
03377     inline T getLeft(iterator_rev<T,R> it){return left(it._rank);}
03378     inline T getLeft(iterator_phb_right<T,R> it){return left(it._rank);}
03379     //-------------------------------------------------------------------------------
03380     //! Get the nearest up neighbor value for the element at iterator it
03381     /*!
03382       This method exists for each type of iterator with up value in SkelGIS.
03383       \param it is the iterator position
03384       \return the up neighbor element value
03385     */
03386     //-------------------------------------------------------------------------------
03387     inline T getUp(iterator<T,R> it){return up(it._rank,it._width);}
03388     inline T getUp(iterator_cont<T,R> it){return up(it._rank,it._width);}
03389     inline T getUp(iterator_step<T,R> it){return up(it._rank,it._width);}
03390     inline T getUp(iterator_rev<T,R> it){return up(it._rank,it._width);}
03391     inline T getUp(iterator_phb_down<T,R> it){return up(it._rank,it._width);}
03392     //-------------------------------------------------------------------------------
03393     //! Get the nearest down neighbor value for the element at iterator it
03394     /*!
03395       This method exists for each type of iterator with down value in SkelGIS.
03396       \param it is the iterator position
03397       \return the down neighbor element value
03398     */
03399     //-------------------------------------------------------------------------------
03400     inline T getDown(iterator<T,R> it){return down(it._rank,it._width);}
03401     inline T getDown(iterator_cont<T,R> it){return down(it._rank,it._width);}
03402     inline T getDown(iterator_step<T,R> it){return down(it._rank,it._width);}
03403     inline T getDown(iterator_rev<T,R> it){return down(it._rank,it._width);}
03404     inline T getDown(iterator_phb_up<T,R> it){return down(it._rank,it._width);}
03405     //-------------------------------------------------------------------------------
03406     //! Get the nearest right down neighbor value for the element at iterator it
03407     /*!
03408       This method exists for each type of iterator with right down value in SkelGIS.
03409       \param it is the iterator position
03410       \return the right down neighbor element value
03411     */
03412     //-------------------------------------------------------------------------------
03413     inline T getRightDown(iterator<T,R> it){return rightDown(it._rank,it._width);}
03414     inline T getRightDown(iterator_cont<T,R> it){return rightDown(it._rank,it._width);}
03415     inline T getRightDown(iterator_step<T,R> it){return rightDown(it._rank,it._width);}
03416     inline T getRightDown(iterator_rev<T,R> it){return rightDown(it._rank,it._width);}
03417     //-------------------------------------------------------------------------------
03418     //! Get the nearest left down neighbor value for the element at iterator it
03419     /*!
03420       This method exists for each type of iterator with left down value in SkelGIS.
03421       \param it is the iterator position
03422       \return the left down neighbor element value
03423     */
03424     //-------------------------------------------------------------------------------
03425     inline T getLeftDown(iterator<T,R> it){return leftDown(it._rank,it._width);}
03426     inline T getLeftDown(iterator_cont<T,R> it){return leftDown(it._rank,it._width);}
03427     inline T getLeftDown(iterator_step<T,R> it){return leftDown(it._rank,it._width);}
03428     inline T getLeftDown(iterator_rev<T,R> it){return leftDown(it._rank,it._width);}
03429     //-------------------------------------------------------------------------------
03430     //! Get the nearest right up neighbor value for the element at iterator it
03431     /*!
03432       This method exists for each type of iterator with right up value in SkelGIS.
03433       \param it is the iterator position
03434       \return the right up neighbor element value
03435     */
03436     //-------------------------------------------------------------------------------
03437     inline T getRightUp(iterator<T,R> it){return rightUp(it._rank,it._width);}
03438     inline T getRightUp(iterator_cont<T,R> it){return rightUp(it._rank,it._width);}
03439     inline T getRightUp(iterator_step<T,R> it){return rightUp(it._rank,it._width);}
03440     inline T getRightUp(iterator_rev<T,R> it){return rightUp(it._rank,it._width);}
03441     //-------------------------------------------------------------------------------
03442     //! Get the nearest left up neighbor value for the element at iterator it
03443     /*!
03444       This method exists for each type of iterator with left up value in SkelGIS.
03445       \param it is the iterator position
03446       \return the left up neighbor element value
03447     */
03448     //-------------------------------------------------------------------------------
03449     inline T getLeftUp(iterator<T,R> it){return leftUp(it._rank,it._width);}
03450     inline T getLeftUp(iterator_cont<T,R> it){return leftUp(it._rank,it._width);}
03451     inline T getLeftUp(iterator_step<T,R> it){return leftUp(it._rank,it._width);}
03452     inline T getLeftUp(iterator_rev<T,R> it){return leftUp(it._rank,it._width);}
03453     //-------------------------------------------------------------------------------
03454     //! Get all values along X axe for the element at iterator it
03455     /*!
03456       Equivalent to getAllLeft and getAllRight with a single call.
03457       This method exists for each type of iterator with left and right values in SkelGIS.
03458       \param it is the iterator position
03459       \return a table of all neighbor values along X
03460     */
03461     //-------------------------------------------------------------------------------
03462     inline T * getAllX(iterator<T,R> it){return allX(it._rank);}
03463     inline T * getAllX(iterator_cont<T,R> it){return allX(it._rank);}
03464     inline T * getAllX(iterator_step<T,R> it){return allX(it._rank);}
03465     inline T * getAllX(iterator_rev<T,R> it){return allX(it._rank);}
03466     //-------------------------------------------------------------------------------
03467     //! Get all values along X axe for the element at iterator it
03468     /*!
03469       Equivalent to getAllLeft and getAllRight with a single call.
03470       This method exists for each type of iterator with left and right values in SkelGIS.
03471       \param it is the iterator position
03472       \param result table of all neighbor values along X
03473     */
03474     //-------------------------------------------------------------------------------
03475     inline void getAllX(iterator<T,R> it, T * t){return allX(it,t);}
03476     inline void getAllX(iterator_cont<T,R> it, T * t){return allX(it,t);}
03477     inline void getAllX(iterator_rev<T,R> it, T * t){return allX(it,t);}
03478     inline void getAllX(iterator_step<T,R> it, T * t){return allX(it,t);}
03479     //-------------------------------------------------------------------------------
03480     //! Get all values along Y axe for the element at iterator it
03481     /*!
03482       Equivalent to getAllUp and getAllDown with a single call.
03483       This method exists for each type of iterator with left and right values in SkelGIS.
03484       \param it is the iterator position
03485       \return a table of all neighbor values along Y
03486     */
03487     //-------------------------------------------------------------------------------
03488     inline T * getAllY(iterator<T,R> it){return allY(it._rank,it._width);}
03489     inline T * getAllY(iterator_cont<T,R> it){return allY(it._rank,it._width);}
03490     inline T * getAllY(iterator_step<T,R> it){return allY(it._rank,it._width);}
03491     inline T * getAllY(iterator_rev<T,R> it){return allY(it._rank,it._width);}
03492     //-------------------------------------------------------------------------------
03493     //! Get all values along Y axe for the element at iterator it
03494     /*!
03495       Equivalent to getAllUp and getAllDown with a single call.
03496       This method exists for each type of iterator with left and right values in SkelGIS.
03497       \param it is the iterator position
03498       \param result table of all neighbor values along Y
03499     */
03500     //-------------------------------------------------------------------------------
03501     inline void getAllY(iterator<T,R> it, T * t){return allY(it,t);}
03502     inline void getAllY(iterator_cont<T,R> it, T * t){return allY(it,t);}
03503     inline void getAllY(iterator_rev<T,R> it, T * t){return allY(it,t);}
03504     inline void getAllY(iterator_step<T,R> it, T * t){return allY(it,t);}
03505     //-------------------------------------------------------------------------------
03506     //! Get all 8 directions neighbor values for the element at iterator it
03507     /*!
03508       Equivalent to getLeftUp, getUp, getRightUp, getRight, getRightDown, getDown, getLeftDown and getLeft with a single call.
03509       \param it is the iterator position
03510       \return a table of all 8 directions neighbor values
03511     */
03512     //-------------------------------------------------------------------------------
03513     inline T * get8(iterator<T,R> it){return eight(it._rank,it._width);}
03514     inline T * get8(iterator_cont<T,R> it){return eight(it._rank,it._width);}
03515     inline T * get8(iterator_step<T,R> it){return eight(it._rank,it._width);}
03516     inline T * get8(iterator_rev<T,R> it){return eight(it._rank,it._width);}
03517     //-------------------------------------------------------------------------------
03518     //! Get all 4 directions neighbor values for the element at iterator it
03519     /*!
03520       Equivalent to getUp, getRight, getDown and getLeft with a single call.
03521       \param it is the iterator position
03522       \return a table of all 4 directions neighbor values
03523     */
03524     //-------------------------------------------------------------------------------
03525     inline T * get4(iterator<T,R> it){return four(it._rank,it._width);}
03526     inline T * get4(iterator_cont<T,R> it){return four(it._rank,it._width);}
03527      inline T * get4(iterator_step<T,R> it){return four(it._rank,it._width);}
03528     inline T * get4(iterator_rev<T,R> it){return four(it._rank,it._width);}
03529     //-------------------------------------------------------------------------------
03530     //! Get the nearest right neighbor value inside the domain, for physical left border iterators
03531     /*!
03532       This work for every size of R and will always give the nearest right inside-domain value.
03533       \param it is the physical border iterator position
03534       \return the neighbor value
03535     */
03536     //-------------------------------------------------------------------------------
03537     inline T getInRight(iterator_phb_left<T,R> it){return inright(it._rank);}
03538     //-------------------------------------------------------------------------------
03539     //! Get the nearest left neighbor value inside the domain, for physical right border iterators
03540     /*!
03541       This work for every size of R and will always give the nearest left inside-domain value.
03542       \param it is the physical border iterator position
03543       \return the neighbor value
03544     */
03545     //-------------------------------------------------------------------------------
03546     inline T getInLeft(iterator_phb_right<T,R> it){return inleft(it._rank);}
03547     //-------------------------------------------------------------------------------
03548     //! Get the nearest up neighbor value inside the domain, for physical down border iterators
03549     /*!
03550       This work for every size of R and will always give the nearest up inside-domain value.
03551       \param it is the physical border iterator position
03552       \return the neighbor value
03553     */
03554     //-------------------------------------------------------------------------------
03555     inline T getInUp(iterator_phb_down<T,R> it){return inup(it._rank,it._width);}
03556     //-------------------------------------------------------------------------------
03557     //! Get the nearest down neighbor value inside the domain, for physical up border iterators
03558     /*!
03559       This work for every size of R and will always give the nearest down inside-domain value.
03560       \param it is the physical border iterator position
03561       \return the neighbor value
03562     */
03563     //-------------------------------------------------------------------------------
03564     inline T getInDown(iterator_phb_up<T,R> it){return indown(it._rank,it._width);}
03565     //-------------------------------------------------------------------------------
03566 
03567   };
03568 
03569   //================================================================================
03570   //!  DMatrix_impl class
03571   /*!
03572     template of the distributed matrix implementation of SkelGIS. This is the first specialization of the DMatrix_impl.
03573     \tparam T is the type of data to store in the DMatrix_impl
03574     R overlap template parameter is not asked and specialized to 0.
03575     The line template parameter is specialized with the false value. In this case the parallel data distribution will be done along height and width (by block).
03576   */
03577   //-------------------------------------------------------------------------------
03578   template<class T> struct DMatrix_impl<T,0,false> : DMatrix_base<T>
03579   //-------------------------------------------------------------------------------
03580   {
03581   protected:
03582 
03583     //-------------------------------------------------------------------------------
03584     //! Creation of the ditributed matrix
03585     /*!
03586       Data distribution and construction of the parallel data structure.
03587     */
03588     //-------------------------------------------------------------------------------
03589     void create()
03590     //-------------------------------------------------------------------------------
03591     {
03592       if(this->local==false)
03593         {
03594           //division of the matrix in cols and rows
03595           this->cols = floor(sqrt(Mpi_::mpi_nb));
03596           int nb = 1;
03597           while(Mpi_::mpi_nb%this->cols!=0)
03598             {
03599               this->cols = floor(sqrt(Mpi_::mpi_nb))-nb;
03600               nb++;
03601             }
03602           this->rows = Mpi_::mpi_nb/this->cols;
03603 
03604           this->row=floor(Mpi_::mpi_rank/this->cols);
03605           this->col=Mpi_::mpi_rank%this->cols;
03606 
03607           this-> remainderh = 0;
03608           //if the height of the matrix can be divided by the number of rows
03609           if(this->head.height%this->rows==0)
03610             this->loc_head.height=floor(this->head.height/this->rows);
03611           //else we add one to the height of the submatrix and the last row will have less elements
03612           else
03613             {
03614               this->remainderh = this->head.height - floor(this->head.height/this->rows)*this->rows;
03615 
03616               if(this->row<this->remainderh)
03617                 this->loc_head.height=floor(this->head.height/this->rows)+1;
03618               else
03619                 this->loc_head.height=floor(this->head.height/this->rows);
03620             }
03621 
03622           this-> remainderw = 0;
03623           //if the height of the matrix can be divided by the number of rows
03624           if(this->head.width%this->cols==0)
03625             this->loc_head.width=floor(this->head.width/this->cols);
03626           //else we add one to the height of the submatrix and the last row will have less elements
03627           else
03628             {
03629               this->remainderw = this->head.width - floor(this->head.width/this->cols)*this->cols;
03630 
03631               if(this->col<this->remainderw)
03632                 this->loc_head.width=floor(this->head.width/this->cols)+1;
03633               else
03634                 this->loc_head.width=floor(this->head.width/this->cols);
03635             }
03636         
03637           this->loc_head.x=this->head.x+this->col*this->loc_head.width*this->head.spacing;
03638           this->loc_head.y=this->head.y-this->row*this->loc_head.height*this->head.spacing;
03639           this->loc_head.spacing=this->head.spacing;
03640           this->loc_head.nodata=this->head.nodata;
03641         }
03642       else
03643         {
03644           this->loc_head.x = this->head.x;
03645           this->loc_head.y = this->head.y;
03646           this->loc_head.width = this->head.width;
03647           this->loc_head.height = this->head.height;
03648           this->loc_head.spacing = this->head.spacing;
03649           this->loc_head.nodata = this->head.nodata;
03650 
03651           this->col=0;this->row=0;
03652           this->cols=1;this->rows=1;
03653           this-> remainderw = 0;
03654           this-> remainderh = 0;
03655         }
03656       
03657       this->data = new T[this->loc_head.width*this->loc_head.height];
03658     }
03659     //-------------------------------------------------------------------------------
03660     //! Values initialization of the elements of the DMatrix_impl
03661     /*!
03662       \param value is the value to set to the elements
03663     */
03664     //-------------------------------------------------------------------------------
03665     void initValues(T value)
03666     //-------------------------------------------------------------------------------
03667     {
03668       for(unsigned int i=0;i<this->loc_head.width*this->loc_head.height;i++)
03669         {
03670           this->data[i]=value;
03671         }
03672     }
03673     //-------------------------------------------------------------------------------
03674     //! Read the zone file concerned for the current MPI process
03675     /*!
03676       read the MPI prcess zone, and spread the data in the subMatrices OpenMP
03677       only one submatrice if OpenMP is disabled
03678       \param binFile is binary file to read
03679     */
03680     //-------------------------------------------------------------------------------
03681     void read(const char * binFile)
03682     //-------------------------------------------------------------------------------
03683     {
03684       //read the zone of the file for my own block of data
03685       std::ifstream f(binFile, std::ios::binary | std::ios::in);
03686 
03687       //move to the begining of the bloc of data concerned
03688       unsigned int mvline;
03689       if(this->row<this->remainderh && this->col<this->remainderw)
03690         mvline = sizeof(HEADER) + (this->col*this->loc_head.width
03691                                    +this->row*this->loc_head.height*this->head.width)*sizeof(T);
03692       else if(this->row<this->remainderh)
03693         mvline = sizeof(HEADER) + (this->remainderw*(this->loc_head.width+1)+(this->col-this->remainderw)*this->loc_head.width
03694                                    +(this->row*this->loc_head.height)
03695                                    *this->head.width)*sizeof(T);
03696       else if(this->col<this->remainderw)
03697         mvline = sizeof(HEADER) + (this->col*this->loc_head.width
03698                                    +(this->remainderh*(this->loc_head.height+1)+(this->row-this->remainderh)*this->loc_head.height)
03699                                    *this->head.width)*sizeof(T);
03700       else
03701         mvline = sizeof(HEADER) + (this->remainderw*(this->loc_head.width+1)+(this->col-this->remainderw)*this->loc_head.width
03702                                    +(this->remainderh*(this->loc_head.height+1)+(this->row-this->remainderh)*this->loc_head.height)
03703                                    *this->head.width)*sizeof(T);
03704 
03705       f.seekg(mvline,std::ios::beg);
03706 
03707       //read by lines
03708       for(unsigned int j=0;j<this->loc_head.height;j++)
03709         {
03710           if(j>0)
03711             {
03712               //move to the next line of the bloc
03713               unsigned int mv = this->head.width - this->loc_head.width;
03714               f.seekg(mv*sizeof(T),std::ios::cur);
03715             }
03716 
03717           for(unsigned int k=0;k<this->loc_head.width;k++)
03718             {
03719               T val;
03720               unsigned int rank = j*this->loc_head.width + k;
03721               f.read(reinterpret_cast<char*>(&val),sizeof(T));
03722               this->data[rank]=val;
03723             }
03724         }
03725     }
03726     //-------------------------------------------------------------------------------
03727     //! Get the indexes associated to a precise rank
03728     /*!
03729       \param rank is the rank at which the indexes values are wanted
03730       \param col is the column return value
03731       \param li is the line return value
03732     */
03733     //-------------------------------------------------------------------------------
03734     void indexesIt(unsigned int rank,int &col,int &li)
03735     //-------------------------------------------------------------------------------
03736     {
03737       li = floor(rank/this->loc_head.width);
03738       col = rank%this->loc_head.width;
03739     }
03740     //-------------------------------------------------------------------------------
03741 
03742   public:
03743     //! default constructor of the distributed matrix interface
03744     //-------------------------------------------------------------------------------
03745     DMatrix_impl() : DMatrix_base<T>(){}
03746     //-------------------------------------------------------------------------------
03747     //! constructor of the distributed matrix
03748     /*!
03749       This constructor will simply allocate the good space.
03750       \param h is the header to use to build your matrix
03751       \param value is the default value to put in the matrix
03752       \param loc is a boolean value to indicate if the DMatrix_impl has to stay local to one MPI process (used in ApplyReduction).
03753     */
03754     //-------------------------------------------------------------------------------
03755     DMatrix_impl(HEADER h,const T value, bool loc=false) : DMatrix_base<T>()
03756     //-------------------------------------------------------------------------------
03757     {
03758       this->head=h;
03759       this->local=loc;
03760       create();
03761       initValues(value);
03762     }
03763     //! constructor of the distributed matrix
03764     /*!
03765       The constructor evaluate how to divide the matrix in submatrice and do the work.
03766       \param binFile is the path of the binary file you want to work on
03767       \param loc is a boolean value to indicate if the DMatrix_impl has to stay local to one MPI process (used in ApplyReduction).
03768     */
03769     //-------------------------------------------------------------------------------
03770     DMatrix_impl(const char * binFile, bool loc=false) : DMatrix_base<T>()
03771     //-------------------------------------------------------------------------------
03772     {
03773       this->head = Header::read(binFile);
03774       this->local=loc;
03775       create();
03776       initValues(0);
03777       read(binFile);
03778     }
03779     //-------------------------------------------------------------------------------
03780     //! destructor of the distributed matrix
03781     //-------------------------------------------------------------------------------
03782     ~DMatrix_impl()
03783     //-------------------------------------------------------------------------------
03784     {
03785       if(NULL!=this->data)
03786         delete [] this->data;
03787     }
03788     //-------------------------------------------------------------------------------
03789     //! Get the beginning iterator of the DMatrix (on the first element of the DMatrix)
03790     /*!
03791       This method exists for each type of iterator in SkelGIS.
03792       \return the iterator of the first element
03793     */
03794     //-------------------------------------------------------------------------------
03795     inline iterator<T,0> begin(){return iterator<T,0>(0);}
03796     inline iterator_cont<T,0> begin_cont(){return iterator_cont<T,0>(0);}
03797     inline iterator_step<T,0> begin_step(int step,int nb){return iterator_step<T,0>(0,step,nb);}
03798     inline iterator_rev<T,0> begin_rev(){return iterator_rev<T,0>(this->loc_head.width*this->loc_head.height-1);}
03799     inline iterator_line<T,0> begin_line(){return iterator_line<T,0>(0,this->loc_head.width);}
03800     //-------------------------------------------------------------------------------
03801     //! Get the ending iterator of the DMatrix (on the last element of the DMatrix)
03802     /*!
03803       This method exists for each type of iterator in SkelGIS.
03804       \return the iterator of the last element
03805     */
03806     //-------------------------------------------------------------------------------
03807     inline iterator<T,0> end(){return iterator<T,0>(this->loc_head.width*this->loc_head.height-1);}
03808     inline iterator_cont<T,0> end_cont(){return iterator_cont<T,0>(this->loc_head.width*this->loc_head.height-1);}
03809     inline iterator_step<T,0> end_step(){return iterator_step<T,0>(this->loc_head.width*this->loc_head.height-1);}
03810     inline iterator_rev<T,0> end_rev(){return iterator_rev<T,0>(0);}
03811     inline iterator_line<T,0> end_line(){return iterator_line<T,0>(this->loc_head.width*this->loc_head.height-this->loc_head.width,this->loc_head.width);}
03812     //-------------------------------------------------------------------------------
03813     //! get the iterator on the matrix at position (x,y)
03814     /*!
03815       \param x is row index
03816       \pram y is column index
03817       \return returns the iterator at position (x,y)
03818     */
03819     iterator<T,0> getIterator(int col,int li)
03820     //-------------------------------------------------------------------------------
03821     {
03822       if(NULL!=this->data)
03823         {
03824           unsigned int rank = li*this->loc_head.width+col;
03825           return iterator<T,0>(rank,this->loc_head.width);
03826         }
03827     }
03828     //-------------------------------------------------------------------------------
03829     //! get the iterator contiguous on the matrix at position (x,y)
03830     /*!
03831       \param x is row index
03832       \pram y is column index
03833       \return returns the iterator at position (x,y)
03834     */
03835     iterator_cont<T,0> getIterator_cont(int col,int li)
03836     //-------------------------------------------------------------------------------
03837     {
03838       if(NULL!=this->data)
03839         {
03840           unsigned int rank = li*this->loc_head.width+col;
03841           return iterator_cont<T,0>(rank,this->loc_head.width);
03842         }
03843     }
03844     //-------------------------------------------------------------------------------
03845     //! get the iterator reverse on the matrix at position (x,y)
03846     /*!
03847       \param x is row index
03848       \pram y is column index
03849       \return returns the iterator at position (x,y)
03850     */
03851     iterator_rev<T,0> getIterator_rev(int col,int li)
03852     //-------------------------------------------------------------------------------
03853     {
03854       if(NULL!=this->data)
03855         {
03856           unsigned int rank = li*this->loc_head.width+col;
03857           return iterator_rev<T,0>(rank,this->loc_head.width);
03858         }
03859     }
03860     //-------------------------------------------------------------------------------
03861     //! get the indexes of the iterator on the matrix
03862     /*!
03863       \param it is the iterator from which the indexes will be taken
03864       \param x is the iterator position x returned
03865       \param y is the iterator position y returned
03866     */
03867     void getIndexes(iterator<T,0> it,int &col,int &li){indexesIt(it._rank,col,li);}
03868     void getIndexes(iterator_cont<T,0> it,int &col,int &li){indexesIt(it._rank,col,li);}
03869     void getIndexes(iterator_rev<T,0> it,int &col,int &li){indexesIt(it._rank,col,li);}
03870     //-------------------------------------------------------------------------------
03871     //! Print the matrix by bloc
03872     //-------------------------------------------------------------------------------
03873     void print()
03874     //-------------------------------------------------------------------------------
03875     {
03876       std::stringstream st;
03877       for(unsigned int i=0;i<this->loc_head.height;i++)
03878         {
03879           for(unsigned int j=0;j<this->loc_head.width;j++)
03880             {
03881               st<<this->data[this->loc_head.width + i*this->loc_head.width+j]<<" ";
03882             }
03883           st<<"\n";
03884         }
03885       Mpi_::print(st.str());
03886     }
03887     //-------------------------------------------------------------------------------
03888     //! Write the zone file concerned for the current MPI process
03889     /*!
03890       write the MPI prcess zone, and spread the data in the subMatrices OpenMP
03891       only one submatrice if OpenMP is disabled
03892       \param binFile is binary file to write in
03893     */
03894     //-------------------------------------------------------------------------------
03895     void write(char * binFile)
03896     //-------------------------------------------------------------------------------
03897     {
03898       MPI_File fh;
03899       MPI_Status status;
03900       //open the output file
03901       MPI_File_open(MPI_COMM_WORLD, binFile,
03902                 MPI_MODE_CREATE|MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);
03903    
03904       // process 0 writes the HEADER of the file
03905       if (Mpi_::mpi_rank==0)
03906         MPI_File_write_at(fh, 0,reinterpret_cast<char*>(&this->head) ,sizeof(HEADER) , MPI_CHAR, &status);
03907       //the begining of the bloc of write in
03908       unsigned int mvline;
03909       // number of bytes between 2 blocks
03910       unsigned int step;
03911 
03912       
03913       // the starting point of the row where the DMatrix is saved
03914       if (this->row<this->remainderh)
03915         mvline=sizeof(HEADER)+sizeof(T)*this->head.width*this->loc_head.height*this->row;
03916       else
03917         mvline=sizeof(HEADER)+sizeof(T)*this->head.width*(this->loc_head.height*this->row+this->remainderh);
03918       
03919       // now we must add the number of bytes corresponding to the column of the processor
03920       if (this->col<this->remainderw)
03921         mvline+=sizeof(T)*this->loc_head.width*this->col;
03922       else
03923         mvline+=sizeof(T)*(this->loc_head.width*this->col+this->remainderw);
03924 
03925       // pointer in the data of the dmatrix
03926       T *data_pointer=this->data;
03927 
03928       //write by lines
03929       for(unsigned int j=0;j<this->loc_head.height;j++)
03930         {
03931           MPI_File_write_at(fh, mvline,reinterpret_cast<char*>(data_pointer) ,this->loc_head.width*sizeof(T) , MPI_CHAR, &status);
03932           data_pointer+=this->loc_head.width;
03933           mvline+=this->head.width*sizeof(T);
03934         }
03935       MPI_File_close(&fh);
03936     }
03937     //-------------------------------------------------------------------------------
03938     
03939   };
03940   //-------------------------------------------------------------------------------
03941 
03942 }
03943 
03944 #endif
 All Classes Files Functions Variables Defines