File SKHM.java
File List > agents > SKHM.java
Go to the documentation of this file
package skydata.internal.agents;
import jade.core.AID;
import jade.wrapper.AgentContainer;
import jade.wrapper.AgentController;
import java.util.Map;
import java.io.IOException;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.HashMap;
import java.util.HashSet;
import jade.core.Runtime;
import jade.core.behaviours.CyclicBehaviour;
import jade.core.behaviours.TickerBehaviour;
import jade.lang.acl.MessageTemplate;
import jade.lang.acl.UnreadableException;
import jade.lang.acl.ACLMessage;
import java.util.Set;
import skydata.centralized.CentralizedClient;
import skydata.internal.message.NBroadcast;
import skydata.internal.message.SKAID;
import skydata.internal.message.SKLMessage;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
/**
 * Harbour manager agent.
 *
 * <p>Keeps track of a harbour's total {@code capacity} and of the part of it
 * that is still {@code available}, and serves four kinds of messages:
 * <ul>
 *   <li>{@code LOCK_REQUEST} (protocols {@code HARBOUR} and {@code REPLICATION}):
 *       reserves the requested size when it fits and answers with a boolean
 *       {@code LOCK_RESPONSE};</li>
 *   <li>{@code STATS_REQUEST}: answers with a map of the requested counters
 *       ({@code "available"}, {@code "capacity"});</li>
 *   <li>{@code JOIN_HARBOUR} / {@code QUIT_HARBOUR}: maintains the member set
 *       and broadcasts it as {@code UPDATE_LIST} after every change.</li>
 * </ul>
 */
public class SKHM extends SKAgent {
    /** Harbour description parsed from the JSON {@code "params"} argument in {@link #setup()}. */
    private Map<String, Object> harbour = new HashMap<>();
    /** Total size of the harbour, read from the {@code "capacity"} entry of the description. */
    private int capacity = 0;
    /** Size not currently reserved; decreased by locks, increased when a member quits. */
    private int available;
    /** Client used in {@link #setup()} to publish this harbour and to fetch the known tuples. */
    CentralizedClient centralizedClient = new CentralizedClient();
    /** Senders that joined through {@code JOIN_HARBOUR} and have not quit since. */
    Set<SKAID> agents = new HashSet<SKAID>();

    public SKHM() {
        super();
    }

    /**
     * Initialises the harbour from the agent arguments, starts the agents listed
     * in the harbour description and installs the message-handling behaviours.
     *
     * <p>Reads {@code "params"} (a JSON string) and {@code "platform"} (an
     * {@code Object[]}) from {@code args}. Any failure during initialisation is
     * reported on stderr and does not prevent the behaviours from being installed.
     */
    @SuppressWarnings("unchecked") // untyped JSON maps are narrowed to Map<String, Object>
    @Override
    protected void setup() {
        super.setup();
        String params = (String) args.get("params");
        Object[] platformParams = (Object[]) args.get("platform");
        try {
            ObjectMapper mapper = new ObjectMapper();
            this.harbour = mapper.readValue(params, new TypeReference<Map<String, Object>>() {
            });
            this.capacity = (Integer) this.harbour.get("capacity");
            this.available = capacity;

            // Publish (agent name, first transport address) through the centralized client.
            centralizedClient.addTuple(getName(), getAID().getAddressesArray()[0]);

            // platformParams[3] is handed to doWait() when positive.
            // NOTE(review): presumably a start-up delay that lets other harbours publish
            // themselves before getTuples() is called -- confirm with the launcher.
            int startupDelay = (Integer) platformParams[3];
            if (startupDelay > 0) {
                doWait(startupDelay);
            }

            Set<SKAID> allHarbours = centralizedClient.getTuples();
            AgentContainer agc = getContainerController();
            Map<String, Object> hostedAgents = (Map<String, Object>) this.harbour.get("agents");
            if (hostedAgents != null) {
                for (Map.Entry<String, Object> entry : hostedAgents.entrySet()) {
                    Map<String, Object> agentInfos = (Map<String, Object>) entry.getValue();
                    // Read the size first: a missing or malformed size now fails
                    // before an agent is created instead of leaving one created
                    // but never started.
                    int agentSize = (Integer) agentInfos.get("size");
                    Object[] agentArgs = new Object[] { agentInfos, allHarbours };
                    AgentController agent = agc.createNewAgent(entry.getKey(),
                            "skydata.internal.agents." + agentInfos.get("class"),
                            agentArgs);
                    this.available -= agentSize;
                    agent.start();
                }
            }
        } catch (Exception e) {
            // Broad on purpose: JSON parsing, the centralized client and the
            // container calls can all fail here, and the behaviours below are
            // still installed afterwards (best-effort start-up).
            e.printStackTrace();
        }
        setupManagement("HARBOUR", ACLMessage.INFORM);
        setupManagement("REPLICATION", ACLMessage.REQUEST);
        setupStats();
        setupMembership();
    }

    /**
     * Reserves {@code size} units of the available space.
     *
     * @param size  amount to subtract from {@code available}
     * @param agent requester; currently not used
     */
    private void lock(int size, AID agent) {
        available -= size;
    }

    /**
     * Builds the template shared by every handler of this agent: a message must
     * match the ontology, the protocol and the performative at the same time.
     */
    private static MessageTemplate requestTemplate(String ontology, String protocol, int performative) {
        return MessageTemplate.and(
                MessageTemplate.MatchOntology(ontology),
                MessageTemplate.and(
                        MessageTemplate.MatchProtocol(protocol),
                        MessageTemplate.MatchPerformative(performative)));
    }

    /**
     * Installs a cyclic behaviour answering {@code LOCK_REQUEST} messages for the
     * given protocol and performative.
     *
     * <p>The request content is expected to be an {@code Integer} size. The lock
     * is granted (and the size reserved) only when the size is non-negative and
     * not larger than the available space; any other content is answered with
     * {@code false}.
     *
     * @param protocol     protocol the requests must carry; also used for the answer
     * @param performative performative the requests must carry
     */
    private void setupManagement(String protocol, int performative) {
        final MessageTemplate mtLock = requestTemplate("LOCK_REQUEST", protocol, performative);
        this.addBehaviour(new CyclicBehaviour(this) {
            @Override
            public void action() {
                SKLMessage acl = SKHM.this.skReceive(mtLock);
                if (acl == null) {
                    block();
                    return;
                }
                SKLMessage answer = new SKLMessage("LOCK_RESPONSE", protocol);
                answer.addReceiver(acl.getSender());
                SKHM.this.print("lock request");

                boolean granted = false;
                Object content = acl.getContent();
                if (content instanceof Integer) {
                    int requested = (Integer) content;
                    // A negative size would pass the "fits" test and then
                    // *increase* the available space, so it is refused.
                    if (requested >= 0 && requested <= available) {
                        lock(requested, acl.getSender().toAID());
                        granted = true;
                    }
                }
                answer.setContent(granted);
                skSendNormal(answer);
            }
        });
    }

    /**
     * Installs a cyclic behaviour answering {@code STATS_REQUEST} messages.
     *
     * <p>The request content is expected to be a {@code String[]} of counter
     * names. The answer is a map from each requested name to its value
     * ({@code null} for unknown names); the map is empty when the content is
     * not a {@code String[]}.
     */
    @Override
    protected void setupStats() {
        final MessageTemplate mtRequest = requestTemplate("STATS_REQUEST", "HARBOUR", ACLMessage.INFORM);
        this.addBehaviour(new CyclicBehaviour(this) {
            @Override
            public void action() {
                SKLMessage acl = SKHM.this.skReceive(mtRequest);
                if (acl == null) {
                    block();
                    return;
                }
                SKLMessage answer = new SKLMessage("STATS_RESPONSE", "HARBOUR");
                HashMap<String, Object> values = new HashMap<String, Object>();
                Object content = acl.getContent();
                if (content instanceof String[]) {
                    for (String valueName : (String[]) content) {
                        values.put(valueName, statValue(valueName));
                    }
                }
                answer.addReceiver(acl.getSender());
                answer.setContent((Serializable) values);
                skSendNormal(answer);
            }
        });
    }

    /**
     * Returns the counter called {@code name}, or {@code null} when the name is
     * {@code null} or not one of {@code "available"} / {@code "capacity"}.
     */
    private Object statValue(String name) {
        if ("available".equals(name)) {
            return available;
        }
        if ("capacity".equals(name)) {
            return capacity;
        }
        return null;
    }

    /** Sends the current member set as an {@code UPDATE_LIST} message to every member. */
    private void broadcastMembers() {
        SKLMessage msg = new SKLMessage("UPDATE_LIST", "HARBOUR");
        msg.setContent((Serializable) agents);
        this.broadcast(new NBroadcast(), msg, agents);
    }

    /**
     * Installs the two cyclic behaviours that maintain the member set.
     *
     * <p>{@code QUIT_HARBOUR}: gives back the size carried by the message (when
     * it is a positive {@code Integer}), removes the sender and broadcasts the
     * new list. {@code JOIN_HARBOUR}: adds the sender and broadcasts the new list.
     */
    private void setupMembership() {
        final MessageTemplate mtQuit = requestTemplate("QUIT_HARBOUR", "HARBOUR", ACLMessage.INFORM);
        this.addBehaviour(new CyclicBehaviour(this) {
            @Override
            public void action() {
                SKLMessage acl = SKHM.this.skReceive(mtQuit);
                if (acl == null) {
                    block();
                    return;
                }
                // Only a positive Integer releases space; a missing, mistyped or
                // negative size no longer throws or shrinks the available space,
                // and the sender is still removed from the member set.
                Object content = acl.getContent();
                if (content instanceof Integer) {
                    int released = (Integer) content;
                    if (released > 0) {
                        available += released;
                    }
                }
                agents.remove(acl.getSender());
                broadcastMembers();
            }
        });

        final MessageTemplate mtJoin = requestTemplate("JOIN_HARBOUR", "HARBOUR", ACLMessage.INFORM);
        this.addBehaviour(new CyclicBehaviour(this) {
            @Override
            public void action() {
                SKLMessage acl = SKHM.this.skReceive(mtJoin);
                if (acl == null) {
                    block();
                    return;
                }
                agents.add(acl.getSender());
                broadcastMembers();
            }
        });
    }
}