Zeek Framework 中文版(一)(zeekr 下载)

(本文根据官方手册翻译 zeek.org)

内蒙古巨鹏软件原创翻译,转载请注明来源!

什么是 Zeek

Zeek 是一个功能强大的网络分析框架,它与我们知道的典型 IDS 大不相同。

Zeek 在关注网络安全监控的同时,也为更通用的网络流量分析提供了一个全面的平台。

特性:

  • 深度分析:Zeek 附带了许多协议的分析器,支持应用层的高级语义分析
  • 具有适应性和灵活性: Zeek 的特定域脚本语言支持站点特定的监视策略,并意味着它不限于任何特定的检测方法。
  • 高效: Zeek 以高性能网络为目标,在各种大型站点上运行
  • 高度稳定:Zeek 对它监视的网络保持广泛的应用层状态,并提供网络活动的高级存档。

为什么要翻译这个手册

【内蒙古巨鹏软件公司】在使用Zeek作为网络流量分析底层工具,为客户的应用进行分析时,有感于国内中文资料较少,详细的资料更是没有。好在官方文档非常详尽,更新也很及时,因此就将其中比较重要的部分资料进行了翻译,以方便大家掌握Zeek的框架体系。

Zeek时非常棒的网络流量分析(而不只是网络包分析)框架,可以极大方便我们开展网络中的安全事件、应用软件流量、数据库审计等方面的工作。同时,Zeek作为我公司自研的【关键业务保障系统 / 应用软件一体化运维平台】的重要数据源之一,起到了重要的支撑作用。

欢迎广大对NPM、Zeek有兴趣的朋友和我们一同研究和使用Zeek,做更多有价值,也有趣的事情。

Framework

Zeek 包括多个软件框架,这些框架为脚本层提供常用的功能。此外,框架增强了 Zeek在获取数据、构建和过滤输出、运行时调整设置,以及与网络中的其他组件进行交互的能力。框架中包括了 Zeek 核心实现的功能,并把相应的数据结构和 API 暴露给脚本层。

除了个别框架针对特定的使用案例,其他框架几乎在每个 Zeek 安装中都在使用。例如Logging框架,在之前所有使用 Zeek 日志的设备上几乎都存在。

框架在构建中也相互依赖,因此非常值得了解它们的能力。接下来的几节详细介绍了它们。截止 V4.1.1版本,zeek 框架包括下列:

– Logging Framework(日志)

– Notice Framework(告警)

– Input Framework(输入)

– Configuration Framework(配置)

– Intelligence Framework(分析)

– Cluster Framework(集群)

– Broker Communication Framework(通讯协商)

– Supervisor Framework(管理员)

– GeoLocationFile Analysis(地理位置)

– Signature Framework(数字签名)

– Summary Statistics(概要统计)

– NetControl Framework(网络控制)

– Packet Analysis(包分析)

Logging Framework

Zeek 配备了一个 key-value 格式,灵活、有弹性的基础日志界面,能够精细控制记录的内容和记录方式。本文档描述了如何定制和扩展日志体系。

术语

zeek的日志界面(logging interface),围绕三个主要的概念建立的,分别是:

– 流 Streams

日志流对应于单个日志。它定义了构成日志的字段集合,以及字段的名称和类型。例如:conn stream 用于记录网络连接的摘要信息,而 http stream则用于记录 HTTP协议 的活动情况。

– 过滤器 Filters

每个流(stream)都会有一组过滤器附着在其上,用于决定哪些信息被写出去,以及如何写出去。默认情况下,每个流都有一个缺省的过滤器,用于把logs的所有内容直接写到磁盘上。

但是,我们也可以增加其他的过滤器,用来记录一个经过裁剪的子集、写到一个不同的输出位置,或设定一个自定义的 rotation interval(轮转间隔,指日志文件每隔一段时间进行归档改名的策略)。

如果一个流的过滤器全部被删除掉,则这个流不会有任何输出(output is disabled)。

– 写入器 Writers

每个过滤器都有一个写入器,写入器定义了信息被记录的实际输出格式。缺省的写入器是一个 ASCII writer ,它生成 tab 制表符作为分隔符的 ascii 文件。您也能找到其他可用的写入器,如二进制输出、JSON格式输出,甚至直接写到一个数据库里面的写入器。

定制 Zeek 的日志有几种不同的方法:您可以创建一个新的日志流;也可以给现有日志扩展新的字段;您还可以将过滤器 Filter 应用到现有日志流中;或者可以通过设置日志写入器的选项来自定义输出格式。本文档中描述了所有这些方法。

1. 流 Streams

为了将数据记录到新的日志流,需要完成以下所有工作:

  1. 您需要声明一个`recode`类型,其中包含所有需要记录的字段(约定俗成,这个记录类型的名称通常是“INFO”)。
  2. 为这个新的日志流(log stream)声明一个ID,这个ID必须具备唯一性。类型名通常是 `Log::ID`,是一个 `enum` 枚举类型。
  3. 调用 `Log::create_stream` 函数,来创建这个新的日志流。
  4. 当要记录的数据可用时,`Log::write` 函数将会被调用。

下面的例子程序中,我们创建了一个新的模块(module)`foo`,用来生成一个新的日志流:

module Foo;export { # 为新的流创建一个ID,约定俗成的叫做"LOG". redef enum Log::ID = { LOG }; # Define the record type that will contain the data to log. type Info: record { ts: time &log; id: conn_id &log; service: string &log &optional; missed_bytes: count &log &default=0; };}# 作为选项,我们可以在connection记录中添加一个新字段 foo,以便我们记录的数据在各种事件处理程序中轻松访问。redef record connection = { # 约定俗成,模块中的字段使用小写命名. foo: Info &optional;};# 此事件以高于零的优先级处理,因此,如果用户在另一个脚本中修改此流,则可以在默认优先级为零时进行该流处理。event zeek_init() &priority=5{ # Create the stream. This adds a default filter automatically. Log::create_stream(Foo::LOG, [$columns=Info, $path="foo"]);}

在上面代码中,`INFO`记录声明时,您要注意每个字段都有一个`&log`属性,如果没有这个属性,字段将不会出现在日志输出中。同样要注意的是有一个`&optional`属性也用于字段,这表明在日志写入记录之前,这个字段可能没有任何值。最后,具有`&default`属性的字段,能够自动初始化默认值。

到了现在,还缺少的一件事是写一个`Log::write`函数,发送数据到日志框架中。实际的事件处理在何处发生,取决于数据是在何处可用的。举例来说:

当`connection_established` 事件把数据提供给我们时,我们同时保存了一份数据拷贝,用于把数据保存到`connection` 记录中:

event connection_established(c: connection){ local rec : Foo::Info = [$ts=network_time(), $id=c$id]; # 将数据副本存储在connection record记录中, # 以便其他事件处理程序可以访问该数据。 c$foo = rec; # 写入到了上文创建的流`Foo:LOG` Log::write(Foo::LOG, rec);}

如果您使用Zeek运行这个脚本,就创建出来了一个新的日志文件`foo.log`。虽然我们在上面的info记录类型中只指定了4个字段,但日志输出实际了7个字段,因为` id: conn_id`本身就是一个记录类型。

由于conn_id记录有四个字段,因此每个字段在日志输出中都会是单独的列。注意,此类字段在日志输出中的命名方式与 Zeek 脚本中的命名方式略有不同,每个`$`符号都会被替换为句点)。

conn_id 的原型定义在 share/zeek/base/init_bare.zeek脚本中:

type conn_id: record {    orig_h: addr;   ##< The originator's IP address.    orig_p: port;   ##< The originator's port number.    resp_h: addr;   ##< The responder's IP address.    resp_p: port;   ##< The responder's port number.} &log;

当您编写zeek脚本,把数据存放到`connection`记录类型中的时候,您必须要考虑清楚什么时间、如何保存长数据(long data)的问题。正常情况下,在整个通讯连接期间数据都会保存到`connection`记录类型中,从实际角度看,在连接结束之前删除这些数据很常见。

1.1 增加字段到日志

您可以通过扩展`record`类型中定义的成员的方式,为日志扩展字段,并且应该在每个日志记录实际写入之前,为这些新字段赋值。

假设,我们打算在 `Conn::Info`中增加一个boolean型字段`is_private`,用来标识通讯发起人的IP地址是不是一个符合**RFC 1918**规范的地址空间的一部分:

redef record Conn::Info = { is_private: bool &default = F &log;};

上面的例子表明,当我们扩展一个日志流的`INFO`记录类型时,每一个新增加的字段,都需要使用`&default`属性声明默认值,或使用`&optional`属性表明可以为空。此外,您也需要为字段添加`&log`属性,否则字段不会出现在日志文件中。

接下来,我们需要给字段赋值。虽然详细信息因扩展的日志而异,但一般来说,选择一个合适的事件为扩展的字段赋值非常重要,因为我们需要确保在写入日志记录之前,字段都会被正确赋值。有时,正确的选择是用一个更高优先级(higher priority)的相同事件(event)写日志记录。高优先级能够确保先执行给扩展字段赋值的事件处理程序,再运行写日志记录的事件处理程序)。

在下面的例子中,由于网络连接的概要信息产生于该连接从内存中删除之时,我们可以在这个位置添加另一个事件处理程序, 正确地给我们的字段赋值:

event connection_state_remove(c: connection){ if ( c$id$orig_h in Site::private_address_space ) c$conn$is_private = T;}

现在`conn.log`日志文件中,已经能看到一个 bool 类型的新字段`is_private`。如果你查看系统脚本 `base/protocols/conn/main.zeek`文件中 `connection` 日志流的定义,你就能够看到就像上面例子程序中为扩展字段赋值一样,在main.zeek里面也使用 `log::write` 方法在一个事件处理程序中赋值,只不过优先级比较低(也就是说,在对`is_private`字段赋值后才写入日志的)。

要以这种方式扩展日志,需要了解创建日志流的脚本是如何组织其状态保持的。大多数标准 Zeek 脚本将他们的日志状态赋值给 `connection` 记录,然后可以访问该记录数据,就是采用了上面的 `c$conn` 方式。例如,HTTP 分析器(analysis)就是把一个 `HTTP::Info` 类型字段添加到了 `connection` 类型定义中。

参考 `share/zeek/base/protocols/http/main.zeek` 文件的代码:

# Add the http state tracking fields to the connection record.redef record connection = {    http:        Info  &optional;    http_state:  State &optional;};

1.2 定义日志事件

有时,对正在记录的信息进行额外的分析处理是有帮助的。在该情况下,流可以指定每次写入日志记录时都会生成的事件。为此,我们需要修改上面的例子代码,就像这样:

module Foo;export { redef enum Log::ID = { LOG }; type Info: record { ts: time &log; id: conn_id &log; service: string &log &optional; missed_bytes: count &log &default=0; }; # Define a logging event. By convention, this is called # "log_<stream>". global log_foo: event(rec: Info);} event zeek_init() &priority=5{ # 在此处指定 Foo 事件,以便 Zeek 触发(raise)该事件。 Log::create_stream(Foo::LOG, [$columns=Info, $ev=log_foo, $path="foo"]);}

Zeek 的所有默认日志流都定义了此类事件。例如:`connection` 日志流能够触发 `Conn::log_conn` 事件,能用它来标记那些对特定目标主机连接时间超限的连接。例子代码:

redef enum Notice::Type = { ## Indicates that a connection remained established longer ## than 5 minutes. Long_Conn_Found};event Conn::log_conn(rec: Conn::Info){ if ( rec?$duration && rec$duration > 5mins ) NOTICE([$note=Long_Conn_Found, $msg=fmt("unusually long conn to %s", rec$id$resp_h), $id=rec$id]);}

过去,人们需要使用类似 Perl 语言这样的工具处理 Zeek 形成的日志,这通常会额外花费很多时间。使用Zeek的事件 `event` 功能,就能够在 Zeek 内部实时处理。

1.3 关闭流

有一种方法能够完全关闭流:

event zeek_init(){ Log::disable_stream(Conn::LOG);}

注意,事件需要在创建流后运行,因此这个事件处理程序的优先级必须低于创建流的事件处理程序的优先级。

2. 过滤器

一个流,能够让一个或多个过滤器连接上来。没有定义过滤器的流不会产生任何日志输出。过滤器从两个方面控制日志的生成:哪些流的日志会被写出去,并且定义写日志的具体实现细节。过滤器中指定一个具体的日志写入器(log writer)用于实现写操作。例如,一个 ASCII 写入器(后面会介绍到)用于输出文本文件。

当一个流(Stream)被创建时,默认的过滤器会被自动关联。默认的过滤器可以被移除、替换,或者继续向这个流添加其他过滤器。这是通过使用 `Log::add_filter` 或 `Log::remove_filter` 函数来实现的。

本小节将讲解如何使用过滤器实现重命名日志文件、拆分输出到多个文件、控制记录写操作,以及自定义一个日志轮转间隔等任务。

2.1 给日志文件改名

通常,我们在创建流时就明确了日志流的日志文件名。当然,我们也可以通过添加过滤器的方式指定了不同的名字。

给日志改名,最简单的方法是用一个新的过滤器,`替换` 掉默认过滤器,在新过滤器中指定一个不同的路径:

event zeek_init(){ # Replace default filter for the Conn::LOG stream in order to # change the log filename. local f = Log::get_filter(Conn::LOG, "default"); f$path = "myconn"; Log::add_filter(Conn::LOG, f);}

注意:日志过滤器中的路径字段,不包含文件名的扩展,扩展将由日志写入器(writer)稍后确定。

2.2 增加一个额外的输出文件

通常,一个日志流只能唯一写到一个日志文件中。然而,您可以通过增加过滤器的方法把流写到多个文件中。如果想要在新日志文件里面限定部分字段,这个办法会很有用。下面的例子演示了,当给`Conn::LOG` 流中增加一个新的过滤器,就能生成一个新的日志文件:

event zeek_init(){ # Add a new filter to the Conn::LOG stream that logs only # timestamp and originator address. local filter: Log::Filter = [$name="orig-only", $path="origs", $include=set("ts", "id.orig_h")]; Log::add_filter(Conn::LOG, filter);}

注意:当一个流中使用了多个相同路径名的过滤器时,Zeek 为了避免冲突,会自动在输出文件名称后面添加数字后缀。就像下面的警告中告知的:

1071580905.346457 warning: Write using filter 'orig-only' on path 'conn' changed to use new path 'conn-2' to avoid conflict with filter 'default'

过滤器中省略路径值时也会发生同样的情况。在这种情况下,过滤器会继承流的路径成员的值。

请注意,`include` 过滤器属性是如何限定了一个字段集合的。这些字段名需要和 `Conn::Info` 记录原型声明相对应。由于记录中 ID 字段本身就是一个记录(嵌套定义的),因此我们可以通过上面例子代码中`id.orig_h` 这样的方法,使用点符号指定单个 ID 的字段。

使用上述代码,除了常规的日志文件 conn.log,目录里还会生成一个新的日志文件 origs.log 。这个文件结构看上去像常规的 conn.log,但只包括过滤器属性中指定的字段。

如果您想跳过某些字段,但保留其余的字段,则可以使用一个叫做 `exclude` 的过滤器属性,里面可以列出您不感兴趣的字段。

如果您打算把当前这个作为流唯一的日志文件,你可以把 `default` 过滤器删除掉:

event zeek_init(){ # Remove the filter called "default". Log::remove_filter(Conn::LOG, "default");}

2.3 动态确定日志文件路径

有些时候我们可以通过过滤器在记录日志时,动态确定输出路径,这样就不必在过滤器的 `path` 属性中设定了。这样,类似本地连接和远程连接分别记录在不同的日志文件中这样的需求,就可以通过 `path_func` 这个过滤器属性,动态调用一个函数来返回你想要的路径:

# Note: if using ZeekControl then you don't need to redef local_nets.redef Site::local_nets = { 192.168.0.0/16 };function myfunc(id: Log::ID, path: string, rec: Conn::Info) : string{ # Return "conn-local" if originator is a local IP, otherwise # return "conn-remote". local r = Site::is_local_addr(rec$id$orig_h) ? "local" : "remote"; return fmt("%s-%s", path, r);}event zeek_init(){ local filter: Log::Filter = [$name="conn-split", $path_func=myfunc, $include=set("ts", "id.orig_h")]; Log::add_filter(Conn::LOG, filter);}

运行上面的代码,会生成两个日志文件:`conn-local.log conn-remote.log` ,在这个例子中,`Site::local_nets` 变量必须被赋值(如果用 ZeekControl 则不需要)。我们还可以继续扩展这个例子,比如说按照子网(subnets),甚至每个IP都写一个独立的日志文件。要当心的是,这样很容易搞出一大堆日志文件来。

上面的函数 `myfunc` 有个缺点:它只能与 Conn 流一起使用, 因为记录类型被硬编码到参数列表中了。但是,Zeek 能够提供更灵活的方式:

function myfunc( id: Log::ID, path: string, rec: record { id: conn_id; } ) : string{ local r = Site::is_local_addr(rec$id$orig_h) ? "local" : "remote"; return fmt("%s-%s", path, r);}

上面的写法,可用于任何包含 `id : conn_id` 字段的日志流。

2.4 过滤日志记录

我们刚刚看到如何自定义日志**列**的方法,日志框架还能让您控制 Zeek 中哪些日志**行**是可以被写入日志的。这依赖了 Hook 挂钩机制,就像下面的代码里面,框架提供了两个级别的“policy”hooks定义,一个是全局(global)的,一个是一套(多个)过滤器级别的。Hook程序可以对日志记录进行额外的处理,包括拒绝写这条记录。任何使用`break` 语句离开 Hook 程序部分,都将不再向日志文件写记录。任何人都可以将处理程序连接到这些Hook上,如下所示:

type Log::StreamPolicyHook: hook(rec: any, id: ID);type Log::PolicyHook: hook(rec: any, id: ID, filter: Filter);

对于上面的两种hook类型,参数 rec 包含了要被写入日志的记录行,这个行实例与当前流的列定义关联,可以用 id 来标识日志流。

日志框架中,定义了一个全局的挂钩策略的挂钩(hook policy hook),名字叫 `Log::log_stream_policy`,每一个日志写的时候,这个 hook 会被首先调用,任何其他的处理程序(handlers)都可能否决这个日志的条目。框架会对日志流迭代所有的过滤器,每个过滤器都会有一个类型`Log::PolicyHook` 的钩子 `filter$policy` ,用来接收日志记录、日志流 ID 和过滤器记录本身,每个处理程序都可以决定是否写入日志。

当过滤器的hook开始运行,任何否决日志写出的意图(通过 `log:log_stream_policy` 或过滤器的hook),都应该通过过滤器完成。如果过程中没有发生否决,过滤器就会把日志记录转交到输出环节。

您可以通过这些挂钩处理程序传递任意状态。例如,您可以使用 `ref` 扩展流或过滤器,也能够通过 table 类型的 `filter$config` 传递 k-v 对。

由于我们通常希望在特定的流上统一所有的写操作,当构建日志流时,流提供了一个默认hook。如果流的过滤器没有提供他们专有的hook,该流的筛选器将使用默认hook。要支持日志流上的hook,在创建新流时应始终定义默认hook,请看代码:

module Foo;export { ## The logging stream identifier. redef enum Log::ID = { LOG }; ## A default logging policy hook for the stream. global log_policy: Log::PolicyHook; # Define the record type that will contain the data to log. type Info: record { ts: time &log; id: conn_id &log; service: string &log &optional; missed_bytes: count &log &default=0; };}event zeek_init() &priority=5{ # Create the stream, adding the default policy hook: Log::create_stream(Foo::LOG, [$columns=Info, $path="foo", $policy=log_policy]);}

有了这个钩子,现在很容易在任何位置为 Foo 日志添加过滤操作:

hook Foo::log_policy(rec: Foo::Info, id: Log::ID, filter: Log::Filter){ # 只记录完整的信息: if ( rec$missed_bytes > 0 ) break; # 任何位置 break,都会否决(veto)写操作 }

Zeek的分发特性能为所有的流提供默认的hook。下面是一个更加实用的例子:

hook HTTP::log_policy(rec: HTTP::Info, id: Log::ID, filter: Log::Filter){ # Record only connections with successfully analyzed HTTP traffic # 只记录分析出来是 HTTP 流量的连接 # [! rec?$service] 非空检查 if ( ! rec?$service || rec$service != "http" ) break;}

如果要在新的过滤器中选择性地覆盖现有的hook,需要在把过滤器添加到流的时候设置:

hook my_policy(rec: Foo::Info, id: Log::ID, filter: Log::Filter){ # Let's only log incomplete flows: if ( rec$missed_bytes == 0 ) break;}event zeek_init(){ local filter: Log::Filter = [$name="incomplete-only", $path="foo-incomplete", $policy=my_policy]; Log::add_filter(Foo::LOG, filter);}

注意这种使用方法有个微妙之处:新过滤器不能使用 `Foo::log_policy` hook,这个hook不会在写到这个过滤器的时候被调用。任何通过 `Foo::log_policy` 的程序想要否决或附加处理都没用,因为新处理器中的hook不会被调用到。这种hook更换实际上很少用到,可能更好的做法是:当碰到需要用过滤器的问题,尽量在流的默认处理程序中处理。

hook Foo::log_policy(rec: Foo::Info, id: Log::ID, filter: Log::Filter){ if ( filter$name != "incomplete-only" ) return; # 注意:使用return和break,后续处理是不一样的 # Let's only log incomplete flows: if ( rec$missed_bytes == 0 ) break;}

如果你的工作需要 once per-write,而不是 once per-write-and-filter 模式,可以用 `Log::log_stream_policy` 替代:

hook Log::log_stream_policy(rec: Foo::Info, id: Log::ID){ # Called once per write}hook Foo::log_policy(rec: Foo::Info, id: Log::ID, filter: Log::Filter){ # Called once for each of Foo's filters.}

要修改一个现有的过滤器,请首先检索到它,然后修改并重建它:

“` sh

hook my_policy(rec: Foo::Info, id: Log::ID, filter: Log::Filter){ # Let's only log incomplete flows: if ( rec$missed_bytes == 0 ) break;}event zeek_init(){ local f = Log::get_filter(Foo::LOG, "default"); f$policy = my_policy; Log::add_filter(Foo::LOG, f);}

“`

策略挂钩(Policy hooks)除了用于筛选记录行,也能修改日志记录,但有一些很微妙的差异。记录框架将流的所有日志过滤器按先后顺序,应用于相同的日志记录。因此,在 hook handler 中对日志记录所做的修改不仅会持续到同一个hook中的后续 handler 中,还会持续到随后处理的任何过滤器中。与挂钩优先级相比,过滤器被调用的先后顺序无法控制。

2.5 日志的轮转和后续处理

对于日志文件何时、何种方式进行轮转,Zeek的日志框架中提供了很细致的功能支持。日志轮转(Log rotation)意味着 Zeek 能够定期按照用户的配置,重命名当前的活动日志文件(如 conn .log 重命名为 conn_21-01-03_14-05-00.log ),并在新的 conn.log 文件上重新开始记录日志。后处理(Post-processing)意味着 Zeek 还可以对轮转后形成的日志文件根据要求,进行一些额外处理(如压缩或上传)。这些机制当然很适合用于基于文件的日志写入器(ASCII Writer),但也能用在其他写入器,用其他方式写入日志后,定期对输出日志提供额外处理。

2.5.1 轮转计时

日志轮转间隔在全局常量 `Log::default_rotation_interval` 中定义并作用于所有的过滤器,间隔可以在过滤器中重定义。在特定的过滤器实例 `Log::Filter` 中可以给 `interv` 字段赋值。默认情况下,缺省值是 `0secs`,也就是禁止轮转。

当使用 ZeekControl 时,这个参数通过 ZeekControl 的配置自动设定。

下面的例子里面,我们仅仅修改了`Conn::LOG` 日志流默认过滤器的轮转参数:

“` sh

event zeek_init(){ local f = Log::get_filter(Conn::LOG, "default"); f$interv = 1 min; Log::add_filter(Conn::LOG, f);}

“`

2.5.2 控制文件命名

可以通过重定义 `Log::rotation_format_func` 来为轮转后的文件取名。日志框架在调用这个函数时能够有足够充分的上下文信息(通过`Log::RotationFmtInfo` 记录),函数将通过 `Log::RotationPath` 记录结构返回组成文件名的两个部分:目录、文件的基本名称,即没有后缀的名称。输出目录默认为 `Log::default_rotation_dir`(配置选项),输出的基本名称中可以加入时间戳,如 `Log::default_rotation_date_format` 参数所指定。

2.5.3 轮转文件的后处理

后处理,可以通过配置在所有日志缺省过滤器上工作,也能在每个过滤器上自定义。Zeek 提供了强壮的基础架构,能非常简单的对轮转日志运行 shell 命令,您也可以从零开始,写自己的后处理基础体系。

默认情况下,如果 `Log::default_rotation_postprocessor_cmd` 中配置了命令,每个轮转的日志上都会被执行上述命令。通过`Log::run_rotation_postprocessor_cmd` 函数对实际命令的包装,给配置好的 shell 命令传递了另外六个参数:

  • The rotated-to file name (e.g. `conn_21-01-03_14-05-00.log`)
  • The original base name (e.g. `conn`)
  • 原始日志文件创建的时间戳 (e.g. `21-01-03_14.04.00`)
  • 原始日志文件被轮转的时间戳 (e.g. `21-01-03_15.05.00`)
  • `1` if Zeek is terminating, `0` otherwise
  • The name of the writer (e.g. `ascii` for the ASCII writer)

注意:Zeek 会忽略 shell 命令返回值的错误(返回码不是 0),缺省情况下轮转后处理命令应该返回 T 值,以便忽略。当您使用自己编写的后处理器功能时,请务必小心:后处理器返回 F 值,将导致相应的日志写入器实例关闭(shutdown)。因此只有当写入器真的出现无法继续工作情况时,才能回传 F 值。

Zeek 后处理时可通过现成可用的 SCP 和 SFTP 进行文件传输,Zeek 还提供了一个外部工具 `zeek-archiver`,用于在 Zeek 进程的外部执行日志压缩,以获得稳健性。

2.6 其他特性

2.6.1 日志的延伸字段

日志框架支持在已定义的日志格式中添加附加的列字段,该列可以针对所有日志,也可以仅针对单个日志过滤器。函数 ` Log::default_ext_func` 用于所有日志中的记录回传处理,而过滤器中 `Log::Filter` 的 `ext_func` 函数可以被局部覆盖。

您可以为其中任何一个选项配置一个前缀字符串,此字符串可确保生成的字段不会与已有的日志字段冲突。前缀通过 `Log::default_ext_prefix` 被默认定义为一个“下划线_”,如果需要可以在过滤器中覆盖 `ext_prefix` 字段。

下面的例子取自 Zeek 测试用例,作用是给所有的日志中,额外增加三个字段:

“` sh

type Extension: record { write_ts: time &log; stream: string &log; system_name: string &log;};function add_extension(path: string): Extension{ return Extension ( $write_ts = network_time(), $stream = path, $system_name = peer_description);}redef Log::default_ext_func = add_extension;```

看一下 `conn.log` 中的结果:

“` sh

#fields _write_ts _stream _system_name ts uid …#types time string string time string …1071580905.346457 conn zeek 1071580904.891921 Cod6Wj3YeJFHgkaO8j …```

扩展字段与原始日志记录保持独立。它们对过滤器、策略挂钩和日志事件是不可见的。在过滤器处理确定要记录的日志条目后,框架只是简单的把扩展的成员放到要输出的字段列表中而已。

2.6.2 字段名映射

有时,我们需要方便的为出现在 Zeek 日志中的列字段重命名,一个典型的应用场景是这些列的命名能够满足日志集成系统的要求。为了实现这个目的,您可以提供名称转换映射(map),并且可以在全局或每个过滤器中提供转换。map 是一个简单的字符串表格,键是 Zeek 中的字段名,值是实际转换后的目标串。map 中未出现的字段名称不会被改变。这个全局作用的变量是 `Log::default_field_name_map` ,默认值为空,与之对应的过滤器中的局部变量成员是 `field_name_map` 。

下面的例子演示了如何在所有日志中,统一把连接ID中的“点”,替换成下划线:

“` sh

redef Log::default_field_name_map = { ["id.orig_h"] = "id_orig_h", ["id.orig_p"] = "id_orig_p", ["id.resp_h"] = "id_resp_h", ["id.resp_p"] = "id_resp_p"};```

如果只是想在一个日志过滤器中改变字段名,那就直接在过滤器中修改。下面的代码只影响 `conn.log` 一个日志:

“` sh

event zeek_init() { local f = Log::get_filter(Conn::LOG, "default"); f$field_name_map = table( ["id.orig_h"] = "id_orig_h", ["id.orig_p"] = "id_orig_p", ["id.resp_h"] = "id_resp_h", ["id.resp_p"] = "id_resp_p"); Log::add_filter(Conn::LOG, f); }```

2.6.3 向日志中打印

Zeek 中的 print 语句,正常情况下将输出到 `stdout` 或指定的输出文件中。通过调整 `Log::print_to_log` 的枚举值,您可以将此类语句输出重定向到 Zeek 日志中。可选的值包括:

  • Log::REDIRECT_NONE: 默认值,不涉及到 Zeek 日志;
  • Log::REDIRECT_STDOUT: 把正常的标准输出写到日志中;
  • Log::REDIRECT_ALL: 把标准输出或其他文件全部用日志替代。

`Log::print_log_path` 用于定义日志文件名,`Log::PrintLogInfo` 声明列,`Log::log_print` 事件允许您通过事件句柄处理要记录的信息。

2.6.4 本地日志和远程日志

Zeek 在日志处理过程中,需要处理日志条目到达 Zeek 节点后,究竟是在本地进行日志记录,还是远程在其他节点上记录的问题。单节点 Zeek 默认设置为本地记录日志,而集群设置允许在记录器节点上进行本地记录,并远程记录其他节点生成的日志。通常我们不需要关心这些设置,但是需要时也可以使用 `redef` 顺序调整 `Log::enable_local_logging` 和`Log::enable_remote_logging`两个 bool 值。

2.7 写入器 Writers

每个过滤器有一个写入器。如果您在流中添加了过滤器时没有指定写入器,系统会将 ` ASCII writer` 作为默认写入器。

有两种方式指定非默认写入器。要为所有的日志过滤器改变默认写入器,可以重定义 `Log::default_writer` 选项。或者,也可以为每个过滤器单独设置 `writer` 字段。您可以查看写入器的相关文档,了解更多选项。

目前 Zeek 系统只提供三种写入器,分别是:Zeek_SQLiteWriter、Zeek_AsciiWriter、Zeek_NoneWriter 。

2.7.1 ASCII Writer

默认情况下,ASCII 写入器在日志文件中先输出几行元数据,然后是实际的日志输出。元数据描述了日志文件的格式、日志的路径(例如:日志文件名,不包含扩展名)、日志创建的时间,还有日志最后写入的时间。ASCII 编写器具有许多自定义输出格式的选项,请参阅 `

base/frameworks/logging/writers/ascii.zeek`。如果您更改了输出格式选项,则请小心检查后处理脚本是否仍能识别您的日志文件。

某些写入器选项是全局的,它们会影响使用该日志写入器的所有日志过滤器。例如,要将所有 ASCII 日志格式更改为 JSON 格式:

redef LogAscii::use_json = T;

一个类似的全局参数是 `logdir`,可以指定一个慕课用于存放输出文件。

redef LogAscii::logdir = output_directory;

两者都可以从命令行单独使用,也可以与其他脚本一起使用:

zeek -r ../test-capture.cap LogAscii::use_json=Tmkdir output_directory ; zeek -r ../test-capture.cap LogAscii::logdir=output_directory

有些写入器参数是过滤器指定的,也就是说它们只影响明确指定选项的那些过滤器。举例来说,下面对输出格式的改变,只影响 `conn.log`:

event zeek_init(){ local f = Log::get_filter(Conn::LOG, "default"); # Use tab-separated-value mode f$config = table(["tsv"] = "T"); Log::add_filter(Conn::LOG, f);}

2.7.2 SQLite Writer

SQLite 是一个简单、基于文件的,广泛使用的 SQL 数据库系统。使用 SQLite 让 Zeek 以更加友好易用的方式与其他应用程序交换和访问数据。由于 SQLite 的事务处理的特性,多个应用程序可以同时使用数据库。Zeek 的输入框架也支持 SQLite 读取器。

> Zeek 框架支持的读取器 Reader,包括:AsciiReader、BenchmarkReader、BinaryReader、ConfigReader、RawReader、SQLiteReader 六种。

所有 Zeek 安装均提供 SQLite 支持,使用 SQLite 无需加载任何附加脚本或做任何编译阶段的配置。将现有日志流数据发送到 SQLite 是相当简单的事情。最有可能的是,您只希望只在选定的几个日志过滤器中把日志写入 SQLite ,因此您必须配置一个使用 SQLite 的写入器。下面的例子代码,演示了如何在连接日志的过滤器中添加 SQLite 支持:

“` sh

event zeek_init(){ local filter: Log::Filter = [ $name="sqlite", $path="/var/db/conn", $config=table(["tablename"] = "conn"), $writer=Log::WRITER_SQLITE ]; Log::add_filter(Conn::LOG, filter);}

“`

Zeek 如果发现数据库文件 ` /var/db/conn.sqlite` 尚不存在,就自动创建,也会当 conn 表不存在时自动创建它,并且开始向表中追加连接信息数据。

Zeek 目前不支持在 SQLite 数据库中轮转,目前轮转特性只适用于 ASCII 写入器。因此,你必须小心在适当的、有足够的存储空间的位置创建数据库表。

如果您动手查看生成的 SQLite 数据库,会发现表的范式(schema)和 ASCII 日志文件中的字段一样:

“` sh

$ sqlite3 /var/db/conn.sqliteSQLite version 3.8.0.2 2013-09-03 17:11:13Enter ".help" for instructionsEnter SQL statements terminated with a ";"sqlite> .schemaCREATE TABLE conn ('ts' double precision,'uid' text,'id.orig_h' text,'id.orig_p' integer,...

“`

请注意,使用上述代码,ASCII 日志 conn.log 仍将创建,因为脚本中只是在默认的 ASCII 写入器旁边添加了新的的日志过滤器。为了防止这种情况,您可以删除默认过滤器:

“` sh

Log::remove_filter(Conn::LOG, "default");

“`

要创建自定义 SQLite 日志文件,您必须创建一个新的日志流,其中仅包含您要提交到数据库的信息。有关如何创建自定义日志流,请参阅上述文档。

2.7.3 None Writer

“空写入器”,可以通过 `Log::WRITER_NONE` 设定,这个写入器很大程度上用于研发和故障排错的助手。空写入器会丢弃掉它收到的所有日志条目,但行为看上去就像框架提供的其它写入器一样。例如,模仿一下日志轮转。如果您通过将 LogNone::debug 设置为 T 来启用其调试模式,Zeek 会在 `stdout` 上报告有关写入器活动的细节。

=== 未完待续 ===

翻译:张建兵

单位:内蒙古巨鹏软件有限公司

《巨鹏软件》是一家以 “IT运维软件研发、时序数据可视化、时序数据可视化研发、网络流量分析、数据库审计”等技术开发为核心的软件产品研发企业。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

(0)
上一篇 2024年8月7日 上午8:19
下一篇 2024年8月7日 上午10:47

相关推荐