API / main.py
GitHub deploy: aa2653d04823285173801fcd37db6a5d4712cab2
import ipaddress
import maxminddb
from fastapi import FastAPI, Request
import json
import datetime
import logging
import sys
import os
from logging.handlers import RotatingFileHandler
LOG_FILE = os.path.join('/code', 'ip_query.log')
formatter = logging.Formatter('%(message)s')
log_handler = RotatingFileHandler(
logger = logging.getLogger('ip_query')
startup_log = {
"时间": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"事件": "系统启动",
"状态": "成功"
logger.info(json.dumps(startup_log, ensure_ascii=False))
except Exception as e:
print(f"日志初始化失败: {e}")
city_reader = maxminddb.open_database('GeoLite2-City.mmdb')
asn_reader = maxminddb.open_database('GeoLite2-ASN.mmdb')
cn_reader = maxminddb.open_database('GeoCN.mmdb')
lang = ["zh-CN", "en"]
asn_map = {
9812: "东方有线",
9389: "中国长城",
17962: "天威视讯",
17429: "歌华有线",
7497: "科技网",
24139: "华数",
9801: "中关村",
4538: "教育网",
24151: "CNNIC",
38019: "中国移动", 139080: "中国移动", 9808: "中国移动", 24400: "中国移动", 134810: "中国移动", 24547: "中国移动",
56040: "中国移动", 56041: "中国移动", 56042: "中国移动", 56044: "中国移动", 132525: "中国移动", 56046: "中国移动",
56047: "中国移动", 56048: "中国移动", 59257: "中国移动", 24444: "中国移动",
24445: "中国移动", 137872: "中国移动", 9231: "中国移动", 58453: "中国移动",
4134: "中国电信", 4812: "中国电信", 23724: "中国电信", 136188: "中国电信", 137693: "中国电信", 17638: "中国电信",
140553: "中国电信", 4847: "中国电信", 140061: "中国电信", 136195: "中国电信", 17799: "中国电信", 139018: "中国电信",
134764: "中国电信", 4837: "中国联通", 4808: "中国联通", 134542: "中国联通", 134543: "中国联通",
59019: "金山云",
135377: "优刻云",
45062: "网易云",
37963: "阿里云", 45102: "阿里云国际",
45090: "腾讯云", 132203: "腾讯云国际",
55967: "百度云", 38365: "百度云",
58519: "华为云", 55990: "华为云", 136907: "华为云",
4609: "澳門電訊",
13335: "Cloudflare",
55960: "亚马逊云", 14618: "亚马逊云", 16509: "亚马逊云",
15169: "谷歌云", 396982: "谷歌云", 36492: "谷歌云",
def get_as_info(number):
r = asn_map.get(number)
if r:
return r
def get_des(d):
for i in lang:
if i in d['names']:
return d['names'][i]
return d['names']['en']
def get_country(d):
r = get_des(d)
if r in ["香港", "澳门", "台湾"]:
return "中国" + r
return r
def province_match(s):
arr = ['内蒙古', '黑龙江', '河北', '山西', '吉林', '辽宁', '江苏', '浙江', '安徽', '福建', '江西', '山东', '河南', '湖北', '湖南', '广东', '海南', '四川', '贵州', '云南', '陕西', '甘肃', '青海', '广西', '西藏', '宁夏', '新疆', '北京', '天津', '上海', '重庆']
for i in arr:
if i in s:
return i
return ''
def de_duplicate(regions):
regions = filter(bool, regions)
ret = []
[ret.append(i) for i in regions if i not in ret]
return ret
def get_addr(ip, mask):
network = ipaddress.ip_network(f"{ip}/{mask}", strict=False)
first_ip = network.network_address
return f"{first_ip}/{mask}"
def get_maxmind(ip: str):
ret = {"ip": ip}
asn_info = asn_reader.get(ip)
if asn_info:
as_ = {"number": asn_info["autonomous_system_number"], "name": asn_info["autonomous_system_organization"]}
info = get_as_info(as_["number"])
if info:
as_["info"] = info
ret["as"] = as_
city_info, prefix = city_reader.get_with_prefix_len(ip)
ret["addr"] = get_addr(ip, prefix)
if not city_info:
return ret
if "location" in city_info:
location = city_info["location"]
ret["location"] = {
"latitude": location.get("latitude"),
"longitude": location.get("longitude")
if "country" in city_info:
country_code = city_info["country"]["iso_code"]
country_name = get_country(city_info["country"])
ret["country"] = {"code": country_code, "name": country_name}
if "registered_country" in city_info:
registered_country_code = city_info["registered_country"]["iso_code"]
ret["registered_country"] = {"code": registered_country_code, "name": get_country(city_info["registered_country"])}
regions = [get_des(i) for i in city_info.get('subdivisions', [])]
if "city" in city_info:
c = get_des(city_info["city"])
if (not regions or c not in regions[-1]) and c not in country_name:
regions = de_duplicate(regions)
if regions:
ret["regions"] = regions
return ret
def get_cn(ip: str, info={}):
ret, prefix = cn_reader.get_with_prefix_len(ip)
if not ret:
info["addr"] = get_addr(ip, prefix)
regions = de_duplicate([ret["province"], ret["city"], ret["districts"]])
if regions:
info["regions"] = regions
info["regions_short"] = de_duplicate([province_match(ret["province"]), ret["city"].replace('市', ''), ret["districts"]])
if "as" not in info:
info["as"] = {}
info["as"]["info"] = ret['isp']
if ret['net']:
info["type"] = ret['net']
return ret
def get_ip_info(ip):
info = get_maxmind(ip)
if "country" in info and info["country"]["code"] == "CN" and ("registered_country" not in info or info["registered_country"]["code"] == "CN"):
get_cn(ip, info)
return info
def query():
while True:
ip = input('IP: \t').strip()
info = get_ip_info(ip)
if "location" in info:
print(f"经纬度:\t{info['location']['latitude']}, {info['location']['longitude']}")
if "as" in info:
print(f"ISP:\t", end=' ')
if "info" in info["as"]:
print(info["as"]["info"], end=' ')
print(info["as"]["name"], end=' ')
if "type" in info:
print(f"({info['type']})", end=' ')
print(f"ASN{info['as']['number']}", end=' ')
if "registered_country" in info and ("country" not in info or info["country"]["code"] != info["registered_country"]["code"]):
if "country" in info:
if "regions" in info:
print(f"位置: \t{' '.join(info['regions'])}")
except Exception as e:
raise e
app = FastAPI()
async def api(request: Request, ip: str = None):
client_ip = request.headers.get("x-forwarded-for") or request.headers.get("x-real-ip") or request.client.host
query_ip = ip.strip() if ip else client_ip
result = get_ip_info(query_ip)
log_data = {
"时间": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"访问IP": client_ip,
"查询IP": query_ip,
"请求头": dict(request.headers),
"查询结果": result
logger.info(json.dumps(log_data, ensure_ascii=False))
return result
async def path_api(request: Request, ip: str):
client_ip = request.headers.get("x-forwarded-for") or request.headers.get("x-real-ip") or request.client.host
result = get_ip_info(ip)
log_data = {
"时间": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"访问IP": client_ip,
"查询IP": ip,
"请求头": dict(request.headers),
"查询结果": result
logger.info(json.dumps(log_data, ensure_ascii=False))
return result
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host="", port=8080, server_header=False, proxy_headers=True)