1 package de.dlr.shepard.influxDB;
2
3 import java.util.ArrayList;
4 import java.util.Collections;
5 import java.util.HashMap;
6 import java.util.List;
7 import java.util.Map;
8
9 import org.influxdb.BatchOptions;
10 import org.influxdb.InfluxDB;
11 import org.influxdb.InfluxDBException;
12 import org.influxdb.InfluxDBFactory;
13 import org.influxdb.dto.Pong;
14 import org.influxdb.dto.Query;
15 import org.influxdb.dto.QueryResult;
16
17 import de.dlr.shepard.util.Constants;
18 import de.dlr.shepard.util.IConnector;
19 import de.dlr.shepard.util.PropertiesHelper;
20 import lombok.extern.slf4j.Slf4j;
21
22
23
24
25
26
27 @Slf4j
28 public class InfluxDBConnector implements IConnector {
29
30 private InfluxDB influxDB;
31 private static InfluxDBConnector instance = null;
32
33
34
35
36 private InfluxDBConnector() {
37 }
38
39
40
41
42
43
44
45 public static InfluxDBConnector getInstance() {
46 if (instance == null) {
47 instance = new InfluxDBConnector();
48 }
49 return instance;
50 }
51
52
53
54
55
56
57
58 @Override
59 public boolean connect() {
60 PropertiesHelper helper = new PropertiesHelper();
61 String host = helper.getProperty("influx.host");
62 String username = helper.getProperty("influx.username");
63 String password = helper.getProperty("influx.password");
64
65 influxDB = InfluxDBFactory.connect(String.format("http://%s", host), username, password);
66 influxDB.enableBatch(BatchOptions.DEFAULTS.exceptionHandler((failedPoints, throwable) -> log
67 .error("Exception while writing the following points: {}, Exception: {}", failedPoints, throwable)));
68 return true;
69 }
70
71 @Override
72 public boolean disconnect() {
73 if (influxDB != null)
74 influxDB.close();
75 return true;
76 }
77
78
79
80
81
82
83 @Override
84 public boolean alive() {
85 Pong response;
86 try {
87 response = influxDB.ping();
88 } catch (InfluxDBException ex) {
89 return false;
90 }
91 return response != null && !response.getVersion().equalsIgnoreCase("unknown");
92 }
93
94
95
96
97
98
99 public void createDatabase(String databaseName) {
100 String query = String.format("CREATE DATABASE \"%s\"", databaseName);
101 influxDB.query(new Query(query));
102 }
103
104
105
106
107
108
109 public void deleteDatabase(String databaseName) {
110 String query = String.format("DROP DATABASE \"%s\"", databaseName);
111 influxDB.query(new Query(query));
112 }
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128 public String saveTimeseriesPayload(String database, TimeseriesPayload payload) {
129 var timeseries = payload.getTimeseries();
130 var expectedType = getExpectedDatatype(database, timeseries.getMeasurement(), timeseries.getField());
131 var batchPoints = InfluxUtil.createBatch(database, payload, expectedType);
132 try {
133 influxDB.write(batchPoints);
134 } catch (InfluxDBException e) {
135 log.error("InfluxdbException while writing payload {}: {}", payload.getTimeseries(), e.getMessage());
136 return e.getMessage();
137 }
138 return "";
139
140 }
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160 public TimeseriesPayload getTimeseriesPayload(long startTimeStamp, long endTimeStamp, String database,
161 Timeseries timeseries, SingleValuedUnaryFunction function, Long groupBy, FillOption fillOption) {
162 Query query = InfluxUtil.buildQuery(startTimeStamp, endTimeStamp, database, timeseries, function, groupBy,
163 fillOption);
164 log.debug("Influx Query: {}", query.getCommand());
165 QueryResult queryResult;
166
167 try {
168 queryResult = influxDB.query(query);
169 } catch (InfluxDBException e) {
170 queryResult = null;
171 log.error("Could not parse query: {}", query.getCommand());
172 }
173 if (InfluxUtil.isQueryResultValid(queryResult)) {
174 return InfluxUtil.extractPayload(queryResult, timeseries);
175 }
176 return new TimeseriesPayload(timeseries, Collections.emptyList());
177 }
178
179
180
181
182
183
184
185 public List<Timeseries> getTimeseriesAvailable(String database) {
186 Query query = new Query(String.format("SHOW SERIES ON \"%s\"", database));
187 QueryResult queryResult = influxDB.query(query);
188 if (!InfluxUtil.isQueryResultValid(queryResult)) {
189 log.warn("There was an error while querying the Influxdb for available timeseries");
190 return Collections.emptyList();
191 }
192 var values = queryResult.getResults().get(0).getSeries().get(0).getValues();
193 var fields = getFields(database);
194 var result = new ArrayList<Timeseries>(values.size());
195 for (var value : values) {
196 var series = ((String) value.get(0)).split(",");
197
198 var meas = series[0];
199 var tags = extractTags(series);
200 var dev = tags.getOrDefault(Constants.DEVICE, "");
201 var loc = tags.getOrDefault(Constants.LOCATION, "");
202 var symName = tags.getOrDefault(Constants.SYMBOLICNAME, "");
203 for (var field : fields.getOrDefault(meas, Collections.emptyList())) {
204 result.add(new Timeseries(meas, dev, loc, symName, field));
205 }
206 }
207 return result;
208 }
209
210 private Map<String, String> extractTags(String[] series) {
211 var result = new HashMap<String, String>();
212 for (var tagString : series) {
213 var tags = tagString.split("=", 2);
214
215
216 if (tags.length < 2)
217 continue;
218
219 result.put(tags[0], tags[1]);
220 }
221 return result;
222 }
223
224 private Map<String, List<String>> getFields(String database) {
225 Query query = new Query(String.format("SHOW FIELD KEYS ON \"%s\"", database));
226 QueryResult queryResult = influxDB.query(query);
227 if (!InfluxUtil.isQueryResultValid(queryResult)) {
228 log.warn("There was an error while querying the Influxdb for available fields");
229 return Collections.emptyMap();
230 }
231 var series = queryResult.getResults().get(0).getSeries();
232 var result = new HashMap<String, List<String>>();
233 for (var s : series) {
234 var fields = new ArrayList<String>();
235 for (var value : s.getValues()) {
236 fields.add((String) value.get(0));
237 }
238 result.put(s.getName(), fields);
239 }
240 return result;
241 }
242
243 public boolean databaseExist(String database) {
244 QueryResult queryResult = influxDB.query(new Query("SHOW DATABASES"));
245 if (!InfluxUtil.isQueryResultValid(queryResult)) {
246 log.warn("There was an error while querying the Influxdb for databases");
247 return false;
248 }
249
250 var values = queryResult.getResults().get(0).getSeries().get(0).getValues();
251 for (var databaseName : values) {
252 if (databaseName.get(0).toString().trim().equals(database)) {
253 return true;
254 }
255 }
256
257 return false;
258 }
259
260
261
262
263
264 private String getExpectedDatatype(String database, String measurement, String field) {
265 String queryString = String.format("SHOW FIELD KEYS ON \"%s\" FROM %s", database, measurement);
266 QueryResult result = influxDB.query(new Query(queryString));
267 if (!InfluxUtil.isQueryResultValid(result)) {
268 log.info("Could not get expected datatype query string \"{}\"", queryString);
269 return "";
270 }
271
272 var values = result.getResults().get(0).getSeries().get(0).getValues();
273 for (var value : values) {
274 if (value.get(0).equals(field))
275 return (String) value.get(1);
276 }
277
278 return "";
279 }
280 }