package de.dlr.shepard.influxDB;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

import com.opencsv.bean.CsvToBeanBuilder;
import com.opencsv.bean.StatefulBeanToCsvBuilder;
import com.opencsv.exceptions.CsvException;

import de.dlr.shepard.exceptions.InvalidBodyException;
import lombok.extern.slf4j.Slf4j;

/**
 * Converts between lists of {@link TimeseriesPayload} and their CSV representation, where each CSV row is one
 * {@link TimeseriesCsv} bean (one point together with the attributes of the timeseries it belongs to).
 */
@Slf4j
public class CsvConverter {

    /** Message used instead of the parser message when the latter is missing or not ISO-8859-1 encodable. */
    private static final String FALLBACK_MESSAGE = "Invalid CSV";

    /**
     * Writes all points of the given payloads into a temporary CSV file and returns a stream reading that file.
     *
     * <p>
     * The file is written as UTF-8. The returned stream is opened with
     * {@link StandardOpenOption#DELETE_ON_CLOSE}, so the caller must close it in order to get the temporary file
     * removed. Payloads that cannot be serialized are logged and skipped; the remaining payloads are still
     * written.
     *
     * @param payloads the payloads to serialize
     * @return an input stream over the generated CSV file
     * @throws IOException if the temporary file cannot be created, written or re-opened
     */
    public InputStream convertToCsv(List<TimeseriesPayload> payloads) throws IOException {
        var tmpfile = Files.createTempFile("shepard", ".csv");
        log.debug("Write temp file to: {}", tmpfile.toAbsolutePath().toString());

        // try-with-resources flushes and closes the writer even if a payload conversion fails unexpectedly
        try (var streamWriter = new OutputStreamWriter(Files.newOutputStream(tmpfile), StandardCharsets.UTF_8)) {
            var writer = new StatefulBeanToCsvBuilder<TimeseriesCsv>(streamWriter).build();
            for (var payload : payloads) {
                try {
                    writer.write(convertPayloadToCsv(payload));
                } catch (CsvException e) {
                    // Best effort: skip the offending payload but keep the cause in the log
                    log.error("CsvException while writing stream", e);
                }
            }
        } catch (IOException | RuntimeException e) {
            // Nobody will ever read the file, so do not leave it behind
            try {
                Files.deleteIfExists(tmpfile);
            } catch (IOException cleanupFailure) {
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }

        // The caller only ever sees the stream, so tie the lifetime of the temp file to it
        return Files.newInputStream(tmpfile, StandardOpenOption.DELETE_ON_CLOSE);
    }

    /**
     * Parses CSV data from the given stream and groups the rows into one payload per timeseries.
     *
     * <p>
     * The stream is decoded as UTF-8 and is closed by this method, whether parsing succeeds or fails.
     *
     * @param stream the CSV data to parse
     * @return one payload per distinct combination of measurement, device, location, symbolic name and field
     * @throws IOException          if closing the underlying stream fails
     * @throws InvalidBodyException if a row cannot be converted into a {@link TimeseriesCsv}
     */
    public List<TimeseriesPayload> convertToPayload(InputStream stream) throws IOException {
        try (var reader = new InputStreamReader(stream, StandardCharsets.UTF_8)) {
            var cb = new CsvToBeanBuilder<TimeseriesCsv>(reader).withType(TimeseriesCsv.class)
                    .withErrorLocale(Locale.forLanguageTag("en")).withExceptionHandler(e -> {
                        var message = toReportableMessage(e.getMessage());
                        log.error("CsvException while reading stream: {}", message);
                        throw new InvalidBodyException(message);
                    }).build();

            List<TimeseriesCsv> rows = cb.parse();
            return convertCsvToPayload(rows);
        }
    }

    /**
     * Returns the given message if it is present and encodable as ISO-8859-1, otherwise a generic fallback.
     *
     * <p>
     * NOTE(review): the ISO-8859-1 restriction is kept from the original code; presumably the message ends up
     * somewhere that only accepts that charset - confirm against the callers.
     */
    private static String toReportableMessage(String message) {
        if (message == null) {
            return FALLBACK_MESSAGE;
        }
        var encoder = StandardCharsets.ISO_8859_1.newEncoder();
        return encoder.canEncode(message) ? message : FALLBACK_MESSAGE;
    }

    /**
     * Flattens one payload into CSV rows: one row per point, each carrying the attributes of the payload's
     * timeseries. A {@code null} point value stays {@code null}, any other value is written via
     * {@code toString()}.
     */
    private List<TimeseriesCsv> convertPayloadToCsv(TimeseriesPayload payload) {
        var ts = payload.getTimeseries();
        var points = payload.getPoints();
        var result = new ArrayList<TimeseriesCsv>(points.size());
        for (var p : points) {
            var value = p.getValue() != null ? p.getValue().toString() : null;
            result.add(new TimeseriesCsv(p.getTimeInNanoseconds(), ts.getMeasurement(), ts.getDevice(),
                    ts.getLocation(), ts.getSymbolicName(), ts.getField(), value));
        }
        return result;
    }

    /**
     * Groups CSV rows by their timeseries attributes and builds one payload per group.
     *
     * <p>
     * The grouping key is the list of the five attribute values, compared with {@code equals}. A bare hash
     * code must not be used as key: two different timeseries can share a hash code and would then be merged into
     * a single payload. Payloads are returned in the order in which their timeseries first appears in the
     * input.
     */
    private List<TimeseriesPayload> convertCsvToPayload(List<TimeseriesCsv> inputList) {
        Map<List<Object>, TimeseriesPayload> grouped = new LinkedHashMap<>();
        for (var input : inputList) {
            // Arrays.asList tolerates null attributes, unlike List.of
            List<Object> key = Arrays.<Object>asList(input.getMeasurement(), input.getDevice(),
                    input.getLocation(), input.getSymbolicName(), input.getField());
            var point = new InfluxPoint(input.getTimestamp(), input.getValue());

            var existing = grouped.get(key);
            if (existing != null) {
                existing.getPoints().add(point);
                continue;
            }

            var points = new ArrayList<InfluxPoint>();
            points.add(point);
            var timeseries = new Timeseries(input.getMeasurement(), input.getDevice(), input.getLocation(),
                    input.getSymbolicName(), input.getField());
            grouped.put(key, new TimeseriesPayload(timeseries, points));
        }
        return new ArrayList<>(grouped.values());
    }

}