1 package de.dlr.shepard.influxDB;
2
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;

import de.dlr.shepard.exceptions.InvalidBodyException;
12
13 public class TimeseriesService {
14
15 private InfluxDBConnector influxConnector = InfluxDBConnector.getInstance();
16 private CsvConverter csvConverter = new CsvConverter();
17
18
19
20
21
22
23
24
25 public String createTimeseries(String database, TimeseriesPayload payload) {
26 String sanityCheck = InfluxUtil.sanitize(payload.getTimeseries());
27 if (!sanityCheck.isBlank())
28 throw new InvalidBodyException(sanityCheck);
29 if (!influxConnector.databaseExist(database)) {
30 return String.format("The database %s does not exist", database);
31 }
32 return influxConnector.saveTimeseriesPayload(database, payload);
33 }
34
35
36
37
38
39
40
41
42
43
44
45
46
47 public TimeseriesPayload getTimeseriesPayload(long startTimeStamp, long endTimeStamp, String database,
48 Timeseries timeseries, SingleValuedUnaryFunction function, Long groupBy, FillOption fillOption) {
49 TimeseriesPayload payload = influxConnector.getTimeseriesPayload(startTimeStamp, endTimeStamp, database,
50 timeseries, function, groupBy, fillOption);
51 return payload;
52 }
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 public List<TimeseriesPayload> getTimeseriesPayloadList(long start, long end, String database,
71 List<Timeseries> timeseriesList, SingleValuedUnaryFunction function, Long groupBy, FillOption fillOption,
72 Set<String> devicesFilterSet, Set<String> locationsFilterSet, Set<String> symbolicNameFilterSet) {
73 var timeseriesPayloadQueue = new ConcurrentLinkedQueue<TimeseriesPayload>();
74 timeseriesList.parallelStream().forEach(timeseries -> {
75 TimeseriesPayload payload = null;
76 if (matchFilter(timeseries, devicesFilterSet, locationsFilterSet, symbolicNameFilterSet)) {
77 payload = getTimeseriesPayload(start, end, database, timeseries, function, groupBy, fillOption);
78 }
79 if (payload != null) {
80 timeseriesPayloadQueue.add(payload);
81 }
82
83 });
84 return new ArrayList<>(timeseriesPayloadQueue);
85 }
86
87
88
89
90
91
92
93 public List<Timeseries> getTimeseriesAvailable(String database) {
94 return influxConnector.getTimeseriesAvailable(database);
95 }
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114 public InputStream exportTimeseriesPayload(long start, long end, String database, List<Timeseries> timeseriesList,
115 SingleValuedUnaryFunction function, Long groupBy, FillOption fillOption, Set<String> devicesFilterSet,
116 Set<String> locationsFilterSet, Set<String> symbolicNameFilterSet) throws IOException {
117 var payload = getTimeseriesPayloadList(start, end, database, timeseriesList, function, groupBy, fillOption,
118 devicesFilterSet, locationsFilterSet, symbolicNameFilterSet);
119 var stream = csvConverter.convertToCsv(payload);
120 return stream;
121 }
122
123
124
125
126
127
128
129
130
131 public String importTimeseries(String database, InputStream stream) throws IOException {
132 List<String> errors = new ArrayList<>();
133 var timeseriesList = csvConverter.convertToPayload(stream);
134 for (var timeseries : timeseriesList) {
135 var error = createTimeseries(database, timeseries);
136 if (!error.isBlank()) {
137 errors.add(error);
138 }
139 }
140 return String.join(", ", errors);
141 }
142
143
144
145
146
147
148 public String createDatabase() {
149 String name = UUID.randomUUID().toString();
150 influxConnector.createDatabase(name);
151 return name;
152 }
153
154 public void deleteDatabase(String database) {
155 influxConnector.deleteDatabase(database);
156 }
157
158 private boolean matchFilter(Timeseries timeseries, Set<String> device, Set<String> location, Set<String> symName) {
159 var deviceMatches = true;
160 var locatioMatches = true;
161 var symbolicNameMatches = true;
162 if (!device.isEmpty()) {
163 deviceMatches = device.contains(timeseries.getDevice());
164 }
165 if (!location.isEmpty()) {
166 locatioMatches = location.contains(timeseries.getLocation());
167 }
168 if (!symName.isEmpty()) {
169 symbolicNameMatches = symName.contains(timeseries.getSymbolicName());
170 }
171 return deviceMatches && locatioMatches && symbolicNameMatches;
172 }
173
174 }