安装 openresty

yum install pcre-devel openssl-devel gcc curl

CentOS

你可以在你的 CentOS 系统中添加 openresty 仓库,这样就可以便于未来安装或更新我们的软件包(通过 yum update 命令)。运行下面的命令就可以添加我们的仓库:

    sudo yum install yum-utils     sudo yum-config-manager --add-repo https://openresty.org/package/centos/openresty.repo

然后就可以像下面这样安装软件包,比如 openresty:

    sudo yum install openresty

如果你想安装命令行工具 resty,那么可以像下面这样安装 openresty-resty 包:

    sudo yum install openresty-resty

命令行工具 opm 在 openresty-opm 包里,而 restydoc 工具在 openresty-doc 包里头。

列出所有 openresty 仓库里头的软件包:

    sudo yum --disablerepo="*" --enablerepo="openresty" list available

参考 OpenResty RPM 包页面获取这些包更多的细节。

kafka 配置

一定要配置 host.name=localhost,否则不能连接发送消息

lua-resty-kafka 模块配置

https://github.com/doujiang24/lua-resty-kafka 在 git 上下载压缩包,copy 到 openresty 安装目录下 lualib 目录

cp -rf lua-resty-kafka-master/lib/resty/kafka /usr/local/test/lualib/resty/


nginx 配置文件

worker_processes  1;
error_log logs/error.log;
events {
    worker_connections 1024;
}
http {
   # lua_package_path "/usr/local/openresty/lualib/kafka/*.lua;;";
   # resolver 8.8.8.8;
    server {
        listen 18080;
        server_name localhost;
        location / {
            default_type text/html;
            content_by_lua '
                ngx.say("<p>hello, world</p>")
            ';
        }

        location /kafka {
            default_type text/html;
            content_by_lua '
                local cjson = require "cjson"
                local client = require "resty.kafka.client"
                local producer = require "resty.kafka.producer"

                local broker_list = {
                    { host = "localhost", port = 9092 },
                }

                local key = "key"
                local message = "halo world"

                -- usually we do not use this library directly
--                local cli = client:new(broker_list)
--                local brokers, partitions = cli:fetch_metadata("test1")
--                if not brokers then
--                    ngx.say("fetch_metadata failed, err:", partitions)
--                end
--                ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))
                -- this is async producer_type and bp will be reused in the whole nginx worker
                local bp = producer:new(broker_list, { producer_type = "async" })

                local ok, err = bp:send("test2", key, message)
                if not ok then
                    ngx.say("send err_async:", err)
                    return
                end

                ngx.say("send success, ok:", ok)
            ';
        }
    }
}

这里要注意,broker_list 里的 ip 地址一定要与 kafka 配置文件中的 host.name 相同。 

http 请求:

查看 kafka 消息:http://localhost:18080/kafka

# bin/kafka-console-consumer.sh --bootstrap-server 172.31.41.49:9092 --topic test2 --from-beginning
halo world
halo world

动态取 url 参数

worker_processes  1;
error_log logs/error.log;
events {
    worker_connections 1024;
}
http {
   # lua_package_path "/usr/local/openresty/lualib/kafka/*.lua;;";
   # resolver 8.8.8.8;
    server {
        listen 18080;
        server_name localhost;
        location / {
            default_type text/html;
            content_by_lua '
                ngx.say("<p>hello, world</p>")
            ';
        }

        location /kafka {
            default_type text/html;
            content_by_lua '
                local cjson = require "cjson"
                local client = require "resty.kafka.client"
                local producer = require "resty.kafka.producer"

                local broker_list = {
                    { host = "172.31.41.49", port = 9092 },
                }

               -- local key = "key"
               -- local message = "halo world"
                local log_obj = {}
		log_obj["request_module"] = "product_detail_info"
		log_obj["headers"] = ngx.req.get_headers()
		log_obj["uri_args"] = ngx.req.get_uri_args()
		log_obj["body"] = ngx.req.read_body()
		log_obj["http_version"] = ngx.req.http_version()
		log_obj["method"] = ngx.req.get_method()
		log_obj["raw_reader"] = ngx.req.raw_header()
		log_obj["body_data"] = ngx.req.get_body_data()
                local key = log_obj["uri_args"]["id"]
                local message = cjson.encode(log_obj);
                -- this is async producer_type and bp will be reused in the whole nginx worker
                local bp = producer:new(broker_list, { producer_type = "async" })

                local ok, err = bp:send("test2", key, message)
                if not ok then
                    ngx.say("send err_async:", err)
                    return
                end

                ngx.say("send success, ok:", ok)
            ';
        }
    }
}


url:http://localhost:18080/kafka?id=1&username=sdfwereriouwoir&passwd=123456

接收到消息:

# bin/kafka-console-consumer.sh --bootstrap-server 172.31.41.49:9092 --topic test2 --from-beginning
halo world
halo world
halo world
{"method":"GET","raw_reader":"GET \/kafka?id=1&username=sdfwereriouwoir&passwd=123456 HTTP\/1.1\r\nHost: 34.236.3.122:18080\r\nConnection: keep-alive\r\nCache-Control: max-age=0\r\nUpgrade-Insecure-Requests: 1\r\nUser-Agent: Mozilla\/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit\/537.36 (KHTML, like Gecko) Chrome\/70.0.3538.77 Safari\/537.36\r\nAccept: text\/html,application\/xhtml+xml,application\/xml;q=0.9,image\/webp,image\/apng,*\/*;q=0.8\r\nAccept-Encoding: gzip, deflate\r\nAccept-Language: zh-CN,zh;q=0.9\r\nCookie: __utmc=215336725; __utmz=215336725.1542612153.4.2.utmcsr=wiki.hqygou.com:8090|utmccn=(referral)|utmcmd=referral|utmcct=\/pages\/viewpage.action; td_cookie=3330956158; SPRING_SECURITY_REMEMBER_ME_COOKIE=Y2RoX3VzZXI6MTU0NDc3MjE3NzQyNjpmNTZmNDNjNjA4NTU2NTQ0NWMwNDk1ODUxMWFhZThlNg; __utma=215336725.24067436.1542269746.1543311996.1543562580.11; CLOUDERA_MANAGER_SESSIONID=1io1k7joro0qf1lv9pmq6icoka; JSESSIONID=EFDF49EF80D041360C6E599A71D6F761\r\n\r\n","request_module":"product_detail_info","headers":{"host":"34.236.3.122:18080","connection":"keep-alive","upgrade-insecure-requests":"1","cache-control":"max-age=0","cookie":"__utmc=215336725; __utmz=215336725.1542612153.4.2.utmcsr=wiki.hqygou.com:8090|utmccn=(referral)|utmcmd=referral|utmcct=\/pages\/viewpage.action; td_cookie=3330956158; SPRING_SECURITY_REMEMBER_ME_COOKIE=Y2RoX3VzZXI6MTU0NDc3MjE3NzQyNjpmNTZmNDNjNjA4NTU2NTQ0NWMwNDk1ODUxMWFhZThlNg; __utma=215336725.24067436.1542269746.1543311996.1543562580.11; CLOUDERA_MANAGER_SESSIONID=1io1k7joro0qf1lv9pmq6icoka; JSESSIONID=EFDF49EF80D041360C6E599A71D6F761","accept-encoding":"gzip, deflate","user-agent":"Mozilla\/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit\/537.36 (KHTML, like Gecko) Chrome\/70.0.3538.77 Safari\/537.36","accept-language":"zh-CN,zh;q=0.9","accept":"text\/html,application\/xhtml+xml,application\/xml;q=0.9,image\/webp,image\/apng,*\/*;q=0.8"},"uri_args":{"passwd":"123456","username":"sdfwereriouwoir","id":"1"},"http_version":1.1}

返回小图片


worker_processes  1;
error_log logs/error.log;
events {
    worker_connections 1024;
}
http {
   # lua_package_path "/usr/local/openresty/lualib/kafka/*.lua;;";
   # resolver 8.8.8.8;
    server {
        listen 18080;
        server_name localhost;
        location / {
            default_type text/html;
            content_by_lua '
                ngx.say("<p>hello, world</p>")
            ';
        }
		
        location /empty_gif {
           empty_gif;
        }

        location /kafka {
            #default_type text/html;
            empty_gif;
            content_by_lua '
                local cjson = require "cjson"
                local client = require "resty.kafka.client"
                local producer = require "resty.kafka.producer"

                local broker_list = {
                    { host = "172.31.41.49", port = 9092 },
                }

               -- local key = "key"
               -- local message = "halo world"
                local log_obj = {}
                                log_obj["request_module"] = "product_detail_info"
                                log_obj["headers"] = ngx.req.get_headers()
                                log_obj["uri_args"] = ngx.req.get_uri_args()
                                log_obj["body"] = ngx.req.read_body()
                                log_obj["http_version"] = ngx.req.http_version()
                                log_obj["method"] = ngx.req.get_method()
                                log_obj["raw_reader"] = ngx.req.raw_header()
                                log_obj["body_data"] = ngx.req.get_body_data()

                local message = cjson.encode(log_obj);
               -- local uri_args =
                local key = log_obj["uri_args"]["id"]
                local message = cjson.encode(log_obj);
               -- local uri_args =
--                local key = args["id"]
                -- usually we do not use this library directly
--                local cli = client:new(broker_list)
--                local brokers, partitions = cli:fetch_metadata("test1")
--                if not brokers then
--                    ngx.say("fetch_metadata failed, err:", partitions)
--                end
--                ngx.say("brokers: ", cjson.encode(brokers), "; partitions: ", cjson.encode(partitions))
                -- this is async producer_type and bp will be reused in the whole nginx worker
                local bp = producer:new(broker_list, { producer_type = "async" })

                local ok, err = bp:send("test2", key, message)
                if not ok then
                    ngx.say("send err_async:", err)
                    return
                end
               ngx.exec("/empty_gif")
            ';
        }
    }
}

取日志时间

log_obj["recive_time"] = ngx.req.start_time() * 1000

函数 ngx.req.start_time() 的返回值是一个浮点数:1544258212.019 所以需要放大 1000 呗。


参考:https://openresty.org/cn/linux-packages.html

    https://openresty.org/cn/getting-started.html

https://github.com/doujiang24/lua-resty-kafka

https://www.jianshu.com/p/c12a9662625b

https://hot66hot.iteye.com/blog/2291916

https://github.com/doujiang24/lua-resty-kafka

单节点 kafka 安装:https://kafka.apache.org/quickstart