// Chi-Tech
// CBC_AsyncComm.cc
// (Recovered from the Doxygen page "Go to the documentation of this file.")
1#include "CBC_AsyncComm.h"
2
4
8#include "CBC_FLUDS.h"
9
10#include "chi_runtime.h"
11#include "chi_log.h"
12
13#define uint unsigned int
14
15namespace lbs
16{
17
19 size_t angle_set_id,
21 const chi::ChiMPICommunicatorSet& comm_set)
22 : chi_mesh::sweep_management::AsynchronousCommunicator(fluds, comm_set),
23 angle_set_id_(angle_set_id),
24 cbc_fluds_(dynamic_cast<CBC_FLUDS&>(fluds))
25{
26}
27
29 int location_id,
30 uint64_t cell_global_id,
31 unsigned int face_id,
32 size_t angle_set_id,
33 size_t data_size)
34{
35 MessageKey key{location_id, cell_global_id, face_id};
36
37 std::vector<double>& data = outgoing_message_queue_[key];
38 if (data.empty())
39 data.assign(data_size, 0.0);
40
41 return data;
42}
43
45{
46 typedef int MPIRank;
47
48 // First we convert any new outgoing messages from the queue into
49 // buffer messages. We aggregate these messages per location-id
50 // they need to be sent to
51 if (not outgoing_message_queue_.empty())
52 {
53 std::map<MPIRank, BufferItem> locI_buffer_map;
54
55 for (const auto& [msg_key, data] : outgoing_message_queue_)
56 {
57 const MPIRank locI = std::get<0>(msg_key);
58 const uint64_t cell_global_id = std::get<1>(msg_key);
59 const uint face_id = std::get<2>(msg_key);
60 const size_t data_size = data.size();
61
62 BufferItem& buffer_item = locI_buffer_map[locI];
63
64 buffer_item.destination_ = locI;
65 auto& buffer_array = buffer_item.data_array_;
66
67 buffer_array.Write(cell_global_id);
68 buffer_array.Write(face_id);
69 buffer_array.Write(data_size);
70
71 for (const double value : data) // actual psi_data
72 buffer_array.Write(value);
73 } // for item in queue
74
75 for (auto& [locI, buffer] : locI_buffer_map)
76 send_buffer_.push_back(std::move(buffer));
77
79 } // if there are outgoing messages
80
81 // Now we attempt to flush items in the send buffer
82 bool all_messages_sent = true;
83 for (auto& buffer_item : send_buffer_)
84 {
85 if (not buffer_item.send_initiated_)
86 {
87 const int locJ = buffer_item.destination_;
89 MPI_Isend(buffer_item.data_array_.Data().data(), // buf
90 static_cast<int>(buffer_item.data_array_.Size()), // count
91 MPI_BYTE, //
92 comm_set_.MapIonJ(locJ, locJ), // destination
93 static_cast<int>(angle_set_id_), // tag
94 comm_set_.LocICommunicator(locJ), // comm
95 &buffer_item.mpi_request_)); // request
96 buffer_item.send_initiated_ = true;
97 }
98
99 if (not buffer_item.completed_)
100 {
101 int sent;
103 MPI_Test(&buffer_item.mpi_request_, &sent, MPI_STATUS_IGNORE));
104 if (sent) buffer_item.completed_ = true;
105 else
106 all_messages_sent = false;
107 }
108 } // for item in buffer
109
110 return all_messages_sent;
111}
112
114{
115 typedef std::pair<uint64_t, uint> CellFaceKey; // cell_gid + face_id
116
117 std::map<CellFaceKey, std::vector<double>> received_messages;
118 std::vector<uint64_t> cells_who_received_data;
119 auto& location_dependencies = fluds_.GetSPDS().GetLocationDependencies();
120 for (int locJ : location_dependencies)
121 {
122 int message_available = 0;
123 MPI_Status status;
125 MPI_Iprobe(comm_set_.MapIonJ(locJ, Chi::mpi.location_id), // source
126 static_cast<int>(angle_set_id_), // tag
128 &message_available, // flag
129 &status)); // status
130
131 if (message_available)
132 {
133 int num_items;
134 MPI_Get_count(&status, MPI_BYTE, &num_items);
135 std::vector<std::byte> recv_buffer(num_items);
137 MPI_Recv(recv_buffer.data(), // recv_buffer
138 num_items, // count
139 MPI_BYTE, // datatype
141 status.MPI_TAG, // tag
143 MPI_STATUS_IGNORE)); // status
144
145 chi_data_types::ByteArray data_array(recv_buffer);
146
147 while (not data_array.EndOfBuffer())
148 {
149 const uint64_t cell_global_id = data_array.Read<uint64_t>();
150 const uint face_id = data_array.Read<uint>();
151 const size_t data_size = data_array.Read<size_t>();
152
153 std::vector<double> psi_data;
154 psi_data.reserve(data_size);
155 for (size_t k = 0; k < data_size; ++k)
156 psi_data.push_back(data_array.Read<double>());
157
158 received_messages[{cell_global_id, face_id}] = std::move(psi_data);
159 cells_who_received_data.push_back(
160 fluds_.GetSPDS().Grid().MapCellGlobalID2LocalID(cell_global_id));
161 } // while not at end of buffer
162 }// Process each message embedded in buffer
163 }
164
165 cbc_fluds_.DeplocsOutgoingMessages().merge(received_messages);
166
167 return cells_who_received_data;
168}
169
170} // namespace lbs
/* --- Doxygen cross-reference residue (tooltip dump appended by the HTML
   extraction; not part of the original source file — preserved for reference):

#define uint
static chi::MPI_Info & mpi
Definition: chi_runtime.h:78
MPI_Comm LocICommunicator(int locI) const
int MapIonJ(int locI, int locJ) const
static void Call(int mpi_error_code)
Definition: mpi_info.cc:43
const int & location_id
Current process rank.
Definition: mpi_info.h:26
bool EndOfBuffer() const
Definition: byte_array.h:130
void Write(const T &value)
Definition: byte_array.h:34
size_t MapCellGlobalID2LocalID(uint64_t global_id) const
const chi::ChiMPICommunicatorSet & comm_set_
Definition: AsyncComm.h:36
const SPDS & GetSPDS() const
Definition: FLUDS.h:31
const chi_mesh::MeshContinuum & Grid() const
Definition: SPDS.h:24
const VecInt & GetLocationDependencies() const
Definition: SPDS.h:29
CBC_ASynchronousCommunicator(size_t angle_set_id, chi_mesh::sweep_management::FLUDS &fluds, const chi::ChiMPICommunicatorSet &comm_set)
std::vector< BufferItem > send_buffer_
Definition: CBC_AsyncComm.h:71
std::vector< uint64_t > ReceiveData()
std::vector< double > & InitGetDownwindMessageData(int location_id, uint64_t cell_global_id, unsigned int face_id, size_t angle_set_id, size_t data_size) override
std::tuple< int, uint64_t, unsigned int > MessageKey
Definition: CBC_AsyncComm.h:41
std::map< MessageKey, std::vector< double > > outgoing_message_queue_
Definition: CBC_AsyncComm.h:61
std::map< CellFaceKey, std::vector< double > > & DeplocsOutgoingMessages()
Definition: CBC_FLUDS.h:101
*/