View Javadoc
1   package de.dlr.shepard.influxDB;
2   
3   import java.io.IOException;
4   import java.io.InputStream;
5   import java.io.InputStreamReader;
6   import java.io.OutputStreamWriter;
7   import java.nio.charset.StandardCharsets;
8   import java.nio.file.Files;
9   import java.util.ArrayList;
10  import java.util.HashMap;
11  import java.util.List;
12  import java.util.Locale;
13  import java.util.Objects;
14  
15  import com.opencsv.bean.CsvToBeanBuilder;
16  import com.opencsv.bean.StatefulBeanToCsvBuilder;
17  import com.opencsv.exceptions.CsvException;
18  
19  import de.dlr.shepard.exceptions.InvalidBodyException;
20  import lombok.extern.slf4j.Slf4j;
21  
22  @Slf4j
23  public class CsvConverter {
24  
25  	public InputStream convertToCsv(List<TimeseriesPayload> payloads) throws IOException {
26  		var tmpfile = Files.createTempFile("shepard", ".csv");
27  		var stream = Files.newOutputStream(tmpfile);
28  		var streamWriter = new OutputStreamWriter(stream);
29  		var writer = new StatefulBeanToCsvBuilder<TimeseriesCsv>(streamWriter).build();
30  		log.debug("Write temp file to: {}", tmpfile.toAbsolutePath().toString());
31  
32  		for (var payload : payloads) {
33  			try {
34  				writer.write(convertPayloadToCsv(payload));
35  			} catch (CsvException e) {
36  				log.error("CsvException while writing stream");
37  			}
38  		}
39  
40  		streamWriter.close();
41  		var result = Files.newInputStream(tmpfile);
42  		return result;
43  	}
44  
45  	public List<TimeseriesPayload> convertToPayload(InputStream stream) throws IOException {
46  		var reader = new InputStreamReader(stream);
47  		var cb = new CsvToBeanBuilder<TimeseriesCsv>(reader).withType(TimeseriesCsv.class)
48  				.withErrorLocale(Locale.forLanguageTag("en")).withExceptionHandler(e -> {
49  					var encoder = StandardCharsets.ISO_8859_1.newEncoder();
50  					var message = encoder.canEncode(e.getMessage()) ? e.getMessage() : "Invalid CSV";
51  					log.error("CsvException while reading stream: {}", message);
52  					throw new InvalidBodyException(message);
53  				}).build();
54  
55  		List<TimeseriesCsv> result = cb.parse();
56  		reader.close();
57  		return convertCsvToPayload(result);
58  	}
59  
60  	private List<TimeseriesCsv> convertPayloadToCsv(TimeseriesPayload payload) {
61  		var ts = payload.getTimeseries();
62  		var result = new ArrayList<TimeseriesCsv>(payload.getPoints().size());
63  		for (var p : payload.getPoints()) {
64  			var value = p.getValue() != null ? p.getValue().toString() : null;
65  			var tsc = new TimeseriesCsv(p.getTimeInNanoseconds(), ts.getMeasurement(), ts.getDevice(), ts.getLocation(),
66  					ts.getSymbolicName(), ts.getField(), value);
67  			result.add(tsc);
68  		}
69  		return result;
70  	}
71  
72  	private List<TimeseriesPayload> convertCsvToPayload(List<TimeseriesCsv> inputList) {
73  		var result = new HashMap<Integer, TimeseriesPayload>();
74  		for (var input : inputList) {
75  			var key = Objects.hash(input.getMeasurement(), input.getDevice(), input.getLocation(),
76  					input.getSymbolicName(), input.getField());
77  			var point = new InfluxPoint(input.getTimestamp(), input.getValue());
78  			if (result.containsKey(key)) {
79  				result.get(key).getPoints().add(point);
80  			} else {
81  				var points = new ArrayList<InfluxPoint>();
82  				points.add(point);
83  				var payload = new TimeseriesPayload(new Timeseries(input.getMeasurement(), input.getDevice(),
84  						input.getLocation(), input.getSymbolicName(), input.getField()), points);
85  				result.put(key, payload);
86  			}
87  		}
88  		return new ArrayList<>(result.values());
89  	}
90  
91  }