diff --git a/spider_cms.py b/spider_cms.py
index b3af896..ea531e5 100755
--- a/spider_cms.py
+++ b/spider_cms.py
@@ -98,6 +98,16 @@ def main():
     parser.add_argument(
         "--feed_amq", action="store_true", dest="feed_amq", help="Feed to CERN AMQ"
     )
+    parser.add_argument(
+        "--feed_vm", action="store_true", dest="feed_vm", help="Feed to CMS Monitoring VictoriaMetrics"
+    )
+    parser.add_argument(
+        "--vm_attrs",
+        default="",
+        type=str,
+        dest="vm_attrs",
+        help="Comma separated list of attributes to store in VM"
+    )
 
     parser.add_argument(
         "--schedd_filter",
@@ -210,6 +220,14 @@ def main():
             "[default: %(default)s]"
         ),
     )
+    parser.add_argument(
+        "--vm_url",
+        default="http://cms-monitoring.cern.ch:30422",
+        type=str,
+        dest="vm_url",
+        help="CMS Monitoring VM url to be used "
+        "[default: %(default)s]",
+    )
     parser.add_argument(
         "--log_dir",
         default="log/",
diff --git a/src/htcondor_es/history.py b/src/htcondor_es/history.py
index cf756dc..888c893 100755
--- a/src/htcondor_es/history.py
+++ b/src/htcondor_es/history.py
@@ -14,6 +14,7 @@
 import elasticsearch
 
 import htcondor_es.es
+import htcondor_es.vm
 import htcondor_es.amq
 from htcondor_es.utils import send_email_alert, time_remaining, TIMEOUT_MINS
 from htcondor_es.convert_to_json import convert_to_json
@@ -112,6 +113,13 @@ def process_schedd(
                 for id_, dict_ad in ad_list
             ]
             htcondor_es.amq.post_ads(data_for_amq, metadata=metadata)
+        if args.feed_vm:
+            data = [
+                (id_, convert_dates_to_millisecs(dict_ad))
+                for id_, dict_ad in ad_list
+            ]
+            vm_attrs = [a.strip() for a in args.vm_attrs.split(',')]
+            htcondor_es.vm.post_ads(args.vm_url, data, metadata=metadata, vm_attrs=vm_attrs)
 
         logging.debug(
             "...posting %d ads from %s (process_schedd)",
diff --git a/src/htcondor_es/queues.py b/src/htcondor_es/queues.py
index 4642a10..40d9b97 100755
--- a/src/htcondor_es/queues.py
+++ b/src/htcondor_es/queues.py
@@ -12,6 +12,7 @@
 import htcondor
 
 import htcondor_es.es
+import htcondor_es.vm
 import htcondor_es.amq
 from htcondor_es.utils import send_email_alert, time_remaining, TIMEOUT_MINS
 from htcondor_es.convert_to_json import convert_to_json
@@ -317,6 +318,17 @@ def _callback_amq(result):
             )
             futures.append(("UPLOADER_AMQ", future))
 
+        if args.feed_vm and not args.read_only:
+            data_bunch = [
+                (id_, convert_dates_to_millisecs(dict_ad)) for id_, dict_ad in bunch
+            ]
+            vm_attrs = [a.strip() for a in args.vm_attrs.split(',')]
+            future = upload_pool.apply_async(
+                htcondor_es.vm.post_ads,
+                args=(args.vm_url, data_bunch, metadata, vm_attrs)
+            )
+            futures.append(("UPLOADER_VM", future))
+
         logging.info("Starting new uploader, %d items in queue" % output_queue.qsize())
 
     listener.join()
diff --git a/src/htcondor_es/vm.py b/src/htcondor_es/vm.py
new file mode 100644
index 0000000..b5c2d87
--- /dev/null
+++ b/src/htcondor_es/vm.py
@@ -0,0 +1,34 @@
+import os
+import time
+import logging
+import requests
+
+
+def filter_attrs(ads, attrs=None):
+    "Filter attributes from given class ad"
+    doc = {}
+    if attrs:
+        if isinstance(attrs, str):
+            attrs = [attrs]
+        for attr in attrs:
+            if attr in ads:
+                doc[attr] = ads.get(attr)
+        return doc
+    # default mode
+    return doc
+
+def post_ads(url, ads, metadata=None, vm_attrs=None):
+    "Post classAds docs into VM url"
+    if not len(ads):
+        logging.warning("No new documents found")
+        return
+
+    starttime = time.time()
+    failed = []
+    data = [filter_attrs(ad, vm_attrs) for _, ad in ads]
+    req = requests.post(url, data)
+    if req.status_code != 200:
+        failed = data
+        logging.warning("Failed to send to {}, reason: {}".format(url, req.reason))
+    elapsed = time.time() - starttime
+    return (len(ads) - len(failed), len(ads), elapsed)
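
Usage sketch (not part of the patch): the snippet below shows how the new module and the --feed_vm/--vm_attrs/--vm_url flags are expected to fit together. Only the post_ads()/filter_attrs() signatures and the default --vm_url value come from the diff above; the sample ClassAd and its attribute names are illustrative assumptions.

# Illustrative sketch only: the sample ad and attribute names are made up;
# filter_attrs(), post_ads() and the default --vm_url come from the patch.
from htcondor_es.vm import filter_attrs, post_ads

# vm_attrs as spider_cms.py would derive it from "--vm_attrs Status,Site,WallClockHr"
vm_attrs = [a.strip() for a in "Status,Site,WallClockHr".split(",")]

# (id, classad-dict) pairs, mirroring the shape built in history.py/queues.py
ads = [
    ("history-12345.0", {"Status": "Completed", "Site": "T2_CH_CERN", "WallClockHr": 2.5}),
]

# filter_attrs keeps only the requested attributes from a single ad dict
print(filter_attrs(ads[0][1], vm_attrs))

# post_ads returns (n_sent, n_total, elapsed_seconds); the default endpoint is
# assumed to be reachable only from inside the CERN/CMS monitoring network
try:
    sent, total, elapsed = post_ads("http://cms-monitoring.cern.ch:30422", ads, vm_attrs=vm_attrs)
    print("sent %d/%d docs in %.2fs" % (sent, total, elapsed))
except Exception as exc:
    print("VM endpoint not reachable from here: %s" % exc)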