MxBlog

Using Mendix oData to fill a Data Warehouse

Here's a short example how you can use the oData feature in Mendix to move your App data into a Data Warehouse.

Usually filling a DWH consists of an ETL or ELT process. ETL is the old-fashioned way of doing this: Extract, Transform, Load:

With BigData this process usually looks like ELT:

There are a number of cloud solutions for running a DWH, and Amazon has a number of them. For storage DynamoDB and RedShift are most used. You can implement your ETL process using Data Pipeline, or Lambda. The following example uses Lambda and RedShift.

Redshift is a PostgreSQL compatible data warehouse database. AWS Lambda is service where you can define function and run them based on events. Since last month it has support for scheduled events, i.e., crontab like time based scheduled. And it also has official support for Python.

To move data from Mendix to Redshift we need a python script which does the following:

  1. Periodically, e.g. daily or hourly, retrieve all the updates since previous period
  2. Store the result as CSV on S3
  3. Use redshift copy command to copy the CSV contents into a Redshift table.

This python script retrieves all the objects from an odata resource that have been changed since yesterday:

def extract_data_store_as_csv_on_s3_handler(event=None, context=None):
    print("Extracting data from Mendix application")
    #
    # Use changedDate attribute to only retrieve objects changed since yesterday
    #
    yesterday = date.today() - datetime.timedelta(days=1)
    yesterdayStr = yesterday.strftime("%Y-%m-%dT00:00:00+01:00")
    filterParameter = "?$filter=changedDate gt datetimeoffset'{}'".format(yesterdayStr)
    #
    # Define the endpoint of the odata resource
    #  
    odataPath = "/odata/modulename/resourcename()"
    odataUrl = "{}{}{}".format("https://appsname.mxapps.io/",odataPath,filterParameter)
    print("Retrieving odata resource: {}".format(odataUrl))
    #
    # get the data from the mendix odata endpoint
    #
    odataUsr = "mendix_user"
    odataPwd = "mendix_pwd"
    req = requests.get(odataUrl,auth=(odataUsr,odataPwd))

Next we need to convert it to csv:

    #
    # Define odata namespaces used by the xml returned
    #
    nsa = "{http://www.w3.org/2005/Atom}"
    nsm = "{http://schemas.microsoft.com/ado/2007/08/dataservices/metadata}"
    nsd = "{http://schemas.microsoft.com/ado/2007/08/dataservices}" 

    #
    # create the csv file in /tmp
    # 
    tmpPath = "/tmp"
    csvFilename = "odata-{}.csv".format(date.today().strftime("%Y%m%d"))
    with io.open("{}{}".format(tmpPath,csvFilename),'wb') as csvfile:
       odatawriter = csv.writer(csvfile, lineterminator='\n', delimiter=',',quotechar='"', 
         quoting=csv.QUOTE_MINIMAL,encoding='utf-8')
       #
       # Loop over all the entries, i.e. objects, in the odata xml
       #
       row = 0
       root = ET.fromstring(req.content)
       entries = root.findall(".//{0}entry".format(nsa))
       for entry in entries:
         property = entry.findall("./{0}content/{1}properties/*".format(nsa,nsm))
         #
         # Write out a first line with header names
         #
         if row == 0:
          nameList = [name.tag.split("}")[1].decode("utf-8") for name in property]
          # To avoid error message in Excel, we need to change ID to Id
          # Excel has detected that ... is a SYLK file, but cannot load it. Either the file has errors or it is not
          # a SYLK file format. Click OK to try to open the file in a different format.
          if(nameList[0] == u"ID"):
              nameList[0] = u"Id" 
          odatawriter.writerow(nameList)
          row = row + 1
         valueList = [p.text.encode('utf-8').decode("utf-8") if p.text is not None else "" for p in property]
         odatawriter.writerow(valueList)

When we have the csv file we can copy it to s3. For this we use the boto3 python library:

    s3 = boto3.resource('s3')
    s3Bucket = 'bucketname'
    s3Key = 'mxappcsv/{}'.format(csvFilename)
    s3.Object(s3Bucket,s3Key).put(Body=open("{}{}".format(tmpPath,csvFilename), 'rb'))

Last step is to copy the csv from s3 into RedShift.

def copyS3CsvToDwh(s3Bucket=None,s3Key=None):
    print("Copying csv to Redshift")
    #
    # Use psycopg2 library to connect to your RedShift instance
    #
    conn = psycopg2.connect(
       host=RS_HOST,
       user=RS_DWH_ADMIN,
       port=RS_PORT,
       password=RS_DWH_PWD,
       dbname=RS_DWH_DBNAME)
    cur = conn.cursor() # create a cursor for executing queries
    s3Copy = """
       copy table_name from 's3://{}/{}' credentials 'aws_access_key_id={};aws_secret_access_key={}'
       csv delimiter ','
       null 'NA'
       ACCEPTINVCHARS
       timeformat 'auto'
       TRUNCATECOLUMNS
       IGNOREHEADER 1;
    """.format(s3Bucket,s3Key,AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY)

    try:
       print("Executing copy cmd: {}".format(s3Copy))
       cur.execute(s3Copy)
       cur.execute("commit;")
       conn.close()
    except Exception as err:
       print(err)

You can now transform the data as copied into Redshift using regular SQL statements which you also run from Lambda using psycopg2.