psmile_global_search_cell_dble.F90

Go to the documentation of this file.
00001 !-----------------------------------------------------------------------
00002 ! $COPYRIGHT_NLE-IT
00003 !-----------------------------------------------------------------------
00004 !BOP
00005 !
00006 ! !ROUTINE: PSMILe_global_search_cell_dble
00007 !
00008 ! !INTERFACE:
00009 
00010       subroutine psmile_global_search_cell_dble ( grid_id, var_id,  &
00011                   comp_info, send_info, search, extra_search, ncpl, &
00012                   nbr_cells, n_intmethods, interpolation_methods,   &
00013                   interpolation_search, ierror )
00014 !
00015 ! !USES:
00016 !
00017       use PRISM
00018 !
00019       use PSMILe, dummy_interface => PSMILe_global_search_cell_dble
00020 
00021       implicit none
00022 !
00023 ! !INPUT PARAMETERS:
00024 !
00025       Integer, Intent(In)                  :: grid_id
00026 
00027 !     grid id of source grid
00028 
00029       Integer, Intent(In)                  :: var_id
00030 
00031 !     field id of source data
00032 
00033       Type (Enddef_comp), Intent (In)      :: comp_info
00034 !
00035 !     Info on the component in which the donor cells
00036 !     should be searched.
00037 
00038       Type (Send_information), Intent (InOut) :: send_info
00039 !
00040 !     Metadata for target locations
00041 !
00042       Type (Enddef_search), Intent (InOut) :: search
00043 !
00044 !     Info's on coordinates to be searched
00045 !
00046       Type (Extra_search_info), Intent (InOut) :: extra_search
00047 !
00048 !     Info's on global coordinates to be searched
00049 !
00050       Integer, Intent (In)                 :: ncpl
00051 !
00052 !     Number of target cells treated on this source process
00053 !
00054       Integer, Intent (InOut)              :: nbr_cells(ncpl)
00055 !
00056 !     Number of source cells per target cell
00057 !
00058       Integer, Intent (In)                 :: n_intmethods
00059 !
00060 !     Number of interpolations methods used to interpolate the data.
00061 !     Dimension of vectors "interpolation_methods" and "interpolation_search".
00062 
00063       Integer, Intent (In)                 :: interpolation_methods (n_intmethods)
00064 !
00065 !     Codes for the interpolation methods
00066 !
00067       Logical, Intent (In)                 :: interpolation_search  (n_intmethods)
00068 !
00069 !     Is global search required for corresponding interpolation method ?
00070 !
00071 ! !OUTPUT PARAMETERS:
00072 !
00073       Integer, Intent (Out)                :: ierror
00074 
00075 !     Returns the error code of PSMILe_global_search_cell_dble;
00076 !             ierror = 0 : No error
00077 !             ierror > 0 : Severe error
00078 !
00079 ! !LOCAL PARAMETERS
00080 !
00081 !     ... for send to partners
00082 !
00083       type msg_extra_point_exchange
00084          integer :: interp_type,     
00085                     data_type,       
00086                     num_tgt_cells,   
00087                     num_src_corners, 
00088                     num_src_cells,   
00089                     mask_id,         
00090                     msg_id
00091       endtype
00092 
00093       Integer, Parameter                          :: n_extra = 7
00094       Integer, Parameter                          :: send_appl_index = 5
00095 
00096       Logical, Pointer                            :: lbuf (:)
00097       Double Precision, Pointer                   :: buf (:)
00098 
00099       Type (dble_vector_3d), Pointer              :: dble_buf(:)
00100       Type (integer_vector), Allocatable          :: tgt_ibuf(:)
00101       Type (integer_vector), Allocatable          :: src_ibuf(:)
00102       Type (logical_vector), Allocatable          :: log_buf(:)
00103 
00104       Type (Corner_Block), Pointer                :: cp
00105       Type (Method),       Pointer                :: mp
00106 
00107       Integer, Pointer                            :: new_neighcells_3d(:,:,:)
00108       Integer, Pointer                            :: neigh_3d(:,:)
00109       Logical, Pointer                            :: mask_array(:)
00110       Integer                                     :: mask_shape (2, ndim_3d)
00111 
00112       Type(msg_extra_point_exchange), Allocatable :: rextra_msg (:)
00113       Integer, Allocatable                        :: rextra_msg_buf(:,:)
00114       Integer, Allocatable                        :: nbr_tgt_cells(:)
00115       Integer, Allocatable                        :: nbr_src_cells(:)
00116       Integer, Allocatable                        :: rank_list(:)
00117       Integer, Allocatable                        :: cell_trans(:,:)
00118       Integer, Allocatable                        :: save_lreq(:)
00119       Integer, Allocatable                        :: lrequest(:)
00120       Integer, Allocatable                        :: tgt_list(:), tgt_index(:)
00121 
00122       Logical                                     :: src_mask_available
00123 
00124       Integer                                     :: tgt_idx
00125       Integer                                     :: new_nbr_cells(ncpl)
00126       Integer                                     :: mask_id
00127       Integer                                     :: nbr_corners
00128       Type (enddef_msg_locations)                 :: msg_locations
00129       Integer                                     :: msgloc (msgloc_size)
00130       Integer                                     :: extra_msg (n_extra)
00131       Integer                                     :: len
00132       Integer                                     :: lenx, leny, lenxy, m_lenx
00133       Integer                                     :: rank, idx, idx1, idx2, stride
00134       Integer                                     :: nbr_procs(2)
00135       Integer                                     :: interp
00136       Integer                                     :: dest
00137       Integer                                     :: sender
00138       Integer                                     :: status(MPI_STATUS_SIZE)
00139       Integer                                     :: nreq
00140       Integer                                     :: recv_req
00141       Integer                                     :: nrecv, nsend
00142       Integer                                     :: index
00143       Integer                                     :: nc
00144       Integer                                     :: l, i
00145       Integer                                     :: m, mm
00146       Integer                                     :: n, nn
00147 !
00148 !     ... for error handling
00149 !
00150       Integer, Parameter                          :: nerrp = 3
00151       Integer                                     :: ierrp (nerrp)
00152 !
00153 !----------------------------------------------------------------------
00154 !
00155 ! !DESCRIPTION:
00156 !
00157 ! Subroutine "PSMILe_global_search_cell_dble" collects source cells
00158 ! for target cells located on process boundaries of the source grid.
00159 ! Corresponding MPI calls are located in psmile_enddef_action_cell.
00160 !
00161 !   - exchange boundary cell info by sending back
00162 !     information to the source grid processes holding the
00163 !     respective target cells.
00164 !   - determine who is master source for the target cells
00165 !   - let the masters collect the data
00166 !   - store extra source cells on the master source processes
00167 !   - delete double entries on the non-master sides
00168 !
00169 ! !REVISION HISTORY:
00170 !
00171 !   Date      Programmer   Description
00172 ! ----------  ----------   -----------
00173 ! 13.08.08    R. Redler    created
00174 ! 08.11.06    R. Redler    revised
00175 ! 01.03.2011  M. Hanke     refactored for better readability
00176 !
00177 !EOP
00178 !----------------------------------------------------------------------
00179 !
00180 !  $Id: psmile_global_search_cell_dble.F90 3174 2011-05-04 09:41:43Z hanke $
00181 !  $Author: hanke $
00182 !
00183    Character(len=len_cvs_string), save :: mycvs = 
00184        '$Id: psmile_global_search_cell_dble.F90 3174 2011-05-04 09:41:43Z hanke $'
00185 !
00186 !----------------------------------------------------------------------
00187 !  1.) Initialization
00188 !----------------------------------------------------------------------
00189 !
00190 #ifdef VERBOSE
00191       print 9990, trim(ch_id), grid_id, search%msg_intersections%field_info%tgt_method_id, &
00192                                search%sender
00193 
00194       call psmile_flushstd
00195 #endif /* VERBOSE */
00196 !
00197       ierror = 0
00198 
00199       mask_id = Fields(var_id)%mask_id
00200 
00201       cp => Grids(grid_id)%corner_pointer
00202       mp => Methods(Fields(var_id)%method_id)
00203       nreq = paction%nreq
00204 
00205       src_mask_available = mask_id /= PRISM_UNDEFINED
00206 
00207       new_nbr_cells(:) = nbr_cells(:)
00208       send_info%nloc = ncpl
00209 
00210       if (src_mask_available) then
00211 #ifdef DEBUG
00212         print *, ' Source mask is available.'
00213 #endif  /* DEBUG */
00214          mask_array => Masks(mask_id)%mask_array
00215          mask_shape =  Masks(mask_id)%mask_shape
00216       endif
00217 
00218       if ( Grids(grid_id)%grid_type == PRISM_Irrlonlat_regvrt ) then
00219 
00220          nbr_corners = cp%nbr_corners/2
00221 
00222       else if ( Grids(grid_id)%grid_type == PRISM_Reglonlatvrt ) then
00223 
00224          nbr_corners = 2
00225 
00226       else if ( Grids(grid_id)%grid_type == PRISM_Gaussreduced_regvrt ) then
00227 
00228          nbr_corners = 2
00229 
00230       else
00231 
00232          ierrp (1) = search%search_data%grid_type
00233          ierror = PRISM_Error_Internal
00234          call psmile_error ( ierror, 'unsupported search grid type', &
00235               ierrp, 1, __FILE__, __LINE__ )
00236          return
00237 
00238       endif
00239 
00240       Allocate(save_lreq(nreq), stat=ierror)
00241          if (ierror /= 0) then
00242             ierrp (1) = nreq
00243             ierror = PRISM_Error_Alloc
00244             call psmile_error ( ierror, 'save_lreq', ierrp, 1, &
00245                  __FILE__, __LINE__ )
00246             return
00247          endif
00248 
00249 #ifdef PRISM_ASSERTION
00250       if (interpolation_methods(1) /= PSMILe_conserv2D ) then
00251          call psmile_assert (__FILE__, __LINE__, &
00252               "No global search for this interpolation method")
00253       endif
00254       if ( n_intmethods > 1 ) then
00255          call psmile_assert (__FILE__, __LINE__, &
00256               "2nd (vertical) interpolation method not yet supported.")
00257       endif
00258 #endif /* PRISM_ASSERTION */
00259       if (count(interpolation_search(:)) > 1) then
00260          ierrp (1) = count(interpolation_search(:))
00261 
00262          ierror = PRISM_Error_Internal
00263 
00264          call psmile_error ( ierror, 'Global search supported only for one interpolation method', &
00265               ierrp, 1, __FILE__, __LINE__ )
00266 
00267          return
00268       endif
00269 
00270       do interp = 1, n_intmethods
00271          if (interpolation_search(interp) ) exit
00272       end do
00273 
00274       if (interp > n_intmethods) then
00275          ierror = PRISM_Error_Internal
00276          ierrp (1) = n_intmethods
00277 
00278          call psmile_error ( ierror, 'No Global search for interpolation methods', &
00279               ierrp, 1, __FILE__, __LINE__ )
00280          return
00281       endif
00282 
00283       len = 0
00284 
00285       if ( associated(search%boundary_cell) ) then
00286 
00287          len = size(search%boundary_cell(:,1))
00288 
00289          Allocate(cell_trans(len,3), stat=ierror)
00290 
00291          if (ierror /= 0) then
00292             ierrp (1) = 3*len
00293             ierror = PRISM_Error_Alloc
00294             call psmile_error ( ierror, 'cell_trans', ierrp, 1, &
00295                  __FILE__, __LINE__ )
00296             return
00297          endif
00298 
00299       endif
00300 !
00301 !-------------------------------------------------------------------------
00302 ! 2.) Exchange data between source processes
00303 !-------------------------------------------------------------------------
00304 !
00305 !     Send back information about target cells located on source boundary
00306 !     to target process
00307 !
00308 !===> ... Send request to destination process
00309 !         msgloc is received by routine
00310 !         PSMILe_Enddef_action.
00311 !
00312 !
00313       call psmile_init_enddef_msg_locs (msg_locations)
00314 
00315       msg_locations%requires_conserv_remap = interpolation_methods (interp)
00316       msg_locations%src_rank               = psmile_rank
00317       msg_locations%msg_len                = len
00318       msg_locations%transi_in_id           = search%msg_intersections%field_info%transient_in_id
00319       msg_locations%transi_out_id          = search%msg_intersections%field_info%transient_out_id
00320 
00321       call psmile_pack_msg_locations (msg_locations, msgloc)
00322 
00323       dest = search%sender
00324 
00325 #ifdef DEBUG
00326       print *, ' Bsend tag ', loctag+search%msg_intersections%relative_msg_tag, ' to ', dest
00327 #endif /* DEBUG */
00328 
00329       call psmile_bsend ( msgloc, msgloc_size, MPI_INTEGER, dest,           &
00330                           loctag+search%msg_intersections%relative_msg_tag, &
00331                           comm_psmile, ierror )
00332 
00333       if ( ierror /= MPI_SUCCESS ) then
00334          ierrp (1) = ierror
00335          ierrp (2) = dest
00336          ierrp (3) = loctag+search%msg_intersections%relative_msg_tag
00337          ierror = PRISM_Error_Send
00338          call psmile_error ( ierror, 'psmile_bsend(msg)', &
00339                              ierrp, 3, __FILE__, __LINE__ )
00340          return
00341       endif
00342       !
00343       ! Transfer indices and information about those target cells
00344       !  which touch the boundary of this source domain, received
00345       !  by routine PSMILe_Enddef_action_cell.
00346       !
00347       if ( len > 0 ) then
00348 
00349          call insertsort_boundary_cells(search%boundary_cell, len)
00350 
00351          call psmile_bsend ( search%boundary_cell, 5*len, MPI_INTEGER, &
00352                              dest, celltag, comm_psmile, ierror )
00353       endif
00354 
00355       if ( ierror /= MPI_SUCCESS ) then
00356          ierrp (1) = ierror
00357          ierrp (2) = dest
00358          ierrp (3) = celltag
00359          ierror = PRISM_Error_Send
00360          call psmile_error ( ierror, 'psmile_bsend(msg)', &
00361                              ierrp, 3, __FILE__, __LINE__ )
00362          return
00363       endif
00364       !
00365       ! Receive the number of source processes to which the source
00366       ! has to transfer data, sent by routine PSMILe_Enddef_action_cell.
00367       !
00368       call MPI_Irecv ( nbr_procs, 2, MPI_INTEGER, &
00369                        dest, celltag+1, comm_psmile, recv_req, ierror )
00370 
00371       if ( ierror /= MPI_SUCCESS ) then
00372          ierrp (1) = ierror
00373          ierrp (2) = dest
00374          ierrp (3) = celltag+1
00375          ierror = PRISM_Error_Recv
00376          call psmile_error ( ierror, 'MPI_Recv', &
00377                              ierrp, 3, __FILE__, __LINE__ )
00378          return
00379       endif
00380 
00381 #ifdef DEBUG
00382       print *, ' Posted Irecv request ', recv_req, ' from ', dest
00383 #endif /* DEBUG */
00384 
00385       save_lreq = paction%lrequest
00386       !
00387       ! Give absolute priority to cell requests by temporarily deactivating
00388       ! all other pending request.
00389       !
00390       ! TODO: For the sake of performance it has to be checked whether there
00391       !       are any requests which can savely be handled in between without
00392       !       running into a deadlock.
00393       !
00394       paction%lrequest = MPI_REQUEST_NULL
00395 
00396       paction%lrequest (3) = recv_req
00397 
00398 !     Are there any requests that we should better deactivate temporaily?
00399 !
00400 !     paction%lrequest (?:?) = MPI_REQUEST_NULL
00401 !
00402       index = 0
00403 
00404       ! as long as we do not get the right answer
00405       do while ( index /= 3)
00406 
00407 #ifdef DEBUG
00408          do n = 1, nreq
00409            print *, ' paction%lrequest ', n, paction%lrequest(n)
00410          enddo
00411 #endif /* DEBUG */
00412 
00413          call MPI_Waitany ( nreq, paction%lrequest, index, status, ierror )
00414 
00415 #ifdef DEBUG
00416          print *, ' Processing with index ', index, ' from ', status(MPI_SOURCE)
00417 #endif /* DEBUG */
00418 
00419          if ( index /= 3 ) then
00420 
00421             call psmile_enddef_action (search, index, status, ierror )
00422 
00423          endif
00424 
00425       enddo
00426 
00427       paction%lrequest(3) = MPI_REQUEST_NULL
00428 
00429       nsend = nbr_procs(1)
00430       nrecv = nbr_procs(2)
00431 
00432 #ifdef DEBUG
00433       print *, ' This rank has to serve        ', nsend, ' source grid processes.'
00434       print *, ' This rank has to receive from ', nrecv, ' source grid processes.'
00435 #endif /* DEBUG */
00436 
00437       if ( max(nsend,nrecv) > 0 ) then
00438          Allocate ( tgt_ibuf (max(nsend,nrecv)), stat=ierror)
00439          if (ierror /= 0) then
00440             ierrp (1) = max(nsend,nrecv)
00441             ierror = PRISM_Error_Alloc
00442             call psmile_error ( ierror, 'tgt_ibuf', ierrp, 1, &
00443                  __FILE__, __LINE__ )
00444             return
00445          endif
00446       endif
00447 
00448       if ( nsend > 0 ) then
00449          Allocate(dble_buf      (nsend), &
00450                   log_buf       (nsend), &
00451                   src_ibuf      (nsend), &
00452                   nbr_tgt_cells (nsend), &
00453                   nbr_src_cells (nsend), &
00454                   rank_list     (nsend), stat=ierror)
00455          if (ierror /= 0) then
00456             ierrp (1) = 6*nsend
00457             ierror = PRISM_Error_Alloc
00458             call psmile_error ( ierror, 'nsend buffer', ierrp, 1, &
00459                  __FILE__, __LINE__ )
00460             return
00461          endif
00462 
00463          nbr_tgt_cells(:) =  0
00464          nbr_src_cells(:) =  0
00465          rank_list(:)     = -1
00466       endif
00467 
00468       !
00469       ! Receive the list of target indices and related information from
00470       ! target routine PSMILe_Enddef_action_cell for which source cells
00471       ! have to be transferred to other source processes.
00472       ! cell_trans target cell index on the sender source contains a
00473       ! sorted list with increasing order ranks stored in
00474       ! cell_trans(:,2)
00475       !
00476       ! cell_trans(:,1) : target cell index on the sender source
00477       ! cell_trans(:,2) : rank of receiver source process
00478       ! cell_trans(:,3) : target cell index on the receiver source
00479       !
00480 
00481 #ifdef DEBUGX
00482       print *, ' Receiving data ', len, ' records from ', dest
00483       if ( len == 0 ) &
00484          print *, ' Receive canceled because of zero message '
00485 #endif /* DEBUGX */
00486 
00487       if ( len > 0 ) then
00488          call MPI_Recv ( cell_trans, 3*len, MPI_INTEGER, &
00489                          dest, celltag+2, comm_psmile, status, ierror )
00490 
00491          if ( ierror /= MPI_SUCCESS ) then
00492             ierrp (1) = ierror
00493             ierrp (2) = dest
00494             ierrp (3) = celltag+2
00495             ierror = PRISM_Error_Recv
00496             call psmile_error ( ierror, 'MPI_Recv', &
00497                                 ierrp, 3, __FILE__, __LINE__ )
00498             return
00499          endif
00500       endif
00501       !
00502       ! Determine number of data to be transferred
00503       !   to each destination source
00504       !
00505       if ( nsend > 0 ) then
00506 
00507 #ifdef DEBUGX
00508          do n = 1, len
00509 !            if ( cell_trans(n,1) > -1 ) then
00510                print 9970, trim(ch_id), n, cell_trans(n,1), &
00511                     ' -> ', cell_trans(n,2), cell_trans(n,3)
00512 !            endif
00513          enddo
00514 9970     format (1x, a, ': ', i6, ' Transfer index ', i6, a4, i3, i6)
00515 #endif /* DEBUGX */
00516 
00517          nn = 1
00518          m  = 0
00519 
00520          rank_list(nn) = cell_trans(1,2)
00521 
00522          do n = 1, len
00523             if ( n > 1 ) then
00524                if ( cell_trans(n,2) > cell_trans(n-1,2) ) then
00525                   nn = nn + 1
00526                   rank_list(nn) = cell_trans(n,2)
00527                endif
00528             endif
00529             if ( cell_trans(n,2) == -1 ) exit
00530 
00531             tgt_idx = cell_trans(n,1) ! search%boundary_cell(n,1) ! local index of target cell
00532             nbr_tgt_cells(nn) = nbr_tgt_cells(nn) + 1
00533             nbr_src_cells(nn) = nbr_src_cells(nn) + nbr_cells(tgt_idx)
00534          enddo
00535 #ifdef DEBUG
00536          do n = 1, nsend
00537             print *, nbr_tgt_cells(n), ' target cell info to transfer.'
00538             print *, nbr_src_cells(n), ' source cells have to be transferred to rank ', rank_list(n)
00539          enddo
00540 #endif /* DEBUG */
00541 
00542 !
00543 !-------------------------------------------------------------------------
00544 ! 3.) Send data to the designated child source processes
00545 !-------------------------------------------------------------------------
00546 !
00547 ! TODO: Eliminate multiple entries from cell_trans array and create extra index.
00548 !
00549 !
00550          do n = 1, nsend
00551             Allocate ( tgt_ibuf(n)%vector(nbr_tgt_cells(n)*2), &
00552                        src_ibuf(n)%vector(nbr_src_cells(n)*3), &
00553                        log_buf(n)%vector(nbr_src_cells(n)),    &
00554                        dble_buf(n)%vector(nbr_src_cells(n),nbr_corners,2), stat=ierror )
00555             if (ierror /= 0) then
00556                ierrp (1) = 2*nbr_tgt_cells(n) + (2*nbr_corners+4)*nbr_src_cells(n)
00557                ierror = PRISM_Error_Alloc
00558                call psmile_error ( ierror, 'src/tgt_buf(n)%ibuf/dble_buf ', &
00559                   ierrp, 1, __FILE__, __LINE__ )
00560                return
00561             endif
00562          enddo
00563          !
00564          ! Fill in target index information
00565          !
00566          nn = 1
00567          m  = 0
00568 
00569          do n = 1, len
00570 
00571             if ( n > 1 ) then
00572                if ( cell_trans(n,2) > cell_trans(n-1,2) ) then
00573                   nn = nn + 1
00574                   m = 0
00575                endif
00576             endif
00577 
00578             if ( cell_trans(n,2) == -1 ) exit
00579 
00580             tgt_idx = cell_trans(n,1) ! search%boundary_cell(n,1) ! local index of target cell
00581             m = m+1
00582 
00583             tgt_ibuf(nn)%vector(2*m-1) = cell_trans(n,3)
00584             tgt_ibuf(nn)%vector(2*m  ) = nbr_cells(tgt_idx)
00585 
00586          enddo
00587 
00588          !
00589          ! Distribute information to the different send buffers
00590          !
00591 
00592          select case (Grids(grid_id)%grid_type)
00593 
00594          case (PRISM_Irrlonlat_regvrt)
00595 
00596             !
00597             ! Irrlonlat_regvrt source grids
00598             !
00599 
00600             lenx  = (cp%corner_shape(2,1)-cp%corner_shape(1,1)+1)
00601             lenxy = (cp%corner_shape(2,2)-cp%corner_shape(1,2)+1) * lenx
00602 
00603             if (src_mask_available) then
00604                m_lenx = (mask_shape(2,1)-mask_shape(1,1)+1)
00605 #ifdef PRISM_ASSERTION
00606                if ( cp%corner_shape(1,2) /= mask_shape(1,2) .or. &
00607                     cp%corner_shape(1,1) /= mask_shape(1,1) ) then
00608                   call psmile_assert (__FILE__, __LINE__, &
00609                        "Mask and corners have to be of the same shape.")
00610                endif
00611                if ( mask_shape(2,3) - mask_shape(1,3) + 1 > 1 ) then
00612                   call psmile_assert (__FILE__, __LINE__, &
00613                        "3d masks not yet supported.")
00614                endif
00615 #endif /* PRISM_ASSERTION */
00616             endif
00617 
00618             l  = 0
00619             rank = 1
00620 
00621             do n = 1, len
00622 
00623                if ( n > 1 ) then
00624                   if ( cell_trans(n,2) > cell_trans(n-1,2) ) then
00625                      rank = rank + 1
00626                      l    = 0
00627                   endif
00628                endif
00629                if ( cell_trans(n,2) == -1 ) exit
00630 
00631                tgt_idx = cell_trans(n,1) ! search%boundary_cell(n,1)
00632 #ifdef DEBUGX
00633                print *, ' Corners for local target index ', tgt_idx
00634 #endif /* DEBUGX */
00635                nn = sum(nbr_cells(1:tgt_idx-1))
00636 
00637                do mm = 1, nbr_cells(tgt_idx)
00638 
00639                   idx = (neighcells_3d(2,nn+mm,1)-cp%corner_shape(1,2))*lenx + &
00640                          neighcells_3d(1,nn+mm,1)-cp%corner_shape(1,1)+1
00641 
00642                   src_ibuf(rank)%vector(3*(l+mm)-2) = neighcells_3d(1,nn+mm,1)
00643                   src_ibuf(rank)%vector(3*(l+mm)-1) = neighcells_3d(2,nn+mm,1)
00644                   src_ibuf(rank)%vector(3*(l+mm)  ) = neighcells_3d(3,nn+mm,1)
00645 
00646                   do nc = 1, nbr_corners
00647                      dble_buf(rank)%vector(l+mm,nc,1) = cp%corners_dble(1)%vector((nc-1)*lenxy+idx)
00648                      dble_buf(rank)%vector(l+mm,nc,2) = cp%corners_dble(2)%vector((nc-1)*lenxy+idx)
00649 #ifdef DEBUGX
00650                      print *, ' send corners ', l+mm, dble_buf(rank)%vector(l+mm,nc,1), &
00651                                                       dble_buf(rank)%vector(l+mm,nc,2)
00652                   end do
00653 
00654                   print *, ' 2d indices   ', neighcells_3d(1,nn+mm,1), &
00655                                              neighcells_3d(2,nn+mm,1)
00656 #else
00657                   end do
00658 #endif /* DEBUGX */
00659                enddo ! mm
00660 
00661                if (src_mask_available) then
00662 
00663                   do mm = 1, nbr_cells(tgt_idx)
00664                      log_buf(rank)%vector(l+mm) =                                      &
00665                         mask_array((neighcells_3d(2,nn+mm,1)-mask_shape(1,2))*m_lenx + &
00666                                    neighcells_3d(1,nn+mm,1)-mask_shape(1,1)+1)
00667                   enddo
00668 #ifdef DEBUGX
00669                   print *, ' send mask is ', log_buf(rank)%vector(l+1:l+nbr_cells(tgt_idx))
00670 #endif
00671                endif
00672 
00673                ! Erase target cell entries and related info which
00674                ! have been transfered to a remote source process
00675 #ifdef DEBUGX
00676                print *, ' Erase target cell index ', tgt_idx
00677                print *
00678 #endif /* DEBUGX */
00679                new_nbr_cells(tgt_idx) = 0
00680                send_info%nloc = send_info%nloc - 1
00681 
00682                ! Increment indices
00683 
00684                l = l + nbr_cells(tgt_idx)
00685 
00686             enddo ! len
00687 
00688          case (PRISM_Reglonlatvrt)
00689 
00690             !
00691             ! Reglonlatvrt source grids
00692             !
00693 
00694             lenx = (cp%corner_shape(2,1)-cp%corner_shape(1,1)+1)
00695             leny = (cp%corner_shape(2,2)-cp%corner_shape(1,2)+1)
00696 
00697             if (src_mask_available) then
00698                m_lenx = (mask_shape(2,1)-mask_shape(1,1)+1)
00699 #ifdef PRISM_ASSERTION
00700                if ( cp%corner_shape(1,2) /= mask_shape(1,2) .or. &
00701                     cp%corner_shape(1,1) /= mask_shape(1,1) ) then
00702                   call psmile_assert (__FILE__, __LINE__, &
00703                        "Mask and corners have to be of the same shape.")
00704                endif
00705                if ( mask_shape(2,3) - mask_shape(1,3) + 1 > 1 ) then
00706                   call psmile_assert (__FILE__, __LINE__, &
00707                        "3d masks not yet supported.")
00708                endif
00709 #endif /* PRISM_ASSERTION */
00710             endif
00711 
00712             l  = 0
00713             rank = 1
00714 
00715             do n = 1, len
00716 
00717                if ( n > 1 ) then
00718                   if ( cell_trans(n,2) > cell_trans(n-1,2) ) then
00719                      rank = rank + 1
00720                      l    = 0
00721                   endif
00722                endif
00723                if ( cell_trans(n,2) == -1 ) exit
00724 
00725                tgt_idx = cell_trans(n,1) ! search%boundary_cell(n,1)
00726 #ifdef DEBUGX
00727                print *, ' Corners for local target index ', tgt_idx
00728 #endif /* DEBUGX */
00729                nn = sum(nbr_cells(1:tgt_idx-1))
00730 
00731                do mm = 1, nbr_cells(tgt_idx)
00732 
00733                   src_ibuf(rank)%vector(3*(l+mm)-2) = neighcells_3d(1,nn+mm,1)
00734                   src_ibuf(rank)%vector(3*(l+mm)-1) = neighcells_3d(2,nn+mm,1)
00735                   src_ibuf(rank)%vector(3*(l+mm)  ) = neighcells_3d(3,nn+mm,1)
00736 
00737                   idx1 = neighcells_3d(1,nn+mm,1)-cp%corner_shape(1,1)+1
00738                   idx2 = neighcells_3d(2,nn+mm,1)-cp%corner_shape(1,2)+1
00739 
00740                   do nc = 1, nbr_corners
00741                      dble_buf(rank)%vector(l+mm,nc,1) = cp%corners_dble(1)%vector((nc-1)*lenx+idx1)
00742                      dble_buf(rank)%vector(l+mm,nc,2) = cp%corners_dble(2)%vector((nc-1)*leny+idx2)
00743 #ifdef DEBUGX
00744                      print *, ' send corners ', l+mm, dble_buf(rank)%vector(l+mm,nc,1), &
00745                                                       dble_buf(rank)%vector(l+mm,nc,2)
00746                   end do
00747 
00748                   print *, ' 2d indices   ', neighcells_3d(1,nn+mm,1), &
00749                                              neighcells_3d(2,nn+mm,1)
00750 #else
00751                   end do
00752 #endif /* DEBUGX */
00753                enddo ! mm
00754 
00755                if (src_mask_available) then
00756 
00757                   do mm = 1, nbr_cells(tgt_idx)
00758                      log_buf(rank)%vector(l+mm) = &
00759                         mask_array((neighcells_3d(2,nn+mm,1)-mask_shape(1,2))*m_lenx + &
00760                                    neighcells_3d(1,nn+mm,1)-mask_shape(1,1)+1)
00761                   enddo
00762 #ifdef DEBUGX
00763                   print *, ' send mask is ', log_buf(rank)%vector(l+1:l+nbr_cells(tgt_idx))
00764 #endif
00765                endif
00766 
00767                ! Erase target cell entries and related info which
00768                ! have been transfered to a remote source process
00769 #ifdef DEBUGX
00770                print *, ' Erase target cell index ', tgt_idx
00771                print *
00772 #endif /* DEBUGX */
00773                new_nbr_cells(tgt_idx) = 0
00774                send_info%nloc = send_info%nloc - 1
00775 
00776                ! Increment indices
00777 
00778                l = l + nbr_cells(tgt_idx)
00779 
00780             enddo ! len
00781 
00782          case (PRISM_Gaussreduced_regvrt)
00783 
00784             !
00785             ! Gauss-reduced source grids
00786             !
00787 
00788             lenx = (cp%corner_shape(2,1)-cp%corner_shape(1,1)+1)
00789 
00790             if (src_mask_available) then
00791                m_lenx = (mask_shape(2,1)-mask_shape(1,1)+1)
00792 #ifdef PRISM_ASSERTION
00793                if ( cp%corner_shape(1,2) /= mask_shape(1,2) .or. &
00794                     cp%corner_shape(1,1) /= mask_shape(1,1) ) then
00795                   call psmile_assert (__FILE__, __LINE__, &
00796                        "Mask and corners have to be of the same shape.")
00797                endif
00798                if ( mask_shape(2,3) - mask_shape(1,3) + 1 > 1 ) then
00799                   call psmile_assert (__FILE__, __LINE__, &
00800                        "3d masks not yet supported.")
00801                endif
00802 #endif /* PRISM_ASSERTION */
00803             endif
00804 
00805             l  = 0
00806             rank = 1
00807 
00808             ! for all local boundary cells
00809             do n = 1, len
00810 
00811                if ( n > 1 ) then
00812                   if ( cell_trans(n,2) > cell_trans(n-1,2) ) then
00813                      rank = rank + 1
00814                      l    = 0
00815                   endif
00816                endif
00817                if ( cell_trans(n,2) == -1 ) exit
00818 
00819                tgt_idx = cell_trans(n,1) ! search%boundary_cell(n,1)
00820 #ifdef DEBUGX
00821                print *, ' Corners for local target index ', tgt_idx
00822 #endif /* DEBUGX */
00823                nn = sum(nbr_cells(1:tgt_idx-1))
00824 
00825                do mm = 1, nbr_cells(tgt_idx)
00826 
00827                   src_ibuf(rank)%vector(3*(l+mm)-2) = neighcells_3d(1,nn+mm,1)
00828                   src_ibuf(rank)%vector(3*(l+mm)-1) = neighcells_3d(2,nn+mm,1)
00829                   src_ibuf(rank)%vector(3*(l+mm)  ) = neighcells_3d(3,nn+mm,1)
00830 
00831                   idx = neighcells_3d(1,nn+mm,1)-cp%corner_shape(1,1)+1
00832 
00833                   do nc = 1, nbr_corners
00834                      dble_buf(rank)%vector(l+mm,nc,1) = cp%corners_dble(1)%vector((nc-1)*lenx+idx)
00835                      dble_buf(rank)%vector(l+mm,nc,2) = cp%corners_dble(2)%vector((nc-1)*lenx+idx)
00836 #ifdef DEBUGX
00837                      print *, ' send corners ', l+mm, dble_buf(rank)%vector(l+mm,nc,1), &
00838                                                       dble_buf(rank)%vector(l+mm,nc,2)
00839 #endif
00840                   end do
00841 #ifdef DEBUGX
00842                   print *,    ' local index', neighcells_3d(1,nn+mm,1)
00843 #endif
00844                enddo ! mm
00845 
00846                if (src_mask_available) then
00847 
00848                   do mm = 1, nbr_cells(tgt_idx)
00849                      log_buf(rank)%vector(l+mm) = &
00850                         mask_array(neighcells_3d(1,nn+mm,1)-mask_shape(1,1)+1)
00851                   enddo
00852 #ifdef DEBUGX
00853                   print *, ' send mask is ', log_buf(rank)%vector(l+1:l+nbr_cells(tgt_idx))
00854 #endif
00855                endif
00856 
00857                ! Erase target cell entries and related info which
00858                ! have been transfered to a remote source process
00859 #ifdef DEBUGX
00860                print *, ' Erase target cell index ', tgt_idx
00861                print *
00862 #endif /* DEBUGX */
00863                new_nbr_cells(tgt_idx) = 0
00864                send_info%nloc = send_info%nloc - 1
00865 
00866                ! Increment indices
00867 
00868                l = l + nbr_cells(tgt_idx)
00869 
00870             enddo ! len
00871 
00872          case DEFAULT
00873 
00874             ierrp (1) = search%search_data%grid_type
00875             ierror = PRISM_Error_Internal
00876             call psmile_error ( ierror, 'unsupported search grid type', &
00877                  ierrp, 1, __FILE__, __LINE__ )
00878             return
00879 
00880          end select
00881       endif ! nsend > 0
00882       !
00883       ! Reset requests and deallocate memory
00884       !
00885 !rr   paction%lrequest(2:3) = save_lreq(2:3)
00886 
00887       paction%lrequest = save_lreq
00888 
00889       if ( nsend > nreq ) then
00890          Deallocate (save_lreq, stat=ierror)
00891          if (ierror /= 0) then
00892             ierrp (1) = nreq
00893             ierror = PRISM_Error_Dealloc
00894             call psmile_error ( ierror, 'save_lreq', ierrp, 1, &
00895                  __FILE__, __LINE__ )
00896             return
00897          endif
00898       endif
00899       !
00900       ! Send data away to the designated parent source processes
00901       !
00902       do n = 1, nsend
00903          !
00904          ! Prepare header message to be send to destination source process.
00905          !
00906          ! (1) = interpolation type
00907          ! (2) = data type
00908          ! (3) = number of target cells to be treated
00909          ! (4) = number of source cell corners
00910          ! (5) = number of source cells
00911          !
00912          extra_msg(1) = interpolation_methods(1)
00913          extra_msg(2) = PRISM_DOUBLE_PRECISION
00914          extra_msg(3) = nbr_tgt_cells(n)
00915          extra_msg(4) = nbr_corners
00916          extra_msg(5) = nbr_src_cells(n)
00917          extra_msg(6) = mask_id
00918          !
00919          ! This should be sufficient for the later operations during the prism_put phase
00920          ! -> see psmile_put_field_dble and psmile_extract_indices_3d_dble
00921          !
00922          call psmile_get_info_index (Fields(var_id)%method_id, send_appl_index, &
00923                                      idx, ierror)
00924          if (ierror > 0) return
00925 
00926          extra_msg(7) = mp%send_infos_appl(idx)%msg_id
00927 
00928          mp%send_infos_appl(idx)%dest   = rank_list(n)
00929          mp%send_infos_appl(idx)%nloc   = nbr_src_cells(n)
00930 
00931          allocate(neigh_3d(3,nbr_src_cells(n)), stat=ierror)
00932          if (ierror /= 0) then
00933             ierrp (1) = 3*nbr_src_cells(n)
00934             ierror = PRISM_Error_Alloc
00935             call psmile_error ( ierror, 'neigh_3d', ierrp, 1, &
00936                  __FILE__, __LINE__ )
00937             return
00938          endif
00939 
00940          do m = 1, nbr_src_cells(n)
00941             neigh_3d(1,m) = src_ibuf(n)%vector(3*m-2)
00942             neigh_3d(2,m) = src_ibuf(n)%vector(3*m-1)
00943             neigh_3d(3,m) = src_ibuf(n)%vector(3*m  )
00944          enddo
00945 
00946          mp%send_infos_appl(idx)%neigh_3d => neigh_3d
00947 
00948          call psmile_store_send_info (var_id,                              &
00949                      search%msg_intersections%field_info%transient_out_id, &
00950                      PRISM_UNDEFINED, PRISM_UNDEFINED, idx, ierror)
00951          !
00952          ! Send header to destination sources
00953          !
00954 #ifdef DEBUG
00955          print *, ' Posting send request to psmile rank ', rank_list(n)
00956 #endif /* DEBUG */
00957          call psmile_bsend ( extra_msg, n_extra, MPI_INTEGER, &
00958               rank_list(n), celltag+3, comm_psmile, ierror )
00959 
00960         if (ierror /= MPI_SUCCESS) then
00961             ierrp (1) = ierror
00962             ierrp (2) = rank_list(n)
00963             ierrp (3) = celltag+3
00964 
00965             ierror = PRISM_Error_Send
00966 
00967             call psmile_error (ierror, 'psmile_bsend(msg)', &
00968                                ierrp, 3, __FILE__, __LINE__ )
00969             return
00970          endif
00971 
00972 #ifdef DEBUGX
00973          do m = 1, nbr_tgt_cells(n)
00974             print '(a16,i6,a13,i4)', ' send tgt index ',  tgt_ibuf(n)%vector(2*m-1), &
00975                                      ' # src cells ',     tgt_ibuf(n)%vector(2*m  )
00976          enddo
00977 #endif /* DEBUGX */
00978          call psmile_bsend ( tgt_ibuf(n)%vector, 2*nbr_tgt_cells(n), &
00979               MPI_INTEGER, rank_list(n), celltag+4, comm_psmile, ierror )
00980 
00981          call psmile_bsend ( dble_buf(n)%vector, 2*nbr_corners*nbr_src_cells(n), &
00982               MPI_DOUBLE_PRECISION, rank_list(n), celltag+6, comm_psmile, ierror )
00983 
00984          if ( src_mask_available ) &
00985          call psmile_bsend ( log_buf(n)%vector, nbr_src_cells(n), &
00986               MPI_LOGICAL, rank_list(n), celltag+7, comm_psmile, ierror )
00987          !
00988          ! Free send memory
00989          !
00990          Deallocate ( tgt_ibuf(n)%vector, src_ibuf(n)%vector, dble_buf(n)%vector, stat=ierror)
00991          if (ierror /= 0) then
00992             ierrp (1) = 2*nbr_tgt_cells(n)+(2*nbr_corners+1)*nbr_src_cells(n)
00993             ierror = PRISM_Error_Dealloc
00994             call psmile_error ( ierror, 'src/tgt buffer', ierrp, 1, &
00995                  __FILE__, __LINE__ )
00996             return
00997          endif
00998 
00999       enddo ! n = 1, nsend
01000 
01001       if ( nsend > 0 ) then
01002          Deallocate ( dble_buf, src_ibuf, nbr_tgt_cells, nbr_src_cells, rank_list, stat=ierror)
01003          if (ierror /= 0) then
01004             ierrp (1) = 5*nsend
01005             ierror = PRISM_Error_Dealloc
01006             call psmile_error ( ierror, 'src_ibuf', ierrp, 1, &
01007                  __FILE__, __LINE__ )
01008             return
01009          endif
01010       endif
01011 !
01012 !-------------------------------------------------------------------------
01013 ! 4.) Receive data from the designated child source processes
01014 !-------------------------------------------------------------------------
01015 !
01016       extra_search%nrecv = 0
01017 
01018       if ( nrecv > 0 ) then
01019 
01020          extra_search%nrecv = nrecv
01021 
01022          Allocate (rextra_msg(nrecv),              &
01023                    rextra_msg_buf(n_extra,nrecv),  &
01024                    extra_search%dble_bufs(nrecv),  &
01025                    extra_search%log_bufs(nrecv),   &
01026                    lrequest(nrecv),                &
01027                    send_info%len_sent(nrecv),      &
01028                    send_info%sender_global(nrecv), &
01029                    send_info%msg_id(nrecv),        &
01030                    stat=ierror)
01031          if (ierror /= 0) then
01032             ierrp (1) = (n_extra+4)*nrecv
01033             ierror = PRISM_Error_Alloc
01034             call psmile_error ( ierror, 'rextra_msg, lrequest, ... ', ierrp, 1, &
01035                  __FILE__, __LINE__ )
01036             return
01037          endif
01038 
01039          lrequest = MPI_REQUEST_NULL
01040 
01041          do n = 1, nrecv
01042 
01043             call MPI_Irecv ( rextra_msg_buf(:,n), n_extra, MPI_INTEGER, &
01044                  MPI_ANY_SOURCE, celltag+3, comm_psmile, lrequest(n), ierror )
01045 
01046             if ( ierror /= MPI_SUCCESS ) then
01047                ierrp (1) = ierror
01048                ierrp (2) = MPI_ANY_SOURCE
01049                ierrp (3) = celltag+3
01050 
01051                ierror = PRISM_Error_MPI
01052 
01053                call psmile_error ( ierror, 'MPI_Irecv', &
01054                     ierrp, 3, __FILE__, __LINE__ )
01055                return
01056             endif
01057 
01058          enddo
01059 
01060          do n = 1, nrecv
01061 
01062             call MPI_Waitany ( nrecv, lrequest, index, status, ierror )
01063 
01064             if ( ierror /= MPI_SUCCESS ) then
01065                ierrp (1) = ierror
01066                ierrp (2) = status (MPI_SOURCE)
01067                ierrp (3) = status (MPI_TAG)
01068 
01069                ierror = PRISM_Error_MPI
01070 
01071                call psmile_error ( ierror, 'MPI_Waitany', &
01072                     ierrp, 3, __FILE__, __LINE__ )
01073                return
01074             endif
01075 
01076             sender = status(MPI_SOURCE)
01077 #ifdef DEBUG
01078             print *, ' Request index ', index , ' received from rank ', sender
01079 #endif /* DEBUG */
01080 
01081             rextra_msg(index)%interp_type     = rextra_msg_buf(1,index)
01082             rextra_msg(index)%data_type       = rextra_msg_buf(2,index)
01083             rextra_msg(index)%num_tgt_cells   = rextra_msg_buf(3,index)
01084             rextra_msg(index)%num_src_corners = rextra_msg_buf(4,index)
01085             rextra_msg(index)%num_src_cells   = rextra_msg_buf(5,index)
01086             rextra_msg(index)%mask_id         = rextra_msg_buf(6,index)
01087             rextra_msg(index)%msg_id          = rextra_msg_buf(7,index)
01088 
01089             len = 2 * rextra_msg(index)%num_tgt_cells
01090 
01091             Allocate ( tgt_ibuf(index)%vector(len), stat = ierror)
01092             if (ierror /= 0) then
01093                ierrp (1) = len
01094                ierror = PRISM_Error_Alloc
01095                call psmile_error ( ierror, 'tgt_ibuf%vector', ierrp, 1, &
01096                     __FILE__, __LINE__ )
01097                return
01098             endif
01099 
01100             ! Receive transferred target cell indices
01101 
01102             call MPI_Recv ( tgt_ibuf(index)%vector, len, MPI_INTEGER, sender, &
01103                  celltag+4, comm_psmile, status, ierror )
01104 #ifdef DEBUGX
01105             do m = 1, rextra_msg(index)%num_tgt_cells
01106                print '(a16,i6,a13,i4)', &
01107                      ' recv tgt index ', tgt_ibuf(index)%vector(2*m-1), &
01108                      ' # src cells ',    tgt_ibuf(index)%vector(2*m)
01109             enddo
01110 #endif /* DEBUGX */
01111 
01112             len = 2 * rextra_msg(index)%num_src_corners * &
01113                       rextra_msg(index)%num_src_cells
01114 
01115             Allocate ( buf(len), stat=ierror)
01116             if (ierror /= 0) then
01117                ierrp (1) = len
01118                ierror = PRISM_Error_Alloc
01119                call psmile_error ( ierror, 'buf', ierrp, 1, &
01120                     __FILE__, __LINE__ )
01121                return
01122             endif
01123 
01124             ! Receive extra source cell coordinates
01125 
01126             call MPI_Recv ( buf, len, MPI_DOUBLE_PRECISION, &
01127                  sender, celltag+6, comm_psmile, status, ierror )
01128 
01129             extra_search%dble_bufs(index)%vector => buf
01130 
01131 #ifdef DEBUGX
01132             stride = rextra_msg(index)%num_src_cells * &
01133                      rextra_msg(index)%num_src_corners
01134 
01135             m = 1
01136             do i = 1, rextra_msg(index)%num_tgt_cells
01137                do nn = 1, tgt_ibuf(index)%vector(2*i)
01138                   do nc = 1, rextra_msg(index)%num_src_corners
01139                      idx1 = (nc-1)*rextra_msg(index)%num_src_cells+m
01140                      idx2 = stride + idx1
01141                      print *, ' receive corners ', m, 'for tgt',   &
01142                        tgt_ibuf(index)%vector(2*i-1),              &
01143                        extra_search%dble_bufs(index)%vector(idx1), &
01144                        extra_search%dble_bufs(index)%vector(idx2)
01145                   end do
01146                   m = m + 1
01147                enddo ! nn
01148             enddo ! i
01149 #endif /* DEBUGX */
01150 
01151             ! Receive extra mask values
01152 
01153             if ( rextra_msg(index)%mask_id /= PRISM_UNDEFINED ) then
01154 
01155                len = rextra_msg(index)%num_src_cells
01156                Allocate ( lbuf(len), stat=ierror)
01157                if (ierror /= 0) then
01158                   ierrp (1) = len
01159                   ierror = PRISM_Error_Alloc
01160                   call psmile_error ( ierror, 'buf', ierrp, 1, &
01161                        __FILE__, __LINE__ )
01162                   return
01163                endif
01164 #ifdef DEBUGX
01165                print *, ' receive extra mask values '
01166 #endif
01167                call MPI_Recv ( lbuf, len, MPI_LOGICAL, &
01168                       sender, celltag+7, comm_psmile, status, ierror )
01169 
01170 #ifdef DEBUGX
01171                print *, ' received extra mask values '
01172 #endif
01173                extra_search%log_bufs(index)%vector => lbuf
01174 
01175             endif
01176             !
01177             ! For collecting the data during the prism_put phase
01178             !
01179             send_info%nrecv                = send_info%nrecv + 1
01180             send_info%num2recv             = send_info%num2recv + &
01181                                              rextra_msg(index)%num_src_cells
01182             send_info%sender_global(index) = sender
01183             send_info%len_sent(index)      = rextra_msg(index)%num_src_cells
01184             send_info%msg_id(index)        = rextra_msg(index)%msg_id
01185 
01186          enddo ! loop over len pending Irecv requests
01187 !
01188 !-------------------------------------------------------------------------
01189 ! 5.) Add new cells to neighcells_3d array and/or remote cell that are
01190 !     sent to other source processes
01191 !-------------------------------------------------------------------------
01192 !
01193          len = sum(rextra_msg(:)%num_src_cells)
01194 
01195          ! tgt_list  = position of a source cell (received from a remote process)
01196          !             in the extra_search%dble_bufs(index)%vector array
01197          ! tgt_index = local tgt cell associated to the respective cell in tgt_list
01198          Allocate(tgt_list(len), tgt_index(len), stat=ierror)
01199          if (ierror /= 0) then
01200             ierrp (1) = 2 * len
01201             ierror = PRISM_Error_Alloc
01202             call psmile_error ( ierror, 'tgt_list, ... ', ierrp, 1, &
01203                  __FILE__, __LINE__ )
01204             return
01205          endif
01206 
01207          nn = 1
01208 
01209          do n = 1, nrecv
01210             mm = 1
01211             do m = 1, rextra_msg(n)%num_tgt_cells
01212                do l = 1, tgt_ibuf(n)%vector(2*m)
01213                   tgt_list (nn) = tgt_ibuf(n)%vector(2*m-1)
01214                   tgt_index(nn) = nn
01215                   nn = nn + 1
01216                   mm = mm + 1
01217                enddo
01218             enddo
01219          enddo
01220 
01221          call psmile_quicksort_index ( tgt_list, len, tgt_index )
01222 
01223       endif ! nrecv > 0
01224 
01225       ! if we are going to receive and/or send points adjust neighcells_3d array
01226       if ( nrecv > 0 .or. nsend > 0 ) then
01227 
01228          ! new length is old size - cells send to other source processes + 
01229          ! new source cells from the remote source processes
01230          len = sum(new_nbr_cells(:))
01231          if (nrecv > 0) len = len + sum(rextra_msg(:)%num_src_cells)
01232 
01233          if ( len > 0 ) then
01234 
01235             Allocate(new_neighcells_3d(ndim_3d,len,1), stat=ierror)
01236             if (ierror /= 0) then
01237                ierrp (1) = ndim_3d*len
01238                ierror = PRISM_Error_Alloc
01239                call psmile_error ( ierror, 'new_neighcells_3d', ierrp, 1, &
01240                     __FILE__, __LINE__ )
01241                return
01242             endif
01243 
01244             l  = 1
01245             mm = 0
01246             nn = 0
01247 
01248             if ( nrecv > 0 ) then
01249                 len = sum(rextra_msg(:)%num_src_cells)
01250             else
01251                 len = 0
01252             endif
01253 
01254             do n = 1, ncpl
01255 
01256                ! copy original values to new neighcells_3d array
01257                do i = 1, new_nbr_cells(n)
01258                   new_neighcells_3d(1,mm+i,1) = neighcells_3d(1,nn+i,1)
01259                   new_neighcells_3d(2,mm+i,1) = neighcells_3d(2,nn+i,1)
01260                   new_neighcells_3d(3,mm+i,1) = neighcells_3d(3,nn+i,1)
01261                enddo
01262 
01263                nn = nn + nbr_cells(n)
01264                mm = mm + new_nbr_cells(n)
01265 
01266                ! if there are no new source cell from other remote source processes left
01267                if ( l > len ) cycle
01268 
01269                ! copy the new source cell indices from the remote process
01270                ! into the new neighcells_3d array
01271                do while ( n == tgt_list(l) )
01272                   !
01273                   ! as the extra cells are stored in a simple list at the
01274                   ! end, only one index is required, stored in place 2.
01275                   ! place 1 is used as identifier (extra_search%global_marker), place 3
01276                   ! is not used.
01277                   !
01278                   mm = mm + 1
01279                   new_nbr_cells(n) = new_nbr_cells(n) + 1
01280                   new_neighcells_3d(1,mm,1) = extra_search%global_marker
01281                   new_neighcells_3d(2,mm,1) = tgt_index(l)
01282                   new_neighcells_3d(3,mm,1) = PSMILe_Undef
01283 
01284                   l = l + 1
01285 
01286                   if ( l > len ) exit
01287 
01288                enddo ! while ( n == tgt_list(l) )
01289             enddo ! n = 1, ncpl
01290 
01291             Deallocate(neighcells_3d, stat=ierror)
01292             if (ierror /= 0) then
01293                ierrp (1) = ndim_3d*sum(nbr_cells(:))
01294                ierror = PRISM_Error_Alloc
01295                call psmile_error ( ierror, 'neighcells_3d', ierrp, 1, &
01296                     __FILE__, __LINE__ )
01297                return
01298             endif
01299 
01300             neighcells_3d => new_neighcells_3d
01301 
01302          else
01303 
01304             Nullify(neighcells_3d)
01305 
01306          endif ! ( len > 0 )
01307 
01308       endif ! ( nrecv > 0 .or. nsend > 0 )
01309       !
01310       ! Finally set nbr_cells to new values
01311       !
01312       nbr_cells(:) = new_nbr_cells(:)
01313 
01314 !
01315 !-------------------------------------------------------------------------
01316 ! 6.) Free memory
01317 !-------------------------------------------------------------------------
01318 !
01319       if ( max(nsend,nrecv) > 0 ) then
01320 
01321          ! In a loop over nsend the tgt_ibuf vector
01322          ! was already deallocated above.
01323 
01324          do n = 1, nrecv
01325             Deallocate (tgt_ibuf(n)%vector, stat=ierror)
01326             if (ierror /= 0) then
01327                ierrp (1) = PSMILe_Undef
01328                ierror = PRISM_Error_Dealloc
01329                call psmile_error ( ierror, 'tgt_ibuf', ierrp, 1, &
01330                     __FILE__, __LINE__ )
01331                return
01332             endif
01333          enddo
01334 
01335          Deallocate (tgt_ibuf, stat=ierror)
01336          if (ierror /= 0) then
01337             ierrp (1) =  max(nsend,nrecv)
01338             ierror = PRISM_Error_Dealloc
01339             call psmile_error ( ierror, 'tgt_ibuf', ierrp, 1, &
01340                  __FILE__, __LINE__ )
01341             return
01342          endif
01343 
01344       endif
01345 
01346       if ( nrecv > 0 ) then
01347          Deallocate(tgt_list, tgt_index, stat=ierror)
01348          if (ierror /= 0) then
01349             ierrp (1) = 2*sum(rextra_msg(:)%num_src_cells) + sum(rextra_msg(:)%num_tgt_cells)
01350             ierror = PRISM_Error_Alloc
01351             call psmile_error ( ierror, 'tgt_list, ... ', ierrp, 1, &
01352                  __FILE__, __LINE__ )
01353             return
01354          endif
01355 
01356          Deallocate(rextra_msg, rextra_msg_buf, lrequest, stat=ierror)
01357          if (ierror /= 0) then
01358             ierrp (1) = (n_extra+1)*nrecv+n_extra
01359             ierror = PRISM_Error_Alloc
01360             call psmile_error ( ierror, 'rextra_msg, ... ', ierrp, 1, &
01361                  __FILE__, __LINE__ )
01362             return
01363          endif
01364       endif
01365 !
01366 !-------------------------------------------------------------------------
01367 ! 7.) All done
01368 !-------------------------------------------------------------------------
01369 !
01370 #ifdef VERBOSE
01371       print 9980, trim(ch_id), ierror
01372 
01373       call psmile_flushstd
01374 #endif /* VERBOSE */
01375 
01376 !
01377 !  Formats:
01378 !
01379 
01380 #ifdef VERBOSE
01381 
01382 9990 format (1x, a, ': psmile_global_search_cell_dble: grid_id', i3, &
01383                     ' to ', i3, '(', i2, ')')
01384 9980 format (1x, a, ': psmile_global_search_cell_dble: eof ierror =', i3)
01385 
01386 #endif /* VERBOSE */
01387 
01388       contains
01389       
01390          subroutine insertsort_boundary_cells(boundary_cells, num_cells)
01391 
01392             implicit none
01393 
01394             integer, intent(in)    :: num_cells
01395             integer, intent(inout) :: boundary_cells(num_cells, 5)
01396 
01397             integer :: i, j, tmp_cell(5)
01398 
01399             do i = 2, num_cells
01400                tmp_cell(:) = boundary_cells(i, :)
01401                j = i
01402                do while (tmp_cell(4) <  boundary_cells(j-1, 4) .or.  &
01403                          tmp_cell(4) == boundary_cells(j-1, 4) .and. &
01404                          tmp_cell(3) <  boundary_cells(j-1, 3) .or.  &
01405                          tmp_cell(4) == boundary_cells(j-1, 4) .and. &
01406                          tmp_cell(3) == boundary_cells(j-1, 3) .and. &
01407                          tmp_cell(2) <  boundary_cells(j-1, 2))
01408 
01409                   boundary_cells(j, :) = boundary_cells(j-1, :)
01410                   j = j-1
01411                   if (j == 1) exit
01412                enddo
01413 
01414                boundary_cells(j, :) = tmp_cell
01415             enddo ! i
01416          end subroutine insertsort_boundary_cells
01417 
01418       end subroutine PSMILe_global_search_cell_dble

Generated on 1 Dec 2011 for Oasis4 by  doxygen 1.6.1