BookmarkSubscribeRSS Feed
🔒 This topic is solved and locked. Need further help from the community? Please sign in and ask a new question.
jovic92
Obsidian | Level 7

Hi everyone,

 

I have some logical problem. I need to implement next logic in my flow:

 

sample source:

deviceName    deviceType    time     telemetry_counterTotal

    A1                     X              9PM               10800

    A1                     X            10PM               10600

    A1                     X            11PM                11500

 

The formula for desired columns (let's call it here computedColumn) is (telemetry_counterTotal / 100.00 ) + LAG(computedColumn, 1*) 

 

*This is computedColumn from the previous iteration, but if it's the first iteration, of course, it's zero. 

Desired output:

deviceName    deviceType    time     telemetry_counterTotal        computedColumn

    A1                     X             11PM               10800                               329.00

    A1                     X             10PM               10600                               221.00

    A1                     X              9PM                11500                               115.00

 

Below is my code sample, I don't have big experience with DS2 so I tried to go around, but not sure if it even possible do this only with aggregate and compute windows, but I think this can't be so big problem. Any hint, suggestion and especially, code sample would be really helpful. Also, any kind of documentation or examples with similar problems I would appreciate. 

 

 

 

<project name="project_test_f" threads="1" heartbeat-interval="1" pubsub="auto">
  <metadata>
    <meta id="layout">{"cq":{"Aggregate1":{"x":-185,"y":300},"Compute1":{"x":-160,"y":170},"Compute2":{"x":-185,"y":435},"Copy1":{"x":-150,"y":35},"sigfox_source":{"x":-180,"y":-100}}}</meta>
    <meta id="studioModifiedBy">sasboot</meta>
    <meta id="studioModified">1539092809002</meta>
    <meta id="studioUploadedBy">sasboot</meta>
    <meta id="studioUploaded">1538120251741</meta>
  </metadata>
  <mas-modules>
    <mas-module module="New_Module_1" language="ds2" func-names="get_calc_total">
      <code><![CDATA[data esp.out ;
   declare counterTotal2 ;
   retain counterTotal_last counterTotal_lag;
   method run() ;
      set esp.in ;
      if isnull(counterTotal_lag) then counterTotal2 =counterTotal_last/100 ;
   end ;
enddata ;]]></code>
    </mas-module>
  </mas-modules>
  <contqueries>
    <contquery name="cq">
      <windows>
        <window-source pubsub="true" index="pi_EMPTY" output-insert-only="true" name="sigfox_source">
          <schema>
            <fields>
              <field name="attributes_avgSnr" type="string"/>
              <field name="attributes_integrationName" type="string"/>
              <field name="attributes_isDeviceMounted" type="int32"/>
              <field name="attributes_latAntenna" type="string"/>
              <field name="attributes_lngAntenna" type="string"/>
              <field name="attributes_messageType" type="string"/>
              <field name="attributes_rssi" type="string"/>
              <field name="attributes_sampleTime" type="stamp"/>
              <field name="attributes_snr" type="string"/>
              <field name="attributes_station" type="string"/>
              <field name="deviceName" type="string" key="true"/>
              <field name="deviceType" type="string" key="true"/>
              <field name="telemetry_battIntStat" type="string"/>
              <field name="telemetry_batteryReading" type="int32"/>
              <field name="telemetry_counterTotal" type="int32"/>
              <field name="telemetry_pulses12hFrame" type="int32"/>
              <field name="telemetry_pulses24h12hFrame" type="int32"/>
              <field name="telemetry_pulses36h24hFrame" type="int32"/>
            </fields>
          </schema>
        </window-source>
        <window-copy pubsub="true" output-insert-only="true" index="pi_EMPTY" name="Copy1"/>
        <window-compute pubsub="true" index="pi_EMPTY" collapse-updates="false" name="Compute1">
          <schema>
            <fields>
              <field name="attributes_avgSnr" type="string"/>
              <field name="attributes_integrationName" type="string"/>
              <field name="attributes_isDeviceMounted" type="int32"/>
              <field name="attributes_latAntenna" type="string"/>
              <field name="attributes_lngAntenna" type="string"/>
              <field name="attributes_messageType" type="string"/>
              <field name="attributes_rssi" type="string"/>
              <field name="attributes_sampleTime" type="stamp"/>
              <field name="attributes_snr" type="string"/>
              <field name="attributes_station" type="string"/>
              <field name="deviceName" type="string" key="true"/>
              <field name="deviceType" type="string" key="true"/>
              <field name="telemetry_battIntStat" type="string"/>
              <field name="telemetry_batteryReading" type="int32"/>
              <field name="telemetry_counterTotal" type="int32"/>
              <field name="telemetry_pulses12hFrame" type="int32"/>
              <field name="telemetry_pulses24h12hFrame" type="int32"/>
              <field name="telemetry_pulses36h24hFrame" type="int32"/>
              <field name="computedColumn" type="int32"/>
            </fields>
          </schema>
          <output>
            <field-expr><![CDATA[attributes_avgSnr]]></field-expr>
            <field-expr><![CDATA[attributes_integrationName]]></field-expr>
            <field-expr><![CDATA[attributes_isDeviceMounted]]></field-expr>
            <field-expr><![CDATA[attributes_latAntenna]]></field-expr>
            <field-expr><![CDATA[attributes_lngAntenna]]></field-expr>
            <field-expr><![CDATA[attributes_messageType]]></field-expr>
            <field-expr><![CDATA[attributes_rssi]]></field-expr>
            <field-expr><![CDATA[attributes_sampleTime]]></field-expr>
            <field-expr><![CDATA[attributes_snr]]></field-expr>
            <field-expr><![CDATA[attributes_station]]></field-expr>
            <field-expr><![CDATA[telemetry_battIntStat]]></field-expr>
            <field-expr><![CDATA[telemetry_batteryReading]]></field-expr>
            <field-expr><![CDATA[telemetry_counterTotal]]></field-expr>
            <field-expr><![CDATA[telemetry_pulses12hFrame]]></field-expr>
            <field-expr><![CDATA[telemetry_pulses24h12hFrame]]></field-expr>
            <field-expr><![CDATA[telemetry_pulses36h24hFrame]]></field-expr>
            <field-expr><![CDATA[0]]></field-expr>
          </output>
        </window-compute>
        <window-aggregate pubsub="true" index="pi_EMPTY" collapse-updates="false" name="Aggregate1">
          <schema>
            <fields>
              <field name="attributes_avgSnr" type="string"/>
              <field name="attributes_integrationName" type="string"/>
              <field name="attributes_isDeviceMounted" type="int32"/>
              <field name="attributes_latAntenna" type="string"/>
              <field name="attributes_lngAntenna" type="string"/>
              <field name="attributes_messageType" type="string"/>
              <field name="attributes_rssi" type="string"/>
              <field name="attributes_sampleTime" type="stamp"/>
              <field name="attributes_snr" type="string"/>
              <field name="attributes_station" type="string"/>
              <field name="deviceName" type="string" key="true"/>
              <field name="deviceType" type="string" key="true"/>
              <field name="telemetry_battIntStat" type="string"/>
              <field name="telemetry_batteryReading" type="int32"/>
              <field name="telemetry_counterTotal" type="int32"/>
              <field name="telemetry_pulses12hFrame" type="int32"/>
              <field name="telemetry_pulses24h12hFrame" type="int32"/>
              <field name="telemetry_pulses36h24hFrame" type="int32"/>
              <field name="computedColumn" type="int32"/>
            </fields>
          </schema>
          <output>
            <field-expr><![CDATA[ESP_aLast(attributes_avgSnr)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_integrationName)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_isDeviceMounted)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_latAntenna)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_lngAntenna)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_messageType)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_rssi)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_sampleTime)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_snr)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_station)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(telemetry_battIntStat)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(telemetry_batteryReading)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(telemetry_counterTotal)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(telemetry_pulses12hFrame)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(telemetry_pulses24h12hFrame)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(telemetry_pulses36h24hFrame)]]></field-expr>
            <field-expr><![CDATA[ESP_aLag(computedColumn, 1)]]></field-expr>
          </output>
        </window-aggregate>
        <window-compute pubsub="true" index="pi_EMPTY" collapse-updates="false" name="Compute2">
          <schema>
            <fields>
              <field name="attributes_avgSnr" type="string"/>
              <field name="attributes_integrationName" type="string"/>
              <field name="attributes_isDeviceMounted" type="int32"/>
              <field name="attributes_latAntenna" type="string"/>
              <field name="attributes_lngAntenna" type="string"/>
              <field name="attributes_messageType" type="string"/>
              <field name="attributes_rssi" type="string"/>
              <field name="attributes_sampleTime" type="stamp"/>
              <field name="attributes_snr" type="string"/>
              <field name="attributes_station" type="string"/>
              <field name="deviceName" type="string" key="true"/>
              <field name="deviceType" type="string" key="true"/>
              <field name="telemetry_battIntStat" type="string"/>
              <field name="telemetry_batteryReading" type="int32"/>
              <field name="telemetry_counterTotal" type="int32"/>
              <field name="telemetry_pulses12hFrame" type="int32"/>
              <field name="telemetry_pulses24h12hFrame" type="int32"/>
              <field name="telemetry_pulses36h24hFrame" type="int32"/>
              <field name="computedColumn" type="int32"/>
            </fields>
          </schema>
          <output>
            <field-expr><![CDATA[attributes_avgSnr]]></field-expr>
            <field-expr><![CDATA[attributes_integrationName]]></field-expr>
            <field-expr><![CDATA[attributes_isDeviceMounted]]></field-expr>
            <field-expr><![CDATA[attributes_latAntenna]]></field-expr>
            <field-expr><![CDATA[attributes_lngAntenna]]></field-expr>
            <field-expr><![CDATA[attributes_messageType]]></field-expr>
            <field-expr><![CDATA[attributes_rssi]]></field-expr>
            <field-expr><![CDATA[attributes_sampleTime]]></field-expr>
            <field-expr><![CDATA[attributes_snr]]></field-expr>
            <field-expr><![CDATA[attributes_station]]></field-expr>
            <field-expr><![CDATA[telemetry_battIntStat]]></field-expr>
            <field-expr><![CDATA[telemetry_batteryReading]]></field-expr>
            <field-expr><![CDATA[telemetry_counterTotal]]></field-expr>
            <field-expr><![CDATA[telemetry_pulses12hFrame]]></field-expr>
            <field-expr><![CDATA[telemetry_pulses24h12hFrame]]></field-expr>
            <field-expr><![CDATA[telemetry_pulses36h24hFrame]]></field-expr>
            <field-expr><![CDATA[telemetry_counterTotal/100 + computedColumn]]></field-expr>
          </output>
        </window-compute>
      </windows>
      <edges>
        <edge source="sigfox_source" target="Copy1"/>
        <edge source="Copy1" target="Compute1"/>
        <edge source="Compute1" target="Aggregate1"/>
        <edge source="Aggregate1" target="Compute2"/>
      </edges>
    </contquery>
  </contqueries>
</project>

 

If there are any additional questions, I'm here. I have trying last three days to get this done but haven't luck.

 

Thanks in advance.

 

1 ACCEPTED SOLUTION

Accepted Solutions
FredCombaneyre
SAS Employee

Hi, 

 

Not sure I fully understand your problem, but why don't you simply use a compute first to do calculate a temporary field equal to telemetry_counterTotal/100 then an aggregate window to do a ESP_aSum() on this field?

 

Fred

View solution in original post

2 REPLIES 2
FredCombaneyre
SAS Employee

Hi, 

 

Not sure I fully understand your problem, but why don't you simply use a compute first to do calculate a temporary field equal to telemetry_counterTotal/100 then an aggregate window to do a ESP_aSum() on this field?

 

Fred

jovic92
Obsidian | Level 7
Yeah, I was stupid. 😕 Thanks 🙂

Whether you're already using SAS Event Stream Processing or thinking about it, this is where you can connect with your peers, ask questions and find resources.

 

Discussion stats
  • 2 replies
  • 1726 views
  • 1 like
  • 2 in conversation