The purpose of this post is to learn how to use the Calculate Window with a Python Micro Analytic Service module in SAS Event Stream Processing to extract a very large number of time series features from a user-defined window of time series data. The Python package TSFRESH allows users to automatically extract hundreds of numeric features from a time series, and these features can be used in a variety of ways, including as inputs to predictive models or anomaly detection routines.
SAS Event Stream Processing has several built-in anomaly detection methods, some of which expect numeric inputs like these extracted features. In SAS Event Stream Processing we don’t have a well-defined notion of a single time series, instead we have an unending stream of events; but we can break the unending stream of events into a collection of windows from which we can extract time series features. We can then use these features in anomaly detection models like Support Vector Data Description (SVDD) to provide real-time detection of things like machine failure or changes in power consumption. A previous post (link in the References, below) discussed how to extract these features and build a SVDD model to detect anomalies in data at rest. This post will introduce how to extract these features from windows of streaming data in SAS Event Stream Processing.
In this post we will use the Calculate window with the TSFRESH package in SAS Event Stream Processing to extract these features from user-defined temporal windows, thus converting the stream of time series values into a stream of extracted features for each window. In a subsequent post we will learn how to use these features to perform unsupervised anomaly detection on the streaming data in SAS Event Stream Processing.
A major challenge we will run into in this process is the sheer number of time series features we can extract with TSFRESH; rather than choosing the most relevant features (which can often be a good choice when working with specific kinds of data) we will show how to systematically include all these features in the SAS Event Stream Processing pipeline. This would usually entail a lot of manual work to define the Python Micro Analytic Service module and to setup the output schema for these features, but we will see how we can modify the SAS Event Stream Processing XML project using Python code to programmatically create the Python Micro Analytics Service module and the output schema. This general idea (modifying the XML in Python) can be applied to other tasks in SAS Event Stream Processing as well, such as preparing the input schema for complex data streams with many fields.
We start by creating a SAS Event Stream Processing project and reading in the wind turbine data. This data was used in the previous post to train an unsupervised SVDD model. In this case we want to deploy this model in SAS Event Stream Processing, so we need to read the data into a SAS Event Stream Processing Source Window. The following XML for the source window can be used with a SAS Event Stream Processing project package, as long as the turbine.csv file is available in the project package test_files folder.
<window-source index="pi_EMPTY" insert-only="true" name="Source">
<schema>
<fields>
<field name="time" type="int32" key="true"/>
<field name="Turbine1" type="double"/>
<field name="Turbine2" type="double"/>
<field name="Turbine3" type="double"/>
<field name="Turbine4" type="double"/>
</fields>
</schema>
<connectors>
<connector class="fs" name="turbine_Connector_1">
<properties>
<property name="type"><![CDATA[pub]]></property>
<property name="header"><![CDATA[1]]></property>
<property name="addcsvopcode"><![CDATA[true]]></property>
<property name="addcsvflags"><![CDATA[normal]]></property>
<property name="fsname"><![CDATA[@ESP_PROJECT_HOME@/test_files/turbine.csv]]></property>
<property name="fstype"><![CDATA[csv]]></property>
</properties>
</connector>
</connectors>
</window-source>
Select any image to see a larger version.
Mobile users: To view the images, select the "Full" version at the bottom of the page.
Before we can use the Calculate window to extract the time series features, we must define our Python Micro Analytic Service module at the project level. The Micro Analytic Service module expects us to define a Python program that calculates the desired features. In this program we must have the following key elements for the Micro Analytic Service module to work:
A key thing to note here is that in our use case we will have hundreds of outputs so it would be unreasonably tedious to write this code all by hand, instead we will use Python string manipulation to create the code and then copy-paste this code into the Micro Analytic Service module.
We start in Python by importing the required packages that we will use to extract and store the time series features:
import pandas as pd
import numpy as np
from tsfresh import extract_features
We then load our data and use the tsfresh Python package to extract our time series features. It would be fine to use a sample of the data here, the goal is to just gather the names of the relevant time series features and store them in a Python list, so we don’t need the whole dataset.
#read turbine data and drop 1-3 to focus on Turbine4 which includes a failure
turbine = pd.read_csv('turbine.csv')
turbine = turbine.drop(['Turbine1','Turbine2','Turbine3'], axis=1)
#create a Window_ID to extract features in windows of size 50
#this isn't strictly necessary for this task but is a good reminder that we used a window size of 50 when training the SVDD
turbine["Window_ID"] = np.ceil((turbine['time'])/50)
Use #TSFRESH to extract time series features from each window of sensor data
turbine_features = extract_features(turbine, column_id="Window_ID", column_sort="time")
In this example we extract features from the time series for ‘Turbine4’, but some of these features are missing or have undefined/infinite values. These will not be suitable values for extraction so we must drop any of the time series feature columns with infinite or missing values. In the following code we also store a list of the raw feature names (raw_feature_list) and then do some post-processing on the names to enforce naming conventions for SAS Event Stream Processing (esp_feature_list). SAS Event Stream Processing will not allow certain characters in the names of fields so we must remove those characters before outputting those values as SAS Event Stream Processing fields.
#Although we accepted all of the default features generated by TSFRESH, some have NaN values for our series, and some have names that won't work with SAS Event Stream Processing
#Let's postprocess the extracted features to clean up these issues
#remove NaN values from the data (including inf values...), just drop those features
turbine_features.replace([np.inf, -np.inf], np.nan, inplace=True)
turbine_features.dropna(axis=1, inplace=True)
#keep the Window_ID as a variable in the dataframe
turbine_features.reset_index(inplace=True)
#raw feature list
raw_feature_list = list(turbine_features)[1:]
#change column names to conform to SAS Event Stream Processing field name requirements (preparation for deployment)
new_names = []
new_names.append('Window_ID')
for i in range(1, len(list(turbine_features))):
new_names.append(list(turbine_features)[i].replace('"','')
.replace(' ','_')
.replace(',','_')
.replace('.','pt')
.replace('(','')
.replace(')','')
.replace('-','minus'))
turbine_features.columns = new_names
esp_feature_list = list(turbine_features)[1:]
Now that we have a list of features, we want to output we can start using Python string manipulation to create the Python code necessary for the Micro Analytic Service module. The esp_feature_list Python variable will be used to systematically create the output variables for the Micro Analytic Service module function (which we will name calculate_features), while the raw_feature_list will be used to systematically extract the relevant features from the output pandas DataFrame created by the tsfresh package after extracting the features. In the following blocks of code, we will systematically create strings that we will then piece together to render the full Python Micro Analytic Service code. We start by creating the output string that is required by Micro Analytic Service to identify the values output by the calculate_features function. These will be the same values that are used in the output schema for the Calculate window.
#output string for MAS
output_string = '"Output: feature_calculated, Window_ID'
for i in range(0,len(esp_feature_list)):
output_string = output_string+', '+esp_feature_list[i]
output_string = output_string+'"'
In this case the output_string variable contains a list of output variables, including all the features in the esp_feature_list, along with a variable specifying a unique ID for each window of time series values (Window_ID), and a variable specifying if we extracted features on this event in SAS Event Stream Processing (feature_calculated). Recall that we are using a window of time series values to extract features, so we will only extract features once we have a large enough window of data. In this example we will wait until we accumulate 50 time series values before extracting features, and then we will extract features again every 50 values. The idea with the feature_calculated variable is that it will be set to 0 for most events but set to 1 after 50 events have been accumulated and we are ready to extract the time series features. Our next step is to create a string defining all the feature names we want to extract and initially setting them equal to -1. The idea here is that we only extract meaningful features every 50 events, so the values of the extracted features should be set to -1 until we actual calculate them, at which point we replace them with the appropriate value. Note in the code that we use the \n character to create a new line, this will allow us to print the formatted Python code and copy it directly into the Micro Analytic Service window with no additional formatting required (we are going to use this a lot in the following blocks of code to make our final code more readable and sensibly formatted for Python).
#each feature_name = -1
feature_name_string = ' '
for i in range(0,len(esp_feature_list)):
feature_name_string = feature_name_string+esp_feature_list[i]+" = -1\n
Next, we create a string where we set those calculate feature names equal to the actual extracted features from the pandas DataFrame generated by the tsfresh package after we extract the features. This string will be used inside a Python if statement, so it only executes once every 50 events (when we extract the features). Notice that in this code we use both the esp_feature_list (for the output features) and the raw_feature_list (the feature names from the DataFrame of extracted features).
#feature_name = float(extracted_features["tsfresh_feature_name"].iloc[0])
extracted_features_string = ' '
for i in range(0,len(esp_feature_list)):
extracted_features_string = extracted_features_string+" "+esp_feature_list[i]+" = "+"float(extracted_features['"+raw_feature_list[i]+"'].iloc[0])\n "
Finally, we must prepare a return string that will appear at the end of our calculate_features function to return the calculated features. For most events this just returns those -1 values we set, but every 50 events it will return the actual extracted time series features, and the feature_calculated variable will be changed from 0 to 1.
#return string
return_string = 'return (feature_calculated, Window_ID'
for i in range(0,len(esp_feature_list)):
return_string = return_string+', '+esp_feature_list[i]
return_string = return_string+')
Notice that in each of the above blocks of Python code we use a for loop to iterate over the features in the feature list. This saves us a lot of time in manually typing out the names of the hundreds of time series features, but it does make the code a bit more confusing. Now that have all these components, we need to assemble them into a string containing the full Python Micro Analytic Service module code. This is the most confusing part of the example because we are using Python string manipulation tools to stitch together Python variables into valid Python code (which we can copy into the Micro Analytic Service module, or just save and reference in the Micro Analytic Service module). We start our code by importing the pandas package and defining a global DataFrame named df_tsfresh that will be used to accumulate time series values. We also define a function constants() that returns the number of time series points we want to accumulate before extracting features. This makes our code a bit more modular and makes it easier to change the window size for extracting time series features. We then start the definition of the calculate_features function which will accept the Turbine4 and time fields from the SAS Event Stream Processing source window. The rest of the code will be assembled from the component strings we defined earlier.
python_code_string = "import pandas as pd\n\nglobal df_tsfresh\ndf_tsfresh = pd.DataFrame(columns=['id','Turbine4'])\n\ndef constants():\n const = {}\n const['num_of_records'] = 50\n return const\n\ndef calculate_features(Turbine4, time):\n "
Next, we add the required output string to the function definition, this is something that the Micro Analytic Service looks for in the Python code to connect the returned output to the output schema of the Calculate window.
python_code_string = python_code_string+output_string+"\n\n "
After adding the output string to the calculate_features function we import the extract_features function from the tsfresh Python package and pull in the df_tsfresh DataFrame and the constants (the number of time series values we want to accumulate) into the scope of the calculate_features function. We also set initial values for the feature_calculated variable and the Window_ID variable. We will overwrite these values after we have accumulated enough events in the df_tsfresh DataFrame to extract features (in this case 50 events). We do the same thing to initialize the values of all the extracted features (in this case using the feature_name_string we defined earlier):
python_code_string = python_code_string+feature_name_string
Now we are almost ready to extract the features. We first add the latest event to the df_tsfresh DataFrame, and once we have accumulated 50 events, we trigger the Python if statement to extract features using the tsfresh extract_features function. Each time we do this we also set the feature_calculated variable equal to 1 and define the Window_ID variable based on the incoming time field and the number of events we accumulate (in this case 50). Once we extract features, we reset the df_tsfresh DataFrame so that it is empty and can start accumulating 50 new values. If we wanted to do sliding window feature extraction, we would have to keep some values here.
python_code_string = python_code_string+"\n df_tsfresh = pd.concat([df_tsfresh, pd.DataFrame.from_records([dict(id=1, Turbine4=Turbine4)])], ignore_index=True)\n\n if len(df_tsfresh.index) >= const['num_of_records']:\n feature_calculated = 1\n Window_ID = time/const['num_of_records']\n extracted_features = extract_features(df_tsfresh, column_id='id', column_value='Turbine4')\n\n df_tsfresh = pd.DataFrame(columns=['id','Turbine4'])\n\n"
We are still in the if statement that triggers once we have accumulated 50 events, and now we need to set the output features equal to the extracted features from the DataFrame generated by the tsfresh extract_features function. We do this using the extracted_features_string we created earlier.
python_code_string = python_code_string+extracted_features_string
At this point we can exit the if statement and end the calculate_features function with a return statement that returns all the calculated features along with the feature_calculated and Window_ID variables. Recall that most of these values are generically set to -1 (0 for the feature_calculated variable), but every 50 events feature_calculated is set to 1 and the other values are populated with meaningful information. Later in our SAS Event Stream Processing project we will use a Filter window to select only these meaningful events.
python_code_string = python_code_string+"\n "+return_string
We have now completed the construction of the Python code string that we will want to use in the Python Micro Analytic Service module in SAS Event Stream Processing. It’s a good practice to print this code string out in Python to inspect it for any errors, and then we can either copy it directly into the Micro Analytic Service module interface in SAS Event Stream Processing Studio or save it as a .py code file and reference that file in the SAS Event Stream Processing project. For pedagogical purposes we will print it out and copy it into the Micro Analytic Service module.
print(python_code_string)
The output generated from this print statement is quite long, so it will be attached at the end of the post as a reference. It can be copied into a Micro Analytic Service module in your Event Stream Processing environment if you want to reproduce this demo. Note that for this to work you will need to have a Python installation with the pandas and tsfresh Python packages available and referenced in the environment variable MAS_PYPATH in the SAS Event Stream Processing server container. You generally need to work with an administrator to configure this.
Note that in the above screenshot we have created a SAS Micro Analytic Service Module named ‘tsfresh_feature_extraction’ and we indicate that this module is a block of Python code with a function named calculate_features. We then copied the Python code we created earlier using Python string manipulation into the ‘Embedded code’ block in the window. For deployment projects it would make sense to store the Python code in an ‘External file’ (just a .py Python program) and store this file in the Project Package. This would make the project XML simpler since it would no longer include the large block of Python code we just created, and we can maintain and perform version control on this Python program independently of the project XML.
The next step is to add a Calculate window the project to use the Micro Analytic Service module we just created and extract the time series features from a window of data. This window will be a bit complicated because we plan to extract hundreds of features, meaning we must add hundreds of fields to the output schema for this window. Rather than hand-coding these fields we will use a Python program to systematically modify the XML for the Python window to include the appropriate Python code and output schema for the extracted features. Before we get to creating the schema, we must also specify the method we will use to perform the calculation. In this case we will use a User-Specified Calculation and then point the Handler to the Micro Analytic Service module and the Python function within that we created above:
From here the last step to configuring the calculate window is to specify the schema, and rather than typing in the names of the hundreds of time series features we generate with the calculate_features function we will create the schema XML using Python string manipulation.
Once again, we will use the esp_feature_list variable in Python that we created earlier when writing the Micro Analytic Service Python module code. This list contains all the time series features we extracted using the calculate_features function, and thus contains all the fields we want to add to the output schema. We start by creating a string containing the field names for the extracted features. Each of these features will have the type of ‘double’ in SAS Event Stream Processing, and we loop over the features to create the XML field definitions.
field_name_string = ''
for i in range(0,len(esp_feature_list)):
tmp_string = " \n"
field_name_string = field_name_string+tmp_string
In theory we could copy-paste this string into the exact right location in the XML for the Calculate window, but in practice it is easier to create and print the full schema XML definition and copy-paste that into the XML for the Calculate window. This just includes the XML elements indicating that we are defining fields for a schema, along with the fields we want to include that are not part of the extracted features. These fields are the time from the original dataset, which is used as a key variable, the Turbine4 time series from the original dataset, the feature_calculated flag generated by the calculate_features function to indicate whether we extracted features for this event (it is set to 1 every 50 events in this example), and the Window_ID variable generated by the calculate_features function to indicate which window of time values the features came from. We put these together with the field names for the extracted features to get the full schema string:
output_schema_string = " <schema>\n <fields>\n "
output_schema_string = output_schema_string+"<field name='time' type='int32' key='true'/>\n <field name='Turbine4' type='double'/>\n <field name='feature_calculated' type='double'/>\n <field name='Window_ID' type='int32'/>\n"
output_schema_string = output_schema_string+field_name_string+" </fields>\n </schema>"
We are now ready to print this schema string and copy it into the XML for the Calculate window.
print(output_schema_string)
We will put this XML element before the <mas-map> definition in the Calculate window XML, although SAS Event Stream Processing is not picky about the ordering of these XML elements as long as the formatting (including indentation) is correct.
This will conveniently update the graphical interface in SAS Event Stream Processing with all the schema elements we defined in the XML. Note that this includes many pages of fields, totaling 473 fields that we have added to the Calculate window schema. This may be too many fields to track, especially in a scenario where we are deploying this project to the edge (calculating more features will increase the memory footprint of the SAS Event Stream Processing project), so in realistic deployment scenarios we might want to think carefully about which time series features might be most relevant in detecting anomalies and only use those as inputs to the SVDD model. In this example we want to show how to include many features systematically so you can use this approach even if you have fewer features. Note that the features you extract and include in the SAS Event Stream Processing project must match the features you included when originally training the SVDD anomaly detection model (see the previous post noted in the references for an example of training a SVDD model on features extracted from a window of time series using this same example data).
We have extracted the time series features and included them as fields in the SAS Event Stream Processing project, but most of our events are filled with irrelevant information. We only extract features every 50 time points, so most of the events in the Calculate window have the feature_calculated field equal to 0 and all the extracted feature fields equal to -1. We use a Filter window to remove those events and only focus on the events where we extract features from a window of time series values. We do this by filtering to select events where feature_calculated == 1 using a legacy Expression filter:
One minor caveat here is that for this Filter window to work properly we must modify the Advanced settings in the Calculate window to specify that it will only produce insert events. Our Calculate window will only produce insert events based on how we constructed it, but we must still specifically indicate this in the SAS Event Stream Processing project. The following screenshot shows the advanced settings on the calculate window with the required option selected:
After the filter window we still have the time field in the data and it is the key field, but we only have every 50 time points. It makes more sense to use the Window_ID field as a key field so we can use a Compute window to switch the key fields:
To accomplish this, we just modify the Compute window schema to set the Window_ID field to the Key and be sure to change the time field to use the expression ‘time’ (since it is no longer the key field, we are just copying it over from the previous window). We could remove the time field altogether, but it could be nice to determine at which time point each window ends (this is what is indicated by the time field at this point in the SAS Event Stream Processing project).
At this point we are ready to send the extracted features to the pretrained SVDD model for live scoring and deployment, but this post is already getting a bit long so we will save that topic for a follow-up post discussing deploying Analytic Store models in SAS Event Stream Processing. It’s also a good practice to test the SAS Event Stream Processing project and make sure everything is working before we begin to add more elements to the project. Of course, as we were originally developing this project, we tested it regularly (especially while developing the Calculate window with the complex Micro Analytic Service module) to make sure everything worked and resolve errors in our Python code. We save the project and enter test mode to see how everything works. After running the test, we quickly stream all 900 turbine time series values into SAS Event Stream Processing (this is simple toy data), but we can explore the fields created and their values.
Although the graphical interface only shows the first 15 fields, we can see in the panel on the left that there are 474 fields generated by the Calculate window. We can also see that the Calculate window has an event for each of the 900 time points, and a value for Turbine4 (our time series data) at each time point. Of course, we also have values of all the extracted features at each time point, but most of these values are -1 since the feauture_calculated field is set to 0. These correspond to events where we just accumulated time series value in our window (of size 50) and didn’t extract any features. Every 50 events the feature_calculated field is set to 1 and we have meaningful values for the 469 extracted features. The output of the filter window will be more useful since it focuses on the relevant events:
In the results for the Filter window (and the Compute window which comes after the Filter window) we only have 18 events, one for each window of 50 original time series data points. In this case the Window_ID field uniquely specifies the window of 50 time series points from which we extracted the time series features, and the feature values are all meaningful. In the Compute window we can drop the unnecessary fields (time, Turbine4, and feature_calculated) since they have already been used and the key field is now the Window_ID.
One final note on this project is that depending on the length of the time series and the scale of information you want to extract, you may want to change the window size (in the Python code this is const[‘num_of_records’] and is set to 50). Using a much longer window size can require additional memory for the SAS Event Stream Processing project, since the values we accumulated in the Python Micro Analytic Service module are stored in Python memory while the project is running. If you are encountering errors when testing the project associated with pod failure or out of memory issues you may want to increase the memory limit for the SAS Event Stream Processing project during testing. This will of course require you to allocate more memory to the project when you deploy it to production, potentially making it harder to deploy on an edge device. This is something to consider when working with this kind of anomaly detection model. To increase the memory when running a test, you can select the drop-down menu next to the Run Test button and select Configure and Run Test. From here you can click the Edit Deployment Settings… button to open the Deployment Settings menu and increase the memory limit for the SAS Event Stream Processing Project:
For this simple toy example with a window size of 50 events we don’t have to allocate more than the default of 1 Gigabyte of memory to the project, but larger window sizes can require more memory, and the error messaged created when the pod runs out of memory while accumulating events may not make it immediately obvious that you must allocate more memory to the project.
We have completed our task of extracting a large collection of time series features from a window of time series values, and our next step is to use these features to score a pretrained SVDD model in SAS Event Stream Processing. The training of the SVDD model on the wind turbine data was described in a previous post, titled “Anomaly Detection in Sensor Data using Support Vector Data Description and Time Series Features”. A follow-up post will discuss using these features to deploy of the anomaly detection model on streaming data.
References:
Find more articles from SAS Global Enablement and Learning here.
Join us for SAS Innovate 2025, our biggest and most exciting global event of the year, in Orlando, FL, from May 6-9. Sign up by March 14 for just $795.
Data Literacy is for all, even absolute beginners. Jump on board with this free e-learning and boost your career prospects.