Companies that use Microsoft Azure's IoT platform to implement their IoT strategy will likely require the advanced analytic capabilities provided by SAS. Connecting IoT devices to the cloud is an important first step in any IoT strategy. However, it is equally important to leverage the collected data to drive business transformations. The data must be explored, analyzed and modelled, and the resulting models must then be deployed so that machine learning can be applied to the live streaming data. For example, let's say you would like to predict flooding events for streams in your community. Weather information is a key ingredient in predicting flooding events. Therefore, in this article we will look at how to query weather data from the Azure Maps Weather service via a logic application. The logic application will also post this information to an Azure Event Hub so that it can be consumed by SAS Event Stream Processing (ESP) via a Kafka connector.
Overview
In this article we will configure an Azure Logic application to query the Azure Maps Weather service once an hour. The logic app will then format the data returned from the weather API into the required JSON string and send it to an Azure Event Hub. Once in the Event Hub, the data is available to any application running a Kafka listener. We will then create an ESP project that retrieves these events from the Event Hub using an ESP Kafka connector. Once in ESP, a function window will be used to parse the incoming JSON into an ESP schema.
Live Weather Data
For this example we will work with the Azure Maps Weather service, which returns a JSON message in response to an API call. Follow the directions on the Azure Maps web page to request a subscription key, which is used to authenticate your weather data requests. An example API call looks like this:
https://atlas.microsoft.com/weather/currentConditions/json?api-version=1.0&query=47.60357,-122.32945&subscription-key={Azure-Maps-Primary-Subscription-key}
This call generates a JSON response such as the following:
{
  "results": [
    {
      "dateTime": "2020-10-19T20:39:00+00:00",
      "phrase": "Cloudy",
      "iconCode": 7,
      "hasPrecipitation": false,
      "isDayTime": true,
      "temperature": {
        "value": 12.4,
        "unit": "C",
        "unitType": 17
      },
      "realFeelTemperature": {
        "value": 13.7,
        "unit": "C",
        "unitType": 17
      },
      "realFeelTemperatureShade": {
        "value": 13.7,
        "unit": "C",
        "unitType": 17
      },
      "relativeHumidity": 87,
      "dewPoint": {
        "value": 10.3,
        "unit": "C",
        "unitType": 17 ...
Logic Application
Consider the following logic application flow:
At a high level, there are three parts to this flow. First, a timer triggers the flow once an hour. Next, variables are initialized. Lastly, we loop through a list of locations and issue a weather API call for each. Since Azure is a code-first environment, you can easily recreate this logic application in your Azure tenant by cloning the provided example from GitHub (the entire code block is not listed inline for brevity).
{
  "definition": {
    "$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#",
    "actions": {
      "Initialize_array_of_locations": {
        "inputs": {
          "variables": [
            {
              "name": "weather_locations",
              "type": "array",
              "value": [
                "35.79, -78.78",
                "35.78, -78.75"
              ]
            }
          ]
        },
        .................
Once the logic app example is cloned into your tenant, you will need to customize it with your locations and API key.
Locations
Edit the location array and change these values to your latitude and longitude.
{
  "name": "weather_locations",
  "type": "array",
  "value": [
    "35.79, -78.78",
    "35.78, -78.75"
  ]
}
Weather API key
Also edit the following key to match your subscription:
"subscription-key": "ssRYdF8oylxr-B5_N0a7SUwOHlbGk6TsesTnV53Y"
Event Hub
Next, we need to configure an Event Hub to process events. Azure uses the term Event Hub to mean both the cluster and the topic, which can be confusing, so let's go over how Kafka terms map to Event Hubs terms.
A Kafka environment is created by defining broker and ZooKeeper nodes to form a cluster. Azure does not expose the ZooKeeper concept, so creating our Kafka cluster simply consists of creating a namespace and an event hub; these correspond to a Kafka cluster and a Kafka topic, respectively. First, create the namespace.
Next create the event hub or topic.
Maximum message retention is 7 days, which means you will be able to replay 7 days' worth of messages from the event hub if needed. Increasing the partition count increases message throughput; since we are only producing 1 message per hour, I will leave it at 1.
Connecting the Logic App to the Event Hub
Now that we have two Azure resources, we need to connect them so that messages flow between them. Navigate back to the logic application created earlier and edit the Send event action. Create a new connection using the namespace and event hub that were just created.
Now your logic app is actively issuing calls to the weather API, parsing the responses into a JSON string and sending that information to the Event Hub. Next we configure the ESP project to retrieve the messages from the Event Hub.
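Before building the ESP project, it can be useful to confirm that events are actually arriving in the Event Hub. Because Event Hubs exposes a Kafka-compatible endpoint on port 9093, any Kafka client can read the topic. The following is a minimal sketch using the Python confluent-kafka package; it reuses the namespace, topic and connection-string settings that appear in the ESP connector later in this article, and the consumer group name is a made-up value for this test.

from confluent_kafka import Consumer

# Assumption: same namespace, topic and connection string used in the ESP connector below.
BOOTSTRAP = "Namesspace-weather-catcher.servicebus.usgovcloudapi.net:9093"
CONNECTION_STRING = ("Endpoint=sb://namesspace-weather-catcher.servicebus.usgovcloudapi.net/;"
                     "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<your-key>")

consumer = Consumer({
    "bootstrap.servers": BOOTSTRAP,
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "$ConnectionString",        # literal string required by Event Hubs
    "sasl.password": CONNECTION_STRING,
    "ssl.ca.location": "/etc/pki/tls/cert.pem",  # same CA path used in the connector config
    "group.id": "weather-debug",                 # hypothetical consumer group for this test
    "auto.offset.reset": "earliest",             # replay retained messages from the beginning
})
consumer.subscribe(["eh_weather_catcher"])

try:
    while True:
        msg = consumer.poll(timeout=5.0)
        if msg is None:
            continue
        if msg.error():
            print("Consumer error:", msg.error())
            continue
        print(msg.value().decode("utf-8"))       # the JSON string produced by the logic app
finally:
    consumer.close()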
ESP Project
Our ESP project consists of two windows. The first is a Source window, which contains our Kafka connector; Source windows provide a means for ESP to ingest data. The second is a function window, which parses the JSON string into the ESP schema.
Kafka Connector
In this example I have set up two Kafka connectors. The first keeps track of message offsets and retrieves only the messages that have not been previously read. The second reads all 7 days' worth of messages stored in the Kafka topic. Choose between them depending on whether you need current or historical data.
Let's take a closer look at the project XML to see how a connector is configured.
<connector class="kafka" name="kafkain7days" active="false">
  <description><![CDATA[Grabs 7 days' worth of data]]></description>
  <properties>
    <property name="type"><![CDATA[pub]]></property>
    <property name="kafkatype"><![CDATA[opaquestring]]></property>
    <property name="kafkainitialoffset"><![CDATA[smallest]]></property>
    <property name="urlhostport"><![CDATA[notusedwhennotHA]]></property>
    <property name="kafkahostport"><![CDATA[Namesspace-weather-catcher.servicebus.usgovcloudapi.net:9093]]></property>
    <property name="kafkatopic"><![CDATA[eh_weather_catcher]]></property>
    <property name="kafkapartition"><![CDATA[-1]]></property>
    <property name="kafkaglobalconfig"><![CDATA[security.protocol=SASL_SSL;sasl.username=$ConnectionString;sasl.password=Endpoint=sb://namesspace-weather-catcher.servicebus.usgovcloudapi.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=J5qKcs1mdF7vTbJZ7FNsNZBxlfsAHa3uLzuqI3Yo=;sasl.mechanism=PLAIN;ssl.ca.location=/etc/pki/tls/cert.pem]]></property>
  </properties>
</connector>
The namespace and event hub defined previously are mapped to their Kafka counterparts in the connector.
You must retrieve the sasl.password value from the namespace defined in Azure. Navigate to the namespace created earlier, click Shared access policies, and copy the Connection string-primary key into the sasl.password portion of the kafkaglobalconfig property in the ESP project.
JSON Decoding via ESP Function Window
Since we parsed the weather data responses in the Azure Logic Application, the JSON messages returned from the Event Hub contain only the information we require. All we need to do now is parse those values into the ESP schema. The function window is defined as follows:
<window-functional pubsub="true" index="pi_EMPTY" name="func_parse_json">
  <description><![CDATA[Parse incoming json into fields.]]></description>
  <schema>
    <fields>
      <field name="index_esp" type="int64" key="true"/>
      <field name="deviceID" type="string"/>
      <field name="timestamp" type="stamp"/>
      <field name="Latitude" type="string"/>
      <field name="Longitude" type="string"/>
      <field name="humidity" type="int32"/>
      <field name="rain" type="double"/>
      <field name="rainplus1" type="double"/>
      <field name="temperature" type="double"/>
      <field name="windspeed" type="double"/>
    </fields>
  </schema>
  <function-context>
    <properties>
      <property-json name="jsonData"><![CDATA[$message]]></property-json>
    </properties>
    <functions>
      <function name="deviceID"><![CDATA[json(#jsonData,'deviceID')]]></function>
      <function name="timestamp"><![CDATA[json(#jsonData,'timestamp')]]></function>
      <function name="Latitude"><![CDATA[json(#jsonData,'Latitude')]]></function>
      <function name="Longitude"><![CDATA[json(#jsonData,'Longitude')]]></function>
      <function name="humidity"><![CDATA[json(#jsonData,'humidity')]]></function>
      <function name="location"><![CDATA[json(#jsonData,'location')]]></function>
      <function name="rain"><![CDATA[product(json(#jsonData,'rain'),.0393700787401575)]]></function>
      <function name="rainplus1"><![CDATA[product(json(#jsonData,'rainplus1'),.0393700787401575)]]></function>
      <function name="temperature"><![CDATA[json(#jsonData,'temperature')]]></function>
      <function name="windspeed"><![CDATA[json(#jsonData,'windspeed')]]></function>
    </functions>
  </function-context>
</window-functional>
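One detail worth calling out: the rain and rainplus1 functions multiply by 0.0393700787401575, which is the millimetre-to-inch conversion factor, so rainfall values arrive in ESP in inches. For readers who want to verify the parsing outside of ESP, here is a small Python sketch of the equivalent logic; the field names come from the function window above, and the sample payload is a hypothetical message shaped the way the logic app produces it.

import json

MM_TO_INCHES = 0.0393700787401575  # same conversion factor used in the function window

def parse_weather_message(message: str) -> dict:
    """Parse one JSON string from the Event Hub into a flat record (mirrors func_parse_json)."""
    data = json.loads(message)
    return {
        "deviceID": data["deviceID"],
        "timestamp": data["timestamp"],
        "Latitude": data["Latitude"],
        "Longitude": data["Longitude"],
        "humidity": int(data["humidity"]),
        "rain": float(data["rain"]) * MM_TO_INCHES,            # mm -> inches
        "rainplus1": float(data["rainplus1"]) * MM_TO_INCHES,  # mm -> inches
        "temperature": float(data["temperature"]),
        "windspeed": float(data["windspeed"]),
    }

# Hypothetical example payload shaped like the logic app output.
sample = json.dumps({
    "deviceID": "loc1", "timestamp": "2020-10-19T20:39:00", "Latitude": "35.79",
    "Longitude": "-78.78", "humidity": 87, "rain": 1.2, "rainplus1": 0.8,
    "temperature": 12.4, "windspeed": 9.3,
})
print(parse_weather_message(sample))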
Writing Messages to an Event Hub
Now let's say we would like ESP to return messages to an output topic called eh_weather_slinger. To do that, simply create another event hub, or topic, called eh_weather_slinger in the namesspace-weather-catcher namespace, as described above. ESP connectors that publish data outbound are denoted with the type sub, which is short for subscribe: the connector subscribes to the window's output and writes those events out. We also change the kafkatopic property to identify the newly created outbound topic, or event hub.
Our connector definition is as follows:
<connector class="kafka" name="kafkaoutbound" active="false">
  <description><![CDATA[Publish data to outbound topic]]></description>
  <properties>
    <property name="type"><![CDATA[sub]]></property>
    <property name="kafkatype"><![CDATA[opaquestring]]></property>
    <property name="kafkahostport"><![CDATA[Namesspace-weather-catcher.servicebus.usgovcloudapi.net:9093]]></property>
    <property name="kafkatopic"><![CDATA[eh_weather_slinger]]></property>
    <property name="kafkaglobalconfig"><![CDATA[security.protocol=SASL_SSL;sasl.username=$ConnectionString;sasl.password=Endpoint=sb://namesspace-weather-catcher.servicebus.usgovcloudapi.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=J5qKcs1mdF2rZ7vTbJZ7FNsNZBxlfsAHa3uLzuqI3Yo=;sasl.mechanism=PLAIN;ssl.ca.location=/etc/pki/tls/cert.pem]]></property>
  </properties>
</connector>
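To spot-check what ESP writes to the outbound topic, a consumer like the earlier sketch can be pointed at eh_weather_slinger; only the topic name changes, and the key below is a placeholder.

from confluent_kafka import Consumer

# Assumption: same namespace and connection string as before; only the topic differs.
consumer = Consumer({
    "bootstrap.servers": "Namesspace-weather-catcher.servicebus.usgovcloudapi.net:9093",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "$ConnectionString",
    "sasl.password": "<namespace connection string>",
    "group.id": "weather-slinger-debug",   # hypothetical consumer group
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["eh_weather_slinger"])  # read what ESP publishes outbound

msg = consumer.poll(timeout=10.0)
if msg is not None and not msg.error():
    print(msg.value().decode("utf-8"))
consumer.close()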
What have we learned?
This is the first step in gathering weather forecast data for your streaming machine learning project. Next add some sensor data and some trained machine learning models and start predicting flooding before it happens.