Hi everyone,
I have a logic problem. I need to implement the following logic in my flow:
Sample source:
deviceName  deviceType  time  telemetry_counterTotal
A1          X           9PM   10800
A1          X           10PM  10600
A1          X           11PM  11500
The formula for the desired column (let's call it computedColumn) is:
computedColumn = (telemetry_counterTotal / 100.00) + LAG(computedColumn, 1)*
* LAG(computedColumn, 1) is computedColumn from the previous iteration; on the first iteration it is zero.
Desired output:
deviceName  deviceType  time  telemetry_counterTotal  computedColumn
A1          X           11PM  10800                   329.00
A1          X           10PM  10600                   221.00
A1          X           9PM   11500                   115.00
Below is my code sample. I don't have much experience with DS2, so I tried to work around it, but I'm not sure whether this is even possible with only aggregate and compute windows. I don't think it should be a big problem, though. Any hint, suggestion, and especially a code sample would be really helpful. I would also appreciate any documentation or examples covering similar problems.
<project name="project_test_f" threads="1" heartbeat-interval="1" pubsub="auto">
  <metadata>
    <meta id="layout">{"cq":{"Aggregate1":{"x":-185,"y":300},"Compute1":{"x":-160,"y":170},"Compute2":{"x":-185,"y":435},"Copy1":{"x":-150,"y":35},"sigfox_source":{"x":-180,"y":-100}}}</meta>
    <meta id="studioModifiedBy">sasboot</meta>
    <meta id="studioModified">1539092809002</meta>
    <meta id="studioUploadedBy">sasboot</meta>
    <meta id="studioUploaded">1538120251741</meta>
  </metadata>
  <mas-modules>
    <mas-module module="New_Module_1" language="ds2" func-names="get_calc_total">
      <code><![CDATA[data esp.out ;
   declare counterTotal2 ;
   retain counterTotal_last counterTotal_lag;
   method run() ;
      set esp.in ;
      if isnull(counterTotal_lag) then counterTotal2 =counterTotal_last/100 ;
   end ;
enddata ;]]></code>
    </mas-module>
  </mas-modules>
  <contqueries>
    <contquery name="cq">
      <windows>
        <window-source pubsub="true" index="pi_EMPTY" output-insert-only="true" name="sigfox_source">
          <schema>
            <fields>
              <field name="attributes_avgSnr" type="string"/>
              <field name="attributes_integrationName" type="string"/>
              <field name="attributes_isDeviceMounted" type="int32"/>
              <field name="attributes_latAntenna" type="string"/>
              <field name="attributes_lngAntenna" type="string"/>
              <field name="attributes_messageType" type="string"/>
              <field name="attributes_rssi" type="string"/>
              <field name="attributes_sampleTime" type="stamp"/>
              <field name="attributes_snr" type="string"/>
              <field name="attributes_station" type="string"/>
              <field name="deviceName" type="string" key="true"/>
              <field name="deviceType" type="string" key="true"/>
              <field name="telemetry_battIntStat" type="string"/>
              <field name="telemetry_batteryReading" type="int32"/>
              <field name="telemetry_counterTotal" type="int32"/>
              <field name="telemetry_pulses12hFrame" type="int32"/>
              <field name="telemetry_pulses24h12hFrame" type="int32"/>
              <field name="telemetry_pulses36h24hFrame" type="int32"/>
            </fields>
          </schema>
        </window-source>
        <window-copy pubsub="true" output-insert-only="true" index="pi_EMPTY" name="Copy1"/>
        <window-compute pubsub="true" index="pi_EMPTY" collapse-updates="false" name="Compute1">
          <schema>
            <fields>
              <field name="attributes_avgSnr" type="string"/>
              <field name="attributes_integrationName" type="string"/>
              <field name="attributes_isDeviceMounted" type="int32"/>
              <field name="attributes_latAntenna" type="string"/>
              <field name="attributes_lngAntenna" type="string"/>
              <field name="attributes_messageType" type="string"/>
              <field name="attributes_rssi" type="string"/>
              <field name="attributes_sampleTime" type="stamp"/>
              <field name="attributes_snr" type="string"/>
              <field name="attributes_station" type="string"/>
              <field name="deviceName" type="string" key="true"/>
              <field name="deviceType" type="string" key="true"/>
              <field name="telemetry_battIntStat" type="string"/>
              <field name="telemetry_batteryReading" type="int32"/>
              <field name="telemetry_counterTotal" type="int32"/>
              <field name="telemetry_pulses12hFrame" type="int32"/>
              <field name="telemetry_pulses24h12hFrame" type="int32"/>
              <field name="telemetry_pulses36h24hFrame" type="int32"/>
              <field name="computedColumn" type="int32"/>
            </fields>
          </schema>
          <output>
            <field-expr><![CDATA[attributes_avgSnr]]></field-expr>
            <field-expr><![CDATA[attributes_integrationName]]></field-expr>
            <field-expr><![CDATA[attributes_isDeviceMounted]]></field-expr>
            <field-expr><![CDATA[attributes_latAntenna]]></field-expr>
            <field-expr><![CDATA[attributes_lngAntenna]]></field-expr>
            <field-expr><![CDATA[attributes_messageType]]></field-expr>
            <field-expr><![CDATA[attributes_rssi]]></field-expr>
            <field-expr><![CDATA[attributes_sampleTime]]></field-expr>
            <field-expr><![CDATA[attributes_snr]]></field-expr>
            <field-expr><![CDATA[attributes_station]]></field-expr>
            <field-expr><![CDATA[telemetry_battIntStat]]></field-expr>
            <field-expr><![CDATA[telemetry_batteryReading]]></field-expr>
            <field-expr><![CDATA[telemetry_counterTotal]]></field-expr>
            <field-expr><![CDATA[telemetry_pulses12hFrame]]></field-expr>
            <field-expr><![CDATA[telemetry_pulses24h12hFrame]]></field-expr>
            <field-expr><![CDATA[telemetry_pulses36h24hFrame]]></field-expr>
            <field-expr><![CDATA[0]]></field-expr>
          </output>
        </window-compute>
        <window-aggregate pubsub="true" index="pi_EMPTY" collapse-updates="false" name="Aggregate1">
          <schema>
            <fields>
              <field name="attributes_avgSnr" type="string"/>
              <field name="attributes_integrationName" type="string"/>
              <field name="attributes_isDeviceMounted" type="int32"/>
              <field name="attributes_latAntenna" type="string"/>
              <field name="attributes_lngAntenna" type="string"/>
              <field name="attributes_messageType" type="string"/>
              <field name="attributes_rssi" type="string"/>
              <field name="attributes_sampleTime" type="stamp"/>
              <field name="attributes_snr" type="string"/>
              <field name="attributes_station" type="string"/>
              <field name="deviceName" type="string" key="true"/>
              <field name="deviceType" type="string" key="true"/>
              <field name="telemetry_battIntStat" type="string"/>
              <field name="telemetry_batteryReading" type="int32"/>
              <field name="telemetry_counterTotal" type="int32"/>
              <field name="telemetry_pulses12hFrame" type="int32"/>
              <field name="telemetry_pulses24h12hFrame" type="int32"/>
              <field name="telemetry_pulses36h24hFrame" type="int32"/>
              <field name="computedColumn" type="int32"/>
            </fields>
          </schema>
          <output>
            <field-expr><![CDATA[ESP_aLast(attributes_avgSnr)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_integrationName)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_isDeviceMounted)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_latAntenna)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_lngAntenna)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_messageType)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_rssi)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_sampleTime)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_snr)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_station)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(telemetry_battIntStat)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(telemetry_batteryReading)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(telemetry_counterTotal)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(telemetry_pulses12hFrame)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(telemetry_pulses24h12hFrame)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(telemetry_pulses36h24hFrame)]]></field-expr>
            <field-expr><![CDATA[ESP_aLag(computedColumn, 1)]]></field-expr>
          </output>
        </window-aggregate>
        <window-compute pubsub="true" index="pi_EMPTY" collapse-updates="false" name="Compute2">
          <schema>
            <fields>
              <field name="attributes_avgSnr" type="string"/>
              <field name="attributes_integrationName" type="string"/>
              <field name="attributes_isDeviceMounted" type="int32"/>
              <field name="attributes_latAntenna" type="string"/>
              <field name="attributes_lngAntenna" type="string"/>
              <field name="attributes_messageType" type="string"/>
              <field name="attributes_rssi" type="string"/>
              <field name="attributes_sampleTime" type="stamp"/>
              <field name="attributes_snr" type="string"/>
              <field name="attributes_station" type="string"/>
              <field name="deviceName" type="string" key="true"/>
              <field name="deviceType" type="string" key="true"/>
              <field name="telemetry_battIntStat" type="string"/>
              <field name="telemetry_batteryReading" type="int32"/>
              <field name="telemetry_counterTotal" type="int32"/>
              <field name="telemetry_pulses12hFrame" type="int32"/>
              <field name="telemetry_pulses24h12hFrame" type="int32"/>
              <field name="telemetry_pulses36h24hFrame" type="int32"/>
              <field name="computedColumn" type="int32"/>
            </fields>
          </schema>
          <output>
            <field-expr><![CDATA[attributes_avgSnr]]></field-expr>
            <field-expr><![CDATA[attributes_integrationName]]></field-expr>
            <field-expr><![CDATA[attributes_isDeviceMounted]]></field-expr>
            <field-expr><![CDATA[attributes_latAntenna]]></field-expr>
            <field-expr><![CDATA[attributes_lngAntenna]]></field-expr>
            <field-expr><![CDATA[attributes_messageType]]></field-expr>
            <field-expr><![CDATA[attributes_rssi]]></field-expr>
            <field-expr><![CDATA[attributes_sampleTime]]></field-expr>
            <field-expr><![CDATA[attributes_snr]]></field-expr>
            <field-expr><![CDATA[attributes_station]]></field-expr>
            <field-expr><![CDATA[telemetry_battIntStat]]></field-expr>
            <field-expr><![CDATA[telemetry_batteryReading]]></field-expr>
            <field-expr><![CDATA[telemetry_counterTotal]]></field-expr>
            <field-expr><![CDATA[telemetry_pulses12hFrame]]></field-expr>
            <field-expr><![CDATA[telemetry_pulses24h12hFrame]]></field-expr>
            <field-expr><![CDATA[telemetry_pulses36h24hFrame]]></field-expr>
            <field-expr><![CDATA[telemetry_counterTotal/100 + computedColumn]]></field-expr>
          </output>
        </window-compute>
      </windows>
      <edges>
        <edge source="sigfox_source" target="Copy1"/>
        <edge source="Copy1" target="Compute1"/>
        <edge source="Compute1" target="Aggregate1"/>
        <edge source="Aggregate1" target="Compute2"/>
      </edges>
    </contquery>
  </contqueries>
</project>
If you have any additional questions, I'm here. I have been trying to get this done for the last three days, without luck.
Thanks in advance.
Hi,
I'm not sure I fully understand your problem, but why not use a compute window first to calculate a temporary field equal to telemetry_counterTotal/100, and then an aggregate window that applies ESP_aSum() to that field?
Fred
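To see why a running sum matches the formula in the question: computedColumn[n] = telemetry_counterTotal[n] / 100 + computedColumn[n-1], starting from zero, unrolls to the cumulative sum of telemetry_counterTotal / 100. The Python sketch below only illustrates that arithmetic outside ESP (it is not ESP code). The function names are made up for this example, and the arrival order of the readings is an assumption taken from the desired output in the question (11500 first, then 10600, then 10800).

```python
from itertools import accumulate


def recursive_computed_column(counter_totals):
    """Apply computedColumn[n] = counterTotal[n] / 100 + computedColumn[n-1],
    using 0 as the previous value for the first event."""
    computed = []
    previous = 0.0
    for total in counter_totals:
        previous = total / 100.0 + previous
        computed.append(previous)
    return computed


def running_sum_of_scaled(counter_totals):
    """Scale each reading by 1/100 first, then keep a running sum
    (the compute-then-sum idea from the reply above)."""
    return list(accumulate(total / 100.0 for total in counter_totals))


# Assumed arrival order, read from the desired output in the question.
readings = [11500, 10600, 10800]

recursive = recursive_computed_column(readings)
cumulative = running_sum_of_scaled(readings)

print(recursive)   # [115.0, 221.0, 329.0]
print(cumulative)  # [115.0, 221.0, 329.0]
```

Both approaches give the same values, so the lag of the computed column is not needed. In the ESP model this would presumably correspond to an aggregate window keyed on deviceName and deviceType. Since the desired output has decimals, computedColumn would likely need to be declared as double rather than int32.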