1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
|
/**
* Copyright (C) Mellanox Technologies Ltd. 2001-2016. ALL RIGHTS RESERVED.
*
* See file LICENSE for terms.
*/
#include "ucp_test.h"
class test_ucp_fence : public ucp_test {
public:
typedef void (test_ucp_fence::* send_func_t)(entity *e, uint64_t *initial_buf,
uint64_t *result_buf, void *memheap_addr,
ucp_rkey_h rkey);
static void send_cb(void *request, ucs_status_t status)
{
}
template <typename T>
void blocking_add(entity *e, uint64_t *initial_buf, uint64_t *result_buf,
void *memheap_addr, ucp_rkey_h rkey) {
ucs_status_t status = ucp_atomic_post(e->ep(), UCP_ATOMIC_POST_OP_ADD,
*initial_buf, sizeof(T),
(uintptr_t)memheap_addr, rkey);
ASSERT_UCS_OK(status);
}
template <typename T>
void blocking_fadd(entity *e, uint64_t *initial_buf, uint64_t *result_buf,
void *memheap_addr, ucp_rkey_h rkey)
{
void *request = ucp_atomic_fetch_nb(e->ep(), UCP_ATOMIC_FETCH_OP_FADD,
*initial_buf, (T*)result_buf, sizeof(T),
(uintptr_t)memheap_addr, rkey, send_cb);
request_wait(request);
}
template <typename T, typename F>
void test(F f1, F f2) {
test_fence(static_cast<send_func_t>(f1),
static_cast<send_func_t>(f2), sizeof(T));
}
class worker {
public:
worker(test_ucp_fence* test, send_func_t send1, send_func_t send2,
entity* entity, ucp_rkey_h rkey, void *memheap_ptr,
uint64_t initial_value, uint32_t* error):
test(test), value(initial_value), result(0), error(error),
running(true), m_rkey(rkey), m_memheap(memheap_ptr),
m_send_1(send1), m_send_2(send2), m_entity(entity) {
pthread_create(&m_thread, NULL, run, reinterpret_cast<void*>(this));
}
~worker() {
assert(!running);
}
static void *run(void *arg) {
worker *self = reinterpret_cast<worker*>(arg);
self->run();
return NULL;
}
void join() {
void *retval;
pthread_join(m_thread, &retval);
running = false;
}
test_ucp_fence* const test;
uint64_t value, result;
uint32_t* error;
bool running;
private:
void run() {
uint64_t zero = 0;
for (int i = 0; i < 500 / ucs::test_time_multiplier(); i++) {
(test->*m_send_1)(m_entity, &value, &result,
m_memheap, m_rkey);
m_entity->fence();
(test->*m_send_2)(m_entity, &zero, &result,
m_memheap, m_rkey);
test->flush_worker(*m_entity);
if (result != (uint64_t)(i+1))
(*error)++;
result = 0; /* reset for the next loop */
}
}
ucp_rkey_h m_rkey;
void *m_memheap;
send_func_t m_send_1, m_send_2;
entity* m_entity;
pthread_t m_thread;
};
void run_workers(send_func_t send1, send_func_t send2, entity* sender,
ucp_rkey_h rkey, void *memheap_ptr,
uint64_t initial_value, uint32_t* error) {
ucs::ptr_vector<worker> m_workers;
m_workers.clear();
m_workers.push_back(new worker(this, send1, send2, sender, rkey,
memheap_ptr, initial_value, error));
m_workers.at(0).join();
m_workers.clear();
}
protected:
void test_fence(send_func_t send1, send_func_t send2, size_t alignment) {
static const size_t memheap_size = sizeof(uint64_t);
uint32_t error = 0;
sender().connect(&receiver(), get_ep_params());
flush_worker(sender()); /* avoid deadlock for blocking amo */
mapped_buffer buffer(memheap_size, receiver(), 0);
EXPECT_LE(memheap_size, buffer.size());
memset(buffer.ptr(), 0, memheap_size);
run_workers(send1, send2, &sender(), buffer.rkey(sender()),
buffer.ptr(), 1, &error);
EXPECT_EQ(error, (uint32_t)0);
disconnect(sender());
disconnect(receiver());
}
};
/**
 * Fence fixture whose test variant is registered with the 32-bit atomic
 * feature flag.
 */
class test_ucp_fence32 : public test_ucp_fence {
public:
    /* Appends one variant carrying UCP_FEATURE_AMO32 to @a variants. */
    static void
    get_test_variants(std::vector<ucp_test_variant>& variants)
    {
        add_variant(variants, UCP_FEATURE_AMO32);
    }
};
/*
 * 32-bit case: an atomic add followed, after a fence, by an atomic
 * fetch-and-add on the same remote word.
 */
UCS_TEST_P(test_ucp_fence32, atomic_add_fadd) {
    const send_func_t add_op  = &test_ucp_fence32::blocking_add<uint32_t>;
    const send_func_t fadd_op = &test_ucp_fence32::blocking_fadd<uint32_t>;

    test<uint32_t>(add_op, fadd_op);
}

UCP_INSTANTIATE_TEST_CASE(test_ucp_fence32)
/**
 * Fence fixture whose test variant is registered with the 64-bit atomic
 * feature flag.
 */
class test_ucp_fence64 : public test_ucp_fence {
public:
    /* Appends one variant carrying UCP_FEATURE_AMO64 to @a variants. */
    static void
    get_test_variants(std::vector<ucp_test_variant>& variants)
    {
        add_variant(variants, UCP_FEATURE_AMO64);
    }
};
/*
 * 64-bit case: an atomic add followed, after a fence, by an atomic
 * fetch-and-add on the same remote word.
 */
UCS_TEST_P(test_ucp_fence64, atomic_add_fadd) {
    const send_func_t add_op  = &test_ucp_fence64::blocking_add<uint64_t>;
    const send_func_t fadd_op = &test_ucp_fence64::blocking_fadd<uint64_t>;

    test<uint64_t>(add_op, fadd_op);
}

UCP_INSTANTIATE_TEST_CASE(test_ucp_fence64)
|