feat: support Hive, support multi-table sync, add docs

machunxiao 2019-01-02 16:52:08 +08:00
parent 5805b3b723
commit 83e168da27
22 changed files with 712 additions and 390 deletions


@@ -0,0 +1,61 @@
Tunnel is a service that syncs real-time data from PostgreSQL to Elasticsearch or Kafka.
## Supported Versions
- PostgreSQL 9.4 or later
- Kafka 0.8 or later
- Elasticsearch 5.x
## Architecture
![Architecture](./doc/arch.png)
## How It Works
Tunnel builds on PostgreSQL's logical replication: it creates a logical replication slot on the source database, receives the database's logical change stream through it, and parses messages in the test_decoding output format to recover the row-level changes.
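A minimal sketch of the underlying mechanism, using standard PostgreSQL functions (the slot name is only an example):
```sql
-- Create a logical replication slot that emits changes in test_decoding format
SELECT * FROM pg_create_logical_replication_slot('slot_for_test', 'test_decoding');

-- Peek at pending changes without consuming them; Tunnel parses lines such as:
--   table public.t_department_info: INSERT: id[integer]:1 name[text]:'dev'
SELECT * FROM pg_logical_slot_peek_changes('slot_for_test', NULL, NULL);
```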
## Installation and Usage
### Install
```shell
$git clone https://github.com/hellobike/tunnel
$cd tunnel
$mvn clean package -Dmaven.test.skip=true
$cd target
$unzip AppTunnelService.zip
$cd AppTunnelService
```
### Run
```shell
$java -server -classpath conf/*:lib/* com.hellobike.base.tunnel.TunnelLauncher -u false -c cfg.properties
```
## PostgreSQL Configuration
Logical replication must be enabled on the PostgreSQL server beforehand; see [PostgreSQL configuration](./doc/pg.md).
## Tunnel Configuration
### Configuration File
[Tunnel configuration](./doc/cfg.properties)
### Monitoring
Tunnel can report sync status to Prometheus; see [setting up Grafana monitoring](./doc/prometheus.md).
### Sync to Elasticsearch
[Sync to Elasticsearch](./doc/es.md)
### Sync to Kafka
[Sync to Kafka](./doc/kafka.md)
## License
Tunnel is licensed under the Apache License 2.0.

doc/arch.png Normal file (BIN)

Binary file not shown.


doc/cfg.properties Normal file (+2)

@@ -0,0 +1,2 @@
tunnel_subscribe_config={"pg_dump_path":"","subscribes":[{"slotName":"slot_for_test","pgConnConf":{"host":"localhost","port":5432,"database":"test1","user":"test1","password":"test1"},"rules":[{"table":"t_department_info","fields":null,"pks":["id"],"esid":["id"],"index":"t_department_info","type":"logs"}],"esConf":{"addrs":"http://localhost:9200"}}]}
tunnel_zookeeper_address=localhost:2181
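For readability, the tunnel_subscribe_config value above is shown pretty-printed below; in the actual properties file it must stay on a single line:
```json
{
  "pg_dump_path": "",
  "subscribes": [{
    "slotName": "slot_for_test",
    "pgConnConf": {
      "host": "localhost",
      "port": 5432,
      "database": "test1",
      "user": "test1",
      "password": "test1"
    },
    "rules": [{
      "table": "t_department_info",
      "fields": null,
      "pks": ["id"],
      "esid": ["id"],
      "index": "t_department_info",
      "type": "logs"
    }],
    "esConf": {
      "addrs": "http://localhost:9200"
    }
  }]
}
```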

doc/es.md Normal file (+47)

@@ -0,0 +1,47 @@
# Elasticsearch Sync Configuration
The configuration below syncs real-time data from the student_name table in the test database into the student_name index in Elasticsearch. The # comments are explanatory only; strict JSON does not allow comments, so remove them in a real config.
```json
{
  # Path to the pg_dump executable; may be left empty if pg_dump is on $PATH
  "pg_dump_path": "",
  "subscribes": [{
    # Whether to dump historical data first; omit or set to false if only
    # real-time data is needed (default: false)
    "dump": false,
    # Logical replication slot name; must be unique
    "slotName": "slot_for_es",
    # PostgreSQL connection settings
    "pgConnConf": {
      "host": "127.0.0.1",
      "port": 5432,
      "database": "test",
      "user": "postgres",
      "password": "admin"
    },
    # Sync rules
    "rules": [
      {
        # Table name pattern; wildcards are supported
        "table": "student_name",
        # Primary key columns of the table
        "pks": ["id"],
        # Columns used as the Elasticsearch _id; ["id"] maps the table's id column to _id
        "esid": ["id"],
        # Target Elasticsearch index
        "index": "student_name",
        # Elasticsearch type
        "type": "logs"
      }
    ],
    # Elasticsearch connection settings
    "esConf": {
      "addrs": "http://localhost:9200",
      "user": "",
      "password": ""
    },
    # Retry on error: 0 = no retries, -1 = retry until success
    "retry": 0
  }]
}
```
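One way to verify that rows are arriving, assuming the index above and a local Elasticsearch:
```shell
$curl 'http://localhost:9200/student_name/_search?pretty'
```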

doc/kafka.md Normal file (+41)

@@ -0,0 +1,41 @@
# Kafka Sync Configuration
The configuration below syncs real-time data from the student_name table in the test database into the student_name_logs topic in Kafka. As in the Elasticsearch example, the # comments are explanatory only and must be removed from a real config.
```json
{
  # Path to the pg_dump executable; may be left empty if pg_dump is on $PATH
  "pg_dump_path": "",
  "subscribes": [{
    # Whether to dump historical data first; omit or set to false if only
    # real-time data is needed (default: false)
    "dump": false,
    # Logical replication slot name; must be unique
    "slotName": "slot_for_kafka",
    # PostgreSQL connection settings
    "pgConnConf": {
      "host": "127.0.0.1",
      "port": 5432,
      "database": "test",
      "user": "postgres",
      "password": "admin"
    },
    # Sync rules
    "rules": [
      {
        # Table name pattern; wildcards are supported
        "table": "student_name",
        # Primary key columns of the table
        "pks": ["id"],
        # Kafka topic
        "topic": "student_name_logs"
      }
    ],
    # Kafka connection settings
    "kafkaConf": {
      "addrs": ["127.0.0.1:9092"]
    },
    # Retry on error: 0 = no retries, -1 = retry until success
    "retry": 0
  }]
}
```
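One way to verify that messages are arriving, assuming a local Kafka installation with its bundled CLI tools:
```shell
$bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic student_name_logs --from-beginning
```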

doc/pg.md Normal file (+23)

@@ -0,0 +1,23 @@
# PostgreSQL Configuration
## Parameter Changes
```
wal_level = 'logical';
max_replication_slots = 5; # must be greater than 1
```
**A server restart is required for these changes to take effect.**
## Create a User with Replication Privileges
```sql
CREATE ROLE test_rep LOGIN ENCRYPTED PASSWORD 'xxxx' REPLICATION;
GRANT CONNECT ON DATABASE test_database to test_rep;
```
## Update the Access Whitelist
Add the following entry to pg_hba.conf: ```host replication test_rep all md5```
**A configuration reload is required for this change to take effect.**
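A quick sanity check after the restart (run via psql):
```sql
-- Should return 'logical'
SHOW wal_level;
-- Lists existing replication slots; empty until Tunnel creates one
SELECT slot_name, plugin, active FROM pg_replication_slots;
```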

doc/prometheus.md Normal file (+13)

@@ -0,0 +1,13 @@
# Prometheus Monitoring
## Configuring the Tunnel Scrape Endpoint
Add the -p option to Tunnel's startup arguments to set the port that Prometheus scrapes:
```shell
$java -server -classpath conf/*:lib/* com.hellobike.base.tunnel.TunnelLauncher -u false -c cfg.properties -p 7788
```
The monitoring metrics are then available at localhost:7788/metrics.
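To confirm the endpoint is serving metrics (assuming the port above):
```shell
$curl http://localhost:7788/metrics
```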


@@ -27,8 +27,8 @@ public class PrometheusMonitor implements TunnelMonitor {
public PrometheusMonitor(ExporterConfig config) {
this.gauge = Gauge.build()
.name(config.getMetricName())
.labelNames(config.getLabelNames())
.help("Tunnel Requests.").register();
.labelNames(config.getLabelNames()).help("Tunnel Requests.")
.register();
}
@Override
@@ -36,7 +36,7 @@ public class PrometheusMonitor implements TunnelMonitor {
this.gauge.labels(statics.getSlotName(), statics.getAppId(),
statics.getDatabase(), statics.getTable(),
statics.getTarget(), String.valueOf(statics.getTotal()),
String.valueOf(statics.getCurrentTime()), statics.getError()).inc();
String.valueOf(statics.getCurrentTime()), statics.getError() == null ? "" : statics.getError()).inc();
}
}


@@ -26,6 +26,12 @@ import com.hellobike.base.tunnel.monitor.TunnelMonitorFactory;
import com.hellobike.base.tunnel.publisher.PublisherManager;
import com.hellobike.base.tunnel.publisher.es.EsPublisher;
import com.hellobike.base.tunnel.publisher.hbase.HBasePublisher;
import com.hellobike.base.tunnel.publisher.hdfs.HdfsConfig;
import com.hellobike.base.tunnel.publisher.hdfs.HdfsPublisher;
import com.hellobike.base.tunnel.publisher.hdfs.HdfsRule;
import com.hellobike.base.tunnel.publisher.hive.HiveConfig;
import com.hellobike.base.tunnel.publisher.hive.HivePublisher;
import com.hellobike.base.tunnel.publisher.hive.HiveRule;
import com.hellobike.base.tunnel.publisher.kafka.KafkaPublisher;
import com.hellobike.base.tunnel.spi.api.CollectionUtils;
import com.hellobike.base.tunnel.utils.FileUtils;
@@ -295,9 +301,68 @@ public class TunnelLauncher {
}
private static void parseHiveConfig(String slotName, ApolloConfig.HiveConf hiveConf, List<ApolloConfig.Rule> rules) {
if (hiveConf == null || StringUtils.isBlank(hiveConf.getHdfsAddress())) {
return;
}
List<HiveRule> hiveRules = rules.stream()
.map(TunnelLauncher::toHiveRule)
.filter(Objects::nonNull)
.collect(Collectors.toList());
HiveConfig hiveConfig = new HiveConfig();
hiveConfig.setUsername(StringUtils.isBlank(hiveConf.getUser()) ? "default" : hiveConf.getUser());
hiveConfig.setPassword(StringUtils.isBlank(hiveConf.getPassword()) ? "default" : hiveConf.getPassword());
hiveConfig.setHiveUrl("jdbc:hive2://" + hiveConf.getHost() + ":" + hiveConf.getPort() + "/default;ssl=false;");
hiveConfig.setRules(hiveRules);
hiveConfig.setDataDir(hiveConf.getDataDir());
hiveConfig.setTable(hiveConf.getTableName());
hiveConfig.setPartition(hiveConf.getPartition());
hiveConfig.setHdfsAddresses(hiveConf.getHdfsAddress().split(","));
PublisherManager.getInstance().putPublisher(slotName, new HivePublisher(hiveConfig));
}
private static void parseHdfsConfig(String slotName, ApolloConfig.HdfsConf hdfsConf, List<ApolloConfig.Rule> rules) {
if (hdfsConf == null
|| StringUtils.isBlank(hdfsConf.getAddress())
|| StringUtils.isBlank(hdfsConf.getFile())
|| CollectionUtils.isEmpty(rules)) {
return;
}
HdfsConfig hdfsConfig = new HdfsConfig();
hdfsConfig.setAddress(hdfsConf.getAddress());
hdfsConfig.setFileName(hdfsConf.getFile());
List<HdfsRule> hdfsRules = rules.stream()
.map(TunnelLauncher::toHdfsRule)
.filter(Objects::nonNull)
.collect(Collectors.toList());
hdfsConfig.setRules(hdfsRules);
PublisherManager.getInstance().putPublisher(slotName, new HdfsPublisher(hdfsConfig));
}
private static HiveRule toHiveRule(ApolloConfig.Rule rule) {
if (rule.getTable() == null
|| CollectionUtils.isEmpty(rule.getHiveFields())
|| CollectionUtils.isEmpty(rule.getPks())) {
return null;
}
HiveRule hiveRule = new HiveRule();
hiveRule.setTable(rule.getTable());
hiveRule.setHiveTable(rule.getHiveTable());
hiveRule.setFields(rule.getHiveFields());
hiveRule.setPks(rule.getPks());
return hiveRule;
}
private static HdfsRule toHdfsRule(ApolloConfig.Rule rule) {
if (StringUtils.isBlank(rule.getTable())) {
return null;
}
HdfsRule hdfsRule = new HdfsRule();
hdfsRule.setTable(rule.getTable());
return hdfsRule;
}
private static KafkaConfig toKafkaConfig(ApolloConfig.Rule rule) {


@@ -17,6 +17,7 @@
package com.hellobike.base.tunnel.apollo;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
import java.util.List;
import java.util.Map;
@@ -26,28 +27,14 @@ import java.util.Map;
*
* @author machunxiao 2018-11-01
*/
@Data
public class ApolloConfig {
@JSONField(name = "pg_dump_path")
private String pgDumpPath;
private List<Subscribe> subscribes;
public String getPgDumpPath() {
return pgDumpPath;
}
public void setPgDumpPath(String pgDumpPath) {
this.pgDumpPath = pgDumpPath;
}
public List<Subscribe> getSubscribes() {
return subscribes;
}
public void setSubscribes(List<Subscribe> subscribes) {
this.subscribes = subscribes;
}
@Data
public static class Subscribe {
private String slotName;
@@ -59,72 +46,9 @@ public class ApolloConfig {
private HiveConf hiveConf;
private HdfsConf hdfsConf;
public String getSlotName() {
return slotName;
}
public void setSlotName(String slotName) {
this.slotName = slotName;
}
public PgConnConf getPgConnConf() {
return pgConnConf;
}
public void setPgConnConf(PgConnConf pgConnConf) {
this.pgConnConf = pgConnConf;
}
public List<Rule> getRules() {
return rules;
}
public void setRules(List<Rule> rules) {
this.rules = rules;
}
public KafkaConf getKafkaConf() {
return kafkaConf;
}
public void setKafkaConf(KafkaConf kafkaConf) {
this.kafkaConf = kafkaConf;
}
public EsConf getEsConf() {
return esConf;
}
public void setEsConf(EsConf esConf) {
this.esConf = esConf;
}
public HBaseConf getHbaseConf() {
return hbaseConf;
}
public void setHbaseConf(HBaseConf hbaseConf) {
this.hbaseConf = hbaseConf;
}
public HiveConf getHiveConf() {
return hiveConf;
}
public void setHiveConf(HiveConf hiveConf) {
this.hiveConf = hiveConf;
}
public HdfsConf getHdfsConf() {
return hdfsConf;
}
public void setHdfsConf(HdfsConf hdfsConf) {
this.hdfsConf = hdfsConf;
}
}
@Data
public static class PgConnConf {
private String host;
private int port;
@@ -132,98 +56,47 @@ public class ApolloConfig {
private String schema;
private String user;
private String password;
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getDatabase() {
return database;
}
public void setDatabase(String database) {
this.database = database;
}
public String getSchema() {
return schema;
}
public void setSchema(String schema) {
this.schema = schema;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}
@Data
public static class KafkaConf {
private List<String> addrs;
public List<String> getAddrs() {
return addrs;
}
public void setAddrs(List<String> addrs) {
this.addrs = addrs;
}
}
@Data
public static class EsConf {
private String addrs;
public String getAddrs() {
return addrs;
}
public void setAddrs(String addrs) {
this.addrs = addrs;
}
}
@Data
public static class HBaseConf {
private String zkquorum;
public String getZkquorum() {
return zkquorum;
}
public void setZkquorum(String zkquorum) {
this.zkquorum = zkquorum;
}
}
@Data
public static class HiveConf {
private String host;
private int port;
private String user;
private String password;
@JSONField(name = "hdfs_address")
private String hdfsAddress;
@JSONField(name = "data_dir")
private String dataDir;
@JSONField(name = "table_name")
private String tableName;
private String partition;
}
@Data
public static class HdfsConf {
private String address;
private String file;
}
@Data
public static class Rule {
private String table;
@@ -248,125 +121,9 @@ public class ApolloConfig {
private String hbaseTable;
private List<String> hbaseKey;
public String getTable() {
return table;
}
private String hiveTable;
private List<String> hiveFields;
public void setTable(String table) {
this.table = table;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public int getPartition() {
return partition;
}
public void setPartition(int partition) {
this.partition = partition;
}
public List<String> getPks() {
return pks;
}
public void setPks(List<String> pks) {
this.pks = pks;
}
public List<String> getEsid() {
return esid;
}
public void setEsid(List<String> esid) {
this.esid = esid;
}
public String getIndex() {
return index;
}
public void setIndex(String index) {
this.index = index;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public Map<String, String> getFields() {
return fields;
}
public void setFields(Map<String, String> fields) {
this.fields = fields;
}
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
public List<String> getParameters() {
return parameters;
}
public void setParameters(List<String> parameters) {
this.parameters = parameters;
}
public String getFamily() {
return family;
}
public void setFamily(String family) {
this.family = family;
}
public String getQualifier() {
return qualifier;
}
public void setQualifier(String qualifier) {
this.qualifier = qualifier;
}
public List<String> getRowKeys() {
return rowKeys;
}
public void setRowKeys(List<String> rowKeys) {
this.rowKeys = rowKeys;
}
public String getHbaseTable() {
return hbaseTable;
}
public void setHbaseTable(String hbaseTable) {
this.hbaseTable = hbaseTable;
}
public List<String> getHbaseKey() {
return hbaseKey;
}
public void setHbaseKey(List<String> hbaseKey) {
this.hbaseKey = hbaseKey;
}
}
/**
@@ -384,35 +141,13 @@ public class ApolloConfig {
*
* </pre>
*/
@Data
public static class Join {
private String table;
private String sql;
private List<String> parameters;
public String getTable() {
return table;
}
public void setTable(String table) {
this.table = table;
}
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
public List<String> getParameters() {
return parameters;
}
public void setParameters(List<String> parameters) {
this.parameters = parameters;
}
}
}


@@ -24,20 +24,25 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
/**
* @author machunxiao create at 2018-11-30
*/
public class HdfsClient {
private Configuration configuration;
private String address;
private String fileName;
public void append(HdfsConfig config, Event event) {
public HdfsClient(String address, String fileName) {
this.address = address;
this.fileName = fileName;
}
public void append(HdfsConfig config, HdfsRule rule, Event event) {
try {
Configuration hadoopConfig = new Configuration();
FileSystem fileSystem = FileSystem.get(hadoopConfig);
FileSystem fileSystem = FileSystem.get(URI.create(this.address), hadoopConfig);
Path hdfsPath = new Path(fileName);
FSDataOutputStream fileOutputStream = null;
try {
@@ -62,10 +67,10 @@ public class HdfsClient {
}
public void delete(HdfsConfig config, Event event) {
public void delete(HdfsConfig config, HdfsRule rule, Event event) {
}
public void update(HdfsConfig config, Event event) {
public void update(HdfsConfig config, HdfsRule rule, Event event) {
}
}


@@ -1,5 +1,3 @@
package com.hellobike.base.tunnel.publisher.hdfs;
/*
* Copyright 2018 Shanghai Junzheng Network Technology Co.,Ltd.
*
@@ -16,8 +14,18 @@ package com.hellobike.base.tunnel.publisher.hdfs;
* limitations under the License.
*/
package com.hellobike.base.tunnel.publisher.hdfs;
import lombok.Data;
import java.util.List;
/**
* @author machunxiao create at 2018-11-30
*/
@Data
public class HdfsConfig {
private String address;
private String fileName;
private List<HdfsRule> rules;
}


@@ -1,11 +1,3 @@
package com.hellobike.base.tunnel.publisher.hdfs;
import com.hellobike.base.tunnel.model.Event;
import com.hellobike.base.tunnel.publisher.BasePublisher;
import com.hellobike.base.tunnel.publisher.IPublisher;
import java.util.List;
/*
* Copyright 2018 Shanghai Junzheng Network Technology Co.,Ltd.
*
@@ -22,31 +14,42 @@ import java.util.List;
* limitations under the License.
*/
package com.hellobike.base.tunnel.publisher.hdfs;
import com.hellobike.base.tunnel.model.Event;
import com.hellobike.base.tunnel.publisher.BasePublisher;
import com.hellobike.base.tunnel.publisher.IPublisher;
import java.util.regex.Pattern;
/**
* @author machunxiao create at 2018-11-30
*/
public class HdfsPublisher extends BasePublisher implements IPublisher {
private List<HdfsConfig> configs;
private HdfsClient hdfsClient;
private final HdfsConfig hdfsConfig;
private final HdfsClient hdfsClient;
public HdfsPublisher(List<HdfsConfig> configs) {
this.configs = configs;
this.hdfsClient = new HdfsClient();
public HdfsPublisher(HdfsConfig hdfsConfig) {
this.hdfsConfig = hdfsConfig;
this.hdfsClient = new HdfsClient(this.hdfsConfig.getAddress(), this.hdfsConfig.getFileName());
}
@Override
public void publish(Event event, Callback callback) {
for (HdfsConfig config : configs) {
for (HdfsRule rule : hdfsConfig.getRules()) {
if (!testTableName(rule.getTable(), event.getTable())) {
continue;
}
switch (event.getEventType()) {
case INSERT:
this.hdfsClient.append(config, event);
this.hdfsClient.append(hdfsConfig, rule, event);
break;
case DELETE:
this.hdfsClient.delete(config, event);
this.hdfsClient.delete(hdfsConfig, rule, event);
break;
case UPDATE:
this.hdfsClient.update(config, event);
this.hdfsClient.update(hdfsConfig, rule, event);
break;
default:
break;
@@ -59,5 +62,8 @@ public class HdfsPublisher extends BasePublisher implements IPublisher {
}
private boolean testTableName(String tableFilter, String table) {
return Pattern.compile(tableFilter).matcher(table).matches();
}
}


@@ -0,0 +1,27 @@
/*
* Copyright 2018 Shanghai Junzheng Network Technology Co.,Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hellobike.base.tunnel.publisher.hdfs;
import lombok.Data;
/**
* @author machunxiao create at 2019-01-15
*/
@Data
public class HdfsRule {
private String table;
}


@@ -0,0 +1,145 @@
/*
* Copyright 2018 Shanghai Junzheng Network Technology Co.,Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hellobike.base.tunnel.publisher.hive;
import com.alibaba.druid.pool.DruidDataSource;
import com.hellobike.base.tunnel.model.ColumnData;
import com.hellobike.base.tunnel.model.Event;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.sql.Connection;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* @author machunxiao create at 2019-01-15
*/
@Slf4j
public class HdfsClient implements AutoCloseable {
private final DruidDataSource dataSource;
private final HiveConfig hiveConfig;
public HdfsClient(HiveConfig hiveConfig) {
this.hiveConfig = hiveConfig;
this.dataSource = new DruidDataSource();
initDataSourceConfig();
}
public void insert(HiveRule rule, Event event) {
Map<String, String> kv = event.getDataList().stream()
.collect(Collectors.toMap(ColumnData::getName, ColumnData::getValue));
Optional<String> opt = rule.getFields().stream()
.map(kv::get)
.filter(Objects::nonNull)
.reduce((s1, s2) -> s1 + "," + s2);
if (!opt.isPresent() || StringUtils.isBlank(opt.get())) {
return;
}
String value = opt.get();
try {
Configuration hadoopConfig = new Configuration();
hadoopConfig.set("fs.defaultFS", this.hiveConfig.getHdfsAddresses()[0]);
FileSystem fs = FileSystem.get(hadoopConfig);
String today = new SimpleDateFormat("yyyyMMdd").format(new Date());
String dir = hiveConfig.getDataDir() + "/" + today;
Path parent = new Path(dir);
if (!fs.exists(parent)) {
fs.mkdirs(parent);
}
if (!appendPartition(dir, today)) {
return;
}
String file = dir + "/" + today + ".xxx";
Path hdfsPath = new Path(file);
FSDataOutputStream fileOutputStream = null;
BufferedWriter bw = null;
try {
if (fs.exists(hdfsPath)) {
fileOutputStream = fs.append(hdfsPath);
} else {
fileOutputStream = fs.create(hdfsPath);
}
bw = new BufferedWriter(new OutputStreamWriter(fileOutputStream));
bw.write(value);
bw.newLine();
bw.flush();
log.warn("WriteData To Hdfs Success");
} finally {
fs.close();
if (fileOutputStream != null) {
fileOutputStream.close();
}
if (bw != null) {
bw.close();
}
}
} catch (IOException e) {
log.warn("WriteData To Hdfs Failure", e);
}
}
@Override
public void close() {
this.dataSource.close();
}
private boolean appendPartition(String dataDir, String partition) {
String sql = String.format("ALTER TABLE %s ADD IF NOT EXISTS PARTITION (%s='%s') LOCATION '%s'",
this.hiveConfig.getTable(), this.hiveConfig.getPartition(), partition, dataDir + "/" + partition);
try (Connection conn = this.dataSource.getConnection();
Statement stmt = conn.createStatement()) {
stmt.execute(sql);
log.info("execute alter table success");
return true;
} catch (Exception e) {
log.info("execute alter table failure", e);
}
return false;
}
private void initDataSourceConfig() {
this.dataSource.setUsername(this.hiveConfig.getUsername());
this.dataSource.setPassword(this.hiveConfig.getPassword());
this.dataSource.setMaxActive(100);
this.dataSource.setMinIdle(50);
this.dataSource.setUrl(this.hiveConfig.getHiveUrl());
this.dataSource.setValidationQuery("select 1");
}
}


@@ -17,16 +17,25 @@
package com.hellobike.base.tunnel.publisher.hive;
import com.alibaba.druid.pool.DruidDataSource;
import com.hellobike.base.tunnel.model.ColumnData;
import com.hellobike.base.tunnel.model.Event;
import com.hellobike.base.tunnel.utils.MsgUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @author machunxiao create at 2018-11-28
*/
@Slf4j
@Deprecated
public class HiveClient implements AutoCloseable {
private final DruidDataSource dataSource;
@@ -39,34 +48,40 @@ public class HiveClient implements AutoCloseable {
initDataSourceConfig();
}
public void insert(Event event) {
public void insert(HiveRule rule, Event event) {
String sql = toInsertSQL(rule, event);
try (Connection conn = getConnection();
Statement stmt = conn.createStatement()) {
String sql = MsgUtils.toInsert(event);
stmt.execute(sql);
} catch (SQLException e) {
log.info("execute hive insert sql:{} success", sql);
} catch (Exception e) {
//
log.info("execute hive insert sql:{} failure", sql, e);
}
}
public void update(Event event) {
public void update(HiveRule rule, Event event) {
String sql = toUpdateSQL(rule, event);
try (Connection conn = getConnection();
Statement stmt = conn.createStatement()) {
String sql = MsgUtils.toUpdate(event, this.hiveConfig.getPks());
stmt.execute(sql);
} catch (SQLException e) {
log.info("execute hive update sql:{} success", sql);
} catch (Exception e) {
//
log.info("execute hive update sql:{} failure", sql, e);
}
}
public void delete(Event event) {
public void delete(HiveRule rule, Event event) {
String sql = toDeleteSQL(rule, event);
try (Connection conn = getConnection();
Statement stmt = conn.createStatement()) {
String sql = MsgUtils.toDelete(event, this.hiveConfig.getPks());
stmt.execute(sql);
} catch (SQLException e) {
log.info("execute hive delete sql:{} success", sql);
} catch (Exception e) {
//
log.info("execute hive delete sql:{} failure", sql, e);
}
}
@@ -86,4 +101,55 @@ public class HiveClient implements AutoCloseable {
this.dataSource.setValidationQuery("select 1");
}
private String toInsertSQL(HiveRule rule, Event event) {
Map<String, String> kv = event.getDataList().stream()
.collect(Collectors.toMap(ColumnData::getName, ColumnData::getValue));
List<String> fields = rule.getFields();
List<String> values = new ArrayList<>();
for (String field : fields) {
String value = kv.get(field);
values.add("'" + value + "'");
}
return "insert into " + rule.getHiveTable() + "(" + StringUtils.join(fields, ",") + ") values(" + StringUtils.join(values, ",") + ")";
}
private String toUpdateSQL(HiveRule rule, Event event) {
Map<String, String> kv = event.getDataList().stream()
.collect(Collectors.toMap(ColumnData::getName, ColumnData::getValue));
StringBuilder out = new StringBuilder();
kv.forEach((k, v) -> out.append(k).append("=").append("'").append(v).append("',"));
out.setCharAt(out.length() - 1, ' ');
return "update " + rule.getHiveTable() + " set " + out.toString() + " where " + getPkValue(rule.getPks(), event.getDataList());
}
private String toDeleteSQL(HiveRule rule, Event event) {
return "delete from " + rule.getHiveTable() + " where " + getPkValue(rule.getPks(), event.getDataList());
}
private String getPkValue(List<String> pks, List<ColumnData> columnDataList) {
// (k1,k2) = (k1,k2)
LinkedHashMap<String, String> tmp = new LinkedHashMap<>();
for (String pk : pks) {
tmp.put(pk, "");
}
for (ColumnData columnData : columnDataList) {
String val = tmp.get(columnData.getName());
if ("".equals(val)) {
tmp.put(columnData.getName(), columnData.getValue());
}
}
List<String> keys = new ArrayList<>();
List<String> vals = new ArrayList<>();
for (Map.Entry<String, String> e : tmp.entrySet()) {
String key = e.getKey();
String val = e.getValue();
keys.add(key);
vals.add("'" + val + "'");
}
return "(" + StringUtils.join(keys, ",") + ") = (" + StringUtils.join(vals, ",") + ")";
}
}


@@ -16,51 +16,27 @@
package com.hellobike.base.tunnel.publisher.hive;
import com.hellobike.base.tunnel.filter.IEventFilter;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
/**
* @author machunxiao create at 2018-11-27
*/
@Data
public class HiveConfig {
private String hiveUrl;
private String username;
private String password;
private String dataDir;
private String table;
private String partition;
private String[] hdfsAddresses;
private List<String> pks;
public String getHiveUrl() {
return hiveUrl;
}
public void setHiveUrl(String hiveUrl) {
this.hiveUrl = hiveUrl;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public List<String> getPks() {
return pks;
}
public void setPks(List<String> pks) {
this.pks = pks;
}
private List<IEventFilter> filters = new ArrayList<>();
private List<HiveRule> rules = new ArrayList<>();
}


@@ -20,40 +20,54 @@ import com.hellobike.base.tunnel.model.Event;
import com.hellobike.base.tunnel.publisher.BasePublisher;
import com.hellobike.base.tunnel.publisher.IPublisher;
import java.util.List;
import java.util.regex.Pattern;
/**
* @author machunxiao create at 2018-11-27
*/
public class HivePublisher extends BasePublisher implements IPublisher {
private final HiveClient hiveClient;
private final HiveConfig hiveConfig;
private final HdfsClient hdfsClient;
public HivePublisher(HiveConfig hiveConfig) {
this.hiveClient = new HiveClient(hiveConfig);
this.hiveConfig = hiveConfig;
this.hdfsClient = new HdfsClient(hiveConfig);
}
@Override
public void publish(Event event, Callback callback) {
//
switch (event.getEventType()) {
case INSERT:
hiveClient.insert(event);
break;
case DELETE:
hiveClient.delete(event);
break;
case UPDATE:
hiveClient.update(event);
break;
default:
break;
List<HiveRule> rules = this.hiveConfig.getRules();
for (HiveRule rule : rules) {
String table = rule.getTable();
if (!testTableName(table, event.getTable())) {
continue;
}
switch (event.getEventType()) {
case INSERT:
hdfsClient.insert(rule, event);
break;
case DELETE:
break;
case UPDATE:
break;
default:
break;
}
}
}
@Override
public void close() {
hiveClient.close();
hdfsClient.close();
}
private boolean testTableName(String tableFilter, String table) {
return Pattern.compile(tableFilter).matcher(table).matches();
}
}


@@ -0,0 +1,31 @@
/*
* Copyright 2018 Shanghai Junzheng Network Technology Co.,Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hellobike.base.tunnel.publisher.hive;
import lombok.Data;
import java.util.List;
/**
* @author machunxiao create at 2019-01-15
*/
@Data
public class HiveRule {
private String table;
private String hiveTable;
private List<String> fields;
private List<String> pks;
}


@@ -15,6 +15,8 @@
*/
package com.hellobike.base.tunnel;
import com.alibaba.fastjson.JSON;
import com.hellobike.base.tunnel.apollo.ApolloConfig;
import com.hellobike.base.tunnel.utils.TimeUtils;
import org.junit.Test;
@@ -47,6 +49,53 @@ public class TunnelServerTest {
executor.shutdown();
}
@Test
public void test_parseConfig() {
String config = "{\n" +
" \"pg_dump_path\": \"\",\n" +
" \"subscribes\": [\n" +
" {\n" +
" \"slotName\": \"slot_for_pt_java_hive123\",\n" +
" \"pgConnConf\": {\n" +
" \"host\": \"10.111.40.139\",\n" +
" \"port\": 3034,\n" +
" \"database\": \"bike_market\",\n" +
" \"schema\": \"bike_market\",\n" +
" \"user\": \"replica\",\n" +
" \"password\": \"replica\"\n" +
" },\n" +
" \"rules\": [\n" +
" {\n" +
" \"table\": \"java_hive_student.*\",\n" +
" \"hiveTable\": \"pt_java_hive_student\",\n" +
" \"pks\": [\n" +
" \"guid\"\n" +
" ],\n" +
" \"hiveFields\": [\n" +
" \"guid\",\n" +
" \"name\",\n" +
" \"number\"\n" +
" ]\n" +
" }\n" +
" ],\n" +
" \"hiveConf\": {\n" +
" \"host\": \"10.111.20.161\",\n" +
" \"port\": 10000,\n" +
" \"user\": \"\",\n" +
" \"password\": \"\",\n" +
" \"hdfs_address\": \"VECS00028:8020,VECS00029:8020\",\n" +
" \"data_dir\": \"/tmp/forhivesync\",\n" +
" \"table_name\": \"pt_java_hive_student\",\n" +
" \"partition\": \"dt\"\n" +
" }\n" +
" }\n" +
" ]\n" +
"}";
ApolloConfig apolloConfig = JSON.parseObject(config, ApolloConfig.class);
apolloConfig.getSubscribes();
apolloConfig.getSubscribes();
}
private static class Task implements Runnable {
private final int idx;


@@ -20,6 +20,9 @@ import com.hellobike.base.tunnel.model.Event;
import com.hellobike.base.tunnel.model.EventType;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
/**
* @author machunxiao create at 2018-12-03
*/
@@ -30,11 +33,23 @@ public class HiveClientTest {
HiveConfig config = new HiveConfig();
config.setHiveUrl("jdbc:hive2://10.111.20.161:10000/default;ssl=false;");
HiveClient hiveClient = new HiveClient(config);
hiveClient.insert(newEvent());
hiveClient.insert(null, newEvent());
}
@Test
public void test_find() {
Map<String, String> data = new HashMap<>();
data.put("k1", "v1");
data.put("k2", "v2");
data.put("k3", "v3");
StringBuffer out = new StringBuffer();
data.forEach((k, v) -> out.append(k).append("=").append("'").append(v).append("',"));
out.setCharAt(out.length() - 1, ' ');
System.out.println(out);
}
private Event newEvent() {


@@ -45,10 +45,7 @@
<artifactId>httpclient</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
<exclusion>
<artifactId>commons-configuration</artifactId>
<groupId>commons-configuration</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@@ -113,11 +110,7 @@
<artifactId>hbase-procedure</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-configuration2</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>