Alex‘s Blog

Coding My Life


  • 首页

  • 关于

  • 标签

  • 分类

  • 归档

euler简单上手

发表于 2020-03-25 | 分类于 TensorFlow , 深度学习

说明

本文主要介绍euler利用docker再单机上模拟分布式训练,分布式模型评估,分布式embedding的简单上手体验过程,一些基础的工作我已经准备好了。如果只是单机的训练等,直接参考官方文档就好了,可以在本地的环境或是利用现有的docker环境都可以。

本教程主要用docker-compose在单台物理机上来模拟多主机环境,后面如需要跑在k8s环境中可参考其中内容进行改造即可。

安装

源码下载

  1. git clone –recursive https://github.com/alibaba/euler.git`

  2. 如果过程中clone失败,那么进入主目录使用submodule update --init --recursive --progress检出各个子模块

  3. 对于目录D:\shadowsocks\euler\third_party要检出第三方依赖的问题,

    1. 如果报的是zookeeper的Unable to find current revision in submodule path问题,那么可以在当前目录中直接将zookeeper的代码clone下来,然后再切换到源码中指定的commit版本:

      1
      2
      3
      git clone https://github.com/apache/zookeeper.git`
      cd zookeeper
      git checkout 05b774a1b05374618300f657c9c91b0d5c6ddf71
    2. 如何因为网络原因无法clone,则可以先再网上找到相应的包,然后再修改commit版本:

      1
      2
      3
      1. 使用此链接下载Fuzzer并解压:https://github-production-repository-file-5c1aeb.s3.amazonaws.com/165004157/2991980?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20200228%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20200228T111759Z&X-Amz-Expires=300&X-Amz-Signature=0e9db781ee0e38e1b1c02cfbdeff669484631d9aa2c5bbfa64c9c243b6bab2f6&X-Amz-SignedHeaders=host&actor_id=8743639&response-content-disposition=attachment%3Bfilename%3DFuzzer.zip&response-content-type=application%2Fzip
      2. 将Fuzzer文件夹名称改成libFuzzer,并且进入此文件夹。
      3. 修改commit版本:git checkout 1b543d6e5073b56be214394890c9193979a3d7e1
    3. 如果别的依赖子模块出现问题可参考以上两种方法。

在当前进度下,安装时建议使用master分支

阅读全文 »

使用Flink SQL读取kafka数据并通过JDBC方式写入Clickhouse实时场景的简单实例

发表于 2019-11-27 | 分类于 实时 , olap , BigData , clickhouse , 大数据

说明

读取kafka数据并且经过ETL后,通过JDBC存入clickhouse中

代码

定义POJO类:

1
2
3
4
5
6
7
8
public class Student {
private int id;
private String name;
private String password;
private int age;
private String date;
//构造,setter 和 getter 省略
}

完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

//###############定义消费kafka source##############
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "metric-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");

tableEnv.connect(new Kafka().version("0.10")
.topic("student").properties(props).startFromLatest())
.withFormat(new Json().deriveSchema())
.withSchema(new Schema().field("id", Types.INT())
.field("name", Types.STRING())
.field("password", Types.STRING())
.field("age", Types.INT())
.field("date", Types.STRING()))
.inAppendMode()
.registerTableSource("kafkaTable");
Table result = tableEnv.sqlQuery("SELECT * FROM " + "kafkaTable");

//###############定义clickhouse JDBC sink##############
String targetTable = "clickhouse";
TypeInformation[] fieldTypes = {BasicTypeInfo.INT_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO};
TableSink jdbcSink = JDBCAppendTableSink.builder()
.setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
.setDBUrl("jdbc:clickhouse://localhost:8123")
.setQuery("insert into student_local(id, name, password, age, date) values(?, ?, ?, ?, ?)")
.setParameterTypes(fieldTypes)
.setBatchSize(15)
.build();

tableEnv.registerTableSink(targetTable,new String[]{"id","name", "password", "age", "date"}, new TypeInformation[]{Types.INT(), Types.STRING(), Types.STRING(), Types.INT(), Types.STRING()}, jdbcSink);

result.insertInto(targetTable);
env.execute("Flink add sink");
阅读全文 »

制作可传参的docker clickhouse-client

发表于 2019-10-27 | 分类于 olap , BigData , clickhouse , 大数据

Dockerfile:

1
2
3
4
5
6
7
FROM clickhouse-client:latest

ENV HOST="localhost"
ENV USER="default"
ENV PASSWORD=""
ENV DATABASE="default"
ENTRYPOINT /usr/bin/clickhouse-client -h $HOST -u $USER --password $PASSWORD -d $DATABASE

运行命令:

1
docker run -it --rm --name clickhouse-client -e HOST="10.185.xx.xx" -e USER="user" -e PASSWORD="xxxx" -e DATABASE="db"  clickhouse-client:param

遇到问题:

当ENTRYPOINT使用方括号并且里面的参数是明文而不是环境变量时,是可以运行的,如

1
ENTRYPOINT ["/usr/bin/clickhouse-client","-h","10.185.xx.xx","-u","user","--password","xxx"]

但是形如:

1
ENTRYPOINT ["/usr/bin/clickhouse-client","-h",$HOST,"-u",$USER,"--password",$PASSWORD]

就会报:

1
/bin/sh 1 [/usr/bin/clickhouse-client,-h,10.185.xx.xx,-u,user,--password,password] not found

搭建clickhouse监控

发表于 2019-06-15 | 分类于 olap , BigData , clickhouse , 大数据

原由与方案

为了更好地优化clickhouse的性能,需要对clickhouse集群进行监控。网上很多监控方案都是clickhouse + grafana + prometheus,因此打算使用此方案。

要想使用prometheus就得先安装exporter,clickhouse有第三方提供的clickhouse_exporter,也有容器版本,并且提供了grafana的dashboard版本: https://grafana.net/dashboards/882。由于我们在集群中使用了代理CHproxy,但CHproxy也同时实现了exporter的功能,并且提供了更多的特性以及grafana dashboard模板https://github.com/Vertamedia/chproxy/blob/master/chproxy_overview.json,所以我们也就直接使用。

安装部署

我们使用了docker进行部署,并且使用docker-compose进行编排,并且将配置文件和重要数据挂载到宿主机。

docker-compose如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
version: '3'
services:
prometheus:
image: prom/prometheus:latest
restart: always
network_mode: host
user: root
container_name: prometheus
ports:
- "9090:9090"
depends_on:
- chproxy
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- ./prometheus-data:/prometheus
grafana:
image: grafana/grafana:latest
restart: always
network_mode: host
user: root
container_name: grafana
ports:
- "3000:3000"
depends_on:
- prometheus
volumes:
- ./grafana-data/var/lib/grafana:/var/lib/grafana
- ./grafana-data/etc/grafana:/etc/grafana

chproxy:
image: tacyuuhon/clickhouse-chproxy:1.13.2
restart: always
network_mode: host
container_name: chproxy
ports:
- 9092:9092
volumes:
- ./config.yml:/opt/config.yml

CHproxy的配置文件config.xml,省略,参考上篇文章。

prometheus的配置文件prometheus.xml如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# my global config
global:
scrape_interval: 15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
# scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
alertmanagers:
- static_configs:
- targets:
# - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
# - "first_rules.yml"
# - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
- job_name: 'prometheus'

# metrics_path defaults to '/metrics'
# scheme defaults to 'http'.

static_configs:
- targets: ['localhost:9090']

- job_name: 'clickhouse-chproxy'

# 覆盖全局的 scrape_interval
scrape_interval: 10s

static_configs:
- targets: ['localhost:9092']

可通过http://ip:9090/targets查看prometheus配置文件中配置的job是否成功。

请注意:

  1. 因为docker-compose中使用的网络模式为host,所以在prometheus的配置文件中ip地址都写为localhost,并且要填上对应容器的端口。
  2. grafana容器可将/var/lib/grafana和/etc/grafana目录拷贝出来放到宿主机上并且重新挂载到容器中这样的话删除并且重启容器时不会导致数据丢失。docker-compose中grafana和prometheus容器可能需要将user设为root,这样的话当宿主机的用户为root时就有权限写mount的目录。
  3. 当通过docker-compose编排好之后,通过’http://ip:3000'登录grafana,并且配置好prometheus数据源,再将dashboard模板https://github.com/Vertamedia/chproxy/blob/master/chproxy_overview.json导入之后发现没有数据,连左上角的job下拉框都没有任何数据。然后我通过当前dashboard的`Variables`发现下拉框job的内容是通过prometheus中`go_info`来获取的,但是我通过promql查询`go_info`中没有`prometheus.yml`中配置的`-job_name`为`clickhouse-chproxy`的内容。但是我发现`go_goroutines`中有,于是我将`go_info`替换成了`go_goroutines`。
  4. 经过测试返现只有CHproxy至少经过一次的使用查询之后go_goroutines中才有-job_name为clickhouse-chproxy的内容,其他metrics也是才会出现。

go_goroutines

go_info没有别的信息,需要使用接口发送一次查询

grafana mount 需要 root user 当前host 为root

题外话

我直接在grafana容器中安装了clickhouse DataSource插件,并且制作成了镜像,这样的话grafana也可以直接查询clickhouse了哦,参考:https://github.com/Vertamedia/clickhouse-grafana,https://grafana.com/plugins/vertamedia-clickhouse-datasource/installation。

解决clickhouse并发问题之CHproxy安装配置

发表于 2019-06-14 | 分类于 olap , BigData , clickhouse , 大数据

问题

由于新的superset看板查询时每个chart涉及到的底表数据量增加以及并发较多导致clickhouse因为OOM挂掉而返回错误,并且影响到别的看板的使用以及整个系统的稳定性。但是单独查询某个chart是并没有问题,因此需要控制看板查询时的并发度。

解决

通过调研以及社区的帮助,可通过代理的方式控制并发度,并且可使用的代理有[ProxySQL](https://proxysql.com/)以及专门为clickhouse开发的第三方CHproxy。考虑到CHproxy对clickhouse的很多原生配置特性支持的比较好,比如set属性相关的(可以控制每个查询所使用的最大内存等)以及负载均衡,并发控制等等很多特性,故选择了CHproxy。

安装部署

CHproxy因为是go语言写的,因此安装部署也很简单,也可以通过docker进行部署,具体可以参考文档。启动命令如下:

1
./chproxy -config=/path/to/config.yml

下面是配置文件,仅供参考:

config.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
server:
http:
listen_addr: ":9092"

# Networks with reporting servers.
allowed_networks: ["11.3.0.0/16","172.0.0.0/8","127.0.0.0/8"]

users:
- name: "proxy-user"
password: "password"
to_cluster: "select"
to_user: "user"
max_concurrent_queries: 10
max_execution_time: 3m
max_queue_size: 120
max_queue_time: 120s
cache: "shortterm"
params: "cron-job"

clusters:
- name: "select"
replicas:
- name: "replica1"
nodes: ["ip11:8123", "ip21:8123"]
- name: "replica2"
nodes: ["ip12:8123", "ip22:8123"]
heartbeat_interval: 1m
users:
- name: "user"
password: "password"
max_concurrent_queries: 10
max_execution_time: 3m
max_queue_size: 120
max_queue_time: 120s

caches:
- name: "shortterm"
dir: "~/chproxy/cache/dir"
max_size: 150Mb
expire: 14400s

param_groups:
# Group name, which may be passed into `params` option on the `user` level.
- name: "cron-job"
# List of key-value params to send
params:
- key: "max_memory_usage"
value: "9000000000"

- key: "max_bytes_before_external_group_by"
value: "4500000000"

- name: "web"
params:
- key: "max_memory_usage"
value: "5000000000"

- key: "max_columns_to_read"
value: "30"

- key: "max_execution_time"
value: "30"

更为灵活的编程式sql:superset中jinja template的使用

发表于 2019-05-13 | 分类于 使用 , 可视化

superset中仅仅是对数据表的操作很多时候还是没法满足我们的数据展示需求,因此superset提供了jinja template的方式让我们更为灵活地自定义sql语句。

使用说明

通过jinja中文文档http://docs.jinkan.org/docs/jinja2/templates.html,我们可以了解jinja template如何使用,包括各种分隔符:

1
2
{% ... %}
{{ ... }}

前者用于执行诸如 for 循环 或赋值的语句,后者把表达式的结果打印到模板上,filters,条件判断,宏等。

superset关于jinja template的说明:https://superset.incubator.apache.org/sqllab.html。其中superset提供了`filter_values`和`url_param`两个jinja方法,其中`filter_values`更为重要。`filter_values`可以接收filter_box这个chart的值,并且以列表的方式输出。

superset还提供了一些操作函数,与python的import层级一一对应,比如:

  • time: time
  • datetime: datetime.datetime
  • uuid: uuid
  • random: random
  • relativedelta: dateutil.relativedelta.relativedelta

还可以使用jinja内置过滤器并且通过管道的方式使用:http://docs.jinkan.org/docs/jinja2/templates.html#id21。

阅读全文 »

spark jdbc导数clickhouse时attempt的坑

发表于 2019-04-12 | 分类于 问题 , spark , clickhouse , 大数据

问题

spark 的容错机制会使得spark application失败时尝试重启,从web ui中我们就可以看到其Attempt ID大于1。这是个好的机制,但是对于通过jdbc将数据导入clichouse的过程来说却是个不好的体验。因为如果重启的application的话会导致导入的数据重复,使得总的数据量增多,然后zookeeper有数据块的校验机制。

可通过--conf spark.yarn.maxAppAttempts=1使得application的重启次数为1,但是又带来可能出现的数据导入不全的情况出现。

看来得寻找一种好的数据导入工具,希望是操作友好,可视化,有监控,易管理,数据不会重复导入的工具。网上看了一下好像NIFI还不错,先调研调研。

解决方法

加快导数clickhouse的速度,缩短导入时间,这样就能够大概率上避免spark attempt的出现。具体方法是:

在spark的jdbc properties中

  1. 设置合适的batchsize。
  2. 设置合适的并发度。
  3. 设置合适的内存使用量。

可参考上篇文章:解决clickhouse批量插入时内存溢出导致docker挂掉的问题

解决clickhouse批量插入时内存溢出导致docker挂掉的问题

发表于 2019-04-12 | 分类于 olap , BigData , clickhouse , 大数据

问题

为了加快导入数据的速度我将针对每个分片的导入并行度都设置为3,如下配置:

1
2
3
4
5
6
7
properties_4_click_order_behavior = {'driver': 'ru.yandex.clickhouse.ClickHouseDriver',
"socket_timeout": "300000",
"rewriteBatchedStatements": "true",
"batchsize": "1000000",
"numPartitions": "3",
'user': CONF_CONSTANT.user,
'password': CONF_CONSTANT.password}

之后各个分片就报出了connection refused的问题,然后运维那边说是内存溢出了,我们的每个分片的内存只有24G。当然,当numPartitions为1的时候是没问题的。

我还是是希望能够加快能够加快导入速度,1个并行插入效率确实低,而且还有一个让我比较疑惑的地方是,我们有一个只有3个容器,配置为8核,32G,只有1副本,但是我令numPartitions为12,batchsize为20000000都没有问题。

初步解决

这里得感谢,QQ群Clickhouse牛人帮的几位大牛的帮助,我通过翻聊天记录查看到与我遇到类似问题的,并且有了解决方法,就是通过设置max_memory_usage和max_bytes_before_external_group_by来控制内存使用,按照官方文档建议,max_bytes_before_external_group_by为max_memory_usage二分之一。既然容器的内存只有24G,那我就设置max_memory_usage为20G好了,设置如下:

1
2
3
4
5
6
7
8
9
properties_4_click_order_behavior = {'driver': 'ru.yandex.clickhouse.ClickHouseDriver',
"socket_timeout": "300000",
"rewriteBatchedStatements": "true",
"batchsize": "1000000",
"numPartitions": "3",
'user': "user",
'password': "password",
'max_memory_usage': "20000000000",
'max_bytes_before_external_group_by': "10000000000"}

一开始我设置numPartitions为8,但是仍然出现connection refused的问题,后来调整成3就好了。为什么这样呢?

阅读全文 »

解决OutOfMemoryError: Direct buffer memory问题

发表于 2019-04-09 | 分类于 大数据 , BigData , Spark , Spark-Sql,调优

问题

近日处理一些大数据时老是出现OutOfMemoryError: Direct buffer memory的问题,一开始以为是数据倾斜问题,然后使用拆分倾斜key分别join再union的方法处理数据倾斜。后来测试发现,反而是非倾斜部分的数据进行join时出现了此问题。

实验过程

我做了些实验:

  1. 大表column1中-和notset字符串的量分别为8.5亿和2.8亿,占了大约总量的二分之一。
  2. 这两张表中个表我只取column1这个字段,并且根据column1 groupby 之后cont()为num,再将这两张表的结果进行join,并且增加列为表1的num乘以
    表2的num的结果,即为两张原始表join后的数量。结果发现前三数量最大的为16780380,255084,147246,无-或是字符串notset。
  3. 小表table1中没有column1为 -或是字符串notset, 同样这两个字符串也不会再步骤2中出现。也就是-和字符串notset在left join中起不到任何作用,只会在shuffle是占用大量空间。
  4. 通过观察web ui 中的sql 标签页,发现都是大表与大表的“SortMergeJoin”。
  5. 因为左连接左小表table1的column1中没有-和字符串notset,在读取右大表直接过滤掉column1中含-和字符串notset的列,至此实验通过,不再报OutOfMemoryError: Direct buffer memory的问题。
阅读全文 »

sparkSql操作hive表的PlainValuesDictionary问题

发表于 2019-04-04 | 分类于 大数据 , BigData , Spark , Spark-Sql,问题

操作hive表时出现一下报错:

1
org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary

原因:parquet文件中某些列的数据类型不一致。

例如这个表的某一列的schema类型为string,但是这一列的数据是从不同的数据源插入的,但是某些数据源的类型是int,却也能插入成功,所以当操作此表并且包含此列时,就可能会出现以上问题。

解决:

不同类型的数据插入表时,强制转化成与表schema一致的类型。

12…5
Alex Wong

Alex Wong

不管年龄大小,每个人都是我的老师

46 日志
57 分类
55 标签
Links
  • shalk
  • luckylau
© 2020 Alex Wong
由 Hexo 强力驱动
|
主题 — NexT.Pisces v5.1.4