InfluxUtil.java

  package de.dlr.shepard.influxDB;

  import java.net.URLDecoder;
  import java.nio.charset.StandardCharsets;
  import java.time.Instant;
  import java.util.ArrayList;
  import java.util.List;
  import java.util.Locale;
  import java.util.concurrent.TimeUnit;

  import org.influxdb.dto.BatchPoints;
  import org.influxdb.dto.BoundParameterQuery;
  import org.influxdb.dto.BoundParameterQuery.QueryBuilder;
  import org.influxdb.dto.Point;
  import org.influxdb.dto.Point.Builder;
  import org.influxdb.dto.QueryResult;

  import de.dlr.shepard.util.Constants;
  import lombok.extern.slf4j.Slf4j;

  16. @Slf4j
  17. public final class InfluxUtil {

  18.     private InfluxUtil() {
  19.         // Static class needs no constructor
  20.     }

  21.     private static final long MULTIPLIER_NANO = 1000000000L;
  22.     private static final String FIELD_FLOAT = "float";
  23.     private static final String FIELD_INT = "integer";
  24.     private static final String FIELD_STRING = "string";
  25.     private static final String FIELD_BOOL = "boolean";

  26.     /**
  27.      * Build an influx query with the given parameters.
  28.      *
  29.      * @param startTimeStamp The beginning of the timeseries
  30.      * @param endTimeStamp   The end of the timeseries
  31.      * @param database       The database to be queried
  32.      * @param timeseries     The timeseries whose points are queried
  33.      * @param function       The aggregate function
  34.      * @param groupBy        The time interval measurements get grouped by
  35.      * @param fillOption     The fill option for missing values
  36.      * @return an influx query
  37.      */
  38.     public static BoundParameterQuery buildQuery(long startTimeStamp, long endTimeStamp, String database,
  39.             Timeseries timeseries, SingleValuedUnaryFunction function, Long groupBy, FillOption fillOption) {
  40.         var selectPart = (function != null)
  41.                 ? String.format("SELECT %s(\"%s\")", function.toString(), timeseries.getField())
  42.                 : String.format("SELECT \"%s\"", timeseries.getField());
  43.         var fromPart = String.format("FROM \"%s\"", timeseries.getMeasurement());
  44.         var wherePart = String.format("WHERE time >= %dns AND time <= %dns "
  45.                 + "AND \"device\" = $device AND \"location\" = $location AND \"symbolic_name\" = $symbolic_name",
  46.                 startTimeStamp, endTimeStamp);
  47.         var query = String.join(" ", selectPart, fromPart, wherePart);

  48.         if (groupBy != null) {
  49.             query += String.format(" GROUP BY time(%dns)", groupBy);
  50.         }
  51.         if (fillOption != null) {
  52.             query += String.format(" fill(%s)", fillOption.toString().toLowerCase());
  53.         }
  54.         var parameterizedQuery = QueryBuilder.newQuery(query).forDatabase(database)
  55.                 .bind("device", timeseries.getDevice()).bind("location", timeseries.getLocation())
  56.                 .bind("symbolic_name", timeseries.getSymbolicName()).create();
  57.         log.debug("Query influxdb {}: {} with params {}", database, parameterizedQuery.getCommand(),
  58.                 URLDecoder.decode(parameterizedQuery.getParameterJsonWithUrlEncoded(), StandardCharsets.UTF_8));
  59.         return parameterizedQuery;
  60.     }

  61.     /**
  62.      * Extract TimeseriesPayload from influx query result.
  63.      *
  64.      * @param queryResult Influx query result
  65.      * @param timeseries  the timeseries to extract
  66.      * @return TimeseriesPayload
  67.      */
  68.     public static TimeseriesPayload extractPayload(QueryResult queryResult, Timeseries timeseries) {
  69.         var values = queryResult.getResults().get(0).getSeries().get(0).getValues();
  70.         var influxPoints = new ArrayList<InfluxPoint>(values.size());
  71.         for (var value : values) {
  72.             var time = Instant.parse((String) value.get(0));
  73.             var nanoseconds = time.getEpochSecond() * MULTIPLIER_NANO + time.getNano();
  74.             influxPoints.add(new InfluxPoint(nanoseconds, value.get(1)));
  75.         }
  76.         return new TimeseriesPayload(timeseries, influxPoints);
  77.     }

  78.     /**
  79.      * Create a batch out of a given list of influx points.
  80.      *
  81.      * @param database          The database where the batch is to be stored
  82.      * @param timeseriesPayload TimeseriesPayload to be stored
  83.      * @param expectedType      The expected datatype as string
  84.      * @return influx batch points
  85.      */
  86.     public static BatchPoints createBatch(String database, TimeseriesPayload timeseriesPayload, String expectedType) {
  87.         String error = "";
  88.         BatchPoints batchPoints = BatchPoints.database(database).build();
  89.         var influxPoints = timeseriesPayload.getPoints();
  90.         var timeseries = timeseriesPayload.getTimeseries();

  91.         for (var influxPoint : influxPoints) {
  92.             Builder pointBuilder = Point.measurement(timeseries.getMeasurement())
  93.                     .tag(Constants.LOCATION, timeseries.getLocation()).tag(Constants.DEVICE, timeseries.getDevice())
  94.                     .tag(Constants.SYMBOLICNAME, timeseries.getSymbolicName())
  95.                     .time(influxPoint.getTimeInNanoseconds(), TimeUnit.NANOSECONDS);
  96.             Object value = influxPoint.getValue();

  97.             if (value != null && expectedType.equals(FIELD_STRING)) {
  98.                 // Expected type is string, we use value.toString()
  99.                 pointBuilder.addField(timeseries.getField(), value.toString());
  100.             } else if (value instanceof Number numberValue
  101.                     && (expectedType.equals(FIELD_FLOAT) || expectedType.isBlank())) {
  102.                 // value is a number and float or nothing is expected
  103.                 pointBuilder.addField(timeseries.getField(), numberValue.doubleValue());
  104.             } else if (value instanceof Number numberValue && expectedType.equals(FIELD_INT)) {
  105.                 // value is a number and int or nothing is expected
  106.                 pointBuilder.addField(timeseries.getField(), numberValue.longValue());
  107.             } else if (value instanceof Boolean booleanValue
  108.                     && (expectedType.equals(FIELD_BOOL) || expectedType.isBlank())) {
  109.                 // value is a boolean and boolean or nothing is expected
  110.                 pointBuilder.addField(timeseries.getField(), booleanValue);
  111.             } else if (value != null) {
  112.                 // value has to be casted
  113.                 var stringValue = value.toString();
  114.                 try {
  115.                     switch (expectedType) {
  116.                     case FIELD_FLOAT -> pointBuilder.addField(timeseries.getField(), Double.parseDouble(stringValue));
  117.                     case FIELD_INT -> pointBuilder.addField(timeseries.getField(), Long.parseLong(stringValue));
  118.                     case FIELD_BOOL -> pointBuilder.addField(timeseries.getField(), Boolean.parseBoolean(stringValue));
  119.                     default -> pointBuilder.addField(timeseries.getField(), stringValue);
  120.                     }
  121.                 } catch (NumberFormatException e) {
  122.                     if (error.isBlank())
  123.                         // log the first error
  124.                         error = String.format("Invalid influx point detected, cannot cast type %s into type %s",
  125.                                 stringValue, expectedType);
  126.                 }
  127.             }
  128.             if (pointBuilder.hasFields())
  129.                 batchPoints.point(pointBuilder.build());
  130.         }
  131.         if (!error.isBlank())
  132.             log.error(error);

  133.         return batchPoints;

  134.     }

  135.     /**
  136.      * Checks whether a QueryResult is valid, meaning that it has no errors and the
  137.      * results as well as the series-lists are not empty. If this returns true it is
  138.      * safe to run
  139.      * {@code queryResult.getResults().get(0).getSeries().get(0).getValues()}
  140.      *
  141.      * @param queryResult The QueryResult to be checked.
  142.      * @return False if QueryResult has errors or results or series are empty, true
  143.      *         otherwise.
  144.      */
  145.     public static boolean isQueryResultValid(QueryResult queryResult) {
  146.         if (queryResult == null) {
  147.             log.warn("Query Result is null");
  148.             return false;
  149.         }
  150.         if (queryResult.getError() != null) {
  151.             log.warn("There was an error while querying the Influxdb: {}", queryResult.getError());
  152.             return false;
  153.         }

  154.         var resultList = queryResult.getResults();
  155.         if (resultList == null || resultList.isEmpty())
  156.             return false;

  157.         var result = resultList.get(0);
  158.         if (result.hasError()) {
  159.             log.warn("There was an error while querying the Influxdb: {}", result.getError());
  160.             return false;
  161.         }

  162.         var seriesList = result.getSeries();
  163.         if (seriesList == null || seriesList.isEmpty())
  164.             return false;

  165.         var valueList = seriesList.get(0).getValues();
  166.         if (valueList == null)
  167.             return false;

  168.         return true;
  169.     }

  170.     /**
  171.      * Is the timeseries object valid so that it can be stored in influxdb? This
  172.      * function returns errors if the time series attributes contain illegal
  173.      * characters.
  174.      *
  175.      * @param timeseries The timeseries object to be sanitized
  176.      * @return errors or empty string
  177.      */
  178.     public static String sanitize(Timeseries timeseries) {
  179.         List<String> errors = new ArrayList<>();

  180.         String measurementString = sanitizeString(timeseries.getMeasurement());
  181.         String locationString = sanitizeString(timeseries.getLocation());
  182.         String deviceString = sanitizeString(timeseries.getDevice());
  183.         String symbolicNameString = sanitizeString(timeseries.getSymbolicName());
  184.         String fieldString = sanitizeString(timeseries.getField());

  185.         if (!measurementString.isEmpty()) {
  186.             errors.add("measurement " + measurementString);
  187.         }
  188.         if (!locationString.isEmpty()) {
  189.             errors.add("location " + locationString);
  190.         }
  191.         if (!deviceString.isEmpty()) {
  192.             errors.add("device " + deviceString);
  193.         }
  194.         if (!symbolicNameString.isEmpty()) {
  195.             errors.add("symbolicName " + symbolicNameString);
  196.         }
  197.         if (!fieldString.isEmpty()) {
  198.             errors.add("field " + fieldString);
  199.         }
  200.         return String.join("\n", errors);
  201.     }

  202.     private static String sanitizeString(String s) {
  203.         String[] forbiddenChars = { " ", ".", "/", "," };
  204.         if (s == null || s.isBlank()) {
  205.             return "should not be blank";
  206.         }

  207.         for (String forbiddenChar : forbiddenChars) {
  208.             int pos = s.indexOf(forbiddenChar);
  209.             if (pos != -1) {
  210.                 return "should not contain whitespaces or dots or slashes or commas: "
  211.                         + s.substring(0, pos + forbiddenChar.length());
  212.             }
  213.         }

  214.         return "";
  215.     }
  216. }