1 package de.dlr.shepard.influxDB;
2
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Objects;

import com.opencsv.bean.CsvToBeanBuilder;
import com.opencsv.bean.StatefulBeanToCsvBuilder;
import com.opencsv.exceptions.CsvException;

import de.dlr.shepard.exceptions.InvalidBodyException;
import lombok.extern.slf4j.Slf4j;
21
22 @Slf4j
23 public class CsvConverter {
24
25 public InputStream convertToCsv(List<TimeseriesPayload> payloads) throws IOException {
26 var tmpfile = Files.createTempFile("shepard", ".csv");
27 var stream = Files.newOutputStream(tmpfile);
28 var streamWriter = new OutputStreamWriter(stream);
29 var writer = new StatefulBeanToCsvBuilder<TimeseriesCsv>(streamWriter).build();
30 log.debug("Write temp file to: {}", tmpfile.toAbsolutePath().toString());
31
32 for (var payload : payloads) {
33 try {
34 writer.write(convertPayloadToCsv(payload));
35 } catch (CsvException e) {
36 log.error("CsvException while writing stream");
37 }
38 }
39
40 streamWriter.close();
41 var result = Files.newInputStream(tmpfile);
42 return result;
43 }
44
45 public List<TimeseriesPayload> convertToPayload(InputStream stream) throws IOException {
46 var reader = new InputStreamReader(stream);
47 var cb = new CsvToBeanBuilder<TimeseriesCsv>(reader).withType(TimeseriesCsv.class)
48 .withErrorLocale(Locale.forLanguageTag("en")).withExceptionHandler(e -> {
49 var encoder = StandardCharsets.ISO_8859_1.newEncoder();
50 var message = encoder.canEncode(e.getMessage()) ? e.getMessage() : "Invalid CSV";
51 log.error("CsvException while reading stream: {}", message);
52 throw new InvalidBodyException(message);
53 }).build();
54
55 List<TimeseriesCsv> result = cb.parse();
56 reader.close();
57 return convertCsvToPayload(result);
58 }
59
60 private List<TimeseriesCsv> convertPayloadToCsv(TimeseriesPayload payload) {
61 var ts = payload.getTimeseries();
62 var result = new ArrayList<TimeseriesCsv>(payload.getPoints().size());
63 for (var p : payload.getPoints()) {
64 var value = p.getValue() != null ? p.getValue().toString() : null;
65 var tsc = new TimeseriesCsv(p.getTimeInNanoseconds(), ts.getMeasurement(), ts.getDevice(), ts.getLocation(),
66 ts.getSymbolicName(), ts.getField(), value);
67 result.add(tsc);
68 }
69 return result;
70 }
71
72 private List<TimeseriesPayload> convertCsvToPayload(List<TimeseriesCsv> inputList) {
73 var result = new HashMap<Integer, TimeseriesPayload>();
74 for (var input : inputList) {
75 var key = Objects.hash(input.getMeasurement(), input.getDevice(), input.getLocation(),
76 input.getSymbolicName(), input.getField());
77 var point = new InfluxPoint(input.getTimestamp(), input.getValue());
78 if (result.containsKey(key)) {
79 result.get(key).getPoints().add(point);
80 } else {
81 var points = new ArrayList<InfluxPoint>();
82 points.add(point);
83 var payload = new TimeseriesPayload(new Timeseries(input.getMeasurement(), input.getDevice(),
84 input.getLocation(), input.getSymbolicName(), input.getField()), points);
85 result.put(key, payload);
86 }
87 }
88 return new ArrayList<>(result.values());
89 }
90
91 }