remove useless codes

This commit is contained in:
wls2002
2023-05-07 16:30:26 +08:00
parent 890c928b0f
commit cec40b254f
24 changed files with 3 additions and 2601 deletions

View File

@@ -1,5 +0,0 @@
from .genome import create_initialize_function, expand, expand_single, analysis, pop_analysis
from .distance import distance
from .mutate import create_mutate_function
from .forward import create_forward_function
from .crossover import batch_crossover

View File

@@ -1,113 +0,0 @@
import numpy as np
def sigmoid_act(z):
z = np.clip(z * 5, -60, 60)
return 1 / (1 + np.exp(-z))
def tanh_act(z):
z = np.clip(z * 2.5, -60, 60)
return np.tanh(z)
def sin_act(z):
z = np.clip(z * 5, -60, 60)
return np.sin(z)
def gauss_act(z):
z = np.clip(z, -3.4, 3.4)
return np.exp(-5 * z ** 2)
def relu_act(z):
return np.maximum(z, 0)
def elu_act(z):
return np.where(z > 0, z, np.exp(z) - 1)
def lelu_act(z):
leaky = 0.005
return np.where(z > 0, z, leaky * z)
def selu_act(z):
lam = 1.0507009873554804934193349852946
alpha = 1.6732632423543772848170429916717
return np.where(z > 0, lam * z, lam * alpha * (np.exp(z) - 1))
def softplus_act(z):
z = np.clip(z * 5, -60, 60)
return 0.2 * np.log(1 + np.exp(z))
def identity_act(z):
return z
def clamped_act(z):
return np.clip(z, -1, 1)
def inv_act(z):
return 1 / z
def log_act(z):
z = np.maximum(z, 1e-7)
return np.log(z)
def exp_act(z):
z = np.clip(z, -60, 60)
return np.exp(z)
def abs_act(z):
return np.abs(z)
def hat_act(z):
return np.maximum(0, 1 - np.abs(z))
def square_act(z):
return z ** 2
def cube_act(z):
return z ** 3
ACT_TOTAL_LIST = [sigmoid_act, tanh_act, sin_act, gauss_act, relu_act, elu_act, lelu_act, selu_act, softplus_act,
identity_act, clamped_act, inv_act, log_act, exp_act, abs_act, hat_act, square_act, cube_act]
act_name2key = {
'sigmoid': 0,
'tanh': 1,
'sin': 2,
'gauss': 3,
'relu': 4,
'elu': 5,
'lelu': 6,
'selu': 7,
'softplus': 8,
'identity': 9,
'clamped': 10,
'inv': 11,
'log': 12,
'exp': 13,
'abs': 14,
'hat': 15,
'square': 16,
'cube': 17,
}
def act(idx, z):
idx = np.asarray(idx, dtype=np.int32)
return ACT_TOTAL_LIST[idx](z)

View File

@@ -1,85 +0,0 @@
"""
aggregations, two special case need to consider:
1. extra 0s
2. full of 0s
"""
import numpy as np
def sum_agg(z):
z = np.where(np.isnan(z), 0, z)
return np.sum(z, axis=0)
def product_agg(z):
z = np.where(np.isnan(z), 1, z)
return np.prod(z, axis=0)
def max_agg(z):
z = np.where(np.isnan(z), -np.inf, z)
return np.max(z, axis=0)
def min_agg(z):
z = np.where(np.isnan(z), np.inf, z)
return np.min(z, axis=0)
def maxabs_agg(z):
z = np.where(np.isnan(z), 0, z)
abs_z = np.abs(z)
max_abs_index = np.argmax(abs_z)
return z[max_abs_index]
def median_agg(z):
non_zero_mask = ~np.isnan(z)
n = np.sum(non_zero_mask, axis=0)
z = np.where(np.isnan(z), np.inf, z)
sorted_valid_values = np.sort(z)
if n % 2 == 0:
return (sorted_valid_values[n // 2 - 1] + sorted_valid_values[n // 2]) / 2
else:
return sorted_valid_values[n // 2]
def mean_agg(z):
non_zero_mask = ~np.isnan(z)
valid_values_sum = sum_agg(z)
valid_values_count = np.sum(non_zero_mask, axis=0)
mean_without_zeros = valid_values_sum / valid_values_count
return mean_without_zeros
AGG_TOTAL_LIST = [sum_agg, product_agg, max_agg, min_agg, maxabs_agg, median_agg, mean_agg]
agg_name2key = {
'sum': 0,
'product': 1,
'max': 2,
'min': 3,
'maxabs': 4,
'median': 5,
'mean': 6,
}
def agg(idx, z):
idx = np.asarray(idx, dtype=np.int32)
if np.all(z == 0.):
return 0
else:
return AGG_TOTAL_LIST[idx](z)
if __name__ == '__main__':
array = np.asarray([1, 2, np.nan, np.nan, 3, 4, 5, np.nan, np.nan, np.nan, np.nan], dtype=np.float32)
for names in agg_name2key.keys():
print(names, agg(agg_name2key[names], array))
array2 = np.asarray([0, 0, 0, 0], dtype=np.float32)
for names in agg_name2key.keys():
print(names, agg(agg_name2key[names], array2))

View File

@@ -1,90 +0,0 @@
from typing import Tuple
import numpy as np
from numpy.typing import NDArray
from .utils import flatten_connections, unflatten_connections
def batch_crossover(batch_nodes1: NDArray, batch_connections1: NDArray, batch_nodes2: NDArray,
batch_connections2: NDArray) -> Tuple[NDArray, NDArray]:
"""
crossover a batch of genomes
:param batch_nodes1:
:param batch_connections1:
:param batch_nodes2:
:param batch_connections2:
:return:
"""
res_nodes, res_cons = [], []
for (n1, c1, n2, c2) in zip(batch_nodes1, batch_connections1, batch_nodes2, batch_connections2):
new_nodes, new_cons = crossover(n1, c1, n2, c2)
res_nodes.append(new_nodes)
res_cons.append(new_cons)
return np.stack(res_nodes, axis=0), np.stack(res_cons, axis=0)
def crossover(nodes1: NDArray, connections1: NDArray, nodes2: NDArray, connections2: NDArray) \
-> Tuple[NDArray, NDArray]:
"""
use genome1 and genome2 to generate a new genome
notice that genome1 should have higher fitness than genome2 (genome1 is winner!)
:param nodes1:
:param connections1:
:param nodes2:
:param connections2:
:return:
"""
# crossover nodes
keys1, keys2 = nodes1[:, 0], nodes2[:, 0]
nodes2 = align_array(keys1, keys2, nodes2, 'node')
new_nodes = np.where(np.isnan(nodes1) | np.isnan(nodes2), nodes1, crossover_gene(nodes1, nodes2))
# crossover connections
cons1 = flatten_connections(keys1, connections1)
cons2 = flatten_connections(keys2, connections2)
con_keys1, con_keys2 = cons1[:, :2], cons2[:, :2]
cons2 = align_array(con_keys1, con_keys2, cons2, 'connection')
new_cons = np.where(np.isnan(cons1) | np.isnan(cons2), cons1, crossover_gene(cons1, cons2))
new_cons = unflatten_connections(len(keys1), new_cons)
return new_nodes, new_cons
def align_array(seq1: NDArray, seq2: NDArray, ar2: NDArray, gene_type: str) -> NDArray:
"""
make ar2 align with ar1.
:param seq1:
:param seq2:
:param ar2:
:param gene_type:
:return:
align means to intersect part of ar2 will be at the same position as ar1,
non-intersect part of ar2 will be set to Nan
"""
seq1, seq2 = seq1[:, np.newaxis], seq2[np.newaxis, :]
mask = (seq1 == seq2) & (~np.isnan(seq1))
if gene_type == 'connection':
mask = np.all(mask, axis=2)
intersect_mask = mask.any(axis=1)
idx = np.arange(0, len(seq1))
idx_fixed = np.dot(mask, idx)
refactor_ar2 = np.where(intersect_mask[:, np.newaxis], ar2[idx_fixed], np.nan)
return refactor_ar2
def crossover_gene(g1: NDArray, g2: NDArray) -> NDArray:
"""
crossover two genes
:param g1:
:param g2:
:return:
only gene with the same key will be crossover, thus don't need to consider change key
"""
r = np.random.rand()
return np.where(r > 0.5, g1, g2)

View File

@@ -1,114 +0,0 @@
from functools import partial
import numpy as np
from numpy.typing import NDArray
from algorithms.neat.genome.utils import flatten_connections, set_operation_analysis
EMPTY_NODE = np.full((1, 5), np.nan)
EMPTY_CON = np.full((1, 4), np.nan)
def distance(nodes1: NDArray, connections1: NDArray, nodes2: NDArray, connections2: NDArray) -> NDArray:
"""
Calculate the distance between two genomes.
nodes are a 2-d array with shape (N, 5), its columns are [key, bias, response, act, agg]
connections are a 3-d array with shape (2, N, N), axis 0 means [weights, enable]
"""
nd = node_distance(nodes1, nodes2) # node distance
# refactor connections
keys1, keys2 = nodes1[:, 0], nodes2[:, 0]
cons1 = flatten_connections(keys1, connections1)
cons2 = flatten_connections(keys2, connections2)
cd = connection_distance(cons1, cons2) # connection distance
return nd + cd
def node_distance(nodes1, nodes2, disjoint_coe=1., compatibility_coe=0.5):
node_cnt1 = np.sum(~np.isnan(nodes1[:, 0]))
node_cnt2 = np.sum(~np.isnan(nodes2[:, 0]))
max_cnt = np.maximum(node_cnt1, node_cnt2)
nodes = np.concatenate((nodes1, nodes2), axis=0)
keys = nodes[:, 0]
sorted_indices = np.argsort(keys, axis=0)
nodes = nodes[sorted_indices]
nodes = np.concatenate([nodes, EMPTY_NODE], axis=0) # add a nan row to the end
fr, sr = nodes[:-1], nodes[1:] # first row, second row
nan_mask = np.isnan(nodes[:, 0])
intersect_mask = (fr[:, 0] == sr[:, 0]) & ~nan_mask[:-1]
non_homologous_cnt = node_cnt1 + node_cnt2 - 2 * np.sum(intersect_mask)
nd = batch_homologous_node_distance(fr, sr)
nd = np.where(np.isnan(nd), 0, nd)
homologous_distance = np.sum(nd * intersect_mask)
val = non_homologous_cnt * disjoint_coe + homologous_distance * compatibility_coe
if max_cnt == 0: # consider the case that both genome has no gene
return 0
else:
return val / max_cnt
def connection_distance(cons1, cons2, disjoint_coe=1., compatibility_coe=0.5):
con_cnt1 = np.sum(~np.isnan(cons1[:, 2])) # weight is not nan, means the connection exists
con_cnt2 = np.sum(~np.isnan(cons2[:, 2]))
max_cnt = np.maximum(con_cnt1, con_cnt2)
cons = np.concatenate((cons1, cons2), axis=0)
keys = cons[:, :2]
sorted_indices = np.lexsort(keys.T[::-1])
cons = cons[sorted_indices]
cons = np.concatenate([cons, EMPTY_CON], axis=0) # add a nan row to the end
fr, sr = cons[:-1], cons[1:] # first row, second row
# both genome has such connection
intersect_mask = np.all(fr[:, :2] == sr[:, :2], axis=1) & ~np.isnan(fr[:, 2]) & ~np.isnan(sr[:, 2])
non_homologous_cnt = con_cnt1 + con_cnt2 - 2 * np.sum(intersect_mask)
cd = batch_homologous_connection_distance(fr, sr)
cd = np.where(np.isnan(cd), 0, cd)
homologous_distance = np.sum(cd * intersect_mask)
val = non_homologous_cnt * disjoint_coe + homologous_distance * compatibility_coe
if max_cnt == 0: # consider the case that both genome has no gene
return 0
else:
return val / max_cnt
def batch_homologous_node_distance(b_n1, b_n2):
res = []
for n1, n2 in zip(b_n1, b_n2):
d = homologous_node_distance(n1, n2)
res.append(d)
return np.stack(res, axis=0)
def batch_homologous_connection_distance(b_c1, b_c2):
res = []
for c1, c2 in zip(b_c1, b_c2):
d = homologous_connection_distance(c1, c2)
res.append(d)
return np.stack(res, axis=0)
def homologous_node_distance(n1, n2):
d = 0
d += np.abs(n1[1] - n2[1]) # bias
d += np.abs(n1[2] - n2[2]) # response
d += n1[3] != n2[3] # activation
d += n1[4] != n2[4]
return d
def homologous_connection_distance(c1, c2):
d = 0
d += np.abs(c1[2] - c2[2]) # weight
d += c1[3] != c2[3] # enable
return d

View File

@@ -1,150 +0,0 @@
from functools import partial
import numpy as np
from numpy.typing import NDArray
from .aggregations import agg
from .activations import act
from .graph import topological_sort, batch_topological_sort
from .utils import I_INT
def create_forward_function(nodes: NDArray, connections: NDArray,
N: int, input_idx: NDArray, output_idx: NDArray, batch: bool):
"""
create forward function for different situations
:param nodes: shape (N, 5) or (pop_size, N, 5)
:param connections: shape (2, N, N) or (pop_size, 2, N, N)
:param N:
:param input_idx:
:param output_idx:
:param batch: using batch or not
:param debug: debug mode
:return:
"""
if nodes.ndim == 2: # single genome
cal_seqs = topological_sort(nodes, connections)
if not batch:
return lambda inputs: forward_single(inputs, N, input_idx, output_idx,
cal_seqs, nodes, connections)
else:
return lambda batch_inputs: forward_batch(batch_inputs, N, input_idx, output_idx,
cal_seqs, nodes, connections)
elif nodes.ndim == 3: # pop genome
pop_cal_seqs = batch_topological_sort(nodes, connections)
if not batch:
return lambda inputs: pop_forward_single(inputs, N, input_idx, output_idx,
pop_cal_seqs, nodes, connections)
else:
return lambda batch_inputs: pop_forward_batch(batch_inputs, N, input_idx, output_idx,
pop_cal_seqs, nodes, connections)
else:
raise ValueError(f"nodes.ndim should be 2 or 3, but got {nodes.ndim}")
def forward_single(inputs: NDArray, N: int, input_idx: NDArray, output_idx: NDArray,
cal_seqs: NDArray, nodes: NDArray, connections: NDArray) -> NDArray:
"""
jax forward for single input shaped (input_num, )
nodes, connections are single genome
:argument inputs: (input_num, )
:argument N: int
:argument input_idx: (input_num, )
:argument output_idx: (output_num, )
:argument cal_seqs: (N, )
:argument nodes: (N, 5)
:argument connections: (2, N, N)
:return (output_num, )
"""
ini_vals = np.full((N,), np.nan)
ini_vals[input_idx] = inputs
for i in cal_seqs:
if i in input_idx:
continue
if i == I_INT:
break
ins = ini_vals * connections[0, :, i]
z = agg(nodes[i, 4], ins)
z = z * nodes[i, 2] + nodes[i, 1]
z = act(nodes[i, 3], z)
# for some nodes (inputs nodes), the output z will be nan, thus we do not update the vals
ini_vals[i] = z
return ini_vals[output_idx]
def forward_batch(batch_inputs: NDArray, N: int, input_idx: NDArray, output_idx: NDArray,
cal_seqs: NDArray, nodes: NDArray, connections: NDArray) -> NDArray:
"""
jax forward for batch_inputs shaped (batch_size, input_num)
nodes, connections are single genome
:argument batch_inputs: (batch_size, input_num)
:argument N: int
:argument input_idx: (input_num, )
:argument output_idx: (output_num, )
:argument cal_seqs: (N, )
:argument nodes: (N, 5)
:argument connections: (2, N, N)
:return (batch_size, output_num)
"""
res = []
for inputs in batch_inputs:
out = forward_single(inputs, N, input_idx, output_idx, cal_seqs, nodes, connections)
res.append(out)
return np.stack(res, axis=0)
def pop_forward_single(inputs: NDArray, N: int, input_idx: NDArray, output_idx: NDArray,
pop_cal_seqs: NDArray, pop_nodes: NDArray, pop_connections: NDArray) -> NDArray:
"""
jax forward for single input shaped (input_num, )
pop_nodes, pop_connections are population of genomes
:argument inputs: (input_num, )
:argument N: int
:argument input_idx: (input_num, )
:argument output_idx: (output_num, )
:argument pop_cal_seqs: (pop_size, N)
:argument pop_nodes: (pop_size, N, 5)
:argument pop_connections: (pop_size, 2, N, N)
:return (pop_size, output_num)
"""
res = []
for cal_seqs, nodes, connections in zip(pop_cal_seqs, pop_nodes, pop_connections):
out = forward_single(inputs, N, input_idx, output_idx, cal_seqs, nodes, connections)
res.append(out)
return np.stack(res, axis=0)
def pop_forward_batch(batch_inputs: NDArray, N: int, input_idx: NDArray, output_idx: NDArray,
pop_cal_seqs: NDArray, pop_nodes: NDArray, pop_connections: NDArray) -> NDArray:
"""
jax forward for batch input shaped (batch, input_num)
pop_nodes, pop_connections are population of genomes
:argument batch_inputs: (batch_size, input_num)
:argument N: int
:argument input_idx: (input_num, )
:argument output_idx: (output_num, )
:argument pop_cal_seqs: (pop_size, N)
:argument pop_nodes: (pop_size, N, 5)
:argument pop_connections: (pop_size, 2, N, N)
:return (pop_size, batch_size, output_num)
"""
res = []
for cal_seqs, nodes, connections in zip(pop_cal_seqs, pop_nodes, pop_connections):
out = forward_batch(batch_inputs, N, input_idx, output_idx, cal_seqs, nodes, connections)
res.append(out)
return np.stack(res, axis=0)

View File

@@ -1,275 +0,0 @@
"""
Vectorization of genome representation.
Utilizes Tuple[nodes: NDArray, connections: NDArray] to encode the genome, where:
1. N is a pre-set value that determines the maximum number of nodes in the network, and will increase if the genome becomes
too large to be represented by the current value of N.
2. nodes is an array of shape (N, 5), dtype=float, with columns corresponding to: key, bias, response, activation function
(act), and aggregation function (agg).
3. connections is an array of shape (2, N, N), dtype=float, with the first axis representing weight and connection enabled
status.
Empty nodes or connections are represented using np.nan.
"""
from typing import Tuple, Dict
from functools import partial
import numpy as np
from numpy.typing import NDArray
from algorithms.neat.genome.utils import fetch_first
EMPTY_NODE = np.array([np.nan, np.nan, np.nan, np.nan, np.nan])
def create_initialize_function(config):
pop_size = config.neat.population.pop_size
N = config.basic.init_maximum_nodes
num_inputs = config.basic.num_inputs
num_outputs = config.basic.num_outputs
default_bias = config.neat.gene.bias.init_mean
default_response = config.neat.gene.response.init_mean
# default_act = config.neat.gene.activation.default
# default_agg = config.neat.gene.aggregation.default
default_act = 0
default_agg = 0
default_weight = config.neat.gene.weight.init_mean
return partial(initialize_genomes, pop_size, N, num_inputs, num_outputs, default_bias, default_response,
default_act, default_agg, default_weight)
def initialize_genomes(pop_size: int,
N: int,
num_inputs: int, num_outputs: int,
default_bias: float = 0.0,
default_response: float = 1.0,
default_act: int = 0,
default_agg: int = 0,
default_weight: float = 1.0) \
-> Tuple[NDArray, NDArray, NDArray, NDArray]:
"""
Initialize genomes with default values.
Args:
pop_size (int): Number of genomes to initialize.
N (int): Maximum number of nodes in the network.
num_inputs (int): Number of input nodes.
num_outputs (int): Number of output nodes.
default_bias (float, optional): Default bias value for output nodes. Defaults to 0.0.
default_response (float, optional): Default response value for output nodes. Defaults to 1.0.
default_act (int, optional): Default activation function index for output nodes. Defaults to 1.
default_agg (int, optional): Default aggregation function index for output nodes. Defaults to 0.
default_weight (float, optional): Default weight value for connections. Defaults to 0.0.
Raises:
AssertionError: If the sum of num_inputs, num_outputs, and 1 is greater than N.
Returns:
Tuple[NDArray, NDArray, NDArray, NDArray]: pop_nodes, pop_connections, input_idx, and output_idx arrays.
"""
# Reserve one row for potential mutation adding an extra node
assert num_inputs + num_outputs + 1 <= N, f"Too small N: {N} for input_size: " \
f"{num_inputs} and output_size: {num_outputs}!"
pop_nodes = np.full((pop_size, N, 5), np.nan)
pop_connections = np.full((pop_size, 2, N, N), np.nan)
input_idx = np.arange(num_inputs)
output_idx = np.arange(num_inputs, num_inputs + num_outputs)
pop_nodes[:, input_idx, 0] = input_idx
pop_nodes[:, output_idx, 0] = output_idx
pop_nodes[:, output_idx, 1] = default_bias
pop_nodes[:, output_idx, 2] = default_response
pop_nodes[:, output_idx, 3] = default_act
pop_nodes[:, output_idx, 4] = default_agg
for i in input_idx:
for j in output_idx:
pop_connections[:, 0, i, j] = default_weight
pop_connections[:, 1, i, j] = 1
return pop_nodes, pop_connections, input_idx, output_idx
def expand(pop_nodes: NDArray, pop_connections: NDArray, new_N: int) -> Tuple[NDArray, NDArray]:
"""
Expand the genome to accommodate more nodes.
:param pop_nodes: (pop_size, N, 5)
:param pop_connections: (pop_size, 2, N, N)
:param new_N:
:return:
"""
pop_size, old_N = pop_nodes.shape[0], pop_nodes.shape[1]
new_pop_nodes = np.full((pop_size, new_N, 5), np.nan)
new_pop_nodes[:, :old_N, :] = pop_nodes
new_pop_connections = np.full((pop_size, 2, new_N, new_N), np.nan)
new_pop_connections[:, :, :old_N, :old_N] = pop_connections
return new_pop_nodes, new_pop_connections
def expand_single(nodes: NDArray, connections: NDArray, new_N: int) -> Tuple[NDArray, NDArray]:
"""
Expand a single genome to accommodate more nodes.
:param nodes: (N, 5)
:param connections: (2, N, N)
:param new_N:
:return:
"""
old_N = nodes.shape[0]
new_nodes = np.full((new_N, 5), np.nan)
new_nodes[:old_N, :] = nodes
new_connections = np.full((2, new_N, new_N), np.nan)
new_connections[:, :old_N, :old_N] = connections
return new_nodes, new_connections
def analysis(nodes: NDArray, connections: NDArray, input_keys, output_keys) -> \
Tuple[Dict[int, Tuple[float, float, int, int]], Dict[Tuple[int, int], Tuple[float, bool]]]:
"""
Convert a genome from array to dict.
:param nodes: (N, 5)
:param connections: (2, N, N)
:param output_keys:
:param input_keys:
:return: nodes_dict[key: (bias, response, act, agg)], connections_dict[(f_key, t_key): (weight, enabled)]
"""
# update nodes_dict
try:
nodes_dict = {}
idx2key = {}
for i, node in enumerate(nodes):
if np.isnan(node[0]):
continue
key = int(node[0])
assert key not in nodes_dict, f"Duplicate node key: {key}!"
bias = node[1] if not np.isnan(node[1]) else None
response = node[2] if not np.isnan(node[2]) else None
act = node[3] if not np.isnan(node[3]) else None
agg = node[4] if not np.isnan(node[4]) else None
nodes_dict[key] = (bias, response, act, agg)
idx2key[i] = key
# check nodes_dict
for i in input_keys:
assert i in nodes_dict, f"Input node {i} not found in nodes_dict!"
bias, response, act, agg = nodes_dict[i]
assert bias is None and response is None and act is None and agg is None, \
f"Input node {i} must has None bias, response, act, or agg!"
for o in output_keys:
assert o in nodes_dict, f"Output node {o} not found in nodes_dict!"
for k, v in nodes_dict.items():
if k not in input_keys:
bias, response, act, agg = v
assert bias is not None and response is not None and act is not None and agg is not None, \
f"Normal node {k} must has non-None bias, response, act, or agg!"
# update connections
connections_dict = {}
for i in range(connections.shape[1]):
for j in range(connections.shape[2]):
if np.isnan(connections[0, i, j]) and np.isnan(connections[1, i, j]):
continue
assert i in idx2key, f"Node index {i} not found in idx2key:{idx2key}!"
assert j in idx2key, f"Node index {j} not found in idx2key:{idx2key}!"
key = (idx2key[i], idx2key[j])
weight = connections[0, i, j] if not np.isnan(connections[0, i, j]) else None
enabled = (connections[1, i, j] == 1) if not np.isnan(connections[1, i, j]) else None
assert weight is not None, f"Connection {key} must has non-None weight!"
assert enabled is not None, f"Connection {key} must has non-None enabled!"
connections_dict[key] = (weight, enabled)
return nodes_dict, connections_dict
except AssertionError:
print(nodes)
print(connections)
raise AssertionError
def pop_analysis(pop_nodes, pop_connections, input_keys, output_keys):
res = []
total_nodes, total_connections = 0, 0
for nodes, connections in zip(pop_nodes, pop_connections):
nodes, connections = analysis(nodes, connections, input_keys, output_keys)
res.append((nodes, connections))
total_nodes += len(nodes)
total_connections += len(connections)
print(total_nodes - 200, total_connections)
return res
def add_node(new_node_key: int, nodes: NDArray, connections: NDArray,
bias: float = 0.0, response: float = 1.0, act: int = 0, agg: int = 0) -> Tuple[NDArray, NDArray]:
"""
add a new node to the genome.
"""
exist_keys = nodes[:, 0]
idx = fetch_first(np.isnan(exist_keys))
nodes[idx] = np.array([new_node_key, bias, response, act, agg])
return nodes, connections
def delete_node(node_key: int, nodes: NDArray, connections: NDArray) -> Tuple[NDArray, NDArray]:
"""
delete a node from the genome. only delete the node, regardless of connections.
"""
node_keys = nodes[:, 0]
idx = fetch_first(node_keys == node_key)
return delete_node_by_idx(idx, nodes, connections)
def delete_node_by_idx(idx: int, nodes: NDArray, connections: NDArray) -> Tuple[NDArray, NDArray]:
"""
delete a node from the genome. only delete the node, regardless of connections.
"""
nodes[idx] = EMPTY_NODE
return nodes, connections
def add_connection(from_node: int, to_node: int, nodes: NDArray, connections: NDArray,
weight: float = 0.0, enabled: bool = True) -> Tuple[NDArray, NDArray]:
"""
add a new connection to the genome.
"""
node_keys = nodes[:, 0]
from_idx = fetch_first(node_keys == from_node)
to_idx = fetch_first(node_keys == to_node)
return add_connection_by_idx(from_idx, to_idx, nodes, connections, weight, enabled)
def add_connection_by_idx(from_idx: int, to_idx: int, nodes: NDArray, connections: NDArray,
weight: float = 0.0, enabled: bool = True) -> Tuple[NDArray, NDArray]:
"""
add a new connection to the genome.
"""
connections[:, from_idx, to_idx] = np.array([weight, enabled])
return nodes, connections
def delete_connection(from_node: int, to_node: int, nodes: NDArray, connections: NDArray) -> Tuple[NDArray, NDArray]:
"""
delete a connection from the genome.
"""
node_keys = nodes[:, 0]
from_idx = fetch_first(node_keys == from_node)
to_idx = fetch_first(node_keys == to_node)
return delete_connection_by_idx(from_idx, to_idx, nodes, connections)
def delete_connection_by_idx(from_idx: int, to_idx: int, nodes: NDArray, connections: NDArray) -> Tuple[
NDArray, NDArray]:
"""
delete a connection from the genome.
"""
connections[:, from_idx, to_idx] = np.nan
return nodes, connections

View File

@@ -1,163 +0,0 @@
"""
Some graph algorithms implemented in jax.
Only used in feed-forward networks.
"""
import numpy as np
from numpy.typing import NDArray
# from .utils import fetch_first, I_INT
from algorithms.neat.genome.utils import fetch_first, I_INT
def topological_sort(nodes: NDArray, connections: NDArray) -> NDArray:
"""
a jit-able version of topological_sort! that's crazy!
:param nodes: nodes array
:param connections: connections array
:return: topological sorted sequence
Example:
nodes = np.array([
[0],
[1],
[2],
[3]
])
connections = np.array([
[
[0, 0, 1, 0],
[0, 0, 1, 1],
[0, 0, 0, 1],
[0, 0, 0, 0]
],
[
[0, 0, 1, 0],
[0, 0, 1, 1],
[0, 0, 0, 1],
[0, 0, 0, 0]
]
])
topological_sort(nodes, connections) -> [0, 1, 2, 3]
"""
connections_enable = connections[1, :, :] == 1
in_degree = np.where(np.isnan(nodes[:, 0]), np.nan, np.sum(connections_enable, axis=0))
res = np.full(in_degree.shape, I_INT)
idx = 0
for _ in range(in_degree.shape[0]):
i = fetch_first(in_degree == 0.)
if i == I_INT:
break
res[idx] = i
idx += 1
in_degree[i] = -1
children = connections_enable[i, :]
in_degree = np.where(children, in_degree - 1, in_degree)
return res
def batch_topological_sort(pop_nodes: NDArray, pop_connections: NDArray) -> NDArray:
"""
batch version of topological_sort
:param pop_nodes:
:param pop_connections:
:return:
"""
res = []
for nodes, connections in zip(pop_nodes, pop_connections):
seq = topological_sort(nodes, connections)
res.append(seq)
return np.stack(res, axis=0)
def check_cycles(nodes: NDArray, connections: NDArray, from_idx: NDArray, to_idx: NDArray) -> NDArray:
"""
Check whether a new connection (from_idx -> to_idx) will cause a cycle.
:param nodes: JAX array
The array of nodes.
:param connections: JAX array
The array of connections.
:param from_idx: int
The index of the starting node.
:param to_idx: int
The index of the ending node.
:return: JAX array
An array indicating if there is a cycle caused by the new connection.
Example:
nodes = np.array([
[0],
[1],
[2],
[3]
])
connections = np.array([
[
[0, 0, 1, 0],
[0, 0, 1, 1],
[0, 0, 0, 1],
[0, 0, 0, 0]
],
[
[0, 0, 1, 0],
[0, 0, 1, 1],
[0, 0, 0, 1],
[0, 0, 0, 0]
]
])
check_cycles(nodes, connections, 3, 2) -> True
check_cycles(nodes, connections, 2, 3) -> False
check_cycles(nodes, connections, 0, 3) -> False
check_cycles(nodes, connections, 1, 0) -> False
"""
connections_enable = ~np.isnan(connections[0, :, :])
connections_enable[from_idx, to_idx] = True
nodes_visited = np.full(nodes.shape[0], False)
nodes_visited[to_idx] = True
for _ in range(nodes_visited.shape[0]):
new_visited = np.dot(nodes_visited, connections_enable)
nodes_visited = np.logical_or(nodes_visited, new_visited)
return nodes_visited[from_idx]
if __name__ == '__main__':
nodes = np.array([
[0],
[1],
[2],
[3],
[np.nan]
])
connections = np.array([
[
[np.nan, np.nan, 1, np.nan, np.nan],
[np.nan, np.nan, 1, 1, np.nan],
[np.nan, np.nan, np.nan, 1, np.nan],
[np.nan, np.nan, np.nan, np.nan, np.nan],
[np.nan, np.nan, np.nan, np.nan, np.nan]
],
[
[np.nan, np.nan, 1, np.nan, np.nan],
[np.nan, np.nan, 1, 1, np.nan],
[np.nan, np.nan, np.nan, 1, np.nan],
[np.nan, np.nan, np.nan, np.nan, np.nan],
[np.nan, np.nan, np.nan, np.nan, np.nan]
]
]
)
print(topological_sort(nodes, connections))
print(topological_sort(nodes, connections))
print(check_cycles(nodes, connections, 3, 2))
print(check_cycles(nodes, connections, 2, 3))
print(check_cycles(nodes, connections, 0, 3))
print(check_cycles(nodes, connections, 1, 0))

View File

@@ -1,545 +0,0 @@
from typing import Tuple
from functools import partial
import numpy as np
from numpy.typing import NDArray
from numpy.random import rand
from .utils import fetch_random, fetch_first, I_INT
from .genome import add_node, add_connection_by_idx, delete_node_by_idx, delete_connection_by_idx
from .graph import check_cycles
add_node_cnt, delete_node_cnt, add_connection_cnt, delete_connection_cnt = 0, 0, 0, 0
def create_mutate_function(config, input_keys, output_keys, batch: bool):
"""
create mutate function for different situations
:param output_keys:
:param input_keys:
:param config:
:param batch: mutate for population or not
:return:
"""
bias = config.neat.gene.bias
bias_default = bias.init_mean
bias_mean = bias.init_mean
bias_std = bias.init_stdev
bias_mutate_strength = bias.mutate_power
bias_mutate_rate = bias.mutate_rate
bias_replace_rate = bias.replace_rate
response = config.neat.gene.response
response_default = response.init_mean
response_mean = response.init_mean
response_std = response.init_stdev
response_mutate_strength = response.mutate_power
response_mutate_rate = response.mutate_rate
response_replace_rate = response.replace_rate
weight = config.neat.gene.weight
weight_mean = weight.init_mean
weight_std = weight.init_stdev
weight_mutate_strength = weight.mutate_power
weight_mutate_rate = weight.mutate_rate
weight_replace_rate = weight.replace_rate
activation = config.neat.gene.activation
# act_default = activation.default
act_default = 0
act_range = len(activation.options)
act_replace_rate = activation.mutate_rate
aggregation = config.neat.gene.aggregation
# agg_default = aggregation.default
agg_default = 0
agg_range = len(aggregation.options)
agg_replace_rate = aggregation.mutate_rate
enabled = config.neat.gene.enabled
enabled_reverse_rate = enabled.mutate_rate
genome = config.neat.genome
add_node_rate = genome.node_add_prob
delete_node_rate = genome.node_delete_prob
add_connection_rate = genome.conn_add_prob
delete_connection_rate = genome.conn_delete_prob
single_structure_mutate = genome.single_structural_mutation
mutate_func = lambda nodes, connections, new_node_key: \
mutate(nodes, connections, new_node_key, input_keys, output_keys,
bias_default, bias_mean, bias_std, bias_mutate_strength, bias_mutate_rate,
bias_replace_rate, response_default, response_mean, response_std,
response_mutate_strength, response_mutate_rate, response_replace_rate,
weight_mean, weight_std, weight_mutate_strength, weight_mutate_rate,
weight_replace_rate, act_default, act_range, act_replace_rate,
agg_default, agg_range, agg_replace_rate, enabled_reverse_rate,
add_node_rate, delete_node_rate, add_connection_rate, delete_connection_rate,
single_structure_mutate)
if not batch:
return mutate_func
else:
def batch_mutate_func(pop_nodes, pop_connections, new_node_keys):
global add_node_cnt, delete_node_cnt, add_connection_cnt, delete_connection_cnt
add_node_cnt, delete_node_cnt, add_connection_cnt, delete_connection_cnt = 0, 0, 0, 0
res_nodes, res_connections = [], []
for nodes, connections, new_node_key in zip(pop_nodes, pop_connections, new_node_keys):
nodes, connections = mutate_func(nodes, connections, new_node_key)
res_nodes.append(nodes)
res_connections.append(connections)
# print(f"add_node_cnt: {add_node_cnt}, delete_node_cnt: {delete_node_cnt}, "
# f"add_connection_cnt: {add_connection_cnt}, delete_connection_cnt: {delete_connection_cnt}")
return np.stack(res_nodes, axis=0), np.stack(res_connections, axis=0)
return batch_mutate_func
def mutate(nodes: NDArray,
connections: NDArray,
new_node_key: int,
input_keys: NDArray,
output_keys: NDArray,
bias_default: float = 0,
bias_mean: float = 0,
bias_std: float = 1,
bias_mutate_strength: float = 0.5,
bias_mutate_rate: float = 0.7,
bias_replace_rate: float = 0.1,
response_default: float = 1,
response_mean: float = 1.,
response_std: float = 0.,
response_mutate_strength: float = 0.,
response_mutate_rate: float = 0.,
response_replace_rate: float = 0.,
weight_mean: float = 0.,
weight_std: float = 1.,
weight_mutate_strength: float = 0.5,
weight_mutate_rate: float = 0.7,
weight_replace_rate: float = 0.1,
act_default: int = 0,
act_range: int = 5,
act_replace_rate: float = 0.1,
agg_default: int = 0,
agg_range: int = 5,
agg_replace_rate: float = 0.1,
enabled_reverse_rate: float = 0.1,
add_node_rate: float = 0.2,
delete_node_rate: float = 0.2,
add_connection_rate: float = 0.4,
delete_connection_rate: float = 0.4,
single_structure_mutate: bool = True):
"""
:param output_keys:
:param input_keys:
:param agg_default:
:param act_default:
:param response_default:
:param bias_default:
:param nodes: (N, 5)
:param connections: (2, N, N)
:param new_node_key:
:param bias_mean:
:param bias_std:
:param bias_mutate_strength:
:param bias_mutate_rate:
:param bias_replace_rate:
:param response_mean:
:param response_std:
:param response_mutate_strength:
:param response_mutate_rate:
:param response_replace_rate:
:param weight_mean:
:param weight_std:
:param weight_mutate_strength:
:param weight_mutate_rate:
:param weight_replace_rate:
:param act_range:
:param act_replace_rate:
:param agg_range:
:param agg_replace_rate:
:param enabled_reverse_rate:
:param add_node_rate:
:param delete_node_rate:
:param add_connection_rate:
:param delete_connection_rate:
:param single_structure_mutate: a genome is structurally mutate at most once
:return:
"""
global add_node_cnt, delete_node_cnt, add_connection_cnt, delete_connection_cnt
# mutate_structure
def nothing(n, c):
return n, c
def m_add_node(n, c):
return mutate_add_node(new_node_key, n, c, bias_default, response_default, act_default, agg_default)
def m_delete_node(n, c):
return mutate_delete_node(n, c, input_keys, output_keys)
def m_add_connection(n, c):
return mutate_add_connection(n, c, input_keys, output_keys)
def m_delete_connection(n, c):
return mutate_delete_connection(n, c)
if single_structure_mutate:
d = np.maximum(1, add_node_rate + delete_node_rate + add_connection_rate + delete_connection_rate)
# shorten variable names for beauty
anr, dnr = add_node_rate / d, delete_node_rate / d
acr, dcr = add_connection_rate / d, delete_connection_rate / d
r = rand()
if r <= anr:
nodes, connections = m_add_node(nodes, connections)
elif r <= anr + dnr:
nodes, connections = m_delete_node(nodes, connections)
elif r <= anr + dnr + acr:
nodes, connections = m_add_connection(nodes, connections)
elif r <= anr + dnr + acr + dcr:
nodes, connections = m_delete_connection(nodes, connections)
else:
pass # do nothing
else:
# mutate add node
if rand() < add_node_rate:
nodes, connections = m_add_node(nodes, connections)
add_node_cnt += 1
# mutate delete node
if rand() < delete_node_rate:
nodes, connections = m_delete_node(nodes, connections)
delete_node_cnt += 1
# mutate add connection
if rand() < add_connection_rate:
nodes, connections = m_add_connection(nodes, connections)
add_connection_cnt += 1
# mutate delete connection
if rand() < delete_connection_rate:
nodes, connections = m_delete_connection(nodes, connections)
delete_connection_cnt += 1
nodes, connections = mutate_values(nodes, connections, bias_mean, bias_std, bias_mutate_strength,
bias_mutate_rate, bias_replace_rate, response_mean, response_std,
response_mutate_strength, response_mutate_rate, response_replace_rate,
weight_mean, weight_std, weight_mutate_strength,
weight_mutate_rate, weight_replace_rate, act_range, act_replace_rate, agg_range,
agg_replace_rate, enabled_reverse_rate)
# print(add_node_cnt, delete_node_cnt, add_connection_cnt, delete_connection_cnt)
return nodes, connections
def mutate_values(nodes: NDArray,
connections: NDArray,
bias_mean: float = 0,
bias_std: float = 1,
bias_mutate_strength: float = 0.5,
bias_mutate_rate: float = 0.7,
bias_replace_rate: float = 0.1,
response_mean: float = 1.,
response_std: float = 0.,
response_mutate_strength: float = 0.,
response_mutate_rate: float = 0.,
response_replace_rate: float = 0.,
weight_mean: float = 0.,
weight_std: float = 1.,
weight_mutate_strength: float = 0.5,
weight_mutate_rate: float = 0.7,
weight_replace_rate: float = 0.1,
act_range: int = 5,
act_replace_rate: float = 0.1,
agg_range: int = 5,
agg_replace_rate: float = 0.1,
enabled_reverse_rate: float = 0.1) -> Tuple[NDArray, NDArray]:
"""
Mutate values of nodes and connections.
Args:
nodes: A 2D array representing nodes.
connections: A 3D array representing connections.
bias_mean: Mean of the bias values.
bias_std: Standard deviation of the bias values.
bias_mutate_strength: Strength of the bias mutation.
bias_mutate_rate: Rate of the bias mutation.
bias_replace_rate: Rate of the bias replacement.
response_mean: Mean of the response values.
response_std: Standard deviation of the response values.
response_mutate_strength: Strength of the response mutation.
response_mutate_rate: Rate of the response mutation.
response_replace_rate: Rate of the response replacement.
weight_mean: Mean of the weight values.
weight_std: Standard deviation of the weight values.
weight_mutate_strength: Strength of the weight mutation.
weight_mutate_rate: Rate of the weight mutation.
weight_replace_rate: Rate of the weight replacement.
act_range: Range of the activation function values.
act_replace_rate: Rate of the activation function replacement.
agg_range: Range of the aggregation function values.
agg_replace_rate: Rate of the aggregation function replacement.
enabled_reverse_rate: Rate of reversing enabled state of connections.
Returns:
A tuple containing mutated nodes and connections.
"""
bias_new = mutate_float_values(nodes[:, 1], bias_mean, bias_std,
bias_mutate_strength, bias_mutate_rate, bias_replace_rate)
response_new = mutate_float_values(nodes[:, 2], response_mean, response_std,
response_mutate_strength, response_mutate_rate, response_replace_rate)
weight_new = mutate_float_values(connections[0, :, :], weight_mean, weight_std,
weight_mutate_strength, weight_mutate_rate, weight_replace_rate)
act_new = mutate_int_values(nodes[:, 3], act_range, act_replace_rate)
agg_new = mutate_int_values(nodes[:, 4], agg_range, agg_replace_rate)
# refactor enabled
r = np.random.rand(*connections[1, :, :].shape)
enabled_new = connections[1, :, :] == 1
enabled_new = np.where(r < enabled_reverse_rate, ~enabled_new, enabled_new)
enabled_new = np.where(~np.isnan(connections[0, :, :]), enabled_new, np.nan)
nodes[:, 1] = bias_new
nodes[:, 2] = response_new
nodes[:, 3] = act_new
nodes[:, 4] = agg_new
connections[0, :, :] = weight_new
connections[1, :, :] = enabled_new
return nodes, connections
def mutate_float_values(old_vals: NDArray, mean: float, std: float,
mutate_strength: float, mutate_rate: float, replace_rate: float) -> NDArray:
"""
Mutate float values of a given array.
Args:
old_vals: A 1D array of float values to be mutated.
mean: Mean of the values.
std: Standard deviation of the values.
mutate_strength: Strength of the mutation.
mutate_rate: Rate of the mutation.
replace_rate: Rate of the replacement.
Returns:
A mutated 1D array of float values.
"""
noise = np.random.normal(size=old_vals.shape) * mutate_strength
replace = np.random.normal(size=old_vals.shape) * std + mean
r = rand(*old_vals.shape)
new_vals = old_vals
new_vals = np.where(r <= mutate_rate, new_vals + noise, new_vals)
new_vals = np.where(
(mutate_rate < r) & (r <= mutate_rate + replace_rate),
replace,
new_vals
)
new_vals = np.where(~np.isnan(old_vals), new_vals, np.nan)
return new_vals
def mutate_int_values(old_vals: NDArray, range: int, replace_rate: float) -> NDArray:
"""
Mutate integer values (act, agg) of a given array.
Args:
old_vals: A 1D array of integer values to be mutated.
range: Range of the integer values.
replace_rate: Rate of the replacement.
Returns:
A mutated 1D array of integer values.
"""
replace_val = np.random.randint(low=0, high=range, size=old_vals.shape)
r = np.random.rand(*old_vals.shape)
new_vals = old_vals
new_vals = np.where(r < replace_rate, replace_val, new_vals)
new_vals = np.where(~np.isnan(old_vals), new_vals, np.nan)
return new_vals
def mutate_add_node(new_node_key: int, nodes: NDArray, connections: NDArray,
default_bias: float = 0, default_response: float = 1,
default_act: int = 0, default_agg: int = 0) -> Tuple[NDArray, NDArray]:
"""
Randomly add a new node from splitting a connection.
:param new_node_key:
:param nodes:
:param connections:
:param default_bias:
:param default_response:
:param default_act:
:param default_agg:
:return:
"""
# randomly choose a connection
from_key, to_key, from_idx, to_idx = choice_connection_key(nodes, connections)
def nothing():
return nodes, connections
def successful_add_node():
# disable the connection
new_nodes, new_connections = nodes, connections
new_connections[1, from_idx, to_idx] = False
# add a new node
new_nodes, new_connections = \
add_node(new_node_key, new_nodes, new_connections,
bias=default_bias, response=default_response, act=default_act, agg=default_agg)
new_idx = fetch_first(new_nodes[:, 0] == new_node_key)
# add two new connections
weight = new_connections[0, from_idx, to_idx]
new_nodes, new_connections = add_connection_by_idx(from_idx, new_idx,
new_nodes, new_connections, weight=0, enabled=True)
new_nodes, new_connections = add_connection_by_idx(new_idx, to_idx,
new_nodes, new_connections, weight=weight, enabled=True)
return new_nodes, new_connections
# if from_idx == I_INT, that means no connection exist, do nothing
if from_idx == I_INT:
nodes, connections = nothing()
else:
nodes, connections = successful_add_node()
return nodes, connections
def mutate_delete_node(nodes: NDArray, connections: NDArray,
input_keys: NDArray, output_keys: NDArray) -> Tuple[NDArray, NDArray]:
"""
Randomly delete a node. Input and output nodes are not allowed to be deleted.
:param nodes:
:param connections:
:param input_keys:
:param output_keys:
:return:
"""
# randomly choose a node
node_key, node_idx = choice_node_key(nodes, input_keys, output_keys,
allow_input_keys=False, allow_output_keys=False)
if node_idx == I_INT:
return nodes, connections
# delete the node
aux_nodes, aux_connections = delete_node_by_idx(node_idx, nodes, connections)
# delete connections
aux_connections[:, node_idx, :] = np.nan
aux_connections[:, :, node_idx] = np.nan
return aux_nodes, aux_connections
def mutate_add_connection(nodes: NDArray, connections: NDArray,
input_keys: NDArray, output_keys: NDArray) -> Tuple[NDArray, NDArray]:
"""
Randomly add a new connection. The output node is not allowed to be an input node. If in feedforward networks,
cycles are not allowed.
:param nodes:
:param connections:
:param input_keys:
:param output_keys:
:return:
"""
# randomly choose two nodes
from_key, from_idx = choice_node_key(nodes, input_keys, output_keys,
allow_input_keys=True, allow_output_keys=True)
to_key, to_idx = choice_node_key(nodes, input_keys, output_keys,
allow_input_keys=False, allow_output_keys=True)
is_already_exist = ~np.isnan(connections[0, from_idx, to_idx])
if is_already_exist:
connections[1, from_idx, to_idx] = True
return nodes, connections
elif check_cycles(nodes, connections, from_idx, to_idx):
return nodes, connections
else:
new_nodes, new_connections = add_connection_by_idx(from_idx, to_idx, nodes, connections)
return new_nodes, new_connections
def mutate_delete_connection(nodes: NDArray, connections: NDArray):
"""
Randomly delete a connection.
:param nodes:
:param connections:
:return:
"""
from_key, to_key, from_idx, to_idx = choice_connection_key(nodes, connections)
def nothing():
return nodes, connections
def successfully_delete_connection():
return delete_connection_by_idx(from_idx, to_idx, nodes, connections)
if from_idx == I_INT:
nodes, connections = nothing()
else:
nodes, connections = successfully_delete_connection()
return nodes, connections
def choice_node_key(nodes: NDArray,
input_keys: NDArray, output_keys: NDArray,
allow_input_keys: bool = False, allow_output_keys: bool = False) -> Tuple[NDArray, NDArray]:
"""
Randomly choose a node key from the given nodes. It guarantees that the chosen node not be the input or output node.
:param nodes:
:param input_keys:
:param output_keys:
:param allow_input_keys:
:param allow_output_keys:
:return: return its key and position(idx)
"""
node_keys = nodes[:, 0]
mask = ~np.isnan(node_keys)
if not allow_input_keys:
mask = np.logical_and(mask, ~np.isin(node_keys, input_keys))
if not allow_output_keys:
mask = np.logical_and(mask, ~np.isin(node_keys, output_keys))
idx = fetch_random(mask)
if idx == I_INT:
return np.nan, idx
else:
return node_keys[idx], idx
def choice_connection_key(nodes: NDArray, connection: NDArray) -> Tuple[NDArray, NDArray, NDArray, NDArray]:
"""
Randomly choose a connection key from the given connections.
:param nodes:
:param connection:
:return: from_key, to_key, from_idx, to_idx
"""
has_connections_row = np.any(~np.isnan(connection[0, :, :]), axis=1)
from_idx = fetch_random(has_connections_row)
if from_idx == I_INT:
return np.nan, np.nan, from_idx, I_INT
col = connection[0, from_idx, :]
to_idx = fetch_random(~np.isnan(col))
from_key, to_key = nodes[from_idx, 0], nodes[to_idx, 0]
from_key = np.where(from_idx != I_INT, from_key, np.nan)
to_key = np.where(to_idx != I_INT, to_key, np.nan)
return from_key, to_key, from_idx, to_idx

View File

@@ -1,128 +0,0 @@
from functools import partial
from typing import Tuple
import numpy as np
from numpy.typing import NDArray
I_INT = np.iinfo(np.int32).max # infinite int
def flatten_connections(keys, connections):
"""
flatten the (2, N, N) connections to (N * N, 4)
:param keys:
:param connections:
:return:
the first two columns are the index of the node
the 3rd column is the weight, and the 4th column is the enabled status
"""
indices_x, indices_y = np.meshgrid(keys, keys, indexing='ij')
indices = np.stack((indices_x, indices_y), axis=-1).reshape(-1, 2)
# make (2, N, N) to (N, N, 2)
con = np.transpose(connections, (1, 2, 0))
# make (N, N, 2) to (N * N, 2)
con = np.reshape(con, (-1, 2))
con = np.concatenate((indices, con), axis=1)
return con
def unflatten_connections(N, cons):
"""
restore the (N * N, 4) connections to (2, N, N)
:param N:
:param cons:
:return:
"""
cons = cons[:, 2:] # remove the indices
unflatten_cons = np.moveaxis(cons.reshape(N, N, 2), -1, 0)
return unflatten_cons
def set_operation_analysis(ar1: NDArray, ar2: NDArray) -> Tuple[NDArray, NDArray, NDArray]:
"""
Analyze the intersection and union of two arrays by returning their sorted concatenation indices,
intersection mask, and union mask.
:param ar1: JAX array of shape (N, M)
First input array. Should have the same shape as ar2.
:param ar2: JAX array of shape (N, M)
Second input array. Should have the same shape as ar1.
:return: tuple of 3 arrays
- sorted_indices: Indices that would sort the concatenation of ar1 and ar2.
- intersect_mask: A boolean array indicating the positions of the common elements between ar1 and ar2
in the sorted concatenation.
- union_mask: A boolean array indicating the positions of the unique elements in the union of ar1 and ar2
in the sorted concatenation.
Examples:
a = np.array([[1, 2], [3, 4], [5, 6]])
b = np.array([[1, 2], [7, 8], [9, 10]])
sorted_indices, intersect_mask, union_mask = set_operation_analysis(a, b)
sorted_indices -> array([0, 1, 2, 3, 4, 5])
intersect_mask -> array([True, False, False, False, False, False])
union_mask -> array([False, True, True, True, True, True])
"""
ar = np.concatenate((ar1, ar2), axis=0)
sorted_indices = np.lexsort(ar.T[::-1])
aux = ar[sorted_indices]
aux = np.concatenate((aux, np.full((1, ar1.shape[1]), np.nan)), axis=0)
nan_mask = np.any(np.isnan(aux), axis=1)
fr, sr = aux[:-1], aux[1:] # first row, second row
intersect_mask = np.all(fr == sr, axis=1) & ~nan_mask[:-1]
union_mask = np.any(fr != sr, axis=1) & ~nan_mask[:-1]
return sorted_indices, intersect_mask, union_mask
def fetch_first(mask, default=I_INT) -> NDArray:
"""
fetch the first True index
:param mask: array of bool
:param default: the default value if no element satisfying the condition
:return: the index of the first element satisfying the condition. if no element satisfying the condition, return I_INT
example:
>>> a = np.array([1, 2, 3, 4, 5])
>>> fetch_first(a > 3)
3
>>> fetch_first(a > 30)
I_INT
"""
idx = np.argmax(mask)
return np.where(mask[idx], idx, default)
def fetch_last(mask, default=I_INT) -> NDArray:
"""
similar to fetch_first, but fetch the last True index
"""
reversed_idx = fetch_first(mask[::-1], default)
return np.where(reversed_idx == default, default, mask.shape[0] - reversed_idx - 1)
def fetch_random(mask, default=I_INT) -> NDArray:
"""
similar to fetch_first, but fetch a random True index
"""
true_cnt = np.sum(mask)
if true_cnt == 0:
return default
cumsum = np.cumsum(mask)
target = np.random.randint(1, true_cnt + 1, size=())
return fetch_first(cumsum >= target, default)
if __name__ == '__main__':
a = np.array([1, 2, 3, 4, 5])
print(fetch_first(a > 3))
print(fetch_first(a > 30))
print(fetch_last(a > 3))
print(fetch_last(a > 30))
for t in [-1, 0, 1, 2, 3, 4, 5]:
for _ in range(10):
print(t, fetch_random(a > t))

View File

@@ -1,82 +0,0 @@
import numpy as np
from .genome import Genome
from .gene import NodeGene, ConnectionGene
from .feedforward import FeedForwardNetwork
def object2array(genome, N):
"""
convert objective genome to array
:param genome:
:param N: the size of the array
:return: Tuple(Array, Array), represents the nodes and connections array
nodes: shape(N, 5), dtype=float
connections: shape(2, N, N), dtype=float
con[:, i, j] != nan, means there is a connection from i to j
"""
nodes = np.full((N, 5), np.nan)
connections = np.full((2, N, N), np.nan)
assert len(genome.nodes) + len(genome.input_keys) <= N # remain one inf row for mutation adding extra node
idx = 0
n2i = {}
for i in genome.input_keys:
nodes[idx, 0] = i
n2i[i] = idx
idx += 1
for k, v in genome.nodes.items():
nodes[idx, 0] = k
nodes[idx, 1] = v.bias
nodes[idx, 2] = v.response
nodes[idx, 3] = 0
nodes[idx, 4] = 0
n2i[k] = idx
idx += 1
for (f, t), v in genome.connections.items():
f_i, t_i = n2i[f], n2i[t]
connections[0, f_i, t_i] = v.weight
connections[1, f_i, t_i] = v.enabled
return nodes, connections
def array2object(config, nodes, connections):
"""
convert array to genome
:param config:
:param nodes:
:param connections:
:return:
"""
genome = Genome(0, config, None, init_val=False)
genome.input_keys = [0, 1]
genome.output_keys = [2]
idx2key = {}
for i in range(nodes.shape[0]):
key = nodes[i, 0]
if np.isnan(key):
continue
key = int(key)
idx2key[i] = key
if key in genome.input_keys:
continue
node_gene = NodeGene(key, config, init_val=False)
node_gene.bias = nodes[i, 1]
node_gene.response = nodes[i, 2]
node_gene.act = 'sigmoid'
node_gene.agg = 'sum'
genome.nodes[key] = node_gene
for i in range(connections.shape[1]):
for j in range(connections.shape[2]):
if np.isnan(connections[0, i, j]):
continue
key = (idx2key[i], idx2key[j])
connection_gene = ConnectionGene(key, config, init_val=False)
connection_gene.weight = connections[0, i, j]
connection_gene.enabled = connections[1, i, j] == 1
genome.connections[key] = connection_gene
return genome

View File

@@ -1,17 +0,0 @@
"""
Has the built-in activation functions,
code for using them,
and code for adding new user-defined ones
"""
import math
def sigmoid_activation(z):
z = max(-60.0, min(60.0, 5.0 * z))
return 1.0 / (1.0 + math.exp(-z))
activation_dict = {
"sigmoid": sigmoid_activation,
}
full_activation_list = list(activation_dict.keys())

View File

@@ -1,14 +0,0 @@
"""
Has the built-in aggregation functions, code for using them,
and code for adding new user-defined ones.
"""
def sum_aggregation(x):
return sum(x)
aggregation_dict = {
'sum': sum_aggregation,
}
full_aggregation_list = list(aggregation_dict.keys())

View File

@@ -1,54 +0,0 @@
from .graphs import node_calculate_sequence
from .activations import activation_dict
from .aggregations import aggregation_dict
class FeedForwardNetwork(object):
def __init__(self, inputs, outputs, node_evals):
self.input_nodes = inputs
self.output_nodes = outputs
self.node_evals = node_evals
self.values = dict((key, 0.0) for key in inputs + outputs)
def activate(self, inputs):
if len(self.input_nodes) != len(inputs):
raise RuntimeError("Expected {0:n} inputs, got {1:n}".format(len(self.input_nodes), len(inputs)))
for k, v in zip(self.input_nodes, inputs):
self.values[k] = v
for node, act_func, agg_func, bias, response, links in self.node_evals:
node_inputs = []
for i, w in links:
node_inputs.append(self.values[i] * w)
if len(node_inputs) == 0:
s = 0.0
else:
s = agg_func(node_inputs)
self.values[node] = act_func(bias + response * s)
return [self.values[i] for i in self.output_nodes]
@staticmethod
def create(genome):
""" Receives a genome and returns its phenotype (a FeedForwardNetwork). """
# Gather expressed connections.
connections = [cg.key for cg in genome.connections.values() if cg.enabled]
seqs, useful_connections = node_calculate_sequence(genome.input_keys, genome.output_keys, connections)
node_evals = []
for node in seqs:
inputs = []
for conn_key in useful_connections:
inode, onode = conn_key
if onode == node:
cg = genome.connections[conn_key]
inputs.append((inode, cg.weight))
ng = genome.nodes[node]
act_func = activation_dict[ng.act]
agg_func = aggregation_dict[ng.agg]
node_evals.append((node, act_func, agg_func, ng.bias, ng.response, inputs))
return FeedForwardNetwork(genome.input_keys, genome.output_keys, node_evals)

View File

@@ -1,152 +0,0 @@
from typing import Tuple
from random import gauss, choice, random
def clip(x, min_val, max_val):
return min(max(x, min_val), max_val)
class NodeGene:
def __init__(self, key: int, config, init_val=True):
self.key = key
self.config = config
if init_val:
self.init_value()
else:
self.bias = 0
self.response = 1
self.act = 0
self.agg = 0
def __repr__(self):
return f'node({self.key}, bias: {self.bias:.3f}, ' \
f'response: {self.response:.3f}, act: {self.act}, agg: {self.agg})'
def __eq__(self, other):
if not isinstance(other, NodeGene):
return False
return self.key == other.key and \
self.bias == other.bias and \
self.response == other.response and \
self.act == other.act and \
self.agg == other.agg
def copy(self):
new_gene = self.__class__(self.key, config=self.config, init_val=False)
new_gene.bias = self.bias # numpy array is mutable, so we need to copy it
new_gene.response = self.response
new_gene.act = self.act
new_gene.agg = self.agg
return new_gene
def init_value(self):
c = self.config.gene
self.bias = gauss(c.bias.init_mean, c.bias.init_stdev)
self.response = gauss(c.response.init_mean, c.response.init_stdev)
self.act = choice(c.activation.options)
self.agg = choice(c.aggregation.options)
self.bias = clip(self.bias, c.bias.min_value, c.bias.max_value)
self.response = clip(self.response, c.response.min_value, c.response.max_value)
def distance(self, other):
s = abs(self.bias - other.bias) + abs(self.response - other.response)
if self.act != other.act:
s += 1
if self.agg != other.agg:
s += 1
return s
def mutate(self):
self.bias = mutate_float(self.bias, self.config.gene.bias)
self.response = mutate_float(self.response, self.config.gene.response)
self.act = mutate_string(self.act, self.config.gene.activation)
self.agg = mutate_string(self.agg, self.config.gene.aggregation)
@classmethod
def crossover(cls, g1, g2):
assert g1.key == g2.key
c = cls(g1.key, g1.config, init_val=False)
c.bias = g1.bias if random() > 0.5 else g2.bias
c.response = g1.response if random() > 0.5 else g2.response
c.act = g1.act if random() > 0.5 else g2.act
c.agg = g1.agg if random() > 0.5 else g2.agg
return c
class ConnectionGene:
def __init__(self, key: Tuple[int, int], config, init_val=True):
self.key = key
self.config = config
self.enabled = True
if init_val:
self.init_value()
else:
self.weight = 1
def __repr__(self):
return f'connection({self.key}, {self.weight:.3f}, {self.enabled})'
def __eq__(self, other):
if not isinstance(other, ConnectionGene):
return False
return self.key == other.key and \
self.weight == other.weight and \
self.enabled == other.enabled
def copy(self):
new_gene = self.__class__(self.key, self.config, init_val=False)
new_gene.weight = self.weight
new_gene.enabled = self.enabled
return new_gene
def init_value(self):
c = self.config.gene
self.weight = gauss(c.weight.init_mean, c.weight.init_stdev)
self.weight = clip(self.weight, c.weight.min_value, c.weight.max_value)
def distance(self, other):
s = abs(self.weight - other.weight)
if self.enabled != other.enabled:
s += 1
return s
def mutate(self):
self.weight = mutate_float(self.weight, self.config.gene.weight)
if random() < self.config.gene.enabled.mutate_rate:
self.enabled = not self.enabled
@classmethod
def crossover(cls, g1, g2):
assert g1.key == g2.key
c = cls(g1.key, g1.config, init_val=False)
c.weight = g1.weight if random() > 0.5 else g2.weight
c.enabled = g1.enabled if random() > 0.5 else g2.enabled
return c
# HAHA, exactly the bug is here!!
# After I fixed it, the result is much better!!
def mutate_float(v, vc):
"""vc -> value config"""
r = random()
if r < vc.mutate_rate:
v += gauss(0, vc.mutate_power)
v = clip(v, vc.min_value, vc.max_value)
# Previous, seems like a huge bug
# if r < vc.mutate_rate + vc.replace_rate:
# Now:
elif r < vc.mutate_rate + vc.replace_rate:
v = gauss(vc.init_mean, vc.init_stdev)
v = clip(v, vc.min_value, vc.max_value)
return v
def mutate_string(v, vc):
"""vc -> value config"""
r = random()
if r < vc.mutate_rate:
v = choice(vc.options)
return v

View File

@@ -1,246 +0,0 @@
from random import random, choice
from .gene import NodeGene, ConnectionGene
from .graphs import creates_cycle
class Genome:
def __init__(self, key, config, global_idx, init_val=True):
# Unique identifier for a genome instance.
self.key = key
self.config = config
self.global_idx = global_idx
# (gene_key, gene) pairs for gene sets.
self.connections = {}
self.nodes = {}
# Fitness results.
self.fitness = None
# self.input_keys = [-i - 1 for i in range(config.basic.num_inputs)]
# self.output_keys = [i for i in range(config.basic.num_outputs)]
if init_val:
self.initialize()
def __repr__(self):
nodes_info = ',\n\t\t'.join(map(str, self.nodes.values()))
connections_info = ',\n\t\t'.join(map(str, self.connections.values()))
return f'Genome(\n\t' \
f'key: {self.key}, \n' \
f'\tinput_keys: {self.input_keys}, \n' \
f'\toutput_keys: {self.output_keys}, \n' \
f'\tnodes: \n\t\t' \
f'{nodes_info} \n' \
f'\tconnections: \n\t\t' \
f'{connections_info} \n)'
def __eq__(self, other):
if not isinstance(other, Genome):
return False
if self.key != other.key:
return False
if len(self.nodes) != len(other.nodes) or len(self.connections) != len(other.connections):
return False
for k, v in self.nodes.items():
o_v = other.nodes.get(k)
if o_v is None or v != o_v:
return False
for k, v in self.connections.items():
o_v = other.connections.get(k)
if o_v is None or v != o_v:
return False
return True
def initialize(self):
"""Configure a new genome based on the given configuration."""
# Create node genes for the output pins.
for node_key in self.output_keys:
self.nodes[node_key] = NodeGene(node_key, self.config, init_val=True)
# Add connections based on initial connectivity type.
# ONLY ALLOW FULL HERE AND NO HIDDEN!!!
for i in self.input_keys:
for j in self.output_keys:
key = (i, j)
self.connections[key] = ConnectionGene(key, self.config, init_val=True)
def distance(self, other):
"""Calculate the distance between two genomes."""
wc = self.config.genome.compatibility_weight_coefficient
dc = self.config.genome.compatibility_disjoint_coefficient
node_distance = 0.0
if self.nodes or other.nodes: # otherwise, both are empty
disjoint_nodes = 0
for k2 in other.nodes:
if k2 not in self.nodes:
disjoint_nodes += 1
for k1, n1 in self.nodes.items():
n2 = other.nodes.get(k1)
if n2 is None:
disjoint_nodes += 1
else:
# Homologous genes compute their own distance value.
node_distance += n1.distance(n2)
max_nodes = max(len(self.nodes), len(other.nodes))
node_distance = (wc * node_distance + dc * disjoint_nodes) / max_nodes
connection_distance = 0.0
if self.connections or other.connections:
disjoint_connections = 0
for k2 in other.connections:
if k2 not in self.connections:
disjoint_connections += 1
for k1, c1 in self.connections.items():
c2 = other.connections.get(k1)
if c2 is None:
disjoint_connections += 1
else:
# Homologous genes compute their own distance value.
connection_distance += c1.distance(c2)
max_conn = max(len(self.connections), len(other.connections))
connection_distance = (wc * connection_distance + dc * disjoint_connections) / max_conn
return node_distance + connection_distance
@classmethod
def crossover(cls, new_key, g1, g2):
if g1.fitness > g2.fitness:
p1, p2 = g1, g2
else:
p1, p2 = g2, g1
child = cls(new_key, p1.config, p1.global_idx, init_val=False)
for k, cg1 in p1.connections.items():
cg2 = p2.connections.get(k)
if cg2 is None:
child.connections[k] = cg1.copy()
else:
child.connections[k] = ConnectionGene.crossover(cg1, cg2)
for k, ng1 in p1.nodes.items():
ng2 = p2.nodes.get(k)
if ng2 is None:
child.nodes[k] = ng1.copy()
else:
child.nodes[k] = NodeGene.crossover(ng1, ng2)
return child
def mutate(self):
c = self.config.genome
if c.single_structural_mutation:
div = max(1, c.conn_add_prob + c.conn_delete_prob + c.node_add_prob + c.node_delete_prob)
r = random()
if r < c.node_add_prob / div:
self.mutate_add_node()
elif r < (c.node_add_prob + c.node_delete_prob) / div:
self.mutate_delete_node()
elif r < (c.node_add_prob + c.node_delete_prob + c.conn_add_prob) / div:
self.mutate_add_connection()
elif r < (c.node_add_prob + c.node_delete_prob + c.conn_add_prob + c.conn_delete_prob) / div:
self.mutate_delete_connection()
else:
if random() < c.node_add_prob:
self.mutate_add_node()
if random() < c.node_delete_prob:
self.mutate_delete_node()
if random() < c.conn_add_prob:
self.mutate_add_connection()
if random() < c.conn_delete_prob:
self.mutate_delete_connection()
for cg in self.connections.values():
cg.mutate()
for ng in self.nodes.values():
ng.mutate()
def mutate_add_node(self):
# create a node from splitting a connection
if not self.connections:
return -1
# Choose a random connection to split
conn_to_split = choice(list(self.connections.values()))
new_node_id = self.global_idx.next_node()
ng = NodeGene(new_node_id, self.config, init_val=False)
self.nodes[new_node_id] = ng
# Create two new connections
conn_to_split.enabled = False
i, o = conn_to_split.key
con1 = ConnectionGene((i, new_node_id), self.config, init_val=False)
con2 = ConnectionGene((new_node_id, o), self.config, init_val=False)
# The new node+connections have roughly the same behavior as
# the original connection (depending on the activation function of the new node).
con2.weight = conn_to_split.weight
self.connections[con1.key] = con1
self.connections[con2.key] = con2
return 1
def mutate_delete_node(self):
# Do nothing if there are no non-output nodes.
available_nodes = [k for k in self.nodes if k not in self.output_keys]
if not available_nodes:
return -1
del_key = choice(available_nodes)
connections_to_delete = set()
for k, v in self.connections.items():
if del_key in v.key:
connections_to_delete.add(v.key)
for key in connections_to_delete:
del self.connections[key]
del self.nodes[del_key]
return del_key
def mutate_add_connection(self):
"""
Attempt to add a new connection, the only restriction being that the output
node cannot be one of the network input pins.
"""
possible_outputs = list(self.nodes)
out_node = choice(possible_outputs)
possible_inputs = possible_outputs + self.input_keys
in_node = choice(possible_inputs)
# in recurrent networks, the input node can be the same as the output node
key = (in_node, out_node)
if key in self.connections:
self.connections[key].enabled = True
return -1
# if feedforward network, check if the connection creates a cycle
if self.config.genome.feedforward and creates_cycle(self.connections.keys(), key):
return -1
cg = ConnectionGene(key, self.config, init_val=True)
self.connections[key] = cg
return key
def mutate_delete_connection(self):
if self.connections:
key = choice(list(self.connections.keys()))
del self.connections[key]
def complexity(self):
return len(self.connections) * 2 + len(self.nodes) * 4

View File

@@ -1,130 +0,0 @@
"""Directed graph algorithm implementations."""
def creates_cycle(connections, test):
"""
Returns true if the addition of the 'test' connection would create a cycle,
assuming that no cycle already exists in the graph represented by 'connections'.
"""
i, o = test
if i == o:
return True
visited = {o}
while True:
num_added = 0
for a, b in connections:
if a in visited and b not in visited:
if b == i:
return True
visited.add(b)
num_added += 1
if num_added == 0:
return False
def required_for_output(inputs, outputs, connections):
"""
Collect the nodes whose state is required to compute the final network output(s).
:param inputs: list of the input identifiers
:param outputs: list of the output node identifiers
:param connections: list of (input, output) connections in the network.
NOTE: It is assumed that the input identifier set and the node identifier set are disjoint.
By convention, the output node ids are always the same as the output index.
Returns a set of identifiers of required nodes.
"""
assert not set(inputs).intersection(outputs)
required = set(outputs)
s = set(outputs)
while 1:
# Find nodes not in s whose output is consumed by a node in s.
t = set(a for (a, b) in connections if b in s and a not in s)
if not t:
break
layer_nodes = set(x for x in t if x not in inputs)
if not layer_nodes:
break
required = required.union(layer_nodes)
s = s.union(t)
return required
def feed_forward_layers(inputs, outputs, connections):
"""
Collect the layers whose members can be evaluated in parallel in a feed-forward network.
:param inputs: list of the network input nodes
:param outputs: list of the output node identifiers
:param connections: list of (input, output) connections in the network.
Returns a list of layers, with each layer consisting of a set of node identifiers.
Note that the returned layers do not contain nodes whose output is ultimately
never used to compute the final network output.
"""
required = required_for_output(inputs, outputs, connections)
layers = []
s = set(inputs)
while 1:
# Find candidate nodes c for the next layer. These nodes should connect
# a node in s to a node not in s.
c = set(b for (a, b) in connections if a in s and b not in s)
# Keep only the used nodes whose entire input set is contained in s.
t = set()
for n in c:
if n in required and all(a in s for (a, b) in connections if b == n):
t.add(n)
if not t:
break
layers.append(t)
s = s.union(t)
return layers
def node_calculate_sequence(inputs, outputs, connections):
"""
Collect the sequence of nodes to calculate in order to compute the final network output(s).
:param required_nodes:
:param connections:
:return:
"""
required_nodes = required_for_output(inputs, outputs, connections)
useful_nodes = required_nodes.copy()
useful_nodes.update(inputs)
useful_connections = [c for c in connections if c[0] in useful_nodes and c[1] in useful_nodes]
# do topological sort on useful_connections
in_degrees = {n: 0 for n in useful_nodes}
for a, b in useful_connections:
in_degrees[b] += 1
topological_order = []
while len(topological_order) < len(useful_nodes):
for n in in_degrees:
if in_degrees[n] == 0:
topological_order.append(n)
in_degrees[n] -= 1
for a, b in useful_connections:
if a == n:
in_degrees[b] -= 1
[topological_order.remove(n) for n in inputs] # remove inputs from topological order
return topological_order, useful_connections
if __name__ == '__main__':
inputs = [-1, -2]
outputs = [0]
connections = [(-2, 2), (-2, 3), (4, 0), (3, 0), (2, 0), (2, 3), (2, 4)]
seqs = node_calculate_sequence(inputs, outputs, connections)
print(seqs)

View File

@@ -2,16 +2,12 @@ from typing import List, Union, Tuple, Callable
import time import time
import jax import jax
import numpy as np
from .species import SpeciesController from .species import SpeciesController
from .genome import create_initialize_function, create_mutate_function, create_forward_function from .genome import create_initialize_function, create_mutate_function, create_forward_function
from .genome import batch_crossover from .genome import batch_crossover
from .genome import expand, expand_single, distance from .genome import expand, expand_single
from .genome.origin_neat import *
xor_inputs = np.array([[0, 0], [0, 1], [1, 0], [1, 1]])
xor_outputs = np.array([[0], [1], [1], [0]])
class Pipeline: class Pipeline:
@@ -62,38 +58,6 @@ class Pipeline:
self.update_next_generation(crossover_pair) self.update_next_generation(crossover_pair)
# for i in range(self.pop_size):
# for j in range(self.pop_size):
# n1, c1 = self.pop_nodes[i], self.pop_connections[i]
# n2, c2 = self.pop_nodes[j], self.pop_connections[j]
# g1 = array2object(self.config.neat, n1, c1)
# g2 = array2object(self.config.neat, n2, c2)
# d_real = g1.distance(g2)
# d = distance(n1, c1, n2, c2)
# print(d_real, d)
# assert np.allclose(d_real, d)
# analysis = pop_analysis(self.pop_nodes, self.pop_connections, self.input_idx, self.output_idx)
# try:
# for nodes, connections in zip(self.pop_nodes, self.pop_connections):
# g = array2object(self.config, nodes, connections)
# print(g)
# net = FeedForwardNetwork.create(g)
# real_out = [net.activate(x) for x in xor_inputs]
# func = create_forward_function(nodes, connections, self.N, self.input_idx, self.output_idx, batch=True)
# out = func(xor_inputs)
# real_out = np.array(real_out)
# out = np.array(out)
# print(real_out, out)
# assert np.allclose(real_out, out)
# except AssertionError:
# np.save("err_nodes.npy", self.pop_nodes)
# np.save("err_connections.npy", self.pop_connections)
# print(g)
self.species_controller.speciate(self.pop_nodes, self.pop_connections, self.generation) self.species_controller.speciate(self.pop_nodes, self.pop_connections, self.generation)
self.expand() self.expand()
@@ -145,7 +109,7 @@ class Pipeline:
new_node_keys = np.array(self.fetch_new_node_keys()) new_node_keys = np.array(self.fetch_new_node_keys())
m_npn, m_npc = self.mutate_func(mutate_rand_keys, npn, npc, new_node_keys) # mutate_new_pop_nodes m_npn, m_npc = self.mutate_func(mutate_rand_keys, npn, npc, new_node_keys) # mutate_new_pop_nodes
m_npn, m_npc = jax.device_get(m_npn), jax.device_get(m_npc)
# elitism don't mutate # elitism don't mutate
# (pop_size, ) to (pop_size, 1, 1) # (pop_size, ) to (pop_size, 1, 1)
self.pop_nodes = np.where(elitism_mask[:, None, None], npn, m_npn) self.pop_nodes = np.where(elitism_mask[:, None, None], npn, m_npn)

View File

@@ -105,7 +105,6 @@ class SpeciesController:
if len(new_representatives) != 0: if len(new_representatives) != 0:
# the representatives of new species # the representatives of new species
sid, rid = list(zip(*[(k, v) for k, v in new_representatives.items()])) sid, rid = list(zip(*[(k, v) for k, v in new_representatives.items()]))
distances = [ distances = [
self.distance(pop_nodes[i], pop_connections[i], pop_nodes[r], pop_connections[r]) self.distance(pop_nodes[i], pop_connections[i], pop_nodes[r], pop_connections[r])
for r in rid for r in rid

View File

@@ -1,84 +0,0 @@
from typing import Callable, List
from functools import partial
import numpy as np
from utils import Configer
from algorithms.neat.genome.numpy import analysis, distance
from algorithms.neat.genome.numpy import create_initialize_function, create_mutate_function
def real_distance(nodes1, connections1, nodes2, connections2, input_idx, output_idx):
nodes1, connections1 = analysis(nodes1, connections1, input_idx, output_idx)
nodes2, connections2 = analysis(nodes2, connections2, input_idx, output_idx)
compatibility_coe = 0.5
disjoint_coe = 1.
node_distance = 0.0
if nodes1 or nodes2: # otherwise, both are empty
disjoint_nodes = 0
for k2 in nodes2:
if k2 not in nodes1:
disjoint_nodes += 1
for k1, n1 in nodes1.items():
n2 = nodes2.get(k1)
if n2 is None:
disjoint_nodes += 1
else:
if n1[0] is None:
continue
d = abs(n1[0] - n2[0]) + abs(n1[1] - n2[1])
d += 1 if n1[2] != n2[2] else 0
d += 1 if n1[3] != n2[3] else 0
node_distance += d
max_nodes = max(len(nodes1), len(nodes2))
node_distance = (compatibility_coe * node_distance + disjoint_coe * disjoint_nodes) / max_nodes
connection_distance = 0.0
if connections1 or connections2:
disjoint_connections = 0
for k2 in connections2:
if k2 not in connections1:
disjoint_connections += 1
for k1, c1 in connections1.items():
c2 = connections2.get(k1)
if c2 is None:
disjoint_connections += 1
else:
# Homologous genes compute their own distance value.
d = abs(c1[0] - c2[0])
d += 1 if c1[1] != c2[1] else 0
connection_distance += d
max_conn = max(len(connections1), len(connections2))
connection_distance = (compatibility_coe * connection_distance + disjoint_coe * disjoint_connections) / max_conn
return node_distance + connection_distance
def main():
config = Configer.load_config()
keys_idx = config.basic.num_inputs + config.basic.num_outputs
pop_size = config.neat.population.pop_size
init_func = create_initialize_function(config)
pop_nodes, pop_connections, input_idx, output_idx = init_func()
mutate_func = create_mutate_function(config, input_idx, output_idx, batch=True)
while True:
pop_nodes, pop_connections = mutate_func(pop_nodes, pop_connections, list(range(keys_idx, keys_idx + pop_size)))
keys_idx += pop_size
for i in range(pop_size):
for j in range(pop_size):
nodes1, connections1 = pop_nodes[i], pop_connections[i]
nodes2, connections2 = pop_nodes[j], pop_connections[j]
numpy_d = distance(nodes1, connections1, nodes2, connections2)
real_d = real_distance(nodes1, connections1, nodes2, connections2, input_idx, output_idx)
assert np.isclose(numpy_d, real_d), f'{numpy_d} != {real_d}'
print(numpy_d, real_d)
if __name__ == '__main__':
np.random.seed(0)
main()

View File

@@ -1,24 +0,0 @@
import numpy as np
from jax import numpy as jnp
from algorithms.neat.genome.genome import analysis
from algorithms.neat.genome import create_forward_function
error_nodes = np.load('error_nodes.npy')
error_connections = np.load('error_connections.npy')
node_dict, connection_dict = analysis(error_nodes, error_connections, np.array([0, 1]), np.array([2, ]))
print(node_dict, connection_dict, sep='\n')
N = error_nodes.shape[0]
xor_inputs = np.array([[0, 0], [0, 1], [1, 0], [1, 1]])
func = create_forward_function(error_nodes, error_connections, N, jnp.array([0, 1]), jnp.array([2, ]),
batch=True, debug=True)
out = func(np.array([1, 0]))
print(error_nodes)
print(error_connections)
print(out)

View File

@@ -1,11 +0,0 @@
import numpy as np
from algorithms.neat.genome import distance
r_nodes = np.load('too_large_distance_r_nodes.npy')
r_connections = np.load('too_large_distance_r_connections.npy')
nodes = np.load('too_large_distance_nodes.npy')
connections = np.load('too_large_distance_connections.npy')
d1 = distance(r_nodes, r_connections, nodes, connections)
d2 = distance(nodes, connections, r_nodes, r_connections)
print(d1, d2)

View File

@@ -1,71 +0,0 @@
import time
import jax.random
from utils import Configer
from algorithms.neat.genome.genome import *
from algorithms.neat.species import SpeciesController
from algorithms.neat.genome.forward import create_forward_function
from algorithms.neat.genome.mutate import create_mutate_function
if __name__ == '__main__':
N = 10
pop_nodes, pop_connections, input_idx, output_idx = initialize_genomes(10000, N, 2, 1,
default_act=9, default_agg=0)
inputs = np.array([[0, 0], [0, 1], [1, 0], [1, 1]])
# forward = create_forward_function(pop_nodes, pop_connections, 5, input_idx, output_idx, batch=True)
nodes, connections = pop_nodes[0], pop_connections[0]
forward = create_forward_function(pop_nodes, pop_connections, N, input_idx, output_idx, batch=True)
out = forward(inputs)
print(out.shape)
print(out)
config = Configer.load_config()
s_c = SpeciesController(config.neat)
s_c.speciate(pop_nodes, pop_connections, 0)
s_c.speciate(pop_nodes, pop_connections, 0)
print(s_c.genome_to_species)
start = time.time()
for i in range(100):
print(i)
s_c.speciate(pop_nodes, pop_connections, i)
print(time.time() - start)
seed = jax.random.PRNGKey(42)
mutate_func = create_mutate_function(config, input_idx, output_idx, batch=False)
print(nodes, connections, sep='\n')
print(*mutate_func(seed, nodes, connections, 100), sep='\n')
randseeds = jax.random.split(seed, 10000)
new_node_keys = jax.random.randint(randseeds[0], minval=0, maxval=10000, shape=(10000,))
batch_mutate_func = create_mutate_function(config, input_idx, output_idx, batch=True)
pop_nodes, pop_connections = batch_mutate_func(randseeds, pop_nodes, pop_connections, new_node_keys)
print(pop_nodes, pop_connections, sep='\n')
start = time.time()
for i in range(100):
print(i)
pop_nodes, pop_connections = batch_mutate_func(randseeds, pop_nodes, pop_connections, new_node_keys)
print(time.time() - start)
print(nodes, connections, sep='\n')
nodes, connections = add_node(6, nodes, connections)
nodes, connections = add_node(7, nodes, connections)
print(nodes, connections, sep='\n')
nodes, connections = add_connection(6, 7, nodes, connections)
nodes, connections = add_connection(0, 7, nodes, connections)
nodes, connections = add_connection(1, 7, nodes, connections)
print(nodes, connections, sep='\n')
nodes, connections = delete_connection(6, 7, nodes, connections)
print(nodes, connections, sep='\n')
nodes, connections = delete_node(6, nodes, connections)
print(nodes, connections, sep='\n')
nodes, connections = delete_node(7, nodes, connections)
print(nodes, connections, sep='\n')

View File

@@ -18,7 +18,6 @@ def evaluate(forward_func: Callable) -> List[float]:
""" """
outs = forward_func(xor_inputs) outs = forward_func(xor_inputs)
fitnesses = 4 - np.sum((outs - xor_outputs) ** 2, axis=(1, 2)) fitnesses = 4 - np.sum((outs - xor_outputs) ** 2, axis=(1, 2))
# print(fitnesses)
return fitnesses.tolist() # returns a list return fitnesses.tolist() # returns a list
@@ -29,13 +28,6 @@ def main():
pipeline = Pipeline(config, seed=11323) pipeline = Pipeline(config, seed=11323)
pipeline.auto_run(evaluate) pipeline.auto_run(evaluate)
# for _ in range(100):
# s = time.time()
# forward_func = pipeline.ask(batch=True)
# fitnesses = evaluate(forward_func)
# pipeline.tell(fitnesses)
# print(time.time() - s)
if __name__ == '__main__': if __name__ == '__main__':
main() main()