/*
 * VM-Operator
 * Copyright (C) 2023,2024 Michael N. Lipp
 * 
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 * 
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

package org.jdrupes.vmoperator.manager;

import com.google.gson.JsonObject;
import io.kubernetes.client.apimachinery.GroupVersionKind;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.util.Watch;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import org.jdrupes.vmoperator.common.Constants.Crd;
import org.jdrupes.vmoperator.common.Constants.Status;
import org.jdrupes.vmoperator.common.K8s;
import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.K8sDynamicModel;
import org.jdrupes.vmoperator.common.K8sDynamicModels;
import org.jdrupes.vmoperator.common.K8sDynamicStub;
import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
import org.jdrupes.vmoperator.common.VmDefinition.Assignment;
import org.jdrupes.vmoperator.common.VmDefinitionStub;
import org.jdrupes.vmoperator.common.VmPool;
import org.jdrupes.vmoperator.manager.events.GetPools;
import org.jdrupes.vmoperator.manager.events.VmPoolChanged;
import org.jdrupes.vmoperator.manager.events.VmResourceChanged;
import org.jdrupes.vmoperator.util.GsonPtr;
import org.jgrapes.core.Channel;
import org.jgrapes.core.EventPipeline;
import org.jgrapes.core.annotation.Handler;
import org.jgrapes.core.events.Attached;

/**
 * Watches for changes of VM pools. Reports the changes using
 * {@link VmPoolChanged} events fired on a dedicated pipeline in
 * order to avoid concurrent change notifications.
 */
public class PoolMonitor extends
        AbstractMonitor<K8sDynamicModel, K8sDynamicModels, Channel> {

    /** The known pools, keyed by pool name. */
    private final Map<String, VmPool> pools = new ConcurrentHashMap<>();

    /** Pipeline used for all {@link VmPoolChanged} events. */
    private EventPipeline poolPipeline;

    /**
     * Instantiates a new VM pool monitor.
     *
     * @param componentChannel the component channel
     */
    public PoolMonitor(Channel componentChannel) {
        super(componentChannel, K8sDynamicModel.class,
            K8sDynamicModels.class);
    }

    /**
     * Creates the pipeline for pool change events once this component
     * itself has been attached. Attach events for other nodes are
     * ignored.
     *
     * @param event the event
     */
    @Handler
    @SuppressWarnings("PMD.CompareObjectsWithEquals")
    public void onAttached(Attached event) {
        if (event.node() == this) {
            poolPipeline = newEventPipeline();
        }
    }

    /**
     * Creates the client and looks up the API context for the VM pool
     * custom resource. If no context can be obtained, an error is logged
     * and no context is set.
     *
     * @throws IOException if thrown while setting up the client or context
     * @throws ApiException if thrown while setting up the client or context
     */
    @Override
    protected void prepareMonitoring() throws IOException, ApiException {
        client(new K8sClient());

        // Get all our API versions
        var ctx = K8s.context(client(), Crd.GROUP, "", Crd.KIND_VM_POOL);
        if (ctx.isEmpty()) {
            logger.severe(() -> "Cannot get CRD context.");
            return;
        }
        context(ctx.get());
    }

    /**
     * Updates the pool registry from a watch response and fires the
     * matching {@link VmPoolChanged} event.
     *
     * @param client the client passed in by the caller (the monitor's
     * own {@link #client()} is used for lookups)
     * @param response the watch response describing the change
     */
    @Override
    protected void handleChange(K8sClient client,
            Watch.Response<K8sDynamicModel> response) {

        var type = ResponseType.valueOf(response.type);
        var poolName = response.object.metadata().getName();

        // When the pool is deleted, keep its entry (marked as undefined)
        // as long as VMs are still registered with it; drop the entry
        // only if no VMs remain.
        if (type == ResponseType.DELETED) {
            Optional.ofNullable(pools.get(poolName)).ifPresent(pool -> {
                pool.setUndefined();
                if (pool.vms().isEmpty()) {
                    pools.remove(poolName);
                }
                // Second argument presumably flags the deletion -- confirm
                // against VmPoolChanged.
                poolPipeline.fire(new VmPoolChanged(pool, true));
            });
            return;
        }

        // Get full definition
        var poolModel = response.object;
        if (poolModel.data() == null) {
            // ADDED event does not provide data, see
            // https://github.com/kubernetes-client/java/issues/3215
            try {
                poolModel = K8sDynamicStub.get(client(), context(),
                    namespace(), poolName).model().orElse(null);
            } catch (ApiException e) {
                logger.log(Level.WARNING, e,
                    () -> "Cannot get definition of pool " + poolName
                        + ": " + e.getMessage());
                return;
            }
            if (poolModel == null) {
                // The pool may have been removed between the watch event
                // and the lookup; there is nothing to merge in that case.
                logger.warning(() -> "No definition found for pool "
                    + poolName + ", ignoring change.");
                return;
            }
        }

        // Get pool and merge changes
        var vmPool = pools.computeIfAbsent(poolName, k -> new VmPool(poolName));
        vmPool.defineFrom(client().getJSON().getGson().fromJson(
            GsonPtr.to(poolModel.data()).to("spec").get(), VmPool.class));
        poolPipeline.fire(new VmPoolChanged(vmPool));
    }

    /**
     * Track VM definition changes. Registers added VMs with the pools
     * listed in their spec, removes deleted VMs from all pools and, for
     * other changes, may update the assignment's {@code lastUsed} status
     * from the last transition time of the "ConsoleConnected" condition.
     *
     * @param event the event
     * @throws ApiException if accessing or updating the VM definition's
     * status fails
     */
    @Handler
    public void onVmResourceChanged(VmResourceChanged event)
            throws ApiException {
        final var vmDef = event.vmDefinition();
        final String vmName = vmDef.name();
        switch (event.type()) {
        case ADDED:
            vmDef.<List<String>> fromSpec("pools")
                .orElse(Collections.emptyList()).forEach(p -> {
                    // Use the instance returned by computeIfAbsent instead
                    // of a second lookup, which could yield null if the
                    // entry is removed concurrently.
                    var pool = pools.computeIfAbsent(p, k -> new VmPool(p));
                    pool.vms().add(vmName);
                    poolPipeline.fire(new VmPoolChanged(pool));
                });
            break;
        case DELETED:
            pools.values().forEach(p -> {
                if (p.vms().remove(vmName)) {
                    poolPipeline.fire(new VmPoolChanged(p));
                }
            });
            return;
        default:
            break;
        }

        // Sync last usage to console state change if user matches
        // NOTE(review): as written, this returns when there is no
        // assignment OR when the assigned user equals the console user,
        // i.e. the update below only happens when the users differ.
        // That looks inverted with respect to the comment above --
        // confirm the intended behavior before changing it.
        if (vmDef.assignment().map(Assignment::user)
            .map(at -> at.equals(vmDef.consoleUser().orElse(null)))
            .orElse(true)) {
            return;
        }

        // Nothing to do if there is no "ConsoleConnected" condition, no
        // recorded last usage, or the recorded last usage is already
        // after the condition's last transition.
        var ccChange = vmDef.condition("ConsoleConnected")
            .map(cc -> cc.getLastTransitionTime().toInstant());
        if (ccChange
            .map(tt -> vmDef.assignment().map(Assignment::lastUsed)
                .map(alu -> alu.isAfter(tt)).orElse(true))
            .orElse(true)) {
            return;
        }
        var vmStub = VmDefinitionStub.get(client(),
            new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM),
            vmDef.namespace(), vmDef.name());
        vmStub.updateStatus(from -> {
            // TODO
            JsonObject status = from.statusJson();
            var assignment = GsonPtr.to(status).to(Status.ASSIGNMENT);
            // ccChange is known to be present here (checked above)
            assignment.set("lastUsed", ccChange.get().toString());
            return status;
        });
    }

    /**
     * Return the requested pools. Only defined pools are considered.
     * The result is restricted to the pool with the requested name (if
     * a name is given) and, if a user or roles are given, to pools that
     * have permissions for that user or those roles.
     *
     * @param event the event
     */
    @Handler
    public void onGetPools(GetPools event) {
        event.setResult(pools.values().stream().filter(VmPool::isDefined)
            .filter(p -> event.name().isEmpty()
                || p.name().equals(event.name().get()))
            .filter(p -> (event.forUser().isEmpty()
                && event.forRoles().isEmpty())
                || !p.permissionsFor(event.forUser().orElse(null),
                    event.forRoles()).isEmpty())
            .toList());
    }
}