CONTrOL is an agent-based model developed for the purpose of studying behavioral mechanisms influencing social learning and performance within and across business organisations. CONTrOL stands for Complex Organisational and Network-driven Transmissions resulting in Organisation Learning.
Organisation package
This section contains the description of the code objects that are part of the organisation package.
These objects are clustered as separate python submodules (.py files) within this package.
The AgentPy library is used for the creation of the agents and their attributes.
import agentpy as ap
organisation.Individual module
Class definition for Individual
This is a superclass definition for an agent in the AgentPy library.
The class is called Individual and it inherits from ap.Agent, which is a base class in the AgentPy library for defining agents.
The setup method is defined within the class and it initializes various attributes of the agent, such as its personal competency, current location, path, and knowledge. The
initPersonalCompetency,initLocation, andinitKnowledgemethods are defined in the next code section of this module and are called to set the initial values for these attributes.The code also randomly sets the size of the agent’s memory and creates a new
Memoryobject with that size, which is used to store the agent’s past visited locations. Theremembermethod of theMemoryobject is called to remember the agent’s current location.Lastly, the code initializes empty lists for the agent’s departmental colleagues, organizational colleagues, and friends. These lists are populated later in the code.
class Individual(ap.Agent):
def setup(self, initialLocation):
self.initPersonalCompetency()
self.currentLocation = self.initLocation(initialLocation)
self.path = [self.currentLocation]
self.knowledge = self.initKnowledge(initialLocation)
memorySize = self.model.random.randint(self.model.p.minMemory, self.model.p.maxMemory)
self.memory = Memory(memorySize)
self.memory.remember(self.currentLocation)
self.deptColleaguesList = []
self.orgColleaguesList = []
self.friends = []
Other initializations included in the class definition for Individual are:
The
initPersonalCompetency(self)function, which involves:energy initialization
self.totalEnergyfor the agents to have at least enough energy for a month (default setting =reimbursementWindowparameter, see main.py to propose a one-step move every day, and max 10 times that;self.maxNumFriendswhich sets randum number of maximum friends an agent can have (network size) between 0 andmaxMaxNumFriends;self.innovativenssandself.predictionCompetenceboth of which are random number with triangular distribution between 0 and 1. Triangular distribution is used since the outcome range has a minimum (0), a maximum (1) and a peak parameter (i.e. a most-likely parameter).
The
initLocation(self, initialLocation)fuction which initializes anIndividualLocationPerceptionobject based on theinitialLocationparameter and the number of multiplex layers defined in the landscape (add link later).TheIndividualLocationPerceptionobject stores the realized fitness values per layer for the initial location. The function records the realized fitness value for each layer using therecordRealisedFitnessmethod, which can be found here Finally, the function returns theIndividualLocationPerceptionobject representing the initial location.The
initKnowledge (self, sourceNode)function:This function initializes the knowledge of an agent in a model. It takes a sourceNode as an input, which is the initial location of the agent;
The function generates a list of layer numbers using the
random.choices method, which selectsklayer numbers from the range of 0 to the number of layers in the model’s landscape;The function then creates a new
Knowledgeobject using these parameters along with the number of initial knowledge nodes, initial knowledge graph degree, and mind capacity specified in the model parameters (see [main.py[(#main.py)]).The
Knowledgeobject contains information about the agent’s knowledge, including the nodes and edges in their mental representation of the landscape.
class Individual(ap.Agent): CONTINUED
def initPersonalCompetency(self):
self.totalEnergy = self.model.random.randint(self.model.p.reimbursementWindow, self.model.p.reimbursementWindow*10)
self.maxNumFriends = self.model.random.randint(0, self.model.p.maxMaxNumFriends)
self.innovativeness = self.model.nprandom.triangular(0, self.model.p.modeInnovativeness, 1)
self.predictionCompetence = self.model.nprandom.triangular(0, self.model.p.modePredictionCompetence, 1)
def initLocation(self, initialLocation):
location = IndividualLocationPerception(initialLocation, self.model.landscape.num_layers)
for l_idx in range(self.model.landscape.num_layers):
location.recordRealisedFitness(l_idx, self.model.landscape.getFitnessForLayer(l_idx, initialLocation))
return location
def initKnowledge(self, sourceNode):
layers = self.model.random.choices([i for i in range(0, self.model.landscape.num_layers)], k=self.model.p.initKnowledgeLayerNum)
return Knowledge(self.model.landscape, sourceNode, layers, self.model.p.initKnowledgeNodeNum, self.model.p.initKnowledgeGraphDegree, self.model.p.mindCapacity)
Next, several functions represent initial social attributes and behavior of the individual:
The
getFriends(self)function determines which neighbors are already a friend (as opposed to organisational superior and/or subordinate relationships):The function first creates a list of all of the
individual’s neighbors in the social graph using a list comprehension (i.e. nodes that are directly connected to the agent);It then iterates through this list, checking the type of the edge connecting the agent to each neighbor in the social graph;
If the type of the edge is “friendRelation”, meaning that the neighbor is a friend of the agent, it appends the neighbor to a list called “friends”;
Finally, the function returns the “friends” list.
The
getColleagues(self)function evaluates every otherindividualin the landscape to identify whether it is a colleague within the agent’s organisation and the agent’s department. If so, the respectiveindividualis added (.append) to the list of the agent’s organisational colleagues (orgColleaguesList) and departmental colleagues (deptColleaguesList). Note: CEO’s have no department and therefore do not have departmental colleagues, only organisational.The
socialise(self)fuction is a function to initialise the creation of a social network for everyindividualin the form of a list that is composed of every otherindividualin the landscape, not being an organisational and/or departmental colleague. Initially, all nodes are eligible to befriend. Later, this set is reduced (add link). The logic is as follows:Find out which nodes are colleagues; don’t befriend them (except that employees can befriend the CEO);
Don’t befriend people that are already a neighbor, i.e. current friends, or employees’ managers, or managers’ CEOs or employees, or self;
Direct neighbors and colleagues within the department won’t be befriended; this is registered in the
exclude_list.First make a list of which nodes these are, and then remove them from the set of eligible nodes
Note: the orgColleaguesList is not used here
class Individual(ap.Agent): CONTINUED
def getFriends(self):
self.neighborList = [n for n in
self.model.socialGraph.neighbors(self)]
friends = list()
for neighbor in self.neighborList:
if self.model.socialGraph.edges[(self, neighbor)]["type"] == "friendRelation":
friends.append(neighbor)
return friends
def getColleagues(self):
deptColleaguesList = list()
orgColleaguesList = list()
for node in self.model.socialGraph.nodes:
if node.organisation == self.organisation and node.type != "CEO":
if self.type == "CEO":
orgColleaguesList.append(node)
if self.model.debugFriendship: print('I am node', self, 'and I am orgcolleagues with', node)
elif node.department == self.department:
deptColleaguesList.append(node)
if self.model.debugFriendship: print('I am node', self, 'and I am depcolleagues with', node)
else:
orgColleaguesList.append(node)
if self.model.debugFriendship: print('I am node', self, 'and I am orgcolleagues with', node)
return deptColleaguesList, orgColleaguesList
def socialise(self):
self.friends = self.getFriends()
self.numFriends = len(self.friends)
self.deptColleaguesList, self.orgColleaguesList = self.getColleagues()
while self.numFriends < self.maxNumFriends:
eligible_nodes = set(self.model.socialGraph.nodes())
exclude_list = [*self.neighborList, *self.deptColleaguesList]
exclude_list.append(self)
eligible_nodes.difference_update(exclude_list)
if len(eligible_nodes) > 0:
new_friend = self.model.random.choice(list(eligible_nodes))
else:
break
self.model.socialGraph.add_edge(self, new_friend, type="friendRelation", color = "green")
self.numFriends += 1
self.friends.append(new_friend)
Class object definition for Memory
Class object definition of Memory and the initialization of two agent attributes:
a new empty list of locations
memory size Next, the function
rememberis created to update the agent’s memory with locationl.
The function checks whether the agent has the location already stored in its memory. If so, the memory is updated. If not, the location is added to the agent’s memory.
The function also ensures that the locations list does not exceed a certain size, by removing the oldest location from the list if it does, see function
forget(self).
Lastly, several functions are created to mark when memory isEmpty and which locations are oldest and latest.
class Memory:
def __init__(self, size):
self.locations = []
self.size = size
def remember(self, l):
updated = False
for i in range(len(self.locations)):
if self.locations[i].get() == l.get():
self.locations[i] = l
updated = True
if not updated:
self.locations.insert(0, l)
if len(self.locations) > self.size:
self.forget()
def forget(self):
return self.locations.pop()
def isEmpty(self):
return len(self.locations) == 0
def oldest(self):
return self.locations[-1]
def latest(self):
return self.locations[0]
def search(self, location):
results = [r for r in filter(lambda l: l.get() == location, self.locations)]
if len(results) > 0:
return results[0]
else:
return None
Class object definition for IndividualLocalPerception
Landscape fitness perception is defined and registered at the individual level. For this purpose, a separate class IndividualLocationPerception is created.
For this class object, three attributes are initialized that belong to each agent (self) and depends two paramaters: (1) their location (location) and (2) the number of landscape layers the agent can interpret (num_layers):
coordinates of a landscape location (not necessarily the agent’s actual current location) (
self.location);the fitness they estimate for that location (relevant for locations where the agent considers moving to (
self.estimatedFitness));the realised fitness value of a location where the agent actually is (
self.realisedFitness). If the fitness is unknown or undefined, the value -1 is provided.
Ex-ante fitness estimation and ex-post fitness realization by the agent are recorded at the level of the separate layers in the multiplex landscape by means of functions recordFitnessEstimation (self, layer, fitness) and recordRealisedFitness(self,layer,fitness).
Lastly, the function getFitness(self, weights) calculates the total average fitness of an agent based on the estimated fitness and realised fitness values of the agent and a set of weights. The function takes two arguments: self, which refers to the current instance of the class that the function is defined in, and weights, which is a list of weights assigned per landscape layer and thus for each fitness value.
The function first initializes a variable called
resultto 0.It then iterates through the estimated fitness values of the agent using a for loop, with
l_idxas the loop index variable. For each fitness value, the function checks if the realised fitness value of the agent at the same index is greater than -1. If it is, then the function uses the realised fitness value instead of the estimated fitness value for that index. The function then multiplies the fitness value at that index by the corresponding weight from theweightslist and adds it to theresultvariable.Finally, the function returns the average value of
resultby dividing it by the length of theweightslist.
Note: Realised fitness will only be recorded for a particular location when the agent actually moves to that location. This is further defined in the Employee module.
class IndividualLocationPerception:
def __init__(self, location, num_layers):
self.location = location
self.estimatedFitness = [-1 for e in range(num_layers)]
self.realisedFitness = [-1 for e in range(num_layers)]
def recordFitnessEstimation(self, layer, fitness):
self.estimatedFitness[layer] = fitness
def recordRealisedFitness(self, layer, fitness):
self.realisedFitness[layer] = fitness
def get(self):
return self.location
def getFitness(self, weights):
result = 0
for l_idx in range(len(self.estimatedFitness)):
fitness_value = self.estimatedFitness[l_idx]
if self.realisedFitness[l_idx] > -1:
fitness_value = self.realisedFitness[l_idx]
result += weights[l_idx]*fitness_value
return result/len(weights)
organisation.Employee module
Class definition for NodeLayer
This function initializes a class NodeLayer that takes node and layer as two landscape-based parameters. The self argument refers to the object of the class being created, i.e. the agent (the employee). It initializes two object variables, node and layer in the landscape. The purpose of the NodeLayer object is to easily access node-layer pairs.
class NodeLayer:
def __init__(self, node, layer):
self.node = node
self.layer = layer
Class definition for Proposal
This function initializes a class object Proposal and takes employee as a parameter. It initializes several object variables:
self.employee: this object variable is assigned the value of theemployeeargument passed to the class construction method. It represents the employee that submitted the proposal.self.source: this object variable is initialized to “None”. It will be used to store the current location of the employee.self.target: this object variable is initialized to “None”. It will be used to store the proposed new location for the employee.self.improvement: this object variable is initialized to 0. It will be used to store the improvement score of the proposal.self.cost: this object variable is initialized to 0. It will be used to store the costs related to the proposal.self.isApproved: this object variable is initialized to False. It will be used to store a Boolean value indicating whether the proposal has been approved or not.
class Proposal:
def __init__(self, employee):
self.employee = employee
self.source = None
self.target = None
self.improvement = 0
self.cost = 0
self.isApproved = False
Class definition for EmployeeReport
This function initializes a class EmployeeReport to record the state of the employee. It initializes several (self-explanatory) employee-level object variables. The clear function is to clear the variables of the EmployeeReport object and set them to zero.
class EmployeeReport:
def __init__(self, employee):
self.employee = employee
self.proposal = Proposal(employee)
self.energyUsed = 0
self.newEdgesLearnt = 0
self.experimentsConducted = 0
self.innovativenessIncrease = 0
self.predictionCompetenceIncrease = 0
def clear(self):
self.energyUsed = 0
self.newEdgesLearnt = 0
self.experimentsConducted = 0
self.innovativenessIncrease = 0
self.predictionCompetenceIncrease = 0
Class definition for Employee
This code defines a class Employee which inherits from the Individual superclass. Given that this class definition has an extensive number of functions and methods, these are separately covered in the following subsections.
Setup functions
The setup method is defined within theEmployee class, and it takes one argument managerInitialLocation. The purpose of this method is to initialize the agent and pick an initial landscape location for the employee that is close to the manager.
The method starts by initializing an empty list called
potential_nodes.Then, for each path length in the
path_lengthattribute of thelandscapeobject in the model, the method iterates over the nodes in the first layer of the landscape.For each node, it checks if the path length from the
managerInitialLocationto that node is within a certain distance range (defined by thedepEmployeeInitialDistanceattribute of the model’s parameters set in the main module, and if so, it adds the node to thepotential_nodeslist.Finally, the method selects an initial location for the employee from the potential nodes using NumPy’s random choice function
nprandomif there are any potential nodes, otherwise, it assigns themanagerInitialLocationas the initial location for the employee.
class Employee(Individual):
def setup(self, managerInitialLocation):
potential_nodes = []
for path_length in self.model.landscape.path_length:
for node in self.model.landscape.layers[0].nodes:
if path_length[managerInitialLocation][node] >= (self.model.p.depEmployeeInitialDistance - 1) and \
path_length[managerInitialLocation][node] <= (self.model.p.depEmployeeInitialDistance + 1):
if node not in potential_nodes:
potential_nodes.append(node)
employee_initial_location = self.model.nprandom.choice(potential_nodes) if len(potential_nodes) > 0 else managerInitialLocation
The super().setup(employee_initial_location) has the purpose of creating of the agent type Employee as a subclass of Individual This code it calls the setup method of its superclass using the super() function. By calling the setup method of the Individual class, the agent-type Employee inherits some of the initialization properties and methods defined in the Individual class. Additionally, the code assigns the string value "employee" to the type attribute of the Employee object, which indicates the role of the individual. Also, it adds several status variables to the agent-type Employee, such as the PCSs initiated by the agent (self.ownedPCSes) and PCSs in which the agent participates (self.PCSes). Several of these state variables are parameters set in main.py.
class Employee(Individual): CONTINUED
super().setup(employee_initial_location)
self.type = "employee"
self.reimbursement = 0
self.report = EmployeeReport(self)
self.proposalHistory = []
self.proposalAveragingWindow = self.model.p.proposalAveragingWindow
self.ownedPCSes = list()
self.PCSes = list()
self.PCSInvitations = list()
self.ERIThreshold = self.model.p.ERIThreshold
(setupbookkeeping=)
The purpose of setupBookkeeping is to create an administrative method within the Employee class where the agent’s (self)place in the organisation is stored.
class Employee(Individual): CONTINUED
def setupBookkeeping(self, ceo, organisation, manager, department):
self.ceo = ceo
self.organisation = organisation
self.manager = manager
self.department = department
Landscape exploration
The exploreLandscape function defines how agents of the type Employee behave to explore the landscape.
It initializes
energy_leftwith the value ofavailableEnergyForExplorationandexperiments_leftwith the value ofavailableEnergyForExperimenting;It retrieves attributes (
knowledge,innovativeness,predictionCompetence) from the instance of the Employee class;It checks if PCSes are enabled for this model run;
If it does, then use the aggregated knowledge and competences instead of that of the agent itself;
If not, it skips the subsequent logic;
It iterates over the layers of the landscape to establish a list of
weightsbased onorganisation.beliefSystemand the available knowledge of the employee (which is either its own or an aggregation of knowledge available in the PCS whereto the employee belongs);It records the locations already visited in this exploration round to keep track of visited and potential locations for revisiting during the exploration of the landscape.
The following loop continues as long as energy_left is greater than 0.
It checks if there are potential locations to revisit (
potential_locations_to_revisit). If there are no potential locations to revisit, it breaks out of the loop;It determines the source node for exploration based on a random choice, considering the employee’s innovativeness. By default, it starts looking from the current location. If the employee is innovative enough, a node is selected beyond the default starting node;
It identifies potential nodes for exploration based on the neighbors of the source node within each layer of knowledge.
If there are potential new nodes that haven’t been visited before, it randomly selects a node for exploration.
Check whether the agent has already visited this node and still remembers its fitness value. If not, make a prediction on its fitness value. This prediction is biased by an error (
prediction_error) depending on personal competencies (predictionCompetence). This error can be both positive and negative;It updates the employee’s
memoryand tracks visited andpotential_locations_to_revisitaccordingly.It decrements the energy available for exploration (
energy_left).
Next, the function prepares a proposal for moving to another node that maximizes the employee’s expected fitness:
proposed_move: It determines the proposed move (location) by selecting the location with the highest fitness based on the given weights. Themaxfunction is used to find the location with the maximum fitness, and the key for comparison is defined as the fitness of each location weighted by theweightslist.energy_used: It calculates the energy used for exploration by subtracting theenergy_leftfrom the initial available energy for exploration (availableEnergyForExploration).expected_improvement: This calculates how much the fitness is expected to improve by making the proposed move by subtracting the fitness of the proposed move from the fitness of the current location.
Then, if the ERIThreshold of the employee is not met and the employee is not part of a PCS, the employee initiates an experimentation phase:
By default, it initializes a list
experiment_locationswith the last proposed location.It starts a loop where experiments are conducted as long as there’s energy (experiments_left > 0).
It selects a source node for experimentation. Based on the agent’s
innovativeness, a node beyond the default starting node is selected.It identifies potential nodes for experimentation based on unknown neighbours from the location the agent currently is exploring. Note: The agent cannot learn about edges in unknown layers. Layers can only be discovered when learnt from someone else
It randomly chooses a node from the potential nodes for experimentation.
It performs a fitness estimation for the chosen node and appends it to experiment_locations. Because the employee lacks knowledge about this node, it assumes the fitness is equal (so not worse, nor better) to that of its prior ‘best’ proposal. It essentially ‘bets’ on the fitness of this location being better than its best available alternative.
It adds an edge to the knowledge graph representing the connection between the source node and the chosen node. The employee has learned about the node’s existence, hence add an edge hereto in its knowledge graph.
Finally, several updates are recorded:
The last experiment location becomes the new proposed move.
It calculates the distance between the current location and the proposed move based on knowledge.
It calculates the cost of the move based on the organization’s decision cost function.
Various attributes and reports related to the exploration process are updated, including energy usage, the proposed move, improvement, cost, and more.
The proposal is added to the proposal history.
The total energy used for exploration is deducted from the employee’s total energy.
class Employee(Individual): CONTINUED
def exploreLandscape(self, availableEnergyForExploration, availableEnergyForExperimenting):
energy_left = availableEnergyForExploration
experiments_left = availableEnergyForExperimenting
knowledge = self.knowledge
innovativeness = self.innovativeness
predictionCompetence = self.predictionCompetence
if self.model.p.pcs:
if len(self.PCSes) > 0:
pcs = self.PCSes[0]
knowledge = pcs.assembledKnowledge
innovativeness = pcs.combinedInnovativeness
predictionCompetence = pcs.combinedPredictionCompetence
weights = []
for l_idx in range(self.model.landscape.num_layers):
if knowledge.isLayerActive[l_idx]:
weights.append(self.organisation.beliefSystem.get(l_idx))
else:
weights.append(0)
locations_visited = [self.currentLocation]
potential_locations_to_revisit = [self.currentLocation.get()]
while energy_left > 0:
if len(potential_locations_to_revisit) == 0:
break
source_node = potential_locations_to_revisit[0]
if self.model.random.random() < innovativeness:
source_node = potential_locations_to_revisit[-1]
potential_nodes = []
for layer in knowledge.layers:
neighbors = layer.neighbors(source_node)
for node in neighbors:
if node not in potential_nodes:
potential_nodes.append(node)
if len(potential_nodes) > 0:
potential_new_nodes = [node for node in potential_nodes if node not in map(lambda l: l.get(), locations_visited)]
if len(potential_new_nodes) > 0:
chosen_node = self.model.nprandom.choice(potential_new_nodes)
memory_result = self.memory.search(chosen_node)
if memory_result == None:
fitness_list = self.model.landscape.getFitness(chosen_node)
prediction_error_abs = self.model.nprandom.uniform(0, 1 - predictionCompetence)
prediction_error = prediction_error_abs if self.model.random.random() < 0.5 else -1 * prediction_error_abs
estimated_fitness_list = [f + prediction_error for f in fitness_list]
new_location_memory = IndividualLocationPerception(chosen_node, self.model.landscape.num_layers)
for l_idx in range(len(knowledge.layers)):
if knowledge.isLayerActive[l_idx]:
new_location_memory.recordFitnessEstimation(l_idx, estimated_fitness_list[l_idx])
self.memory.remember(new_location_memory)
locations_visited.append(new_location_memory)
potential_locations_to_revisit.append(new_location_memory.get())
energy_left -= 1
else:
locations_visited.append(memory_result)
potential_locations_to_revisit.append(memory_result.get())
else:
potential_locations_to_revisit.remove(source_node)
else:
potential_locations_to_revisit.remove(source_node)
if energy_left > 0:
energy_left -= 1
proposed_move = max(locations_visited, key=lambda l: l.getFitness(weights))
energy_used = availableEnergyForExploration - energy_left
expected_improvement = proposed_move.getFitness(weights) - self.currentLocation.getFitness(weights)
if len(self.PCSes) == 0:
if expected_improvement / self.currentLocation.getFitness(weights) < self.ERIThreshold:
experiment_locations = [proposed_move]
while experiments_left > 0:
source_node = experiment_locations[0].get()
if self.model.random.random() < self.innovativeness:
source_node = experiment_locations[-1].get()
potential_nodes = []
for l_idx, layer in enumerate(self.knowledge.layers):
neighbors = self.knowledge.unknownNeighbors(source_node, l_idx)
for node in neighbors:
if node not in potential_nodes:
potential_nodes.append(NodeLayer(node, l_idx))
if len(potential_nodes) > 0:
chosen_node = self.model.nprandom.choice(potential_nodes)
node_experiment = IndividualLocationPerception(chosen_node.node, self.model.landscape.num_layers)
node_experiment.estimatedFitness = proposed_move.estimatedFitness
experiment_locations.append(node_experiment)
self.knowledge.addEdge((source_node, chosen_node.node,), chosen_node.layer)
experiments_left -= 1
proposed_move = experiment_locations[-1]
self.report.experimentsConducted = len(experiment_locations[1:])
distance = knowledge.distance(self.currentLocation.get(), proposed_move.get())
cost_of_move = self.organisation.decisionCostFunction(distance)
self.report.proposal.isApproved = False
self.report.energyUsed = energy_used
self.report.proposal.source = self.currentLocation
self.report.proposal.target = proposed_move
self.report.proposal.improvement = expected_improvement
self.report.proposal.cost = cost_of_move
self.proposalHistory.append(self.report.proposal)
self.totalEnergy -= energy_used
The assessExpectedPerformance(self) function computes the Expected Relative Improvement (ERI), which represents the expected improvement in the current proposal relative to the average improvement over a specified window of past proposals.
It loops through the proposal history in reverse, starting from the last proposal (-1) and going back up to
self.proposalAveragingWindow+2.In the first iteration (i == -1), it calculates the
current_expected_improvementbased on the improvement of the last proposal.In subsequent iterations (i != -1), it calculates the sum of improvements over the averaging window (excluding the last proposal) to calculate the
average_improvement.It computes the final
average_improvementby adding 1 and dividing by the number of proposals in the averaging window, effectively averaging the improvements.It calculates
eriby dividingcurrent_expected_improvementbyaverage_improvement.
class Employee(Individual): CONTINUED
def assessExpectedPerformance(self):
current_expected_improvement = 0
average_improvement = 0
for i in range(-1, -1*(self.proposalAveragingWindow+2), -1):
if i == -1:
current_expected_improvement = 1 + self.proposalHistory[i].improvement
else:
average_improvement += self.proposalHistory[i].improvement
average_improvement = 1 + average_improvement / (self.proposalAveragingWindow-1)
eri = current_expected_improvement/average_improvement
return eri
PCS participation
The function participateInPCSes(self, PCSToEvaluate) manages the employee’s continued participation in a PCS. It evaluates conditions for potential PCS disbanding, as well as the possibility of inviting new participants to an existing PCS based on the ERI (obtained from the assessExpectedPerformance function) and the ERI Threshold (i.e. the minimum ERI for the employee in casu).
It first checks the age of the given PCS (
PCSToEvaluate) by calling thecheckAgemethod on it.It then checks if the PCS is active (
PCSToEvaluate.alive).If the PCS is active and has only one member (
len(PCSToEvaluate.members)== 1; i.e. the employee is the owner and the sole participant) and the employee has PCS invitations (len(self.PCSInvitations)> 0), the PCS is disbanded usingPCSToEvaluate.disband().Otherwise, if the PCS has more than one member or the employee does not have PCS invitations, the function proceeds to evaluate if the employee should continue to participate in the PCS.
If the employee is the owner of the PCS (
PCSToEvaluate.owner==self) and there is sufficient history of proposals (len(self.proposalHistory)>self.proposalAveragingWindow), it calculates the expected relative improvement usingself.assessExpectedPerformance().If the expected relative improvement is below the ERI threshold (
expected_relative_improvement<self.ERIThreshold), the PCS invites new participants usingPCSToEvaluate.inviteNewParticipants(self.manager)(see PCS module), passing on the employee’s manager as an argument to enable PCS extension beyond its maximum capacity - if needed. Please note that the function can only assess the ERI if there is enough historical data available.
class Employee(Individual): CONTINUED
def participateInPCSes(self, PCSToEvaluate):
PCSToEvaluate.checkAge()
if PCSToEvaluate.alive:
if len(PCSToEvaluate.members) == 1 and len(self.PCSInvitations) > 0:
PCSToEvaluate.disband()
else:
if PCSToEvaluate.owner == self and len(self.proposalHistory) > self.proposalAveragingWindow:
expected_relative_improvement = self.assessExpectedPerformance()
if expected_relative_improvement < self.ERIThreshold:
PCSToEvaluate.inviteNewParticipants(self.manager)
The actOnPCSInvitations(self) function allows the employee to act on PCS invitations by joining a randomly selected PCS from the available invitations, becoming a member of that PCS, and adding the PCS to their list of PCSes.
It first updates the employee’s PCS invitations by calling the
updatePCSInvitationsmethod.It checks if there are any available PCS invitations (
len(self.PCSInvitations)> 0).If there are available PCS invitations, it randomly selects one PCS invitation to respond to using
self.model.nprandom.choice(self.PCSInvitations).Then, it adds the employee as a member to the selected PCS using
PCSToJoin.addMember(self).Finally, it appends the selected PCS to the employee’s list of PCSes using
self.PCSes.append(PCSToJoin).
class Employee(Individual): CONTINUED
def actOnPCSInvitations(self):
self.updatePCSInvitations()
if len(self.PCSInvitations) > 0:
PCSToJoin = self.model.nprandom.choice(self.PCSInvitations)
PCSToJoin.addMember(self)
self.PCSes.append(PCSToJoin)
The initiatePCS function handles the initiation of a PCS for the employee. If the initiation is successful, it adds the PCS to the employee’s owned and active PCS lists. If the initiation fails (initial invitation count is 0), it cleans up and adjusts the organization’s PCS counter accordingly.
It creates a new instance of the
PCSclass and assigns it to the variableinitiatedPCS.It checks if the
initialInvitationCountof the newly created PCS is equal to 0.If the
initialInvitationCountis 0, it deletes theinitiatedPCSinstance and decrements the organization’s PCS counter (self.organisation.pcsCounter) since the creation was not successful.If the initialInvitationCount is not 0, it proceeds to the next step.
It appends the
initiatedPCSto the employee’s list of owned PCSes (self.ownedPCSes), indicating that the employee owns this PCS. It also appends the initiatedPCS to the employee’s list of active PCSes (self.PCSes), indicating that the employee is a member of this PCS.
class Employee(Individual): CONTINUED
def initiatePCS(self):
initiatedPCS = PCS(self)
if initiatedPCS.initialInvitationCount == 0:
del initiatedPCS
self.organisation.pcsCounter -= 1
else:
self.ownedPCSes.append(initiatedPCS)
self.PCSes.append(initiatedPCS)
Learning
The developCompetencies(self, availableEnergy) function is responsible for the employee’s development of competencies, specifically innovativeness and prediction competence, based on the available energy and the relative importance the employee’s manager has assigned to innovativeness and prediction as competencies for development.
It calculates the amount of energy to allocate for improving innovativeness and prediction competence based on the available energy and personal skill preferences defined in the manager’s policy. This policy is part of a DCS. Note: the
math.floormethod rounds a given number down to the nearest integer that is less than or equal to the given number. It essentially truncates the decimal part of the number.It updates the employee’s innovativeness and prediction competence based on the calculated energy for improving each competency and a competency learning step defined in the model parameters main.py.
It ensures that the innovativeness and prediction competence are capped at 1 (maximum value).
It updates the employee’s development report with the increase in innovativeness and prediction competence, reflecting the energy allocated for improvement multiplied by the
competencyLearningStep.
class Employee(Individual): CONTINUED
def developCompetencies(self, availableEnergy):
energy_for_improving_innovativeness = math.floor(availableEnergy * self.manager.dcs.policy.personalSkillPreferences[0])
energy_for_improving_prediction = math.floor(availableEnergy * self.manager.dcs.policy.personalSkillPreferences[1])
self.innovativeness += energy_for_improving_innovativeness * self.model.p.competencyLearningStep
self.predictionCompetence += energy_for_improving_prediction * self.model.p.competencyLearningStep
if self.innovativeness > 1:
self.innovativeness = 1
if self.predictionCompetence > 1:
self.predictionCompetence = 1
self.report.innovativenessIncrease = energy_for_improving_innovativeness*self.model.p.competencyLearningStep
self.report.predictionCompetenceIncrease = energy_for_improving_prediction*self.model.p.competencyLearningStep
The allocateEnergy(self) function distributes the total energy available to the employee across different aspects of their work, based on specified policy parameters.
It calculates the energy divider based on the reimbursement window (
self.model.p.reimbursementWindow) and the current time step (self.model.t). The remainder of the division is used as the energy divider, and if the remainder is zero, the reimbursement window itself is used.It calculates the energy allocated per time step (
step_energy) by dividing the total energy available (self.totalEnergy) by the energy divider.It calculates and allocates energy for different goals based on the policy parameters defined by the manager as part of a DCS (
self.manager.dcs.policy.goalTermImportancesandself.manager.dcs.policy.learningModePreferences). Energy is allocated for exploration, social learning, competency development, and experimenting, based on the DCS-based weightings and preferences.It returns a tuple containing the allocated energies for exploration, social learning, competency development, and experimenting at the employee level (
self).
class Employee(Individual): CONTINUED
def allocateEnergy(self):
energy_divider = self.model.p.reimbursementWindow % self.model.t
if energy_divider == 0:
energy_divider = self.model.p.reimbursementWindow
step_energy = self.totalEnergy / energy_divider
energy_to_explore = math.ceil(step_energy*self.manager.dcs.policy.goalTermImportances[0])
energy_for_social_learning = math.ceil(step_energy * self.manager.dcs.policy.goalTermImportances[1] * self.manager.dcs.policy.learningModePreferences[0])
energy_for_competency_development = math.ceil(step_energy * self.manager.dcs.policy.goalTermImportances[1] * self.manager.dcs.policy.learningModePreferences[1])
energy_for_experimenting = math.ceil(step_energy * self.manager.dcs.policy.goalTermImportances[1] * self.manager.dcs.policy.learningModePreferences[2])
return (energy_to_explore, energy_for_social_learning, energy_for_competency_development, energy_for_experimenting,)
The receiveProposalApproval(self) function updates various attributes and records in response to a proposal being approved, including updating the current location, recording realized fitness, marking the proposal as approved, and remembering the current location in the employee’s memory.
It calculates weights for each layer in the landscape based on whether the layer is active in the employee’s knowledge. If a layer is active, the weight is set to 1; otherwise, it’s set to 0.
It updates the employee’s current location to the target location of the approved proposal (
self.report.proposal.target). It appends this location to the employee’s path.It sets the isApproved attribute of the proposal in the report to True.
It retrieves the realized fitness for the current location from the landscape using
self.model.landscape.getFitness. It records the realized fitness for each active layer in the employee’s knowledge and the proposal’s target location.It updates the current location in the employee’s memory.
class Employee(Individual): CONTINUED
def receiveProposalApproval(self):
weights = []
for l_idx in range(self.model.landscape.num_layers):
if self.knowledge.isLayerActive[l_idx]:
weights.append(1)
else:
weights.append(0)
self.currentLocation = self.report.proposal.target
self.path.append(self.currentLocation)
self.report.proposal.isApproved = True
realised_fitness_list = self.model.landscape.getFitness(self.currentLocation.get())
for l_idx in range(len(self.knowledge.layers)):
if self.knowledge.isLayerActive[l_idx]:
self.currentLocation.recordRealisedFitness(l_idx, realised_fitness_list[l_idx])
self.proposalHistory[-1].target.recordRealisedFitness(l_idx, realised_fitness_list[l_idx])
self.memory.remember(self.currentLocation)
The learningCostFunction(self) is a method that returns a value representing the learning ‘cost’, i.e. the amount of energy used for learning, as set as a model parameter in main.py.
class Employee(Individual): CONTINUED
def learningCostFunction(self):
return self.model.p.learningCost
The learn(self, availableEnergy) function represents the learning process of an employee, where they attempt to learn knowledge from potential ‘teachers’ based on the available energy and learning cost.
It initializes variables such as
learning_costbased on the learning cost function,available_energybased on the available energy parameter, andnum_edges_learntto keep track of the edges learned.It checks if there is enough energy available for the learning process (i.e. if
available_energy>learning_cost).It gathers potential ‘teachers’ from the employee’s friends and peers in PCSs for potential knowledge sharing.
It randomly selects potential teachers for knowledge sharing based on the specified learning attempts or the available number of potential teachers.
For each potential teacher selected, it checks if the employee and the potential teacher have overlapping knowledge by checking if they are connected.
If they are connected, it identifies shareable edges of knowledge that can be learned from the potential teacher.
It randomly chooses an edge and layer to learn from the shareable edges and adds the learned edge to the employee’s knowledge.
It updates the available energy based on the learning cost and the energy used for learning.
It updates the total energy of the employee.
It returns the number of edges learned (
num_edges_learnt).
class Employee(Individual): CONTINUED
def learn(self, availableEnergy):
learning_cost = self.learningCostFunction()
available_energy = availableEnergy
num_edges_learnt = 0
if available_energy > learning_cost:
pcs_peers = []
if self.model.p.pcs:
if len(self.PCSes) > 0:
for PCS in self.PCSes:
pcs_peers = pcs_peers + PCS.members
pcs_peers.remove(self)
friends_and_pcs_peers = [*self.friends, *pcs_peers]
potential_teachers_to_select = self.model.p.learningAttempts
if len(friends_and_pcs_peers) > 0:
if len(friends_and_pcs_peers) < potential_teachers_to_select:
potential_teachers_to_select = len(friends_and_pcs_peers)
potential_teachers = self.model.random.sample(friends_and_pcs_peers, potential_teachers_to_select)
for potential_teacher in potential_teachers:
if available_energy > learning_cost:
knowledge_list_to_test = [self.knowledge, potential_teacher.knowledge]
is_connected = hasOverlap(knowledge_list_to_test)
if is_connected:
shareable_edges = getShareableKnowledge(self.knowledge, potential_teacher.knowledge, False)
if len(shareable_edges) > 0:
edge_and_layer_to_learn = self.model.random.choice(shareable_edges) that will be learnt; format is [(node start, node end), layer]
edge_to_learn = edge_and_layer_to_learn[0]
layer_to_learn = edge_and_layer_to_learn[1]
self.knowledge.addEdge(edge_to_learn, layer_to_learn)
num_edges_learnt += 1
available_energy -= learning_cost
energy_used = availableEnergy - available_energy
self.totalEnergy -= energy_used
return num_edges_learnt
The updatePCSInvitations(self) function removes disbanded PCSs from the employee’s list of invitations (self.PCSInvitations)
class Employee(Individual): CONTINUED
def updatePCSInvitations(self):
# Remove disbanded PCSes from my invitations
self.PCSInvitations[:] = [PCS for PCS in self.PCSInvitations if PCS.alive]
Timestep (simulation execution)
Lastly, the step(self) function represents a single step in the employee’s actions and interactions within the simulation, including socialization, competency development, landscape exploration, participation in PCS, learning, and reporting. Each step involves updating the employee’s state and gathering data for reporting purposes.
It clears the employee’s report to prepare for new data for this step.
It initiates the socialization process for the employee (
self.socialise).It calculates and allocates energy for different activities, including exploring, social learning, and competency development.
It calls the
developCompetenciesfunction to develop the employee’s competencies.It calls the
exploreLandscapefunction to explore the landscape.If PCS is enabled, it evaluates participation in existing PCSes or initiates a new one based on certain conditions.
It calls the
actOnPCSInvitationsfunction to handle the employee’s response to PCS invitations.It calls the
learnfunction to allow the employee to learn from potential teachers in their network.It records various pieces of information such as organization ID, current location, and PCS logs for the report.
class Employee(Individual): CONTINUED
def step(self):
self.report.clear()
initiatedPCS = None
self.socialise()
energy_to_explore, energy_for_social_learning, energy_for_competency_development, energy_for_experimenting = self.allocateEnergy()
self.developCompetencies(energy_for_competency_development)
self.exploreLandscape(energy_to_explore, energy_for_experimenting)
if self.model.p.pcs:
in_PCS = False
self.updatePCSInvitations()
if len(self.PCSes) > 0:
for PCS_to_evaluate in self.PCSes:
self.participateInPCSes(PCS_to_evaluate)
if PCS_to_evaluate.alive:
in_PCS = True
if not in_PCS:
if len(self.proposalHistory) > self.proposalAveragingWindow:
expected_relative_improvement = self.assessExpectedPerformance()
if expected_relative_improvement < self.ERIThreshold:
if len(self.PCSInvitations) == 0:
self.initiatePCS()
self.actOnPCSInvitations()
self.report.newEdgesLearnt = self.learn(energy_for_social_learning)
self.record('org_id', self.organisation.ceo.id)
self.record('current_location', self.currentLocation.location)
pcs_logs = []
for pcs in self.ownedPCSes:
if len(pcs.members) > 1:
pcs_logs.append(pcs.log())
self.record('pcs_logs', pcs_logs)
organisation.Manager module
Class object definition ManagerReport
This method simply initializes an empty list of proposals.
class ManagerReport:
def __init__(self):
self.proposals = []
Class definition Manager
Similar to the Employee class definition, this section is broken down into the separate functions and methods that define the agent-type Manager class. Also similar to Employee, this class inherits from the superclass Individual.
Setup functions
The setup function serves to setting up the Manager’s initial location in the landscape and establishing attributes for reporting and a diagnostic control system.
It takes
organisationInitialLocationas a parameter, which represents the initial location of the organization.The method first creates a list called
potential_nodes.It iterates over path lengths in the model’s landscape and checks for nodes that are within a certain distance (which is a parameter set in main.py from the initial organization location (
organisationInitialLocation). If a node satisfies the distance condition, it is added to thepotential_nodeslist.It then selects the manager’s initial location randomly from the potential nodes or sets it to the organization’s initial location if no potential nodes are found.
The method calls the setup method of the superclass
Individualand passes the initial location to set up the manager’s initial state.It sets the manager’s agent-type to “manager”.
It creates a report object with an instance of the
ManagerReportclass.It creates a DCS object with an instance of the
DiagnosticControlSystemclass.
class Manager(Individual):
def setup(self, organisationInitialLocation):
potential_nodes = []
for path_length in self.model.landscape.path_length:
for node in self.model.landscape.layers[0].nodes:
if path_length[organisationInitialLocation][node] >= (self.model.p.orgDepInitialDistance - 1) and \
path_length[organisationInitialLocation][node] <= (self.model.p.orgDepInitialDistance + 1):
if node not in potential_nodes:
potential_nodes.append(node)
manager_initial_location = self.model.nprandom.choice(potential_nodes) if len(potential_nodes) > 0 else organisationInitialLocation
super().setup(manager_initial_location)
self.type = "manager"
self.report = ManagerReport()
self.dcs = DiagnosticControlSystem()
The purpose of setupBookkeeping is to create an administrative method within the Manager class where the agent’s (self)place in the organisation is stored.
class Manager(Individual): CONTINUED
def setupBookkeeping(self, ceo, organisation):
self.ceo = ceo
self.organisation = organisation
self.department = Department(self.ceo, self.organisation, self)
self.setupCreateEmployees()
The setupCreateEmployees(self) method is responsible for setting up and creating a specified number of employees for the manager.
It sets the number of employees (
self.numEmployees) to the value specified as a parameter in the model’s main module (self.p.numEmployeesPerDept).It creates a list of employees, specifying the agent-class as Employee. Each employee is initialized with the manager’s initial location (
managerInitialLocation=self.currentLocation.get()).It iterates through each employee and calls the
setupBookkeepingfunction on the employee, passing relevant IDs and information (such as CEO, organization, manager, and department IDs) to the employee.For each employee, it establishes a manager-employee relationship in the social graph by adding an edge with a type of “managerRelation” and a specified color “black”.
It adds each employee to the organization’s list of employees (
self.organisation.employees) and the department’s list of employees (self.department.employees).
class Manager(Individual): CONTINUED
def setupCreateEmployees(self):
self.numEmployees = self.p.numEmployeesPerDept
self.employees = ap.AgentList(self.model, self.numEmployees, Employee, managerInitialLocation=self.currentLocation.get())
for employee in self.employees:
employee.setupBookkeeping(self.ceo, self.organisation, self, self.department) # Pass own IDs to the employee
self.model.socialGraph.add_edge(self, employee, type = "managerRelation", color = 'black')
self.organisation.employees.append(employee)
self.department.employees.append(employee)
Employee Compensation
The employeeRewardFunction function calculates the reward for an employee, taking into account various factors such as energy usage, fitness improvement, competency improvement, learning improvement, and policy parameters. The reward is influenced by the employee’s actions and the organization’s overall fitness improvement.
It starts by setting the reward to the energy used by the employee, obtained from the employee’s report (
employee.report.energyUsed).If the employee’s proposal was approved and they have a path of at least two locations, it calculates the actual fitness improvement (
actual_improvement)based on the fitness of the last two locations in the path and the layer-level priorities (weights) in the organisation as set by the belief system.It calculates the competency improvement and learning improvement based on the policy preferences as set by the manager in the DCS and the employee’s reports on innovativeness increase, prediction competence increase, new edges learnt, and experiments conducted.
It calculates a
multiplierbased on goal term importances, actual fitness improvement, and learning improvement. The multiplier is capped at a maximum value defined by the model parameters (self.model.p.maxBonusMultiplier) as set in the main module.It calculates a bonus as the product of the multiplier and the maximum bonus defined by the model parameters (self.model.p.maxBonus) as set in the model’s main module.
It records the organizational fitness improvement,
actual_improvementby the manager’s employees, and the calculated reward in the Diagnostic Control System.It returns the final reward, which includes the energy used and the bonus based on the calculated multiplier.
class Manager(Individual): CONTINUED
def employeeRewardFunction(self, employee, organisationImprovement):
reward = employee.report.energyUsed
weights = self.organisation.beliefSystem.weights
actual_improvement = 0
if employee.report.proposal.isApproved and len(employee.path) > 1:
actual_improvement = employee.path[-1].getFitness(weights) - employee.path[-2].getFitness(weights)
competence_improvement = employee.report.innovativenessIncrease * self.dcs.policy.personalSkillPreferences[0] + \
employee.report.predictionCompetenceIncrease * self.dcs.policy.personalSkillPreferences[1]
learning_improvement = (self.dcs.policy.learningModePreferences[0] * employee.report.newEdgesLearnt / employee.report.energyUsed \
if employee.report.energyUsed > 0 else 0) + \
(self.dcs.policy.learningModePreferences[1] * competence_improvement) + (self.dcs.policy.learningModePreferences[2]*employee.report.experimentsConducted / employee.report.energyUsed \
if employee.report.energyUsed > 0 else 0)
multiplier = self.dcs.policy.goalTermImportances[0] * actual_improvement + self.dcs.policy.goalTermImportances[1] * learning_improvement
if math.isnan(multiplier):
multiplier = 0
if multiplier > self.model.p.maxBonusMultiplier:
multiplier = self.model.p.maxBonusMultiplier
bonus = math.floor(multiplier * self.model.p.maxBonus)
reward = reward + bonus
self.dcs.recordOrganisationFitnessImprovement(organisationImprovement)
self.dcs.recordFitnessImprovement(actual_improvement)
self.dcs.recordReward(reward)
return reward
The rewardEmployees function processes each employee, calculates their reward based on their performance and the organization’s improvement, and updates their reimbursement accordingly.
It iterates through each employee in the list of employees.
For each employee, it calculates the reward using the
employeeRewardFunction, passing the employee and the organization’s improvement (organisationImprovement) as parameters.It adds the calculated reward to the employee’s reimbursement.
class Manager(Individual): CONTINUED
def rewardEmployees(self, organisationImprovement):
for employee in self.employees:
reward = self.employeeRewardFunction(employee, organisationImprovement)
employee.reimbursement += reward
The reimburseEmployees function is responsible for actually reimbursing employees by adding their rewards to their total energy.
It iterates through each employee in the list of employees.
For each employee, it adds the employee’s reimbursement to their total energy.
After reimbursing the employees, it sets the employee’s reimbursement back to 0 to prepare for the next reward calculation cycle.
class Manager(Individual): CONTINUED
def reimburseEmployees(self):
for employee in self.employees:
employee.totalEnergy += employee.reimbursement
employee.reimbursement = 0
Setting Diagnostic Control System (DCS) Policy
The assessDiagnosticControlSystem(self) function assesses the DCS policy based on reward improvement, departmental fitness improvement, and organizational fitness improvement, and it potentially adjusts the policy to better align with the department’s objectives. Finally, it applies the policy change to the DCS.
It calculates the reward improvement, department fitness improvement, and organizational fitness improvement based on the DCS policy and its history.
It creates a
DepartmentPolicyChangeobject to track the changes to the DCS policy.If there’s a positive reward improvement and the DCS policy history has more than one entry, it retrieves the last policy change.
If there’s no reward improvement or the DCS policy history has only one entry, it generates a new random policy change based on specified gradient steps for various policy parameters.
If the department fitness improvement is less than the organizational fitness improvement, it adjusts the policy change to favor the department’s short-term goal priority.
It applies the determined policy change to the DCS by calling
self.dcs.applyPolicyChange(change).
class Manager(Individual): CONTINUED
def assessDiagnosticControlSystem(self):
reward_improvement = self.dcs.policy.getAverageReward() - self.dcs.policyHistory[-1].getAverageReward()
dep_fitness_improvement = self.dcs.policy.getAverageFitnessImprovement()
org_fitness_improvement = self.dcs.policy.getAverageOrganisationFitnessImprovement()
change = DepartmentPolicyChange()
if reward_improvement > 0 and len(self.dcs.policyHistory) > 1:
change = self.dcs.policyChangelog[-1]
else:
self.dcs.policy = copy.deepcopy(self.dcs.policyHistory[-1])
change = DepartmentPolicyChange()
item_to_update = self.model.random.randint(0, 6)
if item_to_update == 0:
change.goalTermImportances[0] = self.model.p.mcsGradientStep
change.goalTermImportances[1] = -1 * self.model.p.mcsGradientStep
if item_to_update == 1:
change.goalTermImportances[0] = -1 * self.model.p.mcsGradientStep
change.goalTermImportances[1] = self.model.p.mcsGradientStep
if item_to_update == 2:
change.learningModePreferences[0] = self.model.p.mcsGradientStep
change.learningModePreferences[1] = (-1/2) * self.model.p.mcsGradientStep
change.learningModePreferences[2] = (-1/2) * self.model.p.mcsGradientStep
if item_to_update == 3:
change.learningModePreferences[0] = (-1/2) * self.model.p.mcsGradientStep
change.learningModePreferences[1] = self.model.p.mcsGradientStep
change.learningModePreferences[2] = (-1/2) * self.model.p.mcsGradientStep
if item_to_update == 4:
change.learningModePreferences[0] = (-1/2) * self.model.p.mcsGradientStep
change.learningModePreferences[1] = (-1/2) * self.model.p.mcsGradientStep
change.learningModePreferences[2] = self.model.p.mcsGradientStep
if item_to_update == 5:
change.personalSkillPreferences[0] = self.model.p.mcsGradientStep
change.personalSkillPreferences[1] = -1 * self.model.p.mcsGradientStep
if item_to_update == 6:
change.personalSkillPreferences[0] = -1 * self.model.p.mcsGradientStep
change.personalSkillPreferences[1] = self.model.p.mcsGradientStep
if dep_fitness_improvement < org_fitness_improvement:
change.goalTermImportances[0] += self.model.p.mcsGradientStep
change.goalTermImportances[1] += -1 * self.model.p.mcsGradientStep
self.dcs.applyPolicyChange(change)
Timestep (simulation execution)
Lastly, the step(self) function represents a single step in the manager’s actions and interactions within the simulation. It orchestrates the actions for the manager in each time step, including recording employee proposals, reimbursing employees, assessing the DCS if enabled, and recording relevant information for reporting.
It iterates through each employee and appends their proposals to the manager’s report.
If the current time step is a multiple of the reimbursement window defined in the model parameters, as set in the main module (
self.model.p.reimbursementWindow), it calls thereimburseEmployeesmethod to reimburse employees.If the DCS is enabled in the model (
self.model.p.mcs), and the current time step is a multiple of the DCS evaluation window defined in the model parameters (self.model.p.mcsEvaluationWindow), it calls theassessDiagnosticControlSystemmethod to assess and potentially update the DCS policy.It records various pieces of information related to the organization, DCS policy, average reward, and average fitness improvement for reporting purposes.
class Manager(Individual): CONTINUED
def step(self):
for employee in self.employees:
self.report.proposals.append(employee.report.proposal)
if self.model.t % self.model.p.reimbursementWindow == 0:
self.reimburseEmployees()
if self.model.p.mcs:
if self.model.t % self.model.p.mcsEvaluationWindow == 0:
self.assessDiagnosticControlSystem()
self.record('org_id', self.organisation.ceo.id)
self.record('dcs_goal_term_importances', self.dcs.policy.goalTermImportances)
self.record('dcs_learning_mode_preferences', self.dcs.policy.learningModePreferences)
self.record('dcs_personal_skill_preferences', self.dcs.policy.personalSkillPreferences)
self.record('dcs_average_reward', self.dcs.policy.getAverageReward())
self.record('dcs_average_fitness_improvement', self.dcs.policy.getAverageFitnessImprovement())
organisation.CEO module
Class definition CEO
Setup functions
The setup method for the CEO class sets up the initial conditions and state for the CEO and their organization in the simulation. This class inherits from the Individual class.
It randomly selects an initial location for the CEO from the nodes in the first layer of the landscape.
It calls the setup method of the superclass (
Individual) and passes the initial location to set up the CEO’s initial state.It sets the CEO’s type to “CEO”.
It creates an instance of the Organisation class and associates it with the CEO (
self).It creates a specified number of managers (
self.numManagers), specifying the manager class asManager.For each manager, it calls the
setupBookkeepingmethod to set up administrative organisation, passing the CEO and the organization.It establishes a “CEORelation” edge in the social graph between the CEO and each manager, marked in red.
It adds each manager to the organization’s list of managers (
self.organisation.managers).
class CEO(Individual):
def setup(self):
initialLocation = self.model.nprandom.choice(self.model.landscape.layers[0].nodes)
super().setup(initialLocation)
self.type = "CEO"
self.organisation = Organisation(self.model, self)
self.numManagers = self.p.numManagersPerOrg
self.managers = ap.AgentList(self.model, self.numManagers, Manager, organisationInitialLocation=initialLocation)
for manager in self.managers:
manager.setupBookkeeping(self, self.organisation)
self.model.socialGraph.add_edge(self, manager, type = "CEORelation", color = 'red')
self.organisation.managers.append(manager)
Assess Management Control Systems (MCSs)
The assessBoundarySystem function assesses the boundary system of the organization, particularly focusing on the resource margin in comparison to competitors who are performing better.
It checks if there are competitors (i.e. other organisations in the landscape) that are performing better based on some criteria.
If there are competitors performing better, it calculates a weighted margin difference towards the better performing competitors’ boundary systems.
For each competitor, it calculates the difference in resource margin with respect to the organization’s boundary system and weighs it based on the competitor’s position in the list of competitors performing better.
The weighted margin difference is then calculated as the weighted sum of these differences divided by the total weight.
It changes the boundary system towards the resulting gradient, by a pre-determined step defined as a model parameter set in main.py (
self.model.p.mcsGradientStep).
class CEO(Individual): CONTINUED
def assessBoundarySystem(self):
if len(self.organisation.competitorsPerformingBetter) > 0:
weighted_margin_difference = 0
denominator = 0
for i, competitor in enumerate(self.organisation.competitorsPerformingBetter):
weighted_margin_difference += (i+1) * (competitor.boundarySystem.resourceMargin - self.organisation.boundarySystem.resourceMargin)
denominator += i+1
weighted_margin_difference = weighted_margin_difference/denominator
self.organisation.boundarySystem.resourceMargin += self.model.p.mcsGradientStep * weighted_margin_difference
The assessBeliefSystem function assesses the belief system of the organization, particularly focusing on the resource margin in comparison to competitors who are performing better.
It checks if there are competitors (i.e. other organisations in the landscape) that are performing better based on some criteria and appends these to a list of
gradientsFor each competitor performing better, it calculates the gradient difference between the competitor’s belief system weights and the organization’s belief system weights.
For each element in the gradient array, a weighted sum is calculated based on the position of the competitor (indexed by
j) and the corresponding gradient value at that position (gradients[j][i]).The weighted sum
cis calculated by summing up(j+1) * gradients[j][i]for each competitor.The
denominatoris updated by addingj + 1for each competitor, which will be used later to calculate the average.The calculated weighted sum
(c)is appended to thegradient_weighted_averagelist.After iterating through all gradient elements, the code divides each element in the
gradient_weighted_averagelist by the denominator to compute the average.
It calculates a
gradient_weighted_averagefor each belief system weight by considering the gradient differences and weighing them based on the position of the competitor in the list of competitors performing better, as such to approach the ideal belief system for the organisation.It changes the belief system towards the resulting gradient, by a pre-determined step defined as a model parameter set in main.py (
self.model.p.mcsGradientStep).
class CEO(Individual): CONTINUED
def assessBeliefSystem(self):
if len(self.organisation.competitorsPerformingBetter) > 0:
gradients = []
gradient_weighted_average = []
for competitor in self.organisation.competitorsPerformingBetter:
gradients.append(list(map(sub, competitor.beliefSystem.weights, self.organisation.beliefSystem.weights)))
denominator = 0
for i in range(len(gradients[0])):
c = 0
for j in range(len(self.organisation.competitorsPerformingBetter)):
c += (j+1) * gradients[j][i]
denominator += j + 1
gradient_weighted_average.append(c)
gradient_weighted_average = [x/denominator for x in gradient_weighted_average]
for i, w in enumerate(self.organisation.beliefSystem.weights):
self.organisation.beliefSystem.weights[i] = w + gradient_weighted_average[i] * self.model.p.mcsGradientStep
Finally, the assessMCS function involves the execution of the the benchmarking function requestOrganisationRanking and the assessment of the Belief (assessBeliefSystem) and Boundary System (assess.BoundarySystem).
class CEO(Individual): CONTINUED
def assessMCS(self):
self.model.requestOrganisationRanking()
self.assessBeliefSystem()
self.assessBoundarySystem()
Proposal evaluation
The assessProposals function handles the assessment and decision-making process for proposals by the CEO as the ultimate decision-maker in the organisation.
It initializes an empty list
proposalsto store proposals.It iterates through each manager in the
self.managers.For each manager, collect their proposals and append them to the proposals list.
Clear the manager’s report of proposals (
manager.report.proposals = []).
It sorts the proposals based on the expected fitness improvement.
It initializes variables for
decisionCost,currentAbsoluteFitness, andresourceMargin.It obtains the current absolute fitness of the organization (
self.organisation.getAbsoluteFitness()).It calculates the resource margin (i.e. the budget) based on the organization’s resources and a defined margin.
It then iterates through the sorted proposals:
If the proposal’s cost is less than the available resource margin, approve the proposal.
It executes the
employee.receiveProposalApproval()function.Update resource-related variables and decision cost accordingly.
Finally, it calculates the new absolute fitness after the proposals are executed by the employees and updates fitness-related data records in the organization’s history.
It calculates the improvement in realised (i.e. absolute) fitness and determines the reward based on realised cost and fitness improvement.
It updates the organization’s resources based on the calculated reward.
It provides input to each manager for the employee rewards function (
manager.rewardEmployees) with organisational fitnessimprovementas an input variable.
class CEO(Individual): CONTINUED
def assessProposals(self):
proposals = []
for manager in self.managers:
for p in manager.report.proposals:
proposals.append(p)
manager.report.proposals = []
proposals.sort(key=lambda p: p.improvement, reverse=True)
decisionCost = 0
currentAbsoluteFitness = self.organisation.getAbsoluteFitness()
resourceMargin = self.organisation.resources * (1 - self.organisation.boundarySystem.resourceMargin)
for p in proposals:
if resourceMargin > p.cost:
p.employee.receiveProposalApproval()
self.organisation.resources -= p.cost
resourceMargin -= p.cost
decisionCost += p.cost
newAbsoluteFitness = self.organisation.getAbsoluteFitness()
self.organisation.absoluteFitnessHistory.append(newAbsoluteFitness)
self.organisation.fitnessHistory.append(self.organisation.getFitness())
improvement = newAbsoluteFitness - currentAbsoluteFitness
reward = self.organisation.decisionRewardFunction(decisionCost, improvement)
self.organisation.resources += reward
for manager in self.managers:
manager.rewardEmployees(improvement)
Timestep (simulation execution)
Lastly, the step(self) function represents a single step in the CEO’s actions and interactions within the simulation. It orchestrates the execution of the proposal evaluation and decision-making by the CEO, the periodic assessment of the MCSs, the recording of various performance statistics and calls the step function of the PCS module.
It calls the
assessProposalsmethod to assess and make decisions on proposals.It evaluates the MCSs periodically with a frequency that is defined in the main module
It records various performance-related statistics involving e.g. energy, fitness, competence and resources.
It calls the
pcs.step()function for each PCS in the organisation
class CEO(Individual): CONTINUED
def step(self):
self.assessProposals()
if self.model.p.mcs:
if self.model.t % self.model.p.mcsEvaluationWindow == 0:
self.assessMCS()
energy_list = list(map(lambda e: e.totalEnergy, self.organisation.employees))
innovativeness_list = list(map(lambda e: e.innovativeness, self.organisation.employees))
pc_list = list(map(lambda e: e.predictionCompetence, self.organisation.employees))
self.record('fitness', self.organisation.getFitness())
self.record('abs_fitness', self.organisation.getAbsoluteFitness())
self.record('resources', self.organisation.resources)
self.record('avg_energy', sum(energy_list)/len(energy_list))
self.record('avg_innovativeness', sum(innovativeness_list)/len(innovativeness_list))
self.record('avg_prediction_competence', sum(pc_list)/len(pc_list))
self.record('belief_system', copy.deepcopy(self.organisation.beliefSystem).weights)
self.record('boundary_system_resource_margin', self.organisation.boundarySystem.resourceMargin)
self.record('num_pcs', self.organisation.pcsCounter)
for manager in self.managers:
for employee in manager.employees:
for pcs in employee.ownedPCSes:
pcs.step()
organisation.Department module
The class definition Department only involves the initialisation of an instance with 3 arguments: CEO, Organisation, and Manager. It initialises a list of employees as an instance variable and assigns the agent-type “dep” to the class instance. The unique department ID is a combination of the agent-type “dep” and the respective manager ID.
class Department:
def __init__(self, ceo, organisation, manager):
self.ceo = ceo
self.organisation = organisation
self.manager = manager
self.employees = list()
self.type = "dep"
self.id = self.type + str(self.manager.id)
organisation.Organisation module
Class definitions
Instance initialisation
The class definition Organisation initialises a class instance based on the CEO’s in the model. It takes model and ceo as an argument to initialise the organisations in the model.
It assigns “org” as the agent-type to this class
It generates an organisational ID which is composed of “org” and the unique CEO ID.
It generates various empty lists to store its managers, departments and employees.
It set the
pcsCounterto zeroIt generates various empty lists to record fitness-based performance data
It assesses how many competitors (i.e. other organisations in the model) it will benchmark against by generating a random integer value between 1 (inclusive) and the maximum number of competitors to consider (self.model.p.maxCompetitorsToConsider, as set in the main module) using the
randintfunction from therandommodule.It calculates its resource size based on the number of managers per organisation (
numManagersPerOrg), the number of employees per department (numEmployeesPerDept) -both of which are set in the [main module(#main.py)] - and the ‘resource margin’ (resourceMargin) which is a floating point number between 0 and 1, denoting which percentage of the organisational budget may not be spent on decisions [ADD LINK].It initialises a
BeliefSystemby taking the number of landscape layers as an argument, and aBoundarySystemby taking the maxResourceMargin (which is set in the main module as an argument.
class Organisation:
def __init__(self, model, ceo):
self.model = model
self.ceo = ceo
self.type = "org"
self.id = self.type + str(self.ceo.id)
self.managers = list()
self.departments = list()
self.employees = list()
self.pcsCounter = 0
self.fitnessHistory = []
self.absoluteFitnessHistory = []
self.competitorsToConsider = self.model.random.randint(1, self.model.p.maxCompetitorsToConsider)
self.competitorsPerformingBetter = []
self.model.organisations.append(self)
self.resources = (self.model.p.numManagersPerOrg + self.model.p.numEmployeesPerDept) * self.model.p.resourceMargin
self.beliefSystem = BeliefSystem(self.model.landscape.num_layers)
self.boundarySystem = BoundarySystem(self.model.p.maxResourceMargin)
Cost and benefit functions
The cost of executing a proposal to move to a different location in the landscape is equaled to the distance in terms of number of edges.
The decisionRewardFunction takes decisionCost and fitnessImprovement as arguments to calculate the net rewards yielded by moving to a location with a higher fitness.
The getFitness function derives a subjective organisation fitness value based on the weights assigned in the Belief System to each landscape layer. It calculates the fitness for each employee based on their current location’s fitness (using weights from the belief system), aggregates these fitness values, calculates the average fitness, and returns the average as the fitness for the organization:
class Organisation: CONTINUED
def decisionCostFunction(self, distance):
return distance
def decisionRewardFunction(self, decisionCost, fitnessImprovement):
return decisionCost * (1 + fitnessImprovement)
Organisational fitness functions
The getFitness function derives a subjective organisation fitness value based on the weights assigned in the Belief System to each landscape layer. It calculates the fitness for each employee based on their current location’s fitness (using weights from the belief system), aggregates these fitness values, calculates the average fitness, and returns the average as the fitness for the organization:
It initializes a variable named
fitnessto store the cumulative fitness of the employees.It iterates through each employee in the list
self.employees.For each employee, it calls the
getFitnessmethod on the employee’s current location, passing thebeliefSystem.weightsas an argument.It adds the fitness value obtained for each employee to the
fitnessvariable.It divides the total fitness by the number of employees (
len(self.employees)), obtaining the average fitness of the organisation.
def getFitness(self):
fitness = 0
for employee in self.employees:
fitness += employee.currentLocation.getFitness(self.beliefSystem.weights)
fitness = fitness / len(self.employees)
return fitness
Almost similarily, absolute (‘objective’) organisational fitness is calculated by the getAbsoluteFitness function, however with the weights assigned to each layer as set to 1.
The getAverageAbsoluteFitness function then calculates the average absolute fitness over a certain period.
def getAbsoluteFitness(self):
weights = [1 for i in range(self.model.landscape.num_layers)]
fitness = 0
for employee in self.employees:
fitness += employee.currentLocation.getFitness(weights)
fitness = fitness/len(self.employees)
return fitness
def getAverageAbsoluteFitness(self, startPeriod, endPeriod):
result = 0
for f in self.absoluteFitnessHistory[startPeriod:endPeriod]:
result += f
return result/(endPeriod - startPeriod)
Timestep (simulation execution)
The class Organisation does not execute any agent-based actions in the simulation, it merely functions to create entities in which the agents operate.
The step function therefore contains a pass statement which allows the code to be valid syntax without executing any specific actions.
def step(self):
pass
organisation.PCS module
PCS class definition
PCS instance initialisation
The __init__ method serves to construct the PCS class instances, taking the owner as a parameter. In this method, the following attributes are assigned to each class instance:
self.id: It constructs a unique ID for the PCS instance based on the owner’s ID and a timestamp (owner.model.t). The owner is the agent (employee) that initialises the creation of the PCS.self.participants: Initializes an empty list to store participants.self.owner: Assigns the owner passed to the constructor.self.managerOfOwner: Assigns the manager of the owner.self.lifetime: Assigns a random lifetime to the PCS instance using the owner’s model and predefined maximum lifetime (maxLifetime) as set in main.py.self.dateOfBirth: Assigns the creation timestamp of the PCS.self.members: Initializes a list with the owner as the initial member.self.desiredNumMembers: Sets a desired number of members for the PCS instance, randomly chosen within a minimum of 2 and a maximum (maxPCSSize) as set in the main module.self.assembledKnowledge: InitializesassembledKnowledgebased on the owner’sknowledge.self.combinedInnovativeness: InitializescombinedInnovativenessbased on the owner’sinnovativeness.self.combinedPredictionCompetence: SetscombinedPredictionCompetencebased on the owner’spredictionCompetence.self.managerOverrideUsed: Initializes a flag indicating whether a manager override has been used (False).self.proposedMove: Initializes a null value (None) forproposedMove.self.alive: Sets a flag to indicate whether the PCS is active (alive).self.owner.organisation.pcsCounter: Increments a counter for the number of PCS instances associated with the owner’s organization.self.initialInvitationCount: Initializes a count for initial invitations to 0.self.inviteNewParticipants(self.owner): Invites the owner (initial participant) to the PCS.
class PCS:
def __init__(self, owner):
self.id = '{0}_{1}'.format(owner.id, owner.model.t)
self.participants = []
self.owner = owner
self.managerOfOwner = self.owner.manager
self.lifetime = self.owner.model.random.randint(1, self.owner.model.p.maxLifetime)
self.dateOfBirth = owner.model.t
self.members = [owner]
self.desiredNumMembers = self.owner.model.random.randint(2, self.owner.model.p.maxPCSSize)
self.assembledKnowledge = Knowledge(owner.knowledge)
self.combinedInnovativeness = owner.innovativeness
self.combinedPredictionCompetence = owner.predictionCompetence
self.managerOverrideUsed = False
self.proposedMove = None
self.alive = True
self.owner.organisation.pcsCounter += 1
self.initialInvitationCount = 0
self.inviteNewParticipants(self.owner)
PCS Instance development
The addMember function adds new members to the list of PCS members.
class PCS: CONTINUED
def addMember(self, member):
self.members.append(member)
The inviteNewParticipants method manages the invitation of new participants to the PCS, considering various conditions such as capacity, eligibility, and knowledge overlap. It ensures that invitations are sent based on the specified criteria and updates relevant counters.
The method takes
inviteras an argument, which can be both the initialising employee (owner) of the PCS or, under specific conditions, its manager.The method initializes several state variable to keep track of the number of invitations, the potential size of the PCS, the number of spots available in the PCS, and a boolean indicating whether the capacity of the PCS has been extended by the owner’s manager.
It checks if the
inviteris the manager of the owner and the PCS has reached its desired capacity. If true, it allows for a manager override by reserving one spot for a new member.It determines
eligible_inviteesbased on the type ofinviter(either the manager or the owner). Eligible invitees are individuals from specific lists (deptColleaguesList,orgColleaguesList, andfriends) associated with the inviter.It checks if the eligible invitees are of type “employee” and adds them to the list of eligible invitees (
eligible_invitees).It invites eligible invitees by checking if they are not the
inviterand not already amemberof the PCS.It checks if there is a knowledge overlap between the inviter and the eligible invitee using a function
hasOverlap.If there’s an overlap, an invitation is sent to the eligible invitee.
It updates counters for potential members and invitation count accordingly.
It handles the case where a manager override was used to ensure only one additional member is invited in this case.
It sets the
initialInvitationCountattribute of the PCS to the final invitation count.
class PCS: CONTINUED
def inviteNewParticipants(self, inviter):
invitation_count = 0
num_potential_members = len(self.members)
num_spots_for_new_members = self.desiredNumMembers - num_potential_members
manager_override_capacity = False
if inviter == self.managerOfOwner and num_spots_for_new_members <= 0 and not self.managerOverrideUsed:
manager_override_capacity = True
self.managerOverrideUsed = True
num_spots_for_new_members = 1
if num_potential_members < self.desiredNumMembers or manager_override_capacity:
eligible_invitees = []
eligible_invitees_all_types = []
if inviter == self.managerOfOwner:
eligible_invitees_all_types = [*self.managerOfOwner.deptColleaguesList, *self.managerOfOwner.orgColleaguesList, *self.managerOfOwner.friends]
for eligible_invitee in eligible_invitees_all_types:
if eligible_invitee.type == 'employee':
eligible_invitees.append(eligible_invitee)
elif inviter == self.owner:
eligible_invitees_all_types = [*self.owner.deptColleaguesList, *self.owner.orgColleaguesList, *self.owner.friends]
for eligible_invitee in eligible_invitees_all_types:
if eligible_invitee.type == 'employee':
eligible_invitees.append(eligible_invitee)
if len(eligible_invitees) > 0:
selected_invitees = eligible_invitees
for eligible_invitee in selected_invitees:
if eligible_invitee != inviter and eligible_invitee not in self.members:
knowledge_list_to_test = [self.owner.knowledge, eligible_invitee.knowledge]
is_connected = hasOverlap(knowledge_list_to_test)
if is_connected:
eligible_invitee.PCSInvitations.append(
self)
num_potential_members += 1
invitation_count += 1
if manager_override_capacity:
manager_override_capacity = False
break
if invitation_count >= num_spots_for_new_members:
break
self.initialInvitationCount = invitation_count
The disband method executes the dissolvement of a PCS by removing references to the PCS from the member and owner’s lists, and marking the PCS as not alive. Additionally, it updates the PCS count associated with the owner’s organization.
It uses a for loop to iterate over each member in the
self.memberslist, which contains all the members of the PCS.For each member, it removes the PCS from their
PCSeslist.If the owner of the PCS is equal to the current member being processed, it also removes the PCS from the member’s
ownedPCSeslist.It sets the
aliveattribute of the PCS instance toFalse. This action flags that the PCS is now considered inactive.It decrements the
pcsCounterattribute of the owner’s organization.
class PCS: CONTINUED
def disband(self):
for member in self.members:
member.PCSes.remove(self)
if self.owner == member:
member.ownedPCSes.remove(self)
self.alive = False
self.owner.organisation.pcsCounter -= 1
The log function provides a simple way to retrieve and log specific information about the PCS, including the number of members and the PCS’s lifetime.
class PCS: CONTINUED
def log(self):
return [len(self.members), self.lifetime]
Timestep (simulation execution)
The step method function represents a single step within the PCS’s actions and interactions within the simulation. It orchestrates the collective assemblance of knowledge and compentency development with regards to innovativeness and prediction.
It calculates the
assembledKnowledgeattribute of the PCS. This attribute is updated by taking theunionof the knowledge of all members in theself.memberslist. It uses theunionfunction to perform this operation, which combines the knowledge from all members into a single knowledge structure associated with the PCS (self.assembledKnowledge).It calculates the
combinedInnovativenessattribute by:Creating a list called
innovativeness_listcontaining the innovativeness values of all members.Calculating the sum of the innovativeness values in this list.
Dividing the sum by the number of members (
len(innovativeness_list)) to compute the average innovativeness among the members.The result is then assigned to the
self.combinedInnovativenessattribute of the PCS.
Similarly to the
combinedInnovativenesscalculation, it calculates thecombinedPredictionCompetenceattribute.
class PCS: CONTINUED
def step(self):
self.assembledKnowledge = union([m.knowledge for m in self.members]) #
innovativeness_list = list(map(lambda m: m.innovativeness, self.members))
self.combinedInnovativeness = sum(innovativeness_list)/len(innovativeness_list)
pc_list = list(map(lambda m: m.predictionCompetence, self.members))
self.combinedPredictionCompetence = sum(pc_list)/len(pc_list)
Finally, the checkAge function verifies how long the PCS has been operating and whether it exceeds the maximum lifetime of a PCS (as parametrised in the main module). If it exceeds this maximum, it is disbanded.
class PCS: CONTINUED
def checkAge(self):
if self.owner.model.t - self.dateOfBirth >= self.lifetime:
self.disband()