From f640b99e059f0c4386efca78154c073ed83e3c22 Mon Sep 17 00:00:00 2001 From: Juee Himalbhai Desai Date: Thu, 20 Oct 2022 14:54:22 -0700 Subject: [PATCH] fabtests/examples: Add libfabric examples MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add examples to explain basic libfabric APIs. Four examples are added: example_rdm.c: Explains a simple message exchange between server and client using an RDM endpoint. example_rdm_oob.c: Explains a simple message exchange with out-of-band addressing capability. example_rdm_rma.c: Explains message exchange using remote memory access. Two operations are explained here, remote write and remote read. In the remote write operation, the client writes data from its local write buffer to the server's write buffer. In the remote read operation, the server reads data from the client’s read buffer into its own local read buffer. example_rdm_tagged.c: Demonstrates how tagged messages work and specifically how the messages are received in the same order as they were sent. 
Signed-off-by: Juee Himalbhai Desai --- contrib/intel/jenkins/tests.py | 9 +- fabtests/Makefile.am | 18 +- fabtests/examples/example_rdm.c | 475 ++++++++++++++ fabtests/examples/example_rdm_oob.c | 721 ++++++++++++++++++++++ fabtests/examples/example_rdm_rma.c | 818 +++++++++++++++++++++++++ fabtests/examples/example_rdm_tagged.c | 713 +++++++++++++++++++++ 6 files changed, 2749 insertions(+), 5 deletions(-) create mode 100644 fabtests/examples/example_rdm.c create mode 100644 fabtests/examples/example_rdm_oob.c create mode 100644 fabtests/examples/example_rdm_rma.c create mode 100644 fabtests/examples/example_rdm_tagged.c diff --git a/contrib/intel/jenkins/tests.py b/contrib/intel/jenkins/tests.py index a172254354c..3c74b1a2cda 100755 --- a/contrib/intel/jenkins/tests.py +++ b/contrib/intel/jenkins/tests.py @@ -594,11 +594,12 @@ def __init__(self, jobname, buildno, testname, core_prov, fabric, self.mpi_type = mpitype @property - def execute_condn(self): + def execute_condn(self): # mpich-tcp, ompi are the only osu test combinations failing - return False if ((self.mpi_type == 'mpich' and self.core_prov == 'tcp') or \ - self.mpi_type == 'ompi') \ - else True +# return False if ((self.mpi_type == 'mpich' and self.core_prov == 'tcp') or \ +# self.mpi_type == 'ompi') \ +# else True + return True def osu_cmd(self, test_type, test): print(f"Running OSU-{test_type}-{test}") diff --git a/fabtests/Makefile.am b/fabtests/Makefile.am index 49128b74e5b..28e66f17260 100644 --- a/fabtests/Makefile.am +++ b/fabtests/Makefile.am @@ -68,7 +68,11 @@ bin_PROGRAMS = \ multinode/fi_multinode_coll \ component/sock_test \ regression/sighandler_test \ - common/check_hmem + common/check_hmem \ + examples/example_rdm \ + examples/example_rdm_oob \ + examples/example_rdm_rma \ + examples/example_rdm_tagged if HAVE_ZE_DEVEL if HAVE_VERBS_DEVEL @@ -592,6 +596,18 @@ common_check_hmem_LDADD = libfabtests.la common_checK_hmem_CFLAGS = \ $(AM_CFLAGS) +examples_example_rdm_SOURCES = \ + 
examples/example_rdm.c + +examples_example_rdm_rma_SOURCES = \ + examples/example_rdm_rma.c + +examples_example_rdm_tagged_SOURCES = \ + examples/example_rdm_tagged.c + +examples_example_rdm_oob_SOURCES = \ + examples/example_rdm_oob.c + real_man_pages = \ man/man7/fabtests.7 diff --git a/fabtests/examples/example_rdm.c b/fabtests/examples/example_rdm.c new file mode 100644 index 00000000000..2158cb07144 --- /dev/null +++ b/fabtests/examples/example_rdm.c @@ -0,0 +1,475 @@ +/* + * Copyright (c) Intel Corporation. All rights reserved. + * + * This software is available to you under the BSD license + * below: + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * - Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * + * - Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ +#include +#include +#include +#include +#include +#include +#include +#include + +//Run Server: example_rdm +//Run client: example_rdm server_addr + +//#define BUF_SIZE 64 + +char *dst_addr = NULL; +char *port = "9228"; +static struct fi_info *hints, *info; +static struct fid_fabric *fabric = NULL; +static struct fid_domain *domain = NULL; +static struct fid_ep *ep = NULL; +static struct fid_av *av = NULL; +static struct fid_cq *cq = NULL; +char buf[64]; +static fi_addr_t fi_addr = FI_ADDR_UNSPEC; + +/* + * Allocate the global hints and set anything in them that the application + * needs. Returns 0 on success or -FI_ENOMEM if fi_allocinfo fails. + */ +static int set_hints(void) +{ + + hints = fi_allocinfo(); + if (!hints) + return -FI_ENOMEM; + + /* + * Request FI_EP_RDM (reliable datagram) endpoint which will allow us + * to reliably send messages to peers without having to + * listen/connect/accept. + */ + hints->ep_attr->type = FI_EP_RDM; + + /* + * Request basic messaging capabilities from the provider (no tag + * matching, no RMA, no atomic operations) + */ + hints->caps = FI_MSG; + + /* + * Specifically request the tcp provider for the simple test. + * NOTE(review): prov_name points at a string literal here; + * fi_freeinfo is documented to free referenced strings, so confirm + * before ever passing hints to fi_freeinfo. + */ + hints->fabric_attr->prov_name = "tcp"; + + /* + * Default to FI_DELIVERY_COMPLETE which will make sure completions do + * not get generated until our message arrives at the destination. + * Otherwise, the client might get a completion and exit before the + * server receives the message. This is to make the test simpler. + */ + hints->tx_attr->op_flags = FI_DELIVERY_COMPLETE; + + /* + * Set the mode bits to 0. Mode bits are used to convey requirements + * that an application must adhere to when using the fabric interfaces. + * Modes specify optimal ways of accessing the reported endpoint or + * domain. On input to fi_getinfo, applications set the mode bits that + * they support. + */ + hints->mode = 0; + + /* + * Set mr_mode to 0. mr_mode is used to specify the type of memory + * registration capabilities the application requires. In this example + * we are not using memory registration so this field will be set to 0. 
+ */ + hints->domain_attr->mr_mode = 0; + + /* Done setting hints */ + + return 0; +} + +/* + * Initializes all basic libfabric resources to allow for a server/client to + * exchange a message. Returns 0 on success; on failure the error is printed + * and a nonzero error code is returned. + */ +static int initialize(void) +{ + struct fi_cq_attr cq_attr = {0}; + struct fi_av_attr av_attr = {0}; + const struct sockaddr_in *sin; + char str_addr[INET_ADDRSTRLEN]; + int ret; + + /* + * The first libfabric call to happen for initialization is fi_getinfo + * which queries libfabric and returns any appropriate providers that + * fulfill the hints requirements. Any applicable providers will be + * returned as a list of fi_info structs (&info). Any info can be + * selected. In this test we select the first fi_info struct. Assuming + * all hints were set appropriately, the first fi_info should be most + * appropriate. The flag FI_SOURCE is set for the server to indicate + * that the address/port refer to source information. This is not set + * for the client because the fields refer to the server, not the + * caller (client). + */ + + ret = fi_getinfo(FI_VERSION(1,9), dst_addr, port, + dst_addr ? 0 : FI_SOURCE, hints, &info); + if (ret) { + printf("fi_getinfo error (%d)\n", ret); + return ret; + } + + if (!dst_addr) { + /* + * The server converts the returned fi_info->src_addr to a + * string for the client to use in its command line call. + * This address will get passed into the client's fi_getinfo + * call to resolve the server's address. The address is read + * as a struct sockaddr_in and str_addr is sized with + * INET_ADDRSTRLEN, i.e. an IPv4 address is assumed. + */ + sin = info->src_addr; + if (!inet_ntop(sin->sin_family, &sin->sin_addr, str_addr, + sizeof(str_addr))) { + printf("error converting address to string\n"); + return -1; + } + + printf("Server started with addr %s\n", str_addr); + } + + /* + * Initialize our fabric. The fabric network represents a collection of + * hardware and software resources that access a single physical or + * virtual network. 
All network ports on a system that can communicate + * with each other through their attached networks belong to the same + * fabric. + */ + + ret = fi_fabric(info->fabric_attr, &fabric, NULL); + if (ret) { + printf("fi_fabric error (%d)\n", ret); + return ret; + } + + /* + * Initialize our domain (associated with our fabric). A domain defines + * the boundary for associating different resources together. + */ + + ret = fi_domain(fabric, info, &domain, NULL); + if (ret) { + printf("fi_domain error (%d)\n", ret); + return ret; + } + + /* + * Initialize our endpoint. Endpoints are transport level communication + * portals which are used to initiate and drive communication. There + * are three main types of endpoints: + * FI_EP_MSG - connected, reliable + * FI_EP_RDM - unconnected, reliable + * FI_EP_DGRAM - unconnected, unreliable + * The type of endpoint is requested through the hints passed to + * fi_getinfo (FI_EP_RDM in this example). + * Different providers support different types of endpoints. + */ + + ret = fi_endpoint(domain, info, &ep, NULL); + if (ret) { + printf("fi_endpoint error (%d)\n", ret); + return ret; + } + + /* + * Initialize our completion queue. Completion queues are used to + * report events associated with data transfers. In this example, we + * use one CQ that tracks sends and receives, but oftentimes there + * will be separate CQs for sends and receives. The CQ is opened with + * a requested size of 128 and the FI_CQ_FORMAT_MSG entry format. + */ + + cq_attr.size = 128; + cq_attr.format = FI_CQ_FORMAT_MSG; + ret = fi_cq_open(domain, &cq_attr, &cq, NULL); + if (ret) { + printf("fi_cq_open error (%d)\n", ret); + return ret; + } + + /* + * Bind our CQ to our endpoint to track any sends and receives that + * come in or out on that endpoint. A CQ can be bound to multiple + * endpoints but one EP can only have one send CQ and one receive CQ + * (which can be the same CQ). + */ + + ret = fi_ep_bind(ep, &cq->fid, FI_SEND | FI_RECV); + if (ret) { + printf("fi_ep_bind cq error (%d)\n", ret); + return ret; + } + + /* + * Initialize our address vector. 
Address vectors are used to map + * higher level addresses, which may be more natural for an application + * to use, into fabric specific addresses. An AV_TABLE av will map + * these addresses to indexed addresses, starting with fi_addr 0. These + * addresses are used in data transfer calls to specify which peer to + * send to/recv from. Address vectors are only used for FI_EP_RDM and + * FI_EP_DGRAM endpoints, allowing the application to avoid connection + * management. For FI_EP_MSG endpoints, the AV is replaced by the + * traditional listen/connect/accept steps. + */ + + av_attr.type = FI_AV_TABLE; + av_attr.count = 1; + ret = fi_av_open(domain, &av_attr, &av, NULL); + if (ret) { + printf("fi_av_open error (%d)\n", ret); + return ret; + } + + if (dst_addr) { + /* + * Here the client inserts the server's address into the AV + * which returns an fi_addr to use when sending data to the + * peer (server). Note that only the client has to insert the + * server's address into its AV since it is the one sending. + * The server does not need to have a peer's address in its AV + * in order to receive a message. Exactly one address is + * inserted, so any return value other than 1 is treated as a + * failure. + */ + + ret = fi_av_insert(av, info->dest_addr, 1, &fi_addr, 0, NULL); + if (ret != 1) { + printf("fi_av_insert error (%d)\n", ret); + return ret ? ret : -1; + } + } + + /* + * Bind the AV to the EP. The EP can only send data to a peer in its + * AV. + */ + + ret = fi_ep_bind(ep, &av->fid, 0); + if (ret) { + printf("fi_ep_bind av error (%d)\n", ret); + return ret; + } + + /* + * Once we have all our resources initialized and ready to go, we can + * enable our EP in order to send/receive data. + */ + + ret = fi_enable(ep); + if (ret) { + printf("fi_enable error (%d)\n", ret); + return ret; + } + + return 0; +} + +/* + * All libfabric resources are cleaned up using the same fi_close(fid) call. + * Resources must be closed in a specific order to allow references between + * objects to be removed correctly. 
For example the endpoint must be closed + * before the CQ or AV. cleanup() is also reached after a failed or partial + * initialization, so any resource that was never opened (still NULL) is + * skipped instead of being passed to fi_close. + */ +static void cleanup(void) +{ + int ret; + + ret = ep ? fi_close(&ep->fid) : 0; + if (ret) + printf("warning: error closing EP (%d)\n", ret); + + ret = av ? fi_close(&av->fid) : 0; + if (ret) + printf("warning: error closing AV (%d)\n", ret); + + ret = cq ? fi_close(&cq->fid) : 0; + if (ret) + printf("warning: error closing CQ (%d)\n", ret); + + ret = domain ? fi_close(&domain->fid) : 0; + if (ret) + printf("warning: error closing domain (%d)\n", ret); + + ret = fabric ? fi_close(&fabric->fid) : 0; + if (ret) + printf("warning: error closing fabric (%d)\n", ret); + + if (info) + fi_freeinfo(info); +} + +/* + * Post a receive buffer. This call does not ensure a message has been + * received, just that a buffer has been passed to libfabric for the next message + * the provider receives. Receives may be directed or undirected using the + * address parameter. Here, we pass in the fi_addr but note that the server + * has not inserted the client's address into its AV, so the address is still + * FI_ADDR_UNSPEC, indicating that this buffer may receive incoming data from + * any address. An application may set this to a real fi_addr if the buffer + * should only receive data from a certain peer. + * When posting a buffer, if the provider is not ready to process messages + * (because of connection initialization for example), it may return + * -FI_EAGAIN. This does not indicate an error, but rather that the application + * should try again later. This is why we almost always wrap sends and receives + * in a do/while. Some providers may need the application to drive progress in + * order to get out of the -FI_EAGAIN loop. To drive progress, the application + * needs to call fi_cq_read (not necessarily reading any completion entries). 
+ */ +static int post_recv(void) +{ + int ret; + + do { + ret = fi_recv(ep, buf, sizeof(buf), NULL, fi_addr, NULL); + if (ret && ret != -FI_EAGAIN) { + printf("error posting recv buffer (%d)\n", ret); + return ret; + } + if (ret == -FI_EAGAIN) + (void) fi_cq_read(cq, NULL, 0); + } while (ret); + + return 0; +} + +/* + * Post a send buffer. This call does not ensure a message has been sent, just + * that a buffer has been submitted to libfabric to be sent. Unlike a receive buffer, + * a send needs a valid fi_addr as input to tell the provider where to send the + * message. Similar to the receive buffer posting process, when posting a send + * buffer, if the provider is not ready to process messages, it may return + * -FI_EAGAIN. This does not indicate an error, but rather that the application + * should try again later. Just like the receive, we drive progress with + * fi_cq_read if this is the case. + * The message has static storage duration because the buffer must stay valid + * until its completion is read, which happens after this function returns. + * The whole string, including its terminating NUL, is sent so that the server + * can print the received buffer as a string. + * Note: This example does not support/need memory registration. + */ +static int post_send(void) +{ + static const char msg[] = "Hello, server! I am the client you've been waiting for!"; + int ret; + + do { + ret = fi_send(ep, msg, sizeof(msg), NULL, fi_addr, NULL); + if (ret && ret != -FI_EAGAIN) { + printf("error posting send buffer (%d)\n", ret); + return ret; + } + if (ret == -FI_EAGAIN) + (void) fi_cq_read(cq, NULL, 0); + } while (ret); + + return 0; +} + +/* + * Wait for the message to be sent/received using the CQ. fi_cq_read not only + * drives progress but also returns any completed events to notify the + * application that it can reuse the send/recv buffer. The returned completion + * entry will have fields set to let the application know what operation + * completed. Not all fields will be valid. The fields set will be indicated by + * the cq format (when creating the CQ). In this example, we use + * FI_CQ_FORMAT_MSG in order to use the flags field. 
+ */ +static int spin_for_comp(void) +{ + struct fi_cq_err_entry comp; + int ret; + /* Poll until exactly one completion is read; -FI_EAGAIN only means nothing has completed yet. */ + do { + ret = fi_cq_read(cq, &comp, 1); + if (ret < 0 && ret != -FI_EAGAIN) { + printf("error reading cq (%d)\n", ret); + return ret; + } + } while (ret != 1); + + if (comp.flags & FI_RECV) + printf("I received a message!\n"); + else if (comp.flags & FI_SEND) + printf("My sent message got sent!\n"); + + return 0; +} +/* Client (dst_addr set): post the send and wait for its completion. Server: post a receive, wait for its completion and print the received buffer. Returns 0 on success or the first error encountered. */ +static int run(void) +{ + int ret; + + if (dst_addr) { + printf("Client: send to server %s\n", dst_addr); + + ret = post_send(); + if (ret) + return ret; + + ret = spin_for_comp(); + if (ret) + return ret; + + } else { + printf("Server: post buffer and wait for message from client\n"); + + ret = post_recv(); + if (ret) + return ret; + + ret = spin_for_comp(); + if (ret) + return ret; + + printf("This is the message I received: %s\n", buf); + } + return 0; +} +/* Entry point: set hints, initialize the libfabric resources, run the exchange, then clean up on every path. */ +int main(int argc, char **argv) +{ + int ret; + + /* + * The server is run with no arguments; the client takes the server's + * address as its argument. dst_addr therefore stays NULL on the + * server, which is how the rest of the example tells the roles apart. + */ + dst_addr = argv[optind]; + + /* + * Hints are used to request support for specific features from a + * provider. + */ + ret = set_hints(); + if (ret) { + printf ("Error settings hints.\n"); + goto out; + } + + ret = initialize(); + if (ret) + goto out; + + ret = run(); +out: + cleanup(); + return ret; +} diff --git a/fabtests/examples/example_rdm_oob.c b/fabtests/examples/example_rdm_oob.c new file mode 100644 index 00000000000..8fb9f2e1346 --- /dev/null +++ b/fabtests/examples/example_rdm_oob.c @@ -0,0 +1,721 @@ +/* + * Copyright (c) Intel Corporation. All rights reserved. + * + * This software is available to you under the BSD license + * below: + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * - Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. 
+ * + * - Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +//Run Server: example_rdm_oob +//Run client: example_rdm_oob server_addr + +#define BUF_SIZE 64 +#define MR_KEY 0xC0DE + +char *src_addr = NULL, *dst_addr = NULL; +char *oob_port = "9228"; +int listen_sock, oob_sock; +struct fi_info *hints, *info; +struct fid_fabric *fabric = NULL; +struct fid_domain *domain = NULL; +struct fid_ep *ep = NULL; +struct fid_av *av = NULL; +struct fid_cq *cq = NULL; +struct fid_mr *mr = NULL; +void *desc; +char buf[BUF_SIZE]; +static fi_addr_t fi_addr = FI_ADDR_UNSPEC; +/* + * Create a TCP (SOCK_STREAM) socket for node/service, bind it and start + * listening; the socket is stored in the global listen_sock. Returns 0 on + * success and a nonzero value on failure. Note that the local 'hints' + * (a struct addrinfo) shadows the global fi_info hints inside this function. + */ +static int sock_listen(char *node, char *service) +{ + struct addrinfo *ai, hints; + int val, ret; + + memset(&hints, 0, sizeof hints); + hints.ai_flags = AI_PASSIVE; + + ret = getaddrinfo(node, service, &hints, &ai); + if (ret) { + printf("getaddrinfo() %s\n", gai_strerror(ret)); + return ret; + } + + listen_sock = socket(ai->ai_family, SOCK_STREAM, 0); + if (listen_sock < 0) { + printf("socket error"); + ret = listen_sock; + goto out; + } + + val = 1; + ret = setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, + (void *) &val, sizeof val); + if (ret) { + printf("setsockopt SO_REUSEADDR"); + goto out; + } 
+ + ret = bind(listen_sock, ai->ai_addr, ai->ai_addrlen); + if (ret) { + printf("bind"); + goto out; + } + + ret = listen(listen_sock, 0); + if (ret) + printf("listen error"); + +out: + if (ret && listen_sock >= 0) + close(listen_sock); + freeaddrinfo(ai); + return ret; +} +/* + * Configure the out-of-band socket: enable TCP_NODELAY, then read the file + * status flags with F_GETFL and write the same value back with F_SETFL. + * Returns 0 on success, the setsockopt result or -errno on failure. + */ +static int sock_setup(int sock) +{ + int ret, op; + long flags; + + op = 1; + ret = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, + (void *) &op, sizeof(op)); + if (ret) + return ret; + + flags = fcntl(sock, F_GETFL); + if (flags < 0) + return -errno; + + if (fcntl(sock, F_SETFL, flags)) + return -errno; + + return 0; +} +/* + * Establish the out-of-band TCP connection. The server (dst_addr not set) + * listens on oob_port and accepts one connection; the client connects to + * dst_addr on oob_port. The connected socket is stored in oob_sock. + * NOTE(review): the accept failure path returns without closing listen_sock + * and a sock_setup failure leaves oob_sock open. + */ +static int init_oob(void) +{ + struct addrinfo *ai = NULL; + int ret; + + if (!dst_addr) { + ret = sock_listen(src_addr, oob_port); + if (ret) + return ret; + + oob_sock = accept(listen_sock, NULL, 0); + if (oob_sock < 0) { + printf("accept error"); + ret = oob_sock; + return ret; + } + + close(listen_sock); + } else { + ret = getaddrinfo(dst_addr, oob_port, NULL, &ai); + if (ret) { + printf("getaddrinfo error"); + return ret; + } + + oob_sock = socket(ai->ai_family, SOCK_STREAM, 0); + if (oob_sock < 0) { + printf("socket error"); + ret = oob_sock; + goto free; + } + + ret = connect(oob_sock, ai->ai_addr, ai->ai_addrlen); + if (ret) { + printf("connect error"); + close(oob_sock); + goto free; + } + sleep(1); + } + + ret = sock_setup(oob_sock); + +free: + if (ai) + freeaddrinfo(ai); + return ret; +} +/* Send exactly len bytes from msg on fd, looping over partial sends. Returns 0 on success or -errno when send does not return a positive count. */ +static int sock_send(int fd, void *msg, size_t len) +{ + size_t sent; + ssize_t ret, err = 0; + + for (sent = 0; sent < len; ) { + ret = send(fd, ((char *) msg) + sent, len - sent, 0); + if (ret > 0) { + sent += ret; + } else { + err = -errno; + break; + } + } + + return err ? 
err: 0; +} +/* Receive exactly len bytes from fd into msg, looping over partial reads. Returns 0 on success, -FI_ENOTCONN if recv returns 0 before len bytes arrived, or -errno on error. */ +static int sock_recv(int fd, void *msg, size_t len) +{ + size_t rcvd; + ssize_t ret, err = 0; + + for (rcvd = 0; rcvd < len; ) { + ret = recv(fd, ((char *) msg) + rcvd, len - rcvd, 0); + if (ret > 0) { + rcvd += ret; + } else if (ret == 0) { + err = -FI_ENOTCONN; + break; + } else { + err = -errno; + break; + } + } + + return err ? err: 0; +} +/* + * Synchronize client and server over the out-of-band socket. The client sends + * an int and then waits for the server's int; the server waits for the + * client's int and then replies. While waiting, recv is called with + * MSG_DONTWAIT and fi_cq_read(cq, NULL, 0) is called between attempts to + * drive progress. Returns 0 on success, -FI_EOTHER if the send is short, or + * the fi_cq_read error. + */ +static int sync_progress(void) +{ + int ret, value = 0, result = -FI_EOTHER; + + if (dst_addr) { + ret = send(oob_sock, &value, sizeof(value), 0); + if (ret != sizeof(value)) + return -FI_EOTHER; + + do { + ret = recv(oob_sock, &result, sizeof(result), MSG_DONTWAIT); + if (ret == sizeof(result)) + break; + + ret = fi_cq_read(cq, NULL, 0); + if (ret && ret != -FI_EAGAIN) + return ret; + } while (1); + } else { + do { + ret = recv(oob_sock, &result, sizeof(result), MSG_DONTWAIT); + if (ret == sizeof(result)) + break; + + ret = fi_cq_read(cq, NULL, 0); + if (ret && ret != -FI_EAGAIN) + return ret; + } while (1); + + ret = send(oob_sock, &value, sizeof(value), 0); + if (ret != sizeof(value)) + return -FI_EOTHER; + } + return 0; +} + +/* + * The server and client need to exchange their local endpoint addresses with + * each other so that each side can insert its peer's address into its address + * vector and obtain the fi_addr used to reach that peer. The addresses are + * exchanged over the out-of-band socket. 
+ */ +static int exchange_addresses(void) +{ + /* Zeroed because the whole fixed-size buffer is sent, not just addrlen bytes */ + char addr_buf[BUF_SIZE] = {0}; + int ret; + size_t addrlen = BUF_SIZE; + + ret = fi_getname(&ep->fid, addr_buf, &addrlen); + if (ret) { + printf("fi_getname error %d\n", ret); + return ret; + } + + ret = sock_send(oob_sock, addr_buf, BUF_SIZE); + if (ret) { + printf("sock_send error %d\n", ret); + return ret; + } + + memset(addr_buf, 0, BUF_SIZE); + ret = sock_recv(oob_sock, addr_buf, BUF_SIZE); + if (ret) { + printf("sock_recv error %d\n", ret); + return ret; + } + + /* One address is inserted, so anything other than 1 is a failure */ + ret = fi_av_insert(av, addr_buf, 1, &fi_addr, 0, NULL); + if (ret != 1) { + printf("av insert error\n"); + return -FI_ENOSYS; + } + + return sync_progress(); +} + +/* + * Allocate the global hints and set anything in them that the application + * needs. Returns 0 on success or -FI_ENOMEM if fi_allocinfo fails. + */ +static int set_hints(void) +{ + + hints = fi_allocinfo(); + if (!hints) + return -FI_ENOMEM; + + /* + * Request FI_EP_RDM (reliable datagram) endpoint which will allow us + * to reliably send messages to peers without having to + * listen/connect/accept. + */ + hints->ep_attr->type = FI_EP_RDM; + + /* + * Request basic messaging capabilities from the provider (no tag + * matching, no RMA, no atomic operations) + */ + hints->caps = FI_MSG; + + /* + * Default to FI_DELIVERY_COMPLETE which will make sure completions do + * not get generated until our message arrives at the destination. + * Otherwise, the client might get a completion and exit before the + * server receives the message. This is to make the test simpler. + */ + hints->tx_attr->op_flags = FI_DELIVERY_COMPLETE; + + /* Set memory registration modes. 
*/ + hints->domain_attr->mr_mode = FI_MR_ENDPOINT | FI_MR_LOCAL | + FI_MR_PROV_KEY | FI_MR_ALLOCATED | FI_MR_VIRT_ADDR; + + /* Specifically request the tcp provider for the simple test */ + hints->fabric_attr->prov_name = "tcp"; + + //Done setting hints + + return 0; +} + +/* + * Initializes all basic libfabric resources to allow for a server/client to + * exchange a message, registers the message buffer and exchanges endpoint + * addresses with the peer. Returns 0 on success or a nonzero error code. + */ +static int initialize(void) +{ + struct fi_cq_attr cq_attr = {0}; + struct fi_av_attr av_attr = {0}; + int ret; + + /* + * The first libfabric call to happen for initialization is fi_getinfo + * which queries libfabric and returns any appropriate providers that + * fulfill the hints requirements. Any applicable providers will be + * returned as a list of fi_info structs (&info). Any info can be + * selected. In this test we select the first fi_info struct. Assuming + * all hints were set appropriately, the first fi_info should be most + * appropriate. No node, service or flags are passed here: this + * example does not resolve the peer through fi_getinfo but exchanges + * endpoint addresses out of band over a TCP socket instead (see + * exchange_addresses). + */ + ret = fi_getinfo(FI_VERSION(1,9), NULL, NULL, 0, + hints, &info); + if (ret) { + printf("fi_getinfo error (%d)\n", ret); + return ret; + } + + /* + * Initialize our fabric. The fabric network represents a collection of + * hardware and software resources that access a single physical or + * virtual network. All network ports on a system that can communicate + * with each other through their attached networks belong to the same + * fabric. + */ + + ret = fi_fabric(info->fabric_attr, &fabric, NULL); + if (ret) { + printf("fi_fabric error (%d)\n", ret); + return ret; + } + + /* + * Initialize our domain (associated with our fabric). A domain defines + * the boundary for associating different resources together. 
+ */ + + ret = fi_domain(fabric, info, &domain, NULL); + if (ret) { + printf("fi_domain error (%d)\n", ret); + return ret; + } + + /* + * Initialize our endpoint. Endpoints are transport level communication + * portals which are used to initiate and drive communication. There + * are three main types of endpoints: + * FI_EP_MSG - connected, reliable + * FI_EP_RDM - unconnected, reliable + * FI_EP_DGRAM - unconnected, unreliable + * The type of endpoint is requested through the hints passed to + * fi_getinfo (FI_EP_RDM in this example). + * Different providers support different types of endpoints. + */ + + ret = fi_endpoint(domain, info, &ep, NULL); + if (ret) { + printf("fi_endpoint error (%d)\n", ret); + return ret; + } + + /* + * Initialize our completion queue. Completion queues are used to + * report events associated with data transfers. In this example, we + * use one CQ that tracks sends and receives, but oftentimes there + * will be separate CQs for sends and receives. The CQ is opened with + * a requested size of 128 and the FI_CQ_FORMAT_MSG entry format. + */ + + cq_attr.size = 128; + cq_attr.format = FI_CQ_FORMAT_MSG; + ret = fi_cq_open(domain, &cq_attr, &cq, NULL); + if (ret) { + printf("fi_cq_open error (%d)\n", ret); + return ret; + } + + /* + * Bind our CQ to our endpoint to track any sends and receives that + * come in or out on that endpoint. A CQ can be bound to multiple + * endpoints but one EP can only have one send CQ and one receive CQ + * (which can be the same CQ). + */ + + ret = fi_ep_bind(ep, &cq->fid, FI_SEND | FI_RECV); + if (ret) { + printf("fi_ep_bind cq error (%d)\n", ret); + return ret; + } + + /* + * Initialize our address vector. Address vectors are used to map + * higher level addresses, which may be more natural for an application + * to use, into fabric specific addresses. An AV_TABLE av will map + * these addresses to indexed addresses, starting with fi_addr 0. These + * addresses are used in data transfer calls to specify which peer to + * send to/recv from. 
Address vectors are only used for FI_EP_RDM and + * FI_EP_DGRAM endpoints, allowing the application to avoid connection + * management. For FI_EP_MSG endpoints, the AV is replaced by the + * traditional listen/connect/accept steps. + */ + + av_attr.type = FI_AV_TABLE; + av_attr.count = 1; + ret = fi_av_open(domain, &av_attr, &av, NULL); + if (ret) { + printf("fi_av_open error (%d)\n", ret); + return ret; + } + + /* + * Bind the AV to the EP. The EP can only send data to a peer in its + * AV. + */ + + ret = fi_ep_bind(ep, &av->fid, 0); + if (ret) { + printf("fi_ep_bind av error (%d)\n", ret); + return ret; + } + + /* + * Once we have all our resources initialized and ready to go, we can + * enable our EP in order to send/receive data. + */ + + ret = fi_enable(ep); + if (ret) { + printf("fi_enable error (%d)\n", ret); + return ret; + } + + /* + * Register the global buf (BUF_SIZE bytes) as memory region mr, + * requesting MR_KEY as its key and access for send/recv as well as + * local and remote read/write. + */ + ret = fi_mr_reg(domain, buf, BUF_SIZE, FI_SEND | FI_RECV | FI_WRITE | FI_READ | + FI_REMOTE_WRITE | FI_REMOTE_READ, 0, MR_KEY, 0, &mr, NULL); + if (ret) { + printf("fi_mr_reg error (%d)\n", ret); + return ret; + } + + /* Obtain the local descriptor of mr; it is kept in the global desc for the data transfer calls */ + desc = fi_mr_desc(mr); + + /* + * Bind the memory region mr with the endpoint and enable mr. This is + * only done when the selected provider reports FI_MR_ENDPOINT in its + * mr_mode. + */ + if (info->domain_attr->mr_mode & FI_MR_ENDPOINT) { + ret = fi_mr_bind(mr, &ep->fid, 0); + if (ret) { + printf("fi_mr_bind error (%d)\n", ret); + return ret; + } + + ret = fi_mr_enable(mr); + if (ret) { + printf("fi_mr_enable error (%d)\n", ret); + return ret; + } + } + + /* Swap endpoint addresses with the peer over the out-of-band socket */ + ret = exchange_addresses(); + if (ret) + return ret; + + return 0; +} + +/* All libfabric resources are cleaned up using the same fi_close(fid) call. + * Resources must be closed in a specific order to allow references between + * objects to be removed correctly. For example the endpoint must be closed + * before the CQ or AV. 
+ */ +static void cleanup(void) +{ + int ret; + + if (mr) { + ret = fi_close(&mr->fid); + if (ret) + printf("warning: error closing MR (%d)\n", ret); + } + + if (ep) { + ret = fi_close(&ep->fid); + if (ret) + printf("warning: error closing EP (%d)\n", ret); + } + + if (av) { + ret = fi_close(&av->fid); + if (ret) + printf("warning: error closing AV (%d)\n", ret); + } + + if (cq) { + ret = fi_close(&cq->fid); + if (ret) + printf("warning: error closing CQ (%d)\n", ret); + } + + if (domain) { + ret = fi_close(&domain->fid); + if (ret) + printf("warning: error closing domain (%d)\n", ret); + } + + if (fabric) { + ret = fi_close(&fabric->fid); + if (ret) + printf("warning: error closing fabric (%d)\n", ret); + } + + /* Free the space occupied by info struct */ + if (info) + fi_freeinfo(info); +} + +/* + * Post a receive buffer. This call does not ensure a message has been + * received, just that a buffer has been passed to libfabric for the next + * message the provider receives. Receives may be directed or undirected using + * the address parameter. Here, we pass in the fi_addr that fi_av_insert + * returned for the peer during the out-of-band address exchange. An + * application may instead pass FI_ADDR_UNSPEC to indicate that the buffer may + * receive incoming data from any address. + * When posting a buffer, if the provider is not ready to process messages + * (because of connection initialization for example), it may return + * -FI_EAGAIN. This does not indicate an error, but rather that the application + * should try again later. This is why we almost always wrap sends and receives + * in a do/while. Some providers may need the application to drive progress in + * order to get out of the -FI_EAGAIN loop. To drive progress, the application + * needs to call fi_cq_read (not necessarily reading any completion entries). 
+ */
+static int post_recv(void)
+{
+	int ret;
+
+	do {
+		ret = fi_recv(ep, buf, BUF_SIZE, NULL, fi_addr, NULL);
+		if (ret && ret != -FI_EAGAIN) {
+			printf("error posting recv buffer (%d)\n", ret);
+			return ret;
+		}
+		if (ret == -FI_EAGAIN)	/* not an error: drive progress and retry */
+			(void) fi_cq_read(cq, NULL, 0);
+	} while (ret);
+
+	return 0;
+}
+
+/*
+ * Post a send buffer. This call does not ensure a message has been sent, just
+ * that a buffer has been submitted to libfabric to be sent. Unlike a receive
+ * buffer, a send needs a valid fi_addr as input to tell the provider where to
+ * send the message. Similar to the receive buffer posting process, when
+ * posting a send buffer, if the provider is not ready to process messages, it
+ * may return -FI_EAGAIN. This does not indicate an error, but rather that the
+ * application should try again later. Just like the receive, we drive progress
+ * with fi_cq_read if this is the case.
+ */
+static int post_send(void)
+{
+	char *msg = "Hello, server! I am the client you've been waiting for!";
+	int ret;
+
+	(void) snprintf(buf, BUF_SIZE, "%s", msg);	/* stage msg in buf: desc describes buf, not msg */
+
+	do {
+		ret = fi_send(ep, buf, strlen(buf), desc, fi_addr, NULL);
+		if (ret && ret != -FI_EAGAIN) {
+			printf("error posting send buffer (%d)\n", ret);
+			return ret;
+		}
+		if (ret == -FI_EAGAIN)	/* not an error: drive progress and retry */
+			(void) fi_cq_read(cq, NULL, 0);
+	} while (ret);
+
+	return 0;
+}
+
+/*
+ * Wait for the message to be sent/received using the CQ. fi_cq_read not only
+ * drives progress but also returns any completed events to notify the
+ * application that it can reuse the send/recv buffer. The returned completion
+ * entry will have fields set to let the application know what operation
+ * completed. Not all fields will be valid. The fields set will be indicated by
+ * the cq format (when creating the CQ). In this example, we use
+ * FI_CQ_FORMAT_MSG in order to use the flags field.
+ */ +static int spin_for_comp(void) +{ + struct fi_cq_err_entry comp; + int ret; + + do { + ret = fi_cq_read(cq, &comp, 1); + if (ret < 0 && ret != -FI_EAGAIN) { + printf("error reading cq (%d)\n", ret); + return ret; + } + } while (ret != 1); + + if (comp.flags & FI_RECV) + printf("I received a message!\n"); + else if (comp.flags & FI_SEND) + printf("My sent message got sent!\n"); + + return 0; +} + +static int run(void) +{ + int ret; + + if (dst_addr) { + printf("Client: send to server %s\n", dst_addr); + + ret = post_send(); + if (ret) + return ret; + + ret = spin_for_comp(); + if (ret) + return ret; + + } else { + printf("Server: post buffer and wait for message from client\n"); + + ret = post_recv(); + if (ret) + return ret; + + ret = spin_for_comp(); + if (ret) + return ret; + + printf("This is the message I received: %s\n", buf); + } + + return sync_progress(); +} + +int main(int argc, char **argv) +{ + int ret; + + /* + * Server run with no args, client has server's address as an + * argument. + */ + dst_addr = argv[optind]; + + /* Init out-of-band addressing */ + ret = init_oob(); + if (ret) + return ret; + + /* + * Hints are used to request support for specific features from a + * provider. + */ + ret = set_hints(); + if (ret) { + printf ("Error settings hints.\n"); + goto out; + } + + ret = initialize(); + if (ret) + goto out; + + ret = run(); +out: + cleanup(); + return ret; +} diff --git a/fabtests/examples/example_rdm_rma.c b/fabtests/examples/example_rdm_rma.c new file mode 100644 index 00000000000..d2640d47dc4 --- /dev/null +++ b/fabtests/examples/example_rdm_rma.c @@ -0,0 +1,818 @@ +/* + * Copyright (c) Intel Corporation. All rights reserved. 
+ * + * This software is available to you under the BSD license + * below: + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * - Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * + * - Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +//Run Server: example_rdm_rma +//Run client: example_rdm_rma + +#define BUF_SIZE 64 +#define MR_KEY 0xC0DE + +char *src_addr = NULL, *dst_addr = NULL; +char *oob_port = "9228"; +int listen_sock, oob_sock; +struct fi_info *hints, *info; +struct fid_fabric *fabric = NULL; +struct fid_domain *domain = NULL; +struct fid_ep *ep = NULL; +struct fid_av *av = NULL; +struct fid_cq *cq = NULL; +struct fid_mr *mr_read = NULL; +struct fid_mr *mr_write = NULL; +struct fid_eq *eq = NULL; +void *local_desc_read, *local_desc_write; +uint64_t remote_addr_read, remote_addr_write, local_addr_read, local_addr_write; +uint64_t local_key_read, local_key_write, remote_key_read, remote_key_write; +char buf_read[BUF_SIZE]; +char buf_write[BUF_SIZE]; +static fi_addr_t fi_addr = FI_ADDR_UNSPEC; + +static int sock_listen(char *node, char *service) +{ + struct addrinfo *ai, hints; + int val, ret; + + memset(&hints, 0, sizeof hints); + hints.ai_flags = AI_PASSIVE; + + ret = getaddrinfo(node, service, &hints, &ai); + if (ret) { + printf("getaddrinfo() %s\n", gai_strerror(ret)); + return ret; + } + + listen_sock = socket(ai->ai_family, SOCK_STREAM, 0); + if (listen_sock < 0) { + printf("socket error"); + ret = listen_sock; + goto out; + } + + val = 1; + ret = setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, + (void *) &val, sizeof val); + if (ret) { + printf("setsockopt SO_REUSEADDR"); + goto out; + } + + ret = bind(listen_sock, ai->ai_addr, ai->ai_addrlen); + if (ret) { + printf("bind"); + goto out; + } + + ret = listen(listen_sock, 0); + if (ret) + printf("listen error"); + +out: + if (ret && listen_sock >= 0) + close(listen_sock); + freeaddrinfo(ai); + return ret; +} + +static int sock_setup(int sock) +{ + int ret, op; + long flags; + + op = 1; + ret = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, + (void *) &op, 
sizeof(op)); + if (ret) + return ret; + + flags = fcntl(sock, F_GETFL); + if (flags < 0) + return -errno; + + if (fcntl(sock, F_SETFL, flags)) + return -errno; + + return 0; +} + +static int init_oob(void) +{ + struct addrinfo *ai = NULL; + int ret; + + if (!dst_addr) { + ret = sock_listen(src_addr, oob_port); + if (ret) + return ret; + + oob_sock = accept(listen_sock, NULL, 0); + if (oob_sock < 0) { + printf("accept error"); + ret = oob_sock; + return ret; + } + + close(listen_sock); + } else { + ret = getaddrinfo(dst_addr, oob_port, NULL, &ai); + if (ret) { + printf("getaddrinfo error"); + return ret; + } + + oob_sock = socket(ai->ai_family, SOCK_STREAM, 0); + if (oob_sock < 0) { + printf("socket error"); + ret = oob_sock; + goto free; + } + + ret = connect(oob_sock, ai->ai_addr, ai->ai_addrlen); + if (ret) { + printf("connect error"); + close(oob_sock); + goto free; + } + sleep(1); + } + + ret = sock_setup(oob_sock); + +free: + if (ai) + freeaddrinfo(ai); + return ret; +} + +static int sock_send(int fd, void *msg, size_t len) +{ + size_t sent; + ssize_t ret, err = 0; + + for (sent = 0; sent < len; ) { + ret = send(fd, ((char *) msg) + sent, len - sent, 0); + if (ret > 0) { + sent += ret; + } else { + err = -errno; + break; + } + } + + return err ? err: 0; +} + +static int sock_recv(int fd, void *msg, size_t len) +{ + size_t rcvd; + ssize_t ret, err = 0; + + for (rcvd = 0; rcvd < len; ) { + ret = recv(fd, ((char *) msg) + rcvd, len - rcvd, 0); + if (ret > 0) { + rcvd += ret; + } else if (ret == 0) { + err = -FI_ENOTCONN; + break; + } else { + err = -errno; + break; + } + } + + return err ? 
err: 0; +} + +static int sync_progress(void) +{ + int ret, value = 0, result = -FI_EOTHER; + + if (dst_addr) { + ret = send(oob_sock, &value, sizeof(value), 0); + if (ret != sizeof(value)) + return -FI_EOTHER; + + do { + ret = recv(oob_sock, &result, sizeof(result), + MSG_DONTWAIT); + if (ret == sizeof(result)) + break; + + ret = fi_cq_read(cq, NULL, 0); + if (ret && ret != -FI_EAGAIN) + return ret; + } while (1); + } else { + do { + ret = recv(oob_sock, &result, sizeof(result), + MSG_DONTWAIT); + if (ret == sizeof(result)) + break; + + ret = fi_cq_read(cq, NULL, 0); + if (ret && ret != -FI_EAGAIN) + return ret; + } while (1); + + ret = send(oob_sock, &value, sizeof(value), 0); + if (ret != sizeof(value)) + return -FI_EOTHER; + } + return 0; +} + +/* + * The server and client need to exchange their local addresses with each other + * so that the entity performing read or write operation knows where to read + * from or write to. With exchange_info we will exchange the local_addr_read + * and local_addr_write addresses that we defined during initialization. Along + * with the addresses the keys will also be exchanged. + */ +static int exchange_info(void) +{ + char addr_buf[BUF_SIZE]; + int ret; + size_t addrlen = BUF_SIZE; + + ret = fi_getname(&ep->fid, addr_buf, &addrlen); + if (ret) { + printf("fi_getname error %d\n", ret); + return ret; + } + + ret = sock_send(oob_sock, addr_buf, BUF_SIZE); + if (ret) { + printf("sock_send error %d\n", ret); + return ret; + } + + memset(addr_buf, 0, BUF_SIZE); + ret = sock_recv(oob_sock, addr_buf, BUF_SIZE); + if (ret) { + printf("sock_recv error %d\n", ret); + return ret; + } + + ret = fi_av_insert(av, addr_buf, 1, &fi_addr, 0, NULL); + if (ret != 1) { + printf("av insert error\n"); + return -FI_ENOSYS; + } + /* + * Until fi_av_insert, the exchange of addresses between client and + * server is happening. + */ + + /* Now the memory region addresses and keys will be exchanged. 
*/ + ret = sock_send(oob_sock, &local_key_write, sizeof(local_key_write)); + if (ret) { + printf("sock_send error %d\n", ret); + return ret; + } + + ret = sock_recv(oob_sock, &remote_key_write, sizeof(remote_key_write)); + if (ret) { + printf("sock_recv error %d\n", ret); + return ret; + } + + ret = sock_send(oob_sock, &local_key_read, sizeof(local_key_read)); + if (ret) { + printf("sock_send error %d\n", ret); + return ret; + } + + ret = sock_recv(oob_sock, &remote_key_read, sizeof(remote_key_read)); + if (ret) { + printf("sock_recv error %d\n", ret); + return ret; + } + + ret = sock_send(oob_sock, &local_addr_write, sizeof(local_addr_write)); + if (ret) { + printf("sock_send error %d\n", ret); + return ret; + } + + ret = sock_recv(oob_sock, &remote_addr_write, sizeof(remote_addr_write)); + if (ret) { + printf("sock_recv error %d\n", ret); + return ret; + } + + ret = sock_send(oob_sock, &local_addr_read, sizeof(local_addr_read)); + if (ret) { + printf("sock_send error %d\n", ret); + return ret; + } + + ret = sock_recv(oob_sock, &remote_addr_read, sizeof(remote_addr_read)); + if (ret) { + printf("sock_recv error %d\n", ret); + return ret; + } + + return sync_progress(); +} + +/* Set anything in hints that the application needs */ +static int set_hints(void) +{ + hints = fi_allocinfo(); + if (!hints) + return EXIT_FAILURE; + + /* + * Request FI_EP_RDM (reliable datagram) endpoint which will allow us + * to reliably send messages to peers without having to + * listen/connect/accept. + */ + hints->ep_attr->type = FI_EP_RDM; + + /* Request RMA capabilities from the provider.*/ + hints->caps = FI_RMA; + + /* + * Default to FI_DELIVERY_COMPLETE which will make sure completions do + * not get generated until our message arrives at the destination. + * Otherwise, the client might get a completion and exit before the + * server receives the message. This is to make the test simpler. 
+ */ + hints->tx_attr->op_flags = FI_DELIVERY_COMPLETE; + hints->domain_attr->mr_mode = FI_MR_ENDPOINT | FI_MR_LOCAL | + FI_MR_PROV_KEY | FI_MR_ALLOCATED | FI_MR_VIRT_ADDR; + + /* Specifically request the tcp provider for the simple test */ + hints->fabric_attr->prov_name = "tcp"; + + /* Done setting hints */ + + return 0; +} + +/* + * Initializes all basic libfabric resources to allow for a server/client to + * exchange a message + */ +static int initialize(void) +{ + struct fi_cq_attr cq_attr = {0}; + struct fi_av_attr av_attr = {0}; + int ret; + + /* + * The first libfabric call to happen for initialization is fi_getinfo + * which queries libfabric and returns any appropriate providers that + * fulfill the hints requirements. Any applicable providers will be + * returned as a list of fi_info structs (&info). Any info can be + * selected. In this test we select the first fi_info struct. Assuming + * all hints were set appropriately, the first fi_info should be most + * appropriate. The flag FI_SOURCE is set for the server to indicate + * that the address/port refer to source information. This is not set + * for the client because the fields refer to the server, not the + * caller (client). + */ + + ret = fi_getinfo(FI_VERSION(1,9), NULL, NULL, 0, + hints, &info); + if (ret) { + printf("fi_getinfo error (%d)\n", ret); + return ret; + } + + /* + * Initialize our fabric. The fabric network represents a collection of + * hardware and software resources that access a single physical or + * virtual network. All network ports on a system that can communicate + * with each other through their attached networks belong to the same + * fabric. + */ + + ret = fi_fabric(info->fabric_attr, &fabric, NULL); + if (ret) { + printf("fi_fabric error (%d)\n", ret); + return ret; + } + + + /* + * Initialize our domain (associated with our fabric). A domain defines + * the boundary for associating different resources together. 
+ */ + + ret = fi_domain(fabric, info, &domain, NULL); + if (ret) { + printf("fi_domain error (%d)\n", ret); + return ret; + } + + /* + * Initialize our endpoint. Endpoints are transport level communication + * portals which are used to initiate and drive communication. There + * are three main types of endpoints: + * FI_EP_MSG - connected, reliable + * FI_EP_RDM - unconnected, reliable + * FI_EP_DGRAM - unconnected, unreliable + * The type of endpoint will be requested in hints/fi_getinfo. + * Different providers support different types of endpoints. + */ + + ret = fi_endpoint(domain, info, &ep, NULL); + if (ret) { + printf("fi_endpoint error (%d)\n", ret); + return ret; + } + + /* Initialize our completion queue. Completion queues are used to + * report events associated with data transfers. In this example, we + * use one CQ that tracks sends and receives, but often times there + * will be separate CQs for sends and receives. + */ + + cq_attr.size = 128; + cq_attr.format = FI_CQ_FORMAT_MSG; + ret = fi_cq_open(domain, &cq_attr, &cq, NULL); + if (ret) { + printf("fi_cq_open error (%d)\n", ret); + return ret; + } + + /* Bind our CQ to our endpoint to track any sends and receives that + * come in or out on that endpoint. A CQ can be bound to multiple + * endpoints but one EP can only have one send CQ and one receive CQ + * (which can be the same CQ). + */ + + ret = fi_ep_bind(ep, &cq->fid, FI_SEND | FI_RECV); + if (ret) { + printf("fi_ep_bind cq error (%d)\n", ret); + return ret; + } + + /* Initialize our address vector. Address vectors are used to map + * higher level addresses, which may be more natural for an application + * to use, into fabric specific addresses. An AV_TABLE av will map + * these addresses to indexed addresses, starting with fi_addr 0. These + * addresses are used in data transfer calls to specific which peer to + * send to/recv from. 
Address vectors are only used for FI_EP_RDM and + * FI_EP_DGRAM endpoints, allowing the application to avoid connection + * management. For FI_EP_MSG endpoints, the AV is replaced by the + * traditional listen/connect/accept steps. + */ + + av_attr.type = FI_AV_TABLE; + av_attr.count = 1; + ret = fi_av_open(domain, &av_attr, &av, NULL); + if (ret) { + printf("fi_av_open error (%d)\n", ret); + return ret; + } + + /* + * Bind the AV to the EP. The EP can only send data to a peer in its + * AV. + */ + + ret = fi_ep_bind(ep, &av->fid, 0); + if (ret) { + printf("fi_ep_bind av error (%d)\n", ret); + return ret; + } + + /* + * Once we have all our resources initialized and ready to go, we can + * enable our EP in order to send/receive data. + */ + + ret = fi_enable(ep); + if (ret) { + printf("fi_enable error (%d)\n", ret); + return ret; + } + + /* Register the write memory region mr_write */ + ret = fi_mr_reg(domain, buf_write, BUF_SIZE, FI_WRITE | + FI_REMOTE_WRITE, 0, MR_KEY, 0, &mr_write, NULL); + if (ret) { + printf("fi_mr_reg error (%d)\n", ret); + return ret; + } + + /* Register the read memory region mr_read */ + ret = fi_mr_reg(domain, buf_read, BUF_SIZE, FI_READ | FI_REMOTE_READ, + 0, MR_KEY+1, 0, &mr_read, NULL); + if (ret) { + printf("fi_mr_r_reg error (%d)\n", ret); + return ret; + } + + /* Obtain local descriptor for write operation */ + local_desc_write = fi_mr_desc(mr_write); + + /* Obtain the key for accessing write memory region */ + local_key_write = fi_mr_key(mr_write); + + /* Obtain local descriptor for read operation */ + local_desc_read = fi_mr_desc(mr_read); + + /* Obtain the key for accessing read memory region */ + local_key_read = fi_mr_key(mr_read); + + /* + * Address of the local write buffer. This address will be exchanged + * with the other entity + */ + local_addr_write = info->domain_attr->mr_mode & FI_MR_VIRT_ADDR ? + (uintptr_t) buf_write : 0; + + /* + * Address of the local read buffer. 
This address will be exchanged
+	 * with the other entity
+	 */
+	local_addr_read = info->domain_attr->mr_mode & FI_MR_VIRT_ADDR ?
+			  (uintptr_t) buf_read : 0;
+
+	ret = exchange_info();
+	if (ret)
+		return ret;
+
+	return 0;
+}
+
+/* All libfabric resources are cleaned up using the same fi_close(fid) call.
+ * Resources must be closed in a specific order to allow references between
+ * objects to be removed correctly. For example the endpoint must be closed
+ * before the CQ or AV.
+ */
+static void cleanup(void)
+{
+	int ret;
+
+	if (mr_write) {	/* memory region registered over buf_write */
+		ret = fi_close(&mr_write->fid);
+		if (ret)
+			printf("warning: error closing write MR (%d)\n", ret);
+	}
+
+	if (mr_read) {	/* memory region registered over buf_read */
+		ret = fi_close(&mr_read->fid);
+		if (ret)
+			printf("warning: error closing read MR (%d)\n", ret);
+	}
+
+	if (ep) {
+		ret = fi_close(&ep->fid);
+		if (ret)
+			printf("warning: error closing EP (%d)\n", ret);
+	}
+
+	if (av) {
+		ret = fi_close(&av->fid);
+		if (ret)
+			printf("warning: error closing AV (%d)\n", ret);
+	}
+
+	if (cq) {
+		ret = fi_close(&cq->fid);
+		if (ret)
+			printf("warning: error closing CQ (%d)\n", ret);
+	}
+
+	if (domain) {
+		ret = fi_close(&domain->fid);
+		if (ret)
+			printf("warning: error closing domain (%d)\n", ret);
+	}
+
+	if (fabric) {
+		ret = fi_close(&fabric->fid);
+		if (ret)
+			printf("warning: error closing fabric (%d)\n", ret);
+	}
+
+	/* Free the space occupied by info struct*/
+	if (info)
+		fi_freeinfo(info);
+}
+
+static void fill_buffer(void *buf, char *msg)
+{
+	(void) snprintf(buf, BUF_SIZE, "%s", msg);	/* truncates to BUF_SIZE - 1 chars, always NUL-terminated */
+
+	return;
+}
+
+/*
+ * Do a write into the server's write buffer.
+ * When doing a write, if the provider is not ready to process messages
+ * (because of connection initialization for example), it may return
+ * -FI_EAGAIN. This does not indicate an error, but rather that the application
+ * should try again later. This is why we almost always wrap sends and receives
+ * in a do/while.
Some providers may need the application to drive progress in
+ * order to get out of the -FI_EAGAIN loop. To drive progress, the application
+ * needs to call fi_cq_read (not necessarily reading any completion entries).
+ */
+static int do_write(void)
+{
+	char *msg = "Hello, server! This is the WRITE you have been waiting for!\0";
+	int ret;
+
+	fill_buffer(buf_write, msg);
+
+	do {
+		ret = fi_write(ep, buf_write, strlen(msg), local_desc_write,
+			       fi_addr, remote_addr_write, remote_key_write,
+			       NULL);
+		if (ret && ret != -FI_EAGAIN) {
+			printf("error initiating a write (%d)\n", ret);
+			return ret;
+		}
+		if (ret == -FI_EAGAIN)	/* not an error: drive progress and retry */
+			(void) fi_cq_read(cq, NULL, 0);
+	} while (ret);
+
+	return 0;
+}
+
+/*
+ * Do a read from the client's read buffer.
+ * When doing a read, if the provider is not ready to process messages
+ * (because of connection initialization for example), it may return
+ * -FI_EAGAIN. This does not indicate an error, but rather that the application
+ * should try again later. This is why we almost always wrap sends and receives
+ * in a do/while. Some providers may need the application to drive progress in
+ * order to get out of the -FI_EAGAIN loop. To drive progress, the application
+ * needs to call fi_cq_read (not necessarily reading any completion entries).
+ */
+static int do_read(void)
+{
+	int ret;
+	do {
+		ret = fi_read(ep, buf_read, BUF_SIZE, local_desc_read, fi_addr,
+			      remote_addr_read, remote_key_read, NULL);
+		if (ret && ret != -FI_EAGAIN) {
+			printf("error initiating a read (%d)\n", ret);
+			return ret;
+		}
+		if (ret == -FI_EAGAIN)	/* not an error: drive progress and retry */
+			(void) fi_cq_read(cq, NULL, 0);
+	} while (ret);
+
+	return 0;
+
+}
+
+/*
+ * Wait for the message to be written/read using the CQ. fi_cq_read not only
+ * drives progress but also returns any completed events to notify the
+ * application that it can reuse the send/recv buffer. The returned completion
+ * entry will have fields set to let the application know what operation
+ * completed.
Not all fields will be valid. The fields set will be indicated by + * the cq format (when creating the CQ). In this example, we use + * FI_CQ_FORMAT_MSG in order to use the flags field. + */ +static int spin_for_comp(void) +{ + struct fi_cq_err_entry comp; + int ret; + + do { + ret = fi_cq_read(cq, &comp, 1); + if (ret < 0 && ret != -FI_EAGAIN) { + printf("error reading cq (%d)\n", ret); + return ret; + } + } while (ret != 1); + + if (comp.flags & FI_READ) + printf("My READ finished!\n"); + else if (comp.flags & FI_WRITE) + printf("My WRITE finished!\n"); + + return 0; +} + +static int run(void) +{ + char *msg = "Hello, server! This is the READ you have been waiting for!\0"; + int ret; + + if (dst_addr) { + printf("Client: write to server %s\n", dst_addr); + ret = do_write(); + if (ret) + return ret; + + ret = spin_for_comp(); + if (ret) + return ret; + + fill_buffer(buf_read, msg); + + ret = sync_progress(); + if (ret) + return ret; + + } else { + ret = sync_progress(); + if (ret) + return ret; + + printf("Server: wait for message from client\n"); + ret = do_read(); + if (ret) + return ret; + + ret = spin_for_comp(); + if (ret) + return ret; + } + + ret = sync_progress(); + if (ret) + return ret; + + if (!dst_addr) { + printf("This is the READ message: %s\n", buf_read); + printf("This is the WRITE message: %s\n", buf_write); + } + + return 0; +} + +int main(int argc, char **argv) +{ + int ret; + + /* + * Server run with no args, client has server's address as an + * argument + */ + dst_addr = argv[optind]; + + /* Initialize out of band addressing capability*/ + ret = init_oob(); + if (ret) + return ret; + + /* + * Hints are used to request support for specific features from a + * provider. 
+ */ + ret = set_hints(); + if (ret) { + printf ("Error settings hints.\n"); + goto out; + } + + ret = initialize(); + if (ret) + goto out; + + ret = run(); +out: + cleanup(); + return ret; +} diff --git a/fabtests/examples/example_rdm_tagged.c b/fabtests/examples/example_rdm_tagged.c new file mode 100644 index 00000000000..0416bdbcf74 --- /dev/null +++ b/fabtests/examples/example_rdm_tagged.c @@ -0,0 +1,713 @@ +/* + * Copyright (c) Intel Corporation. All rights reserved. + * + * This software is available to you under the BSD license + * below: + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * - Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * + * - Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +//Run Server: example_rdm_tagged +//Run client: example_rdm_tagged + +#define BUF_SIZE 64 + +#define TAG_1 1 +#define TAG_2 2 + +char *src_addr = NULL, *dst_addr = NULL; +char *oob_port = "9228"; +int listen_sock, oob_sock; +struct fi_info *hints, *info; +struct fid_fabric *fabric = NULL; +struct fid_domain *domain = NULL; +struct fid_ep *ep = NULL; +struct fid_av *av = NULL; +struct fid_cq *cq = NULL; +char buf_1[BUF_SIZE]; +char buf_2[BUF_SIZE]; +static fi_addr_t fi_addr = FI_ADDR_UNSPEC; +uint64_t fi_tag = 0; + +static int sock_listen(char *node, char *service) +{ + struct addrinfo *ai, hints; + int val, ret; + + memset(&hints, 0, sizeof hints); + hints.ai_flags = AI_PASSIVE; + + ret = getaddrinfo(node, service, &hints, &ai); + if (ret) { + printf("getaddrinfo() %s\n", gai_strerror(ret)); + return ret; + } + + listen_sock = socket(ai->ai_family, SOCK_STREAM, 0); + if (listen_sock < 0) { + printf("socket error"); + ret = listen_sock; + goto out; + } + + val = 1; + ret = setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, + (void *) &val, sizeof val); + if (ret) { + printf("setsockopt SO_REUSEADDR"); + goto out; + } + + ret = bind(listen_sock, ai->ai_addr, ai->ai_addrlen); + if (ret) { + printf("bind"); + goto out; + } + + ret = listen(listen_sock, 0); + if (ret) + printf("listen error"); + +out: + if (ret && listen_sock >= 0) + close(listen_sock); + freeaddrinfo(ai); + return ret; +} + +static int sock_setup(int sock) +{ + int ret, op; + long flags; + + op = 1; + ret = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, + (void *) &op, sizeof(op)); + if (ret) + return ret; + + flags = fcntl(sock, F_GETFL); + if (flags < 0) + return -errno; + + if (fcntl(sock, F_SETFL, flags)) + return -errno; + + return 0; +} + +static int init_oob(void) +{ + struct addrinfo *ai = NULL; + int ret; + + if 
(!dst_addr) { + ret = sock_listen(src_addr, oob_port); + if (ret) + return ret; + + oob_sock = accept(listen_sock, NULL, 0); + if (oob_sock < 0) { + printf("accept error"); + ret = oob_sock; + return ret; + } + + close(listen_sock); + } else { + ret = getaddrinfo(dst_addr, oob_port, NULL, &ai); + if (ret) { + printf("getaddrinfo error"); + return ret; + } + + oob_sock = socket(ai->ai_family, SOCK_STREAM, 0); + if (oob_sock < 0) { + printf("socket error"); + ret = oob_sock; + goto free; + } + + ret = connect(oob_sock, ai->ai_addr, ai->ai_addrlen); + if (ret) { + printf("connect error"); + close(oob_sock); + goto free; + } + sleep(1); + } + + ret = sock_setup(oob_sock); + +free: + if (ai) + freeaddrinfo(ai); + return ret; +} + + +static int sock_send(int fd, void *msg, size_t len) +{ + size_t sent; + ssize_t ret, err = 0; + + for (sent = 0; sent < len; ) { + ret = send(fd, ((char *) msg) + sent, len - sent, 0); + if (ret > 0) { + sent += ret; + } else { + err = -errno; + break; + } + } + + return err ? err: 0; +} + +static int sock_recv(int fd, void *msg, size_t len) +{ + size_t rcvd; + ssize_t ret, err = 0; + + for (rcvd = 0; rcvd < len; ) { + ret = recv(fd, ((char *) msg) + rcvd, len - rcvd, 0); + if (ret > 0) { + rcvd += ret; + } else if (ret == 0) { + err = -FI_ENOTCONN; + break; + } else { + err = -errno; + break; + } + } + + return err ? 
err: 0;
+}
+
+static int sync_progress(void)
+{
+	int ret, value = 0, result = -FI_EOTHER;
+
+	if (dst_addr) {	/* client: signal first, then wait for the server's reply */
+		ret = send(oob_sock, &value, sizeof(value), 0);
+		if (ret != sizeof(value))
+			return -FI_EOTHER;
+
+		do {
+			ret = recv(oob_sock, &result, sizeof(result), MSG_DONTWAIT);
+			if (ret == sizeof(result))
+				break;
+
+			ret = fi_cq_read(cq, NULL, 0);	/* keep driving progress while waiting */
+			if (ret && ret != -FI_EAGAIN)
+				return ret;
+		} while (1);
+	} else {	/* server: wait for the client's signal, then reply */
+		do {
+			ret = recv(oob_sock, &result, sizeof(result), MSG_DONTWAIT);
+			if (ret == sizeof(result))
+				break;
+
+			ret = fi_cq_read(cq, NULL, 0);	/* keep driving progress while waiting */
+			if (ret && ret != -FI_EAGAIN)
+				return ret;
+		} while (1);
+
+		ret = send(oob_sock, &value, sizeof(value), 0);
+		if (ret != sizeof(value))
+			return -FI_EOTHER;
+	}
+	return 0;
+}
+
+/*
+ * Exchange addresses function is simply getting the address of the other
+ * entity and storing it in the address vector we created in the init phase.
+ */
+static int exchange_addresses(void)
+{
+	char addr_buf[BUF_SIZE];
+	int ret;
+	size_t addrlen = BUF_SIZE;
+
+	ret = fi_getname(&ep->fid, addr_buf, &addrlen);
+	if (ret) {
+		printf("fi_getname error %d\n", ret);
+		return ret;
+	}
+
+	ret = sock_send(oob_sock, addr_buf, BUF_SIZE);
+	if (ret) {
+		printf("sock_send error %d\n", ret);
+		return ret;
+	}
+
+	memset(addr_buf, 0, BUF_SIZE);
+	ret = sock_recv(oob_sock, addr_buf, BUF_SIZE);
+	if (ret) {
+		printf("sock_recv error %d\n", ret);
+		return ret;
+	}
+
+	/*
+	 * Here the client inserts the server's address into the AV and vice
+	 * versa, which returns an fi_addr to use when sending data to the
+	 * peer.
+	 */
+
+	/*
+	 * fi_av_insert(struct fid_av *av, void *addr, size_t count,
+	 *		fi_addr_t *fi_addr, uint64_t flags, void *context);
+	 */
+	ret = fi_av_insert(av, addr_buf, 1, &fi_addr, 0, NULL);
+	if (ret != 1) {
+		printf("av insert error\n");
+		return -FI_ENOSYS;
+	}
+
+	return sync_progress();
+}
+
+/*
+ * Set anything in hints that the application needs.
+ *
+ * Allocates the global hints structure and fills in the endpoint type, the
+ * requested capabilities, the provider name and the default transmit flags.
+ *
+ * Returns 0 on success or -FI_ENOMEM if an allocation fails, in which case
+ * hints is left NULL.
+ */
+static int set_hints(void)
+{
+	hints = fi_allocinfo();
+	if (!hints)
+		return -FI_ENOMEM;
+
+	/*
+	 * Request FI_EP_RDM (reliable datagram) endpoint which will allow us
+	 * to reliably send messages to peers without having to
+	 * listen/connect/accept.
+	 */
+	hints->ep_attr->type = FI_EP_RDM;
+
+	/* Request tag matching capabilities from the provider */
+	hints->caps = FI_TAGGED;
+
+	/*
+	 * Specifically request the tcp provider for the simple test. The name
+	 * is duplicated onto the heap because fi_freeinfo() releases the
+	 * strings owned by the structure; pointing prov_name at a string
+	 * literal would make hints unsafe to free.
+	 */
+	hints->fabric_attr->prov_name = strdup("tcp");
+	if (!hints->fabric_attr->prov_name) {
+		fi_freeinfo(hints);
+		hints = NULL;
+		return -FI_ENOMEM;
+	}
+
+	/*
+	 * Default to FI_DELIVERY_COMPLETE which will make sure completions do
+	 * not get generated until our message arrives at the destination.
+	 * Otherwise, the client might get a completion and exit before the
+	 * server receives the message. This is to make the test simpler.
+	 */
+	hints->tx_attr->op_flags = FI_DELIVERY_COMPLETE;
+
+	/* Done setting hints */
+
+	return 0;
+}
+
+/*
+ * Initializes all basic libfabric resources to allow for a server/client to
+ * exchange a message.
+ *
+ * Resources are created in dependency order (info, fabric, domain, endpoint,
+ * CQ, AV), the CQ and AV are bound to the endpoint, the endpoint is enabled
+ * and finally addresses are exchanged with the peer.
+ *
+ * Returns 0 on success or the error code of the first step that failed.
+ * Nothing is torn down here on failure; whatever was created is left in the
+ * global handles for cleanup() to release.
+ */
+static int initialize(void)
+{
+	struct fi_cq_attr cq_attr = {0};
+	struct fi_av_attr av_attr = {0};
+	int ret;
+
+	/*
+	 * The first libfabric call to happen for initialization is fi_getinfo
+	 * which queries libfabric and returns any appropriate providers that
+	 * fulfill the hints requirements. Any applicable providers will be
+	 * returned as a list of fi_info structs (&info). Any info can be
+	 * selected. In this test we select the first fi_info struct. Assuming
+	 * all hints were set appropriately, the first fi_info should be most
+	 * appropriate. No node, service or flags are passed to fi_getinfo
+	 * here: both the server and the client make the same call, and the
+	 * peer's address is obtained afterwards by exchange_addresses().
+	 */
+
+	ret = fi_getinfo(FI_VERSION(1,9), NULL, NULL, 0, hints, &info);
+
+	/*
+	 * The hints are not referenced again once fi_getinfo has returned, so
+	 * release them here (on success and on failure) instead of leaking
+	 * them.
+	 */
+	fi_freeinfo(hints);
+	hints = NULL;
+
+	if (ret) {
+		printf("fi_getinfo error (%d)\n", ret);
+		return ret;
+	}
+
+	/*
+	 * Initialize our fabric. The fabric network represents a collection of
+	 * hardware and software resources that access a single physical or
+	 * virtual network. All network ports on a system that can communicate
+	 * with each other through their attached networks belong to the same
+	 * fabric.
+	 */
+
+	ret = fi_fabric(info->fabric_attr, &fabric, NULL);
+	if (ret) {
+		printf("fi_fabric error (%d)\n", ret);
+		return ret;
+	}
+
+	/*
+	 * Initialize our domain (associated with our fabric). A domain defines
+	 * the boundary for associating different resources together.
+	 */
+
+	ret = fi_domain(fabric, info, &domain, NULL);
+	if (ret) {
+		printf("fi_domain error (%d)\n", ret);
+		return ret;
+	}
+
+	/*
+	 * Initialize our endpoint. Endpoints are transport level communication
+	 * portals which are used to initiate and drive communication. There
+	 * are three main types of endpoints:
+	 * FI_EP_MSG - connected, reliable
+	 * FI_EP_RDM - unconnected, reliable
+	 * FI_EP_DGRAM - unconnected, unreliable
+	 * The type of endpoint will be requested in hints/fi_getinfo.
+	 * Different providers support different types of endpoints.
+	 */
+
+	ret = fi_endpoint(domain, info, &ep, NULL);
+	if (ret) {
+		printf("fi_endpoint error (%d)\n", ret);
+		return ret;
+	}
+
+	/*
+	 * Initialize our completion queue. Completion queues are used to
+	 * report events associated with data transfers. In this example,
+	 * we use one CQ that tracks sends and receives, but often times
+	 * there will be separate CQs for sends and receives. The tagged
+	 * format is requested so that each completion carries the tag of the
+	 * message it belongs to. No wait object is requested; completions are
+	 * polled for with fi_cq_read().
+	 */
+
+	cq_attr.size = 128;
+	cq_attr.format = FI_CQ_FORMAT_TAGGED;
+	cq_attr.wait_obj = FI_WAIT_NONE;
+	ret = fi_cq_open(domain, &cq_attr, &cq, NULL);
+	if (ret) {
+		printf("fi_cq_open error (%d)\n", ret);
+		return ret;
+	}
+
+	/*
+	 * Bind our CQ to our endpoint to track any sends and receives that
+	 * come in or out on that endpoint. A CQ can be bound to multiple
+	 * endpoints but one EP can only have one send CQ and one receive CQ
+	 * (which can be the same CQ).
+	 */
+
+	ret = fi_ep_bind(ep, &cq->fid, FI_SEND | FI_RECV);
+	if (ret) {
+		printf("fi_ep_bind cq error (%d)\n", ret);
+		return ret;
+	}
+
+	/*
+	 * Initialize our address vector. Address vectors are used to map
+	 * higher level addresses, which may be more natural for an application
+	 * to use, into fabric specific addresses. An AV_TABLE av will map
+	 * these addresses to indexed addresses, starting with fi_addr 0. These
+	 * addresses are used in data transfer calls to specify which peer to
+	 * send to/recv from. Address vectors are only used for FI_EP_RDM and
+	 * FI_EP_DGRAM endpoints, allowing the application to avoid connection
+	 * management. For FI_EP_MSG endpoints, the AV is replaced by the
+	 * traditional listen/connect/accept steps.
+	 */
+
+	av_attr.type = FI_AV_TABLE;
+	av_attr.count = 1;
+	ret = fi_av_open(domain, &av_attr, &av, NULL);
+	if (ret) {
+		printf("fi_av_open error (%d)\n", ret);
+		return ret;
+	}
+
+	/*
+	 * Bind the AV to the EP. The EP can only send data to a peer in its
+	 * AV.
+	 */
+
+	ret = fi_ep_bind(ep, &av->fid, 0);
+	if (ret) {
+		printf("fi_ep_bind av error (%d)\n", ret);
+		return ret;
+	}
+
+	/*
+	 * Once we have all our resources initialized and ready to go, we can
+	 * enable our EP in order to send/receive data.
+	 */
+
+	ret = fi_enable(ep);
+	if (ret) {
+		printf("fi_enable error (%d)\n", ret);
+		return ret;
+	}
+
+	/* Swap endpoint addresses with the peer and insert its address in the AV */
+	return exchange_addresses();
+}
+
+/*
+ * All libfabric resources are cleaned up using the same fi_close(fid) call.
+ * Resources must be closed in a specific order to allow references between
+ * objects to be removed correctly. For example the endpoint must be closed
+ * before the CQ or AV.
+ *
+ * Handles that were never opened (still NULL) are skipped, so this can be
+ * called after a partially failed initialization. A failing fi_close() is
+ * only reported as a warning; the remaining resources are still closed.
+ */
+static void cleanup(void)
+{
+	int ret;
+
+	if (ep) {
+		ret = fi_close(&ep->fid);
+		if (ret)
+			printf("warning: error closing EP (%d)\n", ret);
+	}
+
+	if (av) {
+		ret = fi_close(&av->fid);
+		if (ret)
+			printf("warning: error closing AV (%d)\n", ret);
+	}
+
+	if (cq) {
+		ret = fi_close(&cq->fid);
+		if (ret)
+			printf("warning: error closing CQ (%d)\n", ret);
+	}
+
+	if (domain) {
+		ret = fi_close(&domain->fid);
+		if (ret)
+			printf("warning: error closing domain (%d)\n", ret);
+	}
+
+	if (fabric) {
+		ret = fi_close(&fabric->fid);
+		if (ret)
+			printf("warning: error closing fabric (%d)\n", ret);
+	}
+
+	/* Free the space occupied by info struct */
+	if (info)
+		fi_freeinfo(info);
+}
+
+/*
+ * Post one tagged receive of up to BUF_SIZE bytes into buf for messages
+ * carrying the given tag. The global fi_tag is updated to the tag being
+ * posted. While the provider reports -FI_EAGAIN the CQ is read with a count
+ * of zero to drive progress and the post is retried.
+ *
+ * Returns 0 once the receive is posted, or the negative error from
+ * fi_trecv().
+ */
+static int post_tagged_recv(void *buf, uint64_t tag)
+{
+	int ret;
+
+	fi_tag = tag;
+	do {
+		ret = fi_trecv(ep, buf, BUF_SIZE, NULL, fi_addr, fi_tag, 0,
+			       NULL);
+		if (ret == -FI_EAGAIN) {
+			(void) fi_cq_read(cq, NULL, 0);
+		} else if (ret) {
+			printf("error posting recv buffer (%d)\n", ret);
+			return ret;
+		}
+	} while (ret);
+
+	return 0;
+}
+
+/*
+ * Post the server's two receive buffers. The TAG_2 buffer is posted before
+ * the TAG_1 buffer, i.e. in the opposite order to the one the client sends
+ * in, so each message is placed by its tag rather than by posting order.
+ *
+ * Returns 0 on success or the first error from posting a receive.
+ */
+static int post_recv(void)
+{
+	int ret;
+
+	ret = post_tagged_recv(buf_2, TAG_2);
+	if (ret)
+		return ret;
+
+	return post_tagged_recv(buf_1, TAG_1);
+}
+
+/*
+ * Post one tagged send of the string msg with the given tag. strlen(msg)
+ * bytes are sent, so the terminating NUL is not part of the message. The
+ * global fi_tag is updated to the tag being sent. While the provider reports
+ * -FI_EAGAIN the CQ is read with a count of zero to drive progress and the
+ * post is retried.
+ *
+ * Returns 0 once the send is posted, or the negative error from fi_tsend().
+ */
+static int post_tagged_send(const char *msg, uint64_t tag)
+{
+	int ret;
+
+	fi_tag = tag;
+	do {
+		ret = fi_tsend(ep, msg, strlen(msg), NULL, fi_addr, fi_tag,
+			       NULL);
+		if (ret == -FI_EAGAIN) {
+			(void) fi_cq_read(cq, NULL, 0);
+		} else if (ret) {
+			printf("error posting send buffer (%d)\n", ret);
+			return ret;
+		}
+	} while (ret);
+
+	return 0;
+}
+
+/*
+ * Post the client's two messages: first the TAG_1 message, then the TAG_2
+ * message.
+ *
+ * Returns 0 on success or the first error from posting a send.
+ */
+static int post_send(void)
+{
+	const char *msg_1 = "Hello, server! I am the client sending TAG_1 message!";
+	const char *msg_2 = "Hello, server! I am the client sending TAG_2 message!";
+	int ret;
+
+	ret = post_tagged_send(msg_1, TAG_1);
+	if (ret)
+		return ret;
+
+	printf("Client: I posted TAG_1 message\n");
+
+	ret = post_tagged_send(msg_2, TAG_2);
+	if (ret)
+		return ret;
+
+	printf("Client: I posted TAG_2 message\n");
+
+	return 0;
+}
+
+/*
+ * Wait for the message to be sent/received using the CQ. fi_cq_read not only
+ * drives progress but also returns any completed events to notify the
+ * application that it can reuse the send/recv buffer. The returned completion
+ * entry will have fields set to let the application know what operation
+ * completed. Not all fields will be valid. The fields set will be indicated by
+ * the cq format (when creating the CQ). In this example, we use
+ * FI_CQ_FORMAT_TAGGED in order to use the flags and tag fields.
+ */
+static int spin_for_comp(void)
+{
+	/* Matches the FI_CQ_FORMAT_TAGGED format the CQ was opened with */
+	struct fi_cq_tagged_entry comp;
+	struct fi_cq_err_entry err_entry = {0};
+	int ret;
+
+	do {
+		/*
+		 * fi_cq_read(struct fid_cq *cq, void *buf, size_t count);
+		 */
+		ret = fi_cq_read(cq, &comp, 1);
+		if (ret < 0 && ret != -FI_EAGAIN) {
+			printf("error reading cq (%d)\n", ret);
+
+			/*
+			 * -FI_EAVAIL means the failed operation is described
+			 * by an entry on the CQ's error queue; fetch it so the
+			 * actual cause is reported as well.
+			 */
+			if (ret == -FI_EAVAIL &&
+			    fi_cq_readerr(cq, &err_entry, 0) > 0)
+				printf("cq error entry: %s\n",
+				       fi_strerror(err_entry.err));
+			return ret;
+		}
+	} while (ret != 1);
+
+	if (comp.flags & FI_RECV)
+		printf("I received a message with this tag: %llu!\n",
+		       (unsigned long long) comp.tag);
+	else if (comp.flags & FI_SEND)
+		printf("My message got sent!\n");
+
+	return 0;
+}
+
+/* Number of tagged messages the client sends and the server receives */
+enum { NUM_TAGGED_MSGS = 2 };
+
+/*
+ * Reap count completions from the CQ, one spin_for_comp() call each.
+ * Returns 0 on success or the first error from spin_for_comp().
+ */
+static int wait_for_comps(int count)
+{
+	int i, ret;
+
+	for (i = 0; i < count; i++) {
+		ret = spin_for_comp();
+		if (ret)
+			return ret;
+	}
+
+	return 0;
+}
+
+/*
+ * Run the example once the endpoint is set up. When dst_addr is set this
+ * process is the client: it posts both tagged sends and waits for both send
+ * completions. Otherwise it is the server: it posts both tagged receives,
+ * waits for both receive completions and prints the two buffers. Both sides
+ * finish with sync_progress().
+ *
+ * Returns 0 on success or the first error encountered.
+ */
+static int run(void)
+{
+	int ret;
+
+	if (dst_addr) {
+		printf("Client: send to server %s\n", dst_addr);
+
+		ret = post_send();
+		if (ret)
+			return ret;
+
+		/*
+		 * Two sends were posted, so two completions have to be
+		 * reaped before moving on.
+		 */
+		ret = wait_for_comps(NUM_TAGGED_MSGS);
+		if (ret)
+			return ret;
+
+	} else {
+
+		printf("Server: post buffer and wait for message from client\n");
+
+		ret = post_recv();
+		if (ret)
+			return ret;
+
+		ret = wait_for_comps(NUM_TAGGED_MSGS);
+		if (ret)
+			return ret;
+
+		/*
+		 * NOTE(review): post_send() transmits strlen() bytes, i.e.
+		 * without the terminating NUL, so printing with %s presumably
+		 * relies on buf_1/buf_2 starting out zero-filled - confirm
+		 * against their definitions.
+		 */
+		printf("This is the message I received: %s\n", buf_2);
+		printf("This is the message I received: %s\n", buf_1);
+	}
+
+	return sync_progress();
+}
+
+/*
+ * Entry point. Sets up out-of-band addressing, builds the hints, initializes
+ * the libfabric resources, runs the exchange and cleans up.
+ *
+ * Returns 0 on success, otherwise the error code of the step that failed.
+ */
+int main(int argc, char **argv)
+{
+	int ret;
+
+	/*
+	 * Server run with no args, client has server's address as an
+	 * argument. No options are parsed, so the address, when present, is
+	 * simply the first argument.
+	 */
+	dst_addr = argc > 1 ? argv[1] : NULL;
+
+	/* Init out-of-band addressing */
+	ret = init_oob();
+	if (ret)
+		return ret;
+
+	/*
+	 * Hints are used to request support for specific features from a
+	 * provider.
+	 */
+	ret = set_hints();
+	if (ret) {
+		printf("Error setting hints.\n");
+		goto out;
+	}
+
+	ret = initialize();
+	if (ret)
+		goto out;
+
+	ret = run();
+out:
+	cleanup();
+	return ret;
+}
+