vagus/AnnouncementGenerator.py
2022-03-13 01:09:36 +01:00

126 lines
4.4 KiB
Python

import UDPHandler
import UDPMulticastHandler
import TCPHandler
import InstanceRegistry
import Config
import logging
import threading
import time
import struct
changed_cv = threading.Condition()
anything_changed = False
thread = None
logger = None
def initialize():
global logger
logger = logging.getLogger(__name__)
InstanceRegistry.set_change_callback(local_instances_changed_callback)
global thread
thread = GeneratorThread()
thread.start()
return True
def local_instances_changed_callback(cluster):
global anything_changed
logger.debug("in local_instances_changed_callback():")
with changed_cv:
anything_changed = True
changed_cv.notify()
#python's min() returns none on min(...None..)
def min_ignoring_none(a,b):
if a==None:
return b
elif b==None:
return a
else:
return min(a,b)
class GeneratorThread(threading.Thread):
def run(self):
global anything_changed
last_send = time.time()
next_earliest_send = time.time() + Config.announcement_interval_min/1000.0
next_latest_send = time.time() + Config.announcement_interval_max/1000.0
while True:
now = time.time()
if now < next_earliest_send:
#enforce minimum sleep
#logger.debug("enforcing minimum interval")
t = next_earliest_send-now
time.sleep(next_earliest_send-now)
with changed_cv:
while not anything_changed:
now = time.time()
if now>=next_latest_send or anything_changed:
break
changed_cv.wait(next_latest_send-now)
anything_changed = False
self.send_announcements()
now = time.time()
next_earliest_send = now + Config.announcement_interval_min/1000.0
mktaac = self.min_keepalive_timeout_across_all_clusters()
effective_announcement_interval_max = min_ignoring_none(Config.announcement_interval_max,mktaac)
effective_announcement_interval_max = max(Config.announcement_interval_min,effective_announcement_interval_max)
next_latest_send = now + effective_announcement_interval_max/1000.0
if mktaac!=None:
#hackish:
# We want to send out announcements so they are received and processed by other vagus instances
# before any instances time out, but we have no way of knowing the delay from we send to that is done.
#If this hack doesn't work then you have to configure announcement-interval-max
next_latest_send -= 0.050
#todo: do timeouts per cluster
def min_keepalive_timeout_across_all_clusters(self):
m = None
for cluster in InstanceRegistry.get_cluster_list():
m2 = InstanceRegistry.get_local_instance_dict(cluster).get_lowest_keepalive_lifetime()
if m2!=None and m2<Config.announcement_interval_min:
logger.warn("cluster %s has instances with keepalive-lifetime (%s) less than [main]:announcement-interval-min (%s)",cluster,m2,Config.announcement_interval_min)
if m==None or m2<m:
m=m2
return m
def send_announcements(self):
logger.debug("Generating announcements")
for cluster in InstanceRegistry.get_cluster_list():
self.send_cluster_announcement(cluster,InstanceRegistry.get_local_instance_dict(cluster))
def send_cluster_announcement(self,cluster,instance_dict):
logger.debug("Generating announcement for cluster %s",cluster)
message = self.form_announcement_message(cluster,instance_dict)
UDPHandler.send_announce(message)
UDPMulticastHandler.send_announce(message)
TCPHandler.send_announce(message)
def form_announcement_message(self,cluster,instance_dict):
identity_bin = Config.identity.encode("utf-8")
cluster_bin = cluster.encode("utf-8")
instance_information = b""
#struct.pack(...p...) doesn't work (?)
for (k,v) in instance_dict.items():
k = k.encode("utf-8")
instance_information += struct.pack("!B",len(k)) + k
instance_information += struct.pack("!Q",int(v.end_of_life*1000))
if v.extra_info:
extra_info_bin = v.extra_info.encode("utf-8")
instance_information += struct.pack("!B",len(extra_info_bin)) + extra_info_bin
else:
instance_information += struct.pack("!B",0)
announcement_end_of_life = int(time.time() + (Config.announcement_interval_max*2 / 1000.0))
length = 5 + 4 + 1 + 4 + 1+len(identity_bin) + 1+len(cluster_bin) + len(instance_information)
message = b"vagus"
message+= struct.pack("!I",length)
message+= struct.pack("!B",1) #type=announcement
message+= struct.pack("!I",announcement_end_of_life)
message+= struct.pack("!B",len(identity_bin)) + identity_bin
message+= struct.pack("!B",len(cluster_bin)) + cluster_bin
message+= instance_information
return message