通信基础
MPI发送/接收程序
#include <mpi.h>
#include <stdio.h>
/*
 * Minimal MPI point-to-point example.
 *
 * Rank 0 initialises an integer to -1 and sends it to rank 1 with message
 * tag 0; rank 1 receives it and prints it. Any other ranks do nothing.
 * The program needs at least two processes because rank 1 is hard-coded
 * as the destination of the send.
 *
 * Returns 0 on success, 1 if launched with fewer than two processes.
 */
int main(int argc, char** argv) {
    MPI_Init(&argc, &argv);

    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    // Without this guard a single-process run would pass the non-existent
    // rank 1 to MPI_Send. Every rank sees the same world_size, so all of
    // them take this exit path together.
    if (world_size < 2) {
        if (world_rank == 0) {
            fprintf(stderr, "This program requires at least 2 processes\n");
        }
        MPI_Finalize();
        return 1;
    }

    int number;
    if (world_rank == 0) {
        // Sender: one MPI_INT to rank 1, tag 0.
        number = -1;
        MPI_Send(&number, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
    } else if (world_rank == 1) {
        // Receiver: match source 0 / tag 0; the status is not needed here.
        MPI_Recv(&number, 1, MPI_INT, 0, 0, MPI_COMM_WORLD,
                 MPI_STATUS_IGNORE);
        printf("Process 1 received number %d from process 0\n",
               number);
    }

    MPI_Finalize();
    return 0;
}
"C:\Program Files\Microsoft MPI\Bin\mpiexec.exe" -np 2 ./mpi.exe
Process 1 received number -1 from process 0
Process finished with exit code 0
环程序
#include "mpi.h"
#include "stdio.h"
/*
 * Ring program.
 *
 * Rank 0 starts the ring by sending the value 1 to rank 1. Every other rank
 * receives a value from its predecessor, adds 1, and forwards the result to
 * its successor; the last rank forwards back to rank 0. The program ends
 * once rank 0 has received the value from the last rank.
 *
 * Returns 0 on success, 1 if launched with fewer than two processes.
 */
int main(int argc, char **argv){
    MPI_Init(&argc, &argv);

    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    // A ring needs a distinct neighbour; with one process the send would
    // target a rank that does not exist. All ranks see the same size and
    // leave together.
    if (world_size < 2) {
        if (world_rank == 0) {
            fprintf(stderr, "The ring program requires at least 2 processes\n");
        }
        MPI_Finalize();
        return 1;
    }

    // Neighbours on the ring; the modulo wraps the last rank back to 0
    // and rank 0 back to the last rank.
    const int send_to = (world_rank + 1) % world_size;
    const int recv_from = (world_rank + world_size - 1) % world_size;

    int send_data = 1;
    int recv_data;

    // Rank 0 injects the initial value before waiting for it to come around.
    if (world_rank == 0) {
        MPI_Send(&send_data, 1, MPI_INT, send_to, 0, MPI_COMM_WORLD);
    }

    MPI_Recv(&recv_data, 1, MPI_INT, recv_from, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

    // Everyone except rank 0 increments and forwards; rank 0 closes the ring.
    if (world_rank != 0) {
        send_data = recv_data + 1;
        MPI_Send(&send_data, 1, MPI_INT, send_to, 0, MPI_COMM_WORLD);
    }

    printf("Process %d received data %d from process %d\n", world_rank, recv_data, recv_from);

    MPI_Finalize();
    return 0;
}
"C:\Program Files\Microsoft MPI\Bin\mpiexec.exe" -np 6 ./mpi.exe
Process 0 received data 6 from process 5
Process 3 received data 3 from process 2
Process 4 received data 4 from process 3
Process 2 received data 2 from process 1
Process 5 received data 5 from process 4
Process 1 received data 1 from process 0
Process finished with exit code 0
动态地接受信息
重要的结构体与函数
MPI_Status
typedef struct MPI_Status { int internal[2]; int MPI_SOURCE; int MPI_TAG; int MPI_ERROR; } MPI_Status;
- MPI_SOURCE:发送端秩,也就是说,如果我们声明一个 MPI_Status stat 变量,则可以通过 stat.MPI_SOURCE 访问秩。
- MPI_TAG:消息的标签。
- MPI_ERROR:错误码。
MPI_Get_count
MPI_METHOD MPI_Get_count( _In_ const MPI_Status* status, _In_ MPI_Datatype datatype, _mpi_out_(count, MPI_UNDEFINED) int* count );
MPI_Get_count 函数中,使用者需要传递 MPI_Status 结构体,消息的 datatype(数据类型),并返回 count。 变量 count 是已接收的 datatype 元素的数目。
MPI_Probe
MPI_METHOD MPI_Probe( _In_range_(>=, MPI_ANY_SOURCE) int source, _In_range_(>=, MPI_ANY_TAG) int tag, _In_ MPI_Comm comm, _Out_ MPI_Status* status );
MPI_Probe 会一直阻塞,直到出现具有匹配标签和发送端的消息。当消息可用时,它将填充 status 结构体(但并不接收消息本身)。 然后,用户可以使用 MPI_Recv 接收实际的消息。
例子:使用MPI_Status查询
#include "mpi.h"
#include "stdio.h"
#include "time.h"
#include "stdlib.h"
/*
 * Demonstrates querying MPI_Status after a receive.
 *
 * Rank 0 fills an array with 0..99 and sends a random-length prefix of it
 * (0 to 99 elements) to rank 1. Rank 1 posts a receive for up to 100
 * elements and then uses MPI_Get_count together with the status fields to
 * report how many elements actually arrived, from which rank, and with
 * which tag. Ranks other than 0 and 1 take no part in the exchange.
 *
 * Returns 0 on success, 1 if launched with fewer than two processes.
 */
int main(int args, char **argv){
    enum { MAX_NUMBERS = 100 };

    MPI_Init(&args, &argv);

    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    // Rank 1 is the hard-coded destination, so it must exist.
    if (world_size < 2) {
        if (world_rank == 0) {
            fprintf(stderr, "This program requires at least 2 processes\n");
        }
        MPI_Finalize();
        return 1;
    }

    int numbers[MAX_NUMBERS];
    if (world_rank == 0) { // sender
        for (int i = 0; i < MAX_NUMBERS; ++i) {
            numbers[i] = i;
        }
        // Pick how many elements to send at random.
        srand((unsigned)time(NULL));
        int send_num_count = rand() % MAX_NUMBERS;
        MPI_Send(numbers, send_num_count, MPI_INT, 1, 0, MPI_COMM_WORLD);
        printf("process 0 sent %d numbers to process 1.\n", send_num_count);
    } else if (world_rank == 1) { // receiver
        // Only rank 1 receives: rank 0 sends exactly one message, so any
        // additional rank posting a receive would block forever.
        MPI_Status status;
        // MAX_NUMBERS is only an upper bound; sending more elements than
        // the receive allows is an error.
        MPI_Recv(numbers, MAX_NUMBERS, MPI_INT, 0, 0, MPI_COMM_WORLD, &status);
        // Ask the status how many MPI_INT elements actually arrived.
        int recv_num_count;
        MPI_Get_count(&status, MPI_INT, &recv_num_count);
        printf("process 1 received %d numbers from process 0. Message source = %d, tag = %d\n",
               recv_num_count, status.MPI_SOURCE, status.MPI_TAG);
    }

    MPI_Finalize();
    return 0;
}
"C:\Program Files\Microsoft MPI\Bin\mpiexec.exe" -np 2 ./DynamicReceiving
process 0 sent 63 numbers to process 1.
process 1 received 63 numbers from process 0. Message source = 0, tag = 0
Process finished with exit code 0
例子:使用MPI_Probe找出消息大小
#include "mpi.h"
#include "stdio.h"
#include "time.h"
#include "stdlib.h"
/*
 * Demonstrates using MPI_Probe to size a receive buffer dynamically.
 *
 * Rank 0 fills an array with 0..99 and sends a random-length prefix of it
 * (0 to 99 elements) to rank 1. Rank 1 probes for the message, reads its
 * element count from the status with MPI_Get_count, allocates a buffer of
 * that size, and only then receives the message. Ranks other than 0 and 1
 * take no part in the exchange.
 *
 * Returns 0 on success, 1 if launched with fewer than two processes.
 * Aborts the whole MPI job if the receive buffer cannot be allocated.
 */
int main(int args, char **argv){
    enum { MAX_NUMBERS = 100 };

    MPI_Init(&args, &argv);

    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    // Rank 1 is the hard-coded destination, so it must exist.
    if (world_size < 2) {
        if (world_rank == 0) {
            fprintf(stderr, "This program requires at least 2 processes\n");
        }
        MPI_Finalize();
        return 1;
    }

    if (world_rank == 0) { // sender
        int numbers[MAX_NUMBERS];
        for (int i = 0; i < MAX_NUMBERS; ++i) {
            numbers[i] = i;
        }
        // Pick how many elements to send at random.
        srand((unsigned)time(NULL));
        int send_num_count = rand() % MAX_NUMBERS;
        MPI_Send(numbers, send_num_count, MPI_INT, 1, 0, MPI_COMM_WORLD);
        printf("process 0 sent %d numbers to process 1.\n", send_num_count);
    } else if (world_rank == 1) { // receiver
        // Only rank 1 receives: rank 0 sends exactly one message, so any
        // additional rank probing for it would block forever.
        MPI_Status status;
        int recv_num_count;

        // Wait for the message and inspect it without receiving it yet.
        MPI_Probe(0, 0, MPI_COMM_WORLD, &status);
        MPI_Get_count(&status, MPI_INT, &recv_num_count);

        // Size the buffer to the incoming message. The sender may pick a
        // count of 0, so allocate at least one element to keep a NULL
        // result an unambiguous failure indicator.
        size_t alloc_count = recv_num_count > 0 ? (size_t)recv_num_count : 1;
        int *recv_buffer = malloc(alloc_count * sizeof *recv_buffer);
        if (recv_buffer == NULL) {
            fprintf(stderr, "process 1 failed to allocate the receive buffer\n");
            // The sender may still be waiting on this message, so bring
            // the whole job down rather than leaving it hanging.
            MPI_Abort(MPI_COMM_WORLD, 1);
            return 1;
        }

        MPI_Recv(recv_buffer, recv_num_count, MPI_INT, 0, 0, MPI_COMM_WORLD, &status);
        printf("process 1 received %d numbers from process 0. Message source = %d, tag = %d\n",
               recv_num_count, status.MPI_SOURCE, status.MPI_TAG);

        // The received values are in recv_buffer[0 .. recv_num_count - 1].
        free(recv_buffer);
    }

    MPI_Finalize();
    return 0;
}
"C:\Program Files\Microsoft MPI\Bin\mpiexec.exe" -np 2 ./DynamicReceiving_MPI_Probe
process 0 sent 97 numbers to process 1.
process 1 received 97 numbers from process 0. Message source = 0, tag = 0
Process finished with exit code 0
Comments | NOTHING