File SKD.java¶
Go to the documentation of this file
package skydata.internal.agents;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import jade.core.AID;
import jade.core.ContainerID;
import jade.core.Location;
import jade.core.behaviours.CyclicBehaviour;
import jade.core.behaviours.TickerBehaviour;
import jade.lang.acl.ACLMessage;
import jade.lang.acl.MessageTemplate;
import skydata.internal.behaviours.RegularlyUpdateKnowledge;
import skydata.internal.behaviours.SKAgentBehaviour;
import skydata.internal.behaviours.UpdateFamilyKnowledge;
import skydata.internal.message.NBroadcast;
import skydata.internal.message.RSBroadcast;
import skydata.internal.message.SKAID;
import skydata.internal.message.SKLMessage;
public class SKD extends SKAgent {
public int lastReplicateIndex = 0;
protected int nbBlockingReplicate = 0;
public Set<SKAID> deletedFamilyMembers = new HashSet<>();
public Set<SKAID> crashedFamilyMembers = new HashSet<>();
protected Set<SKAID> family = new HashSet<>();
protected HashMap<SKAID, LocalDateTime> suspectedFamily = new HashMap<>();
private String cloningContainerName;
private String cloningContainerAddress;
Map<SKAID, LocalDateTime> lastReceived;
long failureDetectionAfter;
Random rand = new Random();
// -----------------------------------------------------------------------------------
// Initialisation
// -----------------------------------------------------------------------------------
public SKD() {
super();
}
public Map<SKAID, LocalDateTime> createLastReceived(Set<SKAID> family) {
Map<SKAID, LocalDateTime> lastR = new HashMap<>();
for (SKAID member : family) {
if (!member.equals(getSKAID()))
lastR.put(member, LocalDateTime.now());
}
return lastR;
}
@SuppressWarnings("unchecked")
@Override
protected void setup() {
super.setup();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
protected void reset() {
super.reset();
this.nbBlockingReplicate = 0;
this.lastReplicateIndex = 0;
lastReceived = createLastReceived(family);
failureDetectionAfter = (Integer) (this.args.get("failureDetectionAfter"));
this.suspectedFamily.clear();
setupStats();
setupFamily();
}
public boolean canReplicate() {
return nbBlockingReplicate == 0;
}
public void blockReplicate() {
nbBlockingReplicate++;
}
public void unblockReplicate() {
if (nbBlockingReplicate > 0)
nbBlockingMigrate--;
}
@Override
public SKLMessage skReceive(MessageTemplate mt) {
SKLMessage msg = super.skReceive(mt);
if (msg != null) {
SKAID sender = msg.getSender();
if (family.contains(sender)) {
lastReceived.put(sender, LocalDateTime.now());
this.unsuspectMemberFamily(sender);
}
}
return msg;
}
// -----------------------------------------------------------------------------------
// Manage family
// -----------------------------------------------------------------------------------
public String getFamilyString() {
ArrayList<SKAID> familyNames = new ArrayList<>(getFamily());
ArrayList<String> memberNames = new ArrayList<>();
if (getFamilySize() > 0) {
for (int i = 0; i < getFamilySize(); i++) {
String memberAddress = familyNames.get(i).getAddress();
memberNames.add("(" + familyNames.get(i).getName() + " : "
+ memberAddress + ")");
}
}
return memberNames.toString();
}
public Set<SKAID> getFamilySameHarbour() {
Set<SKAID> membersSameHarbour = new HashSet<>();
String familyName = getName().split("-")[0];
for (SKAID m : membersHarbour) {
if (m.getName().split("-")[0].equals(familyName)) {
membersSameHarbour.add(m);
}
}
return membersSameHarbour;
}
public void updatePositionFamily(SKAID m) {
for (SKAID member : family) {
if (member.getName().equals(m.getName()) && member.getNbMigration() <= m.getNbMigration()) {
member.update(m.getAddress(), m.getNbMigration());
this.internalUpdate("FAMILY_UPDATE", m);
return;
}
}
addFamily(m);
}
public void mergeWithFamily(Set<SKAID> agents) {
int old = this.family.size();
this.family.addAll(agents);
this.family.removeAll(crashedFamilyMembers);
if (this.family.size() != old)
this.internalUpdate("FAMILY_ADDED");
}
public void addFamily(SKAID aid) {
int old = this.family.size();
this.family.add(aid);
if (this.family.size() != old)
this.internalUpdate("FAMILY_ADDED");
}
public void mergeWithDeletedFamilyMembers(Set<SKAID> agents) {
int old = this.family.size();
this.deletedFamilyMembers.addAll(agents);
if (this.family.size() != old)
this.internalUpdate("FAMILY_REMOVED");
}
public Set<SKAID> getFamily() {
return this.family;
}
public Set<SKAID> getDeletedFamilyMembers() {
return this.deletedFamilyMembers;
}
public void removeFromFamily(SKAID aid) {
int old = this.family.size();
this.family.remove(aid);
if (this.family.size() != old)
this.internalUpdate("FAMILY_REMOVED");
}
public void removeFromFamily(Set<SKAID> aids) {
int old = this.family.size();
this.family.removeAll(aids);
if (this.family.size() != old)
this.internalUpdate("FAMILY_REMOVED");
}
public int getFamilySize() {
return this.family.size();
}
public void broadcastFamily(SKLMessage message, Boolean sendReliably) {
if (sendReliably) {
broadcast(new RSBroadcast(), message, family);
} else {
broadcast(new NBroadcast(), message, family);
}
}
public void broadcastFamily(SKLMessage message) {
broadcastFamily(message, true);
}
protected void setupFamily() {
addFamily(getSKAID());
new UpdateFamilyKnowledge(this).action();
addBehaviour(new TickerBehaviour(this, 1000) {
@Override
protected void onTick() {
LocalDateTime now = LocalDateTime.now();
Set<SKAID> toRemove = new HashSet<>();
for (SKAID member : family) {
if (!member.equals(getSKAID()) && suspectedFamily.containsKey(member)) {
if (Duration.between(now, suspectedFamily.get(member)).toSeconds() < -failureDetectionAfter) {
toRemove.add(member);
print("CRASH : " + member.getName());
}
}
}
if (!toRemove.isEmpty()) {
crashedFamilyMembers.addAll(toRemove);
removeFromFamily(toRemove);
}
}
});
SKD agent = this;
MessageTemplate mtUpdate = MessageTemplate.and(
MessageTemplate.MatchProtocol("HARBOUR"),
MessageTemplate.MatchOntology("UPDATE_LIST"));
agent.addBehaviour(new CyclicBehaviour(agent) {
@SuppressWarnings("unchecked")
@Override
public void action() {
SKLMessage acl = agent.skReceive(mtUpdate);
if (acl == null) {
block();
return;
}
membersHarbour.clear();
membersHarbour.addAll((Set<SKAID>) acl.getContent());
}
});
agent.addBehaviour(new TickerBehaviour(agent, 1000) {
public void onTick() {
SKLMessage message = new SKLMessage("FAILURE_DETECTOR", "ALIVE");
agent.broadcastFamily(message, false);
}
});
agent.addInternalUpdate("FAMILY_UPDATE", new SKAgentBehaviour(agent) {
@Override
public void actionWithParameters(Object o) {
SKAID b = (SKAID) o;
lastReceived.put(b, LocalDateTime.now());
}
});
MessageTemplate mtAlive = MessageTemplate.and(
MessageTemplate.MatchProtocol("FAILURE_DETECTOR"),
MessageTemplate.and(
MessageTemplate.MatchOntology("ALIVE"),
MessageTemplate.MatchPerformative(ACLMessage.INFORM)));
agent.addBehaviour(new CyclicBehaviour(agent) {
@Override
public void action() {
SKLMessage message = agent.skReceive(mtAlive);
if (message == null) {
block();
return;
}
lastReceived.put(message.getSender(), LocalDateTime.now());
agent.unsuspectMemberFamily(message.getSender());
}
});
agent.addBehaviour(new TickerBehaviour(agent, 2000) {
public void onTick() {
LocalDateTime now = LocalDateTime.now();
for (SKAID member : family) {
if (!member.equals(agent.getSKAID()) && lastReceived.containsKey(member)) {
if (Duration.between(now, lastReceived.get(member)).toSeconds() < -failureDetectionAfter) {
agent.suspectMemberFamily(member, now);
lastReceived.remove(member);
agent.print("Suspect " + member.getName());
}
}
}
}
});
}
public void suspectMemberFamily(SKAID suspected, LocalDateTime date) {
if (!suspectedFamily.containsKey(suspected)) {
suspectedFamily.put(suspected, date);
}
}
public void unsuspectMemberFamily(SKAID unsuspected) {
if (suspectedFamily.containsKey(unsuspected)) {
print("False suspicion");
this.failureDetectionAfter *= 2;
}
addFamily(unsuspected);
suspectedFamily.remove(unsuspected);
}
@Override
public void afterMove() {
super.afterMove();
String myName = getAID().getName();
for (SKAID member : family) {
if (member.getName().equals(myName)) {
String[] addresses = getAID().getAddressesArray();
member.update(addresses[addresses.length - 1]);
break;
}
}
}
// -----------------------------------------------------------------------------------
// Cloning
// -----------------------------------------------------------------------------------
@Override
public void afterClone() {
super.afterClone();
this.lastReplicateIndex = 0;
this.family = getFamily();
new RegularlyUpdateKnowledge(this).action();
if (this.cloningContainerAddress != null) {
String[] addresses = getAID().getAddressesArray();
if (!addresses[addresses.length - 1].equals(this.cloningContainerAddress)) {
AID remoteAMS = new AID("ams@" + this.cloningContainerName, AID.ISGUID);
remoteAMS.addAddresses(this.cloningContainerAddress);
migrate(remoteAMS, true);
}
}
SKLMessage joinHarbour = new SKLMessage("JOIN_HARBOUR", "HARBOUR");
joinHarbour.addReceiver(myHarbour());
skSendNormal(joinHarbour);
}
public void clone(AID harbour, String name) {
String[] harbourAddresses = harbour.getAddressesArray();
String mtp = harbourAddresses[harbourAddresses.length - 1];
String[] nameSplitted = harbour.getName().split("@");
String containerAddress = nameSplitted[1];
String nameContainer = nameSplitted[0].split("_")[0];
cloningContainerName = containerAddress;
cloningContainerAddress = mtp;
ContainerID cID = new ContainerID();
cID.setName(nameContainer);
cID.setAddress(cloningContainerAddress);
this.doClone(cID, name);
}
@Override
public void doClone(Location loc, String newName) {
this.lastReplicateIndex += 1;
String[] seperators = new String[] { ".", "_" };
SimpleDateFormat sdf = (new SimpleDateFormat("yyyy-MM-dd" + seperators[rand.nextInt(2)] + "HH:mm:ss:SS"));
String autogeneratedId = sdf.format(new Date(Instant.now().toEpochMilli()))
+ String.valueOf(rand.nextInt(100));
try {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
byte[] hash = digest.digest(autogeneratedId.getBytes(StandardCharsets.UTF_8));
String encoded = Base64.getEncoder().encodeToString(hash);
newName = newName + "_" + encoded.substring(0, 5);
} catch (NoSuchAlgorithmException e) {
newName = newName + "_" + autogeneratedId;
}
super.doClone(loc, newName);
SKAID a = new SKAID();
a.update(loc.getAddress(), 0);
a.setName(newName + "@" + this.myHarbour().getName().split("@")[1]);
updatePositionFamily(a);
}
protected String getNextReplicateIndex() {
return String.valueOf(this.lastReplicateIndex + 1);
}
}