Skip to content

Commit 17f8dfe

Browse files
committed
rewrite reader and master assignment
1 parent a5def43 commit 17f8dfe

File tree

2 files changed

+62
-70
lines changed

2 files changed

+62
-70
lines changed

src/ne_partitioner.cpp

Lines changed: 62 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -13,27 +13,14 @@ NePartitioner::NePartitioner(std::string basefilename)
1313
total_time.start();
1414
LOG(INFO) << "initializing partitioner";
1515

16-
fin = open(binedgelist_name(basefilename).c_str(), O_RDONLY, (mode_t)0600);
17-
PCHECK(fin != -1) << "Error opening file for read";
18-
struct stat fileInfo = {0};
19-
PCHECK(fstat(fin, &fileInfo) != -1) << "Error getting the file size";
20-
PCHECK(fileInfo.st_size != 0) << "Error: file is empty";
21-
LOG(INFO) << "file size: " << fileInfo.st_size;
22-
23-
fin_map = (char *)mmap(0, fileInfo.st_size, PROT_READ, MAP_SHARED, fin, 0);
24-
if (fin_map == MAP_FAILED) {
25-
close(fin);
26-
PLOG(FATAL) << "error mapping the file";
27-
}
28-
29-
filesize = fileInfo.st_size;
30-
fin_ptr = fin_map;
31-
fin_end = fin_map + filesize;
16+
std::ifstream fin(binedgelist_name(basefilename),
17+
std::ios::binary | std::ios::ate);
18+
auto filesize = fin.tellg();
19+
LOG(INFO) << "file size: " << filesize;
20+
fin.seekg(0, std::ios::beg);
3221

33-
num_vertices = *(vid_t *)fin_ptr;
34-
fin_ptr += sizeof(vid_t);
35-
num_edges = *(size_t *)fin_ptr;
36-
fin_ptr += sizeof(size_t);
22+
fin.read((char *)&num_vertices, sizeof(num_vertices));
23+
fin.read((char *)&num_edges, sizeof(num_edges));
3724

3825
LOG(INFO) << "num_vertices: " << num_vertices
3926
<< ", num_edges: " << num_edges;
@@ -48,58 +35,81 @@ NePartitioner::NePartitioner(std::string basefilename)
4835
adj_in.resize(num_vertices);
4936
is_cores.assign(p, dense_bitset(num_vertices));
5037
is_boundarys.assign(p, dense_bitset(num_vertices));
38+
master.assign(num_vertices, -1);
5139
dis.param(std::uniform_int_distribution<vid_t>::param_type(0, num_vertices - 1));
5240

53-
degrees.resize(num_vertices);
54-
std::ifstream degree_file(degree_name(basefilename), std::ios::binary);
55-
degree_file.read((char *)&degrees[0], num_vertices * sizeof(vid_t));
56-
degree_file.close();
57-
};
58-
59-
void NePartitioner::load_graph()
60-
{
41+
Timer read_timer;
42+
read_timer.start();
6143
LOG(INFO) << "loading...";
62-
while (fin_ptr < fin_end) {
63-
const edge_t *e = (edge_t *)fin_ptr;
64-
edges.push_back(*e);
65-
fin_ptr += sizeof(edge_t);
66-
}
44+
edges.resize(num_edges);
45+
fin.read((char *)&edges[0], sizeof(edge_t) * num_edges);
6746

6847
LOG(INFO) << "constructing...";
6948
adj_out.build(edges);
7049
adj_in.build_reverse(edges);
71-
}
50+
51+
degrees.resize(num_vertices);
52+
std::ifstream degree_file(degree_name(basefilename), std::ios::binary);
53+
degree_file.read((char *)&degrees[0], num_vertices * sizeof(vid_t));
54+
degree_file.close();
55+
read_timer.stop();
56+
LOG(INFO) << "time used for graph input and construction: " << read_timer.get_time();
57+
};
7258

7359
void NePartitioner::assign_remaining()
7460
{
61+
auto &is_boundary = is_boundarys[p - 1], &is_core = is_cores[p - 1];
7562
repv (u, num_vertices)
7663
for (auto &i : adj_out[u])
77-
if (edges[i].valid())
64+
if (edges[i].valid()) {
7865
assign_edge(p - 1, u, edges[i].second);
66+
is_boundary.set_bit_unsync(u);
67+
is_boundary.set_bit_unsync(edges[i].second);
68+
}
69+
70+
repv (i, num_vertices) {
71+
if (is_boundary.get(i)) {
72+
is_core.set_bit_unsync(i);
73+
rep (j, p - 1)
74+
if (is_cores[j].get(i)) {
75+
is_core.set_unsync(i, false);
76+
break;
77+
}
78+
}
79+
}
7980
}
8081

8182
void NePartitioner::assign_master()
8283
{
8384
std::vector<vid_t> count_master(p, 0);
84-
std::vector<bool> allocated(num_vertices);
85-
rep (i, p-1) {
86-
auto is_core = is_cores[i];
87-
repv (v, num_vertices)
88-
if (is_core.get(v)) {
89-
count_master[i]++;
90-
allocated[v] = true;
91-
writer.save_vertex(v, i);
92-
}
93-
}
94-
repv (v, num_vertices)
95-
if (!allocated[v]) {
96-
count_master[p-1]++;
97-
writer.save_vertex(v, p-1);
85+
std::vector<vid_t> quota(p, num_vertices);
86+
long long sum = p * num_vertices;
87+
std::uniform_real_distribution<double> distribution(0.0, 1.0);
88+
std::vector<dense_bitset::iterator> pos(p);
89+
rep (b, p)
90+
pos[b] = is_boundarys[b].begin();
91+
vid_t count = 0;
92+
while (count < num_vertices) {
93+
long long r = distribution(gen) * sum;
94+
int k;
95+
for (k = 0; k < p; k++) {
96+
if (r < quota[k])
97+
break;
98+
r -= quota[k];
99+
}
100+
while (pos[k] != is_boundarys[k].end() && master[*pos[k]] != -1)
101+
pos[k]++;
102+
if (pos[k] != is_boundarys[k].end()) {
103+
count++;
104+
master[*pos[k]] = k;
105+
writer.save_vertex(*pos[k], k);
106+
count_master[k]++;
107+
quota[k]--;
108+
sum--;
98109
}
110+
}
99111
int max_masters =
100112
*std::max_element(count_master.begin(), count_master.end());
101-
vid_t total_master = std::accumulate(count_master.begin(), count_master.end(), 0);
102-
CHECK_EQ(total_master, num_vertices);
103113
LOG(INFO) << "master balance: "
104114
<< (double)max_masters / ((double)num_vertices / p);
105115
}
@@ -117,14 +127,9 @@ void NePartitioner::split()
117127
LOG(INFO) << "partition `" << basefilename << "'";
118128
LOG(INFO) << "number of partitions: " << p;
119129

120-
Timer read_timer, compute_timer;
130+
Timer compute_timer;
121131

122132
min_heap.reserve(num_vertices);
123-
edges.reserve(num_edges);
124-
125-
read_timer.start();
126-
load_graph();
127-
read_timer.stop();
128133

129134
LOG(INFO) << "partitioning...";
130135
compute_timer.start();
@@ -174,15 +179,8 @@ void NePartitioner::split()
174179
size_t total_mirrors = count_mirrors();
175180
LOG(INFO) << "total mirrors: " << total_mirrors;
176181
LOG(INFO) << "replication factor: " << (double)total_mirrors / num_vertices;
177-
LOG(INFO) << "time used for graph input and construction: " << read_timer.get_time();
178182
LOG(INFO) << "time used for partitioning: " << compute_timer.get_time();
179183

180-
if (munmap(fin_map, filesize) == -1) {
181-
close(fin);
182-
PLOG(FATAL) << "Error un-mmapping the file";
183-
}
184-
close(fin);
185-
186184
CHECK_EQ(assigned_edges, num_edges);
187185

188186
total_time.stop();

src/ne_partitioner.hpp

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,6 @@ class NePartitioner : public Partitioner
3131
double average_degree;
3232
size_t capacity;
3333

34-
// use mmap for file input
35-
int fin;
36-
off_t filesize;
37-
char *fin_map, *fin_ptr, *fin_end;
38-
3934
std::vector<edge_t> edges;
4035
graph_t adj_out, adj_in;
4136
MinHeap<vid_t, vid_t> min_heap;
@@ -167,7 +162,6 @@ class NePartitioner : public Partitioner
167162
return true;
168163
}
169164

170-
void load_graph();
171165
void assign_remaining();
172166
void assign_master();
173167
size_t count_mirrors();

0 commit comments

Comments
 (0)