001/* 002 * VM-Operator 003 * Copyright (C) 2023,2024 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jdrupes.vmoperator.manager; 020 021import com.google.gson.JsonObject; 022import io.kubernetes.client.apimachinery.GroupVersionKind; 023import io.kubernetes.client.openapi.ApiException; 024import io.kubernetes.client.util.Watch; 025import java.io.IOException; 026import java.util.Collections; 027import java.util.List; 028import java.util.Map; 029import java.util.Optional; 030import java.util.concurrent.ConcurrentHashMap; 031import org.jdrupes.vmoperator.common.Constants.Crd; 032import org.jdrupes.vmoperator.common.Constants.Status; 033import org.jdrupes.vmoperator.common.K8s; 034import org.jdrupes.vmoperator.common.K8sClient; 035import org.jdrupes.vmoperator.common.K8sDynamicModel; 036import org.jdrupes.vmoperator.common.K8sDynamicModels; 037import org.jdrupes.vmoperator.common.K8sDynamicStub; 038import org.jdrupes.vmoperator.common.K8sObserver.ResponseType; 039import org.jdrupes.vmoperator.common.VmDefinitionStub; 040import org.jdrupes.vmoperator.common.VmPool; 041import org.jdrupes.vmoperator.manager.events.GetPools; 042import org.jdrupes.vmoperator.manager.events.VmDefChanged; 043import org.jdrupes.vmoperator.manager.events.VmPoolChanged; 044import org.jdrupes.vmoperator.util.GsonPtr; 045import org.jgrapes.core.Channel; 046import org.jgrapes.core.EventPipeline; 047import org.jgrapes.core.annotation.Handler; 048import org.jgrapes.core.events.Attached; 049 050/** 051 * Watches for changes of VM pools. Reports the changes using 052 * {@link VmPoolChanged} events fired on a special pipeline to 053 * avoid concurrent change informations. 054 */ 055@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" }) 056public class PoolMonitor extends 057 AbstractMonitor<K8sDynamicModel, K8sDynamicModels, Channel> { 058 059 private final Map<String, VmPool> pools = new ConcurrentHashMap<>(); 060 private EventPipeline poolPipeline; 061 062 /** 063 * Instantiates a new VM pool manager. 064 * 065 * @param componentChannel the component channel 066 */ 067 public PoolMonitor(Channel componentChannel) { 068 super(componentChannel, K8sDynamicModel.class, 069 K8sDynamicModels.class); 070 } 071 072 /** 073 * On attached. 074 * 075 * @param event the event 076 */ 077 @Handler 078 @SuppressWarnings("PMD.CompareObjectsWithEquals") 079 public void onAttached(Attached event) { 080 if (event.node() == this) { 081 poolPipeline = newEventPipeline(); 082 } 083 } 084 085 @Override 086 protected void prepareMonitoring() throws IOException, ApiException { 087 client(new K8sClient()); 088 089 // Get all our API versions 090 var ctx = K8s.context(client(), Crd.GROUP, "", Crd.KIND_VM_POOL); 091 if (ctx.isEmpty()) { 092 logger.severe(() -> "Cannot get CRD context."); 093 return; 094 } 095 context(ctx.get()); 096 } 097 098 @Override 099 protected void handleChange(K8sClient client, 100 Watch.Response<K8sDynamicModel> response) { 101 102 var type = ResponseType.valueOf(response.type); 103 var poolName = response.object.metadata().getName(); 104 105 // When pool is deleted, save VMs in pending 106 if (type == ResponseType.DELETED) { 107 Optional.ofNullable(pools.get(poolName)).ifPresent(pool -> { 108 pool.setDefined(false); 109 if (pool.vms().isEmpty()) { 110 pools.remove(poolName); 111 } 112 poolPipeline.fire(new VmPoolChanged(pool, true)); 113 }); 114 return; 115 } 116 117 // Get full definition 118 var poolModel = response.object; 119 if (poolModel.data() == null) { 120 // ADDED event does not provide data, see 121 // https://github.com/kubernetes-client/java/issues/3215 122 try { 123 poolModel = K8sDynamicStub.get(client(), context(), namespace(), 124 poolModel.metadata().getName()).model().orElse(null); 125 } catch (ApiException e) { 126 return; 127 } 128 } 129 130 // Get pool and merge changes 131 var vmPool = pools.computeIfAbsent(poolName, k -> new VmPool(poolName)); 132 var newData = client().getJSON().getGson().fromJson( 133 GsonPtr.to(poolModel.data()).to("spec").get(), VmPool.class); 134 vmPool.setRetention(newData.retention()); 135 vmPool.setPermissions(newData.permissions()); 136 vmPool.setDefined(true); 137 poolPipeline.fire(new VmPoolChanged(vmPool)); 138 } 139 140 /** 141 * Track VM definition changes. 142 * 143 * @param event the event 144 * @throws ApiException 145 */ 146 @Handler 147 public void onVmDefChanged(VmDefChanged event) throws ApiException { 148 final var vmDef = event.vmDefinition(); 149 final String vmName = vmDef.name(); 150 switch (event.type()) { 151 case ADDED: 152 vmDef.<List<String>> fromSpec("pools") 153 .orElse(Collections.emptyList()).stream().forEach(p -> { 154 pools.computeIfAbsent(p, k -> new VmPool(p)) 155 .vms().add(vmName); 156 poolPipeline.fire(new VmPoolChanged(pools.get(p))); 157 }); 158 break; 159 case DELETED: 160 pools.values().stream().forEach(p -> { 161 if (p.vms().remove(vmName)) { 162 poolPipeline.fire(new VmPoolChanged(p)); 163 } 164 }); 165 return; 166 default: 167 break; 168 } 169 170 // Sync last usage to console state change if user matches 171 if (vmDef.assignedTo() 172 .map(at -> at.equals(vmDef.consoleUser().orElse(null))) 173 .orElse(true)) { 174 return; 175 } 176 177 var ccChange = vmDef.condition("ConsoleConnected") 178 .map(cc -> cc.getLastTransitionTime().toInstant()); 179 if (ccChange 180 .map(tt -> vmDef.assignmentLastUsed().map(alu -> alu.isAfter(tt)) 181 .orElse(true)) 182 .orElse(true)) { 183 return; 184 } 185 var vmStub = VmDefinitionStub.get(client(), 186 new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM), 187 vmDef.namespace(), vmDef.name()); 188 vmStub.updateStatus(from -> { 189 // TODO 190 JsonObject status = from.statusJson(); 191 var assignment = GsonPtr.to(status).to(Status.ASSIGNMENT); 192 assignment.set("lastUsed", ccChange.get().toString()); 193 return status; 194 }); 195 } 196 197 /** 198 * Return the requested pools. 199 * 200 * @param event the event 201 */ 202 @Handler 203 public void onGetPools(GetPools event) { 204 event.setResult(pools.values().stream().filter(VmPool::isDefined) 205 .filter(p -> event.name().isEmpty() 206 || p.name().equals(event.name().get())) 207 .filter(p -> event.forUser().isEmpty() && event.forRoles().isEmpty() 208 || !p.permissionsFor(event.forUser().orElse(null), 209 event.forRoles()).isEmpty()) 210 .toList()); 211 } 212}