psmile_global_search_cell_real.F90

Go to the documentation of this file.
00001 !-----------------------------------------------------------------------
00002 ! $COPYRIGHT_NLE-IT
00003 !-----------------------------------------------------------------------
00004 !BOP
00005 !
00006 ! !ROUTINE: PSMILe_global_search_cell_real
00007 !
00008 ! !INTERFACE:
00009 
00010       subroutine psmile_global_search_cell_real ( grid_id, var_id,  &
00011                   comp_info, send_info, search, extra_search, ncpl, &
00012                   nbr_cells, n_intmethods, interpolation_methods,   &
00013                   interpolation_search, ierror )
00014 !
00015 ! !USES:
00016 !
00017       use PRISM
00018 !
00019       use PSMILe, dummy_interface => PSMILe_global_search_cell_real
00020 
00021       implicit none
00022 !
00023 ! !INPUT PARAMETERS:
00024 !
00025       Integer, Intent(In)                  :: grid_id
00026 
00027 !     grid id of source grid
00028 
00029       Integer, Intent(In)                  :: var_id
00030 
00031 !     field id of source data
00032 
00033       Type (Enddef_comp), Intent (In)      :: comp_info
00034 !
00035 !     Info on the component in which the donor cells
00036 !     should be searched.
00037 
00038       Type (Send_information), Intent (InOut) :: send_info
00039 !
00040 !     Metadata for target locations
00041 !
00042       Type (Enddef_search), Intent (InOut) :: search
00043 !
00044 !     Info's on coordinates to be searched
00045 !
00046       Type (Extra_search_info), Intent (InOut) :: extra_search
00047 !
00048 !     Info's on global coordinates to be searched
00049 !
00050       Integer, Intent (In)                 :: ncpl
00051 !
00052 !     Number of target cells treated on this source process
00053 !
00054       Integer, Intent (InOut)              :: nbr_cells(ncpl)
00055 !
00056 !     Number of source cells per target cell
00057 !
00058       Integer, Intent (In)                 :: n_intmethods
00059 !
00060 !     Number of interpolations methods used to interpolate the data.
00061 !     Dimension of vectors "interpolation_methods" and "interpolation_search".
00062 
00063       Integer, Intent (In)                 :: interpolation_methods (n_intmethods)
00064 !
00065 !     Codes for the interpolation methods
00066 !
00067       Logical, Intent (In)                 :: interpolation_search  (n_intmethods)
00068 !
00069 !     Is global search required for corresponding interpolation method ?
00070 !
00071 ! !OUTPUT PARAMETERS:
00072 !
00073       Integer, Intent (Out)                :: ierror
00074 
00075 !     Returns the error code of PSMILe_global_search_cell_real;
00076 !             ierror = 0 : No error
00077 !             ierror > 0 : Severe error
00078 !
00079 ! !LOCAL PARAMETERS
00080 !
00081 !     ... for send to partners
00082 !
00083       Integer, Parameter                 :: n_extra = 7
00084       Integer, Parameter                 :: send_appl_index = 5
00085 
00086       Logical, Pointer                   :: lbuf (:)
00087       Real, Pointer                      :: buf (:)
00088 
00089       Type (real_vector), Pointer        :: real_buf(:)
00090       Type (integer_vector), Allocatable :: tgt_ibuf(:)
00091       Type (integer_vector), Allocatable :: src_ibuf(:)
00092       Type (logical_vector), Allocatable :: log_buf(:)
00093 
00094       Type (Corner_Block), Pointer       :: cp
00095       Type (Method),       Pointer       :: mp
00096 
00097       Integer, Pointer                   :: new_neighcells_3d(:,:,:)
00098       Integer, Pointer                   :: neigh_3d(:,:)
00099       Logical, Pointer                   :: mask_array(:)
00100       Integer                            :: mask_shape (2, ndim_3d)
00101 
00102       Integer, Allocatable               :: rextra_msg (:,:)
00103       Integer, Allocatable               :: nbr_tgt_cells(:)
00104       Integer, Allocatable               :: nbr_src_cells(:)
00105       Integer, Allocatable               :: rank_list(:)
00106       Integer, Allocatable               :: cell_trans(:,:)
00107       Integer, Allocatable               :: save_lreq(:)
00108       Integer, Allocatable               :: lrequest(:)
00109       Integer, Allocatable               :: tgt_list(:), tgt_cell(:), tgt_index(:)
00110 
00111       Logical                            :: src_mask_available
00112 
00113       Integer                            :: tgt_idx
00114       Integer                            :: new_nbr_cells(ncpl)
00115       Integer                            :: mask_id
00116       Integer                            :: method_id
00117       Integer                            :: nbr_corners
00118       Type (enddef_msg_locations)        :: msg_locations
00119       Integer                            :: msgloc (msgloc_size)
00120       Integer                            :: extra_msg (n_extra)
00121       Integer                            :: len
00122       Integer                            :: lenx, leny, lenxy, m_lenx
00123       Integer                            :: rank, idx1, idx2, idx3, idx4, stride
00124       Integer                            :: nbr_procs(2)
00125       Integer                            :: interp
00126       Integer                            :: dest
00127       Integer                            :: sender
00128       Integer                            :: status(MPI_STATUS_SIZE)
00129       Integer                            :: nbr_cells_tot
00130       Integer                            :: nreq
00131       Integer                            :: recv_req
00132       Integer                            :: nrecv, nsend
00133       Integer                            :: index
00134       Integer                            :: nc
00135       Integer                            :: l
00136       Integer                            :: m, mm
00137       Integer                            :: n, nn
00138 !
00139 !     ... for error handling
00140 !
00141       Integer, Parameter                 :: nerrp = 3
00142       Integer                            :: ierrp (nerrp)
00143 !
00144 !----------------------------------------------------------------------
00145 !
00146 ! !DESCRIPTION:
00147 !
00148 ! Subroutine "PSMILe_global_search_cell_real" collects source cells
00149 ! for target cells located on process boundaries of the source grid.
00150 ! Corresponding MPI calls are located in psmile_enddef_action_cell.
00151 !
00152 !   - exchange boundary cell info by sending back
00153 !     information to the source grid processes holding the
00154 !     respective target cells.
00155 !   - determine who is master source for the target cells
00156 !   - let the masters collect the data
00157 !   - store extra source cells on the master source processes
00158 !   - delete real entries on the non-master sides
00159 !
00160 ! !REVISION HISTORY:
00161 !
00162 !   Date      Programmer   Description
00163 ! ----------  ----------   -----------
00164 ! 13.08.08    R. Redler    created
00165 ! 08.11.06    R. Redler    revised
00166 !
00167 !EOP
00168 !----------------------------------------------------------------------
00169 !
00170 !  $Id: psmile_global_search_cell_real.F90 2887 2011-01-14 09:25:49Z redler $
00171 !  $Author: redler $
00172 !
00173    Character(len=len_cvs_string), save :: mycvs = 
00174        '$Id: psmile_global_search_cell_real.F90 2887 2011-01-14 09:25:49Z redler $'
00175 !
00176 !----------------------------------------------------------------------
00177 !  1.) Initialization
00178 !----------------------------------------------------------------------
00179 !
00180 #ifdef VERBOSE
00181       print 9990, trim(ch_id), grid_id, search%msg_intersections%first_tgt_method_id, search%sender
00182 
00183       call psmile_flushstd
00184 #endif /* VERBOSE */
00185 !
00186       ierror = 0
00187 
00188       method_id = Fields(var_id)%method_id
00189       mask_id = Fields(var_id)%mask_id
00190 
00191       cp => Grids(grid_id)%corner_pointer
00192       mp => Methods(method_id)
00193       nreq = paction%nreq
00194 
00195       src_mask_available = mask_id /= PRISM_UNDEFINED
00196 
00197       if (src_mask_available) then
00198 #ifdef DEBUG
00199         print *, ' Source mask is available.'
00200 #endif  /* DEBUG */
00201          mask_array => Masks(mask_id)%mask_array
00202          mask_shape =  Masks(mask_id)%mask_shape
00203       endif
00204 
00205       new_nbr_cells = nbr_cells
00206       send_info%nloc = ncpl
00207       nbr_cells_tot  = sum(nbr_cells)
00208 
00209       if ( Grids(grid_id)%grid_type == PRISM_Irrlonlat_regvrt ) then
00210 
00211          nbr_corners = cp%nbr_corners/2
00212 
00213       else if ( Grids(grid_id)%grid_type == PRISM_Reglonlatvrt ) then
00214 
00215          nbr_corners = 2
00216 
00217       else if ( Grids(grid_id)%grid_type == PRISM_Gaussreduced_regvrt ) then
00218 
00219          nbr_corners = 2
00220 
00221       else
00222 
00223          ierrp (1) = search%grid_type
00224          ierror = PRISM_Error_Internal
00225          call psmile_error ( ierror, 'unsupported search grid type', &
00226               ierrp, 1, __FILE__, __LINE__ )
00227          return
00228 
00229       endif
00230 
00231       Allocate(save_lreq(nreq), stat=ierror)
00232          if (ierror /= 0) then
00233             ierrp (1) = nreq
00234             ierror = PRISM_Error_Alloc
00235             call psmile_error ( ierror, 'save_lreq', ierrp, 1, &
00236                  __FILE__, __LINE__ )
00237             return
00238          endif
00239 
00240 #ifdef PRISM_ASSERTION
00241       if (interpolation_methods(1) /= PSMILe_conserv2D ) then
00242          call psmile_assert (__FILE__, __LINE__, &
00243               "No global search for this interpolation method")
00244       endif
00245       if ( n_intmethods > 1 ) then
00246          call psmile_assert (__FILE__, __LINE__, &
00247               "2nd (vertical) interpolation method not yet supported.")
00248       endif
00249 #endif /* PRISM_ASSERTION */
00250       if (count(interpolation_search(:)) > 1) then
00251          ierrp (1) = count(interpolation_search(:))
00252 
00253          ierror = PRISM_Error_Internal
00254 
00255          call psmile_error ( ierror, 'Global search supported only for one interpolation method', &
00256               ierrp, 1, __FILE__, __LINE__ )
00257 
00258          return
00259       endif
00260 
00261       do interp = 1, n_intmethods
00262          if (interpolation_search(interp) ) exit
00263       end do
00264 
00265       if (interp > n_intmethods) then
00266          ierror = PRISM_Error_Internal
00267          ierrp (1) = n_intmethods
00268 
00269          call psmile_error ( ierror, 'No Global search for interpolation methods', &
00270               ierrp, 1, __FILE__, __LINE__ )
00271          return
00272       endif
00273 
00274       len = 0
00275 
00276       if ( associated(search%boundary_cell) ) then
00277 
00278          len = size(search%boundary_cell(:,1))
00279 
00280          Allocate(cell_trans(len,3), stat=ierror)
00281 
00282          if (ierror /= 0) then
00283             ierrp (1) = 3*len
00284             ierror = PRISM_Error_Alloc
00285             call psmile_error ( ierror, 'cell_trans', ierrp, 1, &
00286                  __FILE__, __LINE__ )
00287             return
00288          endif
00289 
00290       endif
00291 !
00292 !-------------------------------------------------------------------------
00293 ! 2.) Exchange data between source processes
00294 !-------------------------------------------------------------------------
00295 !
00296 !     Send back information about target cells located on source boundary
00297 !     to target process
00298 !
00299 !===> ... Send request to destination process
00300 !         msgloc is received by routine
00301 !         PSMILe_Enddef_action.
00302 !
00303 !
00304       call psmile_init_enddef_msg_locs (msg_locations)
00305 
00306       msg_locations%requires_conserv_remap = interpolation_methods (interp)
00307       msg_locations%src_rank               = psmile_rank
00308       msg_locations%msg_len                = len
00309       msg_locations%transi_in_id           = search%msg_intersections%transient_in_id
00310       msg_locations%transi_out_id          = search%msg_intersections%transient_out_id
00311 
00312       call psmile_pack_msg_locations (msg_locations, msgloc)
00313 
00314       dest = search%sender
00315 
00316 #ifdef DEBUG
00317       print *, ' Bsend tag ', loctag+search%msg_intersections%relative_msg_tag, ' to ', dest
00318 #endif /* DEBUG */
00319       call psmile_bsend ( msgloc, msgloc_size, MPI_INTEGER, dest,           &
00320                           loctag+search%msg_intersections%relative_msg_tag, &
00321                           comm_psmile, ierror )
00322 
00323       if ( ierror /= MPI_SUCCESS ) then
00324          ierrp (1) = ierror
00325          ierrp (2) = dest
00326          ierrp (3) = loctag+search%msg_intersections%relative_msg_tag
00327          ierror = PRISM_Error_Send
00328          call psmile_error ( ierror, 'psmile_bsend(msg)', &
00329                              ierrp, 3, __FILE__, __LINE__ )
00330          return
00331       endif
00332       !
00333       ! Transfer indices and information about those target cells
00334       !  which touch the boundary of this source domain, received
00335       !  by routine PSMILe_Enddef_action_cell.
00336       !
00337       if ( len > 0 ) &
00338       call psmile_bsend ( search%boundary_cell, 5*len, MPI_INTEGER, &
00339                           dest, celltag, comm_psmile, ierror )
00340 
00341       if ( ierror /= MPI_SUCCESS ) then
00342          ierrp (1) = ierror
00343          ierrp (2) = dest
00344          ierrp (3) = celltag
00345          ierror = PRISM_Error_Send
00346          call psmile_error ( ierror, 'psmile_bsend(msg)', &
00347                              ierrp, 3, __FILE__, __LINE__ )
00348          return
00349       endif
00350       !
00351       ! Receive the number of source processes to which the source
00352       ! has to transfer data, sent by routine PSMILe_Enddef_action_cell.
00353       !
00354       call MPI_Irecv ( nbr_procs, 2, MPI_INTEGER, &
00355                        dest, celltag+1, comm_psmile, recv_req, ierror )
00356 
00357       if ( ierror /= MPI_SUCCESS ) then
00358          ierrp (1) = ierror
00359          ierrp (2) = dest
00360          ierrp (3) = celltag+1
00361          ierror = PRISM_Error_Recv
00362          call psmile_error ( ierror, 'MPI_Recv', &
00363                              ierrp, 3, __FILE__, __LINE__ )
00364          return
00365       endif
00366 
00367 #ifdef DEBUG
00368       print *, ' Posted Irecv request ', recv_req, ' from ', dest
00369 #endif /* DEBUG */
00370 
00371       save_lreq = paction%lrequest
00372       !
00373       ! Give absolute priority to cell requests by temporarily deactivating
00374       ! all other pending request.
00375       !
00376       ! TODO: For the sake of performance it has to be checked whether there
00377       !       are any requests which can savely be handled in between without
00378       !       running into a deadlock.
00379       !
00380       paction%lrequest = MPI_REQUEST_NULL
00381 
00382       paction%lrequest (3) = recv_req
00383 
00384 !     Are there any requests that we should better deactivate temporaily?
00385 !
00386 !     paction%lrequest (?:?) = MPI_REQUEST_NULL
00387 !
00388       index = 0
00389 
00390       do while ( index /= 3)
00391 
00392 #ifdef DEBUG
00393          do n = 1, nreq
00394            print *, ' paction%lrequest ', n, paction%lrequest(n)
00395          enddo
00396 #endif /* DEBUG */
00397 
00398          call MPI_Waitany ( nreq, paction%lrequest, index, status, ierror )
00399 
00400 #ifdef DEBUG
00401          print *, ' Processing with index ', index, ' from ', status(MPI_SOURCE)
00402 #endif /* DEBUG */
00403 
00404          if ( index /= 3 ) then
00405 
00406             call psmile_enddef_action (search, index, status, ierror )
00407 
00408          endif
00409 
00410       enddo
00411 
00412       paction%lrequest(3) = MPI_REQUEST_NULL
00413 
00414       nsend = nbr_procs(1)
00415       nrecv = nbr_procs(2)
00416 
00417 #ifdef DEBUG
00418       print *, ' This rank has to serve        ', nsend, ' source grid processes.'
00419       print *, ' This rank has to receive from ', nrecv, ' source grid processes.'
00420 #endif /* DEBUG */
00421 
00422       if ( max(nsend,nrecv) > 0 ) then
00423          Allocate ( tgt_ibuf (max(nsend,nrecv)), stat=ierror)
00424          if (ierror /= 0) then
00425             ierrp (1) = max(nsend,nrecv)
00426             ierror = PRISM_Error_Alloc
00427             call psmile_error ( ierror, 'tgt_ibuf', ierrp, 1, &
00428                  __FILE__, __LINE__ )
00429             return
00430          endif
00431       endif
00432 
00433       if ( nsend > 0 ) then
00434          Allocate(real_buf      (nsend), &
00435                   log_buf       (nsend), &
00436                   src_ibuf      (nsend), &
00437                   nbr_tgt_cells (nsend), &
00438                   nbr_src_cells (nsend), &
00439                   rank_list     (nsend), stat=ierror)
00440          if (ierror /= 0) then
00441             ierrp (1) = 6*nsend
00442             ierror = PRISM_Error_Alloc
00443             call psmile_error ( ierror, 'nsend buffer', ierrp, 1, &
00444                  __FILE__, __LINE__ )
00445             return
00446          endif
00447 
00448          nbr_tgt_cells = 0
00449          nbr_src_cells = 0
00450          rank_list = -1
00451       endif
00452 
00453       !
00454       ! Receive the list of target indices and related information from
00455       ! target routine PSMILe_Enddef_action_cell for which source cells
00456       ! have to be transferred to other source processes.
00457       ! cell_trans target cell index on the sender source contains a
00458       ! sorted list with increasing order ranks stored in
00459       ! cell_trans(:,2)
00460       !
00461       ! cell_trans(:,1) : target cell index on the sender source
00462       ! cell_trans(:,2) : rank of receiver source process
00463       ! cell_trans(:,3) : target cell index on the receiver source
00464       !
00465 
00466 #ifdef DEBUGX
00467       print *, ' Receiving data ', len, ' records from ', dest
00468       if ( len == 0 ) &
00469       print *, ' Receive canceled because of zero message '
00470 #endif /* DEBUGX */
00471       if ( len > 0 ) &
00472       call MPI_Recv ( cell_trans, 3*len, MPI_INTEGER, &
00473                       dest, celltag+2, comm_psmile, status, ierror )
00474 
00475       if ( ierror /= MPI_SUCCESS ) then
00476          ierrp (1) = ierror
00477          ierrp (2) = dest
00478          ierrp (3) = celltag+2
00479          ierror = PRISM_Error_Recv
00480          call psmile_error ( ierror, 'MPI_Recv', &
00481                              ierrp, 3, __FILE__, __LINE__ )
00482          return
00483       endif
00484       !
00485       ! Determine number of data to be transferred
00486       !   to each destination source
00487       !
00488       if ( nsend > 0 ) then
00489 
00490 #ifdef DEBUGX
00491          do n = 1, len
00492 !            if ( cell_trans(n,1) > -1 ) then
00493                print 9970, trim(ch_id), n, cell_trans(n,1), &
00494                     ' -> ', cell_trans(n,2), cell_trans(n,3)
00495 !            endif
00496          enddo
00497 9970     format (1x, a, ': ', i6, ' Transfer index ', i6, a4, i3, i6)
00498 #endif /* DEBUGX */
00499 
00500          nn = 1
00501          m  = 0
00502 
00503          rank_list(nn) = cell_trans(1,2)
00504 
00505          do n = 1, len
00506             if ( n > 1 ) then
00507                if ( cell_trans(n,2) > cell_trans(n-1,2) ) then
00508                   nn = nn + 1
00509                   rank_list(nn) = cell_trans(n,2)
00510                endif
00511             endif
00512             if ( cell_trans(n,2) == -1 ) exit
00513 
00514             tgt_idx = cell_trans(n,1) ! search%boundary_cell(n,1) ! local index of target cell
00515             nbr_tgt_cells(nn) = nbr_tgt_cells(nn) + 1
00516             nbr_src_cells(nn) = nbr_src_cells(nn) + nbr_cells(tgt_idx)
00517          enddo
00518 #ifdef DEBUG
00519          do n = 1, nsend
00520             print *, nbr_tgt_cells(n), ' target cell info to transfer.'
00521             print *, nbr_src_cells(n), ' source cells have to be transferred to rank ', rank_list(n)
00522          enddo
00523 #endif /* DEBUG */
00524 
00525 !
00526 !-------------------------------------------------------------------------
00527 ! 3.) Send data to the designated child source processes
00528 !-------------------------------------------------------------------------
00529 !
00530 ! TODO: Eliminate multiple entries from cell_trans array and create extra index.
00531 !
00532 !
00533          do n = 1, nsend
00534             Allocate ( tgt_ibuf(n)%vector(nbr_tgt_cells(n)*2), &
00535                        src_ibuf(n)%vector(nbr_src_cells(n)*3), &
00536                        log_buf(n)%vector(nbr_src_cells(n)),    &
00537                        real_buf(n)%vector(nbr_src_cells(n)*2*nbr_corners), stat=ierror )
00538             if (ierror /= 0) then
00539                ierrp (1) = 2*nbr_tgt_cells(n) + (2*nbr_corners+4)*nbr_src_cells(n)
00540                ierror = PRISM_Error_Alloc
00541                call psmile_error ( ierror, 'src/tgt_buf(n)%ibuf/real_buf ', &
00542                   ierrp, 1, __FILE__, __LINE__ )
00543                return
00544             endif
00545          enddo
00546          !
00547          ! Fill in target index information
00548          !
00549          nn = 1
00550          m  = 0
00551 
00552          do n = 1, len
00553 
00554             if ( n > 1 ) then
00555                if ( cell_trans(n,2) > cell_trans(n-1,2) ) then
00556                   nn = nn + 1
00557                   m = 0
00558                endif
00559             endif
00560 
00561             if ( cell_trans(n,2) == -1 ) exit
00562 
00563             tgt_idx = cell_trans(n,1) ! search%boundary_cell(n,1) ! local index of target cell
00564             m = m+1
00565 
00566             tgt_ibuf(nn)%vector(2*m-1) = cell_trans(n,3)
00567             tgt_ibuf(nn)%vector(2*m  ) = nbr_cells(tgt_idx)
00568 
00569          enddo
00570 
00571          !
00572          ! Distribute information to the different send buffers
00573          !
00574 
00575          select case (Grids(grid_id)%grid_type)
00576 
00577          case (PRISM_Irrlonlat_regvrt)
00578 
00579             !
00580             ! Irrlonlat_regvrt source grids
00581             !
00582 
00583             lenx  = (cp%corner_shape(2,1)-cp%corner_shape(1,1)+1)
00584             lenxy = (cp%corner_shape(2,2)-cp%corner_shape(1,2)+1) * lenx
00585 
00586             if (src_mask_available) then
00587                m_lenx = (mask_shape(2,1)-mask_shape(1,1)+1)
00588 #ifdef PRISM_ASSERTION
00589                if ( cp%corner_shape(1,2) /= mask_shape(1,2) .or. &
00590                     cp%corner_shape(1,1) /= mask_shape(1,1) ) then
00591                   call psmile_assert (__FILE__, __LINE__, &
00592                        "Mask and corners have to be of the same shape.")
00593                endif
00594                if ( mask_shape(2,3) - mask_shape(1,3) + 1 > 1 ) then
00595                   call psmile_assert (__FILE__, __LINE__, &
00596                        "3d masks not yet supported.")
00597                endif
00598 #endif /* PRISM_ASSERTION */
00599             endif
00600 
00601             l  = 0
00602             rank = 1
00603 
00604             do n = 1, len
00605 
00606                if ( n > 1 ) then
00607                   if ( cell_trans(n,2) > cell_trans(n-1,2) ) then
00608                      rank = rank + 1
00609                      l    = 0
00610                   endif
00611                endif
00612                if ( cell_trans(n,2) == -1 ) exit
00613 
00614                stride = nbr_corners * nbr_src_cells(rank)
00615 
00616                tgt_idx = cell_trans(n,1) ! search%boundary_cell(n,1)
00617 #ifdef DEBUGX
00618                print *, ' Corners for local target index ', tgt_idx
00619 #endif /* DEBUGX */
00620                nn = sum(nbr_cells(1:tgt_idx-1))
00621 
00622                do mm = 1, nbr_cells(tgt_idx)
00623 
00624                   idx3 = (neighcells_3d(2,nn+mm,1)-cp%corner_shape(1,2))*lenx + &
00625                           neighcells_3d(1,nn+mm,1)-cp%corner_shape(1,1)+1
00626 
00627                   src_ibuf(rank)%vector(3*(l+mm)-2) = neighcells_3d(1,nn+mm,1)
00628                   src_ibuf(rank)%vector(3*(l+mm)-1) = neighcells_3d(2,nn+mm,1)
00629                   src_ibuf(rank)%vector(3*(l+mm)  ) = neighcells_3d(3,nn+mm,1)
00630 
00631                   do nc = 1, nbr_corners
00632                      idx1 = (nc-1)*nbr_src_cells(rank)+l+mm
00633                      idx2 = stride + idx1
00634                      real_buf(rank)%vector(idx1) = cp%corners_real(1)%vector((nc-1)*lenxy+idx3)
00635                      real_buf(rank)%vector(idx2) = cp%corners_real(2)%vector((nc-1)*lenxy+idx3)
00636 #ifdef DEBUGX
00637                      print *, ' send corners ', l+mm, real_buf(rank)%vector(idx1), &
00638                                                       real_buf(rank)%vector(idx2)
00639                   end do
00640 
00641                   print *, ' 2d indices   ', neighcells_3d(1,nn+mm,1), &
00642                                              neighcells_3d(2,nn+mm,1)
00643 #else
00644                   end do
00645 #endif /* DEBUGX */
00646                enddo ! mm
00647 
00648                if (src_mask_available) then
00649 
00650                   do mm = 1, nbr_cells(tgt_idx)
00651                      idx3 = (neighcells_3d(2,nn+mm,1)-mask_shape(1,2))*m_lenx + &
00652                              neighcells_3d(1,nn+mm,1)-mask_shape(1,1)+1
00653                      log_buf(rank)%vector(l+mm) = mask_array(idx3)
00654                   enddo
00655 #ifdef DEBUGX
00656                   print *, ' send mask is ', log_buf(rank)%vector(l+1:l+nbr_cells(tgt_idx))
00657 #endif
00658                endif
00659 
00660                ! Erase target cell entries and related info which
00661                ! have been transfered to a remote source process
00662 #ifdef DEBUGX
00663                print *, ' Erase target cell index ', tgt_idx
00664                print *
00665 #endif /* DEBUGX */
00666                new_nbr_cells(tgt_idx) = 0
00667                send_info%nloc = send_info%nloc - 1
00668 
00669                ! Increment indices
00670 
00671                l = l + nbr_cells(tgt_idx)
00672 
00673             enddo ! len
00674 
00675          case (PRISM_Reglonlatvrt)
00676 
00677             !
00678             ! Reglonlatvrt source grids
00679             !
00680 
00681             lenx = (cp%corner_shape(2,1)-cp%corner_shape(1,1)+1)
00682             leny = (cp%corner_shape(2,2)-cp%corner_shape(1,2)+1)
00683 
00684             if (src_mask_available) then
00685                m_lenx = (mask_shape(2,1)-mask_shape(1,1)+1)
00686 #ifdef PRISM_ASSERTION
00687                if ( cp%corner_shape(1,2) /= mask_shape(1,2) .or. &
00688                     cp%corner_shape(1,1) /= mask_shape(1,1) ) then
00689                   call psmile_assert (__FILE__, __LINE__, &
00690                        "Mask and corners have to be of the same shape.")
00691                endif
00692                if ( mask_shape(2,3) - mask_shape(1,3) + 1 > 1 ) then
00693                   call psmile_assert (__FILE__, __LINE__, &
00694                        "3d masks not yet supported.")
00695                endif
00696 #endif /* PRISM_ASSERTION */
00697             endif
00698 
00699             l  = 0
00700             rank = 1
00701 
00702             do n = 1, len
00703 
00704                if ( n > 1 ) then
00705                   if ( cell_trans(n,2) > cell_trans(n-1,2) ) then
00706                      rank = rank + 1
00707                      l    = 0
00708                   endif
00709                endif
00710                if ( cell_trans(n,2) == -1 ) exit
00711 
00712                stride = nbr_corners * nbr_src_cells(rank)
00713 
00714                tgt_idx = cell_trans(n,1) ! search%boundary_cell(n,1)
00715 #ifdef DEBUGX
00716                print *, ' Corners for local target index ', tgt_idx
00717 #endif /* DEBUGX */
00718                nn = sum(nbr_cells(1:tgt_idx-1))
00719 
00720                do mm = 1, nbr_cells(tgt_idx)
00721 
00722                   src_ibuf(rank)%vector(3*(l+mm)-2) = neighcells_3d(1,nn+mm,1)
00723                   src_ibuf(rank)%vector(3*(l+mm)-1) = neighcells_3d(2,nn+mm,1)
00724                   src_ibuf(rank)%vector(3*(l+mm)  ) = neighcells_3d(3,nn+mm,1)
00725 
00726                   idx3 = neighcells_3d(1,nn+mm,1)-cp%corner_shape(1,1)+1
00727                   idx4 = neighcells_3d(2,nn+mm,1)-cp%corner_shape(1,2)+1
00728 
00729                   do nc = 1, nbr_corners
00730                      idx1 = (nc-1)*nbr_src_cells(rank)+l+mm
00731                      idx2 = stride + idx1
00732                      real_buf(rank)%vector(idx1) = cp%corners_real(1)%vector((nc-1)*lenx+idx3)
00733                      real_buf(rank)%vector(idx2) = cp%corners_real(2)%vector((nc-1)*leny+idx4)
00734 #ifdef DEBUGX
00735                      print *, ' send corners ', l+mm, real_buf(rank)%vector(idx1), &
00736                                                       real_buf(rank)%vector(idx2)
00737                   end do
00738 
00739                   print *, ' 2d indices   ', neighcells_3d(1,nn+mm,1), &
00740                                              neighcells_3d(2,nn+mm,1)
00741 #else
00742                   end do
00743 #endif /* DEBUGX */
00744                enddo ! mm
00745 
00746                if (src_mask_available) then
00747 
00748                   do mm = 1, nbr_cells(tgt_idx)
00749                      idx3 = (neighcells_3d(2,nn+mm,1)-mask_shape(1,2))*m_lenx + &
00750                              neighcells_3d(1,nn+mm,1)-mask_shape(1,1)+1
00751                      log_buf(rank)%vector(l+mm) = mask_array(idx3)
00752                   enddo
00753 #ifdef DEBUGX
00754                   print *, ' send mask is ', log_buf(rank)%vector(l+1:l+nbr_cells(tgt_idx))
00755 #endif
00756                endif
00757 
00758                ! Erase target cell entries and related info which
00759                ! have been transfered to a remote source process
00760 #ifdef DEBUGX
00761                print *, ' Erase target cell index ', tgt_idx
00762                print *
00763 #endif /* DEBUGX */
00764                new_nbr_cells(tgt_idx) = 0
00765                send_info%nloc = send_info%nloc - 1
00766 
00767                ! Increment indices
00768 
00769                l = l + nbr_cells(tgt_idx)
00770 
00771             enddo ! len
00772 
00773          case (PRISM_Gaussreduced_regvrt)
00774 
00775             !
00776             ! Gauss-reduced source grids
00777             !
00778 
00779             lenx = (cp%corner_shape(2,1)-cp%corner_shape(1,1)+1)
00780 
00781             if (src_mask_available) then
00782                m_lenx = (mask_shape(2,1)-mask_shape(1,1)+1)
00783 #ifdef PRISM_ASSERTION
00784                if ( cp%corner_shape(1,2) /= mask_shape(1,2) .or. &
00785                     cp%corner_shape(1,1) /= mask_shape(1,1) ) then
00786                   call psmile_assert (__FILE__, __LINE__, &
00787                        "Mask and corners have to be of the same shape.")
00788                endif
00789                if ( mask_shape(2,3) - mask_shape(1,3) + 1 > 1 ) then
00790                   call psmile_assert (__FILE__, __LINE__, &
00791                        "3d masks not yet supported.")
00792                endif
00793 #endif /* PRISM_ASSERTION */
00794             endif
00795 
00796             l  = 0
00797             rank = 1
00798 
00799             do n = 1, len
00800 
00801                if ( n > 1 ) then
00802                   if ( cell_trans(n,2) > cell_trans(n-1,2) ) then
00803                      rank = rank + 1
00804                      l    = 0
00805                   endif
00806                endif
00807                if ( cell_trans(n,2) == -1 ) exit
00808 
00809                stride = nbr_corners * nbr_src_cells(rank)
00810 
00811                tgt_idx = cell_trans(n,1) ! search%boundary_cell(n,1)
00812 #ifdef DEBUGX
00813                print *, ' Corners for local target index ', tgt_idx
00814 #endif /* DEBUGX */
00815                nn = sum(nbr_cells(1:tgt_idx-1))
00816 
00817                do mm = 1, nbr_cells(tgt_idx)
00818 
00819                   src_ibuf(rank)%vector(3*(l+mm)-2) = neighcells_3d(1,nn+mm,1)
00820                   src_ibuf(rank)%vector(3*(l+mm)-1) = neighcells_3d(2,nn+mm,1)
00821                   src_ibuf(rank)%vector(3*(l+mm)  ) = neighcells_3d(3,nn+mm,1)
00822 
00823                   idx3 = neighcells_3d(1,nn+mm,1)-cp%corner_shape(1,1)+1
00824                   idx4 = neighcells_3d(2,nn+mm,1)-cp%corner_shape(1,2)+1
00825 
00826                   do nc = 1, nbr_corners
00827                      idx1 = (nc-1)*nbr_src_cells(rank)+l+mm
00828                      idx2 = stride + idx1
00829                      real_buf(rank)%vector(idx1) = cp%corners_real(1)%vector((nc-1)*lenx+idx3)
00830                      real_buf(rank)%vector(idx2) = cp%corners_real(2)%vector((nc-1)*lenx+idx4)
00831 #ifdef DEBUGX
00832                      print *, ' send corners ', l+mm, real_buf(rank)%vector(idx1), &
00833                                                       real_buf(rank)%vector(idx2)
00834                   end do
00835 
00836                   print *, ' 2d indices   ', neighcells_3d(1,nn+mm,1), &
00837                                              neighcells_3d(1,nn+mm,1)
00838 #else
00839                   end do
00840 #endif /* DEBUGX */
00841                enddo ! mm
00842 
00843                if (src_mask_available) then
00844 
00845                   do mm = 1, nbr_cells(tgt_idx)
00846                      idx3 = neighcells_3d(1,nn+mm,1)-mask_shape(1,1)+1
00847                      log_buf(rank)%vector(l+mm) = mask_array(idx3)
00848                   enddo
00849 #ifdef DEBUGX
00850                   print *, ' send mask is ', log_buf(rank)%vector(l+1:l+nbr_cells(tgt_idx))
00851 #endif
00852                endif
00853 
00854                ! Erase target cell entries and related info which
00855                ! have been transfered to a remote source process
00856 #ifdef DEBUGX
00857                print *, ' Erase target cell index ', tgt_idx
00858                print *
00859 #endif /* DEBUGX */
00860                new_nbr_cells(tgt_idx) = 0
00861                send_info%nloc = send_info%nloc - 1
00862 
00863                ! Increment indices
00864 
00865                l = l + nbr_cells(tgt_idx)
00866 
00867             enddo ! len
00868 
00869          case DEFAULT
00870 
00871             ierrp (1) = search%grid_type
00872             ierror = PRISM_Error_Internal
00873             call psmile_error ( ierror, 'unsupported search grid type', &
00874                  ierrp, 1, __FILE__, __LINE__ )
00875             return
00876 
00877          end select
00878 
00879          nbr_cells_tot = sum(new_nbr_cells)
00880 
00881       endif ! nsend > 0
00882       !
00883       ! Reset requests and deallocate memory
00884       !
00885 !rr   paction%lrequest(2:3) = save_lreq(2:3)
00886 
00887       paction%lrequest = save_lreq
00888 
00889       if ( nsend > nreq ) then
00890          Deallocate (save_lreq, stat=ierror)
00891          if (ierror /= 0) then
00892             ierrp (1) = nreq
00893             ierror = PRISM_Error_Dealloc
00894             call psmile_error ( ierror, 'save_lreq', ierrp, 1, &
00895                  __FILE__, __LINE__ )
00896             return
00897          endif
00898       endif
00899       !
00900       ! Send data away to the designated parent source processes
00901       !
00902       do n = 1, nsend
00903          !
00904          ! Prepare header message to be send to destination source process.
00905          !
00906          ! (1) = interpolation type
00907          ! (2) = data type
00908          ! (3) = number of target cells to be treated
00909          ! (4) = number of source cell corners
00910          ! (5) = number of source cells
00911          !
00912          extra_msg(1) = interpolation_methods(1)
00913          extra_msg(2) = PRISM_REAL
00914          extra_msg(3) = nbr_tgt_cells(n)
00915          extra_msg(4) = nbr_corners
00916          extra_msg(5) = nbr_src_cells(n)
00917          extra_msg(6) = mask_id
00918          !
00919          ! This should be sufficient for the later operations during the prism_put phase
00920          ! -> see psmile_put_field_real and psmile_extract_indices_3d_real
00921          !
00922          call psmile_get_info_index (method_id, send_appl_index, idx1, ierror)
00923          if (ierror > 0) return
00924 
00925          extra_msg(7) = mp%send_infos_appl(idx1)%msg_id
00926 
00927          mp%send_infos_appl(idx1)%dest   = rank_list(n)
00928          mp%send_infos_appl(idx1)%nloc   = nbr_src_cells(n)
00929 
00930          allocate(neigh_3d(3,nbr_src_cells(n)), stat=ierror)
00931          if (ierror /= 0) then
00932             ierrp (1) = 3*nbr_src_cells(n)
00933             ierror = PRISM_Error_Alloc
00934             call psmile_error ( ierror, 'neigh_3d', ierrp, 1, &
00935                  __FILE__, __LINE__ )
00936             return
00937          endif
00938 
00939          do m = 1, nbr_src_cells(n)
00940             neigh_3d(1,m) = src_ibuf(n)%vector(3*m-2)
00941             neigh_3d(2,m) = src_ibuf(n)%vector(3*m-1)
00942             neigh_3d(3,m) = src_ibuf(n)%vector(3*m  )
00943          enddo
00944 
00945          mp%send_infos_appl(idx1)%neigh_3d => neigh_3d
00946 
00947          call psmile_store_send_info (var_id,                   &
00948                      search%msg_intersections%transient_out_id, &
00949                      PRISM_UNDEFINED, PRISM_UNDEFINED, idx1, ierror)
00950          !
00951          ! Send header to destination sources
00952          !
00953 #ifdef DEBUG
00954          print *, ' Posting send request to psmile rank ', rank_list(n)
00955 #endif /* DEBUG */
00956          call psmile_bsend ( extra_msg, n_extra, MPI_INTEGER, &
00957               rank_list(n), celltag+3, comm_psmile, ierror )
00958 
00959         if (ierror /= MPI_SUCCESS) then
00960             ierrp (1) = ierror
00961             ierrp (2) = rank_list(n)
00962             ierrp (3) = celltag+3
00963 
00964             ierror = PRISM_Error_Send
00965 
00966             call psmile_error (ierror, 'psmile_bsend(msg)', &
00967                                ierrp, 3, __FILE__, __LINE__ )
00968             return
00969          endif
00970 
00971 #ifdef DEBUGX
00972          do m = 1, nbr_tgt_cells(n)
00973             print '(a16,i6,a13,i4)', ' send tgt index ',  tgt_ibuf(n)%vector(2*m-1), &
00974                                      ' # src cells ',     tgt_ibuf(n)%vector(2*m  )
00975          enddo
00976 #endif /* DEBUGX */
00977          call psmile_bsend ( tgt_ibuf(n)%vector, 2*nbr_tgt_cells(n), &
00978               MPI_INTEGER, rank_list(n), celltag+4, comm_psmile, ierror )
00979 
00980          call psmile_bsend ( real_buf(n)%vector, 2*nbr_corners*nbr_src_cells(n), &
00981               MPI_REAL, rank_list(n), celltag+6, comm_psmile, ierror )
00982 
00983          if ( src_mask_available ) &
00984          call psmile_bsend ( log_buf(n)%vector, nbr_src_cells(n), &
00985               MPI_LOGICAL, rank_list(n), celltag+7, comm_psmile, ierror )
00986          !
00987          ! Free send memory
00988          !
00989          Deallocate ( tgt_ibuf(n)%vector, src_ibuf(n)%vector, real_buf(n)%vector, stat=ierror)
00990          if (ierror /= 0) then
00991             ierrp (1) = 2*nbr_tgt_cells(n)+(2*nbr_corners+1)*nbr_src_cells(n)
00992             ierror = PRISM_Error_Dealloc
00993             call psmile_error ( ierror, 'src/tgt buffer', ierrp, 1, &
00994                  __FILE__, __LINE__ )
00995             return
00996          endif
00997 
00998       enddo ! n = 1, nsend
00999 
01000       if ( nsend > 0 ) then
01001          Deallocate ( real_buf, src_ibuf, nbr_tgt_cells, nbr_src_cells, rank_list, stat=ierror)
01002          if (ierror /= 0) then
01003             ierrp (1) = 5*nsend
01004             ierror = PRISM_Error_Dealloc
01005             call psmile_error ( ierror, 'src_ibuf', ierrp, 1, &
01006                  __FILE__, __LINE__ )
01007             return
01008          endif
01009       endif
01010 !
01011 !-------------------------------------------------------------------------
01012 ! 4.) Receive data from the designated child source processes
01013 !-------------------------------------------------------------------------
01014 !
01015       extra_search%nrecv = 0
01016 
01017       if ( nrecv > 0 ) then
01018 
01019          extra_search%nrecv = nrecv
01020 
01021          Allocate (rextra_msg(n_extra,nrecv),      &
01022                    extra_search%dble_bufs(nrecv),  &
01023                    extra_search%log_bufs(nrecv),   &
01024                    lrequest(nrecv),                &
01025                    send_info%len_sent(nrecv),      &
01026                    send_info%sender_global(nrecv), &
01027                    send_info%msg_id(nrecv),        &
01028                    stat=ierror)
01029          if (ierror /= 0) then
01030             ierrp (1) = (n_extra+4)*nrecv
01031             ierror = PRISM_Error_Alloc
01032             call psmile_error ( ierror, 'rextra_msg, lrequest, ... ', ierrp, 1, &
01033                  __FILE__, __LINE__ )
01034             return
01035          endif
01036 
01037          lrequest = MPI_REQUEST_NULL
01038 
01039          do n = 1, nrecv
01040 
01041             call MPI_Irecv ( rextra_msg(:,n), n_extra, MPI_INTEGER, &
01042                  MPI_ANY_SOURCE, celltag+3, comm_psmile, lrequest(n), ierror )
01043 
01044             if ( ierror /= MPI_SUCCESS ) then
01045                ierrp (1) = ierror
01046                ierrp (2) = MPI_ANY_SOURCE
01047                ierrp (3) = celltag+3
01048 
01049                ierror = PRISM_Error_MPI
01050 
01051                call psmile_error ( ierror, 'MPI_Irecv', &
01052                     ierrp, 3, __FILE__, __LINE__ )
01053                return
01054             endif
01055 
01056          enddo
01057 
01058          do n = 1, nrecv
01059 
01060             call MPI_Waitany ( nrecv, lrequest, index, status, ierror )
01061 
01062             if ( ierror /= MPI_SUCCESS ) then
01063                ierrp (1) = ierror
01064                ierrp (2) = status (MPI_SOURCE)
01065                ierrp (3) = status (MPI_TAG)
01066 
01067                ierror = PRISM_Error_MPI
01068 
01069                call psmile_error ( ierror, 'MPI_Waitany', &
01070                     ierrp, 3, __FILE__, __LINE__ )
01071                return
01072             endif
01073 
01074             sender = status(MPI_SOURCE)
01075 #ifdef DEBUG
01076             print *, ' Request index ', index , ' received from rank ', sender
01077 #endif /* DEBUG */
01078 
01079             len = 2*rextra_msg(3,index)
01080 
01081             Allocate ( tgt_ibuf(index)%vector(len), stat = ierror)
01082             if (ierror /= 0) then
01083                ierrp (1) = len
01084                ierror = PRISM_Error_Alloc
01085                call psmile_error ( ierror, 'tgt_ibuf%vector', ierrp, 1, &
01086                     __FILE__, __LINE__ )
01087                return
01088             endif
01089 
01090             ! Receive transferred target cell indices
01091 
01092             call MPI_Recv ( tgt_ibuf(index)%vector, len, MPI_INTEGER, sender, &
01093                  celltag+4, comm_psmile, status, ierror )
01094 #ifdef DEBUGX
01095             do m = 1, rextra_msg(3,n)
01096                print '(a16,i6,a13,i4)', &
01097                      ' recv tgt index ', tgt_ibuf(index)%vector(2*m-1), &
01098                      ' # src cells ',    tgt_ibuf(index)%vector(2*m)
01099             enddo
01100 #endif /* DEBUGX */
01101 
01102             len = 2*rextra_msg(4,index)*rextra_msg(5,index)
01103 
01104             Allocate ( buf(len), stat=ierror)
01105             if (ierror /= 0) then
01106                ierrp (1) = len
01107                ierror = PRISM_Error_Alloc
01108                call psmile_error ( ierror, 'buf', ierrp, 1, &
01109                     __FILE__, __LINE__ )
01110                return
01111             endif
01112 
01113             ! Receive extra source cell coordinates
01114 
01115             call MPI_Recv ( buf, len, MPI_REAL, &
01116                  sender, celltag+6, comm_psmile, status, ierror )
01117 
01118             extra_search%real_bufs(index)%vector => buf
01119 
01120 #ifdef DEBUGX
01121             stride = rextra_msg(5,index)*rextra_msg(4,index)
01122 
01123             do m = 1, rextra_msg(5,index)
01124                do nc = 1, rextra_msg(4,index) ! nbr_corners
01125                   idx1 = (nc-1)*rextra_msg(5,index)+m
01126                   idx2 = stride + idx1
01127                   print *, ' receive corners ', m, &
01128                     extra_search%real_bufs(index)%vector(idx1), &
01129                     extra_search%real_bufs(index)%vector(idx2)
01130                end do
01131                print *
01132             enddo
01133 #endif /* DEBUGX */
01134 
01135             ! Receive extra mask values
01136 
01137             if ( rextra_msg(6,index) /= PRISM_UNDEFINED ) then
01138 
01139                len = rextra_msg(5,index)
01140                Allocate ( lbuf(len), stat=ierror)
01141                if (ierror /= 0) then
01142                   ierrp (1) = len
01143                   ierror = PRISM_Error_Alloc
01144                   call psmile_error ( ierror, 'buf', ierrp, 1, &
01145                        __FILE__, __LINE__ )
01146                   return
01147                endif
01148 #ifdef DEBUGX
01149                print *, ' receive extra mask values '
01150 #endif
01151                call MPI_Recv ( lbuf, len, MPI_LOGICAL, &
01152                       sender, celltag+7, comm_psmile, status, ierror )
01153 
01154 #ifdef DEBUGX
01155                print *, ' received extra mask values '
01156 #endif
01157                extra_search%log_bufs(index)%vector => lbuf
01158 
01159             endif
01160             !
01161             ! For collecting the data during the prism_put phase
01162             !
01163             send_info%nrecv                = send_info%nrecv + 1
01164             send_info%num2recv             = send_info%num2recv + rextra_msg(5,index)
01165             send_info%sender_global(index) = sender
01166             send_info%len_sent(index)      = rextra_msg(5,index)
01167             send_info%msg_id(index)        = rextra_msg(7,index)
01168 
01169          enddo ! loop over len pending Irecv requests
01170 
01171          !
01172          ! Add new cells to neighcells_3d array
01173          !
01174          len = sum(rextra_msg(3,:))
01175 
01176          Allocate(tgt_cell(len), stat=ierror)
01177          if (ierror /= 0) then
01178             ierrp (1) = len
01179             ierror = PRISM_Error_Alloc
01180             call psmile_error ( ierror, 'tgt_cell, ... ', ierrp, 1, &
01181                  __FILE__, __LINE__ )
01182             return
01183          endif
01184 
01185          mm = 0
01186          nn = 0
01187 
01188          do n = 1, nrecv
01189 
01190             mm = mm + rextra_msg(5,n)
01191 
01192             do m = 1, rextra_msg(3,n)
01193                tgt_cell (nn+m) = tgt_ibuf(n)%vector(2*m)
01194             enddo
01195 
01196             nn = nn + rextra_msg(3,n)
01197 
01198          enddo
01199 
01200          len = sum(rextra_msg(5,:))
01201 
01202          Allocate(tgt_list(len), tgt_index(len), stat=ierror)
01203          if (ierror /= 0) then
01204             ierrp (1) = 2*len
01205             ierror = PRISM_Error_Alloc
01206             call psmile_error ( ierror, 'tgt_list, ... ', ierrp, 1, &
01207                  __FILE__, __LINE__ )
01208             return
01209          endif
01210 
01211          nn = 1
01212 
01213          do n = 1, nrecv
01214             mm = 1
01215             do m = 1, rextra_msg(3,n)
01216                do l = 1, tgt_ibuf(n)%vector(2*m)
01217                   tgt_list (nn) = tgt_ibuf(n)%vector(2*m-1)
01218                   tgt_index(nn) = nn
01219                   nn = nn + 1
01220                   mm = mm + 1
01221                enddo
01222             enddo
01223          enddo
01224 
01225          if ( nrecv > 1 ) &
01226               call psmile_quicksort_index ( tgt_list, len, tgt_index )
01227 
01228       endif ! nrecv > 0
01229 
01230       if ( nrecv > 0 .or. nsend > 0 ) then
01231 
01232          len = nbr_cells_tot+send_info%num2recv ! nbr_cells_tot+sum(rextra_msg(5,:))
01233 
01234          if ( len > 0 ) then
01235 
01236          Allocate(new_neighcells_3d(ndim_3d,len,1), stat=ierror)
01237          if (ierror /= 0) then
01238             ierrp (1) = ndim_3d*len
01239             ierror = PRISM_Error_Alloc
01240             call psmile_error ( ierror, 'new_neighcells_3d', ierrp, 1, &
01241                  __FILE__, __LINE__ )
01242             return
01243          endif
01244 
01245          l  = 1
01246          mm = 0
01247          nn = 0
01248 
01249          if ( nrecv > 0 ) then
01250              len = sum(rextra_msg(5,:))
01251          else
01252              len = 0
01253          endif
01254 
01255          do n = 1, ncpl
01256 
01257             do idx1 = 1, new_nbr_cells(n)
01258                new_neighcells_3d(1,mm+idx1,1) = neighcells_3d(1,nn+idx1,1)
01259                new_neighcells_3d(2,mm+idx1,1) = neighcells_3d(2,nn+idx1,1)
01260                new_neighcells_3d(3,mm+idx1,1) = neighcells_3d(3,nn+idx1,1)
01261             enddo
01262 
01263             nn = nn + nbr_cells(n)
01264             mm = mm + new_nbr_cells(n)
01265 
01266             if ( l <= len ) then
01267 
01268                do while ( n == tgt_list(l) )
01269                   !
01270                   ! as the extra cells are stored in a simple list at the
01271                   ! end, only one index is required, stored in place 2.
01272                   ! place 1 is used as identifier (extra_search%global_marker), place 3
01273                   ! is not used.
01274                   !
01275                   mm = mm + 1
01276                   new_nbr_cells(n) = new_nbr_cells(n) + 1
01277                   new_neighcells_3d(1,mm,1) = extra_search%global_marker
01278                   new_neighcells_3d(2,mm,1) = tgt_index(l)
01279                   new_neighcells_3d(3,mm,1) = PSMILe_Undef
01280 
01281                   l = l + 1
01282 
01283                   if ( l > len ) exit
01284 
01285                enddo ! while
01286 
01287             endif ! l <= len
01288 
01289          enddo ! n = 1, ncpl
01290 
01291          Deallocate(neighcells_3d, stat=ierror)
01292          if (ierror /= 0) then
01293             ierrp (1) = ndim_3d*nbr_cells_tot
01294             ierror = PRISM_Error_Alloc
01295             call psmile_error ( ierror, 'neighcells_3d', ierrp, 1, &
01296                  __FILE__, __LINE__ )
01297             return
01298          endif
01299 
01300          neighcells_3d => new_neighcells_3d
01301 
01302          else
01303 
01304          Nullify(neighcells_3d)
01305 
01306          endif
01307 
01308       endif
01309       !
01310       ! Finally set nbr_cells to new values
01311       !
01312       nbr_cells = new_nbr_cells
01313 !
01314 !-------------------------------------------------------------------------
01315 ! 5.) Free memory
01316 !-------------------------------------------------------------------------
01317 !
01318       if ( max(nsend,nrecv) > 0 ) then
01319 
01320          ! In a loop over nsend the tgt_ibuf vector
01321          ! was already deallocated above.
01322 
01323          do n = 1, nrecv
01324             Deallocate (tgt_ibuf(n)%vector, stat=ierror)
01325             if (ierror /= 0) then
01326                ierrp (1) = PSMILe_Undef
01327                ierror = PRISM_Error_Dealloc
01328                call psmile_error ( ierror, 'tgt_ibuf', ierrp, 1, &
01329                     __FILE__, __LINE__ )
01330                return
01331             endif
01332          enddo
01333 
01334          Deallocate (tgt_ibuf, stat=ierror)
01335          if (ierror /= 0) then
01336             ierrp (1) =  max(nsend,nrecv)
01337             ierror = PRISM_Error_Dealloc
01338             call psmile_error ( ierror, 'tgt_ibuf', ierrp, 1, &
01339                  __FILE__, __LINE__ )
01340             return
01341          endif
01342 
01343       endif
01344 
01345       if ( nrecv > 0 ) then
01346          Deallocate(tgt_list, tgt_index, tgt_cell, stat=ierror)
01347          if (ierror /= 0) then
01348             ierrp (1) = 2*sum(rextra_msg(5,:)) + sum(rextra_msg(3,:))
01349             ierror = PRISM_Error_Alloc
01350             call psmile_error ( ierror, 'tgt_list, ... ', ierrp, 1, &
01351                  __FILE__, __LINE__ )
01352             return
01353          endif
01354 
01355          Deallocate(rextra_msg, lrequest, stat=ierror)
01356          if (ierror /= 0) then
01357             ierrp (1) = (n_extra+1)*nrecv
01358             ierror = PRISM_Error_Alloc
01359             call psmile_error ( ierror, 'rextra_msg, ... ', ierrp, 1, &
01360                  __FILE__, __LINE__ )
01361             return
01362          endif
01363       endif
01364 !
01365 !-------------------------------------------------------------------------
01366 ! 6.) All done
01367 !-------------------------------------------------------------------------
01368 !
01369 #ifdef VERBOSE
01370       print 9980, trim(ch_id), ierror
01371 
01372       call psmile_flushstd
01373 #endif /* VERBOSE */
01374 
01375 !
01376 !  Formats:
01377 !
01378 
01379 #ifdef VERBOSE
01380 
01381 9990 format (1x, a, ': psmile_global_search_cell_real: grid_id', i3, &
01382                     ' to ', i3, '(', i2, ')')
01383 9980 format (1x, a, ': psmile_global_search_cell_real: eof ierror =', i3)
01384 
01385 #endif /* VERBOSE */
01386 
01387       end subroutine PSMILe_global_search_cell_real

Generated on 18 Mar 2011 for Oasis4 by  doxygen 1.6.1