strftimeの%aで曜日を出力するのはやめた

localeを使ってみたりいろいろと試してみたけれど、無理にstrftimeの%aで曜日を漢字で出力するのは今ひとつだった。

ディストリビューションによって挙動が変わったり、OSのメジャーアップデートで文字化けしてみたり。それぞれに事情はいろいろあって、単にバグだったり、必要なパッケージが変わってたりと、まあ仕方ないかなあと思うことも少なくない。

ああでもないこうでもないと、忘れてしまっても問題ない方法を考えてみた。これでいいのだ。

#!/usr/bin/env python
# coding: utf-8
import datetime
WEEKDAY = ('月','火','水','木','金','土','日')
print(WEEKDAY[datetime.datetime.today().weekday()])

続きを読む strftimeの%aで曜日を出力するのはやめた

Simple HTTP CheckerにTCPタイムアウト値を追加

Simple HTTP Checker – シンプルなHTTPサーバの監視ツールで公開しているツールですが、手元の環境では随分前にTCPタイムアウトの設定ができるように変更して運用しています。

ちょっと問い合わせをいただいて思い出したのでGist:343248に反映させました。なおタイムアウト値(設定変数名=tcp_timeout)は1未満にはできません。1以上の整数を指定してください。

[test_HEAD]
url: http://example.com/

[test_GET]
url: http://example.com/
method: GET

[test_POST]
url: http://example.com/
method: POST
data: Hello World

[test_notfound]
url: http://example.com/notfound.html
notify_interval: 10


[DEFAULT]
#************************************************
;  DEFAULT values
#************************************************

#Target URL.
#url=http://example.com/

#HTTP method
method: HEAD

#POST data
#data: Hello World

#When HTTP error occuered, repeatedly notified with interval seconds.
notify_interval: 600

#after checking, wait bellow seconds.
wait_seconds: 0

#tcp connection timeout seconds.
tcp_timeout = 10
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""#8 Simple HTTP Checker
  Usage:
       htpchk (htpchk.conf is needed at same directory.)
       htpchk URL
       htpchk config-file
"""
__NAME__ = '#8 Simple HTTP Checker'
__VERSION__ = '1.2'
__ABOUT__ = 'http://jinim.jp/archives/2136'
__USER_AGENT__ = 'Mozilla/5.0 (compatible; %s/%s; +%s)' % (__NAME__, __VERSION__, __ABOUT__)
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

import os
import sys
import time
import socket
import urllib2
from urllib2 import URLError, HTTPError
from ssl import SSLError
from socket import timeout as SocketTimeout
import urlparse
import tempfile
from ConfigParser import ConfigParser

#url to filename with sha hash.
try:
    import hashlib
    def _urlhash_filename(url): return hashlib.sha224(url).hexdigest()
except:
    import sha
    def _urlhash_filename(url): return sha.new(url).hexdigest()

opener = urllib2.build_opener()
opener.addheaders = [('User-agent', __USER_AGENT__)]
urllib2.install_opener(opener)


class ParamError(Exception): pass


class HeadRequest(urllib2.Request):
    def get_method(self): return "HEAD"


class Site(object):
    def __init__(self, name, url, notify_interval=60*10, method="HEAD", data=None, wait_seconds=0, tcp_timeout=3):
        """When medhod is POST, data is requred."""
        self.name = name
        self.url = url
        self.method = method
        self.data = data
        self.notify_interval = notify_interval
        self.wait_seconds = wait_seconds
    #minimum timeout is 1sec.
        if tcp_timeout<1: tcp_timeout = 1        self.tcp_timeout = tcp_timeout        self.notified = os.path.join(tempfile.gettempdir(), "htpcheck-%s" % _urlhash_filename(self.url))

 def check(self):
 socket.setdefaulttimeout(self.tcp_timeout)
 if self.method=="HEAD":            u = urllib2.urlopen(HeadRequest(self.url))        elif> (last_notified + self.notify_interval):
            sys.stderr.write(msg)
            file(self.notified, 'a').write("%s\tnotified\n" % msg[:-1])
        else:
             sys.stdout.write(log_message(self, "[SUPPRESSED] %s" % err))


def log_message(site, msg=None):
    """if msg is not None, message means error.
    """
    if msg:
        format = '%s\tNG\t%s\t%s\t%s\t%s\n'
        return format % (time.strftime(DATETIME_FORMAT), site.name, site.method, site.url, msg)
    else:
        format = '%s\tOK\t%s\t%s\t%s\n'
        return format % (time.strftime(DATETIME_FORMAT), site.name, site.method, site.url)


def config_parse(conf):
    parser = ConfigParser()
    parser.readfp(open(conf))
    sites = list()
    for section in parser.sections():
        url = parser.get(section, 'url')
        method = parser.get(section, 'method')
        if method=="POST":
            data = parser.get(section, 'data')
        else:
            data = None
        notify_interval = parser.getfloat(section, 'notify_interval')
        wait_seconds = parser.getfloat(section, 'wait_seconds')
        tcp_timeout = parser.getint(section, 'tcp_timeout')
        sites.append(Site(section, url, notify_interval, method, data, wait_seconds, tcp_timeout))
    return sites


def main(sites):
    for site in sites:
        try:
            site.check()
            sys.stdout.write(log_message(site))
            if os.path.exists(site.notified):
                site.recovered()
        except (SocketTimeout, SSLError, URLError, HTTPError), err:
            site.notify(err)
        if site.wait_seconds>0: time.sleep(site.wait_seconds)


if __name__=='__main__':
    if len(sys.argv)>1: config = sys.argv[1]
    else: config = os.path.splitext(__file__)[0]+'.conf'

    if not os.path.exists(config):
        if config.startswith('http://') or config.startswith('https://'):
            name = urlparse.urlparse(config)[1].split(':')[0]
            sites = (Site(name, config), )
        else:
            raise ParamError, "config file %s not found." % config
    else:
        sites = config_parse(config)

    main(sites)

Simple HTTP Checker – シンプルなHTTPサーバの監視ツール

WEBサイトの死活監視のサービスはいろいろあるけれど、どれも監視できるURL数に制限があったり、制限を解除してもらうには費用がかかるわけなのだけれど求める以上の機能があってちょっと高くついたり。なかなかピッタリこない。

仕方がない、さくらのレンタルサーバが1つあるのでそこで監視するようにしてみよう。社内のサーバ監視をしているNagiosは便利なんだけれど、さくらの500円/月のレンタルサーバに入れるのはちょっとアレだし、必要最小限のスクリプトをPythonで書くことにする。

求めた要件としては、

  • HTTPで接続できなかったらメールでお知らせが届く。
  • 一回メールを送ったら、しばらくはメールしないで欲しい。
  • HEADとGETとPOSTに対応。
  • HTTPステータスコードだけをチェックする(レスポンスに含まれるコンテンツのチェックは不要)。
  • 設定ファイルで複数のURLをまとめてチェックして欲しい。
  • (さくらなので)常駐するデーモンではなくて、単独のコマンドで実行できる(cronで繰り返し実行)。
  • お知らせメールはcrontabのMAILTOで送るからstderrに出力してくれればいい。
  • OKだったときのログはstdoutに出力してくれればいい。
  • スクリプトは1ファイルで完結させる。
  • Python2.4以降、標準ライブラリだけで動く。


使い方は、
1. python htpchk.py URL
とURLを引数に指定する方法。これだと細かい設定はできませんが、HEADでチェック、お知らせメールは多くて10分に1回、となります。

2. python htpchk.py htpchk.conf
と引数に設定ファイルを指定する(あるいは引数指定を省略して、htpchk.pyと同じディレクトリにhtpchk.confという設定ファイルを置いておく)と、複数のURLを一括してチェックできるようになります。
設定ファイルは、Windowsのiniファイルに良く似た書き方です。DEFAULTセクションは全セクションのデフォルト値として使われます。あとはチェックするURL毎にセクションを作ってurlパラメータを書いておくだけで1と同様にチェックはできます。セクション毎にDEFAULTセクションの値を上書きできるので、細かい変更がしたい場合はセクションに値を書きます。詳しい設定ファイルの書き方は後の方のサンプルをご覧ください。

cronにこのスクリプトを実行するように登録しておく(さくらのレンタルサーバではこんな感じ)。
/home/example/bin/にhtpchk.pyとhtpchk.confを置いたとするとこんな感じ。

MAILTO=alert@example.com, mobile@example.com
# NAME: htpchck
*/5     *       *       *       *       /usr/local/bin/python /home/example/bin/htpchk.py >/dev/null

これで、5分に一回htpchk.confに書いてあるサイトをチェックして、HTTPステータスが200じゃなければ、標準エラーにメッセージが出力されて、そのメッセージはMAILTOに書いたメールアドレスに送られます。標準出力は/dev/nullに向けてあるので捨てられます。

手元では47のURLをチェックする設定ファイルを書いて運用中。概ね良好なんだけれど、まだチェックするURLを増やしたいのでもう少しチェックにかかる時間を短縮したい。HTTPのリクエストを送る部分をスレッドにして同時実行すれば速くなるのはわかっているので、困ったらスレッド化するかも。

以下はスクリプト本体(htpchk.py)と設定ファイル(htpchk.conf)のサンプルです。最新版はGist:343248に置いてあります。

htpchk.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""#8 Simple HTTP Checker
  Usage:
       htpchk (htpchk.conf is needed at same directory.)
       htpchk URL
       htpchk config-file
"""
__NAME__ = '#8 Simple HTTP Checker'
__VERSION__ = '1.0'
__ABOUT__ = 'http://jinim.jp/archives/2136'
__USER_AGENT__ = 'Mozilla/5.0 (compatible; %s/%s; +%s)' % (__NAME__, __VERSION__, __ABOUT__)
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

import os
import sys
import time
import urllib2
from urllib2 import HTTPError
import urlparse
import tempfile
from ConfigParser import ConfigParser

#url to filename with sha hash.
try:
    import hashlib
    def _urlhash_filename(url): return hashlib.sha224(url).hexdigest()
except:
    import sha
    def _urlhash_filename(url): return sha.new(url).hexdigest()

opener = urllib2.build_opener()
opener.addheaders = [('User-agent', __USER_AGENT__)]
urllib2.install_opener(opener)


class ParamError(Exception): pass


class HeadRequest(urllib2.Request):
    def get_method(self): return "HEAD"


class Site(object):
    def __init__(self, name, url, notify_interval=60*10, method="HEAD", data=None, wait_seconds=0):
        """When medhod is POST, data is requred."""
        self.name = name
        self.url = url
        self.method = method
        self.data = data
        self.notify_interval = notify_interval
        self.wait_seconds = wait_seconds
        self.notified = os.path.join(tempfile.gettempdir(), "htpcheck-%s" % _urlhash_filename(self.url))

    def check(self):
        if self.method=="HEAD":
            u = urllib2.urlopen(HeadRequest(self.url))
        elif self.method=="POST":
            u = urllib2.urlopen(self.url, data=self.data)
        elif self.method=="GET":
            u = urllib2.urlopen(self.url)
        else:
            raise ParamError, "method: %s not supported. Supported methods are (HEAD,GET,POST)." % self.method
        return u.info()

    def recovered(self):
        os.rename(self.notified, os.path.join(os.path.dirname(self.notified), os.path.basename(self.notified) + time.strftime("-recovered.%Y%m%d%H%M%S")))

    def notify(self, err):
        msg = log_message(self, err)
        last_notified = 0
        if os.path.exists(self.notified):
            try:
                last_notified = time.mktime(time.strptime(file(self.notified).readlines()[-1].split("\t")[0], DATETIME_FORMAT))
            except ValueError:
                last_notified = 0
        if time.time() > (last_notified + self.notify_interval):
            sys.stderr.write(msg)
            file(self.notified, 'a').write("%s\tnotified\n" % msg[:-1])
        else:
             sys.stdout.write(log_message(self, "[SUPPRESSED] %s" % err))


def log_message(site, msg=None):
    """if msg is not None, message means error.
    """
    if msg:
        format = '%s\tNG\t%s\t%s\t%s\t%s\n'
        return format % (time.strftime(DATETIME_FORMAT), site.name, site.method, site.url, msg)
    else:
        format = '%s\tOK\t%s\t%s\t%s\n'
        return format % (time.strftime(DATETIME_FORMAT), site.name, site.method, site.url)


def config_parse(conf):
    parser = ConfigParser()
    parser.readfp(open(conf))
    sites = list()
    for section in parser.sections():
        url = parser.get(section, 'url')
        method = parser.get(section, 'method')
        if method=="POST":
            data = parser.get(section, 'data')
        else:
            data = None
        notify_interval = parser.getfloat(section, 'notify_interval')
        wait_seconds = parser.getfloat(section, 'wait_seconds')
        sites.append(Site(section, url, notify_interval, method, data, wait_seconds))
    return sites


def main(sites):
    for site in sites:
        try:
            site.check()
            sys.stdout.write(log_message(site))
            if os.path.exists(site.notified):
                site.recovered()
        except HTTPError, err:
            site.notify(err)
        if site.wait_seconds>0: time.sleep(site.wait_seconds)


if __name__=='__main__':
    if len(sys.argv)>1: config = sys.argv[1]
    else: config = os.path.splitext(__file__)[0]+'.conf'

    if not os.path.exists(config):
        if config.startswith('http://') or config.startswith('https://'):
            name = urlparse.urlparse(config)[1].split(':')[0]
            sites = (Site(name, config), )
        else:
            raise ParamError, "config file %s not found." % config
    else:
        sites = config_parse(config)

    main(sites)

htpchk.conf

[test_HEAD]
url: http://example.com/

[test_GET]
url: http://example.com/
method: GET

[test_POST]
url: http://example.com/
method: POST
data: Hello World

[test_notfound]
url: http://example.com/notfound.html
notify_interval: 10


[DEFAULT]
#************************************************
;  DEFAULT values
#************************************************

#Target URL.
#url=http://example.com/

#HTTP method
method: HEAD

#POST data
#data: Hello World

#When HTTP error occuered, repeatedly notified with interval seconds.
notify_interval: 600

#after checking, wait bellow seconds.
wait_seconds: 0

Related posts:

  1. urllib2でプロキシを参照しないようにする
  2. urllib2.quote()ってアリなんだ
  3. Flickrから自分のアップロードした写真を全部ダウンロードする