BookmarkSubscribeRSS Feed

Streaming weather data into SAS Event Stream Processing from Azure

Started ‎01-11-2021 by
Modified ‎02-22-2021 by
Views 6,035

Companies who are using Microsoft Azure's IoT platform to implement their IoT strategy will likely require the advanced analytic capabilities provided by SAS.  Connecting IoT devices to the cloud is an important first step in any IoT strategy.  However, it is important to leverage the data collected to drive business transformations.  The data collected must be explored, analyzed and modelled.  The resulting models must be then deployed so that machine learning can be applied to the live streaming data.  For example, let's say you would like to predict flooding events for streams in your community.  Weather information is a key ingredient into the prediction of flooding events.  Therefore, in this article we are going to talk about how to query weather data from the Azure Maps Weather Service via a logic application.  The logic application will also post this information to an Azure Event hub so that is can be consumed by SAS Event Stream Processing via a Kafka connector.

 

Overview

In this article  we will configure an Azure Logic application to query the Azure Maps Weather service once an hour.  The logic app will then format the data returned from the weather API into the required JSON string and send it to an Azure Event Hub. Once in the Event Hub, it is available to any application running a Kafka listener.  An ESP project will be created which will retrieve these events from the Event Hub using an ESP Kafka connector.  Once in ESP a function window will be used to parse the incoming JSON into an ESP schema.

 

Live Weather Data

 

For this example we will work with the Azure Maps Weather service which will return a JSON message in response to an API call. Please follow the directions described on the Azure Maps web page to request a subscription key which will be used to request weather data. An example API call might be as follows:

https://atlas.microsoft.com/weather/currentConditions/json?api-version=1.0&query=47.60357,-122.32945&subscription-key={Azure-Maps-Primary-Subscription-key}

 

 Which would generate a JSON response such as this:  

{
"results": [
    {
        "dateTime": "2020-10-19T20:39:00+00:00",
        "phrase": "Cloudy",
        "iconCode": 7,
        "hasPrecipitation": false,
        "isDayTime": true,
        "temperature": {
            "value": 12.4,
            "unit": "C",
            "unitType": 17
        },
        "realFeelTemperature": {
            "value": 13.7,
            "unit": "C",
            "unitType": 17
        },
        "realFeelTemperatureShade": {
            "value": 13.7,
            "unit": "C",
            "unitType": 17
        },
        "relativeHumidity": 87,
        "dewPoint": {
            "value": 10.3,
            "unit": "C",
            "unitType": 17    ...

 

Logic Application 

 

Consider the following logic application flow: 

 

Logic App FlowLogic App Flow

At a high level there are three parts to this flow. First is a timer which triggers the flow once an hour.  Next variables are initialized.  Lastly, we loop through a list of locations and issue a weather API call for each.  Since Azure is a code first environment you can easily recreate this logic application in your Azure tenant by cloning the provided example from GitHub (entire code block not listed inline for brevity).

 

{
    "definition": {
        "$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#",
        "actions": {
            "Initialize_array_of_locations": {
                "inputs": {
                    "variables": [
                        {
                            "name": "weather_locations",
                            "type": "array",
                            "value": [
                                "35.79, -78.78",
                                "35.78, -78.75"
                            ]
                        }
                    ]
                },
.................

 

Once the logic app example is cloned into your tenant you will need to customize it to contain your location and API key.

 

Locations

 

Edit the location array and change these values to your latitude and longitude.

{
    "name": "weather_locations",
    "type": "array",
    "value": [
        "35.79, -78.78",
        "35.78, -78.75"
    ]
}

 

Weather API key

 

Also edit the following key to match your subscription: 

"subscription-key": "ssRYdF8oylxr-B5_N0a7SUwOHlbGk6TsesTnV53Y"

 

Event Hub

 

Next we need to configure an Event Hub to process events. Azure uses the term Event Hub to mean both the cluster and the topic and it gets confusing so let's go over the conceptual mapping of terms between Kafka and Event Hub.

Event Hub to Kafka mappingEvent Hub to Kafka mapping

When you create a Kafka environment it is created by defining broker and zookeeper nodes to create a cluster.  Azure does not expose the zookeeper concept. Therefore, creating our Kafka cluster will simply consist of creating a namespace and an event hub.  This corresponds to creating a cluster and a topic in Kafka.  First create the namespace.

 

Create NamespaceCreate Namespace

Next create the event hub or topic.

 

Create topicCreate topic

Max message retention is 7 days. This means you will be able to replay 7 days' worth of messages from the event hub if needed.  Increasing the partition count will increase message throughput. Since we are only producing 1 message per hour I will leave this at 1.

 

Connecting the Logic App to the Event Hub

 

Now that we have 2 Azure resources we need to connect them together so that messages flow between them.  Navigate back to the logic application created earlier and edit the Send event action.  Create a new connection using the namespace and event hub that was just created.

 

Send EventSend Event 

Now your logic app is actively issuing calls to the weather API, parsing the responses into a JSON string and sending that information to the Event Hub.  Next we configure the ESP project to retrieve the messages from the Event Hub.

 

ESP Project

 

Our ESP project will consist of 2 windows.  First will be a source window which will contain our Kafka connector.  Source windows provide a means for ESP to ingest data.  Next will be a Function window which will parse the JSON string into the schema.

 

ESP ProjectESP Project

Kafka Connector

 

In this example I have set up 2 Kafka connectors. The first will keep track of message offsets and retrieve only the latest message which has not been previously read.  The second will read all 7 days' worth of messages that are stored in the Kafka topic.  The difference is between needing current or historical data.

 

ConnectorsConnectors

 

Let's take a closer look at the project XML to see how a connector is configured.

 

<connector class="kafka" name="kafkain7days" active="false">
      <description><![CDATA[Grabs 7 days' worth of data]]></description>
      <properties>
        <property name="type"><![CDATA[pub]]></property>
        <property name="kafkatype"><![CDATA[opaquestring]]></property>
        <property name="kafkainitialoffset"><![CDATA[smallest]]></property>
        <property name="urlhostport"><![CDATA[notusedwhennotHA]]></property>
        <property name="kafkahostport"><![CDATA[Namesspace-weather-catcher.servicebus.usgovcloudapi.net:9093]]></property>
        <property name="kafkatopic"><![CDATA[eh_weather_catcher]]></property>
        <property name="kafkapartition"><![CDATA[-1]]></property>
        <property name="kafkaglobalconfig"><![CDATA[security.protocol=SASL_SSL;sasl.username=$ConnectionString;sasl.password=Endpoint=sb://namesspace-weather-catcher.servicebus.usgovcloudapi.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=J5qKcs1mdF7vTbJZ7FNsNZBxlfsAHa3uLzuqI3Yo=;sasl.mechanism=PLAIN;ssl.ca.location=/etc/pki/tls/cert.pem]]></property>
      </properties>
</connector>

 

The namespace and event hub defined previously are mapped to their Kafka counterparts in the connector.

  • type pub indicates messages are published to this ESP
  • opaquestring indicates all message content is contained in one string variable
  • kafkainitialoffset set to smallest will get every message available from this broker
  • urlhostport is normally set to the zookeeper node which is unavailable in Azure
  • kafkahostport refers to the Kafka broker URL
  • topic and event hub are synonymous
  • kafkapartition = -1 means retrieve messages from any available partition
  • kafkaglobalconfig defines the security and access used to talk to the broker

You must retrieve the sasl.password from the namespace defined in Azure. First navigate to the namespace defined earlier and then click Shared access policies. Copy the connection string-primary key into the ESP project. For example:

 

NS_connection_string.png

 JSON Decoding via ESP Function Window

 

Since we parsed the weather data responses in the Azure Logic Application the JSON messages returned from the Event Hub contain only the information we require.  All we need to do now is parse the variables into the ESP schema.  The function windows is defined as follows:

 

<window-functional pubsub="true" index="pi_EMPTY" name="func_parse_json">
  <description><![CDATA[Parse incoming json into fields.]]></description>
  <schema>
    <fields>
      <field name="index_esp" type="int64" key="true"/>
      <field name="deviceID" type="string"/>
      <field name="timestamp" type="stamp"/>
      <field name="Latitude" type="string"/>
      <field name="Longitude" type="string"/>
      <field name="humidity" type="int32"/>
      <field name="rain" type="double"/>
      <field name="rainplus1" type="double"/>
      <field name="temperature" type="double"/>
      <field name="windspeed" type="double"/>
    </fields>
  </schema>
  <function-context>
    <properties>
      <property-json name="jsonData"><![CDATA[$message]]></property-json>
    </properties>
    <functions>
      <function name="deviceID"><![CDATA[json(#jsonData,'deviceID')]]></function>
      <function name="timestamp"><![CDATA[json(#jsonData,'timestamp')]]></function>
      <function name="Latitude"><![CDATA[json(#jsonData,'Latitude')]]></function>
      <function name="Longitude"><![CDATA[json(#jsonData,'Longitude')]]></function>
      <function name="humidity"><![CDATA[json(#jsonData,'humidity')]]></function>
      <function name="location"><![CDATA[json(#jsonData,'location')]]></function>
      <function name="rain"><![CDATA[product(json(#jsonData,'rain'),.0393700787401575)]]></function>
      <function name="rainplus1"><![CDATA[product(json(#jsonData,'rainplus1'),.0393700787401575)]]></function>
      <function name="temperature"><![CDATA[json(#jsonData,'temperature')]]></function>
      <function name="windspeed"><![CDATA[json(#jsonData,'windspeed')]]></function>
    </functions>
  </function-context>
</window-functional>

 

Writing Messages to an Event Hub

 

Now let's say we would like to return messages to an output topic called eh_weather_slinger from ESP.  To do that simply create another event hub or topic called eh_weather_slinger in the namespace called namesspace-weather-catcher as described above.  ESP connectors which publish data outbound are denoted with the type sub which is short for subscription.  We will also change the kafkatopic property to identify our newly created outbound topic or event hub.
Our connector description will be as follows: 

 

<connector class="kafka" name="kafkaoutbound" active="false">
      <description><![CDATA[Publish data to outbound topic ]]></description>
      <properties>
        <property name="type"><![CDATA[sub]]></property>
        <property name="kafkatype"><![CDATA[opaquestring]]></property>
        <property name="kafkahostport"><![CDATA[Namesspace-weather-catcher.servicebus.usgovcloudapi.net:9093]]></property>
        <property name="kafkatopic"><![CDATA[eh_weather_slinger]]></property>
        <property name="kafkaglobalconfig"><![CDATA[security.protocol=SASL_SSL;sasl.username=$ConnectionString;sasl.password=Endpoint=sb://namesspace-weather-catcher.servicebus.usgovcloudapi.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=J5qKcs1mdF2rZ7vTbJZ7FNsNZBxlfsAHa3uLzuqI3Yo=;sasl.mechanism=PLAIN;ssl.ca.location=/etc/pki/tls/cert.pem]]></property>
      </properties>
</connector>

 

What have we learned?  

 

  • How to drive messages into an Azure Event hub from a logic application
  • How to use the ESP Kafka connector to connect to an Azure event hub
  • How to use a function window to transpose a JSON message into ESP variables
  • How to write data back to the event hub or topic

 

 This is the first step in gathering weather forecast data for your streaming machine learning project.  Next add some sensor data and some trained machine learning models and start predicting flooding before it happens.  

Version history
Last update:
‎02-22-2021 03:51 PM
Updated by:
Contributors

SAS Innovate 2025: Call for Content

Are you ready for the spotlight? We're accepting content ideas for SAS Innovate 2025 to be held May 6-9 in Orlando, FL. The call is open until September 16. Read more here about why you should contribute and what is in it for you!

Submit your idea!

Free course: Data Literacy Essentials

Data Literacy is for all, even absolute beginners. Jump on board with this free e-learning  and boost your career prospects.

Get Started

Article Tags