Add calculated signals to your data lake
In this section we explain how to customize your Lambda function to add custom signal calculations and output the results as Parquet files in your data lake.
Add calculated signals via Grafana and/or Athena
If you are using Grafana-Athena, you can handle 90%+ of custom signal calculations via Grafana’s powerful transformations and/or custom Athena SQL queries, as described in our guide.
Note
We strongly recommend reviewing whether your custom signals can be handled in the frontend before proceeding. Feel free to contact us for sparring.
Add calculated signals via Lambda
If your custom signals cannot be handled in the frontend, you can instead create new Parquet data lake tables that contain your calculated signals, moving the processing to your backend.
To create a custom signal table via your Lambda, follow the steps below:
- Download our example functions.py below and modify it as per your needs
- Follow the steps to set up a custom Lambda function incl. ARN layer
Function explained
The example function does the following:
- Start by uploading the regular Parquet files to the S3 output bucket
- Next, loop through a number of user-defined ‘custom message’ objects
- Load the messages in messages_filtered_list into a data frame
- Optionally resample the data frame (required for cross-message calculations)
- Add new custom signal(s), calculated based on signals in the data frame[1]
- Filter the data frame (e.g. to include only the new signals) and store it as a new Parquet file
- Upload the new custom Parquet file to the S3 output bucket
Note
Make sure to use valid custom message/signal names[2]
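The load, resample, calculate and filter steps above can be sketched with pandas as follows. This is a minimal illustration under stated assumptions, not the code in functions.py: the message contents, the signal names (Speed, EngineSpeed, SpeedPerRpm) and the helper build_custom_table are hypothetical.

```python
import pandas as pd

# Hypothetical stand-ins for two decoded messages, each indexed by timestamp
speed = pd.DataFrame(
    {"Speed": [10.0, 20.0, 30.0, 40.0]},
    index=pd.to_datetime([
        "2024-01-01 00:00:00.0", "2024-01-01 00:00:00.5",
        "2024-01-01 00:00:01.0", "2024-01-01 00:00:01.5",
    ]),
)
rpm = pd.DataFrame(
    {"EngineSpeed": [1000.0, 1200.0]},
    index=pd.to_datetime(["2024-01-01 00:00:00.2", "2024-01-01 00:00:01.2"]),
)


def build_custom_table(frames, freq="1s"):
    """Resample each message to a shared time grid, add a signal, keep only it."""
    # Resampling aligns the timestamps so that signals from different
    # messages can be combined row by row
    resampled = [df.resample(freq).mean() for df in frames]
    df = pd.concat(resampled, axis=1)

    # Hypothetical custom signal calculated across the two messages
    df["SpeedPerRpm"] = df["Speed"] / df["EngineSpeed"]

    # Keep only the new signal for the custom table
    return df[["SpeedPerRpm"]]


custom = build_custom_table([speed, rpm])
print(custom)
```

The resulting data frame could then be stored with the pandas DataFrame.to_parquet method before being uploaded to the S3 output bucket.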
Special case: Add custom geofences
The default custom message example lets you define a list of custom geofences and evaluate if the device is inside/outside the geofences. This leverages functions found in the utils.py.
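To illustrate the idea of a geofence check, here is a minimal sketch that assumes circular geofences defined by a center point and a radius. The geofence list, the helper names and the use of the haversine distance are assumptions made for illustration; they are not the functions in utils.py.

```python
import math

# Hypothetical geofences: (id, name, latitude, longitude, radius in km)
GEOFENCES = [
    (1, "depot", 56.1629, 10.2039, 0.5),
    (2, "harbor", 56.1500, 10.2300, 1.0),
]


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance between two points in km (spherical Earth)."""
    r = 6371.0  # mean Earth radius in km
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * r * math.asin(math.sqrt(a))


def geofence_id(lat, lon, geofences=GEOFENCES):
    """Return the id of the first geofence containing the point, or 0 if none."""
    for gid, _name, glat, glon, radius_km in geofences:
        if haversine_km(lat, lon, glat, glon) <= radius_km:
            return gid
    return 0


print(geofence_id(56.1629, 10.2039))  # a point at the center of the 'depot' geofence
print(geofence_id(0.0, 0.0))          # a point far away from both geofences
```

Applied row by row to latitude/longitude signals, a function like this would yield an area ID signal of the kind mentioned in the function description.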
Special case: Combine J1939 from multiple source addresses
Another default custom message example lets you load Parquet data from all DM01 messages (regardless of source address). This is useful if you wish to create a single table that contains the combined data (without resampling), while also including the source address value.
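The combination described above can be sketched as a concatenation without resampling. The sketch assumes that the data for each source address has already been loaded into its own data frame; the signal name AmberWarningLamp, the column name SourceAddress and the helper combine_by_source_address are hypothetical.

```python
import pandas as pd

# Hypothetical DM01 data, keyed by J1939 source address
frames_by_sa = {
    0: pd.DataFrame(
        {"AmberWarningLamp": [0, 1]},
        index=pd.to_datetime(["2024-01-01 00:00:00", "2024-01-01 00:00:02"]),
    ),
    11: pd.DataFrame(
        {"AmberWarningLamp": [1]},
        index=pd.to_datetime(["2024-01-01 00:00:01"]),
    ),
}


def combine_by_source_address(frames):
    """Stack the frames into one table, tagging each row with its source address."""
    tagged = [df.assign(SourceAddress=sa) for sa, df in frames.items()]
    # No resampling: the original timestamps are kept and sorted chronologically
    return pd.concat(tagged).sort_index()


combined = combine_by_source_address(frames_by_sa)
print(combined)
```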
Special case: Export a single resampled Parquet/CSV
You may want to create a single CSV/Parquet file that contains all signals with a shared timestamp. This can be done by using the commented-out ALL_DATA_RESAMPLED example. Doing so will resample all messages to a shared frequency, prefix all signals with their message names, and combine the data into a single data frame. If you wish to export a CSV instead of Parquet, you can use df.to_csv("myfolder/xyz.csv") to save the data frame. For ad hoc purposes, this can be run locally as explained in the custom Lambda section.
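The resample, prefix and combine logic described above can be sketched as follows. This is an illustration under stated assumptions rather than the ALL_DATA_RESAMPLED implementation: the message names, the signal names, the underscore separator and the helper resample_and_combine are hypothetical.

```python
import pandas as pd

# Hypothetical messages, each with its own timestamps
messages = {
    "GnssSpeed": pd.DataFrame(
        {"Speed": [5.0, 7.0, 9.0]},
        index=pd.to_datetime([
            "2024-01-01 00:00:00.1", "2024-01-01 00:00:00.6", "2024-01-01 00:00:01.1",
        ]),
    ),
    "GnssPos": pd.DataFrame(
        {"Latitude": [56.0, 56.1], "Longitude": [10.0, 10.1]},
        index=pd.to_datetime(["2024-01-01 00:00:00.4", "2024-01-01 00:00:01.4"]),
    ),
}


def resample_and_combine(msgs, freq="1s"):
    """Resample every message to a shared frequency and join them side by side."""
    parts = []
    for name, df in msgs.items():
        # Prefix each signal with its message name to keep column names unique
        parts.append(df.resample(freq).mean().add_prefix(f"{name}_"))
    return pd.concat(parts, axis=1)


combined = resample_and_combine(messages)
print(combined)

# Calling to_csv without a path returns the CSV content as a string
csv_text = combined.to_csv()
```

Passing a file path to to_csv, as in the example in the text, writes the CSV to disk instead of returning a string.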
[1] The example code creates a number of new signals using the internal CANedge data, e.g. including custom geofence-based area IDs and accumulating delta trip distance based on speed conditions. You should of course modify this to match your use case requirements.
[2] See the Athena naming guidelines for tables (aka messages) and columns (aka signals).