国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

日志平臺(tái)(網(wǎng)關(guān)層) - 基于Openresty+ELKF+Kafka

xumenger / 2987人閱讀

摘要:現(xiàn)在用方式調(diào)用接口,中使用方式輸入內(nèi)容日志平臺(tái)網(wǎng)關(guān)層基于。日志平臺(tái)網(wǎng)關(guān)層基于到此為止,提取經(jīng)過(guò)網(wǎng)關(guān)的接口信息,并將其寫(xiě)入日志文件就完成了,所有的接口日志都寫(xiě)入了文件中。

背景介紹 1、問(wèn)題現(xiàn)狀與嘗試

沒(méi)有做日志記錄的線上系統(tǒng),絕對(duì)是給系統(tǒng)運(yùn)維人員留下的坑。尤其是前后端分離的項(xiàng)目,后端的接口日志可以解決對(duì)接、測(cè)試和運(yùn)維時(shí)的很多問(wèn)題。之前項(xiàng)目上發(fā)布的接口都是通過(guò)Oracle Service Bus(OSB)來(lái)做統(tǒng)一編排,在編排時(shí)加上日志記錄,并將接口日志存儲(chǔ)到數(shù)據(jù)庫(kù)中。最后基于接口日志數(shù)據(jù)開(kāi)發(fā)日志平臺(tái),來(lái)統(tǒng)一的接口日志分析。
但我們總不能為了記錄日志而使用OSB,這樣很不自由。今年我們有很多后臺(tái)接口使用Spring來(lái)開(kāi)發(fā),后臺(tái)程序的部署環(huán)境也不局限于Oracle中間件的環(huán)境。當(dāng)某些場(chǎng)景時(shí),脫離了OSB,我們?cè)撊绾斡涗浗涌谌罩荆@是本文要解決的問(wèn)題。

在我寫(xiě)的Spring系列的文章中,有嘗試過(guò)使用Spring的AOP來(lái)記錄日志。在每個(gè)項(xiàng)目的代碼中,定義一個(gè)記錄日志的切面,該切面會(huì)對(duì)該項(xiàng)目下的所有接口做日志記錄。
對(duì)于一個(gè)周期很長(zhǎng)、規(guī)模很大的一個(gè)獨(dú)立項(xiàng)目來(lái)說(shuō),這個(gè)方案是可行的。因?yàn)轫?xiàng)目周期很長(zhǎng),花個(gè)兩天做日志記錄的AOP開(kāi)發(fā)沒(méi)啥問(wèn)題,而且這個(gè)日志更契合該系統(tǒng)的業(yè)務(wù)特征。
但我們團(tuán)隊(duì)所面對(duì)的開(kāi)發(fā),基本上都是數(shù)量多、周期短的一些小項(xiàng)目。一個(gè)項(xiàng)目的開(kāi)發(fā)周期可能只有十天,就算每個(gè)項(xiàng)目在日志記錄上只用一天的工作量,所占的比重也有十分之一。如果我們每個(gè)項(xiàng)目都要獨(dú)立的記錄日志,累積的工作量也挺大的,而且重復(fù)這樣的工作很枯燥。
就像面向切面編程(AOP),在一個(gè)項(xiàng)目的所有接口上設(shè)置“切面”統(tǒng)一編程。如果我們的能在所有的項(xiàng)目上設(shè)置“切面”統(tǒng)一編程,就能解決我們現(xiàn)在的問(wèn)題。這個(gè)“切面”就是網(wǎng)關(guān)。

2、方案設(shè)計(jì)

這個(gè)方案是公司內(nèi)的兩位技術(shù)大佬討論出來(lái)的,這樣驚奇的想法,讓之前困擾的一切迷霧都豁然開(kāi)朗了起來(lái)。我花了兩天做了個(gè)Demo,驗(yàn)證方案的確行得通,下文會(huì)附上本次Demo中實(shí)戰(zhàn)操作的代碼。
簡(jiǎn)單來(lái)說(shuō),所有項(xiàng)目接口都通過(guò)Nginx的網(wǎng)關(guān),而我們不需要在代碼層面上收集日志,而是在Nginx上獲取想要的日志信息,配合ELKF(Elasticsearch、Logstash、Kibana、Filebeat)的解決方案,實(shí)現(xiàn)統(tǒng)一的日志平臺(tái)搭建:

Nginx+Lua編程,按照我們定義的格式,所有通過(guò)網(wǎng)關(guān)的接口都會(huì)留下日志信息,寫(xiě)入log文件。

Filebeat收集數(shù)據(jù),F(xiàn)ilebeat實(shí)時(shí)監(jiān)測(cè)目標(biāo)log文件,收集數(shù)據(jù)推送給Logstash。

Logstash過(guò)濾處理數(shù)據(jù),Logstash過(guò)濾處理數(shù)據(jù)后,會(huì)將數(shù)據(jù)同時(shí)推送給Elasticsearch和Kafka。

Elasticsearch+Kibana,Elasticsearch作為數(shù)據(jù)的搜索引擎,而且利用Kibana的可視化界面,將日志數(shù)據(jù)以報(bào)表的形式顯示出來(lái)。

Kafka消息隊(duì)列中間件,日志的數(shù)據(jù)被推送到Kafka上之后發(fā)布消息,而所有訂閱者就能從隊(duì)列中讀數(shù)據(jù)。本次就是寫(xiě)程序?qū)崟r(shí)的讀取隊(duì)列中的數(shù)據(jù),存入數(shù)據(jù)庫(kù)。

3、系統(tǒng)環(huán)境

在本次Demo中,由于資源限制,所有的產(chǎn)品服務(wù)都將部署在一臺(tái)服務(wù)器上,服務(wù)器上的相關(guān)環(huán)境如下:

配置項(xiàng) 環(huán)境配置信息
服務(wù)器 阿里云服務(wù)器ECS(公網(wǎng):47.96.238.21 ,私網(wǎng):172.16.187.25)
服務(wù)器配置 2 vCPU + 4 GB內(nèi)存
JDK版本 JDK 1.8.0_181
操作系統(tǒng) CentOS 7.4 64位
OpenResty 1.13.6.2
Filebeat 6.2.4
Elasticsearch 6.2.4
Logstash 6.2.4
Kibana 6.2.4
Kafka 2.10-0.10.2.1
基于OpenResty的日志記錄

OpenResty? 是一個(gè)基于 Nginx 與 Lua 的高性能 Web 平臺(tái),其內(nèi)部集成了大量精良的 Lua 庫(kù)、第三方模塊以及大多數(shù)的依賴項(xiàng)。用于方便地搭建能夠處理超高并發(fā)、擴(kuò)展性極高的動(dòng)態(tài) Web 應(yīng)用、Web 服務(wù)和動(dòng)態(tài)網(wǎng)關(guān)。
我們選擇OpenResty的目的有兩個(gè):(1)使用Lua編程,可以在Nginx上更好的拿到想要的日志信息;(2)系統(tǒng)其它功能模塊的集成,例如Jwt的集成,可參考同事寫(xiě)的文章《Nginx實(shí)現(xiàn)JWT驗(yàn)證-基于OpenResty實(shí)現(xiàn)》。

1、OpenResty安裝

在安裝OpenResty之前需要先安裝好依賴庫(kù),OpenResty 依賴庫(kù)有: perl 5.6.1+, libreadline, libpcre, libssl。我們是CentOS系統(tǒng),可以直接yum來(lái)安裝。

[root@Kerry ~]# yum install readline-devel pcre-devel openssl-devel perl

接下來(lái)我們?cè)诋?dāng)前CentOS系統(tǒng)上使用新的官方 yum 源

[root@Kerry ~]# yum install yum-utils
[root@Kerry ~]# yum-config-manager --add-repo https://openresty.org/package/centos/openresty.repo

這時(shí)我們就可以直接安裝OpenResty

[root@Kerry ~]# yum install openresty
[root@Kerry ~]# yum install openresty-resty

這樣OpenResty就安裝完成了,默認(rèn)情況下程序會(huì)被安裝到 /usr/local/openresty 目錄

# 可查看安裝成功
[root@Kerry ~]# cd /usr/local/openresty/bin/
[root@Kerry bin]# ./openresty -v
nginx version: openresty/1.13.6.2

# 設(shè)置環(huán)境變量
[root@Kerry sbin]# vi /etc/profile
# 在文件最后面加上 export PATH=${PATH}:/usr/local/openresty/nginx/sbin
[root@Kerry sbin]# source /etc/profile
2、記錄Nginx日志

OpenResty 安裝之后就有配置文件及相關(guān)的目錄的,為了工作目錄與安裝目錄互不干擾,我們多帶帶建一個(gè)工作目錄。我在根目錄下新建了 /openrestyTest/v1/ 的文件夾,并在該目錄下創(chuàng)建 logs 和 conf 子目錄分別用于存放日志和配置文件。

[root@Kerry ~]# mkdir /openrestyTest /openrestyTest/v1 /openrestyTest/v1/conf /openrestyTest/v1/logs
[root@Kerry ~]# cd /openrestyTest/v1/conf/
# 創(chuàng)建并編輯 nginx.conf
[root@Kerry conf]# vi nginx.conf

在nginx.conf中復(fù)制以下文本作為測(cè)試

worker_processes  1;        #nginx worker 數(shù)量
error_log logs/error.log;   #指定錯(cuò)誤日志文件路徑
events {
    worker_connections 1024;
}

http {
    server {
        #監(jiān)聽(tīng)端口,若你的6699端口已經(jīng)被占用,則需要修改
        listen 6699;
        location / {
            default_type text/html;

            content_by_lua_block {
                ngx.say("HelloWorld")
            }
        }
    }
}

該語(yǔ)法是基于Lua,監(jiān)聽(tīng)6699端口,輸出HelloWorld。我們現(xiàn)在啟動(dòng)Openresty中的Nginx。

[root@Kerry ~]# /usr/local/openresty/nginx/sbin/nginx -p "/openrestyTest/v1/" -c conf/nginx.conf
# 由于配置或環(huán)境變量,也可以直接使用
[root@Kerry ~]# nginx -p "/openrestyTest/v1/" -c conf/nginx.conf
[root@Kerry conf]# curl http://localhost:6699
HelloWorld

訪問(wèn)該端口地址,成功的顯示HelloWorld。我提前在本服務(wù)器的Tomcat上部署了一個(gè)接口,端口是8080。我的想法是將8080反向代理成9000,將所有通過(guò)8080端口的服務(wù)的日志信息獲取到,并輸出到本地的log文件中。
我暫時(shí)需要記錄的日志內(nèi)容包括:接口地址,請(qǐng)求內(nèi)容,請(qǐng)求時(shí)間,響應(yīng)內(nèi)容,響應(yīng)時(shí)間等。代碼寫(xiě)好了,直接替換 /openrestyTest/v1/conf/nginx.conf 的文件內(nèi)容。

worker_processes  1;
error_log logs/error.log;

events {
    worker_connections 1024;
}

http {
log_format myformat "{"status":"$status","requestTime":"$requestTime","responseTime":"$responseTime","requestURL":"$requestURL","method":"$method","requestContent":"$request_body","responseContent":"$responseContent"}";
access_log logs/test.log myformat;

upstream tomcatTest {
    server 47.96.238.21:8080;
}

server {
        server_name 47.96.238.21;
        listen 9000;
        # 默認(rèn)讀取 body
        lua_need_request_body on;

        location / {
                log_escape_non_ascii off;
                proxy_pass  http://tomcatTest;
                set $requestURL "";
                set $method "";
                set $requestTime "";
                set $responseTime "";
                set $responseContent "";

                body_filter_by_lua "
                        ngx.var.requestTime=os.date("%Y-%m-%d %H:%M:%S")

                        ngx.var.requestURL=ngx.var.scheme.."://"..ngx.var.server_name..":"..ngx.var.server_port..ngx.var.request_uri
                        ngx.var.method=ngx.var.request_uri

                        local resp_body = string.sub(ngx.arg[1], 1, 1000)
                        ngx.ctx.buffered = (ngx.ctx.buffered or"") .. resp_body
                        if ngx.arg[2] then
                                ngx.var.responseContent = ngx.ctx.buffered
                        end

                        ngx.var.responseTime=os.date("%Y-%m-%d %H:%M:%S")
                  ";

        }

    }
}

重新啟動(dòng)Nginx,然后進(jìn)行驗(yàn)證

[root@Kerry conf]# nginx -p "/openrestyTest/v1/" -c conf/nginx.conf -s reload

我準(zhǔn)備好的接口地址為:http://47.96.238.21:8080/springboot-demo/hello ,該接口返回的結(jié)果都是“Hello!Spring boot”。
現(xiàn)在用POST方式調(diào)用接口http://47.96.238.21:9000/springboot-demo/hello,Request中使用application/json方式輸入內(nèi)容:“segmentFault《日志平臺(tái)(網(wǎng)關(guān)層) - 基于Openresty+ELKF+Kafka》”。然后查看logs文件夾,發(fā)現(xiàn)多了個(gè) test.log 文件,我們查看該文件。就可以發(fā)現(xiàn),當(dāng)我們每調(diào)用一次接口,就會(huì)同步的輸出接口日志到該文件中。

[root@Kerry conf]#  tail -500f /openrestyTest/v1/logs/test.log
{"status":"200","requestTime":"2018-10-11 18:09:02","responseTime":"2018-10-11 18:09:02","requestURL":"http://47.96.238.21:9000/springboot-demo/hello","method":"/springboot-demo/hello","requestContent":"segmentFault《日志平臺(tái)(網(wǎng)關(guān)層) - 基于Openresty+ELKF+Kafka》","responseContent":"Hello!Spring boot!"}

到此為止,提取經(jīng)過(guò)Nginx網(wǎng)關(guān)的接口信息,并將其寫(xiě)入日志文件就完成了,所有的接口日志都寫(xiě)入了 test.log 文件中。

E+L+K+F=日志數(shù)據(jù)處理

ELKF是 Elastic + Logstash + Kibana + FileBeat 四個(gè)組件的組合,可能ELK對(duì)于大家來(lái)說(shuō)更熟悉,ELKF只不過(guò)多了Filebeat,它們都是Elastic公司推出的開(kāi)源產(chǎn)品。剛好這幾天Elastic公司成功上市,掀起了一波ELKF產(chǎn)品討論的熱潮。
原ELK架構(gòu)中,Logstash負(fù)責(zé)收集日志信息并上報(bào),但后來(lái)Elastic公司又推出了Filebeat,大家發(fā)現(xiàn)Filebeat在日志文件收集上效果更好,就只讓Logstash負(fù)責(zé)日志的處理和上報(bào)了。在這個(gè)系統(tǒng)中,Elastic充當(dāng)一個(gè)搜索引擎,Logstash為日志分析上報(bào)系統(tǒng),F(xiàn)ileBeat為日志文件收集系統(tǒng),Kibana為此系統(tǒng)提供可視化的Web界面。

1、Filebeat安裝配置

Filebeat:輕量型日志采集器,負(fù)責(zé)采集文件形式的日志,并將采集來(lái)的日志推送給logstash進(jìn)行處理。

[root@Kerry ~]# cd /u01/install/
[root@Kerry install]# wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.2.4-x86_64.rpm
[root@Kerry install]# yum localinstall -y filebeat-6.2.4-x86_64.rpm

安裝完成后,我們開(kāi)始配置Filebeat來(lái)采集日志,并推送給Logstash。

[root@Kerry install]# cd /etc/filebeat/
[root@Kerry filebeat]# vi filebeat.yml

該filebeat.yml是filebeat的配置文件,里面大部分的模塊都被注釋了,本次配置放開(kāi)的代碼有;

filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /openrestyTest/v1/logs/*.log
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 3
output.logstash:
  hosts: ["47.96.238.21:5044"]

監(jiān)聽(tīng) /openrestyTest/v1/logs/ 目錄下的log文件,采集的日志信息輸出到logstash,該hosts等我們安裝啟動(dòng)了Logstash再說(shuō),先啟動(dòng)Filebeat。

[root@Kerry filebeat]# cd /usr/share/filebeat/bin/
[root@Kerry bin]# touch admin.out
[root@Kerry bin]# nohup ./filebeat -e -c /etc/filebeat/filebeat.yml > admin.out &
# 查看admin.out 日志,是否啟動(dòng)成功
2、Logstash安裝配置

Logstash:日志處理工具,負(fù)責(zé)日志收集、轉(zhuǎn)換、解析等,并將解析后的日志推送給ElasticSearch進(jìn)行檢索。

[root@Kerry ~]# cd /u01/install/
[root@Kerry install]# wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.4.rpm
[root@Kerry install]# yum localinstall -y logstash-6.2.4.rpm
#Logstash不建議用root啟動(dòng)
[root@Kerry install]# group add logstash
[root@Kerry install]# useradd -g logstash logstash
[root@Kerry install]# passwd logstash
# 設(shè)置密碼
[root@Kerry install]# su logstash
[root@Kerry install]# mkdir -pv /data/logstash/{data,logs}
[root@Kerry install]# chown -R logstash.logstash /data/logstash/
[root@Kerry install]# vi /etc/logstash/conf.d/logstash.conf

創(chuàng)建并編輯/etc/logstash/conf.d/logstash.conf 文件,配置如下:

input {
  beats {
    port => 5044
    codec => plain {
          charset => "UTF-8"
    }
  }
}

output {
  elasticsearch {
    hosts => "47.96.238.21:9200"
    manage_template => false
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
}

1、input:是指Logstash的數(shù)據(jù)來(lái)源,啟動(dòng)后使用5044來(lái)監(jiān)聽(tīng),是否很熟悉,就是上節(jié)Filebeat推送日志的hosts。
2、output;是Logstash輸出數(shù)據(jù)的位置,我們這里定義為elasticsearch,下文中會(huì)說(shuō)到,用于ELK架構(gòu)中的日志分析

接下來(lái)我們修改/etc/logstash/logstash.yml

#vim /etc/logstash/logstash.yml
path.data: /data/logstash/data
path.logs: /data/logstash/logs

現(xiàn)在可以啟動(dòng)Logstash了

[root@Kerry install]# su logstash
[logstash@Kerry root]$ cd /usr/share/logstash/bin/
[logstash@Kerry bin]$ touch admin.out
[logstash@Kerry bin]$ nohup ./logstash -f /etc/logstash/conf.d/logstash.conf >admin.out &
3、Elasticsearch安裝配置

ElasticSearch:是一個(gè)分布式的RESTful風(fēng)格的搜索和數(shù)據(jù)分析引擎,同時(shí)還提供了集中存儲(chǔ)功能,它主要負(fù)責(zé)將logstash抓取來(lái)的日志數(shù)據(jù)進(jìn)行檢索、查詢、分析等。

[root@Kerry ~]# cd /u01/install/
[root@Kerry install]# wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.2.4.rpm
[root@Kerry install]# yum localinstall -y elasticsearch-6.2.4.rpm
#Elasticsearch不建議用root啟動(dòng)
[root@Kerry install]# group add elsearch
[root@Kerry install]# useradd -g elsearch elsearch
[root@Kerry install]# passwd elsearch
# 設(shè)置密碼
[root@Kerry install]# su elsearch
[elsearch@Kerry bin]$  mkdir -pv /data/elasticsearch/{data,logs}
[elsearch@Kerry bin]$  chown -R elsearch.elsearch /data/elasticsearch/
[elsearch@Kerry bin]$  vi /etc/elasticsearch/elasticsearch.yml
path.data: /data/elasticsearch/data
path.logs: /data/elasticsearch/logs
network.host: 0.0.0.0
http.port: 9200

如果想要外網(wǎng)能訪問(wèn),host就必須要設(shè)成0.0.0.0。Elasticsearch的啟動(dòng)如下

[root@Kerry install]# su elsearch
[elsearch@Kerry bin]$ cd /usr/share/elasticsearch/bin/
[elsearch@Kerry bin]$ ./elasticsearch -d
# -d 保證后臺(tái)啟動(dòng)
4、Kibana安裝配置

Kibana:Web前端,可以將ElasticSearch檢索后的日志轉(zhuǎn)化為各種圖表,為用戶提供數(shù)據(jù)可視化支持。

[root@Kerry ~]# cd /u01/install/
[root@Kerry install]# wget https://artifacts.elastic.co/downloads/kibana/kibana-6.2.4-x86_64.rpm
[root@Kerry install]# yum localinstall -y kibana-6.2.4-x86_64.rpm
[root@Kerry install]# vi /etc/kibana/kibana.yml
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.url: "http://47.96.238.21:9200"

同樣的,host為0.0.0.0,保證外網(wǎng)能訪問(wèn)。Kibana只作為前端展示,日志數(shù)據(jù)的獲取還是借助于elasticsearch,所以這里配置了elasticsearch.url。接著啟動(dòng)Kibana,就能通過(guò)頁(yè)面看到日志的報(bào)表。

[root@Kerry ~]# cd /usr/share/kibana/bin/
[root@Kerry bin]# touch admin.out
[root@Kerry bin]# nohup ./kibana >admin.out &

我們?cè)跒g覽器上訪問(wèn) http://47.96.238.21:5601/ ,正常來(lái)說(shuō)就能訪問(wèn)Kibana的頁(yè)面。如果 ELKF一整套配置沒(méi)問(wèn)題,就能在Kibana的頁(yè)面上實(shí)時(shí)的看到所有日志信息。

從Kafka到數(shù)據(jù)庫(kù)

在拿到日志的數(shù)據(jù)后,通過(guò)Elasticsearch和Kibana,已經(jīng)完成了一個(gè)日志查看的平臺(tái)。但我們自己項(xiàng)目?jī)?nèi)部也已經(jīng)開(kāi)發(fā)了日志平臺(tái),希望把這些日志接入到之前的日志平臺(tái)中;或者我們希望定制化一個(gè)更符合實(shí)際使用的日志平臺(tái),這些都需要把拿到的日志數(shù)據(jù)存儲(chǔ)到數(shù)據(jù)庫(kù)里。
但所有日志的記錄,很明顯處于高并發(fā)環(huán)境,很容易由于來(lái)不及同步處理,導(dǎo)致請(qǐng)求發(fā)生堵塞。比如說(shuō),大量的insert,update之類的請(qǐng)求同時(shí)到達(dá)數(shù)據(jù)庫(kù),直接導(dǎo)致無(wú)數(shù)的行鎖表鎖,甚至最后請(qǐng)求會(huì)堆積過(guò)多,從而觸發(fā)too many connections錯(cuò)誤。通過(guò)使用消息隊(duì)列,我們可以異步處理請(qǐng)求,從而緩解系統(tǒng)的壓力。在比對(duì)市場(chǎng)上開(kāi)源的消息中間件后,我選擇了Kafka。
Apache Kafka是一個(gè)分布式的發(fā)布-訂閱消息系統(tǒng),能夠支撐海量數(shù)據(jù)的數(shù)據(jù)傳遞。在離線和實(shí)時(shí)的消息處理業(yè)務(wù)系統(tǒng)中,Kafka都有廣泛的應(yīng)用。Kafka將消息持久化到磁盤(pán)中,并對(duì)消息創(chuàng)建了備份保證了數(shù)據(jù)的安全。Kafka主要特點(diǎn)是基于Pull的模式來(lái)處理消息消費(fèi),追求高吞吐量,一開(kāi)始的目的就是用于日志收集和傳輸。0.8版本開(kāi)始支持復(fù)制,不支持事務(wù),對(duì)消息的重復(fù)、丟失、錯(cuò)誤沒(méi)有嚴(yán)格要求,適合產(chǎn)生大量數(shù)據(jù)的互聯(lián)網(wǎng)服務(wù)的數(shù)據(jù)收集業(yè)務(wù)。

Broker:Kafka的broker是無(wú)狀態(tài)的,broker使用Zookeeper維護(hù)集群的狀態(tài)。Leader的選舉也由Zookeeper負(fù)責(zé)。

Zookeeper:Zookeeper負(fù)責(zé)維護(hù)和協(xié)調(diào)broker。當(dāng)Kafka系統(tǒng)中新增了broker或者某個(gè)broker發(fā)生故障失效時(shí),由ZooKeeper通知生產(chǎn)者和消費(fèi)者。生產(chǎn)者和消費(fèi)者依據(jù)Zookeeper的broker狀態(tài)信息與broker協(xié)調(diào)數(shù)據(jù)的發(fā)布和訂閱任務(wù)。

Producer:生產(chǎn)者將數(shù)據(jù)推送到broker上,當(dāng)集群中出現(xiàn)新的broker時(shí),所有的生產(chǎn)者將會(huì)搜尋到這個(gè)新的broker,并自動(dòng)將數(shù)據(jù)發(fā)送到這個(gè)broker上。

Consumer:因?yàn)镵afka的broker是無(wú)狀態(tài)的,所以consumer必須使用partition
offset來(lái)記錄消費(fèi)了多少數(shù)據(jù)。如果一個(gè)consumer指定了一個(gè)topic的offset,意味著該consumer已經(jīng)消費(fèi)了該offset之前的所有數(shù)據(jù)。consumer可以通過(guò)指定offset,從topic的指定位置開(kāi)始消費(fèi)數(shù)據(jù)。consumer的offset存儲(chǔ)在Zookeeper中。

1、Kafka安裝與配置

我們開(kāi)始Kafka的安裝和啟動(dòng)

# 安裝
[root@Kerry ~]# cd /u01/install/
[root@Kerry install]# wget http://apache.fayea.com/kafka/0.10.2.1/kafka_2.10-0.10.2.1.tgz
[root@Kerry install]# tar -zvxf kafka_2.10-0.10.2.1.tgz -C /usr/local/
[root@Kerry install]# cd /usr/local/
[root@Kerry local]# mv kafka_2.10-0.10.2.1 kafka
# 啟動(dòng)
[root@Kerry local]# cd /usr/local/kafka/bin/
[root@Kerry bin]# ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
[root@Kerry bin]# touch admin.out
[root@Kerry bin]# nohup ./kafka-server-start.sh ../config/server.properties >admin.out &

創(chuàng)建一個(gè)topic,命名為 kerry

[root@Kerry bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kerry
# topic創(chuàng)建成功,下面查看一下
[root@Kerry bin]# ./kafka-topics.sh --list --zookeeper localhost:2181
kerry

我們往這個(gè)topic中發(fā)送信息

[root@Kerry bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic kerry
Hello Kerry!this is the message for test

我們?cè)匍_(kāi)一個(gè)窗口,從topic中接受消息

[root@Kerry bin]# ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic kerry --from-beginning
Hello Kerry!this is the message for test
# 能成功接收到
2、生產(chǎn)者:Logstash

Kafka已經(jīng)安裝好了,也建好了topic,而我希望往topic中發(fā)送消息的對(duì)象(生產(chǎn)者)是Logstash。即Logstash從Filebeat中獲取數(shù)據(jù)后,除了輸出給Elasticsearch以外,還輸出給Logstash,Logstash作為Kafka的生產(chǎn)者。
這里需要修改一下Logstash的配置文件,在output中再加上kafka的信息

vi /etc/logstash/conf.d/logstash.conf
input {
  beats {
    port => 5044
    codec => plain {
          charset => "UTF-8"
    }
  }
}

output {
  elasticsearch {
    hosts => "47.96.238.21:9200"
    manage_template => false
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
  kafka {
    bootstrap_servers => "localhost:9092"    #生產(chǎn)者
    topic_id => "kerry"    #設(shè)置寫(xiě)入kafka的topic
    compression_type => "snappy"
    codec => plain {
            format => "%{message}"
        }
  }
}

重啟Logstash

[root@Kerry bin]# cd /usr/share/logstash/bin
[root@Kerry bin]# ps -ef|grep logstash 
# kill 進(jìn)程
[root@Kerry bin]# nohup ./logstash -f /etc/logstash/conf.d/logstash.conf >admin.out &

我們?cè)儆肞OST方式調(diào)用之前的測(cè)試接口http://47.96.238.21:9000/springboot-demo/hello,請(qǐng)求request為:“這是對(duì)kafka的測(cè)試”。然后再查看從topic中接受消息

[root@Kerry bin]#./kafka-console-consumer.sh --zookeeper localhost:2181 --topic kerry --from-beginning
{"status":"200","requestTime":"2018-10-12 09:40:02","responseTime":"2018-10-12 09:40:02","requestURL":"http://47.96.238.21:9000/springboot-demo/hello","method":"/springboot-demo/hello","requestContent":"這是對(duì)kafka的測(cè)試","responseContent":"Hello!Spring boot!"}

可以成功的接收到推送過(guò)來(lái)的日志消息

3、消費(fèi)者:Springboot編程

日志已經(jīng)可以保證能夠持續(xù)不斷的推送到Kafka中,那么就需要有消費(fèi)者訂閱這些消息,寫(xiě)入到數(shù)據(jù)庫(kù)。我用Spring boot寫(xiě)了個(gè)程序,用來(lái)訂閱Kafka的日志,重要代碼如下:
1、application.yml

spring:
  # kafka
  kafka:
    # kafka服務(wù)器地址(可以多個(gè))
    bootstrap-servers: 47.96.238.21:9092
    consumer:
      # 指定一個(gè)默認(rèn)的組名
      group-id: kafka1
      # earliest:當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開(kāi)始消費(fèi);無(wú)提交的offset時(shí),從頭開(kāi)始消費(fèi)
      # latest:當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開(kāi)始消費(fèi);無(wú)提交的offset時(shí),消費(fèi)新產(chǎn)生的該分區(qū)下的數(shù)據(jù)
      # none:topic各分區(qū)都存在已提交的offset時(shí),從offset后開(kāi)始消費(fèi);只要有一個(gè)分區(qū)不存在已提交的offset,則拋出異常
      auto-offset-reset: earliest
      # key/value的反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # key/value的序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 批量抓取
      batch-size: 65536
      # 緩存容量
      buffer-memory: 524288
      # 服務(wù)器地址
      bootstrap-servers: 47.96.238.21:9092

2、POM.xml

        
            org.springframework.kafka
            spring-kafka
            1.0.6.RELEASE
        

3、KafkaController.java

package df.log.kafka.nginxlog.controller;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.naming.InitialContext;
import javax.sql.DataSource;
import java.sql.Connection;


@RestController
@EnableAutoConfiguration
public class KafkaController {

    @RequestMapping("/hello")
    public String hello(){
        return "Hello!Kerry. This is NginxLog program";
    }
    /**
     * 監(jiān)聽(tīng)信息
     */
    @KafkaListener(topics = "kerry" )
    public void receive(ConsumerRecord consumer) {
        // kafkaLog 就是獲取到的日志信息
        String kafkaLog = (String) consumer.value();
        System.out.println("收到一條消息:"+kafkaLog);
        // 存入數(shù)據(jù)庫(kù)的代碼省略
    }

}

當(dāng)程序部署之后,@KafkaListener(topics = "kerry") 會(huì)持續(xù)監(jiān)聽(tīng)topics 為kerry的消息。我們?cè)僬{(diào)用之前的測(cè)試接口,會(huì)發(fā)現(xiàn)新的接口日志會(huì)被持續(xù)監(jiān)聽(tīng)到,在控制臺(tái)上打印出來(lái),并存入數(shù)據(jù)庫(kù)。

尾聲

本次操作文檔是記錄Demo的過(guò)程,很多地方并不成熟,例如:如何在 Nginx+Lua 時(shí)獲取更加全面的日志信息;在Logstash上對(duì)日志進(jìn)行再加工;寫(xiě)出漂亮的Spring boot 代碼,使得能夠很平緩的做寫(xiě)入數(shù)據(jù)庫(kù),用好Kibana的圖表等等。
我們下一步就是在項(xiàng)目的生產(chǎn)環(huán)境上正式的搭建日志平臺(tái),我們已經(jīng)有了rancher環(huán)境,這套架構(gòu)計(jì)劃用微服務(wù)的方式實(shí)現(xiàn)。后續(xù)的搭建文檔會(huì)持續(xù)更新。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/40139.html

相關(guān)文章

  • Spring Cloud - 技術(shù)棧

    摘要:整理自楊波老師的總結(jié)注冊(cè)中心支持模型存儲(chǔ)和靈活健康檢查能力。服務(wù)網(wǎng)關(guān)選擇是最佳搭配,但異步性能不足基于的異步未推出正式版。配置中心缺失治理能力。監(jiān)控存儲(chǔ)依賴于時(shí)間序列數(shù)據(jù)庫(kù)。隊(duì)列對(duì)于日志等可靠性要求不高的場(chǎng)景,用。功能強(qiáng)大但復(fù)雜。 整理自楊波老師的總結(jié) showImg(https://segmentfault.com/img/bV3iL1?w=800&h=512); 注冊(cè)中心 Eur...

    張金寶 評(píng)論0 收藏0
  • 微服務(wù)之分布式文件系統(tǒng)

    摘要:于是便誕生了隨行付分布式文件系統(tǒng)簡(jiǎn)稱,提供的海量安全低成本高可靠的云存儲(chǔ)服務(wù)。子系統(tǒng)相關(guān)流程圖如下核心實(shí)現(xiàn)主要為隨行付各個(gè)業(yè)務(wù)系統(tǒng)提供文件共享和訪問(wèn)服務(wù),并且可以按應(yīng)用統(tǒng)計(jì)流量命中率空間等指標(biāo)。 背景 傳統(tǒng)Web應(yīng)用中所有的功能部署在一起,圖片、文件也在一臺(tái)服務(wù)器;應(yīng)用微服務(wù)架構(gòu)后,服務(wù)之間的圖片共享通過(guò)FTP+Nginx靜態(tài)資源的方式進(jìn)行訪問(wèn),文件共享通過(guò)nfs磁盤(pán)掛載的方式進(jìn)行訪問(wèn)...

    stormjun 評(píng)論0 收藏0
  • 個(gè)推基于Docker和Kubernetes的微服務(wù)實(shí)踐

    摘要:個(gè)推針對(duì)服務(wù)場(chǎng)景,基于和搭建了微服務(wù)框架,提高了開(kāi)發(fā)效率。三容器化在微服務(wù)落地實(shí)踐時(shí)我們選擇了,下面將詳細(xì)介紹個(gè)推基于的實(shí)踐。 2016年伊始Docker無(wú)比興盛,如今Kubernetes萬(wàn)人矚目。在這個(gè)無(wú)比需要?jiǎng)?chuàng)新與速度的時(shí)代,由容器、微服務(wù)、DevOps構(gòu)成的云原生席卷整個(gè)IT界。個(gè)推針對(duì)Web服務(wù)場(chǎng)景,基于OpenResty和Node.js搭建了微服務(wù)框架,提高了開(kāi)發(fā)效率。在微服...

    yibinnn 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<