Aggregating and visualizing RDB data with ELK
The goal is to turn the occasional "can you tell me the count of X?" into "take a look at Kibana".
As a starting point, this is a memo on getting data from MySQL into Elasticsearch and trying out Aggregations.
- Elasticsearch 1.7.1
- Logstash 1.5.4
Preparing some sample data
Data about specific places (batting centers and the like) and their reviews.
- spot: names like "○スローバッティングセンター"
- location: names like "荻窪○スローバッティングセンター"; the location column holds latitude and longitude as a "lat,lon" string (a format that geo_point accepts as-is)
- review: ratings like 3 stars
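As an aside, if latitude and longitude start out as separate numeric values, the "lat,lon" string that geo_point consumes can be produced with a trivial helper like this (a sketch; the function name and precision are my own choices, not from the pipeline below):

```python
def to_geo_point(lat, lon):
    """Format numeric latitude/longitude as the "lat,lon" string
    that an Elasticsearch geo_point field accepts."""
    return "{:.6f},{:.6f}".format(lat, lon)

print(to_geo_point(35.704675, 139.619920))  # → 35.704675,139.619920
```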
```
mysql> desc spot;
+-----------------+-------------+------+-----+---------------------+-----------------------------+
| Field           | Type        | Null | Key | Default             | Extra                       |
+-----------------+-------------+------+-----+---------------------+-----------------------------+
| sid             | int(11)     | NO   | PRI | NULL                | auto_increment              |
| name            | varchar(50) | NO   |     | NULL                |                             |
| regist_datetime | datetime    | NO   |     | 0000-00-00 00:00:00 |                             |
| update_datetime | datetime    | NO   |     | CURRENT_TIMESTAMP   | on update CURRENT_TIMESTAMP |
+-----------------+-------------+------+-----+---------------------+-----------------------------+
4 rows in set (0.00 sec)

mysql> desc location;
+-----------------+-------------+------+-----+---------------------+-----------------------------+
| Field           | Type        | Null | Key | Default             | Extra                       |
+-----------------+-------------+------+-----+---------------------+-----------------------------+
| lid             | int(11)     | NO   | PRI | NULL                | auto_increment              |
| sid             | int(11)     | NO   |     | NULL                |                             |
| pref            | varchar(4)  | NO   |     | NULL                |                             |
| location        | varchar(30) | NO   |     | NULL                |                             |
| regist_datetime | datetime    | NO   |     | 0000-00-00 00:00:00 |                             |
| update_datetime | datetime    | NO   |     | CURRENT_TIMESTAMP   | on update CURRENT_TIMESTAMP |
+-----------------+-------------+------+-----+---------------------+-----------------------------+
6 rows in set (0.01 sec)

mysql> desc review;
+-----------------+--------------+------+-----+---------------------+-----------------------------+
| Field           | Type         | Null | Key | Default             | Extra                       |
+-----------------+--------------+------+-----+---------------------+-----------------------------+
| rid             | int(11)      | NO   | PRI | NULL                | auto_increment              |
| lid             | int(11)      | NO   |     | NULL                |                             |
| nickname        | varchar(10)  | NO   |     | NULL                |                             |
| rate            | decimal(2,1) | NO   |     | 0.0                 |                             |
| regist_datetime | datetime     | NO   |     | 0000-00-00 00:00:00 |                             |
| update_datetime | datetime     | NO   |     | CURRENT_TIMESTAMP   | on update CURRENT_TIMESTAMP |
+-----------------+--------------+------+-----+---------------------+-----------------------------+
6 rows in set (0.00 sec)
```
Mapping
```json
{
  "settings": {
    "number_of_shards": 2,
    "number_of_replicas": 2,
    "analysis": {
      "analyzer": {
        "custom_bigram": {
          "tokenizer": "custom_bigram_tokenizer"
        }
      },
      "tokenizer": {
        "custom_bigram_tokenizer": {
          "type": "nGram",
          "min_gram": "2",
          "max_gram": "2",
          "token_chars": ["letter", "digit"]
        }
      }
    }
  },
  "mappings": {
    "spot": {
      "properties": {
        "name": {
          "type": "string",
          "index": "not_analyzed",
          "fields": {
            "bigram": { "type": "string", "index": "analyzed", "analyzer": "custom_bigram" }
          }
        },
        "pref": { "type": "string", "index": "not_analyzed" },
        "location": { "type": "geo_point" },
        "regist_datetime": { "type": "date" },
        "update_datetime": { "type": "date" }
      }
    },
    "review": {
      "properties": {
        "name": {
          "type": "string",
          "index": "not_analyzed",
          "fields": {
            "bigram": { "type": "string", "index": "analyzed", "analyzer": "custom_bigram" }
          }
        },
        "pref": { "type": "string", "index": "not_analyzed" },
        "lid": { "type": "integer" },
        "location": { "type": "geo_point" },
        "nickname": { "type": "string" },
        "rate": { "type": "float" },
        "regist_datetime": { "type": "date" }
      }
    }
  }
}
```
MySQL to Elasticsearch
For input from an RDB there is a jdbc plugin for Logstash, so that is what I use this time.
```
bin/plugin install logstash-input-jdbc
```
jdbc_es_pipeline_spot.conf
```
input {
  jdbc {
    jdbc_driver_library => "./mysql-connector-java-5.1.34.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/elk"
    jdbc_user => "root"
    jdbc_password => ""
    statement => "
      SELECT
          l.lid `_id`
        , s.name
        , l.pref
        , l.location
        , DATE_FORMAT(l.regist_datetime, '%Y-%m-%dT%H:%i:%s.000+0900') `regist_datetime`
        , DATE_FORMAT(l.update_datetime, '%Y-%m-%dT%H:%i:%s.000+0900') `update_datetime`
      FROM location l
        INNER JOIN spot s ON s.sid = l.sid
      ORDER BY l.lid
    "
  }
}

filter {
  ruby {
    code => "localtime = event.timestamp.time.localtime"
  }
}

output {
  elasticsearch {
    protocol => "http"
    host => "192.168.1.10"
    port => "9200"
    index => "learning_elk"
    document_type => "spot"
    document_id => "%{_id}"
  }
}
```
jdbc_es_pipeline_review.conf
```
input {
  jdbc {
    jdbc_driver_library => "./mysql-connector-java-5.1.34.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/elk"
    jdbc_user => "root"
    jdbc_password => ""
    statement => "
      SELECT
          r.rid `_id`
        , s.name
        , l.pref
        , l.lid
        , l.location
        , r.nickname
        , r.rate
        , DATE_FORMAT(r.regist_datetime, '%Y-%m-%dT%H:%i:%s.000+0900') `regist_datetime`
        , DATE_FORMAT(r.update_datetime, '%Y-%m-%dT%H:%i:%s.000+0900') `update_datetime`
      FROM review r
        INNER JOIN location l ON l.lid = r.lid
        INNER JOIN spot s ON s.sid = l.sid
      ORDER BY r.rid
    "
  }
}

filter {
  ruby {
    code => "localtime = event.timestamp.time.localtime"
  }
}

output {
  elasticsearch {
    protocol => "http"
    host => "192.168.1.10"
    port => "9200"
    index => "learning_elk"
    document_type => "review"
    document_id => "%{_id}"
  }
}
```
The two configs are somewhat redundant, but run each one as-is:
```
bin/logstash -f jdbc_es_pipeline_spot.conf
bin/logstash -f jdbc_es_pipeline_review.conf
```
Aggregation operations
Simple counts
```sql
SELECT
    s.name
  , l.pref
  , COUNT(l.sid) `num_spot`
FROM location l
  INNER JOIN spot s ON s.sid = l.sid
GROUP BY
    s.name
  , l.pref
ORDER BY `num_spot` DESC
```
Aggregation
Do multiple grouping axes always have to be nested? Getting hierarchical rollups is nice, but when I don't need the parent level I'd rather be able to list two fields in a single terms agg...
```json
{
  "aggs": {
    "count_by_name": {
      "terms": {
        "field": "name",
        "order": { "_count": "desc" }
      },
      "aggs": {
        "count_by_pref": {
          "terms": {
            "field": "pref",
            "order": { "_count": "desc" }
          }
        }
      }
    }
  }
}
```
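For intuition, the nested terms aggregation computes the same numbers as this client-side sketch (the documents here are made up for illustration):

```python
from collections import Counter

# Hypothetical spot documents in the shape indexed by the pipeline above.
docs = [
    {"name": "A batting center", "pref": "tokyo"},
    {"name": "A batting center", "pref": "tokyo"},
    {"name": "A batting center", "pref": "chiba"},
    {"name": "B batting center", "pref": "tokyo"},
]

# Outer terms agg: doc count per name.
count_by_name = Counter(d["name"] for d in docs)

# Inner terms agg: within each name bucket, doc count per pref.
count_by_pref = {
    name: Counter(d["pref"] for d in docs if d["name"] == name)
    for name in count_by_name
}

print(count_by_name)
print(count_by_pref)
```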
Unique counts
```sql
SELECT
    l.pref
  , COUNT(DISTINCT s.name) `num_spot_name`
FROM location l
  INNER JOIN spot s ON s.sid = l.sid
GROUP BY l.pref
ORDER BY `num_spot_name` DESC
```
Aggregation
```json
{
  "aggs": {
    "count_by_pref": {
      "terms": {
        "field": "pref",
        "order": { "_count": "desc" }
      },
      "aggs": {
        "count_unique_name": {
          "cardinality": { "field": "name" }
        }
      }
    }
  }
}
```
Basic statistics (count, min, max, avg, sum)
```sql
SELECT
    s.name
  , l.lid
  , COUNT(r.rid) `num_rate`
  , MIN(r.rate) `min_rate`
  , MAX(r.rate) `max_rate`
  , AVG(r.rate) `avg_rate`
  , SUM(r.rate) `total_rate`
FROM review r
  INNER JOIN location l ON l.lid = r.lid
  INNER JOIN spot s ON s.sid = l.sid
GROUP BY
    s.name
  , l.lid
```
Aggregation
```json
{
  "aggs": {
    "group_by_name": {
      "terms": { "field": "name" },
      "aggs": {
        "stats_rate": {
          "stats": { "field": "rate" }
        },
        "group_by_location": {
          "terms": { "field": "lid" },
          "aggs": {
            "stats_rate": {
              "stats": { "field": "rate" }
            }
          }
        }
      }
    }
  }
}
```
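To make concrete what the stats sub-aggregation returns per bucket, here is a rough client-side equivalent over made-up review documents (names and values are invented for illustration):

```python
# Made-up review documents mirroring the fields indexed above.
reviews = [
    {"name": "A batting center", "lid": 1, "rate": 4.0},
    {"name": "A batting center", "lid": 1, "rate": 2.0},
    {"name": "A batting center", "lid": 2, "rate": 3.0},
]

def stats(values):
    # The same five numbers the stats aggregation reports per bucket.
    return {
        "count": len(values),
        "min": min(values),
        "max": max(values),
        "avg": sum(values) / len(values),
        "sum": sum(values),
    }

# One name bucket's stats, i.e. what stats_rate would contain.
rates = [r["rate"] for r in reviews if r["name"] == "A batting center"]
print(stats(rates))
```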
Conditional aggregates
```sql
SELECT
    s.name
  , COUNT(r.rid) `num_rate`
  , MIN(r.rate) `min_rate`
  , MAX(r.rate) `max_rate`
  , AVG(r.rate) `avg_rate`
  , SUM(r.rate) `total_rate`
  , COUNT(IF(r.nickname = 'mike', r.rid, NULL)) `mikes_num_rate`
  , MIN(IF(r.nickname = 'mike', r.rate, NULL)) `mikes_min_rate`
  , MAX(IF(r.nickname = 'mike', r.rate, NULL)) `mikes_max_rate`
  , AVG(IF(r.nickname = 'mike', r.rate, NULL)) `mikes_avg_rate`
  , SUM(IF(r.nickname = 'mike', r.rate, NULL)) `mikes_total_rate`
FROM review r
  INNER JOIN location l ON l.lid = r.lid
  INNER JOIN spot s ON s.sid = l.sid
GROUP BY s.name
```
Aggregation?
```json
{
  "aggs": {
    "group_by_name": {
      "terms": { "field": "name" },
      "aggs": {
        "stats_rate": {
          "stats": { "field": "rate" }
        }
      }
    },
    "if_mike": {
      "filter": {
        "term": { "nickname": "mike" }
      },
      "aggs": {
        "group_by_name": {
          "terms": { "field": "name" },
          "aggs": {
            "stats_rate": {
              "stats": { "field": "rate" }
            }
          }
        }
      }
    }
  }
}
```
That's as far as I got for now
My hunch is that once you're running aggregate queries, you'll want to store daily or monthly statistics over a large dataset into a separate index (something like INSERT ... SELECT ...) and watch the trends, but with Aggregations alone that presumably means writing some Python or the like.
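As a sketch of that INSERT ... SELECT ... analogue, the glue code mostly boils down to flattening an aggregation response into documents for a stats index; the response shape below is fabricated to match the stats aggregation shown earlier, and the field and index names are my own:

```python
import json

def agg_to_docs(response, day):
    """Flatten a terms+stats aggregation response into documents
    suitable for bulk-indexing into a separate daily-stats index."""
    docs = []
    for bucket in response["aggregations"]["group_by_name"]["buckets"]:
        stats = bucket["stats_rate"]
        docs.append({
            "name": bucket["key"],
            "day": day,
            "num_rate": stats["count"],
            "min_rate": stats["min"],
            "max_rate": stats["max"],
            "avg_rate": stats["avg"],
            "total_rate": stats["sum"],
        })
    return docs

# A fabricated sample response in the shape Elasticsearch returns
# for the stats aggregation above.
sample = {
    "aggregations": {
        "group_by_name": {
            "buckets": [
                {"key": "A batting center",
                 "stats_rate": {"count": 2, "min": 2.0, "max": 4.0,
                                "avg": 3.0, "sum": 6.0}},
            ]
        }
    }
}

print(json.dumps(agg_to_docs(sample, "2015-09-01"), indent=2))
# Each resulting doc could then be indexed into e.g. a "daily_stats" index.
```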
Also, I used Logstash this time, but other options for importing from an RDB include:
jprante/elasticsearch-jdbc · GitHub
embulk/embulk · GitHub
Maybe embulk would have been the better choice...
(# no K this time)
In any case, I should read through the documentation properly first...
Bonus memo
Some data I touched the other day kept latitude and longitude in separate columns of a table, and I needed to map them to an Elasticsearch geo_point. Since I didn't want to write anything other than SQL, I wrote a zip function (of sorts).
```
mysql> select * from pivot;
+--------+
| rownum |
+--------+
|      1 |
|      2 |
|      3 |
|      4 |
|      5 |
+--------+

mysql> select zip('a b c', '1 2 3', ' ', ',');
+---------------------------------+
| zip('a b c', '1 2 3', ' ', ',') |
+---------------------------------+
| a,1 b,2 c,3                     |
+---------------------------------+
1 row in set (0.00 sec)
```
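The behavior shown above (pair up the i-th tokens of two delimited strings) corresponds to this Python sketch, which mirrors the SQL zip() semantics rather than being its actual implementation:

```python
def zip_strings(a, b, in_sep, out_sep):
    """Pair the i-th tokens of a and b, joining each pair with out_sep
    and the pairs with in_sep -- mirrors the SQL zip() shown above."""
    return in_sep.join(
        out_sep.join(pair) for pair in zip(a.split(in_sep), b.split(in_sep))
    )

print(zip_strings("a b c", "1 2 3", " ", ","))  # → a,1 b,2 c,3
```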