CsvConverter.java
package de.dlr.shepard.influxDB;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import com.opencsv.bean.CsvToBeanBuilder;
import com.opencsv.bean.StatefulBeanToCsvBuilder;
import com.opencsv.exceptions.CsvException;
import de.dlr.shepard.exceptions.InvalidBodyException;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class CsvConverter {
public InputStream convertToCsv(List<TimeseriesPayload> payloads) throws IOException {
var tmpfile = Files.createTempFile("shepard", ".csv");
var stream = Files.newOutputStream(tmpfile);
var streamWriter = new OutputStreamWriter(stream);
var writer = new StatefulBeanToCsvBuilder<TimeseriesCsv>(streamWriter).build();
log.debug("Write temp file to: {}", tmpfile.toAbsolutePath().toString());
for (var payload : payloads) {
try {
writer.write(convertPayloadToCsv(payload));
} catch (CsvException e) {
log.error("CsvException while writing stream");
}
}
streamWriter.close();
var result = Files.newInputStream(tmpfile);
return result;
}
public List<TimeseriesPayload> convertToPayload(InputStream stream) throws IOException {
var reader = new InputStreamReader(stream);
var cb = new CsvToBeanBuilder<TimeseriesCsv>(reader).withType(TimeseriesCsv.class)
.withErrorLocale(Locale.forLanguageTag("en")).withExceptionHandler(e -> {
var encoder = StandardCharsets.ISO_8859_1.newEncoder();
var message = encoder.canEncode(e.getMessage()) ? e.getMessage() : "Invalid CSV";
log.error("CsvException while reading stream: {}", message);
throw new InvalidBodyException(message);
}).build();
List<TimeseriesCsv> result = cb.parse();
reader.close();
return convertCsvToPayload(result);
}
private List<TimeseriesCsv> convertPayloadToCsv(TimeseriesPayload payload) {
var ts = payload.getTimeseries();
var result = new ArrayList<TimeseriesCsv>(payload.getPoints().size());
for (var p : payload.getPoints()) {
var value = p.getValue() != null ? p.getValue().toString() : null;
var tsc = new TimeseriesCsv(p.getTimeInNanoseconds(), ts.getMeasurement(), ts.getDevice(), ts.getLocation(),
ts.getSymbolicName(), ts.getField(), value);
result.add(tsc);
}
return result;
}
private List<TimeseriesPayload> convertCsvToPayload(List<TimeseriesCsv> inputList) {
var result = new HashMap<Integer, TimeseriesPayload>();
for (var input : inputList) {
var key = Objects.hash(input.getMeasurement(), input.getDevice(), input.getLocation(),
input.getSymbolicName(), input.getField());
var point = new InfluxPoint(input.getTimestamp(), input.getValue());
if (result.containsKey(key)) {
result.get(key).getPoints().add(point);
} else {
var points = new ArrayList<InfluxPoint>();
points.add(point);
var payload = new TimeseriesPayload(new Timeseries(input.getMeasurement(), input.getDevice(),
input.getLocation(), input.getSymbolicName(), input.getField()), points);
result.put(key, payload);
}
}
return new ArrayList<>(result.values());
}
}