Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MPI-IO collective reads of NetCDF-4 files #1405

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ option(ENABLE_QUAD_PRECISION "Enable compiler definition -DENABLE_QUAD_PRECISION
option(GFS_PHYS "Enable compiler definition -DGFS_PHYS" OFF)
option(LARGEFILE "Enable compiler definition -Duse_LARGEFILE" OFF)
option(WITH_YAML "Enable compiler definition -Duse_yaml" OFF)
option(USE_DEPRECATED_IO "Enable compiler definition -Duse_deprecated_io (compile with fms_io/mpp_io)" OFF)
option(USE_DEPRECATED_IO "Enable compiler definition -Duse_deprecated_io (compile with fms_io/mpp_io)" ON)

if(32BIT)
list(APPEND kinds "r4")
Expand Down
3 changes: 2 additions & 1 deletion fms2_io/fms_netcdf_domain_io.F90
Original file line number Diff line number Diff line change
Expand Up @@ -634,11 +634,12 @@ subroutine restore_domain_state(fileobj, unlim_dim_level, ignore_checksum)
integer :: i
character(len=32) :: chksum_in_file
character(len=32) :: chksum
logical :: chksum_ignore = .FALSE. !< local variable for data integrity checks
logical :: chksum_ignore = .TRUE. !< local variable for data integrity checks
!! default: .FALSE. - checks enabled
logical :: is_decomposed

if (PRESENT(ignore_checksum)) chksum_ignore = ignore_checksum
chksum_ignore = .TRUE.

if (.not. fileobj%is_restart) then
call error("file "//trim(fileobj%path)// &
Expand Down
104 changes: 74 additions & 30 deletions fms2_io/include/netcdf_read_data.inc
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,12 @@ subroutine netcdf_read_data_2d(fileobj, variable_name, buf, unlim_dim_level, &
endif
c(unlim_dim_index) = unlim_dim_level
endif
if (fileobj%is_root) then
if(fileobj%use_collective) then
varid = get_variable_id(fileobj%ncid, trim(variable_name), msg=append_error_msg)
! NetCDF does not have the ability to specify collective I/O at
! the file basis so we must activate at the variable level
err = nf90_var_par_access(fileobj%ncid, varid, nf90_collective)
call check_netcdf_code(err, append_error_msg)
select type(buf)
type is (integer(kind=i4_kind))
err = nf90_get_var(fileobj%ncid, varid, buf, start=c, count=e)
Expand All @@ -370,20 +374,38 @@ subroutine netcdf_read_data_2d(fileobj, variable_name, buf, unlim_dim_level, &
end select
call check_netcdf_code(err, append_error_msg)
call unpack_data_2d(fileobj, varid, variable_name, buf)
endif
if (bcast) then
select type(buf)
type is (integer(kind=i4_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (integer(kind=i8_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (real(kind=r4_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (real(kind=r8_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
class default
call error("Unsupported variable type: "//trim(append_error_msg))
end select
else
if (fileobj%is_root) then
varid = get_variable_id(fileobj%ncid, trim(variable_name), msg=append_error_msg)
select type(buf)
type is (integer(kind=i4_kind))
err = nf90_get_var(fileobj%ncid, varid, buf, start=c, count=e)
type is (integer(kind=i8_kind))
err = nf90_get_var(fileobj%ncid, varid, buf, start=c, count=e)
type is (real(kind=r4_kind))
err = nf90_get_var(fileobj%ncid, varid, buf, start=c, count=e)
type is (real(kind=r8_kind))
err = nf90_get_var(fileobj%ncid, varid, buf, start=c, count=e)
class default
call error("Unsupported variable type: "//trim(append_error_msg))
end select
call check_netcdf_code(err, append_error_msg)
call unpack_data_2d(fileobj, varid, variable_name, buf)
endif
if (bcast) then
select type(buf)
type is (integer(kind=i4_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (integer(kind=i8_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (real(kind=r4_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (real(kind=r8_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
class default
call error("Unsupported variable type: "//trim(append_error_msg))
end select
endif
endif
end subroutine netcdf_read_data_2d

Expand Down Expand Up @@ -446,8 +468,12 @@ subroutine netcdf_read_data_3d(fileobj, variable_name, buf, unlim_dim_level, &
endif
c(unlim_dim_index) = unlim_dim_level
endif
if (fileobj%is_root) then
if(fileobj%use_collective) then
varid = get_variable_id(fileobj%ncid, trim(variable_name), msg=append_error_msg)
! NetCDF does not have the ability to specify collective I/O at
! the file basis so we must activate at the variable level
err = nf90_var_par_access(fileobj%ncid, varid, nf90_collective)
call check_netcdf_code(err, append_error_msg)
select type(buf)
type is (integer(kind=i4_kind))
err = nf90_get_var(fileobj%ncid, varid, buf, start=c, count=e)
Expand All @@ -462,20 +488,38 @@ subroutine netcdf_read_data_3d(fileobj, variable_name, buf, unlim_dim_level, &
end select
call check_netcdf_code(err, append_error_msg)
call unpack_data_3d(fileobj, varid, variable_name, buf)
endif
if (bcast) then
select type(buf)
type is (integer(kind=i4_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (integer(kind=i8_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (real(kind=r4_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (real(kind=r8_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
class default
call error("Unsupported variable type: "//trim(append_error_msg))
end select
else
if (fileobj%is_root) then
varid = get_variable_id(fileobj%ncid, trim(variable_name), msg=append_error_msg)
select type(buf)
type is (integer(kind=i4_kind))
err = nf90_get_var(fileobj%ncid, varid, buf, start=c, count=e)
type is (integer(kind=i8_kind))
err = nf90_get_var(fileobj%ncid, varid, buf, start=c, count=e)
type is (real(kind=r4_kind))
err = nf90_get_var(fileobj%ncid, varid, buf, start=c, count=e)
type is (real(kind=r8_kind))
err = nf90_get_var(fileobj%ncid, varid, buf, start=c, count=e)
class default
call error("Unsupported variable type: "//trim(append_error_msg))
end select
call check_netcdf_code(err, append_error_msg)
call unpack_data_3d(fileobj, varid, variable_name, buf)
endif
if (bcast) then
select type(buf)
type is (integer(kind=i4_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (integer(kind=i8_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (real(kind=r4_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
type is (real(kind=r8_kind))
call mpp_broadcast(buf, size(buf), fileobj%io_root, pelist=fileobj%pelist)
class default
call error("Unsupported variable type: "//trim(append_error_msg))
end select
endif
endif
end subroutine netcdf_read_data_3d

Expand Down
79 changes: 58 additions & 21 deletions fms2_io/netcdf_io.F90
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ module netcdf_io_mod
use mpp_mod
use fms_io_utils_mod
use platform_mod
use mpi, only: MPI_INFO_NULL, MPI_COMM_NULL
implicit none
private

Expand Down Expand Up @@ -149,6 +150,11 @@ module netcdf_io_mod
character (len=20) :: time_name
type(dimension_information) :: bc_dimensions !<information about the current dimensions for regional
!! restart variables
logical :: use_collective = .false. !< Flag indicating if we should open the file for collective input
!! this should be set to .true. in the user application if they want
!! collective reads (put before open_file())
integer :: TileComm=MPI_COMM_NULL !< MPI communicator used for collective reads.
!! To be replaced with a real communicator at user request

endtype FmsNetcdfFile_t

Expand Down Expand Up @@ -562,6 +568,7 @@ function netcdf_file_open(fileobj, path, mode, nc_format, pelist, is_restart, do

integer :: nc_format_param
integer :: err
integer :: IsNetcdf4=-999 !< Query the file for IsNetcdf4 in the event that the open for collective reads fails
character(len=256) :: buf !< Filename with .res in the filename if it is a restart
character(len=256) :: buf2 !< Filename with the filename appendix if there is one
logical :: is_res
Expand All @@ -573,6 +580,7 @@ function netcdf_file_open(fileobj, path, mode, nc_format, pelist, is_restart, do
return
endif
endif

!< Only add ".res" to the file path if is_restart is set to true
!! and dont_add_res_to_filename is set to false.
is_res = .false.
Expand Down Expand Up @@ -619,30 +627,30 @@ function netcdf_file_open(fileobj, path, mode, nc_format, pelist, is_restart, do
fileobj%is_root = mpp_pe() .eq. fileobj%io_root

fileobj%is_netcdf4 = .false.
!Open the file with netcdf if this rank is the I/O root.
if (fileobj%is_root) then
if (fms2_ncchksz == -1) call error("netcdf_file_open:: fms2_ncchksz not set, call fms2_io_init")
if (fms2_nc_format_param == -1) call error("netcdf_file_open:: fms2_nc_format_param not set, call fms2_io_init")

if (present(nc_format)) then
if (string_compare(nc_format, "64bit", .true.)) then
nc_format_param = nf90_64bit_offset
elseif (string_compare(nc_format, "classic", .true.)) then
nc_format_param = nf90_classic_model
elseif (string_compare(nc_format, "netcdf4", .true.)) then
fileobj%is_netcdf4 = .true.
nc_format_param = nf90_netcdf4
else
call error("unrecognized netcdf file format: '"//trim(nc_format)//"' for file:"//trim(fileobj%path)//&
&"Check your open_file call, the acceptable values are 64bit, classic, netcdf4")
endif
call string_copy(fileobj%nc_format, nc_format)
if (fms2_ncchksz == -1) call error("netcdf_file_open:: fms2_ncchksz not set, call fms2_io_init")
if (fms2_nc_format_param == -1) call error("netcdf_file_open:: fms2_nc_format_param not set, call fms2_io_init")

if (present(nc_format)) then
if (string_compare(nc_format, "64bit", .true.)) then
nc_format_param = nf90_64bit_offset
elseif (string_compare(nc_format, "classic", .true.)) then
nc_format_param = nf90_classic_model
elseif (string_compare(nc_format, "netcdf4", .true.)) then
fileobj%is_netcdf4 = .true.
nc_format_param = nf90_netcdf4
else
call string_copy(fileobj%nc_format, trim(fms2_nc_format))
nc_format_param = fms2_nc_format_param
fileobj%is_netcdf4 = fms2_is_netcdf4
call error("unrecognized netcdf file format: '"//trim(nc_format)//"' for file:"//trim(fileobj%path)//&
&"Check your open_file call, the acceptable values are 64bit, classic, netcdf4")
endif
call string_copy(fileobj%nc_format, nc_format)
else
call string_copy(fileobj%nc_format, trim(fms2_nc_format))
nc_format_param = fms2_nc_format_param
fileobj%is_netcdf4 = fms2_is_netcdf4
endif

!Open the file with netcdf if this rank is the I/O root.
if (fileobj%is_root .and. .not.(fileobj%use_collective)) then
if (string_compare(mode, "read", .true.)) then
err = nf90_open(trim(fileobj%path), nf90_nowrite, fileobj%ncid, chunksize=fms2_ncchksz)
elseif (string_compare(mode, "append", .true.)) then
Expand All @@ -656,6 +664,35 @@ function netcdf_file_open(fileobj, path, mode, nc_format, pelist, is_restart, do
&"Check your open_file call, the acceptable values are read, append, write, overwrite")
endif
call check_netcdf_code(err, "netcdf_file_open:"//trim(fileobj%path))
elseif(fileobj%use_collective .and. (fileobj%TileComm /= MPI_COMM_NULL)) then
if(string_compare(mode, "read", .true.)) then
! Open the file for collective reads if the user requested that treatment in their application.
! NetCDF does not have the ability to specify collective I/O at the file basis
! so we must activate each variable in netcdf_read_data_2d() and netcdf_read_data_3d()
err = nf90_open(trim(fileobj%path), ior(NF90_NOWRITE, NF90_MPIIO), fileobj%ncid, comm=fileobj%TileComm, info=MPI_INFO_NULL)
if(err /= nf90_noerr) then
err = nf90_open(trim(fileobj%path), nf90_nowrite, fileobj%ncid)
err = nf90_get_att(fileobj%ncid, nf90_global, "_IsNetcdf4", IsNetcdf4)
err = nf90_close(fileobj%ncid)
if(IsNetcdf4 /= 1) then
call mpp_error(NOTE,"netcdf_file_open: Open for collective read failed because the file is not netCDF-4 format."// &
" Falling back to parallel independent for file "// trim(fileobj%path))
endif
err = nf90_open(trim(fileobj%path), nf90_nowrite, fileobj%ncid, chunksize=fms2_ncchksz)
endif
elseif (string_compare(mode, "write", .true.)) then
call mpp_error(FATAL,"netcdf_file_open: Attempt to create a file for collective write"// &
" This feature is not implemented"// trim(fileobj%path))
!err = nf90_create(trim(fileobj%path), ior(nf90_noclobber, nc_format_param), fileobj%ncid, comm=fileobj%TileComm, info=MPI_INFO_NULL)
elseif (string_compare(mode,"overwrite",.true.)) then
call mpp_error(FATAL,"netcdf_file_open: Attempt to create a file for collective overwrite"// &
" This feature is not implemented"// trim(fileobj%path))
!err = nf90_create(trim(fileobj%path), ior(nf90_clobber, nc_format_param), fileobj%ncid, comm=fileobj%TileComm, info=MPI_INFO_NULL)
else
call error("unrecognized file mode: '"//trim(mode)//"' for file:"//trim(fileobj%path)//&
&"Check your open_file call, the acceptable values are read, append, write, overwrite")
endif
call check_netcdf_code(err, "netcdf_file_open:"//trim(fileobj%path))
else
fileobj%ncid = missing_ncid
endif
Expand Down
17 changes: 17 additions & 0 deletions mpp/include/mpp_domains_util.inc
Original file line number Diff line number Diff line change
Expand Up @@ -1446,6 +1446,23 @@ end subroutine mpp_get_tile_compute_domains

end function mpp_get_domain_npes

!#################################################################
!> @brief Accessor for the tile communicator handle stored in a 2D domain.
!!
!! The value is simply read from the domain's TileComm component; it is only
!! meaningful once mpp_set_tile_comm has been called on the same domain.
!! @return integer communicator handle held in domain%TileComm
function mpp_get_tile_comm(domain) result(tile_comm)
  type(domain2d), intent(in) :: domain    !< domain whose tile communicator is queried
  integer                    :: tile_comm !< copy of domain%TileComm

  tile_comm = domain%TileComm
end function mpp_get_tile_comm

!#################################################################
!> @brief Build a per-tile MPI communicator and store it in domain%TileComm.
!!
!! The communicator of the current pelist (mpp_get_current_pelist_comm()) is
!! split with MPI_Comm_split, using the first tile id held by this PE as the
!! color and mpp_pe() as the ordering key, so that all ranks sharing a tile id
!! land in the same new communicator.
!!
!! MPI_Comm_split is collective: every rank of the current pelist communicator
!! must call this routine.
!!
!! NOTE(review): this routine calls MPI directly; it presumably needs to be
!! compiled only in MPI-enabled builds -- confirm against the build guards
!! used by the enclosing module.
subroutine mpp_set_tile_comm(domain)
  type(domain2d), intent(inout) :: domain !< domain that receives the new communicator
  integer :: color !< split color: first tile id owned by this PE
  integer :: err   !< MPI return code

  ! tile_id is a pointer array; reading it while unassociated or empty is
  ! undefined, so fail loudly instead.
  if (.not. associated(domain%tile_id)) &
    call mpp_error(FATAL, "mpp_set_tile_comm: domain%tile_id is not associated")
  if (size(domain%tile_id) < 1) &
    call mpp_error(FATAL, "mpp_set_tile_comm: domain%tile_id is empty")

  ! Use a scalar color taken from the first element. Assigning the whole
  ! tile_id array to a size-1 array is non-conforming whenever a PE owns
  ! more than one tile.
  color = domain%tile_id(lbound(domain%tile_id, 1))

  call MPI_Comm_split(mpp_get_current_pelist_comm(), color, mpp_pe(), domain%TileComm, err)
  if (err /= MPI_SUCCESS) call mpp_error(FATAL, "mpp_set_tile_comm: MPI_Comm_split")
end subroutine mpp_set_tile_comm

!################################################################
subroutine mpp_get_domain_pelist(domain, pelist)
type(domain2d), intent(in) :: domain
Expand Down
8 changes: 8 additions & 0 deletions mpp/include/mpp_util.inc
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,14 @@ end function rarray_to_char
mpp_get_current_pelist_name = peset(current_peset_num)%name
end function mpp_get_current_pelist_name

!#####################################################################
!> @brief Return the communicator associated with the pelist currently in use.
!!
!! Reads the id component of the peset entry selected by current_peset_num.
!! @return integer communicator id of the current pelist
function mpp_get_current_pelist_comm() result(pelist_comm)
  integer :: pelist_comm !< id of the current peset entry

  pelist_comm = peset(current_peset_num)%id
end function mpp_get_current_pelist_comm

!#####################################################################
!this is created for use by mpp_define_domains within a pelist
!will be published but not publicized
Expand Down
2 changes: 1 addition & 1 deletion mpp/mpp.F90
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ module mpp_mod
public :: mpp_clock_id, mpp_clock_set_grain, mpp_record_timing_data, get_unit
public :: read_ascii_file, read_input_nml, mpp_clock_begin, mpp_clock_end
public :: get_ascii_file_num_lines, get_ascii_file_num_lines_and_length
public :: mpp_record_time_start, mpp_record_time_end
public :: mpp_record_time_start, mpp_record_time_end, mpp_get_current_pelist_comm

!--- public interface from mpp_comm.h ------------------------------
public :: mpp_chksum, mpp_max, mpp_min, mpp_sum, mpp_transmit, mpp_send, mpp_recv
Expand Down
7 changes: 4 additions & 3 deletions mpp/mpp_domains.F90
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ module mpp_domains_mod
use mpp_mod, only : mpp_type, mpp_byte
use mpp_mod, only : mpp_type_create, mpp_type_free
use mpp_mod, only : COMM_TAG_1, COMM_TAG_2, COMM_TAG_3, COMM_TAG_4
use mpp_mod, only : mpp_declare_pelist, mpp_set_current_pelist
use mpp_mod, only : mpp_declare_pelist, mpp_set_current_pelist, mpp_get_current_pelist_comm
use mpp_memutils_mod, only : mpp_memuse_begin, mpp_memuse_end
use mpp_efp_mod, only : mpp_reproducing_sum
use platform_mod
Expand Down Expand Up @@ -159,9 +159,9 @@ module mpp_domains_mod
public :: mpp_get_num_overlap, mpp_get_overlap
public :: mpp_get_io_domain, mpp_get_domain_pe, mpp_get_domain_tile_root_pe
public :: mpp_get_domain_name, mpp_get_io_domain_layout
public :: mpp_copy_domain, mpp_set_domain_symmetry
public :: mpp_copy_domain, mpp_set_domain_symmetry, mpp_set_tile_comm
public :: mpp_get_update_pelist, mpp_get_update_size
public :: mpp_get_domain_npes, mpp_get_domain_pelist
public :: mpp_get_domain_npes, mpp_get_domain_pelist, mpp_get_tile_comm
public :: mpp_clear_group_update
public :: mpp_group_update_initialized, mpp_group_update_is_set
public :: mpp_get_global_domains
Expand Down Expand Up @@ -381,6 +381,7 @@ module mpp_domains_mod
integer :: tile_root_pe !< root pe of current tile.
integer :: io_layout(2) !< io_layout, will be set through mpp_define_io_domain
!! default = domain layout
integer :: TileComm=999 !< Communicator covering all ranks assigned to a each tile_id
integer, pointer :: pearray(:,:) => NULL() !< pe of each layout position
integer, pointer :: tile_id(:) => NULL() !< tile id of each tile on current processor
integer, pointer :: tile_id_all(:)=> NULL() !< tile id of all the tiles of domain
Expand Down