GCEインスタンスでpostgresqlをフェイルオーバーさせる

GCEでは、単一インスタンスに同一ネットワークのIPアドレスを複数持たせる事ができない。
複数のネットワーク インターフェースの概要と例

つまり、Virtual IPを同一セグメントに持たせた構成は出来ない。

クラスタを組みたかったけど、今回の要件としては

1. Active/Standbyの構成にする
2. Activeのpostgresが止まった場合、Standbyを昇格する
3. フェイルバックは行わない。
4. フェイルオーバー後、App側で検知。
   必要な設定変更後、動的にサービスの再起動を行う。
5. 1分以内にサービスが復旧される。

とシンプルな構成にする。

PostgreSQL

要件1、2には、pg_keeperを使う。
クラスタでは無いのでスプリットブレインの検知などは出来ない。
その為、アプリケーション側に今のActive機の情報を教える必要がある。

スプリットブレイン対策用のDB、Table作成

postgres=#  create database pg_state;
postgres=#  create table failover_log (unixtime int, host varchar(10));

インストール(Actice/Standby共に)

cd /usr/local/src

git clone https://github.com/MasahikoSawada/pg_keeper.git

export PATH=/usr/pgsql-9.6/bin/:$PATH

make USE_PGXS=1

make USE_PGXS=1 install

postgresql.conf書き換え(Actice/Standby共に)

vim postgresql.conf

shared_preload_libraries = 'pg_keeper'
pg_keeper.my_conninfo = 'host=10.0.0.10 port=5432 dbname=postgres'
pg_keeper.partner_conninfo = 'host=10.0.0.11 port=5432 dbname=postgres'
pg_keeper.keepalive_time = 2
pg_keeper.keepalive_count = 3
pg_keeper.after_command = 'sleep 1 ; psql -d pg_state -c "insert into failover_log values(`date +%s`, \'`hostname`\');" -x'

Activeが止まった場合、pg_keeper.keepalive_time秒 × pg_keeper.keepalive_count回 チェックを行い、全てNGの場合にフェイルオーバーを実行し、最後にpg_keeper.after_commandの内容が実行される。
今回は、フェイルオーバー後に[unixtimestamp, hostname]を pg_state.failover_logに入れている。

app側

これはアプリケーションのよるので参考まで。
monitor_master_db.pyというモニタリングスクリプトを作成し、root権限で動かす事にした。
動きとしては、Active/Standby両機のDBのpg_state.failover_logをチェックし、タイムスタンプが若い方をDB接続先として、設定ファイル(yaml)を書き換えデーモンの再起動を行う。

#!/bin/env python3

import os,sys
import yaml
import psycopg2
import codecs
import subprocess

yaml_file = '/PATH/TO/env.yaml'
dbs = ['postgresql://postgres@db01:5432/pg_state'
         ,'postgresql://postgres@db02:5432/pg_state']

def get_item():
    arr = []
    for db in dbs :
        try:
            dbcon = psycopg2.connect(db)
            cur = dbcon.cursor()
            cur.execute('select * from failover_log order by unixtime desc limit 1')
            result = cur.fetchone()
            cur.close()
            dbcon.close()
            arr.append(result)
        except :
            pass
    if len(dbs) == len(arr):    # Active/Standby共にデータ取得成功
        if arr[0][0] > arr[1][0]:
            return arr[0][1]
        else :
            return arr[1][1]

    else :                             # 片系が停止している
        return arr[0][1]


def overwrite(db_name):
    with codecs.open(yaml_file, 'r', 'utf-8') as read :
        env_dict = yaml.load(read)

        if env_dict['db_master'][0]['address'] != '{}:5432'.format(db_name) or env_dict['db_slave'][0]['address'] != '{}:5432'.format(db_name):
            env_dict['db_master'][0]['address'] = '{}:5432'.format(db_name)
            env_dict['db_slave'][0]['address'] = '{}:5432'.format(db_name)

            with codecs.open(yaml_file, 'w', 'utf-8') as write :
                yaml.dump(env_dict, write, encoding='utf8', allow_unicode=True, default_flow_style=False)

            try:
                subprocess.check_call(["systemctl", "restart", "デーモン"])
            except :
                pass

作成したmonitor_master_db.pyをcronで動かす。
cronは普通に書くと1分が最小の実行単位だが、以下のように書くと5秒単位でスクリプトを実行してくれる。

# 5秒間隔
* * * * * for i in `seq 1 12`;do sleep 5; python3 /usr/local/bin/monitor_master_db.py; done

# 10秒間隔の場合
* * * * * for i in `seq 1 6`;do sleep 10; python3 /usr/local/bin/monitor_master_db.py; done

この状態で、Active側のDBを落として、フェイルオーバーされApp側の接続先も変更される事を確認する。
Slave側が昇格前にfailover_logへのinsertが実行される場合、pg_keeper.after_commandのsleepを大きくする。

pg_keeper.after_command = 'sleep 5 ; psql -d pg_state -c "insert into failover_log values(`date +%s`, \'`hostname`\');" -x'

「さんあ~る」からゴミ捨てカレンダーをスクレイピングする

4月に千葉県柏市のゴミ捨てアプリ「さんあ〜る」が、Web版としてリリースされました。

ごみ分別アプリ「さんあ~る」がインターネットでも利用できます!

さんあ〜るのごみカレンダーをiframeとしてWPに引っ張ってこようと思ったけど拡縮の問題があり挫折。
あと、画像がいらないな、と思ったのでpython3でスクレイピングしてみた。

# coding: UTF-8
import datetime
import urllib.request
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
from bs4 import BeautifulSoup

today = datetime.date.today()
year = today.strftime("%Y")
month = today.strftime("%m")

# アクセスするURL
# XXXXXは地区番号らしい
url = 'https://manage.delight-system.com/threeR/web/calendar?jichitaiId=kashiwashi&areaId=22125&year={year}&month={month}'.format(year=year, month=month)

html = urllib.request.urlopen(url)
soup = BeautifulSoup(html, "html.parser")

table = soup.find_all("table")
cal = []
for tag in table :
    try:
        #import pdb; pdb.set_trace()
        days = tag.find_all("td")  # カレンダーのセル数(7日×週)
        for day in days:
            span = day.find_all('span')
            tmp = []
            if len(span) > 0  :
                for num in range(len(span)):
                    try:
                        if span[num].get("class")[0] in ('common','sat','sun'): 
                            tmp.append(span[0].string)           # 日
                            tmp.append(span[0].get('class')[0])  # 曜

                        elif span[num].get("class")[0] in 'trash_kind_name': # ゴミ種類
                            tmp.append(span[num].string)
                        cal.append(tmp)
                    except:
                        pass
            else : # 空白セル
                cal.append([''])
    except:
        pass

print(cal)

後で、オリジナルのカレンダーの情報に充てがう予定なのでlistに入れる。

結果

[[''], [''], [''], [''], [''], ['1', 'common'], ['2', 'sat'], ['3', 'sun'], ['4', 'common', '可燃ごみ'], ['4', 'common', '可燃ごみ'], ['5', 'common', '不燃ごみ'], ['5', 'common', '不燃ごみ'], ['6', 'common', '容器包装プラスチック類'], ['6', 'common', '容器包装プラスチック類'], ['7', 'common', '可燃ごみ'], ['7', 'common', '可燃ごみ'], ['8', 'common'], ['9', 'sat'], ['10', 'sun'], ['11', 'common', '可燃ごみ'], ['11', 'common', '可燃ごみ'], ['12', 'common'], ['13', 'common', '資源品', '容器包装プラスチック類'], ['13', 'common', '資源品', '容器包装プラスチック類'], ['13', 'common', '資源品', '容器包装プラスチック類'], ['14', 'common', '可燃ごみ'], ['14', 'common', '可燃ごみ'], ['15', 'common'], ['16', 'sat'], ['17', 'sun'], ['18', 'common', '可燃ごみ'], ['18', 'common', '可燃ごみ'], ['19', 'common', '不燃ごみ'], ['19', 'common', '不燃ごみ'], ['20', 'common', '容器包装プラスチック類'], ['20', 'common', '容器包装プラスチック類'], ['21', 'common', '可燃ごみ'], ['21', 'common', '可燃ごみ'], ['22', 'common'], ['23', 'sat'], ['24', 'sun'], ['25', 'common', '可燃ごみ'], ['25', 'common', '可燃ごみ'], ['26', 'common'], ['27', 'common', '資源品', '容器包装プラスチック類'], ['27', 'common', '資源品', '容器包装プラスチック類'], ['27', 'common', '資源品', '容器包装プラスチック類'], ['28', 'common', '可燃ごみ'], ['28', 'common', '可燃ごみ'], ['29', 'common'], ['30', 'sat']]

cal[n][1]の値は、
sat /土曜
sun / 祝祭日
common / 平日
らしい。

fluentdでMySQLにデータを入れる

https://github.com/tagomoris/fluent-plugin-mysqlを使って、fluentd-3.xでmysql 5.7にログを入れる方法のメモ

同時にbigqueryに対してもログを入れているので、@type copyを使う。

<match xxx.yyyy.accesslog>
  @type copy

  # bigquery用
  <store>
    @type             bigquery
    auth_method       json_key
    json_key          PATH/TO/FILE
    project           GCP PROJECT
    dataset           ${tag[0]}
    table             ${tag[1]}_${tag[2]}_%Y%m%d
    auto_create_table true
    schema_path       /etc/td-agent/schema.json

    <buffer tag,time>
      @type file
      path        /var/log/td-agent/buffer/papillon_accesslog
      timekey 1d
      chunk_limit_size 1000000
      queue_limit_length 128
      flush_interval 1
      retry_max_times 17
      retry_wait 1s
    </buffer>
    <inject>
      time_key time
      time_format %Y-%m-%d %H:%M:%S
    </inject>
  </store>

  # MySQL用にTimeをISO8061からDATETIMEに変換する。
  <store>
    @type record_reformer
    output_tag mysql.${tag_suffix[0]}   # tag名に「mysql」を追加
    enable_ruby true     # ruby有効化
    auto_typecast true
    <record>
      time ${require 'time'; Time.parse(record["time"]).strftime("%Y/%m/%d %H:%M:%S")} # TimeをISO8061からDATETIME
    </record>
  </store>
</match>


<match mysql.xxx.yyyy.accesslog>
    @type mysql_bulk
    host 10.254.0.xx
    database TABLE
    username USER
    password PASSWORD
    column_names time,user_id,uri,referer,remote_ip
    key_names time,user_id,uri,referer,remote_ip
    table log
    transaction_isolation_level read_committed    # 2018/5から、デフォルト値がnulになったので、指定しないとトランザクション貼れない。
    flush_interval 1s
</match>

ハマったのは、

transaction_isolation_level read_committed  

の記述の部分。

ここ以外の記述で、td-agentはちゃんと動くが、

2018-06-01 20:30:30 +0900 [warn]: #0 failed to flush the buffer. retry_time=4 next_retry_seconds=2018-06-01 20:30:30 +0900 chunk="56d92e8467c4fab0440db16ee36f0d34" error_class=Mysql2::Error error="You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '' at line 1"

こんなエラーが出る。

mysql側でgeneral_logを有効にするも、

2018-06-01T20:30:46.022528+09:00           14 Connect   logger@fluentd01 on accesslog using TCP/IP
2018-06-01T20:30:46.023467+09:00           14 Query     SHOW COLUMNS FROM log
2018-06-01T20:30:46.024207+09:00           14 Quit

と、全然有用なログじゃないし。

結局、tcpdumpを取得して見たら、分離レベルを指定せずに 「SET SESSION TRANSACTION ISOLATION LEVEL」を投げている事が原因だった。

解決してよかったー