00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012 subroutine psmile_gauss_get_neighbours (grid_id, ierror)
00013
00014
00015
00016 use PRISM_constants
00017
00018 use PSMILe, dummy_interface => PSMILe_Gauss_get_neighbours
00019
00020 use psmile_grid_reduced_gauss
00021
00022 Implicit none
00023
00024
00025
00026 Integer, Intent (In) :: grid_id
00027
00028
00029
00030
00031
00032
00033
00034
00035 Integer, Intent (Out) :: ierror
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045 Type(Grid), Pointer :: gp
00046
00047
00048
00049 Integer :: n, num_local_points
00050
00051
00052
00053 Integer :: off
00054
00055 Integer, Allocatable :: bicu_stencils(:,:)
00056 Integer, Allocatable :: neigh_point_list(:)
00057
00058
00059 Integer, Allocatable :: all_neigh_point_list(:)
00060
00061
00062 Integer :: num_neigh_points
00063 Integer, Allocatable :: num_remote_neigh_points(:)
00064 Type (block_info), Allocatable :: sorted_block_info(:)
00065 Integer :: curr_block
00066 Integer :: num_blocks
00067 Integer :: curr_point
00068
00069 Integer, Allocatable :: displs(:)
00070 Integer, Allocatable :: local_send_list(:)
00071 Integer, Allocatable :: local_recv_list(:)
00072
00073 Type(integer_vector), Allocatable :: local_get_list(:),
00074 local_put_list(:),
00075 local_put_loc_list(:)
00076
00077
00078
00079 Integer :: i, j
00080
00081
00082
00083 Integer :: npes
00084 Integer :: rank
00085 Integer :: comm
00086 Integer :: status(MPI_STATUS_SIZE)
00087 Integer, Pointer :: remote_index(:)
00088
00089
00090
00091 Integer, Parameter :: nerrp = 2
00092 Integer :: ierrp (nerrp)
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123 Character(len=len_cvs_string), save :: mycvs =
00124 '$Id: psmile_gauss_get_neighbours.F90 3032 2011-03-18 17:29:36Z hanke $'
00125
00126
00127
00128
00129
00130 #ifdef VERBOSE
00131 print 9990, trim(ch_id)
00132
00133 call psmile_flushstd
00134 #endif /* VERBOSE */
00135
00136 ierror = 0
00137
00138 gp => Grids(grid_id)
00139
00140 #ifdef PRISM_ASSERTION
00141 if (gp%grid_type /= PRISM_Gaussreduced_regvrt) then
00142 print *, "grid_type", gp%grid_type
00143 call psmile_assert (__FILE__, __LINE__, &
00144 "This routine is not designed for this grid type")
00145 endif
00146 #endif
00147
00148
00149
00150 num_local_points = gp%grid_shape(2,1) - gp%grid_shape(1,1) + 1
00151
00152 Allocate (bicu_stencils(16,gp%grid_shape(1,1):gp%grid_shape(2,1)), stat = ierror)
00153 if (ierror /= 0) then
00154 ierrp (1) = ierror
00155 ierrp (2) = num_local_points*16
00156
00157 ierror = PRISM_Error_Alloc
00158 call psmile_error ( ierror, 'bicu_stencils', ierrp, 2, &
00159 __FILE__, __LINE__ )
00160 return
00161 endif
00162
00163
00164
00165 do i = gp%grid_shape(1,1), gp%grid_shape(2,1)
00166 bicu_stencils(:,i) = psmile_gauss_3d_to_global_1d(grid_id, &
00167 psmile_gauss_get_bicu_stencil (grid_id, &
00168 psmile_gauss_local_1d_to_3d(grid_id, i)), 16)
00169 enddo
00170
00171 #ifdef DEBUGX
00172 print *, ' Initial stencil array with global indices '
00173 do i = 1, num_local_points
00174 write( *, '(a13,17i8)') 'Initial stencil ', i, bicu_stencils(:,i)
00175 enddo
00176 call psmile_flushstd
00177 #endif
00178
00179
00180
00181 if ( num_local_points /= sum (gp%reduced_gauss_data%nbr_points_per_lat) ) then
00182
00183 num_blocks = size(gp%reduced_gauss_data%local_block_info)
00184 Allocate (sorted_block_info(num_blocks), &
00185 stat = ierror)
00186 if (ierror /= 0) then
00187 ierrp (1) = ierror
00188 ierrp (2) = size(gp%reduced_gauss_data%local_block_info)
00189
00190 ierror = PRISM_Error_Alloc
00191 call psmile_error ( ierror, 'local_block_info', ierrp, 2, &
00192 __FILE__, __LINE__ )
00193 return
00194 endif
00195
00196
00197 call insertsort_block_info (gp%reduced_gauss_data%local_block_info, &
00198 sorted_block_info, num_blocks)
00199
00200 Allocate (neigh_point_list(num_local_points*15), stat = ierror)
00201 if (ierror /= 0) then
00202 ierrp (1) = ierror
00203 ierrp (2) = num_local_points*15
00204
00205 ierror = PRISM_Error_Alloc
00206 call psmile_error ( ierror, 'neigh_point_list', ierrp, 2, &
00207 __FILE__, __LINE__ )
00208 return
00209 endif
00210
00211 num_neigh_points = 0
00212 curr_block = 1
00213
00214
00215 do i = gp%grid_shape(1,1), gp%grid_shape(2,1)
00216
00217
00218 do n = 1, 16
00219
00220
00221
00222 if (n == 6) cycle
00223
00224
00225 curr_point = bicu_stencils(n, i)
00226
00227
00228 if (.not. is_local_point(curr_point, sorted_block_info, num_blocks)) then
00229
00230
00231
00232 call insert_unique_key(curr_point, num_neigh_points, &
00233 neigh_point_list, num_local_points*15)
00234 endif
00235
00236 enddo
00237 enddo
00238
00239 #ifdef DEBUGX
00240 print *, 'List of neighbour requests '
00241 do n = 1, num_neigh_points
00242 print *, ' Entry ', n, ':', neigh_point_list (n)
00243 enddo
00244 call psmile_flushstd
00245 #endif
00246
00247
00248
00249 npes = Comps(gp%comp_id)%size
00250 rank = Comps(gp%comp_id)%rank
00251 comm = Comps(gp%comp_id)%act_comm
00252
00253 Allocate(num_remote_neigh_points(0:npes-1), displs(0:npes-1), stat = ierror)
00254 if (ierror /= 0) then
00255 ierrp (1) = ierror
00256 ierrp (2) = npes
00257 ierror = PRISM_Error_Alloc
00258 call psmile_error ( ierror, 'num_remote_neigh_points', ierrp, 2, &
00259 __FILE__, __LINE__ )
00260 return
00261 endif
00262
00263 call MPI_Allgather ( num_neigh_points, 1, MPI_Integer, &
00264 num_remote_neigh_points(0), 1, &
00265 MPI_Integer, comm, ierror )
00266
00267 #ifdef DEBUGX
00268 print *, 'List of requests'
00269 do n = 0, npes-1
00270 print *, ' Proc # ', n, ':', num_remote_neigh_points(n)
00271 enddo
00272 call psmile_flushstd
00273 #endif
00274
00275 Allocate( all_neigh_point_list(sum(num_remote_neigh_points)), stat = ierror)
00276 if (ierror /= 0) then
00277 ierrp (1) = ierror
00278 ierrp (2) = sum(num_remote_neigh_points)
00279 ierror = PRISM_Error_Alloc
00280 call psmile_error ( ierror, 'all_neigh_point_list', ierrp, 2, &
00281 __FILE__, __LINE__ )
00282 return
00283 endif
00284
00285 displs(0) = 0
00286 do n = 1, npes-1
00287 displs(n) = displs (n-1) + num_remote_neigh_points(n-1)
00288 enddo
00289
00290
00291 call MPI_Allgatherv ( neigh_point_list(1), num_neigh_points, MPI_Integer, &
00292 all_neigh_point_list(1), num_remote_neigh_points, displs, &
00293 MPI_Integer, comm, ierror )
00294
00295 #ifdef DEBUGX
00296 print *, 'List of complete requests '
00297 off = 0
00298 do n = 0, npes-1
00299 do i = 1, num_remote_neigh_points(n)
00300 print *, ' Proc # ', n, ' Entry ', i, ':', all_neigh_point_list (off+i)
00301 enddo
00302 off = off + num_remote_neigh_points(n)
00303 enddo
00304 call psmile_flushstd
00305 #endif
00306
00307 Allocate( local_send_list(0:npes-1), local_recv_list(0:npes-1), stat = ierror)
00308 if (ierror /= 0) then
00309 ierrp (1) = ierror
00310 ierrp (2) = 2*npes
00311 ierror = PRISM_Error_Alloc
00312 call psmile_error ( ierror, 'local_send/recv_list', ierrp, 2, &
00313 __FILE__, __LINE__ )
00314 return
00315 endif
00316
00317 local_send_list = 0
00318
00319
00320 do n = 0, npes-1
00321
00322
00323 if ( n /= rank ) then
00324
00325
00326 do j = displs(n)+1, displs(n)+num_remote_neigh_points(n)
00327
00328
00329
00330
00331
00332 if (is_local_point(all_neigh_point_list(j), &
00333 sorted_block_info, &
00334 num_blocks)) then
00335
00336 local_send_list(n) = local_send_list(n) + 1
00337 else
00338 all_neigh_point_list(j) = psmile_undef
00339 endif
00340 enddo
00341 endif
00342 enddo
00343
00344
00345
00346
00347 call MPI_Alltoall ( local_send_list, 1, MPI_Integer, &
00348 local_recv_list, 1, MPI_Integer, comm, ierror )
00349
00350
00351
00352
00353
00354 Allocate ( local_get_list (0:npes-1), &
00355 local_put_list (0:npes-1), &
00356 local_put_loc_list(0:npes-1), stat = ierror)
00357 if (ierror /= 0) then
00358 ierrp (1) = ierror
00359 ierrp (2) = 3*npes
00360 ierror = PRISM_Error_Alloc
00361 call psmile_error ( ierror, 'local_get/put/put_loc_list', ierrp, 2, &
00362 __FILE__, __LINE__ )
00363 return
00364 endif
00365
00366
00367 do n = 0, npes-1
00368
00369 Nullify (local_get_list(n)%vector, &
00370 local_put_list(n)%vector, &
00371 local_put_loc_list(n)%vector)
00372
00373 if ( local_recv_list(n) > 0 ) then
00374 Allocate (local_get_list(n)%vector(local_recv_list(n)), stat = ierror)
00375 if (ierror /= 0) then
00376 ierrp (1) = ierror
00377 ierrp (2) = local_recv_list(n)
00378 ierror = PRISM_Error_Alloc
00379 call psmile_error ( ierror, 'local_get_list vector', ierrp, 2, &
00380 __FILE__, __LINE__ )
00381 return
00382 endif
00383 endif
00384
00385 if ( local_send_list(n) > 0 ) then
00386 Allocate (local_put_list(n)%vector(local_send_list(n)), &
00387 local_put_loc_list(n)%vector(local_send_list(n)), stat = ierror)
00388 if (ierror /= 0) then
00389 ierrp (1) = ierror
00390 ierrp (2) = 2*local_send_list(n)
00391 ierror = PRISM_Error_Alloc
00392 call psmile_error ( ierror, 'local_put_list vector', ierrp, 2, &
00393 __FILE__, __LINE__ )
00394 return
00395 endif
00396 endif
00397
00398 enddo
00399
00400
00401
00402
00403 do n = 0, npes-1
00404
00405
00406 if ( n /= rank .and. local_send_list(n) > 0) then
00407
00408 i = 0
00409
00410 do j = displs(n)+1, displs(n)+num_remote_neigh_points(n)
00411
00412
00413 if (all_neigh_point_list(j) /= psmile_undef) then
00414
00415 i = i + 1
00416 local_put_list(n)%vector(i) = all_neigh_point_list(j)
00417 endif
00418 enddo
00419
00420 #ifdef PRISM_ASSERTION
00421 if (i /= local_send_list(n)) then
00422
00423 print *, "something went terribly wrong here..."
00424 call psmile_assert (__FILE__, __LINE__, &
00425 'I do not know what happened.')
00426 endif
00427 #endif
00428 local_put_loc_list(n)%vector(:) = &
00429 psmile_gauss_1d_global_to_local (grid_id, local_put_list(n)%vector(:), &
00430 i, psmile_undef)
00431 endif
00432 enddo
00433
00434
00435
00436
00437
00438 do n = 0, npes-1
00439
00440 if ( local_send_list(n) > 0 ) &
00441 call psmile_bsend ( local_put_list(n)%vector, local_send_list(n), &
00442 MPI_Integer, n, GAUSSNEIGHTAG+rank, comm, ierror )
00443 enddo
00444 do n = 0, npes-1
00445
00446 if ( local_recv_list(n) > 0 ) &
00447 call MPI_Recv ( local_get_list(n)%vector, local_recv_list(n), &
00448 MPI_Integer, n, GAUSSNEIGHTAG+n, comm, status, ierror )
00449 enddo
00450
00451 num_neigh_points = sum(local_recv_list)
00452
00453 Allocate (gp%reduced_gauss_data%remote_index(num_neigh_points), stat = ierror)
00454 if (ierror /= 0) then
00455 ierrp (1) = ierror
00456 ierrp (2) = num_neigh_points
00457
00458 ierror = PRISM_Error_Alloc
00459 call psmile_error ( ierror, 'remote_index', ierrp, 2, &
00460 __FILE__, __LINE__ )
00461 return
00462 endif
00463
00464 remote_index => gp%reduced_gauss_data%remote_index
00465
00466 off = 0
00467 do n = 0, npes-1
00468 if ( local_recv_list(n) > 0 ) then
00469 remote_index(off+1:off+local_recv_list(n)) = &
00470 local_get_list(n)%vector(1:+local_recv_list(n))
00471 off = off + local_recv_list(n)
00472 endif
00473 enddo
00474
00475 #ifdef DEBUGX
00476 print *, 'List of remote indices '
00477 do i = 1, num_neigh_points
00478 print *, 'i: ', i, ' index: ', remote_index(i)
00479 enddo
00480 call psmile_flushstd
00481 #endif
00482
00483
00484 if ( gp%corner_pointer%corner_datatype == MPI_DOUBLE_PRECISION ) then
00485
00486 call get_neigh_method_data_dble (grid_id)
00487
00488 else if ( gp%corner_pointer%corner_datatype == MPI_REAL ) then
00489
00490 call get_neigh_method_data_real (grid_id)
00491
00492 else
00493
00494 ierror = PRISM_Error_Internal
00495 call psmile_error ( ierror, 'datatype is currently not supported', &
00496 (/-1/), 0, __FILE__, __LINE__ )
00497 endif
00498
00499 do i = 0, npes-1
00500 if (associated(local_get_list(i)%vector)) deallocate(local_get_list(i)%vector)
00501 enddo
00502 do i = 0, npes-1
00503 if (associated(local_put_list(i)%vector)) deallocate(local_put_list(i)%vector)
00504 enddo
00505 do i = 0, npes-1
00506 if (associated(local_put_loc_list(i)%vector)) deallocate(local_put_loc_list(i)%vector)
00507 enddo
00508
00509 Deallocate ( num_remote_neigh_points, &
00510 displs, &
00511 all_neigh_point_list, &
00512 sorted_block_info, &
00513 local_send_list, &
00514 local_recv_list, &
00515 local_get_list, &
00516 local_put_list, &
00517 local_put_loc_list, &
00518 neigh_point_list )
00519
00520 endif
00521
00522 Allocate (gp%reduced_gauss_data%local_1d_stencil_lookup(16,&
00523 gp%grid_shape(1,1):gp%grid_shape(2,1)), stat = ierror)
00524 if (ierror /= 0) then
00525 ierrp (1) = ierror
00526 ierrp (2) = gp%grid_shape(2,1)-gp%grid_shape(1,1)+1
00527
00528 ierror = PRISM_Error_Alloc
00529 call psmile_error ( ierror, 'local_1d_stencil_lookup', ierrp, 2, &
00530 __FILE__, __LINE__ )
00531 return
00532 endif
00533
00534
00535 do i = gp%grid_shape(1,1), gp%grid_shape(2,1)
00536 gp%reduced_gauss_data%local_1d_stencil_lookup(:,i) = &
00537 psmile_gauss_3d_to_local_1d(grid_id, &
00538 psmile_gauss_get_bicu_stencil (grid_id, &
00539 psmile_gauss_local_1d_to_3d(grid_id, i)), &
00540 16, psmile_undef, .true.)
00541 enddo
00542
00543
00544
00545
00546
00547 Deallocate ( bicu_stencils )
00548
00549
00550
00551
00552 #ifdef VERBOSE
00553 print 9980, trim(ch_id), num_neigh_points
00554
00555 call psmile_flushstd
00556 #endif /* VERBOSE */
00557
00558
00559
00560 9990 format (1x, a, ': psmile_gauss_get_neighbours')
00561 9980 format (1x, a, ': psmile_gauss_get_neighbours: eof, number of requested points ', i8)
00562
00563 contains
00564
00565
00566
00567
00568
00569
00570 subroutine insertsort_block_info (in_block_info, out_block_info, num_blocks)
00571
00572 implicit none
00573
00574 integer, intent(in) :: num_blocks
00575 type (block_info), intent(in) :: in_block_info(num_blocks)
00576 type (block_info), intent(out) :: out_block_info(num_blocks)
00577
00578 integer :: i, j
00579
00580 out_block_info(1) = in_block_info(1)
00581
00582 do i = 2, num_blocks
00583
00584 j = i
00585
00586 do while (out_block_info(j-1)%global_1d_start_idx > &
00587 in_block_info(i)%global_1d_start_idx .and. &
00588 j > 1)
00589
00590 out_block_info(j) = out_block_info(j-1)
00591
00592 j = j-1
00593 enddo
00594
00595 out_block_info(j) = in_block_info(i)
00596 enddo
00597 end subroutine insertsort_block_info
00598
00599
00600
00601
00602
00603 subroutine insert_unique_key (key, num_keys, list, list_size)
00604
00605 implicit none
00606
00607 integer, intent(in) :: key, list_size
00608 integer, intent(inout) :: num_keys, list(list_size)
00609
00610 integer :: insert_position
00611
00612
00613 if (num_keys == 0) then
00614 list(1) = key
00615 num_keys = 1
00616 return
00617 endif
00618
00619
00620 do insert_position = 1, num_keys
00621 if (list(insert_position) >= key) exit
00622 enddo
00623
00624
00625
00626 if (insert_position == num_keys .and. &
00627 list(insert_position) < key) then
00628
00629
00630 list(insert_position+1) = key
00631 num_keys = num_keys + 1
00632
00633
00634 else if (list(insert_position) /= key) then
00635
00636
00637
00638 list (insert_position+1:num_keys+1) = list (insert_position:num_keys)
00639 list (insert_position) = key
00640 num_keys = num_keys + 1
00641 endif
00642 end subroutine insert_unique_key
00643
00644
00645
00646
00647
00648 logical function is_local_point (point_1d, sorted_block_info, num_blocks)
00649
00650 implicit none
00651
00652 integer, intent(in) :: point_1d, num_blocks
00653 type (block_info), intent(in) :: sorted_block_info(num_blocks)
00654
00655 integer, save :: curr_block = 1
00656
00657
00658 do while (point_1d < sorted_block_info(curr_block)%global_1d_start_idx .and. &
00659 curr_block > 1)
00660 curr_block = curr_block - 1
00661 enddo
00662
00663 do while (point_1d > sorted_block_info(curr_block)%global_1d_end_idx .and. &
00664 curr_block < num_blocks)
00665 curr_block = curr_block + 1
00666 enddo
00667
00668
00669 is_local_point = point_1d >= sorted_block_info(curr_block)%global_1d_start_idx &
00670 .and. &
00671 point_1d <= sorted_block_info(curr_block)%global_1d_end_idx
00672 end function is_local_point
00673
00674
00675
00676 subroutine get_neigh_method_data_dble (grid_id)
00677
00678 use prism_constants
00679 use psmile
00680
00681 implicit none
00682
00683 integer, intent (in) :: grid_id
00684
00685 integer :: method_id
00686
00687 type (coords_block), pointer :: coords_pointer
00688 integer :: npes, rank, comm
00689
00690 double precision, allocatable :: send_buffer(:)
00691 double precision, allocatable :: recv_buffer(:)
00692
00693 integer, pointer :: idx_list(:)
00694
00695 integer :: off, i, n, lstatus(MPI_STATUS_SIZE), ierror, tag,
00696 send_buffer_size, recv_buffer_size
00697
00698
00699
00700
00701
00702
00703
00704
00705
00706
00707
00708 send_buffer_size = 1
00709 recv_buffer_size = 1
00710
00711 do method_id = 1, Number_of_Methods_allocated
00712 if ( Methods(method_id)%grid_id == grid_id ) then
00713 send_buffer_size = max (send_buffer_size, maxval(local_send_list(:)))
00714 recv_buffer_size = max (recv_buffer_size, maxval(local_recv_list(:)))
00715 endif
00716 enddo
00717
00718 allocate(send_buffer(2*send_buffer_size), &
00719 recv_buffer(2*recv_buffer_size), STAT = ierror)
00720
00721 if ( ierror > 0 ) then
00722 call psmile_error ( PRISM_Error_Alloc, 'send_buffer, recv_buffer', &
00723 (/ierror, 2 * send_buffer_size + 2 * recv_buffer_size/), 3, &
00724 __FILE__, __LINE__ )
00725 ierror = PRISM_Error_Alloc
00726 return
00727 endif
00728
00729 do method_id = 1, Number_of_Methods_allocated
00730
00731 if ( Methods(method_id)%grid_id == grid_id ) then
00732
00733 coords_pointer => Methods(method_id)%coords_pointer
00734
00735 npes = Comps(grids(grid_id)%comp_id)%size
00736 rank = Comps(grids(grid_id)%comp_id)%rank
00737 comm = Comps(grids(grid_id)%comp_id)%act_comm
00738
00739 do n = 0, npes-1
00740
00741 if ( local_send_list(n) > 0 ) then
00742
00743 idx_list => local_put_loc_list(n)%vector
00744
00745
00746 do i = 1, local_send_list(n)
00747 send_buffer(i) = &
00748 coords_pointer%coords_dble(1)%vector (idx_list (i))
00749 send_buffer(i+local_send_list(n)) = &
00750 coords_pointer%coords_dble(2)%vector (idx_list (i))
00751 enddo
00752
00753 call psmile_bsend ( send_buffer, 2*local_send_list(n), &
00754 MPI_DOUBLE_PRECISION, n, rank, comm, ierror )
00755 endif
00756 enddo
00757
00758 allocate(Methods(method_id)%gauss2_dble(1)%vector(sum (local_recv_list(:))), &
00759 Methods(method_id)%gauss2_dble(2)%vector(sum (local_recv_list(:))), &
00760 STAT = ierror)
00761
00762 if ( ierror > 0 ) then
00763 call psmile_error ( PRISM_Error_Alloc, 'gauss_dble%vector', &
00764 (/ierror, 2 * sum (local_recv_list(:))/), 2, __FILE__, __LINE__ )
00765 ierror = PRISM_Error_Alloc
00766 return
00767 endif
00768
00769
00770
00771
00772
00773
00774
00775 off = 0
00776 do n = 0, npes-1
00777
00778 if ( local_recv_list(n) > 0 ) then
00779
00780 tag = n
00781 call MPI_Recv ( recv_buffer, 2*local_recv_list(n), &
00782 MPI_DOUBLE_PRECISION, n, tag, comm, lstatus, ierror )
00783
00784 Methods(method_id)%gauss2_dble(1)%vector(off+1:off+local_recv_list(n)) = &
00785 recv_buffer(1:local_recv_list(n))
00786
00787 Methods(method_id)%gauss2_dble(2)%vector(off+1:off+local_recv_list(n)) = &
00788 recv_buffer(local_recv_list(n)+1:2*local_recv_list(n))
00789
00790 off = off + local_recv_list(n)
00791 endif
00792 enddo
00793 endif
00794 enddo
00795
00796 deallocate ( send_buffer )
00797 deallocate ( recv_buffer )
00798
00799 end subroutine get_neigh_method_data_dble
00800
00801
00802
00803 subroutine get_neigh_method_data_real (grid_id)
00804
00805 use prism_constants
00806 use psmile
00807
00808 implicit none
00809
00810 integer, intent (in) :: grid_id
00811
00812 integer :: method_id
00813
00814 type (coords_block), pointer :: coords_pointer
00815 integer :: npes, rank, comm
00816
00817 real, allocatable :: send_buffer(:)
00818 real, allocatable :: recv_buffer(:)
00819
00820 integer, pointer :: idx_list(:)
00821
00822 integer :: off, i, n, lstatus(MPI_STATUS_SIZE), ierror, tag,
00823 send_buffer_size, recv_buffer_size
00824
00825
00826
00827
00828
00829
00830
00831
00832
00833
00834
00835 send_buffer_size = 1
00836 recv_buffer_size = 1
00837
00838 do method_id = 1, Number_of_Methods_allocated
00839 if ( Methods(method_id)%grid_id == grid_id ) then
00840 send_buffer_size = max (send_buffer_size, maxval(local_send_list(:)))
00841 recv_buffer_size = max (recv_buffer_size, maxval(local_recv_list(:)))
00842 endif
00843 enddo
00844
00845 allocate(send_buffer(2*send_buffer_size), &
00846 recv_buffer(2*recv_buffer_size), STAT = ierror)
00847
00848 if ( ierror > 0 ) then
00849 call psmile_error ( PRISM_Error_Alloc, 'send_buffer, recv_buffer', &
00850 (/ierror, 2 * send_buffer_size + 2 * recv_buffer_size/), 3, &
00851 __FILE__, __LINE__ )
00852 ierror = PRISM_Error_Alloc
00853 return
00854 endif
00855
00856 do method_id = 1, Number_of_Methods_allocated
00857
00858 if ( Methods(method_id)%grid_id == grid_id ) then
00859
00860 coords_pointer => Methods(method_id)%coords_pointer
00861
00862 npes = Comps(grids(grid_id)%comp_id)%size
00863 rank = Comps(grids(grid_id)%comp_id)%rank
00864 comm = Comps(grids(grid_id)%comp_id)%act_comm
00865
00866 do n = 0, npes-1
00867
00868 if ( local_send_list(n) > 0 ) then
00869
00870 idx_list => local_put_loc_list(n)%vector
00871
00872
00873 do i = 1, local_send_list(n)
00874 send_buffer(i) = &
00875 coords_pointer%coords_real(1)%vector (idx_list (i))
00876 send_buffer(i+local_send_list(n)) = &
00877 coords_pointer%coords_real(2)%vector (idx_list (i))
00878 enddo
00879
00880 call psmile_bsend ( send_buffer, 2*local_send_list(n), &
00881 MPI_REAL, n, rank, comm, ierror )
00882 endif
00883 enddo
00884
00885 allocate(Methods(method_id)%gauss2_real(1)%vector(sum (local_recv_list(:))), &
00886 Methods(method_id)%gauss2_real(2)%vector(sum (local_recv_list(:))), &
00887 STAT = ierror)
00888
00889 if ( ierror > 0 ) then
00890 call psmile_error ( PRISM_Error_Alloc, 'gauss_real%vector', &
00891 (/ierror, 2 * sum (local_recv_list(:))/), 2, __FILE__, __LINE__ )
00892 ierror = PRISM_Error_Alloc
00893 return
00894 endif
00895
00896
00897
00898
00899
00900
00901
00902 off = 0
00903 do n = 0, npes-1
00904
00905 if ( local_recv_list(n) > 0 ) then
00906
00907 tag = n
00908 call MPI_Recv ( recv_buffer, 2*local_recv_list(n), &
00909 MPI_REAL, n, tag, comm, lstatus, ierror )
00910
00911 Methods(method_id)%gauss2_real(1)%vector(off+1:off+local_recv_list(n)) = &
00912 recv_buffer(1:local_recv_list(n))
00913
00914 Methods(method_id)%gauss2_real(2)%vector(off+1:off+local_recv_list(n)) = &
00915 recv_buffer(local_recv_list(n)+1:2*local_recv_list(n))
00916
00917 off = off + local_recv_list(n)
00918 endif
00919 enddo
00920 endif
00921 enddo
00922
00923 deallocate ( send_buffer )
00924 deallocate ( recv_buffer )
00925
00926 end subroutine get_neigh_method_data_real
00927
00928 end subroutine PSMILe_Gauss_get_neighbours
00929