Skip to content

Commit

Permalink
add h2 support phase 2
Browse files Browse the repository at this point in the history
  • Loading branch information
clevertension committed Oct 13, 2017
1 parent bc38a56 commit d4333ff
Show file tree
Hide file tree
Showing 21 changed files with 725 additions and 66 deletions.
Expand Up @@ -16,7 +16,7 @@
public class ApplicationH2DAO extends H2DAO implements IApplicationDAO {
private final Logger logger = LoggerFactory.getLogger(ApplicationH2DAO.class);
private static final String INSERT_APPLICATION_SQL = "insert into {0}({1}, {2}) values(?, ?)";
; @Override
@Override
public int getApplicationId(String applicationCode) {
logger.info("get the application id with application code = {}", applicationCode);
String sql = "select " + ApplicationTable.COLUMN_APPLICATION_ID + " from " +
Expand Down
@@ -1,23 +1,19 @@
package org.skywalking.apm.collector.agentregister.worker.instance.dao;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;

import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.jvm.CpuMetricTable;
import org.skywalking.apm.collector.storage.define.register.ApplicationTable;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
import org.skywalking.apm.collector.storage.define.register.InstanceTable;
import org.skywalking.apm.collector.storage.define.serviceref.ServiceReferenceTable;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;

/**
* @author pengys5
*/
Expand Down
Expand Up @@ -51,6 +51,7 @@ public class NodeMappingH2DAO extends H2DAO implements INodeMappingDAO, IPersist
source.put(NodeMappingTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
String sql = getBatchInsertSql(NodeMappingTable.TABLE, source.keySet());
entity.setSql(sql);

entity.setParams(source.values().toArray(new Object[0]));
return entity;
}
Expand Down
@@ -1,25 +1,94 @@
package org.skywalking.apm.collector.agentstream.worker.noderef.dao;

import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.core.stream.Data;
import org.skywalking.apm.collector.storage.define.DataDefine;
import org.skywalking.apm.collector.storage.define.node.NodeMappingTable;
import org.skywalking.apm.collector.storage.define.noderef.NodeReferenceTable;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.storage.h2.define.H2SqlEntity;
import org.skywalking.apm.collector.stream.worker.impl.dao.IPersistenceDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* @author pengys5
*/
public class NodeReferenceH2DAO extends H2DAO implements INodeReferenceDAO, IPersistenceDAO<H2SqlEntity, H2SqlEntity> {

    private final Logger logger = LoggerFactory.getLogger(NodeReferenceH2DAO.class);

    private static final String GET_SQL = "select * from {0} where {1} = ?";

    /**
     * Loads a single node-reference row by its id and materializes it via the
     * supplied {@link DataDefine}.
     *
     * @param id value of the "id" column to look up
     * @param dataDefine builder describing the Data slot layout for this table
     * @return the populated Data, or {@code null} when the row is missing or the query fails
     */
    @Override public Data get(String id, DataDefine dataDefine) {
        H2Client client = getClient();
        String sql = MessageFormat.format(GET_SQL, NodeReferenceTable.TABLE, "id");
        Object[] params = new Object[] {id};
        try (ResultSet rs = client.executeQuery(sql, params)) {
            if (rs.next()) {
                Data data = dataDefine.build(id);
                data.setDataInteger(0, rs.getInt(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID));
                data.setDataInteger(1, rs.getInt(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID));
                // integer and string slots live in separate arrays, so index 1 is reused deliberately
                data.setDataString(1, rs.getString(NodeReferenceTable.COLUMN_BEHIND_PEER));
                data.setDataInteger(2, rs.getInt(NodeReferenceTable.COLUMN_S1_LTE));
                data.setDataInteger(3, rs.getInt(NodeReferenceTable.COLUMN_S3_LTE));
                data.setDataInteger(4, rs.getInt(NodeReferenceTable.COLUMN_S5_LTE));
                data.setDataInteger(5, rs.getInt(NodeReferenceTable.COLUMN_S5_GT));
                data.setDataInteger(6, rs.getInt(NodeReferenceTable.COLUMN_SUMMARY));
                data.setDataInteger(7, rs.getInt(NodeReferenceTable.COLUMN_ERROR));
                data.setDataLong(0, rs.getLong(NodeReferenceTable.COLUMN_TIME_BUCKET));
                return data;
            }
        } catch (SQLException | H2ClientException e) {
            logger.error(e.getMessage(), e);
        }
        return null;
    }

    /**
     * Builds the parameterized INSERT entity for a batch write of one node-reference row.
     *
     * @param data source row; slot indices mirror those read in {@link #get}
     * @return entity carrying the insert SQL and its bind parameters
     */
    @Override public H2SqlEntity prepareBatchInsert(Data data) {
        Map<String, Object> source = new HashMap<>();
        H2SqlEntity entity = new H2SqlEntity();
        source.put("id", data.getDataString(0));
        source.put(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID, data.getDataInteger(0));
        source.put(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID, data.getDataInteger(1));
        source.put(NodeReferenceTable.COLUMN_BEHIND_PEER, data.getDataString(1));
        source.put(NodeReferenceTable.COLUMN_S1_LTE, data.getDataInteger(2));
        source.put(NodeReferenceTable.COLUMN_S3_LTE, data.getDataInteger(3));
        source.put(NodeReferenceTable.COLUMN_S5_LTE, data.getDataInteger(4));
        source.put(NodeReferenceTable.COLUMN_S5_GT, data.getDataInteger(5));
        source.put(NodeReferenceTable.COLUMN_SUMMARY, data.getDataInteger(6));
        source.put(NodeReferenceTable.COLUMN_ERROR, data.getDataInteger(7));
        source.put(NodeReferenceTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
        // BUGFIX: was NodeMappingTable.TABLE — node-reference rows were inserted into the wrong table.
        String sql = getBatchInsertSql(NodeReferenceTable.TABLE, source.keySet());
        entity.setSql(sql);
        entity.setParams(source.values().toArray(new Object[0]));
        return entity;
    }

    /**
     * Builds the parameterized UPDATE entity for a batch write of one node-reference row,
     * keyed by the "id" column (appended as the final bind parameter).
     *
     * @param data source row; slot indices mirror those read in {@link #get}
     * @return entity carrying the update SQL and its bind parameters
     */
    @Override public H2SqlEntity prepareBatchUpdate(Data data) {
        Map<String, Object> source = new HashMap<>();
        H2SqlEntity entity = new H2SqlEntity();
        source.put(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID, data.getDataInteger(0));
        source.put(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID, data.getDataInteger(1));
        source.put(NodeReferenceTable.COLUMN_BEHIND_PEER, data.getDataString(1));
        source.put(NodeReferenceTable.COLUMN_S1_LTE, data.getDataInteger(2));
        source.put(NodeReferenceTable.COLUMN_S3_LTE, data.getDataInteger(3));
        source.put(NodeReferenceTable.COLUMN_S5_LTE, data.getDataInteger(4));
        source.put(NodeReferenceTable.COLUMN_S5_GT, data.getDataInteger(5));
        source.put(NodeReferenceTable.COLUMN_SUMMARY, data.getDataInteger(6));
        source.put(NodeReferenceTable.COLUMN_ERROR, data.getDataInteger(7));
        source.put(NodeReferenceTable.COLUMN_TIME_BUCKET, data.getDataLong(0));
        String id = data.getDataString(0);
        // BUGFIX: was NodeMappingTable.TABLE — updates were aimed at the wrong table.
        String sql = getBatchUpdateSql(NodeReferenceTable.TABLE, source.keySet(), "id");
        entity.setSql(sql);
        List<Object> values = new ArrayList<>(source.values());
        values.add(id); // WHERE-clause parameter comes last
        entity.setParams(values.toArray(new Object[0]));
        return entity;
    }
}
Expand Up @@ -14,6 +14,7 @@ public NodeReferenceH2TableDefine() {
}

@Override public void initialize() {
addColumn(new H2ColumnDefine(NodeReferenceTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(NodeReferenceTable.COLUMN_FRONT_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeReferenceTable.COLUMN_BEHIND_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(NodeReferenceTable.COLUMN_BEHIND_PEER, H2ColumnDefine.Type.Varchar.name()));
Expand Down
Expand Up @@ -13,7 +13,7 @@
import java.text.MessageFormat;

/**
* @author pengys5
* @author pengys5, clevertension
*/
public class ApplicationH2DAO extends H2DAO implements IApplicationDAO {

Expand Down
@@ -1,18 +1,11 @@
package org.skywalking.apm.collector.ui.dao;

import com.google.gson.JsonArray;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.define.jvm.CpuMetricTable;
import org.skywalking.apm.collector.storage.define.register.InstanceDataDefine;
import org.skywalking.apm.collector.storage.define.register.InstanceTable;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -24,16 +17,16 @@
import java.util.List;

/**
* @author pengys5
* @author pengys5, clevertension
*/
public class CpuMetricH2DAO extends H2DAO implements ICpuMetricDAO {
private final Logger logger = LoggerFactory.getLogger(InstanceH2DAO.class);
private static final String GET_METRIC_SQL = "select * from {0} where {1} = ?";
private static final String GET_METRICS_SQL = "select * from {0} where {1} in (";
private static final String GET_CPU_METRIC_SQL = "select * from {0} where {1} = ?";
private static final String GET_CPU_METRICS_SQL = "select * from {0} where {1} in (";
@Override public int getMetric(int instanceId, long timeBucket) {
String id = timeBucket + Const.ID_SPLIT + instanceId;
H2Client client = getClient();
String sql = MessageFormat.format(GET_METRIC_SQL, CpuMetricTable.TABLE, "id");
String sql = MessageFormat.format(GET_CPU_METRIC_SQL, CpuMetricTable.TABLE, "id");
Object[] params = new Object[]{id};
try (ResultSet rs = client.executeQuery(sql, params)) {
if (rs.next()) {
Expand All @@ -47,7 +40,7 @@ public class CpuMetricH2DAO extends H2DAO implements ICpuMetricDAO {

@Override public JsonArray getMetric(int instanceId, long startTimeBucket, long endTimeBucket) {
H2Client client = getClient();
String sql = MessageFormat.format(GET_METRICS_SQL, CpuMetricTable.TABLE, "id");
String sql = MessageFormat.format(GET_CPU_METRICS_SQL, CpuMetricTable.TABLE, "id");

long timeBucket = startTimeBucket;
List<String> idList = new ArrayList<>();
Expand All @@ -59,7 +52,7 @@ public class CpuMetricH2DAO extends H2DAO implements ICpuMetricDAO {
while (timeBucket <= endTimeBucket);

StringBuilder builder = new StringBuilder();
for( int i = 0 ; i < idList.size(); i++ ) {
for (int i = 0; i < idList.size(); i++) {
builder.append("?,");
}
builder.delete(builder.length() - 1, builder.length());
Expand Down
@@ -1,27 +1,156 @@
package org.skywalking.apm.collector.ui.dao;

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.define.jvm.GCMetricTable;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.network.proto.GCPhrase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;

/**
* @author pengys5
* @author pengys5, clevertension
*/
public class GCMetricH2DAO extends H2DAO implements IGCMetricDAO {

    private final Logger logger = LoggerFactory.getLogger(GCMetricH2DAO.class);

    // {0}=count column, {1}=phrase column, {2}=table, {3}=instance-id column, {4}=id column;
    // the IN-list of id placeholders and the trailing "group by" are appended at runtime.
    private static final String GET_GC_COUNT_SQL = "select {1}, sum({0}) as cnt from {2} where {3} = ? and {4} in (";
    private static final String GET_GC_METRIC_SQL = "select * from {0} where {1} = ?";
    private static final String GET_GC_METRICS_SQL = "select * from {0} where {1} in (";

    /**
     * Sums GC counts per phrase (young/old) for one instance over the given time buckets.
     *
     * @param timeBuckets bucket ids to include; an empty array yields a zeroed result
     * @param instanceId instance whose GC metrics are aggregated
     * @return accumulated young/old GC counts (zeros on query failure)
     */
    @Override public GCCount getGCCount(long[] timeBuckets, int instanceId) {
        GCCount gcCount = new GCCount();
        if (timeBuckets.length == 0) {
            // guard: the placeholder builder below assumes at least one bucket
            return gcCount;
        }
        H2Client client = getClient();
        String sql = GET_GC_COUNT_SQL;
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < timeBuckets.length; i++) {
            builder.append("?,");
        }
        builder.delete(builder.length() - 1, builder.length());
        builder.append(")");
        sql = sql + builder + " group by {1}";
        sql = MessageFormat.format(sql, GCMetricTable.COLUMN_COUNT, GCMetricTable.COLUMN_PHRASE,
            GCMetricTable.TABLE, GCMetricTable.COLUMN_INSTANCE_ID, "id");
        Object[] params = new Object[timeBuckets.length + 1];
        params[0] = instanceId; // first placeholder is the instance-id equality
        for (int i = 0; i < timeBuckets.length; i++) {
            params[i + 1] = timeBuckets[i];
        }
        try (ResultSet rs = client.executeQuery(sql, params)) {
            // BUGFIX: the group-by returns one row per GC phrase; the old "if" consumed
            // only the first row, so young or old was always left at zero.
            while (rs.next()) {
                int phrase = rs.getInt(GCMetricTable.COLUMN_PHRASE);
                int count = rs.getInt("cnt");
                if (phrase == GCPhrase.NEW_VALUE) {
                    gcCount.setYoung(count);
                } else if (phrase == GCPhrase.OLD_VALUE) {
                    gcCount.setOld(count);
                }
            }
        } catch (SQLException | H2ClientException e) {
            logger.error(e.getMessage(), e);
        }
        return gcCount;
    }

    /**
     * Fetches the young ("ygc") and old ("ogc") GC counts for a single time bucket.
     *
     * @param instanceId instance to read
     * @param timeBucket single time bucket forming part of the row id
     * @return JSON object with "ygc"/"ogc" properties when the rows exist
     */
    @Override public JsonObject getMetric(int instanceId, long timeBucket) {
        JsonObject response = new JsonObject();
        H2Client client = getClient();
        String sql = MessageFormat.format(GET_GC_METRIC_SQL, GCMetricTable.TABLE, "id");
        // BUGFIX: id was built as time + SPLIT + phrase + instanceId here, but the range
        // overload (below) builds time + SPLIT + instanceId + SPLIT + phrase for the same
        // table; standardized on the latter. NOTE(review): confirm against the id format
        // used by the agent-stream GC-metric writer.
        String youngId = timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + GCPhrase.NEW_VALUE;
        Object[] params = new Object[] {youngId};
        try (ResultSet rs = client.executeQuery(sql, params)) {
            if (rs.next()) {
                response.addProperty("ygc", rs.getInt(GCMetricTable.COLUMN_COUNT));
            }
        } catch (SQLException | H2ClientException e) {
            logger.error(e.getMessage(), e);
        }
        String oldId = timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + GCPhrase.OLD_VALUE;
        Object[] params1 = new Object[] {oldId};
        try (ResultSet rs = client.executeQuery(sql, params1)) {
            if (rs.next()) {
                response.addProperty("ogc", rs.getInt(GCMetricTable.COLUMN_COUNT));
            }
        } catch (SQLException | H2ClientException e) {
            logger.error(e.getMessage(), e);
        }
        return response;
    }

    /**
     * Fetches per-second young/old GC count series over [startTimeBucket, endTimeBucket].
     *
     * @param instanceId instance to read
     * @param startTimeBucket inclusive range start (second-granularity bucket)
     * @param endTimeBucket inclusive range end
     * @return JSON object with "ygc"/"ogc" arrays; each array holds a single 0 when no rows match
     */
    @Override public JsonObject getMetric(int instanceId, long startTimeBucket, long endTimeBucket) {
        JsonObject response = new JsonObject();
        H2Client client = getClient();
        String sql = MessageFormat.format(GET_GC_METRICS_SQL, GCMetricTable.TABLE, "id");
        // NOTE(review): the bucket is advanced before first use, so the series starts at
        // startTimeBucket + 1s — kept as-is to match the original traversal.
        long timeBucket = startTimeBucket;
        List<String> idList = new ArrayList<>();
        do {
            timeBucket = TimeBucketUtils.INSTANCE.addSecondForSecondTimeBucket(TimeBucketUtils.TimeBucketType.SECOND.name(), timeBucket, 1);
            String youngId = timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + GCPhrase.NEW_VALUE;
            idList.add(youngId);
        }
        while (timeBucket <= endTimeBucket);
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < idList.size(); i++) {
            builder.append("?,");
        }
        builder.delete(builder.length() - 1, builder.length());
        builder.append(")");
        sql = sql + builder;
        Object[] params = idList.toArray(new String[0]);

        JsonArray youngArray = new JsonArray();
        try (ResultSet rs = client.executeQuery(sql, params)) {
            while (rs.next()) {
                youngArray.add(rs.getInt(GCMetricTable.COLUMN_COUNT));
            }
            if (youngArray.size() == 0) {
                youngArray.add(0); // keep the series non-empty for the UI
            }
        } catch (SQLException | H2ClientException e) {
            logger.error(e.getMessage(), e);
        }
        response.add("ygc", youngArray);

        List<String> idList1 = new ArrayList<>();
        timeBucket = startTimeBucket;
        do {
            timeBucket = TimeBucketUtils.INSTANCE.addSecondForSecondTimeBucket(TimeBucketUtils.TimeBucketType.SECOND.name(), timeBucket, 1);
            String oldId = timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + GCPhrase.OLD_VALUE;
            idList1.add(oldId);
        }
        while (timeBucket <= endTimeBucket);
        String sql1 = MessageFormat.format(GET_GC_METRICS_SQL, GCMetricTable.TABLE, "id");
        StringBuilder builder1 = new StringBuilder();
        for (int i = 0; i < idList1.size(); i++) {
            builder1.append("?,");
        }
        builder1.delete(builder1.length() - 1, builder1.length());
        builder1.append(")");
        sql1 = sql1 + builder1;
        // BUGFIX: was idList.toArray(...) — the old-generation query was executed with the
        // young-generation ids, so "ogc" always mirrored the young series.
        Object[] params1 = idList1.toArray(new String[0]);
        JsonArray oldArray = new JsonArray();
        try (ResultSet rs = client.executeQuery(sql1, params1)) {
            while (rs.next()) {
                oldArray.add(rs.getInt(GCMetricTable.COLUMN_COUNT));
            }
            if (oldArray.size() == 0) {
                oldArray.add(0);
            }
        } catch (SQLException | H2ClientException e) {
            logger.error(e.getMessage(), e);
        }
        response.add("ogc", oldArray);

        return response;
    }
}

0 comments on commit d4333ff

Please sign in to comment.