BookmarkSubscribeRSS Feed
🔒 This topic is solved and locked. Need further help from the community? Please sign in and ask a new question.
jovic92
Obsidian | Level 7

Hi everyone,

 

I have some logical problem. I need to implement next logic in my flow:

 

sample source:

deviceName    deviceType    time     telemetry_counterTotal

    A1                     X              9PM               10800

    A1                     X            10PM               10600

    A1                     X            11PM                11500

 

The formula for desired columns (let's call it here computedColumn) is (telemetry_counterTotal / 100.00 ) + LAG(computedColumn, 1*) 

 

*This is computedColumn from the previous iteration, but if it's the first iteration, of course, it's zero. 

Desired output:

deviceName    deviceType    time     telemetry_counterTotal        computedColumn

    A1                     X             11PM               10800                               329.00

    A1                     X             10PM               10600                               221.00

    A1                     X              9PM                11500                               115.00

 

Below is my code sample, I don't have big experience with DS2 so I tried to go around, but not sure if it even possible do this only with aggregate and compute windows, but I think this can't be so big problem. Any hint, suggestion and especially, code sample would be really helpful. Also, any kind of documentation or examples with similar problems I would appreciate. 

 

 

 

<project name="project_test_f" threads="1" heartbeat-interval="1" pubsub="auto">
  <metadata>
    <meta id="layout">{"cq":{"Aggregate1":{"x":-185,"y":300},"Compute1":{"x":-160,"y":170},"Compute2":{"x":-185,"y":435},"Copy1":{"x":-150,"y":35},"sigfox_source":{"x":-180,"y":-100}}}</meta>
    <meta id="studioModifiedBy">sasboot</meta>
    <meta id="studioModified">1539092809002</meta>
    <meta id="studioUploadedBy">sasboot</meta>
    <meta id="studioUploaded">1538120251741</meta>
  </metadata>
  <mas-modules>
    <mas-module module="New_Module_1" language="ds2" func-names="get_calc_total">
      <code><![CDATA[data esp.out ;
   declare counterTotal2 ;
   retain counterTotal_last counterTotal_lag;
   method run() ;
      set esp.in ;
      if isnull(counterTotal_lag) then counterTotal2 =counterTotal_last/100 ;
   end ;
enddata ;]]></code>
    </mas-module>
  </mas-modules>
  <contqueries>
    <contquery name="cq">
      <windows>
        <window-source pubsub="true" index="pi_EMPTY" output-insert-only="true" name="sigfox_source">
          <schema>
            <fields>
              <field name="attributes_avgSnr" type="string"/>
              <field name="attributes_integrationName" type="string"/>
              <field name="attributes_isDeviceMounted" type="int32"/>
              <field name="attributes_latAntenna" type="string"/>
              <field name="attributes_lngAntenna" type="string"/>
              <field name="attributes_messageType" type="string"/>
              <field name="attributes_rssi" type="string"/>
              <field name="attributes_sampleTime" type="stamp"/>
              <field name="attributes_snr" type="string"/>
              <field name="attributes_station" type="string"/>
              <field name="deviceName" type="string" key="true"/>
              <field name="deviceType" type="string" key="true"/>
              <field name="telemetry_battIntStat" type="string"/>
              <field name="telemetry_batteryReading" type="int32"/>
              <field name="telemetry_counterTotal" type="int32"/>
              <field name="telemetry_pulses12hFrame" type="int32"/>
              <field name="telemetry_pulses24h12hFrame" type="int32"/>
              <field name="telemetry_pulses36h24hFrame" type="int32"/>
            </fields>
          </schema>
        </window-source>
        <window-copy pubsub="true" output-insert-only="true" index="pi_EMPTY" name="Copy1"/>
        <window-compute pubsub="true" index="pi_EMPTY" collapse-updates="false" name="Compute1">
          <schema>
            <fields>
              <field name="attributes_avgSnr" type="string"/>
              <field name="attributes_integrationName" type="string"/>
              <field name="attributes_isDeviceMounted" type="int32"/>
              <field name="attributes_latAntenna" type="string"/>
              <field name="attributes_lngAntenna" type="string"/>
              <field name="attributes_messageType" type="string"/>
              <field name="attributes_rssi" type="string"/>
              <field name="attributes_sampleTime" type="stamp"/>
              <field name="attributes_snr" type="string"/>
              <field name="attributes_station" type="string"/>
              <field name="deviceName" type="string" key="true"/>
              <field name="deviceType" type="string" key="true"/>
              <field name="telemetry_battIntStat" type="string"/>
              <field name="telemetry_batteryReading" type="int32"/>
              <field name="telemetry_counterTotal" type="int32"/>
              <field name="telemetry_pulses12hFrame" type="int32"/>
              <field name="telemetry_pulses24h12hFrame" type="int32"/>
              <field name="telemetry_pulses36h24hFrame" type="int32"/>
              <field name="computedColumn" type="int32"/>
            </fields>
          </schema>
          <output>
            <field-expr><![CDATA[attributes_avgSnr]]></field-expr>
            <field-expr><![CDATA[attributes_integrationName]]></field-expr>
            <field-expr><![CDATA[attributes_isDeviceMounted]]></field-expr>
            <field-expr><![CDATA[attributes_latAntenna]]></field-expr>
            <field-expr><![CDATA[attributes_lngAntenna]]></field-expr>
            <field-expr><![CDATA[attributes_messageType]]></field-expr>
            <field-expr><![CDATA[attributes_rssi]]></field-expr>
            <field-expr><![CDATA[attributes_sampleTime]]></field-expr>
            <field-expr><![CDATA[attributes_snr]]></field-expr>
            <field-expr><![CDATA[attributes_station]]></field-expr>
            <field-expr><![CDATA[telemetry_battIntStat]]></field-expr>
            <field-expr><![CDATA[telemetry_batteryReading]]></field-expr>
            <field-expr><![CDATA[telemetry_counterTotal]]></field-expr>
            <field-expr><![CDATA[telemetry_pulses12hFrame]]></field-expr>
            <field-expr><![CDATA[telemetry_pulses24h12hFrame]]></field-expr>
            <field-expr><![CDATA[telemetry_pulses36h24hFrame]]></field-expr>
            <field-expr><![CDATA[0]]></field-expr>
          </output>
        </window-compute>
        <window-aggregate pubsub="true" index="pi_EMPTY" collapse-updates="false" name="Aggregate1">
          <schema>
            <fields>
              <field name="attributes_avgSnr" type="string"/>
              <field name="attributes_integrationName" type="string"/>
              <field name="attributes_isDeviceMounted" type="int32"/>
              <field name="attributes_latAntenna" type="string"/>
              <field name="attributes_lngAntenna" type="string"/>
              <field name="attributes_messageType" type="string"/>
              <field name="attributes_rssi" type="string"/>
              <field name="attributes_sampleTime" type="stamp"/>
              <field name="attributes_snr" type="string"/>
              <field name="attributes_station" type="string"/>
              <field name="deviceName" type="string" key="true"/>
              <field name="deviceType" type="string" key="true"/>
              <field name="telemetry_battIntStat" type="string"/>
              <field name="telemetry_batteryReading" type="int32"/>
              <field name="telemetry_counterTotal" type="int32"/>
              <field name="telemetry_pulses12hFrame" type="int32"/>
              <field name="telemetry_pulses24h12hFrame" type="int32"/>
              <field name="telemetry_pulses36h24hFrame" type="int32"/>
              <field name="computedColumn" type="int32"/>
            </fields>
          </schema>
          <output>
            <field-expr><![CDATA[ESP_aLast(attributes_avgSnr)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_integrationName)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_isDeviceMounted)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_latAntenna)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_lngAntenna)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_messageType)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_rssi)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_sampleTime)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_snr)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(attributes_station)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(telemetry_battIntStat)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(telemetry_batteryReading)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(telemetry_counterTotal)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(telemetry_pulses12hFrame)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(telemetry_pulses24h12hFrame)]]></field-expr>
            <field-expr><![CDATA[ESP_aLast(telemetry_pulses36h24hFrame)]]></field-expr>
            <field-expr><![CDATA[ESP_aLag(computedColumn, 1)]]></field-expr>
          </output>
        </window-aggregate>
        <window-compute pubsub="true" index="pi_EMPTY" collapse-updates="false" name="Compute2">
          <schema>
            <fields>
              <field name="attributes_avgSnr" type="string"/>
              <field name="attributes_integrationName" type="string"/>
              <field name="attributes_isDeviceMounted" type="int32"/>
              <field name="attributes_latAntenna" type="string"/>
              <field name="attributes_lngAntenna" type="string"/>
              <field name="attributes_messageType" type="string"/>
              <field name="attributes_rssi" type="string"/>
              <field name="attributes_sampleTime" type="stamp"/>
              <field name="attributes_snr" type="string"/>
              <field name="attributes_station" type="string"/>
              <field name="deviceName" type="string" key="true"/>
              <field name="deviceType" type="string" key="true"/>
              <field name="telemetry_battIntStat" type="string"/>
              <field name="telemetry_batteryReading" type="int32"/>
              <field name="telemetry_counterTotal" type="int32"/>
              <field name="telemetry_pulses12hFrame" type="int32"/>
              <field name="telemetry_pulses24h12hFrame" type="int32"/>
              <field name="telemetry_pulses36h24hFrame" type="int32"/>
              <field name="computedColumn" type="int32"/>
            </fields>
          </schema>
          <output>
            <field-expr><![CDATA[attributes_avgSnr]]></field-expr>
            <field-expr><![CDATA[attributes_integrationName]]></field-expr>
            <field-expr><![CDATA[attributes_isDeviceMounted]]></field-expr>
            <field-expr><![CDATA[attributes_latAntenna]]></field-expr>
            <field-expr><![CDATA[attributes_lngAntenna]]></field-expr>
            <field-expr><![CDATA[attributes_messageType]]></field-expr>
            <field-expr><![CDATA[attributes_rssi]]></field-expr>
            <field-expr><![CDATA[attributes_sampleTime]]></field-expr>
            <field-expr><![CDATA[attributes_snr]]></field-expr>
            <field-expr><![CDATA[attributes_station]]></field-expr>
            <field-expr><![CDATA[telemetry_battIntStat]]></field-expr>
            <field-expr><![CDATA[telemetry_batteryReading]]></field-expr>
            <field-expr><![CDATA[telemetry_counterTotal]]></field-expr>
            <field-expr><![CDATA[telemetry_pulses12hFrame]]></field-expr>
            <field-expr><![CDATA[telemetry_pulses24h12hFrame]]></field-expr>
            <field-expr><![CDATA[telemetry_pulses36h24hFrame]]></field-expr>
            <field-expr><![CDATA[telemetry_counterTotal/100 + computedColumn]]></field-expr>
          </output>
        </window-compute>
      </windows>
      <edges>
        <edge source="sigfox_source" target="Copy1"/>
        <edge source="Copy1" target="Compute1"/>
        <edge source="Compute1" target="Aggregate1"/>
        <edge source="Aggregate1" target="Compute2"/>
      </edges>
    </contquery>
  </contqueries>
</project>

 

If there are any additional questions, I'm here. I have trying last three days to get this done but haven't luck.

 

Thanks in advance.

 

1 ACCEPTED SOLUTION

Accepted Solutions
FredCombaneyre
SAS Employee

Hi, 

 

Not sure I fully understand your problem, but why don't you simply use a compute first to do calculate a temporary field equal to telemetry_counterTotal/100 then an aggregate window to do a ESP_aSum() on this field?

 

Fred

View solution in original post

2 REPLIES 2
FredCombaneyre
SAS Employee

Hi, 

 

Not sure I fully understand your problem, but why don't you simply use a compute first to do calculate a temporary field equal to telemetry_counterTotal/100 then an aggregate window to do a ESP_aSum() on this field?

 

Fred

jovic92
Obsidian | Level 7
Yeah, I was stupid. 😕 Thanks 🙂

Whether you're already using SAS Event Stream Processing or thinking about it, this is where you can connect with your peers, ask questions and find resources.

 

Multiple Linear Regression in SAS

Learn how to run multiple linear regression models with and without interactions, presented by SAS user Alex Chaplin.

Find more tutorials on the SAS Users YouTube channel.

Discussion stats
  • 2 replies
  • 1173 views
  • 1 like
  • 2 in conversation