Hi everyone,
I have some logical problem. I need to implement next logic in my flow:
sample source:
deviceName deviceType time telemetry_counterTotal
A1 X 9PM 10800
A1 X 10PM 10600
A1 X 11PM 11500
The formula for desired columns (let's call it here computedColumn) is (telemetry_counterTotal / 100.00 ) + LAG(computedColumn, 1*)
*This is computedColumn from the previous iteration, but if it's the first iteration, of course, it's zero.
Desired output:
deviceName deviceType time telemetry_counterTotal computedColumn
A1 X 11PM 10800 329.00
A1 X 10PM 10600 221.00
A1 X 9PM 11500 115.00
Below is my code sample, I don't have big experience with DS2 so I tried to go around, but not sure if it even possible do this only with aggregate and compute windows, but I think this can't be so big problem. Any hint, suggestion and especially, code sample would be really helpful. Also, any kind of documentation or examples with similar problems I would appreciate.
<project name="project_test_f" threads="1" heartbeat-interval="1" pubsub="auto"> <metadata> <meta id="layout">{"cq":{"Aggregate1":{"x":-185,"y":300},"Compute1":{"x":-160,"y":170},"Compute2":{"x":-185,"y":435},"Copy1":{"x":-150,"y":35},"sigfox_source":{"x":-180,"y":-100}}}</meta> <meta id="studioModifiedBy">sasboot</meta> <meta id="studioModified">1539092809002</meta> <meta id="studioUploadedBy">sasboot</meta> <meta id="studioUploaded">1538120251741</meta> </metadata> <mas-modules> <mas-module module="New_Module_1" language="ds2" func-names="get_calc_total"> <code><![CDATA[data esp.out ; declare counterTotal2 ; retain counterTotal_last counterTotal_lag; method run() ; set esp.in ; if isnull(counterTotal_lag) then counterTotal2 =counterTotal_last/100 ; end ; enddata ;]]></code> </mas-module> </mas-modules> <contqueries> <contquery name="cq"> <windows> <window-source pubsub="true" index="pi_EMPTY" output-insert-only="true" name="sigfox_source"> <schema> <fields> <field name="attributes_avgSnr" type="string"/> <field name="attributes_integrationName" type="string"/> <field name="attributes_isDeviceMounted" type="int32"/> <field name="attributes_latAntenna" type="string"/> <field name="attributes_lngAntenna" type="string"/> <field name="attributes_messageType" type="string"/> <field name="attributes_rssi" type="string"/> <field name="attributes_sampleTime" type="stamp"/> <field name="attributes_snr" type="string"/> <field name="attributes_station" type="string"/> <field name="deviceName" type="string" key="true"/> <field name="deviceType" type="string" key="true"/> <field name="telemetry_battIntStat" type="string"/> <field name="telemetry_batteryReading" type="int32"/> <field name="telemetry_counterTotal" type="int32"/> <field name="telemetry_pulses12hFrame" type="int32"/> <field name="telemetry_pulses24h12hFrame" type="int32"/> <field name="telemetry_pulses36h24hFrame" type="int32"/> </fields> </schema> </window-source> <window-copy pubsub="true" output-insert-only="true" index="pi_EMPTY" name="Copy1"/> <window-compute pubsub="true" index="pi_EMPTY" collapse-updates="false" name="Compute1"> <schema> <fields> <field name="attributes_avgSnr" type="string"/> <field name="attributes_integrationName" type="string"/> <field name="attributes_isDeviceMounted" type="int32"/> <field name="attributes_latAntenna" type="string"/> <field name="attributes_lngAntenna" type="string"/> <field name="attributes_messageType" type="string"/> <field name="attributes_rssi" type="string"/> <field name="attributes_sampleTime" type="stamp"/> <field name="attributes_snr" type="string"/> <field name="attributes_station" type="string"/> <field name="deviceName" type="string" key="true"/> <field name="deviceType" type="string" key="true"/> <field name="telemetry_battIntStat" type="string"/> <field name="telemetry_batteryReading" type="int32"/> <field name="telemetry_counterTotal" type="int32"/> <field name="telemetry_pulses12hFrame" type="int32"/> <field name="telemetry_pulses24h12hFrame" type="int32"/> <field name="telemetry_pulses36h24hFrame" type="int32"/> <field name="computedColumn" type="int32"/> </fields> </schema> <output> <field-expr><![CDATA[attributes_avgSnr]]></field-expr> <field-expr><![CDATA[attributes_integrationName]]></field-expr> <field-expr><![CDATA[attributes_isDeviceMounted]]></field-expr> <field-expr><![CDATA[attributes_latAntenna]]></field-expr> <field-expr><![CDATA[attributes_lngAntenna]]></field-expr> <field-expr><![CDATA[attributes_messageType]]></field-expr> <field-expr><![CDATA[attributes_rssi]]></field-expr> <field-expr><![CDATA[attributes_sampleTime]]></field-expr> <field-expr><![CDATA[attributes_snr]]></field-expr> <field-expr><![CDATA[attributes_station]]></field-expr> <field-expr><![CDATA[telemetry_battIntStat]]></field-expr> <field-expr><![CDATA[telemetry_batteryReading]]></field-expr> <field-expr><![CDATA[telemetry_counterTotal]]></field-expr> <field-expr><![CDATA[telemetry_pulses12hFrame]]></field-expr> <field-expr><![CDATA[telemetry_pulses24h12hFrame]]></field-expr> <field-expr><![CDATA[telemetry_pulses36h24hFrame]]></field-expr> <field-expr><![CDATA[0]]></field-expr> </output> </window-compute> <window-aggregate pubsub="true" index="pi_EMPTY" collapse-updates="false" name="Aggregate1"> <schema> <fields> <field name="attributes_avgSnr" type="string"/> <field name="attributes_integrationName" type="string"/> <field name="attributes_isDeviceMounted" type="int32"/> <field name="attributes_latAntenna" type="string"/> <field name="attributes_lngAntenna" type="string"/> <field name="attributes_messageType" type="string"/> <field name="attributes_rssi" type="string"/> <field name="attributes_sampleTime" type="stamp"/> <field name="attributes_snr" type="string"/> <field name="attributes_station" type="string"/> <field name="deviceName" type="string" key="true"/> <field name="deviceType" type="string" key="true"/> <field name="telemetry_battIntStat" type="string"/> <field name="telemetry_batteryReading" type="int32"/> <field name="telemetry_counterTotal" type="int32"/> <field name="telemetry_pulses12hFrame" type="int32"/> <field name="telemetry_pulses24h12hFrame" type="int32"/> <field name="telemetry_pulses36h24hFrame" type="int32"/> <field name="computedColumn" type="int32"/> </fields> </schema> <output> <field-expr><![CDATA[ESP_aLast(attributes_avgSnr)]]></field-expr> <field-expr><![CDATA[ESP_aLast(attributes_integrationName)]]></field-expr> <field-expr><![CDATA[ESP_aLast(attributes_isDeviceMounted)]]></field-expr> <field-expr><![CDATA[ESP_aLast(attributes_latAntenna)]]></field-expr> <field-expr><![CDATA[ESP_aLast(attributes_lngAntenna)]]></field-expr> <field-expr><![CDATA[ESP_aLast(attributes_messageType)]]></field-expr> <field-expr><![CDATA[ESP_aLast(attributes_rssi)]]></field-expr> <field-expr><![CDATA[ESP_aLast(attributes_sampleTime)]]></field-expr> <field-expr><![CDATA[ESP_aLast(attributes_snr)]]></field-expr> <field-expr><![CDATA[ESP_aLast(attributes_station)]]></field-expr> <field-expr><![CDATA[ESP_aLast(telemetry_battIntStat)]]></field-expr> <field-expr><![CDATA[ESP_aLast(telemetry_batteryReading)]]></field-expr> <field-expr><![CDATA[ESP_aLast(telemetry_counterTotal)]]></field-expr> <field-expr><![CDATA[ESP_aLast(telemetry_pulses12hFrame)]]></field-expr> <field-expr><![CDATA[ESP_aLast(telemetry_pulses24h12hFrame)]]></field-expr> <field-expr><![CDATA[ESP_aLast(telemetry_pulses36h24hFrame)]]></field-expr> <field-expr><![CDATA[ESP_aLag(computedColumn, 1)]]></field-expr> </output> </window-aggregate> <window-compute pubsub="true" index="pi_EMPTY" collapse-updates="false" name="Compute2"> <schema> <fields> <field name="attributes_avgSnr" type="string"/> <field name="attributes_integrationName" type="string"/> <field name="attributes_isDeviceMounted" type="int32"/> <field name="attributes_latAntenna" type="string"/> <field name="attributes_lngAntenna" type="string"/> <field name="attributes_messageType" type="string"/> <field name="attributes_rssi" type="string"/> <field name="attributes_sampleTime" type="stamp"/> <field name="attributes_snr" type="string"/> <field name="attributes_station" type="string"/> <field name="deviceName" type="string" key="true"/> <field name="deviceType" type="string" key="true"/> <field name="telemetry_battIntStat" type="string"/> <field name="telemetry_batteryReading" type="int32"/> <field name="telemetry_counterTotal" type="int32"/> <field name="telemetry_pulses12hFrame" type="int32"/> <field name="telemetry_pulses24h12hFrame" type="int32"/> <field name="telemetry_pulses36h24hFrame" type="int32"/> <field name="computedColumn" type="int32"/> </fields> </schema> <output> <field-expr><![CDATA[attributes_avgSnr]]></field-expr> <field-expr><![CDATA[attributes_integrationName]]></field-expr> <field-expr><![CDATA[attributes_isDeviceMounted]]></field-expr> <field-expr><![CDATA[attributes_latAntenna]]></field-expr> <field-expr><![CDATA[attributes_lngAntenna]]></field-expr> <field-expr><![CDATA[attributes_messageType]]></field-expr> <field-expr><![CDATA[attributes_rssi]]></field-expr> <field-expr><![CDATA[attributes_sampleTime]]></field-expr> <field-expr><![CDATA[attributes_snr]]></field-expr> <field-expr><![CDATA[attributes_station]]></field-expr> <field-expr><![CDATA[telemetry_battIntStat]]></field-expr> <field-expr><![CDATA[telemetry_batteryReading]]></field-expr> <field-expr><![CDATA[telemetry_counterTotal]]></field-expr> <field-expr><![CDATA[telemetry_pulses12hFrame]]></field-expr> <field-expr><![CDATA[telemetry_pulses24h12hFrame]]></field-expr> <field-expr><![CDATA[telemetry_pulses36h24hFrame]]></field-expr> <field-expr><![CDATA[telemetry_counterTotal/100 + computedColumn]]></field-expr> </output> </window-compute> </windows> <edges> <edge source="sigfox_source" target="Copy1"/> <edge source="Copy1" target="Compute1"/> <edge source="Compute1" target="Aggregate1"/> <edge source="Aggregate1" target="Compute2"/> </edges> </contquery> </contqueries> </project>
If there are any additional questions, I'm here. I have trying last three days to get this done but haven't luck.
Thanks in advance.
Hi,
Not sure I fully understand your problem, but why don't you simply use a compute first to do calculate a temporary field equal to telemetry_counterTotal/100 then an aggregate window to do a ESP_aSum() on this field?
Fred
Hi,
Not sure I fully understand your problem, but why don't you simply use a compute first to do calculate a temporary field equal to telemetry_counterTotal/100 then an aggregate window to do a ESP_aSum() on this field?
Fred
Whether you're already using SAS Event Stream Processing or thinking about it, this is where you can connect with your peers, ask questions and find resources.