import time
from functools import wraps
import numpy as np
import scipy.sparse as sp
import torch
import torch.nn.functional as F
from sklearn.metrics import accuracy_score, f1_score
from torch_sparse import SparseTensor
from graphslim.dataset.utils import csr2ei
[docs]
def calculate_homophily(y, adj):
    """
    Compute the edge homophily ratio of a labelled graph.

    The ratio is the fraction of non-zero adjacency entries whose two
    endpoints carry the same label. Every non-zero entry is counted once,
    so an undirected edge stored in both directions contributes twice.
    Edge weights are ignored; only the sparsity pattern matters.

    Parameters
    ----------
    y : array-like
        Node labels. Singleton dimensions are squeezed away, so both
        ``(n,)`` and ``(n, 1)`` inputs are accepted.
    adj : numpy.ndarray or scipy sparse matrix
        Adjacency matrix. Dense arrays and non-CSR sparse formats are
        converted to CSR first.

    Returns
    -------
    homophily : float
        Fraction of edges joining same-label nodes, or ``nan`` when the
        graph has no edges (the ratio is undefined in that case).
    """
    if isinstance(adj, np.ndarray):
        adj = sp.csr_matrix(adj)
    if not sp.isspmatrix_csr(adj):
        adj = adj.tocsr()

    # squeeze() alone would turn a single-node label vector into a 0-d array
    # that cannot be indexed by the edge arrays below.
    y = np.atleast_1d(np.squeeze(y))

    # nonzero() skips explicitly stored zeros, so only real edges remain.
    src_idx, tgt_idx = adj.nonzero()
    if src_idx.size == 0:
        # Avoid np.mean([]) and its RuntimeWarning; the result is still nan.
        return float("nan")

    same_label = y[src_idx] == y[tgt_idx]
    return np.mean(same_label)
[docs]
def getsize_mb(elements):
"""
Calculate the total size of a list of elements in megabytes.
Parameters
----------
elements : list
List of elements to calculate the size for. The elements can be SparseTensor, csr_matrix, or tensors.
Returns
-------
size : float
Total size of all elements in the list in megabytes.
Examples
--------
>>> elements = [tensor1, sparse_tensor, csr_matrix]
>>> getsize_mb(elements)
12.34
"""
size = 0
for e in elements:
if type(e) == SparseTensor:
row, col, value = e.coo()
size += row.element_size() * row.nelement()
size += col.element_size() * col.nelement()
size += value.element_size() * value.nelement()
elif isinstance(e, sp.csr_matrix):
e = csr2ei(e)
size += e.element_size() * e.nelement()
else:
try:
size += e.element_size() * e.nelement()
except:
e = torch.from_numpy(e)
size += e.element_size() * e.nelement()
return size / 1024 / 1024
[docs]
def _true_label_confidence(confidence_mtx, label_vec):
    # For every sample, pick the confidence assigned to its ground-truth class.
    return np.array(
        [confidence_mtx[i, label_vec[i]] for i in range(confidence_mtx.shape[0])]
    )


def inference_via_confidence(confidence_mtx1, confidence_mtx2, label_vec1, label_vec2):
    """
    Best accuracy of a confidence-threshold rule separating two sample sets.

    For each sample the confidence on its true class is taken. Every observed
    confidence value ``delta`` is then tried as a threshold for the rule
    "confidence >= delta means the sample belongs to set 1". The balanced
    accuracy ``0.5 * (ratio1 + 1 - ratio2)`` is computed, where ``ratioK`` is
    the fraction of set ``K`` at or above the threshold. The best value over
    all thresholds is returned, never less than the 0.5 chance level.

    Parameters
    ----------
    confidence_mtx1, confidence_mtx2 : array-like, shape (n_samples, n_classes)
        Per-class confidence scores for the two sets (the original comment
        refers to them as the training and test datasets).
    label_vec1, label_vec2 : array-like of int, shape (n_samples,)
        Ground-truth class index of every sample; used as a column index
        into the matching confidence matrix.

    Returns
    -------
    max_accuracy : float
        Maximum balanced accuracy over all thresholds; 0.5 when no threshold
        beats chance or when either set is empty.
    """
    n1 = confidence_mtx1.shape[0]
    n2 = confidence_mtx2.shape[0]
    confidence1 = _true_label_confidence(confidence_mtx1, label_vec1)
    confidence2 = _true_label_confidence(confidence_mtx2, label_vec2)

    max_accuracy = 0.5
    if n1 == 0 or n2 == 0:
        # The ratios below would be 0/0; no threshold can beat chance.
        return max_accuracy

    # Duplicate confidences give identical accuracies, so each distinct
    # value is evaluated only once.
    for delta in np.unique(np.concatenate((confidence1, confidence2))):
        ratio1 = np.sum(confidence1 >= delta) / n1
        ratio2 = np.sum(confidence2 >= delta) / n2
        accuracy_now = 0.5 * (ratio1 + 1 - ratio2)
        if accuracy_now > max_accuracy:
            max_accuracy = accuracy_now
    return max_accuracy
[docs]
def verbose_time_memory(func):
    """
    Decorator that reports wall-clock time and storage footprint of a call.

    Reporting only happens when the decorated function is invoked with the
    keyword argument ``verbose`` set to a truthy value; otherwise the call is
    forwarded untouched. In verbose mode the elapsed time is printed in
    seconds and milliseconds, followed by the size (via ``getsize_mb``) of the
    original graph and of the condensed graph stored on the ``data`` object.

    The ``data`` object is taken from the ``data`` keyword argument or, if
    absent, from the first positional argument that has a ``feat_train`` or
    ``x`` attribute. Missing ``feat_syn`` / ``adj_syn`` / ``labels_syn``
    attributes are filled in on ``data`` before the condensed size is measured.

    Parameters
    ----------
    func : callable
        The function to be decorated.

    Returns
    -------
    callable
        The wrapped function with added timing and memory usage reporting.

    Raises
    ------
    ValueError
        In verbose mode, when no ``data`` object can be located. This is
        raised after ``func`` has already run.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Fast path: nothing to measure unless verbose was requested.
        if not kwargs.get('verbose', False):
            return func(*args, **kwargs)

        tic = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed_s = time.perf_counter() - tic
        print("Function Time:", elapsed_s, "s")
        print("Function Time:", elapsed_s * 1000, "ms")

        # Locate the dataset object: explicit keyword first, then positionals.
        data = kwargs.get('data', None)
        if data is None:
            data = next(
                (candidate for candidate in args
                 if hasattr(candidate, 'feat_train') or hasattr(candidate, 'x')),
                None,
            )
        if data is None:
            raise ValueError("The function must be called with 'data' as an argument.")

        transductive = kwargs.get('setting') == 'trans'
        if transductive:
            origin_parts = [data.x, data.edge_index, data.y]
        else:
            origin_parts = [data.feat_train, data.adj_train, data.labels_train]
        origin_storage = getsize_mb(origin_parts)

        # Fall back to the training data when no condensed graph is stored yet.
        if not hasattr(data, 'feat_syn'):
            data.feat_syn = data.feat_full if transductive else data.feat_train
        if not hasattr(data, 'adj_syn'):
            data.adj_syn = torch.eye(data.labels_train.shape[0])
        if not hasattr(data, 'labels_syn'):
            data.labels_syn = data.labels_train

        condensed_storage = getsize_mb([data.feat_syn, data.adj_syn, data.labels_syn])
        print(f'Original graph:{origin_storage:.2f} Mb  Condensed graph:{condensed_storage:.2f} Mb')
        return result

    return wrapper
# from deeprobust.graph.utils import accuracy
[docs]
def calc_f1(y_true, y_pred, is_sigmoid=False):
    """
    Calculate micro- and macro-averaged F1 scores.

    The raw model scores in `y_pred` are first turned into hard predictions:
    by ``argmax`` over the class axis for single-label problems, or by
    thresholding at 0.5 for sigmoid (multi-label / binary) outputs. The
    caller's `y_pred` is never modified.

    Parameters
    ----------
    y_true : array-like
        Ground-truth labels, in whatever layout ``sklearn.metrics.f1_score``
        accepts for the task.
    y_pred : array-like
        Model scores. If `is_sigmoid` is False, shape ``(n_samples, n_classes)``
        and the predicted class is the argmax along axis 1. If `is_sigmoid`
        is True, per-label probabilities; values strictly greater than 0.5
        become 1 and everything else becomes 0.
    is_sigmoid : bool
        Whether `y_pred` holds independent sigmoid probabilities (True) or
        per-class scores to be arg-maxed (False).

    Returns
    -------
    tuple of float
        - micro-averaged F1 score.
        - macro-averaged F1 score.
    """
    if not is_sigmoid:
        y_pred = np.argmax(y_pred, axis=1)
    else:
        # Threshold into a fresh array: in-place assignment would overwrite
        # the caller's scores (and any tensor sharing memory with them).
        scores = np.asarray(y_pred)
        y_pred = (scores > 0.5).astype(scores.dtype)
    return f1_score(y_true, y_pred, average="micro"), f1_score(y_true, y_pred, average="macro")
[docs]
def evaluate(output, labels, args):
    """
    Print test-set metrics for the model output.

    For the GraphSAINT-style datasets listed in the body, micro and macro F1
    are reported through ``calc_f1`` (sigmoid thresholding when `labels` is
    multi-dimensional, argmax otherwise). For every other dataset the
    negative log-likelihood loss and the classification accuracy are reported.

    Parameters
    ----------
    output : torch.Tensor
        Model output. In the loss/accuracy branch it is passed to
        ``F.nll_loss``, so it is expected to hold log-probabilities of shape
        ``(n_samples, n_classes)``.
    labels : torch.Tensor
        The ground truth labels.
    args : Namespace
        Must expose ``dataset``, the dataset name used to pick the metrics.

    Returns
    -------
    None
    """
    data_graphsaint = ['yelp', 'ppi', 'ppi-large', 'flickr', 'reddit', 'amazon']
    if args.dataset in data_graphsaint:
        labels = labels.cpu().numpy()
        # detach() so outputs that still require grad can be converted.
        output = output.detach().cpu().numpy()
        multilabel = len(labels.shape) > 1
        micro, macro = calc_f1(labels, output, is_sigmoid=multilabel)
        print("Test set results:", "F1-micro= {:.4f}".format(micro),
              "F1-macro= {:.4f}".format(macro))
    else:
        loss_test = F.nll_loss(output, labels)
        # sklearn's accuracy_score needs hard labels, not a score matrix, so
        # accuracy is computed from the arg-max predictions in torch instead.
        predictions = output.argmax(dim=1)
        acc_test = (predictions == labels).float().mean()
        print("Test set results:",
              "loss= {:.4f}".format(loss_test.item()),
              "accuracy= {:.4f}".format(acc_test.item()))
    return