#!/usr/bin/env python # standard imports from ctypes import * import os import sys import platform import logging # import ESP pubsub and modeling apis dfesphome = os.environ["DFESP_HOME"] if platform.system() == 'Linux': sys.path.append(dfesphome + '/lib') else: sys.path.append(dfesphome + '/bin') import pubsubApi import modelingApi ''' We optionally can define a callback function for publisher failures given we may want to try to reconnect/recover, but in this example we will just print out some error information. The cbf has an optional context pointer for sharing state across calls or passing state into calls. ''' def pubErrCbFunc(failure, code, ctx): # don't output anything for client busy if failure == pubsubApi.pubsubFail_APIFAIL and code == pubsubApi.pubsubCode_CLIENTEVENTSQUEUED: return failMsg = pubsubApi.DecodeFailure(failure) codeMsg = pubsubApi.DecodeFailureCode(code) print('Client services error: ' + failMsg + codeMsg) ''' This is a publisher client example for using the ESP pub/sub API. The events that are published to the server are read from a file, so this application needs a binary event file supplied to it. One could test this against the subscribeServer example provided in the ESP server distributions. If doing so, simply use the same event input file already provided for subscribeServer, which ensures that the events published by this client are valid for a window in the subscribeServer example. This also requires that the continuous query and window command line parameters are "subscribeServer" and "tradesWindow" respectively. Finally, to isolate events to those published by this client, simply run the server without providing the event input file. ''' def main(): # check command line arguments if len(sys.argv) != 3: print('Usage: python publishClient.py ') raise SystemExit url = sys.argv[1] binfile = sys.argv[2] # Initialize publishing capabilities. This is the first pub/sub API call that must be made, and it only needs to be called once. The first parameter is the log level, so we are turning logging off for this example. The second parameter provides the ability to provide a log configuration file path to configure how logging works, NULL will use the defaults, but for us this does not matter given we're turning it off. ret = pubsubApi.Init(modelingApi.ll_Off, None) if ret == 0: print('Could not initialize ESP pubsub library') raise SystemExit # get the logger logger = logging.getLogger() logger.addHandler(modelingApi.getLoggingHandler()) # Get the window schema string. The URL for this is as follows: # dfESP://host:port/project/contquery/window?get=schema schemaurl = url + '?get=schema' stringv = pubsubApi.QueryMeta(schemaurl) if stringv == None: logger.error('Could not get ESP source window schema') pubsubApi.Shutdown() raise SystemExit schema = modelingApi.StringVGet(stringv, 0) if schema == None: logger.error('Could not get ESP schema from query response') pubsubApi.Shutdown() raise SystemExit print(schema) # use the schema string to create the schema object needed to pass to modelingApi.EventBlockNew2() below schemaptr = modelingApi.SchemaCreate('myschema', schema) if schemaptr == None: logger.error('Could not build ESP source window schema') pubsubApi.Shutdown() raise SystemExit # free up the vector, else memory leak modelingApi.StringVFree(stringv) # Start this publish session. This validates publish connection parameters, but does not make the actual connection. The parameter for this call is the following URL which was provided as an argument: dfESP://host:port/project/contquery/window pubErrFunc = pubsubApi.ERRCBFUNC(pubErrCbFunc) pub = pubsubApi.PublisherStart(url, pubErrFunc, None) if pub == None: logger.error('Could not create ESP publisher client') pubsubApi.Shutdown() raise SystemExit # now make the actual connection to the ESP application or server ret = pubsubApi.Connect(pub) if ret != 1: logger.error('Could not connect ESP publisher client') pubsubApi.Stop(pub, 0) pubsubApi.Shutdown() raise SystemExit # ******** NOW LET'S INJECT TRADE EVENTS INTO THE RUNNING PROJECT ******** # trade event blocks are in binary form and are coming via stdin with open(binfile, "rb") as fp: while True: # create event block eb = modelingApi.EventBlockNew2(fp, schemaptr) if eb == None: break # publish the event block to the ESP server ret = pubsubApi.PublisherInject(pub, eb) modelingApi.EventBlockDestroy(eb) if ret != 1: logger.error('Could not inject ESP event block') pubsubApi.Stop(pub, 0) pubsubApi.Shutdown() pub = None raise SystemExit # delete the schema object modelingApi.SchemaDelete(schemaptr) # stop pubsub, but block (i.e., true) to ensure that all queued events are first processed pubsubApi.Stop(pub, 1) pubsubApi.Shutdown() if __name__ == '__main__': main()