午夜国产狂喷潮在线观看|国产AⅤ精品一区二区久久|中文字幕AV中文字幕|国产看片高清在线

    Python 批量寫入 Elasticsearch 腳本
    來源:易賢網(wǎng) 閱讀:4847 次 日期:2015-05-04 14:38:07
    溫馨提示:易賢網(wǎng)小編為您整理了“Python 批量寫入 Elasticsearch 腳本”,方便廣大網(wǎng)友查閱!

    Elasticsearch 官方和社區(qū)提供了各種各樣的客戶端庫,在之前的博客中,我陸陸續(xù)續(xù)提到和演示過 Perl 的,Javascript 的,Ruby 的。上周寫了一版 Python 的,考慮到好像很難找到現(xiàn)成的示例,如何用 python 批量寫數(shù)據(jù)進 Elasticsearch,今天一并貼上來。

    #!/usr/bin/env pypy

    #coding:utf-8

    import re

    import sys

    import time

    import datetime

    import logging

    from elasticsearch import Elasticsearch

    from elasticsearch import helpers

    from elasticsearch import ConnectionTimeout

    es = Elasticsearch(['192.168.0.2', '192.168.0.3'], sniff_on_start=True, sniff_on_connection_fail=True, max_retries=3, retry_on_timeout=True)

    logging.basicConfig()

    logging.getLogger('elasticsearch').setLevel(logging.WARN)

    logging.getLogger('urllib3').setLevel(logging.WARN)

    def parse_www(logline):

    try:

    time_local, request, http_user_agent, staTus, remote_addr, http_referer, request_time, body_bytes_sent, http_x_forwarded_proto, http_x_forwarded_for, http_host, http_cookie, upstream_response_time = logline.split('`')

    try:

    upstream_response_time = float(upstream_response_time)

    except:

    upstream_response_time = None

    method, uri, verb = request.split(' ')

    arg = {}

    try:

    url_path, url_args = uri.split('?')

    for args in url_args.split('&'):

    k, v = args.split('=')

    arg[k] = v

    except:

    url_path = uri

    # Why %z do not implement?

    date = datetime.datetime.strptime(time_local, '[%d/%b/%Y:%H:%M:%S +0800]')

    ret = {

    "@timestamp": date.strftime('%FT%T+0800'),

    "host": "127.0.0.1",

    "method": method.lstrip('"'),

    "url_path": url_path,

    "url_args": arg,

    "verb": verb.rstrip('"'),

    "http_user_agent": http_user_agent,

    "status": int(staTus),

    "remote_addr": remote_addr.strip('[]'),

    "http_referer": http_referer,

    "request_time": float(request_time),

    "body_bytes_sent": int(body_bytes_sent),

    "http_x_forwarded_proto": http_x_forwarded_proto,

    "http_x_forwarded_for": http_x_forwarded_for,

    "http_host": http_host,

    "http_cookie": http_cookie,

    "upstream_response_time": upstream_response_time

    }

    return {"_index":"logstash-mweibo-www-"+date.strftime('%Y.%m.%d'), "_type":"nginx","_source":ret}

    except:

    return {"_index":"logstash-mweibo-www-"+datetime.datetime.now().strftime('%Y.%m.%d'), "_type":"nginx","_source":{"message":logline}}

    def get_log():

    start_time = time.time()

    log_buffer = []

    while True:

    try:

    line = sys.stdin.readline()

    except:

    break

    if not line:

    helpers.bulk(es, log_buffer)

    del log_buffer[0:len(log_buffer)]

    break

    if line:

    ret = parse_www(line.rstrip())

    log_buffer.append(ret)

    while ( len(log_buffer) > 2000 and len(log_buffer) % 2000 == 0 ):

    try:

    helpers.bulk(es, log_buffer)

    except ConnectionTimeout:

    print("try again")

    continue

    del log_buffer[0:len(log_buffer)]

    break

    else:

    if (time.time() - startime > timeout ):

    helpers.bulk(es, log_buffer)

    start_time = time.time()

    del log_buffer[0:len(log_buffer)]

    time.sleep(1)

    if __name__ == '__main__':

    get_log()

    和 Perl、Ruby 的客戶端不同,Python 的客戶端只支持兩種 transport 方式,urllib3 或者 thrift。也就是說,木有像事件驅(qū)動啊之類的辦法。

    測試一下,這個腳本如果不發(fā)送數(shù)據(jù),一秒處理日志條數(shù)在15k,發(fā)送數(shù)據(jù),一秒只有2k。確實比較讓人失望,于是決定換成 pypy 試試——我司不少日志處理腳本都是用 pypy 運行的。

    服務(wù)器上使用 pypy ,是通過 EPEL 安裝的,之前都只用核心模塊,這次需要安裝 elasticsearch 模塊。所以需要先給 pypy 加上 pip:

    wget

    pypy get-pip.py

    網(wǎng)上大多說之前還要下載一個叫 distribute_setup.py 的腳本來運行,實測不需要,而且這個腳本的下載鏈接也失效了。

    然后通過 pip 安裝 elasticsearch 包即可:

    /usr/lib64/pypy-2.0.2/bin/pip install elasticsearch

    測試,pypy 比 python 處理日志速度快一倍,寫 ES 速度快一半。不過 3300eps 依然很慢就是了。

    測試中碰到的其他問題

    可以看到腳本里已經(jīng)設(shè)置了多次重試和超時重連,不過依然會收到寫入超時和失敗的返回,原來 Elasticsearch 默認對每個 node 做 segment merge 的時候,有磁盤保護措施,速度上限限制在 20MB/s。這在壓測的時候就容易觸發(fā)。

    [2015-01-10 09:41:51,273][INFO ][index.engine.internal ] [node1][logstash-2015.01.10][2] now throttling indexing: numMergesInFlight=6,maxNumMerges=5

    修改配置重啟即可:

    indices.store.throttle.type:merge

    indices.store.throttle.max_bytes_per_sec:500mb

    關(guān)于這個問題,ES 也有討論:Should we lower the default merge IO throttle rate??;蛟S未來會有更靈活的策略。

    更多 ES 性能測試和優(yōu)化建議,參

    更多信息請查看IT技術(shù)專欄

    更多信息請查看技術(shù)文章
    易賢網(wǎng)手機網(wǎng)站地址:Python 批量寫入 Elasticsearch 腳本

    2025國考·省考課程試聽報名

    • 報班類型
    • 姓名
    • 手機號
    • 驗證碼
    關(guān)于我們 | 聯(lián)系我們 | 人才招聘 | 網(wǎng)站聲明 | 網(wǎng)站幫助 | 非正式的簡要咨詢 | 簡要咨詢須知 | 新媒體/短視頻平臺 | 手機站點 | 投訴建議
    工業(yè)和信息化部備案號:滇ICP備2023014141號-1 云南省教育廳備案號:云教ICP備0901021 滇公網(wǎng)安備53010202001879號 人力資源服務(wù)許可證:(云)人服證字(2023)第0102001523號
    聯(lián)系電話:0871-65099533/13759567129 獲取招聘考試信息及咨詢關(guān)注公眾號:hfpxwx
    咨詢QQ:1093837350(9:00—18:00)版權(quán)所有:易賢網(wǎng)