Commit 9dfc278

Working on implementing Dask cluster for distributed computations
I have an almost-working proof of concept for running `fitness_func` on a Dask distributed cluster. This is similar to the recently implemented parallel processing, but it lets the user distribute the work beyond their local machine to their own Dask cluster of machines/processes. I'm currently stuck debugging a `distributed` bug whose cause I can't make sense of. I'll keep working on it.
1 parent 5315bbe commit 9dfc278
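The mechanism behind the change is Dask's `delayed`/`compute` pair: each fitness call is wrapped lazily into a task graph and everything is evaluated in one batch. A minimal sketch, assuming `dask` is installed; with no `Client` connected, Dask falls back to a local scheduler, so the same code runs on one machine or on a cluster. The `fitness_func` and population below are illustrative stand-ins, not PyGAD's actual API:

```python
from dask import compute, delayed

def fitness_func(solution, solution_idx):
    # Hypothetical stand-in for the user-supplied fitness function.
    return sum(solution)

population = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

# Build the task graph lazily; nothing runs yet.
delayed_results = [delayed(fitness_func)(sol, idx)
                   for idx, sol in enumerate(population)]

# Evaluate the whole batch in one round trip; if a distributed Client is
# active, the jobs go to that cluster.
fitnesses = compute(*delayed_results)
```

`compute(*delayed_results)` returns a tuple of results in submission order, which is what lets the diff below write each fitness back by its saved index.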

File tree

1 file changed: +90 −2 lines


pygad.py

Lines changed: 90 additions & 2 deletions
@@ -5,6 +5,8 @@
 import time
 import warnings
 import concurrent.futures
+from dask import compute, delayed
+from distributed import Client
 
 class GA:
 
@@ -70,7 +72,7 @@ def __init__(self,
     gene_type: The type of the gene. It is assigned to any of these types (int, float, numpy.int, numpy.int8, numpy.int16, numpy.int32, numpy.int64, numpy.uint, numpy.uint8, numpy.uint16, numpy.uint32, numpy.uint64, numpy.float, numpy.float16, numpy.float32, numpy.float64) and forces all the genes to be of that type.
 
     parent_selection_type: Type of parent selection.
-    keep_parents: If 0, this means no parent in the current population will be used in the next population. If -1, this means all parents in the current population will be used in the next population. If set to a value > 0, then the specified value refers to the number of parents in the current population to be used in the next population. Some parent selection operators such as rank selection, favor population diversity and therefore keeping the parents in the next generation can be beneficial. However, some other parent selection operators, such as roulette wheel selection (RWS), have higher selection pressure and keeping more than one parent in the next generation can seriously harm population diversity. Thanks to Prof. Fernando Jiménez Barrionuevo (http://webs.um.es/fernan) for editing this sentence.
+    keep_parents: If 0, this means no parent in the current population will be used in the next population. If -1, this means all parents in the current population will be used in the next population. If set to a value > 0, then the specified value refers to the number of parents in the current population to be used in the next population. For some parent selection operators like rank selection, the parents are of high quality and it is beneficial to keep them in the next generation. In some other parent selection operators like roulette wheel selection (RWS), it is not guaranteed that the parents will be of high quality and thus keeping the parents might degrade the quality of the population.
 
     K_tournament: When the value of 'parent_selection_type' is 'tournament', the 'K_tournament' parameter specifies the number of solutions from which a parent is selected randomly.
 
     crossover_type: Type of the crossover operator. If crossover_type=None, then the crossover step is bypassed which means no crossover is applied and thus no offspring will be created in the next generations. The next generation will use the solutions in the current population.
@@ -875,6 +877,25 @@ def __init__(self,
 
         if parallel_processing is None:
             self.parallel_processing = None
+        # Accept a dask.distributed Client so fitness evaluation can be
+        # dispatched to the user's cluster.
+        elif isinstance(parallel_processing, Client):
+            self.parallel_processing = parallel_processing
         elif type(parallel_processing) in GA.supported_int_types:
             if parallel_processing > 0:
                 self.parallel_processing = ["thread", parallel_processing]
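Getting this type check right matters: a check written as `type(parallel_processing == Client)` evaluates the equality first (which is `False` for any ordinary object) and then takes the type of that bool, yielding `<class 'bool'>`, which is always truthy, so the branch fires for every input. `isinstance(parallel_processing, Client)` is the reliable form. A quick demonstration using a hypothetical stand-in class rather than distributed's real `Client`:

```python
# FakeClient is a stand-in for illustration only, not distributed's Client.
class FakeClient:
    pass

obj = FakeClient()

# Equality is evaluated first (False), then type() of that bool is taken,
# so the result is <class 'bool'> -- truthy for ANY obj.
always_truthy = type(obj == FakeClient)

# The intended check: True only for actual instances of the class.
correct = isinstance(obj, FakeClient)
```

Because `always_truthy` is a class object (and class objects are truthy), an `elif` guarded by it can never be skipped, which silently swallows every other `parallel_processing` value.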
@@ -1183,7 +1204,74 @@ def cal_pop_fitness(self):
             raise Exception("ERROR calling the cal_pop_fitness() method: \nPlease check the parameters passed while creating an instance of the GA class.\n")
 
         pop_fitness = ["undefined"] * len(self.population)
-        if self.parallel_processing is None:
+        # If the user passed a distributed Client in the parallel_processing
+        # parameter, dispatch the fitness calls to the Dask cluster.
+        if isinstance(self.parallel_processing, Client):
+            # Delayed results, to be computed in one batch below.
+            delayed_results = []
+            # Their solution indexes, because "pop_fitness[sol_idx] = fitness"
+            # is now split into two stages.
+            delayed_solution_idxs = []
+
+            # Loop through the solutions; the caching logic is unchanged.
+            for sol_idx, sol in enumerate(self.population):
+                if (self.save_solutions) and (list(sol) in self.solutions):
+                    fitness = self.solutions_fitness[self.solutions.index(list(sol))]
+                    pop_fitness[sol_idx] = fitness
+                elif (self.last_generation_parents is not None) and (len(numpy.where(numpy.all(self.last_generation_parents == sol, axis=1))[0]) > 0):
+                    parent_idx = numpy.where(numpy.all(self.last_generation_parents == sol, axis=1))[0][0]
+                    parent_idx = self.last_generation_parents_indices[parent_idx]
+                    fitness = self.previous_generation_fitness[parent_idx]
+                    pop_fitness[sol_idx] = fitness
+                else:
+                    # Queue a delayed fitness call along with its solution index.
+                    delayed_results.append(delayed(self.fitness_func)(sol, sol_idx))
+                    delayed_solution_idxs.append(sol_idx)
+
+            # The numeric-type check on each fitness value is skipped for now
+            # while testing; it could be reinstated after the results are
+            # computed:
+            # if type(fitness) in GA.supported_int_float_types:
+            #     pass
+            # else:
+            #     raise ValueError("The fitness function should return a number but the value {fit_val} of type {fit_type} found.".format(fit_val=fitness, fit_type=type(fitness)))
+
+            # Run the delayed calculations; compute() sends the jobs to the
+            # Dask cluster behind the Client.
+            fitnesses = compute(*delayed_results)
+
+            # Write the computed results back into pop_fitness by their saved
+            # indexes, as before.
+            for fitness, sol_idx in zip(fitnesses, delayed_solution_idxs):
+                pop_fitness[sol_idx] = fitness
+
+        elif self.parallel_processing is None:
             # Calculating the fitness value of each solution in the current population.
             for sol_idx, sol in enumerate(self.population):
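The index bookkeeping in the hunk above exists because cached fitnesses (saved solutions and re-used parents) are filled in immediately, while the remaining solutions are evaluated in one batch and written back afterwards. A pure-Python sketch of that pattern, with a plain list comprehension standing in for `compute(*delayed_results)` and an illustrative cache dict standing in for the saved-solution lookups:

```python
def fitness_func(sol, idx):
    # Illustrative stand-in for the user-supplied fitness function.
    return sum(sol)

population = [[1, 1], [2, 2], [3, 3]]
cache = {(2, 2): 99}  # stands in for saved-solution / parent fitness lookups

pop_fitness = ["undefined"] * len(population)
pending, pending_idxs = [], []
for idx, sol in enumerate(population):
    if tuple(sol) in cache:
        # Cached fitness: written back immediately.
        pop_fitness[idx] = cache[tuple(sol)]
    else:
        # Not cached: queue the work and remember which slot it belongs to.
        pending.append(sol)
        pending_idxs.append(idx)

# One batched evaluation -- this is where compute(*delayed_results) would run.
results = [fitness_func(sol, idx) for sol, idx in zip(pending, pending_idxs)]

# Write the batch results back by their saved indices.
for fitness, idx in zip(results, pending_idxs):
    pop_fitness[idx] = fitness
```

Without the separate index list, the batch results could not be matched back to the right slots, since the pending solutions are a non-contiguous subset of the population.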
