Nginx Lua Integration with Kafka (2)

kafkalua.lua:

-- Test statement, optional
ngx.say('hello kafka file configuration successful!!!!!!')

-- Collection threshold: if the number of active connections exceeds this value, skip collection
local DEFAULT_THRESHOLD = 100000

-- Number of Kafka partitions
local PARTITION_NUM = 6

-- Kafka topic name
local TOPIC = 'B2CDATA_COLLECTION1'

-- Key of the shared round-robin counter
local POLLING_KEY = "POLLING_KEY"

-- Custom partitioner: the numeric message key is used directly as the partition index
local function partitioner(key, num, correlation_id)
    return tonumber(key)
end

-- Kafka broker list (the IPs must match each broker's host.name configuration)
local BROKER_LIST = {
    { host = "192.168.52.100", port = 9092 },
    { host = "192.168.52.110", port = 9092 },
    { host = "192.168.52.120", port = 9092 }
}

-- Kafka producer parameters
local CONNECT_PARAMS = {
    producer_type = "async",
    socket_timeout = 30000,
    flush_time = 10000,
    request_timeout = 20000,
    partitioner = partitioner
}

-- Shared-memory counter used for round-robin partition selection
local shared_data = ngx.shared.shared_data
local pollingVal = shared_data:get(POLLING_KEY)
if not pollingVal then
    pollingVal = 1
    shared_data:set(POLLING_KEY, pollingVal)
end

-- Take the counter modulo PARTITION_NUM to balance messages across partitions
local partitions = '' .. (tonumber(pollingVal) % PARTITION_NUM)
shared_data:incr(POLLING_KEY, 1)

-- Concurrency control: overload protection based on ngx.var.connections_active;
-- if the number of active connections exceeds the threshold, stop collecting
local isGone = true
if tonumber(ngx.var.connections_active) > tonumber(DEFAULT_THRESHOLD) then
    isGone = false
end

-- Data collection
if isGone then
    local time_local = ngx.var.time_local
    if time_local == nil then time_local = "" end
    local request = ngx.var.request
    if request == nil then request = "" end
    local request_method = ngx.var.request_method
    if request_method == nil then request_method = "" end
    local content_type = ngx.var.content_type
    if content_type == nil then content_type = "" end
    ngx.req.read_body()
    local request_body = ngx.var.request_body
    if request_body == nil then request_body = "" end
    local http_referer = ngx.var.http_referer
    if http_referer == nil then http_referer = "" end
    local remote_addr = ngx.var.remote_addr
    if remote_addr == nil then remote_addr = "" end
    local http_user_agent = ngx.var.http_user_agent
    if http_user_agent == nil then http_user_agent = "" end
    local time_iso8601 = ngx.var.time_iso8601
    if time_iso8601 == nil then time_iso8601 = "" end
    local server_addr = ngx.var.server_addr
    if server_addr == nil then server_addr = "" end
    local http_cookie = ngx.var.http_cookie
    if http_cookie == nil then http_cookie = "" end

    -- Assemble the message, joining the fields with the #CS# separator
    local message = time_local .. "#CS#" .. request .. "#CS#" .. request_method .. "#CS#" ..
        content_type .. "#CS#" .. request_body .. "#CS#" .. http_referer .. "#CS#" ..
        remote_addr .. "#CS#" .. http_user_agent .. "#CS#" .. time_iso8601 .. "#CS#" ..
        server_addr .. "#CS#" .. http_cookie

    -- Load the lua-resty-kafka producer module
    local producer = require "resty.kafka.producer"
    -- Create the producer
    local bp = producer:new(BROKER_LIST, CONNECT_PARAMS)
    -- Send the message to the selected partition
    local ok, err = bp:send(TOPIC, partitions, message)
    -- Log an error if the send failed
    if not ok then
        ngx.log(ngx.ERR, "kafka send err:", err)
        return
    end
end
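To make the partition-balancing logic above easier to follow, here is a minimal standalone sketch in plain Lua (no ngx dependencies; the simulated counter values are made up for illustration). It shows how the shared counter, taken modulo PARTITION_NUM, spreads successive messages across the 6 partitions; because the custom partitioner simply returns tonumber(key), the string produced this way is used directly as the partition index when it is passed as the key to bp:send.

-- Minimal sketch: round-robin partition assignment via a counter and modulo
local PARTITION_NUM = 6
local counter = 1                -- in the real script this value comes from ngx.shared.shared_data
for i = 1, 10 do                 -- simulate 10 incoming requests
    -- produces "1", "2", "3", "4", "5", "0", "1", ...
    local partition = tostring(counter % PARTITION_NUM)
    print("request " .. i .. " -> partition " .. partition)
    counter = counter + 1        -- the real script calls shared_data:incr(POLLING_KEY, 1)
end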
Step 3: modify the nginx configuration file nginx.conf

1. Enter the nginx/conf directory

[root@node03 openresty]# cd /export/servers/openresty/nginx/conf/
[root@node03 conf]# ll
total 76
-rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf
-rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf.default
-rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params
-rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params.default
-rw-r--r-- 1 root root 2837 Jul 26 11:33 koi-utf
-rw-r--r-- 1 root root 2223 Jul 26 11:33 koi-win
-rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types
-rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types.default
-rw-r--r-- 1 root root 3191 Aug  1 10:52 nginx.conf
-rw-r--r-- 1 root root 2656 Jul 26 11:33 nginx.conf.default
-rw-r--r-- 1 root root  636 Jul 26 11:33 scgi_params
-rw-r--r-- 1 root root  636 Jul 26 11:33 scgi_params.default
-rw-r--r-- 1 root root  664 Jul 26 11:33 uwsgi_params
-rw-r--r-- 1 root root  664 Jul 26 11:33 uwsgi_params.default
-rw-r--r-- 1 root root 3610 Jul 26 11:33 win-utf

2. Edit nginx.conf

[root@node03 conf]# vim nginx.conf

# 1. Find the first server block
# 2. Add the two lines shown below above the server block
# 3. Add the Kafka-related location shown below inside the server block

#------------------ added code ---------------------------------------
# Enable a 10 MB shared dictionary, available to every nginx worker process
lua_shared_dict shared_data 10m;
# Configure local DNS resolution
resolver 127.0.0.1;
#------------------ added code ---------------------------------------
server {
    listen       80;
    server_name  localhost;

    #charset koi8-r;
    #access_log  logs/host.access.log  main;

    location / {
        root   html;
        index  index.html index.htm;
    }

    #------------------ added code ---------------------------------------
    location /kafkalua {
        # "kafkalua" here is the project name (see the note below)
        # Enable nginx status monitoring
        stub_status on;
        # Content type returned by the Lua handler
        default_type text/html;
        # Path to the kafkalua.lua file created earlier (the path you were told to remember!)
        content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
    }
    #------------------ added code ---------------------------------------
}
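After saving nginx.conf, one way to verify the whole pipeline end to end is the sequence below. The nginx sbin path and the assumption that kafka-console-consumer.sh is on the PATH are based on the install layout used in this series; adjust them to your environment.

# Check the configuration syntax, then reload nginx without a full restart
/export/servers/openresty/nginx/sbin/nginx -t
/export/servers/openresty/nginx/sbin/nginx -s reload

# Send a test request to the collection endpoint
curl "http://node03/kafkalua"

# Consume the topic to confirm the record arrived (Kafka 0.10+ console consumer)
kafka-console-consumer.sh --bootstrap-server 192.168.52.100:9092 --topic B2CDATA_COLLECTION1 --from-beginning

If everything is wired up correctly, curl prints the hello kafka test message from the ngx.say line at the top of the script, and the consumer shows a #CS#-delimited record.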

Note: in location /kafkalua { ... }, kafkalua is the project name. You can pick any name you like (or leave it out), but you must remember it, because it is the request path the collection endpoint will be reached on!
