forked from cloudflare/bpftools
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfilter
More file actions
executable file
·131 lines (102 loc) · 3.09 KB
/
filter
File metadata and controls
executable file
·131 lines (102 loc) · 3.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#!/usr/bin/env python3
from __future__ import print_function

import errno
import getopt
import itertools
import os
import sys
from builtins import map
from builtins import object
from builtins import str

import pcappy
import pcappy.types

import bpftools.utils
def usage():
    """Print the command-line help to stdout and exit with status 2.

    This function never returns: it always ends in ``sys.exit(2)``, both
    when help was requested and when option parsing failed.
    """
    # Raw string so the literal ``\n`` in the tcpdump example is printed
    # as backslash-n rather than as a newline.
    help_text = r"""
filter [ OPTIONS ] [ pcap file... ]
Read pcap data from stdin or given files, run it through a BPF filter
and write matching packets to stdout as pcap.
Options are:
-h, --help print this message
-b, --bytecode filter with given BPF bytecode
-e, --expr filter with given BPF expression
-c, --compile FILE compile given bpf file and use as filter
For example to select only IP packets you can use:
filter -e "ip"
filter -b "4,40 0 0 12,21 0 1 2048,6 0 0 65535,6 0 0 0,"
Where the bytecode might have been generated by tcpdump:
sudo tcpdump -p -n -ddd -i eth0 "ip" |tr "\n" ","
""".lstrip()
    print(help_text)
    sys.exit(2)
def bpf_from_bytecode(bytecode):
    """Build a filter object from comma-separated BPF bytecode text.

    The expected text looks like
    ``"4,40 0 0 12,21 0 1 2048,6 0 0 65535,6 0 0 0,"``: a leading count
    field followed by one ``code jt jf k`` group per instruction. The count
    field is discarded; the number of remaining groups is used instead.
    Up to three missing trailing fields of an instruction are filled with 0.

    Returns a bare holder object whose ``_bpf`` attribute is the populated
    ``pcappy.types.bpf_program``.
    """

    class _Program(object):
        # Plain attribute container.
        # NOTE(review): presumably mimics the ``_bpf`` attribute that pcappy
        # reads from a compiled program when a filter is assigned - confirm.
        pass

    # Trim leading/trailing commas, then skip the first (count) field.
    insn_texts = bytecode.strip(",").split(",")[1:]
    count = len(insn_texts)

    holder = _Program()
    holder._bpf = program = pcappy.types.bpf_program()
    program.bf_len = count
    # The instruction array is allocated with 10 slots more than needed.
    program.bf_insns = (pcappy.types.bpf_insn * (count + 10))()

    for idx, text in enumerate(insn_texts):
        insn = program.bf_insns[idx]
        # Convert every field to int, padding with zeros, and keep the
        # first four as (code, jt, jf, k).
        fields = [int(tok) for tok in itertools.chain(text.split(), (0, 0, 0))]
        insn.code, insn.jt, insn.jf, insn.k = fields[:4]

    return holder
def bpf_from_expr(expr):
    """Compile a BPF filter expression into a ``pcappy.PcapPyBpfProgram``.

    The program is built for the Ethernet link type with a snaplen of 65536.
    """
    # NOTE(review): the positional 0 and "0.0.0.0" are presumably the
    # optimize flag and netmask - confirm against pcappy's signature.
    compile_options = {
        "linktype": pcappy.LINKTYPE_ETHERNET,
        "snaplen": 65536,
    }
    return pcappy.PcapPyBpfProgram(expr, 0, "0.0.0.0", **compile_options)
def main():
    """Parse the command line and copy matching packets to stdout as pcap.

    Options ``-e``, ``-b`` and ``-c`` each build a BPF filter; when several
    are given, the last one wins. With no filter option every packet is
    copied. Input is read from the files named on the command line, or from
    stdin when none are given.

    This function does not return: on success it terminates the process
    with ``os._exit(0)``; on a bad option or ``-h`` it exits through
    ``usage()``.
    """
    bpf = None

    try:
        opts, args = getopt.getopt(
            sys.argv[1:], "he:b:c:", ["help", "expr=", "bytecode=", "compile="]
        )
    except getopt.GetoptError as err:
        # stdout carries the pcap stream, so the diagnostic goes to stderr.
        print(str(err), file=sys.stderr)
        usage()

    for o, a in opts:
        if o in ("-h", "--help"):
            # usage() terminates the process itself.
            usage()
        elif o in ("-e", "--expr"):
            bpf = bpf_from_expr(a)
        elif o in ("-b", "--bytecode"):
            bpf = bpf_from_bytecode(a)
        elif o in ("-c", "--compile"):
            # Opening first surfaces an unreadable file before compiling;
            # the context manager makes sure the handle is closed again.
            with open(a, "rb") as source:
                program_text = source.read()
            bpf = bpf_from_bytecode(bpftools.utils.bpf_compile(program_text))
        else:
            # getopt only yields declared options, so this is a programming
            # error; raise explicitly so it survives ``python -O``.
            raise AssertionError("unhandled option")

    # All inputs are opened up front, so a missing file fails before any
    # packet has been written.
    if not args:
        readfds = [sys.stdin]
    else:
        readfds = [open(fname, "rb") for fname in args]

    dump = pcappy.PcapPyDead(snaplen=65536).dump_open(sys.stdout)

    for fd in readfds:
        p = pcappy.open_offline(fd)
        if bpf:
            p.filter = bpf

        while True:
            try:
                r = p.next_ex()
            except pcappy.PcapPyException:
                # Best effort: a read error ends this input silently.
                break
            if r is None:
                break
            hdr, data = r
            dump.write(hdr, data)

    # os._exit() skips the interpreter's normal cleanup, so flush explicitly.
    sys.stdout.flush()
    dump.flush()

    # normal exit crashes due to a double free error in pcappy
    os._exit(0)
if __name__ == "__main__":
    try:
        main()
    except IOError as e:
        # A broken pipe (the reader of our stdout went away) is not worth a
        # traceback: leave immediately. Any other I/O error is re-raised.
        if e.errno == errno.EPIPE:
            os._exit(-1)
        raise
    except KeyboardInterrupt:
        # Interrupted by the user: leave immediately without a traceback.
        os._exit(-1)