001/* 002 * VM-Operator 003 * Copyright (C) 2023,2024 Michael N. Lipp 004 * 005 * This program is free software: you can redistribute it and/or modify 006 * it under the terms of the GNU Affero General Public License as 007 * published by the Free Software Foundation, either version 3 of the 008 * License, or (at your option) any later version. 009 * 010 * This program is distributed in the hope that it will be useful, 011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 013 * GNU Affero General Public License for more details. 014 * 015 * You should have received a copy of the GNU Affero General Public License 016 * along with this program. If not, see <https://www.gnu.org/licenses/>. 017 */ 018 019package org.jdrupes.vmoperator.manager; 020 021import com.google.gson.JsonObject; 022import io.kubernetes.client.apimachinery.GroupVersionKind; 023import io.kubernetes.client.openapi.ApiException; 024import io.kubernetes.client.util.Watch; 025import java.io.IOException; 026import java.util.Collections; 027import java.util.List; 028import java.util.Map; 029import java.util.Optional; 030import java.util.concurrent.ConcurrentHashMap; 031import org.jdrupes.vmoperator.common.Constants.Crd; 032import org.jdrupes.vmoperator.common.Constants.Status; 033import org.jdrupes.vmoperator.common.K8s; 034import org.jdrupes.vmoperator.common.K8sClient; 035import org.jdrupes.vmoperator.common.K8sDynamicModel; 036import org.jdrupes.vmoperator.common.K8sDynamicModels; 037import org.jdrupes.vmoperator.common.K8sDynamicStub; 038import org.jdrupes.vmoperator.common.K8sObserver.ResponseType; 039import org.jdrupes.vmoperator.common.VmDefinition.Assignment; 040import org.jdrupes.vmoperator.common.VmDefinitionStub; 041import org.jdrupes.vmoperator.common.VmPool; 042import org.jdrupes.vmoperator.manager.events.GetPools; 043import org.jdrupes.vmoperator.manager.events.VmPoolChanged; 044import org.jdrupes.vmoperator.manager.events.VmResourceChanged; 045import org.jdrupes.vmoperator.util.GsonPtr; 046import org.jgrapes.core.Channel; 047import org.jgrapes.core.EventPipeline; 048import org.jgrapes.core.annotation.Handler; 049import org.jgrapes.core.events.Attached; 050 051/** 052 * Watches for changes of VM pools. Reports the changes using 053 * {@link VmPoolChanged} events fired on a special pipeline to 054 * avoid concurrent change informations. 055 */ 056@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" }) 057public class PoolMonitor extends 058 AbstractMonitor<K8sDynamicModel, K8sDynamicModels, Channel> { 059 060 private final Map<String, VmPool> pools = new ConcurrentHashMap<>(); 061 private EventPipeline poolPipeline; 062 063 /** 064 * Instantiates a new VM pool manager. 065 * 066 * @param componentChannel the component channel 067 */ 068 public PoolMonitor(Channel componentChannel) { 069 super(componentChannel, K8sDynamicModel.class, 070 K8sDynamicModels.class); 071 } 072 073 /** 074 * On attached. 075 * 076 * @param event the event 077 */ 078 @Handler 079 @SuppressWarnings("PMD.CompareObjectsWithEquals") 080 public void onAttached(Attached event) { 081 if (event.node() == this) { 082 poolPipeline = newEventPipeline(); 083 } 084 } 085 086 @Override 087 protected void prepareMonitoring() throws IOException, ApiException { 088 client(new K8sClient()); 089 090 // Get all our API versions 091 var ctx = K8s.context(client(), Crd.GROUP, "", Crd.KIND_VM_POOL); 092 if (ctx.isEmpty()) { 093 logger.severe(() -> "Cannot get CRD context."); 094 return; 095 } 096 context(ctx.get()); 097 } 098 099 @Override 100 protected void handleChange(K8sClient client, 101 Watch.Response<K8sDynamicModel> response) { 102 103 var type = ResponseType.valueOf(response.type); 104 var poolName = response.object.metadata().getName(); 105 106 // When pool is deleted, save VMs in pending 107 if (type == ResponseType.DELETED) { 108 Optional.ofNullable(pools.get(poolName)).ifPresent(pool -> { 109 pool.setUndefined(); 110 if (pool.vms().isEmpty()) { 111 pools.remove(poolName); 112 } 113 poolPipeline.fire(new VmPoolChanged(pool, true)); 114 }); 115 return; 116 } 117 118 // Get full definition 119 var poolModel = response.object; 120 if (poolModel.data() == null) { 121 // ADDED event does not provide data, see 122 // https://github.com/kubernetes-client/java/issues/3215 123 try { 124 poolModel = K8sDynamicStub.get(client(), context(), namespace(), 125 poolModel.metadata().getName()).model().orElse(null); 126 } catch (ApiException e) { 127 return; 128 } 129 } 130 131 // Get pool and merge changes 132 var vmPool = pools.computeIfAbsent(poolName, k -> new VmPool(poolName)); 133 vmPool.defineFrom(client().getJSON().getGson().fromJson( 134 GsonPtr.to(poolModel.data()).to("spec").get(), VmPool.class)); 135 poolPipeline.fire(new VmPoolChanged(vmPool)); 136 } 137 138 /** 139 * Track VM definition changes. 140 * 141 * @param event the event 142 * @throws ApiException 143 */ 144 @Handler 145 public void onVmResourceChanged(VmResourceChanged event) 146 throws ApiException { 147 final var vmDef = event.vmDefinition(); 148 final String vmName = vmDef.name(); 149 switch (event.type()) { 150 case ADDED: 151 vmDef.<List<String>> fromSpec("pools") 152 .orElse(Collections.emptyList()).stream().forEach(p -> { 153 pools.computeIfAbsent(p, k -> new VmPool(p)) 154 .vms().add(vmName); 155 poolPipeline.fire(new VmPoolChanged(pools.get(p))); 156 }); 157 break; 158 case DELETED: 159 pools.values().stream().forEach(p -> { 160 if (p.vms().remove(vmName)) { 161 poolPipeline.fire(new VmPoolChanged(p)); 162 } 163 }); 164 return; 165 default: 166 break; 167 } 168 169 // Sync last usage to console state change if user matches 170 if (vmDef.assignment().map(Assignment::user) 171 .map(at -> at.equals(vmDef.consoleUser().orElse(null))) 172 .orElse(true)) { 173 return; 174 } 175 176 var ccChange = vmDef.condition("ConsoleConnected") 177 .map(cc -> cc.getLastTransitionTime().toInstant()); 178 if (ccChange 179 .map(tt -> vmDef.assignment().map(Assignment::lastUsed) 180 .map(alu -> alu.isAfter(tt)).orElse(true)) 181 .orElse(true)) { 182 return; 183 } 184 var vmStub = VmDefinitionStub.get(client(), 185 new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM), 186 vmDef.namespace(), vmDef.name()); 187 vmStub.updateStatus(from -> { 188 // TODO 189 JsonObject status = from.statusJson(); 190 var assignment = GsonPtr.to(status).to(Status.ASSIGNMENT); 191 assignment.set("lastUsed", ccChange.get().toString()); 192 return status; 193 }); 194 } 195 196 /** 197 * Return the requested pools. 198 * 199 * @param event the event 200 */ 201 @Handler 202 public void onGetPools(GetPools event) { 203 event.setResult(pools.values().stream().filter(VmPool::isDefined) 204 .filter(p -> event.name().isEmpty() 205 || p.name().equals(event.name().get())) 206 .filter(p -> event.forUser().isEmpty() && event.forRoles().isEmpty() 207 || !p.permissionsFor(event.forUser().orElse(null), 208 event.forRoles()).isEmpty()) 209 .toList()); 210 } 211}