00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 subroutine psmile_global_search_cell_real ( grid_id, var_id, &
00011 comp_info, send_info, search, extra_search, ncpl, &
00012 nbr_cells, n_intmethods, interpolation_methods, &
00013 interpolation_search, ierror )
00014
00015
00016
00017 use PRISM
00018
00019 use PSMILe, dummy_interface => PSMILe_global_search_cell_real
00020
00021 implicit none
00022
00023
00024
00025 Integer, Intent(In) :: grid_id
00026
00027
00028
00029 Integer, Intent(In) :: var_id
00030
00031
00032
00033 Type (Enddef_comp), Intent (In) :: comp_info
00034
00035
00036
00037
00038 Type (Send_information), Intent (InOut) :: send_info
00039
00040
00041
00042 Type (Enddef_search), Intent (InOut) :: search
00043
00044
00045
00046 Type (Extra_search_info), Intent (InOut) :: extra_search
00047
00048
00049
00050 Integer, Intent (In) :: ncpl
00051
00052
00053
00054 Integer, Intent (InOut) :: nbr_cells(ncpl)
00055
00056
00057
00058 Integer, Intent (In) :: n_intmethods
00059
00060
00061
00062
00063 Integer, Intent (In) :: interpolation_methods (n_intmethods)
00064
00065
00066
00067 Logical, Intent (In) :: interpolation_search (n_intmethods)
00068
00069
00070
00071
00072
00073 Integer, Intent (Out) :: ierror
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083 type msg_extra_point_exchange
00084 integer :: interp_type,
00085 data_type,
00086 num_tgt_cells,
00087 num_src_corners,
00088 num_src_cells,
00089 mask_id,
00090 msg_id
00091 endtype
00092
00093 Integer, Parameter :: n_extra = 7
00094 Integer, Parameter :: send_appl_index = 5
00095
00096 Logical, Pointer :: lbuf (:)
00097 Real, Pointer :: buf (:)
00098
00099 Type (real_vector_3d), Pointer :: real_buf(:)
00100 Type (integer_vector), Allocatable :: tgt_ibuf(:)
00101 Type (integer_vector), Allocatable :: src_ibuf(:)
00102 Type (logical_vector), Allocatable :: log_buf(:)
00103
00104 Type (Corner_Block), Pointer :: cp
00105 Type (Method), Pointer :: mp
00106
00107 Integer, Pointer :: new_neighcells_3d(:,:,:)
00108 Integer, Pointer :: neigh_3d(:,:)
00109 Logical, Pointer :: mask_array(:)
00110 Integer :: mask_shape (2, ndim_3d)
00111
00112 Type(msg_extra_point_exchange), Allocatable :: rextra_msg (:)
00113 Integer, Allocatable :: rextra_msg_buf(:,:)
00114 Integer, Allocatable :: nbr_tgt_cells(:)
00115 Integer, Allocatable :: nbr_src_cells(:)
00116 Integer, Allocatable :: rank_list(:)
00117 Integer, Allocatable :: cell_trans(:,:)
00118 Integer, Allocatable :: save_lreq(:)
00119 Integer, Allocatable :: lrequest(:)
00120 Integer, Allocatable :: tgt_list(:), tgt_index(:)
00121
00122 Logical :: src_mask_available
00123
00124 Integer :: tgt_idx
00125 Integer :: new_nbr_cells(ncpl)
00126 Integer :: mask_id
00127 Integer :: nbr_corners
00128 Type (enddef_msg_locations) :: msg_locations
00129 Integer :: msgloc (msgloc_size)
00130 Integer :: extra_msg (n_extra)
00131 Integer :: len
00132 Integer :: lenx, leny, lenxy, m_lenx
00133 Integer :: rank, idx, idx1, idx2, stride
00134 Integer :: nbr_procs(2)
00135 Integer :: interp
00136 Integer :: dest
00137 Integer :: sender
00138 Integer :: status(MPI_STATUS_SIZE)
00139 Integer :: nreq
00140 Integer :: recv_req
00141 Integer :: nrecv, nsend
00142 Integer :: index
00143 Integer :: nc
00144 Integer :: l, i
00145 Integer :: m, mm
00146 Integer :: n, nn
00147
00148
00149
00150 Integer, Parameter :: nerrp = 3
00151 Integer :: ierrp (nerrp)
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174
00175
00176
00177
00178
00179
00180
00181
00182
00183 Character(len=len_cvs_string), save :: mycvs =
00184 '$Id: psmile_global_search_cell_real.F90 3174 2011-05-04 09:41:43Z hanke $'
00185
00186
00187
00188
00189
00190 #ifdef VERBOSE
00191 print 9990, trim(ch_id), grid_id, search%msg_intersections%field_info%tgt_method_id, &
00192 search%sender
00193
00194 call psmile_flushstd
00195 #endif /* VERBOSE */
00196
00197 ierror = 0
00198
00199 mask_id = Fields(var_id)%mask_id
00200
00201 cp => Grids(grid_id)%corner_pointer
00202 mp => Methods(Fields(var_id)%method_id)
00203 nreq = paction%nreq
00204
00205 src_mask_available = mask_id /= PRISM_UNDEFINED
00206
00207 new_nbr_cells(:) = nbr_cells(:)
00208 send_info%nloc = ncpl
00209
00210 if (src_mask_available) then
00211 #ifdef DEBUG
00212 print *, ' Source mask is available.'
00213 #endif /* DEBUG */
00214 mask_array => Masks(mask_id)%mask_array
00215 mask_shape = Masks(mask_id)%mask_shape
00216 endif
00217
00218 if ( Grids(grid_id)%grid_type == PRISM_Irrlonlat_regvrt ) then
00219
00220 nbr_corners = cp%nbr_corners/2
00221
00222 else if ( Grids(grid_id)%grid_type == PRISM_Reglonlatvrt ) then
00223
00224 nbr_corners = 2
00225
00226 else if ( Grids(grid_id)%grid_type == PRISM_Gaussreduced_regvrt ) then
00227
00228 nbr_corners = 2
00229
00230 else
00231
00232 ierrp (1) = search%search_data%grid_type
00233 ierror = PRISM_Error_Internal
00234 call psmile_error ( ierror, 'unsupported search grid type', &
00235 ierrp, 1, __FILE__, __LINE__ )
00236 return
00237
00238 endif
00239
00240 Allocate(save_lreq(nreq), stat=ierror)
00241 if (ierror /= 0) then
00242 ierrp (1) = nreq
00243 ierror = PRISM_Error_Alloc
00244 call psmile_error ( ierror, 'save_lreq', ierrp, 1, &
00245 __FILE__, __LINE__ )
00246 return
00247 endif
00248
00249 #ifdef PRISM_ASSERTION
00250 if (interpolation_methods(1) /= PSMILe_conserv2D ) then
00251 call psmile_assert (__FILE__, __LINE__, &
00252 "No global search for this interpolation method")
00253 endif
00254 if ( n_intmethods > 1 ) then
00255 call psmile_assert (__FILE__, __LINE__, &
00256 "2nd (vertical) interpolation method not yet supported.")
00257 endif
00258 #endif /* PRISM_ASSERTION */
00259 if (count(interpolation_search(:)) > 1) then
00260 ierrp (1) = count(interpolation_search(:))
00261
00262 ierror = PRISM_Error_Internal
00263
00264 call psmile_error ( ierror, 'Global search supported only for one interpolation method', &
00265 ierrp, 1, __FILE__, __LINE__ )
00266
00267 return
00268 endif
00269
00270 do interp = 1, n_intmethods
00271 if (interpolation_search(interp) ) exit
00272 end do
00273
00274 if (interp > n_intmethods) then
00275 ierror = PRISM_Error_Internal
00276 ierrp (1) = n_intmethods
00277
00278 call psmile_error ( ierror, 'No Global search for interpolation methods', &
00279 ierrp, 1, __FILE__, __LINE__ )
00280 return
00281 endif
00282
00283 len = 0
00284
00285 if ( associated(search%boundary_cell) ) then
00286
00287 len = size(search%boundary_cell(:,1))
00288
00289 Allocate(cell_trans(len,3), stat=ierror)
00290
00291 if (ierror /= 0) then
00292 ierrp (1) = 3*len
00293 ierror = PRISM_Error_Alloc
00294 call psmile_error ( ierror, 'cell_trans', ierrp, 1, &
00295 __FILE__, __LINE__ )
00296 return
00297 endif
00298
00299 endif
00300
00301
00302
00303
00304
00305
00306
00307
00308
00309
00310
00311
00312
00313 call psmile_init_enddef_msg_locs (msg_locations)
00314
00315 msg_locations%requires_conserv_remap = interpolation_methods (interp)
00316 msg_locations%src_rank = psmile_rank
00317 msg_locations%msg_len = len
00318 msg_locations%transi_in_id = search%msg_intersections%field_info%transient_in_id
00319 msg_locations%transi_out_id = search%msg_intersections%field_info%transient_out_id
00320
00321 call psmile_pack_msg_locations (msg_locations, msgloc)
00322
00323 dest = search%sender
00324
00325 #ifdef DEBUG
00326 print *, ' Bsend tag ', loctag+search%msg_intersections%relative_msg_tag, ' to ', dest
00327 #endif /* DEBUG */
00328
00329 call psmile_bsend ( msgloc, msgloc_size, MPI_INTEGER, dest, &
00330 loctag+search%msg_intersections%relative_msg_tag, &
00331 comm_psmile, ierror )
00332
00333 if ( ierror /= MPI_SUCCESS ) then
00334 ierrp (1) = ierror
00335 ierrp (2) = dest
00336 ierrp (3) = loctag+search%msg_intersections%relative_msg_tag
00337 ierror = PRISM_Error_Send
00338 call psmile_error ( ierror, 'psmile_bsend(msg)', &
00339 ierrp, 3, __FILE__, __LINE__ )
00340 return
00341 endif
00342
00343
00344
00345
00346
00347 if ( len > 0 ) then
00348
00349 call insertsort_boundary_cells(search%boundary_cell, len)
00350
00351 call psmile_bsend ( search%boundary_cell, 5*len, MPI_INTEGER, &
00352 dest, celltag, comm_psmile, ierror )
00353 endif
00354
00355 if ( ierror /= MPI_SUCCESS ) then
00356 ierrp (1) = ierror
00357 ierrp (2) = dest
00358 ierrp (3) = celltag
00359 ierror = PRISM_Error_Send
00360 call psmile_error ( ierror, 'psmile_bsend(msg)', &
00361 ierrp, 3, __FILE__, __LINE__ )
00362 return
00363 endif
00364
00365
00366
00367
00368 call MPI_Irecv ( nbr_procs, 2, MPI_INTEGER, &
00369 dest, celltag+1, comm_psmile, recv_req, ierror )
00370
00371 if ( ierror /= MPI_SUCCESS ) then
00372 ierrp (1) = ierror
00373 ierrp (2) = dest
00374 ierrp (3) = celltag+1
00375 ierror = PRISM_Error_Recv
00376 call psmile_error ( ierror, 'MPI_Recv', &
00377 ierrp, 3, __FILE__, __LINE__ )
00378 return
00379 endif
00380
00381 #ifdef DEBUG
00382 print *, ' Posted Irecv request ', recv_req, ' from ', dest
00383 #endif /* DEBUG */
00384
00385 save_lreq = paction%lrequest
00386
00387
00388
00389
00390
00391
00392
00393
00394 paction%lrequest = MPI_REQUEST_NULL
00395
00396 paction%lrequest (3) = recv_req
00397
00398
00399
00400
00401
00402 index = 0
00403
00404
00405 do while ( index /= 3)
00406
00407 #ifdef DEBUG
00408 do n = 1, nreq
00409 print *, ' paction%lrequest ', n, paction%lrequest(n)
00410 enddo
00411 #endif /* DEBUG */
00412
00413 call MPI_Waitany ( nreq, paction%lrequest, index, status, ierror )
00414
00415 #ifdef DEBUG
00416 print *, ' Processing with index ', index, ' from ', status(MPI_SOURCE)
00417 #endif /* DEBUG */
00418
00419 if ( index /= 3 ) then
00420
00421 call psmile_enddef_action (search, index, status, ierror )
00422
00423 endif
00424
00425 enddo
00426
00427 paction%lrequest(3) = MPI_REQUEST_NULL
00428
00429 nsend = nbr_procs(1)
00430 nrecv = nbr_procs(2)
00431
00432 #ifdef DEBUG
00433 print *, ' This rank has to serve ', nsend, ' source grid processes.'
00434 print *, ' This rank has to receive from ', nrecv, ' source grid processes.'
00435 #endif /* DEBUG */
00436
00437 if ( max(nsend,nrecv) > 0 ) then
00438 Allocate ( tgt_ibuf (max(nsend,nrecv)), stat=ierror)
00439 if (ierror /= 0) then
00440 ierrp (1) = max(nsend,nrecv)
00441 ierror = PRISM_Error_Alloc
00442 call psmile_error ( ierror, 'tgt_ibuf', ierrp, 1, &
00443 __FILE__, __LINE__ )
00444 return
00445 endif
00446 endif
00447
00448 if ( nsend > 0 ) then
00449 Allocate(real_buf (nsend), &
00450 log_buf (nsend), &
00451 src_ibuf (nsend), &
00452 nbr_tgt_cells (nsend), &
00453 nbr_src_cells (nsend), &
00454 rank_list (nsend), stat=ierror)
00455 if (ierror /= 0) then
00456 ierrp (1) = 6*nsend
00457 ierror = PRISM_Error_Alloc
00458 call psmile_error ( ierror, 'nsend buffer', ierrp, 1, &
00459 __FILE__, __LINE__ )
00460 return
00461 endif
00462
00463 nbr_tgt_cells(:) = 0
00464 nbr_src_cells(:) = 0
00465 rank_list(:) = -1
00466 endif
00467
00468
00469
00470
00471
00472
00473
00474
00475
00476
00477
00478
00479
00480
00481 #ifdef DEBUGX
00482 print *, ' Receiving data ', len, ' records from ', dest
00483 if ( len == 0 ) &
00484 print *, ' Receive canceled because of zero message '
00485 #endif /* DEBUGX */
00486
00487 if ( len > 0 ) then
00488 call MPI_Recv ( cell_trans, 3*len, MPI_INTEGER, &
00489 dest, celltag+2, comm_psmile, status, ierror )
00490
00491 if ( ierror /= MPI_SUCCESS ) then
00492 ierrp (1) = ierror
00493 ierrp (2) = dest
00494 ierrp (3) = celltag+2
00495 ierror = PRISM_Error_Recv
00496 call psmile_error ( ierror, 'MPI_Recv', &
00497 ierrp, 3, __FILE__, __LINE__ )
00498 return
00499 endif
00500 endif
00501
00502
00503
00504
00505 if ( nsend > 0 ) then
00506
00507 #ifdef DEBUGX
00508 do n = 1, len
00509
00510 print 9970, trim(ch_id), n, cell_trans(n,1), &
00511 ' -> ', cell_trans(n,2), cell_trans(n,3)
00512
00513 enddo
00514 9970 format (1x, a, ': ', i6, ' Transfer index ', i6, a4, i3, i6)
00515 #endif /* DEBUGX */
00516
00517 nn = 1
00518 m = 0
00519
00520 rank_list(nn) = cell_trans(1,2)
00521
00522 do n = 1, len
00523 if ( n > 1 ) then
00524 if ( cell_trans(n,2) > cell_trans(n-1,2) ) then
00525 nn = nn + 1
00526 rank_list(nn) = cell_trans(n,2)
00527 endif
00528 endif
00529 if ( cell_trans(n,2) == -1 ) exit
00530
00531 tgt_idx = cell_trans(n,1)
00532 nbr_tgt_cells(nn) = nbr_tgt_cells(nn) + 1
00533 nbr_src_cells(nn) = nbr_src_cells(nn) + nbr_cells(tgt_idx)
00534 enddo
00535 #ifdef DEBUG
00536 do n = 1, nsend
00537 print *, nbr_tgt_cells(n), ' target cell info to transfer.'
00538 print *, nbr_src_cells(n), ' source cells have to be transferred to rank ', rank_list(n)
00539 enddo
00540 #endif /* DEBUG */
00541
00542
00543
00544
00545
00546
00547
00548
00549
00550 do n = 1, nsend
00551 Allocate ( tgt_ibuf(n)%vector(nbr_tgt_cells(n)*2), &
00552 src_ibuf(n)%vector(nbr_src_cells(n)*3), &
00553 log_buf(n)%vector(nbr_src_cells(n)), &
00554 real_buf(n)%vector(nbr_src_cells(n),nbr_corners,2), stat=ierror )
00555 if (ierror /= 0) then
00556 ierrp (1) = 2*nbr_tgt_cells(n) + (2*nbr_corners+4)*nbr_src_cells(n)
00557 ierror = PRISM_Error_Alloc
00558 call psmile_error ( ierror, 'src/tgt_buf(n)%ibuf/real_buf ', &
00559 ierrp, 1, __FILE__, __LINE__ )
00560 return
00561 endif
00562 enddo
00563
00564
00565
00566 nn = 1
00567 m = 0
00568
00569 do n = 1, len
00570
00571 if ( n > 1 ) then
00572 if ( cell_trans(n,2) > cell_trans(n-1,2) ) then
00573 nn = nn + 1
00574 m = 0
00575 endif
00576 endif
00577
00578 if ( cell_trans(n,2) == -1 ) exit
00579
00580 tgt_idx = cell_trans(n,1)
00581 m = m+1
00582
00583 tgt_ibuf(nn)%vector(2*m-1) = cell_trans(n,3)
00584 tgt_ibuf(nn)%vector(2*m ) = nbr_cells(tgt_idx)
00585
00586 enddo
00587
00588
00589
00590
00591
00592 select case (Grids(grid_id)%grid_type)
00593
00594 case (PRISM_Irrlonlat_regvrt)
00595
00596
00597
00598
00599
00600 lenx = (cp%corner_shape(2,1)-cp%corner_shape(1,1)+1)
00601 lenxy = (cp%corner_shape(2,2)-cp%corner_shape(1,2)+1) * lenx
00602
00603 if (src_mask_available) then
00604 m_lenx = (mask_shape(2,1)-mask_shape(1,1)+1)
00605 #ifdef PRISM_ASSERTION
00606 if ( cp%corner_shape(1,2) /= mask_shape(1,2) .or. &
00607 cp%corner_shape(1,1) /= mask_shape(1,1) ) then
00608 call psmile_assert (__FILE__, __LINE__, &
00609 "Mask and corners have to be of the same shape.")
00610 endif
00611 if ( mask_shape(2,3) - mask_shape(1,3) + 1 > 1 ) then
00612 call psmile_assert (__FILE__, __LINE__, &
00613 "3d masks not yet supported.")
00614 endif
00615 #endif /* PRISM_ASSERTION */
00616 endif
00617
00618 l = 0
00619 rank = 1
00620
00621 do n = 1, len
00622
00623 if ( n > 1 ) then
00624 if ( cell_trans(n,2) > cell_trans(n-1,2) ) then
00625 rank = rank + 1
00626 l = 0
00627 endif
00628 endif
00629 if ( cell_trans(n,2) == -1 ) exit
00630
00631 tgt_idx = cell_trans(n,1)
00632 #ifdef DEBUGX
00633 print *, ' Corners for local target index ', tgt_idx
00634 #endif /* DEBUGX */
00635 nn = sum(nbr_cells(1:tgt_idx-1))
00636
00637 do mm = 1, nbr_cells(tgt_idx)
00638
00639 idx = (neighcells_3d(2,nn+mm,1)-cp%corner_shape(1,2))*lenx + &
00640 neighcells_3d(1,nn+mm,1)-cp%corner_shape(1,1)+1
00641
00642 src_ibuf(rank)%vector(3*(l+mm)-2) = neighcells_3d(1,nn+mm,1)
00643 src_ibuf(rank)%vector(3*(l+mm)-1) = neighcells_3d(2,nn+mm,1)
00644 src_ibuf(rank)%vector(3*(l+mm) ) = neighcells_3d(3,nn+mm,1)
00645
00646 do nc = 1, nbr_corners
00647 real_buf(rank)%vector(l+mm,nc,1) = cp%corners_real(1)%vector((nc-1)*lenxy+idx)
00648 real_buf(rank)%vector(l+mm,nc,2) = cp%corners_real(2)%vector((nc-1)*lenxy+idx)
00649 #ifdef DEBUGX
00650 print *, ' send corners ', l+mm, real_buf(rank)%vector(l+mm,nc,1), &
00651 real_buf(rank)%vector(l+mm,nc,2)
00652 end do
00653
00654 print *, ' 2d indices ', neighcells_3d(1,nn+mm,1), &
00655 neighcells_3d(2,nn+mm,1)
00656 #else
00657 end do
00658 #endif /* DEBUGX */
00659 enddo
00660
00661 if (src_mask_available) then
00662
00663 do mm = 1, nbr_cells(tgt_idx)
00664 log_buf(rank)%vector(l+mm) = &
00665 mask_array((neighcells_3d(2,nn+mm,1)-mask_shape(1,2))*m_lenx + &
00666 neighcells_3d(1,nn+mm,1)-mask_shape(1,1)+1)
00667 enddo
00668 #ifdef DEBUGX
00669 print *, ' send mask is ', log_buf(rank)%vector(l+1:l+nbr_cells(tgt_idx))
00670 #endif
00671 endif
00672
00673
00674
00675 #ifdef DEBUGX
00676 print *, ' Erase target cell index ', tgt_idx
00677 print *
00678 #endif /* DEBUGX */
00679 new_nbr_cells(tgt_idx) = 0
00680 send_info%nloc = send_info%nloc - 1
00681
00682
00683
00684 l = l + nbr_cells(tgt_idx)
00685
00686 enddo
00687
00688 case (PRISM_Reglonlatvrt)
00689
00690
00691
00692
00693
00694 lenx = (cp%corner_shape(2,1)-cp%corner_shape(1,1)+1)
00695 leny = (cp%corner_shape(2,2)-cp%corner_shape(1,2)+1)
00696
00697 if (src_mask_available) then
00698 m_lenx = (mask_shape(2,1)-mask_shape(1,1)+1)
00699 #ifdef PRISM_ASSERTION
00700 if ( cp%corner_shape(1,2) /= mask_shape(1,2) .or. &
00701 cp%corner_shape(1,1) /= mask_shape(1,1) ) then
00702 call psmile_assert (__FILE__, __LINE__, &
00703 "Mask and corners have to be of the same shape.")
00704 endif
00705 if ( mask_shape(2,3) - mask_shape(1,3) + 1 > 1 ) then
00706 call psmile_assert (__FILE__, __LINE__, &
00707 "3d masks not yet supported.")
00708 endif
00709 #endif /* PRISM_ASSERTION */
00710 endif
00711
00712 l = 0
00713 rank = 1
00714
00715 do n = 1, len
00716
00717 if ( n > 1 ) then
00718 if ( cell_trans(n,2) > cell_trans(n-1,2) ) then
00719 rank = rank + 1
00720 l = 0
00721 endif
00722 endif
00723 if ( cell_trans(n,2) == -1 ) exit
00724
00725 tgt_idx = cell_trans(n,1)
00726 #ifdef DEBUGX
00727 print *, ' Corners for local target index ', tgt_idx
00728 #endif /* DEBUGX */
00729 nn = sum(nbr_cells(1:tgt_idx-1))
00730
00731 do mm = 1, nbr_cells(tgt_idx)
00732
00733 src_ibuf(rank)%vector(3*(l+mm)-2) = neighcells_3d(1,nn+mm,1)
00734 src_ibuf(rank)%vector(3*(l+mm)-1) = neighcells_3d(2,nn+mm,1)
00735 src_ibuf(rank)%vector(3*(l+mm) ) = neighcells_3d(3,nn+mm,1)
00736
00737 idx1 = neighcells_3d(1,nn+mm,1)-cp%corner_shape(1,1)+1
00738 idx2 = neighcells_3d(2,nn+mm,1)-cp%corner_shape(1,2)+1
00739
00740 do nc = 1, nbr_corners
00741 real_buf(rank)%vector(l+mm,nc,1) = cp%corners_real(1)%vector((nc-1)*lenx+idx1)
00742 real_buf(rank)%vector(l+mm,nc,2) = cp%corners_real(2)%vector((nc-1)*leny+idx2)
00743 #ifdef DEBUGX
00744 print *, ' send corners ', l+mm, real_buf(rank)%vector(l+mm,nc,1), &
00745 real_buf(rank)%vector(l+mm,nc,2)
00746 end do
00747
00748 print *, ' 2d indices ', neighcells_3d(1,nn+mm,1), &
00749 neighcells_3d(2,nn+mm,1)
00750 #else
00751 end do
00752 #endif /* DEBUGX */
00753 enddo
00754
00755 if (src_mask_available) then
00756
00757 do mm = 1, nbr_cells(tgt_idx)
00758 log_buf(rank)%vector(l+mm) = &
00759 mask_array((neighcells_3d(2,nn+mm,1)-mask_shape(1,2))*m_lenx + &
00760 neighcells_3d(1,nn+mm,1)-mask_shape(1,1)+1)
00761 enddo
00762 #ifdef DEBUGX
00763 print *, ' send mask is ', log_buf(rank)%vector(l+1:l+nbr_cells(tgt_idx))
00764 #endif
00765 endif
00766
00767
00768
00769 #ifdef DEBUGX
00770 print *, ' Erase target cell index ', tgt_idx
00771 print *
00772 #endif /* DEBUGX */
00773 new_nbr_cells(tgt_idx) = 0
00774 send_info%nloc = send_info%nloc - 1
00775
00776
00777
00778 l = l + nbr_cells(tgt_idx)
00779
00780 enddo
00781
00782 case (PRISM_Gaussreduced_regvrt)
00783
00784
00785
00786
00787
00788 lenx = (cp%corner_shape(2,1)-cp%corner_shape(1,1)+1)
00789
00790 if (src_mask_available) then
00791 m_lenx = (mask_shape(2,1)-mask_shape(1,1)+1)
00792 #ifdef PRISM_ASSERTION
00793 if ( cp%corner_shape(1,2) /= mask_shape(1,2) .or. &
00794 cp%corner_shape(1,1) /= mask_shape(1,1) ) then
00795 call psmile_assert (__FILE__, __LINE__, &
00796 "Mask and corners have to be of the same shape.")
00797 endif
00798 if ( mask_shape(2,3) - mask_shape(1,3) + 1 > 1 ) then
00799 call psmile_assert (__FILE__, __LINE__, &
00800 "3d masks not yet supported.")
00801 endif
00802 #endif /* PRISM_ASSERTION */
00803 endif
00804
00805 l = 0
00806 rank = 1
00807
00808
00809 do n = 1, len
00810
00811 if ( n > 1 ) then
00812 if ( cell_trans(n,2) > cell_trans(n-1,2) ) then
00813 rank = rank + 1
00814 l = 0
00815 endif
00816 endif
00817 if ( cell_trans(n,2) == -1 ) exit
00818
00819 tgt_idx = cell_trans(n,1)
00820 #ifdef DEBUGX
00821 print *, ' Corners for local target index ', tgt_idx
00822 #endif /* DEBUGX */
00823 nn = sum(nbr_cells(1:tgt_idx-1))
00824
00825 do mm = 1, nbr_cells(tgt_idx)
00826
00827 src_ibuf(rank)%vector(3*(l+mm)-2) = neighcells_3d(1,nn+mm,1)
00828 src_ibuf(rank)%vector(3*(l+mm)-1) = neighcells_3d(2,nn+mm,1)
00829 src_ibuf(rank)%vector(3*(l+mm) ) = neighcells_3d(3,nn+mm,1)
00830
00831 idx = neighcells_3d(1,nn+mm,1)-cp%corner_shape(1,1)+1
00832
00833 do nc = 1, nbr_corners
00834 real_buf(rank)%vector(l+mm,nc,1) = cp%corners_real(1)%vector((nc-1)*lenx+idx)
00835 real_buf(rank)%vector(l+mm,nc,2) = cp%corners_real(2)%vector((nc-1)*lenx+idx)
00836 #ifdef DEBUGX
00837 print *, ' send corners ', l+mm, real_buf(rank)%vector(l+mm,nc,1), &
00838 real_buf(rank)%vector(l+mm,nc,2)
00839 #endif
00840 end do
00841 #ifdef DEBUGX
00842 print *, ' local index', neighcells_3d(1,nn+mm,1)
00843 #endif
00844 enddo
00845
00846 if (src_mask_available) then
00847
00848 do mm = 1, nbr_cells(tgt_idx)
00849 log_buf(rank)%vector(l+mm) = &
00850 mask_array(neighcells_3d(1,nn+mm,1)-mask_shape(1,1)+1)
00851 enddo
00852 #ifdef DEBUGX
00853 print *, ' send mask is ', log_buf(rank)%vector(l+1:l+nbr_cells(tgt_idx))
00854 #endif
00855 endif
00856
00857
00858
00859 #ifdef DEBUGX
00860 print *, ' Erase target cell index ', tgt_idx
00861 print *
00862 #endif /* DEBUGX */
00863 new_nbr_cells(tgt_idx) = 0
00864 send_info%nloc = send_info%nloc - 1
00865
00866
00867
00868 l = l + nbr_cells(tgt_idx)
00869
00870 enddo
00871
00872 case DEFAULT
00873
00874 ierrp (1) = search%search_data%grid_type
00875 ierror = PRISM_Error_Internal
00876 call psmile_error ( ierror, 'unsupported search grid type', &
00877 ierrp, 1, __FILE__, __LINE__ )
00878 return
00879
00880 end select
00881 endif
00882
00883
00884
00885
00886
00887 paction%lrequest = save_lreq
00888
00889 if ( nsend > nreq ) then
00890 Deallocate (save_lreq, stat=ierror)
00891 if (ierror /= 0) then
00892 ierrp (1) = nreq
00893 ierror = PRISM_Error_Dealloc
00894 call psmile_error ( ierror, 'save_lreq', ierrp, 1, &
00895 __FILE__, __LINE__ )
00896 return
00897 endif
00898 endif
00899
00900
00901
00902 do n = 1, nsend
00903
00904
00905
00906
00907
00908
00909
00910
00911
00912 extra_msg(1) = interpolation_methods(1)
00913 extra_msg(2) = PRISM_REAL
00914 extra_msg(3) = nbr_tgt_cells(n)
00915 extra_msg(4) = nbr_corners
00916 extra_msg(5) = nbr_src_cells(n)
00917 extra_msg(6) = mask_id
00918
00919
00920
00921
00922 call psmile_get_info_index (Fields(var_id)%method_id, send_appl_index, &
00923 idx, ierror)
00924 if (ierror > 0) return
00925
00926 extra_msg(7) = mp%send_infos_appl(idx)%msg_id
00927
00928 mp%send_infos_appl(idx)%dest = rank_list(n)
00929 mp%send_infos_appl(idx)%nloc = nbr_src_cells(n)
00930
00931 allocate(neigh_3d(3,nbr_src_cells(n)), stat=ierror)
00932 if (ierror /= 0) then
00933 ierrp (1) = 3*nbr_src_cells(n)
00934 ierror = PRISM_Error_Alloc
00935 call psmile_error ( ierror, 'neigh_3d', ierrp, 1, &
00936 __FILE__, __LINE__ )
00937 return
00938 endif
00939
00940 do m = 1, nbr_src_cells(n)
00941 neigh_3d(1,m) = src_ibuf(n)%vector(3*m-2)
00942 neigh_3d(2,m) = src_ibuf(n)%vector(3*m-1)
00943 neigh_3d(3,m) = src_ibuf(n)%vector(3*m )
00944 enddo
00945
00946 mp%send_infos_appl(idx)%neigh_3d => neigh_3d
00947
00948 call psmile_store_send_info (var_id, &
00949 search%msg_intersections%field_info%transient_out_id, &
00950 PRISM_UNDEFINED, PRISM_UNDEFINED, idx, ierror)
00951
00952
00953
00954 #ifdef DEBUG
00955 print *, ' Posting send request to psmile rank ', rank_list(n)
00956 #endif /* DEBUG */
00957 call psmile_bsend ( extra_msg, n_extra, MPI_INTEGER, &
00958 rank_list(n), celltag+3, comm_psmile, ierror )
00959
00960 if (ierror /= MPI_SUCCESS) then
00961 ierrp (1) = ierror
00962 ierrp (2) = rank_list(n)
00963 ierrp (3) = celltag+3
00964
00965 ierror = PRISM_Error_Send
00966
00967 call psmile_error (ierror, 'psmile_bsend(msg)', &
00968 ierrp, 3, __FILE__, __LINE__ )
00969 return
00970 endif
00971
00972 #ifdef DEBUGX
00973 do m = 1, nbr_tgt_cells(n)
00974 print '(a16,i6,a13,i4)', ' send tgt index ', tgt_ibuf(n)%vector(2*m-1), &
00975 ' # src cells ', tgt_ibuf(n)%vector(2*m )
00976 enddo
00977 #endif /* DEBUGX */
00978 call psmile_bsend ( tgt_ibuf(n)%vector, 2*nbr_tgt_cells(n), &
00979 MPI_INTEGER, rank_list(n), celltag+4, comm_psmile, ierror )
00980
00981 call psmile_bsend ( real_buf(n)%vector, 2*nbr_corners*nbr_src_cells(n), &
00982 MPI_REAL, rank_list(n), celltag+6, comm_psmile, ierror )
00983
00984 if ( src_mask_available ) &
00985 call psmile_bsend ( log_buf(n)%vector, nbr_src_cells(n), &
00986 MPI_LOGICAL, rank_list(n), celltag+7, comm_psmile, ierror )
00987
00988
00989
00990 Deallocate ( tgt_ibuf(n)%vector, src_ibuf(n)%vector, real_buf(n)%vector, stat=ierror)
00991 if (ierror /= 0) then
00992 ierrp (1) = 2*nbr_tgt_cells(n)+(2*nbr_corners+1)*nbr_src_cells(n)
00993 ierror = PRISM_Error_Dealloc
00994 call psmile_error ( ierror, 'src/tgt buffer', ierrp, 1, &
00995 __FILE__, __LINE__ )
00996 return
00997 endif
00998
00999 enddo
01000
01001 if ( nsend > 0 ) then
01002 Deallocate ( real_buf, src_ibuf, nbr_tgt_cells, nbr_src_cells, rank_list, stat=ierror)
01003 if (ierror /= 0) then
01004 ierrp (1) = 5*nsend
01005 ierror = PRISM_Error_Dealloc
01006 call psmile_error ( ierror, 'src_ibuf', ierrp, 1, &
01007 __FILE__, __LINE__ )
01008 return
01009 endif
01010 endif
01011
01012
01013
01014
01015
01016 extra_search%nrecv = 0
01017
01018 if ( nrecv > 0 ) then
01019
01020 extra_search%nrecv = nrecv
01021
01022 Allocate (rextra_msg(nrecv), &
01023 rextra_msg_buf(n_extra,nrecv), &
01024 extra_search%dble_bufs(nrecv), &
01025 extra_search%log_bufs(nrecv), &
01026 lrequest(nrecv), &
01027 send_info%len_sent(nrecv), &
01028 send_info%sender_global(nrecv), &
01029 send_info%msg_id(nrecv), &
01030 stat=ierror)
01031 if (ierror /= 0) then
01032 ierrp (1) = (n_extra+4)*nrecv
01033 ierror = PRISM_Error_Alloc
01034 call psmile_error ( ierror, 'rextra_msg, lrequest, ... ', ierrp, 1, &
01035 __FILE__, __LINE__ )
01036 return
01037 endif
01038
01039 lrequest = MPI_REQUEST_NULL
01040
01041 do n = 1, nrecv
01042
01043 call MPI_Irecv ( rextra_msg_buf(:,n), n_extra, MPI_INTEGER, &
01044 MPI_ANY_SOURCE, celltag+3, comm_psmile, lrequest(n), ierror )
01045
01046 if ( ierror /= MPI_SUCCESS ) then
01047 ierrp (1) = ierror
01048 ierrp (2) = MPI_ANY_SOURCE
01049 ierrp (3) = celltag+3
01050
01051 ierror = PRISM_Error_MPI
01052
01053 call psmile_error ( ierror, 'MPI_Irecv', &
01054 ierrp, 3, __FILE__, __LINE__ )
01055 return
01056 endif
01057
01058 enddo
01059
01060 do n = 1, nrecv
01061
01062 call MPI_Waitany ( nrecv, lrequest, index, status, ierror )
01063
01064 if ( ierror /= MPI_SUCCESS ) then
01065 ierrp (1) = ierror
01066 ierrp (2) = status (MPI_SOURCE)
01067 ierrp (3) = status (MPI_TAG)
01068
01069 ierror = PRISM_Error_MPI
01070
01071 call psmile_error ( ierror, 'MPI_Waitany', &
01072 ierrp, 3, __FILE__, __LINE__ )
01073 return
01074 endif
01075
01076 sender = status(MPI_SOURCE)
01077 #ifdef DEBUG
01078 print *, ' Request index ', index , ' received from rank ', sender
01079 #endif /* DEBUG */
01080
01081 rextra_msg(index)%interp_type = rextra_msg_buf(1,index)
01082 rextra_msg(index)%data_type = rextra_msg_buf(2,index)
01083 rextra_msg(index)%num_tgt_cells = rextra_msg_buf(3,index)
01084 rextra_msg(index)%num_src_corners = rextra_msg_buf(4,index)
01085 rextra_msg(index)%num_src_cells = rextra_msg_buf(5,index)
01086 rextra_msg(index)%mask_id = rextra_msg_buf(6,index)
01087 rextra_msg(index)%msg_id = rextra_msg_buf(7,index)
01088
01089 len = 2 * rextra_msg(index)%num_tgt_cells
01090
01091 Allocate ( tgt_ibuf(index)%vector(len), stat = ierror)
01092 if (ierror /= 0) then
01093 ierrp (1) = len
01094 ierror = PRISM_Error_Alloc
01095 call psmile_error ( ierror, 'tgt_ibuf%vector', ierrp, 1, &
01096 __FILE__, __LINE__ )
01097 return
01098 endif
01099
01100
01101
01102 call MPI_Recv ( tgt_ibuf(index)%vector, len, MPI_INTEGER, sender, &
01103 celltag+4, comm_psmile, status, ierror )
01104 #ifdef DEBUGX
01105 do m = 1, rextra_msg(index)%num_tgt_cells
01106 print '(a16,i6,a13,i4)', &
01107 ' recv tgt index ', tgt_ibuf(index)%vector(2*m-1), &
01108 ' # src cells ', tgt_ibuf(index)%vector(2*m)
01109 enddo
01110 #endif /* DEBUGX */
01111
01112 len = 2 * rextra_msg(index)%num_src_corners * &
01113 rextra_msg(index)%num_src_cells
01114
01115 Allocate ( buf(len), stat=ierror)
01116 if (ierror /= 0) then
01117 ierrp (1) = len
01118 ierror = PRISM_Error_Alloc
01119 call psmile_error ( ierror, 'buf', ierrp, 1, &
01120 __FILE__, __LINE__ )
01121 return
01122 endif
01123
01124
01125
01126 call MPI_Recv ( buf, len, MPI_REAL, &
01127 sender, celltag+6, comm_psmile, status, ierror )
01128
01129 extra_search%real_bufs(index)%vector => buf
01130
01131 #ifdef DEBUGX
01132 stride = rextra_msg(index)%num_src_cells * &
01133 rextra_msg(index)%num_src_corners
01134
01135 m = 1
01136 do i = 1, rextra_msg(index)%num_tgt_cells
01137 do nn = 1, tgt_ibuf(index)%vector(2*i)
01138 do nc = 1, rextra_msg(index)%num_src_corners
01139 idx1 = (nc-1)*rextra_msg(index)%num_src_cells+m
01140 idx2 = stride + idx1
01141 print *, ' receive corners ', m, 'for tgt', &
01142 tgt_ibuf(index)%vector(2*i-1), &
01143 extra_search%real_bufs(index)%vector(idx1), &
01144 extra_search%real_bufs(index)%vector(idx2)
01145 end do
01146 m = m + 1
01147 enddo
01148 enddo
01149 #endif /* DEBUGX */
01150
01151
01152
01153 if ( rextra_msg(index)%mask_id /= PRISM_UNDEFINED ) then
01154
01155 len = rextra_msg(index)%num_src_cells
01156 Allocate ( lbuf(len), stat=ierror)
01157 if (ierror /= 0) then
01158 ierrp (1) = len
01159 ierror = PRISM_Error_Alloc
01160 call psmile_error ( ierror, 'buf', ierrp, 1, &
01161 __FILE__, __LINE__ )
01162 return
01163 endif
01164 #ifdef DEBUGX
01165 print *, ' receive extra mask values '
01166 #endif
01167 call MPI_Recv ( lbuf, len, MPI_LOGICAL, &
01168 sender, celltag+7, comm_psmile, status, ierror )
01169
01170 #ifdef DEBUGX
01171 print *, ' received extra mask values '
01172 #endif
01173 extra_search%log_bufs(index)%vector => lbuf
01174
01175 endif
01176
01177
01178
01179 send_info%nrecv = send_info%nrecv + 1
01180 send_info%num2recv = send_info%num2recv + &
01181 rextra_msg(index)%num_src_cells
01182 send_info%sender_global(index) = sender
01183 send_info%len_sent(index) = rextra_msg(index)%num_src_cells
01184 send_info%msg_id(index) = rextra_msg(index)%msg_id
01185
01186 enddo
01187
01188
01189
01190
01191
01192
01193 len = sum(rextra_msg(:)%num_src_cells)
01194
01195
01196
01197
01198 Allocate(tgt_list(len), tgt_index(len), stat=ierror)
01199 if (ierror /= 0) then
01200 ierrp (1) = 2 * len
01201 ierror = PRISM_Error_Alloc
01202 call psmile_error ( ierror, 'tgt_list, ... ', ierrp, 1, &
01203 __FILE__, __LINE__ )
01204 return
01205 endif
01206
01207 nn = 1
01208
01209 do n = 1, nrecv
01210 mm = 1
01211 do m = 1, rextra_msg(n)%num_tgt_cells
01212 do l = 1, tgt_ibuf(n)%vector(2*m)
01213 tgt_list (nn) = tgt_ibuf(n)%vector(2*m-1)
01214 tgt_index(nn) = nn
01215 nn = nn + 1
01216 mm = mm + 1
01217 enddo
01218 enddo
01219 enddo
01220
01221 call psmile_quicksort_index ( tgt_list, len, tgt_index )
01222
01223 endif
01224
01225
01226 if ( nrecv > 0 .or. nsend > 0 ) then
01227
01228
01229
01230 len = sum(new_nbr_cells(:))
01231 if (nrecv > 0) len = len + sum(rextra_msg(:)%num_src_cells)
01232
01233 if ( len > 0 ) then
01234
01235 Allocate(new_neighcells_3d(ndim_3d,len,1), stat=ierror)
01236 if (ierror /= 0) then
01237 ierrp (1) = ndim_3d*len
01238 ierror = PRISM_Error_Alloc
01239 call psmile_error ( ierror, 'new_neighcells_3d', ierrp, 1, &
01240 __FILE__, __LINE__ )
01241 return
01242 endif
01243
01244 l = 1
01245 mm = 0
01246 nn = 0
01247
01248 if ( nrecv > 0 ) then
01249 len = sum(rextra_msg(:)%num_src_cells)
01250 else
01251 len = 0
01252 endif
01253
01254 do n = 1, ncpl
01255
01256
01257 do i = 1, new_nbr_cells(n)
01258 new_neighcells_3d(1,mm+i,1) = neighcells_3d(1,nn+i,1)
01259 new_neighcells_3d(2,mm+i,1) = neighcells_3d(2,nn+i,1)
01260 new_neighcells_3d(3,mm+i,1) = neighcells_3d(3,nn+i,1)
01261 enddo
01262
01263 nn = nn + nbr_cells(n)
01264 mm = mm + new_nbr_cells(n)
01265
01266
01267 if ( l > len ) cycle
01268
01269
01270
01271 do while ( n == tgt_list(l) )
01272
01273
01274
01275
01276
01277
01278 mm = mm + 1
01279 new_nbr_cells(n) = new_nbr_cells(n) + 1
01280 new_neighcells_3d(1,mm,1) = extra_search%global_marker
01281 new_neighcells_3d(2,mm,1) = tgt_index(l)
01282 new_neighcells_3d(3,mm,1) = PSMILe_Undef
01283
01284 l = l + 1
01285
01286 if ( l > len ) exit
01287
01288 enddo
01289 enddo
01290
01291 Deallocate(neighcells_3d, stat=ierror)
01292 if (ierror /= 0) then
01293 ierrp (1) = ndim_3d*sum(nbr_cells(:))
01294 ierror = PRISM_Error_Alloc
01295 call psmile_error ( ierror, 'neighcells_3d', ierrp, 1, &
01296 __FILE__, __LINE__ )
01297 return
01298 endif
01299
01300 neighcells_3d => new_neighcells_3d
01301
01302 else
01303
01304 Nullify(neighcells_3d)
01305
01306 endif
01307
01308 endif
01309
01310
01311
01312 nbr_cells(:) = new_nbr_cells(:)
01313
01314
01315
01316
01317
01318
01319 if ( max(nsend,nrecv) > 0 ) then
01320
01321
01322
01323
01324 do n = 1, nrecv
01325 Deallocate (tgt_ibuf(n)%vector, stat=ierror)
01326 if (ierror /= 0) then
01327 ierrp (1) = PSMILe_Undef
01328 ierror = PRISM_Error_Dealloc
01329 call psmile_error ( ierror, 'tgt_ibuf', ierrp, 1, &
01330 __FILE__, __LINE__ )
01331 return
01332 endif
01333 enddo
01334
01335 Deallocate (tgt_ibuf, stat=ierror)
01336 if (ierror /= 0) then
01337 ierrp (1) = max(nsend,nrecv)
01338 ierror = PRISM_Error_Dealloc
01339 call psmile_error ( ierror, 'tgt_ibuf', ierrp, 1, &
01340 __FILE__, __LINE__ )
01341 return
01342 endif
01343
01344 endif
01345
01346 if ( nrecv > 0 ) then
01347 Deallocate(tgt_list, tgt_index, stat=ierror)
01348 if (ierror /= 0) then
01349 ierrp (1) = 2*sum(rextra_msg(:)%num_src_cells) + sum(rextra_msg(:)%num_tgt_cells)
01350 ierror = PRISM_Error_Alloc
01351 call psmile_error ( ierror, 'tgt_list, ... ', ierrp, 1, &
01352 __FILE__, __LINE__ )
01353 return
01354 endif
01355
01356 Deallocate(rextra_msg, rextra_msg_buf, lrequest, stat=ierror)
01357 if (ierror /= 0) then
01358 ierrp (1) = (n_extra+1)*nrecv+n_extra
01359 ierror = PRISM_Error_Alloc
01360 call psmile_error ( ierror, 'rextra_msg, ... ', ierrp, 1, &
01361 __FILE__, __LINE__ )
01362 return
01363 endif
01364 endif
01365
01366
01367
01368
01369
01370 #ifdef VERBOSE
01371 print 9980, trim(ch_id), ierror
01372
01373 call psmile_flushstd
01374 #endif /* VERBOSE */
01375
01376
01377
01378
01379
01380 #ifdef VERBOSE
01381
01382 9990 format (1x, a, ': psmile_global_search_cell_real: grid_id', i3, &
01383 ' to ', i3, '(', i2, ')')
01384 9980 format (1x, a, ': psmile_global_search_cell_real: eof ierror =', i3)
01385
01386 #endif /* VERBOSE */
01387
01388 contains
01389
01390 subroutine insertsort_boundary_cells(boundary_cells, num_cells)
01391
01392 implicit none
01393
01394 integer, intent(in) :: num_cells
01395 integer, intent(inout) :: boundary_cells(num_cells, 5)
01396
01397 integer :: i, j, tmp_cell(5)
01398
01399 do i = 2, num_cells
01400 tmp_cell(:) = boundary_cells(i, :)
01401 j = i
01402 do while (tmp_cell(4) < boundary_cells(j-1, 4) .or. &
01403 tmp_cell(4) == boundary_cells(j-1, 4) .and. &
01404 tmp_cell(3) < boundary_cells(j-1, 3) .or. &
01405 tmp_cell(4) == boundary_cells(j-1, 4) .and. &
01406 tmp_cell(3) == boundary_cells(j-1, 3) .and. &
01407 tmp_cell(2) < boundary_cells(j-1, 2))
01408
01409 boundary_cells(j, :) = boundary_cells(j-1, :)
01410 j = j-1
01411 if (j == 1) exit
01412 enddo
01413
01414 boundary_cells(j, :) = tmp_cell
01415 enddo
01416 end subroutine insertsort_boundary_cells
01417
01418 end subroutine PSMILe_global_search_cell_real