Let's NIP-05 scan the #spambots

Better coders can fine-tune this, I suppose

Assumptions and Setup

Nostr Relay: You have a WebSocket connection to a Nostr relay (e.g., wss://relay.damus.io) to fetch events.

NIP-05 Provider Data: You have a database or API with member public keys and their NIP-05 registration timestamps.

Libraries: Uses websocket (from the websocket-client package) for Nostr relay communication, json for event parsing, random for selecting replies, and requests for NIP-05 verification checks.

Database: A simple SQLite database is assumed for storing member data, but you can adapt to your setup (e.g., MySQL, API).

Bot Detection: Basic bot detection checks for identical or nonsensical replies using simple text comparison. You can enhance this with NLP libraries like nltk or textblob for advanced analysis.
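One stdlib-only way to go a step beyond plain text comparison, before reaching for nltk or textblob, is a character-entropy check. The sketch below is only an illustration: the names shannon_entropy and looks_like_noise and the low/high cutoffs are assumptions of mine, not part of the script, and the cutoffs are untuned guesses.

```python
import math
from collections import Counter


def shannon_entropy(text: str) -> float:
    """Bits per character of the character distribution in text."""
    if not text:
        return 0.0
    counts = Counter(text)
    total = len(text)
    return -sum((n / total) * math.log2(n / total) for n in counts.values())


def looks_like_noise(text: str, low: float = 2.0, high: float = 4.8) -> bool:
    """Flag text whose character entropy is unusually low or unusually high.

    low/high are illustrative guesses (assumptions), not tuned values.
    """
    stripped = text.strip()
    if len(stripped) < 10:
        return True  # same "too short" rule as the script's is_nonsense
    entropy = shannon_entropy(stripped.lower())
    return entropy < low or entropy > high


if __name__ == "__main__":
    for sample in ("aaaaaaaaaaaaaaaa",
                   "Thanks for sharing, I had the same issue with my relay last week."):
        print(repr(sample), looks_like_noise(sample))
```

Very repetitive text scores close to 0 bits per character and keyboard-mash strings with many distinct symbols score high, while ordinary sentences tend to sit in between. A check like this could be called from is_nonsense alongside the existing rules.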

Script

import websocket  # provided by the websocket-client package
import json
import time
import random
import sqlite3
from datetime import datetime, timedelta
import requests
import re

# Configuration
RELAY_URL = "wss://relay.damus.io"  # Replace with your relay
DB_PATH = "nip05_members.db"  # SQLite database for NIP-05 members
TIME_WINDOW = 7  # Days to check for recent activity
REPLY_SAMPLE_SIZE = 5  # Number of replies to analyze
SIMILARITY_THRESHOLD = 0.9  # Jaccard similarity threshold for identical replies

# Initialize database
def init_db():
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS members (
            pubkey TEXT PRIMARY KEY,
            nip05_address TEXT,
            registration_timestamp INTEGER
        )
    """)
    # Example: Populate with dummy data (replace with your actual data)
    cursor.execute("INSERT OR IGNORE INTO members VALUES (?, ?, ?)",
                   ("b0635d6a9851d3aed0cd6c495b282167acf761729078d975fc341b22650b07b9",
                    "bob@example.com", int(time.time() - 86400 * 30)))
    conn.commit()
    return conn, cursor

# Connect to Nostr relay
def connect_to_relay():
    ws = websocket.WebSocket()
    ws.connect(RELAY_URL)
    return ws

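# Fetch events from relay
def fetch_events(ws, pubkey, since_timestamp, kind=1):
    subscription_id = f"scan_{pubkey[:8]}"
    event_filter = {  # renamed so it no longer shadows the built-in filter()
        "authors": [pubkey],
        "kinds": [kind],  # kind=1 for posts/replies
        "since": since_timestamp
    }
    ws.send(json.dumps(["REQ", subscription_id, event_filter]))
    events = []
    deadline = time.time() + 10  # 10-second timeout
    while time.time() < deadline:
        # Without a socket timeout recv() blocks forever and the deadline is never checked
        ws.settimeout(max(deadline - time.time(), 0.1))
        try:
            message = json.loads(ws.recv())
        except websocket.WebSocketTimeoutException:
            break
        if message[0] == "EVENT" and message[1] == subscription_id:
            events.append(message[2])
        elif message[0] == "EOSE" and message[1] == subscription_id:
            break  # Relay has sent all stored events for this subscription
    ws.send(json.dumps(["CLOSE", subscription_id]))
    return events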

# Calculate Jaccard similarity between two texts
def jaccard_similarity(text1, text2):
    if not text1 or not text2:
        return 0.0
    set1 = set(re.findall(r'\w+', text1.lower()))
    set2 = set(re.findall(r'\w+', text2.lower()))
    intersection = len(set1 & set2)
    union = len(set1 | set2)
    return intersection / union if union > 0 else 0.0

# Check if text appears nonsensical (basic heuristic)
def is_nonsense(text):
    # Basic check: too short, repetitive characters, or random strings
    if len(text) < 10:
        return True
    if re.match(r'^(.)\1{3,}$', text):  # Repeated characters
        return True
    # Add more checks (e.g., entropy, common spam phrases)
    return False

# Check NIP-05 registration status
def check_nip05_status(pubkey, nip05_address):
    try:
        local_part, domain = nip05_address.split('@')
        url = f"https://{domain}/.well-known/nostr.json?name={local_part}"
        response = requests.get(url, timeout=5)
        if response.status_code == 200:
            data = response.json()
            if data.get("names", {}).get(local_part) == pubkey:
                return True
        return False
    except Exception as e:
        print(f"Error checking NIP-05 for {nip05_address}: {e}")
        return False

# Main scanning function
def scan_for_spam_bots():
    conn, cursor = init_db()
    ws = connect_to_relay()
    # Get all members
    cursor.execute("SELECT pubkey, nip05_address, registration_timestamp FROM members")
    members = cursor.fetchall()
    # Calculate timestamp for 7 days ago
    since_timestamp = int((datetime.now() - timedelta(days=TIME_WINDOW)).timestamp())
    log = []
    for pubkey, nip05_address, reg_timestamp in members:
        print(f"Scanning {nip05_address} ({pubkey[:8]}...)")
        # Step 1: Check for at least one reply in the past 7 days
        events = fetch_events(ws, pubkey, since_timestamp, kind=1)
        # Each tag is a list such as ["e", "<event id>", ...], so look at the
        # first element of every tag; events with an 'e' tag are treated as replies
        replies = [e for e in events
                   if any(tag and tag[0] == "e" for tag in e.get("tags", []))]
        if len(replies) < 1:
            log.append({
                "pubkey": pubkey,
                "nip05_address": nip05_address,
                "status": "No replies in past 7 days",
                "nip05_active": check_nip05_status(pubkey, nip05_address),
                "registration_date": datetime.fromtimestamp(reg_timestamp).isoformat()
            })
            continue
        # Step 2: Analyze up to 5 random replies
        sample_replies = random.sample(replies, min(len(replies), REPLY_SAMPLE_SIZE))
        suspicious = False
        reply_texts = [r["content"] for r in sample_replies]
        # Check for identical or similar replies
        for i, text1 in enumerate(reply_texts):
            for text2 in reply_texts[i+1:]:
                if jaccard_similarity(text1, text2) > SIMILARITY_THRESHOLD:
                    suspicious = True
                    break
            if suspicious:
                break
        # Check for nonsensical replies
        if not suspicious:
            suspicious = any(is_nonsense(text) for text in reply_texts)
        # Step 3: Check NIP-05 status (for logging only)
        nip05_active = check_nip05_status(pubkey, nip05_address)
        # Log results
        log.append({
            "pubkey": pubkey,
            "nip05_address": nip05_address,
            "status": "Suspicious" if suspicious else "Clean",
            "reply_count": len(replies),
            "nip05_active": nip05_active,
            "registration_date": datetime.fromtimestamp(reg_timestamp).isoformat()
        })
    ws.close()
    conn.close()
    # Save log to file
    with open("spam_scan_log.json", "w") as f:
        json.dump(log, f, indent=2)
    return log

# Run the script
if __name__ == "__main__":
    results = scan_for_spam_bots()
    print("Scan complete. Results saved to spam_scan_log.json")
    for result in results:
        print(result)

Discussion

I’ve no idea what that all means, but I suppose it helps getting rid of these spambots.

I'm rather sure this won't work, for multiple reasons:

All relays you connect to must implement this detection (and/or filtering).

NIP-05, as far as my understanding goes, isn't failsafe or intended for this purpose (these are identifiers, not verifiers, and they appear spoofable?).

The bot detection suggested in the script doesn't fit this behavior: the bots mangle the OP in their replies, so you'd need to scan both the OP and all replies to be somewhat successful. That means determining the spam behavior isn't trivial, and it can easily be circumvented: once the spammers notice it stops working, they will change the mangling algorithm.
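To make the "scan both the OP and the reply" point concrete, here is a rough sketch and nothing more. It assumes you have already fetched the parent event, and it guesses that "mangling" means mostly reusing the OP's own words. The function names and the threshold and min_words values are mine, not from the script above. parent_id follows the NIP-10 convention of a marker in the fourth position of an "e" tag, falling back to the last "e" tag when tags are unmarked.

```python
import re


def words(text):
    """Lowercased word tokens of a text."""
    return re.findall(r"\w+", text.lower())


def parent_id(event):
    """Return the ID of the event being replied to, or None if there is no 'e' tag."""
    e_tags = [t for t in event.get("tags", []) if len(t) >= 2 and t[0] == "e"]
    if not e_tags:
        return None
    # Prefer explicitly marked tags: ["e", <id>, <relay>, "reply" | "root"]
    for marker in ("reply", "root"):
        for tag in e_tags:
            if len(tag) >= 4 and tag[3] == marker:
                return tag[1]
    return e_tags[-1][1]  # unmarked tags: fall back to the last one


def containment(op_text, reply_text):
    """Fraction of distinct reply words that also occur in the OP."""
    reply_words = set(words(reply_text))
    if not reply_words:
        return 0.0
    return len(reply_words & set(words(op_text))) / len(reply_words)


def is_mangled_copy(op_text, reply_text, threshold=0.8, min_words=5):
    """Flag replies built almost entirely from the OP's vocabulary.

    threshold and min_words are illustrative guesses (assumptions), not tuned values.
    """
    if len(set(words(reply_text))) < min_words:
        return False  # too little text to judge either way
    return containment(op_text, reply_text) >= threshold


if __name__ == "__main__":
    op = "Nostr relays should filter spam before it reaches clients"
    print(is_mangled_copy(op, "clients reaches spam it before filter should relays Nostr"))
    print(is_mangled_copy(op, "I agree, but who decides what counts as spam on a relay?"))
```

A bot that paraphrases with synonyms instead of shuffling would slip straight past this, which is exactly the circumvention problem described above.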

Maybe detection based on the post's creation time versus the reply's creation time is more efficient, as they appear to reply almost instantly once a post goes up. But again, this is easily circumvented.
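A minimal sketch of that timing idea, assuming you already have (OP event, reply event) pairs for one pubkey. The function names and the 15-second and 3-sample cutoffs are made up for illustration. Note that created_at is set by whoever publishes the event, so a bot can simply backdate or delay its replies.

```python
from statistics import median


def reply_delays(pairs):
    """Seconds between each OP and its reply.

    pairs: iterable of (op_event, reply_event) dicts carrying 'created_at' in unix seconds.
    """
    return [reply["created_at"] - op["created_at"] for op, reply in pairs]


def replies_suspiciously_fast(pairs, max_median_seconds=15, min_samples=3):
    """Flag an account whose median reply delay is very short.

    max_median_seconds and min_samples are illustrative guesses (assumptions).
    """
    # Ignore negative delays: those come from skewed or forged timestamps
    delays = [d for d in reply_delays(pairs) if d >= 0]
    if len(delays) < min_samples:
        return False  # not enough evidence
    return median(delays) <= max_median_seconds


if __name__ == "__main__":
    bot_like = [({"created_at": 1000}, {"created_at": 1002}),
                ({"created_at": 2000}, {"created_at": 2003}),
                ({"created_at": 3000}, {"created_at": 3005})]
    print(replies_suspiciously_fast(bot_like))
```

Using the median rather than the mean keeps one slow reply from hiding a pattern of near-instant ones.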

Unlike with traditional anti-spam measures, which are mostly specific to centralised systems, you can't block a user from creating an account, nor keep a centralised store of blocked IPs or other metadata. Detecting bots the Cloudflare way (proof-of-work browser checks) isn't possible either.

So. That's where my understanding stops. Nor do I have ideas.

#nostr #asknostr might check my claims for correctness tho

makes sense...

Thinking here... there must be something that can be done in a smart way