This repository was archived by the owner on Sep 9, 2022. It is now read-only.
137 changes: 81 additions & 56 deletions wavefront_push.py
@@ -1,4 +1,4 @@
#! /usr/bin/env python
#!/bin/env python

# TODOs:
# - Clean up logging (Propose prefixing all messages with something identifying the plugin)
@@ -15,6 +15,7 @@
# Import wavefront_push
#
# <Module wavefront_push>
# persist_connection true # default value
# server 10.255.254.255
# port 2878
# prefix collectd
@@ -23,7 +24,11 @@
# </Module>
# </Plugin>
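#
# For orientation: with a configuration like the above, write_callback
# (further down) emits Wavefront data format lines of the shape
# "<prefix>.<metric> <value> <epoch> host=<host>[ tags]". An illustrative
# (invented) example:
#
#   collectd.interface-em1.if_octets.rx 1234.000000 1461234567 host=web01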

import Queue, socket, threading, time, re
import Queue
import socket
import threading
import time
import re
from collections import namedtuple

try:
@@ -32,23 +37,29 @@
# This lets me at least parse the plugin before I feed it to collectd
pass


# Hack!
# collectd < 5.5 does not have get_dataset; this "mocks" get_dataset with
# approximately the same behavior: collectd.get_dataset(typename) -> (name, type, min val, max val)
class CollectdDS:
"""Emulate collectd.get_dataset as it's a relatively "new" addition to collectd-python
CollectDS(filename) - load tyeps.db given by filename
CollectDS.__call__(typename) -> type description (name, type, min max)

drop-in replacement for collectd.get_dataset
"""
ds_value_type = namedtuple("ds_value_type", ["name", "type", "min", "max"])
# Collectd types
DS_TYPE_COUNTER=0
DS_TYPE_GAUGE=1
DS_TYPE_DERIVE=2
DS_TYPE_ABSOLUTE=3
TYPES={ 'DERIVE': DS_TYPE_DERIVE, 'GAUGE': DS_TYPE_GAUGE, 'ABSOLUTE': DS_TYPE_ABSOLUTE, 'COUNTER': DS_TYPE_COUNTER }
DS_TYPE_COUNTER = 0
DS_TYPE_GAUGE = 1
DS_TYPE_DERIVE = 2
DS_TYPE_ABSOLUTE = 3
TYPES = {
'DERIVE': DS_TYPE_DERIVE,
'GAUGE': DS_TYPE_GAUGE,
'ABSOLUTE': DS_TYPE_ABSOLUTE,
'COUNTER': DS_TYPE_COUNTER
}

def __init__(self, ds_file=None):
"""__init__(ds_file)
- ds_file(None) -> Empty types.db
@@ -57,6 +68,7 @@ def __init__(self, ds_file=None):
self.__typesdb = {}
if ds_file is not None:
self.load_file(ds_file)

def load_file(self, filename):
with open(filename, "r") as file:
for line in file:
@@ -67,40 +79,49 @@ def load_file(self, filename):
try:
values = []
(ds_name, values_string) = re.split('\s+', line, 1)
for (name, type, min, max) in [ e.split(':') for e in re.split(',\s*', values_string) ]:
for (name, type, min, max) in [e.split(':') for e in re.split(',\s*', values_string)]:
if min == 'U' or min == '':
min = '-inf'
if max == 'U' or max == '':
max = 'inf'
values.append( self.ds_value_type(name, self.TYPES[type], float(min), float(max)) )
values.append(self.ds_value_type(name, self.TYPES[type], float(min), float(max)))
self.__typesdb[ds_name] = values
except Exception, err:
except Exception:
raise

def __call__(self, name):
return self.__typesdb[name]

def clear(self):
self.__typesdb = {}
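
# A minimal usage sketch of the CollectdDS shim (illustrative only; assumes
# /usr/share/collectd/types.db contains a stock entry such as
# "if_octets  rx:DERIVE:0:U, tx:DERIVE:0:U"):
#
#   ds = CollectdDS('/usr/share/collectd/types.db')
#   for source in ds('if_octets'):
#       # each entry is a ds_value_type namedtuple, e.g.
#       # ds_value_type(name='rx', type=CollectdDS.DS_TYPE_DERIVE, min=0.0, max=inf)
#       print source.name, source.type, source.min, source.max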

# FIXME: Global variable; anecdotally this needs to stay a single shared reference, and sharing it between threads "seems to work".
# We could avoid the threading ambiguity by passing copies into the callback closures, as sketched below.
CONFIG={'prefix': 'collectd', 'qsize': 10240}
CONFIG = {
'prefix': 'collectd',
'qsize': 10240,
'persist_connection': True
}
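
# A hypothetical sketch of the closure idea from the FIXME above: snapshot
# the settings once (after configure_callback has run) and bind them into
# the sender thread's arguments, so the thread never reads the shared dict.
# Not part of this change; it mirrors what init_callback does further down.
#
#   def init_callback_with_snapshot():
#       cfg = dict(CONFIG)  # shallow copy, taken once
#       cfg['queue'] = Queue.Queue(cfg.get('qsize', 1024))
#       sender = threading.Thread(target=send_wavefront,
#                                 args=(cfg['server'], cfg['port'], cfg['queue']))
#       sender.setDaemon(True)
#       sender.start()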


def retry_connect(host, port, socket_timeout=15):
"""Retries connection indefinitely
On connect: - Close receiving direction, we don't expect anything nor do we ever read from it.
- Set the timeout on the socket (default %ds), the rationale for this is that if for
some reason send blocks, we don't want to block infinitely.
""" % ( socket_timeout )
""" % (socket_timeout)
while True:
try:
s = socket.create_connection( (socket.gethostbyname(host), port) )
collectd.debug("New connection to %s:%s" % (host, port))
s = socket.create_connection((socket.gethostbyname(host), port))
s.shutdown(socket.SHUT_RD) # We're not receiving anything, close RX stream
s.settimeout(socket_timeout) # We do not want to block indefinitely
return s
except (IOError, socket.error, socket.timeout, socket.gaierror), e:
collectd.error("Failed to connect: " + str(e))
time.sleep(1)


def send_wavefront(host, port, item_queue):
"""send_wavefront(host, port, item_queue) - collectd-wavefront plugin eventloop
connects to host:port, continually checks item_queue for messages and sends them.
@@ -110,30 +131,22 @@ def send_wavefront(host, port, item_queue):
collectd.info("starting wavefront sender")
connection = None
message = None

while True:
if connection is None:
connection = retry_connect(host, port)

while True:
if message is None:
try:
message = item_queue.get(False)
except Queue.Empty:
time.sleep(1)
if not CONFIG['persist_connection'] and connection is not None:
close_connection(connection)
connection = None
continue


if connection is None:
connection = retry_connect(host, port)

try:
# strip whitespace and quotes emitted by bad plugins
epoch=message.split(' ')[2]
if not re.search("^1[2-9]\d{8}$", epoch):
regex= '(\S+.*)(\s+(\d+\.\d+|\d+)\s+1\d{9}.*)'
m = re.search(regex, message)
if m:
string= m.groups()[0]
string = re.sub(r'[ |"|$|#|\']', '_', string)
remainder= m.groups()[1]
message= string+ remainder+"\n"

# Lazy "send everything", loosing messages is very much a possibility
# we should know that we failed to send "something".
connection.sendall(message)
@@ -142,14 +155,22 @@ def send_wavefront(host, port, item_queue):
# Lazy error handling; "something" went wrong, let's
# give up and toss the message
collectd.error("Failed to send items:" + str(e))
try:
connection.shutdown(socket.SHUT_WR)
connection.close()
except Exception, e:
collectd.info("wavefront-connection close failed:" + str(e))
pass
close_connection(connection)
connection = None


def close_connection(connection):
if connection is None:
return
try:
collectd.debug("Clossing connection")
connection.shutdown(socket.SHUT_WR)
connection.close()
except Exception, e:
collectd.info("wavefront-connection close failed:" + str(e))
connection = None  # NOTE: rebinds only the local name; callers still clear their own reference


# Currently, the following configuration is accepted
#
# Server -> hostname/ip to connect to
@@ -163,11 +184,13 @@ def send_wavefront(host, port, item_queue):
def configure_callback(conf):
"""Collectd configuration callback.
- Parameter conf: Collectd configuration tree"""
tags={}
tags = {}
CONFIG['prefix'] = 'collectd'
override_types_db = []
for node in conf.children:
config_key = node.key.lower()
if config_key == 'persist_connection':
CONFIG['persist_connection'] = node.values[0].lower() == 'true'
if config_key == 'tag':
try:
(tag, value) = node.values
@@ -190,20 +213,21 @@ def configure_callback(conf):
override_types_db.append(node.values[0])
else:
collectd.error("config: %s unknown config key" % (config_key))
CONFIG['tags_append'] = " " + " ".join(["%s=%s" % (t,v) for (t,v) in tags.iteritems() ])
CONFIG['tags_append'] = " " + " ".join(["%s=%s" % (t, v) for (t, v) in tags.iteritems()])

if not 'get_dataset' in dir(collectd):
if 'get_dataset' not in dir(collectd):
collectd.info("Trying to shoehorn in collectd types.db")
candidate_file_list = override_types_db + [ '/usr/share/collectd/types.db' ] + [ None ]
candidate_file_list = override_types_db + ['/usr/share/collectd/types.db'] + [None]
for candidate_file in candidate_file_list:
try:
setattr(collectd, 'get_dataset', CollectdDS(candidate_file))
collectd.info("Loaded %s as types.db" % (candidate_file))
break
except Exception, e:
collectd.warning("Tried and failed to load %s: %s" % ( candidate_file, str(e) ))

collectd.info("config(%s,tags='%s'" % ( ",".join(["%s=%s" % (k,v) for (k,v) in CONFIG.iteritems()]), CONFIG['tags_append']))
collectd.warning("Tried and failed to load %s: %s" % (candidate_file, str(e)))

collectd.info("config(%s,tags='%s'" % (",".join(["%s=%s" % (k, v) for (k, v) in CONFIG.iteritems()]), CONFIG['tags_append']))


def init_callback():
"""Collectd initialization callback.
@@ -212,7 +236,7 @@ def init_callback():
queue_size = CONFIG['qsize']
except KeyError:
queue_size = 1024
CONFIG['queue']=Queue.Queue(queue_size)
CONFIG['queue'] = Queue.Queue(queue_size)
if CONFIG['server'] and CONFIG['port']:
sender = threading.Thread(target=send_wavefront, args=(CONFIG['server'], CONFIG['port'], CONFIG['queue']))
sender.setDaemon(True)
@@ -221,6 +245,7 @@
else:
collectd.error("wavefront-forwarder has no destination to send data")


def write_callback(value):
"""Collectd write callback, responsible for formatting and writing messages onto
the send queue"""
@@ -255,11 +280,11 @@ def write_callback(value):
# collectd.interface-em1.if_octets.tx = 10
# collectd.interface-em1.if_octets.rx = 20
#
metric_name = "%s%s.%s%s" % ( value.plugin,
'.' + value.plugin_instance if len(value.plugin_instance) > 0 else '',
value.type,
'.' + value.type_instance if len(value.type_instance) > 0 else '' )

metric_name = "%s%s.%s%s" % (value.plugin,
'-' + value.plugin_instance if len(value.plugin_instance) > 0 else '',
value.type,
'-' + value.type_instance if len(value.type_instance) > 0 else '')
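
# Worked example of the construction above (invented values, matching the
# comment block earlier in this function):
#   plugin='interface', plugin_instance='em1', type='if_octets', type_instance=''
#   -> metric_name == 'interface-em1.if_octets'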

try:
prefix = CONFIG['prefix']
@@ -273,17 +298,17 @@ def write_callback(value):
except KeyError, e:
tags_append = ''

append_names = [ '.' + append_name if append_name != 'value' else ''
for (append_name, _, _, _)
in collectd.get_dataset(value.type) ]
append_names = ['.' + append_name if append_name != 'value' else ''
for (append_name, _, _, _)
in collectd.get_dataset(value.type)]

if len(append_names) != len(value.values):
collectd.error("len(ds_names) != len(value.values)")
return
msg = "".join([ "%s %f %d host=%s%s\n" % (prefix + metric_name + postfix, metric_value, value.time, value.host, tags_append)
for (postfix, metric_value)
in zip(append_names, value.values) ])

msg = "".join(["%s %f %d host=%s%s\n" % (prefix + metric_name + postfix, metric_value, value.time, value.host, tags_append)
for (postfix, metric_value)
in zip(append_names, value.values)])
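# Illustration (invented values): for an if_octets value with rx/tx sources
# the join above yields two Wavefront lines in a single message, e.g.
#   collectd.interface-em1.if_octets.rx 20.000000 1461234567 host=web01
#   collectd.interface-em1.if_octets.tx 10.000000 1461234567 host=web01
# (any configured tags are appended after host=...)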
try:
CONFIG['queue'].put(msg, block=False)
except Exception, e: