Aggregating and Visualizing RDB Data with ELK

I'd like to turn the occasional "can you tell me the count of this?" into "just look at Kibana".

As a first step, I'll move data from MySQL into Elasticsearch and play around with Aggregations.

This is a rough memo on things like:

  • going from a MySQL input to an Elasticsearch output
  • how to replace basic SQL aggregate queries

Versions used:

  • Elasticsearch 1.7.1
  • Logstash 1.5.4

Preparing some sample data

Data about specific places (batting centers and the like) and reviews of them:

  • spot: names like ○スローバッティングセンター
  • location: names like 荻窪○スローバッティングセンター; the location column holds the latitude and longitude as a "lat,lon" string (a format that geo_point can take as-is)
  • review: ratings like 星3つ (three stars)

mysql> desc spot;
+-----------------+-------------+------+-----+---------------------+-----------------------------+
| Field           | Type        | Null | Key | Default             | Extra                       |
+-----------------+-------------+------+-----+---------------------+-----------------------------+
| sid             | int(11)     | NO   | PRI | NULL                | auto_increment              |
| name            | varchar(50) | NO   |     | NULL                |                             |
| regist_datetime | datetime    | NO   |     | 0000-00-00 00:00:00 |                             |
| update_datetime | datetime    | NO   |     | CURRENT_TIMESTAMP   | on update CURRENT_TIMESTAMP |
+-----------------+-------------+------+-----+---------------------+-----------------------------+
4 rows in set (0.00 sec)

mysql> desc location;
+-----------------+-------------+------+-----+---------------------+-----------------------------+
| Field           | Type        | Null | Key | Default             | Extra                       |
+-----------------+-------------+------+-----+---------------------+-----------------------------+
| lid             | int(11)     | NO   | PRI | NULL                | auto_increment              |
| sid             | int(11)     | NO   |     | NULL                |                             |
| pref            | varchar(4)  | NO   |     | NULL                |                             |
| location        | varchar(30) | NO   |     | NULL                |                             |
| regist_datetime | datetime    | NO   |     | 0000-00-00 00:00:00 |                             |
| update_datetime | datetime    | NO   |     | CURRENT_TIMESTAMP   | on update CURRENT_TIMESTAMP |
+-----------------+-------------+------+-----+---------------------+-----------------------------+
6 rows in set (0.01 sec)

mysql> desc review;
+-----------------+--------------+------+-----+---------------------+-----------------------------+
| Field           | Type         | Null | Key | Default             | Extra                       |
+-----------------+--------------+------+-----+---------------------+-----------------------------+
| rid             | int(11)      | NO   | PRI | NULL                | auto_increment              |
| lid             | int(11)      | NO   |     | NULL                |                             |
| nickname        | varchar(10)  | NO   |     | NULL                |                             |
| rate            | decimal(2,1) | NO   |     | 0.0                 |                             |
| regist_datetime | datetime     | NO   |     | 0000-00-00 00:00:00 |                             |
| update_datetime | datetime     | NO   |     | CURRENT_TIMESTAMP   | on update CURRENT_TIMESTAMP |
+-----------------+--------------+------+-----+---------------------+-----------------------------+
6 rows in set (0.00 sec)

Mapping

{
    "settings" : {
        "number_of_shards" : 2,
        "number_of_replica" : 2,
        "analysis": {
            "analyzer": {
                "custom_bigram": {
                    "tokenizer": "custom_bigram_tokenizer"
                }
            },
            "tokenizer": {
                "custom_bigram_tokenizer": {
                    "type": "nGram",
                    "min_gram": "2",
                    "max_gram": "2",
                    "token_chars": [
                        "letter",
                        "digit"
                    ]
                }
            }
        }
    },
    "mappings" : {
        "spot" : {
            "properties" : {
                "name" : {
                    "type" : "string",
                    "index" : "not_analyzed",
                    "fields": {
                        "bigram": {"type": "string", "index": "analyzed", "analyzer": "custom_bigram" }
                    }
                },
                "pref" : {
                    "type" : "string",
                    "index" : "not_analyzed"
                },
                "location": {
                    "type": "geo_point"
                },
                "regist_datetime" : {
                    "type" : "date"
                },
                "update_datetime" : {
                    "type" : "date"
                }
            }
        },
        "review": {
            "properties" : {
                "name" : {
                    "type" : "string",
                    "index" : "not_analyzed",
                    "fields": {
                        "bigram": {"type": "string", "index": "analyzed", "analyzer": "custom_bigram" }
                    }
                },
                "pref" : {
                    "type" : "string",
                    "index" : "not_analyzed"
                },
                "lid": {
                    "type": "integer"
                },
                "location": {
                    "type": "geo_point"
                },
                "nickname": {
                    "type": "string"
                },
                "rate": {
                    "type": "float"
                },
                "regist_datetime" : {
                    "type" : "date"
                }
            }
        }
    }
}
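
To create the index with this mapping up front, something like this works (a sketch: it assumes the JSON above is saved as mapping.json, and that Elasticsearch is at 192.168.1.10:9200 as in the Logstash configs below):

curl -XPUT 'http://192.168.1.10:9200/learning_elk' -d @mapping.json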

MySQL to Elasticsearch

For reading from an RDB there is the jdbc input plugin, so I'll use that this time.

bin/plugin install logstash-input-jdbc

jdbc_es_pipeline_spot.conf

input {
    jdbc {
        jdbc_driver_library => "./mysql-connector-java-5.1.34.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/elk"
        jdbc_user => "root"
        jdbc_password => ""
        statement => "
            SELECT
                l.lid `_id`
            ,   s.name
            ,   l.pref
            ,   l.location
            ,   DATE_FORMAT(l.regist_datetime, '%Y-%m-%dT%H:%i:%s.000+0900') `regist_datetime`
            ,   DATE_FORMAT(l.update_datetime, '%Y-%m-%dT%H:%i:%s.000+0900') `update_datetime`
            FROM
                location l
                INNER JOIN spot s
                    ON s.sid = l.sid
            ORDER BY
                l.lid
        "
    }
}

filter {
    ruby {
        # Note: this only assigns the localized event timestamp to a Ruby
        # local variable; it doesn't actually modify the event.
        code => "localtime = event.timestamp.time.localtime"
    }
}

output {
    elasticsearch {
        protocol => "http"
        host => "192.168.1.10"
        port => "9200"
        index => "learning_elk"
        document_type => "spot"
        document_id => "%{_id}"
    }
}

jdbc_es_pipeline_review.conf

input {
    jdbc {
        jdbc_driver_library => "./mysql-connector-java-5.1.34.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/elk"
        jdbc_user => "root"
        jdbc_password => ""
        statement => "
            SELECT
                r.rid `_id`
            ,   s.name
            ,   l.pref
            ,   l.lid
            ,   l.location
            ,   r.nickname
            ,   r.rate
            ,   DATE_FORMAT(r.regist_datetime, '%Y-%m-%dT%H:%i:%s.000+0900') `regist_datetime`
            ,   DATE_FORMAT(r.update_datetime, '%Y-%m-%dT%H:%i:%s.000+0900') `update_datetime`
            FROM
                review r
                INNER JOIN location l
                    ON l.lid = r.lid
                INNER JOIN spot s
                    ON s.sid = l.sid
            ORDER BY
                r.rid
        "
    }
}

filter {
    ruby {
        # Note: this only assigns the localized event timestamp to a Ruby
        # local variable; it doesn't actually modify the event.
        code => "localtime = event.timestamp.time.localtime"
    }
}

output {
    elasticsearch {
        protocol => "http"
        host => "192.168.1.10"
        port => "9200"
        index => "learning_elk"
        document_type => "review"
        document_id => "%{_id}"
    }
}
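
By default the jdbc input runs the statement once and exits, which is all we need here. If you want periodic polling instead, the plugin also takes a cron-style schedule option inside the jdbc block, e.g. (a sketch):

        schedule => "*/5 * * * *"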

It's a bit redundant, but run each config as-is:

bin/logstash -f jdbc_es_pipeline_spot.conf
bin/logstash -f jdbc_es_pipeline_review.conf
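
If both runs finish cleanly, a quick sanity check is to count the documents of each type (host and index as configured above):

curl 'http://192.168.1.10:9200/learning_elk/spot/_count?pretty'
curl 'http://192.168.1.10:9200/learning_elk/review/_count?pretty'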

Aggregation operations

Simple count

SQL

SELECT
	s.name
,	l.pref
,	COUNT(l.sid) `num_spot`
FROM
	location l
	INNER JOIN spot s
		ON s.sid = l.sid
GROUP BY
	s.name
,	l.pref
ORDER BY
	`num_spot` DESC

Aggregation

Do multiple grouping axes always have to be nested like this? Getting hierarchical buckets is nice, but when I don't need the parent level, I'd rather just list two fields in a single terms...

{
    "aggs": {
        "count_by_name": {
            "terms": {
                "field": "name",
                "order": { "_count" : "desc" }
            },
            "aggs": {
                "count_by_pref": {
                    "terms": {
                        "field": "pref",
                        "order": { "_count" : "desc" }
                    }
                }
            }
        }
    }
}
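
To actually run this query (and the ones below), something like the following works; on ES 1.x, search_type=count skips returning hits so you only get the aggregation results. This assumes the body above is saved as query.json; adjust the type (spot or review) to match the fields being aggregated:

curl -XPOST 'http://192.168.1.10:9200/learning_elk/spot/_search?search_type=count&pretty' -d @query.json
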
Unique count

SQL

SELECT
	l.pref
,	COUNT(DISTINCT s.name) `num_spot_name`
FROM
	location l
	INNER JOIN spot s
		ON s.sid = l.sid
GROUP BY
	l.pref
ORDER BY
	`num_spot_name` DESC

Aggregation

{
    "aggs": {
        "count_by_pref": {
            "terms": {
                "field": "pref",
                "order": { "_count" : "desc" }
            },
            "aggs": {
                "count_unique_name": {
                    "cardinality": {
                        "field": "name"
                    }
                }
            }
        }
    }
}
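
Note that cardinality is an approximate count (HyperLogLog-based), not an exact COUNT(DISTINCT ...). At small cardinalities it is effectively exact, and the precision_threshold parameter trades memory for accuracy, e.g.:

                "count_unique_name": {
                    "cardinality": {
                        "field": "name",
                        "precision_threshold": 1000
                    }
                }
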
Basic statistics (count, min, max, avg, sum)

SQL

SELECT
	s.name
,	l.lid
,	COUNT(r.rid) `num_rate`
,	MIN(r.rate) `min_rate`
,	MAX(r.rate) `max_rate`
,	AVG(r.rate) `avg_rate`
,	SUM(r.rate) `total_rate`
FROM
	review r
	INNER JOIN location l
		ON l.lid = r.lid
	INNER JOIN spot s
		ON s.sid = l.sid
GROUP BY
	s.name
,	l.lid

Aggregation

{
    "aggs": {
        "group_by_name": {
            "terms": {
                "field": "name"
            },
            "aggs": {
                "stats_rate": {
                    "stats": {
                        "field": "rate"
                    }
                },
                "group_by_location": {
                    "terms": {
                        "field": "lid"
                    },
                    "aggs": {
                        "stats_rate": {
                            "stats": {
                                "field": "rate"
                            }
                        }
                    }
                }
            }
        }
    }
}
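
If you also want variance and standard deviation, which the SQL above doesn't cover, extended_stats is a drop-in replacement for stats:

                "stats_rate": {
                    "extended_stats": {
                        "field": "rate"
                    }
                }
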
Conditional aggregation

SQL

SELECT
	s.name
,	COUNT(r.rid) `num_rate`
,	MIN(r.rate) `min_rate`
,	MAX(r.rate) `max_rate`
,	AVG(r.rate) `avg_rate`
,	SUM(r.rate) `total_rate`
,	COUNT(IF(r.nickname = 'mike', r.rid, NULL)) `mikes_num_rate`
,	MIN(IF(r.nickname = 'mike', r.rate, NULL)) `mikes_min_rate`
,	MAX(IF(r.nickname = 'mike', r.rate, NULL)) `mikes_max_rate`
,	AVG(IF(r.nickname = 'mike', r.rate, NULL)) `mikes_avg_rate`
,	SUM(IF(r.nickname = 'mike', r.rate, NULL)) `mikes_total_rate`
FROM
	review r
	INNER JOIN location l
		ON l.lid = r.lid
	INNER JOIN spot s
		ON s.sid = l.sid
GROUP BY
	s.name

Aggregation?

{
    "aggs": {
        "group_by_name": {
            "terms": {
                "field": "name"
            },
            "aggs": {
                "stats_rate": {
                    "stats": {
                        "field": "rate"
                    }
                }
            }
        },
        "if_mike": {
            "filter": {
                "term" : {
                    "nickname": "mike"
                }
            },
            "aggs": {
                "group_by_name": {
                    "terms": {
                        "field": "name"
                    },
                    "aggs": {
                        "stats_rate": {
                            "stats": {
                                "field": "rate"
                            }
                        }
                    }
                }
            }
        }
    }
}
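
As an aside, if you want buckets for several reviewers at once rather than one filter block per condition, the filters (plural) aggregation, available since 1.4, can declare them together. A sketch (the second nickname is made up):

{
    "aggs": {
        "by_reviewer": {
            "filters": {
                "filters": {
                    "mike": { "term": { "nickname": "mike" } },
                    "nancy": { "term": { "nickname": "nancy" } }
                }
            },
            "aggs": {
                "stats_rate": {
                    "stats": {
                        "field": "rate"
                    }
                }
            }
        }
    }
}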

That's it for now.

My hunch is that once you get into aggregate queries, with large data you'd soon want to store daily or monthly statistics into a separate index (something like INSERT ... SELECT ...) and look at trends over time, but with Aggregations alone that would probably mean writing some Python or the like.
Also, I used Logstash this time, but for importing from an RDB you could also use
jprante/elasticsearch-jdbc · GitHub
embulk/embulk · GitHub
Maybe embulk would have been the better choice... #no K
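
For reference, a minimal embulk pipeline for the same spot import might look roughly like this (a sketch from memory of embulk-input-mysql / embulk-output-elasticsearch; check the option names against each plugin's README before use):

in:
  type: mysql
  host: 127.0.0.1
  user: root
  password: ""
  database: elk
  query: SELECT s.name, l.pref, l.location FROM location l INNER JOIN spot s ON s.sid = l.sid
out:
  type: elasticsearch
  nodes:
    - {host: 192.168.1.10, port: 9300}
  index: learning_elk
  index_type: spot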

Anyway, I need to sit down and read through the docs...

Bonus memo

The data I was working with the other day had the latitude and the longitude in separate columns, and they needed to be mapped to an Elasticsearch geo_point. I didn't want to write anything other than SQL, so I made a zip function (or something like one):

gist.github.com

mysql> select * from pivot;
+--------+
| rownum |
+--------+
|      1 |
|      2 |
|      3 |
|      4 |
|      5 |
+--------+

mysql> select zip('a b c', '1 2 3', ' ', ',');
+----------------------------------------+
| zip('a b c', '1 2 3', ' ', ',')        |
+----------------------------------------+
| a,1 b,2 c,3                            |
+----------------------------------------+
1 row in set (0.00 sec)
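
The pivot table is there to turn the zipped string back into rows. Presumably it gets combined with the usual SUBSTRING_INDEX trick, something like this (my guess at the intended usage, not the contents of the gist):

SELECT
	SUBSTRING_INDEX(SUBSTRING_INDEX(zip('a b c', '1 2 3', ' ', ','), ' ', p.rownum), ' ', -1) `pair`
FROM
	pivot p
WHERE
	p.rownum <= 3

which yields one "lat,lon"-style pair per row: a,1 / b,2 / c,3.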