-
Notifications
You must be signed in to change notification settings - Fork 0
/
onesided_communication.jl
75 lines (52 loc) · 1.96 KB
/
onesided_communication.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
using GPI2
"""
    wait_if_queue_full(queue_id, request_size)

Make room in queue `queue_id` before posting `request_size` further requests.

The maximum queue size and the current size of `queue_id` are queried. If the
current size plus `request_size` reaches or exceeds the maximum, `gaspi_wait` is
called on the queue with `GASPI_BLOCK` and its result is returned. Otherwise
nothing is done and `nothing` is returned.
"""
function wait_if_queue_full(queue_id, request_size)
    capacity = Ref{gaspi_number_t}()
    in_flight = Ref{gaspi_number_t}()

    gaspi_queue_size_max(capacity)
    gaspi_queue_size(queue_id, in_flight)

    # Enough free slots left: no need to wait on the queue.
    in_flight[] + request_size < capacity[] && return nothing

    return gaspi_wait(queue_id, GASPI_BLOCK)
end
# One-sided communication example: every process fills a source segment with
# values derived from its rank, then reads one value from each process's source
# segment into its own destination segment via `gaspi_read`.

gaspi_proc_init(GASPI_BLOCK)

# Query this process's rank (`iProc`) and the total number of processes (`nProc`)
iProc = Ref{gaspi_rank_t}()
nProc = Ref{gaspi_rank_t}()
gaspi_proc_rank(iProc)
gaspi_proc_num(nProc)

# Segment 0 is read from (remotely), segment 1 is read into (locally);
# each one holds one `Cint` per process
segment_id_src = gaspi_segment_id_t(0)
segment_id_dst = gaspi_segment_id_t(1)
segment_size = gaspi_size_t(nProc[] * sizeof(Cint))

# create 2 segments for data
gaspi_segment_create(segment_id_src, segment_size, GASPI_GROUP_ALL, GASPI_BLOCK,
                     GASPI_ALLOC_DEFAULT)
gaspi_segment_create(segment_id_dst, segment_size, GASPI_GROUP_ALL, GASPI_BLOCK,
                     GASPI_ALLOC_DEFAULT)

# Use `Array` instead of `Ref` to allow auto-conversion to `Ptr{Cvoid}`
src_ptr = Array{Ptr{Cint}, 0}(undef)
dst_ptr = Array{Ptr{Cint}, 0}(undef)

# get initial pointers to each segment
gaspi_segment_ptr(segment_id_src, src_ptr)
gaspi_segment_ptr(segment_id_dst, dst_ptr)

# Wrap pointers in plain Julia arrays for convenient access. The arrays alias the
# segment memory; they do not own it.
src = unsafe_wrap(Array, src_ptr[], nProc[])
dst = unsafe_wrap(Array, dst_ptr[], nProc[])

queue_id = gaspi_queue_id_t(0)

# Fill the source segment with iProc * nProc + r for r = 0, ..., nProc - 1.
# Widen the ranks to `Int` before multiplying: the product of two `gaspi_rank_t`
# values would otherwise be computed in that (narrow) integer type and could wrap
# around silently for large process counts.
for r in 0:(nProc[] - 1)
    src[r + 1] = Int(iProc[]) * Int(nProc[]) + r
end

# sync: make sure every process has filled its source segment before anyone reads
gaspi_barrier(GASPI_GROUP_ALL, GASPI_BLOCK)

# NOTE(review): `dst` has not been written at this point, so this prints whatever
# the segment allocation left in memory -- confirm this is the intended output.
println("BEFORE: iProc=$(iProc[]), src=$src")
println("BEFORE: iProc=$(iProc[]), dst=$dst")

# The offset into the source segment depends only on our own rank, so compute it
# once outside the loop
offset_src = gaspi_offset_t(iProc[] * sizeof(Cint))

# From each rank, read the `Cint` at `offset_src` of its source segment into slot
# `rank` of our destination segment
for rank in 0:(nProc[] - 1)
    offset_dst = gaspi_offset_t(rank * sizeof(Cint))

    # Keep the queue from overflowing before posting one more request
    wait_if_queue_full(queue_id, 1)

    gaspi_read(segment_id_dst, offset_dst, rank, segment_id_src, offset_src,
               sizeof(Cint), queue_id, GASPI_BLOCK)
end

# .... work ....

# Wait on the queue so that the posted reads have finished before `dst` is used
gaspi_wait(queue_id, GASPI_BLOCK)

gaspi_barrier(GASPI_GROUP_ALL, GASPI_BLOCK)

println("AFTER: iProc=$(iProc[]), dst=$dst")

gaspi_proc_term(GASPI_BLOCK)