-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscan.cpp
More file actions
123 lines (117 loc) · 3.18 KB
/
scan.cpp
File metadata and controls
123 lines (117 loc) · 3.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// mpic++ -o scan scan.cpp
#include <stdio.h>
#include <cstdlib>
#include <mpi.h>
#include <iostream>
#if defined(_OPENMP)
#include <omp.h>
#endif
// Generate int random numbers in an array
void scan_seq(long* prefix_sum, const long* A, long n) {
if (n == 0) return;
prefix_sum[0] = A[0];
for (long i = 1; i < n; i++) {
prefix_sum[i] = prefix_sum[i-1] + A[i];
}
}
#if defined(_OPENMP)
void scan_omp(long* prefix_sum, const long* A, long n) {
// Suppose n is bigger than the number of threads
if (n == 0) return;
long* r;
omp_set_num_threads(4);
#pragma omp parallel shared(r)
{
int p = omp_get_num_threads();
int t = omp_get_thread_num();
# pragma omp single
{
r= new long[p];
// printf("Number of threads = %d\n",p);
}
// Fill out parallel scan: One way to do this is array into p chunks
// Do a scan in parallel on each chunk, then share/compute the offset
// through a shared vector and update each chunk by adding the offset
// in parallel
long start_index = t * n / p ;
long end_index = (t + 1) * n / p;
prefix_sum[start_index] = A[start_index];
for (long i = start_index+1; i < end_index; i++) {
prefix_sum[i] = prefix_sum[i-1] + A[i];
}
r[t]=prefix_sum[end_index-1];
#pragma omp barrier
long r_sum=0;
for (size_t i = 0; i < t; i++)
{
r_sum += r[i];
}
for (long j = start_index; j < end_index; j++)
{
prefix_sum[j] += r_sum;
}
}
free(r);
}
#endif
int main(int argc, char** argv)
{
MPI_Init(&argc, &argv);
if (argc < 2) {
printf("Usage: mpirun ./scan <vec_size>\n");
abort();
}
long vec_size=atoi(argv[1]);
int rank, size;
MPI_Comm comm = MPI_COMM_WORLD;
MPI_Comm_rank(comm, &rank);
MPI_Comm_size(comm, &size);
char processor_name[MPI_MAX_PROCESSOR_NAME];
int name_len;
MPI_Get_processor_name(processor_name, &name_len);
printf("Rank %d/%d running on %s.\n", rank, size, processor_name);
long N=size*vec_size;
long* a;
long* a_ref;
long* b=new long[vec_size];
long* prefix_sum_ref=new long[vec_size];
if (rank==0)
{
a=new long[N];
for (long i = 0; i < N; i++)
{
a[i] = rand();
// a[i] = 1;
}
a_ref=new long[N];
scan_seq(a_ref, a, N);
}
MPI_Barrier(comm);
MPI_Scatter(a, vec_size, MPI_LONG, b, vec_size, MPI_LONG, 0, comm);
MPI_Scatter(a_ref, vec_size, MPI_LONG, prefix_sum_ref, vec_size, MPI_LONG, 0, comm);
long* prefix_sum=new long[vec_size];
#if defined(_OPENMP)
scan_omp(prefix_sum, b, vec_size);
#else
scan_seq(prefix_sum, b, vec_size);
#endif
long *r=new long[size];
MPI_Barrier(comm);
MPI_Allgather(&prefix_sum[vec_size-1], 1, MPI_LONG, r, 1, MPI_LONG, comm);
for (int i=0; i<rank;i++){
for (long j=0; j<vec_size; j++){
prefix_sum[j]+=r[i];
}
}
printf("rank %d: sum %ld\n", rank,prefix_sum[vec_size-1]);
long error=0;
for (long i = 0; i < vec_size; i++)
{
if (prefix_sum[i]!=prefix_sum_ref[i])
{
error++;
}
}
printf("rank %d: ,error %ld\n", rank,error);
MPI_Finalize();
}