pythonでLINE BOTを作る

作成の記事自体は色々見つかるんだけど、LINE側のWebhockの値が異なっていたりしたので自分メモ
まずはリファレンスをちゃんと読もう!(自戒)

LINE Developers

https://developers.line.biz/ja/ から

BOTユーザーは別に作るので、ここへの登録は個人のLINEアカウントで問題ない

ログイン後、新規プロバイダーを作成する

次にチャンネルを作成する
今回は普通(?)のチャットボットを作るので、Massaging APIを選択する

必要な情報を入力してチャンネルを作成する

作成後に、アプリを作る上で必要な情報は
1. Channel Secret
2. アクセストークン (ロングターム)
の2つ

Webhook送信を有効にし、Webhookを受け入れるURLを入力する(httpsのみ)

botサーバー

nginx + flaskで作りました

OSの環境変数(.bash_profileなど)にLINE_CHANNEL_SECRETとLINE_CHANNEL_ACCESS_TOKENを設定します。
書いた後は反映を忘れずに!

export LINE_CHANNEL_SECRET=xxxxxxxxxx
export LINE_CHANNEL_ACCESS_TOKEN=xxxxxxxxxxxxxxxxxx

nginxのconfigにlocationを追加する

    location ~ /何かしら {
        proxy_pass http://localhost:8000;
    }

flask アプリケーションを作る

#!/usr/bin/env python3.7
import os,json
from flask import Flask, request, make_response
import base64
import hashlib
import hmac

from linebot import (
    LineBotApi, WebhookParser
)
from linebot.exceptions import (
    InvalidSignatureError
)
from linebot.models import (
    MessageEvent, TextMessage, TextSendMessage
)

channel_secret = os.getenv('LINE_CHANNEL_SECRET', None)
channel_access_token = os.getenv('LINE_CHANNEL_ACCESS_TOKEN', None)

if channel_secret is None:
    print('Specify LINE_CHANNEL_SECRET as environment variable.')
    sys.exit(1)
if channel_access_token is None:
    print('Specify LINE_CHANNEL_ACCESS_TOKEN as environment variable.')
    sys.exit(1)

line_bot_api = LineBotApi(channel_access_token)

# Flask
app = Flask(__name__)
@app.route('/line-bot/api',methods=['post'])

def application():
    environ = request.headers
    data = json.loads(request.data)

    # シグネチャ検証(とりあえずマスク)
#    signature = environ['X_LINE_SIGNATURE']
#    hash = hmac.new(channel_secret.encode('utf-8'),
#        data.encode('utf-8'), hashlib.sha256).digest()
#    signature_from_data = base64.b64encode(hash)
#
#    print(signature)
#    print(signature_from_data)
#
    # get request body as text
    content_length = int(environ['CONTENT_LENGTH'])

    # 画像かスタンプを送られた場合
    if 'text' not in data["events"][0]["message"]:
       body = '画像、スタンプは送らないで!'

    # 教えて or おしえてと言われたら自分で探せと返す
    elif 'おしえ' in data["events"][0]["message"]["text"] or '教え' in data["events"][0]["message"]["text"]:
       body = 'オッケーグーグル!\nhttps://www.google.com/'

    # 感謝の言葉を言われたら嬉しいよね
    elif 'ありがとう' in data["events"][0]["message"]["text"]:
       body = 'お役に立てて何よりです'

    # それ以外は、とりあえずオウム返し
    else:
       body = data["events"][0]["message"]["text"]

    for i in data["events"]:
        print(i)
        line_bot_api.reply_message(
            i["replyToken"],
            TextSendMessage(text=body)
        )

    return make_response('ok'), 200

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000)

作成後、flaskアプリケーションを起動する

スマホでLINEを起動し、LINEDeveloperのチャンネルに表示されているQRコードを読み込んで友達追加する
(確認していたら、娘からスマホを取り上げられて遊ばれた…)

python3.7のソースインストール

cd /usr/local/src/

wget https://www.python.org/ftp/python/3.7.1/Python-3.7.1.tgz

tar zxvf Python-3.7.1.tgz

cd /usr/local/src/Python-3.7.1

yum groupinstall 'Development Tools'

# ncurses-develは、bpythonを使わないなら不要
yum install openssl-devel libffi-devel ncurses-devel

./configure prefix=/usr/local/python3.7;make;make install

Gsuiteユーザーのメール容量をAPIから取得する

これはAdventure Advent Calendar 2018の8日目の記事です。

ユーザーとしてAPIを使って、Gmailのメールを方法はいくつか見つけましたが、管理者視点での操作をしている情報が少なかったので、備忘録として残します。

特定のユーザーがメールが一杯になってしまうと、送信側であるお客様や取引先に迷惑が掛かるので、Gsuiteの管理者の視点として
GsuiteユーザーのGmailの容量(標準だと30GB)の利用統計を取りたい。
というのがあると思います。

ですが、Thunderbirdなどのメーラーを使ってGmailを使っている人は、今の容量を知る術がありません。

そこで、日時でGsuiteの全ユーザーの情報を取得するスククリプトを書いて、日時で回して運用する事にしました。

作成したコードは以下になります。

※事前に、
 ・ Gsuite APIの有効化(https://support.google.com/a/answer/60757?hl=ja)
 ・ OAUTHを使うユーザーのGsuite内権限の割り当て(https://support.google.com/a/answer/142566)
・ OAUTH認証設定(https://console.cloud.google.com/apis/credentials/consent?)
を行なって下さい。

コード

#!/bin/env python3/7

import os
from datetime import datetime, date, timedelta
import httplib2
from apiclient import discovery
from oauth2client import client, tools
from oauth2client.file import Storage
from bigquery import get_client
import requests

in_date = datetime.today()
in_date = datetime.strftime(in_date - timedelta(hours=48), '%Y-%m-%d')

def main():
    try:
        import argparse
        parser = argparse.ArgumentParser(parents=[tools.argparser])
        flags = parser.parse_args()
    except ImportError:
        flags = None

    # 認証情報を格納するディレクトリ「.credentials」の設定。ディレクトリが無い場合は作成
    credential_dir = os.path.join(os.path.expanduser('~'), '.credentials')
    if not os.path.exists(credential_dir):
        os.makedirs(credential_dir)

    # 認証ファイルのパスを設定と読み込み
    credential_path = os.path.join(credential_dir, 'admin-directory_v1.json')
    store = Storage(credential_path)
    credentials = store.get()

    # 認証ファイルが無い場合は作成
    if not credentials or credentials.invalid:

        # 使用する機能の範囲を設定
        scopes = [
            'https://www.googleapis.com/auth/admin.reports.usage.readonly',
            'https://www.googleapis.com/auth/admin.reports.audit.readonly'
        ]

        # 認証キーの設定(~/.ssh/gsuite-admin.json)
        secret_key = os.path.join(os.path.expanduser('~'), '.ssh', 'gsuite-admin.json')

        # 認証キーから認証処理を行うクラスのインスタンスを生成
        flow = client.flow_from_clientsecrets(secret_key, scopes)

        # アプリケーションの名前
        flow.user_agent = 'Gmail Check'

        if flags:
                credentials = tools.run_flow(flow, store, flags)
        else:
            credentials = tools.run(flow, store)
        print('証明書を保存しました: ' + credential_path)

    # 認証を行う
    http = credentials.authorize(httplib2.Http())
    app_report_service = discovery.build('admin', 'reports_v1', http=http)

    # GsuiteのAPIを実行
    results = app_report_service.userUsageReport().get(userKey='all',date=in_date).execute()

    for i in extraction(results) :
        print(i)

#
# GSuiteのAPI結果から必要な情報を抜く
#
def extraction(results):
    recodes = []
    for i in results['usageReports']:
        recode = {}

        # レポート日
        recode["day"] = in_date

        # メールアドレス
        recode["email"] = i['entity']['userEmail']

        for l in i['parameters']:
            # 最終アクセス日時
            if 'gmail:last_access_time' in l["name"]:
                last_access = datetime.strptime(l["datetimeValue"], '%Y-%m-%dT%H:%M:%S.000Z')
                last_access = datetime.strftime(last_access - timedelta(hours=9), '%Y-%m-%d %H:%M:%S')
                recode["last_access_time"] = last_access

            # 氏名
            elif "accounts:admin_set_name" in l["name"]:
                recode["name"] = l['stringValue']

            # アカウントに割り当てられた容量(Gmailだけの値は無さそう)
            elif "accounts:total_quota_in_mb" in l["name"]:
                recode["total_mb"] = l['intValue']

            # gmailの使用量
            elif "accounts:gmail_used_quota_in_mb" in l["name"]:
                recode["usage_mb"] = l['intValue']

            # アカウントの使用量(パーセンテージ)
            elif "accounts:used_quota_in_percentage" in l["name"]:
                recode["usage_percent"] = l['intValue']

        recodes.append(recode)

    return recodes

結果

{'day': '2018-12-05', 'email': 'メールアドレス1', 'name': '名前1', 'usage_mb': '13793', 'total_mb': '30720', 'usage_percent': '44', 'last_access_time': '2018-12-05 22:59:14'}
{'day': '2018-12-05', 'email': 'メールアドレス2', 'name': '名前2', 'usage_mb': '1550', 'total_mb': '30720', 'usage_percent': '5', 'last_access_time': '2018-12-05 22:59:12'}
{'day': '2018-12-05', 'email': 'メールアドレス3', 'name': '名前3', 'usage_mb': '7107', 'total_mb': '30720', 'usage_percent': '23', 'last_access_time': '2018-12-05 22:59:13'}
{'day': '2018-12-05', 'email': 'メールアドレス4', 'name': '名前4', 'usage_mb': '6211', 'total_mb': '30720', 'usage_percent': '20', 'last_access_time': '2018-12-05 22:59:14'}
{'day': '2018-12-05', 'email': 'メールアドレス5', 'name': '名前5', 'usage_mb': '30202', 'total_mb': '30720', 'usage_percent': '98', 'last_access_time': '2018-12-05 22:59:45'}

注意

in_date = datetime.today()
in_date = datetime.strftime(in_date - timedelta(hours=48), '%Y-%m-%d')

取得対象日時の指定です。
Gsuiteのレポートは、リアルタイム性はありません。
12/8に前日分の情報を取得しようとすると、スクリプトを回すと以下のように怒られます。

googleapiclient.errors.HttpError: <HttpError 400 when requesting https://www.googleapis.com/admin/reports/v1/usage/users/all/dates/2018-12-07?alt=json returned "Data for dates later than 2018-12-06 is not yet available. Please check back later">

メールだけの正確なチェックではなありませんが、usage_percentが高いユーザーに連絡を行い、事前にデータの整理を行なってもらうように促せます。

Mac+python3.7でSSL: CERTIFICATE_VERIFY_FAILEDが発生する場合の対応

BeautifulSoup4を使った時に発生したエラー

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/urllib/request.py", line 1317, in do_open
    encode_chunked=req.has_header('Transfer-encoding'))
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/http/client.py", line 1229, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/http/client.py", line 1275, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/http/client.py", line 1224, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/http/client.py", line 1016, in _send_output
    self.send(msg)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/http/client.py", line 956, in send
    self.connect()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/http/client.py", line 1392, in connect
    server_hostname=server_hostname)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/ssl.py", line 412, in wrap_socket
    session=session
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/ssl.py", line 853, in _create
    self.do_handshake()
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/ssl.py", line 1117, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1051)

During handling of the above exception, another exception occurred:

解決策:

$ /Applications/Python\ 3.7/Install\ Certificates.command

参考:
https://qiita.com/orangain/items/0a641d980019fd7e0c52

grafanaでワールドマップを利用する


grafanaでworld map プラグインを使ってみたので備忘録

マッピングデータは、MaxMaindさんが提供してくれているものを使います。
https://dev.maxmind.com/ja/geolite2/

ElasticSearchを使う方法が一般的みたいですが、ざっくり国別データを集計したいだけなので、MySQLで行きます。
また、アクセスログもInfluxDBではなくMySQLです。

【環境】
python3.4
Mysql 5.7
maxminddb 1.4.1

まずはモジュールインストールと、マッピングデータの取得

pip3 install maxminddb
wget http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.mmdb.gz -P /usr/local/src

MySQLにテーブルを作る

CREATE TABLE `geo2` (
  `ip_address` varchar(15) DEFAULT NULL,
  `country_name` varchar(50) DEFAULT NULL,
  UNIQUE KEY `ip_address` (`ip_address`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

pythonスクリプトを作る。
vim /usr/local/bin/insert_get2.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import MySQLdb
import os
import maxminddb
import datetime

DBFILE = '/usr/local/src/GeoLite2-City.mmdb'
geo2db = maxminddb.open_database(DBFILE)

# 直近1時間前のログからデータ集計を行う
todaydetail = datetime.datetime.today()
starttime = (todaydetail-datetime.timedelta(hours=1)).strftime("%Y-%m-%d %H:00")
endtime   = todaydetail.strftime("%Y-%m-%d %H:00")

def geo2(ips):
  val_list = []
  for ip in ips:
    try:
      location = geo2db.get(ip)
      # 国
      country = location["registered_country"]["iso_code"]

      # 経度と緯度も取得出来るけど今回は使わない
      # 緯度
      # latitude = location['location']['latitude']
      # 経度
      # longitude = location['location']['longitude']

      val_list.append([ip,country])
    except:
      pass

  return val_list

def db_select():
  con = MySQLdb.connect(
    host='localhost',
    db='log',
    user='DBユーザー',
    passwd='パスワード',
    charset='utf8'
  )

  cur = con.cursor()
  sql = "select distinct remote_ip from log where time between '{start}' and '{end}' and remote_ip != '::1'".format(start=starttime,end=endtime)

  result = cur.execute(sql)
  ips = []
  for ip in cur:
    ips.append(ip[0])

  cur.close()
  con.close()
  return ips


def db_bulk_insert(rows):
  con = MySQLdb.connect(
    host='localhost',
    db='log',
    user='DBユーザー',
    passwd='パスワード',
    charset='utf8'
  )

  cur = con.cursor()
  sql = "INSERT IGNORE geo2(ip_address,country_name) VALUES(%s, %s)"
  cur.executemany(sql,rows)
  con.commit()
  cur.close()
  con.close()

ips  = db_select()
rows = geo2(ips)
db_bulk_insert(rows)

あとは、/usr/local/bin/insert_get2.pyを1時間毎に実行する。

GCEインスタンスでpostgresqlをフェイルオーバーさせる

GCEでは、単一インスタンスに同一ネットワークのIPアドレスを複数持たせる事ができない。
複数のネットワーク インターフェースの概要と例

つまり、Virtual IPを同一セグメントに持たせた構成は出来ない。

クラスタを組みたかったけど、今回の要件としては

1. Active/Standbyの構成にする
2. Activeのpostgresが止まった場合、Standbyを昇格する
3. フェイルバックは行わない。
4. フェイルオーバー後、App側で検知。
   必要な設定変更後、動的にサービスの再起動を行う。
5. 1分以内にサービスが復旧される。

とシンプルな構成にする。

PostgreSQL

要件1、2には、pg_keeperを使う。
クラスタでは無いのでスプリットブレインの検知などは出来ない。
その為、アプリケーション側に今のActive機の情報を教える必要がある。

スプリットブレイン対策用のDB、Table作成

postgres=#  create database pg_state;
postgres=#  create table failover_log (unixtime int, host varchar(10));

インストール(Actice/Standby共に)

cd /usr/local/src

git clone https://github.com/MasahikoSawada/pg_keeper.git

export PATH=/usr/pgsql-9.6/bin/:$PATH

make USE_PGXS=1

make USE_PGXS=1 install

postgresql.conf書き換え(Actice/Standby共に)

vim postgresql.conf

shared_preload_libraries = 'pg_keeper'
pg_keeper.my_conninfo = 'host=10.0.0.10 port=5432 dbname=postgres'
pg_keeper.partner_conninfo = 'host=10.0.0.11 port=5432 dbname=postgres'
pg_keeper.keepalive_time = 2
pg_keeper.keepalive_count = 3
pg_keeper.after_command = 'sleep 1 ; psql -d pg_state -c "insert into failover_log values(`date +%s`, \'`hostname`\');" -x'

Activeが止まった場合、pg_keeper.keepalive_time秒 × pg_keeper.keepalive_count回 チェックを行い、全てNGの場合にフェイルオーバーを実行し、最後にpg_keeper.after_commandの内容が実行される。
今回は、フェイルオーバー後に[unixtimestamp, hostname]を pg_state.failover_logに入れている。

app側

これはアプリケーションのよるので参考まで。
monitor_master_db.pyというモニタリングスクリプトを作成し、root権限で動かす事にした。
動きとしては、Active/Standby両機のDBのpg_state.failover_logをチェックし、タイムスタンプが若い方をDB接続先として、設定ファイル(yaml)を書き換えデーモンの再起動を行う。

#!/bin/env python3

import os,sys
import yaml
import psycopg2
import codecs
import subprocess

yaml_file = '/PATH/TO/env.yaml'
dbs = ['postgresql://postgres@db01:5432/pg_state'
         ,'postgresql://postgres@db02:5432/pg_state']

def get_item():
    arr = []
    for db in dbs :
        try:
            dbcon = psycopg2.connect(db)
            cur = dbcon.cursor()
            cur.execute('select * from failover_log order by unixtime desc limit 1')
            result = cur.fetchone()
            cur.close()
            dbcon.close()
            arr.append(result)
        except :
            pass
    if len(dbs) == len(arr):    # Active/Standby共にデータ取得成功
        if arr[0][0] > arr[1][0]:
            return arr[0][1]
        else :
            return arr[1][1]

    else :                             # 片系が停止している
        return arr[0][1]


def overwrite(db_name):
    with codecs.open(yaml_file, 'r', 'utf-8') as read :
        env_dict = yaml.load(read)

        if env_dict['db_master'][0]['address'] != '{}:5432'.format(db_name) or env_dict['db_slave'][0]['address'] != '{}:5432'.format(db_name):
            env_dict['db_master'][0]['address'] = '{}:5432'.format(db_name)
            env_dict['db_slave'][0]['address'] = '{}:5432'.format(db_name)

            with codecs.open(yaml_file, 'w', 'utf-8') as write :
                yaml.dump(env_dict, write, encoding='utf8', allow_unicode=True, default_flow_style=False)

            try:
                subprocess.check_call(["systemctl", "restart", "デーモン"])
            except :
                pass

作成したmonitor_master_db.pyをcronで動かす。
cronは普通に書くと1分が最小の実行単位だが、以下のように書くと5秒単位でスクリプトを実行してくれる。

# 5秒間隔
* * * * * for i in `seq 1 12`;do sleep 5; python3 /usr/local/bin/monitor_master_db.py; done

# 10秒間隔の場合
* * * * * for i in `seq 1 6`;do sleep 10; python3 /usr/local/bin/monitor_master_db.py; done

この状態で、Active側のDBを落として、フェイルオーバーされApp側の接続先も変更される事を確認する。
Slave側が昇格前にfailover_logへのinsertが実行される場合、pg_keeper.after_commandのsleepを大きくする。

pg_keeper.after_command = 'sleep 5 ; psql -d pg_state -c "insert into failover_log values(`date +%s`, \'`hostname`\');" -x'

「さんあ~る」からゴミ捨てカレンダーをスクレイピングする

4月に千葉県柏市のゴミ捨てアプリ「さんあ〜る」が、Web版としてリリースされました。

ごみ分別アプリ「さんあ~る」がインターネットでも利用できます!

さんあ〜るのごみカレンダーをiframeとしてWPに引っ張ってこようと思ったけど拡縮の問題があり挫折。
あと、画像がいらないな、と思ったのでpython3でスクレイピングしてみた。

# coding: UTF-8
import datetime
import urllib.request
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
from bs4 import BeautifulSoup

today = datetime.date.today()
year = today.strftime("%Y")
month = today.strftime("%m")

# アクセスするURL
# XXXXXは地区番号らしい
url = 'https://manage.delight-system.com/threeR/web/calendar?jichitaiId=kashiwashi&areaId=22125&year={year}&month={month}'.format(year=year, month=month)

html = urllib.request.urlopen(url)
soup = BeautifulSoup(html, "html.parser")

table = soup.find_all("table")
cal = []
for tag in table :
    try:
        #import pdb; pdb.set_trace()
        days = tag.find_all("td")  # カレンダーのセル数(7日×週)
        for day in days:
            span = day.find_all('span')
            tmp = []
            if len(span) > 0  :
                for num in range(len(span)):
                    try:
                        if span[num].get("class")[0] in ('common','sat','sun'): 
                            tmp.append(span[0].string)           # 日
                            tmp.append(span[0].get('class')[0])  # 曜

                        elif span[num].get("class")[0] in 'trash_kind_name': # ゴミ種類
                            tmp.append(span[num].string)
                        cal.append(tmp)
                    except:
                        pass
            else : # 空白セル
                cal.append([''])
    except:
        pass

print(cal)

後で、オリジナルのカレンダーの情報に充てがう予定なのでlistに入れる。

結果

[[''], [''], [''], [''], [''], ['1', 'common'], ['2', 'sat'], ['3', 'sun'], ['4', 'common', '可燃ごみ'], ['4', 'common', '可燃ごみ'], ['5', 'common', '不燃ごみ'], ['5', 'common', '不燃ごみ'], ['6', 'common', '容器包装プラスチック類'], ['6', 'common', '容器包装プラスチック類'], ['7', 'common', '可燃ごみ'], ['7', 'common', '可燃ごみ'], ['8', 'common'], ['9', 'sat'], ['10', 'sun'], ['11', 'common', '可燃ごみ'], ['11', 'common', '可燃ごみ'], ['12', 'common'], ['13', 'common', '資源品', '容器包装プラスチック類'], ['13', 'common', '資源品', '容器包装プラスチック類'], ['13', 'common', '資源品', '容器包装プラスチック類'], ['14', 'common', '可燃ごみ'], ['14', 'common', '可燃ごみ'], ['15', 'common'], ['16', 'sat'], ['17', 'sun'], ['18', 'common', '可燃ごみ'], ['18', 'common', '可燃ごみ'], ['19', 'common', '不燃ごみ'], ['19', 'common', '不燃ごみ'], ['20', 'common', '容器包装プラスチック類'], ['20', 'common', '容器包装プラスチック類'], ['21', 'common', '可燃ごみ'], ['21', 'common', '可燃ごみ'], ['22', 'common'], ['23', 'sat'], ['24', 'sun'], ['25', 'common', '可燃ごみ'], ['25', 'common', '可燃ごみ'], ['26', 'common'], ['27', 'common', '資源品', '容器包装プラスチック類'], ['27', 'common', '資源品', '容器包装プラスチック類'], ['27', 'common', '資源品', '容器包装プラスチック類'], ['28', 'common', '可燃ごみ'], ['28', 'common', '可燃ごみ'], ['29', 'common'], ['30', 'sat']]

cal[n][1]の値は、
sat /土曜
sun / 祝祭日
common / 平日
らしい。

randomモジュールを使う

pythonのramdomモジュールを使って、20人を超える飲み会の座席を固めてみる。

ramdomモジュールは、loop内で使っても、n個 – 1とかには出来ないらしく、10枚のカードから1枚選んで、また10枚の中に戻す。と言った動きになる。

member.yml

※氏名は平成21年生まれの名前ランキングから抜粋

[code]
member:
– "佐藤 優衣"
– "田中 美羽"
– "佐藤 花音"
– "鈴木 大翔"
– "吉田 悠人"
– "田中 優衣"
– "高橋 美結"
– "佐藤 愛莉"
– "鈴木 杏奈"
– "吉田 百花"
– "佐藤 翔太"
– "山口 結愛"
– "田中 大翔"
– "伊藤 悠真"
– "加藤 颯真"
– "佐藤 優成"
– "森 悠真"
– "中村 優希"
– "田中 悠翔"
– "鈴木 悠真"

table:
– A-1
– A-2
– A-3
– A-4
– A-5
– A-6
– B-1
– B-2
– B-3
– B-4
– B-5
– B-6
– C-1
– C-2
– C-3
– C-4
– C-5
– C-6
– C-7
– C-8
[/code]

lottery.py
[code]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import yaml
import random

base = os.path.dirname(os.path.abspath(__file__))
input = (os.path.join(base,’member.yml’))

f = open(imput, ‘r’)
result = yaml.load(f)
f.close()

members= (random.sample(source[‘member’],len(source[‘member’])))
tables = (source[‘table’])

for (t,m) in zip(tables,members):
print(t + ‘\t’ + m)

[/code]

[code]
(random.sample(source[‘member’],len(source[‘member’])))
[/code]

がramdomモジュールを使っている箇所で、
random.sample(配列,何個取るか?)と使う。

全メンバーを1度のみ抽出するので、len(source[‘member’])としている。

MySQL+EmbulkでBQにデータを送ってみる

MySQLのデータをBigqueryに転送する。

固定のテーブルを送信する方法は
https://qiita.com/tashiro_gaku/items/f7fa0f1a99c759d947a7
とかに書いているけどけど、やりたかったのは

1) ログ保存テーブルは、updateカラムを見て差分でBigqueryに送信する。
2) 一部のテーブルは、全レコードBigqueryに送信する。
3) 上記、1、2の対象テーブルはconfファイルを読んで動的に動かしたい
(出来れば運用側で勝手に対象テーブルの追加、削除をやって欲しい。。。)

Github

MakeJSON.py         # jsonとyamlを生成
Execue_embulk.py    # Embulk転送の実行
conf/embulk_json/   # 対象テーブルを元にBQ転送時の定義ファイルが作成される
     embulk_yaml/   # 対象テーブルを元にEmbulk実行時の定義ファイルが作成される
     keys/          # BQ接続時のjson_keyを置く
     conf.yaml      # MySQLのログイン情報、対象テーブルの情報を定義
     embulk_tmp     # Embulkのyaml生成で使うテンプレート

使い方
MakeJSON.pyの実行はいつでもいいがExecue_embulk.pyは前日分も送信するので、日付が変わってから実行する。

0 23 * * * python3 MakeJSON.py
30 0 * * * python3 Execue_embulk.py