![]() Server : Apache System : Linux server2.corals.io 4.18.0-348.2.1.el8_5.x86_64 #1 SMP Mon Nov 15 09:17:08 EST 2021 x86_64 User : corals ( 1002) PHP Version : 7.4.33 Disable Function : exec,passthru,shell_exec,system Directory : /home/corals/mautic.corals.io/app/bundles/IntegrationsBundle/Entity/ |
<?php declare(strict_types=1); namespace Mautic\IntegrationsBundle\Entity; use Doctrine\DBAL\ArrayParameterType; use Doctrine\DBAL\Query\Expression\CompositeExpression; use Mautic\CoreBundle\Entity\CommonRepository; use Mautic\LeadBundle\Entity\Lead; /** * @extends CommonRepository<FieldChange> */ class FieldChangeRepository extends CommonRepository { /** * Takes an object id & type and deletes all entities * that match the given column names. */ public function deleteEntitiesForObjectByColumnName(int $objectId, string $objectType, array $columnNames): void { $qb = $this->getEntityManager()->getConnection()->createQueryBuilder(); $qb ->delete(MAUTIC_TABLE_PREFIX.'sync_object_field_change_report') ->where( $qb->expr()->and( $qb->expr()->eq('object_type', ':objectType'), $qb->expr()->eq('object_id', ':objectId'), $qb->expr()->in('column_name', ':columnNames') ) ) ->setParameter('objectType', $objectType) ->setParameter('objectId', $objectId) ->setParameter('columnNames', $columnNames, ArrayParameterType::STRING) ->executeStatement(); } /** * Takes an object id & type and deletes all entities that match. */ public function deleteEntitiesForObject(int $objectId, string $objectType, ?string $integration = null, \DateTimeInterface $toDateTime = null): void { $qb = $this->getEntityManager()->getConnection()->createQueryBuilder(); $expr = CompositeExpression::and($qb->expr()->eq('object_type', ':objectType'), $qb->expr()->eq('object_id', ':objectId')); if ($integration) { $expr = $expr->with( $qb->expr()->eq('integration', ':integration') ); $qb->setParameter('integration', $integration); } if (null !== $toDateTime) { $expr = $expr->with($qb->expr()->lte('modified_at', ':toDateTime')); $qb->setParameter('toDateTime', $toDateTime->format('Y-m-d H:i:s')); } $qb->setParameter('objectType', $objectType) ->setParameter('objectId', $objectId); $qb ->delete(MAUTIC_TABLE_PREFIX.'sync_object_field_change_report') ->where($expr) ->executeStatement(); } /** * @param int|null $afterObjectId * @param int $objectCount */ public function findChangesBefore(string $integration, string $objectType, \DateTimeInterface $toDateTime, $afterObjectId = null, $objectCount = 100): array { // Get a list of object IDs so that we can get complete snapshots of the objects $qb = $this->getEntityManager()->getConnection()->createQueryBuilder(); $qb ->select('f.object_id') ->from(MAUTIC_TABLE_PREFIX.'sync_object_field_change_report', 'f') ->where( $qb->expr()->and( $qb->expr()->eq('f.integration', ':integration'), $qb->expr()->eq('f.object_type', ':objectType'), $qb->expr()->lte('f.modified_at', ':toDateTime') ) ); if (Lead::class === $objectType) { $qb->join('f', MAUTIC_TABLE_PREFIX.'leads', 'l', 'l.id = f.object_id'); } $qb->setParameter('integration', $integration) ->setParameter('objectType', $objectType) ->setParameter('toDateTime', $toDateTime->format('Y-m-d H:i:s')) ->orderBy('f.object_id') ->groupBy('f.object_id') ->setMaxResults($objectCount); if ($afterObjectId) { $qb->andWhere( $qb->expr()->gt('f.object_id', (int) $afterObjectId) ); } $objectIds = $qb->executeQuery()->fetchFirstColumn(); if (!$objectIds) { return []; } // Get all the field changes for the requested objects $qb ->resetQueryParts() ->select('*') ->from(MAUTIC_TABLE_PREFIX.'sync_object_field_change_report', 'f') ->where( $qb->expr()->and( $qb->expr()->eq('f.integration', ':integration'), $qb->expr()->eq('f.object_type', ':objectType'), $qb->expr()->in('f.object_id', $objectIds) ) ) ->setParameter('integration', $integration) ->setParameter('objectType', $objectType) // 1. We must sort by f.object_id. Otherwise values stored in PartialObjectReportBuilder::lastProcessedTrackedId will be incorrect. // 2. Newer updated fields must override older updated fields ->orderBy('f.object_id, f.modified_at', 'ASC'); return $qb->executeQuery()->fetchAllAssociative(); } /** * @param int $objectId */ public function findChangesForObject(string $integration, string $objectType, $objectId): array { // Get a list of object IDs so that we can get complete snapshots of the objects $qb = $this->getEntityManager()->getConnection()->createQueryBuilder(); $qb ->select('*') ->from(MAUTIC_TABLE_PREFIX.'sync_object_field_change_report', 'f') ->where( $qb->expr()->and( $qb->expr()->eq('f.integration', ':integration'), $qb->expr()->eq('f.object_type', ':objectType'), $qb->expr()->eq('f.object_id', ':objectId') ) ) ->setParameter('integration', $integration) ->setParameter('objectType', $objectType) ->setParameter('objectId', (int) $objectId) ->orderBy('f.modified_at'); // Newer updated fields must override older updated fields return $qb->executeQuery()->fetchAllAssociative(); } public function deleteOrphanLeadChanges(): int { $totalDeleted = 0; $limit = 1000; $totalLimit = 100000; $deletedInLastLoop = $limit; while ($totalDeleted < $totalLimit && $deletedInLastLoop === $limit && $deleted = $this->doDeleteOrphanLeadChanges($limit)) { $deletedInLastLoop = $deleted; $totalDeleted += $deleted; } return $totalDeleted; } private function doDeleteOrphanLeadChanges(int $limit): int { $qb = $this->getEntityManager()->getConnection()->createQueryBuilder(); $qb->select('f.id') ->from(MAUTIC_TABLE_PREFIX.'sync_object_field_change_report', 'f') ->leftJoin('f', MAUTIC_TABLE_PREFIX.'leads', 'l', 'l.id = f.object_id') ->where( $qb->expr()->and( $qb->expr()->eq('object_type', ':objectType'), $qb->expr()->isNull('l.id') ) ) ->setMaxResults($limit) ->setParameter('objectType', Lead::class); $objectIds = $qb->executeQuery()->fetchFirstColumn(); if (!$objectIds) { return 0; } $qb2 = $this->getEntityManager()->getConnection()->createQueryBuilder(); $qb2->delete(MAUTIC_TABLE_PREFIX.'sync_object_field_change_report') ->where( $qb2->expr()->in('id', ':ids') ) ->setParameter('ids', $objectIds, ArrayParameterType::INTEGER); return $qb2->executeStatement(); } }