File SKAgent.java
File List > agents > SKAgent.java
Go to the documentation of this file
package skydata.internal.agents;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import jade.core.AID;
import jade.core.Agent;
import jade.core.ContainerID;
import jade.core.Location;
import jade.core.PlatformID;
import jade.core.behaviours.Behaviour;
import jade.core.behaviours.WakerBehaviour;
import jade.lang.acl.ACLMessage;
import jade.lang.acl.MessageTemplate;
import jade.lang.acl.UnreadableException;
import skydata.internal.behaviours.CleanMessagesBuffer;
import skydata.internal.behaviours.ReceiveOnReliableSend;
import skydata.internal.behaviours.SKAgentBehaviour;
import skydata.internal.behaviours.UpdateStatsHarbour;
import skydata.internal.message.BroadcastI;
import skydata.internal.message.IDGenerator;
import skydata.internal.message.SKAID;
import skydata.internal.message.SKLMessage;
public class SKAgent extends Agent {
/** Epoch-millisecond timestamp taken in setup() and refreshed by reset(). */
public long startingDate = 0;
/** Launch parameters, read from the first agent argument in setup(). */
public Map<String, Object> args = null;
/** Key/value store behind createStorageObject / getStorageObject / hasStorageObject. */
private HashMap<String, Serializable> behaviourStorage = new HashMap<>();
/** Hooks registered through addInternalUpdate, grouped by event type. */
private HashMap<String, List<SKAgentBehaviour>> internalBehaviour = new HashMap<>();
/** Every behaviour handed to addBehaviour; they are removed again in afterClone(). */
private ArrayList<Behaviour> myBehaviours = new ArrayList<>();
/** When true, debug() prints its messages. */
public Boolean inDebug = false;
/** Identity of this agent; created in setup(), named and addressed in reset(). */
protected SKAID myAID;
/** Messages buffered by skSendReliable: receiver name -> message id -> message. */
private HashMap<String, HashMap<String, SKLMessage>> msgInfoBF = new HashMap<>();
/** Reception timestamps recorded by skReceive: sender name -> message id -> epoch millis. */
private HashMap<String, HashMap<String, Long>> receivedMsgs = new HashMap<>();
/**
 * Set to true by migrate(AID, boolean) before an inter-platform move and reset by afterMove().
 * Initialised to false so that afterMove() can safely unbox it even when doMove() is invoked
 * without going through migrate().
 */
protected Boolean inMigration = false;
/** Harbour recorded in beforeMove(); it receives QUIT_HARBOUR in afterMove(). */
private SKAID oldHarbour;
// if the agent cannot migrate, this number is greater than 0
protected int nbBlockingMigrate = 0;
// Stats on the harbours
/** Known harbours, read from the second agent argument in setup(). */
protected Set<SKAID> harbours;
/** Per-harbour statistics exposed by getHarboursStats(). */
protected HashMap<SKAID, HashMap<String, Comparable>> harboursStats = new HashMap<SKAID, HashMap<String, Comparable>>();
/** Cleared on every doMove(); not populated anywhere in this class. */
protected Set<SKAID> membersHarbour = new HashSet<SKAID>();
/** Value of the "size" launch parameter; sent as the content of QUIT_HARBOUR messages. */
public int size;
/**
 * Returns the timestamp stored in {@code startingDate}.
 *
 * @return the starting date, in epoch milliseconds
 */
public long getStartingDate() {
    return startingDate;
}
/**
 * Returns a clone of this agent's identifier, so callers cannot alter the agent's own instance.
 *
 * @return a copy of {@code myAID}
 */
public final SKAID getSKAID() {
    final Object copy = myAID.clone();
    return (SKAID) copy;
}
/**
 * Returns the agent's own identifier instance (not a copy, unlike getSKAID()).
 *
 * @return the live {@code myAID} reference
 */
public final SKAID getRealSKAID() {
    return this.myAID;
}
/**
 * Replaces the buffer of reliable messages.
 *
 * @param inf the new buffer, indexed by receiver name and then by message id
 */
public void setMsgInfoBF(HashMap<String, HashMap<String, SKLMessage>> inf) {
    msgInfoBF = inf;
}
/**
 * Returns the live buffer of reliable messages (no copy is made).
 *
 * @return the buffer, indexed by receiver name and then by message id
 */
public HashMap<String, HashMap<String, SKLMessage>> getMsgInfoBF() {
    return msgInfoBF;
}
/**
 * Returns the live record of received messages (no copy is made).
 *
 * @return reception timestamps, indexed by sender name and then by message id
 */
public HashMap<String, HashMap<String, Long>> getReceivedMsgs() {
    return receivedMsgs;
}
/**
 * Stores a value in the behaviour storage, overwriting any previous value for the same key.
 *
 * @param property the key
 * @param value the value to associate with the key
 */
public final void createStorageObject(String property, Serializable value) {
    this.behaviourStorage.put(property, value);
}
/**
 * Looks up a value in the behaviour storage.
 *
 * @param property the key
 * @return the stored value, or {@code null} when the key has no mapping
 */
public final Serializable getStorageObject(String property) {
    return this.behaviourStorage.get(property);
}
/**
 * Tells whether the behaviour storage holds an entry for the given key.
 *
 * @param property the key
 * @return {@code true} when the key is present
 */
public final boolean hasStorageObject(String property) {
    return this.behaviourStorage.containsKey(property);
}
/**
 * Agent start-up hook: creates the agent identifier, reads the launch arguments, announces the
 * agent to its harbour (when one is found), then runs reset() and fires the
 * "AFTER_REPLICATION" hooks.
 *
 * <p>Argument 0 is expected to be the parameter map and argument 1 the set of known harbours.
 * A missing or null argument is replaced by an empty collection instead of failing with a
 * NullPointerException / ArrayIndexOutOfBoundsException.
 */
@SuppressWarnings("unchecked")
@Override
protected void setup() {
    myAID = new SKAID();
    myAID.updateMigration(0);
    startingDate = Instant.now().toEpochMilli();

    Object[] arguments = getArguments();
    Object rawArgs = (arguments != null && arguments.length > 0) ? arguments[0] : null;
    Object rawHarbours = (arguments != null && arguments.length > 1) ? arguments[1] : null;
    this.args = (rawArgs != null) ? (Map<String, Object>) rawArgs : new HashMap<String, Object>();
    this.harbours = (rawHarbours != null) ? (Set<SKAID>) rawHarbours : new HashSet<SKAID>();

    // NOTE(review): myAID receives its address only in reset(), below; confirm that
    // myHarbour() can already match a harbour at this point.
    SKAID harbour = myHarbour();
    if (harbour != null) {
        SKLMessage joinHarbour = new SKLMessage("JOIN_HARBOUR", "HARBOUR");
        // Reuse the harbour found above rather than looking it up a second time.
        joinHarbour.addReceiver(harbour);
        skSendNormal(joinHarbour);
    }
    reset();
    this.internalUpdate("AFTER_REPLICATION");
}
/**
 * (Re)initialises the agent state; called from setup() and afterClone().
 *
 * <p>Clears the reliable-message buffer, refreshes the starting date, the agent name and
 * address, resets the migration lock counter, reads the "size" and "actions" launch
 * parameters, and finally runs the ReceiveOnReliableSend and CleanMessagesBuffer actions.
 *
 * <p>"size" may be any {@link Number} or a numeric string; a missing or null value means 0.
 * "actions" is a comma-separated list of class names handed to run(); blank entries are
 * skipped.
 *
 * @throws NumberFormatException if "size" is neither a number nor a parsable integer string
 */
protected void reset() {
    this.msgInfoBF.clear();
    this.startingDate = Instant.now().toEpochMilli();
    String[] address = getAID().getAddressesArray();
    myAID.setName(getName());
    // The last entry of the address array is the one used throughout this class.
    myAID.updateAddress(address[address.length - 1]);
    this.nbBlockingMigrate = 0;

    // A plain (Integer) cast fails on a null value (unboxing) and on Long/String values.
    Object configuredSize = this.args.get("size");
    if (configuredSize == null) {
        this.size = 0;
    } else if (configuredSize instanceof Number) {
        this.size = ((Number) configuredSize).intValue();
    } else {
        this.size = Integer.parseInt(configuredSize.toString().trim());
    }

    Object configuredActions = this.args.get("actions");
    if (configuredActions != null) {
        for (String action : ((String) configuredActions).split(",")) {
            String actionName = action.trim();
            // An empty name (e.g. "a,,b" or a trailing comma) can never resolve to a class.
            if (!actionName.isEmpty()) {
                run(actionName);
            }
        }
    }
    new ReceiveOnReliableSend(this).action();
    new CleanMessagesBuffer(this).action();
}
/**
 * Fires every hook registered for the given event type, in registration order.
 *
 * @param type the event type (this class fires "AFTER_REPLICATION", "AFTER_MIGRATION",
 *             "WANT_MIGRATE" and "DELETION")
 * @param o optional payload; when {@code null} the hooks' {@code action()} is called,
 *          otherwise {@code actionWithParameters(o)}
 */
protected final void internalUpdate(String type, Object o) {
    final List<SKAgentBehaviour> hooks = this.internalBehaviour.get(type);
    if (hooks == null) {
        return;
    }
    final boolean withPayload = (o != null);
    for (SKAgentBehaviour hook : hooks) {
        if (withPayload) {
            hook.actionWithParameters(o);
        } else {
            hook.action();
        }
    }
}
/**
 * Fires every hook registered for the given event type, without payload.
 *
 * @param type the event type
 */
protected final void internalUpdate(String type) {
    internalUpdate(type, null);
}
/**
 * Registers a hook to be fired by internalUpdate for the given event type.
 *
 * @param type the event type
 * @param b the hook to append to that type's list
 */
public final void addInternalUpdate(String type, SKAgentBehaviour b) {
    this.internalBehaviour.computeIfAbsent(type, key -> new ArrayList<>()).add(b);
}
// -----------------------------------------------------------------------------------
// Migration
// -----------------------------------------------------------------------------------
/**
 * Hook invoked once the agent has arrived at its destination.
 *
 * <p>While an inter-platform migration is in progress ({@code inMigration} is true) the agent
 * refreshes its address and immediately moves on to the container named "Harbour" at
 * "localhost". Otherwise the move is complete: takeDown() is called, the previous harbour
 * (if any) receives QUIT_HARBOUR with the agent size, the current harbour (if any) receives
 * JOIN_HARBOUR, and the "AFTER_MIGRATION" hooks are fired.
 */
@Override
public void afterMove() {
    super.afterMove();
    // inMigration is a boxed Boolean that may still be null when doMove() was called without
    // going through migrate(); Boolean.TRUE.equals avoids the unboxing NullPointerException.
    if (Boolean.TRUE.equals(inMigration)) {
        String[] address = getAID().getAddressesArray();
        myAID.update(address[address.length - 1]);
        ContainerID cID = new ContainerID();
        cID.setName("Harbour");
        cID.setAddress("localhost");
        inMigration = false;
        this.doMove(cID);
        return;
    }

    takeDown();
    if (oldHarbour != null) {
        SKLMessage quit = new SKLMessage("QUIT_HARBOUR", "HARBOUR");
        quit.addReceiver(oldHarbour);
        quit.setContent(size);
        skSendNormal(quit);
        oldHarbour = null;
    }
    // Same guard as in setup(): only announce ourselves when a harbour is actually known here.
    SKAID currentHarbour = myHarbour();
    if (currentHarbour != null) {
        SKLMessage joinHarbour = new SKLMessage("JOIN_HARBOUR", "HARBOUR");
        joinHarbour.addReceiver(currentHarbour);
        skSendNormal(joinHarbour);
    }
    this.internalUpdate("AFTER_MIGRATION");
}
/**
 * Hook invoked before the agent leaves its current location; remembers the harbour being
 * left so that afterMove() can send it QUIT_HARBOUR.
 *
 * <p>An inter-platform migration is done in two hops: afterMove() updates the agent address
 * and then calls doMove() again. The harbour is therefore recorded only when none is pending;
 * otherwise the second hop would replace it with the destination harbour and QUIT_HARBOUR
 * would be sent to the wrong harbour. afterMove() clears the field once the message is sent.
 */
@Override
protected void beforeMove() {
    super.beforeMove();
    if (oldHarbour == null) {
        oldHarbour = myHarbour();
    }
}
/**
 * Requests a non-forced migration towards the given harbour, unless migration is blocked.
 *
 * @param harbour identifier of the destination harbour
 */
public void migrate(AID harbour) {
    if (!canMigrate()) {
        this.print("I cannot migrate");
        return;
    }
    this.migrate(harbour, false);
}
/**
 * Migrates towards the given harbour.
 *
 * <p>When {@code forced} is false the agent only fires the "WANT_MIGRATE" hooks (if migration
 * is not blocked). When {@code forced} is true the move is performed immediately: towards a
 * container of the current platform when the harbour's last address equals the agent's last
 * address, towards the remote platform's AMS otherwise.
 *
 * @param harbour identifier of the destination harbour; its name is split on "@" and the
 *                part before "@" on "_" to derive the container name
 * @param forced {@code true} to move right away, {@code false} to only notify the hooks
 */
public void migrate(AID harbour, boolean forced) {
    if (!forced) {
        if (canMigrate()) {
            this.internalUpdate("WANT_MIGRATE", harbour);
        } else {
            this.print("I cannot migrate");
        }
        return;
    }

    final String[] targetAddresses = harbour.getAddressesArray();
    final String mtp = targetAddresses[targetAddresses.length - 1];
    final String[] nameParts = harbour.getName().split("@");
    final String containerAddress = nameParts[1];
    final String[] ownAddresses = getAID().getAddressesArray();
    final boolean samePlatform = ownAddresses[ownAddresses.length - 1].equals(mtp);

    if (samePlatform) {
        debug("migration local");
        this.inMigration = false;
        this.takeDown();
        ContainerID target = new ContainerID();
        target.setName(nameParts[0].split("_")[0]);
        target.setAddress(mtp);
        this.doMove(target);
        return;
    }

    debug("migration distante");
    this.inMigration = true;
    AID remoteAMS = new AID("ams@" + containerAddress, AID.ISGUID);
    remoteAMS.addAddresses(mtp);
    this.doMove(new PlatformID(remoteAMS));
}
/**
 * Starts a move to the given location, then empties {@code membersHarbour}.
 *
 * @param loc the destination
 */
@Override
public void doMove(Location loc) {
    super.doMove(loc);
    this.membersHarbour.clear();
}
/**
 * Hook invoked on the freshly cloned agent: drops every behaviour and hook inherited from
 * the original, re-runs reset() and fires the "AFTER_REPLICATION" hooks.
 */
@Override
public void afterClone() {
    super.afterClone();
    myBehaviours.forEach(this::removeBehaviour);
    myBehaviours.clear();
    internalBehaviour.clear();
    reset();
    internalUpdate("AFTER_REPLICATION");
}
/**
 * Delegates cloning to the superclass without any additional processing.
 *
 * @param loc where the clone is created
 * @param newName name given to the clone
 */
@Override
public void doClone(final Location loc, final String newName) {
    super.doClone(loc, newName);
}
/**
 * Instantiates the class {@code skydata.behaviour.<action>} through its
 * {@code (SKAgent)} constructor and invokes its no-argument {@code action()} method.
 * Reflection failures are printed and otherwise ignored.
 *
 * @param action simple name of the class to run
 */
protected void run(String action) {
    try {
        final Class<?> behaviourClass = Class.forName("skydata.behaviour." + action);
        final Object behaviour = behaviourClass.getConstructor(SKAgent.class).newInstance(this);
        behaviourClass.getMethod("action").invoke(behaviour);
    } catch (ReflectiveOperationException e) {
        // Common parent of the five checked exceptions the reflective calls above can throw.
        e.printStackTrace();
    }
}
/**
 * Prints a line prefixed with the agent name and the current time, only when
 * {@code inDebug} is true.
 *
 * @param s the text to print
 */
public void debug(String s) {
    if (!inDebug) {
        return;
    }
    final String line = "[" + getName() + "][" + Instant.now().toEpochMilli() + "]" + s;
    System.out.println(line);
}
/**
 * Prints a line prefixed with the agent name, the current time and the name of the agent's
 * harbour ("Unknown" when no harbour is found).
 *
 * @param s the text to print
 */
public void print(String s) {
    final SKAID harbour = this.myHarbour();
    final String nameHarbour;
    if (harbour == null) {
        nameHarbour = "Unknown";
    } else {
        nameHarbour = harbour.getName();
    }
    final String prefix = "[" + getName() + "][" + Instant.now().toEpochMilli() + "][" + nameHarbour + "]";
    System.out.println(prefix + s);
}
/**
 * Hands the message over to the given broadcaster, with this agent as the sending agent.
 *
 * @param broadCaster the broadcast strategy to use
 * @param message the message to broadcast
 * @param agents the target agents
 */
public void broadcast(final BroadcastI broadCaster, final SKLMessage message, final Set<SKAID> agents) {
    broadCaster.broadcast(this, message, agents);
}
/**
 * Sends a message that is kept in {@code msgInfoBF} for each receiver, so that skReceive()
 * can retransmit it later.
 *
 * <p>The sender is set to a copy of this agent's identifier and a message id is generated
 * when the message has none. The message is buffered and sent only when its send counter is
 * at least 1.
 *
 * @param msg the message to send
 */
public void skSendReliable(SKLMessage msg) {
    msg.setSender(this.getSKAID());
    if (msg.getMessageId() == null) {
        msg.setMessageId(IDGenerator.generateID());
    }
    // Read the id only after it may have been generated: using the value read beforehand
    // buffered every id-less message under the same null key.
    final String msgId = msg.getMessageId();

    // NOTE(review): a message whose counter is below 1 is silently dropped; confirm the
    // initial value of SKLMessage's send counter.
    if (msg.getNbSent() >= 1) {
        msg.setArrivalDate(Instant.now().toEpochMilli());
        // Bufferize the message for all receivers
        for (SKAID r : msg.getReceivers()) {
            msgInfoBF.computeIfAbsent(r.getName(), key -> new HashMap<>()).put(msgId, msg);
        }
        ACLMessage aclMsg = msg.toJade();
        this.send(aclMsg);
    }
}
/**
 * Sends a message without buffering it: sets the sender to a copy of this agent's
 * identifier, converts the message to its JADE form and sends it.
 *
 * @param msg the message to send
 */
public void skSendNormal(SKLMessage msg) {
    final SKAID self = this.getSKAID();
    msg.setSender(self);
    this.send(msg.toJade());
}
/**
 * Receives the next message matching the template and returns its SKLMessage payload.
 *
 * <p>For every received message:
 * <ul>
 * <li>buffered messages addressed to the sender are retransmitted when the sender reports
 * more migrations than recorded and a different address;</li>
 * <li>when the message carries an id, an "ACK" is sent back and the id is recorded; a message
 * whose id was already recorded is skipped and the next matching message is tried.</li>
 * </ul>
 *
 * @param mt the template used to select the message
 * @return the received message, or {@code null} when no message is available, when the
 *         content cannot be read, or when the content is not an SKLMessage
 */
public SKLMessage skReceive(MessageTemplate mt) {
    // Loop rather than recurse, so a long run of duplicates cannot exhaust the stack.
    while (true) {
        ACLMessage acl = this.receive(mt);
        if (acl == null) {
            return null;
        }
        try {
            Object content = acl.getContentObject();
            // An empty or foreign payload previously caused a NullPointerException or
            // ClassCastException; treat it like an unreadable message instead.
            if (!(content instanceof SKLMessage)) {
                return null;
            }
            SKLMessage skl = (SKLMessage) content;

            // Retransmit all message that is not ACK by the sender
            SKAID sender = skl.getSender();
            HashMap<String, SKLMessage> pending = msgInfoBF.get(sender.getName());
            if (pending != null) {
                for (SKLMessage msg : pending.values()) {
                    SKAID r = msg.getReceiver(sender.getName());
                    if (r.getNbMigration() < sender.getNbMigration()
                            && !r.getAddress().equals(sender.getAddress())) {
                        r.update(sender.getAddress(), sender.getNbMigration());
                        msg.setNbSent(msg.getNbSent() + 1);
                        msg.setSender(this.getSKAID());
                        send(msg.toJade(sender));
                    }
                }
            }

            // Messages without an id were not sent reliably: no ACK, no duplicate check.
            String messageId = skl.getMessageId();
            if (messageId == null) {
                return skl;
            }

            // Manage sending ACK (duplicates are acknowledged again on purpose: the ACK is
            // sent before the duplicate check).
            SKLMessage reply = new SKLMessage("ACK", "RELIABLE_SEND");
            reply.setContent(messageId);
            reply.addReceiver(sender);
            skSendNormal(reply);

            // Bufferize message received
            HashMap<String, Long> receivedFromSender =
                    receivedMsgs.computeIfAbsent(sender.getName(), key -> new HashMap<>());
            if (!receivedFromSender.containsKey(messageId)) {
                receivedFromSender.put(messageId, Instant.now().toEpochMilli());
                return skl;
            }
            // Duplicate: fall through and try the next matching message.
        } catch (UnreadableException e) {
            e.printStackTrace();
            return null;
        }
    }
}
/**
 * Returns the live set of known harbours (no copy is made).
 *
 * @return the {@code harbours} set
 */
public Set<SKAID> connectedHarbours() {
    return this.harbours;
}
/**
 * Returns the live per-harbour statistics map (no copy is made).
 *
 * @return the {@code harboursStats} map
 */
@SuppressWarnings("rawtypes")
public HashMap<SKAID, HashMap<String, Comparable>> getHarboursStats() {
    return this.harboursStats;
}
/**
 * Finds the harbour of this agent: the first known harbour whose address equals the agent's
 * own address.
 *
 * <p>Missing data (no identifier yet, no harbour set, no address, a harbour without address)
 * is handled by explicit checks. It no longer raises an exception that is then printed and
 * swallowed, and a single harbour with a null address no longer aborts the whole search.
 *
 * @return the matching harbour, or {@code null} when none matches
 */
public SKAID myHarbour() {
    try {
        Set<SKAID> known = connectedHarbours();
        if (myAID == null || known == null) {
            return null;
        }
        String address = this.getSKAID().getAddress();
        if (address == null) {
            return null;
        }
        for (SKAID s : known) {
            if (s != null && address.equals(s.getAddress())) {
                return s;
            }
        }
        return null;
    } catch (Exception e) {
        // Kept as a last resort so that an unexpected failure still yields "no harbour".
        e.printStackTrace();
        return null;
    }
}
/**
 * Creates an UpdateStatsHarbour for this agent and runs its action once.
 */
protected void setupStats() {
    final UpdateStatsHarbour statsUpdater = new UpdateStatsHarbour(this);
    statsUpdater.action();
}
/**
 * Tells whether migration is currently allowed.
 *
 * @return {@code true} when the blocking counter is zero
 */
public boolean canMigrate() {
    return this.nbBlockingMigrate == 0;
}
/**
 * Adds one migration lock; each call must be balanced by a call to unblockMigrate().
 */
public void blockMigrate() {
    this.nbBlockingMigrate += 1;
}
/**
 * Releases one migration lock; does nothing when no lock is held.
 */
public void unblockMigrate() {
    if (this.nbBlockingMigrate <= 0) {
        return;
    }
    this.nbBlockingMigrate -= 1;
}
// -----------------------------------------------------------------------------------
// Deletion
// -----------------------------------------------------------------------------------
/**
 * Deletes the agent: tells its harbour (when one is found) that it is leaving, with the
 * agent size as content, requests deletion, prints "deletion" and fires the "DELETION" hooks.
 */
public void delete() {
    // Same guard as in setup(): without a harbour there is nobody to send QUIT_HARBOUR to,
    // and a null receiver must not be added to the message.
    SKAID harbour = myHarbour();
    if (harbour != null) {
        SKLMessage quit = new SKLMessage("QUIT_HARBOUR", "HARBOUR");
        quit.addReceiver(harbour);
        quit.setContent(size);
        skSendNormal(quit);
    }
    doDelete();
    print("deletion");
    this.internalUpdate("DELETION");
}
// -----------------------------------------------------------------------------------
// Manage Behaviour
// -----------------------------------------------------------------------------------
/**
 * Adds a behaviour to the agent and records it in {@code myBehaviours}, so that
 * afterClone() can remove it from the clone.
 *
 * @param wakerBehaviour the behaviour to add (any Behaviour, despite the parameter name)
 */
@Override
public void addBehaviour(Behaviour wakerBehaviour) {
    super.addBehaviour(wakerBehaviour);
    this.myBehaviours.add(wakerBehaviour);
}
}