psmile_gauss_get_neighbours.F90

Go to the documentation of this file.
00001 !-----------------------------------------------------------------------
00002 ! Copyright 2006-2010, NEC Europe Ltd., London, UK.
00003 ! Copyright 2011, DKRZ, Hamburg, Germany.
00004 ! All rights reserved. Use is subject to OASIS4 license terms.
00005 !-----------------------------------------------------------------------
00006 !BOP
00007 !
00008 ! !ROUTINE: PSMILe_Gauss_get_neighbours
00009 !
00010 ! !INTERFACE:
00011 
00012       subroutine psmile_gauss_get_neighbours (grid_id, ierror)
00013 !
00014 ! !USES:
00015 !
00016       use PRISM_constants
00017 !
00018       use PSMILe, dummy_interface => PSMILe_Gauss_get_neighbours
00019 !
00020       use psmile_grid_reduced_gauss
00021 
00022       Implicit none
00023 !
00024 ! !INPUT PARAMETERS:
00025 !
00026       Integer,         Intent (In)    :: grid_id
00027 
00028 !     Grid Id of the source grid (grid on which the points are searched)
00029 !
00030 ! !INPUT/OUTPUT PARAMETERS:
00031 !
00032 !
00033 ! !OUTPUT PARAMETERS:
00034 !
00035       Integer, Intent (Out)           :: ierror
00036 
00037 !     Returns the error code of PSMILe_Gauss_get_neighbours;
00038 !             ierror = 0 : No error
00039 !             ierror > 0 : Severe error
00040 !
00041 ! !LOCAL VARIABLES
00042 !
00043 !     ... for reduced grid type
00044 !
00045       Type(Grid), Pointer             :: gp
00046 !
00047 ! ---------------------------------------------------------
00048 !
00049       Integer                         :: n, num_local_points
00050 !
00051 !     ... for transformation
00052 !
00053       Integer                         :: off
00054 !
00055       Integer, Allocatable            :: bicu_stencils(:,:)
00056       Integer, Allocatable            :: neigh_point_list(:)     ! contains the local list of
00057                                                                  ! neigh points that are
00058                                                                  ! located on other processes
00059       Integer, Allocatable            :: all_neigh_point_list(:) ! contains all
00060                                                                  ! neigh_point_list's from all
00061                                                                  ! processes
00062       Integer                         :: num_neigh_points
00063       Integer, Allocatable            :: num_remote_neigh_points(:)
00064       Type (block_info), Allocatable  :: sorted_block_info(:)
00065       Integer                         :: curr_block
00066       Integer                         :: num_blocks
00067       Integer                         :: curr_point
00068 
00069       Integer, Allocatable            :: displs(:)
00070       Integer, Allocatable            :: local_send_list(:)
00071       Integer, Allocatable            :: local_recv_list(:)
00072 
00073       Type(integer_vector), Allocatable :: local_get_list(:), 
00074                                            local_put_list(:), 
00075                                            local_put_loc_list(:)
00076 !
00077 !     ... loop variables
00078 !
00079       Integer                         :: i, j
00080 !
00081 !     ... for exchange
00082 !
00083       Integer                         :: npes
00084       Integer                         :: rank
00085       Integer                         :: comm
00086       Integer                         :: status(MPI_STATUS_SIZE)
00087       Integer, Pointer                :: remote_index(:)
00088 !
00089 !     ... for error handling
00090 !
00091       Integer, Parameter              :: nerrp = 2
00092       Integer                         :: ierrp (nerrp)
00093 !
00094 ! !DESCRIPTION:
00095 !
00096 ! Subroutine "PSMILe_Gauss_get_neighbours" determines the 
00097 ! neighbours required from coinciding blocks 
00098 ! for a grid of type "PRISM_Gaussreduced_regvrt".
00099 !
00100 ! In addition it collect the coordinate and method point data
00101 ! of the previously determined neighbours and stores them
00102 ! locally.
00103 !
00104 ! !REVISION HISTORY:
00105 !
00106 !   Date      Programmer   Description
00107 ! ----------  ----------   -----------
00108 ! 13.07.07    H. Ritzdorf  created
00109 ! 30.01.08    R. Redler    revised
00110 ! 13.12.2010  M. Hanke     fixed and simplified computation of star array
00111 ! 25.01.2011  M. Hanke     introduced module psmile_grid_reduced_gauss
00112 ! 15.02.2011  M. Hanke     moved collection of method point data from
00113 !                          psmile_gauss_setup_dble/real to this routine
00114 ! 18.03.2011  M. Hanke     moved a couple of global variables used by reduced
00115 !                          gauss grids from global to local scope
00116 !
00117 !EOP
00118 !----------------------------------------------------------------------
00119 !
00120 !  $Id: psmile_gauss_get_neighbours.F90 3032 2011-03-18 17:29:36Z hanke $
00121 !  $Author: hanke $
00122 !
00123    Character(len=len_cvs_string), save :: mycvs = 
00124        '$Id: psmile_gauss_get_neighbours.F90 3032 2011-03-18 17:29:36Z hanke $'
00125 !
00126 !----------------------------------------------------------------------
00127    !
00128    !  Initialization
00129    !
00130 #ifdef VERBOSE
00131    print 9990, trim(ch_id)
00132 
00133    call psmile_flushstd
00134 #endif /* VERBOSE */
00135    !
00136    ierror = 0
00137    !
00138    gp => Grids(grid_id)
00139 
00140 #ifdef PRISM_ASSERTION
00141    if (gp%grid_type /= PRISM_Gaussreduced_regvrt) then
00142       print *, "grid_type", gp%grid_type
00143       call psmile_assert (__FILE__, __LINE__, &
00144            "This routine is not designed for this grid type")
00145    endif
00146 #endif
00147 !
00148 !===> Get list of required neighbours
00149 !
00150    num_local_points = gp%grid_shape(2,1) - gp%grid_shape(1,1) + 1
00151 
00152    Allocate (bicu_stencils(16,gp%grid_shape(1,1):gp%grid_shape(2,1)), stat = ierror)
00153    if (ierror /= 0) then
00154       ierrp (1) = ierror
00155       ierrp (2) = num_local_points*16
00156 
00157       ierror = PRISM_Error_Alloc
00158       call psmile_error ( ierror, 'bicu_stencils', ierrp, 2, &
00159            __FILE__, __LINE__ )
00160       return
00161    endif
00162 !
00163 !===> Establish the connectivity of cells
00164 !
00165    do i = gp%grid_shape(1,1), gp%grid_shape(2,1)
00166       bicu_stencils(:,i) = psmile_gauss_3d_to_global_1d(grid_id,              &
00167                               psmile_gauss_get_bicu_stencil (grid_id,         &
00168                                  psmile_gauss_local_1d_to_3d(grid_id, i)), 16)
00169    enddo ! i
00170 
00171 #ifdef DEBUGX
00172    print *, ' Initial stencil array with global indices ' 
00173    do i = 1, num_local_points
00174       write( *, '(a13,17i8)') 'Initial stencil ', i, bicu_stencils(:,i)
00175    enddo
00176    call psmile_flushstd
00177 #endif
00178 
00179 
00180    ! if we require points from remote processes
00181    if ( num_local_points /= sum (gp%reduced_gauss_data%nbr_points_per_lat) ) then
00182 
00183       num_blocks = size(gp%reduced_gauss_data%local_block_info)
00184       Allocate (sorted_block_info(num_blocks), &
00185                 stat = ierror)
00186       if (ierror /= 0) then
00187          ierrp (1) = ierror
00188          ierrp (2) = size(gp%reduced_gauss_data%local_block_info)
00189 
00190          ierror = PRISM_Error_Alloc
00191          call psmile_error ( ierror, 'local_block_info', ierrp, 2, &
00192               __FILE__, __LINE__ )
00193          return
00194       endif
00195 
00196       ! sort the list of local block infos in order to optimise access to it
00197       call insertsort_block_info (gp%reduced_gauss_data%local_block_info, &
00198                                   sorted_block_info, num_blocks)
00199 
00200       Allocate (neigh_point_list(num_local_points*15), stat = ierror)
00201       if (ierror /= 0) then
00202          ierrp (1) = ierror
00203          ierrp (2) = num_local_points*15
00204 
00205          ierror = PRISM_Error_Alloc
00206          call psmile_error ( ierror, 'neigh_point_list', ierrp, 2, &
00207               __FILE__, __LINE__ )
00208          return
00209       endif
00210 
00211       num_neigh_points = 0
00212       curr_block = 1
00213 
00214       ! for all local points
00215       do i = gp%grid_shape(1,1), gp%grid_shape(2,1)
00216 
00217          ! for all neighbours of the current local cell
00218          do n = 1, 16
00219 
00220             ! we do not need to check the center of the neigh stencil because
00221             ! this is out local point
00222             if (n == 6) cycle
00223 
00224             ! get the global 1D index of the current point
00225             curr_point = bicu_stencils(n, i)
00226 
00227             ! if no local block was found that contains the current point
00228             if (.not. is_local_point(curr_point, sorted_block_info, num_blocks)) then
00229             
00230                ! add current point to the list of remote point that need to be searched for
00231                ! insert_unique_key ensures, that each point is only once in the list
00232                call insert_unique_key(curr_point, num_neigh_points, &
00233                                       neigh_point_list, num_local_points*15)
00234             endif
00235 
00236          enddo ! n
00237       enddo ! i
00238 
00239 #ifdef DEBUGX
00240       print *, 'List of neighbour requests '
00241       do n = 1, num_neigh_points
00242           print *, '  Entry ', n, ':', neigh_point_list (n) 
00243       enddo
00244       call psmile_flushstd
00245 #endif
00246       !
00247       !  Exchange index lists
00248       !
00249       npes = Comps(gp%comp_id)%size
00250       rank = Comps(gp%comp_id)%rank
00251       comm = Comps(gp%comp_id)%act_comm
00252 
00253       Allocate(num_remote_neigh_points(0:npes-1), displs(0:npes-1), stat = ierror)
00254       if (ierror /= 0) then
00255          ierrp (1) = ierror
00256          ierrp (2) = npes
00257          ierror = PRISM_Error_Alloc
00258          call psmile_error ( ierror, 'num_remote_neigh_points', ierrp, 2, &
00259               __FILE__, __LINE__ )
00260          return
00261       endif
00262 
00263       call MPI_Allgather ( num_neigh_points, 1, MPI_Integer, &
00264                            num_remote_neigh_points(0), 1,    &
00265                            MPI_Integer, comm, ierror )
00266 
00267 #ifdef DEBUGX
00268       print *, 'List of requests'
00269       do n = 0, npes-1
00270           print *, '  Proc # ', n, ':', num_remote_neigh_points(n)
00271       enddo
00272       call psmile_flushstd
00273 #endif
00274 
00275       Allocate( all_neigh_point_list(sum(num_remote_neigh_points)), stat = ierror)
00276       if (ierror /= 0) then
00277          ierrp (1) = ierror
00278          ierrp (2) = sum(num_remote_neigh_points)
00279          ierror = PRISM_Error_Alloc
00280          call psmile_error ( ierror, 'all_neigh_point_list', ierrp, 2, &
00281               __FILE__, __LINE__ )
00282          return
00283       endif
00284 
00285       displs(0) = 0
00286       do n = 1, npes-1
00287          displs(n) = displs (n-1) + num_remote_neigh_points(n-1)
00288       enddo
00289 
00290       ! collect the neigh_point_list's of all other processes
00291       call MPI_Allgatherv ( neigh_point_list(1), num_neigh_points, MPI_Integer,       &
00292                             all_neigh_point_list(1), num_remote_neigh_points, displs, &
00293                             MPI_Integer, comm, ierror )
00294 
00295 #ifdef DEBUGX
00296       print *, 'List of complete requests '
00297       off = 0
00298       do n = 0, npes-1
00299          do i = 1, num_remote_neigh_points(n)
00300             print *, '  Proc # ', n, ' Entry ', i, ':', all_neigh_point_list (off+i) 
00301          enddo
00302          off = off + num_remote_neigh_points(n)
00303       enddo
00304       call psmile_flushstd
00305 #endif
00306 
00307       Allocate( local_send_list(0:npes-1), local_recv_list(0:npes-1), stat = ierror)
00308       if (ierror /= 0) then
00309          ierrp (1) = ierror
00310          ierrp (2) = 2*npes
00311          ierror = PRISM_Error_Alloc
00312          call psmile_error ( ierror, 'local_send/recv_list', ierrp, 2, &
00313               __FILE__, __LINE__ )
00314          return
00315       endif
00316 
00317       local_send_list = 0
00318 
00319       ! for all processes
00320       do n = 0, npes-1
00321 
00322          ! if it is a remote process
00323          if ( n /= rank ) then
00324 
00325             ! for all points the remote process is searching for
00326             do j = displs(n)+1, displs(n)+num_remote_neigh_points(n)
00327 
00328                ! if the point is locally available
00329                ! instead of is_local_point a more optimised function
00330                ! could be used which takes into account, that the neigh
00331                ! points of each process are sorted
00332                if (is_local_point(all_neigh_point_list(j), &
00333                                   sorted_block_info,       &
00334                                   num_blocks)) then
00335 
00336                   local_send_list(n) = local_send_list(n) + 1
00337                else
00338                   all_neigh_point_list(j) = psmile_undef
00339                endif
00340             enddo ! j
00341          endif ! ( n /= rank )
00342       enddo ! n
00343 
00344       !
00345       !  Distribute size of lists
00346       !
00347       call MPI_Alltoall   ( local_send_list, 1, MPI_Integer, &
00348                             local_recv_list, 1, MPI_Integer, comm, ierror )
00349 
00350       !
00351       !  Allocate memory to store lists
00352       !
00353 
00354       Allocate ( local_get_list    (0:npes-1), &
00355                  local_put_list    (0:npes-1), &
00356                  local_put_loc_list(0:npes-1), stat = ierror)
00357       if (ierror /= 0) then
00358          ierrp (1) = ierror
00359          ierrp (2) = 3*npes
00360          ierror = PRISM_Error_Alloc
00361          call psmile_error ( ierror, 'local_get/put/put_loc_list', ierrp, 2, &
00362               __FILE__, __LINE__ )
00363          return
00364       endif
00365 
00366       ! for all processes
00367       do n = 0, npes-1
00368 
00369          Nullify  (local_get_list(n)%vector, &
00370                    local_put_list(n)%vector, &
00371                    local_put_loc_list(n)%vector)
00372 
00373          if ( local_recv_list(n) > 0 ) then
00374             Allocate (local_get_list(n)%vector(local_recv_list(n)), stat = ierror)
00375             if (ierror /= 0) then
00376                ierrp (1) = ierror
00377                ierrp (2) = local_recv_list(n)
00378                ierror = PRISM_Error_Alloc
00379                call psmile_error ( ierror, 'local_get_list vector', ierrp, 2, &
00380                     __FILE__, __LINE__ )
00381                return
00382             endif
00383          endif
00384 
00385          if ( local_send_list(n) > 0 ) then
00386             Allocate (local_put_list(n)%vector(local_send_list(n)), &
00387                       local_put_loc_list(n)%vector(local_send_list(n)), stat = ierror)
00388             if (ierror /= 0) then
00389                ierrp (1) = ierror
00390                ierrp (2) = 2*local_send_list(n)
00391                ierror = PRISM_Error_Alloc
00392                call psmile_error ( ierror, 'local_put_list vector', ierrp, 2, &
00393                     __FILE__, __LINE__ )
00394                return
00395             endif
00396          endif
00397 
00398       enddo
00399       !
00400       !  Fill the list.
00401       !
00402       ! for all processes
00403       do n = 0, npes-1
00404 
00405          ! if it is a remote process and it is supposed to receive data
00406          if ( n /= rank .and. local_send_list(n) > 0) then
00407 
00408             i = 0
00409             ! for all points the remote process is searching for
00410             do j = displs(n)+1, displs(n)+num_remote_neigh_points(n)
00411 
00412                ! if the point is locally available
00413                if (all_neigh_point_list(j) /= psmile_undef) then
00414 
00415                   i = i + 1
00416                   local_put_list(n)%vector(i) = all_neigh_point_list(j)
00417                endif
00418             enddo ! j
00419 
00420 #ifdef PRISM_ASSERTION
00421             if (i /= local_send_list(n)) then
00422 
00423                   print *, "something went terribly wrong here..."
00424                   call psmile_assert (__FILE__, __LINE__, &
00425                                       'I do not know what happened.')
00426             endif
00427 #endif
00428             local_put_loc_list(n)%vector(:) = &
00429                psmile_gauss_1d_global_to_local (grid_id, local_put_list(n)%vector(:), &
00430                                                 i, psmile_undef)
00431          endif ! ( n /= rank )
00432       enddo ! n
00433       !
00434       ! Exchange lists (active process for send/recv op when
00435       !                 respective list is longer than 0)
00436       !
00437       !
00438       do n = 0, npes-1
00439 
00440          if ( local_send_list(n) > 0 ) &
00441             call psmile_bsend ( local_put_list(n)%vector, local_send_list(n), &
00442                                 MPI_Integer, n, GAUSSNEIGHTAG+rank, comm, ierror )
00443       enddo
00444       do n = 0, npes-1
00445 
00446          if ( local_recv_list(n) > 0 ) &
00447             call MPI_Recv ( local_get_list(n)%vector, local_recv_list(n), &
00448                             MPI_Integer, n, GAUSSNEIGHTAG+n, comm, status, ierror )
00449       enddo
00450 
00451       num_neigh_points = sum(local_recv_list)
00452 
00453       Allocate (gp%reduced_gauss_data%remote_index(num_neigh_points), stat = ierror)
00454       if (ierror /= 0) then
00455          ierrp (1) = ierror
00456          ierrp (2) = num_neigh_points
00457 
00458          ierror = PRISM_Error_Alloc
00459          call psmile_error ( ierror, 'remote_index', ierrp, 2, &
00460               __FILE__, __LINE__ )
00461          return
00462       endif
00463 
00464       remote_index => gp%reduced_gauss_data%remote_index
00465 
00466       off = 0
00467       do n = 0, npes-1
00468          if ( local_recv_list(n) > 0 ) then
00469             remote_index(off+1:off+local_recv_list(n)) = &
00470                                local_get_list(n)%vector(1:+local_recv_list(n))
00471             off = off + local_recv_list(n)
00472          endif
00473       enddo
00474 
00475 #ifdef DEBUGX
00476       print *, 'List of remote indices '
00477       do i = 1, num_neigh_points
00478          print *, 'i: ', i, ' index: ', remote_index(i)
00479       enddo
00480       call psmile_flushstd
00481 #endif
00482 
00483       ! collect the method coordinate data from neighbouring processes
00484       if ( gp%corner_pointer%corner_datatype == MPI_DOUBLE_PRECISION ) then
00485 
00486          call get_neigh_method_data_dble (grid_id)
00487 
00488       else if ( gp%corner_pointer%corner_datatype == MPI_REAL ) then
00489 
00490          call get_neigh_method_data_real (grid_id)
00491 
00492       else
00493 
00494          ierror = PRISM_Error_Internal
00495          call psmile_error ( ierror, 'datatype is currently not supported', &
00496               (/-1/), 0, __FILE__, __LINE__ )
00497       endif
00498 
00499       do i = 0, npes-1
00500          if (associated(local_get_list(i)%vector)) deallocate(local_get_list(i)%vector)
00501       enddo
00502       do i = 0, npes-1
00503          if (associated(local_put_list(i)%vector)) deallocate(local_put_list(i)%vector)
00504       enddo
00505       do i = 0, npes-1
00506          if (associated(local_put_loc_list(i)%vector)) deallocate(local_put_loc_list(i)%vector)
00507       enddo
00508 
00509       Deallocate ( num_remote_neigh_points, &
00510                    displs,                  &
00511                    all_neigh_point_list,    &
00512                    sorted_block_info,       &
00513                    local_send_list,         &
00514                    local_recv_list,         &
00515                    local_get_list,          &
00516                    local_put_list,          &
00517                    local_put_loc_list,      &
00518                    neigh_point_list )
00519 
00520    endif ! num_local_points /= sum (gp%reduced_gauss_data%nbr_points_per_lat)
00521 
00522    Allocate (gp%reduced_gauss_data%local_1d_stencil_lookup(16,&
00523                gp%grid_shape(1,1):gp%grid_shape(2,1)), stat = ierror)
00524    if (ierror /= 0) then
00525       ierrp (1) = ierror
00526       ierrp (2) = gp%grid_shape(2,1)-gp%grid_shape(1,1)+1
00527 
00528       ierror = PRISM_Error_Alloc
00529       call psmile_error ( ierror, 'local_1d_stencil_lookup', ierrp, 2, &
00530            __FILE__, __LINE__ )
00531       return
00532    endif
00533 
00534    ! computes a lookup table which is later used for optimisation
00535    do i = gp%grid_shape(1,1), gp%grid_shape(2,1)
00536       gp%reduced_gauss_data%local_1d_stencil_lookup(:,i) =  &
00537          psmile_gauss_3d_to_local_1d(grid_id,               &
00538             psmile_gauss_get_bicu_stencil (grid_id,         &
00539                psmile_gauss_local_1d_to_3d(grid_id, i)),    &
00540          16, psmile_undef, .true.)
00541    enddo ! i
00542 
00543    !
00544    !===> Free Memory
00545    !
00546 
00547    Deallocate ( bicu_stencils )
00548 
00549    !
00550    !===> All done
00551    !
00552 #ifdef VERBOSE
00553    print 9980, trim(ch_id), num_neigh_points
00554 
00555    call psmile_flushstd
00556 #endif /* VERBOSE */
00557    !
00558    !  Formats:
00559    !
00560 9990 format (1x, a, ': psmile_gauss_get_neighbours')
00561 9980 format (1x, a, ': psmile_gauss_get_neighbours: eof, number of requested points ', i8)
00562 
00563    contains
00564 
00565 !---------------------------------------------------------------------------------------------------
00566 
00567       ! creates a block info list which is sorted by the global 1d indices
00568       ! the algorithm is a standard insert sort with the exception that
00569       ! this is not in-place
00570       subroutine insertsort_block_info (in_block_info, out_block_info, num_blocks)
00571 
00572          implicit none
00573 
00574          integer, intent(in)            :: num_blocks         
00575          type (block_info), intent(in)  :: in_block_info(num_blocks)
00576          type (block_info), intent(out) :: out_block_info(num_blocks)
00577 
00578          integer :: i, j
00579 
00580          out_block_info(1) = in_block_info(1)
00581 
00582          do i = 2, num_blocks
00583 
00584             j = i
00585 
00586             do while (out_block_info(j-1)%global_1d_start_idx >  &
00587                       in_block_info(i)%global_1d_start_idx .and. &
00588                       j > 1)
00589 
00590                out_block_info(j) = out_block_info(j-1)
00591 
00592                j = j-1
00593             enddo
00594 
00595             out_block_info(j) = in_block_info(i)
00596          enddo ! i
00597       end subroutine insertsort_block_info
00598 
00599 !---------------------------------------------------------------------------------------------------
00600 
00601       ! insertes an integer value into a sorted list
00602       ! the resulting list is sorted and every key in the list is unique
00603       subroutine insert_unique_key (key, num_keys, list, list_size)
00604 
00605          implicit none
00606 
00607          integer, intent(in)    :: key, list_size
00608          integer, intent(inout) :: num_keys, list(list_size)
00609 
00610          integer :: insert_position
00611 
00612          ! handle special case in which the list does not contain any key
00613          if (num_keys == 0) then
00614             list(1) = key
00615             num_keys = 1
00616             return
00617          endif
00618 
00619          ! find the insertposition
00620          do insert_position = 1, num_keys
00621             if (list(insert_position) >= key) exit
00622          enddo
00623 
00624          ! if the list did not contain any value that was bigger or
00625          ! equal to the key
00626          if (insert_position == num_keys .and. &
00627              list(insert_position) < key) then
00628 
00629             ! append key to the end of the list
00630             list(insert_position+1) = key
00631             num_keys = num_keys + 1
00632 
00633          ! if the key is not already in the list
00634          else if (list(insert_position) /= key) then
00635 
00636             ! put key into the list and move all bigger
00637             ! values one step up in the list
00638             list (insert_position+1:num_keys+1) = list (insert_position:num_keys)
00639             list (insert_position) = key
00640             num_keys = num_keys + 1
00641          endif
00642       end subroutine insert_unique_key
00643 
00644 !---------------------------------------------------------------------------------------------------
00645 
00646       ! this function takes a 1D global point index and determines whether
00647       ! the point is in one of the local blocks
00648       logical function is_local_point (point_1d, sorted_block_info, num_blocks)
00649 
00650          implicit none
00651 
00652          integer, intent(in)           :: point_1d, num_blocks
00653          type (block_info), intent(in) :: sorted_block_info(num_blocks)
00654 
00655          integer, save :: curr_block = 1
00656 
00657          ! try to find a block which contains the current point
00658          do while (point_1d < sorted_block_info(curr_block)%global_1d_start_idx .and. &
00659                    curr_block > 1)
00660             curr_block = curr_block - 1
00661          enddo
00662 
00663          do while (point_1d > sorted_block_info(curr_block)%global_1d_end_idx .and. &
00664                    curr_block < num_blocks)
00665             curr_block = curr_block + 1
00666          enddo
00667 
00668          ! if the given point is in the block
00669          is_local_point = point_1d >= sorted_block_info(curr_block)%global_1d_start_idx &
00670                           .and. &
00671                           point_1d <= sorted_block_info(curr_block)%global_1d_end_idx
00672       end function is_local_point
00673 
00674 !---------------------------------------------------------------------------------------------------
00675 
00676       subroutine get_neigh_method_data_dble (grid_id)
00677 
00678          use prism_constants
00679          use psmile
00680 
00681          implicit none
00682 
00683          integer, intent (in)          :: grid_id
00684 
00685          integer :: method_id
00686 
00687          type (coords_block), pointer  :: coords_pointer
00688          integer                       :: npes, rank, comm
00689 
00690          double precision, allocatable :: send_buffer(:)
00691          double precision, allocatable :: recv_buffer(:)
00692 
00693          integer, pointer              :: idx_list(:) 
00694 
00695          integer                       :: off, i, n, lstatus(MPI_STATUS_SIZE), ierror, tag, 
00696                                           send_buffer_size, recv_buffer_size
00697 
00698          !
00699          !-----------------------------------------------------------------
00700          ! Loop over all method_id for this grid_id and collect required
00701          ! method coordinates from remote pes if necessary. The gauss2
00702          ! array will be used later in psmile_mg_method_gauss2.
00703          ! The connectivity is based on the cell indices, however various
00704          ! sets of points may be specified. The assumption is that the
00705          ! connectivity is identical for all sets of points.
00706          !-----------------------------------------------------------------
00707          !
00708          send_buffer_size = 1
00709          recv_buffer_size = 1
00710 
00711          do method_id = 1, Number_of_Methods_allocated
00712             if ( Methods(method_id)%grid_id == grid_id ) then
00713                send_buffer_size = max (send_buffer_size, maxval(local_send_list(:)))
00714                recv_buffer_size = max (recv_buffer_size, maxval(local_recv_list(:)))
00715             endif ! ( Methods(method_id)%grid_id == grid_id )
00716          enddo ! method_id
00717 
00718          allocate(send_buffer(2*send_buffer_size), &
00719                   recv_buffer(2*recv_buffer_size), STAT = ierror)
00720 
00721          if ( ierror > 0 ) then
00722             call psmile_error ( PRISM_Error_Alloc, 'send_buffer, recv_buffer', &
00723                (/ierror, 2 * send_buffer_size + 2 * recv_buffer_size/), 3,   &
00724                __FILE__, __LINE__ )
00725             ierror = PRISM_Error_Alloc
00726             return
00727          endif
00728 
00729          do method_id = 1, Number_of_Methods_allocated
00730 
00731             if ( Methods(method_id)%grid_id == grid_id ) then
00732 
00733                coords_pointer => Methods(method_id)%coords_pointer
00734 
00735                npes = Comps(grids(grid_id)%comp_id)%size
00736                rank = Comps(grids(grid_id)%comp_id)%rank
00737                comm = Comps(grids(grid_id)%comp_id)%act_comm
00738 
00739                do n = 0, npes-1
00740 
00741                   if ( local_send_list(n) > 0 ) then
00742 
00743                      idx_list => local_put_loc_list(n)%vector 
00744 
00745                      ! fill buffer with x and y method coordinates
00746                      do i = 1, local_send_list(n)
00747                         send_buffer(i)              = &
00748                            coords_pointer%coords_dble(1)%vector (idx_list (i))
00749                         send_buffer(i+local_send_list(n)) = &
00750                            coords_pointer%coords_dble(2)%vector (idx_list (i))
00751                      enddo
00752 
00753                      call psmile_bsend ( send_buffer, 2*local_send_list(n), &
00754                         MPI_DOUBLE_PRECISION, n, rank, comm, ierror )
00755                   endif
00756                enddo ! n
00757 
00758                allocate(Methods(method_id)%gauss2_dble(1)%vector(sum (local_recv_list(:))), &
00759                         Methods(method_id)%gauss2_dble(2)%vector(sum (local_recv_list(:))), &
00760                         STAT = ierror)
00761 
00762                if ( ierror > 0 ) then
00763                   call psmile_error ( PRISM_Error_Alloc, 'gauss_dble%vector', &
00764                      (/ierror, 2 * sum (local_recv_list(:))/), 2, __FILE__, __LINE__ )
00765                   ierror = PRISM_Error_Alloc
00766                   return
00767                endif
00768 
00769                !
00770                ! Store remote coordinates.
00771                !
00772                ! Value at Methods(method_id)%gauss2_dble(2)%vector(i) corresponds
00773                ! to global index at grids(grid_id)%remote_index(i)
00774                !
00775                off = 0
00776                do n = 0, npes-1
00777 
00778                   if ( local_recv_list(n) > 0 ) then
00779 
00780                      tag = n
00781                      call MPI_Recv ( recv_buffer, 2*local_recv_list(n), &
00782                         MPI_DOUBLE_PRECISION, n, tag, comm, lstatus, ierror )
00783 
00784                      Methods(method_id)%gauss2_dble(1)%vector(off+1:off+local_recv_list(n)) = &
00785                         recv_buffer(1:local_recv_list(n))
00786 
00787                      Methods(method_id)%gauss2_dble(2)%vector(off+1:off+local_recv_list(n)) = &
00788                         recv_buffer(local_recv_list(n)+1:2*local_recv_list(n))
00789 
00790                      off = off + local_recv_list(n)
00791                   endif
00792                enddo ! n
00793             endif ! Methods(method_id)%grid_id
00794          enddo ! method_id
00795 
00796          deallocate ( send_buffer )
00797          deallocate ( recv_buffer )
00798 
00799       end subroutine get_neigh_method_data_dble
00800 
00801 !---------------------------------------------------------------------------------------------------
00802 
00803       subroutine get_neigh_method_data_real (grid_id)
00804 
00805          use prism_constants
00806          use psmile
00807 
00808          implicit none
00809 
00810          integer, intent (in)          :: grid_id
00811 
00812          integer :: method_id
00813 
00814          type (coords_block), pointer  :: coords_pointer
00815          integer                       :: npes, rank, comm
00816 
00817          real, allocatable             :: send_buffer(:)
00818          real, allocatable             :: recv_buffer(:)
00819 
00820          integer, pointer              :: idx_list(:)
00821 
00822          integer                       :: off, i, n, lstatus(MPI_STATUS_SIZE), ierror, tag, 
00823                                           send_buffer_size, recv_buffer_size
00824 
00825          !
00826          !-----------------------------------------------------------------
00827          ! Loop over all method_id for this grid_id and collect required
00828          ! method coordinates from remote pes if necessary. The gauss2
00829          ! array will be used later in psmile_mg_method_gauss2.
00830          ! The connectivity is based on the cell indices, however various
00831          ! sets of points may be specified. The assumption is that the
00832          ! connectivity is identical for all sets of points.
00833          !-----------------------------------------------------------------
00834          !
00835          send_buffer_size = 1
00836          recv_buffer_size = 1
00837 
00838          do method_id = 1, Number_of_Methods_allocated
00839             if ( Methods(method_id)%grid_id == grid_id ) then
00840                send_buffer_size = max (send_buffer_size, maxval(local_send_list(:)))
00841                recv_buffer_size = max (recv_buffer_size, maxval(local_recv_list(:)))
00842             endif ! ( Methods(method_id)%grid_id == grid_id )
00843          enddo ! method_id
00844 
00845          allocate(send_buffer(2*send_buffer_size), &
00846                   recv_buffer(2*recv_buffer_size), STAT = ierror)
00847 
00848          if ( ierror > 0 ) then
00849             call psmile_error ( PRISM_Error_Alloc, 'send_buffer, recv_buffer', &
00850                (/ierror, 2 * send_buffer_size + 2 * recv_buffer_size/), 3,   &
00851                __FILE__, __LINE__ )
00852             ierror = PRISM_Error_Alloc
00853             return
00854          endif
00855 
00856          do method_id = 1, Number_of_Methods_allocated
00857 
00858             if ( Methods(method_id)%grid_id == grid_id ) then
00859 
00860                coords_pointer => Methods(method_id)%coords_pointer
00861 
00862                npes = Comps(grids(grid_id)%comp_id)%size
00863                rank = Comps(grids(grid_id)%comp_id)%rank
00864                comm = Comps(grids(grid_id)%comp_id)%act_comm
00865                
00866                do n = 0, npes-1
00867 
00868                   if ( local_send_list(n) > 0 ) then
00869 
00870                      idx_list => local_put_loc_list(n)%vector
00871 
00872                      ! fill buffer with x and y method coordinates
00873                      do i = 1, local_send_list(n)
00874                         send_buffer(i)              = &
00875                            coords_pointer%coords_real(1)%vector (idx_list (i))
00876                         send_buffer(i+local_send_list(n)) = &
00877                            coords_pointer%coords_real(2)%vector (idx_list (i))
00878                      enddo
00879 
00880                      call psmile_bsend ( send_buffer, 2*local_send_list(n), &
00881                         MPI_REAL, n, rank, comm, ierror )
00882                   endif
00883                enddo ! n
00884 
00885                allocate(Methods(method_id)%gauss2_real(1)%vector(sum (local_recv_list(:))), &
00886                         Methods(method_id)%gauss2_real(2)%vector(sum (local_recv_list(:))), &
00887                         STAT = ierror)
00888 
00889                if ( ierror > 0 ) then
00890                   call psmile_error ( PRISM_Error_Alloc, 'gauss_real%vector', &
00891                      (/ierror, 2 * sum (local_recv_list(:))/), 2, __FILE__, __LINE__ )
00892                   ierror = PRISM_Error_Alloc
00893                   return
00894                endif
00895 
00896                !
00897                ! Store remote coordinates.
00898                !
00899                ! Value at Methods(method_id)%gauss2_real(2)%vector(i) corresponds
00900                ! to global index at grids(grid_id)%remote_index(i)
00901                !
00902                off = 0
00903                do n = 0, npes-1
00904 
00905                   if ( local_recv_list(n) > 0 ) then
00906 
00907                      tag = n
00908                      call MPI_Recv ( recv_buffer, 2*local_recv_list(n), &
00909                         MPI_REAL, n, tag, comm, lstatus, ierror )
00910 
00911                      Methods(method_id)%gauss2_real(1)%vector(off+1:off+local_recv_list(n)) = &
00912                         recv_buffer(1:local_recv_list(n))
00913 
00914                      Methods(method_id)%gauss2_real(2)%vector(off+1:off+local_recv_list(n)) = &
00915                         recv_buffer(local_recv_list(n)+1:2*local_recv_list(n))
00916 
00917                      off = off + local_recv_list(n)
00918                   endif
00919                enddo ! n
00920             endif ! Methods(method_id)%grid_id
00921          enddo ! method_id
00922 
00923          deallocate ( send_buffer )
00924          deallocate ( recv_buffer )
00925          
00926       end subroutine get_neigh_method_data_real
00927 
00928  end subroutine PSMILe_Gauss_get_neighbours
00929  

Generated on 1 Dec 2011 for Oasis4 by  doxygen 1.6.1