yum install pcre-devel openssl-devel gcc curl
CentOS
你可以在你的 CentOS 系统中添加 openresty 仓库,这样就可以便于未来安装或更新我们的软件包(通过 yum update 命令)。运行下面的命令就可以添加我们的仓库:
sudo yum install yum-utils sudo yum-config-manager --add-repo https://openresty.org/package/centos/openresty.repo
然后就可以像下面这样安装软件包,比如 openresty:
sudo yum install openresty
如果你想安装命令行工具 resty,那么可以像下面这样安装 openresty-resty 包:
sudo yum install openresty-resty
命令行工具 opm 在 openresty-opm 包里,而 restydoc 工具在 openresty-doc 包里头。
列出所有 openresty 仓库里头的软件包:
sudo yum --disablerepo="*" --enablerepo="openresty" list available
参考 OpenResty RPM 包页面获取这些包更多的细节。
一定要配置 host.name=localhost,否则不能连接发送消息
https://github.com/doujiang24/lua-resty-kafka 在 git 上下载压缩包,copy 到 openresty 安装目录下 lualib 目录
cp -rf lua-resty-kafka-master/lib/resty/kafka /usr/local/test/lualib/resty/
worker_processes 1;
error_log logs/error.log;
events {
worker_connections 1024;
}
http {
# lua_package_path "/usr/local/openresty/lualib/kafka/*.lua;;";
# resolver 8.8.8.8;
server {
listen 18080;
server_name localhost;
location / {
default_type text/html;
content_by_lua '
ngx.say("<p>hello, world</p>")
';
}
location /kafka {
default_type text/html;
content_by_lua '
local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
local broker_list = {
{ host = "localhost", port = 9092 },
}
local key = "key"
local message = "halo world"
-- usually we do not use this library directly
-- local cli = client:new(broker_list)
-- local brokers, partitions = cli:fetch_metadata("test1")
-- if not brokers then
-- ngx.say("fetch_metadata failed, err:", partitions)
-- end
-- ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))
-- this is async producer_type and bp will be reused in the whole nginx worker
local bp = producer:new(broker_list, { producer_type = "async" })
local ok, err = bp:send("test2", key, message)
if not ok then
ngx.say("send err_async:", err)
return
end
ngx.say("send success, ok:", ok)
';
}
}
}
这里要注意,broker_list 里的 ip 地址一定要与 kafka 配置文件中的 host.name 相同。
http 请求:
查看 kafka 消息:http://localhost:18080/kafka
# bin/kafka-console-consumer.sh --bootstrap-server 172.31.41.49:9092 --topic test2 --from-beginning
halo world
halo world
worker_processes 1;
error_log logs/error.log;
events {
worker_connections 1024;
}
http {
# lua_package_path "/usr/local/openresty/lualib/kafka/*.lua;;";
# resolver 8.8.8.8;
server {
listen 18080;
server_name localhost;
location / {
default_type text/html;
content_by_lua '
ngx.say("<p>hello, world</p>")
';
}
location /kafka {
default_type text/html;
content_by_lua '
local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
local broker_list = {
{ host = "172.31.41.49", port = 9092 },
}
-- local key = "key"
-- local message = "halo world"
local log_obj = {}
log_obj["request_module"] = "product_detail_info"
log_obj["headers"] = ngx.req.get_headers()
log_obj["uri_args"] = ngx.req.get_uri_args()
log_obj["body"] = ngx.req.read_body()
log_obj["http_version"] = ngx.req.http_version()
log_obj["method"] = ngx.req.get_method()
log_obj["raw_reader"] = ngx.req.raw_header()
log_obj["body_data"] = ngx.req.get_body_data()
local key = log_obj["uri_args"]["id"]
local message = cjson.encode(log_obj);
-- this is async producer_type and bp will be reused in the whole nginx worker
local bp = producer:new(broker_list, { producer_type = "async" })
local ok, err = bp:send("test2", key, message)
if not ok then
ngx.say("send err_async:", err)
return
end
ngx.say("send success, ok:", ok)
';
}
}
}
url:http://localhost:18080/kafka?id=1&username=sdfwereriouwoir&passwd=123456
接收到消息:
# bin/kafka-console-consumer.sh --bootstrap-server 172.31.41.49:9092 --topic test2 --from-beginning
halo world
halo world
halo world
{"method":"GET","raw_reader":"GET \/kafka?id=1&username=sdfwereriouwoir&passwd=123456 HTTP\/1.1\r\nHost: 34.236.3.122:18080\r\nConnection: keep-alive\r\nCache-Control: max-age=0\r\nUpgrade-Insecure-Requests: 1\r\nUser-Agent: Mozilla\/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit\/537.36 (KHTML, like Gecko) Chrome\/70.0.3538.77 Safari\/537.36\r\nAccept: text\/html,application\/xhtml+xml,application\/xml;q=0.9,image\/webp,image\/apng,*\/*;q=0.8\r\nAccept-Encoding: gzip, deflate\r\nAccept-Language: zh-CN,zh;q=0.9\r\nCookie: __utmc=215336725; __utmz=215336725.1542612153.4.2.utmcsr=wiki.hqygou.com:8090|utmccn=(referral)|utmcmd=referral|utmcct=\/pages\/viewpage.action; td_cookie=3330956158; SPRING_SECURITY_REMEMBER_ME_COOKIE=Y2RoX3VzZXI6MTU0NDc3MjE3NzQyNjpmNTZmNDNjNjA4NTU2NTQ0NWMwNDk1ODUxMWFhZThlNg; __utma=215336725.24067436.1542269746.1543311996.1543562580.11; CLOUDERA_MANAGER_SESSIONID=1io1k7joro0qf1lv9pmq6icoka; JSESSIONID=EFDF49EF80D041360C6E599A71D6F761\r\n\r\n","request_module":"product_detail_info","headers":{"host":"34.236.3.122:18080","connection":"keep-alive","upgrade-insecure-requests":"1","cache-control":"max-age=0","cookie":"__utmc=215336725; __utmz=215336725.1542612153.4.2.utmcsr=wiki.hqygou.com:8090|utmccn=(referral)|utmcmd=referral|utmcct=\/pages\/viewpage.action; td_cookie=3330956158; SPRING_SECURITY_REMEMBER_ME_COOKIE=Y2RoX3VzZXI6MTU0NDc3MjE3NzQyNjpmNTZmNDNjNjA4NTU2NTQ0NWMwNDk1ODUxMWFhZThlNg; __utma=215336725.24067436.1542269746.1543311996.1543562580.11; CLOUDERA_MANAGER_SESSIONID=1io1k7joro0qf1lv9pmq6icoka; JSESSIONID=EFDF49EF80D041360C6E599A71D6F761","accept-encoding":"gzip, deflate","user-agent":"Mozilla\/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit\/537.36 (KHTML, like Gecko) Chrome\/70.0.3538.77 Safari\/537.36","accept-language":"zh-CN,zh;q=0.9","accept":"text\/html,application\/xhtml+xml,application\/xml;q=0.9,image\/webp,image\/apng,*\/*;q=0.8"},"uri_args":{"passwd":"123456","username":"sdfwereriouwoir","id":"1"},"http_version":1.1}
worker_processes 1;
error_log logs/error.log;
events {
worker_connections 1024;
}
http {
# lua_package_path "/usr/local/openresty/lualib/kafka/*.lua;;";
# resolver 8.8.8.8;
server {
listen 18080;
server_name localhost;
location / {
default_type text/html;
content_by_lua '
ngx.say("<p>hello, world</p>")
';
}
location /empty_gif {
empty_gif;
}
location /kafka {
#default_type text/html;
empty_gif;
content_by_lua '
local cjson = require "cjson"
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
local broker_list = {
{ host = "172.31.41.49", port = 9092 },
}
-- local key = "key"
-- local message = "halo world"
local log_obj = {}
log_obj["request_module"] = "product_detail_info"
log_obj["headers"] = ngx.req.get_headers()
log_obj["uri_args"] = ngx.req.get_uri_args()
log_obj["body"] = ngx.req.read_body()
log_obj["http_version"] = ngx.req.http_version()
log_obj["method"] = ngx.req.get_method()
log_obj["raw_reader"] = ngx.req.raw_header()
log_obj["body_data"] = ngx.req.get_body_data()
local message = cjson.encode(log_obj);
-- local uri_args =
local key = log_obj["uri_args"]["id"]
local message = cjson.encode(log_obj);
-- local uri_args =
-- local key = args["id"]
-- usually we do not use this library directly
-- local cli = client:new(broker_list)
-- local brokers, partitions = cli:fetch_metadata("test1")
-- if not brokers then
-- ngx.say("fetch_metadata failed, err:", partitions)
-- end
-- ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))
-- this is async producer_type and bp will be reused in the whole nginx worker
local bp = producer:new(broker_list, { producer_type = "async" })
local ok, err = bp:send("test2", key, message)
if not ok then
ngx.say("send err_async:", err)
return
end
ngx.exec("/empty_gif")
';
}
}
}
log_obj["recive_time"] = ngx.req.start_time() * 1000
函数 ngx.req.start_time() 的返回值是一个浮点数:1544258212.019 所以需要放大 1000 呗。
参考:https://openresty.org/cn/linux-packages.html
https://openresty.org/cn/getting-started.html
https://github.com/doujiang24/lua-resty-kafka
https://www.jianshu.com/p/c12a9662625b
https://hot66hot.iteye.com/blog/2291916
https://github.com/doujiang24/lua-resty-kafka
单节点 kafka 安装:https://kafka.apache.org/quickstart