Commit 89e4a1ab authored by m!nus's avatar m!nus

small changes for statscollector

parent 75647c7c
from .teeworlds import create_masters, query_masters, query_masters_servercount
from .teeworlds import create_masters, query_masters_serverlist, query_masters_servercount
from .base import EventSocket
from .master import MasterServer
from .server import Server
......
import operator
import sys
import socket
import select
......@@ -10,6 +11,7 @@ import logging
logging.basicConfig(format="[%(asctime)s] %(levelname)s: %(funcName)s: %(message)s", level=logging.DEBUG)
L = logging.getLogger(__name__)
def get_address(host, port=8303, family=0):
try:
info = socket.getaddrinfo(host, port, family, socket.SOCK_DGRAM)
......@@ -63,15 +65,17 @@ class Address(object):
def __lt__(self, other):
return self.address_tuple() < other.address_tuple()
def __eq__(self, other):
return self.address_tuple() == other.address_tuple()
def __str__(self):
if self.family == socket.AF_INET6:
return "[{host}]:{port}".format(self.host, self.port)
return "{host}:{port}".format(host=self.host, port=self.port)
class Request(object):
class Request:
"""network communication data wrapper"""
address = None
def sent(self):
"""callback: request has been processed and sent"""
......@@ -79,25 +83,36 @@ class Request(object):
def response_received(self, data):
"""callback: response received
return True if more data is expected"""
@return: True if more data is expected
"""
return False
def get_address(self):
"""must return destination address, an Address object"""
return self.address
@property
def address(self):
"""must return destination address, an Address object
@rtype: Address
"""
return self._address
@address.setter
def address(self, address):
assert isinstance(address, Address)
self._address = address
def get_data(self):
"""must return data to be sent"""
raise NotImplementedError()
def __str__(self):
return "<Request to {} with {} bytes>".format(self.get_address(), len(self.get_data()))
return "<Request to {} with {} bytes>".format(self.address, len(self.get_data()))
class EventSocket(object):
class EventSocket:
"""handles low-level network communication, supports queuing"""
def __init__(self, packets_per_second=200, idle_limit=10):
self._max_packets_per_second = packets_per_second
def __init__(self, max_packet_rate=200, idle_limit=10):
self._max_packet_rate = max_packet_rate
self._idle_counter = 0
self._idle_limit = idle_limit
self._packet_rate = 0
......@@ -110,21 +125,28 @@ class EventSocket(object):
self.has_ipv6 = socket.has_ipv6
if self.has_ipv6:
self._sockets[socket.AF_INET6] = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.SOL_UDP)
self._later = []
def _packet_rate_update(self):
"""Update the rate limiting counter"""
cur_time = time()
diff = cur_time - self._packet_rate_last_update
self._packet_rate_last_update = cur_time
self._packet_rate -= diff*self._max_packets_per_second
self._packet_rate -= diff*self._max_packet_rate
if self._packet_rate < 0:
self._packet_rate = 0
def _select(self):
"""select wrapper raising exception on timeout"""
timeout = 1.0/self._max_packets_per_second
timeout = 1.0/self._max_packet_rate
if self._queue.empty():
timeout = 1.0
if len(self._later) > 0:
how_soon = self._later[0][0] - time()
if how_soon < timeout:
timeout = how_soon
if timeout < 0:
timeout = 0
ret = select.select(self._sockets.values(), [], [], timeout)
#L.debug("selected: r={} w={} x={}".format(*ret))
#if ret == ([], [], []):
......@@ -136,7 +158,7 @@ class EventSocket(object):
def _send(self, request):
"""Actually send request"""
socket_type = socket.AF_INET
addr = request.get_address()
addr = request.address
if addr.family == socket.AF_INET6:
if not self.has_ipv6:
raise socket.error("Cannot send IPv6 packet without IPv6 socket")
......@@ -149,16 +171,29 @@ class EventSocket(object):
return length
def send(self, request):
"""Queue Request for sending"""
assert(isinstance(request, Request))
"""Queue Request for sending
@type request: Request"""
assert isinstance(request, Request)
self._queue.put(request)
def call_later(self, delay, callback):
when = time() + delay
self._later.append((when, callback))
self._later.sort(key=operator.itemgetter(0))
def run(self):
"""Main loop"""
while True:
now = time()
to_call = filter(lambda v: v[0] <= now, self._later)
for when, callback in to_call:
L.debug("Calling {} (expected at {}, now is {})".format(callback, when, now))
callback()
self._later.remove((when, callback))
self._packet_rate_update()
if not self._queue.empty() and self._packet_rate < self._max_packets_per_second:
if not self._queue.empty() and self._packet_rate < self._max_packet_rate:
request = self._queue.get()
data = request.get_data()
try:
......@@ -183,7 +218,7 @@ class EventSocket(object):
L.warning("Nothing sent to {} but received response".format(address))
self._requests[address].difference_update(rem)
if not r and self._queue.empty():
if not r and self._queue.empty() and len(self._later) == 0:
self._idle_counter += 1
remaining_requests = 0
for requestlist in self._requests.values():
......
......@@ -8,6 +8,8 @@ from .server import *
class MasterServer(object):
server_factory = Server
class Count(Request):
packet_count_request = 10*b'\xff' + b'cou2'
packet_count_response = 10*b'\xff' + b'siz2'
......@@ -28,7 +30,7 @@ class MasterServer(object):
if len(data) <= len(self.packet_count_response) or not data.startswith(self.packet_count_response):
return True # that's not it, wait for more
self.latency = time() - self.time_sent
L.debug("received count response from {} in {} seconds".format(self.get_address(), self.latency))
L.debug("received count response from {} in {} seconds".format(self.address, self.latency))
self.data_cb(data[len(self.packet_count_response):])
class List(Request):
......@@ -52,11 +54,13 @@ class MasterServer(object):
if len(data) <= len(self.packet_list_response) or not data.startswith(self.packet_list_response):
return True # that's not it, wait for more
self.latency = time() - self.time_sent
L.debug("received list response from {} in {} seconds".format(self.get_address(), self.latency))
L.debug("received list response from {} in {} seconds".format(self.address, self.latency))
self.data_cb(data[len(self.packet_list_response):])
return True # needs more data - eventually
def __init__(self, socket, address, name=None):
def __init__(self, socket, address, name=None, server_factory=None):
if server_factory:
self.server_factory = server_factory
self._socket = socket
self.address = address
self.name = name or "None"
......@@ -81,9 +85,10 @@ class MasterServer(object):
if len(data) % self.List.serveraddr_size != 0:
L.warning("Serverlist data length is not a multiple of {}".format(self.List.serveraddr_size))
for address in listdata2addresslist(data):
server = Server(self._socket, address, master=self)
self.serverlist.append(server)
self.on_server_add(server)
server = self.server_factory(self._socket, address, master=self)
if server not in self.serverlist:
self.serverlist.append(server)
self.on_server_add(server)
def _count_response(self, data):
self.count = unpack('!H', data[0:2])[0]
......
......@@ -41,9 +41,10 @@ class Server(object):
self.data_cb(pieces)
def __init__(self, socket, address, master=None):
assert isinstance(address, Address)
self.address = address
self._socket = socket
self.master = master
self._master = master
self.data = None
self.reset()
......@@ -56,7 +57,7 @@ class Server(object):
self.map = None
self.gametype = None
self.password = None
self.players = None
self.num_players = None
self.max_players = None
self.clients = None
self.max_clients = None
......@@ -70,19 +71,19 @@ class Server(object):
self.latency = self._request.latency
it = iter(pieces)
try:
self.version = next(it) #.decode('utf8')
self.name = next(it) #.decode('utf8')
self.map = next(it) #.decode('utf8')
self.gametype = next(it) #.decode('utf8')
self.version = next(it).decode('utf8')
self.name = next(it).decode('utf8')
self.map = next(it).decode('utf8')
self.gametype = next(it).decode('utf8')
self.password = (next(it)=='1')
self.players = int(next(it))
self.num_players = int(next(it))
self.max_players = int(next(it))
self.clients = int(next(it))
self.max_clients = int(next(it))
for _ in range(self.clients):
player = Player()
player.name=next(it) #.decode('utf8')
player.clan=next(it) #.decode('utf8')
player.name=next(it).decode('utf8')
player.clan=next(it).decode('utf8')
player.country = int(next(it))
player.score = int(next(it))
player.playing = (next(it)=='1')
......@@ -96,20 +97,22 @@ class Server(object):
L.warning('unexpected end of data for server {}'.format(self))
except Exception:
L.debug(repr(pieces))
raise
self.on_info_receive(self)
return # TODO: do not ignore server
#raise
self.on_info_received()
def __eq__(self, other):
return self.address == other.address
def __str__(self):
return "<Server name='{name}' address='{address}' master='{master}'>".format(**self.__dict__)
"""
Events
replace with your own code or subclass
Example: server.on_info_receive = lambda server: print(server)
"""
def on_info_receive(self, server):
"""Info has been received"""
# TODO: this basically gets the server passed twice
def on_info_received(self):
"""
Called when the requested info was received
"""
pass
......@@ -26,18 +26,18 @@ from .base import get_address
from .master import MasterServer
def create_masters(eventsocket, hostnames=["master{}.teeworlds.com".format(i+1) for i in range(4)]):
def create_masters(eventsocket, hostnames=["master{}.teeworlds.com".format(i+1) for i in range(4)], master_factory=MasterServer):
masterlist = []
for mastername in hostnames:
# resolves host and picks the first address
master_addr = get_address(mastername, port=8300)
if master_addr:
L.debug("requesting {} ({})".format(mastername, master_addr))
master = MasterServer(eventsocket, master_addr, mastername.partition(".")[0])
master = master_factory(eventsocket, master_addr, mastername.partition(".")[0])
masterlist.append(master)
return masterlist
def query_masters(masterlist):
def query_masters_serverlist(masterlist):
for master in masterlist:
master.request_list()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment