Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ip2location.io module. #1815

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ IBAN Number Extractor|Identify International Bank Account Numbers (IBANs) in any
[IntelligenceX](https://intelx.io/)|Obtain information from IntelligenceX about identified IP addresses, domains, e-mail addresses and phone numbers.|Tiered API
Interesting File Finder|Identifies potential files of interest, e.g. office documents, zip files.|Internal
[Internet Storm Center](https://isc.sans.edu)|Check if an IP address is malicious according to SANS ISC.|Free API
[ip2location.io](https://www.ip2location.io/)|Queries ip2location.io to identify geolocation of IP Addresses using ip2location.io API|Tiered API
[ipapi.co](https://ipapi.co/)|Queries ipapi.co to identify geolocation of IP Addresses using ipapi.co API|Tiered API
[ipapi.com](https://ipapi.com/)|Queries ipapi.com to identify geolocation of IP Addresses using ipapi.com API|Tiered API
[IPInfo.io](https://ipinfo.io)|Identifies the physical location of IP addresses identified using ipinfo.io.|Tiered API
Expand Down
137 changes: 137 additions & 0 deletions modules/sfp_ip2locationio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# -*- coding: utf-8 -*-
# -------------------------------------------------------------------------------
# Name: sfp_ip2locationio
# Purpose: SpiderFoot plug-in to identify the Geo-location of IP addresses
# identified by other modules using ip2location.io
#
# Author: IP2Location <[email protected]>
#
# Created: 25/10/2023
# Copyright: (c) Steve Micallef
# Licence: GPL
# -------------------------------------------------------------------------------

import json
import time

from spiderfoot import SpiderFootEvent, SpiderFootPlugin


class sfp_ip2locationio(SpiderFootPlugin):

meta = {
'name': "ip2location.io",
'summary': "Queries ip2location.io to identify geolocation of IP Addresses using ip2location.io API",
'flags': ["apikey"],
'useCases': ["Footprint", "Investigate", "Passive"],
'categories': ["Real World"],
'dataSource': {
'website': "https://www.ip2location.io/",
'model': "FREE_AUTH_LIMITED",
'references': [
"https://www.ip2location.io/ip2location-documentation"
],
'apiKeyInstructions': [
"Visit https://www.ip2location.io/",
"Register a free account",
"Login from https://www.ip2location.io/log-in and go to your dashboard",
"Your API Key will be listed under API Key section.",
],
'favIcon': "https://www.ip2location.io/favicon.ico",
'logo': "https://cdn.ip2location.io/assets/img/icons/apple-touch-icon.png",
'description': "IP2Location.io provides a fast and accurate IP Geolocation API tool "
"to determine a user's location and use the geolocation information in different use cases. "
}
}

# Default options
opts = {
'api_key': '',
}

# Option descriptions
optdescs = {
'api_key': "ip2location.io API Key.",
}

results = None

def setup(self, sfc, userOpts=dict()):
self.sf = sfc
self.results = self.tempStorage()

for opt in list(userOpts.keys()):
self.opts[opt] = userOpts[opt]

# What events is this module interested in for input
def watchedEvents(self):
return [
"IP_ADDRESS",
"IPV6_ADDRESS"
]

# What events this module produces
# This is to support the end user in selecting modules based on events
# produced.
def producedEvents(self):
return [
"GEOINFO",
"RAW_RIR_DATA"
]

def query(self, qry):
queryString = f"https://api.ip2location.io/?key={self.opts['api_key']}&ip={qry}"

res = self.sf.fetchUrl(queryString,
timeout=self.opts['_fetchtimeout'],
useragent=self.opts['_useragent'])
time.sleep(1.5)

if ('error' in res):
self.info(f"No ip2locationio data found for {qry}")
return None
try:
return json.loads(res['content'])
except Exception as e:
self.debug(f"Error processing JSON response: {e}")
return None

# Handle events sent to this module
def handleEvent(self, event):
eventName = event.eventType
srcModuleName = event.module
eventData = event.data

self.debug(f"Received event, {eventName}, from {srcModuleName}")

if self.errorState:
return

if self.opts['api_key'] == "":
self.error("You enabled sfp_ip2locationio but did not set an API key!")
self.errorState = True
return

if eventData in self.results:
self.debug(f"Skipping {eventData}, already checked.")
return

self.results[eventData] = True

data = self.query(eventData)
if not data:
return

if data.get('country_name'):
location = ', '.join(filter(None, [data.get('city_name'), data.get('region_name'), data.get('country_name'), data.get('country_code')]))
evt = SpiderFootEvent('GEOINFO', location, self.__name__, event)
self.notifyListeners(evt)

if data.get('latitude') and data.get('longitude'):
evt = SpiderFootEvent("PHYSICAL_COORDINATES", f"{data.get('latitude')}, {data.get('longitude')}", self.__name__, event)
self.notifyListeners(evt)

evt = SpiderFootEvent('RAW_RIR_DATA', str(data), self.__name__, event)
self.notifyListeners(evt)

# End of sfp_ip2locationio class
60 changes: 60 additions & 0 deletions test/unit/modules/test_sfp_ip2locationio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# test_sfp_template.py
import pytest
import unittest

from modules.sfp_ip2locationio import sfp_ip2locationio
from sflib import SpiderFoot
from spiderfoot import SpiderFootEvent, SpiderFootTarget


@pytest.mark.usefixtures
class TestModuletemplate(unittest.TestCase):
"""
Test modules.sfp_ip2locationio
"""

def test_opts(self):
module = sfp_ip2locationio()
self.assertEqual(len(module.opts), len(module.optdescs))

def test_setup(self):
"""
Test setup(self, sfc, userOpts=dict())
"""
sf = SpiderFoot(self.default_options)

module = sfp_ip2locationio()
module.setup(sf, dict())

def test_watchedEvents_should_return_list(self):
module = sfp_ip2locationio()
self.assertIsInstance(module.watchedEvents(), list)

def test_producedEvents_should_return_list(self):
module = sfp_ip2locationio()
self.assertIsInstance(module.producedEvents(), list)

def test_handleEvent_no_api_key_should_set_errorState(self):
"""
Test handleEvent(self, event)
"""
sf = SpiderFoot(self.default_options)

module = sfp_ip2locationio()
module.setup(sf, dict())

target_value = 'example target value'
target_type = 'IP_ADDRESS'
target = SpiderFootTarget(target_value, target_type)
module.setTarget(target)

event_type = 'ROOT'
event_data = 'example data'
event_module = ''
source_event = ''
evt = SpiderFootEvent(event_type, event_data, event_module, source_event)

result = module.handleEvent(evt)

self.assertIsNone(result)
self.assertTrue(module.errorState)