c - через - указатель на массив си




отправка блоков 2D-массива в C с использованием MPI (2)

Как вы отправляете блоки двухмерного массива на разные процессоры? Предположим, что размер 2D-массива составляет 400x400, и я хочу отправить блоки размером 100X100 на разные процессоры. Идея состоит в том, что каждый процессор будет выполнять вычисления на своем отдельном блоке и отправить свой результат обратно первому процессору для конечного результата.
Я использую MPI в программах на C.


Мне просто легче было проверить это.

#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include "mpi.h"

/*
 This is a version with integers, rather than char arrays, presented in this
 very good answer: http://.com/a/9271753/2411320
 It will initialize the 2D array, scatter it, increase every value by 1 and then gather it back.
*/

int malloc2D(int ***array, int n, int m) {
    int i;
    /* allocate the n*m contiguous items */
    int *p = malloc(n*m*sizeof(int));
    if (!p) return -1;

    /* allocate the row pointers into the memory */
    (*array) = malloc(n*sizeof(int*));
    if (!(*array)) {
       free(p);
       return -1;
    }

    /* set up the pointers into the contiguous memory */
    for (i=0; i<n; i++)
       (*array)[i] = &(p[i*m]);

    return 0;
}

int free2D(int ***array) {
    /* free the memory - the first element of the array is at the start */
    free(&((*array)[0][0]));

    /* free the pointers into the memory */
    free(*array);

    return 0;
}

int main(int argc, char **argv) {
    int **global, **local;
    const int gridsize=4; // size of grid
    const int procgridsize=2;  // size of process grid
    int rank, size;        // rank of current process and no. of processes
    int i, j, p;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);


    if (size != procgridsize*procgridsize) {
        fprintf(stderr,"%s: Only works with np=%d for now\n", argv[0], procgridsize);
        MPI_Abort(MPI_COMM_WORLD,1);
    }


    if (rank == 0) {
        /* fill in the array, and print it */
        malloc2D(&global, gridsize, gridsize);
        int counter = 0;
        for (i=0; i<gridsize; i++) {
            for (j=0; j<gridsize; j++)
                global[i][j] = ++counter;
        }


        printf("Global array is:\n");
        for (i=0; i<gridsize; i++) {
            for (j=0; j<gridsize; j++) {
                printf("%2d ", global[i][j]);
            }
            printf("\n");
        }
    }
    //return;

    /* create the local array which we'll process */
    malloc2D(&local, gridsize/procgridsize, gridsize/procgridsize);

    /* create a datatype to describe the subarrays of the global array */
    int sizes[2]    = {gridsize, gridsize};         /* global size */
    int subsizes[2] = {gridsize/procgridsize, gridsize/procgridsize};     /* local size */
    int starts[2]   = {0,0};                        /* where this one starts */
    MPI_Datatype type, subarrtype;
    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &type);
    MPI_Type_create_resized(type, 0, gridsize/procgridsize*sizeof(int), &subarrtype);
    MPI_Type_commit(&subarrtype);

    int *globalptr=NULL;
    if (rank == 0)
        globalptr = &(global[0][0]);

    /* scatter the array to all processors */
    int sendcounts[procgridsize*procgridsize];
    int displs[procgridsize*procgridsize];

    if (rank == 0) {
        for (i=0; i<procgridsize*procgridsize; i++)
            sendcounts[i] = 1;
        int disp = 0;
        for (i=0; i<procgridsize; i++) {
            for (j=0; j<procgridsize; j++) {
                displs[i*procgridsize+j] = disp;
                disp += 1;
            }
            disp += ((gridsize/procgridsize)-1)*procgridsize;
        }
    }


    MPI_Scatterv(globalptr, sendcounts, displs, subarrtype, &(local[0][0]),
                 gridsize*gridsize/(procgridsize*procgridsize), MPI_INT,
                 0, MPI_COMM_WORLD);

    /* now all processors print their local data: */

    for (p=0; p<size; p++) {
        if (rank == p) {
            printf("Local process on rank %d is:\n", rank);
            for (i=0; i<gridsize/procgridsize; i++) {
                putchar('|');
                for (j=0; j<gridsize/procgridsize; j++) {
                    printf("%2d ", local[i][j]);
                }
                printf("|\n");
            }
        }
        MPI_Barrier(MPI_COMM_WORLD);
    }

    /* now each processor has its local array, and can process it */
    for (i=0; i<gridsize/procgridsize; i++) {
        for (j=0; j<gridsize/procgridsize; j++) {
            local[i][j] += 1; // increase by one the value
        }
    }

    /* it all goes back to process 0 */
    MPI_Gatherv(&(local[0][0]), gridsize*gridsize/(procgridsize*procgridsize),  MPI_INT,
                 globalptr, sendcounts, displs, subarrtype,
                 0, MPI_COMM_WORLD);

    /* don't need the local data anymore */
    free2D(&local);

    /* or the MPI data type */
    MPI_Type_free(&subarrtype);

    if (rank == 0) {
        printf("Processed grid:\n");
        for (i=0; i<gridsize; i++) {
            for (j=0; j<gridsize; j++) {
                printf("%2d ", global[i][j]);
            }
            printf("\n");
        }

        free2D(&global);
    }


    MPI_Finalize();

    return 0;
}

Вывод:

linux16:>mpicc -o main main.c
linux16:>mpiexec -n 4 main Global array is:
 1  2  3  4
 5  6  7  8
 9 10 11 12
13 14 15 16
Local process on rank 0 is:
| 1  2 |
| 5  6 |
Local process on rank 1 is:
| 3  4 |
| 7  8 |
Local process on rank 2 is:
| 9 10 |
|13 14 |
Local process on rank 3 is:
|11 12 |
|15 16 |
Processed grid:
 2  3  4  5
 6  7  8  9
10 11 12 13
14 15 16 17

Позвольте мне начать с того, что вы вообще не хотите этого делать - разбросайте и соберите огромные куски данных из какого-то «мастер-процесса». Как правило, вы хотите, чтобы каждая задача отвлекалась на свой кусочек головоломки, и вы должны стремиться к тому, чтобы ни один процессор не нуждался в «глобальном представлении» всех данных; как только вы это потребуете, вы ограничите масштабируемость и размер проблемы. Если вы делаете это для ввода-вывода - один процесс считывает данные, затем рассеивает его, а затем собирает обратно для записи, вам нужно в конечном итоге просмотреть MPI-IO.

Однако, на ваш вопрос, MPI имеет очень хорошие способы вытащить произвольные данные из памяти, а также разбросать / собрать его и из набора процессоров. К сожалению, это требует значительного количества концепций MPI - типов MPI, экстентов и коллективных операций. В ответе на этот вопрос обсуждаются многие основные идеи - MPI_Type_create_subarray и MPI_Gather .

Обновление. В холодном свете дня это много кода и не много объяснений. Поэтому позвольте мне немного расшириться.

Рассмотрим 1d целочисленный глобальный массив, в задаче 0 которого вы хотите распространять несколько задач MPI, чтобы каждый из них получал кусок в своем локальном массиве. Скажем, у вас есть 4 задачи, а глобальный массив - [01234567] . У вас может быть задача 0 отправить четыре сообщения (в том числе один для себя), чтобы распределить это, и когда пришло время для повторной сборки, получите четыре сообщения, чтобы связать их вместе; но это, очевидно, занимает много времени при большом количестве процессов. Существуют оптимизированные процедуры для этих видов операций - операции разбрасывания / сбора. Итак, в этом случае вы делаете что-то вроде этого:

int global[8];   /* only task 0 has this */
int local[2];    /* everyone has this */
const int root = 0;   /* the processor with the initial global data */

if (rank == root) {
   for (int i=0; i<7; i++) global[i] = i;
}

MPI_Scatter(global, 2, MPI_INT,      /* send everyone 2 ints from global */
            local,  2, MPI_INT,      /* each proc receives 2 ints into local */
            root, MPI_COMM_WORLD);   /* sending process is root, all procs in */
                                     /* MPI_COMM_WORLD participate */

После этого данные процессоров будут выглядеть так:

task 0:  local:[01]  global: [01234567]
task 1:  local:[23]  global: [garbage-]
task 2:  local:[45]  global: [garbage-]
task 3:  local:[67]  global: [garbage-]

То есть операция рассеяния принимает глобальный массив и посылает смежные 2-цельные фрагменты всем процессорам.

Чтобы повторно собрать массив, мы используем MPI_Gather() , которая работает точно так же, но наоборот:

for (int i=0; i<2; i++) 
   local[i] = local[i] + rank;

MPI_Gather(local,  2, MPI_INT,      /* everyone sends 2 ints from local */
           global, 2, MPI_INT,      /* root receives 2 ints each proc into global */
           root, MPI_COMM_WORLD);   /* recv'ing process is root, all procs in */
                                    /* MPI_COMM_WORLD participate */

и теперь данные выглядят как

task 0:  local:[01]  global: [0134679a]
task 1:  local:[34]  global: [garbage-]
task 2:  local:[67]  global: [garbage-]
task 3:  local:[9a]  global: [garbage-]

Gather возвращает все данные, и здесь 10, потому что я не думал, что мое форматирование достаточно тщательно, начав этот пример.

Что произойдет, если количество точек данных равномерно не делит количество процессов, и нам нужно отправлять разные количества элементов в каждый процесс? Тогда вам понадобится обобщенная версия разброса, MPI_Scatterv() , которая позволяет вам указывать количество отсчетов для каждого процессора и перемещение - где в глобальном массиве начинается эта часть данных. Итак, допустим, у вас был массив символов [abcdefghi] с 9 символами, и вы планировали присвоить каждому процессу два символа, кроме последнего, который получил три. Тогда вам понадобится

char global[9];   /* only task 0 has this */
char local[3]={'-','-','-'};    /* everyone has this */
int  mynum;                     /* how many items */
const int root = 0;   /* the processor with the initial global data */

if (rank == 0) {
   for (int i=0; i<8; i++) global[i] = 'a'+i;
}

int counts[4] = {2,2,2,3};   /* how many pieces of data everyone has */
mynum = counts[rank];
int displs[4] = {0,2,4,6};   /* the starting point of everyone's data */
                             /* in the global array */

MPI_Scatterv(global, counts, displs, /* proc i gets counts[i] pts from displs[i] */
            MPI_INT,      
            local, mynum, MPI_INT;   /* I'm receiving mynum MPI_INTs into local */
            root, MPI_COMM_WORLD);

Теперь данные выглядят как

task 0:  local:[ab-]  global: [abcdefghi]
task 1:  local:[cd-]  global: [garbage--]
task 2:  local:[ef-]  global: [garbage--]
task 3:  local:[ghi]  global: [garbage--]

Вы теперь использовали dispav для распределения нерегулярных объемов данных. В каждом случае смещение равно двум * рангам (измеряется в символах, смещение - в единицах типов, отправляемых для разброса или полученных для сбора, обычно это не байты или что-то) с начала массива, а подсчеты {2,2,2,3}. Если бы это был первый процессор, мы хотели иметь 3 символа, мы бы установили counts = {3,2,2,2}, а смещения были бы {0,3,5,7}. Gatherv снова работает точно так же, как и наоборот; подсчеты и массивы вытеснений останутся неизменными.

Теперь, для 2D, это немного сложнее. Если мы хотим отправить 2d-подслои 2-го массива, данные, которые мы отправляем сейчас, уже не смежны. Если мы отправляем (скажем) 3x3 субблоки массива 6x6 на 4 процессора, данные, которые мы отправляем, имеют в нем дыры:

2D Array

   ---------
   |000|111|
   |000|111|
   |000|111|
   |---+---|
   |222|333|
   |222|333|
   |222|333|
   ---------

Actual layout in memory

   [000111000111000111222333222333222333]

(Обратите внимание, что все высокопроизводительные вычисления сводятся к пониманию компоновки данных в памяти.)

Если мы хотим отправить данные, помеченные «1» на задачу 1, нам нужно пропустить три значения, отправить три значения, пропустить три значения, отправить три значения, пропустить три значения, отправить три значения. Второе осложнение заключается в том, что субрегионы останавливаются и запускаются; обратите внимание, что область «1» не начинается, когда область «0» останавливается; после последнего элемента области «0» следующее место в памяти происходит частично через область «1».

Сначала рассмотрим первую проблему с макетом - как вытащить только те данные, которые мы хотим отправить. Мы всегда могли просто скопировать все данные области «0» в другой смежный массив и отправить это; если мы планировали это достаточно тщательно, мы могли бы сделать это так, чтобы мы могли называть MPI_Scatter результатами. Но нам бы не пришлось переносить всю нашу основную структуру данных таким образом.

До сих пор все типы данных MPI, которые мы использовали, были простыми - MPI_INT задает (скажем) 4 байта подряд. Однако MPI позволяет создавать собственные типы данных, которые описывают произвольно сложные макеты данных в памяти. И этот случай - прямоугольные субрегионы массива - достаточно распространен, что для этого существует конкретный вызов. Для двумерного случая, описанного выше,

    MPI_Datatype newtype;
    int sizes[2]    = {6,6};  /* size of global array */
    int subsizes[2] = {3,3};  /* size of sub-region */
    int starts[2]   = {0,0};  /* let's say we're looking at region "0",
                                 which begins at index [0,0] */

    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &newtype);
    MPI_Type_commit(&newtype);

Это создает тип, который выбирает только область «0» из глобального массива; мы могли бы отправить только эту часть данных другому процессору

    MPI_Send(&(global[0][0]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "0" */

и процесс приема может получить его в локальный массив. Обратите внимание, что процесс получения, если он принимает его только в массив 3x3, не может описать, что он получает как тип newtype ; который больше не описывает макет памяти. Вместо этого он просто получает блок из 3 * 3 = 9 целых чисел:

    MPI_Recv(&(local[0][0]), 3*3, MPI_INT, 0, tag, MPI_COMM_WORLD);

Обратите внимание, что мы могли бы сделать это и для других субрегионов либо путем создания другого типа (с другим start массивом) для других блоков, либо просто путем отправки в начальной точке конкретного блока:

    MPI_Send(&(global[0][3]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "1" */
    MPI_Send(&(global[3][0]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "2" */
    MPI_Send(&(global[3][3]), 1, newtype, dest, tag, MPI_COMM_WORLD);  /* region "3" */

Наконец, обратите внимание, что мы требуем, чтобы глобальные и локальные были смежными кусками памяти здесь; то есть &(global[0][0]) и &(local[0][0]) (или, что то же самое, *global и *local указывают на смежные 6 * 6 и 3 * 3 куска памяти, 't гарантируется обычным способом выделения динамических массивов multi-d. Показывается, как это сделать ниже.

Теперь, когда мы понимаем, как указывать субрегионы, перед использованием операций рассеяния / сбора есть еще одна вещь, и это «размер» этих типов. Мы не могли просто использовать MPI_Scatter() (или даже scatterv) с этими типами еще, поскольку эти типы имеют размер 16 целых чисел; то есть, где они заканчиваются, это 16 целых чисел после их начала - и там, где они заканчиваются, не выстраиваются хорошо, где начинается следующий блок, поэтому мы не можем просто использовать разброс - он бы выбрал неправильное место для начала отправки данных к следующему процессору.

Конечно, мы могли бы использовать MPI_Scatterv() и сами определять перемещения, и это то, что мы будем делать, за исключением того, что смещения находятся в единицах размера отправляемого типа, и это тоже нам не помогает; блоки начинаются с смещений (0,3,18,21) целых чисел от начала глобального массива, а тот факт, что блок заканчивает 16 целых чисел от того, где он начинается, не позволяет нам выразить эти смещения в целых кратных значениях вообще ,

Чтобы справиться с этим, MPI позволяет вам установить размер этого типа для целей этих вычислений. Он не усекает тип; он просто используется для выяснения, где следующий элемент начинается с последнего элемента. Для таких типов, как эти с отверстиями в них, часто бывает необходимо установить, чтобы степень была чем-то меньшим, чем расстояние в памяти до фактического конца типа.

Мы можем установить, насколько это все, что нам удобно. Мы могли бы просто сделать размер 1 целым, а затем установить смещения в единицах целых чисел. В этом случае, однако, мне нравится указывать, что степень равна 3 целым числам - размер подстроки - таким образом, блок «1» начинается сразу после блока «0», а блок «3» начинается сразу после блока « 2" . К сожалению, это не очень хорошо работает, когда вы прыгаете из блока «2» в блок «3», но это не может помочь.

Поэтому, чтобы разброс субблоков в этом случае, мы сделали бы следующее:

    MPI_Datatype type, resizedtype;
    int sizes[2]    = {6,6};  /* size of global array */
    int subsizes[2] = {3,3};  /* size of sub-region */
    int starts[2]   = {0,0};  /* let's say we're looking at region "0",
                                 which begins at index [0,0] */

    /* as before */
    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &type);  
    /* change the extent of the type */
    MPI_Type_create_resized(type, 0, 3*sizeof(int), &resizedtype);
    MPI_Type_commit(&resizedtype);

Здесь мы создали тот же тип блока, что и раньше, но мы изменили его размер; мы не изменили, где тип «начинается» (0), но мы изменили его «завершение» (3 интервала). Мы не упомянули об этом раньше, но MPI_Type_commit должен использовать этот тип; но вам нужно только зафиксировать конечный тип, который вы используете, а не какие-либо промежуточные шаги. Вы используете MPI_Type_free чтобы освободить тип, когда закончите.

Итак, теперь, наконец, мы можем разбросать блоки: манипуляции с данными выше немного сложны, но как только это делается, рассеиватель выглядит так же, как и раньше:

int counts[4] = {1,1,1,1};   /* how many pieces of data everyone has, in units of blocks */
int displs[4] = {0,1,6,7};   /* the starting point of everyone's data */
                             /* in the global array, in block extents */

MPI_Scatterv(global, counts, displs, /* proc i gets counts[i] types from displs[i] */
            resizedtype,      
            local, 3*3, MPI_INT;   /* I'm receiving 3*3 MPI_INTs into local */
            root, MPI_COMM_WORLD);

И теперь мы закончили, после небольшого тура по разбросу, сбору и производным типам MPI.

Ниже приведен пример кода, который показывает как операцию сбора, так и операцию рассеяния с массивами символов. Запуск программы:

$ mpirun -n 4 ./gathervarray
Global array is:
0123456789
3456789012
6789012345
9012345678
2345678901
5678901234
8901234567
1234567890
4567890123
7890123456
Local process on rank 0 is:
|01234|
|34567|
|67890|
|90123|
|23456|
Local process on rank 1 is:
|56789|
|89012|
|12345|
|45678|
|78901|
Local process on rank 2 is:
|56789|
|89012|
|12345|
|45678|
|78901|
Local process on rank 3 is:
|01234|
|34567|
|67890|
|90123|
|23456|
Processed grid:
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
AAAAABBBBB
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD
CCCCCDDDDD

и код следует.

#include <stdio.h>
#include <math.h>
#include <stdlib.h>
#include "mpi.h"

int malloc2dchar(char ***array, int n, int m) {

    /* allocate the n*m contiguous items */
    char *p = (char *)malloc(n*m*sizeof(char));
    if (!p) return -1;

    /* allocate the row pointers into the memory */
    (*array) = (char **)malloc(n*sizeof(char*));
    if (!(*array)) {
       free(p);
       return -1;
    }

    /* set up the pointers into the contiguous memory */
    for (int i=0; i<n; i++)
       (*array)[i] = &(p[i*m]);

    return 0;
}

int free2dchar(char ***array) {
    /* free the memory - the first element of the array is at the start */
    free(&((*array)[0][0]));

    /* free the pointers into the memory */
    free(*array);

    return 0;
}

int main(int argc, char **argv) {
    char **global, **local;
    const int gridsize=10; // size of grid
    const int procgridsize=2;  // size of process grid
    int rank, size;        // rank of current process and no. of processes

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);


    if (size != procgridsize*procgridsize) {
        fprintf(stderr,"%s: Only works with np=%d for now\n", argv[0], procgridsize);
        MPI_Abort(MPI_COMM_WORLD,1);
    }


    if (rank == 0) {
        /* fill in the array, and print it */
        malloc2dchar(&global, gridsize, gridsize);
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++)
                global[i][j] = '0'+(3*i+j)%10;
        }


        printf("Global array is:\n");
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++)
                putchar(global[i][j]);

            printf("\n");
        }
    }

    /* create the local array which we'll process */
    malloc2dchar(&local, gridsize/procgridsize, gridsize/procgridsize);

    /* create a datatype to describe the subarrays of the global array */

    int sizes[2]    = {gridsize, gridsize};         /* global size */
    int subsizes[2] = {gridsize/procgridsize, gridsize/procgridsize};     /* local size */
    int starts[2]   = {0,0};                        /* where this one starts */
    MPI_Datatype type, subarrtype;
    MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_CHAR, &type);
    MPI_Type_create_resized(type, 0, gridsize/procgridsize*sizeof(char), &subarrtype);
    MPI_Type_commit(&subarrtype);

    char *globalptr=NULL;
    if (rank == 0) globalptr = &(global[0][0]);

    /* scatter the array to all processors */
    int sendcounts[procgridsize*procgridsize];
    int displs[procgridsize*procgridsize];

    if (rank == 0) {
        for (int i=0; i<procgridsize*procgridsize; i++) sendcounts[i] = 1;
        int disp = 0;
        for (int i=0; i<procgridsize; i++) {
            for (int j=0; j<procgridsize; j++) {
                displs[i*procgridsize+j] = disp;
                disp += 1;
            }
            disp += ((gridsize/procgridsize)-1)*procgridsize;
        }
    }


    MPI_Scatterv(globalptr, sendcounts, displs, subarrtype, &(local[0][0]),
                 gridsize*gridsize/(procgridsize*procgridsize), MPI_CHAR,
                 0, MPI_COMM_WORLD);

    /* now all processors print their local data: */

    for (int p=0; p<size; p++) {
        if (rank == p) {
            printf("Local process on rank %d is:\n", rank);
            for (int i=0; i<gridsize/procgridsize; i++) {
                putchar('|');
                for (int j=0; j<gridsize/procgridsize; j++) {
                    putchar(local[i][j]);
                }
                printf("|\n");
            }
        }
        MPI_Barrier(MPI_COMM_WORLD);
    }

    /* now each processor has its local array, and can process it */
    for (int i=0; i<gridsize/procgridsize; i++) {
        for (int j=0; j<gridsize/procgridsize; j++) {
            local[i][j] = 'A' + rank;
        }
    }

    /* it all goes back to process 0 */
    MPI_Gatherv(&(local[0][0]), gridsize*gridsize/(procgridsize*procgridsize),  MPI_CHAR,
                 globalptr, sendcounts, displs, subarrtype,
                 0, MPI_COMM_WORLD);

    /* don't need the local data anymore */
    free2dchar(&local);

    /* or the MPI data type */
    MPI_Type_free(&subarrtype);

    if (rank == 0) {
        printf("Processed grid:\n");
        for (int i=0; i<gridsize; i++) {
            for (int j=0; j<gridsize; j++) {
                putchar(global[i][j]);
            }
            printf("\n");
        }

        free2dchar(&global);
    }


    MPI_Finalize();

    return 0;
}




mpi