This section introduces a sample Python streaming UDX for SQLstream. Here, readfile.py is a Python script that reads lines from the files in a specified directory whose names match a pattern, and returns the file name, line number, and line text as output rows to SQLstream.
This UDX is unusual in that it emits streaming data but it does not have a streaming input. It is an example of using a Python UDX as a source plugin.
#!/usr/bin/python
# coding=utf-8
from pyUdx import Connection
import time
import os
import re
connection=Connection()
out = connection.getOutput()
cursor = connection.getInput(0)
fdir = connection.getParameter("dirname")
fpatString = connection.getParameter("fpattern")
fpat = re.compile(fpatString)
filenames = [f for f in os.listdir(fdir) if re.match(fpat, f)]
for fn in filenames:
    inFile = open(fdir+'/'+fn, "r")
    lines = inFile.read().split('\n')
    inFile.close()
    millis = int(round(time.time() * 1000))
    i = 0
    for line in lines:
        i += 1
        out.executeUpdate(millis,fn,i,line)
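In this example, the script reads the directory name and the file-name pattern from the dirname and fpattern parameters, lists the files in that directory whose names match the pattern, and emits one row per line containing the current time in milliseconds, the file name, the line number, and the line text.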
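The file-scanning logic can be tried outside s-Server before it is wired to pyUdx. The following is a minimal sketch, assuming a hypothetical helper named read_matching_lines that is not part of the sample; it yields the same four values that readfile.py passes to out.executeUpdate.

```python
import os
import re
import time


def read_matching_lines(fdir, fpattern):
    """Yield (millis, filename, line_no, line) for each line of each matching file.

    Hypothetical helper for local testing; it mirrors the loop in readfile.py
    but does not use pyUdx.
    """
    fpat = re.compile(fpattern)
    # sorted() only makes the order repeatable; os.listdir order is arbitrary.
    for fn in sorted(f for f in os.listdir(fdir) if fpat.match(f)):
        with open(os.path.join(fdir, fn), "r") as in_file:
            lines = in_file.read().split("\n")
        millis = int(round(time.time() * 1000))
        for line_no, line in enumerate(lines, start=1):
            yield millis, fn, line_no, line
```

Like the sample, this splits on '\n', so a file that ends with a newline produces a final empty line.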
Just like a Java UDX, for a Python UDX to be available in s-Server you need to declare it as a function in SQL. For more details, see the topic CREATE FUNCTION in the Streaming SQL Reference Guide. The following sample code readfile.sql defines the SQL function that is implemented by the above UDX.
In this readfile example we define a dummy stream to use as input. We are not actually expecting any data on this stream, but it is a requirement for any Python UDX function to have an input as well as an output.
create or replace schema test;
set schema 'test';
set path 'test';
create or replace stream dummy (dummy int);
drop function readfile;
create or replace function readfile(C cursor, "dirname" varchar(250), "fpattern" varchar(250))
returns table (
    rowtime timestamp not null,
    fname varchar(100),
    line_no int,
    line varchar(100)
)
language EXTERNAL
no sql
no state
external name '/usr/bin/python3 /home/sqlstream/pyread/readfile.py';
You can call the function like this:
set schema 'test';
set path 'test';
select stream * from stream(
    readfile (
        cursor(select stream * from dummy)  -- the unused input stream
        , '/home/sqlstream/pythonudx'       -- the directory to search
        , '.*.sql'                          -- the regex file matching pattern
    ));
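Because readfile.py applies the pattern with Python's re.match, the pattern is a regular expression anchored only at the start of the file name, not a shell glob. The sketch below compares the pattern used in the call above with a stricter alternative; the stricter pattern and the sample file names are illustrative assumptions, not part of the original example.

```python
import re

names = ["readfile.sql", "notsql", "readfile.sql.bak", "notes.txt"]

# Pattern from the call above: the second '.' matches any character,
# and re.match only anchors at the start of the name.
loose = re.compile(".*.sql")
# Stricter alternative (an assumption about intent): literal dot, anchored end.
strict = re.compile(r".*\.sql$")

loose_hits = [n for n in names if loose.match(n)]
strict_hits = [n for n in names if strict.match(n)]

print(loose_hits)   # ['readfile.sql', 'notsql', 'readfile.sql.bak']
print(strict_hits)  # ['readfile.sql']
```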
Here is a Python module that extracts some columns from the input and emits a stringified concatenation of those columns. The mechanism of stringify is not important here.
#!/usr/bin/python
from pyUdx import getConnection
import numpy as np
from testHelper import stringify
udx = getConnection()
cursor = udx.getInput(0)
rowSlice = cursor.bindToArray(udx.getParameter("WHICH"))
out = udx.getOutput()
while cursor.next():
    out.executeUpdate(stringify(rowSlice))
The corresponding SQL function can be defined with a cursor and a signature:
create or replace function makestring(C cursor, WHICH select from c)
returns table (
    C.* passthrough,
    outstring varchar(1024)
)
language EXTERNAL
no sql
no state
external name '/usr/bin/python3 /home/sqlstream/makestring.py';
In this example the cursor parameter is C and the signature is WHICH. The output rows consist of all the columns of C, which are passed through unchanged (the passthrough modifier), plus the calculated column outstring.
The function can be called by passing a row type, such as row(f1,f2), as follows:
select * from stream(makestring(cursor(select * from someSourceString), row(f1,f2)));
This tells the UDX framework which input columns (f1 and f2) will be concatenated together. Within the Python module, the row type is used in this way:
rowSlice = cursor.bindToArray(udx.getParameter("WHICH"))
This binding happens once, when the UDX starts. On each iteration of the while cursor.next() loop, the input values for columns f1 and f2 are copied into rowSlice. Then out.executeUpdate(stringify(rowSlice)) passes those values to the stringify function, and the string result is returned in the outstring column.
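The bind-once, refresh-per-row pattern can be illustrated without s-Server. This sketch uses a hypothetical FakeCursor class as a stand-in; it is not the pyUdx implementation, and the in-place update of the bound list is an assumption based on the description above.

```python
class FakeCursor:
    """Toy stand-in for a UDX input cursor (hypothetical, not pyUdx)."""

    def __init__(self, column_names, rows):
        self._names = column_names
        self._rows = iter(rows)
        self._bindings = []  # (column indexes, bound list) pairs

    def bindToArray(self, wanted):
        # Resolve column names to positions once, at bind time.
        indexes = [self._names.index(name) for name in wanted]
        bound = [None] * len(indexes)
        self._bindings.append((indexes, bound))
        return bound

    def next(self):
        row = next(self._rows, None)
        if row is None:
            return False
        # Copy the selected column values into each bound list in place.
        for indexes, bound in self._bindings:
            bound[:] = [row[i] for i in indexes]
        return True


cursor = FakeCursor(["id", "f1", "f2"], [(1, "a", "b"), (2, "c", "d")])
row_slice = cursor.bindToArray(("f1", "f2"))  # bound once, before the loop

results = []
while cursor.next():
    results.append("".join(row_slice))  # stand-in for stringify(rowSlice)
```

The point of the design is that column lookup is paid for once, while the per-row work is only a copy of the selected values.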
If the names of columns to be read by the Python UDX are known / fixed, they can be coded into the UDX itself and don't need to be specified in the SQL function definition, or in the call to the function.
columnList = [ 'col1', 'col2', 'col3']
rowSliceGetter = cursor.bindToArray(tuple(columnList))
These are the mappings between SQL and Python types supported by the SQLstream Python UDX framework.
Note that this table applies only to data that actually needs to be handled by the Python UDX function. Any column values that are simply passed through without change are never handed from SQLstream to Python, so they are never converted between SQL and Python types.
|SQL Data Type||Python Type||Notes|
|DATE||N/A||You may CAST(date_column AS TIMESTAMP) and use the timestamp (as an int) in Python|
|DECIMAL||int||A DECIMAL with scale 0 can safely be treated as an int. A DECIMAL with a fractional part is treated as implicit fixed point, so the DECIMAL(5,3) value 5.230 is mapped to the integer 5230, and the DECIMAL(6,4) value 5.2300 is mapped to 52300. Make sure you account for the scale when working with decimals of different scales.|
|INTERVAL||N/A||SQLstream does not support INTERVAL as a column value|
|NUMERIC||N/A||You may CAST(numeric_column as DECIMAL(p,s)) and then treat as int in Python (taking scale into account as for DECIMAL above).|
|REAL||N/A||You may CAST(real_column AS FLOAT)|
|TIMESTAMP||int||The int value contains the Unix epoch in milliseconds|
|VARBINARY||bytes||Not currently supported for output|
When reading from a table or stream, any unsupported input types are represented by the (string) value 'unhandled!!!'.
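The DECIMAL and TIMESTAMP mappings in the table can be handled with small conversion helpers. The following is a sketch only: the function names are hypothetical, the scale must be supplied by the caller because the integer alone does not carry it, and the timestamp is interpreted as UTC.

```python
from datetime import datetime, timedelta, timezone
from decimal import Decimal

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def scaled_int_to_decimal(value, scale):
    """Interpret an int received for a DECIMAL(p, scale) column."""
    return Decimal(value).scaleb(-scale)


def decimal_to_scaled_int(value, scale):
    """Convert a Decimal back to the implicit fixed-point int for the given scale.

    Digits beyond the scale are rounded using the current decimal context.
    """
    return int(Decimal(value).scaleb(scale).to_integral_value())


def epoch_millis_to_datetime(millis):
    """Interpret an int received for a TIMESTAMP column as a UTC datetime."""
    return EPOCH + timedelta(milliseconds=millis)


print(scaled_int_to_decimal(5230, 3))   # 5.230
print(scaled_int_to_decimal(52300, 4))  # 5.2300
```

Using Decimal rather than float keeps the fixed-point value exact, and timedelta arithmetic avoids floating-point rounding of the millisecond part.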
If you find any problems with a Python UDX, your first ports of call for more information are:
Any errors reported to the trace file should also be reported into the ALL_TRACE global error stream and SAVED_TRACE error table.
You can add trace messages to your Python UDX using the traceXXX(message) functions of the Connection object, such as traceInfo(message) and traceSevere(message). The supported trace levels are the same as for Java logging.
You can get the current trace level using getTraceLevel(), and you may provide the trace level as the first parameter to the trace(level, message) function:
traceLevel = connection.getTraceLevel()
connection.trace(pyUdx.TRACE_INFO, "info level testMessage tracelevel=%d" % traceLevel)
The trace level can be any of:
pyUdx.TRACE_OFF
pyUdx.TRACE_SEVERE
pyUdx.TRACE_WARNING
pyUdx.TRACE_INFO
pyUdx.TRACE_CONFIG
pyUdx.TRACE_FINE
pyUdx.TRACE_FINER
pyUdx.TRACE_FINEST
Here is a complete Python UDX, movefile.py, with tracing examples:
#!/usr/bin/python
# Takes files from one directory and moves them to another (if the directory names are different)
# Filename, input directory, output directory must be passed in that order as a row(fn, in, out)
# passes data through
from pyUdx import Connection
import shutil
import os
connection=Connection()
out = connection.getOutput()
cursor = connection.getInput(0)
rowSlice = cursor.bind(connection.getParameter("WHICH"))
while cursor.next():
    (fn, inDir, outDir) = rowSlice.value()
    msg = None
    success = False
    try:
        # some basic checks
        if not os.path.exists(inDir):
            msg = "Error: input directory '%s' does not exist" % inDir
        elif not os.path.exists(outDir):
            msg = "Error: output directory '%s' does not exist" % outDir
        if msg is None:
            msg = shutil.move(inDir + '/' + fn, outDir+'/'+fn)
            success = True
            connection.traceInfo("Moved %s from %s to %s" % (fn, inDir, outDir))
    except Exception as e:
        # move failed - why?
        msg = str(e)
        connection.traceSevere("Moving %s from %s to %s - %s" % (fn, inDir, outDir, msg))
    out.executeUpdate(success, msg)
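The per-row move logic can be checked on its own before running it inside s-Server. The sketch below factors it into a hypothetical move_file function (not part of movefile.py) that returns the same (success, msg) pair the UDX emits; tracing is left out because it depends on the pyUdx Connection object.

```python
import os
import shutil


def move_file(fn, in_dir, out_dir):
    """Return (success, msg), mirroring the values movefile.py emits per row."""
    try:
        if not os.path.exists(in_dir):
            return False, "Error: input directory '%s' does not exist" % in_dir
        if not os.path.exists(out_dir):
            return False, "Error: output directory '%s' does not exist" % out_dir
        # shutil.move returns the destination path on success.
        dest = shutil.move(os.path.join(in_dir, fn), os.path.join(out_dir, fn))
        return True, dest
    except Exception as e:
        # The move failed; report the reason as the message.
        return False, str(e)
```

Inside the UDX loop, the result could then be passed straight to out.executeUpdate(success, msg), with the trace calls placed around it.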