Chi-Tech
AAH_AsynComm_receiveupstreampsi.cc
Go to the documentation of this file.
1#include "AAH_AsynComm.h"
2
5
7
8#include "chi_log.h"
9#include "chi_mpi.h"
10
11// ###################################################################
12/**Checks whether all upstream dependencies have been met and receives
13 * each upstream message as it becomes available.*/
16{
17 const auto& spds = fluds_.GetSPDS();
18
19 //============================== Resize FLUDS non-local incoming Data
20 const size_t num_loc_deps = spds.GetLocationDependencies().size();
22 {
24 num_groups_, num_angles_, num_loc_deps);
25
27 }
28
29 //============================== Assume all data is available and now try
30 // to receive all of it
31 bool ready_to_execute = true;
32 for (size_t prelocI = 0; prelocI < num_loc_deps; prelocI++)
33 {
34 int locJ = spds.GetLocationDependencies()[prelocI];
35
36 size_t num_mess = prelocI_message_count[prelocI];
37 for (int m = 0; m < num_mess; m++)
38 {
39 if (!prelocI_message_received[prelocI][m])
40 {
41 int message_available = 0;
42 MPI_Iprobe(comm_set_.MapIonJ(locJ, Chi::mpi.location_id),
43 max_num_mess * angle_set_num + m, // tag
45 &message_available,
46 MPI_STATUS_IGNORE);
47
48 if (not message_available)
49 {
50 ready_to_execute = false;
51 continue;
52 } // if message is not available
53
54 //============================ Receive upstream data
55 auto& upstream_psi = fluds_.PrelocIOutgoingPsi()[prelocI];
56
57 u_ll_int block_addr = prelocI_message_blockpos[prelocI][m];
58 u_ll_int message_size = prelocI_message_size[prelocI][m];
59
60 int error_code =
61 MPI_Recv(&upstream_psi[block_addr],
62 static_cast<int>(message_size),
63 MPI_DOUBLE,
65 max_num_mess * angle_set_num + m, // tag
67 MPI_STATUS_IGNORE);
68
69 prelocI_message_received[prelocI][m] = true;
70
71 if (error_code != MPI_SUCCESS)
72 {
73 std::stringstream err_stream;
74 err_stream << "################# Delayed receive error."
75 << " message size=" << message_size
76 << " as_num=" << angle_set_num << " num_mess=" << num_mess
77 << " m=" << m << " error="
78 << " size=\n";
79 char error_string[BUFSIZ];
80 int length_of_error_string, error_class;
81 MPI_Error_class(error_code, &error_class);
82 MPI_Error_string(error_class, error_string, &length_of_error_string);
83 err_stream << error_string << "\n";
84 MPI_Error_string(error_code, error_string, &length_of_error_string);
85 err_stream << error_string << "\n";
86 Chi::log.LogAllWarning() << err_stream.str();
87 }
88 } // if not message already received
89 } // for message
90
91 if (!ready_to_execute) break;
92 } // for predecessor
93
94 if (!ready_to_execute) return AngleSetStatus::RECEIVING;
95 else
97}
unsigned long long int u_ll_int
Definition: AAH_AsynComm.h:9
static chi::ChiLog & log
Definition: chi_runtime.h:81
static chi::MPI_Info & mpi
Definition: chi_runtime.h:78
LogStream LogAllWarning()
Definition: chi_log.h:238
MPI_Comm LocICommunicator(int locI) const
int MapIonJ(int locI, int locJ) const
const int & location_id
Current process rank.
Definition: mpi_info.h:26
std::vector< std::vector< bool > > prelocI_message_received
Definition: AAH_AsynComm.h:48
std::vector< std::vector< u_ll_int > > prelocI_message_size
Definition: AAH_AsynComm.h:40
std::vector< std::vector< u_ll_int > > prelocI_message_blockpos
Definition: AAH_AsynComm.h:44
const chi::ChiMPICommunicatorSet & comm_set_
Definition: AsyncComm.h:36
virtual void AllocatePrelocIOutgoingPsi(size_t num_grps, size_t num_angles, size_t num_loc_deps)
Definition: FLUDS.h:42
virtual std::vector< std::vector< double > > & PrelocIOutgoingPsi()=0
const SPDS & GetSPDS() const
Definition: FLUDS.h:31
const VecInt & GetLocationDependencies() const
Definition: SPDS.h:29