
# This Starlark processor is used when loading Sparkplug B protobuf
# messages into InfluxDB.  The data source is an Opto 22 Groov EPIC controller.
#
# This processor does the following:
#   - Resolves the metric name using a numeric alias.
#     When the EPIC MQTT client is started it sends a DBIRTH message 
#     that lists all metrics configured on the controller and includes 
#     a sequential numeric alias to reference it by.  
#     This processor stores that information in the dictionary state["aliases"].
#
#     When subsequent DDATA messages are published, the numeric alias is 
#     used to look up the stored metric name in state["aliases"].
#  - Splits the MQTT topic into 5 fields which can be used as tags in InfluxDB.
#  - Splits the metric name into up to 7 fields which are used as tags in InfluxDB.
#  - Deletes the host and topic tags and the type, name and alias fields
#
# TODO:
#   The requirement that a DBIRTH message has to be received before DDATA messages
#   can be used creates a significant reliability issue and a debugging mess.
#   I have to go into the Groov EPIC controller and restart the MQTT client every time
#   I restart the telegraf loader.  This has caused many hours of needless frustration.
#   
#   I see three possible solutions:
#      - Opto 22 changes their software making it optional to drop the alias 
#        and simply include the name in the DDATA messages.  In my case it's never more
#        than 15 characters.  This is the simplest and most reliable solution.
#      - Make a system call from telegraf that uses SSH to remotely restart the MQTT client.
#      - Have telegraf send a message through MQTT requesting a DBIRTH message from the EPIC Controller.
#
# Example Input:
# edge,host=firefly,topic=spBv1.0/SF/DDATA/epiclc/Exp501 type=9i,value=22.247711,alias=10i 1626475876000000000
# edge,host=firefly,topic=spBv1.0/SF/DDATA/epiclc/Exp501 alias=10i,type=9i,value=22.231323 1626475877000000000
# edge,host=firefly,topic=spBv1.0/SF/DBIRTH/epiclc/Exp501 type=9i,name="Strategy/IO/I_Ch_TC_Right",alias=9i 1626475880000000000
# edge,host=firefly,topic=spBv1.0/SF/DBIRTH/epiclc/Exp501 value=22.200958,name="Strategy/IO/I_Ch_TC_Top_C",type=9i,alias=10i 1626475881000000000
# edge,host=firefly,topic=spBv1.0/SF/DDATA/epiclc/Exp501 alias=10i,type=9i,value=22.177643 1626475884000000000
# edge,host=firefly,topic=spBv1.0/SF/DDATA/epiclc/Exp501 type=9i,value=22.231903,alias=10i 1626475885000000000
# edge,host=firefly,topic=spBv1.0/SF/DDATA/epiclc/Exp501 value=22.165192,alias=10i,type=9i 1626475895000000000
# edge,host=firefly,topic=spBv1.0/SF/DDATA/epiclc/Exp501 alias=10i,type=9i,value=22.127106 1626475896000000000
#
# Example Output:
# C,Component=Ch,Datatype=IO,Device=TC,EdgeID=epiclc,Experiment=Exp501,Metric=I_Ch_TC_Top_C,MsgType=DBIRTH,Position=Top,Reactor=SF,Source=Strategy value=22.200958 1626475881000000000
# C,Component=Ch,Datatype=IO,Device=TC,EdgeID=epiclc,Experiment=Exp501,Metric=I_Ch_TC_Top_C,MsgType=DDATA,Position=Top,Reactor=SF,Source=Strategy value=22.177643 1626475884000000000
# C,Component=Ch,Datatype=IO,Device=TC,EdgeID=epiclc,Experiment=Exp501,Metric=I_Ch_TC_Top_C,MsgType=DDATA,Position=Top,Reactor=SF,Source=Strategy value=22.231903 1626475885000000000
# C,Component=Ch,Datatype=IO,Device=TC,EdgeID=epiclc,Experiment=Exp501,Metric=I_Ch_TC_Top_C,MsgType=DDATA,Position=Top,Reactor=SF,Source=Strategy value=22.165192 1626475895000000000
# C,Component=Ch,Datatype=IO,Device=TC,EdgeID=epiclc,Experiment=Exp501,Metric=I_Ch_TC_Top_C,MsgType=DDATA,Position=Top,Reactor=SF,Source=Strategy value=22.127106 1626475896000000000

#############################################
# The following is the telegraf.conf used when calling this processor

# [[inputs.mqtt_consumer]]
#   servers = ["tcp://your_server:1883"]
#   qos = 0
#   connection_timeout = "30s"
#   topics = ["spBv1.0/#"]
#   persistent_session = false
#   client_id = ""
#   username = "your username"
#   password = "your password"
# 
#   # Sparkplug protobuf configuration
#   data_format = "xpath_protobuf"
#    
#   # URL of sparkplug protobuf prototype
#   xpath_protobuf_type = "org.eclipse.tahu.protobuf.Payload"
#    
#   # Location of sparkplug_b.proto file
#   xpath_protobuf_file = "/apps/telegraf/config/sparkplug_b.proto"
# 
#   [[inputs.mqtt_consumer.xpath_protobuf]]
#     metric_selection = "metrics[not(template_value)]"
#     metric_name = "concat('edge', substring-after(name, ' '))"
#     timestamp = "timestamp"
#     timestamp_format = "unix_ms"
#     [inputs.mqtt_consumer.xpath_protobuf.tags]
#       name = "substring-after(name, ' ')"
#     [inputs.mqtt_consumer.xpath_protobuf.fields_int]
#       type = "datatype"
#       alias = "alias"
#     [inputs.mqtt_consumer.xpath_protobuf.fields]
#       # A metric value must be numeric
#       value = "number((int_value | long_value | float_value | double_value | boolean_value))"
#       name = "name"
# 
# # Starlark processor
# [[processors.starlark]]
#   script = "sparkplug.star"
# 
#   # Optionally define constants used in sparkplug.star
#   # Constants can be defined here or they can be defined in the 
#   # sparkplug.star file.
#
#   [processors.starlark.constants]     
#
#     # NOTE: The remaining fields can be specified either here or in the starlark script.
#     
#     # Tags used to identify message type - 3rd field of topic
#     BIRTH_TAG = "BIRTH/"
#     DEATH_TAG = "DEATH/"
#     DATA_TAG = "DATA/"
# 
#     # Number of messages to hold if alias cannot be resolved 
#     MAX_UNRESOLVED = 3
# 
#     # Provide alternate names for the 5 sparkplug topic fields.  
#     # The topic contains 5 fields separated by the '/' character.  
#     # Define the tag name for each of these fields.
#     MSG_FORMAT = "false"        #0
#     GROUP_ID   = "reactor"      #1
#     MSG_TYPE   = "false"        #2
#     EDGE_ID    = "edgeid"       #3
#     DEVICE_ID  = "experiment"   #4
#

# Substrings searched for in the MQTT topic to classify a message.
# apply() tests them with the 'in' operator in the order DEATH, BIRTH, DATA.
BIRTH_TAG = "BIRTH/"
DEATH_TAG = "DEATH/"
DATA_TAG  = "DATA/"
  
# Number of DATA metrics to hold while their alias cannot be resolved.
# When the buffer is full, apply() releases held metrics unmodified.
MAX_UNRESOLVED = 3

# Provide alternate names for the 5 sparkplug topic fields.
# The topic contains 5 fields separated by the '/' character.
# Define the tag name for each of these fields.
# Setting a constant to the string "false" suppresses that tag
# (see buildTopicTags).
MSG_FORMAT = "false"        #0 - topic field index
GROUP_ID   = "Reactor"      #1
MSG_TYPE   = "MsgType"      #2
EDGE_ID    = "EdgeID"       #3
DEVICE_ID  = "Experiment"   #4
 
########### Begin sparkplug.star script


load("logging.star", "log")

# Data kept between calls to apply():
#   aliases    - maps "<edgeid>/<alias>" to the metric name
#   devices    - not referenced by the functions in this script
#   unresolved - DATA metrics held until their alias can be resolved
state = {
    "aliases": {},
    "devices": {},
    "unresolved": [],
}

def extractTopicTags(metric):
    # Split the "topic" tag of the metric on '/' and return exactly five
    # entries: [msg_format, groupid, msg_type, edgeid, deviceid].
    # Entries missing from the topic are returned as empty strings and
    # anything after the fifth entry is ignored.
    parts = metric.tags.get("topic", "").split("/")
    count = len(parts)
    return [parts[i] if i < count else '' for i in range(5)]
   
                
def buildTopicTags(metric, topicFields):
    # Replace the raw topic and host tags with one tag per topic field.
    #
    #   metric      - metric to modify in place
    #   topicFields - list of the 5 topic fields in the order
    #                 [msg_format, groupid, msg_type, edgeid, deviceid]
    #
    # Remove topic and host tags - they are not useful for analysis.
    # A default is passed to pop() so that a metric lacking one of these tags
    # (for example when the agent omits the host tag) does not abort the script.
    metric.tags.pop("topic", None)
    metric.tags.pop("host", None)

    # Each constant holds the tag name to use for that topic field;
    # the string "false" means the field is not emitted as a tag.
    if MSG_FORMAT != "false": metric.tags[MSG_FORMAT] = topicFields[0]
    if GROUP_ID   != "false": metric.tags[GROUP_ID]   = topicFields[1]
    if MSG_TYPE   != "false": metric.tags[MSG_TYPE]   = topicFields[2]
    if EDGE_ID    != "false": metric.tags[EDGE_ID]    = topicFields[3]
    if DEVICE_ID  != "false": metric.tags[DEVICE_ID]  = topicFields[4]


def buildNameTags(metric,name):
    # Derive tags, and where possible the measurement name, from a
    # Groov EPIC metric name.
    #
    #   metric - metric to modify in place
    #   name   - full metric name, e.g. "Strategy/IO/I_Ch_TC_Top_C"
    #
    # Remove type, alias and name from metric.fields - They are not useful for analysis.
    # A default is passed to pop() so a missing field does not abort the script.
    metric.fields.pop("type", None)
    metric.fields.pop("alias", None)
    metric.fields.pop("name", None)

    # The Groov EPIC metric names are comprised of 3 fields separated by a '/'
    #   source, datatype, and metric name
    # Extract these fields and include them as tags.
    fields = name.split('/')
    nfields = len(fields)
    if nfields > 0: 
        metric.tags["Source"] = fields[0]
    if nfields > 1:
        metric.tags["Datatype"] = fields[1]
    if nfields > 2: 
        metric.tags["Metric"] = fields[2]

        # OPTIONAL
        #
        # By using underscore characters the metric name can be further
        # divided into additional tags.  
        # How this is defined is site specific.  
        # Customize this as you wish
        #
        # The following demonstrates dividing the metric name into additional tags.
        # A metric name must have between 3 and 6 underscore separated fields
        # to be split.
        #
        # If there are only one or two fields then no additional tags are
        # created (the 'Metric' tag holds the full name) and the measurement
        # is named "Name".
        #
        # The last field is Units and is filled before fields 3, 4 and 5
        # Ex: C, V, Torr, W, psi, RPM, On....
        # The units are used in Influx as the 'measurement' name.
        #
        # Fields 3, 4 and 5 (device, position, composition) are optional
        #    measurement_component_device_position_composition_units
        #
        # Ex:  I_FuelTank1_C                    (3 fields) 
        #         Measurement   I
        #         Component     FuelTank1   
        #         Units         C
        #
        #      I_FuelTank1_TC_Outlet_C          (5 fields)           
        #         Measurement   I
        #         Component     FuelTank1   
        #         Device        TC
        #         Position      Outlet
        #         Units         C
        #
        #      I_FuelTank1_TC_Outlet_Premium_C  (6 fields) 
        #         Measurement   I
        #         Component     FuelTank1   
        #         Device        TC  
        #         Position      Outlet
        #         Composition   Premium   
        #         Units         C

        # Split the metric name into fields using '_' 
        sfields = fields[2].split('_')
        nf = len(sfields)
        # Don't split the name if it's one or two fields 
        if nf <= 2:
            metric.name = "Name"
        if nf > 2:
            metric.name = sfields[nf-1]     # The Units are used for the metric name
            metric.tags["Component"] = sfields[1]
        if nf > 3:
            metric.tags["Device"] = sfields[2]
        if nf > 4:
            metric.tags["Position"] = sfields[3]
        if nf > 5:
            metric.tags["Composition"] = sfields[4]

def apply(metric):
    # Entry point called for every metric.
    #
    # Returns the (possibly rewritten) metric, a list of metrics, or None
    # to drop the metric:
    #   - topic contains DEATH_TAG: the metric is dropped.
    #   - topic contains BIRTH_TAG: the alias -> name mapping is recorded.
    #     The metric is tagged and returned if it carries a value, otherwise
    #     dropped.  Held metrics waiting for this alias are tagged and
    #     returned with it.  A birth metric without both alias and name is
    #     returned unchanged.
    #   - topic contains DATA_TAG: the alias is resolved to a name and the
    #     metric is tagged.  If the alias is unknown the metric is held in
    #     state["unresolved"].  A data metric without an alias is dropped.
    #   - anything else is returned unchanged.
    output = metric

    log.debug("apply metric: {}".format(metric))

    topic = metric.tags.get("topic", "")
    topicFields = extractTopicTags(metric)
    edgeid = topicFields[3]      # Sparkplug spec specifies 4th field as edgeid

    if DEATH_TAG in topic:
        output = None
    elif BIRTH_TAG in topic:
        log.debug("    metric msg_type: {}    edgeid: {}   topic: {}".format(BIRTH_TAG, edgeid, topic))
        if "alias" in metric.fields and "name" in metric.fields:
            # Create the lookup-table using "${edgeid}/${alias}" as the key and "${name}" as value
            alias = metric.fields.get("alias")
            name = metric.fields.get("name")
            key = "{}/{}".format(edgeid,alias)
            log.debug("  --> setting alias: {}    name: {}   id: {}'".format(alias, name, key))
            state["aliases"][key] = name

            # A birth metric is only worth emitting when it carries a value
            emitBirth = "value" in metric.fields
            if emitBirth:
                buildTopicTags(metric, topicFields)
                buildNameTags(metric, name)
            else:
                output = None

            # Try to resolve the unresolved if any
            if len(state["unresolved"]) > 0:
                log.debug("    unresolved")
                matching = []
                remaining = []
                for held in state["unresolved"]:
                    # Build the key from the held metric's OWN topic; using the
                    # edge id of the current birth message would match metrics
                    # belonging to a different edge node.
                    heldFields = extractTopicTags(held)
                    heldKey = "{}/{}".format(heldFields[3], held.fields["alias"])
                    if heldKey == key:
                        matching.append((heldFields, held))
                    else:
                        remaining.append(held)
                state["unresolved"] = remaining

                log.debug("    found {} matching unresolved metrics".format(len(matching)))
                if len(matching) > 0:
                    # Tag the held metrics with the now-known name and emit
                    # them ahead of the birth metric instead of discarding them.
                    resolved = []
                    for heldFields, held in matching:
                        buildTopicTags(held, heldFields)
                        buildNameTags(held, name)
                        resolved.append(held)
                    if emitBirth:
                        resolved.append(metric)
                    output = resolved

    elif DATA_TAG in topic:
        log.debug("    metric msg_type: {}    edgeid: {}   topic: {}".format(DATA_TAG, edgeid, topic))
        if "alias" in metric.fields:
            alias = metric.fields.get("alias")

            # Lookup the ID. If we know it, replace the name of the metric with the lookup value,
            # otherwise we need to keep the metric for resolving later. 
            # This can happen if the messages are out-of-order for some reason...
            key = "{}/{}".format(edgeid,alias)
            if key in state["aliases"]:
                name = state["aliases"][key]
                log.debug("    found alias: {}     name: {}".format(alias, name))
                buildTopicTags(metric,topicFields)
                buildNameTags(metric,name)
            else:
                # We want to hold the metric until we get the corresponding birth message
                log.debug("    id not found: {}".format(key))
                output = None
                if len(state["unresolved"]) >= MAX_UNRESOLVED:
                    log.warn("    metric overflow, trimming {}".format(len(state["unresolved"]) - MAX_UNRESOLVED+1))
                    # Release the unresolved metrics as raw and trim buffer
                    output = state["unresolved"][MAX_UNRESOLVED-1:]
                    state["unresolved"] = state["unresolved"][:MAX_UNRESOLVED-1]
                log.debug("    --> keeping metric")
                # Store an independent copy rather than the argument itself.
                # NOTE(review): Telegraf is believed to reuse the metric object
                # passed to apply() between calls - confirm against the
                # Starlark processor documentation for deepcopy().
                state["unresolved"].append(deepcopy(metric))
        else:
            output = None

    return output

