Skip to content

Commit

Permalink
Fix time generation in tests
Browse files Browse the repository at this point in the history
* Perform all time operations from within each test; issue surfaced when tests were run in parallel
  • Loading branch information
Mark Bonsack committed Feb 21, 2020
1 parent 175855a commit 146c12b
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 84 deletions.
58 changes: 28 additions & 30 deletions tests/test_cisco_acs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,42 @@
def insert_char(string, char, integer):
return string[0:integer] + char + string[integer:]

# Generate current datetime structure
dt = datetime.datetime.now()
def time_operations(dt):
# Generate an ISO 8601 (RFC 5424) compliant timestamp with local timezone offset (2020-02-12T12:46:39.323-08:00)
# See https://stackoverflow.com/questions/2150739/iso-time-iso-8601-in-python
iso = dt.astimezone().isoformat(sep='T', timespec='milliseconds')
# Generate an BSD-style (RFC 3164) compliant timestamp with no timezone (Oct 25 13:08:00)
bsd = dt.strftime("%b %d %H:%M:%S")

# Generate an ISO 8601 (RFC 5424) compliant timestamp with local timezone offset (2020-02-12T12:46:39.323-08:00)
# See https://stackoverflow.com/questions/2150739/iso-time-iso-8601-in-python
iso = dt.astimezone().isoformat(sep='T', timespec='milliseconds')
# Other variants of timestamps needed for this log sample
time = dt.strftime("%H:%M:%S.%f")[:-3]
date = dt.strftime("%Y-%m-%d")
# Insert colon in tzoffset string; normally just 'tzoffset = dt.astimezone().strftime("%z")'
# Could use helper function above; e.g. 'tzoffset = insert_char(dt.astimezone().strftime("%z"), ":", 3)'
tzoffset = dt.astimezone().strftime("%z")[0:3] + ":" + dt.astimezone().strftime("%z")[3:]

# Generate an BSD-style (RFC 3164) compliant timestamp with no timezone (Oct 25 13:08:00)
bsd = dt.strftime("%b %d %H:%M:%S")
# Derive epoch timestamp for use in search string
# NOTE: There are caveats with 'strftime("%s")', see references below

# Other variants of timestamps needed for this log sample
time = dt.strftime("%H:%M:%S.%f")[:-3]
date = dt.strftime("%Y-%m-%d")
# Insert colon in tzoffset string; normally just 'tzoffset = dt.astimezone().strftime("%z")'
# Could use helper function above; e.g. 'tzoffset = insert_char(dt.astimezone().strftime("%z"), ":", 3)'
tzoffset = dt.astimezone().strftime("%z")[0:3] + ":" + dt.astimezone().strftime("%z")[3:]
# See https://stackoverflow.com/questions/11743019/convert-python-datetime-to-epoch-with-strftime
# See https://docs.python.org/3/library/datetime.html#datetime-objects
# Basically: Don't use 'utcnow()'

# Derive epoch timestamp for use in search string
# NOTE: There are caveats with 'strftime("%s")', see references below
# Strict way to get epoch as a string (rather than float) avoiding naive objects
# epoch = dt.fromtimestamp(dt.timestamp()).strftime('%s')

# See https://stackoverflow.com/questions/11743019/convert-python-datetime-to-epoch-with-strftime
# See https://docs.python.org/3/library/datetime.html#datetime-objects
# Basically: Don't use 'utcnow()'
# Since datetime.now().astimezone() is aware, strftime() should be safe and form below OK
# Trim last 3 digits of microsecond resolution to obtain milliseconds
epoch = dt.astimezone().strftime("%s.%f")[:-3]

# Strict way to get epoch as a string (rather than float) avoiding naive objects
# epoch = dt.fromtimestamp(dt.timestamp()).strftime('%s')

# Since datetime.now().astimezone() is aware, strftime() should be safe and form below OK
# Trim last 3 digits of microsecond resolution to obtain milliseconds
epoch = dt.astimezone().strftime("%s.%f")[:-3]
return iso, bsd, time, date, tzoffset, epoch

def test_cisco_acs_single(record_property, setup_wordlist, setup_splunk, setup_sc4s):
host = "{}-{}".format(random.choice(setup_wordlist), random.choice(setup_wordlist))

dt = datetime.datetime.now()
iso, bsd, time, date, tzoffset, epoch = time_operations(dt)

mt = env.from_string(
"{{ mark }} {{ bsd }} {{ host }} CSCOacs_Passed_Authentications 0765855540 1 0 {{ date }} {{ time }} {{ tzoffset }} 0178632943 5202 NOTICE Device-Administration: Command Authorization succeeded, ACSVersion=acs-5.8.1.4-B.462.x86_64, ConfigVersionId=16489, Device IP Address=10.0.0.93, DestinationIPAddress=10.0.0.10, DestinationPort=49, UserName=nsdevman, CmdSet=[ CmdAV=show CmdArgAV=vpn-sessiondb CmdArgAV=full CmdArgAV=ra-ikev2-ipsec ], Protocol=Tacacs, MatchedCommandSet=fw3, RequestLatency=11, Type=Authorization, Privilege-Level=15, Authen-Type=ASCII, Service=None, User=nsdevman, Port=443, Remote-Address=10.0.0.15, Authen-Method=TacacsPlus, Service-Argument=shell, AcsSessionID=mnsvdcfpiuac03/359448835/9871764, AuthenticationIdentityStore=AD1, AuthenticationMethod=Lookup, SelectedAccessService=Default Device Admin, SelectedCommandSet=fw3, IdentityGroup=IdentityGroup:All Groups:SystemID, Step=13005 , Step=15008 , Step=15004 , Step=15012 , Step=15041 , Step=15004 , Step=15013 , Step=24210 , Step=24212 , Step=24432 , Step=24325 , Step=24313 , Step=24319 , Step=24323 , Step=24420 , Step=24355 , Step=24416 , Step=22037 , Step=15044 , Step=15035 , Step=15042 , Step=15036 , Step=15004 , Step=15018 , Step=13024 , Step=13034 , SelectedAuthenticationIdentityStores=Internal Users, NetworkDeviceName=devicenamehere, NetworkDeviceGroups=Device Type:All Device Types:Firewall:Cisco Systems:Firewall:ASA5545, NetworkDeviceGroups=Location:All Locations:MN, ServiceSelectionMatchedRule=TACACS, IdentityPolicyMatchedRule=Firewall, AuthorizationPolicyMatchedRule=nsdevman, AD-User-Candidate-Identities=nsdevman@ent.example.corp, AD-User-DNS-Domain=ent.example.corp, AD-User-NetBios-Name=AD-ENT, AD-User-Resolved-Identities=nsdevman@ent.example.corp, AD-User-Join-Point=ENT.example.CORP, AD-User-Resolved-DNs=CN=nsdevman\,OU=Service Accounts\,OU=CAO\,OU=ENT\,DC=ent\,DC=wfb\,DC=example\,DC=corp, StepData=10=nsdevman, StepData=11=ent.example.corp, StepData=12=example.corp, StepData=15=ent.example.corp, AD-Domain=ent.example.corp, IdentityAccessRestricted=false, UserIdentityGroup=IdentityGroup:All Groups:SystemID, Cisco-Firewall=Superuser, Firewall=Superuser, NetSec-CSM=User, NetSec-Logging=Engineer, Response={Type=Authorization; Author-Reply-Status=PassAdd; ExternalIdentityStoreName=AD1; }\n")
message = mt.render(mark="<165>", bsd=bsd, host=host, date=date, time=time, tzoffset=tzoffset)
Expand All @@ -72,12 +74,8 @@ def test_cisco_acs_single(record_property, setup_wordlist, setup_splunk, setup_s
def test_cisco_acs_multi(record_property, setup_wordlist, setup_splunk, setup_sc4s):
host = "{}-{}".format(random.choice(setup_wordlist), random.choice(setup_wordlist))

# Generate an ISO 8601 compliant timestamp with local timezone offset (2020-02-12 12:46:39.323-08:00)
dt = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).astimezone().isoformat(sep=' ', timespec='milliseconds')

# Function to insert a space between the time and TZ offset (2020-02-12 12:46:39.323 -08:00)
def insert_space(string, integer):
return string[0:integer] + ' ' + string[integer:]
dt = datetime.datetime.now()
iso, bsd, time, date, tzoffset, epoch = time_operations(dt)

mt = env.from_string(
"{{ mark }} {{ bsd }} {{ host }} CSCOacs_Passed_Authentications 0000000002 2 0 {{ date }} {{ time }} {{ tzoffset }} 0000008450 5203 NOTICE Device-Administration: Session Authorization succeeded, ACSVersion=acs-5.2.0.26-B.3075, ConfigVersionId=117, Device IP Address=192.168.26.137, UserName=edward, CmdSet=[ CmdAV= ], Protocol=Tacacs, RequestLatency=10, NetworkDeviceName=switch, Type=Authorization, Privilege-Level=1, Authen-Type=ASCII, Service=Login, User=edward, Port=tty2, Remote-Address=10.78.167.190, Authen-Method=TacacsPlus, Service-Argument=shell, AcsSessionID=ACS41/101085887/112, AuthenticationIdentityStore=Internal Users, AuthenticationMethod=Lookup, SelectedAccessService=Default Device Admin, SelectedShellProfile=Permit Access, IdentityGroup=IdentityGroup:All Groups, Step=13005 , Step=15008 , Step=15004 , Step=15012 , Step=15041 , Step=15006 , Step=15013 , Step=24210 , Step=24212 , Step=22037 , Step=15044 , Step=15035 , Step=15042 , Step=15036 , Step=15004 , Step=15017 , Step=13034 ,\n")
Expand Down
60 changes: 34 additions & 26 deletions tests/test_fortinet_ngfw.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,42 +23,44 @@ def insert_char(string, char, integer):
def removeZero(tz):
return re.sub(r'\b0+(\d)(?=:)', r'\1', tz)

# Generate current datetime structure
dt = datetime.datetime.now()
def time_operations(dt):
# Generate an ISO 8601 (RFC 5424) compliant timestamp with local timezone offset (2020-02-12T12:46:39.323-08:00)
# See https://stackoverflow.com/questions/2150739/iso-time-iso-8601-in-python
iso = dt.astimezone().isoformat(sep='T', timespec='milliseconds')
# Generate an BSD-style (RFC 3164) compliant timestamp with no timezone (Oct 25 13:08:00)
bsd = dt.strftime("%b %d %H:%M:%S")

# Generate an ISO 8601 (RFC 5424) compliant timestamp with local timezone offset (2020-02-12T12:46:39.323-08:00)
# See https://stackoverflow.com/questions/2150739/iso-time-iso-8601-in-python
iso = dt.astimezone().isoformat(sep='T', timespec='milliseconds')
# Generate an BSD-style (RFC 3164) compliant timestamp with no timezone (Oct 25 13:08:00)
bsd = dt.strftime("%b %d %H:%M:%S")
# Other variants of timestamps needed for this log sample
time = dt.strftime("%H:%M:%S")
date = dt.strftime("%Y-%m-%d")
# Insert colon in tzoffset string; normally just 'tzoffset = dt.astimezone().strftime("%z")'
# Could use helper function above; e.g. 'tzoffset = insert_char(dt.astimezone().strftime("%z"), ":", 3)'
tzoffset = dt.astimezone().strftime("%z")

# Other variants of timestamps needed for this log sample
time = dt.strftime("%H:%M:%S")
date = dt.strftime("%Y-%m-%d")
# Insert colon in tzoffset string; normally just 'tzoffset = dt.astimezone().strftime("%z")'
# Could use helper function above; e.g. 'tzoffset = insert_char(dt.astimezone().strftime("%z"), ":", 3)'
# tzoffset = dt.astimezone().strftime("%z")[0:3] + ":" + dt.astimezone().strftime("%z")[3:]
tzoffset = dt.astimezone().strftime("%z")
# Derive epoch timestamp for use in search string
# NOTE: There are caveats with 'strftime("%s")', see references below

# Derive epoch timestamp for use in search string
# NOTE: There are caveats with 'strftime("%s")', see references below
# See https://stackoverflow.com/questions/11743019/convert-python-datetime-to-epoch-with-strftime
# See https://docs.python.org/3/library/datetime.html#datetime-objects
# Basically: Don't use 'utcnow()'

# See https://stackoverflow.com/questions/11743019/convert-python-datetime-to-epoch-with-strftime
# See https://docs.python.org/3/library/datetime.html#datetime-objects
# Basically: Don't use 'utcnow()'
# Strict way to get epoch as a string (rather than float) avoiding naive objects
# epoch = dt.fromtimestamp(dt.timestamp()).strftime('%s')

# Strict way to get epoch as a string (rather than float) avoiding naive objects
# epoch = dt.fromtimestamp(dt.timestamp()).strftime('%s')
# Since datetime.now().astimezone() is aware, strftime() should be safe and form below OK
# Trim last 3 digits of microsecond resolution to obtain milliseconds
epoch = dt.astimezone().strftime("%s.%f")[:-7]

# Since datetime.now().astimezone() is aware, strftime() should be safe and form below OK
# Trim last 3 or 7 digits of microsecond resolution to obtain milliseconds or whole seconds respectively
epoch = dt.astimezone().strftime("%s.%f")[:-7]
return iso, bsd, time, date, tzoffset, epoch

# <111> Aug 17 00:00:00 fortigate date=2015-08-11 time=19:19:43 devname=Nosey devid=FG800C3912801080 logid=0004000017 type=traffic subtype=sniffer level=notice vd=root srcip=fe80::20c:29ff:fe77:20d4 srcintf="port3" dstip=ff02::1:ff77:20d4 dstintf="port3" sessionid=408903 proto=58 action=accept policyid=2 dstcountry="Reserved" srccountry="Reserved" trandisp=snat transip=:: transport=0 service="icmp6/131/0" duration=36 sentbyte=0 rcvdbyte=40 sentpkt=0 rcvdpkt=0 appid=16321 app="IPv6.ICMP" appcat="Network.Service" apprisk=elevated applist="sniffer-profile" appact=detected utmaction=allow countapp=1
def test_fortinet_fgt_event(record_property, setup_wordlist, setup_splunk, setup_sc4s):
host = "{}-{}".format(random.choice(setup_wordlist),
random.choice(setup_wordlist))

dt = datetime.datetime.now()
iso, bsd, time, date, tzoffset, epoch = time_operations(dt)

mt = env.from_string(
# "{{ mark }} {{ bsd }} fortigate date={{ date }} time={{ time }} devname={{ host }} devid=FGT60D4614044725 logid=0100040704 type=event subtype=system level=notice tz=\"{{ tzoffset }}\" vd=root logdesc=\"System performance statistics\" action=\"perf-stats\" cpu=2 mem=35 totalsession=61 disk=2 bandwidth=158/138 setuprate=2 disklograte=0 fazlograte=0 msg=\"Performance statistics: average CPU: 2, memory: 35, concurrent sessions: 61, setup-rate: 2\"\n")
"{{ mark }} {{ bsd }} fortigate date={{ date }} time={{ time }} devname={{ host }} devid=FGT60D4614044725 logid=0100040704 type=event subtype=system level=notice vd=root logdesc=\"System performance statistics\" action=\"perf-stats\" cpu=2 mem=35 totalsession=61 disk=2 bandwidth=158/138 setuprate=2 disklograte=0 fazlograte=0 msg=\"Performance statistics: average CPU: 2, memory: 35, concurrent sessions: 61, setup-rate: 2\"\n")
Expand All @@ -67,8 +69,7 @@ def test_fortinet_fgt_event(record_property, setup_wordlist, setup_splunk, setup
sendsingle(message, setup_sc4s[0], setup_sc4s[1][514])

st = env.from_string(
# "search _time={{ epoch }} index=netops host=\"{{ host }}\" sourcetype=\"fgt_event\"")
"search index=netops host=\"{{ host }}\" sourcetype=\"fgt_event\"")
"search _time={{ epoch }} index=netops host=\"{{ host }}\" sourcetype=\"fgt_event\"")
search = st.render(host=host, epoch=epoch)

resultCount, eventCount = splunk_single(setup_splunk, search)
Expand All @@ -84,6 +85,9 @@ def test_fortinet_fgt_traffic(record_property, setup_wordlist, setup_splunk, set
host = "{}-{}".format(random.choice(setup_wordlist),
random.choice(setup_wordlist))

dt = datetime.datetime.now()
iso, bsd, time, date, tzoffset, epoch = time_operations(dt)

mt = env.from_string(
"{{ mark }} {{ bsd }} fortigate date={{ date }} time={{ time }} devname={{ host }} devid=FG800C3912801080 logid=0004000017 type=traffic subtype=sniffer level=notice vd=root srcip=fe80::20c:29ff:fe77:20d4 srcintf=\"port3\" dstip=ff02::1:ff77:20d4 dstintf=\"port3\" sessionid=408903 proto=58 action=accept policyid=2 dstcountry=\"Reserved\" srccountry=\"Reserved\" trandisp=snat transip=:: transport=0 service=\"icmp6/131/0\" duration=36 sentbyte=0 rcvdbyte=40 sentpkt=0 rcvdpkt=0 appid=16321 app=\"IPv6.ICMP\" appcat=\"Network.Service\" apprisk=elevated applist=\"sniffer-profile\" appact=detected utmaction=allow countapp=1\n")
message = mt.render(mark="<111>", bsd=bsd, date=date, time=time, host=host)
Expand All @@ -106,6 +110,9 @@ def test_fortinet_fgt_utm(record_property, setup_wordlist, setup_splunk, setup_s
host = "{}-{}".format(random.choice(setup_wordlist),
random.choice(setup_wordlist))

dt = datetime.datetime.now()
iso, bsd, time, date, tzoffset, epoch = time_operations(dt)

mt = env.from_string(
"{{ mark }} {{ bsd }} fortigate date={{ date }} time={{ time }} devname={{ host }} devid=FGT37D4613800138 logid=0317013312 type=utm subtype=webfilter eventtype=ftgd_allow level=notice vd=root sessionid=1490845588 user=\"\" srcip=172.30.16.119 srcport=53235 srcintf=\"Internal\" dstip=114.112.67.75 dstport=80 dstintf=\"External-SDC\" proto=6 service=HTTP hostname=\"popo.wan.ijinshan.com\" profile=\"scan\" action=passthrough reqtype=direct url=\"/popo/launch?c=cHA9d29vZHMxOTgyQGhvdG1haWwuY29tJnV1aWQ9NDBiNDkyZDRmNzdhNjFmOTNlMjQwMjhiYjE3ZGRlYTYmY29tcGl\" sentbyte=525 rcvdbyte=325 direction=outgoing msg=\"URL belongs to an allowed category in policy\" method=domain cat=52 catdesc=\"Information Technology\"\n")
message = mt.render(mark="<111>", bsd=bsd, date=date, time=time, host=host)
Expand All @@ -122,3 +129,4 @@ def test_fortinet_fgt_utm(record_property, setup_wordlist, setup_splunk, setup_s
record_property("message", message)

assert resultCount == 1

Loading

0 comments on commit 146c12b

Please sign in to comment.