Hi everyone,
I have a logic problem. I need to implement the following logic in my flow.
Sample source:
deviceName  deviceType  time  telemetry_counterTotal
A1          X           9PM   10800
A1          X           10PM  10600
A1          X           11PM  11500
The formula for the desired column (let's call it computedColumn here) is: computedColumn = (telemetry_counterTotal / 100.00) + LAG(computedColumn, 1)*
* LAG(computedColumn, 1) is computedColumn from the previous iteration; on the first iteration it is, of course, zero.
Desired output:
deviceName  deviceType  time  telemetry_counterTotal  computedColumn
A1          X           11PM  10800                   329.00
A1          X           10PM  10600                   221.00
A1          X           9PM   11500                   115.00
Below is my model. I don't have much experience with DS2, so I tried to work around it, but I'm not sure whether this is even possible with only aggregate and compute windows. I don't think it should be such a big problem, though. Any hint or suggestion, and especially a code sample, would be really helpful. I would also appreciate any documentation or examples that cover similar problems.
<project name="project_test_f" threads="1" heartbeat-interval="1" pubsub="auto"> <metadata> <meta id="layout">{"cq":{"Aggregate1":{"x":-185,"y":300},"Compute1":{"x":-160,"y":170},"Compute2":{"x":-185,"y":435},"Copy1":{"x":-150,"y":35},"sigfox_source":{"x":-180,"y":-100}}}</meta> <meta id="studioModifiedBy">sasboot</meta> <meta id="studioModified">1539092809002</meta> <meta id="studioUploadedBy">sasboot</meta> <meta id="studioUploaded">1538120251741</meta> </metadata> <mas-modules> <mas-module module="New_Module_1" language="ds2" func-names="get_calc_total"> <code><![CDATA[data esp.out ; declare counterTotal2 ; retain counterTotal_last counterTotal_lag; method run() ; set esp.in ; if isnull(counterTotal_lag) then counterTotal2 =counterTotal_last/100 ; end ; enddata ;]]></code> </mas-module> </mas-modules> <contqueries> <contquery name="cq"> <windows> <window-source pubsub="true" index="pi_EMPTY" output-insert-only="true" name="sigfox_source"> <schema> <fields> <field name="attributes_avgSnr" type="string"/> <field name="attributes_integrationName" type="string"/> <field name="attributes_isDeviceMounted" type="int32"/> <field name="attributes_latAntenna" type="string"/> <field name="attributes_lngAntenna" type="string"/> <field name="attributes_messageType" type="string"/> <field name="attributes_rssi" type="string"/> <field name="attributes_sampleTime" type="stamp"/> <field name="attributes_snr" type="string"/> <field name="attributes_station" type="string"/> <field name="deviceName" type="string" key="true"/> <field name="deviceType" type="string" key="true"/> <field name="telemetry_battIntStat" type="string"/> <field name="telemetry_batteryReading" type="int32"/> <field name="telemetry_counterTotal" type="int32"/> <field name="telemetry_pulses12hFrame" type="int32"/> <field name="telemetry_pulses24h12hFrame" type="int32"/> <field name="telemetry_pulses36h24hFrame" type="int32"/> </fields> </schema> </window-source> <window-copy pubsub="true" 
output-insert-only="true" index="pi_EMPTY" name="Copy1"/> <window-compute pubsub="true" index="pi_EMPTY" collapse-updates="false" name="Compute1"> <schema> <fields> <field name="attributes_avgSnr" type="string"/> <field name="attributes_integrationName" type="string"/> <field name="attributes_isDeviceMounted" type="int32"/> <field name="attributes_latAntenna" type="string"/> <field name="attributes_lngAntenna" type="string"/> <field name="attributes_messageType" type="string"/> <field name="attributes_rssi" type="string"/> <field name="attributes_sampleTime" type="stamp"/> <field name="attributes_snr" type="string"/> <field name="attributes_station" type="string"/> <field name="deviceName" type="string" key="true"/> <field name="deviceType" type="string" key="true"/> <field name="telemetry_battIntStat" type="string"/> <field name="telemetry_batteryReading" type="int32"/> <field name="telemetry_counterTotal" type="int32"/> <field name="telemetry_pulses12hFrame" type="int32"/> <field name="telemetry_pulses24h12hFrame" type="int32"/> <field name="telemetry_pulses36h24hFrame" type="int32"/> <field name="computedColumn" type="int32"/> </fields> </schema> <output> <field-expr><![CDATA[attributes_avgSnr]]></field-expr> <field-expr><![CDATA[attributes_integrationName]]></field-expr> <field-expr><![CDATA[attributes_isDeviceMounted]]></field-expr> <field-expr><![CDATA[attributes_latAntenna]]></field-expr> <field-expr><![CDATA[attributes_lngAntenna]]></field-expr> <field-expr><![CDATA[attributes_messageType]]></field-expr> <field-expr><![CDATA[attributes_rssi]]></field-expr> <field-expr><![CDATA[attributes_sampleTime]]></field-expr> <field-expr><![CDATA[attributes_snr]]></field-expr> <field-expr><![CDATA[attributes_station]]></field-expr> <field-expr><![CDATA[telemetry_battIntStat]]></field-expr> <field-expr><![CDATA[telemetry_batteryReading]]></field-expr> <field-expr><![CDATA[telemetry_counterTotal]]></field-expr> 
<field-expr><![CDATA[telemetry_pulses12hFrame]]></field-expr> <field-expr><![CDATA[telemetry_pulses24h12hFrame]]></field-expr> <field-expr><![CDATA[telemetry_pulses36h24hFrame]]></field-expr> <field-expr><![CDATA[0]]></field-expr> </output> </window-compute> <window-aggregate pubsub="true" index="pi_EMPTY" collapse-updates="false" name="Aggregate1"> <schema> <fields> <field name="attributes_avgSnr" type="string"/> <field name="attributes_integrationName" type="string"/> <field name="attributes_isDeviceMounted" type="int32"/> <field name="attributes_latAntenna" type="string"/> <field name="attributes_lngAntenna" type="string"/> <field name="attributes_messageType" type="string"/> <field name="attributes_rssi" type="string"/> <field name="attributes_sampleTime" type="stamp"/> <field name="attributes_snr" type="string"/> <field name="attributes_station" type="string"/> <field name="deviceName" type="string" key="true"/> <field name="deviceType" type="string" key="true"/> <field name="telemetry_battIntStat" type="string"/> <field name="telemetry_batteryReading" type="int32"/> <field name="telemetry_counterTotal" type="int32"/> <field name="telemetry_pulses12hFrame" type="int32"/> <field name="telemetry_pulses24h12hFrame" type="int32"/> <field name="telemetry_pulses36h24hFrame" type="int32"/> <field name="computedColumn" type="int32"/> </fields> </schema> <output> <field-expr><![CDATA[ESP_aLast(attributes_avgSnr)]]></field-expr> <field-expr><![CDATA[ESP_aLast(attributes_integrationName)]]></field-expr> <field-expr><![CDATA[ESP_aLast(attributes_isDeviceMounted)]]></field-expr> <field-expr><![CDATA[ESP_aLast(attributes_latAntenna)]]></field-expr> <field-expr><![CDATA[ESP_aLast(attributes_lngAntenna)]]></field-expr> <field-expr><![CDATA[ESP_aLast(attributes_messageType)]]></field-expr> <field-expr><![CDATA[ESP_aLast(attributes_rssi)]]></field-expr> <field-expr><![CDATA[ESP_aLast(attributes_sampleTime)]]></field-expr> 
<field-expr><![CDATA[ESP_aLast(attributes_snr)]]></field-expr> <field-expr><![CDATA[ESP_aLast(attributes_station)]]></field-expr> <field-expr><![CDATA[ESP_aLast(telemetry_battIntStat)]]></field-expr> <field-expr><![CDATA[ESP_aLast(telemetry_batteryReading)]]></field-expr> <field-expr><![CDATA[ESP_aLast(telemetry_counterTotal)]]></field-expr> <field-expr><![CDATA[ESP_aLast(telemetry_pulses12hFrame)]]></field-expr> <field-expr><![CDATA[ESP_aLast(telemetry_pulses24h12hFrame)]]></field-expr> <field-expr><![CDATA[ESP_aLast(telemetry_pulses36h24hFrame)]]></field-expr> <field-expr><![CDATA[ESP_aLag(computedColumn, 1)]]></field-expr> </output> </window-aggregate> <window-compute pubsub="true" index="pi_EMPTY" collapse-updates="false" name="Compute2"> <schema> <fields> <field name="attributes_avgSnr" type="string"/> <field name="attributes_integrationName" type="string"/> <field name="attributes_isDeviceMounted" type="int32"/> <field name="attributes_latAntenna" type="string"/> <field name="attributes_lngAntenna" type="string"/> <field name="attributes_messageType" type="string"/> <field name="attributes_rssi" type="string"/> <field name="attributes_sampleTime" type="stamp"/> <field name="attributes_snr" type="string"/> <field name="attributes_station" type="string"/> <field name="deviceName" type="string" key="true"/> <field name="deviceType" type="string" key="true"/> <field name="telemetry_battIntStat" type="string"/> <field name="telemetry_batteryReading" type="int32"/> <field name="telemetry_counterTotal" type="int32"/> <field name="telemetry_pulses12hFrame" type="int32"/> <field name="telemetry_pulses24h12hFrame" type="int32"/> <field name="telemetry_pulses36h24hFrame" type="int32"/> <field name="computedColumn" type="int32"/> </fields> </schema> <output> <field-expr><![CDATA[attributes_avgSnr]]></field-expr> <field-expr><![CDATA[attributes_integrationName]]></field-expr> <field-expr><![CDATA[attributes_isDeviceMounted]]></field-expr> 
<field-expr><![CDATA[attributes_latAntenna]]></field-expr> <field-expr><![CDATA[attributes_lngAntenna]]></field-expr> <field-expr><![CDATA[attributes_messageType]]></field-expr> <field-expr><![CDATA[attributes_rssi]]></field-expr> <field-expr><![CDATA[attributes_sampleTime]]></field-expr> <field-expr><![CDATA[attributes_snr]]></field-expr> <field-expr><![CDATA[attributes_station]]></field-expr> <field-expr><![CDATA[telemetry_battIntStat]]></field-expr> <field-expr><![CDATA[telemetry_batteryReading]]></field-expr> <field-expr><![CDATA[telemetry_counterTotal]]></field-expr> <field-expr><![CDATA[telemetry_pulses12hFrame]]></field-expr> <field-expr><![CDATA[telemetry_pulses24h12hFrame]]></field-expr> <field-expr><![CDATA[telemetry_pulses36h24hFrame]]></field-expr> <field-expr><![CDATA[telemetry_counterTotal/100 + computedColumn]]></field-expr> </output> </window-compute> </windows> <edges> <edge source="sigfox_source" target="Copy1"/> <edge source="Copy1" target="Compute1"/> <edge source="Compute1" target="Aggregate1"/> <edge source="Aggregate1" target="Compute2"/> </edges> </contquery> </contqueries> </project>
If you have any additional questions, I'm here. I have been trying for the last three days to get this done, but without luck.
Thanks in advance.
Hi,
I'm not sure I fully understand your problem, but why not simply use a compute window first to calculate a temporary field equal to telemetry_counterTotal/100, and then an aggregate window to apply ESP_aSum() to that field?
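A minimal, untested sketch of how that could look in the model XML is below. The window names ComputeScaled and AggregateTotal and the field scaledCounter are hypothetical, only a few of the original fields are shown, and both new fields are assumed to be of type double so that the decimals survive. It also assumes the aggregate groups on the same deviceName/deviceType keys as the source window.

```xml
<!-- Sketch only: ComputeScaled, AggregateTotal and scaledCounter are hypothetical names. -->
<window-compute pubsub="true" index="pi_EMPTY" name="ComputeScaled">
  <schema>
    <fields>
      <field name="deviceName" type="string" key="true"/>
      <field name="deviceType" type="string" key="true"/>
      <field name="telemetry_counterTotal" type="int32"/>
      <!-- Temporary field: the counter divided by 100 -->
      <field name="scaledCounter" type="double"/>
    </fields>
  </schema>
  <output>
    <!-- One expression per non-key field, in schema order -->
    <field-expr><![CDATA[telemetry_counterTotal]]></field-expr>
    <field-expr><![CDATA[telemetry_counterTotal / 100.0]]></field-expr>
  </output>
</window-compute>

<window-aggregate pubsub="true" name="AggregateTotal">
  <schema>
    <fields>
      <field name="deviceName" type="string" key="true"/>
      <field name="deviceType" type="string" key="true"/>
      <field name="telemetry_counterTotal" type="int32"/>
      <!-- Running total per deviceName/deviceType group -->
      <field name="computedColumn" type="double"/>
    </fields>
  </schema>
  <output>
    <field-expr><![CDATA[ESP_aLast(telemetry_counterTotal)]]></field-expr>
    <field-expr><![CDATA[ESP_aSum(scaledCounter)]]></field-expr>
  </output>
</window-aggregate>

<!-- Edges, assuming the existing Copy1 window feeds the new compute window -->
<edge source="Copy1" target="ComputeScaled"/>
<edge source="ComputeScaled" target="AggregateTotal"/>
```

With the sample source processed in the order listed, the running sum would go 108.00, 214.00, 329.00, so no explicit lag or DS2 module should be needed for this part.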
Fred