diff --git a/quellcode/backend.py b/quellcode/backend.py
index fad211db3a22f20813825521a377f239f1b97784..6dbd366384b35539605d0e62aacfb472c21e5c06 100644
--- a/quellcode/backend.py
+++ b/quellcode/backend.py
@@ -99,17 +99,17 @@ class GetEmbeddings():
 
         # Select availible computation device as target device
         self.device = (
-            # "cuda"
-            # if torch.cuda.is_available()
-            # else "mps"
-            # if torch.backends.mps.is_available()
-            # else "cpu"
-            "cpu"
+            "cuda"
+            if torch.cuda.is_available()
+            else "mps"
+            if torch.backends.mps.is_available()
+            else "cpu"
+            # "cpu"
         )
         print(f"Using {self.device} device")
 
-        self.latentEncoder = LatentEncoder() #.to(self.device) # Assign device to LatentEncoder model
-        self.latentEncoder.load_state_dict(torch.load('./le-model-out/LatentEncoder-TML-2.model', map_location=torch.device('cpu')))
+        self.latentEncoder = LatentEncoder().to(self.device) # Assign device to LatentEncoder model
+        self.latentEncoder.load_state_dict(torch.load('./le-model-out/LatentEncoder-TML-2.model', map_location=torch.device(self.device)))
 
     def createEmbeddings(self):
         input_data = GetData(self.folder_path, self.transform) # create the dataset, no argumentation or color transformation
@@ -127,7 +127,7 @@ class GetEmbeddings():
                 tData["image"].append(to_pil_image(cur_img))
 
             with torch.no_grad():
-                #X = X.to(self.device)
+                X = X.to(self.device)
                 pred = self.latentEncoder(X)
 
             tData["embedding"] = torch.cat((tData["embedding"], pred.cpu()))
@@ -142,4 +142,4 @@ class GetEmbeddings():
         # print(dist)
         knn = dist.topk(k, largest=False)
 
-        return knn
\ No newline at end of file
+        return knn
diff --git a/quellcode/backendvae.py b/quellcode/backendvae.py
new file mode 100644
index 0000000000000000000000000000000000000000..f31d6306ada98bfdeebdcdbee136962b5df1e60f
--- /dev/null
+++ b/quellcode/backendvae.py
@@ -0,0 +1,171 @@
+import torch
+import os
+import random
+from torch import nn
+from torch.utils.data import DataLoader
+from pathlib import Path
+from torchvision import datasets, transforms
+from PIL import Image
+from torch.utils.data import Dataset
+import numpy as np
+from torchvision.transforms import v2
+
+# Define the model architecture for creating embeddings
+class LatentEncoder(nn.Module):
+    '''
+    Define Encoder at initialization
+    '''
+    def __init__(self, latent_dim = 8192):
+        super().__init__()
+
+
+        # Define encoder
+        self.encoder = nn.Sequential(
+            nn.Conv2d(3, 32, kernel_size=4, stride=2, padding=1),
+            nn.SELU(),
+            nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=1),
+            nn.SELU(),
+            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
+            nn.SELU(),
+            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),
+            nn.SELU(),
+            nn.Conv2d(256, 512, kernel_size=4, stride=2, padding=1),
+            nn.SELU(),
+            nn.Conv2d(512, 1024, kernel_size=4, stride=2, padding=1),
+            nn.SELU(),
+            nn.Conv2d(1024, 2048, kernel_size=4, stride=2, padding=1),
+            nn.SELU(),
+            nn.Flatten(),
+
+        )
+
+        self.mean_layer = nn.Linear(2048*3*3, latent_dim)
+        nn.init.zeros_(self.mean_layer.weight)
+        self.logvar_layer = nn.Linear(2048*3*3, latent_dim)
+        nn.init.zeros_(self.logvar_layer.weight)
+
+
+        self.decoder = nn.Sequential(
+            nn.Linear(latent_dim, 2048*3*3),
+            nn.Unflatten(1, (2048, 3, 3)),
+            nn.ConvTranspose2d(2048, 1024, kernel_size=4, stride=2, padding=1),
+            nn.SELU(),
+            nn.ConvTranspose2d(1024, 512, kernel_size=4, stride=2, padding=1),
+            nn.SELU(),
+            nn.ConvTranspose2d(512, 256, kernel_size=4, stride=2, padding=1),
+            nn.SELU(),
+            nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1),
+            nn.SELU(),
+            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
+            nn.SELU(),
+            nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1),
+            nn.SELU(),
+            nn.ConvTranspose2d(32, 3, kernel_size=4, stride=2, padding=1),
+            nn.Sigmoid()
+        )
+
+    '''
+    Define forward dataflow for the Encoder
+    '''
+    def encode(self, x):
+        x = self.encoder(x)
+        mean = self.mean_layer(x)
+        logvar = self.logvar_layer(x)
+        return mean, logvar
+        # return x
+
+    def reparameterize(self, mean, log_var):
+        std = torch.exp(0.5*log_var)
+        eps = torch.randn_like(std)
+        return eps.mul(std).add_(mean)
+
+
+    def decode(self, z):
+        return self.decoder(z)
+
+    def forward(self, x):
+        mean, logvar = self.encode(x)
+        z = self.reparameterize(mean, logvar)
+        x_hat = self.decode(z)
+        return x_hat, mean, logvar
+        # return self.decode(self.encode(x))
+
+class GetData(Dataset):
+    def __init__(self, folder_path, transform=None):
+        self.folder_path = folder_path
+        self.transform = transform
+
+        # Get a list of all image files in the folder
+        self.image_paths = [os.path.join(folder_path, file) for file in os.listdir(folder_path) if file.endswith(('.jpg', '.png', '.jpeg'))]
+
+
+    def __getitem__(self, index):
+        image_path = self.image_paths[index]
+        x = Image.open(image_path).convert('RGB')
+        if self.transform is not None:
+            x = self.transform(x)
+        return x
+
+    def __len__(self):
+        return len(self.image_paths)
+
+class GetEmbeddings():
+    def __init__(self, folder_path):
+        # Size of data batch to be processed at once in gpu
+        self.batch_size = 64
+
+        self.folder_path = folder_path
+
+        # Default resize transformation
+        self.transform = transforms.Compose(
+            [v2.Resize((384, 384)),
+             v2.Compose([v2.ToImage(), v2.ToDtype(torch.float32, scale=True)]),
+             v2.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
+            ])
+
+        # Select availible computation device as target device
+        self.device = (
+            # "cuda"
+            # if torch.cuda.is_available()
+            # else "mps"
+            # if torch.backends.mps.is_available()
+            # else "cpu"
+            "cpu"
+        )
+        print(f"Using {self.device} device")
+
+        self.latentEncoder = LatentEncoder() #.to(self.device) # Assign device to LatentEncoder model
+        self.latentEncoder.load_state_dict(torch.load('./le-model-out/VAE-dynamic-beta-34.model', map_location=torch.device('cpu')))
+
+    def createEmbeddings(self):
+        input_data = GetData(self.folder_path, self.transform) # create the dataset, no argumentation or color transformation
+        input_dataloader = DataLoader(input_data, batch_size=self.batch_size,shuffle=False)
+        to_pil_image = transforms.ToPILImage()
+        tData = {
+            "image": [],
+            "embedding": torch.empty(0),
+
+        }
+        self.latentEncoder.eval()
+        for X in input_dataloader :
+            # x = torch.unsqueeze(testing_data[i], 0)
+            for cur_img in X:
+                tData["image"].append(to_pil_image(cur_img))
+
+            with torch.no_grad():
+                #X = X.to(self.device)
+                x_hat, mean, logvar = self.latentEncoder(X)
+
+            tData["embedding"] = torch.cat((tData["embedding"], mean.cpu()))
+
+        return tData;
+
+    def getKNN(self, index, tData, k=3):
+
+        test = tData["embedding"][index];
+        data = torch.cat((tData["embedding"][:index],tData["embedding"][index+1:]) );
+        dist = torch.norm(data - test, dim=1, p=None)
+        # print(dist)
+        knn = dist.topk(k, largest=False)
+
+        return knn
diff --git a/quellcode/frontend.py b/quellcode/frontend.py
index 1d6f005a798a278b709a96f64fbb5bd3c0543ae8..c64335de2e498f68ebed2757fccc43d5c0986c81 100644
--- a/quellcode/frontend.py
+++ b/quellcode/frontend.py
@@ -260,4 +260,4 @@ class ImageSearchApp(tk.Tk):
 if __name__ == "__main__":
     app = ImageSearchApp()
     app.mainloop()
-    
\ No newline at end of file
+