psmile_gauss_get_neighbours.F90

Go to the documentation of this file.
00001 !-----------------------------------------------------------------------
00002 ! Copyright 2006-2010, NEC Europe Ltd., London, UK.
00003 ! Copyright 2011, DKRZ, Hamburg, Germany.
00004 ! All rights reserved. Use is subject to OASIS4 license terms.
00005 !-----------------------------------------------------------------------
00006 !BOP
00007 !
00008 ! !ROUTINE: PSMILe_Gauss_get_neighbours
00009 !
00010 ! !INTERFACE:
00011 
00012       subroutine psmile_gauss_get_neighbours (grid_id, ierror)
00013 !
00014 ! !USES:
00015 !
00016       use PRISM_constants
00017 !
00018       use PSMILe, dummy_interface => PSMILe_Gauss_get_neighbours
00019 !
00020       use psmile_grid_reduced_gauss
00021 
00022       Implicit none
00023 !
00024 ! !INPUT PARAMETERS:
00025 !
00026       Integer,         Intent (In)    :: grid_id
00027 
00028 !     Grid Id of the source grid (grid on which the points are searched)
00029 !
00030 ! !INPUT/OUTPUT PARAMETERS:
00031 !
00032 !
00033 ! !OUTPUT PARAMETERS:
00034 !
00035       Integer, Intent (Out)           :: ierror
00036 
00037 !     Returns the error code of PSMILe_Gauss_get_neighbours;
00038 !             ierror = 0 : No error
00039 !             ierror > 0 : Severe error
00040 !
00041 ! !LOCAL VARIABLES
00042 !
00043 !     ... for reduced grid type
00044 !
00045       Integer                         :: gnbr_lats, nbr_lats
00046 !
00047       Type(Grid), Pointer             :: gp
00048       Integer, Pointer                :: points_per_lat(:,:)
00049       Integer, Pointer                :: gpoints_per_lat(:)
00050       Integer, Pointer                :: partition (:, :)
00051 
00052       Integer, Pointer                :: star (:, :)
00053 
00054 !     Index of  N-S 'Connectivity' of method points of GaussReduced Grid with
00055 !        star (i, 1) = Index of northern+1 neighbour of "i"-th point
00056 !        star (i, 2) = Index of northern neighbour of "i"-th point
00057 !        star (i, 3) = Index of southern neighbour of "i"-th point
00058 !        star (i, 4) = Index of latitude (local j-row)   of "i"-th point
00059 
00060       Integer, Pointer                :: face (:, :)
00061       integer                         :: tmp_face (16)
00062 
00063 !     Index of the neighbouring point with
00064 !
00065 !        face(i, 1) =   : southeast face (-1,-1)
00066 !        face(i, 2) =   : southern  face ( 0,-1)
00067 !        face(i, 3) =   : southwest face (+1,-1)
00068 !        face(i, 4) =   : eastern   face (-1, 0)
00069 !        face(i, 5) = i : center         ( 0, 0)
00070 !        face(i, 6) =   : western   face (+1, 0)
00071 !        face(i, 7) =   : northeast face (-1,+1)
00072 !        face(i, 8) =   : northern  face ( 0,+1)
00073 !        face(i, 9) =   : northwest face (+1,+1)
00074 !
00075 !        face(i,10) =   : north     face (+2,-1)
00076 !        face(i,11) =   : north     face (+2, 0)
00077 !        face(i,12) =   : north     face (+2,+1)
00078 !        face(i,13) =   : northeast face (+2,+2)
00079 !        face(i,14) =   : east      face (+1,+2)
00080 !        face(i,15) =   : east      face ( 0,+2)
00081 !        face(i,16) =   : east      face (-1,+2)
00082 !
00083 !        face:
00084 !
00085 !        10    11    12    13
00086 !
00087 !
00088 !         7     8     9    14
00089 !
00090 !
00091 !         4     5     6    15
00092 !
00093 !
00094 !         1     2     3    16
00095 !
00096 ! ---------------------------------------------------------
00097 !
00098       Integer, Pointer                :: l_irange (:, :)
00099       Integer, Allocatable            :: irange (:, :)
00100 !
00101       Logical                         :: partitioned
00102       Integer                         :: n, np, nreq
00103       Integer, Allocatable            :: neighs_req (:, :)
00104       Integer, Allocatable            :: neighs_tmp (:)
00105 !
00106 !     ... for transformation
00107 !
00108       Integer                         :: off
00109 !
00110       Integer, Pointer                :: global_beg (:)
00111       Integer, Pointer                :: global_end (:)
00112       Integer, Pointer                :: l2g(:,:)
00113       Integer, Pointer                :: g2l(:,:)
00114       Integer, Allocatable            :: g2l_index  (:)
00115 !
00116 !     ... loop variables
00117 !
00118       Integer                         :: i, ii, j, jg
00119       Integer                         :: jold
00120       Integer                         :: jss, jns
00121       Integer                         :: ibeg, iend
00122 !
00123 !     ... for exchange
00124 !
00125       Integer                         :: npes
00126       Integer                         :: rank
00127       Integer                         :: comm
00128       Integer                         :: tag
00129       Integer                         :: nreq_tot
00130       Integer                         :: status(MPI_STATUS_SIZE)
00131       Integer, Allocatable            :: displs(:)
00132       Integer, Allocatable            :: n_requests(:)
00133       Integer, Allocatable            :: neighs_reqtot(:,:)
00134       Integer, Pointer                :: send_list(:)
00135       Integer, Pointer                :: recv_list(:)
00136       Integer, Pointer                :: remote_index(:)
00137       integer                         :: point_3d(ndim_3d)
00138       Integer                         :: neigh_lat_idx(3)
00139 !
00140 !     ... for error handling
00141 !
00142       Integer, Parameter              :: nerrp = 2
00143       Integer                         :: ierrp (nerrp)
00144 !
00145 ! !DESCRIPTION:
00146 !
00147 ! Subroutine "PSMILe_Gauss_get_neighbours" determines the 
00148 ! neighbours required from coinciding blocks 
00149 ! for a grid of type "PRISM_Gaussreduced_regvrt".
00150 !
00151 ! Remark:
00152 !
00153 ! When constructing the 'connectivity' (star) the following
00154 ! assumptions are made:
00155 !
00156 ! * The global Gauss-reduced grid is cyclic in longitude,
00157 !   e.g. it covers the complete longitude range.
00158 ! * Longitudes are equidistant within a latitude band.
00159 !
00160 ! In addition it collect the coordinate and method point data
00161 ! of the previously determined neighbours and stores them
00162 ! locally.
00163 !
00164 ! !REVISION HISTORY:
00165 !
00166 !   Date      Programmer   Description
00167 ! ----------  ----------   -----------
00168 ! 13.07.07    H. Ritzdorf  created
00169 ! 30.01.08    R. Redler    revised
00170 ! 13.12.10    M. Hanke     fixed and simplified computation of star array
00171 ! 25.01.11    M. Hanke     introduced module psmile_grid_reduced_gauss
00172 ! 15.02.11    M. Hanke     moved collection of method point data from
00173 !                          psmile_gauss_setup_dble/real to this routine
00174 !
00175 !EOP
00176 !----------------------------------------------------------------------
00177 !
00178 !  $Id: psmile_gauss_get_neighbours.F90 3008 2011-03-09 13:44:03Z hanke $
00179 !  $Author: hanke $
00180 !
00181    Character(len=len_cvs_string), save :: mycvs = 
00182        '$Id: psmile_gauss_get_neighbours.F90 3008 2011-03-09 13:44:03Z hanke $'
00183 !
00184 !----------------------------------------------------------------------
00185    !
00186    !  Initialization
00187    !
00188 #ifdef VERBOSE
00189    print 9990, trim(ch_id)
00190 
00191    call psmile_flushstd
00192 #endif /* VERBOSE */
00193    !
00194    ierror = 0
00195    nreq = 0
00196    !
00197    gp => Grids(grid_id)
00198    !
00199    !     Special data for grids of type "PRISM_Gaussreduced_regvrt"
00200    !
00201    !     Local values
00202    !
00203    partition      => gp%partition
00204    points_per_lat => gp%extent
00205    l_irange       => gp%g_irange
00206    nbr_lats       =  size(gp%extent(:,1))
00207    !
00208    !     Global values
00209    !
00210    gnbr_lats       =  gp%nbr_latitudes
00211    gpoints_per_lat => gp%nbr_points_per_lat
00212 
00213 #ifdef PRISM_ASSERTION
00214    if (gp%grid_type /= PRISM_Gaussreduced_regvrt) then
00215       print *, "grid_type", gp%grid_type
00216       call psmile_assert (__FILE__, __LINE__, &
00217            "This routine is not designed for this grid type")
00218    endif
00219 #endif
00220    !
00221    ! global_beg = (Global) Frist index (in lon direction) of
00222    !              global lat index "jg"
00223    ! global_end = (Global) Last index (in lon direction) of
00224    !              global lat index "jg"
00225    !
00226    Allocate (gp%global_beg (gnbr_lats), gp%global_end(gnbr_lats), stat = ierror)
00227    if (ierror /= 0) then
00228       ierrp (1) = ierror
00229       ierrp (2) = 2*gnbr_lats
00230 
00231       ierror = PRISM_Error_Alloc
00232       call psmile_error ( ierror, 'global_beg, global_end', ierrp, 2, &
00233            __FILE__, __LINE__ )
00234       return
00235    endif
00236    !
00237    global_beg => gp%global_beg
00238    global_end => gp%global_end
00239    !
00240    ! l2g        = Transformation from local  lat index "j"
00241    !                             into global lat index "jg"
00242    ! l2g(j,1)   = Global j index jg of j-th parition
00243    !
00244    ! l2g(j,2)   = Next local j partition index with same global index jg
00245    !              -1 : end of list reached
00246    !
00247    ! g2l(jg,1)   = First local j partition which is contained in global index jg
00248    !               -1 : undefined
00249    ! g2l(jg,2)   = last  local j partition which is contained in global index jg
00250    !
00251    Allocate (gp%l2g (nbr_lats,2), gp%g2l(gnbr_lats,2), stat = ierror)
00252    if (ierror /= 0) then
00253       ierrp (1) = ierror
00254       ierrp (2) = nbr_lats * 2 + gnbr_lats * 2
00255 
00256       ierror = PRISM_Error_Alloc
00257       call psmile_error ( ierror, 'l2g, g2l', ierrp, 2, &
00258            __FILE__, __LINE__ )
00259       return
00260    endif
00261    !
00262    l2g => gp%l2g
00263    g2l => gp%g2l
00264    !
00265    Allocate (irange (nbr_lats, 2), stat = ierror)
00266    if (ierror /= 0) then
00267       ierrp (1) = ierror
00268       ierrp (2) = nbr_lats * 2
00269 
00270       ierror = PRISM_Error_Alloc
00271       call psmile_error ( ierror, 'irange', ierrp, 2, &
00272            __FILE__, __LINE__ )
00273       return
00274    endif
00275    !
00276    ! ==> Get start index "global_beg" for each global line in lat direction
00277    !
00278    global_beg(1) = 1
00279    off = 1
00280 
00281    !cdir vector
00282    do jg = 2, gnbr_lats
00283       global_beg(jg) = off + gpoints_per_lat(jg-1)
00284       off = off + gpoints_per_lat(jg-1)
00285    enddo
00286    !
00287    !===> Get end index "global_end" for each global line in lat direction
00288    !
00289    off = 0
00290    !cdir vector
00291    do jg = 1, gnbr_lats 
00292       global_end (jg) = off + gpoints_per_lat(jg)
00293       off = off + gpoints_per_lat(jg)
00294    end do
00295    !
00296    !===> Get transformation from local lat index into global lat index
00297    !     and vice versa
00298    !
00299    g2l (:, 1) = -1
00300    l2g (:, 2) = -1
00301    !
00302    do j = 1, nbr_lats
00303 
00304       do jg = 1, gnbr_lats 
00305          if (partition(j,1) < global_end(jg)) exit
00306       end do
00307 
00308       if (g2l(jg,1) == -1) then
00309          g2l(jg,1) = j
00310          !           ASSERT (l2g(j,2) == -1)
00311       else
00312          jold = g2l (jg,2)
00313          !           ASSERT (l2g(jold,2) == -1)
00314          l2g (jold,2) = j
00315       endif
00316       !
00317       l2g (j, 1) = jg
00318       g2l (jg,2) = j
00319    end do
00320    !
00321 #ifdef PRISM_ASSERTION
00322    do j = 1, nbr_lats
00323       if (l2g(j,1) > gnbr_lats) exit
00324    end do
00325    !
00326    if (j <= nbr_lats) then
00327       print *, "j, partition(j)", j, partition(j, 1)
00328       call psmile_assert (__FILE__, __LINE__, &
00329            "offset not found")
00330    endif
00331 #endif
00332    !
00333    !===> Deterine ranges in latitudes in global space
00334    !
00335    do j = 1, nbr_lats
00336       irange (j,1) = partition(j,1) + 1
00337       irange (j,2) = partition(j,1) + points_per_lat(j,1)
00338    end do
00339    !
00340    !===> Deterine table for global to local index
00341    !
00342    !     This is not the optimal way to do this. Usually
00343    !     the list of boundary data to be send is much smaller
00344    !     than the range ibeg:iend. A real hash table might
00345    !     be a better choice.
00346    !
00347    ibeg = global_beg(l2g(1,1))
00348    iend = global_end(l2g(nbr_lats,1))
00349    !
00350    Allocate (g2l_index(ibeg:iend), stat = ierror)
00351    if (ierror /= 0) then
00352       ierrp (1) = ierror
00353       ierrp (2) = iend - ibeg + 1
00354 
00355       ierror = PRISM_Error_Alloc
00356       call psmile_error ( ierror, 'g2l_index', ierrp, 2, &
00357            __FILE__, __LINE__ )
00358       return
00359    endif
00360    !
00361    g2l_index = PSMILe_Undef
00362    do j = 1, nbr_lats
00363       do i = irange (j,1), irange(j,2)
00364          g2l_index (i) = l_irange(j,1) + i - irange(j,1)
00365       enddo
00366    enddo
00367    !
00368    !===> Get list of required neighbours
00369    !
00370    !     Determine the N-S connectivity in the global Gauss domain
00371    !
00372    np = sum(points_per_lat(:,1))
00373 
00374    Allocate (gp%star(np,4), gp%face(np,16), stat = ierror)
00375    if (ierror /= 0) then
00376       ierrp (1) = ierror
00377       ierrp (2) = np*19
00378 
00379       ierror = PRISM_Error_Alloc
00380       call psmile_error ( ierror, 'star and face', ierrp, 2, &
00381            __FILE__, __LINE__ )
00382       return
00383    endif
00384 
00385    star => gp%star
00386    face => gp%face
00387 
00388    do i = 1, sum(gp%extent(:,1))
00389 
00390       point_3d = psmile_gauss_local_1d_to_3d (grid_id, i)
00391 
00392       star(i,4) = g2l (point_3d(2),1)
00393 
00394       neigh_lat_idx(3) = max (point_3d(2)-1, 1) ! row index of southern neighbour (if we are on the south pole
00395                                                 ! then our neighbour is on the other side of the pole)
00396       neigh_lat_idx(2) = min (point_3d(2)+1, gnbr_lats) ! row index of northern neighbour (if we are on the
00397                                                         ! north pole then our neighbour is on the other side
00398                                                         ! of the pole)
00399       neigh_lat_idx(1) = min (point_3d(2)+2, gnbr_lats)
00400       if (point_3d(2) == gnbr_lats) neigh_lat_idx(1) = gnbr_lats - 1
00401 
00402       ! for all neighbours
00403       do j = 1, 3
00404 
00405          ! if the point is supposed to be on the other side of the pole
00406          if (point_3d(2) == neigh_lat_idx(j) .or. &
00407                j == 1 .and. point_3d(2) >= gnbr_lats -1) then
00408 
00409             star(i,j) = get_gauss_opposite_neighbour_cell(point_3d(1),                  &
00410                                                           gpoints_per_lat(point_3d(2)), &
00411                                                           gpoints_per_lat(neigh_lat_idx(j)))
00412          else
00413 
00414             star(i,j) = get_gauss_neighbour_cell(point_3d(1),                  &
00415                                                  gpoints_per_lat(point_3d(2)), &
00416                                                  gpoints_per_lat(neigh_lat_idx(j)))
00417          endif
00418 
00419          star(i,j) = star(i,j) + global_beg(neigh_lat_idx(j)) - 1
00420 
00421       enddo ! j = 1, 3
00422    enddo !  i = 1, sum(gp%extent, 1)
00423 !
00424 !===> Establish the connectivity of cells
00425 !
00426    face = -999
00427 
00428    ii = 0
00429 
00430    do i = 1, sum(gp%extent(:,1))
00431       tmp_face = psmile_gauss_3d_to_global_1d(grid_id,              &
00432                     psmile_gauss_get_bicu_stencil (grid_id,         &
00433                        psmile_gauss_local_1d_to_3d(grid_id, i)), 16)
00434 
00435       face(i,1:3)   = tmp_face(1:3)
00436       face(i,4:6)   = tmp_face(5:7)
00437       face(i,7:9)   = tmp_face(9:11)
00438       face(i,10:13) = tmp_face(13:16)
00439       face(i,14)    = tmp_face(12)
00440       face(i,15)    = tmp_face(8)
00441       face(i,16)    = tmp_face(4)
00442    enddo
00443 
00444 #ifdef DEBUGX
00445    print *, ' Initial face array with global indices ' 
00446    do i = 1, np
00447       write( *, '(a13,10i8)') 'Initial face ', i, face(i,1:9)
00448    enddo
00449 
00450    print *, ' Initial face array with global indices ' 
00451    do i = 1, np
00452       write( *, '(a13,i6,1x,16i8)') 'Initial face ', i, &
00453               face(i,1:3), face(i,16), &
00454               face(i,4:6), face(i,15), &
00455               face(i,7:9), face(i,14), &
00456               face(i,10:13)
00457    enddo
00458    call psmile_flushstd
00459 #endif
00460 
00461    !
00462    ! ===> Get an upper estimate of the number of requested points
00463    !      (but only perform this task if the grid is partitioned)
00464    !      The upper estimate nreq will be corrected later.
00465 
00466    partitioned = (np /= global_end(gnbr_lats) - global_beg(1) + 1)
00467 
00468    if ( partitioned ) then
00469 
00470       ibeg = global_beg(l2g(1,1))
00471       iend = global_end(l2g(nbr_lats,1))
00472       !
00473       ii = 0
00474       !
00475       do j = 1, nbr_lats
00476          do n = 1, 16
00477             do i = l_irange (j,1), l_irange(j,2)
00478                if ( face(i,n) >= ibeg .and. face(i,n) <= iend ) then
00479                   if ( g2l_index(face(i,n)) == PSMILe_Undef ) ii = ii + 1
00480                else
00481                    ii = ii + 1
00482                endif
00483             enddo
00484          enddo
00485       enddo
00486 
00487       nreq = ii
00488 
00489       if ( nreq > 0 ) then
00490          Allocate (neighs_tmp (nreq), stat = ierror)
00491          if (ierror /= 0) then
00492             ierrp (1) = ierror
00493             ierrp (2) = nreq
00494 
00495             ierror = PRISM_Error_Alloc
00496             call psmile_error ( ierror, 'neighs_tmp', ierrp, 2, &
00497                  __FILE__, __LINE__ )
00498             return
00499          endif
00500       endif
00501 
00502       ii = 0
00503 
00504       do j = 1, nbr_lats
00505          do n = 1, 16
00506             do i = l_irange (j,1), l_irange(j,2)
00507                if ( face(i,n) >= ibeg .and. face(i,n) <= iend ) then
00508                   if ( g2l_index(face(i,n)) == PSMILe_Undef ) then
00509                      ii = ii + 1
00510                      neighs_tmp(ii) = face(i,n)
00511                   endif
00512                else
00513                   ii = ii + 1
00514                   neighs_tmp(ii) = face(i,n)
00515                endif
00516             enddo
00517          enddo
00518       enddo
00519 
00520       nreq = ii
00521 
00522       call psmile_quicksort(neighs_tmp, nreq)
00523 
00524       n = 1
00525       do i = 2, nreq
00526          if (neighs_tmp(i) > neighs_tmp(i-1) ) n = n + 1
00527       enddo
00528       !
00529       !===> Get global indices of requested points in gauss reduced grid
00530       !     neighs_req (n, 1) = Global index
00531       !     neighs_req (n, 2) = Global lat index
00532       !
00533       if ( n > 0 ) then
00534          Allocate (neighs_req (n, 2), stat = ierror)
00535          if (ierror /= 0) then
00536             ierrp (1) = ierror
00537             ierrp (2) = n * 2
00538 
00539             ierror = PRISM_Error_Alloc
00540             call psmile_error ( ierror, 'neighs_req', ierrp, 2, &
00541                  __FILE__, __LINE__ )
00542             return
00543          endif
00544       endif
00545 
00546       ii = 1
00547       neighs_req (ii,1) = neighs_tmp(1)
00548 
00549       do i = 2, nreq
00550          if (neighs_tmp(i) > neighs_tmp(i-1) ) then
00551             ii = ii + 1
00552             neighs_req (ii,1) = neighs_tmp(i)
00553          endif
00554       enddo
00555       !
00556       nreq = ii
00557       !
00558       ! determine global latitude index, only needed during
00559       ! the search to fill the put_list further down.
00560       !
00561       do j = 1, gnbr_lats
00562         if ( neighs_req (1,1) >= global_beg(j) .and. &
00563              neighs_req (1,1) <= global_end(j) ) exit
00564       enddo
00565 
00566       neighs_req (1,2) = j
00567 
00568       do i = 2, nreq
00569          do ii = j, gnbr_lats
00570             if ( neighs_req (i,1) >= global_beg(ii) .and. &
00571                  neighs_req (i,1) <= global_end(ii) ) exit
00572          enddo
00573          j  = ii
00574          neighs_req (i,2) = j
00575       enddo
00576 #ifdef DEBUGX
00577       print *, 'List of neighbour requests '
00578       do n = 1, nreq
00579           print *, '  Entry ', n, ':', neighs_req (n,:) 
00580       enddo
00581       call psmile_flushstd
00582 #endif
00583       !
00584       !  Exchange index lists
00585       !
00586       npes = Comps(gp%comp_id)%size
00587       rank = Comps(gp%comp_id)%rank
00588       comm = Comps(gp%comp_id)%act_comm
00589 
00590       Allocate(n_requests(0:npes-1), displs(0:npes-1), stat = ierror)
00591       if (ierror /= 0) then
00592          ierrp (1) = ierror
00593          ierrp (2) = npes
00594          ierror = PRISM_Error_Alloc
00595          call psmile_error ( ierror, 'n_requests', ierrp, 2, &
00596               __FILE__, __LINE__ )
00597          return
00598       endif
00599 
00600       call MPI_Allgather ( nreq, 1, MPI_Integer, n_requests, 1, &
00601                            MPI_Integer, comm, ierror )
00602 
00603 #ifdef DEBUGX
00604       print *, 'List of requests'
00605       do n = 0, npes-1
00606           print *, '  Proc # ', n, ':', n_requests(n)
00607       enddo
00608       call psmile_flushstd
00609 #endif
00610       nreq_tot = sum(n_requests)
00611 
00612       Allocate( neighs_reqtot(nreq_tot,2), stat = ierror)
00613       if (ierror /= 0) then
00614          ierrp (1) = ierror
00615          ierrp (2) = 2*nreq_tot
00616          ierror = PRISM_Error_Alloc
00617          call psmile_error ( ierror, 'neighs_reqtot', ierrp, 2, &
00618               __FILE__, __LINE__ )
00619          return
00620       endif
00621 
00622       displs(0) = 0
00623       do n = 1, npes-1
00624          displs(n) = displs (n-1) + n_requests(n-1)
00625       enddo
00626 
00627       ! Is there a way to do it within one call? We want to have
00628       ! the loop over the nreq inside neighs_req.
00629       !
00630       call MPI_Allgatherv ( neighs_req(1,1), nreq, MPI_Integer, &
00631                             neighs_reqtot(1,1), n_requests, displs, &
00632                             MPI_Integer, comm, ierror )
00633 
00634       call MPI_Allgatherv ( neighs_req(1,2), nreq, MPI_Integer, &
00635                             neighs_reqtot(1,2), n_requests, displs, &
00636                             MPI_Integer, comm, ierror )
00637 #ifdef DEBUGX
00638       print *, 'List of complete requests '
00639       off = 0
00640       do n = 0, npes-1
00641          do i = 1, n_requests(n)
00642             print *, '  Proc # ', n, ' Entry ', i, ':', neighs_reqtot (off+i,:) 
00643          enddo
00644          off = off + n_requests(n)
00645       enddo
00646       call psmile_flushstd
00647 #endif
00648       Allocate( gp%send_list(0:npes-1), gp%recv_list(0:npes-1), stat = ierror)
00649       if (ierror /= 0) then
00650          ierrp (1) = ierror
00651          ierrp (2) = 2*npes
00652          ierror = PRISM_Error_Alloc
00653          call psmile_error ( ierror, 'send/recv_list', ierrp, 2, &
00654               __FILE__, __LINE__ )
00655          return
00656       endif
00657       !
00658       send_list => gp%send_list
00659       recv_list => gp%recv_list
00660       !
00661       send_list = 0
00662       !
00663       !  Query the list.
00664       !
00665       !  Not so silly search. But can we do something better and make use
00666       !  of the fact that entries are now sorted for each n (process)?
00667       !
00668       do n = 0, npes-1
00669          if ( n /= rank ) then
00670             do i = 1, n_requests(n) ! loop over request for proc n
00671                do j = 1, nbr_lats
00672                   jg = l2g(j,1)
00673                   if ( neighs_reqtot(displs(n)+i,2) == jg ) then
00674                      if ( irange(j,1) <= neighs_reqtot(displs(n)+i,1) .and. &
00675                           irange(j,2) >= neighs_reqtot(displs(n)+i,1) ) then
00676                         send_list(n) = send_list(n) + 1
00677                      endif
00678                   endif
00679                enddo
00680 
00681             enddo
00682          endif
00683       enddo
00684       !
00685       !  Distribute size of lists
00686       !
00687       call MPI_Alltoall   ( send_list, 1, MPI_Integer, &
00688                             recv_list, 1, MPI_Integer, comm, ierror )
00689       !
00690       !  Allocate memory to store lists
00691       !
00692       Allocate ( gp%get_list    (0:npes-1), &
00693                  gp%put_list    (0:npes-1), &
00694                  gp%put_loc_list(0:npes-1), stat = ierror)
00695       if (ierror /= 0) then
00696          ierrp (1) = ierror
00697          ierrp (2) = 3*npes
00698          ierror = PRISM_Error_Alloc
00699          call psmile_error ( ierror, 'send/recv_list', ierrp, 2, &
00700               __FILE__, __LINE__ )
00701          return
00702       endif
00703 
00704       do n = 0, npes-1
00705 
00706          Nullify  (gp%get_list(n)%vector, &
00707                    gp%put_list(n)%vector, &
00708                    gp%put_loc_list(n)%vector)
00709 
00710          if ( recv_list(n) > 0 ) then
00711             Allocate (gp%get_list(n)%vector(recv_list(n)), stat = ierror)
00712             if (ierror /= 0) then
00713                ierrp (1) = ierror
00714                ierrp (2) = recv_list(n)
00715                ierror = PRISM_Error_Alloc
00716                call psmile_error ( ierror, 'get_list vector', ierrp, 2, &
00717                     __FILE__, __LINE__ )
00718                return
00719             endif
00720          endif
00721 
00722          if ( send_list(n) > 0 ) then
00723             Allocate (gp%put_list(n)%vector(send_list(n)), &
00724                       gp%put_loc_list(n)%vector(send_list(n)), stat = ierror)
00725             if (ierror /= 0) then
00726                ierrp (1) = ierror
00727                ierrp (2) = 2*send_list(n)
00728                ierror = PRISM_Error_Alloc
00729                call psmile_error ( ierror, 'put_list vector', ierrp, 2, &
00730                     __FILE__, __LINE__ )
00731                return
00732             endif
00733          endif
00734 
00735       enddo
00736       !
00737       !  Fill the list.
00738       !
00739       !  Not so silly search. But can we do something better and make use
00740       !  of the fact that entries are now sorted for each n (process)?
00741       !
00742       do n = 0, npes-1
00743          if ( n /= rank ) then
00744             ii = 0
00745             do i = 1, n_requests(n) ! loop over request for proc n
00746                do j = 1, nbr_lats
00747                   jg = l2g(j,1)
00748                   if ( neighs_reqtot(displs(n)+i,2) == jg ) then
00749                      if ( irange(j,1) <= neighs_reqtot(displs(n)+i,1) .and. &
00750                           irange(j,2) >= neighs_reqtot(displs(n)+i,1) ) then
00751                         ii = ii + 1
00752                         gp%put_list(n)%vector(ii) = &
00753                            neighs_reqtot(displs(n)+i,1)
00754                         gp%put_loc_list(n)%vector(ii) = &
00755                            g2l_index(neighs_reqtot(displs(n)+i,1))
00756                      endif
00757                   endif
00758                enddo
00759             enddo
00760          endif
00761       enddo
00762       !
00763       ! Exchange lists (active process for send/recv op when
00764       !                 respective list is longer than 0)
00765       !
00766       tag = 130265
00767       !
00768       do n = 0, npes-1
00769          if ( send_list(n) > 0 ) &
00770          call psmile_bsend ( gp%put_list(n)%vector, send_list(n), &
00771                              MPI_Integer, n, tag+rank, comm, ierror )
00772       enddo
00773       do n = 0, npes-1
00774          if ( recv_list(n) > 0 ) &
00775          call MPI_Recv ( gp%get_list(n)%vector, recv_list(n), &
00776                          MPI_Integer, n, tag+n, comm, status, ierror )
00777       enddo
00778 
00779       nreq = sum(recv_list)
00780 
00781       Allocate (gp%remote_index(nreq), stat = ierror)
00782       if (ierror /= 0) then
00783          ierrp (1) = ierror
00784          ierrp (2) = nreq
00785 
00786          ierror = PRISM_Error_Alloc
00787          call psmile_error ( ierror, 'gp%remote_index', ierrp, 2, &
00788               __FILE__, __LINE__ )
00789          return
00790       endif
00791 
00792       remote_index => gp%remote_index
00793 
00794       off = 0
00795       do n = 0, npes-1
00796          if ( recv_list(n) > 0 ) then
00797             remote_index(off+1:off+recv_list(n)) = &
00798                                gp%get_list(n)%vector(1:+recv_list(n))
00799             off = off + recv_list(n)
00800          endif
00801       enddo
00802 
00803 #ifdef DEBUGX
00804       print *, 'List of remote indices '
00805       do i = 1, nreq
00806          print *, 'i: ', i, ' index: ', remote_index(i)
00807       enddo
00808       call psmile_flushstd
00809 #endif
00810       !
00811       ! Convert global face and star indices to local indices and set
00812       !  outside indices to remote array indices. remote indices
00813       !  are added at the end from np+1 to np+nreq
00814       !
00815       ibeg = global_beg(l2g(1,1))
00816       iend = global_end(l2g(nbr_lats,1))
00817       !
00818       do j = 1, nbr_lats
00819          do n = 1, 16
00820             face(l_irange (j,1):l_irange(j,2), n) =                &
00821                psmile_gauss_1d_global_to_local (grid_id,           &
00822                   face(l_irange (j,1):l_irange(j,2), n),           &
00823                   l_irange(j,2) - l_irange(j,1) + 1, PSMILe_Undef, &
00824                   .true.)
00825          enddo
00826       enddo
00827 
00828 #ifdef DEBUGX
00829       print *, ' final face array with local indices ' 
00830       do i = 1, np
00831       write( *, '(a11,i6,1x,16i8)') 'Final face ', i, &
00832                  face(i,1:3), face(i,16), &
00833                  face(i,4:6), face(i,15), &
00834                  face(i,7:9), face(i,14), &
00835                  face(i,10:13)
00836       enddo
00837       call psmile_flushstd
00838 #endif
00839       do j = 1, nbr_lats
00840          do n = 1, 3
00841             star(l_irange (j,1):l_irange(j,2), n) =                &
00842                psmile_gauss_1d_global_to_local (grid_id,           &
00843                   star(l_irange (j,1):l_irange(j,2), n),           &
00844                   l_irange(j,2) - l_irange(j,1) + 1, PSMILe_Undef, &
00845                   .true.)
00846          enddo
00847       enddo
00848 
00849    endif ! partitioned
00850    !
00851    !===> Free Memory
00852    !
00853    Deallocate (irange)
00854    Deallocate (g2l_index)
00855 
00856    if ( partitioned ) then
00857       Deallocate (neighs_tmp)
00858       Deallocate (neighs_req)
00859       Deallocate (neighs_reqtot)
00860       Deallocate (n_requests, displs)
00861    endif
00862 
00863    ! computes a lookup table which is later used for optimisation
00864    Allocate (gp%reduced_gauss_data%local_1d_stencil_lookup(16,&
00865                gp%grid_shape(1,1):gp%grid_shape(2,1)), stat = ierror)
00866    if (ierror /= 0) then
00867       ierrp (1) = ierror
00868       ierrp (2) = gp%grid_shape(2,1)-gp%grid_shape(1,1)+1
00869 
00870       ierror = PRISM_Error_Alloc
00871       call psmile_error ( ierror, 'irange', ierrp, 2, &
00872            __FILE__, __LINE__ )
00873       return
00874    endif
00875 
00876    do i = gp%grid_shape(1,1), gp%grid_shape(2,1)
00877       gp%reduced_gauss_data%local_1d_stencil_lookup(:,i) =  &
00878          psmile_gauss_3d_to_local_1d(grid_id,               &
00879             psmile_gauss_get_bicu_stencil (grid_id,         &
00880                psmile_gauss_local_1d_to_3d(grid_id, i)),    &
00881          16, psmile_undef, .true.)
00882    enddo ! i
00883 
00884    ! collect the method coordinate data from neighbouring processes
00885    if ( gp%corner_pointer%corner_datatype == MPI_DOUBLE_PRECISION ) then
00886 
00887       call get_neigh_method_data_dble (grid_id)
00888 
00889    else if ( gp%corner_pointer%corner_datatype == MPI_REAL ) then
00890 
00891       call get_neigh_method_data_real (grid_id)
00892 
00893    else
00894 
00895       ierror = PRISM_Error_Internal
00896       call psmile_error ( ierror, 'datatype is currently not supported', &
00897            (/-1/), 0, __FILE__, __LINE__ )
00898    endif
00899    !
00900    !===> All done
00901    !
00902 #ifdef VERBOSE
00903    print 9980, trim(ch_id), nreq
00904 
00905    call psmile_flushstd
00906 #endif /* VERBOSE */
00907    !
00908    !  Formats:
00909    !
00910 9990 format (1x, a, ': psmile_gauss_get_neighbours')
00911 9980 format (1x, a, ': psmile_gauss_get_neighbours: eof, number of requested points ', i8)
00912 
00913  end subroutine PSMILe_Gauss_get_neighbours
00914  
00915 subroutine get_neigh_method_data_dble (grid_id)
00916 
00917    use prism_constants
00918    use psmile
00919 
00920    implicit none
00921 
00922    integer, intent (in)          :: grid_id
00923 
00924    integer :: method_id
00925 
00926    type (coords_block), pointer  :: coords_pointer
00927    integer                       :: npes, rank, comm
00928 
00929    integer, pointer              :: send_list(:)
00930    integer, pointer              :: recv_list(:)
00931 
00932    double precision, allocatable :: send_buffer(:)
00933    double precision, allocatable :: recv_buffer(:)
00934 
00935    integer, pointer              :: idx_list(:) 
00936 
00937    integer                       :: off, i, n, lstatus(MPI_STATUS_SIZE), ierror, tag, 
00938                                     send_buffer_size, recv_buffer_size
00939 
00940    !
00941    !-----------------------------------------------------------------
00942    ! Loop over all method_id for this grid_id and collect required
00943    ! method coordinates from remote pes if necessary. The gauss2
00944    ! array will be used later in psmile_mg_method_gauss2.
00945    ! The connectivity is based on the cell indices, however various
00946    ! sets of points may be specified. The assumption is that the
00947    ! connectivity is identical for all sets of points.
00948    !-----------------------------------------------------------------
00949    !
00950    send_buffer_size = 1
00951    recv_buffer_size = 1
00952 
00953    do method_id = 1, Number_of_Methods_allocated
00954       if ( Methods(method_id)%grid_id == grid_id ) then
00955          if ( associated(grids(grid_id)%send_list) ) &
00956             send_buffer_size = max (send_buffer_size, maxval(grids(grid_id)%send_list(:)))
00957          if ( associated(grids(grid_id)%recv_list) ) &
00958             recv_buffer_size = max (recv_buffer_size, maxval(grids(grid_id)%recv_list(:)))
00959       endif ! ( Methods(method_id)%grid_id == grid_id )
00960    enddo ! method_id
00961 
00962    allocate(send_buffer(2*send_buffer_size), &
00963             recv_buffer(2*recv_buffer_size), STAT = ierror)
00964 
00965    if ( ierror > 0 ) then
00966       call psmile_error ( PRISM_Error_Alloc, 'send_buffer, recv_buffer', &
00967          (/ierror, 2 * send_buffer_size + 2 * recv_buffer_size/), 3,   &
00968          __FILE__, __LINE__ )
00969       ierror = PRISM_Error_Alloc
00970       return
00971    endif
00972 
00973    do method_id = 1, Number_of_Methods_allocated
00974 
00975       if ( Methods(method_id)%grid_id == grid_id ) then
00976 
00977          coords_pointer => Methods(method_id)%coords_pointer
00978 
00979          npes = Comps(grids(grid_id)%comp_id)%size
00980          rank = Comps(grids(grid_id)%comp_id)%rank
00981          comm = Comps(grids(grid_id)%comp_id)%act_comm
00982          
00983          if ( associated(grids(grid_id)%send_list) ) then
00984 
00985             send_list => grids(grid_id)%send_list
00986 
00987             do n = 0, npes-1
00988 
00989                if ( send_list(n) > 0 ) then
00990 
00991                   idx_list => Grids(grid_id)%put_loc_list(n)%vector 
00992 
00993                   ! fill buffer with x and y method coordinates
00994                   do i = 1, send_list(n)
00995                      send_buffer(i)              = &
00996                         coords_pointer%coords_dble(1)%vector (idx_list (i))
00997                      send_buffer(i+send_list(n)) = &
00998                         coords_pointer%coords_dble(2)%vector (idx_list (i))
00999                   enddo
01000 
01001                   call psmile_bsend ( send_buffer, 2*send_list(n), &
01002                      MPI_DOUBLE_PRECISION, n, rank, comm, ierror )
01003                endif
01004             enddo ! n
01005          endif ! associated(grids(grid_id)%send_list)
01006 
01007          if ( associated(grids(grid_id)%recv_list) ) then
01008 
01009             recv_list => grids(grid_id)%recv_list
01010 
01011             allocate(Methods(method_id)%gauss2_dble(1)%vector(sum (recv_list(:))), &
01012                      Methods(method_id)%gauss2_dble(2)%vector(sum (recv_list(:))), &
01013                      STAT = ierror)
01014 
01015             if ( ierror > 0 ) then
01016                call psmile_error ( PRISM_Error_Alloc, 'gauss_dble%vector', &
01017                   (/ierror, 2 * sum (recv_list(:))/), 2, __FILE__, __LINE__ )
01018                ierror = PRISM_Error_Alloc
01019                return
01020             endif
01021 
01022             !
01023             ! Store remote coordinates.
01024             !
01025             ! Value at Methods(method_id)%gauss2_dble(2)%vector(i) corresponds
01026             ! to global index at grids(grid_id)%remote_index(i)
01027             !
01028             off = 0
01029             do n = 0, npes-1
01030 
01031                if ( recv_list(n) > 0 ) then
01032 
01033                   tag = n
01034                   call MPI_Recv ( recv_buffer, 2*recv_list(n), &
01035                      MPI_DOUBLE_PRECISION, n, tag, comm, lstatus, ierror )
01036 
01037                   Methods(method_id)%gauss2_dble(1)%vector(off+1:off+recv_list(n)) = &
01038                      recv_buffer(1:recv_list(n))
01039 
01040                   Methods(method_id)%gauss2_dble(2)%vector(off+1:off+recv_list(n)) = &
01041                      recv_buffer(recv_list(n)+1:2*recv_list(n))
01042 
01043                   off = off + recv_list(n)
01044                endif
01045             enddo ! n
01046          endif ! ( associated(grids(grid_id)%recv_list) )
01047       endif ! Methods(method_id)%grid_id
01048    enddo ! method_id
01049 
01050    deallocate ( send_buffer )
01051    deallocate ( recv_buffer )
01052 
01053 end subroutine get_neigh_method_data_dble
01054 
01055 !---------------------------------------------------------------------------------------------------
01056 
01057 subroutine get_neigh_method_data_real (grid_id)
01058 
01059    use prism_constants
01060    use psmile
01061 
01062    implicit none
01063 
01064    integer, intent (in)          :: grid_id
01065 
01066    integer :: method_id
01067 
01068    type (coords_block), pointer  :: coords_pointer
01069    integer                       :: npes, rank, comm
01070 
01071    integer, pointer              :: send_list(:)
01072    integer, pointer              :: recv_list(:)
01073 
01074    real, allocatable             :: send_buffer(:)
01075    real, allocatable             :: recv_buffer(:)
01076 
01077    integer, pointer              :: idx_list(:)
01078 
01079    integer                       :: off, i, n, lstatus(MPI_STATUS_SIZE), ierror, tag, 
01080                                     send_buffer_size, recv_buffer_size
01081 
01082    !
01083    !-----------------------------------------------------------------
01084    ! Loop over all method_id for this grid_id and collect required
01085    ! method coordinates from remote pes if necessary. The gauss2
01086    ! array will be used later in psmile_mg_method_gauss2.
01087    ! The connectivity is based on the cell indices, however various
01088    ! sets of points may be specified. The assumption is that the
01089    ! connectivity is identical for all sets of points.
01090    !-----------------------------------------------------------------
01091    !
01092    send_buffer_size = 1
01093    recv_buffer_size = 1
01094 
01095    do method_id = 1, Number_of_Methods_allocated
01096       if ( Methods(method_id)%grid_id == grid_id ) then
01097          if ( associated(grids(grid_id)%send_list) ) &
01098             send_buffer_size = max (send_buffer_size, maxval(grids(grid_id)%send_list(:)))
01099          if ( associated(grids(grid_id)%recv_list) ) &
01100             recv_buffer_size = max (recv_buffer_size, maxval(grids(grid_id)%recv_list(:)))
01101       endif ! ( Methods(method_id)%grid_id == grid_id )
01102    enddo ! method_id
01103 
01104    allocate(send_buffer(2*send_buffer_size), &
01105             recv_buffer(2*recv_buffer_size), STAT = ierror)
01106 
01107    if ( ierror > 0 ) then
01108       call psmile_error ( PRISM_Error_Alloc, 'send_buffer, recv_buffer', &
01109          (/ierror, 2 * send_buffer_size + 2 * recv_buffer_size/), 3,   &
01110          __FILE__, __LINE__ )
01111       ierror = PRISM_Error_Alloc
01112       return
01113    endif
01114 
01115    do method_id = 1, Number_of_Methods_allocated
01116 
01117       if ( Methods(method_id)%grid_id == grid_id ) then
01118 
01119          coords_pointer => Methods(method_id)%coords_pointer
01120 
01121          npes = Comps(grids(grid_id)%comp_id)%size
01122          rank = Comps(grids(grid_id)%comp_id)%rank
01123          comm = Comps(grids(grid_id)%comp_id)%act_comm
01124          
01125          if ( associated(grids(grid_id)%send_list) ) then
01126 
01127             send_list => grids(grid_id)%send_list
01128 
01129             do n = 0, npes-1
01130 
01131                if ( send_list(n) > 0 ) then
01132 
01133                   idx_list => Grids(grid_id)%put_loc_list(n)%vector
01134 
01135                   ! fill buffer with x and y method coordinates
01136                   do i = 1, send_list(n)
01137                      send_buffer(i)              = &
01138                         coords_pointer%coords_real(1)%vector (idx_list (i))
01139                      send_buffer(i+send_list(n)) = &
01140                         coords_pointer%coords_real(2)%vector (idx_list (i))
01141                   enddo
01142 
01143                   call psmile_bsend ( send_buffer, 2*send_list(n), &
01144                      MPI_REAL, n, rank, comm, ierror )
01145                endif
01146             enddo ! n
01147          endif ! associated(grids(grid_id)%send_list)
01148 
01149          if ( associated(grids(grid_id)%recv_list) ) then
01150 
01151             recv_list => grids(grid_id)%recv_list
01152 
01153             allocate(Methods(method_id)%gauss2_real(1)%vector(sum (recv_list(:))), &
01154                      Methods(method_id)%gauss2_real(2)%vector(sum (recv_list(:))), &
01155                      STAT = ierror)
01156 
01157             if ( ierror > 0 ) then
01158                call psmile_error ( PRISM_Error_Alloc, 'gauss_real%vector', &
01159                   (/ierror, 2 * sum (recv_list(:))/), 2, __FILE__, __LINE__ )
01160                ierror = PRISM_Error_Alloc
01161                return
01162             endif
01163 
01164             !
01165             ! Store remote coordinates.
01166             !
01167             ! Value at Methods(method_id)%gauss2_real(2)%vector(i) corresponds
01168             ! to global index at grids(grid_id)%remote_index(i)
01169             !
01170             off = 0
01171             do n = 0, npes-1
01172 
01173                if ( recv_list(n) > 0 ) then
01174 
01175                   tag = n
01176                   call MPI_Recv ( recv_buffer, 2*recv_list(n), &
01177                      MPI_REAL, n, tag, comm, lstatus, ierror )
01178 
01179                   Methods(method_id)%gauss2_real(1)%vector(off+1:off+recv_list(n)) = &
01180                      recv_buffer(1:recv_list(n))
01181 
01182                   Methods(method_id)%gauss2_real(2)%vector(off+1:off+recv_list(n)) = &
01183                      recv_buffer(recv_list(n)+1:2*recv_list(n))
01184 
01185                   off = off + recv_list(n)
01186                endif
01187             enddo ! n
01188          endif ! ( associated(grids(grid_id)%recv_list) )
01189       endif ! Methods(method_id)%grid_id
01190    enddo ! method_id
01191 
01192    deallocate ( send_buffer )
01193    deallocate ( recv_buffer )
01194 
01195 end subroutine get_neigh_method_data_real

Generated on 18 Mar 2011 for Oasis4 by  doxygen 1.6.1